新しい Amazon MWAA 環境に移行する - Amazon Managed Workflows for Apache Airflow

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

新しい Amazon MWAA 環境に移行する

既存の Apache Airflow ワークロードを新しい Amazon MWAA 環境に移行するには、次のステップを確認します。これらのステップを使用して、古いバージョンの Amazon MWAA から新しいバージョンリリースに移行したり、セルフマネージド Apache Airflow デプロイを Amazon MWAA に移行したりできます。このチュートリアルでは、既存の Apache Airflow v1.10.12 から Apache Airflow v2.5.1 を実行している新しい Amazon MWAA に移行することを前提としていますが、同じ手順を使用して異なる Apache Airflow バージョンから移行したり、異なる Apache Airflow バージョンに移行したりできます。

前提条件

手順を完了して環境を移行するには、以下が必要です。

ステップ 1: サポートされている最新の Apache Airflow バージョンを実行する新しい Amazon MWAA 環境を作成する

Amazon MWAA ユーザーガイドの「Amazon Word の開始方法」の詳細な手順を使用するか、 AWS CloudFormation テンプレートを使用して環境を作成できます。 MWAA 既存の Amazon MWAA 環境から移行し、 AWS CloudFormation テンプレートを使用して古い環境を作成している場合は、 AirflowVersionプロパティを変更して新しいバージョンを指定できます。

MwaaEnvironment: Type: AWS::MWAA::Environment DependsOn: MwaaExecutionPolicy Properties: Name: !Sub "${AWS::StackName}-MwaaEnvironment" SourceBucketArn: !GetAtt EnvironmentBucket.Arn ExecutionRoleArn: !GetAtt MwaaExecutionRole.Arn AirflowVersion: 2.5.1 DagS3Path: dags NetworkConfiguration: SecurityGroupIds: - !GetAtt SecurityGroup.GroupId SubnetIds: - !Ref PrivateSubnet1 - !Ref PrivateSubnet2 WebserverAccessMode: PUBLIC_ONLY MaxWorkers: !Ref MaxWorkerNodes LoggingConfiguration: DagProcessingLogs: LogLevel: !Ref DagProcessingLogs Enabled: true SchedulerLogs: LogLevel: !Ref SchedulerLogsLevel Enabled: true TaskLogs: LogLevel: !Ref TaskLogsLevel Enabled: true WorkerLogs: LogLevel: !Ref WorkerLogsLevel Enabled: true WebserverLogs: LogLevel: !Ref WebserverLogsLevel Enabled: true

または、既存の Amazon MWAA 環境から移行する場合、AWS SDK for Python (Boto3) を使用して環境のクローンを作成する次の Python スクリプトをコピーすることもできます。スクリプトをダウンロードすることもできます。

