Amazon EMR on EKS로 태스크 복구 및 작업 규모 조정을 위한 Flink 작업 재시작 시간 최적화 - 아마존 EMR

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Amazon EMR on EKS로 태스크 복구 및 작업 규모 조정을 위한 Flink 작업 재시작 시간 최적화

태스크가 실패하거나 규모 조정 작업이 발생할 경우 Flink는 마지막으로 완료된 체크포인트의 태스크를 다시 재실행하려고 시도합니다. 체크포인트 상태의 크기와 병렬 태스크의 수에 따라 재시작 프로세스를 실행하는 데 1분 이상 소요될 수 있습니다. 프로세스를 다시 시작하는 동안에는 작업에 대한 백로그 태스크가 누적될 수 있습니다. 그렇지만 Flink는 실행 그래프의 복구 및 재시작 속도를 최적화하여 작업 안정성을 향상시킬 수 있는 방법이 몇 가지 있습니다.

이 페이지에서는 Amazon EMR Flink를 사용하여 태스크 복구 및 규모 조정 작업에서 작업 재시작 시간을 단축할 수 있는 몇 가지 방법을 설명합니다.

참고

태스크-로컬 복구는 EKS 6.14.0 이상의 Amazon EMR on EKS에서 Flink를 통해 지원됩니다.

Flink 체크포인트를 사용할 경우 각 태스크에서 Flink가 Amazon S3와 같은 분산 스토리지에 기록하는 상태 스냅샷을 만듭니다. 복구의 경우 태스크는 분산 스토리지를 통해 해당 상태를 복원합니다. 분산 스토리지에서는 내결함성을 제공하며 모든 노드에서 액세스가 가능하기 때문에 크기 재조정이 이뤄지는 동안 상태를 재분배할 수 있습니다.

하지만 원격 분산 저장소에는 모든 태스크에서 네트워크를 통해 원격 위치에서 해당 상태를 읽어야 한다는 단점도 있습니다. 이러한 한계로 인해 태스크 복구 또는 규모 조정 작업 중에 대규모 상태의 복구 시간이 길어질 수 있습니다.

이와 같은 긴 복구 시간 문제는 태스크-로컬 복구를 통해 해결됩니다. 태스크에서는 체크포인트의 상태를 로컬 디스크와 같이 해당 작업에 대해 로컬인 보조 스토리지에 기록합니다. 또한 태스크는 기본 스토리지(이 경우 Amazon S3)에 상태를 저장합니다. 복구가 진행되는 동안 스케줄러는 태스크가 이전에 실행된 동일한 태스크 관리자에서 태스크를 예약하기 때문에 원격 상태 저장소에서 데이터를 읽는 대신 로컬 상태 저장소에서 복구할 수 있습니다. 자세한 내용을 알아보려면 Apache Flink 설명서태스크 로컬 복구를 참조하세요.

샘플 작업을 사용한 벤치마크 테스트 결과에 따르면 태스크-로컬 복구가 활성화된 상태에서는 복구 시간이 몇 분에서 몇 초로 단축된 것으로 확인되었습니다.

태스크-로컬 복구를 활성화하려면 flink-conf.yaml 파일에 다음과 같은 구성을 설정하세요. 체크포인트 간격의 값을 밀리초 단위로 지정하세요.

state.backend.local-recovery: true state.backend: hasmap or rocksdb state.checkpoints.dir: s3://STORAGE-BUCKET-PATH/checkpoint execution.checkpointing.interval: 15000
참고

Amazon EBS를 통한 태스크-로컬 복구는 Amazon EMR on EKS 6.15.0 이상의 Flink를 통해 지원됩니다.

Amazon EMR on EKS에서 Flink를 사용할 경우 태스크 로컬 복구를 위해 Amazon EBS 볼륨을 TaskManager 포드에 자동 프로비저닝할 수 있습니다. 기본 오버레이 마운트에는 10GB 볼륨이 함께 제공되어 상태가 낮은 작업에 충분합니다. 상태가 큰 작업에서는 자동 EBS 볼륨 마운트 옵션을 활성화할 수 있습니다. TaskManager 포드는 포드 생성 과정에서 자동으로 생성되어 마운트되며 포드 삭제 중에는 제거됩니다.

