本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
使用建立 SSH 連線 SSHOperator
以下範例說明如何使用定向非循環圖 (DAG) SSHOperator
中的,從適用於 Apache 氣流環境的 Amazon 受管工作流程連接到遠端 Amazon EC2 執行個體。您可以使用類似的方法連接到任何具有 SSH 訪問權限的遠程實例。
在下列範例中,您將 SSH 秘密金鑰 (.pem
) 上傳到 Amazon S3 上環境的dags
目錄。然後,您可以使用安裝必要的依賴關係,requirements.txt
並在 UI 中創建一個新的 Apache 氣流連接。最後,您撰寫的 DAG 會建立與遠端執行個體的 SSH 連線。
版本
-
您可以使用此頁面上的代碼示例與 Apache 氣流 v2 及更高版本在 Python 3.10
中。
必要條件
若要使用此頁面上的範例程式碼,您需要下列項目:
-
一個安全殼層密鑰。程式碼範例假設您有一個 Amazon EC2 執行個體,以及與 Amazon MWAA 環境
.pem
位於相同區域中的執行個體。如果您沒有金鑰,請參閱 Amazon EC2 使用者指南中的建立或匯入 key pair。
許可
-
使用此頁面上的程式碼範例不需要其他權限。
要求
添加以下參數requirements.txt
以在 Web 服務器上安裝apache-airflow-providers-ssh
軟件包。一旦您的環境更新並且 Amazon MWAA 成功安裝相依性,您就會在使用者介面中看到新的 SSH 連線類型。
-c https://raw.githubusercontent.com/apache/airflow/constraints-
Airflow-version
/constraints-Python-version
.txt apache-airflow-providers-ssh
注意
-c
定義中的條件約束 URL requirements.txt
。如此可確保 Amazon MWAA 為您的環境安裝正確的套件版本。
將您的密鑰複製到 Amazon S3
使用下列 AWS Command Line Interface 命令將.pem
金鑰複製到 Amazon S3 中環境的dags
目錄。
$
aws s3 cp
your-secret-key
.pem s3://your-bucket
/dags/
Amazon MWAA 將中dags
的內容 (包括.pem
金鑰) 複製到本機/usr/local/airflow/dags/
目錄,這樣一來,Apache 氣流可以存取金鑰。
創建一個新的 Apache 氣流連接
若要使用 Apache 氣流使用者介面建立新的 SSH 連線
-
在 Amazon MWAA 主控台上開啟「環境」頁面
。 -
從環境清單中,為您的環境選擇開啟 Airflow UI。
-
在 Apache Airflow UI 頁面上,從頂端導覽列選擇 [管理員] 以展開下拉式清單,然後選擇 [連線]。
-
在 [列出連線] 頁面上,選擇 [+] 或 [新增記錄] 按鈕以新增連線。
-
在 [新增連線] 頁面上,新增下列資訊:
-
對於「連線 ID」,輸入
ssh_new
。 -
在 [連線類型] 中,從下拉式清單中選擇 [SSH]。
注意
如果清單中沒有 SSH 連線類型,表示 Amazon MWAA 尚未安裝所需
apache-airflow-providers-ssh
的套件。請更新您的requirements.txt
檔案以包含此套件,然後再試一次。 -
對於主機,請輸入要連接到的 Amazon EC2 執行個體的 IP 地址。例如
12.345.67.89
。 -
對於使用者名稱,請輸入是
ec2-user
否要連線到 Amazon EC2 執行個體。您的使用者名稱可能會有所不同,視您希望 Apache Airflow 連線的遠端執行個體類型而定。 -
在「額外」中,輸入以下 JSON 格式的機碼值組:
{ "key_file": "/usr/local/airflow/dags/
your-secret-key
.pem" }此機碼值配對會指示 Apache 氣流在本機
/dags
目錄中尋找秘密金鑰。
-
範例程式碼
下面的 DAG 使用連接SSHOperator
到您的目標 Amazon EC2 實例,然後運行 hostname
Linux 命令來打印安裝的名稱。您可以修改 DAG 以在遠端執行個體上執行任何命令或指令碼。
-
開啟終端機,然後瀏覽至儲存 DAG 程式碼的目錄。例如:
cd dags
-
複製下列程式碼範例的內容,並在本機儲存為
ssh.py
。from airflow.decorators import dag from datetime import datetime from airflow.providers.ssh.operators.ssh import SSHOperator @dag( dag_id="ssh_operator_example", schedule_interval=None, start_date=datetime(2022, 1, 1), catchup=False, ) def ssh_dag(): task_1=SSHOperator( task_id="ssh_task", ssh_conn_id='ssh_new', command='hostname', ) my_ssh_dag = ssh_dag()
-
執行下列 AWS CLI 命令,將 DAG 複製到您環境的儲存貯體,然後使用 Apache 氣流 UI 觸發 DAG。
$
aws s3 cp
your-dag
.py s3://your-environment-bucket
/dags/ -
如果成功,您會在
ssh_operator_example
DAG 中的工作記錄中看到類似下列的ssh_task
輸出:[2022-01-01, 12:00:00 UTC] {{base.py:79}} INFO - Using connection to: id: ssh_new. Host: 12.345.67.89, Port: None, Schema: , Login: ec2-user, Password: None, extra: {'key_file': '/usr/local/airflow/dags/
your-secret-key
.pem'} [2022-01-01, 12:00:00 UTC] {{ssh.py:264}} WARNING - Remote Identification Change is not verified. This won't protect against Man-In-The-Middle attacks [2022-01-01, 12:00:00 UTC] {{ssh.py:270}} WARNING - No Host Key Verification. This won't protect against Man-In-The-Middle attacks [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Connected (version 2.0, client OpenSSH_7.4) [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Authentication (publickey) successful! [2022-01-01, 12:00:00 UTC] {{ssh.py:139}} INFO - Running command: hostname [2022-01-01, 12:00:00 UTC]{{ssh.py:171}} INFO - ip-123-45-67-89.us-west-2.compute.internal [2022-01-01, 12:00:00 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=ssh_operator_example, task_id=ssh_task, execution_date=20220712T200914, start_date=20220712T200915, end_date=20220712T200916