Using an OpenSearch Ingestion pipeline with Amazon Managed Streaming for Apache Kafka - Amazon OpenSearch Service

Using an OpenSearch Ingestion pipeline with Amazon Managed Streaming for Apache Kafka

You can use the Kafka plugin to ingest data from Amazon Managed Streaming for Apache Kafka (Amazon MSK) into your OpenSearch Ingestion pipeline. With Amazon MSK, you can build and run applications that use Apache Kafka to process streaming data. OpenSearch Ingestion uses AWS PrivateLink to connect to Amazon MSK. You can ingest data from both Amazon MSK and Amazon MSK Serverless clusters. The only difference between the two processes is the prerequisite steps you must take before you set up your pipeline.

Amazon MSK prerequisites

Before you create your OpenSearch Ingestion pipeline, perform the following steps:

  1. Create an Amazon MSK provisioned cluster by following the steps in Creating a cluster in the Amazon Managed Streaming for Apache Kafka Developer Guide. For Broker type, choose any option except for t3 types, as these aren't supported by OpenSearch Ingestion.

  2. After the cluster has an Active status, follow the steps in Turn on multi-VPC connectivity.

  3. Follow the steps in Attach a cluster policy to the MSK cluster to attach one of the following policies, depending on whether your cluster and pipeline are in the same AWS account. The policy allows OpenSearch Ingestion to create an AWS PrivateLink connection to your Amazon MSK cluster and read data from Kafka topics. Make sure that you update the Resource elements with your own ARN.

    The following policy applies when your cluster and pipeline are in the same AWS account:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "osis.amazonaws.com"
          },
          "Action": [
            "kafka:CreateVpcConnection",
            "kafka:DescribeClusterV2"
          ],
          "Resource": "arn:aws:kafka:us-east-1:{account-id}:cluster/cluster-name/cluster-id"
        },
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "osis-pipelines.amazonaws.com"
          },
          "Action": [
            "kafka:CreateVpcConnection",
            "kafka:GetBootstrapBrokers",
            "kafka:DescribeClusterV2"
          ],
          "Resource": "arn:aws:kafka:us-east-1:{account-id}:cluster/cluster-name/cluster-id"
        }
      ]
    }

    If your Amazon MSK cluster is in a different AWS account than your pipeline, attach the following policy instead. Note that cross-account access is only possible with provisioned Amazon MSK clusters, not with Amazon MSK Serverless clusters. The ARN for the AWS principal should be the ARN of the same pipeline role that you provide in your pipeline YAML configuration:

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "osis.amazonaws.com"
          },
          "Action": [
            "kafka:CreateVpcConnection",
            "kafka:DescribeClusterV2"
          ],
          "Resource": "arn:aws:kafka:us-east-1:{msk-account-id}:cluster/cluster-name/cluster-id"
        },
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "osis-pipelines.amazonaws.com"
          },
          "Action": [
            "kafka:CreateVpcConnection",
            "kafka:GetBootstrapBrokers",
            "kafka:DescribeClusterV2"
          ],
          "Resource": "arn:aws:kafka:us-east-1:{msk-account-id}:cluster/cluster-name/cluster-id"
        },
        {
          "Effect": "Allow",
          "Principal": {
            "AWS": "arn:aws:iam::{pipeline-account-id}:role/pipeline-role"
          },
          "Action": [
            "kafka-cluster:*",
            "kafka:*"
          ],
          "Resource": [
            "arn:aws:kafka:us-east-1:{msk-account-id}:cluster/cluster-name/cluster-id",
            "arn:aws:kafka:us-east-1:{msk-account-id}:topic/cluster-name/cluster-id/*",
            "arn:aws:kafka:us-east-1:{msk-account-id}:group/cluster-name/*"
          ]
        }
      ]
    }
  4. Create a Kafka topic by following the steps in Create a topic. Make sure that BootstrapServerString is one of the private endpoint (single-VPC) bootstrap URLs. The value for --replication-factor should be 2 or 3, based on the number of zones your Amazon MSK cluster has. The value for --partitions should be at least 10.

  5. Produce and consume data by following the steps in Produce and consume data. Again, make sure that BootstrapServerString is one of your private endpoint (single-VPC) bootstrap URLs.
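Steps 4 and 5 can be sketched with the standard Apache Kafka CLI tools. The bootstrap URL, topic name, and client.properties path below are placeholders, and client.properties must match your cluster's authentication settings; substitute your own values before running:

```shell
# Placeholder: one of your private endpoint (single-VPC) bootstrap URLs.
BOOTSTRAP_SERVER_STRING="b-1.cluster-name.xxxxxx.c1.kafka.us-east-1.amazonaws.com:9098"

# Step 4: create a topic with 3 replicas (for a 3-AZ cluster) and 10 partitions.
bin/kafka-topics.sh --create \
  --bootstrap-server "$BOOTSTRAP_SERVER_STRING" \
  --command-config client.properties \
  --replication-factor 3 \
  --partitions 10 \
  --topic topic-name

# Step 5: produce a test record, then read it back.
echo "hello from msk" | bin/kafka-console-producer.sh \
  --bootstrap-server "$BOOTSTRAP_SERVER_STRING" \
  --producer.config client.properties \
  --topic topic-name

bin/kafka-console-consumer.sh \
  --bootstrap-server "$BOOTSTRAP_SERVER_STRING" \
  --consumer.config client.properties \
  --topic topic-name --from-beginning
```

These commands require a running, reachable cluster, so they can't be verified offline; treat them as a starting point rather than a copy-paste recipe.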

Amazon MSK Serverless prerequisites

Before you create your OpenSearch Ingestion pipeline, perform the following steps:

  1. Create an Amazon MSK Serverless cluster by following the steps in Create an MSK Serverless cluster in the Amazon Managed Streaming for Apache Kafka Developer Guide.

  2. After the cluster has an Active status, follow the steps in Attach a cluster policy to the MSK cluster to attach the following policy. Make sure that you update the resource with your own ARN.

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "osis.amazonaws.com"
          },
          "Action": [
            "kafka:CreateVpcConnection",
            "kafka:DescribeClusterV2"
          ],
          "Resource": "arn:aws:kafka:us-east-1:{account-id}:cluster/cluster-name/cluster-id"
        },
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "osis-pipelines.amazonaws.com"
          },
          "Action": [
            "kafka:CreateVpcConnection",
            "kafka:GetBootstrapBrokers",
            "kafka:DescribeClusterV2"
          ],
          "Resource": "arn:aws:kafka:us-east-1:{account-id}:cluster/cluster-name/cluster-id"
        }
      ]
    }

    This policy allows OpenSearch Ingestion to create an AWS PrivateLink connection to your Amazon MSK Serverless cluster and read data from Kafka topics. The cluster and pipeline must be in the same AWS account because Amazon MSK Serverless doesn't support cross-account access.

  3. Create a Kafka topic by following the steps in Create a topic. Make sure that BootstrapServerString is one of your Simple Authentication and Security Layer (SASL) IAM bootstrap URLs. The value for --replication-factor should be 2 or 3, based on the number of zones your Amazon MSK Serverless cluster has. The value for --partitions should be at least 10.

  4. Produce and consume data by following the steps in Produce and consume data. Again, make sure that BootstrapServerString is one of your Simple Authentication and Security Layer (SASL) IAM bootstrap URLs.
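Because MSK Serverless clusters only accept IAM-authenticated clients, the Kafka CLI tools in steps 3 and 4 need a client.properties file along these lines (this assumes the aws-msk-iam-auth library is on the client's classpath, as described in the Amazon MSK Developer Guide):

```
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
```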

Step 1: Configure the pipeline role

After you have your Amazon MSK provisioned or serverless cluster set up, add the following Kafka permissions to the pipeline role that you want to use in your pipeline configuration:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:Connect",
        "kafka-cluster:AlterCluster",
        "kafka-cluster:DescribeCluster",
        "kafka:DescribeClusterV2",
        "kafka:GetBootstrapBrokers"
      ],
      "Resource": [
        "arn:aws:kafka:us-east-1:{account-id}:cluster/cluster-name/cluster-id"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:*Topic*",
        "kafka-cluster:ReadData"
      ],
      "Resource": [
        "arn:aws:kafka:us-east-1:{account-id}:topic/cluster-name/cluster-id/topic-name"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "kafka-cluster:AlterGroup",
        "kafka-cluster:DescribeGroup"
      ],
      "Resource": [
        "arn:aws:kafka:us-east-1:{account-id}:group/cluster-name/*"
      ]
    }
  ]
}
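One way to attach these permissions is with an inline policy via the AWS CLI. The role name, policy name, and file path below are placeholders; save the policy document above to the referenced file first:

```shell
# Attach the Kafka permissions above as an inline policy on the
# pipeline role. "pipeline-role" and msk-policy.json are placeholders.
aws iam put-role-policy \
  --role-name pipeline-role \
  --policy-name osis-kafka-access \
  --policy-document file://msk-policy.json
```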

Step 2: Create the pipeline

You can then configure an OpenSearch Ingestion pipeline like the following, which specifies Kafka as the source:

version: "2"
log-pipeline:
  source:
    kafka:
      acknowledgments: true
      topics:
        - name: "topic-name"
          group_id: "group-id"
      aws:
        msk:
          arn: "arn:aws:kafka:{region}:{account-id}:cluster/cluster-name/cluster-id"
        region: "us-west-2"
        sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role"
  processor:
    - grok:
        match:
          message:
            - "%{COMMONAPACHELOG}"
    - date:
        destination: "@timestamp"
        from_time_received: true
  sink:
    - opensearch:
        hosts: ["https://search-domain-endpoint.us-east-1.es.amazonaws.com"]
        index: "index_name"
        aws_sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role"
        aws_region: "us-east-1"
        aws_sigv4: true

You can use a preconfigured Amazon MSK blueprint to create this pipeline. For more information, see Using blueprints to create a pipeline.
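If you prefer the AWS CLI to the console, a pipeline with this configuration can be created roughly as follows. The pipeline name, capacity values, and YAML file path are placeholders:

```shell
# Create the pipeline from a local copy of the YAML configuration.
# Name, unit counts, and file path are placeholder values.
aws osis create-pipeline \
  --pipeline-name log-pipeline \
  --min-units 1 \
  --max-units 4 \
  --pipeline-configuration-body file://pipeline.yaml
```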

Step 3: (Optional) Use the AWS Glue Schema Registry

When you use OpenSearch Ingestion with Amazon MSK, you can use the AVRO data format for schemas hosted in the AWS Glue Schema Registry. With the AWS Glue Schema Registry, you can centrally discover, control, and evolve data stream schemas.

To use this option, enable the schema type in your pipeline configuration:

schema:
  type: "aws_glue"

You must also provide AWS Glue with read access permissions in your pipeline role. You can use the AWS managed policy called AWSGlueSchemaRegistryReadonlyAccess. Additionally, your registry must be in the same AWS account and Region as your OpenSearch Ingestion pipeline.
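In the pipeline configuration, the schema block sits under the kafka source, as a sibling of topics and aws. A fragment of the earlier example with the block in place (topic and group names remain placeholders) might look like this:

```
log-pipeline:
  source:
    kafka:
      topics:
        - name: "topic-name"
          group_id: "group-id"
      schema:
        type: "aws_glue"
```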

Step 4: (Optional) Configure recommended compute units (OCUs) for the Amazon MSK pipeline

Each compute unit has one consumer per topic. Brokers balance partitions among these consumers for a given topic. However, when the number of partitions is greater than the number of consumers, Amazon MSK assigns multiple partitions to each consumer. OpenSearch Ingestion has built-in auto scaling to scale up or down based on CPU usage or the number of pending records in the pipeline.

For optimal performance, distribute your partitions across many compute units for parallel processing. If a topic has a large number of partitions (for example, more than 96, which is the maximum number of OCUs per pipeline), we recommend configuring the pipeline with 1–96 OCUs so that it automatically scales as needed. If a topic has fewer than 96 partitions, set the maximum number of compute units equal to the number of partitions.

When a pipeline has more than one topic, choose the topic with the highest number of partitions as a reference to configure maximum compute units. By adding another pipeline with a new set of OCUs to the same topic and consumer group, you can scale the throughput almost linearly.
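You set this capacity through the pipeline's minimum and maximum OCU values, for example with the update-pipeline command. The pipeline name and unit counts below are placeholders; choose the maximum based on the partition guidance above:

```shell
# Example: cap the pipeline at 24 OCUs for a reference topic with
# 24 partitions. Name and unit counts are placeholder values.
aws osis update-pipeline \
  --pipeline-name log-pipeline \
  --min-units 1 \
  --max-units 24
```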