How to access Kinesis Video Streams data - Amazon Connect

How to access Kinesis Video Streams data

You must have developer skills to work with Kinesis Video Streams data. Use the code samples in this section to interact with the customer audio data sent to Kinesis Video Streams.

LMSDemo.java

package com.amazonaws.kinesisvideo.parser.demo; import com.amazonaws.auth.AWSSessionCredentials; import com.amazonaws.auth.AWSSessionCredentialsProvider; import com.amazonaws.kinesisvideo.parser.examples.LMSExample; import com.amazonaws.regions.Regions; import java.io.FileOutputStream; import java.io.IOException; public class LMSDemo { public static void main(String args[]) throws InterruptedException, IOException { LMSExample example = new LMSExample(Regions.US_WEST_2, "<<StreamName>>", "<<FragmentNumber>>", new AWSSessionCredentialsProvider() { @Override public AWSSessionCredentials getCredentials() { return new AWSSessionCredentials() { @Override public String getSessionToken() { return "<<AWSSessionToken>>"; } @Override public String getAWSAccessKeyId() { return "<<AWSAccessKey>>"; } @Override public String getAWSSecretKey() { return "<<AWSSecretKey>>"; } }; } @Override public void refresh() { } }, new FileOutputStream("<<FileName>>.raw")); example.execute(); } }

LMSExample.java

