服务器之家:专注于服务器技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|JAVA教程|ASP教程|

服务器之家 - 编程语言 - JAVA教程 - 详解Java如何实现基于Redis的分布式锁

详解Java如何实现基于Redis的分布式锁

2020-06-08 12:03daisy JAVA教程

在不同进程需要互斥地访问共享资源时,分布式锁是一种非常有用的技术手段。这篇文章运用图文和实例代码介绍了Java如何实现基于Redis的分布式锁,文章介绍的很详细,对Java和Redis刚兴趣的朋友们可以参考借鉴,下面来一起看看

前言

单JVM内同步好办, 直接用JDK提供的锁就可以了,但是跨进程同步靠这个肯定是不可能的,这种情况下肯定要借助第三方,我这里实现用Redis,当然还有很多其他的实现方式。其实基于Redis实现的原理还算比较简单的,在看代码之前建议大家先去看看原理,看懂了之后看代码应该就容易理解了。

我这里不实现JDK的java.util.concurrent.locks.Lock接口,而是自定义一个,因为JDK的有个newCondition方法我这里暂时没实现。这个Lock提供了5个lock方法的变体,可以自行选择使用哪一个来获取锁,我的想法是最好用带超时返回的那几个方法,因为不这样的话,假如redis挂了,线程永远都在那死循环了(关于这里,应该还可以进一步优化,如果redis挂了,Jedis的操作肯定会抛异常之类的,可以定义个机制让redis挂了的时候通知使用这个lock的用户,或者说是线程)

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package cc.lixiaohui.lock;
 
import java.util.concurrent.TimeUnit;
 
public interface Lock {
 
 /**
 * 阻塞性的获取锁, 不响应中断
 */
 void lock;
 
 /**
 * 阻塞性的获取锁, 响应中断
 *
 * @throws InterruptedException
 */
 void lockInterruptibly throws InterruptedException;
 
 /**
 * 尝试获取锁, 获取不到立即返回, 不阻塞
 */
 boolean tryLock;
 
 /**
 * 超时自动返回的阻塞性的获取锁, 不响应中断
 *
 * @param time
 * @param unit
 * @return {@code true} 若成功获取到锁, {@code false} 若在指定时间内未���取到锁
  *
 */
 boolean tryLock(long time, TimeUnit unit);
 
 /**
 * 超时自动返回的阻塞性的获取锁, 响应中断
 *
 * @param time
 * @param unit
 * @return {@code true} 若成功获取到锁, {@code false} 若在指定时间内未获取到锁
 * @throws InterruptedException 在尝试获取锁的当前线程被中断
 */
 boolean tryLockInterruptibly(long time, TimeUnit unit) throws InterruptedException;
 
 /**
 * 释放锁
 */
 void unlock;
 
}

看其抽象实现:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package cc.lixiaohui.lock;
 
import java.util.concurrent.TimeUnit;
 
/**
 * 锁的骨架实现, 真正的获取锁的步骤由子类去实现.
 *
 * @author lixiaohui
 *
 */
public abstract class AbstractLock implements Lock {
 
 /**
 * <pre>
 * 这里需不需要保证可见性值得讨论, 因为是分布式的锁,
 * 1.同一个jvm的多个线程使用不同的锁对象其实也是可以的, 这种情况下不需要保证可见性
 * 2.同一个jvm的多个线程使用同一个锁对象, 那可见性就必须要保证了.
 * </pre>
 */
 protected volatile boolean locked;
 
 /**
 * 当前jvm内持有该锁的线程(if have one)
 */
 private Thread exclusiveOwnerThread;
 
 public void lock {
 try {
 lock(false, 0, null, false);
 } catch (InterruptedException e) {
 // TODO ignore
 }
 }
 
 public void lockInterruptibly throws InterruptedException {
 lock(false, 0, null, true);
 }
 
 public boolean tryLock(long time, TimeUnit unit) {
 try {
 return lock(true, time, unit, false);
 } catch (InterruptedException e) {
 // TODO ignore
 }
 return false;
 }
 
 public boolean tryLockInterruptibly(long time, TimeUnit unit) throws InterruptedException {
 return lock(true, time, unit, true);
 }
 
 public void unlock {
 // TODO 检查当前线程是否持有锁
 if (Thread.currentThread != getExclusiveOwnerThread) {
 throw new IllegalMonitorStateException("current thread does not hold the lock");
 }
 
 unlock0;
 setExclusiveOwnerThread(null);
 }
 
