本章概述

多线程和并发编程是现代应用程序开发的重要技能,特别是在需要处理大量数据、提高应用程序响应性和充分利用多核处理器的场景中。本章将深入探讨C#中的多线程和并发编程技术。

学习目标

  • 理解线程的基本概念和生命周期
  • 掌握Task和async/await异步编程模式
  • 学习线程同步和并发控制技术
  • 了解并行编程和PLINQ
  • 掌握线程安全的集合和数据结构
  • 学习并发设计模式和最佳实践

1. 线程基础

1.1 线程概念和生命周期

using System;
using System.Threading;
using System.Diagnostics;

/// <summary>
/// Console walkthrough of creating, starting and joining <see cref="Thread"/> instances
/// using a method group / lambda, an explicit <see cref="ThreadStart"/> delegate and a
/// <see cref="ParameterizedThreadStart"/> delegate.
/// </summary>
public class ThreadBasicsDemo
{
    /// <summary>
    /// Prints information about the calling thread, then runs each thread-creation
    /// approach in turn, joining the started threads before moving on.
    /// </summary>
    public static void DemonstrateThreadBasics()
    {
        Console.WriteLine("=== 线程基础演示 ===");

        // Describe the thread that is executing this demo.
        var self = Thread.CurrentThread;
        var selfName = self.Name ?? "未命名";
        Console.WriteLine($"主线程ID: {self.ManagedThreadId}");
        Console.WriteLine($"主线程名称: {selfName}");
        Console.WriteLine($"是否为后台线程: {self.IsBackground}");
        Console.WriteLine($"线程状态: {self.ThreadState}");
        Console.WriteLine($"处理器核心数: {Environment.ProcessorCount}");

        Console.WriteLine("\n--- 创建和启动线程 ---");

        // Approach 1: pass a method or lambda straight to the Thread constructor.
        var foregroundWorker = new Thread(WorkerMethod);
        foregroundWorker.Name = "工作线程1";
        foregroundWorker.IsBackground = false; // foreground thread

        var backgroundWorker = new Thread(() => WorkerMethodWithParameter("线程2参数"));
        backgroundWorker.Name = "工作线程2";
        backgroundWorker.IsBackground = true; // background thread

        foregroundWorker.Start();
        backgroundWorker.Start();

        Console.WriteLine("线程已启动");

        foregroundWorker.Join();     // block until the first worker has finished
        backgroundWorker.Join(2000); // wait at most 2 seconds for the second worker

        Console.WriteLine("主线程继续执行");

        // Approach 2: build the ThreadStart delegate explicitly.
        Console.WriteLine("\n--- 使用ThreadStart委托 ---");

        ThreadStart countingBody = () =>
        {
            var count = 0;
            while (count < 3)
            {
                Console.WriteLine($"ThreadStart线程: {Thread.CurrentThread.ManagedThreadId}, 计数: {count}");
                Thread.Sleep(500);
                count++;
            }
        };

        var delegateThread = new Thread(countingBody);
        delegateThread.Name = "ThreadStart线程";
        delegateThread.Start();
        delegateThread.Join();

        // Approach 3: ParameterizedThreadStart receives its argument through Start(object).
        Console.WriteLine("\n--- 使用ParameterizedThreadStart ---");

        var messageThread = new Thread(new ParameterizedThreadStart(ParameterizedWorker));
        messageThread.Name = "参数化线程";

        messageThread.Start("Hello from parameterized thread!");
        messageThread.Join();
    }

    /// <summary>
    /// Thread body without arguments: reports five work steps, sleeping 300 ms after each.
    /// </summary>
    private static void WorkerMethod()
    {
        var self = Thread.CurrentThread;
        Console.WriteLine($"[{self.Name}] 开始执行,线程ID: {self.ManagedThreadId}");

        for (int step = 1; step <= 5; step++)
        {
            Console.WriteLine($"[{self.Name}] 工作中... {step}/5");
            Thread.Sleep(300); // simulated work
        }

        Console.WriteLine($"[{self.Name}] 执行完成");
    }

    /// <summary>
    /// Thread body that receives its argument through a capturing lambda: reports three
    /// processing steps, sleeping 400 ms after each.
    /// </summary>
    /// <param name="parameter">Text echoed to the console when the thread starts.</param>
    private static void WorkerMethodWithParameter(string parameter)
    {
        var self = Thread.CurrentThread;
        Console.WriteLine($"[{self.Name}] 接收到参数: {parameter}");

        for (int step = 1; step <= 3; step++)
        {
            Console.WriteLine($"[{self.Name}] 处理中... {step}/3");
            Thread.Sleep(400);
        }

        Console.WriteLine($"[{self.Name}] 参数处理完成");
    }

    /// <summary>
    /// Thread body matching <see cref="ParameterizedThreadStart"/>: prints the received
    /// message, then reports three steps, sleeping 200 ms after each.
    /// </summary>
    /// <param name="parameter">
    /// Value passed to <c>Thread.Start(object)</c>; anything that is not a string is
    /// printed as an empty message.
    /// </param>
    private static void ParameterizedWorker(object parameter)
    {
        var self = Thread.CurrentThread;
        var text = parameter as string;

        Console.WriteLine($"[{self.Name}] 收到消息: {text}");

        for (int step = 1; step <= 3; step++)
        {
            Console.WriteLine($"[{self.Name}] 处理消息... {step}/3");
            Thread.Sleep(200);
        }

        Console.WriteLine($"[{self.Name}] 消息处理完成");
    }
}

1.2 线程池

/// <summary>
/// Console walkthrough of the CLR thread pool: prints the pool limits, queues work with
/// <see cref="ThreadPool.QueueUserWorkItem(WaitCallback)"/>, and waits for completion
/// first by polling a shared counter and then with <see cref="ManualResetEvent"/> handles.
/// </summary>
public class ThreadPoolDemo
{
    // Number of queued work items that have finished. Pool threads bump it with
    // Interlocked.Increment and the waiting thread polls it with Volatile.Read, so
    // every access to the field is synchronized.
    private static int _completedTasks = 0;

    /// <summary>
    /// Runs both thread-pool demonstrations and blocks until every queued item has finished.
    /// </summary>
    public static void DemonstrateThreadPool()
    {
        Console.WriteLine("\n=== 线程池演示 ===");

        // Query the pool configuration.
        ThreadPool.GetMaxThreads(out int maxWorkerThreads, out int maxCompletionPortThreads);
        ThreadPool.GetMinThreads(out int minWorkerThreads, out int minCompletionPortThreads);
        ThreadPool.GetAvailableThreads(out int availableWorkerThreads, out int availableCompletionPortThreads);

        Console.WriteLine($"最大工作线程数: {maxWorkerThreads}");
        Console.WriteLine($"最大I/O完成端口线程数: {maxCompletionPortThreads}");
        Console.WriteLine($"最小工作线程数: {minWorkerThreads}");
        Console.WriteLine($"最小I/O完成端口线程数: {minCompletionPortThreads}");
        Console.WriteLine($"可用工作线程数: {availableWorkerThreads}");
        Console.WriteLine($"可用I/O完成端口线程数: {availableCompletionPortThreads}");

        Console.WriteLine("\n--- 使用线程池执行任务 ---");

        const int queuedTaskCount = 10;

        // Start from zero even if a previous call was interrupted before its reset.
        Interlocked.Exchange(ref _completedTasks, 0);

        var stopwatch = Stopwatch.StartNew();

        // Approach 1: QueueUserWorkItem with a lambda.
        for (int i = 0; i < queuedTaskCount; i++)
        {
            int taskId = i; // copy so each lambda captures its own value
            ThreadPool.QueueUserWorkItem(state =>
            {
                ThreadPoolWorker(taskId);
            });
        }

        // Poll until every work item has reported completion. Volatile.Read forces a
        // fresh read of the counter on each iteration instead of a possibly cached one.
        while (Volatile.Read(ref _completedTasks) < queuedTaskCount)
        {
            Thread.Sleep(100);
        }

        stopwatch.Stop();
        Console.WriteLine($"\n所有线程池任务完成,耗时: {stopwatch.ElapsedMilliseconds} 毫秒");

        // Reset the counter for the next run.
        Interlocked.Exchange(ref _completedTasks, 0);

        Console.WriteLine("\n--- 使用WaitCallback委托 ---");

        var waitHandles = new ManualResetEvent[5];

        for (int i = 0; i < 5; i++)
        {
            waitHandles[i] = new ManualResetEvent(false);
            int taskId = i;

            // Approach 2: explicit WaitCallback with a state object carrying the handle.
            ThreadPool.QueueUserWorkItem(new WaitCallback(state =>
            {
                var data = (TaskData)state;
                ThreadPoolWorkerWithCallback(data.TaskId, data.WaitHandle);
            }), new TaskData { TaskId = taskId, WaitHandle = waitHandles[i] });
        }

        // Block until every handle has been signalled.
        WaitHandle.WaitAll(waitHandles);
        Console.WriteLine("所有WaitCallback任务完成");

        // Release the OS handles; all workers have already called Set at this point.
        foreach (var handle in waitHandles)
        {
            handle.Dispose();
        }
    }

    /// <summary>
    /// Work item for the polling demo: logs thread details, sleeps for a random
    /// 500-1499 ms, then atomically increments the completion counter.
    /// </summary>
    /// <param name="taskId">Identifier printed in the log lines.</param>
    private static void ThreadPoolWorker(int taskId)
    {
        var threadId = Thread.CurrentThread.ManagedThreadId;
        var isBackground = Thread.CurrentThread.IsBackground;
        var isThreadPoolThread = Thread.CurrentThread.IsThreadPoolThread;

        Console.WriteLine($"任务 {taskId} 开始 - 线程ID: {threadId}, 后台线程: {isBackground}, 线程池线程: {isThreadPoolThread}");

        // Simulated work.
        Thread.Sleep(Random.Shared.Next(500, 1500));

        Console.WriteLine($"任务 {taskId} 完成 - 线程ID: {threadId}");

        // Atomic increment; pairs with the Volatile.Read in the polling loop.
        Interlocked.Increment(ref _completedTasks);
    }

    /// <summary>
    /// Work item for the wait-handle demo: sleeps for a random 300-799 ms and then
    /// signals <paramref name="waitHandle"/>.
    /// </summary>
    /// <param name="taskId">Identifier printed in the log lines.</param>
    /// <param name="waitHandle">Event set when the work item has finished.</param>
    private static void ThreadPoolWorkerWithCallback(int taskId, ManualResetEvent waitHandle)
    {
        var threadId = Thread.CurrentThread.ManagedThreadId;

        Console.WriteLine($"回调任务 {taskId} 开始 - 线程ID: {threadId}");

        // Simulated work.
        Thread.Sleep(Random.Shared.Next(300, 800));

        Console.WriteLine($"回调任务 {taskId} 完成 - 线程ID: {threadId}");

        // Tell the waiting thread that this item is done.
        waitHandle.Set();
    }

    /// <summary>State object handed to the WaitCallback work items.</summary>
    private class TaskData
    {
        public int TaskId { get; set; }
        public ManualResetEvent WaitHandle { get; set; }
    }
}

2. Task和异步编程

2.1 Task基础

/// <summary>
/// Console walkthrough of <see cref="Task"/> fundamentals: the three ways of starting a
/// task, status monitoring, continuations, and combining tasks with
/// <c>Task.WhenAll</c> / <c>Task.WhenAny</c>.
/// </summary>
public class TaskBasicsDemo
{
    /// <summary>
    /// Starts tasks via <c>Task.Run</c>, the <see cref="Task"/> constructor and
    /// <c>Task.Factory.StartNew</c>, awaits each, then polls the status of a longer task.
    /// </summary>
    public static async Task DemonstrateTaskBasics()
    {
        Console.WriteLine("\n=== Task基础演示 ===");

        Console.WriteLine("\n--- 创建和启动Task ---");

        // Approach 1: Task.Run
        Task<string> viaRun = Task.Run(() =>
        {
            Console.WriteLine($"Task.Run - 线程ID: {Thread.CurrentThread.ManagedThreadId}");
            Thread.Sleep(1000);
            return "Task.Run 完成";
        });

        // Approach 2: Task constructor followed by an explicit Start
        Task viaConstructor = new Task(() =>
        {
            Console.WriteLine($"Task构造函数 - 线程ID: {Thread.CurrentThread.ManagedThreadId}");
            Thread.Sleep(800);
        });
        viaConstructor.Start();

        // Approach 3: Task.Factory.StartNew
        Task<int> viaFactory = Task.Factory.StartNew(() =>
        {
            Console.WriteLine($"Task.Factory.StartNew - 线程ID: {Thread.CurrentThread.ManagedThreadId}");
            Thread.Sleep(600);
            return 42;
        });

        Console.WriteLine("所有Task已启动");

        // Await the three tasks in creation order.
        string runOutcome = await viaRun;
        Console.WriteLine($"Task1 结果: {runOutcome}");

        await viaConstructor;
        Console.WriteLine("Task2 完成");

        int factoryOutcome = await viaFactory;
        Console.WriteLine($"Task3 结果: {factoryOutcome}");

        Console.WriteLine("\n--- Task状态监控 ---");

        // Body of the observed task: five half-second steps with a progress line each.
        static async Task<string> ReportProgressAsync()
        {
            Console.WriteLine("长时间运行的任务开始");
            for (int step = 1; step <= 5; step++)
            {
                await Task.Delay(500);
                Console.WriteLine($"进度: {step * 20}%");
            }
            return "长时间任务完成";
        }

        Task<string> observed = Task.Run(() => ReportProgressAsync());

        // Print the status every 200 ms until the task has completed.
        while (!observed.IsCompleted)
        {
            Console.WriteLine($"任务状态: {observed.Status}");
            await Task.Delay(200);
        }

        string observedOutcome = await observed;
        Console.WriteLine($"监控任务结果: {observedOutcome}");
        Console.WriteLine($"最终状态: {observed.Status}");
    }

