Menggunakan AWS Lambda dengan Amazon DynamoDB - AWS Lambda

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Menggunakan AWS Lambda dengan Amazon DynamoDB

Anda dapat menggunakan AWS Lambda fungsi untuk memproses catatan dalam aliran Amazon DynamoDB. Dengan DynamoDB Streams, Anda dapat memicu fungsi Lambda untuk melakukan pekerjaan tambahan setiap kali tabel DynamoDB diperbarui.

Lambda membaca rekaman dari aliran dan memanggil fungsi Anda secara sinkron dengan kejadian yang berisi rekaman aliran. Lambda membaca catatan dalam batch dan memanggil fungsi Anda untuk memproses catatan dari batch.

Contoh peristiwa

{ "Records": [ { "eventID": "1", "eventVersion": "1.0", "dynamodb": { "Keys": { "Id": { "N": "101" } }, "NewImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES", "SequenceNumber": "111", "SizeBytes": 26 }, "awsRegion": "us-west-2", "eventName": "INSERT", "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525", "eventSource": "aws:dynamodb" }, { "eventID": "2", "eventVersion": "1.0", "dynamodb": { "OldImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "SequenceNumber": "222", "Keys": { "Id": { "N": "101" } }, "SizeBytes": 59, "NewImage": { "Message": { "S": "This item has changed" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "awsRegion": "us-west-2", "eventName": "MODIFY", "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525", "eventSource": "aws:dynamodb" } ]}

Polling dan batching stream

Lambda melakukan polling shard dalam aliran DynamoDB untuk catatan dengan tingkat dasar sebanyak 4 kali per detik. Saat rekaman tersedia, Lambda memanggil fungsi Anda dan menunggu hasilnya. Jika pemrosesan berhasil, Lambda melanjutkan polling sampai menerima lebih banyak catatan.

Secara default, Lambda memanggil fungsi Anda segera setelah catatan tersedia. Jika batch yang dibaca Lambda dari sumber peristiwa hanya memiliki satu catatan di dalamnya, Lambda hanya mengirimkan satu catatan ke fungsi tersebut. Untuk menghindari menjalankan fungsi dengan sejumlah kecil catatan, Anda dapat memberi tahu sumber acara untuk menyangga catatan hingga 5 menit dengan mengonfigurasi jendela batching. Sebelum menjalankan fungsi, Lambda terus membaca catatan dari sumber acara hingga mengumpulkan batch penuh, jendela batching kedaluwarsa, atau batch mencapai batas muatan 6 MB. Untuk informasi selengkapnya, lihat Perilaku batching.

Awas

Pemetaan sumber peristiwa Lambda memproses setiap peristiwa setidaknya sekali, dan pemrosesan duplikat catatan dapat terjadi. Untuk menghindari potensi masalah yang terkait dengan duplikat peristiwa, kami sangat menyarankan agar Anda membuat kode fungsi Anda idempoten. Untuk mempelajari lebih lanjut, lihat Bagaimana cara membuat fungsi Lambda saya idempoten di Pusat Pengetahuan. AWS

Konfigurasikan ParallelizationFactorpengaturan untuk memproses satu pecahan aliran DynamoDB dengan lebih dari satu pemanggilan Lambda secara bersamaan. Anda dapat menentukan jumlah batch bersamaan yang polling-nya dibuat Lambda dari shard melalui faktor paralelisasi mulai dari 1 (default) hingga 10. Saat Anda meningkatkan jumlah batch bersamaan per pecahan, Lambda masih memastikan pemrosesan dalam urutan pada level item (partisi dan kunci sortir).

Posisi awal polling dan streaming

Ketahuilah bahwa polling streaming selama pembuatan dan pembaruan pemetaan sumber acara pada akhirnya konsisten.

  • Selama pembuatan pemetaan sumber acara, mungkin diperlukan beberapa menit untuk memulai acara polling dari aliran.

  • Selama pembaruan pemetaan sumber acara, mungkin diperlukan beberapa menit untuk menghentikan dan memulai kembali acara pemungutan suara dari aliran.

Perilaku ini berarti bahwa jika Anda menentukan LATEST sebagai posisi awal untuk aliran, pemetaan sumber peristiwa dapat melewatkan peristiwa selama pembuatan atau pembaruan. Untuk memastikan tidak ada peristiwa yang terlewatkan, tentukan posisi awal aliran sebagaiTRIM_HORIZON.

Pembaca serpihan secara bersamaan di DynamoDB Streams

Untuk tabel Single-region yang bukan tabel global, Anda dapat mendesain hingga dua fungsi Lambda untuk dibaca dari pecahan DynamoDB Streams yang sama secara bersamaan. Melebihi batas ini dapat mengakibatkan throttling permintaan. Untuk tabel global, kami sarankan Anda membatasi jumlah fungsi simultan menjadi satu untuk menghindari pembatasan permintaan.

Izin peran eksekusi

Kebijakan AWSLambdaDynamoDBExecutionRole AWS terkelola mencakup izin yang perlu dibaca Lambda dari aliran DynamoDB Anda. Tambahkan kebijakan terkelola ini ke peran eksekusi fungsi Anda.

Untuk mengirim catatan batch gagal ke antrian SQS standar atau topik SNS standar, fungsi Anda memerlukan izin tambahan. Setiap layanan tujuan memerlukan izin yang berbeda, sebagai berikut:

Tambahkan izin dan buat pemetaan sumber acara

Buat pemetaan sumber kejadian untuk memberi tahu Lambda agar mengirim rekaman dari aliran Anda ke fungsi Lambda. Anda dapat membuat beberapa pemetaan sumber kejadian untuk memproses data yang sama dengan beberapa fungsi Lambda, atau untuk memproses item dari beberapa aliran dengan satu fungsi.

Untuk mengonfigurasi fungsi agar dibaca dari DynamoDB Streams, lampirkan kebijakan terkelola ke peran eksekusi, lalu buat AWSLambdaDynamoDBExecutionRole AWS pemicu DynamoDB.

Untuk menambahkan izin dan membuat pemicu
  1. Buka Halaman fungsi di konsol Lambda.

  2. Pilih nama sebuah fungsi.

  3. Pilih tab Konfigurasi, lalu pilih Izin.

  4. Di bawah Nama peran, pilih tautan ke peran eksekusi Anda. Tautan ini membuka peran di konsol IAM.

    Tautan ke peran eksekusi
  5. Pilih Tambahkan izin, lalu pilih Lampirkan kebijakan.

    Lampirkan kebijakan di konsol IAM
  6. Di bidang pencarian, masukkanAWSLambdaDynamoDBExecutionRole. Tambahkan kebijakan ini ke peran eksekusi Anda. Ini adalah kebijakan AWS terkelola yang berisi izin yang perlu dibaca fungsi Anda dari aliran DynamoDB. Untuk informasi selengkapnya tentang kebijakan ini, lihat AWSLambdaDynamoDBExecutionRoledi Referensi Kebijakan AWS Terkelola.

  7. Kembali ke fungsi Anda di konsol Lambda. Di bagian Gambaran umum fungsi, pilih Tambah pemicu.

    Bagian ikhtisar fungsi dari konsol Lambda
  8. Pilih jenis pemicu.

  9. Konfigurasikan opsi yang diperlukan, lalu pilih Tambah.

Lambda mendukung opsi berikut untuk sumber acara DynamoDB:

Opsi sumber kejadian
  • Tabel DynamoDB – Tabel DynamoDB tempat untuk membaca catatan.

  • Ukuran batch – Jumlah rekaman yang akan dikirimkan ke fungsi dalam setiap batch, paling banyak 10.000. Lambda meneruskan semua rekaman dalam batch ke fungsi dalam satu panggilan, selama ukuran total kejadian tidak melebihi batas payload untuk invokasi sinkron (6 MB).

  • Jendela batch – Tentukan jumlah waktu maksimum untuk mengumpulkan rekaman sebelum memanggil fungsi, dalam hitungan detik.

  • Posisi mulai – Hanya memproses rekaman baru, atau semua rekaman yang ada.

    • Terbaru – Memproses rekaman baru yang ditambahkan ke stream.

    • Trim horizon – Memproses semua rekaman dalam aliran.

    Setelah memproses rekaman yang sudah ada, fungsi berhenti dan melanjutkan memproses rekaman baru.

  • Tujuan kegagalan — Antrian SQS standar atau topik SNS standar untuk catatan yang tidak dapat diproses. Ketika Lambda membuang sekumpulan catatan yang terlalu tua atau telah kehabisan semua percobaan ulang, Lambda mengirimkan detail tentang batch ke antrian atau topik.

  • Percobaan ulang – Jumlah maksimum percobaan ulang Lambda saat fungsi memunculkan kesalahan. Hal ini tidak berlaku untuk kesalahan layanan atau pembatasan di mana batch tidak mencapai fungsi.

  • Maximum age of record – Usia maksimum rekaman yang dikirimkan Lambda ke fungsi Anda.

  • Split batch on error – Ketika fungsi mengembalikan kesalahan, bagi batch menjadi dua bagian sebelum mencoba kembali. Pengaturan ukuran batch asli Anda tetap tidak berubah.

  • Batch bersamaan per pecahan — Secara bersamaan memproses beberapa batch dari pecahan yang sama.

  • Diaktifkan – Atur ke true untuk mengaktifkan pemetaan sumber kejadian. Atur ke false untuk menghentikan pemrosesan rekaman. Lambda mencatat rekaman terakhir yang diproses dan melanjutkan pemrosesan dari titik tersebut saat pemetaan diaktifkan kembali.

catatan

Anda tidak dikenakan biaya untuk panggilan GetRecords API yang dipanggil oleh Lambda sebagai bagian dari pemicu DynamoDB.

Untuk mengelola konfigurasi sumber kejadian nanti, pilih pemicu di desainer.

Penanganan kesalahan

Penanganan kesalahan untuk pemetaan sumber peristiwa DynamoDB tergantung pada apakah kesalahan terjadi sebelum fungsi dipanggil atau selama pemanggilan fungsi:

Jika tindakan penanganan kesalahan gagal, Lambda membuang rekaman dan melanjutkan pemrosesan batch dari aliran. Dengan pengaturan default, ini berarti rekaman yang buruk dapat memblokir pemrosesan pada shard yang terpengaruh hingga selama satu hari. Untuk menghindari hal ini, konfigurasikan pemetaan sumber kejadian fungsi Anda dengan jumlah percobaan ulang yang wajar dan usia maksimum rekaman yang sesuai dengan kasus penggunaan Anda.

Mengonfigurasi tujuan untuk pemanggilan yang gagal

Untuk menyimpan catatan pemanggilan pemetaan sumber peristiwa yang gagal, tambahkan tujuan ke pemetaan sumber peristiwa fungsi Anda. Setiap catatan yang dikirim ke tujuan adalah dokumen JSON dengan metadata tentang pemanggilan yang gagal. Anda dapat mengonfigurasi topik Amazon SNS atau antrian Amazon SQS sebagai tujuan. Peran eksekusi Anda harus memiliki izin untuk tujuan:

Untuk mengonfigurasi tujuan saat gagal menggunakan konsol, ikuti langkah-langkah berikut:

  1. Buka halaman Fungsi di konsol Lambda.

  2. Pilih fungsi.

  3. Di bagian Gambaran umum fungsi, pilih Tambahkan tujuan.

  4. Untuk Sumber, pilih Pemanggilan pemetaan sumber acara.

  5. Untuk pemetaan sumber peristiwa, pilih sumber peristiwa yang dikonfigurasi untuk fungsi ini.

  6. Untuk Kondisi, pilih On failure. Untuk pemanggilan pemetaan sumber peristiwa, ini adalah satu-satunya kondisi yang diterima.

  7. Untuk tipe Tujuan, pilih jenis tujuan yang Lambda kirimkan catatan pemanggilan.

  8. Untuk Tujuan, pilih sumber daya.

  9. Pilih Simpan.

Anda juga dapat mengonfigurasi tujuan pada kegagalan menggunakan AWS Command Line Interface (AWS CLI). Misalnya, perintah create-event-source-mapping berikut menambahkan pemetaan sumber peristiwa dengan tujuan kegagalan SQS ke: MyFunction

aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'

Perintah update-event-source-mapping berikut memperbarui pemetaan sumber peristiwa untuk mengirim catatan pemanggilan yang gagal ke tujuan SNS setelah dua percobaan ulang, atau jika catatan berusia lebih dari satu jam.

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --maximum-retry-attempts 2 \ --maximum-record-age-in-seconds 3600 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sns:us-east-1:123456789012:dest-topic"}}'

Pengaturan yang diperbarui diterapkan secara asinkron dan tidak muncul dalam output hingga proses selesai. Gunakan perintah get-event-source-mapping untuk melihat status saat ini.

Untuk menghapus tujuan, berikan string kosong sebagai argumen ke destination-config parameter:

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'

Contoh berikut menunjukkan catatan invokasi untuk aliran DynamoDB.

contoh Rekaman Invokasi
{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:13:49.717Z", "DDBStreamBatchInfo": { "shardId": "shardId-00000001573689847184-864758bb", "startSequenceNumber": "800000000003126276362", "endSequenceNumber": "800000000003126276362", "approximateArrivalOfFirstRecord": "2019-11-14T00:13:19Z", "approximateArrivalOfLastRecord": "2019-11-14T00:13:19Z", "batchSize": 1, "streamArn": "arn:aws:dynamodb:us-east-2:123456789012:table/mytable/stream/2019-11-14T00:04:06.388" } }

Anda dapat menggunakan informasi ini guna mengambil rekaman yang terpengaruh dari aliran untuk pemecahan masalah. Rekaman aktual tidak disertakan, jadi Anda harus memproses rekaman ini dan mengambilnya dari aliran sebelum kedaluwarsa dan hilang.

CloudWatch Metrik Amazon

Lambda memancarkan metrik IteratorAge saat fungsi Anda menyelesaikan pemrosesan suatu batch rekaman. Metrik menunjukkan berapa usia rekaman terakhir dalam batch saat pemrosesan selesai. Jika fungsi Anda memproses kejadian baru, Anda dapat menggunakan usia iterator untuk memperkirakan latensi antara waktu saat rekaman ditambahkan dan saat fungsi memprosesnya.

Tren yang meningkat dalam usia iterator dapat menandakan masalah dengan fungsi Anda. Untuk informasi selengkapnya, lihat Bekerja dengan metrik fungsi Lambda.

Jendela waktu

Fungsi Lambda dapat menjalankan aplikasi pemrosesan aliran berkelanjutan. Aliran merupakan data tidak terbatas yang mengalir terus-menerus melalui aplikasi Anda. Untuk menganalisis informasi dari input yang terus diperbarui ini, Anda dapat mengikat catatan yang disertakan menggunakan jendela yang didefinisikan dalam hal waktu.

Jatuh jendela adalah jendela waktu yang berbeda yang membuka dan menutup secara berkala. Secara default, pemanggilan Lambda tanpa status — Anda tidak dapat menggunakannya untuk memproses data di beberapa pemanggilan berkelanjutan tanpa database eksternal. Namun, dengan jendela yang jatuh, Anda dapat mempertahankan status Anda di seluruh pemanggilan. Status ini berisi hasil agregat pesan yang sebelumnya diproses untuk jendela saat ini. Status Anda maksimal bisa sebesar 1 MB per shard. Jika melebihi ukuran tersebut, Lambda mengakhiri jendela lebih awal.

Setiap rekaman dalam aliran milik jendela tertentu. Lambda akan memproses setiap rekaman setidaknya sekali, tetapi tidak menjamin bahwa setiap rekaman akan diproses hanya sekali. Dalam kasus yang jarang terjadi, seperti penanganan kesalahan, beberapa catatan mungkin diproses lebih dari sekali. Catatan selalu diproses secara berurutan pertama kali. Jika catatan diproses lebih dari satu kali, mereka mungkin diproses rusak.

Agregasi dan pemrosesan

Fungsi yang dikelola pengguna Anda dipanggil baik untuk agregasi maupun untuk memproses hasil akhir agregasi tersebut. Lambda mengumpulkan semua catatan yang diterima di jendela. Anda dapat menerima catatan ini dalam beberapa batch, masing-masing sebagai invokasi terpisah. Setiap invokasi menerima status. Jadi, saat menggunakan jendela tumbling, respons fungsi Lambda Anda harus berisi state properti. Jika respons tidak berisi state properti, Lambda menganggap ini sebagai pemanggilan yang gagal. Untuk memenuhi kondisi ini, fungsi Anda dapat mengembalikan TimeWindowEventResponse objek, yang memiliki bentuk JSON berikut:

contoh Nilai TimeWindowEventResponse
{ "state": { "1": 282, "2": 715 }, "batchItemFailures": [] }
catatan

Untuk fungsi Java, sebaiknya gunakan a Map<String, String> untuk mewakili status.

Di akhir jendela, tanda isFinalInvokeForWindow diatur ke true untuk menunjukkan bahwa ini adalah status akhir dan bahwa itu siap untuk diproses. Setelah pemrosesan, jendela selesai, dan invokasi akhir Anda selesai, lalu status dihapus.

Di akhir jendela Anda, Lambda menggunakan pemrosesan akhir untuk tindakan pada hasil agregasi. Pemrosesan akhir Anda dipanggil secara sinkron. Setelah pemanggilan berhasil, fungsi Anda memeriksa nomor urut dan pemrosesan aliran berlanjut. Jika invokasi tidak berhasil, fungsi Lambda Anda menunda pemrosesan lebih lanjut sampai invokasi sukses.

contoh DynamodbTimeWindowEvent
{ "Records":[ { "eventID":"1", "eventName":"INSERT", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"111", "SizeBytes":26, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"2", "eventName":"MODIFY", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"222", "SizeBytes":59, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"3", "eventName":"REMOVE", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "SequenceNumber":"333", "SizeBytes":38, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" } ], "window": { "start": "2020-07-30T17:00:00Z", "end": "2020-07-30T17:05:00Z" }, "state": { "1": "state1" }, "shardId": "shard123456789", "eventSourceARN": "stream-ARN", "isFinalInvokeForWindow": false, "isWindowTerminatedEarly": false }

Konfigurasi

Anda dapat mengonfigurasi jendela berguling saat membuat atau memperbarui pemetaan sumber peristiwa. Untuk mengkonfigurasi jendela tumbling, tentukan jendela dalam hitungan detik (TumblingWindowInSeconds). Contoh berikut AWS Command Line Interface (AWS CLI) perintah membuat pemetaan sumber acara streaming yang memiliki jendela jatuh 120 detik. Fungsi Lambda yang didefinisikan untuk agregasi dan pemrosesan diberi nama tumbling-window-example-function.

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525 \ --function-name tumbling-window-example-function \ --starting-position TRIM_HORIZON \ --tumbling-window-in-seconds 120

Lambda menentukan jatuh batas jendela berguling berdasarkan waktu ketika catatan dimasukkan ke dalam aliran. Semua catatan memiliki stempel waktu perkiraan yang tersedia yang digunakan Lambda dalam penentuan batas.

Agregasi jendela berguling tidak mendukung shard ulang. Ketika shard berakhir, Lambda menganggap jendela ditutup, dan shard anak memulai jendela mereka sendiri dalam status baru.

Jendela berguling sepenuhnya mendukung kebijakan coba lagi yang ada maxRetryAttempts dan maxRecordAge.

contoh Handler.py - Agregasi dan pemrosesan

Fungsi Python berikut menunjukkan cara untuk menggabungkan, lalu memproses status akhir Anda:

def lambda_handler(event, context): print('Incoming event: ', event) print('Incoming state: ', event['state']) #Check if this is the end of the window to either aggregate or process. if event['isFinalInvokeForWindow']: # logic to handle final state of the window print('Destination invoke') else: print('Aggregate invoke') #Check for early terminations if event['isWindowTerminatedEarly']: print('Window terminated early') #Aggregation logic state = event['state'] for record in event['Records']: state[record['dynamodb']['NewImage']['Id']] = state.get(record['dynamodb']['NewImage']['Id'], 0) + 1 print('Returning state: ', state) return {'state': state}

Melaporkan kegagalan item batch

Ketika menggunakan dan memproses data streaming dari sumber peristiwa, secara default Lambda memeriksa urutan nomor tertinggi batch hanya ketika batch berhasil diselesaikan. Lambda memperlakukan semua hasil lain sebagai sepenuhnya gagal dan mencoba lagi pemrosesan batch hingga batas coba lagi. Untuk memungkinkan keberhasilan parsial saat memproses batch dari aliran, aktifkan ReportBatchItemFailures. Mengizinkan keberhasilan parsial dapat membantu mengurangi jumlah percobaan ulang pada catatan, meskipun tidak sepenuhnya mencegah kemungkinan percobaan ulang dalam catatan yang sukses.

Untuk mengaktifkanReportBatchItemFailures, sertakan nilai enum ReportBatchItemFailures dalam daftar FunctionResponseJenis. Daftar ini menunjukkan tipe respon yang diaktifkan untuk fungsi Anda. Anda dapat mengonfigurasi daftar ini saat membuat atau memperbarui pemetaan sumber peristiwa.

Sintaks laporan

Saat mengonfigurasi pelaporan pada kegagalan item batch, kelas StreamsEventResponse dikembalikan dengan daftar kegagalan item batch. Anda dapat menggunakan objek StreamsEventResponse untuk mengembalikan nomor urutan catatan gagal pertama dalam batch. Anda juga dapat membuat kelas kustom Anda sendiri menggunakan sintaks respons yang benar. Struktur JSON berikut menunjukkan sintaks respons yang diperlukan:

{ "batchItemFailures": [ { "itemIdentifier": "<SequenceNumber>" } ] }
catatan

Jika batchItemFailures array berisi beberapa item, Lambda menggunakan catatan dengan nomor urut terendah sebagai pos pemeriksaan. Lambda kemudian mencoba kembali semua catatan mulai dari pos pemeriksaan itu.

Status berhasil dan gagal

Lambda memperlakukan batch sebagai sepenuhnya berhasil jika Anda mengembalikan salah satu dari berikut:

  • Daftar batchItemFailure kosong

  • Daftar batchItemFailure nol

  • EventResponse kosong

  • EventResponse nol

Lambda memperlakukan batch sebagai sepenuhnya gagal jika Anda mengembalikan salah satu dari berikut:

  • String itemIdentifier kosong

  • itemIdentifier nol

  • itemIdentifier dengan nama kunci yang buruk

Lambda mencoba kembali kegagalan berdasarkan strategi coba lagi Anda.

Membagi batch

Jika invokasi Anda gagal dan BisectBatchOnFunctionError diaktifkan, batch dibagi terlepas dari pengaturan ReportBatchItemFailures Anda.

Ketika respons berhasil batch parsial diterima dan kedua BisectBatchOnFunctionError dan ReportBatchItemFailures diaktifkan, batch dibagi dua di nomor urutan yang dikembalikan dan Lambda mencoba hanya catatan yang tersisa.

Berikut adalah beberapa contoh kode fungsi yang mengembalikan daftar ID pesan yang gagal dalam batch:

.NET
AWS SDK for .NET
catatan

Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori contoh Nirserver.

Melaporkan kegagalan item batch DynamoDB dengan Lambda menggunakan.NET.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using System.Text.Json; using System.Text; using Amazon.Lambda.Core; using Amazon.Lambda.DynamoDBEvents; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace AWSLambda_DDB; public class Function { public StreamsEventResponse FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context) { context.Logger.LogInformation($"Beginning to process {dynamoEvent.Records.Count} records..."); List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new List<StreamsEventResponse.BatchItemFailure>(); StreamsEventResponse streamsEventResponse = new StreamsEventResponse(); foreach (var record in dynamoEvent.Records) { try { var sequenceNumber = record.Dynamodb.SequenceNumber; context.Logger.LogInformation(sequenceNumber); } catch (Exception ex) { context.Logger.LogError(ex.Message); batchItemFailures.Add(new StreamsEventResponse.BatchItemFailure() { ItemIdentifier = record.Dynamodb.SequenceNumber }); } } if (batchItemFailures.Count > 0) { streamsEventResponse.BatchItemFailures = batchItemFailures; } context.Logger.LogInformation("Stream processing complete."); return streamsEventResponse; } }
Go
SDK untuk Go V2
catatan

Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori contoh Nirserver.

Melaporkan kegagalan item batch DynamoDB dengan Lambda menggunakan Go.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package main import ( "context" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) type BatchItemFailure struct { ItemIdentifier string `json:"ItemIdentifier"` } type BatchResult struct { BatchItemFailures []BatchItemFailure `json:"BatchItemFailures"` } func HandleRequest(ctx context.Context, event events.DynamoDBEvent) (*BatchResult, error) { var batchItemFailures []BatchItemFailure curRecordSequenceNumber := "" for _, record := range event.Records { // Process your record curRecordSequenceNumber = record.Change.SequenceNumber } if curRecordSequenceNumber != "" { batchItemFailures = append(batchItemFailures, BatchItemFailure{ItemIdentifier: curRecordSequenceNumber}) } batchResult := BatchResult{ BatchItemFailures: batchItemFailures, } return &batchResult, nil } func main() { lambda.Start(HandleRequest) }
Java
SDK untuk Java 2.x
catatan

Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori contoh Nirserver.

Melaporkan kegagalan item batch DynamoDB dengan Lambda menggunakan Java.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; import com.amazonaws.services.lambda.runtime.events.models.dynamodb.StreamRecord; import java.io.Serializable; import java.util.ArrayList; import java.util.List; public class ProcessDynamodbRecords implements RequestHandler<DynamodbEvent, Serializable> { @Override public StreamsEventResponse handleRequest(DynamodbEvent input, Context context) { List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>(); String curRecordSequenceNumber = ""; for (DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord : input.getRecords()) { try { //Process your record StreamRecord dynamodbRecord = dynamodbStreamRecord.getDynamodb(); curRecordSequenceNumber = dynamodbRecord.getSequenceNumber(); } catch (Exception e) { /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber)); return new StreamsEventResponse(batchItemFailures); } } return new StreamsEventResponse(); } }
JavaScript
SDK untuk JavaScript (v3)
catatan

Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori contoh Nirserver.

Melaporkan kegagalan item batch DynamoDB dengan penggunaan Lambda. JavaScript

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 export const handler = async (event) => { const records = event.Records; let curRecordSequenceNumber = ""; for (const record of records) { try { // Process your record curRecordSequenceNumber = record.dynamodb.SequenceNumber; } catch (e) { // Return failed record's sequence number return { batchItemFailures: [{ itemIdentifier: curRecordSequenceNumber }] }; } } return { batchItemFailures: [] }; };

Melaporkan kegagalan item batch DynamoDB dengan penggunaan Lambda. TypeScript

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import { DynamoDBBatchItemFailure, DynamoDBStreamEvent } from "aws-lambda"; export const handler = async (event: DynamoDBStreamEvent): Promise<DynamoDBBatchItemFailure[]> => { const batchItemsFailures: DynamoDBBatchItemFailure[] = [] let curRecordSequenceNumber for(const record of event.Records) { curRecordSequenceNumber = record.dynamodb?.SequenceNumber if(curRecordSequenceNumber) { batchItemsFailures.push({ itemIdentifier: curRecordSequenceNumber }) } } return batchItemsFailures }
PHP
SDK untuk PHP
catatan

Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori contoh Nirserver.

Melaporkan kegagalan item batch DynamoDB dengan Lambda menggunakan PHP.

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 <?php # using bref/bref and bref/logger for simplicity use Bref\Context\Context; use Bref\Event\DynamoDb\DynamoDbEvent; use Bref\Event\Handler as StdHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler implements StdHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @throws JsonException * @throws \Bref\Event\InvalidLambdaEvent */ public function handle(mixed $event, Context $context): array { $dynamoDbEvent = new DynamoDbEvent($event); $this->logger->info("Processing records"); $records = $dynamoDbEvent->getRecords(); $failedRecords = []; foreach ($records as $record) { try { $data = $record->getData(); $this->logger->info(json_encode($data)); // TODO: Do interesting work based on the new data } catch (Exception $e) { $this->logger->error($e->getMessage()); // failed processing the record $failedRecords[] = $record->getSequenceNumber(); } } $totalRecords = count($records); $this->logger->info("Successfully processed $totalRecords records"); // change format for the response $failures = array_map( fn(string $sequenceNumber) => ['itemIdentifier' => $sequenceNumber], $failedRecords ); return [ 'batchItemFailures' => $failures ]; } } $logger = new StderrLogger(); return new Handler($logger);
Python
SDK untuk Python (Boto3)
catatan

Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori contoh Nirserver.

Melaporkan kegagalan item batch DynamoDB dengan Lambda menggunakan Python.

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def handler(event, context): records = event.get("Records") curRecordSequenceNumber = "" for record in records: try: # Process your record curRecordSequenceNumber = record["dynamodb"]["SequenceNumber"] except Exception as e: # Return failed record's sequence number return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]} return {"batchItemFailures":[]}
Ruby
SDK untuk Ruby
catatan

Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori contoh Nirserver.

Melaporkan kegagalan item batch DynamoDB dengan Lambda menggunakan Ruby.

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def lambda_handler(event:, context:) records = event["Records"] cur_record_sequence_number = "" records.each do |record| begin # Process your record cur_record_sequence_number = record["dynamodb"]["SequenceNumber"] rescue StandardError => e # Return failed record's sequence number return {"batchItemFailures" => [{"itemIdentifier" => cur_record_sequence_number}]} end end {"batchItemFailures" => []} end
Rust
SDK untuk Rust
catatan

Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori contoh Nirserver.

Melaporkan kegagalan item batch DynamoDB dengan Lambda menggunakan Rust.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::{ event::dynamodb::{Event, EventRecord, StreamRecord}, streams::{DynamoDbBatchItemFailure, DynamoDbEventResponse}, }; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; /// Process the stream record fn process_record(record: &EventRecord) -> Result<(), Error> { let stream_record: &StreamRecord = &record.change; // process your stream record here... tracing::info!("Data: {:?}", stream_record); Ok(()) } /// Main Lambda handler here... async fn function_handler(event: LambdaEvent<Event>) -> Result<DynamoDbEventResponse, Error> { let mut response = DynamoDbEventResponse { batch_item_failures: vec![], }; let records = &event.payload.records; if records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(response); } for record in records { tracing::info!("EventId: {}", record.event_id); // Couldn't find a sequence number if record.change.sequence_number.is_none() { response.batch_item_failures.push(DynamoDbBatchItemFailure { item_identifier: Some("".to_string()), }); return Ok(response); } // Process your record here... if process_record(record).is_err() { response.batch_item_failures.push(DynamoDbBatchItemFailure { item_identifier: record.change.sequence_number.clone(), }); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return Ok(response); } } tracing::info!("Successfully processed {} record(s)", records.len()); Ok(response) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) // disable printing the name of the module in every log line. .with_target(false) // disabling time is handy because CloudWatch will add the ingestion time. .without_time() .init(); run(service_fn(function_handler)).await }

Parameter konfigurasi Amazon DynamoDB Streams

Semua jenis sumber peristiwa Lambda berbagi operasi yang sama CreateEventSourceMappingdan UpdateEventSourceMappingAPI. Namun, hanya beberapa parameter yang berlaku untuk DynamoDB Streams.

Parameter sumber peristiwa yang berlaku untuk DynamoDB Streams
Parameter Diperlukan Default Catatan

BatchSize

T

100

Maksimum: 10.000.

BisectBatchOnFunctionKesalahan

T

false

DestinationConfig

T

Antrian Amazon SQS standar atau tujuan topik Amazon SNS standar untuk catatan yang dibuang

Diaktifkan

T

true

EventSourceArn

T

ARN dari aliran data atau konsumen aliran

FilterCriteria

T

Pemfilteran acara Lambda

FunctionName

T

FunctionResponseJenis

T

Agar fungsi Anda melaporkan kegagalan tertentu dalam satu batch, sertakan nilainya ReportBatchItemFailuresFunctionResponseTypes. Untuk informasi selengkapnya, lihat Melaporkan kegagalan item batch.

MaximumBatchingWindowInDetik

T

0

MaximumRecordAgeInDetik

T

-1

-1 berarti tak terbatas: catatan gagal dicoba ulang sampai catatan kedaluwarsa. Batas retensi data untuk DynamoDB Streams adalah 24 jam.

Minimal: -1

Maksimal: 604.800

MaximumRetryUpaya

T

-1

-1 berarti tak terbatas: catatan gagal dicoba ulang sampai catatan kedaluwarsa

Minimal: 0

Maksimum: 10.000.

ParallelizationFactor

T

1

Maksimal: 10

StartingPosition

T

TRIM_HORIZON

TumblingWindowInSeconds

T

Minimal: 0

Maksimal: 900