Exportación de configuraciones para Nube de AWS destinos compatibles - AWS IoT Greengrass

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Exportación de configuraciones para Nube de AWS destinos compatibles

Los componentes de Greengrass definidos por el usuario se StreamManagerClient utilizan en el SDK de Stream Manager para interactuar con el administrador de transmisiones. Cuando un componente crea una transmisión o actualiza una transmisión, pasa un MessageStreamDefinition objeto que representa las propiedades de la transmisión, incluida la definición de exportación. El objeto ExportDefinition contiene las configuraciones de exportación definidas para el flujo. El administrador de flujos utiliza estas configuraciones de exportación para determinar dónde y cómo exportar el flujo.

Diagrama del modelo de objetos del tipo de ExportDefinition propiedad.

Puede definir cero o más configuraciones de exportación en un flujo, incluidas varias configuraciones de exportación para un único tipo de destino. Por ejemplo, puede exportar un flujo a dos canales AWS IoT Analytics y a un flujo de datos de Kinesis.

En caso de intentos fallidos de exportación, el administrador del flujo vuelve a intentar exportar los datos continuamente al Nube de AWS a intervalos de hasta cinco minutos. La cantidad de reintentos no tiene un límite máximo.

nota

StreamManagerClient también proporciona un destino que puede utilizar para exportar secuencias a un servidor HTTP. Este destino está pensado solo con fines de prueba. No es estable y no se admite para su uso en entornos de producción.

Usted es responsable del mantenimiento de estos recursos de Nube de AWS.

Canales de AWS IoT Analytics

El administrador de flujos admite la exportación automática a AWS IoT Analytics. AWS IoT Analytics le permite realizar análisis avanzados de sus datos para ayudarle a tomar decisiones de negocios y mejorar los modelos de machine learning. Para obtener más información, consulte ¿Qué es AWS IoT Analytics? en la AWS IoT Analytics Guía del usuario.

En el SDK de Stream Manager, sus componentes de Greengrass utilizan el IoTAnalyticsConfig para definir la configuración de exportación para este tipo de destino. Para obtener más información, consulte la referencia del SDK para el lenguaje de destino.

Requisitos

Este destino de exportación tiene los siguientes requisitos:

  • Los canales de destino AWS IoT Analytics deben estar en el mismo Cuenta de AWS y Región de AWS igual que el dispositivo principal de Greengrass.

  • El Autorizar a los dispositivos principales a interactuar con AWS los servicios debe permitir el permiso iotanalytics:BatchPutMessage para segmentar los canales. Por ejemplo:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "iotanalytics:BatchPutMessage" ], "Resource": [ "arn:aws:iotanalytics:region:account-id:channel/channel_1_name", "arn:aws:iotanalytics:region:account-id:channel/channel_2_name" ] } ] }

    Puede conceder acceso granular o condicional a recursos (por ejemplo, utilizando un esquema de nomenclatura con comodín *) Para obtener más información, consulte Adición y eliminación de políticas de IAM en la Guía del usuario de IAM.

Exportación a AWS IoT Analytics

Para crear una transmisión a la que se exporteAWS IoT Analytics, sus componentes de Greengrass crean una transmisión con una definición de exportación que incluye uno o más IoTAnalyticsConfig objetos. Este objeto define los ajustes de exportación, como el canal de destino, el tamaño del lote, el intervalo del lote y la prioridad.

Cuando sus componentes de Greengrass reciben datos de los dispositivos, añaden mensajes que contienen una masa de datos a la transmisión de destino.

A continuación, el administrador de flujos exporta los datos en función de los ajustes del lote y la prioridad definidos en las configuraciones de exportación del flujo.

Amazon Kinesis Data Streams

El administrador de flujos permite exportar automáticamente a Amazon Kinesis Data Streams. Kinesis Data Streams se utiliza normalmente para agregar grandes volúmenes de datos y cargarlos en un almacén MapReduce de datos o un clúster. Para obtener más información, consulte ¿Qué son los Amazon Kinesis Data Streams? en la Guía para desarrolladores de Amazon Kinesis.

