本章目标

通过本章学习,你将掌握:

  1. 线程基础概念:理解进程与线程、线程生命周期、线程创建方式
  2. 线程同步机制:掌握synchronized、Lock、volatile等同步工具
  3. 并发工具类:学习CountDownLatch、Semaphore、CyclicBarrier等
  4. 线程池技术:理解线程池原理、配置和最佳实践
  5. 并发编程模式:掌握生产者消费者、读写锁等常见模式
  6. 性能优化:学习并发性能调优和问题诊断

1. 线程基础

1.1 进程与线程

// 线程基础概念演示
public class ThreadBasicsDemo {
    
    public static void main(String[] args) {
        demonstrateProcessAndThread();
        demonstrateThreadCreation();
        demonstrateThreadLifecycle();
        demonstrateThreadProperties();
    }
    
    // 进程与线程概念
    public static void demonstrateProcessAndThread() {
        System.out.println("=== 进程与线程 ===");
        
        // 获取当前进程信息
        ProcessHandle currentProcess = ProcessHandle.current();
        System.out.println("当前进程ID: " + currentProcess.pid());
        System.out.println("进程启动时间: " + currentProcess.info().startInstant().orElse(null));
        System.out.println("进程命令: " + currentProcess.info().command().orElse("未知"));
        
        // 获取当前线程信息
        Thread currentThread = Thread.currentThread();
        System.out.println("\n当前线程名称: " + currentThread.getName());
        System.out.println("当前线程ID: " + currentThread.getId());
        System.out.println("当前线程状态: " + currentThread.getState());
        System.out.println("当前线程优先级: " + currentThread.getPriority());
        System.out.println("是否为守护线程: " + currentThread.isDaemon());
        
        // 获取所有活跃线程
        ThreadGroup rootGroup = Thread.currentThread().getThreadGroup();
        while (rootGroup.getParent() != null) {
            rootGroup = rootGroup.getParent();
        }
        
        Thread[] threads = new Thread[rootGroup.activeCount()];
        int count = rootGroup.enumerate(threads);
        
        System.out.println("\n当前进程中的活跃线程数: " + count);
        for (int i = 0; i < count; i++) {
            if (threads[i] != null) {
                System.out.println("  线程: " + threads[i].getName() + 
                                 " (状态: " + threads[i].getState() + ")");
            }
        }
    }
    
