SSHOperator을(를) 사용하여 SSH 연결 생성 - Amazon Managed Workflows for Apache Airflow

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

SSHOperator을(를) 사용하여 SSH 연결 생성

다음 예제는 DAG(유방향 비순환 그래프)에서 SSHOperator을(를) 사용하여 Amazon Managed Workflows for Apache Airflow 환경에서 원격 Amazon EC2 인스턴스에 연결하는 방법을 설명합니다. 비슷한 접근 방식을 사용하여 SSH 액세스가 있는 모든 원격 인스턴스에 연결할 수 있습니다.

다음 예제에서는 Amazon S3에 있는 사용자 환경의 dags 디렉터리에 SSH 암호 키(.pem)를 업로드합니다. 그런 다음 requirements.txt을(를) 사용하여 필요한 종속성을 설치하고 UI에 새 Apache Airflow 연결을 생성합니다. 마지막으로 원격 인스턴스에 대한 SSH 연결을 생성하는 DAG를 작성합니다.

버전

  • 이 페이지의 코드 예제는 Python 3.10Apache Airflow v2에서 사용할 수 있습니다.

사전 조건

이 페이지의 이 샘플 코드를 사용하려면 다음 항목이 필요합니다.

  • Amazon MWAA 환경.

  • SSH 암호 키. 코드 샘플은 Amazon MWAA 환경과 동일한 리전에 Amazon EC2 인스턴스와 .pem이(가) 있다고 가정합니다. 키가 없는 경우 Amazon EC2 사용 설명서의 키 쌍 생성 또는 가져오기를 참조하십시오.

권한

  • 이 페이지의 코드 예제를 사용하는 데 추가 권한이 필요하지 않습니다.

요구 사항

웹 서버에 apache-airflow-providers-ssh 패키지를 설치하려면 다음 파라미터를 requirements.txt에 추가합니다. 환경이 업데이트되고 Amazon MWAA가 종속성을 성공적으로 설치하면 UI에 새 SSH 연결 유형이 표시됩니다.

-c https://raw.githubusercontent.com/apache/airflow/constraints-Airflow-version/constraints-Python-version.txt apache-airflow-providers-ssh
참고

-crequirements.txt의 제약 조건 URL을 정의합니다. 이렇게 하면 Amazon MWAA가 사용자 환경에 맞는 올바른 패키지 버전을 설치할 수 있습니다.

암호 키를 Amazon S3에 복사

다음 AWS Command Line Interface 명령을 사용하여 Amazon S3의 환경 dags 디렉터리에 .pem 키를 복사합니다.

$ aws s3 cp your-secret-key.pem s3://your-bucket/dags/

Amazon MWAA는 .pem 키를 포함해 dags의 콘텐츠를 로컬 /usr/local/airflow/dags/ 디렉터리에 복사합니다. 이렇게 하면 Apache Airflow가 키에 액세스할 수 있습니다.

새 Apache Airflow 연결 생성

Apache Airflow UI를 사용하여 새 SSH 연결을 생성하려면
  1. Amazon MWAA 콘솔에서 환경 페이지를 엽니다.

  2. 환경 목록에서 사용자 환경에 맞는 Airflow UI 열기를 선택합니다.

  3. Apache Airflow UI 페이지의 상단 내비게이션 바에서 관리자를 선택하여 드롭다운 목록을 확장한 다음 연결을 선택합니다.

  4. 연결 목록 페이지에서 +를 선택하거나 새 레코드 추가 버튼을 선택하여 새 연결을 추가합니다.

  5. 연결 추가 페이지에서 다음 정보를 추가합니다.

    1. 연결 IDssh_new를 입력합니다.

    2. 연결 유형의 경우 드롭다운 목록에서 SSH를 선택합니다.

      참고

      목록에 SSH 연결 유형이 없는 경우 Amazon MWAA가 필요한 apache-airflow-providers-ssh 패키지를 설치하지 않은 것입니다. 이 패키지를 포함하도록 requirements.txt 파일을 업데이트한 다음 다시 시도하십시오.

    3. Host에 연결하려는 Amazon EC2 인스턴스의 IP 주소를 입력합니다. 예: 12.345.67.89.

    4. Amazon EC2 인스턴스에 연결 중인 경우 사용자 이름ec2-user을(를) 입력합니다. Apache Airflow를 연결하려는 원격 인스턴스의 유형에 따라 사용자 이름이 달라질 수 있습니다.

    5. Extra에는 다음 키-값 쌍을 JSON 형식으로 입력합니다.

      { "key_file": "/usr/local/airflow/dags/your-secret-key.pem" }

      이 키-값 쌍은 Apache Airflow가 로컬 /dags 디렉터리에서 암호 키를 찾도록 지시합니다.

코드 샘플

다음 DAG는 SSHOperator을(를) 사용하여 대상 Amazon EC2 인스턴스에 연결한 다음, hostname Linux 명령을 실행하여 인스턴스 이름을 인쇄합니다. 원격 인스턴스에서 모든 명령 또는 스크립트를 실행하도록 DAG를 수정할 수 있습니다.

  1. 터미널을 열고 DAG 코드가 저장된 디렉터리로 이동합니다. 예:

    cd dags
  2. 다음 코드 샘플의 내용을 복사하고 로컬에서 ssh.py로 저장합니다.

    from airflow.decorators import dag from datetime import datetime from airflow.providers.ssh.operators.ssh import SSHOperator @dag( dag_id="ssh_operator_example", schedule_interval=None, start_date=datetime(2022, 1, 1), catchup=False, ) def ssh_dag(): task_1=SSHOperator( task_id="ssh_task", ssh_conn_id='ssh_new', command='hostname', ) my_ssh_dag = ssh_dag()
  3. 다음 AWS CLI 명령을 실행하여 DAG를 환경 버킷에 복사한 다음 Apache Airflow UI를 사용하여 DAG를 트리거합니다.

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. 성공하면 ssh_operator_example DAG의 ssh_task에 대한 작업 로그에 다음과 비슷한 출력이 표시됩니다.

    [2022-01-01, 12:00:00 UTC] {{base.py:79}} INFO - Using connection to: id: ssh_new. Host: 12.345.67.89, Port: None,
    Schema: , Login: ec2-user, Password: None, extra: {'key_file': '/usr/local/airflow/dags/your-secret-key.pem'}
    [2022-01-01, 12:00:00 UTC] {{ssh.py:264}} WARNING - Remote Identification Change is not verified. This won't protect against Man-In-The-Middle attacks
    [2022-01-01, 12:00:00 UTC] {{ssh.py:270}} WARNING - No Host Key Verification. This won't protect against Man-In-The-Middle attacks
    [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Connected (version 2.0, client OpenSSH_7.4)
    [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Authentication (publickey) successful!
    [2022-01-01, 12:00:00 UTC] {{ssh.py:139}} INFO - Running command: hostname
    [2022-01-01, 12:00:00 UTC]{{ssh.py:171}} INFO - ip-123-45-67-89.us-west-2.compute.internal
    [2022-01-01, 12:00:00 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=ssh_operator_example, task_id=ssh_task, execution_date=20220712T200914, start_date=20220712T200915, end_date=20220712T200916