Using an OpenSearch Ingestion pipeline with Amazon DynamoDB
You can use an OpenSearch Ingestion pipeline with DynamoDB to stream DynamoDB table events (such as create, update, and delete) to Amazon OpenSearch Service domains and collections. The OpenSearch Ingestion pipeline incorporates change data capture (CDC) infrastructure to provide a high-scale, low-latency way to continuously stream data from a DynamoDB table.
There are two ways that you can use DynamoDB as a source to process data—with and without a full initial snapshot.
A full initial snapshot is a backup of a table that DynamoDB takes with the point-in-time recovery (PITR) feature. DynamoDB uploads this snapshot to Amazon S3. From there, an OpenSearch Ingestion pipeline sends it to one index in a domain, or partitions it to multiple indexes in a domain. To keep the data in DynamoDB and OpenSearch consistent, the pipeline syncs all of the create, update, and delete events in the DynamoDB table with the documents saved in the OpenSearch index or indexes.
When you use a full initial snapshot, your OpenSearch Ingestion pipeline first ingests the snapshot and then starts reading data from DynamoDB Streams. It eventually catches up and maintains near real-time data consistency between DynamoDB and OpenSearch. When you choose this option, you must enable both PITR and a DynamoDB stream on your table.
You can also use the OpenSearch Ingestion integration with DynamoDB to stream events without a snapshot. Choose this option if you already have a full snapshot from some other mechanism, or if you just want to stream current events from a DynamoDB table with DynamoDB Streams. When you choose this option, you only need to enable a DynamoDB stream on your table.
For more information about this integration, see DynamoDB zero-ETL integration with Amazon OpenSearch Service in the Amazon DynamoDB Developer Guide.
Topics
Prerequisites
To set up your pipeline, you must have a DynamoDB table with DynamoDB Streams enabled. Your stream
should use the NEW_IMAGE
stream view type. However, OpenSearch Ingestion
pipelines can also stream events with NEW_AND_OLD_IMAGES
if this stream
view type fits your use case.
If you're using snapshots, you must also enable point-in-time recovery on your table. For more information, see Creating a table, Enabling point-in-time recovery, and Enabling a stream in the Amazon DynamoDB Developer Guide.
Step 1: Configure the pipeline role
After you have your DynamoDB table set up, set up the pipeline role that you want to use in your pipeline configuration, and add the following DynamoDB permissions in the role:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "allowRunExportJob", "Effect": "Allow", "Action": [ "dynamodb:DescribeTable", "dynamodb:DescribeContinuousBackups", "dynamodb:ExportTableToPointInTime" ], "Resource": [ "arn:aws:dynamodb:
us-east-1
:{account-id}
:table/my-table
" ] }, { "Sid": "allowCheckExportjob", "Effect": "Allow", "Action": [ "dynamodb:DescribeExport" ], "Resource": [ "arn:aws:dynamodb:us-east-1
:{account-id}
:table/my-table
/export/*" ] }, { "Sid": "allowReadFromStream", "Effect": "Allow", "Action": [ "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator" ], "Resource": [ "arn:aws:dynamodb:us-east-1
:{account-id}
:table/my-table
/stream/*" ] }, { "Sid": "allowReadAndWriteToS3ForExport", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:AbortMultipartUpload", "s3:PutObject", "s3:PutObjectAcl" ], "Resource": [ "arn:aws:s3:::my-bucket
/{exportPath}
/*" ] } ] }
You can also use an AWS KMS customer managed key to encrypt the export data files. To decrypt the
exported objects, specify s3_sse_kms_key_id
for the key ID in the export
configuration of the pipeline with the following format:
arn:aws:kms:
.
The following policy includes the required permissions for using a customer managed key:us-west-2
:{account-id}
:key/my-key-id
{ "Sid": "allowUseOfCustomManagedKey", "Effect": "Allow", "Action": [ "kms:GenerateDataKey", "kms:Decrypt" ], "Resource":
arn:aws:kms:
}us-west-2
:{account-id}
:key/my-key-id
Step 2: Create the pipeline
You can then configure an OpenSearch Ingestion pipeline like the following, which specifies
DynamoDB as the source. This sample pipeline ingests data from table-a
with
the PITR snapshot, followed by events from DynamoDB Streams. A start position of LATEST
indicates that the pipeline should read the latest data from DynamoDB Streams.
version: "2" cdc-pipeline: source: dynamodb: tables: - table_arn: "arn:aws:dynamodb:
us-west-2
:{account-id}
:table/table-a
" export: s3_bucket: "my-bucket
" s3_prefix: "export/" stream: start_position: "LATEST" aws: region: "us-west-2" sts_role_arn: "arn:aws:iam::{account-id}
:role/pipeline-role" sink: - opensearch: hosts: ["https://search-mydomain.us-east-1.es.amazonaws.com
"] index: "${getMetadata(\"table_name
\")}" index_type: custom normalize_index: true document_id: "${getMetadata(\"primary_key\")}" action: "${getMetadata(\"opensearch_action\")}" document_version: "${getMetadata(\"document_version\")}" document_version_type: "external"
You can use a preconfigured DynamoDB blueprint to create this pipeline. For more information, see Using blueprints to create a pipeline.
Data consistency
OpenSearch Ingestion supports end-to-end acknowledgement to ensure data durability. When a pipeline reads snapshots or streams, it dynamically creates partitions for parallel processing. The pipeline marks a partition as complete when it receives an acknowledgement after ingesting all records in the OpenSearch domain or collection.
If you want to ingest into an OpenSearch Serverless search collection, you can generate a document ID in the pipeline. If you want to ingest into an OpenSearch Serverless time series collection, note that the pipeline doesn't generate a document ID.
An OpenSearch Ingestion pipeline also maps incoming event actions into corresponding bulk indexing actions to help ingest documents. This keeps data consistent, so that every data change in DynamoDB is reconciled with the corresponding document changes in OpenSearch.
Mapping data types
OpenSearch Service dynamically maps data types in each incoming document to the corresponding data type in DynamoDB. The following table shows how OpenSearch Service automatically maps various data types.
Data type | OpenSearch | DynamoDB |
---|---|---|
Number |
OpenSearch automatically maps numeric data. If the number is a whole number, OpenSearch maps it as a long value. If the number is fractional, then OpenSearch maps it as a float value. OpenSearch dynamically maps various attributes based on the first sent document. If you have a mix of data types for the same attribute in DynamoDB, such as both a whole number and a fractional number, mapping might fail. For example, if your first document has an attribute that is a whole number, and a later document has that same attribute as a fractional number, OpenSearch fails to ingest the second document. In these cases, you should provide an explicit mapping template, such as the following:
If you need double precision, use string-type field mapping. There is no equivalent numeric type that supports 38 digits of precision in OpenSearch. |
DynamoDB supports numbers. |
Number set | OpenSearch automatically maps a number set into an array of either long values or float values. As with the scalar numbers, this depends on whether the first number ingested is a whole number or a fractional number. You can provide mappings for number sets the same way that you map scalar strings. |
DynamoDB supports types that represent sets of numbers. |
String |
OpenSearch automatically maps string values as text. In some situations, such as enumerated values, you can map to the keyword type. The following example shows how to map a DynamoDB attribute named
|
DynamoDB supports strings. |
String set |
OpenSearch automatically maps a string set into an array of strings. You can provide mappings for string sets the same way that you map scalar strings. |
DynamoDB supports types that represent sets of strings. |
Binary |
OpenSearch automatically maps binary data as text. You can provide a mapping to write these as binary fields in OpenSearch. The following example shows how to map a DynamoDB attribute named
|
DynamoDB supports binary type attributes. |
Binary set |
OpenSearch automatically maps a binary set into an array of binary data as text. You can provide mappings for number sets the same way that you map scalar binary. |
DynamoDB supports types that represent sets of binary values. |
Boolean |
OpenSearch maps a DynamoDB Boolean type into an OpenSearch Boolean type. |
DynamoDB supports Boolean type attributes. |
Null |
OpenSearch can ingest documents with the DynamoDB null type. It saves the value as a null value in the document. There is no mapping for this type, and this field is not indexed or searchable. If the same attribute name is used for a null type and then later changes to different type such as string, OpenSearch creates a dynamic mapping for the first non-null value. Subsequent values can still be DynamoDB null values. |
DynamoDB supports null type attributes. |
Map |
OpenSearch maps DynamoDB map attributes to nested fields. The same mappings apply within a nested field. The following example maps a string in a nested field to a keyword type in OpenSearch:
|
DynamoDB supports map type attributes. |
List |
OpenSearch provides different results for DynamoDB lists, depending on what is in the list. When a list contains all of the same type of scalar types (for example, a list of all strings), then OpenSearch ingests the list as an array of that type. This works for string, number, Boolean, and null types. The restrictions for each of these types are the same as restrictions for a scalar of that type. You can also provide mappings for lists of maps by using the same mapping as you would use for a map. You can't provide a list of mixed types. |
DynamoDB supports list type attributes. |
Set |
OpenSearch provides different results for DynamoDB sets depending on what is in the set. When a set contains all of the same type of scalar types (for example, a set of all strings), then OpenSearch ingests the set as an array of that type. This works for string, number, Boolean, and null types. The restrictions for each of these types are the same as the restrictions for a scalar of that type. You can also provide mappings for sets of maps by using the same mapping as you would use for a map. You can't provide a set of mixed types. |
DynamoDB supports types that represent sets. |
We recommend that you configure the dead-letter queue (DLQ) in your OpenSearch Ingestion pipeline. If you've configured the queue, OpenSearch Service sends all failed documents that can't be ingested due to dynamic mapping failures to the queue.
In case automatic mappings fail, you can use template_type
and
template_content
in your pipeline configuration to define explicit
mapping rules. Alternatively, you can create mapping templates directly in your search
domain or collection before you start the pipeline.
Limitations
Consider the following limitations when you set up an OpenSearch Ingestion pipeline for DynamoDB:
-
The OpenSearch Ingestion integration with DynamoDB currently doesn't support cross-Region ingestion. Your DynamoDB table and OpenSearch Ingestion pipeline must be in the same AWS Region.
-
Your DynamoDB table and OpenSearch Ingestion pipeline must be in the same AWS account.
-
An OpenSearch Ingestion pipeline supports only one DynamoDB table as its source.
-
DynamoDB Streams only stores data in a log for up to 24 hours. If ingestion from an initial snapshot of a large table takes 24 hours or more, there will be some initial data loss. To mitigate this data loss, estimate the size of the table and configure appropriate compute units of OpenSearch Ingestion pipelines.
Recommended CloudWatch Alarms for DynamoDB
The following CloudWatch metrics are recommended for monitioring the performance of your ingestion pipeline. These metrics can help you identify the amount of data processed from exports, the amount of events processed from streams, the errors in processing exports and stream events, and the number of documents written to the destination. You can setup CloudWatch alarms to perform an action when one of these metrics exceed a specified value for a specified amount of time.
Metric | Description |
---|---|
dynamodb-pipeline.BlockingBuffer.bufferUsage.value |
Indicates how much of the buffer is being utilized. |
dynamodb-pipeline.dynamodb.activeExportS3ObjectConsumers.value
|
Shows the total number of OCUs that are actively processing Amazon S3 objects for the export. |
dynamodb-pipeline.dynamodb.bytesProcessed.count
|
Count of bytes processed from DynamoDB source. |
dynamodb-pipeline.dynamodb.changeEventsProcessed.count
|
Number of change events processed from DynamoDB stream. |
dynamodb-pipeline.dynamodb.changeEventsProcessingErrors.count
|
Number of errors from change events processed from DynamoDB. |
dynamodb-pipeline.dynamodb.exportJobFailure.count
|
Number of export job submission attempts that have failed. |
dynamodb-pipeline.dynamodb.exportJobSuccess.count
|
Number of export jobs that have been submitted successfully. |
dynamodb-pipeline.dynamodb.exportRecordsProcessed.count
|
Total number of records processed from the export. |
dynamodb-pipeline.dynamodb.exportRecordsTotal.count
|
Total number of records exported from DynamoDB, essential for tracking data export volumes. |
dynamodb-pipeline.dynamodb.exportS3ObjectsProcessed.count
|
Total number of export data files that have been processed successfully from Amazon S3. |
dynamodb-pipeline.opensearch.bulkBadRequestErrors.count
|
Count of errors during bulk requests due to malformed request. |
dynamodb-pipeline.opensearch.bulkRequestLatency.avg
|
Average latency for bulk write requests made to OpenSearch. |
dynamodb-pipeline.opensearch.bulkRequestNotFoundErrors.count
|
Number of bulk requests that failed because the target data could not be found. |
dynamodb-pipeline.opensearch.bulkRequestNumberOfRetries.count
|
Number of retries by OpenSearch Ingestion pipelines to write OpenSearch cluster. |
dynamodb-pipeline.opensearch.bulkRequestSizeBytes.sum
|
Total size in bytes of all bulk requests made to OpenSearch. |
dynamodb-pipeline.opensearch.documentErrors.count
|
Number of errors when sending documents to OpenSearch. The documents causing the errors witll be sent to DLQ. |
dynamodb-pipeline.opensearch.documentsSuccess.count
|
Number of documents successfully written to an OpenSearch cluster or collection. |
dynamodb-pipeline.opensearch.documentsSuccessFirstAttempt.count
|
Number of documents successfully indexed in OpenSearch on the first attempt. |
|
Count of errors due to version conflicts in documents during processing. |
|
Average latency of OpenSearch Ingestion pipeline to process the data by reading from the source to writint to the destination. |
dynamodb-pipeline.opensearch.PipelineLatency.max
|
Maximum latency of OpenSearch Ingestion pipeline to process the data by reading from the source to writing the destination. |
dynamodb-pipeline.opensearch.recordsIn.count
|
Count of records successfully ingested into OpenSearch. This metric is essential for tracking the volume of data being processed and stored. |
dynamodb-pipeline.opensearch.s3.dlqS3RecordsFailed.count
|
Number of records that failed to write to DLQ. |
dynamodb-pipeline.opensearch.s3.dlqS3RecordsSuccess.count
|
Number of records that are written to DLQ. |
dynamodb-pipeline.opensearch.s3.dlqS3RequestLatency.count
|
Count of latency measurements for requests to the Amazon S3 dead-letter queue. |
dynamodb-pipeline.opensearch.s3.dlqS3RequestLatency.sum
|
Total latency for all requests to the Amazon S3 dead-letter queue |
dynamodb-pipeline.opensearch.s3.dlqS3RequestSizeBytes.sum
|
Total size in bytes of all requests made to the Amazon S3 dead-letter queue. |
dynamodb-pipeline.recordsProcessed.count
|
Total number of records processed in the pipeline, a key metric for overal throughput. |
dynamodb.changeEventsProcessed.count
|
No records are being gathered from DynamoDB streams. This could be due to no activitiy on the table, an export being in progress, or an issue accessing the DynamoDB streams. |
|
The attempt to trigger an export to S3 failed. |
|
Count of bulk request errors in OpenSearch due to invalid input, crucial for monitoring data quality and operational issues. |
opensearch.EndToEndLatency.avg
|
The end to end latnecy is higher than desired for reading from DynamoDB streams. This could be due to an underscaled OpenSearch cluster or a maximum pipeline OCU capacity that is too low for the WCU throughput on the DynamoDB table. This end to end latency will be high after an export and should decrease over time as it catches up to the latest DynamoDB streams. |