Getting started with CDC streams
Important
This feature is provided as an AWS Preview and is subject to change. For more
information, see section 2, Betas and Previews, in the AWS Service Terms.
Before general availability, we will add new operation types ("op": "u" for
updates) to your stream payload. To ensure your application handles these changes without
modification, treat any unrecognized op value as an upsert by applying the
after payload. See Understanding CDC records for details.
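The forward-compatible handling described above can be sketched as a small dispatch. This is a minimal sketch, not part of the service: the decoded payload shape follows the sample record later in this guide, and the single-column `id` primary key is an assumption for illustration.

```python
def apply_change(payload: dict, table_state: dict) -> None:
    """Apply one decoded CDC record to an in-memory table keyed by primary key.

    Treats any op value other than "d" (delete) as an upsert of the "after"
    image, so a new op value such as "u" needs no code changes.
    Assumes a single-column "id" primary key (illustration only).
    """
    if payload.get("op") == "d":
        # Delete records carry only the primary key columns in "before".
        key = payload["before"]["id"]
        table_state.pop(key, None)
    else:
        # "c", "u", and any unrecognized op: apply the full "after" image.
        row = payload["after"]
        table_state[row["id"]] = row

state: dict = {}
apply_change({"op": "c", "before": None, "after": {"id": 1, "message": "hello"}}, state)
apply_change({"op": "x", "before": None, "after": {"id": 1, "message": "updated"}}, state)
apply_change({"op": "d", "before": {"id": 1}, "after": None}, state)
```

Because unrecognized op values fall through to the upsert branch, the consumer keeps working when new operation types are introduced.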
This guide walks you through every step required to start streaming committed row-level changes from an Aurora DSQL cluster to an Amazon Kinesis data stream. By the end of this guide, you'll have a working CDC pipeline and a Python script that reads and prints change records.
Prerequisites
Before you begin, confirm the following:
- You've created an Aurora DSQL cluster in ACTIVE status. If your cluster is idle, connect to it with any PostgreSQL-compatible client to wake it up before you create a CDC stream. CreateStream returns a validation error if the cluster isn't in ACTIVE status.
- All CDC resources—the cluster, Amazon Kinesis data stream, IAM service role, and calling principal—are in the same AWS account. Aurora DSQL requires this.
- Your Amazon Kinesis data stream is in the same AWS Region as your Aurora DSQL cluster.
- You've installed and configured the AWS CLI with credentials that have permission to create IAM roles and Amazon Kinesis data streams.
Step 1: Create an Amazon Kinesis data stream
Create a Kinesis data stream in the same AWS account and Region as your Aurora DSQL cluster. CDC records are larger than the corresponding Aurora DSQL row because the JSON format includes column names, metadata, and encoding overhead.
Sizing the Kinesis data stream
Aurora DSQL CDC delivers the full row on every change. An update that touches a single column produces a record that contains every column in the row. Delete records are the exception—they include only the primary key columns.
Estimate average record size
Measure the average on-disk row size to understand the volume that CDC will produce and to anticipate oversized records. The following query returns the average tuple size in bytes for a table:
SELECT avg(pg_column_size(t.*)) FROM your_table t;
The CDC record envelope adds column names, metadata, and encoding overhead on top of the row size. For the exact record format, see Record payload. For how Aurora DSQL handles records that exceed the Kinesis record size limit, see Handling oversized records. For the full set of Kinesis service limits, see Amazon Kinesis Data Streams quotas and limits in the Amazon Kinesis Data Streams Developer Guide.
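To turn the measured row size into a capacity estimate, you can sketch a rough per-record figure. The overhead constants below are illustrative assumptions, not published numbers—calibrate against real records from your stream:

```python
def estimate_cdc_record_bytes(avg_row_bytes: int, column_names: list[str]) -> int:
    """Back-of-the-envelope estimate of one JSON CDC record's size.

    Adds the repeated JSON keys (column names plus quotes and punctuation)
    and a fixed allowance for the record envelope ("source" metadata,
    timestamps, op/type fields). The constants are assumptions for this
    sketch; measure real records to calibrate.
    """
    ENVELOPE_OVERHEAD = 300   # assumed bytes for source metadata and framing
    PER_COLUMN_OVERHEAD = 6   # quotes, colon, comma around each key/value
    key_bytes = sum(len(name) + PER_COLUMN_OVERHEAD for name in column_names)
    return avg_row_bytes + key_bytes + ENVELOPE_OVERHEAD

# Example: a 120-byte average row with two columns.
print(estimate_cdc_record_bytes(120, ["id", "message"]))  # → 441
```

Multiplying the estimate by your expected change rate gives an approximate bytes-per-second figure to compare against Kinesis throughput limits.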
Important
When you create the Kinesis data stream, set the following:
- MaxRecordSizeInKiB to 10240 (10 MiB). The default Kinesis maximum of 1 MiB isn't always large enough for Aurora DSQL CDC records. Any record that exceeds the configured Kinesis record size causes the CDC stream to become impaired with KINESIS_OVERSIZE_RECORD. Aurora DSQL splits oversized records into fragments that can approach 10 MiB each, so the Kinesis data stream needs to accept records of that size. For details, see Handling oversized records.
- StreamMode to ON_DEMAND. On-demand mode scales shard capacity automatically and protects you from under-provisioning during unexpected spikes. Kinesis can still return WriteProvisionedThroughputExceeded during sharp seconds-scale bursts as capacity scales up. Plan for brief throttling events.
Create CloudWatch alarms on IncomingBytes and
WriteProvisionedThroughputExceeded in the AWS/Kinesis namespace.
Kinesis throttling slows CDC delivery and increases replication lag. For Aurora DSQL-side
metrics and alarm guidance, see Monitoring best practices.
The following example uses the AWS CLI. If your AWS CLI version doesn't support the
--max-record-size-in-ki-b parameter, use an AWS SDK to call the Kinesis
CreateStream
operation.
aws kinesis create-stream \
    --stream-name my-cdc-stream \
    --stream-mode-details StreamMode=ON_DEMAND \
    --max-record-size-in-ki-b 10240 \
    --region region
Wait for the stream to become active:
aws kinesis describe-stream-summary \
    --stream-name my-cdc-stream \
    --region region \
    --query 'StreamDescriptionSummary.StreamStatus'
The command returns "ACTIVE" when the stream is ready.
Record the stream ARN from the output. You need it in the following steps. The ARN
has the format
arn:aws:kinesis:region:account-id:stream/my-cdc-stream
Step 2: Create an IAM role for Aurora DSQL
Aurora DSQL assumes an IAM role to write CDC records to your Kinesis data stream. In this step, you create the role with a trust policy and attach a permissions policy. For a full explanation of each policy element, see Configuring IAM.
Create the trust policy file
Save the following JSON as trust-policy.json. Replace
your-account-id, region, and
cluster-id with your values.
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DSQLAccess",
      "Effect": "Allow",
      "Principal": {
        "Service": "dsql.amazonaws.com"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "aws:SourceAccount": "your-account-id"
        },
        "ArnLike": {
          "aws:SourceArn": "arn:aws:dsql:region:your-account-id:cluster/cluster-id/stream/*"
        }
      }
    }
  ]
}
Create the role
Run the following command to create the IAM role:
aws iam create-role \
    --role-name dsql-cdc-role \
    --assume-role-policy-document file://trust-policy.json
Create the permissions policy file
Save the following JSON as permissions-policy.json. Replace the
placeholder values with your Kinesis data stream ARN. The KMSAccess
statement is only required if your Kinesis data stream uses an AWS KMS customer managed
key, but you can include it preemptively so that adding a customer managed key later
doesn't break your CDC stream. For a full explanation of each condition, see
Service role permissions policy.
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "KinesisAccess",
      "Effect": "Allow",
      "Action": [
        "kinesis:PutRecord",
        "kinesis:PutRecords",
        "kinesis:DescribeStreamSummary",
        "kinesis:ListShards"
      ],
      "Resource": "arn:aws:kinesis:region:your-account-id:stream/my-cdc-stream"
    },
    {
      "Sid": "KMSAccess",
      "Effect": "Allow",
      "Action": [
        "kms:GenerateDataKey"
      ],
      "Resource": "arn:aws:kms:*:*:key/*",
      "Condition": {
        "StringEquals": {
          "kms:ViaService": "kinesis.region.amazonaws.com",
          "kms:EncryptionContext:aws:kinesis:arn": "arn:aws:kinesis:region:your-account-id:stream/my-cdc-stream",
          "aws:ResourceAccount": "${aws:PrincipalAccount}"
        }
      }
    }
  ]
}
Attach the permissions policy
Run the following command:
aws iam put-role-policy \
    --role-name dsql-cdc-role \
    --policy-name dsql-cdc-kinesis-access \
    --policy-document file://permissions-policy.json
Record the role ARN from the create-role output. The ARN has the format
arn:aws:iam::your-account-id:role/dsql-cdc-role
Step 3: Create the CDC stream
Use the AWS CLI to create a CDC stream that connects your Aurora DSQL cluster to the Kinesis data stream. Replace the placeholder values with the Kinesis stream ARN from Step 1, the IAM role ARN from Step 2, and your cluster identifier.
aws dsql create-stream \
    --cluster-identifier cluster-id \
    --target-definition '{"kinesis":{"streamArn":"kinesis-stream-arn","roleArn":"role-arn"}}' \
    --ordering UNORDERED \
    --format JSON \
    --tags '{"Name":"my-cdc-stream"}' \
    --region region
The response includes a stream identifier and a status of CREATING.
Stream creation typically takes one to three minutes.
Wait for the stream to become active
Poll the stream status until it reaches ACTIVE:
aws dsql get-stream \
    --cluster-identifier cluster-id \
    --stream-identifier stream-id \
    --region region \
    --query 'status'
You can also use the StreamActive waiter in the AWS SDKs to poll
automatically.
After the stream reaches ACTIVE, Aurora DSQL begins delivering committed
row-level changes to your Kinesis data stream.
Note
Each Aurora DSQL cluster has a maximum number of CDC streams. If you reach this limit,
CreateStream returns a ServiceQuotaExceededException. For the
default limit, see
Quotas and limits.
Step 4: Verify that records are flowing
Insert a row into a table on your Aurora DSQL cluster. For example:
CREATE TABLE IF NOT EXISTS test_cdc (
    id INT PRIMARY KEY,
    message TEXT
);

INSERT INTO test_cdc VALUES (1, 'hello cdc');
Read from the Kinesis data stream to verify that the CDC record arrived:
SHARD_ITERATOR=$(aws kinesis get-shard-iterator \
    --stream-name my-cdc-stream \
    --shard-id shardId-000000000000 \
    --shard-iterator-type TRIM_HORIZON \
    --region region \
    --query 'ShardIterator' --output text)

aws kinesis get-records \
    --shard-iterator "$SHARD_ITERATOR" \
    --region region
Each record's Data field contains a JSON payload. When you use the
AWS CLI, the payload is Base64-encoded in the response. When you use the boto3
SDK, the SDK decodes it automatically. The decoded JSON looks like the following:
{
  "type": "full",
  "op": "c",
  "before": null,
  "after": {"id": 1, "message": "hello cdc"},
  "source": {
    "version": "1.0",
    "ts_ms": 1705318200000,
    "ts_ns": 1705318200000000000,
    "txId": "ffthunp5stx6ffs2vyfqoatmfu",
    "schema": "public",
    "table": "test_cdc",
    "db": "postgres",
    "cluster": "cluster-id"
  },
  "ts_ms": 1705318200125,
  "ts_ns": 1705318200125483291
}
For a complete description of each field, see Understanding CDC records.
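When you read records with the AWS CLI rather than an SDK, you decode the Base64-encoded Data field yourself before parsing the JSON. A minimal sketch, using an abbreviated version of the sample record above (the encoding step simulates what the CLI returns):

```python
import base64
import json

def decode_cli_record(data_b64: str) -> dict:
    """Decode the Base64-encoded Data field from `aws kinesis get-records` output."""
    return json.loads(base64.b64decode(data_b64))

# Simulate a CLI response Data field from the sample record (abbreviated).
sample = {
    "type": "full",
    "op": "c",
    "after": {"id": 1, "message": "hello cdc"},
    "source": {"schema": "public", "table": "test_cdc",
               "ts_ns": 1705318200000000000, "txId": "ffthunp5stx6ffs2vyfqoatmfu"},
}
encoded = base64.b64encode(json.dumps(sample).encode()).decode()

payload = decode_cli_record(encoded)
print(payload["source"]["table"], payload["op"])  # → test_cdc c
```

SDK clients such as boto3 return the Data field as already-decoded bytes, so this step is only needed for raw CLI output.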
Step 5: Consume records with a Python script
The following Python script reads CDC records from a Kinesis data stream and prints each
change event. The script uses the boto3 Amazon Kinesis client to iterate over
shards and decode each record. Because Aurora DSQL CDC uses at-least-once delivery, the script
might print the same record more than one time.
"""
Read CDC records from an Amazon Kinesis data stream.

Usage:
    pip install boto3
    python consume_cdc.py --stream-name my-cdc-stream --region us-east-1
"""
from __future__ import annotations

import argparse
import json

import boto3


def consume_cdc(stream_name: str, region: str) -> None:
    kinesis = boto3.client("kinesis", region_name=region)

    # List all shards (paginate if the stream has many shards)
    shard_ids: list[str] = []
    paginator = kinesis.get_paginator("list_shards")
    for page in paginator.paginate(StreamName=stream_name):
        shard_ids.extend(s["ShardId"] for s in page["Shards"])

    print(f"Reading from {stream_name} ({len(shard_ids)} shard(s))")

    for shard_id in shard_ids:
        iterator_response = kinesis.get_shard_iterator(
            StreamName=stream_name,
            ShardId=shard_id,
            ShardIteratorType="TRIM_HORIZON",
        )
        shard_iterator = iterator_response["ShardIterator"]

        while shard_iterator:
            records_response = kinesis.get_records(
                ShardIterator=shard_iterator, Limit=100
            )
            shard_iterator = records_response.get("NextShardIterator")

            for record in records_response["Records"]:
                # boto3 decodes Base64 automatically; record["Data"] is bytes.
                payload = json.loads(record["Data"])

                # A record's "type" field identifies its structure.
                #   "full": inlined record with before/after values.
                #   "chunked": main record that references fragments for a split image.
                #   "fragment": one piece of a chunked image; reassemble in production code.
                # For details, see cdc-record-format.html#cdc-oversized-records.
                record_type = payload.get("type", "full")
                if record_type == "fragment":
                    print(f"[FRAGMENT] chunk_id={payload['chunk_id']} index={payload['index']}")
                    continue

                source = payload["source"]
                op = payload["op"]
                ts_ns = source["ts_ns"]
                tx_id = source["txId"]
                table = f"{source['schema']}.{source['table']}"

                # Aurora DSQL currently emits "c" for both inserts and updates. A
                # subsequent release will emit "u" for updates, and "c" for inserts.
                # Design your consumer to handle all three values; this map stays
                # correct across the transition.
                op_labels = {"c": "INSERT/UPDATE", "u": "UPDATE", "d": "DELETE"}
                print(
                    f"[{op_labels.get(op, op)}] {table} "
                    f"txId={tx_id} ts_ns={ts_ns} type={record_type}"
                )
                if payload.get("after"):
                    print(f"  after: {json.dumps(payload['after'])}")
                if payload.get("before"):
                    print(f"  before: {json.dumps(payload['before'])}")
                if record_type == "chunked":
                    print(f"  chunked: {json.dumps(payload['chunked'])}")

            if not records_response["Records"]:
                break  # No more records in this shard


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Consume DSQL CDC records from Kinesis"
    )
    parser.add_argument("--stream-name", required=True, help="Kinesis stream name")
    parser.add_argument("--region", required=True, help="AWS Region")
    args = parser.parse_args()

    consume_cdc(args.stream_name, args.region)
Run the script:
pip install boto3
python consume_cdc.py \
    --stream-name my-cdc-stream \
    --region region
The script prints each change event as it arrives. You see output similar to the following:
Reading from my-cdc-stream (4 shard(s))
[INSERT/UPDATE] public.test_cdc txId=ffthunp5stx6ffs2vyfqoatmfu ts_ns=1705318200000000000 type=full
  after: {"id": 1, "message": "hello cdc"}
Adding last-writer-wins deduplication
Because Aurora DSQL CDC uses at-least-once delivery, production apps should
deduplicate and order records. The following code example shows a high-water-mark approach: for
each primary key, it tracks the highest source.ts_ns seen so far and discards
any record with an equal or earlier timestamp. Set PK_COLUMNS to the primary
key column names of the table you're processing. For strategies that handle multiple
tables or deletes, see
Consumer strategies.
# Set PK_COLUMNS to the primary key column(s) of your table.
PK_COLUMNS = ["id"]

# Maps each primary key value to the highest ts_ns seen for that key.
high_water: dict[tuple, int] = {}


def process_record(payload: dict) -> bool:
    """Return True if the record is new, False if it's a duplicate or stale.

    Skip fragment records; reassemble them into a full image before calling this.
    """
    if payload.get("type") == "fragment":
        return False  # Fragments are reassembled upstream, not deduplicated here.

    source = payload["source"]
    ts_ns = source["ts_ns"]
    op = payload["op"]

    # For inserts/updates the row is in "after"; for deletes it's in "before".
    row = payload.get("after") or payload.get("before") or {}
    pk = tuple(row.get(col) for col in PK_COLUMNS)

    prev_ts = high_water.get(pk, -1)
    if ts_ns <= prev_ts:
        return False  # Duplicate or out-of-order record

    high_water[pk] = ts_ns
    return True
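To see how the high-water-mark check behaves, the following self-contained sketch replays three records for the same key—a first delivery, an at-least-once redelivery, and a later update—and shows which ones survive. Payloads are abbreviated to the fields the check uses, and the single-column "id" key is an assumption for this sketch:

```python
def is_new(payload: dict, high_water: dict) -> bool:
    """Last-writer-wins check: keep a record only if its source.ts_ns is
    strictly greater than the highest timestamp seen for its primary key.
    Assumes a single-column "id" primary key (illustration only)."""
    row = payload.get("after") or payload.get("before") or {}
    pk = (row.get("id"),)
    ts_ns = payload["source"]["ts_ns"]
    if ts_ns <= high_water.get(pk, -1):
        return False
    high_water[pk] = ts_ns
    return True

hw: dict = {}
r1 = {"after": {"id": 1}, "source": {"ts_ns": 100}}  # first delivery
r2 = {"after": {"id": 1}, "source": {"ts_ns": 100}}  # at-least-once redelivery
r3 = {"after": {"id": 1}, "source": {"ts_ns": 200}}  # later update
print([is_new(r, hw) for r in (r1, r2, r3)])  # → [True, False, True]
```

The redelivered record is discarded because its timestamp is not strictly greater than the stored high-water mark, while the later update passes.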
Managing CDC streams
Listing streams
To list all CDC streams for a cluster, use the ListStreams
operation:
aws dsql list-streams \
    --cluster-identifier cluster-id \
    --region region
Deleting a stream
To delete a CDC stream, run the following command:
aws dsql delete-stream \
    --cluster-identifier cluster-id \
    --stream-identifier stream-id \
    --region region
You can use the StreamNotExists waiter to poll GetStream
until a ResourceNotFoundException is returned, indicating that Aurora DSQL has
fully deleted the stream.