Paraleliza las tareas -

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Paraleliza las tareas

Para optimizar el rendimiento, es importante paralelizar las tareas de carga y transformación de datos. Como explicamos en Temas clave de Apache Spark, el número de particiones resilientes de conjuntos de datos distribuidos (RDD) es importante porque determina el grado de paralelismo. Cada tarea que crea Spark corresponde a una RDD partición en proporción 1:1. Para lograr el mejor rendimiento, debes entender cómo se determina el número de RDD particiones y cómo se optimiza ese número.

Si no tienes suficiente paralelismo, los siguientes síntomas se registrarán en las CloudWatchmétricas y en la interfaz de usuario de Spark.

CloudWatch métricas

Compruebe la CPUcarga y el uso de la memoria. Si algunos ejecutores no se procesan durante una fase de su trabajo, es conveniente mejorar el paralelismo. En este caso, durante el período de tiempo visualizado, el ejecutor 1 estaba realizando una tarea, pero los ejecutores restantes (2, 3 y 4) no. Se puede deducir que el controlador de Spark no asignó tareas a esos ejecutores.

El gráfico muestra el controlador y solo un ejecutor.

Interfaz de usuario de Spark

En la pestaña Escenario de la interfaz de usuario de Spark, puedes ver el número de tareas de una etapa. En este caso, Spark solo ha realizado una tarea.

""

Además, la cronología del evento muestra al Ejecutor 1 procesando una tarea. Esto significa que el trabajo de esta etapa se ejecutó íntegramente con un ejecutor, mientras que los demás estaban inactivos.

La cronología del evento muestra solo una tarea.

Si observa estos síntomas, pruebe las siguientes soluciones para cada fuente de datos.

Paralelizar la carga de datos desde Amazon S3

Para paralelizar las cargas de datos de Amazon S3, compruebe primero el número predeterminado de particiones. A continuación, puede determinar manualmente el número objetivo de particiones, pero asegúrese de evitar tener demasiadas particiones.

Determine el número predeterminado de particiones

En Amazon S3, el número inicial de RDD particiones de Spark (cada una de las cuales corresponde a una tarea de Spark) viene determinado por las características del conjunto de datos de Amazon S3 (por ejemplo, el formato, la compresión y el tamaño). Al crear un Spark AWS Glue DynamicFrame o un Spark DataFrame a partir de CSV objetos almacenados en Amazon S3, el número inicial de RDD particiones (NumPartitions) se puede calcular aproximadamente de la siguiente manera:

  • Tamaño del objeto <= 64 MB: NumPartitions = Number of Objects

  • Tamaño del objeto > 64 MB: NumPartitions = Total Object Size / 64 MB

  • No se puede dividir (gzip): NumPartitions = Number of Objects

Como se explica en la sección Reducir la cantidad de datos escaneados, Spark divide los objetos S3 grandes en divisiones que se pueden procesar en paralelo. Cuando el objeto es más grande que el tamaño de la división, Spark divide el objeto y crea una RDD partición (y una tarea) para cada división. El tamaño de división de Spark se basa en el formato de los datos y en el entorno de ejecución, pero esta es una aproximación inicial razonable. Algunos objetos se comprimen con formatos de compresión que no se pueden dividir, como gzip, por lo que Spark no puede dividirlos.

El NumPartitions valor puede variar en función del formato de datos, la compresión, la AWS Glue versión, el número de AWS Glue trabajadores y la configuración de Spark.

Por ejemplo, cuando cargas un único csv.gz objeto de 10 GB con un Spark DataFrame, el controlador de Spark solo creará una RDD partición (NumPartitions=1) porque gzip no se puede dividir. Esto supone una gran carga para un ejecutor de Spark en concreto y no se asigna ninguna tarea a los ejecutores restantes, como se describe en la siguiente figura.

Comprueba el número real de tareas (NumPartitions) de la etapa en la pestaña Stage de la interfaz de usuario web de Spark o ejecuta df.rdd.getNumPartitions() tu código para comprobar el paralelismo.

Cuando encuentres un archivo gzip de 10 GB, comprueba si el sistema que lo genera puede generarlo en un formato divisible. Si no es una opción, es posible que tengas que escalar la capacidad del clúster para procesar el archivo. Para ejecutar las transformaciones de manera eficiente en los datos que ha cargado, tendrá que reequilibrar los trabajadores del RDD clúster mediante la repartición.

