Membuat aplikasi menggunakan Apache Beam - Layanan Terkelola untuk Apache Flink

Amazon Managed Service untuk Apache Flink sebelumnya dikenal sebagai Amazon Kinesis Data Analytics untuk Apache Flink.

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Membuat aplikasi menggunakan Apache Beam

Dalam latihan ini, Anda membuat Layanan Terkelola untuk aplikasi Apache Flink yang mengubah data menggunakan Apache Beam. Apache Beam adalah model pemrograman untuk memproses data streaming. Untuk informasi tentang menggunakan Apache Beam dengan Managed Service untuk Apache Flink, lihat. Menggunakan Apache Beam

catatan

Untuk menyiapkan prasyarat yang diperlukan untuk latihan ini, selesaikan latihan Tutorial: Memulai menggunakan DataStream API terlebih dulu.

Buat sumber daya yang bergantung

Sebelum Anda membuat Layanan Terkelola untuk aplikasi Apache Flink untuk latihan ini, Anda membuat sumber daya dependen berikut:

  • Dua Kinesis data streams (ExampleInputStream dan ExampleOutputStream)

  • Bucket Amazon S3 untuk menyimpan kode aplikasi (ka-app-code-<username>)

Anda dapat membuat aliran Kinesis dan bucket Amazon S3 menggunakan konsol. Untuk petunjuk membuat sumber daya ini, lihat topik berikut:

  • Membuat dan Memperbarui Aliran Data di Panduan Developer Amazon Kinesis Data Streams. Beri nama aliran data ExampleInputStream dan ExampleOutputStream Anda.

  • Bagaimana Cara Membuat Bucket S3? di Panduan Pengguna Layanan Penyimpanan Sederhana Amazon. Beri bucket Amazon S3 nama yang unik secara global dengan menambahkan nama login Anda, seperti ka-app-code-<username>.

Tulis catatan sampel ke aliran input

Di bagian ini, Anda menggunakan script Python untuk menulis string acak ke aliran untuk diproses aplikasi.

catatan

Bagian ini memerlukan AWS SDK for Python (Boto).

  1. Buat file bernama ping.py dengan konten berikut:

    import json import boto3 import random kinesis = boto3.client('kinesis') while True: data = random.choice(['ping', 'telnet', 'ftp', 'tracert', 'netstat']) print(data) kinesis.put_record( StreamName="ExampleInputStream", Data=data, PartitionKey="partitionkey")
  2. Jalankan skrip ping.py.

    $ python ping.py

    Biarkan skrip tetap berjalan saat menyelesaikan sisa tutorial.

Unduh dan periksa kode aplikasi

Kode aplikasi Java untuk contoh ini tersedia dari GitHub. Untuk mengunduh kode aplikasi, lakukan hal berikut:

  1. Instal klien Git jika Anda belum menginstalnya. Untuk informasi selengkapnya, lihat Menginstal Git.

  2. Klon repositori jarak jauh dengan perintah berikut:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
  3. Buka direktori amazon-kinesis-data-analytics-java-examples/Beam tersebut.

