Seleccione sus preferencias de cookies

Usamos cookies esenciales y herramientas similares que son necesarias para proporcionar nuestro sitio y nuestros servicios. Usamos cookies de rendimiento para recopilar estadísticas anónimas para que podamos entender cómo los clientes usan nuestro sitio y hacer mejoras. Las cookies esenciales no se pueden desactivar, pero puede hacer clic en “Personalizar” o “Rechazar” para rechazar las cookies de rendimiento.

Si está de acuerdo, AWS y los terceros aprobados también utilizarán cookies para proporcionar características útiles del sitio, recordar sus preferencias y mostrar contenido relevante, incluida publicidad relevante. Para aceptar o rechazar todas las cookies no esenciales, haga clic en “Aceptar” o “Rechazar”. Para elegir opciones más detalladas, haga clic en “Personalizar”.

Capturar datos de IoT directamente en Amazon S3 de forma rentable con AWS IoT Greengrass - Recomendaciones de AWS

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Capturar datos de IoT directamente en Amazon S3 de forma rentable con AWS IoT Greengrass

Creado por Sebastian Viviani (AWS) y Rizwan Syed (AWS)

Resumen

Este patrón le muestra cómo incorporar datos de Internet de las cosas (IoT) de forma rentable directamente en un depósito de Amazon Simple Storage Service (Amazon S3) mediante un dispositivo AWS IoT Greengrass versión 2. El dispositivo ejecuta un componente personalizado que lee los datos de IoT y los guarda en un almacenamiento persistente (es decir, un disco o volumen local). A continuación, el dispositivo comprime los datos de IoT en un archivo de Apache Parquet y los carga periódicamente en un bucket de S3.

La cantidad y la velocidad de los datos de IoT que ingiere están limitadas únicamente por las capacidades de su hardware periférico y el ancho de banda de la red. Puede utilizar Amazon Athena para analizar de forma rentable los datos ingeridos. Athena admite archivos comprimidos de Apache Parquet y la visualización de datos mediante el uso de Amazon Managed Grafana.

Requisitos previos y limitaciones

Requisitos previos 

Limitaciones

  • Los datos de este patrón no se cargan en tiempo real en el bucket de S3. Hay un período de retraso y puede configurarlo. Los datos se almacenan temporalmente en el dispositivo perimetral y, una vez transcurrido el período, se cargan.

  • El SDK está disponible en Java, Node.js o Python.

Arquitectura

Pila de tecnología de destino

  • Amazon S3

  • AWS IoT Greengrass

  • Intermediario MQTT

  • Componente Stream Manager

Arquitectura de destino

El siguiente diagrama muestra una arquitectura diseñada para capturar datos de sensores de IoT y almacenarlos en un bucket de S3.

Diagrama de arquitectura

En el diagrama, se muestra el siguiente flujo de trabajo:

  1. Las actualizaciones de varios sensores (por ejemplo, de temperatura y válvula) se publican en un agente de MQTT local.

  2. El compresor de archivos Parquet que está suscrito a estos sensores actualiza los temas y recibe estas actualizaciones.

  3. El compresor de archivos Parquet almacena las actualizaciones de forma local.

  4. Una vez transcurrido el período, los archivos almacenados se comprimen en archivos Parquet y se pasan al administrador de flujos para cargarlos en el bucket de S3 especificado.

  5. El administrador de transmisión carga los archivos de Parquet en el bucket de S3.

nota

El administrador de transmisiones (StreamManager) es un componente administrado. Para ver ejemplos de cómo exportar datos a Amazon S3, consulte Stream Manager en la documentación de AWS IoT Greengrass. Puede utilizar un bróker MQTT local como componente u otro bróker como Eclipse Mosquitto.

Herramientas

