服务器之家:专注于服务器技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - 编程技术 - Nacos2.0配置灰度发布原理源码解析

Nacos2.0配置灰度发布原理源码解析

2021-08-28 01:11Kirito的技术分享 编程技术

今天分享的是我们组的一个实习生写的一篇源码解析文章,小伙子实习期间在社区Nacos2.0的基础上对灰度发布的能力进行了增强,并完成了MSE Nacos2.0上从管控到内核的灰度发布能力的研发。

Nacos2.0配置灰度发布原理源码解析

今天分享的是我们组的一个实习生写的一篇源码解析文章,小伙子实习期间在社区Nacos2.0的基础上对灰度发布的能力进行了增强,并完成了MSE Nacos2.0上从管控到内核的灰度发布能力的研发。以下是他对配置发布流程的代码解析,相信看完之后你会感叹:现在的实习生都有这个水平了吗?

说到灰度发布,就不得不提到阿里的安全生产三板斧:可监控、可灰度、可回滚。在阿里内部,对于安全生产是高度重视的,灰度可以说是发布之前的必备流程。因此,作为阿里的配置中心,Nacos同样支持了配置灰度的功能,可以通过控制台进行配置的灰度推送、回滚,从而实现安全的配置发布。一般来说,我们按照下图所示流程进行配置的安全修改。只有在小规模机器上验证配置按预期生效之后才会正式发布配置,否则就回滚灰度配置。

Nacos2.0配置灰度发布原理源码解析

发布流程

配置灰度发布流程

社区Nacos的灰度是基于IP的方式进行的,用户需要在控制台,选择需要灰度的配置,然后新建灰度配置,选择灰度机器的IP进行配置推送。整个交互流程如下图所示。

Nacos2.0配置灰度发布原理源码解析

IP灰度机制

具体的使用方法,如果使用的是自建的社区Nacos,可以访问http://ip:port/nacos进入控制台,在配置管理的编辑页面进行配置灰度发布,如下图。

Nacos2.0配置灰度发布原理源码解析

社区Nacos控制台

如果使用的是阿里云的MSE微服务引擎,可以查看MSE配置灰度发布帮助文档了解使用方法,目前在Nacos2.0专业版上已经支持灰度功能,在MSE控制台打开Beta按钮即可,如下图所示。

Nacos2.0配置灰度发布原理源码解析

MSE Beta发布

Nacos灰度原理

Nacos的灰度发布原理其实并不复杂,本质就如同下面这张流程图。

Nacos2.0配置灰度发布原理源码解析

灰度原理

乍一看,这个流程好复杂,实际上定睛一看,好像也没啥。整个过程就是Client、Server和Console之间的交互。Client端监听Server上的配置,建立长连接并上报自己的客户端信息,例如IP地址。Console负责进行配置灰度的调用,将用户所需要的灰度配置请求发送到Server端。然后Server端根据用户的灰度配置请求中的IP地址,过滤与客户端的长连接,然后将灰度配置定向推送到对应IP的客户端中即可。下面笔者从长连接的建立到配置灰度,进行详细的源码分析。

长连接建立

在Nacos2.0版本之前,Nacos主要采用长轮询的方式在客户端拉取服务端的配置信息。而在Nacos2.0版本中,引入了基于gRPC的长连接模型来提升配置监听的性能,客户端和服务端会建立长连接来监听配置的变更,一旦服务端有配置变更,就会将配置信息推送到客户端中。在Nacos源码中,这一过程主要涉及到两个组件之间的交互,即com.alibaba.nacos.common.remote.client.grpc包下的GrpcSdkClient类和com.alibaba.nacos.core.remote.grpc包下的GrpcBiStreamRequestAcceptor类。然而,GrpcSdkClient中没有定义具体的连接逻辑,其主要逻辑在其父类GrpcClient中。下面这段代码就是客户端连接服务端的核心代码,位于GrpcClient的connectToServer方法。

  1. @Override 
  2.   public Connection connectToServer(ServerInfo serverInfo) { 
  3.       try { 
  4.           // ...... 
  5.           int port = serverInfo.getServerPort() + rpcPortOffset(); 
  6.  
  7.           // 创建一个Grpc的Stub 
  8.           RequestGrpc.RequestFutureStub newChannelStubTemp = createNewChannelStub(serverInfo.getServerIp(), port); 
  9.  
  10.           if (newChannelStubTemp != null) { 
  11.  
  12.               // 检查服务端是否可用 
  13.               Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp); 
  14.               if (response == null || !(response instanceof ServerCheckResponse)) { 
  15.                   shuntDownChannel((ManagedChannel) newChannelStubTemp.getChannel()); 
  16.                   return null
  17.               } 
  18.  
  19.               // 创建一个Grpc的Stream 
  20.               BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc 
  21.                   .newStub(newChannelStubTemp.getChannel()); 
  22.  
  23.               // 创建连接信息,保存Grpc的连接信息,也就是长连接的一个holder 
  24.               GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor); 
  25.               grpcConn.setConnectionId(((ServerCheckResponse) response).getConnectionId()); 
  26.  
  27.               // 创建stream请求同时绑定到当前连接中 
  28.               StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn); 
  29.  
  30.               // 绑定Grpc相关连接信息 
  31.               grpcConn.setPayloadStreamObserver(payloadStreamObserver); 
  32.               grpcConn.setGrpcFutureServiceStub(newChannelStubTemp); 
  33.               grpcConn.setChannel((ManagedChannel) newChannelStubTemp.getChannel()); 
  34.  
  35.               // 发送一个初始化连接请求,用于上报客户端的一些信息,例如标签、客户端版本等 
  36.               ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest(); 
  37.               conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion()); 
  38.               conSetupRequest.setLabels(super.getLabels()); 
  39.               conSetupRequest.setAbilities(super.clientAbilities); 
  40.               conSetupRequest.setTenant(super.getTenant()); 
  41.               grpcConn.sendRequest(conSetupRequest); 
  42.  
  43.               // 等待连接建立成功 
  44.               Thread.sleep(100L); 
  45.               return grpcConn; 
  46.           } 
  47.           return null
  48.       } catch (Exception e) { 
  49.           LOGGER.error("[{}]Fail to connect to server!,error={}", GrpcClient.this.getName(), e); 
  50.       } 
  51.       return null
  52.   } 

