使用 AWS Secrets Manager 中的密钥进行 Apache Airflow Snowflake 连接 - Amazon Managed Workflows for Apache Airflow

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 AWS Secrets Manager 中的密钥进行 Apache Airflow Snowflake 连接

以下示例调用 AWS Secrets Manager 在 Amazon MWAA 上获取 Apache Airflow Snowflake 连接的密钥。它假设您已完成 使用密钥配置 Apache Airflow 连接 AWS Secrets Manager 中的步骤。

版本

  • 您可以将本页上的代码示例与 Python 3.10 中的 Apache Airflow v2 及更高版本一起使用。

先决条件

要使用本页上的示例代码,您需要以下内容:

权限

要求

要使用本页上的示例代码,请将以下依赖项添加到 requirements.txt。要了解更多信息,请参阅 安装 Python 依赖项

apache-airflow-providers-snowflake==1.3.0

代码示例

以下步骤描述了如何创建 DAG 代码,以便调用 Secrets Manager 来获取密钥。

  1. 在命令提示符下,导航到存储 DAG 代码的目录。例如:

    cd dags
  2. 复制以下代码示例的内容,并在本地另存为 snowflake_connection.py

    """ Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ from airflow import DAG from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator from airflow.utils.dates import days_ago snowflake_query = [ """use warehouse "MY_WAREHOUSE";""", """select * from "SNOWFLAKE_SAMPLE_DATA"."WEATHER"."WEATHER_14_TOTAL" limit 100;""", ] with DAG(dag_id='snowflake_test', schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: snowflake_select = SnowflakeOperator( task_id="snowflake_select", sql=snowflake_query, snowflake_conn_id="snowflake_conn", )

接下来做什么?

  • 要了解如何将本示例中的 DAGD 代码上传到 Amazon S3 存储桶的 dags 文件夹,请参阅 添加或更新 DAG