마이크로소프트 SQL 서버를 RDS 위해 MWAA 아마존과 아마존을 함께 사용하기 - Amazon Managed Workflows for Apache Airflow

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

마이크로소프트 SQL 서버를 RDS 위해 MWAA 아마존과 아마존을 함께 사용하기

Apache Airflow용 Amazon 관리형 워크플로를 사용하여 RDS서버용 SQL 서버에 연결할 수 있습니다. 다음 샘플 코드는 Apache Airflow용 Amazon Managed Workflow 환경에서 RDS Microsoft SQL Server용 Amazon에 연결하고 쿼리를 실행하는 데 사용됩니다DAGs.

버전

  • 이 페이지의 샘플 코드는 Python 3.7Apache Airflow v1과 함께 사용할 수 있습니다.

  • 이 페이지의 코드 예제를 Python 3.10의 아파치 에어플로우 v2와 함께 사용할 수 있습니다.

사전 조건

이 페이지의 이 샘플 코드를 사용하려면 다음 항목이 필요합니다.

  • 아마존 MWAA 환경.

  • MWAAAmazon과 RDS SQL 서버용 서버가 동일한 Amazon에서 실행되고 있습니다VPC/

  • VPCAmazon MWAA 및 서버의 보안 그룹은 다음과 같은 연결로 구성됩니다.

    • Amazon 보안 RDS 그룹에서 Amazon에 1433 개방된 포트에 대한 인바운드 규칙 MWAA

    • 또는 MWAA Amazon에서 다음으로 1433 열리는 포트에 대한 아웃바운드 규칙 RDS

  • RDSSQL서버용 Apache Airflow 연결은 이전 프로세스에서 만든 Amazon RDS SQL 서버 데이터베이스의 호스트 이름, 포트, 사용자 이름 및 암호를 반영합니다.

의존성

이 섹션의 샘플 코드를 사용하려면 다음 종속성을 사용자 requirements.txt에 추가합니다. 자세한 내용은 Python 종속성 설치 섹션을 참조하십시오.

Apache Airflow v2
apache-airflow-providers-microsoft-mssql==1.0.1 apache-airflow-providers-odbc==1.0.1 pymssql==2.2.1
Apache Airflow v1
apache-airflow[mssql]==1.10.12

Apache Airflow v2 연결

Apache Airflow v2에서 연결을 사용하는 경우 Airflow 연결 객체에 다음과 같은 키-값 페어가 포함되어 있는지 확인합니다.

  1. 연결 ID: mssql_default

  2. 연결 유형: Amazon Web Services

  3. 호스트: YOUR_DB_HOST

  4. 스키마:

  5. 로그인: 관리자

  6. 암호:

  7. 포트: 1433

  8. 추가:

코드 샘플

  1. 명령 프롬프트에서 DAG 코드가 저장된 디렉터리로 이동합니다. 예:

    cd dags
  2. 다음 코드 샘플의 내용을 복사하고 로컬에서 sql-server.py로 저장합니다.

    """ Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ import pymssql import logging import sys from airflow import DAG from datetime import datetime from airflow.operators.mssql_operator import MsSqlOperator from airflow.operators.python_operator import PythonOperator default_args = { 'owner': 'aws', 'depends_on_past': False, 'start_date': datetime(2019, 2, 20), 'provide_context': True } dag = DAG( 'mssql_conn_example', default_args=default_args, schedule_interval=None) drop_db = MsSqlOperator( task_id="drop_db", sql="DROP DATABASE IF EXISTS testdb;", mssql_conn_id="mssql_default", autocommit=True, dag=dag ) create_db = MsSqlOperator( task_id="create_db", sql="create database testdb;", mssql_conn_id="mssql_default", autocommit=True, dag=dag ) create_table = MsSqlOperator( task_id="create_table", sql="CREATE TABLE testdb.dbo.pet (name VARCHAR(20), owner VARCHAR(20));", mssql_conn_id="mssql_default", autocommit=True, dag=dag ) insert_into_table = MsSqlOperator( task_id="insert_into_table", sql="INSERT INTO testdb.dbo.pet VALUES ('Olaf', 'Disney');", mssql_conn_id="mssql_default", autocommit=True, dag=dag ) def select_pet(**kwargs): try: conn = pymssql.connect( server='sampledb.<xxxxxx>.<region>.rds.amazonaws.com', user='admin', password='<yoursupersecretpassword>', database='testdb' ) # Create a cursor from the connection cursor = conn.cursor() cursor.execute("SELECT * from testdb.dbo.pet") row = cursor.fetchone() if row: print(row) except: logging.error("Error when creating pymssql database connection: %s", sys.exc_info()[0]) select_query = PythonOperator( task_id='select_query', python_callable=select_pet, dag=dag, ) drop_db >> create_db >> create_table >> insert_into_table >> select_query

다음 단계

  • 이 예제의 requirements.txt 파일을 Python 종속성 설치의 Amazon S3 버킷에 업로드하는 방법을 알아봅니다.

  • 이 예제의 DAG 코드를 Amazon S3 버킷의 dags 폴더에 업로드하는 방법을 알아봅니다DAG 추가 또는 업데이트.

  • 예제 스크립트와 기타 pymssql 모듈 예제를 살펴봅니다.

  • Apache Airflow 참조 안내서에서 SQL mssql_operator를 사용하여 특정 Microsoft 데이터베이스에서 SQL 코드를 실행하는 방법에 대해 자세히 알아보십시오.