前面我们的例子是一个固定的出参和入参,固定的方法实现。
本节将实现通用的调用,让框架具有更广泛的实用性。
基本思路
所有的方法调用,基于反射进行相关处理实现。
服务端
核心类
- RpcServer
调整如下:
- serverBootstrap.group(workerGroup,bossGroup)
- .channel(NioServerSocketChannel.class)
- //打印日志
- .handler(newLoggingHandler(LogLevel.INFO))
-
.childHandler(newChannelInitializer
(){ - @Override
- protectedvoidinitChannel(Channelch)throwsException{
- ch.pipeline()
- //解码bytes=>resp
- .addLast(newObjectDecoder(Integer.MAX_VALUE,ClassResolvers.cacheDisabled(null)))
- //request=>bytes
- .addLast(newObjectEncoder())
- .addLast(newRpcServerHandler());
- }
- })
- //这个参数影响的是还没有被accept取出的连接
- .option(ChannelOption.SO_BACKLOG,128)
- //这个参数只是过一段时间内客户端没有响应,服务端会发送一个ack包,以判断客户端是否还活着。
- .childOption(ChannelOption.SO_KEEPALIVE,true);
其中 ObjectDecoder 和 ObjectEncoder 都是 netty 内置的实现。
RpcServerHandler
- packagecom.github.houbb.rpc.server.handler;
- importcom.github.houbb.log.integration.core.Log;
- importcom.github.houbb.log.integration.core.LogFactory;
- importcom.github.houbb.rpc.common.rpc.domain.RpcRequest;
- importcom.github.houbb.rpc.common.rpc.domain.impl.DefaultRpcResponse;
- importcom.github.houbb.rpc.server.service.impl.DefaultServiceFactory;
- importio.netty.channel.ChannelHandlerContext;
- importio.netty.channel.SimpleChannelInboundHandler;
- /**
- *@authorbinbin.hou
- *@since0.0.1
- */
- publicclassRpcServerHandlerextendsSimpleChannelInboundHandler{
- privatestaticfinalLoglog=LogFactory.getLog(RpcServerHandler.class);
- @Override
- publicvoidchannelActive(ChannelHandlerContextctx)throwsException{
- finalStringid=ctx.channel().id().asLongText();
- log.info("[Server]channel{}connected"+id);
- }
- @Override
- protectedvoidchannelRead0(ChannelHandlerContextctx,Objectmsg)throwsException{
- finalStringid=ctx.channel().id().asLongText();
- log.info("[Server]channelreadstart:{}",id);
- //接受客户端请求
- RpcRequestrpcRequest=(RpcRequest)msg;
- log.info("[Server]receivechannel{}request:{}",id,rpcRequest);
- //回写到client端
- DefaultRpcResponserpcResponse=handleRpcRequest(rpcRequest);
- ctx.writeAndFlush(rpcResponse);
- log.info("[Server]channel{}response{}",id,rpcResponse);
- }
- @Override
- publicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause)throwsException{
- cause.printStackTrace();
- ctx.close();
- }
- /**
- *处理请求信息
- *@paramrpcRequest请求信息
- *@return结果信息
- *@since0.0.6
- */
- privateDefaultRpcResponsehandleRpcRequest(finalRpcRequestrpcRequest){
- DefaultRpcResponserpcResponse=newDefaultRpcResponse();
- rpcResponse.seqId(rpcRequest.seqId());
- try{
- //获取对应的service实现类
- //rpcRequest=>invocationRequest
- //执行invoke
- Objectresult=DefaultServiceFactory.getInstance()
- .invoke(rpcRequest.serviceId(),
- rpcRequest.methodName(),
- rpcRequest.paramTypeNames(),
- rpcRequest.paramValues());
- rpcResponse.result(result);
- }catch(Exceptione){
- rpcResponse.error(e);
- log.error("[Server]executemeetexforrequest",rpcRequest,e);
- }
- //构建结果值
- returnrpcResponse;
- }
- }
和以前类似,不过 handleRpcRequest 要稍微麻烦一点。
这里需要根据发射,调用对应的方法。
pojo
其中使用的出参、入参实现如下:
RpcRequest
- packagecom.github.houbb.rpc.common.rpc.domain;
- importjava.util.List;
- /**
- *序列化相关处理
- *(1)调用创建时间-createTime
- *(2)调用方式callType
- *(3)超时时间timeOut
- *
- *额外信息:
- *(1)上下文信息
- *
- *@authorbinbin.hou
- *@since0.0.6
- */
- publicinterfaceRpcRequestextendsBaseRpc{
- /**
- *创建时间
- *@return创建时间
- *@since0.0.6
- */
- longcreateTime();
- /**
- *服务唯一标识
- *@return服务唯一标识
- *@since0.0.6
- */
- StringserviceId();
- /**
- *方法名称
- *@return方法名称
- *@since0.0.6
- */
- StringmethodName();
- /**
- *方法类型名称列表
- *@return名称列表
- *@since0.0.6
- */
-
List
paramTypeNames(); - //调用参数信息列表
- /**
- *调用参数值
- *@return参数值数组
- *@since0.0.6
- */
- Object[]paramValues();
- }
RpcResponse
- packagecom.github.houbb.rpc.common.rpc.domain;
- /**
- *序列化相关处理
- *@authorbinbin.hou
- *@since0.0.6
- */
- publicinterfaceRpcResponseextendsBaseRpc{
- /**
- *异常信息
- *@return异常信息
- *@since0.0.6
- */
- Throwableerror();
- /**
- *请求结果
- *@return请求结果
- *@since0.0.6
- */
- Objectresult();
- }
BaseRpc
- packagecom.github.houbb.rpc.common.rpc.domain;
- importjava.io.Serializable;
- /**
- *序列化相关处理
- *@authorbinbin.hou
- *@since0.0.6
- */
- publicinterfaceBaseRpcextendsSerializable{
- /**
- *获取唯一标识号
- *(1)用来唯一标识一次调用,便于获取该调用对应的响应信息。
- *@return唯一标识号
- */
- StringseqId();
- /**
- *设置唯一标识号
- *@paramtraceId唯一标识号
- *@returnthis
- */
- BaseRpcseqId(finalStringtraceId);
- }
ServiceFactory-服务工厂
为了便于对所有的 service 实现类统一管理,这里定义 service 工厂类。
ServiceFactory
- packagecom.github.houbb.rpc.server.service;
- importcom.github.houbb.rpc.server.config.service.ServiceConfig;
- importcom.github.houbb.rpc.server.registry.ServiceRegistry;
- importjava.util.List;
- /**
- *服务方法类仓库管理类-接口
- *
- *
- *(1)对外暴露的方法,应该尽可能的少。
- *(2)对于外部的调用,后期比如telnet治理,可以使用比如有哪些服务列表?
- *单个服务有哪些方法名称?
- *
- *等等基础信息的查询,本期暂时全部隐藏掉。
- *
- *(3)前期尽可能的少暴露方法。
- *@authorbinbin.hou
- *@since0.0.6
- *@seeServiceRegistry服务注册,将服务信息放在这个类中,进行统一的管理。
- *@seeServiceMethod方法信息
- */
- publicinterfaceServiceFactory{
- /**
- *注册服务列表信息
- *@paramserviceConfigList服务配置列表
- *@returnthis
- *@since0.0.6
- */
-
ServiceFactoryregisterServices(finalList
serviceConfigList); - /**
- *直接反射调用
- *(1)此处对于方法反射,为了提升性能,所有的class.getFullName()进行拼接然后放进key中。
- *
- *@paramserviceId服务名称
- *@parammethodName方法名称
- *@paramparamTypeNames参数类型名称列表
- *@paramparamValues参数值
- *@return方法调用返回值
- *@since0.0.6
- */
- Objectinvoke(finalStringserviceId,finalStringmethodName,
-
List
paramTypeNames,finalObject[]paramValues); - }
DefaultServiceFactory
作为默认实现,如下:
- packagecom.github.houbb.rpc.server.service.impl;
- importcom.github.houbb.heaven.constant.PunctuationConst;
- importcom.github.houbb.heaven.util.common.ArgUtil;
- importcom.github.houbb.heaven.util.lang.reflect.ReflectMethodUtil;
- importcom.github.houbb.heaven.util.util.CollectionUtil;
- importcom.github.houbb.rpc.common.exception.RpcRuntimeException;
- importcom.github.houbb.rpc.server.config.service.ServiceConfig;
- importcom.github.houbb.rpc.server.service.ServiceFactory;
- importjava.lang.reflect.InvocationTargetException;
- importjava.lang.reflect.Method;
- importjava.util.HashMap;
- importjava.util.List;
- importjava.util.Map;
- /**
- *默认服务仓库实现
- *@authorbinbin.hou
- *@since0.0.6
- */
- publicclassDefaultServiceFactoryimplementsServiceFactory{
- /**
- *服务map
- *@since0.0.6
- */
-
privateMap
serviceMap; - /**
- *直接获取对应的method信息
- *(1)key:serviceId:methodName:param1@param2@param3
- *(2)value:对应的method信息
- */
-
privateMap
methodMap; - privatestaticfinalDefaultServiceFactoryINSTANCE=newDefaultServiceFactory();
- privateDefaultServiceFactory(){}
- publicstaticDefaultServiceFactorygetInstance(){
- returnINSTANCE;
- }
- /**
- *服务注册一般在项目启动的时候,进行处理。
- *属于比较重的操作,而且一个服务按理说只应该初始化一次。
- *此处加锁为了保证线程安全。
- *@paramserviceConfigList服务配置列表
- *@returnthis
- */
- @Override
-
publicsynchronizedServiceFactoryregisterServices(List
serviceConfigList){ - ArgUtil.notEmpty(serviceConfigList,"serviceConfigList");
- //集合初始化
- serviceMap=newHashMap<>(serviceConfigList.size());
- //这里只是预估,一般为2个服务。
- methodMap=newHashMap<>(serviceConfigList.size()*2);
- for(ServiceConfigserviceConfig:serviceConfigList){
- serviceMap.put(serviceConfig.id(),serviceConfig.reference());
- }
- //存放方法名称
-
for(Map.Entry
entry:serviceMap.entrySet()){ - StringserviceId=entry.getKey();
- Objectreference=entry.getValue();
- //获取所有方法列表
- Method[]methods=reference.getClass().getMethods();
- for(Methodmethod:methods){
- StringmethodName=method.getName();
- if(ReflectMethodUtil.isIgnoreMethod(methodName)){
- continue;
- }
-
List
paramTypeNames=ReflectMethodUtil.getParamTypeNames(method); - Stringkey=buildMethodKey(serviceId,methodName,paramTypeNames);
- methodMap.put(key,method);
- }
- }
- returnthis;
- }
- @Override
-
publicObjectinvoke(StringserviceId,StringmethodName,List
paramTypeNames,Object[]paramValues){ - //参数校验
- ArgUtil.notEmpty(serviceId,"serviceId");
- ArgUtil.notEmpty(methodName,"methodName");
- //提供cache,可以根据前三个值快速定位对应的method
- //根据method进行反射处理。
- //对于paramTypes进行string连接处理。
- finalObjectreference=serviceMap.get(serviceId);
- finalStringmethodKey=buildMethodKey(serviceId,methodName,paramTypeNames);
- finalMethodmethod=methodMap.get(methodKey);
- try{
- returnmethod.invoke(reference,paramValues);
- }catch(IllegalAccessException|InvocationTargetExceptione){
- thrownewRpcRuntimeException(e);
- }
- }
- /**
- *(1)多个之间才用:分隔
- *(2)参数之间采用@分隔
- *@paramserviceId服务标识
- *@parammethodName方法名称
- *@paramparamTypeNames参数类型名称
- *@return构建完整的key
- *@since0.0.6
- */
-
privateStringbuildMethodKey(StringserviceId,StringmethodName,List
paramTypeNames){ - Stringparam=CollectionUtil.join(paramTypeNames,PunctuationConst.AT);
- returnserviceId+PunctuationConst.COLON+methodName+PunctuationConst.COLON
- +param;
- }
- }
ServiceRegistry-服务注册类
接口
- packagecom.github.houbb.rpc.server.registry;
- /**
- *服务注册类
- *(1)每个应用唯一
- *(2)每个服务的暴露协议应该保持一致
- *暂时不提供单个服务的特殊处理,后期可以考虑添加
- *
- *@authorbinbin.hou
- *@since0.0.6
- */
- publicinterfaceServiceRegistry{
- /**
- *暴露的rpc服务端口信息
- *@paramport端口信息
- *@returnthis
- *@since0.0.6
- */
- ServiceRegistryport(finalintport);
- /**
- *注册服务实现
- *@paramserviceId服务标识
- *@paramserviceImpl服务实现
- *@returnthis
- *@since0.0.6
- */
- ServiceRegistryregister(finalStringserviceId,finalObjectserviceImpl);
- /**
- *暴露所有服务信息
- *(1)启动服务端
- *@returnthis
- *@since0.0.6
- */
- ServiceRegistryexpose();
- }
实现
- packagecom.github.houbb.rpc.server.registry.impl;
- importcom.github.houbb.heaven.util.common.ArgUtil;
- importcom.github.houbb.rpc.common.config.protocol.ProtocolConfig;
- importcom.github.houbb.rpc.server.config.service.DefaultServiceConfig;
- importcom.github.houbb.rpc.server.config.service.ServiceConfig;
- importcom.github.houbb.rpc.server.core.RpcServer;
- importcom.github.houbb.rpc.server.registry.ServiceRegistry;
- importcom.github.houbb.rpc.server.service.impl.DefaultServiceFactory;
- importjava.util.ArrayList;
- importjava.util.List;
- /**
- *默认服务端注册类
- *@authorbinbin.hou
- *@since0.0.6
- */
- publicclassDefaultServiceRegistryimplementsServiceRegistry{
- /**
- *单例信息
- *@since0.0.6
- */
- privatestaticfinalDefaultServiceRegistryINSTANCE=newDefaultServiceRegistry();
- /**
- *rpc服务端端口号
- *@since0.0.6
- */
- privateintrpcPort;
- /**
- *协议配置
- *(1)默认只实现tcp
- *(2)后期可以拓展实现web-service/http/https等等。
- *@since0.0.6
- */
- privateProtocolConfigprotocolConfig;
- /**
- *服务配置列表
- *@since0.0.6
- */
-
privateList
serviceConfigList; - privateDefaultServiceRegistry(){
- //初始化默认参数
- this.serviceConfigList=newArrayList<>();
- this.rpcPort=9527;
- }
- publicstaticDefaultServiceRegistrygetInstance(){
- returnINSTANCE;
- }
- @Override
- publicServiceRegistryport(intport){
- ArgUtil.positive(port,"port");
- this.rpcPort=port;
- returnthis;
- }
- /**
- *注册服务实现
- *(1)主要用于后期服务调用
- *(2)如何根据id获取实现?非常简单,id是唯一的。
- *有就是有,没有就抛出异常,直接返回。
- *(3)如果根据{@linkcom.github.houbb.rpc.common.rpc.domain.RpcRequest}获取对应的方法。
- *
- *3.1根据serviceId获取唯一的实现
- *3.2根据{@linkClass#getMethod(String,Class[])}方法名称+参数类型唯一获取方法
- *3.3根据{@linkjava.lang.reflect.Method#invoke(Object,Object...)}执行方法
- *
- *@paramserviceId服务标识
- *@paramserviceImpl服务实现
- *@returnthis
- *@since0.0.6
- */
- @Override
- @SuppressWarnings("unchecked")
- publicsynchronizedDefaultServiceRegistryregister(finalStringserviceId,finalObjectserviceImpl){
- ArgUtil.notEmpty(serviceId,"serviceId");
- ArgUtil.notNull(serviceImpl,"serviceImpl");
- //构建对应的其他信息
- ServiceConfigserviceConfig=newDefaultServiceConfig();
- serviceConfig.id(serviceId).reference(serviceImpl);
- serviceConfigList.add(serviceConfig);
- returnthis;
- }
- @Override
- publicServiceRegistryexpose(){
- //注册所有服务信息
- DefaultServiceFactory.getInstance()
- .registerServices(serviceConfigList);
- //暴露nettyserver信息
- newRpcServer(rpcPort).start();
- returnthis;
- }
- }
ServiceConfig 是一些服务的配置信息,接口定义如下:
- packagecom.github.houbb.rpc.server.config.service;
- /**
- *单个服务配置类
- *
- *简化用户使用:
- *在用户使用的时候,这个类应该是不可见的。
- *直接提供对应的服务注册类即可。
- *
- *后续拓展
- *(1)版本信息
- *(2)服务端超时时间
- *
- *@authorbinbin.hou
- *@since0.0.6
-
*@param
实现类泛型 - */
-
publicinterfaceServiceConfig
{ - /**
- *获取唯一标识
- *@return获取唯一标识
- *@since0.0.6
- */
- Stringid();
- /**
- *设置唯一标识
- *@paramid标识信息
- *@returnthis
- *@since0.0.6
- */
-
ServiceConfig
id(Stringid); - /**
- *获取引用实体实现
- *@return实体实现
- *@since0.0.6
- */
- Treference();
- /**
- *设置引用实体实现
- *@paramreference引用实现
- *@returnthis
- *@since0.0.6
- */
-
ServiceConfig
reference(Treference); - }
测试
maven 引入
引入服务端的对应 maven 包:
-
-
com.github.houbb -
rpc-server -
0.0.6
服务端启动
- //启动服务
- DefaultServiceRegistry.getInstance()
- .register(ServiceIdConst.CALC,newCalculatorServiceImpl())
- .expose();
这里注册了一个计算服务,并且设置对应的实现。
和以前实现类似,此处不再赘述。
启动日志:
- [DEBUG][2021-10-0513:39:42.638][main][c.g.h.l.i.c.LogFactory.setImplementation]-Logginginitializedusing'classcom.github.houbb.log.integration.adaptors.stdout.StdOutExImpl'adapter.
- [INFO][2021-10-0513:39:42.645][Thread-0][c.g.h.r.s.c.RpcServer.run]-RPC服务开始启动服务端
- 十月05,20211:39:43下午io.netty.handler.logging.LoggingHandlerchannelRegistered
- 信息:[id:0xec4dc74f]REGISTERED
- 十月05,20211:39:43下午io.netty.handler.logging.LoggingHandlerbind
- 信息:[id:0xec4dc74f]BIND:0.0.0.0/0.0.0.0:9527
- 十月05,20211:39:43下午io.netty.handler.logging.LoggingHandlerchannelActive
- 信息:[id:0xec4dc74f,L:/0:0:0:0:0:0:0:0:9527]ACTIVE
- [INFO][2021-10-0513:39:43.893][Thread-0][c.g.h.r.s.c.RpcServer.run]-RPC服务端启动完成,监听【9527】端口
ps: 写到这里忽然发现忘记添加对应的 register 日志了,这里可以添加对应的 registerListener 拓展。
原文链接:https://www.toutiao.com/a7017765348539892256/