选择您的 Cookie 首选项

我们使用必要 Cookie 和类似工具提供我们的网站和服务。我们使用性能 Cookie 收集匿名统计数据,以便我们可以了解客户如何使用我们的网站并进行改进。必要 Cookie 无法停用,但您可以单击“自定义”或“拒绝”来拒绝性能 Cookie。

如果您同意,AWS 和经批准的第三方还将使用 Cookie 提供有用的网站功能、记住您的首选项并显示相关内容,包括相关广告。要接受或拒绝所有非必要 Cookie,请单击“接受”或“拒绝”。要做出更详细的选择,请单击“自定义”。

高级 AWS Glue 流式处理概念

聚焦模式
高级 AWS Glue 流式处理概念 - AWS Glue

在当代数据驱动的应用中,数据的重要性会随着时间的推移而减弱,其价值也从预测性转变为被动性。因此,客户希望实时处理数据,以便更快地做出决策。在处理实时数据源时,比如来自 IoT 传感器的数据,由于网络延迟和摄取过程中其他源方面的故障,数据可能会无序到达或在处理过程中出现延迟。作为 AWS Glue 平台的一部分,AWS Glue 流式处理基于这些功能构建,提供可扩展的无服务器流 ETL,由 Apache Spark 结构化流提供支持,使用户能够进行实时数据处理。

在本主题中,我们将探讨 AWS Glue 流式处理的高级流概念和功能。

处理流时的时间注意事项

处理流时有四个时间概念:

屏幕截图显示了一个 Amazon CloudWatch 监控日志(对于上述示例为 AWS Glue),并查看了所需的执行程序数量(橙色线),然后根据该数量扩缩了执行程序(蓝色线),而无需手动调整。
  • 事件时间 - 事件发生的时间。在大多数情况下,该字段在源处嵌入事件数据本身。

  • 事件时间窗口 - 两个事件时间之间的时间范围。如上图所示,W1 是从 17:00 到 17:10 的事件时间窗口。每个事件时间窗口都是多个事件的分组。

  • 触发时间 - 触发时间控制数据处理和结果更新的频率。这是微批次开始的时间。

  • 摄取时间 - 将流式处理数据摄取到流服务中的时间。如果事件时间未嵌入事件本身,则在某些情况下,此时间可用于窗口化。

窗口化

窗口化是一种按事件时间窗口对多个事件进行分组和聚合的技术。我们将在以下示例中探讨窗口化的好处以及何时使用。

根据业务用例,Spark 支持三种类型的时间窗口。

  • 滚动窗口 - 一系列不重叠的固定大小事件时间窗口,您可以在这些窗口上聚合。

  • 滑动窗口 - 从“固定大小”的角度来看,它类似于滚动窗口,但只要滑动的持续时间小于窗口本身的持续时间,窗口就可以重叠或滑动。

  • 会话窗口 - 从输入数据事件开始,只要在间隙或非活动时间内收到输入,它就会继续自行扩展。会话窗口的窗口长度可以是静态的,也可以是动态的,具体取决于输入。

滚动窗口

滚动窗口是一系列不重叠的固定大小事件时间窗口,您可以在这些窗口上聚合。为便于理解,我们来看一个真实的例子。

屏幕截图显示了一个 Amazon CloudWatch 监控日志(对于上述示例为 AWS Glue),并查看了所需的执行程序数量(橙色线),然后根据该数量扩缩了执行程序(蓝色线),而无需手动调整。

ABC 汽车公司想为一款新型跑车做一个营销活动。他们想挑选一个拥有庞大跑车迷群体的城市。为此,他们在网站上投放了一段 15 秒钟的广告短片,介绍这款汽车。所有“点击”和相应的“城市”都会被记录下来,并流送到 Amazon Kinesis Data Streams。我们想计算 10 分钟内的点击次数,然后按城市分组,看看哪个城市的需求量最大。以下是聚合的输出。

window_start_time window_end_time city total_clicks
2023-07-10 17:00:00 2023-07-10 17:10:00 达拉斯 75
2023-07-10 17:00:00 2023-07-10 17:10:00 芝加哥 10
2023-07-10 17:20:00 2023-07-10 17:30:00 达拉斯 20
2023-07-10 17:20:00 2023-07-10 17:30:00 芝加哥 50

如上所述,这些事件时间窗口与触发时间间隔不同。例如,即使您的触发时间是每分钟,输出结果也只会显示 10 分钟不重叠的聚合窗口。为了进行优化,最好使触发间隔与事件时间窗口保持一致。

