Module aws_lambda_powertools.utilities.data_classes.kinesis_firehose_event
Classes
class KinesisFirehoseDataTransformationRecord (record_id: str, result: "Literal['Ok', 'Dropped', 'ProcessingFailed']" = 'Ok', data: str = '', metadata: KinesisFirehoseDataTransformationRecordMetadata | None = None, json_serializer: Callable = <function dumps>, json_deserializer: Callable = <function loads>)-
Record in Kinesis Data Firehose response object.
Parameters
record_id:str- uniquely identifies this record within the current batch
result:Literal["Ok", "Dropped", "ProcessingFailed"]- record data transformation status, whether it succeeded, should be dropped, or failed.
data:str-
base64-encoded payload, by default empty string.
Use
data_from_textordata_from_jsonmethods to convert data if needed. metadata:KinesisFirehoseDataTransformationRecordMetadata | None-
Metadata associated with this record; can contain partition keys.
See: https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
json_serializer:Callable- function to serialize
objto a JSON formattedstr, by default json.dumps json_deserializer:Callable- function to deserialize
str, bytes, bytearray containing a JSON document to a Python obj, by default json.loads
Documentation:
Expand source code
@dataclass(repr=False, order=False)
class KinesisFirehoseDataTransformationRecord:
    """Record in Kinesis Data Firehose response object.

    Parameters
    ----------
    record_id: str
        uniquely identifies this record within the current batch
    result: Literal["Ok", "Dropped", "ProcessingFailed"]
        record data transformation status, whether it succeeded,
        should be dropped, or failed.
    data: str
        base64-encoded payload, by default empty string.

        Use `data_from_text` or `data_from_json` methods to convert data if needed.
    metadata: KinesisFirehoseDataTransformationRecordMetadata | None
        Metadata associated with this record; can contain partition keys.

        See: https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
    json_serializer: Callable
        function to serialize `obj` to a JSON formatted `str`, by default json.dumps
    json_deserializer: Callable
        function to deserialize `str`, `bytes`, `bytearray` containing a JSON document
        to a Python `obj`, by default json.loads

    Documentation:
    --------------
    - https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html
    """

    _valid_result_types: ClassVar[tuple[str, str, str]] = ("Ok", "Dropped", "ProcessingFailed")

    record_id: str
    result: Literal["Ok", "Dropped", "ProcessingFailed"] = "Ok"
    data: str = ""
    metadata: KinesisFirehoseDataTransformationRecordMetadata | None = None
    json_serializer: Callable = json.dumps
    json_deserializer: Callable = json.loads

    def asdict(self) -> dict:
        """Serialize this record to the dict shape Firehose expects.

        Emits a warning (but does not fail) on an unexpected `result` value so a
        typo surfaces before Firehose rejects the whole batch.
        """
        if self.result not in self._valid_result_types:
            warnings.warn(
                stacklevel=1,
                message=f'The result "{self.result}" is not valid, Choose from "Ok", "Dropped", "ProcessingFailed"',
            )
        record: dict[str, Any] = {
            "recordId": self.record_id,
            "result": self.result,
            "data": self.data,
        }
        # metadata is optional; include it only when present
        if self.metadata:
            record["metadata"] = self.metadata.asdict()
        return record

    @property
    def data_as_bytes(self) -> bytes:
        """Decoded base64-encoded data as bytes"""
        if not self.data:
            return b""
        return base64.b64decode(self.data)

    @property
    def data_as_text(self) -> str:
        """Decoded base64-encoded data as text"""
        if not self.data:
            return ""
        return self.data_as_bytes.decode("utf-8")

    @cached_property
    def data_as_json(self) -> dict:
        """Decoded base64-encoded data loaded to json (cached after first access)"""
        if not self.data:
            return {}
        return self.json_deserializer(self.data_as_text)
