Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Ejemplo de análisis de voz: función de Lambda
El código de Python del siguiente ejemplo procesa las notificaciones recibidas de un conector de voz. Puede añadir el código a una función AWS Lambda. También puede utilizarla para activar su cola de Amazon SQS, un tema de Amazon SNS o Amazon Kinesis Data Streams. A continuación, puede almacenar las notificaciones en un futuro procesamiento de EventTable
. Para conocer los formatos de notificación exactos, consulte Descripción de notificaciones.
import base64 import boto3 import json import logging import time from datetime import datetime from enum import Enum log = logging.getLogger() log.setLevel(logging.INFO) dynamo = boto3.client("dynamodb") EVENT_TABLE_NAME = "EventTable" class EventType(Enum): """ This example code uses a single Lambda processor to handle either triggers from SQS, SNS, Lambda, or Kinesis. You can adapt it to fit your desired infrastructure depending on what you prefer. To distinguish where we get events from, we use an EventType enum as an example to show the different ways of parsing the notifications. """ SQS = "SQS" SNS = "SNS" LAMBDA = "LAMBDA" KINESIS = "KINESIS" class AnalyticsType(Enum): """ Define the various analytics event types that this Lambda will handle. """ SPEAKER_SEARCH = "SpeakerSearch" VOICE_TONE_ANALYSIS = "VoiceToneAnalysis" ANALYTICS_READY = "AnalyticsReady" UNKNOWN = "UNKNOWN" class DetailType(Enum): """ Define the various detail types that Voice Connector's voice analytics feature can return. """ SPEAKER_SEARCH_TYPE = "SpeakerSearchStatus" VOICE_TONE_ANALYSIS_TYPE = "VoiceToneAnalysisStatus" ANALYTICS_READY = "VoiceAnalyticsStatus" def handle(event, context): """ Example of how to handle incoming Voice Analytics notification messages from Voice Connector. """ logging.info(f"Received event of type {type(event)} with payload {event}") is_lambda = True # Handle triggers from SQS, SNS, and KDS. Use the below code if you would like # to use this Lambda as a trigger for an existing SQS queue, SNS topic or Kinesis # stream. if "Records" in event: logging.info("Handling event from SQS or SNS since Records exists") is_lambda = False for record in event.get("Records", []): _process_record(record) # If you would prefer to have your Lambda invoked directly, use the # below code to have the Voice Connector directly invoke your Lambda. # In this scenario, there are no "Records" passed. if is_lambda: logging.info(f"Handling event from Lambda") event_type = EventType.LAMBDA _process_notification_event(event_type, event) def _process_record(record): # SQS and Kinesis use eventSource. event_source = record.get("eventSource") # SNS uses EventSource. if not event_source: event_source = record.get("EventSource") # Assign the event type explicitly based on the event source value. event_type = None if event_source == "aws:sqs": event = record["body"] event_type = EventType.SQS elif event_source == "aws:sns": event = record["Sns"]["Message"] event_type = EventType.SNS elif event_source == "aws:kinesis": raw_data = record["kinesis"]["data"] raw_message = base64.b64decode(raw_data).decode('utf-8') event = json.loads(raw_message) event_type = EventType.KINESIS else: raise Exception(f"Event source {event_source} is not supported") _process_notification_event(event_type, event) def _process_notification_event( event_type: EventType, event: dict ): """ Extract the attributes from the Voice Analytics notification message and store it as a DynamoDB item to process later. """ message_id = event.get("id") analytics_type = _get_analytics_type(event.get("detail-type")) pk = None if analytics_type == AnalyticsType.ANALYTICS_READY.value or analytics_type == AnalyticsType.UNKNOWN.value: transaction_id = event.get("detail").get("transactionId") pk = f"transactionId#{transaction_id}#notificationType#{event_type.value}#analyticsType#{analytics_type}" else: task_id = event.get("detail").get("taskId") pk = f"taskId#{task_id}#notificationType#{event_type.value}#analyticsType#{analytics_type}" logging.info(f"Generated PK {pk}") _create_request_record(pk, message_id, json.dumps(event)) def _create_request_record(pk: str, sk: str, body: str): """ Record this notification message into the Dynamo db table """ try: # Use consistent ISO8601 date format. # 2019-08-01T23:09:35.369156 -> 2019-08-01T23:09:35.369Z time_now = ( datetime.utcnow().isoformat()[:-3] + "Z" ) response = dynamo.put_item( Item={ "PK": {"S": pk}, "SK": {"S": sk}, "body": {"S": body}, "createdOn": {"S": time_now}, }, TableName=EVENT_TABLE_NAME, ) logging.info(f"Added record in table {EVENT_TABLE_NAME}, response : {response}") except Exception as e: logging.error(f"Error in adding record: {e}") def _get_analytics_type(detail_type: str): """ Get analytics type based on message detail type value. """ if detail_type == DetailType.SPEAKER_SEARCH_TYPE.value: return AnalyticsType.SPEAKER_SEARCH.value elif detail_type == DetailType.VOICE_TONE_ANALYSIS_TYPE.value: return AnalyticsType.VOICE_TONE_ANALYSIS.value elif detail_type == DetailType.ANALYTICS_READY.value: return AnalyticsType.ANALYTICS_READY.value else: return AnalyticsType.UNKNOWN.value
importante
Debe recibir el consentimiento antes de llamar a las StartVoiceToneAnalysisAPI StartSpeakerSearchTasko API. Le recomendamos que mantenga los eventos en un área de espera, como Amazon DynamoDB, hasta que reciba el consentimiento.