Uso de la alta disponibilidad (HA) para Flink Operators y Flink Applications - Amazon EMR

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Uso de la alta disponibilidad (HA) para Flink Operators y Flink Applications

Habilitamos la alta disponiblidad para el Flink Operator para poder realizar un cambio a un Flink Operator en espera y minimizar el tiempo de inactividad en el bucle de control del operador en caso de fallos. La alta disponibilidad está habilitada de forma predeterminada y el número predeterminado de réplicas de operadores de inicio es 2. Puede configurar el campo de réplicas en su archivo values.yaml para el gráfico de Helm.

Los siguientes campos se pueden personalizar:

  • replicas (opcional, el valor predeterminado es 2): si se establece este número en uno mayor que 1, se crean otros operadores en espera y se puede recuperar el trabajo con mayor rapidez.

  • highAvailabilityEnabled (opcional, el valor predeterminado es verdadero): controla si desea habilitar la alta disponibilidad. Si se especifica este parámetro como verdadero, se habilita la compatibilidad con la implementación multi-AZ y se establecen los parámetros flink-conf.yaml correctos.

Puede deshabilitar la alta disponibilidad para su operador al establecer la siguiente configuración en su archivo values.yaml.

... imagePullSecrets: [] replicas: 1 # set this to false if you don't want HA highAvailabilityEnabled: false ...

Implementación Multi-AZ

Creamos los pods de los operadores en varias zonas de disponibilidad. Se trata de una limitación leve y, si no dispone de recursos suficientes en una zona de disponibilidad diferente, los pods de los operadores se programarán en la misma zona de disponibilidad.

Determinar la réplica líder

Si la alta disponibilidad está habilitada, las réplicas utilizan un arrendamiento para determinar cuál de los JM es el líder y utilizan un arrendamiento de los K8 para la elección del líder. Puede describir el arrendamiento y consultar el campo .Spec.Holder Identity para determinar el líder actual

kubectl describe lease <Helm Install Release Name>-<NAMESPACE>-lease -n <NAMESPACE> | grep "Holder Identity"

Interacción entre Flink y S3

Configuración de las credenciales de acceso

Asegúrese de haber configurado las IRSA con los permisos de IAM adecuados para acceder al bucket de S3.

Búsqueda de archivos jar de trabajo desde el modo aplicación de S3

El operador de Flink también admite la búsqueda de archivos jar de aplicaciones desde S3. Solo tiene que proporcionar la ubicación S3 para el JaRuri en su FlinkDeployment especificación.

También puedes usar esta función para descargar otros artefactos, como PyFlink scripts. El script de Python resultante se coloca en la ruta /opt/flink/usrlib/.

El siguiente ejemplo muestra cómo utilizar esta función para un PyFlink trabajo. Tenga en cuenta los campos jarURI y args.

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: python-example spec: image: <YOUR CUSTOM PYFLINK IMAGE> emrReleaseLabel: "emr-6.12.0-flink-latest" flinkVersion: v1_16 flinkConfiguration: taskmanager.numberOfTaskSlots: "1" serviceAccount: flink jobManager: highAvailabilityEnabled: false replicas: 1 resource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 job: jarURI: "s3://<S3-BUCKET>/scripts/pyflink.py" # Note, this will trigger the artifact download process entryClass: "org.apache.flink.client.python.PythonDriver" args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pyflink.py"] parallelism: 1 upgradeMode: stateless

Conectores de Flink de S3

Flink viene empaquetado con dos conectores de S3 (enumerados a continuación). En las siguientes secciones se explica cuándo usar cada conector.

Punto de control: conector de Presto de S3

  • Establezca el esquema de S3 en s3p://

  • El conector recomendado para establecer el punto de control en s3. Para obtener más información, consulte la sección específica de S3 en la documentación de Apache Flink.

Ejemplo de especificación: FlinkDeployment

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: flinkConfiguration: taskmanager.numberOfTaskSlots: "2" state.checkpoints.dir: s3p://<BUCKET-NAME>/flink-checkpoint/

Lectura y escritura en S3: conector Hadoop S3

  • Establezca el esquema de S3 en s3:// o en ( s3a:// )

  • El conector recomendado para leer y escribir archivos desde S3 (solo el conector de S3 que implementa la interfaz del sistema de archivos de Flink).

  • Por defecto, lo configuramos fs.s3a.aws.credentials.provider en el flink-conf.yaml archivo, que es. com.amazonaws.auth.WebIdentityTokenCredentialsProvider Si anula completamente el valor predeterminado flink-conf y está interactuando con S3, asegúrese de usar este proveedor.

FlinkDeployment Especificación de ejemplo

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: job: jarURI: local:///opt/flink/examples/streaming/WordCount.jar args: [ "--input", "s3a://<INPUT BUCKET>/PATH", "--output", "s3a://<OUTPUT BUCKET>/PATH" ] parallelism: 2 upgradeMode: stateless

