Amazon Kinesis Data Firehose
Developer Guide

Converting Your Input Record Format in Kinesis Data Firehose

Amazon Kinesis Data Firehose can convert the format of your input data from JSON to Apache Parquet or Apache ORC before storing the data in Amazon S3. Parquet and ORC are columnar data formats that save space and enable faster queries compared to row-oriented formats like JSON. If you want to convert an input format other than JSON, such as comma-separated values (CSV) or structured text, you can use AWS Lambda to transform it to JSON first. For more information, see Amazon Kinesis Data Firehose Data Transformation.

Record Format Conversion Requirements

Kinesis Data Firehose requires the following three elements to convert the format of your record data:

  • A deserializer to read the JSON of your input data – You can choose one of two types of deserializers: Apache Hive JSON SerDe or OpenX JSON SerDe.

  • A schema to determine how to interpret that data – Use AWS Glue to create a schema in the AWS Glue Data Catalog. Kinesis Data Firehose then references that schema and uses it to interpret your input data. You can use the same schema to configure both Kinesis Data Firehose and your analytics software. For more information, see Populating the AWS Glue Data Catalog in the AWS Glue Developer Guide.

  • A serializer to convert the data to the target columnar storage format (Parquet or ORC) – You can choose one of two types of serializers: ORC SerDe or Parquet SerDe.

Important

If you enable record format conversion, you can't set your Kinesis Data Firehose destination to be Amazon Elasticsearch Service (Amazon ES), Amazon Redshift, or Splunk. With format conversion enabled, Amazon S3 is the only destination that you can use for your Kinesis Data Firehose delivery stream.

You can convert the format of your data even if you aggregate your records before sending them to Kinesis Data Firehose.

Choosing the JSON Deserializer

Choose the OpenX JSON SerDe if your input JSON contains time stamps in the following formats:

  • yyyy-MM-dd'T'HH:mm:ss[.S]'Z', where the fraction can have up to 9 digits – For example, 2017-02-07T15:13:01.39256Z.

  • yyyy-[M]M-[d]d HH:mm:ss[.S], where the fraction can have up to 9 digits – For example, 2017-02-07 15:13:01.14.

  • Epoch seconds – For example, 1518033528.

  • Epoch milliseconds – For example, 1518033528123.

  • Floating point epoch seconds – For example, 1518033528.123.

The OpenX JSON SerDe can convert periods (.) to underscores (_). It can also convert JSON keys to lowercase before deserializing them. For more information about the options that are available with this deserializer through Kinesis Data Firehose, see OpenXJsonSerDe.

If you're not sure which deserializer to choose, use the OpenX JSON SerDe, unless you have time stamps that it doesn't support.

If you have time stamps in formats other than those listed previously, use the Apache Hive JSON SerDe. When you choose this deserializer, you can specify the time stamp formats to use. To do this, follow the pattern syntax of the Joda-Time DateTimeFormat format strings. For more information, see Class DateTimeFormat.

You can also use the special value millis to parse time stamps in epoch milliseconds. If you don't specify a format, Kinesis Data Firehose uses java.sql.Timestamp::valueOf by default.

The Hive JSON SerDe doesn't allow the following:

  • Periods (.) in column names.

  • Fields whose type is uniontype.

  • Fields that have numerical types in the schema, but that are strings in the JSON. For example, if the schema is (an int), and the JSON is {"a":"123"}, the Hive SerDe gives an error.

The Hive SerDe doesn't convert nested JSON into strings. For example, if you have {"a":{"inner":1}}, it doesn't treat {"inner":1} as a string.

Choosing the Serializer

The serializer that you choose depends on your business needs. To learn more about the two serializer options, see ORC SerDe and Parquet SerDe.

Converting Input Record Format (Console)

You can enable data format conversion on the console when you create or update a Kinesis delivery stream. With data format conversion enabled, Amazon S3 is the only destination that you can configure for the delivery stream. Also, Amazon S3 compression gets disabled when you enable format conversion. However, Snappy compression happens automatically as part of the conversion process. The framing format for Snappy that Kinesis Data Firehose uses in this case is compatible with Hadoop. This means that you can use the results of the Snappy compression and run queries on this data in Athena. For the Snappy framing format that Hadoop relies on, see BlockCompressorStream.java.

To enable data format conversion for a data delivery stream

  1. Sign in to the AWS Management Console, and open the Kinesis Data Firehose console at https://console.aws.amazon.com/firehose/.

  2. Choose a Kinesis Data Firehose delivery stream to update, or create a new delivery stream by following the steps in Creating an Amazon Kinesis Data Firehose Delivery Stream.

  3. Under Convert record format, set Record format conversion to Enabled.

  4. Choose the output format that you want. For more information about the two options, see Apache Parquet and Apache ORC.

  5. Choose an AWS Glue table to specify a schema for your source records. Set the Region, database, table, and table version.

Converting Input Record Format (API)

If you want Kinesis Data Firehose to convert the format of your input data from JSON to Parquet or ORC, specify the optional DataFormatConversionConfiguration element in ExtendedS3DestinationConfiguration or in ExtendedS3DestinationUpdate. If you specify DataFormatConversionConfiguration, the following restrictions apply:

  • In BufferingHints, you can't set SizeInMBs to a value less than 64 if you enable record format conversion. Also, when format conversion isn't enabled, the default value is 5. The value becomes 128 when you enable it.

  • You must set CompressionFormat in ExtendedS3DestinationConfiguration or in ExtendedS3DestinationUpdate to UNCOMPRESSED. The default value for CompressionFormat is UNCOMPRESSED. Therefore, you can also leave it unspecified in ExtendedS3DestinationConfiguration. The data still gets compressed as part of the serialization process, using Snappy compression by default. The framing format for Snappy that Kinesis Data Firehose uses in this case is compatible with Hadoop. This means that you can use the results of the Snappy compression and run queries on this data in Athena. For the Snappy framing format that Hadoop relies on, see BlockCompressorStream.java. When you configure the serializer, you can choose other types of compression.

Record Format Conversion Error Handling

When Kinesis Data Firehose can't parse or deserialize a record (for example, when the data doesn't match the schema), it writes it to Amazon S3 with an error prefix. If this write fails, Kinesis Data Firehose retries it forever, blocking further delivery. For each failed record, Kinesis Data Firehose writes a JSON document with the following schema:

{ "attemptsMade": long, "arrivalTimestamp": long, "lastErrorCode": string, "lastErrorMessage": string, "attemptEndingTimestamp": long, "rawData": string, "sequenceNumber": string, "subSequenceNumber": long, "dataCatalogTable": { "catalogId": string, "databaseName": string, "tableName": string, "region": string, "versionId": string, "catalogArn": string } }