Menggunakan Lambda dengan Amazon MSK - AWS Lambda

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Menggunakan Lambda dengan Amazon MSK

Amazon Managed Streaming for Apache Kafka (Amazon MSK) adalah layanan terkelola penuh yang yang dapat Anda gunakan untuk membangun dan menjalankan aplikasi yang menggunakan Apache Kafka untuk memproses data streaming. Amazon MSK menyederhanakan penyiapan, penskalaan, dan manajemen klaster yang menjalankan Kafka. Amazon MSK juga memudahkan untuk mengkonfigurasi aplikasi Anda untuk beberapa Availability Zone dan untuk keamanan dengan AWS Identity and Access Management (IAM). Amazon MSK mendukung beberapa versi open-source Kafka.

Amazon MSK sebagai sumber peristiwa beroperasi sama dengan menggunakan Amazon Simple Queue Service (Amazon SQS) atau Amazon Kinesis. Lambda melakukan polling secara internal untuk pesan baru dari sumber peristiwa, lalu memanggil fungsi Lambda target secara sinkron. Lambda membaca pesan dalam batch dan menyediakan ini untuk fungsi Anda sebagai muatan acara. Ukuran batch maksimum dapat dikonfigurasi (defaultnya adalah 100 pesan). Untuk informasi selengkapnya, lihat Perilaku batching.

catatan

Sementara fungsi Lambda biasanya memiliki batas waktu tunggu maksimum 15 menit, pemetaan sumber acara untuk Amazon MSK, Apache Kafka yang dikelola sendiri, Amazon DocumentDB, dan Amazon MQ untuk ActiveMQ dan RabbitMQ hanya mendukung fungsi dengan batas waktu tunggu maksimum 14 menit. Kendala ini memastikan bahwa pemetaan sumber peristiwa dapat menangani kesalahan fungsi dan percobaan ulang dengan benar.

Lambda membaca pesan secara berurutan untuk setiap partisi. Payload Lambda tunggal dapat berisi pesan dari beberapa partisi. Setelah Lambda memproses setiap batch, Lambda melakukan offset pesan dalam batch tersebut. Jika fungsi Anda mengembalikan kesalahan untuk salah satu pesan dalam batch, Lambda mencoba ulang seluruh batch pesan sampai berhasil diproses atau pesan berakhir.

Awas

Pemetaan sumber peristiwa Lambda memproses setiap peristiwa setidaknya sekali, dan pemrosesan duplikat catatan dapat terjadi. Untuk menghindari potensi masalah yang terkait dengan duplikat peristiwa, kami sangat menyarankan agar Anda membuat kode fungsi Anda idempoten. Untuk mempelajari lebih lanjut, lihat Bagaimana cara membuat fungsi Lambda saya idempoten di Pusat Pengetahuan. AWS

Untuk contoh cara mengonfigurasi MSK Amazon sebagai sumber peristiwa, lihat Menggunakan MSK Amazon sebagai sumber peristiwa AWS Lambda di Blog AWS Komputasi. Untuk tutorial selengkapnya, lihat Integrasi Lambda MSK Amazon di Amazon MSK Labs.

Contoh peristiwa

Lambda mengirimkan batch pesan dalam parameter peristiwa ketika memanggil fungsi Anda. Muatan peristiwa berisi array pesan. Setiap item array berisi detail dari topik Amazon MSK dan pengidentifikasi partisi, bersama-sama dengan stempel waktu dan pesan berkode base64.

