Module aws_lambda_powertools.utilities.parser.envelopes
Expand source code
from .apigw import ApiGatewayEnvelope
from .apigwv2 import ApiGatewayV2Envelope
from .base import BaseEnvelope
from .cloudwatch import CloudWatchLogsEnvelope
from .dynamodb import DynamoDBStreamEnvelope
from .event_bridge import EventBridgeEnvelope
from .kinesis import KinesisDataStreamEnvelope
from .sns import SnsEnvelope, SnsSqsEnvelope
from .sqs import SqsEnvelope
__all__ = [
    "ApiGatewayEnvelope",
    "ApiGatewayV2Envelope",
    "CloudWatchLogsEnvelope",
    "DynamoDBStreamEnvelope",
    "EventBridgeEnvelope",
    "KinesisDataStreamEnvelope",
    "SnsEnvelope",
    "SnsSqsEnvelope",
    "SqsEnvelope",
    "BaseEnvelope",
]Sub-modules
- aws_lambda_powertools.utilities.parser.envelopes.apigw
- aws_lambda_powertools.utilities.parser.envelopes.apigwv2
- aws_lambda_powertools.utilities.parser.envelopes.base
- aws_lambda_powertools.utilities.parser.envelopes.cloudwatch
- aws_lambda_powertools.utilities.parser.envelopes.dynamodb
- aws_lambda_powertools.utilities.parser.envelopes.event_bridge
- aws_lambda_powertools.utilities.parser.envelopes.kinesis
- aws_lambda_powertools.utilities.parser.envelopes.sns
- aws_lambda_powertools.utilities.parser.envelopes.sqs
Classes
- class ApiGatewayEnvelope
- 
API Gateway envelope to extract data within body key Expand source codeclass ApiGatewayEnvelope(BaseEnvelope): """API Gateway envelope to extract data within body key""" def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> Optional[Model]: """Parses data found with model provided Parameters ---------- data : Dict Lambda event to be parsed model : Type[Model] Data model provided to parse after extracting data using envelope Returns ------- Any Parsed detail payload with model provided """ logger.debug(f"Parsing incoming data with Api Gateway model {APIGatewayProxyEventModel}") parsed_envelope: APIGatewayProxyEventModel = APIGatewayProxyEventModel.parse_obj(data) logger.debug(f"Parsing event payload in `detail` with {model}") return self._parse(data=parsed_envelope.body, model=model)Ancestors- BaseEnvelope
- abc.ABC
 Methods- def parse(self, data: Union[Dict[str, Any], Any, None], model: Type[~Model]) ‑> Optional[~Model]
- 
Parses data found with model provided Parameters- data:- Dict
- Lambda event to be parsed
- model:- Type[Model]
- Data model provided to parse after extracting data using envelope
 Returns- Any
- Parsed detail payload with model provided
 Expand source codedef parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> Optional[Model]: """Parses data found with model provided Parameters ---------- data : Dict Lambda event to be parsed model : Type[Model] Data model provided to parse after extracting data using envelope Returns ------- Any Parsed detail payload with model provided """ logger.debug(f"Parsing incoming data with Api Gateway model {APIGatewayProxyEventModel}") parsed_envelope: APIGatewayProxyEventModel = APIGatewayProxyEventModel.parse_obj(data) logger.debug(f"Parsing event payload in `detail` with {model}") return self._parse(data=parsed_envelope.body, model=model)
 
- class ApiGatewayV2Envelope
- 
API Gateway V2 envelope to extract data within body key Expand source codeclass ApiGatewayV2Envelope(BaseEnvelope): """API Gateway V2 envelope to extract data within body key""" def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> Optional[Model]: """Parses data found with model provided Parameters ---------- data : Dict Lambda event to be parsed model : Type[Model] Data model provided to parse after extracting data using envelope Returns ------- Any Parsed detail payload with model provided """ logger.debug(f"Parsing incoming data with Api Gateway model V2 {APIGatewayProxyEventV2Model}") parsed_envelope: APIGatewayProxyEventV2Model = APIGatewayProxyEventV2Model.parse_obj(data) logger.debug(f"Parsing event payload in `detail` with {model}") return self._parse(data=parsed_envelope.body, model=model)Ancestors- BaseEnvelope
- abc.ABC
 Methods- def parse(self, data: Union[Dict[str, Any], Any, None], model: Type[~Model]) ‑> Optional[~Model]
- 
Parses data found with model provided Parameters- data:- Dict
- Lambda event to be parsed
- model:- Type[Model]
- Data model provided to parse after extracting data using envelope
 Returns- Any
