Apache Beam を使用してアプリケーションを作成する - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink (Amazon MSF) は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Apache Beam を使用してアプリケーションを作成する

この課題では、「Apache Beam」を使用してデータを変換する Apache Flink アプリケーション用 Managed Serviceを作成します。Apache Beam はストリーミングデータを処理するためのプログラミングモデルです。Apache Flink 用 Managed Service での Apache Beam の使用ついては、 Managed Service for Apache Flink アプリケーションで Apache Beam を使用する を参照してください。

注記

この演習に必要な前提条件を設定するには、まずチュートリアル: Managed Service for Apache Flink で DataStream API の使用を開始する演習を完了してください。

依存リソースを作成する

この練習用の Managed Service for Apache Flink を作成する前に、以下の依存リソースを作成します。

  • 2 つの Kinesis Data Streams (ExampleInputStreamExampleOutputStream)

  • アプリケーションのコードを保存するためのAmazon S3バケット (ka-app-code-<username>)

Kinesis ストリームと Amazon S3 バケットは、コンソールを使用して作成できます。これらのリソースの作成手順については、次の各トピックを参照してください。

  • 「Amazon Kinesis Data Streamsデベロッパーガイド」の「データストリームの作成および更新」 データストリームExampleInputStreamExampleOutputStreamに名前を付けます。

  • Amazon Simple Storage Service ユーザーガイドの「S3 バケットを作成する方法」を参照してください。ログイン名 (ka-app-code-<username> など) を追加して、Amazon S3 バケットにグローバルに一意の名前を付けます。

サンプルレコードを入力ストリームに書き込む

このセクションでは、Python スクリプトを使用して、アプリケーションが処理するランダムな文字列をストリームに書き込みます。

注記

このセクションでは AWS SDK for Python (Boto) が必要です。

  1. 次の内容で、ping.py という名前のファイルを作成します。

    import json import boto3 import random kinesis = boto3.client('kinesis') while True: data = random.choice(['ping', 'telnet', 'ftp', 'tracert', 'netstat']) print(data) kinesis.put_record( StreamName="ExampleInputStream", Data=data, PartitionKey="partitionkey")
  2. ping.py スクリプトを実行します。

    $ python ping.py

    チュートリアルの残りの部分を完了する間、スクリプトを実行し続けてください。

アプリケーションコードをダウンロードして調べる

この例の Java アプリケーションコードは GitHub から入手できます。アプリケーションコードをダウンロードするには、次の操作を行います。

  1. Git クライアントをまだインストールしていない場合は、インストールします。詳細については、「Git のインストール」をご参照ください。

  2. 次のコマンドを使用してリモートリポジトリのクローンを作成します。

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
  3. amazon-kinesis-data-analytics-java-examples/Beam ディレクトリに移動します。

