Get started running AWS Glue crawlers and jobs using an AWS SDK - AWS Glue

Get started running AWS Glue crawlers and jobs using an AWS SDK

The following code examples show how to:

  • Create a crawler that crawls a public Amazon S3 bucket and generates a database of CSV-formatted metadata.

  • List information about databases and tables in your AWS Glue Data Catalog.

  • Create a job to extract CSV data from the S3 bucket, transform the data, and load JSON-formatted output into another S3 bucket.

  • List information about job runs, view transformed data, and clean up resources.

For more information, see Tutorial: Getting started with AWS Glue Studio.

.NET
AWS SDK for .NET
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

Create a class that wraps AWS Glue functions that are used in the scenario.

using System.Net; namespace GlueActions; public class GlueWrapper { private readonly IAmazonGlue _amazonGlue; /// <summary> /// Constructor for the AWS Glue actions wrapper. /// </summary> /// <param name="amazonGlue"></param> public GlueWrapper(IAmazonGlue amazonGlue) { _amazonGlue = amazonGlue; } /// <summary> /// Create an AWS Glue crawler. /// </summary> /// <param name="crawlerName">The name for the crawler.</param> /// <param name="crawlerDescription">A description of the crawler.</param> /// <param name="role">The AWS Identity and Access Management (IAM) role to /// be assumed by the crawler.</param> /// <param name="schedule">The schedule on which the crawler will be executed.</param> /// <param name="s3Path">The path to the Amazon Simple Storage Service (Amazon S3) /// bucket where the Python script has been stored.</param> /// <param name="dbName">The name to use for the database that will be /// created by the crawler.</param> /// <returns>A Boolean value indicating the success of the action.</returns> public async Task<bool> CreateCrawlerAsync( string crawlerName, string crawlerDescription, string role, string schedule, string s3Path, string dbName) { var s3Target = new S3Target { Path = s3Path, }; var targetList = new List<S3Target> { s3Target, }; var targets = new CrawlerTargets { S3Targets = targetList, }; var crawlerRequest = new CreateCrawlerRequest { DatabaseName = dbName, Name = crawlerName, Description = crawlerDescription, Targets = targets, Role = role, Schedule = schedule, }; var response = await _amazonGlue.CreateCrawlerAsync(crawlerRequest); return response.HttpStatusCode == System.Net.HttpStatusCode.OK; } /// <summary> /// Create an AWS Glue job. /// </summary> /// <param name="jobName">The name of the job.</param> /// <param name="roleName">The name of the IAM role to be assumed by /// the job.</param> /// <param name="description">A description of the job.</param> /// <param name="scriptUrl">The URL to the script.</param> /// <returns>A Boolean value indicating the success of the action.</returns> public async Task<bool> CreateJobAsync(string dbName, string tableName, string bucketUrl, string jobName, string roleName, string description, string scriptUrl) { var command = new JobCommand { PythonVersion = "3", Name = "glueetl", ScriptLocation = scriptUrl, }; var arguments = new Dictionary<string, string> { { "--input_database", dbName }, { "--input_table", tableName }, { "--output_bucket_url", bucketUrl } }; var request = new CreateJobRequest { Command = command, DefaultArguments = arguments, Description = description, GlueVersion = "3.0", Name = jobName, NumberOfWorkers = 10, Role = roleName, WorkerType = "G.1X" }; var response = await _amazonGlue.CreateJobAsync(request); return response.HttpStatusCode == HttpStatusCode.OK; } /// <summary> /// Delete an AWS Glue crawler. /// </summary> /// <param name="crawlerName">The name of the crawler.</param> /// <returns>A Boolean value indicating the success of the action.</returns> public async Task<bool> DeleteCrawlerAsync(string crawlerName) { var response = await _amazonGlue.DeleteCrawlerAsync(new DeleteCrawlerRequest { Name = crawlerName }); return response.HttpStatusCode == HttpStatusCode.OK; } /// <summary> /// Delete the AWS Glue database. /// </summary> /// <param name="dbName">The name of the database.</param> /// <returns>A Boolean value indicating the success of the action.</returns> public async Task<bool> DeleteDatabaseAsync(string dbName) { var response = await _amazonGlue.DeleteDatabaseAsync(new DeleteDatabaseRequest { Name = dbName }); return response.HttpStatusCode == HttpStatusCode.OK; } /// <summary> /// Delete an AWS Glue job. /// </summary> /// <param name="jobName">The name of the job.</param> /// <returns>A Boolean value indicating the success of the action.</returns> public async Task<bool> DeleteJobAsync(string jobName) { var response = await _amazonGlue.DeleteJobAsync(new DeleteJobRequest { JobName = jobName }); return response.HttpStatusCode == HttpStatusCode.OK; } /// <summary> /// Delete a table from an AWS Glue database. /// </summary> /// <param name="tableName">The table to delete.</param> /// <returns>A Boolean value indicating the success of the action.</returns> public async Task<bool> DeleteTableAsync(string dbName, string tableName) { var response = await _amazonGlue.DeleteTableAsync(new DeleteTableRequest { Name = tableName, DatabaseName = dbName }); return response.HttpStatusCode == HttpStatusCode.OK; } /// <summary> /// Get information about an AWS Glue crawler. /// </summary> /// <param name="crawlerName">The name of the crawler.</param> /// <returns>A Crawler object describing the crawler.</returns> public async Task<Crawler?> GetCrawlerAsync(string crawlerName) { var crawlerRequest = new GetCrawlerRequest { Name = crawlerName, }; var response = await _amazonGlue.GetCrawlerAsync(crawlerRequest); if (response.HttpStatusCode == System.Net.HttpStatusCode.OK) { var databaseName = response.Crawler.DatabaseName; Console.WriteLine($"{crawlerName} has the database {databaseName}"); return response.Crawler; } Console.WriteLine($"No information regarding {crawlerName} could be found."); return null; } /// <summary> /// Get information about the state of an AWS Glue crawler. /// </summary> /// <param name="crawlerName">The name of the crawler.</param> /// <returns>A value describing the state of the crawler.</returns> public async Task<CrawlerState> GetCrawlerStateAsync(string crawlerName) { var response = await _amazonGlue.GetCrawlerAsync( new GetCrawlerRequest { Name = crawlerName }); return response.Crawler.State; } /// <summary> /// Get information about an AWS Glue database. /// </summary> /// <param name="dbName">The name of the database.</param> /// <returns>A Database object containing information about the database.</returns> public async Task<Database> GetDatabaseAsync(string dbName) { var databasesRequest = new GetDatabaseRequest { Name = dbName, }; var response = await _amazonGlue.GetDatabaseAsync(databasesRequest); return response.Database; } /// <summary> /// Get information about a specific AWS Glue job run. /// </summary> /// <param name="jobName">The name of the job.</param> /// <param name="jobRunId">The Id of the job run.</param> /// <returns>A JobRun object with information about the job run.</returns> public async Task<JobRun> GetJobRunAsync(string jobName, string jobRunId) { var response = await _amazonGlue.GetJobRunAsync(new GetJobRunRequest { JobName = jobName, RunId = jobRunId }); return response.JobRun; } /// <summary> /// Get information about all AWS Glue runs of a specific job. /// </summary> /// <param name="jobName">The name of the job.</param> /// <returns>A list of JobRun objects.</returns> public async Task<List<JobRun>> GetJobRunsAsync(string jobName) { var jobRuns = new List<JobRun>(); var request = new GetJobRunsRequest { JobName = jobName, }; // No need to loop to get all the log groups--the SDK does it for us behind the scenes var paginatorForJobRuns = _amazonGlue.Paginators.GetJobRuns(request); await foreach (var response in paginatorForJobRuns.Responses) { response.JobRuns.ForEach(jobRun => { jobRuns.Add(jobRun); }); } return jobRuns; } /// <summary> /// Get a list of tables for an AWS Glue database. /// </summary> /// <param name="dbName">The name of the database.</param> /// <returns>A list of Table objects.</returns> public async Task<List<Table>> GetTablesAsync(string dbName) { var request = new GetTablesRequest { DatabaseName = dbName }; var tables = new List<Table>(); // Get a paginator for listing the tables. var tablePaginator = _amazonGlue.Paginators.GetTables(request); await foreach (var response in tablePaginator.Responses) { tables.AddRange(response.TableList); } return tables; } /// <summary> /// List AWS Glue jobs using a paginator. /// </summary> /// <returns>A list of AWS Glue job names.</returns> public async Task<List<string>> ListJobsAsync() { var jobNames = new List<string>(); var listJobsPaginator = _amazonGlue.Paginators.ListJobs(new ListJobsRequest { MaxResults = 10 }); await foreach (var response in listJobsPaginator.Responses) { jobNames.AddRange(response.JobNames); } return jobNames; } /// <summary> /// Start an AWS Glue crawler. /// </summary> /// <param name="crawlerName">The name of the crawler.</param> /// <returns>A Boolean value indicating the success of the action.</returns> public async Task<bool> StartCrawlerAsync(string crawlerName) { var crawlerRequest = new StartCrawlerRequest { Name = crawlerName, }; var response = await _amazonGlue.StartCrawlerAsync(crawlerRequest); return response.HttpStatusCode == System.Net.HttpStatusCode.OK; } /// <summary> /// Start an AWS Glue job run. /// </summary> /// <param name="jobName">The name of the job.</param> /// <returns>A string representing the job run Id.</returns> public async Task<string> StartJobRunAsync( string jobName, string inputDatabase, string inputTable, string bucketName) { var request = new StartJobRunRequest { JobName = jobName, Arguments = new Dictionary<string, string> { {"--input_database", inputDatabase}, {"--input_table", inputTable}, {"--output_bucket_url", $"s3://{bucketName}/"} } }; var response = await _amazonGlue.StartJobRunAsync(request); return response.JobRunId; } }

Create a class that runs the scenario.

global using Amazon.Glue; global using GlueActions; global using Microsoft.Extensions.Configuration; global using Microsoft.Extensions.DependencyInjection; global using Microsoft.Extensions.Hosting; global using Microsoft.Extensions.Logging; global using Microsoft.Extensions.Logging.Console; global using Microsoft.Extensions.Logging.Debug; using Amazon.Glue.Model; using Amazon.S3; using Amazon.S3.Model; namespace GlueBasics; public class GlueBasics { private static ILogger logger = null!; private static IConfiguration _configuration = null!; static async Task Main(string[] args) { // Set up dependency injection for AWS Glue. using var host = Host.CreateDefaultBuilder(args) .ConfigureLogging(logging => logging.AddFilter("System", LogLevel.Debug) .AddFilter<DebugLoggerProvider>("Microsoft", LogLevel.Information) .AddFilter<ConsoleLoggerProvider>("Microsoft", LogLevel.Trace)) .ConfigureServices((_, services) => services.AddAWSService<IAmazonGlue>() .AddTransient<GlueWrapper>() .AddTransient<UiWrapper>() ) .Build(); logger = LoggerFactory.Create(builder => { builder.AddConsole(); }) .CreateLogger<GlueBasics>(); _configuration = new ConfigurationBuilder() .SetBasePath(Directory.GetCurrentDirectory()) .AddJsonFile("settings.json") // Load settings from .json file. .AddJsonFile("settings.local.json", true) // Optionally load local settings. .Build(); // These values are stored in settings.json // Once you have run the CDK script to deploy the resources, // edit the file to set "BucketName", "RoleName", and "ScriptURL" // to the appropriate values. Also set "CrawlerName" to the name // you want to give the crawler when it is created. string bucketName = _configuration["BucketName"]!; string bucketUrl = _configuration["BucketUrl"]!; string crawlerName = _configuration["CrawlerName"]!; string roleName = _configuration["RoleName"]!; string sourceData = _configuration["SourceData"]!; string dbName = _configuration["DbName"]!; string cron = _configuration["Cron"]!; string scriptUrl = _configuration["ScriptURL"]!; string jobName = _configuration["JobName"]!; var wrapper = host.Services.GetRequiredService<GlueWrapper>(); var uiWrapper = host.Services.GetRequiredService<UiWrapper>(); uiWrapper.DisplayOverview(); uiWrapper.PressEnter(); // Create the crawler and wait for it to be ready. uiWrapper.DisplayTitle("Create AWS Glue crawler"); Console.WriteLine("Let's begin by creating the AWS Glue crawler."); var crawlerDescription = "Crawler created for the AWS Glue Basics scenario."; var crawlerCreated = await wrapper.CreateCrawlerAsync(crawlerName, crawlerDescription, roleName, cron, sourceData, dbName); if (crawlerCreated) { Console.WriteLine($"The crawler: {crawlerName} has been created. Now let's wait until it's ready."); CrawlerState crawlerState; do { crawlerState = await wrapper.GetCrawlerStateAsync(crawlerName); } while (crawlerState != "READY"); Console.WriteLine($"The crawler {crawlerName} is now ready for use."); } else { Console.WriteLine($"Couldn't create crawler {crawlerName}."); return; // Exit the application. } uiWrapper.DisplayTitle("Start AWS Glue crawler"); Console.WriteLine("Now let's wait until the crawler has successfully started."); var crawlerStarted = await wrapper.StartCrawlerAsync(crawlerName); if (crawlerStarted) { CrawlerState crawlerState; do { crawlerState = await wrapper.GetCrawlerStateAsync(crawlerName); } while (crawlerState != "READY"); Console.WriteLine($"The crawler {crawlerName} is now ready for use."); } else { Console.WriteLine($"Couldn't start the crawler {crawlerName}."); return; // Exit the application. } uiWrapper.PressEnter(); Console.WriteLine($"\nLet's take a look at the database: {dbName}"); var database = await wrapper.GetDatabaseAsync(dbName); if (database != null) { uiWrapper.DisplayTitle($"{database.Name} Details"); Console.WriteLine($"{database.Name} created on {database.CreateTime}"); Console.WriteLine(database.Description); } uiWrapper.PressEnter(); var tables = await wrapper.GetTablesAsync(dbName); if (tables.Count > 0) { tables.ForEach(table => { Console.WriteLine($"{table.Name}\tCreated: {table.CreateTime}\tUpdated: {table.UpdateTime}"); }); } uiWrapper.PressEnter(); uiWrapper.DisplayTitle("Create AWS Glue job"); Console.WriteLine("Creating a new AWS Glue job."); var description = "An AWS Glue job created using the AWS SDK for .NET"; await wrapper.CreateJobAsync(dbName, tables[0].Name, bucketUrl, jobName, roleName, description, scriptUrl); uiWrapper.PressEnter(); uiWrapper.DisplayTitle("Starting AWS Glue job"); Console.WriteLine("Starting the new AWS Glue job..."); var jobRunId = await wrapper.StartJobRunAsync(jobName, dbName, tables[0].Name, bucketName); var jobRunComplete = false; var jobRun = new JobRun(); do { jobRun = await wrapper.GetJobRunAsync(jobName, jobRunId); if (jobRun.JobRunState == "SUCCEEDED" || jobRun.JobRunState == "STOPPED" || jobRun.JobRunState == "FAILED" || jobRun.JobRunState == "TIMEOUT") { jobRunComplete = true; } } while (!jobRunComplete); uiWrapper.DisplayTitle($"Data in {bucketName}"); // Get the list of data stored in the S3 bucket. var s3Client = new AmazonS3Client(); var response = await s3Client.ListObjectsAsync(new ListObjectsRequest { BucketName = bucketName }); response.S3Objects.ForEach(s3Object => { Console.WriteLine(s3Object.Key); }); uiWrapper.DisplayTitle("AWS Glue jobs"); var jobNames = await wrapper.ListJobsAsync(); jobNames.ForEach(jobName => { Console.WriteLine(jobName); }); uiWrapper.PressEnter(); uiWrapper.DisplayTitle("Get AWS Glue job run information"); Console.WriteLine("Getting information about the AWS Glue job."); var jobRuns = await wrapper.GetJobRunsAsync(jobName); jobRuns.ForEach(jobRun => { Console.WriteLine($"{jobRun.JobName}\t{jobRun.JobRunState}\t{jobRun.CompletedOn}"); }); uiWrapper.PressEnter(); uiWrapper.DisplayTitle("Deleting resources"); Console.WriteLine("Deleting the AWS Glue job used by the example."); await wrapper.DeleteJobAsync(jobName); Console.WriteLine("Deleting the tables from the database."); tables.ForEach(async table => { await wrapper.DeleteTableAsync(dbName, table.Name); }); Console.WriteLine("Deleting the database."); await wrapper.DeleteDatabaseAsync(dbName); Console.WriteLine("Deleting the AWS Glue crawler."); await wrapper.DeleteCrawlerAsync(crawlerName); Console.WriteLine("The AWS Glue scenario has completed."); uiWrapper.PressEnter(); } } namespace GlueBasics; public class UiWrapper { public readonly string SepBar = new string('-', Console.WindowWidth); /// <summary> /// Show information about the scenario. /// </summary> public void DisplayOverview() { Console.Clear(); DisplayTitle("Amazon Glue: get started with crawlers and jobs"); Console.WriteLine("This example application does the following:"); Console.WriteLine("\t 1. Create a crawler, pass it the IAM role and the URL to the public S3 bucket that contains the source data"); Console.WriteLine("\t 2. Start the crawler."); Console.WriteLine("\t 3. Get the database created by the crawler and the tables in the database."); Console.WriteLine("\t 4. Create a job."); Console.WriteLine("\t 5. Start a job run."); Console.WriteLine("\t 6. Wait for the job run to complete."); Console.WriteLine("\t 7. Show the data stored in the bucket."); Console.WriteLine("\t 8. List jobs for the account."); Console.WriteLine("\t 9. Get job run details for the job that was run."); Console.WriteLine("\t10. Delete the demo job."); Console.WriteLine("\t11. Delete the database and tables created for the demo."); Console.WriteLine("\t12. Delete the crawler."); } /// <summary> /// Display a message and wait until the user presses enter. /// </summary> public void PressEnter() { Console.Write("\nPlease press <Enter> to continue. "); _ = Console.ReadLine(); } /// <summary> /// Pad a string with spaces to center it on the console display. /// </summary> /// <param name="strToCenter">The string to center on the screen.</param> /// <returns>The string padded to make it center on the screen.</returns> public string CenterString(string strToCenter) { var padAmount = (Console.WindowWidth - strToCenter.Length) / 2; var leftPad = new string(' ', padAmount); return $"{leftPad}{strToCenter}"; } /// <summary> /// Display a line of hyphens, the centered text of the title and another /// line of hyphens. /// </summary> /// <param name="strTitle">The string to be displayed.</param> public void DisplayTitle(string strTitle) { Console.WriteLine(SepBar); Console.WriteLine(CenterString(strTitle)); Console.WriteLine(SepBar); } }
C++
SDK for C++
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

//! Scenario which demonstrates using AWS Glue to add a crawler and run a job. /*! \\sa runGettingStartedWithGlueScenario() \param bucketName: An S3 bucket created in the setup. \param roleName: An AWS Identity and Access Management (IAM) role created in the setup. \param clientConfig: AWS client configuration. \return bool: Successful completion. */ bool AwsDoc::Glue::runGettingStartedWithGlueScenario(const Aws::String &bucketName, const Aws::String &roleName, const Aws::Client::ClientConfiguration &clientConfig) { Aws::Glue::GlueClient client(clientConfig); Aws::String roleArn; if (!getRoleArn(roleName, roleArn, clientConfig)) { std::cerr << "Error getting role ARN for role." << std::endl; return false; } // 1. Upload the job script to the S3 bucket. { std::cout << "Uploading the job script '" << AwsDoc::Glue::PYTHON_SCRIPT << "'." << std::endl; if (!AwsDoc::Glue::uploadFile(bucketName, AwsDoc::Glue::PYTHON_SCRIPT_PATH, AwsDoc::Glue::PYTHON_SCRIPT, clientConfig)) { std::cerr << "Error uploading the job file." << std::endl; return false; } } // 2. Create a crawler. { Aws::Glue::Model::S3Target s3Target; s3Target.SetPath("s3://crawler-public-us-east-1/flight/2016/csv"); Aws::Glue::Model::CrawlerTargets crawlerTargets; crawlerTargets.AddS3Targets(s3Target); Aws::Glue::Model::CreateCrawlerRequest request; request.SetTargets(crawlerTargets); request.SetName(CRAWLER_NAME); request.SetDatabaseName(CRAWLER_DATABASE_NAME); request.SetTablePrefix(CRAWLER_DATABASE_PREFIX); request.SetRole(roleArn); Aws::Glue::Model::CreateCrawlerOutcome outcome = client.CreateCrawler(request); if (outcome.IsSuccess()) { std::cout << "Successfully created the crawler." << std::endl; } else { std::cerr << "Error creating a crawler. " << outcome.GetError().GetMessage() << std::endl; deleteAssets("", CRAWLER_DATABASE_NAME, "", bucketName, clientConfig); return false; } } // 3. Get a crawler. { Aws::Glue::Model::GetCrawlerRequest request; request.SetName(CRAWLER_NAME); Aws::Glue::Model::GetCrawlerOutcome outcome = client.GetCrawler(request); if (outcome.IsSuccess()) { Aws::Glue::Model::CrawlerState crawlerState = outcome.GetResult().GetCrawler().GetState(); std::cout << "Retrieved crawler with state " << Aws::Glue::Model::CrawlerStateMapper::GetNameForCrawlerState( crawlerState) << "." << std::endl; } else { std::cerr << "Error retrieving a crawler. " << outcome.GetError().GetMessage() << std::endl; deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, "", bucketName, clientConfig); return false; } } // 4. Start a crawler. { Aws::Glue::Model::StartCrawlerRequest request; request.SetName(CRAWLER_NAME); Aws::Glue::Model::StartCrawlerOutcome outcome = client.StartCrawler(request); if (outcome.IsSuccess() || (Aws::Glue::GlueErrors::CRAWLER_RUNNING == outcome.GetError().GetErrorType())) { if (!outcome.IsSuccess()) { std::cout << "Crawler was already started." << std::endl; } else { std::cout << "Successfully started crawler." << std::endl; } std::cout << "This may take a while to run." << std::endl; Aws::Glue::Model::CrawlerState crawlerState = Aws::Glue::Model::CrawlerState::NOT_SET; int iterations = 0; while (Aws::Glue::Model::CrawlerState::READY != crawlerState) { std::this_thread::sleep_for(std::chrono::seconds(1)); ++iterations; if ((iterations % 10) == 0) { // Log status every 10 seconds. std::cout << "Crawler status " << Aws::Glue::Model::CrawlerStateMapper::GetNameForCrawlerState( crawlerState) << ". After " << iterations << " seconds elapsed." << std::endl; } Aws::Glue::Model::GetCrawlerRequest getCrawlerRequest; getCrawlerRequest.SetName(CRAWLER_NAME); Aws::Glue::Model::GetCrawlerOutcome getCrawlerOutcome = client.GetCrawler( getCrawlerRequest); if (getCrawlerOutcome.IsSuccess()) { crawlerState = getCrawlerOutcome.GetResult().GetCrawler().GetState(); } else { std::cerr << "Error getting crawler. " << getCrawlerOutcome.GetError().GetMessage() << std::endl; break; } } if (Aws::Glue::Model::CrawlerState::READY == crawlerState) { std::cout << "Crawler finished running after " << iterations << " seconds." << std::endl; } } else { std::cerr << "Error starting a crawler. " << outcome.GetError().GetMessage() << std::endl; deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, "", bucketName, clientConfig); return false; } } // 5. Get a database. { Aws::Glue::Model::GetDatabaseRequest request; request.SetName(CRAWLER_DATABASE_NAME); Aws::Glue::Model::GetDatabaseOutcome outcome = client.GetDatabase(request); if (outcome.IsSuccess()) { const Aws::Glue::Model::Database &database = outcome.GetResult().GetDatabase(); std::cout << "Successfully retrieve the database\n" << database.Jsonize().View().WriteReadable() << "'." << std::endl; } else { std::cerr << "Error getting the database. " << outcome.GetError().GetMessage() << std::endl; deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, "", bucketName, clientConfig); return false; } } // 6. Get tables. Aws::String tableName; { Aws::Glue::Model::GetTablesRequest request; request.SetDatabaseName(CRAWLER_DATABASE_NAME); std::vector<Aws::Glue::Model::Table> all_tables; Aws::String nextToken; // Used for pagination. do { Aws::Glue::Model::GetTablesOutcome outcome = client.GetTables(request); if (outcome.IsSuccess()) { const std::vector<Aws::Glue::Model::Table> &tables = outcome.GetResult().GetTableList(); all_tables.insert(all_tables.end(), tables.begin(), tables.end()); nextToken = outcome.GetResult().GetNextToken(); } else { std::cerr << "Error getting the tables. " << outcome.GetError().GetMessage() << std::endl; deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, "", bucketName, clientConfig); return false; } } while (!nextToken.empty()); std::cout << "The database contains " << all_tables.size() << (all_tables.size() == 1 ? " table." : "tables.") << std::endl; std::cout << "Here is a list of the tables in the database."; for (size_t index = 0; index < all_tables.size(); ++index) { std::cout << " " << index + 1 << ": " << all_tables[index].GetName() << std::endl; } if (!all_tables.empty()) { int tableIndex = askQuestionForIntRange( "Enter an index to display the database detail ", 1, static_cast<int>(all_tables.size())); std::cout << all_tables[tableIndex - 1].Jsonize().View().WriteReadable() << std::endl; tableName = all_tables[tableIndex - 1].GetName(); } } // 7. Create a job. { Aws::Glue::Model::CreateJobRequest request; request.SetName(JOB_NAME); request.SetRole(roleArn); request.SetGlueVersion(GLUE_VERSION); Aws::Glue::Model::JobCommand command; command.SetName(JOB_COMMAND_NAME); command.SetPythonVersion(JOB_PYTHON_VERSION); command.SetScriptLocation( Aws::String("s3://") + bucketName + "/" + PYTHON_SCRIPT); request.SetCommand(command); Aws::Glue::Model::CreateJobOutcome outcome = client.CreateJob(request); if (outcome.IsSuccess()) { std::cout << "Successfully created the job." << std::endl; } else { std::cerr << "Error creating the job. " << outcome.GetError().GetMessage() << std::endl; deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, "", bucketName, clientConfig); return false; } } // 8. Start a job run. { Aws::Glue::Model::StartJobRunRequest request; request.SetJobName(JOB_NAME); Aws::Map<Aws::String, Aws::String> arguments; arguments["--input_database"] = CRAWLER_DATABASE_NAME; arguments["--input_table"] = tableName; arguments["--output_bucket_url"] = Aws::String("s3://") + bucketName + "/"; request.SetArguments(arguments); Aws::Glue::Model::StartJobRunOutcome outcome = client.StartJobRun(request); if (outcome.IsSuccess()) { std::cout << "Successfully started the job." << std::endl; Aws::String jobRunId = outcome.GetResult().GetJobRunId(); int iterator = 0; bool done = false; while (!done) { ++iterator; std::this_thread::sleep_for(std::chrono::seconds(1)); Aws::Glue::Model::GetJobRunRequest jobRunRequest; jobRunRequest.SetJobName(JOB_NAME); jobRunRequest.SetRunId(jobRunId); Aws::Glue::Model::GetJobRunOutcome jobRunOutcome = client.GetJobRun( jobRunRequest); if (jobRunOutcome.IsSuccess()) { const Aws::Glue::Model::JobRun &jobRun = jobRunOutcome.GetResult().GetJobRun(); Aws::Glue::Model::JobRunState jobRunState = jobRun.GetJobRunState(); if ((jobRunState == Aws::Glue::Model::JobRunState::STOPPED) || (jobRunState == Aws::Glue::Model::JobRunState::FAILED) || (jobRunState == Aws::Glue::Model::JobRunState::TIMEOUT)) { std::cerr << "Error running job. " << jobRun.GetErrorMessage() << std::endl; deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, JOB_NAME, bucketName, clientConfig); return false; } else if (jobRunState == Aws::Glue::Model::JobRunState::SUCCEEDED) { std::cout << "Job run succeeded after " << iterator << " seconds elapsed." << std::endl; done = true; } else if ((iterator % 10) == 0) { // Log status every 10 seconds. std::cout << "Job run status " << Aws::Glue::Model::JobRunStateMapper::GetNameForJobRunState( jobRunState) << ". " << iterator << " seconds elapsed." << std::endl; } } else { std::cerr << "Error retrieving job run state. " << jobRunOutcome.GetError().GetMessage() << std::endl; deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, JOB_NAME, bucketName, clientConfig); return false; } } } else { std::cerr << "Error starting a job. " << outcome.GetError().GetMessage() << std::endl; deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, JOB_NAME, bucketName, clientConfig); return false; } } // 9. List the output data stored in the S3 bucket. { Aws::S3::S3Client s3Client; Aws::S3::Model::ListObjectsV2Request request; request.SetBucket(bucketName); request.SetPrefix(OUTPUT_FILE_PREFIX); Aws::String continuationToken; // Used for pagination. std::vector<Aws::S3::Model::Object> allObjects; do { if (!continuationToken.empty()) { request.SetContinuationToken(continuationToken); } Aws::S3::Model::ListObjectsV2Outcome outcome = s3Client.ListObjectsV2( request); if (outcome.IsSuccess()) { const std::vector<Aws::S3::Model::Object> &objects = outcome.GetResult().GetContents(); allObjects.insert(allObjects.end(), objects.begin(), objects.end()); continuationToken = outcome.GetResult().GetNextContinuationToken(); } else { std::cerr << "Error listing objects. " << outcome.GetError().GetMessage() << std::endl; break; } } while (!continuationToken.empty()); std::cout << "Data from your job is in " << allObjects.size() << " files in the S3 bucket, " << bucketName << "." << std::endl; for (size_t i = 0; i < allObjects.size(); ++i) { std::cout << " " << i + 1 << ". " << allObjects[i].GetKey() << std::endl; } int objectIndex = askQuestionForIntRange( std::string( "Enter the number of a block to download it and see the first ") + std::to_string(LINES_OF_RUN_FILE_TO_DISPLAY) + " lines of JSON output in the block: ", 1, static_cast<int>(allObjects.size())); Aws::String objectKey = allObjects[objectIndex - 1].GetKey(); std::stringstream stringStream; if (getObjectFromBucket(bucketName, objectKey, stringStream, clientConfig)) { for (int i = 0; i < LINES_OF_RUN_FILE_TO_DISPLAY && stringStream; ++i) { std::string line; std::getline(stringStream, line); std::cout << " " << line << std::endl; } } else { deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, JOB_NAME, bucketName, clientConfig); return false; } } // 10. List all the jobs. Aws::String jobName; { Aws::Glue::Model::ListJobsRequest listJobsRequest; Aws::String nextToken; std::vector<Aws::String> allJobNames; do { if (!nextToken.empty()) { listJobsRequest.SetNextToken(nextToken); } Aws::Glue::Model::ListJobsOutcome listRunsOutcome = client.ListJobs( listJobsRequest); if (listRunsOutcome.IsSuccess()) { const std::vector<Aws::String> &jobNames = listRunsOutcome.GetResult().GetJobNames(); allJobNames.insert(allJobNames.end(), jobNames.begin(), jobNames.end()); nextToken = listRunsOutcome.GetResult().GetNextToken(); } else { std::cerr << "Error listing jobs. " << listRunsOutcome.GetError().GetMessage() << std::endl; } } while (!nextToken.empty()); std::cout << "Your account has " << allJobNames.size() << " jobs." << std::endl; for (size_t i = 0; i < allJobNames.size(); ++i) { std::cout << " " << i + 1 << ". " << allJobNames[i] << std::endl; } int jobIndex = askQuestionForIntRange( Aws::String("Enter a number between 1 and ") + std::to_string(allJobNames.size()) + " to see the list of runs for a job: ", 1, static_cast<int>(allJobNames.size())); jobName = allJobNames[jobIndex - 1]; } // 11. Get the job runs for a job. Aws::String jobRunID; if (!jobName.empty()) { Aws::Glue::Model::GetJobRunsRequest getJobRunsRequest; getJobRunsRequest.SetJobName(jobName); Aws::String nextToken; // Used for pagination. std::vector<Aws::Glue::Model::JobRun> allJobRuns; do { if (!nextToken.empty()) { getJobRunsRequest.SetNextToken(nextToken); } Aws::Glue::Model::GetJobRunsOutcome jobRunsOutcome = client.GetJobRuns( getJobRunsRequest); if (jobRunsOutcome.IsSuccess()) { const std::vector<Aws::Glue::Model::JobRun> &jobRuns = jobRunsOutcome.GetResult().GetJobRuns(); allJobRuns.insert(allJobRuns.end(), jobRuns.begin(), jobRuns.end()); nextToken = jobRunsOutcome.GetResult().GetNextToken(); } else { std::cerr << "Error getting job runs. " << jobRunsOutcome.GetError().GetMessage() << std::endl; break; } } while (!nextToken.empty()); std::cout << "There are " << allJobRuns.size() << " runs in the job '" << jobName << "'." << std::endl; for (size_t i = 0; i < allJobRuns.size(); ++i) { std::cout << " " << i + 1 << ". " << allJobRuns[i].GetJobName() << std::endl; } int runIndex = askQuestionForIntRange( Aws::String("Enter a number between 1 and ") + std::to_string(allJobRuns.size()) + " to see details for a run: ", 1, static_cast<int>(allJobRuns.size())); jobRunID = allJobRuns[runIndex - 1].GetId(); } // 12. Get a single job run. if (!jobRunID.empty()) { Aws::Glue::Model::GetJobRunRequest jobRunRequest; jobRunRequest.SetJobName(jobName); jobRunRequest.SetRunId(jobRunID); Aws::Glue::Model::GetJobRunOutcome jobRunOutcome = client.GetJobRun( jobRunRequest); if (jobRunOutcome.IsSuccess()) { std::cout << "Displaying the job run JSON description." << std::endl; std::cout << jobRunOutcome.GetResult().GetJobRun().Jsonize().View().WriteReadable() << std::endl; } else { std::cerr << "Error get a job run. " << jobRunOutcome.GetError().GetMessage() << std::endl; } } return deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, JOB_NAME, bucketName, clientConfig); } //! Cleanup routine to delete created assets. /*! \\sa deleteAssets() \param crawler: Name of an AWS Glue crawler. \param database: The name of an AWS Glue database. \param job: The name of an AWS Glue job. \param bucketName: The name of an S3 bucket. \param clientConfig: AWS client configuration. \return bool: Successful completion. */ bool AwsDoc::Glue::deleteAssets(const Aws::String &crawler, const Aws::String &database, const Aws::String &job, const Aws::String &bucketName, const Aws::Client::ClientConfiguration &clientConfig) { const Aws::Glue::GlueClient client(clientConfig); bool result = true; // 13. Delete a job. if (!job.empty()) { Aws::Glue::Model::DeleteJobRequest request; request.SetJobName(job); Aws::Glue::Model::DeleteJobOutcome outcome = client.DeleteJob(request); if (outcome.IsSuccess()) { std::cout << "Successfully deleted the job." << std::endl; } else { std::cerr << "Error deleting the job. " << outcome.GetError().GetMessage() << std::endl; result = false; } } // 14. Delete a database. if (!database.empty()) { Aws::Glue::Model::DeleteDatabaseRequest request; request.SetName(database); Aws::Glue::Model::DeleteDatabaseOutcome outcome = client.DeleteDatabase( request); if (outcome.IsSuccess()) { std::cout << "Successfully deleted the database." << std::endl; } else { std::cerr << "Error deleting database. " << outcome.GetError().GetMessage() << std::endl; result = false; } } // 15. Delete a crawler. if (!crawler.empty()) { Aws::Glue::Model::DeleteCrawlerRequest request; request.SetName(crawler); Aws::Glue::Model::DeleteCrawlerOutcome outcome = client.DeleteCrawler(request); if (outcome.IsSuccess()) { std::cout << "Successfully deleted the crawler." << std::endl; } else { std::cerr << "Error deleting the crawler. " << outcome.GetError().GetMessage() << std::endl; result = false; } } // 16. Delete the job script and run data from the S3 bucket. result &= AwsDoc::Glue::deleteAllObjectsInS3Bucket(bucketName, clientConfig); return result; } //! Routine which uploads a file to an S3 bucket. /*! \\sa uploadFile() \param bucketName: An S3 bucket created in the setup. \param filePath: The path of the file to upload. \param fileName The name for the uploaded file. \param clientConfig: AWS client configuration. \return bool: Successful completion. */ bool AwsDoc::Glue::uploadFile(const Aws::String &bucketName, const Aws::String &filePath, const Aws::String &fileName, const Aws::Client::ClientConfiguration &clientConfig) { Aws::S3::S3Client s3_client(clientConfig); Aws::S3::Model::PutObjectRequest request; request.SetBucket(bucketName); request.SetKey(fileName); std::shared_ptr<Aws::IOStream> inputData = Aws::MakeShared<Aws::FStream>("SampleAllocationTag", filePath.c_str(), std::ios_base::in | std::ios_base::binary); if (!*inputData) { std::cerr << "Error unable to read file " << filePath << std::endl; return false; } request.SetBody(inputData); Aws::S3::Model::PutObjectOutcome outcome = s3_client.PutObject(request); if (!outcome.IsSuccess()) { std::cerr << "Error: PutObject: " << outcome.GetError().GetMessage() << std::endl; } else { std::cout << "Added object '" << filePath << "' to bucket '" << bucketName << "'." << std::endl; } return outcome.IsSuccess(); } //! Routine which deletes all objects in an S3 bucket. /*! \\sa deleteAllObjectsInS3Bucket() \param bucketName: The S3 bucket name. \param clientConfig: AWS client configuration. \return bool: Successful completion. */ bool AwsDoc::Glue::deleteAllObjectsInS3Bucket(const Aws::String &bucketName, const Aws::Client::ClientConfiguration &clientConfig) { Aws::S3::S3Client client(clientConfig); Aws::S3::Model::ListObjectsV2Request listObjectsRequest; listObjectsRequest.SetBucket(bucketName); Aws::String continuationToken; // Used for pagination. bool result = true; do { if (!continuationToken.empty()) { listObjectsRequest.SetContinuationToken(continuationToken); } Aws::S3::Model::ListObjectsV2Outcome listObjectsOutcome = client.ListObjectsV2( listObjectsRequest); if (listObjectsOutcome.IsSuccess()) { const std::vector<Aws::S3::Model::Object> &objects = listObjectsOutcome.GetResult().GetContents(); if (!objects.empty()) { Aws::S3::Model::DeleteObjectsRequest deleteObjectsRequest; deleteObjectsRequest.SetBucket(bucketName); std::vector<Aws::S3::Model::ObjectIdentifier> objectIdentifiers; for (const Aws::S3::Model::Object &object: objects) { objectIdentifiers.push_back( Aws::S3::Model::ObjectIdentifier().WithKey( object.GetKey())); } Aws::S3::Model::Delete objectsDelete; objectsDelete.SetObjects(objectIdentifiers); objectsDelete.SetQuiet(true); deleteObjectsRequest.SetDelete(objectsDelete); Aws::S3::Model::DeleteObjectsOutcome deleteObjectsOutcome = client.DeleteObjects(deleteObjectsRequest); if (!deleteObjectsOutcome.IsSuccess()) { std::cerr << "Error deleting objects. " << deleteObjectsOutcome.GetError().GetMessage() << std::endl; result = false; break; } else { std::cout << "Successfully deleted the objects." << std::endl; } } else { std::cout << "No objects to delete in '" << bucketName << "'." << std::endl; } continuationToken = listObjectsOutcome.GetResult().GetNextContinuationToken(); } else { std::cerr << "Error listing objects. " << listObjectsOutcome.GetError().GetMessage() << std::endl; result = false; break; } } while (!continuationToken.empty()); return result; } //! Routine which retrieves an object from an S3 bucket. /*! \\sa getObjectFromBucket() \param bucketName: The S3 bucket name. \param objectKey: The object's name. \param objectStream: A stream to receive the retrieved data. \param clientConfig: AWS client configuration. \return bool: Successful completion. */ bool AwsDoc::Glue::getObjectFromBucket(const Aws::String &bucketName, const Aws::String &objectKey, std::ostream &objectStream, const Aws::Client::ClientConfiguration &clientConfig) { Aws::S3::S3Client client(clientConfig); Aws::S3::Model::GetObjectRequest request; request.SetBucket(bucketName); request.SetKey(objectKey); Aws::S3::Model::GetObjectOutcome outcome = client.GetObject(request); if (outcome.IsSuccess()) { std::cout << "Successfully retrieved '" << objectKey << "'." << std::endl; auto &body = outcome.GetResult().GetBody(); objectStream << body.rdbuf(); } else { std::cerr << "Error retrieving object. " << outcome.GetError().GetMessage() << std::endl; } return outcome.IsSuccess(); }
Java
SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

/** * * Before running this Java V2 code example, set up your development * environment, including your credentials. * * For more information, see the following documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html * * To set up the resources, see this documentation topic: * * https://docs.aws.amazon.com/glue/latest/ug/tutorial-add-crawler.html * * This example performs the following tasks: * * 1. Create a database. * 2. Create a crawler. * 3. Get a crawler. * 4. Start a crawler. * 5. Get a database. * 6. Get tables. * 7. Create a job. * 8. Start a job run. * 9. List all jobs. * 10. Get job runs. * 11. Delete a job. * 12. Delete a database. * 13. Delete a crawler. */ public class GlueScenario { public static final String DASHES = new String(new char[80]).replace("\0", "-"); public static void main(String[] args) throws InterruptedException { final String usage = """ Usage: <iam> <s3Path> <cron> <dbName> <crawlerName> <jobName>\s Where: iam - The ARN of the IAM role that has AWS Glue and S3 permissions.\s s3Path - The Amazon Simple Storage Service (Amazon S3) target that contains data (for example, CSV data). cron - A cron expression used to specify the schedule (i.e., cron(15 12 * * ? *). dbName - The database name.\s crawlerName - The name of the crawler.\s jobName - The name you assign to this job definition. scriptLocation - The Amazon S3 path to a script that runs a job. locationUri - The location of the database bucketNameSc - The Amazon S3 bucket name used when creating a job """; if (args.length != 9) { System.out.println(usage); System.exit(1); } String iam = args[0]; String s3Path = args[1]; String cron = args[2]; String dbName = args[3]; String crawlerName = args[4]; String jobName = args[5]; String scriptLocation = args[6]; String locationUri = args[7]; String bucketNameSc = args[8]; Region region = Region.US_EAST_1; GlueClient glueClient = GlueClient.builder() .region(region) .build(); System.out.println(DASHES); System.out.println("Welcome to the AWS Glue scenario."); System.out.println(DASHES); System.out.println(DASHES); System.out.println("1. Create a database."); createDatabase(glueClient, dbName, locationUri); System.out.println(DASHES); System.out.println(DASHES); System.out.println("2. Create a crawler."); createGlueCrawler(glueClient, iam, s3Path, cron, dbName, crawlerName); System.out.println(DASHES); System.out.println(DASHES); System.out.println("3. Get a crawler."); getSpecificCrawler(glueClient, crawlerName); System.out.println(DASHES); System.out.println(DASHES); System.out.println("4. Start a crawler."); startSpecificCrawler(glueClient, crawlerName); System.out.println(DASHES); System.out.println(DASHES); System.out.println("5. Get a database."); getSpecificDatabase(glueClient, dbName); System.out.println(DASHES); System.out.println(DASHES); System.out.println("*** Wait 5 min for the tables to become available"); TimeUnit.MINUTES.sleep(5); System.out.println("6. Get tables."); String myTableName = getGlueTables(glueClient, dbName); System.out.println(DASHES); System.out.println(DASHES); System.out.println("7. Create a job."); createJob(glueClient, jobName, iam, scriptLocation); System.out.println(DASHES); System.out.println(DASHES); System.out.println("8. Start a Job run."); startJob(glueClient, jobName, dbName, myTableName, bucketNameSc); System.out.println(DASHES); System.out.println(DASHES); System.out.println("9. List all jobs."); getAllJobs(glueClient); System.out.println(DASHES); System.out.println(DASHES); System.out.println("10. Get job runs."); getJobRuns(glueClient, jobName); System.out.println(DASHES); System.out.println(DASHES); System.out.println("11. Delete a job."); deleteJob(glueClient, jobName); System.out.println("*** Wait 5 MIN for the " + crawlerName + " to stop"); TimeUnit.MINUTES.sleep(5); System.out.println(DASHES); System.out.println(DASHES); System.out.println("12. Delete a database."); deleteDatabase(glueClient, dbName); System.out.println(DASHES); System.out.println(DASHES); System.out.println("Delete a crawler."); deleteSpecificCrawler(glueClient, crawlerName); System.out.println(DASHES); System.out.println(DASHES); System.out.println("Successfully completed the AWS Glue Scenario"); System.out.println(DASHES); } public static void createDatabase(GlueClient glueClient, String dbName, String locationUri) { try { DatabaseInput input = DatabaseInput.builder() .description("Built with the AWS SDK for Java V2") .name(dbName) .locationUri(locationUri) .build(); CreateDatabaseRequest request = CreateDatabaseRequest.builder() .databaseInput(input) .build(); glueClient.createDatabase(request); System.out.println(dbName + " was successfully created"); } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } public static void createGlueCrawler(GlueClient glueClient, String iam, String s3Path, String cron, String dbName, String crawlerName) { try { S3Target s3Target = S3Target.builder() .path(s3Path) .build(); List<S3Target> targetList = new ArrayList<>(); targetList.add(s3Target); CrawlerTargets targets = CrawlerTargets.builder() .s3Targets(targetList) .build(); CreateCrawlerRequest crawlerRequest = CreateCrawlerRequest.builder() .databaseName(dbName) .name(crawlerName) .description("Created by the AWS Glue Java API") .targets(targets) .role(iam) .schedule(cron) .build(); glueClient.createCrawler(crawlerRequest); System.out.println(crawlerName + " was successfully created"); } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } public static void getSpecificCrawler(GlueClient glueClient, String crawlerName) { try { GetCrawlerRequest crawlerRequest = GetCrawlerRequest.builder() .name(crawlerName) .build(); boolean ready = false; while (!ready) { GetCrawlerResponse response = glueClient.getCrawler(crawlerRequest); String status = response.crawler().stateAsString(); if (status.compareTo("READY") == 0) { ready = true; } Thread.sleep(3000); } System.out.println("The crawler is now ready"); } catch (GlueException | InterruptedException e) { System.err.println(e.getMessage()); System.exit(1); } } public static void startSpecificCrawler(GlueClient glueClient, String crawlerName) { try { StartCrawlerRequest crawlerRequest = StartCrawlerRequest.builder() .name(crawlerName) .build(); glueClient.startCrawler(crawlerRequest); System.out.println(crawlerName + " was successfully started!"); } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } public static void getSpecificDatabase(GlueClient glueClient, String databaseName) { try { GetDatabaseRequest databasesRequest = GetDatabaseRequest.builder() .name(databaseName) .build(); GetDatabaseResponse response = glueClient.getDatabase(databasesRequest); Instant createDate = response.database().createTime(); // Convert the Instant to readable date. DateTimeFormatter formatter = DateTimeFormatter.ofLocalizedDateTime(FormatStyle.SHORT) .withLocale(Locale.US) .withZone(ZoneId.systemDefault()); formatter.format(createDate); System.out.println("The create date of the database is " + createDate); } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } public static String getGlueTables(GlueClient glueClient, String dbName) { String myTableName = ""; try { GetTablesRequest tableRequest = GetTablesRequest.builder() .databaseName(dbName) .build(); GetTablesResponse response = glueClient.getTables(tableRequest); List<Table> tables = response.tableList(); if (tables.isEmpty()) { System.out.println("No tables were returned"); } else { for (Table table : tables) { myTableName = table.name(); System.out.println("Table name is: " + myTableName); } } } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } return myTableName; } public static void startJob(GlueClient glueClient, String jobName, String inputDatabase, String inputTable, String outBucket) { try { Map<String, String> myMap = new HashMap<>(); myMap.put("--input_database", inputDatabase); myMap.put("--input_table", inputTable); myMap.put("--output_bucket_url", outBucket); StartJobRunRequest runRequest = StartJobRunRequest.builder() .workerType(WorkerType.G_1_X) .numberOfWorkers(10) .arguments(myMap) .jobName(jobName) .build(); StartJobRunResponse response = glueClient.startJobRun(runRequest); System.out.println("The request Id of the job is " + response.responseMetadata().requestId()); } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } public static void createJob(GlueClient glueClient, String jobName, String iam, String scriptLocation) { try { JobCommand command = JobCommand.builder() .pythonVersion("3") .name("glueetl") .scriptLocation(scriptLocation) .build(); CreateJobRequest jobRequest = CreateJobRequest.builder() .description("A Job created by using the AWS SDK for Java V2") .glueVersion("2.0") .workerType(WorkerType.G_1_X) .numberOfWorkers(10) .name(jobName) .role(iam) .command(command) .build(); glueClient.createJob(jobRequest); System.out.println(jobName + " was successfully created."); } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } public static void getAllJobs(GlueClient glueClient) { try { GetJobsRequest jobsRequest = GetJobsRequest.builder() .maxResults(10) .build(); GetJobsResponse jobsResponse = glueClient.getJobs(jobsRequest); List<Job> jobs = jobsResponse.jobs(); for (Job job : jobs) { System.out.println("Job name is : " + job.name()); System.out.println("The job worker type is : " + job.workerType().name()); } } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } public static void getJobRuns(GlueClient glueClient, String jobName) { try { GetJobRunsRequest runsRequest = GetJobRunsRequest.builder() .jobName(jobName) .maxResults(20) .build(); boolean jobDone = false; while (!jobDone) { GetJobRunsResponse response = glueClient.getJobRuns(runsRequest); List<JobRun> jobRuns = response.jobRuns(); for (JobRun jobRun : jobRuns) { String jobState = jobRun.jobRunState().name(); if (jobState.compareTo("SUCCEEDED") == 0) { System.out.println(jobName + " has succeeded"); jobDone = true; } else if (jobState.compareTo("STOPPED") == 0) { System.out.println("Job run has stopped"); jobDone = true; } else if (jobState.compareTo("FAILED") == 0) { System.out.println("Job run has failed"); jobDone = true; } else if (jobState.compareTo("TIMEOUT") == 0) { System.out.println("Job run has timed out"); jobDone = true; } else { System.out.println("*** Job run state is " + jobRun.jobRunState().name()); System.out.println("Job run Id is " + jobRun.id()); System.out.println("The Glue version is " + jobRun.glueVersion()); } TimeUnit.SECONDS.sleep(5); } } } catch (GlueException | InterruptedException e) { System.err.println(e.getMessage()); System.exit(1); } } public static void deleteJob(GlueClient glueClient, String jobName) { try { DeleteJobRequest jobRequest = DeleteJobRequest.builder() .jobName(jobName) .build(); glueClient.deleteJob(jobRequest); System.out.println(jobName + " was successfully deleted"); } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } public static void deleteDatabase(GlueClient glueClient, String databaseName) { try { DeleteDatabaseRequest request = DeleteDatabaseRequest.builder() .name(databaseName) .build(); glueClient.deleteDatabase(request); System.out.println(databaseName + " was successfully deleted"); } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } public static void deleteSpecificCrawler(GlueClient glueClient, String crawlerName) { try { DeleteCrawlerRequest deleteCrawlerRequest = DeleteCrawlerRequest.builder() .name(crawlerName) .build(); glueClient.deleteCrawler(deleteCrawlerRequest); System.out.println(crawlerName + " was deleted"); } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } }
JavaScript
SDK for JavaScript (v3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

Create and run a crawler that crawls a public Amazon Simple Storage Service (Amazon S3) bucket and generates a metadata database that describes the CSV-formatted data it finds.

const createCrawler = (name, role, dbName, tablePrefix, s3TargetPath) => { const client = new GlueClient({}); const command = new CreateCrawlerCommand({ Name: name, Role: role, DatabaseName: dbName, TablePrefix: tablePrefix, Targets: { S3Targets: [{ Path: s3TargetPath }], }, }); return client.send(command); }; const getCrawler = (name) => { const client = new GlueClient({}); const command = new GetCrawlerCommand({ Name: name, }); return client.send(command); }; const startCrawler = (name) => { const client = new GlueClient({}); const command = new StartCrawlerCommand({ Name: name, }); return client.send(command); }; const crawlerExists = async ({ getCrawler }, crawlerName) => { try { await getCrawler(crawlerName); return true; } catch { return false; } }; /** * @param {{ createCrawler: import('../../../actions/create-crawler.js').createCrawler}} actions */ const makeCreateCrawlerStep = (actions) => async (context) => { if (await crawlerExists(actions, process.env.CRAWLER_NAME)) { log("Crawler already exists. Skipping creation."); } else { await actions.createCrawler( process.env.CRAWLER_NAME, process.env.ROLE_NAME, process.env.DATABASE_NAME, process.env.TABLE_PREFIX, process.env.S3_TARGET_PATH, ); log("Crawler created successfully.", { type: "success" }); } return { ...context }; }; /** * @param {(name: string) => Promise<import('@aws-sdk/client-glue').GetCrawlerCommandOutput>} getCrawler * @param {string} crawlerName */ const waitForCrawler = async (getCrawler, crawlerName) => { const waitTimeInSeconds = 30; const { Crawler } = await getCrawler(crawlerName); if (!Crawler) { throw new Error(`Crawler with name ${crawlerName} not found.`); } if (Crawler.State === "READY") { return; } log(`Crawler is ${Crawler.State}. Waiting ${waitTimeInSeconds} seconds...`); await wait(waitTimeInSeconds); return waitForCrawler(getCrawler, crawlerName); }; const makeStartCrawlerStep = ({ startCrawler, getCrawler }) => async (context) => { log("Starting crawler."); await startCrawler(process.env.CRAWLER_NAME); log("Crawler started.", { type: "success" }); log("Waiting for crawler to finish running. This can take a while."); await waitForCrawler(getCrawler, process.env.CRAWLER_NAME); log("Crawler ready.", { type: "success" }); return { ...context }; };

List information about databases and tables in your AWS Glue Data Catalog.

const getDatabase = (name) => { const client = new GlueClient({}); const command = new GetDatabaseCommand({ Name: name, }); return client.send(command); }; const getTables = (databaseName) => { const client = new GlueClient({}); const command = new GetTablesCommand({ DatabaseName: databaseName, }); return client.send(command); }; const makeGetDatabaseStep = ({ getDatabase }) => async (context) => { const { Database: { Name }, } = await getDatabase(process.env.DATABASE_NAME); log(`Database: ${Name}`); return { ...context }; }; /** * @param {{ getTables: () => Promise<import('@aws-sdk/client-glue').GetTablesCommandOutput}} config */ const makeGetTablesStep = ({ getTables }) => async (context) => { const { TableList } = await getTables(process.env.DATABASE_NAME); log("Tables:"); log(TableList.map((table) => ` • ${table.Name}\n`)); return { ...context }; };

Create and run a job that extracts CSV data from the source Amazon S3 bucket, transforms it by removing and renaming fields, and loads JSON-formatted output into another Amazon S3 bucket.

const createJob = (name, role, scriptBucketName, scriptKey) => { const client = new GlueClient({}); const command = new CreateJobCommand({ Name: name, Role: role, Command: { Name: "glueetl", PythonVersion: "3", ScriptLocation: `s3://${scriptBucketName}/${scriptKey}`, }, GlueVersion: "3.0", }); return client.send(command); }; const startJobRun = (jobName, dbName, tableName, bucketName) => { const client = new GlueClient({}); const command = new StartJobRunCommand({ JobName: jobName, Arguments: { "--input_database": dbName, "--input_table": tableName, "--output_bucket_url": `s3://${bucketName}/`, }, }); return client.send(command); }; const makeCreateJobStep = ({ createJob }) => async (context) => { log("Creating Job."); await createJob( process.env.JOB_NAME, process.env.ROLE_NAME, process.env.BUCKET_NAME, process.env.PYTHON_SCRIPT_KEY, ); log("Job created.", { type: "success" }); return { ...context }; }; /** * @param {(name: string, runId: string) => Promise<import('@aws-sdk/client-glue').GetJobRunCommandOutput> } getJobRun * @param {string} jobName * @param {string} jobRunId */ const waitForJobRun = async (getJobRun, jobName, jobRunId) => { const waitTimeInSeconds = 30; const { JobRun } = await getJobRun(jobName, jobRunId); if (!JobRun) { throw new Error(`Job run with id ${jobRunId} not found.`); } switch (JobRun.JobRunState) { case "FAILED": case "TIMEOUT": case "STOPPED": throw new Error( `Job ${JobRun.JobRunState}. Error: ${JobRun.ErrorMessage}`, ); case "RUNNING": break; case "SUCCEEDED": return; default: throw new Error(`Unknown job run state: ${JobRun.JobRunState}`); } log( `Job ${JobRun.JobRunState}. Waiting ${waitTimeInSeconds} more seconds...`, ); await wait(waitTimeInSeconds); return waitForJobRun(getJobRun, jobName, jobRunId); }; /** * @param {{ prompter: { prompt: () => Promise<{ shouldOpen: boolean }>} }} context */ const promptToOpen = async (context) => { const { shouldOpen } = await context.prompter.prompt({ name: "shouldOpen", type: "confirm", message: "Open the output bucket in your browser?", }); if (shouldOpen) { return open( `https://s3.console.aws.amazon.com/s3/buckets/${process.env.BUCKET_NAME} to view the output.`, ); } }; const makeStartJobRunStep = ({ startJobRun, getJobRun }) => async (context) => { log("Starting job."); const { JobRunId } = await startJobRun( process.env.JOB_NAME, process.env.DATABASE_NAME, process.env.TABLE_NAME, process.env.BUCKET_NAME, ); log("Job started.", { type: "success" }); log("Waiting for job to finish running. This can take a while."); await waitForJobRun(getJobRun, process.env.JOB_NAME, JobRunId); log("Job run succeeded.", { type: "success" }); await promptToOpen(context); return { ...context }; };

List information about job runs and view some of the transformed data.

const getJobRuns = (jobName) => { const client = new GlueClient({}); const command = new GetJobRunsCommand({ JobName: jobName, }); return client.send(command); }; const getJobRun = (jobName, jobRunId) => { const client = new GlueClient({}); const command = new GetJobRunCommand({ JobName: jobName, RunId: jobRunId, }); return client.send(command); }; /** * @typedef {{ prompter: { prompt: () => Promise<{jobName: string}> } }} Context */ /** * @typedef {() => Promise<import('@aws-sdk/client-glue').GetJobRunCommandOutput>} getJobRun */ /** * @typedef {() => Promise<import('@aws-sdk/client-glue').GetJobRunsCommandOutput} getJobRuns */ /** * * @param {getJobRun} getJobRun * @param {string} jobName * @param {string} jobRunId */ const logJobRunDetails = async (getJobRun, jobName, jobRunId) => { const { JobRun } = await getJobRun(jobName, jobRunId); log(JobRun, { type: "object" }); }; /** * * @param {{getJobRuns: getJobRuns, getJobRun: getJobRun }} funcs */ const makePickJobRunStep = ({ getJobRuns, getJobRun }) => async (/** @type { Context } */ context) => { if (context.selectedJobName) { const { JobRuns } = await getJobRuns(context.selectedJobName); const { jobRunId } = await context.prompter.prompt({ name: "jobRunId", type: "list", message: "Select a job run to see details.", choices: JobRuns.map((run) => run.Id), }); logJobRunDetails(getJobRun, context.selectedJobName, jobRunId); } return { ...context }; };

Delete all resources created by the demo.

const deleteJob = (jobName) => { const client = new GlueClient({}); const command = new DeleteJobCommand({ JobName: jobName, }); return client.send(command); }; const deleteTable = (databaseName, tableName) => { const client = new GlueClient({}); const command = new DeleteTableCommand({ DatabaseName: databaseName, Name: tableName, }); return client.send(command); }; const deleteDatabase = (databaseName) => { const client = new GlueClient({}); const command = new DeleteDatabaseCommand({ Name: databaseName, }); return client.send(command); }; const deleteCrawler = (crawlerName) => { const client = new GlueClient({}); const command = new DeleteCrawlerCommand({ Name: crawlerName, }); return client.send(command); }; /** * * @param {import('../../../actions/delete-job.js').deleteJob} deleteJobFn * @param {string[]} jobNames * @param {{ prompter: { prompt: () => Promise<any> }}} context */ const handleDeleteJobs = async (deleteJobFn, jobNames, context) => { /** * @type {{ selectedJobNames: string[] }} */ const { selectedJobNames } = await context.prompter.prompt({ name: "selectedJobNames", type: "checkbox", message: "Let's clean up jobs. Select jobs to delete.", choices: jobNames, }); if (selectedJobNames.length === 0) { log("No jobs selected."); } else { log("Deleting jobs."); await Promise.all( selectedJobNames.map((n) => deleteJobFn(n).catch(console.error)), ); log("Jobs deleted.", { type: "success" }); } }; /** * @param {{ * listJobs: import('../../../actions/list-jobs.js').listJobs, * deleteJob: import('../../../actions/delete-job.js').deleteJob * }} config */ const makeCleanUpJobsStep = ({ listJobs, deleteJob }) => async (context) => { const { JobNames } = await listJobs(); if (JobNames.length > 0) { await handleDeleteJobs(deleteJob, JobNames, context); } return { ...context }; }; /** * @param {import('../../../actions/delete-table.js').deleteTable} deleteTable * @param {string} databaseName * @param {string[]} tableNames */ const deleteTables = (deleteTable, databaseName, tableNames) => Promise.all( tableNames.map((tableName) => deleteTable(databaseName, tableName).catch(console.error), ), ); /** * @param {{ * getTables: import('../../../actions/get-tables.js').getTables, * deleteTable: import('../../../actions/delete-table.js').deleteTable * }} config */ const makeCleanUpTablesStep = ({ getTables, deleteTable }) => /** * @param {{ prompter: { prompt: () => Promise<any>}}} context */ async (context) => { const { TableList } = await getTables(process.env.DATABASE_NAME).catch( () => ({ TableList: null }), ); if (TableList && TableList.length > 0) { /** * @type {{ tableNames: string[] }} */ const { tableNames } = await context.prompter.prompt({ name: "tableNames", type: "checkbox", message: "Let's clean up tables. Select tables to delete.", choices: TableList.map((t) => t.Name), }); if (tableNames.length === 0) { log("No tables selected."); } else { log("Deleting tables."); await deleteTables(deleteTable, process.env.DATABASE_NAME, tableNames); log("Tables deleted.", { type: "success" }); } } return { ...context }; }; /** * @param {import('../../../actions/delete-database.js').deleteDatabase} deleteDatabase * @param {string[]} databaseNames */ const deleteDatabases = (deleteDatabase, databaseNames) => Promise.all( databaseNames.map((dbName) => deleteDatabase(dbName).catch(console.error)), ); /** * @param {{ * getDatabases: import('../../../actions/get-databases.js').getDatabases * deleteDatabase: import('../../../actions/delete-database.js').deleteDatabase * }} config */ const makeCleanUpDatabasesStep = ({ getDatabases, deleteDatabase }) => /** * @param {{ prompter: { prompt: () => Promise<any>}} context */ async (context) => { const { DatabaseList } = await getDatabases(); if (DatabaseList.length > 0) { /** @type {{ dbNames: string[] }} */ const { dbNames } = await context.prompter.prompt({ name: "dbNames", type: "checkbox", message: "Let's clean up databases. Select databases to delete.", choices: DatabaseList.map((db) => db.Name), }); if (dbNames.length === 0) { log("No databases selected."); } else { log("Deleting databases."); await deleteDatabases(deleteDatabase, dbNames); log("Databases deleted.", { type: "success" }); } } return { ...context }; }; const cleanUpCrawlerStep = async (context) => { log(`Deleting crawler.`); try { await deleteCrawler(process.env.CRAWLER_NAME); log("Crawler deleted.", { type: "success" }); } catch (err) { if (err.name === "EntityNotFoundException") { log(`Crawler is already deleted.`); } else { throw err; } } return { ...context }; };
Kotlin
SDK for Kotlin
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

suspend fun main(args: Array<String>) { val usage = """ Usage: <iam> <s3Path> <cron> <dbName> <crawlerName> <jobName> <scriptLocation> <locationUri> Where: iam - The Amazon Resource Name (ARN) of the AWS Identity and Access Management (IAM) role that has AWS Glue and Amazon Simple Storage Service (Amazon S3) permissions. s3Path - The Amazon Simple Storage Service (Amazon S3) target that contains data (for example, CSV data). cron - A cron expression used to specify the schedule (for example, cron(15 12 * * ? *). dbName - The database name. crawlerName - The name of the crawler. jobName - The name you assign to this job definition. scriptLocation - Specifies the Amazon S3 path to a script that runs a job. locationUri - Specifies the location of the database """ if (args.size != 8) { println(usage) exitProcess(1) } val iam = args[0] val s3Path = args[1] val cron = args[2] val dbName = args[3] val crawlerName = args[4] val jobName = args[5] val scriptLocation = args[6] val locationUri = args[7] println("About to start the AWS Glue Scenario") createDatabase(dbName, locationUri) createCrawler(iam, s3Path, cron, dbName, crawlerName) getCrawler(crawlerName) startCrawler(crawlerName) getDatabase(dbName) getGlueTables(dbName) createJob(jobName, iam, scriptLocation) startJob(jobName) getJobs() getJobRuns(jobName) deleteJob(jobName) println("*** Wait for 5 MIN so the $crawlerName is ready to be deleted") TimeUnit.MINUTES.sleep(5) deleteMyDatabase(dbName) deleteCrawler(crawlerName) } suspend fun createDatabase( dbName: String?, locationUriVal: String?, ) { val input = DatabaseInput { description = "Built with the AWS SDK for Kotlin" name = dbName locationUri = locationUriVal } val request = CreateDatabaseRequest { databaseInput = input } GlueClient { region = "us-east-1" }.use { glueClient -> glueClient.createDatabase(request) println("The database was successfully created") } } suspend fun createCrawler( iam: String?, s3Path: String?, cron: String?, dbName: String?, crawlerName: String, ) { val s3Target = S3Target { path = s3Path } val targetList = ArrayList<S3Target>() targetList.add(s3Target) val targetOb = CrawlerTargets { s3Targets = targetList } val crawlerRequest = CreateCrawlerRequest { databaseName = dbName name = crawlerName description = "Created by the AWS Glue Java API" targets = targetOb role = iam schedule = cron } GlueClient { region = "us-east-1" }.use { glueClient -> glueClient.createCrawler(crawlerRequest) println("$crawlerName was successfully created") } } suspend fun getCrawler(crawlerName: String?) { val request = GetCrawlerRequest { name = crawlerName } GlueClient { region = "us-east-1" }.use { glueClient -> val response = glueClient.getCrawler(request) val role = response.crawler?.role println("The role associated with this crawler is $role") } } suspend fun startCrawler(crawlerName: String) { val crawlerRequest = StartCrawlerRequest { name = crawlerName } GlueClient { region = "us-east-1" }.use { glueClient -> glueClient.startCrawler(crawlerRequest) println("$crawlerName was successfully started.") } } suspend fun getDatabase(databaseName: String?) { val request = GetDatabaseRequest { name = databaseName } GlueClient { region = "us-east-1" }.use { glueClient -> val response = glueClient.getDatabase(request) val dbDesc = response.database?.description println("The database description is $dbDesc") } } suspend fun getGlueTables(dbName: String?) { val tableRequest = GetTablesRequest { databaseName = dbName } GlueClient { region = "us-east-1" }.use { glueClient -> val response = glueClient.getTables(tableRequest) response.tableList?.forEach { tableName -> println("Table name is ${tableName.name}") } } } suspend fun startJob(jobNameVal: String?) { val runRequest = StartJobRunRequest { workerType = WorkerType.G1X numberOfWorkers = 10 jobName = jobNameVal } GlueClient { region = "us-east-1" }.use { glueClient -> val response = glueClient.startJobRun(runRequest) println("The job run Id is ${response.jobRunId}") } } suspend fun createJob( jobName: String, iam: String?, scriptLocationVal: String?, ) { val commandOb = JobCommand { pythonVersion = "3" name = "MyJob1" scriptLocation = scriptLocationVal } val jobRequest = CreateJobRequest { description = "A Job created by using the AWS SDK for Java V2" glueVersion = "2.0" workerType = WorkerType.G1X numberOfWorkers = 10 name = jobName role = iam command = commandOb } GlueClient { region = "us-east-1" }.use { glueClient -> glueClient.createJob(jobRequest) println("$jobName was successfully created.") } } suspend fun getJobs() { val request = GetJobsRequest { maxResults = 10 } GlueClient { region = "us-east-1" }.use { glueClient -> val response = glueClient.getJobs(request) response.jobs?.forEach { job -> println("Job name is ${job.name}") } } } suspend fun getJobRuns(jobNameVal: String?) { val request = GetJobRunsRequest { jobName = jobNameVal } GlueClient { region = "us-east-1" }.use { glueClient -> val response = glueClient.getJobRuns(request) response.jobRuns?.forEach { job -> println("Job name is ${job.jobName}") } } } suspend fun deleteJob(jobNameVal: String) { val jobRequest = DeleteJobRequest { jobName = jobNameVal } GlueClient { region = "us-east-1" }.use { glueClient -> glueClient.deleteJob(jobRequest) println("$jobNameVal was successfully deleted") } } suspend fun deleteMyDatabase(databaseName: String) { val request = DeleteDatabaseRequest { name = databaseName } GlueClient { region = "us-east-1" }.use { glueClient -> glueClient.deleteDatabase(request) println("$databaseName was successfully deleted") } } suspend fun deleteCrawler(crawlerName: String) { val request = DeleteCrawlerRequest { name = crawlerName } GlueClient { region = "us-east-1" }.use { glueClient -> glueClient.deleteCrawler(request) println("$crawlerName was deleted") } }
PHP
SDK for PHP
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

namespace Glue; use Aws\Glue\GlueClient; use Aws\S3\S3Client; use AwsUtilities\AWSServiceClass; use GuzzleHttp\Psr7\Stream; use Iam\IAMService; class GettingStartedWithGlue { public function run() { echo("\n"); echo("--------------------------------------\n"); print("Welcome to the AWS Glue getting started demo using PHP!\n"); echo("--------------------------------------\n"); $clientArgs = [ 'region' => 'us-west-2', 'version' => 'latest', 'profile' => 'default', ]; $uniqid = uniqid(); $glueClient = new GlueClient($clientArgs); $glueService = new GlueService($glueClient); $iamService = new IAMService(); $crawlerName = "example-crawler-test-" . $uniqid; AWSServiceClass::$waitTime = 5; AWSServiceClass::$maxWaitAttempts = 20; $role = $iamService->getRole("AWSGlueServiceRole-DocExample"); $databaseName = "doc-example-database-$uniqid"; $path = 's3://crawler-public-us-east-1/flight/2016/csv'; $glueService->createCrawler($crawlerName, $role['Role']['Arn'], $databaseName, $path); $glueService->startCrawler($crawlerName); echo "Waiting for crawler"; do { $crawler = $glueService->getCrawler($crawlerName); echo "."; sleep(10); } while ($crawler['Crawler']['State'] != "READY"); echo "\n"; $database = $glueService->getDatabase($databaseName); echo "Found a database named " . $database['Database']['Name'] . "\n"; //Upload job script $s3client = new S3Client($clientArgs); $bucketName = "test-glue-bucket-" . $uniqid; $s3client->createBucket([ 'Bucket' => $bucketName, 'CreateBucketConfiguration' => ['LocationConstraint' => 'us-west-2'], ]); $s3client->putObject([ 'Bucket' => $bucketName, 'Key' => 'run_job.py', 'SourceFile' => __DIR__ . '/flight_etl_job_script.py' ]); $s3client->putObject([ 'Bucket' => $bucketName, 'Key' => 'setup_scenario_getting_started.yaml', 'SourceFile' => __DIR__ . '/setup_scenario_getting_started.yaml' ]); $tables = $glueService->getTables($databaseName); $jobName = 'test-job-' . $uniqid; $scriptLocation = "s3://$bucketName/run_job.py"; $job = $glueService->createJob($jobName, $role['Role']['Arn'], $scriptLocation); $outputBucketUrl = "s3://$bucketName"; $runId = $glueService->startJobRun($jobName, $databaseName, $tables, $outputBucketUrl)['JobRunId']; echo "waiting for job"; do { $jobRun = $glueService->getJobRun($jobName, $runId); echo "."; sleep(10); } while (!array_intersect([$jobRun['JobRun']['JobRunState']], ['SUCCEEDED', 'STOPPED', 'FAILED', 'TIMEOUT'])); echo "\n"; $jobRuns = $glueService->getJobRuns($jobName); $objects = $s3client->listObjects([ 'Bucket' => $bucketName, ])['Contents']; foreach ($objects as $object) { echo $object['Key'] . "\n"; } echo "Downloading " . $objects[1]['Key'] . "\n"; /** @var Stream $downloadObject */ $downloadObject = $s3client->getObject([ 'Bucket' => $bucketName, 'Key' => $objects[1]['Key'], ])['Body']->getContents(); echo "Here is the first 1000 characters in the object."; echo substr($downloadObject, 0, 1000); $jobs = $glueService->listJobs(); echo "Current jobs:\n"; foreach ($jobs['JobNames'] as $jobsName) { echo "{$jobsName}\n"; } echo "Delete the job.\n"; $glueClient->deleteJob([ 'JobName' => $job['Name'], ]); echo "Delete the tables.\n"; foreach ($tables['TableList'] as $table) { $glueService->deleteTable($table['Name'], $databaseName); } echo "Delete the databases.\n"; $glueClient->deleteDatabase([ 'Name' => $databaseName, ]); echo "Delete the crawler.\n"; $glueClient->deleteCrawler([ 'Name' => $crawlerName, ]); $deleteObjects = $s3client->listObjectsV2([ 'Bucket' => $bucketName, ]); echo "Delete all objects in the bucket.\n"; $deleteObjects = $s3client->deleteObjects([ 'Bucket' => $bucketName, 'Delete' => [ 'Objects' => $deleteObjects['Contents'], ] ]); echo "Delete the bucket.\n"; $s3client->deleteBucket(['Bucket' => $bucketName]); echo "This job was brought to you by the number $uniqid\n"; } } namespace Glue; use Aws\Glue\GlueClient; use Aws\Result; use function PHPUnit\Framework\isEmpty; class GlueService extends \AwsUtilities\AWSServiceClass { protected GlueClient $glueClient; public function __construct($glueClient) { $this->glueClient = $glueClient; } public function getCrawler($crawlerName) { return $this->customWaiter(function () use ($crawlerName) { return $this->glueClient->getCrawler([ 'Name' => $crawlerName, ]); }); } public function createCrawler($crawlerName, $role, $databaseName, $path): Result { return $this->customWaiter(function () use ($crawlerName, $role, $databaseName, $path) { return $this->glueClient->createCrawler([ 'Name' => $crawlerName, 'Role' => $role, 'DatabaseName' => $databaseName, 'Targets' => [ 'S3Targets' => [[ 'Path' => $path, ]] ], ]); }); } public function startCrawler($crawlerName): Result { return $this->glueClient->startCrawler([ 'Name' => $crawlerName, ]); } public function getDatabase(string $databaseName): Result { return $this->customWaiter(function () use ($databaseName) { return $this->glueClient->getDatabase([ 'Name' => $databaseName, ]); }); } public function getTables($databaseName): Result { return $this->glueClient->getTables([ 'DatabaseName' => $databaseName, ]); } public function createJob($jobName, $role, $scriptLocation, $pythonVersion = '3', $glueVersion = '3.0'): Result { return $this->glueClient->createJob([ 'Name' => $jobName, 'Role' => $role, 'Command' => [ 'Name' => 'glueetl', 'ScriptLocation' => $scriptLocation, 'PythonVersion' => $pythonVersion, ], 'GlueVersion' => $glueVersion, ]); } public function startJobRun($jobName, $databaseName, $tables, $outputBucketUrl): Result { return $this->glueClient->startJobRun([ 'JobName' => $jobName, 'Arguments' => [ 'input_database' => $databaseName, 'input_table' => $tables['TableList'][0]['Name'], 'output_bucket_url' => $outputBucketUrl, '--input_database' => $databaseName, '--input_table' => $tables['TableList'][0]['Name'], '--output_bucket_url' => $outputBucketUrl, ], ]); } public function listJobs($maxResults = null, $nextToken = null, $tags = []): Result { $arguments = []; if ($maxResults) { $arguments['MaxResults'] = $maxResults; } if ($nextToken) { $arguments['NextToken'] = $nextToken; } if (!empty($tags)) { $arguments['Tags'] = $tags; } return $this->glueClient->listJobs($arguments); } public function getJobRuns($jobName, $maxResults = 0, $nextToken = ''): Result { $arguments = ['JobName' => $jobName]; if ($maxResults) { $arguments['MaxResults'] = $maxResults; } if ($nextToken) { $arguments['NextToken'] = $nextToken; } return $this->glueClient->getJobRuns($arguments); } public function getJobRun($jobName, $runId, $predecessorsIncluded = false): Result { return $this->glueClient->getJobRun([ 'JobName' => $jobName, 'RunId' => $runId, 'PredecessorsIncluded' => $predecessorsIncluded, ]); } public function deleteJob($jobName) { return $this->glueClient->deleteJob([ 'JobName' => $jobName, ]); } public function deleteTable($tableName, $databaseName) { return $this->glueClient->deleteTable([ 'DatabaseName' => $databaseName, 'Name' => $tableName, ]); } public function deleteDatabase($databaseName) { return $this->glueClient->deleteDatabase([ 'Name' => $databaseName, ]); } public function deleteCrawler($crawlerName) { return $this->glueClient->deleteCrawler([ 'Name' => $crawlerName, ]); } }
Python
SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

Create a class that wraps AWS Glue functions used in the scenario.

class GlueWrapper: """Encapsulates AWS Glue actions.""" def __init__(self, glue_client): """ :param glue_client: A Boto3 Glue client. """ self.glue_client = glue_client def get_crawler(self, name): """ Gets information about a crawler. :param name: The name of the crawler to look up. :return: Data about the crawler. """ crawler = None try: response = self.glue_client.get_crawler(Name=name) crawler = response["Crawler"] except ClientError as err: if err.response["Error"]["Code"] == "EntityNotFoundException": logger.info("Crawler %s doesn't exist.", name) else: logger.error( "Couldn't get crawler %s. Here's why: %s: %s", name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise return crawler def create_crawler(self, name, role_arn, db_name, db_prefix, s3_target): """ Creates a crawler that can crawl the specified target and populate a database in your AWS Glue Data Catalog with metadata that describes the data in the target. :param name: The name of the crawler. :param role_arn: The Amazon Resource Name (ARN) of an AWS Identity and Access Management (IAM) role that grants permission to let AWS Glue access the resources it needs. :param db_name: The name to give the database that is created by the crawler. :param db_prefix: The prefix to give any database tables that are created by the crawler. :param s3_target: The URL to an S3 bucket that contains data that is the target of the crawler. """ try: self.glue_client.create_crawler( Name=name, Role=role_arn, DatabaseName=db_name, TablePrefix=db_prefix, Targets={"S3Targets": [{"Path": s3_target}]}, ) except ClientError as err: logger.error( "Couldn't create crawler. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def start_crawler(self, name): """ Starts a crawler. The crawler crawls its configured target and creates metadata that describes the data it finds in the target data source. :param name: The name of the crawler to start. """ try: self.glue_client.start_crawler(Name=name) except ClientError as err: logger.error( "Couldn't start crawler %s. Here's why: %s: %s", name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def get_database(self, name): """ Gets information about a database in your Data Catalog. :param name: The name of the database to look up. :return: Information about the database. """ try: response = self.glue_client.get_database(Name=name) except ClientError as err: logger.error( "Couldn't get database %s. Here's why: %s: %s", name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return response["Database"] def get_tables(self, db_name): """ Gets a list of tables in a Data Catalog database. :param db_name: The name of the database to query. :return: The list of tables in the database. """ try: response = self.glue_client.get_tables(DatabaseName=db_name) except ClientError as err: logger.error( "Couldn't get tables %s. Here's why: %s: %s", db_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return response["TableList"] def create_job(self, name, description, role_arn, script_location): """ Creates a job definition for an extract, transform, and load (ETL) job that can be run by AWS Glue. :param name: The name of the job definition. :param description: The description of the job definition. :param role_arn: The ARN of an IAM role that grants AWS Glue the permissions it requires to run the job. :param script_location: The Amazon S3 URL of a Python ETL script that is run as part of the job. The script defines how the data is transformed. """ try: self.glue_client.create_job( Name=name, Description=description, Role=role_arn, Command={ "Name": "glueetl", "ScriptLocation": script_location, "PythonVersion": "3", }, GlueVersion="3.0", ) except ClientError as err: logger.error( "Couldn't create job %s. Here's why: %s: %s", name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def start_job_run(self, name, input_database, input_table, output_bucket_name): """ Starts a job run. A job run extracts data from the source, transforms it, and loads it to the output bucket. :param name: The name of the job definition. :param input_database: The name of the metadata database that contains tables that describe the source data. This is typically created by a crawler. :param input_table: The name of the table in the metadata database that describes the source data. :param output_bucket_name: The S3 bucket where the output is written. :return: The ID of the job run. """ try: # The custom Arguments that are passed to this function are used by the # Python ETL script to determine the location of input and output data. response = self.glue_client.start_job_run( JobName=name, Arguments={ "--input_database": input_database, "--input_table": input_table, "--output_bucket_url": f"s3://{output_bucket_name}/", }, ) except ClientError as err: logger.error( "Couldn't start job run %s. Here's why: %s: %s", name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return response["JobRunId"] def list_jobs(self): """ Lists the names of job definitions in your account. :return: The list of job definition names. """ try: response = self.glue_client.list_jobs() except ClientError as err: logger.error( "Couldn't list jobs. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return response["JobNames"] def get_job_runs(self, job_name): """ Gets information about runs that have been performed for a specific job definition. :param job_name: The name of the job definition to look up. :return: The list of job runs. """ try: response = self.glue_client.get_job_runs(JobName=job_name) except ClientError as err: logger.error( "Couldn't get job runs for %s. Here's why: %s: %s", job_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return response["JobRuns"] def get_job_run(self, name, run_id): """ Gets information about a single job run. :param name: The name of the job definition for the run. :param run_id: The ID of the run. :return: Information about the run. """ try: response = self.glue_client.get_job_run(JobName=name, RunId=run_id) except ClientError as err: logger.error( "Couldn't get job run %s/%s. Here's why: %s: %s", name, run_id, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return response["JobRun"] def delete_job(self, job_name): """ Deletes a job definition. This also deletes data about all runs that are associated with this job definition. :param job_name: The name of the job definition to delete. """ try: self.glue_client.delete_job(JobName=job_name) except ClientError as err: logger.error( "Couldn't delete job %s. Here's why: %s: %s", job_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def delete_table(self, db_name, table_name): """ Deletes a table from a metadata database. :param db_name: The name of the database that contains the table. :param table_name: The name of the table to delete. """ try: self.glue_client.delete_table(DatabaseName=db_name, Name=table_name) except ClientError as err: logger.error( "Couldn't delete table %s. Here's why: %s: %s", table_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def delete_database(self, name): """ Deletes a metadata database from your Data Catalog. :param name: The name of the database to delete. """ try: self.glue_client.delete_database(Name=name) except ClientError as err: logger.error( "Couldn't delete database %s. Here's why: %s: %s", name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def delete_crawler(self, name): """ Deletes a crawler. :param name: The name of the crawler to delete. """ try: self.glue_client.delete_crawler(Name=name) except ClientError as err: logger.error( "Couldn't delete crawler %s. Here's why: %s: %s", name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise

Create a class that runs the scenario.

class GlueCrawlerJobScenario: """ Encapsulates a scenario that shows how to create an AWS Glue crawler and job and use them to transform data from CSV to JSON format. """ def __init__(self, glue_client, glue_service_role, glue_bucket): """ :param glue_client: A Boto3 AWS Glue client. :param glue_service_role: An AWS Identity and Access Management (IAM) role that AWS Glue can assume to gain access to the resources it requires. :param glue_bucket: An S3 bucket that can hold a job script and output data from AWS Glue job runs. """ self.glue_client = glue_client self.glue_service_role = glue_service_role self.glue_bucket = glue_bucket @staticmethod def wait(seconds, tick=12): """ Waits for a specified number of seconds, while also displaying an animated spinner. :param seconds: The number of seconds to wait. :param tick: The number of frames per second used to animate the spinner. """ progress = "|/-\\" waited = 0 while waited < seconds: for frame in range(tick): sys.stdout.write(f"\r{progress[frame % len(progress)]}") sys.stdout.flush() time.sleep(1 / tick) waited += 1 def upload_job_script(self, job_script): """ Uploads a Python ETL script to an S3 bucket. The script is used by the AWS Glue job to transform data. :param job_script: The relative path to the job script. """ try: self.glue_bucket.upload_file(Filename=job_script, Key=job_script) print(f"Uploaded job script '{job_script}' to the example bucket.") except S3UploadFailedError as err: logger.error("Couldn't upload job script. Here's why: %s", err) raise def run(self, crawler_name, db_name, db_prefix, data_source, job_script, job_name): """ Runs the scenario. This is an interactive experience that runs at a command prompt and asks you for input throughout. :param crawler_name: The name of the crawler used in the scenario. If the crawler does not exist, it is created. :param db_name: The name to give the metadata database created by the crawler. :param db_prefix: The prefix to give tables added to the database by the crawler. :param data_source: The location of the data source that is targeted by the crawler and extracted during job runs. :param job_script: The job script that is used to transform data during job runs. :param job_name: The name to give the job definition that is created during the scenario. """ wrapper = GlueWrapper(self.glue_client) print(f"Checking for crawler {crawler_name}.") crawler = wrapper.get_crawler(crawler_name) if crawler is None: print(f"Creating crawler {crawler_name}.") wrapper.create_crawler( crawler_name, self.glue_service_role.arn, db_name, db_prefix, data_source, ) print(f"Created crawler {crawler_name}.") crawler = wrapper.get_crawler(crawler_name) pprint(crawler) print("-" * 88) print( f"When you run the crawler, it crawls data stored in {data_source} and " f"creates a metadata database in the AWS Glue Data Catalog that describes " f"the data in the data source." ) print("In this example, the source data is in CSV format.") ready = False while not ready: ready = Question.ask_question( "Ready to start the crawler? (y/n) ", Question.is_yesno ) wrapper.start_crawler(crawler_name) print("Let's wait for the crawler to run. This typically takes a few minutes.") crawler_state = None while crawler_state != "READY": self.wait(10) crawler = wrapper.get_crawler(crawler_name) crawler_state = crawler["State"] print(f"Crawler is {crawler['State']}.") print("-" * 88) database = wrapper.get_database(db_name) print(f"The crawler created database {db_name}:") pprint(database) print(f"The database contains these tables:") tables = wrapper.get_tables(db_name) for index, table in enumerate(tables): print(f"\t{index + 1}. {table['Name']}") table_index = Question.ask_question( f"Enter the number of a table to see more detail: ", Question.is_int, Question.in_range(1, len(tables)), ) pprint(tables[table_index - 1]) print("-" * 88) print(f"Creating job definition {job_name}.") wrapper.create_job( job_name, "Getting started example job.", self.glue_service_role.arn, f"s3://{self.glue_bucket.name}/{job_script}", ) print("Created job definition.") print( f"When you run the job, it extracts data from {data_source}, transforms it " f"by using the {job_script} script, and loads the output into " f"S3 bucket {self.glue_bucket.name}." ) print( "In this example, the data is transformed from CSV to JSON, and only a few " "fields are included in the output." ) job_run_status = None if Question.ask_question(f"Ready to run? (y/n) ", Question.is_yesno): job_run_id = wrapper.start_job_run( job_name, db_name, tables[0]["Name"], self.glue_bucket.name ) print(f"Job {job_name} started. Let's wait for it to run.") while job_run_status not in ["SUCCEEDED", "STOPPED", "FAILED", "TIMEOUT"]: self.wait(10) job_run = wrapper.get_job_run(job_name, job_run_id) job_run_status = job_run["JobRunState"] print(f"Job {job_name}/{job_run_id} is {job_run_status}.") print("-" * 88) if job_run_status == "SUCCEEDED": print( f"Data from your job run is stored in your S3 bucket '{self.glue_bucket.name}':" ) try: keys = [ obj.key for obj in self.glue_bucket.objects.filter(Prefix="run-") ] for index, key in enumerate(keys): print(f"\t{index + 1}: {key}") lines = 4 key_index = Question.ask_question( f"Enter the number of a block to download it and see the first {lines} " f"lines of JSON output in the block: ", Question.is_int, Question.in_range(1, len(keys)), ) job_data = io.BytesIO() self.glue_bucket.download_fileobj(keys[key_index - 1], job_data) job_data.seek(0) for _ in range(lines): print(job_data.readline().decode("utf-8")) except ClientError as err: logger.error( "Couldn't get job run data. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise print("-" * 88) job_names = wrapper.list_jobs() if job_names: print(f"Your account has {len(job_names)} jobs defined:") for index, job_name in enumerate(job_names): print(f"\t{index + 1}. {job_name}") job_index = Question.ask_question( f"Enter a number between 1 and {len(job_names)} to see the list of runs for " f"a job: ", Question.is_int, Question.in_range(1, len(job_names)), ) job_runs = wrapper.get_job_runs(job_names[job_index - 1]) if job_runs: print(f"Found {len(job_runs)} runs for job {job_names[job_index - 1]}:") for index, job_run in enumerate(job_runs): print( f"\t{index + 1}. {job_run['JobRunState']} on " f"{job_run['CompletedOn']:%Y-%m-%d %H:%M:%S}" ) run_index = Question.ask_question( f"Enter a number between 1 and {len(job_runs)} to see details for a run: ", Question.is_int, Question.in_range(1, len(job_runs)), ) pprint(job_runs[run_index - 1]) else: print(f"No runs found for job {job_names[job_index - 1]}") else: print("Your account doesn't have any jobs defined.") print("-" * 88) print( f"Let's clean up. During this example we created job definition '{job_name}'." ) if Question.ask_question( "Do you want to delete the definition and all runs? (y/n) ", Question.is_yesno, ): wrapper.delete_job(job_name) print(f"Job definition '{job_name}' deleted.") tables = wrapper.get_tables(db_name) print(f"We also created database '{db_name}' that contains these tables:") for table in tables: print(f"\t{table['Name']}") if Question.ask_question( "Do you want to delete the tables and the database? (y/n) ", Question.is_yesno, ): for table in tables: wrapper.delete_table(db_name, table["Name"]) print(f"Deleted table {table['Name']}.") wrapper.delete_database(db_name) print(f"Deleted database {db_name}.") print(f"We also created crawler '{crawler_name}'.") if Question.ask_question( "Do you want to delete the crawler? (y/n) ", Question.is_yesno ): wrapper.delete_crawler(crawler_name) print(f"Deleted crawler {crawler_name}.") print("-" * 88) def parse_args(args): """ Parse command line arguments. :param args: The command line arguments. :return: The parsed arguments. """ parser = argparse.ArgumentParser( description="Runs the AWS Glue getting started with crawlers and jobs scenario. " "Before you run this scenario, set up scaffold resources by running " "'python scaffold.py deploy'." ) parser.add_argument( "role_name", help="The name of an IAM role that AWS Glue can assume. This role must grant access " "to Amazon S3 and to the permissions granted by the AWSGlueServiceRole " "managed policy.", ) parser.add_argument( "bucket_name", help="The name of an S3 bucket that AWS Glue can access to get the job script and " "put job results.", ) parser.add_argument( "--job_script", default="flight_etl_job_script.py", help="The name of the job script file that is used in the scenario.", ) return parser.parse_args(args) def main(): args = parse_args(sys.argv[1:]) try: print("-" * 88) print( "Welcome to the AWS Glue getting started with crawlers and jobs scenario." ) print("-" * 88) scenario = GlueCrawlerJobScenario( boto3.client("glue"), boto3.resource("iam").Role(args.role_name), boto3.resource("s3").Bucket(args.bucket_name), ) scenario.upload_job_script(args.job_script) scenario.run( "doc-example-crawler", "doc-example-database", "doc-example-", "s3://crawler-public-us-east-1/flight/2016/csv", args.job_script, "doc-example-job", ) print("-" * 88) print( "To destroy scaffold resources, including the IAM role and S3 bucket " "used in this scenario, run 'python scaffold.py destroy'." ) print("\nThanks for watching!") print("-" * 88) except Exception: logging.exception("Something went wrong with the example.")

Create an ETL script that is used by AWS Glue to extract, transform, and load data during job runs.

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job """ These custom arguments must be passed as Arguments to the StartJobRun request. --input_database The name of a metadata database that is contained in your AWS Glue Data Catalog and that contains tables that describe the data to be processed. --input_table The name of a table in the database that describes the data to be processed. --output_bucket_url An S3 bucket that receives the transformed output data. """ args = getResolvedOptions( sys.argv, ["JOB_NAME", "input_database", "input_table", "output_bucket_url"] ) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args["JOB_NAME"], args) # Script generated for node S3 Flight Data. S3FlightData_node1 = glueContext.create_dynamic_frame.from_catalog( database=args["input_database"], table_name=args["input_table"], transformation_ctx="S3FlightData_node1", ) # This mapping performs two main functions: # 1. It simplifies the output by removing most of the fields from the data. # 2. It renames some fields. For example, `fl_date` is renamed to `flight_date`. ApplyMapping_node2 = ApplyMapping.apply( frame=S3FlightData_node1, mappings=[ ("year", "long", "year", "long"), ("month", "long", "month", "tinyint"), ("day_of_month", "long", "day", "tinyint"), ("fl_date", "string", "flight_date", "string"), ("carrier", "string", "carrier", "string"), ("fl_num", "long", "flight_num", "long"), ("origin_city_name", "string", "origin_city_name", "string"), ("origin_state_abr", "string", "origin_state_abr", "string"), ("dest_city_name", "string", "dest_city_name", "string"), ("dest_state_abr", "string", "dest_state_abr", "string"), ("dep_time", "long", "departure_time", "long"), ("wheels_off", "long", "wheels_off", "long"), ("wheels_on", "long", "wheels_on", "long"), ("arr_time", "long", "arrival_time", "long"), ("mon", "string", "mon", "string"), ], transformation_ctx="ApplyMapping_node2", ) # Script generated for node Revised Flight Data. RevisedFlightData_node3 = glueContext.write_dynamic_frame.from_options( frame=ApplyMapping_node2, connection_type="s3", format="json", connection_options={"path": args["output_bucket_url"], "partitionKeys": []}, transformation_ctx="RevisedFlightData_node3", ) job.commit()
Ruby
SDK for Ruby
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

Create a class that wraps AWS Glue functions used in the scenario.

# The `GlueWrapper` class serves as a wrapper around the AWS Glue API, providing a simplified interface for common operations. # It encapsulates the functionality of the AWS SDK for Glue and provides methods for interacting with Glue crawlers, databases, tables, jobs, and S3 resources. # The class initializes with a Glue client and a logger, allowing it to make API calls and log any errors or informational messages. class GlueWrapper def initialize(glue_client, logger) @glue_client = glue_client @logger = logger end # Retrieves information about a specific crawler. # # @param name [String] The name of the crawler to retrieve information about. # @return [Aws::Glue::Types::Crawler, nil] The crawler object if found, or nil if not found. def get_crawler(name) @glue_client.get_crawler(name: name) rescue Aws::Glue::Errors::EntityNotFoundException @logger.info("Crawler #{name} doesn't exist.") false rescue Aws::Glue::Errors::GlueException => e @logger.error("Glue could not get crawler #{name}: \n#{e.message}") raise end # Creates a new crawler with the specified configuration. # # @param name [String] The name of the crawler. # @param role_arn [String] The ARN of the IAM role to be used by the crawler. # @param db_name [String] The name of the database where the crawler stores its metadata. # @param db_prefix [String] The prefix to be added to the names of tables that the crawler creates. # @param s3_target [String] The S3 path that the crawler will crawl. # @return [void] def create_crawler(name, role_arn, db_name, db_prefix, s3_target) @glue_client.create_crawler( name: name, role: role_arn, database_name: db_name, targets: { s3_targets: [ { path: s3_target } ] } ) rescue Aws::Glue::Errors::GlueException => e @logger.error("Glue could not create crawler: \n#{e.message}") raise end # Starts a crawler with the specified name. # # @param name [String] The name of the crawler to start. # @return [void] def start_crawler(name) @glue_client.start_crawler(name: name) rescue Aws::Glue::Errors::ServiceError => e @logger.error("Glue could not start crawler #{name}: \n#{e.message}") raise end # Deletes a crawler with the specified name. # # @param name [String] The name of the crawler to delete. # @return [void] def delete_crawler(name) @glue_client.delete_crawler(name: name) rescue Aws::Glue::Errors::ServiceError => e @logger.error("Glue could not delete crawler #{name}: \n#{e.message}") raise end # Retrieves information about a specific database. # # @param name [String] The name of the database to retrieve information about. # @return [Aws::Glue::Types::Database, nil] The database object if found, or nil if not found. def get_database(name) response = @glue_client.get_database(name: name) response.database rescue Aws::Glue::Errors::GlueException => e @logger.error("Glue could not get database #{name}: \n#{e.message}") raise end # Retrieves a list of tables in the specified database. # # @param db_name [String] The name of the database to retrieve tables from. # @return [Array<Aws::Glue::Types::Table>] def get_tables(db_name) response = @glue_client.get_tables(database_name: db_name) response.table_list rescue Aws::Glue::Errors::GlueException => e @logger.error("Glue could not get tables #{db_name}: \n#{e.message}") raise end # Creates a new job with the specified configuration. # # @param name [String] The name of the job. # @param description [String] The description of the job. # @param role_arn [String] The ARN of the IAM role to be used by the job. # @param script_location [String] The location of the ETL script for the job. # @return [void] def create_job(name, description, role_arn, script_location) @glue_client.create_job( name: name, description: description, role: role_arn, command: { name: "glueetl", script_location: script_location, python_version: "3" }, glue_version: "3.0" ) rescue Aws::Glue::Errors::GlueException => e @logger.error("Glue could not create job #{name}: \n#{e.message}") raise end # Starts a job run for the specified job. # # @param name [String] The name of the job to start the run for. # @param input_database [String] The name of the input database for the job. # @param input_table [String] The name of the input table for the job. # @param output_bucket_name [String] The name of the output S3 bucket for the job. # @return [String] The ID of the started job run. def start_job_run(name, input_database, input_table, output_bucket_name) response = @glue_client.start_job_run( job_name: name, arguments: { '--input_database': input_database, '--input_table': input_table, '--output_bucket_url': "s3://#{output_bucket_name}/" } ) response.job_run_id rescue Aws::Glue::Errors::GlueException => e @logger.error("Glue could not start job run #{name}: \n#{e.message}") raise end # Retrieves a list of jobs in AWS Glue. # # @return [Aws::Glue::Types::ListJobsResponse] def list_jobs @glue_client.list_jobs rescue Aws::Glue::Errors::GlueException => e @logger.error("Glue could not list jobs: \n#{e.message}") raise end # Retrieves a list of job runs for the specified job. # # @param job_name [String] The name of the job to retrieve job runs for. # @return [Array<Aws::Glue::Types::JobRun>] def get_job_runs(job_name) response = @glue_client.get_job_runs(job_name: job_name) response.job_runs rescue Aws::Glue::Errors::GlueException => e @logger.error("Glue could not get job runs: \n#{e.message}") end # Retrieves data for a specific job run. # # @param job_name [String] The name of the job run to retrieve data for. # @return [Glue::Types::GetJobRunResponse] def get_job_run(job_name, run_id) @glue_client.get_job_run(job_name: job_name, run_id: run_id) rescue Aws::Glue::Errors::GlueException => e @logger.error("Glue could not get job runs: \n#{e.message}") end # Deletes a job with the specified name. # # @param job_name [String] The name of the job to delete. # @return [void] def delete_job(job_name) @glue_client.delete_job(job_name: job_name) rescue Aws::Glue::Errors::ServiceError => e @logger.error("Glue could not delete job: \n#{e.message}") end # Deletes a table with the specified name. # # @param database_name [String] The name of the catalog database in which the table resides. # @param table_name [String] The name of the table to be deleted. # @return [void] def delete_table(database_name, table_name) @glue_client.delete_table(database_name: database_name, name: table_name) rescue Aws::Glue::Errors::ServiceError => e @logger.error("Glue could not delete job: \n#{e.message}") end # Removes a specified database from a Data Catalog. # # @param database_name [String] The name of the database to delete. # @return [void] def delete_database(database_name) @glue_client.delete_database(name: database_name) rescue Aws::Glue::Errors::ServiceError => e @logger.error("Glue could not delete database: \n#{e.message}") end # Uploads a job script file to an S3 bucket. # # @param file_path [String] The local path of the job script file. # @param bucket_resource [Aws::S3::Bucket] The S3 bucket resource to upload the file to. # @return [void] def upload_job_script(file_path, bucket_resource) File.open(file_path) do |file| bucket_resource.client.put_object({ body: file, bucket: bucket_resource.name, key: file_path }) end rescue Aws::S3::Errors::S3UploadFailedError => e @logger.error("S3 could not upload job script: \n#{e.message}") raise end end

Create a class that runs the scenario.

class GlueCrawlerJobScenario def initialize(glue_client, glue_service_role, glue_bucket, logger) @glue_client = glue_client @glue_service_role = glue_service_role @glue_bucket = glue_bucket @logger = logger end def run(crawler_name, db_name, db_prefix, data_source, job_script, job_name) wrapper = GlueWrapper.new(@glue_client, @logger) new_step(1, "Create a crawler") puts "Checking for crawler #{crawler_name}." crawler = wrapper.get_crawler(crawler_name) if crawler == false puts "Creating crawler #{crawler_name}." wrapper.create_crawler(crawler_name, @glue_service_role.arn, db_name, db_prefix, data_source) puts "Successfully created #{crawler_name}:" crawler = wrapper.get_crawler(crawler_name) puts JSON.pretty_generate(crawler).yellow end print "\nDone!\n".green new_step(2, "Run a crawler to output a database.") puts "Location of input data analyzed by crawler: #{data_source}" puts "Outputs: a Data Catalog database in CSV format containing metadata on input." wrapper.start_crawler(crawler_name) puts "Starting crawler... (this typically takes a few minutes)" crawler_state = nil while crawler_state != "READY" custom_wait(15) crawler = wrapper.get_crawler(crawler_name) crawler_state = crawler[0]["state"] print "Status check: #{crawler_state}.".yellow end print "\nDone!\n".green new_step(3, "Query the database.") database = wrapper.get_database(db_name) puts "The crawler created database #{db_name}:" print "#{database}".yellow puts "\nThe database contains these tables:" tables = wrapper.get_tables(db_name) tables.each_with_index do |table, index| print "\t#{index + 1}. #{table['name']}".yellow end print "\nDone!\n".green new_step(4, "Create a job definition that runs an ETL script.") puts "Uploading Python ETL script to S3..." wrapper.upload_job_script(job_script, @glue_bucket) puts "Creating job definition #{job_name}:\n" response = wrapper.create_job(job_name, "Getting started example job.", @glue_service_role.arn, "s3://#{@glue_bucket.name}/#{job_script}") puts JSON.pretty_generate(response).yellow print "\nDone!\n".green new_step(5, "Start a new job") job_run_status = nil job_run_id = wrapper.start_job_run( job_name, db_name, tables[0]["name"], @glue_bucket.name ) puts "Job #{job_name} started. Let's wait for it to run." until ["SUCCEEDED", "STOPPED", "FAILED", "TIMEOUT"].include?(job_run_status) custom_wait(10) job_run = wrapper.get_job_runs(job_name) job_run_status = job_run[0]["job_run_state"] print "Status check: #{job_name}/#{job_run_id} - #{job_run_status}.".yellow end print "\nDone!\n".green new_step(6, "View results from a successful job run.") if job_run_status == "SUCCEEDED" puts "Data from your job run is stored in your S3 bucket '#{@glue_bucket.name}'. Files include:" begin # Print the key name of each object in the bucket. @glue_bucket.objects.each do |object_summary| if object_summary.key.include?("run-") print "#{object_summary.key}".yellow end end # Print the first 256 bytes of a run file desired_sample_objects = 1 @glue_bucket.objects.each do |object_summary| if object_summary.key.include?("run-") if desired_sample_objects > 0 sample_object = @glue_bucket.object(object_summary.key) sample = sample_object.get(range: "bytes=0-255").body.read puts "\nSample run file contents:" print "#{sample}".yellow desired_sample_objects -= 1 end end end rescue Aws::S3::Errors::ServiceError => e logger.error( "Couldn't get job run data. Here's why: %s: %s", e.response.error.code, e.response.error.message ) raise end end print "\nDone!\n".green new_step(7, "Delete job definition and crawler.") wrapper.delete_job(job_name) puts "Job deleted: #{job_name}." wrapper.delete_crawler(crawler_name) puts "Crawler deleted: #{crawler_name}." wrapper.delete_table(db_name, tables[0]["name"]) puts "Table deleted: #{tables[0]["name"]} in #{db_name}." wrapper.delete_database(db_name) puts "Database deleted: #{db_name}." print "\nDone!\n".green end end def main banner("../../helpers/banner.txt") puts "######################################################################################################".yellow puts "# #".yellow puts "# EXAMPLE CODE DEMO: #".yellow puts "# AWS Glue #".yellow puts "# #".yellow puts "######################################################################################################".yellow puts "" puts "You have launched a demo of AWS Glue using the AWS for Ruby v3 SDK. Over the next 60 seconds, it will" puts "do the following:" puts " 1. Create a crawler." puts " 2. Run a crawler to output a database." puts " 3. Query the database." puts " 4. Create a job definition that runs an ETL script." puts " 5. Start a new job." puts " 6. View results from a successful job run." puts " 7. Delete job definition and crawler." puts "" confirm_begin billing security puts "\e[H\e[2J" # Set input file names job_script_filepath = "job_script.py" resource_names = YAML.load_file("resource_names.yaml") # Instantiate existing IAM role. iam = Aws::IAM::Resource.new(region: "us-east-1") iam_role_name = resource_names["glue_service_role"] iam_role = iam.role(iam_role_name) # Instantiate existing S3 bucket. s3 = Aws::S3::Resource.new(region: "us-east-1") s3_bucket_name = resource_names["glue_bucket"] s3_bucket = s3.bucket(s3_bucket_name) scenario = GlueCrawlerJobScenario.new( Aws::Glue::Client.new(region: "us-east-1"), iam_role, s3_bucket, @logger ) random_int = rand(10 ** 4) scenario.run( "doc-example-crawler-#{random_int}", "doc-example-database-#{random_int}", "doc-example-#{random_int}-", "s3://crawler-public-us-east-1/flight/2016/csv", job_script_filepath, "doc-example-job-#{random_int}" ) puts "-" * 88 puts "You have reached the end of this tour of AWS Glue." puts "To destroy CDK-created resources, run:\n cdk destroy" puts "-" * 88 end

Create an ETL script that is used by AWS Glue to extract, transform, and load data during job runs.

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job """ These custom arguments must be passed as Arguments to the StartJobRun request. --input_database The name of a metadata database that is contained in your AWS Glue Data Catalog and that contains tables that describe the data to be processed. --input_table The name of a table in the database that describes the data to be processed. --output_bucket_url An S3 bucket that receives the transformed output data. """ args = getResolvedOptions( sys.argv, ["JOB_NAME", "input_database", "input_table", "output_bucket_url"] ) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args["JOB_NAME"], args) # Script generated for node S3 Flight Data. S3FlightData_node1 = glueContext.create_dynamic_frame.from_catalog( database=args["input_database"], table_name=args["input_table"], transformation_ctx="S3FlightData_node1", ) # This mapping performs two main functions: # 1. It simplifies the output by removing most of the fields from the data. # 2. It renames some fields. For example, `fl_date` is renamed to `flight_date`. ApplyMapping_node2 = ApplyMapping.apply( frame=S3FlightData_node1, mappings=[ ("year", "long", "year", "long"), ("month", "long", "month", "tinyint"), ("day_of_month", "long", "day", "tinyint"), ("fl_date", "string", "flight_date", "string"), ("carrier", "string", "carrier", "string"), ("fl_num", "long", "flight_num", "long"), ("origin_city_name", "string", "origin_city_name", "string"), ("origin_state_abr", "string", "origin_state_abr", "string"), ("dest_city_name", "string", "dest_city_name", "string"), ("dest_state_abr", "string", "dest_state_abr", "string"), ("dep_time", "long", "departure_time", "long"), ("wheels_off", "long", "wheels_off", "long"), ("wheels_on", "long", "wheels_on", "long"), ("arr_time", "long", "arrival_time", "long"), ("mon", "string", "mon", "string"), ], transformation_ctx="ApplyMapping_node2", ) # Script generated for node Revised Flight Data. RevisedFlightData_node3 = glueContext.write_dynamic_frame.from_options( frame=ApplyMapping_node2, connection_type="s3", format="json", connection_options={"path": args["output_bucket_url"], "partitionKeys": []}, transformation_ctx="RevisedFlightData_node3", ) job.commit()
Rust
SDK for Rust
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

Create and run a crawler that crawls a public Amazon Simple Storage Service (Amazon S3) bucket and generates a metadata database that describes the CSV-formatted data it finds.

let create_crawler = glue .create_crawler() .name(self.crawler()) .database_name(self.database()) .role(self.iam_role.expose_secret()) .targets( CrawlerTargets::builder() .s3_targets(S3Target::builder().path(CRAWLER_TARGET).build()) .build(), ) .send() .await; match create_crawler { Err(err) => { let glue_err: aws_sdk_glue::Error = err.into(); match glue_err { aws_sdk_glue::Error::AlreadyExistsException(_) => { info!("Using existing crawler"); Ok(()) } _ => Err(GlueMvpError::GlueSdk(glue_err)), } } Ok(_) => Ok(()), }?; let start_crawler = glue.start_crawler().name(self.crawler()).send().await; match start_crawler { Ok(_) => Ok(()), Err(err) => { let glue_err: aws_sdk_glue::Error = err.into(); match glue_err { aws_sdk_glue::Error::CrawlerRunningException(_) => Ok(()), _ => Err(GlueMvpError::GlueSdk(glue_err)), } } }?;

List information about databases and tables in your AWS Glue Data Catalog.

let database = glue .get_database() .name(self.database()) .send() .await .map_err(GlueMvpError::from_glue_sdk)? .to_owned(); let database = database .database() .ok_or_else(|| GlueMvpError::Unknown("Could not find database".into()))?; let tables = glue .get_tables() .database_name(self.database()) .send() .await .map_err(GlueMvpError::from_glue_sdk)?; let tables = tables.table_list();

Create and run a job that extracts CSV data from the source Amazon S3 bucket, transforms it by removing and renaming fields, and loads JSON-formatted output into another Amazon S3 bucket.

let create_job = glue .create_job() .name(self.job()) .role(self.iam_role.expose_secret()) .command( JobCommand::builder() .name("glueetl") .python_version("3") .script_location(format!("s3://{}/job.py", self.bucket())) .build(), ) .glue_version("3.0") .send() .await .map_err(GlueMvpError::from_glue_sdk)?; let job_name = create_job.name().ok_or_else(|| { GlueMvpError::Unknown("Did not get job name after creating job".into()) })?; let job_run_output = glue .start_job_run() .job_name(self.job()) .arguments("--input_database", self.database()) .arguments( "--input_table", self.tables .first() .ok_or_else(|| GlueMvpError::Unknown("Missing crawler table".into()))? .name(), ) .arguments("--output_bucket_url", self.bucket()) .send() .await .map_err(GlueMvpError::from_glue_sdk)?; let job = job_run_output .job_run_id() .ok_or_else(|| GlueMvpError::Unknown("Missing run id from just started job".into()))? .to_string();

Delete all resources created by the demo.

glue.delete_job() .job_name(self.job()) .send() .await .map_err(GlueMvpError::from_glue_sdk)?; for t in &self.tables { glue.delete_table() .name(t.name()) .database_name(self.database()) .send() .await .map_err(GlueMvpError::from_glue_sdk)?; } glue.delete_database() .name(self.database()) .send() .await .map_err(GlueMvpError::from_glue_sdk)?; glue.delete_crawler() .name(self.crawler()) .send() .await .map_err(GlueMvpError::from_glue_sdk)?;

For a complete list of AWS SDK developer guides and code examples, see Using this service with an AWS SDK. This topic also includes information about getting started and details about previous SDK versions.