쿠키 기본 설정 선택

당사는 사이트와 서비스를 제공하는 데 필요한 필수 쿠키 및 유사한 도구를 사용합니다. 고객이 사이트를 어떻게 사용하는지 파악하고 개선할 수 있도록 성능 쿠키를 사용해 익명의 통계를 수집합니다. 필수 쿠키는 비활성화할 수 없지만 '사용자 지정' 또는 ‘거부’를 클릭하여 성능 쿠키를 거부할 수 있습니다.

사용자가 동의하는 경우 AWS와 승인된 제3자도 쿠키를 사용하여 유용한 사이트 기능을 제공하고, 사용자의 기본 설정을 기억하고, 관련 광고를 비롯한 관련 콘텐츠를 표시합니다. 필수가 아닌 모든 쿠키를 수락하거나 거부하려면 ‘수락’ 또는 ‘거부’를 클릭하세요. 더 자세한 내용을 선택하려면 ‘사용자 정의’를 클릭하세요.

고급 AWS Glue 스트리밍 개념

포커스 모드
고급 AWS Glue 스트리밍 개념 - AWS Glue

현대의 데이터 기반 애플리케이션에서 데이터의 중요성은 시간 경과에 따라 감소하고 그 가치는 예측에서 반응으로 전환됩니다. 따라서 고객은 더 빠른 의사 결정을 내리기 위해 실시간으로 데이터를 처리하기를 원합니다. IoT 센서와 같은 실시간 데이터 피드를 처리할 때 데이터가 정렬되지 않은 상태로 도착하거나 네트워크 지연 시간과 수집 중 기타 소스 관련 오류로 인해 처리가 지연될 수 있습니다. AWS Glue 플랫폼의 일부인 AWS Glue 스트리밍은 이러한 기능을 기반으로 Apache Spark 정형 스트리밍으로 구동되는 확장 가능한 서버리스 스트리밍 ETL을 제공하여 사용자에게 실시간 데이터 처리 기능을 제공합니다.

이 주제에서는 AWS Glue 스트리밍의 고급 스트리밍 개념과 기능을 살펴보겠습니다.

스트림 처리 시 시간 고려 사항

스트림을 처리할 때 시간에 대한 네 가지 개념이 있습니다.

스크린샷은 위에 제공된 예제에 대한 Amazon CloudWatch 모니터링 로그인 AWS Glue를 보여주며 필요한 실행기 수(주황색 선)를 살펴보고 수동 조정 없이 이에 맞게 실행기 크기(파란색 선)를 조정합니다.
  • 이벤트 타임 – 이벤트가 발생한 시간입니다. 대부분의 경우 이 필드는 소스의 이벤트 데이터 자체에 포함됩니다.

  • 이벤트 기간 - 두 이벤트 시간 사이의 기간입니다. 위 다이어그램에 표시된 것처럼 W1은 17:00부터 17:10까지의 이벤트 기간입니다. 각 이벤트 기간은 여러 이벤트의 그룹입니다.

  • 트리거 시간 - 트리거 시간은 데이터 처리 및 결과 업데이트 빈도를 제어합니다. 마이크로 배치 처리가 시작된 시간입니다.

  • 모으기 시간 - 스트림 데이터가 스트리밍 서비스에 수집된 시간입니다. 이벤트 시간이 이벤트 자체에 포함되지 않은 경우 경우에 따라 이 시간을 기간 설정에 사용할 수 있습니다.

기간 설정

기간 설정은 이벤트 기간을 기준으로 여러 이벤트를 그룹화하고 집계하는 기법입니다. 다음 예제에서는 기간 설정의 이점과 이를 사용하는 경우를 살펴보겠습니다.

비즈니스 사용 사례에 따라 Spark에서 지원하는 세 가지 유형의 기간이 있습니다.

  • 연속 기간 - 집계하는 일련의 겹치지 않는 고정된 크기의 이벤트 기간입니다.

  • 슬라이딩 기간 – '고정된 크기'라는 점에서 연속 기간과 유사하지만, 슬라이드 지속 시간이 기간 자체의 지속 시간보다 짧으면 기간이 겹치거나 지나갈 수 있습니다.

  • 세션 기간 - 입력 데이터 이벤트로 시작하여 공백 또는 비활성 기간 내에 입력을 수신하는 한 자체적으로 계속 확장됩니다. 세션 기간은 입력에 따라 기간 길이의 크기가 정적이거나 동적일 수 있습니다.

텀블링 윈도우

연속 기간은 집계하는 일련의 겹치지 않는 고정된 크기의 이벤트 기간입니다. 실제 사례를 들어 설명해 보겠습니다.

