Memulai (Scala) - Layanan Terkelola untuk Apache Flink

Amazon Managed Service untuk Apache Flink sebelumnya dikenal sebagai Amazon Kinesis Data Analytics untuk Apache Flink.

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Memulai (Scala)

catatan

Mulai dari versi 1.15, Flink gratis Scala. Aplikasi sekarang dapat menggunakan Java API dari versi Scala apa pun. Flink masih menggunakan Scala di beberapa komponen kunci secara internal, tetapi tidak mengekspos Scala ke dalam classloader kode pengguna. Karena itu, Anda harus menambahkan dependensi Scala ke arsip JAR Anda.

Untuk informasi selengkapnya tentang perubahan Scala di Flink 1.15, lihat Scala Free in One Fifteen.

Dalam latihan ini, Anda membuat Layanan Terkelola untuk aplikasi Apache Flink untuk Scala dengan aliran Kinesis sebagai sumber dan wastafel.

Buat sumber daya yang bergantung

Sebelum Anda membuat Layanan Terkelola untuk aplikasi Apache Flink untuk latihan ini, Anda membuat sumber daya dependen berikut:

  • Dua aliran Kinesis untuk input dan output.

  • Bucket Amazon S3 untuk menyimpan kode aplikasi (ka-app-code-<username>)

Anda dapat membuat aliran Kinesis dan bucket Amazon S3 menggunakan konsol. Untuk petunjuk membuat sumber daya ini, lihat topik berikut:

  • Membuat dan Memperbarui Aliran Data di Panduan Developer Amazon Kinesis Data Streams. Beri nama aliran data ExampleInputStream dan ExampleOutputStream Anda.

    Untuk membuat aliran data AWS CLI

    • Untuk membuat stream (ExampleInputStream) pertama, gunakan perintah Amazon Kinesis AWS CLI create-stream berikut.

      aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
    • Untuk membuat aliran kedua yang digunakan aplikasi untuk menulis output, jalankan perintah yang sama, yang mengubah nama aliran menjadi ExampleOutputStream.

      aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
  • Bagaimana Cara Membuat Bucket S3? di Panduan Pengguna Layanan Penyimpanan Sederhana Amazon. Beri bucket Amazon S3 nama yang unik secara global dengan menambahkan nama login Anda, seperti ka-app-code-<username>.

Sumber daya lainnya

Saat Anda membuat aplikasi, Managed Service for Apache Flink akan membuat CloudWatch resource Amazon berikut jika belum ada:

  • Sebuah grup log yang disebut /AWS/KinesisAnalytics-java/MyApplication

  • Aliran log yang disebut kinesis-analytics-log-stream

Tulis catatan sampel ke aliran input

Di bagian ini, Anda menggunakan script Python untuk menulis catatan sampel ke aliran untuk diproses aplikasi.

catatan

Bagian ini memerlukan AWS SDK for Python (Boto).

catatan

Skrip Python di bagian ini menggunakan AWS CLI. Anda harus mengonfigurasi AWS CLI untuk menggunakan kredensyal akun dan wilayah default Anda. Untuk mengkonfigurasi Anda AWS CLI, masukkan yang berikut ini:

aws configure
  1. Buat file bernama stock.py dengan konten berikut:

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'event_time': datetime.datetime.now().isoformat(), 'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'price': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
  2. Jalankan skrip stock.py.

    $ python stock.py

    Biarkan skrip tetap berjalan saat menyelesaikan sisa tutorial.

Unduh dan periksa kode aplikasi

Kode aplikasi Python untuk contoh ini tersedia dari. GitHub Untuk mengunduh kode aplikasi, lakukan hal berikut:

  1. Instal klien Git jika Anda belum menginstalnya. Untuk informasi selengkapnya, lihat Menginstal Git.

  2. Klon repositori jarak jauh dengan perintah berikut:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
  3. Buka direktori amazon-kinesis-data-analytics-java-examples/scala/GettingStarted tersebut.