Determine manualmente el número objetivo de particiones

En función de las propiedades de tus datos y de la implementación de determinadas funcionalidades por parte de Spark, es posible que acabes con un NumPartitions valor bajo, aunque el trabajo subyacente pueda seguir siendo paralelizado. Si NumPartitions es demasiado pequeño, df.repartition(N) ejecútalo para aumentar el número de particiones y poder distribuir el procesamiento entre varios ejecutores de Spark.

En este caso, la ejecución df.repartition(100) aumentará NumPartitions de 1 a 100, lo que generará 100 particiones de sus datos, cada una con una tarea que podrá asignarse a los demás ejecutores.

La operación repartition(N) divide todos los datos en partes iguales (10 GB/100 particiones = 100 MB/partición), lo que evita que los datos se desvíen hacia determinadas particiones.

nota

Cuando se ejecuta una operación de mezcla, como la que join se ejecuta, el número de particiones aumenta o disminuye de forma dinámica en función del valor de o. spark.sql.shuffle.partitions spark.default.parallelism Esto facilita un intercambio de datos más eficiente entre los ejecutores de Spark. Para obtener más información, consulta la documentación de Spark.

Su objetivo al determinar el número objetivo de particiones es maximizar el uso de los AWS Glue trabajadores aprovisionados. El número de AWS Glue trabajadores y el número de tareas de Spark están relacionados con el número devCPUs. Spark admite una tarea por cada CPU núcleo v. En AWS Glue la versión 3.0 o posterior, puedes calcular el número objetivo de particiones mediante la siguiente fórmula.

# Calculate NumPartitions by WorkerType numExecutors = (NumberOfWorkers - 1) numSlotsPerExecutor = 4 if WorkerType is G.1X 8 if WorkerType is G.2X 16 if WorkerType is G.4X 32 if WorkerType is G.8X NumPartitions = numSlotsPerExecutor * numExecutors # Example: Glue 4.0 / G.1X / 10 Workers numExecutors = ( 10 - 1 ) = 9 # 1 Worker reserved on Spark Driver numSlotsPerExecutor = 4 # G.1X has 4 vCpu core ( Glue 3.0 or later ) NumPartitions = 9 * 4 = 36

En este ejemplo, cada elemento de trabajo de G.1X proporciona cuatro CPU núcleos v a un ejecutor de Spark ()spark.executor.cores = 4. Spark admite una tarea para cada CPU núcleo v, por lo que los ejecutores de G.1X Spark pueden ejecutar cuatro tareas simultáneamente (). numSlotPerExecutor Este número de particiones aprovecha al máximo el clúster si las tareas tardan el mismo tiempo. Sin embargo, algunas tareas tardarán más que otras y crearán núcleos inactivos. Si esto ocurre, considere la posibilidad de multiplicar numPartitions por 2 o 3 para dividir y programar de manera eficiente las tareas más complicadas.

Demasiadas particiones

Un número excesivo de particiones crea un número excesivo de tareas. Esto provoca una gran carga en el controlador de Spark debido a la sobrecarga relacionada con el procesamiento distribuido, como las tareas de administración y el intercambio de datos entre los ejecutores de Spark.

Si el número de particiones de su trabajo es considerablemente mayor que el número objetivo de particiones, considere reducir el número de particiones. Puede reducir las particiones mediante las siguientes opciones:

  • Si el tamaño de los archivos es muy pequeño, utilice AWS Glue groupFiles. Puede reducir el paralelismo excesivo que resulta del lanzamiento de una tarea de Apache Spark para procesar cada archivo.

  • Se usa coalesce(N) para unir particiones. Se trata de un proceso de bajo coste. A la hora de reducir el número de particiones, coalesce(N) se prefiere en lugar de repartition(N) hacerlo, ya que repartition(N) realiza una reproducción aleatoria para distribuir equitativamente la cantidad de registros de cada partición. Esto aumenta los costos y la sobrecarga de administración.

  • Utilice Spark 3.x Adaptive Query Execution. Como se explica en la sección Temas clave de Apache Spark, Adaptive Query Execution proporciona una función para unir automáticamente el número de particiones. Puedes usar este enfoque cuando no sepas el número de particiones hasta que realices la ejecución.

Paraleliza la carga de datos desde JDBC

El número de RDD particiones de Spark viene determinado por la configuración. Tenga en cuenta que, de forma predeterminada, solo se ejecuta una tarea para escanear un conjunto de datos fuente completo mediante una SELECT consulta.

