前言

本文隶属于专栏《大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!

本专栏目录结构和参考文献请见大数据技术体系


WHAT

在实际生产环境中,有这样一种场景:

用户数据位于 HDFS 中,业务需要定期将这部分海量数据导入 HBase 系统,以执行随机查询更新操作。

这种场景如果调用写入 API 进行处理,极有可能会给 RegionServer 带来较大的写人压力:

  • 引起 RegionServer 频繁 flush,进而不断 compact、split,影响集群稳定性。
  • 引起 RegionServer 频繁GC,影响集群稳定性。
  • 消耗大量 CPU 资源、带宽资源、内存资源以及 IO 资源,与其他业务产生资源竞争。
  • 在某些场景下,比如平均 KV 大小比较大的场景,会耗尽 RegionServer 的处理线程, 导致集群阻塞。

鉴于存在上述问题,HBase提供了另一种将数据写入HBase集群的方法一BulkLoad

BulkLoad 首先使用 MapReduce 将待写入集群数据转换为 HFile 文件,再直接将这些 HFile 文件加载到在线集群中。

显然,BulkLoad 方案没有将写请求发送给 RegionServer 处理,可以有效避免上述一系列问题。


BulkLoad 核心流程

从 HBase 的视角来看,BulkLoad 主要由两个阶段组成:

HFile 生成阶段

这个阶段会运行一个 MapReduce 任务,MapReduce 的 mapper 需要自己实现,将 HDFS 文件中的数据读出来组装成一个复合 KV,其中 Key 是 rowkey, Value 可以是 KeyValue 对象、Put 对象甚至 Delete 对象;

MapReduce 的 reducer 由 HBase 负责,通过方法 HFileOutputFormat2.configureIncrementalLoad() 进行配置,这个方法主要负责以下事项。

  • 根据表信息配置一个全局有序的 partitioner。
  • 将 partitioner 文件上传到 HDFS 集群并写入 DistributedCache.
  • 设置 reduce task 的个数为目标表 Region 的个数。
  • 设置输出 key/value 类满足 HFileOutputFormat 所规定的格式要求。
  • 根据类型设置 reducer 执行相应的排序(Key ValueSortReducer或者PutSortReducer)。
    这个阶段会为每个Region生成一个对应的HFile文件。

HFile 导入阶段

HFile 准备就绪之后,就可以使用工具 completebulkload 将HFile 加载到在线HBase集群。

completebulkload 工具主要负责以下工作:

  • 依次检查第一步生成的所有 HFile 文件,将每个文件映射到对应的 Region。
  • 将 HFile 文件移动到对应 Region 在的 HDFS 文件目录下。
  • 告知 Region 对应的 RegionServer,加载 HFile 文件对外提供服务。

如果在 BulkLoad 的中间过程中 Region 发生了分裂,completebulkload工具会自动将对应的 HFile 文件按照新生成的 Region 边界切分成多个 HFile 文件,保证每个 HFile 都能与目标表当前的 Region 相对应。

但这个过程需要读取 HFile 内容,因而并不高效。

需要尽量减少 HFile 生成阶段和 HFile 导入阶段的延迟,最好能够在 HFile 生成之后立刻执行 HFile 导入。

基于 BulkLoad 两阶段的工作原理,BulkLoad的核心流程如图所示。

在这里插入图片描述


实践

需求

通过 BulkLoad 的方式批量加载数据到 HBase 表中,把 HDFS 上面的这个路径 /input/user.txt 的数据文件,转换成 HFile 格式,然后 load 到 user 这张表里面中。


1. 开发生成 HFile 文件的代码

package com.shockang.study.bigdata.hbase.bulkload;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class HBaseLoad {

	public static class LoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
		@Override
		protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {
			String[] split = value.toString().split("");
			Put put = new Put(Bytes.toBytes(split[0]));
			put.addColumn("f1".getBytes(), "name".getBytes(), split[1].getBytes());
			put.addColumn("f1".getBytes(), "age".getBytes(), split[2].getBytes());
			context.write(new ImmutableBytesWritable(Bytes.toBytes(split[0])), put);
		}
	}

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		final String INPUT_PATH = "hdfs://node1:9000/input";
		final String OUTPUT_PATH = "hdfs://node1:9000/output_HFile";
		Configuration conf = HBaseConfiguration.create();
		Connection connection = ConnectionFactory.createConnection(conf);
		Table table = connection.getTable(TableName.valueOf("t4"));
		Job job = Job.getInstance(conf);
		job.setJarByClass(HBaseLoad.class);
		job.setMapperClass(LoadMapper.class);
		job.setMapOutputKeyClass(ImmutableBytesWritable.class);
		job.setMapOutputValueClass(Put.class);
		// 指定输出的类型 HFileOutputFormat2
		job.setOutputFormatClass(HFileOutputFormat2.class);
		HFileOutputFormat2.configureIncrementalLoad(job, table, connection.getRegionLocator(TableName.valueOf("t4")));
		FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
		FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

2. 打成 jar 包提交到集群中运行

hadoop jar my-bigdata-1.0-SNAPSHOT.jar com.shockang.study.bigdata.hbase.bulkload.HBaseLoad

3、观察HDFS上输出的结果

在这里插入图片描述


4. 加载 HFile 文件到 HBase 表中

package com.shockang.study.bigdata.hbase.bulkload;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class LoadData {
	public static void main(String[] args) throws Exception {
		Configuration configuration = HBaseConfiguration.create();
		configuration.set("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181");
		//获取数据库连接
		Connection connection = ConnectionFactory.createConnection(configuration);
		//获取表的管理器对象
		Admin admin = connection.getAdmin();
		TableName tableName = TableName.valueOf("t4");
		//获取tab1e对象
		Table table = connection.getTable(tableName);
		//构建LoadIncrementalHFi1es加载HFile文件
		LoadIncrementalHFiles load = new LoadIncrementalHFiles(configuration);
		load.doBulkLoad(new Path("hdfs://node1:9000/output_HFile"), admin, table, connection.getRegionLocator(tableName));
	}
}

命令加载

命令格式:

hadoop jar hbase-server-VERSION.jar completebulkload [-c /path/to/hbase/config/hbase-site.xml] /user/todd/myoutput mytable 

先将 hbase 的 jar 包添加到 hadoop 的 classpath 路径下

export HBASE_HOME=/opt/bigdata/hbase 
export HADOOP_HOME=/opt/bigdata/hadoop 
export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`

命令加载演示

hadoop jar /opt/bigdata/hbase/lib/hbase-server-1.2.1.jar completebulkload /output_HFile t5
上一篇 下一篇