服务器之家:专注于服务器技术及软件下载分享
分类导航

PHP教程|ASP.NET教程|JAVA教程|ASP教程|

服务器之家 - 编程语言 - JAVA教程 - Java Socket编程实例(三)- TCP服务端线程池

Java Socket编程实例(三)- TCP服务端线程池

2020-05-17 12:17kingxss JAVA教程

这篇文章主要讲解Java Socket编程中TCP服务端线程池的实例,希望能给大家做一个参考。

一、服务端回传服务类:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.logging.Level;
import java.util.logging.Logger;
 
/**
 * Echo service for a single TCP client: every byte read from the client socket
 * is written straight back to it until the client closes its side of the
 * connection.
 *
 * <p>The class can be used in two ways: as a {@link Runnable} (one instance per
 * connection, handed to a thread or executor) or through the static
 * {@link #handleEchoClient(Socket, Logger)} helper.
 */
public class EchoProtocol implements Runnable {
  private static final int BUFSIZE = 32; // Size (in bytes) of I/O buffer
  private final Socket clientSocket; // Socket connected to the client
  private final Logger logger; // Server logger

  /**
   * Creates an echo task bound to one client connection.
   *
   * @param clientSocket socket connected to the client; closed when the task finishes
   * @param logger logger that receives the per-client summary and any I/O errors
   */
  public EchoProtocol(Socket clientSocket, Logger logger) {
    this.clientSocket = clientSocket;
    this.logger = logger;
  }

  /**
   * Echoes everything received on {@code clientSocket} back to the sender, then
   * closes the socket.
   *
   * <p>I/O failures are never propagated to the caller: they are logged at
   * {@link Level#WARNING}. The socket is closed in all cases, and a failure of
   * the close itself is logged as well instead of being silently dropped.
   *
   * @param clientSocket socket connected to the client; always closed on return
   * @param logger logger that receives the summary line and any I/O errors
   */
  public static void handleEchoClient(Socket clientSocket, Logger logger) {
    try {
      // Get the input and output I/O streams from socket
      InputStream in = clientSocket.getInputStream();
      OutputStream out = clientSocket.getOutputStream();

      int recvMsgSize; // Size of received message
      int totalBytesEchoed = 0; // Bytes received from client
      byte[] echoBuffer = new byte[BUFSIZE]; // Receive buffer
      // Receive until client closes connection, indicated by -1
      while ((recvMsgSize = in.read(echoBuffer)) != -1) {
        out.write(echoBuffer, 0, recvMsgSize);
        totalBytesEchoed += recvMsgSize;
      }

      logger.info("Client " + clientSocket.getRemoteSocketAddress() + ", echoed " + totalBytesEchoed + " bytes.");
    } catch (IOException ex) {
      logger.log(Level.WARNING, "Exception in echo protocol", ex);
    } finally {
      try {
        clientSocket.close();
      } catch (IOException ex) {
        // Closing is best-effort, but a failure is still worth a log entry.
        logger.log(Level.WARNING, "Failed to close client socket", ex);
      }
    }
  }

  /** Runs the echo protocol on the socket given to the constructor. */
  @Override
  public void run() {
    handleEchoClient(this.clientSocket, this.logger);
  }
}

二、每个客户端请求都新启一个线程的Tcp服务端:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.logging.Logger;
 
public class TCPEchoServerThread {
 
  public static void main(String[] args) throws IOException {
    // Create a server socket to accept client connection requests
    ServerSocket servSock = new ServerSocket(5500);
 
    Logger logger = Logger.getLogger("practical");
 
    // Run forever, accepting and spawning a thread for each connection
    while (true) {
      Socket clntSock = servSock.accept(); // Block waiting for connection
      // Spawn thread to handle new connection
      Thread thread = new Thread(new EchoProtocol(clntSock, logger));
      thread.start();
      logger.info("Created and started Thread " + thread.getName());
    }
    /* NOT REACHED */
  }
}

三、固定线程数的Tcp服务端:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.logging.Level;
import java.util.logging.Logger;
 
public class TCPEchoServerPool {
  public static void main(String[] args) throws IOException {
    int threadPoolSize = 3; // Fixed ThreadPoolSize
 
    final ServerSocket servSock = new ServerSocket(5500);
    final Logger logger = Logger.getLogger("practical");
 
    // Spawn a fixed number of threads to service clients
    for (int i = 0; i < threadPoolSize; i++) {
      Thread thread = new Thread() {
        public void run() {
          while (true) {
            try {
              Socket clntSock = servSock.accept(); // Wait for a connection
              EchoProtocol.handleEchoClient(clntSock, logger); // Handle it
            } catch (IOException ex) {
              logger.log(Level.WARNING, "Client accept failed", ex);
            }
          }
        }
      };
      thread.start();
      logger.info("Created and started Thread = " + thread.getName());
    }
  }
}

四、使用线程池(使用Spring的线程池会有队列、最大线程数、最小线程数和超时时间的概念)

