服务器之家:专注于服务器技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|JavaScript|易语言|

服务器之家 - 编程语言 - Java教程 - Java高进进阶之FastThreadLocal源码详解(修复ThreadLocal的缺陷)

Java高进进阶之FastThreadLocal源码详解(修复ThreadLocal的缺陷)

2021-09-14 01:07Android开发编程 Java教程

在Netty中,要使用 FastThreadLocal 实现线程本地变量需要将线程包装成 FastThreadLocalThread ,如果不是 FastThreadLocalThread ,会使用 slowThreadLocalMap的 ThreadLocal 来存储变量副本。

Java高进进阶之FastThreadLocal源码详解(修复ThreadLocal的缺陷)

前言

ThreadLocal被ThreadLocalMap中的entry的key弱引用,如果出现GC的情况时,

没有被其他对象引用,会被回收,但是ThreadLocal对应的value却不会回收,容易造成内存泄漏,这也间接导致了内存溢出以及数据假丢失;

那么问题来了,有没有更高效的ThreadLocal有;

今天我们就来分析一波FastThreadLocalThread

一、FastThreadLocalThread源码分析

Netty为了在某些场景下提高性能,改进了jdk ThreadLocal,Netty实现的FastThreadLocal 优化了Java 原生 ThreadLocal 的访问速度,存储速度。避免了检测弱引用带来的 value 回收难问题,和数组位置冲突带来的线性查找问题,解决这些问题并不是没有代价;

Netty实现的 FastThreadLocal 底层也是通过数组存储 value 对象,与Java原生ThreadLocal使用自身作为Entry的key不同,FastThreadLocal通过保存数组的全局唯一下标,实现了对value的快速访问。同时FastThreadLocal 也实现了清理对象的方法;

1、FastThreadLocalThread