上面这段代码主要功能有两个,一个是与服务端建立gRPC的长连接,另一个功能主要是初始化连接,后者是实现配置灰度发布的前提。在上文中有提到,配置灰度发布的过程中,需要根据控制台的灰度配置请求中的IP信息过滤长连接,在服务端就是根据连接建立初始化时上报的信息实现的过滤。从上面的代码中可以看到,ConnectionSetupRequest作为一个初始化请求,携带着客户端版本、标签等信息,但是好像并没有携带IP地址的信息。实际上,ConnectionSetupRequest也确实没有携带IP地址信息。因为在Nacos设计中,采用Request来表明客户端的请求信息,而IP地址更像是属于连接层的信息,应该属于连接的元信息,因此并没有放在Request中进行显式的设置,而是在发送请求时自动的作为Metadata信息发送到服务端中。可以看一下com.alibaba.nacos.common.remote.client.grpc包下的GrpcConnection的sendRequest方法,该方法接收一个Request请求作为参数,将请求发送给服务端。

  1. public void sendRequest(Request request) { 
  2.       // 将request转换为Grpc的Payload 
  3.       Payload convert = GrpcUtils.convert(request); 
  4.       // 通过Grpc的流发送请求 
  5.       payloadStreamObserver.onNext(convert); 
  6.   } 

IP地址的设置,就在com.alibaba.nacos.common.remote.client.grpc包下的GrpcUtils的convert方法中,该方法主要将一个Request转换为gRPC的Payload。

  1. /** 
  2.   * convert request to payload. 
  3.   * 
  4.   * @param request request. 
  5.   * @return payload. 
  6.   */ 
  7.  public static Payload convert(Request request) { 
  8.      // 设置元信息 
  9.      Metadata newMeta = Metadata.newBuilder().setType(request.getClass().getSimpleName()) 
  10.              .setClientIp(NetUtils.localIP()).putAllHeaders(request.getHeaders()).build(); 
  11.      request.clearHeaders(); 
  12.       
  13.      // 转换为json 
  14.      String jsonString = toJson(request); 
  15.       
  16.      Payload.Builder builder = Payload.newBuilder(); 
  17.   // 创建Payload 
  18.      return builder 
  19.              .setBody(Any.newBuilder().setValue(ByteString.copyFrom(jsonString, Charset.forName(Constants.ENCODE)))) 
  20.              .setMetadata(newMeta).build(); 
  21.       
  22.  } 

可以看到,这里通过NetUtils.localIP()方法获取客户端的IP信息,并存入到Metadata中,跟随Payload一起上报给服务端。到这里,客户端这里的连接过程就暂时完成了,下面介绍一下服务端接收到连接请求的响应过程。

在服务端,主要通过GrpcBiStreamRequestAcceptor的requestBiStream方法接收客户端请求,如下所示。

  1. @Override 
  2.   public StreamObserver<Payload> requestBiStream(StreamObserver<Payload> responseObserver) { 
  3.        
  4.       StreamObserver<Payload> streamObserver = new StreamObserver<Payload>() { 
  5.            
  6.           final String connectionId = CONTEXT_KEY_CONN_ID.get(); 
  7.            
  8.           final Integer localPort = CONTEXT_KEY_CONN_LOCAL_PORT.get(); 
  9.            
  10.           final int remotePort = CONTEXT_KEY_CONN_REMOTE_PORT.get(); 
  11.            
  12.           String remoteIp = CONTEXT_KEY_CONN_REMOTE_IP.get(); 
  13.            
  14.           String clientIp = ""
  15.            
  16.           @Override 
  17.           public void onNext(Payload payload) { 
  18.               // 获取客户端IP 
  19.               clientIp = payload.getMetadata().getClientIp(); 
  20.               traceDetailIfNecessary(payload); 
  21.                
  22.               Object parseObj; 
  23.               try { 
  24.                   parseObj = GrpcUtils.parse(payload); 
  25.               } catch (Throwable throwable) { 
  26.                   Loggers.REMOTE_DIGEST 
  27.                           .warn("[{}]Grpc request bi stream,payload parse error={}", connectionId, throwable); 
  28.                   return
  29.               } 
  30.                
  31.               if (parseObj == null) { 
  32.                   Loggers.REMOTE_DIGEST 
  33.                           .warn("[{}]Grpc request bi stream,payload parse null ,body={},meta={}", connectionId, 
  34.                                   payload.getBody().getValue().toStringUtf8(), payload.getMetadata()); 
  35.                   return
  36.               } 
  37.                
  38.               // 处理初始化请求 
  39.               if (parseObj instanceof ConnectionSetupRequest) { 
  40.                   ConnectionSetupRequest setUpRequest = (ConnectionSetupRequest) parseObj; 
  41.                   Map<String, String> labels = setUpRequest.getLabels(); 
  42.                   String appName = "-"
  43.                   if (labels != null && labels.containsKey(Constants.APPNAME)) { 
  44.                       appName = labels.get(Constants.APPNAME); 
  45.                   } 
  46.                    
  47.                   ConnectionMeta metaInfo = new ConnectionMeta(connectionId, payload.getMetadata().getClientIp(), 
  48.                           remoteIp, remotePort, localPort, ConnectionType.GRPC.getType(), 
  49.                           setUpRequest.getClientVersion(), appName, setUpRequest.getLabels()); 
  50.                   metaInfo.setTenant(setUpRequest.getTenant()); 
  51.                    
  52.                   // 服务端的长连接信息holder 
  53.                   Connection connection = new GrpcConnection(metaInfo, responseObserver, CONTEXT_KEY_CHANNEL.get()); 
  54.                   connection.setAbilities(setUpRequest.getAbilities()); 
  55.                   boolean rejectSdkOnStarting = metaInfo.isSdkSource() && !ApplicationUtils.isStarted(); 
  56.                    
  57.                   // 注册connection到connectionManager中 
  58.                   if (rejectSdkOnStarting || !connectionManager.register(connectionId, connection)) { 
  59.                       //Not register to the connection manager if current server is over limit or server is starting. 
  60.                       try { 
  61.                           Loggers.REMOTE_DIGEST.warn("[{}]Connection register fail,reason:{}", connectionId, 
  62.                                   rejectSdkOnStarting ? " server is not started" : " server is over limited."); 
  63.                           connection.request(new ConnectResetRequest(), 3000L); 
  64.                           connection.close(); 
  65.                       } catch (Exception e) { 
  66.                           //Do nothing. 
  67.                           if (connectionManager.traced(clientIp)) { 
  68.                               Loggers.REMOTE_DIGEST 
  69.                                       .warn("[{}]Send connect reset request error,error={}", connectionId, e); 
  70.                           } 
  71.                       } 
  72.                   } 
  73.                    
  74.               } else if (parseObj instanceof Response) { 
  75.                   Response response = (Response) parseObj; 
  76.                   if (connectionManager.traced(clientIp)) { 
  77.                       Loggers.REMOTE_DIGEST 
  78.                               .warn("[{}]Receive response of server request  ,response={}", connectionId, response); 
  79.                   } 
  80.                   RpcAckCallbackSynchronizer.ackNotify(connectionId, response); 
  81.                   connectionManager.refreshActiveTime(connectionId); 
  82.               } else { 
  83.                   Loggers.REMOTE_DIGEST 
  84.                           .warn("[{}]Grpc request bi stream,unknown payload receive ,parseObj={}", connectionId, 
  85.                                   parseObj); 
  86.               } 
  87.                
  88.           } 
  89.            
  90.           // ...... 
  91.       }; 
  92.        
  93.       return streamObserver; 
  94.   } 

