本章目标

  • 掌握Socket编程基础
  • 理解TCP和UDP协议编程
  • 学习NIO和AIO编程模型
  • 掌握HTTP客户端编程
  • 了解网络协议处理
  • 学习分布式通信基础

1. Socket编程基础

1.1 TCP Socket编程

// TCP服务器端
public class TCPServer {
    
    public static void main(String[] args) {
        demonstrateBasicTCPServer();
    }
    
    public static void demonstrateBasicTCPServer() {
        System.out.println("=== TCP服务器演示 ===");
        
        try (ServerSocket serverSocket = new ServerSocket(8080)) {
            System.out.println("服务器启动,监听端口: 8080");
            
            while (true) {
                // 接受客户端连接
                Socket clientSocket = serverSocket.accept();
                System.out.println("客户端连接: " + clientSocket.getRemoteSocketAddress());
                
                // 为每个客户端创建处理线程
                Thread clientHandler = new Thread(new ClientHandler(clientSocket));
                clientHandler.start();
            }
            
        } catch (IOException e) {
            System.out.println("服务器异常: " + e.getMessage());
        }
    }
}

// 客户端处理器
class ClientHandler implements Runnable {
    private Socket clientSocket;
    
    public ClientHandler(Socket clientSocket) {
        this.clientSocket = clientSocket;
    }
    
    @Override
    public void run() {
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(clientSocket.getInputStream()));
             PrintWriter writer = new PrintWriter(
                clientSocket.getOutputStream(), true)) {
            
            String inputLine;
            while ((inputLine = reader.readLine()) != null) {
                System.out.println("收到消息: " + inputLine);
                
                // 回显消息
                if ("bye".equalsIgnoreCase(inputLine)) {
                    writer.println("再见!");
                    break;
                } else {
                    writer.println("回显: " + inputLine);
                }
            }
            
        } catch (IOException e) {
            System.out.println("处理客户端时发生异常: " + e.getMessage());
        } finally {
            try {
                clientSocket.close();
                System.out.println("客户端连接关闭");
            } catch (IOException e) {
                System.out.println("关闭客户端连接时发生异常: " + e.getMessage());
            }
        }
    }
}
// TCP客户端
public class TCPClient {
    
    public static void main(String[] args) {
        demonstrateBasicTCPClient();
    }
    
    public static void demonstrateBasicTCPClient() {
        System.out.println("=== TCP客户端演示 ===");
        
        try (Socket socket = new Socket("localhost", 8080);
             BufferedReader reader = new BufferedReader(
                new InputStreamReader(socket.getInputStream()));
             PrintWriter writer = new PrintWriter(
                socket.getOutputStream(), true);
             Scanner scanner = new Scanner(System.in)) {
            
            System.out.println("连接到服务器: " + socket.getRemoteSocketAddress());
            System.out.println("输入消息 (输入 'bye' 退出):");
            
            String userInput;
            while ((userInput = scanner.nextLine()) != null) {
                // 发送消息
                writer.println(userInput);
                
                // 接收回复
                String response = reader.readLine();
                System.out.println("服务器回复: " + response);
                
                if ("bye".equalsIgnoreCase(userInput)) {
                    break;
                }
            }
            
        } catch (IOException e) {
            System.out.println("客户端异常: " + e.getMessage());
        }
    }
}

1.2 UDP Socket编程

// UDP服务器端
public class UDPServer {
    
    public static void main(String[] args) {
        demonstrateBasicUDPServer();
    }
    
    public static void demonstrateBasicUDPServer() {
        System.out.println("=== UDP服务器演示 ===");
        
        try (DatagramSocket socket = new DatagramSocket(9090)) {
            System.out.println("UDP服务器启动,监听端口: 9090");
            
            byte[] buffer = new byte[1024];
            
            while (true) {
                // 接收数据包
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
                socket.receive(packet);
                
                String receivedMessage = new String(packet.getData(), 0, packet.getLength());
                System.out.println("收到来自 " + packet.getAddress() + ":" + 
                                 packet.getPort() + " 的消息: " + receivedMessage);
                
                // 发送回复
                String response = "回显: " + receivedMessage;
                byte[] responseData = response.getBytes();
                
                DatagramPacket responsePacket = new DatagramPacket(
                    responseData, responseData.length, 
                    packet.getAddress(), packet.getPort());
                
                socket.send(responsePacket);
                
                if ("bye".equalsIgnoreCase(receivedMessage)) {
                    System.out.println("收到退出信号,服务器关闭");
                    break;
                }
            }
            
        } catch (IOException e) {
            System.out.println("UDP服务器异常: " + e.getMessage());
        }
    }
}
// UDP客户端
public class UDPClient {
    
    public static void main(String[] args) {
        demonstrateBasicUDPClient();
    }
    
    public static void demonstrateBasicUDPClient() {
        System.out.println("=== UDP客户端演示 ===");
        
        try (DatagramSocket socket = new DatagramSocket();
             Scanner scanner = new Scanner(System.in)) {
            
            InetAddress serverAddress = InetAddress.getByName("localhost");
            int serverPort = 9090;
            
            System.out.println("连接到UDP服务器: " + serverAddress + ":" + serverPort);
            System.out.println("输入消息 (输入 'bye' 退出):");
            
            String userInput;
            while ((userInput = scanner.nextLine()) != null) {
                // 发送数据包
                byte[] data = userInput.getBytes();
                DatagramPacket packet = new DatagramPacket(
                    data, data.length, serverAddress, serverPort);
                socket.send(packet);
                
                // 接收回复
                byte[] buffer = new byte[1024];
                DatagramPacket responsePacket = new DatagramPacket(buffer, buffer.length);
                socket.receive(responsePacket);
                
                String response = new String(responsePacket.getData(), 0, 
                                           responsePacket.getLength());
                System.out.println("服务器回复: " + response);
                
                if ("bye".equalsIgnoreCase(userInput)) {
                    break;
                }
            }
            
        } catch (IOException e) {
            System.out.println("UDP客户端异常: " + e.getMessage());
        }
    }
}

1.3 多线程服务器

// 多线程TCP服务器
public class MultiThreadTCPServer {
    private static final int PORT = 8081;
    private static final int MAX_CLIENTS = 10;
    
    public static void main(String[] args) {
        demonstrateMultiThreadServer();
    }
    
    public static void demonstrateMultiThreadServer() {
        System.out.println("=== 多线程TCP服务器演示 ===");
        
        ExecutorService executor = Executors.newFixedThreadPool(MAX_CLIENTS);
        
        try (ServerSocket serverSocket = new ServerSocket(PORT)) {
            System.out.println("多线程服务器启动,监听端口: " + PORT);
            System.out.println("最大客户端连接数: " + MAX_CLIENTS);
            
            while (true) {
                Socket clientSocket = serverSocket.accept();
                System.out.println("新客户端连接: " + clientSocket.getRemoteSocketAddress());
                
                // 提交到线程池处理
                executor.submit(new EnhancedClientHandler(clientSocket));
            }
            
        } catch (IOException e) {
            System.out.println("服务器异常: " + e.getMessage());
        } finally {
            executor.shutdown();
        }
    }
}

// 增强的客户端处理器
class EnhancedClientHandler implements Runnable {
    private Socket clientSocket;
    private String clientId;
    
    public EnhancedClientHandler(Socket clientSocket) {
        this.clientSocket = clientSocket;
        this.clientId = "Client-" + clientSocket.getRemoteSocketAddress();
    }
    
