Lambda Telemetry API - AWS Lambda

Lambda Telemetry API

Telemetry API を使用することで、拡張機能はテレメトリデータを Lambda から直接受信できます。Lambda は、関数の初期化および呼び出し中に、ログ、プラットフォームメトリクス、およびプラットフォームトレースなどのテレメトリを自動的に取得します。Telemetry API により、拡張機能はこのテレメトリデータを Lambda からほぼリアルタイムで直接取得できます。

Lambda 実行環境内で、Lambda 拡張機能をテレメトリストリームにサブスクライブできます。サブスクライブ後、Lambda は自動的にすべてのテレメトリデータを拡張機能に送信します。そのデータを処理、フィルタリング、送信し、Amazon Simple Storage Service (Amazon S3) バケットや、サードパーティのオブザーバビリティツールプロバイダーなどの目的の宛先に配信することができます。

以下の図は、Extensions API と Telemetry API が、実行環境内から拡張機能を Lambda にリンクする方法を示しています。さらに Runtime API も、ランタイムと関数を Lambda に接続します。

Lambda を実行環境内のプロセスに接続する Extensions API、Telemetry API、および Runtime API。
重要

Lambda Telemetry API は、Lambda Log API に取って代わる API です。Logs API は引き続き完全に機能しますが、今後は Telemetry API のみを使用することをお勧めします。拡張機能は、Telemetry API または Logs API のいずれかを使用して、テレメトリストリームにサブスクライブできます。これらの API のいずれかを使用してサブスクライブした後で、もう一方の API を使用してサブスクライブしようとすると、エラーが返されます。

拡張機能は、Telemetry API を使用して 3 つの異なるテレメトリストリームにサブスクライブできます。

  • プラットフォームテレメトリ – 実行環境ランタイムライフサイクル、拡張機能ライフサイクル、および関数の呼び出しに関連するイベントとエラーを説明するログ、メトリクス、およびトレース。

  • 関数ログ – Lambda 関数コードが生成するカスタムログ。

  • 拡張機能ログ – Lambda 拡張機能コードが生成するカスタムログ。

注記

Lambda は、拡張機能がテレメトリストリームにサブスクライブしている場合でも CloudWatch、ログとメトリクスを に送信し、トレースを X-Ray (トレースを有効にしている場合) に送信します。

Telemetry API を使用した拡張機能の作成

Lambda 拡張機能は、実行環境で独立したプロセスとして実行されます。拡張機能は、関数の呼び出しが完了した後も引き続き実行できます。拡張機能は別個のプロセスであるため、関数コードとは異なる言語で記述することができます。拡張機能は、Golang や Rust などのコンパイルされた言語を使用して記述することが推奨されます。そうすることで、拡張機能は、サポートされているランタイムとの互換性がある自己完結型のバイナリになります。

以下の図は、Telemetry API を使用してテレメトリデータを受信し、処理する拡張機能を作成するための 4 ステッププロセスを説明しています。

拡張機能を登録し、リスナーを作成して、ストリームにサブスクライブしてから、テレメトリを取得する。

以下は、各ステップの詳しい説明です。

  1. Lambda 拡張機能 API を使用して拡張機能を登録します。これによって、この後のステップで必要になる Lambda-Extension-Identifier が提供されます。拡張機能の登録方法に関する詳細については、「拡張機能の登録」を参照してください。

  2. テレメトリリスナーを作成します。これは、基本的な HTTP または TCP サーバーにすることができます。Lambda は、テレメトリリスナーの URI を使用してテレメトリデータを拡張機能に送信します。詳細については、「テレメトリリスナーの作成」を参照してください。

  3. Telemetry API 内の Subscribe API を使用して、拡張機能を目的のテレメトリストリームにサブスクライブします。このステップには、テレメトリリスナーの URI が必要になります。詳細については、「Telemetry API へのサブスクリプションリクエストの送信」を参照してください。

  4. テレメトリリスナー経由で Lambda からテレメトリデータを取得します。このデータには、Amazon S3、または外部のオブザーバビリティサービスへのデータのディスパッチなど、あらゆるカスタム処理を実行できます。

注記

Lambda 関数の実行環境は、そのライフサイクルの一環として、複数回起動および停止できます。一般的に、拡張機能コードは関数の呼び出し中に実行されるとともに、シャットダウンフェーズ中にも最大 2 秒間実行されます。テレメトリがリスナーに届いた時点でバッチ処理することをお勧めします。次に、Invoke および Shutdown ライフサイクルイベントを使用して、各バッチを目的の送信先に送信します。

拡張機能の登録

テレメトリデータにサブスクライブする前に、Lambda 拡張機能を登録する必要があります。登録は、拡張機能の初期化フェーズ中に行われます。以下は、拡張機能を登録するための HTTP リクエストの例です。

POST http://${AWS_LAMBDA_RUNTIME_API}/2020-01-01/extension/register Lambda-Extension-Name: lambda_extension_name { 'events': [ 'INVOKE', 'SHUTDOWN'] }

