Java 中自定义 OutputFormat 的实例详解
实例代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
|
package com.ccse.hadoop.outputformat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;

/**
 * Word-count job that demonstrates a hand-written {@link OutputFormat}.
 *
 * <p>The job tokenizes every input line, sums the occurrences of each token and
 * writes the result through {@link MyselfOutputFormat} to the single file
 * {@code OUTPUT_PATH + OUTPUT_FILENAME}, one {@code key<TAB>count} record per line.
 */
public class MySelfOutputFormatApp {

    /** Directory the job reads its input from. */
    public final static String INPUT_PATH = "hdfs://chaoren1:9000/mapinput";

    /** Directory the job writes its output to; it is deleted before every run. */
    public final static String OUTPUT_PATH = "hdfs://chaoren1:9000/mapoutput";

    /** Name (relative to {@link #OUTPUT_PATH}) of the file the custom writer creates. */
    public final static String OUTPUT_FILENAME = "/abc";

    /**
     * Configures and runs the job, blocking until it finishes.
     *
     * @param args ignored; all paths come from the constants of this class
     * @throws IOException if the file system or the job submission fails
     * @throws URISyntaxException if {@link #OUTPUT_PATH} is not a valid URI
     * @throws ClassNotFoundException propagated from {@code Job.waitForCompletion}
     * @throws InterruptedException propagated from {@code Job.waitForCompletion}
     */
    public static void main(String[] args)
            throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();

        // Remove any previous output recursively; the record writer below creates
        // its file with overwrite disabled.
        FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), conf);
        fileSystem.delete(new Path(OUTPUT_PATH), true);

        Job job = new Job(conf, MySelfOutputFormatApp.class.getSimpleName());
        job.setJarByClass(MySelfOutputFormatApp.class);

        FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));

        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        job.setOutputFormatClass(MyselfOutputFormat.class);

        job.waitForCompletion(true);
    }

    /** Emits {@code (token, 1)} for every whitespace-separated token of an input line. */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        // Reused across calls to avoid allocating a new object per token.
        private final Text word = new Text();
        private final LongWritable writable = new LongWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            if (value == null) {
                return;
            }
            StringTokenizer tokenizer = new StringTokenizer(value.toString());
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                context.write(word, writable);
            }
        }
    }

    /** Sums all counts received for a key and emits {@code (key, total)}. */
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable value : values) {
                sum += value.get();
            }
            context.write(key, new LongWritable(sum));
        }
    }

    /**
     * Output format that sends every record to the fixed file
     * {@code OUTPUT_PATH + OUTPUT_FILENAME}.
     *
     * <p>NOTE(review): every task attempt targets the same path and the file is
     * created with overwrite disabled, so this presumably only works with a single
     * reduce task and no task retries - confirm before using with more reducers.
     */
    public static class MyselfOutputFormat extends OutputFormat<Text, LongWritable> {

        /**
         * Creates the output file and returns a writer bound to it.
         *
         * @param context task context supplying the Hadoop configuration
         * @return a writer that appends records to the newly created file
         * @throws IOException if the output URI is malformed or the file cannot be
         *     created (for example because it already exists)
         */
        @Override
        public RecordWriter<Text, LongWritable> getRecordWriter(TaskAttemptContext context)
                throws IOException, InterruptedException {
            final FileSystem fileSystem;
            try {
                fileSystem = FileSystem.get(
                        new URI(MySelfOutputFormatApp.OUTPUT_PATH), context.getConfiguration());
            } catch (URISyntaxException e) {
                // Fail here with the cause attached instead of handing out a writer
                // around a null stream that would only blow up on the first write.
                throw new IOException(
                        "Invalid output URI: " + MySelfOutputFormatApp.OUTPUT_PATH, e);
            }

            // Destination file for the records.
            final Path path = new Path(
                    MySelfOutputFormatApp.OUTPUT_PATH + MySelfOutputFormatApp.OUTPUT_FILENAME);
            return new MySelfRecordWriter(fileSystem.create(path, false));
        }

        /** Performs no validation of the output location. */
        @Override
        public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
        }

        /** Returns a {@link FileOutputCommitter} rooted at {@code OUTPUT_PATH}. */
        @Override
        public OutputCommitter getOutputCommitter(TaskAttemptContext context)
                throws IOException, InterruptedException {
            return new FileOutputCommitter(new Path(MySelfOutputFormatApp.OUTPUT_PATH), context);
        }
    }

    /** Writes each record as {@code key<TAB>count<LF>} to the supplied stream. */
    public static class MySelfRecordWriter extends RecordWriter<Text, LongWritable> {

        private final FSDataOutputStream outputStream;

        /**
         * @param outputStream stream the records are written to; closed by
         *     {@link #close(TaskAttemptContext)}
         */
        public MySelfRecordWriter(FSDataOutputStream outputStream) {
            this.outputStream = outputStream;
        }

        @Override
        public void write(Text key, LongWritable value) throws IOException, InterruptedException {
            // Text already holds UTF-8 bytes; copy them as-is. writeBytes(String)
            // would keep only the low byte of every char and mangle non-ASCII keys.
            this.outputStream.write(key.getBytes(), 0, key.getLength());
            // The remaining pieces are pure ASCII, for which writeBytes is lossless.
            this.outputStream.writeBytes("\t");
            // Decimal text rather than writeLong's 8 raw bytes, so the file stays
            // readable, and a newline so consecutive records can be told apart.
            this.outputStream.writeBytes(Long.toString(value.get()));
            this.outputStream.writeBytes("\n");
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            this.outputStream.close();
        }
    }
}
2.OutputFormat是用于处理各种输出目的地的。
2.1 OutputFormat需要写出去的键值对,是来自于Reducer类,是通过RecordWriter获得的。
2.2 RecordWriter中的write(...)方法只有k和v,写到哪里去呢?这要通过单独传入OutputStream来处理。write就是把k和v写入到OutputStream中。
2.3 RecordWriter是通过OutputFormat的getRecordWriter(...)方法创建的。因此,我们自定义的OutputFormat必须继承OutputFormat类型,并且流对象必须在getRecordWriter(...)方法中获得。
以上就是java 中自定义OutputFormat的实例,如有疑问请留言或者到本站社区交流讨论,感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!
原文链接:http://blog.csdn.net/woshisap/article/details/42320129