Anyone who has used Hive knows that it provides join operations much like those in an RDBMS. Since Hive is implemented on top of MapReduce, the same joins can also be implemented directly in MapReduce. Below I explore how to do joins in MapReduce.
1. Join in the map phase
This is the equivalent of Hive's map-side join (broadcast join). The join is completed in the map tasks; no reduce tasks are started.
Applicable scenario: joining a small table (one that fits in memory) with a large table.
Idea: replicate the small table to every node and load it into memory; split the large table and join each split against the in-memory copy.
Implementation: when submitting the job, distribute the small table to all nodes; each node loads it into memory before its map tasks start. The map tasks then scan the large table and directly emit every record that joins with a small-table record.
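For concreteness, suppose both tables are "|"-delimited with three fields per record (the data below is hypothetical). The code that follows assumes the small table's join key is its second field and the large table's is its first:

small table (cached in memory):
1001|u001|Alice
1002|u002|Bob

large table (scanned by the map tasks):
u001|2017-03-01|42.5

joined output (TextOutputFormat separates key and value with a tab):
u001	2017-03-01#42.5#1001#Alice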
Mapper class:
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapSideJoinMapper extends Mapper<Object, Text, Text, Text> {

    // In-memory copy of the small table, keyed by the join field
    private Map<String, String> cache = new HashMap<String, String>();
    private Text resultKey = new Text();
    private Text resultVal = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Fetch the files distributed via the DistributedCache
        URI[] uris = context.getCacheFiles();
        if (uris != null) {
            for (URI uri : uris) {
                String filename = uri.toString().substring(uri.toString().lastIndexOf("/") + 1);
                BufferedReader br = new BufferedReader(new FileReader(filename));
                try {
                    String line = null;
                    // Load the small table into memory
                    while ((line = br.readLine()) != null) {
                        String[] data = line.split("\\|");
                        if (data.length == 3) {
                            String joinKey = data[1];
                            String others = data[0] + "#" + data[2];
                            cache.put(joinKey, others);
                        }
                    }
                } finally {
                    br.close();
                }
            }
        }
    }

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // Skip empty lines
        if (value == null || value.toString().equals("")) {
            return;
        }
        String[] data = value.toString().split("\\|");
        if (data.length == 3) {
            String joinKey = data[0];
            // Probe the in-memory small table with the large-table record
            if (cache.containsKey(joinKey)) {
                String others = data[1] + "#" + data[2] + "#" + cache.get(joinKey);
                resultKey.set(joinKey);
                resultVal.set(others);
                context.write(resultKey, resultVal);
            }
        }
    }
}
Driver class:
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MapSideJoinJob extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "MapSideJoin");
        // Distribute the small table (args[1]) to every node
        job.addCacheFile(new URI(args[1]));
        // Map-only job: no reduce tasks
        job.setNumReduceTasks(0);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        job.setJarByClass(MapSideJoinJob.class);
        job.setMapperClass(MapSideJoinMapper.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new MapSideJoinJob(), args));
    }
}
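Assuming the classes are packaged into a jar (the jar name and paths below are hypothetical), the job takes the large table's input directory, the small table's file, and the output directory, in that order:

hadoop jar join.jar MapSideJoinJob /data/big /data/small.txt /data/out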
2. Join in the reduce phase
This is the equivalent of Hive's reduce-side join (shuffle join). The join is completed in the reduce tasks.
Applicable scenario: joining two large tables.
Idea: in the map phase, partition records by hashing the join field; in the reduce phase, perform the actual join.
Implementation: in the map phase, tag each record with the table it came from, then emit the join field as the key and the remaining fields plus the tag as the value. By the reduce phase, the records are already grouped by join field; all that remains is to separate each group's records by source table and take their Cartesian product.
A custom class is needed for the map output value:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class CombineBean implements WritableComparable<CombineBean> {

    // Tag marking which input file the record came from
    private Text flag;
    // Join field
    private Text joinKey;
    // Remaining fields
    private Text others;

    public CombineBean() {
        this.flag = new Text();
        this.joinKey = new Text();
        this.others = new Text();
    }

    public Text getFlag() { return flag; }
    public void setFlag(Text flag) { this.flag = flag; }
    public Text getJoinKey() { return joinKey; }
    public void setJoinKey(Text joinKey) { this.joinKey = joinKey; }
    public Text getOthers() { return others; }
    public void setOthers(Text others) { this.others = others; }

    @Override
    public void write(DataOutput out) throws IOException {
        this.flag.write(out);
        this.joinKey.write(out);
        this.others.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.flag.readFields(in);
        this.joinKey.readFields(in);
        this.others.readFields(in);
    }

    @Override
    public int compareTo(CombineBean o) {
        return this.joinKey.compareTo(o.getJoinKey());
    }
}
Mapper class:
import java.io.IOException;
import java.lang.reflect.Method;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class ReduceSideJoinMapper extends Mapper<Object, Text, Text, CombineBean> {

    private CombineBean combineBean = new CombineBean();
    private Text flag = new Text();
    private Text joinKey = new Text();
    private Text others = new Text();

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        // MultipleInputs wraps the real split in a package-private TaggedInputSplit,
        // so the FileSplit has to be unwrapped via reflection
        InputSplit split = context.getInputSplit();
        Class<? extends InputSplit> splitClass = split.getClass();
        FileSplit fileSplit = null;
        if (splitClass.equals(FileSplit.class)) {
            fileSplit = (FileSplit) split;
        } else if (splitClass.getName().equals(
                "org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit")) {
            try {
                Method getInputSplitMethod = splitClass.getDeclaredMethod("getInputSplit");
                getInputSplitMethod.setAccessible(true);
                fileSplit = (FileSplit) getInputSplitMethod.invoke(split);
            } catch (Exception e) {
                throw new IOException(e);
            }
        }
        String pathName = fileSplit.getPath().toString();
        boolean parsed = false;
        // Record from path0
        if (pathName.endsWith("path0")) {
            String[] data = value.toString().split("\\|");
            if (data.length == 3) {
                flag.set("0");
                joinKey.set(data[1]);
                others.set(data[0] + "#" + data[2]);
                parsed = true;
            }
        // Record from path1
        } else if (pathName.endsWith("path1")) {
            String[] data = value.toString().split("\\|");
            if (data.length == 3) {
                flag.set("1");
                joinKey.set(data[0]);
                others.set(data[1] + "#" + data[2]);
                parsed = true;
            }
        }
        // Emit only records that were actually parsed; otherwise stale field
        // values left over from the previous record would be written
        if (parsed) {
            combineBean.setFlag(flag);
            combineBean.setJoinKey(joinKey);
            combineBean.setOthers(others);
            context.write(combineBean.getJoinKey(), combineBean);
        }
    }
}
Reducer class:
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class ReduceSideJoinReducer extends Reducer<Text, CombineBean, Text, Text> {

    private List<Text> leftTable = new ArrayList<Text>();
    private List<Text> rightTable = new ArrayList<Text>();
    private Text resultVal = new Text();

    @Override
    protected void reduce(Text key, Iterable<CombineBean> beans, Context context)
            throws IOException, InterruptedException {
        leftTable.clear();
        rightTable.clear();
        for (CombineBean bean : beans) {
            String flag = bean.getFlag().toString();
            // The framework reuses the same CombineBean instance while iterating,
            // so the Text values must be copied before being stored
            if ("0".equals(flag.trim())) {
                leftTable.add(new Text(bean.getOthers()));
            }
            if ("1".equals(flag.trim())) {
                rightTable.add(new Text(bean.getOthers()));
            }
        }
        // Cartesian product of the two sides of the group
        for (Text leftData : leftTable) {
            for (Text rightData : rightTable) {
                resultVal.set(leftData + "#" + rightData);
                context.write(key, resultVal);
            }
        }
    }
}
Driver class:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ReduceSideJoinJob extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "ReduceSideJoin");
        job.setJarByClass(ReduceSideJoinJob.class);
        job.setMapperClass(ReduceSideJoinMapper.class);
        job.setReducerClass(ReduceSideJoinReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CombineBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Both inputs are plain "|"-delimited text, so both use TextInputFormat;
        // MultipleInputs still wraps each split in a TaggedInputSplit
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new ReduceSideJoinJob(), args));
    }
}
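Because the mapper tells the two sources apart by the suffix of the input path (endsWith("path0") / endsWith("path1")), the two input directories must end with those names; with hypothetical paths:

hadoop jar join.jar ReduceSideJoinJob /data/path0 /data/path1 /data/out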
3. Semi join
This join is essentially a combination of the previous two.
Applicable scenario: joining a small table with a large table.
Idea: when submitting the job, put only the small table's join-field values into the DistributedCache for distribution to the nodes; this shrinks the data held in memory and guards against OOM. In the map phase, check whether each large-table record's join field appears in the in-memory key set and filter the record out if it does not, which reduces the amount of data shuffled over the network. From there the steps are the same as the reduce-side join: tag records in the map phase, join them in the reduce phase.
Implementation: a combination of the two code samples above, so the full code is not repeated here; a sketch of the filtering mapper follows.
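As a minimal sketch of the part that differs, here is what the large table's mapper could look like. The class name SemiJoinMapper and the one-join-key-per-line cache file are my assumptions, not from the original; the mapper reuses the CombineBean from above, assumes the small table's join keys were added via job.addCacheFile, and its output feeds the same ReduceSideJoinReducer.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical mapper for the large table: keeps only records whose join key
// appears in the small table, then tags them like ReduceSideJoinMapper does
public class SemiJoinMapper extends Mapper<Object, Text, Text, CombineBean> {

    // Only the small table's join keys are cached, not its full records
    private Set<String> joinKeys = new HashSet<String>();
    private CombineBean combineBean = new CombineBean();
    private Text flag = new Text();
    private Text joinKey = new Text();
    private Text others = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        URI[] uris = context.getCacheFiles();
        if (uris != null) {
            for (URI uri : uris) {
                String filename = uri.toString().substring(uri.toString().lastIndexOf("/") + 1);
                BufferedReader br = new BufferedReader(new FileReader(filename));
                try {
                    String line = null;
                    // The cached file is assumed to hold one join key per line
                    while ((line = br.readLine()) != null) {
                        joinKeys.add(line.trim());
                    }
                } finally {
                    br.close();
                }
            }
        }
    }

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] data = value.toString().split("\\|");
        // Filter: drop large-table records whose key is absent from the small
        // table, so they are never shuffled across the network
        if (data.length == 3 && joinKeys.contains(data[0])) {
            flag.set("1");
            joinKey.set(data[0]);
            others.set(data[1] + "#" + data[2]);
            combineBean.setFlag(flag);
            combineBean.setJoinKey(joinKey);
            combineBean.setOthers(others);
            context.write(combineBean.getJoinKey(), combineBean);
        }
    }
}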