    @Override
    public void run() {
        System.out.println(clientId + " 开始处理");
        
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(clientSocket.getInputStream()));
             PrintWriter writer = new PrintWriter(
                clientSocket.getOutputStream(), true)) {
            
            // 发送欢迎消息
            writer.println("欢迎连接到多线程服务器! 你的ID是: " + clientId);
            writer.println("支持的命令: time, echo <message>, calc <expression>, bye");
            
            String inputLine;
            while ((inputLine = reader.readLine()) != null) {
                System.out.println(clientId + " 发送: " + inputLine);
                
                String response = processCommand(inputLine);
                writer.println(response);
                
                if ("bye".equalsIgnoreCase(inputLine)) {
                    break;
                }
            }
            
        } catch (IOException e) {
            System.out.println(clientId + " 处理异常: " + e.getMessage());
        } finally {
            try {
                clientSocket.close();
                System.out.println(clientId + " 连接关闭");
            } catch (IOException e) {
                System.out.println(clientId + " 关闭连接时异常: " + e.getMessage());
            }
        }
    }
    
    private String processCommand(String command) {
        if (command == null || command.trim().isEmpty()) {
            return "错误: 空命令";
        }
        
        command = command.trim();
        
        if ("time".equalsIgnoreCase(command)) {
            return "当前时间: " + new Date();
        } else if (command.toLowerCase().startsWith("echo ")) {
            return "回显: " + command.substring(5);
        } else if (command.toLowerCase().startsWith("calc ")) {
            return calculateExpression(command.substring(5));
        } else if ("bye".equalsIgnoreCase(command)) {
            return "再见! 感谢使用服务器";
        } else {
            return "未知命令: " + command + ". 支持的命令: time, echo, calc, bye";
        }
    }
    
    private String calculateExpression(String expression) {
        try {
            // 简单的计算器实现(仅支持基本运算)
            expression = expression.trim();
            
            if (expression.contains("+")) {
                String[] parts = expression.split("\\+");
                double result = Double.parseDouble(parts[0].trim()) + 
                               Double.parseDouble(parts[1].trim());
                return "计算结果: " + result;
            } else if (expression.contains("-")) {
                String[] parts = expression.split("-");
                double result = Double.parseDouble(parts[0].trim()) - 
                               Double.parseDouble(parts[1].trim());
                return "计算结果: " + result;
            } else if (expression.contains("*")) {
                String[] parts = expression.split("\\*");
                double result = Double.parseDouble(parts[0].trim()) * 
                               Double.parseDouble(parts[1].trim());
                return "计算结果: " + result;
            } else if (expression.contains("/")) {
                String[] parts = expression.split("/");
                double divisor = Double.parseDouble(parts[1].trim());
                if (divisor == 0) {
                    return "错误: 除数不能为零";
                }
                double result = Double.parseDouble(parts[0].trim()) / divisor;
                return "计算结果: " + result;
            } else {
                return "错误: 不支持的表达式格式";
            }
        } catch (Exception e) {
            return "计算错误: " + e.getMessage();
        }
    }
}

2. NIO编程

2.1 NIO基础概念

// NIO基础演示
public class NIOBasicsDemo {
    
    public static void main(String[] args) {
        demonstrateBuffer();
        demonstrateChannel();
        demonstrateSelector();
    }
    
    // Buffer操作演示
    public static void demonstrateBuffer() {
        System.out.println("=== Buffer操作演示 ===");
        
        // ByteBuffer操作
        ByteBuffer buffer = ByteBuffer.allocate(10);
        System.out.println("初始状态 - position: " + buffer.position() + 
                         ", limit: " + buffer.limit() + 
                         ", capacity: " + buffer.capacity());
        
        // 写入数据
        buffer.put("Hello".getBytes());
        System.out.println("写入后 - position: " + buffer.position() + 
                         ", limit: " + buffer.limit());
        
        // 切换到读模式
        buffer.flip();
        System.out.println("flip后 - position: " + buffer.position() + 
                         ", limit: " + buffer.limit());
        
        // 读取数据
        byte[] data = new byte[buffer.remaining()];
        buffer.get(data);
        System.out.println("读取的数据: " + new String(data));
        
        // 清空缓冲区
        buffer.clear();
        System.out.println("clear后 - position: " + buffer.position() + 
                         ", limit: " + buffer.limit());
        
        System.out.println();
    }
    
    // Channel操作演示
    public static void demonstrateChannel() {
        System.out.println("=== Channel操作演示 ===");
        
        try {
            // 文件Channel操作
            Path path = Paths.get("nio_test.txt");
            
            // 写入文件
            try (FileChannel writeChannel = FileChannel.open(path, 
                    StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
                
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                buffer.put("这是NIO Channel写入的数据\n".getBytes());
                buffer.put("支持高效的文件操作\n".getBytes());
                
                buffer.flip();
                writeChannel.write(buffer);
                
                System.out.println("数据写入文件: " + path);
            }
            
            // 读取文件
            try (FileChannel readChannel = FileChannel.open(path, 
                    StandardOpenOption.READ)) {
                
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                int bytesRead = readChannel.read(buffer);
                
                buffer.flip();
                byte[] data = new byte[buffer.remaining()];
                buffer.get(data);
                
                System.out.println("从文件读取的数据:");
                System.out.println(new String(data));
                System.out.println("读取字节数: " + bytesRead);
            }
            
            // 清理测试文件
            Files.deleteIfExists(path);
            
        } catch (IOException e) {
            System.out.println("Channel操作异常: " + e.getMessage());
        }
        
        System.out.println();
    }
    
    // Selector操作演示
    public static void demonstrateSelector() {
        System.out.println("=== Selector操作演示 ===");
        
        try {
            Selector selector = Selector.open();
            
            // 创建ServerSocketChannel
            ServerSocketChannel serverChannel = ServerSocketChannel.open();
            serverChannel.configureBlocking(false);
            serverChannel.bind(new InetSocketAddress(8082));
            
            // 注册到Selector
            SelectionKey serverKey = serverChannel.register(selector, SelectionKey.OP_ACCEPT);
            
            System.out.println("NIO服务器启动,监听端口: 8082");
            System.out.println("Selector注册的Channel数量: " + selector.keys().size());
            
            // 模拟选择操作(实际应用中会在循环中进行)
            int readyChannels = selector.selectNow();
            System.out.println("就绪的Channel数量: " + readyChannels);
            
            // 清理资源
            serverChannel.close();
            selector.close();
            
        } catch (IOException e) {
            System.out.println("Selector操作异常: " + e.getMessage());
        }
        
        System.out.println();
    }
}

2.2 NIO服务器实现

// NIO服务器
public class NIOServer {
    private static final int PORT = 8083;
    private Selector selector;
    private ServerSocketChannel serverChannel;
    
    public static void main(String[] args) {
        NIOServer server = new NIOServer();
        server.start();
    }
    
    public void start() {
        try {
            // 初始化
            selector = Selector.open();
            serverChannel = ServerSocketChannel.open();
            serverChannel.configureBlocking(false);
            serverChannel.bind(new InetSocketAddress(PORT));
            
            // 注册接受连接事件
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);
            
            System.out.println("NIO服务器启动,监听端口: " + PORT);
            
            // 事件循环
            while (true) {
                // 阻塞等待事件
                int readyChannels = selector.select();
                
                if (readyChannels == 0) {
                    continue;
                }
                
                // 处理就绪的事件
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
                
                while (keyIterator.hasNext()) {
                    SelectionKey key = keyIterator.next();
                    keyIterator.remove();
                    
                    try {
                        if (key.isAcceptable()) {
                            handleAccept(key);
                        } else if (key.isReadable()) {
                            handleRead(key);
                        } else if (key.isWritable()) {
                            handleWrite(key);
                        }
                    } catch (IOException e) {
                        System.out.println("处理事件时发生异常: " + e.getMessage());
                        key.cancel();
                        key.channel().close();
                    }
                }
            }
            
        } catch (IOException e) {
            System.out.println("服务器启动异常: " + e.getMessage());
        } finally {
            cleanup();
        }
    }
    
    private void handleAccept(SelectionKey key) throws IOException {
        ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
        SocketChannel clientChannel = serverChannel.accept();
        
        if (clientChannel != null) {
            clientChannel.configureBlocking(false);
            
            // 为客户端分配缓冲区
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            clientChannel.register(selector, SelectionKey.OP_READ, buffer);
            
            System.out.println("新客户端连接: " + clientChannel.getRemoteAddress());
        }
    }
    
    private void handleRead(SelectionKey key) throws IOException {
        SocketChannel clientChannel = (SocketChannel) key.channel();
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        
        int bytesRead = clientChannel.read(buffer);
        
        if (bytesRead > 0) {
            buffer.flip();
            
            // 读取消息
            byte[] data = new byte[buffer.remaining()];
            buffer.get(data);
            String message = new String(data).trim();
            
            System.out.println("收到消息: " + message + " 来自: " + 
                             clientChannel.getRemoteAddress());
            
            // 准备回复
            String response;
            if ("bye".equalsIgnoreCase(message)) {
                response = "再见!";
            } else {
                response = "回显: " + message;
            }
            
            // 将回复写入缓冲区
            buffer.clear();
            buffer.put(response.getBytes());
            buffer.flip();
            
            // 注册写事件
            key.interestOps(SelectionKey.OP_WRITE);
            
        } else if (bytesRead == -1) {
            // 客户端断开连接
            System.out.println("客户端断开连接: " + clientChannel.getRemoteAddress());
            key.cancel();
            clientChannel.close();
        }
    }
    
