本章目标
- 掌握Socket编程基础
- 理解TCP和UDP协议编程
- 学习NIO和AIO编程模型
- 掌握HTTP客户端编程
- 了解网络协议处理
- 学习分布式通信基础
1. Socket编程基础
1.1 TCP Socket编程
// TCP服务器端
/**
 * Blocking TCP echo server demo.
 *
 * <p>Listens on port 8080 and starts one new thread running a
 * {@code ClientHandler} for every accepted connection.
 */
public class TCPServer {

    public static void main(String[] args) {
        demonstrateBasicTCPServer();
    }

    /**
     * Opens the listening socket and accepts connections in an endless loop.
     * The loop only ends when an {@link IOException} is raised, which is
     * reported on standard output.
     */
    public static void demonstrateBasicTCPServer() {
        System.out.println("=== TCP服务器演示 ===");
        try (ServerSocket listener = new ServerSocket(8080)) {
            System.out.println("服务器启动,监听端口: 8080");
            for (;;) {
                // Blocks until the next client connects.
                Socket connection = listener.accept();
                System.out.println("客户端连接: " + connection.getRemoteSocketAddress());
                // One dedicated thread per client.
                new Thread(new ClientHandler(connection)).start();
            }
        } catch (IOException ex) {
            System.out.println("服务器异常: " + ex.getMessage());
        }
    }
}
// 客户端处理器
/**
 * Serves a single TCP client: every received line is echoed back with a
 * prefix until the client sends "bye" (case-insensitive) or the stream ends.
 * The socket is always closed when {@link #run()} finishes.
 */
class ClientHandler implements Runnable {
    private Socket clientSocket;

    public ClientHandler(Socket clientSocket) {
        this.clientSocket = clientSocket;
    }

    @Override
    public void run() {
        try {
            serveLines();
        } catch (IOException ex) {
            System.out.println("处理客户端时发生异常: " + ex.getMessage());
        } finally {
            releaseSocket();
        }
    }

    /** Reads lines from the client and answers each one until "bye" or end of stream. */
    private void serveLines() throws IOException {
        try (BufferedReader in = new BufferedReader(
                     new InputStreamReader(clientSocket.getInputStream()));
             PrintWriter out = new PrintWriter(clientSocket.getOutputStream(), true)) {
            for (String line = in.readLine(); line != null; line = in.readLine()) {
                System.out.println("收到消息: " + line);
                if ("bye".equalsIgnoreCase(line)) {
                    out.println("再见!");
                    return;
                }
                // Anything else is simply echoed back.
                out.println("回显: " + line);
            }
        }
    }

    /** Closes the client socket and reports the outcome. */
    private void releaseSocket() {
        try {
            clientSocket.close();
            System.out.println("客户端连接关闭");
        } catch (IOException ex) {
            System.out.println("关闭客户端连接时发生异常: " + ex.getMessage());
        }
    }
}
// TCP客户端
/**
 * Interactive TCP client demo: connects to localhost:8080, sends every line
 * typed on standard input and prints the single-line reply from the server.
 */
public class TCPClient {

    public static void main(String[] args) {
        demonstrateBasicTCPClient();
    }

    /**
     * Runs the request/response loop until the user types "bye", standard
     * input reaches end-of-file, or the server closes the connection.
     * I/O failures are reported on standard output.
     */
    public static void demonstrateBasicTCPClient() {
        System.out.println("=== TCP客户端演示 ===");
        try (Socket socket = new Socket("localhost", 8080);
             BufferedReader reader = new BufferedReader(
                     new InputStreamReader(socket.getInputStream()));
             PrintWriter writer = new PrintWriter(socket.getOutputStream(), true);
             Scanner scanner = new Scanner(System.in)) {
            System.out.println("连接到服务器: " + socket.getRemoteSocketAddress());
            System.out.println("输入消息 (输入 'bye' 退出):");
            // Scanner.nextLine() never returns null; at end of input it throws
            // NoSuchElementException, so the loop must be guarded by hasNextLine().
            while (scanner.hasNextLine()) {
                String userInput = scanner.nextLine();
                // Send the message.
                writer.println(userInput);
                // Wait for the reply; null means the server closed the connection.
                String response = reader.readLine();
                if (response == null) {
                    System.out.println("服务器关闭连接");
                    break;
                }
                System.out.println("服务器回复: " + response);
                if ("bye".equalsIgnoreCase(userInput)) {
                    break;
                }
            }
        } catch (IOException e) {
            System.out.println("客户端异常: " + e.getMessage());
        }
    }
}
1.2 UDP Socket编程
// UDP服务器端
/**
 * UDP echo server demo bound to port 9090.
 *
 * <p>Each received datagram is answered with an echo datagram sent to the
 * sender's address and port. A datagram whose text is "bye" (case-insensitive)
 * is still answered and then stops the server.
 */
public class UDPServer {

    public static void main(String[] args) {
        demonstrateBasicUDPServer();
    }

    /** Runs the receive/reply loop; I/O failures are reported on standard output. */
    public static void demonstrateBasicUDPServer() {
        System.out.println("=== UDP服务器演示 ===");
        try (DatagramSocket socket = new DatagramSocket(9090)) {
            System.out.println("UDP服务器启动,监听端口: 9090");
            // One receive buffer shared by all iterations; datagrams longer than
            // this are truncated by receive().
            byte[] receiveBuffer = new byte[1024];
            for (;;) {
                DatagramPacket request = new DatagramPacket(receiveBuffer, receiveBuffer.length);
                socket.receive(request);
                String text = new String(request.getData(), 0, request.getLength());
                System.out.println("收到来自 " + request.getAddress() + ":" +
                        request.getPort() + " 的消息: " + text);

                // Reply to whoever sent the datagram.
                byte[] reply = ("回显: " + text).getBytes();
                socket.send(new DatagramPacket(
                        reply, reply.length, request.getAddress(), request.getPort()));

                if (!"bye".equalsIgnoreCase(text)) {
                    continue;
                }
                System.out.println("收到退出信号,服务器关闭");
                break;
            }
        } catch (IOException ex) {
            System.out.println("UDP服务器异常: " + ex.getMessage());
        }
    }
}
// UDP客户端
/**
 * Interactive UDP client demo: sends every line typed on standard input as a
 * datagram to localhost:9090 and prints the reply datagram.
 */
public class UDPClient {
    /** How long to wait for a reply before giving up on that message. */
    private static final int RESPONSE_TIMEOUT_MILLIS = 5000;

    public static void main(String[] args) {
        demonstrateBasicUDPClient();
    }

    /**
     * Runs the send/receive loop until the user types "bye" or standard input
     * reaches end-of-file. I/O failures are reported on standard output.
     */
    public static void demonstrateBasicUDPClient() {
        System.out.println("=== UDP客户端演示 ===");
        try (DatagramSocket socket = new DatagramSocket();
             Scanner scanner = new Scanner(System.in)) {
            // UDP gives no delivery guarantee: without a timeout a single lost
            // datagram would block receive() forever.
            socket.setSoTimeout(RESPONSE_TIMEOUT_MILLIS);
            InetAddress serverAddress = InetAddress.getByName("localhost");
            int serverPort = 9090;
            System.out.println("连接到UDP服务器: " + serverAddress + ":" + serverPort);
            System.out.println("输入消息 (输入 'bye' 退出):");
            // Scanner.nextLine() throws at end of input instead of returning
            // null, so guard the loop with hasNextLine().
            while (scanner.hasNextLine()) {
                String userInput = scanner.nextLine();
                // Send the datagram.
                byte[] data = userInput.getBytes();
                socket.send(new DatagramPacket(data, data.length, serverAddress, serverPort));
                // Receive the reply (or time out).
                byte[] buffer = new byte[1024];
                DatagramPacket responsePacket = new DatagramPacket(buffer, buffer.length);
                try {
                    socket.receive(responsePacket);
                    String response = new String(responsePacket.getData(), 0,
                            responsePacket.getLength());
                    System.out.println("服务器回复: " + response);
                } catch (java.net.SocketTimeoutException e) {
                    System.out.println("等待服务器回复超时");
                }
                if ("bye".equalsIgnoreCase(userInput)) {
                    break;
                }
            }
        } catch (IOException e) {
            System.out.println("UDP客户端异常: " + e.getMessage());
        }
    }
}
1.3 多线程服务器
// 多线程TCP服务器
/**
 * TCP server demo that serves clients from a fixed-size thread pool.
 *
 * <p>Listens on {@link #PORT}; each accepted connection is submitted to the
 * pool as an {@code EnhancedClientHandler}.
 */
public class MultiThreadTCPServer {
    private static final int PORT = 8081;
    private static final int MAX_CLIENTS = 10;

    public static void main(String[] args) {
        demonstrateMultiThreadServer();
    }

    /**
     * Creates the worker pool, runs the accept loop and shuts the pool down
     * once the loop ends with an I/O error.
     */
    public static void demonstrateMultiThreadServer() {
        System.out.println("=== 多线程TCP服务器演示 ===");
        ExecutorService workers = Executors.newFixedThreadPool(MAX_CLIENTS);
        try {
            acceptLoop(workers);
        } catch (IOException ex) {
            System.out.println("服务器异常: " + ex.getMessage());
        } finally {
            workers.shutdown();
        }
    }

    /** Accepts connections forever and hands each one to the worker pool. */
    private static void acceptLoop(ExecutorService workers) throws IOException {
        try (ServerSocket listener = new ServerSocket(PORT)) {
            System.out.println("多线程服务器启动,监听端口: " + PORT);
            System.out.println("最大客户端连接数: " + MAX_CLIENTS);
            for (;;) {
                Socket connection = listener.accept();
                System.out.println("新客户端连接: " + connection.getRemoteSocketAddress());
                workers.submit(new EnhancedClientHandler(connection));
            }
        }
    }
}
// 增强的客户端处理器
/**
 * Line-oriented command handler for one TCP client.
 *
 * <p>Supported commands: {@code time}, {@code echo <message>},
 * {@code calc <a><op><b>} with {@code op} one of {@code + - * /}, and
 * {@code bye}, which ends the session. Commands are matched
 * case-insensitively after trimming surrounding whitespace.
 */
class EnhancedClientHandler implements Runnable {
    private static final String OPERATORS = "+-*/";

    private final Socket clientSocket;
    private final String clientId;

    public EnhancedClientHandler(Socket clientSocket) {
        this.clientSocket = clientSocket;
        this.clientId = "Client-" + clientSocket.getRemoteSocketAddress();
    }

    @Override
    public void run() {
        System.out.println(clientId + " 开始处理");
        try (BufferedReader reader = new BufferedReader(
                     new InputStreamReader(clientSocket.getInputStream()));
             PrintWriter writer = new PrintWriter(clientSocket.getOutputStream(), true)) {
            // Greeting and command overview.
            writer.println("欢迎连接到多线程服务器! 你的ID是: " + clientId);
            writer.println("支持的命令: time, echo <message>, calc <expression>, bye");
            String inputLine;
            while ((inputLine = reader.readLine()) != null) {
                System.out.println(clientId + " 发送: " + inputLine);
                writer.println(processCommand(inputLine));
                // processCommand() trims before matching, so trim here as well;
                // otherwise " bye " gets the goodbye reply but the session
                // would stay open.
                if ("bye".equalsIgnoreCase(inputLine.trim())) {
                    break;
                }
            }
        } catch (IOException e) {
            System.out.println(clientId + " 处理异常: " + e.getMessage());
        } finally {
            try {
                clientSocket.close();
                System.out.println(clientId + " 连接关闭");
            } catch (IOException e) {
                System.out.println(clientId + " 关闭连接时异常: " + e.getMessage());
            }
        }
    }

