11.1 线程基础

11.1.1 线程与进程

进程(Process): - 是操作系统分配资源的基本单位 - 拥有独立的内存空间 - 进程间通信需要特殊机制(IPC)

线程(Thread): - 是CPU调度的基本单位 - 同一进程内的线程共享内存空间 - 线程间通信更加便捷 - 创建和切换开销较小

// 获取当前线程信息
public class ThreadInfo {
    public static void main(String[] args) {
        Thread currentThread = Thread.currentThread();
        
        System.out.println("线程名称: " + currentThread.getName());
        System.out.println("线程ID: " + currentThread.getId());
        System.out.println("线程优先级: " + currentThread.getPriority());
        System.out.println("线程状态: " + currentThread.getState());
        System.out.println("是否为守护线程: " + currentThread.isDaemon());
        System.out.println("是否存活: " + currentThread.isAlive());
        
        // 获取线程组信息
        ThreadGroup group = currentThread.getThreadGroup();
        System.out.println("线程组名称: " + group.getName());
        System.out.println("活跃线程数: " + group.activeCount());
    }
}

11.1.2 线程的生命周期

Java线程有以下几种状态:

  1. NEW(新建):线程对象已创建,但还未调用start()方法
  2. RUNNABLE(可运行):线程正在JVM中执行,可能正在运行或等待CPU时间片
  3. BLOCKED(阻塞):线程被阻塞,等待监视器锁
  4. WAITING(等待):线程无限期等待另一个线程执行特定操作
  5. TIMED_WAITING(超时等待):线程等待另一个线程执行操作,但有时间限制
  6. TERMINATED(终止):线程执行完毕
// 线程状态演示
public class ThreadStateDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(() -> {
            try {
                System.out.println("线程开始执行");
                Thread.sleep(2000); // TIMED_WAITING
                System.out.println("线程执行完毕");
            } catch (InterruptedException e) {
                System.out.println("线程被中断");
            }
        });
        
        System.out.println("创建后状态: " + thread.getState()); // NEW
        
        thread.start();
        System.out.println("启动后状态: " + thread.getState()); // RUNNABLE
        
        Thread.sleep(500);
        System.out.println("睡眠中状态: " + thread.getState()); // TIMED_WAITING
        
        thread.join(); // 等待线程结束
        System.out.println("结束后状态: " + thread.getState()); // TERMINATED
    }
}

11.1.3 线程的创建与启动

方法一:继承Thread类

// 继承Thread类创建线程
class MyThread extends Thread {
    private String threadName;
    
    public MyThread(String name) {
        this.threadName = name;
    }
    
    @Override
    public void run() {
        for (int i = 1; i <= 5; i++) {
            System.out.println(threadName + " 执行第 " + i + " 次");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                System.out.println(threadName + " 被中断");
                return;
            }
        }
        System.out.println(threadName + " 执行完毕");
    }
}

public class ThreadDemo1 {
    public static void main(String[] args) {
        MyThread thread1 = new MyThread("线程1");
        MyThread thread2 = new MyThread("线程2");
        
        thread1.start();
        thread2.start();
        
        System.out.println("主线程继续执行");
    }
}

方法二:实现Runnable接口

// 实现Runnable接口创建线程
class MyRunnable implements Runnable {
    private String taskName;
    
    public MyRunnable(String name) {
        this.taskName = name;
    }
    
    @Override
    public void run() {
        for (int i = 1; i <= 5; i++) {
            System.out.println(taskName + " 执行第 " + i + " 次 - " + 
                Thread.currentThread().getName());
            try {
                Thread.sleep(800);
            } catch (InterruptedException e) {
                System.out.println(taskName + " 被中断");
                return;
            }
        }
        System.out.println(taskName + " 执行完毕");
    }
}

public class ThreadDemo2 {
    public static void main(String[] args) {
        MyRunnable task1 = new MyRunnable("任务1");
        MyRunnable task2 = new MyRunnable("任务2");
        
        Thread thread1 = new Thread(task1, "工作线程1");
        Thread thread2 = new Thread(task2, "工作线程2");
        
        thread1.start();
        thread2.start();
        
        // 使用Lambda表达式创建线程
        Thread thread3 = new Thread(() -> {
            System.out.println("Lambda线程: " + Thread.currentThread().getName());
        }, "Lambda线程");
        thread3.start();
    }
}

方法三:使用Callable和Future

import java.util.concurrent.*;

// 使用Callable创建有返回值的线程
class CalculationTask implements Callable<Integer> {
    private int number;
    
    public CalculationTask(int number) {
        this.number = number;
    }
    
    @Override
    public Integer call() throws Exception {
        System.out.println("开始计算: " + number + "的平方");
        Thread.sleep(2000); // 模拟耗时操作
        int result = number * number;
        System.out.println("计算完成: " + number + "^2 = " + result);
        return result;
    }
}

public class CallableDemo {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        
        // 提交多个计算任务
        Future<Integer> future1 = executor.submit(new CalculationTask(5));
        Future<Integer> future2 = executor.submit(new CalculationTask(10));
        Future<Integer> future3 = executor.submit(new CalculationTask(15));
        
        try {
            // 获取计算结果
            System.out.println("结果1: " + future1.get());
            System.out.println("结果2: " + future2.get());
            System.out.println("结果3: " + future3.get());
            
            // 计算总和
            int sum = future1.get() + future2.get() + future3.get();
            System.out.println("总和: " + sum);
            
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            executor.shutdown();
        }
    }
}

11.2 线程同步

11.2.1 同步问题的产生

当多个线程同时访问共享资源时,可能会出现数据不一致的问题:

// 线程不安全的计数器
class UnsafeCounter {
    private int count = 0;
    
    public void increment() {
        count++; // 这不是原子操作!
    }
    
    public int getCount() {
        return count;
    }
}

public class UnsafeCounterDemo {
    public static void main(String[] args) throws InterruptedException {
        UnsafeCounter counter = new UnsafeCounter();
        
        // 创建1000个线程,每个线程增加计数器1000次
        Thread[] threads = new Thread[1000];
        for (int i = 0; i < 1000; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    counter.increment();
                }
            });
        }
        
        // 启动所有线程
        for (Thread thread : threads) {
            thread.start();
        }
        
        // 等待所有线程完成
        for (Thread thread : threads) {
            thread.join();
        }
        
        System.out.println("期望结果: 1000000");
        System.out.println("实际结果: " + counter.getCount());
        // 实际结果通常小于1000000,说明存在线程安全问题
    }
}

11.2.2 synchronized关键字

同步方法

// 线程安全的计数器 - 使用同步方法
class SafeCounter {
    private int count = 0;
    
    // 同步方法
    public synchronized void increment() {
        count++;
    }
    
    public synchronized void decrement() {
        count--;
    }
    
    public synchronized int getCount() {
        return count;
    }
    
    // 静态同步方法
    private static int staticCount = 0;
    
    public static synchronized void incrementStatic() {
        staticCount++;
    }
    
    public static synchronized int getStaticCount() {
        return staticCount;
    }
}

