
Amazon EMR Serverless is in preview release and is subject to change. To use EMR Serverless in preview, follow the sign up steps at https://pages.awscloud.com/EMR-Serverless-Preview.html. The only Region that EMR Serverless currently supports is us-east-1, so make sure to set all region parameters to this value. All Amazon S3 buckets used with EMR Serverless must also be created in us-east-1.
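For example, with the AWS CLI you can set the default Region once, or pass it explicitly on each command. This is a minimal sketch; the bucket name is a placeholder:

aws configure set region us-east-1

# Buckets used with EMR Serverless must also be created in us-east-1
aws s3 mb s3://DOC-EXAMPLE-BUCKET --region us-east-1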

Running Hive jobs

You can run Hive jobs on an application with the type parameter set to 'HIVE'. Jobs must be compatible with the Hive version referenced in the Amazon EMR release version. For example, when running jobs on an application with Amazon EMR release 5.34.0-preview, your job must be compatible with Apache Hive 2.3.8.
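For example, the following AWS CLI call creates an application that can run Hive jobs. This is a minimal sketch; the application name is a placeholder:

aws emr-serverless create-application \
    --type "HIVE" \
    --name my-hive-application \
    --release-label "emr-5.34.0-preview" \
    --region us-east-1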

To run a Hive job, you must specify the following parameters when using the start-job-run API.

Execution role (executionRoleArn)

This is the ARN of the IAM role that your application uses to execute Hive jobs. This role must have the following permissions:

  • Read from S3 buckets or other data sources where your data resides

  • Read from S3 buckets or prefixes where your Hive query file and init query file are located

  • Read and write to S3 buckets where your Hive Scratch directory and Hive Metastore warehouse directory are located

  • Write to S3 buckets where you intend to write your final output

  • Write logs to an S3 bucket or prefix specified by S3MonitoringConfiguration

  • Access to KMS keys if you use KMS keys to encrypt data in your S3 bucket

  • Access to the AWS Glue Data Catalog

Failure to provide these permissions to the IAM role can lead to job failures. If your Hive job is reading or writing data to or from other data sources, make sure that the appropriate permissions are specified in this IAM role. For more information, see Using job execution roles with EMR Serverless.
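The following is a minimal sketch of a permissions policy that such a role might include, assuming your queries, data, and logs all reside under DOC-EXAMPLE-BUCKET. Adjust the resources, and add KMS key or other data source permissions, to match your setup:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "S3ReadWriteAccess",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:ListBucket",
                "s3:PutObject"
            ],
            "Resource": [
                "arn:aws:s3:::DOC-EXAMPLE-BUCKET",
                "arn:aws:s3:::DOC-EXAMPLE-BUCKET/*"
            ]
        },
        {
            "Sid": "GlueCatalogAccess",
            "Effect": "Allow",
            "Action": [
                "glue:GetDatabase",
                "glue:CreateDatabase",
                "glue:GetTable",
                "glue:CreateTable",
                "glue:GetPartitions"
            ],
            "Resource": ["*"]
        }
    ]
}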

Job driver (jobDriver)

The job driver is used to provide input to the job. This parameter accepts only one value for the job type that you want to run. When you specify hive as the job type, the job driver accepts a Hive query. This job type has the following parameters (see the sketch after this list):

  • query ‐ This is the Amazon S3 location of the Hive query file that you want to run.

  • parameters ‐ These are the additional Hive configuration properties that you want to override. To override a property, pass it to this parameter as --hiveconf <property=value>. To pass a Hive variable, use --hivevar <key=value>.

  • initQueryFile ‐ This is the init Hive query file, which Hive runs before your main query file.
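For example, a jobDriver that runs a query file, overrides a Hive property, passes a Hive variable, and points to an init file might look like the following sketch (the S3 paths and the variable name are placeholders):

{
    "hive": {
        "query": "s3://DOC-EXAMPLE-BUCKET/emr-serverless-hive/query/hive-query.ql",
        "parameters": "--hiveconf hive.exec.reducers.max=128 --hivevar run_date=2021-12-01",
        "initQueryFile": "s3://DOC-EXAMPLE-BUCKET/emr-serverless-hive/query/init.ql"
    }
}

Inside hive-query.ql, the variable can then be referenced as ${hivevar:run_date}.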

Configuration overrides (configurationOverrides)

This parameter is used to override application-level and monitoring-level configuration properties. It accepts a JSON object with the following two fields:

  • applicationConfiguration ‐ This field allows you to override the default configurations for applications by supplying a configuration object. You can use a shorthand syntax to provide the configuration, or you can reference the configuration object in a JSON file. Configuration objects consist of a classification, properties, and optional nested configurations. Properties consist of the settings that you want to override in that file. You can specify multiple classifications for multiple applications in a single JSON object. The configuration classifications that are available vary by Amazon EMR release version. For a list of configuration classifications that are available for each release version of Amazon EMR, see Release versions.

    If you pass the same configuration in an application override and in Hive parameters, the Hive parameters take precedence. The complete configuration priority list follows, in order of highest priority to lowest priority.

    • Configuration supplied as part of Hive parameters using --hiveconf <property=value>.

    • Configuration provided as part of application overrides.

    • Optimized configurations chosen by Amazon EMR for the release.

    • Default open source configurations for the application.

  • monitoringConfiguration ‐ This field allows you to specify the Amazon S3 URL (s3MonitoringConfiguration) where you want the EMR Serverless job to store the logs of your Hive job. Make sure that you've created this bucket in the same AWS account that hosts your application, and in the same Region where your job is running.
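To illustrate the priority order above, suppose a job is submitted with both of the following arguments, setting hive.exec.reducers.max in two places. This is a hypothetical sketch:

--job-driver '{
    "hive": {
        "query": "s3://DOC-EXAMPLE-BUCKET/emr-serverless-hive/query/hive-query.ql",
        "parameters": "--hiveconf hive.exec.reducers.max=128"
    }
}' \
--configuration-overrides '{
    "applicationConfiguration": [{
        "classification": "hive-site",
        "properties": { "hive.exec.reducers.max": "64" }
    }]
}'

Because configuration supplied through Hive parameters takes precedence over application overrides, this job runs with hive.exec.reducers.max set to 128.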

Hive job properties

The following table lists the mandatory properties that you must configure when submitting a Hive job.

Setting | Description
hive.exec.scratchdir | The Amazon S3 location where temporary files are created during the Hive job execution.
hive.metastore.warehouse.dir | The Amazon S3 location of databases for managed tables in Hive.

The following table lists the optional Hive properties and their default values that you can override when submitting a Hive job.

Setting | Description | Default value
hive.driver.memory | The amount of memory to use per Hive driver process. This memory is shared equally between HiveCLI and the Tez Application Master, with 20% headroom. | 6G
hive.driver.cores | The number of cores to use for the Hive driver process. | 2
hive.driver.disk | The disk size for the Hive driver. | 21G
hive.tez.disk.size | The disk size for each task container. | 21G
hive.prewarm.enabled | Enables container prewarm for Tez. | FALSE
hive.prewarm.numcontainers | The number of containers to prewarm for Tez. | 10
hive.tez.container.size | The amount of memory to use per Tez task process. | 6144
hive.tez.cpu.vcores | The number of cores to use for each Tez task. | 1
hive.max-task-containers | The maximum number of concurrent containers. The configured mapper memory is also multiplied by this value to determine the available memory used by split computation and task preemption. | 100
hive.exec.reducers.max | The maximum number of reducers. | 256
hive.auto.convert.join.noconditionaltask.size | The size below which a join is directly converted to a mapjoin. | Optimal value is calculated based on Tez task memory
tez.runtime.io.sort.mb | The size of the soft buffer when output is sorted. | Optimal value is calculated based on Tez task memory
tez.runtime.unordered.output.buffer.size-mb | The size of the buffer to use if not writing directly to disk. | Optimal value is calculated based on Tez task memory
tez.am.task.max.failed.attempts | The maximum number of attempts that can fail for a particular task before the task is failed. This does not count manually terminated attempts. | 3
hive.exec.stagingdir | The name of the directory for storing temporary files that are created inside table locations and in the scratch directory location specified by the hive.exec.scratchdir property. | .hive-staging
hive.compute.query.using.stats | Enables Hive to answer certain queries using statistics stored in the metastore. For basic statistics, set hive.stats.autogather to true. For a more advanced collection of statistics, run analyze table queries. | TRUE
hive.vectorized.execution.enabled | Enables vectorized mode of query execution. | FALSE
hive.cbo.enable | Enables cost-based optimizations using the Calcite framework. | TRUE
hive.tez.auto.reducer.parallelism | Enables Tez's auto-reducer parallelism feature. Hive still estimates data sizes and sets parallelism estimates. Tez samples the source vertices' output sizes and adjusts the estimates at runtime as necessary. | FALSE
hive.stats.fetch.column.stats | Disables fetching of column statistics from the metastore. Fetching column statistics can be expensive when the number of columns is high. | FALSE
hive.vectorized.execution.reduce.enabled | Enables vectorized mode on the reduce-side of query execution. | TRUE
hive.exec.max.dynamic.partitions.pernode | The maximum number of dynamic partitions allowed to be created in each mapper and reducer node. | 100
hive.stats.fetch.partition.stats | Enables fetching of partition statistics from the metastore. When this flag is disabled, Hive makes calls to the file system to get file sizes and estimates the number of rows from the row schema. Fetching partition statistics can be expensive when the number of partitions is high. | TRUE
hive.exec.max.dynamic.partitions | The maximum number of dynamic partitions allowed to be created in total. | 1000
hive.auto.convert.join.noconditionaltask | Enables optimization in converting a common join into a mapjoin based on the input file size. | TRUE
hive.exec.dynamic.partition.mode | In strict mode, you must specify at least one static partition in case you accidentally overwrite all partitions. In non-strict mode, all partitions are allowed to be dynamic. | strict
hive.merge.tezfiles | Enables the merging of small files at the end of a Tez DAG. | FALSE
hive.strict.checks.cartesian.product | Enables strict Cartesian join checks, which disallow a Cartesian product (a cross join). | TRUE
hive.stats.autogather | Enables basic statistics to be gathered automatically during the INSERT OVERWRITE command. | TRUE
hive.exec.orc.split.strategy | Expects one of [BI, ETL, HYBRID]. This is not a user-level config. Use the BI strategy when you want to spend less time on split generation as opposed to query execution (split generation does not read or cache file footers). Use the ETL strategy when you want to spend more time on split generation (split generation reads and caches file footers). HYBRID chooses between these strategies based on heuristics. | HYBRID
hive.auto.convert.join | Enables auto-conversion of common joins into mapjoins, based on the input file size. | TRUE
hive.default.fileformat | The default file format for CREATE TABLE statements. You can explicitly override this by specifying STORED AS [FORMAT] in your CREATE TABLE command. | TEXTFILE
hive.exec.reducers.bytes.per.reducer | The size per reducer. The default is 256 MB. That is, if the input size is 1 GB, the job uses 4 reducers. | 256000000
hive.exec.dynamic.partition | Enables dynamic partitions in DML/DDL. | TRUE
hive.merge.size.per.task | The size of merged files at the end of the job. | 256000000
hive.merge.mapfiles | Enables small files to be merged at the end of a map-only job. | TRUE
hive.fetch.task.conversion | Expects one of [NONE, MINIMAL, MORE]. Some SELECT queries can be converted to a single FETCH task, minimizing latency. | MORE
hive.stats.gather.num.threads | The number of threads used by the partialscan and noscan analyze commands for partitioned tables. This is applicable only to file formats that implement StatsProvidingRecordReader (like ORC). | 10
hive.optimize.ppd | Enables predicate pushdown. | TRUE
hive.input.format | The default input format. Set to HiveInputFormat if you encounter problems with CombineHiveInputFormat. | org.apache.hadoop.hive.ql.io.CombineHiveInputFormat
hive.optimize.ppd.storage | Enables predicate pushdown to storage handlers. | TRUE
hive.groupby.position.alias | Enables using a column position alias in GROUP BY statements. | FALSE
hive.orderby.position.alias | Enables using a column position alias in ORDER BY statements. | TRUE
hive.mapred.reduce.tasks.speculative.execution | Enables speculative execution for reducers. | TRUE
hive.support.quoted.identifiers | Expects one of [NONE, COLUMN]. NONE implies that only alphanumeric and underscore characters are valid in identifiers. COLUMN implies that column names can contain any character. | COLUMN
hive.tez.min.partition.factor | Puts a lower limit on the number of reducers that Tez specifies when auto-reducer parallelism is enabled. | 0.25
hive.strict.checks.type.safety | Enables strict type safety checks and disables comparing bigint with both string and double. | TRUE
hive.root.logger | The Hive log4j logging level. | INFO, DRFA
tez.am.log.level | The root logging level passed to the Tez Application Master. | INFO
tez.task.log.level | The root logging level passed to the Tez tasks. | INFO
tez.grouping.max-size | The upper bound on the size (in bytes) of a grouped split, to avoid generating excessively large splits. | 1073741824
tez.grouping.min-size | The lower bound on the size (in bytes) of a grouped split, to avoid generating too many small splits. | 52428800
tez.shuffle-vertex-manager.min-src-fraction | The fraction of source tasks that must complete before tasks for the current vertex are scheduled (in case of a ScatterGather connection). | 0.25
tez.shuffle-vertex-manager.max-src-fraction | The fraction of source tasks that must have completed before all tasks on the current vertex can be scheduled (in case of a ScatterGather connection). The number of tasks ready for scheduling on the current vertex scales linearly between min-fraction and max-fraction. This defaults to the default value or tez.shuffle-vertex-manager.min-src-fraction, whichever is greater. | 0.75
tez.am.speculation.enabled | Enables speculative execution of slower tasks. This can help reduce job latency when some tasks are running slower due to bad or slow machines. | FALSE
tez.am.dag.cleanup.on.completion | Enables the cleanup of shuffle data upon DAG completion. | TRUE
tez.client.asynchronous-stop | Enables the pushing of ATS events before terminating the Hive driver. | FALSE
tez.am.sleep.time.before.exit.millis | The amount of time after which ATS events should be pushed upon an AM shutdown request. | 0
tez.yarn.ats.event.flush.timeout.millis | The maximum amount of time that the AM should wait for events to be flushed before shutting down. | 300000

Hive job examples

The following is an example of running a Hive query using the StartJobRun API.

aws emr-serverless start-job-run \
    --application-id <application_id> \
    --execution-role-arn <iam_role_arn> \
    --job-driver '{
        "hive": {
            "query": "s3://DOC-EXAMPLE-BUCKET/emr-serverless-hive/query/hive-query.ql",
            "parameters": "--hiveconf hive.root.logger=DEBUG,DRFA"
        }
    }' \
    --configuration-overrides '{
        "applicationConfiguration": [{
            "classification": "hive-site",
            "properties": {
                "hive.exec.scratchdir": "s3://DOC-EXAMPLE-BUCKET/emr-serverless-hive/hive/scratch",
                "hive.metastore.warehouse.dir": "s3://DOC-EXAMPLE-BUCKET/emr-serverless-hive/hive/warehouse",
                "hive.driver.cores": "2",
                "hive.driver.memory": "4g",
                "hive.tez.container.size": "4096",
                "hive.tez.cpu.vcores": "1"
            }
        }],
        "monitoringConfiguration": {
            "s3MonitoringConfiguration": {
                "logUri": "s3://DOC-EXAMPLE-BUCKET/emr-serverless-hive/logs/"
            }
        }
    }'

While the job is running, the logs for the Hive driver and Tez tasks continuously upload to the Amazon S3 log location you configured in monitoringConfiguration.

Once the job run has a state of SUCCEEDED, the output of your Hive query is available in the Amazon S3 location that you specified in the monitoringConfiguration field of configurationOverrides. For example, if your log location is s3://DOC-EXAMPLE-BUCKET/emr-serverless-hive/logs/, your Hive query's output is available in s3://DOC-EXAMPLE-BUCKET/emr-serverless-hive/logs/applications/<application-id>/jobs/<job-run-id>/HIVE_DRIVER/stdout.gz.
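For example, you can check the job run's state and then fetch the query output with the AWS CLI. This is a minimal sketch; the IDs are placeholders, and it assumes the log location configured in the example above:

# Check the job run's state
aws emr-serverless get-job-run \
    --application-id <application-id> \
    --job-run-id <job-run-id>

# Once the state is SUCCEEDED, download and decompress the driver's stdout
aws s3 cp s3://DOC-EXAMPLE-BUCKET/emr-serverless-hive/logs/applications/<application-id>/jobs/<job-run-id>/HIVE_DRIVER/stdout.gz - | gunzip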

The hive-query.ql file contains the query that Hive runs. The following is a sample query.

create database emrserverless;
use emrserverless;
create table test_table(id int);
insert into test_table values (1),(2),(2),(3),(3),(3);
select id, count(id) from test_table group by id order by id desc;
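Given the values inserted above, the query output written to HIVE_DRIVER/stdout.gz would look something like the following, with the id and its count on each line:

3	3
2	2
1	1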