Llama a Amazon EMR con Step Functions - AWS Step Functions

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Llama a Amazon EMR con Step Functions

Step Functions puede controlar ciertosAWSservicios directamente desde el Lenguaje de estados de Amazon. Para obtener más información acerca de cómo trabajar conAWS Step Functionsy sus integraciones, consulte los temas siguientes:

En qué se diferencia la integración optimizada de Amazon EMR de Amazon EMRAWSIntegración de SDK

La integración de servicios optimizada de Amazon EMR tiene un conjunto personalizado de API que envuelven las API de Amazon EMR subyacentes, que se describen a continuación. Debido a esto, difiere significativamente del EMR de AmazonAWSIntegración de servicios SDK. Además, elEjecutar un trabajo (.sync)se admite el patrón de integración.

Para integrarAWS Step Functionscon Amazon EMR, utiliza las API de integración de servicios de Amazon EMR proporcionadas. Las API de integración de servicios son similares a las API de Amazon EMR correspondientes, con algunas diferencias en los campos que se pasan y en las respuestas que se devuelven.

Step Functions no finaliza automáticamente un clúster de Amazon EMR si se detiene la ejecución. Si el equipo de estado se detiene antes de que finalice el clúster de Amazon EMR, es posible que el clúster siga funcionando indefinidamente y puede acumular cargos adicionales. Para evitarlo, asegúrese de que cualquier clúster de Amazon EMR que cree finalice correctamente. Para obtener más información, consulte:

nota

A partir deemr-5.28.0, puede especificar el parámetroStepConcurrencyLevelal crear un clúster para permitir que varios pasos se ejecuten en paralelo en un único clúster. Puede utilizar las Step FunctionsMapyParallelestados para enviar trabajo en paralelo al clúster.

La disponibilidad de la integración de servicios de Amazon EMR está sujeta a la disponibilidad de las API de Amazon EMR. Consulte laAmazon EMRdocumentación de limitaciones en regiones especiales.

En la siguiente tabla se describen las diferencias entre cada API de integración de servicios y la API de Amazon EMR correspondiente.

API de integración de Amazon EMR Service y API de Amazon EMR correspondientes
API de integración de servicios de Amazon EMR API de EMR correspondiente Diferencias
createCluster

Crea y comienza a ejecutar un clúster (flujo de trabajo).

Amazon EMR está vinculado directamente a un tipo único de rol de IAM conocido como rol vinculado a servicios. Para que createCluster y createCluster.sync funcionen, tiene que tener configurados los permisos necesarios para crear el rol vinculado a servicios AWSServiceRoleForEMRCleanup. Para obtener más información al respecto, incluida una instrucción que puede agregar a la política de permisos de IAM, consulteUso del rol vinculado a un servicio para Amazon EMR.

runJobFlow createClusterutiliza la misma sintaxis de solicitud querunJobFlow, excepto por lo siguiente:
  • El campo Instances.KeepJobFlowAliveWhenNoSteps es obligatorio y debe tener el valor booleano TRUE.

  • El campo Steps no está permitido.

  • El campo Instances.InstanceFleets[index].Name se debe proporcionar y debe ser único si la API de conector modifyInstanceFleetByName opcional se utiliza.

  • El campo Instances.InstanceGroups[index].Name se debe proporcionar y debe ser único si la API modifyInstanceGroupByName opcional se utiliza.

La respuesta es la siguiente:
{ "ClusterId": "string" }
Amazon EMR utiliza:
{ "JobFlowId": "string" }
createCluster.sync

Crea y comienza a ejecutar un clúster (flujo de trabajo).

runJobFlow Lo mismo que createCluster, pero espera a que el clúster alcance el estado WAITING.
setClusterTerminationProtección

Bloquea un clúster (flujo de trabajo) de forma que las instancias de EC2 del clúster no se puedan terminar mediante la intervención del usuario, una llamada a la API o un error del flujo de trabajo.

setTerminationProtection La solicitud utiliza:
{ "ClusterId": "string" }
Amazon EMR utiliza:
{ "JobFlowIds": ["string"] }
terminateCluster

Cierra un clúster (flujo de trabajo).

terminateJobFlows La solicitud utiliza:
{ "ClusterId": "string" }
Amazon EMR utiliza:
{ "JobFlowIds": ["string"] }
terminateCluster.sync

Cierra un clúster (flujo de trabajo).

terminateJobFlows Lo mismo que terminateCluster, pero espera a que el clúster finalice.
addStep

Agrega un nuevo paso a un clúster en ejecución.

addJobFlowPasos La solicitud utiliza la clave"ClusterId". Amazon EMR utiliza"JobFlowId". La solicitud utiliza un solo paso.
{ "Step": <"StepConfig object"> }
Amazon EMR utiliza:
{ "Steps": [<StepConfig objects>] }
La respuesta es la siguiente:
{ "StepId": "string" }
Amazon EMR devuelve esto:
{ "StepIds": [<strings>] }
addStep.sync

Agrega un nuevo paso a un clúster en ejecución.