    /// <summary>
    /// Shows a single <c>ContinueWith</c> continuation and then a three-stage chain
    /// in which each stage consumes the previous stage's result.
    /// </summary>
    public static async Task DemonstrateTaskContinuation()
    {
        Console.WriteLine("\n--- Task延续演示 ---");

        Task<int> source = Task.Run(() =>
        {
            Console.WriteLine("初始任务执行中...");
            Thread.Sleep(1000);
            return 10;
        });

        // A continuation that doubles the antecedent's result.
        Task<int> doubled = source.ContinueWith(previous =>
        {
            Console.WriteLine($"延续任务接收到结果: {previous.Result}");
            return previous.Result * 2;
        });

        int doubledValue = await doubled;
        Console.WriteLine($"最终结果: {doubledValue}");

        // Chained continuations, written as named stages.
        Console.WriteLine("\n--- 链式延续 ---");

        Task<int> seedStage = Task.Run(() =>
        {
            Console.WriteLine("步骤1: 初始计算");
            return 5;
        });

        Task<int> squareStage = seedStage.ContinueWith(previous =>
        {
            Console.WriteLine($"步骤2: 接收 {previous.Result},计算平方");
            return previous.Result * previous.Result;
        });

        Task<int> offsetStage = squareStage.ContinueWith(previous =>
        {
            Console.WriteLine($"步骤3: 接收 {previous.Result},加10");
            return previous.Result + 10;
        });

        int chainValue = await offsetStage;

        Console.WriteLine($"链式计算结果: {chainValue}");
    }

    /// <summary>
    /// Runs five randomly delayed tasks and awaits them all with <c>Task.WhenAll</c>,
    /// then runs three more and reacts to the first finisher via <c>Task.WhenAny</c>
    /// before awaiting the remainder.
    /// </summary>
    public static async Task DemonstrateTaskCombination()
    {
        Console.WriteLine("\n--- Task组合演示 ---");

        // One job: wait a random 500-1999 ms and return ten times its id.
        static async Task<int> RunJobAsync(int jobId)
        {
            var pause = Random.Shared.Next(500, 2000);
            Console.WriteLine($"任务 {jobId} 开始,预计耗时 {pause}ms");
            await Task.Delay(pause);
            Console.WriteLine($"任务 {jobId} 完成");
            return jobId * 10;
        }

        var jobs = new List<Task<int>>();
        for (int jobId = 0; jobId < 5; jobId++)
        {
            int capturedId = jobId; // per-iteration copy for the closure
            Task<int> job = Task.Run(() => RunJobAsync(capturedId));
            jobs.Add(job);
        }

        // Task.WhenAll: resume once every job has completed.
        Console.WriteLine("\n使用 Task.WhenAll 等待所有任务完成...");
        var timer = Stopwatch.StartNew();

        int[] jobValues = await Task.WhenAll(jobs);

        timer.Stop();
        Console.WriteLine($"所有任务完成,耗时: {timer.ElapsedMilliseconds}ms");
        Console.WriteLine($"结果: [{string.Join(", ", jobValues)}]");

        // Task.WhenAny: resume as soon as any one job has completed.
        Console.WriteLine("\n--- Task.WhenAny演示 ---");

        static async Task<string> RunRacerAsync(int racerId)
        {
            var pause = Random.Shared.Next(1000, 3000);
            await Task.Delay(pause);
            return $"任务{racerId}完成";
        }

        var racers = new List<Task<string>>();
        for (int racerId = 0; racerId < 3; racerId++)
        {
            int capturedId = racerId;
            Task<string> racer = Task.Run(() => RunRacerAsync(capturedId));
            racers.Add(racer);
        }

        Task<string> winner = await Task.WhenAny(racers);
        string winnerText = await winner;

        Console.WriteLine($"第一个完成的任务结果: {winnerText}");

        // Let the other racers finish as well.
        var stillRunning = racers.Where(candidate => candidate != winner);
        await Task.WhenAll(stillRunning);
        Console.WriteLine("所有剩余任务也已完成");
    }
}

2.2 async/await异步编程模式

/// <summary>
/// Console walkthrough of async/await: sequential versus concurrent awaiting, exception
/// handling for one and for many tasks, asynchronous streams, and <c>ConfigureAwait</c>.
/// </summary>
public class AsyncAwaitDemo
{
    /// <summary>
    /// Awaits two operations one after the other, then times three operations run
    /// sequentially against the same three run concurrently and prints the ratio.
    /// </summary>
    public static async Task DemonstrateAsyncAwait()
    {
        Console.WriteLine("\n=== async/await演示 ===");

        Console.WriteLine("\n--- 基础async/await ---");

        // Plain awaited calls.
        string firstOutcome = await PerformAsyncOperation("操作1", 1000);
        Console.WriteLine($"结果1: {firstOutcome}");

        string secondOutcome = await PerformAsyncOperation("操作2", 1500);
        Console.WriteLine($"结果2: {secondOutcome}");

        Console.WriteLine("\n--- 并发执行多个异步操作 ---");

        var timer = Stopwatch.StartNew();

        // Sequential: each operation starts only after the previous one has finished.
        Console.WriteLine("串行执行:");
        long sequentialBegin = timer.ElapsedMilliseconds;

        _ = await PerformAsyncOperation("串行操作1", 800);
        _ = await PerformAsyncOperation("串行操作2", 600);
        _ = await PerformAsyncOperation("串行操作3", 700);

        long sequentialElapsed = timer.ElapsedMilliseconds - sequentialBegin;
        Console.WriteLine($"串行执行完成,耗时: {sequentialElapsed}ms");

        // Concurrent: start all three first, then await them together.
        Console.WriteLine("\n并发执行:");
        long concurrentBegin = timer.ElapsedMilliseconds;

        Task<string> pendingA = PerformAsyncOperation("并发操作1", 800);
        Task<string> pendingB = PerformAsyncOperation("并发操作2", 600);
        Task<string> pendingC = PerformAsyncOperation("并发操作3", 700);

        _ = await Task.WhenAll(pendingA, pendingB, pendingC);

        long concurrentElapsed = timer.ElapsedMilliseconds - concurrentBegin;
        Console.WriteLine($"并发执行完成,耗时: {concurrentElapsed}ms");
        Console.WriteLine($"性能提升: {(double)sequentialElapsed / concurrentElapsed:F2}x");

        timer.Stop();
    }

    /// <summary>
    /// Simulates an asynchronous I/O call: logs the thread id before and after a delay.
    /// </summary>
    /// <param name="operationName">Label used in the log lines and in the result.</param>
    /// <param name="delayMs">Length of the simulated operation in milliseconds.</param>
    /// <returns><paramref name="operationName"/> followed by the suffix "结果".</returns>
    private static async Task<string> PerformAsyncOperation(string operationName, int delayMs)
    {
        Console.WriteLine($"[线程{Thread.CurrentThread.ManagedThreadId}] {operationName} 开始");

        // Stand-in for real asynchronous I/O.
        await Task.Delay(delayMs);

        // Read the thread id again after the await and log it.
        Console.WriteLine($"[线程{Thread.CurrentThread.ManagedThreadId}] {operationName} 完成");

        return $"{operationName}结果";
    }

    /// <summary>
    /// Shows try/catch around a single awaited call (succeeding and failing), then
    /// around <c>Task.WhenAll</c>, inspecting each task afterwards.
    /// </summary>
    public static async Task DemonstrateAsyncExceptionHandling()
    {
        Console.WriteLine("\n--- 异步异常处理演示 ---");

        // Single operation that succeeds.
        try
        {
            string outcome = await PerformAsyncOperationWithException("正常操作", false);
            Console.WriteLine($"正常结果: {outcome}");
        }
        catch (Exception error)
        {
            Console.WriteLine($"捕获异常: {error.Message}");
        }

        // Single operation that throws; the specific handler is listed first.
        try
        {
            string outcome = await PerformAsyncOperationWithException("异常操作", true);
            Console.WriteLine($"异常结果: {outcome}");
        }
        catch (InvalidOperationException error)
        {
            Console.WriteLine($"捕获特定异常: {error.Message}");
        }
        catch (Exception error)
        {
            Console.WriteLine($"捕获通用异常: {error.Message}");
        }

        // Several operations awaited together.
        Console.WriteLine("\n--- 多个异步操作的异常处理 ---");

        Task<string>[] batch =
        {
            PerformAsyncOperationWithException("任务1", false),
            PerformAsyncOperationWithException("任务2", true),
            PerformAsyncOperationWithException("任务3", false)
        };

        try
        {
            string[] outcomes = await Task.WhenAll(batch);
            Console.WriteLine($"所有任务成功: {string.Join(", ", outcomes)}");
        }
        catch (Exception error)
        {
            Console.WriteLine($"Task.WhenAll 捕获异常: {error.Message}");

            // Walk the tasks to see which succeeded and which faulted.
            int ordinal = 0;
            foreach (Task<string> member in batch)
            {
                ordinal++;
                Console.WriteLine($"任务{ordinal} 状态: {member.Status}");

                switch (member.Status)
                {
                    case TaskStatus.Faulted:
                        Console.WriteLine($"  异常: {member.Exception?.InnerException?.Message}");
                        break;
                    case TaskStatus.RanToCompletion:
                        Console.WriteLine($"  结果: {member.Result}");
                        break;
                }
            }
        }
    }

    /// <summary>
    /// Simulated operation that waits 500 ms and then either throws or succeeds.
    /// </summary>
    /// <param name="operationName">Label used in log lines, result and exception message.</param>
    /// <param name="shouldThrow">When true, the operation fails after the delay.</param>
    /// <returns><paramref name="operationName"/> followed by " 成功".</returns>
    /// <exception cref="InvalidOperationException">
    /// Thrown after the delay when <paramref name="shouldThrow"/> is true.
    /// </exception>
    private static async Task<string> PerformAsyncOperationWithException(string operationName, bool shouldThrow)
    {
        Console.WriteLine($"{operationName} 开始");

        await Task.Delay(500);

        if (!shouldThrow)
        {
            Console.WriteLine($"{operationName} 完成");
            return $"{operationName} 成功";
        }

        throw new InvalidOperationException($"{operationName} 发生异常");
    }

    /// <summary>
    /// Consumes an asynchronous stream of five items with <c>await foreach</c>.
    /// </summary>
    public static async Task DemonstrateAsyncEnumerable()
    {
        Console.WriteLine("\n--- 异步可枚举演示 ---");

        await foreach (string element in GenerateAsyncSequence(5))
        {
            Console.WriteLine($"接收到项目: {element}");
        }

        Console.WriteLine("异步序列处理完成");
    }

    /// <summary>
    /// Produces <paramref name="count"/> labelled items, waiting 300 ms before each one.
    /// </summary>
    /// <param name="count">Number of items to yield.</param>
    private static async IAsyncEnumerable<string> GenerateAsyncSequence(int count)
    {
        int ordinal = 1;
        while (ordinal <= count)
        {
            // Stand-in for asynchronously produced data.
            await Task.Delay(300);
            yield return $"项目-{ordinal}";
            ordinal++;
        }
    }

    /// <summary>
    /// Awaits the same operation once with <c>ConfigureAwait(false)</c> and once with
    /// the default behaviour.
    /// </summary>
    public static async Task DemonstrateConfigureAwait()
    {
        Console.WriteLine("\n--- ConfigureAwait演示 ---");

        // This matters in UI applications:
        // ConfigureAwait(false) can avoid deadlocks and improve performance.

        Console.WriteLine("使用 ConfigureAwait(false):");
        string withoutContext = await PerformAsyncOperation("ConfigureAwait(false)", 500).ConfigureAwait(false);
        Console.WriteLine($"结果: {withoutContext}");

        Console.WriteLine("\n使用默认 ConfigureAwait(true):");
        string withContext = await PerformAsyncOperation("默认ConfigureAwait", 500);
        Console.WriteLine($"结果: {withContext}");

        // Library code is usually advised to use ConfigureAwait(false);
        // application code usually keeps the default ConfigureAwait(true).
    }
}

2.3 取消令牌(CancellationToken)

/// <summary>
/// Console walkthrough of cooperative cancellation: timed and manual cancellation,
/// one token shared by several operations, linked token sources, and cancellation callbacks.
/// </summary>
public class CancellationTokenDemo
{
    /// <summary>
    /// Demonstrates a timeout-driven cancel, a manual cancel, and cancelling several
    /// operations that share a single token.
    /// </summary>
    public static async Task DemonstrateCancellationToken()
    {
        Console.WriteLine("\n=== 取消令牌演示 ===");

        Console.WriteLine("\n--- 基础取消操作 ---");

        using var timeoutSource = new CancellationTokenSource();

        // Cancel automatically after five seconds.
        timeoutSource.CancelAfter(TimeSpan.FromSeconds(5));

        try
        {
            string outcome = await LongRunningOperation("长时间操作", 10000, timeoutSource.Token);
            Console.WriteLine($"操作完成: {outcome}");
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("操作被取消");
        }

        Console.WriteLine("\n--- 手动取消操作 ---");

        using var manualSource = new CancellationTokenSource();

        // Start the operation without awaiting it yet.
        Task<string> pending = LongRunningOperation("手动取消操作", 8000, manualSource.Token);

        // Cancel by hand after two seconds.
        await Task.Delay(2000);
        Console.WriteLine("发送取消信号...");
        manualSource.Cancel();

        try
        {
            string outcome = await pending;
            Console.WriteLine($"操作完成: {outcome}");
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("操作被手动取消");
        }

        Console.WriteLine("\n--- 多个操作的协调取消 ---");

        using var sharedSource = new CancellationTokenSource();
        CancellationToken sharedToken = sharedSource.Token;

        Task<string>[] group =
        {
            CoordinatedOperation("操作A", 3000, sharedToken),
            CoordinatedOperation("操作B", 4000, sharedToken),
            CoordinatedOperation("操作C", 5000, sharedToken)
        };

        // Cancel whatever is still running after 3.5 seconds.
        sharedSource.CancelAfter(3500);

        // Map a cancelled operation to a placeholder so that WhenAll itself never throws.
        static async Task<string> ResultOrPlaceholderAsync(Task<string> operation)
        {
            try
            {
                return await operation;
            }
            catch (OperationCanceledException)
            {
                return "已取消";
            }
        }

        string[] outcomes = await Task.WhenAll(group.Select(operation => ResultOrPlaceholderAsync(operation)));

        Console.WriteLine($"协调操作结果: [{string.Join(", ", outcomes)}]");
    }

    /// <summary>
    /// Simulates a long operation split into ten equal steps, checking the token before
    /// each step and passing it to the step's delay.
    /// </summary>
    /// <param name="operationName">Label used in log lines and in the result.</param>
    /// <param name="durationMs">Total simulated duration; each step takes a tenth of it.</param>
    /// <param name="cancellationToken">Token observed before and during every step.</param>
    /// <returns><paramref name="operationName"/> followed by " 成功".</returns>
    /// <exception cref="OperationCanceledException">Cancellation was requested before completion.</exception>
    private static async Task<string> LongRunningOperation(string operationName, int durationMs, CancellationToken cancellationToken)
    {
        Console.WriteLine($"{operationName} 开始,预计耗时 {durationMs}ms");

        int sliceMs = durationMs / 10;

        for (int step = 1; step <= 10; step++)
        {
            // Bail out promptly if cancellation has already been requested.
            cancellationToken.ThrowIfCancellationRequested();

            await Task.Delay(sliceMs, cancellationToken);

            Console.WriteLine($"{operationName} 进度: {step * 10}%");
        }

        Console.WriteLine($"{operationName} 完成");
        return $"{operationName} 成功";
    }

