服务器之家:专注于服务器技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|Java教程|ASP教程|编程技术|正则表达式|C/C++|IOS|C#|Swift|Android|VB|R语言|JavaScript|易语言|vb.net|

服务器之家 - 编程语言 - 编程技术 - 基于Netty的代理网关设计与实现

基于Netty的代理网关设计与实现

2021-11-24 22:04阿里技术 编程技术

本文的技术路线。在实现代理网关之前,首先介绍下代理相关的原理及如何实现。

基于Netty的代理网关设计与实现

一、问题背景

平台端购置一批裸代理,来做广告异地展现审核。从外部购置的代理,使用方式为:

  • 通过给定的HTTP 的 API 提取代理 IP:PORT,返回的结果会给出代理的有效时长 3~5 分钟,以及代理所属地域;
  • 从提取的代理中,选取指定地域,添加认证信息,请求获取结果;

本文设计实现一个通过的代理网关:

  • 管理维护代理资源,并做代理的认证鉴权;
  • 对外暴露统一的代理入口,而非动态变化的代理IP:PORT;
  • 流量过滤及限流,比如:静态资源不走代理;

本文重点在代理网关本身的设计与实现,而非代理资源的管理与维护。

注:本文包含大量可执行的JAVA代码以解释代理相关的原理

二、技术路线

本文的技术路线。在现代理网关之前,首先介绍下代理相关的原理及如何实现

  • 透明代理;
  • 非透明代理;
  • 透明的上游代理;
  • 非透明的上游代理;

最后,本文要构建代理网关,本质上就是一个非透明的上游代理,并给出详细的设计与实现。

1.透明代理

透明代理是代理网关的基础,本文采用JAVA原生的NIO进行详细介绍。在实现代理网关时,实际使用的为NETTY框架。原生NIO的实现对理解NETTY的实现有帮助。

透明代理设计三个交互方,客户端、代理服务、服务端,其原理是:

基于Netty的代理网关设计与实现

  • 代理服务在收到连接请求时,判定:如果是CONNECT请求,需要回应代理连接成功消息到客户端;
  • CONNECT请求回应结束后,代理服务需要连接到CONNECT指定的远程服务器,然后直接转发客户端和远程服务通信;
  • 代理服务在收到非CONNECT请求时,需要解析出请求的远程服务器,然后直接转发客户端和远程服务通信;

需要注意的点是:

  • 通常HTTPS请求,在通过代理前,会发送CONNECT请求;连接成功后,会在信道上进行加密通信的握手协议;因此连接远程的时机是在CONNECT请求收到时,因为此后是加密数据;
  • 透明代理在收到CONNECT请求时,不需要传递到远程服务(远程服务不识别此请求);
  • 透明代理在收到非CONNECT请求时,要无条件转发;

