本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
下列程式碼範例示範如何實作 Lambda 函數,以便接收在收到來自 Amazon MSK 叢集的訊息時觸發的事件。函數會擷取 MSK 承載並記下記錄內容。
- .NET
-
- AWS SDK for .NET
-
注意
GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例
儲存庫中設定和執行。 使用 .NET 搭配 Lambda 來取用 Amazon MSK 事件。
using System.Text; using Amazon.Lambda.Core; using Amazon.Lambda.KafkaEvents; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace MSKLambda; public class Function { /// <param name="input">The event for the Lambda function handler to process.</param> /// <param name="context">The ILambdaContext that provides methods for logging and describing the Lambda environment.</param> /// <returns></returns> public void FunctionHandler(KafkaEvent evnt, ILambdaContext context) { foreach (var record in evnt.Records) { Console.WriteLine("Key:" + record.Key); foreach (var eventRecord in record.Value) { var valueBytes = eventRecord.Value.ToArray(); var valueText = Encoding.UTF8.GetString(valueBytes); Console.WriteLine("Message:" + valueText); } } } }
- Go
-
- SDK for Go V2
-
注意
GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例
儲存庫中設定和執行。 使用 Go 搭配 Lambda 來取用 Amazon MSK 事件。
package main import ( "encoding/base64" "fmt" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) func handler(event events.KafkaEvent) { for key, records := range event.Records { fmt.Println("Key:", key) for _, record := range records { fmt.Println("Record:", record) decodedValue, _ := base64.StdEncoding.DecodeString(record.Value) message := string(decodedValue) fmt.Println("Message:", message) } } } func main() { lambda.Start(handler) }
- Java
-
- SDK for Java 2.x
-
注意
GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例
儲存庫中設定和執行。 使用 Java 搭配 Lambda 來取用 Amazon MSK 事件。
import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.KafkaEvent; import com.amazonaws.services.lambda.runtime.events.KafkaEvent.KafkaEventRecord; import java.util.Base64; import java.util.Map; public class Example implements RequestHandler<KafkaEvent, Void> { @Override public Void handleRequest(KafkaEvent event, Context context) { for (Map.Entry<String, java.util.List<KafkaEventRecord>> entry : event.getRecords().entrySet()) { String key = entry.getKey(); System.out.println("Key: " + key); for (KafkaEventRecord record : entry.getValue()) { System.out.println("Record: " + record); byte[] value = Base64.getDecoder().decode(record.getValue()); String message = new String(value); System.out.println("Message: " + message); } } return null; } }
- JavaScript
-
- SDK for JavaScript (v3)
-
注意
GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例
儲存庫中設定和執行。 使用 JavaScript 搭配 Lambda 來取用 Amazon MSK 事件。
exports.handler = async (event) => { // Iterate through keys for (let key in event.records) { console.log('Key: ', key) // Iterate through records event.records[key].map((record) => { console.log('Record: ', record) // Decode base64 const msg = Buffer.from(record.value, 'base64').toString() console.log('Message:', msg) }) } }
使用 TypeScript 搭配 Lambda 使用 Amazon MSK 事件。
import { MSKEvent, Context } from "aws-lambda"; import { Buffer } from "buffer"; import { Logger } from "@aws-lambda-powertools/logger"; const logger = new Logger({ logLevel: "INFO", serviceName: "msk-handler-sample", }); export const handler = async ( event: MSKEvent, context: Context ): Promise<void> => { for (const [topic, topicRecords] of Object.entries(event.records)) { logger.info(`Processing key: ${topic}`); // Process each record in the partition for (const record of topicRecords) { try { // Decode the message value from base64 const decodedMessage = Buffer.from(record.value, 'base64').toString(); logger.info({ message: decodedMessage }); } catch (error) { logger.error('Error processing event', { error }); throw error; } }; } }
- PHP
-
- SDK for PHP
-
注意
GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例
儲存庫中設定和執行。 使用 PHP 搭配 Lambda 來取用 Amazon MSK 事件。
<?php // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 // using bref/bref and bref/logger for simplicity use Bref\Context\Context; use Bref\Event\Kafka\KafkaEvent; use Bref\Event\Handler as StdHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler implements StdHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @throws JsonException * @throws \Bref\Event\InvalidLambdaEvent */ public function handle(mixed $event, Context $context): void { $kafkaEvent = new KafkaEvent($event); $this->logger->info("Processing records"); $records = $kafkaEvent->getRecords(); foreach ($records as $record) { try { $key = $record->getKey(); $this->logger->info("Key: $key"); $values = $record->getValue(); $this->logger->info(json_encode($values)); foreach ($values as $value) { $this->logger->info("Value: $value"); } } catch (Exception $e) { $this->logger->error($e->getMessage()); } } $totalRecords = count($records); $this->logger->info("Successfully processed $totalRecords records"); } } $logger = new StderrLogger(); return new Handler($logger);
- Python
-
- SDK for Python (Boto3)
-
注意
GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例
儲存庫中設定和執行。 使用 Python 搭配 Lambda 來取用 Amazon MSK 事件。
import base64 def lambda_handler(event, context): # Iterate through keys for key in event['records']: print('Key:', key) # Iterate through records for record in event['records'][key]: print('Record:', record) # Decode base64 msg = base64.b64decode(record['value']).decode('utf-8') print('Message:', msg)
- Ruby
-
- SDK for Ruby
-
注意
GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例
儲存庫中設定和執行。 使用 Ruby 搭配 Lambda 來取用 Amazon MSK 事件。
require 'base64' def lambda_handler(event:, context:) # Iterate through keys event['records'].each do |key, records| puts "Key: #{key}" # Iterate through records records.each do |record| puts "Record: #{record}" # Decode base64 msg = Base64.decode64(record['value']) puts "Message: #{msg}" end end end
- Rust
-
- SDK for Rust
-
注意
GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例
儲存庫中設定和執行。 使用 Rust 搭配 Lambda 使用 Amazon MSK 事件。
use aws_lambda_events::event::kafka::KafkaEvent; use lambda_runtime::{run, service_fn, tracing, Error, LambdaEvent}; use base64::prelude::*; use serde_json::{Value}; use tracing::{info}; /// Pre-Requisites: /// 1. Install Cargo Lambda - see https://www.cargo-lambda.info/guide/getting-started.html /// 2. Add packages tracing, tracing-subscriber, serde_json, base64 /// /// This is the main body for the function. /// Write your code inside it. /// There are some code example in the following URLs: /// - https://github.com/awslabs/aws-lambda-rust-runtime/tree/main/examples /// - https://github.com/aws-samples/serverless-rust-demo/ async fn function_handler(event: LambdaEvent<KafkaEvent>) -> Result<Value, Error> { let payload = event.payload.records; for (_name, records) in payload.iter() { for record in records { let record_text = record.value.as_ref().ok_or("Value is None")?; info!("Record: {}", &record_text); // perform Base64 decoding let record_bytes = BASE64_STANDARD.decode(record_text)?; let message = std::str::from_utf8(&record_bytes)?; info!("Message: {}", message); } } Ok(().into()) } #[tokio::main] async fn main() -> Result<(), Error> { // required to enable CloudWatch error logging by the runtime tracing::init_default_subscriber(); info!("Setup CW subscriber!"); run(service_fn(function_handler)).await }
- AWS SDK for .NET
-
注意
GitHub 上提供更多範例。尋找完整範例,並了解如何在無伺服器範例
儲存庫中設定和執行。 使用 .NET 搭配 Lambda 來取用 Amazon MSK 事件。
using System.Text; using Amazon.Lambda.Core; using Amazon.Lambda.KafkaEvents; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace MSKLambda; public class Function { /// <param name="input">The event for the Lambda function handler to process.</param> /// <param name="context">The ILambdaContext that provides methods for logging and describing the Lambda environment.</param> /// <returns></returns> public void FunctionHandler(KafkaEvent evnt, ILambdaContext context) { foreach (var record in evnt.Records) { Console.WriteLine("Key:" + record.Key); foreach (var eventRecord in record.Value) { var valueBytes = eventRecord.Value.ToArray(); var valueText = Encoding.UTF8.GetString(valueBytes); Console.WriteLine("Message:" + valueText); } } } }
如需 AWS SDK 開發人員指南和程式碼範例的完整清單,請參閱 搭配 AWS SDK 使用 Lambda。此主題也包含有關入門的資訊和舊版 SDK 的詳細資訊。