这里我们主要看onNext方法,其负责处理客户端的请求信息,即Payload信息。如果是初始化连接的请求ConnectionSetupRequest,就会记录与客户端之间的长连接信息,并注册到ConnectionManager中。ConnectionManager是服务端维护所有客户端连接信息的类,持有所有的长连接信息,后续的配置推送等都需要通过ConnectionManager获取长连接信息。可以简单看一下ConnectionManager的源码,在com.alibaba.nacos.core.remote包下,如下所示。

  1. /** 
  2.  * connect manager. 
  3.  * 
  4.  * @author liuzunfei 
  5.  * @version $Id: ConnectionManager.java, v 0.1 2020年07月13日 7:07 PM liuzunfei Exp $ 
  6.  */ 
  7. @Service 
  8. public class ConnectionManager extends Subscriber<ConnectionLimitRuleChangeEvent> { 
  9.      
  10.     // ...... 
  11.      
  12.     Map<String, Connection> connections = new ConcurrentHashMap<String, Connection>(); 
  13.      
  14.     // ...... 
  15.      
  16.     /** 
  17.      * register a new connect
  18.      * 
  19.      * @param connectionId connectionId 
  20.      * @param connection   connection 
  21.      */ 
  22.     public synchronized boolean register(String connectionId, Connection connection) { 
  23.          
  24.         if (connection.isConnected()) { 
  25.             if (connections.containsKey(connectionId)) { 
  26.                 return true
  27.             } 
  28.             if (!checkLimit(connection)) { 
  29.                 return false
  30.             } 
  31.             if (traced(connection.getMetaInfo().clientIp)) { 
  32.                 connection.setTraced(true); 
  33.             } 
  34.             // 注册connection 
  35.             connections.put(connectionId, connection); 
  36.             connectionForClientIp.get(connection.getMetaInfo().clientIp).getAndIncrement(); 
  37.              
  38.             clientConnectionEventListenerRegistry.notifyClientConnected(connection); 
  39.             Loggers.REMOTE_DIGEST 
  40.                     .info("new connection registered successfully, connectionId = {},connection={} ", connectionId, 
  41.                             connection); 
  42.             return true
  43.              
  44.         } 
  45.         return false
  46.          
  47.     } 
  48.      
  49.     // ...... 
  50.      

可以看到,在ConnectionManager中,维护了一个Map。在调用register方法时,将Connection注册到Map中,以供后续的逻辑使用。这里有一个细节,注册到ConnectionManager中的GrpcConnection与客户端持有的GrpcConnection不是一个类。这里的GrpcConnection位于com.alibaba.nacos.core.remote.grpc包,而客户端的GrpcConnection位于com.alibaba.nacos.common.remote.client.grpc包。事实上与客户端有关的gRPC相关的类都在com.alibaba.nacos.common.remote.client.grpc。com.alibaba.nacos.core.remote.grpc则是服务端的相关实现。

到这里,长连接建立的核心流程已经介绍完了,接下来笔者将详细介绍一下配置灰度的推送过程,由于Nacos在这里使用了发布订阅模式以及异步的方法调用,理解起来可能稍微要麻烦一点。

灰度推送

在Nacos中,提供了一组OpenAPI进行配置的管理,配置灰度发布也是其中一个功能,可以在com.alibaba.nacos.config.server.controller包下的ConfigController中查看,包括了BetaConfig的发布、停止和查询,接下来笔者将会一一介绍他们的原理。

创建BetaConfig

创建BetaConfig的API代码如下,一个简单的Web的API。

  1. /** 
  2.  * Adds or updates non-aggregated data. 
  3.  * 
  4.  * @throws NacosException NacosException. 
  5.  */ 
  6. @PostMapping 
  7. @Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class) 
  8. public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response, 
  9.         @RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group
  10.         @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant, 
  11.         @RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag, 
  12.         @RequestParam(value = "appName", required = false) String appName, 
  13.         @RequestParam(value = "src_user", required = false) String srcUser, 
  14.         @RequestParam(value = "config_tags", required = false) String configTags, 
  15.         @RequestParam(value = "desc", required = false) String desc
  16.         @RequestParam(value = "use", required = false) String use, 
  17.         @RequestParam(value = "effect", required = false) String effect, 
  18.         @RequestParam(value = "type", required = false) String type, 
  19.         @RequestParam(value = "schema", required = false) String schema) throws NacosException { 
  20.      
  21.     final String srcIp = RequestUtil.getRemoteIp(request); 
  22.     final String requestIpApp = RequestUtil.getAppName(request); 
  23.     srcUser = RequestUtil.getSrcUserName(request); 
  24.     //check type 
  25.     if (!ConfigType.isValidType(type)) { 
  26.         type = ConfigType.getDefaultType().getType(); 
  27.     } 
  28.     // check tenant 
  29.     ParamUtils.checkTenant(tenant); 
  30.     ParamUtils.checkParam(dataId, group"datumId", content); 
  31.     ParamUtils.checkParam(tag); 
  32.     Map<String, Object> configAdvanceInfo = new HashMap<String, Object>(10); 
  33.     MapUtil.putIfValNoNull(configAdvanceInfo, "config_tags", configTags); 
  34.     MapUtil.putIfValNoNull(configAdvanceInfo, "desc"desc); 
  35.     MapUtil.putIfValNoNull(configAdvanceInfo, "use", use); 
  36.     MapUtil.putIfValNoNull(configAdvanceInfo, "effect", effect); 
  37.     MapUtil.putIfValNoNull(configAdvanceInfo, "type", type); 
  38.     MapUtil.putIfValNoNull(configAdvanceInfo, "schema"schema); 
  39.     ParamUtils.checkParam(configAdvanceInfo); 
  40.      
  41.     if (AggrWhitelist.isAggrDataId(dataId)) { 
  42.         LOGGER.warn("[aggr-conflict] {} attempt to publish single data, {}, {}", RequestUtil.getRemoteIp(request), 
  43.                 dataId, group); 
  44.         throw new NacosException(NacosException.NO_RIGHT, "dataId:" + dataId + " is aggr"); 
  45.     } 
  46.      
  47.     final Timestamp time = TimeUtils.getCurrentTime(); 
  48.      
  49.     // 目标灰度机器的IP地址。 
  50.     String betaIps = request.getHeader("betaIps"); 
  51.      
  52.     ConfigInfo configInfo = new ConfigInfo(dataId, group, tenant, appName, content); 
  53.     configInfo.setType(type); 
  54.     if (StringUtils.isBlank(betaIps)) { 
  55.         if (StringUtils.isBlank(tag)) { 
  56.             persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false); 
  57.             ConfigChangePublisher 
  58.                     .notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime())); 
  59.         } else { 
  60.             persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, timefalse); 
  61.             ConfigChangePublisher.notifyConfigChange( 
  62.                     new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime())); 
  63.         } 
  64.     } else { 
  65.         // 发布Beta 配置 
  66.         persistService.insertOrUpdateBeta(configInfo, betaIps, srcIp, srcUser, timefalse); 
  67.          
  68.         // 通知配置变更 
  69.         ConfigChangePublisher 
  70.                 .notifyConfigChange(new ConfigDataChangeEvent(true, dataId, group, tenant, time.getTime())); 
  71.     } 
  72.     ConfigTraceService 
  73.             .logPersistenceEvent(dataId, group, tenant, requestIpApp, time.getTime(), InetUtils.getSelfIP(), 
  74.                     ConfigTraceService.PERSISTENCE_EVENT_PUB, content); 
  75.     return true

