Crea y gestiona EMR clústeres de Amazon con Step Functions - AWS Step Functions

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Crea y gestiona EMR clústeres de Amazon con Step Functions

Obtenga información sobre cómo integrarlos AWS Step Functions con Amazon EMR mediante la integración de EMR servicios de Amazon proporcionadaAPIs. La integración de servicios APIs es similar a la de Amazon correspondiente EMRAPIs, con algunas diferencias en los campos que se pasan y en las respuestas que se devuelven.

Para obtener información sobre la integración con AWS servicios en Step Functions, consulte Integración de los servicios de yPasar parámetros a un servicio API en Step Functions.

Características clave de la EMR integración optimizada de Amazon
  • La integración optimizada del EMR servicio Amazon tiene un conjunto personalizado APIs que envuelve el Amazon subyacente EMRAPIs, tal como se describe a continuación. Debido a esto, se diferencia significativamente del Amazon EMR AWS SDKintegración de servicios.

  • Se admite el patrón de integración Ejecutar un trabajo (.sync).

Step Functions no termina un EMR clúster de Amazon automáticamente si se detiene la ejecución. Si tu máquina de estado se detiene antes de que tu EMR clúster de Amazon finalice, es posible que el clúster siga ejecutándose indefinidamente y que se acumulen cargos adicionales. Para evitarlo, asegúrate de que cualquier EMR clúster de Amazon que crees finalice correctamente. Para obtener más información, consulte:

nota

A partir de emr-5.28.0, puede especificar el parámetro StepConcurrencyLevel al crear un clúster para permitir que diferentes pasos se ejecuten en paralelo en un único clúster. Puede utilizar los estados Map y Parallel de Step Functions para enviar trabajo en paralelo al clúster.

La disponibilidad de la integración de los EMR servicios de Amazon está sujeta a la disponibilidad de Amazon EMRAPIs. Consulte EMR la documentación de Amazon para conocer las limitaciones en regiones especiales.

nota

Para su integración con AmazonEMR, Step Functions tiene una frecuencia de sondeo de trabajo codificada de 60 segundos durante los primeros 10 minutos y 300 segundos después.

Amazon compatible EMR APIs

En la siguiente tabla se describen las diferencias entre cada integración de EMR servicios de Amazon API y la Amazon correspondiente EMRAPIs.

Integración EMR de Amazon Service API Correspondiente EMR API Diferencias
createCluster

Crea y comienza a ejecutar un clúster (flujo de trabajo).

Amazon EMR está vinculado directamente a un tipo único de IAM rol conocido como rol vinculado a un servicio. Para que createCluster y createCluster.sync funcionen, tiene que tener configurados los permisos necesarios para crear el rol vinculado a servicios AWSServiceRoleForEMRCleanup. Para obtener más información al respecto, incluida una declaración que puedas añadir a tu política de IAM permisos, consulta Uso del rol vinculado a servicios para Amazon. EMR

runJobFlow createClusterusa la misma sintaxis de solicitud que runJobFlow, excepto en lo siguiente:
  • El campo Instances.KeepJobFlowAliveWhenNoSteps es obligatorio y debe tener el valor booleano TRUE.

  • El campo Steps no está permitido.

  • El campo Instances.InstanceFleets[index].Name debe proporcionarse y debe ser único si API se utiliza el modifyInstanceFleetByName conector opcional.

  • El campo Instances.InstanceGroups[index].Name debe proporcionarse y debe ser único si modifyInstanceGroupByName API se utiliza el campo opcional.

La respuesta es la siguiente:
{ "ClusterId": "string" }
Amazon EMR usa esto:
{ "JobFlowId": "string" }
createCluster.sync

Crea y comienza a ejecutar un clúster (flujo de trabajo).

runJobFlow Lo mismo que createCluster, pero espera a que el clúster alcance el estado WAITING.
setClusterTerminationProtección

Bloquea un clúster (flujo de trabajo) para que las EC2 instancias del clúster no puedan ser terminadas por la intervención del usuario, una API llamada o un error en el flujo de trabajo.

setTerminationProtection La solicitud utiliza:
{ "ClusterId": "string" }
Amazon EMR usa esto:
{ "JobFlowIds": ["string"] }
terminateCluster

Cierra un clúster (flujo de trabajo).