var data : strvar metadata : KinesisFirehoseDataTransformationRecordMetadata | Nonevar record_id : strvar result : Literal['Ok', 'Dropped', 'ProcessingFailed']
Instance variables
prop data_as_bytes : bytes-
Decoded base64-encoded data as bytes
Expand source code
@property def data_as_bytes(self) -> bytes: """Decoded base64-encoded data as bytes""" if not self.data: return b"" return base64.b64decode(self.data) var data_as_json-
Decoded base64-encoded data loaded to json
Expand source code
def __get__(self, instance, owner=None): if instance is None: return self if self.attrname is None: raise TypeError( "Cannot use cached_property instance without calling __set_name__ on it.") try: cache = instance.__dict__ except AttributeError: # not all objects have __dict__ (e.g. class defines slots) msg = ( f"No '__dict__' attribute on {type(instance).__name__!r} " f"instance to cache {self.attrname!r} property." ) raise TypeError(msg) from None val = cache.get(self.attrname, _NOT_FOUND) if val is _NOT_FOUND: val = self.func(instance) try: cache[self.attrname] = val except TypeError: msg = ( f"The '__dict__' attribute on {type(instance).__name__!r} instance " f"does not support item assignment for caching {self.attrname!r} property." ) raise TypeError(msg) from None return val prop data_as_text : str-
Decoded base64-encoded data as text
Expand source code
@property def data_as_text(self) -> str: """Decoded base64-encoded data as text""" if not self.data: return "" return self.data_as_bytes.decode("utf-8")
Methods
def asdict(self) ‑> dictdef json_deserializer(s, *, cls=None, object_hook=None, parse_float=None, parse_int=None, parse_constant=None, object_pairs_hook=None, **kw)-
Deserialize
s(astr,bytesorbytearrayinstance containing a JSON document) to a Python object.object_hookis an optional function that will be called with the result of any object literal decode (adict). The return value ofobject_hookwill be used instead of thedict. This feature can be used to implement custom decoders (e.g. JSON-RPC class hinting).object_pairs_hookis an optional function that will be called with the result of any object literal decoded with an ordered list of pairs. The return value ofobject_pairs_hookwill be used instead of thedict. This feature can be used to implement custom decoders. Ifobject_hookis also defined, theobject_pairs_hooktakes priority.parse_float, if specified, will be called with the string of every JSON float to be decoded. By default this is equivalent to float(num_str). This can be used to use another datatype or parser for JSON floats (e.g. decimal.Decimal).parse_int, if specified, will be called with the string of every JSON int to be decoded. By default this is equivalent to int(num_str). This can be used to use another datatype or parser for JSON integers (e.g. float).parse_constant, if specified, will be called with one of the following strings: -Infinity, Infinity, NaN. This can be used to raise an exception if invalid JSON numbers are encountered.To use a custom
JSONDecodersubclass, specify it with theclskwarg; otherwiseJSONDecoderis used. def json_serializer(obj, *, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, cls=None, indent=None, separators=None, default=None, sort_keys=False, **kw)-
Serialize
objto a JSON formattedstr.If
skipkeysis true thendictkeys that are not basic types (str,int,float,bool,None) will be skipped instead of raising aTypeError.If
ensure_asciiis false, then the return value can contain non-ASCII characters if they appear in strings contained inobj. Otherwise, all such characters are escaped in JSON strings.If
check_circularis false, then the circular reference check for container types will be skipped and a circular reference will result in anRecursionError(or worse).If
allow_nanis false, then it will be aValueErrorto serialize out of rangefloatvalues (nan,inf,-inf) in strict compliance of the JSON specification, instead of using the JavaScript equivalents (NaN,Infinity,-Infinity).If
indentis a non-negative integer, then JSON array elements and object members will be pretty-printed with that indent level. An indent level of 0 will only insert newlines.Noneis the most compact representation.If specified,
separatorsshould be an(item_separator, key_separator)tuple. The default is(', ', ': ')if indent isNoneand(',', ': ')otherwise. To get the most compact JSON representation, you should specify(',', ':')to eliminate whitespace.default(obj)is a function that should return a serializable version of obj or raise TypeError. The default simply raises TypeError.If sort_keys is true (default:
False), then the output of dictionaries will be sorted by key.To use a custom
JSONEncodersubclass (e.g. one that overrides the.default()method to serialize additional types), specify it with theclskwarg; otherwiseJSONEncoderis used.
class KinesisFirehoseDataTransformationRecordMetadata (partition_keys: dict[str, str] = <factory>)-
Metadata in Firehose Data Transform Record.
Parameters
partition_keys:dict[str, str]- A dict of partition keys/value in string format, e.g.
{"year":"2023","month":"09"}
Documentation:
Expand source code
@dataclass(repr=False, order=False, frozen=True)
class KinesisFirehoseDataTransformationRecordMetadata:
    """
    Metadata in Firehose Data Transform Record.

    Parameters
    ----------
    partition_keys: dict[str, str]
        A dict of partition keys/value in string format, e.g. `{"year":"2023","month":"09"}`

    Documentation:
    --------------
    - https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
    """

    # `dict` itself is the idiomatic zero-argument factory; a lambda adds nothing.
    partition_keys: dict[str, str] = field(default_factory=dict)

    def asdict(self) -> dict:
        """Serialize to the dict shape Firehose expects for record metadata."""
        # Default is {} (never None), so this guard only matters if a caller
        # explicitly passes partition_keys=None.
        if self.partition_keys is not None:
            return {"partitionKeys": self.partition_keys}
        return {}