    /// <summary>
    /// Single cancellable delay that logs its own cancellation before rethrowing it.
    /// </summary>
    /// <param name="operationName">Label used in log lines and in the result.</param>
    /// <param name="durationMs">Length of the delay in milliseconds.</param>
    /// <param name="cancellationToken">Token passed to the delay.</param>
    /// <returns><paramref name="operationName"/> followed by " 成功".</returns>
    /// <exception cref="OperationCanceledException">The delay was cancelled.</exception>
    private static async Task<string> CoordinatedOperation(string operationName, int durationMs, CancellationToken cancellationToken)
    {
        Console.WriteLine($"{operationName} 开始");

        try
        {
            await Task.Delay(durationMs, cancellationToken);
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine($"{operationName} 被取消");
            throw;
        }

        Console.WriteLine($"{operationName} 完成");
        return $"{operationName} 成功";
    }

    /// <summary>
    /// Links two token sources with different timeouts and reports which of them had
    /// fired when the linked token cancelled the operation.
    /// </summary>
    public static async Task DemonstrateCancellationTokenCombination()
    {
        Console.WriteLine("\n--- 取消令牌组合演示 ---");

        using var earlySource = new CancellationTokenSource();
        using var lateSource = new CancellationTokenSource();

        // The linked source is cancelled as soon as either input token is cancelled.
        using var linkedSource = CancellationTokenSource.CreateLinkedTokenSource(earlySource.Token, lateSource.Token);

        // Give the two sources different deadlines.
        earlySource.CancelAfter(3000); // fires after 3 seconds
        lateSource.CancelAfter(5000);  // fires after 5 seconds

        try
        {
            string outcome = await LongRunningOperation("组合取消操作", 8000, linkedSource.Token);
            Console.WriteLine($"操作完成: {outcome}");
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("操作被组合取消令牌取消");
            Console.WriteLine($"CTS1 是否取消: {earlySource.Token.IsCancellationRequested}");
            Console.WriteLine($"CTS2 是否取消: {lateSource.Token.IsCancellationRequested}");
        }
    }

    /// <summary>
    /// Registers two callbacks on a token (one of them with a state argument), cancels a
    /// running operation after 2.5 seconds, then disposes the registrations.
    /// </summary>
    public static async Task DemonstrateCancellationCallback()
    {
        Console.WriteLine("\n--- 取消回调演示 ---");

        using var source = new CancellationTokenSource();

        // Callbacks registered on the token.
        CancellationTokenRegistration plainCallback = source.Token.Register(() =>
        {
            Console.WriteLine("取消回调被调用 - 执行清理操作");
        });

        CancellationTokenRegistration statefulCallback = source.Token.Register(state =>
        {
            var note = state as string;
            Console.WriteLine($"带状态的取消回调: {note}");
        }, "清理资源完成");

        // Background operation: ten half-second steps, each observing the token.
        async Task<string> RunStepsAsync()
        {
            try
            {
                for (int step = 1; step <= 10; step++)
                {
                    source.Token.ThrowIfCancellationRequested();
                    await Task.Delay(500, source.Token);
                    Console.WriteLine($"操作进度: {step * 10}%");
                }
                return "操作成功";
            }
            catch (OperationCanceledException)
            {
                Console.WriteLine("操作在回调演示中被取消");
                throw;
            }
        }

        Task<string> running = Task.Run(() => RunStepsAsync());

        // Cancel after 2.5 seconds.
        await Task.Delay(2500);
        source.Cancel();

        try
        {
            string outcome = await running;
            Console.WriteLine($"结果: {outcome}");
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("确认操作被取消");
        }

        // Dispose the callback registrations.
        plainCallback.Dispose();
        statefulCallback.Dispose();
    }
}

3. 线程同步和并发控制

3.1 锁机制

/// <summary>
/// Console walkthrough of mutual exclusion: an unprotected counter, the <c>lock</c>
/// statement, explicit <see cref="Monitor"/> calls, and <see cref="ReaderWriterLockSlim"/>.
/// </summary>
public class LockingDemo
{
    // Guards _sharedCounter in the lock-statement demo.
    private static readonly object _lockObject = new object();
    private static int _sharedCounter = 0;

    // Guards _sharedDictionary in the reader/writer demo.
    private static readonly ReaderWriterLockSlim _readerWriterLock = new ReaderWriterLockSlim();
    private static readonly Dictionary<string, string> _sharedDictionary = new Dictionary<string, string>();

    /// <summary>Runs the four locking demonstrations one after the other.</summary>
    public static async Task DemonstrateLocking()
    {
        Console.WriteLine("\n=== 锁机制演示 ===");

        Console.WriteLine("\n--- 不使用锁的竞态条件 ---");
        await DemonstrateRaceCondition();

        Console.WriteLine("\n--- 使用lock语句 ---");
        await DemonstrateLockStatement();

        Console.WriteLine("\n--- 使用Monitor类 ---");
        await DemonstrateMonitor();

        Console.WriteLine("\n--- 使用ReaderWriterLockSlim ---");
        await DemonstrateReaderWriterLock();
    }

    /// <summary>
    /// Ten tasks each increment a captured local 1000 times with no synchronization,
    /// then the final value is printed next to the expected 10000.
    /// </summary>
    private static async Task DemonstrateRaceCondition()
    {
        var unsafeCounter = 0;
        var workers = new Task[10];

        // Every worker mutates the same variable concurrently.
        for (int slot = 0; slot < workers.Length; slot++)
        {
            workers[slot] = Task.Run(() =>
            {
                int remaining = 1000;
                while (remaining-- > 0)
                {
                    unsafeCounter++; // deliberately not thread-safe
                }
            });
        }

        await Task.WhenAll(workers);
        Console.WriteLine($"不安全计数器最终值: {unsafeCounter} (期望值: 10000)");
    }

    /// <summary>
    /// Same workload as the race-condition demo, but every increment of the shared
    /// static counter happens inside a <c>lock</c> block.
    /// </summary>
    private static async Task DemonstrateLockStatement()
    {
        _sharedCounter = 0;
        var workers = new Task[10];

        for (int slot = 0; slot < workers.Length; slot++)
        {
            workers[slot] = Task.Run(() =>
            {
                int remaining = 1000;
                while (remaining-- > 0)
                {
                    lock (_lockObject)
                    {
                        _sharedCounter++; // protected by the lock
                    }
                }
            });
        }

        await Task.WhenAll(workers);
        Console.WriteLine($"安全计数器最终值: {_sharedCounter} (期望值: 10000)");
    }

    /// <summary>
    /// Five tasks each perform 500 read-sleep-write updates of a counter, guarding every
    /// update with <c>Monitor.Enter</c> / <c>Monitor.Exit</c> in a try/finally.
    /// </summary>
    private static async Task DemonstrateMonitor()
    {
        var monitorCounter = 0;
        var gate = new object();
        var workers = new List<Task>();

        for (int index = 0; index < 5; index++)
        {
            int workerId = index; // per-iteration copy for the closure
            workers.Add(Task.Run(() =>
            {
                for (int round = 0; round < 500; round++)
                {
                    bool entered = false;
                    try
                    {
                        Monitor.Enter(gate, ref entered);

                        // Split the increment around a sleep; the lock makes it safe.
                        var snapshot = monitorCounter;
                        Thread.Sleep(1); // simulated work
                        monitorCounter = snapshot + 1;

                        bool shouldReport = round % 100 == 0;
                        if (shouldReport)
                        {
                            Console.WriteLine($"任务{workerId} 更新计数器到: {monitorCounter}");
                        }
                    }
                    finally
                    {
                        // Release only if Enter actually acquired the lock.
                        if (entered)
                        {
                            Monitor.Exit(gate);
                        }
                    }
                }
            }));
        }

        await Task.WhenAll(workers);
        Console.WriteLine($"Monitor计数器最终值: {monitorCounter} (期望值: 2500)");
    }

    /// <summary>
    /// Three writer tasks insert five entries each under the write lock while five
    /// reader tasks inspect the dictionary ten times each under the read lock.
    /// </summary>
    private static async Task DemonstrateReaderWriterLock()
    {
        _sharedDictionary.Clear();
        var workers = new List<Task>();

        // Writers: exclusive access while inserting.
        for (int index = 0; index < 3; index++)
        {
            int writerId = index;
            workers.Add(Task.Run(() =>
            {
                for (int item = 0; item < 5; item++)
                {
                    _readerWriterLock.EnterWriteLock();
                    try
                    {
                        string entryKey = $"writer{writerId}_item{item}";
                        string entryValue = $"value_{DateTime.Now.Ticks}";
                        _sharedDictionary[entryKey] = entryValue;

                        Console.WriteLine($"写入者{writerId} 写入: {entryKey} = {entryValue}");
                        Thread.Sleep(100); // simulated write work
                    }
                    finally
                    {
                        _readerWriterLock.ExitWriteLock();
                    }
                }
            }));
        }

        // Readers: shared access while inspecting.
        for (int index = 0; index < 5; index++)
        {
            int readerId = index;
            workers.Add(Task.Run(() =>
            {
                for (int pass = 0; pass < 10; pass++)
                {
                    _readerWriterLock.EnterReadLock();
                    try
                    {
                        int entryCount = _sharedDictionary.Count;
                        Console.WriteLine($"读取者{readerId} 读取到 {entryCount} 个项目");

                        if (entryCount > 0)
                        {
                            KeyValuePair<string, string> head = _sharedDictionary.First();
                            Console.WriteLine($"  第一个项目: {head.Key} = {head.Value}");
                        }

                        Thread.Sleep(50); // simulated read work
                    }
                    finally
                    {
                        _readerWriterLock.ExitReadLock();
                    }
                }
            }));
        }

        await Task.WhenAll(workers);
        Console.WriteLine($"最终字典包含 {_sharedDictionary.Count} 个项目");
    }
}

### 3.2 信号量和事件