    private void handleWrite(SelectionKey key) throws IOException {
        SocketChannel clientChannel = (SocketChannel) key.channel();
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        
        clientChannel.write(buffer);
        
        if (!buffer.hasRemaining()) {
            // 写完后切换回读模式
            buffer.clear();
            key.interestOps(SelectionKey.OP_READ);
        }
    }
    
    private void cleanup() {
        try {
            if (selector != null) {
                selector.close();
            }
            if (serverChannel != null) {
                serverChannel.close();
            }
        } catch (IOException e) {
            System.out.println("清理资源时发生异常: " + e.getMessage());
        }
    }
}

2.3 NIO客户端实现

// NIO客户端
public class NIOClient {
    private static final String HOST = "localhost";
    private static final int PORT = 8083;
    
    public static void main(String[] args) {
        NIOClient client = new NIOClient();
        client.start();
    }
    
    public void start() {
        try (SocketChannel channel = SocketChannel.open();
             Scanner scanner = new Scanner(System.in)) {
            
            // 连接服务器
            channel.connect(new InetSocketAddress(HOST, PORT));
            channel.configureBlocking(false);
            
            System.out.println("连接到NIO服务器: " + HOST + ":" + PORT);
            System.out.println("输入消息 (输入 'bye' 退出):");
            
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            
            while (true) {
                System.out.print("> ");
                String input = scanner.nextLine();
                
                if (input == null || input.trim().isEmpty()) {
                    continue;
                }
                
                // 发送消息
                buffer.clear();
                buffer.put(input.getBytes());
                buffer.flip();
                
                while (buffer.hasRemaining()) {
                    channel.write(buffer);
                }
                
                // 接收回复
                buffer.clear();
                
                // 等待数据可读
                while (true) {
                    int bytesRead = channel.read(buffer);
                    if (bytesRead > 0) {
                        buffer.flip();
                        byte[] data = new byte[buffer.remaining()];
                        buffer.get(data);
                        System.out.println("服务器回复: " + new String(data));
                        break;
                    } else if (bytesRead == 0) {
                        // 暂时没有数据,稍等片刻
                        try {
                            Thread.sleep(10);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                    } else {
                        // 连接关闭
                        System.out.println("服务器关闭连接");
                        return;
                    }
                }
                
                if ("bye".equalsIgnoreCase(input)) {
                    break;
                }
            }
            
        } catch (IOException e) {
            System.out.println("客户端异常: " + e.getMessage());
        }
    }
}

3. HTTP客户端编程

3.1 使用HttpURLConnection

// HTTP客户端演示
public class HTTPClientDemo {
    
    public static void main(String[] args) {
        demonstrateHttpURLConnection();
        demonstrateHttpClient();
    }
    
    // HttpURLConnection演示
    public static void demonstrateHttpURLConnection() {
        System.out.println("=== HttpURLConnection演示 ===");
        
        try {
            // GET请求
            performGetRequest();
            
            // POST请求
            performPostRequest();
            
        } catch (Exception e) {
            System.out.println("HTTP请求异常: " + e.getMessage());
        }
        
        System.out.println();
    }
    
    private static void performGetRequest() throws IOException {
        System.out.println("--- GET请求演示 ---");
        
        URL url = new URL("https://httpbin.org/get?param1=value1&param2=value2");
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        
        // 设置请求方法和属性
        connection.setRequestMethod("GET");
        connection.setRequestProperty("User-Agent", "Java-HTTP-Client/1.0");
        connection.setRequestProperty("Accept", "application/json");
        connection.setConnectTimeout(5000);
        connection.setReadTimeout(5000);
        
        // 获取响应
        int responseCode = connection.getResponseCode();
        System.out.println("响应码: " + responseCode);
        
        // 读取响应头
        System.out.println("响应头:");
        for (Map.Entry<String, List<String>> header : connection.getHeaderFields().entrySet()) {
            System.out.println("  " + header.getKey() + ": " + header.getValue());
        }
        
        // 读取响应体
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(connection.getInputStream()))) {
            
            StringBuilder response = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                response.append(line).append("\n");
            }
            
            System.out.println("响应体:");
            System.out.println(response.toString());
        }
        
        connection.disconnect();
    }
    
    private static void performPostRequest() throws IOException {
        System.out.println("--- POST请求演示 ---");
        
        URL url = new URL("https://httpbin.org/post");
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        
        // 设置请求方法和属性
        connection.setRequestMethod("POST");
        connection.setRequestProperty("Content-Type", "application/json");
        connection.setRequestProperty("Accept", "application/json");
        connection.setDoOutput(true);
        
        // 准备请求体
        String jsonInputString = "{\"name\": \"张三\", \"age\": 25, \"city\": \"北京\"}";
        
        // 发送请求体
        try (OutputStream os = connection.getOutputStream()) {
            byte[] input = jsonInputString.getBytes("utf-8");
            os.write(input, 0, input.length);
        }
        
        // 获取响应
        int responseCode = connection.getResponseCode();
        System.out.println("响应码: " + responseCode);
        
        // 读取响应体
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(connection.getInputStream()))) {
            
            StringBuilder response = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                response.append(line).append("\n");
            }
            
            System.out.println("响应体:");
            System.out.println(response.toString());
        }
        
        connection.disconnect();
    }
    
    // Java 11+ HttpClient演示
    public static void demonstrateHttpClient() {
        System.out.println("=== Java 11+ HttpClient演示 ===");
        
        try {
            // 创建HttpClient
            HttpClient client = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(10))
                .build();
            
            // GET请求
            HttpRequest getRequest = HttpRequest.newBuilder()
                .uri(URI.create("https://httpbin.org/get"))
                .header("User-Agent", "Java-HttpClient/11")
                .GET()
                .build();
            
            HttpResponse<String> getResponse = client.send(getRequest, 
                HttpResponse.BodyHandlers.ofString());
            
            System.out.println("GET响应码: " + getResponse.statusCode());
            System.out.println("GET响应体: " + getResponse.body());
            
            // POST请求
            String jsonData = "{\"message\": \"Hello from Java HttpClient\"}";
            
            HttpRequest postRequest = HttpRequest.newBuilder()
                .uri(URI.create("https://httpbin.org/post"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonData))
                .build();
            
            HttpResponse<String> postResponse = client.send(postRequest, 
                HttpResponse.BodyHandlers.ofString());
            
            System.out.println("POST响应码: " + postResponse.statusCode());
            System.out.println("POST响应体: " + postResponse.body());
            
            // 异步请求
            CompletableFuture<HttpResponse<String>> asyncResponse = 
                client.sendAsync(getRequest, HttpResponse.BodyHandlers.ofString());
            
            asyncResponse.thenAccept(response -> {
                System.out.println("异步响应码: " + response.statusCode());
                System.out.println("异步响应体长度: " + response.body().length());
            }).join();
            
        } catch (Exception e) {
            System.out.println("HttpClient异常: " + e.getMessage());
        }
    }
}

3.2 RESTful API客户端

// RESTful API客户端
public class RESTClient {
    private final HttpClient httpClient;
    private final String baseUrl;
    
    public RESTClient(String baseUrl) {
        this.baseUrl = baseUrl;
        this.httpClient = HttpClient.newBuilder()
            .connectTimeout(Duration.ofSeconds(10))
            .build();
    }
    
    public static void main(String[] args) {
        demonstrateRESTClient();
    }
    
    public static void demonstrateRESTClient() {
        System.out.println("=== RESTful API客户端演示 ===");
        
        RESTClient client = new RESTClient("https://jsonplaceholder.typicode.com");
        
        try {
            // GET - 获取所有用户
            System.out.println("--- 获取所有用户 ---");
            String users = client.get("/users");
            System.out.println("用户列表: " + users.substring(0, Math.min(200, users.length())) + "...");
            
            // GET - 获取特定用户
            System.out.println("\n--- 获取用户ID=1 ---");
            String user = client.get("/users/1");
            System.out.println("用户详情: " + user);
            
            // POST - 创建新用户
            System.out.println("\n--- 创建新用户 ---");
            String newUserJson = "{\"name\": \"张三\", \"email\": \"zhangsan@example.com\"}";
            String createdUser = client.post("/users", newUserJson);
            System.out.println("创建的用户: " + createdUser);
            
            // PUT - 更新用户
            System.out.println("\n--- 更新用户 ---");
            String updateUserJson = "{\"id\": 1, \"name\": \"李四\", \"email\": \"lisi@example.com\"}";
            String updatedUser = client.put("/users/1", updateUserJson);
            System.out.println("更新的用户: " + updatedUser);
            
            // DELETE - 删除用户
            System.out.println("\n--- 删除用户 ---");
            int deleteStatus = client.delete("/users/1");
            System.out.println("删除状态码: " + deleteStatus);
            
        } catch (Exception e) {
            System.out.println("REST客户端异常: " + e.getMessage());
        }
    }
    