{ "eventSource":"aws:kafka", "eventSourceArn":"arn:aws:kafka:sa-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records":{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers":[ { "headerKey":[ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ] } }

Otentikasi kluster MSK

Lambda memerlukan izin untuk mengakses kluster MSK Amazon, mengambil catatan, dan melakukan tugas lainnya. Amazon MSK mendukung beberapa opsi untuk mengontrol akses klien ke cluster MSK.

Akses tidak diautentikasi

Jika tidak ada klien yang mengakses cluster melalui internet, Anda dapat menggunakan akses yang tidak diautentikasi.

Otentikasi SASL/SCRAM

Amazon MSK mendukung otentikasi Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism (SASL/SCRAM) otentikasi dengan enkripsi Transport Layer Security (TLS). Agar Lambda terhubung ke cluster, Anda menyimpan kredensyal otentikasi (nama pengguna dan kata sandi) secara rahasia. AWS Secrets Manager

Untuk informasi selengkapnya tentang menggunakan Secrets Manager, lihat Autentikasi nama pengguna dan kata sandi dengan AWS Secrets Manager di Panduan Pengembang Amazon Managed Streaming for Apache Kafka.

Amazon MSK tidak mendukung otentikasi SASL/PLAIN.

Autentikasi berbasis peran IAM

Anda dapat menggunakan IAM untuk mengautentikasi identitas klien yang terhubung ke cluster MSK. Jika autentikasi IAM aktif di kluster MSK Anda, dan Anda tidak memberikan rahasia untuk autentikasi, Lambda secara otomatis default menggunakan autentikasi IAM. Untuk membuat dan menerapkan kebijakan berbasis pengguna atau peran, gunakan konsol IAM atau API. Untuk informasi selengkapnya, lihat Kontrol akses IAM di Panduan Pengembang Amazon Managed Streaming for Apache Kafka Kafka.

Untuk memungkinkan Lambda terhubung ke kluster MSK, membaca catatan, dan melakukan tindakan lain yang diperlukan, tambahkan izin berikut ke peran eksekusi fungsi Anda.

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeGroup", "kafka-cluster:AlterGroup", "kafka-cluster:DescribeTopic", "kafka-cluster:ReadData", "kafka-cluster:DescribeClusterDynamicConfiguration" ], "Resource": [ "arn:aws:kafka:region:account-id:cluster/cluster-name/cluster-uuid", "arn:aws:kafka:region:account-id:topic/cluster-name/cluster-uuid/topic-name", "arn:aws:kafka:region:account-id:group/cluster-name/cluster-uuid/consumer-group-id" ] } ] }

Anda dapat membuat cakupan izin ini ke klaster, topik, dan grup tertentu. Untuk informasi selengkapnya, lihat tindakan Amazon MSK Kafka di Panduan Pengembang Amazon Managed Streaming for Apache Kafka.

Otentikasi TLS timbal balik

Mutual TLS (mTLS) menyediakan otentikasi dua arah antara klien dan server. Klien mengirimkan sertifikat ke server untuk server untuk memverifikasi klien, dan server mengirimkan sertifikat ke klien untuk klien untuk memverifikasi server.

Untuk Amazon MSK, Lambda bertindak sebagai klien. Anda mengonfigurasi sertifikat klien (sebagai rahasia di Secrets Manager) untuk mengautentikasi Lambda dengan broker di kluster MSK Anda. Sertifikat klien harus ditandatangani oleh CA di toko kepercayaan server. Cluster MSK mengirimkan sertifikat server ke Lambda untuk mengautentikasi broker dengan Lambda. Sertifikat server harus ditandatangani oleh otoritas sertifikat (CA) yang ada di toko AWS kepercayaan.

Untuk petunjuk tentang cara membuat sertifikat klien, lihat Memperkenalkan autentikasi TLS timbal balik untuk Amazon MSK sebagai sumber peristiwa.

Amazon MSK tidak mendukung sertifikat server yang ditandatangani sendiri, karena semua broker di Amazon MSK menggunakan sertifikat publik yang ditandatangani oleh Amazon Trust Services CA, yang dipercaya Lambda secara default.

Untuk informasi selengkapnya tentang MTL untuk Amazon MSK, lihat Mutual TLS Authentication di Amazon Managed Streaming for Apache Kafka Developer Guide.

Mengkonfigurasi rahasia mTLS

Rahasia CLIENT_CERTIFICATE_TLS_AUTH memerlukan bidang sertifikat dan bidang kunci pribadi. Untuk kunci pribadi terenkripsi, rahasianya memerlukan kata sandi kunci pribadi. Baik sertifikat dan kunci pribadi harus dalam format PEM.

catatan

Lambda mendukung algoritma enkripsi kunci pribadi PBES1 (tetapi bukan PBES2).

Bidang sertifikat harus berisi daftar sertifikat, dimulai dengan sertifikat klien, diikuti oleh sertifikat perantara, dan diakhiri dengan sertifikat root. Setiap sertifikat harus dimulai pada baris baru dengan struktur berikut:

-----BEGIN CERTIFICATE----- <certificate contents> -----END CERTIFICATE-----

Secrets Manager mendukung rahasia hingga 65.536 byte, yang merupakan ruang yang cukup untuk rantai sertifikat yang panjang.

Kunci pribadi harus dalam format PKCS #8, dengan struktur berikut:

-----BEGIN PRIVATE KEY----- <private key contents> -----END PRIVATE KEY-----

Untuk kunci pribadi terenkripsi, gunakan struktur berikut:

-----BEGIN ENCRYPTED PRIVATE KEY----- <private key contents> -----END ENCRYPTED PRIVATE KEY-----

Contoh berikut menunjukkan isi rahasia untuk otentikasi mTLS menggunakan kunci pribadi terenkripsi. Untuk kunci pribadi terenkripsi, Anda menyertakan kata sandi kunci pribadi dalam rahasia.

{ "privateKeyPassword": "testpassword", "certificate": "-----BEGIN CERTIFICATE----- MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw ... j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk cmUuiAii9R0= -----END CERTIFICATE----- -----BEGIN CERTIFICATE----- MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb ... rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg== -----END CERTIFICATE-----", "privateKey": "-----BEGIN ENCRYPTED PRIVATE KEY----- MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp ... QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA== -----END ENCRYPTED PRIVATE KEY-----" }

Bagaimana Lambda memilih broker bootstrap

Lambda memilih broker bootstrap berdasarkan metode otentikasi yang tersedia di cluster Anda, dan apakah Anda memberikan rahasia untuk otentikasi. Jika Anda memberikan rahasia untuk mTLS atau SASL/SCRAM, Lambda secara otomatis memilih metode autentikasi itu. Jika Anda tidak memberikan rahasia, Lambda memilih metode autentikasi terkuat yang aktif di cluster Anda. Berikut ini adalah urutan prioritas di mana Lambda memilih broker, dari autentikasi terkuat hingga terlemah:

  • mTL (rahasia disediakan untuk mTL)

  • SASL/SCRAM (rahasia disediakan untuk SASL/SCRAM)

  • SASL IAM (tidak ada rahasia yang disediakan, dan autentikasi IAM aktif)

  • TLS yang tidak diautentikasi (tidak ada rahasia yang disediakan, dan autentikasi IAM tidak aktif)

  • Plaintext (tidak ada rahasia yang disediakan, dan autentikasi IAM dan TLS yang tidak diautentikasi tidak aktif)

catatan

Jika Lambda tidak dapat terhubung ke jenis broker yang paling aman, Lambda tidak mencoba untuk terhubung ke jenis broker yang berbeda (lebih lemah). Jika Anda ingin Lambda memilih jenis broker yang lebih lemah, nonaktifkan semua metode autentikasi yang lebih kuat di cluster Anda.

Mengelola akses dan izin API

Selain mengakses kluster MSK Amazon, fungsi Anda memerlukan izin untuk melakukan berbagai tindakan Amazon MSK API. Anda menambahkan izin ini ke peran eksekusi fungsi. Jika pengguna Anda memerlukan akses ke salah satu tindakan Amazon MSK API, tambahkan izin yang diperlukan ke kebijakan identitas untuk pengguna atau peran.

Anda dapat menambahkan setiap izin berikut ke peran eksekusi Anda secara manual. Atau, Anda dapat melampirkan kebijakan AWS terkelola AWSLambdaMSKExecutionRoleke peran eksekusi Anda. AWSLambdaMSKExecutionRoleKebijakan ini berisi semua tindakan API yang diperlukan dan izin VPC yang tercantum di bawah ini.

Izin peran eksekusi fungsi Lambda yang diperlukan

Untuk membuat dan menyimpan log dalam grup log di Amazon CloudWatch Logs, fungsi Lambda Anda harus memiliki izin berikut dalam peran pelaksanaannya:

Agar Lambda dapat mengakses klaster MSK Amazon Anda atas nama Anda, fungsi Lambda Anda harus memiliki izin berikut dalam peran pelaksanaannya:

Anda hanya perlu menambahkan salah satu dari salah satu kafka:DescribeCluster ataukafka:DescribeClusterV2. Untuk klaster MSK yang disediakan, izin berfungsi. Untuk cluster MSK tanpa server, Anda harus menggunakan. kafka:DescribeClusterV2

catatan

Lambda akhirnya berencana untuk menghapus kafka:DescribeCluster izin dari kebijakan AWSLambdaMSKExecutionRole terkelola terkait. Jika Anda menggunakan kebijakan ini, Anda harus memigrasikan aplikasi apa pun yang digunakan kafka:DescribeCluster untuk digunakankafka:DescribeClusterV2.

Izin VPC

Jika hanya pengguna dalam VPC yang dapat mengakses kluster MSK Amazon Anda, fungsi Lambda Anda harus memiliki izin untuk mengakses sumber daya Amazon VPC Anda. Sumber daya ini termasuk VPC, subnet, grup keamanan, dan antarmuka jaringan. Untuk mengakses sumber daya ini, peran eksekusi fungsi Anda harus memiliki izin berikut. Izin ini disertakan dalam kebijakan AWSLambdaMSKExecutionRole AWS terkelola.

Izin fungsi Lambda opsional

Fungsi Lambda Anda mungkin juga memerlukan izin untuk:

  • Akses rahasia SCRAM Anda, jika menggunakan otentikasi SASL/SCRAM.

  • Jelaskan rahasia Secrets Manager Anda.

  • Akses AWS Key Management Service (AWS KMS) kunci terkelola pelanggan Anda.

  • Kirim catatan pemanggilan yang gagal ke tujuan.

Secrets Manager dan AWS KMS izin

Bergantung pada jenis kontrol akses yang Anda konfigurasikan untuk broker MSK Amazon Anda, fungsi Lambda Anda mungkin memerlukan izin untuk mengakses rahasia SCRAM Anda (jika menggunakan otentikasi SASL/SCRAM), atau rahasia Secrets Manager untuk mendekripsi kunci yang dikelola pelanggan Anda. AWS KMS Untuk mengakses sumber daya ini, peran eksekusi fungsi Anda harus memiliki izin berikut:

Menambahkan izin ke peran eksekusi

Ikuti langkah-langkah berikut untuk menambahkan kebijakan AWS terkelola AWSLambdaMSKExecutionRoleke peran eksekusi Anda menggunakan konsol IAM.

Untuk menambahkan kebijakan AWS terkelola
  1. Buka halaman Kebijakan konsol IAM.

  2. Dalam kotak pencarian kebijakan, masukkan nama kebijakan (AWSLambdaMSKExecutionRole).

  3. Pilih kebijakan dari daftar, lalu pilih Tindakan kebijakan, Lampirkan.

  4. Di halaman Lampirkan kebijakan, pilih peran eksekusi Anda dari daftar, lalu pilih Lampirkan kebijakan.

Memberikan akses pengguna dengan kebijakan IAM

Secara default, pengguna dan peran tidak memiliki izin untuk melakukan operasi Amazon MSK API. Untuk memberikan akses ke pengguna di organisasi atau akun Anda, Anda dapat menambahkan atau memperbarui kebijakan berbasis identitas. Untuk informasi selengkapnya, lihat Contoh Kebijakan Berbasis Identitas Amazon MSK di Panduan Developer Amazon Managed Streaming for Apache Kafka.

Kesalahan otentikasi dan otorisasi

Jika salah satu izin yang diperlukan untuk mengkonsumsi data dari kluster MSK Amazon tidak ada, Lambda menampilkan salah satu pesan galat berikut dalam pemetaan sumber peristiwa di bawah Hasil. LastProcessing

Cluster gagal mengotorisasi Lambda

Untuk SASL/SCRAM atau mTL, kesalahan ini menunjukkan bahwa pengguna yang disediakan tidak memiliki semua izin daftar kontrol akses (ACL) Kafka yang diperlukan berikut:

  • DescribeConfigs Cluster

  • Jelaskan Grup

  • Baca Grup

  • Jelaskan Topik

  • Baca Topik

Untuk kontrol akses IAM, peran eksekusi fungsi Anda tidak memiliki satu atau beberapa izin yang diperlukan untuk mengakses grup atau topik. Tinjau daftar izin yang diperlukan diAutentikasi berbasis peran IAM.

Saat Anda membuat ACL Kafka atau kebijakan IAM dengan izin klaster Kafka yang diperlukan, tentukan topik dan kelompokkan sebagai sumber daya. Nama topik harus cocok dengan topik dalam pemetaan sumber acara. Nama grup harus cocok dengan UUID pemetaan sumber peristiwa.

Setelah Anda menambahkan izin yang diperlukan ke peran eksekusi, mungkin perlu beberapa menit agar perubahan diterapkan.

Otentikasi SASL gagal

Untuk SASL/SCRAM, kesalahan ini menunjukkan bahwa nama pengguna dan kata sandi yang diberikan tidak valid.

Untuk kontrol akses IAM, peran eksekusi tidak memiliki kafka-cluster:Connect izin untuk cluster MSK. Tambahkan izin ini ke peran dan tentukan Amazon Resource Name (ARN) cluster sebagai sumber daya.

Anda mungkin melihat kesalahan ini terjadi sebentar-sebentar. Cluster menolak koneksi setelah jumlah koneksi TCP melebihi kuota layanan MSK Amazon. Lambda mundur dan mencoba lagi sampai koneksi berhasil. Setelah Lambda terhubung ke cluster dan polling untuk catatan, hasil pemrosesan terakhir berubah menjadi. OK

Server gagal mengautentikasi Lambda

Kesalahan ini menunjukkan bahwa broker Amazon MSK Kafka gagal mengautentikasi dengan Lambda. Ini dapat terjadi karena salah satu alasan berikut:

  • Anda tidak memberikan sertifikat klien untuk otentikasi mTLS.

  • Anda memberikan sertifikat klien, tetapi broker tidak dikonfigurasi untuk menggunakan MTL.

  • Sertifikat klien tidak dipercaya oleh broker.

Sertifikat atau kunci pribadi yang diberikan tidak valid

Kesalahan ini menunjukkan bahwa konsumen MSK Amazon tidak dapat menggunakan sertifikat atau kunci pribadi yang disediakan. Pastikan sertifikat dan kunci menggunakan format PEM, dan enkripsi kunci pribadi menggunakan algoritma PBES1.

Konfigurasi jaringan

Agar Lambda dapat menggunakan cluster Kafka Anda sebagai sumber acara, Lambda memerlukan akses ke VPC Amazon tempat klaster Anda berada. Kami menyarankan Anda menerapkan titik akhir AWS PrivateLink VPC untuk Lambda untuk mengakses VPC Anda. Terapkan titik akhir untuk Lambda dan (). AWS Security Token Service AWS STS Jika broker menggunakan otentikasi, gunakan juga titik akhir VPC untuk Secrets Manager. Jika Anda mengonfigurasi tujuan saat kegagalan, gunakan juga titik akhir VPC untuk layanan tujuan.

Pastikan juga VPC yang terkait dengan klaster Kafka Anda termasuk mencakup gateway NAT per subnet publik. Untuk informasi selengkapnya, lihat Aktifkan akses internet untuk fungsi Lambda yang terhubung dengan VPC.

Jika Anda menggunakan titik akhir VPC, Anda juga harus mengonfigurasinya untuk mengaktifkan nama DNS pribadi.

Saat Anda membuat pemetaan sumber peristiwa untuk kluster MSK, Lambda memeriksa apakah Elastic Network Interfaces (ENI) sudah ada untuk subnet dan grup keamanan VPC klaster Anda. Jika Lambda menemukan ENI yang ada, ia mencoba untuk menggunakannya kembali. Jika tidak, Lambda membuat ENI baru untuk terhubung ke sumber acara dan menjalankan fungsi Anda.

catatan

Fungsi Lambda selalu berjalan di dalam VPC yang dimiliki oleh layanan Lambda. VPC ini dikelola secara otomatis oleh layanan dan tidak terlihat oleh pelanggan. Anda juga dapat menghubungkan fungsi Anda ke VPC Amazon. Dalam kedua kasus tersebut, konfigurasi VPC fungsi Anda tidak memengaruhi pemetaan sumber peristiwa. Hanya konfigurasi VPC sumber acara yang menentukan bagaimana Lambda terhubung ke sumber acara Anda.

Konfigurasi VPC Amazon Anda dapat ditemukan melalui Amazon MSK API. Anda tidak perlu mengkonfigurasinya selama pengaturan menggunakan create-event-source-mapping perintah.

Untuk informasi selengkapnya tentang mengonfigurasi jaringan, lihat Menyiapkan AWS Lambda dengan cluster Apache Kafka dalam VPC di Blog Komputasi. AWS

Aturan grup keamanan VPC

Konfigurasikan grup keamanan untuk VPC Amazon yang berisi klaster Anda dengan aturan berikut (minimal):

  • Aturan masuk - Izinkan semua lalu lintas di port broker MSK Amazon (9092 untuk teks biasa, 9094 untuk TLS, 9096 untuk SASL, 9098 untuk IAM) untuk grup keamanan yang ditentukan untuk sumber acara Anda.

  • Aturan keluar - Izinkan semua lalu lintas di port 443 untuk semua tujuan. Izinkan semua lalu lintas di port broker MSK Amazon (9092 untuk plaintext, 9094 untuk TLS, 9096 untuk SASL, 9098 untuk IAM) untuk grup keamanan yang ditentukan untuk sumber acara Anda.

  • Jika Anda menggunakan titik akhir VPC alih-alih gateway NAT, grup keamanan yang terkait dengan titik akhir VPC harus mengizinkan semua lalu lintas masuk pada port 443 dari grup keamanan sumber acara.

Bekerja dengan VPC endpoint

Saat Anda menggunakan titik akhir VPC, panggilan API untuk menjalankan fungsi Anda dirutekan melalui titik akhir ini menggunakan ENI. Prinsipal layanan Lambda perlu memanggil sts:AssumeRole dan lambda:InvokeFunction pada peran dan fungsi apa pun yang menggunakan ENI tersebut.

Secara default, titik akhir VPC memiliki kebijakan IAM yang terbuka. Praktik terbaik adalah membatasi kebijakan ini untuk memungkinkan hanya prinsipal tertentu untuk melakukan tindakan yang diperlukan menggunakan titik akhir itu. Untuk memastikan bahwa pemetaan sumber peristiwa Anda dapat menjalankan fungsi Lambda Anda, kebijakan titik akhir VPC harus mengizinkan prinsip layanan Lambda untuk memanggil dan. sts:AssumeRole lambda:InvokeFunction Membatasi kebijakan titik akhir VPC Anda agar hanya mengizinkan panggilan API yang berasal dari organisasi Anda mencegah pemetaan sumber peristiwa berfungsi dengan baik.

Contoh kebijakan titik akhir VPC berikut menunjukkan cara memberikan akses yang diperlukan ke prinsipal layanan Lambda untuk titik akhir dan Lambda. AWS STS

contoh Kebijakan titik akhir VPC - titik akhir AWS STS
{ "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }
contoh Kebijakan titik akhir VPC - Titik akhir Lambda
{ "Statement": [ { "Action": "lambda:InvokeFunction", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }

Jika broker Kafka Anda menggunakan otentikasi, Anda juga dapat membatasi kebijakan titik akhir VPC untuk titik akhir Secrets Manager. Untuk memanggil Secrets Manager API, Lambda menggunakan peran fungsi Anda, bukan kepala layanan Lambda. Contoh berikut menunjukkan kebijakan titik akhir Secrets Manager.

contoh Kebijakan titik akhir VPC - Titik akhir Secrets Manager
{ "Statement": [ { "Action": "secretsmanager:GetSecretValue", "Effect": "Allow", "Principal": { "AWS": [ "customer_function_execution_role_arn" ] }, "Resource": "customer_secret_arn" } ] }

Jika Anda memiliki tujuan pada kegagalan yang dikonfigurasi, Lambda juga menggunakan peran fungsi Anda untuk memanggil s3:PutObject salah satusns:Publish, sqs:sendMessage atau menggunakan ENI yang dikelola Lambda.

Menambahkan Amazon MSK sebagai sumber peristiwa

Untuk membuat pemetaan sumber peristiwa, tambahkan Amazon MSK sebagai pemicu fungsi Lambda menggunakan konsol Lambda, AWS SDK, atau AWS Command Line Interface (AWS CLI). Perhatikan bahwa saat Anda menambahkan Amazon MSK sebagai pemicu, Lambda mengasumsikan pengaturan VPC klaster MSK Amazon, bukan pengaturan VPC fungsi Lambda.

Bagian ini menjelaskan cara membuat pemetaan sumber peristiwa menggunakan konsol Lambda dan AWS CLI.

Prasyarat

  • Klaster Amazon MSK dan topik Kafka. Untuk informasi selengkapnya, lihat Mulai Menggunakan Amazon MSK di Panduan Developer Amazon Managed Streaming for Apache Kafka.

  • Peran eksekusi dengan izin untuk mengakses AWS sumber daya yang digunakan kluster MSK Anda.

ID grup konsumen yang dapat disesuaikan

Saat mengatur Kafka sebagai sumber acara, Anda dapat menentukan ID grup konsumen. ID grup konsumen ini adalah pengenal yang ada untuk grup konsumen Kafka yang Anda inginkan agar fungsi Lambda Anda bergabung. Anda dapat menggunakan fitur ini untuk memigrasikan pengaturan pemrosesan catatan Kafka yang sedang berlangsung dengan mulus dari konsumen lain ke Lambda.

Jika Anda menentukan ID grup konsumen dan ada poller aktif lainnya dalam grup konsumen tersebut, Kafka mendistribusikan pesan ke semua konsumen. Dengan kata lain, Lambda tidak menerima semua pesan untuk topik Kafka. Jika Anda ingin Lambda menangani semua pesan dalam topik, matikan poller lain di grup konsumen tersebut.

Selain itu, jika Anda menentukan ID grup konsumen, dan Kafka menemukan grup konsumen yang sudah ada yang valid dengan ID yang sama, Lambda mengabaikan parameter untuk StartingPosition pemetaan sumber peristiwa Anda. Sebaliknya, Lambda mulai memproses catatan sesuai dengan offset yang dilakukan dari kelompok konsumen. Jika Anda menentukan ID grup konsumen, dan Kafka tidak dapat menemukan grup konsumen yang ada, maka Lambda mengonfigurasi sumber acara Anda dengan yang ditentukan. StartingPosition

ID grup konsumen yang Anda tentukan harus unik di antara semua sumber acara Kafka Anda. Setelah membuat pemetaan sumber acara Kafka dengan ID grup konsumen yang ditentukan, Anda tidak dapat memperbarui nilai ini.

Menambahkan pemicu Amazon MSK (konsol)

Ikuti langkah-langkah untuk menambahkan klaster Amazon MSK dan topik Kafka sebagai pemicu untuk fungsi Lambda Anda.

Untuk menambahkan pemicu Amazon MSK ke fungsi Lambda Anda (konsol)
  1. Buka halaman Fungsi di konsol Lambda.

  2. Pilih nama fungsi Lambda Anda.

  3. Di bagian Gambaran umum fungsi, pilih Tambah pemicu.

  4. Pada Konfigurasi pemicu, lakukan hal berikut:

    1. Pilih tipe pemicu MSK.

    2. Untuk Klaster MSK, pilih klaster Anda.

    3. Untuk Ukuran batch, masukkan jumlah maksimum pesan yang akan diterima dalam satu batch.

    4. Untuk jendela Batch, masukkan jumlah maksimum detik yang dihabiskan Lambda untuk mengumpulkan catatan sebelum menjalankan fungsi.

    5. Untuk Nama topik, masukkan nama topik Kafka.

    6. (Opsional) Untuk ID grup Konsumen, masukkan ID grup konsumen Kafka untuk bergabung.

    7. (Opsional) Untuk posisi Mulai, pilih Terbaru untuk mulai membaca aliran dari catatan terbaru, Pangkas cakrawala untuk memulai pada catatan paling awal yang tersedia, atau Pada stempel waktu untuk menentukan stempel waktu untuk mulai membaca.

    8. (Opsional) Untuk Otentikasi, pilih kunci rahasia untuk otentikasi dengan broker di klaster MSK Anda.

    9. Untuk membuat pemicu dalam status nonaktif untuk pengujian (disarankan), hapus Aktifkan pemicu. Atau, untuk segera mengaktifkan pemicu, pilih Aktifkan pemicu.

  5. Untuk membuat pemicu, pilih Tambahkan.

Menambahkan pemicu Amazon MSK (AWS CLI)

Gunakan contoh AWS CLI perintah berikut untuk membuat dan melihat pemicu MSK Amazon untuk fungsi Lambda Anda.

Membuat pemicu menggunakan AWS CLI

contoh — Buat pemetaan sumber acara untuk cluster yang menggunakan otentikasi IAM

Contoh berikut menggunakan create-event-source-mapping AWS CLI perintah untuk memetakan fungsi Lambda bernama my-kafka-function ke topik Kafka bernama. AWSKafkaTopic Posisi awal topik diatur ke LATEST. Saat cluster menggunakan autentikasi berbasis peran IAM, Anda tidak memerlukan objek Konfigurasi. SourceAccess Contoh:

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function
contoh — Buat pemetaan sumber acara untuk cluster yang menggunakan otentikasi SASL/SCRAM

Jika cluster menggunakan otentikasi SASL/SCRAM, Anda harus menyertakan objek SourceAccessKonfigurasi yang menentukan dan ARN SASL_SCRAM_512_AUTH rahasia Secrets Manager.

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'
contoh — Buat pemetaan sumber acara untuk cluster yang menggunakan otentikasi mTLS

Jika cluster menggunakan otentikasi mTLS, Anda harus menyertakan objek SourceAccessKonfigurasi yang menentukan CLIENT_CERTIFICATE_TLS_AUTH dan ARN rahasia Secrets Manager.

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'

Untuk informasi selengkapnya, lihat dokumentasi referensi CreateEventSourceMappingAPI.

Melihat status menggunakan AWS CLI

Contoh berikut menggunakan get-event-source-mapping AWS CLI perintah untuk menggambarkan status pemetaan sumber peristiwa yang Anda buat.

aws lambda get-event-source-mapping \ --uuid 6d9bce8e-836b-442c-8070-74e77903c815

Membuat pemetaan sumber acara lintas akun

Anda dapat menggunakan konektivitas pribadi multi-VPC untuk menghubungkan fungsi Lambda ke kluster MSK yang disediakan secara berbeda. Akun AWS Konektivitas multi-VPC menggunakan AWS PrivateLink, yang menjaga semua lalu lintas dalam jaringan. AWS

catatan

Anda tidak dapat membuat pemetaan sumber peristiwa lintas akun untuk kluster MSK tanpa server.

Untuk membuat pemetaan sumber peristiwa lintas akun, Anda harus terlebih dahulu mengonfigurasi konektivitas multi-VPC untuk kluster MSK. Saat Anda membuat pemetaan sumber peristiwa, gunakan ARN koneksi VPC terkelola alih-alih ARN cluster, seperti yang ditunjukkan pada contoh berikut. CreateEventSourceMappingOperasi juga berbeda tergantung pada jenis otentikasi yang digunakan kluster MSK.

contoh — Buat pemetaan sumber peristiwa lintas akun untuk cluster yang menggunakan otentikasi IAM

Saat cluster menggunakan autentikasi berbasis peran IAM, Anda tidak memerlukan objek Konfigurasi. SourceAccess Contoh:

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function
contoh — Buat pemetaan sumber peristiwa lintas akun untuk cluster yang menggunakan otentikasi SASL/SCRAM

Jika cluster menggunakan otentikasi SASL/SCRAM, Anda harus menyertakan objek SourceAccessKonfigurasi yang menentukan dan ARN SASL_SCRAM_512_AUTH rahasia Secrets Manager.

Ada dua cara untuk menggunakan rahasia untuk pemetaan sumber peristiwa Amazon MSK lintas akun dengan otentikasi SASL/SCRAM:

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'
contoh — Buat pemetaan sumber peristiwa lintas akun untuk cluster yang menggunakan otentikasi mTLS

Jika cluster menggunakan otentikasi mTLS, Anda harus menyertakan objek SourceAccessKonfigurasi yang menentukan CLIENT_CERTIFICATE_TLS_AUTH dan ARN rahasia Secrets Manager. Rahasianya dapat disimpan di akun cluster atau akun fungsi Lambda.

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'

Destinasi yang gagal

Untuk menyimpan catatan pemanggilan pemetaan sumber peristiwa yang gagal, tambahkan tujuan ke pemetaan sumber peristiwa fungsi Anda. Setiap catatan yang dikirim ke tujuan adalah dokumen JSON dengan metadata tentang pemanggilan yang gagal. Anda dapat mengonfigurasi topik Amazon SNS, antrian Amazon SQS, atau bucket S3 sebagai tujuan. Peran eksekusi Anda harus memiliki izin untuk tujuan:

Selain itu, jika Anda mengonfigurasi kunci KMS di tujuan Anda, Lambda memerlukan izin berikut tergantung pada jenis tujuan:

Untuk mengonfigurasi tujuan saat gagal menggunakan konsol, ikuti langkah-langkah berikut:

  1. Buka halaman Fungsi di konsol Lambda.

  2. Pilih fungsi.

  3. Di bagian Gambaran umum fungsi, pilih Tambahkan tujuan.

  4. Untuk Sumber, pilih Pemanggilan pemetaan sumber acara.

  5. Untuk pemetaan sumber peristiwa, pilih sumber peristiwa yang dikonfigurasi untuk fungsi ini.

  6. Untuk Kondisi, pilih On failure. Untuk pemanggilan pemetaan sumber peristiwa, ini adalah satu-satunya kondisi yang diterima.

  7. Untuk tipe Tujuan, pilih tipe tujuan yang Lambda kirimkan catatan pemanggilan.

  8. Untuk Tujuan, pilih sumber daya.

  9. Pilih Simpan.

Anda juga dapat mengonfigurasi tujuan pada kegagalan menggunakan file. AWS CLI Misalnya, perintah create-event-source-mapping berikut menambahkan pemetaan sumber peristiwa dengan tujuan kegagalan SQS ke: MyFunction

aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'

Perintah update-event-source-mapping berikut menambahkan tujuan kegagalan S3 ke sumber peristiwa yang terkait dengan input: uuid

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": "arn:aws:s3:::dest-bucket"}}'

Untuk menghapus tujuan, berikan string kosong sebagai argumen ke destination-config parameter:

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'

Catatan pemanggilan contoh SNS dan SQS

Contoh berikut menunjukkan apa yang Lambda kirim ke topik SNS atau tujuan antrian SQS untuk pemanggilan sumber peristiwa Kafka yang gagal. Setiap kunci di bawah recordsInfo berisi topik dan partisi Kafka, dipisahkan oleh tanda hubung. Misalnya, untuk kuncinya"Topic-0", Topic adalah topik Kafka, dan 0 merupakan partisi. Untuk setiap topik dan partisi, Anda dapat menggunakan data offset dan stempel waktu untuk menemukan catatan pemanggilan asli.

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } } }

Catatan pemanggilan contoh tujuan S3

Untuk tujuan S3, Lambda mengirimkan seluruh catatan pemanggilan bersama dengan metadata ke tujuan. Contoh berikut menunjukkan bahwa Lambda mengirim ke tujuan bucket S3 untuk pemanggilan sumber peristiwa Kafka yang gagal. Selain semua bidang dari contoh sebelumnya untuk tujuan SQS dan SNS, payload bidang berisi catatan pemanggilan asli sebagai string JSON yang lolos.

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } }, "payload": "<Whole Event>" // Only available in S3 }
Tip

