前言
消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有activemq,rabbitmq,zeromq,kafka,metamq,rocketmq
消息队列应用场景
消息队列在实际应用中常用的使用场景:异步处理,应用解耦,流量削锋和消息通讯四个场景。
本文主要给大家介绍的是关于java中消息队列任务平滑关闭的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧。
1.问题背景
对于消息队列任务的监听,我们一般使用java写一个独立的程序,在linux服务器上运行。当订阅者程序启动后,会通过消息队列客户端接收消息,放入线程池中并发的处理。
那么问题来了,当我们修改程序后,需要重新启动时,如何保证消息都能够被处理呢?
一些开源的消息队列中间件,会提供ack机制(消息确认机制),当订阅者处理完消息后,会通知服务端删除对应消息,如果订阅者出现异常,服务端未收到确认消费,则会重试发送。
那如果消息队列中间件没有提供ack机制,或者为了高吞度量的考虑关闭了ack功能,如何最大可能保证消息都能够被处理呢?
正常来说,订阅者程序关闭后,消息会在队列中堆积,等待订阅者下次订阅消费,所以未接收的消息是不会丢失的。可能出现的问题就是在关闭的一瞬间,已经从消息队列中取出,但还没有被处理的消息。
因此我们需要一套平滑关闭的机制,保证在重启的时候,已接收的消息可以得到正常处理。
2.问题分析
平滑关闭的思路如下:
- 在关闭程序时,首先关闭消息订阅,保证不再接收新的消息。
- 关闭线程池,等待线程池中的消息处理完毕。
- 程序退出。
关闭消息订阅:消息队列的客户端都会提供关闭连接的方法,具体可以自行查看api。
关闭线程池:java的threadpoolexecutor线程池提供shutdown()
和shutdownnow()
两个方法,区别是前者会等待线程池中的消息都处理完毕,后者会直接停止所有线程并返回未处理完的线程list。因为我们需要使用shutdown()
方法进行关闭,并通过isterminated()
方法,判断线程池是否已经关闭。
那么问题又来了,我们如何通知到程序,需要执行关闭操作呢?
在linux中,进程的关闭是通过信号传递的,我们可以用kill -9 pid关闭进程,除了-9之外,我们可以通过 kill -l,查看kill命令的其它信号量。
这里提供两种关闭方法:
-
程序中添加
runtime.getruntime().addshutdownhook
钩子方法,sigterm,sigint,sighup三种信号都会触发该方法(分别对应kill -1/kill -2/kill -15,ctrl+c也会触发sigint信号)。 - 程序中通过signal类注册信号监听,比如usr2(对应kill -12),在handle方法中执行关闭操作。
补充说明:addshutdownhook方法和handle方法中如果再调用system.exit
,会造成deadlock,使进程无法正常退出。
伪代码分别如下
1
2
3
4
5
6
7
|
runtime.getruntime().addshutdownhook( new thread() { public void run() { //关闭订阅者 //关闭线程池 //退出 } }); |
1
2
3
4
5
6
7
8
9
10
|
//注册linux kill信号量 kill -12 signal sig = new signal( "usr2" ); signal.handle(sig, new signalhandler() { @override public void handle(signal signal) { //关闭订阅者 //关闭线程池 //退出 } }); |
模拟demo
下面通过一个demo模拟相关逻辑操作
首先模拟一个生产者,每秒生产5个消息
然后模拟一个订阅者,收到消息后,放入线程池进行处理,线程池固定4个线程,每个线程处理时间1秒,这样线程池每秒会积压1个消息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
|
package com.lujianing.demo; import sun.misc.signal; import sun.misc.signalhandler; import java.util.concurrent.*; /** * @author lujianing01@58.com * @description: * @date 2016/11/14 */ public class msgclient { //模拟消费线程池 同时4个线程处理 private static final threadpoolexecutor thread_pool = (threadpoolexecutor) executors.newfixedthreadpool( 4 ); //模拟消息生产任务 private static final scheduledexecutorservice scheduled_executor_service = executors.newsinglethreadscheduledexecutor(); //用于判断是否关闭订阅 private static volatile boolean isclose = false ; public static void main(string[] args) throws interruptedexception { //注册钩子方法 runtime.getruntime().addshutdownhook( new thread() { public void run() { close(); } }); blockingqueue <string> queue = new arrayblockingqueue<string>( 100 ); producer(queue); consumer(queue); } //模拟消息队列生产者 private static void producer( final blockingqueue queue){ //每200毫秒向队列中放入一个消息 scheduled_executor_service.scheduleatfixedrate( new runnable() { public void run() { queue.offer( "" ); } }, 0l, 200l, timeunit.milliseconds); } //模拟消息队列消费者 生产者每秒生产5个 消费者4个线程消费1个1秒 每秒积压1个 private static void consumer( final blockingqueue queue) throws interruptedexception { while (!isclose){ getpoolbacklogsize(); //从队列中拿到消息 final string msg = (string)queue.take(); //放入线程池处理 if (!thread_pool.isshutdown()) { thread_pool.execute( new runnable() { public void run() { try { //system.out.println(msg); timeunit.milliseconds.sleep(1000l); } catch (interruptedexception e) { e.printstacktrace(); } } }); } } } //查看线程池堆积消息个数 private static long getpoolbacklogsize(){ long backlog = thread_pool.gettaskcount()- thread_pool.getcompletedtaskcount(); system.out.println(string.format( "[%s]thread_pool backlog:%s" ,system.currenttimemillis(),backlog)); return backlog; } private static void close(){ system.out.println( "收到kill消息,执行关闭操作" ); //关闭订阅消费 isclose = true ; //关闭线程池,等待线程池积压消息处理 thread_pool.shutdown(); //判断线程池是否关闭 while (!thread_pool.isterminated()) { try { //每200毫秒 判断线程池积压数量 getpoolbacklogsize(); timeunit.milliseconds.sleep(200l); } catch (interruptedexception e) { e.printstacktrace(); } } system.out.println( "订阅者关闭,线程池处理完毕" ); } static { string osname = system.getproperty( "os.name" ).tolowercase(); if (osname != null && osname.indexof( "window" ) == - 1 ) { //注册linux kill信号量 kill -12 signal sig = new signal( "usr2" ); signal.handle(sig, new signalhandler() { @override public void handle(signal signal) { close(); } }); } } } |
当我们在服务上运行时,通过控制台可以看到相关的输出信息,demo中输出了线程池的积压消息个数
1
|
java -cp /home/work/lujianing/msg-queue-client/* com.lujianing.demo.msgclient |
另打开一个终端,通过ps命令查看进程号,或者通过nohup启动java进程拿到进程id
1
|
ps -fe|grep msgclient |
当我们执行kill -12 pid
的时候 可以看到关闭业务逻辑
3.总结
其实不单单消息队列任务,在常见的rpc服务中也会见到类似的功能,比如58的scf,在源码中,也会分别注册了usr2信号量和addshutdownhook钩子方法。
在重启脚本中,首先会发送kill -12
命令,rpc服务收到信号后会修改server状态为关闭。接着会发送kill -15
命令,触发钩子方法,关闭所有的连接。
好了,以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对服务器之家的支持。
原文链接:https://my.oschina.net/lujianing/blog/787745