完整的透明代理的实现不到约300行代码,完整摘录如下:

  1. @Slf4j
  2. public class SimpleTransProxy {
  3. public static void main(String[] args) throws IOException {
  4. int port = 8006;
  5. ServerSocketChannel localServer = ServerSocketChannel.open();
  6. localServer.bind(new InetSocketAddress(port));
  7. Reactor reactor = new Reactor();
  8. // REACTOR线程
  9. GlobalThreadPool.REACTOR_EXECUTOR.submit(reactor::run);
  10. // WORKER单线程调试
  11. while (localServer.isOpen()) {
  12. // 此处阻塞等待连接
  13. SocketChannel remoteClient = localServer.accept();
  14. // 工作线程
  15. GlobalThreadPool.WORK_EXECUTOR.submit(new Runnable() {
  16. @SneakyThrows
  17. @Override
  18. public void run() {
  19. // 代理到远程
  20. SocketChannel remoteServer = new ProxyHandler().proxy(remoteClient);
  21. // 透明传输
  22. reactor.pipe(remoteClient, remoteServer)
  23. .pipe(remoteServer, remoteClient);
  24. }
  25. });
  26. }
  27. }
  28. }
  29. @Data
  30. class ProxyHandler {
  31. private String method;
  32. private String host;
  33. private int port;
  34. private SocketChannel remoteServer;
  35. private SocketChannel remoteClient;
  36. /**
  37. * 原始信息
  38. */
  39. private List buffers = new ArrayList<>();
  40. private StringBuilder stringBuilder = new StringBuilder();
  41. /**
  42. * 连接到远程
  43. * @param remoteClient
  44. * @return
  45. * @throws IOException
  46. */
  47. public SocketChannel proxy(SocketChannel remoteClient) throws IOException {
  48. this.remoteClient = remoteClient;
  49. connect();
  50. return this.remoteServer;
  51. }
  52. public void connect() throws IOException {
  53. // 解析METHOD, HOST和PORT
  54. beforeConnected();
  55. // 链接REMOTE SERVER
  56. createRemoteServer();
  57. // CONNECT请求回应,其他请求WRITE THROUGH
  58. afterConnected();
  59. }
  60. protected void beforeConnected() throws IOException {
  61. // 读取HEADER
  62. readAllHeader();
  63. // 解析HOST和PORT
  64. parseRemoteHostAndPort();
  65. }
  66. /**
  67. * 创建远程连接
  68. * @throws IOException
  69. */
  70. protected void createRemoteServer() throws IOException {
  71. remoteServer = SocketChannel.open(new InetSocketAddress(host, port));
  72. }
  73. /**
  74. * 连接建立后预处理
  75. * @throws IOException
  76. */
  77. protected void afterConnected() throws IOException {
  78. // 当CONNECT请求时,默认写入200到CLIENT
  79. if ("CONNECT".equalsIgnoreCase(method)) {
  80. // CONNECT默认为443端口,根据HOST再解析
  81. remoteClient.write(ByteBuffer.wrap("HTTP/1.0 200 Connection Established\r\nProxy-agent: nginx\r\n\r\n".getBytes()));
  82. } else {
  83. writeThrouth();
  84. }
  85. }
  86. protected void writeThrouth() {
  87. buffers.forEach(byteBuffer -> {
  88. try {
  89. remoteServer.write(byteBuffer);
  90. } catch (IOException e) {
  91. e.printStackTrace();
  92. }
  93. });
  94. }
  95. /**
  96. * 读取请求内容
  97. * @throws IOException
  98. */
  99. protected void readAllHeader() throws IOException {
  100. while (true) {
  101. ByteBuffer clientBuffer = newByteBuffer();
  102. int read = remoteClient.read(clientBuffer);
  103. clientBuffer.flip();
  104. appendClientBuffer(clientBuffer);
  105. if (read < clientBuffer.capacity()) {
  106. break;
  107. }
  108. }
  109. }
  110. /**
  111. * 解析出HOST和PROT
  112. * @throws IOException
  113. */
  114. protected void parseRemoteHostAndPort() throws IOException {
  115. // 读取第一批,获取到METHOD
  116. method = parseRequestMethod(stringBuilder.toString());
  117. // 默认为80端口,根据HOST再解析
  118. port = 80;
  119. if ("CONNECT".equalsIgnoreCase(method)) {
  120. port = 443;
  121. }
  122. this.host = parseHost(stringBuilder.toString());
  123. URI remoteServerURI = URI.create(host);
  124. host = remoteServerURI.getHost();
  125. if (remoteServerURI.getPort() > 0) {
  126. port = remoteServerURI.getPort();
  127. }
  128. }
  129. protected void appendClientBuffer(ByteBuffer clientBuffer) {
  130. buffers.add(clientBuffer);
  131. stringBuilder.append(new String(clientBuffer.array(), clientBuffer.position(), clientBuffer.limit()));
  132. }
  133. protected static ByteBuffer newByteBuffer() {
  134. // buffer必须大于7,保证能读到method
  135. return ByteBuffer.allocate(128);
  136. }
  137. private static String parseRequestMethod(String rawContent) {
  138. // create uri
  139. return rawContent.split("\r\n")[0].split(" ")[0];
  140. }
  141. private static String parseHost(String rawContent) {
  142. String[] headers = rawContent.split("\r\n");
  143. String host = "host:";
  144. for (String header : headers) {
  145. if (header.length() > host.length()) {
  146. String key = header.substring(0, host.length());
  147. String value = header.substring(host.length()).trim();
  148. if (host.equalsIgnoreCase(key)) {
  149. if (!value.startsWith("http://") && !value.startsWith("https://")) {
  150. value = "http://" + value;
  151. }
  152. return value;
  153. }
  154. }
  155. }
  156. return "";
  157. }
  158. }
  159. @Slf4j
  160. @Data
  161. class Reactor {
  162. private Selector selector;
  163. private volatile boolean finish = false;
  164. @SneakyThrows
  165. public Reactor() {
  166. selector = Selector.open();
  167. }
  168. @SneakyThrows
  169. public Reactor pipe(SocketChannel from, SocketChannel to) {
  170. from.configureBlocking(false);
  171. from.register(selector, SelectionKey.OP_READ, new SocketPipe(this, from, to));
  172. return this;
  173. }
  174. @SneakyThrows
  175. public void run() {
  176. try {
  177. while (!finish) {
  178. if (selector.selectNow() > 0) {
  179. Iterator it = selector.selectedKeys().iterator();
  180. while (it.hasNext()) {
  181. SelectionKey selectionKey = it.next();
  182. if (selectionKey.isValid() && selectionKey.isReadable()) {
  183. ((SocketPipe) selectionKey.attachment()).pipe();
  184. }
  185. it.remove();
  186. }
  187. }
  188. }
  189. } finally {
  190. close();
  191. }
  192. }
  193. @SneakyThrows
  194. public synchronized void close() {
  195. if (finish) {
  196. return;
  197. }
  198. finish = true;
  199. if (!selector.isOpen()) {
  200. return;
  201. }
  202. for (SelectionKey key : selector.keys()) {
  203. closeChannel(key.channel());
  204. key.cancel();
  205. }
  206. if (selector != null) {
  207. selector.close();
  208. }
  209. }
  210. public void cancel(SelectableChannel channel) {
  211. SelectionKey key = channel.keyFor(selector);
  212. if (Objects.isNull(key)) {
  213. return;
  214. }
  215. key.cancel();
  216. }
  217. @SneakyThrows
  218. public void closeChannel(Channel channel) {
  219. SocketChannel socketChannel = (SocketChannel)channel;
  220. if (socketChannel.isConnected() && socketChannel.isOpen()) {
  221. socketChannel.shutdownOutput();
  222. socketChannel.shutdownInput();
  223. }
  224. socketChannel.close();
  225. }
  226. }
  227. @Data
  228. @AllArgsConstructor
  229. class SocketPipe {
  230. private Reactor reactor;
  231. private SocketChannel from;
  232. private SocketChannel to;
  233. @SneakyThrows
  234. public void pipe() {
  235. // 取消监听
  236. clearInterestOps();
  237. GlobalThreadPool.PIPE_EXECUTOR.submit(new Runnable() {
  238. @SneakyThrows
  239. @Override
  240. public void run() {
  241. int totalBytesRead = 0;
  242. ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
  243. while (valid(from) && valid(to)) {
  244. byteBuffer.clear();
  245. int bytesRead = from.read(byteBuffer);
  246. totalBytesRead = totalBytesRead + bytesRead;
  247. byteBuffer.flip();
  248. to.write(byteBuffer);
  249. if (bytesRead < byteBuffer.capacity()) {
  250. break;
  251. }
  252. }
  253. if (totalBytesRead < 0) {
  254. reactor.closeChannel(from);
  255. reactor.cancel(from);
  256. } else {
  257. // 重置监听
  258. resetInterestOps();
  259. }
  260. }
  261. });
  262. }
  263. protected void clearInterestOps() {
  264. from.keyFor(reactor.getSelector()).interestOps(0);
  265. to.keyFor(reactor.getSelector()).interestOps(0);
  266. }
  267. protected void resetInterestOps() {
  268. from.keyFor(reactor.getSelector()).interestOps(SelectionKey.OP_READ);
  269. to.keyFor(reactor.getSelector()).interestOps(SelectionKey.OP_READ);
  270. }
  271. private boolean valid(SocketChannel channel) {
  272. return channel.isConnected() && channel.isRegistered() && channel.isOpen();
  273. }
  274. }

