Python アプリケーション用の Apache Flink の作成と実行 - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink (Amazon MSF) は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。

Python アプリケーション用の Apache Flink の作成と実行

このセクションでは、ソースおよびシンクとして Kinesis ストリームを使用して、Python アプリケーション用の Managed Service for Apache Flink を作成します。

依存リソースを作成する

このエクササイズで Apache Flink 用 Managed Service を作成する前に、以下の依存リソースを作成します。

  • 入力用と出力用の 2 つの Kinesis ストリーム。

  • アプリケーションのコードを保存する Amazon S3 バケット。

注記

このチュートリアルでは、アプリケーションを us-east-1 リージョンにデプロイすることが前提とされます。別のリージョンを使用する場合、必要に応じてすべてのステップを調整する必要があります。

2 つの Kinesis ストリームを作成する

この演習用に Managed Service for Apache Flink アプリケーションを作成する前に、アプリケーション (この例では us-east-1) のデプロイに使用するのと同じリージョンで 2 つの Kinesis Data Streams (ExampleInputStream および ExampleOutputStream) を作成します。アプリケーションでは、これらのストリームを使用してアプリケーションの送信元と送信先のストリームを選択します。

これらのストリームは Amazon Kinesis コンソールまたは次の AWS CLI コマンドを使用して作成できます。コンソールの操作方法については、「Amazon Kinesis Data Streams デベロッパーガイド」の「Creating and Updating Data Streams」を参照してください。

データストリームを作成するには (AWS CLI)
  1. 次の Amazon Kinesis create-stream AWS CLI コマンドを使用して、1 つ目のストリーム (ExampleInputStream) を作成します。

    $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1
  2. アプリケーションが出力の書き込みに使用する 2 つめのストリームを作成するには、ストリーム名を ExampleOutputStream に変更して同じコマンドを実行します。

    $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-east-1

Amazon S3 バケットを作成する

Amazon S3 バケットは、コンソールを使用して作成できます。このリソースの作成手順については、次のトピックを参照してください。

  • Amazon Simple Storage Service ユーザーガイド」の「How Do I Create an S3 Bucket?」。Amazon S3 バケットにグローバルに一意の名前を付けます (例えば、ログイン名を追加して)。

    注記

    このチュートリアルで使用するリージョン (us-east-1) で必ず S3 バケットを作成してください。

その他のリソース

アプリケーションを作成すると、Apache Flink 用 Managed Service によって次の Amazon CloudWatch リソースが作成されます(これらのリソースがまだ存在しない場合)。

  • /AWS/KinesisAnalytics-java/<my-application>という名前のロググループ。

  • kinesis-analytics-log-stream というログストリーム。

ローカルの開発環境のセットアップ

開発およびデバッグの場合、マシンで Python Flink アプリケーションを実行できます。python main.py または任意の Python IDE を使用して、コマンドラインからアプリケーションを起動できます。

注記

開発マシンには Python 3.10 または 3.11 の他に、Java 11、Apache Maven、Git がインストールされている必要があります。PyCharmVisual Studio Code などの IDE を使用することをお勧めします。すべての前提条件を満たしていることを確認するには、先に進む前に「演習を完了するための前提条件を満たす」を参照してください。

アプリケーションを開発してローカルで実行するには、Flink Python ライブラリをインストールする必要があります。

  1. VirtualEnv、Conda、同様の Python ツールを使用して、スタンドアロンの Python 環境を作成します。

  2. PyFlink ライブラリをその環境にインストールします。Amazon Managed Service for Apache Flink で使用する Apache Flink ランタイムバージョンと同じものを使用します。現在、推奨されるランタイムは 1.19.1 です。

    $ pip install apache-flink==1.19.1
  3. アプリケーションを実行するときの環境がアクティブであることを確認してください。IDE でアプリケーションを実行する場合、IDE がランタイムとして環境を使用していることを確認してください。プロセスは使用している IDE によって異なります。

    注記

    必要な準備は PyFlink ライブラリをインストールするだけです。マシンに Apache Flink クラスターをインストールする必要はありません