var partition_keys : dict[str, str]
Methods
def asdict(self) ‑> dict
class KinesisFirehoseDataTransformationResponse (records: list[KinesisFirehoseDataTransformationRecord] = <factory>)-
Kinesis Data Firehose response object
Documentation:
Parameters
records:list[KinesisFirehoseDataTransformationRecord]- records of Kinesis Data Firehose response object,
optional parameter at start. can be added later using
add_recordfunction.
Examples
Transforming data records
from aws_lambda_powertools.utilities.data_classes import ( KinesisFirehoseDataTransformationRecord, KinesisFirehoseDataTransformationResponse, KinesisFirehoseEvent, ) from aws_lambda_powertools.utilities.serialization import base64_from_json from aws_lambda_powertools.utilities.typing import LambdaContext def lambda_handler(event: dict, context: LambdaContext): firehose_event = KinesisFirehoseEvent(event) result = KinesisFirehoseDataTransformationResponse() for record in firehose_event.records: payload = record.data_as_text # base64 decoded data as str ## generate data to return transformed_data = {"tool_used": "powertools_dataclass", "original_payload": payload} processed_record = KinesisFirehoseDataTransformationRecord( record_id=record.record_id, data=base64_from_json(transformed_data), ) result.add_record(processed_record) # return transformed records return result.asdict()Expand source code
@dataclass(repr=False, order=False)
class KinesisFirehoseDataTransformationResponse:
    """Kinesis Data Firehose response object

    Documentation:
    --------------
    - https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html

    Parameters
    ----------
    records : list[KinesisFirehoseDataTransformationRecord]
        records of Kinesis Data Firehose response object,
        optional parameter at start. can be added later using `add_record` function.

    Examples
    --------

    **Transforming data records**

    ```python
    from aws_lambda_powertools.utilities.data_classes import (
        KinesisFirehoseDataTransformationRecord,
        KinesisFirehoseDataTransformationResponse,
        KinesisFirehoseEvent,
    )
    from aws_lambda_powertools.utilities.serialization import base64_from_json
    from aws_lambda_powertools.utilities.typing import LambdaContext


    def lambda_handler(event: dict, context: LambdaContext):
        firehose_event = KinesisFirehoseEvent(event)
        result = KinesisFirehoseDataTransformationResponse()

        for record in firehose_event.records:
            payload = record.data_as_text  # base64 decoded data as str

            ## generate data to return
            transformed_data = {"tool_used": "powertools_dataclass", "original_payload": payload}
            processed_record = KinesisFirehoseDataTransformationRecord(
                record_id=record.record_id,
                data=base64_from_json(transformed_data),
            )

            result.add_record(processed_record)

        # return transformed records
        return result.asdict()
    ```
    """

    records: list[KinesisFirehoseDataTransformationRecord] = field(default_factory=list)

    def add_record(self, record: KinesisFirehoseDataTransformationRecord):
        """Append one transformed record to this response."""
        self.records.append(record)

    def asdict(self) -> dict:
        """Serialize all records to the response dict; raises ValueError when empty."""
        serialized = [item.asdict() for item in self.records]
        if not serialized:
            raise ValueError("Amazon Kinesis Data Firehose doesn't accept empty response")
        return {"records": serialized}
