本文介绍如何使用 Java 远程连接并调用 RabbitMQ,分享给大家,希望对各位有所帮助。
打开 IDEA,创建一个 Maven 工程(普通的 Java 工程即可)。
pom.xml文件如下
1
|
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
<!-- Maven build descriptor for the rabbitmq-study example project. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.zhenqi</groupId>
  <artifactId>rabbitmq-study</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>rabbitmq-study</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
    <!-- RabbitMQ Java client; its own slf4j-api dependency is excluded here. -->
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>4.1.0</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <!-- SLF4J binding for log4j 1.2. NOTE(review): QueueConsumer imports
         org.apache.log4j.Logger, which presumably arrives transitively
         through this artifact - confirm with `mvn dependency:tree`. -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.21</version>
    </dependency>

    <!-- Provides SerializationUtils, used to (de)serialize message payloads. -->
    <dependency>
      <groupId>commons-lang</groupId>
      <artifactId>commons-lang</artifactId>
      <version>2.6</version>
    </dependency>
  </dependencies>
</project>
为了能远程访问 RabbitMQ,需要编辑 /etc/rabbitmq/rabbitmq.config(Erlang 项格式的经典配置文件,注意结尾的句点不能省略),添加以下内容。
1
|
2
3
|
%% Classic (Erlang-term) RabbitMQ configuration.
%% The whole file is a single Erlang term and must end with a period.
[
  {rabbit, [
    %% Accept AMQP connections on TCP port 5672.
    {tcp_listeners, [5672]},
    %% Per the RabbitMQ docs, users named here may only connect over the
    %% loopback interface; listing only the placeholder "asdf" leaves every
    %% real user free to connect remotely.
    %% NOTE(review): the docs write these names as binaries, e.g.
    %% [<<"guest">>] - confirm whether a plain string is honoured.
    {loopback_users, ["asdf"]}
  ]}
].
为用户 openstack 添加 administrator 角色(标签)
1
|
|
# Tag the RabbitMQ user "openstack" as an administrator.
# Assumes the user already exists - this command does not create it.
rabbitmqctl set_user_tags openstack administrator
创建抽象基类 EndPoint.java(封装连接、通道和队列声明,供生产者和消费者继承)
1
|
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
package com.zhenqi; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * Created by wuming on 2017/7/16. */ public abstract class EndPoint { protected Channel channel; protected Connection connection; protected String endPointName; public EndPoint(String endpointName) throws Exception { this .endPointName = endpointName; //创建一个连接工厂 connection factory ConnectionFactory factory = new ConnectionFactory(); //设置rabbitmq-server服务IP地址 factory.setHost( "192.168.146.128" ); factory.setUsername( "openstack" ); factory.setPassword( "rabbitmq" ); factory.setPort( 5672 ); factory.setVirtualHost( "/" ); //得到 连接 connection = factory.newConnection(); //创建 channel实例 channel = connection.createChannel(); channel.queueDeclare(endpointName, false , false , false , null ); } /** * 关闭channel和connection。并非必须,因为隐含是自动调用的。 * @throws IOException */ public void close() throws Exception{ this .channel.close(); this .connection.close(); } } |
生产者Producer.java
生产者类的任务是向队列里写一条消息
1
|
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
package com.zhenqi; import org.apache.commons.lang.SerializationUtils; import java.io.Serializable; /** * Created by wuming on 2017/7/16. */ public class Producer extends EndPoint { public Producer(String endpointName) throws Exception { super (endpointName); } public void sendMessage(Serializable object) throws Exception { channel.basicPublish( "" ,endPointName, null , SerializationUtils.serialize(object)); } } |
消费者QueueConsumer.java
消费者可以以线程方式运行,对于不同的事件有不同的回调函数,其中最主要的是处理新消息到来的事件。
1
|
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
package com.zhenqi; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.ShutdownSignalException; import org.apache.commons.lang.SerializationUtils; import org.apache.log4j.Logger; import java.io.IOException; import java.util.HashMap; import java.util.Map; /** * Created by wuming on 2017/7/16. */ public class QueueConsumer extends EndPoint implements Runnable, Consumer { private Logger LOG=Logger.getLogger(QueueConsumer. class ); public QueueConsumer(String endpointName) throws Exception { super (endpointName); } public void handleConsumeOk(String s) { } public void handleCancelOk(String s) { } public void handleCancel(String s) throws IOException { } public void handleShutdownSignal(String s, ShutdownSignalException e) { } public void handleRecoverOk(String s) { LOG.info( "Consumer " +s + " registered" ); } public void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte [] bytes) throws IOException { Map map = (HashMap) SerializationUtils.deserialize(bytes); LOG.info( "Message Number " + map.get( "message number" ) + " received." ); } public void run() { try { channel.basicConsume(endPointName, true , this ); } catch (IOException e){ e.printStackTrace(); } } } |
测试
运行一个消费者线程,然后开始产生大量的消息,这些消息会被消费者取走
1
|
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
package com.zhenqi; import java.util.HashMap; /** * Created by wuming on 2017/7/16. */ public class TestRabbitmq { public static void main(String[] args){ try { QueueConsumer consumer = new QueueConsumer( "queue" ); Thread consumerThread = new Thread(consumer); consumerThread.start(); Producer producer = new Producer( "queue" ); for ( int i = 0 ; i < 100000 ; i++){ HashMap message = new HashMap(); message.put( "message number" , i); producer.sendMessage(message); System.out.println( "Message Number " + i + " sent." ); } } catch (Exception e){ e.printStackTrace(); } } } |
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:http://blog.csdn.net/coco2d_x2014/article/details/75213318?utm_source=tuicool&utm_medium=referral