En el SDK de Stream Manager, sus componentes de Greengrass utilizan el KinesisConfig para definir la configuración de exportación para este tipo de destino. Para obtener más información, consulte la referencia del SDK para el lenguaje de destino.

Requisitos

Este destino de exportación tiene los siguientes requisitos:

  • Las transmisiones de destino de Kinesis Data Streams deben estar en el Cuenta de AWS mismo dispositivo principal de Greengrass Región de AWS y en el mismo.

  • El Autorizar a los dispositivos principales a interactuar con AWS los servicios debe permitir el permiso kinesis:PutRecords para destinarse a flujos de datos. Por ejemplo:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:PutRecords" ], "Resource": [ "arn:aws:kinesis:region:account-id:stream/stream_1_name", "arn:aws:kinesis:region:account-id:stream/stream_2_name" ] } ] }

    Puede conceder acceso granular o condicional a recursos (por ejemplo, utilizando un esquema de nomenclatura con comodín *) Para obtener información, consulte Adición y eliminación de políticas de IAM en la Guía del usuario de IAM.

Exportar a Kinesis Data Streams

Para crear una transmisión que se exporte a Kinesis Data Streams, sus componentes de Greengrass crean una transmisión con una definición de exportación que incluye uno o más objetos. KinesisConfig Este objeto define los ajustes de exportación, como el flujo de datos de destino, el tamaño del lote, el intervalo del lote y la prioridad.

Cuando sus componentes de Greengrass reciben datos de los dispositivos, añaden mensajes que contienen una masa de datos a la transmisión de destino. A continuación, el administrador de flujos exporta los datos en función de los ajustes del lote y la prioridad definidos en las configuraciones de exportación del flujo.

El administrador de flujos genera un UUID aleatorio único como clave de partición para cada registro cargado en Amazon Kinesis.

AWS IoT SiteWise propiedades de activos

El administrador de flujos admite la exportación automática a AWS IoT SiteWise. AWS IoT SiteWise le permite recopilar, organizar y analizar datos de equipos industriales a escala. Para obtener más información, consulte ¿Qué es AWS IoT SiteWise? en la AWS IoT SiteWiseGuía del usuario.

En el SDK de Stream Manager, sus componentes de Greengrass utilizan el IoTSiteWiseConfig para definir la configuración de exportación para este tipo de destino. Para obtener más información, consulte la referencia del SDK para el lenguaje de destino.

nota

AWStambién proporciona AWS IoT SiteWise componentes, que ofrecen una solución prediseñada que puede utilizar para transmitir datos desde fuentes OPC-UA. Para obtener más información, consulte Colector IoT SiteWise OPC-UA.

Requisitos

Este destino de exportación tiene los siguientes requisitos:

  • Las propiedades de los activos de destino AWS IoT SiteWise deben estar en el mismo Cuenta de AWS y Región de AWS igual que el dispositivo principal de Greengrass.

    nota

    Para ver la lista de Región de AWS dispositivos AWS IoT SiteWise compatibles, consulte los AWS IoT SiteWisepuntos finales y las cuotas en la Referencia AWSgeneral.

  • Autorizar a los dispositivos principales a interactuar con AWS los serviciosEl iotsitewise:BatchPutAssetPropertyValue debe permitir que el permiso se dirija a las propiedades de los activos. El siguiente ejemplo de política utiliza la clave iotsitewise:assetHierarchyPath de condición para conceder acceso a un activo raíz de destino y a sus elementos secundarios. Puede quitar Condition de la política para permitir el acceso a todos sus recursos AWS IoT SiteWise o especificar ARN de los activos individuales.

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "iotsitewise:BatchPutAssetPropertyValue", "Resource": "*", "Condition": { "StringLike": { "iotsitewise:assetHierarchyPath": [ "/root node asset ID", "/root node asset ID/*" ] } } } ] }

    Puede conceder acceso granular o condicional a recursos (por ejemplo, utilizando un esquema de nomenclatura con comodín *) Para obtener información, consulte Adición y eliminación de políticas de IAM en la Guía del usuario de IAM.

    Para obtener información de seguridad importante, consulte la BatchPutAssetPropertyValue autorización en la Guía del AWS IoT SiteWise usuario.