在Netty中,要使用 FastThreadLocal 实现线程本地变量需要将线程包装成 FastThreadLocalThread ,如果不是 FastThreadLocalThread ,会使用 slowThreadLocalMap的 ThreadLocal 来存储变量副本;

  1. io.netty.util.concurrent.DefaultThreadFactory 
  2. @Override 
  3. public Thread newThread(Runnable r) { 
  4.     Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet()); 
  5.     // 一般daemon为false,意思是不设置为守护线程 
  6.     if (t.isDaemon() != daemon) { 
  7.         t.setDaemon(daemon); 
  8.     } 
  9.     // 优先级 默认为5 
  10.     if (t.getPriority() != priority) { 
  11.         t.setPriority(priority); 
  12.     } 
  13.     return t; 
  14. protected Thread newThread(Runnable r, String name) { 
  15.     return new FastThreadLocalThread(threadGroup, r, name); 

FastThreadLocalThread 继承自Thread类,有如下成员变量:

  1. io.netty.util.concurrent.FastThreadLocalThread 
  2. // 任务执行完,是否清除FastThreadLocal的标记 
  3. private final boolean cleanupFastThreadLocals; 
  4. // 类似于Thread类中ThreadLocalMap,为了实现FastThreadLocal 
  5. private InternalThreadLocalMap threadLocalMap; 

2、 InternalThreadLocalMap

Java高进进阶之FastThreadLocal源码详解(修复ThreadLocal的缺陷)

FastThreadLocalThread.threadLocalMap 是 InternalThreadLocalMap 对象实例。在第一次获取FTL数据时,会初始化FastThreadLocalThread.threadLocalMap,调用的构造函数如下:

  1. private InternalThreadLocalMap() { 
  2.             //为了简便,InternalThreadLocalMap父类 
  3.             //UnpaddedInternalThreadLocalMap不展开介绍 
  4.             super(newIndexedVariableTable()); 
  5.         } 
  6.         //默认的数组大小为32,且使用UNSET对象填充数组 
  7.         //如果下标处数据为UNSET,则表示没有数据 
  8.         private static Object[] newIndexedVariableTable() { 
  9.             Object[] array = new Object[32]; 
  10.             Arrays.fill(array, UNSET); 
  11.             return array; 
  12.         } 

为了避免写时候影响同一cpu缓冲行的其他数据并发访问,其使用了缓存行填充技术 (cpu 缓冲行填充),在类定义中声明了如下long字段进行填充;

  1. //InternalThreadLocalMap 
  2. // Cache line padding (must be public)  
  3. // With CompressedOops enabled, an instance of this class should occupy at least 128 bytes.  
  4. public long rp1, rp2, rp3, rp4, rp5, rp6, rp7, rp8, rp9; 

FTL使用的数组下标是InternalThreadLocalMap中的静态变量nextIndex统一递增生成的:

  1. static final AtomicInteger nextIndex = new AtomicInteger(); 
  2. public static int nextVariableIndex() { 
  3.     //Netty中所有FTL数组下标都是通过递增这个静态变量实现的 
  4.     //采用静态变量生成所有FTL元素在数组中的下标会造成一个问题, 
  5.     //会造成InternalThreadLocalMap中数组不必要的自动扩容 
  6.     int index = nextIndex.getAndIncrement(); 
  7.     if (index < 0) { 
  8.         nextIndex.decrementAndGet(); 
  9.         throw new IllegalStateException("too many thread-local indexed variables"); 
  10.     } 
  11.     return index

InternalThreadLocalMap.nextVariableIndex()方法获取FTL在该FastThreadLocalThread.threadLocalMap数组下标,因为InternalThreadLocalMap.nextVariableIndex() 使用静态域 nextIndex 递增维护所有FTL的下标,会造成后面实例化的 FTL 下标过大,如果FTL下标大于其对应 FastThreadLocalThread.threadLocalMap 数组的长度,会进行数组的自动扩容,如下:

  1. private void expandIndexedVariableTableAndSet(int index, Object value) { 
  2.     Object[] oldArray = indexedVariables; 
  3.     final int oldCapacity = oldArray.length; 
  4.     //下面复杂的实现是为了将newCapacity规范为最接近的一个2的指数,  
  5.     //这段代码在早期的 jdk HashMap 中见过 
  6.     int newCapacity = index
  7.     newCapacity |= newCapacity >>>  1; 
  8.     newCapacity |= newCapacity >>>  2; 
  9.     newCapacity |= newCapacity >>>  4; 
  10.     newCapacity |= newCapacity >>>  8; 
  11.     newCapacity |= newCapacity >>> 16; 
  12.     newCapacity ++; 
  13.     Object[] newArray = Arrays.copyOf(oldArray, newCapacity); 
  14.     Arrays.fill(newArray, oldCapacity, newArray.length, UNSET); 
  15.     newArray[index] = value; 
  16.     indexedVariables = newArray; 

3、FastThreadLocal

构造函数:

有两个重要的下标域,FTL不仅在FastThreadLocalThread.threadLocalMap中保存了用户实际使用的value(在数组中的下标为index),还在数组中保存为了实现清理记录的相关数据,也即下标variablesToRemoveIndex,一般情况 variablesToRemoveIndex = 0;因为variablesToRemoveIndex 是静态变量,所以全局唯一;

  1. //如果在该FTL中放入了数据,也就实际调用了其set或get函数,会在 
  2.        //该FastThreadLocalThread.threadLocalMap数组的 
  3.        // variablesToRemoveIndex下标处放置一个IdentityHashMap, 
  4.        //并将该FTL放入IdentityHashMap中,在后续清理时会取出 
  5.        //variablesToRemoveIndex下标处的IdentityHashMap进行清理 
  6.         private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex(); 
  7.        //在threadLocalMap数组中存放实际数据的下标 
  8.         private final int index
  9.         public FastThreadLocal() { 
  10.             index = InternalThreadLocalMap.nextVariableIndex(); 
  11.         } 

用户可扩展的函数:

  1. //初始化 value 函数 
  2. protected V initialValue() throws Exception { 
  3.     return null
  4. //让使用者在该FTL被移除时可以有机会做些操作。 
  5. protected void onRemoval(@SuppressWarnings("UnusedParameters") V value) throws Exception { } 

FastThreadLocalThread

Java高进进阶之FastThreadLocal源码详解(修复ThreadLocal的缺陷)

cleanupFastThreadLocals 字段在 4.1 的最新版本中已经没有在用到了

  1. /** 
  2.  * true,表示FTL会在线程结束时被主动清理 见  FastThreadLocalRunnable 类 
  3.  * false,需要将FTL放入后台清理线程的队列中 
  4.  */ 
  5. // This will be set to true if we have a chance to wrap the Runnable. 
  6. //这个字段则用于标识该线程在结束时是否会主动清理FTL 
  7. private final boolean cleanupFastThreadLocals; 
  8. //次对象将在 第一次 FastThreadLocal.get 和 FastThreadLocal.set 时候创建 
  9. private InternalThreadLocalMap threadLocalMap; 
  10. public FastThreadLocalThread(Runnable target) { 
  11.     super(FastThreadLocalRunnable.wrap(target)); 
  12.     cleanupFastThreadLocals = true

4、 set 方法

  1. public final void set(V value) { 
  2.     //判断设置的 value 值是否是缺省值 
  3.     if (value != InternalThreadLocalMap.UNSET) { 
  4.         //获取当前线程的 InternalThreadLocalMap , 如果当前线程为FastThreadLocalThread,那么直接通过threadLocalMap引用获取 
  5.         //否则通过 jdk 原生的 threadLocal 获取 
  6.         InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); 
  7.         //FastThreadLocal 对应的 index 下标的 value 替换成新的 value 
  8.         setKnownNotUnset(threadLocalMap, value); 
  9.     } else { 
  10.         //如果放置的对象为UNSET,则表示清理,会对该FTL进行清理,类似毒丸对象 
  11.         remove(); 
  12.     } 

这里扩容方会调用 InternalThreadLocalMap.expandIndexedVariableTableAndSet

  1. private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) { 
  2.     //在数组下标index处放置实际对象,如果index大于数组length,会进行数组扩容. 
  3.     if (threadLocalMap.setIndexedVariable(index, value)) { 
  4.         //放置成功之后,将该FTL加入到 variablesToRemoveIndex 下标的 
  5.         //IdentityHashMap,等待后续清理 
  6.         addToVariablesToRemove(threadLocalMap, this); 
  7.     } 
  1. /** 
  2.  * 该FTL加入到variablesToRemoveIndex下标的IdentityHashMap 
  3.  * IdentityHashMap的特性可以保证同一个实例不会被多次加入到该位置 
  4.  */ 
  5. @SuppressWarnings("unchecked"
  6. private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) { 
  7.     //获取 variablesToRemoveIndex下标处的 IdentityHashMap 
  8.     Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex); 
  9.     Set<FastThreadLocal<?>> variablesToRemove; 
  10.     //如果是第一次获取,则 variablesToRemoveIndex下标处的值为 UNSET 
  11.     if (v == InternalThreadLocalMap.UNSET || v == null) { 
  12.         //新建一个新的 IdentityHashMap 并 
  13.         variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>()); 
  14.         //放入到下标variablesToRemoveIndex处 
  15.         threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove); 
  16.     } else { 
  17.         variablesToRemove = (Set<FastThreadLocal<?>>) v; 
  18.     } 
  19.     //将该FTL放入该IdentityHashMap中 
  20.     variablesToRemove.add(variable); 

下面看InternalThreadLocalMap.get()实现:

  1. public static InternalThreadLocalMap get() { 
  2.     Thread thread = Thread.currentThread(); 
  3.     //首先看当前 thread 是否为FastThreadLocalThread实例 
  4.     //如果是的话,可以快速通过引用,获取到其 threadLocalMap 
  5.     if (thread instanceof FastThreadLocalThread) { 
  6.         return fastGet((FastThreadLocalThread) thread); 
  7.     } else { 
  8.         //如果不是,则 jdk 原生慢速获取到其 threadLocalMap 
  9.         return slowGet(); 
  10.     } 

5、 get 方法

get方法极为简单,实现如下:

  1. ===========================FastThreadLocal========================== 
  2. public final V get() { 
  3.         InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); 
  4.         Object v = threadLocalMap.indexedVariable(index); 
  5.         if (v != InternalThreadLocalMap.UNSET) { 
  6.             return (V) v; 
  7.         } 
  8.         return initialize(threadLocalMap); 
  9.     } 

首先获取当前线程的map,然后根据 FastThreadLocal的index 获取value,然后返回,如果是空对象,则通过 initialize 返回,initialize 方法会将返回值设置到 map 的槽位中,并放进 Set 中;

  1. initialize 
  2. ============================FastThreadLocal========================== 
  3. private V initialize(InternalThreadLocalMap threadLocalMap) { 
  4.     V v = null
  5.     try { 
  6.         //1、获取初始值 
  7.         v = initialValue(); 
  8.     } catch (Exception e) { 
  9.         throw new RuntimeException(e); 
  10.     } 
  11.     // 2、设置value到InternalThreadLocalMap中 
  12.     threadLocalMap.setIndexedVariables(index, v); 
  13.     // 3、添加当前的FastThreadLocal到InternalThreadLocalMap的Set<FastThreadLocal<?>>中 
  14.     addToVariablesToRemove(threadLocalMap, this); 
  15.     return v; 
  16. //初始化参数:由子类复写 
  17. protected V initialValue() throws Exception { 
  18.     return null
  • 获取 ThreadLocalMap
  • 直接通过索引取出对象
  • 如果为空那么调用初始化方法初始化

6、ftl的资源回收机制

netty中ftl的两种回收机制回收机制:

自动:使用ftlt执行一个被FastThreadLocalRunnable wrap的Runnable任务,在任务执行完毕后会自动进行ftl的清理;

手动:ftl和InternalThreadLocalMap都提供了remove方法,在合适的时候用户可以(有的时候也是必须,例如普通线程的线程池使用ftl)手动进行调用,进行显示删除;

  1. FastThreadLocalRunnable 
  2. final class FastThreadLocalRunnable implements Runnable { 
  3.     private final Runnable runnable; 
  4.     @Override 
  5.     public void run() { 
  6.         try { 
  7.             runnable.run(); 
  8.         } finally { 
  9.             FastThreadLocal.removeAll(); 
  10.         } 
  11.     } 
  12.     static Runnable wrap(Runnable runnable) { 
  13.         return runnable instanceof FastThreadLocalRunnable  
  14.                 ? runnable : new FastThreadLocalRunnable(runnable); 
  15.     } 

如果将线程执行的任务包装成 FastThreadLocalRunnable,那么在任务执行完后自动删除ftl的资源。

  1. ===============================FastThreadLocal=========================== 
  2. public static void removeAll() { 
  3.     // 获取到map 
  4.     InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet(); 
  5.     if (threadLocalMap == null) { 
  6.         return
  7.     } 
  8.     try { 
  9.         // 获取到Set<FastThreadLocal>集合 
  10.         Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex); 
  11.         if (v != null && v != InternalThreadLocalMap.UNSET) { 
  12.             @SuppressWarnings("unchecked"
  13.             Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v; 
  14.             // 将Set转换为数组 
  15.             FastThreadLocal<?>[] variablesToRemoveArray = 
  16.                     variablesToRemove.toArray(new FastThreadLocal[variablesToRemove.size()]); 
  17.             // 遍历数组,删除每一个FastThreadLocal对应的value 
  18.             for (FastThreadLocal<?> tlv: variablesToRemoveArray) { 
  19.                 tlv.remove(threadLocalMap); 
  20.             } 
  21.         } 
  22.     } finally { 
  23.         // 删除当前线程的InternalThreadLocalMap 
  24.         InternalThreadLocalMap.remove(); 
  25.     } 
  26. public static void remove() { 
  27.     Thread thread = Thread.currentThread(); 
  28.     if (thread instanceof FastThreadLocalThread) { 
  29.          // 将FastThreadLocalThread 内部的map置位null 
  30.         ((FastThreadLocalThread) thread).setThreadLocalMap(null); 
  31.     } else { 
  32.         // 将 ThreadLocal内部ThreadLocalMap 中的value置位null 
  33.         slowThreadLocalMap.remove(); 
  34.     } 

remove方法:

  1. ===============================FastThreadLocal========================== 
  2. private void remove() { 
  3.     remove(InternalThreadLocalMap.getIfSet()); 
  4. private void remove(InternalThreadLocalMap threadLocalMap) { 
  5.     if (threadLocalMap == null) { 
  6.         return
  7.     } 
  8.     // 从 InternalThreadLocalMap 中删除当前的FastThreadLocal对应的value并设UNSET 
  9.     Object v = threadLocalMap.removeIndexedVariable(index); 
  10.     // 从 InternalThreadLocalMap 中的Set<FastThreadLocal<?>>中删除当前的FastThreadLocal对象 
  11.     removeFromVariablesToRemove(threadLocalMap, this); 
  12.     // 如果删除的是有效值,则进行onRemove方法的回调 
  13.     if (v != InternalThreadLocalMap.UNSET) { 
  14.         try { 
  15.             // 回调子类复写的onRemoved方法,默认为空实现 
  16.             onRemoved((V) v); 
  17.         } catch (Exception e) { 
  18.             throw new RuntimeException(e); 
  19.         } 
  20.     } 

总结

只有不断的学习,不断的找到自己的缺点才可以进步;

一起加油;

原文地址:https://mp.weixin.qq.com/s/WNfFAgnXHyBSbCGwo1pyJQ

延伸 · 阅读

精彩推荐