「翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。」
Lambda 関数を使用したデータの事前処理
ストリームのデータの形式の変換、変更、強化、フィルタ処理が必要な場合、AWS Lambda 関数を使用してデータを事前処理できます。アプリケーションの SQL コードが実行される前、またはアプリケーションがデータストリームからスキーマを作成する前に、これを行うことができます。
Lambda 関数によるレコードの事前処理は、次のシナリオで役立ちます。
-
他のフォーマット (KPL や GZIP など) から Kinesis Data Analytics が分析できる形式にレコードを変換します。Kinesis Data Analytics は、現在 JSON データ形式または CSV データ形式をサポートしています。
-
集計検出や異常検出などの操作でよりアクセスしやすい形式にデータを拡張します。たとえば、複数のデータ値が文字列にまとめて格納されている場合は、データを別々の列に拡張できます。
-
外挿やエラー修正などの他の AWS サービスによるデータの強化。
-
レコードのフィールドに複雑な文字列変換を適用します。
-
データをクリーンアップするためのデータフィルタリング。
Lambda 関数を使用したレコードの事前処理
Kinesis Data Analytics アプリケーションを作成するときに、[Connect to a Source (ソースに接続)] ページで Lambda 事前処理を有効にします。
Lambda 関数を使用して Kinesis Data Analytics アプリケーションでレコードを事前処理するには
-
AWS マネジメントコンソールにサインインして Kinesis Data Analytics コンソール ( https://console.aws.amazon.com/kinesisanalytics
) を開きます。 -
アプリケーションの [Connect to a Source] (ソースに接続) ページの [AWS Lambda でのレコード事前処理] セクションで [有効化] を選択します。
-
既に作成した Lambda 関数を使用するには、[Lambda function (Lambda 関数)] ドロップダウンリストで関数を選択します。
-
Lambda 事前処理テンプレートの 1 つから新規の Lambda 関数を作成する場合は、ドロップダウンリストからテンプレートを選択します。次に、[View <template name> in Lambda (Lambda で <テンプレート名> を表示)] を選択して関数を編集します。
-
新規の Lambda 関数を作成するには、[Create new (新規作成)] を選択します。Lambda 関数の作成については、https://docs.aws.amazon.com/lambda/latest/dg/getting-started-create-function.html 開発者ガイドの「AWS LambdaHelloWorld Lambda 関数を作成してコンソールを探る」を参照してください。
-
使用する Lambda 関数のバージョンを選択します。最新のバージョンを使用するには、[$LATEST] を選択します。
レコードの事前処理に Lambda 関数を選択または作成すると、アプリケーションの SQL コードがレコードからスキーマを実行したり、アプリケーションがレコードからスキーマを生成したりする前に、レコードが事前処理されます。
Lambda 事前処理アクセス権限
Lambda 事前処理を使用するには、アプリケーションの IAM ロールに次のアクセス許可ポリシーが必要です。
{ "Sid": "UseLambdaFunction", "Effect": "Allow", "Action": [ "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ], "Resource": "<FunctionARN>" }
アクセス権限ポリシーの追加については、「Amazon Kinesis Data Analytics for SQL Applications の認証とアクセスコントロール」を参照してください。
Lambda 事前処理メトリクス
Amazon CloudWatch を使用して、Lambda 呼び出しの数、処理されたバイト数、成功と失敗の数などをモニタリングすることができます。Kinesis Data Analytics Lambda の事前処理で出力される CloudWatch メトリクスについては、「Amazon Kinesis Analytics のメトリクス」を参照してください。
Kinesis Producer Library で AWS Lambda を使用
Kinesis プロデューサーライブラリ (KPL) は、小さなユーザーフォーマットレコードを最大 1 MB のレコードに集約して、Amazon Kinesis Data Streams スループットを有効に利用しています。Java Kinesis Client Library (KCL) は、これらのレコードの集約解除をサポートしています。ただし、ストリームのコンシューマーとして AWS Lambda を使用する場合は、特別なモジュールを使用してレコードを集約解除する必要があります。
必要なプロジェクトコードと手順については、GitHub で AWS Lambda 用の Kinesis プロデューサーライブラリの集約解除モジュール
データ事前処理イベント入力データモデル / レコードレスポンスモデル
レコードを事前処理するには、Lambda 関数が、必要なイベント入力データおよびレコードレスポンスモデルに準拠している必要があります。
イベント入力データモデル
Kinesis Data Analytics は、Kinesis データストリームまたは Kinesis Data Firehose 配信ストリームから継続的にデータを読み取ります。取得したレコードの各バッチが Lambda 関数にどのように渡されたか、サービスが管理しています。関数はレコードのリストを入力として受け取ります。関数内では、リストを繰り返し処理し、ビジネスロジックを適用して、事前処理要件 (データ形式の変換や強化など) を実行します。
事前処理関数への入力モデルは、Kinesis データストリームと Kinesis Data Firehose 配信ストリームのどちらからデータを受け取ったかによってわずかに異なります。
ソースが Kinesis Data Firehose 配信ストリームの場合、イベント入力データモデルは次のようになります。
Kinesis Data Firehose のリクエストデータモデル
フィールド | 説明: | ||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|
invocationId |
Lambda 呼び出し ID (ランダム GUID)。 | ||||||||||||
applicationArn |
Kinesis Data Analytics アプリケーションの Amazon リソースネーム (ARN) | ||||||||||||
streamArn |
配信ストリーム ARN | ||||||||||||
レコード
|
次の例は、Firehose 配信ストリームからの入力を示しています。
{ "invocationId":"00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn":"arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn":"arn:aws:firehose:us-east-1:AAAAAAAAAAAA:deliverystream/lambda-test", "records":[ { "recordId":"49572672223665514422805246926656954630972486059535892482", "data":"aGVsbG8gd29ybGQ=", "kinesisFirehoseRecordMetadata":{ "approximateArrivalTimestamp":1520280173 } } ] }
ソースが Kinesis データストリームの場合、イベント入力データモデルは次のようになります。
Kinesis ストリームのリクエストデータモデル
Field | 説明: | ||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
invocationId |
Lambda 呼び出し ID (ランダム GUID)。 | ||||||||||||||||||
applicationArn |
Kinesis Data Analytics アプリケーションの ARN | ||||||||||||||||||
streamArn |
配信ストリーム ARN | ||||||||||||||||||
レコード
|
次の例は、Kinesis データストリームからの入力を示しています。
{ "invocationId": "00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn": "arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn": "arn:aws:kinesis:us-east-1:AAAAAAAAAAAA:stream/lambda-test", "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "data": "aGVsbG8gd29ybGQ=", "kinesisStreamRecordMetadata":{ "shardId" :"shardId-000000000003", "partitionKey":"7400791606", "sequenceNumber":"49572672223665514422805246926656954630972486059535892482", "approximateArrivalTimestamp":1520280173 } } ] }
レコードレスポンスモデル
Lambda 関数に送信された Lambda 事前処理関数 (レコード ID 付き) から返されたすべてのレコードは返される必要があります。レコードには次のパラメータが含まれている必要があります。含まれていない場合、Kinesis Data Analytics がレコードを拒否し、データ事前処理を失敗と見なします。レコードのデータペイロード部分は、事前処理要件を達成するために変換できます。
レスポンスデータモデル
レコード
|
次の例は、Lambda 関数からの出力を示しています。
{ "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "result": "Ok", "data": "SEVMTE8gV09STEQ=" } ] }
一般的なデータ事前処理の失敗
事前処理が失敗する一般的な理由は次のとおりです。
-
Lambda 関数に送信されるバッチのレコード (レコード ID 付き) の一部が Kinesis Data Analytics サービスに返されていません。
-
レスポンスにレコード ID、ステータス、データペイロードフィールドのいずれかが欠落しています。データペイロードフィールドは、
Dropped
またはProcessingFailed
レコードの場合はオプションです。 -
Lambda 関数のタイムアウトが、データを事前処理するのに十分ではありません。
-
Lambda 関数のレスポンスが、AWS Lambda サービスによって定められたレスポンスの上限を超えています。
データ事前処理の失敗の場合、Kinesis Data Analytics は、成功するまで同じレコードセットで Lambda 呼び出しを再試行し続けます。次の CloudWatch メトリクスを監視して、失敗から情報を得ることができます。
-
Kinesis Data Analytics アプリケーション
MillisBehindLatest
: アプリケーションがストリーミング ソースからどの程度読み取っているかを示します。 -
Kinesis Data Analytics アプリケーション
InputPreprocessing
CloudWatch メトリック: 成功数と失敗数、その他の統計を示します。詳細については、「Amazon Kinesis Analytics Metrics」を参照してください。 -
AWS Lambda 関数の CloudWatch メトリクスおよびログ。