以上,借鉴NETTY:

  1. 首先初始化REACTOR线程,然后开启代理监听,当收到代理请求时处理。
  2. 代理服务在收到代理请求时,首先做代理的预处理,然后又SocketPipe做客户端和远程服务端双向转发。
  3. 代理预处理,首先读取第一个HTTP请求,解析出METHOD, HOST, PORT。
  4. 如果是CONNECT请求,发送回应Connection Established,然后连接远程服务端,并返回SocketChannel
  5. 如果是非CONNECT请求,连接远程服务端,写入原始请求,并返回SocketChannel
  6. SocketPipe在客户端和远程服务端,做双向的转发;其本身是将客户端和服务端的SocketChannel注册到REACTOR
  7. REACTOR在监测到READABLE的CHANNEL,派发给SocketPipe做双向转发。

测试

代理的测试比较简单,指向代码后,代理服务监听8006端口,此时:

  1. curl -x 'localhost:8006' http://httpbin.org/get测试HTTP请求
  2. curl -x 'localhost:8006' https://httpbin.org/get测试HTTPS请求

注意,此时代理服务代理了HTTPS请求,但是并不需要-k选项,指示非安全的代理。因为代理服务本身并没有作为一个中间人,并没有解析出客户端和远程服务端通信的内容。在非透明代理时,需要解决这个问题。

2.非透明代理

非透明代理,需要解析出客户端和远程服务端传输的内容,并做相应的处理。

当传输为HTTP协议时,SocketPipe传输的数据即为明文的数据,可以拦截后直接做处理。

当传输为HTTPS协议时,SocketPipe传输的有效数据为加密数据,并不能透明处理。

另外,无论是传输的HTTP协议还是HTTPS协议,SocketPipe读到的都为非完整的数据,需要做聚批的处理。

SocketPipe聚批问题,可以采用类似BufferedInputStream对InputStream做Decorate的模式来实现,相对比较简单;详细可以参考NETTY的HttpObjectAggregator;

HTTPS原始请求和结果数据的加密和解密的处理,需要实现的NIO的SOCKET CHANNEL;

SslSocketChannel封装原理

考虑到目前JDK自带的NIO的SocketChannel并不支持SSL;已有的SSLSocket是阻塞的OIO。如图:

基于Netty的代理网关设计与实现

可以看出

  • 每次入站数据和出站数据都需要 SSL SESSION 做握手;
  • 入站数据做解密,出站数据做加密;
  • 握手,数据加密和数据解密是统一的一套状态机;

基于Netty的代理网关设计与实现