public class SafeCounterDemo {
    public static void main(String[] args) throws InterruptedException {
        SafeCounter counter = new SafeCounter();
        
        Thread[] threads = new Thread[1000];
        for (int i = 0; i < 1000; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    counter.increment();
                }
            });
        }
        
        for (Thread thread : threads) {
            thread.start();
        }
        
        for (Thread thread : threads) {
            thread.join();
        }
        
        System.out.println("同步方法结果: " + counter.getCount());
        // 结果应该是1000000
    }
}

同步代码块

// 使用同步代码块
class BankAccount {
    private double balance;
    private final Object lock = new Object();
    
    public BankAccount(double initialBalance) {
        this.balance = initialBalance;
    }
    
    public void deposit(double amount) {
        synchronized(lock) {
            double newBalance = balance + amount;
            // 模拟处理时间
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            balance = newBalance;
            System.out.println(Thread.currentThread().getName() + 
                " 存款: " + amount + ", 余额: " + balance);
        }
    }
    
    public boolean withdraw(double amount) {
        synchronized(lock) {
            if (balance >= amount) {
                double newBalance = balance - amount;
                // 模拟处理时间
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                balance = newBalance;
                System.out.println(Thread.currentThread().getName() + 
                    " 取款: " + amount + ", 余额: " + balance);
                return true;
            } else {
                System.out.println(Thread.currentThread().getName() + 
                    " 取款失败,余额不足: " + balance);
                return false;
            }
        }
    }
    
    public double getBalance() {
        synchronized(lock) {
            return balance;
        }
    }
}

public class BankAccountDemo {
    public static void main(String[] args) throws InterruptedException {
        BankAccount account = new BankAccount(1000);
        
        // 创建多个线程进行并发操作
        Thread[] threads = new Thread[10];
        
        for (int i = 0; i < 10; i++) {
            final int threadId = i;
            threads[i] = new Thread(() -> {
                if (threadId % 2 == 0) {
                    account.deposit(100);
                } else {
                    account.withdraw(150);
                }
            }, "Thread-" + threadId);
        }
        
        for (Thread thread : threads) {
            thread.start();
        }
        
        for (Thread thread : threads) {
            thread.join();
        }
        
        System.out.println("最终余额: " + account.getBalance());
    }
}

11.2.3 volatile关键字

// volatile关键字演示
public class VolatileDemo {
    // 不使用volatile可能导致线程看不到变化
    private static volatile boolean flag = false;
    private static int counter = 0;
    
    public static void main(String[] args) throws InterruptedException {
        // 线程1:修改flag的值
        Thread thread1 = new Thread(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            flag = true;
            System.out.println("线程1设置flag为true");
        });
        
        // 线程2:监听flag的变化
        Thread thread2 = new Thread(() -> {
            while (!flag) {
                counter++;
                // 不加这个输出,可能会因为JIT优化导致死循环
                if (counter % 100000000 == 0) {
                    System.out.println("线程2还在等待,计数: " + counter);
                }
            }
            System.out.println("线程2检测到flag变化,退出循环,最终计数: " + counter);
        });
        
        thread2.start();
        thread1.start();
        
        thread1.join();
        thread2.join();
        
        System.out.println("程序结束");
    }
}

// volatile的典型应用:单例模式(双重检查锁定)
class Singleton {
    private static volatile Singleton instance;
    
    private Singleton() {
        // 私有构造函数
    }
    
    public static Singleton getInstance() {
        if (instance == null) {
            synchronized (Singleton.class) {
                if (instance == null) {
                    instance = new Singleton();
                }
            }
        }
        return instance;
    }
}

11.2.4 Lock接口和ReentrantLock

import java.util.concurrent.locks.*;
import java.util.concurrent.TimeUnit;

// 使用ReentrantLock的示例
class AdvancedCounter {
    private int count = 0;
    private final ReentrantLock lock = new ReentrantLock();
    
    public void increment() {
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock();
        }
    }
    
    public boolean tryIncrement() {
        if (lock.tryLock()) {
            try {
                count++;
                return true;
            } finally {
                lock.unlock();
            }
        }
        return false;
    }
    
    public boolean tryIncrementWithTimeout() {
        try {
            if (lock.tryLock(1, TimeUnit.SECONDS)) {
                try {
                    count++;
                    Thread.sleep(500); // 模拟耗时操作
                    return true;
                } finally {
                    lock.unlock();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return false;
    }
    
    public int getCount() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }
    
    public void printLockInfo() {
        System.out.println("锁的持有者数量: " + lock.getHoldCount());
        System.out.println("等待锁的线程数: " + lock.getQueueLength());
        System.out.println("是否为公平锁: " + lock.isFair());
    }
}

public class ReentrantLockDemo {
    public static void main(String[] args) throws InterruptedException {
        AdvancedCounter counter = new AdvancedCounter();
        
        // 测试基本锁功能
        Thread[] threads = new Thread[5];
        for (int i = 0; i < 5; i++) {
            final int threadId = i;
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    counter.increment();
                }
                System.out.println("线程" + threadId + "完成");
            });
        }
        
        for (Thread thread : threads) {
            thread.start();
        }
        
        for (Thread thread : threads) {
            thread.join();
        }
        
        System.out.println("基本锁测试结果: " + counter.getCount());
        
        // 测试tryLock功能
        Thread tryLockThread1 = new Thread(() -> {
            if (counter.tryIncrement()) {
                System.out.println("tryLock成功");
            } else {
                System.out.println("tryLock失败");
            }
        });
        
        Thread tryLockThread2 = new Thread(() -> {
            if (counter.tryIncrementWithTimeout()) {
                System.out.println("tryLock with timeout成功");
            } else {
                System.out.println("tryLock with timeout失败");
            }
        });
        
        tryLockThread1.start();
        tryLockThread2.start();
        
        tryLockThread1.join();
        tryLockThread2.join();
        
        System.out.println("最终结果: " + counter.getCount());
        counter.printLockInfo();
    }
}

11.2.5 ReadWriteLock读写锁

import java.util.concurrent.locks.*;
import java.util.*;

// 使用读写锁的缓存示例
class ReadWriteCache {
    private final Map<String, String> cache = new HashMap<>();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Lock readLock = lock.readLock();
    private final Lock writeLock = lock.writeLock();
    
    public String get(String key) {
        readLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + " 读取: " + key);
            Thread.sleep(100); // 模拟读取耗时
            return cache.get(key);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            readLock.unlock();
        }
    }
    
    public void put(String key, String value) {
        writeLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + " 写入: " + key + " = " + value);
            Thread.sleep(200); // 模拟写入耗时
            cache.put(key, value);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            writeLock.unlock();
        }
    }
    
    public void remove(String key) {
        writeLock.lock();
        try {
            System.out.println(Thread.currentThread().getName() + " 删除: " + key);
            Thread.sleep(150); // 模拟删除耗时
            cache.remove(key);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            writeLock.unlock();
        }
    }
    
    public int size() {
        readLock.lock();
        try {
            return cache.size();
        } finally {
            readLock.unlock();
        }
    }
}

