Contoh: Mengambil Nilai yang Paling Sering Terjadi (TOP_K_ITEMS_TUMBLING) - Panduan Developer Amazon Kinesis Data Analytics untuk Aplikasi SQL

Untuk proyek baru, kami menyarankan Anda menggunakan Managed Service baru untuk Apache Flink Studio melalui Kinesis Data Analytics untuk Aplikasi SQL. Layanan Terkelola untuk Apache Flink Studio menggabungkan kemudahan penggunaan dengan kemampuan analitis tingkat lanjut, memungkinkan Anda membangun aplikasi pemrosesan aliran yang canggih dalam hitungan menit.

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Contoh: Mengambil Nilai yang Paling Sering Terjadi (TOP_K_ITEMS_TUMBLING)

Contoh Amazon Kinesis Data Analytics menunjukkan cara menggunakan fungsi TOP_K_ITEMS_TUMBLING untuk mengambil nilai yang paling sering terjadi di jendela tumbling. Untuk informasi selengkapnya, lihat TOP_K_ITEMS_TUMBLINGfungsi di Amazon Managed Service for Apache Flink SQL Reference.

Fungsi TOP_K_ITEMS_TUMBLING berguna ketika menggabungkan lebih dari puluhan atau ratusan ribu kunci, dan Anda ingin mengurangi penggunaan sumber daya Anda. Fungsi ini menghasilkan hasil yang sama seperti menggabungkan dengan klausa GROUP BY dan ORDER BY.

Dalam contoh ini, Anda menulis catatan berikut ke Amazon Kinesis data stream:

{"TICKER": "TBV"} {"TICKER": "INTC"} {"TICKER": "MSFT"} {"TICKER": "AMZN"} ...

Anda kemudian membuat aplikasi Kinesis Data Analytics di AWS Management Console, dengan Kinesis data stream sebagai sumber streaming. Proses penemuan membaca catatan sampel pada sumber streaming dan menyimpulkan skema dalam aplikasi dengan satu kolom (TICKER) seperti yang ditunjukkan di bawah ini.


                Tangkapan layar konsol yang menampilkan skema dalam aplikasi dengan kolom ticker.

Anda menggunakan kode aplikasi dengan fungsi TOP_K_VALUES_TUMBLING untuk membuat jendela agregasi data. Anda selanjutnya memasukkan data yang dihasilkan ke aliran dalam aplikasi lainnya, seperti yang ditunjukkan dalam tangkapan layar bawah ini:


                Tangkapan layar konsol yang menampilkan data yang dihasilkan di aliran dalam aplikasi.

Dalam prosedur berikut, Anda membuat aplikasi Kinesis Data Analytics yang mengambil nilai yang paling sering terjadi di aliran input.

Langkah 1: Buat Kinesis Data Stream

Buat Amazon Kinesis data stream dan isi catatan sebagai berikut:

  1. Masuk ke AWS Management Console dan buka konsol Kinesis di https://console.aws.amazon.com/kinesis.

  2. Pilih Data Streams (Aliran Data) di panel navigasi.

  3. Pilih Create Kinesis stream (Buat Aliran Kinesis), lalu buat aliran dengan satu serpihan. Untuk informasi selengkapnya, lihat Buat Aliran di Panduan Developer Amazon Kinesis Data Streams.

  4. Untuk menulis catatan ke Kinesis data stream di lingkungan produksi, sebaiknya gunakan Kinesis Client Library atau API Kinesis Data Streams. Untuk kemudahan, contoh ini menggunakan skrip Python berikut untuk menghasilkan catatan. Jalankan kode untuk mengisi catatan ticker sampel. Kode sederhana ini terus menulis catatan ticker acak ke aliran. Biarkan skrip tetap berjalan agar Anda dapat menghasilkan skema aplikasi di langkah berikutnya.

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "EVENT_TIME": datetime.datetime.now().isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), "PRICE": round(random.random() * 100, 2), } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))

Langkah 2: Buat Aplikasi Kinesis Data Analytics

Buat aplikasi Amazon Kinesis Data Analytics seperti berikut:

  1. Buka Layanan Terkelola untuk konsol Apache Flink di https://console.aws.amazon.com/kinesisanalytics.

  2. Pilih Create application (Buat aplikasi), masukkan nama aplikasi, dan pilih Create application (Buat aplikasi).

  3. Pada halaman detail aplikasi, pilih Connect data streaming (Sambungkan data streaming) untuk menyambungkan ke sumber.

  4. Di halaman Sambungkan ke sumber, lakukan hal berikut:

    1. Pilih aliran yang Anda buat di bagian sebelumnya.

    2. Pilih Discover Schema (Temukan Skema). Tunggu hingga konsol menampilkan skema yang disimpulkan dan catatan sampel yang digunakan untuk menyimpulkan skema untuk aliran dalam aplikasi yang dibuat. Skema yang disimpulkan memiliki satu kolom.

    3. Pilih Save schema and update stream samples (Simpan skema dan perbarui sampel aliran). Setelah konsol menyimpan skema, pilih Exit (Keluar).

    4. Jangan pilih Save and continue (Simpan dan lanjutkan).

  5. Di halaman detail aplikasi, pilih Go to SQL editor (Buka editor SQL). Untuk memulai aplikasi, pilih Yes, start application (Ya, mulai aplikasi) di kotak dialog yang muncul.

  6. Di editor SQL, tulis kode aplikasi, dan verifikasi hasilnya sebagai berikut:

    1. Salin kode aplikasi berikut dan tempelkan ke editor:

      CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM ( "TICKER" VARCHAR(4), "MOST_FREQUENT_VALUES" BIGINT ); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM * FROM TABLE (TOP_K_ITEMS_TUMBLING( CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"), 'TICKER', -- name of column in single quotes 5, -- number of the most frequently occurring values 60 -- tumbling window size in seconds ) );
    2. Pilih Save and run SQL (Simpan dan jalankan SQL).

      Di tab Real-time analytics (Analitik waktu nyata), Anda dapat melihat semua aliran dalam aplikasi yang dibuat aplikasi dan memverifikasi data.