Exportación a AWS IoT SiteWise

Para crear una transmisión a la que se exporteAWS IoT SiteWise, sus componentes de Greengrass crean una transmisión con una definición de exportación que incluye uno o más IoTSiteWiseConfig objetos. Este objeto define los ajustes de exportación, como el tamaño del lote, el intervalo del lote y la prioridad.

Cuando sus componentes de Greengrass reciben datos de propiedades de activos de los dispositivos, añaden mensajes que contienen los datos al flujo de destino. Los mensajes son objetos PutAssetPropertyValueEntry serializados en JSON que contienen los valores de propiedad de una o más propiedades de los activos. Para obtener más información, consulte Añadir un mensaje para los AWS IoT SiteWisedestinos de exportación.

nota

Cuando envía datos a AWS IoT SiteWise, los datos deben cumplir todos los requisitos de la acción BatchPutAssetPropertyValue. Para obtener más información, consulte BatchPutAssetPropertyValue en la Referencia de la API de AWS IoT SiteWise.

A continuación, el administrador de flujos exporta los datos en función de los ajustes del lote y la prioridad definidos en las configuraciones de exportación del flujo.

Puede ajustar la configuración del administrador de transmisiones y la lógica de los componentes de Greengrass para diseñar su estrategia de exportación. Por ejemplo:

  • Para realizar exportaciones prácticamente en tiempo real, establezca ajustes del tamaño de lote e intervalos bajos y añada los datos al flujo cuando lo reciba.

  • Para optimizar el procesamiento por lotes, mitigar las restricciones de ancho de banda o minimizar los costos, sus componentes de Greengrass pueden agrupar timestamp-quality-value los puntos de datos (TQV) recibidos para una sola propiedad de activo antes de agregar los datos a la transmisión. Una estrategia consiste en agrupar las entradas de hasta 10 combinaciones diferentes de propiedades y activos (o alias de propiedades) en un mensaje, en lugar de enviar más de una entrada para la misma propiedad. Esto ayuda al Administrador de flujos a mantenerse dentro de las cuotas de AWS IoT SiteWise.

Objetos de Amazon S3

El administrador de flujos permite exportar automáticamente a Amazon S3. Puede utilizar Amazon S3 para almacenar y recuperar grandes cantidades de datos. Para obtener más información, consulte ¿Qué es Amazon S3? en la Guía para desarrolladores de Amazon Simple Storage Service.

En el SDK de Stream Manager, sus componentes de Greengrass utilizan el S3ExportTaskExecutorConfig para definir la configuración de exportación para este tipo de destino. Para obtener más información, consulte la referencia del SDK para el lenguaje de destino.

Requisitos

