为 Flink Operator 和 Flink 应用程序使用高可用性(HA) - Amazon EMR

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

为 Flink Operator 和 Flink 应用程序使用高可用性(HA)

我们为 Flink Operator 启用了高可用性,这样就可以使用备用 Flink Operator 进行故障转移,从而在发生故障时最大限度地减少 Operator 控制回路中的停机时间。默认会启用“高可用性”,启动 Operator 副本的默认数量为 2。您可以在 Helm 图表的 values.yaml 文件中配置副本字段。

以下字段支持自定义:

  • replicas(可选,默认值为 2):将此数字设置为大于 1 会创建其他备用 Operator,从而更快地恢复任务。

  • highAvailabilityEnabled(可选,默认值为 true):控制是否要启用 HA。将此参数指定为 true 可启用多可用区部署支持,并设置正确的 flink-conf.yaml 参数。

values.yaml 文件中设置以下配置可以为 Operator 禁用 HA。

... imagePullSecrets: [] replicas: 1 # set this to false if you don't want HA highAvailabilityEnabled: false ...

多可用区部署

我们在多个可用区中创 Operator Pod。这是一个软约束,如果不同可用区中没有足够的资源,您的 Operator Pod 将被调度到同一可用区中。

确定主副本

如果启用了 HA,各副本会使用租约来确定哪个 JM 是主副本,并使用 K8s Lease 来选举主副本。您可以描述租约并查看 .Spec.Holder Identity 字段来确定当前主副本

kubectl describe lease <Helm Install Release Name>-<NAMESPACE>-lease -n <NAMESPACE> | grep "Holder Identity"

Flink-S3 交互

配置访问凭证

请确保已为 IRSA 配置了相应的 IAM 权限来访问 S3 存储桶。

从 S3 应用程序模式获取任务 jar

Flink Operator 也支持从 S3 获取应用程序 jar。您只需在 FlinkDeployment 规范中提供 JarurI 的 S3 位置即可。

您也可以使用此功能下载其他工件,例如 PyFlink 脚本。生成的 Python 脚本放在路径 /opt/flink/usrlib/ 下。

以下示例演示了如何将此功能用于作 PyFlink 业。注意 jarURI 和 args 字段。

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: python-example spec: image: <YOUR CUSTOM PYFLINK IMAGE> emrReleaseLabel: "emr-6.12.0-flink-latest" flinkVersion: v1_16 flinkConfiguration: taskmanager.numberOfTaskSlots: "1" serviceAccount: flink jobManager: highAvailabilityEnabled: false replicas: 1 resource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 job: jarURI: "s3://<S3-BUCKET>/scripts/pyflink.py" # Note, this will trigger the artifact download process entryClass: "org.apache.flink.client.python.PythonDriver" args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pyflink.py"] parallelism: 1 upgradeMode: stateless

Flink S3 连接器

Flink 随附两个 S3 连接器(如下所示)。以下各节旨在介绍何时使用哪个连接器。

检查点:Presto S3 连接器

  • 将 S3 方案设置为 s3p://

  • 用于检查点到 s3 的推荐连接器。

示例 FlinkDeployment 规范:

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: flinkConfiguration: taskmanager.numberOfTaskSlots: "2" state.checkpoints.dir: s3p://<UCKET-NAME>/flink-checkpoint/
  • 将 S3 方案设置为 s3://s3a://

  • 用于从 S3 读取和写入文件的推荐连接器(仅限实现 Flinks Filesystem 接口的 S3 连接器)。

  • 默认在 flink-conf.yaml 文件中设置 fs.s3a.aws.credentials.provider,即 com.amazonaws.auth.WebIdentityTokenCredentialsProvider。如果完全覆盖默认值 flink-conf,并且正在与 S3 进行交互,请务必使用此提供程序。

示例 FlinkDeployment 规范

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: job: jarURI: local:///opt/flink/examples/streaming/WordCount.jar args: [ "--input", "s3a://<INPUT BUCKET>/PATH", "--output", "s3a://<OUTPUT BUCKET>/PATH" ] parallelism: 2 upgradeMode: stateless