リクエストが成功すると、サブスクライバーは HTTP 200 成功レスポンスを受信します。レスポンスヘッダーには、Lambda-Extension-Identifier が含まれています。レスポンスボディには、関数のその他のプロパティが含まれています。

HTTP/1.1 200 OK Lambda-Extension-Identifier: a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 { "functionName": "lambda_function", "functionVersion": "$LATEST", "handler": "lambda_handler", "accountId": "123456789012" }

詳細については、「拡張機能 API リファレンス」を参照してください。

テレメトリリスナーの作成

Lambda 拡張機能には、Telemetry API からの受信リクエストを処理するリスナーが必要です。以下のコードは、Golang でのテレメトリリスナーの実装例です。

// Starts the server in a goroutine where the log events will be sent func (s *TelemetryApiListener) Start() (string, error) { address := listenOnAddress() l.Info("[listener:Start] Starting on address", address) s.httpServer = &http.Server{Addr: address} http.HandleFunc("/", s.http_handler) go func() { err := s.httpServer.ListenAndServe() if err != http.ErrServerClosed { l.Error("[listener:goroutine] Unexpected stop on Http Server:", err) s.Shutdown() } else { l.Info("[listener:goroutine] Http Server closed:", err) } }() return fmt.Sprintf("http://%s/", address), nil } // http_handler handles the requests coming from the Telemetry API. // Everytime Telemetry API sends log events, this function will read them from the response body // and put into a synchronous queue to be dispatched later. // Logging or printing besides the error cases below is not recommended if you have subscribed to // receive extension logs. Otherwise, logging here will cause Telemetry API to send new logs for // the printed lines which may create an infinite loop. func (s *TelemetryApiListener) http_handler(w http.ResponseWriter, r *http.Request) { body, err := ioutil.ReadAll(r.Body) if err != nil { l.Error("[listener:http_handler] Error reading body:", err) return } // Parse and put the log messages into the queue var slice []interface{} _ = json.Unmarshal(body, &slice) for _, el := range slice { s.LogEventsQueue.Put(el) } l.Info("[listener:http_handler] logEvents received:", len(slice), " LogEventsQueue length:", s.LogEventsQueue.Len()) slice = nil }

宛先プロトコルの指定

Telemetry API を使用してテレメトリを受信するためにサブスクライブするときは、宛先 URI だけでなく、宛先プロトコルも指定できます。

{ "destination": { "protocol": "HTTP", "URI": "http://sandbox.localdomain:8080" } }

