Configurando o Flink na Amazon EMR - Amazon EMR

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Configurando o Flink na Amazon EMR

As EMR versões 6.9.0 e superiores da Amazon oferecem suporte ao Hive Metastore e ao AWS Glue Catalog com o conector Apache Flink para o Hive. Esta seção descreve as etapas necessárias para configurar o Catálogo do AWS Glue e o Hive Metastore com o Flink.

  1. Crie um EMR cluster com a versão 6.9.0 ou superior e pelo menos dois aplicativos: Hive e Flink.

  2. Use script runner para executar o script a seguir como função de etapa:

    hive-metastore-setup.sh

    sudo cp /usr/lib/hive/lib/antlr-runtime-3.5.2.jar /usr/lib/flink/lib sudo cp /usr/lib/hive/lib/hive-exec-3.1.3*.jar /lib/flink/lib sudo cp /usr/lib/hive/lib/libfb303-0.9.3.jar /lib/flink/lib sudo cp /usr/lib/flink/opt/flink-connector-hive_2.12-1.15.2.jar /lib/flink/lib sudo chmod 755 /usr/lib/flink/lib/antlr-runtime-3.5.2.jar sudo chmod 755 /usr/lib/flink/lib/hive-exec-3.1.3*.jar sudo chmod 755 /usr/lib/flink/lib/libfb303-0.9.3.jar sudo chmod 755 /usr/lib/flink/lib/flink-connector-hive_2.12-1.15.2.jar
    Form to add a Custom JAR step with fields for step type, name, JAR location, arguments, and failure action.
  1. Crie um EMR cluster com a versão 6.9.0 ou superior e pelo menos dois aplicativos: Hive e Flink.

  2. Selecione Usar com metadados da tabela do Hive nas configurações do Catálogo de Dados do AWS Glue para habilitar o Catálogo de Dados no cluster.

  3. Use o script runner para executar o seguinte script como uma função de etapa: Execute comandos e scripts em um EMR cluster da Amazon:

    glue-catalog-setup.sh

    sudo cp /usr/lib/hive/auxlib/aws-glue-datacatalog-hive3-client.jar /usr/lib/flink/lib sudo cp /usr/lib/hive/lib/antlr-runtime-3.5.2.jar /usr/lib/flink/lib sudo cp /usr/lib/hive/lib/hive-exec-3.1.3*.jar /lib/flink/lib sudo cp /usr/lib/hive/lib/libfb303-0.9.3.jar /lib/flink/lib sudo cp /usr/lib/flink/opt/flink-connector-hive_2.12-1.15.2.jar /lib/flink/lib sudo chmod 755 /usr/lib/flink/lib/aws-glue-datacatalog-hive3-client.jar sudo chmod 755 /usr/lib/flink/lib/antlr-runtime-3.5.2.jar sudo chmod 755 /usr/lib/flink/lib/hive-exec-3.1.3*.jar sudo chmod 755 /usr/lib/flink/lib/libfb303-0.9.3.jar sudo chmod 755 /usr/lib/flink/lib/flink-connector-hive_2.12-1.15.2.jar
    Form to add a Custom JAR step with fields for step type, name, JAR location, arguments, and failure action.

Você pode usar a EMR configuração da Amazon API para configurar o Flink com um arquivo de configuração. Os arquivos que podem ser configurados no API são:

  • flink-conf.yaml

  • log4j.properties

  • flink-log4j-session

  • log4j-cli.properties

O principal arquivo de configuração para o Flink é flink-conf.yaml.

Configurar o número de slots de tarefa que são usados para o Flink na AWS CLI
  1. Crie um arquivo configurations.json, com o seguinte conteúdo:

    [ { "Classification": "flink-conf", "Properties": { "taskmanager.numberOfTaskSlots":"2" } } ]
  2. Em seguida, crie um cluster com a seguinte configuração:

    aws emr create-cluster --release-label emr-7.2.0 \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole_V2 \ --ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole
nota

Você também pode alterar algumas configurações com o API Flink. Para obter mais informações, consulte Concepts na documentação do Flink.