```csharp
/// <summary>
/// Console walkthrough of four synchronization primitives: <see cref="SemaphoreSlim"/>,
/// <see cref="ManualResetEventSlim"/>, <see cref="AutoResetEvent"/> and <see cref="CountdownEvent"/>.
/// </summary>
public class SynchronizationPrimitivesDemo
{
    // Semaphore created with 3 permits available out of a maximum of 3.
    private static readonly SemaphoreSlim _semaphore = new SemaphoreSlim(3, 3);

    // Both events are constructed in the unsignaled state.
    private static readonly ManualResetEventSlim _manualResetEvent = new ManualResetEventSlim(false);
    private static readonly AutoResetEvent _autoResetEvent = new AutoResetEvent(false);

    // Countdown created with an initial count of 5.
    private static readonly CountdownEvent _countdownEvent = new CountdownEvent(5);

    /// <summary>
    /// Runs the four demonstrations one after another, printing a banner before each.
    /// </summary>
    /// <returns>A task that completes when the last demonstration has finished.</returns>
    public static async Task DemonstrateSynchronizationPrimitives()
    {
        Console.WriteLine("\n=== 同步原语演示 ===");

        var sections = new (string Banner, Func<Task> Run)[]
        {
            ("\n--- 信号量(Semaphore)演示 ---", DemonstrateSemaphore),
            ("\n--- ManualResetEvent演示 ---", DemonstrateManualResetEvent),
            ("\n--- AutoResetEvent演示 ---", DemonstrateAutoResetEvent),
            ("\n--- CountdownEvent演示 ---", DemonstrateCountdownEvent),
        };

        foreach (var (banner, run) in sections)
        {
            Console.WriteLine(banner);
            await run();
        }
    }

    /// <summary>
    /// Starts eight tasks that each hold one semaphore permit for two seconds.
    /// </summary>
    private static async Task DemonstrateSemaphore()
    {
        // Eight contenders for the semaphore's three permits.
        var contenders = Enumerable.Range(0, 8)
            .Select(taskId => Task.Run(async () =>
            {
                Console.WriteLine($"任务{taskId} 等待进入信号量...");

                // Acquire outside the try so Release only runs once a permit is held.
                await _semaphore.WaitAsync();
                try
                {
                    Console.WriteLine($"任务{taskId} 进入信号量,开始工作");
                    await Task.Delay(2000); // simulated work
                    Console.WriteLine($"任务{taskId} 完成工作,离开信号量");
                }
                finally
                {
                    _semaphore.Release();
                }
            }))
            .ToList();

        await Task.WhenAll(contenders);
        Console.WriteLine("所有信号量任务完成");
    }

    /// <summary>
    /// Starts five tasks that block on the manual-reset event, then sets the event once
    /// after two seconds.
    /// </summary>
    private static async Task DemonstrateManualResetEvent()
    {
        _manualResetEvent.Reset();

        var waiters = Enumerable.Range(0, 5)
            .Select(taskId => Task.Run(() =>
            {
                Console.WriteLine($"任务{taskId} 等待ManualResetEvent信号...");
                _manualResetEvent.Wait();
                Console.WriteLine($"任务{taskId} 收到信号,继续执行");
            }))
            .ToList();

        // Hold the waiters back for two seconds before signalling.
        await Task.Delay(2000);
        Console.WriteLine("发送ManualResetEvent信号...");
        _manualResetEvent.Set();

        await Task.WhenAll(waiters);
        Console.WriteLine("所有ManualResetEvent任务完成");
    }

    /// <summary>
    /// Starts five tasks that block on the auto-reset event, then sets the event five
    /// times, one second apart.
    /// </summary>
    private static async Task DemonstrateAutoResetEvent()
    {
        var waiters = Enumerable.Range(0, 5)
            .Select(taskId => Task.Run(() =>
            {
                Console.WriteLine($"任务{taskId} 等待AutoResetEvent信号...");
                _autoResetEvent.WaitOne();
                Console.WriteLine($"任务{taskId} 收到信号,继续执行");
            }))
            .ToList();

        // One signal per second, numbered from 1.
        foreach (var signalNumber in Enumerable.Range(1, 5))
        {
            await Task.Delay(1000);
            Console.WriteLine($"发送第{signalNumber}个AutoResetEvent信号...");
            _autoResetEvent.Set();
        }

        await Task.WhenAll(waiters);
        Console.WriteLine("所有AutoResetEvent任务完成");
    }

    /// <summary>
    /// Resets the countdown to five, starts five workers that each signal it once after a
    /// random delay, and a sixth task that blocks until the countdown reaches zero.
    /// </summary>
    private static async Task DemonstrateCountdownEvent()
    {
        _countdownEvent.Reset(5);

        var workers = Enumerable.Range(0, 5)
            .Select(workerId => Task.Run(async () =>
            {
                Console.WriteLine($"工作者{workerId} 开始工作...");
                await Task.Delay(Random.Shared.Next(1000, 3000)); // random work duration
                Console.WriteLine($"工作者{workerId} 完成工作");
                _countdownEvent.Signal();
            }))
            .ToList();

        // Separate task that blocks until every worker has signalled.
        var waiter = Task.Run(() =>
        {
            Console.WriteLine("主线程等待所有工作者完成...");
            _countdownEvent.Wait();
            Console.WriteLine("所有工作者已完成,主线程继续");
        });

        await Task.WhenAll(workers.Append(waiter));
        Console.WriteLine("CountdownEvent演示完成");
    }
}

3.3 原子操作

/// <summary>
/// Console walkthrough of <see cref="Interlocked"/> operations: a shared counter,
/// a lock-versus-Interlocked timing comparison, and compare-and-swap on a reference.
/// </summary>
public class AtomicOperationsDemo
{
    // Counter incremented via Interlocked in DemonstrateInterlocked.
    private static long _atomicCounter = 0;

    // Counter incremented via Interlocked in CompareAtomicPerformance.
    private static int _interlocked = 0;

    /// <summary>
    /// Runs the three demonstrations in order, printing a banner before each.
    /// </summary>
    /// <returns>A task that completes when the last demonstration has finished.</returns>
    public static async Task DemonstrateAtomicOperations()
    {
        Console.WriteLine("\n=== 原子操作演示 ===");

        await RunSection("\n--- Interlocked类演示 ---", DemonstrateInterlocked);
        await RunSection("\n--- 原子操作性能对比 ---", CompareAtomicPerformance);
        await RunSection("\n--- 复杂原子操作 ---", DemonstrateComplexAtomicOperations);

        static async Task RunSection(string banner, Func<Task> body)
        {
            Console.WriteLine(banner);
            await body();
        }
    }

    /// <summary>
    /// Increments the shared counter 1000 times from each of ten tasks, then shows
    /// Exchange, CompareExchange and Add on a local value.
    /// </summary>
    private static async Task DemonstrateInterlocked()
    {
        _atomicCounter = 0;

        var incrementers = Enumerable.Range(0, 10)
            .Select(_ => Task.Run(() =>
            {
                for (int step = 0; step < 1000; step++)
                {
                    Interlocked.Increment(ref _atomicCounter);
                }
            }))
            .ToArray();

        await Task.WhenAll(incrementers);
        Console.WriteLine($"Interlocked计数器最终值: {_atomicCounter} (期望值: 10000)");

        // The remaining Interlocked operations, applied to a local.
        long value = 100L;
        Console.WriteLine($"\n原始值: {value}");

        long exchanged = Interlocked.Exchange(ref value, 200L);
        Console.WriteLine($"Exchange操作 - 旧值: {exchanged}, 新值: {value}");

        // Stores 300 only if the current value equals 200; returns the value seen.
        long compared = Interlocked.CompareExchange(ref value, 300L, 200L);
        Console.WriteLine($"CompareExchange操作 - 比较值: {compared}, 当前值: {value}");

        long added = Interlocked.Add(ref value, 50L);
        Console.WriteLine($"Add操作 - 结果: {added}");
    }

    /// <summary>
    /// Times one million increments spread over four tasks, first guarded by
    /// <c>lock</c> and then using <see cref="Interlocked.Increment(ref int)"/>.
    /// </summary>
    private static async Task CompareAtomicPerformance()
    {
        const int iterations = 1000000;
        const int taskCount = 4;
        const int iterationsPerTask = iterations / taskCount;

        // lock-based variant
        var gate = new object();
        var lockedTotal = 0;
        var lockTimer = Stopwatch.StartNew();

        // Deferred query: the tasks are created when Task.WhenAll enumerates it.
        var lockWorkers = Enumerable.Range(0, taskCount).Select(_ => Task.Run(() =>
        {
            for (int n = 0; n < iterationsPerTask; n++)
            {
                lock (gate)
                {
                    lockedTotal++;
                }
            }
        }));

        await Task.WhenAll(lockWorkers);
        lockTimer.Stop();

        // Interlocked-based variant
        _interlocked = 0;
        var interlockedTimer = Stopwatch.StartNew();

        var interlockedWorkers = Enumerable.Range(0, taskCount).Select(_ => Task.Run(() =>
        {
            for (int n = 0; n < iterationsPerTask; n++)
            {
                Interlocked.Increment(ref _interlocked);
            }
        }));

        await Task.WhenAll(interlockedWorkers);
        interlockedTimer.Stop();

        long lockMs = lockTimer.ElapsedMilliseconds;
        long interlockedMs = interlockedTimer.ElapsedMilliseconds;
        double speedup = (double)lockMs / interlockedMs;

        Console.WriteLine($"Lock方式 - 结果: {lockedTotal}, 耗时: {lockMs}ms");
        Console.WriteLine($"Interlocked方式 - 结果: {_interlocked}, 耗时: {interlockedMs}ms");
        Console.WriteLine($"性能提升: {speedup:F2}x");
    }

    /// <summary>
    /// Has five tasks each make three compare-and-swap attempts on a shared
    /// <c>AtomicReference&lt;string&gt;</c>, printing the value each attempt observed.
    /// </summary>
    private static async Task DemonstrateComplexAtomicOperations()
    {
        var atomicReference = new AtomicReference<string>("初始值");

        var updaters = Enumerable.Range(0, 5)
            .Select(taskId => Task.Run(() =>
            {
                for (int attempt = 0; attempt < 3; attempt++)
                {
                    var candidate = $"任务{taskId}_更新{attempt}";

                    // Snapshot the current value and use it as the comparand.
                    var expected = atomicReference.Value;
                    var previous = atomicReference.CompareExchange(candidate, expected);

                    Console.WriteLine($"任务{taskId} 尝试更新: {previous} -> {candidate}");
                    Thread.Sleep(100);
                }
            }))
            .ToList();

        await Task.WhenAll(updaters);
        Console.WriteLine($"最终原子引用值: {atomicReference.Value}");
    }
}

/// <summary>
/// Minimal atomic holder for a reference of type <typeparamref name="T"/>, built on
/// <see cref="Interlocked"/> and <see cref="Volatile"/>.
/// </summary>
/// <typeparam name="T">Reference type of the stored value.</typeparam>
public class AtomicReference<T> where T : class
{
    // Intentionally NOT declared 'volatile': the field is passed by reference to
    // Interlocked below, and taking a reference to a volatile field raises compiler
    // warning CS0420 (the reference is not treated as volatile). Reads go through
    // Volatile.Read instead.
    private T _value;

    /// <summary>Creates the holder with <paramref name="initialValue"/> as its content.</summary>
    /// <param name="initialValue">Value stored initially.</param>
    public AtomicReference(T initialValue)
    {
        _value = initialValue;
    }

    /// <summary>Gets the current value using a volatile read.</summary>
    public T Value => Volatile.Read(ref _value);

    /// <summary>
    /// Replaces the stored value with <paramref name="newValue"/> if the stored reference
    /// is the same instance as <paramref name="comparand"/>.
    /// </summary>
    /// <param name="newValue">Value to store when the comparison succeeds.</param>
    /// <param name="comparand">Reference the stored value is compared against.</param>
    /// <returns>
    /// The value that was stored before the call; the swap happened exactly when this is
    /// the same reference as <paramref name="comparand"/>.
    /// </returns>
    public T CompareExchange(T newValue, T comparand)
    {
        return Interlocked.CompareExchange(ref _value, newValue, comparand);
    }

    /// <summary>Unconditionally stores <paramref name="newValue"/>.</summary>
    /// <param name="newValue">Value to store.</param>
    /// <returns>The value that was stored before the call.</returns>
    public T Exchange(T newValue)
    {
        return Interlocked.Exchange(ref _value, newValue);
    }
}

4. 并行编程和PLINQ

4.1 Parallel类

/// <summary>
/// Console walkthrough of <see cref="Parallel.For(int, int, Action{int})"/>,
/// <c>Parallel.ForEach</c>, <c>Parallel.Invoke</c> and a serial-versus-PLINQ sum.
/// </summary>
public class ParallelDemo
{
    /// <summary>
    /// Runs the four demonstrations in order, printing a banner before each.
    /// </summary>
    public static void DemonstrateParallelOperations()
    {
        Console.WriteLine("\n=== 并行编程演示 ===");

        var sections = new (string Banner, Action Run)[]
        {
            ("\n--- Parallel.For演示 ---", DemonstrateParallelFor),
            ("\n--- Parallel.ForEach演示 ---", DemonstrateParallelForEach),
            ("\n--- Parallel.Invoke演示 ---", DemonstrateParallelInvoke),
            ("\n--- 并行性能对比 ---", CompareParallelPerformance),
        };

        foreach (var (banner, run) in sections)
        {
            Console.WriteLine(banner);
            run();
        }
    }

    /// <summary>
    /// Fills a ten-element array with <c>ExpensiveComputation</c> results, first with a
    /// plain loop and then with <c>Parallel.For</c>, and prints both timings.
    /// </summary>
    private static void DemonstrateParallelFor()
    {
        const int count = 10;
        var results = new int[count];

        Console.WriteLine("串行For循环:");
        var serialTimer = Stopwatch.StartNew();
        foreach (var index in Enumerable.Range(0, count))
        {
            var square = ExpensiveComputation(index);
            results[index] = square;
            Console.WriteLine($"串行处理项目 {index}: {square}");
        }
        serialTimer.Stop();
        Console.WriteLine($"串行执行耗时: {serialTimer.ElapsedMilliseconds}ms");

        Console.WriteLine("\n并行For循环:");
        Array.Clear(results, 0, results.Length);
        var parallelTimer = Stopwatch.StartNew();

        // Each iteration writes only its own slot of the array.
        Parallel.For(0, count, index =>
        {
            var square = ExpensiveComputation(index);
            results[index] = square;
            Console.WriteLine($"并行处理项目 {index}: {square} [线程{Thread.CurrentThread.ManagedThreadId}]");
        });

        parallelTimer.Stop();
        Console.WriteLine($"并行执行耗时: {parallelTimer.ElapsedMilliseconds}ms");

        double speedup = (double)serialTimer.ElapsedMilliseconds / parallelTimer.ElapsedMilliseconds;
        Console.WriteLine($"性能提升: {speedup:F2}x");
    }

    /// <summary>
    /// Processes eight named items serially and then with <c>Parallel.ForEach</c>,
    /// collecting the parallel results in a <see cref="ConcurrentBag{T}"/>.
    /// </summary>
    private static void DemonstrateParallelForEach()
    {
        var items = Enumerable.Range(1, 8).Select(i => $"项目{i}").ToList();
        var processed = new ConcurrentBag<string>();

        Console.WriteLine("串行ForEach:");
        var serialTimer = Stopwatch.StartNew();
        items.ForEach(item =>
        {
            var outcome = ProcessItem(item);
            Console.WriteLine($"串行处理: {outcome}");
        });
        serialTimer.Stop();

        Console.WriteLine("\n并行ForEach:");
        var parallelTimer = Stopwatch.StartNew();

        Parallel.ForEach(items, item =>
        {
            var outcome = ProcessItem(item);
            processed.Add(outcome);
            Console.WriteLine($"并行处理: {outcome} [线程{Thread.CurrentThread.ManagedThreadId}]");
        });

        parallelTimer.Stop();
        Console.WriteLine($"串行耗时: {serialTimer.ElapsedMilliseconds}ms");
        Console.WriteLine($"并行耗时: {parallelTimer.ElapsedMilliseconds}ms");
        Console.WriteLine($"并行结果数量: {processed.Count}");
    }

    /// <summary>
    /// Calls Method1..Method3 one after another and then through <c>Parallel.Invoke</c>,
    /// printing both timings.
    /// </summary>
    private static void DemonstrateParallelInvoke()
    {
        var steps = new Action[] { Method1, Method2, Method3 };

        Console.WriteLine("串行调用多个方法:");
        var serialTimer = Stopwatch.StartNew();

        foreach (var step in steps)
        {
            step();
        }

        serialTimer.Stop();

        Console.WriteLine("\n并行调用多个方法:");
        var parallelTimer = Stopwatch.StartNew();

        Parallel.Invoke(steps);

        parallelTimer.Stop();
        Console.WriteLine($"串行调用耗时: {serialTimer.ElapsedMilliseconds}ms");
        Console.WriteLine($"并行调用耗时: {parallelTimer.ElapsedMilliseconds}ms");
    }

    /// <summary>
    /// Sums the square roots of 1..1,000,000 with LINQ and with PLINQ and prints the
    /// results, timings, their ratio and the processor count.
    /// </summary>
    private static void CompareParallelPerformance()
    {
        const int dataSize = 1000000;
        var data = Enumerable.Range(1, dataSize).ToArray();

        // serial
        var serialTimer = Stopwatch.StartNew();
        var serialSum = data.Sum(value => Math.Sqrt(value));
        serialTimer.Stop();

        // parallel
        var parallelTimer = Stopwatch.StartNew();
        var parallelSum = data.AsParallel().Sum(value => Math.Sqrt(value));
        parallelTimer.Stop();

        long serialMs = serialTimer.ElapsedMilliseconds;
        long parallelMs = parallelTimer.ElapsedMilliseconds;

        Console.WriteLine($"数据大小: {dataSize:N0}");
        Console.WriteLine($"串行计算结果: {serialSum:F2}, 耗时: {serialMs}ms");
        Console.WriteLine($"并行计算结果: {parallelSum:F2}, 耗时: {parallelMs}ms");
        Console.WriteLine($"性能提升: {(double)serialMs / parallelMs:F2}x");
        Console.WriteLine($"处理器核心数: {Environment.ProcessorCount}");
    }

    /// <summary>Sleeps 200 ms to simulate work, then returns the square of the input.</summary>
    /// <param name="input">Value to square.</param>
    /// <returns><paramref name="input"/> multiplied by itself.</returns>
    private static int ExpensiveComputation(int input)
    {
        Thread.Sleep(200);
        int squared = input * input;
        return squared;
    }

    /// <summary>Sleeps 300 ms to simulate work, then returns the item with a suffix appended.</summary>
    /// <param name="item">Item label.</param>
    /// <returns>The label followed by the processed suffix.</returns>
    private static string ProcessItem(string item)
    {
        Thread.Sleep(300);
        return $"{item}_已处理";
    }

    /// <summary>Prints a start line, sleeps 1000 ms, prints a completion line.</summary>
    private static void Method1()
    {
        var threadId = Thread.CurrentThread.ManagedThreadId;
        Console.WriteLine($"方法1开始 [线程{threadId}]");
        Thread.Sleep(1000);
        Console.WriteLine($"方法1完成 [线程{threadId}]");
    }

    /// <summary>Prints a start line, sleeps 800 ms, prints a completion line.</summary>
    private static void Method2()
    {
        var threadId = Thread.CurrentThread.ManagedThreadId;
        Console.WriteLine($"方法2开始 [线程{threadId}]");
        Thread.Sleep(800);
        Console.WriteLine($"方法2完成 [线程{threadId}]");
    }

    /// <summary>Prints a start line, sleeps 600 ms, prints a completion line.</summary>
    private static void Method3()
    {
        var threadId = Thread.CurrentThread.ManagedThreadId;
        Console.WriteLine($"方法3开始 [线程{threadId}]");
        Thread.Sleep(600);
        Console.WriteLine($"方法3完成 [线程{threadId}]");
    }
}

4.2 PLINQ (Parallel LINQ)

/// <summary>
/// Console walkthrough of PLINQ: a basic query, a timing comparison, the
/// degree-of-parallelism / execution-mode / ordering options, and exception handling.
/// </summary>
public class PLINQDemo
{
    /// <summary>
    /// Runs the four demonstrations in order, printing a banner before each.
    /// </summary>
    public static void DemonstratePLINQ()
    {
        Console.WriteLine("\n=== PLINQ演示 ===");

        var sections = new (string Banner, Action Run)[]
        {
            ("\n--- 基础PLINQ操作 ---", DemonstrateBasicPLINQ),
            ("\n--- PLINQ性能对比 ---", ComparePLINQPerformance),
            ("\n--- PLINQ配置选项 ---", DemonstratePLINQOptions),
            ("\n--- PLINQ异常处理 ---", DemonstratePLINQExceptionHandling),
        };

        foreach (var (banner, run) in sections)
        {
            Console.WriteLine(banner);
            run();
        }
    }

    /// <summary>
    /// Runs <paramref name="query"/> under a stopwatch.
    /// </summary>
    /// <returns>The query result together with the elapsed milliseconds.</returns>
    private static (T Result, long ElapsedMs) Measure<T>(Func<T> query)
    {
        var timer = Stopwatch.StartNew();
        var result = query();
        timer.Stop();
        return (result, timer.ElapsedMilliseconds);
    }

    /// <summary>
    /// Selects the ten largest primes in 1..1000 (with their squares) using LINQ and then
    /// PLINQ, prints both timings and the first five entries of the PLINQ result.
    /// </summary>
    private static void DemonstrateBasicPLINQ()
    {
        var numbers = Enumerable.Range(1, 1000).ToList();

        Console.WriteLine("串行LINQ查询:");
        var (serialResults, serialMs) = Measure(() => numbers
            .Where(IsPrime)
            .Select(n => new { Number = n, Square = n * n })
            .OrderByDescending(entry => entry.Number)
            .Take(10)
            .ToList());

        Console.WriteLine("并行PLINQ查询:");
        var (parallelResults, parallelMs) = Measure(() => numbers
            .AsParallel()
            .Where(IsPrime)
            .Select(n => new { Number = n, Square = n * n })
            .OrderByDescending(entry => entry.Number)
            .Take(10)
            .ToList());

        Console.WriteLine($"串行结果数量: {serialResults.Count}, 耗时: {serialMs}ms");
        Console.WriteLine($"并行结果数量: {parallelResults.Count}, 耗时: {parallelMs}ms");

        Console.WriteLine("\n前5个质数及其平方:");
        foreach (var entry in parallelResults.Take(5))
        {
            Console.WriteLine($"质数: {entry.Number}, 平方: {entry.Square}");
        }
    }

    /// <summary>
    /// Sums the square roots of the even numbers in 1..10,000,000 with LINQ and with
    /// PLINQ and prints the results, timings and their ratio.
    /// </summary>
    private static void ComparePLINQPerformance()
    {
        var largeDataSet = Enumerable.Range(1, 10000000).ToArray();

        // serial aggregate
        var (serialSum, serialMs) = Measure(() => largeDataSet
            .Where(x => x % 2 == 0)
            .Select(x => Math.Sqrt(x))
            .Sum());

        // parallel aggregate
        var (parallelSum, parallelMs) = Measure(() => largeDataSet
            .AsParallel()
            .Where(x => x % 2 == 0)
            .Select(x => Math.Sqrt(x))
            .Sum());

        Console.WriteLine($"数据集大小: {largeDataSet.Length:N0}");
        Console.WriteLine($"串行处理: 结果={serialSum:F2}, 耗时={serialMs}ms");
        Console.WriteLine($"并行处理: 结果={parallelSum:F2}, 耗时={parallelMs}ms");
        Console.WriteLine($"性能提升: {(double)serialMs / parallelMs:F2}x");
    }

    /// <summary>
    /// Shows <c>WithDegreeOfParallelism</c>, <c>WithExecutionMode</c> and <c>AsOrdered</c>
    /// on queries over 1..1000.
    /// </summary>
    private static void DemonstratePLINQOptions()
    {
        var data = Enumerable.Range(1, 1000).ToArray();

        Console.WriteLine("\n--- 并行度控制 ---");

        // Degree of parallelism capped at 2.
        var (limitedResults, limitedMs) = Measure(() => data
            .AsParallel()
            .WithDegreeOfParallelism(2)
            .Where(IsPrime)
            .ToList());

        // Degree of parallelism left at its default.
        var (defaultResults, defaultMs) = Measure(() => data
            .AsParallel()
            .Where(IsPrime)
            .ToList());

        Console.WriteLine($"限制并行度(2): 结果数={limitedResults.Count}, 耗时={limitedMs}ms");
        Console.WriteLine($"默认并行度: 结果数={defaultResults.Count}, 耗时={defaultMs}ms");

        Console.WriteLine("\n--- 执行模式控制 ---");

        // Explicitly request ForceParallelism for this query.
        var (forcedResults, forcedMs) = Measure(() => data
            .AsParallel()
            .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
            .Take(10)
            .ToList());

        Console.WriteLine($"强制并行: 结果数={forcedResults.Count}, 耗时={forcedMs}ms");

        Console.WriteLine("\n--- 结果排序控制 ---");

        static bool IsMultipleOfHundred(int n) => n % 100 == 0;

        // With AsOrdered.
        var orderedResults = data
            .AsParallel()
            .AsOrdered()
            .Where(IsMultipleOfHundred)
            .ToList();

        // Without AsOrdered.
        var unorderedResults = data
            .AsParallel()
            .Where(IsMultipleOfHundred)
            .ToList();

        Console.WriteLine($"保持顺序: [{string.Join(", ", orderedResults)}]");
        Console.WriteLine($"不保持顺序: [{string.Join(", ", unorderedResults)}]");
    }

    /// <summary>
    /// Runs a PLINQ projection that throws for the value 50 and prints the inner
    /// exceptions of the resulting <see cref="AggregateException"/>, then prints the
    /// values 1..10 from a <c>ForAll</c> action.
    /// </summary>
    private static void DemonstratePLINQExceptionHandling()
    {
        var data = Enumerable.Range(1, 100).ToArray();

        try
        {
            // The projection throws for exactly one input value.
            var doubled = data
                .AsParallel()
                .Select(x => x == 50
                    ? throw new InvalidOperationException($"处理值 {x} 时发生错误")
                    : x * 2)
                .ToList();

            Console.WriteLine($"处理完成,结果数量: {doubled.Count}");
        }
        catch (AggregateException ex)
        {
            Console.WriteLine($"PLINQ捕获到聚合异常,包含 {ex.InnerExceptions.Count} 个内部异常:");

            foreach (var inner in ex.InnerExceptions)
            {
                Console.WriteLine($"  - {inner.GetType().Name}: {inner.Message}");
            }
        }

        // ForAll invokes the action for each element of the parallel query.
        Console.WriteLine("\n使用ForAll处理:");
        data
            .AsParallel()
            .Where(x => x <= 10)
            .ForAll(x => Console.WriteLine($"处理值 {x} [线程{Thread.CurrentThread.ManagedThreadId}]"));
    }

    /// <summary>Trial-division primality test over odd divisors up to the square root.</summary>
    /// <param name="number">Candidate value.</param>
    /// <returns><c>true</c> when <paramref name="number"/> is prime.</returns>
    private static bool IsPrime(int number)
    {
        if (number < 2)
        {
            return false;
        }

        // 2 is the only even prime.
        if (number % 2 == 0)
        {
            return number == 2;
        }

        int limit = (int)Math.Sqrt(number);
        for (int divisor = 3; divisor <= limit; divisor += 2)
        {
            if (number % divisor == 0)
            {
                return false;
            }
        }

        return true;
    }
}

4.3 并行集合

/// <summary>
/// Console walkthrough of the collections in <c>System.Collections.Concurrent</c>:
/// <see cref="ConcurrentBag{T}"/>, <see cref="ConcurrentQueue{T}"/>,
/// <see cref="ConcurrentStack{T}"/>, <see cref="ConcurrentDictionary{TKey, TValue}"/>
/// and <see cref="BlockingCollection{T}"/>.
/// </summary>
public class ConcurrentCollectionsDemo
{
    /// <summary>
    /// Runs the five demonstrations in order, printing a banner before each.
    /// </summary>
    /// <returns>A task that completes when the last demonstration has finished.</returns>
    public static async Task DemonstrateConcurrentCollections()
    {
        Console.WriteLine("\n=== 并发集合演示 ===");
        
        Console.WriteLine("\n--- ConcurrentBag演示 ---");
        await DemonstrateConcurrentBag();
        
        Console.WriteLine("\n--- ConcurrentQueue演示 ---");
        await DemonstrateConcurrentQueue();
        
        Console.WriteLine("\n--- ConcurrentStack演示 ---");
        await DemonstrateConcurrentStack();
        
        Console.WriteLine("\n--- ConcurrentDictionary演示 ---");
        await DemonstrateConcurrentDictionary();
        
        Console.WriteLine("\n--- BlockingCollection演示 ---");
        await DemonstrateBlockingCollection();
    }
    
    /// <summary>
    /// Five tasks each add three items to a shared bag; the bag is then printed and drained.
    /// </summary>
    private static async Task DemonstrateConcurrentBag()
    {
        var concurrentBag = new ConcurrentBag<string>();
        var tasks = new List<Task>();
        
        // Several tasks add items concurrently.
        for (int i = 0; i < 5; i++)
        {
            int taskId = i; // per-iteration copy captured by the lambda
            tasks.Add(Task.Run(() =>
            {
                for (int j = 0; j < 3; j++)
                {
                    var item = $"任务{taskId}_项目{j}";
                    concurrentBag.Add(item);
                    Console.WriteLine($"添加: {item} [线程{Thread.CurrentThread.ManagedThreadId}]");
                    Thread.Sleep(100);
                }
            }));
        }
        
        await Task.WhenAll(tasks);
        
        Console.WriteLine($"ConcurrentBag总数: {concurrentBag.Count}");
        Console.WriteLine($"所有项目: [{string.Join(", ", concurrentBag)}]");
        
        // Drain the bag.
        while (concurrentBag.TryTake(out string item))
        {
            Console.WriteLine($"取出: {item}");
        }
    }
    
    /// <summary>
    /// One producer enqueues ten integers while three consumers poll the queue until the
    /// producer has finished and the queue is empty.
    /// </summary>
    private static async Task DemonstrateConcurrentQueue()
    {
        var concurrentQueue = new ConcurrentQueue<int>();
        
        // Producer
        var producerTask = Task.Run(async () =>
        {
            for (int i = 0; i < 10; i++)
            {
                concurrentQueue.Enqueue(i);
                Console.WriteLine($"生产者入队: {i} [线程{Thread.CurrentThread.ManagedThreadId}]");
                await Task.Delay(200);
            }
        });
        
        // Consumers. ToArray materializes the query so the tasks are started here, once;
        // a bare Select would start them lazily and again on every enumeration.
        var consumerTasks = Enumerable.Range(0, 3).Select(consumerId => Task.Run(async () =>
        {
            while (!producerTask.IsCompleted || !concurrentQueue.IsEmpty)
            {
                if (concurrentQueue.TryDequeue(out int item))
                {
                    Console.WriteLine($"消费者{consumerId}出队: {item} [线程{Thread.CurrentThread.ManagedThreadId}]");
                }
                await Task.Delay(300);
            }
        })).ToArray();
        
        await Task.WhenAll(new[] { producerTask }.Concat(consumerTasks));
        Console.WriteLine($"队列剩余项目数: {concurrentQueue.Count}");
    }
    
    /// <summary>
    /// Three tasks each push four items onto a shared stack; the stack is then popped empty.
    /// </summary>
    private static async Task DemonstrateConcurrentStack()
    {
        var concurrentStack = new ConcurrentStack<string>();
        var tasks = new List<Task>();
        
        // Push phase
        for (int i = 0; i < 3; i++)
        {
            int taskId = i; // per-iteration copy captured by the lambda
            tasks.Add(Task.Run(() =>
            {
                for (int j = 0; j < 4; j++)
                {
                    var item = $"栈项目{taskId}_{j}";
                    concurrentStack.Push(item);
                    Console.WriteLine($"推入: {item} [线程{Thread.CurrentThread.ManagedThreadId}]");
                    Thread.Sleep(150);
                }
            }));
        }
        
        await Task.WhenAll(tasks);
        
        Console.WriteLine($"\n栈中项目数: {concurrentStack.Count}");
        
        // Pop phase
        while (concurrentStack.TryPop(out string item))
        {
            Console.WriteLine($"弹出: {item}");
        }
    }
    
    /// <summary>
    /// Five tasks each call <c>AddOrUpdate</c> and <c>TryGetValue</c> on their own key; the
    /// final contents are printed, followed by a <c>GetOrAdd</c> with a value factory.
    /// </summary>
    private static async Task DemonstrateConcurrentDictionary()
    {
        var concurrentDict = new ConcurrentDictionary<string, int>();
        var tasks = new List<Task>();
        
        // Several tasks operate on the dictionary concurrently.
        for (int i = 0; i < 5; i++)
        {
            int taskId = i; // per-iteration copy captured by the lambda
            tasks.Add(Task.Run(() =>
            {
                // Insert 1 for a new key, otherwise increment the existing value.
                var key = $"键{taskId}";
                var value = concurrentDict.AddOrUpdate(key, 1, (k, v) => v + 1);
                Console.WriteLine($"任务{taskId} 添加/更新: {key} = {value} [线程{Thread.CurrentThread.ManagedThreadId}]");
                
                // Read the value back.
                if (concurrentDict.TryGetValue(key, out int currentValue))
                {
                    Console.WriteLine($"任务{taskId} 获取: {key} = {currentValue}");
                }
                
                Thread.Sleep(100);
            }));
        }
        
        await Task.WhenAll(tasks);
        
        Console.WriteLine($"\n字典最终状态:");
        foreach (var kvp in concurrentDict)
        {
            Console.WriteLine($"  {kvp.Key}: {kvp.Value}");
        }
        
        // GetOrAdd with a value factory.
        var computedValue = concurrentDict.GetOrAdd("计算键", key =>
        {
            Console.WriteLine($"为键 '{key}' 计算值");
            return key.Length * 10;
        });
        
        Console.WriteLine($"GetOrAdd结果: 计算键 = {computedValue}");
    }
    
    /// <summary>
    /// One producer adds ten items to a collection bounded at five and then marks it
    /// complete; two consumers drain it through <c>GetConsumingEnumerable</c>.
    /// </summary>
    private static async Task DemonstrateBlockingCollection()
    {
        using var blockingCollection = new BlockingCollection<string>(5); // bounded capacity of 5
        
        // Producer
        var producerTask = Task.Run(async () =>
        {
            try
            {
                for (int i = 0; i < 10; i++)
                {
                    var item = $"项目{i}";
                    blockingCollection.Add(item);
                    Console.WriteLine($"生产者添加: {item} [线程{Thread.CurrentThread.ManagedThreadId}]");
                    await Task.Delay(200);
                }
            }
            finally
            {
                // Always mark the collection complete so the consumers' enumeration ends.
                blockingCollection.CompleteAdding();
                Console.WriteLine("生产者完成添加");
            }
        });
        
        // Consumers. ToArray materializes the query so the tasks are started here, once;
        // a bare Select would start them lazily and again on every enumeration.
        var consumerTasks = Enumerable.Range(0, 2).Select(consumerId => Task.Run(() =>
        {
            try
            {
                foreach (var item in blockingCollection.GetConsumingEnumerable())
                {
                    Console.WriteLine($"消费者{consumerId}处理: {item} [线程{Thread.CurrentThread.ManagedThreadId}]");
                    Thread.Sleep(400); // simulated processing time
                }
            }
            catch (InvalidOperationException)
            {
                Console.WriteLine($"消费者{consumerId}检测到集合已完成");
            }
        })).ToArray();
        
        await Task.WhenAll(new[] { producerTask }.Concat(consumerTasks));
        Console.WriteLine("BlockingCollection演示完成");
    }
}

5. 并发设计模式

5.1 生产者-消费者模式

/// <summary>
/// Console walkthrough of the producer/consumer pattern using
/// <c>System.Threading.Channels</c> and a lock-protected <see cref="PriorityQueue{TElement, TPriority}"/>.
/// </summary>
public class ProducerConsumerPattern
{
    /// <summary>
    /// Runs the three demonstrations in order, printing a banner before each.
    /// </summary>
    /// <returns>A task that completes when the last demonstration has finished.</returns>
    public static async Task DemonstrateProducerConsumer()
    {
        Console.WriteLine("\n=== 生产者-消费者模式演示 ===");
        
        Console.WriteLine("\n--- 基础生产者-消费者 ---");
        await DemonstrateBasicProducerConsumer();
        
        Console.WriteLine("\n--- 多生产者-多消费者 ---");
        await DemonstrateMultipleProducersConsumers();
        
        Console.WriteLine("\n--- 优先级队列 ---");
        await DemonstratePriorityQueue();
    }
    
    /// <summary>
    /// One producer writes ten work items to an unbounded channel; one consumer reads
    /// them until the channel is completed.
    /// </summary>
    private static async Task DemonstrateBasicProducerConsumer()
    {
        var channel = Channel.CreateUnbounded<WorkItem>();
        var writer = channel.Writer;
        var reader = channel.Reader;
        
        // Producer
        var producer = Task.Run(async () =>
        {
            try
            {
                for (int i = 1; i <= 10; i++)
                {
                    var workItem = new WorkItem { Id = i, Data = $"数据{i}" };
                    await writer.WriteAsync(workItem);
                    Console.WriteLine($"生产者创建: {workItem}");
                    await Task.Delay(500);
                }
            }
            finally
            {
                // Complete the channel even if the loop throws; otherwise the consumer's
                // ReadAllAsync would never end.
                writer.Complete();
            }
            Console.WriteLine("生产者完成");
        });
        
        // Consumer
        var consumer = Task.Run(async () =>
        {
            await foreach (var workItem in reader.ReadAllAsync())
            {
                Console.WriteLine($"消费者处理: {workItem}");
                await Task.Delay(800); // simulated processing time
            }
            Console.WriteLine("消费者完成");
        });
        
        await Task.WhenAll(producer, consumer);
    }
    
    /// <summary>
    /// Three producers write five work items each to a channel bounded at five; two
    /// consumers read until the channel is completed after all producers have finished.
    /// </summary>
    private static async Task DemonstrateMultipleProducersConsumers()
    {
        var channel = Channel.CreateBounded<WorkItem>(5);
        var writer = channel.Writer;
        var reader = channel.Reader;
        
        // Producers. ToArray materializes the query so each task is started exactly once,
        // here, instead of lazily whenever the sequence happens to be enumerated.
        var producers = Enumerable.Range(1, 3).Select(producerId => Task.Run(async () =>
        {
            for (int i = 1; i <= 5; i++)
            {
                var workItem = new WorkItem 
                { 
                    Id = producerId * 100 + i, 
                    Data = $"生产者{producerId}_数据{i}" 
                };
                
                await writer.WriteAsync(workItem);
                Console.WriteLine($"生产者{producerId}创建: {workItem}");
                await Task.Delay(Random.Shared.Next(200, 600));
            }
            Console.WriteLine($"生产者{producerId}完成");
        })).ToArray();
        
        // Consumers (materialized for the same reason as the producers).
        var consumers = Enumerable.Range(1, 2).Select(consumerId => Task.Run(async () =>
        {
            await foreach (var workItem in reader.ReadAllAsync())
            {
                Console.WriteLine($"消费者{consumerId}处理: {workItem}");
                await Task.Delay(Random.Shared.Next(400, 1000));
            }
            Console.WriteLine($"消费者{consumerId}完成");
        })).ToArray();
        
        // Close the channel once every producer has finished. The finally guarantees the
        // consumers are released even when a producer faults.
        var closeChannel = Task.Run(async () =>
        {
            try
            {
                await Task.WhenAll(producers);
            }
            finally
            {
                writer.Complete();
            }
            Console.WriteLine("所有生产者完成,关闭通道");
        });
        
        // Await the closing task as well so a producer failure is observed by the caller
        // instead of being lost in a fire-and-forget task.
        await Task.WhenAll(consumers.Append(closeChannel));
    }
    
    /// <summary>
    /// One producer enqueues seven work items with priorities 1..3 into a
    /// <see cref="PriorityQueue{TElement, TPriority}"/>; one consumer dequeues them.
    /// A semaphore counts the available wake-ups for the consumer.
    /// </summary>
    private static async Task DemonstratePriorityQueue()
    {
        var priorityQueue = new PriorityQueue<WorkItem, int>();
        using var semaphore = new SemaphoreSlim(0);
        
        // Guarded by lock (priorityQueue), like the queue itself.
        var isCompleted = false;
        
        // Producer: creates work items with differing priorities.
        var producer = Task.Run(async () =>
        {
            var priorities = new[] { 1, 3, 2, 1, 3, 2, 1 };
            
            for (int i = 0; i < priorities.Length; i++)
            {
                var workItem = new WorkItem 
                { 
                    Id = i + 1, 
                    Data = $"优先级{priorities[i]}_任务{i + 1}" 
                };
                
                lock (priorityQueue)
                {
                    priorityQueue.Enqueue(workItem, priorities[i]);
                }
                
                // One release per enqueued item.
                semaphore.Release();
                Console.WriteLine($"添加任务: {workItem} (优先级: {priorities[i]})");
                await Task.Delay(300);
            }
            
            lock (priorityQueue)
            {
                isCompleted = true;
            }
            
            // One extra release so the consumer wakes up and sees the completion flag.
            semaphore.Release();
            Console.WriteLine("生产者完成");
        });
        
        // Consumer: each wake-up either dequeues one item or observes completion.
        var consumer = Task.Run(async () =>
        {
            while (true)
            {
                await semaphore.WaitAsync();
                
                WorkItem workItem;
                lock (priorityQueue)
                {
                    // Count and the flag are only read under the lock the producer
                    // holds while mutating them.
                    if (priorityQueue.Count == 0)
                    {
                        if (isCompleted) break;
                        continue;
                    }
                    workItem = priorityQueue.Dequeue();
                }
                
                Console.WriteLine($"处理高优先级任务: {workItem}");
                await Task.Delay(500);
            }
            Console.WriteLine("消费者完成");
        });
        
        await Task.WhenAll(producer, consumer);
    }
}

/// <summary>
/// Simple work item carrying a numeric identifier and a text payload.
/// </summary>
public class WorkItem
{
    /// <summary>Gets or sets the identifier of the item.</summary>
    public int Id { get; set; }

    /// <summary>Gets or sets the text payload of the item.</summary>
    public string Data { get; set; }

    /// <summary>Formats the item as <c>WorkItem(Id=..., Data=...)</c>.</summary>
    /// <returns>The formatted text.</returns>
    public override string ToString()
    {
        return $"WorkItem(Id={Id}, Data={Data})";
    }
}

5.2 读写锁模式

/// <summary>
/// Demonstrates protecting a shared dictionary with <see cref="ReaderWriterLockSlim"/>:
/// reads take the read lock, writes take the write lock, and
/// <see cref="TryUpgradeRead"/> shows the upgradeable-read-to-write sequence.
/// </summary>
public class ReaderWriterPattern
{
    private readonly ReaderWriterLockSlim _lock = new();
    private readonly Dictionary<string, string> _cache = new();

    /// <summary>
    /// Runs five reader tasks and two writer tasks against one shared instance and
    /// prints the final cache contents.
    /// </summary>
    /// <returns>A task that completes when all readers and writers have finished.</returns>
    public static async Task DemonstrateReaderWriterPattern()
    {
        Console.WriteLine("\n=== 读写锁模式演示 ===");

        var pattern = new ReaderWriterPattern();

        // Seed some initial data.
        pattern.Write("key1", "value1");
        pattern.Write("key2", "value2");
        pattern.Write("key3", "value3");

        var tasks = new List<Task>();

        // Several reader tasks.
        for (int i = 0; i < 5; i++)
        {
            int readerId = i + 1;
            tasks.Add(Task.Run(async () =>
            {
                for (int j = 0; j < 10; j++)
                {
                    // Random.Shared is used because a single Random instance must not be
                    // called from several tasks at once.
                    var key = $"key{Random.Shared.Next(1, 4)}";
                    var value = pattern.Read(key);
                    Console.WriteLine($"读取者{readerId}: {key} = {value} [线程{Thread.CurrentThread.ManagedThreadId}]");
                    await Task.Delay(Random.Shared.Next(100, 300));
                }
            }));
        }

        // A small number of writer tasks.
        for (int i = 0; i < 2; i++)
        {
            int writerId = i + 1;
            tasks.Add(Task.Run(async () =>
            {
                for (int j = 0; j < 5; j++)
                {
                    var key = $"key{Random.Shared.Next(1, 6)}";
                    var value = $"writer{writerId}_value{j}";
                    pattern.Write(key, value);
                    Console.WriteLine($"写入者{writerId}: {key} = {value} [线程{Thread.CurrentThread.ManagedThreadId}]");
                    await Task.Delay(Random.Shared.Next(500, 1000));
                }
            }));
        }

        await Task.WhenAll(tasks);

        // All readers and writers have completed, so the cache is enumerated without the lock.
        Console.WriteLine("\n最终缓存状态:");
        foreach (var kvp in pattern._cache)
        {
            Console.WriteLine($"  {kvp.Key}: {kvp.Value}");
        }
    }

    /// <summary>Reads a value while holding the read lock.</summary>
    /// <param name="key">The key to look up.</param>
    /// <returns>The stored value, or <c>null</c> when the key is not present.</returns>
    public string Read(string key)
    {
        _lock.EnterReadLock();
        try
        {
            Thread.Sleep(50); // Simulated read latency.
            return _cache.TryGetValue(key, out var value) ? value : null;
        }
        finally
        {
            _lock.ExitReadLock();
        }
    }

    /// <summary>Adds or overwrites a value while holding the write lock.</summary>
    /// <param name="key">The key to store.</param>
    /// <param name="value">The value to associate with <paramref name="key"/>.</param>
    public void Write(string key, string value)
    {
        _lock.EnterWriteLock();
        try
        {
            Thread.Sleep(100); // Simulated write latency.
            _cache[key] = value;
        }
        finally
        {
            _lock.ExitWriteLock();
        }
    }

    /// <summary>
    /// Updates an existing entry: the key is looked up under an upgradeable read lock and the
    /// write lock is only taken when the key exists.
    /// </summary>
    /// <param name="key">The key whose value should be replaced.</param>
    /// <param name="updateFunc">
    /// Computes the new value from the current one. It is invoked while the upgradeable read
    /// lock is held, before the write lock is entered.
    /// </param>
    /// <returns><c>true</c> when the key existed and was updated; otherwise <c>false</c>.</returns>
    public bool TryUpgradeRead(string key, Func<string, string> updateFunc)
    {
        _lock.EnterUpgradeableReadLock();
        try
        {
            if (_cache.TryGetValue(key, out var currentValue))
            {
                var newValue = updateFunc(currentValue);

                _lock.EnterWriteLock();
                try
                {
                    _cache[key] = newValue;
                    Console.WriteLine($"升级锁更新: {key} = {newValue}");
                    return true;
                }
                finally
                {
                    _lock.ExitWriteLock();
                }
            }
            return false;
        }
        finally
        {
            _lock.ExitUpgradeableReadLock();
        }
    }
}

5.3 Actor模式

/// <summary>
/// Base class for a simple actor: messages are written to an unbounded channel (the mailbox)
/// and a single background loop passes them to <see cref="HandleMessage"/> one at a time.
/// </summary>
public abstract class Actor
{
    private readonly Channel<object> _mailbox;
    private readonly ChannelWriter<object> _writer;
    private readonly ChannelReader<object> _reader;
    private readonly CancellationTokenSource _cancellationTokenSource;
    private readonly Task _processingTask;

    // 0 while running, 1 once StopAsync has been called; keeps the shutdown sequence single-shot.
    private int _stopRequested;

    /// <summary>Creates the mailbox and starts the background processing loop.</summary>
    protected Actor()
    {
        _mailbox = Channel.CreateUnbounded<object>();
        _writer = _mailbox.Writer;
        _reader = _mailbox.Reader;
        _cancellationTokenSource = new CancellationTokenSource();

        // The loop starts before derived constructors run; it only calls HandleMessage
        // once a message has actually been written to the mailbox.
        _processingTask = Task.Run(ProcessMessages);
    }

    /// <summary>Asynchronously enqueues a message for this actor.</summary>
    /// <param name="message">The message to deliver.</param>
    /// <returns>A task that completes when the message has been written to the mailbox.</returns>
    public async Task SendAsync(object message)
    {
        await _writer.WriteAsync(message);
    }

    /// <summary>Synchronously enqueues a message for this actor.</summary>
    /// <param name="message">The message to deliver.</param>
    /// <exception cref="InvalidOperationException">The mailbox did not accept the message.</exception>
    public void Send(object message)
    {
        if (!_writer.TryWrite(message))
        {
            throw new InvalidOperationException("无法发送消息到Actor");
        }
    }

    /// <summary>Handles a single message; invoked sequentially by the processing loop.</summary>
    /// <param name="message">The message taken from the mailbox.</param>
    /// <returns>A task that completes when the message has been handled.</returns>
    protected abstract Task HandleMessage(object message);

    /// <summary>
    /// Reads messages until the mailbox completes or the token is cancelled. An exception from a
    /// handler is logged and does not stop the loop.
    /// </summary>
    private async Task ProcessMessages()
    {
        try
        {
            await foreach (var message in _reader.ReadAllAsync(_cancellationTokenSource.Token))
            {
                try
                {
                    await HandleMessage(message);
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"Actor处理消息时发生错误: {ex.Message}");
                }
            }
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Actor停止处理消息");
        }
    }

    /// <summary>
    /// Completes the mailbox, cancels the processing loop and waits for it to finish.
    /// Safe to call more than once: only the first call performs the shutdown, later calls
    /// just wait for the loop to end.
    /// </summary>
    /// <returns>A task that completes when the processing loop has ended.</returns>
    public async Task StopAsync()
    {
        if (Interlocked.Exchange(ref _stopRequested, 1) != 0)
        {
            await _processingTask;
            return;
        }

        // TryComplete does not throw if the writer has already been completed.
        _writer.TryComplete();
        _cancellationTokenSource.Cancel();
        try
        {
            await _processingTask;
        }
        finally
        {
            _cancellationTokenSource.Dispose();
        }
    }
}

/// <summary>
/// Actor that keeps an integer counter and reacts to the string commands
/// "increment", "decrement", "get" and "reset".
/// </summary>
public class CounterActor : Actor
{
    private int _count = 0;
    private readonly string _name;

    /// <summary>Creates a counter actor.</summary>
    /// <param name="name">Label prefixed to every line this actor prints.</param>
    public CounterActor(string name)
    {
        _name = name;
    }

    /// <summary>
    /// Applies one command to the counter, prints the outcome, then waits 100 ms.
    /// Any other message is reported as unknown.
    /// </summary>
    /// <param name="message">The command taken from the mailbox.</param>
    /// <returns>A task that completes after the simulated processing delay.</returns>
    protected override async Task HandleMessage(object message)
    {
        if (message is "increment")
        {
            _count++;
            Console.WriteLine($"{_name}: 计数器递增到 {_count} [线程{Thread.CurrentThread.ManagedThreadId}]");
        }
        else if (message is "decrement")
        {
            _count--;
            Console.WriteLine($"{_name}: 计数器递减到 {_count} [线程{Thread.CurrentThread.ManagedThreadId}]");
        }
        else if (message is "get")
        {
            Console.WriteLine($"{_name}: 当前计数 {_count} [线程{Thread.CurrentThread.ManagedThreadId}]");
        }
        else if (message is "reset")
        {
            _count = 0;
            Console.WriteLine($"{_name}: 计数器重置 [线程{Thread.CurrentThread.ManagedThreadId}]");
        }
        else
        {
            Console.WriteLine($"{_name}: 未知消息类型: {message}");
        }

        // Simulated processing time.
        await Task.Delay(100);
    }
}

/// <summary>
/// Drives two <see cref="CounterActor"/> instances from three concurrent sender tasks.
/// </summary>
public class ActorPatternDemo
{
    /// <summary>
    /// Starts the senders, waits for them, allows one more second for queued messages,
    /// then stops both actors.
    /// </summary>
    /// <returns>A task that completes when both actors have been stopped.</returns>
    public static async Task DemonstrateActorPattern()
    {
        Console.WriteLine("\n=== Actor模式演示 ===");

        var actor1 = new CounterActor("Actor1");
        var actor2 = new CounterActor("Actor2");

        // Sender for Actor1: five spaced increments, then get / reset / get.
        async Task DriveFirstActorAsync()
        {
            for (int round = 0; round < 5; round++)
            {
                await actor1.SendAsync("increment");
                await Task.Delay(200);
            }

            foreach (var command in new[] { "get", "reset", "get" })
            {
                await actor1.SendAsync(command);
            }
        }

        // Sender for Actor2: three rounds of +1, +1, -1, then get.
        async Task DriveSecondActorAsync()
        {
            for (int round = 0; round < 3; round++)
            {
                await actor2.SendAsync("increment");
                await actor2.SendAsync("increment");
                await actor2.SendAsync("decrement");
                await Task.Delay(300);
            }

            await actor2.SendAsync("get");
        }

        // Sender that alternates between both actors using the synchronous Send.
        async Task AlternateBetweenActorsAsync()
        {
            for (int step = 0; step < 10; step++)
            {
                var target = step % 2 == 0 ? actor1 : actor2;
                target.Send("increment");
                await Task.Delay(150);
            }
        }

        Task[] senders =
        {
            Task.Run(() => DriveFirstActorAsync()),
            Task.Run(() => DriveSecondActorAsync()),
            Task.Run(() => AlternateBetweenActorsAsync()),
        };

        await Task.WhenAll(senders);

        // Give the actors some time to work through messages that are still queued.
        await Task.Delay(1000);

        // Stop the actors.
        await actor1.StopAsync();
        await actor2.StopAsync();

        Console.WriteLine("Actor模式演示完成");
    }
}

6. 最佳实践和性能优化

6.1 线程安全最佳实践

/// <summary>
/// Console demos for three thread-safety techniques: immutable objects,
/// thread-local storage and short critical sections.
/// </summary>
public class ThreadSafetyBestPractices
{
    /// <summary>Runs the three demos in order, printing a header before each one.</summary>
    public static void DemonstrateBestPractices()
    {
        static void RunSection(string header, Action demo)
        {
            Console.WriteLine(header);
            demo();
        }

        Console.WriteLine("\n=== 线程安全最佳实践 ===");

        RunSection("\n--- 不可变对象 ---", DemonstrateImmutableObjects);
        RunSection("\n--- 线程本地存储 ---", DemonstrateThreadLocalStorage);
        RunSection("\n--- 锁的最佳实践 ---", DemonstrateLockingBestPractices);
    }

    /// <summary>
    /// Starts <paramref name="taskCount"/> tasks via <see cref="Task.Run(Action)"/>, passing each
    /// its zero-based index, and blocks until all of them have finished.
    /// </summary>
    private static void RunConcurrently(int taskCount, Action<int> body)
    {
        var running = new Task[taskCount];
        for (int index = 0; index < taskCount; index++)
        {
            int taskNumber = index; // Per-iteration copy so every closure sees its own value.
            running[index] = Task.Run(() => body(taskNumber));
        }

        Task.WaitAll(running);
    }

    /// <summary>Five tasks read one shared record and derive new instances from it.</summary>
    private static void DemonstrateImmutableObjects()
    {
        // The shared instance is only read; every "change" produces a new instance.
        var shared = new ImmutableData("初始值", 42);

        RunConcurrently(5, taskNumber =>
        {
            Console.WriteLine($"任务{taskNumber}: {shared.Name} = {shared.Value} [线程{Thread.CurrentThread.ManagedThreadId}]");

            var derived = shared.WithValue(shared.Value + taskNumber);
            Console.WriteLine($"任务{taskNumber}: 新实例 {derived.Name} = {derived.Value}");
        });
    }

    /// <summary>Five tasks each increment a <see cref="ThreadLocal{T}"/> counter three times.</summary>
    private static void DemonstrateThreadLocalStorage()
    {
        var perThreadCounter = new ThreadLocal<int>(() => 0);

        RunConcurrently(5, _ =>
        {
            // The value read and written here belongs to the executing thread.
            for (int pass = 0; pass < 3; pass++)
            {
                perThreadCounter.Value = perThreadCounter.Value + 1;
                Console.WriteLine($"线程{Thread.CurrentThread.ManagedThreadId}: 计数器 = {perThreadCounter.Value}");
                Thread.Sleep(100);
            }
        });

        perThreadCounter.Dispose();
    }

    /// <summary>Ten tasks call <c>SafeOperation</c> on one shared instance; prints the final count.</summary>
    private static void DemonstrateLockingBestPractices()
    {
        var lockingExample = new GoodLockingExample();

        RunConcurrently(10, operationId => lockingExample.SafeOperation(operationId));

        Console.WriteLine($"最终计数: {lockingExample.GetCount()}");
    }
}

/// <summary>
/// Immutable name/value record; the <c>With*</c> members return a modified copy
/// and leave the original instance untouched.
/// </summary>
public record ImmutableData(string Name, int Value)
{
    /// <summary>Returns a copy of this record with <see cref="Value"/> replaced.</summary>
    public ImmutableData WithValue(int newValue)
    {
        return this with { Value = newValue };
    }

    /// <summary>Returns a copy of this record with <see cref="Name"/> replaced.</summary>
    public ImmutableData WithName(string newName)
    {
        return this with { Name = newName };
    }
}

/// <summary>
/// Example of good lock usage: shared state is touched only inside a short critical section
/// and slow work happens after the lock has been released.
/// </summary>
public class GoodLockingExample
{
    private readonly object _lock = new object();
    private int _count = 0;
    private readonly List<string> _log = new List<string>();

    /// <summary>
    /// Increments the counter and records a log line under the lock, then prints the line and
    /// sleeps 50 ms outside the lock.
    /// </summary>
    /// <param name="operationId">Identifier included in the log line.</param>
    public void SafeOperation(int operationId)
    {
        string recorded = RecordOperation(operationId);

        // Slow work is done without holding the lock.
        Console.WriteLine(recorded);
        Thread.Sleep(50); // Simulated slow work.
    }

    /// <summary>Performs the locked part of <see cref="SafeOperation"/> and returns the log line.</summary>
    private string RecordOperation(int operationId)
    {
        lock (_lock)
        {
            _count++;
            string line = $"操作{operationId}: 计数 = {_count} [线程{Thread.CurrentThread.ManagedThreadId}]";
            _log.Add(line);
            return line;
        }
    }

    /// <summary>Reads the current counter value under the lock.</summary>
    /// <returns>The number of operations recorded so far.</returns>
    public int GetCount()
    {
        int snapshot;
        lock (_lock)
        {
            snapshot = _count;
        }

        return snapshot;
    }

    /// <summary>Returns a copy of the log taken under the lock.</summary>
    /// <returns>A new list; changing it does not affect the internal log.</returns>
    public List<string> GetLog()
    {
        lock (_lock)
        {
            var copy = new List<string>(_log.Count);
            copy.AddRange(_log);
            return copy;
        }
    }
}

6.2 性能监控和调试

/// <summary>
/// Console demos for inspecting the thread pool, timing serial vs. concurrent task execution,
/// and sampling managed memory and GC counters.
/// </summary>
public class PerformanceMonitoring
{
    /// <summary>Runs the thread-pool, task-timing and memory demos in order.</summary>
    /// <returns>A task that completes when all three demos have finished.</returns>
    public static async Task DemonstratePerformanceMonitoring()
    {
        Console.WriteLine("\n=== 性能监控和调试 ===");

        Console.WriteLine("\n--- 线程池监控 ---");
        MonitorThreadPool();

        Console.WriteLine("\n--- 任务性能测量 ---");
        await MeasureTaskPerformance();

        Console.WriteLine("\n--- 内存使用监控 ---");
        await MonitorMemoryUsage();
    }

    /// <summary>Prints the available, maximum and minimum thread-pool thread counts.</summary>
    private static void MonitorThreadPool()
    {
        ThreadPool.GetAvailableThreads(out int workerThreads, out int completionPortThreads);
        ThreadPool.GetMaxThreads(out int maxWorkerThreads, out int maxCompletionPortThreads);
        ThreadPool.GetMinThreads(out int minWorkerThreads, out int minCompletionPortThreads);

        Console.WriteLine($"线程池状态:");
        Console.WriteLine($"  可用工作线程: {workerThreads}/{maxWorkerThreads}");
        Console.WriteLine($"  可用I/O线程: {completionPortThreads}/{maxCompletionPortThreads}");
        Console.WriteLine($"  最小工作线程: {minWorkerThreads}");
        Console.WriteLine($"  最小I/O线程: {minCompletionPortThreads}");
        Console.WriteLine($"  处理器核心数: {Environment.ProcessorCount}");
    }

    /// <summary>
    /// Times 100 simulated 10 ms work items run one after another, all at once, and with
    /// concurrency capped at <see cref="Environment.ProcessorCount"/>, then prints the timings.
    /// </summary>
    private static async Task MeasureTaskPerformance()
    {
        const int taskCount = 100;
        const int workDuration = 10;

        // Serial execution: each item is awaited before the next one starts.
        var serialStopwatch = Stopwatch.StartNew();
        for (int i = 0; i < taskCount; i++)
        {
            await SimulateWork(workDuration);
        }
        serialStopwatch.Stop();

        // Unbounded concurrency: every item is started before any is awaited.
        var parallelStopwatch = Stopwatch.StartNew();
        var parallelTasks = Enumerable.Range(0, taskCount)
            .Select(_ => SimulateWork(workDuration));
        await Task.WhenAll(parallelTasks);
        parallelStopwatch.Stop();

        // Bounded concurrency: the semaphore admits at most ProcessorCount items at a time.
        using var semaphore = new SemaphoreSlim(Environment.ProcessorCount);
        var limitedParallelStopwatch = Stopwatch.StartNew();
        var limitedTasks = Enumerable.Range(0, taskCount)
            .Select(async _ =>
            {
                await semaphore.WaitAsync();
                try
                {
                    await SimulateWork(workDuration);
                }
                finally
                {
                    semaphore.Release();
                }
            });
        await Task.WhenAll(limitedTasks);
        limitedParallelStopwatch.Stop();

        Console.WriteLine($"性能对比 ({taskCount}个任务, 每个{workDuration}ms):");
        Console.WriteLine($"  串行执行: {serialStopwatch.ElapsedMilliseconds}ms");
        Console.WriteLine($"  无限制并行: {parallelStopwatch.ElapsedMilliseconds}ms");
        Console.WriteLine($"  限制并发度: {limitedParallelStopwatch.ElapsedMilliseconds}ms");
        Console.WriteLine($"  并行加速比: {(double)serialStopwatch.ElapsedMilliseconds / parallelStopwatch.ElapsedMilliseconds:F2}x");
    }

    /// <summary>
    /// Samples <see cref="GC.GetTotalMemory"/> before and after queueing 1000 small tasks and
    /// after a forced collection, then prints the per-generation collection counts.
    /// </summary>
    private static async Task MonitorMemoryUsage()
    {
        var initialMemory = GC.GetTotalMemory(false);
        Console.WriteLine($"初始内存使用: {ToMegabytes(initialMemory):F2} MB");

        // Queue many tasks so the effect on managed memory can be observed.
        var tasks = new List<Task>();
        for (int i = 0; i < 1000; i++)
        {
            tasks.Add(Task.Run(async () =>
            {
                var data = new byte[1024]; // 1 KB per task.
                await Task.Delay(100);
                return data.Length;
            }));
        }

        // Sampled right after the tasks are queued: a snapshot, not a measured maximum.
        var peakMemory = GC.GetTotalMemory(false);
        Console.WriteLine($"峰值内存使用: {ToMegabytes(peakMemory):F2} MB");
        Console.WriteLine($"内存增长: {ToMegabytes(peakMemory - initialMemory):F2} MB");

        await Task.WhenAll(tasks);

        // Forced collection, for demonstration only.
        GC.Collect();
        GC.WaitForPendingFinalizers();
        GC.Collect();

        var finalMemory = GC.GetTotalMemory(false);
        Console.WriteLine($"垃圾回收后内存: {ToMegabytes(finalMemory):F2} MB");

        Console.WriteLine($"GC统计:");
        Console.WriteLine($"  Gen 0 回收次数: {GC.CollectionCount(0)}");
        Console.WriteLine($"  Gen 1 回收次数: {GC.CollectionCount(1)}");
        Console.WriteLine($"  Gen 2 回收次数: {GC.CollectionCount(2)}");
    }

    /// <summary>
    /// Converts a byte count to megabytes using floating-point division, so the
    /// <c>F2</c> format shows real fractional digits instead of a truncated integer.
    /// </summary>
    private static double ToMegabytes(long bytes)
    {
        return bytes / (1024.0 * 1024.0);
    }

    /// <summary>Waits for the given number of milliseconds.</summary>
    /// <param name="durationMs">Delay in milliseconds.</param>
    /// <returns>The managed thread id of the thread that resumed after the delay.</returns>
    private static async Task<int> SimulateWork(int durationMs)
    {
        await Task.Delay(durationMs);
        return Thread.CurrentThread.ManagedThreadId;
    }
}

7. 实践练习

7.1 并发Web爬虫

/// <summary>
/// Fetches a list of URLs concurrently with a fixed upper bound on simultaneous requests.
/// URLs already seen by this instance are skipped, and results accumulate across calls.
/// </summary>
public class ConcurrentWebCrawler : IDisposable
{
    private readonly HttpClient _httpClient;
    private readonly SemaphoreSlim _semaphore;
    private readonly int _maxConcurrency;
    private readonly ConcurrentDictionary<string, bool> _visitedUrls;
    private readonly ConcurrentBag<CrawlResult> _results;

    /// <summary>Creates a crawler.</summary>
    /// <param name="maxConcurrency">Maximum number of requests in flight at once; must be positive.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxConcurrency"/> is zero or negative.</exception>
    public ConcurrentWebCrawler(int maxConcurrency = 5)
    {
        // A semaphore created with 0 permits would make every crawl wait forever.
        if (maxConcurrency <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(maxConcurrency), maxConcurrency, "maxConcurrency must be positive.");
        }

        _maxConcurrency = maxConcurrency;
        _httpClient = new HttpClient();
        _semaphore = new SemaphoreSlim(maxConcurrency);
        _visitedUrls = new ConcurrentDictionary<string, bool>();
        _results = new ConcurrentBag<CrawlResult>();
    }

    /// <summary>Crawls every URL in <paramref name="urls"/> and returns all results collected so far.</summary>
    /// <param name="urls">The URLs to fetch.</param>
    /// <param name="cancellationToken">Token passed to the semaphore wait and the HTTP calls.</param>
    /// <returns>A snapshot of the results gathered by this instance.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="urls"/> is <c>null</c>.</exception>
    public async Task<List<CrawlResult>> CrawlAsync(List<string> urls, CancellationToken cancellationToken = default)
    {
        if (urls is null)
        {
            throw new ArgumentNullException(nameof(urls));
        }

        // Report the configured limit; the semaphore's current count varies while crawls are running.
        Console.WriteLine($"开始爬取 {urls.Count} 个URL,最大并发数: {_maxConcurrency}");

        var tasks = urls.Select(url => CrawlUrlAsync(url, cancellationToken));
        await Task.WhenAll(tasks);

        return _results.ToList();
    }

    /// <summary>
    /// Fetches one URL unless it was already visited, and records either a success or an
    /// error result. Exceptions raised by the request itself are recorded, not rethrown.
    /// </summary>
    private async Task CrawlUrlAsync(string url, CancellationToken cancellationToken)
    {
        if (!_visitedUrls.TryAdd(url, true))
        {
            Console.WriteLine($"URL已访问,跳过: {url}");
            return;
        }

        // Outside the try block: if the wait is cancelled, no permit was taken and none is released.
        await _semaphore.WaitAsync(cancellationToken);

        try
        {
            var stopwatch = Stopwatch.StartNew();
            Console.WriteLine($"开始爬取: {url} [线程{Thread.CurrentThread.ManagedThreadId}]");

            // Dispose the response so its content and connection are released promptly.
            using var response = await _httpClient.GetAsync(url, cancellationToken);
            var content = await response.Content.ReadAsStringAsync(cancellationToken);

            stopwatch.Stop();

            var result = new CrawlResult
            {
                Url = url,
                StatusCode = response.StatusCode,
                ContentLength = content.Length,
                Duration = stopwatch.Elapsed,
                ThreadId = Thread.CurrentThread.ManagedThreadId,
                Timestamp = DateTime.Now
            };

            _results.Add(result);

            Console.WriteLine($"完成爬取: {url} - {response.StatusCode} ({content.Length} 字符, {stopwatch.ElapsedMilliseconds}ms)");
        }
        catch (Exception ex)
        {
            // Best effort: one failing URL is recorded and does not abort the others.
            var errorResult = new CrawlResult
            {
                Url = url,
                Error = ex.Message,
                ThreadId = Thread.CurrentThread.ManagedThreadId,
                Timestamp = DateTime.Now
            };

            _results.Add(errorResult);
            Console.WriteLine($"爬取失败: {url} - {ex.Message}");
        }
        finally
        {
            _semaphore.Release();
        }
    }

    /// <summary>Releases the HTTP client and the semaphore.</summary>
    public void Dispose()
    {
        _httpClient.Dispose();
        _semaphore.Dispose();
    }
}

/// <summary>
/// Outcome of fetching a single URL: response details on success or an error message on failure.
/// </summary>
public class CrawlResult
{
    /// <summary>Gets or sets the URL that was requested.</summary>
    public string Url { get; set; }

    /// <summary>Gets or sets the HTTP status code of the response.</summary>
    public HttpStatusCode StatusCode { get; set; }

    /// <summary>Gets or sets the length of the response body in characters.</summary>
    public int ContentLength { get; set; }

    /// <summary>Gets or sets how long the request took.</summary>
    public TimeSpan Duration { get; set; }

    /// <summary>Gets or sets the managed thread id recorded for this result.</summary>
    public int ThreadId { get; set; }

    /// <summary>Gets or sets the time at which the result was created.</summary>
    public DateTime Timestamp { get; set; }

    /// <summary>Gets or sets the error message; null or empty when the request succeeded.</summary>
    public string Error { get; set; }

    /// <summary>Gets a value indicating whether no error message was recorded.</summary>
    public bool IsSuccess
    {
        get { return string.IsNullOrEmpty(Error); }
    }
}

7.2 并发数据处理管道

/// <summary>
/// Demo that pushes the integers -5..5 through a three-stage pipeline
/// (double, validate, format) and prints each outcome.
/// </summary>
public class DataProcessingPipeline
{
    /// <summary>Builds the pipeline, processes the sample inputs and prints the results.</summary>
    /// <returns>A task that completes when all inputs have been processed and printed.</returns>
    public static async Task DemonstrateDataPipeline()
    {
        Console.WriteLine("\n=== 数据处理管道演示 ===");

        var pipeline = new Pipeline<int, string>();

        // Stage 1: data generation. Stages receive their input as object, so it is
        // unboxed to int explicitly before doing arithmetic.
        pipeline.AddStage("数据生成", async data =>
        {
            await Task.Delay(50); // Simulated generation time.
            return (int)data * 2;
        });

        // Stage 2: validation. Negative values are rejected with an exception.
        pipeline.AddStage("数据验证", async data =>
        {
            await Task.Delay(30);
            var value = (int)data;
            if (value < 0)
            {
                throw new InvalidOperationException("负数无效");
            }

            return value;
        });

        // Stage 3: transformation into the final string output.
        pipeline.AddStage("数据转换", async data =>
        {
            await Task.Delay(40);
            return $"处理后的数据: {data}";
        });

        // Process the sample data (-5 through 5).
        var inputData = Enumerable.Range(-5, 11).ToList();
        var results = await pipeline.ProcessAsync(inputData);

        Console.WriteLine("\n处理结果:");
        foreach (var result in results)
        {
            if (result.IsSuccess)
            {
                Console.WriteLine($"成功: {result.Data}");
            }
            else
            {
                Console.WriteLine($"失败: {result.Error}");
            }
        }
    }
}

/// <summary>
/// A sequence of named asynchronous stages. Each input passes through the stages in order;
/// different inputs are processed concurrently.
/// </summary>
/// <typeparam name="TInput">Type of the items fed into the first stage.</typeparam>
/// <typeparam name="TOutput">Type the value produced by the last stage is cast to.</typeparam>
public class Pipeline<TInput, TOutput>
{
    private readonly List<PipelineStage> _stages = new();

    /// <summary>Appends a stage to the end of the pipeline.</summary>
    /// <typeparam name="T">Type of the value the stage produces.</typeparam>
    /// <param name="name">Display name used in the progress output.</param>
    /// <param name="processor">Receives the previous stage's output (boxed as object) and returns this stage's output.</param>
    /// <exception cref="ArgumentNullException"><paramref name="processor"/> is <c>null</c>.</exception>
    public void AddStage<T>(string name, Func<object, Task<T>> processor)
    {
        if (processor is null)
        {
            throw new ArgumentNullException(nameof(processor));
        }

        _stages.Add(new PipelineStage
        {
            Name = name,
            // Adapts Task<T> to Task<object> so stages with different output types share one list.
            Processor = async data => await processor(data)
        });
    }

    /// <summary>Runs every input through all stages.</summary>
    /// <param name="inputs">The items to process.</param>
    /// <returns>One result per input, in input order; failures carry the exception message.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="inputs"/> is <c>null</c>.</exception>
    public async Task<List<PipelineResult<TOutput>>> ProcessAsync(List<TInput> inputs)
    {
        if (inputs is null)
        {
            throw new ArgumentNullException(nameof(inputs));
        }

        var taskResults = await Task.WhenAll(inputs.Select(input => ProcessSingleAsync(input)));
        return taskResults.ToList();
    }

    /// <summary>
    /// Passes one input through the stages in order. Any exception, including a failed cast
    /// of the final value to <typeparamref name="TOutput"/>, becomes a failed result.
    /// </summary>
    private async Task<PipelineResult<TOutput>> ProcessSingleAsync(TInput input)
    {
        try
        {
            object currentData = input;

            foreach (var stage in _stages)
            {
                Console.WriteLine($"阶段 '{stage.Name}' 处理数据: {currentData} [线程{Thread.CurrentThread.ManagedThreadId}]");
                currentData = await stage.Processor(currentData);
            }

            return new PipelineResult<TOutput>
            {
                IsSuccess = true,
                Data = (TOutput)currentData
            };
        }
        catch (Exception ex)
        {
            return new PipelineResult<TOutput>
            {
                IsSuccess = false,
                Error = ex.Message
            };
        }
    }

    /// <summary>A named stage whose processor works on boxed values.</summary>
    private class PipelineStage
    {
        public string Name { get; set; }
        public Func<object, Task<object>> Processor { get; set; }
    }
}

/// <summary>
/// Outcome of processing one pipeline input: either the produced data or an error message.
/// </summary>
/// <typeparam name="T">Type of the produced data.</typeparam>
public class PipelineResult<T>
{
    /// <summary>Gets or sets a value indicating whether processing succeeded.</summary>
    public bool IsSuccess { get; set; }

    /// <summary>Gets or sets the produced data.</summary>
    public T Data { get; set; }

    /// <summary>Gets or sets the error message.</summary>
    public string Error { get; set; }
}

8. 章节总结

本章深入介绍了C#中的多线程和并发编程,涵盖了以下核心内容:

核心概念

  • 线程基础: 线程生命周期、前台/后台线程、线程池的使用
  • Task和异步编程: Task、async/await、取消令牌的使用
  • 线程同步: 锁机制、信号量、事件、原子操作
  • 并行编程: Parallel类、PLINQ、并发集合

重要技能

  • 掌握异步编程模式,提高应用程序响应性
  • 理解线程安全和同步机制,避免竞态条件
  • 学会使用并行编程技术,充分利用多核处理器
  • 掌握并发设计模式,构建可扩展的应用程序

最佳实践

  • 优先使用async/await而不是直接操作线程
  • 使用不可变对象和线程安全集合
  • 最小化锁的持有时间,避免死锁
  • 合理控制并发度,避免资源耗尽
  • 进行性能监控和调试,优化并发性能

实际应用

  • Web应用程序的异步处理
  • 数据处理管道和批处理
  • 并发网络请求和I/O操作
  • 实时系统和高性能计算

掌握多线程和并发编程是现代C#开发的重要技能,它能帮助你构建高性能、可扩展的应用程序。下一章我们将学习反射和特性,探索C#的元编程能力。