Kinesis - Amazon EMR

Kinesis

Amazon EMR clusters can read and process Amazon Kinesis streams directly, using familiar tools in the Hadoop ecosystem such as Hive, Pig, MapReduce, the Hadoop Streaming API, and Cascading. You can also join real-time data from Amazon Kinesis with existing data on Amazon S3, Amazon DynamoDB, and HDFS in a running cluster. You can directly load the data from Amazon EMR to Amazon S3 or DynamoDB for post-processing activities. For information about Amazon Kinesis service highlights and pricing, see the Amazon Kinesis page.

What can I do with Amazon EMR and Amazon Kinesis integration?

Integration between Amazon EMR and Amazon Kinesis makes certain scenarios much easier; for example:

  • Streaming log analysis–You can analyze streaming web logs to generate a list of top 10 error types every few minutes by region, browser, and access domain.

  • Customer engagement–You can write queries that join clickstream data from Amazon Kinesis with advertising campaign information stored in a DynamoDB table to identify the most effective categories of ads that are displayed on particular websites.

  • Ad-hoc interactive queries–You can periodically load data from Amazon Kinesis streams into HDFS and make it available as a local Impala table for fast, interactive, analytic queries.

Checkpointed analysis of Amazon Kinesis streams

Users can run periodic, batched analysis of Amazon Kinesis streams in what are called iterations. Because Amazon Kinesis stream data records are retrieved by sequence number, iteration boundaries are defined by starting and ending sequence numbers that Amazon EMR stores in a DynamoDB table. For example, when iteration0 ends, it stores the ending sequence number in DynamoDB so that when the iteration1 job begins, it can retrieve subsequent data from the stream. This mapping of iterations to ranges of stream data is called checkpointing. For more information, see Kinesis connector.

If an iteration was checkpointed and the job failed while processing it, Amazon EMR attempts to reprocess the records in that iteration.

Checkpointing is a feature that allows you to:

  • Start data processing after a sequence number processed by a previous query that ran on the same stream and logical name

  • Re-process the same batch of data from Kinesis that was processed by an earlier query
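The two behaviors above can be illustrated with a small simulation. This is a minimal sketch, not the connector's actual implementation: the in-memory dictionary stands in for the DynamoDB checkpoint table, the list of (sequence number, data) pairs stands in for a single stream shard, and the run_iteration function and its batch_size parameter are hypothetical names introduced only for this example.

```python
from typing import Dict, List, Tuple

# Hypothetical stand-in for the DynamoDB checkpoint table.
# Key: (logical name, iteration number) -> (starting seq, ending seq)
CheckpointStore = Dict[Tuple[str, int], Tuple[int, int]]


def run_iteration(stream: List[Tuple[int, str]], store: CheckpointStore,
                  logical_name: str, iteration_no: int,
                  batch_size: int) -> List[str]:
    """Return the data for one iteration and record its boundaries.

    Assumes `stream` is sorted by sequence number.
    """
    key = (logical_name, iteration_no)
    if key in store:
        # The iteration was checkpointed before: replay the same range.
        start_seq, end_seq = store[key]
        return [data for seq, data in stream if start_seq <= seq <= end_seq]

    # New iteration: start after the previous iteration's ending sequence number.
    previous = store.get((logical_name, iteration_no - 1))
    last_seq = previous[1] if previous else -1
    batch = [(seq, data) for seq, data in stream if seq > last_seq][:batch_size]
    if batch:
        store[key] = (batch[0][0], batch[-1][0])
    return [data for _, data in batch]


stream = [(100, "a"), (101, "b"), (102, "c"), (103, "d"), (104, "e")]
store: CheckpointStore = {}

first = run_iteration(stream, store, "demo", 0, batch_size=2)   # ['a', 'b']
second = run_iteration(stream, store, "demo", 1, batch_size=2)  # ['c', 'd']
replay = run_iteration(stream, store, "demo", 0, batch_size=2)  # ['a', 'b'] again
```

Keying the store on both the logical name and the iteration number mirrors the text: a later iteration resumes after the stored ending sequence number, and a rerun of an earlier iteration sees the same batch.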

To enable checkpointing, set the kinesis.checkpoint.enabled parameter to true in your scripts. Also, configure the following parameters:

Configuration setting                           Description
kinesis.checkpoint.metastore.table.name         DynamoDB table name where checkpoint information will be stored
kinesis.checkpoint.metastore.hash.key.name      Hash key name for the DynamoDB table
kinesis.checkpoint.metastore.hash.range.name    Range key name for the DynamoDB table
kinesis.checkpoint.logical.name                 A logical name for current processing
kinesis.checkpoint.iteration.no                 Iteration number for processing associated with the logical name
kinesis.rerun.iteration.without.wait            Boolean value that indicates if a failed iteration can be rerun without waiting for timeout; the default is false
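As an illustration of how these settings fit together, the following sketch collects them in a dictionary and renders them as SET statements. It assumes the script is a Hive script, where properties are assigned with SET name=value; other tools assign properties differently. The helper functions and the example table and key names are hypothetical; only the property names come from the table above.

```python
from typing import Dict


def checkpoint_settings(table_name: str, hash_key: str, range_key: str,
                        logical_name: str, iteration_no: int,
                        rerun_without_wait: bool = False) -> Dict[str, str]:
    """Collect the checkpointing properties listed in the table above."""
    return {
        "kinesis.checkpoint.enabled": "true",
        "kinesis.checkpoint.metastore.table.name": table_name,
        "kinesis.checkpoint.metastore.hash.key.name": hash_key,
        "kinesis.checkpoint.metastore.hash.range.name": range_key,
        "kinesis.checkpoint.logical.name": logical_name,
        "kinesis.checkpoint.iteration.no": str(iteration_no),
        "kinesis.rerun.iteration.without.wait": str(rerun_without_wait).lower(),
    }


def to_hive_set_statements(settings: Dict[str, str]) -> str:
    """Render the properties as Hive SET statements (assumes a Hive script)."""
    return "\n".join(f"SET {name}={value};" for name, value in settings.items())


# Example values are placeholders, not names required by the connector.
settings = checkpoint_settings("MyCheckpointTable", "HashKey", "RangeKey",
                               logical_name="demo", iteration_no=0)
print(to_hive_set_statements(settings))
```

Generating the statements from one function makes it easy to increment the iteration number between scheduled runs while keeping the other settings fixed.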

Provisioned IOPS recommendations for Amazon DynamoDB tables

The Amazon EMR connector for Amazon Kinesis uses DynamoDB as the backing store for checkpointing metadata. You must create a table in DynamoDB before consuming data in an Amazon Kinesis stream with an Amazon EMR cluster in checkpointed intervals. The table must be in the same region as your Amazon EMR cluster. The following are general recommendations for the number of IOPS you should provision for your DynamoDB tables. Let j be the maximum number of Hadoop jobs (each with a different combination of logical name and iteration number) that can run concurrently, and let s be the maximum number of shards that any job will process:

For Read Capacity Units: j*s/5

For Write Capacity Units: j*s
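A short worked example of the two formulas follows. The function name is hypothetical, and rounding the read value up to a whole number is an assumption made here because capacity units are provisioned as integers; the source gives only j*s/5.

```python
from typing import Tuple


def recommended_capacity(j: int, s: int) -> Tuple[int, int]:
    """Return (read capacity units, write capacity units) for the checkpoint table.

    j: maximum number of concurrent Hadoop jobs
    s: maximum number of shards that any job will process
    """
    if j < 1 or s < 1:
        raise ValueError("j and s must be positive integers")
    read_units = (j * s + 4) // 5   # j*s/5, rounded up (assumption)
    write_units = j * s
    return read_units, write_units


# 3 concurrent jobs, each processing at most 10 shards
read_units, write_units = recommended_capacity(j=3, s=10)
print(read_units, write_units)  # 6 30
```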

Performance considerations

Amazon Kinesis shard throughput is directly proportional to the instance size of nodes in Amazon EMR clusters and record size in the stream. We recommend that you use m5.xlarge or larger instances on master and core nodes.

Schedule Amazon Kinesis analysis with Amazon EMR

Analysis of an active Amazon Kinesis stream is limited by timeouts and by a maximum duration for any iteration, so it is important that you run the analysis frequently to gather periodic details from the stream. There are multiple ways to execute such scripts and queries at periodic intervals; we recommend using AWS Data Pipeline for recurrent tasks like these. For more information, see AWS Data Pipeline PigActivity and AWS Data Pipeline HiveActivity in the AWS Data Pipeline Developer Guide.