스크린샷은 위에 제공된 예제에 대한 Amazon CloudWatch 모니터링 로그인 AWS Glue를 보여주며 필요한 실행기 수(주황색 선)를 살펴보고 수동 조정 없이 이에 맞게 실행기 크기(파란색 선)를 조정합니다.

ABC Auto는 새로운 브랜드의 스포츠카에 대한 마케팅 캠페인을 진행하고자 합니다. 이를 위해 스포츠카 팬이 가장 많은 도시를 선택하려고 합니다. 이 목표를 달성하기 위해 이 회사는 웹 사이트에 자동차를 소개하는 15초짜리 짧은 광고를 선보입니다. 모든 '클릭'과 해당 '도시'가 기록되어 Amazon Kinesis Data Streams로 스트리밍됩니다. 10분 동안의 클릭 수를 세고 이를 도시별로 그룹화하여 수요가 가장 높은 도시를 확인하려고 합니다. 다음은 집계의 출력입니다.

window_start_time window_end_time 구/군/시 total_clicks
2023-07-10 17:00:00 2023-07-10 17:10:00 댈러스 75
2023-07-10 17:00:00 2023-07-10 17:10:00 시카고 10
2023-07-10 17:20:00 2023-07-10 17:30:00 댈러스 20
2023-07-10 17:20:00 2023-07-10 17:30:00 시카고 50

위에서 설명한 것처럼 이러한 이벤트 기간은 트리거 시간 간격과 다릅니다. 예를 들어, 트리거 시간이 1분마다인 경우에도 출력 결과에는 겹치지 않는 집계 기간이 10분만 표시됩니다. 최적화를 위해서는 트리거 간격을 이벤트 기간에 맞추는 것이 좋습니다.

위 표에서 17:00~17:10 기간 동안 Dallas는 75번의 클릭이 있었고 Chicago는 10번의 클릭이 있었습니다. 또한 17:10~17:20 기간은 어느 도시의 데이터도 없으므로 생략됩니다.

이제 다운스트림 분석 애플리케이션에서 이 데이터에 대한 추가 분석을 실행하여 마케팅 캠페인을 실행할 가장 독점적인 도시를 결정할 수 있습니다.

AWS Glue에서 연속 기간 사용
  1. Amazon Kinesis Data Streams DataFrame을 생성하고 여기에서 읽습니다. 예제:

    parsed_df = kinesis_raw_df \ .selectExpr('CAST(data AS STRING)') \ .select(from_json("data", ticker_schema).alias("data")) \ .select('data.event_time','data.ticker','data.trade','data.volume', 'data.price')
  2. 연속 기간에서 데이터를 처리합니다. 아래 예제에서 데이터는 10분 연속 기간의 입력 필드 'event_time'을 기준으로 그룹화되고 출력을 Amazon S3 데이터 레이크에 씁니다.

    grouped_df = parsed_df \ .groupBy(window("event_time", "10 minutes"), "city") \ .agg(sum("clicks").alias("total_clicks")) summary_df = grouped_df \ .withColumn("window_start_time", col("window.start")) \ .withColumn("window_end_time", col("window.end")) \ .withColumn("year", year("window_start_time")) \ .withColumn("month", month("window_start_time")) \ .withColumn("day", dayofmonth("window_start_time")) \ .withColumn("hour", hour("window_start_time")) \ .withColumn("minute", minute("window_start_time")) \ .drop("window") write_result = summary_df \ .writeStream \ .format("parquet") \ .trigger(processingTime="10 seconds") \ .option("checkpointLocation", "s3a://bucket-stock-stream/stock-stream-catalog-job/checkpoint/") \ .option("path", "s3a://bucket-stock-stream/stock-stream-catalog-job/summary_output/") \ .partitionBy("year", "month", "day") \ .start()

슬라이딩 윈도우

슬라이딩 기간은 '고정된 크기'라는 점에서 연속 기간과 유사하지만, 슬라이드 지속 시간이 기간 자체의 지속 시간보다 짧으면 기간이 겹치거나 지나갈 수 있습니다. 슬라이딩의 특성으로 인해 입력이 여러 기간에 바인딩될 수 있습니다.

스크린샷은 연속 기간 예제를 보여줍니다.

