Uso de la API de REST de Apache Airflow - Amazon Managed Workflows para Apache Airflow

Uso de la API de REST de Apache Airflow

Amazon Managed Workflows para Apache Airflow (Amazon MWAA) permite interactuar directamente con los entornos de Apache Airflow mediante la API de REST de Apache Airflow para entornos que ejecutan Apache Airflow v2.4.3 y versiones posteriores. Esto le permite acceder a sus entornos de Amazon MWAA y administrarlos mediante programación, lo que proporciona una forma estandarizada de invocar flujos de trabajo de orquestación de datos, gestionar sus DAG y supervisar el estado de varios componentes de Apache Airflow, como la base de datos de metadatos, el activador y el programador.

Para respaldar la escalabilidad cuando se utiliza la API de REST de Apache Airflow, Amazon MWAA le ofrece la opción de escalar horizontalmente la capacidad del servidor web para gestionar el aumento de la demanda, ya sea de solicitudes de API de REST, uso de la interfaz de la línea de comandos (CLI) o más usuarios simultáneos de la interfaz de usuario (UI) de Apache Airflow. Para obtener más información sobre cómo Amazon MWAA escala los servidores web, consulte Configuración del escalado automático del servidor web de Amazon MWAA.

Puede usar la API de REST de Apache Airflow para implementar los siguientes casos de uso en sus entornos:

  • Acceso mediante programación: ahora puede iniciar las ejecuciones del DAG de Apache Airflow, administrar conjuntos de datos y recuperar el estado de varios componentes, como la base de datos de metadatos, los activadores y los programadores, sin depender de la interfaz de usuario o la CLI de Apache Airflow.

  • Integración con aplicaciones y microservicios externos: la compatibilidad con la API de REST permite crear soluciones personalizadas que integran sus entornos de Amazon MWAA con otros sistemas. Por ejemplo, puede iniciar flujos de trabajo en respuesta a eventos de sistemas externos, como trabajos de base de datos completados o registros de nuevos usuarios.

  • Supervisión centralizada: puede crear paneles de supervisión que agreguen el estado de sus DAG en varios entornos de Amazon MWAA, lo que permite una supervisión y una administración centralizadas.

Para obtener más información sobre la API de REST de Apache Airflow, consulte la referencia de la API de REST de Apache Airflow.

Puede acceder a la API de REST de Apache Airflow mediante credenciales de AWS. Como alternativa, también puede acceder a ella si obtiene un token de acceso al servidor web y, a continuación, lo utiliza para llamarlo.

Los siguientes ejemplos muestran cómo realizar llamadas a la API a la API de REST de Apache Airflow e iniciar una nueva ejecución del DAG:

Concesión de acceso a la API de REST de Apache Airflow: airflow:InvokeRestApi

Para acceder a la API de REST de Apache Airflow mediante la credencial AWS, debe otorgar el permiso de airflow:InvokeRestApi en su política de IAM. En el siguiente ejemplo de política, especifique el rol de Admin, Op, User, Viewer o Public en {airflow-role} para personalizar el nivel de acceso del usuario. Para más información, consulte la sección Roles predeterminados en la guía de referencia de Apache Airflow.