AWS Glue DynamicFrames Tanto Spark como Spark DataFrames admiten la carga de JDBC datos paralelizada en varias tareas. Esto se hace mediante el uso de where predicados para dividir una SELECT consulta en varias consultas. Para paralelizar las lecturas desdeJDBC, configure las siguientes opciones:

  • Para AWS Glue DynamicFrame, defina hashfield (o) y. hashexpression) hashpartition Para obtener más información, consulte Lectura de JDBC tablas en paralelo.

    connection_mysql8_options = { "url": "jdbc:mysql://XXXXXXXXXX.XXXXXXX.us-east-1.rds.amazonaws.com:3306/test", "dbtable": "medicare_tb", "user": "test", "password": "XXXXXXXXX", "hashexpression":"id", "hashpartitions":"10" } datasource0 = glueContext.create_dynamic_frame.from_options( 'mysql', connection_options=connection_mysql8_options, transformation_ctx= "datasource0" )
  • Para Spark DataFrame, establece numPartitionspartitionColumn,lowerBound, yupperBound. Para obtener más información, consulte JDBCA otras bases de datos.

    df = spark.read \ .format("jdbc") \ .option("url", "jdbc:mysql://XXXXXXXXXX.XXXXXXX.us-east-1.rds.amazonaws.com:3306/test") \ .option("dbtable", "medicare_tb") \ .option("user", "test") \ .option("password", "XXXXXXXXXX") \ .option("partitionColumn", "id") \ .option("numPartitions", "10") \ .option("lowerBound", "0") \ .option("upperBound", "1141455") \ .load() df.write.format("json").save("s3://bucket_name/Tests/sparkjdbc/with_parallel/")

Paralelice la carga de datos de DynamoDB al utilizar el conector ETL

El número de RDD particiones de Spark viene determinado por el parámetro. dynamodb.splits Para paralelizar las lecturas de Amazon DynamoDB, configure las siguientes opciones:

Paralelice la carga de datos de Kinesis Data Streams

El número de RDD particiones de Spark viene determinado por el número de particiones de la transmisión de datos de Amazon Kinesis Data Streams de origen. Si solo tiene unos pocos fragmentos en su transmisión de datos, solo habrá unas pocas tareas de Spark. Esto puede provocar un bajo paralelismo en los procesos posteriores. Para paralelizar las lecturas de Kinesis Data Streams, configure las siguientes opciones:

  • Aumente el número de fragmentos para obtener más paralelismo al cargar datos de Kinesis Data Streams.

  • Si la lógica del microlote es lo suficientemente compleja, considere la posibilidad de volver a particionar los datos al principio del lote, después de eliminar las columnas innecesarias.

Para obtener más información, consulte Prácticas recomendadas para optimizar el coste y el rendimiento de los trabajos de streaming. AWS Glue ETL

Paraleliza las tareas después de cargar los datos

Para paralelizar las tareas tras la carga de datos, aumente el número de RDD particiones mediante las siguientes opciones:

  • Reparticione los datos para generar un mayor número de particiones, especialmente justo después de la carga inicial si la carga en sí no se pudo paralelizar.

    Llame a repartition() DynamicFrame o especifique el DataFrame número de particiones. Una buena regla general es multiplicar por dos o tres veces el número de núcleos disponibles.

    Sin embargo, al escribir una tabla particionada, esto puede provocar una explosión de archivos (cada partición puede generar un archivo en cada partición de la tabla). Para evitarlo, puedes reparticionar tu columna DataFrame por columnas. Esto utiliza las columnas de partición de la tabla para que los datos estén organizados antes de escribirlos. Puede especificar un número mayor de particiones sin tener archivos pequeños en las particiones de la tabla. Sin embargo, tenga cuidado de evitar la distorsión de los datos, ya que algunos valores de partición acaban quedándose con la mayoría de los datos y retrasando la finalización de la tarea.

  • Cuando haya combinaciones, aumente el valor. spark.sql.shuffle.partitions Esto también puede ayudar a solucionar cualquier problema de memoria durante la reproducción aleatoria.

    Cuando tienes más de 2.001 particiones de modo aleatorio, Spark utiliza un formato de memoria comprimido. Si tienes un número cercano a ese valor, quizás quieras establecer el spark.sql.shuffle.paritions valor por encima de ese límite para obtener una representación más eficiente.