이해를 돕기 위해 신용 카드 사기 가능성을 탐지하려는 은행의 예를 들어 보겠습니다. 스트리밍 애플리케이션은 신용 카드 거래의 지속적인 스트림을 모니터링할 수 있습니다. 이러한 트랜잭션은 10분 기간으로 집계될 수 있으며 5분마다 기간이 앞으로 이동하여 가장 오래된 5분의 데이터를 제거하고 최신 5분의 새 데이터를 추가합니다. 각 기간 내에서 미국에서의 트랜잭션 직후 호주에서의 트랜잭션과 같은 의심스러운 패턴을 확인하면서 국가별로 트랜잭션을 그룹화할 수 있습니다. 간소화를 위해 총 트랜잭션 금액이 100 USD를 초과하는 경우 이러한 트랜잭션을 사기로 분류하겠습니다. 이러한 패턴이 탐지되면 사기 가능성이 있다는 신호이며 카드가 동결될 수 있습니다.

신용 카드 처리 시스템은 국가와 함께 각 카드 ID에 대한 일련의 거래 이벤트를 Kinesis로 전송하고 있습니다. AWS Glue 작업은 분석을 실행하고 다음과 같은 집계 결과를 생성합니다.

window_start_time window_end_time card_last_four country total_amount
2023-07-10 17:00:00 2023-07-10 17:10:00 6544 미국 85
2023-07-10 17:00:00 2023-07-10 17:10:00 6544 호주 10
2023-07-10 17:05:45 2023-07-10 17:15:45 6544 미국 50
2023-07-10 17:10:45 2023-07-10 17:20:45 6544 미국 50
2023-07-10 17:10:45 2023-07-10 17:20:45 6544 호주 150

위 집계를 기준으로 5분마다 이동하는 10분 기간을 트랜잭션 금액별로 합산하여 볼 수 있습니다. 이상치(호주에서 150 USD 트랜잭션)가 있는 17:10~17:20 기간에서 이상이 탐지되었습니다. AWS Glue는 이러한 이상을 탐지하고 boto3를 사용하여 잘못된 키가 포함된 경보 이벤트를 SNS 주제에 푸시할 수 있습니다. 또한 Lambda 함수가 이 주제를 구독하고 조치를 취할 수 있습니다.

연속 기간에서 데이터 처리

group-by 절과 기간 함수는 아래와 같이 슬라이딩 기간을 구현하는 데 사용됩니다.

grouped_df = parsed_df \ .groupBy(window(col("event_time"), "10 minute", "5 min"), "country", "card_last_four") \ .agg(sum("tx_amount").alias("total_amount"))

세션 기간

크기가 고정된 위의 두 기간과 달리 세션 기간은 입력에 따라 기간 길이의 크기가 정적이거나 동적일 수 있습니다. 세션 기간은 입력 데이터 이벤트로 시작하여 공백 또는 비활성 기간 내에 입력을 수신하는 한 자체적으로 계속 확장됩니다.

스크린샷은 연속 기간 예제를 보여줍니다.

예를 들어 보겠습니다. ABC 호텔은 일주일 중 가장 바쁜 시간이 언제인지 알아보고 고객에게 더 나은 가격을 제공하고자 합니다. 손님이 체크인하는 즉시 세션 기간이 시작되고 Spark는 해당 이벤트 기간에 대한 집계로 상태를 유지합니다. 손님이 체크인할 때마다 이벤트가 생성되어 Amazon Kinesis Data Streams로 전송됩니다. 호텔은 15분 동안 체크인이 없을 경우 이벤트 기간을 닫을 수 있다고 판단합니다. 새 체크인이 있을 때 다음 이벤트 기간이 다시 시작됩니다. 출력은 다음과 같습니다.

window_start_time window_end_time 구/군/시 total_checkins
2023-07-10 17:02:00 2023-07-10 17:30:00 댈러스 50
2023-07-10 17:02:00 2023-07-10 17:30:00 시카고 25
2023-07-10 17:40:00 2023-07-10 18:20:00 댈러스 75
2023-07-10 18:50:45 2023-07-10 19:15:45 댈러스 20

첫 번째 체크인은 event_time=17:02에 발생했습니다. 집계 이벤트 기간은 17:02에 시작됩니다. 이 집계는 15분 이내에 이벤트를 수신하는 한 계속됩니다. 위 예제에서 수신한 마지막 이벤트는 17:15였고 이후 15분 동안 이벤트가 없었습니다. 그 결과 Spark는 17:15+15min = 17:30에 이벤트 기간을 닫고 17:02~17:30으로 설정했습니다. 새 체크인 데이터 이벤트를 수신한 17:47에 새 이벤트 기간을 시작했습니다.

세션 기간에서 데이터 처리

group-by 절과 기간 함수는 슬라이딩 기간을 구현하는 데 사용됩니다.