- Parsed detail payload with model provided
 Expand source codedef parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> Optional[Model]: """Parses data found with model provided Parameters ---------- data : Dict Lambda event to be parsed model : Type[Model] Data model provided to parse after extracting data using envelope Returns ------- Any Parsed detail payload with model provided """ logger.debug(f"Parsing incoming data with Api Gateway model V2 {APIGatewayProxyEventV2Model}") parsed_envelope: APIGatewayProxyEventV2Model = APIGatewayProxyEventV2Model.parse_obj(data) logger.debug(f"Parsing event payload in `detail` with {model}") return self._parse(data=parsed_envelope.body, model=model)
 
- class BaseEnvelope
- 
ABC implementation for creating a supported Envelope Expand source codeclass BaseEnvelope(ABC): """ABC implementation for creating a supported Envelope""" @staticmethod def _parse(data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> Union[Model, None]: """Parses envelope data against model provided Parameters ---------- data : Dict Data to be parsed and validated model : Type[Model] Data model to parse and validate data against Returns ------- Any Parsed data """ if data is None: logger.debug("Skipping parsing as event is None") return data logger.debug("parsing event against model") if isinstance(data, str): logger.debug("parsing event as string") return model.parse_raw(data) return model.parse_obj(data) @abstractmethod def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]): """Implementation to parse data against envelope model, then against the data model NOTE: Call `_parse` method to fully parse data with model provided. Example ------- **EventBridge envelope implementation example** def parse(...): # 1. parses data against envelope model parsed_envelope = EventBridgeModel(**data) # 2. parses portion of data within the envelope against model return self._parse(data=parsed_envelope.detail, model=data_model) """ return NotImplemented # pragma: no coverAncestors- abc.ABC
 Subclasses- ApiGatewayEnvelope
- ApiGatewayV2Envelope
- CloudWatchLogsEnvelope
- DynamoDBStreamEnvelope
- EventBridgeEnvelope
- KinesisDataStreamEnvelope
- SnsEnvelope
- SnsSqsEnvelope
- SqsEnvelope
 Methods- def parse(self, data: Union[Dict[str, Any], Any, None], model: Type[~Model])
- 
Implementation to parse data against envelope model, then against the data model NOTE: Call _parsemethod to fully parse data with model provided.ExampleEventBridge envelope implementation example def parse(…): # 1. parses data against envelope model parsed_envelope = EventBridgeModel(**data) # 2. parses portion of data within the envelope against model return self._parse(data=parsed_envelope.detail, model=data_model)Expand source code@abstractmethod def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]): """Implementation to parse data against envelope model, then against the data model NOTE: Call `_parse` method to fully parse data with model provided. Example ------- **EventBridge envelope implementation example** def parse(...): # 1. parses data against envelope model parsed_envelope = EventBridgeModel(**data) # 2. parses portion of data within the envelope against model return self._parse(data=parsed_envelope.detail, model=data_model) """ return NotImplemented # pragma: no cover
 
- class CloudWatchLogsEnvelope
- 
CloudWatch Envelope to extract a List of log records. The record's body parameter is a string (after being base64 decoded and gzipped), though it can also be a JSON encoded string. Regardless of its type it'll be parsed into a BaseModel object. Note: The record will be parsed the same way so if model is str Expand source codeclass CloudWatchLogsEnvelope(BaseEnvelope): """CloudWatch Envelope to extract a List of log records. The record's body parameter is a string (after being base64 decoded and gzipped), though it can also be a JSON encoded string. Regardless of its type it'll be parsed into a BaseModel object. Note: The record will be parsed the same way so if model is str """ def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]: """Parses records found with model provided Parameters ---------- data : Dict Lambda event to be parsed model : Type[Model] Data model provided to parse after extracting data using envelope Returns ------- List List of records parsed with model provided """ logger.debug(f"Parsing incoming data with SNS model {CloudWatchLogsModel}") parsed_envelope = CloudWatchLogsModel.parse_obj(data) logger.debug(f"Parsing CloudWatch records in `body` with {model}") return [ self._parse(data=record.message, model=model) for record in parsed_envelope.awslogs.decoded_data.logEvents ]Ancestors- BaseEnvelope
- abc.ABC
 Methods- def parse(self, data: Union[Dict[str, Any], Any, None], model: Type[~Model]) ‑> List[Optional[~Model]]
- 
Parses records found with model provided Parameters- data:- Dict
- Lambda event to be parsed
- model:- Type[Model]
- Data model provided to parse after extracting data using envelope
 Returns- List
