AWS Code Sample

The AWS Documentation website is getting a new look!
Try it now and let us know what you think. Switch to the new look >>

You can return to the original look by selecting English in the language selector above. is a client that manages the connection to Amazon Transcribe and retries sending data when there are errors on the connection.

/** * COPYRIGHT: * <p> * Copyright 2018-2019, Inc. or its affiliates. All Rights Reserved. * <p> * Licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. * A copy of the License is located at * <p> * * <p> * or in the "license" file accompanying this file. This file is distributed * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ package com.amazonaws.transcribestreaming.retryclient; import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import; import; import; import; import; import; import; import; import; import; import; import java.util.Arrays; import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; /** * Build a client wrapper around the Amazon Transcribe client to retry * on an exception that can be retried. */ public class TranscribeStreamingRetryClient { private static final int DEFAULT_MAX_RETRIES = 10; private static final int DEFAULT_MAX_SLEEP_TIME_MILLS = 100; private static final Logger log = LoggerFactory.getLogger(TranscribeStreamingRetryClient.class); private final TranscribeStreamingAsyncClient client; List<Class<?>> nonRetriableExceptions = Arrays.asList(BadRequestException.class); private int maxRetries = DEFAULT_MAX_RETRIES; private int sleepTime = DEFAULT_MAX_SLEEP_TIME_MILLS; /** * Create a TranscribeStreamingRetryClient with given credential and configuration * * @param creds Creds to use for transcription * @param endpoint Endpoint to use for transcription * @param region Region to use for transcriptions * @throws URISyntaxException if the endpoint is not a URI */ public TranscribeStreamingRetryClient(AwsCredentialsProvider creds, String endpoint, Region region) throws URISyntaxException { this(TranscribeStreamingAsyncClient.builder() .overrideConfiguration( c -> c.putAdvancedOption( SdkAdvancedClientOption.SIGNER, EventStreamAws4Signer.create())) .credentialsProvider(creds) .endpointOverride(new URI(endpoint)) .region(region) .build()); } /** * Initiate TranscribeStreamingRetryClient with TranscribeStreamingAsyncClient * * @param client TranscribeStreamingAsyncClient */ public TranscribeStreamingRetryClient(TranscribeStreamingAsyncClient client) { this.client = client; } /** * Get Max retries * * @return Max retries */ public int getMaxRetries() { return maxRetries; } /** * Set Max retries * * @param maxRetries Max retries */ public void setMaxRetries(int maxRetries) { this.maxRetries = maxRetries; } /** * Get sleep time * * @return sleep time between retries */ public int getSleepTime() { return sleepTime; } /** * Set sleep time between retries * * @param sleepTime sleep time */ public void setSleepTime(int sleepTime) { this.sleepTime = sleepTime; } /** * Initiate a Stream Transcription with retry. * * @param request StartStreamTranscriptionRequest to use to start transcription * @param publisher The source audio stream as Publisher * @param responseHandler StreamTranscriptionBehavior object that defines how the response needs to be handled. * @return Completable future to handle stream response. */ public CompletableFuture<Void> startStreamTranscription(final StartStreamTranscriptionRequest request, final Publisher<AudioStream> publisher, final StreamTranscriptionBehavior responseHandler) { CompletableFuture<Void> finalFuture = new CompletableFuture<>(); recursiveStartStream(rebuildRequestWithSession(request), publisher, responseHandler, finalFuture, 0); return finalFuture; } /** * Recursively call startStreamTranscription() to be called till the request is completed or till we run out of retries. * * @param request StartStreamTranscriptionRequest * @param publisher The source audio stream as Publisher * @param responseHandler StreamTranscriptionBehavior object that defines how the response needs to be handled. * @param finalFuture final future to finish on completing the chained futures. * @param retryAttempt Current attempt number */ private void recursiveStartStream(final StartStreamTranscriptionRequest request, final Publisher<AudioStream> publisher, final StreamTranscriptionBehavior responseHandler, final CompletableFuture<Void> finalFuture, final int retryAttempt) { CompletableFuture<Void> result = client.startStreamTranscription(request, publisher, getResponseHandler(responseHandler)); result.whenComplete((r, e) -> { if (e != null) { log.debug("Error occured:", e); if (retryAttempt <= maxRetries && isExceptionRetriable(e)) { log.debug("Retriable error occurred and will be retried."); log.debug("Sleeping for sometime before retrying..."); try { Thread.sleep(sleepTime); } catch (InterruptedException e1) { log.debug("Unable to sleep. Failed with exception: ", e); e1.printStackTrace(); } log.debug("Making retry attempt: " + (retryAttempt + 1)); recursiveStartStream(request, publisher, responseHandler, finalFuture, retryAttempt + 1); } else { log.error("Encountered unretriable exception or ran out of retries. "); responseHandler.onError(e); finalFuture.completeExceptionally(e); } } else { responseHandler.onComplete(); finalFuture.complete(null); } }); } private StartStreamTranscriptionRequest rebuildRequestWithSession(StartStreamTranscriptionRequest request) { return StartStreamTranscriptionRequest.builder() .languageCode(request.languageCode()) .mediaEncoding(request.mediaEncoding()) .mediaSampleRateHertz(request.mediaSampleRateHertz()) .sessionId(UUID.randomUUID().toString()) .build(); } /** * StartStreamTranscriptionResponseHandler implements subscriber of transcript stream * Output is printed to standard output */ private StartStreamTranscriptionResponseHandler getResponseHandler( StreamTranscriptionBehavior transcriptionBehavior) { final StartStreamTranscriptionResponseHandler build = StartStreamTranscriptionResponseHandler.builder() .onResponse(r -> { transcriptionBehavior.onResponse(r); }) .onError(e -> { //Do nothing here. Don't close any streams that shouldn't be cleaned up yet. }) .onComplete(() -> { //Do nothing here. Don't close any streams that shouldn't be cleaned up yet. }) .subscriber(event -> transcriptionBehavior.onStream(event)) .build(); return build; } /** * Check if the exception can be retried. * * @param e Exception that occurred * @return True if the exception is retriable */ private boolean isExceptionRetriable(Throwable e) { e.printStackTrace(); return nonRetriableExceptions.contains(e.getClass()); } public void close() { this.client.close(); } }

Sample Details

Service: transcribe

Last tested: 2019-01-10

Author: AWS

Type: snippet

On this page: