Optimisation des temps de redémarrage pour les opérations de récupération et de mise à l’échelle de tâches - Amazon EMR

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Optimisation des temps de redémarrage pour les opérations de récupération et de mise à l’échelle de tâches

Lorsqu’une tâche échoue ou qu’une opération de mise à l’échelle est en cours, Flink tente de réexécuter la tâche à partir du dernier point de contrôle terminé. L’exécution du processus de redémarrage peut durer une minute ou plus, en fonction de la taille de l’état du point de contrôle et du nombre de tâches parallèles. Pendant la période de redémarrage, les tâches en attente peuvent s’accumuler pour la tâche. Flink peut cependant permettre d’optimiser la vitesse de récupération et de redémarrage des graphes d’exécution afin d’améliorer la stabilité des tâches.

Cette page décrit comment Amazon EMR Flink peut améliorer le temps de redémarrage des tâches lors des opérations de récupération ou de mise à l’échelle de tâches.

Note

La récupération locale des tâches est prise en charge avec Amazon EMR 6.0.0 et versions ultérieures.

Avec les points de contrôle Flink, chaque tâche crée un instantané de son état, que Flink écrit sur un système de stockage distribué tel qu’Amazon S3. En cas de récupération, les tâches restaurent leur état à partir du stockage distribué. Le stockage distribué offre une tolérance aux pannes et peut redistribuer l’état lors de la mise à l’échelle, car tous les nœuds peuvent y accéder.

Cependant, un magasin distribué à distance présente également un inconvénient : toutes les tâches doivent lire leur état depuis un emplacement distant sur le réseau, ce qui peut entraîner l’augmentation du temps de récupération pour les états importants lors des opérations de récupération ou de mise à l’échelle de tâches.

La récupération locale des tâches permet de résoudre ce problème. Les tâches enregistrent leur état au point de contrôle sur un stockage secondaire local à la tâche, par exemple sur un disque local. Elles stockent également leur état sur le stockage principal, à savoir Amazon S3 dans notre cas. Pendant la récupération, le planificateur planifie les tâches sur le même Task Manager que celui dans lequel les tâches ont été exécutées précédemment afin qu’elles puissent être récupérées depuis le magasin d’état local au lieu de lire depuis le magasin d’état distant. Pour plus d’informations, voir la rubrique Récupération locale des tâches de la documentation Apache Flink.

Nos tests d’évaluation avec des exemples de tâches ont montré que le temps de récupération était passé de quelques minutes à quelques secondes grâce à l’activation de la récupération locale des tâches.

Pour activer la récupération locale des tâches, définissez les configurations suivantes dans votre fichier flink-conf.yaml. Spécifiez la valeur de l’intervalle de point de contrôle en millisecondes.

state.backend.local-recovery: true state.backend: hasmap or rocksdb state.checkpoints.dir: s3://storage-location-bucket-path/checkpoint execution.checkpointing.interval: 15000
Note

Le point de contrôle incrémentiel générique basé sur les journaux est pris en charge avec Amazon EMR 6.10.0 et versions ultérieures.

Le point de contrôle incrémentiel générique basé sur les journaux a été ajouté dans Flink 1.16 pour accélérer les points de contrôle. Un intervalle de point de contrôle plus court entraîne souvent une réduction du travail de récupération, car moins d’événements doivent être traités de nouveau après la récupération. Pour plus d’informations, accédez à la page Improving speed and stability of checkpointing with generic log-based incremental checkpoints sur le blog Apache Flink.

Sur base de quelques exemples de tâches, nos tests d’évaluation ont montré que le temps de contrôle était passé de quelques minutes à quelques secondes grâce au point de contrôle incrémentiel générique basé sur les journaux.

Pour activer les points de contrôle incrémentiels génériques basés sur les journaux, définissez les configurations suivantes dans votre fichier flink-conf.yaml. Spécifiez la valeur de l’intervalle de point de contrôle en millisecondes.

state.backend.changelog.enabled: true state.backend.changelog.storage: filesystem dstl.dfs.base-path: s3://bucket-path/changelog state.backend.local-recovery: true state.backend: rocksdb state.checkpoints.dir: s3://bucket-path/checkpoint execution.checkpointing.interval: 15000
Note

