今天要讲的是arrayblockqueue,arrayblockqueue是juc提供的线程安全的有界的阻塞队列,一看到array,第一反应:这货肯定和数组有关,既然是数组,那自然是有界的了,我们先来看看arrayblockqueue的基本使用方法,然后再看看arrayblockqueue的源码。
arrayblockqueue基本使用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
public static void main(string[] args) throws interruptedexception { arrayblockingqueue<integer> arrayblockingqueue= new arrayblockingqueue( 5 ); arrayblockingqueue.offer( 10 ); arrayblockingqueue.offer( 50 ); arrayblockingqueue.add( 20 ); arrayblockingqueue.add( 60 ); system.out.println(arrayblockingqueue); system.out.println(arrayblockingqueue.poll()); system.out.println(arrayblockingqueue); system.out.println(arrayblockingqueue.take()); system.out.println(arrayblockingqueue); system.out.println(arrayblockingqueue.peek()); system.out.println(arrayblockingqueue); } |
运行结果:
- 创建了一个长度为5的arrayblockqueue。
- 用offer方法,向arrayblockqueue添加了两个元素,分别是10,50。
- 用put方法,向arrayblockqueue添加了两个元素,分别是20,60。
- 打印出arrayblockqueue,结果是10,50,20,60。
- 用poll方法,弹出arrayblockqueue第一个元素,并且打印出来:10。
- 打印出arrayblockqueue,结果是50,20,60。
- 用take方法,弹出arrayblockqueue第一个元素,并且打印出来:50。
- 打印出arrayblockqueue,结果是20,60。
- 用peek方法,弹出arrayblockqueue第一个元素,并且打印出来:20。
- 打印出arrayblockqueue,结果是20,60。
代码比较简单,但是你肯定会有疑问
- offer/add(在上面的代码中没有演示)/put都是往队列里面添加元素,区别是什么?
- poll/take/peek都是弹出队列的元素,区别是什么?
- 底层代码是如何保证线程安全的?
- 数据保存在哪里?
要解决上面几个疑问,最好的办法当然是看下源码,通过亲自阅读源码所产生的印象远远要比看视频,看博客,死记硬背最后的结论要深刻的多。就算真的忘记了,只要再看看源码,瞬间可以回忆起来。
arrayblockqueue源码解析
构造方法
arrayblockqueue提供了三个构造方法,如下图所示:
arrayblockingqueue(int capacity)
1
2
3
|
public arrayblockingqueue( int capacity) { this (capacity, false ); } |
这是最常用的构造方法,传入capacity,capacity是容量的意思,也就是arrayblockingqueue的最大长度,方法内部直接调用了第二个构造方法,传入的第二个参数为false。
arrayblockingqueue(int capacity, boolean fair)
1
2
3
4
5
6
7
8
|
public arrayblockingqueue( int capacity, boolean fair) { if (capacity <= 0 ) throw new illegalargumentexception(); this .items = new object[capacity]; lock = new reentrantlock(fair); notempty = lock.newcondition(); notfull = lock.newcondition(); } |
这个构造方法接受两个参数,分别是capacity和fair,fair是boolean类型的,代表是公平锁,还是非公平锁,可以看出如果我们用第一个构造方法来创建arrayblockingqueue的话,采用的是非公平锁,因为公平锁会损失一定的性能,在没有充足的理由的情况下,是没有必要采用公平锁的。
方法内部做了几件事情:
- 创建object类型的数组,容量为capacity,并且赋值给当前类对象的items。
- 创建排他锁。
- 创建条件变量notempty 。
- 创建条件变量notfull。
至于排他锁和两个条件变量是做什么用的,看到后面就明白了。
arrayblockingqueue(int capacity, boolean fair,collection<? extends e> c)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
public arrayblockingqueue( int capacity, boolean fair, collection<? extends e> c) { //调用第二个构造方法,方法内部就是初始化数组,排他锁,两个条件变量 this (capacity, fair); final reentrantlock lock = this .lock; lock.lock(); // 开启排他锁 try { int i = 0 ; try { // 循环传入的集合,把集合中的元素赋值给items数组,其中i会自增 for (e e : c) { checknotnull(e); items[i++] = e; } } catch (arrayindexoutofboundsexception ex) { throw new illegalargumentexception(); } count = i; //把i赋值给count //如果i==capacity,也就是到了最大容量,把0赋值给putindex,否则把i赋值给putindex putindex = (i == capacity) ? 0 : i; } finally { lock.unlock(); //释放排他锁 } } |
- 调用第二个构造方法,方法内部就是初始化数组items,排他锁lock,以及两个条件变量。
- 开启排他锁。
- 循环传入的集合,将集合中的元素赋值给items数组,其中i会自增。
- 把i赋值给count。
- 如果i==capacity,说明到了最大的容量,就把0赋值给putindex,否则把i赋值给putindex。
- 在finally中释放排他锁。
看到这里,我们应该明白这个构造方法的作用是什么了,就是把传入的集合作为arrayblockingqueuede初始化数据,但是我们又会有一个新的疑问:count,putindex 是做什么用的。
offer(e e)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
public boolean offer(e e) { checknotnull(e); final reentrantlock lock = this .lock; lock.lock(); //开启排他锁 try { if (count == items.length) //如果count==items.length,返回false return false ; else { enqueue(e); //入队 return true ; //返回true } } finally { lock.unlock(); //释放锁 } } |
- 开启排他锁。
- 如果count==items.length,也就是到了最大的容量,返回false。
- 如果count<items.length,执行入队方法,并且返回true。
- 释放排他锁。
看到这里,我们应该可以明白了,arrayblockqueue是如何保证线程安全的,还是利用了reentrantlock排他锁,count就是用来保存数组的当前大小的。我们再来看看enqueue方法。
1
2
3
4
5
6
7
8
|
private void enqueue(e x) { final object[] items = this .items; items[putindex] = x; if (++putindex == items.length) putindex = 0 ; count++; notempty.signal(); } |
这方法比较简单,在代码里面就不写注释了,做了如下的操作:
- 把x赋值给items[putindex] 。
- 将putindex进行自增,如果自增后的值 == items.length,把0赋值给putindex 。
- 执行count++操作。
- 调用条件变量notempty的signal方法,说明在某个地方,必定调用了notempty的await方法,这里就是唤醒因为调用notempty的await方法而被阻塞的线程。
这里就解答了一个疑问:putindex是做什么的,就是入队元素的下标。
add(e e)
1
2
3
|
public boolean add(e e) { return super .add(e); } |
1
2
3
4
5
6
|
public boolean add(e e) { if (offer(e)) return true ; else throw new illegalstateexception( "queue full" ); } |
这个方法内部最终还是调用的offer方法。
put(e e)
1
2
3
4
5
6
7
8
9
10
11
12
|
public void put(e e) throws interruptedexception { checknotnull(e); final reentrantlock lock = this .lock; lock.lockinterruptibly(); //开启响应中断的排他锁 try { while (count == items.length) //如果队列满了,调用notfull的await notfull.await(); enqueue(e); //入队 } finally { lock.unlock(); //释放排他锁 } } |
- 开启响应中断的排他锁,如果在获取锁的过程中,当前的线程被中断,会抛出异常。
- 如果队列满了,调用notfull的await方法,说明在某个地方,必定调用了notfull的signal方法来唤醒当前线程,这里用while循环是为了防止虚假唤醒。
- 执行入队操作。
- 释放排他锁。
可以看到put方法和 offer/add方法的区别了:
- offer/add:如果队列满了,直接返回false。
- put:如果队列满了,当前线程被阻塞,等待唤醒。
poll()
1
2
3
4
5
6
7
8
9
|
public e poll() { final reentrantlock lock = this .lock; lock.lock(); try { return (count == 0 ) ? null : dequeue(); } finally { lock.unlock(); } } |
- 开启排他锁。
- 如果count==0,直接返回null,否则执行dequeue出队操作。
- 释放排他锁。
我们来看dequeue方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
private e dequeue() { final object[] items = this .items; @suppresswarnings ( "unchecked" ) e x = (e) items[takeindex]; //获得元素的值 items[takeindex] = null ; //把null赋值给items[takeindex] if (++takeindex == items.length) //如果takeindex自增后的值== items.length,就把0赋值给takeindex takeindex = 0 ; count--; if (itrs != null ) itrs.elementdequeued(); notfull.signal(); //唤醒因为调用notfull的await方法而被阻塞的线程 return x; } |
- 获取元素的值,takeindex保存的是出队的下标。
- 把null赋值给items[takeindex],也就是清空被弹出的元素。
- 如果takeindex自增后的值== items.length,就把0赋值给takeindex。
- count--。
- 唤醒因为调用notfull的await方法而被阻塞的线程。
这里调用了notfull的signal方法来唤醒因为调用notfull的await方法而被阻塞的线程,那到底在哪里调用了notfull的await方法呢,还记不记得在put方法中调用了notfull的await方法,我们再看看:
1
2
|
while (count == items.length) notfull.await(); |
当队列满了,就调用 notfull.await()来等待,在出队操作中,又调用了notfull.signal()来唤醒。
take()
1
2
3
4
5
6
7
8
9
10
11
|
public e take() throws interruptedexception { final reentrantlock lock = this .lock; lock.lockinterruptibly(); try { while (count == 0 ) notempty.await(); return dequeue(); } finally { lock.unlock(); } } |
- 开启排他锁。
- 如果count==0,代表队列是空的,则调用notempty的await方法,用while循环是为了防止虚假唤醒。
- 执行出队操作。
- 释放排他锁。
这里调用了notempty的await方法,那么哪里调用了notempty的signal方法呢?在enqueue入队方法里。
我们可以看到take和poll的区别:
- take:如果队列为空,会阻塞,直到被唤醒了。
- poll: 如果队列为空,直接返回null。
peek()
1
2
3
4
5
6
7
8
9
|
public e peek() { final reentrantlock lock = this .lock; lock.lock(); try { return itemat(takeindex); } finally { lock.unlock(); } } |
1
2
3
|
final e itemat( int i) { return (e) items[i]; } |
- 开启排他锁。
- 获得元素。
- 释放排他锁。
我们可以看到peek和poll/take的区别:
- peek,只是获取元素,不会清空元素。
- poll/take,获取并清空元素。
size()
1
2
3
4
5
6
7
8
9
|
public int size() { final reentrantlock lock = this .lock; lock.lock(); try { return count; } finally { lock.unlock(); } } |
- 开启排他锁。
- 返回count。
- 释放排他锁。
总结
至此,arrayblockqueue的核心源码就分析完毕了,我们来做一个总结:
- arrayblockqueue有几个比较重要的字段,分别是items,保存的是队列的数据,putindex保存的是入队的下标,takeindex保存的是出队的下标,count用来统计队列元素的个数,lock用来保证线程的安全性,notempty和notfull两个条件变量实现唤醒和阻塞。
- offer和add是一样的,其中add方法内部调用的就是offer方法,如果队列满了,直接返回false。
- put,如果队列满了,会被阻塞。
- peek,只是弹出元素,不会清空元素。
- poll,弹出并清空元素,如果队列为空,直接返回null。
- take,弹出并清空元素,如果队列为空,会被阻塞。
以上所述是小编给大家介绍的arrayblockqueue源码解析详解整合,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对服务器之家网站的支持!
原文链接:https://www.cnblogs.com/CodeBear/p/10668582.html