本章概述
多线程和并发编程是现代应用程序开发的重要技能,特别是在需要处理大量数据、提高应用程序响应性和充分利用多核处理器的场景中。本章将深入探讨C#中的多线程和并发编程技术。
学习目标
- 理解线程的基本概念和生命周期
- 掌握Task和async/await异步编程模式
- 学习线程同步和并发控制技术
- 了解并行编程和PLINQ
- 掌握线程安全的集合和数据结构
- 学习并发设计模式和最佳实践
1. 线程基础
1.1 线程概念和生命周期
using System;
using System.Threading;
using System.Diagnostics;
/// <summary>
/// Demonstrates creating, starting and joining threads with the <see cref="Thread"/> class.
/// </summary>
public class ThreadBasicsDemo
{
    /// <summary>
    /// Prints details of the calling thread, then runs worker threads built from a method group,
    /// a lambda, a <see cref="ThreadStart"/> delegate and a <see cref="ParameterizedThreadStart"/>
    /// delegate, joining each of them before returning.
    /// </summary>
    public static void DemonstrateThreadBasics()
    {
        Console.WriteLine("=== 线程基础演示 ===");

        // Report what is known about the thread that invoked this method.
        Thread caller = Thread.CurrentThread;
        Console.WriteLine($"主线程ID: {caller.ManagedThreadId}");
        Console.WriteLine($"主线程名称: {caller.Name ?? "未命名"}");
        Console.WriteLine($"是否为后台线程: {caller.IsBackground}");
        Console.WriteLine($"线程状态: {caller.ThreadState}");
        Console.WriteLine($"处理器核心数: {Environment.ProcessorCount}");

        Console.WriteLine("\n--- 创建和启动线程 ---");

        // Approach 1: the Thread class, configured before Start.
        Thread foregroundWorker = new Thread(WorkerMethod);
        foregroundWorker.Name = "工作线程1";
        foregroundWorker.IsBackground = false; // foreground thread

        Thread backgroundWorker = new Thread(() => WorkerMethodWithParameter("线程2参数"));
        backgroundWorker.Name = "工作线程2";
        backgroundWorker.IsBackground = true; // background thread

        foregroundWorker.Start();
        backgroundWorker.Start();
        Console.WriteLine("线程已启动");

        // Block until the first worker ends; give the second one at most two seconds.
        foregroundWorker.Join();
        backgroundWorker.Join(2000);
        Console.WriteLine("主线程继续执行");

        // Approach 2: an explicit ThreadStart delegate.
        Console.WriteLine("\n--- 使用ThreadStart委托 ---");
        void CountThreeTicks()
        {
            int tick = 0;
            while (tick < 3)
            {
                Console.WriteLine($"ThreadStart线程: {Thread.CurrentThread.ManagedThreadId}, 计数: {tick}");
                Thread.Sleep(500);
                tick++;
            }
        }

        ThreadStart entryPoint = new ThreadStart(CountThreeTicks);
        Thread delegateThread = new Thread(entryPoint) { Name = "ThreadStart线程" };
        delegateThread.Start();
        delegateThread.Join();

        // Approach 3: ParameterizedThreadStart, with the argument supplied to Start.
        Console.WriteLine("\n--- 使用ParameterizedThreadStart ---");
        ParameterizedThreadStart parameterizedEntry = new ParameterizedThreadStart(ParameterizedWorker);
        Thread parameterizedThread = new Thread(parameterizedEntry) { Name = "参数化线程" };
        parameterizedThread.Start("Hello from parameterized thread!");
        parameterizedThread.Join();
    }

    /// <summary>
    /// Thread body without arguments: logs five progress steps, sleeping 300 ms before each next step.
    /// </summary>
    private static void WorkerMethod()
    {
        Thread self = Thread.CurrentThread;
        Console.WriteLine($"[{self.Name}] 开始执行,线程ID: {self.ManagedThreadId}");

        for (int step = 1; step <= 5; step++)
        {
            Console.WriteLine($"[{self.Name}] 工作中... {step}/5");
            Thread.Sleep(300); // simulated work
        }

        Console.WriteLine($"[{self.Name}] 执行完成");
    }

    /// <summary>
    /// Thread body that receives its argument through a capturing lambda: logs the argument and
    /// three progress steps 400 ms apart.
    /// </summary>
    /// <param name="parameter">Text echoed at the start of the run.</param>
    private static void WorkerMethodWithParameter(string parameter)
    {
        Thread self = Thread.CurrentThread;
        Console.WriteLine($"[{self.Name}] 接收到参数: {parameter}");

        for (int step = 1; step <= 3; step++)
        {
            Console.WriteLine($"[{self.Name}] 处理中... {step}/3");
            Thread.Sleep(400);
        }

        Console.WriteLine($"[{self.Name}] 参数处理完成");
    }

    /// <summary>
    /// Thread body matching <see cref="ParameterizedThreadStart"/>: logs the argument (as a string)
    /// and three progress steps 200 ms apart.
    /// </summary>
    /// <param name="parameter">
    /// The object handed to <see cref="Thread.Start(object)"/>; anything that is not a string is logged as empty.
    /// </param>
    private static void ParameterizedWorker(object parameter)
    {
        Thread self = Thread.CurrentThread;
        string text = parameter as string;
        Console.WriteLine($"[{self.Name}] 收到消息: {text}");

        for (int step = 1; step <= 3; step++)
        {
            Console.WriteLine($"[{self.Name}] 处理消息... {step}/3");
            Thread.Sleep(200);
        }

        Console.WriteLine($"[{self.Name}] 消息处理完成");
    }
}
1.2 线程池
/// <summary>
/// Demonstrates queuing work to the CLR thread pool and waiting for that work to finish.
/// </summary>
public class ThreadPoolDemo
{
    /// <summary>
    /// Prints the thread pool limits, then runs two batches of pool work items:
    /// ten plain work items awaited through a <see cref="CountdownEvent"/>, and five
    /// <see cref="WaitCallback"/> work items awaited through per-item <see cref="ManualResetEvent"/>s.
    /// </summary>
    public static void DemonstrateThreadPool()
    {
        Console.WriteLine("\n=== 线程池演示 ===");

        // Query the pool's configured limits and current availability.
        ThreadPool.GetMaxThreads(out int maxWorkerThreads, out int maxCompletionPortThreads);
        ThreadPool.GetMinThreads(out int minWorkerThreads, out int minCompletionPortThreads);
        ThreadPool.GetAvailableThreads(out int availableWorkerThreads, out int availableCompletionPortThreads);
        Console.WriteLine($"最大工作线程数: {maxWorkerThreads}");
        Console.WriteLine($"最大I/O完成端口线程数: {maxCompletionPortThreads}");
        Console.WriteLine($"最小工作线程数: {minWorkerThreads}");
        Console.WriteLine($"最小I/O完成端口线程数: {minCompletionPortThreads}");
        Console.WriteLine($"可用工作线程数: {availableWorkerThreads}");
        Console.WriteLine($"可用I/O完成端口线程数: {availableCompletionPortThreads}");

        Console.WriteLine("\n--- 使用线程池执行任务 ---");
        const int queuedTaskCount = 10;
        var stopwatch = Stopwatch.StartNew();

        // Approach 1: QueueUserWorkItem.
        // A CountdownEvent replaces polling a shared counter: the caller blocks without spinning,
        // and no field is read without synchronization.
        using (var pending = new CountdownEvent(queuedTaskCount))
        {
            for (int i = 0; i < queuedTaskCount; i++)
            {
                int taskId = i; // per-iteration copy so each closure sees its own id
                ThreadPool.QueueUserWorkItem(_ =>
                {
                    try
                    {
                        ThreadPoolWorker(taskId);
                    }
                    finally
                    {
                        // Signal even if the worker throws, so the waiter can never hang.
                        pending.Signal();
                    }
                });
            }

            // Block until every queued work item has signalled.
            pending.Wait();
        }

        stopwatch.Stop();
        Console.WriteLine($"\n所有线程池任务完成,耗时: {stopwatch.ElapsedMilliseconds} 毫秒");

        Console.WriteLine("\n--- 使用WaitCallback委托 ---");
        const int callbackTaskCount = 5;
        var waitHandles = new ManualResetEvent[callbackTaskCount];
        for (int i = 0; i < callbackTaskCount; i++)
        {
            waitHandles[i] = new ManualResetEvent(false);
            var workItemState = new TaskData { TaskId = i, WaitHandle = waitHandles[i] };

            // The state object travels through the WaitCallback's single object parameter.
            ThreadPool.QueueUserWorkItem(new WaitCallback(state =>
            {
                var data = (TaskData)state;
                ThreadPoolWorkerWithCallback(data.TaskId, data.WaitHandle);
            }), workItemState);
        }

        // Block until every handle has been set.
        WaitHandle.WaitAll(waitHandles);
        Console.WriteLine("所有WaitCallback任务完成");

        // Release the handles only after all workers are known to be done with them.
        foreach (var handle in waitHandles)
        {
            handle.Dispose();
        }
    }

    /// <summary>
    /// Work item body: logs the executing thread's identity, sleeps 500-1499 ms and logs completion.
    /// </summary>
    /// <param name="taskId">Identifier printed in the log lines.</param>
    private static void ThreadPoolWorker(int taskId)
    {
        var threadId = Thread.CurrentThread.ManagedThreadId;
        var isBackground = Thread.CurrentThread.IsBackground;
        var isThreadPoolThread = Thread.CurrentThread.IsThreadPoolThread;
        Console.WriteLine($"任务 {taskId} 开始 - 线程ID: {threadId}, 后台线程: {isBackground}, 线程池线程: {isThreadPoolThread}");

        // Simulated work.
        Thread.Sleep(Random.Shared.Next(500, 1500));
        Console.WriteLine($"任务 {taskId} 完成 - 线程ID: {threadId}");
    }

    /// <summary>
    /// Work item body that reports completion by setting <paramref name="waitHandle"/>.
    /// </summary>
    /// <param name="taskId">Identifier printed in the log lines.</param>
    /// <param name="waitHandle">Event set when the work item ends, whether it succeeded or threw.</param>
    private static void ThreadPoolWorkerWithCallback(int taskId, ManualResetEvent waitHandle)
    {
        try
        {
            var threadId = Thread.CurrentThread.ManagedThreadId;
            Console.WriteLine($"回调任务 {taskId} 开始 - 线程ID: {threadId}");

            // Simulated work.
            Thread.Sleep(Random.Shared.Next(300, 800));
            Console.WriteLine($"回调任务 {taskId} 完成 - 线程ID: {threadId}");
        }
        finally
        {
            // Always notify the waiter; otherwise WaitAll would block forever on a failed item.
            waitHandle.Set();
        }
    }

    /// <summary>State object passed to a <see cref="WaitCallback"/> work item.</summary>
    private class TaskData
    {
        /// <summary>Identifier of the work item.</summary>
        public int TaskId { get; set; }

        /// <summary>Event the work item sets when it has finished.</summary>
        public ManualResetEvent WaitHandle { get; set; }
    }
}
2. Task和异步编程
2.1 Task基础
/// <summary>
/// Demonstrates the basic <see cref="Task"/> API: creation, status polling, continuations
/// and the <c>WhenAll</c>/<c>WhenAny</c> combinators.
/// </summary>
public class TaskBasicsDemo
{
    /// <summary>
    /// Starts tasks in three different ways, awaits their results, then polls the status of a
    /// longer-running task until it completes.
    /// </summary>
    public static async Task DemonstrateTaskBasics()
    {
        Console.WriteLine("\n=== Task基础演示 ===");
        Console.WriteLine("\n--- 创建和启动Task ---");

        // Approach 1: Task.Run.
        Task<string> viaRun = Task.Run(() =>
        {
            Console.WriteLine($"Task.Run - 线程ID: {Thread.CurrentThread.ManagedThreadId}");
            Thread.Sleep(1000);
            return "Task.Run 完成";
        });

        // Approach 2: the Task constructor followed by an explicit Start.
        Task viaConstructor = new Task(() =>
        {
            Console.WriteLine($"Task构造函数 - 线程ID: {Thread.CurrentThread.ManagedThreadId}");
            Thread.Sleep(800);
        });
        viaConstructor.Start();

        // Approach 3: Task.Factory.StartNew.
        Task<int> viaFactory = Task.Factory.StartNew(() =>
        {
            Console.WriteLine($"Task.Factory.StartNew - 线程ID: {Thread.CurrentThread.ManagedThreadId}");
            Thread.Sleep(600);
            return 42;
        });

        Console.WriteLine("所有Task已启动");

        // Await the three tasks in creation order.
        string runResult = await viaRun;
        Console.WriteLine($"Task1 结果: {runResult}");

        await viaConstructor;
        Console.WriteLine("Task2 完成");

        int factoryResult = await viaFactory;
        Console.WriteLine($"Task3 结果: {factoryResult}");

        Console.WriteLine("\n--- Task状态监控 ---");
        Task<string> observed = Task.Run(async () =>
        {
            Console.WriteLine("长时间运行的任务开始");
            for (int percent = 20; percent <= 100; percent += 20)
            {
                await Task.Delay(500);
                Console.WriteLine($"进度: {percent}%");
            }
            return "长时间任务完成";
        });

        // Poll the status every 200 ms until the task is done.
        while (!observed.IsCompleted)
        {
            Console.WriteLine($"任务状态: {observed.Status}");
            await Task.Delay(200);
        }

        string observedResult = await observed;
        Console.WriteLine($"监控任务结果: {observedResult}");
        Console.WriteLine($"最终状态: {observed.Status}");
    }