在上表中,达拉斯在 17:00-17:10 窗口内有 75 次点击,而芝加哥有 10 次点击。此外,17:10-17:20 窗口内没有任何城市的数据,因此忽略此窗口。

现在,您可以使用下游分析应用程序对这些数据进行进一步分析,来确定最适合开展营销活动的城市。

在 AWS Glue 中使用滚动窗口
  1. 创建一个 Amazon Kinesis Data Streams DataFrame 并从中读取。例如:

    parsed_df = kinesis_raw_df \ .selectExpr('CAST(data AS STRING)') \ .select(from_json("data", ticker_schema).alias("data")) \ .select('data.event_time','data.ticker','data.trade','data.volume', 'data.price')
  2. 在滚动窗口中处理数据。在下面的示例中,根据 10 分钟滚动窗口中的输入字段“event_time”对数据进行分组,并将输出写入 Amazon S3 数据湖。

    grouped_df = parsed_df \ .groupBy(window("event_time", "10 minutes"), "city") \ .agg(sum("clicks").alias("total_clicks")) summary_df = grouped_df \ .withColumn("window_start_time", col("window.start")) \ .withColumn("window_end_time", col("window.end")) \ .withColumn("year", year("window_start_time")) \ .withColumn("month", month("window_start_time")) \ .withColumn("day", dayofmonth("window_start_time")) \ .withColumn("hour", hour("window_start_time")) \ .withColumn("minute", minute("window_start_time")) \ .drop("window") write_result = summary_df \ .writeStream \ .format("parquet") \ .trigger(processingTime="10 seconds") \ .option("checkpointLocation", "s3a://bucket-stock-stream/stock-stream-catalog-job/checkpoint/") \ .option("path", "s3a://bucket-stock-stream/stock-stream-catalog-job/summary_output/") \ .partitionBy("year", "month", "day") \ .start()

滑动窗口

从“固定大小”的角度来看,滑动窗口类似于滚动窗口,但只要滑动的持续时间小于窗口本身的持续时间,窗口就可以重叠或滑动。由于滑动的性质,一个输入可以绑定到多个窗口。

屏幕截图显示了滑动窗口示例。

为了更好地理解,我们以一家银行为例,该银行想要检测潜在的信用卡欺诈行为。流式处理应用程序可以监控连续的信用卡交易流。这些交易可以聚合到持续时间为 10 分钟的窗口中,每隔 5 分钟,窗口就会向前滑动,删除最早 5 分钟的旧数据,添加最近 5 分钟的新数据。在每个窗口内,可按国家/地区对交易进行分组,检查可疑模式,例如在美国进行一笔交易后,紧接着在澳大利亚进行另一笔交易。为简单起见,当交易总额超过 100 美元时,我们将此类交易归类为欺诈。如果检测到这种模式,则表明存在潜在的欺诈行为,信用卡可能会被冻结。

信用卡处理系统会将每张卡片 ID 以及国家/地区的一系列交易事件发送到 Kinesis。AWS Glue 作业执行分析并生成以下聚合输出。

window_start_time window_end_time card_last_four country total_amount
2023-07-10 17:00:00 2023-07-10 17:10:00 6544 美国 85
2023-07-10 17:00:00 2023-07-10 17:10:00 6544 澳大利亚 10
2023-07-10 17:05:45 2023-07-10 17:15:45 6544 美国 50
2023-07-10 17:10:45 2023-07-10 17:20:45 6544 美国 50
2023-07-10 17:10:45 2023-07-10 17:20:45 6544 澳大利亚 150

根据上述聚合数据,可以看到 10 分钟窗口每 5 分钟滑动一次,交易金额相加。在 17:10 - 17:20 的窗口中检测到异常,其中有一个异常值,这是在澳大利亚的一笔 150 美元的交易。AWS Glue 可以检测到这种异常,然后使用 boto3 将带有违规键的警报事件推送到 SNS 主题。此外,Lambda 函数可以订阅该主题并可以采取行动。

在滑动窗口中处理数据

group-by 子句和窗口函数用于实现滑动窗口,如下所示。

grouped_df = parsed_df \ .groupBy(window(col("event_time"), "10 minute", "5 min"), "country", "card_last_four") \ .agg(sum("tx_amount").alias("total_amount"))

会话窗口

与上面两个固定大小的窗口不同,会话窗口的窗口长度可以是静态的,也可以是动态的,具体取决于输入。会话窗口从输入数据事件开始,只要在间隙或非活动时间内收到输入,它就会继续自行扩展。

屏幕截图显示了滑动窗口示例。

