Acquisizione di batch scartati per una sorgente di eventi Apache Kafka autogestita - AWS Lambda

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Acquisizione di batch scartati per una sorgente di eventi Apache Kafka autogestita

Per mantenere i record delle chiamate non riuscite allo strumento di mappatura dell'origine degli eventi, aggiungi una destinazione allo strumento di mappatura dell'origine degli eventi della funzione. Ogni record inviato alla destinazione è un JSON documento con metadati sulla chiamata non riuscita. Puoi configurare qualsiasi SNS argomento Amazon, Amazon SQS queue o bucket S3 come destinazione. Il tuo ruolo di esecuzione deve avere le autorizzazioni per la destinazione:

Inoltre, se hai configurato una KMS chiave sulla tua destinazione, Lambda necessita delle seguenti autorizzazioni a seconda del tipo di destinazione:

Configurazione di destinazioni in caso di errore per una mappatura delle sorgenti di eventi di Apache Kafka autogestita

Per configurare una destinazione in caso di errore tramite la console, completa i seguenti passaggi:

  1. Aprire la pagina Funzioni della console Lambda.

  2. Scegliere una funzione.

  3. In Function overview (Panoramica delle funzioni), scegliere Add destination (Aggiungi destinazione).

  4. Per Origine, scegli Chiamata allo strumento di mappatura dell'origine degli eventi.

  5. Per Strumento di mappatura dell'origine degli eventi, scegli un'origine dell'evento configurata per questa funzione.

  6. Per Condizione, seleziona In caso di errore. Per le chiamate allo strumento di mappatura dell'origine degli eventi, questa è l'unica condizione accettata.

  7. Per Tipo di destinazione, scegli il tipo di destinazione a cui Lambda deve inviare i record di chiamata.

  8. Per Destination (Destinazione), scegliere una risorsa.

  9. Seleziona Salva.

È inoltre possibile configurare una destinazione in caso di errore utilizzando. AWS CLI Ad esempio, il create-event-source-mappingcomando seguente aggiunge una mappatura dell'origine degli eventi con una destinazione in caso di SQS errore a: MyFunction

aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'

Il update-event-source-mappingcomando seguente aggiunge una destinazione S3 in caso di errore all'origine dell'evento associata all'input: uuid

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": "arn:aws:s3:::dest-bucket"}}'

Per rimuovere una destinazione, fornisci una stringa vuota come argomento del parametro destination-config:

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'

SNSed SQS esempio di record di invocazione

L'esempio seguente mostra ciò che Lambda invia a un SNS argomento o a una destinazione di SQS coda per una chiamata alla fonte di un evento Kafka non riuscita. Ciascuna delle chiavi in recordsInfo contiene sia l'argomento sia la partizione di Kafka, separati da un trattino. Ad esempio, per la chiave "Topic-0", Topic è l'argomento di Kafka e 0 è la partizione. Per ogni argomento e partizione, è possibile utilizzare i dati di offset e timestamp per individuare i record di chiamata originali.

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } } }

Esempio di destinazione S3 (record di invocazione)

Se le destinazioni sono S3, Lambda invia alla destinazione l'intero record di chiamata insieme ai metadati. L'esempio seguente mostra ciò che Lambda invia a un bucket S3 di destinazione per una chiamata non riuscita all'origine dell'evento Kafka. Oltre a tutti i campi dell'esempio precedente for SQS and SNS destinations, il payload campo contiene il record di invocazione originale come stringa di escape. JSON

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } }, "payload": "<Whole Event>" // Only available in S3 }
Suggerimento

Ti consigliamo di abilitare il controllo delle versioni S3 sul bucket di destinazione.