Lambda 関数を使用したデータの事前処理 - Amazon Kinesis Data Analytics for SQL Applications 開発者ガイド

新規プロジェクトでは、Kinesis Data Analytics for SQL よりも 新しい Managed Service for Apache Flink Studio を使用することをお勧めします。Managed Service for Apache Flink Studio は、使いやすさと高度な分析機能を兼ね備えているため、高度なストリーム処理アプリケーションを数分で構築できます。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Lambda 関数を使用したデータの事前処理

注記

2023 年 9 月 12 日以降、SQL 用 Kinesis Data Analytics をまだ使用していない場合、Kinesis Data Firehose をソースとして使用して新しいアプリケーションを作成することはできません。詳細については、「制限」を参照してください。

ストリーム内のデータに形式の変換、変換、エンリッチメント、またはフィルタリングが必要な場合は、 AWS Lambda 関数を使用してデータを前処理できます。アプリケーションの SQL コードが実行される前、またはアプリケーションがデータストリームからスキーマを作成する前に、これを行うことができます。

Lambda 関数によるレコードの事前処理は、次のシナリオで役立ちます。

  • 他のフォーマット (KPL や GZIP など) から Kinesis Data Analytics が分析できる形式にレコードを変換します。Kinesis Data Analytics は、現在 JSON データ形式または CSV データ形式をサポートしています。

  • 集計検出や異常検出などの操作でよりアクセスしやすい形式にデータを拡張します。たとえば、複数のデータ値が文字列にまとめて格納されている場合は、データを別々の列に拡張できます。

  • 外挿やエラー修正などの他の Amazon サービスによるデータの強化。

  • レコードのフィールドに複雑な文字列変換を適用します。

  • データをクリーンアップするためのデータフィルタリング。

レコードを事前処理するための Lambda 関数の使用

Kinesis Data Analytics アプリケーションを作成するときは、[ソースに接続] ページで Lambda 事前処理を有効にします。

Lambda 関数を使用して Kinesis Data Analytics アプリケーションでレコードを事前処理するには
  1. にサインイン AWS Management Console し、https://console.aws.amazon.com/kinesisanalytics で Managed Service for Apache Flink コンソールを開きます。

  2. アプリケーションの [ソースに接続] ページの [レコード事前処理] セクションで [有効化 AWS Lambda] を選択します。

  3. 既に作成した Lambda 関数を使用するには、[Lambda 関数] ドロップダウンリストで関数を選択します。

  4. Lambda 事前処理テンプレートの 1 つから新規の Lambda 関数を作成する場合は、ドロップダウンリストからテンプレートを選択します。次に、[View <template name> in Lambda (Lambda で <テンプレート名> を表示)] を選択して関数を編集します。

  5. 新しい Lambda 関数を作成するには、[新規作成] を選択します。Lambda 関数の作成の詳細については、「 AWS Lambda デベロッパーガイド」の HelloWorld 「Lambda 関数の作成」と「コンソールの検索」を参照してください。

  6. 使用する Lambda 関数のバージョンを選択します。最新のバージョンを使用するには、[$LATEST] を選択します。

レコードの事前処理に Lambda 関数を選択または作成すると、アプリケーションの SQL コードがレコードからスキーマを実行したり、アプリケーションがレコードからスキーマを生成したりする前に、レコードが事前処理されます。

Lambda 事前処理アクセス権限

Lambda 事前処理を使用するには、アプリケーションの IAM ロールに次のアクセス許可ポリシーが必要です。

