Getting started with streaming ingestion from Amazon Managed Streaming for Apache Kafka

Amazon Redshift streaming ingestion simplifies ingesting stream data directly from a streaming service into Amazon Redshift or Amazon Redshift Serverless. It works with Amazon MSK, Amazon MSK Serverless, and Amazon Kinesis Data Streams. With streaming ingestion, you don't need to stage a Kinesis Data Streams stream or an Amazon MSK topic in Amazon S3 before ingesting the stream data into Redshift.

On a technical level, streaming ingestion from both Amazon Kinesis Data Streams and Amazon Managed Streaming for Apache Kafka provides low-latency, high-speed ingestion of stream or topic data into an Amazon Redshift materialized view. After setup, you take in large data volumes by refreshing the materialized view.

Set up Amazon Redshift streaming ingestion for Amazon MSK by performing the following steps:

  1. Create an external schema that maps to the streaming data source.

  2. Create a materialized view that references the external schema.

You must have an Amazon MSK source available before you configure Amazon Redshift streaming ingestion. If you do not have a source, follow the instructions at Getting Started Using Amazon MSK.


Streaming ingestion and Amazon Redshift Serverless - The configuration steps in this topic apply both to provisioned Amazon Redshift clusters and to Amazon Redshift Serverless. For more information, see Streaming ingestion behavior and data types.

Setting up IAM permissions and performing streaming ingestion from Kafka

Assuming you have an Amazon MSK cluster available, the first step is to define a schema in Redshift with CREATE EXTERNAL SCHEMA and to reference the Kafka topic as the data source. Following that, to access data in the topic, define the STREAM in a materialized view. You can store records from your topic in the semi-structured SUPER format, or define a schema that results in data converted to Amazon Redshift data types. When you query the materialized view, the returned records are a point-in-time view of the topic.

  1. Create an IAM role with a trust policy that allows your Amazon Redshift cluster or Amazon Redshift Serverless to assume the role. For information about how to configure the trust policy for the IAM role, see Authorizing Amazon Redshift to access other AWS services on your behalf. After you create the role, attach one of the following IAM policies, which grant permission to communicate with the Amazon MSK cluster. The policy you need depends on the authentication method used on your Amazon MSK cluster. For the authentication methods available in Amazon MSK, see Authentication and Authorization for Apache Kafka APIs.

    An IAM policy for Amazon MSK using unauthenticated access:

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "VisualEditor0",
                "Effect": "Allow",
                "Action": [
                    "kafka:GetBootstrapBrokers"
                ],
                "Resource": "*"
            }
        ]
    }

    An IAM policy for Amazon MSK when using IAM authentication:

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "MSKIAMpolicy",
                "Effect": "Allow",
                "Action": [
                    "kafka-cluster:ReadData",
                    "kafka-cluster:DescribeTopic",
                    "kafka-cluster:Connect"
                ],
                "Resource": [
                    "arn:aws:kafka:*:0123456789:cluster/MyTestCluster/*",
                    "arn:aws:kafka:*:0123456789:topic/MyTestCluster/*"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "kafka-cluster:AlterGroup",
                    "kafka-cluster:DescribeGroup"
                ],
                "Resource": [
                    "arn:aws:kafka:*:0123456789:group/MyTestCluster/*"
                ]
            },
            {
                "Sid": "MSKPolicy",
                "Effect": "Allow",
                "Action": [
                    "kafka:GetBootstrapBrokers"
                ],
                "Resource": "*"
            }
        ]
    }

  2. Check your VPC and verify that your Amazon Redshift cluster or Amazon Redshift Serverless workgroup has a route to your Amazon MSK cluster. The inbound security group rules for your Amazon MSK cluster should allow traffic from the security group of your Amazon Redshift cluster or Amazon Redshift Serverless workgroup. The ports you specify depend on the authentication method that your Amazon MSK cluster uses. For more information, see Port information and Access from within AWS but outside the VPC.

    Note that client authentication with mTLS isn't supported for streaming ingestion. For more information, see Limitations.

    The following table shows complementary configuration options to set for streaming ingestion from Amazon MSK:

    Amazon Redshift configuration    Amazon MSK configuration    Port to open between Redshift and Amazon MSK
    AUTHENTICATION NONE              TLS transport disabled      9092
    AUTHENTICATION NONE              TLS transport enabled       9094

    Amazon Redshift authentication is set in the CREATE EXTERNAL SCHEMA statement.

    In a case where the Amazon MSK cluster has Mutual Transport Layer Security (mTLS) authentication enabled, configuring Amazon Redshift to use AUTHENTICATION NONE directs it to use port 9094 for unauthenticated access. However, this will fail because the port is being used by mTLS authentication. Because of this, we recommend that you switch to AUTHENTICATION IAM when you use mTLS.

  3. Enable enhanced VPC routing on your Amazon Redshift cluster or Amazon Redshift Serverless workgroup. For more information, see Enabling enhanced VPC routing.


    To retrieve the Amazon MSK bootstrap broker URL, Amazon Redshift makes a GetBootstrapBrokers API call, using the permissions of the attached IAM role. For this request to succeed when enhanced VPC routing is enabled, the subnet for your Amazon Redshift provisioned cluster or Amazon Redshift Serverless workgroup must have a NAT gateway or internet gateway. The network ACLs and security-group outbound rules for that subnet must also allow access to the Amazon MSK API service endpoints. For more information, see Amazon Managed Streaming for Apache Kafka endpoints and quotas.

  4. In Amazon Redshift, create an external schema to map to the Amazon MSK cluster.

    CREATE EXTERNAL SCHEMA MySchema
    FROM MSK
    IAM_ROLE { default | 'iam-role-arn' }
    AUTHENTICATION { none | iam }
    CLUSTER_ARN 'msk-cluster-arn';

    In the FROM clause, MSK denotes that the schema maps data from Amazon Managed Streaming for Apache Kafka.

    Streaming ingestion for Amazon MSK provides the following authentication types, when you create the external schema:

    • none – Specifies that there is no authentication step.

    • iam – Specifies IAM authentication. When you choose this, make sure that the IAM role has permissions for IAM authentication.

    Additional Amazon MSK authentication methods, such as TLS authentication or a username and password, aren't supported for streaming ingestion.

    CLUSTER_ARN specifies the Amazon MSK cluster that you’re streaming from.
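
    As an illustration, the syntax above might be filled in as follows for a cluster that uses IAM authentication. The role ARN and the cluster ARN are hypothetical placeholders, not values from this topic. Substitute the ARNs from your own account.

    ```sql
    -- Hypothetical ARNs, for illustration only. Replace them with your own
    -- IAM role ARN and Amazon MSK cluster ARN.
    CREATE EXTERNAL SCHEMA MySchema
    FROM MSK
    IAM_ROLE 'arn:aws:iam::012345678901:role/MyRedshiftStreamingRole'
    AUTHENTICATION iam
    CLUSTER_ARN 'arn:aws:kafka:us-east-1:012345678901:cluster/MyTestCluster/11111111-2222-3333-4444-555555555555-1';
    ```

    With AUTHENTICATION iam, the role named in IAM_ROLE needs the kafka-cluster permissions shown in the IAM authentication policy in step 1.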

  5. Create a materialized view to consume the data from the topic. Use a SQL command like this sample if you don't want error records to be skipped.


    The following example defines a materialized view with JSON source data. The view validates that the data is valid JSON and UTF-8. Kafka topic names are case sensitive and can contain both uppercase and lowercase letters. To ingest from topics with uppercase names, you can set the configuration enable_case_sensitive_identifier to true at the database level. For more information, see Names and identifiers and enable_case_sensitive_identifier.

    CREATE MATERIALIZED VIEW MyView AUTO REFRESH YES AS
    SELECT kafka_partition,
           kafka_offset,
           kafka_timestamp_type,
           kafka_timestamp,
           kafka_key,
           JSON_PARSE(kafka_value) as kafka_data,
           kafka_headers,
           refresh_time
    FROM MySchema."mytopic"
    WHERE CAN_JSON_PARSE(kafka_value);

    To turn on auto refresh, use AUTO REFRESH YES. The default behavior is manual refresh.
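
    The case-sensitivity setting mentioned above can be sketched as follows. This example uses a session-level SET rather than the database-level configuration, and the topic name MyTopic and the view name MyUpperCaseView are hypothetical. A session-level setting lasts only for the current session, so treat this as a way to try the behavior rather than a complete setup.

    ```sql
    -- Assumption: session-level setting, for illustration only.
    SET enable_case_sensitive_identifier TO true;

    -- "MyTopic" is a hypothetical topic name that contains uppercase letters.
    -- The double quotes preserve the exact case of the topic name.
    CREATE MATERIALIZED VIEW MyUpperCaseView AS
    SELECT kafka_partition,
           kafka_offset,
           kafka_timestamp,
           JSON_PARSE(kafka_value) AS kafka_data
    FROM MySchema."MyTopic"
    WHERE CAN_JSON_PARSE(kafka_value);
    ```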

    Metadata columns include the following:

    Metadata column         Data type                      Description
    kafka_partition         bigint                         Partition ID of the record from the Kafka topic
    kafka_offset            bigint                         Offset of the record in the Kafka topic for a given partition
    kafka_timestamp_type    char(1)                        Type of timestamp used in the Kafka record:
                                                           • C – Record creation time (CREATE_TIME) on the client side
                                                           • L – Record append time (LOG_APPEND_TIME) on the Kafka server side
                                                           • U – Record creation time is not available (NO_TIMESTAMP_TYPE)
    kafka_timestamp         timestamp without time zone    The timestamp value for the record
    kafka_key               varbyte                        The key of the Kafka record
    kafka_value             varbyte                        The record received from Kafka
    kafka_headers           super                          The header of the record received from Kafka
    refresh_time            timestamp without time zone    The time the refresh started
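
    As an alternative to storing the whole record in SUPER format, the view can convert selected fields to Amazon Redshift data types. The following is a minimal sketch of that approach. The view name MyTypedView and the JSON keys customer_id and amount are hypothetical, and the sketch assumes that each record value is a UTF-8 encoded JSON object.

    ```sql
    -- Hypothetical JSON keys (customer_id, amount). Adjust them to your payload.
    CREATE MATERIALIZED VIEW MyTypedView AUTO REFRESH YES AS
    SELECT kafka_partition,
           kafka_offset,
           kafka_timestamp,
           -- Decode the varbyte value to text, extract one key, then cast it.
           json_extract_path_text(from_varbyte(kafka_value, 'utf-8'), 'customer_id')::varchar(64) AS customer_id,
           json_extract_path_text(from_varbyte(kafka_value, 'utf-8'), 'amount')::decimal(12,2) AS amount,
           refresh_time
    FROM MySchema."mytopic"
    WHERE CAN_JSON_PARSE(kafka_value);
    ```

    A cast that fails on an unexpected value can cause the refresh to fail, so it is safer to keep such conversions few and simple.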

    If your materialized view definition contains business logic, note that business-logic errors can block streaming ingestion in some cases. You might then have to drop and re-create the materialized view. To avoid this, we recommend that you keep the logic in the view simple and run additional logic on the data after you ingest it.

  6. Refresh the view, which invokes Amazon Redshift to read from the topic and load data into the materialized view.
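
    For a view that uses manual refresh, the refresh can be run with a statement like the following, which uses the MyView name from the earlier step.

    ```sql
    -- Reads new records from the topic and loads them into the view.
    REFRESH MATERIALIZED VIEW MyView;
    ```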

  7. Query data in the materialized view.

    select * from MyView;

    The materialized view maps to the Kafka topic data source and is updated directly from the topic when REFRESH is run. You can perform filtering and aggregations on the data as part of the materialized view definition. Your streaming ingestion materialized view (the base materialized view) can reference only one Kafka topic, but you can create additional materialized views that join with the base materialized view and with other materialized views or tables.
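
    The following sketch shows two ways to work with the base materialized view: reading a field out of the SUPER column, and building a second materialized view on top of it. The JSON key order_id and the view name MyViewPartitionCounts are hypothetical.

    ```sql
    -- Navigate into the SUPER column with dot notation through a table alias.
    -- order_id is a hypothetical key in the JSON payload.
    SELECT v.kafka_data.order_id,
           v.kafka_timestamp
    FROM MyView AS v
    ORDER BY v.kafka_timestamp DESC
    LIMIT 10;

    -- A second materialized view that aggregates over the base view.
    CREATE MATERIALIZED VIEW MyViewPartitionCounts AS
    SELECT kafka_partition,
           COUNT(*) AS record_count,
           MAX(kafka_timestamp) AS latest_record_time
    FROM MyView
    GROUP BY kafka_partition;
    ```

    Putting aggregations in a second view, rather than in the base view, keeps the ingestion view itself simple.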

For more information about limitations for streaming ingestion, see Streaming ingestion behavior and data types.