Module aws_lambda_powertools.utilities.parser.models.kinesis_firehose_sqs
Expand source code
import json
from typing import List, Optional
from pydantic import BaseModel, PositiveInt, validator
from aws_lambda_powertools.shared.functions import base64_decode
from aws_lambda_powertools.utilities.parser.models import KinesisFirehoseRecordMetadata
from .sqs import SqsRecordModel
# One record of a Kinesis Firehose event whose payload is an SQS record.
# Documented with comments rather than a class docstring so the schema
# pydantic generates for this model is left unchanged.
class KinesisFirehoseSqsRecord(BaseModel):
    # Arrives encoded; data_base64_decode (below) turns it into a parsed JSON
    # object before it is validated against SqsRecordModel.
    data: SqsRecordModel
    recordId: str
    # Must be an integer greater than zero (PositiveInt).
    approximateArrivalTimestamp: PositiveInt
    # May be absent from the payload, in which case it stays None.
    kinesisRecordMetadata: Optional[KinesisFirehoseRecordMetadata] = None

    @validator("data", pre=True, allow_reuse=True)
    def data_base64_decode(cls, value):
        """Decode the raw ``data`` value and parse it as JSON.

        Registered with ``pre=True`` so it receives the incoming value before
        pydantic validates the field as ``SqsRecordModel``.
        """
        # Firehose payload is encoded
        decoded_payload = base64_decode(value)
        return json.loads(decoded_payload)
# Envelope model for a Kinesis Firehose event carrying SQS records.
# Documented with comments rather than a class docstring so the schema
# pydantic generates for this model is left unchanged.
class KinesisFirehoseSqsModel(BaseModel):
    # Required string fields; attribute names match the payload keys.
    invocationId: str
    deliveryStreamArn: str
    region: str
    # May be absent from the payload, in which case it stays None.
    sourceKinesisStreamArn: Optional[str] = None
    # Each entry is validated as a KinesisFirehoseSqsRecord.
    records: List[KinesisFirehoseSqsRecord]
Classes
class KinesisFirehoseSqsModel (**data: Any)-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class KinesisFirehoseSqsModel(BaseModel):
    invocationId: str
    deliveryStreamArn: str
    region: str
    sourceKinesisStreamArn: Optional[str] = None
    records: List[KinesisFirehoseSqsRecord]
Ancestors
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var deliveryStreamArn : str
var invocationId : str
var records : List[KinesisFirehoseSqsRecord]
var region : str
var sourceKinesisStreamArn : Optional[str]
class KinesisFirehoseSqsRecord (**data: Any)-
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError if the input data cannot be parsed to form a valid model.
Expand source code
class KinesisFirehoseSqsRecord(BaseModel):
    data: SqsRecordModel
    recordId: str
    approximateArrivalTimestamp: PositiveInt
    kinesisRecordMetadata: Optional[KinesisFirehoseRecordMetadata] = None

    @validator("data", pre=True, allow_reuse=True)
    def data_base64_decode(cls, value):
        # Firehose payload is encoded
        return json.loads(base64_decode(value))
Ancestors
- pydantic.main.BaseModel
- pydantic.utils.Representation
Class variables
var approximateArrivalTimestamp : pydantic.types.PositiveInt
var data : SqsRecordModel
var kinesisRecordMetadata : Optional[KinesisFirehoseRecordMetadata]
var recordId : str
Static methods
def data_base64_decode(value)-
Expand source code
@validator("data", pre=True, allow_reuse=True)
def data_base64_decode(cls, value):
    # Firehose payload is encoded
    return json.loads(base64_decode(value))