Trabajar con operaciones de streaming en sesiones AWS Glue interactivas - AWS Glue

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Trabajar con operaciones de streaming en sesiones AWS Glue interactivas

Cambio del tipo de sesión de streaming

Utilice el comando mágico de configuración de las sesiones interactivas de AWS Glue, %streaming, para definir el trabajo que vaya a ejecutar e inicializar una sesión interactiva de streaming.

Muestreo de flujo de entrada para desarrollo interactivo

Una herramienta que hemos creado para mejorar la experiencia interactiva en las sesiones AWS Glue interactivas es la adición de un nuevo método GlueContext para obtener una instantánea de una transmisión en formato estático DynamicFrame. GlueContextle permite inspeccionar, interactuar e implementar su flujo de trabajo.

Con la instancia de la clase GlueContext, podrá localizar el método getSampleStreamingDynamicFrame. Los argumentos requeridos para este método son:

  • dataFrame: The Spark Streaming DataFrame

  • options: consulte las opciones disponibles a continuación

Entre las opciones disponibles se incluyen:

  • windowSize: También se denomina duración de microlotes. Este parámetro determinará cuánto tiempo esperará una consulta de streaming después de que se haya desencadenado el lote anterior. El valor de este parámetro debe ser inferior al de pollingTimeInMs.

  • pollingTimeInSra.: El tiempo total durante el que se ejecutará el método. Desencadenará al menos un microlote para obtener registros de muestra del flujo de entrada.

  • recordPollingLimit: Este parámetro le ayuda a limitar el número total de registros de la transmisión que se van a sondear.

  • (Opcional) También se puede utilizar writeStreamFunction para aplicar esta función personalizada a cada función de muestreo de registros. Consulte los ejemplos en Scala y Python que aparecen a continuación.

Scala
val sampleBatchFunction = (batchDF: DataFrame, batchId: Long) => {//Optional but you can replace your own forEachBatch function here} val jsonString: String = s"""{"pollingTimeInMs": "10000", "windowSize": "5 seconds"}""" val dynFrame = glueContext.getSampleStreamingDynamicFrame(YOUR_STREAMING_DF, JsonOptions(jsonString), sampleBatchFunction) dynFrame.show()
Python
def sample_batch_function(batch_df, batch_id): //Optional but you can replace your own forEachBatch function here options = { "pollingTimeInMs": "10000", "windowSize": "5 seconds", } glue_context.getSampleStreamingDynamicFrame(YOUR_STREAMING_DF, options, sample_batch_function)
nota

Cuando el elemento DynFrame que se muestrea está vacío, puede deberse a varios motivos:

  • El origen del streaming está configurado como “Latest” (Más reciente) y no se han ingerido datos nuevos durante el período de muestra.

  • El tiempo de sondeo no es suficiente para procesar los registros que ha ingerido. No aparecerán datos a menos que se haya procesado todo el lote.

Ejecución de aplicaciones de streaming en las sesiones interactivas

En las sesiones interactivas de AWS Glue, se puede ejecutar una aplicación de streaming de AWS Glue del mismo modo que se crearía una aplicación de streaming en la consola de AWS Glue. Dado que las sesiones interactivas se basan en sesiones, si se producen excepciones en el motor de ejecución no se detiene la sesión. Ahora ofrecemos el beneficio adicional de desarrollar una función por lotes de manera iterativa. Por ejemplo:

def batch_function(data_frame, batch_id): log.info(data_frame.count()) invalid_method_call() glueContext.forEachBatch(frame=streaming_df, batch_function = batch_function, options = {**})

En el ejemplo anterior, incluimos un uso no válido de un método y, a diferencia de trabajos de AWS Glue normales, que cerrarán toda la aplicación, el contexto de codificación y las definiciones del usuario se conservan por completo y la sesión sigue funcionando. No es necesario arrancar un nuevo clúster y volver a ejecutar toda la transformación anterior. Eso permite centrarse en iterar rápidamente las implementaciones de una función por lotes para obtener los resultados deseados.

Es importante tener en cuenta que Sesiones interactivas evalúa cada instrucción con una perspectiva de bloqueo, para que la sesión solo ejecute una instrucción a la vez. Dado que las consultas de streaming son continuas y nunca terminan, las sesiones con consultas de streaming activas no podrán gestionar ninguna instrucción de seguimiento a menos que se interrumpan. Puede ejecutar el comando de interrupción directamente desde Jupyter Notebook y nuestro kernel se encargará de realizar la cancelación.

Tomemos como ejemplo la siguiente secuencia de instrucciones que están esperando su ejecución:

Statement 1: val number = df.count() #Spark Action with deterministic result Result: 5 Statement 2: streamingQuery.start().awaitTermination() #Spark Streaming Query that will be executing continously Result: Constantly updated with each microbatch Statement 3: val number2 = df.count() #This will not be executed as previous statement will be running indefinitely