了解将 AWS Glue 与 AWS SDK 结合使用的基础知识
以下代码示例演示了如何:
创建爬网程序,爬取公有 Amazon S3 存储桶并生成包含 CSV 格式的元数据的数据库。
在 AWS Glue Data Catalog 中列出有关数据库和表的信息。
创建任务,从 S3 存储桶提取 CSV 数据,转换数据,然后将 JSON 格式的输出加载到另一个 S3 存储桶中。
列出有关作业运行的信息,查看转换后的数据,并清理资源。
有关更多信息,请参阅教程:AWS Glue Studio 入门。
- .NET
-
- AWS SDK for .NET
-
注意
在 GitHub 上查看更多内容。查找完整示例,学习如何在 AWS 代码示例存储库
中进行设置和运行。 创建一个包含场景中使用的 AWS Glue 函数的类。
using System.Net; namespace GlueActions; public class GlueWrapper { private readonly IAmazonGlue _amazonGlue; /// <summary> /// Constructor for the AWS Glue actions wrapper. /// </summary> /// <param name="amazonGlue"></param> public GlueWrapper(IAmazonGlue amazonGlue) { _amazonGlue = amazonGlue; } /// <summary> /// Create an AWS Glue crawler. /// </summary> /// <param name="crawlerName">The name for the crawler.</param> /// <param name="crawlerDescription">A description of the crawler.</param> /// <param name="role">The AWS Identity and Access Management (IAM) role to /// be assumed by the crawler.</param> /// <param name="schedule">The schedule on which the crawler will be executed.</param> /// <param name="s3Path">The path to the Amazon Simple Storage Service (Amazon S3) /// bucket where the Python script has been stored.</param> /// <param name="dbName">The name to use for the database that will be /// created by the crawler.</param> /// <returns>A Boolean value indicating the success of the action.</returns> public async Task<bool> CreateCrawlerAsync( string crawlerName, string crawlerDescription, string role, string schedule, string s3Path, string dbName) { var s3Target = new S3Target { Path = s3Path, }; var targetList = new List<S3Target> { s3Target, }; var targets = new CrawlerTargets { S3Targets = targetList, }; var crawlerRequest = new CreateCrawlerRequest { DatabaseName = dbName, Name = crawlerName, Description = crawlerDescription, Targets = targets, Role = role, Schedule = schedule, }; var response = await _amazonGlue.CreateCrawlerAsync(crawlerRequest); return response.HttpStatusCode == System.Net.HttpStatusCode.OK; } /// <summary> /// Create an AWS Glue job. /// </summary> /// <param name="jobName">The name of the job.</param> /// <param name="roleName">The name of the IAM role to be assumed by /// the job.</param> /// <param name="description">A description of the job.</param> /// <param name="scriptUrl">The URL to the script.</param> /// <returns>A Boolean value indicating the success of the action.</returns> public async Task<bool> CreateJobAsync(string dbName, string tableName, string bucketUrl, string jobName, string roleName, string description, string scriptUrl) { var command = new JobCommand { PythonVersion = "3", Name = "glueetl", ScriptLocation = scriptUrl, }; var arguments = new Dictionary<string, string> { { "--input_database", dbName }, { "--input_table", tableName }, { "--output_bucket_url", bucketUrl } }; var request = new CreateJobRequest { Command = command, DefaultArguments = arguments, Description = description, GlueVersion = "3.0", Name = jobName, NumberOfWorkers = 10, Role = roleName, WorkerType = "G.1X" }; var response = await _amazonGlue.CreateJobAsync(request); return response.HttpStatusCode == HttpStatusCode.OK; } /// <summary> /// Delete an AWS Glue crawler. /// </summary> /// <param name="crawlerName">The name of the crawler.</param> /// <returns>A Boolean value indicating the success of the action.</returns> public async Task<bool> DeleteCrawlerAsync(string crawlerName) { var response = await _amazonGlue.DeleteCrawlerAsync(new DeleteCrawlerRequest { Name = crawlerName }); return response.HttpStatusCode == HttpStatusCode.OK; } /// <summary> /// Delete the AWS Glue database. /// </summary> /// <param name="dbName">The name of the database.</param> /// <returns>A Boolean value indicating the success of the action.</returns> public async Task<bool> DeleteDatabaseAsync(string dbName) { var response = await _amazonGlue.DeleteDatabaseAsync(new DeleteDatabaseRequest { Name = dbName }); return response.HttpStatusCode == HttpStatusCode.OK; } /// <summary> /// Delete an AWS Glue job. /// </summary> /// <param name="jobName">The name of the job.</param> /// <returns>A Boolean value indicating the success of the action.</returns> public async Task<bool> DeleteJobAsync(string jobName) { var response = await _amazonGlue.DeleteJobAsync(new DeleteJobRequest { JobName = jobName }); return response.HttpStatusCode == HttpStatusCode.OK; } /// <summary> /// Delete a table from an AWS Glue database. /// </summary> /// <param name="tableName">The table to delete.</param> /// <returns>A Boolean value indicating the success of the action.</returns> public async Task<bool> DeleteTableAsync(string dbName, string tableName) { var response = await _amazonGlue.DeleteTableAsync(new DeleteTableRequest { Name = tableName, DatabaseName = dbName }); return response.HttpStatusCode == HttpStatusCode.OK; } /// <summary> /// Get information about an AWS Glue crawler. /// </summary> /// <param name="crawlerName">The name of the crawler.</param> /// <returns>A Crawler object describing the crawler.</returns> public async Task<Crawler?> GetCrawlerAsync(string crawlerName) { var crawlerRequest = new GetCrawlerRequest { Name = crawlerName, }; var response = await _amazonGlue.GetCrawlerAsync(crawlerRequest); if (response.HttpStatusCode == System.Net.HttpStatusCode.OK) { var databaseName = response.Crawler.DatabaseName; Console.WriteLine($"{crawlerName} has the database {databaseName}"); return response.Crawler; } Console.WriteLine($"No information regarding {crawlerName} could be found."); return null; } /// <summary> /// Get information about the state of an AWS Glue crawler. /// </summary> /// <param name="crawlerName">The name of the crawler.</param> /// <returns>A value describing the state of the crawler.</returns> public async Task<CrawlerState> GetCrawlerStateAsync(string crawlerName) { var response = await _amazonGlue.GetCrawlerAsync( new GetCrawlerRequest { Name = crawlerName }); return response.Crawler.State; } /// <summary> /// Get information about an AWS Glue database. /// </summary> /// <param name="dbName">The name of the database.</param> /// <returns>A Database object containing information about the database.</returns> public async Task<Database> GetDatabaseAsync(string dbName) { var databasesRequest = new GetDatabaseRequest { Name = dbName, }; var response = await _amazonGlue.GetDatabaseAsync(databasesRequest); return response.Database; } /// <summary> /// Get information about a specific AWS Glue job run. /// </summary> /// <param name="jobName">The name of the job.</param> /// <param name="jobRunId">The Id of the job run.</param> /// <returns>A JobRun object with information about the job run.</returns> public async Task<JobRun> GetJobRunAsync(string jobName, string jobRunId) { var response = await _amazonGlue.GetJobRunAsync(new GetJobRunRequest { JobName = jobName, RunId = jobRunId }); return response.JobRun; } /// <summary> /// Get information about all AWS Glue runs of a specific job. /// </summary> /// <param name="jobName">The name of the job.</param> /// <returns>A list of JobRun objects.</returns> public async Task<List<JobRun>> GetJobRunsAsync(string jobName) { var jobRuns = new List<JobRun>(); var request = new GetJobRunsRequest { JobName = jobName, }; // No need to loop to get all the log groups--the SDK does it for us behind the scenes var paginatorForJobRuns = _amazonGlue.Paginators.GetJobRuns(request); await foreach (var response in paginatorForJobRuns.Responses) { response.JobRuns.ForEach(jobRun => { jobRuns.Add(jobRun); }); } return jobRuns; } /// <summary> /// Get a list of tables for an AWS Glue database. /// </summary> /// <param name="dbName">The name of the database.</param> /// <returns>A list of Table objects.</returns> public async Task<List<Table>> GetTablesAsync(string dbName) { var request = new GetTablesRequest { DatabaseName = dbName }; var tables = new List<Table>(); // Get a paginator for listing the tables. var tablePaginator = _amazonGlue.Paginators.GetTables(request); await foreach (var response in tablePaginator.Responses) { tables.AddRange(response.TableList); } return tables; } /// <summary> /// List AWS Glue jobs using a paginator. /// </summary> /// <returns>A list of AWS Glue job names.</returns> public async Task<List<string>> ListJobsAsync() { var jobNames = new List<string>(); var listJobsPaginator = _amazonGlue.Paginators.ListJobs(new ListJobsRequest { MaxResults = 10 }); await foreach (var response in listJobsPaginator.Responses) { jobNames.AddRange(response.JobNames); } return jobNames; } /// <summary> /// Start an AWS Glue crawler. /// </summary> /// <param name="crawlerName">The name of the crawler.</param> /// <returns>A Boolean value indicating the success of the action.</returns> public async Task<bool> StartCrawlerAsync(string crawlerName) { var crawlerRequest = new StartCrawlerRequest { Name = crawlerName, }; var response = await _amazonGlue.StartCrawlerAsync(crawlerRequest); return response.HttpStatusCode == System.Net.HttpStatusCode.OK; } /// <summary> /// Start an AWS Glue job run. /// </summary> /// <param name="jobName">The name of the job.</param> /// <returns>A string representing the job run Id.</returns> public async Task<string> StartJobRunAsync( string jobName, string inputDatabase, string inputTable, string bucketName) { var request = new StartJobRunRequest { JobName = jobName, Arguments = new Dictionary<string, string> { {"--input_database", inputDatabase}, {"--input_table", inputTable}, {"--output_bucket_url", $"s3://{bucketName}/"} } }; var response = await _amazonGlue.StartJobRunAsync(request); return response.JobRunId; } }
创建运行场景的类。
global using Amazon.Glue; global using GlueActions; global using Microsoft.Extensions.Configuration; global using Microsoft.Extensions.DependencyInjection; global using Microsoft.Extensions.Hosting; global using Microsoft.Extensions.Logging; global using Microsoft.Extensions.Logging.Console; global using Microsoft.Extensions.Logging.Debug; using Amazon.Glue.Model; using Amazon.S3; using Amazon.S3.Model; namespace GlueBasics; public class GlueBasics { private static ILogger logger = null!; private static IConfiguration _configuration = null!; static async Task Main(string[] args) { // Set up dependency injection for AWS Glue. using var host = Host.CreateDefaultBuilder(args) .ConfigureLogging(logging => logging.AddFilter("System", LogLevel.Debug) .AddFilter<DebugLoggerProvider>("Microsoft", LogLevel.Information) .AddFilter<ConsoleLoggerProvider>("Microsoft", LogLevel.Trace)) .ConfigureServices((_, services) => services.AddAWSService<IAmazonGlue>() .AddTransient<GlueWrapper>() .AddTransient<UiWrapper>() ) .Build(); logger = LoggerFactory.Create(builder => { builder.AddConsole(); }) .CreateLogger<GlueBasics>(); _configuration = new ConfigurationBuilder() .SetBasePath(Directory.GetCurrentDirectory()) .AddJsonFile("settings.json") // Load settings from .json file. .AddJsonFile("settings.local.json", true) // Optionally load local settings. .Build(); // These values are stored in settings.json // Once you have run the CDK script to deploy the resources, // edit the file to set "BucketName", "RoleName", and "ScriptURL" // to the appropriate values. Also set "CrawlerName" to the name // you want to give the crawler when it is created. string bucketName = _configuration["BucketName"]!; string bucketUrl = _configuration["BucketUrl"]!; string crawlerName = _configuration["CrawlerName"]!; string roleName = _configuration["RoleName"]!; string sourceData = _configuration["SourceData"]!; string dbName = _configuration["DbName"]!; string cron = _configuration["Cron"]!; string scriptUrl = _configuration["ScriptURL"]!; string jobName = _configuration["JobName"]!; var wrapper = host.Services.GetRequiredService<GlueWrapper>(); var uiWrapper = host.Services.GetRequiredService<UiWrapper>(); uiWrapper.DisplayOverview(); uiWrapper.PressEnter(); // Create the crawler and wait for it to be ready. uiWrapper.DisplayTitle("Create AWS Glue crawler"); Console.WriteLine("Let's begin by creating the AWS Glue crawler."); var crawlerDescription = "Crawler created for the AWS Glue Basics scenario."; var crawlerCreated = await wrapper.CreateCrawlerAsync(crawlerName, crawlerDescription, roleName, cron, sourceData, dbName); if (crawlerCreated) { Console.WriteLine($"The crawler: {crawlerName} has been created. Now let's wait until it's ready."); CrawlerState crawlerState; do { crawlerState = await wrapper.GetCrawlerStateAsync(crawlerName); } while (crawlerState != "READY"); Console.WriteLine($"The crawler {crawlerName} is now ready for use."); } else { Console.WriteLine($"Couldn't create crawler {crawlerName}."); return; // Exit the application. } uiWrapper.DisplayTitle("Start AWS Glue crawler"); Console.WriteLine("Now let's wait until the crawler has successfully started."); var crawlerStarted = await wrapper.StartCrawlerAsync(crawlerName); if (crawlerStarted) { CrawlerState crawlerState; do { crawlerState = await wrapper.GetCrawlerStateAsync(crawlerName); } while (crawlerState != "READY"); Console.WriteLine($"The crawler {crawlerName} is now ready for use."); } else { Console.WriteLine($"Couldn't start the crawler {crawlerName}."); return; // Exit the application. } uiWrapper.PressEnter(); Console.WriteLine($"\nLet's take a look at the database: {dbName}"); var database = await wrapper.GetDatabaseAsync(dbName); if (database != null) { uiWrapper.DisplayTitle($"{database.Name} Details"); Console.WriteLine($"{database.Name} created on {database.CreateTime}"); Console.WriteLine(database.Description); } uiWrapper.PressEnter(); var tables = await wrapper.GetTablesAsync(dbName); if (tables.Count > 0) { tables.ForEach(table => { Console.WriteLine($"{table.Name}\tCreated: {table.CreateTime}\tUpdated: {table.UpdateTime}"); }); } uiWrapper.PressEnter(); uiWrapper.DisplayTitle("Create AWS Glue job"); Console.WriteLine("Creating a new AWS Glue job."); var description = "An AWS Glue job created using the AWS SDK for .NET"; await wrapper.CreateJobAsync(dbName, tables[0].Name, bucketUrl, jobName, roleName, description, scriptUrl); uiWrapper.PressEnter(); uiWrapper.DisplayTitle("Starting AWS Glue job"); Console.WriteLine("Starting the new AWS Glue job..."); var jobRunId = await wrapper.StartJobRunAsync(jobName, dbName, tables[0].Name, bucketName); var jobRunComplete = false; var jobRun = new JobRun(); do { jobRun = await wrapper.GetJobRunAsync(jobName, jobRunId); if (jobRun.JobRunState == "SUCCEEDED" || jobRun.JobRunState == "STOPPED" || jobRun.JobRunState == "FAILED" || jobRun.JobRunState == "TIMEOUT") { jobRunComplete = true; } } while (!jobRunComplete); uiWrapper.DisplayTitle($"Data in {bucketName}"); // Get the list of data stored in the S3 bucket. var s3Client = new AmazonS3Client(); var response = await s3Client.ListObjectsAsync(new ListObjectsRequest { BucketName = bucketName }); response.S3Objects.ForEach(s3Object => { Console.WriteLine(s3Object.Key); }); uiWrapper.DisplayTitle("AWS Glue jobs"); var jobNames = await wrapper.ListJobsAsync(); jobNames.ForEach(jobName => { Console.WriteLine(jobName); }); uiWrapper.PressEnter(); uiWrapper.DisplayTitle("Get AWS Glue job run information"); Console.WriteLine("Getting information about the AWS Glue job."); var jobRuns = await wrapper.GetJobRunsAsync(jobName); jobRuns.ForEach(jobRun => { Console.WriteLine($"{jobRun.JobName}\t{jobRun.JobRunState}\t{jobRun.CompletedOn}"); }); uiWrapper.PressEnter(); uiWrapper.DisplayTitle("Deleting resources"); Console.WriteLine("Deleting the AWS Glue job used by the example."); await wrapper.DeleteJobAsync(jobName); Console.WriteLine("Deleting the tables from the database."); tables.ForEach(async table => { await wrapper.DeleteTableAsync(dbName, table.Name); }); Console.WriteLine("Deleting the database."); await wrapper.DeleteDatabaseAsync(dbName); Console.WriteLine("Deleting the AWS Glue crawler."); await wrapper.DeleteCrawlerAsync(crawlerName); Console.WriteLine("The AWS Glue scenario has completed."); uiWrapper.PressEnter(); } } namespace GlueBasics; public class UiWrapper { public readonly string SepBar = new string('-', Console.WindowWidth); /// <summary> /// Show information about the scenario. /// </summary> public void DisplayOverview() { Console.Clear(); DisplayTitle("Amazon Glue: get started with crawlers and jobs"); Console.WriteLine("This example application does the following:"); Console.WriteLine("\t 1. Create a crawler, pass it the IAM role and the URL to the public S3 bucket that contains the source data"); Console.WriteLine("\t 2. Start the crawler."); Console.WriteLine("\t 3. Get the database created by the crawler and the tables in the database."); Console.WriteLine("\t 4. Create a job."); Console.WriteLine("\t 5. Start a job run."); Console.WriteLine("\t 6. Wait for the job run to complete."); Console.WriteLine("\t 7. Show the data stored in the bucket."); Console.WriteLine("\t 8. List jobs for the account."); Console.WriteLine("\t 9. Get job run details for the job that was run."); Console.WriteLine("\t10. Delete the demo job."); Console.WriteLine("\t11. Delete the database and tables created for the demo."); Console.WriteLine("\t12. Delete the crawler."); } /// <summary> /// Display a message and wait until the user presses enter. /// </summary> public void PressEnter() { Console.Write("\nPlease press <Enter> to continue. "); _ = Console.ReadLine(); } /// <summary> /// Pad a string with spaces to center it on the console display. /// </summary> /// <param name="strToCenter">The string to center on the screen.</param> /// <returns>The string padded to make it center on the screen.</returns> public string CenterString(string strToCenter) { var padAmount = (Console.WindowWidth - strToCenter.Length) / 2; var leftPad = new string(' ', padAmount); return $"{leftPad}{strToCenter}"; } /// <summary> /// Display a line of hyphens, the centered text of the title and another /// line of hyphens. /// </summary> /// <param name="strTitle">The string to be displayed.</param> public void DisplayTitle(string strTitle) { Console.WriteLine(SepBar); Console.WriteLine(CenterString(strTitle)); Console.WriteLine(SepBar); } }
-
有关 API 详细信息,请参阅《AWS SDK for .NET API 参考》中的以下主题。
-
- C++
-
- SDK for C++
-
注意
查看 GitHub,了解更多信息。查找完整示例,学习如何在 AWS 代码示例存储库
中进行设置和运行。 //! Scenario which demonstrates using AWS Glue to add a crawler and run a job. /*! \\sa runGettingStartedWithGlueScenario() \param bucketName: An S3 bucket created in the setup. \param roleName: An AWS Identity and Access Management (IAM) role created in the setup. \param clientConfig: AWS client configuration. \return bool: Successful completion. */ bool AwsDoc::Glue::runGettingStartedWithGlueScenario(const Aws::String &bucketName, const Aws::String &roleName, const Aws::Client::ClientConfiguration &clientConfig) { Aws::Glue::GlueClient client(clientConfig); Aws::String roleArn; if (!getRoleArn(roleName, roleArn, clientConfig)) { std::cerr << "Error getting role ARN for role." << std::endl; return false; } // 1. Upload the job script to the S3 bucket. { std::cout << "Uploading the job script '" << AwsDoc::Glue::PYTHON_SCRIPT << "'." << std::endl; if (!AwsDoc::Glue::uploadFile(bucketName, AwsDoc::Glue::PYTHON_SCRIPT_PATH, AwsDoc::Glue::PYTHON_SCRIPT, clientConfig)) { std::cerr << "Error uploading the job file." << std::endl; return false; } } // 2. Create a crawler. { Aws::Glue::Model::S3Target s3Target; s3Target.SetPath("s3://crawler-public-us-east-1/flight/2016/csv"); Aws::Glue::Model::CrawlerTargets crawlerTargets; crawlerTargets.AddS3Targets(s3Target); Aws::Glue::Model::CreateCrawlerRequest request; request.SetTargets(crawlerTargets); request.SetName(CRAWLER_NAME); request.SetDatabaseName(CRAWLER_DATABASE_NAME); request.SetTablePrefix(CRAWLER_DATABASE_PREFIX); request.SetRole(roleArn); Aws::Glue::Model::CreateCrawlerOutcome outcome = client.CreateCrawler(request); if (outcome.IsSuccess()) { std::cout << "Successfully created the crawler." << std::endl; } else { std::cerr << "Error creating a crawler. " << outcome.GetError().GetMessage() << std::endl; deleteAssets("", CRAWLER_DATABASE_NAME, "", bucketName, clientConfig); return false; } } // 3. Get a crawler. { Aws::Glue::Model::GetCrawlerRequest request; request.SetName(CRAWLER_NAME); Aws::Glue::Model::GetCrawlerOutcome outcome = client.GetCrawler(request); if (outcome.IsSuccess()) { Aws::Glue::Model::CrawlerState crawlerState = outcome.GetResult().GetCrawler().GetState(); std::cout << "Retrieved crawler with state " << Aws::Glue::Model::CrawlerStateMapper::GetNameForCrawlerState( crawlerState) << "." << std::endl; } else { std::cerr << "Error retrieving a crawler. " << outcome.GetError().GetMessage() << std::endl; deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, "", bucketName, clientConfig); return false; } } // 4. Start a crawler. { Aws::Glue::Model::StartCrawlerRequest request; request.SetName(CRAWLER_NAME); Aws::Glue::Model::StartCrawlerOutcome outcome = client.StartCrawler(request); if (outcome.IsSuccess() || (Aws::Glue::GlueErrors::CRAWLER_RUNNING == outcome.GetError().GetErrorType())) { if (!outcome.IsSuccess()) { std::cout << "Crawler was already started." << std::endl; } else { std::cout << "Successfully started crawler." << std::endl; } std::cout << "This may take a while to run." << std::endl; Aws::Glue::Model::CrawlerState crawlerState = Aws::Glue::Model::CrawlerState::NOT_SET; int iterations = 0; while (Aws::Glue::Model::CrawlerState::READY != crawlerState) { std::this_thread::sleep_for(std::chrono::seconds(1)); ++iterations; if ((iterations % 10) == 0) { // Log status every 10 seconds. std::cout << "Crawler status " << Aws::Glue::Model::CrawlerStateMapper::GetNameForCrawlerState( crawlerState) << ". After " << iterations << " seconds elapsed." << std::endl; } Aws::Glue::Model::GetCrawlerRequest getCrawlerRequest; getCrawlerRequest.SetName(CRAWLER_NAME); Aws::Glue::Model::GetCrawlerOutcome getCrawlerOutcome = client.GetCrawler( getCrawlerRequest); if (getCrawlerOutcome.IsSuccess()) { crawlerState = getCrawlerOutcome.GetResult().GetCrawler().GetState(); } else { std::cerr << "Error getting crawler. " << getCrawlerOutcome.GetError().GetMessage() << std::endl; break; } } if (Aws::Glue::Model::CrawlerState::READY == crawlerState) { std::cout << "Crawler finished running after " << iterations << " seconds." << std::endl; } } else { std::cerr << "Error starting a crawler. " << outcome.GetError().GetMessage() << std::endl; deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, "", bucketName, clientConfig); return false; } } // 5. Get a database. { Aws::Glue::Model::GetDatabaseRequest request; request.SetName(CRAWLER_DATABASE_NAME); Aws::Glue::Model::GetDatabaseOutcome outcome = client.GetDatabase(request); if (outcome.IsSuccess()) { const Aws::Glue::Model::Database &database = outcome.GetResult().GetDatabase(); std::cout << "Successfully retrieve the database\n" << database.Jsonize().View().WriteReadable() << "'." << std::endl; } else { std::cerr << "Error getting the database. " << outcome.GetError().GetMessage() << std::endl; deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, "", bucketName, clientConfig); return false; } } // 6. Get tables. Aws::String tableName; { Aws::Glue::Model::GetTablesRequest request; request.SetDatabaseName(CRAWLER_DATABASE_NAME); std::vector<Aws::Glue::Model::Table> all_tables; Aws::String nextToken; // Used for pagination. do { Aws::Glue::Model::GetTablesOutcome outcome = client.GetTables(request); if (outcome.IsSuccess()) { const std::vector<Aws::Glue::Model::Table> &tables = outcome.GetResult().GetTableList(); all_tables.insert(all_tables.end(), tables.begin(), tables.end()); nextToken = outcome.GetResult().GetNextToken(); } else { std::cerr << "Error getting the tables. " << outcome.GetError().GetMessage() << std::endl; deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, "", bucketName, clientConfig); return false; } } while (!nextToken.empty()); std::cout << "The database contains " << all_tables.size() << (all_tables.size() == 1 ? " table." : "tables.") << std::endl; std::cout << "Here is a list of the tables in the database."; for (size_t index = 0; index < all_tables.size(); ++index) { std::cout << " " << index + 1 << ": " << all_tables[index].GetName() << std::endl; } if (!all_tables.empty()) { int tableIndex = askQuestionForIntRange( "Enter an index to display the database detail ", 1, static_cast<int>(all_tables.size())); std::cout << all_tables[tableIndex - 1].Jsonize().View().WriteReadable() << std::endl; tableName = all_tables[tableIndex - 1].GetName(); } } // 7. Create a job. { Aws::Glue::Model::CreateJobRequest request; request.SetName(JOB_NAME); request.SetRole(roleArn); request.SetGlueVersion(GLUE_VERSION); Aws::Glue::Model::JobCommand command; command.SetName(JOB_COMMAND_NAME); command.SetPythonVersion(JOB_PYTHON_VERSION); command.SetScriptLocation( Aws::String("s3://") + bucketName + "/" + PYTHON_SCRIPT); request.SetCommand(command); Aws::Glue::Model::CreateJobOutcome outcome = client.CreateJob(request); if (outcome.IsSuccess()) { std::cout << "Successfully created the job." << std::endl; } else { std::cerr << "Error creating the job. " << outcome.GetError().GetMessage() << std::endl; deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, "", bucketName, clientConfig); return false; } } // 8. Start a job run. { Aws::Glue::Model::StartJobRunRequest request; request.SetJobName(JOB_NAME); Aws::Map<Aws::String, Aws::String> arguments; arguments["--input_database"] = CRAWLER_DATABASE_NAME; arguments["--input_table"] = tableName; arguments["--output_bucket_url"] = Aws::String("s3://") + bucketName + "/"; request.SetArguments(arguments); Aws::Glue::Model::StartJobRunOutcome outcome = client.StartJobRun(request); if (outcome.IsSuccess()) { std::cout << "Successfully started the job." << std::endl; Aws::String jobRunId = outcome.GetResult().GetJobRunId(); int iterator = 0; bool done = false; while (!done) { ++iterator; std::this_thread::sleep_for(std::chrono::seconds(1)); Aws::Glue::Model::GetJobRunRequest jobRunRequest; jobRunRequest.SetJobName(JOB_NAME); jobRunRequest.SetRunId(jobRunId); Aws::Glue::Model::GetJobRunOutcome jobRunOutcome = client.GetJobRun( jobRunRequest); if (jobRunOutcome.IsSuccess()) { const Aws::Glue::Model::JobRun &jobRun = jobRunOutcome.GetResult().GetJobRun(); Aws::Glue::Model::JobRunState jobRunState = jobRun.GetJobRunState(); if ((jobRunState == Aws::Glue::Model::JobRunState::STOPPED) || (jobRunState == Aws::Glue::Model::JobRunState::FAILED) || (jobRunState == Aws::Glue::Model::JobRunState::TIMEOUT)) { std::cerr << "Error running job. " << jobRun.GetErrorMessage() << std::endl; deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, JOB_NAME, bucketName, clientConfig); return false; } else if (jobRunState == Aws::Glue::Model::JobRunState::SUCCEEDED) { std::cout << "Job run succeeded after " << iterator << " seconds elapsed." << std::endl; done = true; } else if ((iterator % 10) == 0) { // Log status every 10 seconds. std::cout << "Job run status " << Aws::Glue::Model::JobRunStateMapper::GetNameForJobRunState( jobRunState) << ". " << iterator << " seconds elapsed." << std::endl; } } else { std::cerr << "Error retrieving job run state. " << jobRunOutcome.GetError().GetMessage() << std::endl; deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, JOB_NAME, bucketName, clientConfig); return false; } } } else { std::cerr << "Error starting a job. " << outcome.GetError().GetMessage() << std::endl; deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, JOB_NAME, bucketName, clientConfig); return false; } } // 9. List the output data stored in the S3 bucket. { Aws::S3::S3Client s3Client; Aws::S3::Model::ListObjectsV2Request request; request.SetBucket(bucketName); request.SetPrefix(OUTPUT_FILE_PREFIX); Aws::String continuationToken; // Used for pagination. std::vector<Aws::S3::Model::Object> allObjects; do { if (!continuationToken.empty()) { request.SetContinuationToken(continuationToken); } Aws::S3::Model::ListObjectsV2Outcome outcome = s3Client.ListObjectsV2( request); if (outcome.IsSuccess()) { const std::vector<Aws::S3::Model::Object> &objects = outcome.GetResult().GetContents(); allObjects.insert(allObjects.end(), objects.begin(), objects.end()); continuationToken = outcome.GetResult().GetNextContinuationToken(); } else { std::cerr << "Error listing objects. " << outcome.GetError().GetMessage() << std::endl; break; } } while (!continuationToken.empty()); std::cout << "Data from your job is in " << allObjects.size() << " files in the S3 bucket, " << bucketName << "." << std::endl; for (size_t i = 0; i < allObjects.size(); ++i) { std::cout << " " << i + 1 << ". " << allObjects[i].GetKey() << std::endl; } int objectIndex = askQuestionForIntRange( std::string( "Enter the number of a block to download it and see the first ") + std::to_string(LINES_OF_RUN_FILE_TO_DISPLAY) + " lines of JSON output in the block: ", 1, static_cast<int>(allObjects.size())); Aws::String objectKey = allObjects[objectIndex - 1].GetKey(); std::stringstream stringStream; if (getObjectFromBucket(bucketName, objectKey, stringStream, clientConfig)) { for (int i = 0; i < LINES_OF_RUN_FILE_TO_DISPLAY && stringStream; ++i) { std::string line; std::getline(stringStream, line); std::cout << " " << line << std::endl; } } else { deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, JOB_NAME, bucketName, clientConfig); return false; } } // 10. List all the jobs. Aws::String jobName; { Aws::Glue::Model::ListJobsRequest listJobsRequest; Aws::String nextToken; std::vector<Aws::String> allJobNames; do { if (!nextToken.empty()) { listJobsRequest.SetNextToken(nextToken); } Aws::Glue::Model::ListJobsOutcome listRunsOutcome = client.ListJobs( listJobsRequest); if (listRunsOutcome.IsSuccess()) { const std::vector<Aws::String> &jobNames = listRunsOutcome.GetResult().GetJobNames(); allJobNames.insert(allJobNames.end(), jobNames.begin(), jobNames.end()); nextToken = listRunsOutcome.GetResult().GetNextToken(); } else { std::cerr << "Error listing jobs. " << listRunsOutcome.GetError().GetMessage() << std::endl; } } while (!nextToken.empty()); std::cout << "Your account has " << allJobNames.size() << " jobs." << std::endl; for (size_t i = 0; i < allJobNames.size(); ++i) { std::cout << " " << i + 1 << ". " << allJobNames[i] << std::endl; } int jobIndex = askQuestionForIntRange( Aws::String("Enter a number between 1 and ") + std::to_string(allJobNames.size()) + " to see the list of runs for a job: ", 1, static_cast<int>(allJobNames.size())); jobName = allJobNames[jobIndex - 1]; } // 11. Get the job runs for a job. Aws::String jobRunID; if (!jobName.empty()) { Aws::Glue::Model::GetJobRunsRequest getJobRunsRequest; getJobRunsRequest.SetJobName(jobName); Aws::String nextToken; // Used for pagination. std::vector<Aws::Glue::Model::JobRun> allJobRuns; do { if (!nextToken.empty()) { getJobRunsRequest.SetNextToken(nextToken); } Aws::Glue::Model::GetJobRunsOutcome jobRunsOutcome = client.GetJobRuns( getJobRunsRequest); if (jobRunsOutcome.IsSuccess()) { const std::vector<Aws::Glue::Model::JobRun> &jobRuns = jobRunsOutcome.GetResult().GetJobRuns(); allJobRuns.insert(allJobRuns.end(), jobRuns.begin(), jobRuns.end()); nextToken = jobRunsOutcome.GetResult().GetNextToken(); } else { std::cerr << "Error getting job runs. " << jobRunsOutcome.GetError().GetMessage() << std::endl; break; } } while (!nextToken.empty()); std::cout << "There are " << allJobRuns.size() << " runs in the job '" << jobName << "'." << std::endl; for (size_t i = 0; i < allJobRuns.size(); ++i) { std::cout << " " << i + 1 << ". " << allJobRuns[i].GetJobName() << std::endl; } int runIndex = askQuestionForIntRange( Aws::String("Enter a number between 1 and ") + std::to_string(allJobRuns.size()) + " to see details for a run: ", 1, static_cast<int>(allJobRuns.size())); jobRunID = allJobRuns[runIndex - 1].GetId(); } // 12. Get a single job run. if (!jobRunID.empty()) { Aws::Glue::Model::GetJobRunRequest jobRunRequest; jobRunRequest.SetJobName(jobName); jobRunRequest.SetRunId(jobRunID); Aws::Glue::Model::GetJobRunOutcome jobRunOutcome = client.GetJobRun( jobRunRequest); if (jobRunOutcome.IsSuccess()) { std::cout << "Displaying the job run JSON description." << std::endl; std::cout << jobRunOutcome.GetResult().GetJobRun().Jsonize().View().WriteReadable() << std::endl; } else { std::cerr << "Error get a job run. " << jobRunOutcome.GetError().GetMessage() << std::endl; } } return deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, JOB_NAME, bucketName, clientConfig); } //! Cleanup routine to delete created assets. /*! \\sa deleteAssets() \param crawler: Name of an AWS Glue crawler. \param database: The name of an AWS Glue database. \param job: The name of an AWS Glue job. \param bucketName: The name of an S3 bucket. \param clientConfig: AWS client configuration. \return bool: Successful completion. */ bool AwsDoc::Glue::deleteAssets(const Aws::String &crawler, const Aws::String &database, const Aws::String &job, const Aws::String &bucketName, const Aws::Client::ClientConfiguration &clientConfig) { const Aws::Glue::GlueClient client(clientConfig); bool result = true; // 13. Delete a job. if (!job.empty()) { Aws::Glue::Model::DeleteJobRequest request; request.SetJobName(job); Aws::Glue::Model::DeleteJobOutcome outcome = client.DeleteJob(request); if (outcome.IsSuccess()) { std::cout << "Successfully deleted the job." << std::endl; } else { std::cerr << "Error deleting the job. " << outcome.GetError().GetMessage() << std::endl; result = false; } } // 14. Delete a database. if (!database.empty()) { Aws::Glue::Model::DeleteDatabaseRequest request; request.SetName(database); Aws::Glue::Model::DeleteDatabaseOutcome outcome = client.DeleteDatabase( request); if (outcome.IsSuccess()) { std::cout << "Successfully deleted the database." << std::endl; } else { std::cerr << "Error deleting database. " << outcome.GetError().GetMessage() << std::endl; result = false; } } // 15. Delete a crawler. if (!crawler.empty()) { Aws::Glue::Model::DeleteCrawlerRequest request; request.SetName(crawler); Aws::Glue::Model::DeleteCrawlerOutcome outcome = client.DeleteCrawler(request); if (outcome.IsSuccess()) { std::cout << "Successfully deleted the crawler." << std::endl; } else { std::cerr << "Error deleting the crawler. " << outcome.GetError().GetMessage() << std::endl; result = false; } } // 16. Delete the job script and run data from the S3 bucket. result &= AwsDoc::Glue::deleteAllObjectsInS3Bucket(bucketName, clientConfig); return result; } //! Routine which uploads a file to an S3 bucket. /*! \\sa uploadFile() \param bucketName: An S3 bucket created in the setup. \param filePath: The path of the file to upload. \param fileName The name for the uploaded file. \param clientConfig: AWS client configuration. \return bool: Successful completion. */ bool AwsDoc::Glue::uploadFile(const Aws::String &bucketName, const Aws::String &filePath, const Aws::String &fileName, const Aws::Client::ClientConfiguration &clientConfig) { Aws::S3::S3Client s3_client(clientConfig); Aws::S3::Model::PutObjectRequest request; request.SetBucket(bucketName); request.SetKey(fileName); std::shared_ptr<Aws::IOStream> inputData = Aws::MakeShared<Aws::FStream>("SampleAllocationTag", filePath.c_str(), std::ios_base::in | std::ios_base::binary); if (!*inputData) { std::cerr << "Error unable to read file " << filePath << std::endl; return false; } request.SetBody(inputData); Aws::S3::Model::PutObjectOutcome outcome = s3_client.PutObject(request); if (!outcome.IsSuccess()) { std::cerr << "Error: PutObject: " << outcome.GetError().GetMessage() << std::endl; } else { std::cout << "Added object '" << filePath << "' to bucket '" << bucketName << "'." << std::endl; } return outcome.IsSuccess(); } //! Routine which deletes all objects in an S3 bucket. /*! \\sa deleteAllObjectsInS3Bucket() \param bucketName: The S3 bucket name. \param clientConfig: AWS client configuration. \return bool: Successful completion. */ bool AwsDoc::Glue::deleteAllObjectsInS3Bucket(const Aws::String &bucketName, const Aws::Client::ClientConfiguration &clientConfig) { Aws::S3::S3Client client(clientConfig); Aws::S3::Model::ListObjectsV2Request listObjectsRequest; listObjectsRequest.SetBucket(bucketName); Aws::String continuationToken; // Used for pagination. bool result = true; do { if (!continuationToken.empty()) { listObjectsRequest.SetContinuationToken(continuationToken); } Aws::S3::Model::ListObjectsV2Outcome listObjectsOutcome = client.ListObjectsV2( listObjectsRequest); if (listObjectsOutcome.IsSuccess()) { const std::vector<Aws::S3::Model::Object> &objects = listObjectsOutcome.GetResult().GetContents(); if (!objects.empty()) { Aws::S3::Model::DeleteObjectsRequest deleteObjectsRequest; deleteObjectsRequest.SetBucket(bucketName); std::vector<Aws::S3::Model::ObjectIdentifier> objectIdentifiers; for (const Aws::S3::Model::Object &object: objects) { objectIdentifiers.push_back( Aws::S3::Model::ObjectIdentifier().WithKey( object.GetKey())); } Aws::S3::Model::Delete objectsDelete; objectsDelete.SetObjects(objectIdentifiers); objectsDelete.SetQuiet(true); deleteObjectsRequest.SetDelete(objectsDelete); Aws::S3::Model::DeleteObjectsOutcome deleteObjectsOutcome = client.DeleteObjects(deleteObjectsRequest); if (!deleteObjectsOutcome.IsSuccess()) { std::cerr << "Error deleting objects. " << deleteObjectsOutcome.GetError().GetMessage() << std::endl; result = false; break; } else { std::cout << "Successfully deleted the objects." << std::endl; } } else { std::cout << "No objects to delete in '" << bucketName << "'." << std::endl; } continuationToken = listObjectsOutcome.GetResult().GetNextContinuationToken(); } else { std::cerr << "Error listing objects. " << listObjectsOutcome.GetError().GetMessage() << std::endl; result = false; break; } } while (!continuationToken.empty()); return result; } //! Routine which retrieves an object from an S3 bucket. /*! \\sa getObjectFromBucket() \param bucketName: The S3 bucket name. \param objectKey: The object's name. \param objectStream: A stream to receive the retrieved data. \param clientConfig: AWS client configuration. \return bool: Successful completion. */ bool AwsDoc::Glue::getObjectFromBucket(const Aws::String &bucketName, const Aws::String &objectKey, std::ostream &objectStream, const Aws::Client::ClientConfiguration &clientConfig) { Aws::S3::S3Client client(clientConfig); Aws::S3::Model::GetObjectRequest request; request.SetBucket(bucketName); request.SetKey(objectKey); Aws::S3::Model::GetObjectOutcome outcome = client.GetObject(request); if (outcome.IsSuccess()) { std::cout << "Successfully retrieved '" << objectKey << "'." << std::endl; auto &body = outcome.GetResult().GetBody(); objectStream << body.rdbuf(); } else { std::cerr << "Error retrieving object. " << outcome.GetError().GetMessage() << std::endl; } return outcome.IsSuccess(); }
-
有关 API 详细信息,请参阅《AWS SDK for C++ API 参考》中的以下主题。
-
- Java
-
- SDK for Java 2.x
-
注意
查看 GitHub,了解更多信息。查找完整示例,学习如何在 AWS 代码示例存储库
中进行设置和运行。 /** * Before running this Java V2 code example, set up your development * environment, including your credentials. * <p> * For more information, see the following documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html * * To set up the resources, see this documentation topic: * * https://docs.aws.amazon.com/glue/latest/ug/tutorial-add-crawler.html * * This example performs the following tasks: * * 1. Create a database. * 2. Create a crawler. * 3. Get a crawler. * 4. Start a crawler. * 5. Get a database. * 6. Get tables. * 7. Create a job. * 8. Start a job run. * 9. List all jobs. * 10. Get job runs. * 11. Delete a job. * 12. Delete a database. * 13. Delete a crawler. */ public class GlueScenario { public static final String DASHES = new String(new char[80]).replace("\0", "-"); public static void main(String[] args) throws InterruptedException { final String usage = """ Usage: <iam> <s3Path> <cron> <dbName> <crawlerName> <jobName> <scriptLocation> <locationUri> <bucketNameSc>\s Where: iam - The ARN of the IAM role that has AWS Glue and S3 permissions.\s s3Path - The Amazon Simple Storage Service (Amazon S3) target that contains data (for example, s3://<bucket name>/read). cron - A cron expression used to specify the schedule (i.e., cron(15 12 * * ? *). dbName - The database name.\s crawlerName - The name of the crawler.\s jobName - The name you assign to this job definition. scriptLocation - The Amazon S3 path to a script that runs a job. locationUri - The location of the database (you can find this file in resources folder). bucketNameSc - The Amazon S3 bucket name used when creating a job """; if (args.length != 9) { System.out.println(usage); return; } Scanner scanner = new Scanner(System.in); String iam = args[0]; String s3Path = args[1]; String cron = args[2]; String dbName = args[3]; String crawlerName = args[4]; String jobName = args[5]; String scriptLocation = args[6]; String locationUri = args[7]; String bucketNameSc = args[8]; Region region = Region.US_EAST_1; GlueClient glueClient = GlueClient.builder() .region(region) .build(); System.out.println(DASHES); System.out.println("Welcome to the AWS Glue scenario."); System.out.println(""" AWS Glue is a fully managed extract, transform, and load (ETL) service provided by Amazon Web Services (AWS). It is designed to simplify the process of building, running, and maintaining ETL pipelines, which are essential for data integration and data warehousing tasks. One of the key features of AWS Glue is its ability to automatically discover and catalog data stored in various sources, such as Amazon S3, Amazon RDS, Amazon Redshift, and other databases. This cataloging process creates a central metadata repository, known as the AWS Glue Data Catalog, which provides a unified view of an organization's data assets. This metadata can then be used to create ETL jobs, which can be scheduled and run on-demand or on a regular basis. Lets get started. """); waitForInputToContinue(scanner); System.out.println(DASHES); System.out.println(DASHES); System.out.println("1. Create a database."); try { createDatabase(glueClient, dbName, locationUri); } catch (GlueException e) { if (e.awsErrorDetails().errorMessage().equals("Database already exists.")) { System.out.println("Database " + dbName + " already exists. Skipping creation."); } else { System.err.println(e.awsErrorDetails().errorMessage()); return; } } waitForInputToContinue(scanner); System.out.println(DASHES); System.out.println(DASHES); System.out.println("2. Create a crawler."); try { createGlueCrawler(glueClient, iam, s3Path, cron, dbName, crawlerName); } catch (GlueException e) { if (e.awsErrorDetails().errorMessage().contains("already exists")) { System.out.println("Crawler " + crawlerName + " already exists. Skipping creation."); } else { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } waitForInputToContinue(scanner); System.out.println(DASHES); System.out.println(DASHES); System.out.println("3. Get a crawler."); try { getSpecificCrawler(glueClient, crawlerName); } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); return; } waitForInputToContinue(scanner); System.out.println(DASHES); System.out.println(DASHES); System.out.println("4. Start a crawler."); try { startSpecificCrawler(glueClient, crawlerName); } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); return; } waitForInputToContinue(scanner); System.out.println(DASHES); System.out.println(DASHES); System.out.println("5. Get a database."); try { getSpecificDatabase(glueClient, dbName); } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); return; } waitForInputToContinue(scanner); System.out.println(DASHES); System.out.println(DASHES); System.out.println("*** Wait 5 min for the tables to become available"); TimeUnit.MINUTES.sleep(5); System.out.println("6. Get tables."); String myTableName; try { myTableName = getGlueTables(glueClient, dbName); } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); return; } waitForInputToContinue(scanner); System.out.println(DASHES); System.out.println(DASHES); System.out.println("7. Create a job."); try { createJob(glueClient, jobName, iam, scriptLocation); } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); return; } waitForInputToContinue(scanner); System.out.println(DASHES); System.out.println(DASHES); System.out.println("8. Start a Job run."); try { startJob(glueClient, jobName, dbName, myTableName, bucketNameSc); } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); return; } waitForInputToContinue(scanner); System.out.println(DASHES); System.out.println(DASHES); System.out.println("9. List all jobs."); try { getAllJobs(glueClient); } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); return; } waitForInputToContinue(scanner); System.out.println(DASHES); System.out.println(DASHES); System.out.println("10. Get job runs."); try { getJobRuns(glueClient, jobName); } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); return; } waitForInputToContinue(scanner); System.out.println(DASHES); System.out.println(DASHES); System.out.println("11. Delete a job."); try { deleteJob(glueClient, jobName); } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); return; } System.out.println("*** Wait 5 MIN for the " + crawlerName + " to stop"); TimeUnit.MINUTES.sleep(5); waitForInputToContinue(scanner); System.out.println(DASHES); System.out.println(DASHES); System.out.println("12. Delete a database."); try { deleteDatabase(glueClient, dbName); } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); return; } waitForInputToContinue(scanner); System.out.println(DASHES); System.out.println(DASHES); System.out.println("Delete a crawler."); try { deleteSpecificCrawler(glueClient, crawlerName); } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); return; } waitForInputToContinue(scanner); System.out.println(DASHES); System.out.println(DASHES); System.out.println("Successfully completed the AWS Glue Scenario"); System.out.println(DASHES); } /** * Creates a Glue database with the specified name and location URI. * * @param glueClient The Glue client to use for the database creation. * @param dbName The name of the database to create. * @param locationUri The location URI for the database. */ public static void createDatabase(GlueClient glueClient, String dbName, String locationUri) { try { DatabaseInput input = DatabaseInput.builder() .description("Built with the AWS SDK for Java V2") .name(dbName) .locationUri(locationUri) .build(); CreateDatabaseRequest request = CreateDatabaseRequest.builder() .databaseInput(input) .build(); glueClient.createDatabase(request); System.out.println(dbName + " was successfully created"); } catch (GlueException e) { throw e; } } /** * Creates a new AWS Glue crawler using the AWS Glue Java API. * * @param glueClient the AWS Glue client used to interact with the AWS Glue service * @param iam the IAM role that the crawler will use to access the data source * @param s3Path the S3 path that the crawler will scan for data * @param cron the cron expression that defines the crawler's schedule * @param dbName the name of the AWS Glue database where the crawler will store the metadata * @param crawlerName the name of the crawler to be created */ public static void createGlueCrawler(GlueClient glueClient, String iam, String s3Path, String cron, String dbName, String crawlerName) { try { S3Target s3Target = S3Target.builder() .path(s3Path) .build(); List<S3Target> targetList = new ArrayList<>(); targetList.add(s3Target); CrawlerTargets targets = CrawlerTargets.builder() .s3Targets(targetList) .build(); CreateCrawlerRequest crawlerRequest = CreateCrawlerRequest.builder() .databaseName(dbName) .name(crawlerName) .description("Created by the AWS Glue Java API") .targets(targets) .role(iam) .schedule(cron) .build(); glueClient.createCrawler(crawlerRequest); System.out.println(crawlerName + " was successfully created"); } catch (GlueException e) { throw e; } } /** * Retrieves a specific crawler from the AWS Glue service and waits for it to be in the "READY" state. * * @param glueClient the AWS Glue client used to interact with the Glue service * @param crawlerName the name of the crawler to be retrieved */ public static void getSpecificCrawler(GlueClient glueClient, String crawlerName) throws InterruptedException { try { GetCrawlerRequest crawlerRequest = GetCrawlerRequest.builder() .name(crawlerName) .build(); boolean ready = false; while (!ready) { GetCrawlerResponse response = glueClient.getCrawler(crawlerRequest); String status = response.crawler().stateAsString(); if (status.compareTo("READY") == 0) { ready = true; } Thread.sleep(3000); } System.out.println("The crawler is now ready"); } catch (GlueException | InterruptedException e) { throw e; } } /** * Starts a specific AWS Glue crawler. * * @param glueClient the AWS Glue client to use for the crawler operation * @param crawlerName the name of the crawler to start * @throws GlueException if there is an error starting the crawler */ public static void startSpecificCrawler(GlueClient glueClient, String crawlerName) { try { StartCrawlerRequest crawlerRequest = StartCrawlerRequest.builder() .name(crawlerName) .build(); glueClient.startCrawler(crawlerRequest); System.out.println(crawlerName + " was successfully started!"); } catch (GlueException e) { throw e; } } /** * Retrieves the specific database from the AWS Glue service. * * @param glueClient an instance of the AWS Glue client used to interact with the service * @param databaseName the name of the database to retrieve * @throws GlueException if there is an error retrieving the database from the AWS Glue service */ public static void getSpecificDatabase(GlueClient glueClient, String databaseName) { try { GetDatabaseRequest databasesRequest = GetDatabaseRequest.builder() .name(databaseName) .build(); GetDatabaseResponse response = glueClient.getDatabase(databasesRequest); Instant createDate = response.database().createTime(); // Convert the Instant to readable date. DateTimeFormatter formatter = DateTimeFormatter.ofLocalizedDateTime(FormatStyle.SHORT) .withLocale(Locale.US) .withZone(ZoneId.systemDefault()); formatter.format(createDate); System.out.println("The create date of the database is " + createDate); } catch (GlueException e) { throw e; } } /** * Retrieves the names of the tables in the specified Glue database. * * @param glueClient the Glue client to use for the operation * @param dbName the name of the Glue database to retrieve the table names from * @return the name of the first table retrieved, or an empty string if no tables were found */ public static String getGlueTables(GlueClient glueClient, String dbName) { String myTableName = ""; try { GetTablesRequest tableRequest = GetTablesRequest.builder() .databaseName(dbName) .build(); GetTablesResponse response = glueClient.getTables(tableRequest); List<Table> tables = response.tableList(); if (tables.isEmpty()) { System.out.println("No tables were returned"); } else { for (Table table : tables) { myTableName = table.name(); System.out.println("Table name is: " + myTableName); } } } catch (GlueException e) { throw e; } return myTableName; } /** * Starts a job run in AWS Glue. * * @param glueClient the AWS Glue client to use for the job run * @param jobName the name of the Glue job to run * @param inputDatabase the name of the input database * @param inputTable the name of the input table * @param outBucket the URL of the output S3 bucket * @throws GlueException if there is an error starting the job run */ public static void startJob(GlueClient glueClient, String jobName, String inputDatabase, String inputTable, String outBucket) { try { Map<String, String> myMap = new HashMap<>(); myMap.put("--input_database", inputDatabase); myMap.put("--input_table", inputTable); myMap.put("--output_bucket_url", outBucket); StartJobRunRequest runRequest = StartJobRunRequest.builder() .workerType(WorkerType.G_1_X) .numberOfWorkers(10) .arguments(myMap) .jobName(jobName) .build(); StartJobRunResponse response = glueClient.startJobRun(runRequest); System.out.println("The request Id of the job is " + response.responseMetadata().requestId()); } catch (GlueException e) { throw e; } } /** * Creates a new AWS Glue job. * * @param glueClient the AWS Glue client to use for the operation * @param jobName the name of the job to create * @param iam the IAM role to associate with the job * @param scriptLocation the location of the script to be used by the job * @throws GlueException if there is an error creating the job */ public static void createJob(GlueClient glueClient, String jobName, String iam, String scriptLocation) { try { JobCommand command = JobCommand.builder() .pythonVersion("3") .name("glueetl") .scriptLocation(scriptLocation) .build(); CreateJobRequest jobRequest = CreateJobRequest.builder() .description("A Job created by using the AWS SDK for Java V2") .glueVersion("2.0") .workerType(WorkerType.G_1_X) .numberOfWorkers(10) .name(jobName) .role(iam) .command(command) .build(); glueClient.createJob(jobRequest); System.out.println(jobName + " was successfully created."); } catch (GlueException e) { throw e; } } /** * Retrieves and prints information about all the jobs in the Glue data catalog. * * @param glueClient the Glue client used to interact with the AWS Glue service */ public static void getAllJobs(GlueClient glueClient) { try { GetJobsRequest jobsRequest = GetJobsRequest.builder() .maxResults(10) .build(); GetJobsResponse jobsResponse = glueClient.getJobs(jobsRequest); List<Job> jobs = jobsResponse.jobs(); for (Job job : jobs) { System.out.println("Job name is : " + job.name()); System.out.println("The job worker type is : " + job.workerType().name()); } } catch (GlueException e) { throw e; } } /** * Retrieves the job runs for a given Glue job and prints the status of the job runs. * * @param glueClient the Glue client used to make API calls * @param jobName the name of the Glue job to retrieve the job runs for */ public static void getJobRuns(GlueClient glueClient, String jobName) { try { GetJobRunsRequest runsRequest = GetJobRunsRequest.builder() .jobName(jobName) .maxResults(20) .build(); boolean jobDone = false; while (!jobDone) { GetJobRunsResponse response = glueClient.getJobRuns(runsRequest); List<JobRun> jobRuns = response.jobRuns(); for (JobRun jobRun : jobRuns) { String jobState = jobRun.jobRunState().name(); if (jobState.compareTo("SUCCEEDED") == 0) { System.out.println(jobName + " has succeeded"); jobDone = true; } else if (jobState.compareTo("STOPPED") == 0) { System.out.println("Job run has stopped"); jobDone = true; } else if (jobState.compareTo("FAILED") == 0) { System.out.println("Job run has failed"); jobDone = true; } else if (jobState.compareTo("TIMEOUT") == 0) { System.out.println("Job run has timed out"); jobDone = true; } else { System.out.println("*** Job run state is " + jobRun.jobRunState().name()); System.out.println("Job run Id is " + jobRun.id()); System.out.println("The Glue version is " + jobRun.glueVersion()); } TimeUnit.SECONDS.sleep(5); } } } catch (GlueException e) { throw e; } catch (InterruptedException e) { throw new RuntimeException(e); } } /** * Deletes a Glue job. * * @param glueClient the Glue client to use for the operation * @param jobName the name of the job to be deleted * @throws GlueException if there is an error deleting the job */ public static void deleteJob(GlueClient glueClient, String jobName) { try { DeleteJobRequest jobRequest = DeleteJobRequest.builder() .jobName(jobName) .build(); glueClient.deleteJob(jobRequest); System.out.println(jobName + " was successfully deleted"); } catch (GlueException e) { throw e; } } /** * Deletes a AWS Glue Database. * * @param glueClient An instance of the AWS Glue client used to interact with the AWS Glue service. * @param databaseName The name of the database to be deleted. * @throws GlueException If an error occurs while deleting the database. */ public static void deleteDatabase(GlueClient glueClient, String databaseName) { try { DeleteDatabaseRequest request = DeleteDatabaseRequest.builder() .name(databaseName) .build(); glueClient.deleteDatabase(request); System.out.println(databaseName + " was successfully deleted"); } catch (GlueException e) { throw e; } } /** * Deletes a specific AWS Glue crawler. * * @param glueClient the AWS Glue client object * @param crawlerName the name of the crawler to be deleted * @throws GlueException if an error occurs during the deletion process */ public static void deleteSpecificCrawler(GlueClient glueClient, String crawlerName) { try { DeleteCrawlerRequest deleteCrawlerRequest = DeleteCrawlerRequest.builder() .name(crawlerName) .build(); glueClient.deleteCrawler(deleteCrawlerRequest); System.out.println(crawlerName + " was deleted"); } catch (GlueException e) { throw e; } } private static void waitForInputToContinue(Scanner scanner) { while (true) { System.out.println(""); System.out.println("Enter 'c' followed by <ENTER> to continue:"); String input = scanner.nextLine(); if (input.trim().equalsIgnoreCase("c")) { System.out.println("Continuing with the program..."); System.out.println(""); break; } else { // Handle invalid input. System.out.println("Invalid input. Please try again."); } } } }
-
有关 API 详细信息,请参阅《AWS SDK for Java 2.x API 参考》中的以下主题。
-
- JavaScript
-
- SDK for JavaScript (v3)
-
注意
查看 GitHub,了解更多信息。查找完整示例,学习如何在 AWS 代码示例存储库
中进行设置和运行。 创建并运行爬网程序,爬取公共 Amazon Simple Storage Service(Amazon S3)存储桶并生成一个描述其找到的 CSV 格式数据的元数据数据库。
const createCrawler = (name, role, dbName, tablePrefix, s3TargetPath) => { const client = new GlueClient({}); const command = new CreateCrawlerCommand({ Name: name, Role: role, DatabaseName: dbName, TablePrefix: tablePrefix, Targets: { S3Targets: [{ Path: s3TargetPath }], }, }); return client.send(command); }; const getCrawler = (name) => { const client = new GlueClient({}); const command = new GetCrawlerCommand({ Name: name, }); return client.send(command); }; const startCrawler = (name) => { const client = new GlueClient({}); const command = new StartCrawlerCommand({ Name: name, }); return client.send(command); }; const crawlerExists = async ({ getCrawler }, crawlerName) => { try { await getCrawler(crawlerName); return true; } catch { return false; } }; /** * @param {{ createCrawler: import('../../../actions/create-crawler.js').createCrawler}} actions */ const makeCreateCrawlerStep = (actions) => async (context) => { if (await crawlerExists(actions, process.env.CRAWLER_NAME)) { log("Crawler already exists. Skipping creation."); } else { await actions.createCrawler( process.env.CRAWLER_NAME, process.env.ROLE_NAME, process.env.DATABASE_NAME, process.env.TABLE_PREFIX, process.env.S3_TARGET_PATH, ); log("Crawler created successfully.", { type: "success" }); } return { ...context }; }; /** * @param {(name: string) => Promise<import('@aws-sdk/client-glue').GetCrawlerCommandOutput>} getCrawler * @param {string} crawlerName */ const waitForCrawler = async (getCrawler, crawlerName) => { const waitTimeInSeconds = 30; const { Crawler } = await getCrawler(crawlerName); if (!Crawler) { throw new Error(`Crawler with name ${crawlerName} not found.`); } if (Crawler.State === "READY") { return; } log(`Crawler is ${Crawler.State}. Waiting ${waitTimeInSeconds} seconds...`); await wait(waitTimeInSeconds); return waitForCrawler(getCrawler, crawlerName); }; const makeStartCrawlerStep = ({ startCrawler, getCrawler }) => async (context) => { log("Starting crawler."); await startCrawler(process.env.CRAWLER_NAME); log("Crawler started.", { type: "success" }); log("Waiting for crawler to finish running. This can take a while."); await waitForCrawler(getCrawler, process.env.CRAWLER_NAME); log("Crawler ready.", { type: "success" }); return { ...context }; };
在 AWS Glue Data Catalog 中列出有关数据库和表的信息。
const getDatabase = (name) => { const client = new GlueClient({}); const command = new GetDatabaseCommand({ Name: name, }); return client.send(command); }; const getTables = (databaseName) => { const client = new GlueClient({}); const command = new GetTablesCommand({ DatabaseName: databaseName, }); return client.send(command); }; const makeGetDatabaseStep = ({ getDatabase }) => async (context) => { const { Database: { Name }, } = await getDatabase(process.env.DATABASE_NAME); log(`Database: ${Name}`); return { ...context }; }; /** * @param {{ getTables: () => Promise<import('@aws-sdk/client-glue').GetTablesCommandOutput}} config */ const makeGetTablesStep = ({ getTables }) => async (context) => { const { TableList } = await getTables(process.env.DATABASE_NAME); log("Tables:"); log(TableList.map((table) => ` • ${table.Name}\n`)); return { ...context }; };
创建并运行任务,从源 Amazon S3 存储桶提取 CSV 数据,通过删除和重命名字段对其进行转换,然后将 JSON 格式的输出加载到另一个 Amazon S3 存储桶中。
const createJob = (name, role, scriptBucketName, scriptKey) => { const client = new GlueClient({}); const command = new CreateJobCommand({ Name: name, Role: role, Command: { Name: "glueetl", PythonVersion: "3", ScriptLocation: `s3://${scriptBucketName}/${scriptKey}`, }, GlueVersion: "3.0", }); return client.send(command); }; const startJobRun = (jobName, dbName, tableName, bucketName) => { const client = new GlueClient({}); const command = new StartJobRunCommand({ JobName: jobName, Arguments: { "--input_database": dbName, "--input_table": tableName, "--output_bucket_url": `s3://${bucketName}/`, }, }); return client.send(command); }; const makeCreateJobStep = ({ createJob }) => async (context) => { log("Creating Job."); await createJob( process.env.JOB_NAME, process.env.ROLE_NAME, process.env.BUCKET_NAME, process.env.PYTHON_SCRIPT_KEY, ); log("Job created.", { type: "success" }); return { ...context }; }; /** * @param {(name: string, runId: string) => Promise<import('@aws-sdk/client-glue').GetJobRunCommandOutput> } getJobRun * @param {string} jobName * @param {string} jobRunId */ const waitForJobRun = async (getJobRun, jobName, jobRunId) => { const waitTimeInSeconds = 30; const { JobRun } = await getJobRun(jobName, jobRunId); if (!JobRun) { throw new Error(`Job run with id ${jobRunId} not found.`); } switch (JobRun.JobRunState) { case "FAILED": case "TIMEOUT": case "STOPPED": throw new Error( `Job ${JobRun.JobRunState}. Error: ${JobRun.ErrorMessage}`, ); case "RUNNING": break; case "SUCCEEDED": return; default: throw new Error(`Unknown job run state: ${JobRun.JobRunState}`); } log( `Job ${JobRun.JobRunState}. Waiting ${waitTimeInSeconds} more seconds...`, ); await wait(waitTimeInSeconds); return waitForJobRun(getJobRun, jobName, jobRunId); }; /** * @param {{ prompter: { prompt: () => Promise<{ shouldOpen: boolean }>} }} context */ const promptToOpen = async (context) => { const { shouldOpen } = await context.prompter.prompt({ name: "shouldOpen", type: "confirm", message: "Open the output bucket in your browser?", }); if (shouldOpen) { return open( `https://s3.console.aws.amazon.com/s3/buckets/${process.env.BUCKET_NAME} to view the output.`, ); } }; const makeStartJobRunStep = ({ startJobRun, getJobRun }) => async (context) => { log("Starting job."); const { JobRunId } = await startJobRun( process.env.JOB_NAME, process.env.DATABASE_NAME, process.env.TABLE_NAME, process.env.BUCKET_NAME, ); log("Job started.", { type: "success" }); log("Waiting for job to finish running. This can take a while."); await waitForJobRun(getJobRun, process.env.JOB_NAME, JobRunId); log("Job run succeeded.", { type: "success" }); await promptToOpen(context); return { ...context }; };
列出有关任务运行的信息,并查看一些转换后的数据。
const getJobRuns = (jobName) => { const client = new GlueClient({}); const command = new GetJobRunsCommand({ JobName: jobName, }); return client.send(command); }; const getJobRun = (jobName, jobRunId) => { const client = new GlueClient({}); const command = new GetJobRunCommand({ JobName: jobName, RunId: jobRunId, }); return client.send(command); }; /** * @typedef {{ prompter: { prompt: () => Promise<{jobName: string}> } }} Context */ /** * @typedef {() => Promise<import('@aws-sdk/client-glue').GetJobRunCommandOutput>} getJobRun */ /** * @typedef {() => Promise<import('@aws-sdk/client-glue').GetJobRunsCommandOutput} getJobRuns */ /** * * @param {getJobRun} getJobRun * @param {string} jobName * @param {string} jobRunId */ const logJobRunDetails = async (getJobRun, jobName, jobRunId) => { const { JobRun } = await getJobRun(jobName, jobRunId); log(JobRun, { type: "object" }); }; /** * * @param {{getJobRuns: getJobRuns, getJobRun: getJobRun }} funcs */ const makePickJobRunStep = ({ getJobRuns, getJobRun }) => async (/** @type { Context } */ context) => { if (context.selectedJobName) { const { JobRuns } = await getJobRuns(context.selectedJobName); const { jobRunId } = await context.prompter.prompt({ name: "jobRunId", type: "list", message: "Select a job run to see details.", choices: JobRuns.map((run) => run.Id), }); logJobRunDetails(getJobRun, context.selectedJobName, jobRunId); } return { ...context }; };
删除演示创建的所有资源。
const deleteJob = (jobName) => { const client = new GlueClient({}); const command = new DeleteJobCommand({ JobName: jobName, }); return client.send(command); }; const deleteTable = (databaseName, tableName) => { const client = new GlueClient({}); const command = new DeleteTableCommand({ DatabaseName: databaseName, Name: tableName, }); return client.send(command); }; const deleteDatabase = (databaseName) => { const client = new GlueClient({}); const command = new DeleteDatabaseCommand({ Name: databaseName, }); return client.send(command); }; const deleteCrawler = (crawlerName) => { const client = new GlueClient({}); const command = new DeleteCrawlerCommand({ Name: crawlerName, }); return client.send(command); }; /** * * @param {import('../../../actions/delete-job.js').deleteJob} deleteJobFn * @param {string[]} jobNames * @param {{ prompter: { prompt: () => Promise<any> }}} context */ const handleDeleteJobs = async (deleteJobFn, jobNames, context) => { /** * @type {{ selectedJobNames: string[] }} */ const { selectedJobNames } = await context.prompter.prompt({ name: "selectedJobNames", type: "checkbox", message: "Let's clean up jobs. Select jobs to delete.", choices: jobNames, }); if (selectedJobNames.length === 0) { log("No jobs selected."); } else { log("Deleting jobs."); await Promise.all( selectedJobNames.map((n) => deleteJobFn(n).catch(console.error)), ); log("Jobs deleted.", { type: "success" }); } }; /** * @param {{ * listJobs: import('../../../actions/list-jobs.js').listJobs, * deleteJob: import('../../../actions/delete-job.js').deleteJob * }} config */ const makeCleanUpJobsStep = ({ listJobs, deleteJob }) => async (context) => { const { JobNames } = await listJobs(); if (JobNames.length > 0) { await handleDeleteJobs(deleteJob, JobNames, context); } return { ...context }; }; /** * @param {import('../../../actions/delete-table.js').deleteTable} deleteTable * @param {string} databaseName * @param {string[]} tableNames */ const deleteTables = (deleteTable, databaseName, tableNames) => Promise.all( tableNames.map((tableName) => deleteTable(databaseName, tableName).catch(console.error), ), ); /** * @param {{ * getTables: import('../../../actions/get-tables.js').getTables, * deleteTable: import('../../../actions/delete-table.js').deleteTable * }} config */ const makeCleanUpTablesStep = ({ getTables, deleteTable }) => /** * @param {{ prompter: { prompt: () => Promise<any>}}} context */ async (context) => { const { TableList } = await getTables(process.env.DATABASE_NAME).catch( () => ({ TableList: null }), ); if (TableList && TableList.length > 0) { /** * @type {{ tableNames: string[] }} */ const { tableNames } = await context.prompter.prompt({ name: "tableNames", type: "checkbox", message: "Let's clean up tables. Select tables to delete.", choices: TableList.map((t) => t.Name), }); if (tableNames.length === 0) { log("No tables selected."); } else { log("Deleting tables."); await deleteTables(deleteTable, process.env.DATABASE_NAME, tableNames); log("Tables deleted.", { type: "success" }); } } return { ...context }; }; /** * @param {import('../../../actions/delete-database.js').deleteDatabase} deleteDatabase * @param {string[]} databaseNames */ const deleteDatabases = (deleteDatabase, databaseNames) => Promise.all( databaseNames.map((dbName) => deleteDatabase(dbName).catch(console.error)), ); /** * @param {{ * getDatabases: import('../../../actions/get-databases.js').getDatabases * deleteDatabase: import('../../../actions/delete-database.js').deleteDatabase * }} config */ const makeCleanUpDatabasesStep = ({ getDatabases, deleteDatabase }) => /** * @param {{ prompter: { prompt: () => Promise<any>}} context */ async (context) => { const { DatabaseList } = await getDatabases(); if (DatabaseList.length > 0) { /** @type {{ dbNames: string[] }} */ const { dbNames } = await context.prompter.prompt({ name: "dbNames", type: "checkbox", message: "Let's clean up databases. Select databases to delete.", choices: DatabaseList.map((db) => db.Name), }); if (dbNames.length === 0) { log("No databases selected."); } else { log("Deleting databases."); await deleteDatabases(deleteDatabase, dbNames); log("Databases deleted.", { type: "success" }); } } return { ...context }; }; const cleanUpCrawlerStep = async (context) => { log("Deleting crawler."); try { await deleteCrawler(process.env.CRAWLER_NAME); log("Crawler deleted.", { type: "success" }); } catch (err) { if (err.name === "EntityNotFoundException") { log("Crawler is already deleted."); } else { throw err; } } return { ...context }; };
-
有关 API 详细信息,请参阅《AWS SDK for JavaScript API 参考》中的以下主题。
-
- Kotlin
-
- 适用于 Kotlin 的 SDK
-
注意
查看 GitHub,了解更多信息。查找完整示例,学习如何在 AWS 代码示例存储库
中进行设置和运行。 suspend fun main(args: Array<String>) { val usage = """ Usage: <iam> <s3Path> <cron> <dbName> <crawlerName> <jobName> <scriptLocation> <locationUri> Where: iam - The Amazon Resource Name (ARN) of the AWS Identity and Access Management (IAM) role that has AWS Glue and Amazon Simple Storage Service (Amazon S3) permissions. s3Path - The Amazon Simple Storage Service (Amazon S3) target that contains data (for example, CSV data). cron - A cron expression used to specify the schedule (for example, cron(15 12 * * ? *). dbName - The database name. crawlerName - The name of the crawler. jobName - The name you assign to this job definition. scriptLocation - Specifies the Amazon S3 path to a script that runs a job. locationUri - Specifies the location of the database """ if (args.size != 8) { println(usage) exitProcess(1) } val iam = args[0] val s3Path = args[1] val cron = args[2] val dbName = args[3] val crawlerName = args[4] val jobName = args[5] val scriptLocation = args[6] val locationUri = args[7] println("About to start the AWS Glue Scenario") createDatabase(dbName, locationUri) createCrawler(iam, s3Path, cron, dbName, crawlerName) getCrawler(crawlerName) startCrawler(crawlerName) getDatabase(dbName) getGlueTables(dbName) createJob(jobName, iam, scriptLocation) startJob(jobName) getJobs() getJobRuns(jobName) deleteJob(jobName) println("*** Wait for 5 MIN so the $crawlerName is ready to be deleted") TimeUnit.MINUTES.sleep(5) deleteMyDatabase(dbName) deleteCrawler(crawlerName) } suspend fun createDatabase( dbName: String?, locationUriVal: String?, ) { val input = DatabaseInput { description = "Built with the AWS SDK for Kotlin" name = dbName locationUri = locationUriVal } val request = CreateDatabaseRequest { databaseInput = input } GlueClient { region = "us-east-1" }.use { glueClient -> glueClient.createDatabase(request) println("The database was successfully created") } } suspend fun createCrawler( iam: String?, s3Path: String?, cron: String?, dbName: String?, crawlerName: String, ) { val s3Target = S3Target { path = s3Path } val targetList = ArrayList<S3Target>() targetList.add(s3Target) val targetOb = CrawlerTargets { s3Targets = targetList } val crawlerRequest = CreateCrawlerRequest { databaseName = dbName name = crawlerName description = "Created by the AWS Glue Java API" targets = targetOb role = iam schedule = cron } GlueClient { region = "us-east-1" }.use { glueClient -> glueClient.createCrawler(crawlerRequest) println("$crawlerName was successfully created") } } suspend fun getCrawler(crawlerName: String?) { val request = GetCrawlerRequest { name = crawlerName } GlueClient { region = "us-east-1" }.use { glueClient -> val response = glueClient.getCrawler(request) val role = response.crawler?.role println("The role associated with this crawler is $role") } } suspend fun startCrawler(crawlerName: String) { val crawlerRequest = StartCrawlerRequest { name = crawlerName } GlueClient { region = "us-east-1" }.use { glueClient -> glueClient.startCrawler(crawlerRequest) println("$crawlerName was successfully started.") } } suspend fun getDatabase(databaseName: String?) { val request = GetDatabaseRequest { name = databaseName } GlueClient { region = "us-east-1" }.use { glueClient -> val response = glueClient.getDatabase(request) val dbDesc = response.database?.description println("The database description is $dbDesc") } } suspend fun getGlueTables(dbName: String?) { val tableRequest = GetTablesRequest { databaseName = dbName } GlueClient { region = "us-east-1" }.use { glueClient -> val response = glueClient.getTables(tableRequest) response.tableList?.forEach { tableName -> println("Table name is ${tableName.name}") } } } suspend fun startJob(jobNameVal: String?) { val runRequest = StartJobRunRequest { workerType = WorkerType.G1X numberOfWorkers = 10 jobName = jobNameVal } GlueClient { region = "us-east-1" }.use { glueClient -> val response = glueClient.startJobRun(runRequest) println("The job run Id is ${response.jobRunId}") } } suspend fun createJob( jobName: String, iam: String?, scriptLocationVal: String?, ) { val commandOb = JobCommand { pythonVersion = "3" name = "MyJob1" scriptLocation = scriptLocationVal } val jobRequest = CreateJobRequest { description = "A Job created by using the AWS SDK for Java V2" glueVersion = "2.0" workerType = WorkerType.G1X numberOfWorkers = 10 name = jobName role = iam command = commandOb } GlueClient { region = "us-east-1" }.use { glueClient -> glueClient.createJob(jobRequest) println("$jobName was successfully created.") } } suspend fun getJobs() { val request = GetJobsRequest { maxResults = 10 } GlueClient { region = "us-east-1" }.use { glueClient -> val response = glueClient.getJobs(request) response.jobs?.forEach { job -> println("Job name is ${job.name}") } } } suspend fun getJobRuns(jobNameVal: String?) { val request = GetJobRunsRequest { jobName = jobNameVal } GlueClient { region = "us-east-1" }.use { glueClient -> val response = glueClient.getJobRuns(request) response.jobRuns?.forEach { job -> println("Job name is ${job.jobName}") } } } suspend fun deleteJob(jobNameVal: String) { val jobRequest = DeleteJobRequest { jobName = jobNameVal } GlueClient { region = "us-east-1" }.use { glueClient -> glueClient.deleteJob(jobRequest) println("$jobNameVal was successfully deleted") } } suspend fun deleteMyDatabase(databaseName: String) { val request = DeleteDatabaseRequest { name = databaseName } GlueClient { region = "us-east-1" }.use { glueClient -> glueClient.deleteDatabase(request) println("$databaseName was successfully deleted") } } suspend fun deleteCrawler(crawlerName: String) { val request = DeleteCrawlerRequest { name = crawlerName } GlueClient { region = "us-east-1" }.use { glueClient -> glueClient.deleteCrawler(request) println("$crawlerName was deleted") } }
-
有关 API 详细信息,请参阅《AWS SDK for Kotlin API 参考》中的以下主题。
-
- PHP
-
- 适用于 PHP 的 SDK
-
注意
查看 GitHub,了解更多信息。查找完整示例,学习如何在 AWS 代码示例存储库
中进行设置和运行。 namespace Glue; use Aws\Glue\GlueClient; use Aws\S3\S3Client; use AwsUtilities\AWSServiceClass; use GuzzleHttp\Psr7\Stream; use Iam\IAMService; class GettingStartedWithGlue { public function run() { echo("\n"); echo("--------------------------------------\n"); print("Welcome to the AWS Glue getting started demo using PHP!\n"); echo("--------------------------------------\n"); $clientArgs = [ 'region' => 'us-west-2', 'version' => 'latest', 'profile' => 'default', ]; $uniqid = uniqid(); $glueClient = new GlueClient($clientArgs); $glueService = new GlueService($glueClient); $iamService = new IAMService(); $crawlerName = "example-crawler-test-" . $uniqid; AWSServiceClass::$waitTime = 5; AWSServiceClass::$maxWaitAttempts = 20; $role = $iamService->getRole("AWSGlueServiceRole-DocExample"); $databaseName = "doc-example-database-$uniqid"; $path = 's3://crawler-public-us-east-1/flight/2016/csv'; $glueService->createCrawler($crawlerName, $role['Role']['Arn'], $databaseName, $path); $glueService->startCrawler($crawlerName); echo "Waiting for crawler"; do { $crawler = $glueService->getCrawler($crawlerName); echo "."; sleep(10); } while ($crawler['Crawler']['State'] != "READY"); echo "\n"; $database = $glueService->getDatabase($databaseName); echo "Found a database named " . $database['Database']['Name'] . "\n"; //Upload job script $s3client = new S3Client($clientArgs); $bucketName = "test-glue-bucket-" . $uniqid; $s3client->createBucket([ 'Bucket' => $bucketName, 'CreateBucketConfiguration' => ['LocationConstraint' => 'us-west-2'], ]); $s3client->putObject([ 'Bucket' => $bucketName, 'Key' => 'run_job.py', 'SourceFile' => __DIR__ . '/flight_etl_job_script.py' ]); $s3client->putObject([ 'Bucket' => $bucketName, 'Key' => 'setup_scenario_getting_started.yaml', 'SourceFile' => __DIR__ . '/setup_scenario_getting_started.yaml' ]); $tables = $glueService->getTables($databaseName); $jobName = 'test-job-' . $uniqid; $scriptLocation = "s3://$bucketName/run_job.py"; $job = $glueService->createJob($jobName, $role['Role']['Arn'], $scriptLocation); $outputBucketUrl = "s3://$bucketName"; $runId = $glueService->startJobRun($jobName, $databaseName, $tables, $outputBucketUrl)['JobRunId']; echo "waiting for job"; do { $jobRun = $glueService->getJobRun($jobName, $runId); echo "."; sleep(10); } while (!array_intersect([$jobRun['JobRun']['JobRunState']], ['SUCCEEDED', 'STOPPED', 'FAILED', 'TIMEOUT'])); echo "\n"; $jobRuns = $glueService->getJobRuns($jobName); $objects = $s3client->listObjects([ 'Bucket' => $bucketName, ])['Contents']; foreach ($objects as $object) { echo $object['Key'] . "\n"; } echo "Downloading " . $objects[1]['Key'] . "\n"; /** @var Stream $downloadObject */ $downloadObject = $s3client->getObject([ 'Bucket' => $bucketName, 'Key' => $objects[1]['Key'], ])['Body']->getContents(); echo "Here is the first 1000 characters in the object."; echo substr($downloadObject, 0, 1000); $jobs = $glueService->listJobs(); echo "Current jobs:\n"; foreach ($jobs['JobNames'] as $jobsName) { echo "{$jobsName}\n"; } echo "Delete the job.\n"; $glueClient->deleteJob([ 'JobName' => $job['Name'], ]); echo "Delete the tables.\n"; foreach ($tables['TableList'] as $table) { $glueService->deleteTable($table['Name'], $databaseName); } echo "Delete the databases.\n"; $glueClient->deleteDatabase([ 'Name' => $databaseName, ]); echo "Delete the crawler.\n"; $glueClient->deleteCrawler([ 'Name' => $crawlerName, ]); $deleteObjects = $s3client->listObjectsV2([ 'Bucket' => $bucketName, ]); echo "Delete all objects in the bucket.\n"; $deleteObjects = $s3client->deleteObjects([ 'Bucket' => $bucketName, 'Delete' => [ 'Objects' => $deleteObjects['Contents'], ] ]); echo "Delete the bucket.\n"; $s3client->deleteBucket(['Bucket' => $bucketName]); echo "This job was brought to you by the number $uniqid\n"; } } namespace Glue; use Aws\Glue\GlueClient; use Aws\Result; use function PHPUnit\Framework\isEmpty; class GlueService extends \AwsUtilities\AWSServiceClass { protected GlueClient $glueClient; public function __construct($glueClient) { $this->glueClient = $glueClient; } public function getCrawler($crawlerName) { return $this->customWaiter(function () use ($crawlerName) { return $this->glueClient->getCrawler([ 'Name' => $crawlerName, ]); }); } public function createCrawler($crawlerName, $role, $databaseName, $path): Result { return $this->customWaiter(function () use ($crawlerName, $role, $databaseName, $path) { return $this->glueClient->createCrawler([ 'Name' => $crawlerName, 'Role' => $role, 'DatabaseName' => $databaseName, 'Targets' => [ 'S3Targets' => [[ 'Path' => $path, ]] ], ]); }); } public function startCrawler($crawlerName): Result { return $this->glueClient->startCrawler([ 'Name' => $crawlerName, ]); } public function getDatabase(string $databaseName): Result { return $this->customWaiter(function () use ($databaseName) { return $this->glueClient->getDatabase([ 'Name' => $databaseName, ]); }); } public function getTables($databaseName): Result { return $this->glueClient->getTables([ 'DatabaseName' => $databaseName, ]); } public function createJob($jobName, $role, $scriptLocation, $pythonVersion = '3', $glueVersion = '3.0'): Result { return $this->glueClient->createJob([ 'Name' => $jobName, 'Role' => $role, 'Command' => [ 'Name' => 'glueetl', 'ScriptLocation' => $scriptLocation, 'PythonVersion' => $pythonVersion, ], 'GlueVersion' => $glueVersion, ]); } public function startJobRun($jobName, $databaseName, $tables, $outputBucketUrl): Result { return $this->glueClient->startJobRun([ 'JobName' => $jobName, 'Arguments' => [ 'input_database' => $databaseName, 'input_table' => $tables['TableList'][0]['Name'], 'output_bucket_url' => $outputBucketUrl, '--input_database' => $databaseName, '--input_table' => $tables['TableList'][0]['Name'], '--output_bucket_url' => $outputBucketUrl, ], ]); } public function listJobs($maxResults = null, $nextToken = null, $tags = []): Result { $arguments = []; if ($maxResults) { $arguments['MaxResults'] = $maxResults; } if ($nextToken) { $arguments['NextToken'] = $nextToken; } if (!empty($tags)) { $arguments['Tags'] = $tags; } return $this->glueClient->listJobs($arguments); } public function getJobRuns($jobName, $maxResults = 0, $nextToken = ''): Result { $arguments = ['JobName' => $jobName]; if ($maxResults) { $arguments['MaxResults'] = $maxResults; } if ($nextToken) { $arguments['NextToken'] = $nextToken; } return $this->glueClient->getJobRuns($arguments); } public function getJobRun($jobName, $runId, $predecessorsIncluded = false): Result { return $this->glueClient->getJobRun([ 'JobName' => $jobName, 'RunId' => $runId, 'PredecessorsIncluded' => $predecessorsIncluded, ]); } public function deleteJob($jobName) { return $this->glueClient->deleteJob([ 'JobName' => $jobName, ]); } public function deleteTable($tableName, $databaseName) { return $this->glueClient->deleteTable([ 'DatabaseName' => $databaseName, 'Name' => $tableName, ]); } public function deleteDatabase($databaseName) { return $this->glueClient->deleteDatabase([ 'Name' => $databaseName, ]); } public function deleteCrawler($crawlerName) { return $this->glueClient->deleteCrawler([ 'Name' => $crawlerName, ]); } }
-
有关 API 的详细信息,请参阅 AWS SDK for PHP API 参考中的以下主题。
-
- Python
-
- SDK for Python (Boto3)
-
注意
查看 GitHub,了解更多信息。查找完整示例,学习如何在 AWS 代码示例存储库
中进行设置和运行。 创建一个包含场景中使用的 AWS Glue 函数的类。
class GlueWrapper: """Encapsulates AWS Glue actions.""" def __init__(self, glue_client): """ :param glue_client: A Boto3 Glue client. """ self.glue_client = glue_client def get_crawler(self, name): """ Gets information about a crawler. :param name: The name of the crawler to look up. :return: Data about the crawler. """ crawler = None try: response = self.glue_client.get_crawler(Name=name) crawler = response["Crawler"] except ClientError as err: if err.response["Error"]["Code"] == "EntityNotFoundException": logger.info("Crawler %s doesn't exist.", name) else: logger.error( "Couldn't get crawler %s. Here's why: %s: %s", name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise return crawler def create_crawler(self, name, role_arn, db_name, db_prefix, s3_target): """ Creates a crawler that can crawl the specified target and populate a database in your AWS Glue Data Catalog with metadata that describes the data in the target. :param name: The name of the crawler. :param role_arn: The Amazon Resource Name (ARN) of an AWS Identity and Access Management (IAM) role that grants permission to let AWS Glue access the resources it needs. :param db_name: The name to give the database that is created by the crawler. :param db_prefix: The prefix to give any database tables that are created by the crawler. :param s3_target: The URL to an S3 bucket that contains data that is the target of the crawler. """ try: self.glue_client.create_crawler( Name=name, Role=role_arn, DatabaseName=db_name, TablePrefix=db_prefix, Targets={"S3Targets": [{"Path": s3_target}]}, ) except ClientError as err: logger.error( "Couldn't create crawler. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def start_crawler(self, name): """ Starts a crawler. The crawler crawls its configured target and creates metadata that describes the data it finds in the target data source. :param name: The name of the crawler to start. """ try: self.glue_client.start_crawler(Name=name) except ClientError as err: logger.error( "Couldn't start crawler %s. Here's why: %s: %s", name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def get_database(self, name): """ Gets information about a database in your Data Catalog. :param name: The name of the database to look up. :return: Information about the database. """ try: response = self.glue_client.get_database(Name=name) except ClientError as err: logger.error( "Couldn't get database %s. Here's why: %s: %s", name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return response["Database"] def get_tables(self, db_name): """ Gets a list of tables in a Data Catalog database. :param db_name: The name of the database to query. :return: The list of tables in the database. """ try: response = self.glue_client.get_tables(DatabaseName=db_name) except ClientError as err: logger.error( "Couldn't get tables %s. Here's why: %s: %s", db_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return response["TableList"] def create_job(self, name, description, role_arn, script_location): """ Creates a job definition for an extract, transform, and load (ETL) job that can be run by AWS Glue. :param name: The name of the job definition. :param description: The description of the job definition. :param role_arn: The ARN of an IAM role that grants AWS Glue the permissions it requires to run the job. :param script_location: The Amazon S3 URL of a Python ETL script that is run as part of the job. The script defines how the data is transformed. """ try: self.glue_client.create_job( Name=name, Description=description, Role=role_arn, Command={ "Name": "glueetl", "ScriptLocation": script_location, "PythonVersion": "3", }, GlueVersion="3.0", ) except ClientError as err: logger.error( "Couldn't create job %s. Here's why: %s: %s", name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def start_job_run(self, name, input_database, input_table, output_bucket_name): """ Starts a job run. A job run extracts data from the source, transforms it, and loads it to the output bucket. :param name: The name of the job definition. :param input_database: The name of the metadata database that contains tables that describe the source data. This is typically created by a crawler. :param input_table: The name of the table in the metadata database that describes the source data. :param output_bucket_name: The S3 bucket where the output is written. :return: The ID of the job run. """ try: # The custom Arguments that are passed to this function are used by the # Python ETL script to determine the location of input and output data. response = self.glue_client.start_job_run( JobName=name, Arguments={ "--input_database": input_database, "--input_table": input_table, "--output_bucket_url": f"s3://{output_bucket_name}/", }, ) except ClientError as err: logger.error( "Couldn't start job run %s. Here's why: %s: %s", name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return response["JobRunId"] def list_jobs(self): """ Lists the names of job definitions in your account. :return: The list of job definition names. """ try: response = self.glue_client.list_jobs() except ClientError as err: logger.error( "Couldn't list jobs. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return response["JobNames"] def get_job_runs(self, job_name): """ Gets information about runs that have been performed for a specific job definition. :param job_name: The name of the job definition to look up. :return: The list of job runs. """ try: response = self.glue_client.get_job_runs(JobName=job_name) except ClientError as err: logger.error( "Couldn't get job runs for %s. Here's why: %s: %s", job_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return response["JobRuns"] def get_job_run(self, name, run_id): """ Gets information about a single job run. :param name: The name of the job definition for the run. :param run_id: The ID of the run. :return: Information about the run. """ try: response = self.glue_client.get_job_run(JobName=name, RunId=run_id) except ClientError as err: logger.error( "Couldn't get job run %s/%s. Here's why: %s: %s", name, run_id, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return response["JobRun"] def delete_job(self, job_name): """ Deletes a job definition. This also deletes data about all runs that are associated with this job definition. :param job_name: The name of the job definition to delete. """ try: self.glue_client.delete_job(JobName=job_name) except ClientError as err: logger.error( "Couldn't delete job %s. Here's why: %s: %s", job_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def delete_table(self, db_name, table_name): """ Deletes a table from a metadata database. :param db_name: The name of the database that contains the table. :param table_name: The name of the table to delete. """ try: self.glue_client.delete_table(DatabaseName=db_name, Name=table_name) except ClientError as err: logger.error( "Couldn't delete table %s. Here's why: %s: %s", table_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def delete_database(self, name): """ Deletes a metadata database from your Data Catalog. :param name: The name of the database to delete. """ try: self.glue_client.delete_database(Name=name) except ClientError as err: logger.error( "Couldn't delete database %s. Here's why: %s: %s", name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def delete_crawler(self, name): """ Deletes a crawler. :param name: The name of the crawler to delete. """ try: self.glue_client.delete_crawler(Name=name) except ClientError as err: logger.error( "Couldn't delete crawler %s. Here's why: %s: %s", name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise
创建运行场景的类。
class GlueCrawlerJobScenario: """ Encapsulates a scenario that shows how to create an AWS Glue crawler and job and use them to transform data from CSV to JSON format. """ def __init__(self, glue_client, glue_service_role, glue_bucket): """ :param glue_client: A Boto3 AWS Glue client. :param glue_service_role: An AWS Identity and Access Management (IAM) role that AWS Glue can assume to gain access to the resources it requires. :param glue_bucket: An S3 bucket that can hold a job script and output data from AWS Glue job runs. """ self.glue_client = glue_client self.glue_service_role = glue_service_role self.glue_bucket = glue_bucket @staticmethod def wait(seconds, tick=12): """ Waits for a specified number of seconds, while also displaying an animated spinner. :param seconds: The number of seconds to wait. :param tick: The number of frames per second used to animate the spinner. """ progress = "|/-\\" waited = 0 while waited < seconds: for frame in range(tick): sys.stdout.write(f"\r{progress[frame % len(progress)]}") sys.stdout.flush() time.sleep(1 / tick) waited += 1 def upload_job_script(self, job_script): """ Uploads a Python ETL script to an S3 bucket. The script is used by the AWS Glue job to transform data. :param job_script: The relative path to the job script. """ try: self.glue_bucket.upload_file(Filename=job_script, Key=job_script) print(f"Uploaded job script '{job_script}' to the example bucket.") except S3UploadFailedError as err: logger.error("Couldn't upload job script. Here's why: %s", err) raise def run(self, crawler_name, db_name, db_prefix, data_source, job_script, job_name): """ Runs the scenario. This is an interactive experience that runs at a command prompt and asks you for input throughout. :param crawler_name: The name of the crawler used in the scenario. If the crawler does not exist, it is created. :param db_name: The name to give the metadata database created by the crawler. :param db_prefix: The prefix to give tables added to the database by the crawler. :param data_source: The location of the data source that is targeted by the crawler and extracted during job runs. :param job_script: The job script that is used to transform data during job runs. :param job_name: The name to give the job definition that is created during the scenario. """ wrapper = GlueWrapper(self.glue_client) print(f"Checking for crawler {crawler_name}.") crawler = wrapper.get_crawler(crawler_name) if crawler is None: print(f"Creating crawler {crawler_name}.") wrapper.create_crawler( crawler_name, self.glue_service_role.arn, db_name, db_prefix, data_source, ) print(f"Created crawler {crawler_name}.") crawler = wrapper.get_crawler(crawler_name) pprint(crawler) print("-" * 88) print( f"When you run the crawler, it crawls data stored in {data_source} and " f"creates a metadata database in the AWS Glue Data Catalog that describes " f"the data in the data source." ) print("In this example, the source data is in CSV format.") ready = False while not ready: ready = Question.ask_question( "Ready to start the crawler? (y/n) ", Question.is_yesno ) wrapper.start_crawler(crawler_name) print("Let's wait for the crawler to run. This typically takes a few minutes.") crawler_state = None while crawler_state != "READY": self.wait(10) crawler = wrapper.get_crawler(crawler_name) crawler_state = crawler["State"] print(f"Crawler is {crawler['State']}.") print("-" * 88) database = wrapper.get_database(db_name) print(f"The crawler created database {db_name}:") pprint(database) print(f"The database contains these tables:") tables = wrapper.get_tables(db_name) for index, table in enumerate(tables): print(f"\t{index + 1}. {table['Name']}") table_index = Question.ask_question( f"Enter the number of a table to see more detail: ", Question.is_int, Question.in_range(1, len(tables)), ) pprint(tables[table_index - 1]) print("-" * 88) print(f"Creating job definition {job_name}.") wrapper.create_job( job_name, "Getting started example job.", self.glue_service_role.arn, f"s3://{self.glue_bucket.name}/{job_script}", ) print("Created job definition.") print( f"When you run the job, it extracts data from {data_source}, transforms it " f"by using the {job_script} script, and loads the output into " f"S3 bucket {self.glue_bucket.name}." ) print( "In this example, the data is transformed from CSV to JSON, and only a few " "fields are included in the output." ) job_run_status = None if Question.ask_question(f"Ready to run? (y/n) ", Question.is_yesno): job_run_id = wrapper.start_job_run( job_name, db_name, tables[0]["Name"], self.glue_bucket.name ) print(f"Job {job_name} started. Let's wait for it to run.") while job_run_status not in ["SUCCEEDED", "STOPPED", "FAILED", "TIMEOUT"]: self.wait(10) job_run = wrapper.get_job_run(job_name, job_run_id) job_run_status = job_run["JobRunState"] print(f"Job {job_name}/{job_run_id} is {job_run_status}.") print("-" * 88) if job_run_status == "SUCCEEDED": print( f"Data from your job run is stored in your S3 bucket '{self.glue_bucket.name}':" ) try: keys = [ obj.key for obj in self.glue_bucket.objects.filter(Prefix="run-") ] for index, key in enumerate(keys): print(f"\t{index + 1}: {key}") lines = 4 key_index = Question.ask_question( f"Enter the number of a block to download it and see the first {lines} " f"lines of JSON output in the block: ", Question.is_int, Question.in_range(1, len(keys)), ) job_data = io.BytesIO() self.glue_bucket.download_fileobj(keys[key_index - 1], job_data) job_data.seek(0) for _ in range(lines): print(job_data.readline().decode("utf-8")) except ClientError as err: logger.error( "Couldn't get job run data. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise print("-" * 88) job_names = wrapper.list_jobs() if job_names: print(f"Your account has {len(job_names)} jobs defined:") for index, job_name in enumerate(job_names): print(f"\t{index + 1}. {job_name}") job_index = Question.ask_question( f"Enter a number between 1 and {len(job_names)} to see the list of runs for " f"a job: ", Question.is_int, Question.in_range(1, len(job_names)), ) job_runs = wrapper.get_job_runs(job_names[job_index - 1]) if job_runs: print(f"Found {len(job_runs)} runs for job {job_names[job_index - 1]}:") for index, job_run in enumerate(job_runs): print( f"\t{index + 1}. {job_run['JobRunState']} on " f"{job_run['CompletedOn']:%Y-%m-%d %H:%M:%S}" ) run_index = Question.ask_question( f"Enter a number between 1 and {len(job_runs)} to see details for a run: ", Question.is_int, Question.in_range(1, len(job_runs)), ) pprint(job_runs[run_index - 1]) else: print(f"No runs found for job {job_names[job_index - 1]}") else: print("Your account doesn't have any jobs defined.") print("-" * 88) print( f"Let's clean up. During this example we created job definition '{job_name}'." ) if Question.ask_question( "Do you want to delete the definition and all runs? (y/n) ", Question.is_yesno, ): wrapper.delete_job(job_name) print(f"Job definition '{job_name}' deleted.") tables = wrapper.get_tables(db_name) print(f"We also created database '{db_name}' that contains these tables:") for table in tables: print(f"\t{table['Name']}") if Question.ask_question( "Do you want to delete the tables and the database? (y/n) ", Question.is_yesno, ): for table in tables: wrapper.delete_table(db_name, table["Name"]) print(f"Deleted table {table['Name']}.") wrapper.delete_database(db_name) print(f"Deleted database {db_name}.") print(f"We also created crawler '{crawler_name}'.") if Question.ask_question( "Do you want to delete the crawler? (y/n) ", Question.is_yesno ): wrapper.delete_crawler(crawler_name) print(f"Deleted crawler {crawler_name}.") print("-" * 88) def parse_args(args): """ Parse command line arguments. :param args: The command line arguments. :return: The parsed arguments. """ parser = argparse.ArgumentParser( description="Runs the AWS Glue getting started with crawlers and jobs scenario. " "Before you run this scenario, set up scaffold resources by running " "'python scaffold.py deploy'." ) parser.add_argument( "role_name", help="The name of an IAM role that AWS Glue can assume. This role must grant access " "to Amazon S3 and to the permissions granted by the AWSGlueServiceRole " "managed policy.", ) parser.add_argument( "bucket_name", help="The name of an S3 bucket that AWS Glue can access to get the job script and " "put job results.", ) parser.add_argument( "--job_script", default="flight_etl_job_script.py", help="The name of the job script file that is used in the scenario.", ) return parser.parse_args(args) def main(): args = parse_args(sys.argv[1:]) try: print("-" * 88) print( "Welcome to the AWS Glue getting started with crawlers and jobs scenario." ) print("-" * 88) scenario = GlueCrawlerJobScenario( boto3.client("glue"), boto3.resource("iam").Role(args.role_name), boto3.resource("s3").Bucket(args.bucket_name), ) scenario.upload_job_script(args.job_script) scenario.run( "doc-example-crawler", "doc-example-database", "doc-example-", "s3://crawler-public-us-east-1/flight/2016/csv", args.job_script, "doc-example-job", ) print("-" * 88) print( "To destroy scaffold resources, including the IAM role and S3 bucket " "used in this scenario, run 'python scaffold.py destroy'." ) print("\nThanks for watching!") print("-" * 88) except Exception: logging.exception("Something went wrong with the example.")
创建一个 ETL 脚本,AWS Glue 用此脚本在任务运行期间提取、转换和加载数据。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job """ These custom arguments must be passed as Arguments to the StartJobRun request. --input_database The name of a metadata database that is contained in your AWS Glue Data Catalog and that contains tables that describe the data to be processed. --input_table The name of a table in the database that describes the data to be processed. --output_bucket_url An S3 bucket that receives the transformed output data. """ args = getResolvedOptions( sys.argv, ["JOB_NAME", "input_database", "input_table", "output_bucket_url"] ) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args["JOB_NAME"], args) # Script generated for node S3 Flight Data. S3FlightData_node1 = glueContext.create_dynamic_frame.from_catalog( database=args["input_database"], table_name=args["input_table"], transformation_ctx="S3FlightData_node1", ) # This mapping performs two main functions: # 1. It simplifies the output by removing most of the fields from the data. # 2. It renames some fields. For example, `fl_date` is renamed to `flight_date`. ApplyMapping_node2 = ApplyMapping.apply( frame=S3FlightData_node1, mappings=[ ("year", "long", "year", "long"), ("month", "long", "month", "tinyint"), ("day_of_month", "long", "day", "tinyint"), ("fl_date", "string", "flight_date", "string"), ("carrier", "string", "carrier", "string"), ("fl_num", "long", "flight_num", "long"), ("origin_city_name", "string", "origin_city_name", "string"), ("origin_state_abr", "string", "origin_state_abr", "string"), ("dest_city_name", "string", "dest_city_name", "string"), ("dest_state_abr", "string", "dest_state_abr", "string"), ("dep_time", "long", "departure_time", "long"), ("wheels_off", "long", "wheels_off", "long"), ("wheels_on", "long", "wheels_on", "long"), ("arr_time", "long", "arrival_time", "long"), ("mon", "string", "mon", "string"), ], transformation_ctx="ApplyMapping_node2", ) # Script generated for node Revised Flight Data. RevisedFlightData_node3 = glueContext.write_dynamic_frame.from_options( frame=ApplyMapping_node2, connection_type="s3", format="json", connection_options={"path": args["output_bucket_url"], "partitionKeys": []}, transformation_ctx="RevisedFlightData_node3", ) job.commit()
-
有关 API 详细信息,请参阅《AWS SDK for Python (Boto3) API 参考》中的以下主题。
-
- Ruby
-
- 适用于 Ruby 的 SDK
-
注意
查看 GitHub,了解更多信息。查找完整示例,学习如何在 AWS 代码示例存储库
中进行设置和运行。 创建一个包含场景中使用的 AWS Glue 函数的类。
# The `GlueWrapper` class serves as a wrapper around the AWS Glue API, providing a simplified interface for common operations. # It encapsulates the functionality of the AWS SDK for Glue and provides methods for interacting with Glue crawlers, databases, tables, jobs, and S3 resources. # The class initializes with a Glue client and a logger, allowing it to make API calls and log any errors or informational messages. class GlueWrapper def initialize(glue_client, logger) @glue_client = glue_client @logger = logger end # Retrieves information about a specific crawler. # # @param name [String] The name of the crawler to retrieve information about. # @return [Aws::Glue::Types::Crawler, nil] The crawler object if found, or nil if not found. def get_crawler(name) @glue_client.get_crawler(name: name) rescue Aws::Glue::Errors::EntityNotFoundException @logger.info("Crawler #{name} doesn't exist.") false rescue Aws::Glue::Errors::GlueException => e @logger.error("Glue could not get crawler #{name}: \n#{e.message}") raise end # Creates a new crawler with the specified configuration. # # @param name [String] The name of the crawler. # @param role_arn [String] The ARN of the IAM role to be used by the crawler. # @param db_name [String] The name of the database where the crawler stores its metadata. # @param db_prefix [String] The prefix to be added to the names of tables that the crawler creates. # @param s3_target [String] The S3 path that the crawler will crawl. # @return [void] def create_crawler(name, role_arn, db_name, _db_prefix, s3_target) @glue_client.create_crawler( name: name, role: role_arn, database_name: db_name, targets: { s3_targets: [ { path: s3_target } ] } ) rescue Aws::Glue::Errors::GlueException => e @logger.error("Glue could not create crawler: \n#{e.message}") raise end # Starts a crawler with the specified name. # # @param name [String] The name of the crawler to start. # @return [void] def start_crawler(name) @glue_client.start_crawler(name: name) rescue Aws::Glue::Errors::ServiceError => e @logger.error("Glue could not start crawler #{name}: \n#{e.message}") raise end # Deletes a crawler with the specified name. # # @param name [String] The name of the crawler to delete. # @return [void] def delete_crawler(name) @glue_client.delete_crawler(name: name) rescue Aws::Glue::Errors::ServiceError => e @logger.error("Glue could not delete crawler #{name}: \n#{e.message}") raise end # Retrieves information about a specific database. # # @param name [String] The name of the database to retrieve information about. # @return [Aws::Glue::Types::Database, nil] The database object if found, or nil if not found. def get_database(name) response = @glue_client.get_database(name: name) response.database rescue Aws::Glue::Errors::GlueException => e @logger.error("Glue could not get database #{name}: \n#{e.message}") raise end # Retrieves a list of tables in the specified database. # # @param db_name [String] The name of the database to retrieve tables from. # @return [Array<Aws::Glue::Types::Table>] def get_tables(db_name) response = @glue_client.get_tables(database_name: db_name) response.table_list rescue Aws::Glue::Errors::GlueException => e @logger.error("Glue could not get tables #{db_name}: \n#{e.message}") raise end # Creates a new job with the specified configuration. # # @param name [String] The name of the job. # @param description [String] The description of the job. # @param role_arn [String] The ARN of the IAM role to be used by the job. # @param script_location [String] The location of the ETL script for the job. # @return [void] def create_job(name, description, role_arn, script_location) @glue_client.create_job( name: name, description: description, role: role_arn, command: { name: 'glueetl', script_location: script_location, python_version: '3' }, glue_version: '3.0' ) rescue Aws::Glue::Errors::GlueException => e @logger.error("Glue could not create job #{name}: \n#{e.message}") raise end # Starts a job run for the specified job. # # @param name [String] The name of the job to start the run for. # @param input_database [String] The name of the input database for the job. # @param input_table [String] The name of the input table for the job. # @param output_bucket_name [String] The name of the output S3 bucket for the job. # @return [String] The ID of the started job run. def start_job_run(name, input_database, input_table, output_bucket_name) response = @glue_client.start_job_run( job_name: name, arguments: { '--input_database': input_database, '--input_table': input_table, '--output_bucket_url': "s3://#{output_bucket_name}/" } ) response.job_run_id rescue Aws::Glue::Errors::GlueException => e @logger.error("Glue could not start job run #{name}: \n#{e.message}") raise end # Retrieves a list of jobs in AWS Glue. # # @return [Aws::Glue::Types::ListJobsResponse] def list_jobs @glue_client.list_jobs rescue Aws::Glue::Errors::GlueException => e @logger.error("Glue could not list jobs: \n#{e.message}") raise end # Retrieves a list of job runs for the specified job. # # @param job_name [String] The name of the job to retrieve job runs for. # @return [Array<Aws::Glue::Types::JobRun>] def get_job_runs(job_name) response = @glue_client.get_job_runs(job_name: job_name) response.job_runs rescue Aws::Glue::Errors::GlueException => e @logger.error("Glue could not get job runs: \n#{e.message}") end # Retrieves data for a specific job run. # # @param job_name [String] The name of the job run to retrieve data for. # @return [Glue::Types::GetJobRunResponse] def get_job_run(job_name, run_id) @glue_client.get_job_run(job_name: job_name, run_id: run_id) rescue Aws::Glue::Errors::GlueException => e @logger.error("Glue could not get job runs: \n#{e.message}") end # Deletes a job with the specified name. # # @param job_name [String] The name of the job to delete. # @return [void] def delete_job(job_name) @glue_client.delete_job(job_name: job_name) rescue Aws::Glue::Errors::ServiceError => e @logger.error("Glue could not delete job: \n#{e.message}") end # Deletes a table with the specified name. # # @param database_name [String] The name of the catalog database in which the table resides. # @param table_name [String] The name of the table to be deleted. # @return [void] def delete_table(database_name, table_name) @glue_client.delete_table(database_name: database_name, name: table_name) rescue Aws::Glue::Errors::ServiceError => e @logger.error("Glue could not delete job: \n#{e.message}") end # Removes a specified database from a Data Catalog. # # @param database_name [String] The name of the database to delete. # @return [void] def delete_database(database_name) @glue_client.delete_database(name: database_name) rescue Aws::Glue::Errors::ServiceError => e @logger.error("Glue could not delete database: \n#{e.message}") end # Uploads a job script file to an S3 bucket. # # @param file_path [String] The local path of the job script file. # @param bucket_resource [Aws::S3::Bucket] The S3 bucket resource to upload the file to. # @return [void] def upload_job_script(file_path, bucket_resource) File.open(file_path) do |file| bucket_resource.client.put_object({ body: file, bucket: bucket_resource.name, key: file_path }) end rescue Aws::S3::Errors::S3UploadFailedError => e @logger.error("S3 could not upload job script: \n#{e.message}") raise end end
创建运行场景的类。
class GlueCrawlerJobScenario def initialize(glue_client, glue_service_role, glue_bucket, logger) @glue_client = glue_client @glue_service_role = glue_service_role @glue_bucket = glue_bucket @logger = logger end def run(crawler_name, db_name, db_prefix, data_source, job_script, job_name) wrapper = GlueWrapper.new(@glue_client, @logger) setup_crawler(wrapper, crawler_name, db_name, db_prefix, data_source) query_database(wrapper, crawler_name, db_name) create_and_run_job(wrapper, job_script, job_name, db_name) end private def setup_crawler(wrapper, crawler_name, db_name, db_prefix, data_source) new_step(1, 'Create a crawler') crawler = wrapper.get_crawler(crawler_name) unless crawler puts "Creating crawler #{crawler_name}." wrapper.create_crawler(crawler_name, @glue_service_role.arn, db_name, db_prefix, data_source) puts "Successfully created #{crawler_name}." end wrapper.start_crawler(crawler_name) monitor_crawler(wrapper, crawler_name) end def monitor_crawler(wrapper, crawler_name) new_step(2, 'Monitor Crawler') crawler_state = nil until crawler_state == 'READY' custom_wait(15) crawler = wrapper.get_crawler(crawler_name) crawler_state = crawler[0]['state'] print "Crawler status: #{crawler_state}".yellow end end def query_database(wrapper, _crawler_name, db_name) new_step(3, 'Query the database.') wrapper.get_database(db_name) puts "The crawler created database #{db_name}:" puts "Database contains tables: #{wrapper.get_tables(db_name).map { |t| t['name'] }}" end def create_and_run_job(wrapper, job_script, job_name, db_name) new_step(4, 'Create and run job.') wrapper.upload_job_script(job_script, @glue_bucket) wrapper.create_job(job_name, 'ETL Job', @glue_service_role.arn, "s3://#{@glue_bucket.name}/#{job_script}") run_job(wrapper, job_name, db_name) end def run_job(wrapper, job_name, db_name) new_step(5, 'Run the job.') wrapper.start_job_run(job_name, db_name, wrapper.get_tables(db_name)[0]['name'], @glue_bucket.name) job_run_status = nil until %w[SUCCEEDED FAILED STOPPED].include?(job_run_status) custom_wait(10) job_run = wrapper.get_job_runs(job_name) job_run_status = job_run[0]['job_run_state'] print "Job #{job_name} status: #{job_run_status}".yellow end end end def main banner('../../helpers/banner.txt') puts 'Starting AWS Glue demo...' # Load resource names from YAML. resource_names = YAML.load_file('resource_names.yaml') # Setup services and resources. iam_role = Aws::IAM::Resource.new(region: 'us-east-1').role(resource_names['glue_service_role']) s3_bucket = Aws::S3::Resource.new(region: 'us-east-1').bucket(resource_names['glue_bucket']) # Instantiate scenario and run. scenario = GlueCrawlerJobScenario.new(Aws::Glue::Client.new(region: 'us-east-1'), iam_role, s3_bucket, @logger) random_suffix = rand(10**4) scenario.run("crawler-#{random_suffix}", "db-#{random_suffix}", "prefix-#{random_suffix}-", 's3://data_source', 'job_script.py', "job-#{random_suffix}") puts 'Demo complete.' end
创建一个 ETL 脚本,AWS Glue 用此脚本在任务运行期间提取、转换和加载数据。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job """ These custom arguments must be passed as Arguments to the StartJobRun request. --input_database The name of a metadata database that is contained in your AWS Glue Data Catalog and that contains tables that describe the data to be processed. --input_table The name of a table in the database that describes the data to be processed. --output_bucket_url An S3 bucket that receives the transformed output data. """ args = getResolvedOptions( sys.argv, ["JOB_NAME", "input_database", "input_table", "output_bucket_url"] ) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args["JOB_NAME"], args) # Script generated for node S3 Flight Data. S3FlightData_node1 = glueContext.create_dynamic_frame.from_catalog( database=args["input_database"], table_name=args["input_table"], transformation_ctx="S3FlightData_node1", ) # This mapping performs two main functions: # 1. It simplifies the output by removing most of the fields from the data. # 2. It renames some fields. For example, `fl_date` is renamed to `flight_date`. ApplyMapping_node2 = ApplyMapping.apply( frame=S3FlightData_node1, mappings=[ ("year", "long", "year", "long"), ("month", "long", "month", "tinyint"), ("day_of_month", "long", "day", "tinyint"), ("fl_date", "string", "flight_date", "string"), ("carrier", "string", "carrier", "string"), ("fl_num", "long", "flight_num", "long"), ("origin_city_name", "string", "origin_city_name", "string"), ("origin_state_abr", "string", "origin_state_abr", "string"), ("dest_city_name", "string", "dest_city_name", "string"), ("dest_state_abr", "string", "dest_state_abr", "string"), ("dep_time", "long", "departure_time", "long"), ("wheels_off", "long", "wheels_off", "long"), ("wheels_on", "long", "wheels_on", "long"), ("arr_time", "long", "arrival_time", "long"), ("mon", "string", "mon", "string"), ], transformation_ctx="ApplyMapping_node2", ) # Script generated for node Revised Flight Data. RevisedFlightData_node3 = glueContext.write_dynamic_frame.from_options( frame=ApplyMapping_node2, connection_type="s3", format="json", connection_options={"path": args["output_bucket_url"], "partitionKeys": []}, transformation_ctx="RevisedFlightData_node3", ) job.commit()
-
有关 API 详细信息,请参阅《AWS SDK for Ruby API 参考》中的以下主题。
-
- Rust
-
- 适用于 Rust 的 SDK
-
注意
查看 GitHub,了解更多信息。查找完整示例,学习如何在 AWS 代码示例存储库
中进行设置和运行。 创建并运行爬网程序,爬取公共 Amazon Simple Storage Service(Amazon S3)存储桶并生成一个描述其找到的 CSV 格式数据的元数据数据库。
let create_crawler = glue .create_crawler() .name(self.crawler()) .database_name(self.database()) .role(self.iam_role.expose_secret()) .targets( CrawlerTargets::builder() .s3_targets(S3Target::builder().path(CRAWLER_TARGET).build()) .build(), ) .send() .await; match create_crawler { Err(err) => { let glue_err: aws_sdk_glue::Error = err.into(); match glue_err { aws_sdk_glue::Error::AlreadyExistsException(_) => { info!("Using existing crawler"); Ok(()) } _ => Err(GlueMvpError::GlueSdk(glue_err)), } } Ok(_) => Ok(()), }?; let start_crawler = glue.start_crawler().name(self.crawler()).send().await; match start_crawler { Ok(_) => Ok(()), Err(err) => { let glue_err: aws_sdk_glue::Error = err.into(); match glue_err { aws_sdk_glue::Error::CrawlerRunningException(_) => Ok(()), _ => Err(GlueMvpError::GlueSdk(glue_err)), } } }?;
在 AWS Glue Data Catalog 中列出有关数据库和表的信息。
let database = glue .get_database() .name(self.database()) .send() .await .map_err(GlueMvpError::from_glue_sdk)? .to_owned(); let database = database .database() .ok_or_else(|| GlueMvpError::Unknown("Could not find database".into()))?; let tables = glue .get_tables() .database_name(self.database()) .send() .await .map_err(GlueMvpError::from_glue_sdk)?; let tables = tables.table_list();
创建并运行任务,从源 Amazon S3 存储桶提取 CSV 数据,通过删除和重命名字段对其进行转换,然后将 JSON 格式的输出加载到另一个 Amazon S3 存储桶中。
let create_job = glue .create_job() .name(self.job()) .role(self.iam_role.expose_secret()) .command( JobCommand::builder() .name("glueetl") .python_version("3") .script_location(format!("s3://{}/job.py", self.bucket())) .build(), ) .glue_version("3.0") .send() .await .map_err(GlueMvpError::from_glue_sdk)?; let job_name = create_job.name().ok_or_else(|| { GlueMvpError::Unknown("Did not get job name after creating job".into()) })?; let job_run_output = glue .start_job_run() .job_name(self.job()) .arguments("--input_database", self.database()) .arguments( "--input_table", self.tables .first() .ok_or_else(|| GlueMvpError::Unknown("Missing crawler table".into()))? .name(), ) .arguments("--output_bucket_url", self.bucket()) .send() .await .map_err(GlueMvpError::from_glue_sdk)?; let job = job_run_output .job_run_id() .ok_or_else(|| GlueMvpError::Unknown("Missing run id from just started job".into()))? .to_string();
删除演示创建的所有资源。
glue.delete_job() .job_name(self.job()) .send() .await .map_err(GlueMvpError::from_glue_sdk)?; for t in &self.tables { glue.delete_table() .name(t.name()) .database_name(self.database()) .send() .await .map_err(GlueMvpError::from_glue_sdk)?; } glue.delete_database() .name(self.database()) .send() .await .map_err(GlueMvpError::from_glue_sdk)?; glue.delete_crawler() .name(self.crawler()) .send() .await .map_err(GlueMvpError::from_glue_sdk)?;
-
有关 API 详细信息,请参阅《AWS SDK for Rust API 参考》中的以下主题。
-
有关 AWS SDK 开发人员指南和代码示例的完整列表,请参阅 将此服务与 AWS SDK 结合使用。本主题还包括有关入门的信息以及有关先前的 SDK 版本的详细信息。