Learn the basics of Amazon S3 Control with an AWS SDK
The following code examples show how to learn core operations for Amazon S3 Control.
- Java
-
- SDK for Java 2.x
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.
Learn core operations.
package com.example.s3.batch;

import software.amazon.awssdk.services.s3.model.S3Exception;
import java.io.IOException;
import java.util.Map;
import java.util.Scanner;
import java.util.UUID;
import java.util.concurrent.CompletionException;

/**
 * Interactive console scenario that walks through the core Amazon S3 Batch Operations calls:
 * create a job, update its priority, optionally cancel it, describe it, read/put/delete its
 * tags, and finally (optionally) clean up the resources that were created.
 *
 * <p>All AWS calls are delegated to {@link S3BatchActions}; the IAM role comes from a
 * CloudFormation stack deployed through {@code CloudFormationHelper}.
 */
public class S3BatchScenario {

    public static final String DASHES = new String(new char[80]).replace("\0", "-");
    private static final String STACK_NAME = "MyS3Stack";

    /**
     * Runs the scenario end to end, pausing for user input between steps.
     *
     * @param args unused
     * @throws IOException if preparing or uploading the local sample files fails
     */
    public static void main(String[] args) throws IOException {
        S3BatchActions actions = new S3BatchActions();
        String accountId = actions.getAccountId();
        String uuid = java.util.UUID.randomUUID().toString();
        Scanner scanner = new Scanner(System.in);

        System.out.println(DASHES);
        System.out.println("Welcome to the Amazon S3 Batch basics scenario.");
        System.out.println("""
            S3 Batch operations enables efficient and cost-effective processing of large-scale
            data stored in Amazon S3. It automatically scales resources to handle varying workloads
            without the need for manual intervention.

            One of the key features of S3 Batch is its ability to perform tagging operations on objects stored in
            S3 buckets. Users can leverage S3 Batch to apply, update, or remove tags on thousands or millions of
            objects in a single operation, streamlining the management and organization of their data.

            This can be particularly useful for tasks such as cost allocation, lifecycle management, or
            metadata-driven workflows, where consistent and accurate tagging is essential.
            S3 Batch's scalability and serverless nature make it an ideal solution for organizations with
            growing data volumes and complex data management requirements.

            This Java program walks you through Amazon S3 Batch operations.

            Let's get started...
            """);
        waitForInputToContinue(scanner);

        // Use CloudFormation to stand up the resource required for this scenario.
        System.out.println("Use CloudFormation to stand up the resource required for this scenario.");
        CloudFormationHelper.deployCloudFormationStack(STACK_NAME);
        Map<String, String> stackOutputs = CloudFormationHelper.getStackOutputs(STACK_NAME);
        String iamRoleArn = stackOutputs.get("S3BatchRoleArn");
        System.out.println(DASHES);

        System.out.println(DASHES);
        System.out.println("Setup the required bucket for this scenario.");
        waitForInputToContinue(scanner);
        String bucketName = "amzn-s3-demo-bucket-" + UUID.randomUUID(); // Change bucket name.
        actions.createBucket(bucketName);
        // Despite its name, this holds the bucket ARN; createS3JobAsync parses the name out of it.
        String reportBucketName = "arn:aws:s3:::" + bucketName;
        String manifestLocation = "arn:aws:s3:::" + bucketName + "/job-manifest.csv";
        System.out.println("Populate the bucket with the required files.");
        String[] fileNames = {"job-manifest.csv", "object-key-1.txt", "object-key-2.txt", "object-key-3.txt", "object-key-4.txt"};
        actions.uploadFilesToBucket(bucketName, fileNames, actions);
        waitForInputToContinue(scanner);
        System.out.println(DASHES);

        System.out.println(DASHES);
        System.out.println("1. Create a S3 Batch Job");
        System.out.println("This job tags all objects listed in the manifest file with tags");
        waitForInputToContinue(scanner);
        String jobId;
        try {
            jobId = actions.createS3JobAsync(accountId, iamRoleArn, manifestLocation, reportBucketName, uuid).join();
            System.out.println("The Job id is " + jobId);
        } catch (S3Exception e) {
            // Fixed: the message previously said "SSM error", which is unrelated to this scenario.
            System.err.println("S3 error: " + e.getMessage());
            return;
        } catch (RuntimeException e) {
            // join() reports asynchronous failures as CompletionException, which lands here.
            System.err.println("Unexpected error: " + e.getMessage());
            return;
        }

        waitForInputToContinue(scanner);
        System.out.println(DASHES);

        System.out.println(DASHES);
        System.out.println("2. Update an existing S3 Batch Operations job's priority");
        System.out.println("""
            In this step, we modify the job priority value. The higher the number, the higher the priority.
            So, a job with a priority of `30` would have a higher priority than a job with a priority of `20`.
            This is a common way to represent the priority of a task or job, with higher numbers indicating
            a higher priority.

            Ensure that the job status allows for priority updates. Jobs in certain states (e.g., Cancelled, Failed, or Completed)
            cannot have their priorities updated. Only jobs in the Active or Suspended state typically allow priority updates.
            """);

        try {
            actions.updateJobPriorityAsync(jobId, accountId)
                .exceptionally(ex -> {
                    System.err.println("Update job priority failed: " + ex.getMessage());
                    return null;
                })
                .join();
        } catch (CompletionException ex) {
            System.err.println("Failed to update job priority: " + ex.getMessage());
        }
        waitForInputToContinue(scanner);
        System.out.println(DASHES);

        System.out.println(DASHES);
        System.out.println("3. Cancel the S3 Batch job");
        System.out.print("Do you want to cancel the Batch job? (y/n): ");
        String cancelAns = scanner.nextLine();
        if (cancelAns != null && cancelAns.trim().equalsIgnoreCase("y")) {
            try {
                actions.cancelJobAsync(jobId, accountId)
                    .exceptionally(ex -> {
                        System.err.println("Cancel job failed: " + ex.getMessage());
                        return null;
                    })
                    .join();
            } catch (CompletionException ex) {
                System.err.println("Failed to cancel job: " + ex.getMessage());
            }
        } else {
            System.out.println("Job " + jobId + " was not canceled.");
        }
        System.out.println(DASHES);

        System.out.println(DASHES);
        System.out.println("4. Describe the job that was just created");
        waitForInputToContinue(scanner);
        try {
            actions.describeJobAsync(jobId, accountId)
                .exceptionally(ex -> {
                    System.err.println("Describe job failed: " + ex.getMessage());
                    return null;
                })
                .join();
        } catch (CompletionException ex) {
            System.err.println("Failed to describe job: " + ex.getMessage());
        }
        System.out.println(DASHES);

        System.out.println(DASHES);
        System.out.println("5. Describe the tags associated with the job");
        waitForInputToContinue(scanner);
        try {
            actions.getJobTagsAsync(jobId, accountId)
                .exceptionally(ex -> {
                    System.err.println("Get job tags failed: " + ex.getMessage());
                    return null;
                })
                .join();
        } catch (CompletionException ex) {
            System.err.println("Failed to get job tags: " + ex.getMessage());
        }
        System.out.println(DASHES);

        System.out.println(DASHES);
        System.out.println("6. Update Batch Job Tags");
        waitForInputToContinue(scanner);
        try {
            actions.putJobTaggingAsync(jobId, accountId)
                .exceptionally(ex -> {
                    System.err.println("Put job tagging failed: " + ex.getMessage());
                    return null;
                })
                .join();
        } catch (CompletionException ex) {
            System.err.println("Failed to put job tagging: " + ex.getMessage());
        }
        System.out.println(DASHES);

        System.out.println(DASHES);
        System.out.println("7. Delete the Amazon S3 Batch job tagging.");
        System.out.print("Do you want to delete Batch job tagging? (y/n)");
        String delAns = scanner.nextLine();
        if (delAns != null && delAns.trim().equalsIgnoreCase("y")) {
            try {
                actions.deleteBatchJobTagsAsync(jobId, accountId)
                    .exceptionally(ex -> {
                        System.err.println("Delete batch job tags failed: " + ex.getMessage());
                        return null;
                    })
                    .join();
            } catch (CompletionException ex) {
                System.err.println("Failed to delete batch job tags: " + ex.getMessage());
            }
        } else {
            System.out.println("Tagging was not deleted.");
        }
        System.out.println(DASHES);

        System.out.println(DASHES);
        System.out.print("Do you want to delete the AWS resources used in this scenario? (y/n)");
        String delResAns = scanner.nextLine();
        if (delResAns != null && delResAns.trim().equalsIgnoreCase("y")) {
            actions.deleteFilesFromBucket(bucketName, fileNames, actions);
            actions.deleteBucketFolderAsync(bucketName);
            // Fixed: wait for the bucket deletion to finish. Previously the future was left
            // dangling, so the stack teardown and the final "completed" message could run
            // (and the JVM could exit) before the bucket was actually deleted.
            actions.deleteBucket(bucketName)
                .thenRun(() -> System.out.println("Bucket deletion completed"))
                .exceptionally(ex -> {
                    System.err.println("Error occurred: " + ex.getMessage());
                    return null;
                })
                .join();
            CloudFormationHelper.destroyCloudFormationStack(STACK_NAME);
        } else {
            System.out.println("The AWS resources were not deleted.");
        }
        System.out.println("The Amazon S3 Batch scenario has successfully completed.");
        System.out.println(DASHES);
    }

