ベストプラクティス - Amazon Kinesis Data Analytics for SQL Applications 開発者ガイド

新規プロジェクトでは、Kinesis Data Analytics for SQL よりも 新しい Managed Service for Apache Flink Studio を使用することをお勧めします。Managed Service for Apache Flink Studio は、使いやすさと高度な分析機能を兼ね備えているため、高度なストリーム処理アプリケーションを数分で構築できます。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

ベストプラクティス

このセクションでは、Amazon Kinesis Data Analytics アプリケーションを使用する場合のベストプラクティスを説明します。

アプリケーションを管理する

Amazon Kinesis Data Analytics アプリケーションを管理するときは、以下のベストプラクティスに従います。

  • Amazon CloudWatch アラームの設定 – Kinesis Data Analytics が提供する CloudWatch メトリクスを使用して、以下をモニタリングできます。

    • 入力バイトおよび入力レコード (アプリケーションに入力されるバイト数およびレコード数)

    • 出力バイトおよび出力レコード

    • MillisBehindLatest (アプリケーションがストリーミングソースからの読み取りにおいてどの程度遅延しているか)

    本番稼働用アプリケーションでは、次のメトリクスに少なくとも 2 つの CloudWatch アラームを設定することをお勧めします。

    • MillisBehindLatest – ほとんどの場合、アプリケーションが 1 分間の平均で最新のデータから 1 時間遅延した場合にトリガーされるようにこのアラームを設定することをお勧めします。 end-to-end 処理ニーズの低いアプリケーションでは、これをより低い許容値に調整できます。このアラームを使用して、アプリケーションが最新データを読み取っていることを確認できます。

       

  • ReadProvisionedThroughputException の例外の発生を回避するには、同じ Kinesis データストリームから読み取る本稼働アプリケーションの数を 2 つのアプリケーションに制限します。

    注記

    この場合、アプリケーションは、ストリーミングソースを読み取ることができる任意のアプリケーションを意味します。Firehose 配信ストリームから読み取ることができるのは Kinesis Data Analytics アプリケーションのみです。ただし、Kinesis Data Analytics アプリケーションや など、多くのアプリケーションは Kinesis データストリームから読み取ることができます AWS Lambda。アプリケーション制限の推奨は、ストリーミングソースを読み取ることができるすべてのアプリケーションに対するものです。

     

    Amazon Kinesis Data Analytics は、アプリケーションごとに 1 秒に約 1 度、ストリーミングソースを読み取ります。ただし、遅延したアプリケーションは、追いつくためにより高速でデータを読み取る場合があります。アプリケーションが追い付くために十分なスループットを使用できるように、同じデータソースを読み取るアプリケーションの数を制限します。

     

  • 同じ Firehose 配信ストリームから読み取る本番アプリケーションの数を 1 つのアプリケーションに制限します。

    Firehose 配信ストリームは、Amazon S3 や Amazon Redshift などの宛先に書き込むことができます。これは、Kinesis Data Analytics アプリケーションのストリーミングソースとすることもできます。したがって、Firehose 配信ストリームごとに複数の Kinesis Data Analytics アプリケーションを設定しないことをお勧めします。これにより、配信ストリームが別の宛先にも配信できるようになります。

アプリケーションのスケーリング

アプリケーション内入力ストリームの数をデフォルト (1) からプロアクティブに増やすことで、将来のスケーリングニーズに備えてアプリケーションをセットアップします。アプリケーションのスループットに基づいて、次の言語を選択することをお勧めします。

  • アプリケーションのスケーリングニーズが 100 MB/秒を超える場合は、複数のストリームと Kinesis Data Analytics for SQL Applications を使用します。

  • 引き続き 1 つのストリームとアプリケーションを使用する場合は、Managed Service for Apache Flink Applications を使用します。

注記

アプリケーションの予測入力スループットが 100 MB/秒を超える場合は、複数の SQL アプリケーションを使用する計画を事前に立てたり、Apache Flink for Java アプリケーション用の managed-flink/latest/java/ に移行したりできるように、アプリケーションの InputProcessing.OkBytes メトリックを定期的に確認することをお勧めします。

アプリケーションのモニタリング

アプリケーションが入力スループット制限に近づいたときに通知InputProcessing.OkBytesされるように、 で CloudWatch アラームを作成することをお勧めします。これは、アプリケーションのクエリを更新してスループットの向上とトレードオフすることができ、分析のバックプレッシャーや遅延を回避できるので便利です。詳細については、「トラブルシューティング」を参照してください。これは、アップストリームのスループットを低下させるメカニズムがある場合にも役立ちます。

  • 1 つのアプリケーション内ストリームに対して推奨される最大スループットは、アプリケーションのクエリの複雑さに応じて、2 ~ 20 MB/秒です。

  • 単一の Kinesis Data Analytics の SQL アプリケーションで処理できる最大ストリーミングスループットは、約 100 MB/秒です。これは、アプリケーション内ストリームの数を最大値の 64 に増やし、KPU の制限を 8 より大きくしたことを前提にしています。詳細については、「制限」を参照してください。