    /**
     * Maps one command line to the reply text.
     *
     * @param command raw line received from the client; may be null or blank
     * @return the reply to send back, never null
     */
    private String processCommand(String command) {
        if (command == null || command.trim().isEmpty()) {
            return "错误: 空命令";
        }
        command = command.trim();
        if ("time".equalsIgnoreCase(command)) {
            return "当前时间: " + new Date();
        } else if (startsWithIgnoreCase(command, "echo ")) {
            return "回显: " + command.substring(5);
        } else if (startsWithIgnoreCase(command, "calc ")) {
            return calculateExpression(command.substring(5));
        } else if ("bye".equalsIgnoreCase(command)) {
            return "再见! 感谢使用服务器";
        } else {
            return "未知命令: " + command + ". 支持的命令: time, echo, calc, bye";
        }
    }

    /** Locale-independent, case-insensitive prefix test. */
    private static boolean startsWithIgnoreCase(String text, String prefix) {
        return text.regionMatches(true, 0, prefix, 0, prefix.length());
    }

    /**
     * Evaluates a binary expression {@code <left><op><right>}.
     *
     * <p>The operator is the first of {@code + - * /} that is not a leading
     * sign and not the sign of an exponent, so operands such as {@code -2} or
     * {@code 1e-3} are parsed correctly.
     *
     * @param expression text after the {@code calc } prefix
     * @return the formatted result or an error description
     */
    private String calculateExpression(String expression) {
        String expr = expression.trim();
        int operatorIndex = findOperatorIndex(expr);
        if (operatorIndex < 0) {
            return "错误: 不支持的表达式格式";
        }
        try {
            double left = Double.parseDouble(expr.substring(0, operatorIndex).trim());
            double right = Double.parseDouble(expr.substring(operatorIndex + 1).trim());
            switch (expr.charAt(operatorIndex)) {
                case '+':
                    return "计算结果: " + (left + right);
                case '-':
                    return "计算结果: " + (left - right);
                case '*':
                    return "计算结果: " + (left * right);
                default: // '/'
                    if (right == 0) {
                        return "错误: 除数不能为零";
                    }
                    return "计算结果: " + (left / right);
            }
        } catch (NumberFormatException e) {
            return "计算错误: " + e.getMessage();
        }
    }

    /**
     * Returns the index of the binary operator in {@code expr}, or -1 when
     * there is none. Index 0 is skipped because a sign there belongs to the
     * left operand.
     */
    private static int findOperatorIndex(String expr) {
        for (int i = 1; i < expr.length(); i++) {
            char c = expr.charAt(i);
            if (OPERATORS.indexOf(c) < 0) {
                continue;
            }
            char previous = expr.charAt(i - 1);
            boolean exponentSign = (c == '+' || c == '-')
                    && (previous == 'e' || previous == 'E');
            if (!exponentSign) {
                return i;
            }
        }
        return -1;
    }
}
2. NIO编程
2.1 NIO基础概念
// NIO基础演示
/**
 * Walk-through of the three NIO building blocks: {@link ByteBuffer},
 * {@link FileChannel} and {@link Selector}.
 */
public class NIOBasicsDemo {
    private static final java.nio.charset.Charset UTF_8 =
            java.nio.charset.StandardCharsets.UTF_8;

    public static void main(String[] args) {
        demonstrateBuffer();
        demonstrateChannel();
        demonstrateSelector();
    }

    /** Shows how position and limit change across put, flip, get and clear. */
    public static void demonstrateBuffer() {
        System.out.println("=== Buffer操作演示 ===");
        ByteBuffer buffer = ByteBuffer.allocate(10);
        System.out.println("初始状态 - position: " + buffer.position() +
                ", limit: " + buffer.limit() +
                ", capacity: " + buffer.capacity());
        // Write data.
        buffer.put("Hello".getBytes(UTF_8));
        System.out.println("写入后 - position: " + buffer.position() +
                ", limit: " + buffer.limit());
        // Switch to read mode.
        buffer.flip();
        System.out.println("flip后 - position: " + buffer.position() +
                ", limit: " + buffer.limit());
        // Read everything that was written.
        byte[] data = new byte[buffer.remaining()];
        buffer.get(data);
        System.out.println("读取的数据: " + new String(data, UTF_8));
        // Reset for the next write.
        buffer.clear();
        System.out.println("clear后 - position: " + buffer.position() +
                ", limit: " + buffer.limit());
        System.out.println();
    }

    /**
     * Writes two lines to {@code nio_test.txt} through a {@link FileChannel},
     * reads them back and deletes the file again (also when an error occurs).
     */
    public static void demonstrateChannel() {
        System.out.println("=== Channel操作演示 ===");
        Path path = Paths.get("nio_test.txt");
        try {
            // TRUNCATE_EXISTING: without it, a longer file left over from an
            // earlier run keeps its tail and the read-back shows stale bytes.
            try (FileChannel writeChannel = FileChannel.open(path,
                    StandardOpenOption.CREATE, StandardOpenOption.WRITE,
                    StandardOpenOption.TRUNCATE_EXISTING)) {
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                buffer.put("这是NIO Channel写入的数据\n".getBytes(UTF_8));
                buffer.put("支持高效的文件操作\n".getBytes(UTF_8));
                buffer.flip();
                // A single write() is allowed to write fewer bytes than remain.
                while (buffer.hasRemaining()) {
                    writeChannel.write(buffer);
                }
                System.out.println("数据写入文件: " + path);
            }
            try (FileChannel readChannel = FileChannel.open(path,
                    StandardOpenOption.READ)) {
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                int bytesRead = 0;
                int n;
                // Read until end of file or until the buffer is full.
                while (buffer.hasRemaining() && (n = readChannel.read(buffer)) != -1) {
                    bytesRead += n;
                }
                buffer.flip();
                byte[] data = new byte[buffer.remaining()];
                buffer.get(data);
                System.out.println("从文件读取的数据:");
                System.out.println(new String(data, UTF_8));
                System.out.println("读取字节数: " + bytesRead);
            }
        } catch (IOException e) {
            System.out.println("Channel操作异常: " + e.getMessage());
        } finally {
            // Remove the scratch file even when writing or reading failed.
            try {
                Files.deleteIfExists(path);
            } catch (IOException e) {
                System.out.println("Channel操作异常: " + e.getMessage());
            }
        }
        System.out.println();
    }

    /**
     * Registers a non-blocking server channel on port 8082 with a selector and
     * performs one non-blocking select. Both resources are closed on every
     * path, including a failed bind.
     */
    public static void demonstrateSelector() {
        System.out.println("=== Selector操作演示 ===");
        try (Selector selector = Selector.open();
             ServerSocketChannel serverChannel = ServerSocketChannel.open()) {
            serverChannel.configureBlocking(false);
            serverChannel.bind(new InetSocketAddress(8082));
            // Register for accept events.
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("NIO服务器启动,监听端口: 8082");
            System.out.println("Selector注册的Channel数量: " + selector.keys().size());
            // A real server would call select() in a loop.
            int readyChannels = selector.selectNow();
            System.out.println("就绪的Channel数量: " + readyChannels);
        } catch (IOException e) {
            System.out.println("Selector操作异常: " + e.getMessage());
        }
        System.out.println();
    }
}
2.2 NIO服务器实现
// NIO服务器
/**
 * Single-threaded, selector-based echo server on port 8083.
 *
 * <p>Each connection alternates between two states. While waiting for input,
 * the key is interested in {@code OP_READ} and its attachment is the read
 * buffer. While a reply is pending, the key is interested in {@code OP_WRITE}
 * and its attachment is the buffer holding the unsent reply bytes.
 */
public class NIOServer {
    private static final int PORT = 8083;
    private static final int READ_BUFFER_SIZE = 1024;

    private Selector selector;
    private ServerSocketChannel serverChannel;

    public static void main(String[] args) {
        NIOServer server = new NIOServer();
        server.start();
    }

    /**
     * Opens the selector and server channel and runs the event loop until a
     * selector-level I/O error occurs; resources are released afterwards.
     */
    public void start() {
        try {
            selector = Selector.open();
            serverChannel = ServerSocketChannel.open();
            serverChannel.configureBlocking(false);
            serverChannel.bind(new InetSocketAddress(PORT));
            // Register for accept events.
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("NIO服务器启动,监听端口: " + PORT);
            while (true) {
                // Blocks until at least one channel is ready.
                int readyChannels = selector.select();
                if (readyChannels == 0) {
                    continue;
                }
                Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
                while (keyIterator.hasNext()) {
                    SelectionKey key = keyIterator.next();
                    keyIterator.remove();
                    // A key cancelled earlier in this pass would throw
                    // CancelledKeyException from the isXxx() checks below.
                    if (!key.isValid()) {
                        continue;
                    }
                    try {
                        if (key.isAcceptable()) {
                            handleAccept(key);
                        } else if (key.isReadable()) {
                            handleRead(key);
                        } else if (key.isWritable()) {
                            handleWrite(key);
                        }
                    } catch (IOException e) {
                        System.out.println("处理事件时发生异常: " + e.getMessage());
                        closeKey(key);
                    }
                }
            }
        } catch (IOException e) {
            System.out.println("服务器启动异常: " + e.getMessage());
        } finally {
            cleanup();
        }
    }

    /** Cancels the key and closes its channel without letting a close error stop the loop. */
    private void closeKey(SelectionKey key) {
        key.cancel();
        try {
            key.channel().close();
        } catch (IOException e) {
            System.out.println("清理资源时发生异常: " + e.getMessage());
        }
    }

    /** Accepts a pending connection and registers it for reading with its own buffer. */
    private void handleAccept(SelectionKey key) throws IOException {
        ServerSocketChannel acceptingChannel = (ServerSocketChannel) key.channel();
        SocketChannel clientChannel = acceptingChannel.accept();
        if (clientChannel != null) {
            clientChannel.configureBlocking(false);
            clientChannel.register(selector, SelectionKey.OP_READ,
                    ByteBuffer.allocate(READ_BUFFER_SIZE));
            System.out.println("新客户端连接: " + clientChannel.getRemoteAddress());
        }
    }

