服务器之家:专注于服务器技术及软件下载分享
分类导航

Mysql|Sql Server|Oracle|Redis|MongoDB|PostgreSQL|Sqlite|DB2|mariadb|Access|数据库技术|

服务器之家 - 数据库 - Redis - 基于Redis延迟队列的实现代码

基于Redis延迟队列的实现代码

2021-08-04 15:15程序员老郑 Redis

在生活中很多时候都会用到延迟队列,本文基于Redis延迟队列的实现代码,具有一定的参考价值,感兴趣的小伙伴们可以参考一下

使用场景

工作中大家往往会遇到类似的场景:

1.对于红包场景,账户 A 对账户 B 发出红包通常在 1 天后会自动归还到原账户。

2.对于实时支付场景,如果账户 A 对商户 S 付款 100 元,5秒后没有收到支付方回调将自动取消订单。

解决方案分析

方案一:

采用通过定时任务采用数据库/非关系型数据库轮询方案。

优点:

1. 实现简单,对于项目前期这样是最容易的解决方案。

缺点:

1. DB 有效使用率低,需要将一部分的数据库的QPS分配给 JOB 的无效轮询。

2. 服务资源浪费,因为轮询需要对所有的数据做一次 SCAN 扫描 JOB 服务的资源开销很大。

方案二:

采用延迟队列

优点:

1. 服务的资源使用率较高,能够精确的实现超时任务的执行。

2. 减少 DB 的查询次数,能够降低数据库的压力

缺点:

1. 对于延迟队列来说本身设计比较复杂,目前没有通用的比较好过的方案。

基于 Redis 的延迟队列实现

基于以上的分析,我决定通过 Redis 来实现分布式队列。

设计思路:

基于Redis延迟队列的实现代码

1. 第一步将需要发放的消息发送到延迟队列中。

2. 延迟队列将数据存入 Redis 的 ZSet 有序集合中score 为当前时间戳,member 存入需要发送的数据。

3. 添加一个 schedule 来进行对 Redis 有序队列的轮询。

4. 如果到达达到消息的执行时间,那么就进行业务的执行。

5. 如果没有达到消息的执行是将,那么消息等待下轮执行。

实现步骤:

由于本处篇幅有限,所以只列举部分代码,完整的代码可以在本文最后访问 GitHub 获取。由于本人阅历/水平有限,如有建议/或更正欢迎留言或提问。先在此谢谢大家驻足阅读

需要注意的问题:

单个 Redis 命令的执行是原子性的,但 Redis 没有在事务上增加任何维持原子性的机制,所以 Redis 事务的执行并不是原子性的。

事务可以理解为一个打包的批量执行脚本,但批量指令并非原子化的操作,中间某条指令的失败不会导致前面已做指令的回滚,也不会造成后续的指令不做。

我们可以通过 Redis 的 eval 命令来执行 lua 脚本来保证原子性实现Redis的事务。

实现步骤如下:

1. 延迟队列接口

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
 * 延迟队列
 *
 * @author zhengsh
 * @date 2020-03-27
 */
public interface RedisDelayQueue<E extends DelayMessage> {
 
    String META_TOPIC_WAIT = "delay:meta:topic:wait";
    String META_TOPIC_ACTIVE = "delay:meta:topic:active";
    String TOPIC_ACTIVE = "delay:active:9999";
    /**
     * 拉取消息
     */
    void poll();
 
    /**
     * 推送延迟消息
     *
     * @param e
     */
    void push(E e);
}

2. 延迟队列消息

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
 * 消息体
 *
 * @author zhengsh
 * @date 2020-03-27
 */
@Setter
@Getter
public class DelayMessage {
    /**
     * 消息唯一标识
     */
    private String id;
    /**
     * 消息主题
     */
    private String topic = "default";
    /**
     * 具体消息 json
     */
    private String body;
    /**
     * 延时时间, 格式为时间戳: 当前时间戳 + 实际延迟毫秒数
     */
    private Long delayTime = System.currentTimeMillis() + 30000L;
    /**
     * 消息发送时间
     */
    private LocalDateTime createTime;
}

3. 延迟队列实现

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
/**
 * 延迟队列实现
 *
 * @author zhengsh
 * @date 2020-03-27
 */
