下面通过实例代码为大家介绍Java线程池的几种实现方法和区别:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
|
import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class TestThreadPool { // -newFixedThreadPool与cacheThreadPool差不多,也是能reuse就用,但不能随时建新的线程 // -其独特之处:任意时间点,最多只能有固定数目的活动线程存在,此时如果有新的线程要建立,只能放在另外的队列中等待,直到当前的线程中某个线程终止直接被移出池子 // -和cacheThreadPool不同,FixedThreadPool没有IDLE机制(可能也有,但既然文档没提,肯定非常长,类似依赖上层的TCP或UDP // IDLE机制之类的),所以FixedThreadPool多数针对一些很稳定很固定的正规并发线程,多用于服务器 // -从方法的源代码看,cache池和fixed 池调用的是同一个底层池,只不过参数不同: // fixed池线程数固定,并且是0秒IDLE(无IDLE) // cache池线程数支持0-Integer.MAX_VALUE(显然完全没考虑主机的资源承受能力),60秒IDLE private static ExecutorService fixedService = Executors.newFixedThreadPool( 6 ); // -缓存型池子,先查看池中有没有以前建立的线程,如果有,就reuse.如果没有,就建一个新的线程加入池中 // -缓存型池子通常用于执行一些生存期很短的异步型任务 // 因此在一些面向连接的daemon型SERVER中用得不多。 // -能reuse的线程,必须是timeout IDLE内的池中线程,缺省timeout是60s,超过这个IDLE时长,线程实例将被终止及移出池。 // 注意,放入CachedThreadPool的线程不必担心其结束,超过TIMEOUT不活动,其会自动被终止。 private static ExecutorService cacheService = Executors.newCachedThreadPool(); // -单例线程,任意时间池中只能有一个线程 // -用的是和cache池和fixed池相同的底层池,但线程数目是1-1,0秒IDLE(无IDLE) private static ExecutorService singleService = Executors.newSingleThreadExecutor(); // -调度型线程池 // -这个池子里的线程可以按schedule依次delay执行,或周期执行 private static ExecutorService scheduledService = Executors.newScheduledThreadPool( 10 ); public static void main(String[] args) { DateFormat format = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss" ); List<Integer> customerList = new ArrayList<Integer>(); System.out.println(format.format( new Date())); testFixedThreadPool(fixedService, customerList); System.out.println( "--------------------------" ); testFixedThreadPool(fixedService, customerList); fixedService.shutdown(); System.out.println(fixedService.isShutdown()); System.out.println( "----------------------------------------------------" ); testCacheThreadPool(cacheService, customerList); System.out.println( "----------------------------------------------------" ); testCacheThreadPool(cacheService, customerList); cacheService.shutdownNow(); System.out.println( "----------------------------------------------------" ); testSingleServiceThreadPool(singleService, customerList); testSingleServiceThreadPool(singleService, customerList); singleService.shutdown(); System.out.println( "----------------------------------------------------" ); testScheduledServiceThreadPool(scheduledService, customerList); testScheduledServiceThreadPool(scheduledService, customerList); scheduledService.shutdown(); } public static void testScheduledServiceThreadPool(ExecutorService service, List<Integer> customerList) { List<Callable<Integer>> listCallable = new ArrayList<Callable<Integer>>(); for ( int i = 0 ; i < 10 ; i++) { Callable<Integer> callable = new Callable<Integer>() { @Override public Integer call() throws Exception { return new Random().nextInt( 10 ); } }; listCallable.add(callable); } try { List<Future<Integer>> listFuture = service.invokeAll(listCallable); for (Future<Integer> future : listFuture) { Integer id = future.get(); customerList.add(id); } } catch (Exception e) { e.printStackTrace(); } System.out.println(customerList.toString()); } public static void testSingleServiceThreadPool(ExecutorService service, List<Integer> customerList) { List<Callable<List<Integer>>> listCallable = new ArrayList<Callable<List<Integer>>>(); for ( int i = 0 ; i < 10 ; i++) { Callable<List<Integer>> callable = new Callable<List<Integer>>() { @Override public List<Integer> call() throws Exception { List<Integer> list = getList( new Random().nextInt( 10 )); boolean isStop = false ; while (list.size() > 0 && !isStop) { System.out.println(Thread.currentThread().getId() + " -- sleep:1000" ); isStop = true ; } return list; } }; listCallable.add(callable); } try { List<Future<List<Integer>>> listFuture = service.invokeAll(listCallable); for (Future<List<Integer>> future : listFuture) { List<Integer> list = future.get(); customerList.addAll(list); } } catch (Exception e) { e.printStackTrace(); } System.out.println(customerList.toString()); } public static void testCacheThreadPool(ExecutorService service, List<Integer> customerList) { List<Callable<List<Integer>>> listCallable = new ArrayList<Callable<List<Integer>>>(); for ( int i = 0 ; i < 10 ; i++) { Callable<List<Integer>> callable = new Callable<List<Integer>>() { @Override public List<Integer> call() throws Exception { List<Integer> list = getList( new Random().nextInt( 10 )); boolean isStop = false ; while (list.size() > 0 && !isStop) { System.out.println(Thread.currentThread().getId() + " -- sleep:1000" ); isStop = true ; } return list; } }; listCallable.add(callable); } try { List<Future<List<Integer>>> listFuture = service.invokeAll(listCallable); for (Future<List<Integer>> future : listFuture) { List<Integer> list = future.get(); customerList.addAll(list); } } catch (Exception e) { e.printStackTrace(); } System.out.println(customerList.toString()); } public static void testFixedThreadPool(ExecutorService service, List<Integer> customerList) { List<Callable<List<Integer>>> listCallable = new ArrayList<Callable<List<Integer>>>(); for ( int i = 0 ; i < 10 ; i++) { Callable<List<Integer>> callable = new Callable<List<Integer>>() { @Override public List<Integer> call() throws Exception { List<Integer> list = getList( new Random().nextInt( 10 )); boolean isStop = false ; while (list.size() > 0 && !isStop) { System.out.println(Thread.currentThread().getId() + " -- sleep:1000" ); isStop = true ; } return list; } }; listCallable.add(callable); } try { List<Future<List<Integer>>> listFuture = service.invokeAll(listCallable); for (Future<List<Integer>> future : listFuture) { List<Integer> list = future.get(); customerList.addAll(list); } } catch (Exception e) { e.printStackTrace(); } System.out.println(customerList.toString()); } public static List<Integer> getList( int x) { List<Integer> list = new ArrayList<Integer>(); list.add(x); list.add(x * x); return list; } } |
使用:LinkedBlockingQueue实现线程池讲解
1
2
3
4
5
6
7
8
9
10
11
|
//例如:corePoolSize=3,maximumPoolSize=6,LinkedBlockingQueue(10) //RejectedExecutionHandler默认处理方式是:ThreadPoolExecutor.AbortPolicy //ThreadPoolExecutor executorService = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 1L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10)); //1.如果线程池中(也就是调用executorService.execute)运行的线程未达到LinkedBlockingQueue.init(10)的话,当前执行的线程数是:corePoolSize(3) //2.如果超过了LinkedBlockingQueue.init(10)并且超过的数>=init(10)+corePoolSize(3)的话,并且小于init(10)+maximumPoolSize. 当前启动的线程数是:(当前线程数-init(10)) //3.如果调用的线程数超过了init(10)+maximumPoolSize 则根据RejectedExecutionHandler的规则处理。 |
关于:RejectedExecutionHandler几种默认实现讲解
1
2
3
4
5
6
7
8
|
//默认使用:ThreadPoolExecutor.AbortPolicy,处理程序遭到拒绝将抛出运行时RejectedExecutionException。 RejectedExecutionHandler policy= new ThreadPoolExecutor.AbortPolicy(); // //在 ThreadPoolExecutor.CallerRunsPolicy 中,线程调用运行该任务的execute本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。 // policy=new ThreadPoolExecutor.CallerRunsPolicy(); // //在 ThreadPoolExecutor.DiscardPolicy 中,不能执行的任务将被删除。 // policy=new ThreadPoolExecutor.DiscardPolicy(); // //在 ThreadPoolExecutor.DiscardOldestPolicy 中,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。 // policy=new ThreadPoolExecutor.DiscardOldestPolicy(); |
希望本篇文章对您有所帮助
原文链接:http://www.2cto.com/kf/201605/511060.html