Conceptos avanzados de la transmisión de AWS Glue
En las aplicaciones contemporáneas basadas en datos, la importancia de los datos disminuye con el tiempo y su valor pasa de ser predictivo a reactivo. Como resultado, los clientes desean procesar los datos en tiempo real para tomar decisiones más rápidas. Cuando se trata de fuentes de datos en tiempo real, como las de los sensores de IoT, es posible que los datos lleguen desordenados o que se produzcan retrasos en el procesamiento debido a la latencia de la red y a otros fallos relacionados con el origen durante la ingesta. Como parte de la plataforma de AWS Glue, la transmisión de AWS Glue aprovecha estas capacidades para ofrecer ETL de transmisión escalable y sin servidor, con tecnología de transmisión estructurado de Apache Spark, que permite a los usuarios procesar datos en tiempo real.
En este tema, exploraremos los conceptos y capacidades avanzados de la transmisión de AWS Glue.
Consideraciones de tiempo a la hora de procesar los trabajos de transmisión
Al procesar los trabajos de transmisión, hay cuatro nociones de tiempo:

-
Hora del evento: la marca temporal de cuando se produjo el evento. En la mayoría de los casos, este campo está incrustado en los propios datos del evento, en el origen.
-
Ventana de tiempo de evento: el intervalo entre dos horas de un evento. Como se muestra en el diagrama anterior, W1 es una ventana de tiempo de evento que va desde las 17:00 h hasta las 17:10 h. Cada ventana de tiempo de evento es una agrupación de varios eventos.
-
Hora de activación: la hora de activación controla la frecuencia con la que se procesan los datos y se actualizan los resultados. Este es el momento en que el procesamiento del microlote comenzó.
-
Tiempo de ingesta: el momento en que los datos de transmisión se ingirieron en el servicio de transmisión. Si la hora del evento no está integrada en el propio evento, en algunos casos se puede usar como ventana.
Creación de ventanas
La creación de ventanas es una técnica en la que se agrupan y agregan varios eventos por ventana de tiempo de evento. En los siguientes ejemplos, analizaremos las ventajas de la creación de ventanas y cuándo usarla.
Según el caso de uso empresarial, Spark admite tres tipos de ventanas de tiempo.
-
Ventana de saltos de tamaño constante: una serie de ventanas de tiempo de evento con tamaño fijo que no se superponen y que se agregan.
-
Ventana deslizante: son similares a las ventanas de saltos de tamaño constante, ya que tienen un “tamaño fijo”, pero las ventanas pueden superponerse o deslizarse siempre que la duración del deslizamiento sea inferior a la duración de la propia ventana.
-
Ventana de sesión: comienza con un evento de datos de entrada y continúa expandiéndose mientras reciba información dentro de un intervalo o periodo de inactividad. Una ventana de sesión puede tener un tamaño estático o dinámico igual a la longitud de la ventana, en función de las entradas.
Ventana de saltos de tamaño constante
La ventana de saltos de tamaño constante es una serie de ventanas de tiempo de evento con tamaño fijo que no se superponen y que se agregan. Vamos a entender esto con un ejemplo del mundo real.

