Tutorial: Lakukan operasi Kinesis Data Streams dasar menggunakan AWS CLI - Amazon Kinesis Data Streams

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Tutorial: Lakukan operasi Kinesis Data Streams dasar menggunakan AWS CLI

Bagian ini menjelaskan penggunaan dasar aliran data Kinesis dari baris perintah menggunakan file. AWS CLI Pastikan Anda terbiasa dengan konsep yang dibahas diTerminologi dan konsep Amazon Kinesis Data Streams.

catatan

Setelah Anda membuat stream, akun Anda akan dikenakan biaya nominal untuk penggunaan Kinesis Data Streams karena Kinesis Data Streams tidak memenuhi syarat untuk Tingkat Gratis. AWS Setelah Anda selesai dengan tutorial ini, hapus AWS sumber daya Anda untuk menghentikan biaya. Untuk informasi selengkapnya, lihat Langkah 4: Membersihkan.

Langkah 1: Buat aliran

Langkah pertama Anda adalah membuat aliran dan memverifikasi bahwa itu berhasil dibuat. Gunakan perintah berikut untuk membuat aliran bernama “Foo”:

aws kinesis create-stream --stream-name Foo

Selanjutnya, keluarkan perintah berikut untuk memeriksa kemajuan pembuatan stream:

aws kinesis describe-stream-summary --stream-name Foo

Anda harus mendapatkan output yang mirip dengan contoh berikut:

