什么是memcache?
memcache集群环境下缓存解决方案
memcache是一个高性能的分布式的内存对象缓存系统,通过在内存里维护一个统一的巨大的hash表,它能够用来存储各种格式的数据,包括图像、视频、文件以及数据库检索的结果等。简单的说就是将数据调用到内存中,然后从内存中读取,从而大大提高读取速度。
memcache是danga的一个项目,最早是livejournal 服务的,最初为了加速 livejournal 访问速度而开发的,后来被很多大型的网站采用。
memcached是以守护程序方式运行于一个或多个服务器中,随时会接收客户端的连接和操作
为什么会有memcache和memcached两种名称?
其实memcache是这个项目的名称,而memcached是它服务器端的主程序文件名,知道我的意思了吧。一个是项目名称,一个是主程序文件名,在网上看到了很多人不明白,于是混用了。
memcached是高性能的,分布式的内存对象缓存系统,用于在动态应用中减少数据库负载,提升访问速度。memcached由danga interactive开发,用于提升livejournal.com访问速度的。lj每秒动态页面访问量几千次,用户700万。memcached将数据库负载大幅度降低,更好的分配资源,更快速访问。
这篇文章将会涉及以下内容:
- java socket多线程服务器
- java io
- concurrency
- memcache特性和协议
memcache
memcache is an in-memory key-value store for small chunks of arbitrary data (strings, objects) from results of databasecalls, api calls, or page rendering.
即内存缓存数据库,是一个键值对数据库。该数据库的存在是为了将从其他服务中获取的数据暂存在内存中,在重复访问时可以直接从命中的缓存中返回。既加快了访问速率,也减少了其他服务的负载。这里将实现一个单服务器版本的memcache,并且支持多个客户端的同时连接。
客户端将与服务器建立telnet连接,然后按照memcache协议与服务器缓存进行交互。这里实现的指令为get,set和del。先来看一下各个指令的格式
set
set属于存储指令,存储指令的特点时,第一行输入基本信息,第二行输入其对应的value值。
set <key> <flags> <exptime> <bytes> [noreply]\r\n
<value>\r\n
如果存储成功,将会返回stored,如果指令中包含noreply属性,则服务器将不会返回信息。
该指令中每个域的内容如下:
- key: 键
- flags: 16位无符号整数,会在get时随键值对返回
- exptime: 过期时间,以秒为单位
- bytes:即将发送的value的长度
- noreply:是否需要服务器响应,为可选属性
如果指令不符合标准,服务器将会返回error。
get
get属于获取指令,该指令特点如下:
get <key>*\r\n
它支持传入多个key的值,如果缓存命中了一个或者多个key,则会返回相应的数据,并以end作为结尾。如果没有命中,则返回的消息中不包含该key对应的值。格式如下:
1
2
3
4
5
6
|
value <key> <flags> <bytes>\r\n <data block>\r\n value <key> <flags> <bytes>\r\n <data block>\r\n end del |
删除指令,该指令格式如下:
1
|
del <key> [noreply]\r\n |
如果删除成功,则返回deleted\r\n,否则返回not_found。如果有noreply参数,则服务器不会返回响应。
java socket
java socket需要了解的只是包括tcp协议,套接字,以及io流。这里就不详细赘述,可以参考我的这系列文章,也建议去阅读java network programming。一书。
代码实现
这里贴图功能出了点问题,可以去文末我的项目地址查看类图。
这里采用了指令模式和工厂模式实现指令的创建和执行的解耦。指令工厂将会接收commandline并且返回一个command实例。每一个command都拥有execute方法用来执行各自独特的操作。这里只贴上del指令的特殊实现。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
/** * 各种指令 * 目前支持get,set,delete * * 以及自定义的 * error,end */ public interface command { /** * 执行指令 * @param reader * @param writer */ void execute(reader reader, writer writer); /** * 获取指令的类型 * @return */ commandtype gettype(); } |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
/** * 指令工厂 单一实例 */ public class commandfactory { private static commandfactory commandfactory; private static cache<item> memcache; private commandfactory(){} public static commandfactory getinstance(cache<item> cache) { if (commandfactory == null ) { commandfactory = new commandfactory(); memcache = cache; } return commandfactory; } /** * 根据指令的类型获取command * @param commandline * @return */ public command getcommand(string commandline){ if (commandline.matches( "^set .*$" )){ return new setcommand(commandline, memcache); } else if (commandline.matches( "^get .*$" )){ return new getcommand(commandline, memcache); } else if (commandline.matches( "^del .*$" )){ return new deletecommand(commandline, memcache); } else if (commandline.matches( "^end$" )){ return new endcommand(commandline); } else { return new errorcommand(commandline, errorcommand.errortype.error); } } } |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
|
/** * 删除缓存指令 */ public class deletecommand implements command{ private final string command; private final cache<item> cache; private string key; private boolean noreply; public deletecommand( final string command, final cache<item> cache){ this .command = command; this .cache = cache; initcommand(); } private void initcommand(){ if ( this .command.contains( "noreply" )){ noreply = true ; } string[] info = command.split( " " ); key = info[ 1 ]; } @override public void execute(reader reader, writer writer) { bufferedwriter bfw = (bufferedwriter) writer; item item = cache.delete(key); if (!noreply){ try { if (item == null ){ bfw.write( "not_found\r\n" ); } else { bfw.write( "deleted\r\n" ); } bfw.flush(); } catch (ioexception e) { try { bfw.write( "error\r\n" ); bfw.flush(); } catch (ioexception e1) { e1.printstacktrace(); } e.printstacktrace(); } } } @override public commandtype gettype() { return commandtype.search; } } |
然后是实现内存服务器,为了支持先进先出功能,这里使用了linkedtreemap作为底层实现,并且重写了removeoldest方法。同时还使用cachemanager的后台线程及时清除过期的缓存条目。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
public class memcache implements cache<item>{ private logger logger = logger.getlogger(memcache. class .getname()); //利用linkedhashmap实现lru private static linkedhashmap<string, item> cache; private final int maxsize; //负载因子 private final float default_load_factor = 0 .75f; public memcache( final int maxsize){ this .maxsize = maxsize; //确保cache不会在达到maxsize之后自动扩容 int capacity = ( int ) math.ceil(maxsize / default_load_factor) + 1 ; this .cache = new linkedhashmap<string, item>(capacity, default_load_factor, true ){ @override protected boolean removeeldestentry(map.entry<string,item> eldest) { if (size() > maxsize){ logger.info( "缓存数量已经达到上限,会删除最近最少使用的条目" ); } return size() > maxsize; } }; //实现同步访问 collections.synchronizedmap(cache); } public synchronized boolean isfull(){ return cache.size() >= maxsize; } @override public item get(string key) { item item = cache.get(key); if (item == null ){ logger.info( "缓存中key:" + key + "不存在" ); return null ; } else if (item!= null && item.isexpired()){ //如果缓存过期则删除并返回null logger.info( "从缓存中读取key:" + key + " value:" + item.getvalue() + "已经失效" ); cache.remove(key); return null ; } logger.info( "从缓存中读取key:" + key + " value:" + item.getvalue() + " 剩余有效时间" + item.remaintime()); return item; } @override public void set(string key, item value) { logger.info( "向缓存中写入key:" + key + " value:" + value); cache.put(key, value); } @override public item delete(string key) { logger.info( "从缓存中删除key:" + key); return cache.remove(key); } @override public int size(){ return cache.size(); } @override public int capacity() { return maxsize; } @override public iterator<map.entry<string, item>> iterator() { return cache.entryset().iterator(); } } |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
/** * 缓存管理器 * 后台线程 * 将cache中过期的缓存删除 */ public class cachemanager implements runnable { private logger logger = logger.getlogger(cachemanager. class .getname()); //缓存 public cache<item> cache; public cachemanager(cache<item> cache){ this .cache = cache; } @override public void run() { while ( true ){ iterator<map.entry<string, item>> itemiterator = cache.iterator(); while (itemiterator.hasnext()){ map.entry<string, item> entry = itemiterator.next(); item item = entry.getvalue(); if (item.isexpired()){ logger.info( "key:" + entry.getkey() + " value" + item.getvalue() + " 已经过期,从数据库中删除" ); itemiterator.remove(); } } try { //每隔5秒钟再运行该后台程序 timeunit.seconds.sleep( 5 ); } catch (interruptedexception e) { e.printstacktrace(); } } } } |
最后是实现一个多线程的socket服务器,这里就是将serversocket绑定到一个接口,并且将accept到的socket交给额外的线程处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
|
/** * 服务器 */ public class ioserver implements server { private boolean stop; //端口号 private final int port; //服务器线程 private serversocket serversocket; private final logger logger = logger.getlogger(ioserver. class .getname()); //线程池,线程容量为maxconnection private final executorservice executorservice; private final cache<item> cache; public ioserver( int port, int maxconnection, cache<item> cache){ if (maxconnection<= 0 ) throw new illegalargumentexception( "支持的最大连接数量必须为正整数" ); this .port = port; executorservice = executors.newfixedthreadpool(maxconnection); this .cache = cache; } @override public void start() { try { serversocket = new serversocket(port); logger.info( "服务器在端口" +port+ "上启动" ); while ( true ){ try { socket socket = serversocket.accept(); logger.info( "收到" +socket.getlocaladdress()+ "的连接" ); executorservice.submit( new sockethandler(socket, cache)); } catch (ioexception e) { e.printstacktrace(); } } } catch (ioexception e) { logger.log(level.warning, "服务器即将关闭..." ); e.printstacktrace(); } finally { executorservice.shutdown(); shutdown(); } } /** * 服务器是否仍在运行 * @return */ public boolean isrunning() { return !serversocket.isclosed(); } /** * 停止服务器 */ public void shutdown(){ try { if (serversocket!= null ){ serversocket.close(); } } catch (ioexception e) { e.printstacktrace(); } } } |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
/** * 处理各个客户端的连接 * 在获得end指令后关闭连接s */ public class sockethandler implements runnable{ private static logger logger = logger.getlogger(sockethandler. class .getname()); private final socket socket; private final cache<item> cache; private boolean finish; public sockethandler(socket s, cache<item> cache){ this .socket = s; this .cache = cache; } @override public void run() { try { //获取socket输入流 final bufferedreader reader = new bufferedreader( new inputstreamreader(socket.getinputstream())); //获取socket输出流 final bufferedwriter writer = new bufferedwriter( new outputstreamwriter(socket.getoutputstream())); commandfactory commandfactory = commandfactory.getinstance(cache); while (!finish){ final string commandline = reader.readline(); logger.info( "ip:" + socket.getlocaladdress() + " 指令:" + commandline); if (commandline == null || commandline.trim().isempty()) { continue ; } //使用指令工厂获取指令实例 final command command = commandfactory.getcommand(commandline); command.execute(reader, writer); if (command.gettype() == commandtype.end){ logger.info( "请求关闭连接" ); finish = true ; } } } catch (ioexception e) { e.printstacktrace(); logger.info( "关闭来自" + socket.getlocaladdress() + "的连接" ); } finally { try { if (socket != null ){ socket.close(); } } catch (ioexception e) { e.printstacktrace(); } } } } |
项目地址请戳这里,如果觉得还不错的话,希望能给个星哈><
参考资料
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://segmentfault.com/a/1190000014215001