Kinesis - Amazon EMR

Kinesis

Amazon EMR クラスターは、Hive、Pig、MapReduce、Hadoop Streaming API、Cascading などの Hadoop エコシステムで使い慣れたツールを使用して、Amazon Kinesis Streams を直接読み込み、処理することができます。また、実行しているクラスターで Amazon Kinesis のリアルタイムデータを Amazon S3、Amazon DynamoDB、および HDFS の既存データと結合できます。後処理のアクティビティとして、Amazon EMR から Amazon S3 または DynamoDB に直接データをロードできます。Amazon Kinesis の主なサービスと料金については、「Amazon Kinesis」を参照してください。

Amazon EMR と Amazon Kinesis の統合で何ができますか?

Amazon EMR と Amazon Kinesis の統合により、以下のようなシナリオへの対応が非常に容易になります。

  • ストリーミングログ分析 – ストリーミングのウェブログを分析して、リージョン別、ブラウザ別、およびアクセスドメイン別に、数分ごとの上位 10 件のエラータイプのリストを生成できます。

  • カスタマーエンゲージメント – Amazon Kinesis のクリックストリームデータと DynamoDB テーブルに保存されている広告キャンペーン情報を結合するクエリを作成し、特定のウェブサイトに表示される最も効果的な広告カテゴリを特定できます。

  • アドホックインタラクティブクエリ – Amazon Kinesis Streams から HDFS に定期的にデータを読み込み、ローカルの Impala テーブルとして使用可能にすることで、高速かつインタラクティブな分析クエリを実行できます。

Amazon Kinesis Streams のチェックポイントの分析

ユーザーは、Amazon Kinesis Streams の定期的なバッチ分析をいわゆる反復で実行できます。Amazon Kinesis Streams のデータレコードはシーケンス番号を使用して取得されるため、反復の境界は Amazon EMR によって DynamoDB テーブルに格納される開始と終了のシーケンス番号で定義されます。たとえば、iteration0 が終了すると、終了のシーケンス番号が DynamoDB に格納されるため、iteration1 ジョブが開始されたとき、ストリームからそれに続くデータを取得できます。このストリームデータの反復のマッピングはチェックポイントと呼ばれます。詳細については、「Kinesis コネクター」を参照してください。

反復にチェックポイントが設定された後に、ジョブの反復処理が失敗した場合、Amazon EMR では、その反復のレコード処理が再試行されます。

チェックポイントは、次のことが可能になる機能です。

  • 同じストリームと論理名で実行した前のクエリにより処理された連続番号の後でデータ処理を開始します

  • 前のクエリで処理された Kinesis のデータと同じバッチを再処理します

チェックポイントを有効にするには、スクリプトで kinesis.checkpoint.enabled パラメータを true に設定します。また、以下のパラメータを設定します。

構成設定 説明
kinesis.checkpoint.metastore.table.name チェックポイント情報が保存される DynamoDB テーブル名
kinesis.checkpoint.metastore.hash.key.name DynamoDB テーブルのハッシュキー名
kinesis.checkpoint.metastore.hash.range.name DynamoDB テーブルの範囲キー名
kinesis.checkpoint.logical.name 現在の処理の論理名
kinesis.checkpoint.iteration.no 論理名に関連付けられている処理の反復番号
kinesis.rerun.iteration.without.wait 失敗した反復がタイムアウトを待たずに再実行できるかどうかを示すブール値(デフォルトは false)。

Amazon DynamoDB テーブルのプロビジョニングされた IOPS に関する推奨事項

Amazon Kinesis 用の Amazon EMR コネクターは、チェックポイントのメタデータの補助として DynamoDB データベースを使用します。Amazon Kinesis Streams のデータをチェックポイントの間隔で Amazon EMR クラスターで使用する前に、DynamoDB のテーブルを作成する必要があります。テーブルは Amazon EMR クラスターと同じリージョンに存在する必要があります。以下は、DynamoDB テーブルにプロビジョニングする必要がある IOPS の数に関する一般的な推奨です。j は同時に実行できる Hadoop ジョブ (異なる論理名 + 反復数の組み合わせ) の最大数で、s はジョブが処理するシャードの最大数です。

読み込みキャパシティーユニット: j*s/5

書き込みキャパシティーユニット: j*s

パフォーマンスに関する考慮事項

Amazon Kinesis シャードスループットは、Amazon EMR クラスターのノードのインスタンスのサイズ、およびストリームのレコードのサイズに正比例しています。マスターノードやコアノードで m5.xlarge かそれ以上のインスタンスを使用することをお勧めします。

Amazon EMR で Amazon Kinesis 分析をスケジュールする

任意の繰り返しについてタイムアウトと最大期間で制限される、アクティブな Amazon Kinesis Streams でデータを分析するときは、ストリームから定期的に詳細を取得するために、分析を頻繁に実行することが重要です。定期的な間隔でこのようなスクリプトおよびクエリを実行する方法は複数ありますが、これらのような反復タスクには、AWS Data Pipeline を使用することをお勧めします。詳細については、「AWS Data Pipeline 開発者ガイド」の「AWS Data Pipeline PigActivity」および「AWS Data Pipeline HiveActivity」を参照してください。