Migrasi dari Kinesis v1 ke v2 - Amazon Monitron

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Migrasi dari Kinesis v1 ke v2

Jika saat ini Anda menggunakan skema data v1, Anda mungkin sudah mengirim data ke Amazon S3, atau memproses lebih lanjut payload aliran data dengan Lambda.

Memperbarui skema data ke v2

Jika Anda telah mengonfigurasi aliran data dengan skema v1, Anda dapat memperbarui proses ekspor data Anda dengan melakukan hal berikut:

  1. Buka konsol Amazon Monitron Anda.

  2. Arahkan ke proyek Anda.

  3. Hentikan ekspor data langsung saat ini.

  4. Mulai ekspor data langsung untuk membuat aliran data baru.

  5. Pilih aliran data yang baru dibuat.

  6. Pilih mulai ekspor data langsung. Pada titik ini, skema baru akan mengirim muatan Anda melalui aliran data.

  7. (Opsional) Buka konsol Kinesis dan hapus aliran data lama Anda.

  8. Konfigurasikan metode pengiriman baru untuk aliran data yang baru Anda buat dengan skema v2.

Streaming baru Anda sekarang mengirimkan muatan yang sesuai dengan skema v2 ke bucket baru Anda. Sebaiknya gunakan dua bucket berbeda untuk memiliki format yang konsisten jika Anda ingin memproses semua data dalam bucket ini. Misalnya, menggunakan layanan lain seperti Athena dan. AWS Glue

catatan

Jika Anda mengirimkan data ke Amazon S3, pelajari cara menyimpan data yang diekspor di Amazon S3 untuk detail tentang cara mengirimkan data Anda ke Amazon S3 dengan skema v2.

catatan

Jika Anda menggunakan fungsi Lambda untuk memproses muatan, pelajari cara memproses data dengan Lambda. Anda juga dapat merujuk ke bagian pembaruan dengan Lambda untuk informasi lebih lanjut.

Memperbarui pemrosesan data dengan Lambda

Memperbarui pemrosesan data dengan Lambda mengharuskan Anda mempertimbangkan bahwa aliran data v2 sekarang berbasis peristiwa. Kode Lambda v1 awal Anda mungkin mirip dengan yang berikut ini:

import base64 def main_handler(event): # Kinesis "data" blob is base64 encoded so decode here: for record in event['Records']: payload = base64.b64decode(record["kinesis"]["data"]) measurement = payload["measurement"] projectDisplayName = payload["projectDisplayName"] # Process the content of the measurement # ...

Karena skema data v1 berada di jalur penghentian, kode Lambda sebelumnya tidak akan berfungsi dengan semua aliran data baru.

Kode contoh Python berikut akan memproses peristiwa dari aliran Kinesis dengan skema data v2. Kode ini menggunakan eventType parameter baru untuk mengarahkan pemrosesan ke handler yang sesuai:

import base64 handlers = { "measurement": measurementEventHandler, "gatewayConnected": gatewayConnectedEventHandler, "gatewayDisconnected": gatewayDisconnectedEventHandler, "sensorConnected": sensorConnectedEventHandler, "sensorDisconnected": sensorDisconnectedEventHandler, } def main_handler(event): # Kinesis "data" blob is base64 encoded so decode here: for record in event['Records']: payload = base64.b64decode(record["kinesis"]["data"]) eventType = payload["eventType"] if eventType not in handler.keys(): log.info("No event handler found for the event type: {event['eventType']}") return # Invoke the appropriate handler based on the event type. eventPayload = payload["eventPayload"] eventHandler = handlers[eventType] eventHandler(eventPayload) def measurementEventHandler(measurementEventPayload): # Handle measurement event projectName = measurementEventPayload["projectName"] # ... def gatewayConnectedEventHandler(gatewayConnectedEventPayload): # Handle gateway connected event # Other event handler functions