    /**
     * Reads whatever is available, builds the echo reply and switches the key
     * to write mode; closes the connection on end-of-stream.
     */
    private void handleRead(SelectionKey key) throws IOException {
        SocketChannel clientChannel = (SocketChannel) key.channel();
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        int bytesRead = clientChannel.read(buffer);
        if (bytesRead > 0) {
            buffer.flip();
            byte[] data = new byte[buffer.remaining()];
            buffer.get(data);
            String message = new String(data).trim();
            System.out.println("收到消息: " + message + " 来自: " +
                    clientChannel.getRemoteAddress());
            String response;
            if ("bye".equalsIgnoreCase(message)) {
                response = "再见!";
            } else {
                response = "回显: " + message;
            }
            // The reply is longer than the request (prefix added), so it must
            // not be squeezed back into the fixed-size read buffer: that threw
            // an unchecked BufferOverflowException for near-full reads and
            // took the whole event loop down. Wrap it in its own buffer.
            key.attach(ByteBuffer.wrap(response.getBytes()));
            key.interestOps(SelectionKey.OP_WRITE);
        } else if (bytesRead == -1) {
            // Client closed the connection.
            System.out.println("客户端断开连接: " + clientChannel.getRemoteAddress());
            key.cancel();
            clientChannel.close();
        }
    }

    /** Writes pending reply bytes; once done, switches the key back to read mode. */
    private void handleWrite(SelectionKey key) throws IOException {
        SocketChannel clientChannel = (SocketChannel) key.channel();
        ByteBuffer pendingReply = (ByteBuffer) key.attachment();
        clientChannel.write(pendingReply);
        if (!pendingReply.hasRemaining()) {
            // Reply fully sent: provide a fresh read buffer for the next message.
            key.attach(ByteBuffer.allocate(READ_BUFFER_SIZE));
            key.interestOps(SelectionKey.OP_READ);
        }
    }

    /** Closes the selector and the server channel if they were opened. */
    private void cleanup() {
        try {
            if (selector != null) {
                selector.close();
            }
            if (serverChannel != null) {
                serverChannel.close();
            }
        } catch (IOException e) {
            System.out.println("清理资源时发生异常: " + e.getMessage());
        }
    }
}
2.3 NIO客户端实现
// NIO客户端
/**
 * Interactive client for the NIO echo server on localhost:8083.
 *
 * <p>Connects in blocking mode, then switches the channel to non-blocking
 * mode and polls for the reply after each message.
 */
public class NIOClient {
    private static final String HOST = "localhost";
    private static final int PORT = 8083;

    public static void main(String[] args) {
        NIOClient client = new NIOClient();
        client.start();
    }

    /**
     * Runs the prompt/send/receive loop until the user types "bye", standard
     * input ends, the server closes the connection, or the thread is
     * interrupted while waiting for a reply.
     */
    public void start() {
        try (SocketChannel channel = SocketChannel.open();
             Scanner scanner = new Scanner(System.in)) {
            // Connect to the server.
            channel.connect(new InetSocketAddress(HOST, PORT));
            channel.configureBlocking(false);
            System.out.println("连接到NIO服务器: " + HOST + ":" + PORT);
            System.out.println("输入消息 (输入 'bye' 退出):");
            ByteBuffer receiveBuffer = ByteBuffer.allocate(1024);
            while (true) {
                System.out.print("> ");
                // nextLine() throws NoSuchElementException at end of input
                // rather than returning null, so test for a line first.
                if (!scanner.hasNextLine()) {
                    break;
                }
                String input = scanner.nextLine();
                if (input.trim().isEmpty()) {
                    continue;
                }
                // Wrap the bytes instead of copying them into a fixed 1024-byte
                // buffer, which overflowed for longer input lines.
                ByteBuffer outgoing = ByteBuffer.wrap(input.getBytes());
                while (outgoing.hasRemaining()) {
                    channel.write(outgoing);
                }
                // Poll until the reply arrives.
                receiveBuffer.clear();
                while (true) {
                    int bytesRead = channel.read(receiveBuffer);
                    if (bytesRead > 0) {
                        receiveBuffer.flip();
                        byte[] data = new byte[receiveBuffer.remaining()];
                        receiveBuffer.get(data);
                        System.out.println("服务器回复: " + new String(data));
                        break;
                    } else if (bytesRead == 0) {
                        // Nothing available yet; back off briefly.
                        try {
                            Thread.sleep(10);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return;
                        }
                    } else {
                        // Connection closed by the server.
                        System.out.println("服务器关闭连接");
                        return;
                    }
                }
                if ("bye".equalsIgnoreCase(input)) {
                    break;
                }
            }
        } catch (IOException e) {
            System.out.println("客户端异常: " + e.getMessage());
        }
    }
}
3. HTTP客户端编程
3.1 使用HttpURLConnection
// HTTP客户端演示
/**
 * Demonstrates HTTP requests with the legacy {@link HttpURLConnection} API and
 * with the Java 11+ {@link HttpClient} API against httpbin.org.
 */
public class HTTPClientDemo {
    private static final java.nio.charset.Charset UTF_8 =
            java.nio.charset.StandardCharsets.UTF_8;
    private static final int TIMEOUT_MILLIS = 5000;

    public static void main(String[] args) {
        demonstrateHttpURLConnection();
        demonstrateHttpClient();
    }

    /** Runs the GET and POST examples; failures are reported on standard output. */
    public static void demonstrateHttpURLConnection() {
        System.out.println("=== HttpURLConnection演示 ===");
        try {
            performGetRequest();
            performPostRequest();
        } catch (Exception e) {
            System.out.println("HTTP请求异常: " + e.getMessage());
        }
        System.out.println();
    }

    /**
     * Sends a GET request with two query parameters and prints status,
     * headers and body.
     *
     * @throws IOException if the request cannot be sent or the reply cannot be read
     */
    private static void performGetRequest() throws IOException {
        System.out.println("--- GET请求演示 ---");
        // The separator between the two parameters is '&'. It is concatenated
        // separately so that tools which decode HTML entities cannot turn
        // "&para" + "m2" into a pilcrow sign again.
        URL url = new URL("https://httpbin.org/get?param1=value1" + "&" + "param2=value2");
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        try {
            connection.setRequestMethod("GET");
            connection.setRequestProperty("User-Agent", "Java-HTTP-Client/1.0");
            connection.setRequestProperty("Accept", "application/json");
            connection.setConnectTimeout(TIMEOUT_MILLIS);
            connection.setReadTimeout(TIMEOUT_MILLIS);
            int responseCode = connection.getResponseCode();
            System.out.println("响应码: " + responseCode);
            System.out.println("响应头:");
            for (Map.Entry<String, List<String>> header
                    : connection.getHeaderFields().entrySet()) {
                System.out.println(" " + header.getKey() + ": " + header.getValue());
            }
            System.out.println("响应体:");
            System.out.println(readBody(connection));
        } finally {
            // Release the connection on the error path too.
            connection.disconnect();
        }
    }

    /**
     * Sends a JSON document with POST and prints status and body.
     *
     * @throws IOException if the request cannot be sent or the reply cannot be read
     */
    private static void performPostRequest() throws IOException {
        System.out.println("--- POST请求演示 ---");
        URL url = new URL("https://httpbin.org/post");
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        try {
            connection.setRequestMethod("POST");
            connection.setRequestProperty("Content-Type", "application/json");
            connection.setRequestProperty("Accept", "application/json");
            // Same timeouts as the GET example so a stalled server cannot
            // block the demo indefinitely.
            connection.setConnectTimeout(TIMEOUT_MILLIS);
            connection.setReadTimeout(TIMEOUT_MILLIS);
            connection.setDoOutput(true);
            String jsonInputString = "{\"name\": \"张三\", \"age\": 25, \"city\": \"北京\"}";
            try (OutputStream os = connection.getOutputStream()) {
                byte[] input = jsonInputString.getBytes(UTF_8);
                os.write(input, 0, input.length);
            }
            int responseCode = connection.getResponseCode();
            System.out.println("响应码: " + responseCode);
            System.out.println("响应体:");
            System.out.println(readBody(connection));
        } finally {
            connection.disconnect();
        }
    }

    /**
     * Reads the whole response body as UTF-8 text, one {@code '\n'} after each
     * line. For status codes of 400 and above the error stream is read, since
     * {@code getInputStream()} throws in that case.
     *
     * @return the body text, or an empty string when the server sent no body
     */
    private static String readBody(HttpURLConnection connection) throws IOException {
        java.io.InputStream stream = connection.getResponseCode() >= 400
                ? connection.getErrorStream()
                : connection.getInputStream();
        if (stream == null) {
            return "";
        }
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(stream, UTF_8))) {
            StringBuilder response = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                response.append(line).append("\n");
            }
            return response.toString();
        }
    }

    /** Runs synchronous GET/POST and one asynchronous GET with {@link HttpClient}. */
    public static void demonstrateHttpClient() {
        System.out.println("=== Java 11+ HttpClient演示 ===");
        try {
            HttpClient client = HttpClient.newBuilder()
                    .connectTimeout(Duration.ofSeconds(10))
                    .build();
            // GET request.
            HttpRequest getRequest = HttpRequest.newBuilder()
                    .uri(URI.create("https://httpbin.org/get"))
                    .header("User-Agent", "Java-HttpClient/11")
                    .GET()
                    .build();
            HttpResponse<String> getResponse = client.send(getRequest,
                    HttpResponse.BodyHandlers.ofString());
            System.out.println("GET响应码: " + getResponse.statusCode());
            System.out.println("GET响应体: " + getResponse.body());
            // POST request.
            String jsonData = "{\"message\": \"Hello from Java HttpClient\"}";
            HttpRequest postRequest = HttpRequest.newBuilder()
                    .uri(URI.create("https://httpbin.org/post"))
                    .header("Content-Type", "application/json")
                    .POST(HttpRequest.BodyPublishers.ofString(jsonData))
                    .build();
            HttpResponse<String> postResponse = client.send(postRequest,
                    HttpResponse.BodyHandlers.ofString());
            System.out.println("POST响应码: " + postResponse.statusCode());
            System.out.println("POST响应体: " + postResponse.body());
            // Asynchronous request; join() waits for the callback to finish.
            CompletableFuture<HttpResponse<String>> asyncResponse =
                    client.sendAsync(getRequest, HttpResponse.BodyHandlers.ofString());
            asyncResponse.thenAccept(response -> {
                System.out.println("异步响应码: " + response.statusCode());
                System.out.println("异步响应体长度: " + response.body().length());
            }).join();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to callers.
            Thread.currentThread().interrupt();
            System.out.println("HttpClient异常: " + e.getMessage());
        } catch (Exception e) {
            System.out.println("HttpClient异常: " + e.getMessage());
        }
    }
}
3.2 RESTful API客户端
// RESTful API客户端
/**
 * Minimal JSON-over-HTTP client built on {@link HttpClient}.
 *
 * <p>{@link #get}, {@link #post} and {@link #put} return the response body and
 * throw {@link IOException} for any non-2xx status; {@link #delete} returns the
 * status code so callers can inspect it themselves.
 */
public class RESTClient {
    private final HttpClient httpClient;
    private final String baseUrl;