    public String get(String path) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create(baseUrl + path))
            .header("Accept", "application/json")
            .GET()
            .build();
        
        HttpResponse<String> response = httpClient.send(request, 
            HttpResponse.BodyHandlers.ofString());
        
        if (response.statusCode() >= 200 && response.statusCode() < 300) {
            return response.body();
        } else {
            throw new IOException("HTTP错误: " + response.statusCode());
        }
    }
    
    public String post(String path, String jsonBody) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create(baseUrl + path))
            .header("Content-Type", "application/json")
            .header("Accept", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
            .build();
        
        HttpResponse<String> response = httpClient.send(request, 
            HttpResponse.BodyHandlers.ofString());
        
        return response.body();
    }
    
    public String put(String path, String jsonBody) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create(baseUrl + path))
            .header("Content-Type", "application/json")
            .header("Accept", "application/json")
            .PUT(HttpRequest.BodyPublishers.ofString(jsonBody))
            .build();
        
        HttpResponse<String> response = httpClient.send(request, 
            HttpResponse.BodyHandlers.ofString());
        
        return response.body();
    }
    
    public int delete(String path) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create(baseUrl + path))
            .DELETE()
            .build();
        
        HttpResponse<String> response = httpClient.send(request, 
            HttpResponse.BodyHandlers.ofString());
        
        return response.statusCode();
    }
}

4. AIO编程(异步I/O)

4.1 AIO基础概念

// AIO服务器演示
public class AIOServer {
    private static final int PORT = 8084;
    
    public static void main(String[] args) {
        demonstrateAIOServer();
    }
    
    public static void demonstrateAIOServer() {
        System.out.println("=== AIO服务器演示 ===");
        
        try {
            AsynchronousServerSocketChannel serverChannel = 
                AsynchronousServerSocketChannel.open();
            serverChannel.bind(new InetSocketAddress(PORT));
            
            System.out.println("AIO服务器启动,监听端口: " + PORT);
            
            // 异步接受连接
            serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
                @Override
                public void completed(AsynchronousSocketChannel clientChannel, Void attachment) {
                    // 继续接受下一个连接
                    serverChannel.accept(null, this);
                    
                    // 处理当前连接
                    handleClient(clientChannel);
                }
                
                @Override
                public void failed(Throwable exc, Void attachment) {
                    System.out.println("接受连接失败: " + exc.getMessage());
                }
            });
            
            // 保持服务器运行
            System.out.println("按Enter键停止服务器...");
            System.in.read();
            
            serverChannel.close();
            
        } catch (IOException e) {
            System.out.println("AIO服务器异常: " + e.getMessage());
        }
    }
    
    private static void handleClient(AsynchronousSocketChannel clientChannel) {
        try {
            System.out.println("新客户端连接: " + clientChannel.getRemoteAddress());
            
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            
            // 异步读取数据
            clientChannel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
                @Override
                public void completed(Integer bytesRead, ByteBuffer buffer) {
                    if (bytesRead > 0) {
                        buffer.flip();
                        
                        // 读取消息
                        byte[] data = new byte[buffer.remaining()];
                        buffer.get(data);
                        String message = new String(data).trim();
                        
                        System.out.println("收到消息: " + message);
                        
                        // 准备回复
                        String response = "回显: " + message;
                        ByteBuffer responseBuffer = ByteBuffer.wrap(response.getBytes());
                        
                        // 异步写入回复
                        clientChannel.write(responseBuffer, responseBuffer, 
                            new CompletionHandler<Integer, ByteBuffer>() {
                                @Override
                                public void completed(Integer bytesWritten, ByteBuffer buffer) {
                                    System.out.println("回复发送完成,字节数: " + bytesWritten);
                                    
                                    if (!"bye".equalsIgnoreCase(message)) {
                                        // 继续读取下一条消息
                                        ByteBuffer nextBuffer = ByteBuffer.allocate(1024);
                                        clientChannel.read(nextBuffer, nextBuffer, this);
                                    } else {
                                        // 关闭连接
                                        try {
                                            clientChannel.close();
                                            System.out.println("客户端连接关闭");
                                        } catch (IOException e) {
                                            System.out.println("关闭连接异常: " + e.getMessage());
                                        }
                                    }
                                }
                                
                                @Override
                                public void failed(Throwable exc, ByteBuffer buffer) {
                                    System.out.println("写入失败: " + exc.getMessage());
                                }
                            });
                    } else if (bytesRead == -1) {
                        // 客户端断开连接
                        try {
                            clientChannel.close();
                            System.out.println("客户端断开连接");
                        } catch (IOException e) {
                            System.out.println("关闭连接异常: " + e.getMessage());
                        }
                    }
                }
                
                @Override
                public void failed(Throwable exc, ByteBuffer buffer) {
                    System.out.println("读取失败: " + exc.getMessage());
                }
            });
            
        } catch (IOException e) {
            System.out.println("处理客户端异常: " + e.getMessage());
        }
    }
}

4.2 AIO客户端

// AIO客户端演示
public class AIOClient {
    private static final String HOST = "localhost";
    private static final int PORT = 8084;
    
    public static void main(String[] args) {
        demonstrateAIOClient();
    }
    
    public static void demonstrateAIOClient() {
        System.out.println("=== AIO客户端演示 ===");
        
        try {
            AsynchronousSocketChannel clientChannel = AsynchronousSocketChannel.open();
            
            // 异步连接服务器
            Future<Void> connectFuture = clientChannel.connect(
                new InetSocketAddress(HOST, PORT));
            connectFuture.get(); // 等待连接完成
            
            System.out.println("连接到AIO服务器: " + HOST + ":" + PORT);
            
            Scanner scanner = new Scanner(System.in);
            System.out.println("输入消息 (输入 'bye' 退出):");
            
            while (true) {
                System.out.print("> ");
                String input = scanner.nextLine();
                
                if (input == null || input.trim().isEmpty()) {
                    continue;
                }
                
                // 发送消息
                ByteBuffer sendBuffer = ByteBuffer.wrap(input.getBytes());
                Future<Integer> writeFuture = clientChannel.write(sendBuffer);
                writeFuture.get(); // 等待写入完成
                
                // 接收回复
                ByteBuffer receiveBuffer = ByteBuffer.allocate(1024);
                Future<Integer> readFuture = clientChannel.read(receiveBuffer);
                int bytesRead = readFuture.get(); // 等待读取完成
                
                if (bytesRead > 0) {
                    receiveBuffer.flip();
                    byte[] data = new byte[receiveBuffer.remaining()];
                    receiveBuffer.get(data);
                    System.out.println("服务器回复: " + new String(data));
                }
                
                if ("bye".equalsIgnoreCase(input)) {
                    break;
                }
            }
            
            clientChannel.close();
            scanner.close();
            
        } catch (Exception e) {
            System.out.println("AIO客户端异常: " + e.getMessage());
        }
    }
}

5. WebSocket编程

5.1 WebSocket服务器

// 简单的WebSocket服务器实现
public class SimpleWebSocketServer {
    private static final int PORT = 8085;
    
    public static void main(String[] args) {
        demonstrateWebSocketServer();
    }
    
    public static void demonstrateWebSocketServer() {
        System.out.println("=== WebSocket服务器演示 ===");
        
        try (ServerSocket serverSocket = new ServerSocket(PORT)) {
            System.out.println("WebSocket服务器启动,监听端口: " + PORT);
            
            while (true) {
                Socket clientSocket = serverSocket.accept();
                System.out.println("新客户端连接: " + clientSocket.getRemoteSocketAddress());
                
                // 为每个客户端创建处理线程
                Thread clientHandler = new Thread(new WebSocketHandler(clientSocket));
                clientHandler.start();
            }
            
        } catch (IOException e) {
            System.out.println("WebSocket服务器异常: " + e.getMessage());
        }
    }
}

// WebSocket处理器
class WebSocketHandler implements Runnable {
    private Socket clientSocket;
    private boolean isWebSocketUpgraded = false;
    
