Como invocar DAGs com uma função do Lambda - Amazon Managed Workflows for Apache Airflow

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Como invocar DAGs com uma função do Lambda

O exemplo de código a seguir usa uma função AWS Lambda para obter um token CLI do Apache Airflow e invocar um gráfico acíclico direcionado (DAG) em um ambiente Amazon MWAA.

Version (Versão)

  • É possível usar o exemplo de código nesta página com o Apache Airflow v2 e superior no Python 3.10.

Pré-requisitos

Para usar esse exemplo de código, você deve:

nota

Se a função do Lambda e seu ambiente Amazon MWAA estiverem na mesma VPC, você poderá usar esse código em uma rede privada. Para essa configuração, o perfil de execução da função do Lambda precisa de permissão para chamar a operação da API CreateNetworkInterface do Amazon Elastic Compute Cloud (Amazon EC2). Você pode fornecer essa permissão usando a política AWSLambdaVPCAccessExecutionRole AWS gerenciada.

Permissões

Para usar o exemplo de código nesta página, o perfil de execução do seu ambiente Amazon MWAA precisa de acesso para realizar a ação airflow:CreateCliToken. Você pode fornecer essa permissão usando a política AmazonMWAAAirflowCliAccess AWS gerenciada:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "airflow:CreateCliToken" ], "Resource": "*" } ] }

Para ter mais informações, consulte Política de CLI do Apache Airflow: Acesso ao AmazonMWAA AirflowCli.

Dependências

Exemplo de código

  1. Abra o AWS Lambda console em https://console.aws.amazon.com/lambda/.

  2. Escolha sua função do Lambda na lista Funções.

  3. Na página da função, copie o código a seguir e substitua-o pelos nomes dos seus recursos:

    • YOUR_ENVIRONMENT_NAME: o nome do seu ambiente do Amazon MWAA.

    • YOUR_DAG_NAME: o nome do DAG que você deseja invocar.

    import boto3 import http.client import base64 import ast mwaa_env_name = 'YOUR_ENVIRONMENT_NAME' dag_name = 'YOUR_DAG_NAME' mwaa_cli_command = 'dags trigger' ​ client = boto3.client('mwaa') ​ def lambda_handler(event, context): # get web token mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) conn = http.client.HTTPSConnection(mwaa_cli_token['WebServerHostname']) payload = mwaa_cli_command + " " + dag_name headers = { 'Authorization': 'Bearer ' + mwaa_cli_token['CliToken'], 'Content-Type': 'text/plain' } conn.request("POST", "/aws_mwaa/cli", payload, headers) res = conn.getresponse() data = res.read() dict_str = data.decode("UTF-8") mydata = ast.literal_eval(dict_str) return base64.b64decode(mydata['stdout'])
  4. Escolha Implantar.

  5. Escolha Testar para invocar sua função usando o console Lambda.

  6. Para verificar se seu Lambda invocou seu DAG com sucesso, use o console Amazon MWAA para navegar até a IU do Apache Airflow do seu ambiente e faça o seguinte:

    1. Na página DAGs, localize seu novo DAG de destino na lista de DAGs.

    2. Em Última execução, verifique a data e hora da última execução do DAG. Esse carimbo de data/hora deve ser semelhante ao carimbo de data/hora mais recente para invoke_dag em seu outro ambiente.

    3. Em Tarefas recentes, verifique se a última execução foi bem-sucedida.