    /// <summary>
    /// Shows <c>ContinueWith</c>: first a single continuation, then a three-step chain in which
    /// each step consumes the previous step's result.
    /// </summary>
    public static async Task DemonstrateTaskContinuation()
    {
        Console.WriteLine("\n--- Task延续演示 ---");

        Task<int> seed = Task.Run(() =>
        {
            Console.WriteLine("初始任务执行中...");
            Thread.Sleep(1000);
            return 10;
        });

        // A single continuation that doubles the antecedent's result.
        Task<int> doubled = seed.ContinueWith(antecedent =>
        {
            Console.WriteLine($"延续任务接收到结果: {antecedent.Result}");
            return antecedent.Result * 2;
        });

        int finalResult = await doubled;
        Console.WriteLine($"最终结果: {finalResult}");

        // A chain of continuations, spelled out one link at a time.
        Console.WriteLine("\n--- 链式延续 ---");
        Task<int> stepOne = Task.Run(() =>
        {
            Console.WriteLine("步骤1: 初始计算");
            return 5;
        });
        Task<int> stepTwo = stepOne.ContinueWith(task =>
        {
            Console.WriteLine($"步骤2: 接收 {task.Result},计算平方");
            return task.Result * task.Result;
        });
        Task<int> stepThree = stepTwo.ContinueWith(task =>
        {
            Console.WriteLine($"步骤3: 接收 {task.Result},加10");
            return task.Result + 10;
        });

        int chainResult = await stepThree;
        Console.WriteLine($"链式计算结果: {chainResult}");
    }

    /// <summary>
    /// Shows <see cref="Task.WhenAll(System.Collections.Generic.IEnumerable{Task})"/> on five
    /// randomly delayed tasks, then <c>Task.WhenAny</c> on three more, finally awaiting the stragglers.
    /// </summary>
    public static async Task DemonstrateTaskCombination()
    {
        Console.WriteLine("\n--- Task组合演示 ---");

        // Start five tasks; ToList forces them to be created immediately, in order.
        List<Task<int>> tasks = Enumerable.Range(0, 5)
            .Select(taskId => Task.Run(async () =>
            {
                int delay = Random.Shared.Next(500, 2000);
                Console.WriteLine($"任务 {taskId} 开始,预计耗时 {delay}ms");
                await Task.Delay(delay);
                Console.WriteLine($"任务 {taskId} 完成");
                return taskId * 10;
            }))
            .ToList();

        // Task.WhenAll: wait for every task.
        Console.WriteLine("\n使用 Task.WhenAll 等待所有任务完成...");
        Stopwatch timer = Stopwatch.StartNew();
        int[] allResults = await Task.WhenAll(tasks);
        timer.Stop();
        Console.WriteLine($"所有任务完成,耗时: {timer.ElapsedMilliseconds}ms");
        Console.WriteLine($"结果: [{string.Join(", ", allResults)}]");

        // Task.WhenAny: wait for whichever task finishes first.
        Console.WriteLine("\n--- Task.WhenAny演示 ---");
        List<Task<string>> newTasks = Enumerable.Range(0, 3)
            .Select(taskId => Task.Run(async () =>
            {
                int delay = Random.Shared.Next(1000, 3000);
                await Task.Delay(delay);
                return $"任务{taskId}完成";
            }))
            .ToList();

        Task<string> winner = await Task.WhenAny(newTasks);
        string firstResult = await winner;
        Console.WriteLine($"第一个完成的任务结果: {firstResult}");

        // Let the remaining tasks run to completion as well.
        IEnumerable<Task<string>> stragglers = newTasks.Where(candidate => candidate != winner);
        await Task.WhenAll(stragglers);
        Console.WriteLine("所有剩余任务也已完成");
    }
}
2.2 async/await异步编程模式
/// <summary>
/// Demonstrates the async/await pattern: sequential versus concurrent awaits, exception
/// handling, asynchronous streams and <c>ConfigureAwait</c>.
/// </summary>
public class AsyncAwaitDemo
{
    /// <summary>
    /// Awaits two operations one after another, then times three operations run sequentially
    /// against the same three run concurrently via <c>Task.WhenAll</c>.
    /// </summary>
    public static async Task DemonstrateAsyncAwait()
    {
        Console.WriteLine("\n=== async/await演示 ===");
        Console.WriteLine("\n--- 基础async/await ---");

        // Plain awaited calls.
        string first = await PerformAsyncOperation("操作1", 1000);
        Console.WriteLine($"结果1: {first}");
        string second = await PerformAsyncOperation("操作2", 1500);
        Console.WriteLine($"结果2: {second}");

        Console.WriteLine("\n--- 并发执行多个异步操作 ---");
        Stopwatch clock = Stopwatch.StartNew();

        // Sequential: each operation starts only after the previous one has completed.
        Console.WriteLine("串行执行:");
        long serialBegin = clock.ElapsedMilliseconds;
        await PerformAsyncOperation("串行操作1", 800);
        await PerformAsyncOperation("串行操作2", 600);
        await PerformAsyncOperation("串行操作3", 700);
        long serialTime = clock.ElapsedMilliseconds - serialBegin;
        Console.WriteLine($"串行执行完成,耗时: {serialTime}ms");

        // Concurrent: all three are started before any of them is awaited.
        Console.WriteLine("\n并发执行:");
        long concurrentBegin = clock.ElapsedMilliseconds;
        Task<string> pendingA = PerformAsyncOperation("并发操作1", 800);
        Task<string> pendingB = PerformAsyncOperation("并发操作2", 600);
        Task<string> pendingC = PerformAsyncOperation("并发操作3", 700);
        await Task.WhenAll(pendingA, pendingB, pendingC);
        long concurrentTime = clock.ElapsedMilliseconds - concurrentBegin;
        Console.WriteLine($"并发执行完成,耗时: {concurrentTime}ms");

        double speedup = (double)serialTime / concurrentTime;
        Console.WriteLine($"性能提升: {speedup:F2}x");
        clock.Stop();
    }

    /// <summary>
    /// Simulated asynchronous I/O: logs the thread id before and after a delay.
    /// </summary>
    /// <param name="operationName">Label used in the log lines and in the returned text.</param>
    /// <param name="delayMs">Length of the simulated operation in milliseconds.</param>
    /// <returns><paramref name="operationName"/> followed by the suffix "结果".</returns>
    private static async Task<string> PerformAsyncOperation(string operationName, int delayMs)
    {
        Console.WriteLine($"[线程{Thread.CurrentThread.ManagedThreadId}] {operationName} 开始");

        // Stand-in for an asynchronous I/O call.
        await Task.Delay(delayMs);

        // The continuation may be running on a different thread, so the id is read again.
        Console.WriteLine($"[线程{Thread.CurrentThread.ManagedThreadId}] {operationName} 完成");
        return $"{operationName}结果";
    }

    /// <summary>
    /// Shows try/catch around a single awaited operation (succeeding, then failing) and around
    /// <c>Task.WhenAll</c>, inspecting each task's status after the combined await throws.
    /// </summary>
    public static async Task DemonstrateAsyncExceptionHandling()
    {
        Console.WriteLine("\n--- 异步异常处理演示 ---");

        // Single operation that succeeds.
        try
        {
            string outcome = await PerformAsyncOperationWithException("正常操作", false);
            Console.WriteLine($"正常结果: {outcome}");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"捕获异常: {ex.Message}");
        }

        // Single operation that throws; the more specific handler is listed first.
        try
        {
            string outcome = await PerformAsyncOperationWithException("异常操作", true);
            Console.WriteLine($"异常结果: {outcome}");
        }
        catch (InvalidOperationException ex)
        {
            Console.WriteLine($"捕获特定异常: {ex.Message}");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"捕获通用异常: {ex.Message}");
        }

        // Several operations awaited together.
        Console.WriteLine("\n--- 多个异步操作的异常处理 ---");
        Task<string>[] tasks =
        {
            PerformAsyncOperationWithException("任务1", false),
            PerformAsyncOperationWithException("任务2", true),
            PerformAsyncOperationWithException("任务3", false)
        };

        try
        {
            string[] results = await Task.WhenAll(tasks);
            Console.WriteLine($"所有任务成功: {string.Join(", ", results)}");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Task.WhenAll 捕获异常: {ex.Message}");

            // Look at every task individually to see which failed and which succeeded.
            int ordinal = 0;
            foreach (Task<string> task in tasks)
            {
                ordinal++;
                Console.WriteLine($"任务{ordinal} 状态: {task.Status}");
                if (task.IsFaulted)
                {
                    Console.WriteLine($"  异常: {task.Exception?.InnerException?.Message}");
                }
                else if (task.IsCompletedSuccessfully)
                {
                    Console.WriteLine($"  结果: {task.Result}");
                }
            }
        }
    }

    /// <summary>
    /// Simulated operation that optionally fails after a 500 ms delay.
    /// </summary>
    /// <param name="operationName">Label used in log lines, the result and the exception message.</param>
    /// <param name="shouldThrow">When true, the operation throws instead of returning.</param>
    /// <returns><paramref name="operationName"/> followed by " 成功".</returns>
    /// <exception cref="InvalidOperationException">Thrown when <paramref name="shouldThrow"/> is true.</exception>
    private static async Task<string> PerformAsyncOperationWithException(string operationName, bool shouldThrow)
    {
        Console.WriteLine($"{operationName} 开始");
        await Task.Delay(500);

        if (!shouldThrow)
        {
            Console.WriteLine($"{operationName} 完成");
            return $"{operationName} 成功";
        }

        throw new InvalidOperationException($"{operationName} 发生异常");
    }

    /// <summary>
    /// Consumes a five-element asynchronous sequence with <c>await foreach</c>.
    /// </summary>
    public static async Task DemonstrateAsyncEnumerable()
    {
        Console.WriteLine("\n--- 异步可枚举演示 ---");

        await foreach (string element in GenerateAsyncSequence(5))
        {
            Console.WriteLine($"接收到项目: {element}");
        }

        Console.WriteLine("异步序列处理完成");
    }

    /// <summary>
    /// Yields <paramref name="count"/> labelled items, waiting 300 ms before each one.
    /// </summary>
    /// <param name="count">Number of items to produce.</param>
    /// <returns>An asynchronous stream of "项目-1", "项目-2", and so on.</returns>
    private static async IAsyncEnumerable<string> GenerateAsyncSequence(int count)
    {
        for (int ordinal = 1; ordinal <= count; ordinal++)
        {
            // Stand-in for asynchronously produced data.
            await Task.Delay(300);
            yield return $"项目-{ordinal}";
        }
    }

    /// <summary>
    /// Awaits the same operation once with <c>ConfigureAwait(false)</c> and once with the default.
    /// </summary>
    public static async Task DemonstrateConfigureAwait()
    {
        Console.WriteLine("\n--- ConfigureAwait演示 ---");

        // This matters in UI applications:
        // ConfigureAwait(false) can avoid deadlocks and improve performance.
        Console.WriteLine("使用 ConfigureAwait(false):");
        string withoutContext = await PerformAsyncOperation("ConfigureAwait(false)", 500).ConfigureAwait(false);
        Console.WriteLine($"结果: {withoutContext}");

        Console.WriteLine("\n使用默认 ConfigureAwait(true):");
        string withContext = await PerformAsyncOperation("默认ConfigureAwait", 500);
        Console.WriteLine($"结果: {withContext}");

        // Library code is usually advised to use ConfigureAwait(false);
        // application code usually keeps the default ConfigureAwait(true).
    }
}
### 2.3 取消令牌(CancellationToken)
```csharp
/// <summary>
/// Demonstrates cooperative cancellation with <see cref="CancellationTokenSource"/> and
/// <see cref="CancellationToken"/>: timeouts, manual cancellation, linked tokens and callbacks.
/// </summary>
public class CancellationTokenDemo
{
    /// <summary>
    /// Runs three scenarios: an operation cancelled by a timeout, an operation cancelled
    /// manually, and three operations sharing one token that fires part-way through.
    /// </summary>
    public static async Task DemonstrateCancellationToken()
    {
        Console.WriteLine("\n=== 取消令牌演示 ===");
        Console.WriteLine("\n--- 基础取消操作 ---");

        using var cts = new CancellationTokenSource();

        // Cancel automatically after five seconds.
        cts.CancelAfter(TimeSpan.FromSeconds(5));
        try
        {
            string outcome = await LongRunningOperation("长时间操作", 10000, cts.Token);
            Console.WriteLine($"操作完成: {outcome}");
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("操作被取消");
        }

        Console.WriteLine("\n--- 手动取消操作 ---");
        using var manualCts = new CancellationTokenSource();

        // Start the operation without awaiting it yet.
        Task<string> inFlight = LongRunningOperation("手动取消操作", 8000, manualCts.Token);

        // Cancel by hand after two seconds.
        await Task.Delay(2000);
        Console.WriteLine("发送取消信号...");
        manualCts.Cancel();

        try
        {
            string outcome = await inFlight;
            Console.WriteLine($"操作完成: {outcome}");
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("操作被手动取消");
        }

        Console.WriteLine("\n--- 多个操作的协调取消 ---");
        using var coordinatedCts = new CancellationTokenSource();
        Task<string>[] operations =
        {
            CoordinatedOperation("操作A", 3000, coordinatedCts.Token),
            CoordinatedOperation("操作B", 4000, coordinatedCts.Token),
            CoordinatedOperation("操作C", 5000, coordinatedCts.Token)
        };

        // Cancel whatever is still running after 3.5 seconds.
        coordinatedCts.CancelAfter(3500);

        // Maps a cancelled operation to a placeholder so WhenAll itself never throws for it.
        static async Task<string> ResultOrPlaceholder(Task<string> operation)
        {
            try
            {
                return await operation;
            }
            catch (OperationCanceledException)
            {
                return "已取消";
            }
        }

        string[] results = await Task.WhenAll(operations.Select(operation => ResultOrPlaceholder(operation)));
        Console.WriteLine($"协调操作结果: [{string.Join(", ", results)}]");
    }

    /// <summary>
    /// Simulated long operation split into ten equal steps, observing the token before and during each step.
    /// </summary>
    /// <param name="operationName">Label used in log lines and in the result.</param>
    /// <param name="durationMs">Total planned duration; each step takes a tenth of it (integer division).</param>
    /// <param name="cancellationToken">Token that aborts the operation.</param>
    /// <returns><paramref name="operationName"/> followed by " 成功".</returns>
    /// <exception cref="OperationCanceledException">Thrown when the token is cancelled.</exception>
    private static async Task<string> LongRunningOperation(string operationName, int durationMs, CancellationToken cancellationToken)
    {
        Console.WriteLine($"{operationName} 开始,预计耗时 {durationMs}ms");

        int stepDuration = durationMs / 10;
        for (int percent = 10; percent <= 100; percent += 10)
        {
            // Bail out before starting another step if cancellation was requested.
            cancellationToken.ThrowIfCancellationRequested();
            await Task.Delay(stepDuration, cancellationToken);
            Console.WriteLine($"{operationName} 进度: {percent}%");
        }

        Console.WriteLine($"{operationName} 完成");
        return $"{operationName} 成功";
    }

    /// <summary>
    /// Single cancellable delay that logs its own cancellation before rethrowing.
    /// </summary>
    /// <param name="operationName">Label used in log lines and in the result.</param>
    /// <param name="durationMs">Length of the delay in milliseconds.</param>
    /// <param name="cancellationToken">Token that aborts the delay.</param>
    /// <returns><paramref name="operationName"/> followed by " 成功".</returns>
    /// <exception cref="OperationCanceledException">Rethrown when the token is cancelled.</exception>
    private static async Task<string> CoordinatedOperation(string operationName, int durationMs, CancellationToken cancellationToken)
    {
        Console.WriteLine($"{operationName} 开始");
        try
        {
            await Task.Delay(durationMs, cancellationToken);
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine($"{operationName} 被取消");
            throw;
        }

        Console.WriteLine($"{operationName} 完成");
        return $"{operationName} 成功";
    }

    /// <summary>
    /// Links two sources with different timeouts into one token and reports which source had
    /// fired when the operation was cancelled.
    /// </summary>
    public static async Task DemonstrateCancellationTokenCombination()
    {
        Console.WriteLine("\n--- 取消令牌组合演示 ---");

        using var cts1 = new CancellationTokenSource();
        using var cts2 = new CancellationTokenSource();

        // The linked source is cancelled as soon as either parent token is.
        using var combinedCts = CancellationTokenSource.CreateLinkedTokenSource(cts1.Token, cts2.Token);

        // Different deadlines for the two parents.
        cts1.CancelAfter(3000); // after 3 seconds
        cts2.CancelAfter(5000); // after 5 seconds

        try
        {
            string outcome = await LongRunningOperation("组合取消操作", 8000, combinedCts.Token);
            Console.WriteLine($"操作完成: {outcome}");
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("操作被组合取消令牌取消");
            Console.WriteLine($"CTS1 是否取消: {cts1.Token.IsCancellationRequested}");
            Console.WriteLine($"CTS2 是否取消: {cts2.Token.IsCancellationRequested}");
        }
    }

    /// <summary>
    /// Registers two callbacks on a token (one with state), cancels a running operation after
    /// 2.5 seconds and then disposes the registrations.
    /// </summary>
    public static async Task DemonstrateCancellationCallback()
    {
        Console.WriteLine("\n--- 取消回调演示 ---");

        using var cts = new CancellationTokenSource();
        CancellationToken token = cts.Token;

        // Callbacks invoked when the token is cancelled.
        CancellationTokenRegistration plainRegistration = token.Register(() =>
        {
            Console.WriteLine("取消回调被调用 - 执行清理操作");
        });
        CancellationTokenRegistration statefulRegistration = token.Register(state =>
        {
            string note = state as string;
            Console.WriteLine($"带状态的取消回调: {note}");
        }, "清理资源完成");

        // Start the operation on the thread pool.
        Task<string> operationTask = Task.Run(async () =>
        {
            try
            {
                for (int percent = 10; percent <= 100; percent += 10)
                {
                    cts.Token.ThrowIfCancellationRequested();
                    await Task.Delay(500, cts.Token);
                    Console.WriteLine($"操作进度: {percent}%");
                }
                return "操作成功";
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("操作在回调演示中被取消");
                throw;
            }
        });

        // Cancel after 2.5 seconds.
        await Task.Delay(2500);
        cts.Cancel();

        try
        {
            string outcome = await operationTask;
            Console.WriteLine($"结果: {outcome}");
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("确认操作被取消");
        }

        // Remove the callback registrations.
        plainRegistration.Dispose();
        statefulRegistration.Dispose();
    }
}
```

