使用管道活動預備資料和資料表 - AWS Data Pipeline

AWS Data Pipeline 不再提供給新客戶。現有客戶 AWS Data Pipeline 可繼續正常使用此服務。進一步了解

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用管道活動預備資料和資料表

AWS Data Pipeline 可以在您的管道中預備輸入和輸出資料,讓使用特定活動 (例如 ShellCommandActivityHiveActivity) 更為容易。

資料預備可讓您將資料從輸入資料節點複製到執行活動的資源,並且以相似的方式,從資源複製到輸出資料節點。

Amazon EMR 或 Amazon EC2 資源上的暫存資料可透過在活動的殼層命令或 Hive 指令碼中使用特殊變數來取得。

資料表預備與資料預備相似,其不同處在於其預備的資料會特別採取資料庫資料表的形式。

AWS Data Pipeline 支援下列預備案例:

  • 使用 ShellCommandActivity 進行資料預備

  • 使用 Hive 及支援預備的資料節點進行資料表預備

  • 使用 Hive 及不支援預備的資料節點進行資料表預備

注意

預備只有在活動 (例如 ShellCommandActivity) 上的 stage 欄位設為 true 時才能運作。如需詳細資訊,請參閱ShellCommandActivity

此外,資料節點和活動可以透過四種方式相關:

在資源上於本機預備資料

輸入資料會自動複製到資源的本機檔案系統。輸出資料會自動從資源的本機檔案系統複製到輸出資料節點。例如,當您設定 ShellCommandActivity 輸入和輸出,並設定 staging = true 時,輸入資料可透過 INPUTx_STAGING_DIR 取得,輸出資料則可透過 OUTPUTx_STAGING_DIR 取得,其中 x 是輸入和輸出的數字。

活動的預備輸入及輸出定義

輸入資料格式 (資料行名稱和資料表名稱) 會自動複製到活動的資源。例如,當您設定 HiveActivity,並設定 staging = true 時。輸入 S3DataNode 上指定的資料格式會用來從 Hive 資料表預備資料表定義。

未啟用預備

活動可以取得輸入和輸出物件及其欄位,但無法取得資料本身。例如,根據預設的 EmrActivity,或是當您以 staging = false 設定其他活動時。在此組態中,活動可透過使用 AWS Data Pipeline 表達式語法來取得資料欄位並參考他們,而這只會在滿足依存項目時才會發生。其用途僅只是檢查依存項目。活動中的程式碼會負責將資料從輸入複製到執行活動的資源。

物件之間的依存項目關係

兩個物件之間存在一種依存關係,這會在未啟用預備時導致類似的情況。這會使資料節點或活動做為執行另一個活動的先決條件。

資料暫存 ShellCommandActivity

考慮搭配 S3DataNode 物件做為資料輸入和輸出,使用 ShellCommandActivity 的案例。AWS Data Pipeline 會使用環境變數 ${INPUT1_STAGING_DIR}${OUTPUT1_STAGING_DIR} 自動預備資料節點,使其提供給殼層命令存取,就好像他們是本機檔案資料夾,如以下範例所示。名為 INPUT1_STAGING_DIROUTPUT1_STAGING_DIR 變數的數字部分,會根據您活動參考的資料節點數累加。

注意

此案例只有在您的資料輸入和輸出為 S3DataNode 物件時,才會以說明的方式運作。此外,只有在輸出 S3DataNode 物件上有設定 directoryPath 時,才允許輸出資料預備。

{ "id": "AggregateFiles", "type": "ShellCommandActivity", "stage": "true", "command": "cat ${INPUT1_STAGING_DIR}/part* > ${OUTPUT1_STAGING_DIR}/aggregated.csv", "input": { "ref": "MyInputData" }, "output": { "ref": "MyOutputData" } }, { "id": "MyInputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "filePath": "s3://my_bucket/source/#{format(@scheduledStartTime,'YYYY-MM-dd_HHmmss')}/items" } }, { "id": "MyOutputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://my_bucket/destination/#{format(@scheduledStartTime,'YYYY-MM-dd_HHmmss')}" } }, ...

使用 Hive 及支援預備的資料節點進行資料表預備

考慮搭配 S3DataNode 物件做為資料輸入和輸出,使用 HiveActivity 的案例。AWS Data Pipeline 會使用變數 ${input1}${output1} 自動預備資料節點,使其提供給 Hive 指令碼存取,就好像他們是 Hive 資料表,如以下 HiveActivity 範例所示。名為 inputoutput 變數的數字部分,會根據您活動參考的資料節點數累加。

注意

此案例只有在您的資料輸入和輸出為 S3DataNodeMySqlDataNode 物件時,才會以說明的方式運作。DynamoDBDataNode 不支援資料表預備。

{ "id": "MyHiveActivity", "type": "HiveActivity", "schedule": { "ref": "MySchedule" }, "runsOn": { "ref": "MyEmrResource" }, "input": { "ref": "MyInputData" }, "output": { "ref": "MyOutputData" }, "hiveScript": "INSERT OVERWRITE TABLE ${output1} select * from ${input1};" }, { "id": "MyInputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://test-hive/input" } }, { "id": "MyOutputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://test-hive/output" } }, ...

使用 Hive 及不支援預備的資料節點進行資料表預備

考慮搭配 DynamoDBDataNode 做為資料輸入,S3DataNode 物件做為輸出,使用 HiveActivity 的案例。沒有可用的資料暫存DynamoDBDataNode,因此您必須先在 Hive 指令碼中使用變數名稱#{input.tableName}來參考 DynamoDB 表格,手動建立表格。如果 DynamoDB 表是輸出,則適用類似的命名法,除非您使用變數。#{output.tableName}預備可供此範例中的輸出 S3DataNode 物件使用,因此您可以以 ${output1} 參考輸出資料節點。

注意

在此範例中,資料表名稱變數具有 # (井字) 字元前綴,因為 AWS Data Pipeline 使用表達式來存取 tableNamedirectoryPath。如需表達式在 AWS Data Pipeline 中評估方式的詳細資訊,請參閱表達式評估

{ "id": "MyHiveActivity", "type": "HiveActivity", "schedule": { "ref": "MySchedule" }, "runsOn": { "ref": "MyEmrResource" }, "input": { "ref": "MyDynamoData" }, "output": { "ref": "MyS3Data" }, "hiveScript": "-- Map DynamoDB Table SET dynamodb.endpoint=dynamodb.us-east-1.amazonaws.com; SET dynamodb.throughput.read.percent = 0.5; CREATE EXTERNAL TABLE dynamodb_table (item map<string,string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "#{input.tableName}"); INSERT OVERWRITE TABLE ${output1} SELECT * FROM dynamodb_table;" }, { "id": "MyDynamoData", "type": "DynamoDBDataNode", "schedule": { "ref": "MySchedule" }, "tableName": "MyDDBTable" }, { "id": "MyS3Data", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://test-hive/output" } }, ...