- List of records parsed with model provided
 Expand source codedef parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]: """Parses records found with model provided Parameters ---------- data : Dict Lambda event to be parsed model : Type[Model] Data model provided to parse after extracting data using envelope Returns ------- List List of records parsed with model provided """ logger.debug(f"Parsing incoming data with SNS model {CloudWatchLogsModel}") parsed_envelope = CloudWatchLogsModel.parse_obj(data) logger.debug(f"Parsing CloudWatch records in `body` with {model}") return [ self._parse(data=record.message, model=model) for record in parsed_envelope.awslogs.decoded_data.logEvents ]
 
- class DynamoDBStreamEnvelope
- 
DynamoDB Stream Envelope to extract data within NewImage/OldImage Note: Values are the parsed models. Images' values can also be None, and length of the list is the record's amount in the original event. Expand source codeclass DynamoDBStreamEnvelope(BaseEnvelope): """DynamoDB Stream Envelope to extract data within NewImage/OldImage Note: Values are the parsed models. Images' values can also be None, and length of the list is the record's amount in the original event. """ def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Dict[str, Optional[Model]]]: """Parses DynamoDB Stream records found in either NewImage and OldImage with model provided Parameters ---------- data : Dict Lambda event to be parsed model : Type[Model] Data model provided to parse after extracting data using envelope Returns ------- List List of dictionaries with NewImage and OldImage records parsed with model provided """ logger.debug(f"Parsing incoming data with DynamoDB Stream model {DynamoDBStreamModel}") parsed_envelope = DynamoDBStreamModel.parse_obj(data) logger.debug(f"Parsing DynamoDB Stream new and old records with {model}") return [ { "NewImage": self._parse(data=record.dynamodb.NewImage, model=model), "OldImage": self._parse(data=record.dynamodb.OldImage, model=model), } for record in parsed_envelope.Records ]Ancestors- BaseEnvelope
- abc.ABC
 Methods- def parse(self, data: Union[Dict[str, Any], Any, None], model: Type[~Model]) ‑> List[Dict[str, Optional[~Model]]]
- 
Parses DynamoDB Stream records found in either NewImage and OldImage with model provided Parameters- data:- Dict
- Lambda event to be parsed
- model:- Type[Model]
- Data model provided to parse after extracting data using envelope
 Returns- List
- List of dictionaries with NewImage and OldImage records parsed with model provided
 Expand source codedef parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Dict[str, Optional[Model]]]: """Parses DynamoDB Stream records found in either NewImage and OldImage with model provided Parameters ---------- data : Dict Lambda event to be parsed model : Type[Model] Data model provided to parse after extracting data using envelope Returns ------- List List of dictionaries with NewImage and OldImage records parsed with model provided """ logger.debug(f"Parsing incoming data with DynamoDB Stream model {DynamoDBStreamModel}") parsed_envelope = DynamoDBStreamModel.parse_obj(data) logger.debug(f"Parsing DynamoDB Stream new and old records with {model}") return [ { "NewImage": self._parse(data=record.dynamodb.NewImage, model=model), "OldImage": self._parse(data=record.dynamodb.OldImage, model=model), } for record in parsed_envelope.Records ]
 
- class EventBridgeEnvelope
- 
EventBridge envelope to extract data within detail key Expand source codeclass EventBridgeEnvelope(BaseEnvelope): """EventBridge envelope to extract data within detail key""" def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> Optional[Model]: """Parses data found with model provided Parameters ---------- data : Dict Lambda event to be parsed model : Type[Model] Data model provided to parse after extracting data using envelope Returns ------- Any Parsed detail payload with model provided """ logger.debug(f"Parsing incoming data with EventBridge model {EventBridgeModel}") parsed_envelope: EventBridgeModel = EventBridgeModel.parse_obj(data) logger.debug(f"Parsing event payload in `detail` with {model}") return self._parse(data=parsed_envelope.detail, model=model)Ancestors- BaseEnvelope
- abc.ABC
 Methods- def parse(self, data: Union[Dict[str, Any], Any, None], model: Type[~Model]) ‑> Optional[~Model]
- 
Parses data found with model provided Parameters- data:- Dict
- Lambda event to be parsed
- model:- Type[Model]
- Data model provided to parse after extracting data using envelope
 Returns- Any
- Parsed detail payload with model provided
 Expand source codedef parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> Optional[Model]: """Parses data found with model provided Parameters ---------- data : Dict Lambda event to be parsed model : Type[Model] Data model provided to parse after extracting data using envelope Returns ------- Any Parsed detail payload with model provided """ logger.debug(f"Parsing incoming data with EventBridge model {EventBridgeModel}") parsed_envelope: EventBridgeModel = EventBridgeModel.parse_obj(data) logger.debug(f"Parsing event payload in `detail` with {model}") return self._parse(data=parsed_envelope.detail, model=model)
 
