기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
Apache Airflow API REST 사용
Amazon Managed Workflows for Apache Airflow(Amazon MWAA)는 Apache Airflow v2.4.3 이상을 실행하는 환경에 대해 Apache Airflow API REST를 사용하여 Apache Airflow 환경과 직접 상호 작용을 지원합니다. 이를 통해 프로그래밍 방식으로 Amazon MWAA 환경에 액세스하고 관리할 수 있으며, 데이터 오케스트레이션 워크플로를 호출하고, DAGs를 관리하고, 메타데이터 데이터베이스, 트리거 및 스케줄러와 같은 다양한 Apache Airflow 구성 요소의 상태를 모니터링하는 표준화된 방법을 제공합니다.
Apache Airflow API REST를 사용하는 동안 확장성을 지원하기 위해 Amazon MWAA는 API REST 요청, 명령줄 인터페이스(CLI) 사용 또는 더 많은 동시 Apache Airflow 사용자 인터페이스(UI) 사용자 등 수요 증가를 처리하기 위해 웹 서버 용량을 수평적으로 확장할 수 있는 옵션을 제공합니다. Amazon MWAA가 웹 서버를 확장하는 방법에 대한 자세한 내용은 섹션을 참조하세요Amazon MWAA 웹 서버 오토 스케일링 구성.
Apache Airflow API REST Word를 사용하여 환경에 다음과 같은 사용 사례를 구현할 수 있습니다.
-
프로그래밍 방식 액세스 - 이제 Apache Airflow UI 또는 DAG에 의존하지 않고 Apache Airflow CLI 실행을 시작하고, 데이터 세트를 관리하고, 메타데이터 데이터베이스, 트리거 및 스케줄러와 같은 다양한 구성 요소의 상태를 검색할 수 있습니다.
-
외부 애플리케이션 및 마이크로서비스와 통합 - API REST 지원을 통해 Amazon MWAA 환경을 다른 시스템과 통합하는 사용자 지정 솔루션을 구축할 수 있습니다. 예를 들어 완료된 데이터베이스 작업 또는 새 사용자 등록과 같은 외부 시스템의 이벤트에 대한 응답으로 워크플로를 시작할 수 있습니다.
-
중앙 집중식 모니터링 - 여러 Amazon DAGs MWAA 환경에서 Word 상태를 집계하는 모니터링 대시보드를 구축하여 중앙 집중식 모니터링 및 관리를 활성화할 수 있습니다.
Apache Airflow API REST에 대한 자세한 내용은 Apache Airflow API REST 참조를
를 사용하면 자격 증명을 사용하여 AWS Apache Airflow API REST에 액세스할 InvokeRestApi
수 있습니다. 또는 웹 서버 액세스 토큰을 얻은 다음, 이 토큰을 사용해서 직접적으로 호출하여 액세스할 수도 있습니다.
참고
-
InvokeRestApi
작업을 사용하는 동안 “사용하도록 환경 업데이트InvokeRestApi
” 메시지에 오류가 발생하면 Amazon MWAA 환경을 업데이트해야 함을 나타냅니다. 이 오류는 Amazon MWAA 환경이InvokeRestApi
기능과 관련된 최신 변경 사항과 호환되지 않을 때 발생합니다. 이 문제를 해결하려면 Amazon MWAA 환경을 업데이트하여InvokeRestApi
기능에 필요한 변경 사항을 통합합니다. -
InvokeRestApi
작업의 기본 제한 시간은 10초입니다. 이 10초 이내에 작업이 완료되지 않으면 작업이 자동으로 종료되고 오류가 발생합니다. 오류가 발생하지 않도록 API REST 호출이이 제한 시간 내에 완료되도록 설계되었는지 확인합니다.
다음 예제에서는 Apache Airflow API API REST를 호출하고 새 DAG 실행을 시작하는 방법을 보여줍니다.
주제
Apache Airflow API REST에 대한 액세스 권한 부여: airflow:InvokeRestApi
AWS 자격 증명을 사용하여 Apache Airflow API REST에 액세스하려면 IAM 정책에 airflow:InvokeRestApi
권한을 부여해야 합니다. 다음 정책 샘플에서, {airflow-role}
에서 Admin
, Op
, User
, Viewer
또는 Public
역할을 지정하여 사용자 액세스 수준을 지정합니다. 자세한 내용은 Apache Airflow 참조 가이드의 기본 역할
{ "Version": "2012-10-17", "Statement": [ { "Sid": "AllowMwaaRestApiAccess", "Effect": "Allow", "Action": "airflow:InvokeRestApi", "Resource": [ "arn:aws:airflow:{your-region}:YOUR_ACCOUNT_ID:role/{your-environment-name}/{airflow-role}" ] } ] }
참고
프라이빗 웹 서버를 구성하는 동안 Virtual Private Cloud(VPC) 외부에서 InvokeRestApi
작업을 호출할 수 없습니다. aws:SourceVpc
키를 사용하여 이 작업에 대해 더 세분화된 액세스 제어를 적용할 수 있습니다. 자세한 내용은 aws:SourceVpc를 참조하세요.
Apache Airflow REST API호출
다음 샘플 스크립트에서는 Apache Airflow API REST를 사용하여 환경에서 사용 가능한 DAGs를 나열하는 방법과 Apache Airflow 변수를 생성하는 방법을 다룹니다.
import boto3 env_name = "MyAirflowEnvironment" def list_dags(client): request_params = { "Name": env_name, "Path": "/dags", "Method": "GET", "QueryParameters": { "paused": False } } response = client.invoke_rest_api( **request_params ) print("Airflow REST API response: ", response['RestApiResponse']) def create_variable(client): request_params = { "Name": env_name, "Path": "/variables", "Method": "POST", "Body": { "key": "test-restapi-key", "value": "test-restapi-value", "description": "Test variable created by MWAA InvokeRestApi API", } } response = client.invoke_rest_api( **request_params ) print("Airflow REST API response: ", response['RestApiResponse']) if __name__ == "__main__": client = boto3.client("mwaa") list_dags(client) create_variable(client)
웹 서버 세션 토큰 생성 및 Apache Airflow API REST 호출
웹 서버 액세스 토큰을 생성하려면 다음 Python 함수를 사용합니다. 이 함수는 먼저 Amazon API MWAA를 호출하여 웹 로그인 토큰을 가져옵니다. 60초 후에 만료되는 웹 로그인 토큰은 웹 세션 토큰으로 교환되며, 이를 통해 웹 서버에 액세스하고 Apache Airflow RESTAPI를 사용할 수 있습니다. 초당 10개 이상의 트랜잭션(TPS) 제한 용량이 필요한 경우이 방법을 사용하여 Apache Airflow API REST Word에 액세스할 수 있습니다.
참고
세션 토큰은 12시간 후에 만료됩니다.
def get_session_info(region, env_name): logging.basicConfig(level=logging.INFO) try: # Initialize MWAA client and request a web login token mwaa = boto3.client('mwaa', region_name=region) response = mwaa.create_web_login_token(Name=env_name) # Extract the web server hostname and login token web_server_host_name = response["WebServerHostname"] web_token = response["WebToken"] # Construct the URL needed for authentication login_url = f"https://{web_server_host_name}/aws_mwaa/login" login_payload = {"token": web_token} # Make a POST request to the MWAA login url using the login payload response = requests.post( login_url, data=login_payload, timeout=10 ) # Check if login was succesfull if response.status_code == 200: # Return the hostname and the session cookie return ( web_server_host_name, response.cookies["session"] ) else: # Log an error logging.error("Failed to log in: HTTP %d", response.status_code) return None except requests.RequestException as e: # Log any exceptions raised during the request to the MWAA login endpoint logging.error("Request failed: %s", str(e)) return None except Exception as e: # Log any other unexpected exceptions logging.error("An unexpected error occurred: %s", str(e)) return None
인증이 완료되면 API 엔드포인트로 요청을 전송하기 시작할 수 있는 보안 인증 정보가 제공됩니다. 아래 예제에서 /dags/
엔드포인트를 사용합니다.dag_id
/dag
def trigger_dag(region, env_name, dag_name): """ Triggers a DAG in a specified MWAA environment using the Airflow REST API. Args: region (str): AWS region where the MWAA environment is hosted. env_name (str): Name of the MWAA environment. dag_name (str): Name of the DAG to trigger. """ logging.info(f"Attempting to trigger DAG {dag_name} in environment {env_name} at region {region}") # Retrieve the web server hostname and session cookie for authentication try: web_server_host_name, session_cookie = get_session_info(region, env_name) if not session_cookie: logging.error("Authentication failed, no session cookie retrieved.") return except Exception as e: logging.error(f"Error retrieving session info: {str(e)}") return # Prepare headers and payload for the request cookies = {"session": session_cookie} json_body = {"conf": {}} # Construct the URL for triggering the DAG url = f"https://{web_server_host_name}/api/v1/dags/{dag_id}/dagRuns" # Send the POST request to trigger the DAG try: response = requests.post(url, cookies=cookies, json=json_body) # Check the response status code to determine if the DAG was triggered successfully if response.status_code == 200: logging.info("DAG triggered successfully.") else: logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}") except requests.RequestException as e: logging.error(f"Request to trigger DAG failed: {str(e)}") if __name__ == "__main__": logging.basicConfig(level=logging.INFO) # Check if the correct number of arguments is provided if len(sys.argv) != 4: logging.error("Incorrect usage. Proper format: python script_name.py {region} {env_name} {dag_name}") sys.exit(1) region = sys.argv[1] env_name = sys.argv[2] dag_name = sys.argv[3] # Trigger the DAG with the provided arguments trigger_dag(region, env_name, dag_name)