以下,代码实现 SslSocketChannel

  1. public class SslSocketChannel {
  2. /**
  3. * 握手加解密需要的四个存储
  4. */
  5. protected ByteBuffer myAppData; // 明文
  6. protected ByteBuffer myNetData; // 密文
  7. protected ByteBuffer peerAppData; // 明文
  8. protected ByteBuffer peerNetData; // 密文
  9. /**
  10. * 握手加解密过程中用到的异步执行器
  11. */
  12. protected ExecutorService executor = Executors.newSingleThreadExecutor();
  13. /**
  14. * 原NIO 的 CHANNEL
  15. */
  16. protected SocketChannel socketChannel;
  17. /**
  18. * SSL 引擎
  19. */
  20. protected SSLEngine engine;
  21. public SslSocketChannel(SSLContext context, SocketChannel socketChannel, boolean clientMode) throws Exception {
  22. // 原始的NIO SOCKET
  23. this.socketChannel = socketChannel;
  24. // 初始化BUFFER
  25. SSLSession dummySession = context.createSSLEngine().getSession();
  26. myAppData = ByteBuffer.allocate(dummySession.getApplicationBufferSize());
  27. myNetData = ByteBuffer.allocate(dummySession.getPacketBufferSize());
  28. peerAppData = ByteBuffer.allocate(dummySession.getApplicationBufferSize());
  29. peerNetData = ByteBuffer.allocate(dummySession.getPacketBufferSize());
  30. dummySession.invalidate();
  31. engine = context.createSSLEngine();
  32. engine.setUseClientMode(clientMode);
  33. engine.beginHandshake();
  34. }
  35. /**
  36. * 参考 https://docs.oracle.com/javase/8/docs/technotes/guides/security/jsse/JSSERefGuide.html
  37. * 实现的 SSL 的握手协议
  38. * @return
  39. * @throws IOException
  40. */
  41. protected boolean doHandshake() throws IOException {
  42. SSLEngineResult result;
  43. HandshakeStatus handshakeStatus;
  44. int appBufferSize = engine.getSession().getApplicationBufferSize();
  45. ByteBuffer myAppData = ByteBuffer.allocate(appBufferSize);
  46. ByteBuffer peerAppData = ByteBuffer.allocate(appBufferSize);
  47. myNetData.clear();
  48. peerNetData.clear();
  49. handshakeStatus = engine.getHandshakeStatus();
  50. while (handshakeStatus != HandshakeStatus.FINISHED && handshakeStatus != HandshakeStatus.NOT_HANDSHAKING) {
  51. switch (handshakeStatus) {
  52. case NEED_UNWRAP:
  53. if (socketChannel.read(peerNetData) < 0) {
  54. if (engine.isInboundDone() && engine.isOutboundDone()) {
  55. return false;
  56. }
  57. try {
  58. engine.closeInbound();
  59. } catch (SSLException e) {
  60. log.debug("收到END OF STREAM,关闭连接.", e);
  61. }
  62. engine.closeOutbound();
  63. handshakeStatus = engine.getHandshakeStatus();
  64. break;
  65. }
  66. peerNetData.flip();
  67. try {
  68. result = engine.unwrap(peerNetData, peerAppData);
  69. peerNetData.compact();
  70. handshakeStatus = result.getHandshakeStatus();
  71. } catch (SSLException sslException) {
  72. engine.closeOutbound();
  73. handshakeStatus = engine.getHandshakeStatus();
  74. break;
  75. }
  76. switch (result.getStatus()) {
  77. case OK:
  78. break;
  79. case BUFFER_OVERFLOW:
  80. peerAppData = enlargeApplicationBuffer(engine, peerAppData);
  81. break;
  82. case BUFFER_UNDERFLOW:
  83. peerNetData = handleBufferUnderflow(engine, peerNetData);
  84. break;
  85. case CLOSED:
  86. if (engine.isOutboundDone()) {
  87. return false;
  88. } else {
  89. engine.closeOutbound();
  90. handshakeStatus = engine.getHandshakeStatus();
  91. break;
  92. }
  93. default:
  94. throw new IllegalStateException("无效的握手状态: " + result.getStatus());
  95. }
  96. break;
  97. case NEED_WRAP:
  98. myNetData.clear();
  99. try {
  100. result = engine.wrap(myAppData, myNetData);
  101. handshakeStatus = result.getHandshakeStatus();
  102. } catch (SSLException sslException) {
  103. engine.closeOutbound();
  104. handshakeStatus = engine.getHandshakeStatus();
  105. break;
  106. }
  107. switch (result.getStatus()) {
  108. case OK :
  109. myNetData.flip();
  110. while (myNetData.hasRemaining()) {
  111. socketChannel.write(myNetData);
  112. }
  113. break;
  114. case BUFFER_OVERFLOW:
  115. myNetData = enlargePacketBuffer(engine, myNetData);
  116. break;
  117. case BUFFER_UNDERFLOW:
  118. throw new SSLException("加密后消息内容为空,报错");
  119. case CLOSED:
  120. try {
  121. myNetData.flip();
  122. while (myNetData.hasRemaining()) {
  123. socketChannel.write(myNetData);
  124. }
  125. peerNetData.clear();
  126. } catch (Exception e) {
  127. handshakeStatus = engine.getHandshakeStatus();
  128. }
  129. break;
  130. default:
  131. throw new IllegalStateException("无效的握手状态: " + result.getStatus());
  132. }
  133. break;
  134. case NEED_TASK:
  135. Runnable task;
  136. while ((task = engine.getDelegatedTask()) != null) {
  137. executor.execute(task);
  138. }
  139. handshakeStatus = engine.getHandshakeStatus();
  140. break;
  141. case FINISHED:
  142. break;
  143. case NOT_HANDSHAKING:
  144. break;
  145. default:
  146. throw new IllegalStateException("无效的握手状态: " + handshakeStatus);
  147. }
  148. }
  149. return true;
  150. }
  151. /**
  152. * 参考 https://docs.oracle.com/javase/8/docs/technotes/guides/security/jsse/JSSERefGuide.html
  153. * 实现的 SSL 的传输读取协议
  154. * @param consumer
  155. * @throws IOException
  156. */
  157. public void read(Consumer consumer) throws IOException {
  158. // BUFFER初始化
  159. peerNetData.clear();
  160. int bytesRead = socketChannel.read(peerNetData);
  161. if (bytesRead > 0) {
  162. peerNetData.flip();
  163. while (peerNetData.hasRemaining()) {
  164. peerAppData.clear();
  165. SSLEngineResult result = engine.unwrap(peerNetData, peerAppData);
  166. switch (result.getStatus()) {
  167. case OK:
  168. log.debug("收到远程的返回结果消息为:" + new String(peerAppData.array(), 0, peerAppData.position()));
  169. consumer.accept(peerAppData);
  170. peerAppData.flip();
  171. break;
  172. case BUFFER_OVERFLOW:
  173. peerAppData = enlargeApplicationBuffer(engine, peerAppData);
  174. break;
  175. case BUFFER_UNDERFLOW:
  176. peerNetData = handleBufferUnderflow(engine, peerNetData);
  177. break;
  178. case CLOSED:
  179. log.debug("收到远程连接关闭消息.");
  180. closeConnection();
  181. return;
  182. default:
  183. throw new IllegalStateException("无效的握手状态: " + result.getStatus());
  184. }
  185. }
  186. } else if (bytesRead < 0) {
  187. log.debug("收到END OF STREAM,关闭连接.");
  188. handleEndOfStream();
  189. }
  190. }
  191. public void write(String message) throws IOException {
  192. write(ByteBuffer.wrap(message.getBytes()));
  193. }
  194. /**
  195. * 参考 https://docs.oracle.com/javase/8/docs/technotes/guides/security/jsse/JSSERefGuide.html
  196. * 实现的 SSL 的传输写入协议
  197. * @param message
  198. * @throws IOException
  199. */
  200. public void write(ByteBuffer message) throws IOException {
  201. myAppData.clear();
  202. myAppData.put(message);
  203. myAppData.flip();
  204. while (myAppData.hasRemaining()) {
  205. myNetData.clear();
  206. SSLEngineResult result = engine.wrap(myAppData, myNetData);
  207. switch (result.getStatus()) {
  208. case OK:
  209. myNetData.flip();
  210. while (myNetData.hasRemaining()) {
  211. socketChannel.write(myNetData);
  212. }
  213. log.debug("写入远程的消息为: {}", message);
  214. break;
  215. case BUFFER_OVERFLOW:
  216. myNetData = enlargePacketBuffer(engine, myNetData);
  217. break;
  218. case BUFFER_UNDERFLOW:
  219. throw new SSLException("加密后消息内容为空.");
  220. case CLOSED:
  221. closeConnection();
  222. return;
  223. default:
  224. throw new IllegalStateException("无效的握手状态: " + result.getStatus());
  225. }
  226. }
  227. }
  228. /**
  229. * 关闭连接
  230. * @throws IOException
  231. */
  232. public void closeConnection() throws IOException {
  233. engine.closeOutbound();
  234. doHandshake();
  235. socketChannel.close();
  236. executor.shutdown();
  237. }
  238. /**
  239. * END OF STREAM(-1)默认是关闭连接
  240. * @throws IOException
  241. */
  242. protected void handleEndOfStream() throws IOException {
  243. try {
  244. engine.closeInbound();
  245. } catch (Exception e) {
  246. log.error("END OF STREAM 关闭失败.", e);
  247. }
  248. closeConnection();
  249. }
  250. }