AWS セッションを認証する

アプリケーションは Kinesis Data Streams を使用してデータを発行します。ローカルで実行する場合、Kinesis データストリームに書き込むため、アクセス許可を持つ有効な AWS 認証済みセッションが必要です。次のステップに従って、セッションを認証します。

  1. AWS CLI および有効な認証情報が設定された名前付きプロファイルがない場合、「AWS Command Line Interface (AWS CLI) をセットアップする」を参照してください。

  2. 次のテストレコードを発行することで、AWS CLI が正しく設定されて、ユーザーが Kinesis データストリームに書き込むアクセス許可を持っていることを確認してください。

    $ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
  3. IDE に AWS と統合するプラグインがある場合、それを使用して IDE で実行されているアプリケーションに認証情報を渡すことができます。詳細については、「AWS Toolkit for PyCharm」、「「AWS Toolkit for Visual Studio Code」、「AWS Toolkit for IntelliJ IDEA」を参照してください。

Apache Flink ストリーミング Python コードをダウンロードして検証する

この例の Python アプリケーションコードは GitHub から入手できます。アプリケーションコードをダウンロードするには、次の操作を行います。

  1. 次のコマンドを使用してリモートリポジトリのクローンを作成します。

    git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
  2. ./python/GettingStarted ディレクトリに移動します。

アプリケーションコンポーネントを確認する

アプリケーションコードは main.py にあります。Python に埋め込まれた SQL を使用して、アプリケーションのフローを定義します。

注記

最適化された開発者エクスペリエンスをお使いのマシンで開発するため、アプリケーションは Amazon Managed Service for Apache Flink とローカルの両方でコードを変更せずに実行されるように設計されています。アプリケーションは環境変数 IS_LOCAL = true を使用して、ローカルで実行されているタイミングを検出します。環境変数 IS_LOCAL = true をシェルまたは IDE の実行設定で設定する必要があります。

  • アプリケーションは実行環境を設定して、ランタイム設定を読み取ります。Amazon Managed Service for Apache Flink およびローカルの両方で動作させるため、アプリケーションは IS_LOCAL 変数を確認します。

    • 次の内容は、アプリケーションを Amazon Managed Service for Apache Flink で実行するときのデフォルト動作です。

      1. アプリケーションでパッケージ化された依存関係を読み込む 詳細については、「(リンク)」 を参照してください。

      2. Amazon Managed Service for Apache Flink アプリケーションで定義したランタイムプロパティから設定を読み込みます。詳細については、「(リンク)」 を参照してください。

    • ローカルでアプリケーションを実行するときにアプリケーションが IS_LOCAL = true を検出した場合

      1. プロジェクトから外部依存関係を読み込みます。

      2. プロジェクトに含まれる application_properties.json ファイルから設定を読み込みます。

        ... APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json" ... is_local = ( True if os.environ.get("IS_LOCAL") else False ) ... if is_local: APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json" CURRENT_DIR = os.path.dirname(os.path.realpath(__file__)) table_env.get_config().get_configuration().set_string( "pipeline.jars", "file:///" + CURRENT_DIR + "/target/pyflink-dependencies.jar", )
  • アプリケーションは Kinesis コネクタを使用して、CREATE TABLE ステートメントでソーステーブルが定義されます。このテーブルは、入力 Kinesis ストリームからデータを読み取ります。アプリケーションにより、ランタイム設定からストリームの名前、リージョン、初期位置が取得されます。

    table_env.execute_sql(f""" CREATE TABLE prices ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{input_stream_name}', 'aws.region' = '{input_stream_region}', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' ) """)
  • この例では、アプリケーションで Kinesis コネクタ を使用してシンクテーブルも定義されます。このテーブルは出力 Kinesis ストリームにデータを送信します。

    table_env.execute_sql(f""" CREATE TABLE output ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3) ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{output_stream_name}', 'aws.region' = '{output_stream_region}', 'sink.partitioner-field-delimiter' = ';', 'sink.batch.max-size' = '100', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' )""")
  • 最後に、アプリケーションにより、ソーステーブルからシンクテーブルを INSERT INTO... する SQL が実行されます。より複雑なアプリケーションでは、シンクに書き込む前にデータを変換する追加のステップが含まれている可能性があります。

    table_result = table_env.execute_sql("""INSERT INTO output SELECT ticker, price, event_time FROM prices""")
  • アプリケーションをローカルで実行するには、main() 関数の最後に別のステップを追加する必要があります。

    if is_local: table_result.wait()

    このステートメントがないと、ローカルで実行したときにアプリケーションは直ちに終了します。Amazon Managed Service for Apache Flink でアプリケーションを実行するとき、このステートメントを実行することはできません。