La empresa ABC Auto desea realizar una campaña de marketing para una nueva marca de coches deportivos. Quiere elegir una ciudad donde haya la mayor cantidad de fanáticos de los coches deportivos. Para lograr este objetivo, muestra un breve anuncio de 15 segundos que presenta el coche en su sitio web. Todos los “clics” y la “ciudad” correspondiente se graban y se transmiten a Amazon Kinesis Data Streams. Queremos contar el número de clics en un intervalo de 10 minutos y agruparlo por ciudad para ver qué ciudad tiene la mayor demanda. El siguiente valor es el resultado de la agregación.
window_start_time | window_end_time | ciudad | total_clicks |
---|---|---|---|
2023-07-10 17:00:00 | 2023-07-10 17:10:00 | Dallas | 75 |
2023-07-10 17:00:00 | 2023-07-10 17:10:00 | Chicago | 10 |
2023-07-10 17:20:00 | 2023-07-10 17:30:00 | Dallas | 20 |
2023-07-10 17:20:00 | 2023-07-10 17:30:00 | Chicago | 50 |
Como se explicó anteriormente, estas ventanas de tiempo de evento son diferentes de los intervalos de tiempo de activación. Por ejemplo, aunque el tiempo de activación sea cada minuto, los resultados de la salida solo mostrarán ventanas de agregación de 10 minutos que no se superponen. Para la optimización, es mejor alinear el intervalo de activación con la ventana de tiempo de evento.
En la tabla anterior, Dallas registró 75 clics en la ventana de 17:00 h a 17:10 h, mientras que Chicago tuvo 10 clics. Además, no hay datos para la ventana de 17:10 h a 17:20 h de ninguna ciudad, por lo que se omite esta ventana.
Ahora puede realizar un análisis más detallado de estos datos en la aplicación de análisis posterior para determinar cuál es la mejor ciudad para llevar a cabo la campaña de marketing.
Uso de ventanas de saltos de tamaño constante en AWS Glue
-
Cree un DataFrame de Amazon Kinesis Data Streams y léalo. Ejemplo:
parsed_df = kinesis_raw_df \ .selectExpr('CAST(data AS STRING)') \ .select(from_json("data", ticker_schema).alias("data")) \ .select('data.event_time','data.ticker','data.trade','data.volume', 'data.price')
-
Procese los datos en una ventana de saltos de tamaño constante. En el siguiente ejemplo, los datos se agrupan en función del campo de entrada “event_time” en ventanas de tamaño constante de 10 minutos y escriben la salida en un lago de datos de Amazon S3.
grouped_df = parsed_df \ .groupBy(window("event_time", "10 minutes"), "city") \ .agg(sum("clicks").alias("total_clicks")) summary_df = grouped_df \ .withColumn("window_start_time", col("window.start")) \ .withColumn("window_end_time", col("window.end")) \ .withColumn("year", year("window_start_time")) \ .withColumn("month", month("window_start_time")) \ .withColumn("day", dayofmonth("window_start_time")) \ .withColumn("hour", hour("window_start_time")) \ .withColumn("minute", minute("window_start_time")) \ .drop("window") write_result = summary_df \ .writeStream \ .format("parquet") \ .trigger(processingTime="10 seconds") \ .option("checkpointLocation", "s3a://bucket-stock-stream/stock-stream-catalog-job/checkpoint/") \ .option("path", "s3a://bucket-stock-stream/stock-stream-catalog-job/summary_output/") \ .partitionBy("year", "month", "day") \ .start()
Ventana deslizante
Las ventanas deslizantes son similares a las de saltos de tamaño constante, ya que tienen un “tamaño fijo”, pero las ventanas pueden superponerse o deslizarse siempre que la duración del deslizamiento sea menor que la duración de la propia ventana. Debido a la naturaleza del deslizamiento, una entrada se puede vincular a varias ventanas.

