服务器之家:专注于服务器技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - Java教程 - java简单手写版本实现时间轮算法

java简单手写版本实现时间轮算法

2021-09-02 12:23扶苏l Java教程

这篇文章主要为大家详细介绍了java简单手写版本实现时间轮算法,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

时间轮

关于时间轮的介绍,网上有很多,这里就不重复了

核心思想

  • 一个环形数组存储时间轮的所有槽(看你的手表),每个槽对应当前时间轮的最小精度
  • 超过当前时间轮最大表示范围的会被丢到上层时间轮,上层时间轮的最小精度即为下层时间轮能表达的最大时间(时分秒概念)
  • 每个槽对应一个环形链表存储该时间应该被执行的任务
  • 需要一个线程去驱动指针运转,获取到期任务

以下给出java 简单手写版本实现

代码实现

时间轮主数据结构

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
/**
 * @author apdoer
 * @version 1.0
 * @date 2021/3/22 19:31
 */
@Slf4j
public class TimeWheel {
 /**
  * 一个槽的时间间隔(时间轮最小刻度)
  */
 private long tickMs;
 
 /**
  * 时间轮大小(槽的个数)
  */
 private int wheelSize;
 
 /**
  * 一轮的时间跨度
  */
 private long interval;
 
 private long currentTime;
 
 /**
  * 槽
  */
 private TimerTaskList[] buckets;
 
 /**
  * 上层时间轮
  */
 private volatile TimeWheel overflowWheel;
 
 /**
  * 一个timer只有一个delayqueue
  */
 private DelayQueue<TimerTaskList> delayQueue;
 
 public TimeWheel(long tickMs, int wheelSize, long currentTime, DelayQueue<TimerTaskList> delayQueue) {
  this.currentTime = currentTime;
  this.tickMs = tickMs;
  this.wheelSize = wheelSize;
  this.interval = tickMs * wheelSize;
  this.buckets = new TimerTaskList[wheelSize];
  this.currentTime = currentTime - (currentTime % tickMs);
  this.delayQueue = delayQueue;
  for (int i = 0; i < wheelSize; i++) {
   buckets[i] = new TimerTaskList();
  }
 }
 
 public boolean add(TimerTaskEntry entry) {
  long expiration = entry.getExpireMs();
  if (expiration < tickMs + currentTime) {
   //到期了
   return false;
  } else if (expiration < currentTime + interval) {
   //扔进当前时间轮的某个槽里,只有时间大于某个槽,才会放进去
   long virtualId = (expiration / tickMs);
   int index = (int) (virtualId % wheelSize);
   TimerTaskList bucket = buckets[index];
   bucket.addTask(entry);
   //设置bucket 过期时间
   if (bucket.setExpiration(virtualId * tickMs)) {
    //设好过期时间的bucket需要入队
    delayQueue.offer(bucket);
    return true;
   }
  } else {
   //当前轮不能满足,需要扔到上一轮
   TimeWheel timeWheel = getOverflowWheel();
   return timeWheel.add(entry);
  }
  return false;
 }
 
 
 private TimeWheel getOverflowWheel() {
  if (overflowWheel == null) {
   synchronized (this) {
    if (overflowWheel == null) {
     overflowWheel = new TimeWheel(interval, wheelSize, currentTime, delayQueue);
    }
   }
  }
  return overflowWheel;
 }
 
 /**
  * 推进指针
  *
  * @param timestamp
  */
 public void advanceLock(long timestamp) {
  if (timestamp > currentTime + tickMs) {
   currentTime = timestamp - (timestamp % tickMs);
   if (overflowWheel != null) {
    this.getOverflowWheel().advanceLock(timestamp);
   }
  }
 }
}

定时器接口

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
 * 定时器
 * @author apdoer
 * @version 1.0
 * @date 2021/3/22 20:30
 */
public interface Timer {
 
 /**
  * 添加一个新任务
  *
  * @param timerTask
  */
 void add(TimerTask timerTask);
 
 
 /**
  * 推动指针
  *
  * @param timeout
  */
 void advanceClock(long timeout);
 
 /**
  * 等待执行的任务
  *
  * @return
  */
 int size();
 
 /**
  * 关闭服务,剩下的无法被执行
  */
 void shutdown();
}

定时器实现

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
/**
 * @author apdoer
 * @version 1.0
 * @date 2021/3/22 20:33
 */
