Kembangkan konsumen Perpustakaan Klien Kinesis dengan Python - Amazon Kinesis Data Streams

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Kembangkan konsumen Perpustakaan Klien Kinesis dengan Python

Anda dapat menggunakan Kinesis Client Library (KCL) untuk membangun aplikasi yang memproses data dari aliran data Kinesis Anda. Perpustakaan Klien Kinesis tersedia dalam berbagai bahasa. Topik ini membahas Python.

KCLIni adalah perpustakaan Java; dukungan untuk bahasa selain Java disediakan menggunakan antarmuka multi-bahasa yang disebut. MultiLangDaemon Daemon ini berbasis Java dan berjalan di latar belakang saat Anda menggunakan KCL bahasa selain Java. Oleh karena itu, jika Anda menginstal KCL untuk Python dan menulis aplikasi konsumen Anda sepenuhnya dengan Python, Anda masih memerlukan Java diinstal pada sistem Anda karena itu. MultiLangDaemon Selanjutnya, MultiLangDaemon memiliki beberapa pengaturan default yang mungkin perlu Anda sesuaikan untuk kasus penggunaan Anda, misalnya, AWS Wilayah yang terhubung dengannya. Untuk informasi lebih lanjut tentang MultiLangDaemon on GitHub, buka halaman KCL MultiLangDaemon proyek.

Untuk men-download Python KCL dari GitHub, pergi ke Kinesis Client Library (Python). Untuk mengunduh kode sampel untuk aplikasi KCL konsumen Python, buka halaman proyek sampel untuk KCL Python. GitHub

Anda harus menyelesaikan tugas-tugas berikut saat mengimplementasikan aplikasi KCL konsumen dengan Python:

Menerapkan metode RecordProcessor kelas

RecordProcessKelas harus memperluas RecordProcessorBase untuk mengimplementasikan metode berikut. Sampel menyediakan implementasi yang dapat Anda gunakan sebagai titik awal (lihatsample_kclpy_app.py).

def initialize(self, shard_id) def process_records(self, records, checkpointer) def shutdown(self, checkpointer, reason)
menginisialisasi

KCLMemanggil initialize metode ketika prosesor rekaman dipakai, melewati ID pecahan tertentu sebagai parameter. Prosesor rekaman ini hanya memproses pecahan ini, dan biasanya, kebalikannya juga benar (pecahan ini hanya diproses oleh prosesor rekaman ini). Namun, konsumen Anda harus memperhitungkan kemungkinan bahwa catatan data dapat diproses lebih dari satu kali. Ini karena Kinesis Data Streams memiliki semantik setidaknya sekali, artinya setiap catatan data dari pecahan diproses setidaknya satu kali oleh pekerja di konsumen Anda. Untuk informasi lebih lanjut tentang kasus di mana pecahan tertentu dapat diproses oleh lebih dari satu pekerja, lihatGunakan resharding, scaling, dan parallel processing untuk mengubah jumlah pecahan.

def initialize(self, shard_id)
process_records

KCLMemanggil metode ini, melewati daftar catatan data dari pecahan yang ditentukan oleh initialize metode. Prosesor rekaman yang Anda terapkan memproses data dalam catatan ini sesuai dengan semantik konsumen Anda. Misalnya, pekerja mungkin melakukan transformasi pada data dan kemudian menyimpan hasilnya di bucket Amazon Simple Storage Service (Amazon S3).

def process_records(self, records, checkpointer)

Selain data itu sendiri, catatan juga berisi nomor urut dan kunci partisi. Pekerja dapat menggunakan nilai-nilai ini saat memproses data. Misalnya, pekerja dapat memilih bucket S3 untuk menyimpan data berdasarkan nilai kunci partisi. recordKamus mengekspos pasangan kunci-nilai berikut untuk mengakses data catatan, nomor urut, dan kunci partisi:

record.get('data') record.get('sequenceNumber') record.get('partitionKey')

Perhatikan bahwa data tersebut dikodekan oleh Base64.

Dalam sampel, metode ini process_records memiliki kode yang menunjukkan bagaimana seorang pekerja dapat mengakses data rekaman, nomor urut, dan kunci partisi.

Kinesis Data Streams membutuhkan prosesor rekaman untuk melacak catatan yang telah diproses dalam pecahan. The KCL menangani pelacakan ini untuk Anda dengan mengirimkan Checkpointer objek keprocess_records. Prosesor rekaman memanggil checkpoint metode pada objek ini untuk menginformasikan seberapa jauh perkembangannya dalam memproses catatan di pecahan. KCL Jika pekerja gagal, KCL menggunakan informasi ini untuk memulai kembali pemrosesan pecahan pada catatan diproses terakhir yang diketahui.