以上:

  • 基于 SSL 协议,实现统一的握手动作;
  • 分别实现读取的解密,和写入的加密方法;
  • 将 SslSocketChannel 实现为 SocketChannel的Decorator;

SslSocketChannel测试服务端

基于以上封装,简单测试服务端如下:

  1. @Slf4j
  2. public class NioSslServer {
  3. public static void main(String[] args) throws Exception {
  4. NioSslServer sslServer = new NioSslServer("127.0.0.1", 8006);
  5. sslServer.start()
  6. // 使用 curl -vv -k 'https://localhost:8006' 连接
  7. }
  8. private SSLContext context;
  9. private Selector selector;
  10. public NioSslServer(String hostAddress, int port) throws Exception {
  11. // 初始化SSL Context
  12. context = serverSSLContext();
  13. // 注册监听器
  14. selector = SelectorProvider.provider().openSelector();
  15. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
  16. serverSocketChannel.configureBlocking(false);
  17. serverSocketChannel.socket().bind(new InetSocketAddress(hostAddress, port));
  18. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
  19. }
  20. public void start() throws Exception {
  21. log.debug("等待连接中.");
  22. while (true) {
  23. selector.select();
  24. Iterator selectedKeys = selector.selectedKeys().iterator();
  25. while (selectedKeys.hasNext()) {
  26. SelectionKey key = selectedKeys.next();
  27. selectedKeys.remove();
  28. if (!key.isValid()) {
  29. continue;
  30. }
  31. if (key.isAcceptable()) {
  32. accept(key);
  33. } else if (key.isReadable()) {
  34. ((SslSocketChannel)key.attachment()).read(buf->{});
  35. // 直接回应一个OK
  36. ((SslSocketChannel)key.attachment()).write("HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\n\r\nOK\r\n\r\n");
  37. ((SslSocketChannel)key.attachment()).closeConnection();
  38. }
  39. }
  40. }
  41. }
  42. private void accept(SelectionKey key) throws Exception {
  43. log.debug("接收新的请求.");
  44. SocketChannel socketChannel = ((ServerSocketChannel)key.channel()).accept();
  45. socketChannel.configureBlocking(false);
  46. SslSocketChannel sslSocketChannel = new SslSocketChannel(context, socketChannel, false);
  47. if (sslSocketChannel.doHandshake()) {
  48. socketChannel.register(selector, SelectionKey.OP_READ, sslSocketChannel);
  49. } else {
  50. socketChannel.close();
  51. log.debug("握手失败,关闭连接.");
  52. }
  53. }
  54. }