@Slf4j
public class SystemTimer implements Timer {
 /**
  * 底层时间轮
  */
 private TimeWheel timeWheel;
 /**
  * 一个Timer只有一个延时队列
  */
 private DelayQueue<TimerTaskList> delayQueue = new DelayQueue<>();
 /**
  * 过期任务执行线程
  */
 private ExecutorService workerThreadPool;
 /**
  * 轮询delayQueue获取过期任务线程
  */
 private ExecutorService bossThreadPool;
 
 
 public SystemTimer() {
  this.timeWheel = new TimeWheel(1, 20, System.currentTimeMillis(), delayQueue);
  this.workerThreadPool = Executors.newFixedThreadPool(100);
  this.bossThreadPool = Executors.newFixedThreadPool(1);
  //20ms推动一次时间轮运转
  this.bossThreadPool.submit(() -> {
   for (; ; ) {
    this.advanceClock(20);
   }
  });
 }
 
 
 public void addTimerTaskEntry(TimerTaskEntry entry) {
  if (!timeWheel.add(entry)) {
   //已经过期了
   TimerTask timerTask = entry.getTimerTask();
   log.info("=====任务:{} 已到期,准备执行============",timerTask.getDesc());
   workerThreadPool.submit(timerTask);
  }
 }
 
 @Override
 public void add(TimerTask timerTask) {
  log.info("=======添加任务开始====task:{}", timerTask.getDesc());
  TimerTaskEntry entry = new TimerTaskEntry(timerTask, timerTask.getDelayMs() + System.currentTimeMillis());
  timerTask.setTimerTaskEntry(entry);
  addTimerTaskEntry(entry);
 }
 
 /**
  * 推动指针运转获取过期任务
  *
  * @param timeout 时间间隔
  * @return
  */
 @Override
 public synchronized void advanceClock(long timeout) {
  try {
   TimerTaskList bucket = delayQueue.poll(timeout, TimeUnit.MILLISECONDS);
   if (bucket != null) {
    //推进时间
    timeWheel.advanceLock(bucket.getExpiration());
    //执行过期任务(包含降级)
    bucket.clear(this::addTimerTaskEntry);
   }
  } catch (InterruptedException e) {
   log.error("advanceClock error");
  }
 }
 
 @Override
 public int size() {
  //todo
  return 0;
 }
 
 @Override
 public void shutdown() {
  this.bossThreadPool.shutdown();
  this.workerThreadPool.shutdown();
  this.timeWheel = null;
 }
}

存储任务的环形链表

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
/**
 * @author apdoer
 * @version 1.0
 * @date 2021/3/22 19:26
 */
@Data
@Slf4j
class TimerTaskList implements Delayed {
 /**
  * TimerTaskList 环形链表使用一个虚拟根节点root
  */
 private TimerTaskEntry root = new TimerTaskEntry(null, -1);
 
 {
  root.next = root;
  root.prev = root;
 }
 
 /**
  * bucket的过期时间
  */
 private AtomicLong expiration = new AtomicLong(-1L);
 
 public long getExpiration() {
  return expiration.get();
 }
 
 /**
  * 设置bucket的过期时间,设置成功返回true
  *
  * @param expirationMs
  * @return
  */
 boolean setExpiration(long expirationMs) {
  return expiration.getAndSet(expirationMs) != expirationMs;
 }
 
 public boolean addTask(TimerTaskEntry entry) {
  boolean done = false;
  while (!done) {
   //如果TimerTaskEntry已经在别的list中就先移除,同步代码块外面移除,避免死锁,一直到成功为止
   entry.remove();
   synchronized (this) {
    if (entry.timedTaskList == null) {
     //加到链表的末尾
     entry.timedTaskList = this;
     TimerTaskEntry tail = root.prev;
     entry.prev = tail;
     entry.next = root;
     tail.next = entry;
     root.prev = entry;
     done = true;
    }
   }
  }
  return true;
 }
 
 /**
  * 从 TimedTaskList 移除指定的 timerTaskEntry
  *
  * @param entry
  */
 public void remove(TimerTaskEntry entry) {
  synchronized (this) {
   if (entry.getTimedTaskList().equals(this)) {
    entry.next.prev = entry.prev;
    entry.prev.next = entry.next;
    entry.next = null;
    entry.prev = null;
    entry.timedTaskList = null;
   }
  }
 }
 
 /**
  * 移除所有
  */
 public synchronized void clear(Consumer<TimerTaskEntry> entry) {
  TimerTaskEntry head = root.next;
  while (!head.equals(root)) {
   remove(head);
   entry.accept(head);
   head = root.next;
  }
  expiration.set(-1L);
 }
 
