教學課程: AWS Lambda 與 Amazon DynamoDB 串流搭配使用 - AWS Lambda

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

教學課程: AWS Lambda 與 Amazon DynamoDB 串流搭配使用

在此教學課程中,您建立 Lambda 函數以從 Amazon DynamoDB Streams 中取用事件。

必要條件

此教學課程假設您具備基本的 Lambda 操作知識並了解 Lambda 主控台。若您尚未了解,請遵循 使用主控台建立一個 Lambda 函數 中的指示,建立您的第一個 Lambda 函數。

若要完成下列步驟,您需要 AWS Command Line Interface (AWS CLI) 版本 2。命令和預期的輸出會列在不同的區塊中:

aws --version

您應該會看到下列輸出:

aws-cli/2.13.27 Python/3.11.6 Linux/4.14.328-248.540.amzn2.x86_64 exe/x86_64.amzn.2

對於長命令,逸出字元 (\) 用於將命令分割為多行。

在 Linux 和 macOS 上,使用您偏好的 shell 和套件軟體管理工具。

注意

在 Windows 中,作業系統的內建終端不支援您常與 Lambda 搭配使用的某些 Bash CLI 命令 (例如 zip)。若要取得 Ubuntu 和 Bash 的 Windows 整合版本,請安裝適用於 Linux 的 Windows 子系統。本指南中的 CLI 命令範例使用 Linux 格式。如果您使用的是 Windows CLI,必須重新格式化包含內嵌 JSON 文件的命令。

建立執行角色

建立可授與函數存取 AWS 資源之權限的執行角色

若要建立執行角色
  1. 在 IAM 主控台中開啟 角色頁面

  2. 選擇 建立角色

  3. 建立具備下列屬性的角色。

    • 信任實體 - Lambda。

    • 權限AWSLambdaDynamoDBExecutionRole.

    • 角色名稱 - lambda-dynamodb-role

AWSLambdaDynamoDBExecutionRole具有函數從 DynamoDB 讀取項目並將記錄寫入記錄所需的權限。 CloudWatch

建立函數

建立可處理您事件的 Lambda 函數。函數程式碼會將部分傳入事件資料寫入 CloudWatch 記錄檔。

.NET
AWS SDK for .NET
注意

還有更多關於 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 .NET 與 Lambda 一起使用 DynamoDB 事件。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using System.Text.Json; using System.Text; using Amazon.Lambda.Core; using Amazon.Lambda.DynamoDBEvents; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace AWSLambda_DDB; public class Function { public void FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context) { context.Logger.LogInformation($"Beginning to process {dynamoEvent.Records.Count} records..."); foreach (var record in dynamoEvent.Records) { context.Logger.LogInformation($"Event ID: {record.EventID}"); context.Logger.LogInformation($"Event Name: {record.EventName}"); context.Logger.LogInformation(JsonSerializer.Serialize(record)); } context.Logger.LogInformation("Stream processing complete."); } }
Go
SDK for Go V2
注意

還有更多關於 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Go 使用與 Lambda 一起使用 DynamoDB 事件。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package main import ( "context" "github.com/aws/aws-lambda-go/lambda" "github.com/aws/aws-lambda-go/events" "fmt" ) func HandleRequest(ctx context.Context, event events.DynamoDBEvent) (*string, error) { if len(event.Records) == 0 { return nil, fmt.Errorf("received empty event") } for _, record := range event.Records { LogDynamoDBRecord(record) } message := fmt.Sprintf("Records processed: %d", len(event.Records)) return &message, nil } func main() { lambda.Start(HandleRequest) } func LogDynamoDBRecord(record events.DynamoDBEventRecord){ fmt.Println(record.EventID) fmt.Println(record.EventName) fmt.Printf("%+v\n", record.Change) }
Java
適用於 Java 2.x 的 SDK
注意

還有更多關於 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Java 與 Lambda 一起使用 DynamoDB 事件。

import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; import com.amazonaws.services.lambda.runtime.events.DynamodbEvent.DynamodbStreamRecord; import com.google.gson.Gson; import com.google.gson.GsonBuilder; public class example implements RequestHandler<DynamodbEvent, Void> { private static final Gson GSON = new GsonBuilder().setPrettyPrinting().create(); @Override public Void handleRequest(DynamodbEvent event, Context context) { System.out.println(GSON.toJson(event)); event.getRecords().forEach(this::logDynamoDBRecord); return null; } private void logDynamoDBRecord(DynamodbStreamRecord record) { System.out.println(record.getEventID()); System.out.println(record.getEventName()); System.out.println("DynamoDB Record: " + GSON.toJson(record.getDynamodb())); } }
JavaScript
適用於 JavaScript (v3) 的開發套件
注意

還有更多關於 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用使 Lambda. JavaScript

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 exports.handler = async (event, context) => { console.log(JSON.stringify(event, null, 2)); event.Records.forEach(record => { logDynamoDBRecord(record); }); }; const logDynamoDBRecord = (record) => { console.log(record.eventID); console.log(record.eventName); console.log(`DynamoDB Record: ${JSON.stringify(record.dynamodb)}`); };