注記

アプリケーションの予測入力スループットが 100 MB/秒を超える場合は、複数の SQL アプリケーションを使用する計画を事前に立てたり、Apache Flink for Java アプリケーション用の managed-flink/latest/java/ に移行したりできるように、アプリケーションの InputProcessing.OkBytes メトリックを定期的に確認することをお勧めします。

入力スキーマの定義

コンソールでアプリケーション入力を設定するばあいは、まずストリーミングソースを指定します。その後、コンソールは配信 API (「DiscoverInputSchema」を参照) を使用してストリーミングソースのレコードをサンプリングし、スキーマを推測します。このスキーマは、特に、結果のアプリケーション内ストリームの列の名前およびデータ型を定義します。コンソールにスキーマが表示されます。この推測スキーマで以下を行うことをお勧めします。

  • 推測スキーマを十分にテストします。検出プロセスでは、ストリーミングソースのレコードのサンプルのみを使用してスキーマを推測します。ストリーミングソースに多くのレコードタイプ がある場合、検出 API で 1 つまたは複数のレコードタイプのサンプリングが行われていない可能性があります。この状況により、ストリーミングソースのデータを正確に反映しないスキーマが発生する可能性があります。

    アプリケーションが起動した時に、これらの除外されたレコードタイプによって解析エラーが発生する場合があります。Amazon Kinesis Data Analytics は、これらのレコードをアプリケーション内エラーストリームに送信します。このような解析エラーを減らすために、推測スキーマをコンソールでインタラクティブにテストし、欠落したレコードがないかアプリケーション内ストリームをモニタリングすることをお勧めします。

     

  • API では、入力設定で列に対する NOT NULL 制約の指定はサポートされていません。アプリケーション内ストリームの列に NOT NULL 制約をつける場合は、アプリケーションコードを使用してそのようなアプリケーション内ストリームを作成します。その後、1 つのアプリケーション内ストリームから別の 1 つにデータをコピーすると、制約が反映されます。

    値が必要なときに NULL 値を含む行を挿入しようとすると、エラーが発生します。Kinesis Data Analytics は、これらのエラーをアプリケーション内エラーストリームに送信します。

     

  • 検出処理で推測されるデータ型を緩和します。検出プロセスでは、ランダムにサンプリングしたストリーミングソースのレコードに基づいて列とデータ型が推奨されます。これらを注意深く確認して、入力したレコードのあらゆるケースをカバーできるようにデータ型の緩和を検討することをお勧めします。これにより、アプリケーションの実行中に全体で解析エラーを減らすことができます。たとえば、推測スキーマに列タイプとして SMALLINT がある場合、これを INTEGER に変更することを検討します。

     

  • アプリケーションコードで SQL 関数を使用して、非構造化データまたは列を処理します。入力に、ログデータなど、非構造化データまたは列がある場合があります。例については、「例: DateTime 値の変換」を参照してください。このタイプのデータを処理する方法の 1 つとして、タイプが VARCHAR(N) である列 1 つのみを持つようにスキーマを定義する方法があります。ここでの N はストリームで発生する可能性のある最も大きい行です。その後、入力されるレコードをアプリケーションコードが読み取り、String および Date Time 関数を使用して、未加工データを解析してスキーマ化します。

     

  • 2 レベル以上の入れ子構造になっているストリーミングソースデータが完全に処理されていることを確認します。ソースデータが JSON である場合、入れ子構造になっていることがあります。検出 API は入れ子構造を 1 レベルにフラット化したスキーマを推測します。2 レベルの入れ子構造の場合も、検出 API でこれらのフラット化を試みます。2 レベルを超える入れ子構造の場合、フラット化のサポートには制限があります。入れ子構造を完全に処理するには、ニーズに合わせて推測スキーマを手動で編集する必要があります。以下の方法のいずれかを使用して行ってください。

     

    • JSON 列パスを使用して、アプリケーションに必要なキーと値のペアのみを、選択的に抽出します。JSON 列パスは、アプリケーションに持ってくる特定のキーと値のペアに対するポインタを提供します。これは入れ子構造の任意のレベルに対して実行できます。

    • JSON 列パスを使用して複雑な JSON オブジェクトを選択的に抽出し、アプリケーションコードで文字列操作関数を使用して必要な特定のデータを抽出します。

