È possibile elaborare a livello di codice un flusso di attività utilizzando. AWS SDK Di seguito sono riportati esempi Java e Python completamente funzionanti dell'utilizzo dei record dei flussi di attività del database per l'abilitazione basata sull'istanza.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.NoSuchAlgorithmException;
import java.security.NoSuchProviderException;
import java.security.Security;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.zip.GZIPInputStream;
import javax.crypto.Cipher;
import javax.crypto.NoSuchPaddingException;
import javax.crypto.spec.SecretKeySpec;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.encryptionsdk.AwsCrypto;
import com.amazonaws.encryptionsdk.CryptoInputStream;
import com.amazonaws.encryptionsdk.jce.JceMasterKey;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.Builder;
import com.amazonaws.services.kinesis.model.Record;
import com.amazonaws.services.kms.AWSKMS;
import com.amazonaws.services.kms.AWSKMSClientBuilder;
import com.amazonaws.services.kms.model.DecryptRequest;
import com.amazonaws.services.kms.model.DecryptResult;
import com.amazonaws.util.Base64;
import com.amazonaws.util.IOUtils;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.annotations.SerializedName;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
public class DemoConsumer {
private static final String STREAM_NAME = "aws-rds-das-[instance-external-resource-id]"; // aws-rds-das-db-ABCD123456
private static final String APPLICATION_NAME = "AnyApplication"; //unique application name for dynamo table generation that holds kinesis shard tracking
private static final String AWS_ACCESS_KEY = "[AWS_ACCESS_KEY_TO_ACCESS_KINESIS]";
private static final String AWS_SECRET_KEY = "[AWS_SECRET_KEY_TO_ACCESS_KINESIS]";
private static final String RESOURCE_ID = "[external-resource-id]"; // db-ABCD123456
private static final String REGION_NAME = "[region-name]"; //us-east-1, us-east-2...
private static final BasicAWSCredentials CREDENTIALS = new BasicAWSCredentials(AWS_ACCESS_KEY, AWS_SECRET_KEY);
private static final AWSStaticCredentialsProvider CREDENTIALS_PROVIDER = new AWSStaticCredentialsProvider(CREDENTIALS);
private static final AwsCrypto CRYPTO = new AwsCrypto();
private static final AWSKMS KMS = AWSKMSClientBuilder.standard()
class Activity {
String type;
String version;
String databaseActivityEvents;
String key;
class ActivityEvent {
@SerializedName("class") String _class;
String clientApplication;
String command;
String commandText;
String databaseName;
String dbProtocol;
String dbUserName;
String endTime;
String errorMessage;
String exitCode;
String logTime;
String netProtocol;
String objectName;
String objectType;
List<String> paramList;
String pid;
String remoteHost;
String remotePort;
String rowCount;
String serverHost;
String serverType;
String serverVersion;
String serviceName;
String sessionId;
String startTime;
String statementId;
String substatementId;
String transactionId;
String type;
class ActivityRecords {
String type;
String clusterId; // note that clusterId will contain an empty string on RDS Oracle and RDS SQL Server
String instanceId;
List<ActivityEvent> databaseActivityEventList;
static class RecordProcessorFactory implements IRecordProcessorFactory {
public IRecordProcessor createProcessor() {
return new RecordProcessor();
static class RecordProcessor implements IRecordProcessor {
private static final long BACKOFF_TIME_IN_MILLIS = 3000L;
private static final int PROCESSING_RETRIES_MAX = 10;
private static final long CHECKPOINT_INTERVAL_MILLIS = 60000L;
private static final Gson GSON = new GsonBuilder().serializeNulls().create();
private static final Cipher CIPHER;
static {
Security.insertProviderAt(new BouncyCastleProvider(), 1);
try {
CIPHER = Cipher.getInstance("AES/GCM/NoPadding", "BC");
} catch (NoSuchAlgorithmException | NoSuchPaddingException | NoSuchProviderException e) {
throw new ExceptionInInitializerError(e);
private long nextCheckpointTimeInMillis;
public void initialize(String shardId) {
public void processRecords(final List<Record> records, final IRecordProcessorCheckpointer checkpointer) {
for (final Record record : records) {
if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
if (reason == ShutdownReason.TERMINATE) {
private void processSingleBlob(final ByteBuffer bytes) {
try {
// JSON $Activity
final Activity activity = GSON.fromJson(new String(bytes.array(), StandardCharsets.UTF_8), Activity.class);
// Base64.Decode
final byte[] decoded = Base64.decode(activity.databaseActivityEvents);
final byte[] decodedDataKey = Base64.decode(activity.key);
Map<String, String> context = new HashMap<>();
context.put("aws:rds:db-id", RESOURCE_ID);
// Decrypt
final DecryptRequest decryptRequest = new DecryptRequest()
final DecryptResult decryptResult = KMS.decrypt(decryptRequest);
final byte[] decrypted = decrypt(decoded, getByteArray(decryptResult.getPlaintext()));
// GZip Decompress
final byte[] decompressed = decompress(decrypted);
// JSON $ActivityRecords
final ActivityRecords activityRecords = GSON.fromJson(new String(decompressed, StandardCharsets.UTF_8), ActivityRecords.class);
// Iterate throught $ActivityEvents
for (final ActivityEvent event : activityRecords.databaseActivityEventList) {
} catch (Exception e) {
// Handle error.
private static byte[] decompress(final byte[] src) throws IOException {
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(src);
GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream);
return IOUtils.toByteArray(gzipInputStream);
private void checkpoint(IRecordProcessorCheckpointer checkpointer) {
for (int i = 0; i < PROCESSING_RETRIES_MAX; i++) {
try {
} catch (ShutdownException se) {
// Ignore checkpoint if the processor instance has been shutdown (fail over).
System.out.println("Caught shutdown exception, skipping checkpoint." + se);
} catch (ThrottlingException e) {
// Backoff and re-attempt checkpoint upon transient failures
if (i >= (PROCESSING_RETRIES_MAX - 1)) {
System.out.println("Checkpoint failed after " + (i + 1) + "attempts." + e);
} else {
System.out.println("Transient issue when checkpointing - attempt " + (i + 1) + " of " + PROCESSING_RETRIES_MAX + e);
} catch (InvalidStateException e) {
// This indicates an issue with the DynamoDB table (check for table, provisioned IOPS).
System.out.println("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library." + e);
try {
} catch (InterruptedException e) {
System.out.println("Interrupted sleep" + e);
private static byte[] decrypt(final byte[] decoded, final byte[] decodedDataKey) throws IOException {
// Create a JCE master key provider using the random key and an AES-GCM encryption algorithm
final JceMasterKey masterKey = JceMasterKey.getInstance(new SecretKeySpec(decodedDataKey, "AES"),
"BC", "DataKey", "AES/GCM/NoPadding");
try (final CryptoInputStream<JceMasterKey> decryptingStream = CRYPTO.createDecryptingStream(masterKey, new ByteArrayInputStream(decoded));
final ByteArrayOutputStream out = new ByteArrayOutputStream()) {
IOUtils.copy(decryptingStream, out);
return out.toByteArray();
public static void main(String[] args) throws Exception {
final String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();
final KinesisClientLibConfiguration kinesisClientLibConfiguration =
final Worker worker = new Builder()
.recordProcessorFactory(new RecordProcessorFactory())
System.out.printf("Running %s to process stream %s as worker %s...\n", APPLICATION_NAME, STREAM_NAME, workerId);
try {
} catch (Throwable t) {
System.err.println("Caught throwable while processing data.");
private static byte[] getByteArray(final ByteBuffer b) {
byte[] byteArray = new byte[b.remaining()];
return byteArray;