Using MapReduce with HBase breaks down into two parts: writing data into HBase and reading data from HBase. I'll cover the two cases in turn:
1. Writing data to HBase
(1) Using TableOutputFormat:
The map class must extend TableMapper<KEYOUT, VALUEOUT>, and the reduce class must extend TableReducer<KEYIN, VALUEIN, KEYOUT>:
public abstract class TableMapper<KEYOUT, VALUEOUT>
        extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> {}

public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
        extends Reducer<KEYIN, VALUEIN, KEYOUT, Writable> {}
HBase input and output use TableInputFormat and TableOutputFormat. In addition, org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil provides the initTableMapperJob and initTableReducerJob methods, which simplify job setup.
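As a quick orientation before the full WordCount example below, the two helpers are typically called as sketched here; the table names and the MyTableMapper/MyTableReducer classes are placeholders, not code from this post:

// Reading from an HBase table: wires up TableInputFormat, the Scan, and the map output types
TableMapReduceUtil.initTableMapperJob(
        "source_table",          // table to scan (placeholder name)
        new Scan(),              // scan describing which rows/columns to read
        MyTableMapper.class,     // mapper extending TableMapper
        Text.class,              // map output key class
        IntWritable.class,       // map output value class
        job);

// Writing to an HBase table: wires up TableOutputFormat and the target table
TableMapReduceUtil.initTableReducerJob(
        "target_table",          // table to write to (placeholder name)
        MyTableReducer.class,    // reducer extending TableReducer
        job);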
Again taking the simplest WordCount as the example:
public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

    private IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}
Since the data source is still HDFS, the map is no different from the usual WordCount.
public class IntSumReducer extends TableReducer<Text, IntWritable, Text> {

    private Text resultKey = new Text();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        // Writing to HBase requires building a Put object
        Put put = new Put(key.toString().getBytes());
        put.add("info".getBytes(), "count".getBytes(), Bytes.toBytes(sum));
        context.write(resultKey, put);
    }
}
For the reduce output key, an empty placeholder object is enough.
public class WordCount extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.exit(1);
        }
        String inputPath = args[0];
        // Name of the target HBase table
        String outputTable = args[1];

        // HBase configuration
        Configuration conf = getConf();
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.zookeeper.quorum", "centos1,centos2,centos3");

        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        FileInputFormat.addInputPath(job, new Path(inputPath));
        job.setMapperClass(TokenizerMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Call initTableReducerJob to set up the reducer and TableOutputFormat
        TableMapReduceUtil.initTableReducerJob(outputTable, IntSumReducer.class, job);
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
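The post doesn't show how the Tool is launched; a minimal main() inside the WordCount class using ToolRunner (a common pattern, not part of the original code) would look like this:

public static void main(String[] args) throws Exception {
    // ToolRunner parses generic Hadoop options (-D, -conf, ...) before calling run()
    int exitCode = ToolRunner.run(new Configuration(), new WordCount(), args);
    System.exit(exitCode);
}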
This approach builds Put objects directly in the reducer and writes them into HBase. For large write volumes it is inefficient (HBase blocks writes and repeatedly triggers flush, split, and compact operations, causing heavy IO), and it can hurt the stability of the HBase nodes (long GC pauses slow responses, nodes time out and drop out of the cluster, setting off a chain reaction).
(2) Using BulkLoad:
This generally takes two MapReduce jobs plus a load step:
1. The first Job runs the business logic and writes its result to HDFS rather than HBase.
2. The second Job takes the first Job's output as input and formats it into HFiles.
3. BulkLoad is then invoked to import the HFiles generated by the second Job into the target HBase table.
Rewriting the WordCount above:
The first Job is a plain WordCount (see http://ee-dreamer.com/?p=657#more-657 for the full code); its output looks like this:
A        2
Abstract 1
As       1
Because  2
But      2
China    1
Chinas   1
Culture  2
Cut      1
DINK     1
Dink     1
English  14
...
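For reference (the linked post has the complete code), here is a minimal sketch of the first Job's reducer that produces this word/count output on HDFS, assuming the TokenizerMapper shown above; the class name HdfsSumReducer is just illustrative:

public class HdfsSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        result.set(sum);
        // The default TextOutputFormat writes tab-separated "word count" lines to HDFS
        context.write(key, result);
    }
}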
The second Job converts data in the format above into HFiles:
public class BulkLoadMapper extends Mapper<Object, Text, ImmutableBytesWritable, Put> {

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] datas = value.toString().split("\t");
        if (datas.length == 2) {
            String word = datas[0];
            int count = Integer.parseInt(datas[1]);
            Put put = new Put(word.getBytes());
            put.add("info".getBytes(), "count".getBytes(), Bytes.toBytes(count));
            context.write(new ImmutableBytesWritable(word.getBytes()), put);
        }
    }
}
No reducer is written by hand for this Job; HFileOutputFormat2.configureIncrementalLoad sets up the sorting reducer and partitioner automatically. After the Job completes, the generated HFiles are loaded into HBase:
public class BulkLoad extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 3) {
            System.exit(1);
        }
        // Output path of the first Job
        String inputPath = args[0];
        // Temporary directory on HDFS for the generated HFiles
        String tempPath = args[1];
        // Name of the target HBase table
        String outputTable = args[2];

        Configuration conf = getConf();
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.zookeeper.quorum", "centos1,centos2,centos3");

        Job job = Job.getInstance(conf, "bulk load");
        job.setJarByClass(BulkLoad.class);
        job.setMapperClass(BulkLoadMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(HFileOutputFormat2.class);
        FileInputFormat.addInputPath(job, new Path(inputPath));
        FileOutputFormat.setOutputPath(job, new Path(tempPath));

        // configureIncrementalLoad inspects the table's regions and sets up the sorting
        // reducer and TotalOrderPartitioner so the HFiles match region boundaries
        HTable hTable = new HTable(conf, outputTable);
        HFileOutputFormat2.configureIncrementalLoad(job, hTable);

        job.waitForCompletion(true);
        if (job.isSuccessful()) {
            // Move the generated HFiles into the table's regions
            LoadIncrementalHFiles loadFiles = new LoadIncrementalHFiles(conf);
            loadFiles.doBulkLoad(new Path(tempPath), hTable);
            System.out.println("Bulk Load Completed..");
            return 0;
        } else {
            return 1;
        }
    }
}
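Note that this driver uses the older HBase client API (the HTable constructor and doBulkLoad(Path, HTable)). On more recent HBase releases the Connection-based equivalents are used instead; exact signatures vary by version, so treat this as a sketch that reuses conf, job, tempPath, and outputTable from the driver above:

try (Connection connection = ConnectionFactory.createConnection(conf);
     Table table = connection.getTable(TableName.valueOf(outputTable));
     RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(outputTable));
     Admin admin = connection.getAdmin()) {

    // Configure the job against the table's current region layout
    HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator);

    if (job.waitForCompletion(true)) {
        // Load the generated HFiles into the table
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        loader.doBulkLoad(new Path(tempPath), admin, table, regionLocator);
    }
}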
This approach uses a MapReduce job to emit table data in HBase's internal data format and then loads the generated StoreFiles directly into the cluster. Compared with going through the HBase API, bulk loading uses less CPU and network bandwidth.
The third step of the BulkLoad process can also be run from the command line after the MapReduce job has generated the HBase data files; it does not have to be coded together with the MapReduce job:
hadoop jar hbase-server-VERSION.jar completebulkload [-c /path/to/hbase/config/hbase-site.xml] outputpath tablename
But note the following two points:
1. The HFile approach is the fastest of all loading options, but with one prerequisite: the data is being imported for the first time and the table is empty. If the table already contains data, importing HFiles into it will trigger split operations and be very slow.
2. Whether the final output comes from the map or the reduce, its key and value types must be <ImmutableBytesWritable, KeyValue> or <ImmutableBytesWritable, Put>.
2. Reading data from HBase
Just use TableInputFormat. Here I read back the results written into HBase above and write them out to HDFS:
public class ReadHBaseMapper extends TableMapper<Text, Text> {

    private Text resultKey = new Text();
    private Text resultVal = new Text();

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        StringBuilder sb = new StringBuilder();
        // Read the row's column values cell by cell
        for (Cell cell : value.listCells()) {
            String val = Bytes.toString(Arrays.copyOfRange(cell.getValueArray(),
                    cell.getValueOffset(),
                    cell.getValueOffset() + cell.getValueLength()));
            sb.append(val).append("|");
        }
        // Write to HDFS
        resultKey.set(Bytes.toString(value.getRow()));
        resultVal.set(sb.deleteCharAt(sb.length() - 1).toString());
        context.write(resultKey, resultVal);
    }
}
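Two small notes on this mapper (the snippet below is an illustrative alternative, not the original code): CellUtil.cloneValue(cell) does the same copy as the Arrays.copyOfRange call, and because the count column was written with Bytes.toBytes(int), decoding it with Bytes.toInt gives the human-readable number:

for (Cell cell : value.listCells()) {
    // Copy out the cell value without manual offset arithmetic
    byte[] raw = CellUtil.cloneValue(cell);
    // The "info:count" column was written as a 4-byte int, so decode it as one
    sb.append(Bytes.toInt(raw)).append("|");
}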
public class ReadHBase extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.exit(1);
        }
        String inputTable = args[0];
        String outputPath = args[1];

        Configuration conf = getConf();
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.zookeeper.quorum", "centos1,centos2,centos3");

        Job job = Job.getInstance(conf, "read HBase");
        job.setJarByClass(ReadHBase.class);

        // Reading an HBase table requires a Scan object specifying the column family to scan
        Scan scan = new Scan();
        scan.addFamily("info".getBytes());

        TableMapReduceUtil.initTableMapperJob(inputTable, scan, ReadHBaseMapper.class,
                Text.class, Text.class, job);
        job.setNumReduceTasks(0);
        FileOutputFormat.setOutputPath(job, new Path(outputPath));
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
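For MapReduce scans it is usually worth tuning the Scan before passing it to initTableMapperJob; these two settings are standard HBase client options (the values here are only examples):

// Fetch more rows per RPC so mappers spend less time round-tripping to the region servers
scan.setCaching(500);
// Don't pollute the region servers' block cache with a one-off full-table scan
scan.setCacheBlocks(false);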
Whoever posts below is a madman. Haha.