    // 线程创建方式
    public static void demonstrateThreadCreation() {
        System.out.println("\n=== 线程创建方式 ===");
        
        // 方式1: 继承Thread类
        System.out.println("1. 继承Thread类:");
        MyThread thread1 = new MyThread("Thread-1");
        thread1.start();
        
        // 方式2: 实现Runnable接口
        System.out.println("\n2. 实现Runnable接口:");
        MyRunnable runnable = new MyRunnable("Runnable-1");
        Thread thread2 = new Thread(runnable);
        thread2.start();
        
        // 方式3: 使用Lambda表达式
        System.out.println("\n3. 使用Lambda表达式:");
        Thread thread3 = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                System.out.println("Lambda线程执行: " + i);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }, "Lambda-Thread");
        thread3.start();
        
        // 方式4: 使用Callable和Future
        System.out.println("\n4. 使用Callable和Future:");
        ExecutorService executor = Executors.newSingleThreadExecutor();
        
        Callable<String> callable = () -> {
            Thread.sleep(200);
            return "Callable执行结果: " + Thread.currentThread().getName();
        };
        
        Future<String> future = executor.submit(callable);
        
        try {
            String result = future.get(); // 阻塞等待结果
            System.out.println(result);
        } catch (InterruptedException | ExecutionException e) {
            System.out.println("获取结果失败: " + e.getMessage());
        }
        
        executor.shutdown();
        
        // 等待所有线程完成
        try {
            thread1.join();
            thread2.join();
            thread3.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    // 线程生命周期
    public static void demonstrateThreadLifecycle() {
        System.out.println("\n=== 线程生命周期 ===");
        
        LifecycleThread lifecycleThread = new LifecycleThread();
        
        // NEW状态
        System.out.println("创建后状态: " + lifecycleThread.getState());
        
        // RUNNABLE状态
        lifecycleThread.start();
        System.out.println("启动后状态: " + lifecycleThread.getState());
        
        try {
            Thread.sleep(100); // 让线程运行一段时间
            System.out.println("运行中状态: " + lifecycleThread.getState());
            
            // 等待线程完成
            lifecycleThread.join();
            System.out.println("完成后状态: " + lifecycleThread.getState());
            
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    // 线程属性
    public static void demonstrateThreadProperties() {
        System.out.println("\n=== 线程属性 ===");
        
        Thread thread = new Thread(() -> {
            System.out.println("线程属性演示:");
            Thread current = Thread.currentThread();
            System.out.println("  名称: " + current.getName());
            System.out.println("  ID: " + current.getId());
            System.out.println("  优先级: " + current.getPriority());
            System.out.println("  线程组: " + current.getThreadGroup().getName());
            System.out.println("  是否守护线程: " + current.isDaemon());
            System.out.println("  是否存活: " + current.isAlive());
        });
        
        // 设置线程属性
        thread.setName("PropertyDemo-Thread");
        thread.setPriority(Thread.MAX_PRIORITY);
        thread.setDaemon(false); // 用户线程
        
        thread.start();
        
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

// 继承Thread类的方式
class MyThread extends Thread {
    private final String name;
    
    public MyThread(String name) {
        this.name = name;
    }
    
    @Override
    public void run() {
        for (int i = 0; i < 3; i++) {
            System.out.println(name + " 执行: " + i);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
}

// 实现Runnable接口的方式
class MyRunnable implements Runnable {
    private final String name;
    
    public MyRunnable(String name) {
        this.name = name;
    }
    
    @Override
    public void run() {
        for (int i = 0; i < 3; i++) {
            System.out.println(name + " 执行: " + i);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
}

// 生命周期演示线程
class LifecycleThread extends Thread {
    @Override
    public void run() {
        try {
            System.out.println("线程开始执行");
            
            // RUNNABLE状态
            for (int i = 0; i < 3; i++) {
                System.out.println("执行任务: " + i);
                Thread.sleep(50); // TIMED_WAITING状态
            }
            
            System.out.println("线程执行完成");
            
        } catch (InterruptedException e) {
            System.out.println("线程被中断");
            Thread.currentThread().interrupt();
        }
    }
}

1.2 线程中断机制

// 线程中断机制演示
public class ThreadInterruptDemo {
    
    public static void main(String[] args) {
        demonstrateInterruptBasics();
        demonstrateInterruptInSleep();
        demonstrateInterruptInLoop();
        demonstrateInterruptWithFlag();
    }
    
    // 中断基础
    public static void demonstrateInterruptBasics() {
        System.out.println("=== 线程中断基础 ===");
        
        Thread worker = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                System.out.println("工作线程正在运行...");
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    System.out.println("工作线程被中断");
                    // 重新设置中断状态
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            System.out.println("工作线程结束");
        }, "Worker-Thread");
        
        worker.start();
        
        try {
            Thread.sleep(2000); // 让工作线程运行2秒
            System.out.println("主线程发送中断信号");
            worker.interrupt(); // 发送中断信号
            
            worker.join(); // 等待工作线程结束
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    // 在sleep中的中断
    public static void demonstrateInterruptInSleep() {
        System.out.println("\n=== 在sleep中的中断 ===");
        
        Thread sleepingThread = new Thread(() -> {
            try {
                System.out.println("线程开始睡眠10秒...");
                Thread.sleep(10000); // 睡眠10秒
                System.out.println("线程睡眠结束");
            } catch (InterruptedException e) {
                System.out.println("线程在睡眠中被中断");
                System.out.println("中断状态: " + Thread.currentThread().isInterrupted());
            }
        }, "Sleeping-Thread");
        
        sleepingThread.start();
        
        try {
            Thread.sleep(1000); // 等待1秒
            System.out.println("主线程发送中断信号");
            sleepingThread.interrupt();
            
            sleepingThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    // 在循环中的中断
    public static void demonstrateInterruptInLoop() {
        System.out.println("\n=== 在循环中的中断 ===");
        
        Thread loopingThread = new Thread(() -> {
            int count = 0;
            while (!Thread.currentThread().isInterrupted()) {
                count++;
                if (count % 1000000 == 0) {
                    System.out.println("循环计数: " + count);
                }
                
                // 检查中断状态
                if (Thread.interrupted()) {
                    System.out.println("检测到中断,退出循环");
                    break;
                }
            }
            System.out.println("循环线程结束,最终计数: " + count);
        }, "Looping-Thread");
        
        loopingThread.start();
        
        try {
            Thread.sleep(100); // 让循环运行一段时间
            System.out.println("主线程发送中断信号");
            loopingThread.interrupt();
            
            loopingThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    // 使用标志位的中断
    public static void demonstrateInterruptWithFlag() {
        System.out.println("\n=== 使用标志位的中断 ===");
        
        InterruptibleTask task = new InterruptibleTask();
        Thread taskThread = new Thread(task, "Task-Thread");
        
        taskThread.start();
        
        try {
            Thread.sleep(2000);
            System.out.println("主线程请求停止任务");
            task.stop(); // 使用自定义停止方法
            
            taskThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

// 可中断的任务
class InterruptibleTask implements Runnable {
    private volatile boolean running = true;
    
    @Override
    public void run() {
        int count = 0;
        while (running && !Thread.currentThread().isInterrupted()) {
            count++;
            
            // 模拟工作
            if (count % 100000 == 0) {
                System.out.println("任务执行中,计数: " + count);
            }
            
            // 定期检查中断状态
            if (count % 1000000 == 0) {
                if (Thread.currentThread().isInterrupted()) {
                    System.out.println("检测到线程中断信号");
                    break;
                }
            }
        }
        
        System.out.println("任务结束,最终计数: " + count);
        System.out.println("结束原因: " + (running ? "线程中断" : "标志位停止"));
    }
    
    public void stop() {
        running = false;
    }
    
    public boolean isRunning() {
        return running;
    }
}

1.3 线程通信

// 线程通信演示
public class ThreadCommunicationDemo {
    
    public static void main(String[] args) {
        demonstrateWaitNotify();
        demonstrateJoin();
        demonstrateYield();
        demonstrateThreadLocal();
    }
    
    // wait/notify机制
    public static void demonstrateWaitNotify() {
        System.out.println("=== wait/notify机制 ===");
        
        SharedResource resource = new SharedResource();
        
        // 消费者线程
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    String data = resource.consume();
                    System.out.println("消费者获取: " + data);
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Consumer");
        
        // 生产者线程
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    String data = "数据-" + i;
                    resource.produce(data);
                    System.out.println("生产者生产: " + data);
                    Thread.sleep(800);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Producer");
        
        consumer.start();
        producer.start();
        
        try {
            consumer.join();
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    // join方法
    public static void demonstrateJoin() {
        System.out.println("\n=== join方法 ===");
        
        Thread task1 = new Thread(() -> {
            try {
                System.out.println("任务1开始执行");
                Thread.sleep(2000);
                System.out.println("任务1执行完成");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Task-1");
        
        Thread task2 = new Thread(() -> {
            try {
                System.out.println("任务2开始执行");
                Thread.sleep(1500);
                System.out.println("任务2执行完成");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Task-2");
        
        long startTime = System.currentTimeMillis();
        
        task1.start();
        task2.start();
        
        try {
            System.out.println("主线程等待所有任务完成...");
            task1.join(); // 等待task1完成
            task2.join(); // 等待task2完成
            
            long endTime = System.currentTimeMillis();
            System.out.println("所有任务完成,总耗时: " + (endTime - startTime) + "ms");
            
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    // yield方法
    public static void demonstrateYield() {
        System.out.println("\n=== yield方法 ===");
        
        Thread highPriorityThread = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                System.out.println("高优先级线程: " + i);
                Thread.yield(); // 让出CPU时间片
            }
        }, "High-Priority");
        
        Thread lowPriorityThread = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                System.out.println("低优先级线程: " + i);
            }
        }, "Low-Priority");
        
        highPriorityThread.setPriority(Thread.MAX_PRIORITY);
        lowPriorityThread.setPriority(Thread.MIN_PRIORITY);
        
        highPriorityThread.start();
        lowPriorityThread.start();
        
        try {
            highPriorityThread.join();
            lowPriorityThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    // ThreadLocal
    public static void demonstrateThreadLocal() {
        System.out.println("\n=== ThreadLocal ===");
        
        ThreadLocalDemo demo = new ThreadLocalDemo();
        
        // 创建多个线程,每个线程都有自己的ThreadLocal值
        for (int i = 0; i < 3; i++) {
            final int threadId = i;
            Thread thread = new Thread(() -> {
                demo.setThreadLocalValue("Thread-" + threadId + "-Value");
                demo.printThreadLocalValue();
                
                // 模拟一些工作
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                
                demo.printThreadLocalValue(); // 再次打印,验证值没有被其他线程影响
            }, "Thread-" + i);
            
            thread.start();
        }
        
        try {
            Thread.sleep(1000); // 等待所有线程完成
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

// 共享资源类
class SharedResource {
    private String data;
    private boolean hasData = false;
    
    public synchronized void produce(String data) throws InterruptedException {
        while (hasData) {
            wait(); // 等待消费者消费
        }
        
        this.data = data;
        this.hasData = true;
        notifyAll(); // 通知消费者
    }
    
    public synchronized String consume() throws InterruptedException {
        while (!hasData) {
            wait(); // 等待生产者生产
        }
        
        String result = this.data;
        this.hasData = false;
        notifyAll(); // 通知生产者
        
        return result;
    }
}

// ThreadLocal演示类
class ThreadLocalDemo {
    private static final ThreadLocal<String> threadLocal = new ThreadLocal<>();
    
    public void setThreadLocalValue(String value) {
        threadLocal.set(value);
        System.out.println(Thread.currentThread().getName() + " 设置值: " + value);
    }
    
    public void printThreadLocalValue() {
        String value = threadLocal.get();
        System.out.println(Thread.currentThread().getName() + " 获取值: " + value);
    }
    
    public void removeThreadLocalValue() {
        threadLocal.remove();
        System.out.println(Thread.currentThread().getName() + " 移除值");
    }
}

2. 线程同步

2.1 synchronized关键字

// synchronized同步机制演示
public class SynchronizedDemo {
    
    public static void main(String[] args) {
        demonstrateSynchronizedMethod();
        demonstrateSynchronizedBlock();
        demonstrateStaticSynchronized();
        demonstrateDeadlock();
    }
    
    // 同步方法
    public static void demonstrateSynchronizedMethod() {
        System.out.println("=== 同步方法 ===");
        
        Counter counter = new Counter();
        
        // 创建多个线程同时访问计数器
        Thread[] threads = new Thread[5];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    counter.increment();
                }
            }, "Thread-" + i);
        }
        
        // 启动所有线程
        for (Thread thread : threads) {
            thread.start();
        }
        
        // 等待所有线程完成
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        
        System.out.println("最终计数值: " + counter.getCount());
        System.out.println("预期值: " + (5 * 1000));
    }
    
    // 同步代码块
    public static void demonstrateSynchronizedBlock() {
        System.out.println("\n=== 同步代码块 ===");
        
        BankAccount account = new BankAccount(1000);
        
        // 创建多个线程进行转账操作
        Thread[] threads = new Thread[10];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 100; j++) {
                    account.withdraw(1);
                    account.deposit(1);
                }
            }, "Thread-" + i);
        }
        
        // 启动所有线程
        for (Thread thread : threads) {
            thread.start();
        }
        
        // 等待所有线程完成
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        
        System.out.println("最终余额: " + account.getBalance());
        System.out.println("预期余额: 1000");
    }
    
    // 静态同步
    public static void demonstrateStaticSynchronized() {
        System.out.println("\n=== 静态同步 ===");
        
        // 创建多个线程访问静态方法
        Thread[] threads = new Thread[3];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                StaticCounter.increment();
                StaticCounter.printCount();
            }, "Thread-" + i);
        }
        
        // 启动所有线程
        for (Thread thread : threads) {
            thread.start();
        }
        
        // 等待所有线程完成
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        
        System.out.println("最终静态计数: " + StaticCounter.getCount());
    }
    
    // 死锁演示
    public static void demonstrateDeadlock() {
        System.out.println("\n=== 死锁演示 ===");
        
        Object lock1 = new Object();
        Object lock2 = new Object();
        
        Thread thread1 = new Thread(() -> {
            synchronized (lock1) {
                System.out.println("线程1获得lock1");
                
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                
                System.out.println("线程1等待lock2...");
                synchronized (lock2) {
                    System.out.println("线程1获得lock2");
                }
            }
        }, "DeadLock-Thread-1");
        
        Thread thread2 = new Thread(() -> {
            synchronized (lock2) {
                System.out.println("线程2获得lock2");
                
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                
                System.out.println("线程2等待lock1...");
                synchronized (lock1) {
                    System.out.println("线程2获得lock1");
                }
            }
        }, "DeadLock-Thread-2");
        
        thread1.start();
        thread2.start();
        
        try {
            // 等待3秒,如果线程还没结束,说明发生了死锁
            thread1.join(3000);
            thread2.join(3000);
            
            if (thread1.isAlive() || thread2.isAlive()) {
                System.out.println("检测到死锁!强制中断线程...");
                thread1.interrupt();
                thread2.interrupt();
            }
            
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

// 计数器类
class Counter {
    private int count = 0;
    
    // 同步方法
    public synchronized void increment() {
        count++;
    }
    
    public synchronized void decrement() {
        count--;
    }
    
    public synchronized int getCount() {
        return count;
    }
}

// 银行账户类
class BankAccount {
    private double balance;
    private final Object lock = new Object();
    
    public BankAccount(double initialBalance) {
        this.balance = initialBalance;
    }
    
    public void deposit(double amount) {
        synchronized (lock) {
            double oldBalance = balance;
            balance += amount;
            System.out.println(Thread.currentThread().getName() + 
                             " 存款: " + amount + ", 余额: " + oldBalance + " -> " + balance);
        }
    }
    
    public boolean withdraw(double amount) {
        synchronized (lock) {
            if (balance >= amount) {
                double oldBalance = balance;
                balance -= amount;
                System.out.println(Thread.currentThread().getName() + 
                                 " 取款: " + amount + ", 余额: " + oldBalance + " -> " + balance);
                return true;
            } else {
                System.out.println(Thread.currentThread().getName() + 
                                 " 取款失败: 余额不足 (当前余额: " + balance + ", 尝试取款: " + amount + ")");
                return false;
            }
        }
    }
    
    public double getBalance() {
        synchronized (lock) {
            return balance;
        }
    }
}

// 静态计数器类
class StaticCounter {
    private static int count = 0;
    
    // 静态同步方法
    public static synchronized void increment() {
        count++;
        System.out.println(Thread.currentThread().getName() + " 增加计数,当前值: " + count);
    }
    
    public static synchronized int getCount() {
        return count;
    }
    
    public static synchronized void printCount() {
        System.out.println(Thread.currentThread().getName() + " 读取计数: " + count);
    }
}

2.2 Lock接口

// Lock接口演示
public class LockDemo {
    
    public static void main(String[] args) {
        demonstrateReentrantLock();
        demonstrateReadWriteLock();
        demonstrateTryLock();
        demonstrateCondition();
    }
    
    // ReentrantLock演示
    public static void demonstrateReentrantLock() {
        System.out.println("=== ReentrantLock演示 ===");
        
        LockCounter counter = new LockCounter();
        
        // 创建多个线程同时访问计数器
        Thread[] threads = new Thread[5];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    counter.increment();
                }
            }, "Thread-" + i);
        }
        
        long startTime = System.currentTimeMillis();
        
        // 启动所有线程
        for (Thread thread : threads) {
            thread.start();
        }
        
        // 等待所有线程完成
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        
        long endTime = System.currentTimeMillis();
        
        System.out.println("最终计数值: " + counter.getCount());
        System.out.println("预期值: " + (5 * 1000));
        System.out.println("执行时间: " + (endTime - startTime) + "ms");
    }
    
    // ReadWriteLock演示
    public static void demonstrateReadWriteLock() {
        System.out.println("\n=== ReadWriteLock演示 ===");
        
        ReadWriteCache cache = new ReadWriteCache();
        
        // 创建读线程
        Thread[] readers = new Thread[5];
        for (int i = 0; i < readers.length; i++) {
            final int readerId = i;
            readers[i] = new Thread(() -> {
                for (int j = 0; j < 3; j++) {
                    String value = cache.get("key" + (j % 3));
                    System.out.println("读线程" + readerId + " 读取: key" + (j % 3) + " = " + value);
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }, "Reader-" + i);
        }
        
        // 创建写线程
        Thread[] writers = new Thread[2];
        for (int i = 0; i < writers.length; i++) {
            final int writerId = i;
            writers[i] = new Thread(() -> {
                for (int j = 0; j < 3; j++) {
                    String key = "key" + j;
                    String value = "value" + writerId + "-" + j;
                    cache.put(key, value);
                    System.out.println("写线程" + writerId + " 写入: " + key + " = " + value);
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }, "Writer-" + i);
        }
        
        // 启动所有线程
        for (Thread reader : readers) {
            reader.start();
        }
        for (Thread writer : writers) {
            writer.start();
        }
        
        // 等待所有线程完成
        for (Thread reader : readers) {
            try {
                reader.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        for (Thread writer : writers) {
            try {
                writer.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
    
    // tryLock演示
    public static void demonstrateTryLock() {
        System.out.println("\n=== tryLock演示 ===");
        
        ReentrantLock lock = new ReentrantLock();
        
        // 创建多个线程尝试获取锁
        Thread[] threads = new Thread[3];
        for (int i = 0; i < threads.length; i++) {
            final int threadId = i;
            threads[i] = new Thread(() -> {
                try {
                    // 尝试获取锁,最多等待1秒
                    if (lock.tryLock(1, TimeUnit.SECONDS)) {
                        try {
                            System.out.println("线程" + threadId + " 获得锁,开始工作");
                            Thread.sleep(2000); // 模拟工作
                            System.out.println("线程" + threadId + " 工作完成");
                        } finally {
                            lock.unlock();
                            System.out.println("线程" + threadId + " 释放锁");
                        }
                    } else {
                        System.out.println("线程" + threadId + " 获取锁超时");
                    }
                } catch (InterruptedException e) {
                    System.out.println("线程" + threadId + " 被中断");
                    Thread.currentThread().interrupt();
                }
            }, "TryLock-Thread-" + i);
        }
        
        // 启动所有线程
        for (Thread thread : threads) {
            thread.start();
        }
        
        // 等待所有线程完成
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
    
    // Condition演示
    public static void demonstrateCondition() {
        System.out.println("\n=== Condition演示 ===");
        
        BoundedBuffer buffer = new BoundedBuffer(3);
        
        // 生产者线程
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    String item = "Item-" + i;
                    buffer.put(item);
                    System.out.println("生产者生产: " + item);
                    Thread.sleep(500);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Producer");
        
        // 消费者线程
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    String item = buffer.take();
                    System.out.println("消费者消费: " + item);
                    Thread.sleep(800);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Consumer");
        
        producer.start();
        consumer.start();
        
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

// 使用Lock的计数器
class LockCounter {
    private int count = 0;
    private final ReentrantLock lock = new ReentrantLock();
    
    public void increment() {
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock();
        }
    }
    
    public int getCount() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
}

// 读写锁缓存
class ReadWriteCache {
    private final Map<String, String> cache = new HashMap<>();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Lock readLock = lock.readLock();
    private final Lock writeLock = lock.writeLock();
    
    public String get(String key) {
        readLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + " 获取读锁");
            Thread.sleep(100); // 模拟读取时间
            return cache.get(key);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            System.out.println(Thread.currentThread().getName() + " 释放读锁");
            readLock.unlock();
        }
    }
    
    public void put(String key, String value) {
        writeLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + " 获取写锁");
            Thread.sleep(200); // 模拟写入时间
            cache.put(key, value);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            System.out.println(Thread.currentThread().getName() + " 释放写锁");
            writeLock.unlock();
        }
    }
}

// 有界缓冲区
class BoundedBuffer {
    private final String[] buffer;
    private int count = 0;
    private int putIndex = 0;
    private int takeIndex = 0;
    
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    
    public BoundedBuffer(int capacity) {
        this.buffer = new String[capacity];
    }
    
    public void put(String item) throws InterruptedException {
        lock.lock();
        try {
            while (count == buffer.length) {
                System.out.println("缓冲区已满,生产者等待...");
                notFull.await();
            }
            
            buffer[putIndex] = item;
            putIndex = (putIndex + 1) % buffer.length;
            count++;
            
            System.out.println("缓冲区大小: " + count + "/" + buffer.length);
            notEmpty.signal();
            
        } finally {
            lock.unlock();
        }
    }
    
    public String take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                System.out.println("缓冲区为空,消费者等待...");
                notEmpty.await();
            }
            
            String item = buffer[takeIndex];
            buffer[takeIndex] = null;
            takeIndex = (takeIndex + 1) % buffer.length;
            count--;
            
            System.out.println("缓冲区大小: " + count + "/" + buffer.length);
            notFull.signal();
            
            return item;
            
        } finally {
            lock.unlock();
        }
    }
}

2.3 volatile关键字

// volatile关键字演示
public class VolatileDemo {
    
    public static void main(String[] args) {
        demonstrateVisibility();
        demonstrateAtomicity();
        demonstrateOrdering();
        demonstrateDoubleCheckedLocking();
    }
    
    // 可见性演示
    public static void demonstrateVisibility() {
        System.out.println("=== volatile可见性演示 ===");
        
        VisibilityTest test = new VisibilityTest();
        
        // 读线程
        Thread reader = new Thread(() -> {
            while (!test.isFlag()) {
                // 忙等待
            }
            System.out.println("读线程检测到flag变化: " + test.isFlag());
        }, "Reader");
        
        // 写线程
        Thread writer = new Thread(() -> {
            try {
                Thread.sleep(1000);
                test.setFlag(true);
                System.out.println("写线程设置flag为true");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Writer");
        
        reader.start();
        writer.start();
        
        try {
            reader.join(3000); // 最多等待3秒
            writer.join();
            
            if (reader.isAlive()) {
                System.out.println("读线程仍在运行,可能存在可见性问题");
                reader.interrupt();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    // 原子性演示
    public static void demonstrateAtomicity() {
        System.out.println("\n=== volatile原子性演示 ===");
        
        AtomicityTest test = new AtomicityTest();
        
        // 创建多个线程同时增加计数
        Thread[] threads = new Thread[10];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    test.increment();
                }
            }, "Thread-" + i);
        }
        
        // 启动所有线程
        for (Thread thread : threads) {
            thread.start();
        }
        
        // 等待所有线程完成
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        
        System.out.println("最终计数值: " + test.getCount());
        System.out.println("预期值: " + (10 * 1000));
        System.out.println("说明: volatile不能保证原子性,需要使用synchronized或AtomicInteger");
    }
    
    // 有序性演示
    public static void demonstrateOrdering() {
        System.out.println("\n=== volatile有序性演示 ===");
        
        OrderingTest test = new OrderingTest();
        
        for (int i = 0; i < 10; i++) {
            final int iteration = i;
            
            Thread writer = new Thread(() -> {
                test.writer();
            }, "Writer-" + iteration);
            
            Thread reader = new Thread(() -> {
                test.reader();
            }, "Reader-" + iteration);
            
            writer.start();
            reader.start();
            
            try {
                writer.join();
                reader.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
    
    // 双重检查锁定演示
    public static void demonstrateDoubleCheckedLocking() {
        System.out.println("\n=== 双重检查锁定演示 ===");
        
        // 创建多个线程同时获取单例
        Thread[] threads = new Thread[10];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                Singleton instance = Singleton.getInstance();
                System.out.println(Thread.currentThread().getName() + 
                                 " 获取单例: " + instance.hashCode());
            }, "Thread-" + i);
        }
        
        // 启动所有线程
        for (Thread thread : threads) {
            thread.start();
        }
        
        // 等待所有线程完成
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

// 可见性测试类
class VisibilityTest {
    private volatile boolean flag = false; // 使用volatile确保可见性
    
    public boolean isFlag() {
        return flag;
    }
    
    public void setFlag(boolean flag) {
        this.flag = flag;
    }
}

// 原子性测试类
class AtomicityTest {
    private volatile int count = 0; // volatile不能保证原子性
    
    public void increment() {
        count++; // 这不是原子操作
    }
    
    public int getCount() {
        return count;
    }
}

// 有序性测试类
class OrderingTest {
    private int a = 0;
    private volatile boolean flag = false;
    
    public void writer() {
        a = 1;          // 1
        flag = true;    // 2
    }
    
    public void reader() {
        if (flag) {     // 3
            int i = a;  // 4
            System.out.println(Thread.currentThread().getName() + 
                             " 读取到 a = " + i);
        }
    }
}

// 双重检查锁定单例
class Singleton {
    private static volatile Singleton instance; // volatile确保正确的双重检查锁定
    
    private Singleton() {
        // 模拟初始化工作
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.println("单例初始化完成: " + Thread.currentThread().getName());
    }
    
    public static Singleton getInstance() {
        if (instance == null) { // 第一次检查
            synchronized (Singleton.class) {
                if (instance == null) { // 第二次检查
                    instance = new Singleton();
                }
            }
        }
        return instance;
    }
}

3. 并发工具类

3.1 CountDownLatch

// CountDownLatch演示
public class CountDownLatchDemo {
    
    public static void main(String[] args) {
        demonstrateBasicUsage();
        demonstrateRaceStart();
        demonstrateServiceStartup();
    }
    
    // 基本用法
    public static void demonstrateBasicUsage() {
        System.out.println("=== CountDownLatch基本用法 ===");
        
        int workerCount = 5;
        CountDownLatch latch = new CountDownLatch(workerCount);
        
        // 创建工作线程
        for (int i = 0; i < workerCount; i++) {
            final int workerId = i;
            Thread worker = new Thread(() -> {
                try {
                    System.out.println("工作线程" + workerId + " 开始工作");
                    
                    // 模拟工作
                    Thread.sleep((workerId + 1) * 1000);
                    
                    System.out.println("工作线程" + workerId + " 完成工作");
                    
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown(); // 完成工作,计数减1
                }
            }, "Worker-" + i);
            
            worker.start();
        }
        
        try {
            System.out.println("主线程等待所有工作线程完成...");
            latch.await(); // 等待计数归零
            System.out.println("所有工作线程已完成,主线程继续执行");
            
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    // 比赛开始场景
    public static void demonstrateRaceStart() {
        System.out.println("\n=== 比赛开始场景 ===");
        
        int runnerCount = 3;
        CountDownLatch startSignal = new CountDownLatch(1); // 开始信号
        CountDownLatch doneSignal = new CountDownLatch(runnerCount); // 完成信号
        
        // 创建跑步者
        for (int i = 0; i < runnerCount; i++) {
            final int runnerId = i;
            Thread runner = new Thread(() -> {
                try {
                    System.out.println("跑步者" + runnerId + " 准备就绪");
                    
                    startSignal.await(); // 等待开始信号
                    
                    System.out.println("跑步者" + runnerId + " 开始跑步!");
                    
                    // 模拟跑步时间
                    int runTime = (int) (Math.random() * 3000) + 1000;
                    Thread.sleep(runTime);
                    
                    System.out.println("跑步者" + runnerId + " 完成比赛!用时: " + runTime + "ms");
                    
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    doneSignal.countDown(); // 完成比赛
                }
            }, "Runner-" + i);
            
            runner.start();
        }
        
        try {
            Thread.sleep(2000); // 等待所有跑步者准备
            
            System.out.println("\n裁判: 3... 2... 1... 开始!");
            startSignal.countDown(); // 发出开始信号
            
            doneSignal.await(); // 等待所有跑步者完成
            System.out.println("\n裁判: 比赛结束!");
            
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    // 服务启动场景
    public static void demonstrateServiceStartup() {
        System.out.println("\n=== 服务启动场景 ===");
        
        String[] services = {"数据库服务", "缓存服务", "消息队列服务", "Web服务"};
        CountDownLatch latch = new CountDownLatch(services.length);
        
        // 启动各个服务
        for (int i = 0; i < services.length; i++) {
            final String serviceName = services[i];
            final int startupTime = (i + 1) * 1000; // 不同服务启动时间不同
            
            Thread serviceThread = new Thread(() -> {
                try {
                    System.out.println(serviceName + " 开始启动...");
                    
                    Thread.sleep(startupTime); // 模拟启动时间
                    
                    System.out.println(serviceName + " 启动完成");
                    
                } catch (InterruptedException e) {
                    System.out.println(serviceName + " 启动被中断");
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown();
                }
            }, serviceName + "-Thread");
            
            serviceThread.start();
        }
        
        try {
            System.out.println("等待所有服务启动完成...");
            
            boolean allStarted = latch.await(10, TimeUnit.SECONDS); // 最多等待10秒
            
            if (allStarted) {
                System.out.println("\n所有服务启动完成,应用程序可以开始接收请求");
            } else {
                System.out.println("\n服务启动超时,当前剩余未启动服务数: " + latch.getCount());
            }
            
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

3.2 Semaphore

// Semaphore演示
public class SemaphoreDemo {
    
    public static void main(String[] args) {
        demonstrateBasicUsage();
        demonstrateConnectionPool();
        demonstrateParkingLot();
    }
    
    // 基本用法
    public static void demonstrateBasicUsage() {
        System.out.println("=== Semaphore基本用法 ===");
        
        // 创建一个许可数为3的信号量
        Semaphore semaphore = new Semaphore(3);
        
        // 创建5个线程竞争3个许可
        for (int i = 0; i < 5; i++) {
            final int threadId = i;
            Thread thread = new Thread(() -> {
                try {
                    System.out.println("线程" + threadId + " 尝试获取许可...");
                    
                    semaphore.acquire(); // 获取许可
                    System.out.println("线程" + threadId + " 获得许可,开始工作");
                    
                    // 模拟工作
                    Thread.sleep(2000);
                    
                    System.out.println("线程" + threadId + " 工作完成,释放许可");
                    
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    semaphore.release(); // 释放许可
                }
            }, "Thread-" + i);
            
            thread.start();
        }
        
        try {
            Thread.sleep(10000); // 等待所有线程完成
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    // 连接池场景
    public static void demonstrateConnectionPool() {
        System.out.println("\n=== 连接池场景 ===");
        
        DatabaseConnectionPool pool = new DatabaseConnectionPool(3);
        
        // 创建多个客户端同时请求连接
        for (int i = 0; i < 8; i++) {
            final int clientId = i;
            Thread client = new Thread(() -> {
                try {
                    Connection conn = pool.getConnection();
                    System.out.println("客户端" + clientId + " 获得连接: " + conn.getId());
                    
                    // 模拟使用连接
                    Thread.sleep((int) (Math.random() * 3000) + 1000);
                    
                    pool.releaseConnection(conn);
                    System.out.println("客户端" + clientId + " 释放连接: " + conn.getId());
                    
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "Client-" + i);
            
            client.start();
        }
        
        try {
            Thread.sleep(15000); // 等待所有客户端完成
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    // 停车场场景
    public static void demonstrateParkingLot() {
        System.out.println("\n=== 停车场场景 ===");
        
        ParkingLot parkingLot = new ParkingLot(4);
        
        // 创建多辆车尝试停车
        for (int i = 0; i < 10; i++) {
            final int carId = i;
            Thread car = new Thread(() -> {
                try {
                    boolean parked = parkingLot.park(carId);
                    if (parked) {
                        // 模拟停车时间
                        int parkingTime = (int) (Math.random() * 4000) + 1000;
                        Thread.sleep(parkingTime);
                        
                        parkingLot.leave(carId);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "Car-" + i);
            
            car.start();
            
            // 间隔一段时间再来下一辆车
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        
        try {
            Thread.sleep(20000); // 等待所有车辆完成
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

// 数据库连接池
class DatabaseConnectionPool {
    private final Semaphore semaphore;
    private final Queue<Connection> connections;
    
    public DatabaseConnectionPool(int poolSize) {
        this.semaphore = new Semaphore(poolSize);
        this.connections = new ConcurrentLinkedQueue<>();
        
        // 初始化连接池
        for (int i = 0; i < poolSize; i++) {
            connections.offer(new Connection(i));
        }
    }
    
    public Connection getConnection() throws InterruptedException {
        semaphore.acquire(); // 获取许可
        return connections.poll(); // 获取连接
    }
    
    public void releaseConnection(Connection connection) {
        connections.offer(connection); // 归还连接
        semaphore.release(); // 释放许可
    }
    
    // 模拟数据库连接
    static class Connection {
        private final int id;
        
        public Connection(int id) {
            this.id = id;
        }
        
        public int getId() {
            return id;
        }
    }
}

// 停车场
class ParkingLot {
    private final Semaphore semaphore;
    private final int capacity;
    
    public ParkingLot(int capacity) {
        this.capacity = capacity;
        this.semaphore = new Semaphore(capacity);
    }
    
    public boolean park(int carId) {
        try {
            System.out.println("车辆" + carId + " 尝试进入停车场...");
            
            // 尝试获取停车位,最多等待2秒
            if (semaphore.tryAcquire(2, TimeUnit.SECONDS)) {
                System.out.println("车辆" + carId + " 成功停车 (剩余车位: " + 
                                 semaphore.availablePermits() + "/" + capacity + ")");
                return true;
            } else {
                System.out.println("车辆" + carId + " 停车场已满,离开");
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
    
    public void leave(int carId) {
        semaphore.release();
        System.out.println("车辆" + carId + " 离开停车场 (剩余车位: " + 
                          semaphore.availablePermits() + "/" + capacity + ")");
     }
 }

3.3 CyclicBarrier

// CyclicBarrier演示
public class CyclicBarrierDemo {
    
    public static void main(String[] args) {
        demonstrateBasicUsage();
        demonstrateMultiPhase();
        demonstrateTeamWork();
    }
    
    // 基本用法
    public static void demonstrateBasicUsage() {
        System.out.println("=== CyclicBarrier基本用法 ===");
        
        int parties = 4;
        CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
            System.out.println("\n所有线程都到达屏障点,执行屏障动作!\n");
        });
        
        // 创建多个线程
        for (int i = 0; i < parties; i++) {
            final int threadId = i;
            Thread thread = new Thread(() -> {
                try {
                    System.out.println("线程" + threadId + " 开始工作");
                    
                    // 模拟不同的工作时间
                    Thread.sleep((threadId + 1) * 1000);
                    
                    System.out.println("线程" + threadId + " 完成工作,等待其他线程");
                    
                    barrier.await(); // 等待所有线程到达屏障点
                    
                    System.out.println("线程" + threadId + " 继续执行后续工作");
                    
                } catch (InterruptedException | BrokenBarrierException e) {
                    System.out.println("线程" + threadId + " 被中断或屏障被破坏");
                    Thread.currentThread().interrupt();
                }
            }, "Thread-" + i);
            
            thread.start();
        }
        
        try {
            Thread.sleep(8000); // 等待所有线程完成
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    // 多阶段任务
    public static void demonstrateMultiPhase() {
        System.out.println("\n=== 多阶段任务演示 ===");
        
        int workerCount = 3;
        CyclicBarrier barrier = new CyclicBarrier(workerCount);
        
        // 创建工作线程
        for (int i = 0; i < workerCount; i++) {
            final int workerId = i;
            Thread worker = new Thread(() -> {
                try {
                    // 第一阶段
                    System.out.println("工作者" + workerId + " 开始第一阶段工作");
                    Thread.sleep((workerId + 1) * 500);
                    System.out.println("工作者" + workerId + " 完成第一阶段");
                    
                    barrier.await(); // 等待所有工作者完成第一阶段
                    
                    // 第二阶段
                    System.out.println("工作者" + workerId + " 开始第二阶段工作");
                    Thread.sleep((3 - workerId) * 500);
                    System.out.println("工作者" + workerId + " 完成第二阶段");
                    
                    barrier.await(); // 等待所有工作者完成第二阶段
                    
                    // 第三阶段
                    System.out.println("工作者" + workerId + " 开始第三阶段工作");
                    Thread.sleep(1000);
                    System.out.println("工作者" + workerId + " 完成第三阶段");
                    
                } catch (InterruptedException | BrokenBarrierException e) {
                    System.out.println("工作者" + workerId + " 被中断或屏障被破坏");
                    Thread.currentThread().interrupt();
                }
            }, "Worker-" + i);
            
            worker.start();
        }
        
        try {
            Thread.sleep(10000); // 等待所有工作者完成
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    // 团队协作场景
    public static void demonstrateTeamWork() {
        System.out.println("\n=== 团队协作场景 ===");
        
        TeamProject project = new TeamProject();
        project.start();
    }
}

// 团队项目类
class TeamProject {
    private final int teamSize = 4;
    private final CyclicBarrier designBarrier;
    private final CyclicBarrier developBarrier;
    private final CyclicBarrier testBarrier;
    
    public TeamProject() {
        this.designBarrier = new CyclicBarrier(teamSize, () -> {
            System.out.println("\n=== 设计阶段完成,开始开发阶段 ===\n");
        });
        
        this.developBarrier = new CyclicBarrier(teamSize, () -> {
            System.out.println("\n=== 开发阶段完成,开始测试阶段 ===\n");
        });
        
        this.testBarrier = new CyclicBarrier(teamSize, () -> {
            System.out.println("\n=== 测试阶段完成,项目发布! ===\n");
        });
    }
    
    public void start() {
        String[] roles = {"前端工程师", "后端工程师", "数据库工程师", "测试工程师"};
        
        for (int i = 0; i < teamSize; i++) {
            final String role = roles[i];
            final int memberId = i;
            
            Thread member = new Thread(() -> {
                try {
                    // 设计阶段
                    System.out.println(role + " 开始设计工作");
                    Thread.sleep((memberId + 1) * 800);
                    System.out.println(role + " 完成设计工作");
                    designBarrier.await();
                    
                    // 开发阶段
                    System.out.println(role + " 开始开发工作");
                    Thread.sleep((4 - memberId) * 600);
                    System.out.println(role + " 完成开发工作");
                    developBarrier.await();
                    
                    // 测试阶段
                    System.out.println(role + " 开始测试工作");
                    Thread.sleep((memberId % 2 + 1) * 700);
                    System.out.println(role + " 完成测试工作");
                    testBarrier.await();
                    
                    System.out.println(role + " 项目完成!");
                    
                } catch (InterruptedException | BrokenBarrierException e) {
                    System.out.println(role + " 工作被中断");
                    Thread.currentThread().interrupt();
                }
            }, role);
            
            member.start();
        }
    }
}

3.4 Exchanger

// Exchanger演示
public class ExchangerDemo {
    
    public static void main(String[] args) {
        demonstrateBasicUsage();
        demonstrateDataExchange();
        demonstrateProducerConsumer();
    }
    
    // 基本用法
    public static void demonstrateBasicUsage() {
        System.out.println("=== Exchanger基本用法 ===");
        
        Exchanger<String> exchanger = new Exchanger<>();
        
        // 线程A
        Thread threadA = new Thread(() -> {
            try {
                String dataA = "来自线程A的数据";
                System.out.println("线程A准备交换数据: " + dataA);
                
                String receivedData = exchanger.exchange(dataA);
                
                System.out.println("线程A收到数据: " + receivedData);
                
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "ThreadA");
        
        // 线程B
        Thread threadB = new Thread(() -> {
            try {
                Thread.sleep(1000); // 模拟准备时间
                
                String dataB = "来自线程B的数据";
                System.out.println("线程B准备交换数据: " + dataB);
                
                String receivedData = exchanger.exchange(dataB);
                
                System.out.println("线程B收到数据: " + receivedData);
                
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "ThreadB");
        
        threadA.start();
        threadB.start();
        
        try {
            threadA.join();
            threadB.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    // 数据交换场景
    public static void demonstrateDataExchange() {
        System.out.println("\n=== 数据交换场景 ===");
        
        DataExchangeDemo demo = new DataExchangeDemo();
        demo.start();
    }
    
    // 生产者消费者场景
    public static void demonstrateProducerConsumer() {
        System.out.println("\n=== 生产者消费者场景 ===");
        
        ProducerConsumerExchange demo = new ProducerConsumerExchange();
        demo.start();
    }
}

// 数据交换演示
class DataExchangeDemo {
    private final Exchanger<List<String>> exchanger = new Exchanger<>();
    
    public void start() {
        // 数据处理器A
        Thread processorA = new Thread(() -> {
            List<String> dataA = new ArrayList<>();
            
            try {
                for (int i = 0; i < 3; i++) {
                    // 生成数据
                    dataA.clear();
                    for (int j = 0; j < 5; j++) {
                        dataA.add("A-" + i + "-" + j);
                    }
                    
                    System.out.println("处理器A生成数据: " + dataA);
                    
                    // 交换数据
                    List<String> receivedData = exchanger.exchange(dataA);
                    
                    System.out.println("处理器A收到数据: " + receivedData);
                    
                    // 处理收到的数据
                    processData("A", receivedData);
                    
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "ProcessorA");
        
        // 数据处理器B
        Thread processorB = new Thread(() -> {
            List<String> dataB = new ArrayList<>();
            
            try {
                for (int i = 0; i < 3; i++) {
                    // 生成数据
                    dataB.clear();
                    for (int j = 0; j < 5; j++) {
                        dataB.add("B-" + i + "-" + j);
                    }
                    
                    System.out.println("处理器B生成数据: " + dataB);
                    
                    // 交换数据
                    List<String> receivedData = exchanger.exchange(dataB);
                    
                    System.out.println("处理器B收到数据: " + receivedData);
                    
                    // 处理收到的数据
                    processData("B", receivedData);
                    
                    Thread.sleep(1500);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "ProcessorB");
        
        processorA.start();
        processorB.start();
        
        try {
            processorA.join();
            processorB.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    private void processData(String processor, List<String> data) {
        System.out.println("处理器" + processor + " 正在处理数据: " + data.size() + " 项");
    }
}

// 生产者消费者交换
class ProducerConsumerExchange {
    private final Exchanger<List<String>> exchanger = new Exchanger<>();
    
    public void start() {
        // 生产者
        Thread producer = new Thread(() -> {
            List<String> fullBuffer = new ArrayList<>();
            List<String> emptyBuffer = new ArrayList<>();
            
            try {
                for (int round = 0; round < 5; round++) {
                    // 生产数据到满缓冲区
                    fullBuffer.clear();
                    for (int i = 0; i < 3; i++) {
                        String item = "Item-" + round + "-" + i;
                        fullBuffer.add(item);
                        System.out.println("生产者生产: " + item);
                        Thread.sleep(200);
                    }
                    
                    System.out.println("生产者准备交换缓冲区...");
                    
                    // 交换缓冲区
                    emptyBuffer = exchanger.exchange(fullBuffer);
                    
                    System.out.println("生产者获得空缓冲区,继续生产");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Producer");
        
        // 消费者
        Thread consumer = new Thread(() -> {
            List<String> emptyBuffer = new ArrayList<>();
            List<String> fullBuffer;
            
            try {
                for (int round = 0; round < 5; round++) {
                    System.out.println("消费者等待满缓冲区...");
                    
                    // 交换缓冲区
                    fullBuffer = exchanger.exchange(emptyBuffer);
                    
                    System.out.println("消费者获得满缓冲区,开始消费");
                    
                    // 消费数据
                    for (String item : fullBuffer) {
                        System.out.println("消费者消费: " + item);
                        Thread.sleep(300);
                    }
                    
                    // 清空缓冲区
                    emptyBuffer = fullBuffer;
                    emptyBuffer.clear();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Consumer");
        
        producer.start();
        consumer.start();
        
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

4. 原子类

4.1 基本原子类

// 原子类演示
public class AtomicDemo {
    
    public static void main(String[] args) {
        demonstrateAtomicInteger();
        demonstrateAtomicLong();
        demonstrateAtomicBoolean();
        demonstrateAtomicReference();
    }
    
    // AtomicInteger演示
    public static void demonstrateAtomicInteger() {
        System.out.println("=== AtomicInteger演示 ===");
        
        AtomicInteger counter = new AtomicInteger(0);
        int threadCount = 10;
        int incrementsPerThread = 1000;
        
        Thread[] threads = new Thread[threadCount];
        
        long startTime = System.currentTimeMillis();
        
        // 创建线程进行并发增加
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    counter.incrementAndGet();
                }
            }, "Thread-" + i);
        }
        
        // 启动所有线程
        for (Thread thread : threads) {
            thread.start();
        }
        
        // 等待所有线程完成
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        
        long endTime = System.currentTimeMillis();
        
        System.out.println("最终计数值: " + counter.get());
        System.out.println("预期值: " + (threadCount * incrementsPerThread));
        System.out.println("执行时间: " + (endTime - startTime) + "ms");
        
        // 演示其他操作
        System.out.println("\n其他AtomicInteger操作:");
        System.out.println("当前值: " + counter.get());
        System.out.println("增加5: " + counter.addAndGet(5));
        System.out.println("减少3: " + counter.addAndGet(-3));
        System.out.println("比较并设置(期望10002,新值20000): " + 
                         counter.compareAndSet(10002, 20000));
        System.out.println("当前值: " + counter.get());
        System.out.println("获取并设置为0: " + counter.getAndSet(0));
        System.out.println("当前值: " + counter.get());
    }
    
    // AtomicLong演示
    public static void demonstrateAtomicLong() {
        System.out.println("\n=== AtomicLong演示 ===");
        
        AtomicLong accumulator = new AtomicLong(0);
        
        // 模拟多线程累加大数
        Thread[] threads = new Thread[5];
        for (int i = 0; i < threads.length; i++) {
            final long baseValue = (i + 1) * 1000000L;
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    accumulator.addAndGet(baseValue + j);
                }
            }, "AccumulatorThread-" + i);
        }
        
        // 启动并等待完成
        for (Thread thread : threads) {
            thread.start();
        }
        
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        
        System.out.println("累加结果: " + accumulator.get());
    }
    
    // AtomicBoolean演示
    public static void demonstrateAtomicBoolean() {
        System.out.println("\n=== AtomicBoolean演示 ===");
        
        AtomicBoolean flag = new AtomicBoolean(false);
        AtomicInteger successCount = new AtomicInteger(0);
        
        // 多个线程尝试设置标志
        Thread[] threads = new Thread[10];
        for (int i = 0; i < threads.length; i++) {
            final int threadId = i;
            threads[i] = new Thread(() -> {
                try {
                    Thread.sleep((int) (Math.random() * 100));
                    
                    // 尝试将false设置为true
                    if (flag.compareAndSet(false, true)) {
                        System.out.println("线程" + threadId + " 成功设置标志为true");
                        successCount.incrementAndGet();
                        
                        Thread.sleep(100); // 模拟工作
                        
                        flag.set(false); // 重置标志
                        System.out.println("线程" + threadId + " 重置标志为false");
                    } else {
                        System.out.println("线程" + threadId + " 设置标志失败");
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "FlagThread-" + i);
        }
        
        // 启动并等待完成
        for (Thread thread : threads) {
            thread.start();
        }
        
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        
        System.out.println("成功设置标志的线程数: " + successCount.get());
    }
    
    // AtomicReference演示
    public static void demonstrateAtomicReference() {
        System.out.println("\n=== AtomicReference演示 ===");
        
        AtomicReference<User> userRef = new AtomicReference<>(new User("张三", 25));
        
        // 多个线程尝试更新用户信息
        Thread[] threads = new Thread[5];
        for (int i = 0; i < threads.length; i++) {
            final int threadId = i;
            threads[i] = new Thread(() -> {
                try {
                    Thread.sleep((int) (Math.random() * 100));
                    
                    User currentUser = userRef.get();
                    User newUser = new User(currentUser.getName(), currentUser.getAge() + 1);
                    
                    if (userRef.compareAndSet(currentUser, newUser)) {
                        System.out.println("线程" + threadId + " 成功更新用户年龄: " + 
                                         currentUser.getAge() + " -> " + newUser.getAge());
                    } else {
                        System.out.println("线程" + threadId + " 更新用户信息失败");
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "UserUpdateThread-" + i);
        }
        
        // 启动并等待完成
        for (Thread thread : threads) {
            thread.start();
        }
        
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        
        User finalUser = userRef.get();
        System.out.println("最终用户信息: " + finalUser.getName() + ", 年龄: " + finalUser.getAge());
    }
}

// 用户类
class User {
    private final String name;
    private final int age;
    
    public User(String name, int age) {
        this.name = name;
        this.age = age;
    }
    
    public String getName() {
        return name;
    }
    
    public int getAge() {
        return age;
    }
    
    @Override
    public String toString() {
        return "User{name='" + name + "', age=" + age + "}";
    }
}

4.2 数组原子类

// 数组原子类演示
public class AtomicArrayDemo {
    
    public static void main(String[] args) {
        demonstrateAtomicIntegerArray();
        demonstrateAtomicLongArray();
        demonstrateAtomicReferenceArray();
    }
    
    // AtomicIntegerArray演示
    public static void demonstrateAtomicIntegerArray() {
        System.out.println("=== AtomicIntegerArray演示 ===");
        
        int arraySize = 10;
        AtomicIntegerArray atomicArray = new AtomicIntegerArray(arraySize);
        
        // 多个线程并发更新数组元素
        Thread[] threads = new Thread[5];
        for (int i = 0; i < threads.length; i++) {
            final int threadId = i;
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 100; j++) {
                    int index = j % arraySize;
                    int newValue = atomicArray.incrementAndGet(index);
                    
                    if (j % 20 == 0) {
                        System.out.println("线程" + threadId + " 更新索引" + index + 
                                         " 的值为: " + newValue);
                    }
                }
            }, "ArrayThread-" + i);
        }
        
        // 启动并等待完成
        for (Thread thread : threads) {
            thread.start();
        }
        
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        
        // 输出最终结果
        System.out.println("\n最终数组状态:");
        for (int i = 0; i < arraySize; i++) {
            System.out.println("索引" + i + ": " + atomicArray.get(i));
        }
    }
    
    // AtomicLongArray演示
    public static void demonstrateAtomicLongArray() {
        System.out.println("\n=== AtomicLongArray演示 ===");
        
        AtomicLongArray longArray = new AtomicLongArray(5);
        
        // 初始化数组
        for (int i = 0; i < longArray.length(); i++) {
            longArray.set(i, (i + 1) * 1000L);
        }
        
        System.out.println("初始数组状态:");
        for (int i = 0; i < longArray.length(); i++) {
            System.out.println("索引" + i + ": " + longArray.get(i));
        }
        
        // 并发累加
        Thread[] threads = new Thread[3];
        for (int i = 0; i < threads.length; i++) {
            final int threadId = i;
            threads[i] = new Thread(() -> {
                for (int j = 0; j < longArray.length(); j++) {
                    long addValue = (threadId + 1) * 100L;
                    long newValue = longArray.addAndGet(j, addValue);
                    System.out.println("线程" + threadId + " 给索引" + j + 
                                     " 增加" + addValue + ",新值: " + newValue);
                }
            }, "LongArrayThread-" + i);
        }
        
        // 启动并等待完成
        for (Thread thread : threads) {
            thread.start();
        }
        
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        
        System.out.println("\n最终数组状态:");
        for (int i = 0; i < longArray.length(); i++) {
            System.out.println("索引" + i + ": " + longArray.get(i));
        }
    }
    
    // AtomicReferenceArray演示
    public static void demonstrateAtomicReferenceArray() {
        System.out.println("\n=== AtomicReferenceArray演示 ===");
        
        AtomicReferenceArray<String> stringArray = new AtomicReferenceArray<>(5);
        
        // 初始化数组
        for (int i = 0; i < stringArray.length(); i++) {
            stringArray.set(i, "初始值-" + i);
        }
        
        System.out.println("初始数组状态:");
        for (int i = 0; i < stringArray.length(); i++) {
            System.out.println("索引" + i + ": " + stringArray.get(i));
        }
        
        // 并发更新字符串
        Thread[] threads = new Thread[3];
        for (int i = 0; i < threads.length; i++) {
            final int threadId = i;
            threads[i] = new Thread(() -> {
                for (int j = 0; j < stringArray.length(); j++) {
                    String currentValue = stringArray.get(j);
                    String newValue = currentValue + "-线程" + threadId;
                    
                    if (stringArray.compareAndSet(j, currentValue, newValue)) {
                        System.out.println("线程" + threadId + " 成功更新索引" + j + 
                                         ": " + currentValue + " -> " + newValue);
                    } else {
                        System.out.println("线程" + threadId + " 更新索引" + j + " 失败");
                    }
                    
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }, "StringArrayThread-" + i);
        }
        
        // 启动并等待完成
        for (Thread thread : threads) {
            thread.start();
        }
        
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        
        System.out.println("\n最终数组状态:");
         for (int i = 0; i < stringArray.length(); i++) {
             System.out.println("索引" + i + ": " + stringArray.get(i));
         }
     }
 }

4.3 字段更新器

// 字段更新器演示
public class AtomicFieldUpdaterDemo {
    
    public static void main(String[] args) {
        demonstrateAtomicIntegerFieldUpdater();
        demonstrateAtomicLongFieldUpdater();
        demonstrateAtomicReferenceFieldUpdater();
    }
    
    // AtomicIntegerFieldUpdater演示
    public static void demonstrateAtomicIntegerFieldUpdater() {
        System.out.println("=== AtomicIntegerFieldUpdater演示 ===");
        
        AtomicIntegerFieldUpdater<Counter> updater = 
            AtomicIntegerFieldUpdater.newUpdater(Counter.class, "count");
        
        Counter counter = new Counter();
        
        // 多线程更新字段
        Thread[] threads = new Thread[5];
        for (int i = 0; i < threads.length; i++) {
            final int threadId = i;
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    updater.incrementAndGet(counter);
                }
                System.out.println("线程" + threadId + " 完成,当前计数: " + 
                                 updater.get(counter));
            }, "UpdaterThread-" + i);
        }
        
        // 启动并等待完成
        for (Thread thread : threads) {
            thread.start();
        }
        
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        
        System.out.println("最终计数: " + updater.get(counter));
    }
    
    // AtomicLongFieldUpdater演示
    public static void demonstrateAtomicLongFieldUpdater() {
        System.out.println("\n=== AtomicLongFieldUpdater演示 ===");
        
        AtomicLongFieldUpdater<Statistics> updater = 
            AtomicLongFieldUpdater.newUpdater(Statistics.class, "totalBytes");
        
        Statistics stats = new Statistics();
        
        // 模拟多线程统计字节数
        Thread[] threads = new Thread[3];
        for (int i = 0; i < threads.length; i++) {
            final int threadId = i;
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 100; j++) {
                    long bytes = (threadId + 1) * 1024L + j;
                    updater.addAndGet(stats, bytes);
                    
                    if (j % 25 == 0) {
                        System.out.println("线程" + threadId + " 添加" + bytes + 
                                         " 字节,总计: " + updater.get(stats));
                    }
                }
            }, "StatsThread-" + i);
        }
        
        // 启动并等待完成
        for (Thread thread : threads) {
            thread.start();
        }
        
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        
        System.out.println("最终统计字节数: " + updater.get(stats));
    }
    
    // AtomicReferenceFieldUpdater演示
    public static void demonstrateAtomicReferenceFieldUpdater() {
        System.out.println("\n=== AtomicReferenceFieldUpdater演示 ===");
        
        AtomicReferenceFieldUpdater<Node, String> updater = 
            AtomicReferenceFieldUpdater.newUpdater(Node.class, String.class, "data");
        
        Node node = new Node("初始数据");
        
        // 多线程更新节点数据
        Thread[] threads = new Thread[5];
        for (int i = 0; i < threads.length; i++) {
            final int threadId = i;
            threads[i] = new Thread(() -> {
                try {
                    Thread.sleep((int) (Math.random() * 100));
                    
                    String currentData = updater.get(node);
                    String newData = currentData + "-线程" + threadId;
                    
                    if (updater.compareAndSet(node, currentData, newData)) {
                        System.out.println("线程" + threadId + " 成功更新: " + 
                                         currentData + " -> " + newData);
                    } else {
                        System.out.println("线程" + threadId + " 更新失败");
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "NodeThread-" + i);
        }
        
        // 启动并等待完成
        for (Thread thread : threads) {
            thread.start();
        }
        
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        
        System.out.println("最终节点数据: " + updater.get(node));
    }
}

// 计数器类
class Counter {
    public volatile int count = 0;
    
    public int getCount() {
        return count;
    }
}

// 统计类
class Statistics {
    public volatile long totalBytes = 0;
    
    public long getTotalBytes() {
        return totalBytes;
    }
}

// 节点类
class Node {
    public volatile String data;
    
    public Node(String data) {
        this.data = data;
    }
    
    public String getData() {
        return data;
    }
}

5. 线程池

5.1 线程池基础

// 线程池演示
public class ThreadPoolDemo {
    
    public static void main(String[] args) {
        demonstrateFixedThreadPool();
        demonstrateCachedThreadPool();
        demonstrateSingleThreadExecutor();
        demonstrateScheduledThreadPool();
        demonstrateCustomThreadPool();
    }
    
    // 固定大小线程池
    public static void demonstrateFixedThreadPool() {
        System.out.println("=== 固定大小线程池演示 ===");
        
        ExecutorService executor = Executors.newFixedThreadPool(3);
        
        // 提交多个任务
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("任务" + taskId + " 开始执行,线程: " + 
                                 Thread.currentThread().getName());
                try {
                    Thread.sleep(2000); // 模拟任务执行
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("任务" + taskId + " 执行完成");
            });
        }
        
        // 关闭线程池
        executor.shutdown();
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        
        System.out.println("固定大小线程池演示完成\n");
    }
    
    // 缓存线程池
    public static void demonstrateCachedThreadPool() {
        System.out.println("=== 缓存线程池演示 ===");
        
        ExecutorService executor = Executors.newCachedThreadPool();
        
        // 提交任务,观察线程创建
        for (int i = 0; i < 5; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("缓存池任务" + taskId + " 开始,线程: " + 
                                 Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("缓存池任务" + taskId + " 完成");
            });
            
            // 间隔提交
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        
        // 关闭线程池
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        System.out.println("缓存线程池演示完成\n");
    }
    
    // 单线程执行器
    public static void demonstrateSingleThreadExecutor() {
        System.out.println("=== 单线程执行器演示 ===");
        
        ExecutorService executor = Executors.newSingleThreadExecutor();
        
        // 提交多个任务,观察顺序执行
        for (int i = 0; i < 5; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("单线程任务" + taskId + " 开始执行");
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("单线程任务" + taskId + " 执行完成");
            });
        }
        
        // 关闭线程池
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        System.out.println("单线程执行器演示完成\n");
    }
    
    // 定时线程池
    public static void demonstrateScheduledThreadPool() {
        System.out.println("=== 定时线程池演示 ===");
        
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
        
        // 延迟执行
        scheduler.schedule(() -> {
            System.out.println("延迟任务执行,时间: " + new Date());
        }, 2, TimeUnit.SECONDS);
        
        // 固定频率执行
        ScheduledFuture<?> fixedRateTask = scheduler.scheduleAtFixedRate(() -> {
            System.out.println("固定频率任务执行,时间: " + new Date());
        }, 1, 1, TimeUnit.SECONDS);
        
        // 固定延迟执行
        ScheduledFuture<?> fixedDelayTask = scheduler.scheduleWithFixedDelay(() -> {
            System.out.println("固定延迟任务执行,时间: " + new Date());
            try {
                Thread.sleep(500); // 模拟任务执行时间
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, 1, 2, TimeUnit.SECONDS);
        
        // 运行一段时间后取消
        try {
            Thread.sleep(8000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        fixedRateTask.cancel(false);
        fixedDelayTask.cancel(false);
        
        scheduler.shutdown();
        System.out.println("定时线程池演示完成\n");
    }
    
    // 自定义线程池
    public static void demonstrateCustomThreadPool() {
        System.out.println("=== 自定义线程池演示 ===");
        
        // 创建自定义线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2,                      // 核心线程数
            4,                      // 最大线程数
            60L,                    // 空闲线程存活时间
            TimeUnit.SECONDS,       // 时间单位
            new LinkedBlockingQueue<>(10), // 工作队列
            new ThreadFactory() {   // 线程工厂
                private int threadNumber = 1;
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r, "CustomThread-" + threadNumber++);
                    t.setDaemon(false);
                    return t;
                }
            },
            new RejectedExecutionHandler() { // 拒绝策略
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    System.out.println("任务被拒绝: " + r.toString());
                }
            }
        );
        
        // 监控线程池状态
        Thread monitor = new Thread(() -> {
            while (!executor.isShutdown()) {
                System.out.printf("线程池状态 - 核心线程数: %d, 活跃线程数: %d, " +
                                "队列大小: %d, 完成任务数: %d%n",
                                executor.getCorePoolSize(),
                                executor.getActiveCount(),
                                executor.getQueue().size(),
                                executor.getCompletedTaskCount());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });
        monitor.setDaemon(true);
        monitor.start();
        
        // 提交任务
        for (int i = 0; i < 20; i++) {
            final int taskId = i;
            try {
                executor.submit(() -> {
                    System.out.println("自定义池任务" + taskId + " 开始,线程: " + 
                                     Thread.currentThread().getName());
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    System.out.println("自定义池任务" + taskId + " 完成");
                });
            } catch (RejectedExecutionException e) {
                System.out.println("任务" + taskId + " 提交失败: " + e.getMessage());
            }
            
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        
        // 关闭线程池
        executor.shutdown();
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        
        System.out.println("自定义线程池演示完成");
    }
}

5.2 Future和CompletableFuture

// Future和CompletableFuture演示
public class FutureDemo {
    
    public static void main(String[] args) {
        demonstrateFuture();
        demonstrateCompletableFuture();
        demonstrateCompletableFutureChaining();
        demonstrateCompletableFutureCombining();
    }
    
    // Future基本用法
    public static void demonstrateFuture() {
        System.out.println("=== Future演示 ===");
        
        ExecutorService executor = Executors.newFixedThreadPool(3);
        
        // 提交Callable任务
        Future<Integer> future1 = executor.submit(() -> {
            Thread.sleep(2000);
            return 42;
        });
        
        Future<String> future2 = executor.submit(() -> {
            Thread.sleep(1500);
            return "Hello Future";
        });
        
        Future<Double> future3 = executor.submit(() -> {
            Thread.sleep(1000);
            return Math.PI;
        });
        
        try {
            // 获取结果
            System.out.println("等待结果...");
            
            // 带超时的获取
            Double result3 = future3.get(2, TimeUnit.SECONDS);
            System.out.println("结果3: " + result3);
            
            String result2 = future2.get();
            System.out.println("结果2: " + result2);
            
            Integer result1 = future1.get();
            System.out.println("结果1: " + result1);
            
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            System.out.println("获取结果时发生异常: " + e.getMessage());
        }
        
        executor.shutdown();
        System.out.println("Future演示完成\n");
    }
    
    // CompletableFuture基本用法
    public static void demonstrateCompletableFuture() {
        System.out.println("=== CompletableFuture演示 ===");
        
        // 异步执行
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "异步任务结果";
        });
        
        // 异步执行带回调
        CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
            System.out.println("异步执行任务,线程: " + Thread.currentThread().getName());
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).thenRun(() -> {
            System.out.println("任务完成后的回调");
        });
        
        // 处理结果
        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
            return "原始数据";
        }).thenApply(data -> {
            return data.toUpperCase();
        }).thenApply(data -> {
            return "处理后的" + data;
        });
        
        try {
            System.out.println("结果1: " + future1.get());
            future2.get();
            System.out.println("结果3: " + future3.get());
        } catch (InterruptedException | ExecutionException e) {
            System.out.println("获取结果时发生异常: " + e.getMessage());
        }
        
        System.out.println("CompletableFuture演示完成\n");
    }
    
    // CompletableFuture链式操作
    public static void demonstrateCompletableFutureChaining() {
        System.out.println("=== CompletableFuture链式操作演示 ===");
        
        CompletableFuture<String> future = CompletableFuture
            .supplyAsync(() -> {
                System.out.println("步骤1: 获取用户ID");
                return "user123";
            })
            .thenCompose(userId -> {
                System.out.println("步骤2: 根据用户ID获取用户信息");
                return CompletableFuture.supplyAsync(() -> {
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    return "用户信息: " + userId;
                });
            })
            .thenCompose(userInfo -> {
                System.out.println("步骤3: 根据用户信息获取权限");
                return CompletableFuture.supplyAsync(() -> {
                    try {
                        Thread.sleep(300);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    return userInfo + " -> 权限: admin";
                });
            })
            .whenComplete((result, throwable) -> {
                if (throwable != null) {
                    System.out.println("处理过程中发生异常: " + throwable.getMessage());
                } else {
                    System.out.println("最终结果: " + result);
                }
            });
        
        try {
            future.get();
        } catch (InterruptedException | ExecutionException e) {
            System.out.println("获取结果时发生异常: " + e.getMessage());
        }
        
        System.out.println("链式操作演示完成\n");
    }
    
    // CompletableFuture组合操作
    public static void demonstrateCompletableFutureCombining() {
        System.out.println("=== CompletableFuture组合操作演示 ===");
        
        // 并行执行多个任务
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "任务1结果";
        });
        
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "任务2结果";
        });
        
        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(800);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "任务3结果";
        });
        
        // 组合两个Future的结果
        CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> {
            return result1 + " + " + result2;
        });
        
        // 等待所有任务完成
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);
        
        CompletableFuture<String> allResults = allFutures.thenApply(v -> {
            try {
                return "所有结果: [" + future1.get() + ", " + future2.get() + ", " + future3.get() + "]";
            } catch (InterruptedException | ExecutionException e) {
                return "获取结果失败: " + e.getMessage();
            }
        });
        
        // 任意一个完成
        CompletableFuture<String> anyResult = CompletableFuture.anyOf(future1, future2, future3)
            .thenApply(result -> "最先完成的结果: " + result);
        
        try {
            System.out.println("组合结果: " + combinedFuture.get());
            System.out.println(anyResult.get());
            System.out.println(allResults.get());
        } catch (InterruptedException | ExecutionException e) {
            System.out.println("获取结果时发生异常: " + e.getMessage());
        }
        
        System.out.println("组合操作演示完成");
    }
}

6. 多线程最佳实践

6.1 线程安全设计

// 线程安全设计演示
public class ThreadSafetyDemo {
    
    public static void main(String[] args) {
        demonstrateImmutableClass();
        demonstrateThreadLocalUsage();
        demonstrateProducerConsumerPattern();
    }
    
    // 不可变类演示
    public static void demonstrateImmutableClass() {
        System.out.println("=== 不可变类演示 ===");
        
        ImmutablePerson person = new ImmutablePerson("张三", 25, 
            Arrays.asList("reading", "swimming"));
        
        // 多线程访问不可变对象
        Thread[] threads = new Thread[5];
        for (int i = 0; i < threads.length; i++) {
            final int threadId = i;
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 3; j++) {
                    System.out.println("线程" + threadId + " 访问: " + person);
                    
                    // 创建新的不可变对象
                    ImmutablePerson newPerson = person.withAge(person.getAge() + 1);
                    System.out.println("线程" + threadId + " 创建新对象: " + newPerson);
                    
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }, "ImmutableThread-" + i);
        }
        
        // 启动并等待完成
        for (Thread thread : threads) {
            thread.start();
        }
        
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        
        System.out.println("原始对象仍然不变: " + person);
        System.out.println("不可变类演示完成\n");
    }
    
    // ThreadLocal使用演示
    public static void demonstrateThreadLocalUsage() {
        System.out.println("=== ThreadLocal使用演示 ===");
        
        UserContext userContext = new UserContext();
        
        // 多线程设置和获取用户上下文
        Thread[] threads = new Thread[3];
        for (int i = 0; i < threads.length; i++) {
            final int userId = i + 1;
            threads[i] = new Thread(() -> {
                // 设置当前线程的用户信息
                userContext.setCurrentUser("用户" + userId);
                
                // 模拟业务操作
                businessOperation1(userContext);
                businessOperation2(userContext);
                
                // 清理ThreadLocal
                userContext.clear();
                
            }, "UserThread-" + userId);
        }
        
        // 启动并等待完成
        for (Thread thread : threads) {
            thread.start();
        }
        
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        
        System.out.println("ThreadLocal演示完成\n");
    }
    
    private static void businessOperation1(UserContext userContext) {
        System.out.println("业务操作1 - 当前用户: " + userContext.getCurrentUser() + 
                         ", 线程: " + Thread.currentThread().getName());
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    private static void businessOperation2(UserContext userContext) {
        System.out.println("业务操作2 - 当前用户: " + userContext.getCurrentUser() + 
                         ", 线程: " + Thread.currentThread().getName());
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
    
    // 生产者消费者模式演示
    public static void demonstrateProducerConsumerPattern() {
        System.out.println("=== 生产者消费者模式演示 ===");
        
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);
        
        // 生产者
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    String item = "商品" + i;
                    queue.put(item);
                    System.out.println("生产者生产: " + item + ", 队列大小: " + queue.size());
                    Thread.sleep(200);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Producer");
        
        // 消费者1
        Thread consumer1 = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    String item = queue.take();
                    System.out.println("消费者1消费: " + item + ", 队列大小: " + queue.size());
                    Thread.sleep(300);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Consumer1");
        
        // 消费者2
        Thread consumer2 = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    String item = queue.take();
                    System.out.println("消费者2消费: " + item + ", 队列大小: " + queue.size());
                    Thread.sleep(500);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Consumer2");
        
        producer.start();
        consumer1.start();
        consumer2.start();
        
        try {
            producer.join(); // 等待生产者完成
            Thread.sleep(3000); // 等待消费者处理完剩余商品
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        
        consumer1.interrupt();
        consumer2.interrupt();
        
        System.out.println("生产者消费者模式演示完成");
    }
}

// 不可变Person类
final class ImmutablePerson {
    private final String name;
    private final int age;
    private final List<String> hobbies;
    
    public ImmutablePerson(String name, int age, List<String> hobbies) {
        this.name = name;
        this.age = age;
        this.hobbies = Collections.unmodifiableList(new ArrayList<>(hobbies));
    }
    
    public String getName() {
        return name;
    }
    
    public int getAge() {
        return age;
    }
    
    public List<String> getHobbies() {
        return hobbies;
    }
    
    // 创建新的不可变对象
    public ImmutablePerson withAge(int newAge) {
        return new ImmutablePerson(this.name, newAge, this.hobbies);
    }
    
    public ImmutablePerson withName(String newName) {
        return new ImmutablePerson(newName, this.age, this.hobbies);
    }
    
    @Override
    public String toString() {
        return "ImmutablePerson{name='" + name + "', age=" + age + 
               ", hobbies=" + hobbies + "}";
    }
}

// 用户上下文类
class UserContext {
    private static final ThreadLocal<String> currentUser = new ThreadLocal<>();
    
    public void setCurrentUser(String user) {
        currentUser.set(user);
    }
    
    public String getCurrentUser() {
        return currentUser.get();
    }
    
    public void clear() {
        currentUser.remove();
    }
}

本章小结

多线程技术对比

技术 适用场景 优点 缺点
synchronized 简单同步需求 使用简单,JVM优化 性能相对较低,不可中断
ReentrantLock 复杂同步需求 功能丰富,可中断 使用复杂,需手动释放
volatile 简单状态标志 轻量级,保证可见性 不保证原子性
原子类 简单原子操作 无锁,高性能 只适用于简单操作
线程池 任务执行管理 资源复用,管理方便 配置复杂
并发工具类 特定协调场景 功能专一,使用方便 学习成本高

性能优化要点

  1. 减少锁的粒度:使用更细粒度的锁
  2. 避免锁竞争:减少临界区代码
  3. 使用无锁算法:原子类、CAS操作
  4. 合理使用线程池:避免频繁创建销毁线程
  5. 选择合适的并发工具:根据场景选择最适合的工具

安全考虑

  1. 避免死锁:统一锁顺序,使用超时
  2. 防止内存泄漏:及时清理ThreadLocal
  3. 异常处理:确保锁能正确释放
  4. 资源管理:正确关闭线程池

开发建议

  1. 优先使用高级并发工具:如线程池、并发集合
  2. 遵循不可变设计:减少共享可变状态
  3. 合理使用ThreadLocal:避免滥用
  4. 充分测试:并发代码需要充分的压力测试
  5. 性能监控:监控线程池状态和锁竞争情况

下一章预告

下一章我们将学习网络编程,包括: - Socket编程基础 - NIO和AIO - HTTP客户端编程 - 网络协议处理 - 分布式通信

练习题

  1. 基础练习:实现一个线程安全的计数器,支持增加、减少和获取操作

  2. 进阶练习:使用生产者消费者模式实现一个任务调度系统

  3. 综合练习:设计一个多线程的文件下载器,支持断点续传和进度显示

  4. 性能练习:比较不同同步机制的性能差异

  5. 实战练习:实现一个简单的线程池监控系统 “`