CopyActivity - AWS Data Pipeline

CopyActivity

​ある場所から別の場所にデータをコピーします。CopyActivity は入出力として S3DataNodeSqlDataNode をサポートし、コピーオペレーションは通常、レコード単位で実行されます。ただし、以下のすべての条件が満たされた場合、CopyActivity は Amazon S3 から Amazon S3 への高速コピーを実行します。

  • 入力と出力が S3DataNodes であること

  • dataFormat フィールドが入力と出力で同じであること

入力として圧縮データファイルを指定した場合に、それを S3 データノードの compression フィールドを使用して示していなければ、CopyActivity は失敗することがあります。この場合、CopyActivity がレコード終了文字を適切に検出できないために、操作が失敗しています。さらに、CopyActivity では、ディレクトリから別のディレクトリへのコピー、および、ファイルのディレクトリへのコピーがサポートされますが、ディレクトリをファイルにコピーする際はレコード単位のコピーが実行されます。また、CopyActivity では、マルチパートの Amazon S3 ファイルのコピーはサポートされません。

CopyActivity には CSV サポートに関して特定の制限があります。CopyActivity の入力として S3DataNode を使用するとき、Amazon S3 入出力フィールドに使用できるのは、Unix/Linux の CSV データファイル形式のみです。この Unix/Linux 形式では、次の条件が満たされる必要があります。

  • 区切り文字はカンマ(,)文字です。

  • レコードが引用符で囲まれることはありません。

  • デフォルトのエスケープ文字は、ASCII 値 92(バックスラッシュ)です。

  • レコード終了識別子は、ASCII 値 10("\n")です。

Windows ベースのシステムでは、通常、復帰と改行の連続(ASCII 値 13 および ASCII 値 10)という別のレコード終了文字が使用されています。入力データを変更するコピー前スクリプトなどを利用して、この違いを吸収し、CopyActivity が適切にレコード終端を検出できるようにする必要があります。そうしないと、CopyActivity が繰り返し失敗します。

CopyActivity を使用して PostgreSQL RDS オブジェクトから TSV データ形式にエクスポートする場合、デフォルトの NULL 文字は \n です。

以下は、このオブジェクト型の例です。このオブジェクトは、同じパイプライン定義ファイルで定義した他のオブジェクトを 3 つ参照します。CopyPeriodSchedule オブジェクトで、InputDataOutputData はデータノードオブジェクトです。

{ "id" : "S3ToS3Copy", "type" : "CopyActivity", "schedule" : { "ref" : "CopyPeriod" }, "input" : { "ref" : "InputData" }, "output" : { "ref" : "OutputData" }, "runsOn" : { "ref" : "MyEc2Resource" } }

Syntax

オブジェクト呼び出しフィールド 説明 スロットタイプ
schedule このオブジェクトは、スケジュール期間の実行中に呼び出されます。ユーザーは、このオブジェクトの依存関係の実行順序を設定するには、別のオブジェクトへのスケジュール参照を指定する必要があります。ユーザーは、オブジェクトでスケジュールを明示的に設定して、この要件を満たすことができます。たとえば、"schedule": {"ref": "DefaultSchedule"} と指定します。ほとんどの場合、すべてのオブジェクトがそのスケジュールを継承するように、スケジュール参照をデフォルトのパイプラインオブジェクトに配置することをお勧めします。または、パイプラインにスケジュールのツリー (マスタースケジュール内のスケジュール) がある場合、ユーザーは、スケジュール参照がある親オブジェクトを作成することができます。オプションのスケジュール設定の例については、「https://docs.aws.amazon.com/datapipeline/latest/DeveloperGuide/dp-object-schedule.html」を参照してください。 参照オブジェクト ("schedule":{"ref":"myScheduleId"} など)
必須のグループ (次のいずれかが必要です) 説明 スロットタイプ
runsOn アクティビティまたはコマンドを実行するコンピューティングリソース。たとえば、Amazon EC2 インスタンスまたは Amazon EMR クラスター。 参照オブジェクト ("runsOn":{"ref":"myResourceId"} など)
workerGroup ワーカーグループ。これはルーティングタスクに使用されます。runsOn 値を指定して、workerGroup がある場合、workerGroup は無視されます。 文字列