 protected void setExclusiveOwnerThread(Thread thread) {
 exclusiveOwnerThread = thread;
 }
 
 protected final Thread getExclusiveOwnerThread {
 return exclusiveOwnerThread;
 }
 
 protected abstract void unlock0;
 
 /**
 * 阻塞式获取锁的实现
 *
 * @param useTimeout
 * @param time
 * @param unit
 * @param interrupt 是否响应中断
 * @return
 * @throws InterruptedException
 */
 protected abstract boolean lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt) throws InterruptedException;
 
}

基于Redis的最终实现,关键的获取锁,释放锁的代码在这个类的lock方法和unlock0方法里,大家可以只看这两个方法然后完全自己写一个:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package cc.lixiaohui.lock;
 
import java.util.concurrent.TimeUnit;
 
import redis.clients.jedis.Jedis;
 
/**
 * <pre>
 * 基于Redis的SETNX操作实现的分布式锁
 *
 * 获取锁时最好用lock(long time, TimeUnit unit), 以免网路问题而导致线程一直阻塞
 *
 * <a href="http://redis.io/commands/setnx">SETNC操作参考资料</a>
 * </pre>
 *
 * @author lixiaohui
 *
 */
public class RedisBasedDistributedLock extends AbstractLock {
 
 private Jedis jedis;
 
 // 锁的名字
 protected String lockKey;
 
 // 锁的有效时长(毫秒)
 protected long lockExpires;
 
 public RedisBasedDistributedLock(Jedis jedis, String lockKey, long lockExpires) {
 this.jedis = jedis;
 this.lockKey = lockKey;
 this.lockExpires = lockExpires;
 }
 
 // 阻塞式获取锁的实现
 protected boolean lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt) throws InterruptedException{
 if (interrupt) {
 checkInterruption;
 }
 
 long start = System.currentTimeMillis;
 long timeout = unit.toMillis(time); // if !useTimeout, then it's useless
 
 while (useTimeout ? isTimeout(start, timeout) : true) {
 if (interrupt) {
 checkInterruption;
 }
 
 long lockExpireTime = System.currentTimeMillis + lockExpires + 1;//锁超时时间
 String stringOfLockExpireTime = String.valueOf(lockExpireTime);
 
 if (jedis.setnx(lockKey, stringOfLockExpireTime) == 1) { // 获取到锁
 // TODO 成功获取到锁, 设置相关标识
 locked = true;
 setExclusiveOwnerThread(Thread.currentThread);
 return true;
 }
 
 String value = jedis.get(lockKey);
 if (value != null && isTimeExpired(value)) { // lock is expired
 // 假设多个线程(非单jvm)同时走到这里
 String oldValue = jedis.getSet(lockKey, stringOfLockExpireTime); // getset is atomic
 // 但是走到这里时每个线程拿到的oldValue肯定不可能一样(因为getset是原子性的)
 // 加入拿到的oldValue依然是expired的,那么就说明拿到锁了
 if (oldValue != null && isTimeExpired(oldValue)) {
  // TODO 成功获取到锁, 设置相关标识
  locked = true;
  setExclusiveOwnerThread(Thread.currentThread);
  return true;
 }
 } else {
 // TODO lock is not expired, enter next loop retrying
 }
 }
 return false;
 }
 
 public boolean tryLock {
 long lockExpireTime = System.currentTimeMillis + lockExpires + 1;//锁超时时间
 String stringOfLockExpireTime = String.valueOf(lockExpireTime);
 
 if (jedis.setnx(lockKey, stringOfLockExpireTime) == 1) { // 获取到锁
 // TODO 成功获取到锁, 设置相关标识
 locked = true;
 setExclusiveOwnerThread(Thread.currentThread);
 return true;
 }
 
 String value = jedis.get(lockKey);
 if (value != null && isTimeExpired(value)) { // lock is expired
 // 假设多个线程(非单jvm)同时走到这里
 String oldValue = jedis.getSet(lockKey, stringOfLockExpireTime); // getset is atomic
 // 但是走到这里时每个线程拿到的oldValue肯定不可能一样(因为getset是原子性的)
 // 假如拿到的oldValue依然是expired的,那么就说明拿到锁了
 if (oldValue != null && isTimeExpired(oldValue)) {
 // TODO 成功获取到锁, 设置相关标识
 locked = true;
 setExclusiveOwnerThread(Thread.currentThread);
 return true;
 }
 } else {
 // TODO lock is not expired, enter next loop retrying
 }
 
 return false;
 }
 
 /**
 * Queries if this lock is held by any thread.
 *
 * @return {@code true} if any thread holds this lock and
  *   {@code false} otherwise
 */
 public boolean isLocked {
 if (locked) {
 return true;
 } else {
 String value = jedis.get(lockKey);
 // TODO 这里其实是有问题的, 想:当get方法返回value后, 假设这个value已经是过期的了,
 // 而就在这瞬间, 另一个节点set了value, 这时锁是被别的线程(节点持有), 而接下来的判断
 // 是检测不出这种情况的.不过这个问题应该不会导致其它的问题出现, 因为这个方法的目的本来就
 // 不是同步控制, 它只是一种锁状态的报告.
 return !isTimeExpired(value);
 }
 }
 
 @Override
 protected void unlock0 {
 // TODO 判断锁是否过期
 String value = jedis.get(lockKey);
 if (!isTimeExpired(value)) {
 doUnlock;
 }
 }
 
 private void checkInterruption throws InterruptedException {
 if(Thread.currentThread.isInterrupted) {
 throw new InterruptedException;
 }
 }
 
 private boolean isTimeExpired(String value) {
 return Long.parseLong(value) < System.currentTimeMillis;
 }
 
 private boolean isTimeout(long start, long timeout) {
 return start + timeout > System.currentTimeMillis;
 }
 
 private void doUnlock {
 jedis.del(lockKey);
 }
 
}

