Seleziona le tue preferenze relative ai cookie

Utilizziamo cookie essenziali e strumenti simili necessari per fornire il nostro sito e i nostri servizi. Utilizziamo i cookie prestazionali per raccogliere statistiche anonime in modo da poter capire come i clienti utilizzano il nostro sito e apportare miglioramenti. I cookie essenziali non possono essere disattivati, ma puoi fare clic su \"Personalizza\" o \"Rifiuta\" per rifiutare i cookie prestazionali.

Se sei d'accordo, AWS e le terze parti approvate utilizzeranno i cookie anche per fornire utili funzionalità del sito, ricordare le tue preferenze e visualizzare contenuti pertinenti, inclusa la pubblicità pertinente. Per continuare senza accettare questi cookie, fai clic su \"Continua\" o \"Rifiuta\". Per effettuare scelte più dettagliate o saperne di più, fai clic su \"Personalizza\".

Spiegazione passo per passo: Adattatore Kinesis DynamoDB Streams

Modalità Focus
Spiegazione passo per passo: Adattatore Kinesis DynamoDB Streams - Amazon DynamoDB

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

In questa sezione viene riportata una spiegazione passo per passo di un'applicazione Java che utilizza Amazon Kinesis Client Library e l'adattatore Amazon DynamoDB Streams Kinesis. L'applicazione mostra un esempio di replica dei dati, in cui l'attività di scrittura da una tabella viene applicata a una seconda tabella e i contenuti di entrambe le tabelle rimangono sincronizzati. Per il codice sorgente, consulta Programma completo: Adattatore Kinesis di DynamoDB Streams.

Il programma effettua le seguenti operazioni:

  1. Crea due tabelle DynamoDB denominate KCL-Demo-src e KCL-Demo-dst. Su ognuna di queste tabelle è abilitato un flusso.

  2. Genera l'attività di aggiornamento nella tabella di origine aggiungendo, aggiornando ed eliminando gli elementi. Questo fa sì che i dati vengano scritti nel flusso della tabella.

  3. Legge i record dal flusso, li ricostruisce come richieste DynamoDB e applica le richieste alla tabella di destinazione.

  4. Esegue la scansione delle tabelle di origine e di destinazione per garantire che i contenuti siano identici.

  5. Esegue la pulizia eliminando le tabelle.

Queste fasi sono descritte nelle sezioni seguenti e l'applicazione completa viene mostrata alla fine della procedura guidata.

Fase 1: creazione di tabelle DynamoDB

Il primo passo consiste nel creare due tabelle DynamoDB, una di origine e una di destinazione. StreamViewType sul flusso della tabella di origine è NEW_IMAGE. Questo significa che ogni volta che un item viene modificato in questa tabella, l'immagine "successiva" dell'item viene scritta nel flusso. In questo modo, il flusso tiene traccia di tutte le attività di scrittura della tabella.

Il seguente esempio mostra il codice utilizzato per creare entrambe le tabelle.

java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>(); attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N")); java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>(); keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition // key ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L) .withWriteCapacityUnits(2L); StreamSpecification streamSpecification = new StreamSpecification(); streamSpecification.setStreamEnabled(true); streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE); CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName) .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema) .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);

Fase 2: generazione dell'attività di aggiornamento nella tabella di origine

La fase successiva consiste nel generare le attività di scrittura sulla tabella di origine. Mentre questa attività è in corso, il flusso della tabella di origine viene aggiornato pressoché in tempo reale.

L'applicazione definisce una classe helper con metodi che chiamano le operazioni API PutItem, UpdateItem e DeleteItem per scrivere i dati. Il seguente esempio di codice mostra come vengono utilizzati questi metodi.

StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1"); StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2"); StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101"); StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3"); StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4"); StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");

Fase 3: elaborazione del flusso

Ora il programma inizia l'elaborazione del flusso. L'adattatore Kinesis di DynamoDB Streams agisce come un livello trasparente tra KCL e l'endpoint DynamoDB Streams in modo che il codice possa utilizzare appieno KCL piuttosto che effettuare chiamate a DynamoDB Streams di basso livello. Il programma esegue le attività di seguito elencate:

  • Definisce una classe di elaboratore di record, StreamsRecordProcessor, con metodi conformi alla definizione dell'interfaccia KCL: initialize, processRecords e shutdown. Il metodo processRecords contiene la logica necessaria per la lettura dal flusso della tabella di origine e la scrittura nella tabella di destinazione.

  • Definisce una factory di classe per la classe di elaboratore di record (StreamsRecordProcessorFactory). Ciò è richiesto per i programmi Java che utilizzano KCL.

  • Crea un'istanza di un nuovo Worker KCL che è associato alla factory di classe.

  • Arresta il Worker quando l'elaborazione del record è completata.

Per ulteriori informazioni sulla definizione dell'interfaccia KCL, consulta Sviluppo di consumatori utilizzando la Kinesis Client Library nella Guida per gli sviluppatori di Amazon Kinesis Data Streams.

Il seguente esempio di codice mostra il loop principale in StreamsRecordProcessor. L'istruzione case determina quale operazione eseguire, sulla base dell'item OperationType presente nel record del flusso.

for (Record record : records) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); System.out.println(data); if (record instanceof RecordAdapter) { com.amazonaws.services.dynamodbv2.model.Record streamRecord = ((RecordAdapter) record) .getInternalObject(); switch (streamRecord.getEventName()) { case "INSERT": case "MODIFY": StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, streamRecord.getDynamodb().getNewImage()); break; case "REMOVE": StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, streamRecord.getDynamodb().getKeys().get("Id").getN()); } } checkpointCounter += 1; if (checkpointCounter % 10 == 0) { try { checkpointer.checkpoint(); } catch (Exception e) { e.printStackTrace(); } } }

Fase 4: verifica che entrambe le tabelle abbiano contenuti identici

A questo punto, i contenuti delle tabelle di origine e destinazione sono sincronizzati. L'applicazione emette le richieste Scan su entrambe le tabelle per verificare che i loro contenuti siano effettivamente identici.

La classe DemoHelper contiene un metodo ScanTable che chiama l'API Scan di basso livello. L'esempio seguente mostra come viene utilizzato.

if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems() .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) { System.out.println("Scan result is equal."); } else { System.out.println("Tables are different!"); }

Fase 5: rimozione

La demo è completata, quindi l'applicazione elimina le tabelle di origine e di destinazione. Vedere l'esempio di codice seguente. Anche dopo l'eliminazione delle tabelle, i flussi rimangono disponibili per altre 24 ore, dopo di che vengono automaticamente eliminati.

dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable)); dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));
PrivacyCondizioni del sitoPreferenze cookie
© 2025, Amazon Web Services, Inc. o società affiliate. Tutti i diritti riservati.