Seleccione sus preferencias de cookies

Usamos cookies esenciales y herramientas similares que son necesarias para proporcionar nuestro sitio y nuestros servicios. Usamos cookies de rendimiento para recopilar estadísticas anónimas para que podamos entender cómo los clientes usan nuestro sitio y hacer mejoras. Las cookies esenciales no se pueden desactivar, pero puede hacer clic en “Personalizar” o “Rechazar” para rechazar las cookies de rendimiento.

Si está de acuerdo, AWS y los terceros aprobados también utilizarán cookies para proporcionar características útiles del sitio, recordar sus preferencias y mostrar contenido relevante, incluida publicidad relevante. Para aceptar o rechazar todas las cookies no esenciales, haga clic en “Aceptar” o “Rechazar”. Para elegir opciones más detalladas, haga clic en “Personalizar”.

Introducción a Kinesis Data Streams para Amazon DynamoDB

Modo de enfoque
Introducción a Kinesis Data Streams para Amazon DynamoDB - Amazon DynamoDB

En esta sección se describe cómo usar las tablas de Kinesis Data Streams para Amazon DynamoDB con la consola de Amazon DynamoDB, la AWS Command Line Interface (AWS CLI) y la API.

Creación de un flujo de datos de Amazon Kinesis activo

Todos estos ejemplos utilizan la tabla Music de DynamoDB que se creó como parte del tutorial Introducción a DynamoDB.

Para obtener más información sobre cómo crear consumidores y conectar el flujo de datos de Kinesis a otros servicios de AWS, consulte Lectura de datos de Kinesis Data Streams en la Guía para desarrolladores de Amazon Kinesis Data Streams.

nota

Cuando utilice particiones de KDS por primera vez, le recomendamos configurar los fragmentos para escalar verticalmente y reducir verticalmente según los patrones de uso. Cuando haya acumulado más datos sobre los patrones de uso, podrá ajustar las particiones de la transmisión para que coincidan.

Console
  1. Inicie sesión en la AWS Management Console y abra la consola de Kinesis en https://console.aws.amazon.com/kinesis/.

  2. Seleccionar Creación de flujos de datos y siga las instrucciones para crear una transmisión llamada samplestream.

  3. Abra la consola de DynamoDB en https://console.aws.amazon.com/dynamodb/.

  4. En el panel de navegación del lado izquierdo de la consola, elija Tablas.

  5. Elija la tabla Music.

  6. Elija la pestaña Exports and streams (Exportaciones y flujos).

  7. (Opcional) En Detalles de Amazon Kinesis Data Streams, puede cambiar la precisión de la marca de tiempo del registro de microsegundos (predeterminado) a milisegundos.

  8. Elija samplestream de la lista desplegable.

  9. Seleccione el botón Activar.

AWS CLI
  1. Cree un flujo de datos de Kinesis llamado samplestream mediante el uso del comando create-stream.

    aws kinesis create-stream --stream-name samplestream --shard-count 3

    Consulte Consideraciones sobre la administración de particiones para Kinesis Data Streams antes de configurar el número de fragmentos para el flujo de datos de Kinesis.

  2. Verifique que la transmisión de Kinesis esté activa y lista para su uso mediante el comando describe-stream.

    aws kinesis describe-stream --stream-name samplestream
  3. Habilite el streaming de Kinesis en la tabla de DynamoDB mediante el enable-kinesis-streaming-destination comando. Reemplace el valor de stream-arn por el que describe-stream devolvió en el paso anterior. Si lo desea, active la transmisión con una precisión más detallada (microsegundos) de los valores de marca de tiempo devueltos en cada registro.

    Active la transmisión con una precisión de marca de tiempo de microsegundos:

    aws dynamodb enable-kinesis-streaming-destination \ --table-name Music \ --stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream --enable-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND

    O active la transmisión con la precisión de marca de tiempo predeterminada (milisegundos):

    aws dynamodb enable-kinesis-streaming-destination \ --table-name Music \ --stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream
  4. Verifique si el streaming de Kinesis está activa en la tabla mediante el comando describe-kinesis-streaming-destination de DynamoDB.

    aws dynamodb describe-kinesis-streaming-destination --table-name Music
  5. Escribir datos en la tabla de DynamoDB utilizando el comando put-item, tal y como se describe en la Guía para desarrolladores de DynamoDB.

    aws dynamodb put-item \ --table-name Music \ --item \ '{"Artist": {"S": "No One You Know"}, "SongTitle": {"S": "Call Me Today"}, "AlbumTitle": {"S": "Somewhat Famous"}, "Awards": {"N": "1"}}' aws dynamodb put-item \ --table-name Music \ --item \ '{"Artist": {"S": "Acme Band"}, "SongTitle": {"S": "Happy Day"}, "AlbumTitle": {"S": "Songs About Life"}, "Awards": {"N": "10"} }'
  6. Uso del comando de la CLI get-records para Kinesis para recuperar el contenido del flujo de Kinesis. A continuación, utilice el siguiente fragmento de código para deserializar el contenido de la transmisión.

    /** * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example. */ public void processRecord(Record kinesisRecord) throws IOException { ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData(); JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array()); JsonNode dynamoDBRecord = rootNode.get("dynamodb"); JsonNode oldItemImage = dynamoDBRecord.get("OldImage"); JsonNode newItemImage = dynamoDBRecord.get("NewImage"); Instant recordTimestamp = fetchTimestamp(dynamoDBRecord); /** * Say for example our record contains a String attribute named "stringName" and we want to fetch the value * of this attribute from the new item image. The following code fetches this value. */ JsonNode attributeNode = newItemImage.get("stringName"); JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute String attributeValue = attributeValueNode.textValue(); System.out.println(attributeValue); } private Instant fetchTimestamp(JsonNode dynamoDBRecord) { JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime"); JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision"); if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) { return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS); } return Instant.ofEpochMilli(timestampJson.longValue()); }
