目录
- 阻塞队列和非阻塞队列
- 非阻塞队列
- 阻塞队列
- 抛出异常
- 特殊值
- 阻塞
- 超时
- 总结
阻塞队列和非阻塞队列
非阻塞队列
- ConcurrentLinkedQueue
单向链表结构的无界并发队列, 非阻塞队列,由CAS实现线程安全,内部基于节点实现
- ConcurrentLinkedDeque
双向链表结构的无界并发队列, 非阻塞队列,由CAS实现线程安全
- PriorityQueue
内部基于数组实现,线程不安全的队列
阻塞队列
- DelayQueue
一个支持延时获取元素的无界阻塞队列
- LinkedTransferQueue
一个由链表结构组成的无界阻塞队列。
- ArrayBlockingQueue
有界队列,阻塞式,初始化时必须指定队列大小,且不可改变;,底层由数组实现;
- SynchronousQueue
最多只能存储一个元素,每一个put操作必须等待一个take操作,否则不能继续添加元素
- PriorityBlockingQueue
一个带优先级的队列,而不是先进先出队列。元素按优先级顺序被移除,而且它也是无界的,也就是没有容量上限,虽然此队列逻辑上是无界的,但是由于资源被耗尽,所以试图执行添加操作可能会导致 OutOfMemoryError 错误;
特殊值
新消息添加进队列时,如果当前队列已满,则会被直接抛弃,导致消息丢失。
private static final BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); //特殊值 @Test public void test2(){ System.out.println(blockingQueue.offer("a")); System.out.println(blockingQueue.offer("b")); System.out.println(blockingQueue.offer("c")); System.out.println(blockingQueue.offer("d")); //往队列里面添加元素,超出队列长度则返回false。 System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); //移除队列里面的元素,如果有元素,则返回元素,没有则返回null。 }
阻塞
消息处理过快会一直阻塞,这个没什么问题。但是如果消息处理过慢,则会消息大量积压,导致cpu占用过高程序崩溃。
private static final BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); //阻塞 @Test public void test3() throws InterruptedException { blockingQueue.put("a"); blockingQueue.put("b"); blockingQueue.put("c"); // blockingQueue.put("d"); //如果队列已经满了,则一直阻塞,直到队列有位置。 System.out.println(blockingQueue.take()); System.out.println(blockingQueue.take()); System.out.println(blockingQueue.take()); // System.out.println(blockingQueue.take()); //如果队列没有元素,则一直阻塞,直到队列有元素。 }
超时
如果队列已满,则在指定时间之后再次尝试往队列里面设置元素,设置成功返回true,失败返回false。这相比较上面的来说会好很多,相当于有2次机会往队列里面设置元素。移除元素也是同理。
private static final BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3); //超时 @Test public void test4() throws InterruptedException { System.out.println(blockingQueue.offer("a")); System.out.println(blockingQueue.offer("b")); System.out.println(blockingQueue.offer("c")); System.out.println(blockingQueue.offer("d",3, TimeUnit.SECONDS)); //往队列里面添加元素,如果队列已满,则等待指定时间再次往队列里面设置元素,设置成功返回true,失败返回false。 System.out.println(blockingQueue.poll(3,TimeUnit.SECONDS)); //移除队列里面的元素,如果队列里面有元素,则返回元素,没有则等待指定时间再次移除队列里面的元素,如果有元素则返回元素,没有则返回null }
总结
使用队列时应该考虑队列的作用是一边积压消息,一边处理消息,而且要保证队列不会造成消息大量积压。所以我们可以让队列不停的进行设置元素,然后写个定时任务,按照指定的时间,不停的消费队列里面的全部元素。这个指定时间要根据业务量去设置,如果业务量太少,时间间隔可以设置的长一点。业务量大设置的短一点,如:1s执行一次,一次全部取完队列里的元素。这种情况就不建议使用固定长度的消息队列,而应该使用自动扩容的消息队列,因为你无法直到消息队列的具体长度是多少。
原文地址:https://blog.csdn.net/weixin_43933728/article/details/129653602