public class ReadWriteLockDemo {
    public static void main(String[] args) throws InterruptedException {
        ReadWriteCache cache = new ReadWriteCache();
        
        // 初始化一些数据
        cache.put("key1", "value1");
        cache.put("key2", "value2");
        cache.put("key3", "value3");
        
        // 创建多个读线程
        Thread[] readThreads = new Thread[5];
        for (int i = 0; i < 5; i++) {
            final int threadId = i;
            readThreads[i] = new Thread(() -> {
                for (int j = 0; j < 3; j++) {
                    String value = cache.get("key" + (j + 1));
                    System.out.println("读线程" + threadId + " 读取到: " + value);
                }
            }, "ReadThread-" + i);
        }
        
        // 创建写线程
        Thread writeThread = new Thread(() -> {
            cache.put("key4", "value4");
            cache.put("key5", "value5");
        }, "WriteThread");
        
        // 启动所有线程
        for (Thread thread : readThreads) {
            thread.start();
        }
        writeThread.start();
        
        // 等待所有线程完成
        for (Thread thread : readThreads) {
            thread.join();
        }
        writeThread.join();
        
        System.out.println("缓存大小: " + cache.size());
    }
}

11.3 线程间通信

11.3.1 wait()、notify()和notifyAll()

// 生产者消费者模式
class SharedBuffer {
    private final Object[] buffer;
    private int count = 0;
    private int in = 0;
    private int out = 0;
    
    public SharedBuffer(int size) {
        buffer = new Object[size];
    }
    
    public synchronized void put(Object item) throws InterruptedException {
        while (count == buffer.length) {
            System.out.println(Thread.currentThread().getName() + " 缓冲区满,等待...");
            wait(); // 缓冲区满,等待
        }
        
        buffer[in] = item;
        in = (in + 1) % buffer.length;
        count++;
        
        System.out.println(Thread.currentThread().getName() + " 生产: " + item + 
            ", 缓冲区数量: " + count);
        
        notifyAll(); // 通知等待的消费者
    }
    
    public synchronized Object take() throws InterruptedException {
        while (count == 0) {
            System.out.println(Thread.currentThread().getName() + " 缓冲区空,等待...");
            wait(); // 缓冲区空,等待
        }
        
        Object item = buffer[out];
        buffer[out] = null;
        out = (out + 1) % buffer.length;
        count--;
        
        System.out.println(Thread.currentThread().getName() + " 消费: " + item + 
            ", 缓冲区数量: " + count);
        
        notifyAll(); // 通知等待的生产者
        return item;
    }
}

// 生产者
class Producer implements Runnable {
    private final SharedBuffer buffer;
    private final String name;
    
    public Producer(SharedBuffer buffer, String name) {
        this.buffer = buffer;
        this.name = name;
    }
    
