教學 #1: 使用篩選器來處理 Amazon DynamoDB 的所有事件,並 AWS Lambda 使用 AWS CLI - Amazon DynamoDB

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

教學 #1: 使用篩選器來處理 Amazon DynamoDB 的所有事件,並 AWS Lambda 使用 AWS CLI

在本教學課程中,您將建立 AWS Lambda 觸發器來處理 DynamoDB 表格的串流。

本教學案例為 Woofer,簡易的社交網路。Woofer 使用者使用互相傳送的 barks (簡短的文字訊息) 進行通訊。下圖顯示此應用程式的元件和工作流程。

DynamoDB 表格、串流記錄、Lambda 函數和 Amazon SNS 主題的低音單元應用程式工作流程。
  1. 使用者將某個項目寫入 DynamoDB 資料表 (BarkTable)。資料表中的每個項目代表一個 bark。

  2. 寫入新串流紀錄即反映新的項目已新增至 BarkTable

  3. 新的流記錄觸發一個 AWS Lambda 函數(publishNewBark)。

  4. 如果該串流紀錄指出新的項目已新增至 BarkTable,則 Lambda 函數會從串流紀錄讀取資料,將訊息發佈至 Amazon Simple Notification Service (Amazon SNS) 中的主題。

  5. Amazon SNS 主題的訂閱者會收到此訊息。(在此教學中,唯一的訂閱者是電子郵件地址)。

開始之前

本自學課程使用 AWS Command Line Interface AWS CLI. 若您尚未執行此作業,請遵循《AWS Command Line Interface 使用者指南》中的說明安裝及設定 AWS CLI。

步驟 1:建立啟用串流的 DynamoDB 資料表

在此步驟中,您會建立 DynamoDB 資料表 (BarkTable) 存放 Woofer 使用者的所有 bark。主索引鍵包含 Username (分割區索引鍵) 和 Timestamp (排序索引鍵)。這些屬性的類型皆為字串。

BarkTable 已啟用串流。稍後在本教學課程中,您會透過將 AWS Lambda 函數與串流產生關聯來建立觸發程序。

  1. 輸入下列命令建立資料表。

    aws dynamodb create-table \ --table-name BarkTable \ --attribute-definitions AttributeName=Username,AttributeType=S AttributeName=Timestamp,AttributeType=S \ --key-schema AttributeName=Username,KeyType=HASH AttributeName=Timestamp,KeyType=RANGE \ --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 \ --stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES
  2. 在輸出中,尋找 LatestStreamArn

    ... "LatestStreamArn": "arn:aws:dynamodb:region:accountID:table/BarkTable/stream/timestamp ...

    記下 regionaccountID,因為您在本教學的其他步驟中會需要它們。

步驟 2:建立 Lambda 執行角色

在此步驟中,您會建立 AWS Identity and Access Management (IAM) 角色 (WooferLambdaRole) 並為其指派許可。這個角色會由您在 步驟 4:建立並測試 Lambda 函數 中建立的 Lambda 函數所使用。

您也要為該角色建立政策。此政策包含 Lambda 函數在執行時間所需要的全部許可。

  1. 使用下列內容建立名為 trust-relationship.json 的檔案。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }
  2. 輸入下列命令建立 WooferLambdaRole

    aws iam create-role --role-name WooferLambdaRole \ --path "/service-role/" \ --assume-role-policy-document file://trust-relationship.json
  3. 使用下列內容建立名為 role-policy.json 的檔案。(替換region為您accountID的 AWS 地區和帳戶 ID。)

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "arn:aws:logs:region:accountID:*" }, { "Effect": "Allow", "Action": [ "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:ListStreams" ], "Resource": "arn:aws:dynamodb:region:accountID:table/BarkTable/stream/*" }, { "Effect": "Allow", "Action": [ "sns:Publish" ], "Resource": [ "*" ] } ] }

    政策有四個陳述式可讓 WooferLambdaRole 執行下列操作:

    • 執行 Lambda 函數 (publishNewBark)。您稍後要在本教學中建立此函數。

    • 訪問 Amazon CloudWatch 日誌。Lambda 函數會在執行時間將診斷檔寫入 CloudWatch 記錄檔。

    • BarkTable 的 DynamoDB 串流讀取資料。

    • 將訊息發佈到 Amazon SNS。

  4. 輸入下列命令將此政策連接至 WooferLambdaRole

    aws iam put-role-policy --role-name WooferLambdaRole \ --policy-name WooferLambdaRolePolicy \ --policy-document file://role-policy.json

