Menggunakan Perpustakaan Klien Kinesis - Amazon Kinesis Data Streams

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Menggunakan Perpustakaan Klien Kinesis

Salah satu metode pengembangan aplikasi konsumen kustom yang dapat memproses data dari aliran data KDS adalah dengan menggunakan Kinesis Client Library (KCL).

catatan

Untuk KCL 1.x dan KCL 2.x, Anda disarankan untuk meningkatkan ke versi KCL 1.x terbaru atau versi KCL 2.x, tergantung pada skenario penggunaan Anda. Baik KCL 1.x dan KCL 2.x diperbarui secara berkala dengan rilis yang lebih baru yang mencakup dependensi dan patch keamanan terbaru, perbaikan bug, dan fitur baru yang kompatibel ke belakang. Untuk informasi lebih lanjut, lihat https://github.com/awslabs/ amazon-kinesis-client /releases.

Apa itu Perpustakaan Klien Kinesis?

KCL membantu Anda mengkonsumsi dan memproses data dari aliran data Kinesis dengan menangani banyak tugas kompleks yang terkait dengan komputasi terdistribusi. Ini termasuk load balancing di beberapa instance aplikasi konsumen, menanggapi kegagalan instans aplikasi konsumen, memeriksa catatan yang diproses, dan bereaksi terhadap resharding. KCL menangani semua subtugas ini sehingga Anda dapat memfokuskan upaya Anda untuk menulis logika pemrosesan catatan khusus Anda.

KCL berbeda dari Kinesis Data Streams API yang tersedia di SDK. AWS Kinesis Data Streams API membantu Anda mengelola banyak aspek Kinesis Data Streams, termasuk membuat stream, resharding, dan menempatkan serta mendapatkan rekaman. KCL menyediakan lapisan abstraksi di sekitar semua subtugas ini, khususnya sehingga Anda dapat fokus pada logika pemrosesan data kustom aplikasi konsumen Anda. Untuk informasi tentang Kinesis Data Streams API, lihat Referensi API Amazon Kinesis.

penting

KCL adalah perpustakaan Java. Support untuk bahasa selain Java disediakan menggunakan antarmuka multi-bahasa yang disebut. MultiLangDaemon Daemon ini berbasis Java dan berjalan di latar belakang saat Anda menggunakan bahasa KCL selain Java. Misalnya, jika Anda menginstal KCL untuk Python dan menulis aplikasi konsumen Anda sepenuhnya dengan Python, Anda masih memerlukan Java diinstal pada sistem Anda karena itu. MultiLangDaemon Selanjutnya, MultiLangDaemon memiliki beberapa pengaturan default yang mungkin perlu Anda sesuaikan untuk kasus penggunaan Anda, misalnya, AWS wilayah yang terhubung dengannya. Untuk informasi selengkapnya tentang MultiLangDaemon on GitHub, lihat MultiLangDaemon proyek KCL.

KCL bertindak sebagai perantara antara logika pemrosesan rekaman Anda dan Kinesis Data Streams. KCL melakukan tugas-tugas berikut:

  • Terhubung ke aliran data

  • Menghitung pecahan dalam aliran data

  • Menggunakan sewa untuk mengoordinasikan asosiasi pecahan dengan pekerjanya

  • Membuat instance pemroses rekaman untuk setiap pecahan yang dikelolanya

  • Menarik catatan data dari aliran data

  • Mendorong rekaman ke pemroses rekaman yang sesuai

  • Catatan yang diproses di pos pemeriksaan

  • Menyeimbangkan asosiasi shard-worker (leases) saat jumlah instans pekerja berubah atau saat aliran data di-sharded (pecahan dibagi atau digabungkan)

Versi Tersedia KCL

Saat ini, Anda dapat menggunakan salah satu dari versi KCL yang didukung berikut ini untuk membangun aplikasi konsumen kustom Anda:

Anda dapat menggunakan KCL 1.x atau KCL 2.x untuk membangun aplikasi konsumen yang menggunakan throughput bersama. Untuk informasi selengkapnya, lihat Mengembangkan Konsumen Kustom dengan Throughput Bersama Menggunakan KCL.

Untuk membangun aplikasi konsumen yang menggunakan throughput khusus (konsumen fan-out yang disempurnakan), Anda hanya dapat menggunakan KCL 2.x. Untuk informasi selengkapnya, lihat Mengembangkan Konsumen Khusus dengan Throughput Khusus (Peningkatan Fan-Out).

Untuk informasi tentang perbedaan antara KCL 1.x dan KCL 2.x, dan petunjuk tentang cara bermigrasi dari KCL 1.x ke KCL 2.x, lihat. Migrasi Konsumen dari KCL 1.x ke KCL 2.x

