本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
教學課程: AWS Lambda 與 Amazon DynamoDB 串流搭配使用
在此教學課程中,您建立 Lambda 函數以從 Amazon DynamoDB Streams 中取用事件。
必要條件
此教學課程假設您具備基本的 Lambda 操作知識並了解 Lambda 主控台。若您尚未了解,請遵循 使用主控台建立一個 Lambda 函數 中的指示,建立您的第一個 Lambda 函數。
若要完成下列步驟,您需要 AWS Command Line Interface (AWS CLI) 版本 2。命令和預期的輸出會列在不同的區塊中:
aws --version
您應該會看到下列輸出:
aws-cli/2.13.27 Python/3.11.6 Linux/4.14.328-248.540.amzn2.x86_64 exe/x86_64.amzn.2
對於長命令,逸出字元 (\
) 用於將命令分割為多行。
在 Linux 和 macOS 上,使用您偏好的 shell 和套件軟體管理工具。
在 Windows 中,作業系統的內建終端不支援您常與 Lambda 搭配使用的某些 Bash CLI 命令 (例如 zip
)。若要取得 Ubuntu 和 Bash 的 Windows 整合版本,請安裝適用於 Linux 的 Windows 子系統。本指南中的 CLI 命令範例使用 Linux 格式。如果您使用的是 Windows CLI,必須重新格式化包含內嵌 JSON 文件的命令。
建立執行角色
建立可授與函數存取 AWS 資源之權限的執行角色。
若要建立執行角色
-
在 IAM 主控台中開啟 角色頁面。
-
選擇 建立角色。
-
建立具備下列屬性的角色。
AWSLambdaDynamoDBExecutionRole具有函數從 DynamoDB 讀取項目並將記錄寫入記錄所需的權限。 CloudWatch
建立函數
建立可處理您事件的 Lambda 函數。函數程式碼會將部分傳入事件資料寫入 CloudWatch 記錄檔。
- .NET
-
- AWS SDK for .NET
-
還有更多關於 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。
使用 .NET 與 Lambda 一起使用 DynamoDB 事件。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using System.Text.Json;
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;
// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
namespace AWSLambda_DDB;
public class Function
{
public void FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context)
{
context.Logger.LogInformation($"Beginning to process {dynamoEvent.Records.Count} records...");
foreach (var record in dynamoEvent.Records)
{
context.Logger.LogInformation($"Event ID: {record.EventID}");
context.Logger.LogInformation($"Event Name: {record.EventName}");
context.Logger.LogInformation(JsonSerializer.Serialize(record));
}
context.Logger.LogInformation("Stream processing complete.");
}
}
- Go
-
- SDK for Go V2
-
還有更多關於 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。
使用 Go 使用與 Lambda 一起使用 DynamoDB 事件。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main
import (
"context"
"github.com/aws/aws-lambda-go/lambda"
"github.com/aws/aws-lambda-go/events"
"fmt"
)
func HandleRequest(ctx context.Context, event events.DynamoDBEvent) (*string, error) {
if len(event.Records) == 0 {
return nil, fmt.Errorf("received empty event")
}
for _, record := range event.Records {
LogDynamoDBRecord(record)
}
message := fmt.Sprintf("Records processed: %d", len(event.Records))
return &message, nil
}
func main() {
lambda.Start(HandleRequest)
}
func LogDynamoDBRecord(record events.DynamoDBEventRecord){
fmt.Println(record.EventID)
fmt.Println(record.EventName)
fmt.Printf("%+v\n", record.Change)
}
- Java
-
- 適用於 Java 2.x 的 SDK
-
還有更多關於 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。
使用 Java 與 Lambda 一起使用 DynamoDB 事件。
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent.DynamodbStreamRecord;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
public class example implements RequestHandler<DynamodbEvent, Void> {
private static final Gson GSON = new GsonBuilder().setPrettyPrinting().create();
@Override
public Void handleRequest(DynamodbEvent event, Context context) {
System.out.println(GSON.toJson(event));
event.getRecords().forEach(this::logDynamoDBRecord);
return null;
}
private void logDynamoDBRecord(DynamodbStreamRecord record) {
System.out.println(record.getEventID());
System.out.println(record.getEventName());
System.out.println("DynamoDB Record: " + GSON.toJson(record.getDynamodb()));
}
}
- JavaScript
-
- 適用於 JavaScript (v3) 的開發套件
-
還有更多關於 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。
使用使 Lambda. JavaScript
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
console.log(JSON.stringify(event, null, 2));
event.Records.forEach(record => {
logDynamoDBRecord(record);
});
};
const logDynamoDBRecord = (record) => {
console.log(record.eventID);
console.log(record.eventName);
console.log(`DynamoDB Record: ${JSON.stringify(record.dynamodb)}`);
};
使用使 Lambda. TypeScript
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
export const handler = async (event, context) => {
console.log(JSON.stringify(event, null, 2));
event.Records.forEach(record => {
logDynamoDBRecord(record);
});
}
const logDynamoDBRecord = (record) => {
console.log(record.eventID);
console.log(record.eventName);
console.log(`DynamoDB Record: ${JSON.stringify(record.dynamodb)}`);
};
- PHP
-
- 適用於 PHP 的開發套件
-
還有更多關於 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。
使用 PHP 與 Lambda 一起使用 DynamoDB 事件。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
<?php
# using bref/bref and bref/logger for simplicity
use Bref\Context\Context;
use Bref\Event\DynamoDb\DynamoDbEvent;
use Bref\Event\DynamoDb\DynamoDbHandler;
use Bref\Logger\StderrLogger;
require __DIR__ . '/vendor/autoload.php';
class Handler extends DynamoDbHandler
{
private StderrLogger $logger;
public function __construct(StderrLogger $logger)
{
$this->logger = $logger;
}
/**
* @throws JsonException
* @throws \Bref\Event\InvalidLambdaEvent
*/
public function handleDynamoDb(DynamoDbEvent $event, Context $context): void
{
$this->logger->info("Processing DynamoDb table items");
$records = $event->getRecords();
foreach ($records as $record) {
$eventName = $record->getEventName();
$keys = $record->getKeys();
$old = $record->getOldImage();
$new = $record->getNewImage();
$this->logger->info("Event Name:".$eventName."\n");
$this->logger->info("Keys:". json_encode($keys)."\n");
$this->logger->info("Old Image:". json_encode($old)."\n");
$this->logger->info("New Image:". json_encode($new));
// TODO: Do interesting work based on the new data
// Any exception thrown will be logged and the invocation will be marked as failed
}
$totalRecords = count($records);
$this->logger->info("Successfully processed $totalRecords items");
}
}
$logger = new StderrLogger();
return new Handler($logger);
- Python
-
- 適用於 Python (Boto3) 的 SDK
-
還有更多關於 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。
使用使用 Python 與 Lambda 一起使用動 DynamoDB 事件。
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import json
def lambda_handler(event, context):
print(json.dumps(event, indent=2))
for record in event['Records']:
log_dynamodb_record(record)
def log_dynamodb_record(record):
print(record['eventID'])
print(record['eventName'])
print(f"DynamoDB Record: {json.dumps(record['dynamodb'])}")
- Ruby
-
- 適用於 Ruby 的開發套件
-
還有更多關於 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。
使用紅寶石與 Lambda 一起使用 DynamoDB 事件。
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
def lambda_handler(event:, context:)
return 'received empty event' if event['Records'].empty?
event['Records'].each do |record|
log_dynamodb_record(record)
end
"Records processed: #{event['Records'].length}"
end
def log_dynamodb_record(record)
puts record['eventID']
puts record['eventName']
puts "DynamoDB Record: #{JSON.generate(record['dynamodb'])}"
end
- Rust
-
- 適用於 Rust 的 SDK
-
還有更多關於 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。
使用 Rust 與 Lambda 一起使用 DynamoDB 事件。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use lambda_runtime::{service_fn, tracing, Error, LambdaEvent};
use aws_lambda_events::{
event::dynamodb::{Event, EventRecord},
};
// Built with the following dependencies:
//lambda_runtime = "0.11.1"
//serde_json = "1.0"
//tokio = { version = "1", features = ["macros"] }
//tracing = { version = "0.1", features = ["log"] }
//tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] }
//aws_lambda_events = "0.15.0"
async fn function_handler(event: LambdaEvent<Event>) ->Result<(), Error> {
let records = &event.payload.records;
tracing::info!("event payload: {:?}",records);
if records.is_empty() {
tracing::info!("No records found. Exiting.");
return Ok(());
}
for record in records{
log_dynamo_dbrecord(record);
}
tracing::info!("Dynamo db records processed");
// Prepare the response
Ok(())
}
fn log_dynamo_dbrecord(record: &EventRecord)-> Result<(), Error>{
tracing::info!("EventId: {}", record.event_id);
tracing::info!("EventName: {}", record.event_name);
tracing::info!("DynamoDB Record: {:?}", record.change );
Ok(())
}
#[tokio::main]
async fn main() -> Result<(), Error> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.with_target(false)
.without_time()
.init();
let func = service_fn(function_handler);
lambda_runtime::run(func).await?;
Ok(())
}
建立函數
-
將範本程式碼複製到名為 example.js
的檔案。
-
建立部署套件。
zip function.zip example.js
-
使用 create-function
命令建立一個 Lambda 函數。
aws lambda create-function --function-name ProcessDynamoDBRecords \
--zip-file fileb://function.zip --handler example.handler --runtime nodejs18.x \
--role arn:aws:iam::111122223333
:role/lambda-dynamodb-role
測試 Lambda 函數
在此步驟中,您可以使用 invoke
AWS Lambda CLI 命令和下列 DynamoDB 事件範例手動叫用 Lambda 函數。將以下內容複製到名為的檔案中input.txt
。
範例 input.txt
{
"Records":[
{
"eventID":"1",
"eventName":"INSERT",
"eventVersion":"1.0",
"eventSource":"aws:dynamodb",
"awsRegion":"us-east-1",
"dynamodb":{
"Keys":{
"Id":{
"N":"101"
}
},
"NewImage":{
"Message":{
"S":"New item!"
},
"Id":{
"N":"101"
}
},
"SequenceNumber":"111",
"SizeBytes":26,
"StreamViewType":"NEW_AND_OLD_IMAGES"
},
"eventSourceARN":"stream-ARN"
},
{
"eventID":"2",
"eventName":"MODIFY",
"eventVersion":"1.0",
"eventSource":"aws:dynamodb",
"awsRegion":"us-east-1",
"dynamodb":{
"Keys":{
"Id":{
"N":"101"
}
},
"NewImage":{
"Message":{
"S":"This item has changed"
},
"Id":{
"N":"101"
}
},
"OldImage":{
"Message":{
"S":"New item!"
},
"Id":{
"N":"101"
}
},
"SequenceNumber":"222",
"SizeBytes":59,
"StreamViewType":"NEW_AND_OLD_IMAGES"
},
"eventSourceARN":"stream-ARN"
},
{
"eventID":"3",
"eventName":"REMOVE",
"eventVersion":"1.0",
"eventSource":"aws:dynamodb",
"awsRegion":"us-east-1",
"dynamodb":{
"Keys":{
"Id":{
"N":"101"
}
},
"OldImage":{
"Message":{
"S":"This item has changed"
},
"Id":{
"N":"101"
}
},
"SequenceNumber":"333",
"SizeBytes":38,
"StreamViewType":"NEW_AND_OLD_IMAGES"
},
"eventSourceARN":"stream-ARN"
}
]
}
執行以下 invoke
命令。
aws lambda invoke --function-name ProcessDynamoDBRecords \
--cli-binary-format raw-in-base64-out \
--payload file://input.txt outputfile.txt
如果您使用的是 AWS CLI 版本 2,則需要此cli-binary-format選項。若要讓此成為預設的設定,請執行 aws configure set cli-binary-format raw-in-base64-out
。若要取得更多資訊,請參閱《AWS Command Line Interface 使用者指南第 2 版》中 AWS CLI 支援的全域命令列選項。
該函數會在回應本文中傳回字串 message
訊息。
在 outputfile.txt
檔案中確認輸出。
建立啟用串流的 DynamoDB 資料表
建立啟用串流的 Amazon DynamoDB 資料表。
建立 DynamoDB 資料表
-
開啟 DynamoDB 主控台。
-
選擇 建立資料表 。
-
根據下列設定建立資料表。
-
選擇建立。
啟用串流
-
開啟 DynamoDB 主控台。
-
選擇 資料表。
-
選擇 lambda-dynamodb-stream 資料表。
-
在 匯出與串流 下,選擇 DynamoDB 串流詳細資訊 。
-
選擇 Turn on (開啟)。
-
針對「視景類型」,選擇「僅關鍵屬性」。
-
選擇 [開啟串流]。
寫下串流 ARN。在下個步驟將串流與您的 Lambda 函數相關聯時會需要用到它。如需啟用串流的詳細資訊,請參閱使用 DynamoDB Streams 擷取資料表活動。
在中新增事件來源 AWS Lambda
在 AWS Lambda中建立事件來源映射。事件來源映射可將 DynamoDB 串流與您的 Lambda 函數相關聯。建立此事件來源對應之後, AWS Lambda 開始輪詢串流。
執行下列 AWS CLI create-event-source-mapping
命令。命令執行後,請記下 UUID。使用任何命令時,您會需要此 UUID 以參考到事件來源映射,例如當刪除事件來源映射的時候。
aws lambda create-event-source-mapping --function-name ProcessDynamoDBRecords \
--batch-size 100 --starting-position LATEST --event-source DynamoDB-stream-arn
這會在指定的 DynamoDB 串流和 Lambda 函數之間建立映射。您可以將 DynamoDB 串流與多個 Lambda 函數相關聯,也可以將相同的 Lambda 函數與多個串流相關聯。但是 Lambda 函數會共用其所共有之串流的讀取傳送量。
您可以執行下列命令來取得事件來源映射的清單。
aws lambda list-event-source-mappings
該清單傳回所有您建立的事件來源映射,並針對每個映射顯示 LastProcessingResult
和其他內容。此欄位可用在發生問題時,提供資訊豐富的訊息。諸如 No records processed
(表示尚 AWS Lambda 未開始輪詢或串流中沒有記錄) 和 OK
(表示 AWS Lambda 成功從串流讀取記錄並叫用 Lambda 函數) 等值表示沒有問題。如果發生問題,您會收到錯誤訊息。
如果您有許多事件來源映射,請使用函數式稱參數來縮小結果。
aws lambda list-event-source-mappings --function-name ProcessDynamoDBRecords
測試設定
測試體 end-to-end 驗。當您更新資料表時,DynamoDB 會將事件記錄寫入串流。當 AWS Lambda 輪詢串流時,若偵測到串流上的新記錄,便會透過傳送事件到函數來代表您調用 Lambda 函數。
-
在 DynamoDB 主控台上針對資料表新增、更新、刪除項目。DynamoDB 會將這些動作的記錄寫入串流。
-
AWS Lambda 輪詢串流,當它偵測到串流的更新時,它會傳入它在串流中找到的事件資料來叫用 Lambda 函數。
-
您的函數運行並在 Amazon 創建日誌 CloudWatch。您可以驗證 Amazon CloudWatch 主控台中報告的日誌。
清除您的資源
除非您想要保留為此教學課程建立的資源,否則您現在便可刪除。刪除您不再使用的 AWS 資源,您可以避免不必要的費用 AWS 帳戶。
若要刪除 Lambda 函數
-
開啟 Lambda 主控台中的 函數頁面。
-
選擇您建立的函數。
-
選擇 Actions (動作)、Delete (刪除)。
-
在文字輸入欄位中輸入 delete
,然後選擇 刪除 。
刪除執行角色
-
開啟 IAM 主控台中的 角色頁面 。
-
選取您建立的執行角色。
-
選擇 刪除 。
-
在文字輸入欄位中輸入角色的名稱,然後選擇 刪除 。
若要刪除 DynamoDB 資料表
-
開啟 DynamoDB 主控台的 資料表頁面 。
-
選取您建立的資料表。
-
選擇 刪除 。
-
在文字方塊中輸入 delete
。
-
選擇 刪除資料表 。