Flink Deployments 的高可用性 (HA) 允许任务继续取得进展,即使遇到暂时性错误并 JobManager 导致崩溃。任务将重新启动,但会从上次成功启用 HA 的检查点开始。如果未启用 HA,Kubernetes 将重启你的 JobManager,但你的作业将以全新的作业开始并失去其进度。配置 HA 后,我们可以让 Kubernetes 将 HA 元数据存储在永久存储中,以便在中出现暂时故障时参考, JobManager 然后从上次成功的检查点恢复我们的作业。

Flink 任务会默认启用 HA(副本计数设置为 2,这需要您提供 S3 存储位置来永久存储 HA 元数据)。

HA 配置

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: flinkConfiguration: taskmanager.numberOfTaskSlots: "2" executionRoleArn: "<JOB EXECUTION ROLE ARN>" emrReleaseLabel: "emr-6.13.0-flink-latest" jobManager: resource: memory: "2048m" cpu: 1 replicas: 2 highAvailabilityEnabled: true storageDir: "s3://<S3 PERSISTENT STORAGE DIR>" taskManager: resource: memory: "2048m" cpu: 1

以下是 Job Manager 中上述 HA 配置的描述(在 .spec.jobManager 下定义):

  • highAvailabilityEnabled(可选,默认值为 true):如果不想启用 HA,也不想使用提供的 HA 配置,请将其设置为 false 。您仍然可以操作“replicas”字段来手动配置 HA。

  • replicas(可选,默认值为 2):将此数字设置为大于 1 会创建其他待机状态 JobManagers ,从而可以更快地恢复作业。如果禁用 HA,则必须将副本计数设置为 1,否则会不断收到验证错误(如果未启用 HA,则仅支持 1 个副本)。

  • storageDir(必需):由于默认使用副本计数 2,我们必须提供永久 storageDir。目前,此字段仅接受 S3 路径作为存储位置。

Pod 区域

如果启用 HA,我们还会尝试将 Pod 配置在同一个可用区中,从而提高性能(通过将 Pod 置于相同可用区来减少网络延迟)。这是一个尽力而为的过程,即如果在调度了大多数 Pod 的可用区中没有足够的资源,那么剩余 Pod 仍会被调度,但最终可能会出现在该可用区之外的节点上。

确定主副本

如果启用了 HA,各副本会使用租约来确定哪个 JM 是主副本,并使用 K8s Configmap 作为数据存储来存储此元数据。如果要确定主副本,可以查看 Configmap 的内容,在数据下查看密钥 org.apache.flink.k8s.leader.restserver,找到带 IP 地址的 K8s Pod。您也可以使用以下 bash 命令。

ip=$(kubectl get configmap -n <NAMESPACE> <JOB-NAME>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}') kubectl get pods -n NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"

Amazon EMR 6.13.0 及更高版本都支持 Flink 本机 Kubernetes 在 Amazon EKS 集群上以高可用性模式运行 Flink 应用程序。

注意

提交 Flink 作业时,必须使用一个 Amazon S3 存储桶来存储高可用性元数据。如果不想使用此功能,可以将其禁用。系统会默认启用该功能。

要开启 Flink 高可用性功能,请在运行 run-application CLI 命令时提供以下 Flink 参数。参数在示例的下方定义。

-Dhigh-availability.type=kubernetes \ -Dhigh-availability.storageDir=S3://DOC-EXAMPLE-STORAGE-BUCKET \ -Dfs.s3a.aws.credentials.provider="com.amazonaws.auth.WebIdentityTokenCredentialsProvider" \ -Dkubernetes.jobmanager.replicas=3 \ -Dkubernetes.cluster-id=example-cluster
  • Dhigh-availability.storageDir – 您要存储作业的高可用性元数据的 Amazon S3 存储桶。

    Dkubernetes.jobmanager.replicas – 要创建的 Job Manager 容器组(pod)的数量,以大于 1 的整数表示。

    Dkubernetes.cluster-id – 标识 Flink 集群的唯一 ID。