태스크 복구 및 작업 규모 조정을 위한 작업 재시작 시간 최적화
태스크가 실패하거나 규모 조정 작업이 발생할 경우 Flink는 마지막으로 완료된 체크포인트의 태스크를 다시 재실행하려고 시도합니다. 체크포인트 상태의 크기와 병렬 태스크의 수에 따라 재시작 프로세스를 실행하는 데 1분 이상 소요될 수 있습니다. 프로세스를 다시 시작하는 동안에는 작업에 대한 백로그 태스크가 누적될 수 있습니다. 그렇지만 Flink는 실행 그래프의 복구 및 재시작 속도를 최적화하여 작업 안정성을 향상시킬 수 있는 방법이 몇 가지 있습니다.
이 페이지에서는 Amazon EMR Flink를 사용하여 태스크 복구 및 규모 조정 작업에서 작업 재시작 시간을 단축할 수 있는 몇 가지 방법을 설명합니다.
태스크-로컬 복구
참고
태스크-로컬 복구는 Amazon EMR 6.0.0 이상에서 지원됩니다.
Flink 체크포인트를 사용할 경우 각 태스크에서 Flink가 Amazon S3와 같은 분산 스토리지에 기록하는 상태 스냅샷을 만듭니다. 복구의 경우 태스크는 분산 스토리지를 통해 해당 상태를 복원합니다. 분산 스토리지에서는 내결함성을 제공하며 모든 노드에서 액세스가 가능하기 때문에 크기 재조정이 이뤄지는 동안 상태를 재분배할 수 있습니다.
하지만 원격 분산 저장소에는 모든 태스크에서 네트워크를 통해 원격 위치에서 해당 상태를 읽어야 한다는 단점도 있습니다. 이러한 한계로 인해 태스크 복구 또는 규모 조정 작업 중에 대규모 상태의 복구 시간이 길어질 수 있습니다.
이와 같은 긴 복구 시간 문제는 태스크-로컬 복구를 통해 해결됩니다. 태스크에서는 체크포인트의 상태를 로컬 디스크와 같이 해당 작업에 대해 로컬인 보조 스토리지에 기록합니다. 또한 태스크는 기본 스토리지(이 경우 Amazon S3)에 상태를 저장합니다. 복구가 진행되는 동안 스케줄러는 태스크가 이전에 실행된 동일한 태스크 관리자에서 태스크를 예약하기 때문에 원격 상태 저장소에서 데이터를 읽는 대신 로컬 상태 저장소에서 복구할 수 있습니다. 자세한 내용을 알아보려면 Apache Flink 설명서의 태스크 로컬 복구
샘플 작업을 사용한 벤치마크 테스트 결과에 따르면 태스크-로컬 복구가 활성화된 상태에서는 복구 시간이 몇 분에서 몇 초로 단축된 것으로 확인되었습니다.
태스크-로컬 복구를 활성화하려면 flink-conf.yaml
파일에 다음과 같은 구성을 설정하세요. 체크포인트 간격의 값을 밀리초 단위로 지정하세요.
state.backend.local-recovery: true state.backend:
hasmap or rocksdb
state.checkpoints.dir: s3://storage-location-bucket-path
/checkpoint execution.checkpointing.interval:15000
일반 로그 기반 증분 체크포인트
참고
일반 로그 기반 증분 체크포인트는 Amazon EMR 6.10.0 이상에서 지원됩니다.
체크포인트의 속도를 높이기 위해 일반 로그 기반 증분 체크포인트가 Flink 1.16에 추가되었습니다. 체크포인트 간격을 빠르게 하면 복구 후 다시 처리해야 하는 이벤트가 줄어들기 때문에 복구 작업이 적어지는 경우가 많습니다. 자세한 내용은 Apache Flink 블로그에서 일반 로그 기반 증분 체크포인트로 체크포인트의 속도 및 안정성 강화
샘플 작업을 이용하여 수행한 벤치마크 테스트에서 일반 로그 기반 증분 체크포인트를 사용하면 체크포인트 시간이 몇 분에서 몇 초로 단축된 것이 확인되었습니다.
일반 로그 기반 증분 체크포인트를 활성화하려면 flink-conf.yaml
파일에 다음 구성을 설정하세요. 체크포인트 간격의 값을 밀리초 단위로 지정하세요.
state.backend.changelog.enabled: true state.backend.changelog.storage: filesystem dstl.dfs.base-path: s3://
bucket-path
/changelog state.backend.local-recovery: true state.backend: rocksdb state.checkpoints.dir: s3://bucket-path
/checkpoint execution.checkpointing.interval:15000
세분화된 복구
참고
기본 스케줄러에 대한 세분화된 복구 지원은 Amazon EMR 6.0.0에서 이용할 수 있습니다. 적응형 스케줄러에서의 세분화된 복구 지원은 Amazon EMR 6.15.0 이상에서 이용할 수 있습니다.
실행 중에 태스크가 실패하면 Flink에서는 전체 실행 그래프를 재설정하고 마지막으로 완료된 체크포인트에서 전체 재실행을 트리거합니다. 이 방식은 실패한 태스크를 재실행하는 것보다 비용이 많이 듭니다. 세분화된 복구에서는 실패한 태스크의 파이프라인으로 연결된 구성 요소만 재시작합니다. 다음 예제의 작업 그래프에는 버텍스가 5개(A
~E
) 있습니다. 버텍스 사이에 있는 모든 연결은 포인트별 분포로 파이프라인되며 작업의 parallelism.default
는 2
로 설정됩니다.
A → B → C → D → E
이 예시에서는 태스크가 총 10개 실행 중입니다. 첫 번째 파이프라인(a1
~e1
)은 TaskManager(TM1
)에서 실행되고 두 번째 파이프라인(a2
~e2
)은 또 다른 TaskManager(TM2
)에서 실행됩니다.
a1 → b1 → c1 → d1 → e1
a2 → b2 → c2 → d2 → e2
a1 → e1
및 a2 →
e2
라는 두 개의 구성 요소가 파이프라인으로 연결되어 있습니다. TM1
또는 TM2
중에 하나가 실패하면 실패는 TaskManager가 중이던 파이프라인에 있는 태스크 5개에만 영향을 미칩니다. 재시작 전략에 따라 영향을 받는 파이프라인 구성 요소만 시작됩니다.
세분화된 복구는 완벽히 병렬화된 Flink 작업에서만 작동합니다. keyBy()
또는 redistribute()
작업에서는 지원되지 않습니다. 자세한 내용은 Flink 개선 제안 Jira 프로젝트의 FLIP-1: 태스크 실패에서 세분화된 복구
세분화된 복구를 활성화하려면 flink-conf.yaml
파일에 다음과 같은 구성을 설정하세요.
jobmanager.execution.failover-strategy: region restart-strategy:
exponential-delay or fixed-delay
적응형 스케줄러의 결합된 재시작 메커니즘
참고
적응형 스케줄러의 결합된 재시작 메커니즘은 Amazon EMR 6.15.0 이상에서 지원됩니다.
적응형 스케줄러에서는 가용 슬롯을 기반으로 작업 병렬성을 조정할 수 있습니다. 이 스케줄러는 구성된 작업 병렬 처리에 적합한 가용 슬롯이 충분하지 않은 경우 병렬 처리의 수를 자동으로 줄입니다. 새 슬롯이 가용 상태가 되면 작업은 구성된 작업 병렬 처리로 다시 확장됩니다. 적응형 스케줄러는 가용 리소스가 충분하지 않은 경우 작업에서 가동 중지가 발생하는 것은 방지합니다. Flink Autoscaler에 대해 지원되는 스케줄러입니다. 이러한 이유들로 인해 적응형 스케줄러를 Amazon EMR Flink와 함께 사용하는 것이 좋습니다. 단, 적응형 스케줄러는 짧은 시간 내에 여러 번 재시작을 수행할 수 있으며, 새 리소스가 추가될 때마다 한 번씩 다시 시작됩니다. 이로 인해 작업 성능이 떨어질 수 있습니다.
Amazon EMR 6.15.0 이상에서는 Flink에 첫 번째 리소스가 추가될 때 재시작 기간을 연 다음, 구성된 기본 1분 간격까지 기다리는 적응형 스케줄러의 결합된 재시작 메커니즘이 있습니다. 이 메커니즘에서는 구성된 병렬 처리로 작업을 실행하기 위한 가용 리소스가 충분하거나 간격 제한 시간이 초과될 경우 단일 재시작을 수행합니다.
샘플 작업을 이용한 벤치마크 테스트에서는 적응형 스케줄러와 Flink Autoscaler를 사용할 경우 이 기능이 기본 동작보다 10% 더 많은 레코드를 처리하는 것이 입증되었습니다.
결합된 재시작 메커니즘을 활성화하려면 flink-conf.yaml
파일에 다음 구성을 설정하세요.
jobmanager.adaptive-scheduler.combined-restart.enabled: true jobmanager.adaptive-scheduler.combined-restart.window-interval: 1m