使用命令行复制 MySQL 数据 - AWS Data Pipeline

使用命令行复制 MySQL 数据

您可以创建管道,将数据从 MySQL 表复制到 Amazon S3 存储桶中的文件。

先决条件

在开始本教程之前,您必须完成以下步骤:

  1. 安装和配置命令行界面 (CLI)。有关更多信息,请参阅访问 AWS Data Pipeline

  2. 确保名为 DataPipelineDefaultRoleDataPipelineDefaultResourceRole 的 IAM 角色存在。AWS Data Pipeline 控制台会自动为您创建这些角色。如果您一次也没有使用过 AWS Data Pipeline 控制台,则必须手动创建这些角色。有关更多信息,请参阅适用于 AWS Data Pipeline 的 IAM 角色

  3. 设置 Amazon S3 存储桶和 Amazon RDS 实例。有关更多信息,请参阅开始前的准备工作

以 JSON 格式定义管道

此示例场景显示如何使用 JSON 管道定义和 AWS Data Pipeline CLI 按指定的时间间隔,将数据(行)从 MySQL 数据库中的表复制到 Amazon S3 存储桶中的 CSV(逗号分隔值)文件。

这是完整管道定义 JSON 文件,其每个部分后跟说明。

注意

我们建议您使用文本编辑器,这可帮助您验证 JSON 格式文件的语法,并使用 .json 文件扩展名来命名文件。

{ "objects": [ { "id": "ScheduleId113", "startDateTime": "2013-08-26T00:00:00", "name": "My Copy Schedule", "type": "Schedule", "period": "1 Days" }, { "id": "CopyActivityId112", "input": { "ref": "MySqlDataNodeId115" }, "schedule": { "ref": "ScheduleId113" }, "name": "My Copy", "runsOn": { "ref": "Ec2ResourceId116" }, "onSuccess": { "ref": "ActionId1" }, "onFail": { "ref": "SnsAlarmId117" }, "output": { "ref": "S3DataNodeId114" }, "type": "CopyActivity" }, { "id": "S3DataNodeId114", "schedule": { "ref": "ScheduleId113" }, "filePath": "s3://example-bucket/rds-output/output.csv", "name": "My S3 Data", "type": "S3DataNode" }, { "id": "MySqlDataNodeId115", "username": "my-username", "schedule": { "ref": "ScheduleId113" }, "name": "My RDS Data", "*password": "my-password", "table": "table-name", "connectionString": "jdbc:mysql://your-sql-instance-name.id.region-name.rds.amazonaws.com:3306/database-name", "selectQuery": "select * from #{table}", "type": "SqlDataNode" }, { "id": "Ec2ResourceId116", "schedule": { "ref": "ScheduleId113" }, "name": "My EC2 Resource", "role": "DataPipelineDefaultRole", "type": "Ec2Resource", "resourceRole": "DataPipelineDefaultResourceRole" }, { "message": "This is a success message.", "id": "ActionId1", "subject": "RDS to S3 copy succeeded!", "name": "My Success Alarm", "role": "DataPipelineDefaultRole", "topicArn": "arn:aws:sns:us-east-1:123456789012:example-topic", "type": "SnsAlarm" }, { "id": "Default", "scheduleType": "timeseries", "failureAndRerunMode": "CASCADE", "name": "Default", "role": "DataPipelineDefaultRole", "resourceRole": "DataPipelineDefaultResourceRole" }, { "message": "There was a problem executing #{node.name} at for period #{node.@scheduledStartTime} to #{node.@scheduledEndTime}", "id": "SnsAlarmId117", "subject": "RDS to S3 copy failed", "name": "My Failure Alarm", "role": "DataPipelineDefaultRole", "topicArn": "arn:aws:sns:us-east-1:123456789012:example-topic", "type": "SnsAlarm" } ] }

MySQL 数据节点

输入 MySqlDataNode 管道组件定义输入数据的位置;在本例中是 Amazon RDS 实例。输入 MySqlDataNode 组件由以下字段定义:

{ "id": "MySqlDataNodeId115", "username": "my-username", "schedule": { "ref": "ScheduleId113" }, "name": "My RDS Data", "*password": "my-password", "table": "table-name", "connectionString": "jdbc:mysql://your-sql-instance-name.id.region-name.rds.amazonaws.com:3306/database-name", "selectQuery": "select * from #{table}", "type": "SqlDataNode" },
Id

用户定义名称,这是仅供您参考的标签。

Username

数据库账户的用户名,该账户具有足够的权限从数据库表检索数据。使用您用户的名称替换 my-username

计划

对我们在 JSON 文件前面行中创建的计划组件的引用。

名称

用户定义名称,这是仅供您参考的标签。

*密码

数据库账户密码带有星号前缀,指示 AWS Data Pipeline 必须加密密码值。使用您的用户的正确密码替换 my-password。密码字段前面带有星号特殊字符。有关更多信息,请参阅特殊字符

包含要复制的数据的数据库表的名称。使用您的数据库表名称替换 table-name

connectionString