步驟 3:建立 Amazon SNS 主題

在此步驟中,您要建立 Amazon SNS 主題 (wooferTopic),並用電子郵件地址訂閱此主題。您的 Lambda 函數會使用此主題發佈 Woofer 使用者的新 bark。

  1. 輸入以下命令來建立新的 Amazon SNS 主題。

    aws sns create-topic --name wooferTopic
  2. 輸入下列命令使用電子郵件地址訂閱 wooferTopic。(將 regionaccountID 更換為 AWS 區域和帳戶 ID,將 example@example.com 更換為有效的電子郵件地址。)

    aws sns subscribe \ --topic-arn arn:aws:sns:region:accountID:wooferTopic \ --protocol email \ --notification-endpoint example@example.com
  3. Amazon SNS 會將確認訊息傳送至電子郵件地址。選擇該郵件中的 Confirm subscription (確認訂閱) 連結,完成訂閱程序。

步驟 4:建立並測試 Lambda 函數

在此步驟中,您可以建立 AWS Lambda 函數 (publishNewBark) 來處理資料流記錄BarkTable

publishNewBark 函數只處理對應至 BarkTable 新項目的串流事件。此函數會讀取此種事件的資料,然後呼叫 Amazon SNS 來進行發佈。

  1. 使用下列內容建立名為 publishNewBark.js 的檔案。取代regionaccountID的 AWS 地區和帳戶 ID。

    'use strict'; var AWS = require("aws-sdk"); var sns = new AWS.SNS(); exports.handler = (event, context, callback) => { event.Records.forEach((record) => { console.log('Stream record: ', JSON.stringify(record, null, 2)); if (record.eventName == 'INSERT') { var who = JSON.stringify(record.dynamodb.NewImage.Username.S); var when = JSON.stringify(record.dynamodb.NewImage.Timestamp.S); var what = JSON.stringify(record.dynamodb.NewImage.Message.S); var params = { Subject: 'A new bark from ' + who, Message: 'Woofer user ' + who + ' barked the following at ' + when + ':\n\n ' + what, TopicArn: 'arn:aws:sns:region:accountID:wooferTopic' }; sns.publish(params, function(err, data) { if (err) { console.error("Unable to send message. Error JSON:", JSON.stringify(err, null, 2)); } else { console.log("Results from sending message: ", JSON.stringify(data, null, 2)); } }); } }); callback(null, `Successfully processed ${event.Records.length} records.`); };
  2. 建立 zip 檔以包含 publishNewBark.js。如果您有 zip 命令列公用程式,即可輸入下列命令執行此作業。

    zip publishNewBark.zip publishNewBark.js
  3. 當您建立 Lambda 函數時,您要為自己在 步驟 2:建立 Lambda 執行角色 中建立的 WooferLambdaRole 指定 Amazon Resource Name (ARN)。輸入下列命令以擷取此 ARN。

    aws iam get-role --role-name WooferLambdaRole

    在輸出中,尋找 WooferLambdaRole 的 ARN。

    ... "Arn": "arn:aws:iam::region:role/service-role/WooferLambdaRole" ...

    輸入下列命令建立 Lambda 函數。將 roleARN 取代為 WooferLambdaRole 的 ARN。

    aws lambda create-function \ --region region \ --function-name publishNewBark \ --zip-file fileb://publishNewBark.zip \ --role roleARN \ --handler publishNewBark.handler \ --timeout 5 \ --runtime nodejs16.x
  4. 現在,測試 publishNewBark 確認能否作用。若要執行此作業,您要提供類似 DynamoDB Streams 中真實紀錄的輸入。

    使用下列內容建立名為 payload.json 的檔案。accountID用您region的帳戶 ID 替換 AWS 區域 和。

    { "Records": [ { "eventID": "7de3041dd709b024af6f29e4fa13d34c", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "region", "dynamodb": { "ApproximateCreationDateTime": 1479499740, "Keys": { "Timestamp": { "S": "2016-11-18:12:09:36" }, "Username": { "S": "John Doe" } }, "NewImage": { "Timestamp": { "S": "2016-11-18:12:09:36" }, "Message": { "S": "This is a bark from the Woofer social network" }, "Username": { "S": "John Doe" } }, "SequenceNumber": "13021600000000001596893679", "SizeBytes": 112, "StreamViewType": "NEW_IMAGE" }, "eventSourceARN": "arn:aws:dynamodb:region:account ID:table/BarkTable/stream/2016-11-16T20:42:48.104" } ] }

    輸入下列命令測試 publishNewBark 函數。

    aws lambda invoke --function-name publishNewBark --payload file://payload.json --cli-binary-format raw-in-base64-out output.txt

    如果測試成功,您就會看到以下輸出。

    { "StatusCode": 200, "ExecutedVersion": "$LATEST" }

    此外,output.txt 檔案還會包含以下文字。

    "Successfully processed 1 records."

    您也會在幾分鐘內收到新的電子郵件訊息。

    注意

    AWS Lambda 將診斷資訊寫入 Amazon CloudWatch 日誌。如果 Lambda 函數發生問題,您可以使用這些診斷進行疑難排解:

    1. 請在以下位置開啟 CloudWatch 主控台。 https://console.aws.amazon.com/cloudwatch/

    2. 在導覽窗格中,選擇 Logs (日誌)。

    3. 選擇下列的日誌群組:/aws/lambda/publishNewBark

    4. 選擇最新的日誌串流,檢視函數的輸出 (和錯誤)。

