Spiegazione passo per passo: Adattatore Kinesis DynamoDB Streams - Amazon DynamoDB

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Spiegazione passo per passo: Adattatore Kinesis DynamoDB Streams

In questa sezione viene riportata una spiegazione passo per passo di un'applicazione Java che utilizza Amazon Kinesis Client Library e l'adattatore Amazon DynamoDB Streams Kinesis. L'applicazione mostra un esempio di replica dei dati, in cui l'attività di scrittura da una tabella viene applicata a una seconda tabella e i contenuti di entrambe le tabelle rimangono sincronizzati. Per il codice sorgente, consulta Programma completo: Adattatore Kinesis di DynamoDB Streams.

Il programma effettua le seguenti operazioni:

  1. Crea due tabelle DynamoDB denominate KCL-Demo-src e KCL-Demo-dst. Su ognuna di queste tabelle è abilitato un flusso.

  2. Genera l'attività di aggiornamento nella tabella di origine aggiungendo, aggiornando ed eliminando gli elementi. Questo fa sì che i dati vengano scritti nel flusso della tabella.

  3. Legge i record dal flusso, li ricostruisce come richieste DynamoDB e applica le richieste alla tabella di destinazione.

  4. Esegue la scansione delle tabelle di origine e di destinazione per garantire che i contenuti siano identici.

  5. Esegue la pulizia eliminando le tabelle.

Queste fasi sono descritte nelle sezioni seguenti e l'applicazione completa viene mostrata alla fine della procedura guidata.

Fase 1: creazione di tabelle DynamoDB

Il primo passo consiste nel creare due tabelle DynamoDB, una di origine e una di destinazione. StreamViewType sul flusso della tabella di origine è NEW_IMAGE. Questo significa che ogni volta che un item viene modificato in questa tabella, l'immagine "successiva" dell'item viene scritta nel flusso. In questo modo, il flusso tiene traccia di tutte le attività di scrittura della tabella.

Il seguente esempio mostra il codice utilizzato per creare entrambe le tabelle.

java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>(); attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N")); java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>(); keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition // key ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L) .withWriteCapacityUnits(2L); StreamSpecification streamSpecification = new StreamSpecification(); streamSpecification.setStreamEnabled(true); streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE); CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName) .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema) .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);

Fase 2: generazione dell'attività di aggiornamento nella tabella di origine

La fase successiva consiste nel generare le attività di scrittura sulla tabella di origine. Mentre questa attività è in corso, il flusso della tabella di origine viene aggiornato pressoché in tempo reale.

L'applicazione definisce una classe di supporto con metodi che chiamano PutItemUpdateItem, e DeleteItem API operazioni per la scrittura dei dati. Il seguente esempio di codice mostra come vengono utilizzati questi metodi.

StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1"); StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2"); StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101"); StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3"); StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4"); StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");

Fase 3: elaborazione del flusso

Ora il programma inizia l'elaborazione del flusso. Il DynamoDB Streams Kinesis Adapter funge da livello trasparente tra l'endpoint DynamoDB Streams KCL e l'endpoint DynamoDB Streams, in modo che il codice possa essere utilizzato appieno anziché dover effettuare chiamate DynamoDB Streams di basso livello. KCL Il programma esegue le attività di seguito elencate:

  • Definisce una classe di processori di record, con metodi conformi alla definizione dell'interfaccia:,StreamsRecordProcessor, e. KCL initialize processRecords shutdown Il metodo processRecords contiene la logica necessaria per la lettura dal flusso della tabella di origine e la scrittura nella tabella di destinazione.

  • Definisce una factory di classe per la classe di elaboratore di record (StreamsRecordProcessorFactory). Ciò è necessario per i programmi Java che utilizzanoKCL.

  • Crea un'istanza nuova KCLWorker, associata alla class factory.

  • Arresta il Worker quando l'elaborazione del record è completata.

Per ulteriori informazioni sulla definizione dell'KCLinterfaccia, consulta Developing consumer using the Kinesis Client Library nella Amazon Kinesis Data Streams Developer Guide.

Il seguente esempio di codice mostra il loop principale in StreamsRecordProcessor. L'istruzione case determina quale operazione eseguire, sulla base dell'item OperationType presente nel record del flusso.

for (Record record : records) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); System.out.println(data); if (record instanceof RecordAdapter) { com.amazonaws.services.dynamodbv2.model.Record streamRecord = ((RecordAdapter) record) .getInternalObject(); switch (streamRecord.getEventName()) { case "INSERT": case "MODIFY": StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, streamRecord.getDynamodb().getNewImage()); break; case "REMOVE": StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, streamRecord.getDynamodb().getKeys().get("Id").getN()); } } checkpointCounter += 1; if (checkpointCounter % 10 == 0) { try { checkpointer.checkpoint(); } catch (Exception e) { e.printStackTrace(); } } }

Fase 4: verifica che entrambe le tabelle abbiano contenuti identici

A questo punto, i contenuti delle tabelle di origine e destinazione sono sincronizzati. L'applicazione emette le richieste Scan su entrambe le tabelle per verificare che i loro contenuti siano effettivamente identici.

La DemoHelper classe contiene un ScanTable metodo che chiama il livello basso. Scan API L'esempio seguente mostra come viene utilizzato.

if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems() .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) { System.out.println("Scan result is equal."); } else { System.out.println("Tables are different!"); }

Fase 5: rimozione

La demo è completata, quindi l'applicazione elimina le tabelle di origine e di destinazione. Vedere l'esempio di codice seguente. Anche dopo l'eliminazione delle tabelle, i flussi rimangono disponibili per altre 24 ore, dopo di che vengono automaticamente eliminati.

dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable)); dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));