パイプラインのアクティビティによるデータとテーブルのステージング - AWS Data Pipeline

パイプラインのアクティビティによるデータとテーブルのステージング

AWS Data Pipeline では、パイプラインの入出力データをステージングすることで、ShellCommandActivityHiveActivity などの特定のアクティビティの使用が容易になります。

データのステージングを使用して、入力データノードからアクティビティを実行するリソースにデータをコピーできます。同様に、リソースから出力データノードにデータをコピーすることもできます。

Amazon EMR または Amazon EC2 のリソースでステージングされたデータは、アクティビティのシェルコマンドまたは Hive スクリプトで特別な変数を用いることで使用可能です。

テーブルのステージングはデータのステージングと似ていますが、具体的には、ステージングされたデータがデータベーステーブルの形式である点が異なります。

AWS Data Pipeline では、以下のシナリオのステージングがサポートされます。

  • ShellCommandActivity によるデータのステージング

  • Hive およびステージングをサポートするデータノードによるテーブルのステージング

  • Hive およびステージングをサポートしないデータノードによるテーブルのステージング

注記

ステージングは、ShellCommandActivity などのアクティビティで stage フィールドが true に設定されている場合にのみ機能します。詳細については、「ShellCommandActivity」を参照してください。

また、データノードとアクティビティの関係には、次の 4 種類があります。

リソースでローカルにデータをステージング

入力データはリソースのローカルファイルシステムに自動的にコピーされます。出力データはリソースのローカルファイルシステムから出力データノードに自動的にコピーされます。たとえば、ShellCommandActivity の入出力で staging = true に設定すると、入力データは INPUTx_STAGING_DIR、出力データは OUTPUTx_STAGING_DIR として使用可能になります (x は入力または出力の番号です)。

アクティビティの入出力定義のステージング

入力データ形式(列名、テーブル名)がアクティビティのリソースに自動的にコピーされます。たとえば、HiveActivity で staging = true に設定する場合です。Hive テーブルからテーブル定義をステージングするために、入力 S3DataNode で指定されたデータ形式が使用されます。

ステージングが有効ではない

アクティビティで入出力オブジェクトとそのフィールドは使用できますが、データそのものは使用できません。たとえば、デフォルトが EmrActivity であるか、それ以外のアクティビティで staging = false と設定する場合です。この設定では、AWS Data Pipeline の式構文を使用してデータフィールドを参照するためにアクティビティでデータフィールドを使用できますが、これは依存関係が満たされている場合にのみ参照されます。これは、依存関係のチェックとしてのみ機能します。アクティビティを実行するリソースに対する入力からデータをコピーする責任は、アクティビティのコードにあります。

オブジェクト間の依存関係

2 個のオブジェクト間に依存関係があり、結果としてステージングが有効でない場合と同じような状況になります。このため、データノードまたはアクティビティは、別のアクティビティを実行するための前提条件として機能します。

ShellCommandActivity によるデータのステージング

ShellCommandActivity でデータ入出力として S3DataNode オブジェクトを使用するシナリオを検討します。AWS Data Pipeline はデータノードを自動的にステージングし、以下の例に示すように、それらのデータノードがローカルファイルフォルダであるかのように環境変数 ${INPUT1_STAGING_DIR} および ${OUTPUT1_STAGING_DIR} を使用してシェルコマンドにアクセスできるようにします。INPUT1_STAGING_DIR という名前の変数の数値部分と、アクティビティが参照するデータノード数に応じた OUTPUT1_STAGING_DIR 増分。

注記

このシナリオは、記述されているように、データ入出力が S3DataNode オブジェクトの場合にのみ機能します。さらに、出力データのステージングは、出力 S3DataNodedirectoryPath が設定されている場合にのみ許可されます。

{ "id": "AggregateFiles", "type": "ShellCommandActivity", "stage": "true", "command": "cat ${INPUT1_STAGING_DIR}/part* > ${OUTPUT1_STAGING_DIR}/aggregated.csv", "input": { "ref": "MyInputData" }, "output": { "ref": "MyOutputData" } }, { "id": "MyInputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "filePath": "s3://my_bucket/source/#{format(@scheduledStartTime,'YYYY-MM-dd_HHmmss')}/items" } }, { "id": "MyOutputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://my_bucket/destination/#{format(@scheduledStartTime,'YYYY-MM-dd_HHmmss')}" } }, ...

Hive およびステージングをサポートするデータノードによるテーブルのステージング

HiveActivity でデータ入出力として S3DataNode オブジェクトを使用するシナリオを検討します。AWS Data Pipeline はデータノードを自動的にステージングし、以下の例の HiveActivity に示すように、それらのデータノードが変数 ${input1}${output1} を使用する Hive テーブルであるかのように Hive スクリプトにアクセスできるようにします。input という名前の変数の数値部分と、アクティビティが参照するデータノード数に応じた output 増分。

注記

このシナリオは、記述されているように、データ入出力が S3DataNode オブジェクトまたは MySqlDataNode オブジェクトの場合にのみ機能します。テーブルのステージングは DynamoDBDataNode ではサポートされません。

{ "id": "MyHiveActivity", "type": "HiveActivity", "schedule": { "ref": "MySchedule" }, "runsOn": { "ref": "MyEmrResource" }, "input": { "ref": "MyInputData" }, "output": { "ref": "MyOutputData" }, "hiveScript": "INSERT OVERWRITE TABLE ${output1} select * from ${input1};" }, { "id": "MyInputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://test-hive/input" } }, { "id": "MyOutputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://test-hive/output" } }, ...

Hive およびステージングをサポートしないデータノードによるテーブルのステージング

HiveActivity で入力データとして DynamoDBDataNode、出力として S3DataNode を使用するシナリオを検討します。DynamoDBDataNode ではデータのステージングは使用できないため、まず、DynamoDB テーブルを参照するために変数名 #{input.tableName} を使用して、Hive スクリプト内でテーブルを手動で作成する必要があります。この DynamoDB テーブルが出力となる場合も同様の命名法が適用されますが、変数が #{output.tableName} である点が異なります。この例の出力 S3DataNode オブジェクトではステージングを使用できるため、出力データノードを ${output1} として参照できます。

注記

AWS Data Pipeline では tableName または directoryPath にアクセスするために式を使用するため、この例では、テーブル名変数に #(ハッシュ)文字プレフィックスが付きます。AWS Data Pipeline で式の評価がどのように機能するかの詳細については、「式の評価」を参照してください。

{ "id": "MyHiveActivity", "type": "HiveActivity", "schedule": { "ref": "MySchedule" }, "runsOn": { "ref": "MyEmrResource" }, "input": { "ref": "MyDynamoData" }, "output": { "ref": "MyS3Data" }, "hiveScript": "-- Map DynamoDB Table SET dynamodb.endpoint=dynamodb.us-east-1.amazonaws.com; SET dynamodb.throughput.read.percent = 0.5; CREATE EXTERNAL TABLE dynamodb_table (item map<string,string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "#{input.tableName}"); INSERT OVERWRITE TABLE ${output1} SELECT * FROM dynamodb_table;" }, { "id": "MyDynamoData", "type": "DynamoDBDataNode", "schedule": { "ref": "MySchedule" }, "tableName": "MyDDBTable" }, { "id": "MyS3Data", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://test-hive/output" } }, ...