一、什么是 RabbitMQ
RabbitMQ 是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
RabbitMQ 是由 Erlang 语言开发,安装 RabbitMQ 服务需要先安装 Erlang 语言包。
二、如何与 Spring 集成
1. 我们都需要哪些 Jar 包?
抛开单独使用 Spring 的包不说,引入 RabbitMQ 我们还需要两个:
1
2
3
4
5
6
7
8
9
10
11
|
<!-- RabbitMQ --> < dependency > < groupId >com.rabbitmq</ groupId > < artifactId >amqp-client</ artifactId > < version >3.5.1</ version > </ dependency > < dependency > < groupId >org.springframework.amqp</ groupId > < artifactId >spring-rabbit</ artifactId > < version >1.4.5.RELEASE</ version > </ dependency > |
2. 使用外部参数文件 application.properties:
1
2
3
4
5
6
|
mq.host=127.0.0.1 mq.username=queue mq.password=1234 mq.port=8001 # 统一XML配置中易变部分的命名 mq.queue=test_mq |
易变指的是在实际项目中,如果测试与生产环境使用的同一个 RabbitMQ 服务器。那我们在部署时直接修改 properties 文件的参数即可,防止测试与生产环境混淆。
修改 applicationContext.xml 文件,引入我们创建的 properties 文件
1
2
|
< context:property-placeholder location = "classpath:application.properties" /> < util:properties id = "appConfig" location = "classpath:application.properties" ></ util:properties > |
3. 连接 RabbitMQ 服务器
1
2
3
4
5
|
<!-- 连接配置 --> < rabbit:connection-factory id = "connectionFactory" host = "${mq.host}" username = "${mq.username}" password = "${mq.password}" port = "${mq.port}" /> < rabbit:admin connection-factory = "connectionFactory" /> |
4. 声明一个 RabbitMQ Template
<rabbit:template id="amqpTemplate" exchange="${mq.queue}_exchange" connection-factory="connectionFactory" />
5. 在 applicationContext.xml 中声明一个交换机,name 属性需配置到 RabbitMQ 服务器。
1
2
3
4
5
|
< rabbit:topic-exchange name = "${mq.queue}_exchange" durable = "true" auto-delete = "false" > < rabbit:bindings > < rabbit:binding queue = "test_queue" pattern = "${mq.queue}_patt" /> </ rabbit:bindings > </ rabbit:topic-exchange > |
交换机的四种模式:
- direct:转发消息到 routigKey 指定的队列。
- topic:按规则转发消息(最灵活)。
- headers:(这个还没有接触到)
- fanout:转发消息到所有绑定队列
交换器的属性:
- 持久性:如果启用,交换器将会在server重启前都有效。
- 自动删除:如果启用,那么交换器将会在其绑定的队列都被删除掉之后自动删除掉自身。
- 惰性:如果没有声明交换器,那么在执行到使用的时候会导致异常,并不会主动声明。
如果没有队列绑定在交换机上,则发送到该交换机上的消息会丢失。
一个交换机可以绑定多个队列,一个队列可以被多个交换机绑定。
topic 类型交换器通过模式匹配分析消息的 routing-key 属性。它将 routing-key 和 binding-key 的字符串切分成单词。这些单词之间用点隔开。它同样也会识别两个通配符:#匹配0个或者多个单词,*匹配一个单词。例如,binding key:*.stock.#匹配 routing key:usd.stcok 和 eur.stock.db,但是不匹配 stock.nana。
因为交换器是命名实体,声明一个已经存在的交换器,但是试图赋予不同类型是会导致错误。客户端需要删除这个已经存在的交换器,然后重新声明并且赋予新的类型。
6. 在 applicationContext.xml 中声明一个队列,name 属性是需要配置到 RabbitMQ 服务器的。
<rabbit:queue id="test_queue" name="${mq.queue}_testQueue" durable="true" auto-delete="false" exclusive="false" />
- durable:是否持久化
- exclusive:仅创建者可以使用的私有队列,断开后自动删除
- auto-delete:当所有消费端连接断开后,是否自动删除队列
7. 创建生产者端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * @Description: 消息队列发送者 * @Author: * @CreateTime: */ @Service public class Producer { @Autowired private AmqpTemplate amqpTemplate; public void sendQueue(String exchange_key, String queue_key, Object object) { // convertAndSend 将Java对象转换为消息发送至匹配key的交换机中Exchange amqpTemplate.convertAndSend(exchange_key, queue_key, object); } } |
8. 在 applicationContext.xml 中配置监听及消费者端
1
2
3
4
5
6
7
8
9
10
11
|
<!-- 消费者 --> < bean name = "rabbitmqService" class = "com.enh.mq.RabbitmqService" ></ bean > <!-- 配置监听 --> < rabbit:listener-container connection-factory = "connectionFactory" acknowledge = "auto" > <!-- queues 监听队列,多个用逗号分隔 ref 监听器 --> < rabbit:listener queues = "test_queue" ref = "rabbitmqService" /> </ rabbit:listener-container > |
消费者 Java 代码:
1
2
3
4
5
6
7
8
|
import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; public class RabbitmqService implements MessageListener { public void onMessage(Message message) { System.out.println( "消息消费者 = " + message.toString()); } } |
至此,我们的所有配置文件就写完了,最终如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
<? xml version = "1.0" encoding = "UTF-8" ?> < beans xmlns = "http://www.springframework.org/schema/beans" xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance" xmlns:context = "http://www.springframework.org/schema/context" xmlns:util = "http://www.springframework.org/schema/util" xmlns:aop = "http://www.springframework.org/schema/aop" xmlns:tx = "http://www.springframework.org/schema/tx" xmlns:rabbit = "http://www.springframework.org/schema/rabbit" xmlns:p = "http://www.springframework.org/schema/p" xsi:schemaLocation=" http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd"> <!-- RabbitMQ start --> <!-- 连接配置 --> < rabbit:connection-factory id = "connectionFactory" host = "${mq.host}" username = "${mq.username}" password = "${mq.password}" port = "${mq.port}" /> < rabbit:admin connection-factory = "connectionFactory" /> <!-- 消息队列客户端 --> < rabbit:template id = "amqpTemplate" exchange = "${mq.queue}_exchange" connection-factory = "connectionFactory" /> <!-- queue 队列声明 --> <!-- durable 是否持久化 exclusive 仅创建者可以使用的私有队列,断开后自动删除 auto-delete 当所有消费端连接断开后,是否自动删除队列 --> < rabbit:queue id = "test_queue" name = "${mq.queue}_testQueue" durable = "true" auto-delete = "false" exclusive = "false" /> <!-- 交换机定义 --> <!-- 交换机:一个交换机可以绑定多个队列,一个队列也可以绑定到多个交换机上。 如果没有队列绑定到交换机上,则发送到该交换机上的信息则会丢失。 direct模式:消息与一个特定的路由器完全匹配,才会转发 topic模式:按规则转发消息,最灵活 --> < rabbit:topic-exchange name = "${mq.queue}_exchange" durable = "true" auto-delete = "false" > < rabbit:bindings > <!-- 设置消息Queue匹配的pattern (direct模式为key) --> < rabbit:binding queue = "test_queue" pattern = "${mq.queue}_patt" /> </ rabbit:bindings > </ rabbit:topic-exchange > < bean name = "rabbitmqService" class = "com.enh.mq.RabbitmqService" ></ bean > <!-- 配置监听 消费者 --> < rabbit:listener-container connection-factory = "connectionFactory" acknowledge = "auto" > <!-- queues 监听队列,多个用逗号分隔 ref 监听器 --> < rabbit:listener queues = "test_queue" ref = "rabbitmqService" /> </ rabbit:listener-container > </ beans > |
9. 如何使用 RabbitMQ 发送一个消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
@Autowired private Producer producer; @Value ( "#{appConfig['mq.queue']}" ) private String queueId; /** * @Description: 消息队列 * @Author: * @CreateTime: */ @ResponseBody @RequestMapping ( "/sendQueue" ) public String testQueue() { try { Map<String, Object> map = new HashMap<String, Object>(); map.put( "data" , "hello rabbitmq" ); producer.sendQueue(queueId + "_exchange" , queueId + "_patt" , map); } catch (Exception e) { e.printStackTrace(); } return "发送完毕" ; } |
嗯。这个测试是 SpringMVC 框架。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:http://www.cnblogs.com/libra0920/p/6230421.html