addJobFlowPasos Lo mismo que addStep, pero espera a que el paso se complete.
cancelStep

Cancela un paso pendiente en un clúster en ejecución.

cancelSteps La solicitud utiliza:
{ "StepId": "string" }
Amazon EMR utiliza:
{ "StepIds": [<strings>] }
La respuesta es la siguiente:
{ "CancelStepsInfo": <CancelStepsInfo object> }
Amazon EMR utiliza:
{ "CancelStepsInfoList": [<CancelStepsInfo objects>] }
modifyInstanceFleetByName

Modifica las capacidades de Spot de destino y de destino bajo demanda para la flota de instancias con el InstanceFleetName especificado.

modifyInstanceFleet La solicitud es la misma que para modifyInstanceFleet, excepto por lo siguiente:
  • El campo Instance.InstanceFleetId no está permitido.

  • En el tiempo de ejecución, el InstanceFleetId lo determina automáticamente la integración del servicio mediante una llamada a ListInstanceFleets y un análisis del resultado.

modifyInstanceGroupByName

Modifica el número de nodos y las opciones de configuración de un grupo de instancias.

modifyInstanceGroups La solicitud es la siguiente:
{ "ClusterId": "string", "InstanceGroup": <InstanceGroupModifyConfig object> }
Amazon EMR utiliza una lista:
{ "ClusterId": ["string"], "InstanceGroups": [<InstanceGroupModifyConfig objects>] }

En el objeto InstanceGroupModifyConfig, el campo InstanceGroupId no está permitido.

Se ha añadido un nuevo campo, InstanceGroupName. En el tiempo de ejecución, el InstanceGroupId lo determina automáticamente la integración del servicio mediante una llamada a ListInstanceGroups y un análisis del resultado.

El código siguiente incluye un estado Task que crea un clúster.

"Create_Cluster": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync", "Parameters": { "Name": "MyWorkflowCluster", "VisibleToAllUsers": true, "ReleaseLabel": "emr-5.28.0", "Applications": [ { "Name": "Hive" } ], "ServiceRole": "EMR_DefaultRole", "JobFlowRole": "EMR_EC2_DefaultRole", "LogUri": "s3n://aws-logs-123456789012-us-east-1/elasticmapreduce/", "Instances": { "KeepJobFlowAliveWhenNoSteps": true, "InstanceFleets": [ { "InstanceFleetType": "MASTER", "Name": "MASTER", "TargetOnDemandCapacity": 1, "InstanceTypeConfigs": [ { "InstanceType": "m4.xlarge" } ] }, { "InstanceFleetType": "CORE", "Name": "CORE", "TargetOnDemandCapacity": 1, "InstanceTypeConfigs": [ { "InstanceType": "m4.xlarge" } ] } ] } }, "End": true }

El código siguiente incluye un estado Task que habilita la protección de la terminación.

"Enable_Termination_Protection": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:setClusterTerminationProtection", "Parameters": { "ClusterId.$": "$.ClusterId", "TerminationProtected": true }, "End": true }

El código siguiente incluye un estado Task que envía un paso a un clúster.

"Step_One": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync", "Parameters": { "ClusterId.$": "$.ClusterId", "Step": { "Name": "The first step", "ActionOnFailure": "CONTINUE", "HadoopJarStep": { "Jar": "command-runner.jar", "Args": [ "hive-script", "--run-hive-script", "--args", "-f", "s3://<region>.elasticmapreduce.samples/cloudfront/code/Hive_CloudFront.q", "-d", "INPUT=s3://<region>.elasticmapreduce.samples", "-d", "OUTPUT=s3://<mybucket>/MyHiveQueryResults/" ] } } }, "End": true }

El código siguiente incluye un estado Task que cancela un paso.

"Cancel_Step_One": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:cancelStep", "Parameters": { "ClusterId.$": "$.ClusterId", "StepId.$": "$.AddStepsResult.StepId" }, "End": true }

El código siguiente incluye un estado Task que termina un clúster.

"Terminate_Cluster": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync", "Parameters": { "ClusterId.$": "$.ClusterId" }, "End": true }

El código siguiente incluye un estado Task que escala un clúster hacia arriba o hacia abajo para un grupo de instancias.

"ModifyInstanceGroupByName": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:modifyInstanceGroupByName", "Parameters": { "ClusterId": "j-1234567890123", "InstanceGroupName": "MyCoreGroup", "InstanceGroup": { "InstanceCount": 8 } }, "End": true }

El código siguiente incluye un estado Task que escala un clúster hacia arriba o hacia abajo para una flota de instancias.

"ModifyInstanceFleetByName": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:modifyInstanceFleetByName", "Parameters": { "ClusterId": "j-1234567890123", "InstanceFleetName": "MyCoreFleet", "InstanceFleet": { "TargetOnDemandCapacity": 8, "TargetSpotCapacity": 0 } }, "End": true }

Para obtener información acerca de cómo configurar IAM al utilizar Step Functions con otrosAWSservicios, consultePolíticas de IAM para servicios integrados.