Amazon Managed Service for Apache Flink は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。
翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
開始方法 (スカラー)
注記
バージョン 1.15 以降、Flink は Scala 無料です。アプリケーションが Scala の任意のバージョンから Java API を使用できるようになっています。Flink は引き続きいくつかの主要コンポーネントで Scala を内部的に使用しますが、Scala をユーザーコードクラスローダーに公開しません。そのため、Scala の依存関係を JAR アーカイブに追加する必要があります。
Flink 1.15 での Scala の変更についての詳しい情報は、Scala Free in One Fifteen
このエクササイズでは、ソースとシンクとしてKinesisストリームを使用して、Scala向けのApache Flinkアプリケーション 用 Managed Service を作成します。
このトピックには、次のセクションが含まれています。
依存リソースを作成する
この練習用の Managed Service for Apache Flink を作成する前に、以下の依存リソースを作成します。
入力用と出力用の 2 つの Kinesis ストリーム。
アプリケーションのコードを保存するためのAmazon S3バケット (
ka-app-code-
)<username>
Kinesis ストリームと Amazon S3 バケットは、コンソールを使用して作成できます。これらのリソースの作成手順については、次の各トピックを参照してください。
「Amazon Kinesis Data Streamsデベロッパーガイド」の「データストリームの作成および更新」 データストリーム
ExampleInputStream
とExampleOutputStream
に名前を付けます。データストリームを作成するには (AWS CLI)
最初のストリーム (
ExampleInputStream
) を作成するには、次の Amazon Kinesis create-stream AWS CLI コマンドを使用します。aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
アプリケーションが出力の書き込みに使用する 2 つめのストリームを作成するには、ストリーム名を
ExampleOutputStream
に変更して同じコマンドを実行します。aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
Amazon Simple Storage Service ユーザーガイドの「S3 バケットを作成する方法」を参照してください。ログイン名 (
ka-app-code-
など) を追加して、Amazon S3 バケットにグローバルに一意の名前を付けます。<username>
その他のリソース
アプリケーションを作成すると、Managed Service for Apache Flink は、以下の Amazon CloudWatch リソースがまだ存在しない場合、それらを作成します。
/AWS/KinesisAnalytics-java/MyApplication
という名前のロググループ。kinesis-analytics-log-stream
というログストリーム
サンプルレコードを入力ストリームに書き込む
このセクションでは、Python スクリプトを使用して、アプリケーションが処理するサンプルレコードをストリームに書き込みます。
注記
このセクションでは AWS SDK for Python (Boto)
注記
このセクションの Python スクリプトでは、 AWS CLIを使用しています。アカウント認証情報とデフォルトのリージョンを使用する AWS CLI ように を設定する必要があります。を設定するには AWS CLI、次のように入力します。
aws configure
-
次の内容で、
stock.py
という名前のファイルを作成します。import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'event_time': datetime.datetime.now().isoformat(), 'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'price': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
-
stock.py
スクリプトを実行します。$ python stock.py
チュートリアルの残りの部分を完了する間、スクリプトを実行し続けてください。
アプリケーションコードをダウンロードして調べる
この例の Python アプリケーションコードは、 から入手できます GitHub。アプリケーションコードをダウンロードするには、次の操作を行います。
Git クライアントをまだインストールしていない場合は、インストールします。詳細については、「Git のインストール
」をご参照ください。 次のコマンドを使用してリモートリポジトリのクローンを作成します。
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
amazon-kinesis-data-analytics-java-examples/scala/GettingStarted
ディレクトリに移動します。
アプリケーションコードに関して、以下の点に注意してください。
build.sbt
ファイルには、Managed Service for Apache Flink ライブラリなど、アプリケーションの設定と依存関係に関する情報が含まれています。この
BasicStreamingJob.scala
ファイルには、アプリケーションの機能を定義するメインメソッドが含まれています。アプリケーションは Kinesis ソースを使用して、ソースストリームから読み取りを行います。次のスニペットでは、Kinesis ソースが作成されます。
private def createSource: FlinkKinesisConsumer[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val inputProperties = applicationProperties.get("ConsumerConfigProperties") new FlinkKinesisConsumer[String](inputProperties.getProperty(streamNameKey, defaultInputStreamName), new SimpleStringSchema, inputProperties) }
また、アプリケーションは Kinesis シンクを使用して結果ストリームに書き込みます。次のスニペットでは、Kinesis シンクが作成されます。
private def createSink: KinesisStreamsSink[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val outputProperties = applicationProperties.get("ProducerConfigProperties") KinesisStreamsSink.builder[String] .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema) .setStreamName(outputProperties.getProperty(streamNameKey, defaultOutputStreamName)) .setPartitionKeyGenerator((element: String) => String.valueOf(element.hashCode)) .build }
アプリケーションは、 StreamExecutionEnvironment オブジェクトを使用して外部リソースにアクセスするためのソースコネクタとシンクコネクタを作成します。
アプリケーションは、動的アプリケーションプロパティを使用してソースコネクタとシンクコネクタを作成します。アプリケーションのプロパティを読み取ってコネクタを設定します。ランタイムプロパティの詳細については、ランタイムプロパティを参照してください。
アプリケーション・コードをコンパイルしてアップロードするには
このセクションでは、アプリケーションコードをコンパイルし、依存リソースを作成する セクションで作成したAmazon S3バケットにアップロードします。
アプリケーションコードのコンパイル
このセクションでは、「SBT
アプリケーションコードを使用するには、コードをコンパイルして JAR ファイルにパッケージ化します。SBT を使用してコードをコンパイルしてパッケージ化できます。
sbt assembly
-
アプリケーションのコンパイルに成功すると、次のファイルが作成されます。
target/scala-3.2.0/getting-started-scala-1.0.jar
Apache Flink Streaming Scala Code のアップロード
このセクションでは、Amazon S3 バケットを作成し、アプリケーションコードをアップロードします。
https://console.aws.amazon.com/s3/
でAmazon S3 コンソールを開きます。 [バケットを作成] を選択します。
[Bucket name (バケット名)] フィールドに
ka-app-code-<username>
と入力します。バケット名にユーザー名などのサフィックスを追加して、グローバルに一意にします。[次へ] をクリックします。設定オプションのステップでは、設定をそのままにし、[次へ] を選択します。
アクセス許可の設定のステップでは、設定をそのままにし、[次へ] を選択します。
[バケットを作成] を選択します。
ka-app-code-<username>
バケットを選択し、アップロード を選択します。-
ファイルの選択のステップで、[ファイルを追加] を選択します。前のステップで作成した
getting-started-scala-1.0.jar
ファイルに移動します。 オブジェクトの設定を変更する必要はないので、[アップロード] を選択してください。
アプリケーションコードが Amazon S3 バケットに保存され、アプリケーションからアクセスできるようになります。