스트리밍을 위한 AWS Glue 대화형 세션 - AWS Glue

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

스트리밍을 위한 AWS Glue 대화형 세션

스트리밍 세션 유형 전환

AWS Glue 대화형 세션 구성 매직(%streaming)을 사용하여 실행 중인 작업을 정의하고 스트리밍 대화형 세션을 초기화합니다.

대화형 개발을 위한 샘플링 입력 스트림

인터랙티브 세션에서 인터랙티브 경험을 향상시키기 위해 개발한 도구 중 하나는 정적 스트림의 스냅샷을 얻을 수 있는 새로운 방법을 추가한 DynamicFrame 것입니다. AWS Glue GlueContext GlueContext워크플로를 검사, 상호 작용 및 구현할 수 있습니다.

GlueContext 클래스 인스턴스를 사용하면 getSampleStreamingDynamicFrame 메서드를 찾을 수 있습니다. 이 메서드의 필수 인수는 다음과 같습니다.

  • dataFrame: 스파크 스트리밍 DataFrame

  • options: 아래 사용 가능한 옵션 참조

사용 가능한 옵션은 다음과 같습니다.

  • windowSize: 마이크로 배치 기간이라고도 합니다. 이 파라미터는 이전 배치가 트리거된 후 스트리밍 쿼리가 대기하는 시간을 결정합니다. 이 파라미터 값은 pollingTimeInMs보다 작아야 합니다.

  • pollingTimeInMs: 메서드가 실행되는 총 시간입니다. 입력 스트림에서 샘플 레코드를 얻기 위해 적어도 하나의 마이크로 배치를 실행합니다.

  • recordPollingLimit: 이 매개변수를 사용하면 스트림에서 폴링할 총 레코드 수를 제한할 수 있습니다.

  • (선택 사항) writeStreamFunction을 사용하여 모든 레코드 샘플링 함수에 이 사용자 지정 함수를 적용할 수도 있습니다. Scala 및 Python 예제는 아래를 참조하세요.

Scala
val sampleBatchFunction = (batchDF: DataFrame, batchId: Long) => {//Optional but you can replace your own forEachBatch function here} val jsonString: String = s"""{"pollingTimeInMs": "10000", "windowSize": "5 seconds"}""" val dynFrame = glueContext.getSampleStreamingDynamicFrame(YOUR_STREAMING_DF, JsonOptions(jsonString), sampleBatchFunction) dynFrame.show()
Python
def sample_batch_function(batch_df, batch_id): //Optional but you can replace your own forEachBatch function here options = { "pollingTimeInMs": "10000", "windowSize": "5 seconds", } glue_context.getSampleStreamingDynamicFrame(YOUR_STREAMING_DF, options, sample_batch_function)
참고

몇 가지 이유로 인해 샘플링된 DynFrame이 비어 있는 경우가 발생할 수 있습니다.

  • 스트리밍 소스가 ‘최신’으로 설정되어 있으며 샘플링 기간 동안 새 데이터가 수집되지 않았습니다.

  • 폴링 시간이 충분하지 않아 수집된 레코드를 처리할 수 없습니다. 전체 배치가 처리되지 않으면 데이터가 표시되지 않습니다.

대화형 세션에서 스트리밍 애플리케이션 실행

AWS Glue 대화형 세션에서는 AWS Glue 콘솔에서 스트리밍 애플리케이션을 생성하는 것처럼 AWS Glue 스트리밍 애플리케이션을 실행할 수 있습니다. 대화형 세션은 세션 기반이므로 런타임에 예외가 발생해도 세션이 중지되지 않습니다. 이제 배치 함수를 반복적으로 개발할 수 있다는 추가 이점이 있습니다. 예:

def batch_function(data_frame, batch_id): log.info(data_frame.count()) invalid_method_call() glueContext.forEachBatch(frame=streaming_df, batch_function = batch_function, options = {**})

위의 예에서는 잘못된 메서드 사용을 포함했고, 전체 애플리케이션을 종료하는 일반 AWS Glue 작업과는 달리 사용자의 코딩 컨텍스트 및 정의가 완전히 보존되며 세션이 여전히 작동 중입니다. 새 클러스터를 부트스트랩하고 모든 이전 변환을 다시 실행할 필요가 없습니다. 이를 통해 배치 함수 구현을 신속하게 반복하여 바람직한 결과를 얻을 수 있습니다.

대화형 세션은 세션이 한 번에 하나의 문만 실행하도록 각 문을 차단 방식으로 평가한다는 점에 유의해야 합니다. 스트리밍 쿼리는 지속적이고 끝나지 않으므로 활성 스트리밍 쿼리가 포함된 세션은 중단되지 않는 한 어떤 후속 문도 처리할 수 없습니다. Jupyter Notebook에서 직접 중단 명령을 실행할 수 있으며 커널이 취소를 처리할 것입니다.

실행 대기 중인 다음 일련의 문을 예로 들어 보겠습니다.

Statement 1: val number = df.count() #Spark Action with deterministic result Result: 5 Statement 2: streamingQuery.start().awaitTermination() #Spark Streaming Query that will be executing continously Result: Constantly updated with each microbatch Statement 3: val number2 = df.count() #This will not be executed as previous statement will be running indefinitely