Invocación de DAG con una función de Lambda - Amazon Managed Workflows para Apache Airflow

Invocación de DAG con una función de Lambda

El siguiente código de ejemplo utiliza una función AWS Lambda para obtener un token CLI de Apache Airflow e invocar un gráfico acíclico dirigido (DAG) en un entorno de Amazon MWAA.

Versión

Puede usar el código de ejemplo que aparece en esta página con Apache Airflow v2 en Python 3.10 y con Apache Airflow v3 en Python 3.11.

Requisitos previos

Para utilizar este ejemplo de código, debe:

nota

Si la función de Lambda y su entorno Amazon MWAA están en la misma VPC, puede usar este código en una red privada. Para esta configuración, el rol de ejecución de la función de Lambda necesita permiso para llamar a la operación de la API de CreateNetworkInterface de Amazon Elastic Compute Cloud (Amazon EC2). Puede proporcionar este permiso mediante la política AWSLambdaVPCAccessExecutionRole administrada por AWS.

Permisos

Para usar el ejemplo de código de esta página, el rol de ejecución de su entorno Amazon MWAA necesita acceso para realizar la acción airflow:CreateCliToken. Puede proporcionar este permiso mediante la política AmazonMWAAAirflowCliAccess administrada por AWS.

JSON
{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "airflow:CreateCliToken" ], "Resource": "*" } ] }

Para obtener más información, consulta Política de la CLI de Apache Airflow: AmazonMWAAAirflowCliAccess.

Dependencias

Para usar este código de ejemplo con Apache Airflow v2 y versiones posteriores, no se necesitan dependencias adicionales. Use aws-mwaa-docker-images para instalar Apache Airflow.

Ejemplo de código

  1. Abra la consola de AWS Lambda en https://console.aws.amazon.com/lambda/.

  2. Elija su función de Lambda en la lista de funciones.

  3. En la página de funciones, copie el código siguiente y sustituya lo siguiente por los nombres de sus recursos:

    • YOUR_ENVIRONMENT_NAME: el nombre del entorno de Amazon MWAA.

    • YOUR_DAG_NAME: el nombre del DAG que desea invocar.

    import boto3 import http.client import base64 import ast mwaa_env_name = 'YOUR_ENVIRONMENT_NAME' dag_name = 'YOUR_DAG_NAME' mwaa_cli_command = 'dags trigger' ​ client = boto3.client('mwaa') ​ def lambda_handler(event, context): # get web token mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) conn = http.client.HTTPSConnection(mwaa_cli_token['WebServerHostname']) payload = mwaa_cli_command + " " + dag_name headers = { 'Authorization': 'Bearer ' + mwaa_cli_token['CliToken'], 'Content-Type': 'text/plain' } conn.request("POST", "/aws_mwaa/cli", payload, headers) res = conn.getresponse() data = res.read() dict_str = data.decode("UTF-8") mydata = ast.literal_eval(dict_str) return base64.b64decode(mydata['stdout'])
  4. Elija Implementar.

  5. Elija Probar para invocar la función mediante la consola Lambda.

  6. Para comprobar que su Lambda ha invocado correctamente su DAG, utilice la consola Amazon MWAA para navegar hasta la interfaz de usuario de Apache Airflow de su entorno y, a continuación, haga lo siguiente:

    1. En la página DAG, busque su nuevo DAG de destino en la lista de DAG.

    2. En Última ejecución, compruebe la marca de tiempo de la última ejecución del DAG. Esta marca de tiempo debe acercarse lo máximo posible a la última marca de tiempo para invoke_dag en su otro entorno.

    3. En Tareas recientes, compruebe que la última ejecución se haya realizado correctamente.