使用 Flink Operator 和 Flink 應用程式的高可用性 (HA) - Amazon EMR

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 Flink Operator 和 Flink 應用程式的高可用性 (HA)

我們啟用 Flink Operator 的高可用性,以便可以容錯移轉至待命 Flink Operator,在發生故障時將 Operator 控制迴圈中的停機時間降至最低。依預設會啟用「高可用性」,且起始 Operator 複本的預設數目為 2。可以在 values.yaml 檔案中設定 Helm Chart 的複本欄位。

下列欄位可自訂:

  • replicas (選用,預設值為 2):將此數字設定為大於 1 可建立其他待命 Operator,並允許更快速地復原作業。

  • highAvailabilityEnabled (選用,預設值為 true):控制是否要啟用 HA。將此參數指定為 true 可啟用多可用區部署支援,並設定正確的 flink-conf.yaml 參數。

透過在 values.yaml 檔案中設定下列組態,停用 operator 的高可用性。

... imagePullSecrets: [] replicas: 1 # set this to false if you don't want HA highAvailabilityEnabled: false ...

多可用區部署

我們會在多個可用區域建立 operator Pod。這是一個軟約束,如果您在不同的可用區域中沒有足夠的資源,將在相同的可用區域中排程您的 operator Pod 。

確定領導者複本

如果啟用 HA,則複本使用 Lease 來確定哪些 JM 是領導者,並使用 K8s Lease 進行領導者選舉。您可以描述 Lease 並查看 .Spec.Holder Identity 欄位,以確定目前的領導者

kubectl describe lease <Helm Install Release Name>-<NAMESPACE>-lease -n <NAMESPACE> | grep "Holder Identity"

Flink-S3 互動

設定存取憑證

請確定您已設定 IRSA,具有可存取 S3 儲存貯體的適當 IAM 許可。

從 S3 應用程式模式中擷取作業 jar

Flink Operator 也支援從 S3 中擷取應用程式 jar。您只需在 FlinkDeployment 規範中提供 Jaruri 的 S3 位置即可。

您也可以使用此功能下載其他成品,例如 PyFlink 指令碼。生成的 Python 指令碼放在路徑 /opt/flink/usrlib/ 下。

下列範例會示範如何將此功能用於 PyFlink 工作。請注意 jarURI 和引數欄位。

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: python-example spec: image: <YOUR CUSTOM PYFLINK IMAGE> emrReleaseLabel: "emr-6.12.0-flink-latest" flinkVersion: v1_16 flinkConfiguration: taskmanager.numberOfTaskSlots: "1" serviceAccount: flink jobManager: highAvailabilityEnabled: false replicas: 1 resource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 job: jarURI: "s3://<S3-BUCKET>/scripts/pyflink.py" # Note, this will trigger the artifact download process entryClass: "org.apache.flink.client.python.PythonDriver" args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pyflink.py"] parallelism: 1 upgradeMode: stateless

Flink S3 連接器

Flink 隨附有兩個 S3 連接器 (如下所列)。以下各章節討論何時使用哪個連接器。

檢查點:Presto S3 連接器

  • 將 S3 結構描述設為 s3p://

  • 用於檢查點到 s3 的建議連接器。

FlinkDeployment 範例規格:

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: flinkConfiguration: taskmanager.numberOfTaskSlots: "2" state.checkpoints.dir: s3p://<UCKET-NAME>/flink-checkpoint/
  • 將 S3 結構描述設定為 s3:// 或 (s3a://)

  • 用於從 S3 讀取和寫入檔案的建議連接器 (只有 S3 連接器實作 Flinks Filesystem 介面)。

  • 預設情況下,我們在 flink-conf.yaml 檔案中設定 fs.s3a.aws.credentials.provider,也就是 com.amazonaws.auth.WebIdentityTokenCredentialsProvider。如果完全覆寫預設 flink-conf 並且正在與 S3 進行互動,請確保使用此提供程式。

FlinkDeployment 規格範例

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: job: jarURI: local:///opt/flink/examples/streaming/WordCount.jar args: [ "--input", "s3a://<INPUT BUCKET>/PATH", "--output", "s3a://<OUTPUT BUCKET>/PATH" ] parallelism: 2 upgradeMode: stateless

