Menerapkan de-agregasi konsumen - Amazon Kinesis Data Streams

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Menerapkan de-agregasi konsumen

Dimulai dengan rilis 1.4.0, KCL mendukung de-agregasi otomatis catatan pengguna. KPL Kode aplikasi konsumen yang ditulis dengan versi sebelumnya dari KCL akan dikompilasi tanpa modifikasi apa pun setelah Anda memperbarui. KCL Namun, jika KPL agregasi digunakan di sisi produsen, ada kehalusan yang melibatkan checkpointing: semua subrecord dalam catatan agregat memiliki nomor urut yang sama, sehingga data tambahan harus disimpan dengan pos pemeriksaan jika Anda perlu membedakan antara subrecord. Data tambahan ini disebut sebagai nomor urutan.

Migrasi dari versi sebelumnya KCL

Anda tidak diharuskan mengubah panggilan yang ada untuk melakukan checkpointing bersamaan dengan agregasi. Masih dijamin bahwa Anda dapat mengambil semua catatan yang berhasil disimpan di Kinesis Data Streams. KCLSekarang menyediakan dua operasi pos pemeriksaan baru untuk mendukung kasus penggunaan tertentu, dijelaskan di bawah ini.

Jika kode Anda yang ada ditulis untuk KPL dukungan KCL sebelumnya, dan operasi pos pemeriksaan Anda dipanggil tanpa argumen, itu setara dengan memeriksa nomor urut catatan KPL pengguna terakhir dalam batch. Jika operasi pos pemeriksaan Anda dipanggil dengan string nomor urut, itu setara dengan pemeriksaan nomor urut yang diberikan dari batch bersama dengan nomor urutan implisit 0 (nol).

Memanggil operasi KCL pos pemeriksaan baru checkpoint() tanpa argumen apa pun secara semantik setara dengan pemeriksaan nomor urut Record panggilan terakhir dalam batch, bersama dengan nomor urutan implisit 0 (nol).

Memanggil operasi KCL pos pemeriksaan baru checkpoint(Record record) secara semantik setara dengan pemeriksaan nomor urut yang diberikan Record bersama dengan nomor urutan implisit 0 (nol). Jika Record panggilan sebenarnya aUserRecord, nomor UserRecord urut dan nomor urutan diperiksa.

Memanggil operasi KCL pos pemeriksaan baru checkpoint(String sequenceNumber, long subSequenceNumber) secara eksplisit memeriksa nomor urut yang diberikan bersama dengan nomor urutan yang diberikan.

Dalam salah satu kasus ini, setelah pos pemeriksaan disimpan di tabel pos pemeriksaan Amazon DynamoDBKCL, dapat melanjutkan pengambilan catatan dengan benar bahkan ketika aplikasi mogok dan restart. Jika lebih banyak catatan terkandung dalam urutan, pengambilan terjadi dimulai dengan catatan nomor urutan berikutnya dalam catatan dengan nomor urut yang terakhir diperiksa. Jika pos pemeriksaan terbaru menyertakan nomor urutan terakhir dari catatan nomor urut sebelumnya, pengambilan terjadi dimulai dengan catatan dengan nomor urut berikutnya.

Bagian selanjutnya membahas rincian urutan dan urutan checkpointing untuk konsumen yang perlu menghindari melewatkan dan duplikasi catatan. Jika melewatkan (atau duplikasi) catatan saat menghentikan dan memulai ulang pemrosesan catatan konsumen Anda tidak penting, Anda dapat menjalankan kode yang ada tanpa modifikasi.

Gunakan KCL ekstensi untuk KPL de-agregasi

KPLde-agregasi dapat melibatkan pemeriksaan urutan. Untuk memfasilitasi penggunaan checkpointing sequence, UserRecord kelas telah ditambahkan ke: KCL

public class UserRecord extends Record { public long getSubSequenceNumber() { /* ... */ } @Override public int hashCode() { /* contract-satisfying implementation */ } @Override public boolean equals(Object obj) { /* contract-satisfying implementation */ } }

Kelas ini sekarang digunakan sebagai penggantiRecord. Ini tidak merusak kode yang ada karena merupakan subkelas dari. Record UserRecordKelas mewakili subrecord aktual dan standar, catatan non-agregat. Catatan non-agregat dapat dianggap sebagai catatan agregat dengan tepat satu subrecord.

Selain itu, dua operasi baru ditambahkan keIRecordProcessorCheckpointer:

public void checkpoint(Record record); public void checkpoint(String sequenceNumber, long subSequenceNumber);

Untuk mulai menggunakan checkpointing nomor urutan, Anda dapat melakukan konversi berikut. Ubah kode formulir berikut:

checkpointer.checkpoint(record.getSequenceNumber());

Kode formulir baru:

checkpointer.checkpoint(record);

Kami menyarankan Anda menggunakan checkpoint(Record record) formulir untuk checkpointing berikutnya. Namun, jika Anda sudah menyimpan sequenceNumbers dalam string untuk digunakan untuk checkpointing, Anda sekarang juga harus menyimpansubSequenceNumber, seperti yang ditunjukkan pada contoh berikut:

String sequenceNumber = record.getSequenceNumber(); long subSequenceNumber = ((UserRecord) record).getSubSequenceNumber(); // ... do other processing checkpointer.checkpoint(sequenceNumber, subSequenceNumber);

Pemeran dari Record untuk UserRecord selalu berhasil karena implementasinya selalu menggunakanUserRecord. Kecuali ada kebutuhan untuk melakukan aritmatika pada nomor urut, pendekatan ini tidak disarankan.

Saat memproses catatan KPL pengguna, KCL menulis nomor urutan ke Amazon DynamoDB sebagai bidang tambahan untuk setiap baris. Versi sebelumnya dari yang KCL digunakan AFTER_SEQUENCE_NUMBER untuk mengambil catatan saat melanjutkan pos pemeriksaan. Arus KCL dengan KPL dukungan menggunakan AT_SEQUENCE_NUMBER sebagai gantinya. Ketika catatan pada nomor urut yang diperiksa diambil, nomor urutan yang diperiksa diperiksa, dan subrecord dijatuhkan sebagaimana mestinya (yang mungkin semuanya, jika subrecord terakhir adalah yang diperiksa). Sekali lagi, catatan non-agregat dapat dianggap sebagai catatan agregat dengan satu subrecord, sehingga algoritma yang sama berfungsi untuk catatan agregat dan non-agregat.

Gunakan GetRecords secara langsung

Anda juga dapat memilih untuk tidak menggunakan KCL tetapi menjalankan API operasi GetRecords secara langsung untuk mengambil catatan Kinesis Data Streams. Untuk membongkar catatan yang diambil ini ke dalam catatan KPL pengguna asli Anda, panggil salah satu operasi statis berikut di: UserRecord.java

public static List<Record> deaggregate(List<Record> records) public static List<UserRecord> deaggregate(List<UserRecord> records, BigInteger startingHashKey, BigInteger endingHashKey)

Operasi pertama menggunakan nilai default 0 (nol) untuk startingHashKey dan nilai default 2^128 -1 untukendingHashKey.

Masing-masing operasi ini melakukan de-agregasi daftar catatan Kinesis Data Streams yang diberikan ke dalam daftar catatan pengguna. KPL Setiap catatan KPL pengguna yang kunci hash eksplisit atau kunci partisi berada di luar jangkauan startingHashKey (inklusif) dan endingHashKey (inklusif) dibuang dari daftar catatan yang dikembalikan.