HBase Study Notes 1: Using MapReduce with HBase

Using MapReduce with HBase falls into two parts: writing data into HBase and reading data from HBase. Below I cover each case in turn:

1. Writing data into HBase

(1) Using TableOutputFormat:

A mapper that reads from HBase extends TableMapper<KEYOUT, VALUEOUT>, and a reducer that writes to HBase extends TableReducer<KEYIN, VALUEIN, KEYOUT>:


public abstract class TableMapper<KEYOUT, VALUEOUT>
	extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> {}

public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
	extends Reducer<KEYIN, VALUEIN, KEYOUT, Writable> {}

For HBase input and output, TableInputFormat and TableOutputFormat are used. In addition, org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil provides the initTableMapperJob and initTableReducerJob methods, which simplify job setup.

Again using the simplest WordCount as the example:


import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

    private IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }

}

Because the data source is still HDFS, the mapper is no different from the classic WordCount.


import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class IntSumReducer extends TableReducer<Text, IntWritable, Text> {

    private Text resultKey = new Text();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        // Writing to HBase requires building a Put object for the row
        Put put = new Put(key.toString().getBytes());
        put.add("info".getBytes(), "count".getBytes(), Bytes.toBytes(sum));
        context.write(resultKey, put);
    }

}

For the reduce output key it is enough to pass an empty object: TableOutputFormat ignores the key and only writes the Put value.


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;

public class WordCount extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.exit(1);
        }
        String inputPath = args[0];
        // Name of the target HBase table
        String outputTable = args[1];
        // HBase connection settings (ZooKeeper quorum and client port)
        Configuration conf = getConf();
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.zookeeper.quorum", "centos1,centos2,centos3");
        Job job = Job.getInstance(conf, "word count");

        job.setJarByClass(WordCount.class);

        FileInputFormat.addInputPath(job, new Path(inputPath));
        job.setMapperClass(TokenizerMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // initTableReducerJob sets the reducer class and TableOutputFormat for the target table
        TableMapReduceUtil.initTableReducerJob(outputTable, IntSumReducer.class, job);

        return job.waitForCompletion(true) ? 0 : 1;
    }

}
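The driver implements Tool but the post does not show an entry point. A minimal sketch of a main method, assuming it lives in a separate class (it could just as well be added to WordCount itself; the class name WordCountMain is made up here):

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical entry-point class; not part of the original post.
public class WordCountMain {

    public static void main(String[] args) throws Exception {
        // HBaseConfiguration.create() also loads hbase-site.xml if it is on the classpath,
        // so the ZooKeeper settings hard-coded in run() could come from there instead.
        int exitCode = ToolRunner.run(HBaseConfiguration.create(), new WordCount(), args);
        System.exit(exitCode);
    }

}

The same pattern applies to the BulkLoad and ReadHBase drivers below.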

In this approach the reducer builds Put objects and writes them straight into HBase. For large data volumes this is inefficient (HBase will block writes and perform frequent flush, split and compact operations, generating heavy I/O), and it can affect the stability of the HBase nodes (long GC pauses slow responses down, region servers time out and drop out of the cluster, triggering a chain of further failures).

(2) Using BulkLoad:

This generally takes two MapReduce jobs plus a final load step:

1. The first job runs the business logic and writes its result to HDFS rather than HBase.

2. The second job takes the first job's output as input and converts it into HFile format.

3. BulkLoad is invoked to import the HFiles produced by the second job into the corresponding HBase table.

Rewriting the WordCount above:

The first job is just an ordinary WordCount (see http://ee-dreamer.com/?p=657#more-657); its output is one word and its count per line, separated by a tab, for example:
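(The words and counts below are made up; what matters is the tab-separated word/count layout that BulkLoadMapper relies on.)

hadoop	3
hbase	5
mapreduce	2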

The second job converts data in that format into HFiles:


import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class BulkLoadMapper extends Mapper<Object, Text, ImmutableBytesWritable, Put> {

	@Override
	protected void map(Object key, Text value, Context context)
			throws IOException, InterruptedException {
		String[] datas = value.toString().split("\t");
		if (datas.length == 2) {
			String word = datas[0];
			int count = Integer.parseInt(datas[1]);
			Put put = new Put(word.getBytes());
			put.add("info".getBytes(), "count".getBytes(), Bytes.toBytes(count));

			context.write(new ImmutableBytesWritable(word.getBytes()), put);
		}

	}

}

We do not write a reducer for this job ourselves: HFileOutputFormat2.configureIncrementalLoad (called in the driver below) installs PutSortReducer and a total-order partitioner based on the table's region boundaries. After the job completes, the generated HFiles are loaded into HBase:


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;

public class BulkLoad extends Configured implements Tool {

	@Override
	public int run(String[] args) throws Exception {
		if (args.length != 3) {
			System.exit(1);
		}
		// Output path of the first job (the plain-text word counts)
		String inputPath = args[0];
		// Temporary HDFS directory for the generated HFiles
		String tempPath = args[1];
		// Name of the target HBase table
		String outputTable = args[2];
		Configuration conf = getConf();
		// Set the ZooKeeper connection info before the Job is created, so it is copied
		// into the job configuration as well as used by the HTable and
		// LoadIncrementalHFiles clients below.
		conf.set("hbase.zookeeper.property.clientPort", "2181");
		conf.set("hbase.zookeeper.quorum", "centos1,centos2,centos3");

		Job job = Job.getInstance(conf, "bulk load");
		job.setJarByClass(BulkLoad.class);
		job.setMapperClass(BulkLoadMapper.class);
		job.setMapOutputKeyClass(ImmutableBytesWritable.class);
		job.setMapOutputValueClass(Put.class);
		job.setInputFormatClass(TextInputFormat.class);
		job.setOutputFormatClass(HFileOutputFormat2.class);
		FileInputFormat.addInputPath(job, new Path(inputPath));
		FileOutputFormat.setOutputPath(job, new Path(tempPath));

		HTable hTable = new HTable(conf, outputTable);
		// configureIncrementalLoad sets up total-order partitioning over the table's
		// regions and chooses PutSortReducer because the map output value class is Put
		HFileOutputFormat2.configureIncrementalLoad(job, hTable);
		job.waitForCompletion(true);

		if (job.isSuccessful()) {
			LoadIncrementalHFiles loadFfiles = new LoadIncrementalHFiles(conf);
			loadFfiles.doBulkLoad(new Path(tempPath), hTable);
			System.out.println("Bulk Load Completed..");
			return 0;
		} else {
			return 1;
		}

	}

}

This approach uses a MapReduce job to produce table data in HBase's internal data format and then imports the generated StoreFiles directly into the cluster. Compared with going through the HBase API, importing with BulkLoad uses much less CPU and network bandwidth.
The third step of the BulkLoad process can also be run from the command line after the MapReduce job has generated the HBase data files; it does not have to be coded together with the MapReduce job:

hadoop jar hbase-server-VERSION.jar completebulkload [-c /path/to/hbase/config/hbase-site.xml] outputpath tablename
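For example (the jar version, HDFS path, and table name below are placeholders for illustration only):

hadoop jar hbase-server-0.98.6-hadoop2.jar completebulkload /user/hadoop/wordcount_hfiles wordcount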

However, note the following two points:
1. Loading HFiles is the fastest of all the loading approaches, but with one precondition: the data is being imported for the first time and the table is empty. If the table already contains data, importing HFiles into it will trigger split operations and become very slow.
2. Whatever writes the final output, map or reduce, its output key and value types must be <ImmutableBytesWritable, KeyValue> or <ImmutableBytesWritable, Put>; a sketch of the KeyValue variant follows this list.
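A minimal sketch of the KeyValue variant (assuming the same tab-separated word/count input as BulkLoadMapper; when the map output value class is KeyValue, configureIncrementalLoad installs KeyValueSortReducer instead of PutSortReducer):

import java.io.IOException;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical KeyValue-based variant of BulkLoadMapper; not part of the original post.
public class BulkLoadKeyValueMapper extends Mapper<Object, Text, ImmutableBytesWritable, KeyValue> {

	@Override
	protected void map(Object key, Text value, Context context)
			throws IOException, InterruptedException {
		String[] datas = value.toString().split("\t");
		if (datas.length == 2) {
			byte[] row = Bytes.toBytes(datas[0]);
			// One KeyValue per cell: row key, family "info", qualifier "count", int value
			KeyValue kv = new KeyValue(row, Bytes.toBytes("info"), Bytes.toBytes("count"),
					Bytes.toBytes(Integer.parseInt(datas[1])));
			context.write(new ImmutableBytesWritable(row), kv);
		}
	}

}

In the driver, the only change would be job.setMapOutputValueClass(KeyValue.class) instead of Put.class.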

2. Reading data from HBase

All that is needed is TableInputFormat. Let's read back the results we just wrote into HBase and write them to HDFS:


import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

public class ReadHBaseMapper extends TableMapper<Text, Text> {

    private Text resultKey = new Text();
    private Text resultVal = new Text();

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        StringBuilder sb = new StringBuilder();
        // Iterate over the cells and read each value as a string
        // (note: the "count" column was written as a 4-byte int, so Bytes.toInt
        // would be needed to render it as a readable number)
        for (Cell cell : value.listCells()) {
            String val = Bytes.toString(Arrays.copyOfRange(cell.getValueArray(), cell.getValueOffset(),
                    cell.getValueOffset() + cell.getValueLength()));
            sb.append(val).append("|");
        }
        // Write the row key and the "|"-joined column values to HDFS
        resultKey.set(Bytes.toString(value.getRow()));
        resultVal.set(sb.deleteCharAt(sb.length()-1).toString());
        context.write(resultKey, resultVal);
    }

}

 

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;

public class ReadHBase extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.exit(1);
        }
        String inputTable = args[0];
        String outputPath = args[1];
        Configuration conf = getConf();
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.zookeeper.quorum", "centos1,centos2,centos3");

        Job job = Job.getInstance(conf, "read HBase");
        job.setJarByClass(ReadHBase.class);
        // Reading an HBase table requires a Scan object; restrict it to the column family we need
        Scan scan = new Scan();
        scan.addFamily("info".getBytes());
        TableMapReduceUtil.initTableMapperJob(inputTable, scan, ReadHBaseMapper.class,
                Text.class, Text.class, job);
        job.setNumReduceTasks(0);
        FileOutputFormat.setOutputPath(job, new Path(outputPath));

        return job.waitForCompletion(true) ? 0 : 1;
    }

}
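As a usage illustration, assuming the class is packaged into a jar and given a ToolRunner main method like the earlier sketch (the jar name, table name, and output path here are hypothetical):

hadoop jar hbase-mr-examples.jar ReadHBase wordcount /user/hadoop/wordcount_readback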
