时间轮
关于时间轮的介绍,网上有很多,这里就不重复了
核心思想
- 一个环形数组存储时间轮的所有槽(看你的手表),每个槽对应当前时间轮的最小精度
- 超过当前时间轮最大表示范围的会被丢到上层时间轮,上层时间轮的最小精度即为下层时间轮能表达的最大时间(时分秒概念)
- 每个槽对应一个环形链表存储该时间应该被执行的任务
- 需要一个线程去驱动指针运转,获取到期任务
以下给出java 简单手写版本实现
代码实现
时间轮主数据结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
|
/** * @author apdoer * @version 1.0 * @date 2021/3/22 19:31 */ @Slf4j public class TimeWheel { /** * 一个槽的时间间隔(时间轮最小刻度) */ private long tickMs; /** * 时间轮大小(槽的个数) */ private int wheelSize; /** * 一轮的时间跨度 */ private long interval; private long currentTime; /** * 槽 */ private TimerTaskList[] buckets; /** * 上层时间轮 */ private volatile TimeWheel overflowWheel; /** * 一个timer只有一个delayqueue */ private DelayQueue<TimerTaskList> delayQueue; public TimeWheel( long tickMs, int wheelSize, long currentTime, DelayQueue<TimerTaskList> delayQueue) { this .currentTime = currentTime; this .tickMs = tickMs; this .wheelSize = wheelSize; this .interval = tickMs * wheelSize; this .buckets = new TimerTaskList[wheelSize]; this .currentTime = currentTime - (currentTime % tickMs); this .delayQueue = delayQueue; for ( int i = 0 ; i < wheelSize; i++) { buckets[i] = new TimerTaskList(); } } public boolean add(TimerTaskEntry entry) { long expiration = entry.getExpireMs(); if (expiration < tickMs + currentTime) { //到期了 return false ; } else if (expiration < currentTime + interval) { //扔进当前时间轮的某个槽里,只有时间大于某个槽,才会放进去 long virtualId = (expiration / tickMs); int index = ( int ) (virtualId % wheelSize); TimerTaskList bucket = buckets[index]; bucket.addTask(entry); //设置bucket 过期时间 if (bucket.setExpiration(virtualId * tickMs)) { //设好过期时间的bucket需要入队 delayQueue.offer(bucket); return true ; } } else { //当前轮不能满足,需要扔到上一轮 TimeWheel timeWheel = getOverflowWheel(); return timeWheel.add(entry); } return false ; } private TimeWheel getOverflowWheel() { if (overflowWheel == null ) { synchronized ( this ) { if (overflowWheel == null ) { overflowWheel = new TimeWheel(interval, wheelSize, currentTime, delayQueue); } } } return overflowWheel; } /** * 推进指针 * * @param timestamp */ public void advanceLock( long timestamp) { if (timestamp > currentTime + tickMs) { currentTime = timestamp - (timestamp % tickMs); if (overflowWheel != null ) { this .getOverflowWheel().advanceLock(timestamp); } } } } |
定时器接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
/** * 定时器 * @author apdoer * @version 1.0 * @date 2021/3/22 20:30 */ public interface Timer { /** * 添加一个新任务 * * @param timerTask */ void add(TimerTask timerTask); /** * 推动指针 * * @param timeout */ void advanceClock( long timeout); /** * 等待执行的任务 * * @return */ int size(); /** * 关闭服务,剩下的无法被执行 */ void shutdown(); } |
定时器实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
|
/** * @author apdoer * @version 1.0 * @date 2021/3/22 20:33 */ @Slf4j public class SystemTimer implements Timer { /** * 底层时间轮 */ private TimeWheel timeWheel; /** * 一个Timer只有一个延时队列 */ private DelayQueue<TimerTaskList> delayQueue = new DelayQueue<>(); /** * 过期任务执行线程 */ private ExecutorService workerThreadPool; /** * 轮询delayQueue获取过期任务线程 */ private ExecutorService bossThreadPool; public SystemTimer() { this .timeWheel = new TimeWheel( 1 , 20 , System.currentTimeMillis(), delayQueue); this .workerThreadPool = Executors.newFixedThreadPool( 100 ); this .bossThreadPool = Executors.newFixedThreadPool( 1 ); //20ms推动一次时间轮运转 this .bossThreadPool.submit(() -> { for (; ; ) { this .advanceClock( 20 ); } }); } public void addTimerTaskEntry(TimerTaskEntry entry) { if (!timeWheel.add(entry)) { //已经过期了 TimerTask timerTask = entry.getTimerTask(); log.info( "=====任务:{} 已到期,准备执行============" ,timerTask.getDesc()); workerThreadPool.submit(timerTask); } } @Override public void add(TimerTask timerTask) { log.info( "=======添加任务开始====task:{}" , timerTask.getDesc()); TimerTaskEntry entry = new TimerTaskEntry(timerTask, timerTask.getDelayMs() + System.currentTimeMillis()); timerTask.setTimerTaskEntry(entry); addTimerTaskEntry(entry); } /** * 推动指针运转获取过期任务 * * @param timeout 时间间隔 * @return */ @Override public synchronized void advanceClock( long timeout) { try { TimerTaskList bucket = delayQueue.poll(timeout, TimeUnit.MILLISECONDS); if (bucket != null ) { //推进时间 timeWheel.advanceLock(bucket.getExpiration()); //执行过期任务(包含降级) bucket.clear( this ::addTimerTaskEntry); } } catch (InterruptedException e) { log.error( "advanceClock error" ); } } @Override public int size() { //todo return 0 ; } @Override public void shutdown() { this .bossThreadPool.shutdown(); this .workerThreadPool.shutdown(); this .timeWheel = null ; } } |
存储任务的环形链表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
|
/** * @author apdoer * @version 1.0 * @date 2021/3/22 19:26 */ @Data @Slf4j class TimerTaskList implements Delayed { /** * TimerTaskList 环形链表使用一个虚拟根节点root */ private TimerTaskEntry root = new TimerTaskEntry( null , - 1 ); { root.next = root; root.prev = root; } /** * bucket的过期时间 */ private AtomicLong expiration = new AtomicLong(-1L); public long getExpiration() { return expiration.get(); } /** * 设置bucket的过期时间,设置成功返回true * * @param expirationMs * @return */ boolean setExpiration( long expirationMs) { return expiration.getAndSet(expirationMs) != expirationMs; } public boolean addTask(TimerTaskEntry entry) { boolean done = false ; while (!done) { //如果TimerTaskEntry已经在别的list中就先移除,同步代码块外面移除,避免死锁,一直到成功为止 entry.remove(); synchronized ( this ) { if (entry.timedTaskList == null ) { //加到链表的末尾 entry.timedTaskList = this ; TimerTaskEntry tail = root.prev; entry.prev = tail; entry.next = root; tail.next = entry; root.prev = entry; done = true ; } } } return true ; } /** * 从 TimedTaskList 移除指定的 timerTaskEntry * * @param entry */ public void remove(TimerTaskEntry entry) { synchronized ( this ) { if (entry.getTimedTaskList().equals( this )) { entry.next.prev = entry.prev; entry.prev.next = entry.next; entry.next = null ; entry.prev = null ; entry.timedTaskList = null ; } } } /** * 移除所有 */ public synchronized void clear(Consumer<TimerTaskEntry> entry) { TimerTaskEntry head = root.next; while (!head.equals(root)) { remove(head); entry.accept(head); head = root.next; } expiration.set(-1L); } @Override public long getDelay(TimeUnit unit) { return Math.max( 0 , unit.convert(expiration.get() - System.currentTimeMillis(), TimeUnit.MILLISECONDS)); } @Override public int compareTo(Delayed o) { if (o instanceof TimerTaskList) { return Long.compare(expiration.get(), ((TimerTaskList) o).expiration.get()); } return 0 ; } } |
存储任务的容器entry
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
/** * @author apdoer * @version 1.0 * @date 2021/3/22 19:26 */ @Data class TimerTaskEntry implements Comparable<TimerTaskEntry> { private TimerTask timerTask; private long expireMs; volatile TimerTaskList timedTaskList; TimerTaskEntry next; TimerTaskEntry prev; public TimerTaskEntry(TimerTask timedTask, long expireMs) { this .timerTask = timedTask; this .expireMs = expireMs; this .next = null ; this .prev = null ; } void remove() { TimerTaskList currentList = timedTaskList; while (currentList != null ) { currentList.remove( this ); currentList = timedTaskList; } } @Override public int compareTo(TimerTaskEntry o) { return (( int ) ( this .expireMs - o.expireMs)); } } |
任务包装类(这里也可以将工作任务以线程变量的方式去传入)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
@Data @Slf4j class TimerTask implements Runnable { /** * 延时时间 */ private long delayMs; /** * 任务所在的entry */ private TimerTaskEntry timerTaskEntry; private String desc; public TimerTask(String desc, long delayMs) { this .desc = desc; this .delayMs = delayMs; this .timerTaskEntry = null ; } public synchronized void setTimerTaskEntry(TimerTaskEntry entry) { // 如果这个timetask已经被一个已存在的TimerTaskEntry持有,先移除一个 if (timerTaskEntry != null && timerTaskEntry != entry) { timerTaskEntry.remove(); } timerTaskEntry = entry; } public TimerTaskEntry getTimerTaskEntry() { return timerTaskEntry; } @Override public void run() { log.info( "============={}任务执行" , desc); } } |
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://blog.csdn.net/m0_43452671/article/details/115449100