JAR 依存関係を管理する

通常、PyFlink アプリケーションには 1 つ以上のコネクタが必要です。このチュートリアルのアプリケーションは Kinesis コネクタ を使用します。Apache Flink は Java JVM で実行されるため、Python でアプリケーションを実装するかどうかを問わず、コネクタは JAR ファイルとして配布されます。Amazon Managed Service for Apache Flink にデプロイするとき、これらの依存関係をアプリケーションと一緒にパッケージ化する必要があります。

この例では、Apache Maven を使用して依存関係を取得し、アプリケーションをパッケージ化して Managed Service for Apache Flink で実行する方法について示されます。

注記

依存関係を取得してパッケージ化する別の方法があります。この例では、1 つ以上のコネクタで正しく動作するメソッドが示されます。コードを変更せず、アプリケーションをローカルで開発用に実行することも、Managed Service for Apache Flink で実行することもできます。

pom.xml ファイルを使用する

Apache Maven は pom.xml ファイルを使用して、依存関係およびアプリケーションのパッケージ化を制御します。

JAR 依存関係は、<dependencies>...</dependencies> ブロックの pom.xml ファイルで指定されます。

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> ... <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kinesis</artifactId> <version>4.3.0-1.19</version> </dependency> </dependencies> ...

使用するコネクタの正しいアーティファクトおよびバージョンを確認するには、「Managed Service for Apache Flink で Apache Flink コネクタを使用する」を参照してください。使用している Apache Flink のバージョンを必ず参照してください。この例では、Kinesis コネクタを使用します。Apache Flink 1.19 の場合、コネクタのバージョンは 4.3.0-1.19 です。

注記

Apache Flink 1.19 を使用している場合、特にこのバージョン用にリリースされたコネクタのバージョンはありません。1.18 用にリリースされたコネクタを使用します。

ダウンロードおよびパッケージ化の依存関係

Maven を使用して pom.xml ファイルで定義されている依存関係をダウンロードし、Python Flink アプリケーション用にパッケージ化します。

  1. python/GettingStarted という Python 入門プロジェクトを含むディレクトリに移動します。

  2. 次のコマンドを実行してください。

$ mvn package

Maven は、./target/pyflink-dependencies.jar という名前の新しいファイルを作成します。マシンでローカルに開発するとき、Python アプリケーションはこのファイルを検索します。

注記

このコマンドの実行を忘れて、アプリケーションを実行しようとした場合、[識別子「kinesis」のファクトリが見つかりませんでした] というエラーが表示され失敗します。

入力ストリームにサンプルレコードを書き込む

このセクションでは、アプリケーションが処理するサンプルレコードをストリームに送信します。サンプルデータを生成するオプションは 2 つあり、Python スクリプトまたは Kinesis Data Generator のいずれかを使用します。

Python スクリプトを使用してサンプルデータを生成する

Python スクリプトを使用して、サンプルレコードをストリームに送信できます。

注記

この Python スクリプトを実行するには、Python 3.x を使用して AWS SDK for Python (Boto) ライブラリがインストールされている必要があります。

Kinesis 入力ストリームにテストデータの送信を開始する方法

  1. Data Generator GitHub リポジトリから Data Generator stock.py Python スクリプトをダウンロードします。

  2. stock.py スクリプトを実行します。

    $ python stock.py

