最近几天一直在看Hadoop相关的书籍,目前稍微有点感觉,自己就仿照着WordCount程序自己编写了一个统计关联商品。
需求描述:
根据超市的销售清单,计算商品之间的关联程度(即统计同时买A商品和B商品的次数)。
数据格式:
超市销售清单简化为如下格式:一行表示一个清单,每个商品采用 "," 分割,如下图所示:
需求分析:
采用hadoop中的mapreduce对该需求进行计算。
map函数主要拆分出关联的商品,输出结果为 key为商品A,value为商品B,对于第一条三条结果拆分结果如下图所示:
这里为了统计出和A、B两件商品想关联的商品,所以商品A、B之间的关系输出两条结果即 A-B、B-A。
reduce函数分别对和商品A相关的商品进行分组统计,即分别求value中的各个商品出现的次数,输出结果为key为商品A|商品B,value为该组合出现的次数。针对上面提到的5条记录,对map输出中key值为R的做下分析:
通过map函数的处理,得到如下图所示的记录:
reduce中对map输出的value值进行分组计数,得到的结果如下图所示
将商品A B作为key,组合个数作为value输出,输出结果如下图所示:
对于需求的实现过程的分析到目前就结束了,下面就看下具体的代码实现
代码实现:
关于代码就不做详细的介绍,具体参照代码之中的注释吧。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
|
package com; import java.io.IOException; import java.util.HashMap; import java.util.Map.Entry; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class Test extends Configured implements Tool{ /** * map类,实现数据的预处理 * 输出结果key为商品A value为关联商品B * @author lulei */ public static class MapT extends Mapper<LongWritable, Text, Text, Text> { public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException{ String line = value.toString(); if (!(line == null || "" .equals(line))) { //分割商品 String []vs = line.split( "," ); //两两组合,构成一条记录 for ( int i = 0 ; i < (vs.length - 1 ); i++) { if ( "" .equals(vs[i])) { //排除空记录 continue ; } for ( int j = i+ 1 ; j < vs.length; j++) { if ( "" .equals(vs[j])) { continue ; } //输出结果 context.write( new Text(vs[i]), new Text(vs[j])); context.write( new Text(vs[j]), new Text(vs[i])); } } } } } /** * reduce类,实现数据的计数 * 输出结果key 为商品A|B value为该关联次数 * @author lulei */ public static class ReduceT extends Reducer<Text, Text, Text, IntWritable> { private int count; /** * 初始化 */ public void setup(Context context) { //从参数中获取最小记录个数 String countStr = context.getConfiguration().get( "count" ); try { this .count = Integer.parseInt(countStr); } catch (Exception e) { this .count = 0 ; } } public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException{ String keyStr = key.toString(); HashMap<String, Integer> hashMap = new HashMap<String, Integer>(); //利用hash统计B商品的次数 for (Text value : values) { String valueStr = value.toString(); if (hashMap.containsKey(valueStr)) { hashMap.put(valueStr, hashMap.get(valueStr) + 1 ); } else { hashMap.put(valueStr, 1 ); } } //将结果输出 for (Entry<String, Integer> entry : hashMap.entrySet()) { if (entry.getValue() >= this .count) { //只输出次数不小于最小值的 context.write( new Text(keyStr + "|" + entry.getKey()), new IntWritable(entry.getValue())); } } } } @Override public int run(String[] arg0) throws Exception { // TODO Auto-generated method stub Configuration conf = getConf(); conf.set( "count" , arg0[ 2 ]); Job job = new Job(conf); job.setJobName( "jobtest" ); job.setOutputFormatClass(TextOutputFormat. class ); job.setOutputKeyClass(Text. class ); job.setOutputValueClass(Text. class ); job.setMapperClass(MapT. class ); job.setReducerClass(ReduceT. class ); FileInputFormat.addInputPath(job, new Path(arg0[ 0 ])); FileOutputFormat.setOutputPath(job, new Path(arg0[ 1 ])); job.waitForCompletion( true ); return job.isSuccessful() ? 0 : 1 ; } /** * @param args */ public static void main(String[] args) { // TODO Auto-generated method stub if (args.length != 3 ) { System.exit(- 1 ); } try { int res = ToolRunner.run( new Configuration(), new Test(), args); System.exit(res); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } } |
上传运行:
将程序打包成jar文件,上传到机群之中。将测试数据也上传到HDFS分布式文件系统中。
命令运行截图如下图所示:
运行结束后查看相应的HDFS文件系统,如下图所示:
到此一个完整的mapreduce程序就完成了,关于hadoop的学习,自己还将继续~感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!