チュートリアル #1: AWS CLI を使用した Amazon DynamoDB と AWS Lambda での、フィルターを使ったすべてのイベントの処理 - Amazon DynamoDB

チュートリアル #1: AWS CLI を使用した Amazon DynamoDB と AWS Lambda での、フィルターを使ったすべてのイベントの処理

このチュートリアルでは、AWS Lambda トリガーを作成して、DynamoDB テーブルからのストリーミングを処理します。

このチュートリアルのシナリオは、シンプルなソーシャルネットワークである Woofer です。Woofer ユーザーは、他の Woofer ユーザーに送信される bark (短いテキストメッセージ) を使用して通信します。次の図は、このアプリケーションのコンポーネントとワークフローを示しています。

DynamoDB テーブル、ストリームレコード、Lambda 関数、および Amazon SNS トピックの Woofer アプリケーションワークフロー。
  1. ユーザーは DynamoDB テーブル (BarkTable) に項目を書き込みます。テーブルの各項目は bark を表します。

  2. 新しいストリームレコードが書き込まれ、新しい項目が BarkTable に追加されたことを反映します。

  3. 新しいストリームレコードは AWS Lambda 関数 (publishNewBark) をトリガーします。

  4. ストリーミングレコードに、新しい項目が BarkTable に追加されたことが示された場合、Lambda 関数はストリーミングレコードからデータを読み込み、Amazon Simple Notification Service (Amazon SNS) のトピックにメッセージを発行します。

  5. メッセージは Amazon SNS トピックの受信者によって受信されます (このチュートリアルでは、唯一の受信者は E メールアドレスです)。

開始する前に

このチュートリアルでは AWS Command Line Interface AWS CLI を使用します。まだ行っていない場合は、「AWS Command Line Interface ユーザーガイド」の手順に従って、AWS CLI をインストールおよび設定します。

ステップ 1: ストリーミングが有効になった DynamoDB テーブルを作成する

このステップでは、Woofer ユーザーからのすべての bark を保存する DynamoDB テーブル (BarkTable) を作成します。プライマリキーは、Username (パーティションキー) と Timestamp (ソートキー) で構成されます。これらの属性は両方とも文字列型になります。

BarkTable ではストリームが有効になっています。このチュートリアルの後半では、AWS Lambda 関数をストリームと関連付けてトリガーを作成します。

  1. 次のコマンドを入力して、テーブルを作成します。

    aws dynamodb create-table \ --table-name BarkTable \ --attribute-definitions AttributeName=Username,AttributeType=S AttributeName=Timestamp,AttributeType=S \ --key-schema AttributeName=Username,KeyType=HASH AttributeName=Timestamp,KeyType=RANGE \ --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 \ --stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES
  2. 出力で、LatestStreamArn を探します。

    ... "LatestStreamArn": "arn:aws:dynamodb:region:accountID:table/BarkTable/stream/timestamp ...

    regionaccountID をメモしておきます。これらは、このチュートリアルの他のステップで必要になります。

ステップ 2: Lambda 実行ロールを作成する

このステップでは、AWS Identity and Access Management (IAM) ロール (WooferLambdaRole) を作成し、それにアクセス権限を割り当てます。このロールは、ステップ 4: Lambda 関数を作成してテストする で作成する Lambda 関数で使用されます。

また、ロールのポリシーを作成します。このポリシーには、Lambda 関数がランタイム時に必要とするすべてのアクセス許可が含まれます。

  1. 次の内容で、trust-relationship.json というファイルを作成します。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }
  2. WooferLambdaRole を作成するため、以下のコマンドを入力します。

    aws iam create-role --role-name WooferLambdaRole \ --path "/service-role/" \ --assume-role-policy-document file://trust-relationship.json
  3. 次の内容で、role-policy.json というファイルを作成します。(region および accountID をお客様の AWS リージョンとアカウント ID に置き換えます。)

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "arn:aws:logs:region:accountID:*" }, { "Effect": "Allow", "Action": [ "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:ListStreams" ], "Resource": "arn:aws:dynamodb:region:accountID:table/BarkTable/stream/*" }, { "Effect": "Allow", "Action": [ "sns:Publish" ], "Resource": [ "*" ] } ] }

    ポリシーには 4 つのステートメントがあり、これにより WooferLambdaRole は以下を実行できます。

    • Lambda 関数 (publishNewBark) を実行します。このチュートリアルの後半で、この関数を作成します。

    • Amazon CloudWatch Logs のアクセス。Lambda 関数はランタイム時に診断を CloudWatch Logs に書き込みます。

    • BarkTable の DynamoDB Streams からデータを読み込みます。

    • Amazon SNS にメッセージを公開します。

  4. 次のコマンドを入力して、WooferLambdaRole にポリシーをアタッチします。

    aws iam put-role-policy --role-name WooferLambdaRole \ --policy-name WooferLambdaRolePolicy \ --policy-document file://role-policy.json