Herramientas de AWS

  • Amazon Athena es un servicio de consultas interactivo que facilita el análisis de datos en Amazon S3 con SQL estándar.

  • Amazon Simple Storage Service (Amazon S3) es un servicio de almacenamiento de objetos basado en la nube que le ayuda a almacenar, proteger y recuperar cualquier cantidad de datos.

  • AWS IoT Greengrass es un servicio en la nube y de tiempo de ejecución de borde de IoT de código abierto que le ayuda a crear, implementar y administrar aplicaciones de IoT en los dispositivos.

Otras herramientas

  • Apache Parquet es un formato de archivo de datos de código abierto orientado por columnas diseñado para el almacenamiento y la recuperación.

  • El MQTT (Message Queuing Telemetry Transport) es un protocolo de mensajería ligero diseñado para dispositivos restringidos.

Prácticas recomendadas

Utilice el formato de partición adecuado para los datos cargados

No hay requisitos específicos para los nombres de los prefijos raíz del bucket de S3 (por ejemplo, "myAwesomeDataSet/" o"dataFromSource"), pero le recomendamos que utilice una partición y un prefijo significativos para que sea fácil entender el propósito del conjunto de datos.

También le recomendamos que utilice la partición correcta en Amazon S3 para que las consultas se ejecuten de forma óptima en el conjunto de datos. En el siguiente ejemplo, los datos se dividen en formato HIVE para optimizar la cantidad de datos escaneados por cada consulta de Athena. Esto puede mejorar el rendimiento y reducir los costos.

s3://<ingestionBucket>/<rootPrefix>/year=YY/month=MM/day=DD/HHMM_<suffix>.parquet

Epics

TareaDescripciónHabilidades requeridas

Cree un bucket de S3.

  1. Cree un bucket de S3 o utilice uno existente.

  2. Cree un prefijo significativo para el depósito de S3 en el que desee capturar los datos de IoT (por ejemplo,s3:\\<bucket>\<prefix>).

  3. Registre su prefijo para usarlo más adelante.

Desarrollador de aplicaciones

Añada permisos de IAM al bucket de S3.

Para conceder a los usuarios acceso de escritura al bucket y al prefijo de S3 que creó anteriormente, añada la siguiente política de IAM a su rol de AWS IoT Greengrass:

