服务器之家:专注于服务器技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - Java教程 - Java大数据处理的核心技术MapReduce框架

Java大数据处理的核心技术MapReduce框架

2023-05-10 01:05未知服务器之家 Java教程

目录 MapReduce框架 1、框架图 2、Input数据输入 2.1概念 2.2数据切片与MapTask并行度 2.3切片过程 2.4类图 2.5TextInputFormat 2.6CombineTextInputFormat 2.7Read阶段 3、Map阶段 4、Collect收集阶段 5、Shuffle阶段 6、ReduceTask MapReduce框架 1、框架图 Input→Ma

目录
  • MapReduce框架
    • 1、框架图
    • 2、Input数据输入
      • 2.1概念
      • 2.2数据切片与MapTask并行度
      • 2.3切片过程
      • 2.4类图
      • 2.5TextInputFormat
      • 2.6CombineTextInputFormat
      • 2.7Read阶段
    • 3、Map阶段
      • 4、Collect收集阶段
        • 5、Shuffle阶段
          • 6、ReduceTask

          MapReduce框架

          1、框架图

          Input→Mapper→shuffle→Reducer→Output

          2、Input数据输入

          2.1概念

          (1)数据块(Block),物理存储,Block是HDFS物理上把文件分成一块一块。数据块是HDFS存储数据单位。

          (2)数据切片,逻辑存储,数据切片是MapReduce程序j最小计算输入数据的单位。一个切片会启动一个MapTask

          2.2数据切片与MapTask并行度

          (1)一个Job的Map阶段并行度由客户端在提交job时的切片数决定;

          (2)每一个split切片分配一个MapTask并行实例片

          (3)切片是针对每一个文件单独切片

          (4)默认情况下,切片大小等于Block Size块大小

          MapTask数据=输入文件切片数据

          2.3切片过程

          (1)程序先找到数据存储目录

          (2)开始遍历处理目录下的每一个文件

          A、按每个文件进行切片

          B、判断文件是否可以切片(snappy、Gzip压缩不能切)

          (3)遍历第一个文件

          获取文件大小→计算切片大小→开始切片→将切片信息写入切片规划文件中→提交切片规划文件到yarn

          A、获取文件大小:fs.size(文件)

          B、计算切片大小:设置minsize、maxsize、blocksize

          mapreduce.input.fileinputformat.split.minsize=1 默认值为1
          mapreduce.input.fileinputformat.split.maxsize=Long.MAXValue 默认值Long.MAXValue

          计算公式 :computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))

          最大取最小,最小取最大。因此切片大小默认与 HDFS 的 block 保持一致。

          maxsize(切片最大值): 参数如果调到比 blocksize 小,则会让切片变小,而且就等于配置的这个参数的值。minsize(切片最小值): 参数调的比 blockSize 大,则可以让切片变得比 blocksize 还大。

          C、开始切片:getSplit()

          每次切片时,都要判断剩下的是否大于块的1.1倍,不大于1.1倍就切分成一块切片

          D、将切片信息写入切片规划文件中:job.split

          记录起始位置、长度、所在切点列表等

          E、提交切片规划文件到yarn

          yarn上MRAppMaster根据切片规划计算MapTask数

          三个文件:切片规则文件(job.split)、参数配置文件(job.xml)、程序jar包

          2.4类图

          Java大数据处理的核心技术MapReduce框架

          B、在job驱动中,设置自定义partitioner,job.setPartitionerClass(自定义分区类.class)

          C、自定义Partition后,要根据自定义Partitioner的逻辑设置相应的数量的ReduceTask:job.setNumReduceTasks(数量)

          public class FlowDriver {
              public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
                  // 1.获取job对象
                  Configuration configuration = new Configuration();
                  Job job = Job.getInstance(configuration);
                  // 2.关联本Driver类
                  job.setJarByClass(FlowDriver.class);
                  // 3.关联Mapper和Reducer类
                  job.setMapperClass(FlowMapper.class);
                  job.setReducerClass(FlowReducer.class);
                  // 4.设置Map端输出KV类型
                  job.setMapOutputKeyClass(Text.class);
                  job.setMapOutputValueClass(FlowBean.class);
                  // 5.设置最终输出KV类型
                  job.setOutputKeyClass(Text.class);
                  job.setOutputValueClass(FlowBean.class);
                  // 6.设置程序的输入和输出路径
                  FileInputFormat.setInputPaths(job, new Path("C:\\install\\temp\\input\\input02\\phone_data.txt"));
                  FileOutputFormat.setOutputPath(job, new Path("C:\\install\\temp\\output\\output06"));
                  // 8.指定自定义分区器
                  job.setPartitionerClass(ProvincePartitioner.class);
                  // 9.同时也指定相应数量的ReduceTask--对应的参数mapreduce.job.reduces,默认为1
                  job.setNumReduceTasks(5);
                  // 7.提交job
                  boolean result = job.waitForCompletion(true);
                  System.exit(result ? 0 : 1);
              }
          }

          (5)Partition分区总结

          A、如果ReduceTask数量 > getPartition()结果数,则会多产生几个空的输出文件

          B、如果 1 <ReduceTask数量 <getPartition()结果数,则有一部分分区数据无处安放,会异常

          C、如果ReduceTask数量=1,则不管MapTask输出多少个分区文件,最终结果只有一个ReduceTask,只会产生一个结果文件。(分区数不大于1,不会走默认hash分区器和自定义分区器,直接返回)

          D、分区号必须从0开始,逐一累加

          (6)排序

          A、排序是MapReduce框架中最重要的操作之一

          B、MapTask和ReduceTask均会对数据按key进行排序,该 操作属于Hadoop的默认行为 。任务应用程序中的数据均会被排序,而不管逻辑上是否需要。

          C、默认排序是按照字典顺序排序,排序的方法为快速排序

          D、排序分类:部分排序、全排序、辅助排序、二次排序

          (7)溢写

          A、当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件

          (8)Combiner

          A、Combiner是MR程序中Mapper和Reducer之外的一种组件

          B、Combiner的父类是Reducer

          C、Combiner与Reducer区别:在于运行的位置 ,Combiner是在每一个MapTask所在节点运行,即在分区、排序后准备溢写前可以进行combiner。Reducer是接收全局所有MapTask输出结果。

          D、Combiner的意义是对每一个MapTask的输出进行局部汇总,以减少网络传输量

          E、Combiner应用前提是不影响最终的业务逻辑

          public class WordCountCombiner extends Reducer<Text, IntWritable, Text,IntWritable> {
              private IntWritable outV = new IntWritable();
              @Override
              protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                  int sum = 0;
                  for (IntWritable value : values) {
                      sum += value.get();
                  }
                  outV.set(sum);
                  context.write(key,outV);
              }
          }
          public class WordCountDriver {
              public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
                  // 一获取二关联三设置一提交
                  // 1.获取配置信息及Job对象
                  Configuration configuration = new Configuration();
                  Job job = Job.getInstance(configuration);
                  // 2.关联本Driver程序的类
                  job.setJarByClass(WordCountDriver.class);
                  // 3.关联Mapper和Reducer的业务类
                  job.setMapperClass(WordCountMapper.class);
                  job.setReducerClass(WordCountReducer.class);
                  // 4.设置Mapper输出的KV类型
                  job.setMapOutputKeyClass(Text.class);
                  job.setMapOutputValueClass(IntWritable.class);
                  // 5.设置最终输出的KV类型
                  job.setOutputKeyClass(Text.class);
                  job.setOutputValueClass(IntWritable.class);
                  // 6.设置输入和输出路径
                  //FileInputFormat.setInputPaths(job, new Path(args[0]));
                  //FileOutputFormat.setOutputPath(job, new Path(args[1]));
                  FileInputFormat.setInputPaths(job, new Path("C:\\install\\temp\\input\\hadoop.txt"));
                  FileOutputFormat.setOutputPath(job, new Path("C:\\install\\temp\\output\\output01-2"));
                  // 8.设置Combiner类--方式一
                  //job.setCombinerClass(WordCountCombiner.class);
                  // 方式二:其新建的WordCountCombiner的reduce方法处理与正常的WordCountReducer中的reduce方法处理逻辑是一样
                  // 因此可以直接用此类作为combiner类
                  job.setCombinerClass(WordCountReducer.class);
                  // 9.设置ReduceTasks的数量--这样就没有reduce阶段,就不会有shuffle,Combiner也就没有用,直接由map输出,
                  // 文件名为part-m-00000,就是不part-r-00000,两者结果是不一样的
                  // 即如果没有reduce阶段,即使设置了combiner也不起作用
                  // job.setNumReduceTasks(0);
                  // 7.提交job
                  boolean result = job.waitForCompletion(true);
                  System.exit(result ? 0 : 1); // 0-正常退出 非0(1)异常终止(结束)
              }
          }

          (9)Meger

          A、MapTask以分区为单位进行合并,对所有临时文件合并成一个大文件(output/file.out),同时生成相应索引文件(output/file.out.index)

          B、对某个分区采用多轮递归合并的方式,每次合并默认10个文件,每个MapTask最终得到一个大文件

          6、ReduceTask

          (1)Copy阶段

          ReduceTask从各个MapTask上远程拷贝一片数据,如大小超过阀值,则写到磁盘上,否则直接放在内存中

          (2)Sort阶段

          由于各个MapTask已经实现了对自己处理结果进行了局部排序,因此ReduceTask只需要对所有数据进行一次归并排序即可

          (3)Reducer阶段

          reduce()函数将计算结果写到HDFS上

          (4)其他

          A、ReduceTask数量默认是1,可手动设置job.setNumReduceTasks(数量)

          B、ReduceTask=0,表示没有reduce阶段,输出文件个数和Map个数一致

          C、如果数据分布不均匀,就会在reduce阶段产生数据倾斜

          D、ReduceTask数量并不能任意设置,要考虑业务逻辑需求,具体多少个ReduceTask,需要根据集群性能确定

          E、如果分区数不是1,但ReduceTask为1,不执行分区过程(执行分区的前提是判断ReduceNum个数是否大于1)

          原文地址:https://blog.csdn.net/dreamsun_meng/article/details/130499496

          延伸 · 阅读

          精彩推荐