Getting started with streaming ingestion from Amazon Kinesis Data Streams
Setting up Amazon Redshift streaming ingestion involves creating an external schema that maps to the streaming data source and creating a materialized view that references the external schema. Amazon Redshift streaming ingestion supports Kinesis Data Streams as a source. As such, you must have a Kinesis Data Streams source available before configuring streaming ingestion. If you don't have a source, follow the instructions in the Kinesis documentation at Getting Started with Amazon Kinesis Data Streams or create one on the console using the instructions at Creating a Stream via the AWS Management Console.
Amazon Redshift streaming ingestion uses a materialized view, which is updated directly from the stream when REFRESH is run. The materialized view maps to the stream data source. You can perform filtering and aggregations on the stream data as part of the materialized-view definition. Your streaming ingestion materialized view (the base materialized view) can reference only one stream, but you can create additional materialized views that join with the base materialized view and with other materialized views or tables.
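As a rough sketch of that last point, a second materialized view could join the streaming view with an ordinary table. The names my_enriched_view, device_dim, device_id, and device_name are hypothetical and used only for illustration. my_view stands for the base streaming materialized view created later in this topic.

```sql
-- Hypothetical downstream view: my_view is the base streaming materialized view,
-- device_dim is an assumed regular Redshift table keyed by device_id.
CREATE MATERIALIZED VIEW my_enriched_view AS
SELECT d.device_name,
       COUNT(*) AS record_count          -- aggregate over ingested records
FROM my_view AS v
JOIN device_dim AS d
  ON v.partition_key = d.device_id       -- assumes the partition key carries the device ID
GROUP BY d.device_name;
```

Keeping joins and aggregations in a downstream view like this leaves the base view's definition small, since the base view can reference only the one stream.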
Note
Streaming ingestion and Amazon Redshift Serverless - The configuration steps in this topic apply both to provisioned Amazon Redshift clusters and to Amazon Redshift Serverless. For more information, see Streaming ingestion behavior and data types.
Assuming you have a Kinesis Data Streams stream available, the first step is to define a schema in Amazon Redshift with CREATE EXTERNAL SCHEMA and to reference a Kinesis Data Streams resource. Following that, to access data in the stream, define the STREAM in a materialized view. You can store stream records in the semi-structured SUPER format, or define a schema that results in data converted to Redshift data types. When you query the materialized view, the returned records are a point-in-time view of the stream.
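To illustrate the second option (a defined schema instead of SUPER), the sketch below projects individual JSON fields into Redshift data types. It assumes the records are UTF-8 encoded JSON and that the payload contains the hypothetical fields device_id and temperature. The view name my_typed_view is also an assumption, while kds and my_stream_name match the names used in the steps of this topic.

```sql
-- Sketch only: device_id and temperature are assumed payload fields.
CREATE MATERIALIZED VIEW my_typed_view AS
SELECT approximate_arrival_timestamp,
       -- kinesis_data is varbyte, so decode it to text before extracting fields
       JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kinesis_data, 'utf-8'), 'device_id')::VARCHAR(64)    AS device_id,
       JSON_EXTRACT_PATH_TEXT(FROM_VARBYTE(kinesis_data, 'utf-8'), 'temperature')::DECIMAL(8,2) AS temperature
FROM kds.my_stream_name;
```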
Create an IAM role with a trust policy that allows your Amazon Redshift cluster or Amazon Redshift Serverless workgroup to assume the role. For information about how to configure the trust policy for the IAM role, see Authorizing Amazon Redshift to access other AWS services on your behalf. After it is created, the role should have the following IAM policy, which provides permission for communication with the Amazon Kinesis data stream.
IAM policy for an unencrypted stream from Kinesis Data Streams
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "ReadStream",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStreamSummary",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:DescribeStream"
            ],
            "Resource": "arn:aws:kinesis:*:0123456789:stream/*"
        },
        {
            "Sid": "ListStream",
            "Effect": "Allow",
            "Action": [
                "kinesis:ListStreams",
                "kinesis:ListShards"
            ],
            "Resource": "*"
        }
    ]
}
IAM policy for an encrypted stream from Kinesis Data Streams
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "ReadStream",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStreamSummary",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:DescribeStream"
            ],
            "Resource": "arn:aws:kinesis:*:0123456789:stream/*"
        },
        {
            "Sid": "DecryptStream",
            "Effect": "Allow",
            "Action": [
                "kms:Decrypt"
            ],
            "Resource": "arn:aws:kms:us-east-1:0123456789:key/1234abcd-12ab-34cd-56ef-1234567890ab"
        },
        {
            "Sid": "ListStream",
            "Effect": "Allow",
            "Action": [
                "kinesis:ListStreams",
                "kinesis:ListShards"
            ],
            "Resource": "*"
        }
    ]
}
Check your VPC and verify that your Amazon Redshift cluster or Amazon Redshift Serverless workgroup has a route to the Kinesis Data Streams endpoints over the internet through a NAT gateway or internet gateway. If you want traffic between Redshift and Kinesis Data Streams to remain within the AWS network, consider using a Kinesis interface VPC endpoint. For more information, see Using Amazon Kinesis Data Streams with Interface VPC Endpoints.
In Amazon Redshift, create an external schema to map the data from Kinesis to a schema.
CREATE EXTERNAL SCHEMA kds FROM KINESIS IAM_ROLE { default | 'iam-role-arn' };
Streaming ingestion for Kinesis Data Streams doesn't require an authentication type. It uses the IAM role defined in the CREATE EXTERNAL SCHEMA statement for making Kinesis Data Streams requests.
Optional: Use the REGION keyword to specify the region where the Amazon Kinesis Data Streams or Amazon MSK stream resides.
CREATE EXTERNAL SCHEMA kds FROM KINESIS REGION 'us-west-2' IAM_ROLE { default | 'iam-role-arn' };
In this sample, REGION specifies the location of the source stream. The IAM_ROLE value is a placeholder: specify either default or the ARN of the IAM role that you created.
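The statements above are syntax templates, so it may help to see one filled in. The following is a minimal sketch in which the account ID and the role name my-streaming-role are hypothetical. Substitute the ARN of the role that carries the Kinesis policy shown earlier.

```sql
-- Hypothetical role ARN; replace it with the ARN of your own IAM role.
CREATE EXTERNAL SCHEMA kds
FROM KINESIS
REGION 'us-west-2'
IAM_ROLE 'arn:aws:iam::123456789012:role/my-streaming-role';
```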
Create a materialized view to consume the stream data. With a statement like the following, a record that can't be parsed causes an error. Use this form if you don't want error records to be skipped.
CREATE MATERIALIZED VIEW my_view AUTO REFRESH YES AS SELECT * FROM kds.my_stream_name;
The following example defines a materialized view for source data in JSON format. The view validates that incoming data is properly formatted JSON. Kinesis stream names are case sensitive and can contain both uppercase and lowercase letters. To ingest from streams with uppercase names, you can set the configuration enable_case_sensitive_identifier to true at the database level. For more information, see Names and identifiers and enable_case_sensitive_identifier.
CREATE MATERIALIZED VIEW my_view AUTO REFRESH YES AS
SELECT approximate_arrival_timestamp, partition_key, shard_id, sequence_number, refresh_time, JSON_PARSE(kinesis_data) as kinesis_data
FROM kds.my_stream_name
WHERE CAN_JSON_PARSE(kinesis_data);
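For a stream whose name contains uppercase letters, a definition might look like the following sketch. The stream name MyStreamName and the view name my_mixed_case_view are hypothetical. The SET statement here applies only to the current session and is shown as an assumption for illustration; the configuration described above is applied at the database level.

```sql
-- Assumption: session-level setting, shown for illustration only.
SET enable_case_sensitive_identifier TO true;

-- Double quotes preserve the case of the hypothetical stream name.
CREATE MATERIALIZED VIEW my_mixed_case_view AUTO REFRESH YES AS
SELECT approximate_arrival_timestamp,
       JSON_PARSE(kinesis_data) AS kinesis_data
FROM kds."MyStreamName"
WHERE CAN_JSON_PARSE(kinesis_data);
```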
To turn on auto refresh, use AUTO REFRESH YES. The default behavior is manual refresh. Note that when you use CAN_JSON_PARSE, records that can't be parsed might be skipped.
Metadata columns include the following:
Metadata column                Data type                    Description
approximate_arrival_timestamp  timestamp without time zone  The approximate time that the record was inserted into the Kinesis stream
partition_key                  varchar(256)                 The key used by Kinesis to assign the record to a shard
shard_id                       char(20)                     The unique identifier of the shard within the stream from which the record was retrieved
sequence_number                varchar(128)                 The unique identifier of the record from the Kinesis shard
refresh_time                   timestamp without time zone  The time the refresh started
kinesis_data                   varbyte                      The record from the Kinesis stream
Note that if your materialized view definition contains business logic, business-logic errors can block streaming ingestion in some cases. This might require you to drop and re-create the materialized view. To avoid this, we recommend that you keep your logic as simple as possible and perform most of your business-logic checks on the data after it's ingested.
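One way to follow this recommendation is to keep the view definition limited to parsing and to apply checks in a query over the ingested data. The sketch below assumes that my_view stores the parsed record in a SUPER column named kinesis_data, and that the payload has the hypothetical fields device_id and temperature. The range check is an arbitrary example of business logic.

```sql
-- Sketch: business-logic checks applied after ingestion, not in the view definition.
-- device_id and temperature are assumed payload fields.
SELECT v.kinesis_data.device_id::VARCHAR(64)    AS device_id,
       v.kinesis_data.temperature::DECIMAL(8,2) AS temperature,
       v.approximate_arrival_timestamp
FROM my_view AS v
WHERE v.kinesis_data.temperature IS NOT NULL
  AND v.kinesis_data.temperature::DECIMAL(8,2) BETWEEN -50 AND 150;
```

A failure in a query like this affects only the query itself, so the streaming view keeps ingesting.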
Refresh the view, which causes Amazon Redshift to read from the stream and load data into the materialized view.
REFRESH MATERIALIZED VIEW my_view;
Query data in the materialized view.
SELECT * FROM my_view;
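Beyond selecting every row, a quick sanity check after a refresh could summarize how much data has arrived and how recent it is. This sketch relies only on the metadata columns listed earlier. The column aliases are arbitrary.

```sql
-- Summarize ingested records using the stream metadata columns.
SELECT COUNT(*)                           AS record_count,
       MAX(approximate_arrival_timestamp) AS latest_arrival,
       MAX(refresh_time)                  AS latest_refresh
FROM my_view;
```

A large gap between latest_arrival and the current time may suggest that the view has not been refreshed recently or that no new records have reached the stream.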