Trabajar con trabajos de Flink en Amazon EMR - Amazon EMR

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Trabajar con trabajos de Flink en Amazon EMR

Hay varias formas de interactuar con Flink en Amazon EMR: a través de la consola, la interfaz de Flink que se encuentra en la interfaz de usuario de seguimiento y en ResourceManager la línea de comandos. Puede enviar un archivo JAR a una aplicación de Flink con cualquiera de estas opciones. Una vez enviado un archivo JAR, se convierte en un trabajo gestionado por Flink. JobManager JobManager Se encuentra en el nodo YARN que aloja el daemon Application Master de la sesión de Flink.

Puede ejecutar una aplicación de Flink como trabajo de YARN en un clúster de ejecución prolongada o como clúster transitorio. En un clúster de ejecución prolongada, puede enviar varios trabajos de Flink a un clúster de Flink que se ejecuta en Amazon EMR. Si ejecuta un trabajo de Flink como clúster transitorio, su clúster de Amazon EMR solo existe durante el tiempo que se tarda en ejecutar la aplicación de Flink, por lo que solo se le cobrará por los recursos y el tiempo utilizado. Puede enviar un trabajo de Flink con la operación AddSteps de la API de Amazon EMR como un argumento de paso para la operación RunJobFlow o a través de los comandos AWS CLIadd-steps o create-cluster.

Para iniciar una aplicación de Flink a la que varios clientes puedan enviar trabajos mediante las operaciones de la API de YARN, debe crear un clúster o agregar una aplicación de Flink a un clúster existente. Para obtener instrucciones sobre cómo crear un clúster nuevo, consulte Creación de un clúster con Flink. Para iniciar una sesión de YARN en un clúster existente, utilice los siguientes pasos desde la consola, la AWS CLI o el SDK de Java.

nota

El comando flink-yarn-session se agregó en Amazon EMR versión 5.5.0 como envoltorio para que el script yarn-session.sh simplifique la ejecución. Si utiliza una versión anterior de Amazon EMR, sustituya bash -c "/usr/lib/flink/bin/yarn-session.sh -d" por Argumentos en la consola o Args en el comando AWS CLI.

Para enviar un trabajo de Flink en un clúster existente desde la consola

Envíe la sesión de Flink de con el comando flink-yarn-session en un clúster existente.

  1. Abra la consola Amazon EMR en https://console.aws.amazon.com/emr.

  2. En la lista de clústeres, seleccione el clúster que lanzó con anterioridad.

  3. En la página de detalles del clúster, elija Steps (Pasos), Add Step (Añadir paso).

  4. Utilice las directrices que siguen para introducir los parámetros y, a continuación, elija Agregar.

    Parámetro Descripción

    Step type (Tipo de paso)

    JAR personalizada

    Nombre

    Un nombre que le ayuda a identificar el paso. Por ejemplo, < example-flink-step-name >.

    Jar location (Ubicación de JAR)

    command-runner.jar

    Arguments

    El comando flink-yarn-session con argumentos adecuados para su aplicación. Por ejemplo, flink-yarn-session -d inicia una sesión de Flink en tu clúster de YARN en un estado separado ()-d. Consulte Configuración de YARN en la documentación de Flink más reciente para conocer los detalles de argumentos.

Para enviar un trabajo de Flink en un clúster existente con AWS CLI
  • Utilice el comando add-steps para agregar un trabajo de Flink a un clúster de ejecución prolongada. El siguiente comando de ejemplo especifica Args="flink-yarn-session", "-d" para iniciar una sesión de Flink dentro del clúster de YARN en un estado desconectado (-d). Consulte Configuración de YARN en la documentación de Flink más reciente para conocer los detalles de argumentos.

    aws emr add-steps --cluster-id <j-XXXXXXXX> --steps Type=CUSTOM_JAR,Name=<example-flink-step-name>,Jar=command-runner.jar,Args="flink-yarn-session","-d"

Si ya tiene una aplicación de Flink existente en un clúster de ejecución prolongada, puede especificar el ID de la aplicación de Flink del clúster para enviar el trabajo. Para obtener el ID de la aplicación, ejecute la yarn application -list operación de YarnClientAPI AWS CLI o a través de ella:

$ yarn application -list 16/09/07 19:32:13 INFO client.RMProxy: Connecting to ResourceManager at ip-10-181-83-19.ec2.internal/10.181.83.19:8032 Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):1 Application-Id Application-Name Application-Type User Queue State Final-State Progress Tracking-URL application_1473169569237_0002 Flink session with 14 TaskManagers (detached) Apache Flink hadoop default RUNNING UNDEFINED 100% http://ip-10-136-154-194.ec2.internal:33089

