Usando métricas personalizadas com o Amazon Managed Service para Apache Flink - Managed Service for Apache Flink

Anteriormente, o Amazon Managed Service for Apache Flink era conhecido como Amazon Kinesis Data Analytics for Apache Flink.

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Usando métricas personalizadas com o Amazon Managed Service para Apache Flink

O Managed Service for Apache Flink expõe 19 métricas CloudWatch, incluindo métricas de uso de recursos e taxa de transferência. Além disso, você pode criar suas próprias métricas para rastrear dados específicos do aplicativo, como eventos de processamento ou acesso a recursos externos.

Este tópico contém as seguintes seções:

Como funciona

As métricas personalizadas no Managed Service for Apache Flink usam o sistema métrico Apache Flink. As métricas do Apache Flink têm os atributos a seguir:

  • Tipo: o tipo de uma métrica descreve como ela mede e relata dados. Os tipos de métricas disponíveis no Apache Flink incluem Contagem, Indicador, Histograma e Medidor. Para obter mais informações sobre os tipos de métricas do Apache Flink, consulte Tipos de métricas.

    nota

    AWS CloudWatch As métricas não são compatíveis com o tipo de métrica Histogram Apache Flink. CloudWatch só pode exibir métricas do Apache Flink dos tipos Count, Gauge e Meter.

  • Escopo: o escopo de uma métrica consiste em seu identificador e um conjunto de pares de valores-chave que indicam como a métrica será reportada. CloudWatch O identificador de uma métrica consiste no seguinte:

    • Um escopo do sistema, que indica o nível no qual a métrica é relatada (por exemplo, Operator).

    • Um escopo de usuário, que define atributos como variáveis de usuário ou nomes de grupos de métricas. Esses atributos são definidos usando MetricGroup.addGroup(key, value) ou MetricGroup.addGroup(name).

    Para obter mais informações sobre as métricas de escopos, consulte Escopo.

Para obter mais informações sobre a métrica do Apache Flink, consulte Métrica na documentação do Apache Flink.

Para criar uma métrica personalizada em seu Managed Service for Apache Flink, você pode acessar o sistema métrico do Apache Flink a partir de qualquer função do usuário que se estenda RichFunction por meio de chamadas para GetMetricGroup. Esse método retorna um MetricGroupobjeto que você pode usar para criar e registrar métricas personalizadas. O Managed Service for Apache Flink relata todas as métricas criadas com a chave de grupo paraKinesisAnalytics. CloudWatch As métricas personalizadas que você define têm as seguintes características:

  • Sua métrica personalizada tem um nome de métrica e um nome de grupo. Esses nomes devem conter caracteres alfanuméricos.

  • Os atributos que você define no escopo do usuário (exceto para o grupo de KinesisAnalytics métricas) são publicados como CloudWatch dimensões.

  • Por padrão, as métricas personalizadas são publicadas no nível Application.

  • As dimensões (Task/Operator/Parallelism) são adicionadas à métrica com base no nível de monitoramento do aplicativo. Você define o nível de monitoramento do aplicativo usando o MonitoringConfigurationparâmetro da CreateApplicationação ou o MonitoringConfigurationUpdateparâmetro ou da UpdateApplicationação.

Exemplos

Os exemplos de código a seguir demonstram como criar uma classe de mapeamento que cria e incrementa uma métrica personalizada e como implementar a classe de mapeamento em seu aplicativo adicionando-a a um objeto DataStream.

Métrica personalizada de contagem de registros

O exemplo de código a seguir demonstra como criar uma classe de mapeamento que cria uma métrica que conta registros em um fluxo de dados (a mesma funcionalidade da métrica numRecordsIn):

private static class NoOpMapperFunction extends RichMapFunction<String, String> { private transient int valueToExpose = 0; private final String customMetricName; public NoOpMapperFunction(final String customMetricName) { this.customMetricName = customMetricName; } @Override public void open(Configuration config) { getRuntimeContext().getMetricGroup() .addGroup("KinesisAnalytics") .addGroup("Program", "RecordCountApplication") .addGroup("NoOpMapperFunction") .gauge(customMetricName, (Gauge<Integer>) () -> valueToExpose); } @Override public String map(String value) throws Exception { valueToExpose++; return value; } }

No exemplo anterior, a variável valueToExpose é incrementada para cada registro que o aplicativo processa.

Depois de definir sua classe de mapeamento, você cria um fluxo no aplicativo que implementa o mapa:

DataStream<String> noopMapperFunctionAfterFilter = kinesisProcessed.map(new NoOpMapperFunction("FilteredRecords"));

Para obter o código completo desse aplicativo, consulte Aplicativo de métrica personalizada para contagem de registros.

Métrica personalizada de contagem de palavras

O exemplo de código a seguir demonstra como criar uma classe de mapeamento que cria uma métrica que conta palavras em um fluxo de dados:

private static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> { private transient Counter counter; @Override public void open(Configuration config) { this.counter = getRuntimeContext().getMetricGroup() .addGroup("KinesisAnalytics") .addGroup("Service", "WordCountApplication") .addGroup("Tokenizer") .counter("TotalWords"); } @Override public void flatMap(String value, Collector<Tuple2<String, Integer>>out) { // normalize and split the line String[] tokens = value.toLowerCase().split("\\W+"); // emit the pairs for (String token : tokens) { if (token.length() > 0) { counter.inc(); out.collect(new Tuple2<>(token, 1)); } } } }

No exemplo anterior, a variável counter é incrementada para cada palavra que o aplicativo processa.

Depois de definir sua classe de mapeamento, você cria um fluxo no aplicativo que implementa o mapa:

// Split up the lines in pairs (2-tuples) containing: (word,1), and // group by the tuple field "0" and sum up tuple field "1" DataStream<Tuple2<String, Integer>> wordCountStream = input.flatMap(new Tokenizer()).keyBy(0).sum(1); // Serialize the tuple to string format, and publish the output to kinesis sink wordCountStream.map(tuple -> tuple.toString()).addSink(createSinkFromStaticConfig());

Para obter o código completo desse aplicativo, consulte Aplicativo de métrica personalizada para contagem de palavras.

Visualizando métricas personalizadas

As métricas personalizadas do seu aplicativo aparecem no console de CloudWatch métricas no AWS/KinesisAnalyticspainel, no grupo de métricas do aplicativo.