Flink 部署的高可用性 (HA) 允許工作繼續進行,即使遇到暫時性錯誤並發生 JobManager 當機。作業將會重新啟動,但會從上次啟用 HA 的成功檢查點開始。若未啟用 HA,Kubernetes 將會重新啟動您的工作 JobManager,但您的工作將會以全新工作的形式開始,而且會失去進度。設定 HA 之後,我們可以告知 Kubernetes 將 HA 中繼資料儲存在持續性儲存體中,以便在中發生暫時性故障時進行參考, JobManager 然後從上次成功的檢查點繼續我們的工作。

Flink 作業預設為啟用 HA (複本計數設為 2,這將要求您提供 S3 儲存位置,以便 HA 中繼資料持續存在)。

HA 組態

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: flinkConfiguration: taskmanager.numberOfTaskSlots: "2" executionRoleArn: "<JOB EXECUTION ROLE ARN>" emrReleaseLabel: "emr-6.13.0-flink-latest" jobManager: resource: memory: "2048m" cpu: 1 replicas: 2 highAvailabilityEnabled: true storageDir: "s3://<S3 PERSISTENT STORAGE DIR>" taskManager: resource: memory: "2048m" cpu: 1

以下是 Job Manager 中上述 HA 組態的描述 (在 .spec.jobManager 下定義):

  • highAvailabilityEnabled (選用,預設值為 true):如果您不想啟用 HA 且不想使用提供的 HA 組態,請將此設定為 false 。您仍然可以操作「複本」欄位以手動設定 HA。

  • replicas(選擇性,預設值為 2):將此數字設定為大於 1 會建立其他待命狀態, JobManagers 並可加快工作的復原速度。如果停用 HA,則必須將複本計數設定為 1,否則您將繼續收到驗證錯誤 (如果未啟用 HA,則僅支援 1 個複本)。

  • storageDir (必填):因為我們預設使用的複本計數為 2,所以我們必須提供一個持久的 StorageDir。目前,此欄位僅接受 S3 路徑作為儲存位置。

Pod 位置

如果您啟用 HA,我們也會嘗試在同一個可用區域中共置 Pod,進而提升效能 (藉由將 Pod 放在相同可用區域來減少網路延遲)。這是一個竭盡全力的過程,意味著如果您在對大部分 Pod 進行排程的可用區域中沒有足夠的資源,剩餘的 Pod 仍會排程,但最終可能會在此可用區域以外的節點上結束。

確定領導者複本

如果啟用 HA,複本會使用租用來判斷哪個 JM 是領導者,並使用 K8s Configmap 作為儲存此中繼資料的資料儲存。如果您想確定領導者,可以查看 Configmap 的內容,然後查看資料下的關鍵字 org.apache.flink.k8s.leader.restserver,以使用該 IP 地址尋找 K8s Pod。也可使用下列 bash 命令。

ip=$(kubectl get configmap -n <NAMESPACE> <JOB-NAME>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}') kubectl get pods -n NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"

Amazon EMR 6.13.0 及更高版本支援 Flink Native Kubernetes,以便在 Amazon EKS 叢集上以高可用性模式執行 Flink 應用程式。

注意

提交 Flink 作業時,必須建立 Amazon S3 儲存貯體來儲存高可用性中繼資料。如果不想使用此功能,可以停用它。依預設會啟用此功能。

若要開啟 Flink 高可用性功能,請在執行 run-application CLI 命令時提供下列 Flink 參數。參數定義於範例下方。

-Dhigh-availability.type=kubernetes \ -Dhigh-availability.storageDir=S3://DOC-EXAMPLE-STORAGE-BUCKET \ -Dfs.s3a.aws.credentials.provider="com.amazonaws.auth.WebIdentityTokenCredentialsProvider" \ -Dkubernetes.jobmanager.replicas=3 \ -Dkubernetes.cluster-id=example-cluster
  • Dhigh-availability.storageDir:您要在其中存放作業的高可用性中繼資料的 Amazon S3 儲存貯體。

    Dkubernetes.jobmanager.replicas:要建立的 Job Manager Pod 數量,為大於 1 的整數。

    Dkubernetes.cluster-id:識別 Flink 叢集的唯一識別碼。