步驟 5:建立並測試觸發器

步驟 4:建立並測試 Lambda 函數 中,您已測試過 Lambda 函數,確保其能正確執行。在此步驟中,透過將 Lambda 函數 (publishNewBark) 與事件來源 (BarkTable串流) 建立關聯來建立觸發條件

  1. 當您建立觸發時,您需要指定 BarkTable 串流的 ARN。輸入下列命令以擷取此 ARN。

    aws dynamodb describe-table --table-name BarkTable

    在輸出中,尋找 LatestStreamArn

    ... "LatestStreamArn": "arn:aws:dynamodb:region:accountID:table/BarkTable/stream/timestamp ...
  2. 輸入以下命令來建立觸發。將 streamARN 更換為實際的串流 ARN。

    aws lambda create-event-source-mapping \ --region region \ --function-name publishNewBark \ --event-source streamARN \ --batch-size 1 \ --starting-position TRIM_HORIZON
  3. 測試觸發。輸入下列命令將項目新增至 BarkTable

    aws dynamodb put-item \ --table-name BarkTable \ --item Username={S="Jane Doe"},Timestamp={S="2016-11-18:14:32:17"},Message={S="Testing...1...2...3"}

    您應該會在幾分鐘內收到新的電子郵件訊息。

  4. 開啟 DynamoDB 主控台,在 BarkTable 中多新增幾個項目。您必須指定 UsernameTimestamp 屬性的值 (您也應該指定 Message 的值,雖然它不是必要的)。您應該會因為每個新增至 BarkTable 的項目而收到新的電子郵件訊息。

    Lambda 函數只處理您新增至 BarkTable 的新項目。如果您要更新或刪除資料表中的項目,此函數不會執行任何作業。

注意

AWS Lambda 將診斷資訊寫入 Amazon CloudWatch 日誌。如果您的 Lambda 函數發生問題,您可以使用這些診斷進行疑難排解。

  1. 請在以下位置開啟 CloudWatch 主控台。 https://console.aws.amazon.com/cloudwatch/

  2. 在導覽窗格中,選擇 Logs (日誌)。

  3. 選擇下列的日誌群組:/aws/lambda/publishNewBark

  4. 選擇最新的日誌串流,檢視函數的輸出 (和錯誤)。