Criando uma SSH conexão usando o SSHOperator - Amazon Managed Workflows for Apache Airflow

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Criando uma SSH conexão usando o SSHOperator

O exemplo a seguir descreve como você pode usar o SSHOperator em um gráfico acíclico direcionado (DAG) para se conectar a uma EC2 instância remota da Amazon a partir do seu ambiente Amazon Managed Workflows for Apache Airflow. Você pode usar uma abordagem semelhante para se conectar a qualquer instância remota com SSH acesso.

No exemplo a seguir, você carrega uma chave SSH secreta (.pem) para o dags diretório do seu ambiente no Amazon S3. Em seguida, você instala as dependências necessárias usando requirements.txt e cria uma nova conexão do Apache Airflow na interface do usuário. Por fim, você escreve um DAG que cria uma SSH conexão com a instância remota.

Version (Versão)

Pré-requisitos

Para usar o código de amostra nesta página, você precisará do seguinte:

  • Um MWAAambiente da Amazon.

  • Uma chave SSH secreta. A amostra de código pressupõe que você tenha uma EC2 instância da Amazon e uma .pem na mesma região do seu MWAA ambiente da Amazon. Se você não tiver uma chave, consulte Criar ou importar um par de chaves no Guia do EC2 usuário da Amazon.

Permissões

  • Nenhuma permissão adicional é necessária para usar o exemplo de código nesta página.

Requisitos

Adicione o parâmetro a seguir para requirements.txt para instalar o pacote apache-airflow-providers-ssh no servidor web. Depois que seu ambiente for atualizado e a Amazon instalar a dependência MWAA com sucesso, você verá um novo tipo de SSHconexão na interface do usuário.

-c https://raw.githubusercontent.com/apache/airflow/constraints-Airflow-version/constraints-Python-version.txt apache-airflow-providers-ssh
nota

-cdefine as restrições em. URL requirements.txt Isso garante que a Amazon MWAA instale a versão correta do pacote para seu ambiente.

Copie sua chave secreta para o Amazon S3

Use o AWS Command Line Interface comando a seguir para copiar sua .pem chave para o dags diretório do seu ambiente no Amazon S3.

$ aws s3 cp your-secret-key.pem s3://your-bucket/dags/

A Amazon MWAA copia o conteúdodags, incluindo a .pem chave, para o /usr/local/airflow/dags/ diretório local. Ao fazer isso, o Apache Airflow pode acessar a chave.

Crie uma nova conexão com o Apache Airflow

Para criar uma nova SSH conexão usando a interface do usuário do Apache Airflow
  1. Abra a página Ambientes no MWAA console da Amazon.

  2. Na lista de ambientes, escolha Abrir a IU do Airflow em seu ambiente.

  3. Na página IU do Apache Airflow, selecione Admin na barra de navegação superior para expandir a lista suspensa e selecione Conexões.

  4. Na página Listar conexões, escolha o botão + ou Adicionar um novo registro para adicionar uma nova conexão.

  5. Na página Adicionar conexão, forneça as seguintes informações:

    1. Em Id da conexão, insira ssh_new.

    2. Para Tipo de conexão, escolha na SSHlista suspensa.

      nota

      Se o tipo de SSHconexão não estiver disponível na lista, a Amazon MWAA não instalou o apache-airflow-providers-ssh pacote necessário. Atualize seu arquivo requirements.txt para incluir esse pacote e tente novamente.

    3. Em Host, insira o endereço IP da EC2 instância da Amazon à qual você deseja se conectar. Por exemplo, 12.345.67.89.

    4. Em Nome de usuário, insira ec2-user se você está se conectando a uma EC2 instância da Amazon. Seu nome de usuário pode ser diferente, dependendo do tipo de instância remota à qual você deseja que o Apache Airflow se conecte.

    5. Para Extra, insira o seguinte par de valores-chave no formato: JSON

      { "key_file": "/usr/local/airflow/dags/your-secret-key.pem" }

      Esse par de chave-valor instrui o Apache Airflow a procurar a chave secreta no diretório local /dags.

Exemplo de código

O seguinte DAG usa o SSHOperator para se conectar à sua EC2 instância de destino da Amazon e, em seguida, executa o comando hostname Linux para imprimir o nome da instância. Você pode modificar o DAG para executar qualquer comando ou script na instância remota.

  1. Abra um terminal e navegue até o diretório em que seu DAG código está armazenado. Por exemplo:

    cd dags
  2. Copie o conteúdo da amostra de código a seguir e salve localmente como ssh.py.

    from airflow.decorators import dag from datetime import datetime from airflow.providers.ssh.operators.ssh import SSHOperator @dag( dag_id="ssh_operator_example", schedule_interval=None, start_date=datetime(2022, 1, 1), catchup=False, ) def ssh_dag(): task_1=SSHOperator( task_id="ssh_task", ssh_conn_id='ssh_new', command='hostname', ) my_ssh_dag = ssh_dag()
  3. Execute o AWS CLI comando a seguir para copiar o DAG para o bucket do seu ambiente e, em seguida, acione-o DAG usando a interface do usuário do Apache Airflow.

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. Se for bem-sucedido, você verá uma saída semelhante à seguinte nos registros de ssh_task tarefas do ssh_operator_exampleDAG:

    [2022-01-01, 12:00:00 UTC] {{base.py:79}} INFO - Using connection to: id: ssh_new. Host: 12.345.67.89, Port: None,
    Schema: , Login: ec2-user, Password: None, extra: {'key_file': '/usr/local/airflow/dags/your-secret-key.pem'}
    [2022-01-01, 12:00:00 UTC] {{ssh.py:264}} WARNING - Remote Identification Change is not verified. This won't protect against Man-In-The-Middle attacks
    [2022-01-01, 12:00:00 UTC] {{ssh.py:270}} WARNING - No Host Key Verification. This won't protect against Man-In-The-Middle attacks
    [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Connected (version 2.0, client OpenSSH_7.4)
    [2022-01-01, 12:00:00 UTC] {{transport.py:1819}} INFO - Authentication (publickey) successful!
    [2022-01-01, 12:00:00 UTC] {{ssh.py:139}} INFO - Running command: hostname
    [2022-01-01, 12:00:00 UTC]{{ssh.py:171}} INFO - ip-123-45-67-89.us-west-2.compute.internal
    [2022-01-01, 12:00:00 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=ssh_operator_example, task_id=ssh_task, execution_date=20220712T200914, start_date=20220712T200915, end_date=20220712T200916