将 CreateJob 与 AWS SDK 或命令行工具配合使用 - AWS Glue

CreateJob 与 AWS SDK 或命令行工具配合使用

以下代码示例演示如何使用 CreateJob

操作示例是大型程序的代码摘录,必须在上下文中运行。在以下代码示例中,您可以查看此操作的上下文:

.NET
AWS SDK for .NET
注意

在 GitHub 上查看更多内容。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

/// <summary> /// Create an AWS Glue job. /// </summary> /// <param name="jobName">The name of the job.</param> /// <param name="roleName">The name of the IAM role to be assumed by /// the job.</param> /// <param name="description">A description of the job.</param> /// <param name="scriptUrl">The URL to the script.</param> /// <returns>A Boolean value indicating the success of the action.</returns> public async Task<bool> CreateJobAsync(string dbName, string tableName, string bucketUrl, string jobName, string roleName, string description, string scriptUrl) { var command = new JobCommand { PythonVersion = "3", Name = "glueetl", ScriptLocation = scriptUrl, }; var arguments = new Dictionary<string, string> { { "--input_database", dbName }, { "--input_table", tableName }, { "--output_bucket_url", bucketUrl } }; var request = new CreateJobRequest { Command = command, DefaultArguments = arguments, Description = description, GlueVersion = "3.0", Name = jobName, NumberOfWorkers = 10, Role = roleName, WorkerType = "G.1X" }; var response = await _amazonGlue.CreateJobAsync(request); return response.HttpStatusCode == HttpStatusCode.OK; }
  • 有关 API 详细信息,请参阅 AWS SDK for .NET API 参考中的 CreateJob

C++
适用于 C++ 的 SDK
注意

查看 GitHub,了解更多信息。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

Aws::Client::ClientConfiguration clientConfig; // Optional: Set to the AWS Region in which the bucket was created (overrides config file). // clientConfig.region = "us-east-1"; Aws::Glue::GlueClient client(clientConfig); Aws::Glue::Model::CreateJobRequest request; request.SetName(JOB_NAME); request.SetRole(roleArn); request.SetGlueVersion(GLUE_VERSION); Aws::Glue::Model::JobCommand command; command.SetName(JOB_COMMAND_NAME); command.SetPythonVersion(JOB_PYTHON_VERSION); command.SetScriptLocation( Aws::String("s3://") + bucketName + "/" + PYTHON_SCRIPT); request.SetCommand(command); Aws::Glue::Model::CreateJobOutcome outcome = client.CreateJob(request); if (outcome.IsSuccess()) { std::cout << "Successfully created the job." << std::endl; } else { std::cerr << "Error creating the job. " << outcome.GetError().GetMessage() << std::endl; deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, "", bucketName, clientConfig); return false; }
  • 有关 API 详细信息,请参阅《AWS SDK for C++ API 参考》中的 CreateJob

CLI
AWS CLI

创建用于转换数据的任务

以下 create-job 示例创建了一个运行存储在 S3 中的脚本的流式处理任务。

aws glue create-job \ --name my-testing-job \ --role AWSGlueServiceRoleDefault \ --command '{ \ "Name": "gluestreaming", \ "ScriptLocation": "s3://DOC-EXAMPLE-BUCKET/folder/" \ }' \ --region us-east-1 \ --output json \ --default-arguments '{ \ "--job-language":"scala", \ "--class":"GlueApp" \ }' \ --profile my-profile \ --endpoint https://glue.us-east-1.amazonaws.com

test_script.scala 的内容:

