現代のデータ駆動型のアプリケーションでは、データの重要性は時間の経過と共に低下し、予測データから事後的データへとその価値が移行します。そのため、お客様はデータをリアルタイムで処理して、より迅速な意思決定を行いたいと考えています。IoT センサーなどからのリアルタイムのデータフィードを処理する場合、取り込み中のネットワーク遅延やその他のソース関連の障害により、データの到着が順序付けされていないことや、処理に遅延が発生することがあります。AWS Glue プラットフォームの一部である AWS Glue Streaming は、これらの機能を基盤として、Apache Spark 構造化ストリーミングを利用したスケーラブルでサーバーレスなストリーミング ETL を提供し、ユーザーがリアルタイムでデータを処理できるようにします。
このトピックでは、高度なストリーミングの概念と AWS Glue Streaming の機能について説明します。
ストリームを処理する際の時間に関する考慮事項
ストリームを処理する際の時間には、4 つの概念があります。

-
[イベント時間] – イベントが発生した時間。ほとんどの場合、このフィールドはソースでイベントデータ自体に埋め込まれています。
-
[イベントタイムウィンドウ] – 2 つのイベント時間の間の時間枠。上の図に示されているように、W1 は 17:00 から 17:10 までのイベントタイムウィンドウです。各イベントタイムウィンドウは、複数のイベントをグループ化したものです。
-
[トリガー時間] – トリガー時間により、データの処理と結果の更新が行われる頻度が制御されます。これは、マイクロバッチ処理が開始された時間です。
-
[取り込み時間] – ストリームデータがストリーミングサービスに取り込まれた時間。イベント時間がイベント自体に組み込まれていない場合、この時間をウィンドウ処理に使用できる場合があります。
ウィンドウ処理
ウィンドウ処理は、複数のイベントをイベントタイムウィンドウごとにグループ化して集計する手法です。以下の例で、ウィンドウ処理の利点と、どのような場合に使用するかを説明します。
ビジネスユースケースに応じて、Spark では 3 種類のタイムウィンドウがサポートされます。
-
タンブリングウィンドウ - 集計する、一連の重なり合わない固定サイズのイベントタイムウィンドウ。
-
スライディングウィンドウ - 「固定サイズ」であるという点ではタンブリングウィンドウに似ていますが、スライドの継続時間がウィンドウ自体の継続時間よりも短い限り、ウィンドウが重なり合ったりスライドしたりできます。
-
セッションウィンドウ - 入力データイベントで開始され、一定の間隔または非アクティブ時間内に入力が受信されている限り、長くなり続けます。セッションウィンドウは、入力に応じて、ウィンドウの長さが静的サイズまたは動的サイズになります。
タンブリングウィンドウ
タンブリングウィンドウは、集計する、一連の重なり合わない固定サイズのイベントタイムウィンドウです。実際の例を使って理解してみましょう。

ABC Auto 社は、スポーツカーの新しいブランドのマーケティングキャンペーンを行いたいと考えています。彼らは、スポーツカーのファンが最も多い都市を選びたいと考えています。この目標を達成するために、ウェブサイトで車を紹介する 15 秒間の短い広告を掲載しています。すべての「クリック」とそれに対応する「都市」が記録され、Amazon Kinesis Data Streams にストリーミングされます。10 分のウィンドウでクリック数をカウントし、都市別にグループ化して、需要が最も多い都市を確認したいと考えています。この集計の出力を次に示します。
window_start_time | window_end_time | city | total_clicks |
---|---|---|---|
2023-07-10 17:00:00 | 2023-07-10 17:10:00 | ダラス | 75 |
2023-07-10 17:00:00 | 2023-07-10 17:10:00 | シカゴ | 10 |
2023-07-10 17:20:00 | 2023-07-10 17:30:00 | ダラス | 20 |
2023-07-10 17:20:00 | 2023-07-10 17:30:00 | シカゴ | 50 |
上で説明したように、これらのイベントタイムウィンドウは、トリガー時間の間隔とは異なります。例えば、トリガー時間が 1 分ごとであっても、出力結果では 10 分の重なり合わない集計ウィンドウのみが表示されます。最適化するには、トリガー間隔をイベントタイムウィンドウに合わせることをお勧めします。
上の表で、17:00~17:10 のウィンドウでダラスでは 75 回のクリックがあり、シカゴでは 10 回のクリックがありました。また、どの都市についても 17:10~17:20 のウィンドウのデータがないため、このウィンドウは省略されています。
これで、ダウンストリーム分析アプリケーションでこのデータをさらに分析して、マーケティングキャンペーンを実施するのに最も適した都市を特定できます。
AWS Glue でのタンブリングウィンドウの使用
-
Amazon Kinesis Data Streams DataFrame を作成し、そこから読み取ります。例:
parsed_df = kinesis_raw_df \ .selectExpr('CAST(data AS STRING)') \ .select(from_json("data", ticker_schema).alias("data")) \ .select('data.event_time','data.ticker','data.trade','data.volume', 'data.price')
-
タンブリングウィンドウでデータを処理します。以下の例では、入力フィールド「event_time」に基づいて 10 分のタンブリングウィンドウでデータがグループ化され、その出力が Amazon S3 データレイクに書き込まれます。
grouped_df = parsed_df \ .groupBy(window("event_time", "10 minutes"), "city") \ .agg(sum("clicks").alias("total_clicks")) summary_df = grouped_df \ .withColumn("window_start_time", col("window.start")) \ .withColumn("window_end_time", col("window.end")) \ .withColumn("year", year("window_start_time")) \ .withColumn("month", month("window_start_time")) \ .withColumn("day", dayofmonth("window_start_time")) \ .withColumn("hour", hour("window_start_time")) \ .withColumn("minute", minute("window_start_time")) \ .drop("window") write_result = summary_df \ .writeStream \ .format("parquet") \ .trigger(processingTime="10 seconds") \ .option("checkpointLocation", "s3a://bucket-stock-stream/stock-stream-catalog-job/checkpoint/") \ .option("path", "s3a://bucket-stock-stream/stock-stream-catalog-job/summary_output/") \ .partitionBy("year", "month", "day") \ .start()
スライディングウィンドウ
スライディングウィンドウは「固定サイズ」であるという点ではタンブリングウィンドウに似ていますが、スライドの継続時間がウィンドウ自体の継続時間よりも短い限り、ウィンドウが重なり合ったりスライドしたりできます。スライディングの性質上、1 つの入力を複数のウィンドウにバインドできます。