Sebaiknya aktifkan versi S3 di bucket tujuan Anda.

Penskalaan otomatis sumber peristiwa Amazon MSK

Saat Anda awalnya membuat sumber acara MSK Amazon, Lambda mengalokasikan satu konsumen untuk memproses semua partisi dalam topik Kafka. Setiap konsumen memiliki beberapa prosesor yang berjalan secara paralel untuk menangani peningkatan beban kerja. Selain itu, Lambda secara otomatis meningkatkan atau menurunkan jumlah konsumen, berdasarkan beban kerja. Untuk mempertahankan pemesanan pesan di setiap partisi, jumlah maksimum konsumen adalah satu konsumen per partisi dalam topik.

Dalam interval satu menit, Lambda mengevaluasi lag offset konsumen dari semua partisi dalam topik. Jika lag terlalu tinggi, partisi menerima pesan lebih cepat daripada yang dapat diproses Lambda. Jika perlu, Lambda menambahkan atau menghapus konsumen dari topik tersebut. Proses penskalaan penambahan atau penghapusan konsumen terjadi dalam waktu tiga menit setelah evaluasi.

Jika fungsi Lambda target Anda dibatasi, Lambda mengurangi jumlah konsumen. Tindakan ini mengurangi beban kerja pada fungsi dengan mengurangi jumlah pesan yang dapat konsumen ambil dan kirim ke fungsi.

