服务器之家:专注于服务器技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|JAVA教程|ASP教程|编程技术|正则表达式|C/C++|

服务器之家 - 编程语言 - JAVA教程 - java远程连接调用Rabbitmq的实例代码

java远程连接调用Rabbitmq的实例代码

2020-12-03 09:38甄齐才 JAVA教程

本篇文章主要介绍了java远程连接调用Rabbitmq的实例代码,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧

本文介绍了java远程连接调用Rabbitmq,分享给大家,希望此文章对各位有所帮助。

打开IDEA创建一个maven工程(Java就可以了)。

java远程连接调用Rabbitmq的实例代码

pom.xml文件如下

 
?
1
 
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>
 
 <groupId>com.zhenqi</groupId>
 <artifactId>rabbitmq-study</artifactId>
 <version>1.0-SNAPSHOT</version>
 <packaging>jar</packaging>
 
 <name>rabbitmq-study</name>
 <url>http://maven.apache.org</url>
 
 <properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
 </properties>
 
 <dependencies>
  <dependency>
   <groupId>junit</groupId>
   <artifactId>junit</artifactId>
   <version>4.12</version>
   <scope>test</scope>
  </dependency>
 
  <!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
  <dependency>
   <groupId>com.rabbitmq</groupId>
   <artifactId>amqp-client</artifactId>
   <version>4.1.0</version>
   <exclusions>
    <exclusion>
     <groupId>org.slf4j</groupId>
     <artifactId>slf4j-api</artifactId>
    </exclusion>
   </exclusions>
  </dependency>
 
  <dependency>
   <groupId>org.slf4j</groupId>
   <artifactId>slf4j-log4j12</artifactId>
   <version>1.7.21</version>
  </dependency>
 
  <dependency>
   <groupId>commons-lang</groupId>
   <artifactId>commons-lang</artifactId>
   <version>2.6</version>
  </dependency>
 
 </dependencies>
</project>

为了能远程访问rabbitmq,则需要编辑 /etc/rabbitmq/rabbitmq.conf,添加以下内容。

 
?
1
 
2
3
[
  {rabbit, [{tcp_listeners, [5672]}, {loopback_users, ["asdf"]}]}
 ]

添加administrator角色

 
?
1
 
rabbitmqctl set_user_tags openstack administrator

创建抽象队列 EndPoint.java

 
?
1
 
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.zhenqi;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
 
 
/**
 * Created by wuming on 2017/7/16.
 */
public abstract class EndPoint {
 
  protected Channel channel;
  protected Connection connection;
  protected String endPointName;
 
  public EndPoint(String endpointName) throws Exception {
 
    this.endPointName = endpointName;
 
    //创建一个连接工厂 connection factory
    ConnectionFactory factory = new ConnectionFactory();
 
    //设置rabbitmq-server服务IP地址
    factory.setHost("192.168.146.128");
    factory.setUsername("openstack");
    factory.setPassword("rabbitmq");
    factory.setPort(5672);
    factory.setVirtualHost("/");
 
    //得到 连接
    connection = factory.newConnection();
 
    //创建 channel实例
    channel = connection.createChannel();
 
    channel.queueDeclare(endpointName, false, false, false, null);
  }
 
  /**
   * 关闭channel和connection。并非必须,因为隐含是自动调用的。
   * @throws IOException
   */
  public void close() throws Exception{
    this.channel.close();
    this.connection.close();
  }
}

生产者Producer.java

生产者类的任务是向队列里写一条消息

 
?
1
 
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
package com.zhenqi;
 
import org.apache.commons.lang.SerializationUtils;
 
import java.io.Serializable;
 
/**
 * Created by wuming on 2017/7/16.
 */
public class Producer extends EndPoint {
 
  public Producer(String endpointName) throws Exception {
    super(endpointName);
  }
 
  public void sendMessage(Serializable object) throws Exception {
    channel.basicPublish("",endPointName, null, SerializationUtils.serialize(object));
  }
}

消费者QueueConsumer.java

消费者可以以线程方式运行,对于不同的事件有不同的回调函数,其中最主要的是处理新消息到来的事件。

 
?
1
 
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.zhenqi;
 
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import org.apache.commons.lang.SerializationUtils;
import org.apache.log4j.Logger;
 
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
 