    public WebSocketHandler(Socket clientSocket) {
        this.clientSocket = clientSocket;
    }
    
    @Override
    public void run() {
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(clientSocket.getInputStream()));
             OutputStream outputStream = clientSocket.getOutputStream()) {
            
            // 读取HTTP请求头
            String line;
            StringBuilder requestHeaders = new StringBuilder();
            String webSocketKey = null;
            
            while ((line = reader.readLine()) != null && !line.isEmpty()) {
                requestHeaders.append(line).append("\n");
                
                if (line.startsWith("Sec-WebSocket-Key:")) {
                    webSocketKey = line.substring("Sec-WebSocket-Key:".length()).trim();
                }
            }
            
            System.out.println("收到HTTP请求:");
            System.out.println(requestHeaders.toString());
            
            if (webSocketKey != null) {
                // 执行WebSocket握手
                performWebSocketHandshake(outputStream, webSocketKey);
                isWebSocketUpgraded = true;
                
                // 处理WebSocket消息
                handleWebSocketMessages(clientSocket.getInputStream(), outputStream);
            } else {
                // 返回HTTP响应
                sendHttpResponse(outputStream);
            }
            
        } catch (Exception e) {
            System.out.println("WebSocket处理异常: " + e.getMessage());
        } finally {
            try {
                clientSocket.close();
                System.out.println("客户端连接关闭");
            } catch (IOException e) {
                System.out.println("关闭连接异常: " + e.getMessage());
            }
        }
    }
    
    private void performWebSocketHandshake(OutputStream outputStream, String webSocketKey) 
            throws Exception {
        // WebSocket握手响应
        String acceptKey = generateWebSocketAcceptKey(webSocketKey);
        
        String response = "HTTP/1.1 101 Switching Protocols\r\n" +
                         "Upgrade: websocket\r\n" +
                         "Connection: Upgrade\r\n" +
                         "Sec-WebSocket-Accept: " + acceptKey + "\r\n" +
                         "\r\n";
        
        outputStream.write(response.getBytes());
        outputStream.flush();
        
        System.out.println("WebSocket握手完成");
    }
    
    private String generateWebSocketAcceptKey(String webSocketKey) throws Exception {
        String magic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
        String combined = webSocketKey + magic;
        
        MessageDigest digest = MessageDigest.getInstance("SHA-1");
        byte[] hash = digest.digest(combined.getBytes());
        
        return Base64.getEncoder().encodeToString(hash);
    }
    
    private void handleWebSocketMessages(InputStream inputStream, OutputStream outputStream) 
            throws IOException {
        byte[] buffer = new byte[1024];
        
        while (true) {
            int bytesRead = inputStream.read(buffer);
            if (bytesRead == -1) {
                break;
            }
            
            // 简化的WebSocket帧解析(仅处理文本消息)
            if (bytesRead >= 2) {
                boolean fin = (buffer[0] & 0x80) != 0;
                int opcode = buffer[0] & 0x0F;
                boolean masked = (buffer[1] & 0x80) != 0;
                int payloadLength = buffer[1] & 0x7F;
                
                if (opcode == 1 && masked) { // 文本帧且已掩码
                    int maskStart = 2;
                    if (payloadLength == 126) {
                        maskStart = 4;
                        payloadLength = ((buffer[2] & 0xFF) << 8) | (buffer[3] & 0xFF);
                    } else if (payloadLength == 127) {
                        maskStart = 10;
                        // 简化处理,不支持超长消息
                    }
                    
                    byte[] mask = new byte[4];
                    System.arraycopy(buffer, maskStart, mask, 0, 4);
                    
                    byte[] payload = new byte[payloadLength];
                    System.arraycopy(buffer, maskStart + 4, payload, 0, payloadLength);
                    
                    // 解除掩码
                    for (int i = 0; i < payloadLength; i++) {
                        payload[i] ^= mask[i % 4];
                    }
                    
                    String message = new String(payload);
                    System.out.println("收到WebSocket消息: " + message);
                    
                    // 发送回复
                    String response = "回显: " + message;
                    sendWebSocketMessage(outputStream, response);
                    
                    if ("bye".equalsIgnoreCase(message)) {
                        break;
                    }
                }
            }
        }
    }
    
    private void sendWebSocketMessage(OutputStream outputStream, String message) 
            throws IOException {
        byte[] messageBytes = message.getBytes();
        int length = messageBytes.length;
        
        // 构建WebSocket帧
        ByteArrayOutputStream frame = new ByteArrayOutputStream();
        
        // 第一个字节:FIN=1, RSV=000, Opcode=0001 (文本)
        frame.write(0x81);
        
        // 第二个字节:MASK=0, Payload length
        if (length < 126) {
            frame.write(length);
        } else if (length < 65536) {
            frame.write(126);
            frame.write((length >> 8) & 0xFF);
            frame.write(length & 0xFF);
        } else {
            frame.write(127);
            // 简化处理,不支持超长消息
            for (int i = 0; i < 8; i++) {
                frame.write(0);
            }
        }
        
        // 载荷数据
        frame.write(messageBytes);
        
        outputStream.write(frame.toByteArray());
        outputStream.flush();
    }
    
    private void sendHttpResponse(OutputStream outputStream) throws IOException {
        String htmlResponse = "<!DOCTYPE html>\n" +
                             "<html>\n" +
                             "<head><title>WebSocket测试</title></head>\n" +
                             "<body>\n" +
                             "<h1>WebSocket服务器</h1>\n" +
                             "<p>请使用WebSocket客户端连接到此服务器。</p>\n" +
                             "</body>\n" +
                             "</html>";
        
        String response = "HTTP/1.1 200 OK\r\n" +
                         "Content-Type: text/html\r\n" +
                         "Content-Length: " + htmlResponse.length() + "\r\n" +
                         "\r\n" +
                         htmlResponse;
        
        outputStream.write(response.getBytes());
        outputStream.flush();
    }
}

6. 网络协议处理

6.1 自定义协议实现

// 自定义协议消息
class ProtocolMessage {
    private int messageType;
    private int messageId;
    private byte[] data;
    
    public ProtocolMessage(int messageType, int messageId, byte[] data) {
        this.messageType = messageType;
        this.messageId = messageId;
        this.data = data;
    }
    
    // 序列化为字节数组
    public byte[] serialize() {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(baos);
        
        try {
            dos.writeInt(messageType);
            dos.writeInt(messageId);
            dos.writeInt(data.length);
            dos.write(data);
            dos.flush();
        } catch (IOException e) {
            throw new RuntimeException("序列化失败", e);
        }
        
        return baos.toByteArray();
    }
    
    // 从字节数组反序列化
    public static ProtocolMessage deserialize(byte[] bytes) {
        ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
        DataInputStream dis = new DataInputStream(bais);
        
        try {
            int messageType = dis.readInt();
            int messageId = dis.readInt();
            int dataLength = dis.readInt();
            
            byte[] data = new byte[dataLength];
            dis.readFully(data);
            
            return new ProtocolMessage(messageType, messageId, data);
        } catch (IOException e) {
            throw new RuntimeException("反序列化失败", e);
        }
    }
    
    // Getters
    public int getMessageType() { return messageType; }
    public int getMessageId() { return messageId; }
    public byte[] getData() { return data; }
    public String getDataAsString() { return new String(data); }
    
    @Override
    public String toString() {
        return String.format("ProtocolMessage{type=%d, id=%d, data='%s'}", 
                           messageType, messageId, getDataAsString());
    }
}

// 协议处理演示
public class ProtocolDemo {
    // 消息类型常量
    public static final int MSG_TYPE_HEARTBEAT = 1;
    public static final int MSG_TYPE_REQUEST = 2;
    public static final int MSG_TYPE_RESPONSE = 3;
    public static final int MSG_TYPE_NOTIFICATION = 4;
    
    public static void main(String[] args) {
        demonstrateProtocol();
    }
    
    public static void demonstrateProtocol() {
        System.out.println("=== 自定义协议演示 ===");
        
        // 创建不同类型的消息
        ProtocolMessage heartbeat = new ProtocolMessage(
            MSG_TYPE_HEARTBEAT, 1, "ping".getBytes());
        
        ProtocolMessage request = new ProtocolMessage(
            MSG_TYPE_REQUEST, 2, "获取用户信息".getBytes());
        
        ProtocolMessage response = new ProtocolMessage(
            MSG_TYPE_RESPONSE, 2, "{\"name\":\"张三\",\"age\":25}".getBytes());
        
        ProtocolMessage notification = new ProtocolMessage(
            MSG_TYPE_NOTIFICATION, 3, "系统维护通知".getBytes());
        
        // 序列化和反序列化测试
        testSerialization(heartbeat);
        testSerialization(request);
        testSerialization(response);
        testSerialization(notification);
        
        // 协议服务器演示
        startProtocolServer();
    }
    