    /**
     * @param baseUrl scheme and host (plus optional path prefix) that every
     *                request path is appended to; must not be null
     * @throws NullPointerException if {@code baseUrl} is null
     */
    public RESTClient(String baseUrl) {
        if (baseUrl == null) {
            throw new NullPointerException("baseUrl");
        }
        this.baseUrl = baseUrl;
        this.httpClient = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(10))
                .build();
    }

    public static void main(String[] args) {
        demonstrateRESTClient();
    }

    /** Exercises GET, POST, PUT and DELETE against jsonplaceholder.typicode.com. */
    public static void demonstrateRESTClient() {
        System.out.println("=== RESTful API客户端演示 ===");
        RESTClient client = new RESTClient("https://jsonplaceholder.typicode.com");
        try {
            // GET - all users.
            System.out.println("--- 获取所有用户 ---");
            String users = client.get("/users");
            System.out.println("用户列表: " + users.substring(0, Math.min(200, users.length())) + "...");
            // GET - one user.
            System.out.println("\n--- 获取用户ID=1 ---");
            String user = client.get("/users/1");
            System.out.println("用户详情: " + user);
            // POST - create a user.
            System.out.println("\n--- 创建新用户 ---");
            String newUserJson = "{\"name\": \"张三\", \"email\": \"zhangsan@example.com\"}";
            String createdUser = client.post("/users", newUserJson);
            System.out.println("创建的用户: " + createdUser);
            // PUT - update a user.
            System.out.println("\n--- 更新用户 ---");
            String updateUserJson = "{\"id\": 1, \"name\": \"李四\", \"email\": \"lisi@example.com\"}";
            String updatedUser = client.put("/users/1", updateUserJson);
            System.out.println("更新的用户: " + updatedUser);
            // DELETE - remove a user.
            System.out.println("\n--- 删除用户 ---");
            int deleteStatus = client.delete("/users/1");
            System.out.println("删除状态码: " + deleteStatus);
        } catch (InterruptedException e) {
            // Keep the interrupt visible to callers.
            Thread.currentThread().interrupt();
            System.out.println("REST客户端异常: " + e.getMessage());
        } catch (Exception e) {
            System.out.println("REST客户端异常: " + e.getMessage());
        }
    }

    /**
     * Performs a GET request.
     *
     * @param path path appended to the base URL
     * @return the response body
     * @throws IOException on transport failure or a non-2xx status
     */
    public String get(String path) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + path))
                .header("Accept", "application/json")
                .GET()
                .build();
        return bodyOrThrow(send(request));
    }

    /**
     * Performs a POST request with a JSON body.
     *
     * @return the response body
     * @throws IOException on transport failure or a non-2xx status
     */
    public String post(String path, String jsonBody) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + path))
                .header("Content-Type", "application/json")
                .header("Accept", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
        return bodyOrThrow(send(request));
    }

    /**
     * Performs a PUT request with a JSON body.
     *
     * @return the response body
     * @throws IOException on transport failure or a non-2xx status
     */
    public String put(String path, String jsonBody) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + path))
                .header("Content-Type", "application/json")
                .header("Accept", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
        return bodyOrThrow(send(request));
    }

    /**
     * Performs a DELETE request.
     *
     * @return the HTTP status code, whatever it is
     * @throws IOException on transport failure
     */
    public int delete(String path) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + path))
                .DELETE()
                .build();
        return send(request).statusCode();
    }

    /** Sends the request and reads the body as a string. */
    private HttpResponse<String> send(HttpRequest request)
            throws IOException, InterruptedException {
        return httpClient.send(request, HttpResponse.BodyHandlers.ofString());
    }

    /**
     * Returns the body of a 2xx response and rejects everything else, so that
     * an error page is never mistaken for a created or updated resource.
     */
    private static String bodyOrThrow(HttpResponse<String> response) throws IOException {
        int status = response.statusCode();
        if (status >= 200 && status < 300) {
            return response.body();
        }
        throw new IOException("HTTP错误: " + status);
    }
}
4. AIO编程(异步I/O)
4.1 AIO基础概念
// AIO服务器演示
/**
 * Asynchronous (NIO.2) echo server on port 8084 driven by
 * {@link CompletionHandler} callbacks.
 *
 * <p>Per connection the flow is: read one message, write the echo reply, then
 * read again, until the client sends "bye" or closes the connection.
 */
public class AIOServer {
    private static final int PORT = 8084;
    private static final int BUFFER_SIZE = 1024;

    public static void main(String[] args) {
        demonstrateAIOServer();
    }

    /**
     * Starts accepting connections and keeps the server alive until a line is
     * entered on standard input. The server channel is closed on every path.
     */
    public static void demonstrateAIOServer() {
        System.out.println("=== AIO服务器演示 ===");
        try (AsynchronousServerSocketChannel serverChannel =
                     AsynchronousServerSocketChannel.open()) {
            serverChannel.bind(new InetSocketAddress(PORT));
            System.out.println("AIO服务器启动,监听端口: " + PORT);
            serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
                @Override
                public void completed(AsynchronousSocketChannel clientChannel, Void attachment) {
                    // Re-arm the accept before serving this client.
                    serverChannel.accept(null, this);
                    handleClient(clientChannel);
                }

                @Override
                public void failed(Throwable exc, Void attachment) {
                    System.out.println("接受连接失败: " + exc.getMessage());
                }
            });
            // Callbacks run on pool threads; block here to keep the JVM alive.
            System.out.println("按Enter键停止服务器...");
            System.in.read();
        } catch (IOException e) {
            System.out.println("AIO服务器异常: " + e.getMessage());
        }
    }

    /** Logs the new connection and starts its read/reply cycle. */
    private static void handleClient(AsynchronousSocketChannel clientChannel) {
        try {
            System.out.println("新客户端连接: " + clientChannel.getRemoteAddress());
        } catch (IOException e) {
            System.out.println("处理客户端异常: " + e.getMessage());
            closeClient(clientChannel, "客户端连接关闭");
            return;
        }
        readNextMessage(clientChannel);
    }

    /**
     * Issues one asynchronous read. On data, the echo reply is sent; on
     * end-of-stream or failure, the connection is closed.
     */
    private static void readNextMessage(final AsynchronousSocketChannel clientChannel) {
        ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
        clientChannel.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer bytesRead, ByteBuffer readBuffer) {
                if (bytesRead > 0) {
                    readBuffer.flip();
                    byte[] data = new byte[readBuffer.remaining()];
                    readBuffer.get(data);
                    String message = new String(data).trim();
                    System.out.println("收到消息: " + message);
                    sendReply(clientChannel, message);
                } else if (bytesRead == -1) {
                    // Client closed the connection.
                    closeClient(clientChannel, "客户端断开连接");
                }
            }

            @Override
            public void failed(Throwable exc, ByteBuffer readBuffer) {
                System.out.println("读取失败: " + exc.getMessage());
                closeClient(clientChannel, "客户端连接关闭");
            }
        });
    }

    /**
     * Writes the echo reply for {@code message}. Once it is fully sent, either
     * the next read is started or, for "bye", the connection is closed.
     *
     * <p>The follow-up read goes through {@link #readNextMessage} so that it is
     * completed by the read handler. Passing the write handler itself to
     * {@code read(...)} would route every later message into the write callback
     * and nothing after the first message would be echoed.
     */
    private static void sendReply(final AsynchronousSocketChannel clientChannel,
                                  final String message) {
        ByteBuffer responseBuffer = ByteBuffer.wrap(("回显: " + message).getBytes());
        clientChannel.write(responseBuffer, responseBuffer,
                new CompletionHandler<Integer, ByteBuffer>() {
                    @Override
                    public void completed(Integer bytesWritten, ByteBuffer pending) {
                        if (pending.hasRemaining()) {
                            // Partial write: send the rest with this same handler.
                            clientChannel.write(pending, pending, this);
                            return;
                        }
                        System.out.println("回复发送完成,字节数: " + pending.position());
                        if (!"bye".equalsIgnoreCase(message)) {
                            readNextMessage(clientChannel);
                        } else {
                            closeClient(clientChannel, "客户端连接关闭");
                        }
                    }

                    @Override
                    public void failed(Throwable exc, ByteBuffer pending) {
                        System.out.println("写入失败: " + exc.getMessage());
                        closeClient(clientChannel, "客户端连接关闭");
                    }
                });
    }

    /** Closes the client channel, printing {@code notice} on success. */
    private static void closeClient(AsynchronousSocketChannel clientChannel, String notice) {
        try {
            clientChannel.close();
            System.out.println(notice);
        } catch (IOException e) {
            System.out.println("关闭连接异常: " + e.getMessage());
        }
    }
}
4.2 AIO客户端
// AIO客户端演示
/**
 * Interactive client for the AIO echo server on localhost:8084. It uses the
 * {@link Future}-returning variants of the asynchronous channel API and waits
 * for each operation to finish.
 */
public class AIOClient {
    private static final String HOST = "localhost";
    private static final int PORT = 8084;

    public static void main(String[] args) {
        demonstrateAIOClient();
    }

    /**
     * Runs the prompt/send/receive loop until the user types "bye", standard
     * input ends, or the server closes the connection. The channel and the
     * scanner are closed on every path.
     */
    public static void demonstrateAIOClient() {
        System.out.println("=== AIO客户端演示 ===");
        try (AsynchronousSocketChannel clientChannel = AsynchronousSocketChannel.open();
             Scanner scanner = new Scanner(System.in)) {
            // Connect and wait for completion.
            Future<Void> connectFuture = clientChannel.connect(
                    new InetSocketAddress(HOST, PORT));
            connectFuture.get();
            System.out.println("连接到AIO服务器: " + HOST + ":" + PORT);
            System.out.println("输入消息 (输入 'bye' 退出):");
            while (true) {
                System.out.print("> ");
                // nextLine() throws at end of input instead of returning null.
                if (!scanner.hasNextLine()) {
                    break;
                }
                String input = scanner.nextLine();
                if (input.trim().isEmpty()) {
                    continue;
                }
                // A single write may send only part of the buffer.
                ByteBuffer sendBuffer = ByteBuffer.wrap(input.getBytes());
                while (sendBuffer.hasRemaining()) {
                    clientChannel.write(sendBuffer).get();
                }
                // Wait for the reply.
                ByteBuffer receiveBuffer = ByteBuffer.allocate(1024);
                Future<Integer> readFuture = clientChannel.read(receiveBuffer);
                int bytesRead = readFuture.get();
                if (bytesRead > 0) {
                    receiveBuffer.flip();
                    byte[] data = new byte[receiveBuffer.remaining()];
                    receiveBuffer.get(data);
                    System.out.println("服务器回复: " + new String(data));
                } else if (bytesRead == -1) {
                    // End of stream: further writes would only fail.
                    System.out.println("服务器关闭连接");
                    break;
                }
                if ("bye".equalsIgnoreCase(input)) {
                    break;
                }
            }
        } catch (InterruptedException e) {
            // Keep the interrupt visible to callers.
            Thread.currentThread().interrupt();
            System.out.println("AIO客户端异常: " + e.getMessage());
        } catch (Exception e) {
            System.out.println("AIO客户端异常: " + e.getMessage());
        }
    }
}
5. WebSocket编程
5.1 WebSocket服务器
// 简单的WebSocket服务器实现
/**
 * Accept loop of the demo WebSocket server.
 *
 * <p>Listens on {@link #PORT} and starts one new thread running a
 * {@code WebSocketHandler} for every accepted connection.
 */
public class SimpleWebSocketServer {
    private static final int PORT = 8085;

    public static void main(String[] args) {
        demonstrateWebSocketServer();
    }