该方法接收一个创建配置的请求,包括配置的data-id、content等信息。从代码中可以看出,该方法是通过判断请求的Header中有无betaIps的值来确定是发布正式配置还是Beta配置的。如果betaIps的值不为空,则表明待发布的配置是一个Beta配置。而配置发布的过程,实际上就是把配置插入或者更新到数据库中。在Nacos中,正式配置和灰度配置是分别存储在不同的表中的,一旦发布就会通过ConfigChangePublisher发布一个ConfigDataChangeEvent事件,然后由订阅了该事件的监听者推送配置信息到客户端。ConfigDataChangeEvent的监听者是AsyncNotifyService类,位于com.alibaba.nacos.config.server.service.notify包下,该类主要用作执行集群之间的数据Dump操作。该类在初始化的时候,会向事件中心NotifyCenter注册一个监听者,用以监听数据变更事件并异步执行数据的Dump操作,如下所示。

  1. /** 
  2.  * Async notify service. 
  3.  * 
  4.  * @author Nacos 
  5.  */ 
  6. @Service 
  7. public class AsyncNotifyService { 
  8.      
  9.     private static final Logger LOGGER = LoggerFactory.getLogger(AsyncNotifyService.class); 
  10.      
  11.     private final NacosAsyncRestTemplate nacosAsyncRestTemplate = HttpClientManager.getNacosAsyncRestTemplate(); 
  12.      
  13.     private static final int MIN_RETRY_INTERVAL = 500; 
  14.      
  15.     private static final int INCREASE_STEPS = 1000; 
  16.      
  17.     private static final int MAX_COUNT = 6; 
  18.      
  19.     @Autowired 
  20.     private DumpService dumpService; 
  21.      
  22.     @Autowired 
  23.     private ConfigClusterRpcClientProxy configClusterRpcClientProxy; 
  24.      
  25.     private ServerMemberManager memberManager; 
  26.      
  27.     @Autowired 
  28.     public AsyncNotifyService(ServerMemberManager memberManager) { 
  29.         this.memberManager = memberManager; 
  30.          
  31.         // Register ConfigDataChangeEvent to NotifyCenter. 
  32.         NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize); 
  33.          
  34.         // Register A Subscriber to subscribe ConfigDataChangeEvent. 
  35.         NotifyCenter.registerSubscriber(new Subscriber() { 
  36.              
  37.             @Override 
  38.             public void onEvent(Event event) { 
  39.                 // Generate ConfigDataChangeEvent concurrently 
  40.                 if (event instanceof ConfigDataChangeEvent) { 
  41.                     ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event; 
  42.                     long dumpTs = evt.lastModifiedTs; 
  43.                     String dataId = evt.dataId; 
  44.                     String group = evt.group
  45.                     String tenant = evt.tenant; 
  46.                     String tag = evt.tag; 
  47.                     Collection<Member> ipList = memberManager.allMembers(); 
  48.                      
  49.                     // In fact, any type of queue here can be 
  50.                     Queue<NotifySingleTask> httpQueue = new LinkedList<NotifySingleTask>(); 
  51.                     Queue<NotifySingleRpcTask> rpcQueue = new LinkedList<NotifySingleRpcTask>(); 
  52.                      
  53.                     for (Member member : ipList) { 
  54.                         // 判断是否是长轮询 
  55.                         if (!MemberUtil.isSupportedLongCon(member)) { 
  56.                             // 添加一个长轮询的异步dump任务 
  57.                             httpQueue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(), 
  58.                                     evt.isBeta)); 
  59.                         } else { 
  60.                             // 添加一个长连接的异步dump任务 
  61.                             rpcQueue.add
  62.                                     new NotifySingleRpcTask(dataId, group, tenant, tag, dumpTs, evt.isBeta, member)); 
  63.                         } 
  64.                     } 
  65.                     // 判断并执行长轮询的异步dump任务 
  66.                     if (!httpQueue.isEmpty()) { 
  67.                         ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, httpQueue)); 
  68.                     } 
  69.                     // 判断并执行长连接的异步dump任务 
  70.                     if (!rpcQueue.isEmpty()) { 
  71.                         ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue)); 
  72.                     } 
  73.                      
  74.                 } 
  75.             } 
  76.              
  77.             @Override 
  78.             public Class<? extends Event> subscribeType() { 
  79.                 return ConfigDataChangeEvent.class; 
  80.             } 
  81.         }); 
  82.     } 

