A simple process to demonstrate efficient bulk loading into HBase using Spark. The method used does not rely on additional dependencies, and results in a well-partitioned HBase table with very high, or complete, data locality. This method should work with any version of Spark or HBase.
Loading data into HBase using Spark can be done in a variety of ways, including:
Configuring a Hadoop output format and then calling saveAsNewAPIHadoopDataset(...) on the RDD
Using the HBase spark module, which originated from Cloudera Spark on HBase and is going to be improved greatly in HBase 2.0. However, bulk loading is not supported until HBase 2.0, which is still some way away.

It is inefficient to bulk load by writing through the region servers, since it requires HBase to use resources such as the write-ahead log (WAL), the compaction and flush queues and server memory, and will trigger a lot of garbage collection, all of which can be bypassed as explained nicely in this Cloudera Blog. In this post I show a method that avoids this by pre-splitting data and using the standard HBase bulk loading tools. It is rather easy to implement, but there are a few gotchas that can trip you up along the way. In demonstrating this process I hope others may benefit, and I'll explain some checks to perform along the way to help in understanding.
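The general process involves estimating the final size of the data and the number of regions needed, creating a pre-split table, partitioning the data in Spark to match the regions, writing HFiles from Spark, and loading them with the standard HBase bulk load tool.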
An HBase cluster is made up of region servers, each serving partitions of one or more tables. These partitions are known as regions and represent a subset of the total rows in a table. The regions themselves are stored on disk as HFiles. If one writes through the region server API, as data grows the HFile would eventually breach a size threshold and the region would split into two (known as daughter regions), resulting in two new HFiles, each in a new directory on HDFS. The HBase bulk load tools allow you to skip all of that by generating HFiles elsewhere, moving them into place on HDFS and then instructing HBase to serve them.
However, unless you size the regions correctly up front, as soon as new records come in, or when a major compaction runs, the threshold is checked and the HFile may be split immediately. If you have created a table without pre-splitting, you can easily create a table holding only 1 region with e.g. 1TB of data. As data is written, or major compaction runs, the region split is performed on a single machine and may take many hours. Note that this process will repeat on the daughter regions until they are below the threshold - it may take days.
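To get a feel for how often that splitting repeats, here is a rough sketch. It assumes, as a simplification, that every split halves a region exactly, and it takes 1TB as 1024GB; the object and method names are made up for illustration.

```scala
object SplitRounds {
  // Rounds of splitting until every region is at or below the threshold,
  // assuming (as a simplification) that each split halves a region exactly.
  def roundsUntilBelow(sizeGb: Double, thresholdGb: Double): Int = {
    var size = sizeGb
    var rounds = 0
    while (size > thresholdGb) {
      size /= 2
      rounds += 1
    }
    rounds
  }

  def main(args: Array[String]): Unit = {
    // A single 1024GB region with a 10GB split threshold
    val rounds = roundsUntilBelow(1024.0, 10.0)
    println(s"rounds of splitting: $rounds, resulting regions: ${1 << rounds}")
  }
}
```

Under those assumptions a single 1024GB region needs 7 rounds of splitting against a 10GB threshold, each round rewriting the data, which is why pre-splitting is worth the effort.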
To avoid this, we estimate the final size of the data that will be in HBase and create a pre-split table. You can read about sizing regions, but a good starting point is 5-6GB. The default region size is 10GB, and so aiming for approximately half this gives the table time to grow before splitting naturally in due course. To estimate the size of data, one typical approach is to take a sample of the input data, run the process once and then extrapolate, or you can simply take a guess and refine it later. To check the size of a table called map_data in HBase:
hdfs dfs -du -s -h /hbase/data/default/map_data
242.3 G 726.9 G /hbase/data/default/map_data
Here the total size is 242.3GB and HDFS has a replication factor of three (hence the 726.9GB of raw storage). If we aim for 6GB per region, then we need 41 regions if they are of equal size. Data is often skewed, so I'd recommend increasing this to, say, 45.
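The region count is simply the unreplicated size divided by the target region size, rounded up. A minimal sketch of that arithmetic follows; the helper name regionsNeeded is hypothetical.

```scala
object RegionEstimate {
  // Number of equally sized regions needed to hold totalGb of (unreplicated) data
  // when aiming for targetRegionGb per region.
  def regionsNeeded(totalGb: Double, targetRegionGb: Double): Int =
    math.ceil(totalGb / targetRegionGb).toInt

  def main(args: Array[String]): Unit = {
    // 242.3GB at 6GB per region: 242.3 / 6 is a little over 40, so round up
    println(regionsNeeded(242.3, 6.0))
  }
}
```

How much to add on top for skew is a judgment call; the figure of 45 used in this post is one such choice.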
An easy way to achieve balanced region sizes is to use key salting with a modulus-based approach. You take the hashCode of the key modulo the number of regions, take the absolute value, and prefix the result (the salt) to the original key, e.g.:
scala> val key="species:797:1:1:1"
key: String = species:797:1:1:1
scala> val salt = Math.abs(key.hashCode % 45)
salt: Int = 4
scala> val saltedKey = "%02d".format(salt) + ":" + key // leftpad with 0s
saltedKey: String = 04:species:797:1:1:1
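To check that the salt spreads keys evenly, you could salt a batch of keys and count how many land in each bucket. The sketch below uses made-up keys shaped like the example above; real keys may distribute differently, and the object and method names are illustrative only.

```scala
object SaltDistribution {
  // Same salting rule as in the REPL session above: abs(hashCode % buckets)
  def saltOf(key: String, buckets: Int): Int = Math.abs(key.hashCode % buckets)

  // Count how many keys fall into each salt bucket
  def bucketCounts(keys: Seq[String], buckets: Int): Map[Int, Int] =
    keys.groupBy(k => saltOf(k, buckets)).map { case (s, ks) => s -> ks.size }

  def main(args: Array[String]): Unit = {
    // Hypothetical keys, for illustration only
    val keys = for (id <- 1 to 10000) yield s"species:$id:1:1:1"
    val counts = bucketCounts(keys, 45)
    println(s"buckets used: ${counts.size}, min: ${counts.values.min}, max: ${counts.values.max}")
  }
}
```

A large gap between the smallest and largest bucket would suggest the keys hash poorly and that region sizes will be uneven.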
With this in place we can create the table:
create 'map_data',
  {NAME => 'EPSG_3857'},
  {SPLITS => [
    '01','02','03', ...etc... '44'
  ]}
In this example we use
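Typing out 44 split points by hand is error-prone, so they could be generated instead. A minimal sketch, assuming the same zero-padded salts as in the salting example; the helper name splitPoints is hypothetical.

```scala
object SplitPoints {
  // Split points for `regions` salted regions. There is one boundary per salt
  // except the first, because the first region starts at the empty key.
  def splitPoints(regions: Int): Seq[String] = {
    val width = (regions - 1).toString.length // characters needed for the largest salt
    (1 until regions).map(i => ("%0" + width + "d").format(i))
  }

  def main(args: Array[String]): Unit = {
    // Prints a comma-separated, single-quoted list to paste into the SPLITS clause
    println(splitPoints(45).map(s => s"'$s'").mkString(","))
  }
}
```

For 45 regions this yields 44 split points, from '01' through '44'.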
With a correctly split target table in place, we now use Spark to create the HFiles. Spark uses the notion of Resilient Distributed Datasets (RDD), whereby a dataset is partitioned across worker nodes in the cluster to be operated on in parallel. What we aim to do is partition the RDD so that each partition will match a region in HBase, and then have the worker create an HFile for that partition. Since we have pre-split our table with a modulus-based salted key approach, we can partition the RDD using the same approach. The following (Scala) illustrates how to write a simple custom partitioner to achieve this.
import org.apache.spark.Partitioner

// salt a key using a modulus based approach
def salt(key: String, modulus: Int) : String = {
  // take the modulus before abs: Math.abs(Int.MinValue) is still negative
  val saltAsInt = Math.abs(key.hashCode % modulus)
  // left pad with 0's (for readability of keys)
  val charsInSalt = digitsRequired(modulus)
  ("%0" + charsInSalt + "d").format(saltAsInt) + ":" + key
}

// number of characters required to encode the modulus in chars (01,02.. etc)
def digitsRequired(modulus: Int) : Int = {
  (modulus - 1).toString.length // the largest salt is modulus - 1
}

// A partitioner that puts data destined for the same HBase region together
class SaltPrefixPartitioner[K,V](modulus: Int) extends Partitioner {
  val charsInSalt = digitsRequired(modulus)
  def getPartition(key: Any): Int = {
    // the salt prefix of the key is the partition number
    key.toString.substring(0, charsInSalt).toInt
  }
  override def numPartitions: Int = modulus
}
Please note that while we take a simple salting-based approach here, a more generic approach would be to read the region boundaries from HBase and use those in the Spark partitioner. Such an approach would work with any table split strategy. This is the approach taken in the HBase v2.0 spark module.
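The generic approach could look roughly like the following, which picks a partition by locating a row key among the sorted region start keys. This is only a sketch: the start keys are passed in as plain strings (in practice they would be read from HBase), string comparison stands in for HBase's byte-wise ordering (equivalent for ASCII keys), and it is not the implementation used by the HBase spark module.

```scala
object RegionLookup {
  // Index of the region whose range contains `key`, given the sorted region start keys.
  // The first region's start key is the empty string, so every key has a home.
  def regionIndex(sortedStartKeys: IndexedSeq[String], key: String): Int = {
    var lo = 0
    var hi = sortedStartKeys.length - 1
    // binary search for the last start key that is <= key
    while (lo < hi) {
      val mid = (lo + hi + 1) / 2
      if (sortedStartKeys(mid).compareTo(key) <= 0) lo = mid else hi = mid - 1
    }
    lo
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical start keys for a table split into four regions
    val startKeys = Vector("", "01", "02", "03")
    println(regionIndex(startKeys, "01:species:797:1:1:1"))
  }
}
```

A Spark partitioner built this way would return regionIndex from getPartition and the number of start keys from numPartitions.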
Next we partition the RDD of key value pairs:
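import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes

// salt the keys
val modulus = 45 // number of regions in the pre-split table
val saltedRDD = sourceRDD.map(r => {
  (salt(r._1, modulus), r._2)
})

// repartition and sort the data - HFiles want sorted data
val partitionedRDD = saltedRDD.repartitionAndSortWithinPartitions(new SaltPrefixPartitioner(modulus))

// cells of data for HBase
val cells = partitionedRDD.map(r => {
  val saltedRowKey = Bytes.toBytes(r._1)
  val cellData = r._2 // assumed to be an Array[Byte] here
  // create a cell of data for HBase
  val cell = new KeyValue(
    saltedRowKey,
    Bytes.toBytes("EPSG_3857"), // column family
    Bytes.toBytes("tile"), // column qualifier (i.e. cell name)
    cellData)
  (new ImmutableBytesWritable(saltedRowKey), cell)
})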
Then write the HFiles:
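import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.mapreduce.Job

// setup the HBase configuration
val baseConf = HBaseConfiguration.create()
baseConf.set("hbase.zookeeper.quorum", "<your ZK cluster here>")
// NOTE: job creates a copy of the conf
val job = Job.getInstance(baseConf, "map data")
val table = new HTable(baseConf, "map_data")
// Major gotcha(!) - see comments that follow
PatchedHFileOutputFormat2.configureIncrementalLoad(job, table)
val conf = job.getConfiguration // important(!)
// write HFiles onto HDFS
cells.saveAsNewAPIHadoopFile(
  "/tmp/map_data",
  classOf[ImmutableBytesWritable],
  classOf[KeyValue],
  classOf[PatchedHFileOutputFormat2],
  conf)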
The code above is simply setting up the Hadoop configuration, the serialization formats, the target directory and instructing Spark to save the HFiles. However, there are two things that can trip you up.
Using HFileOutputFormat2, or the PatchedHFileOutputFormat2 seen here, to set up the configuration is advisable since it reads the HBase table metadata and will configure the compression and block encoding for us automatically. However, HFileOutputFormat takes a job in its interface, not a configuration object, and the job copies the configuration internally. You must therefore get the configuration back from the job afterwards, or else you will not have compression etc. enabled. It is easy to overlook this.

Once the HFiles are written, I'd recommend a couple of basic checks before loading them in:
Inspect the HDFS directory (hdfs dfs -du -h /tmp/map_data) to confirm you have the correct number of files, and the expected size of HFiles.
Inspect a few HFiles using the HBase tool. We're looking to see the compression we set, and for each HFile to have keys with only one of the prefix salts (02 in this example), e.g.:
# print sample of keys and the metadata
hbase hfile -e -m -f /tmp/map_data/EPSG_3857/...
K: 02:species:797:1:1:0:/EPSG_3857:tile/1476880136948/Put/vlen=48/seqid=0
K: 02:species:797:1:1:1:/EPSG_3857:tile/1476880136948/Put/vlen=48/seqid=0
We now have nicely partitioned HFiles, ready to load into our table:
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles -Dcreate.table=no /tmp/map_data map_data
Here we are using the HBase command line tool allowing us to review the HFiles before loading, but this call could be added into the Spark code to automate it.
During loading, should you see any log lines saying "HFile no longer fits into a single partition splitting..." it indicates that we got the sizing of the regions wrong, which will result in poor loading performance. It may also appear if the table was being written to and a region split occurred while the data was being staged for loading. If that were to happen, the load class rearranges the staged file into the matching two daughter ranges, creating two new HFiles in the process. This rearranging happens on the machine executing the load and slows the process.
Using the HBase master user interface, the locality of the regions should all indicate 1.0 or very close to that.
Tim Robertson is located in Copenhagen, Denmark and has worked with the Hadoop ecosystem since 2008. He currently leads the informatics activities of the Global Biodiversity Information Facility.