服务器之家:专注于服务器技术及软件下载分享
分类导航

Mysql|Sql Server|Oracle|Redis|MongoDB|PostgreSQL|Sqlite|DB2|mariadb|Access|数据库技术|

服务器之家 - 数据库 - Redis - Redis实现分布式锁和等待序列的方法示例

Redis实现分布式锁和等待序列的方法示例

2019-11-25 15:14小小小LIN子 Redis

这篇文章主要介绍了Redis实现分布式锁和等待序列的方法示例,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧

在集群下,经常会因为同时处理发生资源争抢和并发问题,但是我们都知道同步锁 synchronized 、 cas 、 ReentrankLock 这些锁的作用范围都是 JVM ,说白了在集群下没啥用。这时我们就需要能在多台 JVM 之间决定执行顺序的锁了,现在分布式锁主要有 redis 、 Zookeeper 实现的,还有数据库的方式,不过性能太差,也就是需要一个第三方的监管。

背景

最近在做一个消费 Kafka 消息的时候发现,由于线上的消费者过多,经常会遇到,多个机器同时处理一个主键类型的数据的情况发生,如果最后是执行更新操作的话,也就是一个更新顺序的问题,但是如果恰好都需要插入数据的时候,会出现主键重复的问题。这是生产上不被允许的(因为公司有异常监管的机制,扣分啥的),这是就需要个分布式锁了,斟酌后用了 Redis 的实现方式(因为网上例子多)

分析

redis 实现的分布式锁,实现原理是 set 方法,因为多个线程同时请求的时候,只有一个线程可以成功并返回结果,还可以设置有效期,来避免死锁的发生,一切都是这么的完美,不过有个问题,在 set 的时候,会直接返回结果,成功或者失败,不具有阻塞效果,需要我们自己对失败的线程进程处理,有两种方式

  • 丢弃
  • 等待重试 由于我们的系统需要这些数据,那么只能重新尝试获取。这里使用 redis 的 List 类型实现等待序列的作用

代码

直接上代码 其实直接redis的工具类就可以解决了

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package com.test
import redis.clients.jedis.Jedis;
 
import java.util.Collections;
import java.util.List;
 
/**
 * @desc redis队列实现方式
 * @anthor
 * @date
 **/
public class RedisUcUitl {
 
  private static final String LOCK_SUCCESS = "OK";
  private static final String SET_IF_NOT_EXIST = "NX";
  private static final String SET_WITH_EXPIRE_TIME = "PX";
 
  private static final Long RELEASE_SUCCESS = 1L;
 
  private RedisUcUitl() {
 
  }
  /**
   * logger
   **/
 
  /**
   * 存储redis队列顺序存储 在队列首部存入
   *
   * @param key  字节类型
   * @param value 字节类型
   */
  public static Long lpush(Jedis jedis, final byte[] key, final byte[] value) {
 
    return jedis.lpush(key, value);
  
  }
 
  /**
   * 移除列表中最后一个元素 并将改元素添加入另一个列表中 ,当列表为空时 将阻塞连接 直到等待超时
   *
   * @param srckey
   * @param dstkey
   * @param timeout 0 表示永不超时
   * @return
   */
  public static byte[] brpoplpush(Jedis jedis,final byte[] srckey, final byte[] dstkey, final int timeout) {
 
    return jedis.brpoplpush(srckey, dstkey, timeout);
 
  }
 
  /**
   * 返回制定的key,起始位置的redis数据
   * @param redisKey
   * @param start
   * @param end -1 表示到最后
   * @return
   */
  public static List<byte[]> lrange(Jedis jedis,final byte[] redisKey, final long start, final long end) {
    
    return jedis.lrange(redisKey, start, end);
  }
 
  /**
   * 删除key
   * @param redisKey
   */
  public static void delete(Jedis jedis, final byte[] redisKey) {
    
     return jedis.del(redisKey);
  }
 
  /**
   * 尝试加锁
   * @param lockKey key名称
   * @param requestId 身份标识
   * @param expireTime 过期时间
   * @return
   */
  public static boolean tryGetDistributedLock(Jedis jedis,final String lockKey, final String requestId, final int expireTime) {
    String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
    return LOCK_SUCCESS.equals(result);
 
  }
 
  /**
   * 释放锁
   * @param lockKey key名称
   * @param requestId 身份标识
   * @return
   */
  public static boolean releaseDistributedLock(Jedis jedis,final String lockKey, final String requestId) {
    final String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
    jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
 
    return RELEASE_SUCCESS.equals(result);
 
  }
}

业务逻辑主要代码如下

