Kinesis-Beispiele unter Verwendung von SDK für JavaScript (v3) - AWS-SDK-Codebeispiele

Weitere AWS-SDK-Beispiele sind im GitHub-Repository Beispiele für AWS Doc SDKs verfügbar.

Kinesis-Beispiele unter Verwendung von SDK für JavaScript (v3)

Die folgenden Codebeispiele zeigen, wie Sie Aktionen durchführen und gängige Szenarien implementieren, indem Sie AWS SDK für JavaScript (v3) mit Kinesis nutzen.

Aktionen sind Codeauszüge aus größeren Programmen und müssen im Kontext ausgeführt werden. Während Aktionen Ihnen zeigen, wie Sie einzelne Servicefunktionen aufrufen, können Sie Aktionen im Kontext der zugehörigen Szenarien anzeigen.

Jedes Beispiel enthält einen Link zum vollständigen Quellcode, wo Sie Anweisungen zum Einrichten und Ausführen des Codes im Kodex finden.

Aktionen

Die folgenden Codebeispiele zeigen, wie PutRecords verwendet wird.

SDK für JavaScript (v3)
Anmerkung

Auf GitHub finden Sie noch mehr. Hier finden Sie das vollständige Beispiel und erfahren, wie Sie das AWS-Codebeispiel-Repository einrichten und ausführen.

import { PutRecordsCommand, KinesisClient } from "@aws-sdk/client-kinesis"; /** * Put multiple records into a Kinesis stream. * @param {{ streamArn: string }} config */ export const main = async ({ streamArn }) => { const client = new KinesisClient({}); try { await client.send( new PutRecordsCommand({ StreamARN: streamArn, Records: [ { Data: new Uint8Array(), /** * Determines which shard in the stream the data record is assigned to. * Partition keys are Unicode strings with a maximum length limit of 256 * characters for each key. Amazon Kinesis Data Streams uses the partition * key as input to a hash function that maps the partition key and * associated data to a specific shard. */ PartitionKey: "TEST_KEY", }, { Data: new Uint8Array(), PartitionKey: "TEST_KEY", }, ], }), ); } catch (caught) { if (caught instanceof Error) { // } else { throw caught; } } }; // Call function if run directly. import { fileURLToPath } from "node:url"; import { parseArgs } from "node:util"; if (process.argv[1] === fileURLToPath(import.meta.url)) { const options = { streamArn: { type: "string", description: "The ARN of the stream.", }, }; const { values } = parseArgs({ options }); main(values); }
  • Weitere API-Informationen finden Sie unter PutRecords in der AWS SDK für JavaScript-API-Referenz.

Serverless-Beispiele

Das folgende Codebeispiel veranschaulicht, wie eine Lambda-Funktion implementiert wird, die ein durch den Empfang von Datensätzen aus einem Kinesis-Stream ausgelöstes Ereignis empfängt. Die Funktion ruft die Kinesis-Nutzlast ab, dekodiert von Base64 und protokolliert den Datensatzinhalt.

SDK für JavaScript (v3)
Anmerkung

Auf GitHub finden Sie noch mehr. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit Serverless-Beispielen.

Nutzen eines Kinesis-Ereignisses mit Lambda unter Verwendung von JavaScript.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 exports.handler = async (event, context) => { for (const record of event.Records) { try { console.log(`Processed Kinesis Event - EventID: ${record.eventID}`); const recordData = await getRecordDataAsync(record.kinesis); console.log(`Record Data: ${recordData}`); // TODO: Do interesting work based on the new data } catch (err) { console.error(`An error occurred ${err}`); throw err; } } console.log(`Successfully processed ${event.Records.length} records.`); }; async function getRecordDataAsync(payload) { var data = Buffer.from(payload.data, "base64").toString("utf-8"); await Promise.resolve(1); //Placeholder for actual async work return data; }

Nutzen eines Kinesis-Ereignisses mit Lambda unter Verwendung von TypeScript.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import { KinesisStreamEvent, Context, KinesisStreamHandler, KinesisStreamRecordPayload, } from "aws-lambda"; import { Buffer } from "buffer"; import { Logger } from "@aws-lambda-powertools/logger"; const logger = new Logger({ logLevel: "INFO", serviceName: "kinesis-stream-handler-sample", }); export const functionHandler: KinesisStreamHandler = async ( event: KinesisStreamEvent, context: Context ): Promise<void> => { for (const record of event.Records) { try { logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`); const recordData = await getRecordDataAsync(record.kinesis); logger.info(`Record Data: ${recordData}`); // TODO: Do interesting work based on the new data } catch (err) { logger.error(`An error occurred ${err}`); throw err; } logger.info(`Successfully processed ${event.Records.length} records.`); } }; async function getRecordDataAsync( payload: KinesisStreamRecordPayload ): Promise<string> { var data = Buffer.from(payload.data, "base64").toString("utf-8"); await Promise.resolve(1); //Placeholder for actual async work return data; }

Das folgende Codebeispiel zeigt, wie eine teilweise Batch-Antwort für Lambda-Funktionen implementiert wird, die Ereignisse von einem Kinesis-Stream empfangen. Die Funktion meldet die Batch-Elementfehler in der Antwort und signalisiert Lambda, diese Nachrichten später erneut zu versuchen.

SDK für JavaScript (v3)
Anmerkung

Auf GitHub finden Sie noch mehr. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit Serverless-Beispielen.

Melden von Fehlern bei Kinesis-Batchelementen mit Lambda unter Verwendung von Javascript.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 exports.handler = async (event, context) => { for (const record of event.Records) { try { console.log(`Processed Kinesis Event - EventID: ${record.eventID}`); const recordData = await getRecordDataAsync(record.kinesis); console.log(`Record Data: ${recordData}`); // TODO: Do interesting work based on the new data } catch (err) { console.error(`An error occurred ${err}`); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return { batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }], }; } } console.log(`Successfully processed ${event.Records.length} records.`); return { batchItemFailures: [] }; }; async function getRecordDataAsync(payload) { var data = Buffer.from(payload.data, "base64").toString("utf-8"); await Promise.resolve(1); //Placeholder for actual async work return data; }

Melden von Fehlern bei Kinesis-Batchelementen mit Lambda unter Verwendung von TypeScript.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import { KinesisStreamEvent, Context, KinesisStreamHandler, KinesisStreamRecordPayload, KinesisStreamBatchResponse, } from "aws-lambda"; import { Buffer } from "buffer"; import { Logger } from "@aws-lambda-powertools/logger"; const logger = new Logger({ logLevel: "INFO", serviceName: "kinesis-stream-handler-sample", }); export const functionHandler: KinesisStreamHandler = async ( event: KinesisStreamEvent, context: Context ): Promise<KinesisStreamBatchResponse> => { for (const record of event.Records) { try { logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`); const recordData = await getRecordDataAsync(record.kinesis); logger.info(`Record Data: ${recordData}`); // TODO: Do interesting work based on the new data } catch (err) { logger.error(`An error occurred ${err}`); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return { batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }], }; } } logger.info(`Successfully processed ${event.Records.length} records.`); return { batchItemFailures: [] }; }; async function getRecordDataAsync( payload: KinesisStreamRecordPayload ): Promise<string> { var data = Buffer.from(payload.data, "base64").toString("utf-8"); await Promise.resolve(1); //Placeholder for actual async work return data; }