    /**
     * Accepts connections in an endless loop; the loop only ends when an
     * {@link IOException} is raised, which is reported on standard output.
     */
    public static void demonstrateWebSocketServer() {
        System.out.println("=== WebSocket服务器演示 ===");
        try (ServerSocket listener = new ServerSocket(PORT)) {
            System.out.println("WebSocket服务器启动,监听端口: " + PORT);
            for (;;) {
                Socket connection = listener.accept();
                System.out.println("新客户端连接: " + connection.getRemoteSocketAddress());
                // One dedicated thread per client.
                new Thread(new WebSocketHandler(connection)).start();
            }
        } catch (IOException ex) {
            System.out.println("WebSocket服务器异常: " + ex.getMessage());
        }
    }
}
// WebSocket处理器
class WebSocketHandler implements Runnable {
private Socket clientSocket;
private boolean isWebSocketUpgraded = false;
public WebSocketHandler(Socket clientSocket) {
this.clientSocket = clientSocket;
}
@Override
public void run() {
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(clientSocket.getInputStream()));
OutputStream outputStream = clientSocket.getOutputStream()) {
// 读取HTTP请求头
String line;
StringBuilder requestHeaders = new StringBuilder();
String webSocketKey = null;
while ((line = reader.readLine()) != null && !line.isEmpty()) {
requestHeaders.append(line).append("\n");
if (line.startsWith("Sec-WebSocket-Key:")) {
webSocketKey = line.substring("Sec-WebSocket-Key:".length()).trim();
}
}
System.out.println("收到HTTP请求:");
System.out.println(requestHeaders.toString());
if (webSocketKey != null) {
// 执行WebSocket握手
performWebSocketHandshake(outputStream, webSocketKey);
isWebSocketUpgraded = true;
// 处理WebSocket消息
handleWebSocketMessages(clientSocket.getInputStream(), outputStream);
} else {
// 返回HTTP响应
sendHttpResponse(outputStream);
}
} catch (Exception e) {
System.out.println("WebSocket处理异常: " + e.getMessage());
} finally {
try {
clientSocket.close();
System.out.println("客户端连接关闭");
} catch (IOException e) {
System.out.println("关闭连接异常: " + e.getMessage());
}
}
}
private void performWebSocketHandshake(OutputStream outputStream, String webSocketKey)
throws Exception {
// WebSocket握手响应
String acceptKey = generateWebSocketAcceptKey(webSocketKey);
String response = "HTTP/1.1 101 Switching Protocols\r\n" +
"Upgrade: websocket\r\n" +
"Connection: Upgrade\r\n" +
"Sec-WebSocket-Accept: " + acceptKey + "\r\n" +
"\r\n";
outputStream.write(response.getBytes());
outputStream.flush();
System.out.println("WebSocket握手完成");
}
private String generateWebSocketAcceptKey(String webSocketKey) throws Exception {
String magic = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
String combined = webSocketKey + magic;
MessageDigest digest = MessageDigest.getInstance("SHA-1");
byte[] hash = digest.digest(combined.getBytes());
return Base64.getEncoder().encodeToString(hash);
}
private void handleWebSocketMessages(InputStream inputStream, OutputStream outputStream)
throws IOException {
byte[] buffer = new byte[1024];
while (true) {
int bytesRead = inputStream.read(buffer);
if (bytesRead == -1) {
break;
}
// 简化的WebSocket帧解析(仅处理文本消息)
if (bytesRead >= 2) {
boolean fin = (buffer[0] & 0x80) != 0;
int opcode = buffer[0] & 0x0F;
boolean masked = (buffer[1] & 0x80) != 0;
int payloadLength = buffer[1] & 0x7F;
if (opcode == 1 && masked) { // 文本帧且已掩码
int maskStart = 2;
if (payloadLength == 126) {
maskStart = 4;
payloadLength = ((buffer[2] & 0xFF) << 8) | (buffer[3] & 0xFF);
} else if (payloadLength == 127) {
maskStart = 10;
// 简化处理,不支持超长消息
}
byte[] mask = new byte[4];
System.arraycopy(buffer, maskStart, mask, 0, 4);
byte[] payload = new byte[payloadLength];
System.arraycopy(buffer, maskStart + 4, payload, 0, payloadLength);
// 解除掩码
for (int i = 0; i < payloadLength; i++) {
payload[i] ^= mask[i % 4];
}
String message = new String(payload);
System.out.println("收到WebSocket消息: " + message);
// 发送回复
String response = "回显: " + message;
sendWebSocketMessage(outputStream, response);
if ("bye".equalsIgnoreCase(message)) {
break;
}
}
}
}
}
private void sendWebSocketMessage(OutputStream outputStream, String message)
throws IOException {
byte[] messageBytes = message.getBytes();
int length = messageBytes.length;
// 构建WebSocket帧
ByteArrayOutputStream frame = new ByteArrayOutputStream();
// 第一个字节:FIN=1, RSV=000, Opcode=0001 (文本)
frame.write(0x81);
// 第二个字节:MASK=0, Payload length
if (length < 126) {
frame.write(length);
} else if (length < 65536) {
frame.write(126);
frame.write((length >> 8) & 0xFF);
frame.write(length & 0xFF);
} else {
frame.write(127);
// 简化处理,不支持超长消息
for (int i = 0; i < 8; i++) {
frame.write(0);
}
}
// 载荷数据
frame.write(messageBytes);
outputStream.write(frame.toByteArray());
outputStream.flush();
}
/**
 * Writes a small static HTML page as a complete HTTP/1.1 200 response.
 *
 * <p>The body is encoded as UTF-8 and {@code Content-Length} is the encoded byte count;
 * the page contains non-ASCII text, so the character count would be too small.
 *
 * @param outputStream stream the response is written to and flushed on
 * @throws IOException if writing to {@code outputStream} fails
 */
private void sendHttpResponse(OutputStream outputStream) throws IOException {
    String htmlResponse = "<!DOCTYPE html>\n" +
            "<html>\n" +
            "<head><title>WebSocket测试</title></head>\n" +
            "<body>\n" +
            "<h1>WebSocket服务器</h1>\n" +
            "<p>请使用WebSocket客户端连接到此服务器。</p>\n" +
            "</body>\n" +
            "</html>";
    byte[] body = htmlResponse.getBytes(java.nio.charset.StandardCharsets.UTF_8);

    // Declare the charset so the browser decodes the body the same way it was encoded.
    String headers = "HTTP/1.1 200 OK\r\n" +
            "Content-Type: text/html; charset=UTF-8\r\n" +
            "Content-Length: " + body.length + "\r\n" +
            "\r\n";

    outputStream.write(headers.getBytes(java.nio.charset.StandardCharsets.US_ASCII));
    outputStream.write(body);
    outputStream.flush();
}
}
6. 网络协议处理
6.1 自定义协议实现
// 自定义协议消息
/**
 * A message of the demo binary protocol.
 *
 * <p>Wire layout (big-endian, as written by {@link DataOutputStream}):
 * 4-byte message type, 4-byte message id, 4-byte payload length, then the payload bytes.
 */
class ProtocolMessage {
    /** Size in bytes of the fixed header: type, id and payload length. */
    private static final int HEADER_LENGTH = 12;

    private final int messageType;
    private final int messageId;
    private final byte[] data;

    /**
     * Creates a message.
     *
     * @param messageType application-defined type code
     * @param messageId   identifier used to correlate requests and responses
     * @param data        payload bytes; {@code null} is treated as an empty payload
     */
    public ProtocolMessage(int messageType, int messageId, byte[] data) {
        this.messageType = messageType;
        this.messageId = messageId;
        // An absent payload is stored as an empty array so serialize()/toString() never hit null.
        this.data = (data != null) ? data : new byte[0];
    }

    /**
     * Serializes this message to its wire form.
     *
     * @return header followed by the payload bytes
     * @throws RuntimeException if writing to the in-memory buffer fails
     */
    public byte[] serialize() {
        ByteArrayOutputStream baos = new ByteArrayOutputStream(HEADER_LENGTH + data.length);
        try (DataOutputStream dos = new DataOutputStream(baos)) {
            dos.writeInt(messageType);
            dos.writeInt(messageId);
            dos.writeInt(data.length);
            dos.write(data);
            dos.flush();
        } catch (IOException e) {
            throw new RuntimeException("序列化失败", e);
        }
        return baos.toByteArray();
    }

    /**
     * Rebuilds a message from its wire form.
     *
     * @param bytes header followed by the payload, as produced by {@link #serialize()}
     * @return the decoded message
     * @throws RuntimeException if the input is truncated or its length field is negative
     *                          or larger than the bytes actually present; the cause is the
     *                          underlying {@link IOException}
     */
    public static ProtocolMessage deserialize(byte[] bytes) {
        try (DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes))) {
            int messageType = dis.readInt();
            int messageId = dis.readInt();
            int dataLength = dis.readInt();
            // Validate before allocating: the length field comes from the wire and
            // must not be trusted to size an array.
            if (dataLength < 0 || dataLength > bytes.length - HEADER_LENGTH) {
                throw new IOException("非法的数据长度: " + dataLength);
            }
            byte[] data = new byte[dataLength];
            dis.readFully(data);
            return new ProtocolMessage(messageType, messageId, data);
        } catch (IOException e) {
            throw new RuntimeException("反序列化失败", e);
        }
    }

    // Getters
    public int getMessageType() { return messageType; }

    public int getMessageId() { return messageId; }

    /** Returns the payload array held by this message (not a copy). */
    public byte[] getData() { return data; }

    /** Decodes the payload with the platform default charset, matching how callers in this file encode it. */
    public String getDataAsString() { return new String(data); }

    @Override
    public String toString() {
        return String.format("ProtocolMessage{type=%d, id=%d, data='%s'}",
                messageType, messageId, getDataAsString());
    }
}
// 协议处理演示
/**
 * Demonstrates the custom protocol: a serialization round trip for several messages,
 * then a single-connection server and client exchanging length-prefixed frames on
 * localhost.
 *
 * <p>Each frame on the socket is a 4-byte big-endian length followed by that many bytes
 * of a serialized {@link ProtocolMessage}.
 */
public class ProtocolDemo {
    // Message type constants
    public static final int MSG_TYPE_HEARTBEAT = 1;
    public static final int MSG_TYPE_REQUEST = 2;
    public static final int MSG_TYPE_RESPONSE = 3;
    public static final int MSG_TYPE_NOTIFICATION = 4;

    /** Port the demo server listens on and the demo client connects to. */
    private static final int SERVER_PORT = 8086;

    /** Largest frame body either side accepts, so a bad length prefix cannot force a huge allocation. */
    private static final int MAX_FRAME_LENGTH = 1024 * 1024;

    public static void main(String[] args) {
        demonstrateProtocol();
    }