Lambda は、テレメトリの受信のために 2 つのプロトコルを受け入れます。

  • HTTP (推奨) – Lambda は、テレメトリを JSON 形式のレコードの配列としてローカル HTTP エンドポイント (http://sandbox.localdomain:${PORT}/${PATH}) に配信します。$PATH パラメータはオプションです。Lambda は HTTP のみをサポートし、HTTPS はサポートしません。Lambda は、POST リクエストを通じてテレメトリを配信します。

  • TCP – Lambda は、テレメトリを改行区切りの JSON (NDJSON) 形式で TCP ポートに配信します。

注記

TCP ではなく、HTTP を使用することが強く推奨されます。TCP を使用する場合、Lambda プラットフォームはテレメトリをアプリケーションレイヤーにいつ配信するかを確認できません。このため、拡張機能がクラッシュすると、ログが失われる可能性があります。HTTP にこの制限はありません。

サブスクライブしてテレメトリを受信する前に、ローカル HTTP リスナーまたは TCP ポートを確立します。セットアップ中は、以下の点に注意してください。

  • Lambda がログを送信するのは実行環境内の宛先のみです。

  • Lambda は、リスナーがない場合、または POST リクエストにエラーが発生した場合は、テレメトリの送信を再試行します (バックオフも実施)。テレメトリリスナーがクラッシュした場合は、Lambda が実行環境を再起動した後でテレメトリの受信を再開します。

  • Lambda はポート 9001 を予約しています。他のポート番号の制限や推奨事項はありません。

メモリの使用量とバッファリングの設定

実行環境でのメモリ使用量は、サブスクライバーの数に比例して直線的に増加します。各サブスクリプションがテレメトリデータを格納するために新しいメモリバッファを開始するため、サブスクリプションはメモリリソースを消費します。バッファメモリ使用量は、実行環境の全体的なメモリ消費量の一部としてカウントされます。

Telemetry API を使用してテレメトリを受信するようにサブスクライブするときは、テレメトリデータをバッファして、バッチ形式でサブスクライバーに配信できます。メモリの使用量を最適化できるように、バッファリング設定を指定することが可能です。

{ "buffering": { "maxBytes": 256*1024, "maxItems": 1000, "timeoutMs": 100 } }
バッファリング設定
パラメータ 説明 デフォルトと制限

maxBytes

メモリでバッファするテレメトリの最大量 (バイト単位)。

デフォルト: 262,144

最小: 262,144

最大: 1,048,576

maxItems

メモリでバッファするイベントの最大数。

デフォルト: 10,000

最小: 1,000

最大: 10,000

timeoutMs

バッチをバッファする最大時間 (ミリ秒単位)。

デフォルト: 1,000

最小: 25

最大: 30,000

バッファーを設定するときは、次の点に注意してください。

  • 入力ストリームのいずれかが閉じられている場合、Lambda はログをフラッシュします。これは、ランタイムがクラッシュした場合などに発生する可能性があります。

  • 各サブスクライバーは、サブスクリプションリクエストで独自のバッファリング設定をカスタマイズできます。

  • バッファリング設定のコンポーネントが maxBytes の場合、データを読み取るためのバッファーサイズを決定する際には、受信ペイロードのサイズを 2 * maxBytes + metadataBytes と想定してください。考慮すべき metadataBytes 量を判断するには、以下のメタデータを確認してください。Lambda は、以下のようなメタデータを各レコードに追加します。

    { "time": "2022-08-20T12:31:32.123Z", "type": "function", "record": "Hello World" }
  • サブスクライバーが着信テレメトリを十分な速さで処理できない場合、あるいは関数コードが大量のログボリュームを生成する場合、Lambda は、メモリ使用率を制限内に収めるためにレコードをドロップする可能性があります。この状況が発生すると、Lambda は platform.logsDropped イベントを送信します。

Telemetry API へのサブスクリプションリクエストの送信

Lambda 拡張機能は、Telemetry API にサブスクリプションリクエストを送信することで、テレメトリデータを受信するためにサブスクライブできます。サブスクリプションリクエストには、拡張機能がサブスクライブするイベントのタイプに関する情報が含まれている必要があります。これに加えて、リクエストには配信先情報バッファリング設定を含めることができます。

サブスクリプションリクエストを送信する前に、拡張機能 ID (Lambda-Extension-Identifier) が必要になります。Extensions API を使用して拡張機能を登録すると、API レスポンスから拡張機能 ID を取得できます。

サブスクリプションは、拡張機能の初期化フェーズ中に行われます。以下は、プラットフォームテレメトリ、関数ログ、および拡張機能ログの 3 つのテレメトリストリームすべてにサブスクライブするための HTTP リクエストの例です。

PUT http://${AWS_LAMBDA_RUNTIME_API}/2022-07-01/telemetry HTTP/1.1 { "schemaVersion": "2022-12-13", "types": [ "platform", "function", "extension" ], "buffering": { "maxItems": 1000, "maxBytes": 256*1024, "timeoutMs": 100 }, "destination": { "protocol": "HTTP", "URI": "http://sandbox.localdomain:8080" } }

リクエストが成功すると、サブスクライバーが HTTP 200 成功レスポンスを受信します。

HTTP/1.1 200 OK "OK"

インバウンド Telemetry API メッセージ

Telemetry API を使用してサブスクライブすると、拡張機能は POST リクエストで Lambda からのテレメトリ受信を自動的に開始します。各 POST リクエスト本文には Event オブジェクトの配列が含まれています。それぞれの Event には以下のスキーマがあります。

{ time: String, type: String, record: Object }
  • time プロパティは、Lambda プラットフォームがイベントを生成した時刻を定義します。これは、イベントが実際に発生した時刻とは異なります。time の文字列値は、ISO 8601 形式のタイムスタンプです。

  • type プロパティは、イベントタイプを定義します。以下の表には、可能な値のすべてが説明されています。

  • record プロパティは、テレメトリデータが含まれる JSON オブジェクトを定義します。この JSON オブジェクトのスキーマは、type に応じて異なります。

以下の表は、すべての Event オブジェクトタイプと、各イベントタイプの Telemetry API Event スキーマリファレンスへのリンクをまとめたものです。

Telemetry API メッセージタイプ
カテゴリ イベントタイプ 説明 イベントレコードスキーマ

プラットフォームイベント

platform.initStart

関数の初期化が開始されました。

platform.initStart スキーマ

プラットフォームイベント

platform.initRuntimeDone

関数の初期化が完了しました。

platform.initRuntimeDone スキーマ

プラットフォームイベント

platform.initReport

関数の初期化のレポートです。

platform.initReport スキーマ

プラットフォームイベント

platform.start

関数の呼び出しが開始されました。

platform.start スキーマ

プラットフォームイベント

platform.runtimeDone

ランタイムが、成功または失敗のいずれかでイベントの処理を終了しました。

platform.runtimeDone スキーマ

プラットフォームイベント

platform.report

関数の呼び出しのレポートです。

platform.report スキーマ

プラットフォームイベント

platform.restoreStart

ランタイムの復元が開始されました。

platform.restoreStart スキーマ

プラットフォームイベント

platform.restoreRuntimeDone

ランタイムの復元が完了しました。

platform.restoreRuntimeDone スキーマ

プラットフォームイベント

platform.restoreReport

ランタイムの復元のレポート。

platform.restoreReport スキーマ

プラットフォームイベント

platform.telemetrySubscription

拡張機能が Telemetry API にサブスクライブしました。

platform.telemetrySubscription スキーマ

プラットフォームイベント

platform.logsDropped

Lambda がログエントリをドロップしました。

platform.logsDropped スキーマ

関数ログ

function

関数コードからのログ行です。

function スキーマ

拡張ログ

extension

拡張コードからのログ行です。

extension スキーマ