如果将来还换一种实现方式(比如zookeeper之类的),到时直接继承AbstractLock并实现lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt)unlock0方法即可(所谓抽象嘛)

测试

模拟全局ID增长器,设计一个IDGenerator类,该类负责生成全局递增ID,其代码如下:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package cc.lixiaohui.lock;
 
import java.math.BigInteger;
import java.util.concurrent.TimeUnit;
 
/**
 * 模拟ID生成
 * @author lixiaohui
 *
 */
public class IDGenerator {
 
 private static BigInteger id = BigInteger.valueOf(0);
 
 private final Lock lock;
 
 private static final BigInteger INCREMENT = BigInteger.valueOf(1);
 
 public IDGenerator(Lock lock) {
 this.lock = lock;
 }
 
 public String getAndIncrement {
 if (lock.tryLock(3, TimeUnit.SECONDS)) {
 try {
 // TODO 这里获取到锁, 访问临界区资源
 return getAndIncrement0;
 } finally {
 lock.unlock;
 }
 }
 return null;
 //return getAndIncrement0;
 }
 
 private String getAndIncrement0 {
 String s = id.toString;
 id = id.add(INCREMENT);
 return s;
 }
}

测试主逻辑:同一个JVM内开两个线程死循环地(循环之间无间隔,有的话测试就没意义了)获取ID(我这里并不是死循环而是跑20s),获取到ID存到同一个Set里面,在存之前先检查该IDset中是否存在,如果已存在,则让两个线程都停止。如果程序能正常跑完20s,那么说明这个分布式锁还算可以满足要求,如此测试的效果应该和不同JVM(也就是真正的分布式环境中)测试的效果是一样的,下面是测试类的代码:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package cc.lixiaohui.DistributedLock.DistributedLock;
 
import java.util.HashSet;
import java.util.Set;
 
import org.junit.Test;
 
import redis.clients.jedis.Jedis;
import cc.lixiaohui.lock.IDGenerator;
import cc.lixiaohui.lock.Lock;
import cc.lixiaohui.lock.RedisBasedDistributedLock;
 
public class IDGeneratorTest {
 
 private static Set<String> generatedIds = new HashSet<String>;
 
 private static final String LOCK_KEY = "lock.lock";
 private static final long LOCK_EXPIRE = 5 * 1000;
 
 @Test
 public void test throws InterruptedException {
 Jedis jedis1 = new Jedis("localhost", 6379);
 Lock lock1 = new RedisBasedDistributedLock(jedis1, LOCK_KEY, LOCK_EXPIRE);
 IDGenerator g1 = new IDGenerator(lock1);
 IDConsumeMission consume1 = new IDConsumeMission(g1, "consume1");
 
 Jedis jedis2 = new Jedis("localhost", 6379);
 Lock lock2 = new RedisBasedDistributedLock(jedis2, LOCK_KEY, LOCK_EXPIRE);
 IDGenerator g2 = new IDGenerator(lock2);
 IDConsumeMission consume2 = new IDConsumeMission(g2, "consume2");
 
 Thread t1 = new Thread(consume1);
 Thread t2 = new Thread(consume2);
 t1.start;
 t2.start;
 
 Thread.sleep(20 * 1000); //让两个线程跑20秒
 
 IDConsumeMission.stop;
 
 t1.join;
 t2.join;
 }
 
