通过zookeeper实现分布式锁
1、创建zookeeper的client
首先通过 CuratorFrameworkFactory 创建一个连接 ZooKeeper 的客户端 CuratorFramework client
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
public class curatorfactorybean implements factorybean<curatorframework>, initializingbean, disposablebean { private static final logger logger = loggerfactory.getlogger(contractfileinfocontroller. class ); private string connectionstring; private int sessiontimeoutms; private int connectiontimeoutms; private retrypolicy retrypolicy; private curatorframework client; public curatorfactorybean(string connectionstring) { this (connectionstring, 500 , 500 ); } public curatorfactorybean(string connectionstring, int sessiontimeoutms, int connectiontimeoutms) { this .connectionstring = connectionstring; this .sessiontimeoutms = sessiontimeoutms; this .connectiontimeoutms = connectiontimeoutms; } @override public void destroy() throws exception { logger.info( "closing curator framework..." ); this .client.close(); logger.info( "closed curator framework." ); } @override public curatorframework getobject() throws exception { return this .client; } @override public class <?> getobjecttype() { return this .client != null ? this .client.getclass() : curatorframework. class ; } @override public boolean issingleton() { return true ; } @override public void afterpropertiesset() throws exception { if (stringutils.isempty( this .connectionstring)) { throw new illegalstateexception( "connectionstring can not be empty." 
); } else { if ( this .retrypolicy == null ) { this .retrypolicy = new exponentialbackoffretry( 1000 , 2147483647 , 180000 ); } this .client = curatorframeworkfactory.newclient( this .connectionstring, this .sessiontimeoutms, this .connectiontimeoutms, this .retrypolicy); this .client.start(); this .client.blockuntilconnected( 30 , timeunit.milliseconds); } } public void setconnectionstring(string connectionstring) { this .connectionstring = connectionstring; } public void setsessiontimeoutms( int sessiontimeoutms) { this .sessiontimeoutms = sessiontimeoutms; } public void setconnectiontimeoutms( int connectiontimeoutms) { this .connectiontimeoutms = connectiontimeoutms; } public void setretrypolicy(retrypolicy retrypolicy) { this .retrypolicy = retrypolicy; } public void setclient(curatorframework client) { this .client = client; } } |
2、封装分布式锁
基于 CuratorFramework 创建 InterProcessMutex(分布式可重入排它锁),对一行数据进行上锁
1
2
3
|
/**
 * Creates a mutex on the given path by delegating to the three-argument
 * constructor with a new {@code StandardLockInternalsDriver}.
 *
 * @param client the Curator client
 * @param path   the path to lock
 */