 @Override
 public long getDelay(TimeUnit unit) {
  return Math.max(0, unit.convert(expiration.get() - System.currentTimeMillis(), TimeUnit.MILLISECONDS));
 }
 
 @Override
 public int compareTo(Delayed o) {
  if (o instanceof TimerTaskList) {
   return Long.compare(expiration.get(), ((TimerTaskList) o).expiration.get());
  }
  return 0;
 }
}

存储任务的容器entry

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
 * @author apdoer
 * @version 1.0
 * @date 2021/3/22 19:26
 */
@Data
class TimerTaskEntry implements Comparable<TimerTaskEntry> {
 private TimerTask timerTask;
 private long expireMs;
 volatile TimerTaskList timedTaskList;
 TimerTaskEntry next;
 TimerTaskEntry prev;
 
 public TimerTaskEntry(TimerTask timedTask, long expireMs) {
  this.timerTask = timedTask;
  this.expireMs = expireMs;
  this.next = null;
  this.prev = null;
 }
 
 void remove() {
  TimerTaskList currentList = timedTaskList;
  while (currentList != null) {
   currentList.remove(this);
   currentList = timedTaskList;
  }
 }
 
 @Override
 public int compareTo(TimerTaskEntry o) {
  return ((int) (this.expireMs - o.expireMs));
 }
}

任务包装类(这里也可以将工作任务以线程变量的方式去传入)

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Data
@Slf4j
class TimerTask implements Runnable {
 /**
  * 延时时间
  */
 private long delayMs;
 /**
  * 任务所在的entry
  */
 private TimerTaskEntry timerTaskEntry;
 
 private String desc;
 
 public TimerTask(String desc, long delayMs) {
  this.desc = desc;
  this.delayMs = delayMs;
  this.timerTaskEntry = null;
 }
 
 public synchronized void setTimerTaskEntry(TimerTaskEntry entry) {
  // 如果这个timetask已经被一个已存在的TimerTaskEntry持有,先移除一个
  if (timerTaskEntry != null && timerTaskEntry != entry) {
   timerTaskEntry.remove();
  }
  timerTaskEntry = entry;
 }
 
 public TimerTaskEntry getTimerTaskEntry() {
  return timerTaskEntry;
 }
 
 @Override
 public void run() {
  log.info("============={}任务执行", desc);
 }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。

原文链接:https://blog.csdn.net/m0_43452671/article/details/115449100

延伸 · 阅读

精彩推荐
  • Java教程Java使用SAX解析xml的示例

    Java使用SAX解析xml的示例

    这篇文章主要介绍了Java使用SAX解析xml的示例,帮助大家更好的理解和学习使用Java,感兴趣的朋友可以了解下...

    大行者10067412021-08-30
  • Java教程Java8中Stream使用的一个注意事项

    Java8中Stream使用的一个注意事项

    最近在工作中发现了对于集合操作转换的神器,java8新特性 stream,但在使用中遇到了一个非常重要的注意点,所以这篇文章主要给大家介绍了关于Java8中S...

    阿杜7472021-02-04
  • Java教程小米推送Java代码

    小米推送Java代码

    今天小编就为大家分享一篇关于小米推送Java代码,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧...

    富贵稳中求8032021-07-12
  • Java教程xml与Java对象的转换详解

    xml与Java对象的转换详解

    这篇文章主要介绍了xml与Java对象的转换详解的相关资料,需要的朋友可以参考下...

    Java教程网2942020-09-17
  • Java教程Java BufferWriter写文件写不进去或缺失数据的解决

    Java BufferWriter写文件写不进去或缺失数据的解决

    这篇文章主要介绍了Java BufferWriter写文件写不进去或缺失数据的解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望...

    spcoder14552021-10-18
  • Java教程升级IDEA后Lombok不能使用的解决方法

    升级IDEA后Lombok不能使用的解决方法

    最近看到提示IDEA提示升级,寻思已经有好久没有升过级了。升级完毕重启之后,突然发现好多错误,本文就来介绍一下如何解决,感兴趣的可以了解一下...

    程序猿DD9332021-10-08
  • Java教程Java实现抢红包功能

    Java实现抢红包功能

    这篇文章主要为大家详细介绍了Java实现抢红包功能,采用多线程模拟多人同时抢红包,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙...

    littleschemer13532021-05-16
  • Java教程20个非常实用的Java程序代码片段

    20个非常实用的Java程序代码片段

    这篇文章主要为大家分享了20个非常实用的Java程序片段,对java开发项目有所帮助,感兴趣的小伙伴们可以参考一下 ...

    lijiao5352020-04-06