Using Custom Metrics with Amazon Kinesis Data Analytics for Apache Flink

Kinesis Data Analytics for Apache Flink exposes 19 metrics to CloudWatch, including metrics for resource usage and throughput. In addition, you can create your own metrics to track application-specific data, such as processing events or accessing external resources.

How It Works

Custom metrics in Kinesis Data Analytics use the Apache Flink metric system. Apache Flink metrics have the following attributes:

  • Type: A metric's type describes how it measures and reports data. Available types include Counter, Gauge, Histogram, and Meter. For more information about Apache Flink metric types, see Metric Types.

    Note

    CloudWatch Metrics does not support the Histogram metric type, so CloudWatch does not display metrics of this type.

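  • Scope: A metric's scope consists of its identifier and a set of key-value pairs that indicate how the metric will be reported to CloudWatch. A metric's identifier consists of a system scope, which indicates the level at which the metric is reported (for example, Operator), and a user scope, which holds attributes such as user variables and metric group names that you define by calling MetricGroup.addGroup(key, value) or MetricGroup.addGroup(name).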

    For more information about metric scope, see Scope.

For more information about Apache Flink metrics, see Metrics in the Apache Flink documentation.
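
To make the scope attribute more concrete, the following plain-Java sketch models how a user scope might be assembled from a chain of group calls. The UserScopeModel class and its methods are hypothetical stand-ins written for illustration; they are not part of Apache Flink or Kinesis Data Analytics. The sketch assumes that a key-value group contributes both the key and the value to the scope and also defines a user variable, and that scope components are joined with a "." delimiter. It leaves out the system scope that Flink places in front of the user scope.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for illustration only; not a Flink class.
class UserScopeModel {
    private final List<String> components = new ArrayList<>();
    private final Map<String, String> variables = new LinkedHashMap<>();

    // Models a name-only group: adds one component to the scope.
    UserScopeModel addGroup(String name) {
        components.add(name);
        return this;
    }

    // Models a key-value group: adds both parts to the scope
    // and records the pair as a user variable.
    UserScopeModel addGroup(String key, String value) {
        components.add(key);
        components.add(value);
        variables.put(key, value);
        return this;
    }

    // Joins the scope components and the metric name with ".".
    String identifier(String metricName) {
        return String.join(".", components) + "." + metricName;
    }

    Map<String, String> variables() {
        return variables;
    }
}

class UserScopeDemo {
    public static void main(String[] args) {
        UserScopeModel scope = new UserScopeModel()
                .addGroup("kinesisanalytics")
                .addGroup("Program", "RecordCountApplication")
                .addGroup("NoOpMapperFunction");
        System.out.println(scope.identifier("FilteredRecords"));
        System.out.println(scope.variables());
    }
}
```

In this model, the key-value pair is the kind of user-scope attribute that can be reported as a dimension, while the name-only groups only extend the identifier.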

To create a custom metric in your Kinesis Data Analytics for Apache Flink application, you can access the Apache Flink metric system from any user function that extends RichFunction by calling getRuntimeContext().getMetricGroup(). This method returns a MetricGroup object that you can use to create and register custom metrics. Kinesis Data Analytics reports all metrics created with the group key KinesisAnalytics to CloudWatch. Custom metrics that you define have the following characteristics:

  • Attributes that you define in user scope (except for the KinesisAnalytics metric group) are published as CloudWatch dimensions.

  • Custom metrics are published at the Application level by default.

  • Dimensions (Task, Operator, and Parallelism) are added to the metric based on the application's monitoring level. You set the application's monitoring level using the MonitoringConfiguration parameter of the CreateApplication action, or the MonitoringConfigurationUpdate parameter of the UpdateApplication action.

Examples

The following code examples demonstrate how to create a mapping class that creates and increments a custom metric, and how to use the mapping class in your application by adding it to a DataStream object.

Record Count Custom Metric

The following code example demonstrates how to create a mapping class that creates a metric that counts records in a data stream (the same functionality as the numRecordsIn metric):

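// Imports needed by this class
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;

private static class NoOpMapperFunction extends RichMapFunction<String, String> {
    // Number of records seen by this function instance; read by the gauge
    private transient int valueToExpose = 0;
    private final String customMetricName;

    public NoOpMapperFunction(final String customMetricName) {
        this.customMetricName = customMetricName;
    }

    @Override
    public void open(Configuration config) {
        // Register a gauge that reports the current record count
        getRuntimeContext().getMetricGroup()
                .addGroup("kinesisanalytics")
                .addGroup("Program", "RecordCountApplication")
                .addGroup("NoOpMapperFunction")
                .gauge(customMetricName, (Gauge<Integer>) () -> valueToExpose);
    }

    @Override
    public String map(String value) throws Exception {
        // Count the record and pass it through unchanged
        valueToExpose++;
        return value;
    }
}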

In the preceding example, the valueToExpose variable is incremented for each record that the application processes.
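
A gauge does not push values; the metric system reads it whenever it reports. The following plain-Java sketch illustrates that read-on-demand behavior without any Flink dependency. It uses java.util.function.Supplier as a stand-in for Flink's Gauge interface, and the GaugeReadModel class is a hypothetical name introduced only for this illustration.

```java
import java.util.function.Supplier;

// Hypothetical model; Supplier stands in for Flink's Gauge interface.
class GaugeReadModel {
    private int valueToExpose = 0;

    // Registered once; evaluated each time a reporter asks for the value.
    final Supplier<Integer> gauge = () -> valueToExpose;

    // Counts the record and passes it through unchanged.
    String map(String value) {
        valueToExpose++;
        return value;
    }

    public static void main(String[] args) {
        GaugeReadModel mapper = new GaugeReadModel();
        System.out.println(mapper.gauge.get()); // read before any records
        mapper.map("a");
        mapper.map("b");
        mapper.map("c");
        System.out.println(mapper.gauge.get()); // read after three records
    }
}
```

Because the lambda captures the field rather than a copy of its value, each read reflects the number of records mapped so far.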

After defining your mapping class, you apply it to a data stream by passing an instance to the stream's map method:

DataStream<String> noopMapperFunctionAfterFilter = kinesisProcessed.map(new NoOpMapperFunction("FilteredRecords"));

For the complete code for this application, see Record Count Custom Metric Application.

Word Count Custom Metric

The following code example demonstrates how to create a mapping class that creates a metric that counts words in a data stream:

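// Imports needed by this class
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.util.Collector;

private static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
    private transient Counter counter;

    @Override
    public void open(Configuration config) {
        // Register a counter that tracks the total number of words
        this.counter = getRuntimeContext().getMetricGroup()
                .addGroup("kinesisanalytics")
                .addGroup("Service", "WordCountApplication")
                .addGroup("Tokenizer")
                .counter("TotalWords");
    }

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // normalize and split the line
        String[] tokens = value.toLowerCase().split("\\W+");

        // emit the pairs
        for (String token : tokens) {
            if (token.length() > 0) {
                counter.inc();
                out.collect(new Tuple2<>(token, 1));
            }
        }
    }
}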

In the preceding example, the counter variable is incremented for each word that the application processes.
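
The length check in flatMap matters because String.split can return an empty leading token when a line starts with a non-word character. The following standalone sketch applies the same normalize-and-split logic without Flink, so the counting behavior can be checked in isolation. The WordCountCheck class and the countWords method are hypothetical names introduced here, and the int total stands in for the Counter.

```java
class WordCountCheck {
    // Mirrors the Tokenizer logic: lowercase, split on non-word
    // characters, and count only non-empty tokens.
    static int countWords(String line) {
        int count = 0;
        for (String token : line.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(countWords("Hello, world!"));
        System.out.println(countWords("  leading spaces"));
        System.out.println(countWords(""));
    }
}
```

Without the check, a line such as "  leading spaces" would add one extra count for the empty token that precedes the first word.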

After defining your mapping class, you apply it to a data stream by passing an instance to the stream's flatMap method:

// Split up the lines in pairs (2-tuples) containing: (word,1), and
// group by the tuple field "0" and sum up tuple field "1"
DataStream<Tuple2<String, Integer>> wordCountStream = input
        .flatMap(new Tokenizer())
        .keyBy(0)
        .sum(1);

// Serialize the tuple to string format, and publish the output to kinesis sink
wordCountStream
        .map(tuple -> tuple.toString())
        .addSink(createSinkFromStaticConfig());

For the complete code for this application, see Word Count Custom Metric Application.

Viewing Custom Metrics

Custom metrics for your application appear in the CloudWatch Metrics console in the AWS/KinesisAnalytics dashboard, under the Application metric group.


Console screenshot showing the settings on the create application page.