在接收到ConfigDataChangeEvent之后,如果Nacos2.0以上的版本,会创建一个RpcTask用以执行配置变更的通知,由内部类AsyncRpcTask执行,AsyncRpcTask具体逻辑如下所示。

  1. class AsyncRpcTask implements Runnable { 
  2.          
  3.         private Queue<NotifySingleRpcTask> queue; 
  4.          
  5.         public AsyncRpcTask(Queue<NotifySingleRpcTask> queue) { 
  6.             this.queue = queue; 
  7.         } 
  8.          
  9.         @Override 
  10.         public void run() { 
  11.             while (!queue.isEmpty()) { 
  12.                 NotifySingleRpcTask task = queue.poll(); 
  13.                 // 创建配置变更请求 
  14.                 ConfigChangeClusterSyncRequest syncRequest = new ConfigChangeClusterSyncRequest(); 
  15.                 syncRequest.setDataId(task.getDataId()); 
  16.                 syncRequest.setGroup(task.getGroup()); 
  17.                 syncRequest.setBeta(task.isBeta); 
  18.                 syncRequest.setLastModified(task.getLastModified()); 
  19.                 syncRequest.setTag(task.tag); 
  20.                 syncRequest.setTenant(task.getTenant()); 
  21.                  
  22.                 Member member = task.member; 
  23.                 // 如果是自身的数据变更,直接执行dump操作 
  24.                 if (memberManager.getSelf().equals(member)) { 
  25.                     if (syncRequest.isBeta()) { 
  26.                         // 同步Beta配置 
  27.                         dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(), 
  28.                                 syncRequest.getLastModified(), NetUtils.localIP(), true); 
  29.                     } else { 
  30.                         // 同步正式配置 
  31.                         dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(), 
  32.                                 syncRequest.getTag(), syncRequest.getLastModified(), NetUtils.localIP()); 
  33.                     } 
  34.                     continue
  35.                 } 
  36.                  
  37.                 // 通知其他服务端进行dump 
  38.                 if (memberManager.hasMember(member.getAddress())) { 
  39.                     // start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify 
  40.                     boolean unHealthNeedDelay = memberManager.isUnHealth(member.getAddress()); 
  41.                     if (unHealthNeedDelay) { 
  42.                         // target ip is unhealthy, then put it in the notification list 
  43.                         ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null
  44.                                 task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH, 
  45.                                 0, member.getAddress()); 
  46.                         // get delay time and set fail count to the task 
  47.                         asyncTaskExecute(task); 
  48.                     } else { 
  49.      
  50.                         if (!MemberUtil.isSupportedLongCon(member)) { 
  51.                             asyncTaskExecute( 
  52.                                     new NotifySingleTask(task.getDataId(), task.getGroup(), task.getTenant(), task.tag, 
  53.                                             task.getLastModified(), member.getAddress(), task.isBeta)); 
  54.                         } else { 
  55.                             try { 
  56.                                 configClusterRpcClientProxy 
  57.                                         .syncConfigChange(member, syncRequest, new AsyncRpcNotifyCallBack(task)); 
  58.                             } catch (Exception e) { 
  59.                                 MetricsMonitor.getConfigNotifyException().increment(); 
  60.                                 asyncTaskExecute(task); 
  61.                             } 
  62.                         } 
  63.                        
  64.                     } 
  65.                 } else { 
  66.                     //No nothig if  member has offline. 
  67.                 } 
  68.                  
  69.             } 
  70.         } 
  71.     } 

这里首先创建了一个ConfigChangeClusterSyncRequest,并将配置信息写入。然后获取集群信息,通知相应的Server处理的数据同步请求。同步配置变更信息的核心逻辑由DumpService来执行。我们主要查看同步Beta配置的操作,DumpService的dump方法如下所示。

  1. /** 
  2.  * Add DumpTask to TaskManager, it will execute asynchronously. 
  3.  */ 
  4. public void dump(String dataId, String group, String tenant, long lastModified, String handleIp, boolean isBeta) { 
  5.     String groupKey = GroupKey2.getKey(dataId, group, tenant); 
  6.     String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta)); 
  7.     dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, lastModified, handleIp, isBeta)); 
  8.     DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey); 

