服务器之家:专注于服务器技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - Java教程 - Springboot集成RabbitMQ死信队列的实现

Springboot集成RabbitMQ死信队列的实现

2022-01-05 11:04小伙子你那什么车啊 Java教程

在大多数的MQ中间件中,都有死信队列的概念。本文主要介绍了Springboot集成RabbitMQ死信队列的实现,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

 

关于死信队列

在大多数的MQ中间件中,都有死信队列的概念。死信队列同其他的队列一样都是普通的队列。在RabbitMQ中并没有特定的“死信队列”类型,而是通过配置,将其实现。
当我们在创建一个业务的交换机和队列的时候,可以配置参数,指明另一个队列为当前队列的死信队列,在RabbitMQ中,死信队列(严格的说应该是死信交换机)被称为DLX Exchange。当消息“死掉”后,会被自动路由到DLX Exchange的queue中。

 

什么样的消息会进入死信队列?

1.消息的TTL过期。
2.消费者对broker应答Nack,并且消息禁止重回队列。
3.Queue队列长度已达上限。

 

场景分析

以用户订单支付为场景。在各大电商平台上,订单的都有待支付时间,通常为30min。当用户超过30min未支付订单,该订单的状态应该会变成“超时取消”,或类似的状态值的改变。
如果不使用MQ,可以设计一个定时任务,定时查询数据库,判断订单的状态和支付时间是否已经到期,若到期则修改订单的状态。但显然,这不是一个很好的操作,频繁访问数据库,造成不必要的资源浪费。
使用MQ,我们可以在下单的时候,当订单数据入库后,发送一条Message到Queue中,并设置过期时间为30min或自定义的支付过期时间。

   /**
     * 发送带有过期时间的消息
     */
    @GetMapping("/sendDlx")
    public void sendDlx() {
        Order order = new Order();
        order.setItemId(1);
        order.setStatus(1);
        rabbitTemplate.convertAndSend(orderExchange, orderRoutingKey, 
                JSON.toJSONString(order), message -> {
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            // 模拟,设置10S后消息过期
            message.getMessageProperties().setExpiration("10000");
            return message;
        });
    }

若30min后,还未有消费者(下游服务)消费这条消息,那么该条消息就会被路由到死信队列中。我们可以设置一个监听去监听死信队列,当收到死信队列的消息后,则根据消息数据,查询数据库订单状态是否还是待支付状态,若是,则修改成超时取消。

 

代码实现

以下是demo,未做服务的拆分,因此整个流程都是单个服务实现的,所以就没有下游服务,但并不影响整体业务。

RabbitMQConfig

将需要的交换机,队列,绑定都声明成SpringBean。Spring会自动创建这些到RabbitMQ服务中。
@Value注解部分都是配置文件exchange、queue、routingKey的名称。

/**
 * @author wulei
 */
@Configuration
public class RabbitConfig {

    @Value("${sunspring.order.exchange}")
    private String orderExchange;

    @Value("${sunspring.order.queue}")
    private String orderQueue;

    @Value("${sunspring.order.routingKey}")
    private String orderRoutingKey;

    @Value("${sunspring.dlx.exchange}")
    private String dlxExchange;

    @Value("${sunspring.dlx.queue}")
    private String dlxQueue;

    @Value("${sunspring.dlx.routingKey}")
    private String dlxRoutingKey;

    /**
     * 声明死信队列
     * @return DirectExchange
     */
    @Bean
    public DirectExchange dlxExchange() {
        return new DirectExchange(dlxExchange);
    }

    /**
     * 声明死信队列
     * @return Queue
     */
    @Bean
    public Queue dlxQueue() {
        return new Queue(dlxQueue);
    }

    /**
     * 绑定死信队列到死信交换机
     * @return Binding
     */
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(dlxQueue())
                .to(dlxExchange())
                .with(dlxRoutingKey);
    }

    /**
     * 声明订单业务交换机
     * @return DirectExchange
     */
    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange(orderExchange);
    }

    /**
     * 声明订单业务队列
     * @return Queue
     */
    @Bean
    public Queue orderQueue() {
        Map<String,Object> arguments = new HashMap<>(2);
        // 绑定该队列到私信交换机
        arguments.put("x-dead-letter-exchange",dlxExchange);
        arguments.put("x-dead-letter-routing-key",dlxRoutingKey);
        return new Queue(orderQueue,true,false,false,arguments);
    }

    /**
     * 绑定订单队列到订单交换机
     * @return Binding
     */
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue())
                .to(orderExchange())
                .with(orderRoutingKey);

    }
}
sunspring.order.exchange=sunspring_order_exchange
sunspring.order.queue=sunspring_order_queue
sunspring.order.routingKey=sunspring.order

sunspring.dlx.exchange=sunspring_dlx_exchange
sunspring.dlx.queue=sunspring.dlx.queue
sunspring.dlx.routingKey=dlx

在声明业务队列时,创建了一个Map,并且put了两个值,这两个值就是死信队列的声明。
x-dead-letter-exchange:死信交换机的名称
x-dead-letter-routing-key:死信交换机的路由键,因为demo中两个交换机的类型都是direct的,因此路由键必须相同。

