11.1 线程基础
11.1.1 线程与进程
进程(Process): - 是操作系统分配资源的基本单位 - 拥有独立的内存空间 - 进程间通信需要特殊机制(IPC)
线程(Thread): - 是CPU调度的基本单位 - 同一进程内的线程共享内存空间 - 线程间通信更加便捷 - 创建和切换开销较小
// 获取当前线程信息
public class ThreadInfo {
public static void main(String[] args) {
Thread currentThread = Thread.currentThread();
System.out.println("线程名称: " + currentThread.getName());
System.out.println("线程ID: " + currentThread.getId());
System.out.println("线程优先级: " + currentThread.getPriority());
System.out.println("线程状态: " + currentThread.getState());
System.out.println("是否为守护线程: " + currentThread.isDaemon());
System.out.println("是否存活: " + currentThread.isAlive());
// 获取线程组信息
ThreadGroup group = currentThread.getThreadGroup();
System.out.println("线程组名称: " + group.getName());
System.out.println("活跃线程数: " + group.activeCount());
}
}
11.1.2 线程的生命周期
Java线程有以下几种状态:
- NEW(新建):线程对象已创建,但还未调用start()方法
- RUNNABLE(可运行):线程正在JVM中执行,可能正在运行或等待CPU时间片
- BLOCKED(阻塞):线程被阻塞,等待监视器锁
- WAITING(等待):线程无限期等待另一个线程执行特定操作
- TIMED_WAITING(超时等待):线程等待另一个线程执行操作,但有时间限制
- TERMINATED(终止):线程执行完毕
// 线程状态演示
public class ThreadStateDemo {
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(() -> {
try {
System.out.println("线程开始执行");
Thread.sleep(2000); // TIMED_WAITING
System.out.println("线程执行完毕");
} catch (InterruptedException e) {
System.out.println("线程被中断");
}
});
System.out.println("创建后状态: " + thread.getState()); // NEW
thread.start();
System.out.println("启动后状态: " + thread.getState()); // RUNNABLE
Thread.sleep(500);
System.out.println("睡眠中状态: " + thread.getState()); // TIMED_WAITING
thread.join(); // 等待线程结束
System.out.println("结束后状态: " + thread.getState()); // TERMINATED
}
}
11.1.3 线程的创建与启动
方法一:继承Thread类
// 继承Thread类创建线程
class MyThread extends Thread {
private String threadName;
public MyThread(String name) {
this.threadName = name;
}
@Override
public void run() {
for (int i = 1; i <= 5; i++) {
System.out.println(threadName + " 执行第 " + i + " 次");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
System.out.println(threadName + " 被中断");
return;
}
}
System.out.println(threadName + " 执行完毕");
}
}
public class ThreadDemo1 {
public static void main(String[] args) {
MyThread thread1 = new MyThread("线程1");
MyThread thread2 = new MyThread("线程2");
thread1.start();
thread2.start();
System.out.println("主线程继续执行");
}
}
方法二:实现Runnable接口
// 实现Runnable接口创建线程
class MyRunnable implements Runnable {
private String taskName;
public MyRunnable(String name) {
this.taskName = name;
}
@Override
public void run() {
for (int i = 1; i <= 5; i++) {
System.out.println(taskName + " 执行第 " + i + " 次 - " +
Thread.currentThread().getName());
try {
Thread.sleep(800);
} catch (InterruptedException e) {
System.out.println(taskName + " 被中断");
return;
}
}
System.out.println(taskName + " 执行完毕");
}
}
public class ThreadDemo2 {
public static void main(String[] args) {
MyRunnable task1 = new MyRunnable("任务1");
MyRunnable task2 = new MyRunnable("任务2");
Thread thread1 = new Thread(task1, "工作线程1");
Thread thread2 = new Thread(task2, "工作线程2");
thread1.start();
thread2.start();
// 使用Lambda表达式创建线程
Thread thread3 = new Thread(() -> {
System.out.println("Lambda线程: " + Thread.currentThread().getName());
}, "Lambda线程");
thread3.start();
}
}
方法三:使用Callable和Future
import java.util.concurrent.*;
// 使用Callable创建有返回值的线程
class CalculationTask implements Callable<Integer> {
private int number;
public CalculationTask(int number) {
this.number = number;
}
@Override
public Integer call() throws Exception {
System.out.println("开始计算: " + number + "的平方");
Thread.sleep(2000); // 模拟耗时操作
int result = number * number;
System.out.println("计算完成: " + number + "^2 = " + result);
return result;
}
}
public class CallableDemo {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(3);
// 提交多个计算任务
Future<Integer> future1 = executor.submit(new CalculationTask(5));
Future<Integer> future2 = executor.submit(new CalculationTask(10));
Future<Integer> future3 = executor.submit(new CalculationTask(15));
try {
// 获取计算结果
System.out.println("结果1: " + future1.get());
System.out.println("结果2: " + future2.get());
System.out.println("结果3: " + future3.get());
// 计算总和
int sum = future1.get() + future2.get() + future3.get();
System.out.println("总和: " + sum);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
executor.shutdown();
}
}
}
11.2 线程同步
11.2.1 同步问题的产生
当多个线程同时访问共享资源时,可能会出现数据不一致的问题:
// 线程不安全的计数器
class UnsafeCounter {
private int count = 0;
public void increment() {
count++; // 这不是原子操作!
}
public int getCount() {
return count;
}
}
public class UnsafeCounterDemo {
public static void main(String[] args) throws InterruptedException {
UnsafeCounter counter = new UnsafeCounter();
// 创建1000个线程,每个线程增加计数器1000次
Thread[] threads = new Thread[1000];
for (int i = 0; i < 1000; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
counter.increment();
}
});
}
// 启动所有线程
for (Thread thread : threads) {
thread.start();
}
// 等待所有线程完成
for (Thread thread : threads) {
thread.join();
}
System.out.println("期望结果: 1000000");
System.out.println("实际结果: " + counter.getCount());
// 实际结果通常小于1000000,说明存在线程安全问题
}
}
11.2.2 synchronized关键字
同步方法
// 线程安全的计数器 - 使用同步方法
class SafeCounter {
private int count = 0;
// 同步方法
public synchronized void increment() {
count++;
}
public synchronized void decrement() {
count--;
}
public synchronized int getCount() {
return count;
}
// 静态同步方法
private static int staticCount = 0;
public static synchronized void incrementStatic() {
staticCount++;
}
public static synchronized int getStaticCount() {
return staticCount;
}
}
public class SafeCounterDemo {
public static void main(String[] args) throws InterruptedException {
SafeCounter counter = new SafeCounter();
Thread[] threads = new Thread[1000];
for (int i = 0; i < 1000; i++) {
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
counter.increment();
}
});
}
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
thread.join();
}
System.out.println("同步方法结果: " + counter.getCount());
// 结果应该是1000000
}
}
同步代码块
// 使用同步代码块
class BankAccount {
private double balance;
private final Object lock = new Object();
public BankAccount(double initialBalance) {
this.balance = initialBalance;
}
public void deposit(double amount) {
synchronized(lock) {
double newBalance = balance + amount;
// 模拟处理时间
try {
Thread.sleep(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
balance = newBalance;
System.out.println(Thread.currentThread().getName() +
" 存款: " + amount + ", 余额: " + balance);
}
}
public boolean withdraw(double amount) {
synchronized(lock) {
if (balance >= amount) {
double newBalance = balance - amount;
// 模拟处理时间
try {
Thread.sleep(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
balance = newBalance;
System.out.println(Thread.currentThread().getName() +
" 取款: " + amount + ", 余额: " + balance);
return true;
} else {
System.out.println(Thread.currentThread().getName() +
" 取款失败,余额不足: " + balance);
return false;
}
}
}
public double getBalance() {
synchronized(lock) {
return balance;
}
}
}
public class BankAccountDemo {
public static void main(String[] args) throws InterruptedException {
BankAccount account = new BankAccount(1000);
// 创建多个线程进行并发操作
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
final int threadId = i;
threads[i] = new Thread(() -> {
if (threadId % 2 == 0) {
account.deposit(100);
} else {
account.withdraw(150);
}
}, "Thread-" + threadId);
}
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
thread.join();
}
System.out.println("最终余额: " + account.getBalance());
}
}
11.2.3 volatile关键字
// volatile关键字演示
public class VolatileDemo {
// 不使用volatile可能导致线程看不到变化
private static volatile boolean flag = false;
private static int counter = 0;
public static void main(String[] args) throws InterruptedException {
// 线程1:修改flag的值
Thread thread1 = new Thread(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
flag = true;
System.out.println("线程1设置flag为true");
});
// 线程2:监听flag的变化
Thread thread2 = new Thread(() -> {
while (!flag) {
counter++;
// 不加这个输出,可能会因为JIT优化导致死循环
if (counter % 100000000 == 0) {
System.out.println("线程2还在等待,计数: " + counter);
}
}
System.out.println("线程2检测到flag变化,退出循环,最终计数: " + counter);
});
thread2.start();
thread1.start();
thread1.join();
thread2.join();
System.out.println("程序结束");
}
}
// volatile的典型应用:单例模式(双重检查锁定)
class Singleton {
private static volatile Singleton instance;
private Singleton() {
// 私有构造函数
}
public static Singleton getInstance() {
if (instance == null) {
synchronized (Singleton.class) {
if (instance == null) {
instance = new Singleton();
}
}
}
return instance;
}
}
11.2.4 Lock接口和ReentrantLock
import java.util.concurrent.locks.*;
import java.util.concurrent.TimeUnit;
// 使用ReentrantLock的示例
class AdvancedCounter {
private int count = 0;
private final ReentrantLock lock = new ReentrantLock();
public void increment() {
lock.lock();
try {
count++;
} finally {
lock.unlock();
}
}
public boolean tryIncrement() {
if (lock.tryLock()) {
try {
count++;
return true;
} finally {
lock.unlock();
}
}
return false;
}
public boolean tryIncrementWithTimeout() {
try {
if (lock.tryLock(1, TimeUnit.SECONDS)) {
try {
count++;
Thread.sleep(500); // 模拟耗时操作
return true;
} finally {
lock.unlock();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return false;
}
public int getCount() {
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
public void printLockInfo() {
System.out.println("锁的持有者数量: " + lock.getHoldCount());
System.out.println("等待锁的线程数: " + lock.getQueueLength());
System.out.println("是否为公平锁: " + lock.isFair());
}
}
public class ReentrantLockDemo {
public static void main(String[] args) throws InterruptedException {
AdvancedCounter counter = new AdvancedCounter();
// 测试基本锁功能
Thread[] threads = new Thread[5];
for (int i = 0; i < 5; i++) {
final int threadId = i;
threads[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
counter.increment();
}
System.out.println("线程" + threadId + "完成");
});
}
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
thread.join();
}
System.out.println("基本锁测试结果: " + counter.getCount());
// 测试tryLock功能
Thread tryLockThread1 = new Thread(() -> {
if (counter.tryIncrement()) {
System.out.println("tryLock成功");
} else {
System.out.println("tryLock失败");
}
});
Thread tryLockThread2 = new Thread(() -> {
if (counter.tryIncrementWithTimeout()) {
System.out.println("tryLock with timeout成功");
} else {
System.out.println("tryLock with timeout失败");
}
});
tryLockThread1.start();
tryLockThread2.start();
tryLockThread1.join();
tryLockThread2.join();
System.out.println("最终结果: " + counter.getCount());
counter.printLockInfo();
}
}
11.2.5 ReadWriteLock读写锁
import java.util.concurrent.locks.*;
import java.util.*;
// 使用读写锁的缓存示例
class ReadWriteCache {
private final Map<String, String> cache = new HashMap<>();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock();
private final Lock writeLock = lock.writeLock();
public String get(String key) {
readLock.lock();
try {
System.out.println(Thread.currentThread().getName() + " 读取: " + key);
Thread.sleep(100); // 模拟读取耗时
return cache.get(key);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
} finally {
readLock.unlock();
}
}
public void put(String key, String value) {
writeLock.lock();
try {
System.out.println(Thread.currentThread().getName() + " 写入: " + key + " = " + value);
Thread.sleep(200); // 模拟写入耗时
cache.put(key, value);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
writeLock.unlock();
}
}
public void remove(String key) {
writeLock.lock();
try {
System.out.println(Thread.currentThread().getName() + " 删除: " + key);
Thread.sleep(150); // 模拟删除耗时
cache.remove(key);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
writeLock.unlock();
}
}
public int size() {
readLock.lock();
try {
return cache.size();
} finally {
readLock.unlock();
}
}
}
public class ReadWriteLockDemo {
public static void main(String[] args) throws InterruptedException {
ReadWriteCache cache = new ReadWriteCache();
// 初始化一些数据
cache.put("key1", "value1");
cache.put("key2", "value2");
cache.put("key3", "value3");
// 创建多个读线程
Thread[] readThreads = new Thread[5];
for (int i = 0; i < 5; i++) {
final int threadId = i;
readThreads[i] = new Thread(() -> {
for (int j = 0; j < 3; j++) {
String value = cache.get("key" + (j + 1));
System.out.println("读线程" + threadId + " 读取到: " + value);
}
}, "ReadThread-" + i);
}
// 创建写线程
Thread writeThread = new Thread(() -> {
cache.put("key4", "value4");
cache.put("key5", "value5");
}, "WriteThread");
// 启动所有线程
for (Thread thread : readThreads) {
thread.start();
}
writeThread.start();
// 等待所有线程完成
for (Thread thread : readThreads) {
thread.join();
}
writeThread.join();
System.out.println("缓存大小: " + cache.size());
}
}
11.3 线程间通信
11.3.1 wait()、notify()和notifyAll()
// 生产者消费者模式
class SharedBuffer {
private final Object[] buffer;
private int count = 0;
private int in = 0;
private int out = 0;
public SharedBuffer(int size) {
buffer = new Object[size];
}
public synchronized void put(Object item) throws InterruptedException {
while (count == buffer.length) {
System.out.println(Thread.currentThread().getName() + " 缓冲区满,等待...");
wait(); // 缓冲区满,等待
}
buffer[in] = item;
in = (in + 1) % buffer.length;
count++;
System.out.println(Thread.currentThread().getName() + " 生产: " + item +
", 缓冲区数量: " + count);
notifyAll(); // 通知等待的消费者
}
public synchronized Object take() throws InterruptedException {
while (count == 0) {
System.out.println(Thread.currentThread().getName() + " 缓冲区空,等待...");
wait(); // 缓冲区空,等待
}
Object item = buffer[out];
buffer[out] = null;
out = (out + 1) % buffer.length;
count--;
System.out.println(Thread.currentThread().getName() + " 消费: " + item +
", 缓冲区数量: " + count);
notifyAll(); // 通知等待的生产者
return item;
}
}
// 生产者
class Producer implements Runnable {
private final SharedBuffer buffer;
private final String name;
public Producer(SharedBuffer buffer, String name) {
this.buffer = buffer;
this.name = name;
}
@Override
public void run() {
try {
for (int i = 1; i <= 5; i++) {
String item = name + "-Item" + i;
buffer.put(item);
Thread.sleep(1000); // 模拟生产时间
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
// 消费者
class Consumer implements Runnable {
private final SharedBuffer buffer;
private final String name;
public Consumer(SharedBuffer buffer, String name) {
this.buffer = buffer;
this.name = name;
}
@Override
public void run() {
try {
for (int i = 1; i <= 5; i++) {
Object item = buffer.take();
Thread.sleep(1500); // 模拟消费时间
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public class ProducerConsumerDemo {
public static void main(String[] args) throws InterruptedException {
SharedBuffer buffer = new SharedBuffer(3); // 缓冲区大小为3
// 创建生产者和消费者
Thread producer1 = new Thread(new Producer(buffer, "生产者1"));
Thread producer2 = new Thread(new Producer(buffer, "生产者2"));
Thread consumer1 = new Thread(new Consumer(buffer, "消费者1"));
Thread consumer2 = new Thread(new Consumer(buffer, "消费者2"));
// 启动线程
producer1.start();
producer2.start();
consumer1.start();
consumer2.start();
// 等待所有线程完成
producer1.join();
producer2.join();
consumer1.join();
consumer2.join();
System.out.println("所有线程执行完毕");
}
}
11.3.2 Condition接口
import java.util.concurrent.locks.*;
// 使用Condition实现的生产者消费者
class AdvancedBuffer {
private final Object[] buffer;
private int count = 0;
private int in = 0;
private int out = 0;
private final ReentrantLock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public AdvancedBuffer(int size) {
buffer = new Object[size];
}
public void put(Object item) throws InterruptedException {
lock.lock();
try {
while (count == buffer.length) {
System.out.println(Thread.currentThread().getName() + " 等待缓冲区不满");
notFull.await();
}
buffer[in] = item;
in = (in + 1) % buffer.length;
count++;
System.out.println(Thread.currentThread().getName() + " 生产: " + item +
", 缓冲区数量: " + count);
notEmpty.signalAll(); // 通知消费者
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
System.out.println(Thread.currentThread().getName() + " 等待缓冲区不空");
notEmpty.await();
}
Object item = buffer[out];
buffer[out] = null;
out = (out + 1) % buffer.length;
count--;
System.out.println(Thread.currentThread().getName() + " 消费: " + item +
", 缓冲区数量: " + count);
notFull.signalAll(); // 通知生产者
return item;
} finally {
lock.unlock();
}
}
}
public class ConditionDemo {
public static void main(String[] args) throws InterruptedException {
AdvancedBuffer buffer = new AdvancedBuffer(2);
// 创建多个生产者和消费者
Thread[] producers = new Thread[2];
Thread[] consumers = new Thread[2];
for (int i = 0; i < 2; i++) {
final int id = i;
producers[i] = new Thread(() -> {
try {
for (int j = 1; j <= 3; j++) {
buffer.put("P" + id + "-" + j);
Thread.sleep(800);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Producer-" + i);
consumers[i] = new Thread(() -> {
try {
for (int j = 1; j <= 3; j++) {
buffer.take();
Thread.sleep(1200);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Consumer-" + i);
}
// 启动所有线程
for (int i = 0; i < 2; i++) {
producers[i].start();
consumers[i].start();
}
// 等待所有线程完成
for (int i = 0; i < 2; i++) {
producers[i].join();
consumers[i].join();
}
System.out.println("程序执行完毕");
}
}
11.4 并发工具类
11.4.1 CountDownLatch
import java.util.concurrent.*;
// CountDownLatch示例:等待所有任务完成
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
int taskCount = 5;
CountDownLatch latch = new CountDownLatch(taskCount);
System.out.println("开始执行任务...");
// 创建并启动多个任务
for (int i = 1; i <= taskCount; i++) {
final int taskId = i;
new Thread(() -> {
try {
System.out.println("任务" + taskId + "开始执行");
Thread.sleep((int)(Math.random() * 3000) + 1000); // 随机执行时间
System.out.println("任务" + taskId + "执行完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown(); // 任务完成,计数器减1
}
}, "Task-" + taskId).start();
}
System.out.println("等待所有任务完成...");
latch.await(); // 等待计数器归零
System.out.println("所有任务已完成!");
}
}
// 实际应用:服务启动协调
class ServiceManager {
private final CountDownLatch startupLatch;
private final String[] services = {"数据库", "缓存", "消息队列", "Web服务", "监控服务"};
public ServiceManager() {
this.startupLatch = new CountDownLatch(services.length);
}
public void startAllServices() {
System.out.println("开始启动所有服务...");
for (String service : services) {
new Thread(() -> {
try {
System.out.println(service + " 正在启动...");
Thread.sleep((int)(Math.random() * 2000) + 500);
System.out.println(service + " 启动完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println(service + " 启动失败");
} finally {
startupLatch.countDown();
}
}, service + "-Thread").start();
}
}
public void waitForAllServices() throws InterruptedException {
startupLatch.await();
System.out.println("所有服务启动完成,系统就绪!");
}
public static void main(String[] args) throws InterruptedException {
ServiceManager manager = new ServiceManager();
manager.startAllServices();
manager.waitForAllServices();
}
}
11.4.2 CyclicBarrier
import java.util.concurrent.*;
// CyclicBarrier示例:多线程协同工作
public class CyclicBarrierDemo {
public static void main(String[] args) {
int playerCount = 4;
CyclicBarrier barrier = new CyclicBarrier(playerCount, () -> {
System.out.println("所有玩家准备就绪,游戏开始!\n");
});
// 创建玩家线程
for (int i = 1; i <= playerCount; i++) {
final int playerId = i;
new Thread(() -> {
try {
// 第一阶段:加载游戏资源
System.out.println("玩家" + playerId + " 正在加载游戏资源...");
Thread.sleep((int)(Math.random() * 2000) + 1000);
System.out.println("玩家" + playerId + " 资源加载完成");
barrier.await(); // 等待所有玩家加载完成
// 第二阶段:选择角色
System.out.println("玩家" + playerId + " 正在选择角色...");
Thread.sleep((int)(Math.random() * 1500) + 500);
System.out.println("玩家" + playerId + " 角色选择完成");
barrier.await(); // 等待所有玩家选择角色
// 第三阶段:进入游戏
System.out.println("玩家" + playerId + " 进入游戏世界");
} catch (InterruptedException | BrokenBarrierException e) {
System.out.println("玩家" + playerId + " 遇到错误: " + e.getMessage());
}
}, "Player-" + playerId).start();
}
}
}
// 实际应用:并行计算
class ParallelCalculation {
private final int[] data;
private final int[] results;
private final CyclicBarrier barrier;
private final int threadCount;
public ParallelCalculation(int[] data, int threadCount) {
this.data = data;
this.results = new int[data.length];
this.threadCount = threadCount;
this.barrier = new CyclicBarrier(threadCount, () -> {
System.out.println("所有线程计算完成,开始汇总结果");
});
}
public void calculate() throws InterruptedException {
int chunkSize = data.length / threadCount;
for (int i = 0; i < threadCount; i++) {
final int threadId = i;
final int start = i * chunkSize;
final int end = (i == threadCount - 1) ? data.length : (i + 1) * chunkSize;
new Thread(() -> {
try {
System.out.println("线程" + threadId + " 开始计算范围 [" + start + ", " + end + ")");
// 执行计算(这里简单地将每个元素乘以2)
for (int j = start; j < end; j++) {
results[j] = data[j] * 2;
Thread.sleep(10); // 模拟计算时间
}
System.out.println("线程" + threadId + " 计算完成");
barrier.await(); // 等待所有线程完成
// 所有线程都完成后,可以进行后续处理
if (threadId == 0) {
int sum = 0;
for (int result : results) {
sum += result;
}
System.out.println("计算结果总和: " + sum);
}
} catch (InterruptedException | BrokenBarrierException e) {
System.out.println("线程" + threadId + " 遇到错误: " + e.getMessage());
}
}, "CalcThread-" + threadId).start();
}
}
public static void main(String[] args) throws InterruptedException {
int[] data = new int[20];
for (int i = 0; i < data.length; i++) {
data[i] = i + 1;
}
ParallelCalculation calc = new ParallelCalculation(data, 4);
calc.calculate();
Thread.sleep(5000); // 等待计算完成
}
}
11.4.3 Semaphore
import java.util.concurrent.*;
// Semaphore示例:限制资源访问
public class SemaphoreDemo {
public static void main(String[] args) {
// 模拟停车场,只有3个停车位
Semaphore parkingLot = new Semaphore(3);
// 创建10辆车尝试停车
for (int i = 1; i <= 10; i++) {
final int carId = i;
new Thread(() -> {
try {
System.out.println("车辆" + carId + " 到达停车场");
parkingLot.acquire(); // 获取停车位
System.out.println("车辆" + carId + " 成功停车,剩余车位: " +
parkingLot.availablePermits());
// 模拟停车时间
Thread.sleep((int)(Math.random() * 3000) + 1000);
System.out.println("车辆" + carId + " 离开停车场");
parkingLot.release(); // 释放停车位
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Car-" + carId).start();
}
}
}
// 实际应用:数据库连接池
class DatabaseConnectionPool {
private final Semaphore connections;
private final String[] connectionPool;
private final boolean[] used;
public DatabaseConnectionPool(int poolSize) {
this.connections = new Semaphore(poolSize);
this.connectionPool = new String[poolSize];
this.used = new boolean[poolSize];
// 初始化连接池
for (int i = 0; i < poolSize; i++) {
connectionPool[i] = "Connection-" + (i + 1);
}
}
public String getConnection() throws InterruptedException {
connections.acquire(); // 获取连接许可
synchronized (this) {
for (int i = 0; i < connectionPool.length; i++) {
if (!used[i]) {
used[i] = true;
System.out.println(Thread.currentThread().getName() +
" 获取连接: " + connectionPool[i]);
return connectionPool[i];
}
}
}
return null; // 理论上不会到达这里
}
public void releaseConnection(String connection) {
synchronized (this) {
for (int i = 0; i < connectionPool.length; i++) {
if (connectionPool[i].equals(connection) && used[i]) {
used[i] = false;
System.out.println(Thread.currentThread().getName() +
" 释放连接: " + connection);
break;
}
}
}
connections.release(); // 释放连接许可
}
public int getAvailableConnections() {
return connections.availablePermits();
}
public static void main(String[] args) throws InterruptedException {
DatabaseConnectionPool pool = new DatabaseConnectionPool(3);
// 创建多个线程模拟数据库操作
for (int i = 1; i <= 8; i++) {
final int taskId = i;
new Thread(() -> {
try {
String connection = pool.getConnection();
// 模拟数据库操作
System.out.println("任务" + taskId + " 正在执行数据库操作...");
Thread.sleep((int)(Math.random() * 2000) + 1000);
System.out.println("任务" + taskId + " 数据库操作完成");
pool.releaseConnection(connection);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Task-" + taskId).start();
}
// 监控连接池状态
new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
Thread.sleep(1000);
System.out.println("[监控] 可用连接数: " + pool.getAvailableConnections());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Monitor").start();
}
}
11.5 线程池
11.5.1 线程池概念
线程池是一种多线程处理形式,它预先创建了若干个线程,这些线程在没有任务处理时处于等待状态,当有任务来临时就可以获取一个线程来处理这个任务,任务处理完后线程并不会被销毁,而是等待下一个任务。
线程池的优势: 1. 降低资源消耗:重复利用已创建的线程,降低线程创建和销毁造成的消耗 2. 提高响应速度:任务到达时,无需等待线程创建就能立即执行 3. 提高线程的可管理性:统一分配、调优和监控 4. 提供更多功能:定时执行、周期执行等
11.5.2 ExecutorService接口
import java.util.concurrent.*;
import java.util.*;
// ExecutorService基本使用
public class ExecutorServiceDemo {
public static void main(String[] args) throws InterruptedException, ExecutionException {
// 创建固定大小的线程池
ExecutorService executor = Executors.newFixedThreadPool(3);
System.out.println("提交任务到线程池...");
// 提交Runnable任务
for (int i = 1; i <= 5; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("任务" + taskId + " 开始执行 - " +
Thread.currentThread().getName());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("任务" + taskId + " 执行完成");
});
}
// 提交Callable任务
List<Future<Integer>> futures = new ArrayList<>();
for (int i = 1; i <= 3; i++) {
final int num = i;
Future<Integer> future = executor.submit(() -> {
System.out.println("计算任务" + num + " 开始 - " +
Thread.currentThread().getName());
Thread.sleep(1000);
int result = num * num;
System.out.println("计算任务" + num + " 完成,结果: " + result);
return result;
});
futures.add(future);
}
// 获取Callable任务的结果
System.out.println("\n获取计算结果:");
for (int i = 0; i < futures.size(); i++) {
Integer result = futures.get(i).get();
System.out.println("计算任务" + (i + 1) + " 结果: " + result);
}
// 关闭线程池
executor.shutdown();
// 等待所有任务完成
if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
System.out.println("强制关闭线程池");
executor.shutdownNow();
}
System.out.println("所有任务完成,程序结束");
}
}
11.5.3 不同类型的线程池
import java.util.concurrent.*;
// 不同类型线程池的演示
public class ThreadPoolTypesDemo {
// 固定大小线程池
public static void fixedThreadPoolDemo() {
System.out.println("=== 固定大小线程池演示 ===");
ExecutorService executor = Executors.newFixedThreadPool(2);
for (int i = 1; i <= 5; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("Fixed Pool - 任务" + taskId + " - " +
Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
executor.shutdown();
}
// 缓存线程池
public static void cachedThreadPoolDemo() {
System.out.println("\n=== 缓存线程池演示 ===");
ExecutorService executor = Executors.newCachedThreadPool();
for (int i = 1; i <= 5; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("Cached Pool - 任务" + taskId + " - " +
Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
executor.shutdown();
}
// 单线程线程池
public static void singleThreadExecutorDemo() {
System.out.println("\n=== 单线程线程池演示 ===");
ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 1; i <= 5; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("Single Thread - 任务" + taskId + " - " +
Thread.currentThread().getName());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
executor.shutdown();
}
// 定时任务线程池
public static void scheduledThreadPoolDemo() throws InterruptedException {
System.out.println("\n=== 定时任务线程池演示 ===");
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
// 延迟执行
executor.schedule(() -> {
System.out.println("延迟任务执行 - " + Thread.currentThread().getName());
}, 2, TimeUnit.SECONDS);
// 固定频率执行
ScheduledFuture<?> future1 = executor.scheduleAtFixedRate(() -> {
System.out.println("固定频率任务 - " + Thread.currentThread().getName() +
" - " + System.currentTimeMillis());
}, 1, 1, TimeUnit.SECONDS);
// 固定延迟执行
ScheduledFuture<?> future2 = executor.scheduleWithFixedDelay(() -> {
System.out.println("固定延迟任务 - " + Thread.currentThread().getName());
try {
Thread.sleep(500); // 任务执行时间
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, 1, 1, TimeUnit.SECONDS);
// 运行5秒后取消定时任务
Thread.sleep(5000);
future1.cancel(true);
future2.cancel(true);
executor.shutdown();
}
public static void main(String[] args) throws InterruptedException {
fixedThreadPoolDemo();
Thread.sleep(3000);
cachedThreadPoolDemo();
Thread.sleep(3000);
singleThreadExecutorDemo();
Thread.sleep(3000);
scheduledThreadPoolDemo();
}
}
11.5.4 ThreadPoolExecutor详解
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
// 自定义ThreadPoolExecutor
public class CustomThreadPoolDemo {
// 自定义线程工厂
static class CustomThreadFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
public CustomThreadFactory(String namePrefix) {
this.namePrefix = namePrefix;
}
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, namePrefix + "-" + threadNumber.getAndIncrement());
thread.setDaemon(false);
thread.setPriority(Thread.NORM_PRIORITY);
return thread;
}
}
// 自定义拒绝策略
static class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("任务被拒绝: " + r.toString() +
", 线程池状态: 活跃线程=" + executor.getActiveCount() +
", 队列大小=" + executor.getQueue().size());
}
}
public static void main(String[] args) throws InterruptedException {
// 创建自定义线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, // 核心线程数
4, // 最大线程数
60L, // 空闲线程存活时间
TimeUnit.SECONDS, // 时间单位
new ArrayBlockingQueue<>(3), // 工作队列
new CustomThreadFactory("MyPool"), // 线程工厂
new CustomRejectedExecutionHandler() // 拒绝策略
);
// 监控线程池状态
Thread monitorThread = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
System.out.println(String.format(
"[监控] 核心线程数: %d, 当前线程数: %d, 活跃线程数: %d, " +
"队列任务数: %d, 已完成任务数: %d",
executor.getCorePoolSize(),
executor.getPoolSize(),
executor.getActiveCount(),
executor.getQueue().size(),
executor.getCompletedTaskCount()
));
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
monitorThread.setDaemon(true);
monitorThread.start();
// 提交任务
for (int i = 1; i <= 10; i++) {
final int taskId = i;
try {
executor.submit(() -> {
System.out.println("任务" + taskId + " 开始执行 - " +
Thread.currentThread().getName());
try {
Thread.sleep(3000); // 模拟耗时任务
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("任务" + taskId + " 执行完成");
});
Thread.sleep(200); // 控制任务提交速度
} catch (RejectedExecutionException e) {
System.out.println("任务" + taskId + " 提交失败: " + e.getMessage());
}
}
// 等待任务完成
Thread.sleep(15000);
// 关闭线程池
executor.shutdown();
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
monitorThread.interrupt();
System.out.println("程序结束");
}
}
11.5.5 Future和CompletableFuture
import java.util.concurrent.*;
import java.util.*;
// Future基本使用
public class FutureDemo {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newFixedThreadPool(3);
// 提交Callable任务
Future<String> future1 = executor.submit(() -> {
Thread.sleep(2000);
return "任务1完成";
});
Future<String> future2 = executor.submit(() -> {
Thread.sleep(1000);
return "任务2完成";
});
Future<String> future3 = executor.submit(() -> {
Thread.sleep(3000);
return "任务3完成";
});
System.out.println("任务已提交,开始等待结果...");
// 检查任务状态
System.out.println("任务1是否完成: " + future1.isDone());
System.out.println("任务2是否完成: " + future2.isDone());
System.out.println("任务3是否完成: " + future3.isDone());
// 获取结果(阻塞等待)
System.out.println("结果1: " + future1.get());
System.out.println("结果2: " + future2.get());
// 带超时的获取结果
try {
String result3 = future3.get(1, TimeUnit.SECONDS);
System.out.println("结果3: " + result3);
} catch (TimeoutException e) {
System.out.println("任务3超时,取消任务");
future3.cancel(true);
}
executor.shutdown();
}
}
// CompletableFuture演示
class CompletableFutureDemo {
public static void main(String[] args) throws InterruptedException, ExecutionException {
System.out.println("=== CompletableFuture基本使用 ===");
// 创建已完成的Future
CompletableFuture<String> completedFuture = CompletableFuture.completedFuture("Hello");
System.out.println("已完成的Future: " + completedFuture.get());
// 异步执行
CompletableFuture<String> asyncFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "异步任务完成";
});
System.out.println("异步任务结果: " + asyncFuture.get());
// 链式操作
CompletableFuture<String> chainFuture = CompletableFuture
.supplyAsync(() -> {
System.out.println("第一步: 获取数据");
return "原始数据";
})
.thenApply(data -> {
System.out.println("第二步: 处理数据 - " + data);
return data.toUpperCase();
})
.thenApply(data -> {
System.out.println("第三步: 格式化数据 - " + data);
return "[" + data + "]";
});
System.out.println("链式操作结果: " + chainFuture.get());
// 组合多个Future
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Future1";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(800);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Future2";
});
// 等待所有完成
CompletableFuture<String> combinedFuture = future1.thenCombine(future2,
(result1, result2) -> result1 + " + " + result2);
System.out.println("组合结果: " + combinedFuture.get());
// 任意一个完成
CompletableFuture<String> anyFuture = future1.applyToEither(future2,
result -> "最快完成: " + result);
System.out.println("最快结果: " + anyFuture.get());
}
}
11.6 并发集合
11.6.1 ConcurrentHashMap
import java.util.concurrent.*;
import java.util.*;
// ConcurrentHashMap演示
public class ConcurrentHashMapDemo {
public static void main(String[] args) throws InterruptedException {
// 对比HashMap和ConcurrentHashMap的线程安全性
testThreadSafety();
// ConcurrentHashMap的特殊方法
demonstrateSpecialMethods();
}
public static void testThreadSafety() throws InterruptedException {
System.out.println("=== 线程安全性测试 ===");
// 使用HashMap(线程不安全)
Map<String, Integer> hashMap = new HashMap<>();
// 使用ConcurrentHashMap(线程安全)
Map<String, Integer> concurrentMap = new ConcurrentHashMap<>();
int threadCount = 10;
int operationsPerThread = 1000;
// 测试HashMap
Thread[] hashMapThreads = new Thread[threadCount];
for (int i = 0; i < threadCount; i++) {
final int threadId = i;
hashMapThreads[i] = new Thread(() -> {
for (int j = 0; j < operationsPerThread; j++) {
String key = "key" + (j % 100);
hashMap.put(key, hashMap.getOrDefault(key, 0) + 1);
}
});
}
long startTime = System.currentTimeMillis();
for (Thread thread : hashMapThreads) {
thread.start();
}
for (Thread thread : hashMapThreads) {
thread.join();
}
long hashMapTime = System.currentTimeMillis() - startTime;
// 测试ConcurrentHashMap
Thread[] concurrentMapThreads = new Thread[threadCount];
for (int i = 0; i < threadCount; i++) {
final int threadId = i;
concurrentMapThreads[i] = new Thread(() -> {
for (int j = 0; j < operationsPerThread; j++) {
String key = "key" + (j % 100);
concurrentMap.put(key, concurrentMap.getOrDefault(key, 0) + 1);
}
});
}
startTime = System.currentTimeMillis();
for (Thread thread : concurrentMapThreads) {
thread.start();
}
for (Thread thread : concurrentMapThreads) {
thread.join();
}
long concurrentMapTime = System.currentTimeMillis() - startTime;
System.out.println("HashMap大小: " + hashMap.size() + ", 耗时: " + hashMapTime + "ms");
System.out.println("ConcurrentHashMap大小: " + concurrentMap.size() + ", 耗时: " + concurrentMapTime + "ms");
// 验证数据一致性
int expectedSum = threadCount * operationsPerThread;
int hashMapSum = hashMap.values().stream().mapToInt(Integer::intValue).sum();
int concurrentMapSum = concurrentMap.values().stream().mapToInt(Integer::intValue).sum();
System.out.println("期望总和: " + expectedSum);
System.out.println("HashMap实际总和: " + hashMapSum + " (" +
(hashMapSum == expectedSum ? "正确" : "错误") + ")");
System.out.println("ConcurrentHashMap实际总和: " + concurrentMapSum + " (" +
(concurrentMapSum == expectedSum ? "正确" : "错误") + ")");
}
public static void demonstrateSpecialMethods() {
System.out.println("\n=== ConcurrentHashMap特殊方法演示 ===");
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// putIfAbsent
map.putIfAbsent("count", 0);
map.putIfAbsent("count", 10); // 不会覆盖
System.out.println("putIfAbsent结果: " + map.get("count"));
// replace
map.replace("count", 0, 1); // 原子性的条件替换
System.out.println("replace结果: " + map.get("count"));
// compute
map.compute("count", (key, value) -> value == null ? 1 : value + 1);
System.out.println("compute结果: " + map.get("count"));
// computeIfAbsent
map.computeIfAbsent("newKey", key -> key.length());
System.out.println("computeIfAbsent结果: " + map.get("newKey"));
// computeIfPresent
map.computeIfPresent("count", (key, value) -> value * 2);
System.out.println("computeIfPresent结果: " + map.get("count"));
// merge
map.merge("count", 5, Integer::sum);
System.out.println("merge结果: " + map.get("count"));
System.out.println("最终map内容: " + map);
}
}
11.6.2 其他并发集合
import java.util.concurrent.*;
import java.util.*;
// 其他并发集合演示
public class ConcurrentCollectionsDemo {
// CopyOnWriteArrayList演示
public static void copyOnWriteArrayListDemo() throws InterruptedException {
System.out.println("=== CopyOnWriteArrayList演示 ===");
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
// 添加初始数据
list.add("Item1");
list.add("Item2");
list.add("Item3");
// 读线程
Thread readerThread = new Thread(() -> {
for (int i = 0; i < 10; i++) {
System.out.println("读取: " + list);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}, "Reader");
// 写线程
Thread writerThread = new Thread(() -> {
for (int i = 4; i <= 8; i++) {
list.add("Item" + i);
System.out.println("添加: Item" + i);
try {
Thread.sleep(800);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}, "Writer");
readerThread.start();
writerThread.start();
readerThread.join();
writerThread.join();
System.out.println("最终列表: " + list);
}
// BlockingQueue演示
public static void blockingQueueDemo() throws InterruptedException {
System.out.println("\n=== BlockingQueue演示 ===");
BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
// 生产者
Thread producer = new Thread(() -> {
try {
for (int i = 1; i <= 5; i++) {
String item = "Product" + i;
queue.put(item); // 阻塞式添加
System.out.println("生产: " + item + ", 队列大小: " + queue.size());
Thread.sleep(1000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Producer");
// 消费者
Thread consumer = new Thread(() -> {
try {
for (int i = 1; i <= 5; i++) {
String item = queue.take(); // 阻塞式获取
System.out.println("消费: " + item + ", 队列大小: " + queue.size());
Thread.sleep(1500);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Consumer");
producer.start();
consumer.start();
producer.join();
consumer.join();
}
// ConcurrentLinkedQueue演示
public static void concurrentLinkedQueueDemo() throws InterruptedException {
System.out.println("\n=== ConcurrentLinkedQueue演示 ===");
ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
// 多个生产者
Thread[] producers = new Thread[3];
for (int i = 0; i < 3; i++) {
final int producerId = i;
producers[i] = new Thread(() -> {
for (int j = 1; j <= 5; j++) {
int value = producerId * 10 + j;
queue.offer(value);
System.out.println("生产者" + producerId + " 添加: " + value);
try {
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}, "Producer-" + i);
}
// 多个消费者
Thread[] consumers = new Thread[2];
for (int i = 0; i < 2; i++) {
final int consumerId = i;
consumers[i] = new Thread(() -> {
for (int j = 0; j < 8; j++) {
Integer value = queue.poll();
if (value != null) {
System.out.println("消费者" + consumerId + " 获取: " + value);
} else {
System.out.println("消费者" + consumerId + " 队列为空");
j--; // 重试
}
try {
Thread.sleep(300);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}, "Consumer-" + i);
}
// 启动所有线程
for (Thread producer : producers) {
producer.start();
}
for (Thread consumer : consumers) {
consumer.start();
}
// 等待所有线程完成
for (Thread producer : producers) {
producer.join();
}
for (Thread consumer : consumers) {
consumer.join();
}
System.out.println("剩余队列元素: " + queue);
}
public static void main(String[] args) throws InterruptedException {
copyOnWriteArrayListDemo();
Thread.sleep(1000);
blockingQueueDemo();
Thread.sleep(1000);
concurrentLinkedQueueDemo();
}
}
11.7 原子类
11.7.1 基本原子类
import java.util.concurrent.atomic.*;
import java.util.concurrent.*;
// 原子类演示
public class AtomicDemo {
// 对比普通变量和原子变量
public static void compareAtomicAndNormal() throws InterruptedException {
System.out.println("=== 原子类与普通变量对比 ===");
// 普通变量
class NormalCounter {
private volatile int count = 0;
public void increment() {
count++; // 非原子操作
}
public int getCount() {
return count;
}
}
// 原子变量
class AtomicCounter {
private AtomicInteger count = new AtomicInteger(0);
public void increment() {
count.incrementAndGet(); // 原子操作
}
public int getCount() {
return count.get();
}
}
NormalCounter normalCounter = new NormalCounter();
AtomicCounter atomicCounter = new AtomicCounter();
int threadCount = 10;
int incrementsPerThread = 1000;
// 测试普通计数器
Thread[] normalThreads = new Thread[threadCount];
for (int i = 0; i < threadCount; i++) {
normalThreads[i] = new Thread(() -> {
for (int j = 0; j < incrementsPerThread; j++) {
normalCounter.increment();
}
});
}
long startTime = System.currentTimeMillis();
for (Thread thread : normalThreads) {
thread.start();
}
for (Thread thread : normalThreads) {
thread.join();
}
long normalTime = System.currentTimeMillis() - startTime;
// 测试原子计数器
Thread[] atomicThreads = new Thread[threadCount];
for (int i = 0; i < threadCount; i++) {
atomicThreads[i] = new Thread(() -> {
for (int j = 0; j < incrementsPerThread; j++) {
atomicCounter.increment();
}
});
}
startTime = System.currentTimeMillis();
for (Thread thread : atomicThreads) {
thread.start();
}
for (Thread thread : atomicThreads) {
thread.join();
}
long atomicTime = System.currentTimeMillis() - startTime;
int expected = threadCount * incrementsPerThread;
System.out.println("期望结果: " + expected);
System.out.println("普通计数器: " + normalCounter.getCount() +
" (" + (normalCounter.getCount() == expected ? "正确" : "错误") +
"), 耗时: " + normalTime + "ms");
System.out.println("原子计数器: " + atomicCounter.getCount() +
" (" + (atomicCounter.getCount() == expected ? "正确" : "错误") +
"), 耗时: " + atomicTime + "ms");
}
// 原子类的各种操作
public static void demonstrateAtomicOperations() {
System.out.println("\n=== 原子类操作演示 ===");
AtomicInteger atomicInt = new AtomicInteger(10);
AtomicLong atomicLong = new AtomicLong(100L);
AtomicBoolean atomicBoolean = new AtomicBoolean(false);
AtomicReference<String> atomicRef = new AtomicReference<>("Hello");
// AtomicInteger操作
System.out.println("AtomicInteger初始值: " + atomicInt.get());
System.out.println("incrementAndGet: " + atomicInt.incrementAndGet());
System.out.println("getAndIncrement: " + atomicInt.getAndIncrement());
System.out.println("当前值: " + atomicInt.get());
System.out.println("addAndGet(5): " + atomicInt.addAndGet(5));
System.out.println("compareAndSet(17, 20): " + atomicInt.compareAndSet(17, 20));
System.out.println("当前值: " + atomicInt.get());
// AtomicBoolean操作
System.out.println("\nAtomicBoolean初始值: " + atomicBoolean.get());
System.out.println("compareAndSet(false, true): " + atomicBoolean.compareAndSet(false, true));
System.out.println("getAndSet(false): " + atomicBoolean.getAndSet(false));
System.out.println("当前值: " + atomicBoolean.get());
// AtomicReference操作
System.out.println("\nAtomicReference初始值: " + atomicRef.get());
System.out.println("compareAndSet(\"Hello\", \"World\"): " +
atomicRef.compareAndSet("Hello", "World"));
System.out.println("当前值: " + atomicRef.get());
// 使用updateAndGet
atomicInt.updateAndGet(value -> value * 2);
System.out.println("\nupdateAndGet(value -> value * 2): " + atomicInt.get());
// 使用accumulateAndGet
int result = atomicInt.accumulateAndGet(10, Integer::sum);
System.out.println("accumulateAndGet(10, Integer::sum): " + result);
}
public static void main(String[] args) throws InterruptedException {
compareAtomicAndNormal();
demonstrateAtomicOperations();
}
}
11.8 线程安全的最佳实践
11.8.1 线程安全策略
// 线程安全的最佳实践
public class ThreadSafetyBestPractices {
// 1. 不可变对象
public static final class ImmutablePerson {
private final String name;
private final int age;
private final List<String> hobbies;
public ImmutablePerson(String name, int age, List<String> hobbies) {
this.name = name;
this.age = age;
// 防御性复制
this.hobbies = Collections.unmodifiableList(new ArrayList<>(hobbies));
}
public String getName() { return name; }
public int getAge() { return age; }
public List<String> getHobbies() { return hobbies; }
@Override
public String toString() {
return "ImmutablePerson{name='" + name + "', age=" + age +
", hobbies=" + hobbies + "}";
}
}
// 2. 线程局部变量
private static final ThreadLocal<SimpleDateFormat> DATE_FORMAT =
ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
public static String formatDate(Date date) {
return DATE_FORMAT.get().format(date);
}
// 3. 同步工具类的使用
public static class SafeResourceManager {
private final Semaphore semaphore;
private final List<String> resources;
private final AtomicInteger usageCount = new AtomicInteger(0);
public SafeResourceManager(List<String> resources) {
this.resources = new ArrayList<>(resources);
this.semaphore = new Semaphore(resources.size());
}
public String acquireResource() throws InterruptedException {
semaphore.acquire();
synchronized (resources) {
if (!resources.isEmpty()) {
String resource = resources.remove(resources.size() - 1);
usageCount.incrementAndGet();
System.out.println(Thread.currentThread().getName() +
" 获取资源: " + resource);
return resource;
}
}
semaphore.release();
return null;
}
public void releaseResource(String resource) {
synchronized (resources) {
resources.add(resource);
System.out.println(Thread.currentThread().getName() +
" 释放资源: " + resource);
}
semaphore.release();
}
public int getUsageCount() {
return usageCount.get();
}
}
public static void main(String[] args) throws InterruptedException {
// 测试不可变对象
System.out.println("=== 不可变对象测试 ===");
List<String> hobbies = Arrays.asList("读书", "游泳", "编程");
ImmutablePerson person = new ImmutablePerson("张三", 25, hobbies);
System.out.println(person);
// 测试线程局部变量
System.out.println("\n=== ThreadLocal测试 ===");
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 0; i < 5; i++) {
executor.submit(() -> {
String formattedDate = formatDate(new Date());
System.out.println(Thread.currentThread().getName() + ": " + formattedDate);
});
}
// 测试资源管理
System.out.println("\n=== 安全资源管理测试 ===");
List<String> resourceList = Arrays.asList("资源1", "资源2", "资源3");
SafeResourceManager manager = new SafeResourceManager(resourceList);
for (int i = 0; i < 5; i++) {
final int taskId = i;
executor.submit(() -> {
try {
String resource = manager.acquireResource();
if (resource != null) {
Thread.sleep(2000); // 模拟使用资源
manager.releaseResource(resource);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
Thread.sleep(8000);
System.out.println("总使用次数: " + manager.getUsageCount());
executor.shutdown();
}
}
11.9 本章小结
本章详细介绍了Java多线程编程的核心概念和实践技巧:
主要内容回顾
线程基础
- 线程与进程的区别
- 线程的生命周期和状态转换
- 三种创建线程的方式:继承Thread、实现Runnable、使用Callable
线程同步
- synchronized关键字的使用(同步方法和同步代码块)
- volatile关键字保证可见性
- Lock接口和ReentrantLock的高级功能
- ReadWriteLock读写分离锁
线程间通信
- wait()、notify()和notifyAll()方法
- Condition接口提供更灵活的等待/通知机制
- 生产者消费者模式的实现
并发工具类
- CountDownLatch:等待多个任务完成
- CyclicBarrier:多线程协同工作
- Semaphore:控制资源访问数量
线程池
- ExecutorService接口和各种线程池类型
- ThreadPoolExecutor的详细配置
- Future和CompletableFuture处理异步结果
并发集合
- ConcurrentHashMap:线程安全的哈希表
- CopyOnWriteArrayList:读多写少场景的列表
- BlockingQueue:阻塞队列的生产者消费者模式
原子类
- AtomicInteger、AtomicLong等基本原子类
- AtomicReference引用类型的原子操作
- CAS(Compare-And-Swap)操作原理
线程安全最佳实践
- 不可变对象设计
- ThreadLocal线程局部变量
- 合理使用同步工具
关键要点
- 线程安全:多线程环境下保证数据一致性的重要性
- 性能考虑:同步机制会带来性能开销,需要权衡
- 死锁预防:避免多个锁的循环等待
- 资源管理:正确关闭线程池和释放资源
- 异常处理:多线程环境下的异常传播和处理
实际应用场景
- Web服务器:处理并发请求
- 数据处理:并行计算和数据分析
- I/O操作:异步文件读写和网络通信
- 缓存系统:多线程访问共享缓存
- 任务调度:定时任务和后台处理
下一章预告
下一章我们将学习Java网络编程,包括: - Socket编程基础 - TCP和UDP通信 - NIO(非阻塞I/O) - 网络协议处理 - HTTP客户端和服务器开发
练习题
基础练习:实现一个线程安全的计数器,支持增加、减少和获取当前值操作。
生产者消费者:使用BlockingQueue实现一个文件处理系统,生产者读取文件列表,消费者处理文件内容。
线程池应用:创建一个图片处理服务,使用线程池并行处理多张图片的缩放操作。
并发工具:使用CountDownLatch和CyclicBarrier实现一个多阶段的数据处理流水线。
综合项目:设计一个简单的Web爬虫,使用多线程并发抓取网页内容,并将结果存储到线程安全的数据结构中。