    /** Runs the serialization tests and then the server/client exchange. */
    public static void demonstrateProtocol() {
        System.out.println("=== 自定义协议演示 ===");
        // Create one message of each type
        ProtocolMessage heartbeat = new ProtocolMessage(
                MSG_TYPE_HEARTBEAT, 1, "ping".getBytes());
        ProtocolMessage request = new ProtocolMessage(
                MSG_TYPE_REQUEST, 2, "获取用户信息".getBytes());
        ProtocolMessage response = new ProtocolMessage(
                MSG_TYPE_RESPONSE, 2, "{\"name\":\"张三\",\"age\":25}".getBytes());
        ProtocolMessage notification = new ProtocolMessage(
                MSG_TYPE_NOTIFICATION, 3, "系统维护通知".getBytes());
        // Serialization round-trip tests
        testSerialization(heartbeat);
        testSerialization(request);
        testSerialization(response);
        testSerialization(notification);
        // Protocol server demo
        startProtocolServer();
    }

    /** Serializes and deserializes {@code message} and prints whether all fields survived. */
    private static void testSerialization(ProtocolMessage message) {
        System.out.println("\n--- 序列化测试 ---");
        System.out.println("原始消息: " + message);
        // Serialize
        byte[] serialized = message.serialize();
        System.out.println("序列化后字节数: " + serialized.length);
        // Deserialize
        ProtocolMessage deserialized = ProtocolMessage.deserialize(serialized);
        System.out.println("反序列化消息: " + deserialized);
        // Verify consistency
        boolean isEqual = message.getMessageType() == deserialized.getMessageType() &&
                message.getMessageId() == deserialized.getMessageId() &&
                Arrays.equals(message.getData(), deserialized.getData());
        System.out.println("序列化一致性: " + (isEqual ? "通过" : "失败"));
    }

    /**
     * Starts a server that serves exactly one client on a background thread, then runs
     * the client test against it.
     */
    private static void startProtocolServer() {
        System.out.println("\n=== 协议服务器启动 ===");
        // Bind on the calling thread: once the constructor returns the port is listening,
        // so the client can connect immediately without sleeping and hoping.
        final ServerSocket serverSocket;
        try {
            serverSocket = new ServerSocket(SERVER_PORT);
        } catch (IOException e) {
            System.out.println("协议服务器异常: " + e.getMessage());
            return;
        }
        System.out.println("协议服务器监听端口: " + SERVER_PORT);

        Thread serverThread = new Thread(() -> {
            // The server thread owns the listening socket and closes it when done.
            try (ServerSocket listener = serverSocket) {
                Socket clientSocket = listener.accept();
                System.out.println("客户端连接: " + clientSocket.getRemoteSocketAddress());
                handleProtocolClient(clientSocket);
            } catch (IOException e) {
                System.out.println("协议服务器异常: " + e.getMessage());
            }
        });
        serverThread.start();

        // Run the client test
        testProtocolClient();
    }

    /**
     * Serves one client: reads frames, answers each via {@link #processMessage}, and stops
     * on a non-positive length prefix, on a "bye" heartbeat, or when the client disconnects.
     * The socket is always closed before returning.
     */
    private static void handleProtocolClient(Socket clientSocket) {
        try (DataInputStream dis = new DataInputStream(clientSocket.getInputStream());
             DataOutputStream dos = new DataOutputStream(clientSocket.getOutputStream())) {
            while (true) {
                // Read the length prefix; a non-positive value ends the conversation
                int messageLength = dis.readInt();
                if (messageLength <= 0) {
                    break;
                }
                ProtocolMessage message = readFrameBody(dis, messageLength);
                System.out.println("服务器收到: " + message);
                // Process the message and send the reply
                ProtocolMessage response = processMessage(message);
                writeFrame(dos, response);
                System.out.println("服务器发送: " + response);
                if (message.getMessageType() == MSG_TYPE_HEARTBEAT &&
                        "bye".equals(message.getDataAsString())) {
                    break;
                }
            }
        } catch (java.io.EOFException e) {
            // The client closed the connection between frames: a normal way to end, not an error.
            System.out.println("客户端已断开连接");
        } catch (IOException e) {
            System.out.println("处理协议客户端异常: " + e.getMessage());
        } finally {
            try {
                clientSocket.close();
            } catch (IOException e) {
                System.out.println("关闭客户端连接异常: " + e.getMessage());
            }
        }
    }

    /** Builds the reply for one incoming message based on its type. */
    private static ProtocolMessage processMessage(ProtocolMessage message) {
        switch (message.getMessageType()) {
            case MSG_TYPE_HEARTBEAT:
                return new ProtocolMessage(MSG_TYPE_HEARTBEAT,
                        message.getMessageId(), "pong".getBytes());
            case MSG_TYPE_REQUEST:
                String requestData = message.getDataAsString();
                String responseData = "处理请求: " + requestData + " -> 成功";
                return new ProtocolMessage(MSG_TYPE_RESPONSE,
                        message.getMessageId(), responseData.getBytes());
            default:
                return new ProtocolMessage(MSG_TYPE_RESPONSE,
                        message.getMessageId(), "未知消息类型".getBytes());
        }
    }

    /** Connects to the demo server and exchanges a heartbeat, a request and a closing "bye". */
    private static void testProtocolClient() {
        System.out.println("\n=== 协议客户端测试 ===");
        try (Socket socket = new Socket("localhost", SERVER_PORT);
             DataInputStream dis = new DataInputStream(socket.getInputStream());
             DataOutputStream dos = new DataOutputStream(socket.getOutputStream())) {
            // Send a heartbeat
            sendMessage(dos, new ProtocolMessage(MSG_TYPE_HEARTBEAT, 1, "ping".getBytes()));
            receiveMessage(dis);
            // Send a request
            sendMessage(dos, new ProtocolMessage(MSG_TYPE_REQUEST, 2, "查询订单".getBytes()));
            receiveMessage(dis);
            // Send the closing message
            sendMessage(dos, new ProtocolMessage(MSG_TYPE_HEARTBEAT, 3, "bye".getBytes()));
            receiveMessage(dis);
        } catch (IOException e) {
            System.out.println("协议客户端异常: " + e.getMessage());
        }
    }

    /** Client side: writes one frame and logs it. */
    private static void sendMessage(DataOutputStream dos, ProtocolMessage message)
            throws IOException {
        writeFrame(dos, message);
        System.out.println("客户端发送: " + message);
    }

    /**
     * Client side: reads one frame and logs it.
     *
     * @throws IOException if the stream fails or the length prefix is not a usable frame size
     */
    private static void receiveMessage(DataInputStream dis) throws IOException {
        int messageLength = dis.readInt();
        if (messageLength <= 0) {
            throw new IOException("非法的消息长度: " + messageLength);
        }
        ProtocolMessage message = readFrameBody(dis, messageLength);
        System.out.println("客户端收到: " + message);
    }

    /** Writes the length prefix and the serialized message, then flushes. */
    private static void writeFrame(DataOutputStream dos, ProtocolMessage message)
            throws IOException {
        byte[] body = message.serialize();
        dos.writeInt(body.length);
        dos.write(body);
        dos.flush();
    }

    /**
     * Reads {@code length} bytes and decodes them as a message.
     *
     * @throws IOException if {@code length} exceeds {@link #MAX_FRAME_LENGTH} or the stream
     *                     ends before the whole body arrives
     */
    private static ProtocolMessage readFrameBody(DataInputStream dis, int length)
            throws IOException {
        if (length > MAX_FRAME_LENGTH) {
            throw new IOException("消息长度超出限制: " + length);
        }
        byte[] body = new byte[length];
        dis.readFully(body);
        return ProtocolMessage.deserialize(body);
    }
}
7. 网络编程最佳实践
7.1 性能优化
// 网络性能优化演示
/**
 * Demonstrates three performance techniques: choosing a buffer size, reusing
 * connections through a pool, and running independent tasks asynchronously.
 */
public class NetworkPerformanceDemo {
    public static void main(String[] args) {
        demonstrateBufferOptimization();
        demonstrateConnectionPooling();
        demonstrateAsyncProcessing();
    }

    /** Copies the same 1 MB of data with several buffer sizes and prints the elapsed time for each. */
    public static void demonstrateBufferOptimization() {
        System.out.println("=== 缓冲区优化演示 ===");
        // Compare different buffer sizes
        int[] bufferSizes = {1024, 4096, 8192, 16384};
        for (int bufferSize : bufferSizes) {
            // nanoTime is monotonic; wall-clock time can jump and distort short measurements.
            long startNanos = System.nanoTime();
            try {
                // Simulate a network transfer
                simulateDataTransfer(bufferSize);
                long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
                System.out.println("缓冲区大小: " + bufferSize +
                        ", 传输时间: " + elapsedMs + "ms");
            } catch (IOException e) {
                System.out.println("传输异常: " + e.getMessage());
            }
        }
        System.out.println();
    }

    /** Copies 1 MB from an in-memory source to an in-memory sink using a buffer of the given size. */
    private static void simulateDataTransfer(int bufferSize) throws IOException {
        // Create the test data
        byte[] testData = new byte[1024 * 1024]; // 1 MB
        Arrays.fill(testData, (byte) 'A');
        // In-memory streams stand in for the network endpoints
        try (ByteArrayInputStream input = new ByteArrayInputStream(testData);
             ByteArrayOutputStream output = new ByteArrayOutputStream()) {
            byte[] buffer = new byte[bufferSize];
            int bytesRead;
            while ((bytesRead = input.read(buffer)) != -1) {
                output.write(buffer, 0, bytesRead);
            }
        }
    }

    /**
     * Submits 20 tasks on 10 threads that each borrow a connection from a 5-connection
     * pool, hold it briefly and return it.
     */
    public static void demonstrateConnectionPooling() {
        System.out.println("=== 连接池优化演示 ===");
        SimpleConnectionPool pool = new SimpleConnectionPool("localhost", 8080, 5);
        // Simulate concurrent requests
        ExecutorService executor = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 20; i++) {
            final int requestId = i;
            executor.submit(() -> {
                Socket connection = null;
                try {
                    connection = pool.getConnection();
                    System.out.println("请求 " + requestId + " 获得连接: " + connection);
                    // Simulate using the connection
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    // Restore the flag so the executor can see the interruption.
                    Thread.currentThread().interrupt();
                    System.out.println("请求 " + requestId + " 异常: " + e.getMessage());
                } catch (Exception e) {
                    System.out.println("请求 " + requestId + " 异常: " + e.getMessage());
                } finally {
                    // Always hand the connection back, otherwise a failed task
                    // permanently shrinks the pool.
                    if (connection != null) {
                        pool.releaseConnection(connection);
                        System.out.println("请求 " + requestId + " 释放连接");
                    }
                }
            });
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                // Tasks still running after the grace period: cancel them before closing the pool.
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        pool.close();
        System.out.println();
    }

    /** Runs three delayed tasks concurrently and prints their results once all have finished. */
    public static void demonstrateAsyncProcessing() {
        System.out.println("=== 异步处理优化演示 ===");
        CompletableFuture<String> future1 = delayedTask(1, 1000);
        CompletableFuture<String> future2 = delayedTask(2, 800);
        CompletableFuture<String> future3 = delayedTask(3, 1200);
        // Wait for all tasks; by the time thenRun fires every future has completed normally,
        // so join() returns immediately.
        CompletableFuture.allOf(future1, future2, future3).thenRun(() -> {
            System.out.println("结果1: " + future1.join());
            System.out.println("结果2: " + future2.join());
            System.out.println("结果3: " + future3.join());
        }).join();
        System.out.println();
    }

