Using an OpenSearch Ingestion pipeline with Amazon Kinesis Data Streams - Amazon OpenSearch Service

Using an OpenSearch Ingestion pipeline with Amazon Kinesis Data Streams

You can use the Kinesis plugin to stream data from Amazon Kinesis Data Streams to Amazon OpenSearch Service domains and OpenSearch Serverless collections. The pipeline pulls records from Amazon Kinesis and sends them to OpenSearch, and automatically generates indexes based on the stream name and current date.

Connectivity to Amazon Kinesis Data Streams

You can use OpenSearch Ingestion pipelines to ingest data from Amazon Kinesis Data Streams into a destination with a public configuration, which means that the domain DNS name can be publicly resolved. To do so, set up an OpenSearch Ingestion pipeline with Amazon Kinesis Data Streams as the source and OpenSearch Service or OpenSearch Serverless as the destination. This configuration moves your streaming data from Kinesis Data Streams into an AWS-managed destination domain or collection.

Prerequisites

Before you create your OpenSearch Ingestion pipeline, perform the following steps:

  1. Create an Amazon Kinesis data stream to act as the source. The stream should contain the data you want to ingest into OpenSearch Service.

  2. Create an OpenSearch Service domain or OpenSearch Serverless collection where you want to migrate data to. For more information, see Creating OpenSearch Service domains and Creating collections.

  3. Set up authentication on your Amazon Kinesis data stream with AWS Secrets Manager. Enable secrets rotation by following the steps in Rotate AWS Secrets Manager secrets.

  4. Attach a resource-based policy to your domain or a data access policy to your collection. These access policies allow OpenSearch Ingestion to write data from your self-managed cluster to your domain or collection.

    The following sample domain access policy allows the pipeline role, which you create in the next step, to write data to a domain. Make sure that you update the resource with your own ARN.

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "AWS": "arn:aws:iam::account-id:role/pipeline-role"
          },
          "Action": [
            "es:DescribeDomain",
            "es:ESHttp*"
          ],
          "Resource": [
            "arn:aws:es:region:account-id:domain/domain-name"
          ]
        }
      ]
    }

    To create an IAM role with the correct permissions to write data to the collection or domain, see Setting up roles and users in Amazon OpenSearch Ingestion.
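Once the prerequisites are in place, you can publish a sample record to the source stream to verify it. The following sketch uses a hypothetical stream named my-ingestion-stream; the boto3 call itself is shown commented out because it requires AWS credentials, and the newline-terminated payload is an assumption matching the pipeline's newline codec.

```python
# Sketch: publish a sample JSON record to the source Kinesis stream so
# the pipeline has data to ingest. Stream name and region are placeholders.
import json

def build_put_record_args(stream_name, payload, partition_key):
    """Assemble the arguments for a Kinesis PutRecord call.

    Assumption: the pipeline's newline codec reads newline-delimited
    documents, so we terminate the serialized payload with a newline.
    """
    return {
        "StreamName": stream_name,
        "Data": (json.dumps(payload) + "\n").encode("utf-8"),
        "PartitionKey": partition_key,
    }

args = build_put_record_args(
    "my-ingestion-stream",                      # hypothetical stream name
    {"message": "hello", "level": "info"},      # sample document
    "sensor-1",                                 # sample partition key
)

# With AWS credentials configured, the record could be published like this:
# import boto3
# kinesis = boto3.client("kinesis", region_name="us-east-1")
# kinesis.put_record(**args)
```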

Step 1: Configure the pipeline role

After you have your Amazon Kinesis Data Streams pipeline prerequisites set up, configure the pipeline role that you want to use in your pipeline configuration, and add permission to write to an OpenSearch Service domain or OpenSearch Serverless collection, as well as permission to read secrets from Secrets Manager.

The following permissions are needed for the pipeline role to read from the Kinesis data stream:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "allowReadFromStream",
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:DescribeStreamConsumer",
        "kinesis:DescribeStreamSummary",
        "kinesis:GetRecords",
        "kinesis:GetShardIterator",
        "kinesis:ListShards",
        "kinesis:ListStreams",
        "kinesis:ListStreamConsumers",
        "kinesis:RegisterStreamConsumer",
        "kinesis:SubscribeToShard"
      ],
      "Resource": [
        "arn:aws:kinesis:region:account-id:stream/stream-name"
      ]
    }
  ]
}

If server-side encryption is enabled for your streams, the pipeline role also needs the following AWS KMS permissions to decrypt the stream records:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "allowDecryptionOfCustomManagedKey",
      "Effect": "Allow",
      "Action": [
        "kms:Decrypt",
        "kms:GenerateDataKey"
      ],
      "Resource": "arn:aws:kms:region:account-id:key/key-id"
    }
  ]
}

In order for a pipeline to write data to a domain, the domain must have a domain-level access policy that allows the sts_role_arn pipeline role to access it. The following sample domain access policy allows the pipeline role named pipeline-role, which you created in the previous step, to write data to the domain named ingestion-domain:

{
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::account-id:role/pipeline-role"
      },
      "Action": [
        "es:DescribeDomain",
        "es:ESHttp*"
      ],
      "Resource": "arn:aws:es:region:account-id:domain/ingestion-domain/*"
    }
  ]
}
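The pipeline role's permissions can also be assembled programmatically, which is convenient when some streams are encrypted and others are not. The following sketch mirrors the Kinesis and KMS statements shown above; the ARNs and the build_pipeline_policy helper are illustrative, not part of any AWS SDK.

```python
# Sketch: build the pipeline role's permissions policy as a Python dict,
# combining the Kinesis read statement with the optional KMS statement
# for server-side-encrypted streams. All ARNs are placeholders.
import json

KINESIS_READ_ACTIONS = [
    "kinesis:DescribeStream",
    "kinesis:DescribeStreamConsumer",
    "kinesis:DescribeStreamSummary",
    "kinesis:GetRecords",
    "kinesis:GetShardIterator",
    "kinesis:ListShards",
    "kinesis:ListStreams",
    "kinesis:ListStreamConsumers",
    "kinesis:RegisterStreamConsumer",
    "kinesis:SubscribeToShard",
]

def build_pipeline_policy(stream_arn, kms_key_arn=None):
    """Return an IAM policy document granting read access to the stream,
    plus KMS decrypt permissions when a key ARN is supplied."""
    statements = [{
        "Sid": "allowReadFromStream",
        "Effect": "Allow",
        "Action": KINESIS_READ_ACTIONS,
        "Resource": [stream_arn],
    }]
    if kms_key_arn:  # only needed when server-side encryption is enabled
        statements.append({
            "Sid": "allowDecryptionOfCustomManagedKey",
            "Effect": "Allow",
            "Action": ["kms:Decrypt", "kms:GenerateDataKey"],
            "Resource": kms_key_arn,
        })
    return {"Version": "2012-10-17", "Statement": statements}

policy = build_pipeline_policy(
    "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream",
    kms_key_arn="arn:aws:kms:us-east-1:123456789012:key/example-key-id",
)
print(json.dumps(policy, indent=2))
```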

Step 2: Create the pipeline

You can then configure an OpenSearch Ingestion pipeline, specifying Amazon Kinesis as the source. The following metadata attributes are available:

  • stream_name: name of the Kinesis data stream that the record is being ingested from.

  • partition_key: partition key of the Kinesis data stream record that is being ingested.

  • sequence_number: sequence number of the Kinesis data stream record that is being ingested.

  • sub_sequence_number: sub sequence number of the Kinesis data stream record that is being ingested.

You can specify multiple OpenSearch Service domains as destinations for your data. This capability enables conditional routing or replication of incoming data into multiple OpenSearch Service domains.
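As a sketch of conditional routing, the following fragment sends records to one of two domains based on a hypothetical level field in the incoming documents. The route names, domain endpoints, and field are assumptions; consult the OpenSearch Data Prepper routing documentation for the exact syntax.

```yaml
# Illustrative sketch only: route records to different domains based on
# a hypothetical "level" field. Endpoints and ARNs are placeholders.
version: "2"
kinesis_routing_pipeline:
  source:
    kinesis_data_streams:
      streams:
        - stream_name: "<stream name>"
      aws:
        sts_role_arn: "arn:aws:iam::account-id:role/pipeline-role"
        region: "us-east-1"
  route:
    - error-events: '/level == "error"'
    - other-events: '/level != "error"'
  sink:
    - opensearch:
        hosts: [ "https://search-errors-domain.us-east-1.es.amazonaws.com" ]
        index: "error-events"
        routes: [ "error-events" ]
        aws:
          sts_role_arn: "arn:aws:iam::account-id:role/pipeline-role"
          region: "us-east-1"
    - opensearch:
        hosts: [ "https://search-main-domain.us-east-1.es.amazonaws.com" ]
        index: "all-events"
        routes: [ "other-events" ]
        aws:
          sts_role_arn: "arn:aws:iam::account-id:role/pipeline-role"
          region: "us-east-1"
```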

You can also migrate data from Amazon Kinesis to an OpenSearch Serverless VPC collection. To create the pipeline, use the AWS-KinesisDataStreamsPipeline blueprint, available on the OpenSearch Ingestion console:

version: "2"
kinesis_data_streams_pipeline:
  source:
    kinesis_data_streams:
      acknowledgments: true
      codec:
        newline:
      streams:
        - stream_name: "<stream name>"
        - stream_name: "<stream name>"
      aws:
        sts_role_arn: "arn:aws:iam::account-id:role/pipeline-role"
        region: "us-east-1"
  sink:
    - opensearch:
        hosts: [ "https://search-mydomain.us-east-1.es.amazonaws.com" ]
        index: "index_${getMetadata(\"stream_name\")}"
        document_id: "${getMetadata(\"partition_key\")}"
        aws:
          sts_role_arn: "arn:aws:iam::account-id:role/pipeline-role"
          region: "us-east-1"
        dlq:
          s3:
            bucket: "amzn-s3-demo-bucket"
            region: "us-east-1"
            sts_role_arn: "arn:aws:iam::account-id:role/pipeline-role"

You can use a preconfigured blueprint to create this pipeline. For more information, see Using blueprints to create a pipeline. You can also review the open source OpenSearch documentation for additional configuration options. To learn more, see configuration options.

Data consistency

OpenSearch Ingestion supports end-to-end acknowledgement to ensure data durability. When the pipeline reads stream records from Kinesis, it dynamically distributes the work of reading stream records based on the shards associated with the stream. The pipeline automatically checkpoints a stream when it receives an acknowledgement after ingesting all of its records into the OpenSearch domain or collection, which avoids duplicate processing of stream records.

Note

If you want to create the index based on the stream name, you can define the index in the opensearch sink section as "index_${getMetadata(\"stream_name\")}".

(Optional) Configure recommended compute units (OCUs) for the Kinesis Data Streams pipeline

A minimum of 2 compute units (OCUs) is recommended when creating a Kinesis source pipeline. This allows the per-shard processing of Kinesis data stream records to be distributed evenly among the compute units, ensuring a low-latency mechanism for stream record ingestion.

A Kinesis Data Streams source pipeline can also be configured to ingest stream records from more than one stream. In that case, we recommend adding one additional compute unit per additional stream.
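This sizing guidance can be expressed as a small helper. The formula below (a minimum of 2 OCUs, plus one per stream beyond the first) is a rule of thumb derived from this section, not an official AWS formula.

```python
# Rough sizing helper based on this section's guidance: start with a
# minimum of 2 OCUs and add one OCU for each stream beyond the first.
# This is an illustrative rule of thumb, not an official AWS formula.

def recommended_ocus(num_streams: int) -> int:
    if num_streams < 1:
        raise ValueError("pipeline needs at least one stream")
    return 2 + (num_streams - 1)

print(recommended_ocus(1))  # 2
print(recommended_ocus(3))  # 4
```

Remember that provisioning more OCUs than there are shards leaves some compute units idle, as the note below explains.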

Note

If your pipeline has more compute units (OCUs) than there are shards across the streams configured in the pipeline, some compute units could sit idle without processing any stream records.