Konsep KCL

  • Aplikasi konsumen KCL — aplikasi yang dibuat khusus menggunakan KCL dan dirancang untuk membaca dan memproses catatan dari aliran data.

  • Contoh aplikasi konsumen - Aplikasi konsumen KCL biasanya didistribusikan, dengan satu atau lebih instance aplikasi berjalan secara bersamaan untuk mengoordinasikan kegagalan dan pemrosesan catatan data keseimbangan beban secara dinamis.

  • Worker — kelas tingkat tinggi yang digunakan instance aplikasi konsumen KCL untuk mulai memproses data.

    penting

    Setiap instance aplikasi konsumen KCL memiliki satu pekerja.

    Pekerja menginisialisasi dan mengawasi berbagai tugas, termasuk menyinkronkan informasi pecahan dan sewa, melacak tugas pecahan, dan memproses data dari pecahan. Seorang pekerja memberi KCL informasi konfigurasi untuk aplikasi konsumen, seperti nama aliran data yang datanya mencatat aplikasi konsumen KCL ini akan diproses dan AWS kredensyal yang diperlukan untuk mengakses aliran data ini. Pekerja juga memulai instance aplikasi konsumen KCL tertentu untuk mengirimkan catatan data dari aliran data ke prosesor rekaman.

    penting

    Dalam KCL 1.x kelas ini disebut Worker. Untuk informasi lebih lanjut, (ini adalah repositori Java KCL), lihat https://github.com/awslabs/ /blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/worker.java. amazon-kinesis-client Dalam KCL 2.x, kelas ini disebut Scheduler. Tujuan Scheduler di KCL 2.x identik dengan tujuan Pekerja di KCL 1.x. Untuk informasi selengkapnya tentang kelas Scheduler di KCL 2.x, lihat https://github.com/awslabs/ amazon-kinesis-client /blob/master/ /src/main/java/software/amazon/kinesis/coordinator/Scheduler.java. amazon-kinesis-client

  • Sewa — data yang mendefinisikan pengikatan antara pekerja dan pecahan. Aplikasi konsumen KCL terdistribusi menggunakan sewa untuk mempartisi pemrosesan catatan data di seluruh armada pekerja. Pada waktu tertentu, setiap pecahan catatan data terikat pada pekerja tertentu dengan sewa yang diidentifikasi oleh variabel LeaseKey.

    Secara default, seorang pekerja dapat memegang satu atau lebih sewa (tunduk pada nilai variabel maxLeasesForWorker) pada saat yang sama.

    penting

    Setiap pekerja akan bersaing untuk memegang semua sewa yang tersedia untuk semua pecahan yang tersedia dalam aliran data. Tetapi hanya satu pekerja yang akan berhasil memegang setiap sewa pada satu waktu.

    Misalnya, jika Anda memiliki instance aplikasi konsumen A dengan pekerja A yang memproses aliran data dengan 4 pecahan, pekerja A dapat menyimpan sewa ke pecahan 1, 2, 3, dan 4 secara bersamaan. Tetapi jika Anda memiliki dua instance aplikasi konsumen: A dan B dengan pekerja A dan pekerja B, dan instance ini memproses aliran data dengan 4 pecahan, pekerja A dan pekerja B tidak dapat menahan sewa untuk shard 1 secara bersamaan. Seorang pekerja memegang sewa ke pecahan tertentu sampai siap untuk berhenti memproses catatan data pecahan ini atau sampai gagal. Ketika satu pekerja berhenti memegang sewa, pekerja lain mengambil dan memegang sewa.

    Untuk informasi lebih lanjut, (ini adalah repositori Java KCL), lihat https://github.com/awslabs/ amazon-kinesis-client /blob/v1.x/src/main/java/com/amazonaws/services/kinesis/leases/impl/lease.java untuk KCL 1.x dan https://github.com/awslabs/ /blob/master/ /src/main/java/software/amazon/kinesis/leases/Lease.java untuk KCL 2.x. amazon-kinesis-client amazon-kinesis-client

  • Tabel sewa - tabel Amazon DynamoDB unik yang digunakan untuk melacak pecahan dalam aliran data KDS yang disewakan dan diproses oleh pekerja aplikasi konsumen KCL. Tabel sewa harus tetap sinkron (di dalam pekerja dan di semua pekerja) dengan informasi pecahan terbaru dari aliran data saat aplikasi konsumen KCL berjalan. Untuk informasi selengkapnya, lihat Menggunakan Meja Sewa untuk Melacak Pecahan yang Diproses oleh Aplikasi Konsumen KCL.

  • Rekam prosesor — logika yang mendefinisikan bagaimana aplikasi konsumen KCL Anda memproses data yang didapatnya dari aliran data. Saat runtime, instance aplikasi konsumen KCL membuat instance pekerja, dan pekerja ini membuat instance satu prosesor rekaman untuk setiap pecahan yang disewanya.

