Apache Hive 및 Hadoop을 사용하여 사용자 지정 플러그인 생성 - Amazon Managed Workflows for Apache Airflow

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Apache Hive 및 Hadoop을 사용하여 사용자 지정 플러그인 생성

Amazon MWAA는 plugins.zip의 콘텐츠를 /usr/local/airflow/plugins로 추출합니다. 이를 사용하여 컨테이너에 바이너리를 추가할 수 있습니다. 또한 Apache Airflow는 스타트업plugins 폴더에 있는 Python 파일의 콘텐츠를 실행하므로 환경 변수를 설정하고 수정할 수 있습니다. 다음 샘플은 Amazon Managed Workflows for Apache Airflow 환경에서 Apache Hive 및 Hadoop을 사용하여 사용자 지정 플러그인을 생성하고 다른 사용자 지정 플러그인 및 바이너리와 결합할 수 있는 단계를 안내합니다.

버전

  • 이 페이지의 샘플 코드는 Python 3.7Apache Airflow v1과 함께 사용할 수 있습니다.

  • 이 페이지의 코드 예제는 Python 3.10Apache Airflow v2에서 사용할 수 있습니다.

사전 조건

이 페이지의 이 샘플 코드를 사용하려면 다음 항목이 필요합니다.

권한

  • 이 페이지의 코드 예제를 사용하는 데 추가 권한이 필요하지 않습니다.

요구 사항

이 페이지의 샘플 코드를 사용하려면 다음 종속성을 사용자 requirements.txt에 추가합니다. 자세한 내용은 Python 종속성 설치 섹션을 참조하십시오.

Apache Airflow v2
-c https://raw.githubusercontent.com/apache/airflow/constraints-2.0.2/constraints-3.7.txt apache-airflow-providers-amazon[apache.hive]
Apache Airflow v1
apache-airflow[hive]==1.10.12

다운로드 종속성

Amazon MWAA는 plugins.zip 콘텐츠를 각 Amazon MWAA 스케줄러 및 작업자 컨테이너에 있는 /usr/local/airflow/plugins로 추출합니다. 이는 환경에 바이너리를 추가하는 데 사용됩니다. 다음 단계에서는 사용자 지정 플러그인에 필요한 파일을 조합하는 방법을 설명합니다.

  1. 명령 프롬프트에서 플러그인을 만들려는 디렉터리로 이동합니다. 예:

    cd plugins
  2. 미러에서 Hadoop을 다운로드합니다. 예를 들면 다음과 같습니다.

    wget https://downloads.apache.org/hadoop/common/hadoop-3.3.0/hadoop-3.3.0.tar.gz
  3. 미러에서 Hive를 다운로드합니다. 예를 들면 다음과 같습니다.

    wget https://downloads.apache.org/hive/hive-3.1.2/apache-hive-3.1.2-bin.tar.gz
  4. 디렉터리를 생성합니다. 예:

    mkdir hive_plugin
  5. Hadoop을 추출합니다.

    tar -xvzf hadoop-3.3.0.tar.gz -C hive_plugin
  6. Hive를 추출합니다.

    tar -xvzf apache-hive-3.1.2-bin.tar.gz -C hive_plugin

사용자 지정 플러그인

Apache Airflow는 스타트업 시 플러그인 폴더에 있는 Python 파일의 콘텐츠를 실행합니다. 이는 환경 변수를 설정하고 수정하는 데 사용됩니다. 다음 단계에서는 사용자 지정 플러그인의 샘플 코드를 설명합니다.

  1. 명령 프롬프트에서 hive_plugin 디렉터리로 이동합니다. 예:

    cd hive_plugin
  2. 다음 코드 샘플의 콘테츠를 복사하여 로컬의 hive_plugin 디렉터리에 hive_plugin.py로 저장합니다.

    from airflow.plugins_manager import AirflowPlugin import os os.environ["JAVA_HOME"]="/usr/lib/jvm/jre" os.environ["HADOOP_HOME"]='/usr/local/airflow/plugins/hadoop-3.3.0' os.environ["HADOOP_CONF_DIR"]='/usr/local/airflow/plugins/hadoop-3.3.0/etc/hadoop' os.environ["HIVE_HOME"]='/usr/local/airflow/plugins/apache-hive-3.1.2-bin' os.environ["PATH"] = os.getenv("PATH") + ":/usr/local/airflow/plugins/hadoop-3.3.0:/usr/local/airflow/plugins/apache-hive-3.1.2-bin/bin:/usr/local/airflow/plugins/apache-hive-3.1.2-bin/lib" os.environ["CLASSPATH"] = os.getenv("CLASSPATH") + ":/usr/local/airflow/plugins/apache-hive-3.1.2-bin/lib" class EnvVarPlugin(AirflowPlugin): name = 'hive_plugin'
  3. 다음 텍스트의 콘텐츠를 복사하여 로컬의 hive_plugin 디렉터리에 .airflowignore로 저장합니다.

    hadoop-3.3.0 apache-hive-3.1.2-bin

Plugins.zip

다음 단계에서는 plugins.zip을 생성하는 방법을 보여줍니다. 이 예제의 내용은 다른 플러그인 및 바이너리와 결합하여 단일 plugins.zip 파일로 만들 수 있습니다.

  1. 명령 프롬프트에서 이전 단계의 hive_plugin 디렉터리로 이동합니다. 예:

    cd hive_plugin
  2. plugins 폴더 내 콘텐츠를 압축합니다.

    zip -r ../hive_plugin.zip ./

코드 샘플

다음 단계에서는 사용자 지정 플러그인을 테스트할 DAG 코드를 생성하는 방법을 설명합니다.

  1. 명령 프롬프트에서 DAG 코드가 저장된 디렉터리로 이동합니다. 예:

    cd dags
  2. 다음 코드 샘플의 내용을 복사하고 로컬에서 hive.py로 저장합니다.

    from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago with DAG(dag_id="hive_test_dag", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: hive_test = BashOperator( task_id="hive_test", bash_command='hive --help' )

Airflow 구성 옵션

Apache Airflow v2를 사용하는 경우 Apache Airflow 구성 옵션으로 core.lazy_load_plugins : False을 추가합니다. 자세한 내용은 2에서 구성 옵션을 사용하여 플러그인 로드를 참조하십시오.

다음 단계