Amazon RDS for Microsoft SQL Server와 함께 Amazon MWAA 사용 - Amazon Managed Workflows for Apache Airflow

Amazon RDS for Microsoft SQL Server와 함께 Amazon MWAA 사용

Amazon Managed Workflows for Apache Airflow를 사용하여 RDS for SQL Server에 연결할 수 있습니다. 다음 샘플 코드는 Amazon Managed Workflows for Apache Airflow 환경에서 DAG를 사용하여 Amazon RDS for Microsoft SQL Server 서버에 연결하고 쿼리를 실행합니다.

버전

  • 이 페이지의 샘플 코드는 Python 3.7Apache Airflow v1과 함께 사용할 수 있습니다.

  • 이 페이지의 코드 예제는 Python 3.10Apache Airflow v2에서 사용할 수 있습니다.

사전 조건

이 페이지의 이 샘플 코드를 사용하려면 다음 항목이 필요합니다.

  • Amazon MWAA 환경.

  • Amazon MWAA와 RDS for SQL Server 는 동일한 Amazon VPC에서 실행/

  • Amazon MWAA 및 서버의 VPC 보안 그룹은 다음과 같은 연결로 구성됩니다.

    • Amazon MWAA 보안 그룹의 Amazon RDS에 대해 열려 있는 포트 1433에 대한 인바운드 규칙

    • 또는 Amazon MWAA에서 RDS로 열려 있는 1433 포트에 대한 아웃바운드 규칙

  • RDS for SQL Server에 대한 Apache Airflow 연결에는 이전 프로세스에서 만든 Amazon RDS SQL 서버 데이터베이스의 호스트 이름, 포트, 사용자 이름 및 암호가 반영됩니다.

의존성

이 섹션의 샘플 코드를 사용하려면 다음 종속성을 사용자 requirements.txt에 추가합니다. 자세한 내용은 Python 종속성 설치 섹션을 참조하십시오.

Apache Airflow v2
apache-airflow-providers-microsoft-mssql==1.0.1 apache-airflow-providers-odbc==1.0.1 pymssql==2.2.1
Apache Airflow v1
apache-airflow[mssql]==1.10.12

Apache Airflow v2 연결

Apache Airflow v2에서 연결을 사용하는 경우 Airflow 연결 객체에 다음과 같은 키-값 페어가 포함되어 있는지 확인합니다.

  1. 연결 ID: mssql_default

  2. 연결 유형: Amazon Web Services

  3. 호스트: YOUR_DB_HOST

  4. 스키마:

  5. 로그인: 관리자

  6. 암호:

  7. 포트: 1433

  8. 추가:

코드 샘플

  1. 명령 프롬프트에서 DAG 코드가 저장된 디렉터리로 이동합니다. 예:

    cd dags
  2. 다음 코드 샘플의 내용을 복사하고 로컬에서 sql-server.py로 저장합니다.

    """ Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ import pymssql import logging import sys from airflow import DAG from datetime import datetime from airflow.operators.mssql_operator import MsSqlOperator from airflow.operators.python_operator import PythonOperator default_args = { 'owner': 'aws', 'depends_on_past': False, 'start_date': datetime(2019, 2, 20), 'provide_context': True } dag = DAG( 'mssql_conn_example', default_args=default_args, schedule_interval=None) drop_db = MsSqlOperator( task_id="drop_db", sql="DROP DATABASE IF EXISTS testdb;", mssql_conn_id="mssql_default", autocommit=True, dag=dag ) create_db = MsSqlOperator( task_id="create_db", sql="create database testdb;", mssql_conn_id="mssql_default", autocommit=True, dag=dag ) create_table = MsSqlOperator( task_id="create_table", sql="CREATE TABLE testdb.dbo.pet (name VARCHAR(20), owner VARCHAR(20));", mssql_conn_id="mssql_default", autocommit=True, dag=dag ) insert_into_table = MsSqlOperator( task_id="insert_into_table", sql="INSERT INTO testdb.dbo.pet VALUES ('Olaf', 'Disney');", mssql_conn_id="mssql_default", autocommit=True, dag=dag ) def select_pet(**kwargs): try: conn = pymssql.connect( server='sampledb.<xxxxxx>.<region>.rds.amazonaws.com', user='admin', password='<yoursupersecretpassword>', database='testdb' ) # Create a cursor from the connection cursor = conn.cursor() cursor.execute("SELECT * from testdb.dbo.pet") row = cursor.fetchone() if row: print(row) except: logging.error("Error when creating pymssql database connection: %s", sys.exc_info()[0]) select_query = PythonOperator( task_id='select_query', python_callable=select_pet, dag=dag, ) drop_db >> create_db >> create_table >> insert_into_table >> select_query

다음 단계

  • 이 예제의 requirements.txt 파일을 Python 종속성 설치의 Amazon S3 버킷에 업로드하는 방법을 알아봅니다.

  • 이 예제의 DAG 코드를 DAG 추가 또는 업데이트에서 Amazon S3 버킷의 dags 폴더에 업로드하는 방법을 알아봅니다.

  • 예제 스크립트와 기타 pymssql 모듈 예제를 살펴봅니다.

  • Apache Airflow 참조 가이드에서 mssql_operator를 사용하여 특정 Microsoft SQL 데이터베이스에서 SQL 코드를 실행하는 방법에 대해 자세히 알아봅니다.