出力への接続

各アプリケーションに最低 2 つの出力を設定することをお勧めします。

  • 最初の宛先は、SQL クエリの結果を挿入するために使用します。

  • 2 番目の送信先を使用してエラーストリーム全体を挿入し、Firehose 配信ストリームを介して S3 バケットに送信します。

アプリケーションコードの作成

次の構成を推奨します。

  • 以下の理由で、SQL ステートメントでは 1 時間を超える時間ベースのウィンドウを指定しないでください。

    • 場合によっては、アプリケーションを再起動する必要があります。これは、アプリケーションを更新したためか、Kinesis Data Analytics の内部的な理由によるものです。再起動すると、ウィンドウに含まれるすべてのデータはストリーミングデータソースからもう一度読み取る必要があります。これにより、Kinesis Data Analytics でこのウィンドウの出力を発行できるようになるまでに時間がかかります。

    • Kinesis Data Analytics は、関連データを含むアプリケーションの状態に関連するすべてを、期間中保持する必要があります。これには、Kinesis Data Analytics 処理ユニットが大量に消費されます。

  • 開発中は、結果がより速く表示されるように、SQL ステートメントでウィンドウのサイズを小さくしておいてください。アプリケーションを本稼働環境にデプロイするときに、ウィンドウを適切なサイズに設定できます。

  • 1 つの複雑な SQL ステートメントよりも、複数のステートメントに分割して、それぞれのステップで結果を中間アプリケーション内ストリームに保存することを検討してください。迅速なデバッグに役立ちます。

  • タンブリングウィンドウを使用する場合は、2 つのウィンドウを使用し、1 つを処理時間、もう 1 つを論理時間 (取り込み時間またはイベント時間) にすることをお勧めします。詳細については、「タイムスタンプと ROWTIME 列」を参照してください。

アプリケーションのテスト

Kinesis Data Analytics アプリケーションのスキーマまたはアプリケーションコードを変更する場合は、テストアプリケーションを使用して、本稼働環境にデプロイする前に変更を確認することをお勧めします。

テストアプリケーションのセットアップ

テストアプリケーションのセットアップは、コンソールを通じて、または AWS CloudFormation テンプレートを使用して行うことができます。 AWS CloudFormation テンプレートを使用すると、テストアプリケーションとライブアプリケーションに加えたコード変更の一貫性を確保できます。

テストアプリケーションをセットアップする場合は、ライブデータにアプリケーションを接続するか、テスト対象のストリームにモックデータを入力できます。ストリームにモックデータを入力するために、次の 2 つの方法をお勧めします。

  • Kinesis Data Generator (KDG) を使用します。KDG では、データテンプレートを使用して、Kinesis ストリームにランダムなデータを送信します。KDG は簡単に使用できますが、データホットスポットや異常を検出するアプリケーションなど、データ項目間の複雑な関係をテストすることには適していません。

  • より複雑なデータを Kinesis データストリームに送信するには、カスタム Python アプリケーションを使用します。Python アプリケーションはホットスポットや異常など、データ項目間の複雑な関係を生成できます。データホットスポットにクラスター化されたデータを送信する Python アプリケーションの例については、「例 : ストリーム上のホットスポットの検出 (HOTSPOTS 関数)」を参照してください。

テストアプリケーションを実行するときは、コンソールでアプリケーション内ストリームを表示する代わりに、送信先 (Amazon Redshift データベースへの Firehose 配信ストリームなど) を使用して結果を表示します。コンソールに表示されるデータは、ストリームのサンプリングであり、すべてのレコードは含まれません。

スキーマ変更のテスト

アプリケーションの入力ストリームのスキーマを変更する場合は、テストアプリケーションを使用して、以下の条件が満たされていることを確認します。

  • ストリームからのデータが正しいデータ型に強制変換される。たとえば、日時データが文字列としてアプリケーションに取り込まれないことを確認します。

  • データが解析され、目的のデータ型に強制変換される。解析または強制変換エラーが発生した場合は、コンソールで表示できます。または、エラーストリームに送信先を割り当てて、送信先ストアでエラーを表示できます。

  • 文字データのデータフィールドが十分な長さで、アプリケーションが文字データを切り捨てない。送信先ストアのデータレコードで、アプリケーションデータが切り捨てられていないことを確認できます。

コード変更のテスト

SQL コードの変更のテストでは、アプリケーションのドメインに関する知識がいくらか必要です。テストする必要がある出力と、正しい出力について判断できる必要があります。アプリケーションの SQL コードを変更するときに確認する、問題の起きそうな分野については、「Amazon Kinesis Data Analytics for SQL Applications のトラブルシューティング」を参照してください。