ステップ 3: Amazon SNS トピックを作成する

このステップでは、Amazon SNSトピック (wooferTopic) を作成し、そのトピックに E メールアドレスをサブスクライブします。Lambda 関数はこのトピックを使用して、Woofer ユーザーからの新しい bark を公開します。

  1. 次のコマンドを入力して、新しい Amazon SNS トピックを作成します。

    aws sns create-topic --name wooferTopic
  2. 次のコマンドを入力して、wooferTopic に E メールアドレスをサブスクライブします (region および accountID は AWS リージョンとアカウント ID に置き換え、example@example.com は有効な E メールアドレスと置き換えます)。

    aws sns subscribe \ --topic-arn arn:aws:sns:region:accountID:wooferTopic \ --protocol email \ --notification-endpoint example@example.com
  3. Amazon SNS は E メールアドレスに確認メッセージを送信します。そのメッセージの [サブスクリプションを確認] リンクを選択して、サブスクリプションプロセスを完了します。

ステップ 4: Lambda 関数を作成してテストする

このステップでは、AWS Lambda 関数 (publishNewBark) を作成して BarkTable からのストリームレコードを処理します。

publishNewBark 関数は、BarkTable の新しい項目に対応するストリームイベントのみを処理します。この関数は、そのようなイベントからデータを読み取ってから、Amazon SNS をコールしてデータを公開します。

  1. 次の内容で、publishNewBark.js というファイルを作成します。region および accountID をお客様の AWS リージョンとアカウント ID に置き換えます。

    'use strict'; var AWS = require("aws-sdk"); var sns = new AWS.SNS(); exports.handler = (event, context, callback) => { event.Records.forEach((record) => { console.log('Stream record: ', JSON.stringify(record, null, 2)); if (record.eventName == 'INSERT') { var who = JSON.stringify(record.dynamodb.NewImage.Username.S); var when = JSON.stringify(record.dynamodb.NewImage.Timestamp.S); var what = JSON.stringify(record.dynamodb.NewImage.Message.S); var params = { Subject: 'A new bark from ' + who, Message: 'Woofer user ' + who + ' barked the following at ' + when + ':\n\n ' + what, TopicArn: 'arn:aws:sns:region:accountID:wooferTopic' }; sns.publish(params, function(err, data) { if (err) { console.error("Unable to send message. Error JSON:", JSON.stringify(err, null, 2)); } else { console.log("Results from sending message: ", JSON.stringify(data, null, 2)); } }); } }); callback(null, `Successfully processed ${event.Records.length} records.`); };
  2. publishNewBark.js を含める zip ファイルを作成します。zip コマンドラインユーティリティがある場合は、次のコマンドを入力してこれを行うことができます。

    zip publishNewBark.zip publishNewBark.js
  3. Lambda 関数を作成する場合は、ステップ 2: Lambda 実行ロールを作成する で作成した WooferLambdaRole の Amazon リソースネーム (ARN) を指定します。この ARN を取得するには、次のコマンドを入力します。

    aws iam get-role --role-name WooferLambdaRole

    出力で、WooferLambdaRole の ARN を探します。

    ... "Arn": "arn:aws:iam::region:role/service-role/WooferLambdaRole" ...

    次のコマンドを入力して、Lambda 関数を作成します。roleARNWooferLambdaRole の ARN に置き換えます。

    aws lambda create-function \ --region region \ --function-name publishNewBark \ --zip-file fileb://publishNewBark.zip \ --role roleARN \ --handler publishNewBark.handler \ --timeout 5 \ --runtime nodejs16.x
  4. ここで、publishNewBark をテストして、これが動作することを確認します。これを行うには、DynamoDB Streams の実際のレコードに似た情報を入力します。

    次の内容で、payload.json というファイルを作成します。region および accountID をお客様の AWS リージョンとアカウント ID に置き換えます。

    { "Records": [ { "eventID": "7de3041dd709b024af6f29e4fa13d34c", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "region", "dynamodb": { "ApproximateCreationDateTime": 1479499740, "Keys": { "Timestamp": { "S": "2016-11-18:12:09:36" }, "Username": { "S": "John Doe" } }, "NewImage": { "Timestamp": { "S": "2016-11-18:12:09:36" }, "Message": { "S": "This is a bark from the Woofer social network" }, "Username": { "S": "John Doe" } }, "SequenceNumber": "13021600000000001596893679", "SizeBytes": 112, "StreamViewType": "NEW_IMAGE" }, "eventSourceARN": "arn:aws:dynamodb:region:account ID:table/BarkTable/stream/2016-11-16T20:42:48.104" } ] }

    次のコマンドを入力して、publishNewBark 関数をテストします。

    aws lambda invoke --function-name publishNewBark --payload file://payload.json --cli-binary-format raw-in-base64-out output.txt

    テストが成功すると、次の出力が表示されます。

    { "StatusCode": 200, "ExecutedVersion": "$LATEST" }

    さらに、output.txt ファイルには次のテキストが含まれます。

    "Successfully processed 1 records."

    また、数分以内に新しい E メールメッセージが届きます。

    注記

    AWS Lambda は、Amazon CloudWatch Logs に診断情報を書き込みます。Lambda 関数でエラーが発生した場合、この診断情報をトラブルシューティングに使用できます。

    1. CloudWatch コンソール (https://console.aws.amazon.com/cloudwatch/) を開きます。

    2. ナビゲーションペインで [ログ] を選択します。

    3. 次のロググループを選択: /aws/lambda/publishNewBark

    4. 最新のログストリーミングを選択して、関数からの出力 (およびエラー) を表示します。

ステップ 5: トリガーを作成してテストする

ステップ 4: Lambda 関数を作成してテストする で、Lambda 関数をテストして、正しく実行されたことを確認しました。このステップでは、Lambda 関数 (publishNewBark) をイベントソース (BarkTable ストリーミング) に関連付けることでトリガーを作成します。

  1. トリガーを作成する場合、BarkTable ストリーム用の ARN を指定する必要があります。この ARN を取得するには、次のコマンドを入力します。

    aws dynamodb describe-table --table-name BarkTable

    出力で、LatestStreamArn を探します。

    ... "LatestStreamArn": "arn:aws:dynamodb:region:accountID:table/BarkTable/stream/timestamp ...
  2. 次のコマンドを入力してトリガーを作成します streamARN を実際のストリーム ARN に置き換えます。

    aws lambda create-event-source-mapping \ --region region \ --function-name publishNewBark \ --event-source streamARN \ --batch-size 1 \ --starting-position TRIM_HORIZON
  3. トリガーをテストします。次のコマンドを入力して、BarkTable に項目を追加します。

    aws dynamodb put-item \ --table-name BarkTable \ --item Username={S="Jane Doe"},Timestamp={S="2016-11-18:14:32:17"},Message={S="Testing...1...2...3"}

    数分以内に新しい E メールメッセージが届きます。

  4. DynamoDB コンソールを開き、さらにいくつかの項目を BarkTable に追加します。Username および Timestamp 属性の値を指定する必要があります (必須ではないものの、Message の値を指定する必要があります)。BarkTable に追加した各項目について、新しい E メールメッセージが届きます。

    Lambda 関数は、BarkTable に追加した新しい項目のみを処理します。テーブル内の項目を更新または削除すると、この関数は何も行いません。

注記

AWS Lambda は、Amazon CloudWatch Logs に診断情報を書き込みます。Lambda 関数でエラーが発生した場合、この診断情報をトラブルシューティングに使用できます。

  1. CloudWatch コンソール (https://console.aws.amazon.com/cloudwatch/) を開きます。

  2. ナビゲーションペインで [ログ] を選択します。

  3. 次のロググループを選択: /aws/lambda/publishNewBark

  4. 最新のログストリーミングを選択して、関数からの出力 (およびエラー) を表示します。