スループットの増加に合わせた入力ストリームの並列処理 - Amazon Kinesis Data Analytics for SQL Applications 開発者ガイド

新規プロジェクトでは、Kinesis Data Analytics for SQL よりも 新しい Managed Service for Apache Flink Studio を使用することをお勧めします。Managed Service for Apache Flink Studio は、使いやすさと高度な分析機能を兼ね備えているため、高度なストリーム処理アプリケーションを数分で構築できます。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

スループットの増加に合わせた入力ストリームの並列処理

注記

2023 年 9 月 12 日以降、SQL 用 Kinesis Data Analytics をまだ使用していない場合、Kinesis Data Firehose をソースとして使用して新しいアプリケーションを作成することはできません。詳細については、「制限」を参照してください。

Amazon Kinesis Data Analytics アプリケーションでは、アプリケーション内入力ストリームのスループットを超えるアプリケーションをスケーリングするために、複数のアプリケーション内入力ストリームをサポートできます。アプリケーション内入力ストリームの詳細については、「Amazon Kinesis Data Analytics for SQL Applications: 仕組み」を参照してください。

ほとんどの場合、Amazon Kinesis Data Analytics は、アプリケーションにフィードする Kinesis ストリームまたは Firehose ソースストリームの容量を処理するようにアプリケーションをスケーリングします。ただし、ソースストリームのスループットが、単一のアプリケーション内入力ストリームのスループットを超える場合は、アプリケーションで使用されるアプリケーション内入力ストリームの数を明示的に増やすことができます。そのためには、InputParallelism パラメータを使用します。

InputParallelism パラメータが 1 以上の場合、Amazon Kinesis Data Analytics は、アプリケーション内ストリーム間のソースストリームのパーティションを均等に分割します。たとえば、ソースストリームに 50 シャードあり、InputParallelism2 に設定した場合、アプリケーション内入力ストリームはそれぞれ、25 のソースストリームのシャードから入力を受け取ります。

アプリケーション内ストリームの数を増やす場合は、アプリケーションから、各ストリームのデータに明示的にアクセスする必要があります。コードで複数のアプリケーション内ストリームにアクセスする方法については、「Amazon Kinesis Data Analytics アプリケーションでの別のアプリケーション内ストリームへのアクセス」を参照してください。

Kinesis Data Streams と Firehose ストリームシャードはどちらも同じ方法でアプリケーション内ストリームに分割されますが、アプリケーションに表示される方法も異なります。

  • Kinesis データストリームのレコードには、shard_id フィールドが含まれており、レコードのソースシャードを識別できます。

  • Firehose 配信ストリームからのレコードには、レコードのソースシャードまたはパーティションを識別するフィールドは含まれません。これは、Firehose がこの情報をアプリケーションから抽象化するためです。

アプリケーション内入力ストリームの数の増加を評価する

ほとんどの場合、入力ストリームの複雑性やデータサイズに応じて、1 つのアプリケーション内入力ストリームで、1 つのソースストリームのスループットを処理することができます。アプリケーション内入力ストリームの数を増やす必要があるかどうかを判断するには、Amazon で InputBytesおよび MillisBehindLatestメトリクスをモニタリングできます CloudWatch。

InputBytes メトリクスが 100 MB/秒より大きい場合 (または、このレートより大きくなることが予想される場合)、MillisBehindLatest が増えたり、アプリケーションの問題の影響が大きくなったりする可能性があります。この問題に対応するため、アプリケーションに対して次の言語を選択することをお勧めします。

  • アプリケーションのスケーリングニーズが 100 MB/秒を超える場合は、複数のストリームと Kinesis Data Analytics for SQL Applications を使用します。

  • 1 つのストリームとアプリケーションを使用する場合は、Kinesis Data Analytics for Java Applications を使用します。

MillisBehindLatest メトリクスに次のいずれかの特性がある場合は、アプリケーションの InputParallelism 設定を増やす必要があります。

  • MillisBehindLatest メトリクスが増加しつつあります。これは、アプリケーションにおいて、ストリーム内の最新データが遅延していることを意味します。

  • MillisBehindLatest メトリクスは一貫して 1,000 (1 秒あたり) を超えています。

以下が真の場合は、アプリケーションの InputParallelism 設定を増やす必要はありません。

  • MillisBehindLatest メトリクスが減少しつつあります。これは、アプリケーションにおいて、ストリーム内の最新データの遅れを取り戻していることを意味します。

  • MillisBehindLatest メトリクスは一貫して 1,000 (1 秒あたり) を下回っています。

の使用の詳細については CloudWatch、「 CloudWatch ユーザーガイド」を参照してください。

複数のアプリケーション内入力ストリームの実装