オプションのフィールド 説明 スロットタイプ
attemptStatus リモートアクティビティから最も最近報告されたステータス。 文字列
attemptTimeout リモートの作業完了のタイムアウト。設定された場合、設定された開始時間内に完了しなかったリモートアクティビティを再試行することができます。 [Period] (期間)
dependsOn 実行可能な別のオブジェクトで依存関係を指定します。 参照オブジェクト ("dependsOn":{"ref":"myActivityId"} など)
failureAndRerunMode 依存関係が失敗または再実行されたときのコンシューマーノードの動作を示します。 一覧表
input 入力データソース。 参照オブジェクト ("input":{"ref":"myDataNodeId"} など)
lateAfterTimeout オブジェクトが完了しなければならない、パイプライン開始からの経過時間。スケジュールタイプが ondemand に設定されていない場合にのみトリガーされます。 [Period] (期間)
maxActiveInstances コンポーネントで同時にアクティブになるインスタンスの最大数。再実行はアクティブなインスタンスの数にはカウントされません。 整数
maximumRetries 失敗時の最大再試行回数 整数
onFail 現在のオブジェクトが失敗したときに実行するアクション。 参照オブジェクト (onFail:{"ref":"myActionId"} など)
onLateAction オブジェクトが予定されていないか、まだ完了していない場合にトリガーされるアクション。 参照オブジェクト ("onLateAction":{"ref":"myActionId"} など)
onSuccess 現在のオブジェクトが成功したときに実行するアクション。 参照オブジェクト ("onSuccess":{"ref":"myActionId"} など)
output 出力データソース。 参照オブジェクト ("output":{"ref":"myDataNodeId"} など)
parent スロットの継承元となる現在のオブジェクトの親。 参照オブジェクト ("parent":{"ref":"myBaseObjectId"} など)
pipelineLogUri パイプラインのログをアップロードするための S3 URI (s3://BucketName/Key/ など)。 文字列
precondition オプションで前提条件を定義します。すべての前提条件を満たすまで、データノードは "READY" とマークされません。 参照オブジェクト ("precondition":{"ref":"myPreconditionId"} など)
reportProgressTimeout reportProgress へのリモート作業の連続した呼び出しのタイムアウト。設定された場合、指定された期間の進捗状況を報告しないリモートアクティビティは停止されたと見なし、再試行できます。 [Period] (期間)
retryDelay 2 回の再試行の間のタイムアウト期間。 [Period] (期間)
scheduleType スケジュールタイプによって、パイプライン定義のオブジェクトを、期間の最初にスケジュールするか、最後にスケジュールするかを指定できます。[Time Series Style Scheduling] は、インスタンスが各間隔の最後にスケジュールされることを意味し、[Cron Style Scheduling] は、インスタンスが各間隔の最初にスケジュールされることを意味します。オンデマンドスケジュールにより、アクティベーションごとに 1 回パイプラインを実行することができます。つまり、パイプラインを再実行するために、クローンしたり再作成したりする必要はありません。オンデマンドスケジュールを使用する場合は、デフォルトオブジェクトで指定し、パイプラインのオブジェクトに対して指定される唯一の scheduleType である必要があります。オンデマンドパイプラインを使用するには、それ以降の実行ごとに、ActivatePipeline オペレーションを呼び出すだけです。値は、cron、ondemand、および timeseries です。 一覧表

実行時フィールド 説明 スロットタイプ
@activeInstances 現在スケジュールされているアクティブなインスタンスオブジェクトのリスト。 参照オブジェクト ("activeInstances":{"ref":"myRunnableObjectId"} など)
@actualEndTime このオブジェクトの実行が終了した時刻。 DateTime
@actualStartTime このオブジェクトの実行が開始された時刻。 DateTime
cancellationReason このオブジェクトがキャンセルされた場合の cancellationReason。 文字列
@cascadeFailedOn オブジェクトが失敗した際の依存関係チェーンの説明。 参照オブジェクト ("cascadeFailedOn":{"ref":"myRunnableObjectId"} など)
emrStepLog EMR アクティビティの試行でのみ使用可能な EMR ステップログ 文字列
errorId このオブジェクトが失敗した場合は errorId。 文字列
errorMessage このオブジェクトが失敗した場合は errorMessage。 文字列
errorStackTrace このオブジェクトが失敗した場合は、エラースタックトレース。 文字列
@finishedTime このオブジェクトが実行を終了した時刻。 DateTime
hadoopJobLog EMR ベースのアクティビティで試みることができる Hadoop ジョブのログ。 文字列
@healthStatus 終了状態に達した最後のオブジェクトインスタンスの成功または失敗を反映する、オブジェクトのヘルスステータス。 文字列
@healthStatusFromInstanceId 終了状態に達した最後のインスタンスオブジェクトの ID。 文字列
@healthStatusUpdatedTime ヘルス状態が最後に更新された時間。 DateTime
hostname タスクの試行を取得したクライアントのホスト名。 文字列
@lastDeactivatedTime このオブジェクトが最後に非アクティブ化された時刻。 DateTime
@latestCompletedRunTime 実行が完了した最後の実行の時刻。 DateTime
@latestRunTime 実行がスケジュールされた最後の実行の時刻。 DateTime
@nextRunTime 次回にスケジュールされた実行の時刻。 DateTime
reportProgressTime リモートアクティビティで進捗状況が報告された最新の時刻。 DateTime
@scheduledEndTime オブジェクトの予定された終了時刻 DateTime
@scheduledStartTime オブジェクトの予定された開始時刻 DateTime
@status このオブジェクトのステータス。 文字列
@version オブジェクトが作成されたパイプラインのバージョン。 文字列
@waitingOn このオブジェクトが待機している依存関係のリストの説明。 参照オブジェクト ("waitingOn":{"ref":"myRunnableObjectId"} など)

システムフィールド 説明 スロットタイプ
@error 形式が正しくないオブジェクトを説明するエラー 文字列
@pipelineId このオブジェクトが属するパイプラインの ID 文字列
@sphere オブジェクトの球は、ライフサイクルにおける場所を示します。コンポーネントオブジェクトにより、試行オブジェクトを実行するインスタンスオブジェクトが発生します 文字列

以下の資料も参照してください。