本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
Amazon Chime SDK 的语音分析示例 Lambda 函数
以下示例中的 Python 代码用于处理从语音连接器收到的通知。您可以将该代码添加到 AWS Lambda 函数中,也可以用您的 Amazon SQS 队列、Amazon SNS 主题或 Amazon Kinesis 数据流来触发它。然后,您可以将通知存储到 EventTable 中,以备将来处理。有关确切的通知格式,请参阅“了解 Amazon Chime SDK 的通知”。
import base64
import boto3
import json
import logging
import time
from datetime import datetime, timezone
from enum import Enum

log = logging.getLogger()
log.setLevel(logging.INFO)

dynamo = boto3.client("dynamodb")

EVENT_TABLE_NAME = "EventTable"


class EventType(Enum):
    """
    Source that delivered a notification to this Lambda.

    This example code uses a single Lambda processor to handle triggers from
    SQS, SNS, Lambda, or Kinesis. You can adapt it to fit your desired
    infrastructure. The enum value is embedded in the DynamoDB partition key
    so that the origin of each stored notification stays visible.
    """

    SQS = "SQS"
    SNS = "SNS"
    LAMBDA = "LAMBDA"
    KINESIS = "KINESIS"


class AnalyticsType(Enum):
    """
    Define the various analytics event types that this Lambda will handle.
    """

    SPEAKER_SEARCH = "SpeakerSearch"
    VOICE_TONE_ANALYSIS = "VoiceToneAnalysis"
    ANALYTICS_READY = "AnalyticsReady"
    UNKNOWN = "UNKNOWN"


class DetailType(Enum):
    """
    Define the various detail types that Voice Connector's voice analytics
    feature can return in the "detail-type" field of a notification.
    """

    SPEAKER_SEARCH_TYPE = "SpeakerSearchStatus"
    VOICE_TONE_ANALYSIS_TYPE = "VoiceToneAnalysisStatus"
    ANALYTICS_READY = "VoiceAnalyticsStatus"


# Maps a notification "detail-type" string to the analytics type string that
# is embedded in the partition key. Anything not listed maps to UNKNOWN.
_ANALYTICS_TYPE_BY_DETAIL_TYPE = {
    DetailType.SPEAKER_SEARCH_TYPE.value: AnalyticsType.SPEAKER_SEARCH.value,
    DetailType.VOICE_TONE_ANALYSIS_TYPE.value: AnalyticsType.VOICE_TONE_ANALYSIS.value,
    DetailType.ANALYTICS_READY.value: AnalyticsType.ANALYTICS_READY.value,
}


def handle(event, context):
    """
    Lambda entry point for Voice Analytics notifications from Voice Connector.

    When the payload carries a "Records" list, each record is processed as an
    SQS, SNS, or Kinesis delivery. Otherwise the payload itself is treated as
    a notification delivered by a direct Lambda invocation.

    Args:
        event: The Lambda invocation payload.
        context: The Lambda context object (unused).
    """
    logging.info(f"Received event of type {type(event)} with payload {event}")

    # Handle triggers from SQS, SNS, and KDS. Use this branch if you would
    # like to use this Lambda as a trigger for an existing SQS queue, SNS
    # topic or Kinesis stream.
    if "Records" in event:
        logging.info("Handling event from SQS or SNS since Records exists")
        # "or []" tolerates an explicit null Records value.
        for record in event["Records"] or []:
            _process_record(record)
        return

    # If you would prefer to have your Lambda invoked directly, the Voice
    # Connector passes the notification itself; there are no "Records".
    logging.info("Handling event from Lambda")
    _process_notification_event(EventType.LAMBDA, event)


def _parse_json_payload(payload):
    """Return payload decoded from JSON if it is text, otherwise unchanged."""
    if isinstance(payload, (str, bytes, bytearray)):
        return json.loads(payload)
    return payload