{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowMwaaRestApiAccess", "Effect": "Allow", "Action": "airflow:InvokeRestApi", "Resource": [ "arn:aws:airflow:{your-region}:YOUR_ACCOUNT_ID:role/{your-environment-name}/{airflow-role}" ] } ] }
nota

Mientras configura un servidor web privado, la acción InvokeRestApi no se puede invocar desde fuera de una nube privada virtual (VPC). Puede utilizar la clave aws:SourceVpc para aplicar un control de acceso más detallado para esta operación. Para obtener más información, consulte aws:SourceVpc.

Llamado a la API de REST de Apache Airflow

En el siguiente script de ejemplo, se explica cómo utilizar la API de REST de Apache Airflow para enumerar los DAG disponibles en su entorno y cómo crear una variable de Apache Airflow:

import boto3 env_name = "MyAirflowEnvironment" def list_dags(client): request_params = { "Name": env_name, "Path": "/dags", "Method": "GET", "QueryParameters": { "paused": False } } response = client.invoke_rest_api( **request_params ) print("Airflow REST API response: ", response['RestApiResponse']) def create_variable(client): request_params = { "Name": env_name, "Path": "/variables", "Method": "POST", "Body": { "key": "test-restapi-key", "value": "test-restapi-value", "description": "Test variable created by MWAA InvokeRestApi API", } } response = client.invoke_rest_api( **request_params ) print("Airflow REST API response: ", response['RestApiResponse']) if __name__ == "__main__": client = boto3.client("mwaa") list_dags(client) create_variable(client)

Creación de un token de sesión de servidor web y llamada a la API de REST de Apache Airflow

Para crear un token de acceso al servidor web, utilice la siguiente función de Python. Esta función primero llama a la API de Amazon MWAA para obtener un token de inicio de sesión web. El token de inicio de sesión web, que caduca después de 60 segundos, se cambia luego por un token de sesión web, que le permite acceder al servidor web y utilizar la API de REST de Apache Airflow. Si necesita más de 10 transacciones por segundo (TPS) de capacidad de limitación, puede usar este método para acceder a la API de REST de Apache Airflow.

nota

El token de sesión caduca después de 12 horas.

def get_session_info(region, env_name): logging.basicConfig(level=logging.INFO) try: # Initialize MWAA client and request a web login token mwaa = boto3.client('mwaa', region_name=region) response = mwaa.create_web_login_token(Name=env_name) # Extract the web server hostname and login token web_server_host_name = response["WebServerHostname"] web_token = response["WebToken"] # Construct the URL needed for authentication login_url = f"https://{web_server_host_name}/aws_mwaa/login" login_payload = {"token": web_token} # Make a POST request to the MWAA login url using the login payload response = requests.post( login_url, data=login_payload, timeout=10 ) # Check if login was succesfull if response.status_code == 200: # Return the hostname and the session cookie return ( web_server_host_name, response.cookies["session"] ) else: # Log an error logging.error("Failed to log in: HTTP %d", response.status_code) return None except requests.RequestException as e: # Log any exceptions raised during the request to the MWAA login endpoint logging.error("Request failed: %s", str(e)) return None except Exception as e: # Log any other unexpected exceptions logging.error("An unexpected error occurred: %s", str(e)) return None

Una vez finalizada la autenticación, dispondrá de las credenciales para empezar a enviar solicitudes a los puntos de conexión de la API. En el siguiente ejemplo, use el punto de conexión /dags/dag_id/dag.

def trigger_dag(region, env_name, dag_name): """ Triggers a DAG in a specified MWAA environment using the Airflow REST API. Args: region (str): AWS region where the MWAA environment is hosted. env_name (str): Name of the MWAA environment. dag_name (str): Name of the DAG to trigger. """ logging.info(f"Attempting to trigger DAG {dag_name} in environment {env_name} at region {region}") # Retrieve the web server hostname and session cookie for authentication try: web_server_host_name, session_cookie = get_session_info(region, env_name) if not session_cookie: logging.error("Authentication failed, no session cookie retrieved.") return except Exception as e: logging.error(f"Error retrieving session info: {str(e)}") return # Prepare headers and payload for the request cookies = {"session": session_cookie} json_body = {"conf": {}} # Construct the URL for triggering the DAG url = f"https://{web_server_host_name}/api/v1/dags/{dag_id}/dagRuns" # Send the POST request to trigger the DAG try: response = requests.post(url, cookies=cookies, json=json_body) # Check the response status code to determine if the DAG was triggered successfully if response.status_code == 200: logging.info("DAG triggered successfully.") else: logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}") except requests.RequestException as e: logging.error(f"Request to trigger DAG failed: {str(e)}") if __name__ == "__main__": logging.basicConfig(level=logging.INFO) # Check if the correct number of arguments is provided if len(sys.argv) != 4: logging.error("Incorrect usage. Proper format: python script_name.py {region} {env_name} {dag_name}") sys.exit(1) region = sys.argv[1] env_name = sys.argv[2] dag_name = sys.argv[3] # Trigger the DAG with the provided arguments trigger_dag(region, env_name, dag_name)