- class KinesisDataStreamEnvelope
- 
Kinesis Data Stream Envelope to extract array of Records The record's data parameter is a base64 encoded string which is parsed into a bytes array, though it can also be a JSON encoded string. Regardless of its type it'll be parsed into a BaseModel object. Note: Records will be parsed the same way so if model is str, all items in the list will be parsed as str and npt as JSON (and vice versa) Expand source codeclass KinesisDataStreamEnvelope(BaseEnvelope): """Kinesis Data Stream Envelope to extract array of Records The record's data parameter is a base64 encoded string which is parsed into a bytes array, though it can also be a JSON encoded string. Regardless of its type it'll be parsed into a BaseModel object. Note: Records will be parsed the same way so if model is str, all items in the list will be parsed as str and npt as JSON (and vice versa) """ def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]: """Parses records found with model provided Parameters ---------- data : Dict Lambda event to be parsed model : Type[Model] Data model provided to parse after extracting data using envelope Returns ------- List List of records parsed with model provided """ logger.debug(f"Parsing incoming data with Kinesis model {KinesisDataStreamModel}") parsed_envelope: KinesisDataStreamModel = KinesisDataStreamModel.parse_obj(data) logger.debug(f"Parsing Kinesis records in `body` with {model}") models = [] for record in parsed_envelope.Records: # We allow either AWS expected contract (bytes) or a custom Model, see #943 data = cast(bytes, record.kinesis.data) models.append(self._parse(data=data.decode("utf-8"), model=model)) return modelsAncestors- BaseEnvelope
- abc.ABC
 Methods- def parse(self, data: Union[Dict[str, Any], Any, None], model: Type[~Model]) ‑> List[Optional[~Model]]
- 
Parses records found with model provided Parameters- data:- Dict
- Lambda event to be parsed
- model:- Type[Model]
- Data model provided to parse after extracting data using envelope
 Returns- List
- List of records parsed with model provided
 Expand source codedef parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]: """Parses records found with model provided Parameters ---------- data : Dict Lambda event to be parsed model : Type[Model] Data model provided to parse after extracting data using envelope Returns ------- List List of records parsed with model provided """ logger.debug(f"Parsing incoming data with Kinesis model {KinesisDataStreamModel}") parsed_envelope: KinesisDataStreamModel = KinesisDataStreamModel.parse_obj(data) logger.debug(f"Parsing Kinesis records in `body` with {model}") models = [] for record in parsed_envelope.Records: # We allow either AWS expected contract (bytes) or a custom Model, see #943 data = cast(bytes, record.kinesis.data) models.append(self._parse(data=data.decode("utf-8"), model=model)) return models
 
- class SnsEnvelope
- 
SNS Envelope to extract array of Records The record's body parameter is a string, though it can also be a JSON encoded string. Regardless of its type it'll be parsed into a BaseModel object. Note: Records will be parsed the same way so if model is str, all items in the list will be parsed as str and npt as JSON (and vice versa) Expand source codeclass SnsEnvelope(BaseEnvelope): """SNS Envelope to extract array of Records The record's body parameter is a string, though it can also be a JSON encoded string. Regardless of its type it'll be parsed into a BaseModel object. Note: Records will be parsed the same way so if model is str, all items in the list will be parsed as str and npt as JSON (and vice versa) """ def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]: """Parses records found with model provided Parameters ---------- data : Dict Lambda event to be parsed model : Type[Model] Data model provided to parse after extracting data using envelope Returns ------- List List of records parsed with model provided """ logger.debug(f"Parsing incoming data with SNS model {SnsModel}") parsed_envelope = SnsModel.parse_obj(data) logger.debug(f"Parsing SNS records in `body` with {model}") return [self._parse(data=record.Sns.Message, model=model) for record in parsed_envelope.Records]Ancestors- BaseEnvelope
- abc.ABC
 Methods- def parse(self, data: Union[Dict[str, Any], Any, None], model: Type[~Model]) ‑> List[Optional[~Model]]
- 
Parses records found with model provided Parameters- data:- Dict
- Lambda event to be parsed
- model:- Type[Model]
- Data model provided to parse after extracting data using envelope
 Returns- List
- List of records parsed with model provided
 Expand source codedef parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]: """Parses records found with model provided Parameters ---------- data : Dict Lambda event to be parsed model : Type[Model] Data model provided to parse after extracting data using envelope Returns ------- List List of records parsed with model provided """ logger.debug(f"Parsing incoming data with SNS model {SnsModel}") parsed_envelope = SnsModel.parse_obj(data) logger.debug(f"Parsing SNS records in `body` with {model}") return [self._parse(data=record.Sns.Message, model=model) for record in parsed_envelope.Records]
 