举个例子。ABC 酒店公司希望了解一周中最繁忙的时间是什么时候,从而为客人提供更好的服务。客人入住后,会话窗口就会启动,Spark 会维护该事件时间窗口的聚合状态。每当有客人入住,都会生成一个事件并发送到 Amazon Kinesis Data Streams。酒店决定,如果 15 分钟内没有入住,就可以关闭活动时间窗口。一旦有新的入住,下一个活动时间窗口就重新开始。输出如下所示。

window_start_time window_end_time city total_checkins
2023-07-10 17:02:00 2023-07-10 17:30:00 达拉斯 50
2023-07-10 17:02:00 2023-07-10 17:30:00 芝加哥 25
2023-07-10 17:40:00 2023-07-10 18:20:00 达拉斯 75
2023-07-10 18:50:45 2023-07-10 19:15:45 达拉斯 20

第一次入住发生在 event_time=17:02。聚合事件时间窗口将从 17:02 开始。只要我们在 15 分钟内收到事件,聚合就会继续。在上面的示例中,我们收到的最后一个事件是在 17:15,然后接下来的 15 分钟内没有事件。结果,Spark 在 17:15+15min = 17:30 关闭了事件时间窗口,并将其设置为 17:02 - 17:30。当收到新的入住数据事件时,它在 17:47 启动了一个新的活动时间窗口。

在会话窗口中处理数据

group-by 子句和窗口函数用于实现滑动窗口。

grouped_df = parsed_df \ .groupBy(session_window(col("event_time"), "10 minute"), "city") \ .agg(count("check_in").alias("total_checkins"))

输出模式

输出模式是将来自无界表的结果写入外部接收器的模式。共有三种模式。在下面的示例中,在每个微批次中流送和处理数据行时,系统将会计算词的出现次数。

  • 完整模式 - 在每个微批次处理后,即使在当前事件时间窗口中没有更新字数,也会将整个结果表写入接收器。

  • 追加模式 - 这是默认模式,只有自上次触发以来添加到结果表中的新词和/或行才会写入接收器。这种模式适用于无状态流式处理查询,比如 map、flatMap、filter 等。

  • 更新模式 - 只有自上次触发以来更新或添加的结果表中的词和/或行才会写入接收器。

    注意

    会话窗口不支持输出模式 = "update"。

处理延迟数据和水印

在处理实时数据时,由于网络延迟和上游故障,数据的到达可能会延迟,我们需要一种机制,对错过的事件时间窗口再次执行聚合。但要做到这一点,就需要维护状态。同时,需要清理旧数据以限制状态的大小。Spark 2.1 版新增了对水印功能的支持,该功能可维护状态并允许用户指定延迟数据的阈值。

参考上面的股票代码示例,我们考虑一下允许的延迟数据阈值不超过 10 分钟的情况。为了简单起见,我们假设采用滚动窗口,股票代码为 AMZ,交易为买入。

屏幕截图显示了一个示例输入流,以及在数据集中添加延迟数据时的结果表。

在上图中,我们计算的是 10 分钟滚动窗口内的总交易量。触发时间分别为 17:00、17:10 和 17:20。时间轴箭头上方是输入数据流,下方是无界结果表。

在第一个 10 分钟滚动窗口中,我们根据 event_time 进行聚合,计算的 total_volume 为 30。在第二个事件时间窗口中,Spark 收到了第一个数据事件 event_time=17:02。由于这是 Spark 迄今为止看到的最大 event_time,因此将水印阈值设置为 10 分钟前(即 watermark_event_time=16:52)。event_time 在 16:52 之后的任何数据事件都将被考虑进行时间限制聚合,而在此之前的任何数据事件都将被丢弃。这样,Spark 就能将中间状态多维持 10 分钟,以容纳延迟数据。时钟时间 17:08 左右,Spark 收到一个 event_time=16:54 的事件,该事件在阈值内。因此,Spark 重新计算了“16:50-17:00”事件时间窗口,总交易量从 30 更新为 60。

但在触发时间 17:20,当 Spark 接收到 event_time = 17:15 的事件时,它会设置 watermark_event_time=17:05。因此,event_time=17:03 的延迟数据事件被认为“太迟”而被忽略。

Watermark Boundary = Max(Event Time) - Watermark Threshold

在 AWS Glue 中使用水印

在越过水印边界之前,Spark 不会向外部接收器发出或写入数据。要在 AWS Glue 中实现水印,请参阅以下示例。

grouped_df = parsed_df \ .withWatermark("event_time", "10 minutes") \ .groupBy(window("event_time", "5 minutes"), "ticker") \ .agg(sum("volume").alias("total_volume"))
隐私网站条款Cookie 首选项
© 2025, Amazon Web Services, Inc. 或其附属公司。保留所有权利。