将 StartJobRun 与 AWS SDK 或 CLI 配合使用 - AWS Glue

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

StartJobRun 与 AWS SDK 或 CLI 配合使用

以下代码示例演示如何使用 StartJobRun

操作示例是大型程序的代码摘录,必须在上下文中运行。在以下代码示例中,您可以查看此操作的上下文:

.NET
AWS SDK for .NET
注意

在 GitHub 上查看更多内容。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

/// <summary> /// Start an AWS Glue job run. /// </summary> /// <param name="jobName">The name of the job.</param> /// <returns>A string representing the job run Id.</returns> public async Task<string> StartJobRunAsync( string jobName, string inputDatabase, string inputTable, string bucketName) { var request = new StartJobRunRequest { JobName = jobName, Arguments = new Dictionary<string, string> { {"--input_database", inputDatabase}, {"--input_table", inputTable}, {"--output_bucket_url", $"s3://{bucketName}/"} } }; var response = await _amazonGlue.StartJobRunAsync(request); return response.JobRunId; }
  • 有关 API 详细信息,请参阅 AWS SDK for .NET API 参考中的 StartJobRun

C++
适用于 C++ 的 SDK
注意

查看 GitHub,了解更多信息。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

Aws::Client::ClientConfiguration clientConfig; // Optional: Set to the AWS Region in which the bucket was created (overrides config file). // clientConfig.region = "us-east-1"; Aws::Glue::GlueClient client(clientConfig); Aws::Glue::Model::StartJobRunRequest request; request.SetJobName(JOB_NAME); Aws::Map<Aws::String, Aws::String> arguments; arguments["--input_database"] = CRAWLER_DATABASE_NAME; arguments["--input_table"] = tableName; arguments["--output_bucket_url"] = Aws::String("s3://") + bucketName + "/"; request.SetArguments(arguments); Aws::Glue::Model::StartJobRunOutcome outcome = client.StartJobRun(request); if (outcome.IsSuccess()) { std::cout << "Successfully started the job." << std::endl; Aws::String jobRunId = outcome.GetResult().GetJobRunId(); int iterator = 0; bool done = false; while (!done) { ++iterator; std::this_thread::sleep_for(std::chrono::seconds(1)); Aws::Glue::Model::GetJobRunRequest jobRunRequest; jobRunRequest.SetJobName(JOB_NAME); jobRunRequest.SetRunId(jobRunId); Aws::Glue::Model::GetJobRunOutcome jobRunOutcome = client.GetJobRun( jobRunRequest); if (jobRunOutcome.IsSuccess()) { const Aws::Glue::Model::JobRun &jobRun = jobRunOutcome.GetResult().GetJobRun(); Aws::Glue::Model::JobRunState jobRunState = jobRun.GetJobRunState(); if ((jobRunState == Aws::Glue::Model::JobRunState::STOPPED) || (jobRunState == Aws::Glue::Model::JobRunState::FAILED) || (jobRunState == Aws::Glue::Model::JobRunState::TIMEOUT)) { std::cerr << "Error running job. " << jobRun.GetErrorMessage() << std::endl; deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, JOB_NAME, bucketName, clientConfig); return false; } else if (jobRunState == Aws::Glue::Model::JobRunState::SUCCEEDED) { std::cout << "Job run succeeded after " << iterator << " seconds elapsed." << std::endl; done = true; } else if ((iterator % 10) == 0) { // Log status every 10 seconds. std::cout << "Job run status " << Aws::Glue::Model::JobRunStateMapper::GetNameForJobRunState( jobRunState) << ". " << iterator << " seconds elapsed." << std::endl; } } else { std::cerr << "Error retrieving job run state. " << jobRunOutcome.GetError().GetMessage() << std::endl; deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, JOB_NAME, bucketName, clientConfig); return false; } } } else { std::cerr << "Error starting a job. " << outcome.GetError().GetMessage() << std::endl; deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, JOB_NAME, bucketName, clientConfig); return false; }
  • 有关 API 的详细信息,请参阅 AWS SDK for C++ API 参考中的 StartJobRun

CLI
AWS CLI

开始运行任务

以下 start-job-run 示例启动了一个任务。

aws glue start-job-run \ --job-name my-job

输出:

{ "JobRunId": "jr_22208b1f44eb5376a60569d4b21dd20fcb8621e1a366b4e7b2494af764b82ded" }

有关更多信息,请参阅《AWS Glue 开发人员指南》中的编写任务

  • 有关 API 详细信息,请参阅《AWS CLI 命令参考》中的 StartJobRun

JavaScript
SDK for JavaScript (v3)
注意

在 GitHub 上查看更多内容。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