使用使 Lambda. TypeScript

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 export const handler = async (event, context) => { console.log(JSON.stringify(event, null, 2)); event.Records.forEach(record => { logDynamoDBRecord(record); }); } const logDynamoDBRecord = (record) => { console.log(record.eventID); console.log(record.eventName); console.log(`DynamoDB Record: ${JSON.stringify(record.dynamodb)}`); };
PHP
適用於 PHP 的開發套件
注意

還有更多關於 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 PHP 與 Lambda 一起使用 DynamoDB 事件。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 <?php # using bref/bref and bref/logger for simplicity use Bref\Context\Context; use Bref\Event\DynamoDb\DynamoDbEvent; use Bref\Event\DynamoDb\DynamoDbHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler extends DynamoDbHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @throws JsonException * @throws \Bref\Event\InvalidLambdaEvent */ public function handleDynamoDb(DynamoDbEvent $event, Context $context): void { $this->logger->info("Processing DynamoDb table items"); $records = $event->getRecords(); foreach ($records as $record) { $eventName = $record->getEventName(); $keys = $record->getKeys(); $old = $record->getOldImage(); $new = $record->getNewImage(); $this->logger->info("Event Name:".$eventName."\n"); $this->logger->info("Keys:". json_encode($keys)."\n"); $this->logger->info("Old Image:". json_encode($old)."\n"); $this->logger->info("New Image:". json_encode($new)); // TODO: Do interesting work based on the new data // Any exception thrown will be logged and the invocation will be marked as failed } $totalRecords = count($records); $this->logger->info("Successfully processed $totalRecords items"); } } $logger = new StderrLogger(); return new Handler($logger);
Python
適用於 Python (Boto3) 的 SDK
注意

還有更多關於 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用使用 Python 與 Lambda 一起使用動 DynamoDB 事件。

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 import json def lambda_handler(event, context): print(json.dumps(event, indent=2)) for record in event['Records']: log_dynamodb_record(record) def log_dynamodb_record(record): print(record['eventID']) print(record['eventName']) print(f"DynamoDB Record: {json.dumps(record['dynamodb'])}")
Ruby
適用於 Ruby 的開發套件
注意

還有更多關於 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用紅寶石與 Lambda 一起使用 DynamoDB 事件。

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def lambda_handler(event:, context:) return 'received empty event' if event['Records'].empty? event['Records'].each do |record| log_dynamodb_record(record) end "Records processed: #{event['Records'].length}" end def log_dynamodb_record(record) puts record['eventID'] puts record['eventName'] puts "DynamoDB Record: #{JSON.generate(record['dynamodb'])}" end
Rust
適用於 Rust 的 SDK
注意

還有更多關於 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Rust 與 Lambda 一起使用 DynamoDB 事件。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use lambda_runtime::{service_fn, tracing, Error, LambdaEvent}; use aws_lambda_events::{ event::dynamodb::{Event, EventRecord}, }; // Built with the following dependencies: //lambda_runtime = "0.11.1" //serde_json = "1.0" //tokio = { version = "1", features = ["macros"] } //tracing = { version = "0.1", features = ["log"] } //tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] } //aws_lambda_events = "0.15.0" async fn function_handler(event: LambdaEvent<Event>) ->Result<(), Error> { let records = &event.payload.records; tracing::info!("event payload: {:?}",records); if records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(()); } for record in records{ log_dynamo_dbrecord(record); } tracing::info!("Dynamo db records processed"); // Prepare the response Ok(()) } fn log_dynamo_dbrecord(record: &EventRecord)-> Result<(), Error>{ tracing::info!("EventId: {}", record.event_id); tracing::info!("EventName: {}", record.event_name); tracing::info!("DynamoDB Record: {:?}", record.change ); Ok(()) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) .with_target(false) .without_time() .init(); let func = service_fn(function_handler); lambda_runtime::run(func).await?; Ok(()) }
建立函數
  1. 將範本程式碼複製到名為 example.js 的檔案。

  2. 建立部署套件。

    zip function.zip example.js
  3. 使用 create-function 命令建立一個 Lambda 函數。

    aws lambda create-function --function-name ProcessDynamoDBRecords \ --zip-file fileb://function.zip --handler example.handler --runtime nodejs18.x \ --role arn:aws:iam::111122223333:role/lambda-dynamodb-role

測試 Lambda 函數

在此步驟中,您可以使用 invoke AWS Lambda CLI 命令和下列 DynamoDB 事件範例手動叫用 Lambda 函數。將以下內容複製到名為的檔案中input.txt

