

# Usar o adaptador do DynamoDB Streams Kinesis Adapter para processar registros de fluxos
<a name="Streams.KCLAdapter"></a>

Usar o Amazon Kinesis Adapter é a forma recomendada para consumir fluxos do Amazon DynamoDB. A API do DynamoDB Streams é intencionalmente semelhante à do Kinesis Data Streams. Em ambos os serviços, os fluxos de dados são compostos de fragmentos, os quais são contêineres de registros de stream. Ambas as APIs de serviços contêm as operações `ListStreams`, `DescribeStream`, `GetShards` e `GetShardIterator`. (Embora essas ações do DynamoDB Streams sejam semelhantes às suas equivalentes no Kinesis Data Streams, elas não são 100% idênticas.)

Como um usuário do DynamoDB Streams, você pode utilizar os padrões de design encontrados no KCL para processar fragmentos e registros de fluxos do DynamoDB Streams. Para isso, você pode usar o DynamoDB Streams Kinesis Adapter. O Kinesis Adapter implementa a interface do Kinesis Data Streams para que a KCL possa ser usada para consumir e processar registros do DynamoDB Streams. Para obter instruções sobre como configurar e instalar o DynamoDB Streams Kinesis Adapter, consulte o [repositório do GitHub](https://github.com/awslabs/dynamodb-streams-kinesis-adapter).

Você pode escrever aplicações para Kinesis Data Streams usando a Kinesis Client Library (KCL). A KCL simplifica a codificação fornecendo abstrações úteis acima da API de baixo nível do Kinesis Data Streams. Para obter mais informações sobre a KCL, consulte [Desenvolver consumidores usando a biblioteca de clientes do Kinesis](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) no *Guia do desenvolvedor do Amazon Kinesis Data Streams Developer Guide*.

O DynamoDB recomenda usar a KCL versão 3.x com o AWS SDK para Java v2.x. Durante o período de transição, a versão atual do DynamoDB Streams Kinesis Adapter versão 1.x com o AWS SDK para AWS SDK para Java v1.x continuará recebendo suporte total ao longo do ciclo de vida, de acordo com o estabelecido em alinhamento com a [política de manutenção de AWS SDKs e ferramentas](https://docs.aws.amazon.com/sdkref/latest/guide/maint-policy.html).

**nota**  
As versões 1.x e 2.x da Amazon Kinesis Client Library (KCL) estão desatualizadas. A KCL 1.x chegará ao fim do suporte em 30 de janeiro de 2026. É altamente recomendável que você migre suas aplicações de KCL que usam a versão 1.x para a versão mais recente da KCL antes de 30 de janeiro de 2026. Para encontrar a versão mais recente da KCL, consulte a [página da Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) no GitHub. Para ter mais informações sobre as versões mais recentes da KCL, consulte [Use Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/kcl.html).. Para obter mais informações sobre como migrar da KCL 1.x para a KCL 3.x, consulte Migração da KCL 1.x para a KCL 3.x .

O diagrama a seguir mostra como essas bibliotecas interagem entre si.

![\[Interação entre o DynamoDB Streams, o Kinesis Data Streams e o KCL para processar registros do DynamoDB Streams.\]](http://docs.aws.amazon.com/pt_br/amazondynamodb/latest/developerguide/images/streams-kinesis-adapter.png)


Com o DynamoDB Streams Kinesis Adapter implementado, você pode começar a desenvolver a interface da KCL com as chamadas de API perfeitamente direcionadas no endpoint do DynamoDB Streams.

Quando a aplicação é iniciada, ela chama a KCL para instanciar um operador. Você deve fornecer ao operador informações de configuração da aplicação, como o descritor do fluxo e as credenciais da AWS, além do nome de uma classe de processador de registro que você fornecer. Como ele executa o código no processador de registro, o operador executa as seguintes tarefas:
+ Conecta-se ao stream
+ Enumera os fragmentos no fluxo
+ Verifica e enumera fragmentos filhos de um fragmento pai fechado dentro do fluxo
+ Coordena as associações do estilhaço com outros operadores (se houver)
+ Cria uma instância de um processador de registro para cada fragmento que gerencia
+ Extrai registros do fluxo
+ Escala a taxa de chamadas de API GetRecords quanto há alto throughput (se o modo de recuperação estiver configurado).
+ Envia os registros ao processador de registros correspondente
+ Registros processados pelos pontos de verificação
+ Equilibra as associações de estilhaço-operador quando a contagem de instância de operadores muda
+ Equilibra as associações de fragmento-operador quando os fragmentos se dividem

O adaptador da KCL permite o modo de recuperação, um recurso de ajuste automático da taxa de chamadas para lidar com aumentos temporários de throughput. Quando o atraso no processamento de fluxos excede um limite configurável (padrão: um minuto), o modo de recuperação escala a frequência de chamadas de API GetRecords de acordo com um valor configurável (padrão: três vezes) para recuperar os registros mais depressa e depois volta ao normal quando o atraso diminui. Isso é fundamental durante períodos de alto throughput em que a atividade de gravação do DynamoDB pode sobrecarregar os consumidores usando taxas de sondagem padrão. O modo de recuperação pode ser habilitado por meio do parâmetro de configuração `catchupEnabled` (padrão: false).

**nota**  
Para obter uma descrição dos conceitos da KCL aqui listados, consulte [Desenvolver consumidores usando a biblioteca clientes do Kinesis](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) no *Guia do desenvolvedor do Amazon Kinesis Data Streams*.  
Para obter mais informações sobre como usar fluxos com o AWS Lambda, consulte [DynamoDB Streams e acionadores do AWS Lambda](Streams.Lambda.md)

# Migrar da KCL 1.x para a KCL 3.x
<a name="streams-migrating-kcl"></a>

## Visão geral
<a name="migrating-kcl-overview"></a>

Este guia oferece instruções para migrar uma aplicação de consumidor da KCL 1.x para a KCL 3.x. Devido a diferenças de arquitetura entre a KCL 1.x e a KCL 3.x, a migração requer a atualização de vários componentes para garantir compatibilidade.

A KCL 1.x usa classes e interfaces diferentes em comparação com o KCL 3.x. Você deve primeiro migrar o processador de registros, a fábrica do processador de registros e as classes de operador para o formato compatível com a KCL 3.x e seguir as etapas de migração da KCL 1.x para a KCL 3.x.

## Etapas da migração
<a name="migration-steps"></a>

**Topics**
+ [Etapa 1: migrar o processador de registros](#step1-record-processor)
+ [Etapa 2: migrar a fábrica do processador de registros](#step2-record-processor-factory)
+ [Etapa 3: migrar o operador](#step3-worker-migration)
+ [Etapa 4: visão geral e recomendações de configuração da KCL 3.x](#step4-configuration-migration)
+ [Etapa 5: migrar da KCL 2.x para a KCL 3.x](#step5-kcl2-to-kcl3)

### Etapa 1: migrar o processador de registros
<a name="step1-record-processor"></a>

Este exemplo mostra um processador de registros implementado para o DynamoDB Streams Kinesis Adapter da KCL 1.x:

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        for (Record record : processRecordsInput.getRecords()) {
            String data = new String(record.getData().array(), Charset.forName("UTF-8"));
            System.out.println(data);
            if (record instanceof RecordAdapter) {
                // record processing and checkpointing logic
            }
        }
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**Como migrar a classe RecordProcessor**

1. Altere as interfaces de `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` e `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware` para `com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor` da seguinte forma:

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   
   import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
   ```

1. Atualize as instruções de importação para os métodos `initialize` e `processRecords`.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
   ```

1. Substitua o método `shutdownRequested` pelos novos métodos a seguir: `leaseLost`, `shardEnded`, e `shutdownRequested`.

   ```
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

Esta é a versão atualizada da classe de processador de registros:

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.dynamodb.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import software.amazon.awssdk.services.dynamodb.model.Record;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput processRecordsInput) {
        for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records())
            Record ddbRecord = record.getRecord();
            // processing and checkpointing logic for the ddbRecord
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```

**nota**  
O DynamoDB Streams Kinesis Adapter agora usa o modelo de registro SDKv2. No SDKv2, objetos `AttributeValue` complexos (`BS`, `NS`, `M`, `L` e `SS`) nunca retornam null. Use os métodos `hasBs()`, `hasNs()`, `hasM()`, `hasL()` e `hasSs()` para verificar se esses valores existem.

### Etapa 2: migrar a fábrica do processador de registros
<a name="step2-record-processor-factory"></a>

A fábrica do processador de registros é responsável por criar processadores de registro quando uma concessão é realizada. Veja o seguinte exemplo de fábrica da KCL 1.x:

```
package com.amazonaws.codesamples;

import software.amazon.dynamodb.AmazonDynamoDB;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements IRecordProcessorFactory {
    
    @Override
    public IRecordProcessor createProcessor() {
        return new StreamsRecordProcessor(dynamoDBClient, tableName);
    }
}
```

**Migrar para `RecordProcessorFactory`**
+ Altere a interface implementada de `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory` para `software.amazon.kinesis.processor.ShardRecordProcessorFactory` da seguinte forma:

  ```
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
  import software.amazon.kinesis.processor.ShardRecordProcessor;
  
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
  import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
  
  // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
  public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {
  
  Change the return signature for createProcessor.
  
  // public IRecordProcessor createProcessor() {
  public ShardRecordProcessor shardRecordProcessor() {
  ```

Veja o seguinte exemplo de fábrica de processador de registros em 3.0:

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor();
    }
}
```

### Etapa 3: migrar o operador
<a name="step3-worker-migration"></a>

Na versão 3.0 da KCL, uma nova classe, chamada **Scheduler**, substitui a classe **Worker**. Veja o seguinte exemplo de operador da KCL 1.x:

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(
        recordProcessorFactory,
        workerConfig,
        adapterClient,
        amazonDynamoDB,
        amazonCloudWatchClient);
```

**Para migrar o operador**

1. Altere a instrução `import` para a classe `Worker` para as instruções de importação para as classes `Scheduler` e `ConfigsBuilder`.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. Importe `StreamTracker` e altere a importação de `StreamsWorkerFactory` para `StreamsSchedulerFactory`.

   ```
   import software.amazon.kinesis.processor.StreamTracker;
   // import software.amazon.dynamodb.streamsadapter.StreamsWorkerFactory;
   import software.amazon.dynamodb.streamsadapter.StreamsSchedulerFactory;
   ```

1. Escolha a posição por meio da qual iniciar a aplicação. Ela pode ser `TRIM_HORIZON` ou `LATEST`.

   ```
   import software.amazon.kinesis.common.InitialPositionInStream;
   import software.amazon.kinesis.common.InitialPositionInStreamExtended;
   ```

1. Crie uma instância de `StreamTracker`.

   ```
   StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(
           streamArn,
           InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
   );
   ```

1. Crie o objeto `AmazonDynamoDBStreamsAdapterClient`.

   ```
   import software.amazon.dynamodb.streamsadapter.AmazonDynamoDBStreamsAdapterClient; 
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
   import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
   
   ...
   
   AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();
   
   AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(
           credentialsProvider, awsRegion);
   ```

1. Crie o objeto `ConfigsBuilder`.

   ```
   import software.amazon.kinesis.common.ConfigsBuilder;
   
   ...
   ConfigsBuilder configsBuilder = new ConfigsBuilder(
                   streamTracker,
                   applicationName,
                   adapterClient,
                   dynamoDbAsyncClient,
                   cloudWatchAsyncClient,
                   UUID.randomUUID().toString(),
                   new StreamsRecordProcessorFactory());
   ```

1. Crie o `Scheduler` usando `ConfigsBuilder` como mostrado no seguinte exemplo:

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
                   
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
                   
   DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient);
   pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis);
   
   // Use ConfigsBuilder to configure settings
   RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
   retrievalConfig.retrievalSpecificConfig(pollingConfig);
   
   CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig();
   coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);
                   
   Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                   configsBuilder.checkpointConfig(),
                   coordinatorConfig,
                   configsBuilder.leaseManagementConfig(),
                   configsBuilder.lifecycleConfig(),
                   configsBuilder.metricsConfig(),
                   configsBuilder.processorConfig(),
                   retrievalConfig,
                   adapterClient
           );
   ```

**Importante**  
A configuração `CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X` mantém a compatibilidade entre o DynamoDB Streams Kinesis Adapter para a KCL v3 e a KCL v1, não entre a KCL v2 e a v3.

### Etapa 4: visão geral e recomendações de configuração da KCL 3.x
<a name="step4-configuration-migration"></a>

Para obter uma descrição detalhada das configurações introduzidas após a KCL 1.x que são relevantes na KCL 3.x, consulte [KCL configurations](https://docs.aws.amazon.com//streams/latest/dev/kcl-configuration.html) e [KCL migration client configuration](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration.html#client-configuration).

**Importante**  
Em vez de criar objetos diretamente de `checkpointConfig`, `coordinatorConfig`, `leaseManagementConfig`, `metricsConfig`, `processorConfig` e `retrievalConfig`, recomendamos usar `ConfigsBuilder` para definir configurações na KCL 3.x e versões posteriores para evitar problemas de inicialização do Agendador. O `ConfigsBuilder` oferece uma maneira mais flexível e sustentável de configurar sua aplicação da KCL.

#### Configurações com valor padrão de atualização na KCL 3.x
<a name="kcl3-configuration-overview"></a>

`billingMode`  
Na KCL versão 1.x, o valor padrão para `billingMode` é definido como `PROVISIONED`. No entanto, na KCL versão 3.x, o padrão `billingMode` é `PAY_PER_REQUEST` (modo sob demanda). Recomendamos que você use o modo de capacidade sob demanda em sua tabela de concessões para ajustar automaticamente a capacidade com base no uso. Para obter orientações sobre como usar a capacidade provisionada para suas tabelas de concessões, consulte [Best practices for the lease table with provisioned capacity mode](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-lease-table.html).

`idleTimeBetweenReadsInMillis`  
Na KCL versão 1.x, o valor padrão para `idleTimeBetweenReadsInMillis` é definido como 1.000 (ou 1 segundo). A KCL versão 3.x define o valor padrão para i`dleTimeBetweenReadsInMillis` como 1.500 (ou 1,5 segundo), mas o Amazon DynamoDB Streams Kinesis Adapter substitui esse valor padrão, definindo-o como 1.000 (ou 1 segundo).

#### Novas configurações na KCL 3.x
<a name="kcl3-new-configs"></a>

`leaseAssignmentIntervalMillis`  
Essa configuração define o intervalo de tempo antes que os fragmentos recém-descobertos comecem a ser processados, e é calculada como 1,5 × `leaseAssignmentIntervalMillis`. Se essa configuração não for definida explicitamente, o intervalo de tempo será padronizado como 1,5 × `failoverTimeMillis`. O processamento de novos fragmentos exige a verificação da tabela de concessões e a consulta a um índice secundário global (GSI) na tabela de concessões. A redução de `leaseAssignmentIntervalMillis` aumenta a frequência dessas operações de verificação e consulta, aumentando os custos do DynamoDB. Recomendamos definir esse valor como 2 mil (ou 2 segundos) para minimizar o atraso no processamento de novos fragmentos.

`shardConsumerDispatchPollIntervalMillis`  
Essa configuração define o intervalo entre pesquisas sucessivas feitas pelo consumidor do fragmento para acionar transições de estado. Na KCL versão 1.x, esse comportamento era controlado pelo parâmetro `idleTimeInMillis`, que não era exposto como uma definição configurável. Na KCL versão 3.x, recomendamos definir essa configuração para corresponder ao valor usado em ` idleTimeInMillis` na configuração da KCL versão 1.x.

### Etapa 5: migrar da KCL 2.x para a KCL 3.x
<a name="step5-kcl2-to-kcl3"></a>

Para garantir uma transição tranquila e a compatibilidade com a versão mais recente da Kinesis Client Library (KCL), siga as etapas de 5 a 8 nas instruções do guia de migração para [atualizar da KCL 2.x para a KCL 3.x](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-worker-metrics).

Para solucionar problemas comuns da KCL 3.x, consulte [Troubleshooting KCL consumer applications](https://docs.aws.amazon.com//streams/latest/dev/troubleshooting-consumers.html).

# Reverter para a versão anterior da KCL
<a name="kcl-migration-rollback"></a>

Este tópico explica como reverter sua aplicação de consumidor para a versão anterior da KCL. O processo de reversão consiste em duas etapas:

1. Execute a [Ferramenta de Migração da KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py).

1. Reimplante o código da versão anterior da KCL.

## Etapa 1: executar a Ferramenta de Migração da KCL
<a name="kcl-migration-rollback-step1"></a>

Quando precisar reverter para a versão anterior da KCL, você deve executar a Ferramenta de Migração da KCL. Essa ferramenta executa duas tarefas importantes:
+ Ela remove uma tabela de metadados chamada tabela de métricas do operador e o índice secundário global na tabela de concessões no DynamoDB. Esses artefatos são criados pela KCL 3.x, mas não são necessários quando você reverte para a versão anterior.
+ Ela faz com que todos os operadores funcionem em um modo compatível com a KCL 1.x e comecem a usar o algoritmo de balanceamento de carga usado nas versões anteriores da KCL. Se você tiver problemas com o novo algoritmo de balanceamento de carga na KCL 3.x, isso mitigará o problema imediatamente.

**Importante**  
A tabela de estados do coordenador no DynamoDB deve existir e não deve ser excluída durante o processo de migração, reversão e avanço.

**nota**  
É importante que todos os operadores em sua aplicação de consumidor usem o mesmo algoritmo de balanceamento de carga em um determinado momento. A Ferramenta de Migração da KCL garante que todos os operadores em sua aplicação de consumidor da KCL 3.x mudem para o modo compatível com a KCL 1.x para que todos os operadores executem o mesmo algoritmo de balanceamento de carga durante a reversão da aplicação para a versão anterior da KCL.

Você pode baixar a [Ferramenta de Migração da KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py) no diretório de scripts do [repositório da KCL no GitHub](https://github.com/awslabs/amazon-kinesis-client/tree/master). Execute o script usando um operador ou host com as permissões apropriadas para gravar na tabela de estados do coordenador, na tabela de métricas do operador e na tabela de concessões. As [permissões do IAM](https://docs.aws.amazon.com/streams/latest/dev/kcl-iam-permissions.html) apropriadas devem estar configuradas para aplicações de consumidor da KCL. Execute o script uma única vez para cada aplicação da KCL usando o comando especificado:

```
python3 ./KclMigrationTool.py --region region --mode rollback [--application_name applicationName] [--lease_table_name leaseTableName] [--coordinator_state_table_name coordinatorStateTableName] [--worker_metrics_table_name workerMetricsTableName]
```

### Parâmetros
<a name="kcl-migration-rollback-parameters"></a>

`--region`  
Substitua *region* pela Região da AWS.

`--application_name`  
Esse parâmetro é obrigatório se você estiver usando nomes padrão para suas tabelas de metadados do DynamoDB (tabela de concessões, tabela de estados do coordenador e tabela de métricas do operador). Se você tiver especificado nomes personalizados para essas tabelas, poderá omitir esse parâmetro. Substitua *applicationName* pelo nome da aplicação da KCL. A ferramenta usa esse nome para obter os nomes de tabela padrão se os nomes personalizados não forem fornecidos.

`--lease_table_name`  
Esse parâmetro é necessário quando você define um nome personalizado para a tabela de concessões na configuração da KCL. Se você estiver usando o nome padrão da tabela, poderá omitir esse parâmetro. Substitua *leaseTableName* pelo nome da tabela personalizada que você especificou para a tabela de concessões.

`--coordinator_state_table_name`  
Esse parâmetro é necessário quando você define um nome personalizado para a tabela de estados do coordenador na configuração da KCL. Se você estiver usando o nome padrão da tabela, poderá omitir esse parâmetro. Substitua *coordinatorStateTableName* pelo nome da tabela personalizada que você especificou para a tabela de estados do coordenador.

`--worker_metrics_table_name`  
Esse parâmetro é necessário quando você define um nome personalizado para a tabela de métricas do operador na configuração da KCL. Se você estiver usando o nome padrão da tabela, poderá omitir esse parâmetro. Substitua *workerMetricsTableName* pelo nome da tabela personalizada que você especificou para a tabela de métricas do operador.

## Etapa 2: reimplantar o código com a versão anterior da KCL
<a name="kcl-migration-rollback-step2"></a>

**Importante**  
Qualquer menção à versão 2.x na saída gerada pela Ferramenta de Migração da KCL deve ser interpretada como sendo a versão 1.x da KCL. A execução do script não realiza uma reversão completa, apenas alterna o algoritmo de balanceamento de carga para o usado na versão 1.x da KCL.

Depois de executar a Ferramenta de Migração da KCL para uma reversão, você verá uma destas mensagens:

Mensagem 1  
“Rollback completed. Your application was running 2x compatible functionality. Please rollback to your previous application binaries by deploying the code with your previous KCL version”.  
**Ação necessária:** isso significa que os operadores estavam executando no modo compatível com a KCL 1.x. Reimplante o código para os operadores com a versão anterior da KCL.

Mensagem 2  
“Rollback completed. Your KCL Application was running 3x functionality and will rollback to 2x compatible functionality. If you don't see mitigation after a short period of time, please rollback to your previous application binaries by deploying the code with your previous KCL version”.  
**Ação necessária:** isso significa que os operadores estavam executando no modo da KCL 3.x e a Ferramenta de Migração da KCL mudou todos os operadores para o modo compatível com a KCL 1.x. Reimplante o código para os operadores com a versão anterior da KCL.

Mensagem 3  
“Application was already rolled back. Any KCLv3 resources that could be deleted were cleaned up to avoid charges until the application can be rolled forward with migration”.  
**Ação necessária:** isso significa que os operadores já foram revertidos para execução no modo compatível com a KCL 1.x. Reimplante o código para os operadores com a versão anterior da KCL.

# Avançar para a KCL 3.x após uma reversão
<a name="kcl-migration-rollforward"></a>

Este tópico explica como avançar sua aplicação de consumidor para a KCL 3.x após uma reversão. Quando precisar avançar, você deve concluir um processo de duas etapas:

1. Execute a [Ferramenta de Migração da KCL](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py).

1. Implantar o código com a KCL 3.x.

## Etapa 1: executar a Ferramenta de Migração da KCL
<a name="kcl-migration-rollforward-step1"></a>

Execute a Ferramenta de Migração da KCL com o seguinte comando para avançar para a KCL 3.x:

```
python3 ./KclMigrationTool.py --region region --mode rollforward [--application_name applicationName] [--coordinator_state_table_name coordinatorStateTableName]
```

### Parâmetros
<a name="kcl-migration-rollforward-parameters"></a>

`--region`  
Substitua *region* pela Região da AWS.

`--application_name`  
Esse parâmetro será obrigatório se você estiver usando nomes padrão para a tabela de estados do coordenador. Se você tiver especificado nomes personalizados para a tabela de estados do coordenador, poderá omitir esse parâmetro. Substitua *applicationName* pelo nome da aplicação da KCL. A ferramenta usa esse nome para obter os nomes de tabela padrão se os nomes personalizados não forem fornecidos.

`--coordinator_state_table_name`  
Esse parâmetro é necessário quando você define um nome personalizado para a tabela de estados do coordenador na configuração da KCL. Se você estiver usando o nome padrão da tabela, poderá omitir esse parâmetro. Substitua *coordinatorStateTableName* pelo nome da tabela personalizada que você especificou para a tabela de estados do coordenador.

Após a execução da Ferramenta de Migração no modo de avanço, o KCL cria os seguintes recursos do DynamoDB necessários para a KCL 3.x:
+ Um índice secundário global na tabela de concessões
+ Uma tabela de métricas do operador

## Etapa 2: implantar o código com a KCL 3.x
<a name="kcl-migration-rollforward-step2"></a>

Depois de executar a Ferramenta de Migração da KCL para um avanço, implante seu código com a KCL 3.x nos operadores. Para concluir sua migração, consulte [Step 8: Complete the migration](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-finish).

# Demonstração: DynamoDB Streams Kinesis Adapter
<a name="Streams.KCLAdapter.Walkthrough"></a>

Esta seção é uma demonstração de uma aplicação em Java que usa a Amazon Kinesis Client Library e o Amazon DynamoDB Streams Kinesis Adapter. A aplicação mostra um exemplo de replicação de dados, no qual as atividades de gravação de uma tabela são aplicadas a uma segunda tabela, com o conteúdo de ambas mantido em sincronia. Para o código-fonte, consulte [Programa completo: DynamoDB Streams Kinesis Adapter](Streams.KCLAdapter.Walkthrough.CompleteProgram.md).

O programa faz o seguinte:

1. Cria duas tabelas do DynamoDB chamadas `KCL-Demo-src` e `KCL-Demo-dst`. Cada uma dessas tabelas tem um fluxo habilitado.

1. Gera atividades de atualização na tabela de origem, adicionando, atualizando e excluindo itens. Isso faz com que os dados sejam gravados no fluxo da tabela.

1. Lê os registros do fluxo, faz a reconstrução desses registros como solicitações do DynamoDB e aplica essas solicitações à tabela de destino.

1. Verifica as tabelas de origem e destino para garantir que o conteúdo seja idêntico.

1. Realiza uma limpeza excluindo as tabelas.

Essas etapas estão descritas nas seções a seguir, e a aplicação completa é mostrada no final do passo-a-passo.

**Topics**
+ [Etapa 1: criar tabelas do DynamoDB](#Streams.KCLAdapter.Walkthrough.Step1)
+ [Etapa 2: gerar atividades de atualização na tabela de origem](#Streams.KCLAdapter.Walkthrough.Step2)
+ [Etapa 3: processar o fluxo](#Streams.KCLAdapter.Walkthrough.Step3)
+ [Etapa 4: garantir que as duas tabelas tenham conteúdo idêntico](#Streams.KCLAdapter.Walkthrough.Step4)
+ [Etapa 5: limpar](#Streams.KCLAdapter.Walkthrough.Step5)
+ [Programa completo: DynamoDB Streams Kinesis Adapter](Streams.KCLAdapter.Walkthrough.CompleteProgram.md)

## Etapa 1: criar tabelas do DynamoDB
<a name="Streams.KCLAdapter.Walkthrough.Step1"></a>

A primeira etapa é criar duas tabelas do DynamoDB: uma de origem e outra de destino. O `StreamViewType` no fluxo da tabela de origem é `NEW_IMAGE`. Isso significa que sempre que um item é modificado nessa tabela, o item “depois” da imagem é gravado no fluxo. Dessa forma, o fluxo mantém o controle de todas as atividades de gravação na tabela.

O exemplo a seguir mostra o código usado para a criação das duas tabelas.

```
java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N"));

java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition
                                                                                         // key

ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L)
    .withWriteCapacityUnits(2L);

StreamSpecification streamSpecification = new StreamSpecification();
streamSpecification.setStreamEnabled(true);
streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE);
CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName)
    .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema)
    .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);