- class SnsSqsEnvelope
- 
SNS plus SQS Envelope to extract array of Records Published messages from SNS to SQS has a slightly different payload. Since SNS payload is marshalled into Recordkey in SQS, we have to:- Parse SQS schema with incoming data
- Unmarshall SNS payload and parse against SNS Notification model not SNS/SNS Record
- Finally, parse provided model against payload extracted
 Expand source codeclass SnsSqsEnvelope(BaseEnvelope): """SNS plus SQS Envelope to extract array of Records Published messages from SNS to SQS has a slightly different payload. Since SNS payload is marshalled into `Record` key in SQS, we have to: 1. Parse SQS schema with incoming data 2. Unmarshall SNS payload and parse against SNS Notification model not SNS/SNS Record 3. Finally, parse provided model against payload extracted """ def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]: """Parses records found with model provided Parameters ---------- data : Dict Lambda event to be parsed model : Type[Model] Data model provided to parse after extracting data using envelope Returns ------- List List of records parsed with model provided """ logger.debug(f"Parsing incoming data with SQS model {SqsModel}") parsed_envelope = SqsModel.parse_obj(data) output = [] for record in parsed_envelope.Records: # We allow either AWS expected contract (str) or a custom Model, see #943 body = cast(str, record.body) sns_notification = SnsNotificationModel.parse_raw(body) output.append(self._parse(data=sns_notification.Message, model=model)) return outputAncestors- BaseEnvelope
- abc.ABC
 Methods- def parse(self, data: Union[Dict[str, Any], Any, None], model: Type[~Model]) ‑> List[Optional[~Model]]
- 
Parses records found with model provided Parameters- data:- Dict
- Lambda event to be parsed
- model:- Type[Model]
- Data model provided to parse after extracting data using envelope
 Returns- List
- List of records parsed with model provided
 Expand source codedef parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]: """Parses records found with model provided Parameters ---------- data : Dict Lambda event to be parsed model : Type[Model] Data model provided to parse after extracting data using envelope Returns ------- List List of records parsed with model provided """ logger.debug(f"Parsing incoming data with SQS model {SqsModel}") parsed_envelope = SqsModel.parse_obj(data) output = [] for record in parsed_envelope.Records: # We allow either AWS expected contract (str) or a custom Model, see #943 body = cast(str, record.body) sns_notification = SnsNotificationModel.parse_raw(body) output.append(self._parse(data=sns_notification.Message, model=model)) return output
 
- class SqsEnvelope
- 
SQS Envelope to extract array of Records The record's body parameter is a string, though it can also be a JSON encoded string. Regardless of its type it'll be parsed into a BaseModel object. Note: Records will be parsed the same way so if model is str, all items in the list will be parsed as str and npt as JSON (and vice versa) Expand source codeclass SqsEnvelope(BaseEnvelope): """SQS Envelope to extract array of Records The record's body parameter is a string, though it can also be a JSON encoded string. Regardless of its type it'll be parsed into a BaseModel object. Note: Records will be parsed the same way so if model is str, all items in the list will be parsed as str and npt as JSON (and vice versa) """ def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]: """Parses records found with model provided Parameters ---------- data : Dict Lambda event to be parsed model : Type[Model] Data model provided to parse after extracting data using envelope Returns ------- List List of records parsed with model provided """ logger.debug(f"Parsing incoming data with SQS model {SqsModel}") parsed_envelope = SqsModel.parse_obj(data) logger.debug(f"Parsing SQS records in `body` with {model}") return [self._parse(data=record.body, model=model) for record in parsed_envelope.Records]Ancestors- BaseEnvelope
- abc.ABC
 Methods- def parse(self, data: Union[Dict[str, Any], Any, None], model: Type[~Model]) ‑> List[Optional[~Model]]
- 
Parses records found with model provided Parameters- data:- Dict
- Lambda event to be parsed
- model:- Type[Model]
- Data model provided to parse after extracting data using envelope
 Returns- List
- List of records parsed with model provided
 Expand source codedef parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]: """Parses records found with model provided Parameters ---------- data : Dict Lambda event to be parsed model : Type[Model] Data model provided to parse after extracting data using envelope Returns ------- List List of records parsed with model provided """ logger.debug(f"Parsing incoming data with SQS model {SqsModel}") parsed_envelope = SqsModel.parse_obj(data) logger.debug(f"Parsing SQS records in `body` with {model}") return [self._parse(data=record.body, model=model) for record in parsed_envelope.Records]