在该方法中,这里会根据配置变更信息,提交一个异步的DumpTask任务,后续会由DumpProcessor类的process方法进行处理,该方法如下所示。

  1. /** 
  2.  * dump processor. 
  3.  * 
  4.  * @author Nacos 
  5.  * @date 2020/7/5 12:19 PM 
  6.  */ 
  7. public class DumpProcessor implements NacosTaskProcessor { 
  8.      
  9.     final DumpService dumpService; 
  10.      
  11.     public DumpProcessor(DumpService dumpService) { 
  12.         this.dumpService = dumpService; 
  13.     } 
  14.      
  15.     @Override 
  16.     public boolean process(NacosTask task) { 
  17.         final PersistService persistService = dumpService.getPersistService(); 
  18.         DumpTask dumpTask = (DumpTask) task; 
  19.         String[] pair = GroupKey2.parseKey(dumpTask.getGroupKey()); 
  20.         String dataId = pair[0]; 
  21.         String group = pair[1]; 
  22.         String tenant = pair[2]; 
  23.         long lastModified = dumpTask.getLastModified(); 
  24.         String handleIp = dumpTask.getHandleIp(); 
  25.         boolean isBeta = dumpTask.isBeta(); 
  26.         String tag = dumpTask.getTag(); 
  27.          
  28.         ConfigDumpEvent.ConfigDumpEventBuilder build = ConfigDumpEvent.builder().namespaceId(tenant).dataId(dataId) 
  29.                 .group(group).isBeta(isBeta).tag(tag).lastModifiedTs(lastModified).handleIp(handleIp); 
  30.          
  31.         if (isBeta) { 
  32.             // 更新Beta配置的缓存 
  33.             ConfigInfo4Beta cf = persistService.findConfigInfo4Beta(dataId, group, tenant); 
  34.              
  35.             build.remove(Objects.isNull(cf)); 
  36.             build.betaIps(Objects.isNull(cf) ? null : cf.getBetaIps()); 
  37.             build.content(Objects.isNull(cf) ? null : cf.getContent()); 
  38.              
  39.             return DumpConfigHandler.configDump(build.build()); 
  40.         } 
  41.         if (StringUtils.isBlank(tag)) { 
  42.             ConfigInfo cf = persistService.findConfigInfo(dataId, group, tenant); 
  43.  
  44.             build.remove(Objects.isNull(cf)); 
  45.             build.content(Objects.isNull(cf) ? null : cf.getContent()); 
  46.             build.type(Objects.isNull(cf) ? null : cf.getType()); 
  47.         } else { 
  48.             ConfigInfo4Tag cf = persistService.findConfigInfo4Tag(dataId, group, tenant, tag); 
  49.  
  50.             build.remove(Objects.isNull(cf)); 
  51.             build.content(Objects.isNull(cf) ? null : cf.getContent()); 
  52.  
  53.         } 
  54.         return DumpConfigHandler.configDump(build.build()); 
  55.     } 

可以看到,如果是Beta配置,则获取最新的Beta配置信息,然后触发DumpConfigHandler的configDump方法。进入configDump可以看到,该方法主要用来更新缓存的配置信息,调用ConfigCacheService的相关操作进行配置的更新。

  1. /** 
  2.  * Dump config subscriber. 
  3.  * 
  4.  * @author <a href="mailto:liaochuntao@live.com">liaochuntao</a> 
  5.  */ 
  6. public class DumpConfigHandler extends Subscriber<ConfigDumpEvent> { 
  7.      
  8.     /** 
  9.      * trigger config dump event. 
  10.      * 
  11.      * @param event {@link ConfigDumpEvent} 
  12.      * @return {@code true} if the config dump task success , else {@code false
  13.      */ 
  14.     public static boolean configDump(ConfigDumpEvent event) { 
  15.         final String dataId = event.getDataId(); 
  16.         final String group = event.getGroup(); 
  17.         final String namespaceId = event.getNamespaceId(); 
  18.         final String content = event.getContent(); 
  19.         final String type = event.getType(); 
  20.         final long lastModified = event.getLastModifiedTs(); 
  21.         if (event.isBeta()) { 
  22.             boolean result = false
  23.             // 删除操作 
  24.             if (event.isRemove()) { 
  25.                 result = ConfigCacheService.removeBeta(dataId, group, namespaceId); 
  26.                 if (result) { 
  27.                     ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(), 
  28.                             ConfigTraceService.DUMP_EVENT_REMOVE_OK, System.currentTimeMillis() - lastModified, 0); 
  29.                 } 
  30.                 return result; 
  31.             } else { 
  32.                 // 更新或者发布 
  33.                 result = ConfigCacheService 
  34.                         .dumpBeta(dataId, group, namespaceId, content, lastModified, event.getBetaIps()); 
  35.                 if (result) { 
  36.                     ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(), 
  37.                             ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified, 
  38.                             content.length()); 
  39.                 } 
  40.             } 
  41.              
  42.             return result; 
  43.         } 
  44.          
  45.         // ...... 
  46.          
  47.     } 
  48.      
  49.     @Override 
  50.     public void onEvent(ConfigDumpEvent event) { 
  51.         configDump(event); 
  52.     } 
  53.      
  54.     @Override 
  55.     public Class<? extends Event> subscribeType() { 
  56.         return ConfigDumpEvent.class; 
  57.     } 

