Lambda 関数を使用したデータの事前処理 - Amazon Kinesis Data Analytics for SQL Applications 開発者ガイド

「翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。」

Lambda 関数を使用したデータの事前処理

ストリームのデータの形式の変換、変更、強化、フィルタ処理が必要な場合、AWS Lambda 関数を使用してデータを事前処理できます。アプリケーションの SQL コードが実行される前、またはアプリケーションがデータストリームからスキーマを作成する前に、これを行うことができます。

Lambda 関数によるレコードの事前処理は、次のシナリオで役立ちます。

  • 他のフォーマット (KPL や GZIP など) から Kinesis Data Analytics が分析できる形式にレコードを変換します。Kinesis Data Analytics は、現在 JSON データ形式または CSV データ形式をサポートしています。

  • 集計検出や異常検出などの操作でよりアクセスしやすい形式にデータを拡張します。たとえば、複数のデータ値が文字列にまとめて格納されている場合は、データを別々の列に拡張できます。

  • 外挿やエラー修正などの他の AWS サービスによるデータの強化。

  • レコードのフィールドに複雑な文字列変換を適用します。

  • データをクリーンアップするためのデータフィルタリング。

Lambda 関数を使用したレコードの事前処理

Kinesis Data Analytics アプリケーションを作成するときに、[Connect to a Source (ソースに接続)] ページで Lambda 事前処理を有効にします。

