1、在《ActiveMQ 基于zookeeper的主从(levelDB Master/Slave)搭建以及Spring-boot下使用》中,采用以下代码进行JMS消息发送:
1
2
3
4
5
6
7
8
9
10
|
@Service public class Producer { @Autowired private JmsMessagingTemplate jmsTemplate; public void sendMessage(Destination destination, final String message){ jmsTemplate.convertAndSend(destination, message); } } |
经JMeter压力测试,发现JMS消息发送特别慢。原因是:在连接工厂未做缓存或池化的情况下,JmsTemplate 每次发送都会新建并关闭连接、会话和生产者。
2、下面通过自定义CachingConnectionFactory解决。
(1)SenderConfig.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
package com.example.springbootactivemq.jms; import org.apache.activemq.ActiveMQConnectionFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.connection.CachingConnectionFactory; import org.springframework.jms.core.JmsTemplate; /** * Created by yan on 2017/8/3. */ @Configuration public class SenderConfig { @Value ( "${spring.activemq.broker-url}" ) private String brokerUrl; @Bean public ActiveMQConnectionFactory activeMQConnectionFactory() { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(); activeMQConnectionFactory.setBrokerURL(brokerUrl); return activeMQConnectionFactory; } @Bean public CachingConnectionFactory cachingConnectionFactory() { return new CachingConnectionFactory(activeMQConnectionFactory()); } @Bean public JmsTemplate jmsTemplate() { return new JmsTemplate(cachingConnectionFactory()); } @Bean public Sender sender() { return new Sender(); } } |
(2)Sender.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
package com.example.springbootactivemq.jms; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.Session; import javax.jms.TextMessage; /** * Created by yan on 2017/8/3. */ public class Sender { @Autowired private JmsTemplate jmsTemplate; public void send( final String destination, final String message){ this .jmsTemplate.convertAndSend(destination, message); } } |
(3)Receiver.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
package com.example.springbootactivemq.jms; import org.springframework.jms.annotation.JmsListener; import org.springframework.jms.listener.SessionAwareMessageListener; import org.springframework.jms.support.JmsUtils; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; /** * Created by yan on 2017/8/3. */ public class Receiver implements SessionAwareMessageListener<TextMessage> { @JmsListener (destination = "${queue.destination}" ) public void receive(String message) { try { Thread.sleep( 2000 ); } catch (InterruptedException e) { e.printStackTrace(); } } } |
(4)ReceiverConfig.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
package com.example.springbootactivemq.jms; import org.apache.activemq.ActiveMQConnectionFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.annotation.EnableJms; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; /** * Created by yan on 2017/8/3. */ @Configuration @EnableJms public class ReceiverConfig { @Value ( "${spring.activemq.broker-url}" ) private String brokerUrl; @Bean public ActiveMQConnectionFactory activeMQConnectionFactory() { ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(); activeMQConnectionFactory.setBrokerURL(brokerUrl); return activeMQConnectionFactory; } @Bean public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(activeMQConnectionFactory()); factory.setConcurrency( "3-10" ); return factory; } @Bean public Receiver receiver() { return new Receiver(); } } |
(5)TestCtrl.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
package com.example.springbootactivemq.test;

import com.example.springbootactivemq.jms.Sender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;

/**
 * Created by yan on 2017/8/2.
 *
 * <p>Test endpoint that forwards a path-supplied message to the configured
 * JMS destination and echoes the request values back as JSON.
 */
@RestController
@RequestMapping(
        value = "/test",
        headers = "Accept=application/json",
        produces = "application/json;charset=utf-8"
)
public class TestCtrl {

    @Autowired
    private Sender sender;

    @Value("${queue.destination}")
    private String destination;

    /**
     * Sends {@code msg} to the configured destination and returns both path
     * variables to the caller.
     *
     * @param msg  message text; this is the payload that gets sent
     * @param name recipient name; only echoed in the response
     * @return map with the keys {@code "msg"} and {@code "name"}
     */
    @RequestMapping(value = "/say/{msg}/to/{name}", method = RequestMethod.GET)
    public Map<String, Object> say(@PathVariable String msg, @PathVariable String name) {
        sender.send(destination, msg);

        Map<String, Object> echo = new HashMap<>();
        echo.put("msg", msg);
        echo.put("name", name);
        return echo;
    }
}
(6)application.properties
1
2
3
4
5
6
7
8
|
# ActiveMQ broker connection (failover transport across three broker nodes)
spring.activemq.broker-url=failover:(tcp://192.168.3.10:61616,tcp://192.168.3.11:61616,tcp://192.168.3.12:61616)
spring.activemq.in-memory=true
spring.activemq.pool.enabled=false
spring.activemq.user=admin
spring.activemq.password=admin

# Application-level queue settings
queue.destination=test.queue
queue.concurrency=3-10
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。