Menggunakan Meja Sewa untuk Melacak Pecahan yang Diproses oleh Aplikasi Konsumen KCL

Apa itu Meja Sewa

Untuk setiap aplikasi Amazon Kinesis Data Streams, KCL menggunakan tabel sewa unik (disimpan dalam tabel Amazon DynamoDB) untuk melacak pecahan dalam aliran data KDS yang disewakan dan diproses oleh pekerja aplikasi konsumen KCL.

penting

KCL menggunakan nama aplikasi konsumen untuk membuat nama tabel sewa yang digunakan aplikasi konsumen ini, oleh karena itu setiap nama aplikasi konsumen harus unik.

Anda dapat melihat tabel sewa menggunakan konsol Amazon DynamoDB saat aplikasi konsumen sedang berjalan.

Jika tabel sewa untuk aplikasi konsumen KCL Anda tidak ada saat aplikasi dimulai, salah satu pekerja membuat tabel sewa untuk aplikasi ini.

penting

Akun Anda dikenakan biaya untuk biaya yang terkait dengan tabel DynamoDB, selain biaya yang terkait dengan Kinesis Data Streams itu sendiri.

Setiap baris dalam tabel sewa mewakili pecahan yang sedang diproses oleh pekerja aplikasi konsumen Anda. Jika aplikasi konsumen KCL Anda hanya memproses satu aliran dataleaseKey, maka kunci hash untuk tabel sewa adalah ID pecahan. Jika yaMemproses Beberapa Aliran Data dengan KCL 2.x yang sama untuk Aplikasi Konsumen Java, maka struktur LeaseKey terlihat seperti ini:. account-id:StreamName:streamCreationTimestamp:ShardId Misalnya, 111111111:multiStreamTest-1:12345:shardId-000000000336.

Selain ID pecahan, setiap baris juga menyertakan data berikut:

  • pos pemeriksaan: Nomor urutan pos pemeriksaan terbaru untuk pecahan. Nilai ini unik di semua pecahan dalam aliran data.

  • checkpointSubSequenceNomor: Saat menggunakan fitur agregasi Perpustakaan Produser Kinesis, ini adalah ekstensi ke pos pemeriksaan yang melacak catatan pengguna individu dalam catatan Kinesis.

  • LeaseCounter: Digunakan untuk pembuatan versi sewa sehingga pekerja dapat mendeteksi bahwa sewa mereka telah diambil oleh pekerja lain.

  • LeaseKey: Pengidentifikasi unik untuk sewa. Setiap sewa khusus untuk pecahan dalam aliran data dan dipegang oleh satu pekerja pada satu waktu.

  • LeaseOwner: Pekerja yang memegang sewa ini.

  • ownerSwitchesSincePos pemeriksaan: Berapa kali sewa ini telah berganti pekerja sejak terakhir kali pos pemeriksaan ditulis.

  • parentShardId: Digunakan untuk memastikan bahwa pecahan induk diproses sepenuhnya sebelum pemrosesan dimulai pada pecahan anak. Ini memastikan bahwa catatan diproses dalam urutan yang sama dengan yang dimasukkan ke dalam aliran.

  • hashrange: Digunakan oleh PeriodicShardSyncManager untuk menjalankan sinkronisasi berkala untuk menemukan pecahan yang hilang di tabel sewa dan membuat sewa untuk mereka jika diperlukan.

    catatan

    Data ini hadir dalam tabel sewa untuk setiap pecahan dimulai dengan KCL 1.14 dan KCL 2.3. Untuk informasi lebih lanjut tentang PeriodicShardSyncManager dan sinkronisasi berkala antara sewa dan pecahan, lihat. Bagaimana Tabel Sewa Disinkronkan dengan Pecahan dalam Aliran Data KDS

  • childshards: Digunakan oleh LeaseCleanupManager untuk meninjau status pemrosesan pecahan anak dan memutuskan apakah pecahan induk dapat dihapus dari tabel sewa.

    catatan

    Data ini hadir dalam tabel sewa untuk setiap pecahan dimulai dengan KCL 1.14 dan KCL 2.3.

  • ShardID: ID pecahan.

    catatan

    Data ini hanya ada di tabel sewa jika AndaMemproses Beberapa Aliran Data dengan KCL 2.x yang sama untuk Aplikasi Konsumen Java. Ini hanya didukung di KCL 2.x untuk Java, dimulai dengan KCL 2.3 untuk Java dan seterusnya.

  • nama aliran Pengidentifikasi aliran data dalam format berikut:account-id:StreamName:streamCreationTimestamp.

    catatan

    Data ini hanya ada di tabel sewa jika AndaMemproses Beberapa Aliran Data dengan KCL 2.x yang sama untuk Aplikasi Konsumen Java. Ini hanya didukung di KCL 2.x untuk Java, dimulai dengan KCL 2.3 untuk Java dan seterusnya.

