Migrazione da Kinesis v1 a v2 - Amazon Monitron

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Migrazione da Kinesis v1 a v2

Se attualmente utilizzi lo schema di dati v1, potresti già inviare dati ad Amazon S3 o elaborare ulteriormente il payload del flusso di dati con Lambda.

Aggiornamento dello schema dei dati alla v2

Se hai già configurato un flusso di dati con lo schema v1, puoi aggiornare il processo di esportazione dei dati effettuando le seguenti operazioni:

  1. Apri la tua console Amazon Monitron.

  2. Vai al tuo progetto.

  3. Interrompi l'attuale esportazione dei dati in tempo reale.

  4. Avvia l'esportazione dei dati in tempo reale per creare un nuovo flusso di dati.

  5. Seleziona il flusso di dati appena creato.

  6. Scegli avvia l'esportazione dei dati in tempo reale. A questo punto, il nuovo schema invierà il payload attraverso il flusso di dati.

  7. (Facoltativo) Vai alla console Kinesis ed elimina il vecchio flusso di dati.

  8. Configura un nuovo metodo di consegna per il flusso di dati appena creato con lo schema v2.

Il tuo nuovo stream ora fornisce payload conformi allo schema v2 al tuo nuovo bucket. Ti consigliamo di utilizzare due bucket distinti per avere un formato coerente nel caso in cui desideri elaborare tutti i dati in questi bucket. Ad esempio, utilizzando altri servizi come Athena e. AWS Glue

Nota

Se stavi distribuendo i tuoi dati ad Amazon S3, scopri come archiviare i dati esportati in Amazon S3 per informazioni dettagliate su come distribuire i dati ad Amazon S3 con lo schema v2.

Nota

Se stavi usando una funzione Lambda per elaborare i tuoi payload, scopri come elaborare i dati con Lambda. Puoi anche fare riferimento alla sezione Aggiornamento con Lambda per ulteriori informazioni.

Aggiornamento dell'elaborazione dei dati con Lambda

L'aggiornamento dell'elaborazione dei dati con Lambda richiede di considerare che il flusso di dati v2 è ora basato su eventi. Il codice Lambda v1 iniziale potrebbe essere stato simile al seguente:

import base64 def main_handler(event): # Kinesis "data" blob is base64 encoded so decode here: for record in event['Records']: payload = base64.b64decode(record["kinesis"]["data"]) measurement = payload["measurement"] projectDisplayName = payload["projectDisplayName"] # Process the content of the measurement # ...

Poiché lo schema di dati v1 si trova su un percorso obsoleto, il codice Lambda precedente non funzionerà con tutti i nuovi flussi di dati.

Il seguente codice di esempio in Python elaborerà gli eventi dal flusso Kinesis con lo schema di dati v2. Questo codice utilizza il nuovo eventType parametro per orientare l'elaborazione verso il gestore appropriato:

import base64 handlers = { "measurement": measurementEventHandler, "gatewayConnected": gatewayConnectedEventHandler, "gatewayDisconnected": gatewayDisconnectedEventHandler, "sensorConnected": sensorConnectedEventHandler, "sensorDisconnected": sensorDisconnectedEventHandler, } def main_handler(event): # Kinesis "data" blob is base64 encoded so decode here: for record in event['Records']: payload = base64.b64decode(record["kinesis"]["data"]) eventType = payload["eventType"] if eventType not in handler.keys(): log.info("No event handler found for the event type: {event['eventType']}") return # Invoke the appropriate handler based on the event type. eventPayload = payload["eventPayload"] eventHandler = handlers[eventType] eventHandler(eventPayload) def measurementEventHandler(measurementEventPayload): # Handle measurement event projectName = measurementEventPayload["projectName"] # ... def gatewayConnectedEventHandler(gatewayConnectedEventPayload): # Handle gateway connected event # Other event handler functions