Com a Amazon EMR versão 5.21.0 e posterior, você pode substituir as configurações do cluster e especificar classificações de configuração adicionais para cada grupo de instâncias em um cluster em execução. Você faz isso usando o EMR console da Amazon, o AWS Command Line Interface (AWS CLI) ou AWS SDK o. Para obter mais informações, consulte Supplying a Configuration for an Instance Group in a Running Cluster.

Como proprietário da aplicação, você sabe quais recursos atribuir a tarefas no Flink. Para os exemplos nesta documentação, use o mesmo número de tarefas que as instâncias de tarefa que você usa para a aplicação. Geralmente, recomendamos isso para o nível de paralelismo inicial, mas também é possível aumentar a granularidade do paralelismo usando slots de tarefa, que geralmente não excedem o número de núcleos virtuais por instância. Para obter mais informações sobre a arquitetura do Flink, consulte Concepts na documentação do Flink.

O JobManager do Flink permanece disponível durante o processo de failover do nó primário em um EMR cluster da Amazon com vários nós primários. A partir do Amazon EMR 5.28.0, a JobManager alta disponibilidade também é ativada automaticamente. Nenhuma configuração manual é necessária.

Com EMR as versões 5.27.0 ou anteriores da Amazon, esse JobManager é um único ponto de falha. Quando o JobManager falha, ele perde todos os estados de trabalho e não retoma os trabalhos em execução. Você pode ativar a JobManager alta disponibilidade configurando a contagem de tentativas do aplicativo, marcando o ponto de verificação e ativando o armazenamento ZooKeeper como estado para o Flink, conforme demonstra o exemplo a seguir:

[ { "Classification": "yarn-site", "Properties": { "yarn.resourcemanager.am.max-attempts": "10" } }, { "Classification": "flink-conf", "Properties": { "yarn.application-attempts": "10", "high-availability": "zookeeper", "high-availability.zookeeper.quorum": "%{hiera('hadoop::zk')}", "high-availability.storageDir": "hdfs:///user/flink/recovery", "high-availability.zookeeper.path.root": "/flink" } } ]

Você deve configurar o máximo de tentativas principais de aplicativos YARN e tentativas de aplicativo para o Flink. Para obter mais informações, consulte Configuração da alta disponibilidade do YARN cluster. Você também pode configurar o ponto de verificação do Flink para reiniciar a JobManager recuperação de trabalhos em execução de pontos de verificação concluídos anteriormente. Para obter mais informações, consulte Flink checkpointing.

Para EMR versões da Amazon que usam o Flink 1.11.x, você deve configurar o tamanho total do processo de memória para JobManager (jobmanager.memory.process.size) e TaskManager (taskmanager.memory.process.size) in. flink-conf.yaml Você pode definir esses valores configurando o cluster com a configuração API ou descomentando manualmente esses campos por meio de. SSH O Flink fornece os valores padrão a seguir.

  • jobmanager.memory.process.size: 1600m

  • taskmanager.memory.process.size: 1728m

Para excluir o JVM metasespaço e a sobrecarga, use o tamanho total da memória do Flink () taskmanager.memory.flink.size em vez de. taskmanager.memory.process.size O valor padrão para taskmanager.memory.process.size é 1280m. Não é recomendável definir taskmanager.memory.process.size e taskmanager.memory.process.size.

Todas as EMR versões da Amazon que usam o Flink 1.12.0 e posteriores têm os valores padrão listados no conjunto de código aberto do Flink como valores padrão na AmazonEMR, então você não precisa configurá-los sozinho.

Os contêineres de aplicações Flink criam e gravam em três tipos de arquivos de log: arquivos .out, arquivos .log e arquivos .err. Somente os arquivos .err são compactados e removidos do sistema de arquivos, enquanto os arquivos de log .log e .out permanecem no sistema de arquivos. Para garantir que esses arquivos de saída continuem gerenciáveis e que o cluster continue estável, é possível configurar a alternância de logs log4j.properties para definir um número máximo de arquivos e limitar o tamanho deles.