def _process_record(record):
    """
    Extract the notification from a single SQS, SNS, or Kinesis record and
    hand it to _process_notification_event.

    Args:
        record: One entry of the "Records" list of the Lambda payload.

    Raises:
        ValueError: If the record's event source is not SQS, SNS, or Kinesis.
    """
    # SQS and Kinesis use eventSource; SNS uses EventSource.
    event_source = record.get("eventSource") or record.get("EventSource")

    if event_source == "aws:sqs":
        event_type = EventType.SQS
        # The body is decoded here because the downstream code reads the
        # notification with dict access (event.get(...)).
        event = _parse_json_payload(record["body"])
    elif event_source == "aws:sns":
        event_type = EventType.SNS
        event = _parse_json_payload(record["Sns"]["Message"])
    elif event_source == "aws:kinesis":
        event_type = EventType.KINESIS
        raw_data = record["kinesis"]["data"]
        event = json.loads(base64.b64decode(raw_data).decode("utf-8"))
    else:
        raise ValueError(f"Event source {event_source} is not supported")

    _process_notification_event(event_type, event)


def _process_notification_event(event_type: EventType, event: dict):
    """
    Extract the attributes from the Voice Analytics notification message and
    store it as a DynamoDB item to process later.

    The partition key is built from the transaction ID for AnalyticsReady and
    unknown notifications, and from the task ID for every other type.

    Args:
        event_type: Where the notification came from.
        event: The decoded notification message.
    """
    message_id = event.get("id")
    analytics_type = _get_analytics_type(event.get("detail-type"))
    # A missing or null "detail" must not crash the handler.
    detail = event.get("detail") or {}

    keyed_by_transaction = analytics_type in (
        AnalyticsType.ANALYTICS_READY.value,
        AnalyticsType.UNKNOWN.value,
    )
    id_field = "transactionId" if keyed_by_transaction else "taskId"
    pk = (
        f"{id_field}#{detail.get(id_field)}"
        f"#notificationType#{event_type.value}"
        f"#analyticsType#{analytics_type}"
    )

    logging.info(f"Generated PK {pk}")
    _create_request_record(pk, message_id, json.dumps(event))


def _create_request_record(pk: str, sk: str, body: str):
    """
    Record this notification message into the DynamoDB table.

    Failures are logged and not re-raised, so one bad record does not abort
    the rest of the batch.

    Args:
        pk: Partition key value.
        sk: Sort key value (the notification's message ID).
        body: The notification serialized as JSON.
    """
    try:
        # Use consistent ISO8601 date format with millisecond precision:
        # 2019-08-01T23:09:35.369156 -> 2019-08-01T23:09:35.369Z
        # timespec keeps the fraction even when the microseconds are zero.
        time_now = (
            datetime.now(timezone.utc)
            .isoformat(timespec="milliseconds")
            .replace("+00:00", "Z")
        )
        response = dynamo.put_item(
            Item={
                "PK": {"S": pk},
                "SK": {"S": sk},
                "body": {"S": body},
                "createdOn": {"S": time_now},
            },
            TableName=EVENT_TABLE_NAME,
        )
        logging.info(f"Added record in table {EVENT_TABLE_NAME}, response : {response}")
    except Exception as e:
        # Best-effort write: keep the original message, add the traceback.
        logging.exception(f"Error in adding record: {e}")


def _get_analytics_type(detail_type: str):
    """
    Get analytics type based on message detail type value.

    Args:
        detail_type: The "detail-type" value of the notification.

    Returns:
        The matching AnalyticsType value as a string, or the UNKNOWN value
        when detail_type is not one of the DetailType values.
    """
    return _ANALYTICS_TYPE_BY_DETAIL_TYPE.get(
        detail_type, AnalyticsType.UNKNOWN.value
    )
重要
您必须先征得同意,然后才能调用 StartSpeakerSearchTask 或 StartVoiceToneAnalysis API。我们建议您在获得同意之前,将事件保留在暂存区域(例如 Amazon DynamoDB)中。