前言
控制并发流程的工具类,作用就是帮助我们程序员更容易的让线程之间合作,让线程之间相互配合来满足业务逻辑。比如让线程A等待线程B执行完毕后再执行等合作策略。
控制并发流程的工具类主要有:
简介
Semaphore 信号量,许可,用于控制在一段时间内,可并发访问执行的线程数量。它的作用是控制访问特定资源的线程数目,底层依赖AQS的状态State,是在生产当中比较常用的一个工具类。
关于 AQS,可以查看《并发编程之抽象队列同步器AQS应用ReentrantLock》
一个信号量有且仅有 3 种操作,且它们全部是原子的。
- 初始化、增加和减少。
- 增加可以为一个进程解除阻塞。
- 减少可以让一个进程进入阻塞。
Semaphore 管理一系列许可证。
- 每个 acquire() 方法阻塞,直到有一个许可证可以获得然后拿走一个许可证。
- 每个 release() 方法增加一个许可证,这可能会释放一个阻塞的 acquire() 方法。
- 不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动。
Semaphore 在计数器不为 0 的时候对线程就放行,一旦达到 0,那么所有请求资源的新线程都会被阻塞,包括增加请求到许可的线程,Semaphore 是不可重入的。
- 每一次请求一个许可都会导致计数器减少 1,同样每次释放一个许可都会导致计数器增加 1,一旦达到 0,新的许可请求线程将被挂起。
Semaphore 有两种模式,公平模式 和 非公平模式 ,默认是非公平模式。
- 公平模式就是调用 acquire 的顺序就是获取许可证的顺序,遵循 FIFO。
- 非公平模式是抢占式的,也就是有可能一个新的获取线程恰好在一个许可证释放时得到了这个许可证,而前面还有等待的线程。
应用场景
Semaphore可以用来做流量限制,特别是公共资源有限的应用场景,比如说数据库连接。
由于 release() 释放许可时,未对释放许可数做限制,所有可以通过该方法增加总的许可数量; reducePermits() 方法可以减少总的许可数量,通过这两个方法可以到达动态调整许可的
分析:假如有一个需求,需读取几个万个文件的数据,因为都是IO密集型,我们可以启动几十个线程并发的读取,但是如果读取到内存后,还需要存储到数据库,而数据库的连接数只有10个,这时候我们就必须要控制只有10个线程同时获取到数据库连接,否则会抛出异常提示无法连接数据库。针对这种情况,我们就可以使用Semaphore来做流量控制。
代码如下:
- package com.niuh.tools;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.Semaphore;
- import java.util.concurrent.TimeUnit;
- /**
- * <p>
- * Semaphore示例
- * </p>
- */
- public class SemaphoreRunner {
- /**
- * 线程数量
- */
- private static final int THREAD_COUNT = 30;
- /**
- * 线程池
- */
- private static ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
- private static Semaphore semaphore = new Semaphore(10);
- public static void main(String[] args) {
- for (int i = 0; i < THREAD_COUNT; i++) {
- executor.execute(new Runnable() {
- public void run() {
- try {
- // 获取一个"许可证"
- semaphore.acquire();
- // 模拟数据保存
- TimeUnit.SECONDS.sleep(2);
- System.out.println("save date...");
- // 执行完后,归还"许可证"
- semaphore.release();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });
- }
- executor.shutdown();
- }
- }
源码分析
Semaphore 类图
- Semaphore 通过使用内部类 Syn 继承 AQS 实现。
其内部主要变量和方法如下:
框架流程图如下:
构造函数
- permits 表示许可线程的数量
- fair 表示公平性,如果这个设为 true 的话,下次执行的线程会是等待最久的线程
- public Semaphore(int permits) {
- sync = new NonfairSync(permits);
- }
- /**
- * @param permits 总许可数
- * @param fair fair=true 公平锁 fair=false 非公平锁
- */
- public Semaphore(int permits, boolean fair) {
- sync = fair ? new FairSync(permits) : new NonfairSync(permits);
- }
内部类同步器
- abstract static class Sync extends AbstractQueuedSynchronizer {
- private static final long serialVersionUID = 1192457210091910933L;
- // 赋值setState为总许可数
- Sync(int permits) {
- setState(permits);
- }
- // 剩余许可数
- final int getPermits() {
- return getState();
- }
- // 自旋 + CAS 非公平获取
- final int nonfairTryAcquireShared(int acquires) {
- for (;;) {
- // 剩余可用许可数
- int available = getState();
- // 本次获取许可后,剩余许可
- int remaining = available - acquires;
- // 如果获取后,剩余许可大于0,则CAS更新剩余许可,否则获取更新失败
- if (remaining < 0 ||
- compareAndSetState(available, remaining))
- return remaining;
- }
- }
- // 自旋 + CAS 释放许可
- // 由于未对释放许可数做限制,所以可以通过release动态增加许可数量
- protected final boolean tryReleaseShared(int releases) {
- for (;;) {
- // 当前剩余许可
- int current = getState();
- // 许可可更新值
- int next = current + releases;
- // 如果许可更新值为负数,说明许可数量益处,抛出错误
- if (next < current) // overflow
- throw new Error("Maximum permit count exceeded");
- // CAS更新许可数量
- if (compareAndSetState(current, next))
- return true;
- }
- }
- // 自旋 + CAS 减少许可数量
- final void reducePermits(int reductions) {
- for (;;) {
- // 当前剩余许可
- int current = getState();
- // 更新值
- int next = current - reductions;
- // 如果更新值比当前剩余许可大,抛出益处
- if (next > current) // underflow
- throw new Error("Permit count underflow");
- // CAS 更新许可数
- if (compareAndSetState(current, next))
- return;
- }
- }
- // 丢弃所有许可
- final int drainPermits() {
- for (;;) {
- int current = getState();
- if (current == 0 || compareAndSetState(current, 0))
- return current;
- }
- }
- }
非公平模式
- /**
- * 非公平模式
- */
- static final class NonfairSync extends Sync {
- private static final long serialVersionUID = -2694183684443567898L;
- NonfairSync(int permits) {
- super(permits);
- }
- protected int tryAcquireShared(int acquires) {
- return nonfairTryAcquireShared(acquires);
- }
- }
公平模式
- /**
- * 公平模式
- */
- static final class FairSync extends Sync {
- private static final long serialVersionUID = 2014338818796000944L;
- FairSync(int permits) {
- super(permits);
- }
- // 公平模式获取许可
- // 公平模式不论许可是否充足,都会判断同步队列中是否线程在等待,如果有,获取失败,排队阻塞
- protected int tryAcquireShared(int acquires) {
- for (;;) {
- // 如果有线程在排队,立即返回
- if (hasQueuedPredecessors())
- return -1;
- // 自旋 + CAS获取许可
- int available = getState();
- int remaining = available - acquires;
- if (remaining < 0 ||
- compareAndSetState(available, remaining))
- return remaining;
- }
- }
- }
获取许可
Semaphore 提供了两种获取资源的方式。
- 响应中断 和 不响应中断。
响应中断获取资源
两个方法支持 Interrupt 中断机制,可使用 acquire() 方法每次获取一个信号量,也可以使用 acquire(int permits) 方法获取指定数量的信号量 。
从semaphore中获取一个许可,线程会一直被阻塞直到获取一个许可或是被中断,获取一个许可后立即返回,并把许可数减1,如果没有可用的许可,当前线程会处于休眠状态直到:
- 某些其他线程调用release方法,并且当前线程是下一个要被分配许可的线程
- 某些其他线程中断当前线程
如果当前线程被acquire方法使得中断状态设置为on或者在等待许可时被中断则抛出InterruptedException,并且清除当前线程的已中断状态。
acquire执行流程:
- public void acquire() throws InterruptedException {
- sync.acquireSharedInterruptibly(1);
- }
- public void acquire(int permits) throws InterruptedException {
- if (permits < 0) throw new IllegalArgumentException();
- sync.acquireSharedInterruptibly(permits);
- }
- public final void acquireSharedInterruptibly(int arg)
- throws InterruptedException {
- if (Thread.interrupted())
- throw new InterruptedException();
- // 获取许可,剩余许可 >= 0,则获取许可成功 <0 获取许可失败,进入排队
- if (tryAcquireShared(arg) < 0)
- doAcquireSharedInterruptibly(arg);
- }
- /**
- * 获取许可失败,当前线程进入同步队列,排队阻塞
- * @param arg the acquire argument
- */
- private void doAcquireSharedInterruptibly(int arg)
- throws InterruptedException {
- // 创建同步队列节点,并入队列
- final Node node = addWaiter(Node.SHARED);
- boolean failed = true;
- try {
- for (;;) {
- // 如果当前节点是第二个节点,尝试获取锁
- final Node p = node.predecessor();
- if (p == head) {
- int r = tryAcquireShared(arg);
- if (r >= 0) {
- setHeadAndPropagate(node, r);
- p.next = null; // help GC
- failed = false;
- return;
- }
- }
- // 阻塞当前线程
- if (shouldParkAfterFailedAcquire(p, node) &&
- parkAndCheckInterrupt())
- throw new InterruptedException();
- }
- } finally {
- if (failed)
- cancelAcquire(node);
- }
- }
代码的执行步骤如下:
AQS 子类使用共享模式,需要实现 tryAcquireShared() 方法。
- 在公平锁中还是与ReentrantLock中的操作一样,先判断同步队列中是不是还有其他的等待线程,否则直接返回失败。否则对 state 值进行减操作并返回剩下的信号量。
- 非公平锁直接调用了父类中的 nonfairTryAcquireShared 和 ReentrantLock 一样。
- // 非公平锁的获取方式
- protected int tryAcquireShared(int acquires) {
- return nonfairTryAcquireShared(acquires);
- }
- final int nonfairTryAcquireShared(int acquires) {
- for (;;) {
- int available = getState();//获取去中的信号量数
- int remaining = available - acquires;//剩余信号量数
- //1.信号量数大于0,获取共享锁,并设置执行compareAndSetState(available, remaining),返回剩余信号量数
- //2.信号量数小于等于0,直接返回负数
- if (remaining < 0 || compareAndSetState(available, remaining))
- return remaining;
- }
- }
- // 公平锁获取
- protected int tryAcquireShared(int acquires) {
- for (;;) {
- if (hasQueuedPredecessors())
- return -1;
- int available = getState();
- int remaining = available - acquires;
- if (remaining < 0 || compareAndSetState(available, remaining))
- return remaining;
- }
- }
变量 state 采用 volatile 可见修饰。
- /**
- * The synchronization state.
- */
- private volatile int state;
- /**
- * Returns the current value of synchronization state.
- * This operation has memory semantics of a <tt>volatile</tt> read.
- * @return current state value
- */
- protected final int getState() {
- return state;
- }
不响应中断获取资源
两个方法不响应 Interrupt 中断机制,其它功能与 acquire() 方法一致。
从semaphore中获取一个许可,线程会一直被阻塞直到获取一个许可或是被中断,获取一个许可后立即返回,并把许可数减1,如果没有可用的许可,当前线程会处于休眠状态直到:
- 某些其他线程调用release方法,并且当前线程是下一个要被分配许可的线程;
- 如果当前线程在等待许可时被中断,那么它会接着等待,但是与没有发生中断相比,为线程分配许可的时间可能改变。
- public void acquireUninterruptibly() {
- sync.acquireShared(1);
- }
- public void acquireUninterruptibly(int permits) {
- if (permits < 0) throw new IllegalArgumentException();
- sync.acquireShared(permits);
- }
尝试获得信号量
尝试获得信号量有三个方法。
- 尝试获取信号量,如果获取成功则返回 true,否则马上返回 false,不会阻塞当前线程。
- 尝试获取信号量,如果在指定的时间内获得信号量,则返回 true,否则返回 false。
- 尝试获取指定数量的信号量,如果在指定的时间内获得信号量,则返回 true,否则返回 false。
- public boolean tryAcquire() {
- return sync.nonfairTryAcquireShared(1) >= 0;
- }
- public boolean tryAcquire(long timeout, TimeUnit unit)
- throws InterruptedException {
- return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
- }
- public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
- throws InterruptedException {
- if (permits < 0) throw new IllegalArgumentException();
- return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
- }
释放归还许可
release 方法,主要作用是释放资源,需要保证 release 的执行,否则线程退出但是资源没有释放。
- 一般代码写在 finally 中是最好的。
- 并且获取多少资源就要释放多少资源,否则还是资源没被正确释放,如果一开始执行了 acquire(10) 最后释放的时候不能只写一个 release() 而是 release(10) 才对。
- // 尝试释放锁
- public final boolean release(int arg) {
- // 如果释放锁成功 唤醒同步队列中的后继节点
- if (tryRelease(arg)) {
- Node h = head;
- if (h != null && h.waitStatus != 0)
- unparkSuccessor(h);
- return true;
- }
- return false;
- }
- // 为了方便对比把两个代码放在一块 可以看到 release 中的结构完全一样
- // 区别就在于 doReleaseShared 中有更多的判断操作
- public final boolean releaseShared(int arg) {
- if (tryReleaseShared(arg)) {
- doReleaseShared(); //在里面执行的 unparkSuccessor(h)
- return true;
- }
- return false;
- }
子类实现共享模式的类需要实现 tryReleaseShared() 方法判断是否释放成功。
- 这个方法是一个 CAS 自旋,原因是因为 Semaphore 是一个共享锁,可能有多个线程同时释放资源,因此 CAS 操作可能失败。
- // 由于未对释放许可数做限制,所以可以通过release动态增加许可数量
- protected final boolean tryReleaseShared(int releases) {
- for (;;) {
- //获取当前许可数量
- int current = getState();
- //计算回收后的数量
- int next = current + releases;
- if (next < current) // overflow
- throw new Error("Maximum permit count exceeded");
- //CAS改变许可数量成功,返回true
- if (compareAndSetState(current, next))
- return true;
- }
- }
一旦 CAS 改变许可数量成功,就调用 doReleaseShared() 方法释放阻塞的线程。
- private void doReleaseShared() {
- // 自旋,唤醒等待的第一个线程(其它线程将由第一个线程向后传递唤醒)
- for (;;) {
- Node h = head;
- if (h != null && h != tail) {
- int ws = h.waitStatus;
- if (ws == Node.SIGNAL) {
- if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
- continue; // loop to recheck cases
- // 唤醒第一个等待线程
- unparkSuccessor(h);
- }
- else if (ws == 0 &&
- !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
- continue; // loop on failed CAS
- }
- if (h == head) // loop if head changed
- break;
- }
- }
其他方法
获取当前剩余的信号量数量
- 该方法返回 AQS 中 state 变量的值,当前剩余的信号量个数。
- public int availablePermits() {
- return sync.getPermits();
- }
- // Sync
- final int getPermits() {
- return getState();
- }
耗尽许可数量
- 获取并返回立即可用的所有许可。
- Sync 类的drainPermits()方法,获取 1 个信号量后将可用的信号量个数置为 0。例如总共有 10 个信号量,已经使用了 5 个,再调用 drainPermits() 方法后,可以获得一个信号量,剩余 4 个信号量就消失了,总共可用的信号量就变成 6 个了。用 CAS 自旋将剩余资源清空。
- public int drainPermits() {
- return sync.drainPermits();
- }
- // Sync
- final int drainPermits() {
- for (;;) {
- int current = getState();
- if (current == 0 || compareAndSetState(current, 0))
- return current;
- }
- }
缩减许可数量
- 缩减必须是单向的,即只能减少不能增加。用 CAS 自旋在剩余共享资源上做缩减。
- protected void reducePermits(int reduction) {
- if (reduction < 0) throw new IllegalArgumentException();
- sync.reducePermits(reduction);
- }
- // Sync
- final void reducePermits(int reductions) {
- for (;;) {
- int current = getState();
- int next = current - reductions;
- if (next > current) // underflow
- throw new Error("Permit count underflow");
- if (compareAndSetState(current, next))
- return;
- }
- }
上述两个方法对共享资源数量的修改操作有两点需要注意
- 是不可逆的
- 是对剩余资源的操作而不是全部资源,当剩余资源数目不足或已经为 0 时,方法就返回。
- 正在被占用的资源不参与。
判断 AQS 同步队列中是否还有 Node
- public final boolean hasQueuedThreads() {
- return sync.hasQueuedThreads();
- }
- // AbstractQueuedSynchronizer
- public final boolean hasQueuedThreads() {
- //头结点不等于尾节点就说明链表中还有元素
- return head != tail;
- }
总结
Semaphore 的内部工作流程也是基于 AQS,不同于 CyclicBarrier 和 ReentrantLock,不会使用到 AQS 的条件队列,都是在同步队列中操作,只是当前线程会被 park。
Semaphore 是 JUC 包提供的一个典型的共享锁,它通过自定义两种不同的同步器(FairSync 和 NonfairSync)提供了公平和非公平两种工作模式,两种模式下分别提供了限时/不限时、响应中断/不响应中断的获取资源的方法(限时获取总是及时响应中断的),而所有的释放资源的 release() 操作是统一的。
PS:以上代码提交在 Github :
https://github.com/Niuh-Study/niuh-juc-final.git