Untuk memantau throughput topik Kafka Anda, lihat metrik Lambda Offset lag yang dipancarkan saat fungsi Anda memproses catatan.

Untuk memeriksa berapa banyak fungsi invokasi yang terjadi secara paralel, Anda juga dapat memantau metrik konkurensi untuk fungsi Anda.

Posisi awal polling dan streaming

Ketahuilah bahwa polling streaming selama pembuatan dan pembaruan pemetaan sumber acara pada akhirnya konsisten.

  • Selama pembuatan pemetaan sumber acara, mungkin diperlukan beberapa menit untuk memulai acara polling dari aliran.

  • Selama pembaruan pemetaan sumber acara, mungkin diperlukan beberapa menit untuk menghentikan dan memulai kembali acara pemungutan suara dari aliran.

Perilaku ini berarti bahwa jika Anda menentukan LATEST sebagai posisi awal untuk aliran, pemetaan sumber peristiwa dapat melewatkan peristiwa selama pembuatan atau pembaruan. Untuk memastikan bahwa tidak ada peristiwa yang terlewatkan, tentukan posisi awal aliran sebagai TRIM_HORIZON atauAT_TIMESTAMP.

CloudWatch Metrik Amazon

Lambda memancarkan OffsetLag metrik saat fungsi Anda memproses catatan. Nilai metrik ini adalah perbedaan offset antara catatan terakhir yang ditulis ke topik sumber peristiwa Kafka dan catatan terakhir yang diproses oleh grup konsumen fungsi Anda. Anda dapat menggunakan OffsetLag untuk memperkirakan latensi antara saat catatan ditambahkan dan kapan grup konsumen Anda memprosesnya.