/**
     * 声明订单业务队列
     * @return Queue
     */
    @Bean
    public Queue orderQueue() {
        Map<String,Object> arguments = new HashMap<>(2);
        // 绑定该队列到私信交换机
        arguments.put("x-dead-letter-exchange",dlxExchange);
        arguments.put("x-dead-letter-routing-key",dlxRoutingKey);
        return new Queue(orderQueue,true,false,false,arguments);
    }

监控页面

在exchange列表中有刚刚创建的业务交换机sunspring_order_exchange和死信交换机
sunspring_dlx_exchange

Springboot集成RabbitMQ死信队列的实现

在Queue列表中,有死信队列sunspring_dlx_queue和业务队列sunspring_order_queue
并且业务队列上有DLX标记,可见当前队列已经绑定了一个死信队列。DLK表示的路由键。

Springboot集成RabbitMQ死信队列的实现

 

场景模拟

 

生产者

生产者发送了一个过期时间为10S的消息。
message.getMessageProperties().setExpiration(“10000”);

/**
     * 发送带有过期时间的消息
     */
    @GetMapping("/sendDlx")
    public void sendDlx() {
        Order order = new Order();
        order.setItemId(1);
        order.setStatus(1);
        rabbitTemplate.convertAndSend(orderExchange, orderRoutingKey,
                JSON.toJSONString(order), message -> {
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
            message.getMessageProperties().setExpiration("10000");
            return message;
        });
    }

sunspring_order_queue接受到了一条消息,当前消息的状态是ready的,表示没有任何消费者消费这条消息。

Springboot集成RabbitMQ死信队列的实现

10s后,当前消息路由到了死信队列中,sunspring_order_queue消息数量变成0,sunspring_dlx_queue数量变成1。

Springboot集成RabbitMQ死信队列的实现

消费者,设置死信队列监听

通过设置对死信队列的监听,可以发现,在Springboot启动之后,创建了对RabbitMQ的监听,死信队列的消息也立刻被消费了。

因此,我们可以监听死信队列,对未被消费的消息进行下一步操作。如场景分析中的更改订单状态。

   @RabbitListener(queues = "sunspring.dlx.queue")
    public void dlxListener(Message message,Channel channel) throws IOException {
        System.out.println(new String(message.getBody()));

        //对消息进行业务处理....
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }

2019-08-20 20:05:05.158 INFO 4420 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [120.27.243.91:5672]
2019-08-20 20:05:05.224 INFO 4420 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#68ab0936:0/SimpleConnection@74606204 [delegate=amqp://guest@120.27.243.91:5672/, localPort= 13563]
{"itemId":1,"status":1}

到此这篇关于Springboot集成RabbitMQ死信队列的实现的文章就介绍到这了,更多相关Springboot RabbitMQ死信队列内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://blog.csdn.net/shishishi777/article/details/99879419

延伸 · 阅读

精彩推荐
  • Java教程升级IDEA后Lombok不能使用的解决方法

    升级IDEA后Lombok不能使用的解决方法

    最近看到提示IDEA提示升级,寻思已经有好久没有升过级了。升级完毕重启之后,突然发现好多错误,本文就来介绍一下如何解决,感兴趣的可以了解一下...

    程序猿DD9332021-10-08
  • Java教程Java BufferWriter写文件写不进去或缺失数据的解决

    Java BufferWriter写文件写不进去或缺失数据的解决

    这篇文章主要介绍了Java BufferWriter写文件写不进去或缺失数据的解决方案,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望...

    spcoder14552021-10-18
  • Java教程Java使用SAX解析xml的示例

    Java使用SAX解析xml的示例

    这篇文章主要介绍了Java使用SAX解析xml的示例,帮助大家更好的理解和学习使用Java,感兴趣的朋友可以了解下...

    大行者10067412021-08-30
  • Java教程20个非常实用的Java程序代码片段

    20个非常实用的Java程序代码片段

    这篇文章主要为大家分享了20个非常实用的Java程序片段,对java开发项目有所帮助,感兴趣的小伙伴们可以参考一下 ...

    lijiao5352020-04-06
  • Java教程Java实现抢红包功能

    Java实现抢红包功能

    这篇文章主要为大家详细介绍了Java实现抢红包功能,采用多线程模拟多人同时抢红包,文中示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙...

    littleschemer13532021-05-16
  • Java教程Java8中Stream使用的一个注意事项

    Java8中Stream使用的一个注意事项

    最近在工作中发现了对于集合操作转换的神器,java8新特性 stream,但在使用中遇到了一个非常重要的注意点,所以这篇文章主要给大家介绍了关于Java8中S...

    阿杜7482021-02-04
  • Java教程小米推送Java代码

    小米推送Java代码

    今天小编就为大家分享一篇关于小米推送Java代码,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一起跟随小编来看看吧...

    富贵稳中求8032021-07-12
  • Java教程xml与Java对象的转换详解

    xml与Java对象的转换详解

    这篇文章主要介绍了xml与Java对象的转换详解的相关资料,需要的朋友可以参考下...

    Java教程网2942020-09-17