Memproses pesan Apache Kafka yang dikelola sendiri dengan Lambda - AWS Lambda

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Memproses pesan Apache Kafka yang dikelola sendiri dengan Lambda

Menambahkan klaster Kafka sebagai sumber peristiwa

Untuk membuat pemetaan sumber peristiwa, tambahkan klaster Kafka Anda sebagai pemicu fungsi Lambda menggunakan konsol Lambda, AWS SDK, atauAWS Command Line Interface (AWS CLI).

Bagian ini menjelaskan cara membuat pemetaan sumber peristiwa menggunakan konsol Lambda dan AWS CLI.

Prasyarat

  • Klaster Apache Kafka yang dikelola sendiri. Lambda mendukung Apache Kafka versi 0.10.1.0 dan yang lebih baru.

  • Peran eksekusi dengan izin untuk mengakses AWS sumber daya yang digunakan cluster Kafka yang dikelola sendiri oleh Anda.

ID grup konsumen yang dapat disesuaikan

Saat mengatur Kafka sebagai sumber acara, Anda dapat menentukan ID grup konsumen. ID grup konsumen ini adalah pengenal yang ada untuk grup konsumen Kafka yang Anda inginkan agar fungsi Lambda Anda bergabung. Anda dapat menggunakan fitur ini untuk memigrasikan pengaturan pemrosesan catatan Kafka yang sedang berlangsung dengan mulus dari konsumen lain ke Lambda.

Jika Anda menentukan ID grup konsumen dan ada poller aktif lainnya dalam grup konsumen tersebut, Kafka mendistribusikan pesan ke semua konsumen. Dengan kata lain, Lambda tidak menerima semua pesan untuk topik Kafka. Jika Anda ingin Lambda menangani semua pesan dalam topik, matikan poller lain di grup konsumen tersebut.

Selain itu, jika Anda menentukan ID grup konsumen, dan Kafka menemukan grup konsumen yang sudah ada dengan ID yang sama, Lambda mengabaikan parameter untuk StartingPosition pemetaan sumber peristiwa Anda. Sebaliknya, Lambda mulai memproses catatan sesuai dengan offset yang dilakukan dari kelompok konsumen. Jika Anda menentukan ID grup konsumen, dan Kafka tidak dapat menemukan grup konsumen yang ada, maka Lambda mengonfigurasi sumber acara Anda dengan yang ditentukan. StartingPosition

ID grup konsumen yang Anda tentukan harus unik di antara semua sumber acara Kafka Anda. Setelah membuat pemetaan sumber acara Kafka dengan ID grup konsumen yang ditentukan, Anda tidak dapat memperbarui nilai ini.

Menambahkan klaster Kafka yang dikelola sendiri (konsol)

Ikuti langkah-langkah untuk menambahkan klaster Apache Kafka yang dikelola sendiri dan topik Kafka sebagai pemicu untuk fungsi Lambda Anda.

Untuk menambahkan pemicu Apache Kafka untuk fungsi Lambda Anda (konsol)
  1. Buka halaman Fungsi di konsol Lambda.

  2. Pilih nama fungsi Lambda Anda.

  3. Di bagian Gambaran umum fungsi, pilih Tambah pemicu.

  4. Pada Konfigurasi pemicu, lakukan hal berikut:

    1. Pilih jenis pemicu Apache Kafka.

    2. Untuk Server bootstrap, masukkan alamat pasangan host dan port broker Kafka di klaster Anda, lalu pilih Tambahkan. Ulangi untuk setiap broker Kafka di klaster.

    3. Untuk Nama topik, masukkan nama topik Kafka yang digunakan untuk menyimpan catatan dalam klaster.

    4. (Opsional) Untuk Ukuran batch, masukkan jumlah maksimum catatan yang akan diterima dalam satu batch.

    5. Untuk jendela Batch, masukkan jumlah maksimum detik yang dihabiskan Lambda untuk mengumpulkan catatan sebelum menjalankan fungsi.

    6. (Opsional) Untuk ID grup Konsumen, masukkan ID grup konsumen Kafka untuk bergabung.

    7. (Opsional) Untuk posisi Mulai, pilih Terbaru untuk mulai membaca aliran dari catatan terbaru, Potong cakrawala untuk memulai pada catatan paling awal yang tersedia, atau Pada stempel waktu untuk menentukan stempel waktu untuk mulai membaca.

    8. (Opsional) Untuk VPC, pilih Amazon VPC untuk cluster Kafka Anda. Kemudian, pilih subnet VPC dan grup keamanan VPC.

      Pengaturan ini diperlukan jika hanya pengguna dalam VPC Anda yang mengakses broker Anda.

    9. (Opsional) Untuk Otentikasi, pilih Tambah, lalu lakukan hal berikut:

      1. Pilih protokol akses atau otentikasi broker Kafka di cluster Anda.

        • Jika broker Kafka Anda menggunakan otentikasi SASL/PLAIN, pilih BASIC_AUTH.

        • Jika broker Anda menggunakan otentikasi SASL/SCRAM, pilih salah satu protokol SASL_SCRAM.

        • Jika Anda mengonfigurasi otentikasi mTLS, pilih protokol CLIENT_CERTIFICATE_TLS_AUTH.

      2. Untuk autentikasi SASL/SCRAM atau mTLS, pilih kunci rahasia Secrets Manager yang berisi kredensyal untuk cluster Kafka Anda.

    10. (Opsional) Untuk Enkripsi, pilih rahasia Secrets Manager yang berisi sertifikat CA root yang digunakan broker Kafka Anda untuk enkripsi TLS, jika broker Kafka Anda menggunakan sertifikat yang ditandatangani oleh CA pribadi.

      Pengaturan ini berlaku untuk enkripsi TLS untuk SASL/SCRAM atau SASL/PLAIN, dan untuk otentikasi mTLS.

    11. Untuk membuat pemicu dalam status nonaktif untuk pengujian (disarankan), hapus Aktifkan pemicu. Atau, untuk segera mengaktifkan pemicu, pilih Aktifkan pemicu.

  5. Untuk membuat pemicu, pilih Tambahkan.