다음 단계를 따라 Amazon EMR on EKS에서 Flink용 자동 EBS 볼륨 마운트를 활성화하세요.

  1. 이후 단계에서 사용할 다음 변수의 값을 내보내세요.

    export AWS_REGION=aa-example-1 export FLINK_EKS_CLUSTER_NAME=my-cluster export AWS_ACCOUNT_ID=111122223333
  2. 클러스터에 대해 kubeconfig YAML 파일을 생성 또는 업데이트합니다.

    aws eks update-kubeconfig --name $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION
  3. Amazon EKS 클러스터에서 Amazon EBS CSI(컨테이너 스토리지 인터페이스) 드라이버에 대한 IAM 서비스 계정을 생성합니다.

    eksctl create iamserviceaccount \ --name ebs-csi-controller-sa \ --namespace kube-system \ --region $AWS_REGION \ --cluster $FLINK_EKS_CLUSTER_NAME\ --role-name TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} \ --role-only \ --attach-policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy \ --approve
  4. 다음 명령에 따라 Amazon EBS CSI 드라이버를 생성합니다.

    eksctl create addon \ --name aws-ebs-csi-driver \ --region $AWS_REGION \ --cluster $FLINK_EKS_CLUSTER_NAME \ --service-account-role-arn arn:aws:iam::${AWS_ACCOUNT_ID}:role/TLR_${AWS_REGION}_${FLINK_EKS_CLUSTER_NAME}
  5. 다음 명령에 따라 Amazon EBS 스토리지 클래스를 생성합니다.

    cat ≪ EOF ≫ storage-class.yaml apiVersion: storage.k8s.io/v1 kind: StorageClass metadata: name: ebs-sc provisioner: ebs.csi.aws.com volumeBindingMode: WaitForFirstConsumer EOF

    그런 후 다음과 같이 클래스를 적용합니다.

    kubectl apply -f storage-class.yaml
  6. Helm에서는 서비스 계정을 생성할 수 있는 옵션과 함께 Amazon EMR Flink Kubernetes 연산자를 설치합니다. 그러면 Flink 배포에 사용할 수 있는 emr-containers-sa-flink가 생성됩니다.

    helm install flink-kubernetes-operator flink-kubernetes-operator/ \ --set jobServiceAccount.create=true \ --set rbac.jobRole.create=true \ --set rbac.jobRoleBinding.create=true
  7. Flink 작업을 제출하고 태스크-로컬 복구를 위한 EBS 볼륨의 자동 프로비저닝을 활성화하려면 flink-conf.yaml 파일에 다음과 같은 구성을 설정하세요. 작업의 상태 크기에 맞게 크기 제한을 조정하세요. serviceAccountemr-containers-sa-flink으로 설정합니다. 체크포인트 간격의 값을 밀리초 단위로 지정하세요. executionRoleArn은 생략하세요.

    flinkConfiguration: task.local-recovery.ebs.enable: true kubernetes.taskmanager.local-recovery.persistentVolumeClaim.sizeLimit: 10Gi state.checkpoints.dir: s3://BUCKET-PATH/checkpoint state.backend.local-recovery: true state.backend: hasmap or rocksdb state.backend.incremental: "true" execution.checkpointing.interval: 15000 serviceAccount: emr-containers-sa-flink

Amazon EBS CSI 드라이버 플러그인을 삭제할 준비가 끝나면 다음과 같은 명령을 사용하세요.

# Detach Attached Policy aws iam detach-role-policy --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} --policy-arn arn:aws:iam::aws:policy/service-role/AmazonEBSCSIDriverPolicy # Delete the created Role aws iam delete-role --role-name TLR_${$AWS_REGION}_${FLINK_EKS_CLUSTER_NAME} # Delete the created service account eksctl delete iamserviceaccount --name ebs-csi-controller-sa --namespace kube-system --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION # Delete Addon eksctl delete addon --name aws-ebs-csi-driver --cluster $FLINK_EKS_CLUSTER_NAME --region $AWS_REGION # Delete the EBS storage class kubectl delete -f storage-class.yaml
참고

Amazon EMR on EKS 6.14.0 이상에서 Flink를 사용하면 일반 로그 기반 증분 체크포인트가 지원됩니다.

체크포인트의 속도를 높이기 위해 일반 로그 기반 증분 체크포인트가 Flink 1.16에 추가되었습니다. 체크포인트 간격을 빠르게 하면 복구 후 다시 처리해야 하는 이벤트가 줄어들기 때문에 복구 작업이 적어지는 경우가 많습니다. 자세한 내용은 Apache Flink 블로그에서 일반 로그 기반 증분 체크포인트로 체크포인트의 속도 및 안정성 강화를 참조하세요.

샘플 작업을 이용하여 수행한 벤치마크 테스트에서 일반 로그 기반 증분 체크포인트를 사용하면 체크포인트 시간이 몇 분에서 몇 초로 단축된 것이 확인되었습니다.

일반 로그 기반 증분 체크포인트를 활성화하려면 flink-conf.yaml 파일에 다음 구성을 설정하세요. 체크포인트 간격의 값을 밀리초 단위로 지정하세요.