    @Override
    public void run() {
        try {
            for (int i = 1; i <= 5; i++) {
                String item = name + "-Item" + i;
                buffer.put(item);
                Thread.sleep(1000); // 模拟生产时间
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

// 消费者
class Consumer implements Runnable {
    private final SharedBuffer buffer;
    private final String name;
    
    public Consumer(SharedBuffer buffer, String name) {
        this.buffer = buffer;
        this.name = name;
    }
    
    @Override
    public void run() {
        try {
            for (int i = 1; i <= 5; i++) {
                Object item = buffer.take();
                Thread.sleep(1500); // 模拟消费时间
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

public class ProducerConsumerDemo {
    public static void main(String[] args) throws InterruptedException {
        SharedBuffer buffer = new SharedBuffer(3); // 缓冲区大小为3
        
        // 创建生产者和消费者
        Thread producer1 = new Thread(new Producer(buffer, "生产者1"));
        Thread producer2 = new Thread(new Producer(buffer, "生产者2"));
        Thread consumer1 = new Thread(new Consumer(buffer, "消费者1"));
        Thread consumer2 = new Thread(new Consumer(buffer, "消费者2"));
        
        // 启动线程
        producer1.start();
        producer2.start();
        consumer1.start();
        consumer2.start();
        
        // 等待所有线程完成
        producer1.join();
        producer2.join();
        consumer1.join();
        consumer2.join();
        
        System.out.println("所有线程执行完毕");
    }
}

11.3.2 Condition接口

import java.util.concurrent.locks.*;

// 使用Condition实现的生产者消费者
class AdvancedBuffer {
    private final Object[] buffer;
    private int count = 0;
    private int in = 0;
    private int out = 0;
    
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    
    public AdvancedBuffer(int size) {
        buffer = new Object[size];
    }
    
    public void put(Object item) throws InterruptedException {
        lock.lock();
        try {
            while (count == buffer.length) {
                System.out.println(Thread.currentThread().getName() + " 等待缓冲区不满");
                notFull.await();
            }
            
            buffer[in] = item;
            in = (in + 1) % buffer.length;
            count++;
            
            System.out.println(Thread.currentThread().getName() + " 生产: " + item + 
                ", 缓冲区数量: " + count);
            
            notEmpty.signalAll(); // 通知消费者
        } finally {
            lock.unlock();
        }
    }
    
    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) {
                System.out.println(Thread.currentThread().getName() + " 等待缓冲区不空");
                notEmpty.await();
            }
            
            Object item = buffer[out];
            buffer[out] = null;
            out = (out + 1) % buffer.length;
            count--;
            
            System.out.println(Thread.currentThread().getName() + " 消费: " + item + 
                ", 缓冲区数量: " + count);
            
            notFull.signalAll(); // 通知生产者
            return item;
        } finally {
            lock.unlock();
        }
    }
}

public class ConditionDemo {
    public static void main(String[] args) throws InterruptedException {
        AdvancedBuffer buffer = new AdvancedBuffer(2);
        
        // 创建多个生产者和消费者
        Thread[] producers = new Thread[2];
        Thread[] consumers = new Thread[2];
        
        for (int i = 0; i < 2; i++) {
            final int id = i;
            producers[i] = new Thread(() -> {
                try {
                    for (int j = 1; j <= 3; j++) {
                        buffer.put("P" + id + "-" + j);
                        Thread.sleep(800);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "Producer-" + i);
            
            consumers[i] = new Thread(() -> {
                try {
                    for (int j = 1; j <= 3; j++) {
                        buffer.take();
                        Thread.sleep(1200);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "Consumer-" + i);
        }
        
        // 启动所有线程
        for (int i = 0; i < 2; i++) {
            producers[i].start();
            consumers[i].start();
        }
        
        // 等待所有线程完成
        for (int i = 0; i < 2; i++) {
            producers[i].join();
            consumers[i].join();
        }
        
        System.out.println("程序执行完毕");
    }
}

11.4 并发工具类

11.4.1 CountDownLatch

import java.util.concurrent.*;

// CountDownLatch示例:等待所有任务完成
public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        int taskCount = 5;
        CountDownLatch latch = new CountDownLatch(taskCount);
        
        System.out.println("开始执行任务...");
        
        // 创建并启动多个任务
        for (int i = 1; i <= taskCount; i++) {
            final int taskId = i;
            new Thread(() -> {
                try {
                    System.out.println("任务" + taskId + "开始执行");
                    Thread.sleep((int)(Math.random() * 3000) + 1000); // 随机执行时间
                    System.out.println("任务" + taskId + "执行完成");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown(); // 任务完成,计数器减1
                }
            }, "Task-" + taskId).start();
        }
        
        System.out.println("等待所有任务完成...");
        latch.await(); // 等待计数器归零
        System.out.println("所有任务已完成!");
    }
}

// 实际应用:服务启动协调
class ServiceManager {
    private final CountDownLatch startupLatch;
    private final String[] services = {"数据库", "缓存", "消息队列", "Web服务", "监控服务"};
    
    public ServiceManager() {
        this.startupLatch = new CountDownLatch(services.length);
    }
    
    public void startAllServices() {
        System.out.println("开始启动所有服务...");
        
        for (String service : services) {
            new Thread(() -> {
                try {
                    System.out.println(service + " 正在启动...");
                    Thread.sleep((int)(Math.random() * 2000) + 500);
                    System.out.println(service + " 启动完成");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    System.out.println(service + " 启动失败");
                } finally {
                    startupLatch.countDown();
                }
            }, service + "-Thread").start();
        }
    }
    
    public void waitForAllServices() throws InterruptedException {
        startupLatch.await();
        System.out.println("所有服务启动完成,系统就绪!");
    }
    
    public static void main(String[] args) throws InterruptedException {
        ServiceManager manager = new ServiceManager();
        manager.startAllServices();
        manager.waitForAllServices();
    }
}

11.4.2 CyclicBarrier

import java.util.concurrent.*;

// CyclicBarrier示例:多线程协同工作
public class CyclicBarrierDemo {
    public static void main(String[] args) {
        int playerCount = 4;
        CyclicBarrier barrier = new CyclicBarrier(playerCount, () -> {
            System.out.println("所有玩家准备就绪,游戏开始!\n");
        });
        
        // 创建玩家线程
        for (int i = 1; i <= playerCount; i++) {
            final int playerId = i;
            new Thread(() -> {
                try {
                    // 第一阶段:加载游戏资源
                    System.out.println("玩家" + playerId + " 正在加载游戏资源...");
                    Thread.sleep((int)(Math.random() * 2000) + 1000);
                    System.out.println("玩家" + playerId + " 资源加载完成");
                    
                    barrier.await(); // 等待所有玩家加载完成
                    
                    // 第二阶段:选择角色
                    System.out.println("玩家" + playerId + " 正在选择角色...");
                    Thread.sleep((int)(Math.random() * 1500) + 500);
                    System.out.println("玩家" + playerId + " 角色选择完成");
                    
                    barrier.await(); // 等待所有玩家选择角色
                    
                    // 第三阶段:进入游戏
                    System.out.println("玩家" + playerId + " 进入游戏世界");
                    
                } catch (InterruptedException | BrokenBarrierException e) {
                    System.out.println("玩家" + playerId + " 遇到错误: " + e.getMessage());
                }
            }, "Player-" + playerId).start();
        }
    }
}

// 实际应用:并行计算
class ParallelCalculation {
    private final int[] data;
    private final int[] results;
    private final CyclicBarrier barrier;
    private final int threadCount;
    
    public ParallelCalculation(int[] data, int threadCount) {
        this.data = data;
        this.results = new int[data.length];
        this.threadCount = threadCount;
        this.barrier = new CyclicBarrier(threadCount, () -> {
            System.out.println("所有线程计算完成,开始汇总结果");
        });
    }
    
    public void calculate() throws InterruptedException {
        int chunkSize = data.length / threadCount;
        
        for (int i = 0; i < threadCount; i++) {
            final int threadId = i;
            final int start = i * chunkSize;
            final int end = (i == threadCount - 1) ? data.length : (i + 1) * chunkSize;
            
            new Thread(() -> {
                try {
                    System.out.println("线程" + threadId + " 开始计算范围 [" + start + ", " + end + ")");
                    
                    // 执行计算(这里简单地将每个元素乘以2)
                    for (int j = start; j < end; j++) {
                        results[j] = data[j] * 2;
                        Thread.sleep(10); // 模拟计算时间
                    }
                    
                    System.out.println("线程" + threadId + " 计算完成");
                    barrier.await(); // 等待所有线程完成
                    
                    // 所有线程都完成后,可以进行后续处理
                    if (threadId == 0) {
                        int sum = 0;
                        for (int result : results) {
                            sum += result;
                        }
                        System.out.println("计算结果总和: " + sum);
                    }
                    
                } catch (InterruptedException | BrokenBarrierException e) {
                    System.out.println("线程" + threadId + " 遇到错误: " + e.getMessage());
                }
            }, "CalcThread-" + threadId).start();
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        int[] data = new int[20];
        for (int i = 0; i < data.length; i++) {
            data[i] = i + 1;
        }
        
        ParallelCalculation calc = new ParallelCalculation(data, 4);
        calc.calculate();
        
        Thread.sleep(5000); // 等待计算完成
    }
}

11.4.3 Semaphore

import java.util.concurrent.*;

// Semaphore示例:限制资源访问
public class SemaphoreDemo {
    public static void main(String[] args) {
        // 模拟停车场,只有3个停车位
        Semaphore parkingLot = new Semaphore(3);
        
        // 创建10辆车尝试停车
        for (int i = 1; i <= 10; i++) {
            final int carId = i;
            new Thread(() -> {
                try {
                    System.out.println("车辆" + carId + " 到达停车场");
                    
                    parkingLot.acquire(); // 获取停车位
                    System.out.println("车辆" + carId + " 成功停车,剩余车位: " + 
                        parkingLot.availablePermits());
                    
                    // 模拟停车时间
                    Thread.sleep((int)(Math.random() * 3000) + 1000);
                    
                    System.out.println("车辆" + carId + " 离开停车场");
                    parkingLot.release(); // 释放停车位
                    
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "Car-" + carId).start();
        }
    }
}

// 实际应用:数据库连接池
class DatabaseConnectionPool {
    private final Semaphore connections;
    private final String[] connectionPool;
    private final boolean[] used;
    
    public DatabaseConnectionPool(int poolSize) {
        this.connections = new Semaphore(poolSize);
        this.connectionPool = new String[poolSize];
        this.used = new boolean[poolSize];
        
        // 初始化连接池
        for (int i = 0; i < poolSize; i++) {
            connectionPool[i] = "Connection-" + (i + 1);
        }
    }
    
    public String getConnection() throws InterruptedException {
        connections.acquire(); // 获取连接许可
        
        synchronized (this) {
            for (int i = 0; i < connectionPool.length; i++) {
                if (!used[i]) {
                    used[i] = true;
                    System.out.println(Thread.currentThread().getName() + 
                        " 获取连接: " + connectionPool[i]);
                    return connectionPool[i];
                }
            }
        }
        return null; // 理论上不会到达这里
    }
    
    public void releaseConnection(String connection) {
        synchronized (this) {
            for (int i = 0; i < connectionPool.length; i++) {
                if (connectionPool[i].equals(connection) && used[i]) {
                    used[i] = false;
                    System.out.println(Thread.currentThread().getName() + 
                        " 释放连接: " + connection);
                    break;
                }
            }
        }
        connections.release(); // 释放连接许可
    }
    
    public int getAvailableConnections() {
        return connections.availablePermits();
    }
    
    public static void main(String[] args) throws InterruptedException {
        DatabaseConnectionPool pool = new DatabaseConnectionPool(3);
        
        // 创建多个线程模拟数据库操作
        for (int i = 1; i <= 8; i++) {
            final int taskId = i;
            new Thread(() -> {
                try {
                    String connection = pool.getConnection();
                    
                    // 模拟数据库操作
                    System.out.println("任务" + taskId + " 正在执行数据库操作...");
                    Thread.sleep((int)(Math.random() * 2000) + 1000);
                    System.out.println("任务" + taskId + " 数据库操作完成");
                    
                    pool.releaseConnection(connection);
                    
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }, "Task-" + taskId).start();
        }
        
        // 监控连接池状态
        new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    Thread.sleep(1000);
                    System.out.println("[监控] 可用连接数: " + pool.getAvailableConnections());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Monitor").start();
    }
}

11.5 线程池

11.5.1 线程池概念

线程池是一种多线程处理形式,它预先创建了若干个线程,这些线程在没有任务处理时处于等待状态,当有任务来临时就可以获取一个线程来处理这个任务,任务处理完后线程并不会被销毁,而是等待下一个任务。

线程池的优势: 1. 降低资源消耗:重复利用已创建的线程,降低线程创建和销毁造成的消耗 2. 提高响应速度:任务到达时,无需等待线程创建就能立即执行 3. 提高线程的可管理性:统一分配、调优和监控 4. 提供更多功能:定时执行、周期执行等

11.5.2 ExecutorService接口

import java.util.concurrent.*;
import java.util.*;

// ExecutorService基本使用
public class ExecutorServiceDemo {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // 创建固定大小的线程池
        ExecutorService executor = Executors.newFixedThreadPool(3);
        
        System.out.println("提交任务到线程池...");
        
        // 提交Runnable任务
        for (int i = 1; i <= 5; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("任务" + taskId + " 开始执行 - " + 
                    Thread.currentThread().getName());
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                System.out.println("任务" + taskId + " 执行完成");
            });
        }
        
        // 提交Callable任务
        List<Future<Integer>> futures = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            final int num = i;
            Future<Integer> future = executor.submit(() -> {
                System.out.println("计算任务" + num + " 开始 - " + 
                    Thread.currentThread().getName());
                Thread.sleep(1000);
                int result = num * num;
                System.out.println("计算任务" + num + " 完成,结果: " + result);
                return result;
            });
            futures.add(future);
        }
        
        // 获取Callable任务的结果
        System.out.println("\n获取计算结果:");
        for (int i = 0; i < futures.size(); i++) {
            Integer result = futures.get(i).get();
            System.out.println("计算任务" + (i + 1) + " 结果: " + result);
        }
        
        // 关闭线程池
        executor.shutdown();
        
        // 等待所有任务完成
        if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
            System.out.println("强制关闭线程池");
            executor.shutdownNow();
        }
        
        System.out.println("所有任务完成,程序结束");
    }
}

11.5.3 不同类型的线程池

import java.util.concurrent.*;

// 不同类型线程池的演示
public class ThreadPoolTypesDemo {
    
    // 固定大小线程池
    public static void fixedThreadPoolDemo() {
        System.out.println("=== 固定大小线程池演示 ===");
        ExecutorService executor = Executors.newFixedThreadPool(2);
        
        for (int i = 1; i <= 5; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("Fixed Pool - 任务" + taskId + " - " + 
                    Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        
        executor.shutdown();
    }
    
    // 缓存线程池
    public static void cachedThreadPoolDemo() {
        System.out.println("\n=== 缓存线程池演示 ===");
        ExecutorService executor = Executors.newCachedThreadPool();
        
        for (int i = 1; i <= 5; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("Cached Pool - 任务" + taskId + " - " + 
                    Thread.currentThread().getName());
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        
        executor.shutdown();
    }
    
    // 单线程线程池
    public static void singleThreadExecutorDemo() {
        System.out.println("\n=== 单线程线程池演示 ===");
        ExecutorService executor = Executors.newSingleThreadExecutor();
        
        for (int i = 1; i <= 5; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("Single Thread - 任务" + taskId + " - " + 
                    Thread.currentThread().getName());
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        
        executor.shutdown();
    }
    
    // 定时任务线程池
    public static void scheduledThreadPoolDemo() throws InterruptedException {
        System.out.println("\n=== 定时任务线程池演示 ===");
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
        
        // 延迟执行
        executor.schedule(() -> {
            System.out.println("延迟任务执行 - " + Thread.currentThread().getName());
        }, 2, TimeUnit.SECONDS);
        
        // 固定频率执行
        ScheduledFuture<?> future1 = executor.scheduleAtFixedRate(() -> {
            System.out.println("固定频率任务 - " + Thread.currentThread().getName() + 
                " - " + System.currentTimeMillis());
        }, 1, 1, TimeUnit.SECONDS);
        
        // 固定延迟执行
        ScheduledFuture<?> future2 = executor.scheduleWithFixedDelay(() -> {
            System.out.println("固定延迟任务 - " + Thread.currentThread().getName());
            try {
                Thread.sleep(500); // 任务执行时间
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, 1, 1, TimeUnit.SECONDS);
        
        // 运行5秒后取消定时任务
        Thread.sleep(5000);
        future1.cancel(true);
        future2.cancel(true);
        
        executor.shutdown();
    }
    
    public static void main(String[] args) throws InterruptedException {
        fixedThreadPoolDemo();
        Thread.sleep(3000);
        
        cachedThreadPoolDemo();
        Thread.sleep(3000);
        
        singleThreadExecutorDemo();
        Thread.sleep(3000);
        
        scheduledThreadPoolDemo();
    }
}

11.5.4 ThreadPoolExecutor详解

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

// 自定义ThreadPoolExecutor
public class CustomThreadPoolDemo {
    
    // 自定义线程工厂
    static class CustomThreadFactory implements ThreadFactory {
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;
        
        public CustomThreadFactory(String namePrefix) {
            this.namePrefix = namePrefix;
        }
        
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r, namePrefix + "-" + threadNumber.getAndIncrement());
            thread.setDaemon(false);
            thread.setPriority(Thread.NORM_PRIORITY);
            return thread;
        }
    }
    
    // 自定义拒绝策略
    static class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.out.println("任务被拒绝: " + r.toString() + 
                ", 线程池状态: 活跃线程=" + executor.getActiveCount() + 
                ", 队列大小=" + executor.getQueue().size());
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        // 创建自定义线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2,                                    // 核心线程数
            4,                                    // 最大线程数
            60L,                                  // 空闲线程存活时间
            TimeUnit.SECONDS,                     // 时间单位
            new ArrayBlockingQueue<>(3),          // 工作队列
            new CustomThreadFactory("MyPool"),    // 线程工厂
            new CustomRejectedExecutionHandler()  // 拒绝策略
        );
        
        // 监控线程池状态
        Thread monitorThread = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    System.out.println(String.format(
                        "[监控] 核心线程数: %d, 当前线程数: %d, 活跃线程数: %d, " +
                        "队列任务数: %d, 已完成任务数: %d",
                        executor.getCorePoolSize(),
                        executor.getPoolSize(),
                        executor.getActiveCount(),
                        executor.getQueue().size(),
                        executor.getCompletedTaskCount()
                    ));
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });
        monitorThread.setDaemon(true);
        monitorThread.start();
        
        // 提交任务
        for (int i = 1; i <= 10; i++) {
            final int taskId = i;
            try {
                executor.submit(() -> {
                    System.out.println("任务" + taskId + " 开始执行 - " + 
                        Thread.currentThread().getName());
                    try {
                        Thread.sleep(3000); // 模拟耗时任务
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    System.out.println("任务" + taskId + " 执行完成");
                });
                Thread.sleep(200); // 控制任务提交速度
            } catch (RejectedExecutionException e) {
                System.out.println("任务" + taskId + " 提交失败: " + e.getMessage());
            }
        }
        
        // 等待任务完成
        Thread.sleep(15000);
        
        // 关闭线程池
        executor.shutdown();
        if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
            executor.shutdownNow();
        }
        
        monitorThread.interrupt();
        System.out.println("程序结束");
    }
}

11.5.5 Future和CompletableFuture

import java.util.concurrent.*;
import java.util.*;

// Future基本使用
public class FutureDemo {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        
        // 提交Callable任务
        Future<String> future1 = executor.submit(() -> {
            Thread.sleep(2000);
            return "任务1完成";
        });
        
        Future<String> future2 = executor.submit(() -> {
            Thread.sleep(1000);
            return "任务2完成";
        });
        
        Future<String> future3 = executor.submit(() -> {
            Thread.sleep(3000);
            return "任务3完成";
        });
        
        System.out.println("任务已提交,开始等待结果...");
        
        // 检查任务状态
        System.out.println("任务1是否完成: " + future1.isDone());
        System.out.println("任务2是否完成: " + future2.isDone());
        System.out.println("任务3是否完成: " + future3.isDone());
        
        // 获取结果(阻塞等待)
        System.out.println("结果1: " + future1.get());
        System.out.println("结果2: " + future2.get());
        
        // 带超时的获取结果
        try {
            String result3 = future3.get(1, TimeUnit.SECONDS);
            System.out.println("结果3: " + result3);
        } catch (TimeoutException e) {
            System.out.println("任务3超时,取消任务");
            future3.cancel(true);
        }
        
        executor.shutdown();
    }
}

// CompletableFuture演示
class CompletableFutureDemo {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        System.out.println("=== CompletableFuture基本使用 ===");
        
        // 创建已完成的Future
        CompletableFuture<String> completedFuture = CompletableFuture.completedFuture("Hello");
        System.out.println("已完成的Future: " + completedFuture.get());
        
        // 异步执行
        CompletableFuture<String> asyncFuture = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "异步任务完成";
        });
        
        System.out.println("异步任务结果: " + asyncFuture.get());
        
        // 链式操作
        CompletableFuture<String> chainFuture = CompletableFuture
            .supplyAsync(() -> {
                System.out.println("第一步: 获取数据");
                return "原始数据";
            })
            .thenApply(data -> {
                System.out.println("第二步: 处理数据 - " + data);
                return data.toUpperCase();
            })
            .thenApply(data -> {
                System.out.println("第三步: 格式化数据 - " + data);
                return "[" + data + "]";
            });
        
        System.out.println("链式操作结果: " + chainFuture.get());
        
        // 组合多个Future
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "Future1";
        });
        
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(800);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "Future2";
        });
        
        // 等待所有完成
        CompletableFuture<String> combinedFuture = future1.thenCombine(future2, 
            (result1, result2) -> result1 + " + " + result2);
        
        System.out.println("组合结果: " + combinedFuture.get());
        
        // 任意一个完成
        CompletableFuture<String> anyFuture = future1.applyToEither(future2, 
            result -> "最快完成: " + result);
        
        System.out.println("最快结果: " + anyFuture.get());
    }
}

