The Apache Hive data warehouse software facilitates querying and managing large datasets residing in distributed storage. Hive provides a mechanism to project structure onto this data and query the data using a SQL-like language called HiveQL.
We used Apache Hive in a recent project, and I was assigned to tune our application, so I started digging into the Apache Hive documentation to find the best practices recommended by Hive experts.
In this post I outline my findings. The material is fairly advanced and technical, so prior experience with Apache Hive will help you get the most out of it.
So without further ado, let's jump directly into the patterns, anti-patterns and best practices.
Partitions Over Small Tables:
Sometimes, when designing a Hive schema to match a use case and improve query performance, multiple tables are created for multiple categories. For example, a new table is created for the employees working in each state, so there is a table Employee_NY for NY-based employees, and so on.
Hive's partitioning concept is far more efficient than the one-table-per-category design described above. With partitioning, only one table is created, and that table is partitioned on State.
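As a sketch, the one-table-per-state design above collapses into a single partitioned table (the table and column names are illustrative):

```sql
-- One partitioned table replaces Employee_NY, Employee_NJ, ...
CREATE TABLE employee (
  employee_id INT,
  firstName   STRING,
  lastName    STRING
)
PARTITIONED BY (state STRING);

-- Each partition lives in its own directory, so this query reads
-- only the state=NY directory instead of scanning the whole table.
SELECT * FROM employee WHERE state = 'NY';
```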
Along with static partitioning, Hive supports dynamic partitions as well. Dynamic partitioning is disabled by default and must be enabled explicitly. The following are some properties to consider when partitioning is desired.
- hive.exec.dynamic.partition=true/false enables or disables dynamic partitioning; the default is false.
- hive.exec.dynamic.partition.mode=strict/nonstrict (default strict). In strict mode, Hive ensures that the insert specifies at least one static partition.
- hive.exec.max.dynamic.partitions.pernode (default 100) is the maximum number of dynamic partitions that can be created by each mapper or reducer node.
- hive.exec.max.dynamic.partitions (default 1000) is the total number of dynamic partitions that can be created by one DML statement.
- hive.exec.max.created.files (default value being 100000) is the maximum total number of files created by all mappers and reducers.
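A minimal dynamic-partition load might look like the following sketch (the employee_staging source table is hypothetical):

```sql
SET hive.exec.dynamic.partition = true;
SET hive.exec.dynamic.partition.mode = nonstrict;

-- Hive routes each row into the right state= partition based on the
-- value of the last SELECT column, creating partitions on the fly.
INSERT OVERWRITE TABLE employee PARTITION (state)
SELECT employee_id, firstName, lastName, state
FROM employee_staging;
```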
Partitioning is a useful feature in Hive, but an over-partitioned design can be detrimental to queries. Hadoop is designed to perform better with millions of large files than with billions of small files, and with over-partitioning there will be many more small files than large ones, which hurts performance. Secondly, Hadoop creates a separate task to process each file, so the more small files there are, the more tasks are created, which in turn consumes more resources and degrades query performance.
An ideal partitioning scheme should not result in too many partitions and directories, and the files in each directory should be large, some multiple of the filesystem block size.
Multi Passes Over Same Data:
Hive provides the ability to scan the source data once while producing multiple aggregations over it. For example, the following multi-insert query scans the employee table once but processes the data two times.
FROM employee
INSERT OVERWRITE TABLE employee_NY SELECT * WHERE state = 'NY'
INSERT OVERWRITE TABLE employee_NJ SELECT * WHERE state = 'NJ';
Partition For All Tables:
This is an interesting pattern: it is recommended to create partitions even in intermediate (temporary) tables. ETL processes often create intermediate tables, so keeping the partitioning scheme consistent across the whole pipeline is recommended.
Partitioning a table is a great way to segregate data and optimize queries, but as mentioned above, over-partitioning can degrade query performance. For such situations, Hive provides another technique called "bucketing" to decompose a dataset into manageable parts.
Bucketing uses a column to bucket on. For example, if bucketing is configured on the employee_id column of the employee table, Hadoop takes the hash code of employee_id to find the correct bucket and stores the employee record there. The number of buckets is fixed at table-creation time.
CREATE TABLE employee (employee_id INT, firstName STRING, lastName STRING) PARTITIONED BY (state STRING) CLUSTERED BY (employee_id) INTO 96 BUCKETS;
Bucketing is enforced by the following flag:
- hive.enforce.bucketing=true/false; setting it to true is a convenient way to ensure that inserts actually honor the bucketing declared on the table.
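Populating the bucketed table above might look like this sketch (employee_staging is a hypothetical source table):

```sql
SET hive.enforce.bucketing = true;

-- With enforcement on, Hive sets the number of reducers to match
-- the bucket count, so each reducer writes exactly one bucket file.
INSERT OVERWRITE TABLE employee PARTITION (state = 'NY')
SELECT employee_id, firstName, lastName
FROM employee_staging
WHERE state = 'NY';
```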
Adding Columns To Table:
If new columns need to be added, they can be appended to the schema, which lets developers load updated data into Hive without much trouble. For existing rows, the value of a newly added column will be NULL. New columns are always added at the end of the table.
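A quick sketch of adding a column to the example table (the column name is illustrative):

```sql
-- The new column is appended after the existing ones; rows loaded
-- before the change return NULL for it.
ALTER TABLE employee ADD COLUMNS (department STRING);
```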
Compress The Data:
Compression reduces the size of the data and hence reduces the I/O overhead. Compressing and decompressing data is CPU-intensive, but spare CPU is rarely an issue in the Hadoop ecosystem. Despite the CPU overhead, compression wins in most scenarios, so it should almost always be used.
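As an illustration, intermediate and final output compression can be switched on per session (the codec choice here is an assumption; any codec installed on the cluster works):

```sql
-- Compress data passed between MapReduce stages.
SET hive.exec.compress.intermediate = true;
-- Compress the final query output.
SET hive.exec.compress.output = true;
SET mapred.output.compression.codec = org.apache.hadoop.io.compress.SnappyCodec;
```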
Look Under The Skin:
Sometimes we need to look a little more closely at how Hive executes a query. HiveQL is a declarative language, and Hive translates queries into MapReduce jobs. EXPLAIN [query] or EXPLAIN EXTENDED [query] can be used to inspect the execution plan of a query.
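For example, prefixing a query with EXPLAIN prints its plan instead of running it:

```sql
-- Shows the stage dependency graph and a description of each stage,
-- without actually executing the query.
EXPLAIN SELECT state, COUNT(*) FROM employee GROUP BY state;
```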
In Hive, map-side joins improve query performance significantly, so they should be preferred over reduce-side joins. Two ways to optimize joins are:
- Know which table is largest and put it last in the JOIN clause, since Hive streams the last table and buffers the others.
- Use the /*+ STREAMTABLE(table_name) */ hint to tell Hive explicitly which table to stream.
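A sketch of the hint in use (departments and the dept_id column are hypothetical):

```sql
-- Stream the large employee table and buffer the smaller
-- departments table, regardless of the join order in the query.
SELECT /*+ STREAMTABLE(e) */ e.firstName, d.dept_name
FROM departments d
JOIN employee e ON d.dept_id = e.dept_id;
```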
As mentioned, Hive translates a query into MapReduce jobs, and each job is made up of different stages. By default these stages run sequentially. Performance can be improved by enabling parallel execution of the independent stages.
- hive.exec.parallel=true/false (default false)
Keep in mind that cluster resources are utilized more heavily during parallel execution.
Hive can execute queries in strict mode by enabling the following property:
- hive.mapred.mode=strict/nonstrict (default nonstrict)
Enabling strict mode restricts the following types of queries, which improves resource usage in the cluster:
- Queries on partitioned tables are not permitted unless they include a partition filter in the WHERE clause, limiting their scope.
- Queries with an ORDER BY clause but no LIMIT clause are restricted.
- Queries that would produce a Cartesian product are prevented. A JOIN without an ON clause creates a Cartesian product, and Hive does not optimize a WHERE clause into a JOIN condition.
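To illustrate, under strict mode the first query below is rejected while the second is accepted (assuming the employee table is partitioned on state):

```sql
SET hive.mapred.mode = strict;

-- Rejected: no partition filter, would scan every partition.
-- SELECT * FROM employee;

-- Accepted: the partition filter limits the scan to one directory.
SELECT * FROM employee WHERE state = 'NY' LIMIT 100;
```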
Number Of Reducers:
The following properties are used to tune the number of reducers for Hive jobs.
- hive.exec.reducers.bytes.per.reducer sets the amount of input data per reducer. The default of 1 GB is optimal for most cases, but it can be tuned to one's needs.
- hive.exec.reducers.max caps the number of reducers a Hive job can use. This property is useful on shared clusters, where a single job could otherwise exhaust all the resources while other jobs wait; the cap prevents that situation.
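A session-level sketch of both knobs (the values are illustrative, not recommendations):

```sql
-- Aim for roughly 512 MB of input per reducer...
SET hive.exec.reducers.bytes.per.reducer = 536870912;
-- ...but never more than 32 reducers for this job.
SET hive.exec.reducers.max = 32;

SELECT state, COUNT(*) FROM employee GROUP BY state;
```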
Single MapReduce For Multiple Group By:
- hive.multigroupby.singlemr=true/false controls whether a multi-group-by query is optimized into a single MapReduce job plan. If the multi-group-by query shares common group-by keys, it is optimized to generate a single MapReduce job.
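As a sketch, the multi-insert below groups the same scan by the same key, so with the flag enabled it can collapse into one MapReduce job (the target tables are illustrative):

```sql
SET hive.multigroupby.singlemr = true;

FROM employee
INSERT OVERWRITE TABLE state_counts SELECT state, COUNT(*) GROUP BY state
INSERT OVERWRITE TABLE state_maxids SELECT state, MAX(employee_id) GROUP BY state;
```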
Hive provides two virtual columns: one for the input filename for split and the other for the block offset in the file. These are helpful when diagnosing queries where Hive is producing unexpected or null results. By projecting these “columns,” you can see which file and row is causing problems.
These columns are INPUT__FILE__NAME and BLOCK__OFFSET__INSIDE__FILE.
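Projecting the virtual columns is as simple as the following sketch, here hunting for rows with an unexpected NULL:

```sql
-- Pinpoints the HDFS file and byte offset of each suspect row.
SELECT INPUT__FILE__NAME, BLOCK__OFFSET__INSIDE__FILE, employee_id
FROM employee
WHERE firstName IS NULL;
```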