    /**
     * Blocks until the user types {@code c} (case-insensitive, surrounding whitespace ignored)
     * followed by ENTER. Any other input prints a hint and prompts again.
     *
     * @param scanner the scanner used to read the user's input
     */
    private static void waitForInputToContinue(Scanner scanner) {
        while (true) {
            System.out.println();
            System.out.println("Enter 'c' followed by <ENTER> to continue:");
            String input = scanner.nextLine();
            if (input.trim().equalsIgnoreCase("c")) {
                System.out.println("Continuing with the program...");
                System.out.println();
                break;
            } else {
                // Handle invalid input.
                System.out.println("Invalid input. Please try again.");
            }
        }
    }
}
An action class that wraps operations.
public class S3BatchActions { private static S3ControlAsyncClient asyncClient; private static S3AsyncClient s3AsyncClient ; /** * Retrieves the asynchronous S3 Control client instance. * <p> * This method creates and returns a singleton instance of the {@link S3ControlAsyncClient}. If the instance * has not been created yet, it will be initialized with the following configuration: * <ul> * <li>Maximum concurrency: 100</li> * <li>Connection timeout: 60 seconds</li> * <li>Read timeout: 60 seconds</li> * <li>Write timeout: 60 seconds</li> * <li>API call timeout: 2 minutes</li> * <li>API call attempt timeout: 90 seconds</li> * <li>Retry policy: 3 retries</li> * <li>Region: US_EAST_1</li> * <li>Credentials provider: {@link EnvironmentVariableCredentialsProvider}</li> * </ul> * * @return the asynchronous S3 Control client instance */ private static S3ControlAsyncClient getAsyncClient() { if (asyncClient == null) { SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder() .maxConcurrency(100) .connectionTimeout(Duration.ofSeconds(60)) .readTimeout(Duration.ofSeconds(60)) .writeTimeout(Duration.ofSeconds(60)) .build(); ClientOverrideConfiguration overrideConfig = ClientOverrideConfiguration.builder() .apiCallTimeout(Duration.ofMinutes(2)) .apiCallAttemptTimeout(Duration.ofSeconds(90)) .retryPolicy(RetryPolicy.builder() .numRetries(3) .build()) .build(); asyncClient = S3ControlAsyncClient.builder() .region(Region.US_EAST_1) .httpClient(httpClient) .overrideConfiguration(overrideConfig) .build(); } return asyncClient; } private static S3AsyncClient getS3AsyncClient() { if (asyncClient == null) { SdkAsyncHttpClient httpClient = NettyNioAsyncHttpClient.builder() .maxConcurrency(100) .connectionTimeout(Duration.ofSeconds(60)) .readTimeout(Duration.ofSeconds(60)) .writeTimeout(Duration.ofSeconds(60)) .build(); ClientOverrideConfiguration overrideConfig = ClientOverrideConfiguration.builder() .apiCallTimeout(Duration.ofMinutes(2)) 
.apiCallAttemptTimeout(Duration.ofSeconds(90)) .retryStrategy(RetryMode.STANDARD) .build(); s3AsyncClient = S3AsyncClient.builder() .region(Region.US_EAST_1) .httpClient(httpClient) .overrideConfiguration(overrideConfig) .build(); } return s3AsyncClient; } /** * Cancels a job asynchronously. * * @param jobId The ID of the job to be canceled. * @param accountId The ID of the account associated with the job. * @return A {@link CompletableFuture} that completes when the job status has been updated to "CANCELLED". * If an error occurs during the update, the returned future will complete exceptionally. */ public CompletableFuture<Void> cancelJobAsync(String jobId, String accountId) { UpdateJobStatusRequest updateJobStatusRequest = UpdateJobStatusRequest.builder() .accountId(accountId) .jobId(jobId) .requestedJobStatus(String.valueOf(JobStatus.CANCELLED)) .build(); return asyncClient.updateJobStatus(updateJobStatusRequest) .thenAccept(updateJobStatusResponse -> { System.out.println("Job status updated to: " + updateJobStatusResponse.status()); }) .exceptionally(ex -> { System.err.println("Failed to cancel job: " + ex.getMessage()); throw new RuntimeException(ex); // Propagate the exception }); } /** * Updates the priority of a job asynchronously. 
* * @param jobId the ID of the job to update * @param accountId the ID of the account associated with the job * @return a {@link CompletableFuture} that represents the asynchronous operation, which completes when the job priority has been updated or an error has occurred */ public CompletableFuture<Void> updateJobPriorityAsync(String jobId, String accountId) { UpdateJobPriorityRequest priorityRequest = UpdateJobPriorityRequest.builder() .accountId(accountId) .jobId(jobId) .priority(60) .build(); CompletableFuture<Void> future = new CompletableFuture<>(); getAsyncClient().updateJobPriority(priorityRequest) .thenAccept(response -> { System.out.println("The job priority was updated"); future.complete(null); // Complete the CompletableFuture on successful execution }) .exceptionally(ex -> { System.err.println("Failed to update job priority: " + ex.getMessage()); future.completeExceptionally(ex); // Complete the CompletableFuture exceptionally on error return null; // Return null to handle the exception }); return future; } /** * Asynchronously retrieves the tags associated with a specific job in an AWS account. 
* * @param jobId the ID of the job for which to retrieve the tags * @param accountId the ID of the AWS account associated with the job * @return a {@link CompletableFuture} that completes when the job tags have been retrieved, or with an exception if the operation fails * @throws RuntimeException if an error occurs while retrieving the job tags */ public CompletableFuture<Void> getJobTagsAsync(String jobId, String accountId) { GetJobTaggingRequest request = GetJobTaggingRequest.builder() .jobId(jobId) .accountId(accountId) .build(); return asyncClient.getJobTagging(request) .thenAccept(response -> { List<S3Tag> tags = response.tags(); if (tags.isEmpty()) { System.out.println("No tags found for job ID: " + jobId); } else { for (S3Tag tag : tags) { System.out.println("Tag key is: " + tag.key()); System.out.println("Tag value is: " + tag.value()); } } }) .exceptionally(ex -> { System.err.println("Failed to get job tags: " + ex.getMessage()); throw new RuntimeException(ex); // Propagate the exception }); } /** * Asynchronously deletes the tags associated with a specific batch job. * * @param jobId The ID of the batch job whose tags should be deleted. * @param accountId The ID of the account associated with the batch job. * @return A CompletableFuture that completes when the job tags have been successfully deleted, or an exception is thrown if the deletion fails. */ public CompletableFuture<Void> deleteBatchJobTagsAsync(String jobId, String accountId) { DeleteJobTaggingRequest jobTaggingRequest = DeleteJobTaggingRequest.builder() .accountId(accountId) .jobId(jobId) .build(); return asyncClient.deleteJobTagging(jobTaggingRequest) .thenAccept(response -> { System.out.println("You have successfully deleted " + jobId + " tagging."); }) .exceptionally(ex -> { System.err.println("Failed to delete job tags: " + ex.getMessage()); throw new RuntimeException(ex); }); } /** * Asynchronously describes the specified job. 
* * @param jobId the ID of the job to describe * @param accountId the ID of the AWS account associated with the job * @return a {@link CompletableFuture} that completes when the job description is available * @throws RuntimeException if an error occurs while describing the job */ public CompletableFuture<Void> describeJobAsync(String jobId, String accountId) { DescribeJobRequest jobRequest = DescribeJobRequest.builder() .jobId(jobId) .accountId(accountId) .build(); return getAsyncClient().describeJob(jobRequest) .thenAccept(response -> { System.out.println("Job ID: " + response.job().jobId()); System.out.println("Description: " + response.job().description()); System.out.println("Status: " + response.job().statusAsString()); System.out.println("Role ARN: " + response.job().roleArn()); System.out.println("Priority: " + response.job().priority()); System.out.println("Progress Summary: " + response.job().progressSummary()); // Print out details about the job manifest. JobManifest manifest = response.job().manifest(); System.out.println("Manifest Location: " + manifest.location().objectArn()); System.out.println("Manifest ETag: " + manifest.location().eTag()); // Print out details about the job operation. JobOperation operation = response.job().operation(); if (operation.s3PutObjectTagging() != null) { System.out.println("Operation: S3 Put Object Tagging"); System.out.println("Tag Set: " + operation.s3PutObjectTagging().tagSet()); } // Print out details about the job report. 
JobReport report = response.job().report(); System.out.println("Report Bucket: " + report.bucket()); System.out.println("Report Prefix: " + report.prefix()); System.out.println("Report Format: " + report.format()); System.out.println("Report Enabled: " + report.enabled()); System.out.println("Report Scope: " + report.reportScopeAsString()); }) .exceptionally(ex -> { System.err.println("Failed to describe job: " + ex.getMessage()); throw new RuntimeException(ex); }); } /** * Creates an asynchronous S3 job using the AWS Java SDK. * * @param accountId the AWS account ID associated with the job * @param iamRoleArn the ARN of the IAM role to be used for the job * @param manifestLocation the location of the job manifest file in S3 * @param reportBucketName the name of the S3 bucket to store the job report * @param uuid a unique identifier for the job * @return a CompletableFuture that represents the asynchronous creation of the S3 job. * The CompletableFuture will return the job ID if the job is created successfully, * or throw an exception if there is an error. 
*/ public CompletableFuture<String> createS3JobAsync(String accountId, String iamRoleArn, String manifestLocation, String reportBucketName, String uuid) { String[] bucketName = new String[]{""}; String[] parts = reportBucketName.split(":::"); if (parts.length > 1) { bucketName[0] = parts[1]; } else { System.out.println("The input string does not contain the expected format."); } return CompletableFuture.supplyAsync(() -> getETag(bucketName[0], "job-manifest.csv")) .thenCompose(eTag -> { ArrayList<S3Tag> tagSet = new ArrayList<>(); S3Tag s3Tag = S3Tag.builder() .key("keyOne") .value("ValueOne") .build(); S3Tag s3Tag2 = S3Tag.builder() .key("keyTwo") .value("ValueTwo") .build(); tagSet.add(s3Tag); tagSet.add(s3Tag2); S3SetObjectTaggingOperation objectTaggingOperation = S3SetObjectTaggingOperation.builder() .tagSet(tagSet) .build(); JobOperation jobOperation = JobOperation.builder() .s3PutObjectTagging(objectTaggingOperation) .build(); JobManifestLocation jobManifestLocation = JobManifestLocation.builder() .objectArn(manifestLocation) .eTag(eTag) .build(); JobManifestSpec manifestSpec = JobManifestSpec.builder() .fieldsWithStrings("Bucket", "Key") .format("S3BatchOperations_CSV_20180820") .build(); JobManifest jobManifest = JobManifest.builder() .spec(manifestSpec) .location(jobManifestLocation) .build(); JobReport jobReport = JobReport.builder() .bucket(reportBucketName) .prefix("reports") .format("Report_CSV_20180820") .enabled(true) .reportScope("AllTasks") .build(); CreateJobRequest jobRequest = CreateJobRequest.builder() .accountId(accountId) .description("Job created using the AWS Java SDK") .manifest(jobManifest) .operation(jobOperation) .report(jobReport) .priority(42) .roleArn(iamRoleArn) .clientRequestToken(uuid) .confirmationRequired(false) .build(); // Create the job asynchronously. 
return getAsyncClient().createJob(jobRequest) .thenApply(CreateJobResponse::jobId); }) .handle((jobId, ex) -> { if (ex != null) { Throwable cause = (ex instanceof CompletionException) ? ex.getCause() : ex; if (cause instanceof S3ControlException) { throw new CompletionException(cause); } else { throw new RuntimeException(cause); } } return jobId; }); } /** * Retrieves the ETag (Entity Tag) for an object stored in an Amazon S3 bucket. * * @param bucketName the name of the Amazon S3 bucket where the object is stored * @param key the key (file name) of the object in the Amazon S3 bucket * @return the ETag of the object */ public String getETag(String bucketName, String key) { S3Client s3Client = S3Client.builder() .region(Region.US_EAST_1) .build(); HeadObjectRequest headObjectRequest = HeadObjectRequest.builder() .bucket(bucketName) .key(key) .build(); HeadObjectResponse headObjectResponse = s3Client.headObject(headObjectRequest); return headObjectResponse.eTag(); } /** * Asynchronously adds tags to a job in the system. * * @param jobId the ID of the job to add tags to * @param accountId the account ID associated with the job * @return a CompletableFuture that completes when the tagging operation is finished */ public CompletableFuture<Void> putJobTaggingAsync(String jobId, String accountId) { S3Tag departmentTag = S3Tag.builder() .key("department") .value("Marketing") .build(); S3Tag fiscalYearTag = S3Tag.builder() .key("FiscalYear") .value("2020") .build(); PutJobTaggingRequest putJobTaggingRequest = PutJobTaggingRequest.builder() .jobId(jobId) .accountId(accountId) .tags(departmentTag, fiscalYearTag) .build(); return asyncClient.putJobTagging(putJobTaggingRequest) .thenRun(() -> { System.out.println("Additional Tags were added to job " + jobId); }) .exceptionally(ex -> { System.err.println("Failed to add tags to job: " + ex.getMessage()); throw new RuntimeException(ex); // Propagate the exception }); } // Setup the S3 bucket required for this scenario. 
/** * Creates an Amazon S3 bucket with the specified name. * * @param bucketName the name of the S3 bucket to create * @throws S3Exception if there is an error creating the bucket */ public void createBucket(String bucketName) { try { S3Client s3Client = S3Client.builder() .region(Region.US_EAST_1) .build(); S3Waiter s3Waiter = s3Client.waiter(); CreateBucketRequest bucketRequest = CreateBucketRequest.builder() .bucket(bucketName) .build(); s3Client.createBucket(bucketRequest); HeadBucketRequest bucketRequestWait = HeadBucketRequest.builder() .bucket(bucketName) .build(); // Wait until the bucket is created and print out the response. WaiterResponse<HeadBucketResponse> waiterResponse = s3Waiter.waitUntilBucketExists(bucketRequestWait); waiterResponse.matched().response().ifPresent(System.out::println); System.out.println(bucketName + " is ready"); } catch (S3Exception e) { System.err.println(e.awsErrorDetails().errorMessage()); System.exit(1); } } /** * Uploads a file to an Amazon S3 bucket asynchronously. * * @param bucketName the name of the S3 bucket to upload the file to * @param fileName the name of the file to be uploaded * @throws RuntimeException if an error occurs during the file upload */ public void populateBucket(String bucketName, String fileName) { // Define the path to the directory. Path filePath = Paths.get("src/main/resources/batch/", fileName).toAbsolutePath(); PutObjectRequest putOb = PutObjectRequest.builder() .bucket(bucketName) .key(fileName) .build(); CompletableFuture<PutObjectResponse> future = getS3AsyncClient().putObject(putOb, AsyncRequestBody.fromFile(filePath)); future.whenComplete((result, ex) -> { if (ex != null) { System.err.println("Error uploading file: " + ex.getMessage()); } else { System.out.println("Successfully placed " + fileName + " into bucket " + bucketName); } }).join(); } // Update the bucketName in CSV. 
public void updateCSV(String newValue) { Path csvFilePath = Paths.get("src/main/resources/batch/job-manifest.csv").toAbsolutePath(); try { // Read all lines from the CSV file. List<String> lines = Files.readAllLines(csvFilePath); // Update the first value in each line. List<String> updatedLines = lines.stream() .map(line -> { String[] parts = line.split(","); parts[0] = newValue; return String.join(",", parts); }) .collect(Collectors.toList()); // Write the updated lines back to the CSV file Files.write(csvFilePath, updatedLines); System.out.println("CSV file updated successfully."); } catch (Exception e) { e.printStackTrace(); } } /** * Deletes an object from an Amazon S3 bucket asynchronously. * * @param bucketName The name of the S3 bucket where the object is stored. * @param objectName The name of the object to be deleted. * @return A {@link CompletableFuture} that completes when the object has been deleted, * or throws a {@link RuntimeException} if an error occurs during the deletion. */ public CompletableFuture<Void> deleteBucketObjects(String bucketName, String objectName) { ArrayList<ObjectIdentifier> toDelete = new ArrayList<>(); toDelete.add(ObjectIdentifier.builder() .key(objectName) .build()); DeleteObjectsRequest dor = DeleteObjectsRequest.builder() .bucket(bucketName) .delete(Delete.builder() .objects(toDelete).build()) .build(); return getS3AsyncClient().deleteObjects(dor) .thenAccept(result -> { System.out.println("The object was deleted!"); }) .exceptionally(ex -> { throw new RuntimeException("Error deleting object: " + ex.getMessage(), ex); }); } /** * Deletes a folder and all its contents asynchronously from an Amazon S3 bucket. 
* * @param bucketName the name of the S3 bucket containing the folder to be deleted * @return a {@link CompletableFuture} that completes when the folder and its contents have been deleted * @throws RuntimeException if any error occurs during the deletion process */ public void deleteBucketFolderAsync(String bucketName) { String folderName = "reports/"; ListObjectsV2Request request = ListObjectsV2Request.builder() .bucket(bucketName) .prefix(folderName) .build(); CompletableFuture<ListObjectsV2Response> listObjectsFuture = getS3AsyncClient().listObjectsV2(request); listObjectsFuture.thenCompose(response -> { List<CompletableFuture<DeleteObjectResponse>> deleteFutures = response.contents().stream() .map(obj -> { DeleteObjectRequest deleteRequest = DeleteObjectRequest.builder() .bucket(bucketName) .key(obj.key()) .build(); return getS3AsyncClient().deleteObject(deleteRequest) .thenApply(deleteResponse -> { System.out.println("Deleted object: " + obj.key()); return deleteResponse; }); }) .collect(Collectors.toList()); return CompletableFuture.allOf(deleteFutures.toArray(new CompletableFuture[0])) .thenCompose(v -> { // Delete the folder. DeleteObjectRequest deleteRequest = DeleteObjectRequest.builder() .bucket(bucketName) .key(folderName) .build(); return getS3AsyncClient().deleteObject(deleteRequest) .thenApply(deleteResponse -> { System.out.println("Deleted folder: " + folderName); return deleteResponse; }); }); }).join(); } /** * Deletes an Amazon S3 bucket. 
* * @param bucketName the name of the bucket to delete * @return a {@link CompletableFuture} that completes when the bucket has been deleted, or exceptionally if there is an error * @throws RuntimeException if there is an error deleting the bucket */ public CompletableFuture<Void> deleteBucket(String bucketName) { S3AsyncClient s3Client = getS3AsyncClient(); return s3Client.deleteBucket(DeleteBucketRequest.builder() .bucket(bucketName) .build()) .thenAccept(deleteBucketResponse -> { System.out.println(bucketName + " was deleted"); }) .exceptionally(ex -> { // Handle the exception or rethrow it. throw new RuntimeException("Failed to delete bucket: " + bucketName, ex); }); } /** * Uploads a set of files to an Amazon S3 bucket. * * @param bucketName the name of the S3 bucket to upload the files to * @param fileNames an array of file names to be uploaded * @param actions an instance of {@link S3BatchActions} that provides the implementation for the necessary S3 operations * @throws IOException if there's an error creating the text files or uploading the files to the S3 bucket */ public static void uploadFilesToBucket(String bucketName, String[] fileNames, S3BatchActions actions) throws IOException { actions.updateCSV(bucketName); createTextFiles(fileNames); for (String fileName : fileNames) { actions.populateBucket(bucketName, fileName); } System.out.println("All files are placed in the S3 bucket " + bucketName); } /** * Deletes the specified files from the given S3 bucket. 
* * @param bucketName the name of the S3 bucket * @param fileNames an array of file names to be deleted from the bucket * @param actions the S3BatchActions instance to be used for the file deletion * @throws IOException if an I/O error occurs during the file deletion */ public void deleteFilesFromBucket(String bucketName, String[] fileNames, S3BatchActions actions) throws IOException { for (String fileName : fileNames) { actions.deleteBucketObjects(bucketName, fileName) .thenRun(() -> System.out.println("Object deletion completed")) .exceptionally(ex -> { System.err.println("Error occurred: " + ex.getMessage()); return null; }); } System.out.println("All files have been deleted from the bucket " + bucketName); } public static void createTextFiles(String[] fileNames) { String currentDirectory = System.getProperty("user.dir"); String directoryPath = currentDirectory + "\\src\\main\\resources\\batch"; Path path = Paths.get(directoryPath); try { // Create the directory if it doesn't exist. if (Files.notExists(path)) { Files.createDirectories(path); System.out.println("Created directory: " + path.toString()); } else { System.out.println("Directory already exists: " + path.toString()); } for (String fileName : fileNames) { // Check if the file is a .txt file. if (fileName.endsWith(".txt")) { // Define the path for the new file. Path filePath = path.resolve(fileName); System.out.println("Attempting to create file: " + filePath.toString()); // Create and write content to the new file. Files.write(filePath, "This is a test".getBytes()); // Verify the file was created. 
if (Files.exists(filePath)) { System.out.println("Successfully created file: " + filePath.toString()); } else { System.out.println("Failed to create file: " + filePath.toString()); } } } } catch (IOException e) { System.err.println("An error occurred: " + e.getMessage()); e.printStackTrace(); } } public String getAccountId() { StsClient stsClient = StsClient.builder() .region(Region.US_EAST_1) .build(); GetCallerIdentityResponse callerIdentityResponse = stsClient.getCallerIdentity(); return callerIdentityResponse.account(); } }
-
For API details, see the following topics in AWS SDK for Java 2.x API Reference.
-
- Python
-
- SDK for Python (Boto3)
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run it in the AWS Code Examples Repository.
Learn S3 Batch Basics Scenario.
class S3BatchWrapper:
    """Wrapper class for managing S3 Batch Operations."""

    def __init__(self, s3_client: Any, s3control_client: Any, sts_client: Any) -> None:
        """
        Initializes the S3BatchWrapper with AWS service clients.

        Args:
            s3_client: A Boto3 Amazon S3 client.
            s3control_client: A Boto3 Amazon S3 Control client.
            sts_client: A Boto3 AWS STS client.
        """
        self.s3_client = s3_client
        self.s3control_client = s3control_client
        self.sts_client = sts_client
        # Get region from the client for bucket creation logic
        self.region_name = self.s3_client.meta.region_name

    def get_account_id(self) -> str:
        """
        Get AWS account ID.

        Returns:
            str: AWS account ID
        """
        return self.sts_client.get_caller_identity()["Account"]

    def create_bucket(self, bucket_name: str) -> None:
        """
        Create an S3 bucket.

        Args:
            bucket_name (str): Name of the bucket to create

        Raises:
            ClientError: If bucket creation fails
        """
        try:
            # A location constraint is only sent for regions other than us-east-1.
            if self.region_name and self.region_name != 'us-east-1':
                self.s3_client.create_bucket(
                    Bucket=bucket_name,
                    CreateBucketConfiguration={
                        'LocationConstraint': self.region_name
                    }
                )
            else:
                self.s3_client.create_bucket(Bucket=bucket_name)
            print(f"Created bucket: {bucket_name}")
        except ClientError as e:
            print(f"Error creating bucket: {e}")
            raise

    def upload_files_to_bucket(self, bucket_name: str, file_names: List[str]) -> str:
        """
        Upload files to S3 bucket including manifest file.

        Every name except "job-manifest.csv" is uploaded with generated content;
        the manifest itself is then built from those names and uploaded last.

        Args:
            bucket_name (str): Target bucket name
            file_names (list): List of file names to upload

        Returns:
            str: ETag of the manifest file

        Raises:
            ClientError: If file upload fails
        """
        try:
            data_files = [name for name in file_names if name != "job-manifest.csv"]

            for file_name in data_files:
                content = f"Content for {file_name}"
                self.s3_client.put_object(
                    Bucket=bucket_name,
                    Key=file_name,
                    Body=content.encode('utf-8')
                )
                print(f"Uploaded {file_name} to {bucket_name}")

            # One "bucket,key" row per uploaded object.
            manifest_content = "".join(
                f"{bucket_name},{file_name}\n" for file_name in data_files
            )

            manifest_response = self.s3_client.put_object(
                Bucket=bucket_name,
                Key="job-manifest.csv",
                Body=manifest_content.encode('utf-8')
            )
            print(f"Uploaded manifest file to {bucket_name}")
            print(f"Manifest content:\n{manifest_content}")
            return manifest_response['ETag'].strip('"')
        except ClientError as e:
            print(f"Error uploading files: {e}")
            raise

    def create_s3_batch_job(self, account_id: str, role_arn: str,
                            manifest_location: str, report_bucket_name: str) -> str:
        """
        Create an S3 batch operation job.

        The job tags every object in the manifest with BatchTag=BatchValue and is
        created with ConfirmationRequired=True.

        Args:
            account_id (str): AWS account ID
            role_arn (str): IAM role ARN for batch operations
            manifest_location (str): ARN of the manifest object
                (for example arn:aws:s3:::bucket/job-manifest.csv)
            report_bucket_name (str): Bucket for job reports

        Returns:
            str: Job ID

        Raises:
            ValueError: If manifest_location does not contain a bucket name
            ClientError: If job creation fails
        """
        # Split "arn:aws:s3:::<bucket>/<key>" into bucket and key.
        _, _, resource = manifest_location.partition(':::')
        bucket_name, _, manifest_key = resource.partition('/')
        if not bucket_name:
            raise ValueError(
                f"Manifest location is not a valid S3 object ARN: {manifest_location}"
            )
        # Fall back to the default manifest name when the ARN carries no key.
        manifest_key = manifest_key or 'job-manifest.csv'

        try:
            # The ETag sent with the job must match the manifest object in S3.
            manifest_obj = self.s3_client.head_object(
                Bucket=bucket_name,
                Key=manifest_key
            )
            etag = manifest_obj['ETag'].strip('"')

            response = self.s3control_client.create_job(
                AccountId=account_id,
                Operation={
                    'S3PutObjectTagging': {
                        'TagSet': [
                            {
                                'Key': 'BatchTag',
                                'Value': 'BatchValue'
                            },
                        ]
                    }
                },
                Report={
                    'Bucket': report_bucket_name,
                    'Format': 'Report_CSV_20180820',
                    'Enabled': True,
                    'Prefix': 'batch-op-reports',
                    'ReportScope': 'AllTasks'
                },
                Manifest={
                    'Spec': {
                        'Format': 'S3BatchOperations_CSV_20180820',
                        'Fields': ['Bucket', 'Key']
                    },
                    'Location': {
                        'ObjectArn': manifest_location,
                        'ETag': etag
                    }
                },
                Priority=10,
                RoleArn=role_arn,
                Description='Batch job for tagging objects',
                ConfirmationRequired=True
            )
            job_id = response['JobId']
            print(f"The Job id is {job_id}")
            return job_id
        except ClientError as e:
            print(f"Error creating batch job: {e}")
            # The service message lives under response['Error']; read it defensively
            # so that reporting the failure can never mask the original error.
            detail = e.response.get('Error', {}).get('Message')
            if detail:
                print(f"Detailed error message: {detail}")
            raise

    def check_job_failure_reasons(self, job_id: str, account_id: str) -> List[Dict[str, Any]]:
        """
        Check for any failure reasons of a batch job.

        Args:
            job_id (str): ID of the batch job
            account_id (str): AWS account ID

        Returns:
            list: List of failure reasons (empty when there are none)

        Raises:
            ClientError: If checking job failure reasons fails
        """
        try:
            response = self.s3control_client.describe_job(
                AccountId=account_id,
                JobId=job_id
            )
            reasons = response['Job'].get('FailureReasons', [])
            for reason in reasons:
                print(f"- {reason}")
            return reasons
        except ClientError as e:
            print(f"Error checking job failure reasons: {e}")
            raise

    def wait_for_job_ready(self, job_id: str, account_id: str, desired_status: str = 'Ready',
                           max_attempts: int = 60, poll_interval: float = 20) -> bool:
        """
        Wait for a job to reach the desired status.

        A job found in the 'Suspended' state is also treated as success. Polling
        stops early with False when the job is 'Active', 'Failed', 'Cancelled' or
        'Complete' and that is not the desired status.

        Args:
            job_id (str): ID of the batch job
            account_id (str): AWS account ID
            desired_status (str): Target status to wait for
            max_attempts (int): Maximum number of status polls
            poll_interval (float): Seconds to sleep between polls

        Returns:
            bool: True if desired status is reached, False otherwise

        Raises:
            ClientError: If checking job status fails
        """
        print(f"Waiting for job to become {desired_status}...")

        for attempt in range(max_attempts):
            try:
                response = self.s3control_client.describe_job(
                    AccountId=account_id,
                    JobId=job_id
                )
                job = response['Job']
                current_status = job['Status']
                print(f"Current job status: {current_status}")

                if current_status == desired_status:
                    return True

                if current_status == 'Suspended':
                    print("Job is in Suspended state, can proceed with activation")
                    return True

                if current_status in ['Active', 'Failed', 'Cancelled', 'Complete']:
                    print(f"Job is in {current_status} state, cannot reach {desired_status} status")
                    if 'FailureReasons' in job:
                        print("Failure reasons:")
                        for reason in job['FailureReasons']:
                            print(f"- {reason}")
                    return False
            except ClientError as e:
                print(f"Error checking job status: {e}")
                raise

            # There is nothing left to wait for after the final poll.
            if attempt < max_attempts - 1:
                time.sleep(poll_interval)

        print(f"Timeout waiting for job to become {desired_status}")
        return False

    def update_job_priority(self, job_id: str, account_id: str, priority: int = 60) -> None:
        """
        Update the priority of a batch job and start it.

        This is best effort: a ClientError is reported and swallowed so the
        calling scenario can continue.

        Args:
            job_id (str): ID of the batch job
            account_id (str): AWS account ID
            priority (int): New priority for the job
        """
        try:
            response = self.s3control_client.describe_job(
                AccountId=account_id,
                JobId=job_id
            )
            current_status = response['Job']['Status']
            print(f"Current job status: {current_status}")

            if current_status in ['Ready', 'Suspended']:
                self.s3control_client.update_job_priority(
                    AccountId=account_id,
                    JobId=job_id,
                    Priority=priority
                )
                print("The job priority was updated")

                try:
                    self.s3control_client.update_job_status(
                        AccountId=account_id,
                        JobId=job_id,
                        RequestedJobStatus='Ready'
                    )
                    print("Job activated successfully")
                except ClientError as activation_error:
                    print(f"Note: Could not activate job automatically: {activation_error}")
                    print("Job priority was updated successfully. Job may need manual activation in the console.")
            elif current_status in ['Active', 'Completing', 'Complete']:
                print(f"Job is in '{current_status}' state - priority cannot be updated")
                if current_status == 'Completing':
                    print("Job is finishing up and will complete soon.")
                elif current_status == 'Complete':
                    print("Job has already completed successfully.")
                else:
                    print("Job is currently running.")
            else:
                print(f"Job is in '{current_status}' state - priority update not allowed")
        except ClientError as e:
            print(f"Error updating job priority: {e}")
            print("Continuing with the scenario...")

    def cancel_job(self, job_id: str, account_id: str) -> None:
        """
        Cancel an S3 batch job.

        Only jobs in 'Ready', 'Suspended' or 'Active' state are cancelled; for
        any other state a message is printed and nothing is changed.

        Args:
            job_id (str): ID of the batch job
            account_id (str): AWS account ID

        Raises:
            ClientError: If describing or cancelling the job fails
        """
        try:
            response = self.s3control_client.describe_job(
                AccountId=account_id,
                JobId=job_id
            )
            current_status = response['Job']['Status']
            print(f"Current job status: {current_status}")

            if current_status in ['Ready', 'Suspended', 'Active']:
                self.s3control_client.update_job_status(
                    AccountId=account_id,
                    JobId=job_id,
                    RequestedJobStatus='Cancelled'
                )
                print(f"Job {job_id} was successfully canceled.")
            elif current_status in ['Completing', 'Complete']:
                print(f"Job is in '{current_status}' state - cannot be cancelled")
                if current_status == 'Completing':
                    print("Job is finishing up and will complete soon.")
                elif current_status == 'Complete':
                    print("Job has already completed successfully.")
            else:
                print(f"Job is in '{current_status}' state - cancel not allowed")
        except ClientError as e:
            print(f"Error canceling job: {e}")
            raise

    def describe_job_details(self, job_id: str, account_id: str) -> None:
        """
        Describe detailed information about a batch job.

        Args:
            job_id (str): ID of the batch job
            account_id (str): AWS account ID

        Raises:
            ClientError: If describing the job fails
        """
        try:
            response = self.s3control_client.describe_job(
                AccountId=account_id,
                JobId=job_id
            )
            job = response['Job']
            print(f"Job ID: {job['JobId']}")
            print(f"Description: {job.get('Description', 'N/A')}")
            print(f"Status: {job['Status']}")
            print(f"Role ARN: {job['RoleArn']}")
            print(f"Priority: {job['Priority']}")
            if 'ProgressSummary' in job:
                progress = job['ProgressSummary']
                print(f"Progress Summary: Total={progress.get('TotalNumberOfTasks', 0)}, "
                      f"Succeeded={progress.get('NumberOfTasksSucceeded', 0)}, "
                      f"Failed={progress.get('NumberOfTasksFailed', 0)}")
        except ClientError as e:
            print(f"Error describing job: {e}")
            raise

    def get_job_tags(self, job_id: str, account_id: str) -> None:
        """
        Get tags associated with a batch job.

        Args:
            job_id (str): ID of the batch job
            account_id (str): AWS account ID

        Raises:
            ClientError: If reading the job tags fails
        """
        try:
            response = self.s3control_client.get_job_tagging(
                AccountId=account_id,
                JobId=job_id
            )
            tags = response.get('Tags', [])
            if tags:
                print(f"Tags for job {job_id}:")
                for tag in tags:
                    print(f"  {tag['Key']}: {tag['Value']}")
            else:
                print(f"No tags found for job ID: {job_id}")
        except ClientError as e:
            print(f"Error getting job tags: {e}")
            raise

    def put_job_tags(self, job_id: str, account_id: str) -> None:
        """
        Add tags to a batch job.

        Args:
            job_id (str): ID of the batch job
            account_id (str): AWS account ID

        Raises:
            ClientError: If adding the job tags fails
        """
        try:
            self.s3control_client.put_job_tagging(
                AccountId=account_id,
                JobId=job_id,
                Tags=[
                    {'Key': 'Environment', 'Value': 'Development'},
                    {'Key': 'Team', 'Value': 'DataProcessing'}
                ]
            )
            print(f"Additional tags were added to job {job_id}")
        except ClientError as e:
            print(f"Error adding job tags: {e}")
            raise

    def list_jobs(self, account_id: str) -> None:
        """
        List all batch jobs for the account.

        Result pages are followed through NextToken until the listing is exhausted.

        Args:
            account_id (str): AWS account ID

        Raises:
            ClientError: If listing the jobs fails
        """
        request: Dict[str, Any] = {
            'AccountId': account_id,
            'JobStatuses': [
                'Active', 'Cancelled', 'Cancelling', 'Complete', 'Completing',
                'Failed', 'Failing', 'New', 'Paused', 'Pausing', 'Preparing',
                'Ready', 'Suspended'
            ]
        }
        try:
            while True:
                response = self.s3control_client.list_jobs(**request)
                for job in response.get('Jobs', []):
                    print(f"The job id is {job['JobId']}")
                    print(f"The job priority is {job['Priority']}")

                next_token = response.get('NextToken')
                if not next_token:
                    break
                request['NextToken'] = next_token
        except ClientError as e:
            print(f"Error listing jobs: {e}")
            raise

    def delete_job_tags(self, job_id: str, account_id: str) -> None:
        """
        Delete all tags from a batch job.

        Args:
            job_id (str): ID of the batch job
            account_id (str): AWS account ID

        Raises:
            ClientError: If deleting the job tags fails
        """
        try:
            self.s3control_client.delete_job_tagging(
                AccountId=account_id,
                JobId=job_id
            )
            print(f"You have successfully deleted {job_id} tagging.")
        except ClientError as e:
            print(f"Error deleting job tags: {e}")
            raise

    def cleanup_resources(self, bucket_name: str, file_names: List[str]) -> None:
        """
        Clean up all resources created during the scenario.

        Deletes the named files, then every object under the 'batch-op-reports/'
        prefix (following all result pages), and finally the bucket itself.

        Args:
            bucket_name (str): Name of the bucket to clean up
            file_names (list): List of files to delete

        Raises:
            ClientError: If cleanup fails
        """
        try:
            for file_name in file_names:
                self.s3_client.delete_object(Bucket=bucket_name, Key=file_name)
                print(f"Deleted {file_name}")

            list_kwargs: Dict[str, Any] = {
                'Bucket': bucket_name,
                'Prefix': 'batch-op-reports/'
            }
            while True:
                response = self.s3_client.list_objects_v2(**list_kwargs)
                for obj in response.get('Contents', []):
                    self.s3_client.delete_object(
                        Bucket=bucket_name,
                        Key=obj['Key']
                    )
                    print(f"Deleted {obj['Key']}")

                # A single listing call returns one page only; keep going while truncated.
                if not response.get('IsTruncated'):
                    break
                list_kwargs['ContinuationToken'] = response['NextContinuationToken']

            self.s3_client.delete_bucket(Bucket=bucket_name)
            print(f"Deleted bucket {bucket_name}")
        except ClientError as e:
            print(f"Error in cleanup: {e}")
            raise
-
For API details, see the following topics in AWS SDK for Python (Boto3) API Reference.
-
For a complete list of AWS SDK developer guides and code examples, see Developing with Amazon S3 using the AWS SDKs. This topic also includes information about getting started and details about previous SDK versions.