state.backend.changelog.enabled: true state.backend.changelog.storage: filesystem dstl.dfs.base-path: s3://bucket-path/changelog state.backend.local-recovery: true state.backend: rocksdb state.checkpoints.dir: s3://bucket-path/checkpoint execution.checkpointing.interval: 15000
참고

Amazon EMR on EKS 6.14.0 이상에서 Flink를 사용할 경우 기본 스케줄러에 대해 세분화된 복구 지원이 제공됩니다. 적응형 스케줄러에서의 세분화된 복구 지원은 Amazon EMR on EKS 6.15.0 이상에서 Flink를 통해 이용할 수 있습니다.

실행 중에 태스크가 실패하면 Flink에서는 전체 실행 그래프를 재설정하고 마지막으로 완료된 체크포인트에서 전체 재실행을 트리거합니다. 이 방식은 실패한 태스크를 재실행하는 것보다 비용이 많이 듭니다. 세분화된 복구에서는 실패한 태스크의 파이프라인으로 연결된 구성 요소만 재시작합니다. 다음 예제의 작업 그래프에는 버텍스가 5개(A~E) 있습니다. 버텍스 사이에 있는 모든 연결은 포인트별 분포로 파이프라인되며 작업의 parallelism.default2로 설정됩니다.

A → B → C → D → E

이 예시에서는 태스크가 총 10개 실행 중입니다. 첫 번째 파이프라인(a1~e1)은 TaskManager(TM1)에서 실행되고 두 번째 파이프라인(a2~e2)은 또 다른 TaskManager(TM2)에서 실행됩니다.

a1 → b1 → c1 → d1 → e1 a2 → b2 → c2 → d2 → e2

a1 → e1a2 → e2라는 두 개의 구성 요소가 파이프라인으로 연결되어 있습니다. TM1 또는 TM2 중에 하나가 실패하면 실패는 TaskManager가 중이던 파이프라인에 있는 태스크 5개에만 영향을 미칩니다. 재시작 전략에 따라 영향을 받는 파이프라인 구성 요소만 시작됩니다.

세분화된 복구는 완벽히 병렬화된 Flink 작업에서만 작동합니다. keyBy() 또는 redistribute() 작업에서는 지원되지 않습니다. 자세한 내용은 Flink 개선 제안 Jira 프로젝트의 FLIP-1: 태스크 실패에서 세분화된 복구를 참조하세요.

세분화된 복구를 활성화하려면 flink-conf.yaml 파일에 다음과 같은 구성을 설정하세요.

jobmanager.execution.failover-strategy: region restart-strategy: exponential-delay or fixed-delay
참고

적응형 스케줄러의 결합된 재시작 메커니즘은 Amazon EMR on EKS 6.15.0 이상의 Flink에서 지원됩니다.

적응형 스케줄러에서는 가용 슬롯을 기반으로 작업 병렬성을 조정할 수 있습니다. 이 스케줄러는 구성된 작업 병렬 처리에 적합한 가용 슬롯이 충분하지 않은 경우 병렬 처리의 수를 자동으로 줄입니다. 새 슬롯이 가용 상태가 되면 작업은 구성된 작업 병렬 처리로 다시 확장됩니다. 적응형 스케줄러는 가용 리소스가 충분하지 않은 경우 작업에서 가동 중지가 발생하는 것은 방지합니다. Flink Autoscaler에 대해 지원되는 스케줄러입니다. 이러한 이유들로 인해 적응형 스케줄러를 Amazon EMR Flink와 함께 사용하는 것이 좋습니다. 단, 적응형 스케줄러는 짧은 시간 내에 여러 번 재시작을 수행할 수 있으며, 새 리소스가 추가될 때마다 한 번씩 다시 시작됩니다. 이로 인해 작업 성능이 떨어질 수 있습니다.

Amazon EMR 6.15.0 이상에서는 Flink에 첫 번째 리소스가 추가될 때 재시작 기간을 연 다음, 구성된 기본 1분 간격까지 기다리는 적응형 스케줄러의 결합된 재시작 메커니즘이 있습니다. 이 메커니즘에서는 구성된 병렬 처리로 작업을 실행하기 위한 가용 리소스가 충분하거나 간격 제한 시간이 초과될 경우 단일 재시작을 수행합니다.

샘플 작업을 이용한 벤치마크 테스트에서는 적응형 스케줄러와 Flink Autoscaler를 사용할 경우 이 기능이 기본 동작보다 10% 더 많은 레코드를 처리하는 것이 입증되었습니다.

결합된 재시작 메커니즘을 활성화하려면 flink-conf.yaml 파일에 다음 구성을 설정하세요.

jobmanager.adaptive-scheduler.combined-restart.enabled: true jobmanager.adaptive-scheduler.combined-restart.window-interval: 1m