Perhatikan hal tentang kode aplikasi berikut:

  • build.sbtFile berisi informasi tentang konfigurasi dan dependensi aplikasi, termasuk Layanan Terkelola untuk pustaka Apache Flink.

  • BasicStreamingJob.scalaFile berisi metode utama yang mendefinisikan fungsionalitas aplikasi.

  • Aplikasi menggunakan sumber Kinesis untuk membaca dari aliran sumber. Cuplikan berikut ini membuat sumber Kinesis:

    private def createSource: FlinkKinesisConsumer[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val inputProperties = applicationProperties.get("ConsumerConfigProperties") new FlinkKinesisConsumer[String](inputProperties.getProperty(streamNameKey, defaultInputStreamName), new SimpleStringSchema, inputProperties) }

    Aplikasi ini juga menggunakan sink Kinesis untuk menulis ke dalam aliran hasil. Cuplikan berikut membuat sink Kinesis:

    private def createSink: KinesisStreamsSink[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val outputProperties = applicationProperties.get("ProducerConfigProperties") KinesisStreamsSink.builder[String] .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema) .setStreamName(outputProperties.getProperty(streamNameKey, defaultOutputStreamName)) .setPartitionKeyGenerator((element: String) => String.valueOf(element.hashCode)) .build }
  • Aplikasi ini membuat konektor sumber dan wastafel untuk mengakses sumber daya eksternal menggunakan StreamExecutionEnvironment objek.

  • Aplikasi ini membuat konektor sumber dan wastafel menggunakan properti aplikasi dinamis. Properti aplikasi runtime dibaca untuk mengkonfigurasi konektor. Untuk informasi selengkapnya tentang properti runtime, lihat Properti Runtime.

Kompilasi dan unggah kode aplikasi

Di bagian ini, Anda mengkompilasi dan mengunggah kode aplikasi Anda ke bucket Amazon S3 yang Anda buat di Buat sumber daya yang bergantung bagian tersebut.

Kompilasi Kode Aplikasi

Di bagian ini, Anda menggunakan alat build SBT untuk membangun kode Scala untuk aplikasi. Untuk menginstal SBT, lihat Menginstal sbt dengan pengaturan cs. Anda juga perlu menginstal Java Development Kit (JDK). Lihat Prasyarat untuk Menyelesaikan Latihan.

  1. Untuk menggunakan kode aplikasi Anda, Anda mengompilasi dan mengemasnya ke dalam file JAR. Anda dapat mengkompilasi dan mengemas kode Anda dengan SBT:

    sbt assembly
  2. Jika aplikasi berhasil mengompilasi, file berikut dibuat:

    target/scala-3.2.0/getting-started-scala-1.0.jar
Unggah Kode Scala Streaming Apache Flink

Di bagian ini, Anda membuat bucket Amazon S3 dan mengunggah kode aplikasi Anda.

  1. Buka konsol Amazon S3 di https://console.aws.amazon.com/s3/.

  2. Pilih Buat ember

  3. Masukkan ka-app-code-<username> di bidang Bucket name (Nama bucket). Tambahkan sufiks ke nama bucket, seperti nama pengguna Anda, untuk membuatnya unik secara global. Pilih Next (Selanjutnya).

  4. Di opsi Konfigurasi, pertahankan pengaturan apa adanya, dan pilih Berikutnya.

  5. Di Setel izin, pertahankan pengaturan apa adanya, dan pilih Berikutnya.

  6. Pilih Buat bucket.

  7. Pilih ka-app-code-<username> bucket, lalu pilih Unggah.

  8. Di langkah Pilih file, pilih Add files (Tambahkan berkas). Navigasikan ke file getting-started-scala-1.0.jar yang Anda buat di langkah sebelumnya.

  9. Anda tidak perlu mengubah pengaturan apa pun untuk objek, jadi pilih Upload (Unggah).

Kode aplikasi Anda sekarang disimpan di bucket Amazon S3 yang dapat diakses aplikasi Anda.