服务器之家:专注于服务器技术及软件下载分享
分类导航

Mysql|Sql Server|Oracle|Redis|MongoDB|PostgreSQL|Sqlite|DB2|mariadb|Access|数据库技术|

服务器之家 - 数据库 - Redis - 基于Redis实现阻塞队列的方式

基于Redis实现阻塞队列的方式

2022-01-25 17:35WeJan's Blog Redis

本文主要讲解基于 Redis 的方式实现异步队列,基于 Redis 的 list 实现队列的方式也有多种,本文通过实例代码给大家介绍的非常详细,感兴趣的朋友跟随小编一起看看吧

日常需求开发过程中,不免会遇到需要通过代码进行异步处理的情况,比如批量发送邮件,批量发送短信,数据导入,为了减少用户的等待,不希望一直菊花转啊转,因此需要进行异步处理,做法就是讲要处理的数据添加到队列当中,然后按照排队的先后顺序进行异步处理。

这个队列,可以是专业的消息队列,如 RocketMQ/RabbitMQ 等,一般项目中,如果只是为了进行异步,未免有点杀鸡用牛刀的意味。
也可以使用基于 JVM 内存实现队列,但是如果项目进行了重启,就会造成队列数据丢失。
大部分的项目都会用到 Redis 中间件作为缓存使用,此时使用 Redis 的 list 结构来实现队列则是非常合适的选择。

因此,本文主要讲解基于 Redis 的方式实现异步队列。

本文首发个人技术博客: https://nullpointer.pw/redis-block-queue.html

基于 Redis 的 list 实现队列的方式也有多种,先说第一种不推荐的方式,即使用LPUSH生产消息,然后 while(true) 中通过RPOP消费消息,这种方式的确可以实现,但是不断代码不断的轮询,势必会消耗一些系统的资源。

第二种方式也是不推荐的方式,也是通过 LPUSH生产消息,然后通过 BRPOP 进行阻塞地等待并消费消息,这种方式较第一种方式减少了无用的轮询,降低系统资源的消耗,但是可能会存在队列消息丢失的情况,如果取出了消息然后处理失败,这个被取出的消息就将丢失。

第二种方式就是下文要介绍的方式,首先也是通过 LPUSH 生产消息,然后通过 BRPOPLPUSH阻塞地等待 list 新消息到来,有了新消息才开始消费,同时将消息备份到另外一个 list 当中,这种方式具备了第二种方式的优点,即减少了无用的轮询,同时也对消息进行了备份不会丢失数据,如果处理成功,可以通过 LREM 对备份的 list 中当前的这条消息进行删除处理。这种方式实现方式可以参考 模式: 安全的队列 .

Redis 基础

?
1
2
3
4
5
6
7
8
# 将一个或多个值 value 插入到列表 key 的表头
LPUSH key value [value …]
 
# 阻塞式等待,将列表 source 中的最后一个元素 (尾元素) 弹出,并返回给客户端。将 source 弹出的元素插入到列表 destination ,作为 destination 列表的的头元素。超时参数 timeout 接受一个以秒为单位的数字作为值。超时参数设为 0 表示阻塞时间可以无限期延长 (block indefinitely) 。
BRPOPLPUSH source destination timeout
 
# 根据参数 count 的值,移除列表中与参数 value 相等的元素。
LREM key count value

代码实现队列消息生产者

笔者使用的是 Spring 相关 API 实现对 Redis 指令的调用。首先实现消息的生产代码,封装到一个工具类方法当中。这里很简单,就是调用了 lpush 方法,将序列化的 key 和 value 添加到列表当中去。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Resource
private RedisConnectionFactory connectionFactory;
 
public void lPush(@Nonnull String key, @Nonnull String value) {
  RedisConnection connection = RedisConnectionUtils.getConnection(connectionFactory);
  try {
    byte[] byteKey = RedisSerializer.string().serialize(getKey(key));
    byte[] byteValue = RedisSerializer.string().serialize(value);
    assert byteKey != null;
    connection.lPush(byteKey, byteValue);
  } finally {
    RedisConnectionUtils.releaseConnection(connection, connectionFactory);
  }
}

代码实现队列消息消费者

因为实现队列消费消息的代码比较多,不可能每个需要阻塞消费的地方,对需要写这一坨代码,因此使用 Java8 的函数式接口实现方法的传递,同时阻塞式获取消息代码使用新线程去执行。

