Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Memecahkan Masalah Kinesis Data Streams konsumen
Topik berikut menawarkan solusi untuk masalah umum dengan konsumen Amazon Kinesis Data Streams:
Kesalahan kompilasi dengan LeaseManagementConfig konstruktor
Beberapa catatan Kinesis Data Streams dilewati saat menggunakan Perpustakaan Klien Kinesis
Catatan milik pecahan yang sama diproses oleh prosesor rekaman yang berbeda pada saat yang sama
Aplikasi konsumen membaca pada tingkat yang lebih lambat dari yang diharapkan
GetRecords mengembalikan array catatan kosong bahkan ketika ada data dalam aliran
Kesalahan kompilasi dengan LeaseManagementConfig konstruktor
Saat memutakhirkan ke Kinesis Client Library (KCL) versi 3.x atau yang lebih baru, Anda mungkin mengalami kesalahan kompilasi yang terkait dengan konstruktor. LeaseManagementConfig
Jika Anda langsung membuat LeaseManagementConfig
objek untuk mengatur konfigurasi alih-alih menggunakan ConfigsBuilder
dalam KCL versi 3.x atau yang lebih baru, Anda mungkin melihat pesan kesalahan berikut saat mengkompilasi kode aplikasi KCL Anda.
Cannot resolve constructor 'LeaseManagementConfig(String, DynamoDbAsyncClient, KinesisAsyncClient, String)'
KCL dengan versi 3.x atau yang lebih baru mengharuskan Anda untuk menambahkan satu parameter lagi, applicationName (type: String), setelah parameter TableName.
-
Sebelum: leaseManagementConfig = baru LeaseManagementConfig (TableName, DBClient dynamo, KinesisClient, streamName, WorkerIdentifier)
-
Setelah: leaseManagementConfig = baru LeaseManagementConfig (TableName, applicationName, dynamo, KinesisClient, streamNameDBClient, workerIdentifier)
Alih-alih langsung membuat LeaseManagementConfig objek, sebaiknya gunakan ConfigsBuilder
untuk mengatur konfigurasi di KCL 3.x dan versi yang lebih baru. ConfigsBuilder
menyediakan cara yang lebih fleksibel dan dapat dipelihara untuk mengkonfigurasi aplikasi KCL Anda.
Berikut ini adalah contoh penggunaan ConfigsBuilder
untuk mengatur konfigurasi KCL.
ConfigsBuilder configsBuilder = new ConfigsBuilder( streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory() ); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig() .failoverTimeMillis(60000), // this is an example configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() );
Beberapa catatan Kinesis Data Streams dilewati saat menggunakan Perpustakaan Klien Kinesis
Penyebab paling umum dari catatan yang dilewati adalah pengecualian yang tidak tertangani yang dilemparkan. processRecords
Perpustakaan Klien Kinesis (KCL) bergantung pada processRecords
kode Anda untuk menangani pengecualian apa pun yang timbul dari pemrosesan catatan data. Setiap pengecualian yang dilemparkan processRecords
diserap oleh KCL. Untuk menghindari percobaan ulang tak terbatas pada kegagalan berulang, KCL tidak mengirim ulang batch catatan yang diproses pada saat pengecualian. KCL kemudian memanggil processRecords
batch rekaman data berikutnya tanpa memulai ulang prosesor rekaman. Ini secara efektif menghasilkan aplikasi konsumen yang mengamati catatan yang dilewati. Untuk mencegah catatan yang dilewati, tangani semua pengecualian di dalamnya processRecords
dengan tepat.
Catatan milik pecahan yang sama diproses oleh prosesor rekaman yang berbeda pada saat yang sama
Untuk aplikasi Kinesis Client Library (KCL) yang berjalan, pecahan hanya memiliki satu pemilik. Namun, beberapa prosesor rekaman dapat memproses pecahan yang sama untuk sementara. Jika instance pekerja kehilangan konektivitas jaringan, KCL mengasumsikan bahwa pekerja yang tidak dapat dijangkau tidak lagi memproses catatan setelah waktu failover berakhir, dan mengarahkan instance pekerja lain untuk mengambil alih. Untuk waktu yang singkat, prosesor rekaman baru dan prosesor rekaman dari pekerja yang tidak terjangkau dapat memproses data dari pecahan yang sama.
Tetapkan waktu failover yang sesuai untuk aplikasi Anda. Untuk aplikasi latensi rendah, default 10 detik mungkin mewakili waktu maksimum yang ingin Anda tunggu. Namun, dalam kasus di mana Anda mengharapkan masalah konektivitas seperti melakukan panggilan di seluruh wilayah geografis di mana konektivitas dapat hilang lebih sering, jumlah ini mungkin terlalu rendah.
Aplikasi Anda harus mengantisipasi dan menangani skenario ini, terutama karena konektivitas jaringan biasanya dikembalikan ke pekerja yang sebelumnya tidak dapat dijangkau. Jika prosesor rekaman memiliki pecahan yang diambil oleh prosesor rekaman lain, ia harus menangani dua kasus berikut untuk melakukan shutdown yang anggun:
-
Setelah panggilan saat ini
processRecords
selesai, KCL memanggil metode shutdown pada prosesor rekaman dengan alasan shutdown 'ZOMBIE.' Prosesor rekaman Anda diharapkan untuk membersihkan sumber daya apa pun yang sesuai dan kemudian keluar. -
Ketika Anda mencoba untuk memeriksa dari pekerja 'zombie', KCL melempar.
ShutdownException
Setelah menerima pengecualian ini, kode Anda diharapkan keluar dari metode saat ini dengan bersih.
Untuk informasi selengkapnya, lihat Menangani catatan duplikat.
Aplikasi konsumen membaca pada tingkat yang lebih lambat dari yang diharapkan
Alasan paling umum untuk throughput baca lebih lambat dari yang diharapkan adalah sebagai berikut:
-
Beberapa aplikasi konsumen memiliki total pembacaan melebihi batas per-shard. Untuk informasi selengkapnya, lihat Kuota dan batas. Dalam hal ini, tingkatkan jumlah pecahan dalam aliran data Kinesis.
-
Batas yang menentukan jumlah maksimum GetRecords per panggilan mungkin telah dikonfigurasi dengan nilai rendah. Jika Anda menggunakan KCL, Anda mungkin telah mengonfigurasi pekerja dengan nilai rendah untuk
maxRecords
properti tersebut. Secara umum, kami sarankan menggunakan default sistem untuk properti ini. -
Logika di dalam
processRecords
panggilan Anda mungkin memakan waktu lebih lama dari yang diharapkan karena sejumlah alasan yang mungkin; logikanya mungkin intensif CPU, pemblokiran I/O, atau macet pada sinkronisasi. Untuk menguji apakah ini benar, uji coba jalankan prosesor rekaman kosong dan bandingkan throughput baca. Untuk informasi tentang cara mengikuti data yang masuk, lihatGunakan resharding, scaling, dan parallel processing untuk mengubah jumlah pecahan.
Jika Anda hanya memiliki satu aplikasi konsumen, selalu mungkin untuk membaca setidaknya dua kali lebih cepat dari tarif put. Itu karena Anda dapat menulis hingga 1.000 catatan per detik untuk menulis, hingga total tingkat penulisan data maksimum 1 MB per detik (termasuk kunci partisi). Setiap pecahan terbuka dapat mendukung hingga 5 transaksi per detik untuk pembacaan, hingga total kecepatan baca data maksimum 2 MB per detik. Perhatikan bahwa setiap pembacaan (GetRecordspanggilan) mendapat sekumpulan catatan. Ukuran data yang dikembalikan GetRecords bervariasi tergantung pada pemanfaatan pecahan. Ukuran maksimum data yang GetRecords dapat dikembalikan adalah 10 MB. Jika panggilan mengembalikan batas itu, panggilan berikutnya yang dilakukan dalam 5 detik berikutnya akan melempar aProvisionedThroughputExceededException
.
GetRecords mengembalikan array catatan kosong bahkan ketika ada data dalam aliran
Mengkonsumsi, atau mendapatkan catatan adalah model tarik. Pengembang diharapkan untuk memanggil GetRecordsdalam loop kontinu tanpa back-off. Setiap panggilan untuk GetRecords juga mengembalikan ShardIterator
nilai, yang harus digunakan dalam iterasi berikutnya dari loop.
GetRecordsOperasi tidak memblokir. Sebaliknya, ia segera kembali; dengan catatan data yang relevan atau dengan Records
elemen kosong. Records
Elemen kosong dikembalikan dalam dua kondisi:
-
Tidak ada lagi data saat ini di pecahan.
-
Tidak ada data di dekat bagian pecahan yang ditunjukkan oleh.
ShardIterator
Kondisi terakhir tidak kentara, tetapi merupakan tradeoff desain yang diperlukan untuk menghindari waktu pencarian (latensi) tak terbatas saat mengambil catatan. Dengan demikian, aplikasi yang memakan streaming harus mengulang dan memanggilGetRecords, menangani catatan kosong sebagai hal yang biasa.
Dalam skenario produksi, satu-satunya waktu loop kontinu harus keluar adalah ketika NextShardIterator
nilainya. NULL
KapanNULL
, NextShardIterator
itu berarti bahwa pecahan saat ini telah ditutup dan ShardIterator
nilainya akan melewati rekor terakhir. Jika aplikasi yang mengkonsumsi tidak pernah memanggil SplitShard atauMergeShards, pecahan tetap terbuka dan panggilan untuk GetRecords tidak pernah mengembalikan NextShardIterator
nilai yang adaNULL
.
Jika Anda menggunakan Kinesis Client Library (KCL), pola konsumsi sebelumnya diabstraksikan untuk Anda. Ini termasuk penanganan otomatis dari satu set pecahan yang berubah secara dinamis. Dengan KCL, pengembang hanya memasok logika untuk memproses catatan yang masuk. Ini dimungkinkan karena perpustakaan membuat panggilan terus menerus GetRecords untuk Anda.
Iterator shard kedaluwarsa secara tak terduga
Sebuah iterator shard baru dikembalikan oleh setiap GetRecords permintaan (asNextShardIterator
), yang kemudian Anda gunakan dalam GetRecords permintaan berikutnya (asShardIterator
). Biasanya, iterator pecahan ini tidak kedaluwarsa sebelum Anda menggunakannya. Namun, Anda mungkin menemukan bahwa iterator shard kedaluwarsa karena Anda belum menelepon GetRecords lebih dari 5 menit, atau karena Anda telah melakukan restart aplikasi konsumen Anda.
Jika iterator shard kedaluwarsa segera sebelum Anda dapat menggunakannya, ini mungkin menunjukkan bahwa tabel DynamoDB yang digunakan oleh Kinesis tidak memiliki kapasitas yang cukup untuk menyimpan data sewa. Situasi ini lebih mungkin terjadi jika Anda memiliki sejumlah besar pecahan. Untuk mengatasi masalah ini, tingkatkan kapasitas tulis yang ditetapkan ke tabel pecahan. Untuk informasi selengkapnya, lihat Gunakan tabel sewa untuk melacak pecahan yang diproses oleh aplikasi konsumen KCL.
Pemrosesan catatan konsumen tertinggal
Untuk sebagian besar kasus penggunaan, aplikasi konsumen membaca data terbaru dari aliran. Dalam keadaan tertentu, bacaan konsumen mungkin tertinggal, yang mungkin tidak diinginkan. Setelah Anda mengidentifikasi seberapa jauh di belakang konsumen Anda membaca, lihat alasan paling umum mengapa konsumen tertinggal.
Mulailah dengan GetRecords.IteratorAgeMilliseconds
metrik, yang melacak posisi baca di semua pecahan dan konsumen di aliran. Perhatikan bahwa jika usia iterator melewati 50% dari periode retensi (secara default, 24 jam, dapat dikonfigurasi hingga 365 hari), ada risiko kehilangan data karena kedaluwarsa rekaman. Solusi sementara yang cepat adalah meningkatkan periode retensi. Ini menghentikan hilangnya data penting saat Anda memecahkan masalah lebih lanjut. Untuk informasi selengkapnya, lihat Pantau layanan Amazon Kinesis Data Streams dengan Amazon CloudWatch. Selanjutnya, identifikasi seberapa jauh di belakang aplikasi konsumen Anda membaca dari setiap pecahan menggunakan CloudWatch metrik khusus yang dipancarkan oleh Kinesis Client Library (KCL),. MillisBehindLatest
Untuk informasi selengkapnya, lihat Pantau Perpustakaan Klien Kinesis dengan Amazon CloudWatch.
Berikut adalah alasan paling umum konsumen dapat tertinggal:
-
Peningkatan besar yang tiba-tiba ke
GetRecords.IteratorAgeMilliseconds
atauMillisBehindLatest
biasanya menunjukkan masalah sementara, seperti kegagalan operasi API ke aplikasi hilir. Selidiki peningkatan mendadak ini jika salah satu metrik secara konsisten menampilkan perilaku ini. -
Peningkatan bertahap pada metrik ini menunjukkan bahwa konsumen tidak mengikuti aliran karena tidak memproses catatan dengan cukup cepat. Akar penyebab paling umum untuk perilaku ini adalah sumber daya fisik yang tidak mencukupi atau logika pemrosesan catatan yang belum diskalakan dengan peningkatan throughput aliran. Anda dapat memverifikasi perilaku ini dengan melihat CloudWatch metrik kustom lain yang dipancarkan KCL terkait dengan
processTask
operasi, termasukRecordProcessor.processRecords.Time
,, dan.Success
RecordsProcessed
-
Jika Anda melihat peningkatan
processRecords.Time
metrik yang berkorelasi dengan peningkatan throughput, Anda harus menganalisis logika pemrosesan catatan Anda untuk mengidentifikasi mengapa metrik tidak menskalakan dengan peningkatan throughput. -
Jika Anda melihat peningkatan
processRecords.Time
nilai yang tidak berkorelasi dengan peningkatan throughput, periksa untuk melihat apakah Anda melakukan panggilan pemblokiran di jalur kritis, yang sering menjadi penyebab perlambatan dalam pemrosesan catatan. Pendekatan alternatif adalah meningkatkan paralelisme Anda dengan meningkatkan jumlah pecahan. Terakhir, konfirmasikan bahwa Anda memiliki jumlah sumber daya fisik yang memadai (memori, pemanfaatan CPU, antara lain) pada node pemrosesan yang mendasarinya selama permintaan puncak.
-
Kesalahan izin kunci KMS yang tidak sah
Kesalahan ini terjadi ketika aplikasi konsumen membaca dari aliran terenkripsi tanpa izin pada kunci. AWS KMS Untuk menetapkan izin ke aplikasi untuk mengakses kunci KMS, lihat Menggunakan Kebijakan Utama di KMS dan Menggunakan Kebijakan IAM dengan AWS KMS. AWS
DynamoDbException: Jalur dokumen yang disediakan dalam ekspresi pembaruan tidak valid untuk pembaruan
Saat menggunakan KCL 3.x dengan AWS SDK for Java versi 2.27.19 hingga 2.27.23, Anda mungkin menemukan pengecualian DynamoDB berikut:
“software.amazon.awssdk.services.dynamodb.model. DynamoDbException: Jalur dokumen yang disediakan dalam ekspresi pembaruan tidak valid untuk pembaruan (Layanan: DynamoDb, Kode Status: 400, ID Permintaan: xxx)”
Kesalahan ini terjadi karena masalah yang diketahui di yang mempengaruhi tabel metadata DynamoDB AWS SDK for Java yang dikelola oleh KCL 3.x. Masalah ini diperkenalkan di versi 2.27.19 dan berdampak pada semua versi hingga 2.27.23. Masalah telah diselesaikan di AWS SDK for Java versi 2.27.24. Untuk kinerja dan stabilitas yang optimal, kami sarankan untuk meningkatkan ke versi 2.28.0 atau yang lebih baru.
Memecahkan masalah umum lainnya bagi konsumen
-
Mengapa pemicu Kinesis Data Streams tidak dapat menjalankan fungsi Lambda saya?
-
Mengapa saya mengalami masalah latensi tinggi dengan Kinesis Data Streams?
-
Mengapa aliran data Kinesis saya mengembalikan Kesalahan Server Internal 500?
-
Bagaimana cara memecahkan masalah aplikasi KCL yang diblokir atau macet untuk Kinesis Data Streams?