## 3. 线程同步和并发控制
3.1 锁机制
/// <summary>
/// Demonstrates mutual exclusion: an unprotected race, the <c>lock</c> statement,
/// <see cref="Monitor"/> and <see cref="ReaderWriterLockSlim"/>.
/// </summary>
public class LockingDemo
{
    private static readonly object _lockObject = new object();
    private static int _sharedCounter = 0;
    private static readonly ReaderWriterLockSlim _readerWriterLock = new ReaderWriterLockSlim();
    private static readonly Dictionary<string, string> _sharedDictionary = new Dictionary<string, string>();

    /// <summary>Runs the four locking scenarios one after another.</summary>
    public static async Task DemonstrateLocking()
    {
        Console.WriteLine("\n=== 锁机制演示 ===");

        Console.WriteLine("\n--- 不使用锁的竞态条件 ---");
        await DemonstrateRaceCondition();

        Console.WriteLine("\n--- 使用lock语句 ---");
        await DemonstrateLockStatement();

        Console.WriteLine("\n--- 使用Monitor类 ---");
        await DemonstrateMonitor();

        Console.WriteLine("\n--- 使用ReaderWriterLockSlim ---");
        await DemonstrateReaderWriterLock();
    }

    /// <summary>
    /// Ten tasks each increment a captured local 1000 times with no synchronization; the printed
    /// total is compared against the expected 10000.
    /// </summary>
    private static async Task DemonstrateRaceCondition()
    {
        int unsafeCounter = 0;
        var workers = new Task[10];

        // Every worker mutates the same variable at the same time.
        for (int slot = 0; slot < workers.Length; slot++)
        {
            workers[slot] = Task.Run(() =>
            {
                int remaining = 1000;
                while (remaining-- > 0)
                {
                    unsafeCounter++; // deliberately not thread-safe
                }
            });
        }

        await Task.WhenAll(workers);
        Console.WriteLine($"不安全计数器最终值: {unsafeCounter} (期望值: 10000)");
    }

    /// <summary>
    /// Same workload as the race demo, but every increment of the shared counter is made
    /// inside a <c>lock</c> on <c>_lockObject</c>.
    /// </summary>
    private static async Task DemonstrateLockStatement()
    {
        _sharedCounter = 0;
        var workers = new Task[10];

        for (int slot = 0; slot < workers.Length; slot++)
        {
            workers[slot] = Task.Run(() =>
            {
                int remaining = 1000;
                while (remaining-- > 0)
                {
                    // The lock serializes access to the shared counter.
                    lock (_lockObject)
                    {
                        _sharedCounter++;
                    }
                }
            });
        }

        await Task.WhenAll(workers);
        Console.WriteLine($"安全计数器最终值: {_sharedCounter} (期望值: 10000)");
    }

    /// <summary>
    /// Five tasks each perform 500 read-sleep-write updates of a counter while holding a
    /// <see cref="Monitor"/>; the printed total is compared against the expected 2500.
    /// </summary>
    private static async Task DemonstrateMonitor()
    {
        int monitorCounter = 0;
        object gate = new object();

        // One guarded read-modify-write; logs when requested by the caller.
        void IncrementUnderMonitor(int taskId, bool report)
        {
            bool acquired = false;
            try
            {
                Monitor.Enter(gate, ref acquired);

                int snapshot = monitorCounter;
                Thread.Sleep(1); // simulated work between the read and the write
                monitorCounter = snapshot + 1;

                if (report)
                {
                    Console.WriteLine($"任务{taskId} 更新计数器到: {monitorCounter}");
                }
            }
            finally
            {
                // Exit only if Enter actually took the lock.
                if (acquired)
                {
                    Monitor.Exit(gate);
                }
            }
        }

        List<Task> workers = Enumerable.Range(0, 5)
            .Select(taskId => Task.Run(() =>
            {
                for (int round = 0; round < 500; round++)
                {
                    IncrementUnderMonitor(taskId, round % 100 == 0);
                }
            }))
            .ToList();

        await Task.WhenAll(workers);
        Console.WriteLine($"Monitor计数器最终值: {monitorCounter} (期望值: 2500)");
    }

    /// <summary>
    /// Three writer tasks and five reader tasks share a dictionary guarded by a
    /// <see cref="ReaderWriterLockSlim"/>; writers take the write lock, readers the read lock.
    /// </summary>
    private static async Task DemonstrateReaderWriterLock()
    {
        _sharedDictionary.Clear();

        // A writer adds five entries, holding the write lock for each one.
        Task StartWriter(int writerId) => Task.Run(() =>
        {
            for (int item = 0; item < 5; item++)
            {
                _readerWriterLock.EnterWriteLock();
                try
                {
                    string key = $"writer{writerId}_item{item}";
                    string value = $"value_{DateTime.Now.Ticks}";
                    _sharedDictionary[key] = value;
                    Console.WriteLine($"写入者{writerId} 写入: {key} = {value}");
                    Thread.Sleep(100); // simulated write work
                }
                finally
                {
                    _readerWriterLock.ExitWriteLock();
                }
            }
        });

        // A reader inspects the dictionary ten times, holding the read lock each time.
        Task StartReader(int readerId) => Task.Run(() =>
        {
            for (int pass = 0; pass < 10; pass++)
            {
                _readerWriterLock.EnterReadLock();
                try
                {
                    int count = _sharedDictionary.Count;
                    Console.WriteLine($"读取者{readerId} 读取到 {count} 个项目");
                    if (count > 0)
                    {
                        KeyValuePair<string, string> head = _sharedDictionary.First();
                        Console.WriteLine($"  第一个项目: {head.Key} = {head.Value}");
                    }
                    Thread.Sleep(50); // simulated read work
                }
                finally
                {
                    _readerWriterLock.ExitReadLock();
                }
            }
        });

        var participants = new List<Task>();
        for (int writerId = 0; writerId < 3; writerId++)
        {
            participants.Add(StartWriter(writerId));
        }
        for (int readerId = 0; readerId < 5; readerId++)
        {
            participants.Add(StartReader(readerId));
        }

        await Task.WhenAll(participants);
        Console.WriteLine($"最终字典包含 {_sharedDictionary.Count} 个项目");
    }
}
### 3.2 信号量和事件
```csharp
/// <summary>
/// Demonstrates signalling primitives: <see cref="SemaphoreSlim"/>,
/// <see cref="ManualResetEventSlim"/>, <see cref="AutoResetEvent"/> and <see cref="CountdownEvent"/>.
/// </summary>
public class SynchronizationPrimitivesDemo
{
    private static readonly SemaphoreSlim _semaphore = new SemaphoreSlim(3, 3); // at most 3 concurrent holders
    private static readonly ManualResetEventSlim _manualResetEvent = new ManualResetEventSlim(false);
    private static readonly AutoResetEvent _autoResetEvent = new AutoResetEvent(false);
    private static readonly CountdownEvent _countdownEvent = new CountdownEvent(5);

    /// <summary>Runs the four primitive demonstrations one after another.</summary>
    public static async Task DemonstrateSynchronizationPrimitives()
    {
        Console.WriteLine("\n=== 同步原语演示 ===");

        Console.WriteLine("\n--- 信号量(Semaphore)演示 ---");
        await DemonstrateSemaphore();

        Console.WriteLine("\n--- ManualResetEvent演示 ---");
        await DemonstrateManualResetEvent();

        Console.WriteLine("\n--- AutoResetEvent演示 ---");
        await DemonstrateAutoResetEvent();

        Console.WriteLine("\n--- CountdownEvent演示 ---");
        await DemonstrateCountdownEvent();
    }

    /// <summary>
    /// Starts eight tasks that each hold the semaphore for two seconds; the semaphore admits
    /// three holders at a time.
    /// </summary>
    private static async Task DemonstrateSemaphore()
    {
        // Eight contenders for three slots.
        List<Task> contenders = Enumerable.Range(0, 8)
            .Select(taskId => Task.Run(async () =>
            {
                Console.WriteLine($"任务{taskId} 等待进入信号量...");
                await _semaphore.WaitAsync();
                try
                {
                    Console.WriteLine($"任务{taskId} 进入信号量,开始工作");
                    await Task.Delay(2000); // simulated work
                    Console.WriteLine($"任务{taskId} 完成工作,离开信号量");
                }
                finally
                {
                    _semaphore.Release();
                }
            }))
            .ToList();

        await Task.WhenAll(contenders);
        Console.WriteLine("所有信号量任务完成");
    }

    /// <summary>
    /// Starts five tasks that block on the manual-reset event, then sets it once after two seconds.
    /// </summary>
    private static async Task DemonstrateManualResetEvent()
    {
        _manualResetEvent.Reset();

        List<Task> waiters = Enumerable.Range(0, 5)
            .Select(taskId => Task.Run(() =>
            {
                Console.WriteLine($"任务{taskId} 等待ManualResetEvent信号...");
                _manualResetEvent.Wait();
                Console.WriteLine($"任务{taskId} 收到信号,继续执行");
            }))
            .ToList();

        // Signal after two seconds.
        await Task.Delay(2000);
        Console.WriteLine("发送ManualResetEvent信号...");
        _manualResetEvent.Set();

        await Task.WhenAll(waiters);
        Console.WriteLine("所有ManualResetEvent任务完成");
    }

    /// <summary>
    /// Starts five tasks that block on the auto-reset event, then sets it five times, one second apart.
    /// </summary>
    private static async Task DemonstrateAutoResetEvent()
    {
        List<Task> waiters = Enumerable.Range(0, 5)
            .Select(taskId => Task.Run(() =>
            {
                Console.WriteLine($"任务{taskId} 等待AutoResetEvent信号...");
                _autoResetEvent.WaitOne();
                Console.WriteLine($"任务{taskId} 收到信号,继续执行");
            }))
            .ToList();

        // One signal per second, five in total.
        for (int signal = 1; signal <= 5; signal++)
        {
            await Task.Delay(1000);
            Console.WriteLine($"发送第{signal}个AutoResetEvent信号...");
            _autoResetEvent.Set();
        }

        await Task.WhenAll(waiters);
        Console.WriteLine("所有AutoResetEvent任务完成");
    }

    /// <summary>
    /// Resets the countdown to five, starts five workers that each signal once after a random
    /// delay, and a sixth task that waits for the countdown to reach zero.
    /// </summary>
    private static async Task DemonstrateCountdownEvent()
    {
        _countdownEvent.Reset(5);

        List<Task> participants = Enumerable.Range(0, 5)
            .Select(workerId => Task.Run(async () =>
            {
                Console.WriteLine($"工作者{workerId} 开始工作...");
                await Task.Delay(Random.Shared.Next(1000, 3000)); // random amount of work
                Console.WriteLine($"工作者{workerId} 完成工作");
                _countdownEvent.Signal();
            }))
            .ToList();

        // A separate task blocks until every worker has signalled.
        participants.Add(Task.Run(() =>
        {
            Console.WriteLine("主线程等待所有工作者完成...");
            _countdownEvent.Wait();
            Console.WriteLine("所有工作者已完成,主线程继续");
        }));

        await Task.WhenAll(participants);
        Console.WriteLine("CountdownEvent演示完成");
    }
}
```

