Mengembangkan Konsumen Perpustakaan Klien Kinesis di .NET - Amazon Kinesis Data Streams

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Mengembangkan Konsumen Perpustakaan Klien Kinesis di .NET

Anda dapat menggunakan Kinesis Client Library (KCL) untuk membangun aplikasi yang memproses data dari aliran data Kinesis Anda. Perpustakaan Klien Kinesis tersedia dalam berbagai bahasa. Topik ini membahas .NET.

KCL adalah perpustakaan Java; dukungan untuk bahasa selain Java disediakan menggunakan antarmuka multi-bahasa yang disebut. MultiLangDaemon Daemon ini berbasis Java dan berjalan di latar belakang saat Anda menggunakan bahasa KCL selain Java. Oleh karena itu, jika Anda menginstal KCL untuk.NET dan menulis aplikasi konsumen Anda sepenuhnya di .NET, Anda masih perlu Java diinstal pada sistem Anda karena itu MultiLangDaemon. Selanjutnya, MultiLangDaemon memiliki beberapa pengaturan default yang mungkin perlu Anda sesuaikan untuk kasus penggunaan Anda, misalnya, AWS Wilayah yang terhubung dengannya. Untuk informasi lebih lanjut tentang MultiLangDaemon on GitHub, buka halaman MultiLangDaemon proyek KCL.

Untuk mengunduh .NET KCL dari GitHub, buka Kinesis Client Library (.NET). Untuk mengunduh kode sampel untuk aplikasi konsumen.NET KCL, buka halaman proyek konsumen sampel KCL untuk .NET di. GitHub

Anda harus menyelesaikan tugas-tugas berikut saat menerapkan aplikasi konsumen KCL di .NET:

Menerapkan Metode RecordProcessor Kelas I

Konsumen harus menerapkan metode berikut untukIRecordProcessor. Konsumen sampel menyediakan implementasi yang dapat Anda gunakan sebagai titik awal (lihat SampleRecordProcessor kelas diSampleConsumer/AmazonKinesisSampleConsumer.cs).

public void Initialize(InitializationInput input) public void ProcessRecords(ProcessRecordsInput input) public void Shutdown(ShutdownInput input)
Inisialisasi

KCL memanggil metode ini ketika prosesor rekaman dipakai, melewati ID pecahan tertentu dalam parameter (). input input.ShardId Prosesor rekaman ini hanya memproses pecahan ini, dan biasanya, kebalikannya juga benar (pecahan ini hanya diproses oleh prosesor rekaman ini). Namun, konsumen Anda harus memperhitungkan kemungkinan bahwa catatan data dapat diproses lebih dari satu kali. Ini karena Kinesis Data Streams memiliki semantik setidaknya sekali, artinya setiap catatan data dari pecahan diproses setidaknya satu kali oleh pekerja di konsumen Anda. Untuk informasi lebih lanjut tentang kasus di mana pecahan tertentu dapat diproses oleh lebih dari satu pekerja, lihatResharding, Scaling, dan Pemrosesan Paralel.

public void Initialize(InitializationInput input)
ProcessRecords

KCL memanggil metode ini, melewati daftar catatan data dalam input parameter (input.Records) dari pecahan yang ditentukan oleh metode. Initialize Prosesor rekaman yang Anda terapkan memproses data dalam catatan ini sesuai dengan semantik konsumen Anda. Misalnya, pekerja mungkin melakukan transformasi pada data dan kemudian menyimpan hasilnya di bucket Amazon Simple Storage Service (Amazon S3).

public void ProcessRecords(ProcessRecordsInput input)

Selain data itu sendiri, catatan juga berisi nomor urut dan kunci partisi. Pekerja dapat menggunakan nilai-nilai ini saat memproses data. Misalnya, pekerja dapat memilih bucket S3 untuk menyimpan data berdasarkan nilai kunci partisi. RecordKelas mengekspos berikut ini untuk mengakses data catatan, nomor urut, dan kunci partisi:

byte[] Record.Data string Record.SequenceNumber string Record.PartitionKey

Dalam sampel, metode ini ProcessRecordsWithRetries memiliki kode yang menunjukkan bagaimana seorang pekerja dapat mengakses data rekaman, nomor urut, dan kunci partisi.

Kinesis Data Streams membutuhkan prosesor rekaman untuk melacak catatan yang telah diproses dalam pecahan. KCL menangani pelacakan ini untuk Anda dengan meneruskan Checkpointer objek ke ProcessRecords (input.Checkpointer). Prosesor rekaman memanggil Checkpointer.Checkpoint metode untuk menginformasikan KCL tentang seberapa jauh perkembangannya dalam memproses catatan di pecahan. Jika pekerja gagal, KCL menggunakan informasi ini untuk memulai kembali pemrosesan pecahan pada catatan diproses terakhir yang diketahui.