{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3DataUpload", "Effect": "Allow", "Action": [ "s3:List*", "s3:Put*" ], "Resource": [ "arn:aws:s3:::<ingestionBucket>", "arn:aws:s3:::<ingestionBucket>/<prefix>/*" ] } ] }

Para obtener más información, consulte Creación de una política de IAM para acceder a los recursos de Amazon S3 en la documentación de Aurora.

A continuación, actualice la política de recursos (si es necesario) del bucket de S3 para permitir el acceso de escritura con los principios de AWS correctos.

Desarrollador de aplicaciones

Configure su entorno

TareaDescripciónHabilidades requeridas

Cree un bucket de S3.

  1. Cree un bucket de S3 o utilice uno existente.

  2. Cree un prefijo significativo para el depósito de S3 en el que desee capturar los datos de IoT (por ejemplo,s3:\\<bucket>\<prefix>).

  3. Registre su prefijo para usarlo más adelante.

Desarrollador de aplicaciones

Añada permisos de IAM al bucket de S3.

Para conceder a los usuarios acceso de escritura al bucket y al prefijo de S3 que creó anteriormente, añada la siguiente política de IAM a su rol de AWS IoT Greengrass:

{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3DataUpload", "Effect": "Allow", "Action": [ "s3:List*", "s3:Put*" ], "Resource": [ "arn:aws:s3:::<ingestionBucket>", "arn:aws:s3:::<ingestionBucket>/<prefix>/*" ] } ] }

Para obtener más información, consulte Creación de una política de IAM para acceder a los recursos de Amazon S3 en la documentación de Aurora.

A continuación, actualice la política de recursos (si es necesario) del bucket de S3 para permitir el acceso de escritura con los principios de AWS correctos.

Desarrollador de aplicaciones
TareaDescripciónHabilidades requeridas

Actualizar la receta del componente.

Actualice la configuración del componente al crear una implementación según el siguiente ejemplo:

{ "region": "<region>", "parquet_period": <period>, "s3_bucket": "<s3Bucket>", "s3_key_prefix": "<s3prefix>" }

<region>Sustitúyalo por su región <period> de AWS, su intervalo periódico, <s3Bucket> su bucket de S3 y <s3prefix> por su prefijo.

Desarrollador de aplicaciones

Cree el componente.

Realice una de las siguientes acciones:

  • Cree el componente.

  • Añada el componente a la canalización de CI/CD (si existe). Asegúrese de copiar el artefacto del repositorio de artefactos al depósito de artefactos de AWS IoT Greengrass. A continuación, cree o actualice su componente AWS IoT Greengrass.

  • nota

    Añada el agente MQTT como componente o agréguelo manualmente más adelante. : Esta decisión afecta al esquema de autenticación que puede usar con el intermediario. Al añadir manualmente un agente, se desvincula el agente de AWS IoT Greengrass y se habilita cualquier esquema de autenticación compatible del agente. Los componentes de agente proporcionados por AWS tienen esquemas de autenticación predefinidos. Para obtener más información, consulte el agente MQTT 3.1.1 (Moquette) y el agente MQTT 5 (EMQX).

Desarrollador de aplicaciones

Actualice el cliente MQTT.

El código de ejemplo no utiliza la autenticación porque el componente se conecta localmente al intermediario. Si su situación es diferente, actualice la sección del cliente de MQTT según sea necesario. Además, realice lo siguiente:

  1. Actualice los temas de MQTT de la suscripción.

  2. Actualice el analizador de mensajes MQTT según sea necesario, ya que los mensajes de cada fuente pueden diferir.

Desarrollador de aplicaciones

Cree e implemente el componente AWS IoT Greengrass

TareaDescripciónHabilidades requeridas

Actualizar la receta del componente.

Actualice la configuración del componente al crear una implementación según el siguiente ejemplo:

{ "region": "<region>", "parquet_period": <period>, "s3_bucket": "<s3Bucket>", "s3_key_prefix": "<s3prefix>" }

<region>Sustitúyalo por su región <period> de AWS, su intervalo periódico, <s3Bucket> su bucket de S3 y <s3prefix> por su prefijo.

Desarrollador de aplicaciones

Cree el componente.

Realice una de las siguientes acciones:

  • Cree el componente.

  • Añada el componente a la canalización de CI/CD (si existe). Asegúrese de copiar el artefacto del repositorio de artefactos al depósito de artefactos de AWS IoT Greengrass. A continuación, cree o actualice su componente AWS IoT Greengrass.

  • nota

    Añada el agente MQTT como componente o agréguelo manualmente más adelante. : Esta decisión afecta al esquema de autenticación que puede usar con el intermediario. Al añadir manualmente un agente, se desvincula el agente de AWS IoT Greengrass y se habilita cualquier esquema de autenticación compatible del agente. Los componentes de agente proporcionados por AWS tienen esquemas de autenticación predefinidos. Para obtener más información, consulte el agente MQTT 3.1.1 (Moquette) y el agente MQTT 5 (EMQX).

Desarrollador de aplicaciones

Actualice el cliente MQTT.

El código de ejemplo no utiliza la autenticación porque el componente se conecta localmente al intermediario. Si su situación es diferente, actualice la sección del cliente de MQTT según sea necesario. Además, realice lo siguiente:

  1. Actualice los temas de MQTT de la suscripción.

  2. Actualice el analizador de mensajes MQTT según sea necesario, ya que los mensajes de cada fuente pueden diferir.

Desarrollador de aplicaciones
TareaDescripciónHabilidades requeridas

Actualice la implementación del dispositivo principal.

Si la implementación del dispositivo principal de AWS IoT Greengrass versión 2 ya existe, revise la implementación. Si la implementación no existe, cree una nueva.

Para asignar al componente el nombre correcto, actualice la configuración del administrador de registros para el nuevo componente (si es necesario) en función de lo siguiente:

{ "logsUploaderConfiguration": { "systemLogsConfiguration": { ... }, "componentLogsConfigurationMap": { "<com.iot.ingest.parquet>": { "minimumLogLevel": "INFO", "diskSpaceLimit": "20", "diskSpaceLimitUnit": "MB", "deleteLogFileAfterCloudUpload": "false" } ... } }, "periodicUploadIntervalSec": "300" }

Por último, complete la revisión de la implementación de su dispositivo principal AWS IoT Greengrass.

Desarrollador de aplicaciones

Añada el componente al dispositivo principal de AWS IoT Greengrass versión 2

TareaDescripciónHabilidades requeridas

Actualice la implementación del dispositivo principal.

Si la implementación del dispositivo principal de AWS IoT Greengrass versión 2 ya existe, revise la implementación. Si la implementación no existe, cree una nueva.

Para asignar al componente el nombre correcto, actualice la configuración del administrador de registros para el nuevo componente (si es necesario) en función de lo siguiente:

{ "logsUploaderConfiguration": { "systemLogsConfiguration": { ... }, "componentLogsConfigurationMap": { "<com.iot.ingest.parquet>": { "minimumLogLevel": "INFO", "diskSpaceLimit": "20", "diskSpaceLimitUnit": "MB", "deleteLogFileAfterCloudUpload": "false" } ... } }, "periodicUploadIntervalSec": "300" }

Por último, complete la revisión de la implementación de su dispositivo principal AWS IoT Greengrass.

Desarrollador de aplicaciones
TareaDescripciónHabilidades requeridas

Compruebe los registros del volumen de AWS IoT Greengrass.

Compruebe lo siguiente:

  • El cliente MQTT se ha conectado correctamente al agente MQTT local.

  • El cliente MQTT está suscrito a los temas correctos.

  • Están llegando al bróker mensajes de actualización de sensores sobre temas relacionados con el MQTT.

  • La compresión de parquet se produce en cada intervalo periódico.

Desarrollador de aplicaciones

Compruebe el bucket de S3.

Compruebe si los datos se cargan en el bucket de S3. Puede ver los archivos que se están cargando en cada período.

También puede comprobar si los datos se han cargado en el depósito de S3 consultando los datos en la siguiente sección.

Desarrollador de aplicaciones

Verifique la captura de datos en el depósito de S3

TareaDescripciónHabilidades requeridas

Compruebe los registros del volumen de AWS IoT Greengrass.

Compruebe lo siguiente:

  • El cliente MQTT se ha conectado correctamente al agente MQTT local.

  • El cliente MQTT está suscrito a los temas correctos.

  • Están llegando al bróker mensajes de actualización de sensores sobre temas relacionados con el MQTT.

  • La compresión de parquet se produce en cada intervalo periódico.

Desarrollador de aplicaciones

Compruebe el bucket de S3.

Compruebe si los datos se cargan en el bucket de S3. Puede ver los archivos que se están cargando en cada período.

También puede comprobar si los datos se han cargado en el depósito de S3 consultando los datos en la siguiente sección.

Desarrollador de aplicaciones
TareaDescripciónHabilidades requeridas

Crear una base de datos y tabla.

  1. Cree una base de datos de AWS Glue (si es necesario).

  2. Cree una tabla en AWS Glue manualmente o ejecute un rastreador en AWS Glue.

Desarrollador de aplicaciones

Concede a Athena acceso a los datos.

  1. Actualice los permisos para permitir que Athena acceda al bucket de S3. Para obtener más información, consulte Acceso detallado a bases de datos y tablas en el catálogo de datos de AWS Glue en la documentación de Athena.

  2. Consulte la tabla en su base de datos.

Desarrollador de aplicaciones

Configurar consultas desde Athena

TareaDescripciónHabilidades requeridas

Crear una base de datos y tabla.

  1. Cree una base de datos de AWS Glue (si es necesario).

  2. Cree una tabla en AWS Glue manualmente o ejecute un rastreador en AWS Glue.

Desarrollador de aplicaciones

Concede a Athena acceso a los datos.

  1. Actualice los permisos para permitir que Athena acceda al bucket de S3. Para obtener más información, consulte Acceso detallado a bases de datos y tablas en el catálogo de datos de AWS Glue en la documentación de Athena.

  2. Consulte la tabla en su base de datos.

Desarrollador de aplicaciones

Solución de problemas

ProblemaSolución

El cliente MQTT no se puede conectar

El cliente MQTT no se suscribe

Validar los permisos en el bróker MQTT. Si tiene un bróker MQTT de AWS, consulte Broker MQTT 3.1.1 (Moquette) y MQTT 5 Broker (EMQX).

Los archivos Parquet no se crean

  • Compruebe que los temas de MQTT son correctos.

  • Compruebe que los mensajes de MQTT de los sensores tengan el formato correcto.

Los objetos no se cargan en el bucket de S3

  • Compruebe que dispone de conectividad a Internet y de un punto de conexión.

  • Compruebe que la política de recursos de su bucket de S3 sea correcta.

  • Compruebe los permisos para la función de dispositivo principal de AWS IoT Greengrass versión 2.

Recursos relacionados

Información adicional

Análisis de costos

El siguiente escenario de análisis de costos demuestra cómo el enfoque de captura de datos incluido en este patrón puede afectar los costos de captura de datos en la nube de AWS. Los ejemplos de precios de este escenario se basan en los precios vigentes en el momento de la publicación. Los precios están sujetos a cambios. Además, los costos pueden variar en función de la región de AWS, las Service quotas de AWS y otros factores relacionados con el entorno de nube.

Conjunto de señales de entrada

Este análisis utiliza el siguiente conjunto de señales de entrada como base para comparar los costos de captura de IoT con otras alternativas disponibles.

Número de señales

Frecuencia

Datos por señal

125

25 Hz

8 bytes

En este escenario, el sistema recibe 125 señales. Cada señal es de 8 bytes y se produce cada 40 milisegundos (25 Hz). Estas señales pueden venir individualmente o agrupadas en una carga útil común. Tiene la opción de dividir y empaquetar estas señales según sus necesidades. También puede determinar la latencia. La latencia consiste en el período de tiempo para recibir, acumular y capturar los datos.

A modo de comparación, la operación de captura de este escenario se basa en la región de AWS de us-east-1. La comparación de costos se aplica únicamente a los servicios de AWS. Otros costos, como el hardware o la conectividad, no se tienen en cuenta en el análisis.

Comparaciones de costos

La siguiente tabla muestra el costo mensual en dólares estadounidenses (USD) de cada método de ingestión.

Método

Costo mensual

AWS SiteWise IoT*

331,77 USD

AWS IoT SiteWise Edge con paquete de procesamiento de datos (mantiene todos los datos en el borde)

200 USD

Reglas de AWS IoT Core y Amazon S3 para acceder a datos sin procesar

84,54 USD

Compresión de archivos Parquet en el borde y carga a Amazon S3

0,5 USD

*Los datos se deben reducir para cumplir con las Service quotas. Esto significa que se pierden algunos datos con este método.

Métodos alternativos

En esta sección se muestran los costos equivalentes de los siguientes métodos alternativos:

  • AWS IoT SiteWise: cada señal debe cargarse en un mensaje individual. Por lo tanto, la cantidad total de mensajes por mes es de 125 × 25 × 3600 × 24 × 30, o sea, 8 100 millones de mensajes por mes. Sin embargo, AWS IoT solo SiteWise puede gestionar 10 puntos de datos por segundo por propiedad. Suponiendo que los datos se reduzcan a 10 Hz, la cantidad de mensajes por mes se reduce a 125 × 10 × 3600 × 24 × 30, o sea, 3,24 mil millones. Si utiliza el componente de publicación que agrupa las medidas en grupos de 10 (a 1 USD por millón de mensajes), el costo mensual será de 324 USD al mes. Suponiendo que cada mensaje tenga 8 bytes (1 Kb/125), se trata de 25,92 GB de almacenamiento de datos. Esto añade un costo mensual de 7,77 USD. El costo total del primer mes es de 331,77 USD y aumenta 7,77 USD cada mes.

  • AWS IoT SiteWise Edge con paquete de procesamiento de datos, que incluye todos los modelos y señales completamente procesados en el borde (es decir, sin ingesta de nubes): puede usar el paquete de procesamiento de datos como alternativa para reducir los costos y configurar todos los modelos que se calculan en el borde. Esto puede funcionar solo para el almacenamiento y la visualización, incluso si no se realiza ningún cálculo real. En este caso, es necesario utilizar un hardware potente para la puerta de enlace perimetral. Hay un costo fijo de 200 USD al mes.

  • Ingesta directa a AWS IoT Core por parte de MQTT y una regla de IoT para almacenar los datos sin procesar en Amazon S3: suponiendo que todas las señales se publiquen en una carga útil común, el número total de mensajes publicados en AWS IoT Core es de 25 × 3600 × 24 × 30, es decir, 64,8 millones por mes. A 1 USD por millón de mensajes, se trata de un costo mensual de 64,8 USD al mes. A 0,15 USD por millón de activaciones de reglas y con una regla por mensaje, esto añade un costo mensual de 19,44 USD. Con un costo de 0,023 USD por GB de almacenamiento en Amazon S3, se añaden otros 1,5 USD al mes (aumentando cada mes para reflejar los nuevos datos). El costo total del primer mes es de 84,54 USD y aumenta 1,5 USD cada mes.

  • Comprimir los datos en el borde de un archivo Parquet y subirlos a Amazon S3 (método propuesto): la relación de compresión depende del tipo de datos. Con los mismos datos industriales probados para MQTT, el total de datos de salida de un mes completo es de 1,2 Gb. Esto cuesta 0,03 USD al mes. Los índices de compresión (que utilizan datos aleatorios) descritos en otros puntos de referencia son del orden del 66 por ciento (lo que se acerca más al peor de los casos). El total de datos es de 21 Gb y cuesta 0,5 USD al mes.

Generador de archivos de parquet

El siguiente ejemplo de código muestra la estructura de un generador de archivos Parquet escrito en Python. El ejemplo de código es solo para fines ilustrativos y no funcionará si se pega en su entorno.

import queue import paho.mqtt.client as mqtt import pandas as pd #queue for decoupling the MQTT thread messageQueue = queue.Queue() client = mqtt.Client() streammanager = StreamManagerClient() def feederListener(topic, message): payload = { "topic" : topic, "payload" : message, } messageQueue.put_nowait(payload) def on_connect(client_instance, userdata, flags, rc): client.subscribe("#",qos=0) def on_message(client, userdata, message): feederListener(topic=str(message.topic), message=str(message.payload.decode("utf-8"))) filename = "tempfile.parquet" streamname = "mystream" destination_bucket= "amzn-s3-demo-bucket" keyname="mykey" period= 60 client.on_connect = on_connect client.on_message = on_message streammanager.create_message_stream( MessageStreamDefinition(name=streamname, strategy_on_full=StrategyOnFull.OverwriteOldestData) ) while True: try: message = messageQueue.get(timeout=myArgs.mqtt_timeout) except (queue.Empty): logger.warning("MQTT message reception timed out") currentTimestamp = getCurrentTime() if currentTimestamp >= nextUploadTimestamp: df = pd.DataFrame.from_dict(accumulator) df.to_parquet(filename) s3_export_task_definition = S3ExportTaskDefinition(input_url=filename, bucket=destination_bucket, key=key_name) streammanager.append_message(streamname, Util.validate_and_serialize_to_json_bytes(s3_export_task_definition)) accumulator = {} nextUploadTimestamp += period else: accumulator.append(message)
PrivacidadTérminos del sitioPreferencias de cookies
© 2025, Amazon Web Services, Inc o sus afiliados. Todos los derechos reservados.