チュートリアルの残りの部分を実践する間、スクリプトを実行し続けてください。Apache Flink アプリケーションを実行できるようになりました。

Kinesis Data Generator を使用してサンプルデータを生成する

Python スクリプトを使用する代替手段として、ホストバージョンでも利用可能な Kinesis Data Generator を使用し、ランダムなサンプルデータをストリームに送信できます。Kinesis Data Generator はブラウザで実行されるため、マシンに何もインストールする必要はありません。

Kinesis Data Generator を設定して実行する方法

  1. Kinesis Data Generator ドキュメント」の指示に従って、ツールへのアクセスを設定します。ユーザーおよびパスワードを設定する CloudFormation テンプレートを実行します。

  2. CloudFormation テンプレートによって生成された URL を介して Kinesis Data Generator にアクセスします。CloudFormation テンプレートが完了したら、[出力] タブに URL が表示されます。

  3. Data Generator を設定します。

    • リージョン: このチュートリアルで使用しているリージョン (us-east-1) を選択します。

    • ストリーム/配信ストリーム: アプリケーションが使用する入力ストリーム (ExampleInputStream) を選択します。

    • 1 秒あたりのレコード数: 100

    • レコードテンプレート: 次のテンプレートをコピーして貼り付けます。

      { "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}}, "ticker" : "{{random.arrayElement( ["AAPL", "AMZN", "MSFT", "INTC", "TBV"] )}}", "price" : {{random.number(100)}} }
  4. テンプレートをテストする: [テンプレートのテスト] を選択し、生成されたレコードが次の内容と同じであることを確認してください。

    { "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
  5. Data Generator を起動する: [データ送信の選択] を選択します。

現在、Kinesis Data Generator は ExampleInputStream にデータを送信しています。

アプリケーションをローカルで実行する

python main.py を使用してコマンドラインから実行するか IDE から実行して、アプリケーションをローカルでテストできます。

アプリケーションをローカルで実行するには、前のセクションで記述されているとおり、PyFlink ライブラリの正しいバージョンがインストールされている必要があります。詳細については、「(リンク)」 を参照してください。

注記

続行する前に、入力ストリームと出力ストリームが利用できることを確認してください。「2 つの Amazon Kinesis Data Streams を作成する」を参照してください。また、両方のストリームから読み書きするアクセス許可があることを確認してください。「AWS セッションを認証する」を参照してください。

Python プロジェクトを IDE にインポートする

IDE でアプリケーションの使用を開始するには、Python プロジェクトとしてインポートする必要があります。

クローンしたリポジトリには、複数の例が含まれています。各例は個別のプロジェクトです。このチュートリアルでは、./python/GettingStarted サブディレクトリのコンテンツを IDE にインポートします。

コードを既存の Python プロジェクトとしてインポートします。

注記

新しい Python プロジェクトをインポートする正確なプロセスは、使用している IDE によって異なります。

ローカルアプリケーション設定を確認する

ローカルで実行すると、アプリケーションでは ./src/main/resources のプロジェクトのリソースフォルダにある application_properties.json ファイルの設定が使用されます。このファイルを編集して、異なる Kinesis ストリーム名またはリージョンを使用できます。

[ { "PropertyGroupId": "InputStream0", "PropertyMap": { "stream.name": "ExampleInputStream", "flink.stream.initpos": "LATEST", "aws.region": "us-east-1" } }, { "PropertyGroupId": "OutputStream0", "PropertyMap": { "stream.name": "ExampleOutputStream", "aws.region": "us-east-1" } } ]

Python アプリケーションをローカルで実行する

アプリケーションはローカルで実行できますが、コマンドラインから通常の Python スクリプトとして実行するか、IDE から実行できます。

コマンドラインからアプリケーションを実行する方法
  1. Python Flink ライブラリをインストールした Conda や VirtualEnv などのスタンドアロン Python 環境が、現在アクティブであることを確認してください。

  2. mvn package を少なくとも 1 回実行したことを確認してください。

  3. IS_LOCAL = true 環境変数を設定します:

    $ export IS_LOCAL=true
  4. アプリケーションを通常の Python スクリプトとして実行します。

    $python main.py
IDE 内からアプリケーションを実行する方法
  1. 次の設定で main.py スクリプトを実行するように IDE を設定します。

    1. PyFlink ライブラリをインストールした Conda や VirtualEnv などのスタンドアロン Python 環境を使用します。

    2. AWS 認証情報を使用して、入出力の Kinesis Data Streams にアクセスします。

    3. IS_LOCAL = true を設定します。

  2. 実行設定を設定する正確なプロセスは、IDE によって異なります。

  3. IDE を設定したら、アプリケーションの実行中に Python スクリプトを実行し、IDE が提供するツールを使用します。

アプリケーションログをローカルで確認する

ローカルで実行するとき、アプリケーションはコンソールでログを表示しません。ただし、アプリケーションの起動時に数行が出力されて表示されます。PyFlink は、Python Flink ライブラリがインストールされているディレクトリ内のファイルにログを書き込みます。アプリケーションにより、起動時にログの場所が出力されます。次のコマンドを実行してログを取得することができます。

$ python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/log')"
  1. ファイルをログ記録ディレクトリに一覧表示します。通常、1 つの .log ファイルがあります。

  2. アプリケーションの実行中にファイルを監視します (tail -f <log-path>/<log-file>.log)。

Kinesis ストリームで入出力データを観察する

Amazon Kinesis コンソールで [データビューワー] を使用することで、(生成サンプル Python) または Amazon Kinesis Data Generator (リンク) によって入力ストリームに送信されるレコードを観察できます。

レコードを観察する方法

ローカルで実行されているアプリケーションを停止する

IDE で実行されているアプリケーションを停止します。  通常、IDE には「停止」オプションがあります。正確な場所および方法は IDE によって異なります。

アプリケーションコードをパッケージ化する

このセクションでは、Apache Maven を使用して、アプリケーションコードおよび必要な依存関係をすべて .zip ファイルにパッケージ化します。

Maven パッケージコマンドを再度実行します。

$ mvn package

このコマンドは target/managed-flink-pyflink-getting-started-1.0.0.zip ファイルを生成します。

Amazon S3 バケットにデモアプリケーションをアップロードする

このセクションでは、前のセクションで作成した .zip ファイルを、このチュートリアルの冒頭で作成した Amazon Simple Storage Service (Amazon S3) バケットにアップロードします。このステップを完了していない場合、「(リンク)」を参照してください。

アプリケーションコードの JAR ファイルをアップロードする方法
  1. Amazon S3 コンソール (https://console.aws.amazon.com/s3/) を開きます。

  2. アプリケーションコード用に以前作成したバケットを選択します。

  3. アップロードを選択します。

  4. ファイルの追加を選択します。

  5. 前のステップで生成された .zip ファイルに移動します (target/managed-flink-pyflink-getting-started-1.0.0.zip)。

  6. 他の設定を変更せずに [アップロード] を選択します。

Managed Service for Apache Flink アプリケーションを作成して設定する

コンソールまたは AWS CLI のいずれかを使用して Managed Service for Apache Flink アプリケーションを作成および設定することができます。このチュートリアルでは、コンソールを使用します。

アプリケーションの作成

  1. AWS マネジメントコンソールにサインインして、Amazon MSF コンソール (https://console.aws.amazon.com/flink) を開きます。

  2. 正しいリージョンが選択されていることを確認してください: 米国東部 (バージニア北部)us-east-1。

  3. 右側のメニューを開いて [Apache Flink アプリケーション] を選択したら、[ストリーミングアプリケーションの作成] を選択します。または、最初のページの[開始方法] セクションの [ストリーミングアプリケーションの作成] を選択します。

  4. [ストリーミングアプリケーションの作成] ページで、次の操作を行います。

    • [ストリーム処理アプリケーションの設定方法の選択] には、[最初から作成] を選択します。

    • [Apache Flink の設定、Application Flink バージョン] には、[Apache Flink 1.19] を選択します。

    • [アプリケーション設定] の場合

      • [アプリケーション名] には MyApplication と入力します。

      • [Description (説明)] に My Python test app と入力します。

      • [アプリケーションリソースへのアクセス] で、[必要なポリシーを使用して IAM ロール kinesis-analytics-MyApplication-us-east-1 を作成/更新] を選択します。

    • [アプリケーション設定のテンプレート] の場合

      • [テンプレート] には、[開発] を選択します。

    • [ストリーミングアプリケーションの作成] を選択します。

注記

コンソールを使用して Apache Flink アプリケーション用 Managed Service を作成する場合は、IAM ロールとポリシーをアプリケーションが自動的に作成するオプションを選択できます。アプリケーションではこのロールとポリシーを使用して、依存リソースにアクセスします。これらの IAM リソースは、次のようにアプリケーション名とリージョンを使用して命名されます。

  • ポリシー: kinesis-analytics-service-MyApplication-us-west-2

  • ロール: kinesisanalytics-MyApplication-us-west-2

Amazon Managed Service for Apache Flink は、以前は Kinesis Data Analytics と呼ばれていました。自動的に生成されるリソースの名前には、下位互換性のために「kinesis-analytics」のプレフィックスが付きます。

IAM ポリシーを編集する

Amazon S3 バケットにアクセスする許可を追加するように IAM ポリシーを編集します。

IAM ポリシーを編集して S3 バケット権限を追加するには
  1. IAM コンソール (https://console.aws.amazon.com/iam/) を開きます。

  2. [ポリシー] を選択します。前のセクションでコンソールによって作成された kinesis-analytics-service-MyApplication-us-east-1 ポリシーを選択します。

  3. [編集] を選択して、[JSON] タブを選択します。

  4. 次のポリシー例で強調表示されているセクションをポリシーに追加します。サンプルのアカウント ID (012345678901) を自分のアカウント ID に置き換えます。

    JSON
    { "Version":"2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream" } ] }
  5. [次へ]、[変更を保存] の順に選択します。

アプリケーションを設定する

アプリケーション設定を編集して、アプリケーションコードアーティファクトを設定します。

アプリケーションを構成するには
  1. [MyApplication] ページで、[Congirue] を選択します。

  2. [アプリケーションコードの場所] セクションで、次の操作を行います。

    • [Amazon S3 バケット] には、アプリケーションコード用に以前作成したバケットを選択します。[参照] を選択して正しいバケットを選択したら、[選択] を選択します。バケット名を選択しないでください。

    • [Amazon S3 オブジェクトへのパス] で、managed-flink-pyflink-getting-started-1.0.0.zipと入力します。

  3. [アクセス許可] には、[必要なポリシーで IAM ロール kinesis-analytics-MyApplication-us-east-1 を作成/更新] を選択します。

  4. [ランタイムプロパティ] に移動し、他のすべての設定はデフォルト値のままにします。

  5. [新しい項目の追加] を選択して、次のパラメータをすべて追加します。

    グループ ID キー
    InputStream0 stream.name ExampleInputStream
    InputStream0 flink.stream.initpos LATEST
    InputStream0 aws.region us-east-1
    OutputStream0 stream.name ExampleOutputStream
    OutputStream0 aws.region us-east-1
    kinesis.analytics.flink.run.options python main.py
    kinesis.analytics.flink.run.options jarfile lib/pyflink-dependencies.jar
  6. 他のセクションは変更せず、[変更を保存] を選択します。

注記

Amazon CloudWatch ログ記録を有効にすることを選択すると、ロググループとログストリームが Kinesis Data Analytics によって作成されます。これらのリソースの名前は次のとおりです。

  • ロググループ: /aws/kinesis-analytics/MyApplication

  • ログストリーム: kinesis-analytics-log-stream

アプリケーションを実行する

これでアプリケーションが設定され、実行する準備が整いました。

アプリケーションを実行するには
  1. Amazon Managed Service for Apache Flink のコンソールで、[自分のアプリケーション] を選択して [実行] を選択します。

  2. 次のページのアプリケーション復元設定ページで、[最新のスナップショットで実行] を選択したら、[実行] を選択します。

    [アプリケーション詳細][ステータス] は「Ready」から「Starting」に移行し、アプリケーションが起動されると「Running」に移行します。

アプリケーションが「Running」ステータスのとき、Flink ダッシュボードを開けるようになります。

ダッシュボードを開くには
  1. [Open Apache Flink ダッシュボード] を選択します。ダッシュボードは新しいページで開かれます。

  2. [実行中のジョブ] リストで、表示されている 1 つのジョブを選択します。

    注記

    ランタイムプロパティを設定したり、IAM ポリシーを誤って編集したりすると、アプリケーションのステータスが「Running」になることがありますが、Flink ダッシュボードではジョブが継続的に再起動されていることが表示されます。これは、アプリケーションの設定が間違っているか、外部リソースへのアクセス許可がない場合の一般的な障害シナリオです。

    これが発生した場合、Flink ダッシュボードの [例外] タブを見て、問題の原因を確認してください。

実行中のアプリケーションのメトリクスを観察する

[MyApplication] ページの [Amazon CloudWatch メトリクス] セクションで、実行中のアプリケーションの基本的なメトリクスの一部を確認できます。

メトリクスを表示する方法
  1. [更新] ボタンの横にあるドロップダウンリストから [10 秒] を選択します。

  2. アプリケーションが実行中で正常なとき、[アップタイム] メトリクスが継続的に増加していることを確認できます。

  3. [完全再起動] メトリクスはゼロである必要があります。増加している場合、設定に問題がある可能性があります。問題を調査するには、Flink ダッシュボードの [例外] タブを確認してください。

  4. 正常なアプリケーションでは、[失敗したチェックポイント数] メトリクスは 0 です。

    注記

    このダッシュボードには、5 分の粒度で一定の一連のメトリクスが表示されます。CloudWatch ダッシュボードで任意のメトリクスを使用してカスタムアプリケーションのダッシュボードを作成できます。

Kinesis ストリームの出力データを観察する

Python スクリプトまたは Kinesis Data Generator のいずれかを使用して、入力にデータを引き続き発行していることを確認してください。

https://console.aws.amazon.com/kinesis/」のデータビューワーを使用 (以前に行った内容と同様に) することで、Managed Service for Apache Flink で実行されているアプリケーションの出力を観察できるようになりました。

出力の表示方法
  1. Kinesis コンソール (https://console.aws.amazon.com/kinesis) を開きます。

  2. リージョンが、このチュートリアルの実行に使用しているリージョンと同じであることを確認してください。デフォルトでは、us-east-1 米国東部 (バージニア北部) です。必要に応じてリージョンを変更します。

  3. [データストリーム] を選択します。

  4. 観察するストリームを選択します。このチュートリアルでは、ExampleOutputStream を使用します。

  5. [データビューワー] タブを選択します。

  6. 任意の [シャード] を選択し、[最新][開始位置] のままにして [レコードの取得] を選択します。「このリクエストのレコードが見つかりません」というエラーが表示される場合があります。この場合、[レコードの取得を再試行] を選択します。ストリームディスプレイに発行された最新レコード。

  7. データ列の値を選択して、レコードの内容を JSON 形式で確認します。

アプリケーションを停止する

アプリケーションを停止するには、MyApplication という名前の Managed Service for Apache Flink アプリケーションのコンソールページに移動します。

アプリケーションを停止するには
  1. [アクション] ドロップダウンリストで、[停止] を選択します。

  2. [アプリケーション詳細][ステータス] は「Running」から「Stopping」に移行し、アプリケーションが完全に停止すると「Ready」に移行します。

    注記

    Python スクリプトまたは Kinesis Data Generator から入力ストリームへのデータ送信も必ず停止してください。

次のステップ

AWS リソースをクリーンアップする