11.6 并发集合

11.6.1 ConcurrentHashMap

import java.util.concurrent.*;
import java.util.*;

// ConcurrentHashMap演示
public class ConcurrentHashMapDemo {
    public static void main(String[] args) throws InterruptedException {
        // 对比HashMap和ConcurrentHashMap的线程安全性
        testThreadSafety();
        
        // ConcurrentHashMap的特殊方法
        demonstrateSpecialMethods();
    }
    
    public static void testThreadSafety() throws InterruptedException {
        System.out.println("=== 线程安全性测试 ===");
        
        // 使用HashMap(线程不安全)
        Map<String, Integer> hashMap = new HashMap<>();
        
        // 使用ConcurrentHashMap(线程安全)
        Map<String, Integer> concurrentMap = new ConcurrentHashMap<>();
        
        int threadCount = 10;
        int operationsPerThread = 1000;
        
        // 测试HashMap
        Thread[] hashMapThreads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            final int threadId = i;
            hashMapThreads[i] = new Thread(() -> {
                for (int j = 0; j < operationsPerThread; j++) {
                    String key = "key" + (j % 100);
                    hashMap.put(key, hashMap.getOrDefault(key, 0) + 1);
                }
            });
        }
        
        long startTime = System.currentTimeMillis();
        for (Thread thread : hashMapThreads) {
            thread.start();
        }
        for (Thread thread : hashMapThreads) {
            thread.join();
        }
        long hashMapTime = System.currentTimeMillis() - startTime;
        
