Reading Data from HBase
Get and Scan are the two ways to read data from HBase, aside from manually parsing HFiles. A Get is simply a Scan limited by the API to one row. A Scan fetches zero or more rows of a table. By default, a Scan reads the entire table from start to end. You can limit your Scan results in several different ways, which affect the Scan's load in terms of IO, network, or both, as well as processing load on the client side.
Specify a startrow or stoprow or both. Neither startrow nor stoprow needs to exist in the table. Because HBase sorts rows lexicographically, the Scan begins at the first row that sorts at or after startrow, and stops before the first row that sorts at or after stoprow.
The goal is to reduce IO and network traffic.
● The startrow is inclusive and the stoprow is exclusive. Given a table with rows a, b, c, d, e, f, and startrow of c and stoprow of f, rows c-e are returned.
● If you omit startrow, the first row of the table is the startrow.
● If you omit the stoprow, all results after startrow (including startrow) are returned.
● If startrow is lexicographically after stoprow and you call Scan.setReversed(true), the rows are returned in reverse order. Given the same table with rows a-f, if you specify f as the startrow and c as the stoprow, rows f, e, and d are returned.
Example & Syntax
Scan()
Scan(byte[] startRow)
Scan(byte[] startRow, byte[] stopRow)
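The range rules above can be checked against a small in-memory model. This is a sketch in Python, not the HBase client API: `scan` and its parameters are hypothetical names, row keys are `bytes` compared byte by byte (the ordering HBase uses for row keys), and `reverse=True` stands in for calling `Scan.setReversed(true)`.

```python
from bisect import bisect_left, bisect_right
from typing import Optional


def scan(rows: list[bytes], start: Optional[bytes] = None,
         stop: Optional[bytes] = None, reverse: bool = False) -> list[bytes]:
    """Toy model of Scan row-range semantics (not the HBase client)."""
    keys = sorted(rows)  # bytes sort lexicographically, like HBase row keys
    if not reverse:
        # Forward scan: start is inclusive, stop is exclusive.
        lo = 0 if start is None else bisect_left(keys, start)
        hi = len(keys) if stop is None else bisect_left(keys, stop)
        return keys[lo:hi]
    # Reversed scan: start is the inclusive upper bound and stop is the
    # exclusive lower bound; rows are returned in descending key order.
    hi = len(keys) if start is None else bisect_right(keys, start)
    lo = 0 if stop is None else bisect_right(keys, stop)
    return keys[lo:hi][::-1]


table = [b"a", b"b", b"c", b"d", b"e", b"f"]
print(scan(table, b"c", b"f"))                # [b'c', b'd', b'e']
print(scan(table, b"f", b"c", reverse=True))  # [b'f', b'e', b'd']
```

A start key such as `b"bb"` that is not in the table still works: the scan simply begins at the next row that sorts after it (`b"c"`).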
Hadoop 2.4 introduced a new feature called hedged reads. If a read from a block is slow, the HDFS client starts up another parallel, 'hedged' read against a different block replica. The result of whichever read returns first is used, and the outstanding read is cancelled. This feature helps in situations where a read occasionally takes a long time rather than when there is a systemic problem. Hedged reads can be enabled for HBase when the HFiles are stored in HDFS.
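The hedging logic can be illustrated with Python's `concurrent.futures`. This is a simplified sketch of the idea, not the HDFS client code: `hedged_read` and `simulated_replica` are hypothetical names, each replica is modeled as a function that returns the block's bytes, and `threshold_s` plays the role of the HDFS setting `dfs.client.hedged.read.threshold.millis` (the size of the hedged-read thread pool is set separately with `dfs.client.hedged.read.threadpool.size`).

```python
import concurrent.futures as cf
import time
from typing import Callable, Sequence


def simulated_replica(delay_s: float, payload: bytes) -> Callable[[], bytes]:
    """Hypothetical stand-in for reading a block from one DataNode replica."""
    def read() -> bytes:
        time.sleep(delay_s)
        return payload
    return read


def hedged_read(replicas: Sequence[Callable[[], bytes]],
                threshold_s: float) -> bytes:
    """Read from the first replica; if no read has finished within
    threshold_s, start a hedged read against the next replica, and so on.
    The result of the first read to finish is returned."""
    pool = cf.ThreadPoolExecutor(max_workers=len(replicas))
    try:
        pending = {pool.submit(replicas[0])}
        for read in replicas[1:]:
            done, _ = cf.wait(pending, timeout=threshold_s,
                              return_when=cf.FIRST_COMPLETED)
            if done:
                break  # a read finished before the threshold; no hedge needed
            pending.add(pool.submit(read))  # hedge against another replica
        done, _ = cf.wait(pending, return_when=cf.FIRST_COMPLETED)
        return next(iter(done)).result()
    finally:
        # Cancel reads that have not started yet. Python threads that are
        # already running cannot be interrupted; their results are ignored.
        pool.shutdown(wait=False, cancel_futures=True)
```

For example, `hedged_read([simulated_replica(0.5, b"slow"), simulated_replica(0.0, b"fast")], threshold_s=0.05)` starts the slow read, waits 50 ms, hedges against the second replica, and returns `b"fast"`. The real HDFS client can abandon the slower read; this sketch can only stop waiting for it.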
When reading data from HBase using Get or Scan operations, you can use custom filters to return a subset of results to the client. While this does not reduce server-side IO, it does reduce network bandwidth and the amount of data the client needs to process. Filters are generally used through the Java API, but can also be used from the HBase Shell for testing and debugging.
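The trade-off described above (no IO savings on the server, less data on the wire) can be shown with a toy model. `server_side_scan` and `prefix_filter` are hypothetical names; the predicate only imitates the idea behind HBase's `PrefixFilter` and is not the HBase Filter API.

```python
from typing import Callable, Iterable

Row = tuple[bytes, bytes]  # (row key, cell value)


def prefix_filter(prefix: bytes) -> Callable[[Row], bool]:
    """Keep rows whose key starts with prefix (similar in spirit to PrefixFilter)."""
    return lambda row: row[0].startswith(prefix)


def server_side_scan(table: Iterable[Row],
                     keep: Callable[[Row], bool]) -> tuple[int, list[Row]]:
    """Return (rows read on the server, rows sent to the client)."""
    rows_read = 0
    sent: list[Row] = []
    for row in table:
        rows_read += 1        # the server still reads every row in the range
        if keep(row):
            sent.append(row)  # only matching rows cross the network
    return rows_read, sent


table = [(b"item1", b"lamp"), (b"user1", b"ann"), (b"user2", b"bob")]
read, sent = server_side_scan(table, prefix_filter(b"user"))
print(read, [key for key, _ in sent])  # 3 [b'user1', b'user2']
```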
Writing Data to HBase
To write data to HBase, you use methods of the HTableInterface interface. You can use the Java API directly, or use the HBase Shell, the REST API, the Thrift API, or another client that uses the Java API indirectly. When you issue a Put, the coordinates of the data are the row, the column, and the timestamp. The timestamp is unique per version of the cell, must be a long integer, and can be generated automatically or specified programmatically by your application.
Variations on Put
There are several different ways to write data into HBase. Some of them are listed below.
● A Put operation writes data into HBase.
● A Delete operation deletes data from HBase. What actually happens during a Delete depends on several factors; in particular, HBase does not remove the data immediately, as described below.
● A CheckAndPut operation performs a Scan before attempting the Put, and does the Put only if a value matches what is expected. It provides row-level atomicity.
● A CheckAndDelete operation performs a Scan before attempting the Delete, and does the Delete only if a value matches what is expected.
● An Increment operation increments values of one or more columns within a single row, and provides row-level atomicity.
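The row-level atomicity that CheckAndPut and Increment provide can be modeled with one lock per row. This is a hypothetical in-memory sketch (`ToyRegion` and its methods are invented for illustration), not how a RegionServer is implemented; it only shows the guarantee: the check and the write, or the read and the add, happen as one step with respect to other writers on the same row.

```python
import threading
from collections import defaultdict


class ToyRegion:
    """In-memory stand-in for a region: row -> {column: value}."""

    def __init__(self) -> None:
        self._rows: dict[bytes, dict[bytes, object]] = defaultdict(dict)
        self._locks: dict[bytes, threading.Lock] = defaultdict(threading.Lock)
        self._guard = threading.Lock()  # protects creation of per-row locks

    def _row_lock(self, row: bytes) -> threading.Lock:
        with self._guard:
            return self._locks[row]

    def check_and_put(self, row, column, expected, new_value) -> bool:
        """Write new_value only if the current value equals expected
        (expected=None means "the column must not exist yet")."""
        with self._row_lock(row):
            if self._rows[row].get(column) != expected:
                return False
            self._rows[row][column] = new_value
            return True

    def increment(self, row, column, amount: int = 1) -> int:
        """Atomically add amount to a counter column; return the new value."""
        with self._row_lock(row):
            value = int(self._rows[row].get(column, 0)) + amount
            self._rows[row][column] = value
            return value

    def get(self, row, column):
        with self._row_lock(row):
            return self._rows[row].get(column)


region = ToyRegion()


def add_hits(n: int) -> None:
    for _ in range(n):
        region.increment(b"row1", b"cf:hits")


workers = [threading.Thread(target=add_hits, args=(1000,)) for _ in range(4)]
for w in workers:
    w.start()
for w in workers:
    w.join()
print(region.get(b"row1", b"cf:hits"))  # 4000
```

Without the per-row lock, two threads could read the same old counter value and one increment would be lost.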
When you put data into HBase, a timestamp is required. The timestamp can be generated automatically by the RegionServer or can be supplied by you. The timestamp must be unique per version of a given cell, because the timestamp identifies the version. To modify a previous version of a cell, for instance, you would issue a Put with a different value for the data itself, but the same timestamp.
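The cell coordinates and the role of the timestamp can be sketched as a map keyed by (row, column, timestamp). `ToyVersionedTable` is hypothetical; automatically assigned timestamps are modeled as milliseconds since the epoch, which is the form the RegionServer uses by default. Version limits and deletes are deliberately left out.

```python
import time
from typing import Optional


class ToyVersionedTable:
    """Cells addressed by (row, column, timestamp); hypothetical, in memory."""

    def __init__(self) -> None:
        self._cells: dict[tuple[bytes, bytes], dict[int, bytes]] = {}

    def put(self, row: bytes, column: bytes, value: bytes,
            ts: Optional[int] = None) -> int:
        """Store value and return the timestamp used for this version."""
        if ts is None:
            # Mimic a server-assigned timestamp: milliseconds since the epoch.
            ts = time.time_ns() // 1_000_000
        # Same (row, column, ts): the existing version is replaced.
        self._cells.setdefault((row, column), {})[ts] = value
        return ts

    def get(self, row: bytes, column: bytes,
            ts: Optional[int] = None) -> Optional[bytes]:
        """Return the newest version, or the version at exactly ts."""
        versions = self._cells.get((row, column), {})
        if ts is not None:
            return versions.get(ts)
        return versions[max(versions)] if versions else None


t = ToyVersionedTable()
t.put(b"row1", b"cf:a", b"v1", ts=100)
t.put(b"row1", b"cf:a", b"v2", ts=200)
t.put(b"row1", b"cf:a", b"v1-fixed", ts=100)  # same timestamp: rewrites version 100
print(t.get(b"row1", b"cf:a"))          # b'v2'
print(t.get(b"row1", b"cf:a", ts=100))  # b'v1-fixed'
```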
When you ask HBase to delete data, either explicitly using a Delete method or implicitly using a threshold such as the maximum number of versions or the TTL, HBase does not delete the data immediately. Instead, it writes a deletion marker, called a tombstone, to the HFile, which is the physical file where a given RegionServer stores its region of a column family. The tombstone markers are processed during major compaction operations, when HFiles are rewritten without the deleted data.
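A tombstone can be pictured as a marker appended alongside the data: reads treat masked cells as gone, but the bytes stay on disk until a major compaction rewrites the files without them. The sketch below is a deliberately simplified model (`ToyStore` is hypothetical, and its single tombstone type hides every version of a cell at or below the tombstone's timestamp); it ignores HBase's separate delete types for single versions, columns, and families.

```python
from typing import Optional

Entry = tuple[bytes, int, Optional[bytes]]  # (cell, timestamp, value); None = tombstone


class ToyStore:
    def __init__(self) -> None:
        self.entries: list[Entry] = []  # append-only, like immutable HFiles

    def put(self, cell: bytes, ts: int, value: bytes) -> None:
        self.entries.append((cell, ts, value))

    def delete(self, cell: bytes, ts: int) -> None:
        self.entries.append((cell, ts, None))  # write a tombstone; keep the data

    def _deleted_at(self, cell: bytes) -> int:
        """Newest tombstone timestamp for cell, or -1 if there is none."""
        return max((ts for c, ts, v in self.entries if c == cell and v is None),
                   default=-1)

    def get(self, cell: bytes) -> Optional[bytes]:
        cutoff = self._deleted_at(cell)
        live = [(ts, v) for c, ts, v in self.entries
                if c == cell and v is not None and ts > cutoff]
        return max(live)[1] if live else None  # newest live version

    def major_compact(self) -> None:
        """Rewrite entries without tombstones or the data they mask."""
        self.entries = [(c, ts, v) for c, ts, v in self.entries
                        if v is not None and ts > self._deleted_at(c)]


store = ToyStore()
store.put(b"r1:cf:a", 100, b"v1")
store.delete(b"r1:cf:a", 150)
print(store.get(b"r1:cf:a"), len(store.entries))  # None 2
store.major_compact()
print(len(store.entries))  # 0
```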
Importing Data Into HBase
The method you use for importing data into HBase depends on several factors:
● The location, size, and format of your existing data
● Whether you need to import data once or periodically over time
● Whether you want to import the data in bulk or stream it into HBase regularly
● How fresh the HBase data needs to be
If the data is already in an HBase table:
● To move the data from one HBase cluster to another, use snapshot and either the clone_snapshot or ExportSnapshot utility; or, use the CopyTable utility.
● To move the data from one HBase cluster to another without downtime on either cluster, use replication.
● To migrate data between HBase versions that are not wire-compatible, such as from CDH 4 to CDH 5, see Importing HBase Data From CDH 4 to CDH 5.
If the data currently exists outside HBase:
● If possible, write the data to HFile format, and use a BulkLoad to import it into HBase. The data is immediately available to HBase and you can bypass the normal write path, increasing efficiency.
● If you prefer not to use bulk loads, and you are using a tool such as Pig, you can use it to import your data.
If you need to stream live data to HBase instead of importing it in bulk:
● Write a Java client using the Java API, or use the Apache Thrift Proxy API to write a client in a language supported by Thrift.
● Stream data directly into HBase using the REST Proxy API in conjunction with an HTTP client such as wget or curl.
● Use Flume or Spark.
Using CopyTable
CopyTable uses the HBase read and write paths to copy part or all of a table to a new table in either the same cluster or a different cluster. CopyTable causes read load when reading from the source and write load when writing to the destination, and region splits occur on the destination table in real time as needed. To avoid this extra load, use snapshot and export commands instead of CopyTable. Alternatively, you can pre-split the destination table to avoid excessive splits. The destination table can be partitioned differently from the source table.
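Pre-splitting the destination table requires a list of split keys. For row keys that begin with uniformly distributed hexadecimal characters (for example, hashed keys), evenly spaced split points are easy to compute. `hex_split_points` is a hypothetical helper; it is similar in spirit to the `HexStringSplit` algorithm in HBase's `RegionSplitter` utility but is not guaranteed to produce identical keys.

```python
def hex_split_points(num_regions: int, width: int = 8) -> list[str]:
    """Return num_regions - 1 split keys that divide the space of
    width-character lowercase hex prefixes into equal ranges."""
    if num_regions < 2:
        return []  # a single region needs no split keys
    space = 16 ** width  # number of distinct width-digit hex prefixes
    return [format(i * space // num_regions, f"0{width}x")
            for i in range(1, num_regions)]


print(hex_split_points(4))  # ['40000000', '80000000', 'c0000000']
```

The resulting keys could then be supplied as the split keys when creating the destination table, so that each region starts out with a comparable share of the copied rows.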
Importing HBase Data From CDH 4 to CDH 5
CDH 4 and CDH 5 are not wire-compatible, so import methods such as CopyTable will not work. Instead, you can use separate export and import operations using distcp, or you can copy the table's HFiles using HDFS utilities and upgrade the HFiles in place. The first option is preferred unless the size of the table is too large to be practical and the export or import will take too long. The import/export mechanism gives you flexibility and allows you to run exports as often as you need, for an ongoing period of time. This would allow you to test CDH 5 with your production data before finalizing your upgrade, for instance.
As of CDH 4.7, Cloudera recommends snapshots instead of CopyTable where possible. A snapshot captures the state of a table at the time the snapshot was taken. Because no data is copied when a snapshot is taken, the process is very quick. As long as the snapshot exists, cells in the snapshot are never deleted from HBase, even if they are explicitly deleted by the API. Instead, they are archived so that the snapshot can restore the table to its state at the time of the snapshot. After taking a snapshot, use the clone_snapshot command to copy the data to a new (immediately enabled) table in the same cluster, or the Export utility to create a new table based on the snapshot, in the same cluster or a new cluster. Cloning is a copy-on-write operation: the new table shares HFiles with the original table until writes occur in the new table but not the old table, or until a compaction or split occurs in either table. This can improve performance in the short term compared to CopyTable. To export the snapshot to a new cluster, use the ExportSnapshot utility, which uses MapReduce to copy the snapshot to the new cluster.
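The copy-on-write behavior of a cloned snapshot can be modeled by treating HFiles as immutable, shared file names. In this hypothetical sketch (`ToyTable`, `new_hfile`, and the file names are invented), taking a snapshot copies only a list of file references, and a flush or compaction in either table writes new files instead of changing shared ones, so the snapshot stays valid.

```python
import itertools

_file_ids = itertools.count(1)


def new_hfile() -> str:
    """Hypothetical immutable HFile, identified only by its name."""
    return f"hfile-{next(_file_ids)}"


class ToyTable:
    def __init__(self, files: list[str]) -> None:
        self.files = list(files)  # references to immutable HFiles

    def snapshot(self) -> tuple[str, ...]:
        return tuple(self.files)  # no data copied, only references

    @classmethod
    def clone_snapshot(cls, snap: tuple[str, ...]) -> "ToyTable":
        return cls(list(snap))  # the clone shares the snapshot's HFiles

    def flush(self) -> None:
        self.files.append(new_hfile())  # new writes land in new files

    def major_compact(self) -> None:
        self.files = [new_hfile()]  # rewrite everything into one new file


source = ToyTable([new_hfile(), new_hfile()])
snap = source.snapshot()
clone = ToyTable.clone_snapshot(snap)
print(sorted(set(clone.files) & set(source.files)))  # ['hfile-1', 'hfile-2']
clone.flush()
clone.major_compact()
print(set(clone.files) & set(source.files))  # set()
print(snap)  # ('hfile-1', 'hfile-2')
```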
Using BulkLoad
HBase uses the well-known HFile format to store its data on disk. In many situations, writing HFiles programmatically with your data, and bulk-loading that data into HBase on the RegionServer, has advantages over other data ingest mechanisms. BulkLoad operations bypass the write path completely, providing the following benefits:
● The data is available to HBase immediately but does not cause additional load or latency on the cluster when it appears.
● BulkLoad operations do not use the write-ahead log (WAL) and do not cause flushes or split storms.
● BulkLoad operations do not cause excessive garbage collection.
1. Extract your data from its existing source.
For instance, if your data is in a MySQL database, you might run the mysqldump command. The process you use depends on your data. If your data is already in TSV or CSV format, skip this step and use the included ImportTsv utility to process your data into HFiles. See the ImportTsv documentation for details.
2. Process your data into HFile format.
The conversion is usually done with a MapReduce job. The job must emit the row key as the Key, and either a KeyValue, a Put, or a Delete as the Value. The Reducer is handled by HBase; configure it using HFileOutputFormat.configureIncrementalLoad(), which does the following:
   o Inspects the table to configure a total order partitioner
   o Uploads the partitions file to the cluster and adds it to the DistributedCache
   o Sets the number of reduce tasks to match the current number of regions
   o Sets the output key/value class to match HFileOutputFormat requirements
   o Sets the Reducer to perform the appropriate sorting (either KeyValueSortReducer or PutSortReducer)
3. One HFile is created per region in the output folder.
Input data is almost completely re-written, so you need available disk space at least twice the size of the original data set. For example, for a 100 GB output from mysqldump, you should have at least 200 GB of available disk space in HDFS. You can delete the original input file at the end of the process.
4. Load the files into HBase.
Use the LoadIncrementalHFiles command (more commonly known as the completebulkload tool), passing it a URL that locates the files in HDFS. Each file is loaded into the relevant region on the RegionServer for the region. You can limit the number of versions that are loaded by passing the --versions=N option, where N is the maximum number of versions to include, from newest to oldest (largest timestamp to smallest timestamp).
5. If a region was split after the files were created, the tool automatically splits the HFile according to the new boundaries. This process is inefficient, so if your table is being written to by other processes, you should load as soon as the transform step is done.
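Steps 2 and 3 rely on a total order partition of the sorted data: each region receives the contiguous range of row keys from its start key up to (but not including) the next region's start key, and one sorted file is produced per region. The following sketch models only that partitioning logic at the row-key level; `partition_for_bulk_load` is hypothetical, the "HFiles" are plain sorted lists, and nothing here writes the real HFile format.

```python
from bisect import bisect_right


def partition_for_bulk_load(cells: list[tuple[bytes, bytes]],
                            split_keys: list[bytes]) -> list[list[tuple[bytes, bytes]]]:
    """Group (row key, value) pairs into one sorted list per region.

    split_keys are the start keys of every region except the first, in
    ascending order, so there are len(split_keys) + 1 regions.
    """
    regions: list[list[tuple[bytes, bytes]]] = [[] for _ in range(len(split_keys) + 1)]
    for row, value in sorted(cells):  # output files must be in key order
        # Region i holds keys in [split_keys[i-1], split_keys[i]).
        regions[bisect_right(split_keys, row)].append((row, value))
    return regions


files = partition_for_bulk_load(
    [(b"zeta", b"3"), (b"alpha", b"1"), (b"mike", b"2")], split_keys=[b"m"])
print(files)  # [[(b'alpha', b'1')], [(b'mike', b'2'), (b'zeta', b'3')]]
```

If a region splits after the files are written, a file can straddle the new boundary, which is why the load tool has to split it again (step 5).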
Using Cluster Replication
If your data is already in an HBase cluster, replication is useful for getting the data into additional HBase clusters. In HBase, cluster replication refers to keeping one cluster state synchronized with that of another cluster, using the write-ahead log (WAL) of the source cluster to propagate the changes. Replication is enabled at column family granularity. Before enabling replication for a column family, create the table and all column families to be replicated on the destination cluster.
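Column-family granularity means that only WAL edits for families marked for replication are shipped to the peer; in HBase, a family is marked by setting its `REPLICATION_SCOPE` attribute to 1. The sketch below is a hypothetical model of that selection step (`WalEdit` and `edits_to_ship` are invented), including a check that mirrors the requirement that the destination already has the table and families.

```python
from typing import NamedTuple


class WalEdit(NamedTuple):
    table: str
    row: bytes
    family: str
    value: bytes


def edits_to_ship(wal: list[WalEdit],
                  replication_scope: dict[tuple[str, str], int],
                  peer_families: set[tuple[str, str]]) -> list[WalEdit]:
    """Select WAL edits to send to a peer, preserving WAL order.

    replication_scope maps (table, family) to 1 (replicate) or 0 (local only).
    peer_families lists the (table, family) pairs that exist on the peer.
    """
    shipped = []
    for edit in wal:
        key = (edit.table, edit.family)
        if replication_scope.get(key, 0) != 1:
            continue  # family is not replicated; the edit stays on the source
        if key not in peer_families:
            raise LookupError(f"create {edit.table}:{edit.family} on the peer first")
        shipped.append(edit)
    return shipped


wal = [WalEdit("users", b"u1", "info", b"ann"),
       WalEdit("users", b"u1", "scratch", b"tmp"),
       WalEdit("users", b"u2", "info", b"bob")]
scope = {("users", "info"): 1, ("users", "scratch"): 0}
print([e.row for e in edits_to_ship(wal, scope, {("users", "info")})])  # [b'u1', b'u2']
```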
Common Replication Topologies
● A central source cluster might propagate changes to multiple destination clusters, for failover or due to geographic distribution.
● A source cluster might push changes to a destination cluster, which might also push its own changes back to the original cluster.
● Many different low-latency clusters might push changes to one centralized cluster for backup or resource-intensive data-analytics jobs. The processed data might then be replicated back to the low-latency clusters.
● Multiple levels of replication can be chained together to suit your needs. The following diagram shows a hypothetical scenario. Use the arrows to follow the data paths.
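Chained and two-way topologies can be reasoned about as a directed graph of clusters. In this hypothetical sketch (`propagate` and the cluster names are invented), an edit written on one cluster spreads along the replication links, and the `seen` set is a simplified stand-in for the cluster-ID tracking HBase uses so that an edit is not shipped back to a cluster that has already applied it, for example between two clusters that replicate to each other.

```python
from collections import deque


def propagate(peers: dict[str, list[str]], origin: str) -> list[str]:
    """Return the clusters that apply an edit written on origin, in order.

    peers maps each cluster to the clusters it replicates to.
    """
    applied = [origin]
    seen = {origin}         # clusters that have already applied the edit
    queue = deque([origin])
    while queue:
        cluster = queue.popleft()
        for peer in peers.get(cluster, []):
            if peer in seen:
                continue    # do not ship the edit back to a cluster that has it
            seen.add(peer)
            applied.append(peer)
            queue.append(peer)
    return applied


# A and B replicate to each other; B also feeds a chain B -> C -> D.
topology = {"A": ["B"], "B": ["A", "C"], "C": ["D"]}
print(propagate(topology, "A"))  # ['A', 'B', 'C', 'D']
print(propagate(topology, "C"))  # ['C', 'D']
```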
Using Pig and HCatalog
Apache Pig is a platform for analyzing large data sets using a high-level language. Apache HCatalog is a sub-project of Apache Hive, which enables reading and writing of data from one Hadoop utility to another. You can use a combination of Pig and HCatalog to import data into HBase.
Using the Java API
The Java API is the most common mechanism for getting data into HBase, through Put operations. The Thrift and REST APIs, as well as the HBase Shell, use the Java API.
Using the Apache Thrift Proxy API
The Apache Thrift library provides cross-language client-server remote procedure calls (RPCs), using Thrift bindings. A Thrift binding is client code generated by the Apache Thrift Compiler for a target language (such as Python) that allows communication between the Thrift server and clients using that client code. HBase includes an Apache Thrift Proxy API, which allows you to write HBase applications in Python, C, C++, or another language that Thrift supports. The Thrift Proxy API is slower than the Java API and may have fewer features. To use the Thrift Proxy API, you need to configure and run the HBase Thrift server on your cluster.
Using the REST Proxy API
After configuring and starting the HBase REST Server on your cluster, you can use the HBase REST Proxy API to stream data into HBase, from within another application or shell script, or by using an HTTP client such as wget or curl. The REST Proxy API is slower than the Java API and may have fewer features. This approach is simple and does not require advanced development experience to implement. However, like the Java and Thrift Proxy APIs, it uses the full write path and can cause compactions and region splits.
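As an illustration of the REST approach, the following sketch builds, without sending, a request that writes one cell. It assumes the JSON cell representation described in the HBase REST documentation, in which the row key, column, and value are Base64-encoded, sent with `PUT` to `/<table>/<row>`; the host name, port, table, and column names are hypothetical. Passing the request to `urllib.request.urlopen` would send it to a running REST server.

```python
import base64
import json
import urllib.parse
import urllib.request


def b64(data: bytes) -> str:
    return base64.b64encode(data).decode("ascii")


def put_cell_request(base_url: str, table: str, row: bytes,
                     column: bytes, value: bytes) -> urllib.request.Request:
    """Build (but do not send) a REST request that writes one cell."""
    body = {"Row": [{"key": b64(row),
                     "Cell": [{"column": b64(column), "$": b64(value)}]}]}
    url = f"{base_url}/{urllib.parse.quote(table)}/{urllib.parse.quote(row)}"
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        method="PUT",
        headers={"Content-Type": "application/json",
                 "Accept": "application/json"},
    )


req = put_cell_request("http://resthost.example.com:8080", "users",
                       b"row1", b"info:name", b"Ann")
print(req.get_method(), req.full_url)
# PUT http://resthost.example.com:8080/users/row1
```

The same request can be issued with curl from a shell script; each such write still goes through the normal HBase write path.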
Using Flume
Apache Flume is a fault-tolerant system designed for ingesting data into HDFS, for use with Hadoop. You can configure Flume to write data directly into HBase. Flume includes two different sinks designed to work with HBase: HBaseSink and AsyncHBaseSink. HBaseSink supports HBase IPC calls introduced in HBase 0.96, and allows you to write data to an HBase cluster that is secured by Kerberos, whereas AsyncHBaseSink does not. However, AsyncHBaseSink uses an asynchronous model and guarantees atomicity at the row level.
Using Spark
You can write data to HBase from Apache Spark by calling saveAsHadoopDataset(conf: JobConf): Unit on an RDD of key-value pairs.
Using a Custom MapReduce Job
Many of the methods to import data into HBase use MapReduce implicitly. If none of those approaches fit your needs, you can use MapReduce directly to convert data to a series of HFiles or API calls for import into HBase. In this way, you can import data from Avro, Parquet, or another format into HBase, or export data from HBase into another format, using classes such as TableOutputFormat, HFileOutputFormat, and TableInputFormat.
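The map step of such a job typically turns each input record into a row key plus a set of cells. The sketch below models that step in plain Python for tab-separated input, borrowing the column-mapping convention of HBase's ImportTsv tool (`HBASE_ROW_KEY` marks the row key field; other entries are `family:qualifier`). `map_tsv_line` is hypothetical and only returns data; in a real job the mapper would emit Put objects or KeyValues for TableOutputFormat or HFileOutputFormat.

```python
def map_tsv_line(line: str, columns: list[str]) -> tuple[bytes, dict[bytes, bytes]]:
    """Convert one TSV record into (row key, {b"family:qualifier": value})."""
    fields = line.rstrip("\n").split("\t")
    if len(fields) != len(columns):
        raise ValueError(f"expected {len(columns)} fields, got {len(fields)}")
    row_key = None
    cells: dict[bytes, bytes] = {}
    for spec, value in zip(columns, fields):
        if spec == "HBASE_ROW_KEY":
            row_key = value.encode("utf-8")
        else:
            cells[spec.encode("utf-8")] = value.encode("utf-8")
    if row_key is None:
        raise ValueError("columns must include HBASE_ROW_KEY")
    return row_key, cells


print(map_tsv_line("u42\tAnn\tParis\n", ["HBASE_ROW_KEY", "info:name", "info:city"]))
# (b'u42', {b'info:name': b'Ann', b'info:city': b'Paris'})
```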