出力としての Lambda 関数の使用 - Amazon Kinesis Data Analytics for SQL Applications 開発者ガイド

新規プロジェクトでは、Kinesis Data Analytics for SQL よりも 新しい Managed Service for Apache Flink Studio を使用することをお勧めします。Managed Service for Apache Flink Studio は、使いやすさと高度な分析機能を兼ね備えているため、高度なストリーム処理アプリケーションを数分で構築できます。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

出力としての Lambda 関数の使用

を送信先 AWS Lambda として使用すると、最終送信先に送信する前に、SQL 結果の後処理をより簡単に実行できます。一般的な後処理タスクには次のものがあります。

  • 複数の行を 1 つのレコードに集約する

  • 現在の結果と過去の結果を組み合わせて、遅れて届くデータに対処する

  • 情報のタイプに基づいて異なる送信先に配信する

  • レコード形式の変換 (Protobuf への変換など)

  • 文字列操作または変換

  • 分析処理後のデータの強化

  • 地理空間ユースケースのカスタム処理

  • データ暗号化

Lambda 関数は、次のようなさまざまな AWS サービスやその他の宛先に分析情報を配信できます。

Lambda アプリケーションの作成の詳細については、「AWS Lambdaのご利用開始にあたって」を参照してください。

出力許可としての Lambda

出力として Lambda を使用するには、アプリケーションの Lambda 出力 IAM ロールに次のアクセス許可ポリシーが必要です。