var records : list[KinesisFirehoseDataTransformationRecord]
Methods
def add_record(self, record: KinesisFirehoseDataTransformationRecord)def asdict(self) ‑> dict
class KinesisFirehoseEvent (data: dict[str, Any], json_deserializer: Callable | None = None)-
Kinesis Data Firehose event
Documentation:
Parameters
data:dict[str, Any]- Lambda Event Source Event payload
json_deserializer:Callable, optional- function to deserialize
str,bytes,bytearraycontaining a JSON document to a Pythonobj, by default json.loads
Expand source code
class KinesisFirehoseEvent(DictWrapper):
    """Kinesis Data Firehose event

    Wraps the raw Lambda event dict for a Firehose data transformation
    invocation and exposes its top-level fields as typed properties.

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/services-kinesisfirehose.html
    """

    @property
    def invocation_id(self) -> str:
        """Unique ID for the Lambda invocation"""
        return self["invocationId"]

    @property
    def delivery_stream_arn(self) -> str:
        """ARN of the Kinesis Data Firehose Delivery Stream"""
        return self["deliveryStreamArn"]

    @property
    def source_kinesis_stream_arn(self) -> str | None:
        """ARN of the Kinesis Stream; present only when Kinesis Stream is source"""
        return self.get("sourceKinesisStreamArn")

    @property
    def region(self) -> str:
        """AWS region where the event originated eg: us-east-1"""
        return self["region"]

    @property
    def records(self) -> Iterator[KinesisFirehoseRecord]:
        """Yield each record in the batch wrapped as KinesisFirehoseRecord,
        propagating this event's JSON deserializer."""
        for record in self["records"]:
            yield KinesisFirehoseRecord(data=record, json_deserializer=self._json_deserializer)
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
prop delivery_stream_arn : str-
ARN of the Kinesis Data Firehose Delivery Stream
Expand source code
@property def delivery_stream_arn(self) -> str: """ARN of the Firehose Data Firehose Delivery Stream""" return self["deliveryStreamArn"] prop invocation_id : str-
Unique ID for the Lambda invocation
Expand source code
@property def invocation_id(self) -> str: """Unique ID for for Lambda invocation""" return self["invocationId"] prop records : Iterator[KinesisFirehoseRecord]-
Expand source code
@property def records(self) -> Iterator[KinesisFirehoseRecord]: for record in self["records"]: yield KinesisFirehoseRecord(data=record, json_deserializer=self._json_deserializer) prop region : str-
AWS region where the event originated eg: us-east-1
Expand source code
@property def region(self) -> str: """AWS region where the event originated eg: us-east-1""" return self["region"] prop source_kinesis_stream_arn : str | None-
ARN of the Kinesis Stream; present only when Kinesis Stream is source
Expand source code
@property def source_kinesis_stream_arn(self) -> str | None: """ARN of the Kinesis Stream; present only when Kinesis Stream is source""" return self.get("sourceKinesisStreamArn")
Inherited members
class KinesisFirehoseRecord (data: dict[str, Any], json_deserializer: Callable | None = None)-
Provides a single read only access to a wrapper dict
Parameters
data:dict[str, Any]- Lambda Event Source Event payload
json_deserializer:Callable, optional- function to deserialize
str,bytes,bytearraycontaining a JSON document to a Pythonobj, by default json.loads
Expand source code
class KinesisFirehoseRecord(DictWrapper):
    """A single record of a Kinesis Data Firehose event, with base64/JSON decode helpers."""

    @property
    def approximate_arrival_timestamp(self) -> int:
        """The approximate time that the record was inserted into the delivery stream"""
        return self["approximateArrivalTimestamp"]

    @property
    def record_id(self) -> str:
        """Record ID; uniquely identifies this record within the current batch"""
        return self["recordId"]

    @property
    def data(self) -> str:
        """The data blob, base64-encoded"""
        return self["data"]

    @property
    def metadata(self) -> KinesisFirehoseRecordMetadata | None:
        """Optional: metadata associated with this record; present only when Kinesis Stream is source"""
        # NOTE: the whole record dict (self._data) is passed down; the metadata
        # wrapper reads the nested "kinesisRecordMetadata" key itself.
        return KinesisFirehoseRecordMetadata(self._data) if self.get("kinesisRecordMetadata") else None

    @property
    def data_as_bytes(self) -> bytes:
        """Decoded base64-encoded data as bytes"""
        return base64.b64decode(self.data)

    @property
    def data_as_text(self) -> str:
        """Decoded base64-encoded data as text"""
        return self.data_as_bytes.decode("utf-8")

    @cached_property
    def data_as_json(self) -> dict:
        """Decoded base64-encoded data loaded to json (cached after first access)"""
        return self._json_deserializer(self.data_as_text)

    def build_data_transformation_response(
        self,
        result: Literal["Ok", "Dropped", "ProcessingFailed"] = "Ok",
        data: str = "",
        metadata: KinesisFirehoseDataTransformationRecordMetadata | None = None,
    ) -> KinesisFirehoseDataTransformationRecord:
        """Create a KinesisFirehoseDataTransformationRecord directly using the record_id and given values

        Parameters
        ----------
        result : Literal["Ok", "Dropped", "ProcessingFailed"]
            processing result, supported value: Ok, Dropped, ProcessingFailed
        data : str, optional
            data blob, base64-encoded, optional at init. Allows pass in base64-encoded data directly or
            use either function like `data_from_text`, `data_from_json` to populate data
        metadata: KinesisFirehoseDataTransformationRecordMetadata, optional
            Metadata associated with this record; can contain partition keys
            - https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
        """
        return KinesisFirehoseDataTransformationRecord(
            record_id=self.record_id,
            result=result,
            data=data,
            metadata=metadata,
        )
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
prop approximate_arrival_timestamp : int-
The approximate time that the record was inserted into the delivery stream
Expand source code
@property def approximate_arrival_timestamp(self) -> int: """The approximate time that the record was inserted into the delivery stream""" return self["approximateArrivalTimestamp"] prop data : str-
The data blob, base64-encoded
Expand source code
@property def data(self) -> str: """The data blob, base64-encoded""" return self["data"] prop data_as_bytes : bytes-
Decoded base64-encoded data as bytes
Expand source code
@property def data_as_bytes(self) -> bytes: """Decoded base64-encoded data as bytes""" return base64.b64decode(self.data) var data_as_json-
Decoded base64-encoded data loaded to json
Expand source code
def __get__(self, instance, owner=None): if instance is None: return self if self.attrname is None: raise TypeError( "Cannot use cached_property instance without calling __set_name__ on it.") try: cache = instance.__dict__ except AttributeError: # not all objects have __dict__ (e.g. class defines slots) msg = ( f"No '__dict__' attribute on {type(instance).__name__!r} " f"instance to cache {self.attrname!r} property." ) raise TypeError(msg) from None val = cache.get(self.attrname, _NOT_FOUND) if val is _NOT_FOUND: val = self.func(instance) try: cache[self.attrname] = val except TypeError: msg = ( f"The '__dict__' attribute on {type(instance).__name__!r} instance " f"does not support item assignment for caching {self.attrname!r} property." ) raise TypeError(msg) from None return val prop data_as_text : str-
Decoded base64-encoded data as text
Expand source code
@property def data_as_text(self) -> str: """Decoded base64-encoded data as text""" return self.data_as_bytes.decode("utf-8") prop metadata : KinesisFirehoseRecordMetadata | None-
Optional: metadata associated with this record; present only when Kinesis Stream is source
Expand source code
@property def metadata(self) -> KinesisFirehoseRecordMetadata | None: """Optional: metadata associated with this record; present only when Kinesis Stream is source""" return KinesisFirehoseRecordMetadata(self._data) if self.get("kinesisRecordMetadata") else None prop record_id : str-
Record ID; uniquely identifies this record within the current batch
Expand source code
@property def record_id(self) -> str: """Record ID; uniquely identifies this record within the current batch""" return self["recordId"]
Methods
def build_data_transformation_response(self, result: "Literal['Ok', 'Dropped', 'ProcessingFailed']" = 'Ok', data: str = '', metadata: KinesisFirehoseDataTransformationRecordMetadata | None = None)-
Create a KinesisFirehoseDataTransformationRecord directly using the record_id and given values
Parameters
result:Literal["Ok", "Dropped", "ProcessingFailed"]- processing result, supported value: Ok, Dropped, ProcessingFailed
data:str, optional- data blob, base64-encoded, optional at init. Allows pass in base64-encoded data directly or
use either function like
data_from_text,data_from_jsonto populate data metadata:KinesisFirehoseDataTransformationRecordMetadata, optional- Metadata associated with this record; can contain partition keys - https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
Inherited members
class KinesisFirehoseRecordMetadata (data: dict[str, Any], json_deserializer: Callable | None = None)-
Provides a single read only access to a wrapper dict
Parameters
data:dict[str, Any]- Lambda Event Source Event payload
json_deserializer:Callable, optional- function to deserialize
str,bytes,bytearraycontaining a JSON document to a Pythonobj, by default json.loads
Expand source code
class KinesisFirehoseRecordMetadata(DictWrapper):
    """Kinesis Stream metadata attached to a Firehose record.

    Present only when the delivery stream's source is a Kinesis Stream; every
    property reads from the nested "kinesisRecordMetadata" dict.
    """

    @property
    def _metadata(self) -> dict:
        """Optional: metadata associated with this record; present only when Kinesis Stream is source"""
        return self["kinesisRecordMetadata"]  # could raise KeyError

    @property
    def shard_id(self) -> str:
        """Kinesis stream shard ID; present only when Kinesis Stream is source"""
        return self._metadata["shardId"]

    @property
    def partition_key(self) -> str:
        """Kinesis stream partition key; present only when Kinesis Stream is source"""
        return self._metadata["partitionKey"]

    @property
    def approximate_arrival_timestamp(self) -> int:
        """Kinesis stream approximate arrival ISO timestamp; present only when Kinesis Stream is source"""
        return self._metadata["approximateArrivalTimestamp"]

    @property
    def sequence_number(self) -> str:
        """Kinesis stream sequence number; present only when Kinesis Stream is source"""
        return self._metadata["sequenceNumber"]

    @property
    def subsequence_number(self) -> int:
        """Kinesis stream sub-sequence number; present only when Kinesis Stream is source

        Note: this will only be present for Kinesis streams using record aggregation
        """
        return self._metadata["subsequenceNumber"]