Kode aplikasi terletak di file BasicBeamStreamingJob.java. Perhatikan hal tentang kode aplikasi berikut:

  • Aplikasi ini menggunakan Apache Beam ParDountuk memproses catatan masuk dengan menjalankan fungsi transformasi kustom yang disebut. PingPongFn

    Kode untuk memanggil fungsi PingPongFn adalah sebagai berikut:

    .apply("Pong transform", ParDo.of(new PingPongFn())
  • Managed Service untuk aplikasi Apache Flink yang menggunakan Apache Beam memerlukan komponen-komponen berikut. Jika Anda tidak menyertakan komponen dan versi ini di pom.xml Anda, aplikasi Anda memuat versi yang salah dari dependensi lingkungan, dan karena versi tidak cocok, aplikasi Anda mengalahi crash saat runtime.

    <jackson.version>2.10.2</jackson.version> ... <dependency> <groupId>com.fasterxml.jackson.module</groupId> <artifactId>jackson-module-jaxb-annotations</artifactId> <version>2.10.2</version> </dependency>
  • Fungsi ubah PingPongFn meneruskan data input ke aliran output, kecuali data input adalah ping, yang dalam hal ini memancarkan string pong\n ke aliran output.

    Kode fungsi ubah adalah sebagai berikut:

    private static class PingPongFn extends DoFn<KinesisRecord, byte[]> { private static final Logger LOG = LoggerFactory.getLogger(PingPongFn.class); @ProcessElement public void processElement(ProcessContext c) { String content = new String(c.element().getDataAsBytes(), StandardCharsets.UTF_8); if (content.trim().equalsIgnoreCase("ping")) { LOG.info("Ponged!"); c.output("pong\n".getBytes(StandardCharsets.UTF_8)); } else { LOG.info("No action for: " + content); c.output(c.element().getDataAsBytes()); } } }

Kompilasi kode aplikasi

Untuk mengompilasi aplikasi, lakukan hal berikut:

  1. Instal Java dan Maven jika Anda belum menginstalnya. Untuk informasi selengkapnya, lihat Lengkapi prasyarat yang diperlukan di tutorial Tutorial: Memulai menggunakan DataStream API.

  2. Susun aplikasi dengan perintah berikut:

    mvn package -Dflink.version=1.15.2 -Dflink.version.minor=1.8
    catatan

    Kode sumber yang disediakan bergantung pada pustaka dari Java 11.

Mengkompilasi aplikasi membuat file JAR aplikasi (target/basic-beam-app-1.0.jar).

Unggah kode Java streaming Apache Flink

Di bagian ini, Anda mengunggah kode aplikasi ke bucket Amazon S3 yang Anda buat di bagian Buat sumber daya yang bergantung.

  1. Di konsol Amazon S3, pilih bucket ka-app-code -, dan pilih Unggah. <username>

  2. Di langkah Pilih file, pilih Add files (Tambahkan berkas). Navigasikan ke file basic-beam-app-1.0.jar yang Anda buat di langkah sebelumnya.

  3. Anda tidak perlu mengubah pengaturan apa pun untuk objek, jadi pilih Upload (Unggah).

Kode aplikasi Anda sekarang disimpan di bucket Amazon S3 yang dapat diakses aplikasi Anda.

Buat dan jalankan Managed Service untuk aplikasi Apache Flink

Ikuti langkah-langkah ini untuk membuat, mengonfigurasi, memperbarui, dan menjalankan aplikasi menggunakan konsol.

Buat Aplikasi

  1. Buka Layanan Terkelola untuk konsol Apache Flink di https://console.aws.amazon.com/flink

  2. Pada dashboard Managed Service for Apache Flink, pilih Create Analytics Application.

  3. Pada Layanan Terkelola untuk Apache Flink - Buat halaman aplikasi, berikan detail aplikasi sebagai berikut:

    • Untuk Application name (Nama aplikasi), masukkan MyApplication.

    • Untuk Runtime, pilih Apache Flink.

      catatan

      Apache Beam saat ini tidak kompatibel dengan Apache Flink versi 1.19 atau yang lebih baru.

    • Pilih Apache Flink versi 1.15 dari versi pulldown.

  4. Untuk Access permissions (Izin akses), pilih Create / update IAM role kinesis-analytics-MyApplication-us-west-2 (Buat/perbarui IAM role ).

  5. Pilih Create application (Buat aplikasi).

catatan

Saat membuat aplikasi Managed Service for Apache Flink menggunakan konsol, Anda memiliki opsi untuk membuat peran dan kebijakan IAM untuk aplikasi Anda. Aplikasi Anda menggunakan peran dan kebijakan ini untuk mengakses sumber daya dependen. Sumber daya IAM ini diberi nama menggunakan nama aplikasi dan Wilayah sebagai berikut:

  • Kebijakan: kinesis-analytics-service-MyApplication-us-west-2

  • Peran: kinesis-analytics-MyApplication-us-west-2

Edit kebijakan IAM

Edit kebijakan IAM untuk menambahkan izin mengakses Kinesis data streams.

  1. Buka konsol IAM di https://console.aws.amazon.com/iam/.

  2. Pilih Policies (Kebijakan). Pilih kebijakan kinesis-analytics-service-MyApplication-us-west-2 yang dibuat konsol untuk Anda di bagian sebelumnya.

  3. Di halaman Ringkasan, pilih Edit policy (Edit kebijakan). Pilih tab JSON.

  4. Tambahkan bagian yang disorot dari contoh kebijakan berikut ke kebijakan. Ganti ID akun sampel (012345678901) dengan ID akun Anda.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "logs:DescribeLogGroups", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*", "arn:aws:s3:::ka-app-code-<username>/basic-beam-app-1.0.jar" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": "logs:DescribeLogStreams", "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": "logs:PutLogEvents", "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

Konfigurasikan aplikasi

  1. Pada MyApplicationhalaman, pilih Konfigurasi.

  2. Di halaman Konfigurasikan aplikasi, berikan Code location (Lokasi kode):

    • Untuk Bucket Amazon S3, masukkan ka-app-code-<username>.

    • Untuk Jalur ke objek Amazon S3, masukkan basic-beam-app-1.0.jar.

  3. Di bawah Akses ke sumber daya aplikasi, untuk Access permissions (Izin akses), pilih Create / update IAM role kinesis-analytics-MyApplication-us-west-2 (Pilih/perbarui IAM role ).

  4. Masukkan yang berikut ini:

    ID Grup Kunci Nilai
    BeamApplicationProperties InputStreamName ExampleInputStream
    BeamApplicationProperties OutputStreamName ExampleOutputStream
    BeamApplicationProperties AwsRegion us-west-2
  5. Di bawah Pemantauan, pastikan Memantau tingkat metrik diatur ke Aplikasi.

  6. Untuk CloudWatch logging, pilih kotak centang Aktifkan.

  7. Pilih Perbarui.

catatan

Saat Anda memilih untuk mengaktifkan CloudWatch logging, Managed Service for Apache Flink membuat grup log dan aliran log untuk Anda. Nama-nama sumber daya ini adalah sebagai berikut:

  • Grup log: /aws/kinesis-analytics/MyApplication

  • Aliran log: kinesis-analytics-log-stream

Aliran log ini digunakan untuk memantau aplikasi. Ini bukan aliran log yang sama dengan yang digunakan aplikasi untuk mengirim hasil.

Jalankan aplikasi

Grafik pekerjaan Flink dapat dilihat dengan menjalankan aplikasi, membuka dasbor Apache Flink, dan memilih pekerjaan Flink yang diinginkan.

Anda dapat memeriksa metrik Managed Service for Apache Flink di CloudWatch konsol untuk memverifikasi bahwa aplikasi berfungsi.

Bersihkan AWS sumber daya

Bagian ini mencakup prosedur untuk membersihkan AWS sumber daya yang dibuat dalam tutorial Tumbling Window.

Hapus Layanan Terkelola Anda untuk aplikasi Apache Flink

  1. Buka Layanan Terkelola untuk konsol Apache Flink di https://console.aws.amazon.com/flink

  2. di panel Managed Service for Apache Flink, pilih. MyApplication

  3. Di halaman aplikasi, pilih Delete (Hapus), lalu konfirmasikan penghapusan.

Hapus aliran data Kinesis

  1. Buka konsol Kinesis di https://console.aws.amazon.com/kinesis.

  2. Di panel Kinesis Data Streams, pilih. ExampleInputStream

  3. Di ExampleInputStreamhalaman, pilih Hapus Stream Kinesis dan kemudian konfirmasikan penghapusan.

  4. Di halaman Kinesis streams, pilih, pilih Tindakan ExampleOutputStream, pilih Hapus, lalu konfirmasikan penghapusan.

Hapus objek dan ember Amazon S3 Anda

  1. Buka konsol Amazon S3 di https://console.aws.amazon.com/s3/.

  2. Pilih ka-app-code- ember. <username>

  3. Pilih Delete (Hapus), lalu masukkan nama bucket untuk mengonfirmasi penghapusan.

Hapus sumber daya IAM Anda

  1. Buka konsol IAM di https://console.aws.amazon.com/iam/.

  2. Di bilah navigasi, pilih Policies (Kebijakan).

  3. Di kontrol filter, masukkan kinesis.

  4. Pilih kebijakan kinesis-analytics-service- MyApplication -us-west-2.

  5. Pilih Policy Actions (Tindakan Kebijakan), lalu pilih Delete (Hapus).

  6. Di bilah navigasi, pilih Roles (Peran).

  7. Pilih peran kinesis-analytics- -us-west-2. MyApplication

  8. Pilih Delete role (Hapus peran), lalu konfirmasi penghapusan.

Hapus CloudWatch sumber daya Anda

  1. Buka CloudWatch konsol di https://console.aws.amazon.com/cloudwatch/.

  2. Di bilah navigasi, pilih Logs.

  3. Pilih grup log MyApplication/aws/kinesis-analytics/.

  4. Pilih Delete Log Group (Hapus Grup Log), lalu konfirmasi penghapusan.

Langkah selanjutnya

Sekarang setelah Anda membuat dan menjalankan Managed Service dasar untuk aplikasi Apache Flink yang mengubah data menggunakan Apache Beam, lihat aplikasi berikut untuk contoh Managed Service yang lebih canggih untuk solusi Apache Flink.