```

## Etapa 2: gerar atividades de atualização na tabela de origem
<a name="Streams.KCLAdapter.Walkthrough.Step2"></a>

O próximo passo é gerar algumas atividades de gravação na tabela de origem. Enquanto essas atividades estão ocorrendo, o fluxo da tabela de origem também é atualizado quase em tempo real.

A aplicação define uma classe auxiliar com métodos que chamam as operações da API `PutItem`, `UpdateItem` e `DeleteItem` para gravar os dados. O exemplo de código a seguir mostra como esses métodos são usados.

```
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101");
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");
```

## Etapa 3: processar o fluxo
<a name="Streams.KCLAdapter.Walkthrough.Step3"></a>

Agora, o programa inicia o processamento do fluxo. O DynamoDB Streams Kinesis Adapter atua como uma camada transparente entre a KCL e o endpoint do DynamoDB Streams para que o código possa usar totalmente a KCL em vez de precisar fazer chamadas de baixo nível ao DynamoDB Streams. O programa realiza as seguintes tarefas:
+ Ele define uma classe de processador de registro, `StreamsRecordProcessor`, com métodos que estão em conformidade com a definição de interface da KCL: `initialize`, `processRecords` e `shutdown`. O método `processRecords` contém a lógica necessária para leituras do fluxo da tabela de origem e para gravações na tabela de destino.
+ Ele define uma fábrica de classes para a classe de processador de registro (`StreamsRecordProcessorFactory`). Isso é necessário para programas Java que usam a KCL.
+ Ele instancia um novo `Worker` da KCL, que está associado à fábrica de classes.
+ Ele desliga `Worker` quando o processamento do registro é concluído.

Opcionalmente, habilite o modo de recuperação na configuração do adaptador da KCL do Streams para escalar automaticamente a taxa de chamadas de API GetRecords em três vezes (padrão) quando o atraso no processamento de fluxos exceder um minuto (padrão), o que ajuda o consumidor de fluxos a lidar com altos picos de throughput na tabela.

Para saber mais sobre a definição da interface da KCL, consulte [Desenvolvimento de consumidores usando a Amazon Kinesis Client Library](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) no *Guia do desenvolvedor do Amazon Kinesis Data Streams*. 

O exemplo de código a seguir mostra o loop principal em `StreamsRecordProcessor`. A instrução `case` determina a ação a ser executada, com base no `OperationType` que aparece no registro de fluxo.

```
for (Record record : records) {
    String data = new String(record.getData().array(), Charset.forName("UTF-8"));
    System.out.println(data);
    if (record instanceof RecordAdapter) {
                software.amazon.dynamodb.model.Record streamRecord = ((RecordAdapter) record)
                    .getInternalObject();

                switch (streamRecord.getEventName()) {
                    case "INSERT":
                    case "MODIFY":
                        StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getNewImage());
                        break;
                    case "REMOVE":
                        StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getKeys().get("Id").getN());
                }
    }
    checkpointCounter += 1;
    if (checkpointCounter % 10 == 0) {
        try {
            checkpointer.checkpoint();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

## Etapa 4: garantir que as duas tabelas tenham conteúdo idêntico
<a name="Streams.KCLAdapter.Walkthrough.Step4"></a>

Neste ponto, o conteúdo das tabelas de origem e destino está sincronizado. A aplicação emite solicitações `Scan` em ambas as tabelas para verificar se o conteúdo delas é realmente idêntico.

A classe `DemoHelper` contém um método `ScanTable` que chama a API de `Scan` de baixo nível. O exemplo a seguir mostra como fazer isso.

```
if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems()
    .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) {
    System.out.println("Scan result is equal.");
}
else {
    System.out.println("Tables are different!");
}
```

## Etapa 5: limpar
<a name="Streams.KCLAdapter.Walkthrough.Step5"></a>

A demonstração está concluída e, portanto, a aplicação exclui as tabelas de origem e destino. Consulte o seguinte exemplo de código. Mesmo depois que as tabelas são excluídas, seus fluxos permanecem disponíveis por até 24 horas. Após esse período, eles serão automaticamente excluídos.

```
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable));
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));
```

# Programa completo: DynamoDB Streams Kinesis Adapter
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram"></a>

Veja a seguir o programa Java completo que realiza as tarefas descritas em [Demonstração: DynamoDB Streams Kinesis Adapter](Streams.KCLAdapter.Walkthrough.md). Quando executá-lo, você verá uma saída semelhante à seguinte:

```
Creating table KCL-Demo-src
Creating table KCL-Demo-dest
Table is active.
Creating worker for stream: arn:aws:dynamodb:us-west-2:111122223333:table/KCL-Demo-src/stream/2015-05-19T22:48:56.601
Starting worker...
Scan result is equal.
Done.
```

**Importante**  
 Para executar esse programa, verifique se a aplicação cliente tem acesso ao DynamoDB e ao Amazon CloudWatch usando políticas. Para obter mais informações, consulte [Políticas baseadas em identidade para o DynamoDB](security_iam_service-with-iam.md#security_iam_service-with-iam-id-based-policies). 

O código-fonte consiste em quatro arquivos `.java`. Para criar esse programa, adicione a seguinte dependência, que inclui a Amazon Kinesis Client Library (KCL) 3.x e o AWS SDK para Java v2 como dependências temporárias:

------
#### [ Maven ]

```
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>dynamodb-streams-kinesis-adapter</artifactId>
    <version>2.1.0</version>
</dependency>
```

------
#### [ Gradle ]

```
implementation 'com.amazonaws:dynamodb-streams-kinesis-adapter:2.1.0'
```

------

Os arquivos de origem são:
+ `StreamsAdapterDemo.java`
+ `StreamsRecordProcessor.java`
+ `StreamsRecordProcessorFactory.java`
+ `StreamsAdapterDemoHelper.java`

## StreamsAdapterDemo.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsAdapterDemo"></a>

```
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory;
import com.amazonaws.services.dynamodbv2.streamsadapter.polling.DynamoDBStreamsPollingConfig;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.StreamTracker;
import software.amazon.kinesis.retrieval.RetrievalConfig;

public class StreamsAdapterDemo {

    private static DynamoDbAsyncClient dynamoDbAsyncClient;
    private static CloudWatchAsyncClient cloudWatchAsyncClient;
    private static AmazonDynamoDBStreamsAdapterClient amazonDynamoDbStreamsAdapterClient;

    private static String tablePrefix = "KCL-Demo";
    private static String streamArn;

    private static Region region = Region.US_EAST_1;
    private static AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();

    public static void main( String[] args ) throws Exception {
        System.out.println("Starting demo...");
        dynamoDbAsyncClient = DynamoDbAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .region(region)
                .build();
        cloudWatchAsyncClient = CloudWatchAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .region(region)
                .build();
        amazonDynamoDbStreamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(credentialsProvider, region);

        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";

        setUpTables();

        StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(streamArn,
                InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON));

        ShardRecordProcessorFactory shardRecordProcessorFactory =
                new StreamsAdapterDemoProcessorFactory(dynamoDbAsyncClient, destTable);

        ConfigsBuilder configsBuilder = new ConfigsBuilder(
                streamTracker,
                "streams-adapter-demo",
                amazonDynamoDbStreamsAdapterClient,
                dynamoDbAsyncClient,
                cloudWatchAsyncClient,
                "streams-demo-worker",
                shardRecordProcessorFactory
        );

        DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(amazonDynamoDbStreamsAdapterClient);
        RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
        retrievalConfig.retrievalSpecificConfig(pollingConfig);

        System.out.println("Creating scheduler for stream " + streamArn);
        Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                retrievalConfig,
                amazonDynamoDbStreamsAdapterClient
        );

        System.out.println("Starting scheduler...");
        Thread t = new Thread(scheduler);
        t.start();

        Thread.sleep(250000);

        System.out.println("Stopping scheduler...");
        scheduler.shutdown();
        t.join();

        if (StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, srcTable).items()
                .equals(StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, destTable).items())) {
            System.out.println("Scan result is equal.");
        } else {
            System.out.println("Tables are different!");
        }

        System.out.println("Done.");
        cleanupAndExit(0);
    }

    private static void setUpTables() {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        streamArn = StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, srcTable);
        StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, destTable);

        awaitTableCreation(srcTable);

        performOps(srcTable);
    }

    private static void awaitTableCreation(String tableName) {
        Integer retries = 0;
        Boolean created = false;
        while (!created && retries < 100) {
            DescribeTableResponse result = StreamsAdapterDemoHelper.describeTable(dynamoDbAsyncClient, tableName);
            created = result.table().tableStatusAsString().equals("ACTIVE");
            if (created) {
                System.out.println("Table is active.");
                return;
            } else {
                retries++;
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // do nothing
                }
            }
        }
        System.out.println("Timeout after table creation. Exiting...");
        cleanupAndExit(1);
    }

    private static void performOps(String tableName) {
        StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "101", "test1");
        StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "101", "test2");
        StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "101");
        StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "102", "demo3");
        StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "102", "demo4");
        StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "102");
    }

    private static void cleanupAndExit(Integer returnValue) {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(srcTable).build());
        dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(destTable).build());
        System.exit(returnValue);
    }
}
```

## StreamsRecordProcessor.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsRecordProcessor"></a>

```
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    private Integer checkpointCounter;

    private final DynamoDbAsyncClient dynamoDbAsyncClient;
    private final String tableName;

    public StreamsRecordProcessor(DynamoDbAsyncClient dynamoDbAsyncClient, String tableName) {
        this.dynamoDbAsyncClient = dynamoDbAsyncClient;
        this.tableName = tableName;
    }

    @Override
    public void initialize(InitializationInput initializationInput) {
        this.checkpointCounter = 0;
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput dynamoDBStreamsProcessRecordsInput) {
        for (DynamoDBStreamsClientRecord record: dynamoDBStreamsProcessRecordsInput.records()) {
            String data = new String(record.data().array(), StandardCharsets.UTF_8);
            System.out.println(data);
            Record streamRecord = record.getRecord();

            switch (streamRecord.eventName()) {
                case INSERT:
                case MODIFY:
                    StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().newImage());
                case REMOVE:
                    StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().keys().get("Id").n());
            }
            checkpointCounter += 1;
            if (checkpointCounter % 10 == 0) {
                try {
                    dynamoDBStreamsProcessRecordsInput.checkpointer().checkpoint();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        System.out.println("Lease Lost");
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }
}
```

## StreamsRecordProcessorFactory.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsRecordProcessorFactory"></a>

```
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsAdapterDemoProcessorFactory implements ShardRecordProcessorFactory {
    private final String tableName;
    private final DynamoDbAsyncClient dynamoDbAsyncClient;

    public StreamsAdapterDemoProcessorFactory(DynamoDbAsyncClient asyncClient, String tableName) {
        this.tableName = tableName;
        this.dynamoDbAsyncClient = asyncClient;
    }

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor(dynamoDbAsyncClient, tableName);
    }
}
```

## StreamsAdapterDemoHelper.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsAdapterDemoHelper"></a>

```
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.OnDemandThroughput;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.StreamSpecification;
import software.amazon.awssdk.services.dynamodb.model.StreamViewType;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamsAdapterDemoHelper {

    /**
     * @return StreamArn
     */
    public static String createTable(DynamoDbAsyncClient client, String tableName) {
        List<AttributeDefinition> attributeDefinitions = new ArrayList<>();
        attributeDefinitions.add(AttributeDefinition.builder()
                .attributeName("Id")
                .attributeType("N")
                .build());

        List<KeySchemaElement> keySchema = new ArrayList<>();
        keySchema.add(KeySchemaElement.builder()
                .attributeName("Id")
                .keyType(KeyType.HASH) // Partition key
                .build());

        StreamSpecification streamSpecification = StreamSpecification.builder()
                .streamEnabled(true)
                .streamViewType(StreamViewType.NEW_IMAGE)
                .build();

        CreateTableRequest createTableRequest = CreateTableRequest.builder()
                .tableName(tableName)
                .attributeDefinitions(attributeDefinitions)
                .keySchema(keySchema)
                .billingMode(BillingMode.PAY_PER_REQUEST)
                .streamSpecification(streamSpecification)
                .build();

        try {
            System.out.println("Creating table " + tableName);
            CreateTableResponse result = client.createTable(createTableRequest).join();
            return result.tableDescription().latestStreamArn();
        } catch (Exception e) {
            if (e.getCause() instanceof ResourceInUseException) {
                System.out.println("Table already exists.");
                return describeTable(client, tableName).table().latestStreamArn();
            }
            throw e;
        }
    }

    public static DescribeTableResponse describeTable(DynamoDbAsyncClient client, String tableName) {
        return client.describeTable(DescribeTableRequest.builder()
                        .tableName(tableName)
                        .build())
                .join();
    }

    public static ScanResponse scanTable(DynamoDbAsyncClient dynamoDbClient, String tableName) {
        return dynamoDbClient.scan(ScanRequest.builder()
                        .tableName(tableName)
                        .build())
                .join();
    }

    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> item = new HashMap<>();
        item.put("Id", AttributeValue.builder().n(id).build());
        item.put("attribute-1", AttributeValue.builder().s(val).build());

        putItem(dynamoDbClient, tableName, item);
    }

    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName,
                               Map<String, AttributeValue> items) {
        PutItemRequest putItemRequest = PutItemRequest.builder()
                .tableName(tableName)
                .item(items)
                .build();
        dynamoDbClient.putItem(putItemRequest).join();
    }

    public static void updateItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());

        Map<String, String> expressionAttributeNames = new HashMap<>();
        expressionAttributeNames.put("#attr2", "attribute-2");

        Map<String, AttributeValue> expressionAttributeValues = new HashMap<>();
        expressionAttributeValues.put(":val", AttributeValue.builder().s(val).build());

        UpdateItemRequest updateItemRequest = UpdateItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .updateExpression("SET #attr2 = :val")
                .expressionAttributeNames(expressionAttributeNames)
                .expressionAttributeValues(expressionAttributeValues)
                .build();

        dynamoDbClient.updateItem(updateItemRequest).join();
    }

    public static void deleteItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());

        DeleteItemRequest deleteItemRequest = DeleteItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .build();
        dynamoDbClient.deleteItem(deleteItemRequest).join();
    }
}
```