Throughput

Jika aplikasi Amazon Kinesis Data Streams menerima pengecualian throughput yang disediakan, Anda harus meningkatkan throughput yang disediakan untuk tabel DynamoDB. KCL membuat tabel dengan throughput yang disediakan 10 pembacaan per detik dan 10 penulisan per detik, tetapi ini mungkin tidak cukup untuk aplikasi Anda. Misalnya, jika aplikasi Amazon Kinesis Data Streams sering melakukan pemeriksaan atau beroperasi pada aliran yang terdiri dari banyak pecahan, Anda mungkin memerlukan lebih banyak throughput.

Untuk informasi tentang throughput yang disediakan di DynamoDB, lihat Mode Kapasitas Baca/Tulis dan Bekerja dengan Tabel dan Data di Panduan Pengembang Amazon DynamoDB.

Bagaimana Tabel Sewa Disinkronkan dengan Pecahan dalam Aliran Data KDS

Pekerja dalam aplikasi konsumen KCL menggunakan sewa untuk memproses pecahan dari aliran data tertentu. Informasi tentang pekerja apa yang menyewakan pecahan apa pada waktu tertentu disimpan dalam tabel sewa. Tabel sewa harus tetap sinkron dengan informasi pecahan terbaru dari aliran data saat aplikasi konsumen KCL berjalan. KCL menyinkronkan tabel sewa dengan informasi pecahan yang diperoleh dari layanan Kinesis Data Streams selama bootstrap aplikasi konsumen (baik ketika aplikasi konsumen diinisialisasi atau dimulai ulang) dan juga setiap kali pecahan yang sedang diproses mencapai akhir (resharding). Dengan kata lain, pekerja atau aplikasi konsumen KCL disinkronkan dengan aliran data yang mereka proses selama bootstrap aplikasi konsumen awal dan setiap kali aplikasi konsumen menemukan peristiwa reshard aliran data.

Sinkronisasi di KCL 1.0 - 1.13 dan KCL 2.0 - 2.2

Di KCL 1.0 - 1.13 dan KCL 2.0 - 2.2, selama bootstrap aplikasi konsumen dan juga selama setiap peristiwa reshard aliran data, KCL menyinkronkan tabel sewa dengan informasi pecahan yang diperoleh dari layanan Kinesis Data Streams dengan menjalankan atau API penemuan. ListShards DescribeStream Dalam semua versi KCL yang tercantum di atas, setiap pekerja aplikasi konsumen KCL menyelesaikan langkah-langkah berikut untuk melakukan proses sinkronisasi sewa/shard selama bootstrap aplikasi konsumen dan pada setiap acara reshard aliran:

  • Mengambil semua pecahan untuk data aliran yang sedang diproses

  • Mengambil semua sewa pecahan dari tabel sewa

  • Menyaring setiap pecahan terbuka yang tidak memiliki sewa di tabel sewa

  • Mengulangi semua pecahan terbuka yang ditemukan dan untuk setiap pecahan terbuka tanpa induk terbuka:

    • Melintasi pohon hierarki melalui jalur leluhurnya untuk menentukan apakah pecahan itu adalah keturunan. Pecahan dianggap sebagai keturunan, jika pecahan leluhur sedang diproses (entri sewa untuk pecahan leluhur ada di tabel sewa) atau jika pecahan leluhur harus diproses (misalnya, jika posisi awal adalah atau) TRIM_HORIZON AT_TIMESTAMP

    • Jika pecahan terbuka dalam konteks adalah keturunan, KCL memeriksa pecahan berdasarkan posisi awal dan membuat sewa untuk orang tuanya, jika diperlukan

Sinkronisasi di KCL 2.x, Dimulai dengan KCL 2.3 dan Beyond

