一、nio类库简介
1、缓冲区buffer
buffer是一个对象,包含一些要写入和读出的数据。
在nio中,所有的数据都是用缓冲区处理的,读取数据时,它是从通道(channel)直接读到缓冲区中,在写入数据时,也是从缓冲区写入到通道。
缓冲区实质上是一个数组,通常是一个字节数组(bytebuffer),也可以是其它类型的数组,此外缓冲区还提供了对数据的结构化访问以及维护读写位置等信息。
buffer类的继承关系如下图所示:
2、通道channel
channel是一个通道,网络数据通过channel读取和写入。通道和流的不同之处在于通道是双向的(通道可以用于读、写后者二者同时进行),流只是在一个方向上移动。
channel大体上可以分为两类:用于网络读写的selectablechannel(serversocketchannel和socketchannel就是其子类)、用于文件操作的filechannel。
下面的例子给出通过filechannel来向文件中写入数据、从文件中读取数据,将文件数据拷贝到另一个文件中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
|
public class niotest { public static void main(string[] args) throws ioexception { copyfile(); } //拷贝文件 private static void copyfile() { fileinputstream in= null ; fileoutputstream out= null ; try { in= new fileinputstream( "src/main/java/data/in-data.txt" ); out= new fileoutputstream( "src/main/java/data/out-data.txt" ); filechannel inchannel=in.getchannel(); filechannel outchannel=out.getchannel(); bytebuffer buffer=bytebuffer.allocate( 1024 ); int bytesread = inchannel.read(buffer); while (bytesread!=- 1 ) { buffer.flip(); outchannel.write(buffer); buffer.clear(); bytesread = inchannel.read(buffer); } } catch (filenotfoundexception e) { // todo auto-generated catch block e.printstacktrace(); } catch (ioexception e) { // todo auto-generated catch block e.printstacktrace(); } } //写文件 private static void writefilenio() { try { randomaccessfile fout = new randomaccessfile( "src/main/java/data/nio-data.txt" , "rw" ); filechannel fc=fout.getchannel(); bytebuffer buffer=bytebuffer.allocate( 1024 ); buffer.put( "hi123" .getbytes()); buffer.flip(); try { fc.write(buffer); } catch (ioexception e) { // todo auto-generated catch block e.printstacktrace(); } } catch (filenotfoundexception e) { // todo auto-generated catch block e.printstacktrace(); } } //读文件 private static void readfilenio() { fileinputstream fileinputstream; try { fileinputstream = new fileinputstream( "src/main/java/data/nio-data.txt" ); filechannel filechannel=fileinputstream.getchannel(); //从 fileinputstream 获取通道 bytebuffer bytebuffer=bytebuffer.allocate( 1024 ); //创建缓冲区 int bytesread=filechannel.read(bytebuffer); //将数据读到缓冲区 while (bytesread!=- 1 ) { /*limit=position * position=0; */ bytebuffer.flip(); //hasremaining():告知在当前位置和限制之间是否有元素 while (bytebuffer.hasremaining()) { system.out.print((char) bytebuffer.get()); } /* * 清空缓冲区 * position=0; * limit=capacity; */ bytebuffer.clear(); bytesread = filechannel.read(bytebuffer); } } catch (filenotfoundexception e) { // todo auto-generated catch block e.printstacktrace(); } catch (ioexception e) { // todo auto-generated catch block e.printstacktrace(); } } } |
3、多路复用器selector
多路复用器提供选择已经就绪的任务的能力。selector会不断的轮询注册在其上的channel,如果某个channel上面发送读或者写事件,这个channel就处于就绪状态,会被selector轮询出来,然后通过selectionkey可以获取就绪channel的集合,进行后续的i/o操作。
一个多路复用器selector可以同时轮询多个channel,由于jdk使用了epoll代替了传统的select实现,所以它没有最大连接句柄1024/2048的限制,意味着只需要一个线程负责selector的轮询,就可以接入成千上万的客户端。其模型如下图所示:
用单线程处理一个selector。要使用selector,得向selector注册channel,然后调用它的select()方法。这个方法会一直阻塞到某个注册的通道有事件就绪。一旦这个方法返回,线程就可以处理这些事件,事件的例子有如新连接进来,数据接收等。
注:
1、什么select模型?
select是事件触发机制,当等待的事件发生就触发进行处理,多用于linux实现的服务器对客户端的处理。
可以阻塞地同时探测一组支持非阻塞的io设备,是否有事件发生(如可读、可写,有高优先级错误输出等),直至某一个设备触发了事件或者超过了指定的等待时间。也就是它们的职责不是做io,而是帮助调用者寻找当前就绪的设备。
2、什么是epoll模型?
epoll的设计思路,是把select/poll单个的操作拆分为1个epoll_create+多个epoll_ctrl+一个wait。此外,内核针对epoll操作添加了一个文件系统”eventpollfs”,每一个或者多个要监视的文件描述符都有一个对应的eventpollfs文件系统的inode节点,主要信息保存在eventpoll结构体中。而被监视的文件的重要信息则保存在epitem结构体中。所以他们是一对多的关系。
二、nio服务器端开发
功能说明:开启服务器端,对每一个接入的客户端都向其发送hello字符串。
使用nio进行服务器端开发主要有以下几个步骤:
1、创建serversocketchannel,配置它为非阻塞模式
1
2
|
serversocketchannel = serversocketchannel.open(); serversocketchannel.configureblocking( false ); |
2、绑定监听,配置tcp参数,如backlog大小
1
|
serversocketchannel.socket().bind( new inetsocketaddress( 8080 )); |
3、创建一个独立的i/o线程,用于轮询多路复用器selector
4、创建selector,将之前创建的serversocketchannel注册到selector上,监听selectionkey.accept
1
2
|
selector=selector.open(); serversocketchannel.register(selector, selectionkey.op_accept); |
5、启动i/o线程,在循环体内执行selector.select()方法,轮询就绪的channel
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
while ( true ) { try { //select()阻塞到至少有一个通道在你注册的事件上就绪了 //如果没有准备好的channel,就在这一直阻塞 //select(long timeout)和select()一样,除了最长会阻塞timeout毫秒(参数)。 selector.select(); } catch (ioexception e) { // todo auto-generated catch block e.printstacktrace(); break ; } } |
6、当轮询到了处于就绪状态的channel时,需对其进行判断,如果是op_accept状态,说明是新的客户端接入,则调用serversocketchannel.accept()方法接受新的客户端
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
//返回已经就绪的selectionkey,然后迭代执行 set<selectionkey> readkeys=selector.selectedkeys(); for (iterator<selectionkey> it=readkeys.iterator();it.hasnext();) { selectionkey key=it.next(); it.remove(); try { if (key.isacceptable()) { serversocketchannel server=(serversocketchannel) key.channel(); socketchannel client=server.accept(); client.configureblocking( false ); client.register(selector,selectionkey.op_write); } else if (key.iswritable()) { socketchannel client=(socketchannel) key.channel(); bytebuffer buffer=bytebuffer.allocate( 20 ); string str= "hello" ; buffer=bytebuffer.wrap(str.getbytes()); client.write(buffer); key.cancel(); } } catch (ioexception e) { e.printstacktrace(); key.cancel(); try { key.channel().close(); } catch (ioexception e1) { // todo auto-generated catch block e1.printstacktrace(); } } } |
7、设置新接入的客户端链路socketchannel为非阻塞模式,配置其他的一些tcp参数
1
2
3
4
5
6
7
|
if (key.isacceptable()) { serversocketchannel server=(serversocketchannel) key.channel(); socketchannel client=server.accept(); client.configureblocking( false ); ... } |
8、将socketchannel注册到selector,监听op_write
1
|
client.register(selector,selectionkey.op_write); |
9、如果轮询的channel为op_write,则说明要向sockchannel中写入数据,则构造bytebuffer对象,写入数据包
1
2
3
4
5
6
7
8
9
|
else if (key.iswritable()) { socketchannel client=(socketchannel) key.channel(); bytebuffer buffer=bytebuffer.allocate( 20 ); string str= "hello" ; buffer=bytebuffer.wrap(str.getbytes()); client.write(buffer); key.cancel(); } |
完整代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
import java.io.ioexception; import java.net.inetsocketaddress; import java.nio.bytebuffer; import java.nio.channels.selectionkey; import java.nio.channels.selector; import java.nio.channels.serversocketchannel; import java.nio.channels.socketchannel; import java.util.iterator; import java.util.set; public class serversocketchanneldemo { public static void main(string[] args) { serversocketchannel serversocketchannel; selector selector= null ; try { serversocketchannel = serversocketchannel.open(); serversocketchannel.configureblocking( false ); serversocketchannel.socket().bind( new inetsocketaddress( 8080 )); selector=selector.open(); serversocketchannel.register(selector, selectionkey.op_accept); } catch (ioexception e) { // todo auto-generated catch block e.printstacktrace(); } while ( true ) { try { //select()阻塞到至少有一个通道在你注册的事件上就绪了 //如果没有准备好的channel,就在这一直阻塞 //select(long timeout)和select()一样,除了最长会阻塞timeout毫秒(参数)。 selector.select(); } catch (ioexception e) { // todo auto-generated catch block e.printstacktrace(); break ; } //返回已经就绪的selectionkey,然后迭代执行 set<selectionkey> readkeys=selector.selectedkeys(); for (iterator<selectionkey> it=readkeys.iterator();it.hasnext();) { selectionkey key=it.next(); it.remove(); try { if (key.isacceptable()) { serversocketchannel server=(serversocketchannel) key.channel(); socketchannel client=server.accept(); client.configureblocking( false ); client.register(selector,selectionkey.op_write); } else if (key.iswritable()) { socketchannel client=(socketchannel) key.channel(); bytebuffer buffer=bytebuffer.allocate( 20 ); string str= "hello" ; buffer=bytebuffer.wrap(str.getbytes()); client.write(buffer); key.cancel(); } } catch (ioexception e) { e.printstacktrace(); key.cancel(); try { key.channel().close(); } catch (ioexception e1) { // todo auto-generated catch block e1.printstacktrace(); } } } } } } |
我们用telnet localhost 8080模拟出多个客户端:
程序运行结果如下:
总结
以上就是本文关于java nio服务器端开发详解的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站其他相关专题,如有不足之处,欢迎留言指出。感谢朋友们对本站的支持!
原文链接:http://www.cnblogs.com/xujian2014/p/5657540.html