    private static void testSerialization(ProtocolMessage message) {
        System.out.println("\n--- 序列化测试 ---");
        System.out.println("原始消息: " + message);
        
        // 序列化
        byte[] serialized = message.serialize();
        System.out.println("序列化后字节数: " + serialized.length);
        
        // 反序列化
        ProtocolMessage deserialized = ProtocolMessage.deserialize(serialized);
        System.out.println("反序列化消息: " + deserialized);
        
        // 验证一致性
        boolean isEqual = message.getMessageType() == deserialized.getMessageType() &&
                         message.getMessageId() == deserialized.getMessageId() &&
                         Arrays.equals(message.getData(), deserialized.getData());
        System.out.println("序列化一致性: " + (isEqual ? "通过" : "失败"));
    }
    
    private static void startProtocolServer() {
        System.out.println("\n=== 协议服务器启动 ===");
        
        Thread serverThread = new Thread(() -> {
            try (ServerSocket serverSocket = new ServerSocket(8086)) {
                System.out.println("协议服务器监听端口: 8086");
                
                Socket clientSocket = serverSocket.accept();
                System.out.println("客户端连接: " + clientSocket.getRemoteSocketAddress());
                
                handleProtocolClient(clientSocket);
                
            } catch (IOException e) {
                System.out.println("协议服务器异常: " + e.getMessage());
            }
        });
        
        serverThread.start();
        
        // 等待服务器启动
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        // 启动客户端测试
        testProtocolClient();
    }
    
    private static void handleProtocolClient(Socket clientSocket) {
        try (DataInputStream dis = new DataInputStream(clientSocket.getInputStream());
             DataOutputStream dos = new DataOutputStream(clientSocket.getOutputStream())) {
            
            while (true) {
                // 读取消息长度
                int messageLength = dis.readInt();
                if (messageLength <= 0) break;
                
                // 读取消息内容
                byte[] messageBytes = new byte[messageLength];
                dis.readFully(messageBytes);
                
                // 反序列化消息
                ProtocolMessage message = ProtocolMessage.deserialize(messageBytes);
                System.out.println("服务器收到: " + message);
                
                // 处理消息并发送回复
                ProtocolMessage response = processMessage(message);
                byte[] responseBytes = response.serialize();
                
                dos.writeInt(responseBytes.length);
                dos.write(responseBytes);
                dos.flush();
                
                System.out.println("服务器发送: " + response);
                
                if (message.getMessageType() == MSG_TYPE_HEARTBEAT && 
                    "bye".equals(message.getDataAsString())) {
                    break;
                }
            }
            
        } catch (IOException e) {
            System.out.println("处理协议客户端异常: " + e.getMessage());
        } finally {
            try {
                clientSocket.close();
            } catch (IOException e) {
                System.out.println("关闭客户端连接异常: " + e.getMessage());
            }
        }
    }
    
    private static ProtocolMessage processMessage(ProtocolMessage message) {
        switch (message.getMessageType()) {
            case MSG_TYPE_HEARTBEAT:
                return new ProtocolMessage(MSG_TYPE_HEARTBEAT, 
                    message.getMessageId(), "pong".getBytes());
                
            case MSG_TYPE_REQUEST:
                String requestData = message.getDataAsString();
                String responseData = "处理请求: " + requestData + " -> 成功";
                return new ProtocolMessage(MSG_TYPE_RESPONSE, 
                    message.getMessageId(), responseData.getBytes());
                
            default:
                return new ProtocolMessage(MSG_TYPE_RESPONSE, 
                    message.getMessageId(), "未知消息类型".getBytes());
        }
    }
    
    private static void testProtocolClient() {
        System.out.println("\n=== 协议客户端测试 ===");
        
        try (Socket socket = new Socket("localhost", 8086);
             DataInputStream dis = new DataInputStream(socket.getInputStream());
             DataOutputStream dos = new DataOutputStream(socket.getOutputStream())) {
            
            // 发送心跳消息
            sendMessage(dos, new ProtocolMessage(MSG_TYPE_HEARTBEAT, 1, "ping".getBytes()));
            receiveMessage(dis);
            
            // 发送请求消息
            sendMessage(dos, new ProtocolMessage(MSG_TYPE_REQUEST, 2, "查询订单".getBytes()));
            receiveMessage(dis);
            
            // 发送结束消息
            sendMessage(dos, new ProtocolMessage(MSG_TYPE_HEARTBEAT, 3, "bye".getBytes()));
            receiveMessage(dis);
            
        } catch (IOException e) {
            System.out.println("协议客户端异常: " + e.getMessage());
        }
    }
    
    private static void sendMessage(DataOutputStream dos, ProtocolMessage message) 
            throws IOException {
        byte[] messageBytes = message.serialize();
        dos.writeInt(messageBytes.length);
        dos.write(messageBytes);
        dos.flush();
        System.out.println("客户端发送: " + message);
    }
    
    private static void receiveMessage(DataInputStream dis) throws IOException {
        int messageLength = dis.readInt();
        byte[] messageBytes = new byte[messageLength];
        dis.readFully(messageBytes);
        
        ProtocolMessage message = ProtocolMessage.deserialize(messageBytes);
         System.out.println("客户端收到: " + message);
     }
 }

7. 网络编程最佳实践

7.1 性能优化

// 网络性能优化演示
public class NetworkPerformanceDemo {
    
    public static void main(String[] args) {
        demonstrateBufferOptimization();
        demonstrateConnectionPooling();
        demonstrateAsyncProcessing();
    }
    
    // 缓冲区优化
    public static void demonstrateBufferOptimization() {
        System.out.println("=== 缓冲区优化演示 ===");
        
        // 测试不同缓冲区大小的性能
        int[] bufferSizes = {1024, 4096, 8192, 16384};
        
        for (int bufferSize : bufferSizes) {
            long startTime = System.currentTimeMillis();
            
            try {
                // 模拟网络数据传输
                simulateDataTransfer(bufferSize);
                
                long endTime = System.currentTimeMillis();
                System.out.println("缓冲区大小: " + bufferSize + 
                                 ", 传输时间: " + (endTime - startTime) + "ms");
                
            } catch (IOException e) {
                System.out.println("传输异常: " + e.getMessage());
            }
        }
        
        System.out.println();
    }
    
    private static void simulateDataTransfer(int bufferSize) throws IOException {
        // 创建测试数据
        byte[] testData = new byte[1024 * 1024]; // 1MB数据
        Arrays.fill(testData, (byte) 'A');
        
        // 使用ByteArrayInputStream和ByteArrayOutputStream模拟网络传输
        try (ByteArrayInputStream input = new ByteArrayInputStream(testData);
             ByteArrayOutputStream output = new ByteArrayOutputStream()) {
            
            byte[] buffer = new byte[bufferSize];
            int bytesRead;
            
            while ((bytesRead = input.read(buffer)) != -1) {
                output.write(buffer, 0, bytesRead);
            }
        }
    }
    
    // 连接池优化
    public static void demonstrateConnectionPooling() {
        System.out.println("=== 连接池优化演示 ===");
        
        SimpleConnectionPool pool = new SimpleConnectionPool("localhost", 8080, 5);
        
        // 模拟多个并发请求
        ExecutorService executor = Executors.newFixedThreadPool(10);
        
        for (int i = 0; i < 20; i++) {
            final int requestId = i;
            executor.submit(() -> {
                try {
                    Socket connection = pool.getConnection();
                    System.out.println("请求 " + requestId + " 获得连接: " + connection);
                    
                    // 模拟使用连接
                    Thread.sleep(100);
                    
                    pool.releaseConnection(connection);
                    System.out.println("请求 " + requestId + " 释放连接");
                    
                } catch (Exception e) {
                    System.out.println("请求 " + requestId + " 异常: " + e.getMessage());
                }
            });
        }
        
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        pool.close();
        System.out.println();
    }
    