    /**
     * Starts an asynchronous task that sleeps for {@code delayMs} and then reports completion.
     *
     * @param taskNumber number used in the result text
     * @param delayMs    how long the task sleeps, in milliseconds
     * @return a future yielding the completion text, or the "interrupted" text if the
     *         sleep was interrupted
     */
    private static CompletableFuture<String> delayedTask(int taskNumber, long delayMs) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(delayMs);
                return "异步任务" + taskNumber + "完成";
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "任务" + taskNumber + "被中断";
            }
        });
    }
}
// 简单连接池实现
/**
 * A minimal socket pool for one host/port pair.
 *
 * <p>Idle sockets are kept in a queue and borrowed sockets in a set; both collections are
 * only touched while holding {@code lock}. At most {@code maxConnections} sockets may be
 * borrowed at the same time; a further request fails instead of waiting.
 */
class SimpleConnectionPool {
    private final String host;
    private final int port;
    private final int maxConnections;
    private final Queue<Socket> availableConnections;
    private final Set<Socket> usedConnections;
    private final Object lock = new Object();

    /**
     * @param host           host new sockets connect to
     * @param port           port new sockets connect to
     * @param maxConnections maximum number of sockets that may be borrowed at once
     */
    public SimpleConnectionPool(String host, int port, int maxConnections) {
        this.host = host;
        this.port = port;
        this.maxConnections = maxConnections;
        this.availableConnections = new LinkedList<>();
        this.usedConnections = new HashSet<>();
    }

    /**
     * Borrows a socket, reusing an idle one when there is one and opening a new one otherwise.
     *
     * @return a socket now tracked as in use
     * @throws IOException if the borrow limit is reached or a new socket cannot be opened
     */
    public Socket getConnection() throws IOException {
        synchronized (lock) {
            // poll() yields null exactly when no idle socket is queued.
            Socket borrowed = availableConnections.poll();
            if (borrowed == null) {
                if (usedConnections.size() >= maxConnections) {
                    throw new IOException("连接池已满,无法获取新连接");
                }
                borrowed = new Socket(host, port);
            }
            usedConnections.add(borrowed);
            return borrowed;
        }
    }

    /**
     * Returns a borrowed socket. Sockets this pool did not lend out are ignored, and a
     * socket that is already closed is dropped rather than queued for reuse.
     *
     * @param connection the socket being handed back
     */
    public void releaseConnection(Socket connection) {
        synchronized (lock) {
            boolean wasBorrowed = usedConnections.remove(connection);
            if (wasBorrowed && !connection.isClosed()) {
                availableConnections.offer(connection);
            }
        }
    }

    /** Closes every idle and every borrowed socket, then forgets them all. */
    public void close() {
        synchronized (lock) {
            closeEach(availableConnections);
            closeEach(usedConnections);
            availableConnections.clear();
            usedConnections.clear();
        }
    }

    /** Closes each socket in turn, reporting a failure and continuing with the next one. */
    private void closeEach(Iterable<Socket> sockets) {
        for (Socket socket : sockets) {
            try {
                socket.close();
            } catch (IOException e) {
                System.out.println("关闭连接异常: " + e.getMessage());
            }
        }
    }
}
7.2 安全最佳实践
// 网络安全演示
/**
 * Demonstrates three defensive techniques: a TLS client connection, blacklist-based
 * input screening, and per-client rate limiting.
 */
public class NetworkSecurityDemo {
    public static void main(String[] args) {
        demonstrateSSLSocket();
        demonstrateInputValidation();
        demonstrateRateLimiting();
    }

    /**
     * Opens a TLS connection to httpbin.org, sends a GET request and prints the first
     * ten lines of the response followed by the negotiated protocol.
     */
    public static void demonstrateSSLSocket() {
        System.out.println("=== SSL安全连接演示 ===");
        try {
            // Create the SSL context with the default key and trust managers
            SSLContext sslContext = SSLContext.getInstance("TLS");
            sslContext.init(null, null, null);
            // Create the SSL socket factory
            SSLSocketFactory factory = sslContext.getSocketFactory();
            // Connect to the HTTPS server
            try (SSLSocket sslSocket = (SSLSocket) factory.createSocket("httpbin.org", 443)) {
                // Offer only modern protocol versions; enabling everything the provider
                // supports would also offer legacy SSL/TLS versions.
                String[] modernProtocols = java.util.Arrays.stream(sslSocket.getSupportedProtocols())
                        .filter(p -> "TLSv1.2".equals(p) || "TLSv1.3".equals(p))
                        .toArray(String[]::new);
                if (modernProtocols.length > 0) {
                    sslSocket.setEnabledProtocols(modernProtocols);
                }
                // A raw SSLSocket validates the certificate chain but does not check that the
                // certificate matches the host name unless endpoint identification is enabled.
                javax.net.ssl.SSLParameters sslParameters = sslSocket.getSSLParameters();
                sslParameters.setEndpointIdentificationAlgorithm("HTTPS");
                sslSocket.setSSLParameters(sslParameters);

                // Send the HTTP request; HTTP requires CRLF line endings, which println()
                // only produces on platforms whose line separator happens to be CRLF.
                PrintWriter out = new PrintWriter(sslSocket.getOutputStream(), false);
                BufferedReader in = new BufferedReader(new InputStreamReader(
                        sslSocket.getInputStream(), java.nio.charset.StandardCharsets.UTF_8));
                out.print("GET /get HTTP/1.1\r\n"
                        + "Host: httpbin.org\r\n"
                        + "Connection: close\r\n"
                        + "\r\n");
                out.flush();

                // Read the first lines of the response
                String line;
                int lineCount = 0;
                while ((line = in.readLine()) != null && lineCount < 10) {
                    System.out.println(line);
                    lineCount++;
                }
                System.out.println("SSL连接成功,协议: " + sslSocket.getSession().getProtocol());
            }
        } catch (IOException | java.security.GeneralSecurityException e) {
            System.out.println("SSL连接异常: " + e.getMessage());
        }
        System.out.println();
    }

    /** Runs {@link #validateInput} over a set of sample inputs and prints each verdict. */
    public static void demonstrateInputValidation() {
        System.out.println("=== 输入验证演示 ===");
        String[] testInputs = {
            "正常输入",
            "<script>alert('XSS')</script>",
            "'; DROP TABLE users; --",
            "../../../etc/passwd",
            "正常的长文本输入",
            "包含特殊字符的输入: @#$%^&*()"
        };
        for (String input : testInputs) {
            boolean isValid = validateInput(input);
            System.out.println("输入: \"" + input + "\" -> " + (isValid ? "有效" : "无效"));
        }
        System.out.println();
    }

    /**
     * Screens input against a small blacklist of suspicious fragments.
     *
     * <p>This is a demonstration only: substring blacklists are easy to bypass and are no
     * substitute for parameterized queries, output encoding and canonical path checks.
     *
     * @param input text to check
     * @return {@code false} if the input is null, blank, longer than 1000 characters or
     *         contains a blacklisted fragment (case-insensitive); {@code true} otherwise
     */
    private static boolean validateInput(String input) {
        if (input == null || input.trim().isEmpty()) {
            return false;
        }
        // Length check
        if (input.length() > 1000) {
            return false;
        }
        // Dangerous fragment check
        String[] dangerousPatterns = {
            "<script", "</script>", "javascript:", "vbscript:",
            "DROP TABLE", "DELETE FROM", "INSERT INTO", "UPDATE SET",
            "../", "..\\" // path traversal, Unix and Windows separators
        };
        // Locale.ROOT keeps case folding stable; under a Turkish default locale "I" would
        // fold to a dotless i and "<SCRIPT" would slip past "<script".
        String lowerInput = input.toLowerCase(java.util.Locale.ROOT);
        for (String pattern : dangerousPatterns) {
            if (lowerInput.contains(pattern.toLowerCase(java.util.Locale.ROOT))) {
                return false;
            }
        }
        return true;
    }

    /** Sends ten requests 150 ms apart through a 5-per-second limiter and prints each decision. */
    public static void demonstrateRateLimiting() {
        System.out.println("=== 速率限制演示 ===");
        RateLimiter rateLimiter = new RateLimiter(5, 1000); // at most 5 requests per second
        // Simulate client requests
        for (int i = 1; i <= 10; i++) {
            boolean allowed = rateLimiter.isAllowed("client1");
            System.out.println("请求 " + i + ": " + (allowed ? "允许" : "被限制"));
            try {
                Thread.sleep(150); // simulated gap between requests
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        System.out.println();
    }
}
// 简单的速率限制器
/**
 * Sliding-window rate limiter keyed by client id.
 *
 * <p>For every client it keeps the timestamps of the accepted requests; a new request is
 * accepted while fewer than {@code maxRequests} timestamps lie inside the last
 * {@code timeWindowMs} milliseconds.
 */
class RateLimiter {
    private final int maxRequests;
    private final long timeWindowMs;
    private final Map<String, Queue<Long>> clientRequests;

    /**
     * @param maxRequests  number of requests accepted per window
     * @param timeWindowMs window length in milliseconds
     */
    public RateLimiter(int maxRequests, long timeWindowMs) {
        this.maxRequests = maxRequests;
        this.timeWindowMs = timeWindowMs;
        this.clientRequests = new ConcurrentHashMap<>();
    }

    /**
     * Decides whether the client may issue a request now and, if so, records it.
     *
     * @param clientId identifier of the requesting client
     * @return {@code true} if the request is accepted, {@code false} if the client has
     *         used up its quota for the current window
     */
    public boolean isAllowed(String clientId) {
        final long now = System.currentTimeMillis();
        final Queue<Long> history =
                clientRequests.computeIfAbsent(clientId, id -> new LinkedList<>());
        // The per-client queue is not thread-safe, so it doubles as the lock for that client.
        synchronized (history) {
            discardExpired(history, now);
            final boolean underLimit = history.size() < maxRequests;
            if (underLimit) {
                // Only accepted requests count against the quota.
                history.offer(now);
            }
            return underLimit;
        }
    }

    /** Removes timestamps from the head of the queue that are older than the window. */
    private void discardExpired(Queue<Long> history, long now) {
        Long oldest = history.peek();
        while (oldest != null && now - oldest > timeWindowMs) {
            history.poll();
            oldest = history.peek();
        }
    }
}
7.3 错误处理和监控
// 网络错误处理和监控
/**
 * Entry point for the error-handling and monitoring demonstrations.
 */
public class NetworkMonitoringDemo {
    public static void main(String[] args) {
        demonstrateErrorHandling();
        demonstrateNetworkMonitoring();
    }

    /**
     * Probes three endpoints chosen to show different outcomes: a local port that may
     * refuse the connection, a host name that does not exist, and a reachable host.
     */
    public static void demonstrateErrorHandling() {
        System.out.println("=== 网络错误处理演示 ===");
        final NetworkClient prober = new NetworkClient();
        // Parallel arrays: hosts[i] is probed on ports[i], in this order.
        final String[] hosts = {"localhost", "nonexistent.host", "httpbin.org"};
        final int[] ports = {8080, 80, 80};
        for (int i = 0; i < hosts.length; i++) {
            prober.testConnection(hosts[i], ports[i]);
        }
        System.out.println();
    }

    /**
     * Feeds five rounds of sample requests, one second apart, into a monitor and then
     * prints the collected statistics. An interrupt ends the rounds early.
     */
    public static void demonstrateNetworkMonitoring() {
        System.out.println("=== 网络监控演示 ===");
        final NetworkMonitor tracker = new NetworkMonitor();
        tracker.startMonitoring();

        int round = 0;
        boolean interrupted = false;
        while (round < 5 && !interrupted) {
            // One successful GET, one successful POST and one 404 per round.
            tracker.recordRequest("GET", "/api/users", 200, 150);
            tracker.recordRequest("POST", "/api/orders", 201, 300);
            tracker.recordRequest("GET", "/api/products", 404, 50);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                interrupted = true;
            }
            round++;
        }

        // Report what was collected, then switch the monitor off.
        tracker.printStatistics();
        tracker.stopMonitoring();
        System.out.println();
    }
}
// 网络客户端错误处理
/**
 * Connection tester that classifies failures and retries transient ones with
 * exponential back-off.
 */
class NetworkClient {
    private static final int CONNECT_TIMEOUT = 5000;
    private static final int READ_TIMEOUT = 10000;
    private static final int MAX_RETRIES = 3;

