Getting started with streaming ingestion from Amazon Managed Streaming for Apache Kafka (Amazon MSK) - Amazon Redshift

Getting started with streaming ingestion from Amazon Managed Streaming for Apache Kafka (Amazon MSK)

This topic describes how to consume streaming data from Amazon MSK using a materialized view.

The purpose of Amazon Redshift streaming ingestion is to simplify the process of ingesting stream data directly from a streaming service into Amazon Redshift or Amazon Redshift Serverless. It works with Amazon MSK Provisioned and Amazon MSK Serverless, and with Kinesis Data Streams. Amazon Redshift streaming ingestion removes the need to stage a Kinesis Data Streams stream or an Amazon MSK topic in Amazon S3 before ingesting the stream data into Redshift.

On a technical level, streaming ingestion, both from Amazon Kinesis Data Streams and Amazon MSK, provides low-latency, high-speed ingestion of stream or topic data into an Amazon Redshift materialized view. After setup, you can ingest large data volumes by refreshing the materialized view.

Set up Amazon Redshift streaming ingestion for Amazon MSK by performing the following steps:

  1. Create an external schema that maps to the streaming data source.

  2. Create a materialized view that references the external schema.

You must have an Amazon MSK source available before configuring Amazon Redshift streaming ingestion. If you do not have a source, follow the instructions at Getting Started Using Amazon MSK.

Note

Streaming ingestion and Amazon Redshift Serverless - The configuration steps in this topic apply both to provisioned Amazon Redshift clusters and to Amazon Redshift Serverless. For more information, see Streaming ingestion behavior and data types.

Setting up IAM permissions and performing streaming ingestion from Kafka

Assuming you have an Amazon MSK cluster available, the first step is to define a schema in Redshift with CREATE EXTERNAL SCHEMA that references the Amazon MSK cluster as the data source. Following that, to access data in the topic, define a materialized view that references the topic. You can store records from your topic using the default Amazon Redshift VARBYTE data type, or define a schema that converts the data to the semi-structured SUPER format. When you query the materialized view, the returned records are a point-in-time view of the topic.

  1. If you use AUTHENTICATION NONE to connect to Amazon MSK, no IAM role is required. However, if you use AUTHENTICATION IAM or AUTHENTICATION MTLS to authenticate with your Amazon MSK cluster, your Amazon Redshift cluster or Amazon Redshift Serverless namespace must have an attached IAM role with appropriate permissions. Create an IAM role with a trust policy that allows your Amazon Redshift cluster or Amazon Redshift Serverless namespace to assume the role. After you create the role, add one of the following permissions policies to support IAM or mTLS authentication. For mTLS, the certificate that Amazon Redshift uses can be stored in either AWS Certificate Manager or AWS Secrets Manager, so choose the policy that matches where the certificate is stored. Finally, attach the role to your Amazon Redshift provisioned cluster or Amazon Redshift Serverless namespace. For information about how to configure the trust policy for the IAM role, see Authorizing Amazon Redshift to access other AWS services on your behalf.

    AUTHENTICATION IAM:

    { "Version": "2012-10-17", "Statement": [ { "Sid": "MSKIAMpolicy", "Effect": "Allow", "Action": [ "kafka-cluster:ReadData", "kafka-cluster:DescribeTopic", "kafka-cluster:Connect" ], "Resource": [ "arn:aws:kafka:*:0123456789:cluster/MyTestCluster/*", "arn:aws:kafka:*:0123456789:topic/MyTestCluster/*" ] }, { "Effect": "Allow", "Action": [ "kafka-cluster:AlterGroup", "kafka-cluster:DescribeGroup" ], "Resource": [ "arn:aws:kafka:*:0123456789:group/MyTestCluster/*" ] } ] }

    AUTHENTICATION MTLS: using a certificate stored in AWS Certificate Manager

    { "Version": "2012-10-17", "Statement": [ { "Sid": "MSKmTLSACMpolicy", "Effect": "Allow", "Action": [ "acm:ExportCertificate" ], "Resource": [ "arn:aws:acm:us-east-1:444455556666:certificate/certificate_ID" ] } ] }

    AUTHENTICATION MTLS: using a certificate stored in AWS Secrets Manager

    { "Version": "2012-10-17", "Statement": [ { "Sid": "MSKmTLSSecretsManagerpolicy", "Effect": "Allow", "Action": [ "secretsmanager:GetSecretValue" ], "Resource": [ "arn:aws:secretsmanager:us-east-1:444455556666:secret:secret_ID" ] } ] }
  2. Check your VPC and verify that your Amazon Redshift cluster or Amazon Redshift Serverless has a route to get to your Amazon MSK cluster. The inbound security group rules for your Amazon MSK cluster should allow your Amazon Redshift cluster's or your Amazon Redshift Serverless workgroup's security group. The ports you specify depend on the authentication methods configured on your Amazon MSK cluster. For more information, see Port information and Access from within AWS but outside the VPC.

    The following table shows complementary configuration options to set for streaming ingestion from Amazon MSK:

    Amazon Redshift configuration | Amazon MSK configuration | Port to open between Redshift and Amazon MSK
    AUTHENTICATION NONE           | TLS transport disabled   | 9092
    AUTHENTICATION NONE           | TLS transport enabled    | 9094
    AUTHENTICATION IAM            | IAM                      | 9098/9198
    AUTHENTICATION MTLS           | TLS transport enabled    | 9094

    Amazon Redshift authentication is set in the CREATE EXTERNAL SCHEMA statement.

    Note

    If the Amazon MSK cluster has mutual Transport Layer Security (mTLS) authentication enabled, configuring Amazon Redshift to use AUTHENTICATION NONE directs it to use port 9094 for unauthenticated access. However, this fails because that port is being used for mTLS authentication. For this reason, we recommend that you switch to AUTHENTICATION MTLS when you use mTLS.

  3. Enable enhanced VPC routing on your Amazon Redshift cluster or Amazon Redshift Serverless workgroup. For more information, see Enabling enhanced VPC routing.

  4. In Amazon Redshift, create an external schema to map to the Amazon MSK cluster. The syntax is the following:

    CREATE EXTERNAL SCHEMA MySchema
    FROM MSK
    [ IAM_ROLE [ default | 'iam-role-arn' ] ]
    AUTHENTICATION [ none | iam | mtls ]
    [ AUTHENTICATION_ARN 'acm-certificate-arn' | SECRET_ARN 'ssm-secret-arn' ];

    In the FROM clause, MSK denotes that the schema maps data from Amazon Managed Streaming for Apache Kafka.

    AUTHENTICATION denotes the authentication type for streaming ingestion with Amazon MSK. There are three types available:

    • none – Specifies that there is no authentication required. This corresponds to Unauthenticated access on MSK.

    • iam – Specifies IAM authentication. When you choose this, make sure that the IAM role has permissions for IAM authentication. For more information about setting up the required IAM policies, see Setting up IAM permissions and performing streaming ingestion from Kafka.

    • mtls – Specifies that mutual transport layer security provides secure communication by facilitating authentication between a client and server. In this case, the client is Redshift and the server is Amazon MSK. For more information about configuring streaming ingestion with mTLS, see Authentication with mTLS for Redshift streaming ingestion from Amazon MSK.

    Note that Amazon MSK authentication with a username and password isn't supported for streaming ingestion.

    AUTHENTICATION_ARN specifies the ARN of the ACM mutual transport layer security (mTLS) certificate you use to establish an encrypted connection.

    SECRET_ARN specifies the ARN of the AWS Secrets Manager secret containing the certificate to be used by Amazon Redshift for mTLS.

    The following samples show how to set the broker URI for the Amazon MSK cluster when you create the external schema.

    Using IAM:

    CREATE EXTERNAL SCHEMA my_schema
    FROM MSK
    IAM_ROLE 'arn:aws:iam::012345678901:role/my_role'
    AUTHENTICATION IAM
    URI 'b-1.myTestCluster.123z8u.c2.kafka.us-west-1.amazonaws.com:9098,b-2.myTestCluster.123z8u.c2.kafka.us-west-1.amazonaws.com:9098';

    Using no authentication:

    CREATE EXTERNAL SCHEMA my_schema
    FROM MSK
    AUTHENTICATION none
    URI 'b-1.myTestCluster.123z8u.c2.kafka.us-west-1.amazonaws.com:9092,b-2.myTestCluster.123z8u.c2.kafka.us-west-1.amazonaws.com:9092';

    Using mTLS:

    CREATE EXTERNAL SCHEMA my_schema
    FROM MSK
    IAM_ROLE 'arn:aws:iam::012345678901:role/my_role'
    AUTHENTICATION MTLS
    URI 'b-1.myTestCluster.123z8u.c2.kafka.us-west-1.amazonaws.com:9094,b-2.myTestCluster.123z8u.c2.kafka.us-west-1.amazonaws.com:9094'
    [ AUTHENTICATION_ARN 'acm-certificate-arn' | SECRET_ARN 'ssm-secret-arn' ];

    For more information on creating an external schema, see CREATE EXTERNAL SCHEMA.

  5. Create a materialized view to consume the data from the topic. Use a SQL command such as the following sample.

    CREATE MATERIALIZED VIEW MyView AUTO REFRESH YES AS
    SELECT * FROM MySchema."mytopic";

    Kafka topic names are case sensitive and can contain both uppercase and lowercase letters. To ingest from topics with uppercase names, you can set the configuration enable_case_sensitive_identifier to true at the session or database level. For more information, see Names and identifiers and enable_case_sensitive_identifier.
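    For example, to ingest from a hypothetical topic named "MyUppercaseTopic", you might first enable the setting for the session (a sketch; adapt the schema, view, and topic names to your setup):

    ```sql
    -- Allow case-sensitive identifiers for this session
    SET enable_case_sensitive_identifier TO true;

    -- "MyUppercaseTopic" is a hypothetical topic name
    CREATE MATERIALIZED VIEW MyUppercaseView AUTO REFRESH YES AS
    SELECT * FROM MySchema."MyUppercaseTopic";
    ```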

    To turn on auto refresh, use AUTO REFRESH YES. The default behavior is manual refresh.
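    If you created the materialized view without auto refresh, you can turn it on later with ALTER MATERIALIZED VIEW:

    ```sql
    ALTER MATERIALIZED VIEW MyView AUTO REFRESH YES;
    ```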

    Metadata columns include the following:

    Metadata column      | Data type                   | Description
    kafka_partition      | bigint                      | Partition ID of the record from the Kafka topic
    kafka_offset         | bigint                      | Offset of the record in the Kafka topic for a given partition
    kafka_timestamp_type | char(1)                     | Type of timestamp used in the Kafka record: C – record creation time (CREATE_TIME) on the client side; L – record append time (LOG_APPEND_TIME) on the Kafka server side; U – record creation time is not available (NO_TIMESTAMP_TYPE)
    kafka_timestamp      | timestamp without time zone | The timestamp value for the record
    kafka_key            | varbyte                     | The key of the Kafka record
    kafka_value          | varbyte                     | The record received from Kafka
    kafka_headers        | super                       | The header of the record received from Kafka
    refresh_time         | timestamp without time zone | The time the refresh started
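    As a sketch, the following materialized view keeps several of these metadata columns and converts the payload to SUPER with JSON_PARSE; it assumes the topic carries UTF-8 JSON records, and the view name is hypothetical:

    ```sql
    CREATE MATERIALIZED VIEW MyParsedView AUTO REFRESH YES AS
    SELECT
        kafka_partition,
        kafka_offset,
        kafka_timestamp,
        JSON_PARSE(kafka_value) AS payload  -- convert the VARBYTE JSON payload to SUPER
    FROM MySchema."mytopic";
    ```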

    It's important to note that business logic errors in your materialized view definition can, in some cases, cause streaming ingestion failures, and you might then have to drop and re-create the materialized view. To avoid this, we recommend that you keep your business logic simple and run additional logic on the data after you ingest it.

  6. Refresh the view, which causes Amazon Redshift to read from the topic and load data into the materialized view.

    REFRESH MATERIALIZED VIEW MyView;
  7. Query data in the materialized view.

    SELECT * FROM MyView;

    The materialized view is updated directly from the topic when REFRESH is run. You can perform filtering and aggregations on the data as part of the materialized view definition. Your streaming ingestion materialized view (the base materialized view) can reference only one Kafka topic, but you can create additional materialized views that join with the base materialized view and with other materialized views or tables.
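    To illustrate the last point, a downstream materialized view can aggregate or join the base materialized view after ingestion; this sketch uses only the metadata columns described earlier, and the view name is hypothetical:

    ```sql
    -- Downstream materialized view over the base streaming view
    CREATE MATERIALIZED VIEW events_per_partition AS
    SELECT
        kafka_partition,
        COUNT(*)             AS record_count,
        MAX(kafka_timestamp) AS latest_event
    FROM MyView
    GROUP BY kafka_partition;
    ```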

For more information about limitations for streaming ingestion, see Streaming ingestion behavior and data types.