Kinesis v1 から v2 への移行 - Amazon Monitron

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Kinesis v1 から v2 への移行

現在 v1 データスキーマを使用している場合は、すでに Amazon S3 へデータを送信しているか、Lambda を使ってデータストリームのペイロードの処理を進めているかもしれません。

データスキーマを v2 に更新する

すでに v1 スキーマでデータストリームを設定している場合は、次の手順でデータエクスポートプロセスを更新できます。

  1. Amazon Monitron コンソールを開きます。

  2. プロジェクトに移動します。

  3. 現在のライブデータのエクスポートを停止します。

  4. ライブデータエクスポートを開始して、新しいデータストリームを作成します。

  5. 新しく作成したデータストリームを選択します。

  6. [ライブデータエクスポートを開始] を選択します。この時点で、新しいスキーマによってペイロードがデータストリーム経由で送信されます。

  7. (任意) Kinesis コンソールに移動し、古いデータストリームを削除します。

  8. v2 スキーマを使用して、新しく作成したデータストリーム用の新しい配信方法を設定します。

これで、新しいストリームで v2 スキーマに準拠したペイロードを新しいバケットに配信できます。バケット内のすべてのデータを処理したい場合に備えて、形式を統一するために 2 つの異なるバケットを使用することをおすすめします。例えば、Athena や AWS Glue など、他のサービスを使用します。

注記

データを Amazon S3 に配信していた場合、v2 スキーマを使用して Amazon S3 にデータを配信する方法の詳細については、「エクスポートしたデータを Amazon S3 に保存する」で手順をご確認ください。

注記

Lambda 関数を使用してペイロードを処理していた場合は、「Lambda でデータを処理する」で手順をご確認ください。Lambda を使って更新するセクションでは、より詳細な情報が記載されていますので、合わせて参照してください。

Lambda を使ってデータ処理を更新する

Lambda でデータ処理を更新するためには、v2 データストリームがイベントベースになったことを考慮する必要があります。最初の v1 Lambda コードは次のようなコードになっているかもしれません。

import base64 def main_handler(event): # Kinesis "data" blob is base64 encoded so decode here: for record in event['Records']: payload = base64.b64decode(record["kinesis"]["data"]) measurement = payload["measurement"] projectDisplayName = payload["projectDisplayName"] # Process the content of the measurement # ...

v1 データスキーマは廃止予定であるため、以前の Lambda コードはすべての新しいデータストリームで機能するわけではありません。

以下の Python サンプルコードでは、データスキーマ v2 を使用して Kinesis ストリームからのイベントを処理します。このコードは、新しい eventType パラメーターを使用して、処理を適切なハンドラーに向けます。

import base64 handlers = { "measurement": measurementEventHandler, "gatewayConnected": gatewayConnectedEventHandler, "gatewayDisconnected": gatewayDisconnectedEventHandler, "sensorConnected": sensorConnectedEventHandler, "sensorDisconnected": sensorDisconnectedEventHandler, } def main_handler(event): # Kinesis "data" blob is base64 encoded so decode here: for record in event['Records']: payload = base64.b64decode(record["kinesis"]["data"]) eventType = payload["eventType"] if eventType not in handler.keys(): log.info("No event handler found for the event type: {event['eventType']}") return # Invoke the appropriate handler based on the event type. eventPayload = payload["eventPayload"] eventHandler = handlers[eventType] eventHandler(eventPayload) def measurementEventHandler(measurementEventPayload): # Handle measurement event projectName = measurementEventPayload["projectName"] # ... def gatewayConnectedEventHandler(gatewayConnectedEventPayload): # Handle gateway connected event # Other event handler functions