- DictWrapper
- collections.abc.Mapping
- collections.abc.Collection
- collections.abc.Sized
- collections.abc.Iterable
- collections.abc.Container
- typing.Generic
Instance variables
prop approximate_arrival_timestamp : int-
Kinesis stream approximate arrival ISO timestamp; present only when Kinesis Stream is source
Expand source code
@property def approximate_arrival_timestamp(self) -> int: """Kinesis stream approximate arrival ISO timestamp; present only when Kinesis Stream is source""" return self._metadata["approximateArrivalTimestamp"] prop partition_key : str-
Kinesis stream partition key; present only when Kinesis Stream is source
Expand source code
@property def partition_key(self) -> str: """Kinesis stream partition key; present only when Kinesis Stream is source""" return self._metadata["partitionKey"] prop sequence_number : str-
Kinesis stream sequence number; present only when Kinesis Stream is source
Expand source code
@property def sequence_number(self) -> str: """Kinesis stream sequence number; present only when Kinesis Stream is source""" return self._metadata["sequenceNumber"] prop shard_id : str-
Kinesis stream shard ID; present only when Kinesis Stream is source
Expand source code
@property def shard_id(self) -> str: """Kinesis stream shard ID; present only when Kinesis Stream is source""" return self._metadata["shardId"] prop subsequence_number : int-
Kinesis stream sub-sequence number; present only when Kinesis Stream is source
Note: this will only be present for Kinesis streams using record aggregation
Expand source code
@property def subsequence_number(self) -> int: """Kinesis stream sub-sequence number; present only when Kinesis Stream is source Note: this will only be present for Kinesis streams using record aggregation """ return self._metadata["subsequenceNumber"]
Inherited members