範例 input.txt
{ "Records":[ { "eventID":"1", "eventName":"INSERT", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"111", "SizeBytes":26, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"2", "eventName":"MODIFY", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"222", "SizeBytes":59, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"3", "eventName":"REMOVE", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "SequenceNumber":"333", "SizeBytes":38, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" } ] }

執行以下 invoke 命令。

aws lambda invoke --function-name ProcessDynamoDBRecords \ --cli-binary-format raw-in-base64-out \ --payload file://input.txt outputfile.txt

如果您使用的是 AWS CLI 版本 2,則需要此cli-binary-format選項。若要讓此成為預設的設定,請執行 aws configure set cli-binary-format raw-in-base64-out。若要取得更多資訊,請參閱《AWS Command Line Interface 使用者指南第 2 版》AWS CLI 支援的全域命令列選項

該函數會在回應本文中傳回字串 message 訊息。

outputfile.txt 檔案中確認輸出。

建立啟用串流的 DynamoDB 資料表

建立啟用串流的 Amazon DynamoDB 資料表。

建立 DynamoDB 資料表
  1. 開啟 DynamoDB 主控台

  2. 選擇 建立資料表

  3. 根據下列設定建立資料表。

    • Table name (資料表名稱)lambda-dynamodb-stream

    • Primary key (主要金鑰) - id (字串)

  4. 選擇建立

啟用串流
  1. 開啟 DynamoDB 主控台

  2. 選擇 資料表

  3. 選擇 lambda-dynamodb-stream 資料表。

  4. 匯出與串流 下,選擇 DynamoDB 串流詳細資訊

  5. 選擇 Turn on (開啟)

  6. 針對「視景類型」,選擇「僅關鍵屬性」。

  7. 選擇 [開啟串流]。

寫下串流 ARN。在下個步驟將串流與您的 Lambda 函數相關聯時會需要用到它。如需啟用串流的詳細資訊,請參閱使用 DynamoDB Streams 擷取資料表活動

在中新增事件來源 AWS Lambda

在 AWS Lambda中建立事件來源映射。事件來源映射可將 DynamoDB 串流與您的 Lambda 函數相關聯。建立此事件來源對應之後, AWS Lambda 開始輪詢串流。

執行下列 AWS CLI create-event-source-mapping 命令。命令執行後,請記下 UUID。使用任何命令時,您會需要此 UUID 以參考到事件來源映射,例如當刪除事件來源映射的時候。

aws lambda create-event-source-mapping --function-name ProcessDynamoDBRecords \ --batch-size 100 --starting-position LATEST --event-source DynamoDB-stream-arn

這會在指定的 DynamoDB 串流和 Lambda 函數之間建立映射。您可以將 DynamoDB 串流與多個 Lambda 函數相關聯,也可以將相同的 Lambda 函數與多個串流相關聯。但是 Lambda 函數會共用其所共有之串流的讀取傳送量。

您可以執行下列命令來取得事件來源映射的清單。

aws lambda list-event-source-mappings

該清單傳回所有您建立的事件來源映射,並針對每個映射顯示 LastProcessingResult 和其他內容。此欄位可用在發生問題時,提供資訊豐富的訊息。諸如 No records processed (表示尚 AWS Lambda 未開始輪詢或串流中沒有記錄) 和 OK (表示 AWS Lambda 成功從串流讀取記錄並叫用 Lambda 函數) 等值表示沒有問題。如果發生問題,您會收到錯誤訊息。

如果您有許多事件來源映射,請使用函數式稱參數來縮小結果。

aws lambda list-event-source-mappings --function-name ProcessDynamoDBRecords

測試設定

測試體 end-to-end 驗。當您更新資料表時,DynamoDB 會將事件記錄寫入串流。當 AWS Lambda 輪詢串流時,若偵測到串流上的新記錄,便會透過傳送事件到函數來代表您調用 Lambda 函數。

  1. 在 DynamoDB 主控台上針對資料表新增、更新、刪除項目。DynamoDB 會將這些動作的記錄寫入串流。

  2. AWS Lambda 輪詢串流,當它偵測到串流的更新時,它會傳入它在串流中找到的事件資料來叫用 Lambda 函數。

  3. 您的函數運行並在 Amazon 創建日誌 CloudWatch。您可以驗證 Amazon CloudWatch 主控台中報告的日誌。

清除您的資源

除非您想要保留為此教學課程建立的資源,否則您現在便可刪除。刪除您不再使用的 AWS 資源,您可以避免不必要的費用 AWS 帳戶。

若要刪除 Lambda 函數
  1. 開啟 Lambda 主控台中的 函數頁面

  2. 選擇您建立的函數。

  3. 選擇 Actions (動作)、Delete (刪除)。

  4. 在文字輸入欄位中輸入 delete,然後選擇 刪除

刪除執行角色
  1. 開啟 IAM 主控台中的 角色頁面

  2. 選取您建立的執行角色。

  3. 選擇 刪除

  4. 在文字輸入欄位中輸入角色的名稱,然後選擇 刪除

若要刪除 DynamoDB 資料表
  1. 開啟 DynamoDB 主控台的 資料表頁面

  2. 選取您建立的資料表。

  3. 選擇 刪除

  4. 在文字方塊中輸入 delete

  5. 選擇 刪除資料表