// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.// SPDX-License-Identifier: Apache-2.0using System.Text.Json;
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;
// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
namespace AWSLambda_DDB;
publicclassFunction{public StreamsEventResponse FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context){
context.Logger.LogInformation($"Beginning to process {dynamoEvent.Records.Count} records...");
List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new List<StreamsEventResponse.BatchItemFailure>();
StreamsEventResponse streamsEventResponse = newStreamsEventResponse();
foreach (var record in dynamoEvent.Records)
{try{
var sequenceNumber = record.Dynamodb.SequenceNumber;
context.Logger.LogInformation(sequenceNumber);
}
catch (Exception ex)
{
context.Logger.LogError(ex.Message);
batchItemFailures.Add(new StreamsEventResponse.BatchItemFailure() { ItemIdentifier = record.Dynamodb.SequenceNumber });
}
}
if (batchItemFailures.Count > 0)
{
streamsEventResponse.BatchItemFailures = batchItemFailures;
}
context.Logger.LogInformation("Stream processing complete.");
return streamsEventResponse;
}
}
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.// SPDX-License-Identifier: Apache-2.0import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import com.amazonaws.services.lambda.runtime.events.models.dynamodb.StreamRecord;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
publicclassProcessDynamodbRecordsimplementsRequestHandler<DynamodbEvent, Serializable> {@Overridepublic StreamsEventResponse handleRequest(DynamodbEvent input, Context context){
List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>();
String curRecordSequenceNumber = "";
for (DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord : input.getRecords()) {try{//Process your record
StreamRecord dynamodbRecord = dynamodbStreamRecord.getDynamodb();
curRecordSequenceNumber = dynamodbRecord.getSequenceNumber();
} catch (Exception e) {/* Since we are working with streams, we can return the failed item immediately.
Lambda will immediately begin to retry processing from this failed item onwards. */
batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber));
returnnew StreamsEventResponse(batchItemFailures);
}
}
returnnew StreamsEventResponse();
}
}
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.# SPDX-License-Identifier: Apache-2.0defhandler(event, context):
records = event.get("Records")
curRecordSequenceNumber = ""for record in records:
try:
# Process your record
curRecordSequenceNumber = record["dynamodb"]["SequenceNumber"]
except Exception as e:
# Return failed record's sequence numberreturn{"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]}
return{"batchItemFailures":[]}
use aws_lambda_events::{
event::dynamodb::{Event, EventRecord, StreamRecord},
streams::{DynamoDbBatchItemFailure, DynamoDbEventResponse},
};
use lambda_runtime::{run, service_fn, Error, LambdaEvent};
/// Process the stream recordfnprocess_record(record: &EventRecord) -> Result<(), Error> {let stream_record: &StreamRecord = &record.change;
// process your stream record here...
tracing::info!("Data: {:?}", stream_record);
Ok(())
}
/// Main Lambda handler here...asyncfnfunction_handler(event: LambdaEvent<Event>) -> Result<DynamoDbEventResponse, Error> {letmut response = DynamoDbEventResponse {
batch_item_failures: vec![],
};
let records = &event.payload.records;
if records.is_empty() {
tracing::info!("No records found. Exiting.");
returnOk(response);
}
for record in records {
tracing::info!("EventId: {}", record.event_id);
// Couldn't find a sequence numberif record.change.sequence_number.is_none() {
response.batch_item_failures.push(DynamoDbBatchItemFailure {
item_identifier: Some("".to_string()),
});
returnOk(response);
}
// Process your record here...if process_record(record).is_err() {
response.batch_item_failures.push(DynamoDbBatchItemFailure {
item_identifier: record.change.sequence_number.clone(),
});
/* Since we are working with streams, we can return the failed item immediately.
Lambda will immediately begin to retry processing from this failed item onwards. */returnOk(response);
}
}
tracing::info!("Successfully processed {} record(s)", records.len());
Ok(response)
}
#[tokio::main]asyncfnmain() -> Result<(), Error> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
// disable printing the name of the module in every log line.
.with_target(false)
// disabling time is handy because CloudWatch will add the ingestion time.
.without_time()
.init();
run(service_fn(function_handler)).await
}
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.// SPDX-License-Identifier: Apache-2.0using System.Text.Json;
using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.DynamoDBEvents;
// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]
namespace AWSLambda_DDB;
publicclassFunction{public StreamsEventResponse FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context){
context.Logger.LogInformation($"Beginning to process {dynamoEvent.Records.Count} records...");
List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new List<StreamsEventResponse.BatchItemFailure>();
StreamsEventResponse streamsEventResponse = newStreamsEventResponse();
foreach (var record in dynamoEvent.Records)
{try{
var sequenceNumber = record.Dynamodb.SequenceNumber;
context.Logger.LogInformation(sequenceNumber);
}
catch (Exception ex)
{
context.Logger.LogError(ex.Message);
batchItemFailures.Add(new StreamsEventResponse.BatchItemFailure() { ItemIdentifier = record.Dynamodb.SequenceNumber });
}
}
if (batchItemFailures.Count > 0)
{
streamsEventResponse.BatchItemFailures = batchItemFailures;
}
context.Logger.LogInformation("Stream processing complete.");
return streamsEventResponse;
}
}