Java
  1. Siga las instrucciones de la guía para desarrolladores de Kinesis Data Streams para crear una secuencia de datos de Kinesis llamada samplestream usando Java.

    Consulte Consideraciones sobre la administración de particiones para Kinesis Data Streams antes de configurar el número de fragmentos para el flujo de datos de Kinesis.

  2. Use el siguiente fragmento de código para habilitar el streaming de Kinesis en la tabla de DynamoDB. Si lo desea, active la transmisión con una precisión más detallada (microsegundos) de los valores de marca de tiempo devueltos en cada registro.

    Active la transmisión con una precisión de marca de tiempo de microsegundos:

    EnableKinesisStreamingConfiguration enableKdsConfig = EnableKinesisStreamingConfiguration.builder() .approximateCreationDateTimePrecision(ApproximateCreationDateTimePrecision.MICROSECOND) .build(); EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder() .tableName(tableName) .streamArn(kdsArn) .enableKinesisStreamingConfiguration(enableKdsConfig) .build(); EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);

    O active la transmisión con la precisión de marca de tiempo predeterminada (milisegundos):

    EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder() .tableName(tableName) .streamArn(kdsArn) .build(); EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
  3. Siga las instrucciones de la Guía para desarrolladores de Kinesis Data Streams para leer desde el flujo de datos creado.

  4. Utilice el siguiente fragmento de código para deserializar el contenido del flujo.

    /** * Takes as input a Record fetched from Kinesis and does arbitrary processing as an example. */ public void processRecord(Record kinesisRecord) throws IOException { ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData(); JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array()); JsonNode dynamoDBRecord = rootNode.get("dynamodb"); JsonNode oldItemImage = dynamoDBRecord.get("OldImage"); JsonNode newItemImage = dynamoDBRecord.get("NewImage"); Instant recordTimestamp = fetchTimestamp(dynamoDBRecord); /** * Say for example our record contains a String attribute named "stringName" and we wanted to fetch the value * of this attribute from the new item image, the below code would fetch this. */ JsonNode attributeNode = newItemImage.get("stringName"); JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute String attributeValue = attributeValueNode.textValue(); System.out.println(attributeValue); } private Instant fetchTimestamp(JsonNode dynamoDBRecord) { JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime"); JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision"); if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) { return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS); } return Instant.ofEpochMilli(timestampJson.longValue()); }
  1. Inicie sesión en la AWS Management Console y abra la consola de Kinesis en https://console.aws.amazon.com/kinesis/.

  2. Seleccionar Creación de flujos de datos y siga las instrucciones para crear una transmisión llamada samplestream.

  3. Abra la consola de DynamoDB en https://console.aws.amazon.com/dynamodb/.

  4. En el panel de navegación del lado izquierdo de la consola, elija Tablas.

  5. Elija la tabla Music.

  6. Elija la pestaña Exports and streams (Exportaciones y flujos).

  7. (Opcional) En Detalles de Amazon Kinesis Data Streams, puede cambiar la precisión de la marca de tiempo del registro de microsegundos (predeterminado) a milisegundos.

  8. Elija samplestream de la lista desplegable.

  9. Seleccione el botón Activar.

Aplicación de cambios en un flujo de datos de Amazon Kinesis activo

En esta sección se describe cómo realizar cambios en una configuración de Kinesis Data Streams para DynamoDB desde la consola, la AWS CLI o la API.

AWS Management Console

  1. Abra la consola de DynamoDB desde ttps://console.aws.amazon.com/dynamodb/.

  2. Busque su tabla.

  3. Elija la pestaña Exportaciones y flujos.

AWS CLI

  1. Llame a describe-kinesis-streaming-destination para confirmar que el flujo es ACTIVE.

  2. Llame a UpdateKinesisStreamingDestination, como en este ejemplo:

    aws dynamodb update-kinesis-streaming-destination --table-name enable_test_table --stream-arn arn:aws:kinesis:us-east-1:12345678901:stream/enable_test_stream --update-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
  3. Llame a describe-kinesis-streaming-destination para confirmar que el flujo es UPDATING.

  4. Llame a describe-kinesis-streaming-destination periódicamente hasta que el estado de la transmisión sea ACTIVE de nuevo. Las actualizaciones de precisión de la marca de tiempo suelen aplicarse al cabo de unos cinco minutos. Una vez actualizado el estado, significa que la actualización se ha completado y que el nuevo valor de precisión se aplicará a los registros futuros.

  5. Escriba en la tabla con putItem.

  6. Uso el comando get-records de Kinesis para recuperar el contenido del flujo.

  7. Confirme que el ApproximateCreationDateTime de las escrituras tienen la precisión deseada.

API de Java

  1. Proporcione un fragmento de código que construya una solicitud UpdateKinesisStreamingDestination y una respuesta UpdateKinesisStreamingDestination.

  2. Proporcione un fragmento de código que construya una solicitud DescribeKinesisStreamingDestination y una DescribeKinesisStreamingDestination response.

  3. Llame a describe-kinesis-streaming-destination periódicamente hasta que el estado del flujo sea ACTIVE de nuevo, lo que significa que la actualización se ha completado y que el nuevo valor de precisión se aplicará a los registros futuros.

  4. Realice escrituras en la tabla.

  5. Lea el flujo y deserialice el contenido del flujo.

  6. Confirme que el ApproximateCreationDateTime de las escrituras tienen la precisión deseada.

PrivacidadTérminos del sitioPreferencias de cookies
© 2025, Amazon Web Services, Inc o sus afiliados. Todos los derechos reservados.