アプリケーションコードはBasicBeamStreamingJob.javaファイルに含まれています。アプリケーションコードに関して、以下の点に注意してください。

  • アプリケーションは Apache Beam 「ParDo」を使用して、 PingPongFn というカスタム変換関数を呼び出して受信レコードを処理します。

    PingPongFn 関数を呼び出すコードは次のとおりです。

    .apply("Pong transform", ParDo.of(new PingPongFn())
  • Apache Beam を使用する Apache Flink アプリケーション用 Managed Serviceには、以下のコンポーネントが必要です。これらのコンポーネントとバージョンを pom.xml に含めないと、アプリケーションは環境の依存関係から誤ったバージョンをロードし、バージョンが一致しないため、実行時にアプリケーションがクラッシュします。

    <jackson.version>2.10.2</jackson.version> ... <dependency> <groupId>com.fasterxml.jackson.module</groupId> <artifactId>jackson-module-jaxb-annotations</artifactId> <version>2.10.2</version> </dependency>
  • PingPongFn 変換関数は、入力データが ping でない限り、入力データを出力ストリームに渡します。「ping」である場合は、文字列「pong\n」を出力ストリームに出力します。

    変換関数のコードは以下のとおりです。

    private static class PingPongFn extends DoFn<KinesisRecord, byte[]> { private static final Logger LOG = LoggerFactory.getLogger(PingPongFn.class); @ProcessElement public void processElement(ProcessContext c) { String content = new String(c.element().getDataAsBytes(), StandardCharsets.UTF_8); if (content.trim().equalsIgnoreCase("ping")) { LOG.info("Ponged!"); c.output("pong\n".getBytes(StandardCharsets.UTF_8)); } else { LOG.info("No action for: " + content); c.output(c.element().getDataAsBytes()); } } }

アプリケーションコードのコンパイル

アプリケーションをコンパイルするには、次の操作を行います。

  1. Java と Maven がまだインストールされていない場合は、インストールします。詳細については、チュートリアル: Managed Service for Apache Flink で DataStream API の使用を開始するチュートリアルの「必要な前提条件を満たす」を参照してください。

  2. 次のコマンドを使用して、アプリケーションをコンパイルします。

    mvn package -Dflink.version=1.15.2 -Dflink.version.minor=1.8
    注記

    提供されているソースコードは Java 11 のライブラリーに依存しています。

アプリケーションをコンパイルすると、アプリケーション JAR ファイル (target/basic-beam-app-1.0.jar) が作成されます。

Apache Flink Streaming Java Code のアップロードしてください

このセクションでは、依存リソースを作成する のセクションで作成した Amazon S3 バケットにアプリケーションコードをアップロードします。

  1. Amazon S3 コンソールで ka-app-code-<username> バケットを選択し、[アップロード] を選択します。

  2. ファイルの選択のステップで、[ファイルを追加] を選択します。前のステップで作成した basic-beam-app-1.0.jar ファイルに移動します。

  3. オブジェクトの設定を変更する必要はないので、[アップロード] を選択してください。

アプリケーションコードが Amazon S3 バケットに保存され、アプリケーションからアクセスできるようになります。

Managed Service for Apache Flink アプリケーションを作成して実行する

以下の手順を実行し、コンソールを使用してアプリケーションを作成、設定、更新、および実行します。

アプリケーションの作成

  1. にサインインし AWS Management Console、https://console.aws.amazon.com/flink で Amazon MSF コンソールを開きます。

  2. Managed Service for Apache Flinkのダッシュボードで、「分析アプリケーションの作成」 を選択します。

  3. Managed Service for Apache Flink-アプリケーションの作成」ページで、次のようにアプリケーションの詳細を入力します。

    • [アプリケーション名] には MyApplication と入力します。

    • [ランタイム] には、[Apache Flink] を選択します。

      注記

      Apache Beam は現在、Apache Flink バージョン 1.19 以降と互換性がありません。

    • バージョンプルダウンから Apache Flink バージョン 1.15 を選択します。

  4. [アクセス許可] には、[IAM ロールの作成 / 更新kinesis-analytics-MyApplication-us-west-2] を選択します。

  5. [Create application] を選択します。

注記

コンソールを使用して Apache Flink アプリケーション用 Managed Service を作成する場合は、IAM ロールとポリシーをアプリケーションが自動的に作成するオプションを選択できます。アプリケーションではこのロールとポリシーを使用して、依存リソースにアクセスします。これらの IAM リソースは、次のようにアプリケーション名とリージョンを使用して命名されます。

  • ポリシー: kinesis-analytics-service-MyApplication-us-west-2

  • ロール: kinesis-analytics-MyApplication-us-west-2

IAM ポリシーを編集する

IAM ポリシーを編集し、Kinesis Data Streamsにアクセスするための許可を追加します。

  1. IAM コンソール (https://console.aws.amazon.com/iam/) を開きます。

  2. [ポリシー] を選択します。前のセクションでコンソールによって作成された kinesis-analytics-service-MyApplication-us-west-2 ポリシーを選択します。

  3. [概要] ページで、[ポリシーの編集] を選択します。[JSON] タブを選択します。

  4. 次のポリシー例で強調表示されているセクションをポリシーに追加します。サンプルのアカウント ID (012345678901) を自分のアカウント ID に置き換えます。

    JSON
    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "logs:DescribeLogGroups", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*", "arn:aws:s3:::ka-app-code-<username>/basic-beam-app-1.0.jar" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": "logs:DescribeLogStreams", "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": "logs:PutLogEvents", "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

アプリケーションを設定する

  1. [MyApplication] ページで、[Congirue] を選択します。

  2. [Configure application] ページで、[Code location] を次のように指定します。

    • [Amazon S3 バケット] で、ka-app-code-<username>と入力します。

    • [Amazon S3 オブジェクトへのパス] で、basic-beam-app-1.0.jarと入力します。

  3. [Access to application resources] の [Access permissions] では、[Create / update IAM rolekinesis-analytics-MyApplication-us-west-2] を選択します。

  4. 次のように入力します。

    グループ ID キー
    BeamApplicationProperties InputStreamName ExampleInputStream
    BeamApplicationProperties OutputStreamName ExampleOutputStream
    BeamApplicationProperties AwsRegion us-west-2
  5. [Monitoring] の [Monitoring metrics level] が [Application] に設定されていることを確認します。

  6. [CloudWatch logging] では、[Enable] チェックボックスをオンにします。

  7. [更新] を選択します。

注記

CloudWatch ログ記録の有効化を選択すると、Managed Service for Apache Flink がユーザーに代わってロググループとログストリームを作成します。これらのリソースの名前は次のとおりです。

  • ロググループ: /aws/kinesis-analytics/MyApplication

  • ログストリーム: kinesis-analytics-log-stream

このログストリームはアプリケーションのモニターに使用されます。このログストリームは、アプリケーションの結果の送信に使用されたログストリームとは異なります。

アプリケーションを実行する

Flink ジョブグラフは、アプリケーションを実行し、Apache Flink ダッシュボードを開き、目的の Flink ジョブを選択すると表示できます。

CloudWatch コンソールで Managed Service for Apache Flink メトリクスをチェックして、アプリケーションが機能していることを確認できます。

AWS リソースをクリーンアップする

このセクションでは、タンブリングウィンドウチュートリアルで作成した AWS リソースをクリーンアップする手順について説明します。

Managed Service for Apache Flink アプリケーションを削除する

  1. にサインインし AWS Management Console、https://console.aws.amazon.com/flink で Amazon MSF コンソールを開きます。

  2. Apache Flink 用 Managed Serviceパネルで、MyApplication を選択します。

  3. アプリケーションのページで[削除]を選択し、削除を確認します。

Kinesis データストリームを削除する

  1. https://console.aws.amazon.com/kinesis」で Kinesis コンソールを開きます。

  2. Kinesis Data Streams パネルで、「ExampleInputStream」を選択します。

  3. ExampleInputStream」ページで、「Kinesis ストリームを削除」を選択し、削除を確定します。

  4. Kinesis ストリーム」ページで、「ExampleOutputStream」を選択し、「アクション」を選択し、「削除」を選択し、削除を確定します。

Amazon S3 オブジェクトとバケットを削除する

  1. Amazon S3 コンソール (https://console.aws.amazon.com/s3/) を開きます。

  2. ka-app-code-<username>バケットを選択します。

  3. [削除] を選択し、バケット名を入力して削除を確認します。

IAM リソースを削除する

  1. IAM コンソール (https://console.aws.amazon.com/iam/) を開きます。

  2. ナビゲーションバーで、[ポリシー] を選択します。

  3. フィルターコントロールに「kinesis」と入力します。

  4. kinesis-analytics-service-MyApplication-us-west-2」ポリシーを選択します。

  5. [ポリシーアクション]、[削除] の順に選択します。

  6. ナビゲーションバーで [ロール]を選択します。

  7. kinesis-analytics-MyApplication-us-west-2」ロールを選択します。

  8. [ロールの削除] を選択し、削除を確定します。

CloudWatch リソースを削除する

  1. CloudWatch コンソールの https://console.aws.amazon.com/cloudwatch/ を開いてください。

  2. ナビゲーションバーで [ログ] を選択します。

  3. /aws/kinesis-analytics/MyApplication」ロググループを選択してください。

  4. [ロググループの削除]を選択し、削除を確認してください。

次のステップ

Apache Beam を使用してデータを変換する基本的なApache Flink アプリケーション用 Managed Service を作成し、実行しますた。次に、より高度なApache Flink ソリューション用 Managed Service の例として次のアプリケーションをご覧ください。