This documentation is for Version 1 of the AWS CLI only.
We announced the upcoming end-of-support for the AWS CLI version 1. We recommend that you migrate to the AWS CLI version 2. For dates, additional details, and information on how to migrate, see the announcement
Amazon EMR examples using AWS CLI with Bash script
The following code examples show you how to perform actions and implement common scenarios by using the AWS Command Line Interface with Bash script with Amazon EMR.
Scenarios are code examples that show you how to accomplish specific tasks by calling multiple functions within a service or combined with other AWS services.
Each example includes a link to the complete source code, where you can find instructions on how to set up and run the code in context.
Topics
Scenarios
The following code example shows how to:
Create an EC2 key pair
Set up storage and prepare your application
Clean up resources
- AWS CLI with Bash script
-
Note
There's more on GitHub. Find the complete example and learn how to set up and run in the Sample developer tutorials
repository. #!/bin/bash # EMR Getting Started Tutorial Script # This script automates the steps in the Amazon EMR Getting Started tutorial set -euo pipefail # Security: Set strict mode and trap errors trap 'handle_error "Script interrupted or command failed"' ERR # Set up logging with secure permissions LOG_FILE="emr-tutorial.log" touch "$LOG_FILE" chmod 600 "$LOG_FILE" exec > >(tee -a "$LOG_FILE") 2>&1 echo "Starting Amazon EMR Getting Started Tutorial Script" echo "Logging to $LOG_FILE" # Function to handle errors handle_error() { echo "ERROR: $1" echo "Resources created so far:" if [ -n "${BUCKET_NAME:-}" ]; then echo "- S3 Bucket: $BUCKET_NAME"; fi if [ -n "${CLUSTER_ID:-}" ]; then echo "- EMR Cluster: $CLUSTER_ID"; fi echo "Attempting to clean up resources..." cleanup exit 1 } # Function to clean up resources cleanup() { echo "" echo "===========================================" echo "CLEANUP IN PROGRESS" echo "===========================================" echo "Starting cleanup process..." # Terminate EMR cluster if it exists if [ -n "${CLUSTER_ID:-}" ]; then echo "Terminating EMR cluster: $CLUSTER_ID" aws emr terminate-clusters --cluster-ids "$CLUSTER_ID" 2>/dev/null || true echo "Waiting for cluster to terminate..." aws emr wait cluster-terminated --cluster-id "$CLUSTER_ID" 2>/dev/null || true echo "Cluster terminated successfully." fi # Delete S3 bucket and contents if it exists and is not shared if [ -n "${BUCKET_NAME:-}" ] && [ "${BUCKET_IS_SHARED:-false}" != "true" ]; then echo "Deleting S3 bucket contents: $BUCKET_NAME" aws s3 rm "s3://$BUCKET_NAME" --recursive 2>/dev/null || true echo "Deleting S3 bucket: $BUCKET_NAME" aws s3 rb "s3://$BUCKET_NAME" 2>/dev/null || true fi # Remove temporary key pair file if created by this script if [ -f "${KEY_NAME_FILE:-}" ]; then rm -f "$KEY_NAME_FILE" echo "Removed temporary key pair file." fi echo "Cleanup completed." } # Validate AWS CLI is installed and configured if ! command -v aws &> /dev/null; then handle_error "AWS CLI is not installed" fi # Test AWS credentials if ! aws sts get-caller-identity > /dev/null 2>&1; then handle_error "AWS credentials are not configured or invalid" fi # Generate a random identifier for S3 bucket RANDOM_ID=$(openssl rand -hex 6) # Check for shared prereq bucket PREREQ_BUCKET=$(aws cloudformation describe-stacks --stack-name tutorial-prereqs-bucket \ --query 'Stacks[0].Outputs[?OutputKey==`BucketName`].OutputValue' --output text 2>/dev/null || true) if [ -n "$PREREQ_BUCKET" ] && [ "$PREREQ_BUCKET" != "None" ]; then BUCKET_NAME="$PREREQ_BUCKET" BUCKET_IS_SHARED=true echo "Using shared bucket: $BUCKET_NAME" else BUCKET_IS_SHARED=false BUCKET_NAME="emr-${RANDOM_ID}" fi echo "Using bucket name: $BUCKET_NAME" # Create S3 bucket with security best practices echo "Creating S3 bucket: $BUCKET_NAME" aws s3 mb "s3://$BUCKET_NAME" --region "${AWS_REGION:-us-east-1}" || handle_error "Failed to create S3 bucket" # Enable bucket versioning for safety aws s3api put-bucket-versioning --bucket "$BUCKET_NAME" --versioning-configuration Status=Enabled || true # Block public access to bucket aws s3api put-public-access-block --bucket "$BUCKET_NAME" \ --public-access-block-configuration \ "BlockPublicAcls=true,IgnorePublicAcls=true,BlockPublicPolicy=true,RestrictPublicBuckets=true" || true # Enable encryption on bucket aws s3api put-bucket-encryption --bucket "$BUCKET_NAME" \ --server-side-encryption-configuration '{ "Rules": [{ "ApplyServerSideEncryptionByDefault": { "SSEAlgorithm": "AES256" } }] }' || true echo "S3 bucket created successfully with security best practices." # Create PySpark script echo "Creating PySpark script: health_violations.py" cat > health_violations.py << 'EOL' import argparse from pyspark.sql import SparkSession def calculate_red_violations(data_source, output_uri): """ Processes sample food establishment inspection data and queries the data to find the top 10 establishments with the most Red violations from 2006 to 2020. :param data_source: The URI of your food establishment data CSV, such as 's3://emr-tutorial-bucket/food-establishment-data.csv'. :param output_uri: The URI where output is written, such as 's3://emr-tutorial-bucket/restaurant_violation_results'. """ with SparkSession.builder.appName("Calculate Red Health Violations").getOrCreate() as spark: # Load the restaurant violation CSV data if data_source is not None: restaurants_df = spark.read.option("header", "true").csv(data_source) # Create an in-memory DataFrame to query restaurants_df.createOrReplaceTempView("restaurant_violations") # Create a DataFrame of the top 10 restaurants with the most Red violations top_red_violation_restaurants = spark.sql("""SELECT name, count(*) AS total_red_violations FROM restaurant_violations WHERE violation_type = 'RED' GROUP BY name ORDER BY total_red_violations DESC LIMIT 10""") # Write the results to the specified output URI top_red_violation_restaurants.write.option("header", "true").mode("overwrite").csv(output_uri) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument( '--data_source', help="The URI for you CSV restaurant data, like an S3 bucket location.") parser.add_argument( '--output_uri', help="The URI where output is saved, like an S3 bucket location.") args = parser.parse_args() calculate_red_violations(args.data_source, args.output_uri) EOL # Secure the script file chmod 600 health_violations.py # Upload PySpark script to S3 echo "Uploading PySpark script to S3" aws s3 cp health_violations.py "s3://$BUCKET_NAME/" --sse AES256 || handle_error "Failed to upload PySpark script" echo "PySpark script uploaded successfully." # Download and prepare sample data echo "Downloading sample data" curl -sS -o food_establishment_data.zip "https://docs.aws.amazon.com/emr/latest/ManagementGuide/samples/food_establishment_data.zip" || handle_error "Failed to download sample data" # Verify downloaded file if [ ! -f food_establishment_data.zip ] || [ ! -s food_establishment_data.zip ]; then handle_error "Downloaded file is empty or missing" fi unzip -o food_establishment_data.zip || handle_error "Failed to unzip sample data" echo "Sample data downloaded and extracted successfully." # Secure the sample data file chmod 600 food_establishment_data.csv # Upload sample data to S3 echo "Uploading sample data to S3" aws s3 cp food_establishment_data.csv "s3://$BUCKET_NAME/" --sse AES256 || handle_error "Failed to upload sample data" echo "Sample data uploaded successfully." # Clean up sensitive local files rm -f food_establishment_data.zip health_violations.py # Create IAM default roles for EMR echo "Creating IAM default roles for EMR" aws emr create-default-roles 2>/dev/null || true echo "IAM default roles created successfully." # Check if EC2 key pair exists echo "Checking for EC2 key pair" KEY_PAIRS=$(aws ec2 describe-key-pairs --query "KeyPairs[*].KeyName" --output text 2>/dev/null || true) if [ -z "$KEY_PAIRS" ]; then echo "No EC2 key pairs found. Creating a new key pair..." KEY_NAME="emr-tutorial-key-${RANDOM_ID}" KEY_NAME_FILE="${KEY_NAME}.pem" aws ec2 create-key-pair --key-name "$KEY_NAME" --query "KeyMaterial" --output text > "$KEY_NAME_FILE" chmod 400 "$KEY_NAME_FILE" echo "Created new key pair: $KEY_NAME" else # Use the first available key pair KEY_NAME=$(echo "$KEY_PAIRS" | awk '{print $1}') echo "Using existing key pair: $KEY_NAME" fi # Launch EMR cluster with security best practices echo "Launching EMR cluster with Spark" CLUSTER_RESPONSE=$(aws emr create-cluster \ --name "EMR Tutorial Cluster" \ --release-label emr-6.10.0 \ --applications Name=Spark \ --ec2-attributes KeyName="$KEY_NAME" \ --instance-type m5.xlarge \ --instance-count 3 \ --use-default-roles \ --log-uri "s3://$BUCKET_NAME/logs/" \ --ebs-root-volume-size 100 \ --security-configuration "EMR-Tutorial-SecurityConfig" 2>/dev/null || true) # Check for errors in the response if echo "$CLUSTER_RESPONSE" | grep -i "error" > /dev/null; then handle_error "Failed to create EMR cluster: $CLUSTER_RESPONSE" fi # Extract cluster ID using jq if available, otherwise use alternative parsing if command -v jq &> /dev/null; then CLUSTER_ID=$(echo "$CLUSTER_RESPONSE" | jq -r '.ClusterId // empty') else CLUSTER_ID=$(echo "$CLUSTER_RESPONSE" | grep -o '"ClusterId"[[:space:]]*:[[:space:]]*"[^"]*' | grep -o 'j-[A-Z0-9]*' || true) fi if [ -z "$CLUSTER_ID" ] || [ "$CLUSTER_ID" == "null" ]; then handle_error "Failed to extract cluster ID from response: $CLUSTER_RESPONSE" fi echo "EMR cluster created with ID: $CLUSTER_ID" # Wait for cluster to be ready echo "Waiting for cluster to be ready (this may take several minutes)..." aws emr wait cluster-running --cluster-id "$CLUSTER_ID" || handle_error "Cluster failed to reach running state" # Check if cluster is in WAITING state CLUSTER_STATE=$(aws emr describe-cluster --cluster-id "$CLUSTER_ID" --query "Cluster.Status.State" --output text) if [ "$CLUSTER_STATE" != "WAITING" ]; then echo "Waiting for cluster to reach WAITING state..." WAIT_COUNT=0 MAX_WAIT=120 while [ "$CLUSTER_STATE" != "WAITING" ]; do if [ $WAIT_COUNT -ge $MAX_WAIT ]; then handle_error "Cluster did not reach WAITING state within timeout period" fi sleep 30 CLUSTER_STATE=$(aws emr describe-cluster --cluster-id "$CLUSTER_ID" --query "Cluster.Status.State" --output text) echo "Current cluster state: $CLUSTER_STATE" # Check for error states if [[ "$CLUSTER_STATE" == "TERMINATED_WITH_ERRORS" || "$CLUSTER_STATE" == "TERMINATED" ]]; then handle_error "Cluster entered error state: $CLUSTER_STATE" fi WAIT_COUNT=$((WAIT_COUNT + 1)) done fi echo "Cluster is now in WAITING state and ready to accept work." # Submit Spark application as a step echo "Submitting Spark application as a step" STEP_RESPONSE=$(aws emr add-steps \ --cluster-id "$CLUSTER_ID" \ --steps Type=Spark,Name="Health Violations Analysis",ActionOnFailure=CONTINUE,Args=["s3://$BUCKET_NAME/health_violations.py","--data_source","s3://$BUCKET_NAME/food_establishment_data.csv","--output_uri","s3://$BUCKET_NAME/results/"]) # Check for errors in the response if echo "$STEP_RESPONSE" | grep -i "error" > /dev/null; then handle_error "Failed to submit step: $STEP_RESPONSE" fi # Extract step ID using appropriate method if command -v jq &> /dev/null; then STEP_ID=$(echo "$STEP_RESPONSE" | jq -r '.StepIds[0] // empty') else STEP_ID=$(echo "$STEP_RESPONSE" | grep -o 's-[A-Z0-9]*' | head -1 || true) fi if [ -z "$STEP_ID" ] || [ "$STEP_ID" == "null" ]; then echo "Full step response: $STEP_RESPONSE" handle_error "Failed to extract valid step ID from response" fi echo "Step submitted with ID: $STEP_ID" # Wait for step to complete with timeout echo "Waiting for step to complete (this may take several minutes)..." aws emr wait step-complete --cluster-id "$CLUSTER_ID" --step-id "$STEP_ID" || handle_error "Step failed to complete" # Check step status STEP_STATE=$(aws emr describe-step --cluster-id "$CLUSTER_ID" --step-id "$STEP_ID" --query "Step.Status.State" --output text) if [ "$STEP_STATE" != "COMPLETED" ]; then handle_error "Step did not complete successfully. Final state: $STEP_STATE" fi echo "Step completed successfully." # View results echo "Listing output files in S3" aws s3 ls "s3://$BUCKET_NAME/results/" || handle_error "Failed to list output files" # Download results echo "Downloading results file" RESULT_FILE=$(aws s3 ls "s3://$BUCKET_NAME/results/" | grep -o "part-[0-9]*\.csv" | head -1 || true) if [ -z "$RESULT_FILE" ]; then echo "No result file found with pattern 'part-[0-9]*.csv'. Trying to find any CSV file..." RESULT_FILE=$(aws s3 ls "s3://$BUCKET_NAME/results/" | grep -o "part-.*\.csv" | head -1 || true) if [ -z "$RESULT_FILE" ]; then echo "Listing all files in results directory:" aws s3 ls "s3://$BUCKET_NAME/results/" handle_error "No result file found in the output directory" fi fi aws s3 cp "s3://$BUCKET_NAME/results/$RESULT_FILE" ./results.csv --sse AES256 || handle_error "Failed to download results file" chmod 600 ./results.csv echo "Results downloaded to results.csv" echo "Top 10 establishments with the most red violations:" cat results.csv # Display SSH connection information echo "" echo "To connect to the cluster via SSH, use the following command:" echo "aws emr ssh --cluster-id $CLUSTER_ID --key-pair-file ${KEY_NAME_FILE:-./${KEY_NAME}.pem}" # Display summary of created resources echo "" echo "===========================================" echo "RESOURCES CREATED" echo "===========================================" echo "- S3 Bucket: $BUCKET_NAME" echo "- EMR Cluster: $CLUSTER_ID" echo "- Results file: results.csv" if [ -f "${KEY_NAME_FILE:-}" ]; then echo "- EC2 Key Pair: $KEY_NAME (saved to ${KEY_NAME_FILE})" fi # Perform cleanup cleanup echo "Script completed successfully."-
For API details, see the following topics in AWS CLI Command Reference.
-