// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
using System.Text;
using System.Text.Json.Serialization;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using AWS.Lambda.Powertools.Logging;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace KinesisIntegration;

public class Function
{
    // Powertools Logger requires an environment variable to be set on your function:
    // POWERTOOLS_SERVICE_NAME
    [Logging(LogEvent = true)]
    public async Task<StreamsEventResponse> FunctionHandler(KinesisEvent evnt, ILambdaContext context)
    {
        if (evnt.Records.Count == 0)
        {
            Logger.LogInformation("Empty Kinesis Event received");
            return new StreamsEventResponse();
        }

        foreach (var record in evnt.Records)
        {
            try
            {
                Logger.LogInformation($"Processed Event with EventId: {record.EventId}");
                string data = await GetRecordDataAsync(record.Kinesis, context);
                Logger.LogInformation($"Data: {data}");
                // TODO: Do interesting work based on the new data
            }
            catch (Exception ex)
            {
                Logger.LogError($"An error occurred {ex.Message}");
                /* Since we are working with streams, we can return the failed item immediately.
                   Lambda will immediately begin to retry processing from this failed item onwards. */
                return new StreamsEventResponse
                {
                    BatchItemFailures = new List<StreamsEventResponse.BatchItemFailure>
                    {
                        new StreamsEventResponse.BatchItemFailure { ItemIdentifier = record.Kinesis.SequenceNumber }
                    }
                };
            }
        }

        Logger.LogInformation($"Successfully processed {evnt.Records.Count} records.");
        return new StreamsEventResponse();
    }

    private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context)
    {
        byte[] bytes = record.Data.ToArray();
        string data = Encoding.UTF8.GetString(bytes);
        await Task.CompletedTask; // Placeholder for actual async work
        return data;
    }
}

public class StreamsEventResponse
{
    [JsonPropertyName("batchItemFailures")]
    public IList<BatchItemFailure> BatchItemFailures { get; set; }

    public class BatchItemFailure
    {
        [JsonPropertyName("itemIdentifier")]
        public string ItemIdentifier { get; set; }
    }
}
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main

import (
    "context"

    "github.com/aws/aws-lambda-go/events"
    "github.com/aws/aws-lambda-go/lambda"
)

func handler(ctx context.Context, kinesisEvent events.KinesisEvent) (map[string]interface{}, error) {
    batchItemFailures := []map[string]interface{}{}

    for _, record := range kinesisEvent.Records {
        curRecordSequenceNumber := ""

        // Process your record. If processing fails, capture its sequence number.
        recordProcessingFailed := false // placeholder for your record processing condition
        if recordProcessingFailed {
            curRecordSequenceNumber = record.Kinesis.SequenceNumber
        }

        // Check whether the record processing failed
        if curRecordSequenceNumber != "" {
            batchItemFailures = append(batchItemFailures, map[string]interface{}{"itemIdentifier": curRecordSequenceNumber})
        }
    }

    kinesisBatchResponse := map[string]interface{}{
        "batchItemFailures": batchItemFailures,
    }
    return kinesisBatchResponse, nil
}

func main() {
    lambda.Start(handler)
}
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class ProcessKinesisRecords implements RequestHandler<KinesisEvent, StreamsEventResponse> {

    @Override
    public StreamsEventResponse handleRequest(KinesisEvent input, Context context) {
        List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>();
        String curRecordSequenceNumber = "";

        for (KinesisEvent.KinesisEventRecord kinesisEventRecord : input.getRecords()) {
            try {
                // Process your record
                KinesisEvent.Record kinesisRecord = kinesisEventRecord.getKinesis();
                curRecordSequenceNumber = kinesisRecord.getSequenceNumber();
            } catch (Exception e) {
                /* Since we are working with streams, we can return the failed item immediately.
                   Lambda will immediately begin to retry processing from this failed item onwards. */
                batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber));
                return new StreamsEventResponse(batchItemFailures);
            }
        }

        return new StreamsEventResponse(batchItemFailures);
    }
}
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
  for (const record of event.Records) {
    try {
      console.log(`Processed Kinesis Event - EventID: ${record.eventID}`);
      const recordData = await getRecordDataAsync(record.kinesis);
      console.log(`Record Data: ${recordData}`);
      // TODO: Do interesting work based on the new data
    } catch (err) {
      console.error(`An error occurred ${err}`);
      /* Since we are working with streams, we can return the failed item immediately.
         Lambda will immediately begin to retry processing from this failed item onwards. */
      return {
        batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }],
      };
    }
  }
  console.log(`Successfully processed ${event.Records.length} records.`);
  return { batchItemFailures: [] };
};

async function getRecordDataAsync(payload) {
  const data = Buffer.from(payload.data, "base64").toString("utf-8");
  await Promise.resolve(1); // Placeholder for actual async work
  return data;
}
Reporting Kinesis batch item failures with Lambda using TypeScript.
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import {
  KinesisStreamEvent,
  Context,
  KinesisStreamHandler,
  KinesisStreamRecordPayload,
  KinesisStreamBatchResponse,
} from "aws-lambda";
import { Buffer } from "buffer";
import { Logger } from "@aws-lambda-powertools/logger";

const logger = new Logger({
  logLevel: "INFO",
  serviceName: "kinesis-stream-handler-sample",
});

export const functionHandler: KinesisStreamHandler = async (
  event: KinesisStreamEvent,
  context: Context
): Promise<KinesisStreamBatchResponse> => {
  for (const record of event.Records) {
    try {
      logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`);
      const recordData = await getRecordDataAsync(record.kinesis);
      logger.info(`Record Data: ${recordData}`);
      // TODO: Do interesting work based on the new data
    } catch (err) {
      logger.error(`An error occurred ${err}`);
      /* Since we are working with streams, we can return the failed item immediately.
         Lambda will immediately begin to retry processing from this failed item onwards. */
      return {
        batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }],
      };
    }
  }
  logger.info(`Successfully processed ${event.Records.length} records.`);
  return { batchItemFailures: [] };
};

async function getRecordDataAsync(
  payload: KinesisStreamRecordPayload
): Promise<string> {
  const data = Buffer.from(payload.data, "base64").toString("utf-8");
  await Promise.resolve(1); // Placeholder for actual async work
  return data;
}
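Because the TypeScript handler is exported, it can also be exercised locally before deployment to see the partial-batch response contract. The harness below is only a sketch: the ./index import path, the trimmed record, and the empty Context are assumptions, and the cast hides fields a real Kinesis record carries.

// Local harness for the handler above (hypothetical test file).
import { Context, KinesisStreamEvent } from "aws-lambda";
import { functionHandler } from "./index"; // assumed path to the handler module

const sampleEvent = {
  Records: [
    {
      eventID:
        "shardId-000000000000:49590338271490256608559692538361571095921575989136588898",
      kinesis: {
        // "Hello, this is a test." encoded as base64, the form Kinesis delivers
        data: "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
        partitionKey: "partitionKey-03",
        sequenceNumber: "49590338271490256608559692538361571095921575989136588898",
        kinesisSchemaVersion: "1.0",
        approximateArrivalTimestamp: 1607497475.0,
      },
    },
  ],
  // The cast papers over the remaining per-record fields a real event includes.
} as unknown as KinesisStreamEvent;

(async () => {
  const response = await functionHandler(sampleEvent, {} as Context, () => {});
  // Expect { batchItemFailures: [] } when the record decodes successfully.
  console.log("Handler response:", JSON.stringify(response));
})();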
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
def handler(event, context):
    records = event.get("Records")
    curRecordSequenceNumber = ""

    for record in records:
        try:
            # Process your record
            curRecordSequenceNumber = record["kinesis"]["sequenceNumber"]
        except Exception as e:
            # Return the failed record's sequence number
            return {"batchItemFailures": [{"itemIdentifier": curRecordSequenceNumber}]}

    return {"batchItemFailures": []}
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
require 'aws-sdk'
require 'base64'

def lambda_handler(event:, context:)
  batch_item_failures = []

  event['Records'].each do |record|
    begin
      puts "Processed Kinesis Event - EventID: #{record['eventID']}"
      record_data = get_record_data_async(record['kinesis'])
      puts "Record Data: #{record_data}"
      # TODO: Do interesting work based on the new data
    rescue StandardError => err
      puts "An error occurred #{err}"
      # Since we are working with streams, we can return the failed item immediately.
      # Lambda will immediately begin to retry processing from this failed item onwards.
      return { batchItemFailures: [{ itemIdentifier: record['kinesis']['sequenceNumber'] }] }
    end
  end

  puts "Successfully processed #{event['Records'].length} records."
  { batchItemFailures: batch_item_failures }
end

def get_record_data_async(payload)
  data = Base64.decode64(payload['data']).force_encoding('utf-8')
  # Placeholder for actual async work
  sleep(1)
  data
end
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use aws_lambda_events::{
    event::kinesis::KinesisEvent,
    kinesis::KinesisEventRecord,
    streams::{KinesisBatchItemFailure, KinesisEventResponse},
};
use lambda_runtime::{run, service_fn, Error, LambdaEvent};

async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<KinesisEventResponse, Error> {
    let mut response = KinesisEventResponse {
        batch_item_failures: vec![],
    };

    if event.payload.records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(response);
    }

    for record in &event.payload.records {
        tracing::info!(
            "EventId: {}",
            record.event_id.as_deref().unwrap_or_default()
        );

        let record_processing_result = process_record(record);

        if record_processing_result.is_err() {
            response.batch_item_failures.push(KinesisBatchItemFailure {
                item_identifier: record.kinesis.sequence_number.clone(),
            });
            /* Since we are working with streams, we can return the failed item immediately.
               Lambda will immediately begin to retry processing from this failed item onwards. */
            return Ok(response);
        }
    }

    tracing::info!(
        "Successfully processed {} records",
        event.payload.records.len()
    );

    Ok(response)
}

fn process_record(record: &KinesisEventRecord) -> Result<(), Error> {
    let record_data = std::str::from_utf8(record.kinesis.data.as_slice());

    if let Some(err) = record_data.err() {
        tracing::error!("Error: {}", err);
        return Err(Error::from(err));
    }

    let record_data = record_data.unwrap_or_default();

    // do something interesting with the data
    tracing::info!("Data: {}", record_data);

    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        // disable printing the name of the module in every log line.
        .with_target(false)
        // disabling time is handy because CloudWatch will add the ingestion time.
        .without_time()
        .init();

    run(service_fn(function_handler)).await
}
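The handlers in every example above only take effect if partial batch responses are enabled on the function's Kinesis event source mapping (the ReportBatchItemFailures response type); otherwise Lambda ignores the batchItemFailures field and treats the whole batch as a single success or failure. The sketch below shows one way to wire that up with the AWS CDK in TypeScript; the enableBatchItemFailures helper and the passed-in constructs are hypothetical and not part of the samples above.

import { IFunction, StartingPosition } from "aws-cdk-lib/aws-lambda";
import { KinesisEventSource } from "aws-cdk-lib/aws-lambda-event-sources";
import { IStream } from "aws-cdk-lib/aws-kinesis";

// Attach the stream to the function with partial batch responses enabled.
// Both constructs are assumed to be defined elsewhere in your stack.
export function enableBatchItemFailures(fn: IFunction, stream: IStream): void {
  fn.addEventSource(
    new KinesisEventSource(stream, {
      startingPosition: StartingPosition.LATEST, // or TRIM_HORIZON, depending on your needs
      batchSize: 100,
      // Without this, Lambda ignores the batchItemFailures field in the handler response.
      reportBatchItemFailures: true,
    })
  );
}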