以上:

  • 基于 SSL 协议,实现统一的握手动作;
  • 分别实现读取的解密,和写入的加密方法;
  • 将 SslSocketChannel 实现为 SocketChannel的Decorator;

SslSocketChannel测试服务端

基于以上封装,简单测试服务端如下:

  1. @Slf4j
  2. public class NioSslServer {
  3. public static void main(String[] args) throws Exception {
  4. NioSslServer sslServer = new NioSslServer("127.0.0.1", 8006);
  5. sslServer.start();
  6. // 使用 curl -vv -k 'https://localhost:8006' 连接
  7. }
  8. private SSLContext context;
  9. private Selector selector;
  10. public NioSslServer(String hostAddress, int port) throws Exception {
  11. // 初始化SSL Context
  12. context = serverSSLContext();
  13. // 注册监听器
  14. selector = SelectorProvider.provider().openSelector();
  15. ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
  16. serverSocketChannel.configureBlocking(false);
  17. serverSocketChannel.socket().bind(new InetSocketAddress(hostAddress, port));
  18. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
  19. }
  20. public void start() throws Exception {
  21. log.debug("等待连接中.");
  22. while (true) {
  23. selector.select();
  24. Iterator selectedKeys = selector.selectedKeys().iterator();
  25. while (selectedKeys.hasNext()) {
  26. SelectionKey key = selectedKeys.next();
  27. selectedKeys.remove();
  28. if (!key.isValid()) {
  29. continue;
  30. }
  31. if (key.isAcceptable()) {
  32. accept(key);
  33. } else if (key.isReadable()) {
  34. ((SslSocketChannel)key.attachment()).read(buf->{});
  35. // 直接回应一个OK
  36. ((SslSocketChannel)key.attachment()).write("HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\n\r\nOK\r\n\r\n");
  37. ((SslSocketChannel)key.attachment()).closeConnection();
  38. }
  39. }
  40. }
  41. }
  42. private void accept(SelectionKey key) throws Exception {
  43. log.debug("接收新的请求.");
  44. SocketChannel socketChannel = ((ServerSocketChannel)key.channel()).accept();
  45. socketChannel.configureBlocking(false);
  46. SslSocketChannel sslSocketChannel = new SslSocketChannel(context, socketChannel, false);
  47. if (sslSocketChannel.doHandshake()) {
  48. socketChannel.register(selector, SelectionKey.OP_READ, sslSocketChannel);
  49. } else {
  50. socketChannel.close();
  51. log.debug("握手失败,关闭连接.");
  52. }
  53. }
  54. }

以上:

由于是NIO,简单的测试需要用到NIO的基础组件Selector进行测试;

首先初始化ServerSocketChannel,监听8006端口;

接收到请求后,将SocketChannel封装为SslSocketChannel,注册到Selector;

接收到数据后,通过SslSocketChannel做read和write;

以上:

  • 客户端的封装测试,是为了验证封装 SSL 协议双向都是OK的
  • 在后文的非透明上游代理中,会同时使用 SslSocketChannel做服务端和客户端
  • 以上封装与服务端封装类似,不同的是初始化 SocketChannel,做connect而非bind

SslSocketChannel测试客户端

基于以上服务端封装,简单测试客户端如下:

  1. @Slf4j :
  2. public class NioSslClient {
  3. public static void main(String[] args) throws Exception {
  4. NioSslClient sslClient = new NioSslClient("httpbin.org", 443);
  5. sslClient.connect();
  6. // 请求 'https://httpbin.org/get'
  7. }
  8. private String remoteAddress;
  9. private int port;
  10. private SSLEngine engine;
  11. private SocketChannel socketChannel;
  12. private SSLContext context;
  13. /**
  14. * 需要远程的HOST和PORT
  15. * @param remoteAddress
  16. * @param port
  17. * @throws Exception
  18. */
  19. public NioSslClient(String remoteAddress, int port) throws Exception {
  20. this.remoteAddress = remoteAddress;
  21. this.port = port;
  22. context = clientSSLContext();
  23. engine = context.createSSLEngine(remoteAddress, port);
  24. engine.setUseClientMode(true);
  25. }
  26. public boolean connect() throws Exception {
  27. socketChannel = SocketChannel.open();
  28. socketChannel.configureBlocking(false);
  29. socketChannel.connect(new InetSocketAddress(remoteAddress, port));
  30. while (!socketChannel.finishConnect()) {
  31. // 通过REACTOR,不会出现等待情况
  32. //log.debug("连接中..");
  33. }
  34. SslSocketChannel sslSocketChannel = new SslSocketChannel(context, socketChannel, true);
  35. sslSocketChannel.doHandshake();
  36. // 握手完成后,开启SELECTOR
  37. Selector selector = SelectorProvider.provider().openSelector();
  38. socketChannel.register(selector, SelectionKey.OP_READ, sslSocketChannel);
  39. // 写入请求
  40. sslSocketChannel.write("GET /get HTTP/1.1\r\n"
  41. + "Host: httpbin.org:443\r\n"
  42. + "User-Agent: curl/7.62.0\r\n"
  43. + "Accept: */*\r\n"
  44. + "\r\n");
  45. // 读取结果
  46. while (true) {
  47. selector.select();
  48. Iterator selectedKeys = selector.selectedKeys().iterator();
  49. while (selectedKeys.hasNext()) {
  50. SelectionKey key = selectedKeys.next();
  51. selectedKeys.remove();
  52. if (key.isValid() && key.isReadable()) {
  53. ((SslSocketChannel)key.attachment()).read(buf->{
  54. log.info("{}", new String(buf.array(), 0, buf.position()));
  55. });
  56. ((SslSocketChannel)key.attachment()).closeConnection();
  57. return true;
  58. }
  59. }
  60. }
  61. }
  62. }