const startJobRun = (jobName, dbName, tableName, bucketName) => { const client = new GlueClient({}); const command = new StartJobRunCommand({ JobName: jobName, Arguments: { "--input_database": dbName, "--input_table": tableName, "--output_bucket_url": `s3://${bucketName}/`, }, }); return client.send(command); };
  • 有关 API 详细信息,请参阅 AWS SDK for JavaScript API 参考中的 StartJobRun

PHP
适用于 PHP 的 SDK
注意

在 GitHub 上查看更多内容。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

$jobName = 'test-job-' . $uniqid; $databaseName = "doc-example-database-$uniqid"; $tables = $glueService->getTables($databaseName); $outputBucketUrl = "s3://$bucketName"; $runId = $glueService->startJobRun($jobName, $databaseName, $tables, $outputBucketUrl)['JobRunId']; public function startJobRun($jobName, $databaseName, $tables, $outputBucketUrl): Result { return $this->glueClient->startJobRun([ 'JobName' => $jobName, 'Arguments' => [ 'input_database' => $databaseName, 'input_table' => $tables['TableList'][0]['Name'], 'output_bucket_url' => $outputBucketUrl, '--input_database' => $databaseName, '--input_table' => $tables['TableList'][0]['Name'], '--output_bucket_url' => $outputBucketUrl, ], ]); }
  • 有关 API 详细信息,请参阅 AWS SDK for PHP API 参考中的 StartJobRun

Python
SDK for Python(Boto3)
注意

查看 GitHub,了解更多信息。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

class GlueWrapper: """Encapsulates AWS Glue actions.""" def __init__(self, glue_client): """ :param glue_client: A Boto3 Glue client. """ self.glue_client = glue_client def start_job_run(self, name, input_database, input_table, output_bucket_name): """ Starts a job run. A job run extracts data from the source, transforms it, and loads it to the output bucket. :param name: The name of the job definition. :param input_database: The name of the metadata database that contains tables that describe the source data. This is typically created by a crawler. :param input_table: The name of the table in the metadata database that describes the source data. :param output_bucket_name: The S3 bucket where the output is written. :return: The ID of the job run. """ try: # The custom Arguments that are passed to this function are used by the # Python ETL script to determine the location of input and output data. response = self.glue_client.start_job_run( JobName=name, Arguments={ "--input_database": input_database, "--input_table": input_table, "--output_bucket_url": f"s3://{output_bucket_name}/", }, ) except ClientError as err: logger.error( "Couldn't start job run %s. Here's why: %s: %s", name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return response["JobRunId"]
  • 有关 API 详细信息,请参阅《AWS SDK for Python (Boto3) API 参考》中的 StartJobRun

Ruby
适用于 Ruby 的 SDK
注意

查看 GitHub,了解更多信息。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

# The `GlueWrapper` class serves as a wrapper around the AWS Glue API, providing a simplified interface for common operations. # It encapsulates the functionality of the AWS SDK for Glue and provides methods for interacting with Glue crawlers, databases, tables, jobs, and S3 resources. # The class initializes with a Glue client and a logger, allowing it to make API calls and log any errors or informational messages. class GlueWrapper def initialize(glue_client, logger) @glue_client = glue_client @logger = logger end # Starts a job run for the specified job. # # @param name [String] The name of the job to start the run for. # @param input_database [String] The name of the input database for the job. # @param input_table [String] The name of the input table for the job. # @param output_bucket_name [String] The name of the output S3 bucket for the job. # @return [String] The ID of the started job run. def start_job_run(name, input_database, input_table, output_bucket_name) response = @glue_client.start_job_run( job_name: name, arguments: { '--input_database': input_database, '--input_table': input_table, '--output_bucket_url': "s3://#{output_bucket_name}/" } ) response.job_run_id rescue Aws::Glue::Errors::GlueException => e @logger.error("Glue could not start job run #{name}: \n#{e.message}") raise end
  • 有关 API 的详细信息,请参阅 AWS SDK for Ruby API 参考中的 StartJobRun

Rust
适用于 Rust 的 SDK
注意

在 GitHub 上查看更多内容。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

let job_run_output = glue .start_job_run() .job_name(self.job()) .arguments("--input_database", self.database()) .arguments( "--input_table", self.tables .first() .ok_or_else(|| GlueMvpError::Unknown("Missing crawler table".into()))? .name(), ) .arguments("--output_bucket_url", self.bucket()) .send() .await .map_err(GlueMvpError::from_glue_sdk)?; let job = job_run_output .job_run_id() .ok_or_else(|| GlueMvpError::Unknown("Missing run id from just started job".into()))? .to_string();
  • 有关 API 详细信息,请参阅适用于 Rust 的 AWS SDK API 参考中的 StartJobRun

有关 AWS SDK 开发人员指南和代码示例的完整列表,请参阅 将此服务与 AWS SDK 结合使用。本主题还包括有关入门的信息以及有关先前的 SDK 版本的详细信息。