Para entenderlo mejor, consideremos el ejemplo de un banco que quiere detectar posibles fraudes con tarjetas de crédito. Una aplicación de transmisión podría supervisar un flujo continuo de transacciones con tarjetas de crédito. Estas transacciones podrían agruparse en ventanas de 10 minutos de duración y, cada 5 minutos, la ventana se deslizaría hacia adelante, eliminando los 5 minutos de datos más antiguos y agregando los últimos 5 minutos de datos nuevos. Dentro de cada ventana, las transacciones podrían agruparse por país para detectar patrones sospechosos, como una transacción en EE. UU. seguida inmediatamente de otra en Australia. Para simplificar, clasificaremos estas transacciones como fraude cuando el importe total de las transacciones sea superior a 100 USD. Si se detecta un patrón de este tipo, es señal de un posible fraude y la tarjeta puede congelarse.
El sistema de procesamiento de tarjetas de crédito envía una serie de transacciones a Kinesis para cada identificador de tarjeta junto con el país. Un trabajo de AWS Glue ejecuta el análisis y produce el siguiente resultado agregado.
window_start_time | window_end_time | card_last_four | país | total_amount |
---|---|---|---|---|
2023-07-10 17:00:00 | 2023-07-10 17:10:00 | 6544 | EE. UU. | 85 |
2023-07-10 17:00:00 | 2023-07-10 17:10:00 | 6544 | Australia | 10 |
2023-07-10 17:05:45 | 2023-07-10 17:15:45 | 6544 | EE. UU. | 50 |
2023-07-10 17:10:45 | 2023-07-10 17:20:45 | 6544 | EE. UU. | 50 |
2023-07-10 17:10:45 | 2023-07-10 17:20:45 | 6544 | Australia | 150 |
Según la agregación anterior, puede ver que el intervalo de 10 minutos se desliza cada 5 minutos, sumado por el importe de la transacción. La anomalía se detecta en el intervalo de 17:10 h a 17:20 h, donde hay un valor atípico, es decir, una transacción de 150 USD en Australia. AWS Glue puede detectar esta anomalía y activar un evento de alarma con la clave infractora en un tema de SNS mediante boto3. Además, una función de Lambda puede suscribirse a este tema y tomar medidas.
Datos de proceso en una ventana deslizante
La cláusula group-by
y la función de ventana se usan para implementar la ventana deslizante, como se muestra a continuación.
grouped_df = parsed_df \ .groupBy(window(col("event_time"), "10 minute", "5 min"), "country", "card_last_four") \ .agg(sum("tx_amount").alias("total_amount"))
Ventana de sesión
A diferencia de las dos ventanas anteriores, que tienen un tamaño fijo, la ventana de sesión puede tener un tamaño estático o dinámico de la longitud de la ventana, según las entradas. Una ventana de sesión comienza con un evento de datos de entrada y continúa expandiéndose mientras reciba información dentro de un intervalo o periodo de inactividad.

Tomemos un ejemplo. La empresa ABC Hotel quiere saber cuál es el momento más concurrido de la semana y ofrecer mejores ofertas a sus huéspedes. En cuanto un huésped se registra, se abre una ventana de sesión y Spark mantiene un estado de agregación para esa ventana de tiempo de evento. Cada vez que un invitado se registra, se genera un evento y se envía a Amazon Kinesis Data Streams. El hotel decide que, si no se hacen registros de entrada durante un periodo de 15 minutos, se puede cerrar la ventana de tiempo de evento. La siguiente ventana de tiempo de evento volverá a comenzar cuando haya un nuevo registro. El resultado es el siguiente.
window_start_time | window_end_time | ciudad | total_checkins |
---|---|---|---|
2023-07-10 17:02:00 | 2023-07-10 17:30:00 | Dallas | 50 |
2023-07-10 17:02:00 | 2023-07-10 17:30:00 | Chicago | 25 |
2023-07-10 17:40:00 | 2023-07-10 18:20:00 | Dallas | 75 |
2023-07-10 18:50:45 | 2023-07-10 19:15:45 | Dallas | 20 |
El primer registro se produjo con event_time a las 17:02 h. La ventana de tiempo del evento de agregación comenzará a las 17:02 h. Esta agregación continuará mientras recibamos los eventos dentro de los 15 minutos de duración. En el ejemplo anterior, el último evento que recibimos fue a las 17:15 h y, durante los siguientes 15 minutos, no hubo ningún evento. Como resultado, Spark cerró la ventana de tiempo del evento a las 17:15 h, más 15 minutos = 17:30 h, y la estableció entre las 17:02 h y las 17:30 h. Comenzó una nueva ventana de tiempo de evento a las 17:47 h cuando recibió un nuevo evento con los datos de registro.
Datos de proceso en una ventana de sesión
La cláusula group-by
y la función de ventana se usan para implementar la ventana deslizante.
grouped_df = parsed_df \ .groupBy(session_window(col("event_time"), "10 minute"), "city") \ .agg(count("check_in").alias("total_checkins"))
Modos de salida
El modo de salida es el modo en el que los resultados de la tabla ilimitada se escriben en el receptor externo. Hay tres modos disponibles. En el siguiente ejemplo, contamos las apariciones de una palabra a medida que se transmiten y se procesan líneas de datos en cada microlote.
-
Modo completo: toda la tabla de resultados se escribirá en el receptor después de cada procesamiento por microlotes, aunque el recuento de palabras no se haya actualizado en la ventana de tiempo del evento actual.
-
Modo de adición: este es el modo predeterminado, en el que solo se escribirán en el receptor las palabras o filas nuevas que se hayan agregado a la tabla de resultados desde la última activación. Este modo es adecuado para la transmisión sin estado de consultas como map, flatMap, filter, etc.
-
Modo de actualización: solo se escribirán en el receptor las palabras o filas de la tabla de resultados que se hayan actualizado o agregado desde la última activación.
nota
Modo de salida = las ventanas de sesión no admiten la opción de actualización.
Manejo de datos tardíos y marcas de agua
Cuando se trabaja con datos en tiempo real, es posible que se produzcan retrasos en la llegada de los datos debido a la latencia de la red y a fallos en las fases iniciales, por lo que necesitamos un mecanismo para volver a realizar la agregación en la ventana de tiempo de evento omitida. Sin embargo, para ello, es necesario mantener el estado. Al mismo tiempo, es necesario limpiar los datos más antiguos para limitar el tamaño del estado. En la versión 2.1 de Spark se agregó compatibilidad con una característica llamada “marca de agua” que mantiene el estado y permite al usuario especificar el umbral para los datos tardíos.
Con referencia al ejemplo anterior acerca de la cotización bursátil, consideremos que el límite permitido para los datos tardíos es de no más de 10 minutos. Para simplificar las cosas, supondremos que es una ventana de saltos de tamaño constante, que la cotización es AMZ y que la operación es BUY.

