ストリーミングでのデータ処理 - Amazon EMR

ストリーミングでのデータ処理

Hadoop ストリーミングは、Hadoop に付属しているユーティリティの 1 つです。Java 以外の言語で MapReduce 実行可能ファイルを作成できるようになります。JAR ファイルの形式でストリーミングが実装されるため、Amazon EMR API から、またはコマンドラインから標準のJAR ファイルの用に実行できます。

このセクションでは、Amazon EMR でストリーミングを使用する方法について説明します。

注記

Apache Hadoop Streaming は独立したツールです。そのため、ここでは、その関数とパラメータすべてを説明するわけではありません。Hadoop Streaming の詳細については、http://hadoop.apache.org/docs/stable/hadoop-streaming/HadoopStreaming.html を参照してください。

Hadoop ストリーミングユーティリティの使用

このセクションでは、Hadoop のストリーミングユーティリティの使用方法について説明します。

1

自由にプログラム言語を選択して、実行可能なマッパーとリデューサーを作成します。

Hadoop のドキュメントの指示に従ってストリーミング実行ファイルを書いてください。プログラムは、標準出力による標準入出力データからの入力を読み取れるようにする必要があります。デフォルトでは、入出力の各行は 1 つのレコードに対応し、各行の最初のタブはキーと値を区切ります。

2

ローカルで実行ファイルをテストしてから、Amazon S3 へアップロードします。

3

Amazon EMR コマンドラインインターフェイスまたは Amazon EMR コンソールを使用してアプリケーションを実行します。

各マッパースクリプトは、 クラスター内の独立したプロセスとして実行されます。各リデューサー実行ファイルは、マッパー実行ファイルの出力をジョブフローによるデータ出力にします。

inputoutputmapper、および reducer パラメータは、ほとんどのストリーミングアプリケーションで必要とされます。以下の表は、これらのパラメータと他のオプションパラメータついて説明します。

パラメータ 説明 必須
-input

Amazon S3 上の入力データの場所。

型: 文字列

デフォルト: なし

制約: URI。プロトコルの指定がなければ、クラスターのデフォルトファイルシステムが使用されます。

Yes
-output

Amazon EMR が処理されたデータをアップロードする Amazon S3 上の場所。

型: 文字列

デフォルト: なし

制約: URI

デフォルト: 場所の指定がなければ、Amazon EMR は input で指定された場所にデータをアップロードします。

Yes
-mapper

マッパー実行ファイルの名前。

型: 文字列

デフォルト: なし

Yes
-reducer

リデューサー実行ファイルの名前。

型: 文字列

デフォルト: なし

Yes
-cacheFile

(主としてパフォーマンス向上のために)Hadoop がローカル作業ディレクトリにコピーするファイルが格納されている Amazon S3 の場所。

型: 文字列

デフォルト: なし

制約: [URI]#[作業ディレクトリに作成するシンボリックリンク名]

No
-cacheArchive

作業ディレクトリに抽出する JAR ファイル

型: 文字列

デフォルト: なし

制約: [URI]#[作業ディレクトリに作成するシンボリックリンクディレクトリ名

No
-combiner

結果の結合

型: 文字列

デフォルト: なし

制約: Java クラス名

No

次のコード例は、Python で作成されたマッパーの実行可能ファイルです。このスクリプトは WordCount サンプルアプリケーションの一部です。

#!/usr/bin/python import sys def main(argv): line = sys.stdin.readline() try: while line: line = line.rstrip() words = line.split() for word in words: print "LongValueSum:" + word + "\t" + "1" line = sys.stdin.readline() except "end of file": return None if __name__ == "__main__": main(sys.argv)