Untuk operasi split atau penggabungan, KCL tidak mulai memproses pecahan baru sampai prosesor untuk pecahan asli dipanggil checkpoint untuk memberi sinyal bahwa semua pemrosesan pada pecahan asli selesai.

Jika Anda tidak melewati parameter, KCL mengasumsikan bahwa panggilan ke checkpoint berarti bahwa semua catatan telah diproses, hingga catatan terakhir yang diteruskan ke prosesor rekaman. Oleh karena itu, prosesor rekaman harus memanggil checkpoint hanya setelah memproses semua catatan dalam daftar yang diteruskan ke sana. Prosesor rekaman tidak perlu memanggil checkpoint setiap panggilan keprocess_records. Prosesor dapat, misalnya, memanggil checkpoint setiap panggilan ketiga. Anda dapat secara opsional menentukan nomor urut yang tepat dari catatan sebagai parameter untukcheckpoint. Dalam hal ini, KCL mengasumsikan bahwa semua catatan telah diproses hingga catatan itu saja.

Dalam sampel, metode pribadi checkpoint menunjukkan cara memanggil Checkpointer.checkpoint metode menggunakan penanganan pengecualian yang sesuai dan logika coba lagi.

KCLBergantung pada process_records untuk menangani pengecualian apa pun yang timbul dari pemrosesan catatan data. Jika pengecualian dilemparkan dariprocess_records, KCL melompati catatan data yang diteruskan process_records sebelum pengecualian. Artinya, catatan ini tidak dikirim kembali ke prosesor rekaman yang melemparkan pengecualian atau ke prosesor rekaman lainnya di konsumen.

penonaktifan

KCLMemanggil shutdown metode baik saat pemrosesan berakhir (alasan shutdownTERMINATE) atau pekerja tidak lagi merespons (shutdown reason adalah). ZOMBIE

def shutdown(self, checkpointer, reason)

Pemrosesan berakhir ketika prosesor rekaman tidak menerima catatan lebih lanjut dari pecahan, karena pecahan dipecah atau digabungkan, atau aliran dihapus.

Itu KCL juga melewati Checkpointer objek keshutdown. Jika shutdown reasonTERMINATE, prosesor rekaman harus menyelesaikan pemrosesan catatan data apa pun, dan kemudian memanggil checkpoint metode pada antarmuka ini.

Ubah properti konfigurasi

Sampel memberikan nilai default untuk properti konfigurasi. Anda dapat mengganti salah satu properti ini dengan nilai Anda sendiri (lihatsample.properties).

Nama aplikasi

KCLMemerlukan nama aplikasi yang unik di antara aplikasi Anda, dan di antara tabel Amazon DynamoDB di Wilayah yang sama. Ini menggunakan nilai konfigurasi nama aplikasi dengan cara berikut:

  • Semua pekerja yang terkait dengan nama aplikasi ini diasumsikan bekerja sama pada aliran yang sama. Pekerja ini dapat didistribusikan pada beberapa contoh. Jika Anda menjalankan instance tambahan dari kode aplikasi yang sama, tetapi dengan nama aplikasi yang berbeda, instance kedua KCL memperlakukan instance kedua sebagai aplikasi yang sepenuhnya terpisah yang juga beroperasi pada aliran yang sama.

  • KCLMembuat tabel DynamoDB dengan nama aplikasi dan menggunakan tabel untuk mempertahankan informasi negara (seperti pos pemeriksaan dan pemetaan pecahan pekerja) untuk aplikasi. Setiap aplikasi memiliki tabel DynamoDB sendiri. Untuk informasi selengkapnya, lihat Gunakan tabel sewa untuk melacak pecahan yang diproses oleh aplikasi konsumen KCL.

Siapkan kredensil

Anda harus membuat AWS kredensil Anda tersedia untuk salah satu penyedia kredensi dalam rantai penyedia kredensi default. Anda dapat menggunakan AWSCredentialsProvider properti untuk menetapkan penyedia kredensial. Sample.properties harus membuat kredensil Anda tersedia untuk salah satu penyedia kredensional dalam rantai penyedia kredensi default. Jika Anda menjalankan aplikasi konsumen di EC2 instans Amazon, sebaiknya Anda mengonfigurasi instance dengan IAM peran. AWS kredensil yang mencerminkan izin yang terkait dengan IAM peran ini tersedia untuk aplikasi pada instance melalui metadata instance-nya. Ini adalah cara paling aman untuk mengelola kredensil untuk aplikasi konsumen yang berjalan pada sebuah EC2 instance.

File properti sampel dikonfigurasi KCL untuk memproses aliran data Kinesis yang disebut “kata-kata” menggunakan prosesor rekaman yang disertakan. sample_kclpy_app.py