Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Mengonfigurasi kontrol penanganan kesalahan untuk sumber acara Kafka
Anda dapat mengonfigurasi cara Lambda menangani kesalahan dan percobaan ulang untuk pemetaan sumber acara Kafka Anda. Konfigurasi ini membantu Anda mengontrol cara Lambda memproses catatan yang gagal dan mengelola perilaku coba lagi.
Konfigurasi coba lagi yang tersedia
Konfigurasi coba lagi berikut tersedia untuk Amazon MSK dan sumber acara Kafka yang dikelola sendiri:
-
Upaya coba ulang maksimum - Jumlah maksimum kali Lambda mencoba ulang ketika fungsi Anda mengembalikan kesalahan. Ini tidak menghitung upaya pemanggilan awal. Defaultnya adalah -1 (tak terbatas).
-
Usia rekaman maksimum - Usia maksimum rekaman yang dikirim Lambda ke fungsi Anda. Defaultnya adalah -1 (tak terbatas).
-
Pisahkan batch pada kesalahan - Ketika fungsi Anda mengembalikan kesalahan, bagi batch menjadi dua batch yang lebih kecil dan coba lagi masing-masing secara terpisah. Ini membantu mengisolasi catatan bermasalah.
-
Respons batch sebagian - Izinkan fungsi Anda mengembalikan informasi tentang catatan mana dalam batch yang gagal diproses, sehingga Lambda hanya dapat mencoba kembali catatan yang gagal.
Mengkonfigurasi kontrol penanganan kesalahan (konsol)
Anda dapat mengonfigurasi perilaku coba lagi saat membuat atau memperbarui pemetaan sumber peristiwa Kafka di konsol Lambda.
Untuk mengonfigurasi perilaku coba lagi untuk sumber acara Kafka (konsol)
-
Buka halaman Fungsi
di konsol Lambda. -
Pilih nama fungsi Anda.
-
Lakukan salah satu tindakan berikut:
-
Untuk menambahkan pemicu Kafka baru, di bawah Ikhtisar fungsi, pilih Tambah pemicu.
-
Untuk memodifikasi pemicu Kafka yang ada, pilih pemicu dan kemudian pilih Edit.
-
-
Di bawah konfigurasi poller peristiwa, pilih mode yang disediakan untuk mengonfigurasi kontrol penanganan kesalahan:
-
Untuk mencoba lagi, masukkan jumlah maksimum percobaan ulang (0-10000, atau -1 untuk tak terbatas).
-
Untuk usia rekaman maksimum, masukkan usia maksimum dalam hitungan detik (60-604800, atau -1 untuk tak terbatas).
-
Untuk mengaktifkan pemisahan batch saat terjadi kesalahan, pilih Pisahkan batch pada kesalahan.
-
Untuk mengaktifkan respons batch sebagian, pilih ReportBatchItemFailures.
-
-
Pilih Tambah atau Simpan.
Mengkonfigurasi perilaku coba lagi ()AWS CLI
Gunakan AWS CLI perintah berikut untuk mengonfigurasi perilaku coba lagi untuk pemetaan sumber acara Kafka Anda.
Membuat pemetaan sumber peristiwa dengan konfigurasi coba lagi
Contoh berikut membuat pemetaan sumber peristiwa Kafka yang dikelola sendiri dengan kontrol penanganan kesalahan:
aws lambda create-event-source-mapping \ --function-name my-kafka-function \ --topics my-kafka-topic \ --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:111122223333:secret:MyBrokerSecretName \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc.xyz.com:9092"]}}' \ --starting-position LATEST \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=1 \ --maximum-retry-attempts 3 \ --maximum-record-age-in-seconds 3600 \ --bisect-batch-on-function-error \ --function-response-types "ReportBatchItemFailures"
Untuk sumber acara MSK Amazon:
aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSMSKKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]' \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=1 \ --maximum-retry-attempts 3 \ --maximum-record-age-in-seconds 3600 \ --bisect-batch-on-function-error \ --function-response-types "ReportBatchItemFailures"
Memperbarui konfigurasi coba lagi
Gunakan update-event-source-mapping perintah untuk memodifikasi konfigurasi coba lagi untuk pemetaan sumber peristiwa yang ada:
aws lambda update-event-source-mapping \ --uuid 12345678-1234-1234-1234-123456789012 \ --maximum-retry-attempts 5 \ --maximum-record-age-in-seconds 7200 \ --bisect-batch-on-function-error \ --function-response-types "ReportBatchItemFailures"
PartialBatchResponse
Respons batch sebagian, juga dikenal sebagai ReportBatchItemFailures, adalah fitur utama untuk penanganan kesalahan dalam integrasi Lambda dengan sumber Kafka. Tanpa fitur ini, ketika terjadi kesalahan di salah satu item dalam batch, itu menghasilkan pemrosesan ulang semua pesan dalam batch itu. Dengan respons batch sebagian diaktifkan dan diimplementasikan, handler mengembalikan pengenal hanya untuk pesan yang gagal, memungkinkan Lambda untuk mencoba lagi hanya item tertentu tersebut. Ini memberikan kontrol yang lebih besar atas bagaimana batch yang berisi pesan gagal diproses.
Untuk melaporkan kesalahan batch, Anda akan menggunakan skema JSON ini:
{ "batchItemFailures": [ { "itemIdentifier": { "topic-partition": "topic-partition_number", "offset": 100 } }, ... ] }
penting
Jika Anda mengembalikan JSON atau null yang valid kosong, pemetaan sumber peristiwa akan menganggap batch berhasil diproses. Setiap topik-partition_number atau offset yang dikembalikan yang tidak valid yang tidak ada dalam acara yang dipanggil akan diperlakukan sebagai kegagalan dan seluruh batch akan dicoba ulang.
Contoh kode berikut menunjukkan cara mengimplementasikan respons batch sebagian untuk fungsi Lambda yang menerima peristiwa dari sumber Kafka. Fungsi melaporkan kegagalan item batch dalam respons, memberi sinyal ke Lambda untuk mencoba lagi pesan tersebut nanti.
Berikut adalah implementasi handler Lambda Python yang menunjukkan pendekatan ini:
import base64 from typing import Any, Dict, List def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, List[Dict[str, Dict[str, Any]]]]: failures: List[Dict[str, Dict[str, Any]]] = [] records_dict = event.get("records", {}) for topic_partition, records_list in records_dict.items(): for record in records_list: topic = record.get("topic") partition = record.get("partition") offset = record.get("offset") value_b64 = record.get("value") try: data = base64.b64decode(value_b64).decode("utf-8") process_message(data) except Exception as exc: print(f"Failed to process record topic={topic} partition={partition} offset={offset}: {exc}") item_identifier: Dict[str, Any] = { "topic-partition": f"{topic}-{partition}", "offset": int(offset) if offset is not None else None, } failures.append({"itemIdentifier": item_identifier}) return {"batchItemFailures": failures} def process_message(data: str) -> None: # Your business logic for a single message pass
Berikut adalah versi Node.js:
const { Buffer } = require("buffer"); const handler = async (event) => { const failures = []; for (let topicPartition in event.records) { const records = event.records[topicPartition]; for (const record of records) { const topic = record.topic; const partition = record.partition; const offset = record.offset; const valueBase64 = record.value; const data = Buffer.from(valueBase64, "base64").toString("utf8"); try { await processMessage(data); } catch (error) { console.error("Failed to process record", { topic, partition, offset, error }); const itemIdentifier = { "topic-partition": `${topic}-${partition}`, offset: Number(offset), }; failures.push({ itemIdentifier }); } } } return { batchItemFailures: failures }; }; async function processMessage(payload) { // Your business logic for a single message } module.exports = { handler };