使用 Amazon MWAA 與 Amazon Microsoft RDS 服SQL務器 - Amazon Managed Workflows for Apache Airflow

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 Amazon MWAA 與 Amazon Microsoft RDS 服SQL務器

您可以使用適用於 Apache 氣流的 Amazon 受管工作流程來連接RDS到SQL伺服器。下列範例程式碼會DAGs在 Apache 氣流環境的 Amazon 受管工作流程上使用,以連接到 Microsoft SQL 伺服器RDS的 Amazon 上執行查詢。

版本

  • 此頁面上的示例代碼可以與 Python 3.7 中的阿帕奇氣流 V1 一起使用。

  • 您可以使用此頁面上的代碼示例與 Python 3.10 中的阿帕奇氣流 V2

必要條件

若要使用此頁面上的範例程式碼,您需要下列項目:

  • Amazon 的MWAA環境

  • Amazon MWAA 和RDS用於SQL服務器運行在同一個 AmazonVPC/

  • VPCAmazon MWAA 和伺服器的安全群組使用下列連線進行設定:

    • Amazon 安全組中為 Amazon 1433 RDS 開放的端口MWAA的入站規則

    • 或者從 Amazon MWAA 到1433開放港口的出站規則 RDS

  • 適用RDS於SQL伺服器的 Apache 氣流連線會反映先前程序建立之 Amazon RDS SQL 伺服器資料庫的主機名稱、連接埠、使用者名稱和密碼。

相依性

若要使用本節中的範例程式碼,請將下列相依性新增至您的requirements.txt. 如需進一步了解,請參閱 安裝 Python 賴項

Apache Airflow v2
apache-airflow-providers-microsoft-mssql==1.0.1 apache-airflow-providers-odbc==1.0.1 pymssql==2.2.1
Apache Airflow v1
apache-airflow[mssql]==1.10.12

阿帕奇氣流 V2 連接

如果您在 Apache Airflow v2 中使用連線,請確定氣流連線物件包含下列索引鍵值組:

  1. 連接埠識別碼:預

  2. 連絡人類型:Amazon Web Services

  3. 主持人:YOUR_DB_HOST

  4. 綱要:

  5. 登錄:管理員

  6. 密碼:

  7. 連接埠:

  8. 額外:

範例程式碼

  1. 在命令提示符中,導航到存儲DAG代碼的目錄。例如:

    cd dags
  2. 複製下列程式碼範例的內容,並在本機儲存為sql-server.py

    """ Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ import pymssql import logging import sys from airflow import DAG from datetime import datetime from airflow.operators.mssql_operator import MsSqlOperator from airflow.operators.python_operator import PythonOperator default_args = { 'owner': 'aws', 'depends_on_past': False, 'start_date': datetime(2019, 2, 20), 'provide_context': True } dag = DAG( 'mssql_conn_example', default_args=default_args, schedule_interval=None) drop_db = MsSqlOperator( task_id="drop_db", sql="DROP DATABASE IF EXISTS testdb;", mssql_conn_id="mssql_default", autocommit=True, dag=dag ) create_db = MsSqlOperator( task_id="create_db", sql="create database testdb;", mssql_conn_id="mssql_default", autocommit=True, dag=dag ) create_table = MsSqlOperator( task_id="create_table", sql="CREATE TABLE testdb.dbo.pet (name VARCHAR(20), owner VARCHAR(20));", mssql_conn_id="mssql_default", autocommit=True, dag=dag ) insert_into_table = MsSqlOperator( task_id="insert_into_table", sql="INSERT INTO testdb.dbo.pet VALUES ('Olaf', 'Disney');", mssql_conn_id="mssql_default", autocommit=True, dag=dag ) def select_pet(**kwargs): try: conn = pymssql.connect( server='sampledb.<xxxxxx>.<region>.rds.amazonaws.com', user='admin', password='<yoursupersecretpassword>', database='testdb' ) # Create a cursor from the connection cursor = conn.cursor() cursor.execute("SELECT * from testdb.dbo.pet") row = cursor.fetchone() if row: print(row) except: logging.error("Error when creating pymssql database connection: %s", sys.exc_info()[0]) select_query = PythonOperator( task_id='select_query', python_callable=select_pet, dag=dag, ) drop_db >> create_db >> create_table >> insert_into_table >> select_query

後續步驟?

  • 在中了解如何將此範例中的requirements.txt檔案上傳到您的 Amazon S3 儲存貯體安裝 Python 賴項

  • 了解如何將此範例中的DAG程式碼上傳到的 Amazon S3 儲存貯體中的dags資料夾新增或更新 DAG

  • 探索範例指令碼和其他 pymssql 模組範例。

  • 請參閱 Apache 氣流參考指南,進一步了解如何使用 mssql_operator 在特定的 Microsoft SQL 資料庫中執行SQL程式碼。