总结

以上:

  • 非透明代理需要拿到完整的请求数据,可以通过 Decorator模式,聚批实现;
  • 非透明代理需要拿到解密后的HTTPS请求数据,可以通过SslSocketChannel对原始的SocketChannel做封装实现;
  • 最后,拿到请求后,做相应的处理,最终实现非透明的代理。

3.透明上游代理

透明上游代理相比透明代理要简单,区别是:

  • 透明代理需要响应 CONNECT请求,透明上游代理不需要,直接转发即可;
  • .

    透明代理需要解析CONNECT请求中的HOST和PORT,并连接服务端;透明上游代理只需要连接下游代理的IP:PORT,直接转发请求即可;
  • 透明的上游代理,只是一个简单的SocketChannel管道;确定下游的代理服务端,连接转发请求;

只需要对透明代理做以上简单的修改,即可实现透明的上游代理。

4.非透明上游代理

非透明的上游代理,相比非透明的代理要复杂一些。

基于Netty的代理网关设计与实现

以上,分为四个组件:客户端,代理服务(ServerHandler),代理服务(ClientHandler),服务端

  • 如果是HTTP的请求,数据直接通过 客户端<->ServerHandler<->ClientHandler<->服务端,代理网关只需要做简单的请求聚批,就可以应用相应的管理策略;
  • 如果是HTTPS请求,代理作为客户端和服务端的中间人,只能拿到加密的数据;因此,代理网关需要作为HTTPS的服务方与客户端通信;然后作为HTTPS的客户端与服务端通信;
  • 代理作为HTTPS服务方时,需要考虑到其本身是个非透明的代理,需要实现非透明代理相关的协议;
  • 代理作为HTTPS客户端时,需要考虑到其下游是个透明的代理,真正的服务方是客户端请求的服务方;

三、设计与实现

本文需要构建的是非透明上游代理,以下采用NETTY框架给出详细的设计实现。上文将统一代理网关分为两大部分,ServerHandler和ClientHandler,以下

  • 介绍代理网关服务端相关实现;
  • 介绍代理网关客户端相关实现;

1.代理网关服务端

主。要包括

  • 初始化代理网关服务端
  • 初始化服务端处理器
  • 服务端协议升级与处理

初始化代理网关服务

  1. public void start() {
  2. HookedExecutors.newSingleThreadExecutor().submit(() ->{
  3. log.info("开始启动代理服务器,监听端口:{}", auditProxyConfig.getProxyServerPort());
  4. EventLoopGroup bossGroup = new NioEventLoopGroup(auditProxyConfig.getBossThreadCount());
  5. EventLoopGroup workerGroup = new NioEventLoopGroup(auditProxyConfig.getWorkThreadCount());
  6. try {
  7. ServerBootstrap b = new ServerBootstrap();
  8. b.group(bossGroup, workerGroup)
  9. .channel(NioServerSocketChannel.class)
  10. .handler(new LoggingHandler(LogLevel.DEBUG))
  11. .childHandler(new ServerChannelInitializer(auditProxyConfig))
  12. .bind(auditProxyConfig.getProxyServerPort()).sync().channel().closeFuture().sync();
  13. } catch (InterruptedException e) {
  14. log.error("代理服务器被中断.", e);
  15. Thread.currentThread().interrupt();
  16. } finally {
  17. bossGroup.shutdownGracefully();
  18. workerGroup.shutdownGracefully();
  19. }
  20. });
  21. }

代理网关初始化相对简单,

bossGroup线程组,负责接收请求

workerGroup线程组,负责处理接收的请求数据,具体处理逻辑封装在ServerChannelInitializer中。

代理网关服务的请求处理器在 ServerChannelInitializer中定义为:

  1. @Override
  2. protected void initChannel(SocketChannel ch) throws Exception {
  3. ch.pipeline()
  4. .addLast(new HttpRequestDecoder())
  5. .addLast(new HttpObjectAggregator(auditProxyConfig.getMaxRequestSize()))
  6. .addLast(new ServerChannelHandler(auditProxyConfig));
  7. }

首先解析HTTP请求,然后做聚批的处理,最后ServerChannelHandler实现代理网关协议;

代理网关协议:

  • 判定是否是CONNECT请求,如果是,会存储CONNECT请求;暂停读取,发送代理成功的响应,并在回应成功后,升级协议;
  • 升级引擎,本质上是采用SslSocketChannel对原SocketChannel做透明的封装;
  • 最后根据CONNECT请求连接远程服务端;

