Using a secret key in AWS Secrets Manager for an Apache Airflow connection - Amazon Managed Workflows for Apache Airflow


Using a secret key in AWS Secrets Manager for an Apache Airflow connection

The following sample calls AWS Secrets Manager to get a secret key for an Apache Airflow connection on Amazon MWAA. It assumes you have completed the steps in Configuring an Apache Airflow connection using an AWS Secrets Manager secret.
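As background, the secret created in that procedure stores the Airflow connection as a URI string. The following is a minimal sketch of how such a URI maps onto connection fields, using only the standard library rather than Airflow's own Connection class; the URI value below is hypothetical:

```python
from urllib.parse import urlparse

# Hypothetical connection URI, in the format stored at airflow/connections/myconn
conn_uri = "mysql://user:secretpass@db.example.com:3306/mydb"

parts = urlparse(conn_uri)
conn = {
    "conn_type": parts.scheme,          # mysql
    "login": parts.username,            # user
    "password": parts.password,         # secretpass
    "host": parts.hostname,             # db.example.com
    "port": parts.port,                 # 3306
    "schema": parts.path.lstrip("/"),   # mydb
}
```

When Airflow resolves a connection from the Secrets Manager backend, it performs an equivalent decomposition of the stored URI into these fields.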

Version

  • The sample code on this page can be used with Apache Airflow v1 in Python 3.7.

  • You can use the code sample on this page with Apache Airflow v2 and above in Python 3.10.

Prerequisites

To use the sample code on this page, you'll need the following:

Permissions

Requirements

Code sample

The following steps describe how to create the DAG code that calls Secrets Manager to get the secret.

Apache Airflow v2
  1. In your command prompt, navigate to the directory where your DAG code is stored. For example:

    cd dags
  2. Copy the contents of the following code sample and save it locally as secrets-manager.py.

    """
    Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.

    Permission is hereby granted, free of charge, to any person obtaining a copy of
    this software and associated documentation files (the "Software"), to deal in
    the Software without restriction, including without limitation the rights to
    use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
    of the Software, and to permit persons to whom the Software is furnished to do so.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
    OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
    SOFTWARE.
    """
    from airflow import DAG, settings, secrets
    from airflow.operators.python import PythonOperator
    from airflow.utils.dates import days_ago
    from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook

    from datetime import timedelta
    import os

    ### The steps to create this secret key can be found at: https://docs.aws.amazon.com/mwaa/latest/userguide/connections-secrets-manager.html
    sm_secretId_name = 'airflow/connections/myconn'

    default_args = {
        'owner': 'airflow',
        'start_date': days_ago(1),
        'depends_on_past': False
    }

    ### Gets the secret myconn from Secrets Manager
    def read_from_aws_sm_fn(**kwargs):
        ### set up Secrets Manager
        hook = AwsBaseHook(client_type='secretsmanager')
        client = hook.get_client_type('secretsmanager')
        response = client.get_secret_value(SecretId=sm_secretId_name)
        myConnSecretString = response["SecretString"]

        return myConnSecretString

    ### 'os.path.basename(__file__).replace(".py", "")' uses the file name secrets-manager.py for a DAG ID of secrets-manager
    with DAG(
            dag_id=os.path.basename(__file__).replace(".py", ""),
            default_args=default_args,
            dagrun_timeout=timedelta(hours=2),
            start_date=days_ago(1),
            schedule_interval=None
    ) as dag:

        write_all_to_aws_sm = PythonOperator(
            task_id="read_from_aws_sm",
            python_callable=read_from_aws_sm_fn,
            provide_context=True
        )
Apache Airflow v1
  1. In your command prompt, navigate to the directory where your DAG code is stored. For example:

    cd dags
  2. Copy the contents of the following code sample and save it locally as secrets-manager.py.

    from airflow import DAG, settings, secrets
    from airflow.operators.python_operator import PythonOperator
    from airflow.utils.dates import days_ago
    from airflow.contrib.hooks.aws_hook import AwsHook

    from datetime import timedelta
    import os

    ### The steps to create this secret key can be found at: https://docs.aws.amazon.com/mwaa/latest/userguide/connections-secrets-manager.html
    sm_secretId_name = 'airflow/connections/myconn'

    default_args = {
        'owner': 'airflow',
        'start_date': days_ago(1),
        'depends_on_past': False
    }

    ### Gets the secret myconn from Secrets Manager
    def read_from_aws_sm_fn(**kwargs):
        ### set up Secrets Manager
        hook = AwsHook()
        client = hook.get_client_type('secretsmanager')
        response = client.get_secret_value(SecretId=sm_secretId_name)
        myConnSecretString = response["SecretString"]

        return myConnSecretString

    ### 'os.path.basename(__file__).replace(".py", "")' uses the file name secrets-manager.py for a DAG ID of secrets-manager
    with DAG(
            dag_id=os.path.basename(__file__).replace(".py", ""),
            default_args=default_args,
            dagrun_timeout=timedelta(hours=2),
            start_date=days_ago(1),
            schedule_interval=None
    ) as dag:

        write_all_to_aws_sm = PythonOperator(
            task_id="read_from_aws_sm",
            python_callable=read_from_aws_sm_fn,
            provide_context=True
        )
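In both samples above, read_from_aws_sm_fn returns the raw SecretString stored in Secrets Manager. If you instead store the connection as a JSON key/value document rather than a URI, the returned string can be decoded with the standard json module. The following is a hedged sketch; the field names and values are hypothetical, not an output captured from Secrets Manager:

```python
import json

# Hypothetical SecretString, as get_secret_value might return it for a
# connection stored in JSON form
secret_string = (
    '{"conn_type": "mysql", "login": "user", "password": "secretpass",'
    ' "host": "db.example.com", "port": 3306, "schema": "mydb"}'
)

secret = json.loads(secret_string)
host = secret["host"]   # db.example.com
port = secret["port"]   # 3306
```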

What's next?

  • Learn how to upload the DAG code in this example to the dags folder in your Amazon S3 bucket in Adding or updating DAGs.