Este destino de exportación tiene los siguientes requisitos:

  • Los buckets Amazon S3 de destino deben estar en el mismo lugar Cuenta de AWS que el dispositivo principal de Greengrass.

  • Si una función Lambda que se ejecuta en el modo contenedor de Greengrass escribe archivos de entrada en un directorio de archivos de entrada, debe montar el directorio como un volumen en el contenedor con permisos de escritura. Esto garantiza que los archivos se escriban en el sistema de archivos raíz y sean visibles para el componente del administrador de flujos, que se ejecuta fuera del contenedor.

  • Si un componente de contenedor de Docker escribe archivos de entrada en un directorio de archivos de entrada, debe montar el directorio como un volumen en el contenedor con permisos de escritura. Esto garantiza que los archivos se escriban en el sistema de archivos raíz y sean visibles para el componente del administrador de flujos, que se ejecuta fuera del contenedor.

  • El Autorizar a los dispositivos principales a interactuar con AWS los servicios debe permitir los siguientes permisos para los buckets de destino. Por ejemplo:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:PutObject", "s3:AbortMultipartUpload", "s3:ListMultipartUploadParts" ], "Resource": [ "arn:aws:s3:::bucket-1-name/*", "arn:aws:s3:::bucket-2-name/*" ] } ] }

    Puede conceder acceso granular o condicional a recursos (por ejemplo, utilizando un esquema de nomenclatura con comodín *) Para obtener información, consulte Adición y eliminación de políticas de IAM en la Guía del usuario de IAM.

Exportar a Amazon S3.

Para crear una transmisión que se exporte a Amazon S3, sus componentes de Greengrass utilizan el S3ExportTaskExecutorConfig objeto para configurar la política de exportación. La política define los ajustes de exportación, como el umbral de carga multiparte y la prioridad. Para las exportaciones de Amazon S3, el administrador de flujos carga los datos que lee de los archivos locales en el dispositivo principal. Para iniciar una carga, sus componentes de Greengrass añaden una tarea de exportación a la transmisión de destino. La tarea de exportación contiene información sobre el archivo de entrada y el objeto Amazon S3 de destino. El administrador de transmisiones ejecuta las tareas en la secuencia en que se adjuntan a la transmisión.

nota

El bucket de destino ya debe existir en su Cuenta de AWS. Si no existe un objeto para la clave especificada, el administrador de flujos crea el objeto automáticamente.

El administrador de flujos utiliza la propiedad de umbral de carga multiparte, el ajuste del tamaño mínimo de las piezas y el tamaño del archivo de entrada para determinar cómo cargar los datos. El umbral de carga multiparte debe ser igual o mayor que el tamaño mínimo de la pieza. Si desea cargar datos en paralelo, puede crear varios flujos.

Las claves que especifican los objetos de Amazon S3 de destino pueden incluir DateTimeFormatter cadenas Java válidas en !{timestamp:value} los marcadores de posición. Puede utilizar estos marcadores de fecha y hora para particionar los datos en Amazon S3 en función de la hora en la que se cargaron los datos del archivo de entrada. Por ejemplo, el siguiente nombre de clave se resuelve en un valor como my-key/2020/12/31/data.txt.

my-key/!{timestamp:YYYY}/!{timestamp:MM}/!{timestamp:dd}/data.txt
nota

Si desea supervisar el estado de exportación de un flujo, cree primero un flujo de estado y, a continuación, configure el flujo de exportación para utilizarlo. Para obtener más información, consulte Supervise las tareas de exportación.

Administrar datos de entrada

Puede crear un código que las aplicaciones de IoT usen para administrar el ciclo de vida de los datos de entrada. El siguiente ejemplo de flujo de trabajo muestra cómo se pueden utilizar los componentes de Greengrass para gestionar estos datos.

  1. Un proceso local recibe datos de dispositivos o periféricos y, a continuación, los escribe en los archivos de un directorio del dispositivo principal. Estos son los archivos de entrada del administrador de flujos.

  2. Un componente de Greengrass escanea el directorio y añade una tarea de exportación a la secuencia de destino cuando se crea un nuevo archivo. La tarea es un objeto S3ExportTaskDefinition serializado en JSON que especifica la URL del archivo de entrada, el bucket y la clave de Amazon S3 de destino y los metadatos de usuario opcionales.

  3. El administrador de flujos lee el archivo de entrada y exporta los datos a Amazon S3 en el orden de las tareas anexas. El bucket de destino ya debe existir en su Cuenta de AWS. Si no existe un objeto para la clave especificada, el administrador de flujos crea el objeto automáticamente.

  4. El componente Greengrass lee los mensajes de un flujo de estado para supervisar el estado de la exportación. Una vez finalizadas las tareas de exportación, el componente Greengrass puede eliminar los archivos de entrada correspondientes. Para obtener más información, consulte Supervise las tareas de exportación.

Supervise las tareas de exportación

Puede crear un código que las aplicaciones de IoT utilizan para monitorear el estado de sus exportaciones de Amazon S3. Sus componentes de Greengrass deben crear un flujo de estado y, a continuación, configurar el flujo de exportación para escribir actualizaciones de estado en el flujo de estado. Una sola transmisión de estado puede recibir actualizaciones de estado de varias transmisiones que se exportan a Amazon S3.

En primer lugar, cree un flujo para utilizarlo como flujo de estado. Puede configurar las políticas de tamaño y retención del flujo para controlar la vida útil de los mensajes de estado. Por ejemplo:

  • Configure Persistence en Memory si no desea guardar los mensajes de estado.

  • Configure StrategyOnFull en OverwriteOldestData para que no se pierdan los nuevos mensajes de estado.

A continuación, cree o actualice el flujo de exportación para usar el flujo de estado. En concreto, defina la propiedad de configuración de estado de la configuración de S3ExportTaskExecutorConfig exportación del flujo. Esta configuración indica al administrador de transmisiones que escriba mensajes de estado sobre las tareas de exportación en la secuencia de estado. En el objeto StatusConfig, especifique el nombre del flujo de estado y el nivel de detalle. Los siguientes valores admitidos van desde el menos detallado (ERROR) al más detallado (). TRACE El valor predeterminado es INFO.

  • ERROR

  • WARN

  • INFO

  • DEBUG

  • TRACE

El siguiente ejemplo de flujo de trabajo muestra cómo los componentes de Greengrass pueden utilizar un flujo de estado para supervisar el estado de la exportación.

  1. Como se describió en el flujo de trabajo anterior, un componente de Greengrass añade una tarea de exportación a un flujo que está configurado para escribir mensajes de estado sobre las tareas de exportación en un flujo de estado. La operación de incorporación devuelve un número de secuencia que representa el ID de la tarea.

  2. Un componente de Greengrass lee los mensajes secuencialmente del flujo de estado y, a continuación, filtra los mensajes en función del nombre del flujo y el ID de la tarea o en función de una propiedad de la tarea de exportación del contexto del mensaje. Por ejemplo, el componente Greengrass puede filtrar por la URL del archivo de entrada de la tarea de exportación, que está representada por el S3ExportTaskDefinition objeto en el contexto del mensaje.

    Los siguientes códigos de estado indican que una tarea de exportación ha alcanzado un estado completo:

    • Success. Se ha completado correctamente la carga.

    • Failure. El administrador de flujos detectó un error; por ejemplo, el bucket especificado no existe. Tras resolver el problema, puede volver a añadir la tarea de exportación al flujo.

    • Canceled. La tarea se detuvo porque se eliminó la definición de transmisión o exportación o porque el período time-to-live (TTL) de la tarea expiró.

    nota

    La tarea también puede tener un estado de InProgress o Warning. El administrador de flujos emite advertencias cuando un evento devuelve un error que no afecta a la ejecución de la tarea. Por ejemplo, si no se limpia una carga parcial, aparecerá una advertencia.

  3. Una vez finalizadas las tareas de exportación, el componente Greengrass puede eliminar los archivos de entrada correspondientes.

El siguiente ejemplo muestra cómo un componente de Greengrass puede leer y procesar los mensajes de estado.

Python
import time from stream_manager import ( ReadMessagesOptions, Status, StatusConfig, StatusLevel, StatusMessage, StreamManagerClient, ) from stream_manager.util import Util client = StreamManagerClient() try: # Read the statuses from the export status stream is_file_uploaded_to_s3 = False while not is_file_uploaded_to_s3: try: messages_list = client.read_messages( "StatusStreamName", ReadMessagesOptions(min_message_count=1, read_timeout_millis=1000) ) for message in messages_list: # Deserialize the status message first. status_message = Util.deserialize_json_bytes_to_obj(message.payload, StatusMessage) # Check the status of the status message. If the status is "Success", # the file was successfully uploaded to S3. # If the status was either "Failure" or "Cancelled", the server was unable to upload the file to S3. # We will print the message for why the upload to S3 failed from the status message. # If the status was "InProgress", the status indicates that the server has started uploading # the S3 task. if status_message.status == Status.Success: logger.info("Successfully uploaded file at path " + file_url + " to S3.") is_file_uploaded_to_s3 = True elif status_message.status == Status.Failure or status_message.status == Status.Canceled: logger.info( "Unable to upload file at path " + file_url + " to S3. Message: " + status_message.message ) is_file_uploaded_to_s3 = True time.sleep(5) except StreamManagerException: logger.exception("Exception while running") except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Referencia del SDK de Python: read_messages | StatusMessage

Java
import com.amazonaws.greengrass.streammanager.client.StreamManagerClient; import com.amazonaws.greengrass.streammanager.client.StreamManagerClientFactory; import com.amazonaws.greengrass.streammanager.client.utils.ValidateAndSerialize; import com.amazonaws.greengrass.streammanager.model.ReadMessagesOptions; import com.amazonaws.greengrass.streammanager.model.Status; import com.amazonaws.greengrass.streammanager.model.StatusConfig; import com.amazonaws.greengrass.streammanager.model.StatusLevel; import com.amazonaws.greengrass.streammanager.model.StatusMessage; try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) { try { boolean isS3UploadComplete = false; while (!isS3UploadComplete) { try { // Read the statuses from the export status stream List<Message> messages = client.readMessages("StatusStreamName", new ReadMessagesOptions().withMinMessageCount(1L).withReadTimeoutMillis(1000L)); for (Message message : messages) { // Deserialize the status message first. StatusMessage statusMessage = ValidateAndSerialize.deserializeJsonBytesToObj(message.getPayload(), StatusMessage.class); // Check the status of the status message. If the status is "Success", the file was successfully uploaded to S3. // If the status was either "Failure" or "Canceled", the server was unable to upload the file to S3. // We will print the message for why the upload to S3 failed from the status message. // If the status was "InProgress", the status indicates that the server has started uploading the S3 task. if (Status.Success.equals(statusMessage.getStatus())) { System.out.println("Successfully uploaded file at path " + FILE_URL + " to S3."); isS3UploadComplete = true; } else if (Status.Failure.equals(statusMessage.getStatus()) || Status.Canceled.equals(statusMessage.getStatus())) { System.out.println(String.format("Unable to upload file at path %s to S3. Message %s", statusMessage.getStatusContext().getS3ExportTaskDefinition().getInputUrl(), statusMessage.getMessage())); sS3UploadComplete = true; } } } catch (StreamManagerException ignored) { } finally { // Sleep for sometime for the S3 upload task to complete before trying to read the status message. Thread.sleep(5000); } } catch (e) { // Properly handle errors. } } catch (StreamManagerException e) { // Properly handle exception. }

Referencia del SDK de Java: readMessages | StatusMessage

Node.js
const { StreamManagerClient, ReadMessagesOptions, Status, StatusConfig, StatusLevel, StatusMessage, util, } = require(*'aws-greengrass-stream-manager-sdk'*); const client = new StreamManagerClient(); client.onConnected(async () => { try { let isS3UploadComplete = false; while (!isS3UploadComplete) { try { // Read the statuses from the export status stream const messages = await c.readMessages("StatusStreamName", new ReadMessagesOptions() .withMinMessageCount(1) .withReadTimeoutMillis(1000)); messages.forEach((message) => { // Deserialize the status message first. const statusMessage = util.deserializeJsonBytesToObj(message.payload, StatusMessage); // Check the status of the status message. If the status is 'Success', the file was successfully uploaded to S3. // If the status was either 'Failure' or 'Cancelled', the server was unable to upload the file to S3. // We will print the message for why the upload to S3 failed from the status message. // If the status was "InProgress", the status indicates that the server has started uploading the S3 task. if (statusMessage.status === Status.Success) { console.log(`Successfully uploaded file at path ${FILE_URL} to S3.`); isS3UploadComplete = true; } else if (statusMessage.status === Status.Failure || statusMessage.status === Status.Canceled) { console.log(`Unable to upload file at path ${FILE_URL} to S3. Message: ${statusMessage.message}`); isS3UploadComplete = true; } }); // Sleep for sometime for the S3 upload task to complete before trying to read the status message. await new Promise((r) => setTimeout(r, 5000)); } catch (e) { // Ignored } } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Referencia del SDK de Node.js: readMessages | StatusMessage