### 3.3 原子操作
```csharp
/// <summary>
/// Demonstrates lock-free atomic operations with <see cref="Interlocked"/>, compares their
/// cost against <c>lock</c>, and shows compare-and-swap on a reference.
/// </summary>
public class AtomicOperationsDemo
{
    private static long _atomicCounter = 0;
    private static int _interlocked = 0;

    /// <summary>Runs the three atomic-operation demonstrations one after another.</summary>
    public static async Task DemonstrateAtomicOperations()
    {
        Console.WriteLine("\n=== 原子操作演示 ===");

        Console.WriteLine("\n--- Interlocked类演示 ---");
        await DemonstrateInterlocked();

        Console.WriteLine("\n--- 原子操作性能对比 ---");
        await CompareAtomicPerformance();

        Console.WriteLine("\n--- 复杂原子操作 ---");
        await DemonstrateComplexAtomicOperations();
    }

    /// <summary>
    /// Increments a shared 64-bit counter from ten tasks with <see cref="Interlocked.Increment(ref long)"/>,
    /// then walks through Exchange, CompareExchange and Add on a local value.
    /// </summary>
    private static async Task DemonstrateInterlocked()
    {
        _atomicCounter = 0;
        var tasks = new List<Task>();

        // Ten tasks, 1000 atomic increments each.
        for (int i = 0; i < 10; i++)
        {
            tasks.Add(Task.Run(() =>
            {
                for (int j = 0; j < 1000; j++)
                {
                    Interlocked.Increment(ref _atomicCounter);
                }
            }));
        }

        await Task.WhenAll(tasks);

        // Interlocked.Read gives an atomic 64-bit read on every platform.
        Console.WriteLine($"Interlocked计数器最终值: {Interlocked.Read(ref _atomicCounter)} (期望值: 10000)");

        // The remaining Interlocked operations, applied to a local.
        var value = 100L;
        Console.WriteLine($"\n原始值: {value}");

        var exchanged = Interlocked.Exchange(ref value, 200);
        Console.WriteLine($"Exchange操作 - 旧值: {exchanged}, 新值: {value}");

        var compared = Interlocked.CompareExchange(ref value, 300, 200);
        Console.WriteLine($"CompareExchange操作 - 比较值: {compared}, 当前值: {value}");

        var added = Interlocked.Add(ref value, 50);
        Console.WriteLine($"Add操作 - 结果: {added}");
    }

    /// <summary>
    /// Times one million increments spread over four tasks, once guarded by <c>lock</c> and once
    /// using <see cref="Interlocked.Increment(ref int)"/>, and prints the ratio.
    /// </summary>
    private static async Task CompareAtomicPerformance()
    {
        const int iterations = 1000000;
        const int taskCount = 4;
        const int iterationsPerTask = iterations / taskCount;

        // lock-based counter.
        var lockObject = new object();
        var lockCounter = 0;
        var lockStopwatch = Stopwatch.StartNew();
        Task[] lockTasks = Enumerable.Range(0, taskCount).Select(_ => Task.Run(() =>
        {
            for (int i = 0; i < iterationsPerTask; i++)
            {
                lock (lockObject)
                {
                    lockCounter++;
                }
            }
        })).ToArray();
        await Task.WhenAll(lockTasks);
        lockStopwatch.Stop();

        // Interlocked-based counter.
        _interlocked = 0;
        var interlockedStopwatch = Stopwatch.StartNew();
        Task[] interlockedTasks = Enumerable.Range(0, taskCount).Select(_ => Task.Run(() =>
        {
            for (int i = 0; i < iterationsPerTask; i++)
            {
                Interlocked.Increment(ref _interlocked);
            }
        })).ToArray();
        await Task.WhenAll(interlockedTasks);
        interlockedStopwatch.Stop();

        Console.WriteLine($"Lock方式 - 结果: {lockCounter}, 耗时: {lockStopwatch.ElapsedMilliseconds}ms");
        Console.WriteLine($"Interlocked方式 - 结果: {_interlocked}, 耗时: {interlockedStopwatch.ElapsedMilliseconds}ms");

        // Use the fractional elapsed time for the ratio: whole milliseconds can round down to 0
        // on a fast machine, which would print Infinity or NaN.
        double speedup = lockStopwatch.Elapsed.TotalMilliseconds / interlockedStopwatch.Elapsed.TotalMilliseconds;
        Console.WriteLine($"性能提升: {speedup:F2}x");
    }

    /// <summary>
    /// Five tasks each attempt three compare-and-swap updates of a shared reference and report
    /// whether each attempt won or lost the race.
    /// </summary>
    private static async Task DemonstrateComplexAtomicOperations()
    {
        var atomicReference = new AtomicReference<string>("初始值");
        var tasks = new List<Task>();

        // Several tasks compete to replace the reference.
        for (int i = 0; i < 5; i++)
        {
            int taskId = i;
            tasks.Add(Task.Run(() =>
            {
                for (int j = 0; j < 3; j++)
                {
                    var newValue = $"任务{taskId}_更新{j}";

                    // Snapshot the value we expect to replace. The swap only happens if the
                    // reference still holds exactly this object when CompareExchange runs.
                    var expected = atomicReference.Value;
                    var observed = atomicReference.CompareExchange(newValue, expected);

                    // CompareExchange returns the value it found; it succeeded only if that
                    // is the very object we expected (reference comparison).
                    if (ReferenceEquals(observed, expected))
                    {
                        Console.WriteLine($"任务{taskId} 更新成功: {expected} -> {newValue}");
                    }
                    else
                    {
                        Console.WriteLine($"任务{taskId} 更新失败: 期望 {expected}, 实际 {observed}");
                    }

                    Thread.Sleep(100);
                }
            }));
        }

        await Task.WhenAll(tasks);
        Console.WriteLine($"最终原子引用值: {atomicReference.Value}");
    }
}
/// <summary>
/// A minimal atomic holder for a reference of type <typeparamref name="T"/>.
/// </summary>
/// <typeparam name="T">Reference type stored in the holder.</typeparam>
public class AtomicReference<T> where T : class
{
    private volatile T _value;

    /// <summary>Creates a holder that initially contains <paramref name="initialValue"/>.</summary>
    /// <param name="initialValue">The first value stored.</param>
    public AtomicReference(T initialValue) => _value = initialValue;

    /// <summary>Gets the currently stored reference.</summary>
    public T Value
    {
        get { return _value; }
    }

    /// <summary>
    /// Stores <paramref name="newValue"/> if the holder currently contains <paramref name="comparand"/>.
    /// </summary>
    /// <param name="newValue">Reference to store on a match.</param>
    /// <param name="comparand">Reference the current value is compared against.</param>
    /// <returns>The reference that was stored before the call, whether or not it was replaced.</returns>
    public T CompareExchange(T newValue, T comparand) =>
        Interlocked.CompareExchange(ref _value, newValue, comparand);

    /// <summary>Unconditionally stores <paramref name="newValue"/>.</summary>
    /// <param name="newValue">Reference to store.</param>
    /// <returns>The reference that was stored before the call.</returns>
    public T Exchange(T newValue) => Interlocked.Exchange(ref _value, newValue);
}
```

## 4. 并行编程和PLINQ
4.1 Parallel类
/// <summary>
/// Demonstrates the <see cref="Parallel"/> class (<c>For</c>, <c>ForEach</c>, <c>Invoke</c>)
/// and compares sequential against parallel execution times.
/// </summary>
public class ParallelDemo
{
    /// <summary>Runs the four parallel-programming demonstrations one after another.</summary>
    public static void DemonstrateParallelOperations()
    {
        Console.WriteLine("\n=== 并行编程演示 ===");

        Console.WriteLine("\n--- Parallel.For演示 ---");
        DemonstrateParallelFor();

        Console.WriteLine("\n--- Parallel.ForEach演示 ---");
        DemonstrateParallelForEach();

        Console.WriteLine("\n--- Parallel.Invoke演示 ---");
        DemonstrateParallelInvoke();

        Console.WriteLine("\n--- 并行性能对比 ---");
        CompareParallelPerformance();
    }

    /// <summary>
    /// Squares ten indices with a plain <c>for</c> loop and then with <c>Parallel.For</c>,
    /// printing each result and the elapsed time of both runs.
    /// </summary>
    private static void DemonstrateParallelFor()
    {
        const int count = 10;
        int[] squares = new int[count];

        Console.WriteLine("串行For循环:");
        Stopwatch serialTimer = Stopwatch.StartNew();
        int index = 0;
        while (index < count)
        {
            squares[index] = ExpensiveComputation(index);
            Console.WriteLine($"串行处理项目 {index}: {squares[index]}");
            index++;
        }
        serialTimer.Stop();
        Console.WriteLine($"串行执行耗时: {serialTimer.ElapsedMilliseconds}ms");

        Console.WriteLine("\n并行For循环:");
        Array.Clear(squares, 0, squares.Length);
        Stopwatch parallelTimer = Stopwatch.StartNew();

        // Each iteration writes only its own slot of the array.
        Parallel.For(0, count, slot =>
        {
            squares[slot] = ExpensiveComputation(slot);
            Console.WriteLine($"并行处理项目 {slot}: {squares[slot]} [线程{Thread.CurrentThread.ManagedThreadId}]");
        });
        parallelTimer.Stop();
        Console.WriteLine($"并行执行耗时: {parallelTimer.ElapsedMilliseconds}ms");

        double speedup = (double)serialTimer.ElapsedMilliseconds / parallelTimer.ElapsedMilliseconds;
        Console.WriteLine($"性能提升: {speedup:F2}x");
    }

    /// <summary>
    /// Processes eight labelled items with <c>foreach</c> and then with <c>Parallel.ForEach</c>,
    /// collecting the parallel results in a <c>ConcurrentBag</c>.
    /// </summary>
    private static void DemonstrateParallelForEach()
    {
        List<string> items = Enumerable.Range(1, 8).Select(number => $"项目{number}").ToList();
        var collected = new ConcurrentBag<string>();

        Console.WriteLine("串行ForEach:");
        Stopwatch serialTimer = Stopwatch.StartNew();
        foreach (string item in items)
        {
            string processed = ProcessItem(item);
            Console.WriteLine($"串行处理: {processed}");
        }
        serialTimer.Stop();

        Console.WriteLine("\n并行ForEach:");
        Stopwatch parallelTimer = Stopwatch.StartNew();
        Parallel.ForEach(items, item =>
        {
            string processed = ProcessItem(item);
            collected.Add(processed);
            Console.WriteLine($"并行处理: {processed} [线程{Thread.CurrentThread.ManagedThreadId}]");
        });
        parallelTimer.Stop();

        Console.WriteLine($"串行耗时: {serialTimer.ElapsedMilliseconds}ms");
        Console.WriteLine($"并行耗时: {parallelTimer.ElapsedMilliseconds}ms");
        Console.WriteLine($"并行结果数量: {collected.Count}");
    }

    /// <summary>
    /// Calls three methods back to back and then through <c>Parallel.Invoke</c>, printing both timings.
    /// </summary>
    private static void DemonstrateParallelInvoke()
    {
        Console.WriteLine("串行调用多个方法:");
        Stopwatch serialTimer = Stopwatch.StartNew();
        Method1();
        Method2();
        Method3();
        serialTimer.Stop();

        Console.WriteLine("\n并行调用多个方法:");
        Action[] actions = { Method1, Method2, Method3 };
        Stopwatch parallelTimer = Stopwatch.StartNew();
        Parallel.Invoke(actions);
        parallelTimer.Stop();

        Console.WriteLine($"串行调用耗时: {serialTimer.ElapsedMilliseconds}ms");
        Console.WriteLine($"并行调用耗时: {parallelTimer.ElapsedMilliseconds}ms");
    }

    /// <summary>
    /// Sums the square roots of one million integers sequentially and with <c>AsParallel</c>,
    /// printing both results, both timings and their ratio.
    /// </summary>
    private static void CompareParallelPerformance()
    {
        const int dataSize = 1000000;
        int[] data = Enumerable.Range(1, dataSize).ToArray();

        // Sequential aggregation.
        Stopwatch serialTimer = Stopwatch.StartNew();
        double serialSum = data.Sum(x => Math.Sqrt(x));
        serialTimer.Stop();

        // Parallel aggregation.
        Stopwatch parallelTimer = Stopwatch.StartNew();
        double parallelSum = data.AsParallel().Sum(x => Math.Sqrt(x));
        parallelTimer.Stop();

        Console.WriteLine($"数据大小: {dataSize:N0}");
        Console.WriteLine($"串行计算结果: {serialSum:F2}, 耗时: {serialTimer.ElapsedMilliseconds}ms");
        Console.WriteLine($"并行计算结果: {parallelSum:F2}, 耗时: {parallelTimer.ElapsedMilliseconds}ms");

        double speedup = (double)serialTimer.ElapsedMilliseconds / parallelTimer.ElapsedMilliseconds;
        Console.WriteLine($"性能提升: {speedup:F2}x");
        Console.WriteLine($"处理器核心数: {Environment.ProcessorCount}");
    }

    /// <summary>Returns the square of <paramref name="input"/> after a 200 ms simulated computation.</summary>
    /// <param name="input">Value to square.</param>
    /// <returns><paramref name="input"/> multiplied by itself.</returns>
    private static int ExpensiveComputation(int input)
    {
        Thread.Sleep(200); // simulated expensive computation
        int squared = input * input;
        return squared;
    }

    /// <summary>Returns <paramref name="item"/> with a "processed" suffix after a 300 ms simulated delay.</summary>
    /// <param name="item">Label of the item being processed.</param>
    /// <returns><paramref name="item"/> followed by "_已处理".</returns>
    private static string ProcessItem(string item)
    {
        Thread.Sleep(300); // simulated processing time
        string processed = $"{item}_已处理";
        return processed;
    }

