序
本文主要研究一下java11的httpclient的基本使用。
变化
- 从java9的jdk.incubator.httpclient模块迁移到java.net.http模块,包名由jdk.incubator.http改为java.net.http
- 原来的诸如httpresponse.bodyhandler.asstring()方法变更为httpresponse.bodyhandlers.ofstring(),变化一为bodyhandler改为bodyhandlers,变化二为asxxx()之类的方法改为ofxxx(),由as改为of
实例
设置超时时间
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
@test public void testtimeout() throws ioexception, interruptedexception { //1.set connect timeout httpclient client = httpclient.newbuilder() .connecttimeout(duration.ofmillis( 5000 )) .followredirects(httpclient.redirect.normal) .build(); //2.set read timeout httprequest request = httprequest.newbuilder() .uri(uri.create( "http://openjdk.java.net/" )) .timeout(duration.ofmillis( 5009 )) .build(); httpresponse<string> response = client.send(request, httpresponse.bodyhandlers.ofstring()); system.out.println(response.body()); } |
httpconnecttimeoutexception实例
1
2
3
4
5
6
7
|
caused by: java.net.http.httpconnecttimeoutexception: http connect timed out at java.net.http/jdk.internal.net.http.responsetimerevent.handle(responsetimerevent.java: 68 ) at java.net.http/jdk.internal.net.http.httpclientimpl.purgetimeoutsandreturnnextdeadline(httpclientimpl.java: 1248 ) at java.net.http/jdk.internal.net.http.httpclientimpl$selectormanager.run(httpclientimpl.java: 877 ) caused by: java.net.connectexception: http connect timed out at java.net.http/jdk.internal.net.http.responsetimerevent.handle(responsetimerevent.java: 69 ) ... 2 more |
httptimeoutexception实例
1
2
3
4
5
|
java.net.http.httptimeoutexception: request timed out at java.net.http/jdk.internal.net.http.httpclientimpl.send(httpclientimpl.java: 559 ) at java.net.http/jdk.internal.net.http.httpclientfacade.send(httpclientfacade.java: 119 ) at com.example.httpclienttest.testtimeout(httpclienttest.java: 40 ) |
设置authenticator
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
@test public void testbasicauth() throws ioexception, interruptedexception { httpclient client = httpclient.newbuilder() .connecttimeout(duration.ofmillis( 5000 )) .authenticator( new authenticator() { @override protected passwordauthentication getpasswordauthentication() { return new passwordauthentication( "admin" , "password" .tochararray()); } }) .build(); httprequest request = httprequest.newbuilder() .uri(uri.create( "http://localhost:8080/json/info" )) .timeout(duration.ofmillis( 5009 )) .build(); httpresponse<string> response = client.send(request, httpresponse.bodyhandlers.ofstring()); system.out.println(response.statuscode()); system.out.println(response.body()); } |
- authenticator可以用来设置http authentication,比如basic authentication
- 虽然basic authentication也可以自己设置header,不过通过authenticator省得自己去构造header
设置header
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
@test public void testcookies() throws ioexception, interruptedexception { httpclient client = httpclient.newbuilder() .connecttimeout(duration.ofmillis( 5000 )) .build(); httprequest request = httprequest.newbuilder() .uri(uri.create( "http://localhost:8080/json/cookie" )) .header( "cookie" , "jsessionid=4f994730-32d7-4e22-a18b-25667ddeb636; userid=java11" ) .timeout(duration.ofmillis( 5009 )) .build(); httpresponse<string> response = client.send(request, httpresponse.bodyhandlers.ofstring()); system.out.println(response.statuscode()); system.out.println(response.body()); } |
通过request可以自己设置header
get
同步
1
2
3
4
5
6
7
8
9
10
11
12
|
@test public void testsyncget() throws ioexception, interruptedexception { httpclient client = httpclient.newhttpclient(); httprequest request = httprequest.newbuilder() .uri(uri.create( "https://www.baidu.com" )) .build(); httpresponse<string> response = client.send(request, httpresponse.bodyhandlers.ofstring()); system.out.println(response.body()); } |
异步
1
2
3
4
5
6
7
8
9
10
11
|
@test public void testasyncget() throws executionexception, interruptedexception { httpclient client = httpclient.newhttpclient(); httprequest request = httprequest.newbuilder() .uri(uri.create( "https://www.baidu.com" )) .build(); completablefuture<string> result = client.sendasync(request, httpresponse.bodyhandlers.ofstring()) .thenapply(httpresponse::body); system.out.println(result.get()); } |
post表单
1
2
3
4
5
6
7
8
9
10
11
12
|
@test public void testpostform() throws ioexception, interruptedexception { httpclient client = httpclient.newbuilder().build(); httprequest request = httprequest.newbuilder() .uri(uri.create( "http://www.w3school.com.cn/demo/demo_form.asp" )) .header( "content-type" , "application/x-www-form-urlencoded" ) .post(httprequest.bodypublishers.ofstring( "name1=value1&name2=value2" )) .build(); httpresponse<string> response = client.send(request, httpresponse.bodyhandlers.ofstring()); system.out.println(response.statuscode()); } |
header指定内容是表单类型,然后通过bodypublishers.ofstring传递表单数据,需要自己构建表单参数
post json
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
@test public void testpostjsongetjson() throws executionexception, interruptedexception, jsonprocessingexception { objectmapper objectmapper = new objectmapper(); stockdto dto = new stockdto(); dto.setname( "hj" ); dto.setsymbol( "hj" ); dto.settype(stockdto.stocktype.sh); string requestbody = objectmapper .writerwithdefaultprettyprinter() .writevalueasstring(dto); httprequest request = httprequest.newbuilder(uri.create( "http://localhost:8080/json/demo" )) .header( "content-type" , "application/json" ) .post(httprequest.bodypublishers.ofstring(requestbody)) .build(); completablefuture<stockdto> result = httpclient.newhttpclient() .sendasync(request, httpresponse.bodyhandlers.ofstring()) .thenapply(httpresponse::body) .thenapply(body -> { try { return objectmapper.readvalue(body,stockdto. class ); } catch (ioexception e) { return new stockdto(); } }); system.out.println(result.get()); } |
post json的话,body自己json化为string,然后header指定是json格式
文件上传
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
@test public void testuploadfile() throws ioexception, interruptedexception, urisyntaxexception { httpclient client = httpclient.newhttpclient(); path path = path.of(getclass().getclassloader().getresource( "body.txt" ).touri()); file file = path.tofile(); string multipartformdataboundary = "java11httpclientformboundary" ; org.apache.http.httpentity multipartentity = multipartentitybuilder.create() .addpart( "file" , new filebody(file, contenttype.default_binary)) .setboundary(multipartformdataboundary) //要设置,否则阻塞 .build(); httprequest request = httprequest.newbuilder() .uri(uri.create( "http://localhost:8080/file/upload" )) .header( "content-type" , "multipart/form-data; boundary=" + multipartformdataboundary) .post(httprequest.bodypublishers.ofinputstream(() -> { try { return multipartentity.getcontent(); } catch (ioexception e) { e.printstacktrace(); throw new runtimeexception(e); } })) .build(); httpresponse<string> response = client.send(request, httpresponse.bodyhandlers.ofstring()); system.out.println(response.body()); } |
- 官方的httpclient并没有提供类似webclient那种现成的bodyinserters.frommultipartdata方法,因此这里需要自己转换
- 这里使用org.apache.httpcomponents(httpclient及httpmime)的multipartentitybuilder构建multipartentity,最后通过httprequest.bodypublishers.ofinputstream来传递内容
- 这里header要指定content-type值为multipart/form-data以及boundary的值,否则服务端可能无法解析
文件下载
1
2
3
4
5
6
7
8
9
10
11
|
@test public void testasyncdownload() throws executionexception, interruptedexception { httpclient client = httpclient.newhttpclient(); httprequest request = httprequest.newbuilder() .uri(uri.create( "http://localhost:8080/file/download" )) .build(); completablefuture<path> result = client.sendasync(request, httpresponse.bodyhandlers.offile(paths.get( "/tmp/body.txt" ))) .thenapply(httpresponse::body); system.out.println(result.get()); } |
使用httpresponse.bodyhandlers.offile来接收文件
并发请求
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
@test public void testconcurrentrequests(){ httpclient client = httpclient.newhttpclient(); list<string> urls = list.of( "http://www.baidu.com" , "http://www.alibaba.com/" , "http://www.tencent.com" ); list<httprequest> requests = urls.stream() .map(url -> httprequest.newbuilder(uri.create(url))) .map(reqbuilder -> reqbuilder.build()) .collect(collectors.tolist()); list<completablefuture<httpresponse<string>>> futures = requests.stream() .map(request -> client.sendasync(request, httpresponse.bodyhandlers.ofstring())) .collect(collectors.tolist()); futures.stream() .foreach(e -> e.whencomplete((resp,err) -> { if (err != null ){ err.printstacktrace(); } else { system.out.println(resp.body()); system.out.println(resp.statuscode()); } })); completablefuture.allof(futures .toarray(completablefuture<?>[]:: new )) .join(); } |
- sendasync方法返回的是completablefuture,可以方便地进行转换、组合等操作
- 这里使用completablefuture.allof组合在一起,最后调用join等待所有future完成
错误处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
@test public void testhandleexception() throws executionexception, interruptedexception { httpclient client = httpclient.newbuilder() .connecttimeout(duration.ofmillis( 5000 )) .build(); httprequest request = httprequest.newbuilder() .uri(uri.create( "https://twitter.com" )) .build(); completablefuture<string> result = client.sendasync(request, httpresponse.bodyhandlers.ofstring()) // .whencomplete((resp,err) -> { // if(err != null){ // err.printstacktrace(); // }else{ // system.out.println(resp.body()); // system.out.println(resp.statuscode()); // } // }) .thenapply(httpresponse::body) .exceptionally(err -> { err.printstacktrace(); return "fallback" ; }); system.out.println(result.get()); } |
- httpclient异步请求返回的是completablefuture<httpresponse<t>>,其自带exceptionally方法可以用来做fallback处理
- 另外值得注意的是httpclient不像webclient那样,它没有对4xx或5xx的状态码抛出异常,需要自己根据情况来处理,手动检测状态码抛出异常或者返回其他内容
http2
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
@test public void testhttp2() throws urisyntaxexception { httpclient.newbuilder() .followredirects(httpclient.redirect.never) .version(httpclient.version.http_2) .build() .sendasync(httprequest.newbuilder() .uri( new uri( "https://http2.akamai.com/demo" )) .get() .build(), httpresponse.bodyhandlers.ofstring()) .whencomplete((resp,t) -> { if (t != null ){ t.printstacktrace(); } else { system.out.println(resp.version()); system.out.println(resp.statuscode()); } }).join(); } |
执行之后可以看到返回的response的version为http_2
websocket
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
@test public void testwebsocket() throws interruptedexception { httpclient client = httpclient.newhttpclient(); websocket websocket = client.newwebsocketbuilder() .buildasync(uri.create( "ws://localhost:8080/echo" ), new websocket.listener() { @override public completionstage<?> ontext(websocket websocket, charsequence data, boolean last) { // request one more websocket.request( 1 ); // print the message when it's available return completablefuture.completedfuture(data) .thenaccept(system.out::println); } }).join(); websocket.sendtext( "hello " , false ); websocket.sendtext( "world " , true ); timeunit.seconds.sleep( 10 ); websocket.sendclose(websocket.normal_closure, "ok" ).join(); } |
- httpclient支持http2,也包含了websocket,通过newwebsocketbuilder去构造websocket
- 传入listener进行接收消息,要发消息的话,使用websocket来发送,关闭使用sendclose方法
reactive streams
httpclient本身就是reactive的,支持reactive streams,这里举responsesubscribers.bytearraysubscriber的源码看看:
java.net.http/jdk/internal/net/http/responsesubscribers.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
public static class bytearraysubscriber<t> implements bodysubscriber<t> { private final function< byte [], t> finisher; private final completablefuture<t> result = new minimalfuture<>(); private final list<bytebuffer> received = new arraylist<>(); private volatile flow.subscription subscription; public bytearraysubscriber(function< byte [],t> finisher) { this .finisher = finisher; } @override public void onsubscribe(flow.subscription subscription) { if ( this .subscription != null ) { subscription.cancel(); return ; } this .subscription = subscription; // we can handle whatever you've got subscription.request( long .max_value); } @override public void onnext(list<bytebuffer> items) { // incoming buffers are allocated by http client internally, // and won't be used anywhere except this place. // so it's free simply to store them for further processing. assert utils.hasremaining(items); received.addall(items); } @override public void onerror(throwable throwable) { received.clear(); result.completeexceptionally(throwable); } static private byte [] join(list<bytebuffer> bytes) { int size = utils.remaining(bytes, integer.max_value); byte [] res = new byte [size]; int from = 0 ; for (bytebuffer b : bytes) { int l = b.remaining(); b.get(res, from, l); from += l; } return res; } @override public void oncomplete() { try { result.complete(finisher.apply(join(received))); received.clear(); } catch (illegalargumentexception e) { result.completeexceptionally(e); } } @override public completionstage<t> getbody() { return result; } } |
- bodysubscriber接口继承了flow.subscriber<list<bytebuffer>>接口
- 这里的subscription来自flow类,该类是java9引入的,里头包含了支持reactive streams的实现
小结
httpclient在java11从incubator变为正式版,相对于传统的httpurlconnection其提升可不是一点半点,不仅支持异步,也支持reactive streams,同时也支持了http2以及websocket,非常值得大家使用。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://segmentfault.com/a/1190000016555671