翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Lambda 関数を使用したデータの事前処理
ストリームのデータの形式の変換、変更、強化、フィルタ処理が必要な場合、AWS Lambda 関数を使用してデータを事前処理できます。アプリケーションの SQL コードが実行される前、またはアプリケーションがデータストリームからスキーマを作成する前に、これを行うことができます。
レコードを事前処理するために Lambda 関数を使用すると、次のシナリオで役立ちます。
-
他のフォーマット (KPL や GZIP など) から Kinesis Data Analytics が分析できる形式にレコードを変換します。Kinesis Data Analytics は現在 JSON データ形式または CSV データ形式をサポートしています
-
集計検出や異常検出などの操作でよりアクセスしやすい形式にデータを拡張します。たとえば、複数のデータ値が文字列にまとめて格納されている場合は、データを別々の列に拡張できます。
-
外挿やエラー修正などの他の Amazon サービスによるデータの強化。
-
レコードのフィールドに複雑な文字列変換を適用します。
-
データをクリーンアップするためのデータフィルタリング。
レコードを事前処理するための Lambda 関数の使用
Kinesis Data Analytics アプリケーションを作成するときは、ソースConnect するページで.
Lambda 関数を使用して Kinesis Data Analytics アプリケーションでレコードを前処理するには
AWS Management Console にサインインし、Kinesis Data Analytics コンソール (https://console.aws.amazon.com/kinesisanalytics
) を開きます。 -
リポジトリの []ソースConnect するアプリケーションのページで、[Enabled (有効)]ので事前処理を記録するAWS Lambdaセクションに追加します。
-
既に作成した Lambda 関数を使用するには、Lambda 関数選択します。
-
Lambda 事前処理テンプレートの 1 つから新しい Lambda 関数を作成するには、ドロップダウンリストからテンプレートを選択します。次に、[View <template name> in Lambda (Lambda で <テンプレート名> を表示)] を選択して関数を編集します。
-
新しい Lambda 関数を作成するには、新規作成。Lambda 関数の作成については、「」を参照してください。HelloWorld Lambda 関数を作成してコンソールを探るのAWS Lambdaデベロッパーガイド。
-
使用する Lambda 関数のバージョンを選択します。最新のバージョンを使用するには、[$LATEST] を選択します。
レコード事前処理のために Lambda 関数を選択または作成すると、アプリケーションの SQL コードが実行される前にレコードが事前処理されるか、アプリケーションがレコードからスキーマを生成します。
Lambda 事前処理アクセス権限
Lambda 事前処理を使用するには、アプリケーションの IAM ロールに次のアクセス許可ポリシーが必要です。
{ "Sid": "UseLambdaFunction", "Effect": "Allow", "Action": [ "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ], "Resource": "<FunctionARN>" }
アクセス権限ポリシーの追加については、「Amazon Kinesis Data Analytics の認証とアクセスコントロール 」を参照してください。
Lambda 事前処理メトリクス
Amazon CloudWatch を使用して、Lambda 呼び出しの数、処理されたバイト数、成功と失敗の数などをモニタリングすることができます。Kinesis Data Analytics Lambda 事前処理によって出力される CloudWatch メトリクスについては、「」を参照してください。Amazon Kinesis Analytics メトリクス。
を使用するAWS LambdaKinesis Producer Library
-Kinesis プロデューサーライブラリ(KPL) は、ユーザーがフォーマットした小さなレコードを最大 1 MB のレコードに集約して、Amazon Kinesis Data Streams スループットの使用を容易にします。Java 用の Kinesis クライアントライブラリ (KCL) では、これらのレコードの集約解除がサポートされています。ただし、ストリームのコンシューマーとして AWS Lambda を使用する場合は、特別なモジュールを使用してレコードを集約解除する必要があります。
必要なプロジェクトコードと手順については、Kinesis プロデューサーライブラリデアグリゲーションモジュールAWS Lambda
データ事前処理イベント入力データモデル / レコードレスポンスモデル
レコードを事前処理するために、Lambda 関数は、必要なイベント入力データおよびレコードレスポンスモデルに準拠している必要があります。
イベント入力データモデル
Kinesis Data Analytics は、Kinesis データストリームまたは Kinesis Data Firehose 配信ストリームから継続的にデータを読み取ります。取得したレコードの各バッチが Lambda 関数にどのように渡されたか、サービスが管理しています。関数はレコードのリストを入力として受け取ります。関数内では、リストを繰り返し処理し、ビジネスロジックを適用して、事前処理要件 (データ形式の変換や強化など) を実行します。
事前処理機能への入力モデルは、データが Kinesis データストリームか Kinesis Data Firehose 配信ストリームのどちらから受信されたかによってわずかに異なります。
ソースが Kinesis Data Firehose 配信ストリームの場合、イベント入力データモデルは次の通りです。
Kinesis Data Firehose のリクエストデータモデル
フィールド | 説明 | ||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
invocationId |
Lambda 呼び出し ID (ランダム GUID)。 | ||||||||||||
applicationArn |
Kinesis Data Analytics アプリケーションの Amazon リソースネーム (ARN) | ||||||||||||
streamArn |
配信ストリーム ARN | ||||||||||||
レコード
|
次の例は、Firehose 配信ストリームからの入力を示しています。
{ "invocationId":"00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn":"arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn":"arn:aws:firehose:us-east-1:AAAAAAAAAAAA:deliverystream/lambda-test", "records":[ { "recordId":"49572672223665514422805246926656954630972486059535892482", "data":"aGVsbG8gd29ybGQ=", "kinesisFirehoseRecordMetadata":{ "approximateArrivalTimestamp":1520280173 } } ] }
ソースが Kinesis データストリームの場合、イベント入力データモデルは次の通りです。
Kinesis ストリームのリクエストデータモデル
フィールド | 説明 | ||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
invocationId |
Lambda 呼び出し ID (ランダム GUID)。 | ||||||||||||||||||
applicationArn |
Kinesis Data Analytics アプリケーション ARN | ||||||||||||||||||
streamArn |
配信ストリーム ARN | ||||||||||||||||||
レコード
|
次の例は、Kinesis データストリームからの入力を示しています。
{ "invocationId": "00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn": "arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn": "arn:aws:kinesis:us-east-1:AAAAAAAAAAAA:stream/lambda-test", "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "data": "aGVsbG8gd29ybGQ=", "kinesisStreamRecordMetadata":{ "shardId" :"shardId-000000000003", "partitionKey":"7400791606", "sequenceNumber":"49572672223665514422805246926656954630972486059535892482", "approximateArrivalTimestamp":1520280173 } } ] }
レコードレスポンスモデル
Lambda 関数に送信された Lambda 事前処理関数 (レコード ID 付き) から返されたすべてのレコードは返される必要があります。これらには、次のパラメータが含まれている必要があります。含まれない場合、Kinesis Data Analytics はそれらのパラメータを拒否し、データ事前処理の失敗として処理します。レコードのデータペイロード部分は、事前処理要件を達成するために変換できます。
レスポンスデータモデル
レコード
|
次の例は、Lambda 関数からの出力を示しています。
{ "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "result": "Ok", "data": "SEVMTE8gV09STEQ=" } ] }
一般的なデータ事前処理の失敗
事前処理が失敗する一般的な理由は次のとおりです。
-
Lambda 関数に送信されたバッチ内のすべてのレコード (レコード ID 付き) が Kinesis Data Analytics サービスに返されるわけではありません。
-
レスポンスにレコード ID、ステータス、データペイロードフィールドのいずれかが欠落しています。データペイロードフィールドは、
Dropped
またはProcessingFailed
レコードの場合はオプションです。 -
Lambda 関数のタイムアウトが、データを事前処理するのに十分ではありません。
-
Lambda 関数のレスポンスは、AWS Lambdaサービスサービス。
データ事前処理の失敗の場合、Kinesis Data Analytics は成功するまで同じレコードセットで Lambda 呼び出しを再試行し続けます。次の CloudWatch メトリクスを監視して、失敗の洞察を得ることができます。
-
Kinesis Data Analytics アプリケーション
MillisBehindLatest
: アプリケーションの読み取りがストリーミングソースからどれだけ離れているかを示します。 -
Kinesis Data Analytics アプリケーション
InputPreprocessing
CloudWatch メトリクス: 特に成功と失敗の数を示します。詳細については、「Amazon Kinesis Analytics Metrics」を参照してください。 -
AWS Lambda関数の CloudWatch メトリクスおよびログ