package com.amazonaws.kinesisvideo.parser.examples; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.kinesisvideo.parser.ebml.MkvTypeInfos; import com.amazonaws.kinesisvideo.parser.mkv.MkvDataElement; import com.amazonaws.kinesisvideo.parser.mkv.MkvElementVisitException; import com.amazonaws.kinesisvideo.parser.mkv.MkvElementVisitor; import com.amazonaws.kinesisvideo.parser.mkv.MkvEndMasterElement; import com.amazonaws.kinesisvideo.parser.mkv.MkvStartMasterElement; import com.amazonaws.kinesisvideo.parser.utilities.FragmentMetadataVisitor; import com.amazonaws.kinesisvideo.parser.utilities.FrameVisitor; import com.amazonaws.kinesisvideo.parser.utilities.LMSFrameProcessor; import com.amazonaws.regions.Regions; import com.amazonaws.services.kinesisvideo.model.StartSelector; import com.amazonaws.services.kinesisvideo.model.StartSelectorType; import java.io.Closeable; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; import java.io.PipedInputStream; import java.io.PipedOutputStream; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class LMSExample extends KinesisVideoCommon { private final ExecutorService executorService; private GetMediaProcessingArguments getMediaProcessingArguments; private final StreamOps streamOps; private final OutputStream outputStreamFromCustomer; private final OutputStream outputStreamToCustomer; private final String fragmentNumber; public LMSExample(Regions region, String streamName, String fragmentNumber, AWSCredentialsProvider credentialsProvider, OutputStream outputStreamFromCustomer, OutputStream outputStreamToCustomer) throws IOException { super(region, credentialsProvider, streamName); this.streamOps = new StreamOps(region, streamName, credentialsProvider); this.executorService = Executors.newFixedThreadPool(2); this.outputStreamFromCustomer = outputStreamFromCustomer; this.outputStreamToCustomer = outputStreamToCustomer; this.fragmentNumber = fragmentNumber; } public void execute () throws InterruptedException, IOException { getMediaProcessingArguments = GetMediaProcessingArguments.create(outputStreamFromCustomer, outputStreamToCustomer); try (GetMediaProcessingArguments getMediaProcessingArgumentsLocal = getMediaProcessingArguments) { //Start a GetMedia worker to read and process data from the Kinesis Video Stream. GetMediaWorker getMediaWorker = GetMediaWorker.create(getRegion(), getCredentialsProvider(), getStreamName(), new StartSelector().withStartSelectorType(StartSelectorType.FRAGMENT_NUMBER).withAfterFragmentNumber(fragmentNumber), streamOps.amazonKinesisVideo, getMediaProcessingArgumentsLocal.getFrameVisitor()); executorService.submit(getMediaWorker); //Wait for the workers to finish. executorService.shutdown(); executorService.awaitTermination(120, TimeUnit.SECONDS); if (!executorService.isTerminated()) { System.out.println("Shutting down executor service by force"); executorService.shutdownNow(); } else { System.out.println("Executor service is shutdown"); } } finally { outputStream.close(); } } private static class LogVisitor extends MkvElementVisitor { private final FragmentMetadataVisitor fragmentMetadataVisitor; private LogVisitor(FragmentMetadataVisitor fragmentMetadataVisitor) { this.fragmentMetadataVisitor = fragmentMetadataVisitor; } public long getFragmentCount() { return fragmentCount; } private long fragmentCount = 0; @Override public void visit(MkvStartMasterElement startMasterElement) throws MkvElementVisitException { if (MkvTypeInfos.EBML.equals(startMasterElement.getElementMetaData().getTypeInfo())) { fragmentCount++; System.out.println("Start of segment"); } } @Override public void visit(MkvEndMasterElement endMasterElement) throws MkvElementVisitException { if (MkvTypeInfos.SEGMENT.equals(endMasterElement.getElementMetaData().getTypeInfo())) { System.out.println("End of segment"); } } @Override public void visit(MkvDataElement dataElement) throws MkvElementVisitException { } } private static class GetMediaProcessingArguments implements Closeable { public FrameVisitor getFrameVisitor() { return frameVisitor; } private final FrameVisitor frameVisitor; public GetMediaProcessingArguments(FrameVisitor frameVisitor) { this.frameVisitor = frameVisitor; } public static GetMediaProcessingArguments create(OutputStream outputStreamFromCustomer, OutputStream outputStreamToCustomer) throws IOException { //Fragment metadata visitor to extract Kinesis Video fragment metadata from the GetMedia stream. FragmentMetadataVisitor fragmentMetadataVisitor = FragmentMetadataVisitor.create(); //A visitor used to log as the GetMedia stream is processed. LogVisitor logVisitor = new LogVisitor(fragmentMetadataVisitor); //A composite visitor to encapsulate the three visitors. FrameVisitor frameVisitor = FrameVisitor.create(LMSFrameProcessor.create(outputStreamFromCustomer, outputStreamToCustomer, fragmentMetadataVisitor)); return new GetMediaProcessingArguments(frameVisitor); } @Override public void close() throws IOException { } } }

LMSFrameProcessor.java

package com.amazonaws.kinesisvideo.parser.utilities; import com.amazonaws.kinesisvideo.parser.mkv.Frame; import com.amazonaws.kinesisvideo.parser.utilities.FragmentMetadataVisitor; import com.amazonaws.kinesisvideo.parser.utilities.MkvTrackMetadata; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; public class LMSFrameProcessor implements FrameVisitor.FrameProcessor { private OutputStream outputStreamFromCustomer; private OutputStream outputStreamToCustomer; private FragmentMetadataVisitor fragmentMetadataVisitor; protected LMSFrameProcessor(OutputStream outputStreamFromCustomer, OutputStream outputStreamToCustomer, FragmentMetadataVisitor fragmentMetadataVisitor) { this.outputStreamFromCustomer = outputStreamFromCustomer; this.outputStreamToCustomer = outputStreamToCustomer; } public static LMSFrameProcessor create(OutputStream outputStreamFromCustomer, OutputStream outputStreamToCustomer, FragmentMetadataVisitor fragmentMetadataVisitor) { return new LMSFrameProcessor(outputStreamFromCustomer, outputStreamToCustomer, fragmentMetadataVisitor); } @Override public void process(Frame frame, MkvTrackMetadata trackMetadata) { saveToOutPutStream(frame); } private void saveToOutPutStream(final Frame frame) { ByteBuffer frameBuffer = frame.getFrameData(); long trackNumber = frame.getTrackNumber(); MkvTrackMetadata metadata = fragmentMetadataVisitor.getMkvTrackMetadata(trackNumber); String trackName = metadata.getTrackName(); try { byte[] frameBytes = new byte[frameBuffer.remaining()]; frameBuffer.get(frameBytes); if (Strings.isNullOrEmpty(trackName) || "AUDIO_FROM_CUSTOMER".equals(trackName)) { outputStreamFromCustomer.write(frameBytes); } else if ("AUDIO_TO_CUSTOMER".equals(trackName)) { outputStreamToCustomer.write(frameBytes); } else { // Unknown track name. Not writing to output stream. } } catch (IOException e) { e.printStackTrace(); } } }