Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Migrer vers un nouvel MWAA environnement Amazon
Découvrez les étapes suivantes pour migrer votre charge de travail Apache Airflow existante vers un nouvel MWAA environnement Amazon. Vous pouvez suivre ces étapes pour migrer d'une ancienne version d'Amazon MWAA vers une nouvelle version, ou pour migrer votre déploiement Apache Airflow autogéré vers Amazon. MWAA Ce didacticiel part du principe que vous migrez d'un Apache Airflow v1.10.12 existant vers un nouvel Amazon MWAA exécutant Apache Airflow v2.5.1, mais vous pouvez utiliser les mêmes procédures pour migrer depuis ou vers différentes versions d'Apache Airflow.
Rubriques
- Prérequis
- Étape 1 : créer un nouvel MWAA environnement Amazon exécutant la dernière version prise en charge d'Apache Airflow
- Deuxième étape : migrer les ressources de votre flux de travail
- Troisième étape : Exporter les métadonnées de votre environnement existant
- Étape 4 : Importation des métadonnées dans votre nouvel environnement
- Étapes suivantes
- Ressources connexes
Prérequis
Pour effectuer les étapes et migrer votre environnement, vous aurez besoin des éléments suivants :
-
Un déploiement d'Apache Airflow. Il peut s'agir d'un MWAA environnement Amazon autogéré ou existant.
-
Docker est installé sur
votre système d'exploitation local. -
AWS Command Line Interface version 2 installée.
Étape 1 : créer un nouvel MWAA environnement Amazon exécutant la dernière version prise en charge d'Apache Airflow
Vous pouvez créer un environnement en suivant les étapes détaillées de la section Getting started with Amazon MWAA dans le guide de MWAA l'utilisateur Amazon, ou en utilisant un AWS CloudFormation modèle. Si vous migrez depuis un MWAA environnement Amazon existant et que vous avez utilisé un AWS CloudFormation modèle pour créer votre ancien environnement, vous pouvez modifier la AirflowVersion
propriété pour spécifier la nouvelle version.
MwaaEnvironment: Type: AWS::MWAA::Environment DependsOn: MwaaExecutionPolicy Properties: Name: !Sub "${AWS::StackName}-MwaaEnvironment" SourceBucketArn: !GetAtt EnvironmentBucket.Arn ExecutionRoleArn: !GetAtt MwaaExecutionRole.Arn AirflowVersion:
2.5.1
DagS3Path: dags NetworkConfiguration: SecurityGroupIds: - !GetAtt SecurityGroup.GroupId SubnetIds: - !Ref PrivateSubnet1 - !Ref PrivateSubnet2 WebserverAccessMode: PUBLIC_ONLY MaxWorkers: !Ref MaxWorkerNodes LoggingConfiguration: DagProcessingLogs: LogLevel: !Ref DagProcessingLogs Enabled: true SchedulerLogs: LogLevel: !Ref SchedulerLogsLevel Enabled: true TaskLogs: LogLevel: !Ref TaskLogsLevel Enabled: true WorkerLogs: LogLevel: !Ref WorkerLogsLevel Enabled: true WebserverLogs: LogLevel: !Ref WebserverLogsLevel Enabled: true
Si vous migrez depuis un MWAA environnement Amazon existant, vous pouvez également copier le script Python suivant qui utilise le script AWS SDKfor Python (Boto3)
# This Python file uses the following encoding: utf-8 ''' Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. SPDX-License-Identifier: MIT-0 Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ''' from __future__ import print_function import argparse import json import socket import time import re import sys from datetime import timedelta from datetime import datetime import boto3 from botocore.exceptions import ClientError, ProfileNotFound from boto3.session import Session ENV_NAME = "" REGION = "" def verify_boto3(boto3_current_version): ''' check if boto3 version is valid, must be 1.17.80 and up return true if all dependenceis are valid, false otherwise ''' valid_starting_version = '1.17.80' if boto3_current_version == valid_starting_version: return True ver1 = boto3_current_version.split('.') ver2 = valid_starting_version.split('.') for i in range(max(len(ver1), len(ver2))): num1 = int(ver1[i]) if i < len(ver1) else 0 num2 = int(ver2[i]) if i < len(ver2) else 0 if num1 > num2: return True elif num1 < num2: return False return False def get_account_id(env_info): ''' Given the environment metadata, fetch the account id from the environment ARN ''' return env_info['Arn'].split(":")[4] def validate_envname(env_name): ''' verify environment name doesn't have path to files or unexpected input ''' if re.match(r"^[a-zA-Z][0-9a-zA-Z-_]*$", env_name): return env_name raise argparse.ArgumentTypeError("%s is an invalid environment name value" % env_name) def validation_region(input_region): ''' verify environment name doesn't have path to files or unexpected input REGION: example is us-east-1 ''' session = Session() mwaa_regions = session.get_available_regions('mwaa') if input_region in mwaa_regions: return input_region raise argparse.ArgumentTypeError("%s is an invalid REGION value" % input_region) def validation_profile(profile_name): ''' verify profile name doesn't have path to files or unexpected input ''' if re.match(r"^[a-zA-Z0-9]*$", profile_name): return profile_name raise argparse.ArgumentTypeError("%s is an invalid profile name value" % profile_name) def validation_version(version_name): ''' verify profile name doesn't have path to files or unexpected input ''' if re.match(r"[1-2].\d.\d", version_name): return version_name raise argparse.ArgumentTypeError("%s is an invalid version name value" % version_name) def validation_execution_role(execution_role_arn): ''' verify profile name doesn't have path to files or unexpected input ''' if re.match(r'(?i)\b((?:[a-z][\w-]+:(?:/{1,3}|[a-z0-9%])|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:\'".,<>?«»“”‘’]))', execution_role_arn): return execution_role_arn raise argparse.ArgumentTypeError("%s is an invalid execution role ARN" % execution_role_arn) def create_new_env(env): ''' method to duplicate env ''' mwaa = boto3.client('mwaa', region_name=REGION) print('Source Environment') print(env) if (env['AirflowVersion']=="1.10.12") and (VERSION=="2.2.2"): if env['AirflowConfigurationOptions']['secrets.backend']=='airflow.contrib.secrets.aws_secrets_manager.SecretsManagerBackend': print('swapping',env['AirflowConfigurationOptions']['secrets.backend']) env['AirflowConfigurationOptions']['secrets.backend']='airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend' env['LoggingConfiguration']['DagProcessingLogs'].pop('CloudWatchLogGroupArn') env['LoggingConfiguration']['SchedulerLogs'].pop('CloudWatchLogGroupArn') env['LoggingConfiguration']['TaskLogs'].pop('CloudWatchLogGroupArn') env['LoggingConfiguration']['WebserverLogs'].pop('CloudWatchLogGroupArn') env['LoggingConfiguration']['WorkerLogs'].pop('CloudWatchLogGroupArn') env['AirflowVersion']=VERSION env['ExecutionRoleArn']=EXECUTION_ROLE_ARN env['Name']=ENV_NAME_NEW env.pop('Arn') env.pop('CreatedAt') env.pop('LastUpdate') env.pop('ServiceRoleArn') env.pop('Status') env.pop('WebserverUrl') if not env['Tags']: env.pop('Tags') print('Destination Environment') print(env) return mwaa.create_environment(**env) def get_mwaa_env(input_env_name): # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/mwaa.html#MWAA.Client.get_environment mwaa = boto3.client('mwaa', region_name=REGION) environment = mwaa.get_environment( Name=input_env_name )['Environment'] return environment def print_err_msg(c_err): '''short method to handle printing an error message if there is one''' print('Error Message: {}'.format(c_err.response['Error']['Message'])) print('Request ID: {}'.format(c_err.response['ResponseMetadata']['RequestId'])) print('Http code: {}'.format(c_err.response['ResponseMetadata']['HTTPStatusCode'])) # # Main # # Usage: # python3 clone_environment.py --envname MySourceEnv --envnamenew MyDestEnv --region us-west-2 --execution_role AmazonMWAA-MyDestEnv-ExecutionRole --version 2.2.2 # # based on https://github.com/awslabs/aws-support-tools/blob/master/MWAA/verify_env/verify_env.py # if __name__ == '__main__': if sys.version_info[0] < 3: print("python2 detected, please use python3. Will try to run anyway") if not verify_boto3(boto3.__version__): print("boto3 version ", boto3.__version__, "is not valid for this script. Need 1.17.80 or higher") print("please run pip install boto3 --upgrade --user") sys.exit(1) parser = argparse.ArgumentParser() parser.add_argument('--envname', type=validate_envname, required=True, help="name of the source MWAA environment") parser.add_argument('--region', type=validation_region, default=boto3.session.Session().region_name, required=False, help="region, Ex: us-east-1") parser.add_argument('--profile', type=validation_profile, default=None, required=False, help="AWS CLI profile, Ex: dev") parser.add_argument('--version', type=validation_version, default="2.2.2", required=False, help="Airflow destination version, Ex: 2.2.2") parser.add_argument('--execution_role', type=validation_execution_role, default=None, required=True, help="New environment execution role ARN, Ex: arn:aws:iam::112233445566:role/service-role/AmazonMWAA-MyEnvironment-ExecutionRole") parser.add_argument('--envnamenew', type=validate_envname, required=True, help="name of the destination MWAA environment") args, _ = parser.parse_known_args() ENV_NAME = args.envname REGION = args.region PROFILE = args.profile VERSION = args.version EXECUTION_ROLE_ARN = args.execution_role ENV_NAME_NEW = args.envnamenew try: print("PROFILE",PROFILE) if PROFILE: boto3.setup_default_session(profile_name=PROFILE) env = get_mwaa_env(ENV_NAME) response = create_new_env(env) print(response) except ClientError as client_error: if client_error.response['Error']['Code'] == 'LimitExceededException': print_err_msg(client_error) print('please retry the script') elif client_error.response['Error']['Code'] in ['AccessDeniedException', 'NotAuthorized']: print_err_msg(client_error) print('please verify permissions used have permissions documented in readme') elif client_error.response['Error']['Code'] == 'InternalFailure': print_err_msg(client_error) print('please retry the script') else: print_err_msg(client_error) except ProfileNotFound as profile_not_found: print('profile', PROFILE, 'does not exist, please doublecheck the profile name') except IndexError as error: print("Error:", error)
Deuxième étape : migrer les ressources de votre flux de travail
Apache Airflow v2 est une version majeure. Si vous migrez depuis Apache Airflow v1, vous devez préparer les ressources de votre flux de travail et vérifier les modifications que vous apportez à vos DAGs exigences et à vos plugins. Pour ce faire, nous vous recommandons de configurer une version bridge d'Apache Airflow sur votre système d'exploitation local à l'aide de Docker et d'Amazon MWAA Local Runner
Chaque fois que vous modifiez les versions d'Apache Airflow, assurez-vous de faire référence à la bonne version --constraint
URL dans votrerequirements.txt
.
Pour migrer les ressources de votre flux de travail
-
Créez un fork du aws-mwaa-local-runner
référentiel et clonez une copie de l'Amazon MWAA local Runner. -
Vérifiez la
v1.10.15
branche du dépôt aws-mwaa-local -runner. Apache Airflow a publié la version v1.10.15 en tant que version intermédiaire pour faciliter la migration vers Apache Airflow v2. Bien qu'Amazon MWAA ne prenne pas en charge la version v1.10.15, vous pouvez utiliser le programme d'exécution local Amazon MWAA pour tester vos ressources. -
Utilisez l'CLIoutil Amazon MWAA local Runner pour créer l'image Docker et exécuter Apache Airflow localement. Pour plus d'informations, consultez le runner local README
dans le GitHub référentiel. -
Si Apache Airflow est exécuté localement, suivez les étapes décrites dans la section Mise à niveau de la version 1.10 vers la version 2 sur
le site Web de documentation d'Apache Airflow. -
Pour mettre à jour votre
requirements.txt
compte, suivez les meilleures pratiques que nous recommandons dans la section Gestion des dépendances Python, dans le guide de MWAA l'utilisateur Amazon. -
Si vous avez intégré vos opérateurs et capteurs personnalisés à vos plug-ins pour votre environnement Apache Airflow v1.10.12 existant, déplacez-les dans votre dossier. DAG Pour plus d'informations sur les meilleures pratiques de gestion des modules pour Apache Airflow v2+, consultez la section Gestion des modules
sur le site Web de documentation d'Apache Airflow.
-
-
Après avoir apporté les modifications nécessaires à vos ressources de flux de travail, consultez la
v2.5.1
branche du référentiel aws-mwaa-local -runner et testez localement votre flux de travail mis à jourDAGs, vos exigences et vos plugins personnalisés. Si vous migrez vers une autre version d'Apache Airflow, vous pouvez plutôt utiliser la branche d'exécution locale appropriée à votre version. -
Après avoir testé avec succès les ressources de votre flux de travail DAGs
requirements.txt
, copiez vos plugins et vos plugins dans le compartiment Amazon S3 que vous avez configuré avec votre nouvel MWAA environnement Amazon.
Troisième étape : Exporter les métadonnées de votre environnement existant
Les tables de métadonnées Apache Airflow telles que dag
dag_tag
, et dag_code
sont automatiquement renseignées lorsque vous copiez les DAG fichiers mis à jour dans le compartiment Amazon S3 de votre environnement et que le planificateur les analyse. Les tables relatives aux autorisations sont également renseignées automatiquement en fonction de votre autorisation de rôle IAM d'exécution. Il n'est pas nécessaire de les faire migrer.
Vous pouvez migrer les données relatives à DAG l'historique variable
slot_pool
,,sla_miss
, et, si nécessairexcom
,job
, et aux log
tables. Le journal des instances de tâches est stocké dans les CloudWatch journaux du groupe de airflow-
journaux. Si vous souhaitez consulter les journaux des instances de tâches pour les anciennes exécutions, ces journaux doivent être copiés dans le nouveau groupe de journaux d'environnement. Nous vous recommandons de ne déplacer que l'équivalent de quelques jours de journaux afin de réduire les coûts associés. {environment_name}
Si vous migrez depuis un MWAA environnement Amazon existant, il n'y a pas d'accès direct à la base de données de métadonnées. Vous devez exécuter un DAG pour exporter les métadonnées de votre MWAA environnement Amazon existant vers un compartiment Amazon S3 de votre choix. Les étapes suivantes peuvent également être utilisées pour exporter les métadonnées Apache Airflow si vous migrez depuis un environnement autogéré.
Une fois les données exportées, vous pouvez en exécuter une DAG dans votre nouvel environnement pour les importer. Pendant le processus d'exportation et d'importation, tous les autres processus DAGs sont interrompus.
Pour exporter les métadonnées depuis votre environnement existant
-
Créez un compartiment Amazon S3 à l'aide du AWS CLI pour stocker les données exportées. Remplacez le
UUID
etregion
par vos informations.$
aws s3api create-bucket \ --bucket mwaa-migration-
{UUID}
\ --region{region}
Note
Si vous migrez des données sensibles, telles que des connexions que vous stockez dans des variables, nous vous recommandons d'activer le chiffrement par défaut pour le compartiment Amazon S3.
-
Note
Ne s'applique pas à la migration depuis un environnement autogéré.
Modifiez le rôle d'exécution de l'environnement existant et ajoutez la politique suivante pour accorder un accès en écriture au bucket que vous avez créé à la première étape.
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:PutObject*" ], "Resource": [ "arn:aws:s3:::mwaa-migration-
{UUID}
/*" ] } ] } -
Clonez le amazon-mwaa-examples
référentiel et accédez au metadata-migration
sous-répertoire correspondant à votre scénario de migration.$
git clone https://github.com/aws-samples/amazon-mwaa-examples.git
$
cd amazon-mwaa-examples/usecases/metadata-migration/
existing-version
-new-version
/ -
Dans
export_data.py
, remplacez la valeur de chaîne pourS3_BUCKET
par le compartiment Amazon S3 que vous avez créé pour stocker les métadonnées exportées.S3_BUCKET = 'mwaa-migration-
{UUID}
' -
Localisez le
requirements.txt
fichier dans lemetadata-migration
répertoire. Si vous disposez déjà d'un fichier d'exigences pour votre environnement existant, ajoutez les exigences supplémentaires spécifiées dansrequirements.txt
votre fichier. Si vous n'avez pas de fichier d'exigences existant, vous pouvez simplement utiliser celui fourni dans lemetadata-migration
répertoire. -
export_data.py
Copiez DAG dans le répertoire du compartiment Amazon S3 associé à votre environnement existant. Si vous migrez depuis un environnement autogéré, copiez-le dans votreexport_data.py
/dags
dossier. -
Copiez votre mise
requirements.txt
à jour dans le compartiment Amazon S3 associé à votre environnement existant, puis modifiez l'environnement pour spécifier la nouvellerequirements.txt
version. -
Une fois l'environnement mis à jour, accédez à l'interface utilisateur d'Apache Airflow, annulez le
db_export
DAG et déclenchez l'exécution du flux de travail. -
Vérifiez que les métadonnées sont
data/migration/
exportées vers le compartimentexisting-version
_to_new-version
/export/mwaa-migration-
Amazon S3, chaque table figurant dans son propre fichier dédié.{UUID}
Étape 4 : Importation des métadonnées dans votre nouvel environnement
Pour importer les métadonnées dans votre nouvel environnement
-
Dans
import_data.py
, remplacez les valeurs de chaîne suivantes par vos informations.-
Pour la migration depuis un MWAA environnement Amazon existant :
S3_BUCKET = 'mwaa-migration-
{UUID}
' OLD_ENV_NAME='{old_environment_name}
' NEW_ENV_NAME='{new_environment_name}
' TI_LOG_MAX_DAYS ={number_of_days}
MAX_DAYS
contrôle le nombre de jours de fichiers journaux que le flux de travail copie dans le nouvel environnement. -
Pour la migration depuis un environnement autogéré :
S3_BUCKET = 'mwaa-migration-
{UUID}
' NEW_ENV_NAME='{new_environment_name}
'
-
-
(Facultatif)
import_data.py
copie uniquement les journaux des tâches ayant échoué. Si vous souhaitez copier tous les journaux des tâches, modifiez lagetDagTasks
fonction et supprimez-lati.state = 'failed'
comme indiqué dans l'extrait de code suivant.def getDagTasks(): session = settings.Session() dagTasks = session.execute(f"select distinct ti.dag_id, ti.task_id, date(r.execution_date) as ed \ from task_instance ti, dag_run r where r.execution_date > current_date - {TI_LOG_MAX_DAYS} and \ ti.dag_id=r.dag_id and ti.run_id = r.run_id order by ti.dag_id, date(r.execution_date);").fetchall() return dagTasks
-
Modifiez le rôle d'exécution de votre nouvel environnement et ajoutez la politique suivante. La politique d'autorisation permet MWAA à Amazon de lire le contenu du compartiment Amazon S3 dans lequel vous avez exporté les métadonnées Apache Airflow et de copier les journaux des instances de tâches à partir de groupes de journaux existants. Remplacez tous les espaces réservés par vos informations.
Note
Si vous migrez depuis un environnement autogéré, vous devez supprimer les autorisations liées aux CloudWatch journaux de la politique.
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "logs:GetLogEvents", "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:
{region}
:{account_number}
:log-group:airflow-{old_environment_name}
*" ] }, { "Effect": "Allow", "Action": [ "s3:GetObject", "s3:ListBucket" ], "Resource": [ "arn:aws:s3:::mwaa-migration-{UUID}
", "arn:aws:s3:::mwaa-migration-{UUID}
/*" ] } ] } -
Copiez
import_data.py
DAG dans le répertoire du compartiment Amazon S3 associé à votre nouvel environnement, puis accédez à l'interface utilisateur d'Apache Airflow pour suspendre le flux de travaildb_import
DAG et le déclencher. Le nouveau DAG apparaîtra dans l'interface utilisateur d'Apache Airflow dans quelques minutes. -
Une fois l'DAGexécution terminée, vérifiez que l'historique de vos DAG courses est copié en accédant à chaque utilisateurDAG.
Étapes suivantes
-
Pour plus d'informations sur les classes et fonctionnalités d'MWAAenvironnement Amazon disponibles, consultez la section Classe d'MWAAenvironnement Amazon dans le guide de MWAA l'utilisateur Amazon.
-
Pour plus d'informations sur la manière dont Amazon MWAA gère le dimensionnement MWAA automatique des collaborateurs, consultez le manuel Amazon MWAA User Guide.
-
Pour plus d'informations sur Amazon MWAA RESTAPI, consultez Amazon MWAA REST API.
Ressources connexes
-
Modèles Apache Airflow
(documentation Apache Airflow) : en savoir plus sur les modèles de base de données de métadonnées Apache Airflow.