Process Streaming Data with Amazon Kinesis and Firehose

What is Amazon Kinesis?

Amazon Kinesis is a fully managed service for real-time processing of streaming data at massive scale. It can collect and process hundreds of terabytes of data per hour from hundreds of thousands of sources, so you can write applications that process information in real time. With Amazon Kinesis applications, you can build real-time dashboards, capture exceptions and generate alerts, drive recommendations, and make other real-time business or operational decisions. You can also easily send data to other services such as Amazon Simple Storage Service, Amazon DynamoDB, and Amazon Redshift.

The AWS Mobile SDK for Android provides simple, high-level clients designed to help you interface with Amazon Kinesis. The Kinesis clients let you store streaming data on disk and then send it all at once. This is useful because many mobile applications that use Kinesis create multiple data requests per second. Sending one request for each action could adversely affect battery life, and requests could be lost if the device goes offline. Batching requests with the high-level Kinesis client can therefore preserve both battery life and data.

For information about Kinesis Region availability, see AWS Service Region Availability.

To get started using the Amazon Kinesis mobile client, you'll need to integrate the SDK for Android into your app, set the appropriate permissions, and import the necessary libraries.

What is Firehose?

Amazon Kinesis Firehose is a fully managed service for delivering real-time streaming data to destinations such as Amazon S3 and Amazon Redshift. With Firehose, you do not need to write any applications or manage any resources. You configure your data producers to send data to Firehose and it automatically delivers the data to the destination that you specified.

KinesisFirehoseRecorder is the high-level client for Firehose. Its usage is very similar to that of KinesisRecorder.

For more information about Firehose, see Amazon Kinesis Firehose.

You can also learn more about how the Kinesis services work together on the following page: Amazon Kinesis services.

Getting Started

Create an Identity Pool

To use AWS services in your mobile application, you must obtain AWS credentials using Amazon Cognito Identity as your credentials provider. Using a credentials provider lets you access AWS services without embedding your private credentials in your application. It also lets you set permissions that control which AWS services your users can access.

The identities of your application's users are stored and managed by an identity pool, which is a store of user identity data specific to your account. Every identity pool has roles that specify which AWS resources your users can access. Typically, a developer will use one identity pool per application. For more information on identity pools, see the Cognito Developer Guide.

To create an identity pool for your application:

  1. Log in to the Cognito Console and click Create new identity pool.
  2. Enter a name for your identity pool and select the checkbox to enable access to unauthenticated identities. Click Create Pool to create your identity pool.
  3. Click Allow to create the roles associated with your identity pool.

The next page displays code that creates a credentials provider so you can easily integrate Amazon Cognito Identity in your Android application.

For more information on Cognito Identity, see Authenticate Users with Amazon Cognito Identity.

Set IAM Permissions (Amazon Kinesis)

To use Amazon Kinesis in an application, you must set the correct permissions. The following IAM policy allows the user to submit records to a Kinesis stream identified by its Amazon Resource Name (ARN):

{
    "Statement": [{
        "Effect": "Allow",
        "Action": "kinesis:PutRecords",
        "Resource": "arn:aws:kinesis:us-west-2:111122223333:stream/mystream"
    }]
}

Apply this policy to the roles assigned to the Cognito identity pool, replacing the Resource value with the ARN of your own Kinesis stream. You can apply policies in the IAM console.

Set IAM Permissions (Amazon Kinesis Firehose)

Amazon Kinesis Firehose needs slightly different permissions. The following IAM policy allows the user to submit records to an Amazon Kinesis Firehose delivery stream identified by its Amazon Resource Name (ARN):

{
    "Statement": [{
        "Effect": "Allow",
        "Action": "firehose:PutRecordBatch",
        "Resource": "arn:aws:firehose:us-west-2:111122223333:deliverystream/mystream"
    }]
}

For more information about ARN formatting and example policies, see Amazon Resource Names for Amazon Kinesis.

To learn more about Kinesis-specific policies, see Controlling Access to Amazon Kinesis Resources with IAM.

To learn more about IAM policies, see Using IAM.

Include the SDK in Your Project

Follow the instructions on the Set Up the SDK for Android page to include the proper JAR files for this service and set the appropriate permissions.

Set Permissions in Your Android Manifest

In your AndroidManifest.xml file, add the following permission:

<uses-permission android:name="android.permission.INTERNET" />

Add Import Statements

Add the following imports to the main activity of your app.

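import java.io.File;
import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.CognitoCachingCredentialsProvider;
import com.amazonaws.mobileconnectors.kinesis.kinesisrecorder.*;
import com.amazonaws.regions.Regions;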

Instantiate a Kinesis recorder

Once you've imported the necessary libraries and have your credentials object, you can instantiate KinesisRecorder. KinesisRecorder is a high-level client meant for storing PutRecord requests on an Android device. Storing requests on the device lets you retain data when the device is offline, and it can also increase performance and battery efficiency since the network doesn't need to be awakened as frequently.

Note

KinesisRecorder uses synchronous calls, so you shouldn't call KinesisRecorder methods on the main thread.

When you create the KinesisRecorder client, you pass in a directory, an AWS region, and a credentials provider. The directory should be empty the first time you instantiate KinesisRecorder, it should be private to your application, and, to prevent collisions, it should be used only by KinesisRecorder. The following snippet creates a directory and instantiates the KinesisRecorder client, passing in the directory, a Regions enum value, and a Cognito credentials object (credentialsProvider).

// Name of an app-private directory reserved for KinesisRecorder
String kinesisDirectory = "YOUR_UNIQUE_DIRECTORY";
KinesisRecorder recorder = new KinesisRecorder(
    myActivity.getDir(kinesisDirectory, 0), // 0 is Context.MODE_PRIVATE
    Regions.US_WEST_2,
    credentialsProvider
);

You'll use KinesisRecorder to save records and then send them in a batch.

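// Save a record on disk for the stream named "MyStreamName"
recorder.saveRecord("MyData".getBytes(), "MyStreamName");
// Send all saved records in a batch (blocking network call)
recorder.submitAllRecords();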

Note

For the saveRecord() request above to work, you would have to have created a stream named MyStreamName. You can create new streams in the Amazon Kinesis console.
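Because KinesisRecorder calls are synchronous, a call such as submitAllRecords() belongs on a background thread. The following is a minimal sketch of one way to do that with a single worker thread. BlockingCall and BackgroundSubmitter are hypothetical names introduced here for illustration, not SDK classes, and the sketch assumes java.util.concurrent.CompletableFuture is available (on Android, API level 24 or later).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a blocking recorder call such as
// recorder.submitAllRecords(); it is not part of the AWS SDK.
interface BlockingCall {
    void run() throws Exception;
}

// Hypothetical helper that runs blocking calls on one background thread.
final class BackgroundSubmitter {
    // A single daemon worker keeps submissions off the calling thread
    // and runs them one at a time.
    private final ExecutorService worker = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "kinesis-submit");
        t.setDaemon(true);
        return t;
    });

    // Returns a future that completes with true if the call finished
    // normally and false if it threw an exception.
    CompletableFuture<Boolean> submit(BlockingCall call) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                call.run();
                return true;
            } catch (Exception e) {
                return false;
            }
        }, worker);
    }

    void shutdown() {
        worker.shutdown();
    }
}
```

In an app, the call passed in would typically be `() -> recorder.submitAllRecords()`, and the returned future can be observed from the UI layer without blocking it.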

If submitAllRecords() is called while the app is online, requests are sent and then removed from disk. If submitAllRecords() is called while the app is offline, requests are kept on disk until submitAllRecords() is called while online. This applies even if you lose your internet connection midway through a submit. For example, if you save ten requests, call submitAllRecords(), send five, and then lose the internet connection, five requests remain on disk. Those five are sent the next time submitAllRecords() is invoked online.
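The ten-record scenario above can be traced with a small in-memory model. This is an illustrative sketch only: PendingRecordsModel and its sendsBeforeDrop parameter are hypothetical, the real recorder persists records on disk, and the model sends one record at a time for simplicity.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative model of the save-then-submit behavior described above.
// It is not the SDK implementation.
final class PendingRecordsModel {
    private final Deque<String> pending = new ArrayDeque<>();

    // Queues a record until a later submit succeeds in sending it.
    void saveRecord(String data) {
        pending.addLast(data);
    }

    // Tries to send every pending record. sendsBeforeDrop simulates how many
    // sends succeed before connectivity is lost (0 means offline).
    // Returns the number of records sent.
    int submitAllRecords(int sendsBeforeDrop) {
        int sent = 0;
        while (!pending.isEmpty() && sent < sendsBeforeDrop) {
            pending.removeFirst(); // a record is dropped only once it is sent
            sent++;
        }
        return sent;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

Saving ten records and calling submitAllRecords(5) leaves five pending. A call with 0 (offline) leaves them untouched, and a later call with a large enough value drains the rest.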

To see how much space the KinesisRecorder client is allowed to use, you can call getDiskByteLimit().

Long byteLimit = recorder.getDiskByteLimit();
// Do something with byteLimit

Alternatively, you can retrieve the same information by getting the KinesisRecorderConfig object for the recorder and calling getMaxStorageSize():

KinesisRecorderConfig kinesisRecorderConfig = recorder.getKinesisRecorderConfig();
Long maxStorageSize = kinesisRecorderConfig.getMaxStorageSize();
// Do something with maxStorageSize

Storage limits

If you exceed the storage limit for KinesisRecorder, requests will not be saved or sent. KinesisRecorderConfig has a default maxStorageSize of 8 MiB. You can configure the maximum allowed storage via the withMaxStorageSize() method of KinesisRecorderConfig.

To check the number of bytes currently stored in the directory passed to the KinesisRecorder constructor, call getDiskBytesUsed():

Long bytesUsed = recorder.getDiskBytesUsed();
// Do something with bytesUsed
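The two values from getDiskByteLimit() and getDiskBytesUsed() can be combined to decide when to flush before the limit is reached. The following is a minimal sketch of that arithmetic. StorageHeadroom, remainingBytes, and shouldFlush are hypothetical helpers, and the 8 MiB constant simply restates the default given in this guide.

```java
// Hypothetical helper for reasoning about recorder storage; not an SDK class.
final class StorageHeadroom {
    // Default maxStorageSize as stated in this guide: 8 MiB.
    static final long DEFAULT_LIMIT_BYTES = 8L * 1024 * 1024;

    // Bytes still available before the limit is reached (never negative).
    static long remainingBytes(long limitBytes, long usedBytes) {
        return Math.max(0L, limitBytes - usedBytes);
    }

    // True once usage reaches the given fraction of the limit,
    // for example 0.8 for 80 percent.
    static boolean shouldFlush(long limitBytes, long usedBytes, double fraction) {
        return usedBytes >= (long) (limitBytes * fraction);
    }
}
```

In an app, the inputs would come from recorder.getDiskByteLimit() and recorder.getDiskBytesUsed(), and a true result from shouldFlush would be a reasonable trigger for calling submitAllRecords() on a background thread.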

To learn more about working with Amazon Kinesis, see Amazon Kinesis Developer Resources. To learn more about the Kinesis classes, see the API Reference for the Android SDK.

Use KinesisFirehoseRecorder

To use KinesisFirehoseRecorder, you need to pass its constructor a directory where streaming data is saved. Use an app-private directory, because the data isn't encrypted.

// Gets a working directory for the recorder
File directory = context.getCacheDir();
// Sets Firehose region
Regions region = Regions.US_WEST_2;
// Initialize a credentials provider to access Amazon Kinesis Firehose
AWSCredentialsProvider provider = new CognitoCachingCredentialsProvider(
        context,
        "identityPoolId",
        Regions.US_EAST_1); // region of your Amazon Cognito identity pool
KinesisFirehoseRecorder firehoseRecorder = new KinesisFirehoseRecorder(
        directory, region, provider);

// Start to save data, either a String or a byte array
firehoseRecorder.saveRecord("Hello world!\n");
firehoseRecorder.saveRecord("Streaming data to Amazon S3 via Amazon Kinesis Firehose is easy.\n");

// Send previously saved data to Amazon Kinesis Firehose
// Note: submitAllRecords() makes network calls, so wrap it in an AsyncTask.
new AsyncTask<Void, Void, Void>() {
    @Override
    protected Void doInBackground(Void... v) {
        try {
            firehoseRecorder.submitAllRecords();
        } catch (AmazonClientException ace) {
            // handle error
        }
        return null; // doInBackground must return a value, even for Void
    }
}.execute();

To learn more about working with Amazon Kinesis Firehose, see Amazon Kinesis Firehose.

To learn more about the Kinesis Firehose classes, see the API Reference for the Android SDK.