Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Trabajar con trabajos de Flink en Amazon EMR
Hay varias formas de interactuar con Flink en Amazon EMR: a través de la consola, la interfaz de Flink que se encuentra en la interfaz de usuario de seguimiento de ResourceManager y de la línea de comandos. Puede enviar un archivo JAR a una aplicación de Flink con cualquiera de estas opciones. Una vez enviado un archivo JAR, se convierte en un trabajo administrado por el JobManager de Flink. El JobManager se encuentra en el nodo de YARN que aloja el daemon Application Master de la sesión de Flink.
Puede ejecutar una aplicación de Flink como trabajo de YARN en un clúster de ejecución prolongada o como clúster transitorio. En un clúster de ejecución prolongada, puede enviar varios trabajos de Flink a un clúster de Flink que se ejecuta en Amazon EMR. Si ejecuta un trabajo de Flink como clúster transitorio, su clúster de Amazon EMR solo existe durante el tiempo que se tarda en ejecutar la aplicación de Flink, por lo que solo se le cobrará por los recursos y el tiempo utilizado. Puede enviar un trabajo de Flink con la operación AddSteps
de la API de Amazon EMR como un argumento de paso para la operación RunJobFlow
o a través de los comandos AWS CLIadd-steps
o create-cluster
.
Inicio de una aplicación de YARN de Flink como un paso en un clúster de ejecución prolongada
Para iniciar una aplicación de Flink a la que varios clientes puedan enviar trabajos mediante las operaciones de la API de YARN, debe crear un clúster o agregar una aplicación de Flink a un clúster existente. Para obtener instrucciones sobre cómo crear un clúster nuevo, consulte Creación de un clúster con Flink. Para iniciar una sesión de YARN en un clúster existente, utilice los siguientes pasos desde la consola, la AWS CLI o el SDK de Java.
nota
El comando flink-yarn-session
se agregó en Amazon EMR versión 5.5.0 como envoltorio para que el script yarn-session.sh
simplifique la ejecución. Si utiliza una versión anterior de Amazon EMR, sustituya bash -c
"/usr/lib/flink/bin/yarn-session.sh -d"
por Argumentos en la consola o Args
en el comando AWS CLI.
Para enviar un trabajo de Flink en un clúster existente desde la consola
Envíe la sesión de Flink de con el comando flink-yarn-session
en un clúster existente.
Abra la consola de Amazon EMR enhttps://console.aws.amazon.com/emr
. -
En la lista de clústeres, seleccione el clúster que lanzó con anterioridad.
-
En la página de detalles del clúster, elija Steps (Pasos), Add Step (Añadir paso).
-
Utilice las directrices que siguen para introducir los parámetros y, a continuación, elija Agregar.
Parámetro Descripción Step type (Tipo de paso)
JAR personalizada Nombre
Un nombre que le ayuda a identificar el paso. Por ejemplo, <example-flink-step-name>
.Jar location (Ubicación de JAR)
command-runner.jar
Arguments
El comando
flink-yarn-session
con argumentos adecuados para su aplicación. Por ejemplo,flink-yarn-session -d
inicia una sesión de Flink dentro del clúster de YARN en estado desconectado (-d
). Consulte Configuración de YARNen la documentación de Flink más reciente para conocer los detalles de argumentos.
Para enviar un trabajo de Flink en un clúster existente con AWS CLI
-
Utilice el comando
add-steps
para agregar un trabajo de Flink a un clúster de ejecución prolongada. El siguiente comando de ejemplo especificaArgs="flink-yarn-session", "-d"
para iniciar una sesión de Flink dentro del clúster de YARN en un estado desconectado (-d
). Consulte Configuración de YARNen la documentación de Flink más reciente para conocer los detalles de argumentos. aws emr add-steps --cluster-id
<j-XXXXXXXX>
--steps Type=CUSTOM_JAR,Name=<example-flink-step-name>
,Jar=command-runner.jar,Args="flink-yarn-session","-d"
Envío del trabajo a una aplicación de Flink existente en un clúster de ejecución prolongada
Si ya tiene una aplicación de Flink existente en un clúster de ejecución prolongada, puede especificar el ID de la aplicación de Flink del clúster para enviar el trabajo. Para obtener el ID de la aplicación, ejecute yarn application -list
en AWS CLI o a través de la operación de la API YarnClient
$ yarn application -list
16/09/07 19:32:13 INFO client.RMProxy: Connecting to ResourceManager at ip-10-181-83-19.ec2.internal/10.181.83.19:8032
Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):1
Application-Id Application-Name Application-Type User Queue State Final-State Progress Tracking-URL
application_1473169569237_0002 Flink session with 14 TaskManagers (detached) Apache Flink hadoop default RUNNING UNDEFINED 100% http://ip-10-136-154-194.ec2.internal:33089
El ID de aplicación para esta sesión de Flink es application_1473169569237_0002
y puede usarlo para enviar trabajos a la aplicación desde AWS CLI o desde un SDK.
ejemplo SDK para Java
List<StepConfig> stepConfigs = new ArrayList<StepConfig>(); HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig() .withJar("command-runner.jar") .withArgs("flink", "run", "-m", "yarn-cluster", "-yid", "application_1473169569237_0002", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://amzn-s3-demo-bucket/pg11.txt", "--output", "s3://amzn-s3-demo-bucket/alice2/"); StepConfig flinkRunWordCount = new StepConfig() .withName("Flink add a wordcount step") .withActionOnFailure("CONTINUE") .withHadoopJarStep(flinkWordCountConf); stepConfigs.add(flinkRunWordCount); AddJobFlowStepsResult res = emr.addJobFlowSteps(new AddJobFlowStepsRequest() .withJobFlowId("
myClusterId
") .withSteps(stepConfigs));
ejemplo AWS CLI
aws emr add-steps --cluster-id
<j-XXXXXXXX>
\ --steps Type=CUSTOM_JAR,Name=Flink_Submit_To_Long_Running,Jar=command-runner.jar,\ Args="flink","run","-m","yarn-cluster","-yid","application_1473169569237_0002",\ "/usr/lib/flink/examples/streaming/WordCount.jar",\ "--input","s3://amzn-s3-demo-bucket/pg11.txt","--output","s3://amzn-s3-demo-bucket/alice2/" \ --region<region-code>
Enviar un trabajo de Flink transitorio
El segundo ejemplo lanza un clúster transitorio que ejecuta un trabajo de Flink y, a continuación, termina al completarse.
ejemplo SDK para Java
import java.util.ArrayList;
import java.util.List;
import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
import com.amazonaws.services.elasticmapreduce.model.*;
public class Main_test {
public static void main(String[] args) {
AWSCredentials credentials_profile = null;
try {
credentials_profile = new ProfileCredentialsProvider("default").getCredentials();
} catch (Exception e) {
throw new AmazonClientException(
"Cannot load credentials from .aws/credentials file. " +
"Make sure that the credentials file exists and the profile name is specified within it.",
e);
}
AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(credentials_profile))
.withRegion(Regions.US_WEST_1)
.build();
List<StepConfig> stepConfigs = new ArrayList<StepConfig>();
HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig()
.withJar("command-runner.jar")
.withArgs("bash", "-c", "flink", "run", "-m", "yarn-cluster", "-yn", "2",
"/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://path/to/input-file.txt", "--output",
"s3://path/to/output/");
StepConfig flinkRunWordCountStep = new StepConfig()
.withName("Flink add a wordcount step and terminate")
.withActionOnFailure("CONTINUE")
.withHadoopJarStep(flinkWordCountConf);
stepConfigs.add(flinkRunWordCountStep);
Application flink = new Application().withName("Flink");
RunJobFlowRequest request = new RunJobFlowRequest()
.withName("flink-transient")
.withReleaseLabel("emr-5.20.0")
.withApplications(flink)
.withServiceRole("EMR_DefaultRole")
.withJobFlowRole("EMR_EC2_DefaultRole")
.withLogUri("s3://path/to/my/logfiles")
.withInstances(new JobFlowInstancesConfig()
.withEc2KeyName("myEc2Key")
.withEc2SubnetId("subnet-12ab3c45")
.withInstanceCount(3)
.withKeepJobFlowAliveWhenNoSteps(false)
.withMasterInstanceType("m4.large")
.withSlaveInstanceType("m4.large"))
.withSteps(stepConfigs);
RunJobFlowResult result = emr.runJobFlow(request);
System.out.println("The cluster ID is " + result.toString());
}
}
ejemplo AWS CLI
Utilice el subcomando create-cluster
para crear un clúster transitorio que termina cuando se completa el trabajo de Flink:
aws emr create-cluster --release-label emr-5.2.1 \ --name "Flink_Transient" \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --auto-terminate --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole_V2 \ --ec2-attributes KeyName=
<YourKeyName>
,InstanceProfile=EMR_EC2_DefaultRole \ --steps Type=CUSTOM_JAR,Jar=command-runner.jar,Name=Flink_Long_Running_Session,\ Args="bash","-c","\"flink run -m yarn-cluster /usr/lib/flink/examples/streaming/WordCount.jar --input s3://amzn-s3-demo-bucket/pg11.txt --output s3://amzn-s3-demo-bucket/alice/""