1.1项目背景:做一个灾情预警的消息平台,灾情检查系统需要向消息平台里面推送消息,这里是典型的异构系统的消息传递,我们需要选择一个中间件作为消息队列,调研分析了rabbitmq,zeromq,activemq,kafka等消息中间件,综合性能,安全,可持久化等角度果断选择了rabbitmq作为我们的消息中间件 (其实这里是因为rabbitmq 是spring官方支持的,开发起来方便)。需求上我们有多种类型的消息,这里有紧急推送的和一般的等区分,高并发时,就会有对消息进行优先推送的情况出现,于是rabbitmq消息队优先级的推送功能是我们需要解决的首个技术点.
1.2技术调研:这里一个概念需要说明,为什么说是消息队列的优先级而不是消息的优先级,来看下消息队列的工作原理
生产者生成消息打到交换机里面(如果没有声明交换机,会打到default exchange里面),交换机绑定一个或多个队列,消息进入队列里面,消费者一直在监听队列,发现队列里面有消息就开始消费,这里就是一个消息传递的过程,queue是一个栈队列,栈是先进先出的,就是说消息来了依次排队,一个队列并不能实现消息的插队和优先推送的功能。但是如果说我们的多个队列有不同的优先级,不同优先级的消息通过roatingkey进入不同的队列,优先级高的队列消息被优先消费,这样也能形成一个相对意义上的优先级,所以说这里不是消息的优先级而是队列的优先级.
1.2.1 为什么说是相对意义上的优先级
有并发才有优先级,如果每个消息都能被瞬间处理也不会有消息优先推送的需求,那我们看看消息会在哪里阻塞
1,queue,很明显高并发的时候队列里面是会存在很多消息的,2,eschange ,高并发的时候producer发送给exchange的时候也会产生阻塞。
第一种情况由于我们队列已经定义优先级了,所以进入队列的消息都是同种优先级别的,并不需要插队。而对于第二种情况,消息在exchange时阻塞时并不能实现消息优先进入队列,依然是一个依次处理的情景,但是由于exchang到queue的处理速度极快,所有我们忽略了这块的优先级。
1.2.3 代码实现
在rabbitmq3.5版本之前,官方并没有实现队列优先级的功能,但论坛里面有一些插件可以实现(末尾附链接),这里我们主要说3.5版本之后的实现
1.2.3.1 Java代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
Connectionconn =RabbitMQConnectionUtil.getRabbitmqConnection(); //创建连接 Channelchannel = conn.createChannel(); //创建channel Map<String,Object> arg = newHashMap<String, Object>(); arg.put( "x-max-priority" , 10 ); //队列的属性参数 有10个优先级别 // 声明(创建)队列 //channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueDeclare(QUEUE_NAME, true , false , false , arg); // 消息内容 String message = "Hello World!" ; channel.basicPublish( "" ,QUEUE_NAME, null , message.getBytes()); BasicPropertiesprop = new BasicProperties( null , null , null , null , 1 , null , null , null , null , null , null , null , null , null ); //消息的参数,声明该消息的优先级是1 channel.basicPublish( "" ,QUEUE_NAME, prop, message.getBytes()); //消息发布 System.out.println( "[x] Sent '" + message + "'" ); //关闭通道和连接 channel.close(); conn.close(); |
客户端看下结果:
1.2.3.2结合spring实现:
1.2.3.2.1 xml配置:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
< beans xmlns = "http://www.springframework.org/schema/beans" xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit = "http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd" > < description >rabbitmq 连接服务配置</ description > <!-- 连接配置 --> < rabbit:connection-factory id = "connectionFactory" host = "${rabbit.ip}" username = "${rabbit.username}" password = "${rabbit.password}" port = "${rabbit.port}" virtual-host = "${rabbit.vhost}" /> < rabbit:admin connection-factory = "connectionFactory" /> < rabbit:template id = "amqpTemplate" connection-factory = "connectionFactory" /> <!-- spring template声明--> <!-- 声明一个队列 --> < rabbit:queue id = "test_queue_key" name = "test_queue_key" durable = "true" auto-delete = "false" exclusive = "false" > < rabbit:queue-arguments > < entry key = "x-max-priority" > < value type = "java.lang.Integer" >10</ value >//这个地方一定是integer的,别的不好使!! </ entry > </ rabbit:queue-arguments > </ rabbit:queue > <!-- 监听配置queues:监听的队列,多个的话用逗号(,)分隔 ref:监听器--> < rabbit:listener-container connection-factory = "connectionFactory" acknowledge = "auto" > < rabbit:listener queue-names = "test_queue_key" ref = "queueListenter" method = "onMessage" /> </ rabbit:listener-container > < bean id = "queueListenter" class = "com.DF.spring.springAMQP.QueueListener" /> |
1.2.3.2.2代码部分:
producter:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
AbstractApplicationContext ctx = new ClassPathXmlApplicationContext( "classpath:/spring/rabbitmq-contextDemo2.xml" ); RabbitTemplate amqpTemplate = ctx.getBean(RabbitTemplate. class ); Random random = new Random(); for ( int i= 0 ; i< 1000 ; i++){ final int priority = random.nextInt( 10 - 1 + 1 ) + 1 ; //随机的优先级 amqpTemplate.convertAndSend( "test_queue_key" , (Object)( "hello world" ), new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setPriority(priority); return message; } }); } |
customer:
1
2
3
4
5
6
7
8
9
10
11
|
public class QueueListener implements MessageListener{ @Override public void onMessage(Message message) { try { System.out.print( "[x] 接收到的消息:" + new String(message.getBody(), "utf-8" )+ "&&&" + "优先级" +message.getMessageProperties().getPrority()); Thread.sleep( 1000 ); } catch (Exception e){ e.printStackTrace(); } } } |
从客户端看下队列里面的消息:
我们发送随机优先级的消息进入队列,看看消费端打印出来的消息:
到这里,rabbitmq结合spring的demo功能实现......
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:http://blog.csdn.net/qq_33994587/article/details/52689527?locationNum=4&fps=1