ribbon是netflix发布的开源项目,主要功能是提供客户端的软件负载均衡算法,将netflix的中间层服务连接在一起。ribbon客户端组件提供一系列完善的配置项如连接超时,重试等。简单的说,就是在配置文件中列出load balancer(简称lb)后面所有的机器,ribbon会自动的帮助你基于某种规则(如简单轮询,随即连接等)去连接这些机器。我们也很容易使用ribbon实现自定义的负载均衡算法。
说起负载均衡一般都会想到服务端的负载均衡,常用产品包括lbs硬件或云服务、nginx等,都是耳熟能详的产品。
而spring cloud提供了让服务调用端具备负载均衡能力的ribbon,通过和eureka的紧密结合,不用在服务集群内再架设负载均衡服务,很大程度简化了服务集群内的架构。
具体也不想多写虚的介绍,反正哪里都能看得到相关的介绍。
直接开撸代码,通过代码来看ribbon是如何实现的。
配置
详解:
1.ribbonautoconfiguration配置生成ribbonloadbalancerclient实例。
代码位置:
spring-cloud-netflix-core-1.3.5.release.jar
org.springframework.cloud.netflix.ribbon
ribbonautoconfiguration.class
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
@configuration @conditionalonclass ({ iclient. class , resttemplate. class , asyncresttemplate. class , ribbon. class }) @ribbonclients @autoconfigureafter (name = "org.springframework.cloud.netflix.eureka.eurekaclientautoconfiguration" ) @autoconfigurebefore ({loadbalancerautoconfiguration. class , asyncloadbalancerautoconfiguration. class }) @enableconfigurationproperties (ribboneagerloadproperties. class ) public class ribbonautoconfiguration { // 略 @bean @conditionalonmissingbean (loadbalancerclient. class ) public loadbalancerclient loadbalancerclient() { return new ribbonloadbalancerclient(springclientfactory()); } // 略 } |
先看配置条件项,ribbonautoconfiguration配置必须在loadbalancerautoconfiguration配置前执行,因为在loadbalancerautoconfiguration配置中会使用ribbonloadbalancerclient实例。
ribbonloadbalancerclient继承自loadbalancerclient接口,是负载均衡客户端,也是负载均衡策略的调用方。
2.loadbalancerinterceptorconfig配置生成:
1).负载均衡拦截器loadbalancerinterceptor实例
包含:
loadbalancerclient实现类的ribbonloadbalancerclient实例
负载均衡的请求创建工厂loadbalancerrequestfactory:实例
2).resttemplate自定义的resttemplatecustomizer实例
代码位置:
spring-cloud-commons-1.2.4.release.jar
org.springframework.cloud.client.loadbalancer
loadbalancerautoconfiguration.class
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
@configuration @conditionalonclass (resttemplate. class ) @conditionalonbean (loadbalancerclient. class ) @enableconfigurationproperties (loadbalancerretryproperties. class ) public class loadbalancerautoconfiguration { // 略 @bean @conditionalonmissingbean public loadbalancerrequestfactory loadbalancerrequestfactory( loadbalancerclient loadbalancerclient) { return new loadbalancerrequestfactory(loadbalancerclient, transformers); } @configuration @conditionalonmissingclass ( "org.springframework.retry.support.retrytemplate" ) static class loadbalancerinterceptorconfig { @bean public loadbalancerinterceptor ribboninterceptor( loadbalancerclient loadbalancerclient, loadbalancerrequestfactory requestfactory) { return new loadbalancerinterceptor(loadbalancerclient, requestfactory); } @bean @conditionalonmissingbean public resttemplatecustomizer resttemplatecustomizer( final loadbalancerinterceptor loadbalancerinterceptor) { return new resttemplatecustomizer() { @override public void customize(resttemplate resttemplate) { list<clienthttprequestinterceptor> list = new arraylist<>( resttemplate.getinterceptors()); list.add(loadbalancerinterceptor); resttemplate.setinterceptors(list); } }; } } // 略 } |
先看配置条件项:
要求在项目环境中必须要有resttemplate类。
要求必须要有loadbalancerclient接口的实现类的实例,也就是上一步生成的ribbonloadbalancerclient。
3.通过上面一步创建的resttemplatecustomizer配置所有resttemplate实例,就是将负载均衡拦截器设置给resttemplate实例。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
@configuration @conditionalonclass (resttemplate. class ) @conditionalonbean (loadbalancerclient. class ) @enableconfigurationproperties (loadbalancerretryproperties. class ) public class loadbalancerautoconfiguration { // 略 @bean public smartinitializingsingleton loadbalancedresttemplateinitializer( final list<resttemplatecustomizer> customizers) { return new smartinitializingsingleton() { @override public void aftersingletonsinstantiated() { for (resttemplate resttemplate : loadbalancerautoconfiguration. this .resttemplates) { for (resttemplatecustomizer customizer : customizers) { customizer.customize(resttemplate); } } } }; } // 略 @configuration @conditionalonmissingclass ( "org.springframework.retry.support.retrytemplate" ) static class loadbalancerinterceptorconfig { @bean public loadbalancerinterceptor ribboninterceptor( loadbalancerclient loadbalancerclient, loadbalancerrequestfactory requestfactory) { return new loadbalancerinterceptor(loadbalancerclient, requestfactory); } @bean @conditionalonmissingbean public resttemplatecustomizer resttemplatecustomizer( final loadbalancerinterceptor loadbalancerinterceptor) { return new resttemplatecustomizer() { @override public void customize(resttemplate resttemplate) { list<clienthttprequestinterceptor> list = new arraylist<>( resttemplate.getinterceptors()); list.add(loadbalancerinterceptor); resttemplate.setinterceptors(list); } }; } } // 略 } |
resttemplate.setinterceptors(list)这个地方就是注入负载均衡拦截器的地方loadbalancerinterceptor。
从这个地方实际上也可以猜出来,resttemplate可以通过注入的拦截器来构建相应的请求实现负载均衡。
也能看出来可以自定义拦截器实现其他目的。
4.ribbonclientconfiguration配置生成zoneawareloadbalancer实例
代码位置:
spring-cloud-netflix-core-1.3.5.release.jar
org.springframework.cloud.netflix.ribbon
ribbonclientconfiguration.class
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
@suppresswarnings ( "deprecation" ) @configuration @enableconfigurationproperties //order is important here, last should be the default, first should be optional // see https://github.com/spring-cloud/spring-cloud-netflix/issues/2086#issuecomment-316281653 @import ({okhttpribbonconfiguration. class , restclientribbonconfiguration. class , httpclientribbonconfiguration. class }) public class ribbonclientconfiguration { // 略 @bean @conditionalonmissingbean public iloadbalancer ribbonloadbalancer(iclientconfig config, serverlist<server> serverlist, serverlistfilter<server> serverlistfilter, irule rule, iping ping, serverlistupdater serverlistupdater) { if ( this .propertiesfactory.isset(iloadbalancer. class , name)) { return this .propertiesfactory.get(iloadbalancer. class , config, name); } return new zoneawareloadbalancer<>(config, rule, ping, serverlist, serverlistfilter, serverlistupdater); } // 略 } |
zoneawareloadbalancer继承自iloadbalancer接口,该接口有一个方法:
1
2
3
4
5
6
7
8
|
/** * choose a server from load balancer. * * @param key an object that the load balancer may use to determine which server to return. null if * the load balancer does not use this parameter. * @return server chosen */ public server chooseserver(object key); |
zoneawareloadbalancer就是一个具体的负载均衡实现类,也是默认的负载均衡类,通过对chooseserver方法的实现选取某个服务实例。
拦截&请求
1.使用resttemplate进行get、post等各种请求,都是通过doexecute方法实现
代码位置:
spring-web-4.3.12.release.jar
org.springframework.web.client
resttemplate.class
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
|
public class resttemplate extends interceptinghttpaccessor implements restoperations { // 略 protected <t> t doexecute(uri url, httpmethod method, requestcallback requestcallback, responseextractor<t> responseextractor) throws restclientexception { assert .notnull(url, "'url' must not be null" ); assert .notnull(method, "'method' must not be null" ); clienthttpresponse response = null ; try { clienthttprequest request = createrequest(url, method); if (requestcallback != null ) { requestcallback.dowithrequest(request); } response = request.execute(); handleresponse(url, method, response); if (responseextractor != null ) { return responseextractor.extractdata(response); } else { return null ; } } catch (ioexception ex) { string resource = url.tostring(); string query = url.getrawquery(); resource = (query != null ? resource.substring( 0 , resource.indexof( '?' )) : resource); throw new resourceaccessexception( "i/o error on " + method.name() + " request for \"" + resource + "\": " + ex.getmessage(), ex); } finally { if (response != null ) { response.close(); } } } // 略 } |
支持的各种http请求方法最终都是调用doexecute方法,该方法内调用创建方法创建请求实例,并执行请求得到响应对象。
2.生成请求实例创建工厂
上一步代码中,调用createrequest方法创建请求实例,这个方法是定义在父类中。
先整理出主要的继承关系:
createrequest方法实际是定义在httpaccessor抽象类中。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
public abstract class httpaccessor { private clienthttprequestfactory requestfactory = new simpleclienthttprequestfactory(); public void setrequestfactory(clienthttprequestfactory requestfactory) { assert .notnull(requestfactory, "clienthttprequestfactory must not be null" ); this .requestfactory = requestfactory; } public clienthttprequestfactory getrequestfactory() { return this .requestfactory; } protected clienthttprequest createrequest(uri url, httpmethod method) throws ioexception { clienthttprequest request = getrequestfactory().createrequest(url, method); if (logger.isdebugenabled()) { logger.debug( "created " + method.name() + " request for \"" + url + "\"" ); } return request; } } |
在createrequest方法中调用getrequestfactory方法获得请求实例创建工厂,实际上getrequestfactory并不是当前httpaccessor类中定义的,而是在子类interceptinghttpaccessor中定义的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
public abstract class interceptinghttpaccessor extends httpaccessor { private list<clienthttprequestinterceptor> interceptors = new arraylist<clienthttprequestinterceptor>(); public void setinterceptors(list<clienthttprequestinterceptor> interceptors) { this .interceptors = interceptors; } public list<clienthttprequestinterceptor> getinterceptors() { return interceptors; } @override public clienthttprequestfactory getrequestfactory() { clienthttprequestfactory delegate = super .getrequestfactory(); if (!collectionutils.isempty(getinterceptors())) { return new interceptingclienthttprequestfactory(delegate, getinterceptors()); } else { return delegate; } } } |
在这里做了个小动作,首先还是通过httpaccessor类创建并获得simpleclienthttprequestfactory工厂,这个工厂主要就是在没有拦截器的时候创建基本请求实例。
其次,在有拦截器注入的情况下,创建interceptingclienthttprequestfactory工厂,该工厂就是创建带拦截器的请求实例,因为注入了负载均衡拦截器,所以这里就从interceptingclienthttprequestfactory工厂创建。
3.通过工厂创建请求实例
创建实例就看工厂的createrequest方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
public class interceptingclienthttprequestfactory extends abstractclienthttprequestfactorywrapper { private final list<clienthttprequestinterceptor> interceptors; public interceptingclienthttprequestfactory(clienthttprequestfactory requestfactory, list<clienthttprequestinterceptor> interceptors) { super (requestfactory); this .interceptors = (interceptors != null ? interceptors : collections.<clienthttprequestinterceptor>emptylist()); } @override protected clienthttprequest createrequest(uri uri, httpmethod httpmethod, clienthttprequestfactory requestfactory) { return new interceptingclienthttprequest(requestfactory, this .interceptors, uri, httpmethod); } } |
就是new了个interceptingclienthttprequest实例,并且把拦截器、基本请求实例创建工厂注进去。
4.请求实例调用配置阶段注入的负载均衡拦截器的拦截方法intercept
可从第1步看出,创建完请求实例后,通过执行请求实例的execute方法执行请求。
1
2
3
4
5
|
clienthttprequest request = createrequest(url, method); if (requestcallback != null ) { requestcallback.dowithrequest(request); } response = request.execute(); |
实际请求实例是interceptingclienthttprequest,execute实际是在它的父类中。
类定义位置:
spring-web-4.3.12.release.jar
org.springframework.http.client
interceptingclienthttprequest.class
看一下它们的继承关系。
在execute方法中实际调用了子类实现的executeinternal方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
|
public abstract class abstractclienthttprequest implements clienthttprequest { private final httpheaders headers = new httpheaders(); private boolean executed = false ; @override public final httpheaders getheaders() { return ( this .executed ? httpheaders.readonlyhttpheaders( this .headers) : this .headers); } @override public final outputstream getbody() throws ioexception { assertnotexecuted(); return getbodyinternal( this .headers); } @override public final clienthttpresponse execute() throws ioexception { assertnotexecuted(); clienthttpresponse result = executeinternal( this .headers); this .executed = true ; return result; } protected void assertnotexecuted() { assert .state(! this .executed, "clienthttprequest already executed" ); } protected abstract outputstream getbodyinternal(httpheaders headers) throws ioexception; protected abstract clienthttpresponse executeinternal(httpheaders headers) throws ioexception; } |
其实就是interceptingclienthttprequest类的executeinternal方法,其中,又调用了一个执行器interceptingrequestexecution的execute,通关判断如果有拦截器注入进来过,就调用拦截器的intercept方法。
这里的拦截器实际上就是在配置阶段注入进resttemplate实例的负载均衡拦截器loadbalancerinterceptor实例,可参考上面配置阶段的第2步。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
class interceptingclienthttprequest extends abstractbufferingclienthttprequest { // 略 @override protected final clienthttpresponse executeinternal(httpheaders headers, byte [] bufferedoutput) throws ioexception { interceptingrequestexecution requestexecution = new interceptingrequestexecution(); return requestexecution.execute( this , bufferedoutput); } private class interceptingrequestexecution implements clienthttprequestexecution { private final iterator<clienthttprequestinterceptor> iterator; public interceptingrequestexecution() { this .iterator = interceptors.iterator(); } @override public clienthttpresponse execute(httprequest request, byte [] body) throws ioexception { if ( this .iterator.hasnext()) { clienthttprequestinterceptor nextinterceptor = this .iterator.next(); return nextinterceptor.intercept(request, body, this ); } else { clienthttprequest delegate = requestfactory.createrequest(request.geturi(), request.getmethod()); for (map.entry<string, list<string>> entry : request.getheaders().entryset()) { list<string> values = entry.getvalue(); for (string value : values) { delegate.getheaders().add(entry.getkey(), value); } } if (body.length > 0 ) { streamutils.copy(body, delegate.getbody()); } return delegate.execute(); } } } } |
5.负载均衡拦截器调用负载均衡客户端
在负载均衡拦截器loadbalancerinterceptor类的intercept方法中,又调用了负载均衡客户端loadbalancerclient实现类的execute方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
public class loadbalancerinterceptor implements clienthttprequestinterceptor { private loadbalancerclient loadbalancer; private loadbalancerrequestfactory requestfactory; public loadbalancerinterceptor(loadbalancerclient loadbalancer, loadbalancerrequestfactory requestfactory) { this .loadbalancer = loadbalancer; this .requestfactory = requestfactory; } public loadbalancerinterceptor(loadbalancerclient loadbalancer) { // for backwards compatibility this (loadbalancer, new loadbalancerrequestfactory(loadbalancer)); } @override public clienthttpresponse intercept( final httprequest request, final byte [] body, final clienthttprequestexecution execution) throws ioexception { final uri originaluri = request.geturi(); string servicename = originaluri.gethost(); assert .state(servicename != null , "request uri does not contain a valid hostname: " + originaluri); return this .loadbalancer.execute(servicename, requestfactory.createrequest(request, body, execution)); } } |
在配置阶段的第1步,可以看到实现类是ribbonloadbalancerclient。
6.负载均衡客户端调用负载均衡策略选取目标服务实例并发起请求
在ribbonloadbalancerclient的第一个execute方法以及getserver方法中可以看到,实际上是通过iloadbalancer的负载均衡器实现类作的chooseserver方法选取一个服务,交给接下来的请求对象发起一个请求。
这里的负载均衡实现类默认是zoneawareloadbalancer区域感知负载均衡器实例,其内部通过均衡策略选择一个服务。
zoneawareloadbalancer的创建可以参考配置阶段的第4步。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
public class ribbonloadbalancerclient implements loadbalancerclient { @override public <t> t execute(string serviceid, loadbalancerrequest<t> request) throws ioexception { iloadbalancer loadbalancer = getloadbalancer(serviceid); server server = getserver(loadbalancer); if (server == null ) { throw new illegalstateexception( "no instances available for " + serviceid); } ribbonserver ribbonserver = new ribbonserver(serviceid, server, issecure(server, serviceid), serverintrospector(serviceid).getmetadata(server)); return execute(serviceid, ribbonserver, request); } @override public <t> t execute(string serviceid, serviceinstance serviceinstance, loadbalancerrequest<t> request) throws ioexception { server server = null ; if (serviceinstance instanceof ribbonserver) { server = ((ribbonserver)serviceinstance).getserver(); } if (server == null ) { throw new illegalstateexception( "no instances available for " + serviceid); } ribbonloadbalancercontext context = this .clientfactory .getloadbalancercontext(serviceid); ribbonstatsrecorder statsrecorder = new ribbonstatsrecorder(context, server); try { t returnval = request.apply(serviceinstance); statsrecorder.recordstats(returnval); return returnval; } // catch ioexception and rethrow so resttemplate behaves correctly catch (ioexception ex) { statsrecorder.recordstats(ex); throw ex; } catch (exception ex) { statsrecorder.recordstats(ex); reflectionutils.rethrowruntimeexception(ex); } return null ; } // 略 protected server getserver(iloadbalancer loadbalancer) { if (loadbalancer == null ) { return null ; } return loadbalancer.chooseserver( "default" ); // todo: better handling of key } protected iloadbalancer getloadbalancer(string serviceid) { return this .clientfactory.getloadbalancer(serviceid); } public static class ribbonserver implements serviceinstance { private final string serviceid; private final server server; private final boolean secure; private map<string, string> metadata; public ribbonserver(string serviceid, server server) { this (serviceid, server, false , collections.<string, string> emptymap()); } public ribbonserver(string serviceid, server server, boolean secure, map<string, string> metadata) { this .serviceid = serviceid; this .server = server; this .secure = secure; this .metadata = metadata; } // 略 } } |
代码撸完,总结下。
普通使用resttemplate请求其他服务时,内部使用的就是常规的http请求实例发送请求。
为resttemplate增加了@loanbalanced 注解后,实际上通过配置,为resttemplate注入负载均衡拦截器,让负载均衡器选择根据其对应的策略选择合适的服务后,再发送请求。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:http://www.cnblogs.com/kongxianghai/p/8445030.html