以下我们系统通过原理,过程等方便给大家深入的简介了java nio的函数机制以及用法等,学习下吧。
前言
本篇主要讲解java中的io机制
分为两块:
第一块讲解多线程下的io机制
第二块讲解如何在io机制下优化cpu资源的浪费(new io)
echo服务器
单线程下的socket机制就不用我介绍了,不懂得可以去查阅下资料
那么多线程下,如果进行套接字的使用呢?
我们使用最简单的echo服务器来帮助大家理解
首先,来看下多线程下服务端和客户端的工作流程图:
可以看到,多个客户端同时向服务端发送请求
服务端做出的措施是开启多个线程来匹配相对应的客户端
并且每个线程去独自完成他们的客户端请求
原理讲完了我们来看下是如何实现的
在这里我写了一个简单的服务器
用到了线程池的技术来创建线程(具体代码作用我已经加了注释):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
public class myserver { private static executorservice executorservice = executors.newcachedthreadpool(); //创建一个线程池 private static class handlemsg implements runnable{ //一旦有新的客户端请求,创建这个线程进行处理 socket client; //创建一个客户端 public handlemsg(socket client){ //构造传参绑定 this .client = client; } @override public void run() { bufferedreader bufferedreader = null ; //创建字符缓存输入流 printwriter printwriter = null ; //创建字符写入流 try { bufferedreader = new bufferedreader( new inputstreamreader(client.getinputstream())); //获取客户端的输入流 printwriter = new printwriter(client.getoutputstream(), true ); //获取客户端的输出流,true是随时刷新 string inputline = null ; long a = system.currenttimemillis(); while ((inputline = bufferedreader.readline())!= null ){ printwriter.println(inputline); } long b = system.currenttimemillis(); system.out.println( "此线程花费了:" +(b-a)+ "秒!" ); } catch (ioexception e) { e.printstacktrace(); } finally { try { bufferedreader.close(); printwriter.close(); client.close(); } catch (ioexception e) { e.printstacktrace(); } } } } public static void main(string[] args) throws ioexception { //服务端的主线程是用来循环监听客户端请求 serversocket server = new serversocket( 8686 ); //创建一个服务端且端口为8686 socket client = null ; while ( true ){ //循环监听 client = server.accept(); //服务端监听到一个客户端请求 system.out.println(client.getremotesocketaddress()+ "地址的客户端连接成功!" ); executorservice.submit( new handlemsg(client)); //将该客户端请求通过线程池放入handlmsg线程中进行处理 } } } |
上述代码中我们使用一个类编写了一个简单的echo服务器
在主线程中用死循环来开启端口监听
简单客户端
有了服务器,我们就可以对其进行访问,并且发送一些字符串数据
服务器的功能是返回这些字符串,并且打印出线程占用时间
下面来写个简单的客户端来响应服务端:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
public class myclient { public static void main(string[] args) throws ioexception { socket client = null ; printwriter printwriter = null ; bufferedreader bufferedreader = null ; try { client = new socket(); client.connect( new inetsocketaddress( "localhost" , 8686 )); printwriter = new printwriter(client.getoutputstream(), true ); printwriter.println( "hello" ); printwriter.flush(); bufferedreader = new bufferedreader( new inputstreamreader(client.getinputstream())); //读取服务器返回的信息并进行输出 system.out.println( "来自服务器的信息是:" +bufferedreader.readline()); } catch (ioexception e) { e.printstacktrace(); } finally { printwriter.close(); bufferedreader.close(); client.close(); } } } |
代码中,我们用字符流发送了一个hello字符串过去,如果代码没问题
服务器会返回一个hello数据,并且打印出我们设置的日志信息
echo服务器结果展示
我们来运行:
1.打开server,开启循环监听:
2.打开一个客户端:
可以看到客户端打印出了返回结果
3.查看服务端日志:
很好,一个简单的多线程套接字编程就实现了
但是试想一下:
如果一个客户端请求中,在io写入到服务端过程中加入sleep,
使每个请求占用服务端线程10秒
然后有大量的客户端请求,每个请求都占用那么长时间
那么服务端的并能能力就会大幅度下降
这并不是因为服务端有多少繁重的任务,而仅仅是因为服务线程在等待io(因为accept,read,write都是阻塞式的)
让高速运行的cpu去等待及其低效的网络io是非常不合算的行为
这时候该怎么办?
nio
new io成功的解决了上述问题,它是怎样解决的呢?
io处理客户端请求的最小单位是线程
而nio使用了比线程还小一级的单位:通道(channel)
可以说,nio中只需要一个线程就能完成所有接收,读,写等操作
要学习nio,首先要理解它的三大核心
selector,选择器
buffer,缓冲区
channel,通道
博主不才,画了张丑图给大家加深下印象 ^ . ^
再给一张tcp下的nio工作流程图(好难画的线条...)
大家大致看懂就行,我们一步步来
buffer
首先要知道什么是buffer
在nio中数据交互不再像io机制那样使用流
而是使用buffer(缓冲区)
博主觉得图才是最容易理解的
所以...
可以看出buffer在整个工作流程中的位置
来点实际点的,上面图中的具体代码如下:
1.首先给buffer分配空间,以字节为单位
1
|
bytebuffer bytebuffer = bytebuffer.allocate( 1024 ); |
创建一个bytebuffer对象并且指定内存大小
2.向buffer中写入数据:
1
2
|
1 ).数据从channel到buffer:channel.read(bytebuffer); 2 ).数据从client到buffer:bytebuffer.put(...); |
3.从buffer中读取数据:
1
2
|
1 ).数据从buffer到channel:channel.write(bytebuffer); 2 ).数据从buffer到server:bytebuffer.get(...); |
selector
选择器是nio的核心,它是channel的管理者
通过执行select()阻塞方法,监听是否有channel准备好
一旦有数据可读,此方法的返回值是selectionkey的数量
所以服务端通常会死循环执行select()方法,直到有channl准备就绪,然后开始工作
每个channel都会和selector绑定一个事件,然后生成一个selectionkey的对象
需要注意的是:
channel和selector绑定时,channel必须是非阻塞模式
而filechannel不能切换到非阻塞模式,因为它不是套接字通道,所以filechannel不能和selector绑定事件
在nio中一共有四种事件:
1.selectionkey.op_connect:连接事件
2.selectionkey.op_accept:接收事件
3.selectionkey.op_read:读事件
4.selectionkey.op_write:写事件
channel
共有四种通道:
filechannel:作用于io文件流
datagramchannel:作用于udp协议
socketchannel:作用于tcp协议
serversocketchannel:作用于tcp协议
本篇文章通过常用的tcp协议来讲解nio
我们以serversocketchannel为例:
打开一个serversocketchannel通道
1
|
serversocketchannel serversocketchannel = serversocketchannel.open(); |
关闭serversocketchannel通道:
1
|
serversocketchannel.close(); |
循环监听socketchannel:
1
2
3
4
|
while ( true ){ socketchannel socketchannel = serversocketchannel.accept(); clientchannel.configureblocking( false ); } |
clientchannel.configureblocking(false);
语句是将此通道设置为非阻塞,也就是异步
自由控制阻塞或非阻塞便是nio的特性之一
selectionkey
selectionkey是通道和选择器交互的核心组件
比如在socketchannel上绑定一个selector,并注册为连接事件:
1
2
3
4
|
socketchannel clientchannel = socketchannel.open(); clientchannel.configureblocking( false ); clientchannel.connect( new inetsocketaddress(port)); clientchannel.register(selector, selectionkey.op_connect); |
核心在register()方法,它返回一个selectionkey对象
来检测channel事件是那种事件可以使用以下方法:
1
2
3
4
|
selectionkey.isacceptable(); selectionkey.isconnectable(); selectionkey.isreadable(); selectionkey.iswritable(); |
服务端便是通过这些方法 在轮询中执行相对应操作
当然通过channel与selector绑定的key也可以反过来拿到他们
1
2
|
channel channel = selectionkey.channel(); selector selector = selectionkey.selector(); |
在channel上注册事件时,我们也可以顺带绑定一个buffer:
1
|
clientchannel.register(key.selector(), selectionkey.op_read,bytebuffer.allocatedirect( 1024 )); |
或者绑定一个object:
1
2
|
selectionkey.attach(object); object anthorobj = selectionkey.attachment(); |
nio的tcp服务端
讲了这么多,都是理论
我们来看下最简单也是最核心的代码(加那么多注释很不优雅,但方便大家看懂):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
|
package cn.blog.test.niotest; import java.io.ioexception; import java.net.inetsocketaddress; import java.nio.bytebuffer; import java.nio.channels.*; import java.nio.charset.charset; import java.util.iterator; import java.util.set; public class mynioserver { private selector selector; //创建一个选择器 private final static int port = 8686 ; private final static int buf_size = 10240 ; private void initserver() throws ioexception { //创建通道管理器对象selector this .selector=selector.open(); //创建一个通道对象channel serversocketchannel channel = serversocketchannel.open(); channel.configureblocking( false ); //将通道设置为非阻塞 channel.socket().bind( new inetsocketaddress(port)); //将通道绑定在8686端口 //将上述的通道管理器和通道绑定,并为该通道注册op_accept事件 //注册事件后,当该事件到达时,selector.select()会返回(一个key),如果该事件没到达selector.select()会一直阻塞 selectionkey selectionkey = channel.register(selector,selectionkey.op_accept); while ( true ){ //轮询 selector.select(); //这是一个阻塞方法,一直等待直到有数据可读,返回值是key的数量(可以有多个) set keys = selector.selectedkeys(); //如果channel有数据了,将生成的key访入keys集合中 iterator iterator = keys.iterator(); //得到这个keys集合的迭代器 while (iterator.hasnext()){ //使用迭代器遍历集合 selectionkey key = (selectionkey) iterator.next(); //得到集合中的一个key实例 iterator.remove(); //拿到当前key实例之后记得在迭代器中将这个元素删除,非常重要,否则会出错 if (key.isacceptable()){ //判断当前key所代表的channel是否在acceptable状态,如果是就进行接收 doaccept(key); } else if (key.isreadable()){ doread(key); } else if (key.iswritable() && key.isvalid()){ dowrite(key); } else if (key.isconnectable()){ system.out.println( "连接成功!" ); } } } } public void doaccept(selectionkey key) throws ioexception { serversocketchannel serverchannel = (serversocketchannel) key.channel(); system.out.println( "serversocketchannel正在循环监听" ); socketchannel clientchannel = serverchannel.accept(); clientchannel.configureblocking( false ); clientchannel.register(key.selector(),selectionkey.op_read); } public void doread(selectionkey key) throws ioexception { socketchannel clientchannel = (socketchannel) key.channel(); bytebuffer bytebuffer = bytebuffer.allocate(buf_size); long bytesread = clientchannel.read(bytebuffer); while (bytesread> 0 ){ bytebuffer.flip(); byte [] data = bytebuffer.array(); string info = new string(data).trim(); system.out.println( "从客户端发送过来的消息是:" +info); bytebuffer.clear(); bytesread = clientchannel.read(bytebuffer); } if (bytesread==- 1 ){ clientchannel.close(); } } public void dowrite(selectionkey key) throws ioexception { bytebuffer bytebuffer = bytebuffer.allocate(buf_size); bytebuffer.flip(); socketchannel clientchannel = (socketchannel) key.channel(); while (bytebuffer.hasremaining()){ clientchannel.write(bytebuffer); } bytebuffer.compact(); } public static void main(string[] args) throws ioexception { mynioserver mynioserver = new mynioserver(); mynioserver.initserver(); } } |
我打印了监听channel,告诉大家serversocketchannel是在什么时候开始运行的
如果配合nio客户端的debug,就能很清楚的发现,进入select()轮询前
虽然已经有了accept事件的key,但select()默认并不会去调用
而是要等待有其它感兴趣事件被select()捕获之后,才会去调用accept的selectionkey
这时候serversocketchannel才开始进行循环监听
也就是说一个selector中,始终保持着serversocketchannel的运行
而serverchannel.accept();
真正做到了异步(在initserver方法中的channel.configureblocking(false);)
如果没有接受到connect,会返回一个null
如果成功连接了一个socketchannel,则此socketchannel会注册写入(read)事件
并且设置为异步
nio的tcp客户端
有服务端必定有客户端
其实如果能完全理解了服务端
客户端的代码大同小异
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
package cn.blog.test.niotest; import java.io.ioexception; import java.net.inetsocketaddress; import java.nio.bytebuffer; import java.nio.channels.selectionkey; import java.nio.channels.selector; import java.nio.channels.socketchannel; import java.util.iterator; public class mynioclient { private selector selector; //创建一个选择器 private final static int port = 8686 ; private final static int buf_size = 10240 ; private static bytebuffer bytebuffer = bytebuffer.allocate(buf_size); private void initclient() throws ioexception { this .selector = selector.open(); socketchannel clientchannel = socketchannel.open(); clientchannel.configureblocking( false ); clientchannel.connect( new inetsocketaddress(port)); clientchannel.register(selector, selectionkey.op_connect); while ( true ){ selector.select(); iterator<selectionkey> iterator = selector.selectedkeys().iterator(); while (iterator.hasnext()){ selectionkey key = iterator.next(); iterator.remove(); if (key.isconnectable()){ doconnect(key); } else if (key.isreadable()){ doread(key); } } } } public void doconnect(selectionkey key) throws ioexception { socketchannel clientchannel = (socketchannel) key.channel(); if (clientchannel.isconnectionpending()){ clientchannel.finishconnect(); } clientchannel.configureblocking( false ); string info = "服务端你好!!" ; bytebuffer.clear(); bytebuffer.put(info.getbytes( "utf-8" )); bytebuffer.flip(); clientchannel.write(bytebuffer); //clientchannel.register(key.selector(),selectionkey.op_read); clientchannel.close(); } public void doread(selectionkey key) throws ioexception { socketchannel clientchannel = (socketchannel) key.channel(); clientchannel.read(bytebuffer); byte [] data = bytebuffer.array(); string msg = new string(data).trim(); system.out.println( "服务端发送消息:" +msg); clientchannel.close(); key.selector().close(); } public static void main(string[] args) throws ioexception { mynioclient mynioclient = new mynioclient(); mynioclient.initclient(); } } |
输出结果
这里我打开一个服务端,两个客户端:
接下来,你可以试下同时打开一千个客户端,只要你的cpu够给力,服务端就不可能因为阻塞而降低性能
以上便是java nio的基础详解,如果大家还有什么不明白的地方可以在下方的留言区域讨论。
原文链接:https://segmentfault.com/a/1190000012316621