详细实现为:

  1. @Override
  2. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  3. FullHttpRequest request = (FullHttpRequest)msg;
  4. try {
  5. if (isConnectRequest(request)) {
  6. // CONNECT 请求,存储待处理
  7. saveConnectRequest(ctx, request);
  8. // 禁止读取
  9. ctx.channel().config().setAutoRead(false);
  10. // 发送回应
  11. connectionEstablished(ctx, ctx.newPromise().addListener(future -> {
  12. if (future.isSuccess()) {
  13. // 升级
  14. if (isSslRequest(request) && !isUpgraded(ctx)) {
  15. upgrade(ctx);
  16. }
  17. // 开放消息读取
  18. ctx.channel().config().setAutoRead(true);
  19. ctx.read();
  20. }
  21. }));
  22. } else {
  23. // 其他请求,判定是否已升级
  24. if (!isUpgraded(ctx)) {
  25. // 升级引擎
  26. upgrade(ctx);
  27. }
  28. // 连接远程
  29. connectRemote(ctx, request);
  30. }
  31. } finally {
  32. ctx.fireChannelRead(msg);
  33. }
  34. }

2.代理网关客户端

代理网关服务端需要连接远程服务,进入代理网关客户端部分。

代理网关客户端初始化:

  1. /**
  2. * 初始化远程连接
  3. * @param ctx
  4. * @param httpRequest
  5. */
  6. protected void connectRemote(ChannelHandlerContext ctx, FullHttpRequest httpRequest) {
  7. Bootstrap b = new Bootstrap();
  8. b.group(ctx.channel().eventLoop()) // use the same EventLoop
  9. .channel(ctx.channel().getClass())
  10. .handler(new ClientChannelInitializer(auditProxyConfig, ctx, safeCopy(httpRequest)));
  11. // 动态连接代理
  12. FullHttpRequest originRequest = ctx.channel().attr(CONNECT_REQUEST).get();
  13. if (originRequest == null) {
  14. originRequest = httpRequest;
  15. }
  16. ChannelFuture cf = b.connect(new InetSocketAddress(calculateHost(originRequest), calculatePort(originRequest)));
  17. Channel cch = cf.channel();
  18. ctx.channel().attr(CLIENT_CHANNEL).set(cch);
  19. }

以上:

  • 复用代理网关服务端的workerGroup线程组;
  • 请求和结果的处理封装在ClientChannelInitializer;
  • 连接的远程服务端的HOST和PORT在服务端收到的请求中可以解析到。

代理网关客户端的处理器的初始化逻辑:

  1. @Override
  2. protected void initChannel(SocketChannel ch) throws Exception {
  3. SocketAddress socketAddress = calculateProxy();
  4. if (!Objects.isNull(socketAddress)) {
  5. ch.pipeline().addLast(new HttpProxyHandler(calculateProxy(), auditProxyConfig.getUserName(), auditProxyConfig
  6. .getPassword()));
  7. }
  8. if (isSslRequest()) {
  9. String host = host();
  10. int port = port();
  11. if (StringUtils.isNoneBlank(host) && port > 0) {
  12. ch.pipeline().addLast(new SslHandler(sslEngine(host, port)));
  13. }
  14. }
  15. ch.pipeline().addLast(new ClientChannelHandler(clientContext, httpRequest));
  16. }

以上:

如果下游是代理,那么会采用HttpProxyHandler,经由下游代理与远程服务端通信;

如果当前需要升级为SSL协议,会对SocketChannel做透明的封装,实现SSL通信。

最后,ClientChannelHandler只是简单消息的转发;唯一的不同是,由于代理网关拦截了第一个请求,此时需要将拦截的请求,转发到服务端。

四、其他问题

代理网关实现可能面临的问题:

1.内存问题

代理通常面临的问题是OOM。本文在实现代理网关时保证内存中缓存时当前正在处理的HTTP/HTTPS请求体。内存使用的上限理论上为实时处理的请求数量*请求体的平均大小,HTTP/HTTPS的请求结果,直接使用堆外内存,零拷贝转发。

2.性能问题

性能问题不应提早考虑。本文使用NETTY框架实现的代理网关,内部大量使用堆外内存,零拷贝转发,避免了性能问题。

代理网关一期上线后曾面临一个长连接导致的性能问题,

CLIENT和SERVER建立TCP长连接后(比如,TCP心跳检测),通常要么是CLIENT关闭TCP连接,或者是SERVER关闭;

如果双方长时间占用TCP连接资源而不关闭,就会导致SOCKET资源泄漏;现象是:CPU资源爆满,处理空闲连接;新连接无法建立;

使用IdleStateHandler定时监控空闲的TCP连接,强制关闭;解决了该问题。

五、总结

本文聚焦于统一代理网关的核心,详细介绍了代理相关的技术原理。

代理网关的管理部分,可以在ServerHandler部分维护,也可以在ClientHandler部分维护;

  • ServerHandler可以拦截转换请求
  • ClientHanlder可控制请求的出口

注:本文使用Netty的零拷贝;存储请求以解析处理;但并未实现对RESPONSE的处理;也就是RESPONSE是直接通过网关,此方面避免了常见的代理实现,内存泄漏OOM相关问题;

最后,本文实现代理网关后,针对代理的资源和流经代理网关的请求做了相应的控制,主要包括:

  • 当遇到静态资源的请求时,代理网关会直接请求远程服务端,不会通过下游代理
  • 当请求HEADER中包含地域标识时,代理网关会尽力保证请求打入指定的地域代理,经由地域代理访问远程服务端

本文参考https://docs.oracle.com/javase/8/docs/technotes/guides/security/jsse/JSSERefGuide.html实现 SslSocketChannel,以透明处理HTTP和HTTPS协议。

原文链接:https://zhuanlan.51cto.com/art/202111/692258.htm

延伸 · 阅读

精彩推荐