Menambahkan klaster Kafka yang dikelola sendiri (AWS CLI)

Gunakan contoh AWS CLI perintah berikut untuk membuat dan melihat pemicu Apache Kafka yang dikelola sendiri untuk fungsi Lambda Anda.

Menggunakan SASL/SCRAM

Jika pengguna Kafka mengakses broker Kafka Anda melalui internet, tentukan rahasia Secrets Manager yang Anda buat untuk otentikasi SASL/SCRAM. Contoh berikut menggunakan create-event-source-mapping AWS CLI perintah untuk memetakan fungsi Lambda bernama my-kafka-function ke topik Kafka bernama. AWSKafkaTopic

aws lambda create-event-source-mapping \ --topics AWSKafkaTopic \ --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:111122223333:secret:MyBrokerSecretName \ --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'

Menggunakan VPC

Jika hanya Kafka pengguna dalam VPC Anda yang mengakses broker Kafka Anda, Anda harus menentukan VPC, subnet, dan grup keamanan VPC Anda. Contoh berikut menggunakan create-event-source-mapping AWS CLI perintah untuk memetakan fungsi Lambda bernama my-kafka-function ke topik Kafka bernama. AWSKafkaTopic

aws lambda create-event-source-mapping \ --topics AWSKafkaTopic \ --source-access-configuration '[{"Type": "VPC_SUBNET", "URI": "subnet:subnet-0011001100"}, {"Type": "VPC_SUBNET", "URI": "subnet:subnet-0022002200"}, {"Type": "VPC_SECURITY_GROUP", "URI": "security_group:sg-0123456789"}]' \ --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'

Melihat status menggunakan AWS CLI

Contoh berikut menggunakan get-event-source-mapping AWS CLI perintah untuk menggambarkan status pemetaan sumber peristiwa yang Anda buat.

aws lambda get-event-source-mapping --uuid dh38738e-992b-343a-1077-3478934hjkfd7

Parameter konfigurasi Apache Kafka yang dikelola sendiri

Semua jenis sumber peristiwa Lambda berbagi operasi yang sama CreateEventSourceMappingdan UpdateEventSourceMappingAPI. Namun, hanya beberapa parameter yang berlaku untuk Apache Kafka.

Parameter sumber peristiwa yang berlaku untuk Apache Kafka yang dikelola sendiri
Parameter Diperlukan Default Catatan

BatchSize

T

100

Maksimum: 10.000.

Diaktifkan

N

Diaktifkan

FunctionName

T

FilterCriteria

T

Pemfilteran acara Lambda

MaximumBatchingWindowInSeconds

T

500 ms

Perilaku batching

SelfManagedEventSource

T

Daftar Broker Kafka. Hanya dapat mengatur di Create

SelfManagedKafkaEventSourceConfig

T

Berisi ConsumerGroupId bidang yang default ke nilai unik.

Hanya dapat mengatur di Create

SourceAccessConfigurations

T

Tidak ada kredenensi

Informasi VPC atau kredensial autentikasi untuk klaster

Untuk SASL_PLAIN, atur ke BASIC_AUT

StartingPosition

T

AT_TIMESTAMP, TRIM_HORIZON, atau TERBARU

Hanya dapat mengatur di Create

StartingPositionTimestamp

T

Diperlukan jika StartingPosition disetel ke AT_TIMESTAMP

Topik

Y

Nama topik

Hanya dapat mengatur di Create

Menggunakan klaster Kafka sebagai sumber peristiwa

Ketika Anda menambahkan klaster Apache Kafka Anda sebagai pemicu untuk fungsi Lambda Anda, klaster digunakan sebagai sumber peristiwa.

Lambda membaca data peristiwa dari topik Kafka yang Anda tentukan seperti Topics dalam CreateEventSourceMappingpermintaan, berdasarkan StartingPosition yang Anda tentukan. Setelah pemrosesan berhasil, topik Kafka Anda dijalankan untuk klaster Kafka Anda.

