前言
单实例的并发控制,主要是针对JVM内,我们常规的手段即可满足需求,常见的手段大概有下面这些
- 同步代码块
- CAS自旋
- 锁
- 阻塞队列,令牌桶等
1.1 同步代码块
通过同步代码块,来确保同一时刻只会有一个线程执行对应的业务逻辑,常见的使用姿势如下
1
2
3
|
public synchronized doProcess() { // 同步代码块,只会有一个线程执行 } |
一般推荐使用最小区间使用原则,尽量不要直接在方法上加synchronized,比如经典的双重判定单例模式
1
2
3
4
5
6
7
8
9
10
11
12
|
public class Single { private static volatile Single instance; private Single() {} public static Single getInstance() { if (instance == null ) { synchronized (Single. class ) { if (instance == null ) instance = new Single(); } } return instance; } } |
1.2 CAS自旋方式
比如AtomicXXX原子类中的很多实现,就是借助unsafe的CAS来实现的,如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
public final int getAndIncrement() { return unsafe.getAndAddInt( this , valueOffset, 1 ); } // unsafe 实现 // cas + 自选,不断的尝试更新设置,直到成功为止 public final int getAndAddInt(Object var1, long var2, int var4) { int var5; do { var5 = this .getIntVolatile(var1, var2); } while (! this .compareAndSwapInt(var1, var2, var5, var5 + var4)); return var5; } |
1.3 锁
jdk本身提供了不少的锁,为了实现单实例的并发控制,我们需要选择写锁;如果支持多读,单实例写,则可以考虑读写锁;一般使用姿势也比较简单
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
private void doSome(ReentrantReadWriteLock.WriteLock writeLock) { try { writeLock.lock(); System.out.println( "持有锁成功 " + Thread.currentThread().getName()); Thread.sleep( 1000 ); System.out.println( "执行完毕! " + Thread.currentThread().getName()); writeLock.unlock(); } catch (Exception e) { e.printStackTrace(); } } @Test public void lock() throws InterruptedException { ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock(); new Thread(()->doSome(reentrantReadWriteLock.writeLock())).start(); new Thread(()->doSome(reentrantReadWriteLock.writeLock())).start(); new Thread(()->doSome(reentrantReadWriteLock.writeLock())).start(); Thread.sleep( 20000 ); } |
1.4 阻塞队列
借助同步阻塞队列,也可以实现并发控制的效果,比如队列中初始化n个元素,每次消费从队列中获取一个元素,如果拿不到则阻塞;执行完毕之后,重新塞入一个元素,这样就可以实现一个简单版的并发控制
demo版演示,下面指定队列长度为2,表示最大并发数控制为2;设置为1时,可以实现单线程的访问控制
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
AtomicInteger cnt = new AtomicInteger(); private void consumer(LinkedBlockingQueue<Integer> queue) { try { // 同步阻塞拿去数据 int val = queue.take(); Thread.sleep( 2000 ); System.out.println( "成功拿到: " + val + " Thread: " + Thread.currentThread()); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 添加数据 System.out.println( "结束 " + Thread.currentThread()); queue.offer(cnt.getAndAdd( 1 )); } } @Test public void blockQueue() throws InterruptedException { LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>( 2 ); queue.add(cnt.getAndAdd( 1 )); queue.add(cnt.getAndAdd( 1 )); new Thread(() -> consumer(queue)).start(); new Thread(() -> consumer(queue)).start(); new Thread(() -> consumer(queue)).start(); new Thread(() -> consumer(queue)).start(); Thread.sleep( 10000 ); } |
1.5 信号量Semaphore
上面队列的实现方式,可以使用信号量Semaphore来完成,通过设置信号量,来控制并发数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
private void semConsumer(Semaphore semaphore) { try { //同步阻塞,尝试获取信号 semaphore.acquire( 1 ); System.out.println( "成功拿到信号,执行: " + Thread.currentThread()); Thread.sleep( 2000 ); System.out.println( "执行完毕,释放信号: " + Thread.currentThread()); semaphore.release( 1 ); } catch (Exception e) { e.printStackTrace(); } } @Test public void semaphore() throws InterruptedException { Semaphore semaphore = new Semaphore( 2 ); new Thread(() -> semConsumer(semaphore)).start(); new Thread(() -> semConsumer(semaphore)).start(); new Thread(() -> semConsumer(semaphore)).start(); new Thread(() -> semConsumer(semaphore)).start(); new Thread(() -> semConsumer(semaphore)).start(); Thread.sleep(20_000); } |
1.6 计数器CountDownLatch
计数,应用场景更偏向于多线程的协同,比如多个线程执行完毕之后,再处理某些事情;不同于上面的并发数的控制,它和栅栏一样,更多的是行为结果的统一
这种场景下的使用姿势一般如下
重点:countDownLatch 计数为0时放行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
@Test public void countDown() throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch( 2 ); new Thread(() -> { try { System.out.println( "do something in " + Thread.currentThread()); Thread.sleep( 2000 ); } catch (InterruptedException e) { e.printStackTrace(); } finally { countDownLatch.countDown(); } }).start(); new Thread(() -> { try { System.out.println( "do something in t2: " + Thread.currentThread()); Thread.sleep( 1000 ); } catch (InterruptedException e) { e.printStackTrace(); } finally { countDownLatch.countDown(); } }).start(); countDownLatch.await(); System.out.printf( "结束" ); } |
1.7 栅栏 CyclicBarrier
CyclicBarrier的作用与上面的CountDownLatch相似,区别在于正向计数+1, 只有达到条件才放行; 且支持通过调用reset()重置计数,而CountDownLatch则不行
一个简单的demo
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
private void cyclicBarrierLogic(CyclicBarrier barrier, long sleep) { // 等待达到条件才放行 try { System.out.println( "准备执行: " + Thread.currentThread() + " at: " + LocalDateTime.now()); Thread.sleep(sleep); int index = barrier.await(); System.out.println( "开始执行: " + index + " thread: " + Thread.currentThread() + " at: " + LocalDateTime.now()); } catch (Exception e) { e.printStackTrace(); } } @Test public void testCyclicBarrier() throws InterruptedException { // 到达两个工作线程才能继续往后面执行 CyclicBarrier barrier = new CyclicBarrier( 2 ); // 三秒之后,下面两个线程的才会输出 开始执行 new Thread(() -> cyclicBarrierLogic(barrier, 1000 )).start(); new Thread(() -> cyclicBarrierLogic(barrier, 3000 )).start(); Thread.sleep( 4000 ); // 重置,可以再次使用 barrier.reset(); new Thread(() -> cyclicBarrierLogic(barrier, 1 )).start(); new Thread(() -> cyclicBarrierLogic(barrier, 1 )).start(); Thread.sleep( 10000 ); } |
1.8 guava令牌桶
guava封装了非常简单的并发控制工具类RateLimiter,作为单机的并发控制首选
一个控制qps为2的简单demo如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
private void guavaProcess(RateLimiter rateLimiter) { try { // 同步阻塞方式获取 System.out.println( "准备执行: " + Thread.currentThread() + " > " + LocalDateTime.now()); rateLimiter.acquire(); System.out.println( "执行中: " + Thread.currentThread() + " > " + LocalDateTime.now()); } catch (Exception e) { e.printStackTrace(); } } @Test public void testGuavaRate() throws InterruptedException { // 1s 中放行两个请求 RateLimiter rateLimiter = RateLimiter.create( 2 .0d); new Thread(() -> guavaProcess(rateLimiter)).start(); new Thread(() -> guavaProcess(rateLimiter)).start(); new Thread(() -> guavaProcess(rateLimiter)).start(); new Thread(() -> guavaProcess(rateLimiter)).start(); new Thread(() -> guavaProcess(rateLimiter)).start(); new Thread(() -> guavaProcess(rateLimiter)).start(); new Thread(() -> guavaProcess(rateLimiter)).start(); Thread.sleep(20_000); } |
输出:
准备执行: Thread[Thread-2,5,main] > 2021-04-13T10:18:05.263
准备执行: Thread[Thread-1,5,main] > 2021-04-13T10:18:05.263
准备执行: Thread[Thread-5,5,main] > 2021-04-13T10:18:05.264
准备执行: Thread[Thread-7,5,main] > 2021-04-13T10:18:05.264
准备执行: Thread[Thread-3,5,main] > 2021-04-13T10:18:05.263
准备执行: Thread[Thread-4,5,main] > 2021-04-13T10:18:05.264
准备执行: Thread[Thread-6,5,main] > 2021-04-13T10:18:05.263
执行中: Thread[Thread-2,5,main] > 2021-04-13T10:18:05.267
执行中: Thread[Thread-6,5,main] > 2021-04-13T10:18:05.722
执行中: Thread[Thread-4,5,main] > 2021-04-13T10:18:06.225
执行中: Thread[Thread-3,5,main] > 2021-04-13T10:18:06.721
执行中: Thread[Thread-7,5,main] > 2021-04-13T10:18:07.221
执行中: Thread[Thread-5,5,main] > 2021-04-13T10:18:07.720
执行中: Thread[Thread-1,5,main] > 2021-04-13T10:18:08.219
1.9 滑动窗口TimeWindow
没有找到通用的滑动窗口jar包,一般来讲滑动窗口更适用于平滑的限流,解决瞬时高峰问题
一个供参考的实现方式:
固定大小队列,队列中每个数据代表一个时间段的计数,
访问 -》 队列头拿数据(注意不出队)-》判断是否跨时间段 -》 同一时间段,计数+1 -》跨时间段,新增数据入队,若
扔不进去,表示时间窗满,队尾数据出队
问题:当流量稀疏时,导致不会自动释放过期的数据
解决方案:根据时间段设置定时任务,模拟访问操作,只是将计数改为 + 0
1.10 小结
本文给出了几种单机版的并发控制的技术手段,主要目的是介绍了一些可选的方案,技术细节待后续补全完善,当然如果有其他的建议,欢迎评论交流
到此这篇关于Java中常见的并发控制手段的文章就介绍到这了,更多相关Java并发控制手段内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!
原文链接:https://juejin.cn/post/6995710761787981831