# This Python file uses the following encoding: utf-8 ''' Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. SPDX-License-Identifier: MIT-0 Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ''' from __future__ import print_function import argparse import json import socket import time import re import sys from datetime import timedelta from datetime import datetime import boto3 from botocore.exceptions import ClientError, ProfileNotFound from boto3.session import Session ENV_NAME = "" REGION = "" def verify_boto3(boto3_current_version): ''' check if boto3 version is valid, must be 1.17.80 and up return true if all dependenceis are valid, false otherwise ''' valid_starting_version = '1.17.80' if boto3_current_version == valid_starting_version: return True ver1 = boto3_current_version.split('.') ver2 = valid_starting_version.split('.') for i in range(max(len(ver1), len(ver2))): num1 = int(ver1[i]) if i < len(ver1) else 0 num2 = int(ver2[i]) if i < len(ver2) else 0 if num1 > num2: return True elif num1 < num2: return False return False def get_account_id(env_info): ''' Given the environment metadata, fetch the account id from the environment ARN ''' return env_info['Arn'].split(":")[4] def validate_envname(env_name): ''' verify environment name doesn't have path to files or unexpected input ''' if re.match(r"^[a-zA-Z][0-9a-zA-Z-_]*$", env_name): return env_name raise argparse.ArgumentTypeError("%s is an invalid environment name value" % env_name) def validation_region(input_region): ''' verify environment name doesn't have path to files or unexpected input REGION: example is us-east-1 ''' session = Session() mwaa_regions = session.get_available_regions('mwaa') if input_region in mwaa_regions: return input_region raise argparse.ArgumentTypeError("%s is an invalid REGION value" % input_region) def validation_profile(profile_name): ''' verify profile name doesn't have path to files or unexpected input ''' if re.match(r"^[a-zA-Z0-9]*$", profile_name): return profile_name raise argparse.ArgumentTypeError("%s is an invalid profile name value" % profile_name) def validation_version(version_name): ''' verify profile name doesn't have path to files or unexpected input ''' if re.match(r"[1-2].\d.\d", version_name): return version_name raise argparse.ArgumentTypeError("%s is an invalid version name value" % version_name) def validation_execution_role(execution_role_arn): ''' verify profile name doesn't have path to files or unexpected input ''' if re.match(r'(?i)\b((?:[a-z][\w-]+:(?:/{1,3}|[a-z0-9%])|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:\'".,<>?«»“”‘’]))', execution_role_arn): return execution_role_arn raise argparse.ArgumentTypeError("%s is an invalid execution role ARN" % execution_role_arn) def create_new_env(env): ''' method to duplicate env ''' mwaa = boto3.client('mwaa', region_name=REGION) print('Source Environment') print(env) if (env['AirflowVersion']=="1.10.12") and (VERSION=="2.2.2"): if env['AirflowConfigurationOptions']['secrets.backend']=='airflow.contrib.secrets.aws_secrets_manager.SecretsManagerBackend': print('swapping',env['AirflowConfigurationOptions']['secrets.backend']) env['AirflowConfigurationOptions']['secrets.backend']='airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend' env['LoggingConfiguration']['DagProcessingLogs'].pop('CloudWatchLogGroupArn') env['LoggingConfiguration']['SchedulerLogs'].pop('CloudWatchLogGroupArn') env['LoggingConfiguration']['TaskLogs'].pop('CloudWatchLogGroupArn') env['LoggingConfiguration']['WebserverLogs'].pop('CloudWatchLogGroupArn') env['LoggingConfiguration']['WorkerLogs'].pop('CloudWatchLogGroupArn') env['AirflowVersion']=VERSION env['ExecutionRoleArn']=EXECUTION_ROLE_ARN env['Name']=ENV_NAME_NEW env.pop('Arn') env.pop('CreatedAt') env.pop('LastUpdate') env.pop('ServiceRoleArn') env.pop('Status') env.pop('WebserverUrl') if not env['Tags']: env.pop('Tags') print('Destination Environment') print(env) return mwaa.create_environment(**env) def get_mwaa_env(input_env_name): # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/mwaa.html#MWAA.Client.get_environment mwaa = boto3.client('mwaa', region_name=REGION) environment = mwaa.get_environment( Name=input_env_name )['Environment'] return environment def print_err_msg(c_err): '''short method to handle printing an error message if there is one''' print('Error Message: {}'.format(c_err.response['Error']['Message'])) print('Request ID: {}'.format(c_err.response['ResponseMetadata']['RequestId'])) print('Http code: {}'.format(c_err.response['ResponseMetadata']['HTTPStatusCode'])) # # Main # # Usage: # python3 clone_environment.py --envname MySourceEnv --envnamenew MyDestEnv --region us-west-2 --execution_role AmazonMWAA-MyDestEnv-ExecutionRole --version 2.2.2 # # based on https://github.com/awslabs/aws-support-tools/blob/master/MWAA/verify_env/verify_env.py # if __name__ == '__main__': if sys.version_info[0] < 3: print("python2 detected, please use python3. Will try to run anyway") if not verify_boto3(boto3.__version__): print("boto3 version ", boto3.__version__, "is not valid for this script. Need 1.17.80 or higher") print("please run pip install boto3 --upgrade --user") sys.exit(1) parser = argparse.ArgumentParser() parser.add_argument('--envname', type=validate_envname, required=True, help="name of the source MWAA environment") parser.add_argument('--region', type=validation_region, default=boto3.session.Session().region_name, required=False, help="region, Ex: us-east-1") parser.add_argument('--profile', type=validation_profile, default=None, required=False, help="AWS CLI profile, Ex: dev") parser.add_argument('--version', type=validation_version, default="2.2.2", required=False, help="Airflow destination version, Ex: 2.2.2") parser.add_argument('--execution_role', type=validation_execution_role, default=None, required=True, help="New environment execution role ARN, Ex: arn:aws:iam::112233445566:role/service-role/AmazonMWAA-MyEnvironment-ExecutionRole") parser.add_argument('--envnamenew', type=validate_envname, required=True, help="name of the destination MWAA environment") args, _ = parser.parse_known_args() ENV_NAME = args.envname REGION = args.region PROFILE = args.profile VERSION = args.version EXECUTION_ROLE_ARN = args.execution_role ENV_NAME_NEW = args.envnamenew try: print("PROFILE",PROFILE) if PROFILE: boto3.setup_default_session(profile_name=PROFILE) env = get_mwaa_env(ENV_NAME) response = create_new_env(env) print(response) except ClientError as client_error: if client_error.response['Error']['Code'] == 'LimitExceededException': print_err_msg(client_error) print('please retry the script') elif client_error.response['Error']['Code'] in ['AccessDeniedException', 'NotAuthorized']: print_err_msg(client_error) print('please verify permissions used have permissions documented in readme') elif client_error.response['Error']['Code'] == 'InternalFailure': print_err_msg(client_error) print('please retry the script') else: print_err_msg(client_error) except ProfileNotFound as profile_not_found: print('profile', PROFILE, 'does not exist, please doublecheck the profile name') except IndexError as error: print("Error:", error)

