服务器之家:专注于服务器技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - Java教程 - 手把手带你理解java线程池之工作队列workQueue

手把手带你理解java线程池之工作队列workQueue

2021-12-25 16:35渣男小四 Java教程

这篇文章主要介绍了java线程池之工作队列workQueue,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友可以参考下

线程池之工作队列

手把手带你理解java线程池之工作队列workQueue

 

ArrayBlockingQueue

采用数组来实现,并采用可重入锁ReentrantLock来做并发控制,无论是添加还是读取,都先要获得锁才能进行操作 可看出进行读写操作都使用了ReentrantLock,ArrayBlockingQueue需要为其指定容量

public boolean offer(E e) {
      checkNotNull(e);
      final ReentrantLock lock = this.lock;
      lock.lock();
      try {
          if (count == items.length)
              return false;
          else {
              enqueue(e);
              return true;
          }
      } finally {
          lock.unlock();
      }
  }
  
  public void put(E e) throws InterruptedException {
      checkNotNull(e);
      final ReentrantLock lock = this.lock;
      lock.lockInterruptibly();
      try {
          while (count == items.length)
              notFull.await();
          enqueue(e);
      } finally {
          lock.unlock();
      }
  }

 

SynchronousQueue

由于SynchronousQueue源码比较复杂,里面大量的Cas操作,SynchronousQueue没有容器,所以里面是装不了任务的,当一个生产者线程生产一个任务的 时候,如果没有对应的消费者消费,那么该生产者会一直阻塞,知道有消费者消费为止。
图示:

手把手带你理解java线程池之工作队列workQueue

如下代码,如果我们将消费者线程注释掉执行,那么生产者哪里将会一直阻塞

package thread.customthreadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;

/**
* 测试SynchronousQueue
*/
public class SynchronousQueueTest {

  private static final SynchronousQueue<String> synchronousQueue = new SynchronousQueue<>();

  private static final ExecutorService service = Executors.newCachedThreadPool();

  public static void main(String[] args) {
      /**
       * Provider
       */
      service.submit(() -> {
          try {
              synchronousQueue.put("liu");
          }catch (Exception e){
              e.printStackTrace();
          }
          System.out.println("Consumer finished spending");
      });

      /**
       * Consumer
       */
      service.submit(() ->{
          try {
              synchronousQueue.take();
          }catch (Exception e){
              e.printStackTrace();
          }
          System.out.println("take over");
      });
  }
}

 

LinkedBlockingDeque

LinkedBlockingDeque是一个双向队列,底层使用单链表实现,任何一段都可进行元素的读写操作,在初始化LinkedBlockingDeque的时候, 我们可以指定容量,也可不指定,如果不指定,则容量为Integer.MAX_VALUE,

注:Deque是双端队列,而Queue是单端队列,双端意思是两端都可以进行读写操作,而单端则只能从一端进,一端出(FIFO)

public LinkedBlockingDeque() {
      this(Integer.MAX_VALUE);
}
package thread.customthreadpool;
import java.util.concurrent.LinkedBlockingDeque;
public class LinkedBlockingDequeTest {

  private static final LinkedBlockingDeque<Integer> deque = new LinkedBlockingDeque<>();

  public static void main(String[] args) throws InterruptedException {
      deque.put(1);
      deque.put(2);
      deque.put(3);
      deque.put(4);
      deque.put(5);
      System.out.println(deque);
      System.out.println("deque size  "+deque.size());
      deque.take();
      deque.take();
      deque.take();
      deque.take();
      deque.take();
      System.out.println(deque);
      System.out.println("deque size  "+deque.size());
  }
}

手把手带你理解java线程池之工作队列workQueue

 

LinkedBlockingQueue

底层基于单向连表实现,是一个单向队列,具有先进先出(FIFO)特点,使用了ReentrantLock来做并发控制,读写操作都上锁

private final ReentrantLock putLock = new ReentrantLock();
  public void put(E e) throws InterruptedException {
      if (e == null) throw new NullPointerException();
      int c = -1;
      Node<E> node = new Node<E>(e);
      final ReentrantLock putLock = this.putLock;
      final AtomicInteger count = this.count;
      putLock.lockInterruptibly();
      try {
          while (count.get() == capacity) {
              notFull.await();
          }
          enqueue(node);
          c = count.getAndIncrement();
          if (c + 1 < capacity)
              notFull.signal();
      } finally {
          putLock.unlock();
      }
      if (c == 0)
          signalNotEmpty();
  }
  public E take() throws InterruptedException {
      E x;
      int c = -1;
      final AtomicInteger count = this.count;
      final ReentrantLock takeLock = this.takeLock;
      takeLock.lockInterruptibly();
      try {
          while (count.get() == 0) {
              notEmpty.await();
          }
          x = dequeue();
          c = count.getAndDecrement();
          if (c > 1)
              notEmpty.signal();
      } finally {
          takeLock.unlock();
      }
      if (c == capacity)
          signalNotFull();
      return x;
  }

 

DelayDeque

DelayDeque是一个无界队列,添加进DelayDeque的元素会经过compareTo方法计算,然后按照时间 进行排序,排在队头的元素是最早到期的,越往后到期时间越长,DelayDeque只能接受Delayed接口类型 如图所示,队列里的元素并不是按照先进先出的规则,而是按照过期时间

