AWS Data Pipeline 不再提供給新客戶。現有客戶 AWS Data Pipeline 可繼續正常使用此服務。進一步了解
本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
使用管道活動預備資料和資料表
AWS Data Pipeline 可以在您的管道中預備輸入和輸出資料,讓使用特定活動 (例如 ShellCommandActivity
和 HiveActivity
) 更為容易。
資料預備可讓您將資料從輸入資料節點複製到執行活動的資源,並且以相似的方式,從資源複製到輸出資料節點。
Amazon EMR 或 Amazon EC2 資源上的暫存資料可透過在活動的殼層命令或 Hive 指令碼中使用特殊變數來取得。
資料表預備與資料預備相似,其不同處在於其預備的資料會特別採取資料庫資料表的形式。
AWS Data Pipeline 支援下列預備案例:
-
使用
ShellCommandActivity
進行資料預備 -
使用 Hive 及支援預備的資料節點進行資料表預備
-
使用 Hive 及不支援預備的資料節點進行資料表預備
注意
預備只有在活動 (例如 ShellCommandActivity
) 上的 stage
欄位設為 true
時才能運作。如需詳細資訊,請參閱ShellCommandActivity。
此外,資料節點和活動可以透過四種方式相關:
- 在資源上於本機預備資料
-
輸入資料會自動複製到資源的本機檔案系統。輸出資料會自動從資源的本機檔案系統複製到輸出資料節點。例如,當您設定
ShellCommandActivity
輸入和輸出,並設定 staging = true 時,輸入資料可透過 INPUTx_STAGING_DIR 取得,輸出資料則可透過 OUTPUTx_STAGING_DIR 取得,其中 x 是輸入和輸出的數字。 - 活動的預備輸入及輸出定義
-
輸入資料格式 (資料行名稱和資料表名稱) 會自動複製到活動的資源。例如,當您設定
HiveActivity
,並設定 staging = true 時。輸入S3DataNode
上指定的資料格式會用來從 Hive 資料表預備資料表定義。 - 未啟用預備
-
活動可以取得輸入和輸出物件及其欄位,但無法取得資料本身。例如,根據預設的
EmrActivity
,或是當您以 staging = false 設定其他活動時。在此組態中,活動可透過使用 AWS Data Pipeline 表達式語法來取得資料欄位並參考他們,而這只會在滿足依存項目時才會發生。其用途僅只是檢查依存項目。活動中的程式碼會負責將資料從輸入複製到執行活動的資源。 - 物件之間的依存項目關係
-
兩個物件之間存在一種依存關係,這會在未啟用預備時導致類似的情況。這會使資料節點或活動做為執行另一個活動的先決條件。
資料暫存 ShellCommandActivity
考慮搭配 S3DataNode
物件做為資料輸入和輸出,使用 ShellCommandActivity
的案例。AWS Data Pipeline 會使用環境變數 ${INPUT1_STAGING_DIR}
和 ${OUTPUT1_STAGING_DIR}
自動預備資料節點,使其提供給殼層命令存取,就好像他們是本機檔案資料夾,如以下範例所示。名為 INPUT1_STAGING_DIR
和 OUTPUT1_STAGING_DIR
變數的數字部分,會根據您活動參考的資料節點數累加。
注意
此案例只有在您的資料輸入和輸出為 S3DataNode
物件時,才會以說明的方式運作。此外,只有在輸出 S3DataNode
物件上有設定 directoryPath
時,才允許輸出資料預備。
{ "id": "AggregateFiles", "type": "ShellCommandActivity", "stage": "true", "command": "cat ${INPUT1_STAGING_DIR}/part* > ${OUTPUT1_STAGING_DIR}/aggregated.csv", "input": { "ref": "MyInputData" }, "output": { "ref": "MyOutputData" } }, { "id": "MyInputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "filePath": "s3://my_bucket/source/#{format(@scheduledStartTime,'YYYY-MM-dd_HHmmss')}/items" } }, { "id": "MyOutputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://my_bucket/destination/#{format(@scheduledStartTime,'YYYY-MM-dd_HHmmss')}" } }, ...
使用 Hive 及支援預備的資料節點進行資料表預備
考慮搭配 S3DataNode
物件做為資料輸入和輸出,使用 HiveActivity
的案例。AWS Data Pipeline 會使用變數 ${input1}
和 ${output1}
自動預備資料節點,使其提供給 Hive 指令碼存取,就好像他們是 Hive 資料表,如以下 HiveActivity
範例所示。名為 input
和 output
變數的數字部分,會根據您活動參考的資料節點數累加。
注意
此案例只有在您的資料輸入和輸出為 S3DataNode
或 MySqlDataNode
物件時,才會以說明的方式運作。DynamoDBDataNode
不支援資料表預備。
{ "id": "MyHiveActivity", "type": "HiveActivity", "schedule": { "ref": "MySchedule" }, "runsOn": { "ref": "MyEmrResource" }, "input": { "ref": "MyInputData" }, "output": { "ref": "MyOutputData" }, "hiveScript": "INSERT OVERWRITE TABLE ${output1} select * from ${input1};" }, { "id": "MyInputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://test-hive/input" } }, { "id": "MyOutputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://test-hive/output" } }, ...
使用 Hive 及不支援預備的資料節點進行資料表預備
考慮搭配 DynamoDBDataNode
做為資料輸入,S3DataNode
物件做為輸出,使用 HiveActivity
的案例。沒有可用的資料暫存DynamoDBDataNode
,因此您必須先在 Hive 指令碼中使用變數名稱#{input.tableName}
來參考 DynamoDB 表格,手動建立表格。如果 DynamoDB 表是輸出,則適用類似的命名法,除非您使用變數。#{output.tableName}
預備可供此範例中的輸出 S3DataNode
物件使用,因此您可以以 ${output1}
參考輸出資料節點。
注意
在此範例中,資料表名稱變數具有 # (井字) 字元前綴,因為 AWS Data Pipeline 使用表達式來存取 tableName
或 directoryPath
。如需表達式在 AWS Data Pipeline 中評估方式的詳細資訊,請參閱表達式評估。
{ "id": "MyHiveActivity", "type": "HiveActivity", "schedule": { "ref": "MySchedule" }, "runsOn": { "ref": "MyEmrResource" }, "input": { "ref": "MyDynamoData" }, "output": { "ref": "MyS3Data" }, "hiveScript": "-- Map DynamoDB Table SET dynamodb.endpoint=dynamodb.us-east-1.amazonaws.com; SET dynamodb.throughput.read.percent = 0.5; CREATE EXTERNAL TABLE dynamodb_table (item map<string,string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "#{input.tableName}"); INSERT OVERWRITE TABLE ${output1} SELECT * FROM dynamodb_table;" }, { "id": "MyDynamoData", "type": "DynamoDBDataNode", "schedule": { "ref": "MySchedule" }, "tableName": "MyDDBTable" }, { "id": "MyS3Data", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://test-hive/output" } }, ...