SageMaker を使用した の例 AWS SDK for .NET - AWS SDK コード例

Doc AWS SDK Examples リポジトリには、他にも SDK の例があります。 AWS GitHub

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

SageMaker を使用した の例 AWS SDK for .NET

次のコード例は、 AWS SDK for .NET で を使用してアクションを実行し、一般的なシナリオを実装する方法を示しています SageMaker。

アクションはより大きなプログラムからのコードの抜粋であり、コンテキスト内で実行する必要があります。アクションは個々のサービス機能を呼び出す方法を示していますが、関連するシナリオやサービス間の例ではアクションのコンテキストが確認できます。

「シナリオ」は、同じサービス内で複数の関数を呼び出して、特定のタスクを実行する方法を示すコード例です。

各例には、 へのリンクが含まれています。このリンクには GitHub、コンテキスト内でコードをセットアップして実行する方法の手順が記載されています。

開始方法

次のコード例は、 の使用を開始する方法を示しています SageMaker。

AWS SDK for .NET
注記

には他にもがあります GitHub。AWS コード例リポジトリ で全く同じ例を見つけて、設定と実行の方法を確認してください。

using Amazon.SageMaker; using Amazon.SageMaker.Model; namespace SageMakerActions; public static class HelloSageMaker { static async Task Main(string[] args) { var sageMakerClient = new AmazonSageMakerClient(); Console.WriteLine($"Hello Amazon SageMaker! Let's list some of your notebook instances:"); Console.WriteLine(); // You can use await and any of the async methods to get a response. // Let's get the first five notebook instances. var response = await sageMakerClient.ListNotebookInstancesAsync( new ListNotebookInstancesRequest() { MaxResults = 5 }); if (!response.NotebookInstances.Any()) { Console.WriteLine($"No notebook instances found."); Console.WriteLine("See https://docs.aws.amazon.com/sagemaker/latest/dg/howitworks-create-ws.html to create one."); } foreach (var notebookInstance in response.NotebookInstances) { Console.WriteLine($"\tInstance: {notebookInstance.NotebookInstanceName}"); Console.WriteLine($"\tArn: {notebookInstance.NotebookInstanceArn}"); Console.WriteLine($"\tCreation Date: {notebookInstance.CreationTime.ToShortDateString()}"); Console.WriteLine(); } } }
  • API の詳細については、「 API リファレンスListNotebookInstances」の「」を参照してください。 AWS SDK for .NET

アクション

次の例は、CreatePipeline を使用する方法を説明しています。

AWS SDK for .NET
注記

には他にもがあります GitHub。AWS コード例リポジトリ で全く同じ例を見つけて、設定と実行の方法を確認してください。

/// <summary> /// Create a pipeline from a JSON definition, or update it if the pipeline already exists. /// </summary> /// <returns>The Amazon Resource Name (ARN) of the pipeline.</returns> public async Task<string> SetupPipeline(string pipelineJson, string roleArn, string name, string description, string displayName) { try { var updateResponse = await _amazonSageMaker.UpdatePipelineAsync( new UpdatePipelineRequest() { PipelineDefinition = pipelineJson, PipelineDescription = description, PipelineDisplayName = displayName, PipelineName = name, RoleArn = roleArn }); return updateResponse.PipelineArn; } catch (Amazon.SageMaker.Model.ResourceNotFoundException) { var createResponse = await _amazonSageMaker.CreatePipelineAsync( new CreatePipelineRequest() { PipelineDefinition = pipelineJson, PipelineDescription = description, PipelineDisplayName = displayName, PipelineName = name, RoleArn = roleArn }); return createResponse.PipelineArn; } }
  • API の詳細については、「 API リファレンスCreatePipeline」の「」を参照してください。 AWS SDK for .NET

次の例は、DeletePipeline を使用する方法を説明しています。

AWS SDK for .NET
注記

には他にもがあります GitHub。AWS コード例リポジトリ で全く同じ例を見つけて、設定と実行の方法を確認してください。

/// <summary> /// Delete a SageMaker pipeline by name. /// </summary> /// <param name="pipelineName">The name of the pipeline to delete.</param> /// <returns>The ARN of the pipeline.</returns> public async Task<string> DeletePipelineByName(string pipelineName) { var deleteResponse = await _amazonSageMaker.DeletePipelineAsync( new DeletePipelineRequest() { PipelineName = pipelineName }); return deleteResponse.PipelineArn; }
  • API の詳細については、「 API リファレンスDeletePipeline」の「」を参照してください。 AWS SDK for .NET

次の例は、DescribePipelineExecution を使用する方法を説明しています。

AWS SDK for .NET
注記

には他にもがあります GitHub。AWS コード例リポジトリ で全く同じ例を見つけて、設定と実行の方法を確認してください。

/// <summary> /// Check the status of a run. /// </summary> /// <param name="pipelineExecutionArn">The ARN.</param> /// <returns>The status of the pipeline.</returns> public async Task<PipelineExecutionStatus> CheckPipelineExecutionStatus(string pipelineExecutionArn) { var describeResponse = await _amazonSageMaker.DescribePipelineExecutionAsync( new DescribePipelineExecutionRequest() { PipelineExecutionArn = pipelineExecutionArn }); return describeResponse.PipelineExecutionStatus; }
  • API の詳細については、「 API リファレンスDescribePipelineExecution」の「」を参照してください。 AWS SDK for .NET

次の例は、StartPipelineExecution を使用する方法を説明しています。

AWS SDK for .NET
注記

には他にもがあります GitHub。AWS コード例リポジトリ で全く同じ例を見つけて、設定と実行の方法を確認してください。

/// <summary> /// Run a pipeline with input and output file locations. /// </summary> /// <param name="queueUrl">The URL for the queue to use for pipeline callbacks.</param> /// <param name="inputLocationUrl">The input location in Amazon Simple Storage Service (Amazon S3).</param> /// <param name="outputLocationUrl">The output location in Amazon S3.</param> /// <param name="pipelineName">The name of the pipeline.</param> /// <param name="executionRoleArn">The ARN of the role.</param> /// <returns>The ARN of the pipeline run.</returns> public async Task<string> ExecutePipeline( string queueUrl, string inputLocationUrl, string outputLocationUrl, string pipelineName, string executionRoleArn) { var inputConfig = new VectorEnrichmentJobInputConfig() { DataSourceConfig = new() { S3Data = new VectorEnrichmentJobS3Data() { S3Uri = inputLocationUrl } }, DocumentType = VectorEnrichmentJobDocumentType.CSV }; var exportConfig = new ExportVectorEnrichmentJobOutputConfig() { S3Data = new VectorEnrichmentJobS3Data() { S3Uri = outputLocationUrl } }; var jobConfig = new VectorEnrichmentJobConfig() { ReverseGeocodingConfig = new ReverseGeocodingConfig() { XAttributeName = "Longitude", YAttributeName = "Latitude" } }; #pragma warning disable SageMaker1002 // Property value does not match required pattern is allowed here to match the pipeline definition. var startExecutionResponse = await _amazonSageMaker.StartPipelineExecutionAsync( new StartPipelineExecutionRequest() { PipelineName = pipelineName, PipelineExecutionDisplayName = pipelineName + "-example-execution", PipelineParameters = new List<Parameter>() { new Parameter() { Name = "parameter_execution_role", Value = executionRoleArn }, new Parameter() { Name = "parameter_queue_url", Value = queueUrl }, new Parameter() { Name = "parameter_vej_input_config", Value = JsonSerializer.Serialize(inputConfig) }, new Parameter() { Name = "parameter_vej_export_config", Value = JsonSerializer.Serialize(exportConfig) }, new Parameter() { Name = "parameter_step_1_vej_config", Value = JsonSerializer.Serialize(jobConfig) } } }); #pragma warning restore SageMaker1002 return startExecutionResponse.PipelineExecutionArn; }
  • API の詳細については、「 API リファレンスStartPipelineExecution」の「」を参照してください。 AWS SDK for .NET

次の例は、UpdatePipeline を使用する方法を説明しています。

AWS SDK for .NET
注記

には他にもがあります GitHub。AWS コード例リポジトリ で全く同じ例を見つけて、設定と実行の方法を確認してください。

/// <summary> /// Create a pipeline from a JSON definition, or update it if the pipeline already exists. /// </summary> /// <returns>The Amazon Resource Name (ARN) of the pipeline.</returns> public async Task<string> SetupPipeline(string pipelineJson, string roleArn, string name, string description, string displayName) { try { var updateResponse = await _amazonSageMaker.UpdatePipelineAsync( new UpdatePipelineRequest() { PipelineDefinition = pipelineJson, PipelineDescription = description, PipelineDisplayName = displayName, PipelineName = name, RoleArn = roleArn }); return updateResponse.PipelineArn; } catch (Amazon.SageMaker.Model.ResourceNotFoundException) { var createResponse = await _amazonSageMaker.CreatePipelineAsync( new CreatePipelineRequest() { PipelineDefinition = pipelineJson, PipelineDescription = description, PipelineDisplayName = displayName, PipelineName = name, RoleArn = roleArn }); return createResponse.PipelineArn; } }
  • API の詳細については、「 API リファレンスUpdatePipeline」の「」を参照してください。 AWS SDK for .NET

シナリオ

次のコードサンプルは、以下の操作方法を示しています。

  • パイプラインのリソースを設定します。

  • 地理空間ジョブを実行するパイプラインを設定します。

  • パイプラインの実行を開始します。

  • ジョブ実行のステータスをモニタリングします。

  • パイプラインの出力を表示します。

  • リソースをクリーンアップします。

詳細については、「Community.aws の AWS SDKs を使用して SageMaker パイプラインを作成および実行する」を参照してください。

AWS SDK for .NET
注記

には他にもがあります GitHub。AWS コード例リポジトリ で全く同じ例を見つけて、設定と実行の方法を確認してください。

SageMaker オペレーションをラップするクラスを作成します。

using System.Text.Json; using Amazon.SageMaker; using Amazon.SageMaker.Model; using Amazon.SageMakerGeospatial; using Amazon.SageMakerGeospatial.Model; namespace SageMakerActions; /// <summary> /// Wrapper class for Amazon SageMaker actions and logic. /// </summary> public class SageMakerWrapper { private readonly IAmazonSageMaker _amazonSageMaker; public SageMakerWrapper(IAmazonSageMaker amazonSageMaker) { _amazonSageMaker = amazonSageMaker; } /// <summary> /// Create a pipeline from a JSON definition, or update it if the pipeline already exists. /// </summary> /// <returns>The Amazon Resource Name (ARN) of the pipeline.</returns> public async Task<string> SetupPipeline(string pipelineJson, string roleArn, string name, string description, string displayName) { try { var updateResponse = await _amazonSageMaker.UpdatePipelineAsync( new UpdatePipelineRequest() { PipelineDefinition = pipelineJson, PipelineDescription = description, PipelineDisplayName = displayName, PipelineName = name, RoleArn = roleArn }); return updateResponse.PipelineArn; } catch (Amazon.SageMaker.Model.ResourceNotFoundException) { var createResponse = await _amazonSageMaker.CreatePipelineAsync( new CreatePipelineRequest() { PipelineDefinition = pipelineJson, PipelineDescription = description, PipelineDisplayName = displayName, PipelineName = name, RoleArn = roleArn }); return createResponse.PipelineArn; } } /// <summary> /// Run a pipeline with input and output file locations. /// </summary> /// <param name="queueUrl">The URL for the queue to use for pipeline callbacks.</param> /// <param name="inputLocationUrl">The input location in Amazon Simple Storage Service (Amazon S3).</param> /// <param name="outputLocationUrl">The output location in Amazon S3.</param> /// <param name="pipelineName">The name of the pipeline.</param> /// <param name="executionRoleArn">The ARN of the role.</param> /// <returns>The ARN of the pipeline run.</returns> public async Task<string> ExecutePipeline( string queueUrl, string inputLocationUrl, string outputLocationUrl, string pipelineName, string executionRoleArn) { var inputConfig = new VectorEnrichmentJobInputConfig() { DataSourceConfig = new() { S3Data = new VectorEnrichmentJobS3Data() { S3Uri = inputLocationUrl } }, DocumentType = VectorEnrichmentJobDocumentType.CSV }; var exportConfig = new ExportVectorEnrichmentJobOutputConfig() { S3Data = new VectorEnrichmentJobS3Data() { S3Uri = outputLocationUrl } }; var jobConfig = new VectorEnrichmentJobConfig() { ReverseGeocodingConfig = new ReverseGeocodingConfig() { XAttributeName = "Longitude", YAttributeName = "Latitude" } }; #pragma warning disable SageMaker1002 // Property value does not match required pattern is allowed here to match the pipeline definition. var startExecutionResponse = await _amazonSageMaker.StartPipelineExecutionAsync( new StartPipelineExecutionRequest() { PipelineName = pipelineName, PipelineExecutionDisplayName = pipelineName + "-example-execution", PipelineParameters = new List<Parameter>() { new Parameter() { Name = "parameter_execution_role", Value = executionRoleArn }, new Parameter() { Name = "parameter_queue_url", Value = queueUrl }, new Parameter() { Name = "parameter_vej_input_config", Value = JsonSerializer.Serialize(inputConfig) }, new Parameter() { Name = "parameter_vej_export_config", Value = JsonSerializer.Serialize(exportConfig) }, new Parameter() { Name = "parameter_step_1_vej_config", Value = JsonSerializer.Serialize(jobConfig) } } }); #pragma warning restore SageMaker1002 return startExecutionResponse.PipelineExecutionArn; } /// <summary> /// Check the status of a run. /// </summary> /// <param name="pipelineExecutionArn">The ARN.</param> /// <returns>The status of the pipeline.</returns> public async Task<PipelineExecutionStatus> CheckPipelineExecutionStatus(string pipelineExecutionArn) { var describeResponse = await _amazonSageMaker.DescribePipelineExecutionAsync( new DescribePipelineExecutionRequest() { PipelineExecutionArn = pipelineExecutionArn }); return describeResponse.PipelineExecutionStatus; } /// <summary> /// Delete a SageMaker pipeline by name. /// </summary> /// <param name="pipelineName">The name of the pipeline to delete.</param> /// <returns>The ARN of the pipeline.</returns> public async Task<string> DeletePipelineByName(string pipelineName) { var deleteResponse = await _amazonSageMaker.DeletePipelineAsync( new DeletePipelineRequest() { PipelineName = pipelineName }); return deleteResponse.PipelineArn; } }

SageMaker パイプラインからのコールバックを処理する関数を作成します。

using System.Text.Json; using Amazon.Lambda.Core; using Amazon.Lambda.SQSEvents; using Amazon.SageMaker; using Amazon.SageMaker.Model; using Amazon.SageMakerGeospatial; using Amazon.SageMakerGeospatial.Model; // Assembly attribute to enable the AWS Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace SageMakerLambda; /// <summary> /// The AWS Lambda function handler for the Amazon SageMaker pipeline. /// </summary> public class SageMakerLambdaFunction { /// <summary> /// Default constructor. This constructor is used by AWS Lambda to construct the instance. When invoked in a Lambda environment /// the AWS credentials will come from the AWS Identity and Access Management (IAM) role associated with the function. The AWS Region will be set to the /// Region that the Lambda function is running in. /// </summary> public SageMakerLambdaFunction() { } /// <summary> /// The AWS Lambda function handler that processes events from the SageMaker pipeline and starts a job or export. /// </summary> /// <param name="request">The custom SageMaker pipeline request object.</param> /// <param name="context">The Lambda context.</param> /// <returns>The dictionary of output parameters.</returns> public async Task<Dictionary<string, string>> FunctionHandler(PipelineRequest request, ILambdaContext context) { var geoSpatialClient = new AmazonSageMakerGeospatialClient(); var sageMakerClient = new AmazonSageMakerClient(); var responseDictionary = new Dictionary<string, string>(); context.Logger.LogInformation("Function handler started with request: " + JsonSerializer.Serialize(request)); if (request.Records != null && request.Records.Any()) { context.Logger.LogInformation("Records found, this is a queue event. Processing the queue records."); foreach (var message in request.Records) { await ProcessMessageAsync(message, context, geoSpatialClient, sageMakerClient); } } else if (!string.IsNullOrEmpty(request.vej_export_config)) { context.Logger.LogInformation("Export configuration found, this is an export. Start the Vector Enrichment Job (VEJ) export."); var outputConfig = JsonSerializer.Deserialize<ExportVectorEnrichmentJobOutputConfig>( request.vej_export_config); var exportResponse = await geoSpatialClient.ExportVectorEnrichmentJobAsync( new ExportVectorEnrichmentJobRequest() { Arn = request.vej_arn, ExecutionRoleArn = request.Role, OutputConfig = outputConfig }); context.Logger.LogInformation($"Export response: {JsonSerializer.Serialize(exportResponse)}"); responseDictionary = new Dictionary<string, string> { { "export_eoj_status", exportResponse.ExportStatus.ToString() }, { "vej_arn", exportResponse.Arn } }; } else if (!string.IsNullOrEmpty(request.vej_name)) { context.Logger.LogInformation("Vector Enrichment Job name found, starting the job."); var inputConfig = JsonSerializer.Deserialize<VectorEnrichmentJobInputConfig>( request.vej_input_config); var jobConfig = JsonSerializer.Deserialize<VectorEnrichmentJobConfig>( request.vej_config); var jobResponse = await geoSpatialClient.StartVectorEnrichmentJobAsync( new StartVectorEnrichmentJobRequest() { ExecutionRoleArn = request.Role, InputConfig = inputConfig, Name = request.vej_name, JobConfig = jobConfig }); context.Logger.LogInformation("Job response: " + JsonSerializer.Serialize(jobResponse)); responseDictionary = new Dictionary<string, string> { { "vej_arn", jobResponse.Arn }, { "statusCode", jobResponse.HttpStatusCode.ToString() } }; } return responseDictionary; } /// <summary> /// Process a queue message and check the status of a SageMaker job. /// </summary> /// <param name="message">The queue message.</param> /// <param name="context">The Lambda context.</param> /// <param name="geoClient">The SageMaker GeoSpatial client.</param> /// <param name="sageMakerClient">The SageMaker client.</param> /// <returns>Async task.</returns> private async Task ProcessMessageAsync(SQSEvent.SQSMessage message, ILambdaContext context, AmazonSageMakerGeospatialClient geoClient, AmazonSageMakerClient sageMakerClient) { context.Logger.LogInformation($"Processed message {message.Body}"); // Get information about the SageMaker job. var payload = JsonSerializer.Deserialize<QueuePayload>(message.Body); context.Logger.LogInformation($"Payload token {payload!.token}"); var token = payload.token; if (payload.arguments.ContainsKey("vej_arn")) { // Use the job ARN and the token to get the job status. var job_arn = payload.arguments["vej_arn"]; context.Logger.LogInformation($"Token: {token}, arn {job_arn}"); var jobInfo = geoClient.GetVectorEnrichmentJobAsync( new GetVectorEnrichmentJobRequest() { Arn = job_arn }); context.Logger.LogInformation("Job info: " + JsonSerializer.Serialize(jobInfo)); if (jobInfo.Result.Status == VectorEnrichmentJobStatus.COMPLETED) { context.Logger.LogInformation($"Status completed, resuming pipeline..."); await sageMakerClient.SendPipelineExecutionStepSuccessAsync( new SendPipelineExecutionStepSuccessRequest() { CallbackToken = token, OutputParameters = new List<OutputParameter>() { new OutputParameter() { Name = "export_status", Value = jobInfo.Result.Status } } }); } else if (jobInfo.Result.Status == VectorEnrichmentJobStatus.FAILED) { context.Logger.LogInformation($"Status failed, stopping pipeline..."); await sageMakerClient.SendPipelineExecutionStepFailureAsync( new SendPipelineExecutionStepFailureRequest() { CallbackToken = token, FailureReason = jobInfo.Result.ErrorDetails.ErrorMessage }); } else if (jobInfo.Result.Status == VectorEnrichmentJobStatus.IN_PROGRESS) { // Put this message back in the queue to reprocess later. context.Logger.LogInformation( $"Status still in progress, check back later."); throw new("Job still running."); } } } }

コマンドプロンプトからインタラクティブのシナリオを実行します。

public static class PipelineWorkflow { public static IAmazonIdentityManagementService _iamClient = null!; public static SageMakerWrapper _sageMakerWrapper = null!; public static IAmazonSQS _sqsClient = null!; public static IAmazonS3 _s3Client = null!; public static IAmazonLambda _lambdaClient = null!; public static IConfiguration _configuration = null!; public static string lambdaFunctionName = "SageMakerExampleFunction"; public static string sageMakerRoleName = "SageMakerExampleRole"; public static string lambdaRoleName = "SageMakerExampleLambdaRole"; private static string[] lambdaRolePolicies = null!; private static string[] sageMakerRolePolicies = null!; static async Task Main(string[] args) { var options = new AWSOptions() { Region = RegionEndpoint.USWest2 }; // Set up dependency injection for the AWS service. using var host = Host.CreateDefaultBuilder(args) .ConfigureLogging(logging => logging.AddFilter("System", LogLevel.Debug) .AddFilter<DebugLoggerProvider>("Microsoft", LogLevel.Information) .AddFilter<ConsoleLoggerProvider>("Microsoft", LogLevel.Trace)) .ConfigureServices((_, services) => services.AddAWSService<IAmazonIdentityManagementService>(options) .AddAWSService<IAmazonEC2>(options) .AddAWSService<IAmazonSageMaker>(options) .AddAWSService<IAmazonSageMakerGeospatial>(options) .AddAWSService<IAmazonSQS>(options) .AddAWSService<IAmazonS3>(options) .AddAWSService<IAmazonLambda>(options) .AddTransient<SageMakerWrapper>() ) .Build(); _configuration = new ConfigurationBuilder() .SetBasePath(Directory.GetCurrentDirectory()) .AddJsonFile("settings.json") // Load settings from .json file. .AddJsonFile("settings.local.json", true) // Optionally, load local settings. .Build(); ServicesSetup(host); string queueUrl = ""; string queueName = _configuration["queueName"]; string bucketName = _configuration["bucketName"]; var pipelineName = _configuration["pipelineName"]; try { Console.WriteLine(new string('-', 80)); Console.WriteLine( "Welcome to the Amazon SageMaker pipeline example scenario."); Console.WriteLine( "\nThis example workflow will guide you through setting up and running an" + "\nAmazon SageMaker pipeline. The pipeline uses an AWS Lambda function and an" + "\nAmazon SQS Queue. It runs a vector enrichment reverse geocode job to" + "\nreverse geocode addresses in an input file and store the results in an export file."); Console.WriteLine(new string('-', 80)); Console.WriteLine(new string('-', 80)); Console.WriteLine( "First, we will set up the roles, functions, and queue needed by the SageMaker pipeline."); Console.WriteLine(new string('-', 80)); var lambdaRoleArn = await CreateLambdaRole(); var sageMakerRoleArn = await CreateSageMakerRole(); var functionArn = await SetupLambda(lambdaRoleArn, true); queueUrl = await SetupQueue(queueName); await SetupBucket(bucketName); Console.WriteLine(new string('-', 80)); Console.WriteLine("Now we can create and run our pipeline."); Console.WriteLine(new string('-', 80)); await SetupPipeline(sageMakerRoleArn, functionArn, pipelineName); var executionArn = await ExecutePipeline(queueUrl, sageMakerRoleArn, pipelineName, bucketName); await WaitForPipelineExecution(executionArn); await GetOutputResults(bucketName); Console.WriteLine(new string('-', 80)); Console.WriteLine("The pipeline has completed. To view the pipeline and runs " + "in SageMaker Studio, follow these instructions:" + "\nhttps://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-studio.html"); Console.WriteLine(new string('-', 80)); Console.WriteLine(new string('-', 80)); Console.WriteLine("Finally, let's clean up our resources."); Console.WriteLine(new string('-', 80)); await CleanupResources(true, queueUrl, pipelineName, bucketName); Console.WriteLine(new string('-', 80)); Console.WriteLine("SageMaker pipeline scenario is complete."); Console.WriteLine(new string('-', 80)); } catch (Exception ex) { Console.WriteLine(new string('-', 80)); Console.WriteLine($"There was a problem running the scenario: {ex.Message}"); await CleanupResources(true, queueUrl, pipelineName, bucketName); Console.WriteLine(new string('-', 80)); } } /// <summary> /// Populate the services for use within the console application. /// </summary> /// <param name="host">The services host.</param> private static void ServicesSetup(IHost host) { _sageMakerWrapper = host.Services.GetRequiredService<SageMakerWrapper>(); _iamClient = host.Services.GetRequiredService<IAmazonIdentityManagementService>(); _sqsClient = host.Services.GetRequiredService<IAmazonSQS>(); _s3Client = host.Services.GetRequiredService<IAmazonS3>(); _lambdaClient = host.Services.GetRequiredService<IAmazonLambda>(); } /// <summary> /// Set up AWS Lambda, either by updating an existing function or creating a new function. /// </summary> /// <param name="roleArn">The role Amazon Resource Name (ARN) to use for the Lambda function.</param> /// <param name="askUser">True to ask the user before updating.</param> /// <returns>The ARN of the function.</returns> public static async Task<string> SetupLambda(string roleArn, bool askUser) { Console.WriteLine(new string('-', 80)); Console.WriteLine("Setting up the Lambda function for the pipeline."); var handlerName = "SageMakerLambda::SageMakerLambda.SageMakerLambdaFunction::FunctionHandler"; var functionArn = ""; try { var functionInfo = await _lambdaClient.GetFunctionAsync(new GetFunctionRequest() { FunctionName = lambdaFunctionName }); var updateFunction = true; if (askUser) { updateFunction = GetYesNoResponse( $"\tThe Lambda function {lambdaFunctionName} already exists, do you want to update it?"); } if (updateFunction) { // Update the Lambda function. using var zipMemoryStream = new MemoryStream(await File.ReadAllBytesAsync("SageMakerLambda.zip")); await _lambdaClient.UpdateFunctionCodeAsync( new UpdateFunctionCodeRequest() { FunctionName = lambdaFunctionName, ZipFile = zipMemoryStream, }); } functionArn = functionInfo.Configuration.FunctionArn; } catch (ResourceNotFoundException) { Console.WriteLine($"\tThe Lambda function {lambdaFunctionName} was not found, creating the new function."); // Create the function if it does not already exist. using var zipMemoryStream = new MemoryStream(await File.ReadAllBytesAsync("SageMakerLambda.zip")); var createResult = await _lambdaClient.CreateFunctionAsync( new CreateFunctionRequest() { FunctionName = lambdaFunctionName, Runtime = Runtime.Dotnet6, Description = "SageMaker example function.", Code = new FunctionCode() { ZipFile = zipMemoryStream }, Handler = handlerName, Role = roleArn, Timeout = 30 }); functionArn = createResult.FunctionArn; } Console.WriteLine($"\tLambda ready with ARN {functionArn}."); Console.WriteLine(new string('-', 80)); return functionArn; } /// <summary> /// Create a role to be used by AWS Lambda. Does not create the role if it already exists. /// </summary> /// <returns>The role ARN.</returns> public static async Task<string> CreateLambdaRole() { Console.WriteLine(new string('-', 80)); lambdaRolePolicies = new string[]{ "arn:aws:iam::aws:policy/AmazonSageMakerFullAccess", "arn:aws:iam::aws:policy/AmazonSQSFullAccess", "arn:aws:iam::aws:policy/service-role/" + "AmazonSageMakerGeospatialFullAccess", "arn:aws:iam::aws:policy/service-role/" + "AmazonSageMakerServiceCatalogProductsLambdaServiceRolePolicy", "arn:aws:iam::aws:policy/service-role/" + "AWSLambdaSQSQueueExecutionRole" }; var roleArn = await GetRoleArnIfExists(lambdaRoleName); if (!string.IsNullOrEmpty(roleArn)) { return roleArn; } Console.WriteLine("\tCreating a role to for AWS Lambda to use."); var assumeRolePolicy = "{" + "\"Version\": \"2012-10-17\"," + "\"Statement\": [{" + "\"Effect\": \"Allow\"," + "\"Principal\": {" + $"\"Service\": [" + "\"sagemaker.amazonaws.com\"," + "\"sagemaker-geospatial.amazonaws.com\"," + "\"lambda.amazonaws.com\"," + "\"s3.amazonaws.com\"" + "]" + "}," + "\"Action\": \"sts:AssumeRole\"" + "}]" + "}"; var roleResult = await _iamClient!.CreateRoleAsync( new CreateRoleRequest() { AssumeRolePolicyDocument = assumeRolePolicy, Path = "/", RoleName = lambdaRoleName }); foreach (var policy in lambdaRolePolicies) { await _iamClient.AttachRolePolicyAsync( new AttachRolePolicyRequest() { PolicyArn = policy, RoleName = lambdaRoleName }); } // Allow time for the role to be ready. Thread.Sleep(10000); Console.WriteLine($"\tRole ready with ARN {roleResult.Role.Arn}."); Console.WriteLine(new string('-', 80)); return roleResult.Role.Arn; } /// <summary> /// Create a role to be used by SageMaker. /// </summary> /// <returns>The role Amazon Resource Name (ARN).</returns> public static async Task<string> CreateSageMakerRole() { Console.WriteLine(new string('-', 80)); sageMakerRolePolicies = new string[]{ "arn:aws:iam::aws:policy/AmazonSageMakerFullAccess", "arn:aws:iam::aws:policy/AmazonSageMakerGeospatialFullAccess", }; var roleArn = await GetRoleArnIfExists(sageMakerRoleName); if (!string.IsNullOrEmpty(roleArn)) { return roleArn; } Console.WriteLine("\tCreating a role to use with SageMaker."); var assumeRolePolicy = "{" + "\"Version\": \"2012-10-17\"," + "\"Statement\": [{" + "\"Effect\": \"Allow\"," + "\"Principal\": {" + $"\"Service\": [" + "\"sagemaker.amazonaws.com\"," + "\"sagemaker-geospatial.amazonaws.com\"," + "\"lambda.amazonaws.com\"," + "\"s3.amazonaws.com\"" + "]" + "}," + "\"Action\": \"sts:AssumeRole\"" + "}]" + "}"; var roleResult = await _iamClient!.CreateRoleAsync( new CreateRoleRequest() { AssumeRolePolicyDocument = assumeRolePolicy, Path = "/", RoleName = sageMakerRoleName }); foreach (var policy in sageMakerRolePolicies) { await _iamClient.AttachRolePolicyAsync( new AttachRolePolicyRequest() { PolicyArn = policy, RoleName = sageMakerRoleName }); } // Allow time for the role to be ready. Thread.Sleep(10000); Console.WriteLine($"\tRole ready with ARN {roleResult.Role.Arn}."); Console.WriteLine(new string('-', 80)); return roleResult.Role.Arn; } /// <summary> /// Set up the SQS queue to use with the pipeline. /// </summary> /// <param name="queueName">The name for the queue.</param> /// <returns>The URL for the queue.</returns> public static async Task<string> SetupQueue(string queueName) { Console.WriteLine(new string('-', 80)); Console.WriteLine($"Setting up queue {queueName}."); try { var queueInfo = await _sqsClient.GetQueueUrlAsync(new GetQueueUrlRequest() { QueueName = queueName }); return queueInfo.QueueUrl; } catch (QueueDoesNotExistException) { var attrs = new Dictionary<string, string> { { QueueAttributeName.DelaySeconds, "5" }, { QueueAttributeName.ReceiveMessageWaitTimeSeconds, "5" }, { QueueAttributeName.VisibilityTimeout, "300" }, }; var request = new CreateQueueRequest { Attributes = attrs, QueueName = queueName, }; var response = await _sqsClient.CreateQueueAsync(request); Thread.Sleep(10000); await ConnectLambda(response.QueueUrl); Console.WriteLine($"\tQueue ready with Url {response.QueueUrl}."); Console.WriteLine(new string('-', 80)); return response.QueueUrl; } } /// <summary> /// Connect the queue to the Lambda function as an event source. /// </summary> /// <param name="queueUrl">The URL for the queue.</param> /// <returns>Async task.</returns> public static async Task ConnectLambda(string queueUrl) { Console.WriteLine(new string('-', 80)); Console.WriteLine($"Connecting the Lambda function and queue for the pipeline."); var queueAttributes = await _sqsClient.GetQueueAttributesAsync( new GetQueueAttributesRequest() { QueueUrl = queueUrl, AttributeNames = new List<string>() { "All" } }); var queueArn = queueAttributes.QueueARN; var eventSource = await _lambdaClient.ListEventSourceMappingsAsync( new ListEventSourceMappingsRequest() { FunctionName = lambdaFunctionName }); if (!eventSource.EventSourceMappings.Any()) { // Only add the event source mapping if it does not already exist. await _lambdaClient.CreateEventSourceMappingAsync( new CreateEventSourceMappingRequest() { EventSourceArn = queueArn, FunctionName = lambdaFunctionName, Enabled = true }); } Console.WriteLine(new string('-', 80)); } /// <summary> /// Set up the bucket to use for pipeline input and output. /// </summary> /// <param name="bucketName">The name for the bucket.</param> /// <returns>Async task.</returns> public static async Task SetupBucket(string bucketName) { Console.WriteLine(new string('-', 80)); Console.WriteLine($"Setting up bucket {bucketName}."); var bucketExists = await Amazon.S3.Util.AmazonS3Util.DoesS3BucketExistV2Async(_s3Client, bucketName); if (!bucketExists) { await _s3Client.PutBucketAsync(new PutBucketRequest() { BucketName = bucketName, BucketRegion = S3Region.USWest2 }); Thread.Sleep(5000); await _s3Client.PutObjectAsync(new PutObjectRequest() { BucketName = bucketName, Key = "samplefiles/latlongtest.csv", FilePath = "latlongtest.csv" }); } Console.WriteLine($"\tBucket {bucketName} ready."); Console.WriteLine(new string('-', 80)); } /// <summary> /// Display some results from the output directory. /// </summary> /// <param name="bucketName">The name for the bucket.</param> /// <returns>Async task.</returns> public static async Task<string> GetOutputResults(string bucketName) { Console.WriteLine(new string('-', 80)); Console.WriteLine($"Getting output results {bucketName}."); string outputKey = ""; Thread.Sleep(15000); var outputFiles = await _s3Client.ListObjectsAsync( new ListObjectsRequest() { BucketName = bucketName, Prefix = "outputfiles/" }); if (outputFiles.S3Objects.Any()) { var sampleOutput = outputFiles.S3Objects.OrderBy(s => s.LastModified).Last(); Console.WriteLine($"\tOutput file: {sampleOutput.Key}"); var outputSampleResponse = await _s3Client.GetObjectAsync( new GetObjectRequest() { BucketName = bucketName, Key = sampleOutput.Key }); outputKey = sampleOutput.Key; StreamReader reader = new StreamReader(outputSampleResponse.ResponseStream); await reader.ReadLineAsync(); Console.WriteLine("\tOutput file contents: \n"); for (int i = 0; i < 10; i++) { if (!reader.EndOfStream) { Console.WriteLine("\t" + await reader.ReadLineAsync()); } } } Console.WriteLine(new string('-', 80)); return outputKey; } /// <summary> /// Create a pipeline from the example pipeline JSON /// that includes the Lambda, callback, processing, and export jobs. /// </summary> /// <param name="roleArn">The ARN of the role for the pipeline.</param> /// <param name="functionArn">The ARN of the Lambda function for the pipeline.</param> /// <param name="pipelineName">The name for the pipeline.</param> /// <returns>The ARN of the pipeline.</returns> public static async Task<string> SetupPipeline(string roleArn, string functionArn, string pipelineName) { Console.WriteLine(new string('-', 80)); Console.WriteLine($"Setting up the pipeline."); var pipelineJson = await File.ReadAllTextAsync("GeoSpatialPipeline.json"); // Add the correct function ARN instead of the placeholder. pipelineJson = pipelineJson.Replace("*FUNCTION_ARN*", functionArn); var pipelineArn = await _sageMakerWrapper.SetupPipeline(pipelineJson, roleArn, pipelineName, "sdk example pipeline", pipelineName); Console.WriteLine($"\tPipeline set up with ARN {pipelineArn}."); Console.WriteLine(new string('-', 80)); return pipelineArn; } /// <summary> /// Start a pipeline run with job configurations. /// </summary> /// <param name="queueUrl">The URL for the queue used in the pipeline.</param> /// <param name="roleArn">The ARN of the role.</param> /// <param name="pipelineName">The name of the pipeline.</param> /// <param name="bucketName">The name of the bucket.</param> /// <returns>The pipeline run ARN.</returns> public static async Task<string> ExecutePipeline( string queueUrl, string roleArn, string pipelineName, string bucketName) { Console.WriteLine(new string('-', 80)); Console.WriteLine($"Starting pipeline execution."); var input = $"s3://{bucketName}/samplefiles/latlongtest.csv"; var output = $"s3://{bucketName}/outputfiles/"; var executionARN = await _sageMakerWrapper.ExecutePipeline(queueUrl, input, output, pipelineName, roleArn); Console.WriteLine($"\tRun started with ARN {executionARN}."); Console.WriteLine(new string('-', 80)); return executionARN; } /// <summary> /// Wait for a pipeline run to complete. /// </summary> /// <param name="executionArn">The pipeline run ARN.</param> /// <returns>Async task.</returns> public static async Task WaitForPipelineExecution(string executionArn) { Console.WriteLine(new string('-', 80)); Console.WriteLine($"Waiting for pipeline to finish."); PipelineExecutionStatus status; do { status = await _sageMakerWrapper.CheckPipelineExecutionStatus(executionArn); Thread.Sleep(30000); Console.WriteLine($"\tStatus is {status}."); } while (status == PipelineExecutionStatus.Executing); Console.WriteLine($"\tPipeline finished with status {status}."); Console.WriteLine(new string('-', 80)); } /// <summary> /// Clean up the resources from the scenario. /// </summary> /// <param name="askUser">True to ask the user for cleanup.</param> /// <param name="queueUrl">The URL of the queue to clean up.</param> /// <param name="pipelineName">The name of the pipeline.</param> /// <param name="bucketName">The name of the bucket.</param> /// <returns>Async task.</returns> public static async Task<bool> CleanupResources( bool askUser, string queueUrl, string pipelineName, string bucketName) { Console.WriteLine(new string('-', 80)); Console.WriteLine($"Clean up resources."); if (!askUser || GetYesNoResponse($"\tDelete pipeline {pipelineName}? (y/n)")) { Console.WriteLine($"\tDeleting pipeline."); // Delete the pipeline. await _sageMakerWrapper.DeletePipelineByName(pipelineName); } if (!string.IsNullOrEmpty(queueUrl) && (!askUser || GetYesNoResponse($"\tDelete queue {queueUrl}? (y/n)"))) { Console.WriteLine($"\tDeleting queue."); // Delete the queue. await _sqsClient.DeleteQueueAsync(new DeleteQueueRequest(queueUrl)); } if (!askUser || GetYesNoResponse($"\tDelete Amazon S3 bucket {bucketName}? (y/n)")) { Console.WriteLine($"\tDeleting bucket."); // Delete all objects in the bucket. var deleteList = await _s3Client.ListObjectsV2Async(new ListObjectsV2Request() { BucketName = bucketName }); if (deleteList.KeyCount > 0) { await _s3Client.DeleteObjectsAsync(new DeleteObjectsRequest() { BucketName = bucketName, Objects = deleteList.S3Objects .Select(o => new KeyVersion { Key = o.Key }).ToList() }); } // Now delete the bucket. await _s3Client.DeleteBucketAsync(new DeleteBucketRequest() { BucketName = bucketName }); } if (!askUser || GetYesNoResponse($"\tDelete lambda {lambdaFunctionName}? (y/n)")) { Console.WriteLine($"\tDeleting lambda function."); await _lambdaClient.DeleteFunctionAsync(new DeleteFunctionRequest() { FunctionName = lambdaFunctionName }); } if (!askUser || GetYesNoResponse($"\tDelete role {lambdaRoleName}? (y/n)")) { Console.WriteLine($"\tDetaching policies and deleting role."); foreach (var policy in lambdaRolePolicies) { await _iamClient!.DetachRolePolicyAsync(new DetachRolePolicyRequest() { RoleName = lambdaRoleName, PolicyArn = policy }); } await _iamClient!.DeleteRoleAsync(new DeleteRoleRequest() { RoleName = lambdaRoleName }); } if (!askUser || GetYesNoResponse($"\tDelete role {sageMakerRoleName}? (y/n)")) { Console.WriteLine($"\tDetaching policies and deleting role."); foreach (var policy in sageMakerRolePolicies) { await _iamClient!.DetachRolePolicyAsync(new DetachRolePolicyRequest() { RoleName = sageMakerRoleName, PolicyArn = policy }); } await _iamClient!.DeleteRoleAsync(new DeleteRoleRequest() { RoleName = sageMakerRoleName }); } Console.WriteLine(new string('-', 80)); return true; } /// <summary> /// Helper method to get a role's ARN if it already exists. /// </summary> /// <param name="roleName">The name of the AWS Identity and Access Management (IAM) Role to look for.</param> /// <returns>The role ARN if it exists, otherwise an empty string.</returns> private static async Task<string> GetRoleArnIfExists(string roleName) { Console.WriteLine($"Checking for role named {roleName}."); try { var existingRole = await _iamClient.GetRoleAsync(new GetRoleRequest() { RoleName = lambdaRoleName }); return existingRole.Role.Arn; } catch (NoSuchEntityException) { return string.Empty; } } /// <summary> /// Helper method to get a yes or no response from the user. /// </summary> /// <param name="question">The question string to print on the console.</param> /// <returns>True if the user responds with a yes.</returns> private static bool GetYesNoResponse(string question) { Console.WriteLine(question); var ynResponse = Console.ReadLine(); var response = ynResponse != null && ynResponse.Equals("y", StringComparison.InvariantCultureIgnoreCase); return response; } }