Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Aprenda los conceptos básicos de AWS Glue con un AWS SDK
En el siguiente ejemplo de código, se muestra cómo:
Cree un rastreador que rastree un bucket público de Amazon S3 y genere una base de datos de CSV metadatos con formato.
Enumere información sobre bases de datos y tablas en su. AWS Glue Data Catalog
Cree un trabajo para extraer CSV datos del depósito de S3, transformarlos y cargar la salida JSON con formato en otro depósito de S3.
Incluir información sobre las ejecuciones de trabajos, ver algunos de los datos transformados y limpiar los recursos.
Para obtener más información, consulta el tutorial: Cómo empezar a usar AWS Glue Studio.
- .NET
-
- AWS SDK for .NET
-
nota
Hay más información al respecto GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS
. Cree una clase que agrupe AWS Glue las funciones que se utilizan en el escenario.
using System.Net; namespace GlueActions; public class GlueWrapper { private readonly IAmazonGlue _amazonGlue; /// <summary> /// Constructor for the AWS Glue actions wrapper. /// </summary> /// <param name="amazonGlue"></param> public GlueWrapper(IAmazonGlue amazonGlue) { _amazonGlue = amazonGlue; } /// <summary> /// Create an AWS Glue crawler. /// </summary> /// <param name="crawlerName">The name for the crawler.</param> /// <param name="crawlerDescription">A description of the crawler.</param> /// <param name="role">The AWS Identity and Access Management (IAM) role to /// be assumed by the crawler.</param> /// <param name="schedule">The schedule on which the crawler will be executed.</param> /// <param name="s3Path">The path to the Amazon Simple Storage Service (Amazon S3) /// bucket where the Python script has been stored.</param> /// <param name="dbName">The name to use for the database that will be /// created by the crawler.</param> /// <returns>A Boolean value indicating the success of the action.</returns> public async Task<bool> CreateCrawlerAsync( string crawlerName, string crawlerDescription, string role, string schedule, string s3Path, string dbName) { var s3Target = new S3Target { Path = s3Path, }; var targetList = new List<S3Target> { s3Target, }; var targets = new CrawlerTargets { S3Targets = targetList, }; var crawlerRequest = new CreateCrawlerRequest { DatabaseName = dbName, Name = crawlerName, Description = crawlerDescription, Targets = targets, Role = role, Schedule = schedule, }; var response = await _amazonGlue.CreateCrawlerAsync(crawlerRequest); return response.HttpStatusCode == System.Net.HttpStatusCode.OK; } /// <summary> /// Create an AWS Glue job. /// </summary> /// <param name="jobName">The name of the job.</param> /// <param name="roleName">The name of the IAM role to be assumed by /// the job.</param> /// <param name="description">A description of the job.</param> /// <param name="scriptUrl">The URL to the script.</param> /// <returns>A Boolean value indicating the success of the action.</returns> public async Task<bool> CreateJobAsync(string dbName, string tableName, string bucketUrl, string jobName, string roleName, string description, string scriptUrl) { var command = new JobCommand { PythonVersion = "3", Name = "glueetl", ScriptLocation = scriptUrl, }; var arguments = new Dictionary<string, string> { { "--input_database", dbName }, { "--input_table", tableName }, { "--output_bucket_url", bucketUrl } }; var request = new CreateJobRequest { Command = command, DefaultArguments = arguments, Description = description, GlueVersion = "3.0", Name = jobName, NumberOfWorkers = 10, Role = roleName, WorkerType = "G.1X" }; var response = await _amazonGlue.CreateJobAsync(request); return response.HttpStatusCode == HttpStatusCode.OK; } /// <summary> /// Delete an AWS Glue crawler. /// </summary> /// <param name="crawlerName">The name of the crawler.</param> /// <returns>A Boolean value indicating the success of the action.</returns> public async Task<bool> DeleteCrawlerAsync(string crawlerName) { var response = await _amazonGlue.DeleteCrawlerAsync(new DeleteCrawlerRequest { Name = crawlerName }); return response.HttpStatusCode == HttpStatusCode.OK; } /// <summary> /// Delete the AWS Glue database. /// </summary> /// <param name="dbName">The name of the database.</param> /// <returns>A Boolean value indicating the success of the action.</returns> public async Task<bool> DeleteDatabaseAsync(string dbName) { var response = await _amazonGlue.DeleteDatabaseAsync(new DeleteDatabaseRequest { Name = dbName }); return response.HttpStatusCode == HttpStatusCode.OK; } /// <summary> /// Delete an AWS Glue job. /// </summary> /// <param name="jobName">The name of the job.</param> /// <returns>A Boolean value indicating the success of the action.</returns> public async Task<bool> DeleteJobAsync(string jobName) { var response = await _amazonGlue.DeleteJobAsync(new DeleteJobRequest { JobName = jobName }); return response.HttpStatusCode == HttpStatusCode.OK; } /// <summary> /// Delete a table from an AWS Glue database. /// </summary> /// <param name="tableName">The table to delete.</param> /// <returns>A Boolean value indicating the success of the action.</returns> public async Task<bool> DeleteTableAsync(string dbName, string tableName) { var response = await _amazonGlue.DeleteTableAsync(new DeleteTableRequest { Name = tableName, DatabaseName = dbName }); return response.HttpStatusCode == HttpStatusCode.OK; } /// <summary> /// Get information about an AWS Glue crawler. /// </summary> /// <param name="crawlerName">The name of the crawler.</param> /// <returns>A Crawler object describing the crawler.</returns> public async Task<Crawler?> GetCrawlerAsync(string crawlerName) { var crawlerRequest = new GetCrawlerRequest { Name = crawlerName, }; var response = await _amazonGlue.GetCrawlerAsync(crawlerRequest); if (response.HttpStatusCode == System.Net.HttpStatusCode.OK) { var databaseName = response.Crawler.DatabaseName; Console.WriteLine($"{crawlerName} has the database {databaseName}"); return response.Crawler; } Console.WriteLine($"No information regarding {crawlerName} could be found."); return null; } /// <summary> /// Get information about the state of an AWS Glue crawler. /// </summary> /// <param name="crawlerName">The name of the crawler.</param> /// <returns>A value describing the state of the crawler.</returns> public async Task<CrawlerState> GetCrawlerStateAsync(string crawlerName) { var response = await _amazonGlue.GetCrawlerAsync( new GetCrawlerRequest { Name = crawlerName }); return response.Crawler.State; } /// <summary> /// Get information about an AWS Glue database. /// </summary> /// <param name="dbName">The name of the database.</param> /// <returns>A Database object containing information about the database.</returns> public async Task<Database> GetDatabaseAsync(string dbName) { var databasesRequest = new GetDatabaseRequest { Name = dbName, }; var response = await _amazonGlue.GetDatabaseAsync(databasesRequest); return response.Database; } /// <summary> /// Get information about a specific AWS Glue job run. /// </summary> /// <param name="jobName">The name of the job.</param> /// <param name="jobRunId">The Id of the job run.</param> /// <returns>A JobRun object with information about the job run.</returns> public async Task<JobRun> GetJobRunAsync(string jobName, string jobRunId) { var response = await _amazonGlue.GetJobRunAsync(new GetJobRunRequest { JobName = jobName, RunId = jobRunId }); return response.JobRun; } /// <summary> /// Get information about all AWS Glue runs of a specific job. /// </summary> /// <param name="jobName">The name of the job.</param> /// <returns>A list of JobRun objects.</returns> public async Task<List<JobRun>> GetJobRunsAsync(string jobName) { var jobRuns = new List<JobRun>(); var request = new GetJobRunsRequest { JobName = jobName, }; // No need to loop to get all the log groups--the SDK does it for us behind the scenes var paginatorForJobRuns = _amazonGlue.Paginators.GetJobRuns(request); await foreach (var response in paginatorForJobRuns.Responses) { response.JobRuns.ForEach(jobRun => { jobRuns.Add(jobRun); }); } return jobRuns; } /// <summary> /// Get a list of tables for an AWS Glue database. /// </summary> /// <param name="dbName">The name of the database.</param> /// <returns>A list of Table objects.</returns> public async Task<List<Table>> GetTablesAsync(string dbName) { var request = new GetTablesRequest { DatabaseName = dbName }; var tables = new List<Table>(); // Get a paginator for listing the tables. var tablePaginator = _amazonGlue.Paginators.GetTables(request); await foreach (var response in tablePaginator.Responses) { tables.AddRange(response.TableList); } return tables; } /// <summary> /// List AWS Glue jobs using a paginator. /// </summary> /// <returns>A list of AWS Glue job names.</returns> public async Task<List<string>> ListJobsAsync() { var jobNames = new List<string>(); var listJobsPaginator = _amazonGlue.Paginators.ListJobs(new ListJobsRequest { MaxResults = 10 }); await foreach (var response in listJobsPaginator.Responses) { jobNames.AddRange(response.JobNames); } return jobNames; } /// <summary> /// Start an AWS Glue crawler. /// </summary> /// <param name="crawlerName">The name of the crawler.</param> /// <returns>A Boolean value indicating the success of the action.</returns> public async Task<bool> StartCrawlerAsync(string crawlerName) { var crawlerRequest = new StartCrawlerRequest { Name = crawlerName, }; var response = await _amazonGlue.StartCrawlerAsync(crawlerRequest); return response.HttpStatusCode == System.Net.HttpStatusCode.OK; } /// <summary> /// Start an AWS Glue job run. /// </summary> /// <param name="jobName">The name of the job.</param> /// <returns>A string representing the job run Id.</returns> public async Task<string> StartJobRunAsync( string jobName, string inputDatabase, string inputTable, string bucketName) { var request = new StartJobRunRequest { JobName = jobName, Arguments = new Dictionary<string, string> { {"--input_database", inputDatabase}, {"--input_table", inputTable}, {"--output_bucket_url", $"s3://{bucketName}/"} } }; var response = await _amazonGlue.StartJobRunAsync(request); return response.JobRunId; } }
Crear una clase que ejecute el escenario.
global using Amazon.Glue; global using GlueActions; global using Microsoft.Extensions.Configuration; global using Microsoft.Extensions.DependencyInjection; global using Microsoft.Extensions.Hosting; global using Microsoft.Extensions.Logging; global using Microsoft.Extensions.Logging.Console; global using Microsoft.Extensions.Logging.Debug; using Amazon.Glue.Model; using Amazon.S3; using Amazon.S3.Model; namespace GlueBasics; public class GlueBasics { private static ILogger logger = null!; private static IConfiguration _configuration = null!; static async Task Main(string[] args) { // Set up dependency injection for AWS Glue. using var host = Host.CreateDefaultBuilder(args) .ConfigureLogging(logging => logging.AddFilter("System", LogLevel.Debug) .AddFilter<DebugLoggerProvider>("Microsoft", LogLevel.Information) .AddFilter<ConsoleLoggerProvider>("Microsoft", LogLevel.Trace)) .ConfigureServices((_, services) => services.AddAWSService<IAmazonGlue>() .AddTransient<GlueWrapper>() .AddTransient<UiWrapper>() ) .Build(); logger = LoggerFactory.Create(builder => { builder.AddConsole(); }) .CreateLogger<GlueBasics>(); _configuration = new ConfigurationBuilder() .SetBasePath(Directory.GetCurrentDirectory()) .AddJsonFile("settings.json") // Load settings from .json file. .AddJsonFile("settings.local.json", true) // Optionally load local settings. .Build(); // These values are stored in settings.json // Once you have run the CDK script to deploy the resources, // edit the file to set "BucketName", "RoleName", and "ScriptURL" // to the appropriate values. Also set "CrawlerName" to the name // you want to give the crawler when it is created. string bucketName = _configuration["BucketName"]!; string bucketUrl = _configuration["BucketUrl"]!; string crawlerName = _configuration["CrawlerName"]!; string roleName = _configuration["RoleName"]!; string sourceData = _configuration["SourceData"]!; string dbName = _configuration["DbName"]!; string cron = _configuration["Cron"]!; string scriptUrl = _configuration["ScriptURL"]!; string jobName = _configuration["JobName"]!; var wrapper = host.Services.GetRequiredService<GlueWrapper>(); var uiWrapper = host.Services.GetRequiredService<UiWrapper>(); uiWrapper.DisplayOverview(); uiWrapper.PressEnter(); // Create the crawler and wait for it to be ready. uiWrapper.DisplayTitle("Create AWS Glue crawler"); Console.WriteLine("Let's begin by creating the AWS Glue crawler."); var crawlerDescription = "Crawler created for the AWS Glue Basics scenario."; var crawlerCreated = await wrapper.CreateCrawlerAsync(crawlerName, crawlerDescription, roleName, cron, sourceData, dbName); if (crawlerCreated) { Console.WriteLine($"The crawler: {crawlerName} has been created. Now let's wait until it's ready."); CrawlerState crawlerState; do { crawlerState = await wrapper.GetCrawlerStateAsync(crawlerName); } while (crawlerState != "READY"); Console.WriteLine($"The crawler {crawlerName} is now ready for use."); } else { Console.WriteLine($"Couldn't create crawler {crawlerName}."); return; // Exit the application. } uiWrapper.DisplayTitle("Start AWS Glue crawler"); Console.WriteLine("Now let's wait until the crawler has successfully started."); var crawlerStarted = await wrapper.StartCrawlerAsync(crawlerName); if (crawlerStarted) { CrawlerState crawlerState; do { crawlerState = await wrapper.GetCrawlerStateAsync(crawlerName); } while (crawlerState != "READY"); Console.WriteLine($"The crawler {crawlerName} is now ready for use."); } else { Console.WriteLine($"Couldn't start the crawler {crawlerName}."); return; // Exit the application. } uiWrapper.PressEnter(); Console.WriteLine($"\nLet's take a look at the database: {dbName}"); var database = await wrapper.GetDatabaseAsync(dbName); if (database != null) { uiWrapper.DisplayTitle($"{database.Name} Details"); Console.WriteLine($"{database.Name} created on {database.CreateTime}"); Console.WriteLine(database.Description); } uiWrapper.PressEnter(); var tables = await wrapper.GetTablesAsync(dbName); if (tables.Count > 0) { tables.ForEach(table => { Console.WriteLine($"{table.Name}\tCreated: {table.CreateTime}\tUpdated: {table.UpdateTime}"); }); } uiWrapper.PressEnter(); uiWrapper.DisplayTitle("Create AWS Glue job"); Console.WriteLine("Creating a new AWS Glue job."); var description = "An AWS Glue job created using the AWS SDK for .NET"; await wrapper.CreateJobAsync(dbName, tables[0].Name, bucketUrl, jobName, roleName, description, scriptUrl); uiWrapper.PressEnter(); uiWrapper.DisplayTitle("Starting AWS Glue job"); Console.WriteLine("Starting the new AWS Glue job..."); var jobRunId = await wrapper.StartJobRunAsync(jobName, dbName, tables[0].Name, bucketName); var jobRunComplete = false; var jobRun = new JobRun(); do { jobRun = await wrapper.GetJobRunAsync(jobName, jobRunId); if (jobRun.JobRunState == "SUCCEEDED" || jobRun.JobRunState == "STOPPED" || jobRun.JobRunState == "FAILED" || jobRun.JobRunState == "TIMEOUT") { jobRunComplete = true; } } while (!jobRunComplete); uiWrapper.DisplayTitle($"Data in {bucketName}"); // Get the list of data stored in the S3 bucket. var s3Client = new AmazonS3Client(); var response = await s3Client.ListObjectsAsync(new ListObjectsRequest { BucketName = bucketName }); response.S3Objects.ForEach(s3Object => { Console.WriteLine(s3Object.Key); }); uiWrapper.DisplayTitle("AWS Glue jobs"); var jobNames = await wrapper.ListJobsAsync(); jobNames.ForEach(jobName => { Console.WriteLine(jobName); }); uiWrapper.PressEnter(); uiWrapper.DisplayTitle("Get AWS Glue job run information"); Console.WriteLine("Getting information about the AWS Glue job."); var jobRuns = await wrapper.GetJobRunsAsync(jobName); jobRuns.ForEach(jobRun => { Console.WriteLine($"{jobRun.JobName}\t{jobRun.JobRunState}\t{jobRun.CompletedOn}"); }); uiWrapper.PressEnter(); uiWrapper.DisplayTitle("Deleting resources"); Console.WriteLine("Deleting the AWS Glue job used by the example."); await wrapper.DeleteJobAsync(jobName); Console.WriteLine("Deleting the tables from the database."); tables.ForEach(async table => { await wrapper.DeleteTableAsync(dbName, table.Name); }); Console.WriteLine("Deleting the database."); await wrapper.DeleteDatabaseAsync(dbName); Console.WriteLine("Deleting the AWS Glue crawler."); await wrapper.DeleteCrawlerAsync(crawlerName); Console.WriteLine("The AWS Glue scenario has completed."); uiWrapper.PressEnter(); } } namespace GlueBasics; public class UiWrapper { public readonly string SepBar = new string('-', Console.WindowWidth); /// <summary> /// Show information about the scenario. /// </summary> public void DisplayOverview() { Console.Clear(); DisplayTitle("Amazon Glue: get started with crawlers and jobs"); Console.WriteLine("This example application does the following:"); Console.WriteLine("\t 1. Create a crawler, pass it the IAM role and the URL to the public S3 bucket that contains the source data"); Console.WriteLine("\t 2. Start the crawler."); Console.WriteLine("\t 3. Get the database created by the crawler and the tables in the database."); Console.WriteLine("\t 4. Create a job."); Console.WriteLine("\t 5. Start a job run."); Console.WriteLine("\t 6. Wait for the job run to complete."); Console.WriteLine("\t 7. Show the data stored in the bucket."); Console.WriteLine("\t 8. List jobs for the account."); Console.WriteLine("\t 9. Get job run details for the job that was run."); Console.WriteLine("\t10. Delete the demo job."); Console.WriteLine("\t11. Delete the database and tables created for the demo."); Console.WriteLine("\t12. Delete the crawler."); } /// <summary> /// Display a message and wait until the user presses enter. /// </summary> public void PressEnter() { Console.Write("\nPlease press <Enter> to continue. "); _ = Console.ReadLine(); } /// <summary> /// Pad a string with spaces to center it on the console display. /// </summary> /// <param name="strToCenter">The string to center on the screen.</param> /// <returns>The string padded to make it center on the screen.</returns> public string CenterString(string strToCenter) { var padAmount = (Console.WindowWidth - strToCenter.Length) / 2; var leftPad = new string(' ', padAmount); return $"{leftPad}{strToCenter}"; } /// <summary> /// Display a line of hyphens, the centered text of the title and another /// line of hyphens. /// </summary> /// <param name="strTitle">The string to be displayed.</param> public void DisplayTitle(string strTitle) { Console.WriteLine(SepBar); Console.WriteLine(CenterString(strTitle)); Console.WriteLine(SepBar); } }
-
Para API obtener más información, consulte los siguientes temas en AWS SDK for .NET APIReference.
-
- C++
-
- SDKpara C++
-
nota
Hay más información GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS
. //! Scenario which demonstrates using AWS Glue to add a crawler and run a job. /*! \\sa runGettingStartedWithGlueScenario() \param bucketName: An S3 bucket created in the setup. \param roleName: An AWS Identity and Access Management (IAM) role created in the setup. \param clientConfig: AWS client configuration. \return bool: Successful completion. */ bool AwsDoc::Glue::runGettingStartedWithGlueScenario(const Aws::String &bucketName, const Aws::String &roleName, const Aws::Client::ClientConfiguration &clientConfig) { Aws::Glue::GlueClient client(clientConfig); Aws::String roleArn; if (!getRoleArn(roleName, roleArn, clientConfig)) { std::cerr << "Error getting role ARN for role." << std::endl; return false; } // 1. Upload the job script to the S3 bucket. { std::cout << "Uploading the job script '" << AwsDoc::Glue::PYTHON_SCRIPT << "'." << std::endl; if (!AwsDoc::Glue::uploadFile(bucketName, AwsDoc::Glue::PYTHON_SCRIPT_PATH, AwsDoc::Glue::PYTHON_SCRIPT, clientConfig)) { std::cerr << "Error uploading the job file." << std::endl; return false; } } // 2. Create a crawler. { Aws::Glue::Model::S3Target s3Target; s3Target.SetPath("s3://crawler-public-us-east-1/flight/2016/csv"); Aws::Glue::Model::CrawlerTargets crawlerTargets; crawlerTargets.AddS3Targets(s3Target); Aws::Glue::Model::CreateCrawlerRequest request; request.SetTargets(crawlerTargets); request.SetName(CRAWLER_NAME); request.SetDatabaseName(CRAWLER_DATABASE_NAME); request.SetTablePrefix(CRAWLER_DATABASE_PREFIX); request.SetRole(roleArn); Aws::Glue::Model::CreateCrawlerOutcome outcome = client.CreateCrawler(request); if (outcome.IsSuccess()) { std::cout << "Successfully created the crawler." << std::endl; } else { std::cerr << "Error creating a crawler. " << outcome.GetError().GetMessage() << std::endl; deleteAssets("", CRAWLER_DATABASE_NAME, "", bucketName, clientConfig); return false; } } // 3. Get a crawler. { Aws::Glue::Model::GetCrawlerRequest request; request.SetName(CRAWLER_NAME); Aws::Glue::Model::GetCrawlerOutcome outcome = client.GetCrawler(request); if (outcome.IsSuccess()) { Aws::Glue::Model::CrawlerState crawlerState = outcome.GetResult().GetCrawler().GetState(); std::cout << "Retrieved crawler with state " << Aws::Glue::Model::CrawlerStateMapper::GetNameForCrawlerState( crawlerState) << "." << std::endl; } else { std::cerr << "Error retrieving a crawler. " << outcome.GetError().GetMessage() << std::endl; deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, "", bucketName, clientConfig); return false; } } // 4. Start a crawler. { Aws::Glue::Model::StartCrawlerRequest request; request.SetName(CRAWLER_NAME); Aws::Glue::Model::StartCrawlerOutcome outcome = client.StartCrawler(request); if (outcome.IsSuccess() || (Aws::Glue::GlueErrors::CRAWLER_RUNNING == outcome.GetError().GetErrorType())) { if (!outcome.IsSuccess()) { std::cout << "Crawler was already started." << std::endl; } else { std::cout << "Successfully started crawler." << std::endl; } std::cout << "This may take a while to run." << std::endl; Aws::Glue::Model::CrawlerState crawlerState = Aws::Glue::Model::CrawlerState::NOT_SET; int iterations = 0; while (Aws::Glue::Model::CrawlerState::READY != crawlerState) { std::this_thread::sleep_for(std::chrono::seconds(1)); ++iterations; if ((iterations % 10) == 0) { // Log status every 10 seconds. std::cout << "Crawler status " << Aws::Glue::Model::CrawlerStateMapper::GetNameForCrawlerState( crawlerState) << ". After " << iterations << " seconds elapsed." << std::endl; } Aws::Glue::Model::GetCrawlerRequest getCrawlerRequest; getCrawlerRequest.SetName(CRAWLER_NAME); Aws::Glue::Model::GetCrawlerOutcome getCrawlerOutcome = client.GetCrawler( getCrawlerRequest); if (getCrawlerOutcome.IsSuccess()) { crawlerState = getCrawlerOutcome.GetResult().GetCrawler().GetState(); } else { std::cerr << "Error getting crawler. " << getCrawlerOutcome.GetError().GetMessage() << std::endl; break; } } if (Aws::Glue::Model::CrawlerState::READY == crawlerState) { std::cout << "Crawler finished running after " << iterations << " seconds." << std::endl; } } else { std::cerr << "Error starting a crawler. " << outcome.GetError().GetMessage() << std::endl; deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, "", bucketName, clientConfig); return false; } } // 5. Get a database. { Aws::Glue::Model::GetDatabaseRequest request; request.SetName(CRAWLER_DATABASE_NAME); Aws::Glue::Model::GetDatabaseOutcome outcome = client.GetDatabase(request); if (outcome.IsSuccess()) { const Aws::Glue::Model::Database &database = outcome.GetResult().GetDatabase(); std::cout << "Successfully retrieve the database\n" << database.Jsonize().View().WriteReadable() << "'." << std::endl; } else { std::cerr << "Error getting the database. " << outcome.GetError().GetMessage() << std::endl; deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, "", bucketName, clientConfig); return false; } } // 6. Get tables. Aws::String tableName; { Aws::Glue::Model::GetTablesRequest request; request.SetDatabaseName(CRAWLER_DATABASE_NAME); std::vector<Aws::Glue::Model::Table> all_tables; Aws::String nextToken; // Used for pagination. do { Aws::Glue::Model::GetTablesOutcome outcome = client.GetTables(request); if (outcome.IsSuccess()) { const std::vector<Aws::Glue::Model::Table> &tables = outcome.GetResult().GetTableList(); all_tables.insert(all_tables.end(), tables.begin(), tables.end()); nextToken = outcome.GetResult().GetNextToken(); } else { std::cerr << "Error getting the tables. " << outcome.GetError().GetMessage() << std::endl; deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, "", bucketName, clientConfig); return false; } } while (!nextToken.empty()); std::cout << "The database contains " << all_tables.size() << (all_tables.size() == 1 ? " table." : "tables.") << std::endl; std::cout << "Here is a list of the tables in the database."; for (size_t index = 0; index < all_tables.size(); ++index) { std::cout << " " << index + 1 << ": " << all_tables[index].GetName() << std::endl; } if (!all_tables.empty()) { int tableIndex = askQuestionForIntRange( "Enter an index to display the database detail ", 1, static_cast<int>(all_tables.size())); std::cout << all_tables[tableIndex - 1].Jsonize().View().WriteReadable() << std::endl; tableName = all_tables[tableIndex - 1].GetName(); } } // 7. Create a job. { Aws::Glue::Model::CreateJobRequest request; request.SetName(JOB_NAME); request.SetRole(roleArn); request.SetGlueVersion(GLUE_VERSION); Aws::Glue::Model::JobCommand command; command.SetName(JOB_COMMAND_NAME); command.SetPythonVersion(JOB_PYTHON_VERSION); command.SetScriptLocation( Aws::String("s3://") + bucketName + "/" + PYTHON_SCRIPT); request.SetCommand(command); Aws::Glue::Model::CreateJobOutcome outcome = client.CreateJob(request); if (outcome.IsSuccess()) { std::cout << "Successfully created the job." << std::endl; } else { std::cerr << "Error creating the job. " << outcome.GetError().GetMessage() << std::endl; deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, "", bucketName, clientConfig); return false; } } // 8. Start a job run. { Aws::Glue::Model::StartJobRunRequest request; request.SetJobName(JOB_NAME); Aws::Map<Aws::String, Aws::String> arguments; arguments["--input_database"] = CRAWLER_DATABASE_NAME; arguments["--input_table"] = tableName; arguments["--output_bucket_url"] = Aws::String("s3://") + bucketName + "/"; request.SetArguments(arguments); Aws::Glue::Model::StartJobRunOutcome outcome = client.StartJobRun(request); if (outcome.IsSuccess()) { std::cout << "Successfully started the job." << std::endl; Aws::String jobRunId = outcome.GetResult().GetJobRunId(); int iterator = 0; bool done = false; while (!done) { ++iterator; std::this_thread::sleep_for(std::chrono::seconds(1)); Aws::Glue::Model::GetJobRunRequest jobRunRequest; jobRunRequest.SetJobName(JOB_NAME); jobRunRequest.SetRunId(jobRunId); Aws::Glue::Model::GetJobRunOutcome jobRunOutcome = client.GetJobRun( jobRunRequest); if (jobRunOutcome.IsSuccess()) { const Aws::Glue::Model::JobRun &jobRun = jobRunOutcome.GetResult().GetJobRun(); Aws::Glue::Model::JobRunState jobRunState = jobRun.GetJobRunState(); if ((jobRunState == Aws::Glue::Model::JobRunState::STOPPED) || (jobRunState == Aws::Glue::Model::JobRunState::FAILED) || (jobRunState == Aws::Glue::Model::JobRunState::TIMEOUT)) { std::cerr << "Error running job. " << jobRun.GetErrorMessage() << std::endl; deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, JOB_NAME, bucketName, clientConfig); return false; } else if (jobRunState == Aws::Glue::Model::JobRunState::SUCCEEDED) { std::cout << "Job run succeeded after " << iterator << " seconds elapsed." << std::endl; done = true; } else if ((iterator % 10) == 0) { // Log status every 10 seconds. std::cout << "Job run status " << Aws::Glue::Model::JobRunStateMapper::GetNameForJobRunState( jobRunState) << ". " << iterator << " seconds elapsed." << std::endl; } } else { std::cerr << "Error retrieving job run state. " << jobRunOutcome.GetError().GetMessage() << std::endl; deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, JOB_NAME, bucketName, clientConfig); return false; } } } else { std::cerr << "Error starting a job. " << outcome.GetError().GetMessage() << std::endl; deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, JOB_NAME, bucketName, clientConfig); return false; } } // 9. List the output data stored in the S3 bucket. { Aws::S3::S3Client s3Client; Aws::S3::Model::ListObjectsV2Request request; request.SetBucket(bucketName); request.SetPrefix(OUTPUT_FILE_PREFIX); Aws::String continuationToken; // Used for pagination. std::vector<Aws::S3::Model::Object> allObjects; do { if (!continuationToken.empty()) { request.SetContinuationToken(continuationToken); } Aws::S3::Model::ListObjectsV2Outcome outcome = s3Client.ListObjectsV2( request); if (outcome.IsSuccess()) { const std::vector<Aws::S3::Model::Object> &objects = outcome.GetResult().GetContents(); allObjects.insert(allObjects.end(), objects.begin(), objects.end()); continuationToken = outcome.GetResult().GetNextContinuationToken(); } else { std::cerr << "Error listing objects. " << outcome.GetError().GetMessage() << std::endl; break; } } while (!continuationToken.empty()); std::cout << "Data from your job is in " << allObjects.size() << " files in the S3 bucket, " << bucketName << "." << std::endl; for (size_t i = 0; i < allObjects.size(); ++i) { std::cout << " " << i + 1 << ". " << allObjects[i].GetKey() << std::endl; } int objectIndex = askQuestionForIntRange( std::string( "Enter the number of a block to download it and see the first ") + std::to_string(LINES_OF_RUN_FILE_TO_DISPLAY) + " lines of JSON output in the block: ", 1, static_cast<int>(allObjects.size())); Aws::String objectKey = allObjects[objectIndex - 1].GetKey(); std::stringstream stringStream; if (getObjectFromBucket(bucketName, objectKey, stringStream, clientConfig)) { for (int i = 0; i < LINES_OF_RUN_FILE_TO_DISPLAY && stringStream; ++i) { std::string line; std::getline(stringStream, line); std::cout << " " << line << std::endl; } } else { deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, JOB_NAME, bucketName, clientConfig); return false; } } // 10. List all the jobs. Aws::String jobName; { Aws::Glue::Model::ListJobsRequest listJobsRequest; Aws::String nextToken; std::vector<Aws::String> allJobNames; do { if (!nextToken.empty()) { listJobsRequest.SetNextToken(nextToken); } Aws::Glue::Model::ListJobsOutcome listRunsOutcome = client.ListJobs( listJobsRequest); if (listRunsOutcome.IsSuccess()) { const std::vector<Aws::String> &jobNames = listRunsOutcome.GetResult().GetJobNames(); allJobNames.insert(allJobNames.end(), jobNames.begin(), jobNames.end()); nextToken = listRunsOutcome.GetResult().GetNextToken(); } else { std::cerr << "Error listing jobs. " << listRunsOutcome.GetError().GetMessage() << std::endl; } } while (!nextToken.empty()); std::cout << "Your account has " << allJobNames.size() << " jobs." << std::endl; for (size_t i = 0; i < allJobNames.size(); ++i) { std::cout << " " << i + 1 << ". " << allJobNames[i] << std::endl; } int jobIndex = askQuestionForIntRange( Aws::String("Enter a number between 1 and ") + std::to_string(allJobNames.size()) + " to see the list of runs for a job: ", 1, static_cast<int>(allJobNames.size())); jobName = allJobNames[jobIndex - 1]; } // 11. Get the job runs for a job. Aws::String jobRunID; if (!jobName.empty()) { Aws::Glue::Model::GetJobRunsRequest getJobRunsRequest; getJobRunsRequest.SetJobName(jobName); Aws::String nextToken; // Used for pagination. std::vector<Aws::Glue::Model::JobRun> allJobRuns; do { if (!nextToken.empty()) { getJobRunsRequest.SetNextToken(nextToken); } Aws::Glue::Model::GetJobRunsOutcome jobRunsOutcome = client.GetJobRuns( getJobRunsRequest); if (jobRunsOutcome.IsSuccess()) { const std::vector<Aws::Glue::Model::JobRun> &jobRuns = jobRunsOutcome.GetResult().GetJobRuns(); allJobRuns.insert(allJobRuns.end(), jobRuns.begin(), jobRuns.end()); nextToken = jobRunsOutcome.GetResult().GetNextToken(); } else { std::cerr << "Error getting job runs. " << jobRunsOutcome.GetError().GetMessage() << std::endl; break; } } while (!nextToken.empty()); std::cout << "There are " << allJobRuns.size() << " runs in the job '" << jobName << "'." << std::endl; for (size_t i = 0; i < allJobRuns.size(); ++i) { std::cout << " " << i + 1 << ". " << allJobRuns[i].GetJobName() << std::endl; } int runIndex = askQuestionForIntRange( Aws::String("Enter a number between 1 and ") + std::to_string(allJobRuns.size()) + " to see details for a run: ", 1, static_cast<int>(allJobRuns.size())); jobRunID = allJobRuns[runIndex - 1].GetId(); } // 12. Get a single job run. if (!jobRunID.empty()) { Aws::Glue::Model::GetJobRunRequest jobRunRequest; jobRunRequest.SetJobName(jobName); jobRunRequest.SetRunId(jobRunID); Aws::Glue::Model::GetJobRunOutcome jobRunOutcome = client.GetJobRun( jobRunRequest); if (jobRunOutcome.IsSuccess()) { std::cout << "Displaying the job run JSON description." << std::endl; std::cout << jobRunOutcome.GetResult().GetJobRun().Jsonize().View().WriteReadable() << std::endl; } else { std::cerr << "Error get a job run. " << jobRunOutcome.GetError().GetMessage() << std::endl; } } return deleteAssets(CRAWLER_NAME, CRAWLER_DATABASE_NAME, JOB_NAME, bucketName, clientConfig); } //! Cleanup routine to delete created assets. /*! \\sa deleteAssets() \param crawler: Name of an AWS Glue crawler. \param database: The name of an AWS Glue database. \param job: The name of an AWS Glue job. \param bucketName: The name of an S3 bucket. \param clientConfig: AWS client configuration. \return bool: Successful completion. */ bool AwsDoc::Glue::deleteAssets(const Aws::String &crawler, const Aws::String &database, const Aws::String &job, const Aws::String &bucketName, const Aws::Client::ClientConfiguration &clientConfig) { const Aws::Glue::GlueClient client(clientConfig); bool result = true; // 13. Delete a job. if (!job.empty()) { Aws::Glue::Model::DeleteJobRequest request; request.SetJobName(job); Aws::Glue::Model::DeleteJobOutcome outcome = client.DeleteJob(request); if (outcome.IsSuccess()) { std::cout << "Successfully deleted the job." << std::endl; } else { std::cerr << "Error deleting the job. " << outcome.GetError().GetMessage() << std::endl; result = false; } } // 14. Delete a database. if (!database.empty()) { Aws::Glue::Model::DeleteDatabaseRequest request; request.SetName(database); Aws::Glue::Model::DeleteDatabaseOutcome outcome = client.DeleteDatabase( request); if (outcome.IsSuccess()) { std::cout << "Successfully deleted the database." << std::endl; } else { std::cerr << "Error deleting database. " << outcome.GetError().GetMessage() << std::endl; result = false; } } // 15. Delete a crawler. if (!crawler.empty()) { Aws::Glue::Model::DeleteCrawlerRequest request; request.SetName(crawler); Aws::Glue::Model::DeleteCrawlerOutcome outcome = client.DeleteCrawler(request); if (outcome.IsSuccess()) { std::cout << "Successfully deleted the crawler." << std::endl; } else { std::cerr << "Error deleting the crawler. " << outcome.GetError().GetMessage() << std::endl; result = false; } } // 16. Delete the job script and run data from the S3 bucket. result &= AwsDoc::Glue::deleteAllObjectsInS3Bucket(bucketName, clientConfig); return result; } //! Routine which uploads a file to an S3 bucket. /*! \\sa uploadFile() \param bucketName: An S3 bucket created in the setup. \param filePath: The path of the file to upload. \param fileName The name for the uploaded file. \param clientConfig: AWS client configuration. \return bool: Successful completion. */ bool AwsDoc::Glue::uploadFile(const Aws::String &bucketName, const Aws::String &filePath, const Aws::String &fileName, const Aws::Client::ClientConfiguration &clientConfig) { Aws::S3::S3Client s3_client(clientConfig); Aws::S3::Model::PutObjectRequest request; request.SetBucket(bucketName); request.SetKey(fileName); std::shared_ptr<Aws::IOStream> inputData = Aws::MakeShared<Aws::FStream>("SampleAllocationTag", filePath.c_str(), std::ios_base::in | std::ios_base::binary); if (!*inputData) { std::cerr << "Error unable to read file " << filePath << std::endl; return false; } request.SetBody(inputData); Aws::S3::Model::PutObjectOutcome outcome = s3_client.PutObject(request); if (!outcome.IsSuccess()) { std::cerr << "Error: PutObject: " << outcome.GetError().GetMessage() << std::endl; } else { std::cout << "Added object '" << filePath << "' to bucket '" << bucketName << "'." << std::endl; } return outcome.IsSuccess(); } //! Routine which deletes all objects in an S3 bucket. /*! \\sa deleteAllObjectsInS3Bucket() \param bucketName: The S3 bucket name. \param clientConfig: AWS client configuration. \return bool: Successful completion. */ bool AwsDoc::Glue::deleteAllObjectsInS3Bucket(const Aws::String &bucketName, const Aws::Client::ClientConfiguration &clientConfig) { Aws::S3::S3Client client(clientConfig); Aws::S3::Model::ListObjectsV2Request listObjectsRequest; listObjectsRequest.SetBucket(bucketName); Aws::String continuationToken; // Used for pagination. bool result = true; do { if (!continuationToken.empty()) { listObjectsRequest.SetContinuationToken(continuationToken); } Aws::S3::Model::ListObjectsV2Outcome listObjectsOutcome = client.ListObjectsV2( listObjectsRequest); if (listObjectsOutcome.IsSuccess()) { const std::vector<Aws::S3::Model::Object> &objects = listObjectsOutcome.GetResult().GetContents(); if (!objects.empty()) { Aws::S3::Model::DeleteObjectsRequest deleteObjectsRequest; deleteObjectsRequest.SetBucket(bucketName); std::vector<Aws::S3::Model::ObjectIdentifier> objectIdentifiers; for (const Aws::S3::Model::Object &object: objects) { objectIdentifiers.push_back( Aws::S3::Model::ObjectIdentifier().WithKey( object.GetKey())); } Aws::S3::Model::Delete objectsDelete; objectsDelete.SetObjects(objectIdentifiers); objectsDelete.SetQuiet(true); deleteObjectsRequest.SetDelete(objectsDelete); Aws::S3::Model::DeleteObjectsOutcome deleteObjectsOutcome = client.DeleteObjects(deleteObjectsRequest); if (!deleteObjectsOutcome.IsSuccess()) { std::cerr << "Error deleting objects. " << deleteObjectsOutcome.GetError().GetMessage() << std::endl; result = false; break; } else { std::cout << "Successfully deleted the objects." << std::endl; } } else { std::cout << "No objects to delete in '" << bucketName << "'." << std::endl; } continuationToken = listObjectsOutcome.GetResult().GetNextContinuationToken(); } else { std::cerr << "Error listing objects. " << listObjectsOutcome.GetError().GetMessage() << std::endl; result = false; break; } } while (!continuationToken.empty()); return result; } //! Routine which retrieves an object from an S3 bucket. /*! \\sa getObjectFromBucket() \param bucketName: The S3 bucket name. \param objectKey: The object's name. \param objectStream: A stream to receive the retrieved data. \param clientConfig: AWS client configuration. \return bool: Successful completion. */ bool AwsDoc::Glue::getObjectFromBucket(const Aws::String &bucketName, const Aws::String &objectKey, std::ostream &objectStream, const Aws::Client::ClientConfiguration &clientConfig) { Aws::S3::S3Client client(clientConfig); Aws::S3::Model::GetObjectRequest request; request.SetBucket(bucketName); request.SetKey(objectKey); Aws::S3::Model::GetObjectOutcome outcome = client.GetObject(request); if (outcome.IsSuccess()) { std::cout << "Successfully retrieved '" << objectKey << "'." << std::endl; auto &body = outcome.GetResult().GetBody(); objectStream << body.rdbuf(); } else { std::cerr << "Error retrieving object. " << outcome.GetError().GetMessage() << std::endl; } return outcome.IsSuccess(); }
-
Para API obtener más información, consulte los siguientes temas en AWS SDK for C++ APIReference.
-
- Java
-
- SDKpara Java 2.x
-
nota
Hay más información. GitHub Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS
. /** * * Before running this Java V2 code example, set up your development * environment, including your credentials. * * For more information, see the following documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html * * To set up the resources, see this documentation topic: * * https://docs.aws.amazon.com/glue/latest/ug/tutorial-add-crawler.html * * This example performs the following tasks: * * 1. Create a database. * 2. Create a crawler. * 3. Get a crawler. * 4. Start a crawler. * 5. Get a database. * 6. Get tables. * 7. Create a job. * 8. Start a job run. * 9. List all jobs. * 10. Get job runs. * 11. Delete a job. * 12. Delete a database. * 13. Delete a crawler. */ public class GlueScenario { public static final String DASHES = new String(new char[80]).replace("\0", "-"); public static void main(String[] args) throws InterruptedException { final String usage = """ Usage: <iam> <s3Path> <cron> <dbName> <crawlerName> <jobName>\s Where: iam - The ARN of the IAM role that has AWS Glue and S3 permissions.\s s3Path - The Amazon Simple Storage Service (Amazon S3) target that contains data (for example, CSV data). cron - A cron expression used to specify the schedule (i.e., cron(15 12 * * ? *). dbName - The database name.\s crawlerName - The name of the crawler.\s jobName - The name you assign to this job definition. scriptLocation - The Amazon S3 path to a script that runs a job. locationUri - The location of the database bucketNameSc - The Amazon S3 bucket name used when creating a job """; if (args.length != 9) { System.out.println(usage); System.exit(1); } String iam = args[0]; String s3Path = args[1]; String cron = args[2]; String dbName = args[3]; String crawlerName = args[4]; String jobName = args[5]; String scriptLocation = args[6]; String locationUri = args[7]; String bucketNameSc = args[8]; Region region = Region.US_EAST_1; GlueClient glueClient = GlueClient.builder() .region(region) .build(); System.out.println(DASHES); System.out.println("Welcome to the AWS Glue scenario."); System.out.println(DASHES); System.out.println(DASHES); System.out.println("1. Create a database."); createDatabase(glueClient, dbName, locationUri); System.out.println(DASHES); System.out.println(DASHES); System.out.println("2. Create a crawler."); createGlueCrawler(glueClient, iam, s3Path, cron, dbName, crawlerName); System.out.println(DASHES); System.out.println(DASHES); System.out.println("3. Get a crawler."); getSpecificCrawler(glueClient, crawlerName); System.out.println(DASHES); System.out.println(DASHES); System.out.println("4. Start a crawler."); startSpecificCrawler(glueClient, crawlerName); System.out.println(DASHES); System.out.println(DASHES); System.out.println("5. Get a database."); getSpecificDatabase(glueClient, dbName); System.out.println(DASHES); System.out.println(DASHES); System.out.println("*** Wait 5 min for the tables to become available"); TimeUnit.MINUTES.sleep(5); System.out.println("6. Get tables."); String myTableName = getGlueTables(glueClient, dbName); System.out.println(DASHES); System.out.println(DASHES); System.out.println("7. Create a job."); createJob(glueClient, jobName, iam, scriptLocation); System.out.println(DASHES); System.out.println(DASHES); System.out.println("8. Start a Job run."); startJob(glueClient, jobName, dbName, myTableName, bucketNameSc); System.out.println(DASHES); System.out.println(DASHES); System.out.println("9. List all jobs."); getAllJobs(glueClient); System.out.println(DASHES); System.out.println(DASHES); System.out.println("10. Get job runs."); getJobRuns(glueClient, jobName); System.out.println(DASHES); System.out.println(DASHES); System.out.println("11. Delete a job."); deleteJob(glueClient, jobName); System.out.println("*** Wait 5 MIN for the " + crawlerName + " to stop"); TimeUnit.MINUTES.sleep(5); System.out.println(DASHES); System.out.println(DASHES); System.out.println("12. Delete a database."); deleteDatabase(glueClient, dbName); System.out.println(DASHES); System.out.println(DASHES); System.out.println("Delete a crawler."); deleteSpecificCrawler(glueClient, crawlerName); System.out.println(DASHES); System.out.println(DASHES); System.out.println("Successfully completed the AWS Glue Scenario"); System.out.println(DASHES); } public static void createDatabase(GlueClient glueClient, String dbName, String locationUri) { try { DatabaseInput input = DatabaseInput.builder() .description("Built with the AWS SDK for Java V2") .name(dbName) .locationUri(locationUri) .build(); CreateDatabaseRequest request = CreateDatabaseRequest.builder() .databaseInput(input) .build(); glueClient.createDatabase(request); System.out.println(dbName + " was successfully created"); } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } public static void createGlueCrawler(GlueClient glueClient, String iam, String s3Path, String cron, String dbName, String crawlerName) { try { S3Target s3Target = S3Target.builder() .path(s3Path) .build(); List<S3Target> targetList = new ArrayList<>(); targetList.add(s3Target); CrawlerTargets targets = CrawlerTargets.builder() .s3Targets(targetList) .build(); CreateCrawlerRequest crawlerRequest = CreateCrawlerRequest.builder() .databaseName(dbName) .name(crawlerName) .description("Created by the AWS Glue Java API") .targets(targets) .role(iam) .schedule(cron) .build(); glueClient.createCrawler(crawlerRequest); System.out.println(crawlerName + " was successfully created"); } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } public static void getSpecificCrawler(GlueClient glueClient, String crawlerName) { try { GetCrawlerRequest crawlerRequest = GetCrawlerRequest.builder() .name(crawlerName) .build(); boolean ready = false; while (!ready) { GetCrawlerResponse response = glueClient.getCrawler(crawlerRequest); String status = response.crawler().stateAsString(); if (status.compareTo("READY") == 0) { ready = true; } Thread.sleep(3000); } System.out.println("The crawler is now ready"); } catch (GlueException | InterruptedException e) { System.err.println(e.getMessage()); System.exit(1); } } public static void startSpecificCrawler(GlueClient glueClient, String crawlerName) { try { StartCrawlerRequest crawlerRequest = StartCrawlerRequest.builder() .name(crawlerName) .build(); glueClient.startCrawler(crawlerRequest); System.out.println(crawlerName + " was successfully started!"); } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } public static void getSpecificDatabase(GlueClient glueClient, String databaseName) { try { GetDatabaseRequest databasesRequest = GetDatabaseRequest.builder() .name(databaseName) .build(); GetDatabaseResponse response = glueClient.getDatabase(databasesRequest); Instant createDate = response.database().createTime(); // Convert the Instant to readable date. DateTimeFormatter formatter = DateTimeFormatter.ofLocalizedDateTime(FormatStyle.SHORT) .withLocale(Locale.US) .withZone(ZoneId.systemDefault()); formatter.format(createDate); System.out.println("The create date of the database is " + createDate); } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } public static String getGlueTables(GlueClient glueClient, String dbName) { String myTableName = ""; try { GetTablesRequest tableRequest = GetTablesRequest.builder() .databaseName(dbName) .build(); GetTablesResponse response = glueClient.getTables(tableRequest); List<Table> tables = response.tableList(); if (tables.isEmpty()) { System.out.println("No tables were returned"); } else { for (Table table : tables) { myTableName = table.name(); System.out.println("Table name is: " + myTableName); } } } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } return myTableName; } public static void startJob(GlueClient glueClient, String jobName, String inputDatabase, String inputTable, String outBucket) { try { Map<String, String> myMap = new HashMap<>(); myMap.put("--input_database", inputDatabase); myMap.put("--input_table", inputTable); myMap.put("--output_bucket_url", outBucket); StartJobRunRequest runRequest = StartJobRunRequest.builder() .workerType(WorkerType.G_1_X) .numberOfWorkers(10) .arguments(myMap) .jobName(jobName) .build(); StartJobRunResponse response = glueClient.startJobRun(runRequest); System.out.println("The request Id of the job is " + response.responseMetadata().requestId()); } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } public static void createJob(GlueClient glueClient, String jobName, String iam, String scriptLocation) { try { JobCommand command = JobCommand.builder() .pythonVersion("3") .name("glueetl") .scriptLocation(scriptLocation) .build(); CreateJobRequest jobRequest = CreateJobRequest.builder() .description("A Job created by using the AWS SDK for Java V2") .glueVersion("2.0") .workerType(WorkerType.G_1_X) .numberOfWorkers(10) .name(jobName) .role(iam) .command(command) .build(); glueClient.createJob(jobRequest); System.out.println(jobName + " was successfully created."); } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } public static void getAllJobs(GlueClient glueClient) { try { GetJobsRequest jobsRequest = GetJobsRequest.builder() .maxResults(10) .build(); GetJobsResponse jobsResponse = glueClient.getJobs(jobsRequest); List<Job> jobs = jobsResponse.jobs(); for (Job job : jobs) { System.out.println("Job name is : " + job.name()); System.out.println("The job worker type is : " + job.workerType().name()); } } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } public static void getJobRuns(GlueClient glueClient, String jobName) { try { GetJobRunsRequest runsRequest = GetJobRunsRequest.builder() .jobName(jobName) .maxResults(20) .build(); boolean jobDone = false; while (!jobDone) { GetJobRunsResponse response = glueClient.getJobRuns(runsRequest); List<JobRun> jobRuns = response.jobRuns(); for (JobRun jobRun : jobRuns) { String jobState = jobRun.jobRunState().name(); if (jobState.compareTo("SUCCEEDED") == 0) { System.out.println(jobName + " has succeeded"); jobDone = true; } else if (jobState.compareTo("STOPPED") == 0) { System.out.println("Job run has stopped"); jobDone = true; } else if (jobState.compareTo("FAILED") == 0) { System.out.println("Job run has failed"); jobDone = true; } else if (jobState.compareTo("TIMEOUT") == 0) { System.out.println("Job run has timed out"); jobDone = true; } else { System.out.println("*** Job run state is " + jobRun.jobRunState().name()); System.out.println("Job run Id is " + jobRun.id()); System.out.println("The Glue version is " + jobRun.glueVersion()); } TimeUnit.SECONDS.sleep(5); } } } catch (GlueException | InterruptedException e) { System.err.println(e.getMessage()); System.exit(1); } } public static void deleteJob(GlueClient glueClient, String jobName) { try { DeleteJobRequest jobRequest = DeleteJobRequest.builder() .jobName(jobName) .build(); glueClient.deleteJob(jobRequest); System.out.println(jobName + " was successfully deleted"); } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } public static void deleteDatabase(GlueClient glueClient, String databaseName) { try { DeleteDatabaseRequest request = DeleteDatabaseRequest.builder() .name(databaseName) .build(); glueClient.deleteDatabase(request); System.out.println(databaseName + " was successfully deleted"); } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } public static void deleteSpecificCrawler(GlueClient glueClient, String crawlerName) { try { DeleteCrawlerRequest deleteCrawlerRequest = DeleteCrawlerRequest.builder() .name(crawlerName) .build(); glueClient.deleteCrawler(deleteCrawlerRequest); System.out.println(crawlerName + " was deleted"); } catch (GlueException e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } }
-
Para API obtener más información, consulte los siguientes temas en AWS SDK for Java 2.x APIReference.
-
- JavaScript
-
- SDKpara JavaScript (v3)
-
nota
Hay más información. GitHub Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS
. Cree y ejecute un rastreador que rastree un bucket público de Amazon Simple Storage Service (Amazon S3) y genere una base de datos de metadatos que describa los CSV datos con formato B que encuentre.
const createCrawler = (name, role, dbName, tablePrefix, s3TargetPath) => { const client = new GlueClient({}); const command = new CreateCrawlerCommand({ Name: name, Role: role, DatabaseName: dbName, TablePrefix: tablePrefix, Targets: { S3Targets: [{ Path: s3TargetPath }], }, }); return client.send(command); }; const getCrawler = (name) => { const client = new GlueClient({}); const command = new GetCrawlerCommand({ Name: name, }); return client.send(command); }; const startCrawler = (name) => { const client = new GlueClient({}); const command = new StartCrawlerCommand({ Name: name, }); return client.send(command); }; const crawlerExists = async ({ getCrawler }, crawlerName) => { try { await getCrawler(crawlerName); return true; } catch { return false; } }; /** * @param {{ createCrawler: import('../../../actions/create-crawler.js').createCrawler}} actions */ const makeCreateCrawlerStep = (actions) => async (context) => { if (await crawlerExists(actions, process.env.CRAWLER_NAME)) { log("Crawler already exists. Skipping creation."); } else { await actions.createCrawler( process.env.CRAWLER_NAME, process.env.ROLE_NAME, process.env.DATABASE_NAME, process.env.TABLE_PREFIX, process.env.S3_TARGET_PATH, ); log("Crawler created successfully.", { type: "success" }); } return { ...context }; }; /** * @param {(name: string) => Promise<import('@aws-sdk/client-glue').GetCrawlerCommandOutput>} getCrawler * @param {string} crawlerName */ const waitForCrawler = async (getCrawler, crawlerName) => { const waitTimeInSeconds = 30; const { Crawler } = await getCrawler(crawlerName); if (!Crawler) { throw new Error(`Crawler with name ${crawlerName} not found.`); } if (Crawler.State === "READY") { return; } log(`Crawler is ${Crawler.State}. Waiting ${waitTimeInSeconds} seconds...`); await wait(waitTimeInSeconds); return waitForCrawler(getCrawler, crawlerName); }; const makeStartCrawlerStep = ({ startCrawler, getCrawler }) => async (context) => { log("Starting crawler."); await startCrawler(process.env.CRAWLER_NAME); log("Crawler started.", { type: "success" }); log("Waiting for crawler to finish running. This can take a while."); await waitForCrawler(getCrawler, process.env.CRAWLER_NAME); log("Crawler ready.", { type: "success" }); return { ...context }; };
Enumere la información sobre las bases de datos y las tablas de su. AWS Glue Data Catalog
const getDatabase = (name) => { const client = new GlueClient({}); const command = new GetDatabaseCommand({ Name: name, }); return client.send(command); }; const getTables = (databaseName) => { const client = new GlueClient({}); const command = new GetTablesCommand({ DatabaseName: databaseName, }); return client.send(command); }; const makeGetDatabaseStep = ({ getDatabase }) => async (context) => { const { Database: { Name }, } = await getDatabase(process.env.DATABASE_NAME); log(`Database: ${Name}`); return { ...context }; }; /** * @param {{ getTables: () => Promise<import('@aws-sdk/client-glue').GetTablesCommandOutput}} config */ const makeGetTablesStep = ({ getTables }) => async (context) => { const { TableList } = await getTables(process.env.DATABASE_NAME); log("Tables:"); log(TableList.map((table) => ` • ${table.Name}\n`)); return { ...context }; };
Cree y ejecute un trabajo que extraiga CSV datos del bucket de Amazon S3 de origen, los transforme eliminando campos y cambiándoles el nombre, y cargue la salida JSON con formato en otro bucket de Amazon S3.
const createJob = (name, role, scriptBucketName, scriptKey) => { const client = new GlueClient({}); const command = new CreateJobCommand({ Name: name, Role: role, Command: { Name: "glueetl", PythonVersion: "3", ScriptLocation: `s3://${scriptBucketName}/${scriptKey}`, }, GlueVersion: "3.0", }); return client.send(command); }; const startJobRun = (jobName, dbName, tableName, bucketName) => { const client = new GlueClient({}); const command = new StartJobRunCommand({ JobName: jobName, Arguments: { "--input_database": dbName, "--input_table": tableName, "--output_bucket_url": `s3://${bucketName}/`, }, }); return client.send(command); }; const makeCreateJobStep = ({ createJob }) => async (context) => { log("Creating Job."); await createJob( process.env.JOB_NAME, process.env.ROLE_NAME, process.env.BUCKET_NAME, process.env.PYTHON_SCRIPT_KEY, ); log("Job created.", { type: "success" }); return { ...context }; }; /** * @param {(name: string, runId: string) => Promise<import('@aws-sdk/client-glue').GetJobRunCommandOutput> } getJobRun * @param {string} jobName * @param {string} jobRunId */ const waitForJobRun = async (getJobRun, jobName, jobRunId) => { const waitTimeInSeconds = 30; const { JobRun } = await getJobRun(jobName, jobRunId); if (!JobRun) { throw new Error(`Job run with id ${jobRunId} not found.`); } switch (JobRun.JobRunState) { case "FAILED": case "TIMEOUT": case "STOPPED": throw new Error( `Job ${JobRun.JobRunState}. Error: ${JobRun.ErrorMessage}`, ); case "RUNNING": break; case "SUCCEEDED": return; default: throw new Error(`Unknown job run state: ${JobRun.JobRunState}`); } log( `Job ${JobRun.JobRunState}. Waiting ${waitTimeInSeconds} more seconds...`, ); await wait(waitTimeInSeconds); return waitForJobRun(getJobRun, jobName, jobRunId); }; /** * @param {{ prompter: { prompt: () => Promise<{ shouldOpen: boolean }>} }} context */ const promptToOpen = async (context) => { const { shouldOpen } = await context.prompter.prompt({ name: "shouldOpen", type: "confirm", message: "Open the output bucket in your browser?", }); if (shouldOpen) { return open( `https://s3.console.aws.amazon.com/s3/buckets/${process.env.BUCKET_NAME} to view the output.`, ); } }; const makeStartJobRunStep = ({ startJobRun, getJobRun }) => async (context) => { log("Starting job."); const { JobRunId } = await startJobRun( process.env.JOB_NAME, process.env.DATABASE_NAME, process.env.TABLE_NAME, process.env.BUCKET_NAME, ); log("Job started.", { type: "success" }); log("Waiting for job to finish running. This can take a while."); await waitForJobRun(getJobRun, process.env.JOB_NAME, JobRunId); log("Job run succeeded.", { type: "success" }); await promptToOpen(context); return { ...context }; };
Incluya información sobre las ejecuciones de trabajos y vea algunos de los datos transformados.
const getJobRuns = (jobName) => { const client = new GlueClient({}); const command = new GetJobRunsCommand({ JobName: jobName, }); return client.send(command); }; const getJobRun = (jobName, jobRunId) => { const client = new GlueClient({}); const command = new GetJobRunCommand({ JobName: jobName, RunId: jobRunId, }); return client.send(command); }; /** * @typedef {{ prompter: { prompt: () => Promise<{jobName: string}> } }} Context */ /** * @typedef {() => Promise<import('@aws-sdk/client-glue').GetJobRunCommandOutput>} getJobRun */ /** * @typedef {() => Promise<import('@aws-sdk/client-glue').GetJobRunsCommandOutput} getJobRuns */ /** * * @param {getJobRun} getJobRun * @param {string} jobName * @param {string} jobRunId */ const logJobRunDetails = async (getJobRun, jobName, jobRunId) => { const { JobRun } = await getJobRun(jobName, jobRunId); log(JobRun, { type: "object" }); }; /** * * @param {{getJobRuns: getJobRuns, getJobRun: getJobRun }} funcs */ const makePickJobRunStep = ({ getJobRuns, getJobRun }) => async (/** @type { Context } */ context) => { if (context.selectedJobName) { const { JobRuns } = await getJobRuns(context.selectedJobName); const { jobRunId } = await context.prompter.prompt({ name: "jobRunId", type: "list", message: "Select a job run to see details.", choices: JobRuns.map((run) => run.Id), }); logJobRunDetails(getJobRun, context.selectedJobName, jobRunId); } return { ...context }; };
Elimine todos los recursos creados en la demostración.
const deleteJob = (jobName) => { const client = new GlueClient({}); const command = new DeleteJobCommand({ JobName: jobName, }); return client.send(command); }; const deleteTable = (databaseName, tableName) => { const client = new GlueClient({}); const command = new DeleteTableCommand({ DatabaseName: databaseName, Name: tableName, }); return client.send(command); }; const deleteDatabase = (databaseName) => { const client = new GlueClient({}); const command = new DeleteDatabaseCommand({ Name: databaseName, }); return client.send(command); }; const deleteCrawler = (crawlerName) => { const client = new GlueClient({}); const command = new DeleteCrawlerCommand({ Name: crawlerName, }); return client.send(command); }; /** * * @param {import('../../../actions/delete-job.js').deleteJob} deleteJobFn * @param {string[]} jobNames * @param {{ prompter: { prompt: () => Promise<any> }}} context */ const handleDeleteJobs = async (deleteJobFn, jobNames, context) => { /** * @type {{ selectedJobNames: string[] }} */ const { selectedJobNames } = await context.prompter.prompt({ name: "selectedJobNames", type: "checkbox", message: "Let's clean up jobs. Select jobs to delete.", choices: jobNames, }); if (selectedJobNames.length === 0) { log("No jobs selected."); } else { log("Deleting jobs."); await Promise.all( selectedJobNames.map((n) => deleteJobFn(n).catch(console.error)), ); log("Jobs deleted.", { type: "success" }); } }; /** * @param {{ * listJobs: import('../../../actions/list-jobs.js').listJobs, * deleteJob: import('../../../actions/delete-job.js').deleteJob * }} config */ const makeCleanUpJobsStep = ({ listJobs, deleteJob }) => async (context) => { const { JobNames } = await listJobs(); if (JobNames.length > 0) { await handleDeleteJobs(deleteJob, JobNames, context); } return { ...context }; }; /** * @param {import('../../../actions/delete-table.js').deleteTable} deleteTable * @param {string} databaseName * @param {string[]} tableNames */ const deleteTables = (deleteTable, databaseName, tableNames) => Promise.all( tableNames.map((tableName) => deleteTable(databaseName, tableName).catch(console.error), ), ); /** * @param {{ * getTables: import('../../../actions/get-tables.js').getTables, * deleteTable: import('../../../actions/delete-table.js').deleteTable * }} config */ const makeCleanUpTablesStep = ({ getTables, deleteTable }) => /** * @param {{ prompter: { prompt: () => Promise<any>}}} context */ async (context) => { const { TableList } = await getTables(process.env.DATABASE_NAME).catch( () => ({ TableList: null }), ); if (TableList && TableList.length > 0) { /** * @type {{ tableNames: string[] }} */ const { tableNames } = await context.prompter.prompt({ name: "tableNames", type: "checkbox", message: "Let's clean up tables. Select tables to delete.", choices: TableList.map((t) => t.Name), }); if (tableNames.length === 0) { log("No tables selected."); } else { log("Deleting tables."); await deleteTables(deleteTable, process.env.DATABASE_NAME, tableNames); log("Tables deleted.", { type: "success" }); } } return { ...context }; }; /** * @param {import('../../../actions/delete-database.js').deleteDatabase} deleteDatabase * @param {string[]} databaseNames */ const deleteDatabases = (deleteDatabase, databaseNames) => Promise.all( databaseNames.map((dbName) => deleteDatabase(dbName).catch(console.error)), ); /** * @param {{ * getDatabases: import('../../../actions/get-databases.js').getDatabases * deleteDatabase: import('../../../actions/delete-database.js').deleteDatabase * }} config */ const makeCleanUpDatabasesStep = ({ getDatabases, deleteDatabase }) => /** * @param {{ prompter: { prompt: () => Promise<any>}} context */ async (context) => { const { DatabaseList } = await getDatabases(); if (DatabaseList.length > 0) { /** @type {{ dbNames: string[] }} */ const { dbNames } = await context.prompter.prompt({ name: "dbNames", type: "checkbox", message: "Let's clean up databases. Select databases to delete.", choices: DatabaseList.map((db) => db.Name), }); if (dbNames.length === 0) { log("No databases selected."); } else { log("Deleting databases."); await deleteDatabases(deleteDatabase, dbNames); log("Databases deleted.", { type: "success" }); } } return { ...context }; }; const cleanUpCrawlerStep = async (context) => { log(`Deleting crawler.`); try { await deleteCrawler(process.env.CRAWLER_NAME); log("Crawler deleted.", { type: "success" }); } catch (err) { if (err.name === "EntityNotFoundException") { log(`Crawler is already deleted.`); } else { throw err; } } return { ...context }; };
-
Para API obtener más información, consulte los siguientes temas en la sección de referencia.AWS SDK for JavaScript API
-
- Kotlin
-
- SDKpara Kotlin
-
nota
Hay más información. GitHub Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS
. suspend fun main(args: Array<String>) { val usage = """ Usage: <iam> <s3Path> <cron> <dbName> <crawlerName> <jobName> <scriptLocation> <locationUri> Where: iam - The Amazon Resource Name (ARN) of the AWS Identity and Access Management (IAM) role that has AWS Glue and Amazon Simple Storage Service (Amazon S3) permissions. s3Path - The Amazon Simple Storage Service (Amazon S3) target that contains data (for example, CSV data). cron - A cron expression used to specify the schedule (for example, cron(15 12 * * ? *). dbName - The database name. crawlerName - The name of the crawler. jobName - The name you assign to this job definition. scriptLocation - Specifies the Amazon S3 path to a script that runs a job. locationUri - Specifies the location of the database """ if (args.size != 8) { println(usage) exitProcess(1) } val iam = args[0] val s3Path = args[1] val cron = args[2] val dbName = args[3] val crawlerName = args[4] val jobName = args[5] val scriptLocation = args[6] val locationUri = args[7] println("About to start the AWS Glue Scenario") createDatabase(dbName, locationUri) createCrawler(iam, s3Path, cron, dbName, crawlerName) getCrawler(crawlerName) startCrawler(crawlerName) getDatabase(dbName) getGlueTables(dbName) createJob(jobName, iam, scriptLocation) startJob(jobName) getJobs() getJobRuns(jobName) deleteJob(jobName) println("*** Wait for 5 MIN so the $crawlerName is ready to be deleted") TimeUnit.MINUTES.sleep(5) deleteMyDatabase(dbName) deleteCrawler(crawlerName) } suspend fun createDatabase( dbName: String?, locationUriVal: String?, ) { val input = DatabaseInput { description = "Built with the AWS SDK for Kotlin" name = dbName locationUri = locationUriVal } val request = CreateDatabaseRequest { databaseInput = input } GlueClient { region = "us-east-1" }.use { glueClient -> glueClient.createDatabase(request) println("The database was successfully created") } } suspend fun createCrawler( iam: String?, s3Path: String?, cron: String?, dbName: String?, crawlerName: String, ) { val s3Target = S3Target { path = s3Path } val targetList = ArrayList<S3Target>() targetList.add(s3Target) val targetOb = CrawlerTargets { s3Targets = targetList } val crawlerRequest = CreateCrawlerRequest { databaseName = dbName name = crawlerName description = "Created by the AWS Glue Java API" targets = targetOb role = iam schedule = cron } GlueClient { region = "us-east-1" }.use { glueClient -> glueClient.createCrawler(crawlerRequest) println("$crawlerName was successfully created") } } suspend fun getCrawler(crawlerName: String?) { val request = GetCrawlerRequest { name = crawlerName } GlueClient { region = "us-east-1" }.use { glueClient -> val response = glueClient.getCrawler(request) val role = response.crawler?.role println("The role associated with this crawler is $role") } } suspend fun startCrawler(crawlerName: String) { val crawlerRequest = StartCrawlerRequest { name = crawlerName } GlueClient { region = "us-east-1" }.use { glueClient -> glueClient.startCrawler(crawlerRequest) println("$crawlerName was successfully started.") } } suspend fun getDatabase(databaseName: String?) { val request = GetDatabaseRequest { name = databaseName } GlueClient { region = "us-east-1" }.use { glueClient -> val response = glueClient.getDatabase(request) val dbDesc = response.database?.description println("The database description is $dbDesc") } } suspend fun getGlueTables(dbName: String?) { val tableRequest = GetTablesRequest { databaseName = dbName } GlueClient { region = "us-east-1" }.use { glueClient -> val response = glueClient.getTables(tableRequest) response.tableList?.forEach { tableName -> println("Table name is ${tableName.name}") } } } suspend fun startJob(jobNameVal: String?) { val runRequest = StartJobRunRequest { workerType = WorkerType.G1X numberOfWorkers = 10 jobName = jobNameVal } GlueClient { region = "us-east-1" }.use { glueClient -> val response = glueClient.startJobRun(runRequest) println("The job run Id is ${response.jobRunId}") } } suspend fun createJob( jobName: String, iam: String?, scriptLocationVal: String?, ) { val commandOb = JobCommand { pythonVersion = "3" name = "MyJob1" scriptLocation = scriptLocationVal } val jobRequest = CreateJobRequest { description = "A Job created by using the AWS SDK for Java V2" glueVersion = "2.0" workerType = WorkerType.G1X numberOfWorkers = 10 name = jobName role = iam command = commandOb } GlueClient { region = "us-east-1" }.use { glueClient -> glueClient.createJob(jobRequest) println("$jobName was successfully created.") } } suspend fun getJobs() { val request = GetJobsRequest { maxResults = 10 } GlueClient { region = "us-east-1" }.use { glueClient -> val response = glueClient.getJobs(request) response.jobs?.forEach { job -> println("Job name is ${job.name}") } } } suspend fun getJobRuns(jobNameVal: String?) { val request = GetJobRunsRequest { jobName = jobNameVal } GlueClient { region = "us-east-1" }.use { glueClient -> val response = glueClient.getJobRuns(request) response.jobRuns?.forEach { job -> println("Job name is ${job.jobName}") } } } suspend fun deleteJob(jobNameVal: String) { val jobRequest = DeleteJobRequest { jobName = jobNameVal } GlueClient { region = "us-east-1" }.use { glueClient -> glueClient.deleteJob(jobRequest) println("$jobNameVal was successfully deleted") } } suspend fun deleteMyDatabase(databaseName: String) { val request = DeleteDatabaseRequest { name = databaseName } GlueClient { region = "us-east-1" }.use { glueClient -> glueClient.deleteDatabase(request) println("$databaseName was successfully deleted") } } suspend fun deleteCrawler(crawlerName: String) { val request = DeleteCrawlerRequest { name = crawlerName } GlueClient { region = "us-east-1" }.use { glueClient -> glueClient.deleteCrawler(request) println("$crawlerName was deleted") } }
-
Para API obtener más información, consulta los siguientes temas en la sección AWS SDKde API referencia sobre Kotlin.
-
- PHP
-
- SDK para PHP
-
nota
Hay más información sobre. GitHub Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS
. namespace Glue; use Aws\Glue\GlueClient; use Aws\S3\S3Client; use AwsUtilities\AWSServiceClass; use GuzzleHttp\Psr7\Stream; use Iam\IAMService; class GettingStartedWithGlue { public function run() { echo("\n"); echo("--------------------------------------\n"); print("Welcome to the AWS Glue getting started demo using PHP!\n"); echo("--------------------------------------\n"); $clientArgs = [ 'region' => 'us-west-2', 'version' => 'latest', 'profile' => 'default', ]; $uniqid = uniqid(); $glueClient = new GlueClient($clientArgs); $glueService = new GlueService($glueClient); $iamService = new IAMService(); $crawlerName = "example-crawler-test-" . $uniqid; AWSServiceClass::$waitTime = 5; AWSServiceClass::$maxWaitAttempts = 20; $role = $iamService->getRole("AWSGlueServiceRole-DocExample"); $databaseName = "doc-example-database-$uniqid"; $path = 's3://crawler-public-us-east-1/flight/2016/csv'; $glueService->createCrawler($crawlerName, $role['Role']['Arn'], $databaseName, $path); $glueService->startCrawler($crawlerName); echo "Waiting for crawler"; do { $crawler = $glueService->getCrawler($crawlerName); echo "."; sleep(10); } while ($crawler['Crawler']['State'] != "READY"); echo "\n"; $database = $glueService->getDatabase($databaseName); echo "Found a database named " . $database['Database']['Name'] . "\n"; //Upload job script $s3client = new S3Client($clientArgs); $bucketName = "test-glue-bucket-" . $uniqid; $s3client->createBucket([ 'Bucket' => $bucketName, 'CreateBucketConfiguration' => ['LocationConstraint' => 'us-west-2'], ]); $s3client->putObject([ 'Bucket' => $bucketName, 'Key' => 'run_job.py', 'SourceFile' => __DIR__ . '/flight_etl_job_script.py' ]); $s3client->putObject([ 'Bucket' => $bucketName, 'Key' => 'setup_scenario_getting_started.yaml', 'SourceFile' => __DIR__ . '/setup_scenario_getting_started.yaml' ]); $tables = $glueService->getTables($databaseName); $jobName = 'test-job-' . $uniqid; $scriptLocation = "s3://$bucketName/run_job.py"; $job = $glueService->createJob($jobName, $role['Role']['Arn'], $scriptLocation); $outputBucketUrl = "s3://$bucketName"; $runId = $glueService->startJobRun($jobName, $databaseName, $tables, $outputBucketUrl)['JobRunId']; echo "waiting for job"; do { $jobRun = $glueService->getJobRun($jobName, $runId); echo "."; sleep(10); } while (!array_intersect([$jobRun['JobRun']['JobRunState']], ['SUCCEEDED', 'STOPPED', 'FAILED', 'TIMEOUT'])); echo "\n"; $jobRuns = $glueService->getJobRuns($jobName); $objects = $s3client->listObjects([ 'Bucket' => $bucketName, ])['Contents']; foreach ($objects as $object) { echo $object['Key'] . "\n"; } echo "Downloading " . $objects[1]['Key'] . "\n"; /** @var Stream $downloadObject */ $downloadObject = $s3client->getObject([ 'Bucket' => $bucketName, 'Key' => $objects[1]['Key'], ])['Body']->getContents(); echo "Here is the first 1000 characters in the object."; echo substr($downloadObject, 0, 1000); $jobs = $glueService->listJobs(); echo "Current jobs:\n"; foreach ($jobs['JobNames'] as $jobsName) { echo "{$jobsName}\n"; } echo "Delete the job.\n"; $glueClient->deleteJob([ 'JobName' => $job['Name'], ]); echo "Delete the tables.\n"; foreach ($tables['TableList'] as $table) { $glueService->deleteTable($table['Name'], $databaseName); } echo "Delete the databases.\n"; $glueClient->deleteDatabase([ 'Name' => $databaseName, ]); echo "Delete the crawler.\n"; $glueClient->deleteCrawler([ 'Name' => $crawlerName, ]); $deleteObjects = $s3client->listObjectsV2([ 'Bucket' => $bucketName, ]); echo "Delete all objects in the bucket.\n"; $deleteObjects = $s3client->deleteObjects([ 'Bucket' => $bucketName, 'Delete' => [ 'Objects' => $deleteObjects['Contents'], ] ]); echo "Delete the bucket.\n"; $s3client->deleteBucket(['Bucket' => $bucketName]); echo "This job was brought to you by the number $uniqid\n"; } } namespace Glue; use Aws\Glue\GlueClient; use Aws\Result; use function PHPUnit\Framework\isEmpty; class GlueService extends \AwsUtilities\AWSServiceClass { protected GlueClient $glueClient; public function __construct($glueClient) { $this->glueClient = $glueClient; } public function getCrawler($crawlerName) { return $this->customWaiter(function () use ($crawlerName) { return $this->glueClient->getCrawler([ 'Name' => $crawlerName, ]); }); } public function createCrawler($crawlerName, $role, $databaseName, $path): Result { return $this->customWaiter(function () use ($crawlerName, $role, $databaseName, $path) { return $this->glueClient->createCrawler([ 'Name' => $crawlerName, 'Role' => $role, 'DatabaseName' => $databaseName, 'Targets' => [ 'S3Targets' => [[ 'Path' => $path, ]] ], ]); }); } public function startCrawler($crawlerName): Result { return $this->glueClient->startCrawler([ 'Name' => $crawlerName, ]); } public function getDatabase(string $databaseName): Result { return $this->customWaiter(function () use ($databaseName) { return $this->glueClient->getDatabase([ 'Name' => $databaseName, ]); }); } public function getTables($databaseName): Result { return $this->glueClient->getTables([ 'DatabaseName' => $databaseName, ]); } public function createJob($jobName, $role, $scriptLocation, $pythonVersion = '3', $glueVersion = '3.0'): Result { return $this->glueClient->createJob([ 'Name' => $jobName, 'Role' => $role, 'Command' => [ 'Name' => 'glueetl', 'ScriptLocation' => $scriptLocation, 'PythonVersion' => $pythonVersion, ], 'GlueVersion' => $glueVersion, ]); } public function startJobRun($jobName, $databaseName, $tables, $outputBucketUrl): Result { return $this->glueClient->startJobRun([ 'JobName' => $jobName, 'Arguments' => [ 'input_database' => $databaseName, 'input_table' => $tables['TableList'][0]['Name'], 'output_bucket_url' => $outputBucketUrl, '--input_database' => $databaseName, '--input_table' => $tables['TableList'][0]['Name'], '--output_bucket_url' => $outputBucketUrl, ], ]); } public function listJobs($maxResults = null, $nextToken = null, $tags = []): Result { $arguments = []; if ($maxResults) { $arguments['MaxResults'] = $maxResults; } if ($nextToken) { $arguments['NextToken'] = $nextToken; } if (!empty($tags)) { $arguments['Tags'] = $tags; } return $this->glueClient->listJobs($arguments); } public function getJobRuns($jobName, $maxResults = 0, $nextToken = ''): Result { $arguments = ['JobName' => $jobName]; if ($maxResults) { $arguments['MaxResults'] = $maxResults; } if ($nextToken) { $arguments['NextToken'] = $nextToken; } return $this->glueClient->getJobRuns($arguments); } public function getJobRun($jobName, $runId, $predecessorsIncluded = false): Result { return $this->glueClient->getJobRun([ 'JobName' => $jobName, 'RunId' => $runId, 'PredecessorsIncluded' => $predecessorsIncluded, ]); } public function deleteJob($jobName) { return $this->glueClient->deleteJob([ 'JobName' => $jobName, ]); } public function deleteTable($tableName, $databaseName) { return $this->glueClient->deleteTable([ 'DatabaseName' => $databaseName, 'Name' => $tableName, ]); } public function deleteDatabase($databaseName) { return $this->glueClient->deleteDatabase([ 'Name' => $databaseName, ]); } public function deleteCrawler($crawlerName) { return $this->glueClient->deleteCrawler([ 'Name' => $crawlerName, ]); } }
-
Para API obtener más información, consulte los siguientes temas en AWS SDK for PHP APIReference.
-
- Python
-
- SDKpara Python (Boto3)
-
nota
Hay más información. GitHub Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS
. Cree una clase que agrupe AWS Glue las funciones utilizadas en el escenario.
class GlueWrapper: """Encapsulates AWS Glue actions.""" def __init__(self, glue_client): """ :param glue_client: A Boto3 Glue client. """ self.glue_client = glue_client def get_crawler(self, name): """ Gets information about a crawler. :param name: The name of the crawler to look up. :return: Data about the crawler. """ crawler = None try: response = self.glue_client.get_crawler(Name=name) crawler = response["Crawler"] except ClientError as err: if err.response["Error"]["Code"] == "EntityNotFoundException": logger.info("Crawler %s doesn't exist.", name) else: logger.error( "Couldn't get crawler %s. Here's why: %s: %s", name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise return crawler def create_crawler(self, name, role_arn, db_name, db_prefix, s3_target): """ Creates a crawler that can crawl the specified target and populate a database in your AWS Glue Data Catalog with metadata that describes the data in the target. :param name: The name of the crawler. :param role_arn: The Amazon Resource Name (ARN) of an AWS Identity and Access Management (IAM) role that grants permission to let AWS Glue access the resources it needs. :param db_name: The name to give the database that is created by the crawler. :param db_prefix: The prefix to give any database tables that are created by the crawler. :param s3_target: The URL to an S3 bucket that contains data that is the target of the crawler. """ try: self.glue_client.create_crawler( Name=name, Role=role_arn, DatabaseName=db_name, TablePrefix=db_prefix, Targets={"S3Targets": [{"Path": s3_target}]}, ) except ClientError as err: logger.error( "Couldn't create crawler. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def start_crawler(self, name): """ Starts a crawler. The crawler crawls its configured target and creates metadata that describes the data it finds in the target data source. :param name: The name of the crawler to start. """ try: self.glue_client.start_crawler(Name=name) except ClientError as err: logger.error( "Couldn't start crawler %s. Here's why: %s: %s", name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def get_database(self, name): """ Gets information about a database in your Data Catalog. :param name: The name of the database to look up. :return: Information about the database. """ try: response = self.glue_client.get_database(Name=name) except ClientError as err: logger.error( "Couldn't get database %s. Here's why: %s: %s", name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return response["Database"] def get_tables(self, db_name): """ Gets a list of tables in a Data Catalog database. :param db_name: The name of the database to query. :return: The list of tables in the database. """ try: response = self.glue_client.get_tables(DatabaseName=db_name) except ClientError as err: logger.error( "Couldn't get tables %s. Here's why: %s: %s", db_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return response["TableList"] def create_job(self, name, description, role_arn, script_location): """ Creates a job definition for an extract, transform, and load (ETL) job that can be run by AWS Glue. :param name: The name of the job definition. :param description: The description of the job definition. :param role_arn: The ARN of an IAM role that grants AWS Glue the permissions it requires to run the job. :param script_location: The Amazon S3 URL of a Python ETL script that is run as part of the job. The script defines how the data is transformed. """ try: self.glue_client.create_job( Name=name, Description=description, Role=role_arn, Command={ "Name": "glueetl", "ScriptLocation": script_location, "PythonVersion": "3", }, GlueVersion="3.0", ) except ClientError as err: logger.error( "Couldn't create job %s. Here's why: %s: %s", name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def start_job_run(self, name, input_database, input_table, output_bucket_name): """ Starts a job run. A job run extracts data from the source, transforms it, and loads it to the output bucket. :param name: The name of the job definition. :param input_database: The name of the metadata database that contains tables that describe the source data. This is typically created by a crawler. :param input_table: The name of the table in the metadata database that describes the source data. :param output_bucket_name: The S3 bucket where the output is written. :return: The ID of the job run. """ try: # The custom Arguments that are passed to this function are used by the # Python ETL script to determine the location of input and output data. response = self.glue_client.start_job_run( JobName=name, Arguments={ "--input_database": input_database, "--input_table": input_table, "--output_bucket_url": f"s3://{output_bucket_name}/", }, ) except ClientError as err: logger.error( "Couldn't start job run %s. Here's why: %s: %s", name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return response["JobRunId"] def list_jobs(self): """ Lists the names of job definitions in your account. :return: The list of job definition names. """ try: response = self.glue_client.list_jobs() except ClientError as err: logger.error( "Couldn't list jobs. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return response["JobNames"] def get_job_runs(self, job_name): """ Gets information about runs that have been performed for a specific job definition. :param job_name: The name of the job definition to look up. :return: The list of job runs. """ try: response = self.glue_client.get_job_runs(JobName=job_name) except ClientError as err: logger.error( "Couldn't get job runs for %s. Here's why: %s: %s", job_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return response["JobRuns"] def get_job_run(self, name, run_id): """ Gets information about a single job run. :param name: The name of the job definition for the run. :param run_id: The ID of the run. :return: Information about the run. """ try: response = self.glue_client.get_job_run(JobName=name, RunId=run_id) except ClientError as err: logger.error( "Couldn't get job run %s/%s. Here's why: %s: %s", name, run_id, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return response["JobRun"] def delete_job(self, job_name): """ Deletes a job definition. This also deletes data about all runs that are associated with this job definition. :param job_name: The name of the job definition to delete. """ try: self.glue_client.delete_job(JobName=job_name) except ClientError as err: logger.error( "Couldn't delete job %s. Here's why: %s: %s", job_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def delete_table(self, db_name, table_name): """ Deletes a table from a metadata database. :param db_name: The name of the database that contains the table. :param table_name: The name of the table to delete. """ try: self.glue_client.delete_table(DatabaseName=db_name, Name=table_name) except ClientError as err: logger.error( "Couldn't delete table %s. Here's why: %s: %s", table_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def delete_database(self, name): """ Deletes a metadata database from your Data Catalog. :param name: The name of the database to delete. """ try: self.glue_client.delete_database(Name=name) except ClientError as err: logger.error( "Couldn't delete database %s. Here's why: %s: %s", name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise def delete_crawler(self, name): """ Deletes a crawler. :param name: The name of the crawler to delete. """ try: self.glue_client.delete_crawler(Name=name) except ClientError as err: logger.error( "Couldn't delete crawler %s. Here's why: %s: %s", name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise
Crear una clase que ejecute el escenario.
class GlueCrawlerJobScenario: """ Encapsulates a scenario that shows how to create an AWS Glue crawler and job and use them to transform data from CSV to JSON format. """ def __init__(self, glue_client, glue_service_role, glue_bucket): """ :param glue_client: A Boto3 AWS Glue client. :param glue_service_role: An AWS Identity and Access Management (IAM) role that AWS Glue can assume to gain access to the resources it requires. :param glue_bucket: An S3 bucket that can hold a job script and output data from AWS Glue job runs. """ self.glue_client = glue_client self.glue_service_role = glue_service_role self.glue_bucket = glue_bucket @staticmethod def wait(seconds, tick=12): """ Waits for a specified number of seconds, while also displaying an animated spinner. :param seconds: The number of seconds to wait. :param tick: The number of frames per second used to animate the spinner. """ progress = "|/-\\" waited = 0 while waited < seconds: for frame in range(tick): sys.stdout.write(f"\r{progress[frame % len(progress)]}") sys.stdout.flush() time.sleep(1 / tick) waited += 1 def upload_job_script(self, job_script): """ Uploads a Python ETL script to an S3 bucket. The script is used by the AWS Glue job to transform data. :param job_script: The relative path to the job script. """ try: self.glue_bucket.upload_file(Filename=job_script, Key=job_script) print(f"Uploaded job script '{job_script}' to the example bucket.") except S3UploadFailedError as err: logger.error("Couldn't upload job script. Here's why: %s", err) raise def run(self, crawler_name, db_name, db_prefix, data_source, job_script, job_name): """ Runs the scenario. This is an interactive experience that runs at a command prompt and asks you for input throughout. :param crawler_name: The name of the crawler used in the scenario. If the crawler does not exist, it is created. :param db_name: The name to give the metadata database created by the crawler. :param db_prefix: The prefix to give tables added to the database by the crawler. :param data_source: The location of the data source that is targeted by the crawler and extracted during job runs. :param job_script: The job script that is used to transform data during job runs. :param job_name: The name to give the job definition that is created during the scenario. """ wrapper = GlueWrapper(self.glue_client) print(f"Checking for crawler {crawler_name}.") crawler = wrapper.get_crawler(crawler_name) if crawler is None: print(f"Creating crawler {crawler_name}.") wrapper.create_crawler( crawler_name, self.glue_service_role.arn, db_name, db_prefix, data_source, ) print(f"Created crawler {crawler_name}.") crawler = wrapper.get_crawler(crawler_name) pprint(crawler) print("-" * 88) print( f"When you run the crawler, it crawls data stored in {data_source} and " f"creates a metadata database in the AWS Glue Data Catalog that describes " f"the data in the data source." ) print("In this example, the source data is in CSV format.") ready = False while not ready: ready = Question.ask_question( "Ready to start the crawler? (y/n) ", Question.is_yesno ) wrapper.start_crawler(crawler_name) print("Let's wait for the crawler to run. This typically takes a few minutes.") crawler_state = None while crawler_state != "READY": self.wait(10) crawler = wrapper.get_crawler(crawler_name) crawler_state = crawler["State"] print(f"Crawler is {crawler['State']}.") print("-" * 88) database = wrapper.get_database(db_name) print(f"The crawler created database {db_name}:") pprint(database) print(f"The database contains these tables:") tables = wrapper.get_tables(db_name) for index, table in enumerate(tables): print(f"\t{index + 1}. {table['Name']}") table_index = Question.ask_question( f"Enter the number of a table to see more detail: ", Question.is_int, Question.in_range(1, len(tables)), ) pprint(tables[table_index - 1]) print("-" * 88) print(f"Creating job definition {job_name}.") wrapper.create_job( job_name, "Getting started example job.", self.glue_service_role.arn, f"s3://{self.glue_bucket.name}/{job_script}", ) print("Created job definition.") print( f"When you run the job, it extracts data from {data_source}, transforms it " f"by using the {job_script} script, and loads the output into " f"S3 bucket {self.glue_bucket.name}." ) print( "In this example, the data is transformed from CSV to JSON, and only a few " "fields are included in the output." ) job_run_status = None if Question.ask_question(f"Ready to run? (y/n) ", Question.is_yesno): job_run_id = wrapper.start_job_run( job_name, db_name, tables[0]["Name"], self.glue_bucket.name ) print(f"Job {job_name} started. Let's wait for it to run.") while job_run_status not in ["SUCCEEDED", "STOPPED", "FAILED", "TIMEOUT"]: self.wait(10) job_run = wrapper.get_job_run(job_name, job_run_id) job_run_status = job_run["JobRunState"] print(f"Job {job_name}/{job_run_id} is {job_run_status}.") print("-" * 88) if job_run_status == "SUCCEEDED": print( f"Data from your job run is stored in your S3 bucket '{self.glue_bucket.name}':" ) try: keys = [ obj.key for obj in self.glue_bucket.objects.filter(Prefix="run-") ] for index, key in enumerate(keys): print(f"\t{index + 1}: {key}") lines = 4 key_index = Question.ask_question( f"Enter the number of a block to download it and see the first {lines} " f"lines of JSON output in the block: ", Question.is_int, Question.in_range(1, len(keys)), ) job_data = io.BytesIO() self.glue_bucket.download_fileobj(keys[key_index - 1], job_data) job_data.seek(0) for _ in range(lines): print(job_data.readline().decode("utf-8")) except ClientError as err: logger.error( "Couldn't get job run data. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise print("-" * 88) job_names = wrapper.list_jobs() if job_names: print(f"Your account has {len(job_names)} jobs defined:") for index, job_name in enumerate(job_names): print(f"\t{index + 1}. {job_name}") job_index = Question.ask_question( f"Enter a number between 1 and {len(job_names)} to see the list of runs for " f"a job: ", Question.is_int, Question.in_range(1, len(job_names)), ) job_runs = wrapper.get_job_runs(job_names[job_index - 1]) if job_runs: print(f"Found {len(job_runs)} runs for job {job_names[job_index - 1]}:") for index, job_run in enumerate(job_runs): print( f"\t{index + 1}. {job_run['JobRunState']} on " f"{job_run['CompletedOn']:%Y-%m-%d %H:%M:%S}" ) run_index = Question.ask_question( f"Enter a number between 1 and {len(job_runs)} to see details for a run: ", Question.is_int, Question.in_range(1, len(job_runs)), ) pprint(job_runs[run_index - 1]) else: print(f"No runs found for job {job_names[job_index - 1]}") else: print("Your account doesn't have any jobs defined.") print("-" * 88) print( f"Let's clean up. During this example we created job definition '{job_name}'." ) if Question.ask_question( "Do you want to delete the definition and all runs? (y/n) ", Question.is_yesno, ): wrapper.delete_job(job_name) print(f"Job definition '{job_name}' deleted.") tables = wrapper.get_tables(db_name) print(f"We also created database '{db_name}' that contains these tables:") for table in tables: print(f"\t{table['Name']}") if Question.ask_question( "Do you want to delete the tables and the database? (y/n) ", Question.is_yesno, ): for table in tables: wrapper.delete_table(db_name, table["Name"]) print(f"Deleted table {table['Name']}.") wrapper.delete_database(db_name) print(f"Deleted database {db_name}.") print(f"We also created crawler '{crawler_name}'.") if Question.ask_question( "Do you want to delete the crawler? (y/n) ", Question.is_yesno ): wrapper.delete_crawler(crawler_name) print(f"Deleted crawler {crawler_name}.") print("-" * 88) def parse_args(args): """ Parse command line arguments. :param args: The command line arguments. :return: The parsed arguments. """ parser = argparse.ArgumentParser( description="Runs the AWS Glue getting started with crawlers and jobs scenario. " "Before you run this scenario, set up scaffold resources by running " "'python scaffold.py deploy'." ) parser.add_argument( "role_name", help="The name of an IAM role that AWS Glue can assume. This role must grant access " "to Amazon S3 and to the permissions granted by the AWSGlueServiceRole " "managed policy.", ) parser.add_argument( "bucket_name", help="The name of an S3 bucket that AWS Glue can access to get the job script and " "put job results.", ) parser.add_argument( "--job_script", default="flight_etl_job_script.py", help="The name of the job script file that is used in the scenario.", ) return parser.parse_args(args) def main(): args = parse_args(sys.argv[1:]) try: print("-" * 88) print( "Welcome to the AWS Glue getting started with crawlers and jobs scenario." ) print("-" * 88) scenario = GlueCrawlerJobScenario( boto3.client("glue"), boto3.resource("iam").Role(args.role_name), boto3.resource("s3").Bucket(args.bucket_name), ) scenario.upload_job_script(args.job_script) scenario.run( "doc-example-crawler", "doc-example-database", "doc-example-", "s3://crawler-public-us-east-1/flight/2016/csv", args.job_script, "doc-example-job", ) print("-" * 88) print( "To destroy scaffold resources, including the IAM role and S3 bucket " "used in this scenario, run 'python scaffold.py destroy'." ) print("\nThanks for watching!") print("-" * 88) except Exception: logging.exception("Something went wrong with the example.")
Cree un ETL script que sirva AWS Glue para extraer, transformar y cargar datos durante la ejecución de los trabajos.
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job """ These custom arguments must be passed as Arguments to the StartJobRun request. --input_database The name of a metadata database that is contained in your AWS Glue Data Catalog and that contains tables that describe the data to be processed. --input_table The name of a table in the database that describes the data to be processed. --output_bucket_url An S3 bucket that receives the transformed output data. """ args = getResolvedOptions( sys.argv, ["JOB_NAME", "input_database", "input_table", "output_bucket_url"] ) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args["JOB_NAME"], args) # Script generated for node S3 Flight Data. S3FlightData_node1 = glueContext.create_dynamic_frame.from_catalog( database=args["input_database"], table_name=args["input_table"], transformation_ctx="S3FlightData_node1", ) # This mapping performs two main functions: # 1. It simplifies the output by removing most of the fields from the data. # 2. It renames some fields. For example, `fl_date` is renamed to `flight_date`. ApplyMapping_node2 = ApplyMapping.apply( frame=S3FlightData_node1, mappings=[ ("year", "long", "year", "long"), ("month", "long", "month", "tinyint"), ("day_of_month", "long", "day", "tinyint"), ("fl_date", "string", "flight_date", "string"), ("carrier", "string", "carrier", "string"), ("fl_num", "long", "flight_num", "long"), ("origin_city_name", "string", "origin_city_name", "string"), ("origin_state_abr", "string", "origin_state_abr", "string"), ("dest_city_name", "string", "dest_city_name", "string"), ("dest_state_abr", "string", "dest_state_abr", "string"), ("dep_time", "long", "departure_time", "long"), ("wheels_off", "long", "wheels_off", "long"), ("wheels_on", "long", "wheels_on", "long"), ("arr_time", "long", "arrival_time", "long"), ("mon", "string", "mon", "string"), ], transformation_ctx="ApplyMapping_node2", ) # Script generated for node Revised Flight Data. RevisedFlightData_node3 = glueContext.write_dynamic_frame.from_options( frame=ApplyMapping_node2, connection_type="s3", format="json", connection_options={"path": args["output_bucket_url"], "partitionKeys": []}, transformation_ctx="RevisedFlightData_node3", ) job.commit()
-
Para API obtener más información, consulte los siguientes temas en la sección AWS SDKde referencia sobre Python (Boto3). API
-
- Ruby
-
- SDKpara Ruby
-
nota
Hay más información GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS
. Cree una clase que agrupe AWS Glue las funciones utilizadas en el escenario.
# The `GlueWrapper` class serves as a wrapper around the AWS Glue API, providing a simplified interface for common operations. # It encapsulates the functionality of the AWS SDK for Glue and provides methods for interacting with Glue crawlers, databases, tables, jobs, and S3 resources. # The class initializes with a Glue client and a logger, allowing it to make API calls and log any errors or informational messages. class GlueWrapper def initialize(glue_client, logger) @glue_client = glue_client @logger = logger end # Retrieves information about a specific crawler. # # @param name [String] The name of the crawler to retrieve information about. # @return [Aws::Glue::Types::Crawler, nil] The crawler object if found, or nil if not found. def get_crawler(name) @glue_client.get_crawler(name: name) rescue Aws::Glue::Errors::EntityNotFoundException @logger.info("Crawler #{name} doesn't exist.") false rescue Aws::Glue::Errors::GlueException => e @logger.error("Glue could not get crawler #{name}: \n#{e.message}") raise end # Creates a new crawler with the specified configuration. # # @param name [String] The name of the crawler. # @param role_arn [String] The ARN of the IAM role to be used by the crawler. # @param db_name [String] The name of the database where the crawler stores its metadata. # @param db_prefix [String] The prefix to be added to the names of tables that the crawler creates. # @param s3_target [String] The S3 path that the crawler will crawl. # @return [void] def create_crawler(name, role_arn, db_name, db_prefix, s3_target) @glue_client.create_crawler( name: name, role: role_arn, database_name: db_name, targets: { s3_targets: [ { path: s3_target } ] } ) rescue Aws::Glue::Errors::GlueException => e @logger.error("Glue could not create crawler: \n#{e.message}") raise end # Starts a crawler with the specified name. # # @param name [String] The name of the crawler to start. # @return [void] def start_crawler(name) @glue_client.start_crawler(name: name) rescue Aws::Glue::Errors::ServiceError => e @logger.error("Glue could not start crawler #{name}: \n#{e.message}") raise end # Deletes a crawler with the specified name. # # @param name [String] The name of the crawler to delete. # @return [void] def delete_crawler(name) @glue_client.delete_crawler(name: name) rescue Aws::Glue::Errors::ServiceError => e @logger.error("Glue could not delete crawler #{name}: \n#{e.message}") raise end # Retrieves information about a specific database. # # @param name [String] The name of the database to retrieve information about. # @return [Aws::Glue::Types::Database, nil] The database object if found, or nil if not found. def get_database(name) response = @glue_client.get_database(name: name) response.database rescue Aws::Glue::Errors::GlueException => e @logger.error("Glue could not get database #{name}: \n#{e.message}") raise end # Retrieves a list of tables in the specified database. # # @param db_name [String] The name of the database to retrieve tables from. # @return [Array<Aws::Glue::Types::Table>] def get_tables(db_name) response = @glue_client.get_tables(database_name: db_name) response.table_list rescue Aws::Glue::Errors::GlueException => e @logger.error("Glue could not get tables #{db_name}: \n#{e.message}") raise end # Creates a new job with the specified configuration. # # @param name [String] The name of the job. # @param description [String] The description of the job. # @param role_arn [String] The ARN of the IAM role to be used by the job. # @param script_location [String] The location of the ETL script for the job. # @return [void] def create_job(name, description, role_arn, script_location) @glue_client.create_job( name: name, description: description, role: role_arn, command: { name: "glueetl", script_location: script_location, python_version: "3" }, glue_version: "3.0" ) rescue Aws::Glue::Errors::GlueException => e @logger.error("Glue could not create job #{name}: \n#{e.message}") raise end # Starts a job run for the specified job. # # @param name [String] The name of the job to start the run for. # @param input_database [String] The name of the input database for the job. # @param input_table [String] The name of the input table for the job. # @param output_bucket_name [String] The name of the output S3 bucket for the job. # @return [String] The ID of the started job run. def start_job_run(name, input_database, input_table, output_bucket_name) response = @glue_client.start_job_run( job_name: name, arguments: { '--input_database': input_database, '--input_table': input_table, '--output_bucket_url': "s3://#{output_bucket_name}/" } ) response.job_run_id rescue Aws::Glue::Errors::GlueException => e @logger.error("Glue could not start job run #{name}: \n#{e.message}") raise end # Retrieves a list of jobs in AWS Glue. # # @return [Aws::Glue::Types::ListJobsResponse] def list_jobs @glue_client.list_jobs rescue Aws::Glue::Errors::GlueException => e @logger.error("Glue could not list jobs: \n#{e.message}") raise end # Retrieves a list of job runs for the specified job. # # @param job_name [String] The name of the job to retrieve job runs for. # @return [Array<Aws::Glue::Types::JobRun>] def get_job_runs(job_name) response = @glue_client.get_job_runs(job_name: job_name) response.job_runs rescue Aws::Glue::Errors::GlueException => e @logger.error("Glue could not get job runs: \n#{e.message}") end # Retrieves data for a specific job run. # # @param job_name [String] The name of the job run to retrieve data for. # @return [Glue::Types::GetJobRunResponse] def get_job_run(job_name, run_id) @glue_client.get_job_run(job_name: job_name, run_id: run_id) rescue Aws::Glue::Errors::GlueException => e @logger.error("Glue could not get job runs: \n#{e.message}") end # Deletes a job with the specified name. # # @param job_name [String] The name of the job to delete. # @return [void] def delete_job(job_name) @glue_client.delete_job(job_name: job_name) rescue Aws::Glue::Errors::ServiceError => e @logger.error("Glue could not delete job: \n#{e.message}") end # Deletes a table with the specified name. # # @param database_name [String] The name of the catalog database in which the table resides. # @param table_name [String] The name of the table to be deleted. # @return [void] def delete_table(database_name, table_name) @glue_client.delete_table(database_name: database_name, name: table_name) rescue Aws::Glue::Errors::ServiceError => e @logger.error("Glue could not delete job: \n#{e.message}") end # Removes a specified database from a Data Catalog. # # @param database_name [String] The name of the database to delete. # @return [void] def delete_database(database_name) @glue_client.delete_database(name: database_name) rescue Aws::Glue::Errors::ServiceError => e @logger.error("Glue could not delete database: \n#{e.message}") end # Uploads a job script file to an S3 bucket. # # @param file_path [String] The local path of the job script file. # @param bucket_resource [Aws::S3::Bucket] The S3 bucket resource to upload the file to. # @return [void] def upload_job_script(file_path, bucket_resource) File.open(file_path) do |file| bucket_resource.client.put_object({ body: file, bucket: bucket_resource.name, key: file_path }) end rescue Aws::S3::Errors::S3UploadFailedError => e @logger.error("S3 could not upload job script: \n#{e.message}") raise end end
Crear una clase que ejecute el escenario.
class GlueCrawlerJobScenario def initialize(glue_client, glue_service_role, glue_bucket, logger) @glue_client = glue_client @glue_service_role = glue_service_role @glue_bucket = glue_bucket @logger = logger end def run(crawler_name, db_name, db_prefix, data_source, job_script, job_name) wrapper = GlueWrapper.new(@glue_client, @logger) new_step(1, "Create a crawler") puts "Checking for crawler #{crawler_name}." crawler = wrapper.get_crawler(crawler_name) if crawler == false puts "Creating crawler #{crawler_name}." wrapper.create_crawler(crawler_name, @glue_service_role.arn, db_name, db_prefix, data_source) puts "Successfully created #{crawler_name}:" crawler = wrapper.get_crawler(crawler_name) puts JSON.pretty_generate(crawler).yellow end print "\nDone!\n".green new_step(2, "Run a crawler to output a database.") puts "Location of input data analyzed by crawler: #{data_source}" puts "Outputs: a Data Catalog database in CSV format containing metadata on input." wrapper.start_crawler(crawler_name) puts "Starting crawler... (this typically takes a few minutes)" crawler_state = nil while crawler_state != "READY" custom_wait(15) crawler = wrapper.get_crawler(crawler_name) crawler_state = crawler[0]["state"] print "Status check: #{crawler_state}.".yellow end print "\nDone!\n".green new_step(3, "Query the database.") database = wrapper.get_database(db_name) puts "The crawler created database #{db_name}:" print "#{database}".yellow puts "\nThe database contains these tables:" tables = wrapper.get_tables(db_name) tables.each_with_index do |table, index| print "\t#{index + 1}. #{table['name']}".yellow end print "\nDone!\n".green new_step(4, "Create a job definition that runs an ETL script.") puts "Uploading Python ETL script to S3..." wrapper.upload_job_script(job_script, @glue_bucket) puts "Creating job definition #{job_name}:\n" response = wrapper.create_job(job_name, "Getting started example job.", @glue_service_role.arn, "s3://#{@glue_bucket.name}/#{job_script}") puts JSON.pretty_generate(response).yellow print "\nDone!\n".green new_step(5, "Start a new job") job_run_status = nil job_run_id = wrapper.start_job_run( job_name, db_name, tables[0]["name"], @glue_bucket.name ) puts "Job #{job_name} started. Let's wait for it to run." until ["SUCCEEDED", "STOPPED", "FAILED", "TIMEOUT"].include?(job_run_status) custom_wait(10) job_run = wrapper.get_job_runs(job_name) job_run_status = job_run[0]["job_run_state"] print "Status check: #{job_name}/#{job_run_id} - #{job_run_status}.".yellow end print "\nDone!\n".green new_step(6, "View results from a successful job run.") if job_run_status == "SUCCEEDED" puts "Data from your job run is stored in your S3 bucket '#{@glue_bucket.name}'. Files include:" begin # Print the key name of each object in the bucket. @glue_bucket.objects.each do |object_summary| if object_summary.key.include?("run-") print "#{object_summary.key}".yellow end end # Print the first 256 bytes of a run file desired_sample_objects = 1 @glue_bucket.objects.each do |object_summary| if object_summary.key.include?("run-") if desired_sample_objects > 0 sample_object = @glue_bucket.object(object_summary.key) sample = sample_object.get(range: "bytes=0-255").body.read puts "\nSample run file contents:" print "#{sample}".yellow desired_sample_objects -= 1 end end end rescue Aws::S3::Errors::ServiceError => e logger.error( "Couldn't get job run data. Here's why: %s: %s", e.response.error.code, e.response.error.message ) raise end end print "\nDone!\n".green new_step(7, "Delete job definition and crawler.") wrapper.delete_job(job_name) puts "Job deleted: #{job_name}." wrapper.delete_crawler(crawler_name) puts "Crawler deleted: #{crawler_name}." wrapper.delete_table(db_name, tables[0]["name"]) puts "Table deleted: #{tables[0]["name"]} in #{db_name}." wrapper.delete_database(db_name) puts "Database deleted: #{db_name}." print "\nDone!\n".green end end def main banner("../../helpers/banner.txt") puts "######################################################################################################".yellow puts "# #".yellow puts "# EXAMPLE CODE DEMO: #".yellow puts "# AWS Glue #".yellow puts "# #".yellow puts "######################################################################################################".yellow puts "" puts "You have launched a demo of AWS Glue using the AWS for Ruby v3 SDK. Over the next 60 seconds, it will" puts "do the following:" puts " 1. Create a crawler." puts " 2. Run a crawler to output a database." puts " 3. Query the database." puts " 4. Create a job definition that runs an ETL script." puts " 5. Start a new job." puts " 6. View results from a successful job run." puts " 7. Delete job definition and crawler." puts "" confirm_begin billing security puts "\e[H\e[2J" # Set input file names job_script_filepath = "job_script.py" resource_names = YAML.load_file("resource_names.yaml") # Instantiate existing IAM role. iam = Aws::IAM::Resource.new(region: "us-east-1") iam_role_name = resource_names["glue_service_role"] iam_role = iam.role(iam_role_name) # Instantiate existing S3 bucket. s3 = Aws::S3::Resource.new(region: "us-east-1") s3_bucket_name = resource_names["glue_bucket"] s3_bucket = s3.bucket(s3_bucket_name) scenario = GlueCrawlerJobScenario.new( Aws::Glue::Client.new(region: "us-east-1"), iam_role, s3_bucket, @logger ) random_int = rand(10 ** 4) scenario.run( "doc-example-crawler-#{random_int}", "doc-example-database-#{random_int}", "doc-example-#{random_int}-", "s3://crawler-public-us-east-1/flight/2016/csv", job_script_filepath, "doc-example-job-#{random_int}" ) puts "-" * 88 puts "You have reached the end of this tour of AWS Glue." puts "To destroy CDK-created resources, run:\n cdk destroy" puts "-" * 88 end
Cree un ETL script que sirva AWS Glue para extraer, transformar y cargar datos durante la ejecución de los trabajos.
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job """ These custom arguments must be passed as Arguments to the StartJobRun request. --input_database The name of a metadata database that is contained in your AWS Glue Data Catalog and that contains tables that describe the data to be processed. --input_table The name of a table in the database that describes the data to be processed. --output_bucket_url An S3 bucket that receives the transformed output data. """ args = getResolvedOptions( sys.argv, ["JOB_NAME", "input_database", "input_table", "output_bucket_url"] ) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args["JOB_NAME"], args) # Script generated for node S3 Flight Data. S3FlightData_node1 = glueContext.create_dynamic_frame.from_catalog( database=args["input_database"], table_name=args["input_table"], transformation_ctx="S3FlightData_node1", ) # This mapping performs two main functions: # 1. It simplifies the output by removing most of the fields from the data. # 2. It renames some fields. For example, `fl_date` is renamed to `flight_date`. ApplyMapping_node2 = ApplyMapping.apply( frame=S3FlightData_node1, mappings=[ ("year", "long", "year", "long"), ("month", "long", "month", "tinyint"), ("day_of_month", "long", "day", "tinyint"), ("fl_date", "string", "flight_date", "string"), ("carrier", "string", "carrier", "string"), ("fl_num", "long", "flight_num", "long"), ("origin_city_name", "string", "origin_city_name", "string"), ("origin_state_abr", "string", "origin_state_abr", "string"), ("dest_city_name", "string", "dest_city_name", "string"), ("dest_state_abr", "string", "dest_state_abr", "string"), ("dep_time", "long", "departure_time", "long"), ("wheels_off", "long", "wheels_off", "long"), ("wheels_on", "long", "wheels_on", "long"), ("arr_time", "long", "arrival_time", "long"), ("mon", "string", "mon", "string"), ], transformation_ctx="ApplyMapping_node2", ) # Script generated for node Revised Flight Data. RevisedFlightData_node3 = glueContext.write_dynamic_frame.from_options( frame=ApplyMapping_node2, connection_type="s3", format="json", connection_options={"path": args["output_bucket_url"], "partitionKeys": []}, transformation_ctx="RevisedFlightData_node3", ) job.commit()
-
Para API obtener más información, consulte los siguientes temas en AWS SDK for Ruby APIReference.
-
- Rust
-
- SDKpara Rust
-
nota
Hay más información GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS
. Cree y ejecute un rastreador que rastree un bucket público de Amazon Simple Storage Service (Amazon S3) y genere una base de datos de metadatos que describa los CSV datos con formato B que encuentre.
let create_crawler = glue .create_crawler() .name(self.crawler()) .database_name(self.database()) .role(self.iam_role.expose_secret()) .targets( CrawlerTargets::builder() .s3_targets(S3Target::builder().path(CRAWLER_TARGET).build()) .build(), ) .send() .await; match create_crawler { Err(err) => { let glue_err: aws_sdk_glue::Error = err.into(); match glue_err { aws_sdk_glue::Error::AlreadyExistsException(_) => { info!("Using existing crawler"); Ok(()) } _ => Err(GlueMvpError::GlueSdk(glue_err)), } } Ok(_) => Ok(()), }?; let start_crawler = glue.start_crawler().name(self.crawler()).send().await; match start_crawler { Ok(_) => Ok(()), Err(err) => { let glue_err: aws_sdk_glue::Error = err.into(); match glue_err { aws_sdk_glue::Error::CrawlerRunningException(_) => Ok(()), _ => Err(GlueMvpError::GlueSdk(glue_err)), } } }?;
Enumere la información sobre las bases de datos y las tablas de su. AWS Glue Data Catalog
let database = glue .get_database() .name(self.database()) .send() .await .map_err(GlueMvpError::from_glue_sdk)? .to_owned(); let database = database .database() .ok_or_else(|| GlueMvpError::Unknown("Could not find database".into()))?; let tables = glue .get_tables() .database_name(self.database()) .send() .await .map_err(GlueMvpError::from_glue_sdk)?; let tables = tables.table_list();
Cree y ejecute un trabajo que extraiga CSV datos del bucket de Amazon S3 de origen, los transforme eliminando campos y cambiándoles el nombre, y cargue la salida JSON con formato en otro bucket de Amazon S3.
let create_job = glue .create_job() .name(self.job()) .role(self.iam_role.expose_secret()) .command( JobCommand::builder() .name("glueetl") .python_version("3") .script_location(format!("s3://{}/job.py", self.bucket())) .build(), ) .glue_version("3.0") .send() .await .map_err(GlueMvpError::from_glue_sdk)?; let job_name = create_job.name().ok_or_else(|| { GlueMvpError::Unknown("Did not get job name after creating job".into()) })?; let job_run_output = glue .start_job_run() .job_name(self.job()) .arguments("--input_database", self.database()) .arguments( "--input_table", self.tables .first() .ok_or_else(|| GlueMvpError::Unknown("Missing crawler table".into()))? .name(), ) .arguments("--output_bucket_url", self.bucket()) .send() .await .map_err(GlueMvpError::from_glue_sdk)?; let job = job_run_output .job_run_id() .ok_or_else(|| GlueMvpError::Unknown("Missing run id from just started job".into()))? .to_string();
Elimine todos los recursos creados en la demostración.
glue.delete_job() .job_name(self.job()) .send() .await .map_err(GlueMvpError::from_glue_sdk)?; for t in &self.tables { glue.delete_table() .name(t.name()) .database_name(self.database()) .send() .await .map_err(GlueMvpError::from_glue_sdk)?; } glue.delete_database() .name(self.database()) .send() .await .map_err(GlueMvpError::from_glue_sdk)?; glue.delete_crawler() .name(self.crawler()) .send() .await .map_err(GlueMvpError::from_glue_sdk)?;
-
Para API obtener más información, consulte los siguientes temas como referencia sobre AWS SDK Rust. API
-
Para obtener una lista completa de guías para AWS SDK desarrolladores y ejemplos de código, consulteUso de este servicio con un AWS SDK. En este tema también se incluye información sobre cómo empezar y detalles sobre SDK las versiones anteriores.