{ "Sid": "UseLambdaFunction", "Effect": "Allow", "Action": [ "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ], "Resource": "<FunctionARN>" }

Lambda 事前処理メトリクス

Amazon を使用して、Lambda 呼び出し、処理されたバイト数、成功と失敗の数などを CloudWatch モニタリングできます。Kinesis Data Analytics Lambda の前処理によって出力される CloudWatch メトリクスの詳細については、「Amazon Kinesis Analytics Metrics」を参照してください。

Kinesis Producer Library AWS Lambda で を使用する

Kinesis Producer Library (KPL) は、小さなユーザーフォーマットレコードを最大 1 MB のレコードに集約して、Amazon Kinesis Data Streams スループットを有効に利用できます。Kinesis Client Library (KCL) for Java は、これらのレコードの集約解除をサポートしています。ただし、ストリームのコンシューマー AWS Lambda として を使用する場合は、特別なモジュールを使用してレコードの集約を解除する必要があります。

必要なプロジェクトコードと手順については、 の「 の Kinesis プロデューサーライブラリの集約解除モジュール AWS Lambda」を参照してください GitHub。このプロジェクトのコンポーネントを使用して、Java、Node.js、Python AWS Lambda の 内で KPL シリアル化されたデータを処理できます。これらのコンポーネントは、複数言語 KCL アプリケーションの一部として使用することもできます。

データ事前処理イベント入力データモデル / レコードレスポンスモデル

レコードを事前処理するには、Lambda 関数が、必要なイベント入力データおよびレコードレスポンスモデルに準拠している必要があります。

イベント入力データモデル

Kinesis Data Analytics は、Kinesis データストリームまたは Firehose 配信ストリームから継続的にデータを読み取ります。取得したレコードの各バッチが Lambda 関数にどのように渡されたか、サービスが管理しています。関数はレコードのリストを入力として受け取ります。関数内では、リストを繰り返し処理し、ビジネスロジックを適用して、事前処理要件 (データ形式の変換や強化など) を実行します。

前処理機能への入力モデルは、データが Kinesis データストリームから受信されたか、Firehose 配信ストリームから受信されたかによって若干異なります。

ソースが Firehose 配信ストリームの場合、イベント入力データモデルは次のとおりです。

Kinesis Data Firehose のリクエストデータモデル

フィールド 説明
invocationId Lambda 呼び出し ID (ランダム GUID)。
applicationArn Kinesis Data Analytics アプリケーションの Amazon リソースネーム (ARN)。
streamArn 配信ストリーム ARN
レコード
フィールド 説明
recordId レコード ID (ランダム GUID)
kinesisFirehoseRecordMetadata
フィールド 説明
approximateArrivalTimestamp 配信ストリームレコードの概算到着時間
data Base64 でエンコードされたソースレコードのペイロード

次の例は、Firehose 配信ストリームからの入力を示しています。

{ "invocationId":"00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn":"arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn":"arn:aws:firehose:us-east-1:AAAAAAAAAAAA:deliverystream/lambda-test", "records":[ { "recordId":"49572672223665514422805246926656954630972486059535892482", "data":"aGVsbG8gd29ybGQ=", "kinesisFirehoseRecordMetadata":{ "approximateArrivalTimestamp":1520280173 } } ] }

ソースが Kinesis データストリームの場合、イベント入力データモデルは次の通りです。

Kinesis ストリームのリクエストデータモデル

フィールド 説明
invocationId Lambda 呼び出し ID (ランダム GUID)。
applicationArn Kinesis Data Analytics アプリケーション ARN
streamArn 配信ストリーム ARN
レコード
フィールド 説明
recordId Kinesis レコードのシーケンス番号に基づいたレコード ID
kinesisStreamRecordMetadata
フィールド 説明
sequenceNumber Kinesis ストリームレコードからのシーケンス番号
partitionKey Kinesis ストリームレコードからのパーティションキー
shardId Kinesis ストリームレコードからの ShardId
approximateArrivalTimestamp 配信ストリームレコードの概算到着時間
データ Base64 でエンコードされたソースレコードのペイロード

次の例は、Kinesis データストリームからの入力を示しています。

{ "invocationId": "00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn": "arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn": "arn:aws:kinesis:us-east-1:AAAAAAAAAAAA:stream/lambda-test", "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "data": "aGVsbG8gd29ybGQ=", "kinesisStreamRecordMetadata":{ "shardId" :"shardId-000000000003", "partitionKey":"7400791606", "sequenceNumber":"49572672223665514422805246926656954630972486059535892482", "approximateArrivalTimestamp":1520280173 } } ] }

レコードレスポンスモデル

Lambda 関数に送信された Lambda 事前処理関数 (レコード ID 付き) から返されたすべてのレコードは返される必要があります。レコードには次のパラメータが含まれている必要があります。含まれていない場合、Kinesis Data Analytics がレコードを拒否し、データ事前処理を失敗と見なします。レコードのデータペイロード部分は、事前処理要件を達成するために変換できます。

レスポンスデータモデル

レコード
フィールド 説明
recordId レコード ID は呼び出し時に Kinesis Data Analytics から Lambda に渡されます。変換されたレコードには、同じレコード ID が含まれる必要があります。元のレコードの ID と変換されたレコードの ID との不一致は、データ事前処理の失敗として扱われます。
result レコードのデータ変換のステータス。指定できる値は以下のとおりです。
  • Ok: レコードが正常に変換されました。Kinesis Data Analytics は SQL 処理のレコードを取り込みます。

  • Dropped: レコードは処理ロジックによって意図的に削除されました。Kinesis Data Analytics は SQL 処理のレコードを削除します。データペイロードフィールドは、Dropped レコードではオプションです。

  • ProcessingFailed: レコードを変換できませんでした。Kinesis Data Analytics は、Lambda 関数の処理が失敗したとみなし、エラーストリームにエラーを書き込みます。エラーストリームの詳細については、「エラー処理」を参照してください。データペイロードフィールドは、ProcessingFailed レコードではオプションです。

data base64 エンコード後の変換されたデータペイロード。アプリケーションの取り込みデータ形式が JSON である場合、各データペイロードには複数の JSON ドキュメントを含めることができます。または、アプリケーションの取り込みデータ形式が CSV である場合、それぞれに複数の CSV 行を含めることができます (各行には行の区切り文字が入ります)。Kinesis Data Analyticsサービスは、同じデータペイロード内の複数の JSON ドキュメントまたは CSV 行のいずれかを使用して、データを正常に解析して処理します。

次の例は、Lambda 関数からの出力を示しています。

{ "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "result": "Ok", "data": "SEVMTE8gV09STEQ=" } ] }

一般的なデータ事前処理の失敗

事前処理が失敗する一般的な理由は次のとおりです。

  • Lambda 関数に送信されるバッチのレコード (レコード ID 付き) の一部が Kinesis Data Analytics サービスに返されていません。

  • レスポンスにレコード ID、ステータス、データペイロードフィールドのいずれかが欠落しています。データペイロードフィールドは、Dropped または ProcessingFailed レコードの場合はオプションです。

  • Lambda 関数のタイムアウトが、データを事前処理するのに十分ではありません。

  • Lambda 関数のレスポンスが、 AWS Lambda サービスによって定められたレスポンスの上限を超えています。

データの事前処理が失敗した場合、Kinesis Data Analytics は、成功するまで同じレコードセットで Lambda 呼び出しを再試行し続けます。次の CloudWatch メトリクスをモニタリングして、障害に関するインサイトを得ることができます。

  • Kinesis Data Analytics アプリケーション (MillisBehindLatest): アプリケーションの読み取りがストリーミングソースからどれだけ離れているかを示します。

  • Kinesis Data Analytics アプリケーションInputPreprocessing CloudWatch メトリクス: 成功と失敗の数、およびその他の統計を示します。詳細については、「Amazon Kinesis Analytics Metrics」を参照してください。

  • AWS Lambda 関数の CloudWatch メトリクスとログ。