ステップ 2: ワークフローリソースの移行

Apache Airflow v2 はメジャーバージョンのリリースです。Apache Airflow v1 から移行する場合は、ワークフローリソースを準備し、DAGs、要件、プラグインに加えた変更を確認する必要があります。そのためには、Docker と Amazon MWAA ローカルランナーを使用して、ローカルオペレーティングシステムで Apache Airflow のブリッジバージョンを設定することをお勧めします。Amazon MWAA ローカルランナーは、Amazon CLI 環境をローカルでレプリケートするコマンドラインインターフェイス (MWAA) ユーティリティを提供します。

Apache Airflow のバージョンを変更するときは、必ず で正しい単語を参照--constraintしてくださいrequirements.txt。 URL

ワークフローリソースを移行するには
  1. aws-mwaa-local-runner リポジトリのフォークを作成し、Amazon MWAA ローカルランナーのコピーをクローンします。

  2. aws-mwaa-local-runner リポジトリのv1.10.15ブランチを確認します。Apache Airflow は、Apache Airflow v2 への移行を支援するブリッジリリースとして v1.10.15 をリリースしました。Amazon MWAA は v1.10.15 をサポートしていませんが、Amazon MWAA ローカルランナーを使用してリソースをテストできます。

  3. Amazon MWAA ローカルランナー CLI ツールを使用して Docker イメージを構築し、Apache Airflow をローカルで実行します。詳細については、README リポジトリのローカルランナー GitHub を参照してください。

  4. ローカルで実行されている Apache Airflow を使用して、Apache Airflow ドキュメンテーションウェブサイトの「1.10 から 2 へのアップグレード」で説明されている手順に従ってください。

    1. を更新するにはrequirements.txt「Amazon MWAA ユーザーガイド」の「Python 依存関係の管理」で推奨されるベストプラクティスに従ってください。

    2. カスタム演算子とセンサーを既存の Apache Airflow v1.10.12 環境のプラグインにバンドルしている場合は、DAG フォルダに移動します。Apache Airflow v2+ のモジュール管理のベストプラクティスについての詳細な情報については、Apache Airflow のドキュメンテーションウェブサイトの「モジュール管理」を参照してください。

  5. ワークフローリソースに必要な変更を加えたら、 aws-mwaa-local-runner リポジトリの v2.5.1 ブランチを確認し、更新されたワークフローの DAGs、要件、カスタムプラグインをローカルでテストします。別の Apache Airflow バージョンに移行する場合は、代わりにそのバージョンに適したローカルランナーブランチを使用できます。

  6. ワークフローリソースを正常にテストしたら、新しい Amazon DAGs 環境で設定した Amazon S3 バケットに MWAArequirements.txt、、プラグインをコピーします。 Amazon S3

ステップ 3: 既存の環境からメタデータをエクスポートする

dag、、 などの Apache Airflow メタデータテーブルはdag_tag、更新された DAG ファイルを環境の Amazon S3 バケットにコピーし、スケジューラがそれらを解析するときにdag_code自動的に入力されます。アクセス許可関連のテーブルも、IAM 実行ロールのアクセス許可に基づいて自動的に入力されます。移行する必要はありません。

DAG 履歴、、variableslot_pool、および必要に応じて、sla_missjob、、および xcomlogテーブルに関連するデータを移行できます。タスクインスタンスログは、airflow-{environment_name}ロググループの下の CloudWatch Logs に保存されます。古い実行のタスクインスタンスログを表示したい場合は、それらのログを新しい環境ロググループにコピーする必要があります。関連するコストを削減するために、数日分のログだけを移動することをおすすめします。

既存の Amazon MWAA 環境から移行する場合、メタデータデータベースに直接アクセスすることはできません。既存の Amazon DAG 環境から選択した Amazon S3 バケットにメタデータをエクスポートするには、MWAA を実行する必要があります。自己管理環境から移行する場合は、以下の手順を使用して Apache Airflow メタデータをエクスポートすることもできます。

データがエクスポートされたら、新しい環境で DAG を実行してデータをインポートできます。エクスポートおよびインポートプロセス中、他のすべての DAGs は一時停止されます。

