Mengembangkan Konsumen Perpustakaan Klien Kinesis di Node.js - Amazon Kinesis Data Streams

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Mengembangkan Konsumen Perpustakaan Klien Kinesis di Node.js

Anda dapat menggunakan Kinesis Client Library (KCL) untuk membangun aplikasi yang memproses data dari aliran data Kinesis Anda. Perpustakaan Klien Kinesis tersedia dalam berbagai bahasa. Topik ini membahas Node.js.

KCL adalah perpustakaan Java; dukungan untuk bahasa selain Java disediakan menggunakan antarmuka multi-bahasa yang disebut. MultiLangDaemon Daemon ini berbasis Java dan berjalan di latar belakang saat Anda menggunakan bahasa KCL selain Java. Oleh karena itu, jika Anda menginstal KCL untuk Node.js dan menulis aplikasi konsumen Anda sepenuhnya di Node.js, Anda masih memerlukan Java diinstal pada sistem Anda karena itu MultiLangDaemon. Selanjutnya, MultiLangDaemon memiliki beberapa pengaturan default yang mungkin perlu Anda sesuaikan untuk kasus penggunaan Anda, misalnya, AWS Wilayah yang terhubung dengannya. Untuk informasi lebih lanjut tentang MultiLangDaemon on GitHub, buka halaman MultiLangDaemon proyek KCL.

Untuk mengunduh Node.js KCL dari GitHub, buka Perpustakaan Klien Kinesis (Node.js).

Unduhan Kode Sampel

Ada dua contoh kode yang tersedia untuk KCL di Node.js:

  • sampel dasar

    Digunakan di bagian berikut untuk menggambarkan dasar-dasar membangun aplikasi konsumen KCL di Node.js.

  • click-stream-sample

    Sedikit lebih maju dan menggunakan skenario dunia nyata, setelah Anda membiasakan diri dengan kode sampel dasar. Sampel ini tidak dibahas di sini tetapi memiliki file README dengan informasi lebih lanjut.

Anda harus menyelesaikan tugas-tugas berikut saat menerapkan aplikasi konsumen KCL di Node.js:

Menerapkan Record Processor

Konsumen paling sederhana yang mungkin menggunakan KCL untuk Node.js harus mengimplementasikan recordProcessor fungsi, yang pada gilirannya berisi fungsiinitialize,processRecords, danshutdown. Sampel menyediakan implementasi yang dapat Anda gunakan sebagai titik awal (lihatsample_kcl_app.js).