Dimulai dengan versi terbaru yang didukung dari KCL 2.x (KCL 2.3) dan seterusnya, perpustakaan sekarang mendukung perubahan berikut pada proses sinkronisasi. Perubahan sinkronisasi lease/shard ini secara signifikan mengurangi jumlah panggilan API yang dilakukan oleh aplikasi konsumen KCL ke layanan Kinesis Data Streams dan mengoptimalkan manajemen sewa di aplikasi konsumen KCL Anda.

  • Selama bootstrap aplikasi, jika tabel sewa kosong, KCL menggunakan opsi pemfilteran ListShard API (parameter permintaan ShardFilter opsional) untuk mengambil dan membuat sewa hanya untuk snapshot pecahan yang terbuka pada waktu yang ditentukan oleh parameter. ShardFilter ShardFilterParameter ini memungkinkan Anda untuk memfilter respons ListShards API. Satu-satunya properti yang diperlukan dari ShardFilter parameter adalahType. KCL menggunakan properti Type filter dan berikut nilai validnya untuk mengidentifikasi dan mengembalikan snapshot pecahan terbuka yang mungkin memerlukan sewa baru:

    • AT_TRIM_HORIZON- Responsnya mencakup semua pecahan yang terbuka diTRIM_HORIZON.

    • AT_LATEST- Respons hanya mencakup pecahan aliran data yang saat ini terbuka.

    • AT_TIMESTAMP- respons mencakup semua pecahan yang stempel waktu awalnya kurang dari atau sama dengan stempel waktu yang diberikan dan stempel waktu akhir lebih besar dari atau sama dengan stempel waktu yang diberikan atau masih terbuka.

    ShardFilterdigunakan saat membuat sewa untuk tabel sewa kosong untuk menginisialisasi sewa untuk snapshot pecahan yang ditentukan di. RetrievalConfig#initialPositionInStreamExtended

    Untuk informasi selengkapnya tentang ShardFilter, lihat https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html.

  • Alih-alih semua pekerja melakukan sinkronisasi sewa/pecahan untuk menjaga tabel sewa tetap mutakhir dengan pecahan terbaru dalam aliran data, satu pemimpin pekerja terpilih melakukan sinkronisasi sewa/pecahan.

  • KCL 2.3 menggunakan parameter ChildShards pengembalian GetRecords dan SubscribeToShard API untuk melakukan sinkronisasi sewa/pecahan yang terjadi pada pecahan tertutup, memungkinkan pekerja KCL SHARD_END untuk hanya membuat sewa untuk pecahan anak dari pecahan yang selesai diproses. Untuk dibagikan di seluruh aplikasi konsumen, pengoptimalan sinkronisasi sewa/pecahan ini menggunakan parameter API. ChildShards GetRecords Untuk aplikasi konsumen throughput khusus (fan-out yang ditingkatkan), pengoptimalan sinkronisasi lease/shard ini menggunakan parameter API. ChildShards SubscribeToShard Lihat informasi selengkapnya di GetRecords, SubscribeToShards, dan ChildShard.

  • Dengan perubahan di atas, perilaku KCL bergerak dari model semua pekerja yang belajar tentang semua pecahan yang ada ke model pekerja yang hanya belajar tentang pecahan pecahan anak-anak yang dimiliki setiap pekerja. Oleh karena itu, selain sinkronisasi yang terjadi selama bootstraping aplikasi konsumen dan peristiwa reshard, KCL sekarang juga melakukan pemindaian shard/lease berkala tambahan untuk mengidentifikasi lubang potensial dalam tabel sewa (dengan kata lain, untuk mempelajari semua pecahan baru) untuk memastikan rentang hash lengkap dari aliran data sedang diproses dan membuat sewa untuk mereka jika diperlukan. PeriodicShardSyncManageradalah komponen yang bertanggung jawab untuk menjalankan pemindaian lease/shard berkala.

    Untuk informasi lebih lanjut tentang PeriodicShardSyncManager di KCL 2.3, lihat https://github.com/awslabs/ amazon-kinesis-client /blob/master/ /src/main/java/software/amazon/kinesis/leases/ amazon-kinesis-client .java #L201 -L213. LeaseManagementConfig

    Di KCL 2.3, opsi konfigurasi baru tersedia untuk dikonfigurasi PeriodicShardSyncManager diLeaseManagementConfig:

    Nama Nilai default Deskripsi
    leasesRecoveryAuditorExecutionFrequencyMillis

    120000 (2 menit)

    Frekuensi (dalam millis) pekerjaan auditor untuk memindai sewa sebagian dalam tabel sewa. Jika auditor mendeteksi lubang apa pun dalam sewa untuk aliran, maka itu akan memicu sinkronisasi pecahan berdasarkan. leasesRecoveryAuditorInconsistencyConfidenceThreshold

    leasesRecoveryAuditorInconsistencyConfidenceThreshold

    3

    Ambang batas kepercayaan untuk pekerjaan auditor periodik untuk menentukan apakah sewa untuk aliran data dalam tabel sewa tidak konsisten. Jika auditor menemukan kumpulan inkonsistensi yang sama secara berurutan untuk aliran data untuk ini berkali-kali, maka itu akan memicu sinkronisasi pecahan.

    CloudWatch Metrik baru juga sekarang dipancarkan untuk memantau kesehatan. PeriodicShardSyncManager Untuk informasi selengkapnya, lihat PeriodicShardSyncManager.

  • Termasuk optimasi HierarchicalShardSyncer untuk hanya membuat sewa untuk satu lapisan pecahan.

