使用阿帕奇氣流休息 API - Amazon Managed Workflows for Apache Airflow

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用阿帕奇氣流休息 API

適用於 Apache 氣流 (Amazon MWAA) 的 Amazon 管理工作流程支援使用 Apache 氣流 REST API 直接與 Apache 氣流環境互動,適用於執行 Apache 氣流 v2.4.3 及以上版本的環境。這可讓您以程式設計方式存取和管理 Amazon MWAA 環境,提供標準化的方式來叫用資料協調流程工作流程、管理 DAG,以及監控各種 Apache Airflow 元件 (例如中繼資料資料庫、安裝程式和排程器) 的狀態。

為了直接使用 Apache 氣流 REST API 提供支援,Amazon MWAA 提供水平擴展 Web 伺服器容量的選項,以處理不斷增加的需求,無論是來自 REST API 請求、命令列界面 (CLI) 使用情況,還是更多並行 Apache Airflow 使用者介面 (UI) 使用者。如需 Amazon MWAA 如何擴展網路伺服器的詳細資訊,請參閱。設定 Amazon MWAA 網頁伺服器自動擴展

您可以使用 Apache 氣流 REST API 為您的環境實作下列使用案例:

  • 程式設計存取 — 您現在可以啟動 Apache Airflow DAG 執行、管理資料集,以及擷取各種元件的狀態,例如中繼資料資料庫、觸發程式和排程器,而不需仰賴 Apache Airflow UI 或 CLI。

  • 與外部應用程式和微服務整合 — REST API 支援可讓您建立自訂解決方案,將 Amazon MWAA 環境與其他系統整合。例如,您可以啟動工作流程以回應來自外部系統的事件,例如已完成的資料庫工作或新使用者註冊。

  • 集中式監控 — 您可以建立監控儀表板,以彙總多個 Amazon MWAA 環境中 DAG 的狀態,進而集中監控和管理。

下列主題說明如何取得網頁伺服器存取權杖,然後使用該權杖對 Apache 氣流 REST API 進行 API 呼叫。在下列範例中,您將呼叫 API 以開始新的 DAG 執行。

如需有關 Apache 氣流 REST API 的詳細資訊,請參閱 Apache 氣流 REST API 參考資料。

建立 Web 伺服器工作階段權杖

要創建 Web 服務器訪問令牌,請使用以下 Python 函數。該函數首先調用 Amazon MWAA API 以獲取網絡登錄令牌。然後,將在 60 秒後過期的 Web 登錄令牌交換為 Web 會話令牌,該令牌可讓您訪問 Web 服務器並使用 Apache 氣流 REST API。

注意

工作階段權杖會在 12 小時後到期。

def get_session_info(region, env_name): logging.basicConfig(level=logging.INFO) try: # Initialize MWAA client and request a web login token mwaa = boto3.client('mwaa', region_name=region) response = mwaa.create_web_login_token(Name=env_name) # Extract the web server hostname and login token web_server_host_name = response["WebServerHostname"] web_token = response["WebToken"] # Construct the URL needed for authentication login_url = f"https://{web_server_host_name}/aws_mwaa/login" login_payload = {"token": web_token} # Make a POST request to the MWAA login url using the login payload response = requests.post( login_url, data=login_payload, timeout=10 ) # Check if login was succesfull if response.status_code == 200: # Return the hostname and the session cookie return ( web_server_host_name, response.cookies["session"] ) else: # Log an error logging.error("Failed to log in: HTTP %d", response.status_code) return None except requests.RequestException as e: # Log any exceptions raised during the request to the MWAA login endpoint logging.error("Request failed: %s", str(e)) return None except Exception as e: # Log any other unexpected exceptions logging.error("An unexpected error occurred: %s", str(e)) return None

呼叫阿帕奇氣流休息 API

驗證完成後,您將擁有憑據以開始向 API 端點發送請求。在以下範例中,使用端點/dags/dag_id/dag

def trigger_dag(region, env_name, dag_name): """ Triggers a DAG in a specified MWAA environment using the Airflow REST API. Args: region (str): AWS region where the MWAA environment is hosted. env_name (str): Name of the MWAA environment. dag_name (str): Name of the DAG to trigger. """ logging.info(f"Attempting to trigger DAG {dag_name} in environment {env_name} at region {region}") # Retrieve the web server hostname and session cookie for authentication try: web_server_host_name, session_cookie = get_session_info(region, env_name) if not session_cookie: logging.error("Authentication failed, no session cookie retrieved.") return except Exception as e: logging.error(f"Error retrieving session info: {str(e)}") return # Prepare headers and payload for the request cookies = {"session": session_cookie} json_body = {"conf": {}} # Construct the URL for triggering the DAG url = f"https://{web_server_host_name}/api/v1/dags/{dag_id}/dagRuns" # Send the POST request to trigger the DAG try: response = requests.post(url, cookies=cookies, json=json_body) # Check the response status code to determine if the DAG was triggered successfully if response.status_code == 200: logging.info("DAG triggered successfully.") else: logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}") except requests.RequestException as e: logging.error(f"Request to trigger DAG failed: {str(e)}") if __name__ == "__main__": logging.basicConfig(level=logging.INFO) # Check if the correct number of arguments is provided if len(sys.argv) != 4: logging.error("Incorrect usage. Proper format: python script_name.py {region} {env_name} {dag_name}") sys.exit(1) region = sys.argv[1] env_name = sys.argv[2] dag_name = sys.argv[3] # Trigger the DAG with the provided arguments trigger_dag(region, env_name, dag_name)