Amazon EMR versões 5.30.0 e posteriores

A partir do Amazon EMR 5.30.0, o Flink usa a estrutura de registro log4j2 com o nome de classificação de configuração. O exemplo de configuração a seguir demonstra flink-log4j. o formato log4j2.

[ { "Classification": "flink-log4j", "Properties": { "appender.main.name": "MainAppender", "appender.main.type": "RollingFile", "appender.main.append" : "false", "appender.main.fileName" : "${sys:log.file}", "appender.main.filePattern" : "${sys:log.file}.%i", "appender.main.layout.type" : "PatternLayout", "appender.main.layout.pattern" : "%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n", "appender.main.policies.type" : "Policies", "appender.main.policies.size.type" : "SizeBasedTriggeringPolicy", "appender.main.policies.size.size" : "100MB", "appender.main.strategy.type" : "DefaultRolloverStrategy", "appender.main.strategy.max" : "10" }, } ]

Amazon EMR versões 5.29.0 e anteriores

Com EMR as versões 5.29.0 e anteriores da Amazon, o Flink usa a estrutura de registro log4j. O exemplo de configuração a seguir demonstra o formato log4j.

[ { "Classification": "flink-log4j", "Properties": { "log4j.appender.file": "org.apache.log4j.RollingFileAppender", "log4j.appender.file.append":"true", # keep up to 4 files and each file size is limited to 100MB "log4j.appender.file.MaxFileSize":"100MB", "log4j.appender.file.MaxBackupIndex":4, "log4j.appender.file.layout":"org.apache.log4j.PatternLayout", "log4j.appender.file.layout.ConversionPattern":"%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n" }, } ]

As EMR versões 6.12.0 e superiores da Amazon oferecem suporte ao tempo de execução do Java 11 para o Flink. As seções a seguir descrevem como configurar o cluster para fornecer suporte ao runtime do Java 11 para o Flink.

Use as etapas a seguir para criar um EMR cluster com o tempo de execução do Flink e do Java 11. O arquivo de configuração ao qual você adiciona suporte ao runtime do Java 11 é flink-conf.yaml.

Console
Para criar um cluster com o tempo de execução do Flink e do Java 11 no console
  1. Faça login no AWS Management Console e abra o EMR console da Amazon em https://console.aws.amazon.com/emr.

  2. Escolha Clusters EMR em EC2 Ativado no painel de navegação e, em seguida, Criar cluster.

  3. Selecione a EMR versão 6.12.0 ou superior da Amazon e escolha instalar o aplicativo Flink. Selecione qualquer outra aplicação que você queira instalar no cluster.

  4. Continue configurando o cluster. Na seção opcional Configurações de software, use a opção padrão Inserir configuração e insira a seguinte configuração:

    [ { "Classification": "flink-conf", "Properties": { "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "env.java.home":"/usr/lib/jvm/jre-11" } } ]
  5. Continue configurando e iniciando o cluster.

AWS CLI
Para criar um cluster com o tempo de execução do Flink e do Java 11 a partir do CLI
  1. Crie um arquivo de configuração configurations.json que configure o Flink para usar o Java 11.

    [ { "Classification": "flink-conf", "Properties": { "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "env.java.home":"/usr/lib/jvm/jre-11" } } ]
  2. A partir do AWS CLI, crie um novo EMR cluster com a EMR versão 6.12.0 ou superior da Amazon e instale o aplicativo Flink, conforme mostrado no exemplo a seguir:

    aws emr create-cluster --release-label emr-6.12.0 \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole_V2 \ --ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole

Use as etapas a seguir para atualizar um EMR cluster em execução com o Flink e o Java 11 runtime. O arquivo de configuração ao qual você adiciona suporte ao runtime do Java 11 é flink-conf.yaml.

Console
Para atualizar um cluster em execução com o Flink e o tempo de execução do Java 11 no console
  1. Faça login no AWS Management Console e abra o EMR console da Amazon em https://console.aws.amazon.com/emr.

  2. Escolha Clusters EMR EC2em Ativado no painel de navegação e, em seguida, selecione o cluster que você deseja atualizar.

    nota

    O cluster deve usar a EMR versão 6.12.0 ou superior da Amazon para oferecer suporte ao Java 11.

  3. Selecione a guia Configuração.

  4. Na seção Configurações do grupo de instâncias, selecione o grupo de instâncias Em execução que você deseja atualizar e escolha Reconfigurar no menu de ações da lista.

  5. Reconfigure o grupo de instâncias com a opção Editar atributos, conforme mostrado a seguir. Selecione Adicionar nova configuração após cada.

    Classificação Propriedade Valor

    flink-conf

    containerized.taskmanager.env.JAVA_HOME

    /usr/lib/jvm/jre-11

    flink-conf

    containerized.master.env.JAVA_HOME

    /usr/lib/jvm/jre-11

    flink-conf

    env.java.home

    /usr/lib/jvm/jre-11

  6. Selecione Salvar alterações para adicionar as configurações.

AWS CLI
Para atualizar um cluster em execução para usar o tempo de execução do Flink e do Java 11 a partir do CLI

Você pode usar o comando modify-instance-groups para especificar configurações para cada grupo de instâncias em um cluster em execução.

  1. Primeiro, crie um arquivo de configuração configurations.json que configure o Flink para usar o Java 11. No exemplo a seguir, substitua ig-1xxxxxxx9 com o ID do grupo de instâncias que você deseja reconfigurar. Salve o arquivo no mesmo diretório em que você executará o comando modify-instance-groups.

    [ { "InstanceGroupId":"ig-1xxxxxxx9", "Configurations":[ { "Classification":"flink-conf", "Properties":{ "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "env.java.home":"/usr/lib/jvm/jre-11" }, "Configurations":[] } ] } ]
  2. A partir do AWS CLI, execute o seguinte comando. Substitua o ID do grupo de instâncias que você deseja reconfigurar:

    aws emr modify-instance-groups --cluster-id j-2AL4XXXXXX5T9 \ --instance-groups file://configurations.json

Para determinar o tempo de execução do Java para um cluster em execução, faça login no nó primário com SSH conforme descrito em Conecte-se ao nó primário com SSH. Em seguida, execute o seguinte comando:

ps -ef | grep flink

O comando ps com a opção -ef lista todos os processos que estão em execução no sistema. É possível filtrar essa saída com grep para encontrar menções à string flink. Revise a saída do valor do Java Runtime Environment (JRE),jre-XX. Na saída a seguir, jre-11 indica que o Java 11 está selecionado em runtime para o Flink.

flink    19130     1  0 09:17 ?        00:00:15 /usr/lib/jvm/jre-11/bin/java -Djava.io.tmpdir=/mnt/tmp -Dlog.file=/usr/lib/flink/log/flink-flink-historyserver-0-ip-172-31-32-127.log -Dlog4j.configuration=file:/usr/lib/flink/conf/log4j.properties -Dlog4j.configurationFile=file:/usr/lib/flink/conf/log4j.properties -Dlogback.configurationFile=file:/usr/lib/flink/conf/logback.xml -classpath /usr/lib/flink/lib/flink-cep-1.17.0.jar:/usr/lib/flink/lib/flink-connector-files-1.17.0.jar:/usr/lib/flink/lib/flink-csv-1.17.0.jar:/usr/lib/flink/lib/flink-json-1.17.0.jar:/usr/lib/flink/lib/flink-scala_2.12-1.17.0.jar:/usr/lib/flink/lib/flink-table-api-java-uber-1.17.0.jar:/usr/lib/flink/lib/flink-table-api-scala-bridge_2.12-1.17.0.

Como alternativa, faça login no nó primário com SSH e inicie uma YARN sessão do Flink com o comandoflink-yarn-session -d. A saída mostra a Java Virtual Machine (JVM) para Flink, java-11-amazon-corretto no exemplo a seguir:

2023-05-29 10:38:14,129 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: containerized.master.env.JAVA_HOME, /usr/lib/jvm/java-11-amazon-corretto.x86_64