        // 测试ConcurrentHashMap
        Thread[] concurrentMapThreads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            final int threadId = i;
            concurrentMapThreads[i] = new Thread(() -> {
                for (int j = 0; j < operationsPerThread; j++) {
                    String key = "key" + (j % 100);
                    concurrentMap.put(key, concurrentMap.getOrDefault(key, 0) + 1);
                }
            });
        }
        
        startTime = System.currentTimeMillis();
        for (Thread thread : concurrentMapThreads) {
            thread.start();
        }
        for (Thread thread : concurrentMapThreads) {
            thread.join();
        }
        long concurrentMapTime = System.currentTimeMillis() - startTime;
        
        System.out.println("HashMap大小: " + hashMap.size() + ", 耗时: " + hashMapTime + "ms");
        System.out.println("ConcurrentHashMap大小: " + concurrentMap.size() + ", 耗时: " + concurrentMapTime + "ms");
        
        // 验证数据一致性
        int expectedSum = threadCount * operationsPerThread;
        int hashMapSum = hashMap.values().stream().mapToInt(Integer::intValue).sum();
        int concurrentMapSum = concurrentMap.values().stream().mapToInt(Integer::intValue).sum();
        
        System.out.println("期望总和: " + expectedSum);
        System.out.println("HashMap实际总和: " + hashMapSum + " (" + 
            (hashMapSum == expectedSum ? "正确" : "错误") + ")");
        System.out.println("ConcurrentHashMap实际总和: " + concurrentMapSum + " (" + 
            (concurrentMapSum == expectedSum ? "正确" : "错误") + ")");
    }
    
    public static void demonstrateSpecialMethods() {
        System.out.println("\n=== ConcurrentHashMap特殊方法演示 ===");
        
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        
        // putIfAbsent
        map.putIfAbsent("count", 0);
        map.putIfAbsent("count", 10); // 不会覆盖
        System.out.println("putIfAbsent结果: " + map.get("count"));
        
        // replace
        map.replace("count", 0, 1); // 原子性的条件替换
        System.out.println("replace结果: " + map.get("count"));
        
        // compute
        map.compute("count", (key, value) -> value == null ? 1 : value + 1);
        System.out.println("compute结果: " + map.get("count"));
        
        // computeIfAbsent
        map.computeIfAbsent("newKey", key -> key.length());
        System.out.println("computeIfAbsent结果: " + map.get("newKey"));
        
        // computeIfPresent
        map.computeIfPresent("count", (key, value) -> value * 2);
        System.out.println("computeIfPresent结果: " + map.get("count"));
        
        // merge
        map.merge("count", 5, Integer::sum);
        System.out.println("merge结果: " + map.get("count"));
        
        System.out.println("最终map内容: " + map);
    }
}

