LambdaFunctionProcessor

class aws_cdk.aws_kinesisfirehose.LambdaFunctionProcessor(lambda_function, *, buffer_interval=None, buffer_size=None, retries=None)

Bases: object

(experimental) Use an AWS Lambda function to transform records.

Stability

experimental

Example:

import os.path as path
import aws_cdk.aws_kinesisfirehose as firehose
import aws_cdk.aws_kms as kms
import aws_cdk.aws_lambda_nodejs as lambdanodejs
import aws_cdk.aws_logs as logs
import aws_cdk.aws_s3 as s3
import aws_cdk.core as cdk
import aws_cdk.aws_kinesisfirehose_destinations as destinations

app = cdk.App()

stack = cdk.Stack(app, "aws-cdk-firehose-delivery-stream-s3-all-properties")

bucket = s3.Bucket(stack, "Bucket",
    removal_policy=cdk.RemovalPolicy.DESTROY,
    auto_delete_objects=True
)

backup_bucket = s3.Bucket(stack, "BackupBucket",
    removal_policy=cdk.RemovalPolicy.DESTROY,
    auto_delete_objects=True
)
log_group = logs.LogGroup(stack, "LogGroup",
    removal_policy=cdk.RemovalPolicy.DESTROY
)

data_processor_function = lambdanodejs.NodejsFunction(stack, "DataProcessorFunction",
    entry=path.join(path.dirname(__file__), "lambda-data-processor.js"),
    timeout=cdk.Duration.minutes(1)
)

processor = firehose.LambdaFunctionProcessor(data_processor_function,
    buffer_interval=cdk.Duration.seconds(60),
    buffer_size=cdk.Size.mebibytes(1),
    retries=1
)

key = kms.Key(stack, "Key",
    removal_policy=cdk.RemovalPolicy.DESTROY
)

backup_key = kms.Key(stack, "BackupKey",
    removal_policy=cdk.RemovalPolicy.DESTROY
)

firehose.DeliveryStream(stack, "Delivery Stream",
    destinations=[destinations.S3Bucket(bucket,
        logging=True,
        log_group=log_group,
        processor=processor,
        compression=destinations.Compression.GZIP,
        data_output_prefix="regularPrefix",
        error_output_prefix="errorPrefix",
        buffering_interval=cdk.Duration.seconds(60),
        buffering_size=cdk.Size.mebibytes(1),
        encryption_key=key,
        s3_backup=destinations.DestinationS3BackupProps(
            mode=destinations.BackupMode.ALL,
            bucket=backup_bucket,
            compression=destinations.Compression.ZIP,
            data_output_prefix="backupPrefix",
            error_output_prefix="backupErrorPrefix",
            buffering_interval=cdk.Duration.seconds(60),
            buffering_size=cdk.Size.mebibytes(1),
            encryption_key=backup_key
        )
    )]
)

app.synth()
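
The example above points at lambda-data-processor.js but does not show it. As a rough illustration of what a transformation function does, here is a minimal sketch of the Kinesis Data Firehose record-transformation contract, written in Python to match the rest of this page. The added "processed" field and the choice to fail non-object payloads are assumptions made for illustration only. A Python handler like this would need a Python Lambda construct rather than NodejsFunction.

```python
import base64
import json


def handler(event, context):
    """Transform each Firehose record and report a per-record result."""
    output = []
    for record in event["records"]:
        try:
            # Firehose delivers each record's payload base64-encoded.
            payload = json.loads(base64.b64decode(record["data"]))
            if not isinstance(payload, dict):
                raise ValueError("expected a JSON object")
        except ValueError:
            # Hand the original data back and flag the record as failed.
            output.append({
                "recordId": record["recordId"],
                "result": "ProcessingFailed",
                "data": record["data"],
            })
            continue

        payload["processed"] = True  # hypothetical transformation
        encoded = base64.b64encode(
            (json.dumps(payload) + "\n").encode("utf-8")
        ).decode("ascii")
        output.append({
            "recordId": record["recordId"],  # must echo the incoming ID
            "result": "Ok",
            "data": encoded,
        })
    return {"records": output}
```

Every incoming recordId is returned exactly once, with a result of Ok, Dropped, or ProcessingFailed.
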
Parameters
  • lambda_function (IFunction) – The AWS Lambda function that Kinesis Data Firehose invokes to transform records.

  • buffer_interval (Optional[Duration]) – (experimental) The length of time Kinesis Data Firehose will buffer incoming data before calling the processor. Default: Duration.minutes(1)

  • buffer_size (Optional[Size]) – (experimental) The amount of incoming data Kinesis Data Firehose will buffer before calling the processor. Default: Size.mebibytes(3)

  • retries (Union[int, float, None]) – (experimental) The number of times Kinesis Data Firehose will retry the processor invocation after a failure due to network timeout or invocation limits. Default: 3
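
To make the three options concrete, the sketch below shows how they could map onto the processor parameters that the Kinesis Data Firehose API accepts (RoleArn, LambdaArn, BufferIntervalInSeconds, BufferSizeInMBs, NumberOfRetries). The helper name lambda_processor_parameters is hypothetical, and the mapping is an assumption about what ends up in the delivery stream configuration, not the construct's actual implementation. The defaults mirror the ones documented above.

```python
def lambda_processor_parameters(
    lambda_arn,
    role_arn,
    buffer_interval_seconds=60,  # documented default: Duration.minutes(1)
    buffer_size_mib=3,           # documented default: Size.mebibytes(3)
    retries=3,                   # documented default: 3
):
    """Build a Firehose-style processor entry (illustrative helper only)."""
    return {
        "Type": "Lambda",
        "Parameters": [
            {"ParameterName": "RoleArn", "ParameterValue": role_arn},
            {"ParameterName": "LambdaArn", "ParameterValue": lambda_arn},
            # The Firehose API expects parameter values as strings.
            {"ParameterName": "BufferIntervalInSeconds",
             "ParameterValue": str(buffer_interval_seconds)},
            {"ParameterName": "BufferSizeInMBs",
             "ParameterValue": str(buffer_size_mib)},
            {"ParameterName": "NumberOfRetries",
             "ParameterValue": str(retries)},
        ],
    }
```

Calling the helper with buffer_interval_seconds=60, buffer_size_mib=1, and retries=1 corresponds to the values passed to LambdaFunctionProcessor in the example on this page.
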

Methods

bind(_scope, *, role)

(experimental) Binds this processor to a destination of a delivery stream.

Implementers should use this method to grant processor invocation permissions to the provided stream and return the necessary configuration to register as a processor.

Parameters
  • _scope (Construct) –

  • role (IRole) – (experimental) The IAM role assumed by Kinesis Data Firehose to write to the destination that this DataProcessor will bind to.

Stability

experimental

Return type

DataProcessorConfig

Attributes

props

(experimental) The constructor props of the LambdaFunctionProcessor.

Stability

experimental

Return type

DataProcessorProps