理解を深めるために、クレジットカードの不正使用の可能性を検知したい銀行の例を考えてみましょう。ストリーミングアプリケーションでは、クレジットカードの取引の流れを継続的にモニタリングできます。これらの取引が 10 分間のウィンドウに集計され、5 分ごとにウィンドウが先にスライドされて、最も古い 5 分間のデータが削除され、最新の 5 分間の新しいデータが追加されます。各ウィンドウ内で、取引が国ごとにグループ化され、疑わしいパターンがないかチェックできます。例えば、米国での取引の直後にオーストラリアで発生した取引などです。わかりやすくするために、取引総額が 100 USD を超える場合は、その取引を不正使用として分類してみましょう。このようなパターンが検出された場合は、不正使用の可能性があることが示され、カードが凍結される可能性があります。
クレジットカード処理システムは、国と共に各カード ID の取引イベントのストリームを Kinesis に送信しています。AWS Glue ジョブが分析を実行し、以下の集計された出力を生成します。
window_start_time | window_end_time | card_last_four | country | total_amount |
---|---|---|---|---|
2023-07-10 17:00:00 | 2023-07-10 17:10:00 | 6544 | 米国 | 85 |
2023-07-10 17:00:00 | 2023-07-10 17:10:00 | 6544 | オーストラリア | 10 |
2023-07-10 17:05:45 | 2023-07-10 17:15:45 | 6544 | 米国 | 50 |
2023-07-10 17:10:45 | 2023-07-10 17:20:45 | 6544 | 米国 | 50 |
2023-07-10 17:10:45 | 2023-07-10 17:20:45 | 6544 | オーストラリア | 150 |
上記の集計に基づき、取引金額で合計された、5 分ごとにスライドする 10 分のウィンドウが表示されます。外れ値 (オーストラリアでの 150 USD の取引) がある 17:10~17:20 のウィンドウで異常が検出されます。AWS Glue は、この異常を検出し、boto3 を使用して問題のキーを含むアラームイベントを SNS トピックにプッシュすることができます。さらに、Lambda 関数はこのトピックにサブスクライブしてアクションを実行できます。
スライディングウィンドウでのデータの処理
以下のように、group-by
句とウィンドウ関数を使用してスライディングウィンドウを実装します。
grouped_df = parsed_df \
.groupBy(window(col("event_time"), "10 minute", "5 min"), "country", "card_last_four") \
.agg(sum("tx_amount").alias("total_amount"))
セッションウィンドウ
固定サイズである上記の 2 つのウィンドウとは異なり、セッションウィンドウは、入力に応じて、ウィンドウの長さが静的サイズまたは動的サイズになります。セッションウィンドウは入力データイベントで開始され、一定の間隔または非アクティブ時間内に入力が受信されている限り、長くなり続けます。