    /// <summary>Logs start and end around a 1000 ms sleep.</summary>
    private static void Method1()
    {
        Console.WriteLine($"方法1开始 [线程{Thread.CurrentThread.ManagedThreadId}]");
        Thread.Sleep(TimeSpan.FromMilliseconds(1000));
        Console.WriteLine($"方法1完成 [线程{Thread.CurrentThread.ManagedThreadId}]");
    }

    /// <summary>Logs start and end around an 800 ms sleep.</summary>
    private static void Method2()
    {
        Console.WriteLine($"方法2开始 [线程{Thread.CurrentThread.ManagedThreadId}]");
        Thread.Sleep(TimeSpan.FromMilliseconds(800));
        Console.WriteLine($"方法2完成 [线程{Thread.CurrentThread.ManagedThreadId}]");
    }

    /// <summary>Logs start and end around a 600 ms sleep.</summary>
    private static void Method3()
    {
        Console.WriteLine($"方法3开始 [线程{Thread.CurrentThread.ManagedThreadId}]");
        Thread.Sleep(TimeSpan.FromMilliseconds(600));
        Console.WriteLine($"方法3完成 [线程{Thread.CurrentThread.ManagedThreadId}]");
    }
}
### 4.2 PLINQ (Parallel LINQ)
```csharp
/// <summary>
/// Console walkthrough of PLINQ: a basic query, a serial-versus-parallel timing
/// comparison, query options (degree of parallelism, execution mode, ordering)
/// and exception handling.
/// </summary>
public class PLINQDemo
{
    /// <summary>Runs every PLINQ demonstration in order, printing a heading before each.</summary>
    public static void DemonstratePLINQ()
    {
        Console.WriteLine("\n=== PLINQ演示 ===");
        Console.WriteLine("\n--- 基础PLINQ操作 ---");
        DemonstrateBasicPLINQ();
        Console.WriteLine("\n--- PLINQ性能对比 ---");
        ComparePLINQPerformance();
        Console.WriteLine("\n--- PLINQ配置选项 ---");
        DemonstratePLINQOptions();
        Console.WriteLine("\n--- PLINQ异常处理 ---");
        DemonstratePLINQExceptionHandling();
    }

    /// <summary>
    /// Runs the same prime-filtering query with LINQ and with PLINQ over 1..1000,
    /// reports both timings, and prints the first five entries of the parallel result.
    /// </summary>
    private static void DemonstrateBasicPLINQ()
    {
        var numbers = Enumerable.Range(1, 1000).ToList();

        Console.WriteLine("串行LINQ查询:");
        var serialStopwatch = Stopwatch.StartNew();
        var serialResults = numbers
            .Where(n => IsPrime(n))
            .Select(n => new { Number = n, Square = n * n })
            .OrderByDescending(x => x.Number)
            .Take(10)
            .ToList();
        serialStopwatch.Stop();

        Console.WriteLine("并行PLINQ查询:");
        var parallelStopwatch = Stopwatch.StartNew();
        var parallelResults = numbers
            .AsParallel()
            .Where(n => IsPrime(n))
            .Select(n => new { Number = n, Square = n * n })
            .OrderByDescending(x => x.Number)
            .Take(10)
            .ToList();
        parallelStopwatch.Stop();

        Console.WriteLine($"串行结果数量: {serialResults.Count}, 耗时: {serialStopwatch.ElapsedMilliseconds}ms");
        Console.WriteLine($"并行结果数量: {parallelResults.Count}, 耗时: {parallelStopwatch.ElapsedMilliseconds}ms");

        // The query sorts descending, so these are the five largest primes in the range.
        Console.WriteLine("\n前5个质数及其平方:");
        foreach (var result in parallelResults.Take(5))
        {
            Console.WriteLine($"质数: {result.Number}, 平方: {result.Square}");
        }
    }

    /// <summary>
    /// Sums the square roots of the even numbers in a ten-million-element array,
    /// once serially and once with PLINQ, and prints the timings and speed-up.
    /// </summary>
    private static void ComparePLINQPerformance()
    {
        var largeDataSet = Enumerable.Range(1, 10000000).ToArray();

        // Serial aggregation.
        var serialStopwatch = Stopwatch.StartNew();
        var serialSum = largeDataSet
            .Where(x => x % 2 == 0)
            .Select(x => Math.Sqrt(x))
            .Sum();
        serialStopwatch.Stop();

        // Parallel aggregation.
        var parallelStopwatch = Stopwatch.StartNew();
        var parallelSum = largeDataSet
            .AsParallel()
            .Where(x => x % 2 == 0)
            .Select(x => Math.Sqrt(x))
            .Sum();
        parallelStopwatch.Stop();

        // Use the fractional elapsed time for the ratio: dividing whole-millisecond
        // counts truncates and yields infinity/NaN when a run takes under 1 ms.
        var speedup = serialStopwatch.Elapsed.TotalMilliseconds / parallelStopwatch.Elapsed.TotalMilliseconds;

        Console.WriteLine($"数据集大小: {largeDataSet.Length:N0}");
        Console.WriteLine($"串行处理: 结果={serialSum:F2}, 耗时={serialStopwatch.ElapsedMilliseconds}ms");
        Console.WriteLine($"并行处理: 结果={parallelSum:F2}, 耗时={parallelStopwatch.ElapsedMilliseconds}ms");
        Console.WriteLine($"性能提升: {speedup:F2}x");
    }

    /// <summary>
    /// Demonstrates WithDegreeOfParallelism, WithExecutionMode and AsOrdered on a
    /// small data set and prints the result counts, timings and orderings.
    /// </summary>
    private static void DemonstratePLINQOptions()
    {
        var data = Enumerable.Range(1, 1000).ToArray();

        Console.WriteLine("\n--- 并行度控制 ---");
        // Restricted degree of parallelism: at most two concurrent workers.
        var limitedParallelStopwatch = Stopwatch.StartNew();
        var limitedResults = data
            .AsParallel()
            .WithDegreeOfParallelism(2)
            .Where(n => IsPrime(n))
            .ToList();
        limitedParallelStopwatch.Stop();

        // Default degree of parallelism.
        var defaultParallelStopwatch = Stopwatch.StartNew();
        var defaultResults = data
            .AsParallel()
            .Where(n => IsPrime(n))
            .ToList();
        defaultParallelStopwatch.Stop();

        Console.WriteLine($"限制并行度(2): 结果数={limitedResults.Count}, 耗时={limitedParallelStopwatch.ElapsedMilliseconds}ms");
        Console.WriteLine($"默认并行度: 结果数={defaultResults.Count}, 耗时={defaultParallelStopwatch.ElapsedMilliseconds}ms");

        Console.WriteLine("\n--- 执行模式控制 ---");
        // Force parallel execution even where PLINQ might otherwise run the query sequentially.
        var forcedParallelStopwatch = Stopwatch.StartNew();
        var forcedResults = data
            .AsParallel()
            .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
            .Take(10)
            .ToList();
        forcedParallelStopwatch.Stop();
        Console.WriteLine($"强制并行: 结果数={forcedResults.Count}, 耗时={forcedParallelStopwatch.ElapsedMilliseconds}ms");

        Console.WriteLine("\n--- 结果排序控制 ---");
        // AsOrdered preserves the source order in the results.
        var orderedResults = data
            .AsParallel()
            .AsOrdered()
            .Where(n => n % 100 == 0)
            .ToList();

        // Without AsOrdered the result order is not guaranteed.
        var unorderedResults = data
            .AsParallel()
            .Where(n => n % 100 == 0)
            .ToList();

        Console.WriteLine($"保持顺序: [{string.Join(", ", orderedResults)}]");
        Console.WriteLine($"不保持顺序: [{string.Join(", ", unorderedResults)}]");
    }

    /// <summary>
    /// Shows that an exception thrown inside a PLINQ delegate reaches the caller
    /// wrapped in an <see cref="AggregateException"/>, then runs a ForAll query.
    /// </summary>
    private static void DemonstratePLINQExceptionHandling()
    {
        var data = Enumerable.Range(1, 100).ToArray();

        try
        {
            var results = data
                .AsParallel()
                .Select(x =>
                {
                    if (x == 50) // deliberately fail on one specific value
                    {
                        throw new InvalidOperationException($"处理值 {x} 时发生错误");
                    }
                    return x * 2;
                })
                .ToList();
            Console.WriteLine($"处理完成,结果数量: {results.Count}");
        }
        catch (AggregateException ex)
        {
            Console.WriteLine($"PLINQ捕获到聚合异常,包含 {ex.InnerExceptions.Count} 个内部异常:");
            foreach (var innerEx in ex.InnerExceptions)
            {
                Console.WriteLine($"  - {innerEx.GetType().Name}: {innerEx.Message}");
            }
        }

        // ForAll runs the delegate on the worker threads without merging results back
        // to the caller. It does NOT avoid AggregateException: a delegate that throws
        // is still reported that way. This delegate simply never throws.
        Console.WriteLine("\n使用ForAll处理:");
        data
            .AsParallel()
            .Where(x => x <= 10)
            .ForAll(x =>
            {
                Console.WriteLine($"处理值 {x} [线程{Thread.CurrentThread.ManagedThreadId}]");
            });
    }

    /// <summary>Trial-division primality test.</summary>
    /// <param name="number">Candidate value.</param>
    /// <returns><c>true</c> when <paramref name="number"/> is prime; otherwise <c>false</c>.</returns>
    private static bool IsPrime(int number)
    {
        if (number < 2) return false;
        if (number == 2) return true;
        if (number % 2 == 0) return false;

        // Only odd divisors up to the square root need checking.
        var sqrt = (int)Math.Sqrt(number);
        for (int i = 3; i <= sqrt; i += 2)
        {
            if (number % i == 0) return false;
        }
        return true;
    }
}
4.3 并行集合
/// <summary>
/// Console walkthrough of the thread-safe collections: ConcurrentBag,
/// ConcurrentQueue, ConcurrentStack, ConcurrentDictionary and BlockingCollection.
/// </summary>
public class ConcurrentCollectionsDemo
{
    /// <summary>Runs every collection demonstration in order, printing a heading before each.</summary>
    public static async Task DemonstrateConcurrentCollections()
    {
        Console.WriteLine("\n=== 并发集合演示 ===");
        Console.WriteLine("\n--- ConcurrentBag演示 ---");
        await DemonstrateConcurrentBag();
        Console.WriteLine("\n--- ConcurrentQueue演示 ---");
        await DemonstrateConcurrentQueue();
        Console.WriteLine("\n--- ConcurrentStack演示 ---");
        await DemonstrateConcurrentStack();
        Console.WriteLine("\n--- ConcurrentDictionary演示 ---");
        await DemonstrateConcurrentDictionary();
        Console.WriteLine("\n--- BlockingCollection演示 ---");
        await DemonstrateBlockingCollection();
    }

    /// <summary>Five tasks add three items each to a ConcurrentBag, which is then listed and drained.</summary>
    private static async Task DemonstrateConcurrentBag()
    {
        var concurrentBag = new ConcurrentBag<string>();
        var tasks = new List<Task>();

        // Several tasks add items concurrently.
        for (int i = 0; i < 5; i++)
        {
            int taskId = i; // per-iteration copy so each lambda captures its own id
            tasks.Add(Task.Run(() =>
            {
                for (int j = 0; j < 3; j++)
                {
                    var item = $"任务{taskId}_项目{j}";
                    concurrentBag.Add(item);
                    Console.WriteLine($"添加: {item} [线程{Thread.CurrentThread.ManagedThreadId}]");
                    Thread.Sleep(100);
                }
            }));
        }
        await Task.WhenAll(tasks);

        Console.WriteLine($"ConcurrentBag总数: {concurrentBag.Count}");
        Console.WriteLine($"所有项目: [{string.Join(", ", concurrentBag)}]");

        // Drain the bag.
        while (concurrentBag.TryTake(out string item))
        {
            Console.WriteLine($"取出: {item}");
        }
    }

    /// <summary>One producer enqueues ten integers while three consumers poll the queue until it is drained.</summary>
    private static async Task DemonstrateConcurrentQueue()
    {
        var concurrentQueue = new ConcurrentQueue<int>();

        // Producer.
        var producerTask = Task.Run(async () =>
        {
            for (int i = 0; i < 10; i++)
            {
                concurrentQueue.Enqueue(i);
                Console.WriteLine($"生产者入队: {i} [线程{Thread.CurrentThread.ManagedThreadId}]");
                await Task.Delay(200);
            }
        });

        // Consumers. ToArray starts them here; a lazy Select would only start
        // them once something enumerates the sequence.
        var consumerTasks = Enumerable.Range(0, 3).Select(consumerId => Task.Run(async () =>
        {
            // Keep polling until the producer is done AND nothing is left to dequeue.
            while (!producerTask.IsCompleted || !concurrentQueue.IsEmpty)
            {
                if (concurrentQueue.TryDequeue(out int item))
                {
                    Console.WriteLine($"消费者{consumerId}出队: {item} [线程{Thread.CurrentThread.ManagedThreadId}]");
                }
                await Task.Delay(300);
            }
        })).ToArray();

        await Task.WhenAll(new[] { producerTask }.Concat(consumerTasks));
        Console.WriteLine($"队列剩余项目数: {concurrentQueue.Count}");
    }

    /// <summary>Three tasks push four items each onto a ConcurrentStack, which is then popped until empty.</summary>
    private static async Task DemonstrateConcurrentStack()
    {
        var concurrentStack = new ConcurrentStack<string>();
        var tasks = new List<Task>();

        // Push items concurrently.
        for (int i = 0; i < 3; i++)
        {
            int taskId = i; // per-iteration copy so each lambda captures its own id
            tasks.Add(Task.Run(() =>
            {
                for (int j = 0; j < 4; j++)
                {
                    var item = $"栈项目{taskId}_{j}";
                    concurrentStack.Push(item);
                    Console.WriteLine($"推入: {item} [线程{Thread.CurrentThread.ManagedThreadId}]");
                    Thread.Sleep(150);
                }
            }));
        }
        await Task.WhenAll(tasks);

        Console.WriteLine($"\n栈中项目数: {concurrentStack.Count}");

        // Pop everything.
        while (concurrentStack.TryPop(out string item))
        {
            Console.WriteLine($"弹出: {item}");
        }
    }

    /// <summary>Five tasks call AddOrUpdate/TryGetValue on a ConcurrentDictionary; GetOrAdd is shown afterwards.</summary>
    private static async Task DemonstrateConcurrentDictionary()
    {
        var concurrentDict = new ConcurrentDictionary<string, int>();
        var tasks = new List<Task>();

        // Several tasks operate on the dictionary concurrently.
        for (int i = 0; i < 5; i++)
        {
            int taskId = i; // per-iteration copy so each lambda captures its own id
            tasks.Add(Task.Run(() =>
            {
                // Add the key with value 1, or increment the existing value.
                var key = $"键{taskId}";
                var value = concurrentDict.AddOrUpdate(key, 1, (k, v) => v + 1);
                Console.WriteLine($"任务{taskId} 添加/更新: {key} = {value} [线程{Thread.CurrentThread.ManagedThreadId}]");

                // Read it back.
                if (concurrentDict.TryGetValue(key, out int currentValue))
                {
                    Console.WriteLine($"任务{taskId} 获取: {key} = {currentValue}");
                }
                Thread.Sleep(100);
            }));
        }
        await Task.WhenAll(tasks);

        Console.WriteLine($"\n字典最终状态:");
        foreach (var kvp in concurrentDict)
        {
            Console.WriteLine($"  {kvp.Key}: {kvp.Value}");
        }

        // GetOrAdd: the factory is invoked because the key is not present yet.
        var computedValue = concurrentDict.GetOrAdd("计算键", key =>
        {
            Console.WriteLine($"为键 '{key}' 计算值");
            return key.Length * 10;
        });
        Console.WriteLine($"GetOrAdd结果: 计算键 = {computedValue}");
    }

    /// <summary>
    /// One producer adds ten items to a BlockingCollection bounded to five items
    /// while two consumers drain it through GetConsumingEnumerable.
    /// </summary>
    private static async Task DemonstrateBlockingCollection()
    {
        using var blockingCollection = new BlockingCollection<string>(5); // bounded to 5 items

        // Producer.
        var producerTask = Task.Run(async () =>
        {
            try
            {
                for (int i = 0; i < 10; i++)
                {
                    var item = $"项目{i}";
                    blockingCollection.Add(item); // blocks while the collection is full
                    Console.WriteLine($"生产者添加: {item} [线程{Thread.CurrentThread.ManagedThreadId}]");
                    await Task.Delay(200);
                }
            }
            finally
            {
                // Always mark completion so the consumers' enumerations can end.
                blockingCollection.CompleteAdding();
                Console.WriteLine("生产者完成添加");
            }
        });

        // Consumers. ToArray starts them here instead of at first enumeration.
        var consumerTasks = Enumerable.Range(0, 2).Select(consumerId => Task.Run(() =>
        {
            foreach (var item in blockingCollection.GetConsumingEnumerable())
            {
                Console.WriteLine($"消费者{consumerId}处理: {item} [线程{Thread.CurrentThread.ManagedThreadId}]");
                Thread.Sleep(400); // simulated processing time
            }

            // GetConsumingEnumerable ends normally once adding is complete and the
            // collection is empty. It does not throw InvalidOperationException, so the
            // completion message is reported here rather than from a catch block.
            Console.WriteLine($"消费者{consumerId}检测到集合已完成");
        })).ToArray();

        await Task.WhenAll(new[] { producerTask }.Concat(consumerTasks));
        Console.WriteLine("BlockingCollection演示完成");
    }
}
5. 并发设计模式
5.1 生产者-消费者模式
/// <summary>
/// Console walkthrough of the producer/consumer pattern: an unbounded channel with one
/// producer and one consumer, a bounded channel with several of each, and a
/// priority queue guarded by a lock and signalled through a semaphore.
/// </summary>
public class ProducerConsumerPattern
{
    /// <summary>Runs every producer/consumer demonstration in order, printing a heading before each.</summary>
    public static async Task DemonstrateProducerConsumer()
    {
        Console.WriteLine("\n=== 生产者-消费者模式演示 ===");
        Console.WriteLine("\n--- 基础生产者-消费者 ---");
        await DemonstrateBasicProducerConsumer();
        Console.WriteLine("\n--- 多生产者-多消费者 ---");
        await DemonstrateMultipleProducersConsumers();
        Console.WriteLine("\n--- 优先级队列 ---");
        await DemonstratePriorityQueue();
    }

    /// <summary>One producer writes ten work items to an unbounded channel; one consumer reads them all.</summary>
    private static async Task DemonstrateBasicProducerConsumer()
    {
        var channel = Channel.CreateUnbounded<WorkItem>();
        var writer = channel.Writer;
        var reader = channel.Reader;

        // Producer.
        var producer = Task.Run(async () =>
        {
            try
            {
                for (int i = 1; i <= 10; i++)
                {
                    var workItem = new WorkItem { Id = i, Data = $"数据{i}" };
                    await writer.WriteAsync(workItem);
                    Console.WriteLine($"生产者创建: {workItem}");
                    await Task.Delay(500);
                }
            }
            finally
            {
                // Close the channel even if producing failed; otherwise the
                // consumer would wait in ReadAllAsync forever.
                writer.Complete();
            }
            Console.WriteLine("生产者完成");
        });

        // Consumer.
        var consumer = Task.Run(async () =>
        {
            await foreach (var workItem in reader.ReadAllAsync())
            {
                Console.WriteLine($"消费者处理: {workItem}");
                await Task.Delay(800); // simulated processing time
            }
            Console.WriteLine("消费者完成");
        });

        await Task.WhenAll(producer, consumer);
    }

    /// <summary>
    /// Three producers write five items each to a channel bounded to five items;
    /// two consumers share the reads. The channel is closed once all producers finish.
    /// </summary>
    private static async Task DemonstrateMultipleProducersConsumers()
    {
        var channel = Channel.CreateBounded<WorkItem>(5);
        var writer = channel.Writer;
        var reader = channel.Reader;

        // Producers. ToArray starts them now and guarantees the sequence is never
        // re-enumerated, which would start a second set of producers.
        var producers = Enumerable.Range(1, 3).Select(producerId => Task.Run(async () =>
        {
            for (int i = 1; i <= 5; i++)
            {
                var workItem = new WorkItem
                {
                    Id = producerId * 100 + i,
                    Data = $"生产者{producerId}_数据{i}"
                };
                await writer.WriteAsync(workItem);
                Console.WriteLine($"生产者{producerId}创建: {workItem}");
                await Task.Delay(Random.Shared.Next(200, 600));
            }
            Console.WriteLine($"生产者{producerId}完成");
        })).ToArray();

        // Consumers.
        var consumers = Enumerable.Range(1, 2).Select(consumerId => Task.Run(async () =>
        {
            await foreach (var workItem in reader.ReadAllAsync())
            {
                Console.WriteLine($"消费者{consumerId}处理: {workItem}");
                await Task.Delay(Random.Shared.Next(400, 1000));
            }
            Console.WriteLine($"消费者{consumerId}完成");
        })).ToArray();

        // Close the writer once every producer has finished. The finally block makes
        // sure a faulted producer cannot leave the consumers blocked on an open channel.
        async Task CloseWhenProducersFinishAsync()
        {
            try
            {
                await Task.WhenAll(producers);
            }
            finally
            {
                writer.Complete();
                Console.WriteLine("所有生产者完成,关闭通道");
            }
        }

        var closing = CloseWhenProducersFinishAsync();
        await Task.WhenAll(consumers);
        await closing; // observe producer failures instead of dropping them
    }

    /// <summary>
    /// A producer enqueues seven prioritised work items; a consumer dequeues them in
    /// priority order (lowest priority value first). Each enqueue releases one
    /// semaphore permit. One extra permit released with nothing enqueued tells the
    /// consumer that production has finished.
    /// </summary>
    private static async Task DemonstratePriorityQueue()
    {
        var priorityQueue = new PriorityQueue<WorkItem, int>();
        var queueGate = new object(); // PriorityQueue is not thread-safe; every access goes through this lock
        using var itemsAvailable = new SemaphoreSlim(0);

        // Producer: creates tasks with different priorities.
        var producer = Task.Run(async () =>
        {
            try
            {
                var priorities = new[] { 1, 3, 2, 1, 3, 2, 1 };
                for (int i = 0; i < priorities.Length; i++)
                {
                    var workItem = new WorkItem
                    {
                        Id = i + 1,
                        Data = $"优先级{priorities[i]}_任务{i + 1}"
                    };
                    lock (queueGate)
                    {
                        priorityQueue.Enqueue(workItem, priorities[i]);
                    }
                    itemsAvailable.Release();
                    Console.WriteLine($"添加任务: {workItem} (优先级: {priorities[i]})");
                    await Task.Delay(300);
                }
            }
            finally
            {
                // Completion signal: one permit that has no queued item behind it.
                itemsAvailable.Release();
            }
            Console.WriteLine("生产者完成");
        });

        // Consumer: processes tasks in priority order.
        var consumer = Task.Run(async () =>
        {
            while (true)
            {
                await itemsAvailable.WaitAsync();

                WorkItem workItem;
                lock (queueGate)
                {
                    // Every item permit is released after its enqueue, so an empty
                    // queue here can only be the producer's completion signal.
                    if (!priorityQueue.TryDequeue(out workItem, out _))
                    {
                        break;
                    }
                }

                Console.WriteLine($"处理高优先级任务: {workItem}");
                await Task.Delay(500);
            }
            Console.WriteLine("消费者完成");
        });

        await Task.WhenAll(producer, consumer);
    }
}
/// <summary>Unit of work passed from producers to consumers in the demos.</summary>
public class WorkItem
{
    /// <summary>Numeric identifier of the work item.</summary>
    public int Id { get; set; }

    /// <summary>Payload text carried by the work item.</summary>
    public string Data { get; set; }

    /// <summary>Formats the item as <c>WorkItem(Id=…, Data=…)</c>.</summary>
    public override string ToString()
    {
        return string.Format("WorkItem(Id={0}, Data={1})", Id, Data);
    }
}
5.2 读写锁模式
/// <summary>
/// String cache protected by a <see cref="ReaderWriterLockSlim"/>: reads share a
/// read lock, writes take the exclusive write lock, and
/// <see cref="TryUpgradeRead"/> shows an upgradeable read lock.
/// </summary>
public class ReaderWriterPattern : IDisposable
{
    private readonly ReaderWriterLockSlim _lock = new();
    private readonly Dictionary<string, string> _cache = new();

    /// <summary>
    /// Seeds the cache with three keys, then runs five reader tasks and two writer
    /// tasks against it concurrently and prints the final cache contents.
    /// </summary>
    public static async Task DemonstrateReaderWriterPattern()
    {
        Console.WriteLine("\n=== 读写锁模式演示 ===");
        using var pattern = new ReaderWriterPattern();

        // Seed some data.
        pattern.Write("key1", "value1");
        pattern.Write("key2", "value2");
        pattern.Write("key3", "value3");

        var tasks = new List<Task>();

        // Many readers. Random.Shared is used because a single System.Random
        // instance is not safe to call from several tasks at once.
        for (int i = 0; i < 5; i++)
        {
            int readerId = i + 1;
            tasks.Add(Task.Run(async () =>
            {
                for (int j = 0; j < 10; j++)
                {
                    var key = $"key{Random.Shared.Next(1, 4)}";
                    var value = pattern.Read(key);
                    Console.WriteLine($"读取者{readerId}: {key} = {value} [线程{Thread.CurrentThread.ManagedThreadId}]");
                    await Task.Delay(Random.Shared.Next(100, 300));
                }
            }));
        }

        // A few writers.
        for (int i = 0; i < 2; i++)
        {
            int writerId = i + 1;
            tasks.Add(Task.Run(async () =>
            {
                for (int j = 0; j < 5; j++)
                {
                    var key = $"key{Random.Shared.Next(1, 6)}";
                    var value = $"writer{writerId}_value{j}";
                    pattern.Write(key, value);
                    Console.WriteLine($"写入者{writerId}: {key} = {value} [线程{Thread.CurrentThread.ManagedThreadId}]");
                    await Task.Delay(Random.Shared.Next(500, 1000));
                }
            }));
        }

        await Task.WhenAll(tasks);

        // All readers and writers have finished, so the dictionary can be read without the lock.
        Console.WriteLine("\n最终缓存状态:");
        foreach (var kvp in pattern._cache)
        {
            Console.WriteLine($"  {kvp.Key}: {kvp.Value}");
        }
    }

    /// <summary>Reads a value under the shared read lock.</summary>
    /// <param name="key">Cache key to look up.</param>
    /// <returns>The cached value, or <c>null</c> when the key is not present.</returns>
    public string Read(string key)
    {
        _lock.EnterReadLock();
        try
        {
            Thread.Sleep(50); // simulated read latency
            return _cache.TryGetValue(key, out var value) ? value : null;
        }
        finally
        {
            _lock.ExitReadLock();
        }
    }

    /// <summary>Adds or overwrites a value under the exclusive write lock.</summary>
    /// <param name="key">Cache key to set.</param>
    /// <param name="value">Value to store.</param>
    public void Write(string key, string value)
    {
        _lock.EnterWriteLock();
        try
        {
            Thread.Sleep(100); // simulated write latency
            _cache[key] = value;
        }
        finally
        {
            _lock.ExitWriteLock();
        }
    }

    /// <summary>
    /// Looks the key up under an upgradeable read lock and, when it exists, upgrades
    /// to the write lock to store the result of <paramref name="updateFunc"/>.
    /// </summary>
    /// <param name="key">Cache key to update.</param>
    /// <param name="updateFunc">Computes the new value from the current one.</param>
    /// <returns><c>true</c> when the key existed and was updated; otherwise <c>false</c>.</returns>
    public bool TryUpgradeRead(string key, Func<string, string> updateFunc)
    {
        _lock.EnterUpgradeableReadLock();
        try
        {
            if (_cache.TryGetValue(key, out var currentValue))
            {
                var newValue = updateFunc(currentValue);
                _lock.EnterWriteLock();
                try
                {
                    _cache[key] = newValue;
                    Console.WriteLine($"升级锁更新: {key} = {newValue}");
                    return true;
                }
                finally
                {
                    _lock.ExitWriteLock();
                }
            }
            return false;
        }
        finally
        {
            _lock.ExitUpgradeableReadLock();
        }
    }

    /// <summary>Releases the underlying <see cref="ReaderWriterLockSlim"/>.</summary>
    public void Dispose()
    {
        _lock.Dispose();
    }
}
5.3 Actor模式
/// <summary>
/// Minimal actor base class: messages are queued in an unbounded channel (the
/// mailbox) and a single background task hands them to
/// <see cref="HandleMessage"/> one at a time.
/// </summary>
public abstract class Actor
{
    private readonly Channel<object> _mailbox;
    private readonly ChannelWriter<object> _writer;
    private readonly ChannelReader<object> _reader;
    private readonly CancellationTokenSource _cancellationTokenSource;
    private readonly Task _processingTask;

    /// <summary>Creates the mailbox and starts the background message loop.</summary>
    protected Actor()
    {
        _mailbox = Channel.CreateUnbounded<object>();
        _writer = _mailbox.Writer;
        _reader = _mailbox.Reader;
        _cancellationTokenSource = new CancellationTokenSource();
        _processingTask = Task.Run(ProcessMessages);
    }

    /// <summary>Queues a message asynchronously.</summary>
    /// <param name="message">Message to deliver to the actor.</param>
    public async Task SendAsync(object message)
    {
        await _writer.WriteAsync(message);
    }

    /// <summary>Queues a message synchronously.</summary>
    /// <param name="message">Message to deliver to the actor.</param>
    /// <exception cref="InvalidOperationException">The mailbox did not accept the message.</exception>
    public void Send(object message)
    {
        if (!_writer.TryWrite(message))
        {
            throw new InvalidOperationException("无法发送消息到Actor");
        }
    }

    /// <summary>Handles one message. Invoked by the message loop, one message at a time.</summary>
    /// <param name="message">The message taken from the mailbox.</param>
    protected abstract Task HandleMessage(object message);

    /// <summary>
    /// Message loop: reads the mailbox until it completes or the stop token is
    /// cancelled. A failure while handling one message is logged and does not stop the loop.
    /// </summary>
    private async Task ProcessMessages()
    {
        try
        {
            await foreach (var message in _reader.ReadAllAsync(_cancellationTokenSource.Token))
            {
                try
                {
                    await HandleMessage(message);
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"Actor处理消息时发生错误: {ex.Message}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Actor停止处理消息");
        }
    }

    /// <summary>
    /// Stops the actor: closes the mailbox, cancels the message loop and waits for it
    /// to exit. Messages still queued when the loop is cancelled may go unprocessed.
    /// Calling this again after a stop has begun only waits for the loop to finish.
    /// </summary>
    public async Task StopAsync()
    {
        // TryComplete returns false when the mailbox is already closed, meaning an
        // earlier StopAsync call owns the shutdown. Calling Complete() again would
        // throw, and so would Cancel() on a source that has already been disposed.
        if (!_writer.TryComplete())
        {
            await _processingTask;
            return;
        }

        _cancellationTokenSource.Cancel();
        await _processingTask;
        _cancellationTokenSource.Dispose();
    }
}
/// <summary>
/// Actor that keeps an integer counter and responds to the string messages
/// "increment", "decrement", "get" and "reset". Anything else is reported as unknown.
/// </summary>
public class CounterActor : Actor
{
    private int _count = 0;
    private readonly string _name;

    /// <summary>Creates a counter actor.</summary>
    /// <param name="name">Label prefixed to every line the actor prints.</param>
    public CounterActor(string name)
    {
        _name = name;
    }

    /// <summary>Applies one command to the counter, prints the outcome, then waits 100 ms.</summary>
    /// <param name="message">The command to apply.</param>
    protected override async Task HandleMessage(object message)
    {
        // Everything before the await runs synchronously, so one thread-id read covers every branch.
        var threadId = Thread.CurrentThread.ManagedThreadId;
        string report;

        switch (message)
        {
            case "increment":
                _count++;
                report = $"{_name}: 计数器递增到 {_count} [线程{threadId}]";
                break;
            case "decrement":
                _count--;
                report = $"{_name}: 计数器递减到 {_count} [线程{threadId}]";
                break;
            case "get":
                report = $"{_name}: 当前计数 {_count} [线程{threadId}]";
                break;
            case "reset":
                _count = 0;
                report = $"{_name}: 计数器重置 [线程{threadId}]";
                break;
            default:
                report = $"{_name}: 未知消息类型: {message}";
                break;
        }

        Console.WriteLine(report);

        // Simulated processing time.
        await Task.Delay(100);
    }
}
/// <summary>Console demo that drives two <see cref="CounterActor"/> instances from three concurrent sender tasks.</summary>
public class ActorPatternDemo
{
    /// <summary>
    /// Starts three sender tasks, waits for them, allows one second for queued
    /// messages to be handled, then stops both actors.
    /// </summary>
    public static async Task DemonstrateActorPattern()
    {
        Console.WriteLine("\n=== Actor模式演示 ===");
        var actor1 = new CounterActor("Actor1");
        var actor2 = new CounterActor("Actor2");

        // Sender for Actor1: five paced increments, then get / reset / get.
        async Task DriveFirstActorAsync()
        {
            for (var sent = 0; sent < 5; sent++)
            {
                await actor1.SendAsync("increment");
                await Task.Delay(200);
            }

            foreach (var command in new[] { "get", "reset", "get" })
            {
                await actor1.SendAsync(command);
            }
        }

        // Sender for Actor2: three rounds of +1, +1, -1, then get.
        async Task DriveSecondActorAsync()
        {
            for (var round = 0; round < 3; round++)
            {
                foreach (var command in new[] { "increment", "increment", "decrement" })
                {
                    await actor2.SendAsync(command);
                }
                await Task.Delay(300);
            }

            await actor2.SendAsync("get");
        }

        // Sender that alternates between both actors through the synchronous Send.
        async Task AlternateBetweenActorsAsync()
        {
            for (var turn = 0; turn < 10; turn++)
            {
                var target = turn % 2 == 0 ? actor1 : actor2;
                target.Send("increment");
                await Task.Delay(150);
            }
        }

        await Task.WhenAll(
            Task.Run(() => DriveFirstActorAsync()),
            Task.Run(() => DriveSecondActorAsync()),
            Task.Run(() => AlternateBetweenActorsAsync()));

        // Give the actors time to work through whatever is still queued.
        await Task.Delay(1000);

        // Stop the actors.
        await actor1.StopAsync();
        await actor2.StopAsync();
        Console.WriteLine("Actor模式演示完成");
    }
}
6. 最佳实践和性能优化
6.1 线程安全最佳实践
/// <summary>
/// Console walkthrough of three thread-safety techniques: immutable objects,
/// thread-local storage and minimal-scope locking.
/// </summary>
public class ThreadSafetyBestPractices
{
    /// <summary>Runs every demonstration in order, printing a heading before each.</summary>
    public static void DemonstrateBestPractices()
    {
        Console.WriteLine("\n=== 线程安全最佳实践 ===");
        Console.WriteLine("\n--- 不可变对象 ---");
        DemonstrateImmutableObjects();
        Console.WriteLine("\n--- 线程本地存储 ---");
        DemonstrateThreadLocalStorage();
        Console.WriteLine("\n--- 锁的最佳实践 ---");
        DemonstrateLockingBestPractices();
    }

    /// <summary>Five tasks read one shared immutable record and each derive a modified copy from it.</summary>
    private static void DemonstrateImmutableObjects()
    {
        // An immutable object can be shared between threads without synchronization.
        var immutableData = new ImmutableData("初始值", 42);

        // ToArray starts the tasks once; the array is then passed straight to WaitAll.
        var tasks = Enumerable.Range(0, 5).Select(i => Task.Run(() =>
        {
            // Reads are safe from any thread.
            Console.WriteLine($"任务{i}: {immutableData.Name} = {immutableData.Value} [线程{Thread.CurrentThread.ManagedThreadId}]");

            // A "modification" produces a new instance and leaves the shared one untouched.
            var newData = immutableData.WithValue(immutableData.Value + i);
            Console.WriteLine($"任务{i}: 新实例 {newData.Name} = {newData.Value}");
        })).ToArray();

        Task.WaitAll(tasks);
    }

    /// <summary>Five tasks increment a <see cref="ThreadLocal{T}"/> counter three times each.</summary>
    private static void DemonstrateThreadLocalStorage()
    {
        // The using declaration disposes the ThreadLocal even if a task faults and WaitAll throws.
        using var threadLocalCounter = new ThreadLocal<int>(() => 0);

        var tasks = Enumerable.Range(0, 5).Select(i => Task.Run(() =>
        {
            // The counter is per thread, not per task: tasks that land on the same
            // pool thread continue from the value that thread already holds.
            for (int j = 0; j < 3; j++)
            {
                threadLocalCounter.Value++;
                Console.WriteLine($"线程{Thread.CurrentThread.ManagedThreadId}: 计数器 = {threadLocalCounter.Value}");
                Thread.Sleep(100);
            }
        })).ToArray();

        Task.WaitAll(tasks);
    }

    /// <summary>Ten tasks call <c>SafeOperation</c> on one shared instance; the final count is printed.</summary>
    private static void DemonstrateLockingBestPractices()
    {
        var goodLockingExample = new GoodLockingExample();

        var tasks = Enumerable.Range(0, 10).Select(i => Task.Run(() =>
        {
            goodLockingExample.SafeOperation(i);
        })).ToArray();

        Task.WaitAll(tasks);
        Console.WriteLine($"最终计数: {goodLockingExample.GetCount()}");
    }
}
/// <summary>
/// Immutable name/value pair. The "With" methods return a modified copy and
/// leave the original instance unchanged.
/// </summary>
public record ImmutableData(string Name, int Value)
{
    /// <summary>Returns a copy whose <c>Value</c> is <paramref name="newValue"/>.</summary>
    public ImmutableData WithValue(int newValue)
    {
        return this with { Value = newValue };
    }

    /// <summary>Returns a copy whose <c>Name</c> is <paramref name="newName"/>.</summary>
    public ImmutableData WithName(string newName)
    {
        return this with { Name = newName };
    }
}
/// <summary>
/// Example of disciplined locking: a private lock object guards a counter and a
/// log, and the lock is held only while that shared state is touched.
/// </summary>
public class GoodLockingExample
{
    private readonly object _lock = new object();
    private int _count = 0;
    private readonly List<string> _log = new List<string>();

    /// <summary>
    /// Increments the counter and records a log entry under the lock, then prints
    /// the entry and sleeps 50 ms outside the lock.
    /// </summary>
    /// <param name="operationId">Identifier included in the log entry.</param>
    public void SafeOperation(int operationId)
    {
        // The method is synchronous, so the thread id is the same inside and outside the lock.
        var threadId = Thread.CurrentThread.ManagedThreadId;
        string entry;

        // Keep the critical section as short as possible.
        lock (_lock)
        {
            var updatedCount = ++_count;
            entry = $"操作{operationId}: 计数 = {updatedCount} [线程{threadId}]";
            _log.Add(entry);
        }

        // Slow work happens after the lock is released.
        Console.WriteLine(entry);
        Thread.Sleep(50); // simulated slow operation
    }

    /// <summary>Returns the current counter value, read under the lock.</summary>
    public int GetCount()
    {
        int snapshot;
        lock (_lock)
        {
            snapshot = _count;
        }
        return snapshot;
    }

    /// <summary>Returns a copy of the log taken under the lock, so callers cannot alter the internal list.</summary>
    public List<string> GetLog()
    {
        lock (_lock)
        {
            var copy = new List<string>(_log.Count);
            copy.AddRange(_log);
            return copy;
        }
    }
}
6.2 性能监控和调试
/// <summary>
/// Console walkthrough of basic runtime diagnostics: thread-pool limits, timing of
/// serial versus concurrent task execution, and managed-memory/GC counters.
/// </summary>
public class PerformanceMonitoring
{
    /// <summary>Runs every monitoring demonstration in order, printing a heading before each.</summary>
    public static async Task DemonstratePerformanceMonitoring()
    {
        Console.WriteLine("\n=== 性能监控和调试 ===");
        Console.WriteLine("\n--- 线程池监控 ---");
        MonitorThreadPool();
        Console.WriteLine("\n--- 任务性能测量 ---");
        await MeasureTaskPerformance();
        Console.WriteLine("\n--- 内存使用监控 ---");
        await MonitorMemoryUsage();
    }

    /// <summary>Prints the thread pool's available, maximum and minimum thread counts and the processor count.</summary>
    private static void MonitorThreadPool()
    {
        ThreadPool.GetAvailableThreads(out int workerThreads, out int completionPortThreads);
        ThreadPool.GetMaxThreads(out int maxWorkerThreads, out int maxCompletionPortThreads);
        ThreadPool.GetMinThreads(out int minWorkerThreads, out int minCompletionPortThreads);

        Console.WriteLine($"线程池状态:");
        Console.WriteLine($"  可用工作线程: {workerThreads}/{maxWorkerThreads}");
        Console.WriteLine($"  可用I/O线程: {completionPortThreads}/{maxCompletionPortThreads}");
        Console.WriteLine($"  最小工作线程: {minWorkerThreads}");
        Console.WriteLine($"  最小I/O线程: {minCompletionPortThreads}");
        Console.WriteLine($"  处理器核心数: {Environment.ProcessorCount}");
    }

    /// <summary>
    /// Times 100 simulated 10 ms work items three ways: awaited one after another,
    /// all started at once, and started at once but throttled by a semaphore with
    /// one permit per processor.
    /// </summary>
    private static async Task MeasureTaskPerformance()
    {
        const int taskCount = 100;
        const int workDuration = 10;

        // Serial execution.
        var serialStopwatch = Stopwatch.StartNew();
        for (int i = 0; i < taskCount; i++)
        {
            await SimulateWork(workDuration);
        }
        serialStopwatch.Stop();

        // Unrestricted concurrent execution.
        var parallelStopwatch = Stopwatch.StartNew();
        var parallelTasks = Enumerable.Range(0, taskCount)
            .Select(_ => SimulateWork(workDuration));
        await Task.WhenAll(parallelTasks);
        parallelStopwatch.Stop();

        // Concurrent execution throttled by a semaphore. The using declaration
        // disposes it when the method exits, after every task has completed.
        using var semaphore = new SemaphoreSlim(Environment.ProcessorCount);
        var limitedParallelStopwatch = Stopwatch.StartNew();
        var limitedTasks = Enumerable.Range(0, taskCount)
            .Select(async _ =>
            {
                await semaphore.WaitAsync();
                try
                {
                    await SimulateWork(workDuration);
                }
                finally
                {
                    semaphore.Release();
                }
            });
        await Task.WhenAll(limitedTasks);
        limitedParallelStopwatch.Stop();

        Console.WriteLine($"性能对比 ({taskCount}个任务, 每个{workDuration}ms):");
        Console.WriteLine($"  串行执行: {serialStopwatch.ElapsedMilliseconds}ms");
        Console.WriteLine($"  无限制并行: {parallelStopwatch.ElapsedMilliseconds}ms");
        Console.WriteLine($"  限制并发度: {limitedParallelStopwatch.ElapsedMilliseconds}ms");
        Console.WriteLine($"  并行加速比: {(double)serialStopwatch.ElapsedMilliseconds / parallelStopwatch.ElapsedMilliseconds:F2}x");
    }

    /// <summary>
    /// Reports managed memory before and after scheduling 1000 small tasks and after
    /// a forced collection, followed by the per-generation GC counts.
    /// </summary>
    private static async Task MonitorMemoryUsage()
    {
        var initialMemory = GC.GetTotalMemory(false);
        Console.WriteLine($"初始内存使用: {ToMegabytes(initialMemory):F2} MB");

        // Schedule many tasks that each allocate a small buffer.
        var tasks = new List<Task>();
        for (int i = 0; i < 1000; i++)
        {
            tasks.Add(Task.Run(async () =>
            {
                var data = new byte[1024]; // 1 KB buffer
                await Task.Delay(100);
                return data.Length;
            }));
        }

        // Sampled right after scheduling, while the tasks are still in flight.
        var peakMemory = GC.GetTotalMemory(false);
        Console.WriteLine($"峰值内存使用: {ToMegabytes(peakMemory):F2} MB");
        Console.WriteLine($"内存增长: {ToMegabytes(peakMemory - initialMemory):F2} MB");

        await Task.WhenAll(tasks);

        // Forced full collection. Acceptable in a demo; avoid in production code.
        GC.Collect();
        GC.WaitForPendingFinalizers();
        GC.Collect();

        var finalMemory = GC.GetTotalMemory(false);
        Console.WriteLine($"垃圾回收后内存: {ToMegabytes(finalMemory):F2} MB");
        Console.WriteLine($"GC统计:");
        Console.WriteLine($"  Gen 0 回收次数: {GC.CollectionCount(0)}");
        Console.WriteLine($"  Gen 1 回收次数: {GC.CollectionCount(1)}");
        Console.WriteLine($"  Gen 2 回收次数: {GC.CollectionCount(2)}");
    }

    /// <summary>
    /// Converts a byte count to megabytes as a floating-point value. Integer division
    /// would drop the fractional part that the F2 format is meant to display.
    /// </summary>
    private static double ToMegabytes(long bytes)
    {
        return bytes / (1024.0 * 1024.0);
    }

    /// <summary>Waits <paramref name="durationMs"/> milliseconds and returns the managed id of the resuming thread.</summary>
    private static async Task<int> SimulateWork(int durationMs)
    {
        await Task.Delay(durationMs);
        return Thread.CurrentThread.ManagedThreadId;
    }
}
7. 实践练习
7.1 并发Web爬虫
/// <summary>
/// Fetches a list of URLs concurrently with a cap on simultaneous requests,
/// skipping URLs already seen and collecting one <see cref="CrawlResult"/> per attempt.
/// Results accumulate across calls on the same instance.
/// </summary>
public class ConcurrentWebCrawler : IDisposable
{
    private readonly HttpClient _httpClient;
    private readonly SemaphoreSlim _semaphore;
    private readonly int _maxConcurrency;
    private readonly ConcurrentDictionary<string, bool> _visitedUrls;
    private readonly ConcurrentBag<CrawlResult> _results;

    /// <summary>Creates a crawler.</summary>
    /// <param name="maxConcurrency">Maximum number of requests allowed in flight at once.</param>
    public ConcurrentWebCrawler(int maxConcurrency = 5)
    {
        _maxConcurrency = maxConcurrency;
        _httpClient = new HttpClient();
        _semaphore = new SemaphoreSlim(maxConcurrency);
        _visitedUrls = new ConcurrentDictionary<string, bool>();
        _results = new ConcurrentBag<CrawlResult>();
    }

    /// <summary>Crawls every URL in <paramref name="urls"/> and returns all results collected so far.</summary>
    /// <param name="urls">URLs to fetch. URLs this instance has already visited are skipped.</param>
    /// <param name="cancellationToken">Cancels waiting for a request slot and the requests themselves.</param>
    /// <returns>A snapshot of every result this instance has recorded.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="urls"/> is <c>null</c>.</exception>
    public async Task<List<CrawlResult>> CrawlAsync(List<string> urls, CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(urls);

        // Report the configured limit. SemaphoreSlim.CurrentCount is only the number
        // of free permits right now, which is lower while requests are in flight.
        Console.WriteLine($"开始爬取 {urls.Count} 个URL,最大并发数: {_maxConcurrency}");

        var tasks = urls.Select(url => CrawlUrlAsync(url, cancellationToken));
        await Task.WhenAll(tasks);
        return _results.ToList();
    }

    /// <summary>
    /// Fetches one URL under the concurrency limit and records the outcome. Any
    /// exception raised while fetching is recorded as a failed result rather than
    /// rethrown. Cancellation while waiting for a slot propagates to the caller.
    /// </summary>
    private async Task CrawlUrlAsync(string url, CancellationToken cancellationToken)
    {
        // TryAdd fails when the URL is already present, so each URL is fetched at most once.
        if (!_visitedUrls.TryAdd(url, true))
        {
            Console.WriteLine($"URL已访问,跳过: {url}");
            return;
        }

        // Acquired outside the try block so the finally never releases a permit that was not taken.
        await _semaphore.WaitAsync(cancellationToken);
        try
        {
            var stopwatch = Stopwatch.StartNew();
            Console.WriteLine($"开始爬取: {url} [线程{Thread.CurrentThread.ManagedThreadId}]");

            // Dispose the response so its connection and content are released promptly.
            using var response = await _httpClient.GetAsync(url, cancellationToken);
            var content = await response.Content.ReadAsStringAsync(cancellationToken);
            stopwatch.Stop();

            var result = new CrawlResult
            {
                Url = url,
                StatusCode = response.StatusCode,
                ContentLength = content.Length,
                Duration = stopwatch.Elapsed,
                ThreadId = Thread.CurrentThread.ManagedThreadId,
                Timestamp = DateTime.Now
            };
            _results.Add(result);
            Console.WriteLine($"完成爬取: {url} - {response.StatusCode} ({content.Length} 字符, {stopwatch.ElapsedMilliseconds}ms)");
        }
        catch (Exception ex)
        {
            var errorResult = new CrawlResult
            {
                Url = url,
                Error = ex.Message,
                ThreadId = Thread.CurrentThread.ManagedThreadId,
                Timestamp = DateTime.Now
            };
            _results.Add(errorResult);
            Console.WriteLine($"爬取失败: {url} - {ex.Message}");
        }
        finally
        {
            _semaphore.Release();
        }
    }

    /// <summary>Releases the HTTP client and the semaphore.</summary>
    public void Dispose()
    {
        _httpClient?.Dispose();
        _semaphore?.Dispose();
    }
}
/// <summary>Outcome of one crawl attempt: either response details or an error message.</summary>
public class CrawlResult
{
    /// <summary>URL that was requested.</summary>
    public string Url { get; set; }

    /// <summary>HTTP status code of the response.</summary>
    public HttpStatusCode StatusCode { get; set; }

    /// <summary>Length of the response body, in characters.</summary>
    public int ContentLength { get; set; }

    /// <summary>Elapsed time of the request.</summary>
    public TimeSpan Duration { get; set; }

    /// <summary>Managed id of the thread that recorded the result.</summary>
    public int ThreadId { get; set; }

    /// <summary>Local time at which the result was recorded.</summary>
    public DateTime Timestamp { get; set; }

    /// <summary>Error message for a failed attempt; null or empty on success.</summary>
    public string Error { get; set; }

    /// <summary><c>true</c> when no error message has been recorded.</summary>
    public bool IsSuccess
    {
        get { return string.IsNullOrEmpty(Error); }
    }
}
7.2 并发数据处理管道
/// <summary>
/// Console demo that pushes the integers -5..5 through a three-stage
/// <c>Pipeline&lt;int, string&gt;</c> and prints each success or failure.
/// </summary>
public class DataProcessingPipeline
{
    /// <summary>Builds the pipeline, processes the sample inputs and prints the results.</summary>
    public static async Task DemonstrateDataPipeline()
    {
        Console.WriteLine("\n=== 数据处理管道演示 ===");
        var pipeline = new Pipeline<int, string>();

        // Stages receive their input as object, so each stage casts to the type the
        // previous stage produced. Arithmetic and comparisons on the raw object
        // parameter do not compile.

        // Stage 1: data generation (doubles the input).
        pipeline.AddStage("数据生成", async data =>
        {
            await Task.Delay(50); // simulated generation time
            return (int)data * 2;
        });

        // Stage 2: validation (rejects negative values).
        pipeline.AddStage("数据验证", async data =>
        {
            await Task.Delay(30);
            var value = (int)data;
            if (value < 0) throw new InvalidOperationException("负数无效");
            return value;
        });

        // Stage 3: transformation (formats the value as text).
        pipeline.AddStage("数据转换", async data =>
        {
            await Task.Delay(40);
            return $"处理后的数据: {data}";
        });

        // Process the data.
        var inputData = Enumerable.Range(-5, 11).ToList();
        var results = await pipeline.ProcessAsync(inputData);

        Console.WriteLine("\n处理结果:");
        foreach (var result in results)
        {
            if (result.IsSuccess)
            {
                Console.WriteLine($"成功: {result.Data}");
            }
            else
            {
                Console.WriteLine($"失败: {result.Error}");
            }
        }
    }
}
/// <summary>
/// Ordered list of asynchronous stages. Every input is passed through all stages
/// in registration order, and inputs are processed concurrently with each other.
/// </summary>
/// <typeparam name="TInput">Type of the items fed into the first stage.</typeparam>
/// <typeparam name="TOutput">Type the last stage's output is cast to.</typeparam>
public class Pipeline<TInput, TOutput>
{
    private readonly List<PipelineStage> _stages = new();

    /// <summary>Appends a stage to the end of the pipeline.</summary>
    /// <typeparam name="T">Type produced by the stage.</typeparam>
    /// <param name="name">Label printed when the stage runs.</param>
    /// <param name="processor">
    /// Stage body. It receives the previous stage's output (or the original input
    /// for the first stage) as <see cref="object"/>.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="processor"/> is <c>null</c>.</exception>
    public void AddStage<T>(string name, Func<object, Task<T>> processor)
    {
        // Fail at registration. Otherwise a null stage would only show up later as a
        // NullReferenceException folded into every result's Error text.
        ArgumentNullException.ThrowIfNull(processor);

        _stages.Add(new PipelineStage
        {
            Name = name,
            // Adapts Task<T> to Task<object> so stages of different types share one list.
            Processor = async data => await processor(data)
        });
    }

    /// <summary>Runs every input through all stages.</summary>
    /// <param name="inputs">Items to process.</param>
    /// <returns>
    /// One result per input, in input order. An exception thrown by a stage, or by the
    /// final cast to <typeparamref name="TOutput"/>, yields a failed result carrying
    /// the exception message instead of propagating.
    /// </returns>
    public async Task<List<PipelineResult<TOutput>>> ProcessAsync(List<TInput> inputs)
    {
        var tasks = inputs.Select(async input =>
        {
            try
            {
                object currentData = input;
                foreach (var stage in _stages)
                {
                    Console.WriteLine($"阶段 '{stage.Name}' 处理数据: {currentData} [线程{Thread.CurrentThread.ManagedThreadId}]");
                    currentData = await stage.Processor(currentData);
                }

                return new PipelineResult<TOutput>
                {
                    IsSuccess = true,
                    Data = (TOutput)currentData
                };
            }
            catch (Exception ex)
            {
                return new PipelineResult<TOutput>
                {
                    IsSuccess = false,
                    Error = ex.Message
                };
            }
        });

        var taskResults = await Task.WhenAll(tasks);
        return taskResults.ToList();
    }

    /// <summary>A named stage whose input and output are type-erased to <see cref="object"/>.</summary>
    private class PipelineStage
    {
        public string Name { get; set; }
        public Func<object, Task<object>> Processor { get; set; }
    }
}

/// <summary>Outcome of running one input through a pipeline.</summary>
/// <typeparam name="T">Type of the successful output.</typeparam>
public class PipelineResult<T>
{
    /// <summary><c>true</c> when every stage completed and the output cast succeeded.</summary>
    public bool IsSuccess { get; set; }

    /// <summary>Final output. Set only on success.</summary>
    public T Data { get; set; }

    /// <summary>Exception message. Set only on failure.</summary>
    public string Error { get; set; }
}
8. 章节总结
本章深入介绍了C#中的多线程和并发编程,涵盖了以下核心内容:
核心概念
- 线程基础: 线程生命周期、前台/后台线程、线程池的使用
- Task和异步编程: Task、async/await、取消令牌的使用
- 线程同步: 锁机制、信号量、事件、原子操作
- 并行编程: Parallel类、PLINQ、并发集合
重要技能
- 掌握异步编程模式,提高应用程序响应性
- 理解线程安全和同步机制,避免竞态条件
- 学会使用并行编程技术,充分利用多核处理器
- 掌握并发设计模式,构建可扩展的应用程序
最佳实践
- 优先使用 async/await 而不是直接操作线程
- 使用不可变对象和线程安全集合
- 最小化锁的持有时间,避免死锁
- 合理控制并发度,避免资源耗尽
- 进行性能监控和调试,优化并发性能
实际应用
- Web应用程序的异步处理
- 数据处理管道和批处理
- 并发网络请求和I/O操作
- 实时系统和高性能计算
掌握多线程和并发编程是现代C#开发的重要技能,它能帮助你构建高性能、可扩展的应用程序。下一章我们将学习反射和特性,探索C#的元编程能力。