`
minglaihan
  • 浏览: 15916 次
  • 性别: Icon_minigender_1
  • 来自: 天津
文章分类
社区版块
存档分类
最新评论

hadoop实现单表和多表关联

 
阅读更多

转载请注明:http://hanlaiming.freetzi.com/?p=123

在mapreduce上编写简单应用后,开始学习稍微高级一点的单表关联和多表关联。

在学习过程中我参考了这篇文章,谢谢http://www.cnblogs.com/xia520pi/archive/2012/06/04/2534533.html,里面很多基本的内容很实用。

一、单表关联。

实例中给出child-parent(孩子——父母)表,要求输出grandchild-grandparent(孙子——爷奶)表。

样例输入如下所示。

  • file:child parent
  • Tom Lucy
  • Tom Jack
  • Jone Lucy
  • Jone Jack
  • Lucy Mary
  • Lucy Ben
  • Jack Alice
  • Jack Jesse
  • Terry Alice
  • Terry Jesse
  • Philip Terry
  • Philip Alma
  • Mark Terry
  • Mark Alma
  • 家族树状关系谱:

image

家族谱

样例输出如下所示。

  • file:grandchild grandparent
  • Tom   Alice
  • Tom   Jesse
  • Jone   Alice
  • Jone   Jesse
  • Tom   Mary
  • Tom   Ben
  • Jone   Mary
  • Jone   Ben
  • Philip   Alice
  • Philip   Jesse
  • Mark   Alice
  • Mark   Jesse

设计思路

分析这个实例,显然需要进行单表连接,连接的是左表parent列和右表child列,且左表右表同一个表

  连接结果除去连接的两列就是所需要的结果——"grandchild--grandparent"表。要用MapReduce解决这个实例,首先应该考虑如何实现自连接其次就是连接列设置最后结果整理

考虑到MapReduce的shuffle过程会将相同的key会连接在一起,所以可以将map结果的key设置成待连接,然后列中相同的值就自然会连接在一起了。再与最开始的分析联系起来:

  要连接的是左表的parent列和右表的child列,且左表和右表是同一个表,所以在map阶段读入数据分割childparent之后,会将parent设置成keychild设置成value进行输出,并作为左表;再将同一对childparent中的child设置成keyparent设置成value进行输出,作为右表。为了区分输出中的左右表,需要在输出的value加上左右表信息,比如在value的String最开始处加上字符1表示左表,加上字符2表示右表。这样在map的结果中就形成了左表和右表,然后在shuffle过程中完成连接。reduce接收到连接的结果,其中每个key的value-list就包含了"grandchild--grandparent"关系。取出每个key的value-list进行解析,将左表中的child放入一个数组右表中的parent放入一个数组,然后对两个数组求笛卡尔积就是最后的结果了。

代码实现:

  • import java.io.IOException;
  • import java.util.*;
  • import org.apache.hadoop.conf.Configuration;
  • import org.apache.hadoop.fs.Path;
  • import org.apache.hadoop.io.IntWritable;
  • import org.apache.hadoop.io.Text;
  • import org.apache.hadoop.mapreduce.Job;
  • import org.apache.hadoop.mapreduce.Mapper;
  • import org.apache.hadoop.mapreduce.Reducer;
  • import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  • import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  • import org.apache.hadoop.util.GenericOptionsParser;
  • public class STjoin {
  • public static int time = 0;
  • /*
  • * map将输出分割child和parent,然后正序输出一次作为右表,
  • * 反序输出一次作为左表,需要注意的是在输出的value中必须
  • * 加上左右表的区别标识。
  • */
  • public static class Map extends Mapper<Object, Text, Text, Text> {
  • // 实现map函数
  • public void map(Object key, Text value, Context context)
  • throws IOException, InterruptedException {
  • String childname = new String();// 孩子名称
  • String parentname = new String();// 父母名称
  • String relationtype = new String();// 左右表标识
  • // 输入的一行预处理文本
  • StringTokenizer itr=new StringTokenizer(value.toString());
  • String[] values=new String[2];
  • int i=0;
  • while(itr.hasMoreTokens()){
  • values[i]=itr.nextToken();
  • i++;
  • }
  • if (values[0].compareTo("child") != 0) {
  • childname = values[0];
  • parentname = values[1];
  • // 输出左表
  • relationtype = "1";
  • context.write(new Text(values[1]), new Text(relationtype +
  • "+"+ childname + "+" + parentname));
  • // 输出右表
  • relationtype = "2";
  • context.write(new Text(values[0]), new Text(relationtype +
  • "+"+ childname + "+" + parentname));
  • }
  • }
  • }
  • public static class Reduce extends Reducer<Text, Text, Text, Text> {
  • // 实现reduce函数
  • public void reduce(Text key, Iterable<Text> values, Context context)
  • throws IOException, InterruptedException {
  • // 输出表头
  • if (0 == time) {
  • context.write(new Text("grandchild"), new Text("grandparent"));
  • time++;
  • }
  • int grandchildnum = 0;
  • String[] grandchild = new String[10];
  • int grandparentnum = 0;
  • String[] grandparent = new String[10];
  • Iterator ite = values.iterator();
  • while (ite.hasNext()) {
  • String record = ite.next().toString();
  • int len = record.length();
  • int i = 2;
  • if (0 == len) {
  • continue;
  • }
  • // 取得左右表标识
  • char relationtype = record.charAt(0);
  • // 定义孩子和父母变量
  • String childname = new String();
  • String parentname = new String();
  • // 获取value-list中value的child
  • while (record.charAt(i) != '+') {
  • childname += record.charAt(i);
  • i++;
  • }
  • i = i + 1;
  • // 获取value-list中value的parent
  • while (i < len) {
  • parentname += record.charAt(i);
  • i++;
  • }
  • // 左表,取出child放入grandchildren
  • if ('1' == relationtype) {
  • grandchild[grandchildnum] = childname;
  • grandchildnum++;
  • }
  • // 右表,取出parent放入grandparent
  • if ('2' == relationtype) {
  • grandparent[grandparentnum] = parentname;
  • grandparentnum++;
  • }
  • }
  • // grandchild和grandparent数组求笛卡尔儿积
  • if (0 != grandchildnum && 0 != grandparentnum) {
  • for (int m = 0; m < grandchildnum; m++) {
  • for (int n = 0; n < grandparentnum; n++) {
  • // 输出结果
  • context.write(new Text(grandchild[m]), new Text(grandparent[n]));
  • }
  • }
  • }
  • }
  • }
  • public static void main(String[] args) throws Exception {
  • Configuration conf = new Configuration();
  • String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
  • if (otherArgs.length != 2) {
  • System.err.println("Usage: Single Table Join <in> <out>");
  • System.exit(2);
  • }
  • Job job = new Job(conf, "Single Table Join");
  • job.setJarByClass(STjoin.class);
  • // 设置Map和Reduce处理类
  • job.setMapperClass(Map.class);
  • job.setReducerClass(Reduce.class);
  • // 设置输出类型
  • job.setOutputKeyClass(Text.class);
  • job.setOutputValueClass(Text.class);
  • // 设置输入和输出目录
  • FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  • FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  • System.exit(job.waitForCompletion(true) ? 0 : 1);
  • }
  • }


二、多表关联

输入是两个文件,一个代表工厂表,包含工厂名列和地址编号列;另一个代表地址表,包含地址名列和地址编号列。要求从输入数据中找出工厂名和地址名的对应关系,输出"工厂名——地址名"表。

样例输入如下所示。

  • 1)factory:
  • factoryname     addressed
  • Beijing Red Star     1
  • Shenzhen Thunder     3
  • Guangzhou Honda     2
  • Beijing Rising     1
  • Guangzhou Development Bank 2
  • Tencent         3
  • Back of Beijing      1
  • 2)address:
  • addressID addressname
  • 1     Beijing
  • 2     Guangzhou
  • 3     Shenzhen
  • 4     Xian
  • 样例输出如下所示。
  • factoryname     addressname
  • Back of Beijing      Beijing
  • Beijing Red Star     Beijing
  • Beijing Rising       Beijing
  • Guangzhou Development Bank Guangzhou
  • Guangzhou Honda     Guangzhou
  • Shenzhen Thunder     Shenzhen
  • Tencent         Shenzhen

多表关联和单表关联相似,都类似于数据库中的自然连接。相比单表关联,多表关联的左右表和连接列更加清楚。所以可以采用和单表关联的相同的处理方式,map识别出输入的行属于哪个表之后,对其进行分割,将连接的列值保存在key中,另一列和左右表标识保存在value中,然后输出。reduce拿到连接结果之后,解析value内容,根据标志将左右表内容分开存放,然后求笛卡尔积,最后直接输出。

代码实现:

  • import java.io.IOException;
  • import java.util.*;
  • import org.apache.hadoop.conf.Configuration;
  • import org.apache.hadoop.fs.Path;
  • import org.apache.hadoop.io.IntWritable;
  • import org.apache.hadoop.io.Text;
  • import org.apache.hadoop.mapreduce.Job;
  • import org.apache.hadoop.mapreduce.Mapper;
  • import org.apache.hadoop.mapreduce.Reducer;
  • import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  • import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  • import org.apache.hadoop.util.GenericOptionsParser;
  • public class MTjoin {
  • public static int time = 0;
  • /*
  • * 在map中先区分输入行属于左表还是右表,然后对两列值进行分割,
  • * 保存连接列在key值,剩余列和左右表标志在value中,最后输出
  • */
  • public static class Map extends Mapper<Object, Text, Text, Text> {
  • // 实现map函数
  • public void map(Object key, Text value, Context context)
  • throws IOException, InterruptedException {
  • String line = value.toString();// 每行文件
  • String relationtype = new String();// 左右表标识
  • // 输入文件首行,不处理
  • if (line.contains("factoryname") == true
  • || line.contains("addressed") == true) {
  • return;
  • }
  • // 输入的一行预处理文本
  • StringTokenizer itr = new StringTokenizer(line);
  • String mapkey = new String();
  • String mapvalue = new String();
  • int i = 0;
  • while (itr.hasMoreTokens()) {
  • // 先读取一个单词
  • String token = itr.nextToken();
  • // 判断该地址ID就把存到"values[0]"
  • if (token.charAt(0) >= '0' && token.charAt(0) <= '9') {
  • mapkey = token;
  • if (i > 0) {
  • relationtype = "1";
  • } else {
  • relationtype = "2";
  • }
  • continue;
  • }
  • // 存工厂名
  • mapvalue += token + " ";
  • i++;
  • }
  • // 输出左右表
  • context.write(new Text(mapkey), new Text(relationtype + "+"+ mapvalue));
  • }
  • }
  • /*
  • * reduce解析map输出,将value中数据按照左右表分别保存,
  •   * 然后求出笛卡尔积,并输出。
  • */
  • public static class Reduce extends Reducer<Text, Text, Text, Text> {
  • // 实现reduce函数
  • public void reduce(Text key, Iterable<Text> values, Context context)
  • throws IOException, InterruptedException {
  • // 输出表头
  • if (0 == time) {
  • context.write(new Text("factoryname"), new Text("addressname"));
  • time++;
  • }
  • int factorynum = 0;
  • String[] factory = new String[10];
  • int addressnum = 0;
  • String[] address = new String[10];
  • Iterator ite = values.iterator();
  • while (ite.hasNext()) {
  • String record = ite.next().toString();
  • int len = record.length();
  • int i = 2;
  • if (0 == len) {
  • continue;
  • }
  • // 取得左右表标识
  • char relationtype = record.charAt(0);
  • // 左表
  • if ('1' == relationtype) {
  • factory[factorynum] = record.substring(i);
  • factorynum++;
  • }
  • // 右表
  • if ('2' == relationtype) {
  • address[addressnum] = record.substring(i);
  • addressnum++;
  • }
  • }
  • // 求笛卡尔积
  • if (0 != factorynum && 0 != addressnum) {
  • for (int m = 0; m < factorynum; m++) {
  • for (int n = 0; n < addressnum; n++) {
  • // 输出结果
  • context.write(new Text(factory[m]),
  • new Text(address[n]));
  • }
  • }
  • }
  • }
  • }

  • public static void main(String[] args) throws Exception {
  • Configuration conf = new Configuration();
  • String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  • if (otherArgs.length != 2) {
  • System.err.println("Usage: Multiple Table Join <in> <out>");
  • System.exit(2);
  • }
  • Job job = new Job(conf, "Multiple Table Join");
  • job.setJarByClass(MTjoin.class);
  • // 设置Map和Reduce处理类
  • job.setMapperClass(Map.class);
  • job.setReducerClass(Reduce.class);
  • // 设置输出类型
  • job.setOutputKeyClass(Text.class);
  • job.setOutputValueClass(Text.class);
  • // 设置输入和输出目录
  • FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  • FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  • System.exit(job.waitForCompletion(true) ? 0 : 1);
  • }
  • }
分享到:
评论

相关推荐

    Spark和Hadoop的集成

    Hadoop的框架最核心的设计就是:HDFS和MapReduce。HDFS为海量的数据提供了存储,则MapReduce为海量的数据提供了计算。Storm是一个分布式的、容错的实时计算系统。两者整合,优势互补。

    基于Hadoop的FP-Growth关联规则并行改进算法

    大数据环境下,传统的串行FP-Growth...在Hadoop平台下,使用负载均衡和数据分割相结合的方式对原始事务数据集分片实现并行化。实验证明,基于Hadoop的负载均衡数据分割FP-Growth并行算法在处理数据量和效率上有所提高。

    Hadoop开发者第一期入门专刊

    目录:1 Hadoop 介绍 2 Hadoop 在国内应用情况 3 Hadoop 源代码eclipse 编译教程 7 在Windows 上安装Hadoop 教程 13 在Linux 上安装Hadoop...59 表关联在MapReduce 上的实现 63 Hadoop 计算平台和Hadoop 数据仓库的区别

    hadoop 2.2.2 已编译源码

    java 关联hadoop源码 查看底层实现,mapReduce实现 HDFS实现

    SEARUM:Hadoop MapReduce 关联规则挖掘实现

    海尔姆关联规则挖掘技术的 Hadoop MapReduce 实现。描述并行 FP-Growth 和关联规则挖掘 MapReduce 实现。 它运行 PFPGrowth 的每个阶段,如论文中所述,如论文中所述,针对 SEARUM 进行修改并与 SEARUM 集成 。 注意...

    hadoop-apriori:使用 Hadoop 实现 Apriori 算法

    Hadoop先验使用 Hadoop 的蛮力 Apriori 算法实现。 该算法不继续建立关联规则。用法家庭输入输出 minsup 最大数量 hadoop jar HadoopApriori.jar ...

    java使用hadoop实现关联商品统计

    本篇文章java使用hadoop实现关联商品统计,可以实现商品的关联统计,具有一定的参考价值,感兴趣的小伙伴们可以参考一下。

    基于hadoop实现的关联规则挖掘的图书数据分析推荐系统.zip

    资源包含文件:课程报告word+源码及数据库sql文件 详细介绍参考:https://biyezuopin.blog.csdn.net/article/details/125047261

    Hadoop关联数据挖掘apriori的研究与实现

    论文都是在扯淡,所有的思想都是一句话的事情。

    Hadoop实战丛书

    第4-7章深入地讲解了mapreduce计算模型、mapreduce应用的开发方法、mapreduce的工作机制,同时还列出了多个mapreduce的应用案例,涉及单词计数、数据去重、排序、单表关联和多表关联等内容;第8-11章全面地阐述了...

    hadoop倒排索引实现 完整代码+报告

    Map和 Reduce的设计思路(含 Map、Reduce阶段的 K、V类型) 基本要求与排序 因为两者代码具有关联性,故放在一起说。 首先在基本要求中,Map 我们对于输入的文件每句进行切割,将单词与文件名作为(text)key,...

    基于 hadoop实现的金庸江湖人物关系网分析+源代码+文档说明

    人物关系使用邻接表的形式表示,人物是顶点,人物之间关系是边,两个人的关系的密切程度由共现次数体现,共现次数越高,边权重越高。另外需要对共现次数进行归一化处理,确保某个顶点的出边权重和为1。 4.数据分析...

    基于Java(hadoop)实现的图书推荐系统【100011261】

    大学SDU大数据BigData课程设计,基于hadoop实现的图书推荐系统。 基于 Apriori 关联规则挖掘算法进行图书推荐的应用算法设计和实现,将利用大量图书评论数据,使用 MapReduce 并行化处理技术来完成图书的 k-频繁项集...

    论文研究-基于Hadoop的多特征协同过滤算法研究.pdf

    协同过滤是互联网推荐系统的核心技术,针对协同过滤推荐算法中推荐精度和推荐效率以及数据可扩展性问题,采用灰色关联相似度,设计和实现了一种基于Hadoop的多特征协同过滤推荐算法,使用贝叶斯概率对用户特征属性...

    基于Hadoop集群的多表并行关联算法及应用

    针对因特网环境下并行数据库实现多个大数据表关联存在的计算瓶颈,基于 Hadoop集群设计了一个并行关联多个大数据表的简便算法MR_Join。以商业网站凡客诚品的销售数据为例进行实验,验证算法的可行性并做出应用实例。...

    基于Hadoop的公共建筑能耗数据挖掘方法研究.docx

    - 数据分析师:可以借助数据挖掘算法和Hadoop平台,对大规模能耗数据进行分析,发现其中隐藏的规律和关联,提高数据分析效率和准确性。 使用场景及目标: - 公共建筑能耗管理:通过分析能耗数据,揭示建筑的能耗...

    Hadoop datajoin示例(客户和订单信息)

    文件汗有三个java类,两个测试文件txt ReduceClass.java MapClass.java TaggedRecordWritable.java customers.txt ...经过亲自测试,可以将两个文件中的信息以groupby的key关联起来,查出类似数据库的join.

    Hadoop权威指南(第2版).

    Hadoop实现了HDFS文件系统和MapRecue。用户只要继承MapReduceBase,提供分别实现Map和Reduce的两个类,并注册Job即可自动分布式运行。 目前Release版本是0.20.203.0。还不成熟,但是已经集群规模已经可以达到4000个...

    论文研究-基于Hadoop的FP-Growth关联规则并行改进算法.pdf

    大数据环境下,传统的串行FP-Growth...在Hadoop平台下,使用负载均衡和数据分割相结合的方式对原始事务数据集分片实现并行化。实验证明,基于Hadoop的负载均衡数据分割FP-Growth并行算法在处理数据量和效率上有所提高。

    Hadoop海量网络数据处理平台的关键技术

    实验结果表明,该分布式节点监控框架,能够实现采集节点故障检测的快速处理和节点负载的动态均衡,保障移动互联网流量数据采集的可靠性和完整性。 3.提出了一种异构环境下的高效数据存储机制针对当前基于Hadoop的海量...

Global site tag (gtag.js) - Google Analytics