チュートリアル: Amazon Kinesis で AWS Lambda を使用する
このチュートリアルでは、Kinesis データストリームのイベントを処理する Lambda 関数を作成します。
-
カスタムアプリケーションがストリームにレコードを書き込みます。
-
AWS Lambda はストリームをポーリングし、ストリームで新しいレコードを検出すると Lambda 関数を呼び出します。
-
AWS Lambda は、Lambda 関数の作成時に指定した実行ロールを引き受けることにより、Lambda 関数を実行します。
前提条件
このチュートリアルでは、基本的な Lambda オペレーションと Lambda コンソールについてある程度の知識があることを前提としています。初めての方は、コンソールで Lambda の関数の作成 の手順に従って最初の Lambda 関数を作成してください。
次のステップを完了するには、コマンドを実行するコマンドラインターミナルまたはシェルが必要です。コマンドと予想される出力は、別々のブロックにリストされます。
aws --version
次のような出力が表示されます。
aws-cli/2.0.57 Python/3.7.4 Darwin/19.6.0 exe/x86_64
コマンドが長い場合、コマンドを複数行に分割するためにエスケープ文字 (\
) が使用されます。
Linux および macOS では、任意のシェルとパッケージマネージャーを使用します。
Windows では、Lambda でよく使用される一部の Bash CLI コマンド (zip
など) が、オペレーティングシステムの組み込みターミナルでサポートされていません。Ubuntu および Bash の Windows 統合バージョンを取得するには、Windows Subsystem for Linux をインストール
実行ロールを作成する
AWS リソースにアクセスするためのアクセス許可を関数に付与する実行ロールを作成します。
実行ロールを作成するには
-
IAM コンソールの [roles page (ロールページ)
] を開きます。 -
[ロールの作成] を選択します。
-
次のプロパティでロールを作成します。
-
信頼されたエンティティ - AWS Lambda
-
アクセス許可 - AWSLambdaKinesisExecutionRole。
-
ロール名 –
lambda-kinesis-role
。
-
AWSLambdaKinesisExecutionRole ポリシーには、Kinesis から項目を読み取り、CloudWatch Logs にログを書き込むために関数が必要とするアクセス許可があります。
関数を作成する
以下のコード例では、Kinesis イベント入力を受け取り、含まれるメッセージを処理します。例示のため、このコードでは受信イベントデータの一部を CloudWatch Logs に書き込みます。
他の言語によるサンプルコードについては、「」を参照してくださいサンプル関数コード
例 index.js
console.log('Loading function'); exports.handler = function(event, context) { //console.log(JSON.stringify(event, null, 2)); event.Records.forEach(function(record) { // Kinesis data is base64 encoded so decode here var payload = Buffer.from(record.kinesis.data, 'base64').toString('ascii'); console.log('Decoded payload:', payload); }); };
関数を作成するには
-
サンプルコードを
index.js
という名前のファイルにコピーします。 -
デプロイパッケージを作成します。
zip function.zip index.js
-
create-function
コマンドを使用して Lambda 関数を作成します。aws lambda create-function --function-name ProcessKinesisRecords \ --zip-file fileb://function.zip --handler index.handler --runtime nodejs12.x \ --role arn:aws:iam::
123456789012
:role/lambda-kinesis-role
Lambda 関数をテストする
invoke
AWS Lambda CLI コマンドおよびサンプルの Kinesis イベントを使用して、手動で Lambda 関数を呼び出します。
Lambda 関数をテストするには
-
以下の JSON をファイルにコピーし、
input.txt
という名前で保存します。{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" } ] }
-
invoke
コマンドを使用して、関数にイベントを送信します。aws lambda invoke --function-name ProcessKinesisRecords --payload file://input.txt out.txt
AWS CLI バージョン 2 を使用している場合、cli-binary-format オプションは必須です。これをデフォルト設定にするには、
aws configure set cli-binary-format raw-in-base64-out
を実行します。詳細については、AWS CLI でサポートされているグローバルコマンドラインオプションを参照してください。レスポンスは
out.txt
に保存されます。
Kinesis Stream を作成する
create-stream
コマンドを使用して、スキーマを作成します。
aws kinesis create-stream --stream-name lambda-stream --shard-count 1
次の describe-stream
コマンドを実行して、ストリーム ARN を取得します。
aws kinesis describe-stream --stream-name lambda-stream
次のような出力が表示されます。
{ "StreamDescription": { "Shards": [ { "ShardId": "shardId-000000000000", "HashKeyRange": { "StartingHashKey": "0", "EndingHashKey": "340282366920746074317682119384634633455" }, "SequenceNumberRange": { "StartingSequenceNumber": "49591073947768692513481539594623130411957558361251844610" } } ], "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/lambda-stream", "StreamName": "lambda-stream", "StreamStatus": "ACTIVE", "RetentionPeriodHours": 24, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "KeyId": null, "StreamCreationTimestamp": 1544828156.0 } }
次のステップで Lambda 関数にストリームを関連付けるために、ストリーム ARN を使用します。
AWS Lambda でイベントソースを追加する
次の AWS CLI add-event-source
コマンドを実行します。
aws lambda create-event-source-mapping --function-name ProcessKinesisRecords \ --event-source arn:aws:kinesis:us-west-2:123456789012:stream/lambda-stream \ --batch-size 100 --starting-position LATEST
後で使用するために、マッピング ID をメモしておきます。list-event-source-mappings
コマンドを実行して、イベントソースマッピングのリストを取得できます。
aws lambda list-event-source-mappings --function-name ProcessKinesisRecords \ --event-source arn:aws:kinesis:us-west-2:123456789012:stream/lambda-stream
レスポンスでは、ステータス値が enabled
であることを確認できます。イベントソースマッピングを無効にすると、レコードを失うことなくポーリングを一時停止できます。
セットアップをテストする
イベントソースマッピングをテストするには、イベントレコードを Kinesis ストリームに追加します。--data
値は、文字列を Kinesis に送信する前に CLI で base64 にエンコードされる文字列です。同じコマンドを複数回実行して、複数のレコードをストリームに追加することができます。
aws kinesis put-record --stream-name lambda-stream --partition-key 1 \ --data "Hello, this is a test."
Lambda は実行ロールを使用して、ストリームからレコードを読み取ります。次に、Lambda 関数を呼び出し、レコードのバッチを渡します。この関数は、各レコードからデータをデコードしてログ記録し、出力を CloudWatch Logs に送信します。CloudWatch コンソール
リソースのクリーンアップ
このチュートリアル用に作成したリソースは、保持を希望しない場合、すぐに削除できます。使用しなくなった AWS リソースを削除することで、AWS アカウントに請求される料金が発生しないようにできます。
実行ロールを削除するには
-
IAM コンソールの [Roles (ロール)] ページ
を開きます。 -
作成した実行ロールを選択します。
-
[ロールの削除] を選択します。
-
[はい、削除します] を選択します。
Lambda 関数を削除するには
-
Lambda コンソールの [Functions]
(関数) ページを開きます。 -
作成した関数を選択します。
-
[アクション] を選択してから、[削除] をクリックします。
-
[Delete] (削除) をクリックします。
Kinesis ストリームを削除するには
-
AWS Management Console にサインインし、Kinesis コンソール (https://console.aws.amazon.com/kinesis
) を開きます。 -
作成したストリームを選択します。
-
[ Actions] で、[Delete ] を選択します。
-
テキストボックスに「
delete
」と入力します。 -
[Delete] (削除) をクリックします。