1、技术方案
1.1、redis的基本命令
1)SETNX命令(SET if Not eXists)
语法:SETNX key value
功能:当且仅当 key 不存在,将 key 的值设为 value ,并返回1;若给定的 key 已经存在,则 SETNX 不做任何动作,并返回0。
2)expire命令
语法:expire KEY seconds
功能:设置key的过期时间。如果key已过期,将会被自动删除。
3)DEL命令
语法:DEL key [KEY …]
功能:删除给定的一个或多个 key ,不存在的 key 会被忽略。
1.2、实现同步锁原理
1)加锁:“锁”就是一个存储在redis里的key-value对,key是把一组投资操作用字符串来形成唯一标识,value其实并不重要,因为只要这个唯一的key-value存在,就表示这个操作已经上锁。
2)解锁:既然key-value对存在就表示上锁,那么释放锁就自然是在redis里删除key-value对。
3)阻塞、非阻塞:阻塞式的实现,若线程发现已经上锁,会在特定时间内轮询锁。非阻塞式的实现,若发现线程已经上锁,则直接返回。
4)处理异常情况:假设当投资操作调用其他平台接口出现等待时,自然没有释放锁,这种情况下加入锁超时机制,用redis的expire命令为key设置超时时长,过了超时时间redis就会将这个key自动删除,即强制释放锁
(此步骤需在JAVA内部设置同样的超时机制,内部超时时长应小于或等于redis超时时长)。
1.3、处理流程图
2、代码实现
2.1、同步锁工具类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
|
package com.mic.synchrolock.util; import java.util.ArrayList; import java.util.List; import java.util.UUID; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.annotation.Autowired; import com.mic.constants.Constants; import com.mic.constants.InvestType; /** * 分布式同步锁工具类 * @author Administrator * */ public class SynchrolockUtil { private final Log logger = LogFactory.getLog(getClass()); @Autowired public final String RETRYTYPE_WAIT = "1" ; //加锁方法当对象已加锁时,设置为等待并轮询 public final String RETRYTYPE_NOWAIT = "0" ; //加锁方法当对象已加锁时,设置为直接返回 private String requestTimeOutName = "" ; //投资同步锁请求超时时间 private String retryIntervalName = "" ; //投资同步锁轮询间隔 private String keyTimeoutName = "" ; //缓存中key的失效时间 private String investProductSn = "" ; //产品Sn private String uuid; //对象唯一标识 private Long startTime = System.currentTimeMillis(); //首次调用时间 public Long getStartTime() { return startTime; } List<String> keyList = new ArrayList<String>(); //缓存key的保存集合 public List<String> getKeyList() { return keyList; } public void setKeyList(List<String> keyList) { this .keyList = keyList; } @PostConstruct public void init() { uuid = UUID.randomUUID().toString(); } @PreDestroy public void destroy() { this .unlock(); } /** * 根据传入key值,判断缓存中是否存在该key * 存在-已上锁:判断retryType,轮询超时,或直接返回,返回ture * 不存在-未上锁:将该放入缓存,返回false * @param key * @param retryType 当遇到上锁情况时 1:轮询;0:直接返回 * @return */ public boolean islocked(String key,String retryType){ boolean flag = true ; logger.info( "====投资同步锁设置轮询间隔、请求超时时长、缓存key失效时长====" ); //投资同步锁轮询间隔 毫秒 Long retryInterval = Long.parseLong(Constants.getProperty(retryIntervalName)); //投资同步锁请求超时时间 毫秒 Long requestTimeOut = Long.parseLong(Constants.getProperty(requestTimeOutName)); //缓存中key的失效时间 秒 Integer keyTimeout = Integer.parseInt(Constants.getProperty(keyTimeoutName)); //调用缓存获取当前产品锁 logger.info( "====当前产品key为:" +key+ "====" ); if (isLockedInRedis(key,keyTimeout)){ if ( "1" .equals(retryType)){ //采用轮询方式等待 while ( true ) { logger.info( "====产品已被占用,开始轮询====" ); try { Thread.sleep(retryInterval); } catch (InterruptedException e) { logger.error( "线程睡眠异常:" +e.getMessage(), e); return flag; } logger.info( "====判断请求是否超时====" ); Long currentTime = System.currentTimeMillis(); //当前调用时间 long Interval = currentTime - startTime; if (Interval > requestTimeOut) { logger.info( "====请求超时====" ); return flag; } if (!isLockedInRedis(key,keyTimeout)){ logger.info( "====轮询结束,添加同步锁====" ); flag = false ; keyList.add(key); break ; } } } else { //不等待,直接返回 logger.info( "====产品已被占用,直接返回====" ); return flag; } } else { logger.info( "====产品未被占用,添加同步锁====" ); flag = false ; keyList.add(key); } return flag; } /** * 在缓存中查询key是否存在 * 若存在则返回true; * 若不存在则将key放入缓存,设置过期时间,返回false * @param key * @param keyTimeout key超时时间单位是秒 * @return */ boolean isLockedInRedis(String key, int keyTimeout){ logger.info( "====在缓存中查询key是否存在====" ); boolean isExist = false ; //与redis交互,查询对象是否上锁 Long result = this .redisClientTemplate.setnx(key, uuid); logger.info( "====上锁 result = " +result+ "====" ); if ( null != result && 1 == Integer.parseInt(result.toString())){ logger.info( "====设置缓存失效时长 = " +keyTimeout+ "秒====" ); this .redisClientTemplate.expire(key, keyTimeout); logger.info( "====上锁成功====" ); isExist = false ; } else { logger.info( "====上锁失败====" ); isExist = true ; } return isExist; } /** * 根据传入key,对该产品进行解锁 * @param key * @return */ public void unlock(){ //与redis交互,对产品解锁 if (keyList.size()> 0 ){ for (String key : this .keyList){ String value = this .redisClientTemplate.get(key); if ( null != value && ! "" .equals(value)){ if (uuid.equals(value)){ logger.info( "====解锁key:" +key+ " value=" +value+ "====" ); this .redisClientTemplate.del(key); } else { logger.info( "====待解锁集合中key:" +key+ " value=" +value+ "与uuid不匹配====" ); } } else { logger.info( "====待解锁集合中key=" +key+ "的value为空====" ); } } } else { logger.info( "====待解锁集合为空====" ); } } } |
2.2、业务调用模拟样例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
//获取同步锁工具类 SynchrolockUtil synchrolockUtil = SpringUtils.getBean( "synchrolockUtil" ); //获取需上锁资源的KEY String key = "abc" ; //查询是否上锁,上锁轮询,未上锁加锁 boolean isLocked = synchrolockUtil.islocked(key,synchrolockUtil.RETRYTYPE_WAIT); //判断上锁结果 if (isLocked){ logger.error( "同步锁请求超时并返回 key =" +key); } else { logger.info( "====同步锁加锁陈功====" ); } try { //执行业务处理 } catch (Exception e) { logger.error( "业务异常:" +e.getMessage(), e); } finally { //解锁 synchrolockUtil.unlock(); } |
2.3、如果业务处理内部,还有嵌套加锁需求,只需将对象传入方法内部,加锁成功后将key值追加到集合中即可
ps:实际实现中还需要jedis工具类,需额外添加调用
补充:使用redis锁还是出现同步问题
一种可能是,2台机器同时访问,一台访问,还没有把锁设置过去的时候,另一台也查不到就会出现这个问题。
解决方法
这我跟写代码的方式有关。先查,如果不存在就set,这种方式有极微小的可能存在时间差,导致锁set了2次。
推荐使用setIfAbsent 这样在redis set的时候是单线程的。不会存在重复的问题。
以上为个人经验,希望能给大家一个参考,也希望大家多多支持服务器之家。如有错误或未考虑完全的地方,望不吝赐教。
原文链接:https://www.cnblogs.com/MIC2016/p/7525560.html