    // 异步处理优化
    public static void demonstrateAsyncProcessing() {
        System.out.println("=== 异步处理优化演示 ===");
        
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
                return "异步任务1完成";
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "任务1被中断";
            }
        });
        
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(800);
                return "异步任务2完成";
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "任务2被中断";
            }
        });
        
        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1200);
                return "异步任务3完成";
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "任务3被中断";
            }
        });
        
        // 等待所有任务完成
        CompletableFuture<Void> allTasks = CompletableFuture.allOf(future1, future2, future3);
        
        allTasks.thenRun(() -> {
            try {
                System.out.println("结果1: " + future1.get());
                System.out.println("结果2: " + future2.get());
                System.out.println("结果3: " + future3.get());
            } catch (Exception e) {
                System.out.println("获取结果异常: " + e.getMessage());
            }
        }).join();
        
        System.out.println();
    }
}

// 简单连接池实现
class SimpleConnectionPool {
    private final String host;
    private final int port;
    private final int maxConnections;
    private final Queue<Socket> availableConnections;
    private final Set<Socket> usedConnections;
    private final Object lock = new Object();
    
    public SimpleConnectionPool(String host, int port, int maxConnections) {
        this.host = host;
        this.port = port;
        this.maxConnections = maxConnections;
        this.availableConnections = new LinkedList<>();
        this.usedConnections = new HashSet<>();
    }
    
    public Socket getConnection() throws IOException {
        synchronized (lock) {
            if (!availableConnections.isEmpty()) {
                Socket connection = availableConnections.poll();
                usedConnections.add(connection);
                return connection;
            } else if (usedConnections.size() < maxConnections) {
                Socket connection = new Socket(host, port);
                usedConnections.add(connection);
                return connection;
            } else {
                throw new IOException("连接池已满,无法获取新连接");
            }
        }
    }
    
    public void releaseConnection(Socket connection) {
        synchronized (lock) {
            if (usedConnections.remove(connection)) {
                if (!connection.isClosed()) {
                    availableConnections.offer(connection);
                }
            }
        }
    }
    
    public void close() {
        synchronized (lock) {
            for (Socket connection : availableConnections) {
                try {
                    connection.close();
                } catch (IOException e) {
                    System.out.println("关闭连接异常: " + e.getMessage());
                }
            }
            
            for (Socket connection : usedConnections) {
                try {
                    connection.close();
                } catch (IOException e) {
                    System.out.println("关闭连接异常: " + e.getMessage());
                }
            }
            
            availableConnections.clear();
            usedConnections.clear();
        }
    }
}

7.2 安全最佳实践

// 网络安全演示
public class NetworkSecurityDemo {
    
    public static void main(String[] args) {
        demonstrateSSLSocket();
        demonstrateInputValidation();
        demonstrateRateLimiting();
    }
    
    // SSL/TLS安全连接
    public static void demonstrateSSLSocket() {
        System.out.println("=== SSL安全连接演示 ===");
        
        try {
            // 创建SSL上下文
            SSLContext sslContext = SSLContext.getInstance("TLS");
            sslContext.init(null, null, null);
            
            // 创建SSL Socket工厂
            SSLSocketFactory factory = sslContext.getSocketFactory();
            
            // 连接到HTTPS服务器
            try (SSLSocket sslSocket = (SSLSocket) factory.createSocket("httpbin.org", 443)) {
                
                // 启用所有支持的协议
                sslSocket.setEnabledProtocols(sslSocket.getSupportedProtocols());
                
                // 发送HTTP请求
                PrintWriter out = new PrintWriter(sslSocket.getOutputStream(), true);
                BufferedReader in = new BufferedReader(new InputStreamReader(sslSocket.getInputStream()));
                
                out.println("GET /get HTTP/1.1");
                out.println("Host: httpbin.org");
                out.println("Connection: close");
                out.println();
                
                // 读取响应
                String line;
                int lineCount = 0;
                while ((line = in.readLine()) != null && lineCount < 10) {
                    System.out.println(line);
                    lineCount++;
                }
                
                System.out.println("SSL连接成功,协议: " + sslSocket.getSession().getProtocol());
            }
            
        } catch (Exception e) {
            System.out.println("SSL连接异常: " + e.getMessage());
        }
        
        System.out.println();
    }
    
    // 输入验证
    public static void demonstrateInputValidation() {
        System.out.println("=== 输入验证演示 ===");
        
        String[] testInputs = {
            "正常输入",
            "<script>alert('XSS')</script>",
            "'; DROP TABLE users; --",
            "../../../etc/passwd",
            "正常的长文本输入",
            "包含特殊字符的输入: @#$%^&*()"
        };
        
        for (String input : testInputs) {
            boolean isValid = validateInput(input);
            System.out.println("输入: \"" + input + "\" -> " + (isValid ? "有效" : "无效"));
        }
        
        System.out.println();
    }
    
    private static boolean validateInput(String input) {
        if (input == null || input.trim().isEmpty()) {
            return false;
        }
        
        // 长度检查
        if (input.length() > 1000) {
            return false;
        }
        
        // 危险字符检查
        String[] dangerousPatterns = {
            "<script", "</script>", "javascript:", "vbscript:",
            "DROP TABLE", "DELETE FROM", "INSERT INTO", "UPDATE SET",
            "../", "..\\\\"  // 路径遍历
        };
        
        String lowerInput = input.toLowerCase();
        for (String pattern : dangerousPatterns) {
            if (lowerInput.contains(pattern.toLowerCase())) {
                return false;
            }
        }
        
        return true;
    }
    