En el diagrama anterior, calculamos el volumen total en una ventana de saltos de tamaño constante de 10 minutos. Tenemos la activación a las 17:00 h, a las 17:10 h y a las 17:20 h. Sobre la flecha de la línea temporal, tenemos el flujo de datos de entrada y debajo está la tabla de resultados ilimitada.
En la primera ventana de saltos de tamaño constante de 10 minutos, agregamos en función de event_time, y total_volume se calculó como 30. En la segunda ventana de tiempo de evento, Spark obtuvo el primer evento de datos con event_time a las 17:02 h. Como este es el tiempo máximo de evento registrado hasta ahora por Spark, el umbral de la marca de agua se ha establecido 10 minutos antes (es decir, watermark_event_time es 16:52 h). Cualquier evento de datos con event_time posterior a las 16:52 h se tendrá en cuenta para la agregación con un límite de tiempo y cualquier evento de datos anterior a esa fecha se descartará. Esto permite a Spark mantener un estado intermedio durante 10 minutos adicionales para adaptarse a los datos tardíos. Alrededor de las 17:08 h, Spark recibió un evento con event_time a las 16:54 h, lo cual estaba dentro del umbral. Por lo tanto, Spark recalculó la ventana de tiempo de evento entre las 16:50 h y las 17:00 h, y el volumen total pasó de 30 a 60.
Sin embargo, en la hora de activación de las 17:20 h, cuando Spark recibió un evento con event_time a las 17:15 h, se estableció watermark_event_time a las 17:05 h. Por lo tanto, el evento de datos tardío con event_time a las 17:03 h se consideró “demasiado tarde” y se ignoró.
Watermark Boundary = Max(Event Time) - Watermark Threshold
Uso de marcas de agua en AWS Glue
Spark no emitirá ni escribirá los datos en el receptor externo hasta que se supere el límite de la marca de agua. Para implementar una marca de agua en AWS Glue, consulte el siguiente ejemplo.
grouped_df = parsed_df \ .withWatermark("event_time", "10 minutes") \ .groupBy(window("event_time", "5 minutes"), "ticker") \ .agg(sum("volume").alias("total_volume"))