チュートリアル: Amazon Kinesis で AWS Lambda を使用する - AWS Lambda

チュートリアル: Amazon Kinesis で AWS Lambda を使用する

このチュートリアルでは、Kinesis データストリームのイベントを処理する Lambda 関数を作成します。

  1. カスタムアプリケーションがストリームにレコードを書き込みます。

  2. AWS Lambda はストリームをポーリングし、ストリームで新しいレコードを検出すると Lambda 関数を呼び出します。

  3. AWS Lambda は、Lambda 関数の作成時に指定した実行ロールを引き受けることにより、Lambda 関数を実行します。

Prerequisites

このチュートリアルでは、基本的な Lambda オペレーションと Lambda コンソールについてある程度の知識があることを前提としています。初めての方は、Lambda の開始方法 の手順に従って最初の Lambda 関数を作成してください。

次のステップを完了するには、コマンドを実行するコマンドラインターミナルまたはシェルが必要です。コマンドと予想される出力は、別々のブロックにリストされます。

aws --version

次のような出力が表示されます。

aws-cli/2.0.57 Python/3.7.4 Darwin/19.6.0 exe/x86_64

コマンドが長い場合、コマンドを複数行に分割するためにエスケープ文字 (\) が使用されます。

Linux および macOS では、任意のシェルとパッケージマネージャーを使用します。Windows 10 では、Linux 用の Windows サブシステムをインストールして、Windows 統合バージョンの Ubuntu および Bash を入手できます。

実行ロールを作成する

AWS リソースにアクセスするためのアクセス権限を関数に付与する実行ロールを作成します。

実行ロールを作成するには

  1. IAM コンソールの [roles page (ロールページ)] を開きます。

  2. [ロールの作成] を選択します。

  3. 次のプロパティでロールを作成します。

    • 信頼されたエンティティ - AWS Lambda

    • アクセス許可 - AWSLambdaKinesisExecutionRole

    • Role namelambda-kinesis-role

AWSLambdaKinesisExecutionRole ポリシーには、Kinesis から項目を読み取り、CloudWatch Logs にログを書き込むために関数が必要とするアクセス許可があります。

関数を作成する

以下のコード例では、Kinesis イベント入力を受け取り、含まれるメッセージを処理します。例示のため、このコードでは受信イベントデータの一部を CloudWatch Logs に書き込みます。

注記

他の言語によるサンプルコードについては、「」を参照してくださいサンプル関数コード

例 index.js

console.log('Loading function'); exports.handler = function(event, context) { //console.log(JSON.stringify(event, null, 2)); event.Records.forEach(function(record) { // Kinesis data is base64 encoded so decode here var payload = Buffer.from(record.kinesis.data, 'base64').toString('ascii'); console.log('Decoded payload:', payload); }); };

関数を作成するには

  1. サンプルコードを index.js という名前のファイルにコピーします。

  2. デプロイパッケージを作成します。

    zip function.zip index.js
  3. create-function コマンドを使用して Lambda 関数を作成します。

    aws lambda create-function --function-name ProcessKinesisRecords \ --zip-file fileb://function.zip --handler index.handler --runtime nodejs12.x \ --role arn:aws:iam::123456789012:role/lambda-kinesis-role

Lambda 関数をテストする

invokeAWS Lambda CLI コマンドおよびサンプルの Kinesis イベントを使用して、手動で Lambda 関数を呼び出します。

Lambda 関数をテストするには

  1. 以下の JSON をファイルにコピーし、input.txt という名前で保存します。

    { "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" } ] }
  2. invoke コマンドを使用して、関数にイベントを送信します。

    aws lambda invoke --function-name ProcessKinesisRecords --payload file://input.txt out.txt

    AWS CLI バージョン 2 を使用している場合、cli-binary-format オプションは必須です。このオプションは、AWS CLI 設定ファイルで設定することもできます。

    レスポンスは out.txt に保存されます。

Kinesis Stream を作成する

create-stream コマンドを使用して、スキーマを作成します。

aws kinesis create-stream --stream-name lambda-stream --shard-count 1

次の describe-stream コマンドを実行して、ストリーム ARN を取得します。

aws kinesis describe-stream --stream-name lambda-stream

次のような出力が表示されます。

{ "StreamDescription": { "Shards": [ { "ShardId": "shardId-000000000000", "HashKeyRange": { "StartingHashKey": "0", "EndingHashKey": "340282366920746074317682119384634633455" }, "SequenceNumberRange": { "StartingSequenceNumber": "49591073947768692513481539594623130411957558361251844610" } } ], "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/lambda-stream", "StreamName": "lambda-stream", "StreamStatus": "ACTIVE", "RetentionPeriodHours": 24, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "KeyId": null, "StreamCreationTimestamp": 1544828156.0 } }

次のステップで Lambda 関数にストリームを関連付けるために、ストリーム ARN を使用します。

AWS Lambda でイベントソースを追加する

次の AWS CLI add-event-source コマンドを実行します。

aws lambda create-event-source-mapping --function-name ProcessKinesisRecords \ --event-source arn:aws:kinesis:us-west-2:123456789012:stream/lambda-stream \ --batch-size 100 --starting-position LATEST

後で使用するために、マッピング ID をメモしておきます。list-event-source-mappings コマンドを実行して、イベントソースマッピングのリストを取得できます。

aws lambda list-event-source-mappings --function-name ProcessKinesisRecords \ --event-source arn:aws:kinesis:us-west-2:123456789012:stream/lambda-stream

レスポンスでは、ステータス値が enabled であることを確認できます。イベントソースマッピングを無効にすると、レコードを失うことなくポーリングを一時停止できます。

セットアップをテストする

イベントソースマッピングをテストするには、イベントレコードを Kinesis ストリームに追加します。--data 値は、文字列を Kinesis に送信する前に CLI で base64 にエンコードされる文字列です。同じコマンドを複数回実行して、複数のレコードをストリームに追加することができます。

aws kinesis put-record --stream-name lambda-stream --partition-key 1 \ --data "Hello, this is a test."

Lambda は実行ロールを使用して、ストリームからレコードを読み取ります。次に、Lambda 関数を呼び出し、レコードのバッチを渡します。この関数は、各レコードからデータをデコードしてログ記録し、出力を CloudWatch Logs に送信します。CloudWatch コンソールでログを表示する

リソースのクリーンアップ

このチュートリアル用に作成したリソースは、保持を希望しない場合、すぐに削除できます。使用しなくなった AWS リソースを削除することで、AWS アカウントに請求される料金が発生しないようにできます。

実行ロールを削除するには

  1. IAM コンソールの [Roles (ロール)] ページを開きます。

  2. 作成した実行ロールを選択します。

  3. [ロールの削除] を選択します。

  4. [はい、削除します] を選択します。

Lambda 関数を削除するには

  1. Lambda コンソールの [Functions] (関数) ページを開きます。

  2. 作成した関数を選択します。

  3. [アクション] を選択してから、[削除] をクリックします。

  4. [削除] を選択します。

Kinesis ストリームを削除するには

  1. AWS Management Console にサインインし、Kinesis コンソール (https://console.aws.amazon.com/kinesis) を開きます。

  2. 作成したストリームを選択します。

  3. [ Actions] で、[Delete ] を選択します。

  4. テキストボックスに「delete」と入力します。

  5. [削除] を選択します。