Preparar dados e tabelas com atividades de pipeline - AWS Data Pipeline

Preparar dados e tabelas com atividades de pipeline

O AWS Data Pipeline pode preparar dados de entrada e saída nos pipelines para facilitar o uso de determinadas atividades, como ShellCommandActivity e HiveActivity.

A preparação de dados permite a você copiar os dados do nó de dados de entrada para o recurso que executa a atividade e, de maneira semelhante, do recurso para o nó de dados de saída.

Os dados preparados no recurso Amazon EMR ou Amazon EC2 são disponibilizados usando variáveis especiais nos comandos shell da atividade ou nos scripts do Hive.

A preparação da tabela é semelhante à preparação dos dados, exceto pelos dados preparados assumirem a forma de tabelas de banco de dados, mais especificamente.

O AWS Data Pipeline oferece suporte aos seguintes cenários de preparação:

  • Preparação de dados com ShellCommandActivity

  • Preparação da tabela com Hive e nós de dados compatíveis com preparação

  • Preparação da tabela com Hive e nós de dados incompatíveis com preparação

nota

A preparação funciona somente quando o campo stage é definido como true em uma atividade, como ShellCommandActivity. Para obter mais informações, consulte ShellCommandActivity.

Além disso, os nós de dados e as atividades podem estar relacionados de quatro maneiras:

Preparar dados localmente em um recurso

Os dados de entrada são copiados automaticamente para o sistema de arquivos local. Os dados de saída são copiados automaticamente do sistema de arquivos local para o nó de dados de saída. Por exemplo, quando você configura entradas e saídas ShellCommandActivity com staging = true, os dados de entrada são disponibilizados como INPUTx_STAGING_DIR e os dados de saída são disponibilizados como OUTPUTx_STAGING_DIR, em que x é o número de entrada ou saída.

Preparar definições de entrada e saída para uma atividade

O formato de dados de entrada (nomes de coluna e de tabela) é copiado automaticamente para o recurso da atividade. Por exemplo, quando você configura HiveActivity com staging = true. O formato de dados especificado na entrada S3DataNode é usado para preparar a definição da tabela Hive.

Preparação não ativada

Os objetos de entrada e saída e os campos estão disponíveis para a atividade, mas os dados não. Por exemplo, EmrActivity por padrão, ou quando você configura outras atividades com staging = false. Nessa configuração, os campos de dados estão disponíveis para a atividade fazer uma referência a eles usando a sintaxe da expressão do AWS Data Pipeline, e isso ocorre somente quando a dependência é atendida. Isso funciona somente como verificação de dependência. O código na atividade é responsável por copiar os dados da entrada para o recurso que executa a atividade.

Relação de dependência entre objetos

Existe uma relação de dependência entre dois objetos, o que resulta em uma situação semelhante a quando a preparação não está ativada. Isso faz um nó de dados ou uma atividade funcionar como uma precondição para a execução de outra atividade.

Preparação de dados com ShellCommandActivity

Considere um cenário que use um ShellCommandActivity com objetos S3DataNode como entrada e saída de dados. O AWS Data Pipeline prepara automaticamente os nós de dados para torná-los acessíveis ao comando shell como se fossem pastas de arquivos locais usando as variáveis de ambiente ${INPUT1_STAGING_DIR} e ${OUTPUT1_STAGING_DIR}, conforme mostrado no exemplo a seguir. A parte numérica das variáveis chamadas INPUT1_STAGING_DIR e OUTPUT1_STAGING_DIR é incrementada dependendo do número de nós de dados e das referências de atividade.

nota

Esse cenário funcionará somente conforme descrito se as entradas e as saídas de dados forem objetos S3DataNode. Além disso, a preparação de dados de saída é permitida somente quando directoryPath é definido no objeto S3DataNode de saída.

{ "id": "AggregateFiles", "type": "ShellCommandActivity", "stage": "true", "command": "cat ${INPUT1_STAGING_DIR}/part* > ${OUTPUT1_STAGING_DIR}/aggregated.csv", "input": { "ref": "MyInputData" }, "output": { "ref": "MyOutputData" } }, { "id": "MyInputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "filePath": "s3://my_bucket/source/#{format(@scheduledStartTime,'YYYY-MM-dd_HHmmss')}/items" } }, { "id": "MyOutputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://my_bucket/destination/#{format(@scheduledStartTime,'YYYY-MM-dd_HHmmss')}" } }, ...

Preparação da tabela com Hive e nós de dados compatíveis com preparação

Considere um cenário que use um HiveActivity com objetos S3DataNode como entrada e saída de dados. O AWS Data Pipeline prepara automaticamente os nós de dados para torná-los acessíveis ao script do Hive como se fossem tabelas do Hive usando as variáveis de ambiente ${input1} e ${output1}, conforme mostrado no exemplo a seguir para HiveActivity. A parte numérica das variáveis chamadas input e output é incrementada dependendo do número de nós de dados e das referências de atividade.

nota

Esse cenário funcionará somente conforme descrito se as entradas e as saídas de dados forem objetos S3DataNode ou MySqlDataNode. A preparação de tabelas não é compatível com DynamoDBDataNode.

{ "id": "MyHiveActivity", "type": "HiveActivity", "schedule": { "ref": "MySchedule" }, "runsOn": { "ref": "MyEmrResource" }, "input": { "ref": "MyInputData" }, "output": { "ref": "MyOutputData" }, "hiveScript": "INSERT OVERWRITE TABLE ${output1} select * from ${input1};" }, { "id": "MyInputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://test-hive/input" } }, { "id": "MyOutputData", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://test-hive/output" } }, ...

Preparação da tabela com Hive e nós de dados incompatíveis com preparação

Considere um cenário que use um HiveActivity com DynamoDBDataNode como entrada de dados e um objeto S3DataNode como a saída. Nenhuma preparação de dados está disponível para DynamoDBDataNode. Por isso, você deve primeiro criar manualmente a tabela dentro do script do Hive usando o nome da variável #{input.tableName} para referenciar a tabela do DynamoDB. Uma nomenclatura semelhante se aplicará se a tabela do DynamoDB for a saída, exceto se você usar a variável #{output.tableName}. A preparação está disponível para o objeto S3DataNode de saída neste exemplo. Por isso, você pode se referir ao nó de dados de saída como ${output1}.

nota

Neste exemplo, a variável do nome da tabela tem o prefixo de caractere # (hash), pois o AWS Data Pipeline usa expressões para acessar o tableName ou directoryPath. Para obter mais informações sobre como a avaliação da expressão funciona no AWS Data Pipeline, consulte Avaliação de expressões.

{ "id": "MyHiveActivity", "type": "HiveActivity", "schedule": { "ref": "MySchedule" }, "runsOn": { "ref": "MyEmrResource" }, "input": { "ref": "MyDynamoData" }, "output": { "ref": "MyS3Data" }, "hiveScript": "-- Map DynamoDB Table SET dynamodb.endpoint=dynamodb.us-east-1.amazonaws.com; SET dynamodb.throughput.read.percent = 0.5; CREATE EXTERNAL TABLE dynamodb_table (item map<string,string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "#{input.tableName}"); INSERT OVERWRITE TABLE ${output1} SELECT * FROM dynamodb_table;" }, { "id": "MyDynamoData", "type": "DynamoDBDataNode", "schedule": { "ref": "MySchedule" }, "tableName": "MyDDBTable" }, { "id": "MyS3Data", "type": "S3DataNode", "schedule": { "ref": "MySchedule" }, "directoryPath": "s3://test-hive/output" } }, ...