Tren yang meningkat OffsetLag dapat menunjukkan masalah dengan poller dalam kelompok konsumen fungsi Anda. Untuk informasi selengkapnya, lihat Bekerja dengan metrik fungsi Lambda.

Parameter konfigurasi Amazon MSK

Semua jenis sumber peristiwa Lambda berbagi operasi yang sama CreateEventSourceMappingdan UpdateEventSourceMappingAPI. Namun, hanya beberapa parameter yang berlaku untuk Amazon MSK.

Parameter sumber peristiwa yang berlaku untuk Amazon MSK
Parameter Diperlukan Default Catatan

AmazonManagedKafkaEventSourceConfig

T

Berisi ConsumerGroupId bidang, yang default ke nilai unik.

Hanya dapat mengatur di Create

BatchSize

T

100

Maksimum: 10.000.

Diaktifkan

N

Diaktifkan

EventSourceArn

T

Hanya dapat mengatur di Create

FunctionName

T

FilterCriteria

T

Pemfilteran acara Lambda

MaximumBatchingWindowInDetik

T

500 ms

Perilaku batching

SourceAccessKonfigurasi

T

Tidak ada kredensial

Kredensyal otentikasi SASL/SCRAM atau CLIENT_CERTIFICATE_TLS_AUTH (MutualTLS) untuk sumber acara Anda

StartingPosition

T

AT_TIMESTAMP, TRIM_HORIZON, atau TERBARU

Hanya dapat mengatur di Create

StartingPositionStempel waktu

T

Diperlukan jika StartingPosition disetel ke AT_TIMESTAMP

Topik

Y

Nama topik Kafka

Hanya dapat mengatur di Create