服务器之家:专注于服务器技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - 编程技术 - 从线上偶发的宕机事件看Netty流量控制

从线上偶发的宕机事件看Netty流量控制

2021-09-23 23:34Kirito的技术分享 编程技术

目前移动端的使用场景中会用到大量的消息推送,push消息可以帮助运营人员更高效地实现运营目标(比如给用户推送营销活动或者提醒APP新功能)。

 从线上偶发的宕机事件看Netty流量控制

业务背景

目前移动端的使用场景中会用到大量的消息推送,push消息可以帮助运营人员更高效地实现运营目标(比如给用户推送营销活动或者提醒APP新功能)。

对于推送系统来说需要具备以下两个特性:

  • 消息秒级送到用户,无延时,支持每秒百万推送,单机百万长连接。

  • 支持通知、文本、自定义消息透传等展现形式。正是由于以上原因,对于系统的开发和维护带来了挑战。下图是推送系统的简单描述(API->推送模块->手机)。

从线上偶发的宕机事件看Netty流量控制

问题背景

推送系统中长连接集群在稳定性测试、压力测试阶运行一段时间后随机会出现一个进程挂掉的情况,概率较小(频率为一个月左右发生一次),这会影响部分客户端消息送到的时效。

推送系统中的长连接节点(Broker系统)是基于Netty开发,此节点维护了服务端和手机终端的长连接,线上问题出现后,添加Netty内存泄露监控参数进行问题排查,观察多天但并未排查出问题。

由于长连接节点是Netty开发,为便于读者理解,下面简单介绍一下Netty。

Netty介绍

Netty是一个高性能、异步事件驱动的NIO框架,基于Java NIO提供的API实现。它提供了对TCP、UDP和文件传输的支持,作为当前最流行的NIO框架,Netty在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,HBase,Hadoop,Bees,Dubbo等开源组件也基于Netty的NIO框架构建。

问题分析

猜想

最初猜想是长连接数导致的,但经过排查日志、分析代码,发现并不是此原因造成。

长连接数:39万,如下图:

从线上偶发的宕机事件看Netty流量控制

 

连接数 

 

每个channel字节大小1456, 按40万长连接计算,不致于产生内存过大现象。

查看GC日志

查看GC日志,发现进程挂掉之前频繁full GC(频率5分钟一次),但内存并未降低,怀疑堆外内存泄露。

分析heap内存情况

ChannelOutboundBuffer对象占将近5G内存,泄露原因基本可以确定:ChannelOutboundBuffer的entry数过多导致,查看ChannelOutboundBuffer的源码可以分析出,是ChannelOutboundBuffer中的数据。

没有写出去,导致一直积压;

ChannelOutboundBuffer内部是一个链表结构。

从线上偶发的宕机事件看Netty流量控制

从上图分析数据未写出去,为什么会出现这种情况?

代码中实际有判断连接是否可用的情况(Channel.isActive),并且会对超时的连接进行关闭。从历史经验来看,这种情况发生在连接半打开(客户端异常关闭)的情况比较多---双方不进行数据通信无问题。

按上述猜想,测试环境进行重现和测试。

1)模拟客户端集群,并与长连接服务器建立连接,设置客户端节点的防火墙,模拟服务器与客户端网络异常的场景(即要模拟Channel.isActive调用成功,但数据实际发送不出去的情况)。
2)调小堆外内存,持续发送测试消息给之前的客户端。消息大小(1K左右)。
3)按照128M内存来计算,实际上调用9W多次就会出现。

问题解决

启用autoRead机制

当channel不可写时,关闭autoRead;

  1. public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 
  2.     if (!ctx.channel().isWritable()) { 
  3.         Channel channel = ctx.channel(); 
  4.         ChannelInfo channelInfo = ChannelManager.CHANNEL_CHANNELINFO.get(channel); 
  5.         String clientId = ""
  6.         if (channelInfo != null) { 
  7.             clientId = channelInfo.getClientId(); 
  8.         } 
  9.  
  10.         LOGGER.info("channel is unwritable, turn off autoread, clientId:{}", clientId); 
  11.         channel.config().setAutoRead(false); 
  12.     } 

当数据可写时开启autoRead;

  1. @Override
  2. public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception 
  3.     Channel channel = ctx.channel(); 
  4.     ChannelInfo channelInfo = ChannelManager.CHANNEL_CHANNELINFO.get(channel); 
  5.     String clientId = ""
  6.     if (channelInfo != null) { 
  7.         clientId = channelInfo.getClientId(); 
  8.     } 
  9.     if (channel.isWritable()) { 
  10.         LOGGER.info("channel is writable again, turn on autoread, clientId:{}", clientId); 
  11.         channel.config().setAutoRead(true); 
  12.     } 

说明:

从线上偶发的宕机事件看Netty流量控制