{ "StreamDescriptionSummary": { "StreamName": "Foo", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/Foo", "StreamStatus": "CREATING", "RetentionPeriodHours": 48, "StreamCreationTimestamp": 1572297168.0, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "OpenShardCount": 3, "ConsumerCount": 0 } }

Dalam contoh ini, aliran memiliki statusCREATING, yang berarti belum siap digunakan. Periksa lagi dalam beberapa saat, dan Anda akan melihat output yang mirip dengan contoh berikut:

{ "StreamDescriptionSummary": { "StreamName": "Foo", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/Foo", "StreamStatus": "ACTIVE", "RetentionPeriodHours": 48, "StreamCreationTimestamp": 1572297168.0, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "OpenShardCount": 3, "ConsumerCount": 0 } }

Ada informasi dalam output ini yang tidak Anda perlukan untuk tutorial ini. Informasi penting untuk saat ini adalah"StreamStatus": "ACTIVE", yang memberi tahu Anda bahwa aliran siap digunakan, dan informasi tentang pecahan tunggal yang Anda minta. Anda juga dapat memverifikasi keberadaan aliran baru Anda dengan menggunakan list-streams perintah, seperti yang ditunjukkan di sini:

aws kinesis list-streams

Output:

{ "StreamNames": [ "Foo" ] }

Langkah 2: Letakkan catatan

Sekarang setelah Anda memiliki aliran aktif, Anda siap untuk memasukkan beberapa data. Untuk tutorial ini, Anda akan menggunakan perintah sesederhana mungkinput-record, yang menempatkan catatan data tunggal yang berisi teks “testdata” ke dalam aliran:

aws kinesis put-record --stream-name Foo --partition-key 123 --data testdata

Perintah ini, jika berhasil, akan menghasilkan output yang mirip dengan contoh berikut:

{ "ShardId": "shardId-000000000000", "SequenceNumber": "49546986683135544286507457936321625675700192471156785154" }

Selamat, Anda baru saja menambahkan data ke aliran! Selanjutnya Anda akan melihat cara mengeluarkan data dari aliran.

Langkah 3: Dapatkan catatan

GetShardIterator

Sebelum Anda bisa mendapatkan data dari stream, Anda harus mendapatkan iterator shard untuk shard yang Anda minati. Sebuah iterator shard mewakili posisi aliran dan pecahan dari mana konsumen (get-recordperintah dalam hal ini) akan membaca. Anda akan menggunakan get-shard-iterator perintah sebagai berikut:

aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo

Ingat bahwa aws kinesis perintah memiliki Kinesis API Data Streams di belakangnya, jadi jika Anda ingin tahu tentang salah satu parameter yang ditampilkan, Anda dapat membacanya GetShardIteratorAPIdi topik referensi. Eksekusi yang berhasil akan menghasilkan output yang mirip dengan contoh berikut:

{ "ShardIterator": "AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg=" }

String panjang dari karakter yang tampaknya acak adalah iterator pecahan (milik Anda akan berbeda). Anda harus menyalin/menempelkan iterator shard ke dalam perintah get, ditampilkan berikutnya. Iterator shard memiliki masa pakai yang valid 300 detik, yang seharusnya cukup waktu bagi Anda untuk menyalin/menempelkan iterator shard ke perintah berikutnya. Anda harus menghapus baris baru dari iterator shard Anda sebelum menempel ke perintah berikutnya. Jika Anda mendapatkan pesan kesalahan bahwa iterator shard tidak lagi valid, jalankan get-shard-iterator perintah lagi.

GetRecords

get-recordsPerintah mendapatkan data dari aliran, dan menyelesaikan panggilan ke GetRecordsdalam Kinesis Data Streams. API Iterator shard menentukan posisi dalam pecahan dari mana Anda ingin mulai membaca catatan data secara berurutan. Jika tidak ada catatan yang tersedia di bagian pecahan yang ditunjuk iterator, GetRecords mengembalikan daftar kosong. Mungkin diperlukan beberapa panggilan untuk sampai ke sebagian pecahan yang berisi catatan.

Dalam contoh get-records perintah berikut:

aws kinesis get-records --shard-iterator AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg=

Jika Anda menjalankan tutorial ini dari prosesor perintah tipe Unix seperti bash, Anda dapat mengotomatiskan akuisisi iterator shard menggunakan perintah bersarang, seperti ini:

SHARD_ITERATOR=$(aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo --query 'ShardIterator') aws kinesis get-records --shard-iterator $SHARD_ITERATOR

Jika Anda menjalankan tutorial ini dari sistem yang mendukung PowerShell, Anda dapat mengotomatiskan akuisisi iterator shard menggunakan perintah seperti ini:

aws kinesis get-records --shard-iterator ((aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo).split('"')[4])

Hasil get-records perintah yang berhasil akan meminta catatan dari aliran Anda untuk pecahan yang Anda tentukan saat Anda memperoleh iterator shard, seperti pada contoh berikut:

{ "Records":[ { "Data":"dGVzdGRhdGE=", "PartitionKey":"123”, "ApproximateArrivalTimestamp": 1.441215410867E9, "SequenceNumber":"49544985256907370027570885864065577703022652638596431874" } ], "MillisBehindLatest":24000, "NextShardIterator":"AAAAAAAAAAEDOW3ugseWPE4503kqN1yN1UaodY8unE0sYslMUmC6lX9hlig5+t4RtZM0/tALfiI4QGjunVgJvQsjxjh2aLyxaAaPr+LaoENQ7eVs4EdYXgKyThTZGPcca2fVXYJWL3yafv9dsDwsYVedI66dbMZFC8rPMWc797zxQkv4pSKvPOZvrUIudb8UkH3VMzx58Is=" }

Perhatikan bahwa get-records dijelaskan di atas sebagai permintaan, yang berarti Anda mungkin menerima nol atau lebih catatan bahkan jika ada catatan dalam aliran Anda. Setiap catatan yang dikembalikan mungkin tidak mewakili semua catatan yang saat ini ada di aliran Anda. Ini normal, dan kode produksi akan melakukan polling aliran untuk catatan pada interval yang sesuai. Kecepatan polling ini akan bervariasi tergantung pada persyaratan desain aplikasi spesifik Anda.

Dalam catatan Anda di bagian tutorial ini, Anda akan melihat bahwa data tampaknya sampah - dan itu bukan teks yang jelas yang testdata kami kirim. Ini karena cara put-record menggunakan pengkodean Base64 untuk memungkinkan Anda mengirim data biner. Namun, dukungan Kinesis Data Streams AWS CLI di tidak menyediakan decoding Base64 karena decoding Base64 ke konten biner mentah yang dicetak ke stdout dapat menyebabkan perilaku yang tidak diinginkan dan potensi masalah keamanan pada platform dan terminal tertentu. Jika Anda menggunakan decoder Base64 (misalnya, https://www.base64decode.org/) untuk memecahkan kode secara manual, dGVzdGRhdGE= Anda akan melihat bahwa itu sebenarnya. testdata Ini cukup untuk kepentingan tutorial ini karena, dalam praktiknya, jarang AWS CLI digunakan untuk mengkonsumsi data. Lebih sering, ini digunakan untuk memantau keadaan aliran dan mendapatkan informasi, seperti yang ditunjukkan sebelumnya (describe-streamdanlist-streams). Untuk informasi selengkapnya tentangKCL, lihat Mengembangkan Konsumen Kustom dengan Menggunakan KCL Throughput Bersama.

get-recordstidak selalu mengembalikan semua catatan dalam stream/shard yang ditentukan. Ketika itu terjadi, gunakan NextShardIterator dari hasil terakhir untuk mendapatkan set catatan berikutnya. Jika lebih banyak data dimasukkan ke dalam aliran, yang merupakan situasi normal dalam aplikasi produksi, Anda dapat terus melakukan polling untuk data yang digunakan get-records setiap kali. Namun, jika Anda tidak memanggil get-records menggunakan iterator shard berikutnya dalam masa pakai iterator shard 300 detik, Anda akan mendapatkan pesan kesalahan, dan Anda harus menggunakan get-shard-iterator perintah untuk mendapatkan iterator shard baru.

Juga disediakan dalam output ini adalahMillisBehindLatest, yang merupakan jumlah milidetik respons GetRecordsoperasi dari ujung aliran, menunjukkan seberapa jauh di belakang waktu konsumen saat ini. Nilai nol menunjukkan pemrosesan catatan tertangkap, dan tidak ada catatan baru untuk diproses saat ini. Dalam kasus tutorial ini, Anda mungkin melihat angka yang cukup besar jika Anda telah meluangkan waktu untuk membaca bersama saat Anda pergi. Secara default, catatan data tetap berada dalam aliran selama 24 jam menunggu Anda untuk mengambilnya. Kerangka waktu ini disebut periode retensi dan dapat dikonfigurasi hingga 365 hari.

get-recordsHasil yang sukses akan selalu memiliki NextShardIterator bahkan jika tidak ada lagi catatan saat ini dalam aliran. Ini adalah model polling yang mengasumsikan produsen berpotensi menempatkan lebih banyak catatan ke dalam aliran pada waktu tertentu. Meskipun Anda dapat menulis rutinitas polling Anda sendiri, jika Anda menggunakan yang disebutkan sebelumnya KCL untuk mengembangkan aplikasi konsumen, polling ini diurus untuk Anda.

Jika Anda menelepon get-records sampai tidak ada lagi catatan dalam aliran dan pecahan yang Anda tarik, Anda akan melihat output dengan catatan kosong yang mirip dengan contoh berikut:

{ "Records": [], "NextShardIterator": "AAAAAAAAAAGCJ5jzQNjmdhO6B/YDIDE56jmZmrmMA/r1WjoHXC/kPJXc1rckt3TFL55dENfe5meNgdkyCRpUPGzJpMgYHaJ53C3nCAjQ6s7ZupjXeJGoUFs5oCuFwhP+Wul/EhyNeSs5DYXLSSC5XCapmCAYGFjYER69QSdQjxMmBPE/hiybFDi5qtkT6/PsZNz6kFoqtDk=" }

Langkah 4: Membersihkan

Hapus streaming Anda untuk mengosongkan sumber daya dan menghindari tagihan yang tidak diinginkan ke akun Anda. Lakukan ini setiap kali Anda telah membuat aliran dan tidak akan menggunakannya, karena biaya bertambah per aliran apakah Anda menempatkan dan mendapatkan data dengannya atau tidak. Perintah pembersihan adalah sebagai berikut:

aws kinesis delete-stream --stream-name Foo

Sukses menghasilkan tidak ada output. Gunakan describe-stream untuk memeriksa kemajuan penghapusan:

aws kinesis describe-stream-summary --stream-name Foo

Jika Anda menjalankan perintah ini segera setelah perintah delete, Anda akan melihat output yang mirip dengan contoh berikut:

{ "StreamDescriptionSummary": { "StreamName": "samplestream", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/samplestream", "StreamStatus": "ACTIVE",

Setelah aliran dihapus sepenuhnya, describe-stream akan menghasilkan kesalahan “tidak ditemukan”:

A client error (ResourceNotFoundException) occurred when calling the DescribeStreamSummary operation: Stream Foo under account 123456789012 not found.