SageMaker 사용 예제 AWS SDK for .NET - AWS SDK코드 예제

AWS 문서 AWS SDK SDK 예제 GitHub 리포지토리에 더 많은 예제가 있습니다.

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

SageMaker 사용 예제 AWS SDK for .NET

다음 코드 예제는 with 를 사용하여 작업을 수행하고 일반적인 시나리오를 구현하는 방법을 보여줍니다 SageMaker. AWS SDK for .NET

작업은 대규모 프로그램에서 발췌한 코드이며 컨텍스트에 맞춰 실행해야 합니다. 작업은 개별 서비스 함수를 호출하는 방법을 보여 주며 관련 시나리오와 교차 서비스 예시에서 컨텍스트에 맞는 작업을 볼 수 있습니다.

시나리오는 동일한 서비스 내에서 여러 함수를 호출하여 특정 태스크를 수행하는 방법을 보여주는 코드 예시입니다.

각 예제에는 컨텍스트에서 코드를 설정하고 실행하는 방법에 대한 지침을 찾을 수 있는 링크가 포함되어 있습니다. GitHub

시작하기

다음 코드 예제는 사용을 시작하는 방법을 보여줍니다 SageMaker.

AWS SDK for .NET
참고

더 많은 정보가 있습니다 GitHub. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

