Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
In questa sezione viene riportata una spiegazione passo per passo di un'applicazione Java che utilizza Amazon Kinesis Client Library e l'adattatore Amazon DynamoDB Streams Kinesis. L'applicazione mostra un esempio di replica dei dati, in cui l'attività di scrittura da una tabella viene applicata a una seconda tabella e i contenuti di entrambe le tabelle rimangono sincronizzati. Per il codice sorgente, consulta Programma completo: Adattatore Kinesis di DynamoDB Streams.
Il programma effettua le seguenti operazioni:
-
Crea due tabelle DynamoDB denominate
KCL-Demo-src
eKCL-Demo-dst
. Su ognuna di queste tabelle è abilitato un flusso. -
Genera l'attività di aggiornamento nella tabella di origine aggiungendo, aggiornando ed eliminando gli elementi. Questo fa sì che i dati vengano scritti nel flusso della tabella.
-
Legge i record dal flusso, li ricostruisce come richieste DynamoDB e applica le richieste alla tabella di destinazione.
-
Esegue la scansione delle tabelle di origine e di destinazione per garantire che i contenuti siano identici.
-
Esegue la pulizia eliminando le tabelle.
Queste fasi sono descritte nelle sezioni seguenti e l'applicazione completa viene mostrata alla fine della procedura guidata.
Argomenti
Fase 1: creazione di tabelle DynamoDB
Il primo passo consiste nel creare due tabelle DynamoDB, una di origine e una di destinazione. StreamViewType
sul flusso della tabella di origine è NEW_IMAGE
. Questo significa che ogni volta che un item viene modificato in questa tabella, l'immagine "successiva" dell'item viene scritta nel flusso. In questo modo, il flusso tiene traccia di tutte le attività di scrittura della tabella.
Il seguente esempio mostra il codice utilizzato per creare entrambe le tabelle.
java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N"));
java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition
// key
ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L)
.withWriteCapacityUnits(2L);
StreamSpecification streamSpecification = new StreamSpecification();
streamSpecification.setStreamEnabled(true);
streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE);
CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName)
.withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema)
.withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);
Fase 2: generazione dell'attività di aggiornamento nella tabella di origine
La fase successiva consiste nel generare le attività di scrittura sulla tabella di origine. Mentre questa attività è in corso, il flusso della tabella di origine viene aggiornato pressoché in tempo reale.
L'applicazione definisce una classe helper con metodi che chiamano le operazioni API PutItem
, UpdateItem
e DeleteItem
per scrivere i dati. Il seguente esempio di codice mostra come vengono utilizzati questi metodi.
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101");
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");
Fase 3: elaborazione del flusso
Ora il programma inizia l'elaborazione del flusso. L'adattatore Kinesis di DynamoDB Streams agisce come un livello trasparente tra KCL e l'endpoint DynamoDB Streams in modo che il codice possa utilizzare appieno KCL piuttosto che effettuare chiamate a DynamoDB Streams di basso livello. Il programma esegue le attività di seguito elencate:
-
Definisce una classe di elaboratore di record,
StreamsRecordProcessor
, con metodi conformi alla definizione dell'interfaccia KCL:initialize
,processRecords
eshutdown
. Il metodoprocessRecords
contiene la logica necessaria per la lettura dal flusso della tabella di origine e la scrittura nella tabella di destinazione. -
Definisce una factory di classe per la classe di elaboratore di record (
StreamsRecordProcessorFactory
). Ciò è richiesto per i programmi Java che utilizzano KCL. -
Crea un'istanza di un nuovo
Worker
KCL che è associato alla factory di classe. -
Arresta il
Worker
quando l'elaborazione del record è completata.
Per ulteriori informazioni sulla definizione dell'interfaccia KCL, consulta Sviluppo di consumatori utilizzando la Kinesis Client Library nella Guida per gli sviluppatori di Amazon Kinesis Data Streams.
Il seguente esempio di codice mostra il loop principale in StreamsRecordProcessor
. L'istruzione case
determina quale operazione eseguire, sulla base dell'item OperationType
presente nel record del flusso.
for (Record record : records) {
String data = new String(record.getData().array(), Charset.forName("UTF-8"));
System.out.println(data);
if (record instanceof RecordAdapter) {
com.amazonaws.services.dynamodbv2.model.Record streamRecord = ((RecordAdapter) record)
.getInternalObject();
switch (streamRecord.getEventName()) {
case "INSERT":
case "MODIFY":
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName,
streamRecord.getDynamodb().getNewImage());
break;
case "REMOVE":
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName,
streamRecord.getDynamodb().getKeys().get("Id").getN());
}
}
checkpointCounter += 1;
if (checkpointCounter % 10 == 0) {
try {
checkpointer.checkpoint();
}
catch (Exception e) {
e.printStackTrace();
}
}
}
Fase 4: verifica che entrambe le tabelle abbiano contenuti identici
A questo punto, i contenuti delle tabelle di origine e destinazione sono sincronizzati. L'applicazione emette le richieste Scan
su entrambe le tabelle per verificare che i loro contenuti siano effettivamente identici.
La classe DemoHelper
contiene un metodo ScanTable
che chiama l'API Scan
di basso livello. L'esempio seguente mostra come viene utilizzato.
if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems()
.equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) {
System.out.println("Scan result is equal.");
}
else {
System.out.println("Tables are different!");
}
Fase 5: rimozione
La demo è completata, quindi l'applicazione elimina le tabelle di origine e di destinazione. Vedere l'esempio di codice seguente. Anche dopo l'eliminazione delle tabelle, i flussi rimangono disponibili per altre 24 ore, dopo di che vengono automaticamente eliminati.
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable));
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));