    // 速率限制
    public static void demonstrateRateLimiting() {
        System.out.println("=== 速率限制演示 ===");
        
        RateLimiter rateLimiter = new RateLimiter(5, 1000); // 每秒最多5个请求
        
        // 模拟客户端请求
        for (int i = 1; i <= 10; i++) {
            boolean allowed = rateLimiter.isAllowed("client1");
            System.out.println("请求 " + i + ": " + (allowed ? "允许" : "被限制"));
            
            try {
                Thread.sleep(150); // 模拟请求间隔
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        
        System.out.println();
    }
}

// 简单的速率限制器
class RateLimiter {
    private final int maxRequests;
    private final long timeWindowMs;
    private final Map<String, Queue<Long>> clientRequests;
    
    public RateLimiter(int maxRequests, long timeWindowMs) {
        this.maxRequests = maxRequests;
        this.timeWindowMs = timeWindowMs;
        this.clientRequests = new ConcurrentHashMap<>();
    }
    
    public boolean isAllowed(String clientId) {
        long currentTime = System.currentTimeMillis();
        
        Queue<Long> requests = clientRequests.computeIfAbsent(clientId, 
            k -> new LinkedList<>());
        
        synchronized (requests) {
            // 清理过期请求
            while (!requests.isEmpty() && 
                   currentTime - requests.peek() > timeWindowMs) {
                requests.poll();
            }
            
            // 检查是否超过限制
            if (requests.size() >= maxRequests) {
                return false;
            }
            
            // 记录当前请求
            requests.offer(currentTime);
            return true;
        }
    }
}

7.3 错误处理和监控

// 网络错误处理和监控
public class NetworkMonitoringDemo {
    
    public static void main(String[] args) {
        demonstrateErrorHandling();
        demonstrateNetworkMonitoring();
    }
    
    // 错误处理演示
    public static void demonstrateErrorHandling() {
        System.out.println("=== 网络错误处理演示 ===");
        
        NetworkClient client = new NetworkClient();
        
        // 测试各种网络错误情况
        client.testConnection("localhost", 8080);  // 可能的连接拒绝
        client.testConnection("nonexistent.host", 80);  // 主机不存在
        client.testConnection("httpbin.org", 80);  // 正常连接
        
        System.out.println();
    }
    
    // 网络监控演示
    public static void demonstrateNetworkMonitoring() {
        System.out.println("=== 网络监控演示 ===");
        
        NetworkMonitor monitor = new NetworkMonitor();
        
        // 启动监控
        monitor.startMonitoring();
        
        // 模拟一些网络活动
        for (int i = 0; i < 5; i++) {
            monitor.recordRequest("GET", "/api/users", 200, 150);
            monitor.recordRequest("POST", "/api/orders", 201, 300);
            monitor.recordRequest("GET", "/api/products", 404, 50);
            
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        
        // 显示监控统计
        monitor.printStatistics();
        monitor.stopMonitoring();
        
        System.out.println();
    }
}

// 网络客户端错误处理
class NetworkClient {
    private static final int CONNECT_TIMEOUT = 5000;
    private static final int READ_TIMEOUT = 10000;
    private static final int MAX_RETRIES = 3;
    
    public void testConnection(String host, int port) {
        System.out.println("\n--- 测试连接: " + host + ":" + port + " ---");
        
        for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
            try {
                Socket socket = new Socket();
                socket.connect(new InetSocketAddress(host, port), CONNECT_TIMEOUT);
                socket.setSoTimeout(READ_TIMEOUT);
                
                System.out.println("连接成功 (尝试 " + attempt + "/" + MAX_RETRIES + ")");
                socket.close();
                return;
                
            } catch (UnknownHostException e) {
                System.out.println("主机不存在: " + e.getMessage());
                break; // 不需要重试
                
            } catch (ConnectException e) {
                System.out.println("连接被拒绝 (尝试 " + attempt + "/" + MAX_RETRIES + "): " + e.getMessage());
                
            } catch (SocketTimeoutException e) {
                System.out.println("连接超时 (尝试 " + attempt + "/" + MAX_RETRIES + "): " + e.getMessage());
                
            } catch (IOException e) {
                System.out.println("I/O异常 (尝试 " + attempt + "/" + MAX_RETRIES + "): " + e.getMessage());
            }
            
            if (attempt < MAX_RETRIES) {
                try {
                    Thread.sleep(1000 * attempt); // 指数退避
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        
        System.out.println("连接失败,已达到最大重试次数");
    }
}

// 网络监控器
class NetworkMonitor {
    private final Map<String, RequestStats> endpointStats;
    private final AtomicLong totalRequests;
    private final AtomicLong totalErrors;
    private volatile boolean monitoring;
    
    public NetworkMonitor() {
        this.endpointStats = new ConcurrentHashMap<>();
        this.totalRequests = new AtomicLong(0);
        this.totalErrors = new AtomicLong(0);
        this.monitoring = false;
    }
    
    public void startMonitoring() {
        monitoring = true;
        System.out.println("网络监控已启动");
    }
    
    public void stopMonitoring() {
        monitoring = false;
        System.out.println("网络监控已停止");
    }
    
    public void recordRequest(String method, String endpoint, int statusCode, long responseTimeMs) {
        if (!monitoring) return;
        
        totalRequests.incrementAndGet();
        
        if (statusCode >= 400) {
            totalErrors.incrementAndGet();
        }
        
        String key = method + " " + endpoint;
        RequestStats stats = endpointStats.computeIfAbsent(key, k -> new RequestStats());
        stats.addRequest(statusCode, responseTimeMs);
        
        System.out.println("记录请求: " + key + " -> " + statusCode + " (" + responseTimeMs + "ms)");
    }
    
    public void printStatistics() {
        System.out.println("\n=== 网络监控统计 ===");
        System.out.println("总请求数: " + totalRequests.get());
        System.out.println("总错误数: " + totalErrors.get());
        System.out.println("错误率: " + String.format("%.2f%%", 
            (totalErrors.get() * 100.0) / Math.max(1, totalRequests.get())));
        
        System.out.println("\n端点统计:");
        for (Map.Entry<String, RequestStats> entry : endpointStats.entrySet()) {
            System.out.println("  " + entry.getKey() + ": " + entry.getValue());
        }
    }
}

// 请求统计
class RequestStats {
    private final AtomicLong requestCount;
    private final AtomicLong errorCount;
    private final AtomicLong totalResponseTime;
    private final AtomicLong maxResponseTime;
    private final AtomicLong minResponseTime;
    
    public RequestStats() {
        this.requestCount = new AtomicLong(0);
        this.errorCount = new AtomicLong(0);
        this.totalResponseTime = new AtomicLong(0);
        this.maxResponseTime = new AtomicLong(0);
        this.minResponseTime = new AtomicLong(Long.MAX_VALUE);
    }
    
    public void addRequest(int statusCode, long responseTimeMs) {
        requestCount.incrementAndGet();
        
        if (statusCode >= 400) {
            errorCount.incrementAndGet();
        }
        
        totalResponseTime.addAndGet(responseTimeMs);
        
        // 更新最大响应时间
        long currentMax = maxResponseTime.get();
        while (responseTimeMs > currentMax && 
               !maxResponseTime.compareAndSet(currentMax, responseTimeMs)) {
            currentMax = maxResponseTime.get();
        }
        
        // 更新最小响应时间
        long currentMin = minResponseTime.get();
        while (responseTimeMs < currentMin && 
               !minResponseTime.compareAndSet(currentMin, responseTimeMs)) {
            currentMin = minResponseTime.get();
        }
    }
    
    @Override
    public String toString() {
        long count = requestCount.get();
        if (count == 0) return "无请求";
        
        double avgResponseTime = (double) totalResponseTime.get() / count;
        double errorRate = (errorCount.get() * 100.0) / count;
        
        return String.format("请求数=%d, 错误率=%.1f%%, 平均响应时间=%.1fms, 最小=%dms, 最大=%dms",
            count, errorRate, avgResponseTime, 
            minResponseTime.get() == Long.MAX_VALUE ? 0 : minResponseTime.get(),
            maxResponseTime.get());
    }
}

本章小结

网络编程技术对比

技术 适用场景 优点 缺点
传统Socket 简单的客户端-服务器通信 简单易懂,直接控制 阻塞I/O,性能有限
NIO 高并发服务器 非阻塞,高性能 编程复杂,学习成本高
AIO 异步I/O密集型应用 真正异步,回调机制 复杂度高,调试困难
HTTP客户端 Web服务调用 标准协议,易于集成 协议开销,不适合实时通信
WebSocket 实时双向通信 低延迟,全双工 协议复杂,需要特殊处理

性能优化要点

  1. 缓冲区优化

    • 选择合适的缓冲区大小
    • 使用直接内存缓冲区
    • 避免频繁的小数据传输
  2. 连接管理

    • 使用连接池复用连接
    • 设置合理的超时时间
    • 及时关闭无用连接
  3. 并发处理

    • 使用线程池处理请求
    • 采用异步编程模型
    • 避免阻塞主线程

安全考虑

  1. 传输安全

    • 使用SSL/TLS加密通信
    • 验证服务器证书
    • 选择安全的加密算法
  2. 输入验证

    • 验证所有输入数据
    • 防止注入攻击
    • 限制输入长度和格式
  3. 访问控制

    • 实施速率限制
    • 使用身份认证
    • 记录访问日志

开发建议

  1. 选择合适的技术

    • 根据应用需求选择网络编程模型
    • 考虑性能、复杂度和维护成本
    • 优先使用成熟的框架和库
  2. 错误处理

    • 实现完善的异常处理机制
    • 提供有意义的错误信息
    • 实施重试和降级策略
  3. 监控和调试

    • 添加详细的日志记录
    • 监控网络性能指标
    • 使用网络分析工具

下一章预告

下一章我们将学习Java Web开发,包括: - Servlet和JSP基础 - Spring框架核心概念 - Spring Boot快速开发 - RESTful Web服务 - Web安全和认证 - 前后端分离架构

练习题

基础练习

  1. TCP聊天室

    • 实现一个多用户TCP聊天室
    • 支持用户登录、消息广播、私聊功能
    • 使用多线程处理多个客户端连接
  2. 文件传输服务

    • 实现TCP文件传输客户端和服务器
    • 支持大文件传输、断点续传
    • 添加传输进度显示和错误恢复
  3. HTTP代理服务器

    • 实现简单的HTTP代理服务器
    • 支持GET和POST请求转发
    • 添加请求日志和缓存功能

进阶练习

  1. NIO Web服务器

    • 使用NIO实现高性能Web服务器
    • 支持静态文件服务和简单的动态内容
    • 实现连接池和请求路由
  2. 分布式消息系统

    • 设计并实现简单的消息队列系统
    • 支持发布-订阅模式
    • 实现消息持久化和集群部署
  3. 网络监控工具

    • 开发网络性能监控工具
    • 监控延迟、吞吐量、错误率等指标
    • 提供实时图表和告警功能

挑战练习

  1. 自定义RPC框架

    • 实现简单的RPC(远程过程调用)框架
    • 支持服务注册、发现和负载均衡
    • 实现序列化、网络传输和代理生成
  2. WebSocket游戏服务器

    • 开发实时多人在线游戏服务器
    • 使用WebSocket实现实时通信
    • 处理游戏状态同步和冲突解决
  3. 网络安全扫描器

    • 实现网络端口扫描工具
    • 检测常见的安全漏洞
    • 生成安全评估报告

通过这些练习,你将深入理解Java网络编程的各个方面,掌握构建高性能、安全可靠的网络应用程序的技能。 “`