Utilisation de métriques personnalisées avec Amazon Managed Service pour Apache Flink - Service géré pour Apache Flink

Le service géré Amazon pour Apache Flink était auparavant connu sous le nom d’Amazon Kinesis Data Analytics pour Apache Flink.

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Utilisation de métriques personnalisées avec Amazon Managed Service pour Apache Flink

Le service géré pour Apache Flink expose 19 mesures CloudWatch, y compris des mesures relatives à l'utilisation des ressources et au débit. En outre, vous pouvez créer vos propres métriques pour suivre des données spécifiques à l’application, telles que le traitement des événements ou l’accès à des ressources externes.

Cette rubrique contient les sections suivantes :

Comment ça marche

Les métriques personnalisées du service géré pour Apache Flink utilisent le système de métrique Apache Flink. Les métriques Apache Flink ont les attributs suivants :

  • Type : le type d’une métrique décrit la manière dont elle mesure et crée des rapport sur les données. Les types de métriques Apache Flink disponibles incluent Nombre, Jauge, Histogramme et Mètre. Pour plus d’informations sur les types de métriques Apache Flink, consultez la section Metric Types.

    Note

    AWS CloudWatch Metrics ne prend pas en charge le type de métrique Histogram Apache Flink. CloudWatch ne peut afficher que les métriques Apache Flink des types Count, Gauge et Meter.

  • Portée : la portée d'une métrique comprend son identifiant et un ensemble de paires clé-valeur qui indiquent comment la métrique sera signalée. CloudWatch L’identifiant d’une métrique se compose des éléments suivants :

    • Une portée système, qui indique le niveau auquel la métrique est signalée (par exemple, Opérateur).

    • Une portée utilisateur, qui définit des attributs tels que les variables utilisateur ou les noms de groupes de métriques. Ces attributs sont définis à l’aide de MetricGroup.addGroup(key, value) ou MetricGroup.addGroup(name).

    Pour plus d’informations sur la portée des métriques, consultez Scope.

Pour plus d’informations sur les métriques d’Apache Flink, consultez Metrics de la documentation d’Apache Flink.

Pour créer une métrique personnalisée dans votre service géré pour Apache Flink, vous pouvez accéder au système de métrique Apache Flink à partir de n’importe quelle fonction utilisateur qui étend RichFunction en appelant GetMetricGroup. Cette méthode renvoie un MetricGroupobjet que vous pouvez utiliser pour créer et enregistrer des métriques personnalisées. Le service géré pour Apache Flink indique toutes les métriques créées avec la clé de groupe KinesisAnalytics to CloudWatch. Les métriques personnalisées que vous définissez présentent les caractéristiques suivantes :

  • Votre métrique personnalisée possède un nom de métrique et un nom de groupe. Ces noms doivent être composés de caractères alphanumériques.

  • Les attributs que vous définissez dans le champ d'application utilisateur (à l'exception du KinesisAnalytics groupe de mesures) sont publiés sous forme de CloudWatch dimensions.

  • Les métriques personnalisées sont publiées au niveau Application par défaut.

  • Les dimensions (Tâche/Opérateur/Parallélisme) sont ajoutées à la métrique en fonction du niveau de surveillance de l’application. Vous définissez le niveau de surveillance de l'application à l'aide du MonitoringConfigurationparamètre de l'CreateApplicationaction ou du MonitoringConfigurationUpdateparamètre ou de l'UpdateApplicationaction.

Exemples

Les exemples de code suivants montrent comment créer une classe de mappage qui crée et incrémente une métrique personnalisée, et comment implémenter la classe de mappage dans votre application en l’ajoutant à un objet DataStream.

Métrique personnalisée du nombre d'enregistrements

L’exemple de code suivant montre comment créer une classe de mappage qui crée une métrique qui compte les enregistrements dans un flux de données (fonctionnalité identique à celle de la métrique numRecordsIn) :

private static class NoOpMapperFunction extends RichMapFunction<String, String> { private transient int valueToExpose = 0; private final String customMetricName; public NoOpMapperFunction(final String customMetricName) { this.customMetricName = customMetricName; } @Override public void open(Configuration config) { getRuntimeContext().getMetricGroup() .addGroup("KinesisAnalytics") .addGroup("Program", "RecordCountApplication") .addGroup("NoOpMapperFunction") .gauge(customMetricName, (Gauge<Integer>) () -> valueToExpose); } @Override public String map(String value) throws Exception { valueToExpose++; return value; } }

Dans l’exemple précédent, la variable valueToExpose est incrémentée pour chaque enregistrement traité par l’application.

Après avoir défini votre classe de mappage, vous créez un flux intégré à l’application qui implémente la carte :

DataStream<String> noopMapperFunctionAfterFilter = kinesisProcessed.map(new NoOpMapperFunction("FilteredRecords"));

Pour le code complet de cette application, consultez Record Count Custom Metric Application.

Métrique personnalisée du nombre de mots

L’exemple de code suivant montre comment créer une classe de mappage qui crée une métrique qui compte les mots dans un flux de données :

private static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> { private transient Counter counter; @Override public void open(Configuration config) { this.counter = getRuntimeContext().getMetricGroup() .addGroup("KinesisAnalytics") .addGroup("Service", "WordCountApplication") .addGroup("Tokenizer") .counter("TotalWords"); } @Override public void flatMap(String value, Collector<Tuple2<String, Integer>>out) { // normalize and split the line String[] tokens = value.toLowerCase().split("\\W+"); // emit the pairs for (String token : tokens) { if (token.length() > 0) { counter.inc(); out.collect(new Tuple2<>(token, 1)); } } } }

Dans l’exemple précédent, la variable counter est incrémentée pour chaque mot traité par l’application.

Après avoir défini votre classe de mappage, vous créez un flux intégré à l’application qui implémente la carte :

// Split up the lines in pairs (2-tuples) containing: (word,1), and // group by the tuple field "0" and sum up tuple field "1" DataStream<Tuple2<String, Integer>> wordCountStream = input.flatMap(new Tokenizer()).keyBy(0).sum(1); // Serialize the tuple to string format, and publish the output to kinesis sink wordCountStream.map(tuple -> tuple.toString()).addSink(createSinkFromStaticConfig());

Pour le code complet de cette application, consultez Word Count Custom Metric Application.

Afficher les métriques personnalisées

Les métriques personnalisées pour votre application apparaissent dans la console CloudWatch Metrics du AWS/KinesisAnalyticstableau de bord, sous le groupe de métriques Application.