手把手带你理解java线程池之工作队列workQueue

示例

package thread.customthreadpool.delayDeque;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class MyDelayed implements Delayed {

  private final String taskName ;
  private final long nowTime = System.currentTimeMillis();
  private final long expireTime ;

  public MyDelayed(String taskName,long expireTime) {
      this.taskName = taskName;
      this.expireTime = expireTime;
  }

  @Override
  public long getDelay(TimeUnit unit) {
      return unit.convert((nowTime+expireTime) - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
  }

  @Override
  public int compareTo(Delayed o) {
      MyDelayed myDelayed = (MyDelayed) o;
      return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
  }

  @Override
  public String toString() {
      return "MyDelayed{" +
              "taskName='" + taskName + '\'' +
              ", nowTime=" + nowTime +
              ", expireTime=" + expireTime +
              '}';
  }
}
package thread.customthreadpool.delayDeque;

import java.util.concurrent.*;

public class MyDelayQueue {

  private static final DelayQueue<MyDelayed> delayQueue = new DelayQueue<>();

  private static final ExecutorService service = Executors.newCachedThreadPool();

  public static void main(String[] args) throws InterruptedException {
      service.submit(() -> {
          delayQueue.put(new MyDelayed("A-Task",5000));
          delayQueue.put(new MyDelayed("B-Task",4000));
          delayQueue.put(new MyDelayed("C-Task",3000));
          delayQueue.put(new MyDelayed("D-Task",2000));
          delayQueue.put(new MyDelayed("E-Task",1000));
      });
      while (true){
          System.out.println(delayQueue.take());
      }
  }
}

result

手把手带你理解java线程池之工作队列workQueue

应用场景

1.美团外卖订单:当我们下单后没付款 ,30分钟后将自动取消订单
2.缓存,对于某些任务,需要在特定的时间清理;

and so on

 

LinkedTransferQueue

当消费线程从队列中取元素时,如果队列为空,那么生成一个为null的节点,消费者线程就一直等待,此时如果生产者线程发现队列中有一个null节点, 它就不入队了,而是将元素填充到这个null节点并唤醒消费者线程,然后消费者线程取走元素。
LinkedTransferQueue是 SynchronousQueue 和 LinkedBlockingQueue 的整合,性能比较高,因为没有锁操作, SynchronousQueue不能存储元素,而LinkedTransferQueue能存储元素,

 

PriorityBlockingQueue

PriorityBlockingQueue是一个无界的阻塞队列,同时是一个支持优先级的队列,读写操作都是基于ReentrantLock, 内部使用堆算法保证每次出队都是优先级最高的元素

public E take() throws InterruptedException {
      final ReentrantLock lock = this.lock;
      lock.lockInterruptibly();
      E result;
      try {
          while ( (result = dequeue()) == null)
              notEmpty.await();
      } finally {
          lock.unlock();
      }
      return result;
}

到此这篇关于手把手带你理解java线程池之工作队列workQueue的文章就介绍到这了,更多相关java线程池内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://www.cnblogs.com/steakliu/p/15245736.html

延伸 · 阅读

精彩推荐
  • Java教程20个非常实用的Java程序代码片段

    20个非常实用的Java程序代码片段

    这篇文章主要为大家分享了20个非常实用的Java程序片段,对java开发项目有所帮助,感兴趣的小伙伴们可以参考一下 ...

    lijiao5352020-04-06
  • Java教程Java使用SAX解析xml的示例

    Java使用SAX解析xml的示例

    这篇文章主要介绍了Java使用SAX解析xml的示例,帮助大家更好的理解和学习使用Java,感兴趣的朋友可以了解下...

    大行者10067412021-08-30
  • Java教程Java8中Stream使用的一个注意事项

    Java8中Stream使用的一个注意事项

    最近在工作中发现了对于集合操作转换的神器,java8新特性 stream,但在使用中遇到了一个非常重要的注意点,所以这篇文章主要给大家介绍了关于Java8中S...

    阿杜7482021-02-04
  • Java教程升级IDEA后Lombok不能使用的解决方法

    升级IDEA后Lombok不能使用的解决方法

    最近看到提示IDEA提示升级,寻思已经有好久没有升过级了。升级完毕重启之后,突然发现好多错误,本文就来介绍一下如何解决,感兴趣的可以了解一下...

    程序猿DD9332021-10-08
  • Java教程小米推送Java代码

    小米推送Java代码

    今天小编就为大家分享一篇关于小米推送Java代码,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧...

    富贵稳中求8032021-07-12
  • Java教程Java BufferWriter写文件写不进去或缺失数据的解决

    Java BufferWriter写文件写不进去或缺失数据的解决

    这篇文章主要介绍了Java BufferWriter写文件写不进去或缺失数据的解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望...

    spcoder14552021-10-18
  • Java教程Java实现抢红包功能

    Java实现抢红包功能

    这篇文章主要为大家详细介绍了Java实现抢红包功能,采用多线程模拟多人同时抢红包,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙...

    littleschemer13532021-05-16
  • Java教程xml与Java对象的转换详解

    xml与Java对象的转换详解

    这篇文章主要介绍了xml与Java对象的转换详解的相关资料,需要的朋友可以参考下...

    Java教程网2942020-09-17