服务器之家:专注于服务器技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - Java教程 - java 中自定义OutputFormat的实例详解

java 中自定义OutputFormat的实例详解

2020-12-18 13:11woshisap Java教程

这篇文章主要介绍了java 中 自定义OutputFormat的实例详解的相关资料,这里提供实例帮助大家学习理解这部分内容,希望通过本文能帮助到大家,需要的朋友可以参考下

java 中 自定义OutputFormat的实例详解

实例代码:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package com.ccse.hadoop.outputformat;
 
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.StringTokenizer;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 
 
public class MySelfOutputFormatApp {
   
  public final static String INPUT_PATH = "hdfs://chaoren1:9000/mapinput";
  public final static String OUTPUT_PATH = "hdfs://chaoren1:9000/mapoutput";
  public final static String OUTPUT_FILENAME = "/abc";
   
  public static void main(String[] args) throws IOException, URISyntaxException, 
    ClassNotFoundException, InterruptedException {
    Configuration conf = new Configuration();
    FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), conf);
    fileSystem.delete(new Path(OUTPUT_PATH), true);
     
    Job job = new Job(conf, MySelfOutputFormatApp.class.getSimpleName());
    job.setJarByClass(MySelfOutputFormatApp.class);
     
    FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
    job.setMapperClass(MyMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);
     
    job.setReducerClass(MyReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    job.setOutputFormatClass(MyselfOutputFormat.class);
     
    job.waitForCompletion(true);
  }
   
  public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
 
    private Text word = new Text();
    private LongWritable writable = new LongWritable(1);
     
    @Override
    protected void map(LongWritable key, Text value,
        Mapper<LongWritable, Text, Text, LongWritable>.Context context)
        throws IOException, InterruptedException {
      if (value != null) {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
          word.set(tokenizer.nextToken());
          context.write(word, writable);
        }
      }
    }
     
  }
   
  public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
 
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values,
        Reducer<Text, LongWritable, Text, LongWritable>.Context context)
        throws IOException, InterruptedException {
      long sum = 0
      for (LongWritable value : values) {
        sum += value.get();
      }
      context.write(key, new LongWritable(sum));
    }
  }
 
  public static class MyselfOutputFormat extends OutputFormat<Text, LongWritable> {
 
    private FSDataOutputStream outputStream = null;
     
    @Override
    public RecordWriter<Text, LongWritable> getRecordWriter(
        TaskAttemptContext context) throws IOException,
        InterruptedException {
      try {
        FileSystem fileSystem = FileSystem.get(new URI(MySelfOutputFormatApp.OUTPUT_PATH), context.getConfiguration());
        //指定文件的输出路径
        final Path path = new Path(MySelfOutputFormatApp.OUTPUT_PATH 
                     + MySelfOutputFormatApp.OUTPUT_FILENAME);
        this.outputStream = fileSystem.create(path, false);
      } catch (URISyntaxException e) {
        e.printStackTrace();
      }
      return new MySelfRecordWriter(outputStream);
    }
 
    @Override
    public void checkOutputSpecs(JobContext context) throws IOException,
        InterruptedException {
    }
 
    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context)
        throws IOException, InterruptedException {
      return new FileOutputCommitter(new Path(MySelfOutputFormatApp.OUTPUT_PATH), context);
    }
     
  }
   
  public static class MySelfRecordWriter extends RecordWriter<Text, LongWritable> {
 
    private FSDataOutputStream outputStream = null;
     
    public MySelfRecordWriter(FSDataOutputStream outputStream) {
      this.outputStream = outputStream;
    }
     
    @Override
    public void write(Text key, LongWritable value) throws IOException,
        InterruptedException {
      this.outputStream.writeBytes(key.toString());
      this.outputStream.writeBytes("\t");
      this.outputStream.writeLong(value.get());
    }
 
    @Override
    public void close(TaskAttemptContext context) throws IOException,
        InterruptedException {
      this.outputStream.close();
    }
     
  }
   
}

 2.OutputFormat是用于处理各种输出目的地的。

2.1 OutputFormat需要写出去的键值对,是来自于Reducer类,是通过RecordWriter获得的。

2.2 RecordWriter中的write(...)方法只有k和v,写到哪里去哪?这要通过单独传入OutputStream来处理。write就是把k和v写入到OutputStream中的。

2.3 RecordWriter类位于OutputFormat中的。因此,我们自定义的OutputFromat必须继承OutputFormat类型。那么,流对象必须在getRecordWriter(...)方法中获得。

以上就是java 中自定义OutputFormat的实例,如有疑问请留言或者到本站社区交流讨论,感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!

原文链接:http://blog.csdn.net/woshisap/article/details/42320129

延伸 · 阅读

精彩推荐