Amazon Managed Service for Apache Flink (Amazon MSF) は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。
Managed Service for Apache Flink での Java の例
次の例では、Java で記述したアプリケーションを作成する方法について説明します。
注記
ほとんどの例は、ローカル、開発マシン、選択した IDE、Amazon Managed Service for Apache Flink の両方で実行されるように設計されています。これらは、アプリケーションパラメータを渡すために使用できるメカニズムと、両方の環境でアプリケーションを変更せずに実行するために依存関係を正しく設定する方法を示しています。
この例は、レコードまたは状態オブジェクトでカスタム TypeInfo を定義して、シリアル化が効率の低い Kryo シリアル化にフォールバックしないようにする方法を示します。これは、オブジェクトに List または Map が含まれている場合などに必要です。詳細については、Apache Flink ドキュメントの「Data Types & Serialization
コード例: CustomTypeInfo
この例は、DataStream API を使用して Kinesis データストリームから読み取り、別の Kinesis データストリームに書き込むシンプルなアプリケーションを示しています。この例では、正しい依存関係でファイルをセットアップし、uber-JAR をビルドして設定パラメータを解析する方法を示しています。これにより、アプリケーションをローカル、IDE、Amazon Managed Service for Apache Flink の両方で実行できます。
コード例: GettingStarted
この例は、Table API と SQL を使用したシンプルなアプリケーションを示しています。同じ Java アプリケーションで DataStream API を Table API または SQL と統合する方法を示します。また、DataGen コネクタを使用して、外部データジェネレーターを必要とせずに、Flink アプリケーション自体の中でランダムなテストデータを生成する方法も示します。
完全な例: GettingStartedTable
この例は、DataStream API の FileSink を使用して S3 バケットに JSON ファイルを書き込む方法を示します。
コード例: S3Sink
この例は、標準コンシューマーまたは EFO のいずれかを使用して Kinesis データストリームから消費するソースを設定する方法と、Kinesis データストリームへのシンクを設定する方法を示します。
コード例: KinesisConnectors
この例は、Amazon Data Firehose (旧称 Kinesis Data Firehose) にデータを送信する方法を示します。
コード例: KinesisFirehoseSink
この例は、Prometheus シンクコネクタ
コード例: PrometheusSink
この例は、DataStream API の 4 タイプのウィンドウイング集約を示しています。
-
処理時間に基づくスライディングウィンドウ
-
イベント時間に基づくスライディングウィンドウ
-
処理時間に基づくタンブリングウィンドウ
-
イベント時間に基づくタンブリングウィンドウ
コード例: Windowing
この例は、Flink アプリケーションにカスタムメトリクスを追加して CloudWatch メトリクスに送信する方法を示します。
コード例: CustomMetrics
この例は、Kafka 設定プロバイダーを使用して、Kafka コネクタの mTLS 認証用の証明書を持つカスタムキーストアとトラストストアを設定する方法を示します。この方法では、Amazon S3 から必要なカスタム証明書をロードし、アプリケーションの起動時に AWS Secrets Manager からシークレットをロードできます。
この例は、Kafka 設定プロバイダーを使用して AWS Secrets Manager から認証情報を取得し、Amazon S3 からトラストストアをダウンロードして、Kafka コネクタで SASL/SCRAM 認証を設定する方法を示します。この方法では、Amazon S3 から必要なカスタム証明書をロードし、アプリケーションの起動時に AWS Secrets Manager からシークレットをロードできます。
この例は、Table API/SQL で Kafka 設定プロバイダーを使用して、Kafka コネクタの mTLS 認証用の証明書を持つカスタムキーストアとトラストストアを設定する方法を示します。この方法では、Amazon S3 から必要なカスタム証明書をロードし、アプリケーションの起動時に AWS Secrets Manager からシークレットをロードできます。
この例は、Apache Flink のサイド出力
コード例: SideOutputs
この例は、Apache Flink 非同期 I/O
コード例: AsyncIO