既存の環境からメタデータをエクスポートするには
  1. を使用して Amazon S3 バケットを作成し AWS CLI 、エクスポートしたデータを保存します。UUIDregion をお客様の情報に置き換えます。

    $ aws s3api create-bucket \ --bucket mwaa-migration-{UUID}\ --region {region}
    注記

    変数に保存する接続などの機密データを移行する場合は、Amazon S3 バケットのデフォルト暗号化を有効にすることをお勧めします。

  2. 注記

    自己管理型の環境からの移行には適用されません。

    既存の環境の実行ロールを変更し、次のポリシーを追加して、ステップ 1 で作成したバケットへの書き込みアクセスを許可します。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:PutObject*" ], "Resource": [ "arn:aws:s3:::mwaa-migration-{UUID}/*" ] } ] }
  3. amazon-mwaa-examples リポジトリのクローンを作成し、移行シナリオの metadata-migration サブディレクトリに移動します。

    $ git clone https://github.com/aws-samples/amazon-mwaa-examples.git $ cd amazon-mwaa-examples/usecases/metadata-migration/existing-version-new-version/
  4. export_data.py で、S3_BUCKET の文字列の値を、エクスポートしたメタデータを保存するために作成した Amazon S3 バケットに置き換えます。

    S3_BUCKET = 'mwaa-migration-{UUID}'
  5. requirements.txt ファイルを metadata-migration ディレクトリに配置してください。既存の環境の要件ファイルが既にある場合は、requirements.txt で指定されている追加の要件をファイルに追加してください。既存の要件ファイルがない場合は、metadata-migration ディレクトリにあるものを使用してください。

  6. 既存の環境に関連付けられている Amazon S3 バケットの DAG ディレクトリexport_data.pyにコピーします。自己管理環境から移行する場合は、/dags フォルダに export_data.py をコピーします。

  7. 更新した requirements.txt を、既存の環境に関連付けられている Amazon S3 バケットにコピーし、環境を編集して新しい requirements.txt バージョンを指定します。

  8. 環境が更新されたら、Apache Airflow UI にアクセスし、DAG db_export の一時停止を解除し、ワークフローの実行をトリガーします。

  9. メタデータが mwaa-migration-{UUID} Amazon S3 バケットの data/migration/existing-version_to_new-version/export/ にエクスポートされ、各テーブルが専用のファイルにあることを確認します。

ステップ 4: メタデータを新しい環境にインポートする

メタデータを新しい環境にインポートするには
  1. import_data.py で、以下の文字列値を自分の情報に置き換えてください。

    • 既存の Amazon MWAA 環境から移行する場合:

      S3_BUCKET = 'mwaa-migration-{UUID}' OLD_ENV_NAME='{old_environment_name}' NEW_ENV_NAME='{new_environment_name}' TI_LOG_MAX_DAYS = {number_of_days}

      MAX_DAYS は、ワークフローが新しい環境にコピーするログファイルの日数を制御します。

    • セルフマネージド環境からの移行の場合:

      S3_BUCKET = 'mwaa-migration-{UUID}' NEW_ENV_NAME='{new_environment_name}'
  2. (オプション) import_data.py は、失敗したタスクログのみをコピーします。すべてのタスクログをコピーする場合は、以下のコードスニペットに示すように、getDagTasks 関数を変更して ti.state = 'failed' を削除してください。

    def getDagTasks(): session = settings.Session() dagTasks = session.execute(f"select distinct ti.dag_id, ti.task_id, date(r.execution_date) as ed \ from task_instance ti, dag_run r where r.execution_date > current_date - {TI_LOG_MAX_DAYS} and \ ti.dag_id=r.dag_id and ti.run_id = r.run_id order by ti.dag_id, date(r.execution_date);").fetchall() return dagTasks
  3. 新しい環境の実行ロールを変更し、次のポリシーを追加します。アクセス許可ポリシーにより、Amazon MWAA は Apache Airflow メタデータをエクスポートした Amazon S3 バケットから読み取り、既存のロググループからタスクインスタンスログをコピーできます。すべてのプレースホルダーを自分の情報に置き換えます。

    注記

    セルフマネージド環境から移行する場合は、ポリシーから CloudWatch Logs 関連のアクセス許可を削除する必要があります。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "logs:GetLogEvents", "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:{region}:{account_number}:log-group:airflow-{old_environment_name}*" ] }, { "Effect": "Allow", "Action": [ "s3:GetObject", "s3:ListBucket" ], "Resource": [ "arn:aws:s3:::mwaa-migration-{UUID}", "arn:aws:s3:::mwaa-migration-{UUID}/*" ] } ] }
  4. 新しい環境に関連付けられた Amazon S3 バケットの DAG ディレクトリimport_data.pyにコピーし、Apache Airflow UI にアクセスして DAG db_import の一時停止を解除し、ワークフローをトリガーします。新しい DAG は数分で Apache Airflow UI に表示されます。

  5. DAG の実行が完了したら、個々の DAG にアクセスして、DAG の実行履歴がコピーされていることを確認します。

次のステップ

  • Apache Airflow モデル (Apache Airflow ドキュメント) — Apache Airflow メタデータデータベースモデルについて詳しく学んでください。