Wednesday, October 4, 2017

Hive Optimization Techniques - Map Side Join And Bucket Join

                                   Map Side Join

This is one of the Hive features used to speed up Hive queries.
A map side join can be performed when one of the joining tables is small enough to fit into memory.
It saves the time taken by unnecessary shuffling of data to the reducers by performing the join in the mappers.
It is not the default behavior of the Hive engine.
It is enabled by setting the following property.
hive.auto.convert.join = true;

Once the above property is set to true, if during a join the smaller table's size is less than 25 MB (hive.mapjoin.smalltable.filesize), the join is converted to a map join.
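
For illustration, a minimal sketch assuming a large orders table and a small order_types lookup table (both table names are hypothetical):

set hive.auto.convert.join=true;
set hive.mapjoin.smalltable.filesize=25000000;   -- 25 MB threshold (the default)

-- order_types is small enough to be loaded into memory on every mapper,
-- so the join runs map side and no shuffle to the reducers is needed.
select o.order_id, t.type_name
from orders o
join order_types t
  on o.order_type_id = t.type_id;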


                                           Bucket Map Join

A bucket map join can be performed when both tables are bucketed on the common column on which the join is being performed.
For example,
table1(id1, name1) clustered by (id1) into 2 buckets
table2(id2, name2) clustered by (id2) into 2 buckets
table1 join table2 on id1 = id2

All records with the same id fall into the same bucket; you will not find records with the same id in different buckets.

The bucket map join is performed on the mapper side by sending the buckets containing records with the same id to a single mapper (a bucket of table1 joined with the matching bucket of table2), rather than sending the complete table2 to every mapper for the join.

This is not the default behavior; it can be requested either with a query hint or by setting a property.

select /*+ MAPJOIN(table2) */ table1.* from table1 join table2 on table1.id1 = table2.id2;
                                                          or
set hive.optimize.bucketmapjoin = true;

If both tables are also sorted on the id column on which they are bucketed and joined, a sort-merge bucket join can be performed by setting the following properties.

hive.input.format = org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
hive.optimize.bucketmapjoin = true;
hive.optimize.bucketmapjoin.sortedmerge = true;
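
Putting it together, a minimal end-to-end sketch based on the table1/table2 example above (table and column names are illustrative):

-- both tables bucketed and sorted on the join key
-- (on older Hive, also set hive.enforce.bucketing=true; before inserting data
--  so rows land in the correct buckets)
create table table1 (id1 int, name1 string)
clustered by (id1) sorted by (id1) into 2 buckets;

create table table2 (id2 int, name2 string)
clustered by (id2) sorted by (id2) into 2 buckets;

-- bucket map join
set hive.optimize.bucketmapjoin=true;

-- sort-merge bucket map join (requires the sorted buckets above)
set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
set hive.optimize.bucketmapjoin.sortedmerge=true;

select /*+ MAPJOIN(table2) */ table1.*
from table1
join table2
  on table1.id1 = table2.id2;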






Hive Table Creation - Avro, Parquet

                       Import table orders to HDFS as an Avro data file with deflate compression
sqoop import \
--connect jdbc:mysql://quickstart.cloudera:3306/retail_db \
--username root \
--password cloudera \
--table orders \
--target-dir practice/sqoop/orders_avro_deflate \
--as-avrodatafile \
--m 1 \
--compress \
--compression-codec org.apache.hadoop.io.compress.DeflateCodec

Create an external table over the Avro files in Hive

create external table avro_deflate
stored as AVRO
location 'hdfs://quickstart.cloudera:8020/user/cloudera/practice/sqoop/orders_avro_deflate'
TBLPROPERTIES ('avro.schema.url'='hdfs://quickstart.cloudera:8020/user/cloudera/practice/sqoop/orders_avro_deflate_avsc/orders.avsc');

Note: keep the .avsc schema file and the .avro data files in different folders,
and give the absolute path, e.g. hdfs://quickstart.cloudera:8020/path
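
If the .avsc file is not already on HDFS, one way to produce it is to extract the schema from an imported data file with avro-tools and upload it to a separate folder. A minimal sketch, assuming avro-tools is on the PATH and the single part file is named part-m-00000.avro:

# pull one data file locally and dump its embedded schema
hdfs dfs -get practice/sqoop/orders_avro_deflate/part-m-00000.avro .
avro-tools getschema part-m-00000.avro > orders.avsc

# upload the schema to its own folder, separate from the data files
hdfs dfs -mkdir -p practice/sqoop/orders_avro_deflate_avsc
hdfs dfs -put orders.avsc practice/sqoop/orders_avro_deflate_avsc/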

                       Import table order_items to HDFS as Parquet with snappy compression

Note: use a fully qualified path and also specify the column names.

sqoop import \
--connect jdbc:mysql://quickstart.cloudera:3306/retail_db \
--username root \
--password cloudera \
--table order_items \
--target-dir practice/sqoop/oi_parquet_snappy \
--as-parquetfile \
--m 1 \
--compress \
--compression-codec org.apache.hadoop.io.compress.SnappyCodec

                       Create an external Parquet table

create external table parquet_snappy(
  order_item_id int,
  order_item_order_id int,
  order_item_product_id int,
  order_item_quantity int,
  order_item_subtotal float,
  order_item_product_price float)
STORED AS PARQUET
location 'hdfs://quickstart.cloudera:8020/user/cloudera/practice/sqoop/oi_parquet_snappy';
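
A quick sanity check on the table just created (standard Hive commands):

describe formatted parquet_snappy;
select count(*) from parquet_snappy;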

Sqoop Import Export

=> Import data from RDBMS to HDFS
sqoop import \
--connect "jdbc:mysql://quickstart.cloudera:3306/database_name" \
--username "root" \
--password "cloudera" \
--table customers \
--target-dir "/user/cloudera/output"

--m 1    (or --num-mappers 1)

=> Import data from RDBMS to Hive

sqoop import \
--connect "jdbc:mysql://quickstart.cloudera:3306/database_name" \
--username "root" \
--password "cloudera" \
--table customers \
--hive-import \
--hive-database sqoop_import_hive_db \
--hive-overwrite

Additional options that can be appended to the import command:

Compress the output with Snappy:
  --compress \
  --compression-codec org.apache.hadoop.io.compress.SnappyCodec \

Replace NULL values on import:
  --null-string "NULL_REPLACE" \
  --null-non-string "NON_STRING_NULL_REPLACE"

Incremental import (only rows whose check column value is greater than the last value):
  --incremental append \
  --check-column id \
  --last-value 6

Append to an existing target directory:
  --append \
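
Put together, a compressed, incremental import might look like the sketch below (the target directory, check column, last value, and null placeholders are illustrative):

sqoop import \
 --connect jdbc:mysql://quickstart.cloudera:3306/retail_db \
 --username root \
 --password cloudera \
 --table customers \
 --target-dir "/user/cloudera/sqoop_import_null" \
 --compress \
 --compression-codec org.apache.hadoop.io.compress.SnappyCodec \
 --null-string "NULL_REPLACE" \
 --null-non-string "NON_STRING_NULL_REPLACE" \
 --incremental append \
 --check-column customer_id \
 --last-value 6 \
 -m 1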

=> Export data from HDFS to RDBMS

sqoop export \
 --connect jdbc:mysql://quickstart.cloudera:3306/retail_db \
 --username retail_dba \
 --password cloudera \
 --table temp_null \
 --export-dir "/user/cloudera/sqoop_import_null" \
 --input-fields-terminated-by ',' \
 --input-lines-terminated-by '\n' \
 --input-null-string "NULL_REPLACE" \
 --input-null-non-string "NON_STRING_NULL_REPLACE"


=> Export data from Hive to RDBMS

sqoop export \
  --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
  --username "retail_dba" \
  --password "cloudera" \
  --table hive_export \
  --export-dir "/user/hive/warehouse/sqoop_import.db/departments" \
  --m 1 \
  --input-null-string nvl \
  --input-null-non-string -1 \
  --input-fields-terminated-by '\001' \
  --input-lines-terminated-by '\n'

Sqoop job creation, merging data and optimizing Sqoop jobs.

=> Data merging using Sqoop

sqoop merge --merge-key department_id \
 --new-data "/user/cloudera/sqoop_merge" \
 --onto "/user/cloudera/sqoop_import/departments" \
 --target-dir "/user/cloudera/sqoop_import/sqoop_merge_target" \
 --class-name departments \
 --jar-file /tmp/sqoop-cloudera/compile/a9643b6f412b3e44e3a84e6f7ba0be1f/departments.jar
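
The class and jar passed to --class-name and --jar-file come from Sqoop's code-generation step. A minimal sketch for the departments table (the hash in the compile directory differs on every run):

sqoop codegen \
 --connect jdbc:mysql://quickstart.cloudera:3306/retail_db \
 --username retail_dba \
 --password cloudera \
 --table departments
# codegen prints where it wrote departments.java, departments.class and departments.jar,
# e.g. a directory under /tmp/sqoop-cloudera/compile/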

=> Create a Sqoop job

sqoop job --create sqoop_job \
  -- import \
  --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
  --username=retail_dba \
  --password=cloudera \
  --table orders \
  --target-dir "/user/cloudera/sqoop_job_dir" \
  --m 1

sqoop job --list

sqoop job --show sqoop_job

sqoop job --exec sqoop_job

=> Sqoop job optimization
- Number of mappers
Increasing the number of mappers speeds up the transfer, because the task is divided into parts and the import runs in parallel.
- Balanced load on the mappers
Split on a column whose values are uniformly distributed (an integer column is preferable); that gives a balanced load across all the mappers and a faster transfer (see the combined sketch after this list).
--split-by "customer_id"
- Number of connections to the RDBMS
The number of mappers cannot be increased blindly (to 100 or more, say); the RDBMS must allow that many concurrent connections, otherwise the RDBMS itself becomes the bottleneck.
- Use --direct mode
If the RDBMS supports it, import the data with --direct mode: instead of going through JDBC, the map tasks delegate to the database's native dump utility (e.g. mysqldump), which is usually faster.
1.      --direct is only supported for MySQL and PostgreSQL.
2.      Sqoop's direct mode does not support imports of BLOB, CLOB, or LONGVARBINARY columns.
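
Putting these knobs together, a tuned import might look like the sketch below (the mapper count and split column are illustrative, and --direct assumes a MySQL source):

sqoop import \
 --connect jdbc:mysql://quickstart.cloudera:3306/retail_db \
 --username retail_dba \
 --password cloudera \
 --table customers \
 --target-dir /user/cloudera/customers_tuned \
 --split-by "customer_id" \
 --num-mappers 8 \
 --direct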