本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
DAGs使用 Lambda 函數調用
下列程式碼範例使用AWS Lambda函數取得 Apache 氣流CLI權杖,並在 Amazon MWAA 環境中叫用有向無環圖 (DAG)。
版本
-
您可以使用此頁面上的代碼示例與 Python 3.10
中的阿帕奇氣流 V2。
必要條件
若要使用此程式碼範例,您必須:
-
針對您的 Amazon MWAA 環境使用公用網路存取模式。
-
有一個使用最新的 Python 運行時的 Lambda 函數。
注意
如果 Lambda 函數和您的 Amazon MWAA 環境位於相同的環境中VPC,您可以在私有網路上使用此程式碼。對於此組態,Lambda 函數的執行角色需要有權限才能呼叫 Amazon 彈性運算雲端 (AmazonEC2) CreateNetworkInterface API 作業。您可以使用AWSLambdaVPCAccessExecutionRole
許可
若要使用此頁面上的程式碼範例,Amazon MWAA 環境的執行角色需要存取權才能執行airflow:CreateCliToken
動作。您可以使用AmazonMWAAAirflowCliAccess
AWS 受管理的策略提供此權限:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "airflow:CreateCliToken" ], "Resource": "*" } ] }
如需詳細資訊,請參閱阿帕奇氣流CLI政策:A mazonMWAAAirflow CliAccess。
相依性
-
若要將此程式碼範例與 Apache Airflow v2 搭配使用,不需要額外的相依性。該代碼使用 Apache 氣流 v2 基本安裝
在您的環境中。
程式碼範例
-
在開啟 AWS Lambda 主控台https://console.aws.amazon.com/lambda/
。 -
從「函數」清單中選擇您的 Lambda 函數。
-
在功能頁面上,複製下列程式碼,並以資源名稱取代下列程式碼:
-
YOUR_ENVIRONMENT_NAME
— 您的 Amazon MWAA 環境的名稱。 -
YOUR_DAG_NAME
— 您要呼叫的名稱。DAG
import boto3 import http.client import base64 import ast mwaa_env_name = 'YOUR_ENVIRONMENT_NAME' dag_name = 'YOUR_DAG_NAME' mwaa_cli_command = 'dags trigger' client = boto3.client('mwaa') def lambda_handler(event, context): # get web token mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) conn = http.client.HTTPSConnection(mwaa_cli_token['WebServerHostname']) payload = mwaa_cli_command + " " + dag_name headers = { 'Authorization': 'Bearer ' + mwaa_cli_token['CliToken'], 'Content-Type': 'text/plain' } conn.request("POST", "/aws_mwaa/cli", payload, headers) res = conn.getresponse() data = res.read() dict_str = data.decode("UTF-8") mydata = ast.literal_eval(dict_str) return base64.b64decode(mydata['stdout'])
-
-
選擇部署。
-
選擇「測試」以使用 Lambda 主控台叫用您的函數。
-
若要驗證您的 Lambda 是否成功叫用您的DAG,請使用 Amazon MWAA 主控台導覽至您環境的 Apache 氣流使用者介面,然後執行下列動作:
-
在DAGs頁面上,在的清單DAG中找出您的新目標DAGs。
-
在「上次執行」下,檢查最新DAG執行的時間戳記。此時間戳記應與您其他環境
invoke_dag
中的最新時間戳記非常相符。 -
在 [最近的工作] 下,檢查上次執行是否成功。
-