El ID de aplicación para esta sesión de Flink es application_1473169569237_0002 y puede usarlo para enviar trabajos a la aplicación desde AWS CLI o desde un SDK.

ejemplo SDK para Java
List<StepConfig> stepConfigs = new ArrayList<StepConfig>(); HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig() .withJar("command-runner.jar") .withArgs("flink", "run", "-m", "yarn-cluster", "-yid", "application_1473169569237_0002", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://myBucket/pg11.txt", "--output", "s3://myBucket/alice2/"); StepConfig flinkRunWordCount = new StepConfig() .withName("Flink add a wordcount step") .withActionOnFailure("CONTINUE") .withHadoopJarStep(flinkWordCountConf); stepConfigs.add(flinkRunWordCount); AddJobFlowStepsResult res = emr.addJobFlowSteps(new AddJobFlowStepsRequest() .withJobFlowId("myClusterId") .withSteps(stepConfigs));
ejemplo AWS CLI
aws emr add-steps --cluster-id <j-XXXXXXXX> \ --steps Type=CUSTOM_JAR,Name=Flink_Submit_To_Long_Running,Jar=command-runner.jar,\ Args="flink","run","-m","yarn-cluster","-yid","application_1473169569237_0002",\ "/usr/lib/flink/examples/streaming/WordCount.jar",\ "--input","s3://myBucket/pg11.txt","--output","s3://myBucket/alice2/" \ --region <region-code>

El segundo ejemplo lanza un clúster transitorio que ejecuta un trabajo de Flink y, a continuación, termina al completarse.

ejemplo SDK para Java
import java.util.ArrayList; import java.util.List; import com.amazonaws.AmazonClientException; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.profile.ProfileCredentialsProvider; import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce; import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder; import com.amazonaws.services.elasticmapreduce.model.*; public class Main_test { public static void main(String[] args) { AWSCredentials credentials_profile = null; try { credentials_profile = new ProfileCredentialsProvider("default").getCredentials(); } catch (Exception e) { throw new AmazonClientException( "Cannot load credentials from .aws/credentials file. " + "Make sure that the credentials file exists and the profile name is specified within it.", e); } AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard() .withCredentials(new AWSStaticCredentialsProvider(credentials_profile)) .withRegion(Regions.US_WEST_1) .build(); List<StepConfig> stepConfigs = new ArrayList<StepConfig>(); HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig() .withJar("command-runner.jar") .withArgs("bash", "-c", "flink", "run", "-m", "yarn-cluster", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://path/to/input-file.txt", "--output", "s3://path/to/output/"); StepConfig flinkRunWordCountStep = new StepConfig() .withName("Flink add a wordcount step and terminate") .withActionOnFailure("CONTINUE") .withHadoopJarStep(flinkWordCountConf); stepConfigs.add(flinkRunWordCountStep); Application flink = new Application().withName("Flink"); RunJobFlowRequest request = new RunJobFlowRequest() .withName("flink-transient") .withReleaseLabel("emr-5.20.0") .withApplications(flink) .withServiceRole("EMR_DefaultRole") .withJobFlowRole("EMR_EC2_DefaultRole") .withLogUri("s3://path/to/my/logfiles") .withInstances(new JobFlowInstancesConfig() .withEc2KeyName("myEc2Key") .withEc2SubnetId("subnet-12ab3c45") .withInstanceCount(3) .withKeepJobFlowAliveWhenNoSteps(false) .withMasterInstanceType("m4.large") .withSlaveInstanceType("m4.large")) .withSteps(stepConfigs); RunJobFlowResult result = emr.runJobFlow(request); System.out.println("The cluster ID is " + result.toString()); } }
ejemplo AWS CLI

Utilice el subcomando create-cluster para crear un clúster transitorio que termina cuando se completa el trabajo de Flink:

aws emr create-cluster --release-label emr-5.2.1 \ --name "Flink_Transient" \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --auto-terminate --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole_V2 \ --ec2-attributes KeyName=<YourKeyName>,InstanceProfile=EMR_EC2_DefaultRole \ --steps Type=CUSTOM_JAR,Jar=command-runner.jar,Name=Flink_Long_Running_Session,\ Args="bash","-c","\"flink run -m yarn-cluster /usr/lib/flink/examples/streaming/WordCount.jar --input s3://myBucket/pg11.txt --output s3://myBucket/alice/""