@Component
public class RedisDelayQueueImpl<E extends DelayMessage> implements RedisDelayQueue<E> {
    private Logger logger = LoggerFactory.getLogger(getClass());
 
    @Autowired
    private StringRedisTemplate redisTemplate;
 
    @Override
    public void poll() {
        // todo
    }
 
    /**
     * 发送消息
     *
     * @param e
     */
    @SneakyThrows
    @Override
    public void push(E e) {
        try {
            String jsonStr = JSON.toJSONString(e);
            String topic = e.getTopic();
            String zkey = String.format("delay:wait:%s", topic);
            String u =
                    "redis.call('sadd', KEYS[1], ARGV[1])\n" +
                            "redis.call('zadd', KEYS[2], ARGV[2], ARGV[3])\n" +
                            "return 1";
 
            Object[] keys = new Object[]{serialize(META_TOPIC_WAIT), serialize(zkey)};
            Object[] values = new Object[]{ serialize(zkey), serialize(String.valueOf(e.getDelayTime())),serialize(jsonStr)};
 
            Long result = redisTemplate.execute((RedisCallback<Long>) connection -> {
                Object nativeConnection = connection.getNativeConnection();
 
                if (nativeConnection instanceof RedisAsyncCommands) {
                    RedisAsyncCommands commands = (RedisAsyncCommands) nativeConnection;
                    return (Long) commands.getStatefulConnection().sync().eval(u, ScriptOutputType.INTEGER, keys, values);
                } else if (nativeConnection instanceof RedisAdvancedClusterAsyncCommands) {
                    RedisAdvancedClusterAsyncCommands commands = (RedisAdvancedClusterAsyncCommands) nativeConnection;
                    return (Long) commands.getStatefulConnection().sync().eval(u, ScriptOutputType.INTEGER, keys, values);
                }
                return 0L;
            });
            logger.info("延迟队列[1],消息推送成功进入等待队列({}), topic: {}", result != null && result > 0, e.getTopic());
        } catch (Throwable t) {
            t.printStackTrace();
        }
    }
 
    private byte[] serialize(String key) {
        RedisSerializer<String> stringRedisSerializer =
                (RedisSerializer<String>) redisTemplate.getKeySerializer();
        //lettuce连接包下序列化键值,否则无法用默认的ByteArrayCodec解析
        return stringRedisSerializer.serialize(key);
    }
}

4. 定时任务

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/**
 * 分发任务
 */
@Component
public class DistributeTask {
 
    private static final String LUA_SCRIPT;
    private Logger logger = LoggerFactory.getLogger(getClass());
    @Autowired
    private StringRedisTemplate redisTemplate;
 
    static {
        StringBuilder sb = new StringBuilder(128);
        sb.append("local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1], 'limit', 0, 1)\n");
        sb.append("if(next(val) ~= nil) then\n");
        sb.append("    redis.call('sadd', KEYS[2], ARGV[2])\n");
        sb.append("    redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)\n");
        sb.append("    for i = 1, #val, 100 do\n");
        sb.append("        redis.call('rpush', KEYS[3], unpack(val, i, math.min(i+99, #val)))\n");
        sb.append("    end\n");
        sb.append("    return 1\n");
        sb.append("end\n");
        sb.append("return 0");
        LUA_SCRIPT = sb.toString();
    }
 
    /**
     * 2秒钟扫描一次执行队列
     */
    @Scheduled(cron = "0/5 * * * * ?")
    public void scheduledTaskByCorn() {
        try {
            Set<String> members = redisTemplate.opsForSet().members(META_TOPIC_WAIT);
            assert members != null;
            for (String k : members) {
                if (!redisTemplate.hasKey(k)) {
                    // 如果 KEY 不存在元数据中删除
                    redisTemplate.opsForSet().remove(META_TOPIC_WAIT, k);
                    continue;
                }
 
                String lk = k.replace("delay:wait", "delay:active");
                Object[] keys = new Object[]{serialize(k), serialize(META_TOPIC_ACTIVE), serialize(lk)};
                Object[] values = new Object[]{serialize(String.valueOf(System.currentTimeMillis())), serialize(lk)};
                Long result = redisTemplate.execute((RedisCallback<Long>) connection -> {
                    Object nativeConnection = connection.getNativeConnection();
 
                    if (nativeConnection instanceof RedisAsyncCommands) {
                        RedisAsyncCommands commands = (RedisAsyncCommands) nativeConnection;
                        return (Long) commands.getStatefulConnection().sync().eval(LUA_SCRIPT, ScriptOutputType.INTEGER, keys, values);
                    } else if (nativeConnection instanceof RedisAdvancedClusterAsyncCommands) {
                        RedisAdvancedClusterAsyncCommands commands = (RedisAdvancedClusterAsyncCommands) nativeConnection;
                        return (Long) commands.getStatefulConnection().sync().eval(LUA_SCRIPT, ScriptOutputType.INTEGER, keys, values);
                    }
                    return 0L;
                });
                logger.info("延迟队列[2],消息到期进入执行队列({}): {}", result != null && result > 0, TOPIC_ACTIVE);
            }
        } catch (Throwable t) {
            t.printStackTrace();
        }
    }
 
