RedshiftCopyActivity - AWS Data Pipeline

RedshiftCopyActivity

DynamoDB または Amazon S3 から Amazon Redshift にデータをコピーします。新しいテーブルにデータをロードすることも、既存のテーブルにデータを簡単にマージすることもできます。

以下に、RedshiftCopyActivity を使用するユースケースの概要を示します。

  1. AWS Data Pipeline を使用して Amazon S3 でデータをステージングすることで開始します。

  2. RedshiftCopyActivity を使用して、Amazon RDS および Amazon EMR から Amazon Redshift にデータを移動します。

    これにより、Amazon Redshift にデータをロードして分析を行うことができます。

  3. SqlActivity を使用して、Amazon Redshift にロードしたデータに対する SQL クエリを実行します。

さらに、RedshiftCopyActivity はマニフェストファイルをサポートするため、S3DataNode を操作できます。詳細については、「S3DataNode」を参照してください。

以下は、このオブジェクト型の例です。

この例では、変換形式を確認するために EMPTYASNULL および IGNOREBLANKLINES 特殊変換パラメータを commandOptions で使用します。詳細については、Amazon Redshift データベース開発者ガイドデータ変換パラメータを参照してください。

{ "id" : "S3ToRedshiftCopyActivity", "type" : "RedshiftCopyActivity", "input" : { "ref": "MyS3DataNode" }, "output" : { "ref": "MyRedshiftDataNode" }, "insertMode" : "KEEP_EXISTING", "schedule" : { "ref": "Hour" }, "runsOn" : { "ref": "MyEc2Resource" }, "commandOptions": ["EMPTYASNULL", "IGNOREBLANKLINES"] }

以下のパイプライン定義の例では、APPEND 挿入モードを使用するアクティビティを示しています。

{ "objects": [ { "id": "CSVId1", "name": "DefaultCSV1", "type": "CSV" }, { "id": "RedshiftDatabaseId1", "databaseName": "dbname", "username": "user", "name": "DefaultRedshiftDatabase1", "*password": "password", "type": "RedshiftDatabase", "clusterId": "redshiftclusterId" }, { "id": "Default", "scheduleType": "timeseries", "failureAndRerunMode": "CASCADE", "name": "Default", "role": "DataPipelineDefaultRole", "resourceRole": "DataPipelineDefaultResourceRole" }, { "id": "RedshiftDataNodeId1", "schedule": { "ref": "ScheduleId1" }, "tableName": "orders", "name": "DefaultRedshiftDataNode1", "createTableSql": "create table StructuredLogs (requestBeginTime CHAR(30) PRIMARY KEY DISTKEY SORTKEY, requestEndTime CHAR(30), hostname CHAR(100), requestDate varchar(20));", "type": "RedshiftDataNode", "database": { "ref": "RedshiftDatabaseId1" } }, { "id": "Ec2ResourceId1", "schedule": { "ref": "ScheduleId1" }, "securityGroups": "MySecurityGroup", "name": "DefaultEc2Resource1", "role": "DataPipelineDefaultRole", "logUri": "s3://myLogs", "resourceRole": "DataPipelineDefaultResourceRole", "type": "Ec2Resource" }, { "id": "ScheduleId1", "startDateTime": "yyyy-mm-ddT00:00:00", "name": "DefaultSchedule1", "type": "Schedule", "period": "period", "endDateTime": "yyyy-mm-ddT00:00:00" }, { "id": "S3DataNodeId1", "schedule": { "ref": "ScheduleId1" }, "filePath": "s3://datapipeline-us-east-1/samples/hive-ads-samples.csv", "name": "DefaultS3DataNode1", "dataFormat": { "ref": "CSVId1" }, "type": "S3DataNode" }, { "id": "RedshiftCopyActivityId1", "input": { "ref": "S3DataNodeId1" }, "schedule": { "ref": "ScheduleId1" }, "insertMode": "APPEND", "name": "DefaultRedshiftCopyActivity1", "runsOn": { "ref": "Ec2ResourceId1" }, "type": "RedshiftCopyActivity", "output": { "ref": "RedshiftDataNodeId1" } } ] }

