在微服务架构中,我们将一个项目拆分成很多个独立的模块,这些独立的模块通过远程调用来互相配合工作,但是,在高并发情况下,通信次数的增加会导致总的通信时间增加,同时,线程池的资源也是有限的,高并发环境会导致有大量的线程处于等待状态,进而导致响应延迟,为了解决这些问题,我们需要来了解hystrix的请求合并。
hystrix中的请求合并,就是利用一个合并处理器,将对同一个服务发起的连续请求合并成一个请求进行处理(这些连续请求的时间窗默认为10ms),在这个过程中涉及到的一个核心类就是hystrixcollapser,ok,接下来我们就来看看如何实现hystrix的请求合并。
服务提供者接口
我需在在服务提供者中提供两个接口供服务消费者调用,如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
@requestmapping ( "/getbook6" ) public list<book> book6(string ids) { system.out.println( "ids>>>>>>>>>>>>>>>>>>>>>" + ids); arraylist<book> books = new arraylist<>(); books.add( new book( "《李自成》" , 55 , "姚雪垠" , "人民文学出版社" )); books.add( new book( "中国文学简史" , 33 , "林庚" , "清华大学出版社" )); books.add( new book( "文学改良刍议" , 33 , "胡适" , "无" )); books.add( new book( "ids" , 22 , "helloworld" , "haha" )); return books; } @requestmapping ( "/getbook6/{id}" ) public book book61( @pathvariable integer id) { book book = new book( "《李自成》2" , 55 , "姚雪垠2" , "人民文学出版社2" ); return book; } |
第一个接口是一个批处理接口,第二个接口是一个处理单个请求的接口。在批处理接口中,服务消费者传来的ids参数格式是1,2,3,4…这种格式,正常情况下我们需要根据ids查询到对应的数据,然后组装成一个集合返回,我这里为了处理方便,不管什么样的请求统统都返回一样的数据集;处理单个请求的接口就比较简单了,不再赘述。
服务消费者
ok,服务提供者处理好之后,接下来我们来看看服务消费者要怎么处理。
bookservice
首先在bookservice中添加两个方法用来调用服务提供者提供的接口,如下:
1
2
3
4
5
6
7
8
9
|
public book test8( long id) { return resttemplate.getforobject( "http://hello-service/getbook6/{1}" , book. class , id); } public list<book> test9(list< long > ids) { system.out.println( "test9---------" +ids+ "thread.currentthread().getname():" + thread.currentthread().getname()); book[] books = resttemplate.getforobject( "http://hello-service/getbook6?ids={1}" , book[]. class , stringutils.join(ids, "," )); return arrays.aslist(books); } |
test8用来调用提供单个id的接口,test9用来调用批处理的接口,在test9中,我将test9执行时所处的线程打印出来,方便我们观察执行结果,另外,在resttemplate中,如果返回值是一个集合,我们得先用一个数组接收,然后再转为集合(或许也有其他办法,小伙伴们有更好的建议可以提)。
bookbatchcommand
ok,bookservice中的方法准备好了后,我们就可以来创建一个bookbatchcommand,这是一个批处理命令,如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
public class bookbatchcommand extends hystrixcommand<list<book>> { private list< long > ids; private bookservice bookservice; public bookbatchcommand(list< long > ids, bookservice bookservice) { super (setter.withgroupkey(hystrixcommandgroupkey.factory.askey( "collapsinggroup" )) .andcommandkey(hystrixcommandkey.factory.askey( "collapsingkey" ))); this .ids = ids; this .bookservice = bookservice; } @override protected list<book> run() throws exception { return bookservice.test9(ids); } } |
这个类实际上和我们在上篇博客中介绍的类差不多,都是继承自hystrixcommand,用来处理合并之后的请求,在run方法中调用bookservice中的test9方法。
bookcollapsecommand
接下来我们需要创建bookcollapsecommand继承自hystrixcollapser来实现请求合并。如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
public class bookcollapsecommand extends hystrixcollapser<list<book>, book, long > { private bookservice bookservice; private long id; public bookcollapsecommand(bookservice bookservice, long id) { super (setter.withcollapserkey(hystrixcollapserkey.factory.askey( "bookcollapsecommand" )).andcollapserpropertiesdefaults(hystrixcollapserproperties.setter().withtimerdelayinmilliseconds( 100 ))); this .bookservice = bookservice; this .id = id; } @override public long getrequestargument() { return id; } @override protected hystrixcommand<list<book>> createcommand(collection<collapsedrequest<book, long >> collapsedrequests) { list< long > ids = new arraylist<>(collapsedrequests.size()); ids.addall(collapsedrequests.stream().map(collapsedrequest::getargument).collect(collectors.tolist())); bookbatchcommand bookbatchcommand = new bookbatchcommand(ids, bookservice); return bookbatchcommand; } @override protected void mapresponsetorequests(list<book> batchresponse, collection<collapsedrequest<book, long >> collapsedrequests) { system.out.println( "mapresponsetorequests" ); int count = 0 ; for (collapsedrequest<book, long > collapsedrequest : collapsedrequests) { book book = batchresponse.get(count++); collapsedrequest.setresponse(book); } } } |
关于这个类,我说如下几点:
1.首先在构造方法中,我们设置了请求时间窗为100ms,即请求时间间隔在100ms之内的请求会被合并为一个请求。
2.createcommand方法主要用来合并请求,在这里获取到各个单个请求的id,将这些单个的id放到一个集合中,然后再创建出一个bookbatchcommand对象,用该对象去发起一个批量请求。
3.mapresponsetorequests方法主要用来为每个请求设置请求结果。该方法的第一个参数batchresponse表示批处理请求的结果,第二个参数collapsedrequests则代表了每一个被合并的请求,然后我们通过遍历batchresponse来为collapsedrequests设置请求结果。
ok,所有的这些操作完成后,我们就可以来测试啦。
测试
我们在服务消费者端创建访问接口,来测试合并请求,测试接口如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
@requestmapping ( "/test7" ) @responsebody public void test7() throws executionexception, interruptedexception { hystrixrequestcontext context = hystrixrequestcontext.initializecontext(); bookcollapsecommand bc1 = new bookcollapsecommand(bookservice, 1l); bookcollapsecommand bc2 = new bookcollapsecommand(bookservice, 2l); bookcollapsecommand bc3 = new bookcollapsecommand(bookservice, 3l); bookcollapsecommand bc4 = new bookcollapsecommand(bookservice, 4l); future<book> q1 = bc1.queue(); future<book> q2 = bc2.queue(); future<book> q3 = bc3.queue(); book book1 = q1.get(); book book2 = q2.get(); book book3 = q3.get(); thread.sleep( 3000 ); future<book> q4 = bc4.queue(); book book4 = q4.get(); system.out.println( "book1>>>" +book1); system.out.println( "book2>>>" +book2); system.out.println( "book3>>>" +book3); system.out.println( "book4>>>" +book4); context.close(); } |
关于这个测试接口我说如下两点:
1.首先要初始化hystrixrequestcontext
2.创建bookcollapsecommand类的实例来发起请求,先发送3个请求,然后睡眠3秒钟,再发起1个请求,这样,前3个请求就会被合并为一个请求,第四个请求因为间隔的时间比较久,所以不会被合并,而是单独创建一个线程去处理。
ok,我们来看看执行结果,如下:
通过注解实现请求合并
ok,上面这种请求合并方式写起来稍微有一点麻烦,我们可以使用注解来更优雅的实现这一功能。首先在bookservice中添加两个方法,如下:
1
2
3
4
5
6
7
8
9
10
11
|
@hystrixcollapser (batchmethod = "test11" ,collapserproperties = { @hystrixproperty (name = "timerdelayinmilliseconds" ,value = "100" )}) public future<book> test10( long id) { return null ; } @hystrixcommand public list<book> test11(list< long > ids) { system.out.println( "test9---------" +ids+ "thread.currentthread().getname():" + thread.currentthread().getname()); book[] books = resttemplate.getforobject( "http://hello-service/getbook6?ids={1}" , book[]. class , stringutils.join(ids, "," )); return arrays.aslist(books); } |
在test10方法上添加@hystrixcollapser注解实现请求合并,用batchmethod属性指明请求合并后的处理方法,collapserproperties属性指定其他属性。
ok,在bookservice中写好之后,直接调用就可以了,如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
@requestmapping ( "/test8" ) @responsebody public void test8() throws executionexception, interruptedexception { hystrixrequestcontext context = hystrixrequestcontext.initializecontext(); future<book> f1 = bookservice.test10(1l); future<book> f2 = bookservice.test10(2l); future<book> f3 = bookservice.test10(3l); book b1 = f1.get(); book b2 = f2.get(); book b3 = f3.get(); thread.sleep( 3000 ); future<book> f4 = bookservice.test10(4l); book b4 = f4.get(); system.out.println( "b1>>>" +b1); system.out.println( "b2>>>" +b2); system.out.println( "b3>>>" +b3); system.out.println( "b4>>>" +b4); context.close(); } |
和前面的一样,前三个请求会进行合并,第四个请求会单独执行,ok,执行结果如下:
总结
请求合并的优点小伙伴们已经看到了,多个请求被合并为一个请求进行一次性处理,可以有效节省网络带宽和线程池资源,但是,有优点必然也有缺点,设置请求合并之后,本来一个请求可能5ms就搞定了,但是现在必须再等10ms看看还有没有其他的请求一起的,这样一个请求的耗时就从5ms增加到15ms了,不过,如果我们要发起的命令本身就是一个高延迟的命令,那么这个时候就可以使用请求合并了,因为这个时候时间窗的时间消耗就显得微不足道了,另外高并发也是请求合并的一个非常重要的场景。
ok,我们的请求合并就说到这里,有问题欢迎小伙伴们留言讨论。希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://blog.csdn.net/u012702547/article/details/78213270