La prise en charge de la récupération précise pour le planificateur par défaut est disponible avec Amazon EMR 6.0.0 et versions ultérieures. La prise en charge de la récupération précise pour le planificateur adaptatif est disponible avec Amazon EMR 6.15.0 et versions ultérieures.

Lorsqu’une tâche échoue pendant son exécution, Flink réinitialise l’intégralité du graphe d’exécution et déclenche une réexécution complète à partir du dernier point de contrôle terminé. Cette opération est plus chère qu’une simple réexécution des tâches qui ont échoué. La récupération précise redémarre uniquement le composant connecté au pipeline de la tâche ayant échoué. Dans l’exemple suivant, le graphe de tâches présente 5 sommets (A à E). Toutes les connexions entre les sommets sont en pipeline avec une distribution ponctuelle, et la valeur de parallelism.default pour la tâche est définie sur 2.

A → B → C → D → E

Dans cet exemple, 10 tâches sont en cours d’exécution au total. Le premier pipeline (a1 à e1) s’exécute sur un TaskManager (TM1), et le deuxième pipeline (a2 à e2) s’exécute sur un autre TaskManager (TM2).

a1 → b1 → c1 → d1 → e1 a2 → b2 → c2 → d2 → e2

Deux composants sont connectés en pipeline : a1 → e1 et a2 → e2. En cas d’échec de TM1 ou de TM2, l’échec affecte uniquement les 5 tâches du pipeline dans lequel TaskManager était en cours d’exécution. La stratégie de redémarrage démarre uniquement le composant en pipeline concerné.

La récupération précise ne fonctionne qu’avec des tâches Flink parfaitement parallèles. Elle n’est pas prise en charge par keyBy() ou par les opérations redistribute(). Pour plus d’informations, accédez à la page FLIP-1 : Fine Grained Recovery from Task Failures du projet Jira Flink Improvement Proposal.

Pour activer la récupération précise, définissez les configurations suivantes dans votre fichier flink-conf.yaml.

jobmanager.execution.failover-strategy: region restart-strategy: exponential-delay or fixed-delay
Note

Le mécanisme de redémarrage combiné du planificateur adaptatif est pris en charge par Amazon EMR 6.15.0 et versions ultérieures.

Le planificateur adaptatif peut ajuster le parallélisme de la tâche en fonction des emplacements disponibles. Si le nombre d’emplacements disponibles est insuffisant, le planificateur réduit automatiquement le parallélisme pour s’adapter au parallélisme des tâches configuré. Si de nouveaux emplacements sont disponibles, la tâche fait l’objet d’une augmentation d’échelle selon le parallélisme des tâches configuré. Un planificateur adaptatif permet d’éviter les temps d’arrêt de la tâche lorsque les ressources disponibles sont insuffisantes. Il s’agit du planificateur pris en charge par l’outil de mise à l’échelle automatique Flink. Nous recommandons donc l’utilisation d’un planificateur adaptatif avec Amazon EMR Flink. Les planificateurs adaptatifs peuvent toutefois effectuer plusieurs redémarrages en peu de temps, à raison d’un redémarrage pour chaque nouvelle ressource ajoutée, ce qui peut entraîner une baisse des performances de la tâche.

Avec Amazon EMR 6.15.0 et versions ultérieures, Flink dispose d’un mécanisme de redémarrage combiné dans le planificateur adaptatif qui ouvre une fenêtre de redémarrage lorsque la première ressource est ajoutée, puis attend l’intervalle de fenêtre configuré de 1 minute par défaut. Un seul redémarrage est effectué lorsque les ressources disponibles sont suffisantes pour exécuter la tâche avec un parallélisme configuré ou lorsque l’intervalle expire.

Grâce à quelques exemples de tâches, nos tests d’évaluation ont montré que cette fonctionnalité traite 10 % d’enregistrements supplémentaires par rapport au comportement par défaut lorsque vous utilisez le planificateur adaptatif et l’outil de mise à l’échelle automatique Flink.

Pour activer le mécanisme de redémarrage combiné, définissez les configurations suivantes dans votre fichier flink-conf.yaml.

jobmanager.adaptive-scheduler.combined-restart.enabled: true jobmanager.adaptive-scheduler.combined-restart.window-interval: 1m