1.线程池工具类:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import java.util.concurrent.*;
 
/**
 * Task executor: submits tasks to a shared cached thread pool and waits, with
 * a timeout, for them to finish.
 *
 * <p>A task that exceeds its timeout is NOT cancelled; it keeps running on its
 * pool thread while the caller receives a {@link TimeoutException}.
 *
 * @author Watson Xu
 * @since 1.0.0 <p>2013-6-8 10:33:09 AM</p>
 */
public class ThreadPoolTaskExecutor {

  private ThreadPoolTaskExecutor() {
    // Static utility class; not meant to be instantiated.
  }

  private static final ExecutorService executor = Executors.newCachedThreadPool(new ThreadFactory() {
    // Tasks can be submitted from any thread, so the thread-name counter must
    // be updated atomically.
    private final java.util.concurrent.atomic.AtomicInteger count =
        new java.util.concurrent.atomic.AtomicInteger();

    /*
     * The executor calls this when it needs to run a task and the pool has no
     * thread available for it. Callable tasks have already been wrapped into a
     * Runnable by the time they arrive here.
     */
    @Override
    public Thread newThread(Runnable r) {
      Thread invokeThread = new Thread(r);
      invokeThread.setName("Courser Thread-" + count.incrementAndGet());
      invokeThread.setDaemon(false); // worker threads keep the JVM alive

      return invokeThread;
    }
  });

  /**
   * Runs {@code task} on the pool and waits for it to finish.
   *
   * @param task work to execute
   * @param unit unit of {@code timeout}
   * @param timeout maximum time to wait for completion
   * @throws TimeoutException if the task has not finished within the timeout
   * @throws RuntimeException if the task failed or the wait was interrupted
   */
  public static void invoke(Runnable task, TimeUnit unit, long timeout) throws TimeoutException, RuntimeException {
    invoke(task, null, unit, timeout);
  }

  /**
   * Runs {@code task} on the pool, waits for it to finish and returns
   * {@code result}.
   *
   * @param <T> type of the result
   * @param task work to execute
   * @param result value to return once the task has completed
   * @param unit unit of {@code timeout}
   * @param timeout maximum time to wait for completion
   * @return {@code result}
   * @throws TimeoutException if the task has not finished within the timeout
   * @throws RuntimeException if the task failed or the wait was interrupted
   */
  public static <T> T invoke(Runnable task, T result, TimeUnit unit, long timeout) throws TimeoutException,
      RuntimeException {
    return await(executor.submit(task, result), unit, timeout);
  }

  /**
   * Runs {@code task} on the pool, waits for it to finish and returns its
   * result.
   *
   * @param <T> type of the result
   * @param task work to execute
   * @param unit unit of {@code timeout}
   * @param timeout maximum time to wait for completion
   * @return the value produced by {@code task}
   * @throws TimeoutException if the task has not finished within the timeout
   * @throws RuntimeException if the task failed or the wait was interrupted
   */
  public static <T> T invoke(Callable<T> task, TimeUnit unit, long timeout) throws TimeoutException, RuntimeException {
    // Submitting hands the task to the executor; from here on it runs asynchronously.
    return await(executor.submit(task), unit, timeout);
  }

  /**
   * Blocks until {@code future} completes or the timeout expires, translating
   * failures into the exceptions documented on the public invoke methods.
   *
   * Waiting here has two effects: the thread that called invoke() is parked
   * until the task completes, and it receives the result produced by the
   * worker thread.
   */
  private static <T> T await(Future<T> future, TimeUnit unit, long timeout) throws TimeoutException {
    try {
      return future.get(timeout, unit);
    } catch (TimeoutException e) {
      TimeoutException timedOut = new TimeoutException("Thread invoke timeout ...");
      timedOut.initCause(e); // keep the original stack trace available
      throw timedOut;
    } catch (InterruptedException e) {
      // Restore the interrupt flag so code further up the stack can still see it.
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}

2.具有伸缩性的Tcp服务端:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
 
import demo.callable.ThreadPoolTaskExecutor;
 
 
public class TCPEchoServerExecutor {
 
  public static void main(String[] args) throws IOException {
    // Create a server socket to accept client connection requests
    ServerSocket servSock = new ServerSocket(5500);
 
    Logger logger = Logger.getLogger("practical");
     
    // Run forever, accepting and spawning threads to service each connection
    while (true) {
      Socket clntSock = servSock.accept(); // Block waiting for connection
      //executorService.submit(new EchoProtocol(clntSock, logger));
      try {
        ThreadPoolTaskExecutor.invoke(new EchoProtocol(clntSock, logger), TimeUnit.SECONDS, 3);
      } catch (Exception e) {
      
      //service.execute(new TimelimitEchoProtocol(clntSock, logger));
    }
    /* NOT REACHED */
  }
}

以上就是本文的全部内容。如需了解更多Java语法,请继续关注本站,也希望大家多多支持服务器之家。

延伸 · 阅读

精彩推荐