// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.// SPDX-License-Identifier: Apache-2.0
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using AWS.Lambda.Powertools.Logging;
// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
namespace KinesisIntegrationSampleCode;
public class Function
{// Powertools Logger requires an environment variables against your function// POWERTOOLS_SERVICE_NAME
[Logging(LogEvent = true)]
publicasync Task FunctionHandler(KinesisEvent evnt, ILambdaContext context){if (evnt.Records.Count == 0)
{
Logger.LogInformation("Empty Kinesis Event received");
return;
}
foreach (var record in evnt.Records)
{try{
Logger.LogInformation($"Processed Event with EventId: {record.EventId}");
string data = await GetRecordDataAsync(record.Kinesis, context);
Logger.LogInformation($"Data: {data}");
// TODO: Do interesting work based on the new data
}
catch (Exception ex)
{
Logger.LogError($"An error occurred {ex.Message}");
throw;
}
}
Logger.LogInformation($"Successfully processed {evnt.Records.Count} records.");
}
privateasync Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context){
byte[] bytes = record.Data.ToArray();
string data = Encoding.UTF8.GetString(bytes);
await Task.CompletedTask; //Placeholder for actual async workreturn data;
}
}
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.// SPDX-License-Identifier: Apache-2.0package main
import (
"context""log""github.com/aws/aws-lambda-go/events""github.com/aws/aws-lambda-go/lambda"
)
funchandler(ctx context.Context, kinesisEvent events.KinesisEvent)error{iflen(kinesisEvent.Records) == 0{
log.Printf("empty Kinesis event received")
returnnil
}
for _, record := range kinesisEvent.Records {
log.Printf("processed Kinesis event with EventId: %v", record.EventID)
recordDataBytes := record.Kinesis.Data
recordDataText := string(recordDataBytes)
log.Printf("record data: %v", recordDataText)
// TODO: Do interesting work based on the new data
}
log.Printf("successfully processed %v records", len(kinesisEvent.Records))
returnnil
}
funcmain(){
lambda.Start(handler)
}
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.// SPDX-License-Identifier: Apache-2.0package example;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
publicclassHandlerimplementsRequestHandler<KinesisEvent, Void> {@Overridepublic Void handleRequest(final KinesisEvent event, final Context context){
LambdaLogger logger = context.getLogger();
if (event.getRecords().isEmpty()) {
logger.log("Empty Kinesis Event received");
returnnull;
}
for (KinesisEvent.KinesisEventRecord record : event.getRecords()) {try{
logger.log("Processed Event with EventId: "+record.getEventID());
String data = new String(record.getKinesis().getData().array());
logger.log("Data:"+ data);
// TODO: Do interesting work based on the new data
}
catch (Exception ex) {
logger.log("An error occurred:"+ex.getMessage());
throw ex;
}
}
logger.log("Successfully processed:"+event.getRecords().size()+" records");
returnnull;
}
}
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.// SPDX-License-Identifier: Apache-2.0exports.handler = async (event, context) => {for (const record of event.Records) {try{console.log(`Processed Kinesis Event - EventID: ${record.eventID}`);
const recordData = await getRecordDataAsync(record.kinesis);
console.log(`Record Data: ${recordData}`);
// TODO: Do interesting work based on the new data
} catch (err) {console.error(`An error occurred ${err}`);
throw err;
}
}
console.log(`Successfully processed ${event.Records.length} records.`);
};
asyncfunctiongetRecordDataAsync(payload) {var data = Buffer.from(payload.data, "base64").toString("utf-8");
awaitPromise.resolve(1); //Placeholder for actual async workreturn data;
}
使用 Lambda 消耗 Kinesis 事件。 TypeScript
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.// SPDX-License-Identifier: Apache-2.0import{
KinesisStreamEvent,
Context,
KinesisStreamHandler,
KinesisStreamRecordPayload,
} from"aws-lambda";
import{ Buffer } from"buffer";
import{ Logger } from"@aws-lambda-powertools/logger";
const logger = new Logger({logLevel: "INFO",
serviceName: "kinesis-stream-handler-sample",
});
exportconst functionHandler: KinesisStreamHandler = async (
event: KinesisStreamEvent,
context: Context
): Promise<void> => {for (const record of event.Records) {try{
logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`);
const recordData = await getRecordDataAsync(record.kinesis);
logger.info(`Record Data: ${recordData}`);
// TODO: Do interesting work based on the new data
} catch (err) {
logger.error(`An error occurred ${err}`);
throw err;
}
logger.info(`Successfully processed ${event.Records.length} records.`);
}
};
asyncfunctiongetRecordDataAsync(
payload: KinesisStreamRecordPayload
): Promise<string> {var data = Buffer.from(payload.data, "base64").toString("utf-8");
awaitPromise.resolve(1); //Placeholder for actual async workreturn data;
}
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.// SPDX-License-Identifier: Apache-2.0<?php# using bref/bref and bref/logger for simplicityuseBref\Context\Context;
useBref\Event\Kinesis\KinesisEvent;
useBref\Event\Kinesis\KinesisHandler;
useBref\Logger\StderrLogger;
require__DIR__ . '/vendor/autoload.php';
classHandlerextendsKinesisHandler{private StderrLogger $logger;
publicfunction__construct(StderrLogger $logger)
{$this->logger = $logger;
}
/**
* @throws JsonException
* @throws \Bref\Event\InvalidLambdaEvent
*/publicfunctionhandleKinesis(KinesisEvent $event, Context $context): void{$this->logger->info("Processing records");
$records = $event->getRecords();
foreach ($recordsas$record) {$data = $record->getData();
$this->logger->info(json_encode($data));
// TODO: Do interesting work based on the new data// Any exception thrown will be logged and the invocation will be marked as failed
}
$totalRecords = count($records);
$this->logger->info("Successfully processed $totalRecords records");
}
}
$logger = new StderrLogger();
returnnew Handler($logger);
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.# SPDX-License-Identifier: Apache-2.0import base64
deflambda_handler(event, context):for record in event['Records']:
try:
print(f"Processed Kinesis Event - EventID: {record['eventID']}")
record_data = base64.b64decode(record['kinesis']['data']).decode('utf-8')
print(f"Record Data: {record_data}")
# TODO: Do interesting work based on the new dataexcept Exception as e:
print(f"An error occurred {e}")
raise e
print(f"Successfully processed {len(event['Records'])} records.")
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.# SPDX-License-Identifier: Apache-2.0require'aws-sdk'deflambda_handler(event:, context:)
event['Records'].each do|record|begin
puts "Processed Kinesis Event - EventID: #{record['eventID']}"
record_data = get_record_data_async(record['kinesis'])
puts "Record Data: #{record_data}"# TODO: Do interesting work based on the new datarescue => err
$stderr.puts "An error occurred #{err}"
raise err
endend
puts "Successfully processed #{event['Records'].length} records."enddefget_record_data_async(payload)
data = Base64.decode64(payload['data']).force_encoding('UTF-8')
# Placeholder for actual async work# You can use Ruby's asynchronous programming tools like async/await or fibers here.return data
end
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.// SPDX-License-Identifier: Apache-2.0use aws_lambda_events::event::kinesis::KinesisEvent;
use lambda_runtime::{run, service_fn, Error, LambdaEvent};
asyncfnfunction_handler(event: LambdaEvent<KinesisEvent>) -> Result<(), Error> {if event.payload.records.is_empty() {
tracing::info!("No records found. Exiting.");
returnOk(());
}
event.payload.records.iter().for_each(|record| {
tracing::info!("EventId: {}",record.event_id.as_deref().unwrap_or_default());
let record_data = std::str::from_utf8(&record.kinesis.data);
match record_data {Ok(data) => {// log the record data
tracing::info!("Data: {}", data);
}
Err(e) => {
tracing::error!("Error: {}", e);
}
}
});
tracing::info!(
"Successfully processed {} records",
event.payload.records.len()
);
Ok(())
}
#[tokio::main]asyncfnmain() -> Result<(), Error> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
// disable printing the name of the module in every log line.
.with_target(false)
// disabling time is handy because CloudWatch will add the ingestion time.
.without_time()
.init();
run(service_fn(function_handler)).await
}
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.// SPDX-License-Identifier: Apache-2.0
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using AWS.Lambda.Powertools.Logging;
// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
namespace KinesisIntegrationSampleCode;
public class Function
{// Powertools Logger requires an environment variables against your function// POWERTOOLS_SERVICE_NAME
[Logging(LogEvent = true)]
publicasync Task FunctionHandler(KinesisEvent evnt, ILambdaContext context){if (evnt.Records.Count == 0)
{
Logger.LogInformation("Empty Kinesis Event received");
return;
}
foreach (var record in evnt.Records)
{try{
Logger.LogInformation($"Processed Event with EventId: {record.EventId}");
string data = await GetRecordDataAsync(record.Kinesis, context);
Logger.LogInformation($"Data: {data}");
// TODO: Do interesting work based on the new data
}
catch (Exception ex)
{
Logger.LogError($"An error occurred {ex.Message}");
throw;
}
}
Logger.LogInformation($"Successfully processed {evnt.Records.Count} records.");
}
privateasync Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context){
byte[] bytes = record.Data.ToArray();
string data = Encoding.UTF8.GetString(bytes);
await Task.CompletedTask; //Placeholder for actual async workreturn data;
}
}