Lambda 関数を使用したデータの事前処理 - Amazon Kinesis Data Analytics for SQL Applications 開発者

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Lambda 関数を使用したデータの事前処理

ストリームのデータの形式の変換、変更、強化、フィルタ処理が必要な場合、AWS Lambda 関数を使用してデータを事前処理できます。アプリケーションの SQL コードが実行される前、またはアプリケーションがデータストリームからスキーマを作成する前に、これを行うことができます。

レコードを事前処理するために Lambda 関数を使用すると、次のシナリオで役立ちます。

  • 他のフォーマット (KPL や GZIP など) から Kinesis Data Analytics が分析できる形式にレコードを変換します。Kinesis Data Analytics は、現時点では JSON データ形式または CSV

  • 集計検出や異常検出などの操作でよりアクセスしやすい形式にデータを拡張します。たとえば、複数のデータ値が文字列にまとめて格納されている場合は、データを別々の列に拡張できます。

  • 外挿やエラー修正などの他の Amazon サービスによるデータの強化。

  • レコードのフィールドに複雑な文字列変換を適用します。

  • データをクリーンアップするためのデータフィルタリング。

レコードを事前処理するための Lambda 関数の使用

Kinesis Data Analytics アプリケーションを作成するときに、ソースConnect するページで.

Lambda 関数を使用して Kinesis Data Analytics アプリケーションでレコードを事前処理するには

  1. にサインインします。AWS Management Consoleにアクセスして、Kinesis Data Analytics コンソール (https://console.aws.amazon.com/kinesisanalytics

  2. リポジトリの []ソースConnect するページで、[Enabled (有効)]()レコードの前処理AWS Lambdaセクションに追加します。

  3. 既に作成した Lambda 関数を使用するには、Lambda 関数選択します。

  4. Lambda 事前処理テンプレートの 1 つから新しい Lambda 関数を作成するには、ドロップダウンリストからテンプレートを選択します。次に、[View <template name> in Lambda (Lambda で <テンプレート名> を表示)] を選択して関数を編集します。

  5. 新しい Lambda 関数を作成するには、[新しいの作成。Lambda 関数の作成については、HelloWorld Lambda 関数を作成してコンソールを探る()AWS Lambda開発者ガイド

  6. 使用する Lambda 関数のバージョンを選択します。最新のバージョンを使用するには、[$LATEST] を選択します。

レコードの事前処理のために Lambda 関数を選択または作成すると、アプリケーションの SQL コードが実行される前にレコードが事前処理されるか、アプリケーションがレコードからスキーマを生成します。

Lambda 事前処理アクセス権限

Lambda 事前処理を使用するには、アプリケーションの IAM ロールに次のアクセス許可ポリシーが必要です。

{ "Sid": "UseLambdaFunction", "Effect": "Allow", "Action": [ "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ], "Resource": "<FunctionARN>" }

アクセス権限ポリシーの追加については、「Amazon Kinesis Data Analytics for SQL Applications の認証とアクセスコントロール」を参照してください。

Lambda 事前処理メトリクス

Amazon CloudWatch を使用して、Lambda 呼び出しの数、処理されたバイト数、成功と失敗の数などをモニタリングすることができます。Kinesis Data Analytics Lambda 事前処理で出力される CloudWatch メトリクスについては、Amazon Kinesis Analytics

を使用するAWS LambdaKinesis プロデューサーライブラリを使用

-Kinesis プロデューサーライブラリ(KPL) は、ユーザーがフォーマットした小さなレコードを最大 1 MB のレコードに集約して、Amazon Kinesis Data Streams スループットの使用を容易にします。Java 用 Kinesis クライアントライブラリ (KCL) は、これらのレコードの集約解除をサポートしています。ただし、ストリームのコンシューマーとして AWS Lambda を使用する場合は、特別なモジュールを使用してレコードを集約解除する必要があります。

必要なプロジェクトコードと手順については、Kinesis プロデューサーライブラリの集約解除モジュールAWS LambdaGitHub で。このプロジェクトのコンポーネントを使用して、AWS Lambda 内のシリアル化された KPL データを Java、Node.js、Python で処理できます。これらのコンポーネントは、複数言語 KCL アプリケーションの一部として使用することもできます。

データ事前処理イベント入力データモデル / レコードレスポンスモデル

レコードを事前処理するために、Lambda 関数は、必要なイベント入力データおよびレコードレスポンスモデルに準拠している必要があります。

イベント入力データモデル

Kinesis Data Analytics は、Kinesis ストリームまたは Kinesis Data Firehose 配信ストリームから継続的にデータを読み取ります。取得するレコードのバッチごとに、各バッチが Lambda 関数にどのように渡されたか、サービスが管理しています。関数はレコードのリストを入力として受け取ります。関数内では、リストを繰り返し処理し、ビジネスロジックを適用して、事前処理要件 (データ形式の変換や強化など) を実行します。

事前処理関数への入力モデルは、データが Kinesis データストリームか Kinesis Data Firehose 配信ストリームのどちらから受信されたかによってわずかに異なります。

ソースが Kinesis Data Firehose 配信ストリームの場合、イベント入力データモデルは次の通りです。

Kinesis Data Firehose のリクエストデータモデル

フィールド 説明
invocationId Lambda 呼び出し ID (ランダム GUID)。
applicationArn Kinesis Data Analytics アプリケーションの Amazon リソースネーム (ARN)
streamArn 配信ストリーム ARN
レコード
フィールド 説明
recordId レコード ID (ランダム GUID)
kinesisFirehoseRecordMetadata
フィールド 説明
approximateArrivalTimestamp 配信ストリームレコードの概算到着時間
data Base64 でエンコードされたソースレコードのペイロード

次の例は、Firehose 配信ストリームからの入力を示しています。

{ "invocationId":"00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn":"arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn":"arn:aws:firehose:us-east-1:AAAAAAAAAAAA:deliverystream/lambda-test", "records":[ { "recordId":"49572672223665514422805246926656954630972486059535892482", "data":"aGVsbG8gd29ybGQ=", "kinesisFirehoseRecordMetadata":{ "approximateArrivalTimestamp":1520280173 } } ] }

ソースが Kinesis データストリームの場合、イベント入力データモデルは次のようになります。

Kinesis ストリームのリクエストデータモデル

フィールド 説明
invocationId Lambda 呼び出し ID (ランダム GUID)。
applicationArn Kinesis Data Analytics アプリケーション ARN
streamArn 配信ストリーム ARN
レコード
フィールド 説明
recordId Kinesis レコードのシーケンス番号に基づいたレコード ID
kinesisStreamRecordMetadata
フィールド 説明
sequenceNumber Kinesis ストリームレコードからのシーケンス番号
partitionKey Kinesis ストリームレコードからのパーティションキー
shardId Kinesis ストリームレコードからの ShardId
approximateArrivalTimestamp 配信ストリームレコードの概算到着時間
データ Base64 でエンコードされたソースレコードのペイロード

次の例は、Kinesis データストリームからの入力を示しています。

{ "invocationId": "00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn": "arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn": "arn:aws:kinesis:us-east-1:AAAAAAAAAAAA:stream/lambda-test", "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "data": "aGVsbG8gd29ybGQ=", "kinesisStreamRecordMetadata":{ "shardId" :"shardId-000000000003", "partitionKey":"7400791606", "sequenceNumber":"49572672223665514422805246926656954630972486059535892482", "approximateArrivalTimestamp":1520280173 } } ] }

レコードレスポンスモデル

Lambda 関数に送信された Lambda 事前処理関数 (レコード ID 付き) から返されたすべてのレコードは返される必要があります。これらには、次のパラメータが含まれている必要があります。そうでない場合は、Kinesis Data Analytics はそれらを拒否し、データ事前処理の失敗として処理します。レコードのデータペイロード部分は、事前処理要件を達成するために変換できます。

レスポンスデータモデル

レコード
フィールド 説明
recordId レコード ID は、呼び出し時に Kinesis Data Analytics から Lambda に渡されます。変換されたレコードには、同じレコード ID が含まれる必要があります。元のレコードの ID と変換されたレコードの ID との不一致は、データ事前処理の失敗として扱われます。
result レコードのデータ変換のステータス。指定できる値は以下のとおりです。
  • Ok: レコードが正常に変換されました。Kinesis Data Analytics は SQL 処理のレコードを取り込みます。

  • Dropped: レコードは処理ロジックによって意図的に削除されました。Kinesis Data Analytics は SQL 処理からレコードを削除します。データペイロードフィールドは、Dropped レコードではオプションです。

  • ProcessingFailed: レコードを変換できませんでした。Kinesis Data Analytics は、Lambda 関数の処理が失敗したとみなし、エラーストリームにエラーを書き込みます。エラーストリームの詳細については、「エラー処理」を参照してください。データペイロードフィールドは、ProcessingFailed レコードではオプションです。

data base64 エンコード後の変換されたデータペイロード。アプリケーションの取り込みデータ形式が JSON である場合、各データペイロードには複数の JSON ドキュメントを含めることができます。または、アプリケーションの取り込みデータ形式が CSV である場合、それぞれに複数の CSV 行を含めることができます (各行には行の区切り文字が入ります)。Kinesis Data Analytics サービスは、同じデータペイロード内の複数の JSON ドキュメントまたは CSV 行のいずれかを使用して、データを正常に解析して処理します。

次の例は、Lambda 関数からの出力を示しています。

{ "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "result": "Ok", "data": "SEVMTE8gV09STEQ=" } ] }

一般的なデータ事前処理の失敗

事前処理が失敗する一般的な理由は次のとおりです。

  • Lambda 関数に送信されたバッチ内のすべてのレコード (レコード ID 付き) が Kinesis Data Analytics サービスに返されるわけではありません。

  • レスポンスにレコード ID、ステータス、データペイロードフィールドのいずれかが欠落しています。データペイロードフィールドは、Dropped または ProcessingFailed レコードの場合はオプションです。

  • Lambda 関数のタイムアウトが、データを事前処理するのに十分ではありません。

  • Lambda 関数のレスポンスが、AWS Lambdaサービス。

データ事前処理の失敗の場合、Kinesis Data Analytics は、成功するまで同じレコードセットで Lambda 呼び出しを再試行し続けます。次の CloudWatch メトリクスを監視して、失敗から情報を得ることができます。

  • Kinesis Data Analytics アプリケーションMillisBehindLatest: アプリケーションの読み取りがストリーミングソースからどれだけ離れているかを示します。

  • Kinesis Data Analytics アプリケーションInputPreprocessingCloudWatch メトリクス: 特に成功と失敗の数を示します。詳細については、「Amazon Kinesis Analytics Metrics」を参照してください。

  • AWS Lambda関数 CloudWatch メトリクスとログ。