Managed Service for Apache Flink での Java の例 - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink (Amazon MSF) は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。

Managed Service for Apache Flink での Java の例

次の例では、Java で記述したアプリケーションを作成する方法について説明します。

注記

ほとんどの例は、ローカル、開発マシン、選択した IDE、Amazon Managed Service for Apache Flink の両方で実行されるように設計されています。これらは、アプリケーションパラメータを渡すために使用できるメカニズムと、両方の環境でアプリケーションを変更せずに実行するために依存関係を正しく設定する方法を示しています。

この例は、レコードまたは状態オブジェクトでカスタム TypeInfo を定義して、シリアル化が効率の低い Kryo シリアル化にフォールバックしないようにする方法を示します。これは、オブジェクトに List または Map が含まれている場合などに必要です。詳細については、Apache Flink ドキュメントの「Data Types & Serialization」を参照してください。この例では、オブジェクトのシリアル化が効率の低い Kryo シリアル化にフォールバックするかどうかをテストする方法も示しています。

コード例: CustomTypeInfo

この例は、DataStream API を使用して Kinesis データストリームから読み取り、別の Kinesis データストリームに書き込むシンプルなアプリケーションを示しています。この例では、正しい依存関係でファイルをセットアップし、uber-JAR をビルドして設定パラメータを解析する方法を示しています。これにより、アプリケーションをローカル、IDE、Amazon Managed Service for Apache Flink の両方で実行できます。

コード例: GettingStarted

この例は、Table API と SQL を使用したシンプルなアプリケーションを示しています。同じ Java アプリケーションで DataStream API を Table API または SQL と統合する方法を示します。また、DataGen コネクタを使用して、外部データジェネレーターを必要とせずに、Flink アプリケーション自体の中でランダムなテストデータを生成する方法も示します。

完全な例: GettingStartedTable

この例は、DataStream API の FileSink を使用して S3 バケットに JSON ファイルを書き込む方法を示します。

コード例: S3Sink

この例は、標準コンシューマーまたは EFO のいずれかを使用して Kinesis データストリームから消費するソースを設定する方法と、Kinesis データストリームへのシンクを設定する方法を示します。

コード例: KinesisConnectors

この例は、Amazon Data Firehose (旧称 Kinesis Data Firehose) にデータを送信する方法を示します。

コード例: KinesisFirehoseSink

この例は、Prometheus シンクコネクタを使用して時系列データを Prometheus に書き込む方法を示します。

コード例: PrometheusSink

この例は、DataStream API の 4 タイプのウィンドウイング集約を示しています。

  1. 処理時間に基づくスライディングウィンドウ

  2. イベント時間に基づくスライディングウィンドウ

  3. 処理時間に基づくタンブリングウィンドウ

  4. イベント時間に基づくタンブリングウィンドウ

コード例: Windowing

この例は、Flink アプリケーションにカスタムメトリクスを追加して CloudWatch メトリクスに送信する方法を示します。

コード例: CustomMetrics

この例は、Kafka 設定プロバイダーを使用して、Kafka コネクタの mTLS 認証用の証明書を持つカスタムキーストアとトラストストアを設定する方法を示します。この方法では、Amazon S3 から必要なカスタム証明書をロードし、アプリケーションの起動時に AWS Secrets Manager からシークレットをロードできます。

コード例: Kafka-mTLS-Keystore-ConfigProviders

この例は、Kafka 設定プロバイダーを使用して AWS Secrets Manager から認証情報を取得し、Amazon S3 からトラストストアをダウンロードして、Kafka コネクタで SASL/SCRAM 認証を設定する方法を示します。この方法では、Amazon S3 から必要なカスタム証明書をロードし、アプリケーションの起動時に AWS Secrets Manager からシークレットをロードできます。

コード例: Kafka-SASL_SSL-ConfigProviders

この例は、Table API/SQL で Kafka 設定プロバイダーを使用して、Kafka コネクタの mTLS 認証用の証明書を持つカスタムキーストアとトラストストアを設定する方法を示します。この方法では、Amazon S3 から必要なカスタム証明書をロードし、アプリケーションの起動時に AWS Secrets Manager からシークレットをロードできます。

コード例: Kafka-mTLS-Keystore-Sql-ConfigProviders

この例は、Apache Flink のサイド出力を活用して、指定された属性でストリームを分割する方法を示します。このパターンは、ストリーミングアプリケーションでデッドレターキュー (DLQ) の概念を実装しようとする場合に特に役立ちます。

コード例: SideOutputs

この例は、Apache Flink 非同期 I/O を使用して外部エンドポイントをノンブロッキング方式で呼び出し、復元可能なエラーを再試行する方法を示します。

コード例: AsyncIO