Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Erste Schritte mit Kinesis Data Streams für Amazon DynamoDB
In diesem Abschnitt wird beschrieben, wie Kinesis Data Streams für Amazon DynamoDB-Tabellen mit der Amazon DynamoDB DynamoDB-Konsole, der AWS Command Line Interface (AWS CLI) und der API verwendet wird.
Einen aktiven Amazon Kinesis Kinesis-Datenstream erstellen
Alle diese Beispiele verwenden die Music
-DynamoDB-Tabelle, die als Teil des Erste Schritte mit DynamoDB-Tutorials erstellt wurde.
Weitere Informationen zum Erstellen von Konsumenten und zum Verbinden Ihres Kinesis Data Streams mit anderen AWS -Services finden Sie unter Lesen von Daten aus Amazon Kinesis Data Streams im Amazon-Kinesis-Data-Streams-Entwicklerhandbuch.
Wenn Sie zum ersten Mal KDS-Shards verwenden, empfehlen wir, Ihre Shards so einzustellen, dass sie den Nutzungsmustern entsprechend hoch- und herunterskaliert werden. Nachdem Sie mehr Daten zu den Nutzungsmustern gesammelt haben, können Sie die Shards in Ihrem Datenstrom entsprechend anpassen.
- Console
-
-
Melden Sie sich bei der an AWS Management Console und öffnen Sie die Kinesis-Konsole unter https://console.aws.amazon.com/kinesis/.
-
Klicken Sie auf Create data stream (Datenstrom erstellen) und befolgen Sie die Anweisungen, um einen Stream mit dem Namen samplestream
zu erstellen.
-
Öffnen Sie die DynamoDB-Konsole unter. https://console.aws.amazon.com/dynamodb/
-
Klicken Sie im Navigationsbereich auf der linken Seite der Konsole auf Tables (Tabellen).
-
Wählen Sie die Tabelle Music (Musik).
-
Wählen Sie die Registerkarte Exports and streams (Exporte und Streams).
-
(Optional) Unter Amazon Kinesis Kinesis-Datenstream-Details können Sie die Genauigkeit des Aufzeichnungszeitstempels von Mikrosekunde (Standard) auf Millisekunde ändern.
-
Klicken Sie auf samplestream aus der Dropdown-Liste.
-
Wählen Sie die Schaltfläche „Einschalten“.
- AWS CLI
-
-
Erstellen Sie einen Kinesis Data Stream mit dem Namen samplestream
, indem Sie den Befehl create-stream wählen.
aws kinesis create-stream --stream-name samplestream --shard-count 3
Sehen Sie sich Überlegungen zur Shard-Verwaltung für Kinesis Data Streams an, bevor Sie die Anzahl der Shards für den Kinesis Data Stream festlegen.
-
Überprüfen Sie, ob der Kinesis Stream aktiv und einsatzbereit ist, indem Sie den Befehl Stream beschreiben auswählen.
aws kinesis describe-stream --stream-name samplestream
-
Aktivieren Sie Kinesis-Streaming für die DynamoDB-Tabelle mithilfe des DynamoDB-Befehls enable-kinesis-streaming-destination
. Ersetzen Sie den stream-arn
-Wert durch den Wert, der im vorherigen Schritt von describe-stream
zurückgegeben wurde. Aktivieren Sie optional das Streaming mit einer detaillierteren Genauigkeit (Mikrosekunden) der für jeden Datensatz zurückgegebenen Zeitstempelwerte.
Aktivieren Sie Streaming mit Zeitstempelgenauigkeit im Mikrosekundenbereich:
aws dynamodb enable-kinesis-streaming-destination \
--table-name Music \
--stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream
--enable-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
Oder aktivieren Sie das Streaming mit der Standard-Zeitstempelgenauigkeit (Millisekunde):
aws dynamodb enable-kinesis-streaming-destination \
--table-name Music \
--stream-arn arn:aws:kinesis:us-west-2:12345678901:stream/samplestream
-
Überprüfen Sie, ob Kinesis-Streaming in der Tabelle aktiv ist, mithilfe des DynamoDB-describe-kinesis-streaming-destination
-Befehls.
aws dynamodb describe-kinesis-streaming-destination --table-name Music
-
Schreiben Sie Daten in die DynamoDB-Tabelle mithilfe der put-item
, wie in dem Entwicklerhandbuch von DynamoDB angegeben ist.
aws dynamodb put-item \
--table-name Music \
--item \
'{"Artist": {"S": "No One You Know"}, "SongTitle": {"S": "Call Me Today"}, "AlbumTitle": {"S": "Somewhat Famous"}, "Awards": {"N": "1"}}'
aws dynamodb put-item \
--table-name Music \
--item \
'{"Artist": {"S": "Acme Band"}, "SongTitle": {"S": "Happy Day"}, "AlbumTitle": {"S": "Songs About Life"}, "Awards": {"N": "10"} }'
-
Verwenden Sie den Kinesis-CLI-Befehl get-records, um den Inhalt des Kinesis Streams abzurufen. Verwenden Sie dann den folgenden Codeausschnitt, um den Streaminhalt zu deserialisieren.
/**
* Takes as input a Record fetched from Kinesis and does arbitrary processing as an example.
*/
public void processRecord(Record kinesisRecord) throws IOException {
ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData();
JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array());
JsonNode dynamoDBRecord = rootNode.get("dynamodb");
JsonNode oldItemImage = dynamoDBRecord.get("OldImage");
JsonNode newItemImage = dynamoDBRecord.get("NewImage");
Instant recordTimestamp = fetchTimestamp(dynamoDBRecord);
/**
* Say for example our record contains a String attribute named "stringName" and we want to fetch the value
* of this attribute from the new item image. The following code fetches this value.
*/
JsonNode attributeNode = newItemImage.get("stringName");
JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute
String attributeValue = attributeValueNode.textValue();
System.out.println(attributeValue);
}
private Instant fetchTimestamp(JsonNode dynamoDBRecord) {
JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime");
JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision");
if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) {
return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS);
}
return Instant.ofEpochMilli(timestampJson.longValue());
}
- Java
-
-
Befolgen Sie die Anweisungen im Kinesis-Data-Streams-Entwicklerhandbuch, um einen Kinesis Data Stream mit samplestream
Java-Namen zu erstellen.
Sehen Sie sich Überlegungen zur Shard-Verwaltung für Kinesis Data Streams an, bevor Sie die Anzahl der Shards für den Kinesis Data Stream festlegen.
-
Verwenden den folgenden Codeausschnitt , um Kinesis -Streaming für die DynamoDB-Tabelle zu aktivieren. Aktivieren Sie optional das Streaming mit einer detaillierteren Genauigkeit (Mikrosekunden) der für jeden Datensatz zurückgegebenen Zeitstempelwerte.
Aktivieren Sie Streaming mit Zeitstempelgenauigkeit im Mikrosekundenbereich:
EnableKinesisStreamingConfiguration enableKdsConfig = EnableKinesisStreamingConfiguration.builder()
.approximateCreationDateTimePrecision(ApproximateCreationDateTimePrecision.MICROSECOND)
.build();
EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder()
.tableName(tableName)
.streamArn(kdsArn)
.enableKinesisStreamingConfiguration(enableKdsConfig)
.build();
EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
Oder aktivieren Sie das Streaming mit der Standard-Zeitstempelgenauigkeit (Millisekunde):
EnableKinesisStreamingDestinationRequest enableKdsRequest = EnableKinesisStreamingDestinationRequest.builder()
.tableName(tableName)
.streamArn(kdsArn)
.build();
EnableKinesisStreamingDestinationResponse enableKdsResponse = ddbClient.enableKinesisStreamingDestination(enableKdsRequest);
-
Befolgen Sie die Anweisungen im Kinesis-Data-Streams-Entwicklerhandbuch, um aus dem erstellten Datenstrom zu lesen.
-
Verwenden Sie dann den folgenden Codeausschnitt, um den Streaminhalt zu deserialisieren
/**
* Takes as input a Record fetched from Kinesis and does arbitrary processing as an example.
*/
public void processRecord(Record kinesisRecord) throws IOException {
ByteBuffer kdsRecordByteBuffer = kinesisRecord.getData();
JsonNode rootNode = OBJECT_MAPPER.readTree(kdsRecordByteBuffer.array());
JsonNode dynamoDBRecord = rootNode.get("dynamodb");
JsonNode oldItemImage = dynamoDBRecord.get("OldImage");
JsonNode newItemImage = dynamoDBRecord.get("NewImage");
Instant recordTimestamp = fetchTimestamp(dynamoDBRecord);
/**
* Say for example our record contains a String attribute named "stringName" and we wanted to fetch the value
* of this attribute from the new item image, the below code would fetch this.
*/
JsonNode attributeNode = newItemImage.get("stringName");
JsonNode attributeValueNode = attributeNode.get("S"); // Using DynamoDB "S" type attribute
String attributeValue = attributeValueNode.textValue();
System.out.println(attributeValue);
}
private Instant fetchTimestamp(JsonNode dynamoDBRecord) {
JsonNode timestampJson = dynamoDBRecord.get("ApproximateCreationDateTime");
JsonNode timestampPrecisionJson = dynamoDBRecord.get("ApproximateCreationDateTimePrecision");
if (timestampPrecisionJson != null && timestampPrecisionJson.equals("MICROSECOND")) {
return Instant.EPOCH.plus(timestampJson.longValue(), ChronoUnit.MICROS);
}
return Instant.ofEpochMilli(timestampJson.longValue());
}
Änderungen an einem aktiven Amazon Kinesis Kinesis-Datenstream vornehmen
In diesem Abschnitt wird beschrieben, wie Sie mithilfe der Konsole und der API Änderungen an einem aktiven Kinesis Data Streams for DynamoDB-Setup vornehmen. AWS CLI
AWS Management Console
AWS CLI
-
Rufen Sie andescribe-kinesis-streaming-destination
, um zu bestätigen, dass es sich um einen Stream handeltACTIVE
.
-
Rufen Sie anUpdateKinesisStreamingDestination
, wie in diesem Beispiel:
aws dynamodb update-kinesis-streaming-destination --table-name enable_test_table --stream-arn arn:aws:kinesis:us-east-1:12345678901:stream/enable_test_stream --update-kinesis-streaming-configuration ApproximateCreationDateTimePrecision=MICROSECOND
-
Rufen Sie andescribe-kinesis-streaming-destination
, um zu bestätigen, dass es sich um einen Stream handeltUPDATING
.
-
Rufen Sie describe-kinesis-streaming-destination
regelmäßig an, bis der Streaming-Status ACTIVE
wieder angezeigt wird. In der Regel dauert es bis zu 5 Minuten, bis die Timestamp-Präzisions-Updates wirksam werden. Sobald dieser Status aktualisiert wird, bedeutet dies, dass die Aktualisierung abgeschlossen ist und der neue Genauigkeitswert auf future Datensätze angewendet wird.
-
Schreiben Sie in die Tabelle mitputItem
.
-
Verwenden Sie den get-records
Kinesis-Befehl, um den Stream-Inhalt abzurufen.
-
Stellen Sie sicher, dass ApproximateCreationDateTime
die Schreibvorgänge die gewünschte Genauigkeit haben.
Java-API
-
Stellen Sie einen Codeausschnitt bereit, der eine UpdateKinesisStreamingDestination
Anfrage und eine Antwort erstellt. UpdateKinesisStreamingDestination
-
Stellen Sie einen Codeausschnitt bereit, der eine Anfrage und eine erstellt. DescribeKinesisStreamingDestination
DescribeKinesisStreamingDestination response
-
Rufen Sie describe-kinesis-streaming-destination
regelmäßig an, bis der Streaming-Status ACTIVE
wieder angezeigt wird. Dies bedeutet, dass die Aktualisierung abgeschlossen ist und der neue Genauigkeitswert auf future Datensätze angewendet wird.
-
Führt Schreibvorgänge in die Tabelle durch.
-
Lesen Sie aus dem Stream und deserialisieren Sie den Stream-Inhalt.
-
Vergewissern Sie sich, dass ApproximateCreationDateTime
die Schreibvorgänge die gewünschte Genauigkeit haben.