Amazon Kinesis Data Streams
開発者ガイド

チュートリアル: Amazon Kinesis Data Streams を使用したウェブトラフィックの可視化

このチュートリアルでは、Amazon Kinesis Data Streams の使用開始に役立つように、その重要な Kinesis Data Streams コンポーネントについて ストリームデータプロデューサーデータコンシューマーを中心に概説します。このチュートリアルでは、リアルタイムデータ分析の一般的ユースケース (「Amazon Kinesis Data Streams とは」で紹介) に基づいたサンプルアプリケーションを使用します。

このサンプルのウェブアプリケーションは、単純な JavaScript アプリケーションを使用して、スライドウィンドウにわたるトップ N 分析の結果を格納している DynamoDB テーブルをポーリングします。アプリケーションはこのデータを受け取り、結果を可視化します。

Kinesis Data Streams のデータ可視化サンプルアプリケーション

このチュートリアルのデータ可視化サンプルアプリケーションでは、Kinesis Data Streams を使用してリアルタイムでデータを取り込み分析する方法を示します。このサンプルアプリケーションは、さまざまな URL からのシミュレート上の閲覧者の数を Kinesis data stream に投入するデータプロデューサーを作成します。ストリームはそれらのデータレコードを受け取った順に保持します。データコンシューマーは、ストリームからこれらのレコードを取得し、特定の URL からの閲覧者の数を計算します。最後に、単純なウェブアプリケーションは計算結果をリアルタイムでポーリングし、計算結果を可視化します。

このサンプルアプリケーションは、ストリーム処理の一般的なユースケースとして、スライディングウィンドウ分析を10 秒間行います。先ほど示した可視化されたデータは、ストリームのスライディングウィンドウ分析の結果を反映したものであり、継続的に更新されてグラフとして表示されています。さらに、データコンシューマーはデータストリームに対してトップ N 分析を行って、上位 3 つの閲覧者を割り出します。その結果は、2 秒ごとに更新されてグラフのすぐ下に表として表示されます。

すばやく開始できるように、サンプルアプリケーションでは AWS CloudFormation を使用しています。AWS CloudFormation では、テンプレートを作成して、アプリケーションの実行に必要な AWS リソースおよび関連する依存関係やランタイムパラメータを記述できます。サンプルアプリケーションでは、テンプレートを使用してすべての必要なリソースをすばやく作成します。たとえば Amazon EC2 インスタンスで実行されるプロデューサーとコンシューマーのアプリケーションや、レコードの集計数を保存するための Amazon DynamoDB テーブルを作成します。

注記

サンプルアプリケーションの起動後、Kinesis Data Streams の使用に関するわずかな料金が発生します。サンプルアプリケーションでは、できるだけ AWS 無料利用枠 の対象となるリソースを使用します。このチュートリアルを終了したら、AWS リソースを削除して料金が発生しないようにしてください。詳細については、「ステップ 3: サンプルアプリケーションを削除する」を参照してください。

前提条件

このチュートリアルでは、Kinesis Data Streams のデータ可視化サンプルアプリケーションをセットアップして実行し、その結果を表示する手順を示します。サンプルアプリケーションを使用するには、最初に以下の作業をする必要があります。

ステップ 1: サンプルアプリケーションを起動する

AWS によって提供された AWS CloudFormation テンプレートを 使用してサンプルアプリケーションを起動します。このサンプルアプリケーションには、ランダムにレコードを生成して Kinesis data stream に送信するストリームライター、リソースに対する HTTPS リクエスト数をカウントするデータコンシューマー、およびストリーム処理データの出力を、継続的に更新されるグラフとして表示するウェブアプリケーションが含まれます。

アプリケーションを起動するには

  1. このチュートリアルの AWS CloudFormation テンプレートを開きます。

  2. [テンプレートの選択] ページに、テンプレートの URL が表示されます。[次へ] を選択します。

  3. [Specify Details (詳細の指定)] ページで、デフォルトのインスタンスタイプが t2.micro になっていることを確認します。ただし、T2 インスタンスは VPC が必要です。AWS アカウントにリージョンのデフォルト VPC がない場合は、[InstanceType] を m3.medium などの別のインスタンスタイプに変更する必要があります。[次へ] を選択します。

  4. [オプション] ページで、タグのキーとタグの値を任意で入力できます。このタグは、EC2 インスタンスなどのテンプレートによって作成されたリソースに追加されます。[次へ] を選択します。

  5. [Review (確認)] ページで、[I acknowledge that this template might cause AWS CloudFormation to create IAM resources (このテンプレートでは AWS CloudFormation によって IAM リソースが作成される場合があることを承認します)] を選択し、[Create (作成)] を選択します。

まず、ステータスを CREATE_IN_PROGRESS とする KinesisDataVisSample という名前のスタックが表示されます。スタックが作成されるまでに数分かかる場合があります。ステータスが CREATE_COMPLETE の場合、次のステップに進みます。ステータスが更新されない場合は、ページを更新してください。

