1. The collect phase
In the Mapper, calling context.write(key, value) actually calls the write() method of the proxy object NewOutputCollector:
```java
public void write(KEYOUT key, VALUEOUT value
                  ) throws IOException, InterruptedException {
  output.write(key, value);
}
```
That call in turn reaches collect() on MapOutputBuffer. Before the pair is collected, the Partitioner is called to compute the partition number of each key-value pair:
```java
@Override
public void write(K key, V value) throws IOException, InterruptedException {
  collector.collect(key, value,
                    partitioner.getPartition(key, value, partitions));
}
```
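The delegation chain above can be modeled in plain Java. This is a simplified, hypothetical model and not Hadoop code: SimpleContext, SimpleOutputCollector, SimpleBuffer and SimplePartitioner are illustrative names standing in for the Mapper context, NewOutputCollector, MapOutputBuffer and Partitioner. It only shows the order of calls. The context delegates to the collector, and the collector computes the partition first and then passes the (key, value, partition) triple to the buffer.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.apache.hadoop.mapreduce.Partitioner.
interface SimplePartitioner<K, V> {
    int getPartition(K key, V value, int numPartitions);
}

// Plays the role of MapOutputBuffer: records what it was asked to collect.
class SimpleBuffer<K, V> {
    final List<String> collected = new ArrayList<>();

    void collect(K key, V value, int partition) {
        collected.add(partition + ":" + key + "=" + value);
    }
}

// Plays the role of NewOutputCollector: compute the partition, then collect.
class SimpleOutputCollector<K, V> {
    private final SimpleBuffer<K, V> collector;
    private final SimplePartitioner<K, V> partitioner;
    private final int partitions;

    SimpleOutputCollector(SimpleBuffer<K, V> collector,
                          SimplePartitioner<K, V> partitioner, int partitions) {
        this.collector = collector;
        this.partitioner = partitioner;
        this.partitions = partitions;
    }

    void write(K key, V value) {
        collector.collect(key, value, partitioner.getPartition(key, value, partitions));
    }
}

// Plays the role of the Mapper's context: write() only delegates.
class SimpleContext<K, V> {
    private final SimpleOutputCollector<K, V> output;

    SimpleContext(SimpleOutputCollector<K, V> output) {
        this.output = output;
    }

    void write(K key, V value) {
        output.write(key, value);
    }
}
```

For example, with a partitioner that returns k.length() % n and 2 partitions, writing ("ab", 1) lands in partition 0 and ("abc", 2) in partition 1.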
2. Creating the NewOutputCollector object
```java
@SuppressWarnings("unchecked")
NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                   JobConf job,
                   TaskUmbilicalProtocol umbilical,
                   TaskReporter reporter
                   ) throws IOException, ClassNotFoundException {
  // Create the buffer object that actually collects the key-value pairs
  collector = createSortingCollector(job, reporter);
  // Total number of partitions (= number of reduce tasks)
  partitions = jobContext.getNumReduceTasks();
  if (partitions > 1) {
    partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
      ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
  } else {
    // Default case: an anonymous inner class that assigns every key-value pair
    // to partition 0 (with a single reduce task, partitions - 1 == 0)
    partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
      @Override
      public int getPartition(K key, V value, int numPartitions) {
        return partitions - 1;
      }
    };
  }
}
```
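The branch in the constructor reduces to a small selection rule. Here is a minimal sketch of it. It assumes a hypothetical KvPartitioner interface in place of org.apache.hadoop.mapreduce.Partitioner, and the configured partitioner is passed in as an instance rather than created through ReflectionUtils. PartitionerSelection is also a hypothetical name.

```java
// Hypothetical stand-in for org.apache.hadoop.mapreduce.Partitioner.
interface KvPartitioner<K, V> {
    int getPartition(K key, V value, int numPartitions);
}

final class PartitionerSelection {
    private PartitionerSelection() {}

    // Mirrors the branch in NewOutputCollector: with more than one reduce task the
    // configured partitioner is used; otherwise every record goes to partition
    // partitions - 1, which is 0 when there is exactly one reduce task.
    static <K, V> KvPartitioner<K, V> choose(int partitions, KvPartitioner<K, V> configured) {
        if (partitions > 1) {
            return configured;
        }
        return (key, value, numPartitions) -> partitions - 1;
    }
}
```

With one reduce task, every key goes to partition 0 regardless of the configured partitioner. With four reduce tasks, the configured one decides; a length-based partitioner, for instance, would send "abc" to 3 % 4 = 3.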
3. Creating the circular buffer object
```java
@SuppressWarnings("unchecked")
private <KEY, VALUE> MapOutputCollector<KEY, VALUE>
        createSortingCollector(JobConf job, TaskReporter reporter)
  throws IOException, ClassNotFoundException {
  MapOutputCollector.Context context =
    new MapOutputCollector.Context(this, job, reporter);

  // Read mapreduce.job.map.output.collector.class from the job configuration;
  // if it is not set, MapOutputBuffer.class is used
  Class<?>[] collectorClasses = job.getClasses(
    JobContext.MAP_OUTPUT_COLLECTOR_CLASS_ATTR, MapOutputBuffer.class);
  int remainingCollectors = collectorClasses.length;
  Exception lastException = null;
  for (Class clazz : collectorClasses) {
    try {
      if (!MapOutputCollector.class.isAssignableFrom(clazz)) {
        throw new IOException("Invalid output collector class: " + clazz.getName() +
          " (does not implement MapOutputCollector)");
      }
      Class<? extends MapOutputCollector> subclazz =
        clazz.asSubclass(MapOutputCollector.class);
      LOG.debug("Trying map output collector class: " + subclazz.getName());
      // Instantiate the buffer (collector) object
      MapOutputCollector<KEY, VALUE> collector =
        ReflectionUtils.newInstance(subclazz, job);
      // Initialize it right after it has been created
      collector.init(context);
      LOG.info("Map output collector class = " + collector.getClass().getName());
      return collector;
    } catch (Exception e) {
      String msg = "Unable to initialize MapOutputCollector " + clazz.getName();
      if (--remainingCollectors > 0) {
        msg += " (" + remainingCollectors + " more collector(s) to try)";
      }
      lastException = e;
      LOG.warn(msg, e);
    }
  }
  throw new IOException("Initialization of all the collectors failed. " +
    "Error in last collector was :" + lastException.getMessage(), lastException);
}
```
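The loop tries each configured collector class in order and returns the first one whose construction and init() both succeed. Only when every candidate fails does it throw an IOException carrying the last error. Below is a minimal sketch of that fallback pattern. It assumes each candidate is modeled as a java.util.function.Supplier, so a Supplier that throws stands in for a collector whose init() fails. Reflection is left out, and CollectorFallback is a hypothetical name.

```java
import java.io.IOException;
import java.util.List;
import java.util.function.Supplier;

final class CollectorFallback {
    private CollectorFallback() {}

    // Returns the result of the first candidate that does not throw;
    // otherwise throws an IOException wrapping the last failure.
    static <T> T firstThatInitializes(List<Supplier<T>> candidates) throws IOException {
        int remaining = candidates.size();
        Exception lastException = null;
        for (Supplier<T> candidate : candidates) {
            try {
                return candidate.get();                // first success wins
            } catch (RuntimeException e) {
                String msg = "Unable to initialize collector";
                if (--remaining > 0) {
                    msg += " (" + remaining + " more collector(s) to try)";
                }
                System.err.println(msg + ": " + e.getMessage());  // plays the role of LOG.warn
                lastException = e;
            }
        }
        throw new IOException("Initialization of all the collectors failed. "
            + "Error in last collector was :"
            + (lastException == null ? "none" : lastException.getMessage()),
            lastException);
    }
}
```

The design choice this reflects: a custom collector configured ahead of MapOutputBuffer can fail to initialize without failing the task, as long as a later class in the list works.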
3.1 Initializing MapOutputBuffer, the circular buffer object
```java
@SuppressWarnings("unchecked")
public void init(MapOutputCollector.Context context
                ) throws IOException, ClassNotFoundException {
  job = context.getJobConf();
  reporter = context.getReporter();
  mapTask = context.getMapTask();
  mapOutputFile = mapTask.getMapOutputFile();
  sortPhase = mapTask.getSortPhase();
  spilledRecordsCounter = reporter.getCounter(TaskCounter.SPILLED_RECORDS);
  // Total number of partitions, determined by the number of ReduceTasks
  partitions = job.getNumReduceTasks();
  rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();

  //sanity checks
  // Read mapreduce.map.sort.spill.percent from the configuration; defaults to 0.8
  final float spillper =
    job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
  // Read mapreduce.task.io.sort.mb; defaults to 100 (MB)
  final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
  indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                     INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
  if (spillper > (float)1.0 || spillper <= (float)0.0) {
    throw new IOException("Invalid \"" + JobContext.MAP_SORT_SPILL_PERCENT +
        "\": " + spillper);
  }
  if ((sortmb & 0x7FF) != sortmb) {
    throw new IOException(
        "Invalid \"" + JobContext.IO_SORT_MB + "\": " + sortmb);
  }
  // Sorter applied to the key-value pairs before each spill: QuickSort by default;
  // it sorts only the metadata (index) entries, not the data itself
  sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
        QuickSort.class, IndexedSorter.class), job);
  // buffers and accounting
  int maxMemUsage = sortmb << 20;
  maxMemUsage -= maxMemUsage % METASIZE;
  // Byte array holding the serialized key-value data
  kvbuffer = new byte[maxMemUsage];
  bufvoid = kvbuffer.length;
  // Int view over the same array holding per-record metadata
  // (partition number, key/value offsets and so on)
  kvmeta = ByteBuffer.wrap(kvbuffer)
     .order(ByteOrder.nativeOrder())
     .asIntBuffer();
  setEquator(0);
  bufstart = bufend = bufindex = equator;
  kvstart = kvend = kvindex;

  maxRec = kvmeta.capacity() / NMETA;
  softLimit = (int)(kvbuffer.length * spillper);
  bufferRemaining = softLimit;
  LOG.info(JobContext.IO_SORT_MB + ": " + sortmb);
  LOG.info("soft limit at " + softLimit);
  LOG.info("bufstart = " + bufstart + "; bufvoid = " + bufvoid);
  LOG.info("kvstart = " + kvstart + "; length = " + maxRec);

  // k/v serialization
  // Comparator used by the sort: records are ordered by key only, values are not compared
  comparator = job.getOutputKeyComparator();
  // Serializers for the key and value classes
  keyClass = (Class<K>)job.getMapOutputKeyClass();
  valClass = (Class<V>)job.getMapOutputValueClass();
  serializationFactory = new SerializationFactory(job);
  keySerializer = serializationFactory.getSerializer(keyClass);
  keySerializer.open(bb);
  valSerializer = serializationFactory.getSerializer(valClass);
  valSerializer.open(bb);

  // output counters
  mapOutputByteCounter = reporter.getCounter(TaskCounter.MAP_OUTPUT_BYTES);
  mapOutputRecordCounter =
    reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
  fileOutputByteCounter = reporter
      .getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);

  // compression
  // Spills to disk may be compressed: obtain the configured codec
  if (job.getCompressMapOutput()) {
    Class<? extends CompressionCodec> codecClass =
      job.getMapOutputCompressorClass(DefaultCodec.class);
    codec = ReflectionUtils.newInstance(codecClass, job);
  } else {
    codec = null;
  }

  // combiner
  // Obtain the Combiner (combinerRunner is null if none is configured)
  final Counters.Counter combineInputCounter =
    reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
  combinerRunner = CombinerRunner.create(job, getTaskID(),
                                         combineInputCounter,
                                         reporter, null);
  if (combinerRunner != null) {
    final Counters.Counter combineOutputCounter =
      reporter.getCounter(TaskCounter.COMBINE_OUTPUT_RECORDS);
    combineCollector = new CombineOutputCollector<K,V>(combineOutputCounter, reporter, job);
  } else {
    combineCollector = null;
  }
  spillInProgress = false;
  minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
  // Spilling runs in a separate background (daemon) thread,
  // not in the thread that collects the records
  spillThread.setDaemon(true);
  spillThread.setName("SpillThread");
  spillLock.lock();
  try {
    // Start the spill thread and wait until it is running
    spillThread.start();
    while (!spillThreadRunning) {
      spillDone.await();
    }
  } catch (InterruptedException e) {
    throw new IOException("Spill thread failed to initialize", e);
  } finally {
    spillLock.unlock();
  }
  if (sortSpillException != null) {
    throw new IOException("Spill thread failed to initialize",
        sortSpillException);
  }
}
```
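4. Obtaining the Partitioner
The partitioner class is read from mapreduce.job.partitioner.class in the configuration. If it is not set, HashPartitioner.class is used.
In other words, when there is more than one ReduceTask and no partitioner has been configured, HashPartitioner is used.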
```java
@SuppressWarnings("unchecked")
public Class<? extends Partitioner<?,?>> getPartitionerClass()
   throws ClassNotFoundException {
  return (Class<? extends Partitioner<?,?>>)
    conf.getClass(PARTITIONER_CLASS_ATTR, HashPartitioner.class);
}
```
```java
public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}
```
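Masking with Integer.MAX_VALUE instead of calling Math.abs() matters for one edge case. The stand-alone sketch below has no Hadoop dependency, and HashPartitionDemo is a hypothetical name. It applies the same expression as HashPartitioner and contrasts it with a naive Math.abs() version. Because Math.abs(Integer.MIN_VALUE) is still Integer.MIN_VALUE, the naive version can return a negative partition number.

```java
final class HashPartitionDemo {
    private HashPartitionDemo() {}

    // Same expression as HashPartitioner.getPartition(): clearing the sign bit
    // guarantees a non-negative value before taking the remainder.
    static int partition(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Naive alternative, for contrast only: Math.abs(Integer.MIN_VALUE) overflows
    // back to Integer.MIN_VALUE, so this can return a negative number.
    static int naivePartition(Object key, int numReduceTasks) {
        return Math.abs(key.hashCode()) % numReduceTasks;
    }
}
```

For an Integer key equal to Integer.MIN_VALUE (whose hashCode() is the value itself), partition(key, 3) returns 0 while naivePartition(key, 3) returns -2.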
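Constraint on the partition number: 0 <= partition < total number of partitions (the number of ReduceTasks). MapOutputBuffer.collect() rejects any other value: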
```java
if (partition < 0 || partition >= partitions) {
  throw new IOException("Illegal partition for " + key + " (" +
      partition + ")");
}
```
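A custom partitioner that returns a value outside this range makes the map task fail at collect time. One example is key.hashCode() % n, which can be negative. Below is a minimal sketch of the check. CheckedCollector is a hypothetical class that only counts the records it accepts instead of buffering them.

```java
import java.io.IOException;

final class CheckedCollector {
    private final int partitions;   // number of ReduceTasks
    private int accepted;

    CheckedCollector(int partitions) {
        this.partitions = partitions;
    }

    // Same range check as in the collect step: 0 <= partition < partitions.
    void collect(Object key, Object value, int partition) throws IOException {
        if (partition < 0 || partition >= partitions) {
            throw new IOException("Illegal partition for " + key + " (" + partition + ")");
        }
        accepted++;                 // a real buffer would serialize the pair here
    }

    int accepted() {
        return accepted;
    }
}
```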
5. The MapTask shuffle flow
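① map() calls context.write().
② This calls collect() on MapOutputBuffer.
- The Partitioner computes the partition number of the current key-value pair.
③ The key-value pair is collected into the MapOutputBuffer.
- Once the spill threshold is exceeded, the spill thread running in the background is triggered to spill.
④ Before spilling, the key-value pairs are sorted with quicksort, first by partition number and then by key within each partition.
- The sort does not move the key-value data in memory; it only reorders the index (metadata) entries that point to the pairs.
⑤ The spill then writes the records, in sorted-index order, to a temporary spill file.
- If no Combiner is defined, the records are written as they are.
- If a Combiner is defined, CombinerRunner.combine() first processes the key-value pairs, and its output is what gets written.
⑥ After several spills, there is one temporary file per spill.
⑦ Finally, flush() is called once to spill the remaining key-value pairs.
⑧ mergeParts(): the results of all spills are merged into a single file.
- The merge is a merge sort, so every partition in the merged file is still sorted.
- If a Combiner is defined, it runs again, provided there are at least 3 spill files.
- Otherwise, the merged records are written directly.
⑨ The result is one final file divided into one section per partition. The key-value pairs in each section are sorted, and each ReduceTask copies the section for its own partition.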
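Steps ④ to ⑧ can be condensed into a toy, single-threaded model. This is a simplified sketch, not Hadoop code. ToyMapShuffle and Rec are hypothetical names, and the model makes these simplifications:

- Records are kept as Java objects rather than serialized bytes.
- The spill threshold is a record count rather than a byte-based soft limit.
- Spills run on the calling thread.
- The Combiner is left out.

It keeps the two ideas that matter. The sort reorders an index rather than the records, and the final merge is a k-way merge (here via a PriorityQueue) that preserves (partition, key) order. The nested record requires Java 16 or later.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

final class ToyMapShuffle {
    // One buffered record: partition number, key and value.
    record Rec(int partition, String key, int value) {}

    // Order used for sorting and merging: partition first, then key.
    static final Comparator<Rec> ORDER =
        Comparator.comparingInt(Rec::partition).thenComparing(Rec::key);

    private final int spillThreshold;           // record count, stands in for the soft limit
    private final List<Rec> buffer = new ArrayList<>();
    final List<List<Rec>> spills = new ArrayList<>();

    ToyMapShuffle(int spillThreshold) {
        this.spillThreshold = spillThreshold;
    }

    void collect(String key, int value, int partition) {
        buffer.add(new Rec(partition, key, value));
        if (buffer.size() >= spillThreshold) {
            sortAndSpill();                     // Hadoop hands this to the spill thread
        }
    }

    // Sort a list of indices by the records they point to, then write in that order.
    private void sortAndSpill() {
        if (buffer.isEmpty()) {
            return;
        }
        List<Integer> index = new ArrayList<>();
        for (int i = 0; i < buffer.size(); i++) {
            index.add(i);
        }
        index.sort((a, b) -> ORDER.compare(buffer.get(a), buffer.get(b)));
        List<Rec> spill = new ArrayList<>();
        for (int i : index) {
            spill.add(buffer.get(i));
        }
        spills.add(spill);
        buffer.clear();
    }

    // flush(): spill what is left; mergeParts(): k-way merge of all sorted spills.
    List<Rec> flushAndMerge() {
        sortAndSpill();
        // Heap entries are {spill number, position inside that spill}.
        PriorityQueue<int[]> heap = new PriorityQueue<>(
            (x, y) -> ORDER.compare(spills.get(x[0]).get(x[1]), spills.get(y[0]).get(y[1])));
        for (int s = 0; s < spills.size(); s++) {
            heap.add(new int[] {s, 0});
        }
        List<Rec> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            int[] top = heap.poll();
            List<Rec> run = spills.get(top[0]);
            merged.add(run.get(top[1]));
            if (top[1] + 1 < run.size()) {
                heap.add(new int[] {top[0], top[1] + 1});
            }
        }
        return merged;
    }
}
```

Here is a worked example with a threshold of 3. Collecting (b, p1), (a, p0) and (c, p1) triggers one spill. flushAndMerge() then spills the remaining (a, p1) and (d, p0), and the merged output is ordered 0a, 0d, 1a, 1b, 1c.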
6. Combiner
A Combiner is in fact a Reducer type:
```java
Class<? extends Reducer<K,V,K,V>> cls =
  (Class<? extends Reducer<K,V,K,V>>) job.getCombinerClass();
```
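The records in a spill are already sorted by key, so a Combiner can fold each key group in a single pass, just as a Reducer would. The cast above also shows that its input and output types must be the same (K, V). Below is a minimal sketch. It assumes a word-count style sum as the reduce logic, uses a List of Map.Entry to stand in for the sorted run, and SumCombiner is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class SumCombiner {
    private SumCombiner() {}

    // Input must be sorted by key, so equal keys are adjacent.
    static List<Map.Entry<String, Integer>> combine(List<Map.Entry<String, Integer>> sortedRun) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        String currentKey = null;
        int sum = 0;
        for (Map.Entry<String, Integer> e : sortedRun) {
            if (currentKey != null && !currentKey.equals(e.getKey())) {
                out.add(Map.entry(currentKey, sum));   // key group finished: emit it
                sum = 0;
            }
            currentKey = e.getKey();
            sum += e.getValue();
        }
        if (currentKey != null) {
            out.add(Map.entry(currentKey, sum));       // emit the last group
        }
        return out;
    }
}
```

The Combiner may run zero, one or several times over the same data, so its logic must give the same final result however often it is applied. A sum qualifies; directly averaging partial averages does not.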
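When the Combiner runs:
MapTask:
- ① Before every spill, if a Combiner has been specified.
- ② When the spill segments are merged into one final file, provided there are at least 3 segments.
ReduceTask:
③ While a ReduceTask runs, it starts the shuffle to copy the data produced by the MapTasks.
- After being copied, the data goes into the shuffle's working memory, where it is merged and sorted.
- If there is too much data to fit in memory, part of it is spilled to disk.
- If such a spill happens, the Combiner runs again.
① always runs when a Combiner is set; ② and ③ run only when their conditions are met.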
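The conditions above can be written down as plain predicates. This is a hypothetical summary, not a Hadoop API. The threshold of 3 is the default of minSpillsForCombine, read from JobContext.MAP_COMBINE_MIN_SPILLS in MapOutputBuffer.init(). mergeParts() skips the Combiner only when numSpills < minSpillsForCombine, so exactly 3 spill files already qualify.

```java
final class CombinerTiming {
    private CombinerTiming() {}

    // ① Map side, in every spill: runs whenever a Combiner is configured.
    static boolean runsOnSpill(boolean hasCombiner) {
        return hasCombiner;
    }

    // ② Map side, in mergeParts(): needs at least minSpillsForCombine spill files.
    static boolean runsOnMerge(boolean hasCombiner, int numSpills, int minSpillsForCombine) {
        return hasCombiner && numSpills >= minSpillsForCombine;
    }

    // ③ Reduce side: only if shuffled data has to be merged out of memory to disk.
    static boolean runsOnReduceSideSpill(boolean hasCombiner, boolean spilledToDisk) {
        return hasCombiner && spilledToDisk;
    }
}
```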
Original article: https://blog.csdn.net/qq_43193797/article/details/86097451