Apache Airflow 연결을 위한 AWS Secrets Manager의 암호 키 사용 - Amazon Managed Workflows for Apache Airflow

Apache Airflow 연결을 위한 AWS Secrets Manager의 암호 키 사용

다음 샘플에서는 Amazon Managed Workflows for Apache Airflow에서 Apache Airflow 연결을 위한 암호 키를 가져오기 위해 AWS Secrets Manager를 직접적으로 호출합니다. AWS Secrets Manager 암호를 사용하여 Apache Airflow 연결 구성의 단계를 완료했다고 가정합니다.

버전

  • 이 페이지의 샘플 코드는 Python 3.7Apache Airflow v1과 함께 사용할 수 있습니다.

  • 이 페이지의 코드 예제는 Python 3.10Apache Airflow v2에서 사용할 수 있습니다.

사전 조건

이 페이지의 이 샘플 코드를 사용하려면 다음 항목이 필요합니다.

권한

요구 사항

  • 이 코드 예제를 Apache Airflow v1과 함께 사용하려면 추가 종속성이 필요하지 않습니다. 코드는 사용자 환경에 설치된 Apache Airflow v1 기본 설치를 사용합니다.

  • 이 코드 예제를 Apache Airflow v2와 함께 사용하려면 추가 종속성이 필요하지 않습니다. 코드는 사용자 환경에 설치된 Apache Airflow v2 기본 설치를 사용합니다.

코드 샘플

다음 단계는 Secrets Manager를 호출하여 암호를 가져오는 DAG 코드를 만드는 방법을 설명합니다.

Apache Airflow v2
  1. 명령 프롬프트에서 DAG 코드가 저장된 디렉터리로 이동합니다. 예:

    cd dags
  2. 다음 코드 샘플의 내용을 복사하고 로컬에서 secrets-manager.py로 저장합니다.

    """ Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ from airflow import DAG, settings, secrets from airflow.operators.python import PythonOperator from airflow.utils.dates import days_ago from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook from datetime import timedelta import os ### The steps to create this secret key can be found at: https://docs.aws.amazon.com/mwaa/latest/userguide/connections-secrets-manager.html sm_secretId_name = 'airflow/connections/myconn' default_args = { 'owner': 'airflow', 'start_date': days_ago(1), 'depends_on_past': False } ### Gets the secret myconn from Secrets Manager def read_from_aws_sm_fn(**kwargs): ### set up Secrets Manager hook = AwsBaseHook(client_type='secretsmanager') client = hook.get_client_type(region_name='us-east-1') response = client.get_secret_value(SecretId=sm_secretId_name) myConnSecretString = response["SecretString"] return myConnSecretString ### 'os.path.basename(__file__).replace(".py", "")' uses the file name secrets-manager.py for a DAG ID of secrets-manager with DAG( dag_id=os.path.basename(__file__).replace(".py", ""), default_args=default_args, dagrun_timeout=timedelta(hours=2), start_date=days_ago(1), schedule_interval=None ) as dag: write_all_to_aws_sm = PythonOperator( task_id="read_from_aws_sm", python_callable=read_from_aws_sm_fn, provide_context=True )
Apache Airflow v1
  1. 명령 프롬프트에서 DAG 코드가 저장된 디렉터리로 이동합니다. 예:

    cd dags
  2. 다음 코드 샘플의 내용을 복사하고 로컬에서 secrets-manager.py로 저장합니다.

    from airflow import DAG, settings, secrets from airflow.operators.python_operator import PythonOperator from airflow.utils.dates import days_ago from airflow.contrib.hooks.aws_hook import AwsHook from datetime import timedelta import os ### The steps to create this secret key can be found at: https://docs.aws.amazon.com/mwaa/latest/userguide/connections-secrets-manager.html sm_secretId_name = 'airflow/connections/myconn' default_args = { 'owner': 'airflow', 'start_date': days_ago(1), 'depends_on_past': False } ### Gets the secret myconn from Secrets Manager def read_from_aws_sm_fn(**kwargs): ### set up Secrets Manager hook = AwsHook() client = hook.get_client_type('secretsmanager') response = client.get_secret_value(SecretId=sm_secretId_name) myConnSecretString = response["SecretString"] return myConnSecretString ### 'os.path.basename(__file__).replace(".py", "")' uses the file name secrets-manager.py for a DAG ID of secrets-manager with DAG( dag_id=os.path.basename(__file__).replace(".py", ""), default_args=default_args, dagrun_timeout=timedelta(hours=2), start_date=days_ago(1), schedule_interval=None ) as dag: write_all_to_aws_sm = PythonOperator( task_id="read_from_aws_sm", python_callable=read_from_aws_sm_fn, provide_context=True )

다음 단계