autoRead的作用是更精确的速率控制,如果打开的时候Netty就会帮我们注册读事件。当注册了读事件后,如果网络可读,则Netty就会从channel读取数据。那如果autoread关掉后,则Netty会不注册读事件。

这样即使是对端发送数据过来了也不会触发读事件,从而也不会从channel读取到数据。当recv_buffer满时,也就不会再接收数据。

设置高低水位

  1. serverBootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(1024 * 10248 * 1024 * 1024)); 

注:高低水位配合后面的isWritable使用

增加channel.isWritable()的判断

channel是否可用除了校验channel.isActive()还需要加上channel.isWrite()的判断,isActive只是保证连接是否激活,而是否可写由isWrite来决定。

  1. private void writeBackMessage(ChannelHandlerContext ctx, MqttMessage message) { 
  2.     Channel channel = ctx.channel(); 
  3.     //增加channel.isWritable()的判断
  4.     if (channel.isActive() && channel.isWritable()) { 
  5.         ChannelFuture cf = channel.writeAndFlush(message); 
  6.         if (cf.isDone() && cf.cause() != null) { 
  7.             LOGGER.error("channelWrite error!", cf.cause()); 
  8.             ctx.close(); 
  9.         } 
  10.     } 

注:isWritable可以来控制ChannelOutboundBuffer,不让其无限制膨胀。其机制就是利用设置好的channel高低水位来进行判断。

问题验证

修改后再进行测试,发送到27W次也并不报错;

解决思路分析

一般Netty数据处理流程如下:将读取的数据交由业务线程处理,处理完成再发送出去(整个过程是异步的),Netty为了提高网络的吞吐量,在业务层与socket之间增加了一个ChannelOutboundBuffer。

在调用channel.write的时候,所有写出的数据其实并没有写到socket,而是先写到ChannelOutboundBuffer。当调用channel.flush的时候才真正的向socket写出。因为这中间有一个buffer,就存在速率匹配了,而且这个buffer还是无界的(链表),也就是你如果没有控制channel.write的速度,会有大量的数据在这个buffer里堆积,如果又碰到socket写不出数据的时候(isActive此时判断无效)或者写得慢的情况。

很有可能的结果就是资源耗尽,而且如果ChannelOutboundBuffer存放的是

DirectByteBuffer,这会让问题更加难排查。

流程可抽象如下:

从线上偶发的宕机事件看Netty流量控制

从上面的分析可以看出,步骤一写太快(快到处理不过来)或者下游发送不出数据都会造成问题,这实际是一个速率匹配问题。

Netty源码说明

超过高水位

当ChannelOutboundBuffer的容量超过高水位设定阈值后,isWritable()返回false,设置channel不可写(setUnwritable),并且触发fireChannelWritabilityChanged()。

  1. private void incrementPendingOutboundBytes(long size, boolean invokeLater) { 
  2.     if (size == 0) { 
  3.         return
  4.     } 
  5.  
  6.     long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size); 
  7.     if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) { 
  8.         setUnwritable(invokeLater); 
  9.     } 
  10. private void setUnwritable(boolean invokeLater) { 
  11.     for (;;) { 
  12.         final int oldValue = unwritable; 
  13.         final int newValue = oldValue | 1
  14.         if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) { 
  15.             if (oldValue == 0 && newValue != 0) { 
  16.                 fireChannelWritabilityChanged(invokeLater); 
  17.             } 
  18.             break
  19.         } 
  20.     } 

低于低水位

当ChannelOutboundBuffer的容量低于低水位设定阈值后,isWritable()返回true,设置channel可写,并且触发fireChannelWritabilityChanged()。

  1. private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) { 
  2.     if (size == 0) { 
  3.         return
  4.     } 
  5.  
  6.     long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size); 
  7.     if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) { 
  8.         setWritable(invokeLater); 
  9.     } 
  10. private void setWritable(boolean invokeLater) { 
  11.     for (;;) { 
  12.         final int oldValue = unwritable; 
  13.         final int newValue = oldValue & ~1
  14.         if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) { 
  15.             if (oldValue != 0 && newValue == 0) { 
  16.                 fireChannelWritabilityChanged(invokeLater); 
  17.             } 
  18.             break
  19.         } 
  20.     } 

总结

当ChannelOutboundBuffer的容量超过高水位设定阈值后,isWritable()返回false,表明消息产生堆积,需要降低写入速度。

当ChannelOutboundBuffer的容量低于低水位设定阈值后,isWritable()返回true,表明消息过少,需要提高写入速度。通过以上三个步骤修改后,部署线上观察半年未发生问题出现。

原文链接:https://mp.weixin.qq.com/s?__biz=MzI0NzEyODIyOA==&mid=2247487422&idx=1&sn=f9684b46fc38f347036c21b4fb3c3979&utm_source=tuicool&utm_medium=referral

延伸 · 阅读

精彩推荐