grouped_df = parsed_df \ .groupBy(session_window(col("event_time"), "10 minute"), "city") \ .agg(count("check_in").alias("total_checkins"))

출력 모드

출력 모드는 무제한 테이블의 결과를 외부 싱크에 쓰는 모드입니다. 세 가지 모드를 사용할 수 있습니다. 다음 예제에서는 각 마이크로 배치에서 데이터 라인이 스트리밍되고 처리될 때 단어 발생 횟수를 계산합니다.

  • 전체 모드 - 현재 이벤트 기간에서 단어 수가 업데이트되지 않았더라도 모든 마이크로 배치 처리가 끝나면 전체 결과 테이블을 싱크에 씁니다.

  • 추가 모드 - 이 모드는 기본 모드로, 마지막 트리거 이후 결과 테이블에 추가된 새 단어 및/또는 행만 싱크에 씁니다. 이 모드는 맵, flatMap, 필터와 같은 쿼리의 상태 비저장 스트리밍에 적합합니다.

  • 업데이트 모드 - 마지막 트리거 이후 업데이트되거나 추가된 결과 테이블의 단어 및/또는 행만 싱크에 씁니다.

    참고

    출력 모드 = '업데이트'는 세션 기간에서 지원되지 않습니다.

최신 데이터 및 워터마크 처리

실시간 데이터로 작업할 때 네트워크 지연 및 업스트림 오류로 인해 데이터 도착이 지연될 수 있으며 놓친 이벤트 기간에 다시 집계를 수행하는 메커니즘이 필요합니다. 하지만 이렇게 하려면 상태를 유지해야 합니다. 이와 동시에 상태 크기를 제한하려면 오래된 데이터를 정리해야 합니다. Spark 버전 2.1에는 상태를 유지하고 사용자가 지연 데이터에 대한 임곗값을 지정할 수 있는 워터마킹이라는 기능에 대한 지원이 추가되었습니다.

위의 주식 시세 표시기 예제를 참조하여 지연 데이터에 대한 허용 임곗값을 10분 이내로 가정해 보겠습니다. 간단하게 하기 위해 연속 기간을 가정하겠습니다(티커는 AMZ, 거래는 BUY).

스크린샷은 예제 입력 스트림과 데이터 세트에 지연 데이터가 추가된 경우의 결과 테이블을 보여줍니다.

위 다이어그램에서는 10분의 연속 기간 동안 총 볼륨을 계산하고 있습니다. 17:00, 17:10, 17:20에 트리거가 있습니다. 타임라인 화살표 위에는 입력 데이터 스트림이 있고 아래에는 무제한 결과 테이블이 있습니다.

첫 번째 10분의 연속 기간에서는 event_time을 기준으로 집계했고, total_volume은 30으로 계산되었습니다. 두 번째 이벤트 기간에서 Spark는 event_time=17:02의 첫 번째 데이터 이벤트를 받았습니다. 이는 지금까지 Spark에서 본 최대 event_time이므로 워터마크 임곗값은 10분 뒤로 설정됩니다(즉, watermark_event_time=16:52). 16:52 이후의 event_time이 있는 모든 데이터 이벤트는 시간 제한 집계로 간주되며 그 이전의 모든 데이터 이벤트는 삭제됩니다. 이를 통해 Spark는 추가 10분 동안 중간 상태를 유지하여 지연 데이터를 수용할 수 있습니다. 17:08 즈음에 Spark는 임곗값 내에 있는 event_time=16:54의 이벤트를 수신했습니다. 따라서 Spark는'16:50~17:00' 이벤트 기간을 다시 계산했고 총 볼륨은 30에서 60으로 업데이트되었습니다.

그러나 트리거 시간 17:20에 Spark가 event_time=17:15의 이벤트를 수신하면 watermark_event_time=17:05로 설정됩니다. 따라서 event_time=17:03의 지연 데이터 이벤트는 '너무 늦음'으로 간주되어 무시되었습니다.

Watermark Boundary = Max(Event Time) - Watermark Threshold

AWS Glue에서 워터마크 사용

Spark는 워터마크 경계를 통과할 때까지 외부 싱크에 데이터를 내보내거나 쓰지 않습니다. AWS Glue에서 워터마크를 구현하려면 아래 예제를 참조하세요.

grouped_df = parsed_df \ .withWatermark("event_time", "10 minutes") \ .groupBy(window("event_time", "5 minutes"), "ticker") \ .agg(sum("volume").alias("total_volume"))
프라이버시사이트 이용 약관쿠키 기본 설정
© 2025, Amazon Web Services, Inc. 또는 계열사. All rights reserved.