CreateApplication」を使用してアプリケーションを作成する際、アプリケーション内入力ストリームの数を設定できます。この数は、「UpdateApplication」を使用してアプリケーションを作成した後に設定します。

注記

InputParallelism 設定は、Amazon Kinesis Data Analytics API または AWS CLIを使ってのみ設定できます。この設定は、 を使用して設定することはできません AWS Management Console。のセットアップについては AWS CLI、「」を参照してくださいステップ 2: AWS Command Line Interface (AWS CLI) をセットアップする

新しいアプリケーションの入力ストリームカウントの設定

次の例では、API アクション (CreateApplication) を使用して、新しいアプリケーションの入力ストリームカウントを 2 に設定する方法について解説します。

CreateApplication の詳細については、「CreateApplication」を参照してください。

{ "ApplicationCode": "<The SQL code the new application will run on the input stream>", "ApplicationDescription": "<A friendly description for the new application>", "ApplicationName": "<The name for the new application>", "Inputs": [ { "InputId": "ID for the new input stream", "InputParallelism": { "Count": 2 }], "Outputs": [ ... ], }] }

既存アプリケーションの入力ストリームカウントの設定

次の例では、API アクション (UpdateApplication) を使用して、既存アプリケーションの入力ストリームカウントを 2 に設定する方法について解説します。

Update_Application の詳細については、「UpdateApplication」を参照してください。

{ "InputUpdates": [ { "InputId": "yourInputId", "InputParallelismUpdate": { "CountUpdate": 2 } } ], }

Amazon Kinesis Data Analytics アプリケーションでの別のアプリケーション内ストリームへのアクセス

複数のアプリケーション内入力ストリームをアプリケーションで使用するには、別のストリームから明示的に選択する必要があります。次のコード例では、入門チュートリアルで作成した複数の入力ストリームをアプリケーションでクエリを行う方法について説明します。

次の例では、in_application_stream001 という 1 つのアプリケーション内ストリームに結合される前に、まず COUNT を使用して各ソースストリームが集約されます。事前にソースストリームを集約すると、結合されたアプリケーション内ストリームで、負荷をかけ過ぎることなく複数のストリームからのトラフィックを処理しやすくなります。

注記

この例を実行して、両方のアプリケーション内入力ストリームから結果を得るには、ソースストリームのシャード数とアプリケーションの InputParallelism パラメータを両方とも更新します。

CREATE OR REPLACE STREAM in_application_stream_001 ( ticker VARCHAR(64), ticker_count INTEGER ); CREATE OR REPLACE PUMP pump001 AS INSERT INTO in_application_stream_001 SELECT STREAM ticker_symbol, COUNT(ticker_symbol) FROM source_sql_stream_001 GROUP BY STEP(source_sql_stream_001.rowtime BY INTERVAL '60' SECOND), ticker_symbol; CREATE OR REPLACE PUMP pump002 AS INSERT INTO in_application_stream_001 SELECT STREAM ticker_symbol, COUNT(ticker_symbol) FROM source_sql_stream_002 GROUP BY STEP(source_sql_stream_002.rowtime BY INTERVAL '60' SECOND), ticker_symbol;

前述のコード例では、以下のような出力を in_application_stream001 に生成します。

追加の考慮事項

複数の入力ストリームを使用する場合は、以下の点に注意してください。

  • アプリケーション内入力ストリームの最大数は 64 です。

  • アプリケーション内入力ストリームは、アプリケーションの入力ストリームのシャード間で均等に分散されます。

  • アプリケーション内ストリームの追加により向上するパフォーマンスは、直線的にスケールしません。つまり、アプリケーション内ストリームの数を 2 倍にしても、スループットは 2 倍になりません。一般的な行サイズを使用すると、アプリケーション内ストリームはそれぞれ、1 秒あたり約 5,000~15,000 行のスループットを達成します。アプリケーション内ストリームカウントを 10 に増やすことによって、1 秒あたり 20,000~30,000 行のスループットを達成できます。スループット速度は、入力ストリームのフィールドのカウント、データ型、サイズによって異なります。

  • 一部の集計関数 (AVG) では、別のシャードに分割されている入力ストリームに適用されると、予期しない結果が生成される場合があります。集計ストリームに結合する前に、個々のシャードで集計オペレーションを実行する必要があるため、レコードが多く含まれているストリームに関係なく加重される場合があります。

  • 入力ストリームの数を増やした後にアプリケーションのパフォーマンスが低下し続ける (高い MillisBehindLatest メトリクスにより反映される) 場合は、Kinesis 処理ユニット (KPU) の上限に達している可能性があります。詳細については、「アプリケーションを自動的にスケーリングしてスループットを向上させる」を参照してください。