Untuk operasi split atau merge, KCL tidak mulai memproses pecahan baru sampai prosesor untuk pecahan asli dipanggil Checkpointer.Checkpoint untuk memberi sinyal bahwa semua pemrosesan pada pecahan asli selesai.

Jika Anda tidak melewati parameter, KCL mengasumsikan bahwa panggilan untuk Checkpointer.Checkpoint menandakan bahwa semua catatan telah diproses, hingga catatan terakhir yang diteruskan ke prosesor rekaman. Oleh karena itu, prosesor rekaman harus memanggil Checkpointer.Checkpoint hanya setelah memproses semua catatan dalam daftar yang diteruskan ke sana. Prosesor rekaman tidak perlu memanggil Checkpointer.Checkpoint setiap panggilan keProcessRecords. Prosesor dapat, misalnya, memanggil Checkpointer.Checkpoint setiap panggilan ketiga atau keempat. Anda dapat secara opsional menentukan nomor urut yang tepat dari catatan sebagai parameter untukCheckpointer.Checkpoint. Dalam hal ini, KCL mengasumsikan bahwa catatan telah diproses hanya hingga catatan itu.

Dalam sampel, metode pribadi Checkpoint(Checkpointer checkpointer) menunjukkan cara memanggil Checkpointer.Checkpoint metode menggunakan penanganan pengecualian yang sesuai dan logika coba lagi.

KCL untuk.NET menangani pengecualian secara berbeda dari pustaka bahasa KCL lainnya karena tidak menangani pengecualian apa pun yang muncul dari pemrosesan catatan data. Setiap pengecualian yang tidak tertangkap dari kode pengguna akan merusak program.

Shutdown

KCL memanggil Shutdown metode baik saat pemrosesan berakhir (alasan shutdown adalahTERMINATE) atau pekerja tidak lagi merespons (nilai shutdown input.Reason adalah). ZOMBIE

public void Shutdown(ShutdownInput input)

Pemrosesan berakhir ketika prosesor rekaman tidak menerima catatan lebih lanjut dari pecahan, karena pecahan dipecah atau digabungkan, atau aliran dihapus.

KCL juga meneruskan Checkpointer objek keshutdown. Jika alasan shutdown adalahTERMINATE, prosesor rekaman harus menyelesaikan pemrosesan catatan data apa pun, dan kemudian memanggil checkpoint metode pada antarmuka ini.

Ubah Properti Konfigurasi

Konsumen sampel memberikan nilai default untuk properti konfigurasi. Anda dapat mengganti salah satu properti ini dengan nilai Anda sendiri (lihatSampleConsumer/kcl.properties).

Nama Aplikasi

KCL memerlukan aplikasi yang unik di antara aplikasi Anda, dan di antara tabel Amazon DynamoDB di Wilayah yang sama. Ini menggunakan nilai konfigurasi nama aplikasi dengan cara berikut:

  • Semua pekerja yang terkait dengan nama aplikasi ini diasumsikan bekerja sama pada aliran yang sama. Pekerja ini dapat didistribusikan pada beberapa contoh. Jika Anda menjalankan instance tambahan dari kode aplikasi yang sama, tetapi dengan nama aplikasi yang berbeda, KCL memperlakukan instance kedua sebagai aplikasi yang sepenuhnya terpisah yang juga beroperasi pada aliran yang sama.

  • KCL membuat tabel DynamoDB dengan nama aplikasi dan menggunakan tabel untuk mempertahankan informasi status (seperti pos pemeriksaan dan pemetaan pecahan pekerja) untuk aplikasi. Setiap aplikasi memiliki tabel DynamoDB sendiri. Untuk informasi selengkapnya, lihat Menggunakan Meja Sewa untuk Melacak Pecahan yang Diproses oleh Aplikasi Konsumen KCL.

Mengatur Kredensial

Anda harus membuat AWS kredensil Anda tersedia untuk salah satu penyedia kredensi dalam rantai penyedia kredensi default. Anda dapat menggunakan AWSCredentialsProvider properti untuk menetapkan penyedia kredensial. Sample.properties harus membuat kredensyal Anda tersedia untuk salah satu penyedia kredensyal dalam rantai penyedia kredensyal default. Jika Anda menjalankan aplikasi konsumen pada instans EC2, sebaiknya Anda mengonfigurasi instans dengan peran IAM. AWS kredensyal yang mencerminkan izin yang terkait dengan peran IAM ini tersedia untuk aplikasi pada instance melalui metadata instance-nya. Ini adalah cara paling aman untuk mengelola kredensional bagi konsumen yang berjalan pada instans EC2.

File properti sampel mengonfigurasi KCL untuk memproses aliran data Kinesis yang disebut “kata-kata” menggunakan prosesor rekaman yang disertakan. AmazonKinesisSampleConsumer.cs