開始方法 (スカラー) - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

開始方法 (スカラー)

注記

バージョン 1.15 以降、Flink は Scala 無料です。アプリケーションが Scala の任意のバージョンから Java API を使用できるようになっています。Flink は引き続きいくつかの主要コンポーネントで Scala を内部的に使用しますが、Scala をユーザーコードクラスローダーに公開しません。そのため、Scala の依存関係を JAR アーカイブに追加する必要があります。

Flink 1.15 での Scala の変更についての詳しい情報は、Scala Free in One Fifteenを参照してください。

このエクササイズでは、ソースとシンクとしてKinesisストリームを使用して、Scala向けのApache Flinkアプリケーション 用 Managed Service を作成します。

依存リソースを作成する

この練習用の Managed Service for Apache Flink を作成する前に、以下の依存リソースを作成します。

  • 入力用と出力用の 2 つの Kinesis ストリーム。

  • アプリケーションのコードを保存するためのAmazon S3バケット (ka-app-code-<username>)

Kinesis ストリームと Amazon S3 バケットは、コンソールを使用して作成できます。これらのリソースの作成手順については、次の各トピックを参照してください。

  • 「Amazon Kinesis Data Streamsデベロッパーガイド」の「データストリームの作成および更新」 データストリームExampleInputStreamExampleOutputStreamに名前を付けます。

    データストリームを作成するには (AWS CLI)

    • 最初のストリーム (ExampleInputStream) を作成するには、次の Amazon Kinesis create-stream AWS CLI コマンドを使用します。

      aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
    • アプリケーションが出力の書き込みに使用する 2 つめのストリームを作成するには、ストリーム名を ExampleOutputStream に変更して同じコマンドを実行します。

      aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
  • Amazon Simple Storage Service ユーザーガイドの「S3 バケットを作成する方法」を参照してください。ログイン名 (ka-app-code-<username> など) を追加して、Amazon S3 バケットにグローバルに一意の名前を付けます。

その他のリソース

アプリケーションを作成すると、Managed Service for Apache Flink は、以下の Amazon CloudWatch リソースがまだ存在しない場合、それらを作成します。

  • /AWS/KinesisAnalytics-java/MyApplicationという名前のロググループ。

  • kinesis-analytics-log-stream というログストリーム

サンプルレコードを入力ストリームに書き込む

このセクションでは、Python スクリプトを使用して、アプリケーションが処理するサンプルレコードをストリームに書き込みます。

注記

このセクションでは AWS SDK for Python (Boto) が必要です。

注記

このセクションの Python スクリプトでは、 AWS CLIを使用しています。アカウント認証情報とデフォルトのリージョンを使用する AWS CLI ように を設定する必要があります。を設定するには AWS CLI、次のように入力します。

aws configure
  1. 次の内容で、stock.py という名前のファイルを作成します。

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'event_time': datetime.datetime.now().isoformat(), 'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'price': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
  2. stock.py スクリプトを実行します。

    $ python stock.py

    チュートリアルの残りの部分を完了する間、スクリプトを実行し続けてください。

アプリケーションコードをダウンロードして調べる

この例の Python アプリケーションコードは、 から入手できます GitHub。アプリケーションコードをダウンロードするには、次の操作を行います。

  1. Git クライアントをまだインストールしていない場合は、インストールします。詳細については、「Git のインストール」をご参照ください。

  2. 次のコマンドを使用してリモートリポジトリのクローンを作成します。

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
  3. amazon-kinesis-data-analytics-java-examples/scala/GettingStarted ディレクトリに移動します。

アプリケーションコードに関して、以下の点に注意してください。

  • build.sbtファイルには、Managed Service for Apache Flink ライブラリなど、アプリケーションの設定と依存関係に関する情報が含まれています。

  • この BasicStreamingJob.scala ファイルには、アプリケーションの機能を定義するメインメソッドが含まれています。

  • アプリケーションは Kinesis ソースを使用して、ソースストリームから読み取りを行います。次のスニペットでは、Kinesis ソースが作成されます。

    private def createSource: FlinkKinesisConsumer[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val inputProperties = applicationProperties.get("ConsumerConfigProperties") new FlinkKinesisConsumer[String](inputProperties.getProperty(streamNameKey, defaultInputStreamName), new SimpleStringSchema, inputProperties) }

    また、アプリケーションは Kinesis シンクを使用して結果ストリームに書き込みます。次のスニペットでは、Kinesis シンクが作成されます。

    private def createSink: KinesisStreamsSink[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val outputProperties = applicationProperties.get("ProducerConfigProperties") KinesisStreamsSink.builder[String] .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema) .setStreamName(outputProperties.getProperty(streamNameKey, defaultOutputStreamName)) .setPartitionKeyGenerator((element: String) => String.valueOf(element.hashCode)) .build }
  • アプリケーションは、 StreamExecutionEnvironment オブジェクトを使用して外部リソースにアクセスするためのソースコネクタとシンクコネクタを作成します。

  • アプリケーションは、動的アプリケーションプロパティを使用してソースコネクタとシンクコネクタを作成します。アプリケーションのプロパティを読み取ってコネクタを設定します。ランタイムプロパティの詳細については、ランタイムプロパティを参照してください。

アプリケーション・コードをコンパイルしてアップロードするには

このセクションでは、アプリケーションコードをコンパイルし、依存リソースを作成する セクションで作成したAmazon S3バケットにアップロードします。

アプリケーションコードのコンパイル

このセクションでは、「SBT」 ビルド・ツールを使用してアプリケーションの Scala コードをビルドします。SBTをインストールするには、Install sbt with cs setupを参照してください。また、Java 開発キット(JDK)をインストールする必要があります。演習を完了するための前提条件 を参照してください。

  1. アプリケーションコードを使用するには、コードをコンパイルして JAR ファイルにパッケージ化します。SBT を使用してコードをコンパイルしてパッケージ化できます。

    sbt assembly
  2. アプリケーションのコンパイルに成功すると、次のファイルが作成されます。

    target/scala-3.2.0/getting-started-scala-1.0.jar
Apache Flink Streaming Scala Code のアップロード

このセクションでは、Amazon S3 バケットを作成し、アプリケーションコードをアップロードします。

  1. https://console.aws.amazon.com/s3/でAmazon S3 コンソールを開きます。

  2. [バケットを作成] を選択します。

  3. [Bucket name (バケット名)] フィールドにka-app-code-<username>と入力します。バケット名にユーザー名などのサフィックスを追加して、グローバルに一意にします。[次へ] をクリックします。

  4. 設定オプションのステップでは、設定をそのままにし、[次へ] を選択します。

  5. アクセス許可の設定のステップでは、設定をそのままにし、[次へ] を選択します。

  6. [バケットを作成] を選択します。

  7. ka-app-code-<username>バケットを選択し、アップロード を選択します。

  8. ファイルの選択のステップで、[ファイルを追加] を選択します。前のステップで作成した getting-started-scala-1.0.jar ファイルに移動します。

  9. オブジェクトの設定を変更する必要はないので、[アップロード] を選択してください。

アプリケーションコードが Amazon S3 バケットに保存され、アプリケーションからアクセスできるようになります。