Use custom metrics with Amazon Managed Service for Apache Flink

Amazon Managed Service for Apache Flink was previously known as Amazon Kinesis Data Analytics for Apache Flink.

Managed Service for Apache Flink exposes 19 metrics to CloudWatch, including metrics for resource usage and throughput. In addition, you can create your own metrics to track application-specific data, such as processing events or accessing external resources.

How it works

Custom metrics in Managed Service for Apache Flink use the Apache Flink metric system. Apache Flink metrics have the following attributes:

  • Type: A metric's type describes how it measures and reports data. Available Apache Flink metric types include Counter, Gauge, Histogram, and Meter. For more information about Apache Flink metric types, see Metric Types.

    Note

    Amazon CloudWatch does not support the Apache Flink Histogram metric type. CloudWatch can display only Apache Flink metrics of the Counter, Gauge, and Meter types.

  • Scope: A metric's scope consists of its identifier and a set of key-value pairs that indicate how the metric will be reported to CloudWatch. A metric's identifier consists of a system-provided scope, an optional user-defined scope, and the name given to the metric when it is registered.

    For more information about metric scope, see Scope.

For more information about Apache Flink metrics, see Metrics in the Apache Flink documentation.

To create a custom metric in your Managed Service for Apache Flink application, access the Apache Flink metric system from any user function that extends RichFunction by calling getRuntimeContext().getMetricGroup(). This method returns a MetricGroup object that you can use to create and register custom metrics. Managed Service for Apache Flink reports all metrics created with the group key KinesisAnalytics to CloudWatch. Custom metrics that you define have the following characteristics:

  • Your custom metric has a metric name and a group name. These names must consist of alphanumeric characters.

  • Attributes that you define in user scope (except for the KinesisAnalytics metric group) are published as CloudWatch dimensions.

  • Custom metrics are published at the Application level by default.

  • Dimensions (Task/Operator/Parallelism) are added to the metric based on the application's monitoring level. You set the application's monitoring level using the MonitoringConfiguration parameter of the CreateApplication action, or the MonitoringConfigurationUpdate parameter of the UpdateApplication action.
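The naming rule in the list above can be checked before a metric is registered, so that an invalid name fails fast in open() rather than going unnoticed. The following is a minimal sketch, not part of the service or of Apache Flink: the MetricNames class and its methods are hypothetical, and it assumes that "alphanumeric" means ASCII letters and digits only.

```java
import java.util.regex.Pattern;

// Hypothetical helper for validating custom metric and group names.
// Assumption: "alphanumeric" is read as ASCII letters and digits only.
final class MetricNames {
    private static final Pattern ALPHANUMERIC = Pattern.compile("[A-Za-z0-9]+");

    private MetricNames() {
    }

    // Returns true if the name is non-empty and contains only letters and digits.
    static boolean isAlphanumeric(String name) {
        return name != null && ALPHANUMERIC.matcher(name).matches();
    }

    // Returns the name unchanged, or throws if it is not alphanumeric.
    static String requireAlphanumeric(String name) {
        if (!isAlphanumeric(name)) {
            throw new IllegalArgumentException(
                    "Metric and group names must be alphanumeric: " + name);
        }
        return name;
    }
}
```

For example, a function could pass MetricNames.requireAlphanumeric(customMetricName) to the call that registers the metric, so that a name such as "filtered-records" is rejected when the operator starts.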

View examples for creating a mapping class

The following code examples demonstrate how to create a mapping class that creates and increments a custom metric, and how to implement the mapping class in your application by adding it to a DataStream object.

Record count custom metric

The following code example demonstrates how to create a mapping class that creates a metric that counts records in a data stream (the same functionality as the numRecordsIn metric):

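// Imports required at the top of the enclosing source file:
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;

private static class NoOpMapperFunction extends RichMapFunction<String, String> {
    // Running record count; the gauge reads this field each time metrics are reported.
    private transient int valueToExpose = 0;
    private final String customMetricName;

    public NoOpMapperFunction(final String customMetricName) {
        this.customMetricName = customMetricName;
    }

    @Override
    public void open(Configuration config) {
        // Register the gauge under the KinesisAnalytics group so that it is reported to CloudWatch.
        getRuntimeContext().getMetricGroup()
                .addGroup("KinesisAnalytics")
                .addGroup("Program", "RecordCountApplication")
                .addGroup("NoOpMapperFunction")
                .gauge(customMetricName, (Gauge<Integer>) () -> valueToExpose);
    }

    @Override
    public String map(String value) throws Exception {
        // Count the record and pass it through unchanged.
        valueToExpose++;
        return value;
    }
}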

In the preceding example, the valueToExpose variable is incremented for each record that the application processes.
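Incrementing a plain field is sufficient here because a gauge is a callback that is read on demand, not a value that is pushed. The following sketch illustrates this outside of Flink. It is a simplified illustration only: the nested Gauge interface is a local stand-in that mirrors the single getValue() method of org.apache.flink.metrics.Gauge, and the GaugeSketch and CountingMapper names are hypothetical.

```java
// Illustration only: shows that a gauge reads the current field value when polled.
final class GaugeSketch {

    // Local stand-in for org.apache.flink.metrics.Gauge, which declares the same method.
    interface Gauge<T> {
        T getValue();
    }

    // Mirrors the counting logic of NoOpMapperFunction without any Flink dependency.
    static final class CountingMapper {
        private int valueToExpose = 0;
        private final Gauge<Integer> gauge = () -> valueToExpose;

        String map(String value) {
            valueToExpose++;
            return value;
        }

        Gauge<Integer> gauge() {
            return gauge;
        }
    }

    // Processes three records, then polls the gauge the way a reporter would.
    static int demo() {
        CountingMapper mapper = new CountingMapper();
        mapper.map("a");
        mapper.map("b");
        mapper.map("c");
        return mapper.gauge().getValue(); // 3
    }

    private GaugeSketch() {
    }
}
```

The gauge is created once, before any record is processed, yet it returns the latest count whenever it is polled.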

After you define the mapping class, create an in-application stream by applying the class to an existing stream with map:

DataStream<String> noopMapperFunctionAfterFilter = kinesisProcessed.map(new NoOpMapperFunction("FilteredRecords"));

For the complete code for this application, see Record Count Custom Metric Application.

Word count custom metric

The following code example demonstrates how to create a mapping class that creates a metric that counts words in a data stream:

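// Imports required at the top of the enclosing source file:
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.util.Collector;

private static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
    private transient Counter counter;

    @Override
    public void open(Configuration config) {
        // Register the counter under the KinesisAnalytics group so that it is reported to CloudWatch.
        this.counter = getRuntimeContext().getMetricGroup()
                .addGroup("KinesisAnalytics")
                .addGroup("Service", "WordCountApplication")
                .addGroup("Tokenizer")
                .counter("TotalWords");
    }

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // normalize and split the line
        String[] tokens = value.toLowerCase().split("\\W+");

        // emit the pairs
        for (String token : tokens) {
            if (token.length() > 0) {
                counter.inc();
                out.collect(new Tuple2<>(token, 1));
            }
        }
    }
}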

In the preceding example, the counter variable is incremented for each word that the application processes.
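The tokenizing logic that drives the counter can be checked without a Flink runtime. The following sketch repeats the same split and empty-token filter, with a plain local variable in place of the Flink Counter. The WordCountSketch class and its countWords method are hypothetical names used only for illustration.

```java
// Illustration only: reproduces the Tokenizer's split-and-filter logic without Flink.
final class WordCountSketch {

    private WordCountSketch() {
    }

    // Returns how many times the Tokenizer would call counter.inc() for one input line.
    static long countWords(String line) {
        long count = 0;
        // Same normalization and split as in the Tokenizer class.
        String[] tokens = line.toLowerCase().split("\\W+");
        for (String token : tokens) {
            // A line that starts with a non-word character yields an empty first token,
            // so the length check keeps it from being counted as a word.
            if (token.length() > 0) {
                count++;
            }
        }
        return count;
    }
}
```

For the input "  Hello, world!", the split yields an empty first token followed by "hello" and "world", so the method returns 2.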

After you define the mapping class, create an in-application stream by applying the class to an existing stream with flatMap:

// Split up the lines in pairs (2-tuples) containing: (word,1), and
// group by the tuple field "0" and sum up tuple field "1"
DataStream<Tuple2<String, Integer>> wordCountStream = input.flatMap(new Tokenizer()).keyBy(0).sum(1);

// Serialize the tuple to string format, and publish the output to kinesis sink
wordCountStream.map(tuple -> tuple.toString()).addSink(createSinkFromStaticConfig());

For the complete code for this application, see Word Count Custom Metric Application.

View custom metrics

Custom metrics for your application appear in the CloudWatch Metrics console in the AWS/KinesisAnalytics namespace, under the Application metric group.