
Submitting jobs from Airflow

An official Airflow 2.0 operator is currently under development. In the meantime, you can install a preview version of the operator from the EMR Serverless Samples GitHub repository.

You can use EmrServerlessCreateApplicationOperator to create a Spark or Hive application. You can also use EmrServerlessStartJobOperator to start one or more jobs with your new application.
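If you already have an EMR Serverless application, you can use EmrServerlessStartJobOperator on its own to submit a single job run. The following minimal sketch assumes a placeholder application ID, job runtime role ARN, and DAG name; substitute your own values. A complete end-to-end example appears later in this topic.

from datetime import datetime

from airflow import DAG
from emr_serverless.operators.emr import EmrServerlessStartJobOperator

with DAG(
    dag_id="example_emr_serverless_single_job",
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    # Placeholder values -- replace with your application ID and job runtime role.
    start_job = EmrServerlessStartJobOperator(
        task_id="start_spark_job",
        application_id="00example1234",
        execution_role_arn="arn:aws:iam::account-id:role/emr_serverless_default_role",
        job_driver={
            "sparkSubmit": {
                "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi.py",
            }
        },
    )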

To use the operator, add the following line to your requirements.txt file and update your MWAA environment to use the new file.

emr_serverless @ https://github.com/aws-samples/emr-serverless-samples/releases/download/v1.0.1/mwaa_plugin.zip
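Amazon MWAA reads requirements.txt from your environment's versioned S3 bucket, so after you add this line you must upload the updated file and point the environment at the new object version. You can do this from the MWAA console, or with a short script like the sketch below, which uses boto3 and assumes placeholder bucket and environment names.

import boto3

# Placeholder names -- replace with your environment's S3 bucket and MWAA environment name.
S3_BUCKET = "my-mwaa-bucket"
MWAA_ENV_NAME = "my-mwaa-environment"

s3 = boto3.client("s3")
mwaa = boto3.client("mwaa")

# Upload the updated requirements.txt to the environment's S3 bucket.
# The bucket must have versioning enabled, as MWAA requires.
s3.upload_file("requirements.txt", S3_BUCKET, "requirements.txt")

# Point the environment at the new object version so MWAA reinstalls dependencies.
version = s3.head_object(Bucket=S3_BUCKET, Key="requirements.txt")["VersionId"]
mwaa.update_environment(
    Name=MWAA_ENV_NAME,
    RequirementsS3Path="requirements.txt",
    RequirementsS3ObjectVersion=version,
)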

The following abbreviated example shows how to create an application, run multiple Spark jobs, and then delete the application. A full example is available in the EMR Serverless Samples GitHub repository.

from datetime import datetime

from airflow import DAG
from emr_serverless.operators.emr import (
    EmrServerlessCreateApplicationOperator,
    EmrServerlessStartJobOperator,
    EmrServerlessDeleteApplicationOperator,
)

# Replace these with your correct values
JOB_ROLE_ARN = "arn:aws:iam::account-id:role/emr_serverless_default_role"
S3_LOGS_BUCKET = "DOC-EXAMPLE-BUCKET"

DEFAULT_MONITORING_CONFIG = {
    "monitoringConfiguration": {
        "s3MonitoringConfiguration": {"logUri": f"s3://{S3_LOGS_BUCKET}/logs/"}
    },
}

with DAG(
    dag_id="example_endtoend_emr_serverless_job",
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    tags=["example"],
    catchup=False,
) as dag:
    create_app = EmrServerlessCreateApplicationOperator(
        task_id="create_spark_app",
        job_type="SPARK",
        release_label="emr-6.7.0",
        config={"name": "airflow-test"},
    )

    # The new application ID is passed to downstream tasks via XCom.
    application_id = create_app.output

    job1 = EmrServerlessStartJobOperator(
        task_id="start_job_1",
        application_id=application_id,
        execution_role_arn=JOB_ROLE_ARN,
        job_driver={
            "sparkSubmit": {
                "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi_fail.py",
            }
        },
        configuration_overrides=DEFAULT_MONITORING_CONFIG,
    )

    job2 = EmrServerlessStartJobOperator(
        task_id="start_job_2",
        application_id=application_id,
        execution_role_arn=JOB_ROLE_ARN,
        job_driver={
            "sparkSubmit": {
                "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi.py",
            }
        },
        configuration_overrides=DEFAULT_MONITORING_CONFIG,
    )

    # trigger_rule="all_done" ensures the application is deleted even if a job fails.
    delete_app = EmrServerlessDeleteApplicationOperator(
        task_id="delete_app",
        application_id=application_id,
        trigger_rule="all_done",
    )

    (create_app >> [job1, job2] >> delete_app)