Jika Anda menentukan StartingPosition sebagaiLATEST, Lambda mulai membaca dari pesan terbaru di setiap partisi milik topik. Karena mungkin ada beberapa penundaan setelah konfigurasi pemicu sebelum Lambda mulai membaca pesan, Lambda tidak membaca pesan apa pun yang dihasilkan selama jendela ini.

Lambda memproses catatan dari satu atau beberapa partisi topik Kafka yang Anda tentukan dan mengirimkan payload JSON ke fungsi Anda. Bila lebih banyak rekaman tersedia, Lambda terus memproses catatan dalam batch, berdasarkan BatchSize nilai yang Anda tentukan dalam CreateEventSourceMappingpermintaan, hingga fungsi Anda mengikuti topik.

Jika fungsi Anda mengembalikan kesalahan untuk salah satu pesan dalam batch, Lambda mencoba ulang seluruh batch pesan sampai berhasil diproses atau pesan berakhir. Anda dapat mengirim catatan yang gagal dalam semua upaya percobaan ulang ke tujuan yang gagal untuk diproses nanti.

catatan

Sementara fungsi Lambda biasanya memiliki batas waktu tunggu maksimum 15 menit, pemetaan sumber acara untuk Amazon MSK, Apache Kafka yang dikelola sendiri, Amazon DocumentDB, dan Amazon MQ untuk ActiveMQ dan RabbitMQ hanya mendukung fungsi dengan batas waktu tunggu maksimum 14 menit. Kendala ini memastikan bahwa pemetaan sumber peristiwa dapat menangani kesalahan fungsi dan percobaan ulang dengan benar.

Posisi awal polling dan streaming

Ketahuilah bahwa polling streaming selama pembuatan dan pembaruan pemetaan sumber acara pada akhirnya konsisten.

  • Selama pembuatan pemetaan sumber acara, mungkin diperlukan beberapa menit untuk memulai acara polling dari aliran.

  • Selama pembaruan pemetaan sumber acara, mungkin diperlukan beberapa menit untuk menghentikan dan memulai kembali acara pemungutan suara dari aliran.

Perilaku ini berarti bahwa jika Anda menentukan LATEST sebagai posisi awal untuk aliran, pemetaan sumber peristiwa dapat melewatkan peristiwa selama pembuatan atau pembaruan. Untuk memastikan bahwa tidak ada peristiwa yang terlewatkan, tentukan posisi awal aliran sebagai TRIM_HORIZON atauAT_TIMESTAMP.

Penskalaan otomatis sumber peristiwa Kafka

Saat Anda awalnya membuat sumber acara Apache Kafka, Lambda mengalokasikan satu konsumen untuk memproses semua partisi dalam topik Kafka. Setiap konsumen memiliki beberapa prosesor yang berjalan secara paralel untuk menangani peningkatan beban kerja. Selain itu, Lambda secara otomatis meningkatkan atau menurunkan jumlah konsumen, berdasarkan beban kerja. Untuk mempertahankan pemesanan pesan di setiap partisi, jumlah maksimum konsumen adalah satu konsumen per partisi dalam topik.

Dalam interval satu menit, Lambda mengevaluasi lag offset konsumen dari semua partisi dalam topik. Jika lag terlalu tinggi, partisi menerima pesan lebih cepat daripada yang dapat diproses Lambda. Jika perlu, Lambda menambahkan atau menghapus konsumen dari topik tersebut. Proses penskalaan penambahan atau penghapusan konsumen terjadi dalam waktu tiga menit setelah evaluasi.

Jika fungsi target Lambda Anda kelebihan beban, Lambda mengurangi jumlah konsumen. Tindakan ini mengurangi beban kerja pada fungsi dengan mengurangi jumlah pesan yang dapat konsumen ambil dan kirim ke fungsi.

Untuk memantau throughput dari topik Kafka Anda, Anda dapat melihat metrik konsumen Apache Kafka, seperti consumer_lag dan consumer_offset. Untuk memeriksa berapa banyak fungsi invokasi yang terjadi secara paralel, Anda juga dapat memantau metrik konkurensi untuk fungsi Anda.

CloudWatch Metrik Amazon

Lambda memancarkan OffsetLag metrik saat fungsi Anda memproses catatan. Nilai metrik ini adalah perbedaan offset antara catatan terakhir yang ditulis ke topik sumber peristiwa Kafka dan catatan terakhir yang diproses oleh grup konsumen fungsi Anda. Anda dapat menggunakan OffsetLag untuk memperkirakan latensi antara saat catatan ditambahkan dan kapan grup konsumen Anda memprosesnya.

Tren yang meningkat OffsetLag dapat menunjukkan masalah dengan poller dalam kelompok konsumen fungsi Anda. Untuk informasi selengkapnya, lihat Bekerja dengan metrik fungsi Lambda.