import com.amazonaws.services.glue.ChoiceOption import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.ResolveSpec import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]) { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) // @params: [JOB_NAME] val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) // @type: DataSource // @args: [database = "tempdb", table_name = "s3-source", transformation_ctx = "datasource0"] // @return: datasource0 // @inputs: [] val datasource0 = glueContext.getCatalogSource(database = "tempdb", tableName = "s3-source", redshiftTmpDir = "", transformationContext = "datasource0").getDynamicFrame() // @type: ApplyMapping // @args: [mapping = [("sensorid", "int", "sensorid", "int"), ("currenttemperature", "int", "currenttemperature", "int"), ("status", "string", "status", "string")], transformation_ctx = "applymapping1"] // @return: applymapping1 // @inputs: [frame = datasource0] val applymapping1 = datasource0.applyMapping(mappings = Seq(("sensorid", "int", "sensorid", "int"), ("currenttemperature", "int", "currenttemperature", "int"), ("status", "string", "status", "string")), caseSensitive = false, transformationContext = "applymapping1") // @type: SelectFields // @args: [paths = ["sensorid", "currenttemperature", "status"], transformation_ctx = "selectfields2"] // @return: selectfields2 // @inputs: [frame = applymapping1] val selectfields2 = applymapping1.selectFields(paths = Seq("sensorid", "currenttemperature", "status"), transformationContext = "selectfields2") // @type: ResolveChoice // @args: [choice = "MATCH_CATALOG", database = "tempdb", table_name = "my-s3-sink", transformation_ctx = "resolvechoice3"] // @return: resolvechoice3 // @inputs: [frame = selectfields2] val resolvechoice3 = selectfields2.resolveChoice(choiceOption = Some(ChoiceOption("MATCH_CATALOG")), database = Some("tempdb"), tableName = Some("my-s3-sink"), transformationContext = "resolvechoice3") // @type: DataSink // @args: [database = "tempdb", table_name = "my-s3-sink", transformation_ctx = "datasink4"] // @return: datasink4 // @inputs: [frame = resolvechoice3] val datasink4 = glueContext.getCatalogSink(database = "tempdb", tableName = "my-s3-sink", redshiftTmpDir = "", transformationContext = "datasink4").writeDynamicFrame(resolvechoice3) Job.commit() } }

输出:

{ "Name": "my-testing-job" }

有关更多信息,请参阅《AWS Glue 开发人员指南》中的在 AWS Glue 中编写任务

  • 有关 API 详细信息,请参阅《AWS CLI 命令参考》中的 CreateJob

JavaScript
SDK for JavaScript (v3)
注意

在 GitHub 上查看更多内容。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

const createJob = (name, role, scriptBucketName, scriptKey) => { const client = new GlueClient({}); const command = new CreateJobCommand({ Name: name, Role: role, Command: { Name: "glueetl", PythonVersion: "3", ScriptLocation: `s3://${scriptBucketName}/${scriptKey}`, }, GlueVersion: "3.0", }); return client.send(command); };
  • 有关 API 详细信息,请参阅 AWS SDK for JavaScript API 参考中的 CreateJob

PHP
适用于 PHP 的 SDK
注意

在 GitHub 上查看更多内容。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

$role = $iamService->getRole("AWSGlueServiceRole-DocExample"); $jobName = 'test-job-' . $uniqid; $scriptLocation = "s3://$bucketName/run_job.py"; $job = $glueService->createJob($jobName, $role['Role']['Arn'], $scriptLocation); public function createJob($jobName, $role, $scriptLocation, $pythonVersion = '3', $glueVersion = '3.0'): Result { return $this->glueClient->createJob([ 'Name' => $jobName, 'Role' => $role, 'Command' => [ 'Name' => 'glueetl', 'ScriptLocation' => $scriptLocation, 'PythonVersion' => $pythonVersion, ], 'GlueVersion' => $glueVersion, ]); }
  • 有关 API 详细信息,请参阅《AWS SDK for PHP API 参考》中的 CreateJob

PowerShell
适用于 PowerShell 的工具