public InterProcessMutex(CuratorFramework client, String path) {
    this(client, path, new StandardLockInternalsDriver());
}
使用 acquire方法
1、acquire():无入参,调用后会一直阻塞,直到获取到锁;若与 ZooKeeper 的连接中断,则抛出异常。
2、acquire(long time, TimeUnit unit):传入超时时间及其单位;若在该时间内未能获取到锁,则返回 false。
1
2
3
4
5
6
7
8
9
|
public void acquire() throws exception { if (! this .internallock(-1l, (timeunit) null )) { throw new ioexception( "lost connection while trying to acquire lock: " + this .basepath); } } public boolean acquire( long time, timeunit unit) throws exception { return this .internallock(time, unit); } |
释放锁 mutex.release();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
public void release() throws exception { thread currentthread = thread.currentthread(); interprocessmutex.lockdata lockdata = (interprocessmutex.lockdata) this .threaddata.get(currentthread); if (lockdata == null ) { throw new illegalmonitorstateexception( "you do not own the lock: " + this .basepath); } else { int newlockcount = lockdata.lockcount.decrementandget(); if (newlockcount <= 0 ) { if (newlockcount < 0 ) { throw new illegalmonitorstateexception( "lock count has gone negative for lock: " + this .basepath); } else { try { this .internals.releaselock(lockdata.lockpath); } finally { this .threaddata.remove(currentthread); } } } } } |
封装后的dlock代码
1、调用interprocessmutex processmutex = dlock.mutex(path);
2、手动释放锁processmutex.release();
3、需要手动删除路径dlock.del(path);
推荐使用:
以下两个方法都采用函数式编程风格,
业务代码执行完毕后会自动释放锁并删除 path。
1、有返回结果:
public <T> T mutex(String path, ZkLockCallback<T> zkLockCallback, long time, TimeUnit timeUnit)
2、无返回结果:
public void mutex(String path, ZkVoidCallback zkLockCallback, long time, TimeUnit timeUnit)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
|
public class dlock { private final logger logger; private static final long timeout_d = 100l; private static final string root_path_d = "/dlock" ; private string lockrootpath; private curatorframework client; public dlock(curatorframework client) { this ( "/dlock" , client); } public dlock(string lockrootpath, curatorframework client) { this .logger = loggerfactory.getlogger(dlock. class ); this .lockrootpath = lockrootpath; this .client = client; } public interprocessmutex mutex(string path) { if (!stringutils.startswith(path, "/" )) { path = constant.keybuilder( new object[]{ "/" , path}); } return new interprocessmutex( this .client, constant.keybuilder( new object[]{ this .lockrootpath, "" , path})); } public <t> t mutex(string path, zklockcallback<t> zklockcallback) throws zklockexception { return this .mutex(path, zklockcallback, 100l, timeunit.milliseconds); } public <t> t mutex(string path, zklockcallback<t> zklockcallback, long time, timeunit timeunit) throws zklockexception { string finalpath = this .getlockpath(path); interprocessmutex mutex = new interprocessmutex( this .client, finalpath); try { if (!mutex.acquire(time, timeunit)) { throw new zklockexception( "acquire zk lock return false" ); } } catch (exception var13) { throw new zklockexception( "acquire zk lock failed." 
, var13); } t var8; try { var8 = zklockcallback.doinlock(); } finally { this .releaselock(finalpath, mutex); } return var8; } private void releaselock(string finalpath, interprocessmutex mutex) { try { mutex.release(); this .logger.info( "delete zk node path:{}" , finalpath); this .deleteinternal(finalpath); } catch (exception var4) { this .logger.error( "dlock" , "release lock failed, path:{}" , finalpath, var4); // logutil.error(this.logger, "dlock", "release lock failed, path:{}", new object[]{finalpath, var4}); } } public void mutex(string path, zkvoidcallback zklockcallback, long time, timeunit timeunit) throws zklockexception { string finalpath = this .getlockpath(path); interprocessmutex mutex = new interprocessmutex( this .client, finalpath); try { if (!mutex.acquire(time, timeunit)) { throw new zklockexception( "acquire zk lock return false" ); } } catch (exception var13) { throw new zklockexception( "acquire zk lock failed." , var13); } try { zklockcallback.response(); } finally { this .releaselock(finalpath, mutex); } } public string getlockpath(string custompath) { if (!stringutils.startswith(custompath, "/" )) { custompath = constant.keybuilder( new object[]{ "/" , custompath}); } string finalpath = constant.keybuilder( new object[]{ this .lockrootpath, "" , custompath}); return finalpath; } private void deleteinternal(string finalpath) { try { ((errorlistenerpathable) this .client.delete().inbackground()).forpath(finalpath); } catch (exception var3) { this .logger.info( "delete zk node path:{} failed" , finalpath); } } public void del(string custompath) { string lockpath = "" ; try { lockpath = this .getlockpath(custompath); ((errorlistenerpathable) this .client.delete().inbackground()).forpath(lockpath); } catch (exception var4) { this .logger.info( "delete zk node path:{} failed" , lockpath); } } } |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
@functionalinterface public interface zklockcallback<t> { t doinlock(); } @functionalinterface public interface zkvoidcallback { void response(); } public class zklockexception extends exception { public zklockexception() { } public zklockexception(string message) { super (message); } public zklockexception(string message, throwable cause) { super (message, cause); } } |
配置curatorconfig
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
@configuration public class curatorconfig { @value ( "${zk.connectionstring}" ) private string connectionstring; @value ( "${zk.sessiontimeoutms:500}" ) private int sessiontimeoutms; @value ( "${zk.connectiontimeoutms:500}" ) private int connectiontimeoutms; @value ( "${zk.dlockroot:/dlock}" ) private string dlockroot; @bean public curatorfactorybean curatorfactorybean() { return new curatorfactorybean(connectionstring, sessiontimeoutms, connectiontimeoutms); } @bean @autowired public dlock dlock(curatorframework client) { return new dlock(dlockroot, client); } } |
测试代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
@restcontroller @requestmapping ( "/dlock" ) public class lockcontroller { @autowired private dlock dlock; @requestmapping ( "/lock" ) public map testdlock(string no){ final string path = constant.keybuilder( "/test/no/" , no); long mutex=0l; try { system.out.println( "在拿锁:" +path+system.currenttimemillis()); mutex = dlock.mutex(path, () -> { try { system.out.println( "拿到锁了" + system.currenttimemillis()); thread.sleep( 10000 ); system.out.println( "操作完成了" + system.currenttimemillis()); } finally { return system.currenttimemillis(); } }, 1000 , timeunit.milliseconds); } catch (zklockexception e) { system.out.println( "拿不到锁呀" +system.currenttimemillis()); } return collections.singletonmap( "ret" ,mutex); } @requestmapping ( "/dlock" ) public map testdlock1(string no){ final string path = constant.keybuilder( "/test/no/" , no); long mutex=0l; try { system.out.println( "在拿锁:" +path+system.currenttimemillis()); interprocessmutex processmutex = dlock.mutex(path); processmutex.acquire(); system.out.println( "拿到锁了" + system.currenttimemillis()); thread.sleep( 10000 ); processmutex.release(); system.out.println( "操作完成了" + system.currenttimemillis()); } catch (zklockexception e) { system.out.println( "拿不到锁呀" +system.currenttimemillis()); e.printstacktrace(); } catch (exception e){ e.printstacktrace(); } return collections.singletonmap( "ret" ,mutex); } @requestmapping ( "/del" ) public map deldlock(string no){ final string path = constant.keybuilder( "/test/no/" , no); dlock.del(path); return collections.singletonmap( "ret" , 1 ); } } |
以上所述是小编给大家介绍的java(springboot)基于zookeeper的分布式锁实现详解整合,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对服务器之家网站的支持!
原文链接:https://blog.csdn.net/LJY_SUPER/article/details/87807091