Amazon S3 Control examples using SDK for Java 2.x - AWS SDK for Java 2.x

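The following code examples show you how to perform actions and implement common scenarios by using the AWS SDK for Java 2.x with Amazon S3 Control.

Actions are code excerpts from larger programs and must be run in context. While actions show you how to call individual service functions, you can see actions in context in their related scenarios and cross-service examples.

Scenarios are code examples that show you how to accomplish a specific task by calling multiple functions within the same service.

Each example includes a link to GitHub, where you can find instructions on how to set up and run the code in context.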

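Get started

The following code example shows how to get started using Amazon S3 Control.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.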

import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3control.S3ControlAsyncClient;
import software.amazon.awssdk.services.s3control.model.JobListDescriptor;
import software.amazon.awssdk.services.s3control.model.JobStatus;
import software.amazon.awssdk.services.s3control.model.ListJobsRequest;
import software.amazon.awssdk.services.s3control.paginators.ListJobsPublisher;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class HelloS3Batch {
    private static S3ControlAsyncClient asyncClient;

    public static void main(String[] args) {
        S3BatchActions actions = new S3BatchActions();
        String accountId = actions.getAccountId();
        try {
            listBatchJobsAsync(accountId)
                .exceptionally(ex -> {
                    System.err.println("List batch jobs failed: " + ex.getMessage());
                    return null;
                })
                .join(); // Wait for completion
        } catch (CompletionException ex) {
            System.err.println("Failed to list batch jobs: " + ex.getMessage());
        }
    }

    /**
     * Retrieves the asynchronous S3 Control client instance.
     * <p>
     * This method creates and returns a singleton instance of the {@link S3ControlAsyncClient}. If the instance
     * has not been created yet, it will be initialized with the following configuration:
     * <ul>
     *   <li>Maximum concurrency: 100</li>
     *   <li>Connection timeout: 60 seconds</li>
     *   <li>Read timeout: 60 seconds</li>
     *   <li>Write timeout: 60 seconds</li>
     *   <li>API call timeout: 2 minutes</li>
     *   <li>API call attempt timeout: 90 seconds</li>
     *   <li>Retry policy: 3 retries</li>
     *   <li>Region: US_EAST_1</li>
     *   <li>Credentials provider: {@link EnvironmentVariableCredentialsProvider}</li>
     * </ul>
     *
     * @return the asynchronous S3 Control client instance
     */
    private static S3ControlAsyncClient getAsyncClient() {
        if (asyncClient == null) {
            SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder()
                .maxConcurrency(100)
                .connectionTimeout(Duration.ofSeconds(60))
                .readTimeout(Duration.ofSeconds(60))
                .writeTimeout(Duration.ofSeconds(60))
                .build();

            ClientOverrideConfiguration overrideConfig = ClientOverrideConfiguration.builder()
                .apiCallTimeout(Duration.ofMinutes(2))
                .apiCallAttemptTimeout(Duration.ofSeconds(90))
                .retryPolicy(RetryPolicy.builder()
                    .numRetries(3)
                    .build())
                .build();

            asyncClient = S3ControlAsyncClient.builder()
                .region(Region.US_EAST_1)
                .httpClient(httpClient)
                .overrideConfiguration(overrideConfig)
                .credentialsProvider(EnvironmentVariableCredentialsProvider.create())
                .build();
        }
        return asyncClient;
    }

    /**
     * Asynchronously lists batch jobs that have completed for the specified account.
     *
     * @param accountId the ID of the account to list jobs for
     * @return a CompletableFuture that completes when the job listing operation is finished
     */
    public static CompletableFuture<Void> listBatchJobsAsync(String accountId) {
        ListJobsRequest jobsRequest = ListJobsRequest.builder()
            .jobStatuses(JobStatus.COMPLETE)
            .accountId(accountId)
            .maxResults(10)
            .build();

        ListJobsPublisher publisher = getAsyncClient().listJobsPaginator(jobsRequest);
        return publisher.subscribe(response -> {
            List<JobListDescriptor> jobs = response.jobs();
            for (JobListDescriptor job : jobs) {
                System.out.println("The job id is " + job.jobId());
                System.out.println("The job priority is " + job.priority());
            }
        }).thenAccept(response -> {
            System.out.println("Listing batch jobs completed");
        }).exceptionally(ex -> {
            System.err.println("Failed to list batch jobs: " + ex.getMessage());
            throw new RuntimeException(ex);
        });
    }
}
  • For API details, see ListJobsPaginator in AWS SDK for Java 2.x API Reference.
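The example above calls join() and catches CompletionException, which usually wraps the exception that actually caused the failure. The following is a minimal sketch, using only the JDK, of one way to get at that underlying cause before logging it. The class name FutureErrors and the helper unwrap are hypothetical and are not part of the SDK or of the example repository; the failing future is a stand-in, so no AWS request is made.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical helper, not part of the AWS SDK or the example repository.
final class FutureErrors {

    // Strips CompletionException wrappers and returns the first underlying cause.
    static Throwable unwrap(Throwable t) {
        Throwable current = t;
        while (current instanceof CompletionException && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Stand-in for an SDK call that failed; no AWS request is made here.
        CompletableFuture<Void> failing = new CompletableFuture<>();
        failing.completeExceptionally(new IllegalStateException("simulated service error"));

        try {
            failing.join();
        } catch (CompletionException ex) {
            Throwable cause = unwrap(ex);
            System.err.println(cause.getClass().getSimpleName() + ": " + cause.getMessage());
        }
    }
}
```

With the SDK, the unwrapped cause would typically be a service exception such as S3ControlException, which can then be inspected with instanceof.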

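Actions

The following code example shows how to use CreateJob.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.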

    /**
     * Creates an asynchronous S3 job using the AWS Java SDK.
     *
     * @param accountId         the AWS account ID associated with the job
     * @param iamRoleArn        the ARN of the IAM role to be used for the job
     * @param manifestLocation  the location of the job manifest file in S3
     * @param reportBucketName  the name of the S3 bucket to store the job report
     * @param uuid              a unique identifier for the job
     * @return a CompletableFuture that represents the asynchronous creation of the S3 job.
     *         The CompletableFuture will return the job ID if the job is created successfully,
     *         or throw an exception if there is an error.
     */
    public CompletableFuture<String> createS3JobAsync(String accountId, String iamRoleArn,
                                                      String manifestLocation, String reportBucketName, String uuid) {
        String[] bucketName = new String[]{""};
        String[] parts = reportBucketName.split(":::");
        if (parts.length > 1) {
            bucketName[0] = parts[1];
        } else {
            System.out.println("The input string does not contain the expected format.");
        }

        return CompletableFuture.supplyAsync(() -> getETag(bucketName[0], "job-manifest.csv"))
            .thenCompose(eTag -> {
                ArrayList<S3Tag> tagSet = new ArrayList<>();
                S3Tag s3Tag = S3Tag.builder()
                    .key("keyOne")
                    .value("ValueOne")
                    .build();
                S3Tag s3Tag2 = S3Tag.builder()
                    .key("keyTwo")
                    .value("ValueTwo")
                    .build();
                tagSet.add(s3Tag);
                tagSet.add(s3Tag2);

                S3SetObjectTaggingOperation objectTaggingOperation = S3SetObjectTaggingOperation.builder()
                    .tagSet(tagSet)
                    .build();

                JobOperation jobOperation = JobOperation.builder()
                    .s3PutObjectTagging(objectTaggingOperation)
                    .build();

                JobManifestLocation jobManifestLocation = JobManifestLocation.builder()
                    .objectArn(manifestLocation)
                    .eTag(eTag)
                    .build();

                JobManifestSpec manifestSpec = JobManifestSpec.builder()
                    .fieldsWithStrings("Bucket", "Key")
                    .format("S3BatchOperations_CSV_20180820")
                    .build();

                JobManifest jobManifest = JobManifest.builder()
                    .spec(manifestSpec)
                    .location(jobManifestLocation)
                    .build();

                JobReport jobReport = JobReport.builder()
                    .bucket(reportBucketName)
                    .prefix("reports")
                    .format("Report_CSV_20180820")
                    .enabled(true)
                    .reportScope("AllTasks")
                    .build();

                CreateJobRequest jobRequest = CreateJobRequest.builder()
                    .accountId(accountId)
                    .description("Job created using the AWS Java SDK")
                    .manifest(jobManifest)
                    .operation(jobOperation)
                    .report(jobReport)
                    .priority(42)
                    .roleArn(iamRoleArn)
                    .clientRequestToken(uuid)
                    .confirmationRequired(false)
                    .build();

                // Create the job asynchronously.
                return getAsyncClient().createJob(jobRequest)
                    .thenApply(CreateJobResponse::jobId);
            })
            .handle((jobId, ex) -> {
                if (ex != null) {
                    Throwable cause = (ex instanceof CompletionException) ? ex.getCause() : ex;
                    if (cause instanceof S3ControlException) {
                        throw new CompletionException(cause);
                    } else {
                        throw new RuntimeException(cause);
                    }
                }
                return jobId;
            });
    }
  • For API details, see CreateJob in AWS SDK for Java 2.x API Reference.
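In createS3JobAsync, the bucket name is recovered from the reportBucketName argument by splitting on ":::", and the method carries on with an empty bucket name when the split fails. The sketch below shows one stricter alternative that rejects unexpected input up front. The class S3Arns and the method bucketNameFromArn are hypothetical names introduced for illustration, and the sketch assumes ARNs in the standard aws partition only (it does not handle partitions such as aws-cn).

```java
// Hypothetical helper, not part of the AWS SDK or the example repository.
final class S3Arns {

    // Assumption: only the standard "aws" partition is handled.
    private static final String BUCKET_ARN_PREFIX = "arn:aws:s3:::";

    // Returns the bucket name from a bucket ARN or an object ARN, or throws on unexpected input.
    static String bucketNameFromArn(String arn) {
        if (arn == null || !arn.startsWith(BUCKET_ARN_PREFIX)) {
            throw new IllegalArgumentException(
                "Expected an ARN of the form arn:aws:s3:::bucket-name, got: " + arn);
        }
        String rest = arn.substring(BUCKET_ARN_PREFIX.length());
        int slash = rest.indexOf('/');
        // An object ARN has a "/key" suffix after the bucket name; drop it if present.
        String bucket = slash >= 0 ? rest.substring(0, slash) : rest;
        if (bucket.isEmpty()) {
            throw new IllegalArgumentException("The ARN does not contain a bucket name: " + arn);
        }
        return bucket;
    }

    public static void main(String[] args) {
        System.out.println(bucketNameFromArn("arn:aws:s3:::x-1234"));
        System.out.println(bucketNameFromArn("arn:aws:s3:::x-1234/job-manifest.csv"));
    }
}
```

Failing fast here means a malformed argument surfaces as an IllegalArgumentException instead of a later HeadObject error against an empty bucket name.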

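The following code example shows how to use DeleteJobTagging.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.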

    /**
     * Asynchronously deletes the tags associated with a specific batch job.
     *
     * @param jobId     The ID of the batch job whose tags should be deleted.
     * @param accountId The ID of the account associated with the batch job.
     * @return A CompletableFuture that completes when the job tags have been successfully deleted,
     *         or completes exceptionally if the deletion fails.
     */
    public CompletableFuture<Void> deleteBatchJobTagsAsync(String jobId, String accountId) {
        DeleteJobTaggingRequest jobTaggingRequest = DeleteJobTaggingRequest.builder()
            .accountId(accountId)
            .jobId(jobId)
            .build();

        // Use the accessor so the client is created on first use instead of reading a possibly null field.
        return getAsyncClient().deleteJobTagging(jobTaggingRequest)
            .thenAccept(response -> {
                System.out.println("You have successfully deleted " + jobId + " tagging.");
            })
            .exceptionally(ex -> {
                System.err.println("Failed to delete job tags: " + ex.getMessage());
                throw new RuntimeException(ex);
            });
    }
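  • For API details, see DeleteJobTagging in AWS SDK for Java 2.x API Reference.

The following code example shows how to use DescribeJob.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.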

    /**
     * Asynchronously describes the specified job.
     *
     * @param jobId     the ID of the job to describe
     * @param accountId the ID of the AWS account associated with the job
     * @return a {@link CompletableFuture} that completes when the job description is available
     * @throws RuntimeException if an error occurs while describing the job
     */
    public CompletableFuture<Void> describeJobAsync(String jobId, String accountId) {
        DescribeJobRequest jobRequest = DescribeJobRequest.builder()
            .jobId(jobId)
            .accountId(accountId)
            .build();

        return getAsyncClient().describeJob(jobRequest)
            .thenAccept(response -> {
                System.out.println("Job ID: " + response.job().jobId());
                System.out.println("Description: " + response.job().description());
                System.out.println("Status: " + response.job().statusAsString());
                System.out.println("Role ARN: " + response.job().roleArn());
                System.out.println("Priority: " + response.job().priority());
                System.out.println("Progress Summary: " + response.job().progressSummary());

                // Print out details about the job manifest.
                JobManifest manifest = response.job().manifest();
                System.out.println("Manifest Location: " + manifest.location().objectArn());
                System.out.println("Manifest ETag: " + manifest.location().eTag());

                // Print out details about the job operation.
                JobOperation operation = response.job().operation();
                if (operation.s3PutObjectTagging() != null) {
                    System.out.println("Operation: S3 Put Object Tagging");
                    System.out.println("Tag Set: " + operation.s3PutObjectTagging().tagSet());
                }

                // Print out details about the job report.
                JobReport report = response.job().report();
                System.out.println("Report Bucket: " + report.bucket());
                System.out.println("Report Prefix: " + report.prefix());
                System.out.println("Report Format: " + report.format());
                System.out.println("Report Enabled: " + report.enabled());
                System.out.println("Report Scope: " + report.reportScopeAsString());
            })
            .exceptionally(ex -> {
                System.err.println("Failed to describe job: " + ex.getMessage());
                throw new RuntimeException(ex);
            });
    }
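  • For API details, see DescribeJob in AWS SDK for Java 2.x API Reference.

The following code example shows how to use GetJobTagging.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.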

    /**
     * Asynchronously retrieves the tags associated with a specific job in an AWS account.
     *
     * @param jobId     the ID of the job for which to retrieve the tags
     * @param accountId the ID of the AWS account associated with the job
     * @return a {@link CompletableFuture} that completes when the job tags have been retrieved,
     *         or with an exception if the operation fails
     * @throws RuntimeException if an error occurs while retrieving the job tags
     */
    public CompletableFuture<Void> getJobTagsAsync(String jobId, String accountId) {
        GetJobTaggingRequest request = GetJobTaggingRequest.builder()
            .jobId(jobId)
            .accountId(accountId)
            .build();

        // Use the accessor so the client is created on first use instead of reading a possibly null field.
        return getAsyncClient().getJobTagging(request)
            .thenAccept(response -> {
                List<S3Tag> tags = response.tags();
                if (tags.isEmpty()) {
                    System.out.println("No tags found for job ID: " + jobId);
                } else {
                    for (S3Tag tag : tags) {
                        System.out.println("Tag key is: " + tag.key());
                        System.out.println("Tag value is: " + tag.value());
                    }
                }
            })
            .exceptionally(ex -> {
                System.err.println("Failed to get job tags: " + ex.getMessage());
                throw new RuntimeException(ex); // Propagate the exception
            });
    }
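  • For API details, see GetJobTagging in AWS SDK for Java 2.x API Reference.

The following code example shows how to use PutJobTagging.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.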

    /**
     * Asynchronously adds tags to a job in the system.
     *
     * @param jobId     the ID of the job to add tags to
     * @param accountId the account ID associated with the job
     * @return a CompletableFuture that completes when the tagging operation is finished
     */
    public CompletableFuture<Void> putJobTaggingAsync(String jobId, String accountId) {
        S3Tag departmentTag = S3Tag.builder()
            .key("department")
            .value("Marketing")
            .build();

        S3Tag fiscalYearTag = S3Tag.builder()
            .key("FiscalYear")
            .value("2020")
            .build();

        PutJobTaggingRequest putJobTaggingRequest = PutJobTaggingRequest.builder()
            .jobId(jobId)
            .accountId(accountId)
            .tags(departmentTag, fiscalYearTag)
            .build();

        // Use the accessor so the client is created on first use instead of reading a possibly null field.
        return getAsyncClient().putJobTagging(putJobTaggingRequest)
            .thenRun(() -> {
                System.out.println("Additional Tags were added to job " + jobId);
            })
            .exceptionally(ex -> {
                System.err.println("Failed to add tags to job: " + ex.getMessage());
                throw new RuntimeException(ex); // Propagate the exception
            });
    }
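  • For API details, see PutJobTagging in AWS SDK for Java 2.x API Reference.

The following code example shows how to use UpdateJobPriority.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.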

    /**
     * Updates the priority of a job asynchronously.
     *
     * @param jobId     the ID of the job to update
     * @param accountId the ID of the account associated with the job
     * @return a {@link CompletableFuture} that represents the asynchronous operation, which completes
     *         when the job priority has been updated or an error has occurred
     */
    public CompletableFuture<Void> updateJobPriorityAsync(String jobId, String accountId) {
        UpdateJobPriorityRequest priorityRequest = UpdateJobPriorityRequest.builder()
            .accountId(accountId)
            .jobId(jobId)
            .priority(60)
            .build();

        CompletableFuture<Void> future = new CompletableFuture<>();
        getAsyncClient().updateJobPriority(priorityRequest)
            .thenAccept(response -> {
                System.out.println("The job priority was updated");
                future.complete(null); // Complete the CompletableFuture on successful execution
            })
            .exceptionally(ex -> {
                System.err.println("Failed to update job priority: " + ex.getMessage());
                future.completeExceptionally(ex); // Complete the CompletableFuture exceptionally on error
                return null; // Return null to handle the exception
            });

        return future;
    }
  • For API details, see UpdateJobPriority in AWS SDK for Java 2.x API Reference.
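updateJobPriorityAsync creates a second CompletableFuture and completes it by hand from the thenAccept and exceptionally callbacks. As a point of comparison, the sketch below logs the outcome and hands the same result or failure to the caller with whenComplete, without a second future. It uses only the JDK; the class LogAndPropagate and the method logOutcome are hypothetical names, and the futures in main stand in for SDK calls.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical helper, not part of the AWS SDK or the example repository.
final class LogAndPropagate {

    // Logs success or failure of the given call and propagates the same outcome to the caller.
    static <T> CompletableFuture<Void> logOutcome(CompletableFuture<T> call,
                                                  String successMessage,
                                                  String failurePrefix) {
        return call
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    System.err.println(failurePrefix + ex.getMessage());
                } else {
                    System.out.println(successMessage);
                }
            })
            // Discard the result so the caller sees CompletableFuture<Void>; failures pass through unchanged.
            .thenAccept(result -> { });
    }

    public static void main(String[] args) {
        // Stand-ins for SDK calls; no AWS request is made here.
        logOutcome(CompletableFuture.completedFuture("response"),
            "The job priority was updated", "Failed to update job priority: ").join();

        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("simulated service error"));
        try {
            logOutcome(failed, "The job priority was updated", "Failed to update job priority: ").join();
        } catch (CompletionException ex) {
            System.err.println("Caller still sees the failure: " + ex.getCause().getMessage());
        }
    }
}
```

Because whenComplete does not swallow the exception, callers that chain exceptionally or call join() still observe the original failure.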

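The following code example shows how to use UpdateJobStatus.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.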

    /**
     * Cancels a job asynchronously.
     *
     * @param jobId     The ID of the job to be canceled.
     * @param accountId The ID of the account associated with the job.
     * @return A {@link CompletableFuture} that completes when the job status has been updated to "CANCELLED".
     *         If an error occurs during the update, the returned future will complete exceptionally.
     */
    public CompletableFuture<Void> cancelJobAsync(String jobId, String accountId) {
        UpdateJobStatusRequest updateJobStatusRequest = UpdateJobStatusRequest.builder()
            .accountId(accountId)
            .jobId(jobId)
            .requestedJobStatus(String.valueOf(JobStatus.CANCELLED))
            .build();

        // Use the accessor so the client is created on first use instead of reading a possibly null field.
        return getAsyncClient().updateJobStatus(updateJobStatusRequest)
            .thenAccept(updateJobStatusResponse -> {
                System.out.println("Job status updated to: " + updateJobStatusResponse.status());
            })
            .exceptionally(ex -> {
                System.err.println("Failed to cancel job: " + ex.getMessage());
                throw new RuntimeException(ex); // Propagate the exception
            });
    }
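  • For API details, see UpdateJobStatus in AWS SDK for Java 2.x API Reference.

Scenarios

The following code example shows how to learn core operations for Amazon S3 Control.

SDK for Java 2.x
Note

There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.

Learn core operations.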

package com.example.s3.batch;

import software.amazon.awssdk.services.s3.model.S3Exception;
import java.io.IOException;
import java.util.Map;
import java.util.Scanner;
import java.util.UUID;
import java.util.concurrent.CompletionException;

public class S3BatchScenario {

    public static final String DASHES = new String(new char[80]).replace("\0", "-");
    private static final String STACK_NAME = "MyS3Stack";

    public static void main(String[] args) throws IOException {
        S3BatchActions actions = new S3BatchActions();
        String accountId = actions.getAccountId();
        String uuid = UUID.randomUUID().toString();
        Scanner scanner = new Scanner(System.in);

        System.out.println(DASHES);
        System.out.println("Welcome to the Amazon S3 Batch basics scenario.");
        System.out.println("""
            S3 Batch operations enables efficient and cost-effective processing of large-scale
            data stored in Amazon S3. It automatically scales resources to handle varying workloads
            without the need for manual intervention.

            One of the key features of S3 Batch is its ability to perform tagging operations on objects stored in
            S3 buckets. Users can leverage S3 Batch to apply, update, or remove tags on thousands or millions of
            objects in a single operation, streamlining the management and organization of their data.

            This can be particularly useful for tasks such as cost allocation, lifecycle management, or
            metadata-driven workflows, where consistent and accurate tagging is essential.
            S3 Batch's scalability and serverless nature make it an ideal solution for organizations with
            growing data volumes and complex data management requirements.

            This Java program walks you through Amazon S3 Batch operations.

            Let's get started...
            """);
        waitForInputToContinue(scanner);

        // Use CloudFormation to stand up the resource required for this scenario.
        System.out.println("Use CloudFormation to stand up the resource required for this scenario.");
        CloudFormationHelper.deployCloudFormationStack(STACK_NAME);
        Map<String, String> stackOutputs = CloudFormationHelper.getStackOutputs(STACK_NAME);
        String iamRoleArn = stackOutputs.get("S3BatchRoleArn");
        System.out.println(DASHES);

        System.out.println(DASHES);
        System.out.println("Set up the required bucket for this scenario.");
        waitForInputToContinue(scanner);
        String bucketName = "x-" + UUID.randomUUID();
        actions.createBucket(bucketName);
        String reportBucketName = "arn:aws:s3:::" + bucketName;
        String manifestLocation = "arn:aws:s3:::" + bucketName + "/job-manifest.csv";
        System.out.println("Populate the bucket with the required files.");
        String[] fileNames = {"job-manifest.csv", "object-key-1.txt", "object-key-2.txt",
            "object-key-3.txt", "object-key-4.txt"};
        actions.uploadFilesToBucket(bucketName, fileNames, actions);
        waitForInputToContinue(scanner);
        System.out.println(DASHES);

        System.out.println(DASHES);
        System.out.println("1. Create an S3 Batch job");
        System.out.println("This job tags all objects listed in the manifest file with tags");
        waitForInputToContinue(scanner);
        String jobId;
        try {
            jobId = actions.createS3JobAsync(accountId, iamRoleArn, manifestLocation, reportBucketName, uuid).join();
            System.out.println("The Job id is " + jobId);
        } catch (S3Exception e) {
            System.err.println("S3 error: " + e.getMessage());
            return;
        } catch (RuntimeException e) {
            System.err.println("Unexpected error: " + e.getMessage());
            return;
        }
        waitForInputToContinue(scanner);
        System.out.println(DASHES);

        System.out.println(DASHES);
        System.out.println("2. Update an existing S3 Batch Operations job's priority");
        System.out.println("""
            In this step, we modify the job priority value. The higher the number, the higher the priority.
            So, a job with a priority of `30` would have a higher priority than a job with
            a priority of `20`. This is a common way to represent the priority of a task
            or job, with higher numbers indicating a higher priority.

            Ensure that the job status allows for priority updates. Jobs in certain
            states (e.g., Cancelled, Failed, or Completed) cannot have their priorities
            updated. Only jobs in the Active or Suspended state typically allow priority
            updates.
            """);
        try {
            actions.updateJobPriorityAsync(jobId, accountId)
                .exceptionally(ex -> {
                    System.err.println("Update job priority failed: " + ex.getMessage());
                    return null;
                })
                .join();
        } catch (CompletionException ex) {
            System.err.println("Failed to update job priority: " + ex.getMessage());
        }
        waitForInputToContinue(scanner);
        System.out.println(DASHES);

        System.out.println(DASHES);
        System.out.println("3. Cancel the S3 Batch job");
        System.out.print("Do you want to cancel the Batch job? (y/n): ");
        String cancelAns = scanner.nextLine();
        if (cancelAns != null && cancelAns.trim().equalsIgnoreCase("y")) {
            try {
                actions.cancelJobAsync(jobId, accountId)
                    .exceptionally(ex -> {
                        System.err.println("Cancel job failed: " + ex.getMessage());
                        return null;
                    })
                    .join();
            } catch (CompletionException ex) {
                System.err.println("Failed to cancel job: " + ex.getMessage());
            }
        } else {
            System.out.println("Job " + jobId + " was not canceled.");
        }
        System.out.println(DASHES);

        System.out.println(DASHES);
        System.out.println("4. Describe the job that was just created");
        waitForInputToContinue(scanner);
        try {
            actions.describeJobAsync(jobId, accountId)
                .exceptionally(ex -> {
                    System.err.println("Describe job failed: " + ex.getMessage());
                    return null;
                })
                .join();
        } catch (CompletionException ex) {
            System.err.println("Failed to describe job: " + ex.getMessage());
        }
        System.out.println(DASHES);

        System.out.println(DASHES);
        System.out.println("5. Describe the tags associated with the job");
        waitForInputToContinue(scanner);
        try {
            actions.getJobTagsAsync(jobId, accountId)
                .exceptionally(ex -> {
                    System.err.println("Get job tags failed: " + ex.getMessage());
                    return null;
                })
                .join();
        } catch (CompletionException ex) {
            System.err.println("Failed to get job tags: " + ex.getMessage());
        }
        System.out.println(DASHES);

        System.out.println(DASHES);
        System.out.println("6. Update Batch Job Tags");
        waitForInputToContinue(scanner);
        try {
            actions.putJobTaggingAsync(jobId, accountId)
                .exceptionally(ex -> {
                    System.err.println("Put job tagging failed: " + ex.getMessage());
                    return null;
                })
                .join();
        } catch (CompletionException ex) {
            System.err.println("Failed to put job tagging: " + ex.getMessage());
        }
        System.out.println(DASHES);

        System.out.println(DASHES);
        System.out.println("7. Delete the Amazon S3 Batch job tagging.");
        System.out.print("Do you want to delete Batch job tagging? (y/n)");
        String delAns = scanner.nextLine();
        if (delAns != null && delAns.trim().equalsIgnoreCase("y")) {
            try {
                actions.deleteBatchJobTagsAsync(jobId, accountId)
                    .exceptionally(ex -> {
                        System.err.println("Delete batch job tags failed: " + ex.getMessage());
                        return null;
                    })
                    .join();
            } catch (CompletionException ex) {
                System.err.println("Failed to delete batch job tags: " + ex.getMessage());
            }
        } else {
            System.out.println("Tagging was not deleted.");
        }
        System.out.println(DASHES);

        System.out.println(DASHES);
        System.out.print("Do you want to delete the AWS resources used in this scenario? (y/n)");
        String delResAns = scanner.nextLine();
        if (delResAns != null && delResAns.trim().equalsIgnoreCase("y")) {
            actions.deleteFilesFromBucket(bucketName, fileNames, actions);
            actions.deleteBucketFolderAsync(bucketName);
            actions.deleteBucket(bucketName)
                .thenRun(() -> System.out.println("Bucket deletion completed"))
                .exceptionally(ex -> {
                    System.err.println("Error occurred: " + ex.getMessage());
                    return null;
                })
                .join(); // Wait for the bucket deletion to finish before tearing down the stack.
            CloudFormationHelper.destroyCloudFormationStack(STACK_NAME);
        } else {
            System.out.println("The AWS resources were not deleted.");
        }
        System.out.println("The Amazon S3 Batch scenario has successfully completed.");
        System.out.println(DASHES);
    }

    private static void waitForInputToContinue(Scanner scanner) {
        while (true) {
            System.out.println();
            System.out.println("Enter 'c' followed by <ENTER> to continue:");
            String input = scanner.nextLine();

            if (input.trim().equalsIgnoreCase("c")) {
                System.out.println("Continuing with the program...");
                System.out.println();
                break;
            } else {
                // Handle invalid input.
                System.out.println("Invalid input. Please try again.");
            }
        }
    }
}
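The scenario uploads a job-manifest.csv file, and createS3JobAsync declares the manifest columns as Bucket and Key in the S3BatchOperations_CSV_20180820 format. The sketch below shows what building such a two-column manifest in memory might look like. The class ManifestSketch, the method buildManifest, and the bucket name amzn-s3-demo-bucket are hypothetical; the sketch also assumes simple object keys like the ones in the scenario (no commas or characters that would need encoding), and the manifest file shipped with the example repository may be produced differently.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not part of the AWS SDK or the example repository.
final class ManifestSketch {

    // Builds one "bucket,key" row per object key, matching the Bucket and Key manifest fields.
    // Assumption: keys contain no commas or characters that require encoding.
    static String buildManifest(String bucketName, List<String> objectKeys) {
        return objectKeys.stream()
            .map(key -> bucketName + "," + key)
            .collect(Collectors.joining("\n"));
    }

    public static void main(String[] args) {
        String manifest = buildManifest("amzn-s3-demo-bucket",
            List.of("object-key-1.txt", "object-key-2.txt", "object-key-3.txt", "object-key-4.txt"));
        System.out.println(manifest);
    }
}
```

Each row names an object in the bucket that the job will tag; the manifest file itself is listed in fileNames only because it has to exist in the bucket before CreateJob is called.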

An action class that wraps operations.

public class S3BatchActions { private static S3ControlAsyncClient asyncClient; private static S3AsyncClient s3AsyncClient ; /** * Retrieves the asynchronous S3 Control client instance. * <p> * This method creates and returns a singleton instance of the {@link S3ControlAsyncClient}. If the instance * has not been created yet, it will be initialized with the following configuration: * <ul> * <li>Maximum concurrency: 100</li> * <li>Connection timeout: 60 seconds</li> * <li>Read timeout: 60 seconds</li> * <li>Write timeout: 60 seconds</li> * <li>API call timeout: 2 minutes</li> * <li>API call attempt timeout: 90 seconds</li> * <li>Retry policy: 3 retries</li> * <li>Region: US_EAST_1</li> * <li>Credentials provider: {@link EnvironmentVariableCredentialsProvider}</li> * </ul> * * @return the asynchronous S3 Control client instance */ private static S3ControlAsyncClient getAsyncClient() { if (asyncClient == null) { SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder() .maxConcurrency(100) .connectionTimeout(Duration.ofSeconds(60)) .readTimeout(Duration.ofSeconds(60)) .writeTimeout(Duration.ofSeconds(60)) .build(); ClientOverrideConfiguration overrideConfig = ClientOverrideConfiguration.builder() .apiCallTimeout(Duration.ofMinutes(2)) .apiCallAttemptTimeout(Duration.ofSeconds(90)) .retryPolicy(RetryPolicy.builder() .numRetries(3) .build()) .build(); asyncClient = S3ControlAsyncClient.builder() .region(Region.US_EAST_1) .httpClient(httpClient) .overrideConfiguration(overrideConfig) .credentialsProvider(EnvironmentVariableCredentialsProvider.create()) .build(); } return asyncClient; } private static S3AsyncClient getS3AsyncClient() { if (asyncClient == null) { SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder() .maxConcurrency(100) .connectionTimeout(Duration.ofSeconds(60)) .readTimeout(Duration.ofSeconds(60)) .writeTimeout(Duration.ofSeconds(60)) .build(); ClientOverrideConfiguration overrideConfig = ClientOverrideConfiguration.builder() 
.apiCallTimeout(Duration.ofMinutes(2)) .apiCallAttemptTimeout(Duration.ofSeconds(90)) .retryPolicy(RetryPolicy.builder() .numRetries(3) .build()) .build(); s3AsyncClient = S3AsyncClient.builder() .region(Region.US_EAST_1) .httpClient(httpClient) .overrideConfiguration(overrideConfig) .credentialsProvider(EnvironmentVariableCredentialsProvider.create()) .build(); } return s3AsyncClient; } /** * Cancels a job asynchronously. * * @param jobId The ID of the job to be canceled. * @param accountId The ID of the account associated with the job. * @return A {@link CompletableFuture} that completes when the job status has been updated to "CANCELLED". * If an error occurs during the update, the returned future will complete exceptionally. */ public CompletableFuture<Void> cancelJobAsync(String jobId, String accountId) { UpdateJobStatusRequest updateJobStatusRequest = UpdateJobStatusRequest.builder() .accountId(accountId) .jobId(jobId) .requestedJobStatus(String.valueOf(JobStatus.CANCELLED)) .build(); return asyncClient.updateJobStatus(updateJobStatusRequest) .thenAccept(updateJobStatusResponse -> { System.out.println("Job status updated to: " + updateJobStatusResponse.status()); }) .exceptionally(ex -> { System.err.println("Failed to cancel job: " + ex.getMessage()); throw new RuntimeException(ex); // Propagate the exception }); } /** * Updates the priority of a job asynchronously. 
* * @param jobId the ID of the job to update * @param accountId the ID of the account associated with the job * @return a {@link CompletableFuture} that represents the asynchronous operation, which completes when the job priority has been updated or an error has occurred */ public CompletableFuture<Void> updateJobPriorityAsync(String jobId, String accountId) { UpdateJobPriorityRequest priorityRequest = UpdateJobPriorityRequest.builder() .accountId(accountId) .jobId(jobId) .priority(60) .build(); CompletableFuture<Void> future = new CompletableFuture<>(); getAsyncClient().updateJobPriority(priorityRequest) .thenAccept(response -> { System.out.println("The job priority was updated"); future.complete(null); // Complete the CompletableFuture on successful execution }) .exceptionally(ex -> { System.err.println("Failed to update job priority: " + ex.getMessage()); future.completeExceptionally(ex); // Complete the CompletableFuture exceptionally on error return null; // Return null to handle the exception }); return future; } /** * Asynchronously retrieves the tags associated with a specific job in an AWS account. 
* * @param jobId the ID of the job for which to retrieve the tags * @param accountId the ID of the AWS account associated with the job * @return a {@link CompletableFuture} that completes when the job tags have been retrieved, or with an exception if the operation fails * @throws RuntimeException if an error occurs while retrieving the job tags */ public CompletableFuture<Void> getJobTagsAsync(String jobId, String accountId) { GetJobTaggingRequest request = GetJobTaggingRequest.builder() .jobId(jobId) .accountId(accountId) .build(); return asyncClient.getJobTagging(request) .thenAccept(response -> { List<S3Tag> tags = response.tags(); if (tags.isEmpty()) { System.out.println("No tags found for job ID: " + jobId); } else { for (S3Tag tag : tags) { System.out.println("Tag key is: " + tag.key()); System.out.println("Tag value is: " + tag.value()); } } }) .exceptionally(ex -> { System.err.println("Failed to get job tags: " + ex.getMessage()); throw new RuntimeException(ex); // Propagate the exception }); } /** * Asynchronously deletes the tags associated with a specific batch job. * * @param jobId The ID of the batch job whose tags should be deleted. * @param accountId The ID of the account associated with the batch job. * @return A CompletableFuture that completes when the job tags have been successfully deleted, or an exception is thrown if the deletion fails. */ public CompletableFuture<Void> deleteBatchJobTagsAsync(String jobId, String accountId) { DeleteJobTaggingRequest jobTaggingRequest = DeleteJobTaggingRequest.builder() .accountId(accountId) .jobId(jobId) .build(); return asyncClient.deleteJobTagging(jobTaggingRequest) .thenAccept(response -> { System.out.println("You have successfully deleted " + jobId + " tagging."); }) .exceptionally(ex -> { System.err.println("Failed to delete job tags: " + ex.getMessage()); throw new RuntimeException(ex); }); } /** * Asynchronously describes the specified job. 
* * @param jobId the ID of the job to describe * @param accountId the ID of the AWS account associated with the job * @return a {@link CompletableFuture} that completes when the job description is available * @throws RuntimeException if an error occurs while describing the job */ public CompletableFuture<Void> describeJobAsync(String jobId, String accountId) { DescribeJobRequest jobRequest = DescribeJobRequest.builder() .jobId(jobId) .accountId(accountId) .build(); return getAsyncClient().describeJob(jobRequest) .thenAccept(response -> { System.out.println("Job ID: " + response.job().jobId()); System.out.println("Description: " + response.job().description()); System.out.println("Status: " + response.job().statusAsString()); System.out.println("Role ARN: " + response.job().roleArn()); System.out.println("Priority: " + response.job().priority()); System.out.println("Progress Summary: " + response.job().progressSummary()); // Print out details about the job manifest. JobManifest manifest = response.job().manifest(); System.out.println("Manifest Location: " + manifest.location().objectArn()); System.out.println("Manifest ETag: " + manifest.location().eTag()); // Print out details about the job operation. JobOperation operation = response.job().operation(); if (operation.s3PutObjectTagging() != null) { System.out.println("Operation: S3 Put Object Tagging"); System.out.println("Tag Set: " + operation.s3PutObjectTagging().tagSet()); } // Print out details about the job report. 
JobReport report = response.job().report(); System.out.println("Report Bucket: " + report.bucket()); System.out.println("Report Prefix: " + report.prefix()); System.out.println("Report Format: " + report.format()); System.out.println("Report Enabled: " + report.enabled()); System.out.println("Report Scope: " + report.reportScopeAsString()); }) .exceptionally(ex -> { System.err.println("Failed to describe job: " + ex.getMessage()); throw new RuntimeException(ex); }); } /** * Creates an asynchronous S3 job using the AWS Java SDK. * * @param accountId the AWS account ID associated with the job * @param iamRoleArn the ARN of the IAM role to be used for the job * @param manifestLocation the location of the job manifest file in S3 * @param reportBucketName the name of the S3 bucket to store the job report * @param uuid a unique identifier for the job * @return a CompletableFuture that represents the asynchronous creation of the S3 job. * The CompletableFuture will return the job ID if the job is created successfully, * or throw an exception if there is an error. 
     */
    public CompletableFuture<String> createS3JobAsync(String accountId, String iamRoleArn, String manifestLocation, String reportBucketName, String uuid) {
        String[] bucketName = new String[]{""};
        String[] parts = reportBucketName.split(":::");
        if (parts.length > 1) {
            bucketName[0] = parts[1];
        } else {
            System.out.println("The input string does not contain the expected format.");
        }

        return CompletableFuture.supplyAsync(() -> getETag(bucketName[0], "job-manifest.csv"))
            .thenCompose(eTag -> {
                ArrayList<S3Tag> tagSet = new ArrayList<>();
                S3Tag s3Tag = S3Tag.builder()
                    .key("keyOne")
                    .value("ValueOne")
                    .build();
                S3Tag s3Tag2 = S3Tag.builder()
                    .key("keyTwo")
                    .value("ValueTwo")
                    .build();
                tagSet.add(s3Tag);
                tagSet.add(s3Tag2);

                S3SetObjectTaggingOperation objectTaggingOperation = S3SetObjectTaggingOperation.builder()
                    .tagSet(tagSet)
                    .build();

                JobOperation jobOperation = JobOperation.builder()
                    .s3PutObjectTagging(objectTaggingOperation)
                    .build();

                JobManifestLocation jobManifestLocation = JobManifestLocation.builder()
                    .objectArn(manifestLocation)
                    .eTag(eTag)
                    .build();

                JobManifestSpec manifestSpec = JobManifestSpec.builder()
                    .fieldsWithStrings("Bucket", "Key")
                    .format("S3BatchOperations_CSV_20180820")
                    .build();

                JobManifest jobManifest = JobManifest.builder()
                    .spec(manifestSpec)
                    .location(jobManifestLocation)
                    .build();

                JobReport jobReport = JobReport.builder()
                    .bucket(reportBucketName)
                    .prefix("reports")
                    .format("Report_CSV_20180820")
                    .enabled(true)
                    .reportScope("AllTasks")
                    .build();

                CreateJobRequest jobRequest = CreateJobRequest.builder()
                    .accountId(accountId)
                    .description("Job created using the AWS Java SDK")
                    .manifest(jobManifest)
                    .operation(jobOperation)
                    .report(jobReport)
                    .priority(42)
                    .roleArn(iamRoleArn)
                    .clientRequestToken(uuid)
                    .confirmationRequired(false)
                    .build();

                // Create the job asynchronously.
                return getAsyncClient().createJob(jobRequest)
                    .thenApply(CreateJobResponse::jobId);
            })
            .handle((jobId, ex) -> {
                if (ex != null) {
                    Throwable cause = (ex instanceof CompletionException) ? ex.getCause() : ex;
                    if (cause instanceof S3ControlException) {
                        throw new CompletionException(cause);
                    } else {
                        throw new RuntimeException(cause);
                    }
                }
                return jobId;
            });
    }

    /**
     * Retrieves the ETag (Entity Tag) for an object stored in an Amazon S3 bucket.
     *
     * @param bucketName the name of the Amazon S3 bucket where the object is stored
     * @param key        the key (file name) of the object in the Amazon S3 bucket
     * @return the ETag of the object
     */
    public String getETag(String bucketName, String key) {
        S3Client s3Client = S3Client.builder()
            .region(Region.US_EAST_1)
            .build();

        HeadObjectRequest headObjectRequest = HeadObjectRequest.builder()
            .bucket(bucketName)
            .key(key)
            .build();

        HeadObjectResponse headObjectResponse = s3Client.headObject(headObjectRequest);
        return headObjectResponse.eTag();
    }

    /**
     * Asynchronously adds tags to a job in the system.
     *
     * @param jobId     the ID of the job to add tags to
     * @param accountId the account ID associated with the job
     * @return a CompletableFuture that completes when the tagging operation is finished
     */
    public CompletableFuture<Void> putJobTaggingAsync(String jobId, String accountId) {
        S3Tag departmentTag = S3Tag.builder()
            .key("department")
            .value("Marketing")
            .build();

        S3Tag fiscalYearTag = S3Tag.builder()
            .key("FiscalYear")
            .value("2020")
            .build();

        PutJobTaggingRequest putJobTaggingRequest = PutJobTaggingRequest.builder()
            .jobId(jobId)
            .accountId(accountId)
            .tags(departmentTag, fiscalYearTag)
            .build();

        // Use getAsyncClient() rather than the raw field so the client is initialized
        // even if this is the first method called on the instance.
        return getAsyncClient().putJobTagging(putJobTaggingRequest)
            .thenRun(() -> {
                System.out.println("Additional Tags were added to job " + jobId);
            })
            .exceptionally(ex -> {
                System.err.println("Failed to add tags to job: " + ex.getMessage());
                throw new RuntimeException(ex); // Propagate the exception
            });
    }

    // Set up the S3 bucket required for this scenario.
    /**
     * Creates an Amazon S3 bucket with the specified name and waits until it exists.
     * If the bucket cannot be created, the error message is printed and the JVM exits with status 1.
     *
     * @param bucketName the name of the S3 bucket to create
     */
    public void createBucket(String bucketName) {
        try {
            S3Client s3Client = S3Client.builder()
                .region(Region.US_EAST_1)
                .build();

            S3Waiter s3Waiter = s3Client.waiter();
            CreateBucketRequest bucketRequest = CreateBucketRequest.builder()
                .bucket(bucketName)
                .build();

            s3Client.createBucket(bucketRequest);
            HeadBucketRequest bucketRequestWait = HeadBucketRequest.builder()
                .bucket(bucketName)
                .build();

            // Wait until the bucket is created and print out the response.
            WaiterResponse<HeadBucketResponse> waiterResponse = s3Waiter.waitUntilBucketExists(bucketRequestWait);
            waiterResponse.matched().response().ifPresent(System.out::println);
            System.out.println(bucketName + " is ready");
        } catch (S3Exception e) {
            System.err.println(e.awsErrorDetails().errorMessage());
            System.exit(1);
        }
    }

    /**
     * Uploads a file to an Amazon S3 bucket using the asynchronous client and blocks until the upload finishes.
     *
     * @param bucketName the name of the S3 bucket to upload the file to
     * @param fileName   the name of the file to be uploaded
     * @throws RuntimeException if an error occurs during the file upload
     */
    public void populateBucket(String bucketName, String fileName) {
        // Define the path to the directory.
        Path filePath = Paths.get("src/main/resources/batch/", fileName).toAbsolutePath();
        PutObjectRequest putOb = PutObjectRequest.builder()
            .bucket(bucketName)
            .key(fileName)
            .build();

        CompletableFuture<PutObjectResponse> future = getS3AsyncClient().putObject(putOb, AsyncRequestBody.fromFile(filePath));
        future.whenComplete((result, ex) -> {
            if (ex != null) {
                System.err.println("Error uploading file: " + ex.getMessage());
            } else {
                System.out.println("Successfully placed " + fileName + " into bucket " + bucketName);
            }
        }).join();
    }

    // Update the bucketName in CSV.
    public void updateCSV(String newValue) {
        Path csvFilePath = Paths.get("src/main/resources/batch/job-manifest.csv").toAbsolutePath();
        try {
            // Read all lines from the CSV file.
            List<String> lines = Files.readAllLines(csvFilePath);

            // Update the first value in each line.
            List<String> updatedLines = lines.stream()
                .map(line -> {
                    String[] parts = line.split(",");
                    parts[0] = newValue;
                    return String.join(",", parts);
                })
                .collect(Collectors.toList());

            // Write the updated lines back to the CSV file.
            Files.write(csvFilePath, updatedLines);
            System.out.println("CSV file updated successfully.");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Deletes an object from an Amazon S3 bucket asynchronously.
     *
     * @param bucketName The name of the S3 bucket where the object is stored.
     * @param objectName The name of the object to be deleted.
     * @return A {@link CompletableFuture} that completes when the object has been deleted,
     *         or throws a {@link RuntimeException} if an error occurs during the deletion.
     */
    public CompletableFuture<Void> deleteBucketObjects(String bucketName, String objectName) {
        ArrayList<ObjectIdentifier> toDelete = new ArrayList<>();
        toDelete.add(ObjectIdentifier.builder()
            .key(objectName)
            .build());

        DeleteObjectsRequest dor = DeleteObjectsRequest.builder()
            .bucket(bucketName)
            .delete(Delete.builder()
                .objects(toDelete).build())
            .build();

        return getS3AsyncClient().deleteObjects(dor)
            .thenAccept(result -> {
                System.out.println("The object was deleted!");
            })
            .exceptionally(ex -> {
                throw new RuntimeException("Error deleting object: " + ex.getMessage(), ex);
            });
    }

    /**
     * Deletes a folder and all its contents asynchronously from an Amazon S3 bucket.
     * This method blocks until every delete request has completed and returns nothing.
     * Only the first page of results from ListObjectsV2 is processed.
     *
     * @param bucketName the name of the S3 bucket containing the folder to be deleted
     * @throws RuntimeException if any error occurs during the deletion process
     */
    public void deleteBucketFolderAsync(String bucketName) {
        String folderName = "reports/";
        ListObjectsV2Request request = ListObjectsV2Request.builder()
            .bucket(bucketName)
            .prefix(folderName)
            .build();

        CompletableFuture<ListObjectsV2Response> listObjectsFuture = getS3AsyncClient().listObjectsV2(request);
        listObjectsFuture.thenCompose(response -> {
            List<CompletableFuture<DeleteObjectResponse>> deleteFutures = response.contents().stream()
                .map(obj -> {
                    DeleteObjectRequest deleteRequest = DeleteObjectRequest.builder()
                        .bucket(bucketName)
                        .key(obj.key())
                        .build();
                    return getS3AsyncClient().deleteObject(deleteRequest)
                        .thenApply(deleteResponse -> {
                            System.out.println("Deleted object: " + obj.key());
                            return deleteResponse;
                        });
                })
                .collect(Collectors.toList());

            return CompletableFuture.allOf(deleteFutures.toArray(new CompletableFuture[0]))
                .thenCompose(v -> {
                    // Delete the folder.
                    DeleteObjectRequest deleteRequest = DeleteObjectRequest.builder()
                        .bucket(bucketName)
                        .key(folderName)
                        .build();
                    return getS3AsyncClient().deleteObject(deleteRequest)
                        .thenApply(deleteResponse -> {
                            System.out.println("Deleted folder: " + folderName);
                            return deleteResponse;
                        });
                });
        }).join();
    }

    /**
     * Deletes an Amazon S3 bucket.
     *
     * @param bucketName the name of the bucket to delete
     * @return a {@link CompletableFuture} that completes when the bucket has been deleted, or exceptionally if there is an error
     * @throws RuntimeException if there is an error deleting the bucket
     */
    public CompletableFuture<Void> deleteBucket(String bucketName) {
        S3AsyncClient s3Client = getS3AsyncClient();
        return s3Client.deleteBucket(DeleteBucketRequest.builder()
                .bucket(bucketName)
                .build())
            .thenAccept(deleteBucketResponse -> {
                System.out.println(bucketName + " was deleted");
            })
            .exceptionally(ex -> {
                // Handle the exception or rethrow it.
                throw new RuntimeException("Failed to delete bucket: " + bucketName, ex);
            });
    }

    /**
     * Uploads a set of files to an Amazon S3 bucket.
     *
     * @param bucketName the name of the S3 bucket to upload the files to
     * @param fileNames  an array of file names to be uploaded
     * @param actions    an instance of {@link S3BatchActions} that provides the implementation for the necessary S3 operations
     * @throws IOException if there's an error creating the text files or uploading the files to the S3 bucket
     */
    public static void uploadFilesToBucket(String bucketName, String[] fileNames, S3BatchActions actions) throws IOException {
        actions.updateCSV(bucketName);
        createTextFiles(fileNames);
        for (String fileName : fileNames) {
            actions.populateBucket(bucketName, fileName);
        }
        System.out.println("All files are placed in the S3 bucket " + bucketName);
    }

    /**
     * Deletes the specified files from the given S3 bucket.
     * This method waits for all delete requests to finish before it returns.
     *
     * @param bucketName the name of the S3 bucket
     * @param fileNames  an array of file names to be deleted from the bucket
     * @param actions    the S3BatchActions instance to be used for the file deletion
     * @throws IOException if an I/O error occurs during the file deletion
     */
    public void deleteFilesFromBucket(String bucketName, String[] fileNames, S3BatchActions actions) throws IOException {
        // Collect the futures so the summary message is printed only after every request has finished.
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        for (String fileName : fileNames) {
            pending.add(actions.deleteBucketObjects(bucketName, fileName)
                .thenRun(() -> System.out.println("Object deletion completed"))
                .exceptionally(ex -> {
                    System.err.println("Error occurred: " + ex.getMessage());
                    return null;
                }));
        }
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        System.out.println("All files have been deleted from the bucket " + bucketName);
    }

    public static void createTextFiles(String[] fileNames) {
        String currentDirectory = System.getProperty("user.dir");
        // Build the path from segments so it works on any operating system, not only Windows.
        Path path = Paths.get(currentDirectory, "src", "main", "resources", "batch");

        try {
            // Create the directory if it doesn't exist.
            if (Files.notExists(path)) {
                Files.createDirectories(path);
                System.out.println("Created directory: " + path.toString());
            } else {
                System.out.println("Directory already exists: " + path.toString());
            }

            for (String fileName : fileNames) {
                // Check if the file is a .txt file.
                if (fileName.endsWith(".txt")) {
                    // Define the path for the new file.
                    Path filePath = path.resolve(fileName);
                    System.out.println("Attempting to create file: " + filePath.toString());

                    // Create and write content to the new file.
                    Files.write(filePath, "This is a test".getBytes());

                    // Verify the file was created.
                    if (Files.exists(filePath)) {
                        System.out.println("Successfully created file: " + filePath.toString());
                    } else {
                        System.out.println("Failed to create file: " + filePath.toString());
                    }
                }
            }
        } catch (IOException e) {
            System.err.println("An error occurred: " + e.getMessage());
            e.printStackTrace();
        }
    }

    public String getAccountId() {
        StsClient stsClient = StsClient.builder()
            .region(Region.US_EAST_1)
            .build();

        GetCallerIdentityResponse callerIdentityResponse = stsClient.getCallerIdentity();
        return callerIdentityResponse.account();
    }
}
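
The `updateCSV` method in the listing above rewrites the first column of every manifest row directly on disk. The following is a minimal sketch of the same transformation as a pure function, which may be easier to check without a file system or an AWS account. The class name `ManifestColumnSketch` and the method `replaceFirstColumn` are hypothetical and are not part of the AWS sample or the SDK. The sketch assumes a simple manifest in which no field contains a quoted comma.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper for illustration only; not part of the AWS SDK or the sample above.
class ManifestColumnSketch {

    // Returns a new list in which the first comma-separated field of each row is newValue.
    static List<String> replaceFirstColumn(List<String> lines, String newValue) {
        return lines.stream()
            .map(line -> {
                // A limit of -1 keeps trailing empty fields, unlike a plain split(",").
                String[] parts = line.split(",", -1);
                parts[0] = newValue;
                return String.join(",", parts);
            })
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("old-bucket,file1.txt", "old-bucket,file2.txt");
        replaceFirstColumn(rows, "my-new-bucket").forEach(System.out::println);
    }
}
```

Separating the row transformation from the file I/O is one possible design choice; the original method could call such a helper between `Files.readAllLines` and `Files.write`.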