1.先消耗队列中的

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
while(true){
  // 消费队列
  try{
    // 被放入redis队列的数据 序列化后的
    byte[] bytes = RedisUcUitl.brpoplpush(keyStr.getBytes(UTF_8), dstKeyStr.getBytes(UTF_8), 1);
    if(bytes == null || bytes.isEmpty()){
      // 队列中没数据时退出
      break;
    }
    // 反序列化对象
    Map<String, Object> singleMap = (Map<String, Object>) ObjectSerialUtil.bytesToObject(bytes);
    // 塞入唯一的值 防止被其他线程误解锁
    String requestId = UUID.randomUUID().toString();
    boolean lockGetFlag = RedisUcUitl.tryGetDistributedLock(keyStr,requestId, 100);
    if(lockGetFlag){
      // 成功获取锁 进行业务处理
      //TODO
      // 处理完毕释放锁
      boolean freeLock = RedisUcUitl.releaseDistributedLock(keyStr, requestId);
 
    }else{
      // 未能获得锁放入等待队列
     RedisUcUitl.lpush(keyStr.getBytes(UTF_8), ObjectSerialUtil.objectToBytes(param));
  
    }
    
  }catch(Exception e){
    break;
  }
  
}

2.处理最新接到的数据

同样是走尝试获取锁,获取不到放入队列的流程

一般序列化用 fastJson 之列的就可以了,这里用的是 JDK 自带的,工具类如下

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class ObjectSerialUtil {
 
  private ObjectSerialUtil() {
//    工具类
  }
 
  /**
   * 将Object对象序列化为byte[]
   *
   * @param obj 对象
   * @return byte数组
   * @throws Exception
   */
  public static byte[] objectToBytes(Object obj) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    ObjectOutputStream oos = new ObjectOutputStream(bos);
    oos.writeObject(obj);
    byte[] bytes = bos.toByteArray();
    bos.close();
    oos.close();
    return bytes;
  }
 
 
  /**
   * 将bytes数组还原为对象
   *
   * @param bytes
   * @return
   * @throws Exception
   */
  public static Object bytesToObject(byte[] bytes) {
    try {
      ByteArrayInputStream bin = new ByteArrayInputStream(bytes);
      ObjectInputStream ois = new ObjectInputStream(bin);
      return ois.readObject();
    } catch (Exception e) {
      throw new BaseException("反序列化出错!", e);
    }
  }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。

原文链接:https://juejin.im/post/5d077d04f265da1b5f265661

延伸 · 阅读

精彩推荐
  • Redis《面试八股文》之 Redis十六卷

    《面试八股文》之 Redis十六卷

    redis 作为我们最常用的内存数据库,很多地方你都能够发现它的身影,比如说登录信息的存储,分布式锁的使用,其经常被我们当做缓存去使用。...

    moon聊技术8182021-07-26
  • Redisredis缓存存储Session原理机制

    redis缓存存储Session原理机制

    这篇文章主要为大家介绍了redis缓存存储Session原理机制详解,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪...

    程序媛张小妍9252021-11-25
  • RedisRedis Template实现分布式锁的实例代码

    Redis Template实现分布式锁的实例代码

    这篇文章主要介绍了Redis Template实现分布式锁,需要的朋友可以参考下 ...

    晴天小哥哥2592019-11-18
  • Redis关于Redis数据库入门详细介绍

    关于Redis数据库入门详细介绍

    大家好,本篇文章主要讲的是关于Redis数据库入门详细介绍,感兴趣的同学赶快来看一看吧,对你有帮助的话记得收藏一下,方便下次浏览...

    沃尔码6982022-01-24
  • RedisRedis 6.X Cluster 集群搭建

    Redis 6.X Cluster 集群搭建

    码哥带大家完成在 CentOS 7 中安装 Redis 6.x 教程。在学习 Redis Cluster 集群之前,我们需要先搭建一套集群环境。机器有限,实现目标是一台机器上搭建 6 个节...

    码哥字节15752021-04-07
  • RedisRedis集群的5种使用方式,各自优缺点分析

    Redis集群的5种使用方式,各自优缺点分析

    Redis 多副本,采用主从(replication)部署结构,相较于单副本而言最大的特点就是主从实例间数据实时同步,并且提供数据持久化和备份策略。...

    优知学院4082021-08-10
  • Redis如何使用Redis锁处理并发问题详解

    如何使用Redis锁处理并发问题详解

    这篇文章主要给大家介绍了关于如何使用Redis锁处理并发问题的相关资料,文中通过示例代码介绍的非常详细,对大家学习或者使用Redis具有一定的参考学习...

    haofly4522019-11-26
  • Redis详解三分钟快速搭建分布式高可用的Redis集群

    详解三分钟快速搭建分布式高可用的Redis集群

    这篇文章主要介绍了详解三分钟快速搭建分布式高可用的Redis集群,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,...

    万猫学社4502021-07-25