DAGs使用 Lambda 函數調用 - Amazon Managed Workflows for Apache Airflow

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

DAGs使用 Lambda 函數調用

下列程式碼範例使用AWS Lambda函數取得 Apache 氣流CLI權杖,並在 Amazon MWAA 環境中叫用有向無環圖 (DAG)。

版本

  • 您可以使用此頁面上的代碼示例與 Python 3.10 中的阿帕奇氣流 V2

必要條件

若要使用此程式碼範例,您必須:

注意

如果 Lambda 函數和您的 Amazon MWAA 環境位於相同的環境中VPC,您可以在私有網路上使用此程式碼。對於此組態,Lambda 函數的執行角色需要有權限才能呼叫 Amazon 彈性運算雲端 (AmazonEC2) CreateNetworkInterface API 作業。您可以使用AWSLambdaVPCAccessExecutionRole AWS 受管理的策略提供此權限。

許可

若要使用此頁面上的程式碼範例,Amazon MWAA 環境的執行角色需要存取權才能執行airflow:CreateCliToken動作。您可以使用AmazonMWAAAirflowCliAccess AWS 受管理的策略提供此權限:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "airflow:CreateCliToken" ], "Resource": "*" } ] }

如需詳細資訊,請參閱阿帕奇氣流CLI政策:A mazonMWAAAirflow CliAccess

相依性

  • 若要將此程式碼範例與 Apache Airflow v2 搭配使用,不需要額外的相依性。該代碼使用 Apache 氣流 v2 基本安裝在您的環境中。

程式碼範例

  1. 在開啟 AWS Lambda 主控台https://console.aws.amazon.com/lambda/

  2. 從「函數」清單中選擇您的 Lambda 數。

  3. 在功能頁面上,複製下列程式碼,並以資源名稱取代下列程式碼:

    • YOUR_ENVIRONMENT_NAME— 您的 Amazon MWAA 環境的名稱。

    • YOUR_DAG_NAME— 您要呼叫的名稱。DAG

    import boto3 import http.client import base64 import ast mwaa_env_name = 'YOUR_ENVIRONMENT_NAME' dag_name = 'YOUR_DAG_NAME' mwaa_cli_command = 'dags trigger' ​ client = boto3.client('mwaa') ​ def lambda_handler(event, context): # get web token mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) conn = http.client.HTTPSConnection(mwaa_cli_token['WebServerHostname']) payload = mwaa_cli_command + " " + dag_name headers = { 'Authorization': 'Bearer ' + mwaa_cli_token['CliToken'], 'Content-Type': 'text/plain' } conn.request("POST", "/aws_mwaa/cli", payload, headers) res = conn.getresponse() data = res.read() dict_str = data.decode("UTF-8") mydata = ast.literal_eval(dict_str) return base64.b64decode(mydata['stdout'])
  4. 選擇部署

  5. 選擇「測試」以使用 Lambda 主控台叫用您的函數。

  6. 若要驗證您的 Lambda 是否成功叫用您的DAG,請使用 Amazon MWAA 主控台導覽至您環境的 Apache 氣流使用者介面,然後執行下列動作:

    1. DAGs頁面上,在的清單DAG中找出您的新目標DAGs。

    2. 在「上次執行」下,檢查最新DAG執行的時間戳記。此時間戳記應與您其他環境invoke_dag中的最新時間戳記非常相符。

    3. 在 [最近的工作] 下,檢查上次執行是否成功。