例を見てみましょう。ABC hotel 社は、1 週間のうちで最も混雑する時間帯を調べて、宿泊客により良い取引を提供したいと考えています。宿泊客がチェックインするとすぐにセッションウィンドウが開始され、Spark はそのイベントタイムウィンドウの間、集計する状態を維持します。宿泊客がチェックインするたびに、イベントが生成されて Amazon Kinesis Data Streams に送信されます。このホテルは、15 分間チェックインがない場合はイベントタイムウィンドウを終了できると判断しました。新しいチェックインがあると、次のイベントタイムウィンドウが再度開始されます。出力は次のとおりです。
window_start_time | window_end_time | city | total_checkins |
---|---|---|---|
2023-07-10 17:02:00 | 2023-07-10 17:30:00 | ダラス | 50 |
2023-07-10 17:02:00 | 2023-07-10 17:30:00 | シカゴ | 25 |
2023-07-10 17:40:00 | 2023-07-10 18:20:00 | ダラス | 75 |
2023-07-10 18:50:45 | 2023-07-10 19:15:45 | ダラス | 20 |
最初のチェックインは、event_time=17:02 に行われました。集計イベントタイムウィンドウは 17:02 に開始されます。この集計は、15 分以内にイベントが受信される限り継続されます。上の例では、17:15 に最後のイベントが受信され、その後の 15 分間はイベントがありませんでした。その結果、Spark はこのイベントタイムウィンドウを 17:15+15 分 = 17:30 に終了し、17:02~17:30 として設定しました。新しいチェックインデータイベントが受信された 17:47 に、新しいイベントタイムウィンドウが開始されました。
セッションウィンドウでのデータの処理
group-by
句とウィンドウ関数を使用してスライディングウィンドウを実装します。
grouped_df = parsed_df \
.groupBy(session_window(col("event_time"), "10 minute"), "city") \
.agg(count("check_in").alias("total_checkins"))
出力モード
出力モードとは、無制限テーブルからの結果が外部シンクに書き込まれるときのモードです。3 つのモードを使用できます。次の例では、各マイクロバッチでデータの行がストリーミングおよび処理されるときに、単語の出現回数をカウントしています。
-
完全モード – 現在のイベントタイムウィンドウで単語数が更新されていなくても、マイクロバッチ処理のたびに結果テーブル全体がシンクに書き込まれます。
-
付加モード – デフォルトのモードで、最後のトリガー以降に結果テーブルに追加された新しい単語や行のみがシンクに書き込まれます。このモードは、map、flatMap、filter などのクエリのステートレスストリーミングに適しています。
-
更新モード – 最後のトリガー以降に更新または追加された結果テーブルの単語や行のみがシンクに書き込まれます。
注記
セッションウィンドウでは「更新」の出力モードはサポートされていません。
遅延データとウォーターマークの処理
リアルタイムデータを扱う場合、ネットワーク遅延やアップストリームの障害によりデータの到着が遅れる可能性があるため、失われたイベントタイムウィンドウに対して再び集計を実行するメカニズムが必要です。ただし、そのためにはステートを維持する必要があります。同時に、ステートのサイズを制限するために、古いデータをクリーンアップする必要があります。Spark バージョン 2.1 では、ステートを維持し、遅延データのしきい値をユーザーが指定することができる、ウォーターマークと呼ばれる機能のサポートが追加されました。
前述の株価ティッカーの例について、遅延データの許容しきい値が 10 分以内であると考えてみましょう。単純化のために、タンブリングウィンドウ、ティッカーは AMZ、取引を BUY と仮定します。

上の図では、10 分間のタンブリングウィンドウで合計株数を計算しています。17:00、17:10、および 17:20 にトリガーされています。タイムラインの矢印の上には入力データストリームがあり、下には無制限の結果のテーブルがあります。
最初の 10 分間のタンブリングウィンドウでは、event_time に基づいて集計され、total_volume は 30 と計算されました。2 番目のイベントタイムウィンドウで、Spark は event_time=17:02 の最初のデータイベントを取得しました。これは Spark でこれまでに確認された event_time の最大値であるため、ウォーターマークのしきい値がこの 10 分前に設定されます (つまり、watermark_event_time=16:52)。event_time が 16:52 以降のデータイベントは時間が制限された集計で考慮され、それより前のデータイベントはドロップされます。これにより、Spark はさらに 10 分間中間状態を維持して、遅延データを受け入れることができます。実時間で 17:08 ごろに Spark が event_time=16:54 のイベントを受信し、これはしきい値内でした。そこで、Spark は「16:50~17:00」のイベントタイムウィンドウを再計算し、合計株数が 30 から 60 に更新されました。
しかし、トリガー時間 17:20 で、Spark が event_time=17:15 のイベントを受信したときに watermark_event_time が 17:05 に設定されます。そのため、event_time=17:03 の遅延データイベントは「遅すぎる」と見なされ、無視されました。
Watermark Boundary = Max(Event Time) - Watermark Threshold
AWS Glue でのウォーターマークの使用
Spark は、ウォーターマークの境界が越えられるまで、外部シンクへのデータの出力または書き込みを行いません。AWS Glue でウォーターマークを実装するには、以下の例を参照してください。
grouped_df = parsed_df \
.withWatermark("event_time", "10 minutes") \
.groupBy(window("event_time", "5 minutes"), "ticker") \
.agg(sum("volume").alias("total_volume"))