在适用于 Apache Flink 的亚马逊托管服务中使用自定义指标 - Managed Service for Apache Flink

适用于 Apache Flink 的亚马逊托管服务(亚马逊 MSF)以前被称为适用于 Apache Flink 的亚马逊 Kinesis Data Analytics。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在适用于 Apache Flink 的亚马逊托管服务中使用自定义指标

适用于 Apache Flink 的托管服务公开了 19 个指标 CloudWatch,包括资源使用量和吞吐量指标。此外,您可以创建自己的指标来跟踪应用程序特定的数据,例如处理事件或访问外部资源。

工作方式

Managed Service for Apache Flink 中的自定义指标使用 Apache Flink 指标系统。Apache Flink 指标具有以下属性:

  • 类型:指标的类型描述了它如何衡量和报告数据。可用的 Apache Flink 指标类型包括计数、计量表、直方图和计量器。有关 Apache Flink 指标类型的更多信息,请参阅指标类型

    注意

    AWS CloudWatch 指标不支持 Histogram Apache Flink 指标类型。 CloudWatch 只能显示计数、仪表和仪表类型的 Apache Flink 指标。

  • 范围:指标的范围由其标识符和一组键值对组成,这些键值对表示将如何报告该指标。 CloudWatch指标的标识符由以下内容组成:

    有关指标范围的更多信息,请参阅范围

有关 Apache Flink 指标的更多信息,请参阅 Apache Flink 文档中的指标

要在 Managed Service for Apache Flink 中创建自定义指标,您可以从任何通过调用 GetMetricGroup 扩展 RichFunction 的用户函数访问 Apache Flink 指标系统。此方法返回一个可用于创建和注册自定义指标的MetricGroup对象。适用于 Apache 的托管服务 Flink 报告使用组密钥KinesisAnalytics创建的所有指标。 CloudWatch您定义的自定义指标具有以下特征:

  • 您的自定义指标具有指标名称和组名称。根据 Pro metheus 命名规则,这些名称必须由字母数字字符组成。

  • 您在用户范围(KinesisAnalytics指标组除外)中定义的属性将作为 CloudWatch 维度发布。

  • 默认情况下,自定义指标是在该Application级别发布的。

  • 维度(任务/运算符/并行度)将根据应用程序的监控级别添加到指标中。您可以使用操作的参数或CreateApplication操作的或MonitoringConfiguration参数来设置应用程序的UpdateApplication监控级别。MonitoringConfigurationUpdate

查看创建映射类的示例

以下代码示例演示如何创建用于创建和增量自定义指标的映射类,以及如何通过将映射类添加到DataStream对象来在应用程序中实现该映射类。

记录计数自定义指标

以下代码示例演示如何创建映射类,该映射类用于创建对数据流中的记录进行计数的指标(功能与numRecordsIn指标相同):

private static class NoOpMapperFunction extends RichMapFunction<String, String> { private transient int valueToExpose = 0; private final String customMetricName; public NoOpMapperFunction(final String customMetricName) { this.customMetricName = customMetricName; } @Override public void open(Configuration config) { getRuntimeContext().getMetricGroup() .addGroup("KinesisAnalytics") .addGroup("Program", "RecordCountApplication") .addGroup("NoOpMapperFunction") .gauge(customMetricName, (Gauge<Integer>) () -> valueToExpose); } @Override public String map(String value) throws Exception { valueToExpose++; return value; } }

在前面的示例中,应用程序处理的每条记录的valueToExpose变量都会递增。

定义映射类后,您将创建一个实现映射的应用程序内部流:

DataStream<String> noopMapperFunctionAfterFilter = kinesisProcessed.map(new NoOpMapperFunction("FilteredRecords"));

有关此应用程序的完整代码,请参阅记录计数自定义指标应用程序

字数自定义指标

以下代码示例演示如何创建映射类,该映射类用于创建对数据流中的字数进行计数的指标:

private static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> { private transient Counter counter; @Override public void open(Configuration config) { this.counter = getRuntimeContext().getMetricGroup() .addGroup("KinesisAnalytics") .addGroup("Service", "WordCountApplication") .addGroup("Tokenizer") .counter("TotalWords"); } @Override public void flatMap(String value, Collector<Tuple2<String, Integer>>out) { // normalize and split the line String[] tokens = value.toLowerCase().split("\\W+"); // emit the pairs for (String token : tokens) { if (token.length() > 0) { counter.inc(); out.collect(new Tuple2<>(token, 1)); } } } }

在前面的示例中,应用程序处理的每个字的counter变量都会递增。

定义映射类后,您将创建一个实现映射的应用程序内部流:

// Split up the lines in pairs (2-tuples) containing: (word,1), and // group by the tuple field "0" and sum up tuple field "1" DataStream<Tuple2<String, Integer>> wordCountStream = input.flatMap(new Tokenizer()).keyBy(0).sum(1); // Serialize the tuple to string format, and publish the output to kinesis sink wordCountStream.map(tuple -> tuple.toString()).addSink(createSinkFromStaticConfig());

有关此应用程序的完整代码,请参阅字数计数自定义指标应用程序

查看自定义指标

您的应用程序的自定义指标显示在 M CloudWatch etrics 控制台的 AWS/KinesisAnalytics控制面板,在 “应用程序” 指标组下。