本章目标
通过本章学习,你将掌握:
- 线程基础概念:理解进程与线程、线程生命周期、线程创建方式
- 线程同步机制:掌握synchronized、Lock、volatile等同步工具
- 并发工具类:学习CountDownLatch、Semaphore、CyclicBarrier等
- 线程池技术:理解线程池原理、配置和最佳实践
- 并发编程模式:掌握生产者消费者、读写锁等常见模式
- 性能优化:学习并发性能调优和问题诊断
1. 线程基础
1.1 进程与线程
// 线程基础概念演示
public class ThreadBasicsDemo {
public static void main(String[] args) {
demonstrateProcessAndThread();
demonstrateThreadCreation();
demonstrateThreadLifecycle();
demonstrateThreadProperties();
}
// 进程与线程概念
public static void demonstrateProcessAndThread() {
System.out.println("=== 进程与线程 ===");
// 获取当前进程信息
ProcessHandle currentProcess = ProcessHandle.current();
System.out.println("当前进程ID: " + currentProcess.pid());
System.out.println("进程启动时间: " + currentProcess.info().startInstant().orElse(null));
System.out.println("进程命令: " + currentProcess.info().command().orElse("未知"));
// 获取当前线程信息
Thread currentThread = Thread.currentThread();
System.out.println("\n当前线程名称: " + currentThread.getName());
System.out.println("当前线程ID: " + currentThread.getId());
System.out.println("当前线程状态: " + currentThread.getState());
System.out.println("当前线程优先级: " + currentThread.getPriority());
System.out.println("是否为守护线程: " + currentThread.isDaemon());
// 获取所有活跃线程
ThreadGroup rootGroup = Thread.currentThread().getThreadGroup();
while (rootGroup.getParent() != null) {
rootGroup = rootGroup.getParent();
}
Thread[] threads = new Thread[rootGroup.activeCount()];
int count = rootGroup.enumerate(threads);
System.out.println("\n当前进程中的活跃线程数: " + count);
for (int i = 0; i < count; i++) {
if (threads[i] != null) {
System.out.println(" 线程: " + threads[i].getName() +
" (状态: " + threads[i].getState() + ")");
}
}
}
// 线程创建方式
public static void demonstrateThreadCreation() {
System.out.println("\n=== 线程创建方式 ===");
// 方式1: 继承Thread类
System.out.println("1. 继承Thread类:");
MyThread thread1 = new MyThread("Thread-1");
thread1.start();
// 方式2: 实现Runnable接口
System.out.println("\n2. 实现Runnable接口:");
MyRunnable runnable = new MyRunnable("Runnable-1");
Thread thread2 = new Thread(runnable);
thread2.start();
// 方式3: 使用Lambda表达式
System.out.println("\n3. 使用Lambda表达式:");
Thread thread3 = new Thread(() -> {
for (int i = 0; i < 3; i++) {
System.out.println("Lambda线程执行: " + i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}, "Lambda-Thread");
thread3.start();
// 方式4: 使用Callable和Future
System.out.println("\n4. 使用Callable和Future:");
ExecutorService executor = Executors.newSingleThreadExecutor();
Callable<String> callable = () -> {
Thread.sleep(200);
return "Callable执行结果: " + Thread.currentThread().getName();
};
Future<String> future = executor.submit(callable);
try {
String result = future.get(); // 阻塞等待结果
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
System.out.println("获取结果失败: " + e.getMessage());
}
executor.shutdown();
// 等待所有线程完成
try {
thread1.join();
thread2.join();
thread3.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 线程生命周期
public static void demonstrateThreadLifecycle() {
System.out.println("\n=== 线程生命周期 ===");
LifecycleThread lifecycleThread = new LifecycleThread();
// NEW状态
System.out.println("创建后状态: " + lifecycleThread.getState());
// RUNNABLE状态
lifecycleThread.start();
System.out.println("启动后状态: " + lifecycleThread.getState());
try {
Thread.sleep(100); // 让线程运行一段时间
System.out.println("运行中状态: " + lifecycleThread.getState());
// 等待线程完成
lifecycleThread.join();
System.out.println("完成后状态: " + lifecycleThread.getState());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 线程属性
public static void demonstrateThreadProperties() {
System.out.println("\n=== 线程属性 ===");
Thread thread = new Thread(() -> {
System.out.println("线程属性演示:");
Thread current = Thread.currentThread();
System.out.println(" 名称: " + current.getName());
System.out.println(" ID: " + current.getId());
System.out.println(" 优先级: " + current.getPriority());
System.out.println(" 线程组: " + current.getThreadGroup().getName());
System.out.println(" 是否守护线程: " + current.isDaemon());
System.out.println(" 是否存活: " + current.isAlive());
});
// 设置线程属性
thread.setName("PropertyDemo-Thread");
thread.setPriority(Thread.MAX_PRIORITY);
thread.setDaemon(false); // 用户线程
thread.start();
try {
thread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// 继承Thread类的方式
class MyThread extends Thread {
private final String name;
public MyThread(String name) {
this.name = name;
}
@Override
public void run() {
for (int i = 0; i < 3; i++) {
System.out.println(name + " 执行: " + i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
}
// 实现Runnable接口的方式
class MyRunnable implements Runnable {
private final String name;
public MyRunnable(String name) {
this.name = name;
}
@Override
public void run() {
for (int i = 0; i < 3; i++) {
System.out.println(name + " 执行: " + i);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
}
// 生命周期演示线程
class LifecycleThread extends Thread {
@Override
public void run() {
try {
System.out.println("线程开始执行");
// RUNNABLE状态
for (int i = 0; i < 3; i++) {
System.out.println("执行任务: " + i);
Thread.sleep(50); // TIMED_WAITING状态
}
System.out.println("线程执行完成");
} catch (InterruptedException e) {
System.out.println("线程被中断");
Thread.currentThread().interrupt();
}
}
}
1.2 线程中断机制
// 线程中断机制演示
public class ThreadInterruptDemo {
public static void main(String[] args) {
demonstrateInterruptBasics();
demonstrateInterruptInSleep();
demonstrateInterruptInLoop();
demonstrateInterruptWithFlag();
}
// 中断基础
public static void demonstrateInterruptBasics() {
System.out.println("=== 线程中断基础 ===");
Thread worker = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
System.out.println("工作线程正在运行...");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
System.out.println("工作线程被中断");
// 重新设置中断状态
Thread.currentThread().interrupt();
break;
}
}
System.out.println("工作线程结束");
}, "Worker-Thread");
worker.start();
try {
Thread.sleep(2000); // 让工作线程运行2秒
System.out.println("主线程发送中断信号");
worker.interrupt(); // 发送中断信号
worker.join(); // 等待工作线程结束
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 在sleep中的中断
public static void demonstrateInterruptInSleep() {
System.out.println("\n=== 在sleep中的中断 ===");
Thread sleepingThread = new Thread(() -> {
try {
System.out.println("线程开始睡眠10秒...");
Thread.sleep(10000); // 睡眠10秒
System.out.println("线程睡眠结束");
} catch (InterruptedException e) {
System.out.println("线程在睡眠中被中断");
System.out.println("中断状态: " + Thread.currentThread().isInterrupted());
}
}, "Sleeping-Thread");
sleepingThread.start();
try {
Thread.sleep(1000); // 等待1秒
System.out.println("主线程发送中断信号");
sleepingThread.interrupt();
sleepingThread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 在循环中的中断
public static void demonstrateInterruptInLoop() {
System.out.println("\n=== 在循环中的中断 ===");
Thread loopingThread = new Thread(() -> {
int count = 0;
while (!Thread.currentThread().isInterrupted()) {
count++;
if (count % 1000000 == 0) {
System.out.println("循环计数: " + count);
}
// 检查中断状态
if (Thread.interrupted()) {
System.out.println("检测到中断,退出循环");
break;
}
}
System.out.println("循环线程结束,最终计数: " + count);
}, "Looping-Thread");
loopingThread.start();
try {
Thread.sleep(100); // 让循环运行一段时间
System.out.println("主线程发送中断信号");
loopingThread.interrupt();
loopingThread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 使用标志位的中断
public static void demonstrateInterruptWithFlag() {
System.out.println("\n=== 使用标志位的中断 ===");
InterruptibleTask task = new InterruptibleTask();
Thread taskThread = new Thread(task, "Task-Thread");
taskThread.start();
try {
Thread.sleep(2000);
System.out.println("主线程请求停止任务");
task.stop(); // 使用自定义停止方法
taskThread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// 可中断的任务
class InterruptibleTask implements Runnable {
private volatile boolean running = true;
@Override
public void run() {
int count = 0;
while (running && !Thread.currentThread().isInterrupted()) {
count++;
// 模拟工作
if (count % 100000 == 0) {
System.out.println("任务执行中,计数: " + count);
}
// 定期检查中断状态
if (count % 1000000 == 0) {
if (Thread.currentThread().isInterrupted()) {
System.out.println("检测到线程中断信号");
break;
}
}
}
System.out.println("任务结束,最终计数: " + count);
System.out.println("结束原因: " + (running ? "线程中断" : "标志位停止"));
}
public void stop() {
running = false;
}
public boolean isRunning() {
return running;
}
}
1.3 线程通信
// 线程通信演示
public class ThreadCommunicationDemo {
public static void main(String[] args) {
demonstrateWaitNotify();
demonstrateJoin();
demonstrateYield();
demonstrateThreadLocal();
}
// wait/notify机制
public static void demonstrateWaitNotify() {
System.out.println("=== wait/notify机制 ===");
SharedResource resource = new SharedResource();
// 消费者线程
Thread consumer = new Thread(() -> {
try {
for (int i = 0; i < 5; i++) {
String data = resource.consume();
System.out.println("消费者获取: " + data);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Consumer");
// 生产者线程
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 5; i++) {
String data = "数据-" + i;
resource.produce(data);
System.out.println("生产者生产: " + data);
Thread.sleep(800);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Producer");
consumer.start();
producer.start();
try {
consumer.join();
producer.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// join方法
public static void demonstrateJoin() {
System.out.println("\n=== join方法 ===");
Thread task1 = new Thread(() -> {
try {
System.out.println("任务1开始执行");
Thread.sleep(2000);
System.out.println("任务1执行完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Task-1");
Thread task2 = new Thread(() -> {
try {
System.out.println("任务2开始执行");
Thread.sleep(1500);
System.out.println("任务2执行完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Task-2");
long startTime = System.currentTimeMillis();
task1.start();
task2.start();
try {
System.out.println("主线程等待所有任务完成...");
task1.join(); // 等待task1完成
task2.join(); // 等待task2完成
long endTime = System.currentTimeMillis();
System.out.println("所有任务完成,总耗时: " + (endTime - startTime) + "ms");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// yield方法
public static void demonstrateYield() {
System.out.println("\n=== yield方法 ===");
Thread highPriorityThread = new Thread(() -> {
for (int i = 0; i < 5; i++) {
System.out.println("高优先级线程: " + i);
Thread.yield(); // 让出CPU时间片
}
}, "High-Priority");
Thread lowPriorityThread = new Thread(() -> {
for (int i = 0; i < 5; i++) {
System.out.println("低优先级线程: " + i);
}
}, "Low-Priority");
highPriorityThread.setPriority(Thread.MAX_PRIORITY);
lowPriorityThread.setPriority(Thread.MIN_PRIORITY);
highPriorityThread.start();
lowPriorityThread.start();
try {
highPriorityThread.join();
lowPriorityThread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// ThreadLocal
public static void demonstrateThreadLocal() {
System.out.println("\n=== ThreadLocal ===");
ThreadLocalDemo demo = new ThreadLocalDemo();
// 创建多个线程,每个线程都有自己的ThreadLocal值
for (int i = 0; i < 3; i++) {
final int threadId = i;
Thread thread = new Thread(() -> {
demo.setThreadLocalValue("Thread-" + threadId + "-Value");
demo.printThreadLocalValue();
// 模拟一些工作
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
demo.printThreadLocalValue(); // 再次打印,验证值没有被其他线程影响
}, "Thread-" + i);
thread.start();
}
try {
Thread.sleep(1000); // 等待所有线程完成
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// 共享资源类
class SharedResource {
private String data;
private boolean hasData = false;
public synchronized void produce(String data) throws InterruptedException {
while (hasData) {
wait(); // 等待消费者消费
}
this.data = data;
this.hasData = true;
notifyAll(); // 通知消费者
}
public synchronized String consume() throws InterruptedException {
while (!hasData) {
wait(); // 等待生产者生产
}
String result = this.data;
this.hasData = false;
notifyAll(); // 通知生产者
return result;
}
}
// ThreadLocal演示类
class ThreadLocalDemo {
private static final ThreadLocal<String> threadLocal = new ThreadLocal<>();
public void setThreadLocalValue(String value) {
threadLocal.set(value);
System.out.println(Thread.currentThread().getName() + " 设置值: " + value);
}
public void printThreadLocalValue() {
String value = threadLocal.get();
System.out.println(Thread.currentThread().getName() + " 获取值: " + value);
}
public void removeThreadLocalValue() {
threadLocal.remove();
System.out.println(Thread.currentThread().getName() + " 移除值");
}
}
2. 线程同步
2.1 synchronized关键字
// synchronized同步机制演示
public class SynchronizedDemo {
public static void main(String[] args) {
demonstrateSynchronizedMethod();
demonstrateSynchronizedBlock();
demonstrateStaticSynchronized();
demonstrateDeadlock();
}
// 同步方法
public static void demonstrateSynchronizedMethod() {
System.out.println("=== 同步方法 ===");
Counter counter = new Counter();
// 创建多个线程同时访问计数器
Thread[] threads = new Thread[5];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
counter.increment();
}
}, "Thread-" + i);
}
// 启动所有线程
for (Thread thread : threads) {
thread.start();
}
// 等待所有线程完成
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
System.out.println("最终计数值: " + counter.getCount());
System.out.println("预期值: " + (5 * 1000));
}
// 同步代码块
public static void demonstrateSynchronizedBlock() {
System.out.println("\n=== 同步代码块 ===");
BankAccount account = new BankAccount(1000);
// 创建多个线程进行转账操作
Thread[] threads = new Thread[10];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 100; j++) {
account.withdraw(1);
account.deposit(1);
}
}, "Thread-" + i);
}
// 启动所有线程
for (Thread thread : threads) {
thread.start();
}
// 等待所有线程完成
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
System.out.println("最终余额: " + account.getBalance());
System.out.println("预期余额: 1000");
}
// 静态同步
public static void demonstrateStaticSynchronized() {
System.out.println("\n=== 静态同步 ===");
// 创建多个线程访问静态方法
Thread[] threads = new Thread[3];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
StaticCounter.increment();
StaticCounter.printCount();
}, "Thread-" + i);
}
// 启动所有线程
for (Thread thread : threads) {
thread.start();
}
// 等待所有线程完成
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
System.out.println("最终静态计数: " + StaticCounter.getCount());
}
// 死锁演示
public static void demonstrateDeadlock() {
System.out.println("\n=== 死锁演示 ===");
Object lock1 = new Object();
Object lock2 = new Object();
Thread thread1 = new Thread(() -> {
synchronized (lock1) {
System.out.println("线程1获得lock1");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("线程1等待lock2...");
synchronized (lock2) {
System.out.println("线程1获得lock2");
}
}
}, "DeadLock-Thread-1");
Thread thread2 = new Thread(() -> {
synchronized (lock2) {
System.out.println("线程2获得lock2");
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("线程2等待lock1...");
synchronized (lock1) {
System.out.println("线程2获得lock1");
}
}
}, "DeadLock-Thread-2");
thread1.start();
thread2.start();
try {
// 等待3秒,如果线程还没结束,说明发生了死锁
thread1.join(3000);
thread2.join(3000);
if (thread1.isAlive() || thread2.isAlive()) {
System.out.println("检测到死锁!强制中断线程...");
thread1.interrupt();
thread2.interrupt();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// 计数器类
class Counter {
private int count = 0;
// 同步方法
public synchronized void increment() {
count++;
}
public synchronized void decrement() {
count--;
}
public synchronized int getCount() {
return count;
}
}
// 银行账户类
class BankAccount {
private double balance;
private final Object lock = new Object();
public BankAccount(double initialBalance) {
this.balance = initialBalance;
}
public void deposit(double amount) {
synchronized (lock) {
double oldBalance = balance;
balance += amount;
System.out.println(Thread.currentThread().getName() +
" 存款: " + amount + ", 余额: " + oldBalance + " -> " + balance);
}
}
public boolean withdraw(double amount) {
synchronized (lock) {
if (balance >= amount) {
double oldBalance = balance;
balance -= amount;
System.out.println(Thread.currentThread().getName() +
" 取款: " + amount + ", 余额: " + oldBalance + " -> " + balance);
return true;
} else {
System.out.println(Thread.currentThread().getName() +
" 取款失败: 余额不足 (当前余额: " + balance + ", 尝试取款: " + amount + ")");
return false;
}
}
}
public double getBalance() {
synchronized (lock) {
return balance;
}
}
}
// 静态计数器类
class StaticCounter {
private static int count = 0;
// 静态同步方法
public static synchronized void increment() {
count++;
System.out.println(Thread.currentThread().getName() + " 增加计数,当前值: " + count);
}
public static synchronized int getCount() {
return count;
}
public static synchronized void printCount() {
System.out.println(Thread.currentThread().getName() + " 读取计数: " + count);
}
}
2.2 Lock接口
// Lock接口演示
public class LockDemo {
public static void main(String[] args) {
demonstrateReentrantLock();
demonstrateReadWriteLock();
demonstrateTryLock();
demonstrateCondition();
}
// ReentrantLock演示
public static void demonstrateReentrantLock() {
System.out.println("=== ReentrantLock演示 ===");
LockCounter counter = new LockCounter();
// 创建多个线程同时访问计数器
Thread[] threads = new Thread[5];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
counter.increment();
}
}, "Thread-" + i);
}
long startTime = System.currentTimeMillis();
// 启动所有线程
for (Thread thread : threads) {
thread.start();
}
// 等待所有线程完成
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
long endTime = System.currentTimeMillis();
System.out.println("最终计数值: " + counter.getCount());
System.out.println("预期值: " + (5 * 1000));
System.out.println("执行时间: " + (endTime - startTime) + "ms");
}
// ReadWriteLock演示
public static void demonstrateReadWriteLock() {
System.out.println("\n=== ReadWriteLock演示 ===");
ReadWriteCache cache = new ReadWriteCache();
// 创建读线程
Thread[] readers = new Thread[5];
for (int i = 0; i < readers.length; i++) {
final int readerId = i;
readers[i] = new Thread(() -> {
for (int j = 0; j < 3; j++) {
String value = cache.get("key" + (j % 3));
System.out.println("读线程" + readerId + " 读取: key" + (j % 3) + " = " + value);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}, "Reader-" + i);
}
// 创建写线程
Thread[] writers = new Thread[2];
for (int i = 0; i < writers.length; i++) {
final int writerId = i;
writers[i] = new Thread(() -> {
for (int j = 0; j < 3; j++) {
String key = "key" + j;
String value = "value" + writerId + "-" + j;
cache.put(key, value);
System.out.println("写线程" + writerId + " 写入: " + key + " = " + value);
try {
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}, "Writer-" + i);
}
// 启动所有线程
for (Thread reader : readers) {
reader.start();
}
for (Thread writer : writers) {
writer.start();
}
// 等待所有线程完成
for (Thread reader : readers) {
try {
reader.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
for (Thread writer : writers) {
try {
writer.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// tryLock演示
public static void demonstrateTryLock() {
System.out.println("\n=== tryLock演示 ===");
ReentrantLock lock = new ReentrantLock();
// 创建多个线程尝试获取锁
Thread[] threads = new Thread[3];
for (int i = 0; i < threads.length; i++) {
final int threadId = i;
threads[i] = new Thread(() -> {
try {
// 尝试获取锁,最多等待1秒
if (lock.tryLock(1, TimeUnit.SECONDS)) {
try {
System.out.println("线程" + threadId + " 获得锁,开始工作");
Thread.sleep(2000); // 模拟工作
System.out.println("线程" + threadId + " 工作完成");
} finally {
lock.unlock();
System.out.println("线程" + threadId + " 释放锁");
}
} else {
System.out.println("线程" + threadId + " 获取锁超时");
}
} catch (InterruptedException e) {
System.out.println("线程" + threadId + " 被中断");
Thread.currentThread().interrupt();
}
}, "TryLock-Thread-" + i);
}
// 启动所有线程
for (Thread thread : threads) {
thread.start();
}
// 等待所有线程完成
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// Condition演示
public static void demonstrateCondition() {
System.out.println("\n=== Condition演示 ===");
BoundedBuffer buffer = new BoundedBuffer(3);
// 生产者线程
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
String item = "Item-" + i;
buffer.put(item);
System.out.println("生产者生产: " + item);
Thread.sleep(500);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Producer");
// 消费者线程
Thread consumer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
String item = buffer.take();
System.out.println("消费者消费: " + item);
Thread.sleep(800);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Consumer");
producer.start();
consumer.start();
try {
producer.join();
consumer.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// 使用Lock的计数器
class LockCounter {
private int count = 0;
private final ReentrantLock lock = new ReentrantLock();
public void increment() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
public int getCount() {
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
}
// 读写锁缓存
class ReadWriteCache {
private final Map<String, String> cache = new HashMap<>();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock();
private final Lock writeLock = lock.writeLock();
public String get(String key) {
readLock.lock();
try {
System.out.println(Thread.currentThread().getName() + " 获取读锁");
Thread.sleep(100); // 模拟读取时间
return cache.get(key);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
} finally {
System.out.println(Thread.currentThread().getName() + " 释放读锁");
readLock.unlock();
}
}
public void put(String key, String value) {
writeLock.lock();
try {
System.out.println(Thread.currentThread().getName() + " 获取写锁");
Thread.sleep(200); // 模拟写入时间
cache.put(key, value);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
System.out.println(Thread.currentThread().getName() + " 释放写锁");
writeLock.unlock();
}
}
}
// 有界缓冲区
class BoundedBuffer {
private final String[] buffer;
private int count = 0;
private int putIndex = 0;
private int takeIndex = 0;
private final ReentrantLock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public BoundedBuffer(int capacity) {
this.buffer = new String[capacity];
}
public void put(String item) throws InterruptedException {
lock.lock();
try {
while (count == buffer.length) {
System.out.println("缓冲区已满,生产者等待...");
notFull.await();
}
buffer[putIndex] = item;
putIndex = (putIndex + 1) % buffer.length;
count++;
System.out.println("缓冲区大小: " + count + "/" + buffer.length);
notEmpty.signal();
} finally {
lock.unlock();
}
}
public String take() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
System.out.println("缓冲区为空,消费者等待...");
notEmpty.await();
}
String item = buffer[takeIndex];
buffer[takeIndex] = null;
takeIndex = (takeIndex + 1) % buffer.length;
count--;
System.out.println("缓冲区大小: " + count + "/" + buffer.length);
notFull.signal();
return item;
} finally {
lock.unlock();
}
}
}
2.3 volatile关键字
// volatile关键字演示
public class VolatileDemo {
public static void main(String[] args) {
demonstrateVisibility();
demonstrateAtomicity();
demonstrateOrdering();
demonstrateDoubleCheckedLocking();
}
// 可见性演示
public static void demonstrateVisibility() {
System.out.println("=== volatile可见性演示 ===");
VisibilityTest test = new VisibilityTest();
// 读线程
Thread reader = new Thread(() -> {
while (!test.isFlag()) {
// 忙等待
}
System.out.println("读线程检测到flag变化: " + test.isFlag());
}, "Reader");
// 写线程
Thread writer = new Thread(() -> {
try {
Thread.sleep(1000);
test.setFlag(true);
System.out.println("写线程设置flag为true");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Writer");
reader.start();
writer.start();
try {
reader.join(3000); // 最多等待3秒
writer.join();
if (reader.isAlive()) {
System.out.println("读线程仍在运行,可能存在可见性问题");
reader.interrupt();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 原子性演示
public static void demonstrateAtomicity() {
System.out.println("\n=== volatile原子性演示 ===");
AtomicityTest test = new AtomicityTest();
// 创建多个线程同时增加计数
Thread[] threads = new Thread[10];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
test.increment();
}
}, "Thread-" + i);
}
// 启动所有线程
for (Thread thread : threads) {
thread.start();
}
// 等待所有线程完成
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
System.out.println("最终计数值: " + test.getCount());
System.out.println("预期值: " + (10 * 1000));
System.out.println("说明: volatile不能保证原子性,需要使用synchronized或AtomicInteger");
}
// 有序性演示
public static void demonstrateOrdering() {
System.out.println("\n=== volatile有序性演示 ===");
OrderingTest test = new OrderingTest();
for (int i = 0; i < 10; i++) {
final int iteration = i;
Thread writer = new Thread(() -> {
test.writer();
}, "Writer-" + iteration);
Thread reader = new Thread(() -> {
test.reader();
}, "Reader-" + iteration);
writer.start();
reader.start();
try {
writer.join();
reader.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// 双重检查锁定演示
public static void demonstrateDoubleCheckedLocking() {
System.out.println("\n=== 双重检查锁定演示 ===");
// 创建多个线程同时获取单例
Thread[] threads = new Thread[10];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
Singleton instance = Singleton.getInstance();
System.out.println(Thread.currentThread().getName() +
" 获取单例: " + instance.hashCode());
}, "Thread-" + i);
}
// 启动所有线程
for (Thread thread : threads) {
thread.start();
}
// 等待所有线程完成
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
// 可见性测试类
class VisibilityTest {
private volatile boolean flag = false; // 使用volatile确保可见性
public boolean isFlag() {
return flag;
}
public void setFlag(boolean flag) {
this.flag = flag;
}
}
// 原子性测试类
class AtomicityTest {
private volatile int count = 0; // volatile不能保证原子性
public void increment() {
count++; // 这不是原子操作
}
public int getCount() {
return count;
}
}
// 有序性测试类
class OrderingTest {
private int a = 0;
private volatile boolean flag = false;
public void writer() {
a = 1; // 1
flag = true; // 2
}
public void reader() {
if (flag) { // 3
int i = a; // 4
System.out.println(Thread.currentThread().getName() +
" 读取到 a = " + i);
}
}
}
// 双重检查锁定单例
class Singleton {
private static volatile Singleton instance; // volatile确保正确的双重检查锁定
private Singleton() {
// 模拟初始化工作
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("单例初始化完成: " + Thread.currentThread().getName());
}
public static Singleton getInstance() {
if (instance == null) { // 第一次检查
synchronized (Singleton.class) {
if (instance == null) { // 第二次检查
instance = new Singleton();
}
}
}
return instance;
}
}
3. 并发工具类
3.1 CountDownLatch
// CountDownLatch演示
public class CountDownLatchDemo {
public static void main(String[] args) {
demonstrateBasicUsage();
demonstrateRaceStart();
demonstrateServiceStartup();
}
// 基本用法
public static void demonstrateBasicUsage() {
System.out.println("=== CountDownLatch基本用法 ===");
int workerCount = 5;
CountDownLatch latch = new CountDownLatch(workerCount);
// 创建工作线程
for (int i = 0; i < workerCount; i++) {
final int workerId = i;
Thread worker = new Thread(() -> {
try {
System.out.println("工作线程" + workerId + " 开始工作");
// 模拟工作
Thread.sleep((workerId + 1) * 1000);
System.out.println("工作线程" + workerId + " 完成工作");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown(); // 完成工作,计数减1
}
}, "Worker-" + i);
worker.start();
}
try {
System.out.println("主线程等待所有工作线程完成...");
latch.await(); // 等待计数归零
System.out.println("所有工作线程已完成,主线程继续执行");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 比赛开始场景
public static void demonstrateRaceStart() {
System.out.println("\n=== 比赛开始场景 ===");
int runnerCount = 3;
CountDownLatch startSignal = new CountDownLatch(1); // 开始信号
CountDownLatch doneSignal = new CountDownLatch(runnerCount); // 完成信号
// 创建跑步者
for (int i = 0; i < runnerCount; i++) {
final int runnerId = i;
Thread runner = new Thread(() -> {
try {
System.out.println("跑步者" + runnerId + " 准备就绪");
startSignal.await(); // 等待开始信号
System.out.println("跑步者" + runnerId + " 开始跑步!");
// 模拟跑步时间
int runTime = (int) (Math.random() * 3000) + 1000;
Thread.sleep(runTime);
System.out.println("跑步者" + runnerId + " 完成比赛!用时: " + runTime + "ms");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
doneSignal.countDown(); // 完成比赛
}
}, "Runner-" + i);
runner.start();
}
try {
Thread.sleep(2000); // 等待所有跑步者准备
System.out.println("\n裁判: 3... 2... 1... 开始!");
startSignal.countDown(); // 发出开始信号
doneSignal.await(); // 等待所有跑步者完成
System.out.println("\n裁判: 比赛结束!");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 服务启动场景
public static void demonstrateServiceStartup() {
System.out.println("\n=== 服务启动场景 ===");
String[] services = {"数据库服务", "缓存服务", "消息队列服务", "Web服务"};
CountDownLatch latch = new CountDownLatch(services.length);
// 启动各个服务
for (int i = 0; i < services.length; i++) {
final String serviceName = services[i];
final int startupTime = (i + 1) * 1000; // 不同服务启动时间不同
Thread serviceThread = new Thread(() -> {
try {
System.out.println(serviceName + " 开始启动...");
Thread.sleep(startupTime); // 模拟启动时间
System.out.println(serviceName + " 启动完成");
} catch (InterruptedException e) {
System.out.println(serviceName + " 启动被中断");
Thread.currentThread().interrupt();
} finally {
latch.countDown();
}
}, serviceName + "-Thread");
serviceThread.start();
}
try {
System.out.println("等待所有服务启动完成...");
boolean allStarted = latch.await(10, TimeUnit.SECONDS); // 最多等待10秒
if (allStarted) {
System.out.println("\n所有服务启动完成,应用程序可以开始接收请求");
} else {
System.out.println("\n服务启动超时,当前剩余未启动服务数: " + latch.getCount());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
3.2 Semaphore
// Semaphore演示
public class SemaphoreDemo {
public static void main(String[] args) {
demonstrateBasicUsage();
demonstrateConnectionPool();
demonstrateParkingLot();
}
// 基本用法
public static void demonstrateBasicUsage() {
System.out.println("=== Semaphore基本用法 ===");
// 创建一个许可数为3的信号量
Semaphore semaphore = new Semaphore(3);
// 创建5个线程竞争3个许可
for (int i = 0; i < 5; i++) {
final int threadId = i;
Thread thread = new Thread(() -> {
try {
System.out.println("线程" + threadId + " 尝试获取许可...");
semaphore.acquire(); // 获取许可
System.out.println("线程" + threadId + " 获得许可,开始工作");
// 模拟工作
Thread.sleep(2000);
System.out.println("线程" + threadId + " 工作完成,释放许可");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
semaphore.release(); // 释放许可
}
}, "Thread-" + i);
thread.start();
}
try {
Thread.sleep(10000); // 等待所有线程完成
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 连接池场景
public static void demonstrateConnectionPool() {
System.out.println("\n=== 连接池场景 ===");
DatabaseConnectionPool pool = new DatabaseConnectionPool(3);
// 创建多个客户端同时请求连接
for (int i = 0; i < 8; i++) {
final int clientId = i;
Thread client = new Thread(() -> {
try {
Connection conn = pool.getConnection();
System.out.println("客户端" + clientId + " 获得连接: " + conn.getId());
// 模拟使用连接
Thread.sleep((int) (Math.random() * 3000) + 1000);
pool.releaseConnection(conn);
System.out.println("客户端" + clientId + " 释放连接: " + conn.getId());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Client-" + i);
client.start();
}
try {
Thread.sleep(15000); // 等待所有客户端完成
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 停车场场景
public static void demonstrateParkingLot() {
System.out.println("\n=== 停车场场景 ===");
ParkingLot parkingLot = new ParkingLot(4);
// 创建多辆车尝试停车
for (int i = 0; i < 10; i++) {
final int carId = i;
Thread car = new Thread(() -> {
try {
boolean parked = parkingLot.park(carId);
if (parked) {
// 模拟停车时间
int parkingTime = (int) (Math.random() * 4000) + 1000;
Thread.sleep(parkingTime);
parkingLot.leave(carId);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Car-" + i);
car.start();
// 间隔一段时间再来下一辆车
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
try {
Thread.sleep(20000); // 等待所有车辆完成
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// 数据库连接池
class DatabaseConnectionPool {
private final Semaphore semaphore;
private final Queue<Connection> connections;
public DatabaseConnectionPool(int poolSize) {
this.semaphore = new Semaphore(poolSize);
this.connections = new ConcurrentLinkedQueue<>();
// 初始化连接池
for (int i = 0; i < poolSize; i++) {
connections.offer(new Connection(i));
}
}
public Connection getConnection() throws InterruptedException {
semaphore.acquire(); // 获取许可
return connections.poll(); // 获取连接
}
public void releaseConnection(Connection connection) {
connections.offer(connection); // 归还连接
semaphore.release(); // 释放许可
}
// 模拟数据库连接
static class Connection {
private final int id;
public Connection(int id) {
this.id = id;
}
public int getId() {
return id;
}
}
}
// 停车场
class ParkingLot {
private final Semaphore semaphore;
private final int capacity;
public ParkingLot(int capacity) {
this.capacity = capacity;
this.semaphore = new Semaphore(capacity);
}
public boolean park(int carId) {
try {
System.out.println("车辆" + carId + " 尝试进入停车场...");
// 尝试获取停车位,最多等待2秒
if (semaphore.tryAcquire(2, TimeUnit.SECONDS)) {
System.out.println("车辆" + carId + " 成功停车 (剩余车位: " +
semaphore.availablePermits() + "/" + capacity + ")");
return true;
} else {
System.out.println("车辆" + carId + " 停车场已满,离开");
return false;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
public void leave(int carId) {
semaphore.release();
System.out.println("车辆" + carId + " 离开停车场 (剩余车位: " +
semaphore.availablePermits() + "/" + capacity + ")");
}
}
3.3 CyclicBarrier
// CyclicBarrier演示
public class CyclicBarrierDemo {
public static void main(String[] args) {
demonstrateBasicUsage();
demonstrateMultiPhase();
demonstrateTeamWork();
}
// 基本用法
public static void demonstrateBasicUsage() {
System.out.println("=== CyclicBarrier基本用法 ===");
int parties = 4;
CyclicBarrier barrier = new CyclicBarrier(parties, () -> {
System.out.println("\n所有线程都到达屏障点,执行屏障动作!\n");
});
// 创建多个线程
for (int i = 0; i < parties; i++) {
final int threadId = i;
Thread thread = new Thread(() -> {
try {
System.out.println("线程" + threadId + " 开始工作");
// 模拟不同的工作时间
Thread.sleep((threadId + 1) * 1000);
System.out.println("线程" + threadId + " 完成工作,等待其他线程");
barrier.await(); // 等待所有线程到达屏障点
System.out.println("线程" + threadId + " 继续执行后续工作");
} catch (InterruptedException | BrokenBarrierException e) {
System.out.println("线程" + threadId + " 被中断或屏障被破坏");
Thread.currentThread().interrupt();
}
}, "Thread-" + i);
thread.start();
}
try {
Thread.sleep(8000); // 等待所有线程完成
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 多阶段任务
public static void demonstrateMultiPhase() {
System.out.println("\n=== 多阶段任务演示 ===");
int workerCount = 3;
CyclicBarrier barrier = new CyclicBarrier(workerCount);
// 创建工作线程
for (int i = 0; i < workerCount; i++) {
final int workerId = i;
Thread worker = new Thread(() -> {
try {
// 第一阶段
System.out.println("工作者" + workerId + " 开始第一阶段工作");
Thread.sleep((workerId + 1) * 500);
System.out.println("工作者" + workerId + " 完成第一阶段");
barrier.await(); // 等待所有工作者完成第一阶段
// 第二阶段
System.out.println("工作者" + workerId + " 开始第二阶段工作");
Thread.sleep((3 - workerId) * 500);
System.out.println("工作者" + workerId + " 完成第二阶段");
barrier.await(); // 等待所有工作者完成第二阶段
// 第三阶段
System.out.println("工作者" + workerId + " 开始第三阶段工作");
Thread.sleep(1000);
System.out.println("工作者" + workerId + " 完成第三阶段");
} catch (InterruptedException | BrokenBarrierException e) {
System.out.println("工作者" + workerId + " 被中断或屏障被破坏");
Thread.currentThread().interrupt();
}
}, "Worker-" + i);
worker.start();
}
try {
Thread.sleep(10000); // 等待所有工作者完成
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 团队协作场景
public static void demonstrateTeamWork() {
System.out.println("\n=== 团队协作场景 ===");
TeamProject project = new TeamProject();
project.start();
}
}
// 团队项目类
class TeamProject {
private final int teamSize = 4;
private final CyclicBarrier designBarrier;
private final CyclicBarrier developBarrier;
private final CyclicBarrier testBarrier;
public TeamProject() {
this.designBarrier = new CyclicBarrier(teamSize, () -> {
System.out.println("\n=== 设计阶段完成,开始开发阶段 ===\n");
});
this.developBarrier = new CyclicBarrier(teamSize, () -> {
System.out.println("\n=== 开发阶段完成,开始测试阶段 ===\n");
});
this.testBarrier = new CyclicBarrier(teamSize, () -> {
System.out.println("\n=== 测试阶段完成,项目发布! ===\n");
});
}
public void start() {
String[] roles = {"前端工程师", "后端工程师", "数据库工程师", "测试工程师"};
for (int i = 0; i < teamSize; i++) {
final String role = roles[i];
final int memberId = i;
Thread member = new Thread(() -> {
try {
// 设计阶段
System.out.println(role + " 开始设计工作");
Thread.sleep((memberId + 1) * 800);
System.out.println(role + " 完成设计工作");
designBarrier.await();
// 开发阶段
System.out.println(role + " 开始开发工作");
Thread.sleep((4 - memberId) * 600);
System.out.println(role + " 完成开发工作");
developBarrier.await();
// 测试阶段
System.out.println(role + " 开始测试工作");
Thread.sleep((memberId % 2 + 1) * 700);
System.out.println(role + " 完成测试工作");
testBarrier.await();
System.out.println(role + " 项目完成!");
} catch (InterruptedException | BrokenBarrierException e) {
System.out.println(role + " 工作被中断");
Thread.currentThread().interrupt();
}
}, role);
member.start();
}
}
}
3.4 Exchanger
// Exchanger演示
public class ExchangerDemo {
public static void main(String[] args) {
demonstrateBasicUsage();
demonstrateDataExchange();
demonstrateProducerConsumer();
}
// 基本用法
public static void demonstrateBasicUsage() {
System.out.println("=== Exchanger基本用法 ===");
Exchanger<String> exchanger = new Exchanger<>();
// 线程A
Thread threadA = new Thread(() -> {
try {
String dataA = "来自线程A的数据";
System.out.println("线程A准备交换数据: " + dataA);
String receivedData = exchanger.exchange(dataA);
System.out.println("线程A收到数据: " + receivedData);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "ThreadA");
// 线程B
Thread threadB = new Thread(() -> {
try {
Thread.sleep(1000); // 模拟准备时间
String dataB = "来自线程B的数据";
System.out.println("线程B准备交换数据: " + dataB);
String receivedData = exchanger.exchange(dataB);
System.out.println("线程B收到数据: " + receivedData);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "ThreadB");
threadA.start();
threadB.start();
try {
threadA.join();
threadB.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 数据交换场景
public static void demonstrateDataExchange() {
System.out.println("\n=== 数据交换场景 ===");
DataExchangeDemo demo = new DataExchangeDemo();
demo.start();
}
// 生产者消费者场景
public static void demonstrateProducerConsumer() {
System.out.println("\n=== 生产者消费者场景 ===");
ProducerConsumerExchange demo = new ProducerConsumerExchange();
demo.start();
}
}
// 数据交换演示
class DataExchangeDemo {
private final Exchanger<List<String>> exchanger = new Exchanger<>();
public void start() {
// 数据处理器A
Thread processorA = new Thread(() -> {
List<String> dataA = new ArrayList<>();
try {
for (int i = 0; i < 3; i++) {
// 生成数据
dataA.clear();
for (int j = 0; j < 5; j++) {
dataA.add("A-" + i + "-" + j);
}
System.out.println("处理器A生成数据: " + dataA);
// 交换数据
List<String> receivedData = exchanger.exchange(dataA);
System.out.println("处理器A收到数据: " + receivedData);
// 处理收到的数据
processData("A", receivedData);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "ProcessorA");
// 数据处理器B
Thread processorB = new Thread(() -> {
List<String> dataB = new ArrayList<>();
try {
for (int i = 0; i < 3; i++) {
// 生成数据
dataB.clear();
for (int j = 0; j < 5; j++) {
dataB.add("B-" + i + "-" + j);
}
System.out.println("处理器B生成数据: " + dataB);
// 交换数据
List<String> receivedData = exchanger.exchange(dataB);
System.out.println("处理器B收到数据: " + receivedData);
// 处理收到的数据
processData("B", receivedData);
Thread.sleep(1500);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "ProcessorB");
processorA.start();
processorB.start();
try {
processorA.join();
processorB.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void processData(String processor, List<String> data) {
System.out.println("处理器" + processor + " 正在处理数据: " + data.size() + " 项");
}
}
// 生产者消费者交换
class ProducerConsumerExchange {
private final Exchanger<List<String>> exchanger = new Exchanger<>();
public void start() {
// 生产者
Thread producer = new Thread(() -> {
List<String> fullBuffer = new ArrayList<>();
List<String> emptyBuffer = new ArrayList<>();
try {
for (int round = 0; round < 5; round++) {
// 生产数据到满缓冲区
fullBuffer.clear();
for (int i = 0; i < 3; i++) {
String item = "Item-" + round + "-" + i;
fullBuffer.add(item);
System.out.println("生产者生产: " + item);
Thread.sleep(200);
}
System.out.println("生产者准备交换缓冲区...");
// 交换缓冲区
emptyBuffer = exchanger.exchange(fullBuffer);
System.out.println("生产者获得空缓冲区,继续生产");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Producer");
// 消费者
Thread consumer = new Thread(() -> {
List<String> emptyBuffer = new ArrayList<>();
List<String> fullBuffer;
try {
for (int round = 0; round < 5; round++) {
System.out.println("消费者等待满缓冲区...");
// 交换缓冲区
fullBuffer = exchanger.exchange(emptyBuffer);
System.out.println("消费者获得满缓冲区,开始消费");
// 消费数据
for (String item : fullBuffer) {
System.out.println("消费者消费: " + item);
Thread.sleep(300);
}
// 清空缓冲区
emptyBuffer = fullBuffer;
emptyBuffer.clear();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Consumer");
producer.start();
consumer.start();
try {
producer.join();
consumer.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
4. 原子类
4.1 基本原子类
// 原子类演示
public class AtomicDemo {
public static void main(String[] args) {
demonstrateAtomicInteger();
demonstrateAtomicLong();
demonstrateAtomicBoolean();
demonstrateAtomicReference();
}
// AtomicInteger演示
public static void demonstrateAtomicInteger() {
System.out.println("=== AtomicInteger演示 ===");
AtomicInteger counter = new AtomicInteger(0);
int threadCount = 10;
int incrementsPerThread = 1000;
Thread[] threads = new Thread[threadCount];
long startTime = System.currentTimeMillis();
// 创建线程进行并发增加
for (int i = 0; i < threadCount; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < incrementsPerThread; j++) {
counter.incrementAndGet();
}
}, "Thread-" + i);
}
// 启动所有线程
for (Thread thread : threads) {
thread.start();
}
// 等待所有线程完成
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
long endTime = System.currentTimeMillis();
System.out.println("最终计数值: " + counter.get());
System.out.println("预期值: " + (threadCount * incrementsPerThread));
System.out.println("执行时间: " + (endTime - startTime) + "ms");
// 演示其他操作
System.out.println("\n其他AtomicInteger操作:");
System.out.println("当前值: " + counter.get());
System.out.println("增加5: " + counter.addAndGet(5));
System.out.println("减少3: " + counter.addAndGet(-3));
System.out.println("比较并设置(期望10002,新值20000): " +
counter.compareAndSet(10002, 20000));
System.out.println("当前值: " + counter.get());
System.out.println("获取并设置为0: " + counter.getAndSet(0));
System.out.println("当前值: " + counter.get());
}
// AtomicLong演示
public static void demonstrateAtomicLong() {
System.out.println("\n=== AtomicLong演示 ===");
AtomicLong accumulator = new AtomicLong(0);
// 模拟多线程累加大数
Thread[] threads = new Thread[5];
for (int i = 0; i < threads.length; i++) {
final long baseValue = (i + 1) * 1000000L;
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
accumulator.addAndGet(baseValue + j);
}
}, "AccumulatorThread-" + i);
}
// 启动并等待完成
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
System.out.println("累加结果: " + accumulator.get());
}
// AtomicBoolean演示
public static void demonstrateAtomicBoolean() {
System.out.println("\n=== AtomicBoolean演示 ===");
AtomicBoolean flag = new AtomicBoolean(false);
AtomicInteger successCount = new AtomicInteger(0);
// 多个线程尝试设置标志
Thread[] threads = new Thread[10];
for (int i = 0; i < threads.length; i++) {
final int threadId = i;
threads[i] = new Thread(() -> {
try {
Thread.sleep((int) (Math.random() * 100));
// 尝试将false设置为true
if (flag.compareAndSet(false, true)) {
System.out.println("线程" + threadId + " 成功设置标志为true");
successCount.incrementAndGet();
Thread.sleep(100); // 模拟工作
flag.set(false); // 重置标志
System.out.println("线程" + threadId + " 重置标志为false");
} else {
System.out.println("线程" + threadId + " 设置标志失败");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "FlagThread-" + i);
}
// 启动并等待完成
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
System.out.println("成功设置标志的线程数: " + successCount.get());
}
// AtomicReference演示
public static void demonstrateAtomicReference() {
System.out.println("\n=== AtomicReference演示 ===");
AtomicReference<User> userRef = new AtomicReference<>(new User("张三", 25));
// 多个线程尝试更新用户信息
Thread[] threads = new Thread[5];
for (int i = 0; i < threads.length; i++) {
final int threadId = i;
threads[i] = new Thread(() -> {
try {
Thread.sleep((int) (Math.random() * 100));
User currentUser = userRef.get();
User newUser = new User(currentUser.getName(), currentUser.getAge() + 1);
if (userRef.compareAndSet(currentUser, newUser)) {
System.out.println("线程" + threadId + " 成功更新用户年龄: " +
currentUser.getAge() + " -> " + newUser.getAge());
} else {
System.out.println("线程" + threadId + " 更新用户信息失败");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "UserUpdateThread-" + i);
}
// 启动并等待完成
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
User finalUser = userRef.get();
System.out.println("最终用户信息: " + finalUser.getName() + ", 年龄: " + finalUser.getAge());
}
}
// 用户类
class User {
private final String name;
private final int age;
public User(String name, int age) {
this.name = name;
this.age = age;
}
public String getName() {
return name;
}
public int getAge() {
return age;
}
@Override
public String toString() {
return "User{name='" + name + "', age=" + age + "}";
}
}
4.2 数组原子类
// 数组原子类演示
public class AtomicArrayDemo {
public static void main(String[] args) {
demonstrateAtomicIntegerArray();
demonstrateAtomicLongArray();
demonstrateAtomicReferenceArray();
}
// AtomicIntegerArray演示
public static void demonstrateAtomicIntegerArray() {
System.out.println("=== AtomicIntegerArray演示 ===");
int arraySize = 10;
AtomicIntegerArray atomicArray = new AtomicIntegerArray(arraySize);
// 多个线程并发更新数组元素
Thread[] threads = new Thread[5];
for (int i = 0; i < threads.length; i++) {
final int threadId = i;
threads[i] = new Thread(() -> {
for (int j = 0; j < 100; j++) {
int index = j % arraySize;
int newValue = atomicArray.incrementAndGet(index);
if (j % 20 == 0) {
System.out.println("线程" + threadId + " 更新索引" + index +
" 的值为: " + newValue);
}
}
}, "ArrayThread-" + i);
}
// 启动并等待完成
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 输出最终结果
System.out.println("\n最终数组状态:");
for (int i = 0; i < arraySize; i++) {
System.out.println("索引" + i + ": " + atomicArray.get(i));
}
}
// AtomicLongArray演示
public static void demonstrateAtomicLongArray() {
System.out.println("\n=== AtomicLongArray演示 ===");
AtomicLongArray longArray = new AtomicLongArray(5);
// 初始化数组
for (int i = 0; i < longArray.length(); i++) {
longArray.set(i, (i + 1) * 1000L);
}
System.out.println("初始数组状态:");
for (int i = 0; i < longArray.length(); i++) {
System.out.println("索引" + i + ": " + longArray.get(i));
}
// 并发累加
Thread[] threads = new Thread[3];
for (int i = 0; i < threads.length; i++) {
final int threadId = i;
threads[i] = new Thread(() -> {
for (int j = 0; j < longArray.length(); j++) {
long addValue = (threadId + 1) * 100L;
long newValue = longArray.addAndGet(j, addValue);
System.out.println("线程" + threadId + " 给索引" + j +
" 增加" + addValue + ",新值: " + newValue);
}
}, "LongArrayThread-" + i);
}
// 启动并等待完成
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
System.out.println("\n最终数组状态:");
for (int i = 0; i < longArray.length(); i++) {
System.out.println("索引" + i + ": " + longArray.get(i));
}
}
// AtomicReferenceArray演示
public static void demonstrateAtomicReferenceArray() {
System.out.println("\n=== AtomicReferenceArray演示 ===");
AtomicReferenceArray<String> stringArray = new AtomicReferenceArray<>(5);
// 初始化数组
for (int i = 0; i < stringArray.length(); i++) {
stringArray.set(i, "初始值-" + i);
}
System.out.println("初始数组状态:");
for (int i = 0; i < stringArray.length(); i++) {
System.out.println("索引" + i + ": " + stringArray.get(i));
}
// 并发更新字符串
Thread[] threads = new Thread[3];
for (int i = 0; i < threads.length; i++) {
final int threadId = i;
threads[i] = new Thread(() -> {
for (int j = 0; j < stringArray.length(); j++) {
String currentValue = stringArray.get(j);
String newValue = currentValue + "-线程" + threadId;
if (stringArray.compareAndSet(j, currentValue, newValue)) {
System.out.println("线程" + threadId + " 成功更新索引" + j +
": " + currentValue + " -> " + newValue);
} else {
System.out.println("线程" + threadId + " 更新索引" + j + " 失败");
}
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}, "StringArrayThread-" + i);
}
// 启动并等待完成
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
System.out.println("\n最终数组状态:");
for (int i = 0; i < stringArray.length(); i++) {
System.out.println("索引" + i + ": " + stringArray.get(i));
}
}
}
4.3 字段更新器
// 字段更新器演示
public class AtomicFieldUpdaterDemo {
public static void main(String[] args) {
demonstrateAtomicIntegerFieldUpdater();
demonstrateAtomicLongFieldUpdater();
demonstrateAtomicReferenceFieldUpdater();
}
// AtomicIntegerFieldUpdater演示
public static void demonstrateAtomicIntegerFieldUpdater() {
System.out.println("=== AtomicIntegerFieldUpdater演示 ===");
AtomicIntegerFieldUpdater<Counter> updater =
AtomicIntegerFieldUpdater.newUpdater(Counter.class, "count");
Counter counter = new Counter();
// 多线程更新字段
Thread[] threads = new Thread[5];
for (int i = 0; i < threads.length; i++) {
final int threadId = i;
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
updater.incrementAndGet(counter);
}
System.out.println("线程" + threadId + " 完成,当前计数: " +
updater.get(counter));
}, "UpdaterThread-" + i);
}
// 启动并等待完成
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
System.out.println("最终计数: " + updater.get(counter));
}
// AtomicLongFieldUpdater演示
public static void demonstrateAtomicLongFieldUpdater() {
System.out.println("\n=== AtomicLongFieldUpdater演示 ===");
AtomicLongFieldUpdater<Statistics> updater =
AtomicLongFieldUpdater.newUpdater(Statistics.class, "totalBytes");
Statistics stats = new Statistics();
// 模拟多线程统计字节数
Thread[] threads = new Thread[3];
for (int i = 0; i < threads.length; i++) {
final int threadId = i;
threads[i] = new Thread(() -> {
for (int j = 0; j < 100; j++) {
long bytes = (threadId + 1) * 1024L + j;
updater.addAndGet(stats, bytes);
if (j % 25 == 0) {
System.out.println("线程" + threadId + " 添加" + bytes +
" 字节,总计: " + updater.get(stats));
}
}
}, "StatsThread-" + i);
}
// 启动并等待完成
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
System.out.println("最终统计字节数: " + updater.get(stats));
}
// AtomicReferenceFieldUpdater演示
public static void demonstrateAtomicReferenceFieldUpdater() {
System.out.println("\n=== AtomicReferenceFieldUpdater演示 ===");
AtomicReferenceFieldUpdater<Node, String> updater =
AtomicReferenceFieldUpdater.newUpdater(Node.class, String.class, "data");
Node node = new Node("初始数据");
// 多线程更新节点数据
Thread[] threads = new Thread[5];
for (int i = 0; i < threads.length; i++) {
final int threadId = i;
threads[i] = new Thread(() -> {
try {
Thread.sleep((int) (Math.random() * 100));
String currentData = updater.get(node);
String newData = currentData + "-线程" + threadId;
if (updater.compareAndSet(node, currentData, newData)) {
System.out.println("线程" + threadId + " 成功更新: " +
currentData + " -> " + newData);
} else {
System.out.println("线程" + threadId + " 更新失败");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "NodeThread-" + i);
}
// 启动并等待完成
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
System.out.println("最终节点数据: " + updater.get(node));
}
}
// 计数器类
class Counter {
public volatile int count = 0;
public int getCount() {
return count;
}
}
// 统计类
class Statistics {
public volatile long totalBytes = 0;
public long getTotalBytes() {
return totalBytes;
}
}
// 节点类
class Node {
public volatile String data;
public Node(String data) {
this.data = data;
}
public String getData() {
return data;
}
}
5. 线程池
5.1 线程池基础
// 线程池演示
public class ThreadPoolDemo {
public static void main(String[] args) {
demonstrateFixedThreadPool();
demonstrateCachedThreadPool();
demonstrateSingleThreadExecutor();
demonstrateScheduledThreadPool();
demonstrateCustomThreadPool();
}
// 固定大小线程池
public static void demonstrateFixedThreadPool() {
System.out.println("=== 固定大小线程池演示 ===");
ExecutorService executor = Executors.newFixedThreadPool(3);
// 提交多个任务
for (int i = 0; i < 10; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("任务" + taskId + " 开始执行,线程: " +
Thread.currentThread().getName());
try {
Thread.sleep(2000); // 模拟任务执行
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("任务" + taskId + " 执行完成");
});
}
// 关闭线程池
executor.shutdown();
try {
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
System.out.println("固定大小线程池演示完成\n");
}
// 缓存线程池
public static void demonstrateCachedThreadPool() {
System.out.println("=== 缓存线程池演示 ===");
ExecutorService executor = Executors.newCachedThreadPool();
// 提交任务,观察线程创建
for (int i = 0; i < 5; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("缓存池任务" + taskId + " 开始,线程: " +
Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("缓存池任务" + taskId + " 完成");
});
// 间隔提交
try {
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 关闭线程池
executor.shutdown();
try {
executor.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("缓存线程池演示完成\n");
}
// 单线程执行器
public static void demonstrateSingleThreadExecutor() {
System.out.println("=== 单线程执行器演示 ===");
ExecutorService executor = Executors.newSingleThreadExecutor();
// 提交多个任务,观察顺序执行
for (int i = 0; i < 5; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("单线程任务" + taskId + " 开始执行");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("单线程任务" + taskId + " 执行完成");
});
}
// 关闭线程池
executor.shutdown();
try {
executor.awaitTermination(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("单线程执行器演示完成\n");
}
// 定时线程池
public static void demonstrateScheduledThreadPool() {
System.out.println("=== 定时线程池演示 ===");
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
// 延迟执行
scheduler.schedule(() -> {
System.out.println("延迟任务执行,时间: " + new Date());
}, 2, TimeUnit.SECONDS);
// 固定频率执行
ScheduledFuture<?> fixedRateTask = scheduler.scheduleAtFixedRate(() -> {
System.out.println("固定频率任务执行,时间: " + new Date());
}, 1, 1, TimeUnit.SECONDS);
// 固定延迟执行
ScheduledFuture<?> fixedDelayTask = scheduler.scheduleWithFixedDelay(() -> {
System.out.println("固定延迟任务执行,时间: " + new Date());
try {
Thread.sleep(500); // 模拟任务执行时间
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, 1, 2, TimeUnit.SECONDS);
// 运行一段时间后取消
try {
Thread.sleep(8000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
fixedRateTask.cancel(false);
fixedDelayTask.cancel(false);
scheduler.shutdown();
System.out.println("定时线程池演示完成\n");
}
// 自定义线程池
public static void demonstrateCustomThreadPool() {
System.out.println("=== 自定义线程池演示 ===");
// 创建自定义线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // 核心线程数
4, // 最大线程数
60L, // 空闲线程存活时间
TimeUnit.SECONDS, // 时间单位
new LinkedBlockingQueue<>(10), // 工作队列
new ThreadFactory() { // 线程工厂
private int threadNumber = 1;
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r, "CustomThread-" + threadNumber++);
t.setDaemon(false);
return t;
}
},
new RejectedExecutionHandler() { // 拒绝策略
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("任务被拒绝: " + r.toString());
}
}
);
// 监控线程池状态
Thread monitor = new Thread(() -> {
while (!executor.isShutdown()) {
System.out.printf("线程池状态 - 核心线程数: %d, 活跃线程数: %d, " +
"队列大小: %d, 完成任务数: %d%n",
executor.getCorePoolSize(),
executor.getActiveCount(),
executor.getQueue().size(),
executor.getCompletedTaskCount());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
monitor.setDaemon(true);
monitor.start();
// 提交任务
for (int i = 0; i < 20; i++) {
final int taskId = i;
try {
executor.submit(() -> {
System.out.println("自定义池任务" + taskId + " 开始,线程: " +
Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("自定义池任务" + taskId + " 完成");
});
} catch (RejectedExecutionException e) {
System.out.println("任务" + taskId + " 提交失败: " + e.getMessage());
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 关闭线程池
executor.shutdown();
try {
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
Thread.currentThread().interrupt();
}
System.out.println("自定义线程池演示完成");
}
}
5.2 Future和CompletableFuture
// Future和CompletableFuture演示
public class FutureDemo {
public static void main(String[] args) {
demonstrateFuture();
demonstrateCompletableFuture();
demonstrateCompletableFutureChaining();
demonstrateCompletableFutureCombining();
}
// Future基本用法
public static void demonstrateFuture() {
System.out.println("=== Future演示 ===");
ExecutorService executor = Executors.newFixedThreadPool(3);
// 提交Callable任务
Future<Integer> future1 = executor.submit(() -> {
Thread.sleep(2000);
return 42;
});
Future<String> future2 = executor.submit(() -> {
Thread.sleep(1500);
return "Hello Future";
});
Future<Double> future3 = executor.submit(() -> {
Thread.sleep(1000);
return Math.PI;
});
try {
// 获取结果
System.out.println("等待结果...");
// 带超时的获取
Double result3 = future3.get(2, TimeUnit.SECONDS);
System.out.println("结果3: " + result3);
String result2 = future2.get();
System.out.println("结果2: " + result2);
Integer result1 = future1.get();
System.out.println("结果1: " + result1);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
System.out.println("获取结果时发生异常: " + e.getMessage());
}
executor.shutdown();
System.out.println("Future演示完成\n");
}
// CompletableFuture基本用法
public static void demonstrateCompletableFuture() {
System.out.println("=== CompletableFuture演示 ===");
// 异步执行
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "异步任务结果";
});
// 异步执行带回调
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
System.out.println("异步执行任务,线程: " + Thread.currentThread().getName());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).thenRun(() -> {
System.out.println("任务完成后的回调");
});
// 处理结果
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
return "原始数据";
}).thenApply(data -> {
return data.toUpperCase();
}).thenApply(data -> {
return "处理后的" + data;
});
try {
System.out.println("结果1: " + future1.get());
future2.get();
System.out.println("结果3: " + future3.get());
} catch (InterruptedException | ExecutionException e) {
System.out.println("获取结果时发生异常: " + e.getMessage());
}
System.out.println("CompletableFuture演示完成\n");
}
// CompletableFuture链式操作
public static void demonstrateCompletableFutureChaining() {
System.out.println("=== CompletableFuture链式操作演示 ===");
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
System.out.println("步骤1: 获取用户ID");
return "user123";
})
.thenCompose(userId -> {
System.out.println("步骤2: 根据用户ID获取用户信息");
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "用户信息: " + userId;
});
})
.thenCompose(userInfo -> {
System.out.println("步骤3: 根据用户信息获取权限");
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(300);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return userInfo + " -> 权限: admin";
});
})
.whenComplete((result, throwable) -> {
if (throwable != null) {
System.out.println("处理过程中发生异常: " + throwable.getMessage());
} else {
System.out.println("最终结果: " + result);
}
});
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
System.out.println("获取结果时发生异常: " + e.getMessage());
}
System.out.println("链式操作演示完成\n");
}
// CompletableFuture组合操作
public static void demonstrateCompletableFutureCombining() {
System.out.println("=== CompletableFuture组合操作演示 ===");
// 并行执行多个任务
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "任务1结果";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "任务2结果";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(800);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "任务3结果";
});
// 组合两个Future的结果
CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (result1, result2) -> {
return result1 + " + " + result2;
});
// 等待所有任务完成
CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2, future3);
CompletableFuture<String> allResults = allFutures.thenApply(v -> {
try {
return "所有结果: [" + future1.get() + ", " + future2.get() + ", " + future3.get() + "]";
} catch (InterruptedException | ExecutionException e) {
return "获取结果失败: " + e.getMessage();
}
});
// 任意一个完成
CompletableFuture<String> anyResult = CompletableFuture.anyOf(future1, future2, future3)
.thenApply(result -> "最先完成的结果: " + result);
try {
System.out.println("组合结果: " + combinedFuture.get());
System.out.println(anyResult.get());
System.out.println(allResults.get());
} catch (InterruptedException | ExecutionException e) {
System.out.println("获取结果时发生异常: " + e.getMessage());
}
System.out.println("组合操作演示完成");
}
}
6. 多线程最佳实践
6.1 线程安全设计
// 线程安全设计演示
public class ThreadSafetyDemo {
public static void main(String[] args) {
demonstrateImmutableClass();
demonstrateThreadLocalUsage();
demonstrateProducerConsumerPattern();
}
// 不可变类演示
public static void demonstrateImmutableClass() {
System.out.println("=== 不可变类演示 ===");
ImmutablePerson person = new ImmutablePerson("张三", 25,
Arrays.asList("reading", "swimming"));
// 多线程访问不可变对象
Thread[] threads = new Thread[5];
for (int i = 0; i < threads.length; i++) {
final int threadId = i;
threads[i] = new Thread(() -> {
for (int j = 0; j < 3; j++) {
System.out.println("线程" + threadId + " 访问: " + person);
// 创建新的不可变对象
ImmutablePerson newPerson = person.withAge(person.getAge() + 1);
System.out.println("线程" + threadId + " 创建新对象: " + newPerson);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}, "ImmutableThread-" + i);
}
// 启动并等待完成
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
System.out.println("原始对象仍然不变: " + person);
System.out.println("不可变类演示完成\n");
}
// ThreadLocal使用演示
public static void demonstrateThreadLocalUsage() {
System.out.println("=== ThreadLocal使用演示 ===");
UserContext userContext = new UserContext();
// 多线程设置和获取用户上下文
Thread[] threads = new Thread[3];
for (int i = 0; i < threads.length; i++) {
final int userId = i + 1;
threads[i] = new Thread(() -> {
// 设置当前线程的用户信息
userContext.setCurrentUser("用户" + userId);
// 模拟业务操作
businessOperation1(userContext);
businessOperation2(userContext);
// 清理ThreadLocal
userContext.clear();
}, "UserThread-" + userId);
}
// 启动并等待完成
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
System.out.println("ThreadLocal演示完成\n");
}
private static void businessOperation1(UserContext userContext) {
System.out.println("业务操作1 - 当前用户: " + userContext.getCurrentUser() +
", 线程: " + Thread.currentThread().getName());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private static void businessOperation2(UserContext userContext) {
System.out.println("业务操作2 - 当前用户: " + userContext.getCurrentUser() +
", 线程: " + Thread.currentThread().getName());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 生产者消费者模式演示
public static void demonstrateProducerConsumerPattern() {
System.out.println("=== 生产者消费者模式演示 ===");
BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);
// 生产者
Thread producer = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
String item = "商品" + i;
queue.put(item);
System.out.println("生产者生产: " + item + ", 队列大小: " + queue.size());
Thread.sleep(200);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Producer");
// 消费者1
Thread consumer1 = new Thread(() -> {
try {
while (!Thread.currentThread().isInterrupted()) {
String item = queue.take();
System.out.println("消费者1消费: " + item + ", 队列大小: " + queue.size());
Thread.sleep(300);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Consumer1");
// 消费者2
Thread consumer2 = new Thread(() -> {
try {
while (!Thread.currentThread().isInterrupted()) {
String item = queue.take();
System.out.println("消费者2消费: " + item + ", 队列大小: " + queue.size());
Thread.sleep(500);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Consumer2");
producer.start();
consumer1.start();
consumer2.start();
try {
producer.join(); // 等待生产者完成
Thread.sleep(3000); // 等待消费者处理完剩余商品
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
consumer1.interrupt();
consumer2.interrupt();
System.out.println("生产者消费者模式演示完成");
}
}
// 不可变Person类
final class ImmutablePerson {
private final String name;
private final int age;
private final List<String> hobbies;
public ImmutablePerson(String name, int age, List<String> hobbies) {
this.name = name;
this.age = age;
this.hobbies = Collections.unmodifiableList(new ArrayList<>(hobbies));
}
public String getName() {
return name;
}
public int getAge() {
return age;
}
public List<String> getHobbies() {
return hobbies;
}
// 创建新的不可变对象
public ImmutablePerson withAge(int newAge) {
return new ImmutablePerson(this.name, newAge, this.hobbies);
}
public ImmutablePerson withName(String newName) {
return new ImmutablePerson(newName, this.age, this.hobbies);
}
@Override
public String toString() {
return "ImmutablePerson{name='" + name + "', age=" + age +
", hobbies=" + hobbies + "}";
}
}
// 用户上下文类
class UserContext {
private static final ThreadLocal<String> currentUser = new ThreadLocal<>();
public void setCurrentUser(String user) {
currentUser.set(user);
}
public String getCurrentUser() {
return currentUser.get();
}
public void clear() {
currentUser.remove();
}
}
本章小结
多线程技术对比
技术 | 适用场景 | 优点 | 缺点 |
---|---|---|---|
synchronized | 简单同步需求 | 使用简单,JVM优化 | 性能相对较低,不可中断 |
ReentrantLock | 复杂同步需求 | 功能丰富,可中断 | 使用复杂,需手动释放 |
volatile | 简单状态标志 | 轻量级,保证可见性 | 不保证原子性 |
原子类 | 简单原子操作 | 无锁,高性能 | 只适用于简单操作 |
线程池 | 任务执行管理 | 资源复用,管理方便 | 配置复杂 |
并发工具类 | 特定协调场景 | 功能专一,使用方便 | 学习成本高 |
性能优化要点
- 减少锁的粒度:使用更细粒度的锁
- 避免锁竞争:减少临界区代码
- 使用无锁算法:原子类、CAS操作
- 合理使用线程池:避免频繁创建销毁线程
- 选择合适的并发工具:根据场景选择最适合的工具
安全考虑
- 避免死锁:统一锁顺序,使用超时
- 防止内存泄漏:及时清理ThreadLocal
- 异常处理:确保锁能正确释放
- 资源管理:正确关闭线程池
开发建议
- 优先使用高级并发工具:如线程池、并发集合
- 遵循不可变设计:减少共享可变状态
- 合理使用ThreadLocal:避免滥用
- 充分测试:并发代码需要充分的压力测试
- 性能监控:监控线程池状态和锁竞争情况
下一章预告
下一章我们将学习网络编程,包括: - Socket编程基础 - NIO和AIO - HTTP客户端编程 - 网络协议处理 - 分布式通信
练习题
基础练习:实现一个线程安全的计数器,支持增加、减少和获取操作
进阶练习:使用生产者消费者模式实现一个任务调度系统
综合练习:设计一个多线程的文件下载器,支持断点续传和进度显示
性能练习:比较不同同步机制的性能差异
实战练习:实现一个简单的线程池监控系统 “`