ステップ 2: サンプルアプリケーションのコンポーネントを表示する

Kinesis Data Stream

ストリームは、大量のプロデューサーからリアルタイムでデータを取り込んで保存し、複数のコンシューマーに提供します。ストリームは、データレコードの順序付けられたシーケンスを意味します。ストリームの作成時に、ストリーム名とシャードの数を指定する必要があります。ストリームは 1 つまたは複数のシャードで構成されます。各シャードはデータレコードのグループです。

AWS CloudFormation は自動的にサンプルアプリケーションのストリームを作成します。AWS CloudFormation テンプレートのこのセクションは、CreateStream オペレーションで使用されるパラメータを示しています。

ストリームの詳細を表示するには

  1. [KinesisDataVisSample] スタックを選択します。

  2. [Outputs (出力)] タブで、URL のリンクを選択します。URL の形式は「http://ec2-xx-xx-xx-xx.compute-1.amazonaws.com」のようになります。

  3. アプリケーションスタックを作成し、データ解析グラフで表示する意味のあるデータにするには、10 分程度かかります。リアルタイムのデータ分析グラフは、[Kinesis Data Streams Data Visualization Sample] というタイトルの別のページに表示されます。このグラフは、10 秒間に参照元 URL から送信されたリクエストの数を表示し、1 秒ごとに更新されます。グラフのスパンは直近の 2 分間です。

    
							サンプルアプリケーションのデータ可視化

ストリームの詳細を表示するには

  1. https://console.aws.amazon.com/kinesis にある Kinesis コンソールを開きます。

  2. 名前に ( KinesisDataVisSampleApp-KinesisStream-[randomString]) のフォームがあるストリームを選択します。

  3. ストリームの詳細を表示するにはストリーム名を選択します。

  4. それらのグラフを見ると、データプロデューサーのアクティビティがストリームにレコードを投入し、データコンシューマーがストリームからデータを取得していることがわかります。

    
							サンプルアプリケーションのストリームモニタリンググラフ

データプロデューサー

データプロデューサーは Kinesis data stream にデータレコードを送信します。ストリームにデータを投入するために、プロデューサーはストリームに対して PutRecord オペレーションを呼び出します。

PutRecord の呼び出しごとに、ストリーム名とパーティションキーのほか、プロデューサーがストリームに追加するデータレコードが必要になります。ストリーム名により、レコードが存在することになるストリームが決まります。パーティションキーは、データレコードが追加されるストリーム内のシャードを決定するために使用されます。

使用するパーティションキーはアプリケーションのロジックによって異なります。パーティションキーの数は、通常、シャード数よりかなり多くなります。シャードに対して十分な数のパーティションキーがあることで、ストリームはデータレコードをストリーム内のシャードに均等に分配できます。

データデータプロデューサーは一般的な 6 つの URL を、2 シャード構成のストリームに投入された各レコードのパーティションキーとして使用します。これらの URL がシミュレート上のページ閲覧者を表します。HttpReferrerKinesisPutter コードの行 99~132 は Kinesis Data Streams にデータを送信します。3 つの必要なパラメータを PutRecord の呼び出し前に設定しています。パーティションキーを設定するために使用している pair.getResource により、HttpReferrerStreamWriter コードの行 85 ~ 92 で作成された 6 つの URL のいずれかがランダムに選択されます。

Kinesis Data Streams にデータを投入するデータプロデューサーとして使用できるのは、EC2 インスタンス、クライアントブラウザー、モバイルデバイスなどです。サンプルアプリケーションでは、データプロデューサーとそのデータコンシューマーとして同じ EC2 インスタンスを使用しています。一方、実際のシナリオでは、アプリケーションの各コンポーネントとして別々の EC2 インスタンスを使用することになります。以下の手順に従って、サンプルアプリケーションの EC2 インスタンスのデータを表示できます。

コンソールでインスタンスのデータを表示するには

  1. https://console.aws.amazon.com/ec2/) にある Amazon EC2 コンソールを開きます。

  2. ナビゲーションペインで、[インスタンス] を選択します。

  3. サンプルアプリケーション用に作成されたインスタンスを選択します。インスタンスが不明な場合、該当するインスタンスには KinesisDataVisSample の名前で始まるセキュリティグループがあります。

  4. [モニタリング] タブに、サンプルアプリケーションのデータプロデューサーとデータコンシューマーのリソース使用状況が表示されます。

データコンシューマー

データコンシューマーは Kinesis data stream 内のシャードからデータレコードを取得して処理します。各コンシューマーはそれぞれ特定のシャードからデータを読み取ります。コンシューマーは GetShardIterator および GetRecords オペレーションを使用してシャードからデータを取得します。

