Hadoop Learning Notes 3: Join Operations in Hadoop

Anyone who has used Hive knows that Hive offers join operations similar to those in an RDBMS. Since Hive is implemented on top of MapReduce, joins can also be written directly in MapReduce. Below I explore how to implement joins in MapReduce.

1. Join in the map phase

This corresponds to Hive's map-side join (broadcast join). The join is completed entirely inside the map tasks; no reduce tasks are needed.

Applicable scenario: joining one small table (small enough to fit in memory) with one large table.

Idea: replicate the small table to every node and load it into memory; split the large table into input splits and join each split against the in-memory small table.

Implementation: when submitting the job, distribute the small table to all nodes; each node loads it into memory before its map tasks start. The map tasks then scan the large table, look up each record against the small table, and directly output the joined records.
The Mapper:

public class MapSideJoinMapper extends Mapper<Object, Text, Text, Text> {
    // In-memory cache of the small table, keyed by the join field
    private Map<String, String> cache = new HashMap<String, String>();
    private Text resultKey = new Text();
    private Text resultVal = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Fetch the files distributed through the distributed cache
        URI[] uris = context.getCacheFiles();
        if (uris != null) {
            for (URI uri : uris) {
                String filename = uri.toString().substring(uri.toString().lastIndexOf("/") + 1);
                BufferedReader br = new BufferedReader(new FileReader(filename));
                try {
                    String line = null;
                    // Load the small table into memory
                    while ((line = br.readLine()) != null) {
                        String[] data = line.split("\\|");
                        if (data.length == 3) {
                            String joinKey = data[1];
                            String others = data[0] + "#" + data[2];
                            cache.put(joinKey, others);
                        }
                    }
                } finally {
                    br.close();
                }
            }
        }
    }

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // Skip empty lines
        if (value == null || value.toString().equals("")) {
            return;
        }
        String[] data = value.toString().split("\\|");
        if (data.length == 3) {
            String joinKey = data[0];
            // Probe the cached small table with the large table's join key
            if (cache.containsKey(joinKey)) {
                String others = data[1] + "#" + data[2] + "#" + cache.get(joinKey);
                resultKey.set(joinKey);
                resultVal.set(others);
                context.write(resultKey, resultVal);
            }
        }
    }
}

The driver:

public class MapSideJoinJob extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "MapSideJoin");
        // Distribute the small table file to every node
        job.addCacheFile(new URI(args[1]));
        // Map-only job: set the number of reduce tasks to 0
        job.setNumReduceTasks(0);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        job.setJarByClass(MapSideJoinJob.class);
        job.setMapperClass(MapSideJoinMapper.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }
}
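
The driver implements Tool but no entry point is shown. A minimal main method, assuming the usual ToolRunner pattern (it needs org.apache.hadoop.util.ToolRunner and is not part of the original code), could be added to MapSideJoinJob like this:

    public static void main(String[] args) throws Exception {
        // ToolRunner handles generic Hadoop options (-D, -files, ...) before calling run()
        int exitCode = ToolRunner.run(new Configuration(), new MapSideJoinJob(), args);
        System.exit(exitCode);
    }

The reduce-side join driver further below can be launched with the same pattern.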

2. Join in the reduce phase

This corresponds to Hive's reduce-side join (shuffle join). The join is completed in the reduce tasks.

Applicable scenario: joining two large tables.

Idea: in the map phase, partition the records by hashing the join field; in the reduce phase, perform the actual join.

Implementation: in the map phase, tag each record with the table it comes from, then emit the join field as the key and the remaining fields plus the tag as the value. By the time the reduce phase runs, the records are already grouped by the join key, so each reduce call only has to separate the records of the two tables within its group and take their Cartesian product.

A custom class is needed for the map output value:

public class CombineBean implements WritableComparable<CombineBean> {
	// Flag marking which input file the record comes from
	private Text flag;
	// The join field
	private Text joinKey;
	// The remaining fields
	private Text others;

	public CombineBean() {
		this.flag = new Text();
		this.joinKey = new Text();
		this.others = new Text();
	}
	public Text getFlag() {
		return flag;
	}
	public void setFlag(Text flag) {
		this.flag = flag;
	}
	public Text getJoinKey() {
		return joinKey;
	}
	public void setJoinKey(Text joinKey) {
		this.joinKey = joinKey;
	}
	public Text getOthers() {
		return others;
	}

	public void setOthers(Text others) {
		this.others = others;
	}
	@Override
	public void write(DataOutput out) throws IOException {
		this.flag.write(out);
		this.joinKey.write(out);
		this.others.write(out);
	}
	@Override
	public void readFields(DataInput in) throws IOException {
		this.flag.readFields(in);
		this.joinKey.readFields(in);
		this.others.readFields(in);
	}
	@Override
	public int compareTo(CombineBean o) {
		return this.joinKey.compareTo(o.getJoinKey());
	}
}

The Mapper:

public class ReduceSideJoinMapper extends Mapper<Object, Text, Text, CombineBean> {

    private CombineBean combineBean = new CombineBean();
    private Text flag = new Text();
    private Text joinKey = new Text();
    private Text others = new Text();

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // MultipleInputs wraps each split in a TaggedInputSplit, so the underlying FileSplit
        // has to be unwrapped via reflection to find out which input path the record came from
        InputSplit split = context.getInputSplit();
        Class<? extends InputSplit> splitClass = split.getClass();
        FileSplit fileSplit = null;
        if (splitClass.equals(FileSplit.class)) {
            fileSplit = (FileSplit) split;
        } else if (splitClass.getName().equals(
                "org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit")) {
            try {
                Method getInputSplitMethod = splitClass.getDeclaredMethod("getInputSplit");
                getInputSplitMethod.setAccessible(true);
                fileSplit = (FileSplit) getInputSplitMethod.invoke(split);
            } catch (Exception e) {
                throw new IOException(e);
            }
        }
        String pathName = fileSplit.getPath().toString();
        String[] data = value.toString().split("\\|");
        if (data.length != 3) {
            return;
        }
        if (pathName.endsWith("path0")) {
            // Record from input path 0
            flag.set("0");
            joinKey.set(data[1]);
            others.set(data[0] + "#" + data[2]);
        } else if (pathName.endsWith("path1")) {
            // Record from input path 1
            flag.set("1");
            joinKey.set(data[0]);
            others.set(data[1] + "#" + data[2]);
        } else {
            // Ignore records from any other path instead of emitting stale values
            return;
        }
        combineBean.setFlag(flag);
        combineBean.setJoinKey(joinKey);
        combineBean.setOthers(others);
        context.write(combineBean.getJoinKey(), combineBean);
    }
}

The Reducer:

public class ReduceSideJoinReducer extends Reducer<Text, CombineBean, Text, Text> {

	private List<Text> leftTable = new ArrayList<Text>();
	private List<Text> rightTable = new ArrayList<Text>();
	private Text resultVal = new Text();

	@Override
	protected void reduce(Text key, Iterable<CombineBean> beans, Context context) throws IOException, InterruptedException {
		leftTable.clear();
		rightTable.clear();

		for (CombineBean bean : beans) {
			String flag = bean.getFlag().toString();
			// Hadoop reuses the same value object across the iteration, so the Text must be copied before buffering it
			if ("0".equals(flag.trim())) {
				leftTable.add(new Text(bean.getOthers()));
			}
			if ("1".equals(flag.trim())) {
				rightTable.add(new Text(bean.getOthers()));
			}
		}

		for (Text leftData : leftTable) {
			for (Text rightData : rightTable) {
				resultVal.set(leftData + "#" + rightData);
				context.write(key, resultVal);
			}
		}
	}

}

The driver:

public class ReduceSideJoinJob extends Configured implements Tool {

	@Override
	public int run(String[] args) throws Exception {
		Configuration conf = getConf();

		Job job = Job.getInstance(conf, "ReduceSideJoin");

        job.setJarByClass(ReduceSideJoinJob.class);
        job.setMapperClass(ReduceSideJoinMapper.class);
        job.setReducerClass(ReduceSideJoinReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CombineBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Both inputs are plain text, so register both paths with TextInputFormat
        MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class);
        MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

		return job.waitForCompletion(true) ? 0 : 1;
	}

}

3. Semi join

This approach is essentially a combination of the previous two.

Applicable scenario: joining a small table with a large table, where even the small table may be too large to cache entirely in memory.

Idea: when submitting the job, put only the join-key column of the small table into the DistributedCache and distribute it to the nodes; this keeps the in-memory data small and avoids OOM. In the map phase, check whether the join key of each large-table record is present in the in-memory set of small-table join keys, and filter the record out if it is not, which reduces the amount of data shuffled across the network. The remaining steps are the same as the reduce-side join: tag the records in the map phase and join them in the reduce phase.

Implementation: a combination of the code of the two approaches above, so it is not repeated in full; a brief sketch of the map-side filtering step is shown below.
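
A minimal sketch of that filtering Mapper, assuming the cached file holds one join key per line and the large-table records have the same layout as in the reduce-side example (the class name SemiJoinFilterMapper and the cached-file format are illustrative, not from the original code):

public class SemiJoinFilterMapper extends Mapper<Object, Text, Text, CombineBean> {
    // Join keys of the small table, loaded from the distributed cache
    private Set<String> joinKeys = new HashSet<String>();
    private CombineBean combineBean = new CombineBean();
    private Text flag = new Text();
    private Text joinKey = new Text();
    private Text others = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        URI[] uris = context.getCacheFiles();
        if (uris != null) {
            for (URI uri : uris) {
                String filename = uri.toString().substring(uri.toString().lastIndexOf("/") + 1);
                BufferedReader br = new BufferedReader(new FileReader(filename));
                try {
                    String line = null;
                    // The cached file is assumed to hold one join key per line
                    while ((line = br.readLine()) != null) {
                        joinKeys.add(line.trim());
                    }
                } finally {
                    br.close();
                }
            }
        }
    }

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String[] data = value.toString().split("\\|");
        if (data.length != 3) {
            return;
        }
        // Drop large-table records whose join key does not appear in the small table,
        // so they never reach the shuffle
        if (!joinKeys.contains(data[0])) {
            return;
        }
        // The surviving records are tagged exactly as in the reduce-side join
        flag.set("1");
        joinKey.set(data[0]);
        others.set(data[1] + "#" + data[2]);
        combineBean.setFlag(flag);
        combineBean.setJoinKey(joinKey);
        combineBean.setOthers(others);
        context.write(combineBean.getJoinKey(), combineBean);
    }
}

The small table itself would still be read by a second mapper that tags its records with "0", exactly as in the reduce-side example, and the Reducer stays unchanged.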