 static String time {
 return String.valueOf(System.currentTimeMillis / 1000);
 }
 
 static class IDConsumeMission implements Runnable {
 
 private IDGenerator idGenerator;
 
 private String name;
 
 private static volatile boolean stop;
 
 public IDConsumeMission(IDGenerator idGenerator, String name) {
 this.idGenerator = idGenerator;
 this.name = name;
 }
 
 public static void stop {
 stop = true;
 }
 
 public void run {
 System.out.println(time + ": consume " + name + " start ");
 while (!stop) {
 String id = idGenerator.getAndIncrement;
 if(generatedIds.contains(id)) {
  System.out.println(time + ": duplicate id generated, id = " + id);
  stop = true;
  continue;
 }
 
 generatedIds.add(id);
 System.out.println(time + ": consume " + name + " add id = " + id);
 }
 System.out.println(time + ": consume " + name + " done ");
 }
 
 }
 
}

说明一点,我这里停止两个线程的方式并不是很好,我是为了方便才这么做的,因为只是测试,最好不要这么做。

测试结果

跑20s打印的东西太多,前面打印的被clear了,只有差不多跑完的时候才有,下面截图。说明了这个锁能正常工作:

详解Java如何实现基于Redis的分布式锁

IDGererator没有加锁(即IDGereratorgetAndIncrement方法内部获取id时不上锁)时,测试是不通过的,非常大的概率中途就会停止,下面是不加锁时的测试结果:

这个1秒都不到:

详解Java如何实现基于Redis的分布式锁

这个也1秒都不到:

详解Java如何实现基于Redis的分布式锁

结束语

好了,以上就是Java实现基于Redis的分布式锁的全部内容,各位如果发现问题希望能指正,希望这篇文章能对大家的学习和工作带来一定的帮助,如果有疑问可以留言交流。

延伸 · 阅读

精彩推荐
  • JAVA教程java二路归并排序示例分享

    java二路归并排序示例分享

    这篇文章主要介绍了java二路归并排序示例,需要的朋友可以参考下 ...

    java教程网5072019-11-11
  • JAVA教程浅谈Java转义符\\|

    浅谈Java转义符\\|

    java中\需要用\\来表示吧这个你应该知道,而split中传入的参数是什么呢 他并不是普通的字符串 你可以查一下api文档 它要求传入的是正则表达式 而正则表达...

    hebedich5072019-12-23
  • JAVA教程java生成压缩文件示例代码

    java生成压缩文件示例代码

    在工作过程中,需要将一个文件夹生成压缩文件,然后提供给用户下载。写了一个压缩文件的工具类。该工具类支持单个文件和文件夹压缩 ...

    java代码网3942019-10-21
  • JAVA教程java数据结构与算法之奇偶排序算法完整示例

    java数据结构与算法之奇偶排序算法完整示例

    这篇文章主要介绍了java数据结构与算法之奇偶排序算法,较为详细的分析了奇偶算法的原理并结合完整示例形式给出了实现技巧,需要的朋友可以参考下 ...

    modun4992020-06-03
  • JAVA教程Java实现字符串倒序输出的常用方法小结

    Java实现字符串倒序输出的常用方法小结

    这篇文章主要介绍了Java实现字符串倒序输出的常用方法,通过三个实例从不同角度实现该功能,有不错的借鉴价值,需要的朋友可以参考下 ...

    shichen20144062019-11-29
  • JAVA教程Java中String性能优化

    Java中String性能优化

    本文给大家分享的是如何在java中对String进行性能优化,使用String的时候需要有哪些注意事项呢,这就是今天我们要给大家总结分析的,有需要的小伙伴可以...

    hebedich2802019-12-13
  • JAVA教程JavaMail实现邮件发送的方法

    JavaMail实现邮件发送的方法

    这篇文章主要介绍了JavaMail实现邮件发送的方法,实例分析了java实现邮件发送的相关技巧,非常具有实用价值,需要的朋友可以参考下 ...

    司青4232019-12-16
  • JAVA教程java实现基于SGIP协议开发联通短信的方法

    java实现基于SGIP协议开发联通短信的方法

    这篇文章主要介绍了java实现基于SGIP协议开发联通短信的方法,涉及java短信发送的相关实现技巧,具有一定参考借鉴价值,需要的朋友可以参考下 ...

    tianshanfeike5332019-12-26