最近在生产环境刚好遇到了延时任务的场景,调研了一下目前主流的方案,分析了一下优劣并且敲定了最终的方案。这篇文章记录了调研的过程,以及初步方案的实现。
候选方案对比
下面是想到的几种实现延时任务的方案,总结了一下相应的优势和劣势。
方案 | 优势 | 劣势 | 选用场景 |
---|---|---|---|
JDK 内置的延迟队列 DelayQueue |
实现简单 | 数据内存态,不可靠 | 一致性相对低的场景 |
调度框架和 MySQL 进行短间隔轮询 |
实现简单,可靠性高 | 存在明显的性能瓶颈 | 数据量较少实时性相对低的场景 |
RabbitMQ 的 DLX 和 TTL ,一般称为 死信队列 方案 |
异步交互可以削峰 | 延时的时间长度不可控,如果数据需要持久化则性能会降低 | - |
调度框架和 Redis 进行短间隔轮询 |
数据持久化,高性能 | 实现难度大 | 常见于支付结果回调方案 |
时间轮 | 实时性高 | 实现难度大,内存消耗大 | 实时性高的场景 |
如果应用的数据量不高,实时性要求比较低,选用调度框架和 MySQL
进行短间隔轮询这个方案是最优的方案。但是笔者遇到的场景数据量相对比较大,实时性并不高,采用扫库的方案一定会对 MySQL
实例造成比较大的压力。记得很早之前,看过一个PPT叫《盒子科技聚合支付系统演进》,其中里面有一张图片给予笔者一点启发:
里面刚好用到了调度框架和 Redis
进行短间隔轮询实现延时任务的方案,不过为了分摊应用的压力,图中的方案还做了分片处理。鉴于笔者当前业务紧迫,所以在第一期的方案暂时不考虑分片,只做了一个简化版的实现。
由于PPT中没有任何的代码或者框架贴出,有些需要解决的技术点需要自行思考,下面会重现一次整个方案实现的详细过程。
场景设计
实际的生产场景是笔者负责的某个系统需要对接一个外部的资金方,每一笔资金下单后需要延时30分钟推送对应的附件。这里简化为一个订单信息数据延迟处理的场景,就是每一笔下单记录一条订单消息(暂时叫做 OrderMessage
),订单消息需要延迟5到15秒后进行异步处理。
否决的候选方案实现思路
下面介绍一下其它四个不选用的候选方案,结合一些伪代码和流程分析一下实现过程。
JDK内置延迟队列
DelayQueue
是一个阻塞队列的实现,它的队列元素必须是 Delayed
的子类,这里做个简单的例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
|
public class DelayQueueMain { private static final Logger LOGGER = LoggerFactory.getLogger(DelayQueueMain.class); public static void main(String[] args) throws Exception { DelayQueue<OrderMessage> queue = new DelayQueue<>(); // 默认延迟5秒 OrderMessage message = new OrderMessage( "ORDER_ID_10086" ); queue. add (message); // 延迟6秒 message = new OrderMessage( "ORDER_ID_10087" , 6); queue. add (message); // 延迟10秒 message = new OrderMessage( "ORDER_ID_10088" , 10); queue. add (message); ExecutorService executorService = Executors.newSingleThreadExecutor(r -> { Thread thread = new Thread(r); thread.setName( "DelayWorker" ); thread.setDaemon( true ); return thread; }); LOGGER.info( "开始执行调度线程..." ); executorService. execute (() -> { while ( true ) { try { OrderMessage task = queue.take(); LOGGER.info( "延迟处理订单消息,{}" , task.getDescription()); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } } }); Thread.sleep( Integer .MAX_VALUE); } private static class OrderMessage implements Delayed { private static final DateTimeFormatter F = DateTimeFormatter.ofPattern( "yyyy-MM-dd HH:mm:ss" ); /** * 默认延迟5000毫秒 */ private static final long DELAY_MS = 1000L * 5; /** * 订单ID */ private final String orderId; /** * 创建时间戳 */ private final long timestamp ; /** * 过期时间 */ private final long expire; /** * 描述 */ private final String description; public OrderMessage(String orderId, long expireSeconds) { this.orderId = orderId; this. timestamp = System.currentTimeMillis(); this.expire = this. timestamp + expireSeconds * 1000L; this.description = String.format( "订单[%s]-创建时间为:%s,超时时间为:%s" , orderId, LocalDateTime.ofInstant(Instant.ofEpochMilli( timestamp ), ZoneId.systemDefault()).format(F), LocalDateTime.ofInstant(Instant.ofEpochMilli(expire), ZoneId.systemDefault()).format(F)); } public OrderMessage(String orderId) { this.orderId = orderId; this. timestamp = System.currentTimeMillis(); this.expire = this. timestamp + DELAY_MS; this.description = String.format( "订单[%s]-创建时间为:%s,超时时间为:%s" , orderId, LocalDateTime.ofInstant(Instant.ofEpochMilli( timestamp ), ZoneId.systemDefault()).format(F), LocalDateTime.ofInstant(Instant.ofEpochMilli(expire), ZoneId.systemDefault()).format(F)); } public String getOrderId() { return orderId; } public long getTimestamp() { return timestamp ; } public long getExpire() { return expire; } public String getDescription() { return description; } @Override public long getDelay(TimeUnit unit) { return unit. convert (this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { return ( int ) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); } } } |
注意一下, OrderMessage
实现 Delayed
接口,关键是需要实现 Delayed#getDelay()
和 Delayed#compareTo()
。运行一下 main()
方法:
1
2
3
4
|
10:16:08.240 [main] INFO club.throwable.delay.DelayQueueMain - 开始执行调度线程... 10:16:13.224 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - 延迟处理订单消息,订单[ORDER_ID_10086]-创建时间为:2019-08-20 10:16:08,超时时间为:2019-08-20 10:16:13 10:16:14.237 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - 延迟处理订单消息,订单[ORDER_ID_10087]-创建时间为:2019-08-20 10:16:08,超时时间为:2019-08-20 10:16:14 10:16:18.237 [DelayWorker] INFO club.throwable.delay.DelayQueueMain - 延迟处理订单消息,订单[ORDER_ID_10088]-创建时间为:2019-08-20 10:16:08,超时时间为:2019-08-20 10:16:18 |
调度框架 + MySQL
使用调度框架对 MySQL
表进行短间隔轮询是实现难度比较低的方案,通常服务刚上线,表数据不多并且实时性不高的情况下应该首选这个方案。不过要注意以下几点:
MySQL
引入 Quartz
、 MySQL
的Java驱动包和 spring-boot-starter-jdbc
(这里只是为了方便用相对轻量级的框架实现,生产中可以按场景按需选择其他更合理的框架):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.48</version> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> <version>2.1.7.RELEASE</version> <scope>test</scope> </dependency> <dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz</artifactId> <version>2.3.1</version> <scope>test</scope> </dependency> |
假设表设计如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
CREATE DATABASE `delayTask` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_520_ci; USE `delayTask`; CREATE TABLE `t_order_message` ( id BIGINT UNSIGNED PRIMARY KEY AUTO_INCREMENT, order_id VARCHAR (50) NOT NULL COMMENT '订单ID' , create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建日期时间' , edit_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '修改日期时间' , retry_times TINYINT NOT NULL DEFAULT 0 COMMENT '重试次数' , order_status TINYINT NOT NULL DEFAULT 0 COMMENT '订单状态' , INDEX idx_order_id (order_id), INDEX idx_create_time (create_time) ) COMMENT '订单信息表' ; # 写入两条测试数据 INSERT INTO t_order_message(order_id) VALUES ( '10086' ),( '10087' ); |
编写代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
|
/ / 常量 public class OrderConstants { public static final int MAX_RETRY_TIMES = 5 ; public static final int PENDING = 0 ; public static final int SUCCESS = 1 ; public static final int FAIL = - 1 ; public static final int LIMIT = 10 ; } / / 实体 @Builder @Data public class OrderMessage { private Long id ; private String orderId; private LocalDateTime createTime; private LocalDateTime editTime; private Integer retryTimes; private Integer orderStatus; } / / DAO @RequiredArgsConstructor public class OrderMessageDao { private final JdbcTemplate jdbcTemplate; private static final ResultSetExtractor< List <OrderMessage>> M = r - > { List <OrderMessage> list = Lists.newArrayList(); while (r. next ()) { list .add(OrderMessage.builder() . id (r.getLong( "id" )) .orderId(r.getString( "order_id" )) .createTime(r.getTimestamp( "create_time" ).toLocalDateTime()) .editTime(r.getTimestamp( "edit_time" ).toLocalDateTime()) .retryTimes(r.getInt( "retry_times" )) .orderStatus(r.getInt( "order_status" )) .build()); } return list ; }; public List <OrderMessage> selectPendingRecords(LocalDateTime start, LocalDateTime end, List <Integer> statusList, int maxRetryTimes, int limit) { StringJoiner joiner = new StringJoiner( "," ); statusList.forEach(s - > joiner.add(String.valueOf(s))); return jdbcTemplate.query( "SELECT * FROM t_order_message WHERE create_time >= ? AND create_time <= ? " + "AND order_status IN (?) AND retry_times < ? LIMIT ?" , p - > { p.setTimestamp( 1 , Timestamp.valueOf(start)); p.setTimestamp( 2 , Timestamp.valueOf(end)); p.setString( 3 , joiner.toString()); p.setInt( 4 , maxRetryTimes); p.setInt( 5 , limit); }, M); } public int updateOrderStatus( Long id , int status) { return jdbcTemplate.update( "UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id =?" , p - > { p.setInt( 1 , status); p.setTimestamp( 2 , Timestamp.valueOf(LocalDateTime.now())); p.setLong( 3 , id ); }); } } / / Service @RequiredArgsConstructor public class OrderMessageService { private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageService. class ); private final OrderMessageDao orderMessageDao; private static final List <Integer> STATUS = Lists.newArrayList(); static { STATUS.add(OrderConstants.PENDING); STATUS.add(OrderConstants.FAIL); } public void executeDelayJob() { LOGGER.info( "订单处理定时任务开始执行......" ); LocalDateTime end = LocalDateTime.now(); / / 一天前 LocalDateTime start = end.minusDays( 1 ); List <OrderMessage> list = orderMessageDao.selectPendingRecords(start, end, STATUS, OrderConstants.MAX_RETRY_TIMES, OrderConstants.LIMIT); if (! list .isEmpty()) { for (OrderMessage m : list ) { LOGGER.info( "处理订单[{}],状态由{}更新为{}" , m.getOrderId(), m.getOrderStatus(), OrderConstants.SUCCESS); / / 这里其实可以优化为批量更新 orderMessageDao.updateOrderStatus(m.getId(), OrderConstants.SUCCESS); } } LOGGER.info( "订单处理定时任务开始完毕......" ); } } / / Job @DisallowConcurrentExecution public class OrderMessageDelayJob implements Job { @Override public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { OrderMessageService service = (OrderMessageService) jobExecutionContext.getMergedJobDataMap().get( "orderMessageService" ); service.executeDelayJob(); } public static void main(String[] args) throws Exception { HikariConfig config = new HikariConfig(); config.setJdbcUrl( "jdbc:mysql://localhost:3306/delayTask?useSSL=false&characterEncoding=utf8" ); config.setDriverClassName(Driver. class .getName()); config.setUsername( "root" ); config.setPassword( "root" ); HikariDataSource dataSource = new HikariDataSource(config); OrderMessageDao orderMessageDao = new OrderMessageDao(new JdbcTemplate(dataSource)); OrderMessageService service = new OrderMessageService(orderMessageDao); / / 内存模式的调度器 StdSchedulerFactory factory = new StdSchedulerFactory(); Scheduler scheduler = factory.getScheduler(); / / 这里没有用到IOC容器,直接用Quartz数据集合传递服务引用 JobDataMap jobDataMap = new JobDataMap(); jobDataMap.put( "orderMessageService" , service); / / 新建Job JobDetail job = JobBuilder.newJob(OrderMessageDelayJob. class ) .withIdentity( "orderMessageDelayJob" , "delayJob" ) .usingJobData(jobDataMap) .build(); / / 新建触发器, 10 秒执行一次 Trigger trigger = TriggerBuilder.newTrigger() .withIdentity( "orderMessageDelayTrigger" , "delayJob" ) .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds( 10 ).repeatForever()) .build(); scheduler.scheduleJob(job, trigger); / / 启动调度器 scheduler.start(); Thread.sleep(Integer.MAX_VALUE); } } |
这个例子里面用了 create_time
做轮询,实际上可以添加一个调度时间 schedule_time
列做轮询,这样子才能更容易定制空闲时和忙碌时候的调度策略。上面的示例的运行效果如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
11 : 58 : 27.202 [main] INFO org.quartz.core.QuartzScheduler - Scheduler meta - data: Quartz Scheduler (v2. 3.1 ) 'DefaultQuartzScheduler' with instanceId 'NON_CLUSTERED' Scheduler class : 'org.quartz.core.QuartzScheduler' - running locally. NOT STARTED. Currently in standby mode. Number of jobs executed: 0 Using thread pool 'org.quartz.simpl.SimpleThreadPool' - with 10 threads. Using job - store 'org.quartz.simpl.RAMJobStore' - which does not support persistence. and is not clustered. 11 : 58 : 27.202 [main] INFO org.quartz.impl.StdSchedulerFactory - Quartz scheduler 'DefaultQuartzScheduler' initialized from default resource file in Quartz package: 'quartz.properties' 11 : 58 : 27.202 [main] INFO org.quartz.impl.StdSchedulerFactory - Quartz scheduler version: 2.3 . 1 11 : 58 : 27.209 [main] INFO org.quartz.core.QuartzScheduler - Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED started. 11 : 58 : 27.212 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 1 triggers 11 : 58 : 27.217 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.simpl.PropertySettingJobFactory - Producing instance of Job 'delayJob.orderMessageDelayJob' , class = club.throwable.jdbc.OrderMessageDelayJob 11 : 58 : 27.219 [HikariPool - 1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool - 1 - Added connection com.mysql.jdbc.JDBC4Connection@ 10eb8c53 11 : 58 : 27.220 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 0 triggers 11 : 58 : 27.221 [DefaultQuartzScheduler_Worker - 1 ] DEBUG org.quartz.core.JobRunShell - Calling execute on job delayJob.orderMessageDelayJob 11 : 58 : 34.440 [DefaultQuartzScheduler_Worker - 1 ] INFO club.throwable.jdbc.OrderMessageService - 订单处理定时任务开始执行...... 11 : 58 : 34.451 [HikariPool - 1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool - 1 - Added connection com.mysql.jdbc.JDBC4Connection@ 3d27ece4 11 : 58 : 34.459 [HikariPool - 1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool - 1 - Added connection com.mysql.jdbc.JDBC4Connection@ 64e808af 11 : 58 : 34.470 [HikariPool - 1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool - 1 - Added connection com.mysql.jdbc.JDBC4Connection@ 79c8c2b7 11 : 58 : 34.477 [HikariPool - 1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool - 1 - Added connection com.mysql.jdbc.JDBC4Connection@ 19a62369 11 : 58 : 34.485 [HikariPool - 1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool - 1 - Added connection com.mysql.jdbc.JDBC4Connection@ 1673d017 11 : 58 : 34.485 [HikariPool - 1 connection adder] DEBUG com.zaxxer.hikari.pool.HikariPool - HikariPool - 1 - After adding stats (total = 10 , active = 0 , idle = 10 , waiting = 0 ) 11 : 58 : 34.559 [DefaultQuartzScheduler_Worker - 1 ] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL query 11 : 58 : 34.565 [DefaultQuartzScheduler_Worker - 1 ] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [SELECT * FROM t_order_message WHERE create_time > = ? AND create_time < = ? AND order_status IN (?) AND retry_times < ? LIMIT ?] 11 : 58 : 34.645 [DefaultQuartzScheduler_Worker - 1 ] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource 11 : 58 : 35.210 [DefaultQuartzScheduler_Worker - 1 ] DEBUG org.springframework.jdbc.core.JdbcTemplate - SQLWarning ignored: SQL state '22007' , error code '1292' , message [Truncated incorrect DOUBLE value: '0,-1' ] 11 : 58 : 35.335 [DefaultQuartzScheduler_Worker - 1 ] INFO club.throwable.jdbc.OrderMessageService - 处理订单[ 10086 ],状态由 0 更新为 1 11 : 58 : 35.342 [DefaultQuartzScheduler_Worker - 1 ] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL update 11 : 58 : 35.346 [DefaultQuartzScheduler_Worker - 1 ] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id = ?] 11 : 58 : 35.347 [DefaultQuartzScheduler_Worker - 1 ] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource 11 : 58 : 35.354 [DefaultQuartzScheduler_Worker - 1 ] INFO club.throwable.jdbc.OrderMessageService - 处理订单[ 10087 ],状态由 0 更新为 1 11 : 58 : 35.355 [DefaultQuartzScheduler_Worker - 1 ] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL update 11 : 58 : 35.355 [DefaultQuartzScheduler_Worker - 1 ] DEBUG org.springframework.jdbc.core.JdbcTemplate - Executing prepared SQL statement [UPDATE t_order_message SET order_status = ?,edit_time = ? WHERE id = ?] 11 : 58 : 35.355 [DefaultQuartzScheduler_Worker - 1 ] DEBUG org.springframework.jdbc.datasource.DataSourceUtils - Fetching JDBC Connection from DataSource 11 : 58 : 35.361 [DefaultQuartzScheduler_Worker - 1 ] INFO club.throwable.jdbc.OrderMessageService - 订单处理定时任务开始完毕...... 11 : 58 : 35.363 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 1 triggers 11 : 58 : 37.206 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.simpl.PropertySettingJobFactory - Producing instance of Job 'delayJob.orderMessageDelayJob' , class = club.throwable.jdbc.OrderMessageDelayJob 11 : 58 : 37.206 [DefaultQuartzScheduler_QuartzSchedulerThread] DEBUG org.quartz.core.QuartzSchedulerThread - batch acquisition of 0 triggers |
RabbitMQ死信队列
使用 RabbitMQ
死信队列依赖于 RabbitMQ
的两个特性: TTL
和 DLX
。
TTL
: Time To Live
,消息存活时间,包括两个维度:队列消息存活时间和消息本身的存活时间。
DLX
: Dead Letter Exchange
,死信交换器。
画个图描述一下这两个特性:
下面为了简单起见, TTL
使用了针对队列的维度。引入 RabbitMQ
的Java驱动:
1
2
3
4
5
6
|
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.7.3</version> <scope>test</scope> </dependency> |
代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
|
public class DlxMain { private static final DateTimeFormatter F = DateTimeFormatter.ofPattern( "yyyy-MM-dd HH:mm:ss" ); private static final Logger LOGGER = LoggerFactory.getLogger(DlxMain.class); public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); Channel producerChannel = connection .createChannel(); Channel consumerChannel = connection .createChannel(); // dlx交换器名称为dlx.exchange,类型是direct,绑定键为dlx. key ,队列名为dlx.queue producerChannel.exchangeDeclare( "dlx.exchange" , "direct" ); producerChannel.queueDeclare( "dlx.queue" , false , false , false , null ); producerChannel.queueBind( "dlx.queue" , "dlx.exchange" , "dlx.key" ); Map<String, Object> queueArgs = new HashMap<>(); // 设置队列消息过期时间,5秒 queueArgs.put( "x-message-ttl" , 5000); // 指定DLX相关参数 queueArgs.put( "x-dead-letter-exchange" , "dlx.exchange" ); queueArgs.put( "x-dead-letter-routing-key" , "dlx.key" ); // 声明业务队列 producerChannel.queueDeclare( "business.queue" , false , false , false , queueArgs); ExecutorService executorService = Executors.newSingleThreadExecutor(r -> { Thread thread = new Thread(r); thread.setDaemon( true ); thread.setName( "DlxConsumer" ); return thread; }); // 启动消费者 executorService. execute (() -> { try { consumerChannel.basicConsume( "dlx.queue" , true , new DlxConsumer(consumerChannel)); } catch (IOException e) { LOGGER.error(e.getMessage(), e); } }); OrderMessage message = new OrderMessage( "10086" ); producerChannel.basicPublish( "" , "business.queue" , MessageProperties.TEXT_PLAIN, message.getDescription().getBytes(StandardCharsets.UTF_8)); LOGGER.info( "发送消息成功,订单ID:{}" , message.getOrderId()); message = new OrderMessage( "10087" ); producerChannel.basicPublish( "" , "business.queue" , MessageProperties.TEXT_PLAIN, message.getDescription().getBytes(StandardCharsets.UTF_8)); LOGGER.info( "发送消息成功,订单ID:{}" , message.getOrderId()); message = new OrderMessage( "10088" ); producerChannel.basicPublish( "" , "business.queue" , MessageProperties.TEXT_PLAIN, message.getDescription().getBytes(StandardCharsets.UTF_8)); LOGGER.info( "发送消息成功,订单ID:{}" , message.getOrderId()); Thread.sleep( Integer .MAX_VALUE); } private static class DlxConsumer extends DefaultConsumer { DlxConsumer(Channel channel) { super(channel); } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { LOGGER.info( "处理消息成功:{}" , new String(body, StandardCharsets.UTF_8)); } } private static class OrderMessage { private final String orderId; private final long timestamp ; private final String description; OrderMessage(String orderId) { this.orderId = orderId; this. timestamp = System.currentTimeMillis(); this.description = String.format( "订单[%s],订单创建时间为:%s" , orderId, LocalDateTime.ofInstant(Instant.ofEpochMilli( timestamp ), ZoneId.systemDefault()).format(F)); } public String getOrderId() { return orderId; } public long getTimestamp() { return timestamp ; } public String getDescription() { return description; } } } |
运行 main()
方法结果如下:
1
2
3
4
5
6
|
16:35:58.638 [main] INFO club.throwable.dlx.DlxMain - 发送消息成功,订单ID:10086 16:35:58.641 [main] INFO club.throwable.dlx.DlxMain - 发送消息成功,订单ID:10087 16:35:58.641 [main] INFO club.throwable.dlx.DlxMain - 发送消息成功,订单ID:10088 16:36:03.646 [pool-1-thread-4] INFO club.throwable.dlx.DlxMain - 处理消息成功:订单[10086],订单创建时间为:2019-08-20 16:35:58 16:36:03.670 [pool-1-thread-5] INFO club.throwable.dlx.DlxMain - 处理消息成功:订单[10087],订单创建时间为:2019-08-20 16:35:58 16:36:03.670 [pool-1-thread-6] INFO club.throwable.dlx.DlxMain - 处理消息成功:订单[10088],订单创建时间为:2019-08-20 16:35:58 |
时间轮
时间轮 TimingWheel
是一种高效、低延迟的调度数据结构,底层采用数组实现存储任务列表的环形队列,示意图如下:
这里暂时不对时间轮和其实现作分析,只简单举例说明怎么使用时间轮实现延时任务。这里使用 Netty
提供的 HashedWheelTimer
,引入依赖:
1
2
3
4
5
|
<dependency> <groupId>io.netty</groupId> <artifactId>netty-common</artifactId> <version>4.1.39.Final</version> </dependency> |
代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
public class HashedWheelTimerMain { private static final DateTimeFormatter F = DateTimeFormatter.ofPattern( "yyyy-MM-dd HH:mm:ss.SSS" ); public static void main(String[] args) throws Exception { AtomicInteger counter = new AtomicInteger(); ThreadFactory factory = r -> { Thread thread = new Thread(r); thread.setDaemon( true ); thread.setName( "HashedWheelTimerWorker-" + counter.getAndIncrement()); return thread; }; // tickDuration - 每tick一次的时间间隔, 每tick一次就会到达下一个槽位 // unit - tickDuration的时间单位 // ticksPerWhee - 时间轮中的槽位数 Timer timer = new HashedWheelTimer(factory, 1, TimeUnit.SECONDS, 60); TimerTask timerTask = new DefaultTimerTask( "10086" ); timer.newTimeout(timerTask, 5, TimeUnit.SECONDS); timerTask = new DefaultTimerTask( "10087" ); timer.newTimeout(timerTask, 10, TimeUnit.SECONDS); timerTask = new DefaultTimerTask( "10088" ); timer.newTimeout(timerTask, 15, TimeUnit.SECONDS); Thread.sleep( Integer .MAX_VALUE); } private static class DefaultTimerTask implements TimerTask { private final String orderId; private final long timestamp ; public DefaultTimerTask(String orderId) { this.orderId = orderId; this. timestamp = System.currentTimeMillis(); } @Override public void run(Timeout timeout) throws Exception { System. out .println(String.format( "任务执行时间:%s,订单创建时间:%s,订单ID:%s" , LocalDateTime.now().format(F), LocalDateTime.ofInstant(Instant.ofEpochMilli( timestamp ), ZoneId.systemDefault()).format(F), orderId)); } } } |
运行结果:
任务执行时间:2019-08-20 17:19:49.310,订单创建时间:2019-08-20 17:19:43.294,订单ID:10086
任务执行时间:2019-08-20 17:19:54.297,订单创建时间:2019-08-20 17:19:43.301,订单ID:10087
任务执行时间:2019-08-20 17:19:59.297,订单创建时间:2019-08-20 17:19:43.301,订单ID:10088
一般来说,任务执行的时候应该使用另外的业务线程池,以免阻塞时间轮本身的运动。
选用的方案实现过程
最终选用了基于 Redis
的有序集合 Sorted Set
和 Quartz
短轮询进行实现。具体方案是:
-
订单创建的时候,订单ID和当前时间戳分别作为
Sorted Set
的member和score添加到订单队列Sorted Set
中。 -
订单创建的时候,订单ID和推送内容
JSON
字符串分别作为field和value添加到订单队列内容Hash
中。 -
第1步和第2步操作的时候用
Lua
脚本保证原子性。 -
使用一个异步线程通过
Sorted Set
的命令ZREVRANGEBYSCORE
弹出指定数量的订单ID对应的订单队列内容Hash
中的订单推送内容数据进行处理。
对于第4点处理有两种方案:
-
方案一:弹出订单内容数据的同时进行数据删除,也就是
ZREVRANGEBYSCORE
、ZREM
和HDEL
命令要在同一个Lua
脚本中执行,这样的话Lua
脚本的编写难度大,并且由于弹出数据已经在Redis
中删除,如果数据处理失败则可能需要从数据库重新查询补偿。 -
方案二:弹出订单内容数据之后,在数据处理完成的时候再主动删除订单队列
Sorted Set
和订单队列内容Hash
中对应的数据,这样的话需要控制并发,有重复执行的可能性。
最终暂时选用了方案一,也就是从 Sorted Set
弹出订单ID并且从 Hash
中获取完推送数据之后马上删除这两个集合中对应的数据。方案的流程图大概是这样:
这里先详细说明一下用到的 Redis
命令。
Sorted Set相关命令
ZADD
命令 - 将一个或多个成员元素及其分数值加入到有序集当中。
ZADD KEY SCORE1 VALUE1.. SCOREN VALUEN
ZREVRANGEBYSCORE
命令 - 返回有序集中指定分数区间内的所有的成员。有序集成员按分数值递减(从大到小)的次序排列。
ZREVRANGEBYSCORE key max min [WITHSCORES] [LIMIT offset count]
max:分数区间 - 最大分数。 min:分数区间 - 最小分数。 WITHSCORES:可选参数,是否返回分数值,指定则会返回得分值。 LIMIT:可选参数,offset和count原理和 MySQL
的 LIMIT offset,size
一致,如果不指定此参数则返回整个集合的数据。 ZREM
命令 - 用于移除有序集中的一个或多个成员,不存在的成员将被忽略。
ZREM key member [member ...]
Hash相关命令 HMSET
命令 - 同时将多个field-value(字段-值)对设置到哈希表中。
HMSET KEY_NAME FIELD1 VALUE1 ...FIELDN VALUEN
HDEL
命令 - 删除哈希表key中的一个或多个指定字段,不存在的字段将被忽略。
HDEL KEY_NAME FIELD1.. FIELDN
Lua相关 加载 Lua
脚本并且返回脚本的 SHA-1
字符串: SCRIPT LOAD script
。 执行已经加载的 Lua
脚本: EVALSHA sha1 numkeys key [key ...] arg [arg ...]
。 unpack
函数可以把 table
类型的参数转化为可变参数,不过需要注意的是 unpack
函数必须使用在非变量定义的函数调用的最后一个参数,否则会失效,详细见 Stackoverflow
的提问 table.unpack() only returns the first element 。
PS:如果不熟悉Lua语言,建议系统学习一下,因为想用好Redis,一定离不开Lua。
引入依赖:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
<dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>2.1.7.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz</artifactId> <version>2.3.1</version> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.1.0</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context-support</artifactId> <version>5.1.9.RELEASE</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.8</version> <scope>provided</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.59</version> </dependency> </dependencies> |
编写 Lua
脚本 /lua/enqueue.lua
和 /lua/dequeue.lua
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
-- /lua/enqueue.lua local zset_key = KEYS[1] local hash_key = KEYS[2] local zset_value = ARGV[1] local zset_score = ARGV[2] local hash_field = ARGV[3] local hash_value = ARGV[4] redis.call( 'ZADD' , zset_key, zset_score, zset_value) redis.call( 'HSET' , hash_key, hash_field, hash_value) return nil -- /lua/dequeue.lua -- 参考jesque的部分Lua脚本实现 local zset_key = KEYS[1] local hash_key = KEYS[2] local min_score = ARGV[1] local max_score = ARGV[2] local offset = ARGV[3] local limit = ARGV[4] -- TYPE命令的返回结果是{'ok':'zset'}这样子,这里利用next做一轮迭代 local status, type = next (redis.call( 'TYPE' , zset_key)) if status ~= nil and status == 'ok' then if type == 'zset' then local list = redis.call( 'ZREVRANGEBYSCORE' , zset_key, max_score, min_score, 'LIMIT' , offset, limit) if list ~= nil and #list > 0 then -- unpack函数能把table转化为可变参数 redis.call( 'ZREM' , zset_key, unpack(list)) local result = redis.call( 'HMGET' , hash_key, unpack(list)) redis.call( 'HDEL' , hash_key, unpack(list)) return result end end end return nil |
编写核心API代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
|
// Jedis提供者 @Component public class JedisProvider implements InitializingBean { private JedisPool jedisPool; @Override public void afterPropertiesSet() throws Exception { jedisPool = new JedisPool(); } public Jedis provide(){ return jedisPool.getResource(); } } // OrderMessage @Data public class OrderMessage { private String orderId; private BigDecimal amount; private Long userId; } // 延迟队列接口 public interface OrderDelayQueue { void enqueue(OrderMessage message); List<OrderMessage> dequeue(String min , String max , String offset, String limit); List<OrderMessage> dequeue(); String enqueueSha(); String dequeueSha(); } // 延迟队列实现类 @RequiredArgsConstructor @Component public class RedisOrderDelayQueue implements OrderDelayQueue, InitializingBean { private static final String MIN_SCORE = "0" ; private static final String OFFSET = "0" ; private static final String LIMIT = "10" ; private static final String ORDER_QUEUE = "ORDER_QUEUE" ; private static final String ORDER_DETAIL_QUEUE = "ORDER_DETAIL_QUEUE" ; private static final String ENQUEUE_LUA_SCRIPT_LOCATION = "/lua/enqueue.lua" ; private static final String DEQUEUE_LUA_SCRIPT_LOCATION = "/lua/dequeue.lua" ; private static final AtomicReference<String> ENQUEUE_LUA_SHA = new AtomicReference<>(); private static final AtomicReference<String> DEQUEUE_LUA_SHA = new AtomicReference<>(); private static final List<String> KEYS = Lists.newArrayList(); private final JedisProvider jedisProvider; static { KEYS. add (ORDER_QUEUE); KEYS. add (ORDER_DETAIL_QUEUE); } @Override public void enqueue(OrderMessage message) { List<String> args = Lists.newArrayList(); args. add (message.getOrderId()); args. add (String.valueOf(System.currentTimeMillis())); args. add (message.getOrderId()); args. add (JSON.toJSONString(message)); try (Jedis jedis = jedisProvider.provide()) { jedis.evalsha(ENQUEUE_LUA_SHA.get(), KEYS, args); } } @Override public List<OrderMessage> dequeue() { // 30分钟之前 String maxScore = String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000); return dequeue(MIN_SCORE, maxScore, OFFSET, LIMIT); } @SuppressWarnings( "unchecked" ) @Override public List<OrderMessage> dequeue(String min , String max , String offset, String limit) { List<String> args = new ArrayList<>(); args. add ( max ); args. add ( min ); args. add (offset); args. add (limit); List<OrderMessage> result = Lists.newArrayList(); try (Jedis jedis = jedisProvider.provide()) { List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(), KEYS, args); if ( null != eval) { for (String e : eval) { result. add (JSON.parseObject(e, OrderMessage.class)); } } } return result; } @Override public String enqueueSha() { return ENQUEUE_LUA_SHA.get(); } @Override public String dequeueSha() { return DEQUEUE_LUA_SHA.get(); } @Override public void afterPropertiesSet() throws Exception { // 加载Lua脚本 loadLuaScript(); } private void loadLuaScript() throws Exception { try (Jedis jedis = jedisProvider.provide()) { ClassPathResource resource = new ClassPathResource(ENQUEUE_LUA_SCRIPT_LOCATION); String luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8); String sha = jedis.scriptLoad(luaContent); ENQUEUE_LUA_SHA.compareAndSet( null , sha); resource = new ClassPathResource(DEQUEUE_LUA_SCRIPT_LOCATION); luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8); sha = jedis.scriptLoad(luaContent); DEQUEUE_LUA_SHA.compareAndSet( null , sha); } } public static void main(String[] as ) throws Exception { DateTimeFormatter f = DateTimeFormatter.ofPattern( "yyyy-MM-dd HH:mm:ss.SSS" ); JedisProvider jedisProvider = new JedisProvider(); jedisProvider.afterPropertiesSet(); RedisOrderDelayQueue queue = new RedisOrderDelayQueue(jedisProvider); queue.afterPropertiesSet(); // 写入测试数据 OrderMessage message = new OrderMessage(); message.setAmount(BigDecimal.valueOf(10086)); message.setOrderId( "ORDER_ID_10086" ); message.setUserId(10086L); message.setTimestamp(LocalDateTime.now().format(f)); List<String> args = Lists.newArrayList(); args. add (message.getOrderId()); // 测试需要,score设置为30分钟之前 args. add (String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000)); args. add (message.getOrderId()); args. add (JSON.toJSONString(message)); try (Jedis jedis = jedisProvider.provide()) { jedis.evalsha(ENQUEUE_LUA_SHA.get(), KEYS, args); } List<OrderMessage> dequeue = queue.dequeue(); System. out .println(dequeue); } } |
这里先执行一次 main()
方法验证一下延迟队列是否生效:
1
|
[OrderMessage(orderId=ORDER_ID_10086, amount=10086, userId=10086, timestamp =2019-08-21 08:32:22.885)] |
确定延迟队列的代码没有问题,接着编写一个 Quartz
的 Job
类型的消费者 OrderMessageConsumer
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
|
@DisallowConcurrentExecution @Component public class OrderMessageConsumer implements Job { private static final AtomicInteger COUNTER = new AtomicInteger(); private static final ExecutorService BUSINESS_WORKER_POOL = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), r -> { Thread thread = new Thread(r); thread.setDaemon( true ); thread.setName( "OrderMessageConsumerWorker-" + COUNTER.getAndIncrement()); return thread; }); private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageConsumer.class); @Autowired private OrderDelayQueue orderDelayQueue; @Override public void execute (JobExecutionContext jobExecutionContext) throws JobExecutionException { StopWatch stopWatch = new StopWatch(); stopWatch.start(); LOGGER.info( "订单消息处理定时任务开始执行......" ); List<OrderMessage> messages = orderDelayQueue.dequeue(); if (!messages.isEmpty()) { // 简单的列表等分放到线程池中执行 List<List<OrderMessage>> partition = Lists.partition(messages, 2); int size = partition. size (); final CountDownLatch latch = new CountDownLatch( size ); for (List<OrderMessage> p : partition) { BUSINESS_WORKER_POOL. execute (new ConsumeTask(p, latch)); } try { latch.await(); } catch (InterruptedException ignore ) { // ignore } } stopWatch.stop(); LOGGER.info( "订单消息处理定时任务执行完毕,耗时:{} ms......" , stopWatch.getTotalTimeMillis()); } @RequiredArgsConstructor private static class ConsumeTask implements Runnable { private final List<OrderMessage> messages; private final CountDownLatch latch; @Override public void run() { try { // 实际上这里应该单条捕获异常 for (OrderMessage message : messages) { LOGGER.info( "处理订单信息,内容:{}" , message); } } finally { latch.countDown(); } } } } |
上面的消费者设计的时候需要有以下考量:
-
使用
@DisallowConcurrentExecution
注解不允许Job
并发执行,其实多个Job
并发执行意义不大,因为我们采用的是短间隔的轮询,而Redis
是单线程处理命令,在客户端做多线程其实效果不佳。 -
线程池
BUSINESS_WORKER_POOL
的线程容量或者队列应该综合LIMIT
值、等分订单信息列表中使用的size
值以及ConsumeTask
里面具体的执行时间进行考虑,这里只是为了方便使用了固定容量的线程池。 -
ConsumeTask
中应该对每一条订单信息的处理单独捕获异常和吞并异常,或者把处理单个订单信息的逻辑封装成一个不抛出异常的方法。
其他 Quartz
相关的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
// Quartz配置类 @Configuration public class QuartzAutoConfiguration { @Bean public SchedulerFactoryBean schedulerFactoryBean(QuartzAutowiredJobFactory quartzAutowiredJobFactory) { SchedulerFactoryBean factory = new SchedulerFactoryBean(); factory.setAutoStartup( true ); factory.setJobFactory(quartzAutowiredJobFactory); return factory; } @Bean public QuartzAutowiredJobFactory quartzAutowiredJobFactory() { return new QuartzAutowiredJobFactory(); } public static class QuartzAutowiredJobFactory extends AdaptableJobFactory implements BeanFactoryAware { private AutowireCapableBeanFactory autowireCapableBeanFactory; @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException { this.autowireCapableBeanFactory = (AutowireCapableBeanFactory) beanFactory; } @Override protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception { Object jobInstance = super.createJobInstance(bundle); // 这里利用AutowireCapableBeanFactory从新建的Job实例做一次自动装配,得到一个原型(prototype)的JobBean实例 autowireCapableBeanFactory.autowireBean(jobInstance); return jobInstance; } } } |
这里暂时使用了内存态的 RAMJobStore
去存放任务和触发器的相关信息,如果在生产环境最好替换成基于 MySQL
也就是 JobStoreTX
进行集群化,最后是启动函数和 CommandLineRunner
的实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class, TransactionAutoConfiguration.class}) public class Application implements CommandLineRunner { @Autowired private Scheduler scheduler; @Autowired private JedisProvider jedisProvider; public static void main(String[] args) { SpringApplication.run(Application.class, args); } @Override public void run(String... args) throws Exception { // 准备一些测试数据 prepareOrderMessageData(); JobDetail job = JobBuilder.newJob(OrderMessageConsumer.class) .withIdentity( "OrderMessageConsumer" , "DelayTask" ) .build(); // 触发器5秒触发一次 Trigger trigger = TriggerBuilder.newTrigger() .withIdentity( "OrderMessageConsumerTrigger" , "DelayTask" ) .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(5).repeatForever()) .build(); scheduler.scheduleJob(job, trigger ); } private void prepareOrderMessageData() throws Exception { DateTimeFormatter f = DateTimeFormatter.ofPattern( "yyyy-MM-dd HH:mm:ss.SSS" ); try (Jedis jedis = jedisProvider.provide()) { List<OrderMessage> messages = Lists.newArrayList(); for ( int i = 0; i < 100; i++) { OrderMessage message = new OrderMessage(); message.setAmount(BigDecimal.valueOf(i)); message.setOrderId( "ORDER_ID_" + i); message.setUserId((long) i); message.setTimestamp(LocalDateTime.now().format(f)); messages. add (message); } // 这里暂时不使用Lua Map<String, Double > map = Maps.newHashMap(); Map<String, String> hash = Maps.newHashMap(); for (OrderMessage message : messages) { // 故意把score设计成30分钟前 map.put(message.getOrderId(), Double .valueOf(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000))); hash.put(message.getOrderId(), JSON.toJSONString(message)); } jedis.zadd( "ORDER_QUEUE" , map); jedis.hmset( "ORDER_DETAIL_QUEUE" , hash); } } } |
输出结果如下:
2019-08-21 22:45:59.518 INFO 33000 --- [ryBean_Worker-1] club.throwable.OrderMessageConsumer : 订单消息处理定时任务开始执行......
2019-08-21 22:45:59.525 INFO 33000 --- [onsumerWorker-4] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_91, amount=91, userId=91, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525 INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_95, amount=95, userId=95, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525 INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_97, amount=97, userId=97, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525 INFO 33000 --- [onsumerWorker-0] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_99, amount=99, userId=99, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.525 INFO 33000 --- [onsumerWorker-3] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_93, amount=93, userId=93, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539 INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_94, amount=94, userId=94, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539 INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_96, amount=96, userId=96, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539 INFO 33000 --- [onsumerWorker-3] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_92, amount=92, userId=92, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539 INFO 33000 --- [onsumerWorker-0] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_98, amount=98, userId=98, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.539 INFO 33000 --- [onsumerWorker-4] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_90, amount=90, userId=90, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:45:59.540 INFO 33000 --- [ryBean_Worker-1] club.throwable.OrderMessageConsumer : 订单消息处理定时任务执行完毕,耗时:22 ms......
2019-08-21 22:46:04.515 INFO 33000 --- [ryBean_Worker-2] club.throwable.OrderMessageConsumer : 订单消息处理定时任务开始执行......
2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-5] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_89, amount=89, userId=89, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-6] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_87, amount=87, userId=87, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-7] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_85, amount=85, userId=85, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-5] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_88, amount=88, userId=88, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_83, amount=83, userId=83, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_81, amount=81, userId=81, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-6] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_86, amount=86, userId=86, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_82, amount=82, userId=82, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-7] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_84, amount=84, userId=84, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_80, amount=80, userId=80, timestamp=2019-08-21 22:45:59.475)
2019-08-21 22:46:04.516 INFO 33000 --- [ryBean_Worker-2] club.throwable.OrderMessageConsumer : 订单消息处理定时任务执行完毕,耗时:1 ms......
......
首次执行的时候涉及到一些组件的初始化,会比较慢,后面看到由于我们只是简单打印订单信息,所以定时任务执行比较快。如果在不调整当前架构的情况下,生产中需要注意:
-
切换
JobStore
为JDBC
模式,Quartz
官方有完整教程,或者看笔者之前翻译的Quartz
文档。 - 需要监控或者收集任务的执行状态,添加预警等等。
这里其实有一个性能隐患,命令 ZREVRANGEBYSCORE
的时间复杂度可以视为为 O(N)
, N
是集合的元素个数,由于这里把所有的订单信息都放进了同一个 Sorted Set
( ORDER_QUEUE
)中,所以在一直有新增数据的时候, dequeue
脚本的时间复杂度一直比较高,后续订单量升高之后会此处一定会成为性能瓶颈,后面会给出解决的方案。
小结
这篇文章主要从一个实际生产案例的仿真例子入手,分析了当前延时任务的一些实现方案,还基于 Redis
和 Quartz
给出了一个完整的示例。当前的示例只是处于可运行的状态,有些问题尚未解决。下一篇文章会着眼于解决两个方面的问题:
- 分片。
- 监控。
还有一点, 架构是基于业务形态演进出来的,很多东西需要结合场景进行方案设计和改进,思路仅供参考,切勿照搬代码 。
以上所述是小编给大家介绍的使用Redis实现延时任务的解决方案,非常不错,具有一定的参考借鉴价值,需要的朋友参考下吧!
原文链接:http://www.throwable.club/2019/08/21/redis-delay-task-first/