连接到数据库的 CopyActivity 对象的 JDBC 连接字符串。

selectQuery

有效的 SQL SELECT 查询,用于指定要从数据库表复制的数据。请注意,#{table} 是重新使用在 JSON 文件前面行中“table”变量提供的表名的表达式。

类型

SqlDataNode 类型,在本例中是使用 MySQL 的 Amazon RDS 实例。

注意

MySqlDataNode 类型已被弃用。尽管您仍然可以使用 MySqlDataNode,我们建议您使用 SqlDataNode。

Amazon S3 数据节点

接下来,S3Output 管道组件定义输出文件的位置;在这种情况下是 Amazon S3 存储桶位置中的 CSV 文件。输出 S3DataNode 组件由以下字段定义:

{ "id": "S3DataNodeId114", "schedule": { "ref": "ScheduleId113" }, "filePath": "s3://example-bucket/rds-output/output.csv", "name": "My S3 Data", "type": "S3DataNode" },
Id

用户定义 ID,这是仅供您参考的标签。

计划

对我们在 JSON 文件前面行中创建的计划组件的引用。

filePath

与数据节点关联的数据的路径,在本示例中是一个 CSV 输出文件。

名称

用户定义名称,这是仅供您参考的标签。

类型

管道对象类型,为 S3DataNode,与 Amazon S3 存储桶中数据所在位置匹配。

资源

这是执行复制操作的计算资源的定义。在本示例中,AWS Data Pipeline 应该自动创建 EC2 实例来执行复制任务,并在任务完成后终止资源。此处定义的字段控制完成工作的 EC2 实例的创建操作及其功能。EC2Resource 由以下字段定义:

{ "id": "Ec2ResourceId116", "schedule": { "ref": "ScheduleId113" }, "name": "My EC2 Resource", "role": "DataPipelineDefaultRole", "type": "Ec2Resource", "resourceRole": "DataPipelineDefaultResourceRole" },
Id

用户定义 ID,这是仅供您参考的标签。

计划

根据它来创建此计算资源的计划。

名称

用户定义名称,这是仅供您参考的标签。

角色

访问资源的账户的 IAM 角色,例如访问 Amazon S3 存储桶检索数据。

类型

执行工作的计算资源的类型;在这种情况下为 EC2 实例。还有其他资源类型可用,如 EmrCluster 类型。

resourceRole

创建资源的账户的 IAM 角色,如代表您创建和配置 EC2 实例。Role 和 ResourceRole 可以是相同角色,但在安全配置中可分别提供更细粒度。

活动

JSON 文件的最后一个部分是活动的定义,表示要执行的工作。在本例中,我们使用 CopyActivity 组件将数据从 Amazon S3 存储桶中的文件复制到另一个文件。CopyActivity 组件由以下字段定义:

{ "id": "CopyActivityId112", "input": { "ref": "MySqlDataNodeId115" }, "schedule": { "ref": "ScheduleId113" }, "name": "My Copy", "runsOn": { "ref": "Ec2ResourceId116" }, "onSuccess": { "ref": "ActionId1" }, "onFail": { "ref": "SnsAlarmId117" }, "output": { "ref": "S3DataNodeId114" }, "type": "CopyActivity" },
Id

用户定义 ID,这是仅供您参考的标签

输入

待复制 MySQL 数据的位置

计划

运行此活动的计划

名称

用户定义名称,这是仅供您参考的标签

runsOn

执行此活动定义的工作的计算资源。在本示例中,我们提供了对之前定义的 EC2 实例的引用。使用 runsOn 字段会让 AWS Data Pipeline 为您创建 EC2 实例。runsOn 字段指示资源存在于 AWS 基础设施中,而 workerGroup 值指示您要使用自己的本地资源执行工作。

onSuccess

活动成功完成时发送的 SnsAlarm

onFail

活动失败时发送的 SnsAlarm

输出

CSV 输出文件的 Amazon S3 位置

类型

要执行的活动类型。

上传并激活管道定义

您必须上传您的管道定义并激活您的管道。在以下示例命令中,将 pipeline_name 替换为管道的标签,将 pipeline_file 替换为管道定义 .json 文件的完全限定路径。

AWS CLI

要创建管道定义并激活管道,请使用以下 create-pipeline 命令。记下您的管道 ID,因为您将在大多数 CLI 命令中使用这个值。

aws datapipeline create-pipeline --name pipeline_name --unique-id token { "pipelineId": "df-00627471SOVYZEXAMPLE" }

使用以下 put-pipeline-definition 命令更新管道定义。

aws datapipeline put-pipeline-definition --pipeline-id df-00627471SOVYZEXAMPLE --pipeline-definition file://MyEmrPipelineDefinition.json

如果您的管道成功验证,则 validationErrors 字段为空。您应该查看所有警告。

要激活管道,请使用以下 activate-pipeline 命令。

aws datapipeline activate-pipeline --pipeline-id df-00627471SOVYZEXAMPLE

您可以使用以下 list-pipelines 命令来验证您的管道是否出现在管道列表中。

aws datapipeline list-pipelines