11.6.2 其他并发集合

import java.util.concurrent.*;
import java.util.*;

// 其他并发集合演示
public class ConcurrentCollectionsDemo {
    
    // CopyOnWriteArrayList演示
    public static void copyOnWriteArrayListDemo() throws InterruptedException {
        System.out.println("=== CopyOnWriteArrayList演示 ===");
        
        CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
        
        // 添加初始数据
        list.add("Item1");
        list.add("Item2");
        list.add("Item3");
        
        // 读线程
        Thread readerThread = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                System.out.println("读取: " + list);
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }, "Reader");
        
        // 写线程
        Thread writerThread = new Thread(() -> {
            for (int i = 4; i <= 8; i++) {
                list.add("Item" + i);
                System.out.println("添加: Item" + i);
                try {
                    Thread.sleep(800);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }, "Writer");
        
        readerThread.start();
        writerThread.start();
        
        readerThread.join();
        writerThread.join();
        
        System.out.println("最终列表: " + list);
    }
    
    // BlockingQueue演示
    public static void blockingQueueDemo() throws InterruptedException {
        System.out.println("\n=== BlockingQueue演示 ===");
        
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
        
        // 生产者
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    String item = "Product" + i;
                    queue.put(item); // 阻塞式添加
                    System.out.println("生产: " + item + ", 队列大小: " + queue.size());
                    Thread.sleep(1000);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Producer");
        
        // 消费者
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    String item = queue.take(); // 阻塞式获取
                    System.out.println("消费: " + item + ", 队列大小: " + queue.size());
                    Thread.sleep(1500);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Consumer");
        
        producer.start();
        consumer.start();
        
        producer.join();
        consumer.join();
    }
    
    // ConcurrentLinkedQueue演示
    public static void concurrentLinkedQueueDemo() throws InterruptedException {
        System.out.println("\n=== ConcurrentLinkedQueue演示 ===");
        
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
        
        // 多个生产者
        Thread[] producers = new Thread[3];
        for (int i = 0; i < 3; i++) {
            final int producerId = i;
            producers[i] = new Thread(() -> {
                for (int j = 1; j <= 5; j++) {
                    int value = producerId * 10 + j;
                    queue.offer(value);
                    System.out.println("生产者" + producerId + " 添加: " + value);
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }, "Producer-" + i);
        }
        
        // 多个消费者
        Thread[] consumers = new Thread[2];
        for (int i = 0; i < 2; i++) {
            final int consumerId = i;
            consumers[i] = new Thread(() -> {
                for (int j = 0; j < 8; j++) {
                    Integer value = queue.poll();
                    if (value != null) {
                        System.out.println("消费者" + consumerId + " 获取: " + value);
                    } else {
                        System.out.println("消费者" + consumerId + " 队列为空");
                        j--; // 重试
                    }
                    try {
                        Thread.sleep(300);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }, "Consumer-" + i);
        }
        
        // 启动所有线程
        for (Thread producer : producers) {
            producer.start();
        }
        for (Thread consumer : consumers) {
            consumer.start();
        }
        
        // 等待所有线程完成
        for (Thread producer : producers) {
            producer.join();
        }
        for (Thread consumer : consumers) {
            consumer.join();
        }
        
        System.out.println("剩余队列元素: " + queue);
    }
    
    public static void main(String[] args) throws InterruptedException {
        copyOnWriteArrayListDemo();
        Thread.sleep(1000);
        
        blockingQueueDemo();
        Thread.sleep(1000);
        
        concurrentLinkedQueueDemo();
    }
}

11.7 原子类

11.7.1 基本原子类

import java.util.concurrent.atomic.*;
import java.util.concurrent.*;

// 原子类演示
public class AtomicDemo {
    
    // 对比普通变量和原子变量
    public static void compareAtomicAndNormal() throws InterruptedException {
        System.out.println("=== 原子类与普通变量对比 ===");
        
        // 普通变量
        class NormalCounter {
            private volatile int count = 0;
            
            public void increment() {
                count++; // 非原子操作
            }
            
            public int getCount() {
                return count;
            }
        }
        
        // 原子变量
        class AtomicCounter {
            private AtomicInteger count = new AtomicInteger(0);
            
            public void increment() {
                count.incrementAndGet(); // 原子操作
            }
            
            public int getCount() {
                return count.get();
            }
        }
        
        NormalCounter normalCounter = new NormalCounter();
        AtomicCounter atomicCounter = new AtomicCounter();
        
        int threadCount = 10;
        int incrementsPerThread = 1000;
        
        // 测试普通计数器
        Thread[] normalThreads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            normalThreads[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    normalCounter.increment();
                }
            });
        }
        
        long startTime = System.currentTimeMillis();
        for (Thread thread : normalThreads) {
            thread.start();
        }
        for (Thread thread : normalThreads) {
            thread.join();
        }
        long normalTime = System.currentTimeMillis() - startTime;
        
        // 测试原子计数器
        Thread[] atomicThreads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            atomicThreads[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    atomicCounter.increment();
                }
            });
        }
        
        startTime = System.currentTimeMillis();
        for (Thread thread : atomicThreads) {
            thread.start();
        }
        for (Thread thread : atomicThreads) {
            thread.join();
        }
        long atomicTime = System.currentTimeMillis() - startTime;
        
        int expected = threadCount * incrementsPerThread;
        System.out.println("期望结果: " + expected);
        System.out.println("普通计数器: " + normalCounter.getCount() + 
            " (" + (normalCounter.getCount() == expected ? "正确" : "错误") + 
            "), 耗时: " + normalTime + "ms");
        System.out.println("原子计数器: " + atomicCounter.getCount() + 
            " (" + (atomicCounter.getCount() == expected ? "正确" : "错误") + 
            "), 耗时: " + atomicTime + "ms");
    }
    
    // 原子类的各种操作
    public static void demonstrateAtomicOperations() {
        System.out.println("\n=== 原子类操作演示 ===");
        
        AtomicInteger atomicInt = new AtomicInteger(10);
        AtomicLong atomicLong = new AtomicLong(100L);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        AtomicReference<String> atomicRef = new AtomicReference<>("Hello");
        
        // AtomicInteger操作
        System.out.println("AtomicInteger初始值: " + atomicInt.get());
        System.out.println("incrementAndGet: " + atomicInt.incrementAndGet());
        System.out.println("getAndIncrement: " + atomicInt.getAndIncrement());
        System.out.println("当前值: " + atomicInt.get());
        System.out.println("addAndGet(5): " + atomicInt.addAndGet(5));
        System.out.println("compareAndSet(17, 20): " + atomicInt.compareAndSet(17, 20));
        System.out.println("当前值: " + atomicInt.get());
        
        // AtomicBoolean操作
        System.out.println("\nAtomicBoolean初始值: " + atomicBoolean.get());
        System.out.println("compareAndSet(false, true): " + atomicBoolean.compareAndSet(false, true));
        System.out.println("getAndSet(false): " + atomicBoolean.getAndSet(false));
        System.out.println("当前值: " + atomicBoolean.get());
        
        // AtomicReference操作
        System.out.println("\nAtomicReference初始值: " + atomicRef.get());
        System.out.println("compareAndSet(\"Hello\", \"World\"): " + 
            atomicRef.compareAndSet("Hello", "World"));
        System.out.println("当前值: " + atomicRef.get());
        
        // 使用updateAndGet
        atomicInt.updateAndGet(value -> value * 2);
        System.out.println("\nupdateAndGet(value -> value * 2): " + atomicInt.get());
        
        // 使用accumulateAndGet
        int result = atomicInt.accumulateAndGet(10, Integer::sum);
        System.out.println("accumulateAndGet(10, Integer::sum): " + result);
    }
    
    public static void main(String[] args) throws InterruptedException {
        compareAtomicAndNormal();
        demonstrateAtomicOperations();
    }
}

