Menu
Amazon Polly
Developer Guide

Python Example: Python Server Code (server.py)

This section provides the code for the Python server described in Python Example (HTML5 Client and Python Server).

""" Example Python 2.7+/3.3+ Application This application consists of a HTTP 1.1 server using the HTTP chunked transfer coding (https://tools.ietf.org/html/rfc2616#section-3.6.1) and a minimal HTML5 user interface that interacts with it. The goal of this example is to start streaming the speech to the client (the HTML5 web UI) as soon as the first consumable chunk of speech is returned in order to start playing the audio as soon as possible. For use cases where low latency and responsiveness are strong requirements, this is the recommended approach. The service documentation contains examples for non-streaming use cases where waiting for the speech synthesis to complete and fetching the whole audio stream at once are an option. To test the application, run 'python server.py' and then open the URL displayed in the terminal in a web browser (see index.html for a list of supported browsers). The address and port for the server can be passed as parameters to server.py. For more information, run: 'python server.py -h' """ from argparse import ArgumentParser from collections import namedtuple from contextlib import closing from io import BytesIO from json import dumps as json_encode import os import sys if sys.version_info >= (3, 0): from http.server import BaseHTTPRequestHandler, HTTPServer from socketserver import ThreadingMixIn from urllib.parse import parse_qs else: from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer from SocketServer import ThreadingMixIn from urlparse import parse_qs from boto3 import Session from botocore.exceptions import BotoCoreError, ClientError ResponseStatus = namedtuple("HTTPStatus", ["code", "message"]) ResponseData = namedtuple("ResponseData", ["status", "content_type", "data_stream"]) # Mapping the output format used in the client to the content type for the # response AUDIO_FORMATS = {"ogg_vorbis": "audio/ogg", "mp3": "audio/mpeg", "pcm": "audio/wave; codecs=1"} CHUNK_SIZE = 1024 HTTP_STATUS = {"OK": ResponseStatus(code=200, message="OK"), "BAD_REQUEST": ResponseStatus(code=400, message="Bad request"), "NOT_FOUND": ResponseStatus(code=404, message="Not found"), "INTERNAL_SERVER_ERROR": ResponseStatus(code=500, message="Internal server error")} PROTOCOL = "http" ROUTE_INDEX = "/index.html" ROUTE_VOICES = "/voices" ROUTE_READ = "/read" # Create a client using the credentials and region defined in the adminuser # section of the AWS credentials and configuration files session = Session(profile_name="adminuser") polly = session.client("polly") class HTTPStatusError(Exception): """Exception wrapping a value from http.server.HTTPStatus""" def __init__(self, status, description=None): """ Constructs an error instance from a tuple of (code, message, description), see http.server.HTTPStatus """ super(HTTPStatusError, self).__init__() self.code = status.code self.message = status.message self.explain = description class ThreadedHTTPServer(ThreadingMixIn, HTTPServer): """An HTTP Server that handle each request in a new thread""" daemon_threads = True class ChunkedHTTPRequestHandler(BaseHTTPRequestHandler): """"HTTP 1.1 Chunked encoding request handler""" # Use HTTP 1.1 as 1.0 doesn't support chunked encoding protocol_version = "HTTP/1.1" def query_get(self, queryData, key, default=""): """Helper for getting values from a pre-parsed query string""" return queryData.get(key, [default])[0] def do_GET(self): """Handles GET requests""" # Extract values from the query string path, _, query_string = self.path.partition('?') query = parse_qs(query_string) response = None print(u"[START]: Received GET for %s with query: %s" % (path, query)) try: # Handle the possible request paths if path == ROUTE_INDEX: response = self.route_index(path, query) elif path == ROUTE_VOICES: response = self.route_voices(path, query) elif path == ROUTE_READ: response = self.route_read(path, query) else: response = self.route_not_found(path, query) self.send_headers(response.status, response.content_type) self.stream_data(response.data_stream) except HTTPStatusError as err: # Respond with an error and log debug # information if sys.version_info >= (3, 0): self.send_error(err.code, err.message, err.explain) else: self.send_error(err.code, err.message) self.log_error(u"%s %s %s - [%d] %s", self.client_address[0], self.command, self.path, err.code, err.explain) print("[END]") def route_not_found(self, path, query): """Handles routing for unexpected paths""" raise HTTPStatusError(HTTP_STATUS["NOT_FOUND"], "Page not found") def route_index(self, path, query): """Handles routing for the application's entry point'""" try: return ResponseData(status=HTTP_STATUS["OK"], content_type="text_html", # Open a binary stream for reading the index # HTML file data_stream=open(os.path.join(sys.path[0], path[1:]), "rb")) except IOError as err: # Couldn't open the stream raise HTTPStatusError(HTTP_STATUS["INTERNAL_SERVER_ERROR"], str(err)) def route_voices(self, path, query): """Handles routing for listing available voices""" params = {} voices = [] while True: try: # Request list of available voices, if a continuation token # was returned by the previous call then use it to continue # listing response = polly.describe_voices(**params) except (BotoCoreError, ClientError) as err: # The service returned an error raise HTTPStatusError(HTTP_STATUS["INTERNAL_SERVER_ERROR"], str(err)) # Collect all the voices voices.extend(response.get("Voices", [])) # If a continuation token was returned continue, stop iterating # otherwise if "NextToken" in response: params = {"NextToken": response["NextToken"]} else: break json_data = json_encode(voices) bytes_data = bytes(json_data, "utf-8") if sys.version_info >= (3, 0) \ else bytes(json_data) return ResponseData(status=HTTP_STATUS["OK"], content_type="application/json", # Create a binary stream for the JSON data data_stream=BytesIO(bytes_data)) def route_read(self, path, query): """Handles routing for reading text (speech synthesis)""" # Get the parameters from the query string text = self.query_get(query, "text") voiceId = self.query_get(query, "voiceId") outputFormat = self.query_get(query, "outputFormat") # Validate the parameters, set error flag in case of unexpected # values if len(text) == 0 or len(voiceId) == 0 or \ outputFormat not in AUDIO_FORMATS: raise HTTPStatusError(HTTP_STATUS["BAD_REQUEST"], "Wrong parameters") else: try: # Request speech synthesis response = polly.synthesize_speech(Text=text, VoiceId=voiceId, OutputFormat=outputFormat) except (BotoCoreError, ClientError) as err: # The service returned an error raise HTTPStatusError(HTTP_STATUS["INTERNAL_SERVER_ERROR"], str(err)) return ResponseData(status=HTTP_STATUS["OK"], content_type=AUDIO_FORMATS[outputFormat], # Access the audio stream in the response data_stream=response.get("AudioStream")) def send_headers(self, status, content_type): """Send out the group of headers for a successful request""" # Send HTTP headers self.send_response(status.code, status.message) self.send_header('Content-type', content_type) self.send_header('Transfer-Encoding', 'chunked') self.send_header('Connection', 'close') self.end_headers() def stream_data(self, stream): """Consumes a stream in chunks to produce the response's output'""" print("Streaming started...") if stream: # Note: Closing the stream is important as the service throttles on # the number of parallel connections. Here we are using # contextlib.closing to ensure the close method of the stream object # will be called automatically at the end of the with statement's # scope. with closing(stream) as managed_stream: # Push out the stream's content in chunks while True: data = managed_stream.read(CHUNK_SIZE) self.wfile.write(b"%X\r\n%s\r\n" % (len(data), data)) # If there's no more data to read, stop streaming if not data: break # Ensure any buffered output has been transmitted and close the # stream self.wfile.flush() print("Streaming completed.") else: # The stream passed in is empty self.wfile.write(b"0\r\n\r\n") print("Nothing to stream.") # Define and parse the command line arguments cli = ArgumentParser(description='Example Python Application') cli.add_argument( "-p", "--port", type=int, metavar="PORT", dest="port", default=8000) cli.add_argument( "--host", type=str, metavar="HOST", dest="host", default="localhost") arguments = cli.parse_args() # If the module is invoked directly, initialize the application if __name__ == '__main__': # Create and configure the HTTP server instance server = ThreadedHTTPServer((arguments.host, arguments.port), ChunkedHTTPRequestHandler) print("Starting server, use <Ctrl-C> to stop...") print(u"Open {0}://{1}:{2}{3} in a web browser.".format(PROTOCOL, arguments.host, arguments.port, ROUTE_INDEX)) try: # Listen for requests indefinitely server.serve_forever() except KeyboardInterrupt: # A request to terminate has been received, stop the server print("\nShutting down...") server.socket.close()