terminateJobFlows La solicitud utiliza:
{ "ClusterId": "string" }
Amazon EMR usa esto:
{ "JobFlowIds": ["string"] }
terminateCluster.sync

Cierra un clúster (flujo de trabajo).

terminateJobFlows Lo mismo que terminateCluster, pero espera a que el clúster finalice.
addStep

Agrega un nuevo paso a un clúster en ejecución.

Si lo desea, también puede especificar el ExecutionRoleArn parámetro mientras lo API usa.

addJobFlowPasos

La solicitud utiliza la clave "ClusterId". Amazon EMR usa"JobFlowId". La solicitud utiliza un solo paso.
{ "Step": <"StepConfig object"> }
Amazon EMR usa esto:
{ "Steps": [<StepConfig objects>] }
La respuesta es la siguiente:
{ "StepId": "string" }
Amazon EMR devuelve esto:
{ "StepIds": [<strings>] }
addStep.sync

Agrega un nuevo paso a un clúster en ejecución.

Si lo desea, también puede especificar el ExecutionRoleArn parámetro mientras lo API usa.

addJobFlowPasos

Lo mismo que addStep, pero espera a que el paso se complete.
cancelStep

Cancela un paso pendiente en un clúster en ejecución.

cancelSteps La solicitud utiliza:
{ "StepId": "string" }
Amazon EMR usa esto:
{ "StepIds": [<strings>] }
La respuesta es la siguiente:
{ "CancelStepsInfo": <CancelStepsInfo object> }
Amazon EMR usa esto:
{ "CancelStepsInfoList": [<CancelStepsInfo objects>] }
modifyInstanceFleetByName

Modifica las capacidades de Spot de destino y de destino bajo demanda para la flota de instancias con el InstanceFleetName especificado.

modifyInstanceFleet La solicitud es la misma que para modifyInstanceFleet, excepto por lo siguiente:
  • El campo Instance.InstanceFleetId no está permitido.

  • En el tiempo de ejecución, el InstanceFleetId lo determina automáticamente la integración del servicio mediante una llamada a ListInstanceFleets y un análisis del resultado.

modifyInstanceGroupByName

Modifica el número de nodos y las opciones de configuración de un grupo de instancias.

modifyInstanceGroups La solicitud es la siguiente:
{ "ClusterId": "string", "InstanceGroup": <InstanceGroupModifyConfig object> }
Amazon EMR usa una lista:
{ "ClusterId": ["string"], "InstanceGroups": [<InstanceGroupModifyConfig objects>] }

En el objeto InstanceGroupModifyConfig, el campo InstanceGroupId no está permitido.

Se ha añadido un nuevo campo, InstanceGroupName. En el tiempo de ejecución, el InstanceGroupId lo determina automáticamente la integración del servicio mediante una llamada a ListInstanceGroups y un análisis del resultado.

Ejemplo de flujo de trabajo

El código siguiente incluye un estado Task que crea un clúster.

"Create_Cluster": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync", "Parameters": { "Name": "MyWorkflowCluster", "VisibleToAllUsers": true, "ReleaseLabel": "emr-5.28.0", "Applications": [ { "Name": "Hive" } ], "ServiceRole": "EMR_DefaultRole", "JobFlowRole": "EMR_EC2_DefaultRole", "LogUri": "s3n://aws-logs-123456789012-us-east-1/elasticmapreduce/", "Instances": { "KeepJobFlowAliveWhenNoSteps": true, "InstanceFleets": [ { "InstanceFleetType": "MASTER", "Name": "MASTER", "TargetOnDemandCapacity": 1, "InstanceTypeConfigs": [ { "InstanceType": "m4.xlarge" } ] }, { "InstanceFleetType": "CORE", "Name": "CORE", "TargetOnDemandCapacity": 1, "InstanceTypeConfigs": [ { "InstanceType": "m4.xlarge" } ] } ] } }, "End": true }

El código siguiente incluye un estado Task que habilita la protección de la terminación.

"Enable_Termination_Protection": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:setClusterTerminationProtection", "Parameters": { "ClusterId.$": "$.ClusterId", "TerminationProtected": true }, "End": true }

El código siguiente incluye un estado Task que envía un paso a un clúster.