示例 1:此示例在 AWS Glue 中创建了一个新作业。命令名称的值始终为 glueetl。AWSGlue 支持运行使用 Python 或 Scala 编写的脚本。在此示例中,作业脚本(MyTestGlueJob.py)是使用 Python 编写的。Python 参数在 $DefArgs 变量中指定,然后在 DefaultArguments 参数中传递给接受哈希表的 PowerShell 命令。$JobParams 变量中的参数来自 CreateJob API,详情请参阅 AWS Glue API 参考的“作业”(https://docs.aws.amazon.com/glue/latest/dg/aws-glue-api-jobs-job.html)主题。

$Command = New-Object Amazon.Glue.Model.JobCommand $Command.Name = 'glueetl' $Command.ScriptLocation = 's3://aws-glue-scripts-000000000000-us-west-2/admin/MyTestGlueJob.py' $Command $Source = "source_test_table" $Target = "target_test_table" $Connections = $Source, $Target $DefArgs = @{ '--TempDir' = 's3://aws-glue-temporary-000000000000-us-west-2/admin' '--job-bookmark-option' = 'job-bookmark-disable' '--job-language' = 'python' } $DefArgs $ExecutionProp = New-Object Amazon.Glue.Model.ExecutionProperty $ExecutionProp.MaxConcurrentRuns = 1 $ExecutionProp $JobParams = @{ "AllocatedCapacity" = "5" "Command" = $Command "Connections_Connection" = $Connections "DefaultArguments" = $DefArgs "Description" = "This is a test" "ExecutionProperty" = $ExecutionProp "MaxRetries" = "1" "Name" = "MyOregonTestGlueJob" "Role" = "Amazon-GlueServiceRoleForSSM" "Timeout" = "20" } New-GlueJob @JobParams
  • 有关 API 的详细信息,请参阅《AWS Tools for PowerShell Cmdlet 参考》中的 CreateJob

Python
SDK for Python(Boto3)
注意

查看 GitHub,了解更多信息。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

class GlueWrapper: """Encapsulates AWS Glue actions.""" def __init__(self, glue_client): """ :param glue_client: A Boto3 Glue client. """ self.glue_client = glue_client def create_job(self, name, description, role_arn, script_location): """ Creates a job definition for an extract, transform, and load (ETL) job that can be run by AWS Glue. :param name: The name of the job definition. :param description: The description of the job definition. :param role_arn: The ARN of an IAM role that grants AWS Glue the permissions it requires to run the job. :param script_location: The Amazon S3 URL of a Python ETL script that is run as part of the job. The script defines how the data is transformed. """ try: self.glue_client.create_job( Name=name, Description=description, Role=role_arn, Command={ "Name": "glueetl", "ScriptLocation": script_location, "PythonVersion": "3", }, GlueVersion="3.0", ) except ClientError as err: logger.error( "Couldn't create job %s. Here's why: %s: %s", name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise
  • 有关 API 详细信息,请参阅《AWS SDK for Python (Boto3) API 参考》中的 CreateJob

Ruby
适用于 Ruby 的 SDK
注意

查看 GitHub,了解更多信息。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

# The `GlueWrapper` class serves as a wrapper around the AWS Glue API, providing a simplified interface for common operations. # It encapsulates the functionality of the AWS SDK for Glue and provides methods for interacting with Glue crawlers, databases, tables, jobs, and S3 resources. # The class initializes with a Glue client and a logger, allowing it to make API calls and log any errors or informational messages. class GlueWrapper def initialize(glue_client, logger) @glue_client = glue_client @logger = logger end # Creates a new job with the specified configuration. # # @param name [String] The name of the job. # @param description [String] The description of the job. # @param role_arn [String] The ARN of the IAM role to be used by the job. # @param script_location [String] The location of the ETL script for the job. # @return [void] def create_job(name, description, role_arn, script_location) @glue_client.create_job( name: name, description: description, role: role_arn, command: { name: "glueetl", script_location: script_location, python_version: "3" }, glue_version: "3.0" ) rescue Aws::Glue::Errors::GlueException => e @logger.error("Glue could not create job #{name}: \n#{e.message}") raise end
  • 有关 API 详细信息,请参阅《AWS SDK for Ruby API 参考》中的 CreateJob

Rust
适用于 Rust 的 SDK
注意

在 GitHub 上查看更多内容。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

let create_job = glue .create_job() .name(self.job()) .role(self.iam_role.expose_secret()) .command( JobCommand::builder() .name("glueetl") .python_version("3") .script_location(format!("s3://{}/job.py", self.bucket())) .build(), ) .glue_version("3.0") .send() .await .map_err(GlueMvpError::from_glue_sdk)?; let job_name = create_job.name().ok_or_else(|| { GlueMvpError::Unknown("Did not get job name after creating job".into()) })?;
  • 有关 API 详细信息,请参阅适用于 Rust 的 AWS SDK API 参考中的 CreateJob

有关 AWS SDK 开发人员指南和代码示例的完整列表,请参阅 将此服务与 AWS SDK 结合使用。本主题还包括有关入门的信息以及有关先前的 SDK 版本的详细信息。