使用建立 SSH 連線 SSHOperator - Amazon Managed Workflows for Apache Airflow

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用建立 SSH 連線 SSHOperator

以下範例說明如何使用定向非循環圖 (DAG) SSHOperator 中的,從適用於 Apache 氣流環境的 Amazon 受管工作流程連接到遠端 Amazon EC2 執行個體。您可以使用類似的方法連接到任何具有 SSH 訪問權限的遠程實例。

在下列範例中,您將 SSH 秘密金鑰 (.pem) 上傳到 Amazon S3 上環境的dags目錄。然後,您可以使用安裝必要的依賴關係,requirements.txt並在 UI 中創建一個新的 Apache 氣流連接。最後,您撰寫的 DAG 會建立與遠端執行個體的 SSH 連線。

版本

  • 您可以使用此頁面上的代碼示例與 Apache 氣流 v2 及更高版本在 Python 3.10 中。

必要條件

若要使用此頁面上的範例程式碼,您需要下列項目:

  • Amazon MWAA 環境。

  • 一個安全殼層密鑰。程式碼範例假設您有一個 Amazon EC2 執行個體,以及與 Amazon MWAA 環境.pem位於相同區域中的執行個體。如果您沒有金鑰,請參閱 Amazon EC2 使用者指南中的建立或匯入 key pair

許可

  • 使用此頁面上的程式碼範例不需要其他權限。

要求

添加以下參數requirements.txt以在 Web 服務器上安裝apache-airflow-providers-ssh軟件包。一旦您的環境更新並且 Amazon MWAA 成功安裝相依性,您就會在使用者介面中看到新的 SSH 連線類型。

-c https://raw.githubusercontent.com/apache/airflow/constraints-Airflow-version/constraints-Python-version.txt apache-airflow-providers-ssh
注意

-c定義中的條件約束 URL requirements.txt。如此可確保 Amazon MWAA 為您的環境安裝正確的套件版本。

將您的密鑰複製到 Amazon S3

使用下列 AWS Command Line Interface 命令將.pem金鑰複製到 Amazon S3 中環境的dags目錄。

$ aws s3 cp your-secret-key.pem s3://your-bucket/dags/

Amazon MWAA 將中dags的內容 (包括.pem金鑰) 複製到本機/usr/local/airflow/dags/目錄,這樣一來,Apache 氣流可以存取金鑰。

創建一個新的 Apache 氣流連接

若要使用 Apache 氣流使用者介面建立新的 SSH 連線
  1. 在 Amazon MWAA 主控台上開啟「環境」頁面

  2. 從環境清單中,為您的環境選擇開啟 Airflow UI

  3. 在 Apache Airflow UI 頁面上,從頂端導覽列選擇 [管理員] 以展開下拉式清單,然後選擇 [連線]。

  4. 在 [列出連線] 頁面上,選擇 [+] 或 [新增記錄] 按鈕以新增連線。

  5. 在 [新增連線] 頁面上,新增下列資訊:

    1. 對於「連線 ID」,輸入ssh_new

    2. 在 [連線類型] 中,從下拉式清單中選擇 [SSH]。

      注意

      如果清單中沒有 SSH 連線類型,表示 Amazon MWAA 尚未安裝所需apache-airflow-providers-ssh的套件。請更新您的requirements.txt檔案以包含此套件,然後再試一次。

    3. 對於主機,請輸入要連接到的 Amazon EC2 執行個體的 IP 地址。例如 12.345.67.89

    4. 對於使用者名稱,請輸入是ec2-user否要連線到 Amazon EC2 執行個體。您的使用者名稱可能會有所不同,視您希望 Apache Airflow 連線的遠端執行個體類型而定。

    5. 在「額外」中,輸入以下 JSON 格式的機碼值組:

      { "key_file": "/usr/local/airflow/dags/your-secret-key.pem" }

      此機碼值配對會指示 Apache 氣流在本機/dags目錄中尋找秘密金鑰。

範例程式碼

下面的 DAG 使用連接SSHOperator到您的目標 Amazon EC2 實例,然後運行 hostname Linux 命令來打印安裝的名稱。您可以修改 DAG 以在遠端執行個體上執行任何命令或指令碼。

  1. 開啟終端機,然後瀏覽至儲存 DAG 程式碼的目錄。例如:

    cd dags
  2. 複製下列程式碼範例的內容,並在本機儲存為ssh.py

    from airflow.decorators import dag from datetime import datetime from airflow.providers.ssh.operators.ssh import SSHOperator @dag( dag_id="ssh_operator_example", schedule_interval=None, start_date=datetime(2022, 1, 1), catchup=False, ) def ssh_dag(): task_1=SSHOperator( task_id="ssh_task", ssh_conn_id='ssh_new', command='hostname', ) my_ssh_dag = ssh_dag()
  3. 執行下列 AWS CLI 命令,將 DAG 複製到您環境的儲存貯體,然後使用 Apache 氣流 UI 觸發 DAG。

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. 如果成功,您會在 ssh_operator_example DAG 中的工作記錄中看到類似下列的ssh_task輸出:

    [2022-01-01, 12:00:00 UTC] {{base.py:79}} INFO - Using connection to: id: ssh_new. Host: 12.345.67.89, Port: None,
    Schema: , Login: ec2-user, Password: None, extra: {'key_file': '/usr/local/airflow/dags/your-secret-key.pem'}
    [2022-01-01, 12:00:00 UTC] {{ssh.py:264}} WARNING - Remote Identification Change is not verified. This won't protect against Man-In-The-Middle attacks
    [2022-01-01, 12:00:00 UTC] {{ssh.py:270}} WARNING - No Host Key Verification. This won't protect against Man-In-The-Middle attacks
    [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Connected (version 2.0, client OpenSSH_7.4)
    [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Authentication (publickey) successful!
    [2022-01-01, 12:00:00 UTC] {{ssh.py:139}} INFO - Running command: hostname
    [2022-01-01, 12:00:00 UTC]{{ssh.py:171}} INFO - ip-123-45-67-89.us-west-2.compute.internal
    [2022-01-01, 12:00:00 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=ssh_operator_example, task_id=ssh_task, execution_date=20220712T200914, start_date=20220712T200915, end_date=20220712T200916