SageMaker ejemplos que utilizan AWS SDK for .NET - Ejemplos de código de AWS SDK

Hay más AWS SDK ejemplos disponibles en el GitHub repositorio de AWS Doc SDK Examples.

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

SageMaker ejemplos que utilizan AWS SDK for .NET

Los siguientes ejemplos de código muestran cómo realizar acciones e implementar escenarios comunes mediante el uso del AWS SDK for .NET with SageMaker.

Las acciones son extractos de código de programas más grandes y deben ejecutarse en contexto. Mientras las acciones muestran cómo llamar a las funciones de servicio individuales, es posible ver las acciones en contexto en los escenarios relacionados y en los ejemplos entre servicios.

Los escenarios son ejemplos de código que muestran cómo llevar a cabo una tarea específica llamando a varias funciones dentro del mismo servicio.

Cada ejemplo incluye un enlace a GitHub, donde puede encontrar instrucciones sobre cómo configurar y ejecutar el código en su contexto.

Introducción

Los siguientes ejemplos de código muestran cómo empezar a usarlo SageMaker.

AWS SDK for .NET
nota

Hay más información al respecto GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS.

using Amazon.SageMaker; using Amazon.SageMaker.Model; namespace SageMakerActions; public static class HelloSageMaker { static async Task Main(string[] args) { var sageMakerClient = new AmazonSageMakerClient(); Console.WriteLine($"Hello Amazon SageMaker! Let's list some of your notebook instances:"); Console.WriteLine(); // You can use await and any of the async methods to get a response. // Let's get the first five notebook instances. var response = await sageMakerClient.ListNotebookInstancesAsync( new ListNotebookInstancesRequest() { MaxResults = 5 }); if (!response.NotebookInstances.Any()) { Console.WriteLine($"No notebook instances found."); Console.WriteLine("See https://docs.aws.amazon.com/sagemaker/latest/dg/howitworks-create-ws.html to create one."); } foreach (var notebookInstance in response.NotebookInstances) { Console.WriteLine($"\tInstance: {notebookInstance.NotebookInstanceName}"); Console.WriteLine($"\tArn: {notebookInstance.NotebookInstanceArn}"); Console.WriteLine($"\tCreation Date: {notebookInstance.CreationTime.ToShortDateString()}"); Console.WriteLine(); } } }

Acciones

En el siguiente ejemplo de código, se muestra cómo usar CreatePipeline.

AWS SDK for .NET
nota

Hay más información sobre GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS.

/// <summary> /// Create a pipeline from a JSON definition, or update it if the pipeline already exists. /// </summary> /// <returns>The Amazon Resource Name (ARN) of the pipeline.</returns> public async Task<string> SetupPipeline(string pipelineJson, string roleArn, string name, string description, string displayName) { try { var updateResponse = await _amazonSageMaker.UpdatePipelineAsync( new UpdatePipelineRequest() { PipelineDefinition = pipelineJson, PipelineDescription = description, PipelineDisplayName = displayName, PipelineName = name, RoleArn = roleArn }); return updateResponse.PipelineArn; } catch (Amazon.SageMaker.Model.ResourceNotFoundException) { var createResponse = await _amazonSageMaker.CreatePipelineAsync( new CreatePipelineRequest() { PipelineDefinition = pipelineJson, PipelineDescription = description, PipelineDisplayName = displayName, PipelineName = name, RoleArn = roleArn }); return createResponse.PipelineArn; } }
  • Para API obtener más información, consulte CreatePipelinela AWS SDK for .NET APIReferencia.

En el siguiente ejemplo de código, se muestra cómo usar DeletePipeline.

AWS SDK for .NET
nota

Hay más información sobre GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS.

/// <summary> /// Delete a SageMaker pipeline by name. /// </summary> /// <param name="pipelineName">The name of the pipeline to delete.</param> /// <returns>The ARN of the pipeline.</returns> public async Task<string> DeletePipelineByName(string pipelineName) { var deleteResponse = await _amazonSageMaker.DeletePipelineAsync( new DeletePipelineRequest() { PipelineName = pipelineName }); return deleteResponse.PipelineArn; }
  • Para API obtener más información, consulte DeletePipelinela AWS SDK for .NET APIReferencia.

En el siguiente ejemplo de código, se muestra cómo usar DescribePipelineExecution.

AWS SDK for .NET
nota

Hay más información sobre GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS.

/// <summary> /// Check the status of a run. /// </summary> /// <param name="pipelineExecutionArn">The ARN.</param> /// <returns>The status of the pipeline.</returns> public async Task<PipelineExecutionStatus> CheckPipelineExecutionStatus(string pipelineExecutionArn) { var describeResponse = await _amazonSageMaker.DescribePipelineExecutionAsync( new DescribePipelineExecutionRequest() { PipelineExecutionArn = pipelineExecutionArn }); return describeResponse.PipelineExecutionStatus; }

En el siguiente ejemplo de código, se muestra cómo usar StartPipelineExecution.

AWS SDK for .NET
nota

Hay más información sobre GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS.

/// <summary> /// Run a pipeline with input and output file locations. /// </summary> /// <param name="queueUrl">The URL for the queue to use for pipeline callbacks.</param> /// <param name="inputLocationUrl">The input location in Amazon Simple Storage Service (Amazon S3).</param> /// <param name="outputLocationUrl">The output location in Amazon S3.</param> /// <param name="pipelineName">The name of the pipeline.</param> /// <param name="executionRoleArn">The ARN of the role.</param> /// <returns>The ARN of the pipeline run.</returns> public async Task<string> ExecutePipeline( string queueUrl, string inputLocationUrl, string outputLocationUrl, string pipelineName, string executionRoleArn) { var inputConfig = new VectorEnrichmentJobInputConfig() { DataSourceConfig = new() { S3Data = new VectorEnrichmentJobS3Data() { S3Uri = inputLocationUrl } }, DocumentType = VectorEnrichmentJobDocumentType.CSV }; var exportConfig = new ExportVectorEnrichmentJobOutputConfig() { S3Data = new VectorEnrichmentJobS3Data() { S3Uri = outputLocationUrl } }; var jobConfig = new VectorEnrichmentJobConfig() { ReverseGeocodingConfig = new ReverseGeocodingConfig() { XAttributeName = "Longitude", YAttributeName = "Latitude" } }; #pragma warning disable SageMaker1002 // Property value does not match required pattern is allowed here to match the pipeline definition. var startExecutionResponse = await _amazonSageMaker.StartPipelineExecutionAsync( new StartPipelineExecutionRequest() { PipelineName = pipelineName, PipelineExecutionDisplayName = pipelineName + "-example-execution", PipelineParameters = new List<Parameter>() { new Parameter() { Name = "parameter_execution_role", Value = executionRoleArn }, new Parameter() { Name = "parameter_queue_url", Value = queueUrl }, new Parameter() { Name = "parameter_vej_input_config", Value = JsonSerializer.Serialize(inputConfig) }, new Parameter() { Name = "parameter_vej_export_config", Value = JsonSerializer.Serialize(exportConfig) }, new Parameter() { Name = "parameter_step_1_vej_config", Value = JsonSerializer.Serialize(jobConfig) } } }); #pragma warning restore SageMaker1002 return startExecutionResponse.PipelineExecutionArn; }

En el siguiente ejemplo de código, se muestra cómo usar UpdatePipeline.

AWS SDK for .NET
nota

Hay más información sobre GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS.

/// <summary> /// Create a pipeline from a JSON definition, or update it if the pipeline already exists. /// </summary> /// <returns>The Amazon Resource Name (ARN) of the pipeline.</returns> public async Task<string> SetupPipeline(string pipelineJson, string roleArn, string name, string description, string displayName) { try { var updateResponse = await _amazonSageMaker.UpdatePipelineAsync( new UpdatePipelineRequest() { PipelineDefinition = pipelineJson, PipelineDescription = description, PipelineDisplayName = displayName, PipelineName = name, RoleArn = roleArn }); return updateResponse.PipelineArn; } catch (Amazon.SageMaker.Model.ResourceNotFoundException) { var createResponse = await _amazonSageMaker.CreatePipelineAsync( new CreatePipelineRequest() { PipelineDefinition = pipelineJson, PipelineDescription = description, PipelineDisplayName = displayName, PipelineName = name, RoleArn = roleArn }); return createResponse.PipelineArn; } }
  • Para API obtener más información, consulte UpdatePipelinela AWS SDK for .NET APIReferencia.

Escenarios

En el siguiente ejemplo de código, se muestra cómo:

  • Configurar los recursos de una canalización

  • Configurar una canalización que ejecuta un trabajo geoespacial

  • Iniciar la ejecución de una canalización.

  • Supervisar el estado de la ejecución.

  • Ver el resultado de la canalización.

  • Limpiar recursos.

Para obtener más información, consulte Crear y ejecutar SageMaker canalizaciones con AWS SDKs Community.aws.

AWS SDK for .NET
nota

Hay más información sobre. GitHub Busque el ejemplo completo y aprenda a configurar y ejecutar en el Repositorio de ejemplos de código de AWS.

Crea una clase que abarque SageMaker las operaciones.

using System.Text.Json; using Amazon.SageMaker; using Amazon.SageMaker.Model; using Amazon.SageMakerGeospatial; using Amazon.SageMakerGeospatial.Model; namespace SageMakerActions; /// <summary> /// Wrapper class for Amazon SageMaker actions and logic. /// </summary> public class SageMakerWrapper { private readonly IAmazonSageMaker _amazonSageMaker; public SageMakerWrapper(IAmazonSageMaker amazonSageMaker) { _amazonSageMaker = amazonSageMaker; } /// <summary> /// Create a pipeline from a JSON definition, or update it if the pipeline already exists. /// </summary> /// <returns>The Amazon Resource Name (ARN) of the pipeline.</returns> public async Task<string> SetupPipeline(string pipelineJson, string roleArn, string name, string description, string displayName) { try { var updateResponse = await _amazonSageMaker.UpdatePipelineAsync( new UpdatePipelineRequest() { PipelineDefinition = pipelineJson, PipelineDescription = description, PipelineDisplayName = displayName, PipelineName = name, RoleArn = roleArn }); return updateResponse.PipelineArn; } catch (Amazon.SageMaker.Model.ResourceNotFoundException) { var createResponse = await _amazonSageMaker.CreatePipelineAsync( new CreatePipelineRequest() { PipelineDefinition = pipelineJson, PipelineDescription = description, PipelineDisplayName = displayName, PipelineName = name, RoleArn = roleArn }); return createResponse.PipelineArn; } } /// <summary> /// Run a pipeline with input and output file locations. /// </summary> /// <param name="queueUrl">The URL for the queue to use for pipeline callbacks.</param> /// <param name="inputLocationUrl">The input location in Amazon Simple Storage Service (Amazon S3).</param> /// <param name="outputLocationUrl">The output location in Amazon S3.</param> /// <param name="pipelineName">The name of the pipeline.</param> /// <param name="executionRoleArn">The ARN of the role.</param> /// <returns>The ARN of the pipeline run.</returns> public async Task<string> ExecutePipeline( string queueUrl, string inputLocationUrl, string outputLocationUrl, string pipelineName, string executionRoleArn) { var inputConfig = new VectorEnrichmentJobInputConfig() { DataSourceConfig = new() { S3Data = new VectorEnrichmentJobS3Data() { S3Uri = inputLocationUrl } }, DocumentType = VectorEnrichmentJobDocumentType.CSV }; var exportConfig = new ExportVectorEnrichmentJobOutputConfig() { S3Data = new VectorEnrichmentJobS3Data() { S3Uri = outputLocationUrl } }; var jobConfig = new VectorEnrichmentJobConfig() { ReverseGeocodingConfig = new ReverseGeocodingConfig() { XAttributeName = "Longitude", YAttributeName = "Latitude" } }; #pragma warning disable SageMaker1002 // Property value does not match required pattern is allowed here to match the pipeline definition. var startExecutionResponse = await _amazonSageMaker.StartPipelineExecutionAsync( new StartPipelineExecutionRequest() { PipelineName = pipelineName, PipelineExecutionDisplayName = pipelineName + "-example-execution", PipelineParameters = new List<Parameter>() { new Parameter() { Name = "parameter_execution_role", Value = executionRoleArn }, new Parameter() { Name = "parameter_queue_url", Value = queueUrl }, new Parameter() { Name = "parameter_vej_input_config", Value = JsonSerializer.Serialize(inputConfig) }, new Parameter() { Name = "parameter_vej_export_config", Value = JsonSerializer.Serialize(exportConfig) }, new Parameter() { Name = "parameter_step_1_vej_config", Value = JsonSerializer.Serialize(jobConfig) } } }); #pragma warning restore SageMaker1002 return startExecutionResponse.PipelineExecutionArn; } /// <summary> /// Check the status of a run. /// </summary> /// <param name="pipelineExecutionArn">The ARN.</param> /// <returns>The status of the pipeline.</returns> public async Task<PipelineExecutionStatus> CheckPipelineExecutionStatus(string pipelineExecutionArn) { var describeResponse = await _amazonSageMaker.DescribePipelineExecutionAsync( new DescribePipelineExecutionRequest() { PipelineExecutionArn = pipelineExecutionArn }); return describeResponse.PipelineExecutionStatus; } /// <summary> /// Delete a SageMaker pipeline by name. /// </summary> /// <param name="pipelineName">The name of the pipeline to delete.</param> /// <returns>The ARN of the pipeline.</returns> public async Task<string> DeletePipelineByName(string pipelineName) { var deleteResponse = await _amazonSageMaker.DeletePipelineAsync( new DeletePipelineRequest() { PipelineName = pipelineName }); return deleteResponse.PipelineArn; } }

Crea una función que gestione las devoluciones de llamadas de la SageMaker canalización.

using System.Text.Json; using Amazon.Lambda.Core; using Amazon.Lambda.SQSEvents; using Amazon.SageMaker; using Amazon.SageMaker.Model; using Amazon.SageMakerGeospatial; using Amazon.SageMakerGeospatial.Model; // Assembly attribute to enable the AWS Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace SageMakerLambda; /// <summary> /// The AWS Lambda function handler for the Amazon SageMaker pipeline. /// </summary> public class SageMakerLambdaFunction { /// <summary> /// Default constructor. This constructor is used by AWS Lambda to construct the instance. When invoked in a Lambda environment /// the AWS credentials will come from the AWS Identity and Access Management (IAM) role associated with the function. The AWS Region will be set to the /// Region that the Lambda function is running in. /// </summary> public SageMakerLambdaFunction() { } /// <summary> /// The AWS Lambda function handler that processes events from the SageMaker pipeline and starts a job or export. /// </summary> /// <param name="request">The custom SageMaker pipeline request object.</param> /// <param name="context">The Lambda context.</param> /// <returns>The dictionary of output parameters.</returns> public async Task<Dictionary<string, string>> FunctionHandler(PipelineRequest request, ILambdaContext context) { var geoSpatialClient = new AmazonSageMakerGeospatialClient(); var sageMakerClient = new AmazonSageMakerClient(); var responseDictionary = new Dictionary<string, string>(); context.Logger.LogInformation("Function handler started with request: " + JsonSerializer.Serialize(request)); if (request.Records != null && request.Records.Any()) { context.Logger.LogInformation("Records found, this is a queue event. Processing the queue records."); foreach (var message in request.Records) { await ProcessMessageAsync(message, context, geoSpatialClient, sageMakerClient); } } else if (!string.IsNullOrEmpty(request.vej_export_config)) { context.Logger.LogInformation("Export configuration found, this is an export. Start the Vector Enrichment Job (VEJ) export."); var outputConfig = JsonSerializer.Deserialize<ExportVectorEnrichmentJobOutputConfig>( request.vej_export_config); var exportResponse = await geoSpatialClient.ExportVectorEnrichmentJobAsync( new ExportVectorEnrichmentJobRequest() { Arn = request.vej_arn, ExecutionRoleArn = request.Role, OutputConfig = outputConfig }); context.Logger.LogInformation($"Export response: {JsonSerializer.Serialize(exportResponse)}"); responseDictionary = new Dictionary<string, string> { { "export_eoj_status", exportResponse.ExportStatus.ToString() }, { "vej_arn", exportResponse.Arn } }; } else if (!string.IsNullOrEmpty(request.vej_name)) { context.Logger.LogInformation("Vector Enrichment Job name found, starting the job."); var inputConfig = JsonSerializer.Deserialize<VectorEnrichmentJobInputConfig>( request.vej_input_config); var jobConfig = JsonSerializer.Deserialize<VectorEnrichmentJobConfig>( request.vej_config); var jobResponse = await geoSpatialClient.StartVectorEnrichmentJobAsync( new StartVectorEnrichmentJobRequest() { ExecutionRoleArn = request.Role, InputConfig = inputConfig, Name = request.vej_name, JobConfig = jobConfig }); context.Logger.LogInformation("Job response: " + JsonSerializer.Serialize(jobResponse)); responseDictionary = new Dictionary<string, string> { { "vej_arn", jobResponse.Arn }, { "statusCode", jobResponse.HttpStatusCode.ToString() } }; } return responseDictionary; } /// <summary> /// Process a queue message and check the status of a SageMaker job. /// </summary> /// <param name="message">The queue message.</param> /// <param name="context">The Lambda context.</param> /// <param name="geoClient">The SageMaker GeoSpatial client.</param> /// <param name="sageMakerClient">The SageMaker client.</param> /// <returns>Async task.</returns> private async Task ProcessMessageAsync(SQSEvent.SQSMessage message, ILambdaContext context, AmazonSageMakerGeospatialClient geoClient, AmazonSageMakerClient sageMakerClient) { context.Logger.LogInformation($"Processed message {message.Body}"); // Get information about the SageMaker job. var payload = JsonSerializer.Deserialize<QueuePayload>(message.Body); context.Logger.LogInformation($"Payload token {payload!.token}"); var token = payload.token; if (payload.arguments.ContainsKey("vej_arn")) { // Use the job ARN and the token to get the job status. var job_arn = payload.arguments["vej_arn"]; context.Logger.LogInformation($"Token: {token}, arn {job_arn}"); var jobInfo = geoClient.GetVectorEnrichmentJobAsync( new GetVectorEnrichmentJobRequest() { Arn = job_arn }); context.Logger.LogInformation("Job info: " + JsonSerializer.Serialize(jobInfo)); if (jobInfo.Result.Status == VectorEnrichmentJobStatus.COMPLETED) { context.Logger.LogInformation($"Status completed, resuming pipeline..."); await sageMakerClient.SendPipelineExecutionStepSuccessAsync( new SendPipelineExecutionStepSuccessRequest() { CallbackToken = token, OutputParameters = new List<OutputParameter>() { new OutputParameter() { Name = "export_status", Value = jobInfo.Result.Status } } }); } else if (jobInfo.Result.Status == VectorEnrichmentJobStatus.FAILED) { context.Logger.LogInformation($"Status failed, stopping pipeline..."); await sageMakerClient.SendPipelineExecutionStepFailureAsync( new SendPipelineExecutionStepFailureRequest() { CallbackToken = token, FailureReason = jobInfo.Result.ErrorDetails.ErrorMessage }); } else if (jobInfo.Result.Status == VectorEnrichmentJobStatus.IN_PROGRESS) { // Put this message back in the queue to reprocess later. context.Logger.LogInformation( $"Status still in progress, check back later."); throw new("Job still running."); } } } }

Ejecutar un escenario interactivo en un símbolo del sistema.

public static class PipelineWorkflow { public static IAmazonIdentityManagementService _iamClient = null!; public static SageMakerWrapper _sageMakerWrapper = null!; public static IAmazonSQS _sqsClient = null!; public static IAmazonS3 _s3Client = null!; public static IAmazonLambda _lambdaClient = null!; public static IConfiguration _configuration = null!; public static string lambdaFunctionName = "SageMakerExampleFunction"; public static string sageMakerRoleName = "SageMakerExampleRole"; public static string lambdaRoleName = "SageMakerExampleLambdaRole"; private static string[] lambdaRolePolicies = null!; private static string[] sageMakerRolePolicies = null!; static async Task Main(string[] args) { var options = new AWSOptions() { Region = RegionEndpoint.USWest2 }; // Set up dependency injection for the AWS service. using var host = Host.CreateDefaultBuilder(args) .ConfigureLogging(logging => logging.AddFilter("System", LogLevel.Debug) .AddFilter<DebugLoggerProvider>("Microsoft", LogLevel.Information) .AddFilter<ConsoleLoggerProvider>("Microsoft", LogLevel.Trace)) .ConfigureServices((_, services) => services.AddAWSService<IAmazonIdentityManagementService>(options) .AddAWSService<IAmazonEC2>(options) .AddAWSService<IAmazonSageMaker>(options) .AddAWSService<IAmazonSageMakerGeospatial>(options) .AddAWSService<IAmazonSQS>(options) .AddAWSService<IAmazonS3>(options) .AddAWSService<IAmazonLambda>(options) .AddTransient<SageMakerWrapper>() ) .Build(); _configuration = new ConfigurationBuilder() .SetBasePath(Directory.GetCurrentDirectory()) .AddJsonFile("settings.json") // Load settings from .json file. .AddJsonFile("settings.local.json", true) // Optionally, load local settings. .Build(); ServicesSetup(host); string queueUrl = ""; string queueName = _configuration["queueName"]; string bucketName = _configuration["bucketName"]; var pipelineName = _configuration["pipelineName"]; try { Console.WriteLine(new string('-', 80)); Console.WriteLine( "Welcome to the Amazon SageMaker pipeline example scenario."); Console.WriteLine( "\nThis example workflow will guide you through setting up and running an" + "\nAmazon SageMaker pipeline. The pipeline uses an AWS Lambda function and an" + "\nAmazon SQS Queue. It runs a vector enrichment reverse geocode job to" + "\nreverse geocode addresses in an input file and store the results in an export file."); Console.WriteLine(new string('-', 80)); Console.WriteLine(new string('-', 80)); Console.WriteLine( "First, we will set up the roles, functions, and queue needed by the SageMaker pipeline."); Console.WriteLine(new string('-', 80)); var lambdaRoleArn = await CreateLambdaRole(); var sageMakerRoleArn = await CreateSageMakerRole(); var functionArn = await SetupLambda(lambdaRoleArn, true); queueUrl = await SetupQueue(queueName); await SetupBucket(bucketName); Console.WriteLine(new string('-', 80)); Console.WriteLine("Now we can create and run our pipeline."); Console.WriteLine(new string('-', 80)); await SetupPipeline(sageMakerRoleArn, functionArn, pipelineName); var executionArn = await ExecutePipeline(queueUrl, sageMakerRoleArn, pipelineName, bucketName); await WaitForPipelineExecution(executionArn); await GetOutputResults(bucketName); Console.WriteLine(new string('-', 80)); Console.WriteLine("The pipeline has completed. To view the pipeline and runs " + "in SageMaker Studio, follow these instructions:" + "\nhttps://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-studio.html"); Console.WriteLine(new string('-', 80)); Console.WriteLine(new string('-', 80)); Console.WriteLine("Finally, let's clean up our resources."); Console.WriteLine(new string('-', 80)); await CleanupResources(true, queueUrl, pipelineName, bucketName); Console.WriteLine(new string('-', 80)); Console.WriteLine("SageMaker pipeline scenario is complete."); Console.WriteLine(new string('-', 80)); } catch (Exception ex) { Console.WriteLine(new string('-', 80)); Console.WriteLine($"There was a problem running the scenario: {ex.Message}"); await CleanupResources(true, queueUrl, pipelineName, bucketName); Console.WriteLine(new string('-', 80)); } } /// <summary> /// Populate the services for use within the console application. /// </summary> /// <param name="host">The services host.</param> private static void ServicesSetup(IHost host) { _sageMakerWrapper = host.Services.GetRequiredService<SageMakerWrapper>(); _iamClient = host.Services.GetRequiredService<IAmazonIdentityManagementService>(); _sqsClient = host.Services.GetRequiredService<IAmazonSQS>(); _s3Client = host.Services.GetRequiredService<IAmazonS3>(); _lambdaClient = host.Services.GetRequiredService<IAmazonLambda>(); } /// <summary> /// Set up AWS Lambda, either by updating an existing function or creating a new function. /// </summary> /// <param name="roleArn">The role Amazon Resource Name (ARN) to use for the Lambda function.</param> /// <param name="askUser">True to ask the user before updating.</param> /// <returns>The ARN of the function.</returns> public static async Task<string> SetupLambda(string roleArn, bool askUser) { Console.WriteLine(new string('-', 80)); Console.WriteLine("Setting up the Lambda function for the pipeline."); var handlerName = "SageMakerLambda::SageMakerLambda.SageMakerLambdaFunction::FunctionHandler"; var functionArn = ""; try { var functionInfo = await _lambdaClient.GetFunctionAsync(new GetFunctionRequest() { FunctionName = lambdaFunctionName }); var updateFunction = true; if (askUser) { updateFunction = GetYesNoResponse( $"\tThe Lambda function {lambdaFunctionName} already exists, do you want to update it?"); } if (updateFunction) { // Update the Lambda function. using var zipMemoryStream = new MemoryStream(await File.ReadAllBytesAsync("SageMakerLambda.zip")); await _lambdaClient.UpdateFunctionCodeAsync( new UpdateFunctionCodeRequest() { FunctionName = lambdaFunctionName, ZipFile = zipMemoryStream, }); } functionArn = functionInfo.Configuration.FunctionArn; } catch (ResourceNotFoundException) { Console.WriteLine($"\tThe Lambda function {lambdaFunctionName} was not found, creating the new function."); // Create the function if it does not already exist. using var zipMemoryStream = new MemoryStream(await File.ReadAllBytesAsync("SageMakerLambda.zip")); var createResult = await _lambdaClient.CreateFunctionAsync( new CreateFunctionRequest() { FunctionName = lambdaFunctionName, Runtime = Runtime.Dotnet6, Description = "SageMaker example function.", Code = new FunctionCode() { ZipFile = zipMemoryStream }, Handler = handlerName, Role = roleArn, Timeout = 30 }); functionArn = createResult.FunctionArn; } Console.WriteLine($"\tLambda ready with ARN {functionArn}."); Console.WriteLine(new string('-', 80)); return functionArn; } /// <summary> /// Create a role to be used by AWS Lambda. Does not create the role if it already exists. /// </summary> /// <returns>The role ARN.</returns> public static async Task<string> CreateLambdaRole() { Console.WriteLine(new string('-', 80)); lambdaRolePolicies = new string[]{ "arn:aws:iam::aws:policy/AmazonSageMakerFullAccess", "arn:aws:iam::aws:policy/AmazonSQSFullAccess", "arn:aws:iam::aws:policy/service-role/" + "AmazonSageMakerGeospatialFullAccess", "arn:aws:iam::aws:policy/service-role/" + "AmazonSageMakerServiceCatalogProductsLambdaServiceRolePolicy", "arn:aws:iam::aws:policy/service-role/" + "AWSLambdaSQSQueueExecutionRole" }; var roleArn = await GetRoleArnIfExists(lambdaRoleName); if (!string.IsNullOrEmpty(roleArn)) { return roleArn; } Console.WriteLine("\tCreating a role to for AWS Lambda to use."); var assumeRolePolicy = "{" + "\"Version\": \"2012-10-17\"," + "\"Statement\": [{" + "\"Effect\": \"Allow\"," + "\"Principal\": {" + $"\"Service\": [" + "\"sagemaker.amazonaws.com\"," + "\"sagemaker-geospatial.amazonaws.com\"," + "\"lambda.amazonaws.com\"," + "\"s3.amazonaws.com\"" + "]" + "}," + "\"Action\": \"sts:AssumeRole\"" + "}]" + "}"; var roleResult = await _iamClient!.CreateRoleAsync( new CreateRoleRequest() { AssumeRolePolicyDocument = assumeRolePolicy, Path = "/", RoleName = lambdaRoleName }); foreach (var policy in lambdaRolePolicies) { await _iamClient.AttachRolePolicyAsync( new AttachRolePolicyRequest() { PolicyArn = policy, RoleName = lambdaRoleName }); } // Allow time for the role to be ready. Thread.Sleep(10000); Console.WriteLine($"\tRole ready with ARN {roleResult.Role.Arn}."); Console.WriteLine(new string('-', 80)); return roleResult.Role.Arn; } /// <summary> /// Create a role to be used by SageMaker. /// </summary> /// <returns>The role Amazon Resource Name (ARN).</returns> public static async Task<string> CreateSageMakerRole() { Console.WriteLine(new string('-', 80)); sageMakerRolePolicies = new string[]{ "arn:aws:iam::aws:policy/AmazonSageMakerFullAccess", "arn:aws:iam::aws:policy/AmazonSageMakerGeospatialFullAccess", }; var roleArn = await GetRoleArnIfExists(sageMakerRoleName); if (!string.IsNullOrEmpty(roleArn)) { return roleArn; } Console.WriteLine("\tCreating a role to use with SageMaker."); var assumeRolePolicy = "{" + "\"Version\": \"2012-10-17\"," + "\"Statement\": [{" + "\"Effect\": \"Allow\"," + "\"Principal\": {" + $"\"Service\": [" + "\"sagemaker.amazonaws.com\"," + "\"sagemaker-geospatial.amazonaws.com\"," + "\"lambda.amazonaws.com\"," + "\"s3.amazonaws.com\"" + "]" + "}," + "\"Action\": \"sts:AssumeRole\"" + "}]" + "}"; var roleResult = await _iamClient!.CreateRoleAsync( new CreateRoleRequest() { AssumeRolePolicyDocument = assumeRolePolicy, Path = "/", RoleName = sageMakerRoleName }); foreach (var policy in sageMakerRolePolicies) { await _iamClient.AttachRolePolicyAsync( new AttachRolePolicyRequest() { PolicyArn = policy, RoleName = sageMakerRoleName }); } // Allow time for the role to be ready. Thread.Sleep(10000); Console.WriteLine($"\tRole ready with ARN {roleResult.Role.Arn}."); Console.WriteLine(new string('-', 80)); return roleResult.Role.Arn; } /// <summary> /// Set up the SQS queue to use with the pipeline. /// </summary> /// <param name="queueName">The name for the queue.</param> /// <returns>The URL for the queue.</returns> public static async Task<string> SetupQueue(string queueName) { Console.WriteLine(new string('-', 80)); Console.WriteLine($"Setting up queue {queueName}."); try { var queueInfo = await _sqsClient.GetQueueUrlAsync(new GetQueueUrlRequest() { QueueName = queueName }); return queueInfo.QueueUrl; } catch (QueueDoesNotExistException) { var attrs = new Dictionary<string, string> { { QueueAttributeName.DelaySeconds, "5" }, { QueueAttributeName.ReceiveMessageWaitTimeSeconds, "5" }, { QueueAttributeName.VisibilityTimeout, "300" }, }; var request = new CreateQueueRequest { Attributes = attrs, QueueName = queueName, }; var response = await _sqsClient.CreateQueueAsync(request); Thread.Sleep(10000); await ConnectLambda(response.QueueUrl); Console.WriteLine($"\tQueue ready with Url {response.QueueUrl}."); Console.WriteLine(new string('-', 80)); return response.QueueUrl; } } /// <summary> /// Connect the queue to the Lambda function as an event source. /// </summary> /// <param name="queueUrl">The URL for the queue.</param> /// <returns>Async task.</returns> public static async Task ConnectLambda(string queueUrl) { Console.WriteLine(new string('-', 80)); Console.WriteLine($"Connecting the Lambda function and queue for the pipeline."); var queueAttributes = await _sqsClient.GetQueueAttributesAsync( new GetQueueAttributesRequest() { QueueUrl = queueUrl, AttributeNames = new List<string>() { "All" } }); var queueArn = queueAttributes.QueueARN; var eventSource = await _lambdaClient.ListEventSourceMappingsAsync( new ListEventSourceMappingsRequest() { FunctionName = lambdaFunctionName }); if (!eventSource.EventSourceMappings.Any()) { // Only add the event source mapping if it does not already exist. await _lambdaClient.CreateEventSourceMappingAsync( new CreateEventSourceMappingRequest() { EventSourceArn = queueArn, FunctionName = lambdaFunctionName, Enabled = true }); } Console.WriteLine(new string('-', 80)); } /// <summary> /// Set up the bucket to use for pipeline input and output. /// </summary> /// <param name="bucketName">The name for the bucket.</param> /// <returns>Async task.</returns> public static async Task SetupBucket(string bucketName) { Console.WriteLine(new string('-', 80)); Console.WriteLine($"Setting up bucket {bucketName}."); var bucketExists = await Amazon.S3.Util.AmazonS3Util.DoesS3BucketExistV2Async(_s3Client, bucketName); if (!bucketExists) { await _s3Client.PutBucketAsync(new PutBucketRequest() { BucketName = bucketName, BucketRegion = S3Region.USWest2 }); Thread.Sleep(5000); await _s3Client.PutObjectAsync(new PutObjectRequest() { BucketName = bucketName, Key = "samplefiles/latlongtest.csv", FilePath = "latlongtest.csv" }); } Console.WriteLine($"\tBucket {bucketName} ready."); Console.WriteLine(new string('-', 80)); } /// <summary> /// Display some results from the output directory. /// </summary> /// <param name="bucketName">The name for the bucket.</param> /// <returns>Async task.</returns> public static async Task<string> GetOutputResults(string bucketName) { Console.WriteLine(new string('-', 80)); Console.WriteLine($"Getting output results {bucketName}."); string outputKey = ""; Thread.Sleep(15000); var outputFiles = await _s3Client.ListObjectsAsync( new ListObjectsRequest() { BucketName = bucketName, Prefix = "outputfiles/" }); if (outputFiles.S3Objects.Any()) { var sampleOutput = outputFiles.S3Objects.OrderBy(s => s.LastModified).Last(); Console.WriteLine($"\tOutput file: {sampleOutput.Key}"); var outputSampleResponse = await _s3Client.GetObjectAsync( new GetObjectRequest() { BucketName = bucketName, Key = sampleOutput.Key }); outputKey = sampleOutput.Key; StreamReader reader = new StreamReader(outputSampleResponse.ResponseStream); await reader.ReadLineAsync(); Console.WriteLine("\tOutput file contents: \n"); for (int i = 0; i < 10; i++) { if (!reader.EndOfStream) { Console.WriteLine("\t" + await reader.ReadLineAsync()); } } } Console.WriteLine(new string('-', 80)); return outputKey; } /// <summary> /// Create a pipeline from the example pipeline JSON /// that includes the Lambda, callback, processing, and export jobs. /// </summary> /// <param name="roleArn">The ARN of the role for the pipeline.</param> /// <param name="functionArn">The ARN of the Lambda function for the pipeline.</param> /// <param name="pipelineName">The name for the pipeline.</param> /// <returns>The ARN of the pipeline.</returns> public static async Task<string> SetupPipeline(string roleArn, string functionArn, string pipelineName) { Console.WriteLine(new string('-', 80)); Console.WriteLine($"Setting up the pipeline."); var pipelineJson = await File.ReadAllTextAsync("GeoSpatialPipeline.json"); // Add the correct function ARN instead of the placeholder. pipelineJson = pipelineJson.Replace("*FUNCTION_ARN*", functionArn); var pipelineArn = await _sageMakerWrapper.SetupPipeline(pipelineJson, roleArn, pipelineName, "sdk example pipeline", pipelineName); Console.WriteLine($"\tPipeline set up with ARN {pipelineArn}."); Console.WriteLine(new string('-', 80)); return pipelineArn; } /// <summary> /// Start a pipeline run with job configurations. /// </summary> /// <param name="queueUrl">The URL for the queue used in the pipeline.</param> /// <param name="roleArn">The ARN of the role.</param> /// <param name="pipelineName">The name of the pipeline.</param> /// <param name="bucketName">The name of the bucket.</param> /// <returns>The pipeline run ARN.</returns> public static async Task<string> ExecutePipeline( string queueUrl, string roleArn, string pipelineName, string bucketName) { Console.WriteLine(new string('-', 80)); Console.WriteLine($"Starting pipeline execution."); var input = $"s3://{bucketName}/samplefiles/latlongtest.csv"; var output = $"s3://{bucketName}/outputfiles/"; var executionARN = await _sageMakerWrapper.ExecutePipeline(queueUrl, input, output, pipelineName, roleArn); Console.WriteLine($"\tRun started with ARN {executionARN}."); Console.WriteLine(new string('-', 80)); return executionARN; } /// <summary> /// Wait for a pipeline run to complete. /// </summary> /// <param name="executionArn">The pipeline run ARN.</param> /// <returns>Async task.</returns> public static async Task WaitForPipelineExecution(string executionArn) { Console.WriteLine(new string('-', 80)); Console.WriteLine($"Waiting for pipeline to finish."); PipelineExecutionStatus status; do { status = await _sageMakerWrapper.CheckPipelineExecutionStatus(executionArn); Thread.Sleep(30000); Console.WriteLine($"\tStatus is {status}."); } while (status == PipelineExecutionStatus.Executing); Console.WriteLine($"\tPipeline finished with status {status}."); Console.WriteLine(new string('-', 80)); } /// <summary> /// Clean up the resources from the scenario. /// </summary> /// <param name="askUser">True to ask the user for cleanup.</param> /// <param name="queueUrl">The URL of the queue to clean up.</param> /// <param name="pipelineName">The name of the pipeline.</param> /// <param name="bucketName">The name of the bucket.</param> /// <returns>Async task.</returns> public static async Task<bool> CleanupResources( bool askUser, string queueUrl, string pipelineName, string bucketName) { Console.WriteLine(new string('-', 80)); Console.WriteLine($"Clean up resources."); if (!askUser || GetYesNoResponse($"\tDelete pipeline {pipelineName}? (y/n)")) { Console.WriteLine($"\tDeleting pipeline."); // Delete the pipeline. await _sageMakerWrapper.DeletePipelineByName(pipelineName); } if (!string.IsNullOrEmpty(queueUrl) && (!askUser || GetYesNoResponse($"\tDelete queue {queueUrl}? (y/n)"))) { Console.WriteLine($"\tDeleting queue."); // Delete the queue. await _sqsClient.DeleteQueueAsync(new DeleteQueueRequest(queueUrl)); } if (!askUser || GetYesNoResponse($"\tDelete Amazon S3 bucket {bucketName}? (y/n)")) { Console.WriteLine($"\tDeleting bucket."); // Delete all objects in the bucket. var deleteList = await _s3Client.ListObjectsV2Async(new ListObjectsV2Request() { BucketName = bucketName }); if (deleteList.KeyCount > 0) { await _s3Client.DeleteObjectsAsync(new DeleteObjectsRequest() { BucketName = bucketName, Objects = deleteList.S3Objects .Select(o => new KeyVersion { Key = o.Key }).ToList() }); } // Now delete the bucket. await _s3Client.DeleteBucketAsync(new DeleteBucketRequest() { BucketName = bucketName }); } if (!askUser || GetYesNoResponse($"\tDelete lambda {lambdaFunctionName}? (y/n)")) { Console.WriteLine($"\tDeleting lambda function."); await _lambdaClient.DeleteFunctionAsync(new DeleteFunctionRequest() { FunctionName = lambdaFunctionName }); } if (!askUser || GetYesNoResponse($"\tDelete role {lambdaRoleName}? (y/n)")) { Console.WriteLine($"\tDetaching policies and deleting role."); foreach (var policy in lambdaRolePolicies) { await _iamClient!.DetachRolePolicyAsync(new DetachRolePolicyRequest() { RoleName = lambdaRoleName, PolicyArn = policy }); } await _iamClient!.DeleteRoleAsync(new DeleteRoleRequest() { RoleName = lambdaRoleName }); } if (!askUser || GetYesNoResponse($"\tDelete role {sageMakerRoleName}? (y/n)")) { Console.WriteLine($"\tDetaching policies and deleting role."); foreach (var policy in sageMakerRolePolicies) { await _iamClient!.DetachRolePolicyAsync(new DetachRolePolicyRequest() { RoleName = sageMakerRoleName, PolicyArn = policy }); } await _iamClient!.DeleteRoleAsync(new DeleteRoleRequest() { RoleName = sageMakerRoleName }); } Console.WriteLine(new string('-', 80)); return true; } /// <summary> /// Helper method to get a role's ARN if it already exists. /// </summary> /// <param name="roleName">The name of the AWS Identity and Access Management (IAM) Role to look for.</param> /// <returns>The role ARN if it exists, otherwise an empty string.</returns> private static async Task<string> GetRoleArnIfExists(string roleName) { Console.WriteLine($"Checking for role named {roleName}."); try { var existingRole = await _iamClient.GetRoleAsync(new GetRoleRequest() { RoleName = lambdaRoleName }); return existingRole.Role.Arn; } catch (NoSuchEntityException) { return string.Empty; } } /// <summary> /// Helper method to get a yes or no response from the user. /// </summary> /// <param name="question">The question string to print on the console.</param> /// <returns>True if the user responds with a yes.</returns> private static bool GetYesNoResponse(string question) { Console.WriteLine(question); var ynResponse = Console.ReadLine(); var response = ynResponse != null && ynResponse.Equals("y", StringComparison.InvariantCultureIgnoreCase); return response; } }