APPEND オペレーションは、プライマリキーまたはソートキーにかかわらず、テーブルに項目を追加します。たとえば、以下のテーブルがあるとすると、同じ ID とユーザー値のレコードを追加できます。

ID(PK) USER 1 aaa 2 bbb

以下のように、同じ ID とユーザー値のレコードを追加できます。

ID(PK) USER 1 aaa 2 bbb 1 aaa
注記

APPEND オペレーションが中断されて再試行される場合、結果として再実行されるパイプラインは、最初から追加される可能性があります。これにより、重複するレコードが追加される可能性があるため、行数をカウントするロジックがある場合は、この動作に注意する必要があります。

チュートリアルについては、「AWS Data Pipeline を使用した Amazon Redshift へのデータのコピー」を参照してください。

Syntax

必須フィールド 説明 スロットタイプ
insertMode

ターゲットテーブルの既存データが、ロードするデータ行と重複している場合、AWS Data Pipeline がどのように処理するかを決定します。

有効な値は、KEEP_EXISTINGOVERWRITE_EXISTINGTRUNCATEAPPEND です。

KEEP_EXISTING を指定すると、既存の行を変更することなく、新しい行がテーブルに追加されます。

KEEP_EXISTING および OVERWRITE_EXISTING はプライマリキー、ソート、およびディストリビューションキーを使用して、既存の行と一致する受信行を識別します。Amazon Redshift データベース開発者ガイド新しいデータの更新と挿入を参照してください。

TRUNCATE は、ターゲットテーブルのデータをすべて削除した後、新しいデータを書き込みます。

APPEND は Redshift テーブルの末尾にすべてのレコードを追加します。APPEND にはプライマリキーも、分散キーも、ソートキーも不要なため、重複する項目が追加される可能性があります。

一覧表

オブジェクト呼び出しフィールド 説明 スロットタイプ
schedule

このオブジェクトは、スケジュール期間の実行中に呼び出されます。

このオブジェクトの依存関係の実行順序を設定するために、別のオブジェクトへのスケジュール参照を指定します。

ほとんどの場合、すべてのオブジェクトがそのスケジュールを継承するように、スケジュール参照をデフォルトのパイプラインオブジェクトに配置することをお勧めします。たとえば、"schedule": {"ref": "DefaultSchedule"} を指定することで、オブジェクトでスケジュールを明示的に設定できます。

パイプラインのマスタースケジュールにネストされたスケジュールがある場合、スケジュール参照がある親オブジェクトを作成します。

オプションのスケジュール設定の例については、「スケジュール」を参照してください。

参照オブジェクト ( "schedule":{"ref":"myScheduleId"} など)

必須のグループ (次のいずれかが必要です) 説明 スロットタイプ
runsOn アクティビティまたはコマンドを実行するコンピューティングリソース。たとえば、Amazon EC2 インスタンスまたは Amazon EMR クラスター。 参照オブジェクト ("runsOn":{"ref":"myResourceId"} など)
workerGroup ワーカーグループ。これはルーティングタスクに使用されます。runsOn 値を指定して、workerGroup がある場合、workerGroup は無視されます。 文字列

オプションのフィールド 説明 スロットタイプ
attemptStatus リモートアクティビティから最も最近報告されたステータス。 文字列
attemptTimeout リモートの作業完了のタイムアウト。設定された場合、設定された開始時間内に完了しなかったリモートアクティビティを再試行することができます。 [Period] (期間)
commandOptions

COPY オペレーションの実行時に Amazon Redshift データノードに渡すパラメータを受け取ります。パラメータの詳細については、Amazon Redshift データベース開発者ガイドCOPY を参照してください。

テーブルをロードする際に、COPY は暗黙的に文字列をターゲット列のデータ型に変換しようとします。自動的に発生するデフォルトのデータ変換に加えて、エラーが表示される場合や他の変換を必要とする場合は、追加の変換パラメータを指定できます。詳細については、Amazon Redshift データベース開発者ガイドデータ変換パラメータを参照してください。