/**
 * Created by wuming on 2017/7/16.
 */
public class QueueConsumer extends EndPoint implements Runnable, Consumer {
 
  private Logger LOG=Logger.getLogger(QueueConsumer.class);
 
  public QueueConsumer(String endpointName) throws Exception {
    super(endpointName);
  }
 
  public void handleConsumeOk(String s) {
 
  }
 
  public void handleCancelOk(String s) {
 
  }
 
  public void handleCancel(String s) throws IOException {
 
  }
 
  public void handleShutdownSignal(String s, ShutdownSignalException e) {
 
  }
 
  public void handleRecoverOk(String s) {
    LOG.info("Consumer "+s +" registered");
  }
 
  public void handleDelivery(String s, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
    Map map = (HashMap) SerializationUtils.deserialize(bytes);
    LOG.info("Message Number "+ map.get("message number") + " received.");
  }
 
  public void run() {
    try{
      channel.basicConsume(endPointName, true,this);
    }catch(IOException e){
      e.printStackTrace();
    }
  }
}

 测试

运行一个消费者线程,然后开始产生大量的消息,这些消息会被消费者取走

 
?
1
 
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.zhenqi;
 
import java.util.HashMap;
 
/**
 * Created by wuming on 2017/7/16.
 */
public class TestRabbitmq {
 
  public static void main(String[] args){
    try{
      QueueConsumer consumer = new QueueConsumer("queue");
      Thread consumerThread = new Thread(consumer);
      consumerThread.start();
 
      Producer producer = new Producer("queue");
 
      for (int i = 0; i < 100000; i++){
        HashMap message = new HashMap();
        message.put("message number", i);
        producer.sendMessage(message);
        System.out.println("Message Number "+ i +" sent.");
      }
    }catch(Exception e){
      e.printStackTrace();
    }
 
  }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。

原文链接:http://blog.csdn.net/coco2d_x2014/article/details/75213318?utm_source=tuicool&utm_medium=referral

延伸 · 阅读

精彩推荐
  • JAVA教程老生常谈Eclipse中的BuildPath(必看篇)

    老生常谈Eclipse中的BuildPath(必看篇)

    下面小编就为大家带来一篇老生常谈Eclipse中的BuildPath(必看篇)。小编觉得挺不错的,现在就分享给大家,也给大家做个参考。一起跟随小编过来看看吧...

    Java教程网3532020-11-10
  • JAVA教程MyBatis 执行动态 SQL语句详解

    MyBatis 执行动态 SQL语句详解

    大家对mybatis执行任意sql语句都了解,那么MyBatis执行动态SQL语句呢?下面脚本之家小编给大家解答下mybatis执行动态sql语句的方法,非常不错,感兴趣的朋友...

    isea5334042020-06-09
  • JAVA教程总结Java调用Python程序方法

    总结Java调用Python程序方法

    这篇文章主要介绍了总结Java调用Python程序方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随...

    IT_xiao_bai5372020-08-05
  • JAVA教程拉钩网java笔试题分享

    拉钩网java笔试题分享

    这篇文章主要介绍了拉钩网java笔试题分享,下面是题目和实现示例,需要的朋友可以参考下 ...

    Java教程网2082019-11-22
  • JAVA教程详解Spring Data JPA动态条件查询的写法

    详解Spring Data JPA动态条件查询的写法

    本篇文章主要介绍了Spring Data JPA动态条件查询的写法 ,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧...

    Derry.19904102020-11-25
  • JAVA教程java安全编码指南之:Number操作详解

    java安全编码指南之:Number操作详解

    这篇文章主要介绍了java安全编码指南之:Number操作详解,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...

    flydean程序那些事1972020-09-15
  • JAVA教程Java中的length和length()深入分析

    Java中的length和length()深入分析

    java中的length属性是针对数组说的,比如说你声明了一个数组,想知道这个数组的长度则用到了length这个属性。java中的length()方法是针对字符串String说的,如果想...

    daisy3812020-07-05
  • JAVA教程Java变量和对象的作用域

    Java变量和对象的作用域

    本文主要介绍了Java变量和对象的作用域的相关知识。具有很好的参考价值,下面跟着小编一起来看下吧...

    AlanLee(Java)2542020-09-02