SSHOperator を使用して SSH 接続の作成 - Amazon Managed Workflows for Apache Airflow

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

SSHOperator を使用して SSH 接続の作成

次の例では、Amazon Managed Workflows for Apache Airflow 環境からリモートの Amazon EC2 インスタンスに接続するために SSHOperator をどのように使用できるかを示しています。同様の方法で、SSH アクセスを持つ任意のリモートインスタンスに接続できます。

次の例では、SSH シークレットキー (.pem)を Amazon S3 の環境の dags ディレクトリにアップロードします。次に、requirements.txt を使用して必要な依存関係をインストールし、UI で新しい Apache Airflow 接続を作成します。最後に、リモートインスタンスへの SSH 接続を作成する DAG を作成します。

Version

  • このページのコード例は、「Python 3.10」で Apache Airflow v2 以上と共に使用可能です。

前提条件

このページのサンプルコードを使用するには、以下が必要です。

アクセス許可

  • このページのコード例を使用する場合、追加のアクセス許可は必要ありません。

要件

次のパラメータを requirements.txt に追加して、ウェブサーバーに apache-airflow-providers-ssh パッケージをインストールしてください。環境が更新され、Amazon MWAA が依存関係を正常にインストールすると、UI に新しい [SSH] 接続タイプが表示されます。

-c https://raw.githubusercontent.com/apache/airflow/constraints-Airflow-version/constraints-Python-version.txt apache-airflow-providers-ssh
注記

-crequirements.txt 内での制約 URL を定義します。これにより、Amazon MAA はお客様の環境に合った正しいパッケージバージョンをインストールできます。

シークレットキーを Amazon S3 にコピーする

次の AWS Command Line Interface コマンドを使用して、.pemキーを Amazon S3 の環境のdagsディレクトリにコピーします。

$ aws s3 cp your-secret-key.pem s3://your-bucket/dags/

Amazon MWAA は、.pem キーを含む dags のコンテンツをローカルの /usr/local/airflow/dags/ ディレクトリにコピーすることで、Apache Airflow はキーにアクセスできます。

新しい Apache Airflow 接続の作成

Apache Airflow UI を使用して新しい SSH 接続を作成するには
  1. Amazon MWAA コンソールで「環境ページ」を開きます。

  2. 環境のリストで、ご使用の環境に合った [Open Airflow UI] を選択します。

  3. Apache Airflow UI ページで、上部のナビゲーションバーから [管理] を選択してドロップダウンリストを展開し、[接続] を選択します。

  4. [接続リスト] ページで [+] を選択するか、[新規レコードを追加] ボタンをクリックして新しい接続を追加します。

  5. [接続の追加] ページで、以下の情報を追加します。

    1. [接続 ID] には ssh_new と入力します。

    2. [接続タイプ] では、ドロップダウンリストから [SSH] を選択します。

      注記

      [SSH] 接続タイプがリストに表示されない場合、Amazon MWAA は必要な apache-airflow-providers-ssh パッケージをインストールしていない可能性があります。このパッケージを含むように requirements.txt ファイルを更新してから、もう一度試してください。

    3. [ホスト] には、接続する Amazon EC2 インスタンスの IP アドレスを入力します。例えば 12.345.67.89 です。

    4. [ユーザー名] に、Amazon EC2 インスタンスに接続する場合は ec2-user を入力します。Apache Airflow に接続させたいリモートインスタンスのタイプによって、ユーザー名は異なる場合があります。

    5. [抽出] には、以下のキーと値のペアを JSON 形式で入力します。

      { "key_file": "/usr/local/airflow/dags/your-secret-key.pem" }

      このキーと値のペアは、Apache Airflow に対してシークレットキーをローカルの /dags ディレクトリから探すように指示します。

コードサンプル

次の DAG は SSHOperator を使用してターゲットの Amazon EC2 インスタンスに接続し、その後 hostname Linux コマンドを実行してインスタンスの名前を表示します。DAG を変更して、リモートインスタンスで任意のコマンドまたはスクリプトを実行できます。

  1. ターミナルを開き、DAG コードが保存されているディレクトリに移動します。例:

    cd dags
  2. 以下のコードサンプルの内容をコピーし、ローカルに ssh.py として保存します。

    from airflow.decorators import dag from datetime import datetime from airflow.providers.ssh.operators.ssh import SSHOperator @dag( dag_id="ssh_operator_example", schedule_interval=None, start_date=datetime(2022, 1, 1), catchup=False, ) def ssh_dag(): task_1=SSHOperator( task_id="ssh_task", ssh_conn_id='ssh_new', command='hostname', ) my_ssh_dag = ssh_dag()
  3. 次の AWS CLI コマンドを実行して DAG を環境のバケットにコピーし、Apache Airflow UI を使用して DAG をトリガーします。

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. 成功した場合、ssh_operator_example DAG 内の ssh_task タスクログで次のような出力が表示されます。

    [2022-01-01, 12:00:00 UTC] {{base.py:79}} INFO - Using connection to: id: ssh_new. Host: 12.345.67.89, Port: None,
    Schema: , Login: ec2-user, Password: None, extra: {'key_file': '/usr/local/airflow/dags/your-secret-key.pem'}
    [2022-01-01, 12:00:00 UTC] {{ssh.py:264}} WARNING - Remote Identification Change is not verified. This won't protect against Man-In-The-Middle attacks
    [2022-01-01, 12:00:00 UTC] {{ssh.py:270}} WARNING - No Host Key Verification. This won't protect against Man-In-The-Middle attacks
    [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Connected (version 2.0, client OpenSSH_7.4)
    [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Authentication (publickey) successful!
    [2022-01-01, 12:00:00 UTC] {{ssh.py:139}} INFO - Running command: hostname
    [2022-01-01, 12:00:00 UTC]{{ssh.py:171}} INFO - ip-123-45-67-89.us-west-2.compute.internal
    [2022-01-01, 12:00:00 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=ssh_operator_example, task_id=ssh_task, execution_date=20220712T200914, start_date=20220712T200915, end_date=20220712T200916