Lambda 関数を使用して Kinesis Data Analytics アプリケーションでレコードを事前処理するには

  1. AWS マネジメントコンソールにサインインして Kinesis Data Analytics コンソール ( https://console.aws.amazon.com/kinesisanalytics) を開きます。

  2. アプリケーションの [Connect to a Source] (ソースに接続) ページの [AWS Lambda でのレコード事前処理] セクションで [有効化] を選択します。

  3. 既に作成した Lambda 関数を使用するには、[Lambda function (Lambda 関数)] ドロップダウンリストで関数を選択します。

  4. Lambda 事前処理テンプレートの 1 つから新規の Lambda 関数を作成する場合は、ドロップダウンリストからテンプレートを選択します。次に、[View <template name> in Lambda (Lambda で <テンプレート名> を表示)] を選択して関数を編集します。

  5. 新規の Lambda 関数を作成するには、[Create new (新規作成)] を選択します。Lambda 関数の作成については、https://docs.aws.amazon.com/lambda/latest/dg/getting-started-create-function.html 開発者ガイドの「AWS LambdaHelloWorld Lambda 関数を作成してコンソールを探る」を参照してください。

  6. 使用する Lambda 関数のバージョンを選択します。最新のバージョンを使用するには、[$LATEST] を選択します。

レコードの事前処理に Lambda 関数を選択または作成すると、アプリケーションの SQL コードがレコードからスキーマを実行したり、アプリケーションがレコードからスキーマを生成したりする前に、レコードが事前処理されます。

Lambda 事前処理アクセス権限

Lambda 事前処理を使用するには、アプリケーションの IAM ロールに次のアクセス許可ポリシーが必要です。

{ "Sid": "UseLambdaFunction", "Effect": "Allow", "Action": [ "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ], "Resource": "<FunctionARN>" }

アクセス権限ポリシーの追加については、「Amazon Kinesis Data Analytics for SQL Applications の認証とアクセスコントロール」を参照してください。

Lambda 事前処理メトリクス

Amazon CloudWatch を使用して、Lambda 呼び出しの数、処理されたバイト数、成功と失敗の数などをモニタリングすることができます。Kinesis Data Analytics Lambda の事前処理で出力される CloudWatch メトリクスについては、「Amazon Kinesis Analytics のメトリクス」を参照してください。

Kinesis Producer Library で AWS Lambda を使用

Kinesis プロデューサーライブラリ (KPL) は、小さなユーザーフォーマットレコードを最大 1 MB のレコードに集約して、Amazon Kinesis Data Streams スループットを有効に利用しています。Java Kinesis Client Library (KCL) は、これらのレコードの集約解除をサポートしています。ただし、ストリームのコンシューマーとして AWS Lambda を使用する場合は、特別なモジュールを使用してレコードを集約解除する必要があります。

必要なプロジェクトコードと手順については、GitHub で AWS Lambda 用の Kinesis プロデューサーライブラリの集約解除モジュールについて参照してください。このプロジェクトのコンポーネントを使用して、AWS Lambda 内のシリアル化された KPL データを Java、Node.js、Python で処理できます。これらのコンポーネントは、複数言語 KCL アプリケーションの一部として使用することもできます。

データ事前処理イベント入力データモデル / レコードレスポンスモデル

レコードを事前処理するには、Lambda 関数が、必要なイベント入力データおよびレコードレスポンスモデルに準拠している必要があります。

イベント入力データモデル

Kinesis Data Analytics は、Kinesis データストリームまたは Kinesis Data Firehose 配信ストリームから継続的にデータを読み取ります。取得したレコードの各バッチが Lambda 関数にどのように渡されたか、サービスが管理しています。関数はレコードのリストを入力として受け取ります。関数内では、リストを繰り返し処理し、ビジネスロジックを適用して、事前処理要件 (データ形式の変換や強化など) を実行します。

事前処理関数への入力モデルは、Kinesis データストリームと Kinesis Data Firehose 配信ストリームのどちらからデータを受け取ったかによってわずかに異なります。

ソースが Kinesis Data Firehose 配信ストリームの場合、イベント入力データモデルは次のようになります。

Kinesis Data Firehose のリクエストデータモデル

フィールド 説明:
invocationId Lambda 呼び出し ID (ランダム GUID)。
applicationArn Kinesis Data Analytics アプリケーションの Amazon リソースネーム (ARN)
streamArn 配信ストリーム ARN
レコード
Field 説明:
recordId レコード ID (ランダム GUID)
kinesisFirehoseRecordMetadata
Field 説明:
approximateArrivalTimestamp 配信ストリームレコードの概算到着時間
data Base64 でエンコードされたソースレコードのペイロード

次の例は、Firehose 配信ストリームからの入力を示しています。

{ "invocationId":"00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn":"arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn":"arn:aws:firehose:us-east-1:AAAAAAAAAAAA:deliverystream/lambda-test", "records":[ { "recordId":"49572672223665514422805246926656954630972486059535892482", "data":"aGVsbG8gd29ybGQ=", "kinesisFirehoseRecordMetadata":{ "approximateArrivalTimestamp":1520280173 } } ] }

ソースが Kinesis データストリームの場合、イベント入力データモデルは次のようになります。

Kinesis ストリームのリクエストデータモデル

Field 説明:
invocationId Lambda 呼び出し ID (ランダム GUID)。
applicationArn Kinesis Data Analytics アプリケーションの ARN
streamArn 配信ストリーム ARN
レコード
Field 説明:
recordId Kinesis レコードのシーケンス番号に基づいたレコード ID
kinesisStreamRecordMetadata
Field 説明:
sequenceNumber Kinesis ストリームレコードからのシーケンス番号
partitionKey Kinesis ストリームレコードからのパーティションキー
shardId Kinesis ストリームレコードからの ShardId
approximateArrivalTimestamp 配信ストリームレコードの概算到着時間
データ Base64 でエンコードされたソースレコードのペイロード

次の例は、Kinesis データストリームからの入力を示しています。

{ "invocationId": "00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn": "arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn": "arn:aws:kinesis:us-east-1:AAAAAAAAAAAA:stream/lambda-test", "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "data": "aGVsbG8gd29ybGQ=", "kinesisStreamRecordMetadata":{ "shardId" :"shardId-000000000003", "partitionKey":"7400791606", "sequenceNumber":"49572672223665514422805246926656954630972486059535892482", "approximateArrivalTimestamp":1520280173 } } ] }

レコードレスポンスモデル

Lambda 関数に送信された Lambda 事前処理関数 (レコード ID 付き) から返されたすべてのレコードは返される必要があります。レコードには次のパラメータが含まれている必要があります。含まれていない場合、Kinesis Data Analytics がレコードを拒否し、データ事前処理を失敗と見なします。レコードのデータペイロード部分は、事前処理要件を達成するために変換できます。

レスポンスデータモデル

レコード
Field 説明:
recordId レコード ID は呼び出し時に Kinesis Data Analytics から Lambda に渡されます。変換されたレコードには、同じレコード ID が含まれる必要があります。元のレコードの ID と変換されたレコードの ID との不一致は、データ事前処理の失敗として扱われます。
result レコードのデータ変換のステータス。指定できる値は以下のとおりです。
  • Ok: レコードは正常に変換されました。 Kinesis Data Analytics は、SQL処理のためにレコードをインジェストします。

  • Dropped: レコードは、処理ロジックによって意図的にドロップされました。 Kinesis Data Analytics は、SQL処理からレコードをドロップします。データペイロードフィールドは、Dropped レコードではオプションです。

  • ProcessingFailed: レコードを変換できませんでした。 Kinesis Data Analytics は、Lambda関数によって処理に失敗したとみなし、エラーストリームにエラーを書き込みます。エラーストリームの詳細については、「エラー処理」を参照してください。データペイロードフィールドは、ProcessingFailed レコードではオプションです。

data base64 エンコード後の変換されたデータペイロード。アプリケーションの取り込みデータ形式が JSON である場合、各データペイロードには複数の JSON ドキュメントを含めることができます。または、アプリケーションの取り込みデータ形式が CSV である場合、それぞれに複数の CSV 行を含めることができます (各行には行の区切り文字が入ります)。Kinesis Data Analytics サービスは、同じデータペイロード内の複数の JSON ドキュメントまたは CSV 行のいずれかを使用して、データを正常に解析して処理します。

次の例は、Lambda 関数からの出力を示しています。

{ "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "result": "Ok", "data": "SEVMTE8gV09STEQ=" } ] }

一般的なデータ事前処理の失敗

事前処理が失敗する一般的な理由は次のとおりです。

  • Lambda 関数に送信されるバッチのレコード (レコード ID 付き) の一部が Kinesis Data Analytics サービスに返されていません。

  • レスポンスにレコード ID、ステータス、データペイロードフィールドのいずれかが欠落しています。データペイロードフィールドは、Dropped または ProcessingFailed レコードの場合はオプションです。

  • Lambda 関数のタイムアウトが、データを事前処理するのに十分ではありません。

  • Lambda 関数のレスポンスが、AWS Lambda サービスによって定められたレスポンスの上限を超えています。

データ事前処理の失敗の場合、Kinesis Data Analytics は、成功するまで同じレコードセットで Lambda 呼び出しを再試行し続けます。次の CloudWatch メトリクスを監視して、失敗から情報を得ることができます。

  • Kinesis Data Analytics アプリケーション MillisBehindLatest: アプリケーションがストリーミング ソースからどの程度読み取っているかを示します。

  • Kinesis Data Analytics アプリケーション InputPreprocessing CloudWatch メトリック: 成功数と失敗数、その他の統計を示します。詳細については、「Amazon Kinesis Analytics Metrics」を参照してください。

  • AWS Lambda 関数の CloudWatch メトリクスおよびログ。