    private byte[] serialize(String key) {
        RedisSerializer<String> stringRedisSerializer =
                (RedisSerializer<String>) redisTemplate.getKeySerializer();
        //lettuce连接包下序列化键值,否则无法用默认的ByteArrayCodec解析
        return stringRedisSerializer.serialize(key);
    }
}

GitHub 地址

https://github.com/zhengsh/redis-delay-queue

参考地址

1.https://www.runoob.com/redis/redis-transactions.html

到此这篇关于基于Redis延迟队列的实现代码的文章就介绍到这了,更多相关Redis 延迟队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

原文链接:https://www.cnblogs.com/cbread/p/12630945.html

延伸 · 阅读

精彩推荐
  • Redis使用Redis实现用户积分排行榜的教程

    使用Redis实现用户积分排行榜的教程

    这篇文章主要介绍了使用Redis实现用户积分排行榜的教程,包括一个用PHP脚本进行操作的例子,需要的朋友可以参考下 ...

    Redis教程网4572019-10-23
  • Redis简单实用!利用Redis轻松实现高并发全局ID生成器

    简单实用!利用Redis轻松实现高并发全局ID生成器

    Redis作为高性能的KV数据库,并且操作还是原子性的,所以用来做支持高并发的发号器十分合适。 本文给大家介绍3种常见的全局ID生成方式。 1、全局递增...

    未知802023-05-07
  • RedisRedis挂了,流量把数据库也打挂了,怎么办?

    Redis挂了,流量把数据库也打挂了,怎么办?

    Redis 挂了,不就是缓存都没了吗?缓存都没了,不就是缓存雪崩了吗?缓存雪崩了,不就导致数据库挂了吗?一提到“缓存雪崩”这四个字,缓存穿透、缓存击...

    why技术8672021-08-11
  • RedisRedis面试题常见问答

    Redis面试题常见问答

    通常,我们会使用缓存用于缓冲对 DB 的冲击,如果缓存宕机,所有请求将直接打在 DB,造成 DB 宕机——从而导致整个系统宕机。...

    民工哥技术之路3082020-07-18
  • RedisRedis如何实现分布式锁

    Redis如何实现分布式锁

    相信大家对锁已经不陌生了,本文主要介绍了Redis如何实现分布式锁,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参...

    公众号程序员学长11082021-09-17
  • RedisWindows下Redis的安装使用教程

    Windows下Redis的安装使用教程

    这篇文章主要以图文结合的方式为大家详细介绍了Windows下Redis的安装使用,Redis的出现,很大程度补偿了memcached这类key/value存储的不足,在部分场合可以对...

    CSDN3952019-10-29
  • Redis一次关于Redis内存诡异增长的排查过程实战记录

    一次关于Redis内存诡异增长的排查过程实战记录

    这篇文章主要给大家分享了一次关于Redis内存诡异增长的排查过程实战记录,文中通过示例代码介绍的非常详细,对大家学习或者使用Redis具有一定的参考学...

    付磊2962019-11-15
  • Redis浅谈Redis中的内存淘汰策略和过期键删除策略

    浅谈Redis中的内存淘汰策略和过期键删除策略

    本文主要介绍了浅谈Redis中的内存淘汰策略和过期键删除策略,文中通过示例代码介绍的非常详细,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...

    纪先生9002021-11-14