在ConfigCacheService中,会对比配置信息,如果配置有变化,则发布事件LocalDataChangeEvent,触发RpcConfigChangeNotifier的configDataChanged方法来推送配置,configDataChanged方法代码如下。

  1. /** 
  2.  * ConfigChangeNotifier. 
  3.  * 
  4.  * @author liuzunfei 
  5.  * @version $Id: ConfigChangeNotifier.java, v 0.1 2020年07月20日 3:00 PM liuzunfei Exp $ 
  6.  */ 
  7. @Component(value = "rpcConfigChangeNotifier"
  8. public class RpcConfigChangeNotifier extends Subscriber<LocalDataChangeEvent> { 
  9.      
  10.     // ...... 
  11.      
  12.     @Autowired 
  13.     ConfigChangeListenContext configChangeListenContext; 
  14.      
  15.     @Autowired 
  16.     private RpcPushService rpcPushService; 
  17.      
  18.     @Autowired 
  19.     private ConnectionManager connectionManager; 
  20.      
  21.     /** 
  22.      * adaptor to config module ,when server side config change ,invoke this method. 
  23.      * 
  24.      * @param groupKey groupKey 
  25.      */ 
  26.     public void configDataChanged(String groupKey, String dataId, String group, String tenant, boolean isBeta, 
  27.             List<String> betaIps, String tag) { 
  28.          
  29.         // 获取配置的所有监听者 
  30.         Set<String> listeners = configChangeListenContext.getListeners(groupKey); 
  31.         if (CollectionUtils.isEmpty(listeners)) { 
  32.             return
  33.         } 
  34.         int notifyClientCount = 0; 
  35.         // 遍历所有监听者 
  36.         for (final String client : listeners) { 
  37.             // 获取长连接信息 
  38.             Connection connection = connectionManager.getConnection(client); 
  39.             if (connection == null) { 
  40.                 continue
  41.             } 
  42.  
  43.             String clientIp = connection.getMetaInfo().getClientIp(); 
  44.             String clientTag = connection.getMetaInfo().getTag(); 
  45.              
  46.             // 判断是否是Beta的Ip 
  47.             if (isBeta && betaIps != null && !betaIps.contains(clientIp)) { 
  48.                 continue
  49.             } 
  50.             // tag check 
  51.             if (StringUtils.isNotBlank(tag) && !tag.equals(clientTag)) { 
  52.                 continue
  53.             } 
  54.     
  55.             // 配置变更推送请求 
  56.             ConfigChangeNotifyRequest notifyRequest = ConfigChangeNotifyRequest.build(dataId, group, tenant); 
  57.     
  58.             // 执行推送任务 
  59.             RpcPushTask rpcPushRetryTask = new RpcPushTask(notifyRequest, 50, client, clientIp, 
  60.                     connection.getMetaInfo().getAppName()); 
  61.             push(rpcPushRetryTask); 
  62.             notifyClientCount++; 
  63.         } 
  64.         Loggers.REMOTE_PUSH.info("push [{}] clients ,groupKey=[{}]", notifyClientCount, groupKey); 
  65.     } 
  66.      
  67.     @Override 
  68.     public void onEvent(LocalDataChangeEvent event) { 
  69.         String groupKey = event.groupKey; 
  70.         boolean isBeta = event.isBeta; 
  71.         List<String> betaIps = event.betaIps; 
  72.         String[] strings = GroupKey.parseKey(groupKey); 
  73.         String dataId = strings[0]; 
  74.         String group = strings[1]; 
  75.         String tenant = strings.length > 2 ? strings[2] : ""
  76.         String tag = event.tag; 
  77.          
  78.         configDataChanged(groupKey, dataId, group, tenant, isBeta, betaIps, tag); 
  79.          
  80.     } 
  81.      
  82.     // ...... 

到这里,基本上就是配置变更推送的最后一个步骤了,如代码中注释所示,通过调用ConnectionManager的getConnection方法,遍历所有监听者的连接,根据其中的Meta信息判断是否是Beta推送的目标,然后执行推送任务,也就是执行push方法,如下所示。

  1. private void push(RpcPushTask retryTask) { 
  2.        ConfigChangeNotifyRequest notifyRequest = retryTask.notifyRequest; 
  3.        // 判断是否重试次数达到限制 
  4.        if (retryTask.isOverTimes()) { 
  5.            Loggers.REMOTE_PUSH 
  6.                    .warn("push callback retry fail over times .dataId={},group={},tenant={},clientId={},will unregister client."
  7.                            notifyRequest.getDataId(), notifyRequest.getGroup(), notifyRequest.getTenant(), 
  8.                            retryTask.connectionId); 
  9.            // 主动注销连接 
  10.            connectionManager.unregister(retryTask.connectionId); 
  11.        } else if (connectionManager.getConnection(retryTask.connectionId) != null) { 
  12.            // first time :delay 0s; sencond time:delay 2s  ;third time :delay 4s 
  13.            // 尝试执行配置推送 
  14.            ConfigExecutor.getClientConfigNotifierServiceExecutor() 
  15.                    .schedule(retryTask, retryTask.tryTimes * 2, TimeUnit.SECONDS); 
  16.        } else { 
  17.            // client is already offline,ingnore task. 
  18.        } 
  19.         
  20.    } 

这里实际上也是一个异步执行的过程,推送任务RpcPushTask会被提交到ClientConfigNotifierServiceExecutor来计划执行,第一次会立即推送配置,即调用RpcPushTask的run方法,如果失败则延迟重试次数x2的秒数再次执行,直到超过重试次数,主动注销当前连接。其中,RpcPushTask的定义如下。

  1. class RpcPushTask implements Runnable { 
  2.       
  3.      ConfigChangeNotifyRequest notifyRequest; 
  4.       
  5.      int maxRetryTimes = -1; 
  6.       
  7.      int tryTimes = 0; 
  8.       
  9.      String connectionId; 
  10.       
  11.      String clientIp; 
  12.       
  13.      String appName; 
  14.       
  15.      public RpcPushTask(ConfigChangeNotifyRequest notifyRequest, int maxRetryTimes, String connectionId, 
  16.              String clientIp, String appName) { 
  17.          this.notifyRequest = notifyRequest; 
  18.          this.maxRetryTimes = maxRetryTimes; 
  19.          this.connectionId = connectionId; 
  20.          this.clientIp = clientIp; 
  21.          this.appName = appName; 
  22.      } 
  23.       
  24.      public boolean isOverTimes() { 
  25.          return maxRetryTimes > 0 && this.tryTimes >= maxRetryTimes; 
  26.      } 
  27.       
  28.      @Override 
  29.      public void run() { 
  30.          tryTimes++; 
  31.          if (!tpsMonitorManager.applyTpsForClientIp(POINT_CONFIG_PUSH, connectionId, clientIp)) { 
  32.              push(this); 
  33.          } else { 
  34.              // 推送配置 
  35.              rpcPushService.pushWithCallback(connectionId, notifyRequest, new AbstractPushCallBack(3000L) { 
  36.                  @Override 
  37.                  public void onSuccess() { 
  38.                      tpsMonitorManager.applyTpsForClientIp(POINT_CONFIG_PUSH_SUCCESS, connectionId, clientIp); 
  39.                  } 
  40.                   
  41.                  @Override 
  42.                  public void onFail(Throwable e) { 
  43.                      tpsMonitorManager.applyTpsForClientIp(POINT_CONFIG_PUSH_FAIL, connectionId, clientIp); 
  44.                      Loggers.REMOTE_PUSH.warn("Push fail", e); 
  45.                      push(RpcPushTask.this); 
  46.                  } 
  47.                   
  48.              }, ConfigExecutor.getClientConfigNotifierServiceExecutor()); 
  49.               
  50.          } 
  51.           
  52.      } 
  53.  } 

可以看到,在RpcPushTask的run方法中,调用了RpcPushService的pushWithCallback方法,如下所示。

  1. /** 
  2.  * push response  to clients. 
  3.  * 
  4.  * @author liuzunfei 
  5.  * @version $Id: PushService.java, v 0.1 2020年07月20日 1:12 PM liuzunfei Exp $ 
  6.  */ 
  7. @Service 
  8. public class RpcPushService { 
  9.      
  10.     @Autowired 
  11.     private ConnectionManager connectionManager; 
  12.      
  13.     /** 
  14.      * push response with no ack. 
  15.      * 
  16.      * @param connectionId    connectionId. 
  17.      * @param request         request. 
  18.      * @param requestCallBack requestCallBack. 
  19.      */ 
  20.     public void pushWithCallback(String connectionId, ServerRequest request, PushCallBack requestCallBack, 
  21.             Executor executor) { 
  22.         Connection connection = connectionManager.getConnection(connectionId); 
  23.         if (connection != null) { 
  24.             try { 
  25.                 // 执行配置推送 
  26.                 connection.asyncRequest(request, new AbstractRequestCallBack(requestCallBack.getTimeout()) { 
  27.                      
  28.                     @Override 
  29.                     public Executor getExecutor() { 
  30.                         return executor; 
  31.                     } 
  32.                      
  33.                     @Override 
  34.                     public void onResponse(Response response) { 
  35.                         if (response.isSuccess()) { 
  36.                             requestCallBack.onSuccess(); 
  37.                         } else { 
  38.                             requestCallBack.onFail(new NacosException(response.getErrorCode(), response.getMessage())); 
  39.                         } 
  40.                     } 
  41.                      
  42.                     @Override 
  43.                     public void onException(Throwable e) { 
  44.                         requestCallBack.onFail(e); 
  45.                     } 
  46.                 }); 
  47.             } catch (ConnectionAlreadyClosedException e) { 
  48.                 connectionManager.unregister(connectionId); 
  49.                 requestCallBack.onSuccess(); 
  50.             } catch (Exception e) { 
  51.                 Loggers.REMOTE_DIGEST 
  52.                         .error("error to send push response to connectionId ={},push response={}", connectionId, 
  53.                                 request, e); 
  54.                 requestCallBack.onFail(e); 
  55.             } 
  56.         } else { 
  57.             requestCallBack.onSuccess(); 
  58.         } 
  59.     } 
  60.      

其持有ConnectionManager对象,当需要推送配置到客户端时,会获取相应的Connection,然后执行asyncRequest将配置推送到客户端中。如果连接已经关闭,则注销连接。在asyncRequest底层即是调用Grpc建立的Stream的onNext方法,将配置推送给客户端,如下。

  1. /** 
  2.  * grpc connection
  3.  * 
  4.  * @author liuzunfei 
  5.  * @version $Id: GrpcConnection.java, v 0.1 2020年07月13日 7:26 PM liuzunfei Exp $ 
  6.  */ 
  7. public class GrpcConnection extends Connection { 
  8.      
  9.     private StreamObserver streamObserver; 
  10.      
  11.     private Channel channel; 
  12.      
  13.     public GrpcConnection(ConnectionMeta metaInfo, StreamObserver streamObserver, Channel channel) { 
  14.         super(metaInfo); 
  15.         this.streamObserver = streamObserver; 
  16.         this.channel = channel; 
  17.     } 
  18.      
  19.     @Override 
  20.     public void asyncRequest(Request request, RequestCallBack requestCallBack) throws NacosException { 
  21.         sendRequestInner(request, requestCallBack); 
  22.     } 
  23.      
  24.     private DefaultRequestFuture sendRequestInner(Request request, RequestCallBack callBack) throws NacosException { 
  25.         final String requestId = String.valueOf(PushAckIdGenerator.getNextId()); 
  26.         request.setRequestId(requestId); 
  27.          
  28.         DefaultRequestFuture defaultPushFuture = new DefaultRequestFuture(getMetaInfo().getConnectionId(), requestId, 
  29.                 callBack, () -> RpcAckCallbackSynchronizer.clearFuture(getMetaInfo().getConnectionId(), requestId)); 
  30.          
  31.         RpcAckCallbackSynchronizer.syncCallback(getMetaInfo().getConnectionId(), requestId, defaultPushFuture); 
  32.         sendRequestNoAck(request); 
  33.         return defaultPushFuture; 
  34.     } 
  35.      
  36.     private void sendRequestNoAck(Request request) throws NacosException { 
  37.         try { 
  38.             //StreamObserver#onNext() is not thread-safe,synchronized is required to avoid direct memory leak. 
  39.             synchronized (streamObserver) { 
  40.                  
  41.                 Payload payload = GrpcUtils.convert(request); 
  42.                 traceIfNecessary(payload); 
  43.                 streamObserver.onNext(payload); 
  44.             } 
  45.         } catch (Exception e) { 
  46.             if (e instanceof StatusRuntimeException) { 
  47.                 throw new ConnectionAlreadyClosedException(e); 
  48.             } 
  49.             throw e; 
  50.         } 
  51.     } 
  52.      

主要推送逻辑的代码如上所示,调用asyncRequest之后,会将请求交给sendRequestInner处理,sendRequestInner又会调用sendRequestNoAck将推送请求推入gRPC的流中,客户端收到配置更新的请求,就会更新客户端的配置了。至此,一个灰度配置就发布成功了。

删除/查询BetaConfig

删除和查询BetaConfig的方法都很简单,都是简单的操作数据库即可。如果是删除配置,则会触发ConfigDataChangeEvent来告知客户端更新配置,这里笔者就不多加赘述了。

  1. /** 
  2.  * Execute to remove beta operation. 
  3.  * 
  4.  * @param dataId dataId string value. 
  5.  * @param group  group string value. 
  6.  * @param tenant tenant string value. 
  7.  * @return Execute to operate result. 
  8.  */ 
  9. @DeleteMapping(params = "beta=true"
  10. @Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class) 
  11. public RestResult<Boolean> stopBeta(@RequestParam(value = "dataId") String dataId, 
  12.         @RequestParam(value = "group") String group
  13.         @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant) { 
  14.     try { 
  15.         persistService.removeConfigInfo4Beta(dataId, group, tenant); 
  16.     } catch (Throwable e) { 
  17.         LOGGER.error("remove beta data error", e); 
  18.         return RestResultUtils.failed(500, false"remove beta data error"); 
  19.     } 
  20.     ConfigChangePublisher 
  21.             .notifyConfigChange(new ConfigDataChangeEvent(true, dataId, group, tenant, System.currentTimeMillis())); 
  22.     return RestResultUtils.success("stop beta ok"true); 
  23.  
  24. /** 
  25.  * Execute to query beta operation. 
  26.  * 
  27.  * @param dataId dataId string value. 
  28.  * @param group  group string value. 
  29.  * @param tenant tenant string value. 
  30.  * @return RestResult for ConfigInfo4Beta. 
  31.  */ 
  32. @GetMapping(params = "beta=true"
  33. @Secured(action = ActionTypes.READ, parser = ConfigResourceParser.class) 
  34. public RestResult<ConfigInfo4Beta> queryBeta(@RequestParam(value = "dataId") String dataId, 
  35.         @RequestParam(value = "group") String group
  36.         @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant) { 
  37.     try { 
  38.         ConfigInfo4Beta ci = persistService.findConfigInfo4Beta(dataId, group, tenant); 
  39.         return RestResultUtils.success("stop beta ok", ci); 
  40.     } catch (Throwable e) { 
  41.         LOGGER.error("remove beta data error", e); 
  42.         return RestResultUtils.failed("remove beta data error"); 
  43.     } 

总结

Nacos2.0使用长连接代替了短连接的长轮询,性能几乎提升了10倍。在阿里内部,也在逐渐推进Nacos2作为统一的配置中心。目前在微服务引擎(Micro Service Engine,简称 MSE),Nacos作为注册配置中心,提供了纯托管的服务,只需要购买Nacos专业版即可享受到10倍的性能提升。

此外,MSE微服务引擎顾名思义,是一个面向业界主流开源微服务生态的一站式微服务平台, 帮助微服务用户更稳定、更便捷、更低成本的使用开源微服务技术构建微服务体系。不但提供注册中心、配置中心全托管(兼容 Nacos/ZooKeeper/Eureka),而且提供网关(兼容 Ingress/Enovy)和无侵入的开源增强服务治理能力。

在阿里,MSE微服务引擎已经被大规模的接入使用,经历阿里内部生产考验以及反复淬炼,其中微服务服务治理能力支撑了大量的微服务系统,对包括Spring Cloud、Dubbo等微服务框架的治理功能增强,提供了无损上下线、金丝雀发布、离群摘除以及无损滚动升级的功能。

如果有快速搭建高性能微服务以及大规模服务治理的需求,相比于从零搭建和运维,MSE微服务引擎是一个不错的选择。

原文链接:

延伸 · 阅读

精彩推荐