using Amazon.SageMaker; using Amazon.SageMaker.Model; namespace SageMakerActions; public static class HelloSageMaker { static async Task Main(string[] args) { var sageMakerClient = new AmazonSageMakerClient(); Console.WriteLine($"Hello Amazon SageMaker! Let's list some of your notebook instances:"); Console.WriteLine(); // You can use await and any of the async methods to get a response. // Let's get the first five notebook instances. var response = await sageMakerClient.ListNotebookInstancesAsync( new ListNotebookInstancesRequest() { MaxResults = 5 }); if (!response.NotebookInstances.Any()) { Console.WriteLine($"No notebook instances found."); Console.WriteLine("See https://docs.aws.amazon.com/sagemaker/latest/dg/howitworks-create-ws.html to create one."); } foreach (var notebookInstance in response.NotebookInstances) { Console.WriteLine($"\tInstance: {notebookInstance.NotebookInstanceName}"); Console.WriteLine($"\tArn: {notebookInstance.NotebookInstanceArn}"); Console.WriteLine($"\tCreation Date: {notebookInstance.CreationTime.ToShortDateString()}"); Console.WriteLine(); } } }

작업

다음 코드 예시에서는 CreatePipeline을 사용하는 방법을 보여 줍니다.

AWS SDK for .NET
참고

더 많은 정보가 있습니다 GitHub. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

/// <summary> /// Create a pipeline from a JSON definition, or update it if the pipeline already exists. /// </summary> /// <returns>The Amazon Resource Name (ARN) of the pipeline.</returns> public async Task<string> SetupPipeline(string pipelineJson, string roleArn, string name, string description, string displayName) { try { var updateResponse = await _amazonSageMaker.UpdatePipelineAsync( new UpdatePipelineRequest() { PipelineDefinition = pipelineJson, PipelineDescription = description, PipelineDisplayName = displayName, PipelineName = name, RoleArn = roleArn }); return updateResponse.PipelineArn; } catch (Amazon.SageMaker.Model.ResourceNotFoundException) { var createResponse = await _amazonSageMaker.CreatePipelineAsync( new CreatePipelineRequest() { PipelineDefinition = pipelineJson, PipelineDescription = description, PipelineDisplayName = displayName, PipelineName = name, RoleArn = roleArn }); return createResponse.PipelineArn; } }

다음 코드 예시에서는 DeletePipeline을 사용하는 방법을 보여 줍니다.

AWS SDK for .NET
참고

더 많은 정보가 있습니다 GitHub. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

/// <summary> /// Delete a SageMaker pipeline by name. /// </summary> /// <param name="pipelineName">The name of the pipeline to delete.</param> /// <returns>The ARN of the pipeline.</returns> public async Task<string> DeletePipelineByName(string pipelineName) { var deleteResponse = await _amazonSageMaker.DeletePipelineAsync( new DeletePipelineRequest() { PipelineName = pipelineName }); return deleteResponse.PipelineArn; }

다음 코드 예시에서는 DescribePipelineExecution을 사용하는 방법을 보여 줍니다.

AWS SDK for .NET
참고

더 많은 정보가 있습니다 GitHub. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

/// <summary> /// Check the status of a run. /// </summary> /// <param name="pipelineExecutionArn">The ARN.</param> /// <returns>The status of the pipeline.</returns> public async Task<PipelineExecutionStatus> CheckPipelineExecutionStatus(string pipelineExecutionArn) { var describeResponse = await _amazonSageMaker.DescribePipelineExecutionAsync( new DescribePipelineExecutionRequest() { PipelineExecutionArn = pipelineExecutionArn }); return describeResponse.PipelineExecutionStatus; }

다음 코드 예시에서는 StartPipelineExecution을 사용하는 방법을 보여 줍니다.

AWS SDK for .NET
참고

더 많은 정보가 있습니다 GitHub. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

/// <summary> /// Run a pipeline with input and output file locations. /// </summary> /// <param name="queueUrl">The URL for the queue to use for pipeline callbacks.</param> /// <param name="inputLocationUrl">The input location in Amazon Simple Storage Service (Amazon S3).</param> /// <param name="outputLocationUrl">The output location in Amazon S3.</param> /// <param name="pipelineName">The name of the pipeline.</param> /// <param name="executionRoleArn">The ARN of the role.</param> /// <returns>The ARN of the pipeline run.</returns> public async Task<string> ExecutePipeline( string queueUrl, string inputLocationUrl, string outputLocationUrl, string pipelineName, string executionRoleArn) { var inputConfig = new VectorEnrichmentJobInputConfig() { DataSourceConfig = new() { S3Data = new VectorEnrichmentJobS3Data() { S3Uri = inputLocationUrl } }, DocumentType = VectorEnrichmentJobDocumentType.CSV }; var exportConfig = new ExportVectorEnrichmentJobOutputConfig() { S3Data = new VectorEnrichmentJobS3Data() { S3Uri = outputLocationUrl } }; var jobConfig = new VectorEnrichmentJobConfig() { ReverseGeocodingConfig = new ReverseGeocodingConfig() { XAttributeName = "Longitude", YAttributeName = "Latitude" } }; #pragma warning disable SageMaker1002 // Property value does not match required pattern is allowed here to match the pipeline definition. var startExecutionResponse = await _amazonSageMaker.StartPipelineExecutionAsync( new StartPipelineExecutionRequest() { PipelineName = pipelineName, PipelineExecutionDisplayName = pipelineName + "-example-execution", PipelineParameters = new List<Parameter>() { new Parameter() { Name = "parameter_execution_role", Value = executionRoleArn }, new Parameter() { Name = "parameter_queue_url", Value = queueUrl }, new Parameter() { Name = "parameter_vej_input_config", Value = JsonSerializer.Serialize(inputConfig) }, new Parameter() { Name = "parameter_vej_export_config", Value = JsonSerializer.Serialize(exportConfig) }, new Parameter() { Name = "parameter_step_1_vej_config", Value = JsonSerializer.Serialize(jobConfig) } } }); #pragma warning restore SageMaker1002 return startExecutionResponse.PipelineExecutionArn; }

다음 코드 예시에서는 UpdatePipeline을 사용하는 방법을 보여 줍니다.

AWS SDK for .NET
참고

더 많은 정보가 있습니다 GitHub. AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

/// <summary> /// Create a pipeline from a JSON definition, or update it if the pipeline already exists. /// </summary> /// <returns>The Amazon Resource Name (ARN) of the pipeline.</returns> public async Task<string> SetupPipeline(string pipelineJson, string roleArn, string name, string description, string displayName) { try { var updateResponse = await _amazonSageMaker.UpdatePipelineAsync( new UpdatePipelineRequest() { PipelineDefinition = pipelineJson, PipelineDescription = description, PipelineDisplayName = displayName, PipelineName = name, RoleArn = roleArn }); return updateResponse.PipelineArn; } catch (Amazon.SageMaker.Model.ResourceNotFoundException) { var createResponse = await _amazonSageMaker.CreatePipelineAsync( new CreatePipelineRequest() { PipelineDefinition = pipelineJson, PipelineDescription = description, PipelineDisplayName = displayName, PipelineName = name, RoleArn = roleArn }); return createResponse.PipelineArn; } }

시나리오

다음 코드 예제에서는 다음과 같은 작업을 수행하는 방법을 보여줍니다.

  • 파이프라인의 리소스를 설정하세요.

  • 지리 공간 작업을 실행하는 파이프라인을 설정합니다.

  • 파이프라인 실행을 시작합니다.

  • 실행 상태를 모니터링합니다.

  • 파이프라인의 출력을 볼 수 있습니다.

  • 리소스를 정리합니다.

자세한 내용은 Community.AWS를 사용하여 SageMaker AWS SDKs 파이프라인 생성 및 실행을 참조하십시오.

AWS SDK for .NET
참고

자세한 내용은 다음과 같습니다. GitHub AWS 코드 예시 리포지토리에서 전체 예시를 찾고 설정 및 실행하는 방법을 배워보세요.

SageMaker 작업을 래핑하는 클래스를 만드세요.

using System.Text.Json; using Amazon.SageMaker; using Amazon.SageMaker.Model; using Amazon.SageMakerGeospatial; using Amazon.SageMakerGeospatial.Model; namespace SageMakerActions; /// <summary> /// Wrapper class for Amazon SageMaker actions and logic. /// </summary> public class SageMakerWrapper { private readonly IAmazonSageMaker _amazonSageMaker; public SageMakerWrapper(IAmazonSageMaker amazonSageMaker) { _amazonSageMaker = amazonSageMaker; } /// <summary> /// Create a pipeline from a JSON definition, or update it if the pipeline already exists. /// </summary> /// <returns>The Amazon Resource Name (ARN) of the pipeline.</returns> public async Task<string> SetupPipeline(string pipelineJson, string roleArn, string name, string description, string displayName) { try { var updateResponse = await _amazonSageMaker.UpdatePipelineAsync( new UpdatePipelineRequest() { PipelineDefinition = pipelineJson, PipelineDescription = description, PipelineDisplayName = displayName, PipelineName = name, RoleArn = roleArn }); return updateResponse.PipelineArn; } catch (Amazon.SageMaker.Model.ResourceNotFoundException) { var createResponse = await _amazonSageMaker.CreatePipelineAsync( new CreatePipelineRequest() { PipelineDefinition = pipelineJson, PipelineDescription = description, PipelineDisplayName = displayName, PipelineName = name, RoleArn = roleArn }); return createResponse.PipelineArn; } } /// <summary> /// Run a pipeline with input and output file locations. /// </summary> /// <param name="queueUrl">The URL for the queue to use for pipeline callbacks.</param> /// <param name="inputLocationUrl">The input location in Amazon Simple Storage Service (Amazon S3).</param> /// <param name="outputLocationUrl">The output location in Amazon S3.</param> /// <param name="pipelineName">The name of the pipeline.</param> /// <param name="executionRoleArn">The ARN of the role.</param> /// <returns>The ARN of the pipeline run.</returns> public async Task<string> ExecutePipeline( string queueUrl, string inputLocationUrl, string outputLocationUrl, string pipelineName, string executionRoleArn) { var inputConfig = new VectorEnrichmentJobInputConfig() { DataSourceConfig = new() { S3Data = new VectorEnrichmentJobS3Data() { S3Uri = inputLocationUrl } }, DocumentType = VectorEnrichmentJobDocumentType.CSV }; var exportConfig = new ExportVectorEnrichmentJobOutputConfig() { S3Data = new VectorEnrichmentJobS3Data() { S3Uri = outputLocationUrl } }; var jobConfig = new VectorEnrichmentJobConfig() { ReverseGeocodingConfig = new ReverseGeocodingConfig() { XAttributeName = "Longitude", YAttributeName = "Latitude" } }; #pragma warning disable SageMaker1002 // Property value does not match required pattern is allowed here to match the pipeline definition. var startExecutionResponse = await _amazonSageMaker.StartPipelineExecutionAsync( new StartPipelineExecutionRequest() { PipelineName = pipelineName, PipelineExecutionDisplayName = pipelineName + "-example-execution", PipelineParameters = new List<Parameter>() { new Parameter() { Name = "parameter_execution_role", Value = executionRoleArn }, new Parameter() { Name = "parameter_queue_url", Value = queueUrl }, new Parameter() { Name = "parameter_vej_input_config", Value = JsonSerializer.Serialize(inputConfig) }, new Parameter() { Name = "parameter_vej_export_config", Value = JsonSerializer.Serialize(exportConfig) }, new Parameter() { Name = "parameter_step_1_vej_config", Value = JsonSerializer.Serialize(jobConfig) } } }); #pragma warning restore SageMaker1002 return startExecutionResponse.PipelineExecutionArn; } /// <summary> /// Check the status of a run. /// </summary> /// <param name="pipelineExecutionArn">The ARN.</param> /// <returns>The status of the pipeline.</returns> public async Task<PipelineExecutionStatus> CheckPipelineExecutionStatus(string pipelineExecutionArn) { var describeResponse = await _amazonSageMaker.DescribePipelineExecutionAsync( new DescribePipelineExecutionRequest() { PipelineExecutionArn = pipelineExecutionArn }); return describeResponse.PipelineExecutionStatus; } /// <summary> /// Delete a SageMaker pipeline by name. /// </summary> /// <param name="pipelineName">The name of the pipeline to delete.</param> /// <returns>The ARN of the pipeline.</returns> public async Task<string> DeletePipelineByName(string pipelineName) { var deleteResponse = await _amazonSageMaker.DeletePipelineAsync( new DeletePipelineRequest() { PipelineName = pipelineName }); return deleteResponse.PipelineArn; } }

파이프라인의 콜백을 처리하는 함수를 만드십시오. SageMaker

using System.Text.Json; using Amazon.Lambda.Core; using Amazon.Lambda.SQSEvents; using Amazon.SageMaker; using Amazon.SageMaker.Model; using Amazon.SageMakerGeospatial; using Amazon.SageMakerGeospatial.Model; // Assembly attribute to enable the AWS Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace SageMakerLambda; /// <summary> /// The AWS Lambda function handler for the Amazon SageMaker pipeline. /// </summary> public class SageMakerLambdaFunction { /// <summary> /// Default constructor. This constructor is used by AWS Lambda to construct the instance. When invoked in a Lambda environment /// the AWS credentials will come from the AWS Identity and Access Management (IAM) role associated with the function. The AWS Region will be set to the /// Region that the Lambda function is running in. /// </summary> public SageMakerLambdaFunction() { } /// <summary> /// The AWS Lambda function handler that processes events from the SageMaker pipeline and starts a job or export. /// </summary> /// <param name="request">The custom SageMaker pipeline request object.</param> /// <param name="context">The Lambda context.</param> /// <returns>The dictionary of output parameters.</returns> public async Task<Dictionary<string, string>> FunctionHandler(PipelineRequest request, ILambdaContext context) { var geoSpatialClient = new AmazonSageMakerGeospatialClient(); var sageMakerClient = new AmazonSageMakerClient(); var responseDictionary = new Dictionary<string, string>(); context.Logger.LogInformation("Function handler started with request: " + JsonSerializer.Serialize(request)); if (request.Records != null && request.Records.Any()) { context.Logger.LogInformation("Records found, this is a queue event. Processing the queue records."); foreach (var message in request.Records) { await ProcessMessageAsync(message, context, geoSpatialClient, sageMakerClient); } } else if (!string.IsNullOrEmpty(request.vej_export_config)) { context.Logger.LogInformation("Export configuration found, this is an export. Start the Vector Enrichment Job (VEJ) export."); var outputConfig = JsonSerializer.Deserialize<ExportVectorEnrichmentJobOutputConfig>( request.vej_export_config); var exportResponse = await geoSpatialClient.ExportVectorEnrichmentJobAsync( new ExportVectorEnrichmentJobRequest() { Arn = request.vej_arn, ExecutionRoleArn = request.Role, OutputConfig = outputConfig }); context.Logger.LogInformation($"Export response: {JsonSerializer.Serialize(exportResponse)}"); responseDictionary = new Dictionary<string, string> { { "export_eoj_status", exportResponse.ExportStatus.ToString() }, { "vej_arn", exportResponse.Arn } }; } else if (!string.IsNullOrEmpty(request.vej_name)) { context.Logger.LogInformation("Vector Enrichment Job name found, starting the job."); var inputConfig = JsonSerializer.Deserialize<VectorEnrichmentJobInputConfig>( request.vej_input_config); var jobConfig = JsonSerializer.Deserialize<VectorEnrichmentJobConfig>( request.vej_config); var jobResponse = await geoSpatialClient.StartVectorEnrichmentJobAsync( new StartVectorEnrichmentJobRequest() { ExecutionRoleArn = request.Role, InputConfig = inputConfig, Name = request.vej_name, JobConfig = jobConfig }); context.Logger.LogInformation("Job response: " + JsonSerializer.Serialize(jobResponse)); responseDictionary = new Dictionary<string, string> { { "vej_arn", jobResponse.Arn }, { "statusCode", jobResponse.HttpStatusCode.ToString() } }; } return responseDictionary; } /// <summary> /// Process a queue message and check the status of a SageMaker job. /// </summary> /// <param name="message">The queue message.</param> /// <param name="context">The Lambda context.</param> /// <param name="geoClient">The SageMaker GeoSpatial client.</param> /// <param name="sageMakerClient">The SageMaker client.</param> /// <returns>Async task.</returns> private async Task ProcessMessageAsync(SQSEvent.SQSMessage message, ILambdaContext context, AmazonSageMakerGeospatialClient geoClient, AmazonSageMakerClient sageMakerClient) { context.Logger.LogInformation($"Processed message {message.Body}"); // Get information about the SageMaker job. var payload = JsonSerializer.Deserialize<QueuePayload>(message.Body); context.Logger.LogInformation($"Payload token {payload!.token}"); var token = payload.token; if (payload.arguments.ContainsKey("vej_arn")) { // Use the job ARN and the token to get the job status. var job_arn = payload.arguments["vej_arn"]; context.Logger.LogInformation($"Token: {token}, arn {job_arn}"); var jobInfo = geoClient.GetVectorEnrichmentJobAsync( new GetVectorEnrichmentJobRequest() { Arn = job_arn }); context.Logger.LogInformation("Job info: " + JsonSerializer.Serialize(jobInfo)); if (jobInfo.Result.Status == VectorEnrichmentJobStatus.COMPLETED) { context.Logger.LogInformation($"Status completed, resuming pipeline..."); await sageMakerClient.SendPipelineExecutionStepSuccessAsync( new SendPipelineExecutionStepSuccessRequest() { CallbackToken = token, OutputParameters = new List<OutputParameter>() { new OutputParameter() { Name = "export_status", Value = jobInfo.Result.Status } } }); } else if (jobInfo.Result.Status == VectorEnrichmentJobStatus.FAILED) { context.Logger.LogInformation($"Status failed, stopping pipeline..."); await sageMakerClient.SendPipelineExecutionStepFailureAsync( new SendPipelineExecutionStepFailureRequest() { CallbackToken = token, FailureReason = jobInfo.Result.ErrorDetails.ErrorMessage }); } else if (jobInfo.Result.Status == VectorEnrichmentJobStatus.IN_PROGRESS) { // Put this message back in the queue to reprocess later. context.Logger.LogInformation( $"Status still in progress, check back later."); throw new("Job still running."); } } } }

명령 프롬프트에서 대화형 시나리오를 실행합니다.

public static class PipelineWorkflow { public static IAmazonIdentityManagementService _iamClient = null!; public static SageMakerWrapper _sageMakerWrapper = null!; public static IAmazonSQS _sqsClient = null!; public static IAmazonS3 _s3Client = null!; public static IAmazonLambda _lambdaClient = null!; public static IConfiguration _configuration = null!; public static string lambdaFunctionName = "SageMakerExampleFunction"; public static string sageMakerRoleName = "SageMakerExampleRole"; public static string lambdaRoleName = "SageMakerExampleLambdaRole"; private static string[] lambdaRolePolicies = null!; private static string[] sageMakerRolePolicies = null!; static async Task Main(string[] args) { var options = new AWSOptions() { Region = RegionEndpoint.USWest2 }; // Set up dependency injection for the AWS service. using var host = Host.CreateDefaultBuilder(args) .ConfigureLogging(logging => logging.AddFilter("System", LogLevel.Debug) .AddFilter<DebugLoggerProvider>("Microsoft", LogLevel.Information) .AddFilter<ConsoleLoggerProvider>("Microsoft", LogLevel.Trace)) .ConfigureServices((_, services) => services.AddAWSService<IAmazonIdentityManagementService>(options) .AddAWSService<IAmazonEC2>(options) .AddAWSService<IAmazonSageMaker>(options) .AddAWSService<IAmazonSageMakerGeospatial>(options) .AddAWSService<IAmazonSQS>(options) .AddAWSService<IAmazonS3>(options) .AddAWSService<IAmazonLambda>(options) .AddTransient<SageMakerWrapper>() ) .Build(); _configuration = new ConfigurationBuilder() .SetBasePath(Directory.GetCurrentDirectory()) .AddJsonFile("settings.json") // Load settings from .json file. .AddJsonFile("settings.local.json", true) // Optionally, load local settings. .Build(); ServicesSetup(host); string queueUrl = ""; string queueName = _configuration["queueName"]; string bucketName = _configuration["bucketName"]; var pipelineName = _configuration["pipelineName"]; try { Console.WriteLine(new string('-', 80)); Console.WriteLine( "Welcome to the Amazon SageMaker pipeline example scenario."); Console.WriteLine( "\nThis example workflow will guide you through setting up and running an" + "\nAmazon SageMaker pipeline. The pipeline uses an AWS Lambda function and an" + "\nAmazon SQS Queue. It runs a vector enrichment reverse geocode job to" + "\nreverse geocode addresses in an input file and store the results in an export file."); Console.WriteLine(new string('-', 80)); Console.WriteLine(new string('-', 80)); Console.WriteLine( "First, we will set up the roles, functions, and queue needed by the SageMaker pipeline."); Console.WriteLine(new string('-', 80)); var lambdaRoleArn = await CreateLambdaRole(); var sageMakerRoleArn = await CreateSageMakerRole(); var functionArn = await SetupLambda(lambdaRoleArn, true); queueUrl = await SetupQueue(queueName); await SetupBucket(bucketName); Console.WriteLine(new string('-', 80)); Console.WriteLine("Now we can create and run our pipeline."); Console.WriteLine(new string('-', 80)); await SetupPipeline(sageMakerRoleArn, functionArn, pipelineName); var executionArn = await ExecutePipeline(queueUrl, sageMakerRoleArn, pipelineName, bucketName); await WaitForPipelineExecution(executionArn); await GetOutputResults(bucketName); Console.WriteLine(new string('-', 80)); Console.WriteLine("The pipeline has completed. To view the pipeline and runs " + "in SageMaker Studio, follow these instructions:" + "\nhttps://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-studio.html"); Console.WriteLine(new string('-', 80)); Console.WriteLine(new string('-', 80)); Console.WriteLine("Finally, let's clean up our resources."); Console.WriteLine(new string('-', 80)); await CleanupResources(true, queueUrl, pipelineName, bucketName); Console.WriteLine(new string('-', 80)); Console.WriteLine("SageMaker pipeline scenario is complete."); Console.WriteLine(new string('-', 80)); } catch (Exception ex) { Console.WriteLine(new string('-', 80)); Console.WriteLine($"There was a problem running the scenario: {ex.Message}"); await CleanupResources(true, queueUrl, pipelineName, bucketName); Console.WriteLine(new string('-', 80)); } } /// <summary> /// Populate the services for use within the console application. /// </summary> /// <param name="host">The services host.</param> private static void ServicesSetup(IHost host) { _sageMakerWrapper = host.Services.GetRequiredService<SageMakerWrapper>(); _iamClient = host.Services.GetRequiredService<IAmazonIdentityManagementService>(); _sqsClient = host.Services.GetRequiredService<IAmazonSQS>(); _s3Client = host.Services.GetRequiredService<IAmazonS3>(); _lambdaClient = host.Services.GetRequiredService<IAmazonLambda>(); } /// <summary> /// Set up AWS Lambda, either by updating an existing function or creating a new function. /// </summary> /// <param name="roleArn">The role Amazon Resource Name (ARN) to use for the Lambda function.</param> /// <param name="askUser">True to ask the user before updating.</param> /// <returns>The ARN of the function.</returns> public static async Task<string> SetupLambda(string roleArn, bool askUser) { Console.WriteLine(new string('-', 80)); Console.WriteLine("Setting up the Lambda function for the pipeline."); var handlerName = "SageMakerLambda::SageMakerLambda.SageMakerLambdaFunction::FunctionHandler"; var functionArn = ""; try { var functionInfo = await _lambdaClient.GetFunctionAsync(new GetFunctionRequest() { FunctionName = lambdaFunctionName }); var updateFunction = true; if (askUser) { updateFunction = GetYesNoResponse( $"\tThe Lambda function {lambdaFunctionName} already exists, do you want to update it?"); } if (updateFunction) { // Update the Lambda function. using var zipMemoryStream = new MemoryStream(await File.ReadAllBytesAsync("SageMakerLambda.zip")); await _lambdaClient.UpdateFunctionCodeAsync( new UpdateFunctionCodeRequest() { FunctionName = lambdaFunctionName, ZipFile = zipMemoryStream, }); } functionArn = functionInfo.Configuration.FunctionArn; } catch (ResourceNotFoundException) { Console.WriteLine($"\tThe Lambda function {lambdaFunctionName} was not found, creating the new function."); // Create the function if it does not already exist. using var zipMemoryStream = new MemoryStream(await File.ReadAllBytesAsync("SageMakerLambda.zip")); var createResult = await _lambdaClient.CreateFunctionAsync( new CreateFunctionRequest() { FunctionName = lambdaFunctionName, Runtime = Runtime.Dotnet6, Description = "SageMaker example function.", Code = new FunctionCode() { ZipFile = zipMemoryStream }, Handler = handlerName, Role = roleArn, Timeout = 30 }); functionArn = createResult.FunctionArn; } Console.WriteLine($"\tLambda ready with ARN {functionArn}."); Console.WriteLine(new string('-', 80)); return functionArn; } /// <summary> /// Create a role to be used by AWS Lambda. Does not create the role if it already exists. /// </summary> /// <returns>The role ARN.</returns> public static async Task<string> CreateLambdaRole() { Console.WriteLine(new string('-', 80)); lambdaRolePolicies = new string[]{ "arn:aws:iam::aws:policy/AmazonSageMakerFullAccess", "arn:aws:iam::aws:policy/AmazonSQSFullAccess", "arn:aws:iam::aws:policy/service-role/" + "AmazonSageMakerGeospatialFullAccess", "arn:aws:iam::aws:policy/service-role/" + "AmazonSageMakerServiceCatalogProductsLambdaServiceRolePolicy", "arn:aws:iam::aws:policy/service-role/" + "AWSLambdaSQSQueueExecutionRole" }; var roleArn = await GetRoleArnIfExists(lambdaRoleName); if (!string.IsNullOrEmpty(roleArn)) { return roleArn; } Console.WriteLine("\tCreating a role to for AWS Lambda to use."); var assumeRolePolicy = "{" + "\"Version\": \"2012-10-17\"," + "\"Statement\": [{" + "\"Effect\": \"Allow\"," + "\"Principal\": {" + $"\"Service\": [" + "\"sagemaker.amazonaws.com\"," + "\"sagemaker-geospatial.amazonaws.com\"," + "\"lambda.amazonaws.com\"," + "\"s3.amazonaws.com\"" + "]" + "}," + "\"Action\": \"sts:AssumeRole\"" + "}]" + "}"; var roleResult = await _iamClient!.CreateRoleAsync( new CreateRoleRequest() { AssumeRolePolicyDocument = assumeRolePolicy, Path = "/", RoleName = lambdaRoleName }); foreach (var policy in lambdaRolePolicies) { await _iamClient.AttachRolePolicyAsync( new AttachRolePolicyRequest() { PolicyArn = policy, RoleName = lambdaRoleName }); } // Allow time for the role to be ready. Thread.Sleep(10000); Console.WriteLine($"\tRole ready with ARN {roleResult.Role.Arn}."); Console.WriteLine(new string('-', 80)); return roleResult.Role.Arn; } /// <summary> /// Create a role to be used by SageMaker. /// </summary> /// <returns>The role Amazon Resource Name (ARN).</returns> public static async Task<string> CreateSageMakerRole() { Console.WriteLine(new string('-', 80)); sageMakerRolePolicies = new string[]{ "arn:aws:iam::aws:policy/AmazonSageMakerFullAccess", "arn:aws:iam::aws:policy/AmazonSageMakerGeospatialFullAccess", }; var roleArn = await GetRoleArnIfExists(sageMakerRoleName); if (!string.IsNullOrEmpty(roleArn)) { return roleArn; } Console.WriteLine("\tCreating a role to use with SageMaker."); var assumeRolePolicy = "{" + "\"Version\": \"2012-10-17\"," + "\"Statement\": [{" + "\"Effect\": \"Allow\"," + "\"Principal\": {" + $"\"Service\": [" + "\"sagemaker.amazonaws.com\"," + "\"sagemaker-geospatial.amazonaws.com\"," + "\"lambda.amazonaws.com\"," + "\"s3.amazonaws.com\"" + "]" + "}," + "\"Action\": \"sts:AssumeRole\"" + "}]" + "}"; var roleResult = await _iamClient!.CreateRoleAsync( new CreateRoleRequest() { AssumeRolePolicyDocument = assumeRolePolicy, Path = "/", RoleName = sageMakerRoleName }); foreach (var policy in sageMakerRolePolicies) { await _iamClient.AttachRolePolicyAsync( new AttachRolePolicyRequest() { PolicyArn = policy, RoleName = sageMakerRoleName }); } // Allow time for the role to be ready. Thread.Sleep(10000); Console.WriteLine($"\tRole ready with ARN {roleResult.Role.Arn}."); Console.WriteLine(new string('-', 80)); return roleResult.Role.Arn; } /// <summary> /// Set up the SQS queue to use with the pipeline. /// </summary> /// <param name="queueName">The name for the queue.</param> /// <returns>The URL for the queue.</returns> public static async Task<string> SetupQueue(string queueName) { Console.WriteLine(new string('-', 80)); Console.WriteLine($"Setting up queue {queueName}."); try { var queueInfo = await _sqsClient.GetQueueUrlAsync(new GetQueueUrlRequest() { QueueName = queueName }); return queueInfo.QueueUrl; } catch (QueueDoesNotExistException) { var attrs = new Dictionary<string, string> { { QueueAttributeName.DelaySeconds, "5" }, { QueueAttributeName.ReceiveMessageWaitTimeSeconds, "5" }, { QueueAttributeName.VisibilityTimeout, "300" }, }; var request = new CreateQueueRequest { Attributes = attrs, QueueName = queueName, }; var response = await _sqsClient.CreateQueueAsync(request); Thread.Sleep(10000); await ConnectLambda(response.QueueUrl); Console.WriteLine($"\tQueue ready with Url {response.QueueUrl}."); Console.WriteLine(new string('-', 80)); return response.QueueUrl; } } /// <summary> /// Connect the queue to the Lambda function as an event source. /// </summary> /// <param name="queueUrl">The URL for the queue.</param> /// <returns>Async task.</returns> public static async Task ConnectLambda(string queueUrl) { Console.WriteLine(new string('-', 80)); Console.WriteLine($"Connecting the Lambda function and queue for the pipeline."); var queueAttributes = await _sqsClient.GetQueueAttributesAsync( new GetQueueAttributesRequest() { QueueUrl = queueUrl, AttributeNames = new List<string>() { "All" } }); var queueArn = queueAttributes.QueueARN; var eventSource = await _lambdaClient.ListEventSourceMappingsAsync( new ListEventSourceMappingsRequest() { FunctionName = lambdaFunctionName }); if (!eventSource.EventSourceMappings.Any()) { // Only add the event source mapping if it does not already exist. await _lambdaClient.CreateEventSourceMappingAsync( new CreateEventSourceMappingRequest() { EventSourceArn = queueArn, FunctionName = lambdaFunctionName, Enabled = true }); } Console.WriteLine(new string('-', 80)); } /// <summary> /// Set up the bucket to use for pipeline input and output. /// </summary> /// <param name="bucketName">The name for the bucket.</param> /// <returns>Async task.</returns> public static async Task SetupBucket(string bucketName) { Console.WriteLine(new string('-', 80)); Console.WriteLine($"Setting up bucket {bucketName}."); var bucketExists = await Amazon.S3.Util.AmazonS3Util.DoesS3BucketExistV2Async(_s3Client, bucketName); if (!bucketExists) { await _s3Client.PutBucketAsync(new PutBucketRequest() { BucketName = bucketName, BucketRegion = S3Region.USWest2 }); Thread.Sleep(5000); await _s3Client.PutObjectAsync(new PutObjectRequest() { BucketName = bucketName, Key = "samplefiles/latlongtest.csv", FilePath = "latlongtest.csv" }); } Console.WriteLine($"\tBucket {bucketName} ready."); Console.WriteLine(new string('-', 80)); } /// <summary> /// Display some results from the output directory. /// </summary> /// <param name="bucketName">The name for the bucket.</param> /// <returns>Async task.</returns> public static async Task<string> GetOutputResults(string bucketName) { Console.WriteLine(new string('-', 80)); Console.WriteLine($"Getting output results {bucketName}."); string outputKey = ""; Thread.Sleep(15000); var outputFiles = await _s3Client.ListObjectsAsync( new ListObjectsRequest() { BucketName = bucketName, Prefix = "outputfiles/" }); if (outputFiles.S3Objects.Any()) { var sampleOutput = outputFiles.S3Objects.OrderBy(s => s.LastModified).Last(); Console.WriteLine($"\tOutput file: {sampleOutput.Key}"); var outputSampleResponse = await _s3Client.GetObjectAsync( new GetObjectRequest() { BucketName = bucketName, Key = sampleOutput.Key }); outputKey = sampleOutput.Key; StreamReader reader = new StreamReader(outputSampleResponse.ResponseStream); await reader.ReadLineAsync(); Console.WriteLine("\tOutput file contents: \n"); for (int i = 0; i < 10; i++) { if (!reader.EndOfStream) { Console.WriteLine("\t" + await reader.ReadLineAsync()); } } } Console.WriteLine(new string('-', 80)); return outputKey; } /// <summary> /// Create a pipeline from the example pipeline JSON /// that includes the Lambda, callback, processing, and export jobs. /// </summary> /// <param name="roleArn">The ARN of the role for the pipeline.</param> /// <param name="functionArn">The ARN of the Lambda function for the pipeline.</param> /// <param name="pipelineName">The name for the pipeline.</param> /// <returns>The ARN of the pipeline.</returns> public static async Task<string> SetupPipeline(string roleArn, string functionArn, string pipelineName) { Console.WriteLine(new string('-', 80)); Console.WriteLine($"Setting up the pipeline."); var pipelineJson = await File.ReadAllTextAsync("GeoSpatialPipeline.json"); // Add the correct function ARN instead of the placeholder. pipelineJson = pipelineJson.Replace("*FUNCTION_ARN*", functionArn); var pipelineArn = await _sageMakerWrapper.SetupPipeline(pipelineJson, roleArn, pipelineName, "sdk example pipeline", pipelineName); Console.WriteLine($"\tPipeline set up with ARN {pipelineArn}."); Console.WriteLine(new string('-', 80)); return pipelineArn; } /// <summary> /// Start a pipeline run with job configurations. /// </summary> /// <param name="queueUrl">The URL for the queue used in the pipeline.</param> /// <param name="roleArn">The ARN of the role.</param> /// <param name="pipelineName">The name of the pipeline.</param> /// <param name="bucketName">The name of the bucket.</param> /// <returns>The pipeline run ARN.</returns> public static async Task<string> ExecutePipeline( string queueUrl, string roleArn, string pipelineName, string bucketName) { Console.WriteLine(new string('-', 80)); Console.WriteLine($"Starting pipeline execution."); var input = $"s3://{bucketName}/samplefiles/latlongtest.csv"; var output = $"s3://{bucketName}/outputfiles/"; var executionARN = await _sageMakerWrapper.ExecutePipeline(queueUrl, input, output, pipelineName, roleArn); Console.WriteLine($"\tRun started with ARN {executionARN}."); Console.WriteLine(new string('-', 80)); return executionARN; } /// <summary> /// Wait for a pipeline run to complete. /// </summary> /// <param name="executionArn">The pipeline run ARN.</param> /// <returns>Async task.</returns> public static async Task WaitForPipelineExecution(string executionArn) { Console.WriteLine(new string('-', 80)); Console.WriteLine($"Waiting for pipeline to finish."); PipelineExecutionStatus status; do { status = await _sageMakerWrapper.CheckPipelineExecutionStatus(executionArn); Thread.Sleep(30000); Console.WriteLine($"\tStatus is {status}."); } while (status == PipelineExecutionStatus.Executing); Console.WriteLine($"\tPipeline finished with status {status}."); Console.WriteLine(new string('-', 80)); } /// <summary> /// Clean up the resources from the scenario. /// </summary> /// <param name="askUser">True to ask the user for cleanup.</param> /// <param name="queueUrl">The URL of the queue to clean up.</param> /// <param name="pipelineName">The name of the pipeline.</param> /// <param name="bucketName">The name of the bucket.</param> /// <returns>Async task.</returns> public static async Task<bool> CleanupResources( bool askUser, string queueUrl, string pipelineName, string bucketName) { Console.WriteLine(new string('-', 80)); Console.WriteLine($"Clean up resources."); if (!askUser || GetYesNoResponse($"\tDelete pipeline {pipelineName}? (y/n)")) { Console.WriteLine($"\tDeleting pipeline."); // Delete the pipeline. await _sageMakerWrapper.DeletePipelineByName(pipelineName); } if (!string.IsNullOrEmpty(queueUrl) && (!askUser || GetYesNoResponse($"\tDelete queue {queueUrl}? (y/n)"))) { Console.WriteLine($"\tDeleting queue."); // Delete the queue. await _sqsClient.DeleteQueueAsync(new DeleteQueueRequest(queueUrl)); } if (!askUser || GetYesNoResponse($"\tDelete Amazon S3 bucket {bucketName}? (y/n)")) { Console.WriteLine($"\tDeleting bucket."); // Delete all objects in the bucket. var deleteList = await _s3Client.ListObjectsV2Async(new ListObjectsV2Request() { BucketName = bucketName }); if (deleteList.KeyCount > 0) { await _s3Client.DeleteObjectsAsync(new DeleteObjectsRequest() { BucketName = bucketName, Objects = deleteList.S3Objects .Select(o => new KeyVersion { Key = o.Key }).ToList() }); } // Now delete the bucket. await _s3Client.DeleteBucketAsync(new DeleteBucketRequest() { BucketName = bucketName }); } if (!askUser || GetYesNoResponse($"\tDelete lambda {lambdaFunctionName}? (y/n)")) { Console.WriteLine($"\tDeleting lambda function."); await _lambdaClient.DeleteFunctionAsync(new DeleteFunctionRequest() { FunctionName = lambdaFunctionName }); } if (!askUser || GetYesNoResponse($"\tDelete role {lambdaRoleName}? (y/n)")) { Console.WriteLine($"\tDetaching policies and deleting role."); foreach (var policy in lambdaRolePolicies) { await _iamClient!.DetachRolePolicyAsync(new DetachRolePolicyRequest() { RoleName = lambdaRoleName, PolicyArn = policy }); } await _iamClient!.DeleteRoleAsync(new DeleteRoleRequest() { RoleName = lambdaRoleName }); } if (!askUser || GetYesNoResponse($"\tDelete role {sageMakerRoleName}? (y/n)")) { Console.WriteLine($"\tDetaching policies and deleting role."); foreach (var policy in sageMakerRolePolicies) { await _iamClient!.DetachRolePolicyAsync(new DetachRolePolicyRequest() { RoleName = sageMakerRoleName, PolicyArn = policy }); } await _iamClient!.DeleteRoleAsync(new DeleteRoleRequest() { RoleName = sageMakerRoleName }); } Console.WriteLine(new string('-', 80)); return true; } /// <summary> /// Helper method to get a role's ARN if it already exists. /// </summary> /// <param name="roleName">The name of the AWS Identity and Access Management (IAM) Role to look for.</param> /// <returns>The role ARN if it exists, otherwise an empty string.</returns> private static async Task<string> GetRoleArnIfExists(string roleName) { Console.WriteLine($"Checking for role named {roleName}."); try { var existingRole = await _iamClient.GetRoleAsync(new GetRoleRequest() { RoleName = lambdaRoleName }); return existingRole.Role.Arn; } catch (NoSuchEntityException) { return string.Empty; } } /// <summary> /// Helper method to get a yes or no response from the user. /// </summary> /// <param name="question">The question string to print on the console.</param> /// <returns>True if the user responds with a yes.</returns> private static bool GetYesNoResponse(string question) { Console.WriteLine(question); var ynResponse = Console.ReadLine(); var response = ynResponse != null && ynResponse.Equals("y", StringComparison.InvariantCultureIgnoreCase); return response; } }