Sinkronisasi di KCL 1.x, Dimulai dengan KCL 1.14 dan Beyond

Dimulai dengan versi terbaru yang didukung dari KCL 1.x (KCL 1.14) dan seterusnya, perpustakaan sekarang mendukung perubahan berikut pada proses sinkronisasi. Perubahan sinkronisasi lease/shard ini secara signifikan mengurangi jumlah panggilan API yang dilakukan oleh aplikasi konsumen KCL ke layanan Kinesis Data Streams dan mengoptimalkan manajemen sewa di aplikasi konsumen KCL Anda.

  • Selama bootstrap aplikasi, jika tabel sewa kosong, KCL menggunakan opsi pemfilteran ListShard API (parameter permintaan ShardFilter opsional) untuk mengambil dan membuat sewa hanya untuk snapshot pecahan yang terbuka pada waktu yang ditentukan oleh parameter. ShardFilter ShardFilterParameter ini memungkinkan Anda untuk memfilter respons ListShards API. Satu-satunya properti yang diperlukan dari ShardFilter parameter adalahType. KCL menggunakan properti Type filter dan berikut nilai validnya untuk mengidentifikasi dan mengembalikan snapshot pecahan terbuka yang mungkin memerlukan sewa baru:

    • AT_TRIM_HORIZON- Responsnya mencakup semua pecahan yang terbuka diTRIM_HORIZON.

    • AT_LATEST- Respons hanya mencakup pecahan aliran data yang saat ini terbuka.

    • AT_TIMESTAMP- respons mencakup semua pecahan yang stempel waktu awalnya kurang dari atau sama dengan stempel waktu yang diberikan dan stempel waktu akhir lebih besar dari atau sama dengan stempel waktu yang diberikan atau masih terbuka.

    ShardFilterdigunakan saat membuat sewa untuk tabel sewa kosong untuk menginisialisasi sewa untuk snapshot pecahan yang ditentukan di. KinesisClientLibConfiguration#initialPositionInStreamExtended

    Untuk informasi selengkapnya tentang ShardFilter, lihat https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html.

  • Alih-alih semua pekerja melakukan sinkronisasi sewa/pecahan untuk menjaga tabel sewa tetap mutakhir dengan pecahan terbaru dalam aliran data, satu pemimpin pekerja terpilih melakukan sinkronisasi sewa/pecahan.

  • KCL 1.14 menggunakan parameter ChildShards pengembalian GetRecords dan SubscribeToShard API untuk melakukan sinkronisasi sewa/pecahan yang terjadi pada pecahan tertutup, memungkinkan pekerja KCL SHARD_END untuk hanya membuat sewa untuk pecahan anak dari pecahan pecahan yang selesai diproses. Lihat informasi yang lebih lengkap di GetRecords dan ChildShard.

  • Dengan perubahan di atas, perilaku KCL bergerak dari model semua pekerja yang belajar tentang semua pecahan yang ada ke model pekerja yang hanya belajar tentang pecahan pecahan anak-anak yang dimiliki setiap pekerja. Oleh karena itu, selain sinkronisasi yang terjadi selama bootstraping aplikasi konsumen dan peristiwa reshard, KCL sekarang juga melakukan pemindaian shard/lease berkala tambahan untuk mengidentifikasi lubang potensial dalam tabel sewa (dengan kata lain, untuk mempelajari semua pecahan baru) untuk memastikan rentang hash lengkap dari aliran data sedang diproses dan membuat sewa untuk mereka jika diperlukan. PeriodicShardSyncManageradalah komponen yang bertanggung jawab untuk menjalankan pemindaian lease/shard berkala.

    Kapan KinesisClientLibConfiguration#shardSyncStrategyType diatur keShardSyncStrategyType.SHARD_END, PeriodicShardSync leasesRecoveryAuditorInconsistencyConfidenceThreshold digunakan untuk menentukan ambang batas untuk jumlah pemindaian berturut-turut yang berisi lubang di tabel sewa setelah itu untuk menegakkan sinkronisasi pecahan. Kapan KinesisClientLibConfiguration#shardSyncStrategyType diatur keShardSyncStrategyType.PERIODIC, leasesRecoveryAuditorInconsistencyConfidenceThreshold diabaikan.

    Untuk informasi lebih lanjut tentang PeriodicShardSyncManager di KCL 1.14, lihat https://github.com/awslabs/ amazon-kinesis-client /blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ KinesisClientLibConfiguration .java #L987 -L999.

    Di KCL 1.14, opsi konfigurasi baru tersedia untuk dikonfigurasi PeriodicShardSyncManager di: LeaseManagementConfig

    Nama Nilai default Deskripsi
    leasesRecoveryAuditorInconsistencyConfidenceThreshold

    3

    Ambang batas kepercayaan untuk pekerjaan auditor periodik untuk menentukan apakah sewa untuk aliran data dalam tabel sewa tidak konsisten. Jika auditor menemukan kumpulan inkonsistensi yang sama secara berurutan untuk aliran data untuk ini berkali-kali, maka itu akan memicu sinkronisasi pecahan.

    CloudWatch Metrik baru juga sekarang dipancarkan untuk memantau kesehatan. PeriodicShardSyncManager Untuk informasi selengkapnya, lihat PeriodicShardSyncManager.

  • KCL 1.14 sekarang juga mendukung pembersihan sewa yang ditangguhkan. Sewa dihapus secara asinkron LeaseCleanupManager pada saat mencapaiSHARD_END, ketika pecahan telah kedaluwarsa melewati periode retensi aliran data atau ditutup sebagai hasil dari operasi resharding.

    Opsi konfigurasi baru tersedia untuk dikonfigurasiLeaseCleanupManager.

    Nama Nilai default Deskripsi
    leaseCleanupIntervalMillis

    1 menit

    Interval untuk menjalankan thread pembersihan sewa.

    completedLeaseCleanupIntervalMillis 5 menit

    Interval untuk memeriksa apakah sewa selesai atau tidak.

    garbageLeaseCleanupIntervalMillis 30 menit

    Interval untuk memeriksa apakah sewa adalah sampah (yaitu dipangkas melewati periode retensi aliran data) atau tidak.

  • Termasuk optimasi KinesisShardSyncer untuk hanya membuat sewa untuk satu lapisan pecahan.