シャードイテレーターは、ストリームの位置とコンシューマーが読み取るシャードを表します。コンシューマーは、ストリームからのレコードの読み取りを開始したり、読み取り位置を変更したりするときは、このシャードイテレーターを取得します。シャードイテレーターを取得するには、ストリーム名、シャード ID、シャードイテレータ イプを提供する必要があります。シャードイテレーター型により、コンシューマーがストリームのどこから読み取りを開始するかを指定できます。たとえば、データがリアルタイムで到着する場合はストリームの先頭を指定できます。ストリームはレコードをバッチにまとめて返します。バッチのサイズは必要に応じて制限パラメータを使用して制御できます。

データコンシューマーは、アプリケーションの状態情報 (チェックポイントやワーカーシャードマッピングなど) を維持するためのテーブルを DynamoDB に作成します。各アプリケーションには、それぞれ DynamoDB テーブルがあります。

データコンシューマーは最後の 2 秒間に特定の各 URL からの閲覧者のリクエストをカウントします。このタイプのリアルタイムアプリケーションはスライディングウィンドウにわたるトップ N 分析を採用しています。この例では、上位 N 個はページリクエスト数で上位 3 つの閲覧者であり、スライディングウィンドウは 2 秒です。これは、Kinesis Data Streams を使用した実際のデータ分析を示す、一般的な処理パターンです。この計算の結果は DynamoDB テーブルに保持されます。

Amazon DynamoDB テーブルを表示するには

  1. https://console.aws.amazon.com/dynamodb/ にある DynamoDB コンソールを開きます。

  2. ナビゲーションペインで、[Tables (テーブル)] を選択します。

  3. サンプルアプリケーションによって作成された 2 つのテーブルがあります。

    • KinesisDataVisSampleApp-KCLDynamoDBTable-[randomString]— は状態情報を管理します。

    • KinesisDataVisSampleApp-CountsDynamoDBTable-[randomString]— はスライディングウィンドウにわたりトップ N 分析を持続します。

  4. Select the KinesisDataVisSampleApp-KCLDynamoDBTable-[randomString] テーブル。テーブルには 2 つのエントリがあり、特定のシャード (leaseKey)、ストリーム内の位置 (checkpoint)、データを読み取るアプリケーション (leaseOwner) を示します。

  5. Select the KinesisDataVisSampleApp-CountsDynamoDBTable-[randomString] テーブル。データコンシューマーがスライディングウィンドウ分析の一部として計算した総閲覧者数 (referrerCounts) を確認できます。

Kinesis クライアントライブラリ (KCL)

コンシューマーアプリケーションは、Kinesis Client Library (KCL) を使用して、ストリームの並列処理を簡素化できます。KCL は、分散コンピューティングに関連する多くの複雑なタスクを処理します。たとえば、複数のインスタンス間での負荷分散、インスタンスの障害に対する応答、処理済みのレコードのチェックポイント作成、リシャーディングへの対応が挙げられます。KCL によって、レコード処理のロジックの記述に集中できます。

データコンシューマーは、読み取るストリーム内の位置を KCL に渡します。この例では、ストリームの先頭からの最新のデータを読み取るように指定しています。KCL はこの位置を使用して、コンシューマーに代わって GetShardIterator を呼び出します。コンシューマーコンポーネントは、IRecordProcessor という重要な KCL インターフェイスにより、レコードに対してどのような処理を行うかも KCL に指定します。KCL はコールコンシューマーに代わって GetRecords を呼び出し、IRecordProcessor により指定されたようにそれらのレコードを処理します。

  • HttpReferrerCounterApplication サンプルコードの行 92~98 は KCL を設定します。これは、データを読み取るストリーム内の位置の設定など、KCL の初期設定になります。

  • HttpReferrerCounterApplication サンプルコードの行 104〜108 は、IRecordProcessor という重要な KCL コンポーネントを使用してレコードを処理するためのロジックを KCL に通知します。

  • CountingRecordProcessor サンプルコードの行 186 ~ 203 は、IRecordProcessor を使用するトップ N 分析のためのカウントロジックを含んでいます。

ステップ 3: サンプルアプリケーションを削除する

サンプルアプリケーションは、アプリケーションの実行中に、シャードの使用料が発生する 2 つのシャードを作成します。AWS アカウントが請求し続けないように、サンプルアプリケーションを終了したら AWS CloudFormation スタックを削除してください。

アプリケーションリソースを削除するには

  1. https://console.aws.amazon.com/cloudformation で AWS CloudFormation コンソールを開きます。

  2. スタックを選択します。

  3. [アクション]、[Delete Stack (スタックの削除)] の順に選択します。

  4. 確認を求めるメッセージが表示されたら、[Yes, Delete] を選択します。

サンプルアプリケーションに関連付けられたリソースを AWS CloudFormation クリーンアップしている間、ステータスが [DELETE_IN_PROGRESS] に変わります。AWS CloudFormation がリソースのクリーンアップを終了したら、リストからスタックを削除します。

ステップ 4: 次のステップ