有人看到以下代码要吐槽了,不是说不用 while(true) 吗,怎么你这里面还是有,这里稍微解释一下,因为 SpringBoot 一般会指定 timeout 的全局超时时间,即使 BRPOPLPUSH 设置了 0,即无限期,当超出了 timeout 设置的值时,就会抛出 QueryTimeoutException 异常导致线程退出,因此添加了 try/catch 对异常进行捕获并忽略,同时使用 while(true) 保证线程可以继续执行。
代码中记录了当前消息处理结果,如果处理结果为成功,需要对备份队列的当前消息进行删除。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public void bRPopLPush(@Nonnull String key, Consumer<String> consumer) {
  CompletableFuture.runAsync(() -> {
    RedisConnection connection = RedisConnectionUtils.getConnection(connectionFactory);
    try {
      byte[] srcKey = RedisSerializer.string().serialize(getKey(key));
      byte[] dstKey = RedisSerializer.string().serialize(getBackupKey(key));
      assert srcKey != null;
      assert dstKey != null;
      while (true) {
        byte[] byteValue = new byte[0];
        boolean success = false;
        try {
          byteValue = connection.bRPopLPush(0, srcKey, dstKey);
          if (byteValue != null && byteValue.length != 0) {
            consumer.accept(new String(byteValue));
            success = true;
          }
        } catch (Exception ignored) {
          // 防止获取 key 达到超时时间抛出 QueryTimeoutException 异常退出
        } finally {
          if (success) {
            // 处理成功才删除备份队列的 key
            connection.lRem(dstKey, 1, byteValue);
          }
        }
      }
    } finally {
      RedisConnectionUtils.releaseConnection(connection, connectionFactory);
    }
  });
}

测试代码

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Test
public void testLPush() throws InterruptedException {
  String queueA = "queueA";
  int i = 0;
  while (true) {
    String msg = "Hello-" + i++;
    redisBlockQueue.lPush(queueA, msg);
    System.out.println("lPush: " + msg);
    Thread.sleep(3000);
  }
}
 
@Test
public void testBRPopLPush() {
  String queueA = "queueA";
  redisBlockQueue.bRPopLPush(queueA, (val) -> {
    // 在这里处理具体的业务逻辑
    System.out.println("val: " + val);
  });
 
  // 防止 Junit 进程退出
  LockSupport.park();
}

项目使用方式

为了方便使用,我将其抽取为了一个工具类,使用时通过 Spring 注入使用即可,
队列消费可以使用如下方式在项目启动的时候就进行阻塞监听队列,等待消费

?
1
2
3
4
5
6
7
8
9
@Resource
private RedisBlockQueue redisBlockQueue;
 
@PostConstruct
public void init() {
   redisBlockQueue.bRPopLPush(xx, (value) -> {
     //...
   });
}

本文完整代码下载github 地址

到此这篇关于基于Redis实现阻塞队列的文章就介绍到这了,更多相关Redis阻塞队列内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!

原文链接:https://www.cnblogs.com/vcmq/p/13509269.html

延伸 · 阅读

精彩推荐
  • Redis关于Redis数据库入门详细介绍

    关于Redis数据库入门详细介绍

    大家好,本篇文章主要讲的是关于Redis数据库入门详细介绍,感兴趣的同学赶快来看一看吧,对你有帮助的话记得收藏一下,方便下次浏览...

    沃尔码6982022-01-24
  • Redisredis缓存存储Session原理机制

    redis缓存存储Session原理机制

    这篇文章主要为大家介绍了redis缓存存储Session原理机制详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪...

    程序媛张小妍9252021-11-25
  • RedisRedis 6.X Cluster 集群搭建

    Redis 6.X Cluster 集群搭建

    码哥带大家完成在 CentOS 7 中安装 Redis 6.x 教程。在学习 Redis Cluster 集群之前,我们需要先搭建一套集群环境。机器有限,实现目标是一台机器上搭建 6 个节...

    码哥字节15752021-04-07
  • Redis详解三分钟快速搭建分布式高可用的Redis集群

    详解三分钟快速搭建分布式高可用的Redis集群

    这篇文章主要介绍了详解三分钟快速搭建分布式高可用的Redis集群,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,...

    万猫学社4502021-07-25
  • Redis如何使用Redis锁处理并发问题详解

    如何使用Redis锁处理并发问题详解

    这篇文章主要给大家介绍了关于如何使用Redis锁处理并发问题的相关资料,文中通过示例代码介绍的非常详细,对大家学习或者使用Redis具有一定的参考学习...

    haofly4522019-11-26
  • Redis《面试八股文》之 Redis十六卷

    《面试八股文》之 Redis十六卷

    redis 作为我们最常用的内存数据库,很多地方你都能够发现它的身影,比如说登录信息的存储,分布式锁的使用,其经常被我们当做缓存去使用。...

    moon聊技术8182021-07-26
  • RedisRedis Template实现分布式锁的实例代码

    Redis Template实现分布式锁的实例代码

    这篇文章主要介绍了Redis Template实现分布式锁,需要的朋友可以参考下 ...

    晴天小哥哥2592019-11-18
  • RedisRedis集群的5种使用方式,各自优缺点分析

    Redis集群的5种使用方式,各自优缺点分析

    Redis 多副本,采用主从(replication)部署结构,相较于单副本而言最大的特点就是主从实例间数据实时同步,并且提供数据持久化和备份策略。...

    优知学院4082021-08-10