Hive Tutorial 34 : Update Feature example
The update feature is complex as it involves modifying of existing data instead of simple append. Like insert operation, the first step in update is to load the data into an un-partitioned staging table from local data source.
a. Insert into Un-Partitioned table:
Inserting into un-partitioned table is pretty straightforward. The following is a code to insert into un-partitioned table in which you take the input data file path and load it into the un-partitioned table. The records in the file will get appended to the already existing records in the table.
insert into un-partitioned table(tableName, inputDataPath)
{
run the query : "load data inpath '" + inputDataPath + "/"
+ "' into table " + tableName;
clie nt.e xecu te(i nser tQue ry);
}
{
run the query : "load data inpath '" + inputDataPath + "/"
+ "' into table " + tableName;
clie
}
The query will look like:
load data inpath ‘/ho me/h adoo p/in putD ata/ inpu t.tx t’ into table EmployeeData
load data inpath ‘/ho
Once the query is executed, the three records present in the input.txt file get appended to the EmployeeData table.
b. Insert into Partitioned table:
Unlike un-partitioned table insertion, inserting into partitioned table involves multiple steps. The below given information is used while inserting the data in a partitioned main table:
TableFields will hold:
fieldname: empId, fieldType: int
fieldname: empName, fieldType: string
fieldname: empCity, fieldType: string
TablePartitions will hold:
partitionName: cty, partitionType: string, part itio nExp ress ion: city
fieldname: empName, fieldType: string
fieldname: empCity, fieldType: string
TablePartitions will hold:
partitionName: cty, partitionType: string, part
Below is the algorithm to insert the data in partitioned main table:
Algorithm-1: Insert into Partitioned Table
Step-1: Load the data in un-partitioned staging table(call as stagingTable1)
Input: staging table name, path of the input data file
The query will look like:
load data inpath ‘/ho me/h adoo p/in putD ata/ inpu t.tx t’ overwrite into table
stagingTable1
Step-1: Load the data in un-partitioned staging table(call as stagingTable1)
Input: staging table name, path of the input data file
The query will look like:
load data inpath ‘/ho
stagingTable1
Step-2: Insert into partitioned staging table(call as stagingTable2)
Input: stagingTable1, stagingTable2, table fields, table partitions
insert overwrite table stagingTable2 partition
For each column in "List<TableField> tableFields"
- Add field name
select
For each partition in "Lis t<Ta bleP arti tion > tablePartitions"
- Add partition name
from stagingTable1
The query looks like:
- insert overwrite table stagingTable2 partition(cty) select empId, empName, empCity from stagingTable1;
Step-3: Load data in partitioned main table(call as mainTable)
For each partitioned folder present in the partitioned staging table
- iterate to the leaf node level partition
- get the sub directory list of the given partition using hdfs api
For every leaf node partition call the load query
Input: stagingTable1, stagingTable2, table fields, table partitions
insert overwrite table stagingTable2 partition
For each column in "List<TableField> tableFields"
- Add field name
select
For each partition in "Lis
- Add partition name
from stagingTable1
The query looks like:
- insert overwrite table stagingTable2 partition(cty) select empId, empName, empCity from stagingTable1;
Step-3: Load data in partitioned main table(call as mainTable)
For each partitioned folder present in the partitioned staging table
- iterate to the leaf node level partition
- get the sub directory list of the given partition using hdfs api
For every leaf node partition call the load query
Input: stagingTable2, mainTable, table partitions, folder names list
load data inpath stagingTable2
For each leaf node folder name present in the stagingTable2
- append the folder name to the query string
into table mainTable partition
For each partition in "Lis t<Ta bleP arti tion > tablePartitions"
- Add partition name
The query looks like:
- load data inpath ‘/us er/h ive/ ware hous e/tm pTab le/c ty=O rlan do’ into table mainTable part itio n(ct y=’O rlan do’) ;
load data inpath stagingTable2
For each leaf node folder name present in the stagingTable2
- append the folder name to the query string
into table mainTable partition
For each partition in "Lis
- Add partition name
The query looks like:
- load data inpath ‘/us
Just like insert, update can happen either on un-partitioned or partitioned table.
a. Update into Un-Partitioned table:
For update, we use another staging table where the latest records are merged with existing table records using left outer join. The following is the algorithm for the update operation on un-partitioned table.
Algorithm-2: Update into un-partitioned table
Step-1: Run the merge join query
Input : mainTable, staging table name to hold merged records(call as
stagingTable3), un-partitioned staging table name(call as stagingTable2),
table primary key, table fields
Build the merge join query:
insert overwrite table stagingTable3
select each column in "List<TableField> tableFields"
- Add field name with the alias A
from mainTable with alias A
Apply the left outer join with stagingTable2 with alias B
Check for where A.primaryKey = B.primaryKey and where B.primaryKey is null
Then union with the data selected from stagingTable2
Step-1: Run the merge join query
Input : mainTable, staging table name to hold merged records(call as
stagingTable3), un-partitioned staging table name(call as stagingTable2),
table primary key, table fields
Build the merge join query:
insert overwrite table stagingTable3
select each column in "List<TableField> tableFields"
- Add field name with the alias A
from mainTable with alias A
Apply the left outer join with stagingTable2 with alias B
Check for where A.primaryKey = B.primaryKey and where B.primaryKey is null
Then union with the data selected from stagingTable2
Step-2: Load the data by overwriting from stagingTable3 to mainTable by
using the below given load query:
load data inpath stagingTable3 overwrite into mainTable
using the below given load query:
load data inpath stagingTable3 overwrite into mainTable
b. Update into Partitioned table:
Update operation in partitioned table works bit different than un-partitioned table. In order to perform update, as first step, latest data that is present in un-partitioned staging (stagingTable1) table is put into a temporary partitioned table (stagingTable2). This step uses the ‘insert into partitioned table’ logic, mentioned earlier, to insert latest data into a temporary partitioned table.
The update operation may contain data / partitions that need to be updated or may contain new data / partition that is not present in the destination table (mainTable).
The approach is to create two maps one for update and other for insert. If the partition is new, put that into insert map otherwise put it into update map. A partition can be identified as new or existing one by comparing it with existing partitions of destination table. This comparison and classification of partitions has to be done till the leaf level to accommodate multi-level partition.
A list of partitions for a table can be obtained by getting the sub directory list recursively of table location.
After classification, insert map will contain a list of partitions that are new and are not present in the existing destination table.
Insertion of new partitions is same as insertion approach which we have used earlier. Iterate insert map and for each entry which is nothing but a new partition, insert it into destination table (mainTable) using earlier insert logic.
Like un-partitioned update, here also we use a merge table (stagingTable3) that will be used as temporary table for merging latest records with existing records and use left outer join approach to achieve update operation.
The update operation may contain data / partitions that need to be updated or may contain new data / partition that is not present in the destination table (mainTable).
The approach is to create two maps one for update and other for insert. If the partition is new, put that into insert map otherwise put it into update map. A partition can be identified as new or existing one by comparing it with existing partitions of destination table. This comparison and classification of partitions has to be done till the leaf level to accommodate multi-level partition.
A list of partitions for a table can be obtained by getting the sub directory list recursively of table location.
After classification, insert map will contain a list of partitions that are new and are not present in the existing destination table.
Insertion of new partitions is same as insertion approach which we have used earlier. Iterate insert map and for each entry which is nothing but a new partition, insert it into destination table (mainTable) using earlier insert logic.
Like un-partitioned update, here also we use a merge table (stagingTable3) that will be used as temporary table for merging latest records with existing records and use left outer join approach to achieve update operation.
The following is algorithm for update operation in partitioned table:
Algorithm-3: Update into partitioned table
Step-1: Insert overwrite into stagingTable3 PARTITION
- for each partition in partitionList , append partition name = partition_name
select e.* from
- select table fields from mainTable "A" left outer join with stagingTable2 "B" on
o A.primaryKey =B.primaryKey AND
o for each partition in partitionList, append B.partition name = partition_name and for each partition in partitionList, append A.partition name = partition_name
- where B.primaryKey IS NULL and
- for each partition in partitionList, append A.partition name =
partition_name
Step-1: Insert overwrite into stagingTable3 PARTITION
- for each partition in partitionList , append partition name = partition_name
select e.* from
- select table fields from mainTable "A" left outer join with stagingTable2 "B" on
o A.primaryKey =B.primaryKey AND
o for each partition in partitionList, append B.partition name = partition_name and for each partition in partitionList, append A.partition name = partition_name
- where B.primaryKey IS NULL and
- for each partition in partitionList, append A.partition name =
partition_name
- UNION ALL
- select table fields from stagingTable2 "C" where
- for each partition in partitionList, append C.partition name = partition_name
- select table fields from stagingTable2 "C" where
- for each partition in partitionList, append C.partition name = partition_name
Once the merged table has updated data of both temp table and destination table, overwrite destination table with the merged table records.
Insert and update operations on partitioned tables can be parallelized by spawning a thread for each partition. Based on our test results, it is evident that multi-thread partition loading performs better than sequential loading.
Insert and update operations on partitioned tables can be parallelized by spawning a thread for each partition. Based on our test results, it is evident that multi-thread partition loading performs better than sequential loading.
Comments
Post a Comment