function recordProcessor() { // return an object that implements initialize, processRecords and shutdown functions.}
menginisialisasi

KCL memanggil initialize fungsi ketika prosesor rekaman dimulai. Prosesor rekaman ini hanya memproses ID pecahan yang diteruskan sebagaiinitializeInput.shardId, dan biasanya, kebalikannya juga benar (pecahan ini hanya diproses oleh prosesor rekaman ini). Namun, konsumen Anda harus memperhitungkan kemungkinan bahwa catatan data dapat diproses lebih dari satu kali. Ini karena Kinesis Data Streams memiliki semantik setidaknya sekali, artinya setiap catatan data dari pecahan diproses setidaknya satu kali oleh pekerja di konsumen Anda. Untuk informasi lebih lanjut tentang kasus di mana pecahan tertentu dapat diproses oleh lebih dari satu pekerja, lihatResharding, Scaling, dan Pengolahan Paralel.

initialize: function(initializeInput, completeCallback)
processRecords

KCL memanggil fungsi ini dengan input yang berisi daftar catatan data dari pecahan yang ditentukan ke fungsi tersebutinitialize. Prosesor rekaman yang Anda terapkan memproses data dalam catatan ini sesuai dengan semantik konsumen Anda. Misalnya, pekerja mungkin melakukan transformasi pada data dan kemudian menyimpan hasilnya di bucket Amazon Simple Storage Service (Amazon S3).

processRecords: function(processRecordsInput, completeCallback)

Selain data itu sendiri, catatan juga berisi nomor urut dan kunci partisi, yang dapat digunakan pekerja saat memproses data. Misalnya, pekerja dapat memilih bucket S3 untuk menyimpan data berdasarkan nilai kunci partisi. recordKamus mengekspos pasangan kunci-nilai berikut untuk mengakses data catatan, nomor urut, dan kunci partisi:

record.data record.sequenceNumber record.partitionKey

Perhatikan bahwa data tersebut dikodekan oleh Base64.

Dalam sampel dasar, fungsi processRecords memiliki kode yang menunjukkan bagaimana seorang pekerja dapat mengakses data rekaman, nomor urut, dan kunci partisi.

Kinesis Data Streams membutuhkan prosesor rekaman untuk melacak catatan yang telah diproses dalam pecahan. KCL menangani pelacakan ini dengan checkpointer objek yang dilewatkan sebagaiprocessRecordsInput.checkpointer. Prosesor rekaman Anda memanggil checkpointer.checkpoint fungsi untuk memberi tahu KCL seberapa jauh perkembangannya dalam memproses catatan di pecahan. Jika pekerja gagal, KCL menggunakan informasi ini saat Anda memulai ulang pemrosesan pecahan sehingga berlanjut dari catatan olahan terakhir yang diketahui.

Untuk operasi split atau merge, KCL tidak mulai memproses pecahan baru sampai prosesor untuk pecahan asli dipanggil checkpoint untuk memberi sinyal bahwa semua pemrosesan pada pecahan asli selesai.

Jika Anda tidak meneruskan nomor urut ke checkpoint fungsi, KCL mengasumsikan bahwa panggilan ke checkpoint berarti bahwa semua catatan telah diproses, hingga catatan terakhir yang diteruskan ke prosesor rekaman. Oleh karena itu, prosesor rekaman harus memanggil checkpoint hanya setelah memproses semua catatan dalam daftar yang diteruskan ke sana. Prosesor rekaman tidak perlu memanggil checkpoint setiap panggilan keprocessRecords. Prosesor dapat, misalnya, memanggil setiap panggilan ketiga, atau beberapa peristiwa checkpoint di luar prosesor rekaman Anda, seperti layanan verifikasi/validasi khusus yang telah Anda terapkan.

Anda dapat secara opsional menentukan nomor urut yang tepat dari catatan sebagai parameter untukcheckpoint. Dalam hal ini, KCL mengasumsikan bahwa semua catatan telah diproses hingga catatan itu saja.

Aplikasi sampel dasar menunjukkan panggilan sesederhana mungkin ke checkpointer.checkpoint fungsi tersebut. Anda dapat menambahkan logika checkpointing lain yang Anda butuhkan untuk konsumen Anda pada titik ini dalam fungsi.

penonaktifan

KCL memanggil shutdown fungsi baik saat pemrosesan berakhir (shutdownInput.reasonisTERMINATE) atau pekerja tidak lagi merespons (shutdownInput.reasonisZOMBIE).

shutdown: function(shutdownInput, completeCallback)

Pemrosesan berakhir ketika prosesor rekaman tidak menerima catatan lebih lanjut dari pecahan, karena pecahan dipecah atau digabungkan, atau aliran dihapus.

KCL juga meneruskan shutdownInput.checkpointer objek keshutdown. Jika alasan shutdown adalahTERMINATE, Anda harus memastikan bahwa prosesor rekaman telah selesai memproses catatan data apa pun, dan kemudian memanggil checkpoint fungsi pada antarmuka ini.

Ubah Properti Konfigurasi

Sampel memberikan nilai default untuk properti konfigurasi. Anda dapat mengganti salah satu properti ini dengan nilai Anda sendiri (lihat sample.properties di sampel dasar).

Nama Aplikasi

KCL memerlukan aplikasi yang unik di antara aplikasi Anda, dan di antara tabel Amazon DynamoDB di Wilayah yang sama. Ini menggunakan nilai konfigurasi nama aplikasi dengan cara berikut:

  • Semua pekerja yang terkait dengan nama aplikasi ini diasumsikan bekerja sama pada aliran yang sama. Pekerja ini dapat didistribusikan pada beberapa contoh. Jika Anda menjalankan instance tambahan dari kode aplikasi yang sama, tetapi dengan nama aplikasi yang berbeda, KCL memperlakukan instance kedua sebagai aplikasi yang sepenuhnya terpisah yang juga beroperasi pada aliran yang sama.

  • KCL membuat tabel DynamoDB dengan nama aplikasi dan menggunakan tabel untuk mempertahankan informasi status (seperti pos pemeriksaan dan pemetaan pecahan pekerja) untuk aplikasi. Setiap aplikasi memiliki tabel DynamoDB sendiri. Untuk informasi selengkapnya, lihat Menggunakan Meja Sewa untuk Melacak Pecahan yang Diproses oleh Aplikasi Konsumen KCL.

Mengatur Kredensial

Anda harus membuat AWS kredensil Anda tersedia untuk salah satu penyedia kredensi dalam rantai penyedia kredensi default. Anda dapat menggunakan AWSCredentialsProvider properti untuk menetapkan penyedia kredensial. sample.propertiesFile harus membuat kredensyal Anda tersedia untuk salah satu penyedia kredensyal dalam rantai penyedia kredensi default. Jika Anda menjalankan konsumen di instans Amazon EC2, sebaiknya Anda mengonfigurasi instans dengan peran IAM. AWSkredensyal yang mencerminkan izin yang terkait dengan peran IAM ini tersedia untuk aplikasi pada instance melalui metadata instance-nya. Ini adalah cara paling aman untuk mengelola kredensil untuk aplikasi konsumen yang berjalan pada instans EC2.

Contoh berikut mengkonfigurasi KCL untuk memproses aliran data Kinesis bernama kclnodejssample menggunakan prosesor rekaman yang disediakan di: sample_kcl_app.js

# The Node.js executable script executableName = node sample_kcl_app.js # The name of an Amazon Kinesis stream to process streamName = kclnodejssample # Unique KCL application name applicationName = kclnodejssample # Use default AWS credentials provider chain AWSCredentialsProvider = DefaultAWSCredentialsProviderChain # Read from the beginning of the stream initialPositionInStream = TRIM_HORIZON