{ "Sid": "UseLambdaFunction", "Effect": "Allow", "Action": [ "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ], "Resource": "FunctionARN" }

出力メトリクスとしての Lambda

Amazon を使用して CloudWatch 、送信されたバイト数、成功と失敗などをモニタリングします。Lambda を出力として使用して Kinesis Data Analytics から出力される CloudWatch メトリクスについては、「Amazon Kinesis Analytics Metrics」を参照してください。

出力イベント入力データモデルおよびレコードレスポンスモデルとしての Lambda

Kinesis Data Analytics 出力レコードを送信する場合、Lambda 関数は、必要なイベント入力データおよびレコードレスポンスモデルに準拠している必要があります。

イベント入力データモデル

Kinesis Data Analytics は、次のリクエストモデルの出力関数として、アプリケーションから Lambda に出力レコードを継続的に送信します。関数内では、リストを繰り返し処理し、ビジネスロジックを適用して、出力要件 (最終的な送信先に送信する前のデータ変換など) を実行します。

フィールド 説明
invocationId Lambda 呼び出し ID (ランダム GUID)。
applicationArn Kinesis Data Analytics アプリケーションの Amazon リソースネーム (ARN)。
レコード
フィールド 説明
recordId レコード ID (ランダム GUID)
lambdaDeliveryRecordMetadata
フィールド 説明
retryHint 配信再試行回数
データ Base64 でエンコードされた出力レコードのペイロード
注記

retryHint は配信失敗ごとに増加する値です。この値は永続的に保持されず、アプリケーションが中断された場合にリセットされます。

レコードレスポンスモデル

出力関数として Ok に (レコード ID と共に) 送信される各レコードは、 または DeliveryFailed のどちらかで確認される必要があり、次のパラメータを含める必要があります。それ以外の場合、Kinesis Data Analytics はそれらを配信失敗として扱います。

レコード
フィールド 説明
recordId レコード ID は呼び出し時に Kinesis Data Analytics から Lambda に渡されます。元のレコードの ID と確認されたレコードの ID との不一致は、配信失敗として扱われます。
result レコード配信のステータス。以下の値を指定できます。
  • Ok: レコードは正常に変換され、最終的な送信先に送信されました。Kinesis Data Analytics は SQL 処理のレコードを取り込みます。

  • DeliveryFailed: レコードは Lambda によって出力関数として最終的な送信先に正常に配信されませんでした。Kinesis Data Analytics は失敗したレコードの出力関数としての Lambda への送信を継続的に再試行します。

Lambda 出力呼び出しの頻度

Kinesis Data Analytics アプリケーションは、出力レコードをバッファして AWS Lambda 宛先関数を頻繁に呼び出します。

  • データ分析アプリケーション内で、タンブリングウィンドウとして送信先アプリケーション内ストリームにレコードが発行されると、タンブリングウィンドウトリガーごとに AWS Lambda 送信先関数が呼び出されます。たとえば、タンブリングウィンドウを 60 秒に設定してレコードを宛先のアプリケーション内ストリームに出力すると、Lambda 関数は、60 秒ごとに 1 回呼び出されます。

  • アプリケーション内で連続するクエリまたはスライディングウィンドウとしてレコードがアプリケーション内ストリームに出力される場合、Lambda 宛先関数は約 1 秒に 1 回呼び出されます。

注記

Lambda 関数あたりの呼び出しリクエストのペイロードサイズの制限が適用されます。これらの制限を超えると、出力レコードが分割され、複数の Lambda 関数呼び出しに分けて送信されます。

出力として使用するための Lambda 関数の追加

次の手順では、Kinesis Data Analytics アプリケーションの出力として Lambda 関数を追加する方法を示しています。

  1. にサインイン AWS Management Console し、https://console.aws.amazon.com/kinesisanalytics で Managed Service for Apache Flink コンソールを開きます。

  2. リストからアプリケーションを選択し、[Application details] を選択します。

  3. [宛先] セクションで、[Connect new destination] を選択します。

  4. [宛先] 項目に、[AWS Lambda 関数] を選択します。

  5. [ AWS Lambdaにレコードを配信] セクションで、既存の Lambda 関数とバージョンを選択するか、[新規作成] を選択します。

  6. 新しい Lambda 関数を作成する場合は、次の操作を行います。

    1. 提供されているいずれかのテンプレートのいずれかを選択します。詳細については、アプリケーションの送信先の Lambda 関数の作成

    2. [関数の作成] ページが新しいブラウザタブで開きます。[Name (名前)] ボックスで、関数にわかりやすい名前を付けます (例: myLambdaFunction)。

    3. アプリケーションの後処理機能のテンプレートを更新します。Lambda 関数作成の詳細については、AWS Lambda 開発者ガイドの入門ガイドを参照してください。

    4. Kinesis Data Analytics コンソールの [Lambda 関数] リストで、先ほど作成した Lambda 関数を選択します。Lambda 関数のバージョンは [$最新] を選択します。

  7. [In-application stream] セクションで、[Choose an existing in-application stream] を選択します。[In-application stream name] に、アプリケーションの出力ストリームを選択します。選択した出力ストリームからの結果は、Lambda 出力関数に送信されます。

  8. 残りのフォームはデフォルト値のままにして、[Save and continue] を選択します。

アプリケーションはアプリケーション内ストリームから Lambda 関数にレコードを送信するようになりました。デフォルトのテンプレートの結果は、Amazon CloudWatch コンソールで確認できます。AWS/KinesisAnalytics/LambdaDelivery.OkRecords メトリクスをモニタリングして、Lambda 関数に配信されるレコードの数を確認します。

出力エラーとしてよく見られる Lambda

以下は、Lambda 関数への配信が失敗する可能性のある一般的な理由です。

  • Lambda 関数に送信されるバッチのレコード (レコード ID 付き) の一部が Kinesis Data Analytics サービスに返されていません。

  • レスポンスにレコード ID、またはステータスフィールドのいずれかが欠落しています。

  • Lambda 関数のタイムアウトが Lambda 関数内のビジネスロジックを達成するのに十分ではありません。

  • Lambda 関数内のビジネスロジックは、すべてのエラーをキャッチしないため、処理されない例外のためにタイムアウトとバックプレッシャーが生じます。これらのメッセージは、「ポイズンピル」と呼ばれることが少なくありません。

データ配信が失敗した場合、Kinesis Data Analytics は、成功するまで同じレコードセットで Lambda 呼び出しを再試行し続けます。障害に関するインサイトを得るには、次の CloudWatch メトリクスをモニタリングできます。

  • 出力 CloudWatch メトリクスとしての Kinesis Data Analytics アプリケーション Lambda: 成功と失敗の数、およびその他の統計を示します。詳細については、「Amazon Kinesis Analytics Metrics」を参照してください。

  • AWS Lambda 関数の CloudWatch メトリクスとログ。