"Step_One": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync", "Parameters": { "ClusterId.$": "$.ClusterId", "ExecutionRoleArn": "arn:aws:iam::123456789012:role/myEMR-execution-role", "Step": { "Name": "The first step", "ActionOnFailure": "CONTINUE", "HadoopJarStep": { "Jar": "command-runner.jar", "Args": [ "hive-script", "--run-hive-script", "--args", "-f", "s3://<region>.elasticmapreduce.samples/cloudfront/code/Hive_CloudFront.q", "-d", "INPUT=s3://<region>.elasticmapreduce.samples", "-d", "OUTPUT=s3://<amzn-s3-demo-bucket>/MyHiveQueryResults/" ] } } }, "End": true }

El código siguiente incluye un estado Task que cancela un paso.

"Cancel_Step_One": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:cancelStep", "Parameters": { "ClusterId.$": "$.ClusterId", "StepId.$": "$.AddStepsResult.StepId" }, "End": true }

El código siguiente incluye un estado Task que termina un clúster.

"Terminate_Cluster": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster.sync", "Parameters": { "ClusterId.$": "$.ClusterId" }, "End": true }

El código siguiente incluye un estado Task que escala un clúster hacia arriba o hacia abajo para un grupo de instancias.

"ModifyInstanceGroupByName": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:modifyInstanceGroupByName", "Parameters": { "ClusterId": "j-1234567890123", "InstanceGroupName": "MyCoreGroup", "InstanceGroup": { "InstanceCount": 8 } }, "End": true }

El código siguiente incluye un estado Task que escala un clúster hacia arriba o hacia abajo para una flota de instancias.

"ModifyInstanceFleetByName": { "Type": "Task", "Resource": "arn:aws:states:::elasticmapreduce:modifyInstanceFleetByName", "Parameters": { "ClusterId": "j-1234567890123", "InstanceFleetName": "MyCoreFleet", "InstanceFleet": { "TargetOnDemandCapacity": 8, "TargetSpotCapacity": 0 } }, "End": true }

IAMpolíticas para llamar a Amazon EMR

En las siguientes plantillas de ejemplo se muestra cómo AWS Step Functions genera IAM políticas en función de los recursos de la definición de su máquina de estados. Para obtener más información, consulte Cómo Step Functions genera IAM políticas para servicios integrados y Descubra los patrones de integración de servicios en Step Functions.

addStep

Recursos estáticos

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:AddJobFlowSteps", "elasticmapreduce:DescribeStep", "elasticmapreduce:CancelSteps" ], "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

Recursos dinámicos

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:AddJobFlowSteps", "elasticmapreduce:DescribeStep", "elasticmapreduce:CancelSteps" ], "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }

cancelStep

Recursos estáticos

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "elasticmapreduce:CancelSteps", "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

Recursos dinámicos

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "elasticmapreduce:CancelSteps", "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }

createCluster

Recursos estáticos

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:RunJobFlow", "elasticmapreduce:DescribeCluster", "elasticmapreduce:TerminateJobFlows" ], "Resource": "*" }, { "Effect": "Allow", "Action": "iam:PassRole", "Resource": [ "arn:aws:iam::{{account}}:role/[[roleName]]" ] } ] }

setClusterTerminationProtection

Recursos estáticos

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "elasticmapreduce:SetTerminationProtection", "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

Recursos dinámicos

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "elasticmapreduce:SetTerminationProtection", "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }

modifyInstanceFleetByName

Recursos estáticos

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:ModifyInstanceFleet", "elasticmapreduce:ListInstanceFleets" ], "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

Recursos dinámicos

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:ModifyInstanceFleet", "elasticmapreduce:ListInstanceFleets" ], "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }

modifyInstanceGroupByName

Recursos estáticos

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:ModifyInstanceGroups", "elasticmapreduce:ListInstanceGroups" ], "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

Recursos dinámicos

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:ModifyInstanceGroups", "elasticmapreduce:ListInstanceGroups" ], "Resource": "*" } ] }

terminateCluster

Recursos estáticos

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:TerminateJobFlows", "elasticmapreduce:DescribeCluster" ], "Resource": [ "arn:aws:elasticmapreduce:[[region]]:[[accountId]]:cluster/[[clusterId]]" ] } ] }

Recursos dinámicos

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "elasticmapreduce:TerminateJobFlows", "elasticmapreduce:DescribeCluster" ], "Resource": "arn:aws:elasticmapreduce:*:*:cluster/*" } ] }