La alta disponibilidad (HA) de las implementaciones de Flink permite que los trabajos sigan progresando incluso si se produce un error transitorio y el usuario se bloquea. JobManager Los trabajos se reiniciarán, pero desde el último punto de control exitoso con la alta disponibilidad activada. Si la HA no está habilitada, Kubernetes lo reiniciará JobManager, pero su trabajo empezará como uno nuevo y perderá el progreso. Tras configurar HA, podemos indicarle a Kubernetes que almacene los metadatos de alta disponibilidad en un almacenamiento persistente como referencia en caso de que se produzca un fallo transitorio JobManager y que, después, reanude nuestras tareas desde el último punto de control exitoso.

La alta disponibilidad está habilitada de forma predeterminada para sus trabajos de Flink (el recuento de réplicas está establecido en 2, por lo que tendrá que proporcionar una ubicación de almacenamiento en S3 para que los metadatos de alta disponibilidad se conserven).

Configuraciones de alta disponibilidad

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: flinkConfiguration: taskmanager.numberOfTaskSlots: "2" executionRoleArn: "<JOB EXECUTION ROLE ARN>" emrReleaseLabel: "emr-6.13.0-flink-latest" jobManager: resource: memory: "2048m" cpu: 1 replicas: 2 highAvailabilityEnabled: true storageDir: "s3://<S3 PERSISTENT STORAGE DIR>" taskManager: resource: memory: "2048m" cpu: 1

Las siguientes son descripciones de las configuraciones de alta disponibilidad anteriores en Job Manager (definidas en .spec.JobManager):

  • highAvailabilityEnabled (opcional, el valor predeterminado es verdadero): configúrela en false si no quiere activar la alta disponibilidad y no quiere utilizar las configuraciones de alta disponibilidad proporcionadas. Aún puede manipular el campo “réplicas” para configurar de forma manual la alta disponibilidad.

  • replicas(opcional, el valor predeterminado es 2): si se establece este número en mayor que 1, se crea otro modo de espera JobManagers y permite una recuperación más rápida del trabajo. Si deshabilita la alta disponibilidad, debe establecer el recuento de réplicas en 1 o seguirá recibiendo errores de validación (solo se admite 1 réplica si la alta disponibilidad no está habilitada).

  • storageDir (obligatorio): dado que utilizamos el recuento de réplicas como 2 de forma predeterminada, debemos proporcionar un storageDir persistente. Actualmente, este campo solo acepta rutas de S3 como ubicación de almacenamiento.

Localidad del pod

Si habilita la alta disponibilidad, también intentamos colocar los pods en la misma zona de disponibilidad, lo que mejora el rendimiento (al tener los pods en las mismas zonas de disponibilidad, se reduce la latencia de la red). Se trata de un proceso de mejor esfuerzo, ya que, si no dispone de recursos suficientes en la zona de disponibilidad en la que están programados la mayoría de sus pods, los demás pods seguirán estando programados, pero es posible que acaben en un nodo fuera de esta zona de disponibilidad.

Determinar la réplica líder

Si la alta disponibilidad está habilitada, las réplicas usan una concesión para determinar cuál de los JM es el líder y usan un ConfigMap de K8 como almacén de datos para almacenar estos metadatos. Si quiere determinar el líder, consulte el contenido del Configmap y busque en los datos la clave org.apache.flink.k8s.leader.restserver para encontrar los pod de K8 con la dirección IP. También puede utilizar los siguientes comandos bash.

ip=$(kubectl get configmap -n <NAMESPACE> <JOB-NAME>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}') kubectl get pods -n NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"

A partir de la versión 6.13.0, Amazon EMR es compatible con Kubernetes nativo de Flink para ejecutar aplicaciones de Flink en un clúster de Amazon EKS.

nota

Debe tener un bucket de Amazon S3 creado para almacenar los metadatos de alta disponibilidad cuando envíe su trabajo de Flink. Si no desea usar esta característica, puede desactivarla. Está habilitada de forma predeterminada.

Para activar la característica de alta disponibilidad de Flink, utilice los siguientes parámetros de Flink al ejecutar el comando de la CLI run-application. Los parámetros se definen debajo del ejemplo.

-Dhigh-availability.type=kubernetes \ -Dhigh-availability.storageDir=S3://DOC-EXAMPLE-STORAGE-BUCKET \ -Dfs.s3a.aws.credentials.provider="com.amazonaws.auth.WebIdentityTokenCredentialsProvider" \ -Dkubernetes.jobmanager.replicas=3 \ -Dkubernetes.cluster-id=example-cluster
  • Dhigh-availability.storageDir: el bucket de Amazon S3 en el que desea almacenar los metadatos de alta disponibilidad para su trabajo.

    Dkubernetes.jobmanager.replicas: el número de pods de Job Manager que se van a crear como un entero superior a 1.

    Dkubernetes.cluster-id: un identificador único que identifica el clúster de Flink.