    /**
     * Tries to open a TCP connection and prints the outcome of every attempt.
     *
     * <p>An unresolvable host name ends the test immediately; refused connections,
     * timeouts and other I/O errors are retried up to {@code MAX_RETRIES} attempts, waiting
     * 1 s, 2 s, ... (doubling) between attempts.
     *
     * @param host host name or address to connect to
     * @param port TCP port to connect to
     */
    public void testConnection(String host, int port) {
        System.out.println("\n--- 测试连接: " + host + ":" + port + " ---");
        for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
            // try-with-resources closes the socket on every path, including a failed connect.
            try (Socket socket = new Socket()) {
                socket.connect(new InetSocketAddress(host, port), CONNECT_TIMEOUT);
                socket.setSoTimeout(READ_TIMEOUT);
                System.out.println("连接成功 (尝试 " + attempt + "/" + MAX_RETRIES + ")");
                return;
            } catch (UnknownHostException e) {
                // Retrying cannot fix a name that does not resolve, and this is not a
                // "retries exhausted" outcome, so report it separately.
                System.out.println("主机不存在: " + e.getMessage());
                System.out.println("连接失败,主机名无法解析,不再重试");
                return;
            } catch (ConnectException e) {
                System.out.println("连接被拒绝 (尝试 " + attempt + "/" + MAX_RETRIES + "): " + e.getMessage());
            } catch (SocketTimeoutException e) {
                System.out.println("连接超时 (尝试 " + attempt + "/" + MAX_RETRIES + "): " + e.getMessage());
            } catch (IOException e) {
                System.out.println("I/O异常 (尝试 " + attempt + "/" + MAX_RETRIES + "): " + e.getMessage());
            }
            if (attempt < MAX_RETRIES) {
                try {
                    // Exponential back-off: 1 s after the first failure, 2 s after the second, ...
                    Thread.sleep(1000L << (attempt - 1));
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    System.out.println("连接失败,重试等待被中断");
                    return;
                }
            }
        }
        System.out.println("连接失败,已达到最大重试次数");
    }
}
// 网络监控器
/**
 * Collects request counts, error counts and per-endpoint statistics while monitoring
 * is switched on.
 */
class NetworkMonitor {
    // Keyed by "<method> <endpoint>".
    private final Map<String, RequestStats> endpointStats = new ConcurrentHashMap<>();
    private final AtomicLong totalRequests = new AtomicLong(0);
    private final AtomicLong totalErrors = new AtomicLong(0);
    // Requests are only recorded while this flag is true.
    private volatile boolean monitoring = false;

    /** Creates a monitor that is initially switched off and has no recorded data. */
    public NetworkMonitor() {
    }

    /** Switches recording on. */
    public void startMonitoring() {
        monitoring = true;
        System.out.println("网络监控已启动");
    }

    /** Switches recording off; data already collected is kept. */
    public void stopMonitoring() {
        monitoring = false;
        System.out.println("网络监控已停止");
    }

    /**
     * Records one finished request. Does nothing while monitoring is switched off.
     *
     * @param method         HTTP method, used as part of the statistics key
     * @param endpoint       request path, used as part of the statistics key
     * @param statusCode     response status; 400 and above counts as an error
     * @param responseTimeMs response time in milliseconds
     */
    public void recordRequest(String method, String endpoint, int statusCode, long responseTimeMs) {
        if (monitoring) {
            totalRequests.incrementAndGet();
            boolean failed = statusCode >= 400;
            if (failed) {
                totalErrors.incrementAndGet();
            }
            String key = method + " " + endpoint;
            endpointStats
                    .computeIfAbsent(key, unused -> new RequestStats())
                    .addRequest(statusCode, responseTimeMs);
            System.out.println("记录请求: " + key + " -> " + statusCode + " (" + responseTimeMs + "ms)");
        }
    }

    /** Prints the overall totals, the error rate and one line per endpoint. */
    public void printStatistics() {
        System.out.println("\n=== 网络监控统计 ===");
        System.out.println("总请求数: " + totalRequests.get());
        System.out.println("总错误数: " + totalErrors.get());
        // Divide by at least 1 so an empty monitor reports 0.00% instead of NaN.
        double errorPercent = (totalErrors.get() * 100.0) / Math.max(1, totalRequests.get());
        System.out.println(String.format("错误率: %.2f%%", errorPercent));
        System.out.println("\n端点统计:");
        endpointStats.forEach((endpointKey, stats) ->
                System.out.println(" " + endpointKey + ": " + stats));
    }
}
// 请求统计
/**
 * Running statistics for one endpoint: request count, error count, and total, minimum
 * and maximum response time. Every counter is an {@link AtomicLong}.
 */
class RequestStats {
    private final AtomicLong requestCount = new AtomicLong(0);
    private final AtomicLong errorCount = new AtomicLong(0);
    private final AtomicLong totalResponseTime = new AtomicLong(0);
    private final AtomicLong maxResponseTime = new AtomicLong(0);
    // Long.MAX_VALUE marks "no sample yet" so the first sample always becomes the minimum.
    private final AtomicLong minResponseTime = new AtomicLong(Long.MAX_VALUE);

    /** Creates empty statistics. */
    public RequestStats() {
    }

    /**
     * Adds one request to the statistics.
     *
     * @param statusCode     response status; 400 and above counts as an error
     * @param responseTimeMs response time in milliseconds
     */
    public void addRequest(int statusCode, long responseTimeMs) {
        requestCount.incrementAndGet();
        boolean failed = statusCode >= 400;
        if (failed) {
            errorCount.incrementAndGet();
        }
        totalResponseTime.addAndGet(responseTimeMs);
        // Atomically raise the maximum / lower the minimum to include this sample.
        maxResponseTime.accumulateAndGet(responseTimeMs, Math::max);
        minResponseTime.accumulateAndGet(responseTimeMs, Math::min);
    }

    /** Returns a one-line summary, or a placeholder text when nothing has been recorded. */
    @Override
    public String toString() {
        final long samples = requestCount.get();
        if (samples == 0) {
            return "无请求";
        }
        final double meanMs = (double) totalResponseTime.get() / samples;
        final double errorPercent = (errorCount.get() * 100.0) / samples;
        final long rawMin = minResponseTime.get();
        // Show 0 rather than the "no sample yet" sentinel.
        final long shownMin = (rawMin == Long.MAX_VALUE) ? 0 : rawMin;
        return String.format("请求数=%d, 错误率=%.1f%%, 平均响应时间=%.1fms, 最小=%dms, 最大=%dms",
                samples, errorPercent, meanMs, shownMin, maxResponseTime.get());
    }
}
本章小结
网络编程技术对比
技术 | 适用场景 | 优点 | 缺点 |
---|---|---|---|
传统Socket | 简单的客户端-服务器通信 | 简单易懂,直接控制 | 阻塞I/O,性能有限 |
NIO | 高并发服务器 | 非阻塞,高性能 | 编程复杂,学习成本高 |
AIO | 异步I/O密集型应用 | 真正异步,回调机制 | 复杂度高,调试困难 |
HTTP客户端 | Web服务调用 | 标准协议,易于集成 | 协议开销,不适合实时通信 |
WebSocket | 实时双向通信 | 低延迟,全双工 | 协议复杂,需要特殊处理 |
性能优化要点
缓冲区优化
- 选择合适的缓冲区大小
- 使用直接内存缓冲区
- 避免频繁的小数据传输
连接管理
- 使用连接池复用连接
- 设置合理的超时时间
- 及时关闭无用连接
并发处理
- 使用线程池处理请求
- 采用异步编程模型
- 避免阻塞主线程
安全考虑
传输安全
- 使用SSL/TLS加密通信
- 验证服务器证书
- 选择安全的加密算法
输入验证
- 验证所有输入数据
- 防止注入攻击
- 限制输入长度和格式
访问控制
- 实施速率限制
- 使用身份认证
- 记录访问日志
开发建议
选择合适的技术
- 根据应用需求选择网络编程模型
- 考虑性能、复杂度和维护成本
- 优先使用成熟的框架和库
错误处理
- 实现完善的异常处理机制
- 提供有意义的错误信息
- 实施重试和降级策略
监控和调试
- 添加详细的日志记录
- 监控网络性能指标
- 使用网络分析工具
下一章预告
下一章我们将学习Java Web开发,包括:
- Servlet和JSP基础
- Spring框架核心概念
- Spring Boot快速开发
- RESTful Web服务
- Web安全和认证
- 前后端分离架构
练习题
基础练习
TCP聊天室
- 实现一个多用户TCP聊天室
- 支持用户登录、消息广播、私聊功能
- 使用多线程处理多个客户端连接
文件传输服务
- 实现TCP文件传输客户端和服务器
- 支持大文件传输、断点续传
- 添加传输进度显示和错误恢复
HTTP代理服务器
- 实现简单的HTTP代理服务器
- 支持GET和POST请求转发
- 添加请求日志和缓存功能
进阶练习
NIO Web服务器
- 使用NIO实现高性能Web服务器
- 支持静态文件服务和简单的动态内容
- 实现连接池和请求路由
分布式消息系统
- 设计并实现简单的消息队列系统
- 支持发布-订阅模式
- 实现消息持久化和集群部署
网络监控工具
- 开发网络性能监控工具
- 监控延迟、吞吐量、错误率等指标
- 提供实时图表和告警功能
挑战练习
自定义RPC框架
- 实现简单的RPC(远程过程调用)框架
- 支持服务注册、发现和负载均衡
- 实现序列化、网络传输和代理生成
WebSocket游戏服务器
- 开发实时多人在线游戏服务器
- 使用WebSocket实现实时通信
- 处理游戏状态同步和冲突解决
网络安全扫描器
- 实现网络端口扫描工具
- 检测常见的安全漏洞
- 生成安全评估报告
通过这些练习,你将深入理解Java网络编程的各个方面,掌握构建高性能、安全可靠的网络应用程序的技能。