ステップ 3.3: リアルタイム分析を追加する (アプリケーションコードの追加) - Amazon Kinesis Data Analytics for SQL Applications 開発者ガイド

新規プロジェクトでは、Kinesis Data Analytics for SQL よりも 新しい Managed Service for Apache Flink Studio を使用することをお勧めします。Managed Service for Apache Flink Studio は、使いやすさと高度な分析機能を兼ね備えているため、高度なストリーム処理アプリケーションを数分で構築できます。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

ステップ 3.3: リアルタイム分析を追加する (アプリケーションコードの追加)

アプリケーション内ストリームに対して独自の SQL クエリを作成することもできますが、以下のステップではサンプルコードを提供するテンプレートの 1 つを使用します。

  1. アプリケーションハブページで、[Go to SQL editor] を選択します。

    
                            [SQL エディタを表示] ボタンのあるサンプルアプリケーションページのスクリーンショット。
  2. 「"ExampleApp?」の実行を開始するダイアログボックスではい、アプリケーションを起動しますを選択します。

    コンソールでアプリケーションの開始リクエストが送信され (「StartApplication」を参照)、SQL エディタページが表示されます。

  3. コンソールに SQL エディタページが開きます。ボタン ([Add SQL from templates]、[Save and run SQL]) およびさまざまなタブを含むページを確認します。

  4. SQL エディタで、[Add SQL from templates] を選択します。

  5. 使用可能なテンプレートの一覧から、[Continuous filter] を選択します。次のように、サンプルコードは 1 つのアプリケーション内ストリームからデータを読み取り (WHERE 句は行をフィルタします)、別のアプリケーション内ストリームに挿入します。

    • アプリケーション内ストリーム DESTINATION_SQL_STREAM を作成します。

    • ポンプ (STREAM_PUMP) を作成し、これを使用して SOURCE_SQL_STREAM_001 から行から選択して、DESTINATION_SQL_STREAM に挿入します。

  6. [Add this SQL to editor] を選択します。

  7. 次のように、アプリケーションコードをテストします。

    アプリケーションが既に開始されていることを忘れないでください (ステータスは RUNNING です)。従って、Amazon Kinesis Data Analytics はすでにストリーミングソースから継続的に読み取り、アプリケーション内ストリーム SOURCE_SQL_STREAM_001 に行を追加しています。

    1. SQL エディタで、[Save and run SQL] を選択します。コンソールはまず更新リクエストを送信してアプリケーションコードを保存します。その後、コードは継続実行されます。

    2. 結果は [Real-time analytics] に表示されます。

      
                                    リアルタイム解析タブに結果が表示された SQL エディタのスクリーンショット。

      SQL エディタには次のタブがあります。

      • [Source data] タブに、ストリーミングソースにマッピングされたアプリケーション内入力ストリームが表示されます。アプリケーション内ストリームを選択すると、データが入力されます。入力設定で指定されていないアプリケーション内入力ストリームの追加列を書き留めてください。これらには以下のタイプスタンプ列が含まれます。

         

        • ROWTIME – アプリケーション内ストリームの各行には、ROWTIME という特殊な列があります。この列は、Amazon Kinesis Data Analytics が最初のアプリケーション内ストリーム (ストリーミングソースにマッピングされたアプリケーション内入力ストリーム) に行を挿入したときのタイムスタンプです。

           

        • Approximate_Arrival_Time – 各 Kinesis レコードには、Approximate_Arrival_Time という値が含まれています。この値は、ストリーミングソースが正常にレコードを受信して保存したときに設定されるおおよその到達タイムスタンプです。Kinesis Data Analytics がストリーミングソースからレコードを読み取る際に、この列をアプリケーション内入力ストリームにフェッチします。

        これらのタイムスタンプ値は時間ベースのウィンドウクエリで役に立ちます。詳細については、「ウィンドウクエリ」を参照してください。

         

      • [Real-time analytics] タブには、アプリケーションコードによって作成された他のすべてのアプリケーション内ストリームが表示されます。また、エラーストリームも含まれます。Kinesis Data Analytics では、処理できない行はエラーストリームに送信されます。詳細については、「エラー処理」を参照してください。

         

        DESTINATION_SQL_STREAM を選択して、アプリケーションコードが挿入した行を表示します。アプリケーションコードで作成されなかった追加列を書き留めてください。この列には、ROWTIME タイムスタンプ列が含まれます。Kinesis Data Analytics は、単純にこれらの値をソース (SOURCE_SQL_STREAM_001) からコピーします。

         

      • [宛先] タブには、Kinesis Data Analytics がクエリ結果を書き込む外部宛先が表示されます。まだアプリケーション出力用の外部宛先は設定されていません。

次のステップ

ステップ 3.4: (オプション) アプリケーションコードを更新する