11.8 线程安全的最佳实践

11.8.1 线程安全策略

// 线程安全的最佳实践
public class ThreadSafetyBestPractices {
    
    // 1. 不可变对象
    public static final class ImmutablePerson {
        private final String name;
        private final int age;
        private final List<String> hobbies;
        
        public ImmutablePerson(String name, int age, List<String> hobbies) {
            this.name = name;
            this.age = age;
            // 防御性复制
            this.hobbies = Collections.unmodifiableList(new ArrayList<>(hobbies));
        }
        
        public String getName() { return name; }
        public int getAge() { return age; }
        public List<String> getHobbies() { return hobbies; }
        
        @Override
        public String toString() {
            return "ImmutablePerson{name='" + name + "', age=" + age + 
                ", hobbies=" + hobbies + "}";
        }
    }
    
    // 2. 线程局部变量
    private static final ThreadLocal<SimpleDateFormat> DATE_FORMAT = 
        ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
    
    public static String formatDate(Date date) {
        return DATE_FORMAT.get().format(date);
    }
    
    // 3. 同步工具类的使用
    public static class SafeResourceManager {
        private final Semaphore semaphore;
        private final List<String> resources;
        private final AtomicInteger usageCount = new AtomicInteger(0);
        
        public SafeResourceManager(List<String> resources) {
            this.resources = new ArrayList<>(resources);
            this.semaphore = new Semaphore(resources.size());
        }
        
        public String acquireResource() throws InterruptedException {
            semaphore.acquire();
            synchronized (resources) {
                if (!resources.isEmpty()) {
                    String resource = resources.remove(resources.size() - 1);
                    usageCount.incrementAndGet();
                    System.out.println(Thread.currentThread().getName() + 
                        " 获取资源: " + resource);
                    return resource;
                }
            }
            semaphore.release();
            return null;
        }
        
        public void releaseResource(String resource) {
            synchronized (resources) {
                resources.add(resource);
                System.out.println(Thread.currentThread().getName() + 
                    " 释放资源: " + resource);
            }
            semaphore.release();
        }
        
        public int getUsageCount() {
            return usageCount.get();
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        // 测试不可变对象
        System.out.println("=== 不可变对象测试 ===");
        List<String> hobbies = Arrays.asList("读书", "游泳", "编程");
        ImmutablePerson person = new ImmutablePerson("张三", 25, hobbies);
        System.out.println(person);
        
        // 测试线程局部变量
        System.out.println("\n=== ThreadLocal测试 ===");
        ExecutorService executor = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 5; i++) {
            executor.submit(() -> {
                String formattedDate = formatDate(new Date());
                System.out.println(Thread.currentThread().getName() + ": " + formattedDate);
            });
        }
        
        // 测试资源管理
        System.out.println("\n=== 安全资源管理测试 ===");
        List<String> resourceList = Arrays.asList("资源1", "资源2", "资源3");
        SafeResourceManager manager = new SafeResourceManager(resourceList);
        
        for (int i = 0; i < 5; i++) {
            final int taskId = i;
            executor.submit(() -> {
                try {
                    String resource = manager.acquireResource();
                    if (resource != null) {
                        Thread.sleep(2000); // 模拟使用资源
                        manager.releaseResource(resource);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        
        Thread.sleep(8000);
        System.out.println("总使用次数: " + manager.getUsageCount());
        
        executor.shutdown();
    }
}

11.9 本章小结

本章详细介绍了Java多线程编程的核心概念和实践技巧:

主要内容回顾

  1. 线程基础

    • 线程与进程的区别
    • 线程的生命周期和状态转换
    • 三种创建线程的方式:继承Thread、实现Runnable、使用Callable
  2. 线程同步

    • synchronized关键字的使用(同步方法和同步代码块)
    • volatile关键字保证可见性
    • Lock接口和ReentrantLock的高级功能
    • ReadWriteLock读写分离锁
  3. 线程间通信

    • wait()、notify()和notifyAll()方法
    • Condition接口提供更灵活的等待/通知机制
    • 生产者消费者模式的实现
  4. 并发工具类

    • CountDownLatch:等待多个任务完成
    • CyclicBarrier:多线程协同工作
    • Semaphore:控制资源访问数量
  5. 线程池

    • ExecutorService接口和各种线程池类型
    • ThreadPoolExecutor的详细配置
    • Future和CompletableFuture处理异步结果
  6. 并发集合

    • ConcurrentHashMap:线程安全的哈希表
    • CopyOnWriteArrayList:读多写少场景的列表
    • BlockingQueue:阻塞队列的生产者消费者模式
  7. 原子类

    • AtomicInteger、AtomicLong等基本原子类
    • AtomicReference引用类型的原子操作
    • CAS(Compare-And-Swap)操作原理
  8. 线程安全最佳实践

    • 不可变对象设计
    • ThreadLocal线程局部变量
    • 合理使用同步工具

关键要点

  • 线程安全:多线程环境下保证数据一致性的重要性
  • 性能考虑:同步机制会带来性能开销,需要权衡
  • 死锁预防:避免多个锁的循环等待
  • 资源管理:正确关闭线程池和释放资源
  • 异常处理:多线程环境下的异常传播和处理

实际应用场景

  • Web服务器:处理并发请求
  • 数据处理:并行计算和数据分析
  • I/O操作:异步文件读写和网络通信
  • 缓存系统:多线程访问共享缓存
  • 任务调度:定时任务和后台处理

下一章预告

下一章我们将学习Java网络编程,包括: - Socket编程基础 - TCP和UDP通信 - NIO(非阻塞I/O) - 网络协议处理 - HTTP客户端和服务器开发

练习题

  1. 基础练习:实现一个线程安全的计数器,支持增加、减少和获取当前值操作。

  2. 生产者消费者:使用BlockingQueue实现一个文件处理系统,生产者读取文件列表,消费者处理文件内容。

  3. 线程池应用:创建一个图片处理服务,使用线程池并行处理多张图片的缩放操作。

  4. 并发工具:使用CountDownLatch和CyclicBarrier实现一个多阶段的数据处理流水线。

  5. 综合项目:设计一个简单的Web爬虫,使用多线程并发抓取网页内容,并将结果存储到线程安全的数据结构中。