在单体应用中,如果我们对共享数据不进行加锁操作,会出现数据一致性问题,我们的解决办法通常是加锁。
在分布式架构中,我们同样会遇到数据共享操作问题,本文章使用redis来解决分布式架构中的数据一致性问题。
1. 单机数据一致性
单机数据一致性架构如下图所示:多个可客户访问同一个服务器,连接同一个数据库。
场景描述:客户端模拟购买商品过程,在redis中设定库存总数剩100个,多个客户端同时并发购买。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
@restcontroller public class indexcontroller1 { @autowired stringredistemplate template; @requestmapping ( "/buy1" ) public string index(){ // redis中存有goods:001号商品,数量为100 string result = template.opsforvalue().get( "goods:001" ); // 获取到剩余商品数 int total = result == null ? 0 : integer.parseint(result); if ( total > 0 ){ // 剩余商品数大于0 ,则进行扣减 int realtotal = total - 1 ; // 将商品数回写数据库 template.opsforvalue().set( "goods:001" ,string.valueof(realtotal)); system.out.println( "购买商品成功,库存还剩:" +realtotal + "件, 服务端口为8001" ); return "购买商品成功,库存还剩:" +realtotal + "件, 服务端口为8001" ; } else { system.out.println( "购买商品失败,服务端口为8001" ); } return "购买商品失败,服务端口为8001" ; } } |
使用jmeter模拟高并发场景,测试结果如下:
测试结果出现多个用户购买同一商品,发生了数据不一致问题!
解决办法:单体应用的情况下,对并发的操作进行加锁操作,保证对数据的操作具有原子性
- synchronized
- reentrantlock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
@restcontroller public class indexcontroller2 { // 使用reentrantlock锁解决单体应用的并发问题 lock lock = new reentrantlock(); @autowired stringredistemplate template; @requestmapping ( "/buy2" ) public string index() { lock.lock(); try { string result = template.opsforvalue().get( "goods:001" ); int total = result == null ? 0 : integer.parseint(result); if (total > 0 ) { int realtotal = total - 1 ; template.opsforvalue().set( "goods:001" , string.valueof(realtotal)); system.out.println( "购买商品成功,库存还剩:" + realtotal + "件, 服务端口为8001" ); return "购买商品成功,库存还剩:" + realtotal + "件, 服务端口为8001" ; } else { system.out.println( "购买商品失败,服务端口为8001" ); } } catch (exception e) { lock.unlock(); } finally { lock.unlock(); } return "购买商品失败,服务端口为8001" ; } } |
2. 分布式数据一致性
上面解决了单体应用的数据一致性问题,但如果是分布式架构部署呢,架构如下:
提供两个服务,端口分别为8001、8002,连接同一个redis服务,在服务前面有一台nginx作为负载均衡
两台服务代码相同,只是端口不同
将8001、8002两个服务启动,每个服务依然用reentrantlock加锁,用jmeter做并发测试,发现会出现数据一致性问题!
3. redis实现分布式锁
3.1 方式一
取消单机锁,下面使用redis的set命令来实现分布式加锁
set key value [ex seconds] [px milliseconds] [nx|xx]
ex seconds − 设置指定的到期时间(以秒为单位)
px milliseconds − 设置指定的到期时间(以毫秒为单位)
nx − 仅在键不存在时设置键
xx − 只有在键已存在时才设置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
@restcontroller public class indexcontroller4 { // redis分布式锁的key public static final string redis_lock = "good_lock" ; @autowired stringredistemplate template; @requestmapping ( "/buy4" ) public string index(){ // 每个人进来先要进行加锁,key值为"good_lock",value随机生成 string value = uuid.randomuuid().tostring().replace( "-" , "" ); try { // 加锁 boolean flag = template.opsforvalue().setifabsent(redis_lock, value); // 加锁失败 if (!flag){ return "抢锁失败!" ; } system.out.println( value+ " 抢锁成功" ); string result = template.opsforvalue().get( "goods:001" ); int total = result == null ? 0 : integer.parseint(result); if (total > 0 ) { int realtotal = total - 1 ; template.opsforvalue().set( "goods:001" , string.valueof(realtotal)); // 如果在抢到所之后,删除锁之前,发生了异常,锁就无法被释放, // 释放锁操作不能在此操作,要在finally处理 // template.delete(redis_lock); system.out.println( "购买商品成功,库存还剩:" + realtotal + "件, 服务端口为8001" ); return "购买商品成功,库存还剩:" + realtotal + "件, 服务端口为8001" ; } else { system.out.println( "购买商品失败,服务端口为8001" ); } return "购买商品失败,服务端口为8001" ; } finally { // 释放锁 template.delete(redis_lock); } } } |
上面的代码,可以解决分布式架构中数据一致性问题。但再仔细想想,还是会有问题,下面进行改进。
3.2 方式二(改进方式一)
在上面的代码中,如果程序在运行期间,部署了微服务jar包的机器突然挂了,代码层面根本就没有走到finally代码块,也就是说在宕机前,锁并没有被删除掉,这样的话,就没办法保证解锁
所以,这里需要对这个key加一个过期时间,redis中设置过期时间有两种方法:
- template.expire(redis_lock,10, timeunit.seconds)
- template.opsforvalue().setifabsent(redis_lock, value,10l,timeunit.seconds)
第一种方法需要单独的一行代码,且并没有与加锁放在同一步操作,所以不具备原子性,也会出问题
第二种方法在加锁的同时就进行了设置过期时间,所有没有问题,这里采用这种方式
调整下代码,在加锁的同时,设置过期时间:
1
2
|
// 为key加一个过期时间,其余代码不变 boolean flag = template.opsforvalue().setifabsent(redis_lock,value,10l,timeunit.seconds); |
这种方式解决了因服务突然宕机而无法释放锁的问题。但再仔细想想,还是会有问题,下面进行改进。
3.3 方式三(改进方式二)
方式二设置了key的过期时间,解决了key无法删除的问题,但问题又来了
上面设置了key的过期时间为10秒,如果业务逻辑比较复杂,需要调用其他微服务,处理时间需要15秒(模拟场景,别较真),而当10秒钟过去之后,这个key就过期了,其他请求就又可以设置这个key,此时如果耗时15秒的请求处理完了,回来继续执行程序,就会把别人设置的key给删除了,这是个很严重的问题!
所以,谁上的锁,谁才能删除
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
@restcontroller public class indexcontroller6 { public static final string redis_lock = "good_lock" ; @autowired stringredistemplate template; @requestmapping ( "/buy6" ) public string index(){ // 每个人进来先要进行加锁,key值为"good_lock" string value = uuid.randomuuid().tostring().replace( "-" , "" ); try { // 为key加一个过期时间 boolean flag = template.opsforvalue().setifabsent(redis_lock, value,10l,timeunit.seconds); // 加锁失败 if (!flag){ return "抢锁失败!" ; } system.out.println( value+ " 抢锁成功" ); string result = template.opsforvalue().get( "goods:001" ); int total = result == null ? 0 : integer.parseint(result); if (total > 0 ) { // 如果在此处需要调用其他微服务,处理时间较长。。。 int realtotal = total - 1 ; template.opsforvalue().set( "goods:001" , string.valueof(realtotal)); system.out.println( "购买商品成功,库存还剩:" + realtotal + "件, 服务端口为8001" ); return "购买商品成功,库存还剩:" + realtotal + "件, 服务端口为8001" ; } else { system.out.println( "购买商品失败,服务端口为8001" ); } return "购买商品失败,服务端口为8001" ; } finally { // 谁加的锁,谁才能删除!!!! if (template.opsforvalue().get(redis_lock).equals(value)){ template.delete(redis_lock); } } } } |
这种方式解决了因服务处理时间太长而释放了别人锁的问题。这样就没问题了吗?
3.4 方式四(改进方式三)
在上面方式三下,规定了谁上的锁,谁才能删除,但finally快的判断和del删除操作不是原子操作,并发的时候也会出问题,并发嘛,就是要保证数据的一致性,保证数据的一致性,最好要保证对数据的操作具有原子性。
在redis的set命令介绍中,最后推荐lua脚本进行锁的删除,地址:https://redis.io/commands/set
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
|
@restcontroller public class indexcontroller7 { public static final string redis_lock = "good_lock" ; @autowired stringredistemplate template; @requestmapping ( "/buy7" ) public string index(){ // 每个人进来先要进行加锁,key值为"good_lock" string value = uuid.randomuuid().tostring().replace( "-" , "" ); try { // 为key加一个过期时间 boolean flag = template.opsforvalue().setifabsent(redis_lock, value,10l,timeunit.seconds); // 加锁失败 if (!flag){ return "抢锁失败!" ; } system.out.println( value+ " 抢锁成功" ); string result = template.opsforvalue().get( "goods:001" ); int total = result == null ? 0 : integer.parseint(result); if (total > 0 ) { // 如果在此处需要调用其他微服务,处理时间较长。。。 int realtotal = total - 1 ; template.opsforvalue().set( "goods:001" , string.valueof(realtotal)); system.out.println( "购买商品成功,库存还剩:" + realtotal + "件, 服务端口为8001" ); return "购买商品成功,库存还剩:" + realtotal + "件, 服务端口为8001" ; } else { system.out.println( "购买商品失败,服务端口为8001" ); } return "购买商品失败,服务端口为8001" ; } finally { // 谁加的锁,谁才能删除,使用lua脚本,进行锁的删除 jedis jedis = null ; try { jedis = redisutils.getjedis(); string script = "if redis.call('get',keys[1]) == argv[1] " + "then " + "return redis.call('del',keys[1]) " + "else " + " return 0 " + "end" ; object eval = jedis.eval(script, collections.singletonlist(redis_lock), collections.singletonlist(value)); if ( "1" .equals(eval.tostring())){ system.out.println( "-----del redis lock ok...." ); } else { system.out.println( "-----del redis lock error ...." ); } } catch (exception e){ } finally { if ( null != jedis){ jedis.close(); } } } } } |
3.5 方式五(改进方式四)
在方式四下,规定了谁上的锁,谁才能删除,并且解决了删除操作没有原子性问题。但还没有考虑缓存续命,以及redis集群部署下,异步复制造成的锁丢失:主节点没来得及把刚刚set进来这条数据给从节点,就挂了。所以直接上redlock的redisson落地实现。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
@restcontroller public class indexcontroller8 { public static final string redis_lock = "good_lock" ; @autowired stringredistemplate template; @autowired redisson redisson; @requestmapping ( "/buy8" ) public string index(){ rlock lock = redisson.getlock(redis_lock); lock.lock(); // 每个人进来先要进行加锁,key值为"good_lock" string value = uuid.randomuuid().tostring().replace( "-" , "" ); try { string result = template.opsforvalue().get( "goods:001" ); int total = result == null ? 0 : integer.parseint(result); if (total > 0 ) { // 如果在此处需要调用其他微服务,处理时间较长。。。 int realtotal = total - 1 ; template.opsforvalue().set( "goods:001" , string.valueof(realtotal)); system.out.println( "购买商品成功,库存还剩:" + realtotal + "件, 服务端口为8001" ); return "购买商品成功,库存还剩:" + realtotal + "件, 服务端口为8001" ; } else { system.out.println( "购买商品失败,服务端口为8001" ); } return "购买商品失败,服务端口为8001" ; } finally { if (lock.islocked() && lock.isheldbycurrentthread()){ lock.unlock(); } } } } |
3.6 小结
分析问题的过程,也是解决问题的过程,也能锻炼自己编写代码时思考问题的方式和角度。
上述测试代码地址:
https://github.com/hofanking/springboot-redis-example
到此这篇关于redis实现分布式锁方法详细的文章就介绍到这了,更多相关redis分布式锁内容请搜索服务器之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持服务器之家!
原文链接:https://blog.csdn.net/zxd1435513775/article/details/122194202