一、多线程介绍
在编程中,我们不可逃避的会遇到多线程的编程问题,因为在大多数的业务系统中需要并发处理,如果是在并发的场景中,多线程就非常重要了。另外,我们在面试的时候,面试官通常也会问到我们关于多线程的问题,如:如何创建一个线程?我们通常会这么回答,主要有两种方法,第一种:继承thread类,重写run方法;第二种:实现runnable接口,重写run方法。那么面试官一定会问这两种方法各自的优缺点在哪,不管怎么样,我们会得出一个结论,那就是使用方式二,因为面向对象提倡少继承,尽量多用组合。
这个时候,我们还可能想到,如果想得到多线程的返回值怎么办呢?根据我们多学到的知识,我们会想到实现callable接口,重写call方法。那么多线程到底在实际项目中怎么使用呢,他有多少种方式呢?
首先,我们来看一个例子:
这是一种创建多线程的简单方法,很容易理解,在例子中,根据不同的业务场景,我们可以在thread()里边传入不同的参数实现不同的业务逻辑,但是,这个方法创建多线程暴漏出来的问题就是反复创建线程,而且创建线程后还得销毁,如果对并发场景要求低的情况下,这种方式貌似也可以,但是高并发的场景中,这种方式就不行了,因为创建线程销毁线程是非常耗资源的。所以根据经验,正确的做法是我们使用线程池技术,jdk提供了多种线程池类型供我们选择,具体方式可以查阅jdk的文档。
这里代码我们需要注意的是,传入的参数代表我们配置的线程数,是不是越多越好呢?肯定不是。因为我们在配置线程数的时候要充分考虑服务器的性能,线程配置的多,服务器的性能未必就优。通常,机器完成的计算是由线程数决定的,当线程数到达峰值,就无法在进行计算了。如果是耗cpu的业务逻辑(计算较多),线程数和核数一样就到达峰值了,如果是耗i/o的业务逻辑(操作数据库,文件上传、下载等),线程数越多一定意义上有助于提升性能。
线程数大小的设定又一个公式决定:
y=n*((a+b)/a),其中,n:cpu核数,a:线程执行时程序的计算时间,b:线程执行时,程序的阻塞时间。有了这个公式后,线程池的线程数配置就会有约束了,我们可以根据机器的实际情况灵活配置。
二、多线程优化及性能比较
最近的项目中用到了所线程技术,在使用过程中遇到了很多的麻烦,趁着热度,整理一下几种多线程框架的性能比较。目前所掌握的大致分三种,第一种:threadpool(线程池)+countdownlatch(程序计数器),第二种:fork/join框架,第三种jdk8并行流,下面对这几种方式的多线程处理性能做一下比较总结。
首先,假设一种业务场景,在内存中生成多个文件对象,这里暂定30000,(thread.sleep(时间))线程睡眠模拟业务处理业务逻辑,来比较这几种方式的多线程处理性能。
1) 单线程
这种方式非常简单,但是程序在处理的过程中非常的耗时,使用的时间会很长,因为每个线程都在等待当前线程执行完才会执行,和多线程没有多少关系,所以效率非常低。
首先创建文件对象,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
public class fileinfo { private string filename; //文件名 private string filetype; //文件类型 private string filesize; //文件大小 private string filemd5; //md5码 private string fileversionno; //文件版本号 public fileinfo() { super (); } public fileinfo(string filename, string filetype, string filesize, string filemd5, string fileversionno) { super (); this .filename = filename; this .filetype = filetype; this .filesize = filesize; this .filemd5 = filemd5; this .fileversionno = fileversionno; } public string getfilename() { return filename; } public void setfilename(string filename) { this .filename = filename; } public string getfiletype() { return filetype; } public void setfiletype(string filetype) { this .filetype = filetype; } public string getfilesize() { return filesize; } public void setfilesize(string filesize) { this .filesize = filesize; } public string getfilemd5() { return filemd5; } public void setfilemd5(string filemd5) { this .filemd5 = filemd5; } public string getfileversionno() { return fileversionno; } public void setfileversionno(string fileversionno) { this .fileversionno = fileversionno; } |
接着,模拟业务处理,创建30000个文件对象,线程睡眠1ms,之前设置的1000ms,发现时间很长,整个eclipse卡掉了,所以将时间改为了1ms。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
public class test { private static list<fileinfo> filelist= new arraylist<fileinfo>(); public static void main(string[] args) throws interruptedexception { createfileinfo(); long starttime=system.currenttimemillis(); for (fileinfo fi:filelist){ thread.sleep( 1 ); } long endtime=system.currenttimemillis(); system.out.println( "单线程耗时:" +(endtime-starttime)+ "ms" ); } private static void createfileinfo(){ for ( int i= 0 ;i< 30000 ;i++){ filelist.add( new fileinfo( "身份证正面照" , "jpg" , "101522" , "md5" +i, "1" )); } } } |
测试结果如下:
可以看到,生成30000个文件对象消耗的时间比较长,接近1分钟,效率比较低。
2) threadpool (线程池) +countdownlatch (程序计数器)
顾名思义,countdownlatch为线程计数器,他的执行过程如下:首先,在主线程中调用await()方法,主线程阻塞,然后,将程序计数器作为参数传递给线程对象,最后,每个线程执行完任务后,调用countdown()方法表示完成任务。countdown()被执行多次后,主线程的await()会失效。实现过程如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
public class test2 { private static executorservice executor=executors.newfixedthreadpool( 100 ); private static countdownlatch countdownlatch= new countdownlatch( 100 ); private static list<fileinfo> filelist= new arraylist<fileinfo>(); private static list<list<fileinfo>> list= new arraylist<>(); public static void main(string[] args) throws interruptedexception { createfileinfo(); addlist(); long starttime=system.currenttimemillis(); int i= 0 ; for (list<fileinfo> fi:list){ executor.submit( new filerunnable(countdownlatch,fi,i)); i++; } countdownlatch.await(); long endtime=system.currenttimemillis(); executor.shutdown(); system.out.println(i+ "个线程耗时:" +(endtime-starttime)+ "ms" ); } private static void createfileinfo(){ for ( int i= 0 ;i< 30000 ;i++){ filelist.add( new fileinfo( "身份证正面照" , "jpg" , "101522" , "md5" +i, "1" )); } } private static void addlist(){ for ( int i= 0 ;i< 100 ;i++){ list.add(filelist); } } } |
filerunnable类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
/** * 多线程处理 * @author wangsj * * @param <t> */ public class filerunnable<t> implements runnable { private countdownlatch countdownlatch; private list<t> list; private int i; public filerunnable(countdownlatch countdownlatch, list<t> list, int i) { super (); this .countdownlatch = countdownlatch; this .list = list; this .i = i; } @override public void run() { for (t t:list){ try { thread.sleep( 1 ); } catch (interruptedexception e) { e.printstacktrace(); } countdownlatch.countdown(); } } } |
测试结果如下:
3) fork/join 框架
jdk从版本7开始,出现了fork/join框架,从字面来理解,fork就是拆分,join就是合并,所以,该框架的思想就是。通过fork拆分任务,然后join来合并拆分后各个人物执行完毕后的结果并汇总。比如,我们要计算连续相加的几个数,2+4+5+7=?,我们利用fork/join框架来怎么完成呢,思想就是拆分子任务,我们可以把这个运算拆分为两个子任务,一个计算2+4,另一个计算5+7,这是fork的过程,计算完成后,把这两个子任务计算的结果汇总,得到总和,这是join的过程。
fork/join框架执行思想:首先,分割任务,使用fork类将大任务分割为若干子任务,这个分割过程需要按照实际情况来定,直到分割出的任务足够小。然后,join类执行任务,分割的子任务在不同的队列里,几个线程分别从队列里获取任务并执行,执行完的结果放到一个单独的队列里,最后,启动线程,队列里拿取结果并合并结果。
使用fork/join框架要用到几个类,关于类的使用方式可以参考jdk的api,使用该框架,首先需要继承forkjointask类,通常,只需要继承他的子类recursivetask或recursiveaction即可,recursivetask,用于有返回结果的场景,recursiveaction用于没有返回结果的场景。forkjointask的执行需要用到forkjoinpool来执行,该类用于维护分割出的子任务添加到不同的任务队列。
下面是实现代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
|
public class test3 { private static list<fileinfo> filelist= new arraylist<fileinfo>(); // private static forkjoinpool forkjoinpool=new forkjoinpool(100); // private static job<fileinfo> job=new job<>(filelist.size()/100, filelist); public static void main(string[] args) { createfileinfo(); long starttime=system.currenttimemillis(); forkjoinpool forkjoinpool= new forkjoinpool( 100 ); //分割任务 job<fileinfo> job= new job<>(filelist.size()/ 100 , filelist); //提交任务返回结果 forkjointask<integer> fjtresult=forkjoinpool.submit(job); //阻塞 while (!job.isdone()){ system.out.println( "任务完成!" ); } long endtime=system.currenttimemillis(); system.out.println( "fork/join框架耗时:" +(endtime-starttime)+ "ms" ); } private static void createfileinfo(){ for ( int i= 0 ;i< 30000 ;i++){ filelist.add( new fileinfo( "身份证正面照" , "jpg" , "101522" , "md5" +i, "1" )); } } } /** * 执行任务类 * @author wangsj * */ public class job<t> extends recursivetask<integer> { private static final long serialversionuid = 1l; private int count; private list<t> joblist; public job( int count, list<t> joblist) { super (); this .count = count; this .joblist = joblist; } /** * 执行任务,类似于实现runnable接口的run方法 */ @override protected integer compute() { //拆分任务 if (joblist.size()<=count){ executejob(); return joblist.size(); } else { //继续创建任务,直到能够分解执行 list<recursivetask< long >> fork = new linkedlist<recursivetask< long >>(); //拆分子任务,这里采用二分法 int countjob=joblist.size()/ 2 ; list<t> leftlist=joblist.sublist( 0 , countjob); list<t> rightlist=joblist.sublist(countjob, joblist.size()); //分配任务 job leftjob= new job<>(count,leftlist); job rightjob= new job<>(count,rightlist); //执行任务 leftjob.fork(); rightjob.fork(); return integer.parseint(leftjob.join().tostring()) +integer.parseint(rightjob.join().tostring()); } } /** * 执行任务方法 */ private void executejob() { for (t job:joblist){ try { thread.sleep( 1 ); } catch (interruptedexception e) { e.printstacktrace(); } } } |
测试结果如下:
4) jdk8 并行流
并行流是jdk8的新特性之一,思想就是将一个顺序执行的流变为一个并发的流,通过调用parallel()方法来实现。并行流将一个流分成多个数据块,用不同的线程来处理不同的数据块的流,最后合并每个块数据流的处理结果,类似于fork/join框架。
并行流默认使用的是公共线程池forkjoinpool,他的线程数是使用的默认值,根据机器的核数,我们可以适当调整线程数的大小。线程数的调整通过以下方式来实现。
1
|
system.setproperty( "java.util.concurrent.forkjoinpool.common.parallelism" , "100" ); |
以下是代码的实现过程,非常简单:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
public class test4 { private static list<fileinfo> filelist= new arraylist<fileinfo>(); public static void main(string[] args) { // system.setproperty("java.util.concurrent.forkjoinpool.common.parallelism", "100"); createfileinfo(); long starttime=system.currenttimemillis(); filelist.parallelstream().foreach(e ->{ try { thread.sleep( 1 ); } catch (interruptedexception f) { f.printstacktrace(); } }); long endtime=system.currenttimemillis(); system.out.println( "jdk8并行流耗时:" +(endtime-starttime)+ "ms" ); } private static void createfileinfo(){ for ( int i= 0 ;i< 30000 ;i++){ filelist.add( new fileinfo( "身份证正面照" , "jpg" , "101522" , "md5" +i, "1" )); } } } |
下面是测试,第一次没有设置线程池的数量,采用默认,测试结果如下:
我们看到,结果并不是很理想,耗时较长,接下来设置线程池的数量大小,即添加如下代码:
1
|
system.setproperty( "java.util.concurrent.forkjoinpool.common.parallelism" , "100" ); |
接着进行测试,结果如下:
这次耗时较小,比较理想。
三、总结
综上几种情况来看,以单线程作为参考,耗时最长的还是原生的fork/join框架,这里边尽管配置了线程池的数量,但效果较精确配置了线程池数量的jdk8并行流较差。并行流实现代码简单易懂,不需要我们写多余的for循环,一个parallelstream方法全部搞定,代码量大大的减少了,其实,并行流的底层还是使用的fork/join框架,这就要求我们在开发的过程中灵活使用各种技术,分清各种技术的优缺点,从而能够更好的为我们服务。
原文链接:http://www.cnblogs.com/10158wsj/p/8338367.html