Memproses Beberapa Aliran Data dengan KCL 2.x yang sama untuk Aplikasi Konsumen Java

Bagian ini menjelaskan perubahan berikut dalam KCL 2.x untuk Java yang memungkinkan Anda membuat aplikasi konsumen KCL yang dapat memproses lebih dari satu aliran data pada saat yang bersamaan.

penting

Pemrosesan multistream hanya didukung di KCL 2.x untuk Java, dimulai dengan KCL 2.3 untuk Java dan seterusnya.

Pemrosesan multistream TIDAK didukung untuk bahasa lain di mana KCL 2.x dapat diimplementasikan.

Pemrosesan multistream TIDAK didukung dalam versi KCL 1.x apa pun.

  • MultistreamTracker antarmuka

    Untuk membangun aplikasi konsumen yang dapat memproses beberapa aliran pada saat yang sama, Anda harus menerapkan antarmuka baru yang disebut MultistreamTracker. Antarmuka ini mencakup streamConfigList metode yang mengembalikan daftar aliran data dan konfigurasinya untuk diproses oleh aplikasi konsumen KCL. Perhatikan bahwa aliran data yang sedang diproses dapat diubah selama runtime aplikasi konsumen. streamConfigListdisebut secara berkala oleh KCL untuk mempelajari tentang perubahan aliran data untuk diproses.

    streamConfigListMetode ini mengisi StreamConfigdaftar.

    package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }

    Perhatikan bahwa bidang StreamIdentifier dan InitialPositionInStreamExtended wajib, sementara consumerArn adalah opsional. Anda harus menyediakan consumerArn satu-satunya jika Anda menggunakan KCL 2.x untuk mengimplementasikan aplikasi konsumen fan-out yang disempurnakan.

    Untuk informasi lebih lanjut tentangStreamIdentifier, lihat https://github.com/awslabs/ amazon-kinesis-client /blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/ amazon-kinesis-client /src/main/java/software/amazon/kinesis/common/ .java #L29. StreamIdentifier Anda dapat membuat instance multistream untuk StreamIdentifier dari pengenal aliran serial. Pengidentifikasi aliran serial harus dari format berikut:. account-id:StreamName:streamCreationTimestamp

    * @param streamIdentifierSer * @return StreamIdentifier */ public static StreamIdentifier multiStreamInstance(String streamIdentifierSer) { if (PATTERN.matcher(streamIdentifierSer).matches()) { final String[] split = streamIdentifierSer.split(DELIMITER); return new StreamIdentifier(split[0], split[1], Long.parseLong(split[2])); } else { throw new IllegalArgumentException("Unable to deserialize StreamIdentifier from " + streamIdentifierSer); } }

    MultistreamTrackerjuga mencakup strategi untuk menghapus sewa aliran lama di tabel sewa (). formerStreamsLeasesDeletionStrategy Perhatikan bahwa strategi TIDAK DAPAT diubah selama runtime aplikasi konsumen. Untuk informasi lebih lanjut, lihat https://github.com/awslabs/ amazon-kinesis-client /blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/ amazon-kinesis-client /src/main/java/software/amazon/kinesis/processor/ .java FormerStreamsLeasesDeletionStrategy

  • ConfigsBuilderadalah kelas seluruh aplikasi yang dapat Anda gunakan untuk menentukan semua pengaturan konfigurasi KCL 2.x yang akan digunakan saat membangun aplikasi konsumen KCL Anda. ConfigsBuilderkelas sekarang memiliki dukungan untuk MultistreamTracker antarmuka. Anda dapat menginisialisasi ConfigsBuilder baik dengan nama satu aliran data untuk menggunakan catatan dari:

    /** * Constructor to initialize ConfigsBuilder with StreamName * @param streamName * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.right(streamName); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }

    Atau Anda dapat menginisialisasi ConfigsBuilder dengan MultiStreamTracker jika Anda ingin mengimplementasikan aplikasi konsumen KCL yang memproses beberapa aliran secara bersamaan.

    * Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
  • Dengan dukungan multistream yang diterapkan untuk aplikasi konsumen KCL Anda, setiap baris tabel sewa aplikasi sekarang berisi ID pecahan dan nama aliran dari beberapa aliran data yang diproses aplikasi ini.

  • Ketika dukungan multistream untuk aplikasi konsumen KCL Anda diimplementasikan, LeaseKey mengambil struktur berikut: account-id:StreamName:streamCreationTimestamp:ShardId Misalnya, 111111111:multiStreamTest-1:12345:shardId-000000000336.

    penting

    Ketika aplikasi konsumen KCL Anda yang ada dikonfigurasi untuk memproses hanya satu aliran data, LeaseKey (yang merupakan kunci hash untuk tabel sewa) adalah ID pecahan. Jika Anda mengkonfigurasi ulang aplikasi konsumen KCL yang ada ini untuk memproses beberapa aliran data, itu merusak tabel sewa Anda, karena dengan dukungan multistream, struktur LeaseKey harus sebagai berikut:. account-id:StreamName:StreamCreationTimestamp:ShardId

Menggunakan Perpustakaan Klien Kinesis dengan Registri Skema AWS Glue

Anda dapat mengintegrasikan aliran data Kinesis Anda dengan registri skema AWS Glue. Registri skema AWS Glue memungkinkan Anda menemukan, mengontrol, dan mengembangkan skema secara terpusat, sambil memastikan data yang dihasilkan terus divalidasi oleh skema terdaftar. Sebuah skema mendefinisikan struktur dan format catatan data. Sebuah skema adalah sebuah spesifikasi berversi untuk publikasi data yang handal, konsumsi, atau penyimpanan. AWSGlue Schema Registry memungkinkan Anda untuk meningkatkan kualitas end-to-end data dan tata kelola data dalam aplikasi streaming Anda. Untuk informasi selengkapnya, lihat AWSGlue Schema Registry. Salah satu cara untuk mengatur integrasi ini adalah melalui KCL di Jawa.

penting

Saat ini, integrasi registri skema Kinesis Data AWS Streams dan Glue hanya didukung untuk aliran data Kinesis yang menggunakan konsumen KCL 2.3 yang diterapkan di Jawa. Support multi-bahasa tidak tersedia. Konsumen KCL 1.0 tidak didukung. Konsumen KCL 2.x sebelum KCL 2.3 tidak didukung.

Untuk petunjuk terperinci tentang cara mengatur integrasi Kinesis Data Streams dengan Schema Registry menggunakan KCL, lihat bagian “Berinteraksi dengan Data Menggunakan Perpustakaan KPL/KCL” di Kasus Penggunaan: Mengintegrasikan Aliran Data Kinesis Amazon dengan Registri Skema Glue. AWS