入力データノードまたは出力データノードにデータ形式が関連付けられている場合、指定されたパラメータは無視されます。

コピーオペレーションがまず COPY を使用して、ステージングテーブルにデータを挿入してから、INSERT コマンドを使用して、ステージングテーブルからターゲットテーブルにデータをコピーするため、COPY の一部のパラメータは適用されません。たとえば、COPY コマンドでテーブルの自動圧縮を有効にするパラメーターは適用されません。圧縮が必要な場合は、CREATE TABLE ステートメントに列エンコードの詳細を追加します。

また、場合によっては、Amazon Redshift クラスターからデータをアンロードし、Amazon S3 でファイルを作成する必要があるときに、RedshiftCopyActivity は Amazon Redshift からの UNLOAD オペレーションに依存します。

コピーおよびアンロード中のパフォーマンスを向上させるには、UNLOAD コマンドで PARALLEL OFF パラメータを指定します。パラメータの詳細については、Amazon Redshift データベース開発者ガイドUNLOAD を参照してください。

文字列
dependsOn 実行可能な別のオブジェクトで依存関係を指定します。 参照オブジェクト: "dependsOn":{"ref":"myActivityId"}
failureAndRerunMode 依存関係が失敗または再実行されたときのコンシューマーノードの動作を示します。 一覧表
input 入力データノード。データソースは、Amazon S3、DynamoDB、または Amazon Redshift を使用できます。 参照オブジェクト: "input":{"ref":"myDataNodeId"}
lateAfterTimeout オブジェクトが完了しなければならない、パイプライン開始からの経過時間。スケジュールタイプが ondemand に設定されていない場合にのみトリガーされます。 [Period] (期間)
maxActiveInstances コンポーネントで同時にアクティブになるインスタンスの最大数。再実行はアクティブなインスタンスの数にはカウントされません。 整数
maximumRetries 失敗時の最大再試行回数 整数
onFail 現在のオブジェクトが失敗したときに実行するアクション。 参照オブジェクト: "onFail":{"ref":"myActionId"}
onLateAction オブジェクトが予定されていないか、まだ完了していない場合にトリガーされるアクション。 参照オブジェクト: "onLateAction":{"ref":"myActionId"}
onSuccess 現在のオブジェクトが成功したときに実行するアクション。 参照オブジェクト: "onSuccess":{"ref":"myActionId"}
output 出力データノード。出力場所は、Amazon S3 または Amazon Redshift を使用できます。 参照オブジェクト: "output":{"ref":"myDataNodeId"}
parent スロットの継承元となる現在のオブジェクトの親。 参照オブジェクト: "parent":{"ref":"myBaseObjectId"}
pipelineLogUri パイプラインのログをアップロードするための S3 URI (s3://BucketName/Key/ など)。 文字列
precondition オプションで前提条件を定義します。すべての前提条件を満たすまで、データノードは "READY" とマークされません。 参照オブジェクト: "precondition":{"ref":"myPreconditionId"}
キュー

同時発生した複数アクティビティの割り当てと優先順位付けをキュー内の位置に基づいて行うことができる、Amazon Redshift の query_group 設定に相当します。

Amazon Redshift では、同時接続数が 15 に制限されています。詳細については、Amazon RDS データベース開発者ガイドキューへのクエリの割り当てを参照してください。

文字列
reportProgressTimeout

reportProgress へのリモート作業の連続した呼び出しのタイムアウト。

設定された場合、指定された期間の進捗状況を報告しないリモートアクティビティは停止されたと見なし、再試行できます。

[Period] (期間)
retryDelay 2 回の再試行の間のタイムアウト期間。 [Period] (期間)
scheduleType

パイプライン内のオブジェクトのスケジュールを指定できます。値は、cronondemand、および timeseries です。

timeseries スケジューリングは、インスタンスが各間隔の最後にスケジュールされることを意味します。

Cron スケジューリングは、インスタンスが各間隔の最初にスケジュールされることを意味します。

ondemand スケジュールにより、アクティベーションごとに 1 回パイプラインを実行することができます。つまり、パイプラインを再実行するために、クローンしたり再作成したりする必要はありません。

ondemand パイプラインを使用するには、それ以降の実行ごとに、ActivatePipeline オペレーションを呼び出します。

ondemand スケジュールを使用する場合は、デフォルトオブジェクトで指定し、パイプラインのオブジェクトに対して指定される唯一の scheduleType である必要があります。

一覧表
transformSql

入力データの変換に使用される SQL SELECT 式。

transformSql 式を staging という名前のテーブルで実行します。

DynamoDB または Amazon S3 からデータをコピーすると、AWS Data Pipeline によって「staging」という名前のテーブルが作成され、データがあらかじめロードされます。このテーブルのデータは、ターゲットテーブルの更新に使用されます。

transformSql の出力スキーマは最終的なターゲットテーブルのスキーマと一致する必要があります。

transformSql オプションを指定した場合は、指定の SQL ステートメントから 2 番目のステージングテーブルが作成されます。この 2 番目のステージングテーブルからのデータが、最終的なターゲットテーブルで更新されます。

文字列

実行時フィールド 説明 スロットタイプ
@activeInstances 現在スケジュールされているアクティブなインスタンスオブジェクトのリスト。 参照オブジェクト: "activeInstances":{"ref":"myRunnableObjectId"}
@actualEndTime このオブジェクトの実行が終了した時刻。 DateTime
@actualStartTime このオブジェクトの実行が開始された時刻。 DateTime
cancellationReason このオブジェクトがキャンセルされた場合の cancellationReason。 文字列
@cascadeFailedOn オブジェクトが失敗した際の依存関係チェーンの説明。 参照オブジェクト: "cascadeFailedOn":{"ref":"myRunnableObjectId"}
emrStepLog EMR アクティビティの試行でのみ使用可能な EMR ステップログ 文字列
errorId このオブジェクトが失敗した場合は errorId。 文字列
errorMessage このオブジェクトが失敗した場合は errorMessage。 文字列
errorStackTrace このオブジェクトが失敗した場合は、エラースタックトレース。 文字列
@finishedTime このオブジェクトが実行を終了した時刻。 DateTime
hadoopJobLog EMR ベースのアクティビティで試みることができる Hadoop ジョブのログ。 文字列
@healthStatus 終了状態に達した最後のオブジェクトインスタンスの成功または失敗を反映する、オブジェクトのヘルスステータス。 文字列
@healthStatusFromInstanceId 終了状態に達した最後のインスタンスオブジェクトの ID。 文字列
@healthStatusUpdatedTime ヘルス状態が最後に更新された時間。 DateTime
hostname タスクの試行を取得したクライアントのホスト名。 文字列
@lastDeactivatedTime このオブジェクトが最後に非アクティブ化された時刻。 DateTime
@latestCompletedRunTime 実行が完了した最後の実行の時刻。 DateTime
@latestRunTime 実行がスケジュールされた最後の実行の時刻。 DateTime
@nextRunTime 次回にスケジュールされた実行の時刻。 DateTime
reportProgressTime リモートアクティビティで進捗状況が報告された最新の時刻。 DateTime
@scheduledEndTime オブジェクトの予定された終了時刻。 DateTime
@scheduledStartTime オブジェクトの予定された開始時刻。 DateTime
@status このオブジェクトのステータス。 文字列
@version オブジェクトが作成されたパイプラインのバージョン。 文字列
@waitingOn このオブジェクトが待機している依存関係のリストの説明。 参照オブジェクト: "waitingOn":{"ref":"myRunnableObjectId"}

システムフィールド 説明 スロットタイプ
@error 形式が正しくないオブジェクトを説明するエラー。 文字列
@pipelineId このオブジェクトが属するパイプラインの ID。 文字列
@sphere オブジェクトの球です。ライフサイクルにおける場所を示します。たとえば、コンポーネントオブジェクトにより、試行オブジェクトを実行するインスタンスオブジェクトが発生します。 文字列