11.1 本章概述

11.1.1 学习目标

本章将深入学习两种重要的行为型设计模式:

  1. 责任链模式(Chain of Responsibility Pattern)

    • 理解责任链模式的核心思想和应用场景
    • 掌握责任链的构建和请求传递机制
    • 学习不同类型的责任链实现方式
    • 了解责任链模式在实际项目中的应用
  2. 中介者模式(Mediator Pattern)

    • 理解中介者模式的设计理念和优势
    • 掌握中介者与同事类的协作机制
    • 学习复杂交互系统的解耦设计
    • 了解中介者模式的实际应用场景

11.1.2 应用场景

责任链模式适用于: - 请求处理系统(审批流程、权限验证) - 事件处理链(GUI事件、中间件) - 数据处理管道(过滤器、转换器) - 异常处理机制(错误处理链)

中介者模式适用于: - 复杂的用户界面交互 - 多对象协作系统 - 聊天室、论坛等通信系统 - 工作流管理系统


11.2 责任链模式

11.2.1 模式定义

责任链模式是一种行为型设计模式,它为请求创建了一个接收者对象的链。这种模式给予请求的类型,对请求的发送者和接收者进行解耦。在这种模式中,通常每个接收者都包含对另一个接收者的引用。如果一个对象不能处理该请求,那么它会把相同的请求传给下一个接收者,依此类推。

11.2.2 模式动机

在软件开发中,我们经常遇到这样的情况: - 有多个对象可以处理同一个请求 - 具体哪个对象处理该请求在运行时确定 - 不希望请求发送者与接收者耦合 - 希望动态地指定处理请求的对象集合

责任链模式通过将请求的发送者和接收者解耦,让多个对象都有机会处理请求,直到有一个对象处理它为止。

11.2.3 模式结构

┌─────────────────┐    ┌─────────────────┐
│     Client      │───▶│    Handler      │
└─────────────────┘    │  (Abstract)     │
                       │ + handleRequest │
                       │ + setNext       │
                       └─────────────────┘
                               △
                               │
                    ┌──────────┼──────────┐
                    │          │          │
            ┌───────▼──┐ ┌─────▼────┐ ┌───▼──────┐
            │Handler1  │ │Handler2  │ │Handler3  │
            │          │ │          │ │          │
            └──────────┘ └──────────┘ └──────────┘

主要角色: 1. Handler(抽象处理者):定义处理请求的接口,包含指向下一个处理者的引用 2. ConcreteHandler(具体处理者):实现处理请求的具体逻辑 3. Client(客户端):创建责任链并发送请求

11.2.4 Python实现示例:请求审批系统

from abc import ABC, abstractmethod
from enum import Enum
from typing import Optional, Dict, Any
import time

class RequestType(Enum):
    """请求类型枚举"""
    LEAVE = "请假申请"
    EXPENSE = "费用报销"
    PURCHASE = "采购申请"
    PROMOTION = "晋升申请"
    BUDGET = "预算申请"

class Request:
    """请求对象"""
    
    def __init__(self, request_id: str, request_type: RequestType, 
                 amount: float, description: str, requester: str):
        self.request_id = request_id
        self.request_type = request_type
        self.amount = amount
        self.description = description
        self.requester = requester
        self.timestamp = time.time()
        self.processing_history = []
    
    def add_processing_record(self, handler_name: str, action: str, comment: str = ""):
        """添加处理记录"""
        self.processing_history.append({
            'handler': handler_name,
            'action': action,
            'comment': comment,
            'timestamp': time.time()
        })
    
    def __str__(self):
        return f"Request({self.request_id}, {self.request_type.value}, {self.amount}, {self.requester})"

class ApprovalHandler(ABC):
    """抽象审批处理者"""
    
    def __init__(self, name: str):
        self.name = name
        self._next_handler: Optional['ApprovalHandler'] = None
    
    def set_next(self, handler: 'ApprovalHandler') -> 'ApprovalHandler':
        """设置下一个处理者"""
        self._next_handler = handler
        return handler
    
    def handle_request(self, request: Request) -> bool:
        """处理请求"""
        if self.can_handle(request):
            return self.process_request(request)
        elif self._next_handler:
            print(f"{self.name} 无法处理请求,转发给下一个处理者")
            request.add_processing_record(self.name, "转发", "超出处理权限")
            return self._next_handler.handle_request(request)
        else:
            print(f"没有合适的处理者可以处理请求: {request}")
            request.add_processing_record(self.name, "拒绝", "无合适处理者")
            return False
    
    @abstractmethod
    def can_handle(self, request: Request) -> bool:
        """判断是否可以处理请求"""
        pass
    
    @abstractmethod
    def process_request(self, request: Request) -> bool:
        """处理请求的具体逻辑"""
        pass

class TeamLeaderHandler(ApprovalHandler):
    """团队领导处理者"""
    
    def __init__(self):
        super().__init__("团队领导")
        self.max_amount = 5000
        self.supported_types = {RequestType.LEAVE, RequestType.EXPENSE}
    
    def can_handle(self, request: Request) -> bool:
        """团队领导可以处理小额请假和报销"""
        return (request.request_type in self.supported_types and 
                request.amount <= self.max_amount)
    
    def process_request(self, request: Request) -> bool:
        """处理请求"""
        print(f"{self.name} 正在处理: {request}")
        
        # 模拟审批逻辑
        if request.request_type == RequestType.LEAVE and request.amount > 3:
            print(f"{self.name}: 请假天数过多,需要上级审批")
            request.add_processing_record(self.name, "转发", "请假天数超过权限")
            return self._next_handler.handle_request(request) if self._next_handler else False
        
        print(f"{self.name}: 请求已批准")
        request.add_processing_record(self.name, "批准", "符合团队政策")
        return True

class DepartmentManagerHandler(ApprovalHandler):
    """部门经理处理者"""
    
    def __init__(self):
        super().__init__("部门经理")
        self.max_amount = 20000
        self.supported_types = {RequestType.LEAVE, RequestType.EXPENSE, RequestType.PURCHASE}
    
    def can_handle(self, request: Request) -> bool:
        """部门经理可以处理中等金额的多种请求"""
        return (request.request_type in self.supported_types and 
                request.amount <= self.max_amount)
    
    def process_request(self, request: Request) -> bool:
        """处理请求"""
        print(f"{self.name} 正在处理: {request}")
        
        # 模拟复杂的审批逻辑
        if request.request_type == RequestType.PURCHASE:
            if "紧急" not in request.description:
                print(f"{self.name}: 非紧急采购需要详细评估")
                # 可以添加额外的验证逻辑
        
        print(f"{self.name}: 请求已批准")
        request.add_processing_record(self.name, "批准", "符合部门政策")
        return True

class DirectorHandler(ApprovalHandler):
    """总监处理者"""
    
    def __init__(self):
        super().__init__("总监")
        self.max_amount = 100000
        self.supported_types = {RequestType.EXPENSE, RequestType.PURCHASE, 
                              RequestType.PROMOTION, RequestType.BUDGET}
    
    def can_handle(self, request: Request) -> bool:
        """总监可以处理大额和重要请求"""
        return (request.request_type in self.supported_types and 
                request.amount <= self.max_amount)
    
    def process_request(self, request: Request) -> bool:
        """处理请求"""
        print(f"{self.name} 正在处理: {request}")
        
        # 模拟高级审批逻辑
        if request.request_type == RequestType.PROMOTION:
            print(f"{self.name}: 晋升申请需要HR部门配合审核")
            # 可以触发其他流程
        
        if request.amount > 50000:
            print(f"{self.name}: 大额申请需要额外审核")
            # 可以要求额外文档或审批
        
        print(f"{self.name}: 请求已批准")
        request.add_processing_record(self.name, "批准", "符合公司政策")
        return True

class CEOHandler(ApprovalHandler):
    """CEO处理者"""
    
    def __init__(self):
        super().__init__("CEO")
    
    def can_handle(self, request: Request) -> bool:
        """CEO可以处理所有请求"""
        return True
    
    def process_request(self, request: Request) -> bool:
        """处理请求"""
        print(f"{self.name} 正在处理: {request}")
        
        # CEO级别的审批逻辑
        if request.amount > 500000:
            print(f"{self.name}: 超大额申请需要董事会审批")
            request.add_processing_record(self.name, "转发", "需要董事会审批")
            return False
        
        print(f"{self.name}: 请求已批准")
        request.add_processing_record(self.name, "批准", "CEO最终批准")
        return True

class ApprovalSystem:
    """审批系统"""
    
    def __init__(self):
        self.chain = self._build_approval_chain()
        self.processed_requests = []
    
    def _build_approval_chain(self) -> ApprovalHandler:
        """构建审批链"""
        team_leader = TeamLeaderHandler()
        department_manager = DepartmentManagerHandler()
        director = DirectorHandler()
        ceo = CEOHandler()
        
        # 构建责任链
        team_leader.set_next(department_manager).set_next(director).set_next(ceo)
        
        return team_leader
    
    def submit_request(self, request: Request) -> bool:
        """提交请求"""
        print(f"\n=== 处理请求: {request} ===")
        result = self.chain.handle_request(request)
        self.processed_requests.append(request)
        
        print(f"\n--- 处理历史 ---")
        for record in request.processing_history:
            print(f"{record['handler']}: {record['action']} - {record['comment']}")
        
        return result
    
    def get_statistics(self) -> Dict[str, Any]:
        """获取统计信息"""
        total_requests = len(self.processed_requests)
        approved_requests = sum(1 for req in self.processed_requests 
                              if any(record['action'] == '批准' for record in req.processing_history))
        
        handler_stats = {}
        for request in self.processed_requests:
            for record in request.processing_history:
                handler = record['handler']
                action = record['action']
                
                if handler not in handler_stats:
                    handler_stats[handler] = {'批准': 0, '转发': 0, '拒绝': 0}
                
                if action in handler_stats[handler]:
                    handler_stats[handler][action] += 1
        
        return {
            'total_requests': total_requests,
            'approved_requests': approved_requests,
            'approval_rate': approved_requests / total_requests if total_requests > 0 else 0,
            'handler_statistics': handler_stats
        }

def demonstrate_chain_of_responsibility():
    """演示责任链模式"""
    print("=== 责任链模式演示:请求审批系统 ===")
    
    # 创建审批系统
    approval_system = ApprovalSystem()
    
    # 创建各种请求
    requests = [
        Request("REQ001", RequestType.LEAVE, 2, "病假2天", "张三"),
        Request("REQ002", RequestType.EXPENSE, 3000, "出差费用报销", "李四"),
        Request("REQ003", RequestType.PURCHASE, 15000, "紧急采购办公设备", "王五"),
        Request("REQ004", RequestType.PROMOTION, 0, "高级工程师晋升", "赵六"),
        Request("REQ005", RequestType.BUDGET, 80000, "Q4市场推广预算", "钱七"),
        Request("REQ006", RequestType.EXPENSE, 25000, "大型会议费用", "孙八"),
        Request("REQ007", RequestType.LEAVE, 10, "年假申请", "周九"),
    ]
    
    # 处理所有请求
    for request in requests:
        approval_system.submit_request(request)
    
    # 显示统计信息
    print("\n=== 系统统计信息 ===")
    stats = approval_system.get_statistics()
    print(f"总请求数: {stats['total_requests']}")
    print(f"批准请求数: {stats['approved_requests']}")
    print(f"批准率: {stats['approval_rate']:.2%}")
    
    print("\n--- 各处理者统计 ---")
    for handler, actions in stats['handler_statistics'].items():
        print(f"{handler}: 批准={actions['批准']}, 转发={actions['转发']}, 拒绝={actions['拒绝']}")

if __name__ == "__main__":
    demonstrate_chain_of_responsibility()

11.2.5 Java实现示例:日志处理系统

// 抽象日志处理器
abstract class LogHandler {
    protected LogHandler nextHandler;
    protected String name;
    
    public LogHandler(String name) {
        this.name = name;
    }
    
    public LogHandler setNext(LogHandler handler) {
        this.nextHandler = handler;
        return handler;
    }
    
    public void handle(LogLevel level, String message) {
        if (canHandle(level)) {
            processLog(level, message);
        } else if (nextHandler != null) {
            nextHandler.handle(level, message);
        }
    }
    
    protected abstract boolean canHandle(LogLevel level);
    protected abstract void processLog(LogLevel level, String message);
}

// 日志级别枚举
enum LogLevel {
    DEBUG(1), INFO(2), WARN(3), ERROR(4), FATAL(5);
    
    private final int level;
    
    LogLevel(int level) {
        this.level = level;
    }
    
    public int getLevel() {
        return level;
    }
}

// 控制台日志处理器
class ConsoleLogHandler extends LogHandler {
    private LogLevel minLevel;
    
    public ConsoleLogHandler(LogLevel minLevel) {
        super("Console Logger");
        this.minLevel = minLevel;
    }
    
    @Override
    protected boolean canHandle(LogLevel level) {
        return level.getLevel() >= minLevel.getLevel();
    }
    
    @Override
    protected void processLog(LogLevel level, String message) {
        System.out.println(String.format("[%s] %s: %s", 
            name, level, message));
    }
}

// 文件日志处理器
class FileLogHandler extends LogHandler {
    private LogLevel minLevel;
    private String filename;
    
    public FileLogHandler(LogLevel minLevel, String filename) {
        super("File Logger");
        this.minLevel = minLevel;
        this.filename = filename;
    }
    
    @Override
    protected boolean canHandle(LogLevel level) {
        return level.getLevel() >= minLevel.getLevel();
    }
    
    @Override
    protected void processLog(LogLevel level, String message) {
        // 模拟写入文件
        System.out.println(String.format("[%s] Writing to %s - %s: %s", 
            name, filename, level, message));
    }
}

// 邮件日志处理器
class EmailLogHandler extends LogHandler {
    private LogLevel minLevel;
    private String emailAddress;
    
    public EmailLogHandler(LogLevel minLevel, String emailAddress) {
        super("Email Logger");
        this.minLevel = minLevel;
        this.emailAddress = emailAddress;
    }
    
    @Override
    protected boolean canHandle(LogLevel level) {
        return level.getLevel() >= minLevel.getLevel();
    }
    
    @Override
    protected void processLog(LogLevel level, String message) {
        // 模拟发送邮件
        System.out.println(String.format("[%s] Sending email to %s - %s: %s", 
            name, emailAddress, level, message));
    }
}

// 日志系统
class LoggingSystem {
    private LogHandler handlerChain;
    
    public LoggingSystem() {
        buildHandlerChain();
    }
    
    private void buildHandlerChain() {
        LogHandler consoleHandler = new ConsoleLogHandler(LogLevel.DEBUG);
        LogHandler fileHandler = new FileLogHandler(LogLevel.INFO, "app.log");
        LogHandler emailHandler = new EmailLogHandler(LogLevel.ERROR, "admin@company.com");
        
        // 构建处理链
        consoleHandler.setNext(fileHandler).setNext(emailHandler);
        
        this.handlerChain = consoleHandler;
    }
    
    public void log(LogLevel level, String message) {
        handlerChain.handle(level, message);
    }
}

// 演示类
public class ChainOfResponsibilityDemo {
    public static void main(String[] args) {
        LoggingSystem logger = new LoggingSystem();
        
        System.out.println("=== 责任链模式演示:日志处理系统 ===");
        
        logger.log(LogLevel.DEBUG, "调试信息:用户登录");
        logger.log(LogLevel.INFO, "信息:订单创建成功");
        logger.log(LogLevel.WARN, "警告:内存使用率较高");
        logger.log(LogLevel.ERROR, "错误:数据库连接失败");
        logger.log(LogLevel.FATAL, "严重错误:系统崩溃");
    }
}

11.2.6 责任链模式的优缺点

优点: 1. 降低耦合度:请求发送者和接收者解耦 2. 增强灵活性:可以动态地增加或删除处理者 3. 符合开闭原则:增加新的处理者不影响现有代码 4. 简化对象:每个处理者只需关心自己的处理逻辑 5. 责任分担:每个处理者承担明确的责任

缺点: 1. 性能影响:请求可能需要遍历整个链 2. 调试困难:运行时难以观察链的结构 3. 不保证处理:请求可能到达链尾仍未被处理 4. 系统性能:链过长会影响性能

11.2.7 适用场景

  1. 多个对象可以处理同一请求:但具体由哪个对象处理在运行时确定
  2. 不明确指定接收者:希望向多个对象中的一个提交请求
  3. 动态指定处理者集合:可处理请求的对象集合应被动态指定
  4. 请求处理的顺序很重要:需要按特定顺序尝试处理请求

11.3 中介者模式

11.3.1 模式定义

中介者模式是一种行为型设计模式,它定义了一个中介对象来封装一系列对象之间的交互。中介者使各对象不需要显式地相互引用,从而使其耦合松散,而且可以独立地改变它们之间的交互。

11.3.2 模式动机

在面向对象的软件设计中,对象之间的交互和通信是必不可少的,但是如果让对象之间直接进行交互,会导致: - 对象之间的耦合度过高 - 系统结构复杂,难以维护 - 对象的复用性降低 - 系统扩展困难

中介者模式通过引入一个中介对象来管理对象间的交互,从而降低了对象间的耦合度。

11.3.3 模式结构

┌─────────────────┐    ┌─────────────────┐
│   Colleague1    │───▶│    Mediator     │◀───┐
└─────────────────┘    │   (Abstract)    │    │
                       └─────────────────┘    │
┌─────────────────┐            △             │
│   Colleague2    │───────────┐│             │
└─────────────────┘           ││             │
                              ││             │
┌─────────────────┐    ┌──────▼▼──────┐      │
│   Colleague3    │───▶│ ConcreteMediator │───┘
└─────────────────┘    └─────────────────┘

主要角色: 1. Mediator(抽象中介者):定义中介者接口 2. ConcreteMediator(具体中介者):实现中介者接口,协调各同事对象 3. Colleague(抽象同事类):定义同事类接口 4. ConcreteColleague(具体同事类):实现同事类,通过中介者与其他同事交互

11.3.4 Python实现示例:聊天室系统

from abc import ABC, abstractmethod
from typing import List, Dict, Optional
from datetime import datetime
from enum import Enum

class MessageType(Enum):
    """消息类型"""
    TEXT = "文本"
    IMAGE = "图片"
    FILE = "文件"
    SYSTEM = "系统"

class Message:
    """消息类"""
    
    def __init__(self, sender: str, content: str, msg_type: MessageType = MessageType.TEXT):
        self.sender = sender
        self.content = content
        self.msg_type = msg_type
        self.timestamp = datetime.now()
        self.message_id = f"{sender}_{int(self.timestamp.timestamp())}"
    
    def __str__(self):
        time_str = self.timestamp.strftime("%H:%M:%S")
        return f"[{time_str}] {self.sender}: {self.content}"

class ChatMediator(ABC):
    """聊天中介者接口"""
    
    @abstractmethod
    def send_message(self, message: Message, sender: 'User') -> None:
        """发送消息"""
        pass
    
    @abstractmethod
    def add_user(self, user: 'User') -> None:
        """添加用户"""
        pass
    
    @abstractmethod
    def remove_user(self, user: 'User') -> None:
        """移除用户"""
        pass

class User(ABC):
    """抽象用户类"""
    
    def __init__(self, name: str, mediator: ChatMediator):
        self.name = name
        self.mediator = mediator
        self.is_online = True
        self.message_history: List[Message] = []
    
    def send_message(self, content: str, msg_type: MessageType = MessageType.TEXT) -> None:
        """发送消息"""
        if self.is_online:
            message = Message(self.name, content, msg_type)
            self.mediator.send_message(message, self)
        else:
            print(f"{self.name} 当前离线,无法发送消息")
    
    @abstractmethod
    def receive_message(self, message: Message) -> None:
        """接收消息"""
        pass
    
    def go_online(self) -> None:
        """上线"""
        self.is_online = True
        print(f"{self.name} 已上线")
    
    def go_offline(self) -> None:
        """下线"""
        self.is_online = False
        print(f"{self.name} 已下线")

class RegularUser(User):
    """普通用户"""
    
    def receive_message(self, message: Message) -> None:
        """接收消息"""
        if self.is_online and message.sender != self.name:
            self.message_history.append(message)
            print(f"{self.name} 收到消息: {message}")

class AdminUser(User):
    """管理员用户"""
    
    def __init__(self, name: str, mediator: ChatMediator):
        super().__init__(name, mediator)
        self.can_moderate = True
    
    def receive_message(self, message: Message) -> None:
        """接收消息"""
        if self.is_online:
            self.message_history.append(message)
            if message.sender != self.name:
                print(f"[管理员] {self.name} 收到消息: {message}")
    
    def moderate_user(self, username: str, action: str) -> None:
        """管理用户"""
        if self.can_moderate:
            system_message = Message("系统", f"管理员 {self.name} 对用户 {username} 执行了 {action}", MessageType.SYSTEM)
            self.mediator.send_message(system_message, self)

class ChatRoom(ChatMediator):
    """聊天室(具体中介者)"""
    
    def __init__(self, room_name: str):
        self.room_name = room_name
        self.users: Dict[str, User] = {}
        self.message_history: List[Message] = []
        self.banned_words = ["垃圾", "广告", "spam"]
        self.max_users = 50
    
    def add_user(self, user: User) -> None:
        """添加用户"""
        if len(self.users) >= self.max_users:
            print(f"聊天室 {self.room_name} 已满,无法添加用户 {user.name}")
            return
        
        if user.name not in self.users:
            self.users[user.name] = user
            join_message = Message("系统", f"{user.name} 加入了聊天室", MessageType.SYSTEM)
            self.message_history.append(join_message)
            
            # 通知所有在线用户
            for existing_user in self.users.values():
                if existing_user.is_online and existing_user.name != user.name:
                    existing_user.receive_message(join_message)
            
            print(f"{user.name} 加入聊天室 {self.room_name}")
        else:
            print(f"用户 {user.name} 已在聊天室中")
    
    def remove_user(self, user: User) -> None:
        """移除用户"""
        if user.name in self.users:
            del self.users[user.name]
            leave_message = Message("系统", f"{user.name} 离开了聊天室", MessageType.SYSTEM)
            self.message_history.append(leave_message)
            
            # 通知所有在线用户
            for remaining_user in self.users.values():
                if remaining_user.is_online:
                    remaining_user.receive_message(leave_message)
            
            print(f"{user.name} 离开聊天室 {self.room_name}")
    
    def send_message(self, message: Message, sender: User) -> None:
        """发送消息"""
        # 检查发送者是否在聊天室中
        if sender.name not in self.users:
            print(f"用户 {sender.name} 不在聊天室中,无法发送消息")
            return
        
        # 内容过滤
        if self._is_message_appropriate(message):
            self.message_history.append(message)
            
            # 转发给所有在线用户
            for user in self.users.values():
                if user.is_online:
                    user.receive_message(message)
            
            print(f"聊天室 {self.room_name} 广播消息: {message}")
        else:
            warning_message = Message("系统", f"{sender.name} 的消息包含不当内容,已被过滤", MessageType.SYSTEM)
            sender.receive_message(warning_message)
    
    def _is_message_appropriate(self, message: Message) -> bool:
        """检查消息是否合适"""
        content_lower = message.content.lower()
        return not any(banned_word in content_lower for banned_word in self.banned_words)
    
    def get_online_users(self) -> List[str]:
        """获取在线用户列表"""
        return [user.name for user in self.users.values() if user.is_online]
    
    def get_room_statistics(self) -> Dict:
        """获取聊天室统计信息"""
        total_messages = len(self.message_history)
        system_messages = sum(1 for msg in self.message_history if msg.msg_type == MessageType.SYSTEM)
        user_messages = total_messages - system_messages
        
        user_message_counts = {}
        for message in self.message_history:
            if message.msg_type != MessageType.SYSTEM:
                user_message_counts[message.sender] = user_message_counts.get(message.sender, 0) + 1
        
        return {
            'room_name': self.room_name,
            'total_users': len(self.users),
            'online_users': len(self.get_online_users()),
            'total_messages': total_messages,
            'user_messages': user_messages,
            'system_messages': system_messages,
            'user_message_counts': user_message_counts
        }

class PrivateChat(ChatMediator):
    """私聊(具体中介者)"""
    
    def __init__(self, user1: User, user2: User):
        self.participants = {user1.name: user1, user2.name: user2}
        self.message_history: List[Message] = []
        self.chat_id = f"{user1.name}_{user2.name}"
    
    def add_user(self, user: User) -> None:
        """私聊不支持添加用户"""
        print("私聊不支持添加新用户")
    
    def remove_user(self, user: User) -> None:
        """私聊不支持移除用户"""
        print("私聊不支持移除用户")
    
    def send_message(self, message: Message, sender: User) -> None:
        """发送私聊消息"""
        if sender.name not in self.participants:
            print(f"用户 {sender.name} 不在此私聊中")
            return
        
        self.message_history.append(message)
        
        # 发送给另一个参与者
        for user in self.participants.values():
            if user.name != sender.name and user.is_online:
                user.receive_message(message)
        
        print(f"私聊 {self.chat_id} 消息: {message}")

def demonstrate_mediator_pattern():
    """演示中介者模式"""
    print("=== 中介者模式演示:聊天室系统 ===")
    
    # 创建聊天室
    tech_room = ChatRoom("技术讨论")
    
    # 创建用户
    alice = RegularUser("Alice", tech_room)
    bob = RegularUser("Bob", tech_room)
    charlie = AdminUser("Charlie", tech_room)
    david = RegularUser("David", tech_room)
    
    # 用户加入聊天室
    print("\n--- 用户加入聊天室 ---")
    tech_room.add_user(alice)
    tech_room.add_user(bob)
    tech_room.add_user(charlie)
    tech_room.add_user(david)
    
    # 用户发送消息
    print("\n--- 用户聊天 ---")
    alice.send_message("大家好!我是新来的")
    bob.send_message("欢迎Alice!")
    charlie.send_message("欢迎加入技术讨论组")
    
    # 用户下线
    print("\n--- 用户状态变化 ---")
    david.go_offline()
    alice.send_message("David还在吗?")
    
    # 管理员操作
    print("\n--- 管理员操作 ---")
    charlie.moderate_user("Bob", "警告")
    
    # 内容过滤测试
    print("\n--- 内容过滤测试 ---")
    bob.send_message("这是垃圾信息")
    
    # David重新上线
    print("\n--- 用户重新上线 ---")
    david.go_online()
    david.send_message("我回来了!")
    
    # 显示聊天室统计
    print("\n--- 聊天室统计 ---")
    stats = tech_room.get_room_statistics()
    print(f"聊天室: {stats['room_name']}")
    print(f"总用户数: {stats['total_users']}")
    print(f"在线用户数: {stats['online_users']}")
    print(f"总消息数: {stats['total_messages']}")
    print(f"用户消息数: {stats['user_messages']}")
    print(f"系统消息数: {stats['system_messages']}")
    print("用户发言统计:", stats['user_message_counts'])
    
    # 演示私聊
    print("\n--- 私聊演示 ---")
    private_chat = PrivateChat(alice, bob)
    alice.mediator = private_chat
    bob.mediator = private_chat
    
    alice.send_message("Bob,我们私下聊聊")
    bob.send_message("好的,Alice")

if __name__ == "__main__":
    demonstrate_mediator_pattern()

11.3.5 Go实现示例:GUI组件交互

package main

import (
    "fmt"
    "strings"
    "time"
)

// Mediator 中介者接口
type Mediator interface {
    Notify(sender Component, event string)
}

// Component 组件接口
type Component interface {
    SetMediator(mediator Mediator)
    GetName() string
}

// BaseComponent 基础组件
type BaseComponent struct {
    name     string
    mediator Mediator
}

func (c *BaseComponent) SetMediator(mediator Mediator) {
    c.mediator = mediator
}

func (c *BaseComponent) GetName() string {
    return c.name
}

// Button 按钮组件
type Button struct {
    BaseComponent
    enabled bool
    text    string
}

func NewButton(name, text string) *Button {
    return &Button{
        BaseComponent: BaseComponent{name: name},
        enabled:       true,
        text:          text,
    }
}

func (b *Button) Click() {
    if b.enabled {
        fmt.Printf("按钮 %s 被点击\n", b.name)
        b.mediator.Notify(b, "click")
    } else {
        fmt.Printf("按钮 %s 已禁用,无法点击\n", b.name)
    }
}

func (b *Button) SetEnabled(enabled bool) {
    b.enabled = enabled
    status := "启用"
    if !enabled {
        status = "禁用"
    }
    fmt.Printf("按钮 %s 已%s\n", b.name, status)
}

func (b *Button) SetText(text string) {
    b.text = text
    fmt.Printf("按钮 %s 文本更新为: %s\n", b.name, text)
}

// TextBox 文本框组件
type TextBox struct {
    BaseComponent
    content string
    maxLength int
}

func NewTextBox(name string, maxLength int) *TextBox {
    return &TextBox{
        BaseComponent: BaseComponent{name: name},
        maxLength:     maxLength,
    }
}

func (t *TextBox) SetText(text string) {
    if len(text) <= t.maxLength {
        t.content = text
        fmt.Printf("文本框 %s 内容更新为: %s\n", t.name, text)
        t.mediator.Notify(t, "textChanged")
    } else {
        fmt.Printf("文本框 %s 内容超出最大长度限制\n", t.name)
    }
}

func (t *TextBox) GetText() string {
    return t.content
}

func (t *TextBox) Clear() {
    t.content = ""
    fmt.Printf("文本框 %s 已清空\n", t.name)
    t.mediator.Notify(t, "textChanged")
}

// CheckBox 复选框组件
type CheckBox struct {
    BaseComponent
    checked bool
}

func NewCheckBox(name string) *CheckBox {
    return &CheckBox{
        BaseComponent: BaseComponent{name: name},
        checked:       false,
    }
}

func (c *CheckBox) Toggle() {
    c.checked = !c.checked
    status := "选中"
    if !c.checked {
        status = "取消选中"
    }
    fmt.Printf("复选框 %s 已%s\n", c.name, status)
    c.mediator.Notify(c, "stateChanged")
}

func (c *CheckBox) IsChecked() bool {
    return c.checked
}

// ListBox 列表框组件
type ListBox struct {
    BaseComponent
    items         []string
    selectedIndex int
}

func NewListBox(name string) *ListBox {
    return &ListBox{
        BaseComponent: BaseComponent{name: name},
        items:         make([]string, 0),
        selectedIndex: -1,
    }
}

func (l *ListBox) AddItem(item string) {
    l.items = append(l.items, item)
    fmt.Printf("列表框 %s 添加项目: %s\n", l.name, item)
    l.mediator.Notify(l, "itemAdded")
}

func (l *ListBox) SelectItem(index int) {
    if index >= 0 && index < len(l.items) {
        l.selectedIndex = index
        fmt.Printf("列表框 %s 选中项目: %s\n", l.name, l.items[index])
        l.mediator.Notify(l, "selectionChanged")
    }
}

func (l *ListBox) GetSelectedItem() string {
    if l.selectedIndex >= 0 && l.selectedIndex < len(l.items) {
        return l.items[l.selectedIndex]
    }
    return ""
}

func (l *ListBox) Clear() {
    l.items = make([]string, 0)
    l.selectedIndex = -1
    fmt.Printf("列表框 %s 已清空\n", l.name)
    l.mediator.Notify(l, "cleared")
}

// DialogMediator 对话框中介者
type DialogMediator struct {
    title      string
    nameBox    *TextBox
    emailBox   *TextBox
    submitBtn  *Button
    clearBtn   *Button
    agreeCheck *CheckBox
    resultList *ListBox
}

func NewDialogMediator(title string) *DialogMediator {
    dm := &DialogMediator{
        title:      title,
        nameBox:    NewTextBox("姓名输入框", 50),
        emailBox:   NewTextBox("邮箱输入框", 100),
        submitBtn:  NewButton("提交按钮", "提交"),
        clearBtn:   NewButton("清空按钮", "清空"),
        agreeCheck: NewCheckBox("同意条款"),
        resultList: NewListBox("结果列表"),
    }
    
    // 设置中介者
    dm.nameBox.SetMediator(dm)
    dm.emailBox.SetMediator(dm)
    dm.submitBtn.SetMediator(dm)
    dm.clearBtn.SetMediator(dm)
    dm.agreeCheck.SetMediator(dm)
    dm.resultList.SetMediator(dm)
    
    // 初始状态:提交按钮禁用
    dm.submitBtn.SetEnabled(false)
    
    return dm
}

func (dm *DialogMediator) Notify(sender Component, event string) {
    switch sender.GetName() {
    case "姓名输入框", "邮箱输入框":
        if event == "textChanged" {
            dm.validateForm()
        }
    
    case "同意条款":
        if event == "stateChanged" {
            dm.validateForm()
        }
    
    case "提交按钮":
        if event == "click" {
            dm.submitForm()
        }
    
    case "清空按钮":
        if event == "click" {
            dm.clearForm()
        }
    }
}

func (dm *DialogMediator) validateForm() {
    nameValid := len(strings.TrimSpace(dm.nameBox.GetText())) > 0
    emailValid := dm.isValidEmail(dm.emailBox.GetText())
    agreeChecked := dm.agreeCheck.IsChecked()
    
    canSubmit := nameValid && emailValid && agreeChecked
    dm.submitBtn.SetEnabled(canSubmit)
    
    if canSubmit {
        dm.submitBtn.SetText("提交")
    } else {
        reasons := make([]string, 0)
        if !nameValid {
            reasons = append(reasons, "姓名为空")
        }
        if !emailValid {
            reasons = append(reasons, "邮箱格式错误")
        }
        if !agreeChecked {
            reasons = append(reasons, "未同意条款")
        }
        dm.submitBtn.SetText(fmt.Sprintf("无法提交(%s)", strings.Join(reasons, ",")))
    }
}

func (dm *DialogMediator) isValidEmail(email string) bool {
    return strings.Contains(email, "@") && strings.Contains(email, ".")
}

func (dm *DialogMediator) submitForm() {
    name := dm.nameBox.GetText()
    email := dm.emailBox.GetText()
    
    result := fmt.Sprintf("用户注册成功 - 姓名: %s, 邮箱: %s, 时间: %s", 
        name, email, time.Now().Format("15:04:05"))
    
    dm.resultList.AddItem(result)
    fmt.Printf("表单提交成功!\n")
    
    // 提交后清空表单
    dm.clearForm()
}

func (dm *DialogMediator) clearForm() {
    dm.nameBox.Clear()
    dm.emailBox.Clear()
    dm.agreeCheck.checked = false
    fmt.Printf("复选框 %s 已取消选中\n", dm.agreeCheck.name)
    dm.validateForm()
}

// 模拟用户操作
func simulateUserInteraction(dm *DialogMediator) {
    fmt.Println("\n=== 模拟用户交互 ===")
    
    // 用户输入姓名
    fmt.Println("\n--- 用户输入姓名 ---")
    dm.nameBox.SetText("张三")
    
    // 用户输入无效邮箱
    fmt.Println("\n--- 用户输入无效邮箱 ---")
    dm.emailBox.SetText("invalid-email")
    
    // 用户同意条款
    fmt.Println("\n--- 用户同意条款 ---")
    dm.agreeCheck.Toggle()
    
    // 用户输入有效邮箱
    fmt.Println("\n--- 用户输入有效邮箱 ---")
    dm.emailBox.SetText("zhangsan@example.com")
    
    // 用户提交表单
    fmt.Println("\n--- 用户提交表单 ---")
    dm.submitBtn.Click()
    
    // 用户再次填写表单
    fmt.Println("\n--- 用户再次填写表单 ---")
    dm.nameBox.SetText("李四")
    dm.emailBox.SetText("lisi@test.com")
    dm.agreeCheck.Toggle()
    dm.submitBtn.Click()
    
    // 用户清空表单
    fmt.Println("\n--- 用户清空表单 ---")
    dm.clearBtn.Click()
}

func main() {
    fmt.Println("=== 中介者模式演示:GUI组件交互 ===")
    
    // 创建对话框中介者
    dialog := NewDialogMediator("用户注册对话框")
    
    // 模拟用户交互
    simulateUserInteraction(dialog)
    
    fmt.Println("\n=== 演示结束 ===")
}

11.3.6 中介者模式的优缺点

优点: 1. 降低耦合度:减少对象间的直接依赖关系 2. 集中控制:将复杂的交互逻辑集中在中介者中 3. 提高复用性:同事对象可以独立变化和复用 4. 简化对象协议:多对多关系变为一对多关系 5. 符合迪米特法则:对象只与中介者通信

缺点: 1. 中介者复杂化:中介者可能变得过于复杂 2. 单点故障:中介者出问题会影响整个系统 3. 性能开销:增加了一层间接调用 4. 难以维护:复杂的中介者难以维护和扩展

11.3.7 适用场景

  1. 对象间交互复杂:一组对象以定义良好但复杂的方式进行通信
  2. 对象间依赖混乱:对象之间的依赖关系混乱且难以理解
  3. 对象难以复用:因为引用了太多其他对象而难以复用
  4. 行为分布在多个类中:想定制一个分布在多个类中的行为

11.4 责任链模式与中介者模式对比分析

11.4.1 相同点

  1. 行为型模式:都属于行为型设计模式
  2. 解耦目的:都旨在减少对象间的直接耦合
  3. 动态性:都支持运行时的动态配置
  4. 扩展性:都便于系统的扩展和维护
  5. 封装变化:都将变化的部分进行封装

11.4.2 不同点

对比维度 责任链模式 中介者模式
主要目的 处理请求的传递 协调对象间交互
对象关系 链式结构 星型结构
通信方式 单向传递 双向通信
处理方式 顺序处理 集中协调
扩展方式 添加处理者 修改中介者
复杂度 链可能很长 中介者可能复杂
适用场景 请求处理流程 复杂对象交互

11.4.3 选择指南

选择责任链模式的情况: - 有多个对象可以处理同一请求 - 处理者的集合需要动态指定 - 不希望明确指定接收者 - 请求需要按顺序尝试处理

选择中介者模式的情况: - 对象间交互复杂且耦合度高 - 需要集中管理对象间的交互逻辑 - 对象的行为依赖于其他多个对象 - 希望提高对象的复用性

11.4.4 组合使用

责任链模式和中介者模式可以结合使用,创建更强大的系统:

class MediatedChainHandler(ABC):
    """支持中介者的责任链处理者"""
    
    def __init__(self, name: str, mediator: 'ChainMediator'):
        self.name = name
        self.mediator = mediator
        self._next_handler: Optional['MediatedChainHandler'] = None
    
    def set_next(self, handler: 'MediatedChainHandler') -> 'MediatedChainHandler':
        self._next_handler = handler
        return handler
    
    def handle_request(self, request: Any) -> bool:
        # 通知中介者处理开始
        self.mediator.notify_processing_start(self, request)
        
        if self.can_handle(request):
            result = self.process_request(request)
            self.mediator.notify_processing_complete(self, request, result)
            return result
        elif self._next_handler:
            self.mediator.notify_forwarding(self, self._next_handler, request)
            return self._next_handler.handle_request(request)
        else:
            self.mediator.notify_processing_failed(self, request)
            return False
    
    @abstractmethod
    def can_handle(self, request: Any) -> bool:
        pass
    
    @abstractmethod
    def process_request(self, request: Any) -> bool:
        pass

class ChainMediator:
    """责任链中介者"""
    
    def __init__(self):
        self.handlers: List[MediatedChainHandler] = []
        self.processing_history: List[Dict] = []
        self.observers: List['ChainObserver'] = []
    
    def register_handler(self, handler: MediatedChainHandler):
        """注册处理者"""
        self.handlers.append(handler)
    
    def add_observer(self, observer: 'ChainObserver'):
        """添加观察者"""
        self.observers.append(observer)
    
    def notify_processing_start(self, handler: MediatedChainHandler, request: Any):
        """通知处理开始"""
        event = {
            'type': 'processing_start',
            'handler': handler.name,
            'request': str(request),
            'timestamp': time.time()
        }
        self.processing_history.append(event)
        
        for observer in self.observers:
            observer.on_processing_start(handler, request)
    
    def notify_processing_complete(self, handler: MediatedChainHandler, request: Any, result: bool):
        """通知处理完成"""
        event = {
            'type': 'processing_complete',
            'handler': handler.name,
            'request': str(request),
            'result': result,
            'timestamp': time.time()
        }
        self.processing_history.append(event)
        
        for observer in self.observers:
            observer.on_processing_complete(handler, request, result)
    
    def notify_forwarding(self, from_handler: MediatedChainHandler, 
                         to_handler: MediatedChainHandler, request: Any):
        """通知转发"""
        event = {
            'type': 'forwarding',
            'from_handler': from_handler.name,
            'to_handler': to_handler.name,
            'request': str(request),
            'timestamp': time.time()
        }
        self.processing_history.append(event)
        
        for observer in self.observers:
            observer.on_forwarding(from_handler, to_handler, request)
    
    def notify_processing_failed(self, handler: MediatedChainHandler, request: Any):
        """通知处理失败"""
        event = {
            'type': 'processing_failed',
            'handler': handler.name,
            'request': str(request),
            'timestamp': time.time()
        }
        self.processing_history.append(event)
        
        for observer in self.observers:
            observer.on_processing_failed(handler, request)
    
    def get_processing_statistics(self) -> Dict:
        """获取处理统计"""
        total_events = len(self.processing_history)
        event_counts = {}
        handler_stats = {}
        
        for event in self.processing_history:
            event_type = event['type']
            event_counts[event_type] = event_counts.get(event_type, 0) + 1
            
            if 'handler' in event:
                handler = event['handler']
                if handler not in handler_stats:
                    handler_stats[handler] = {'processed': 0, 'forwarded': 0, 'failed': 0}
                
                if event_type == 'processing_complete':
                    handler_stats[handler]['processed'] += 1
                elif event_type == 'forwarding':
                    handler_stats[handler]['forwarded'] += 1
                elif event_type == 'processing_failed':
                    handler_stats[handler]['failed'] += 1
        
        return {
            'total_events': total_events,
            'event_counts': event_counts,
            'handler_statistics': handler_stats
        }

class ChainObserver(ABC):
    """责任链观察者接口"""
    
    @abstractmethod
    def on_processing_start(self, handler: MediatedChainHandler, request: Any):
        pass
    
    @abstractmethod
    def on_processing_complete(self, handler: MediatedChainHandler, request: Any, result: bool):
        pass
    
    @abstractmethod
    def on_forwarding(self, from_handler: MediatedChainHandler, 
                     to_handler: MediatedChainHandler, request: Any):
        pass
    
    @abstractmethod
    def on_processing_failed(self, handler: MediatedChainHandler, request: Any):
        pass

class LoggingObserver(ChainObserver):
    """日志观察者"""
    
    def on_processing_start(self, handler: MediatedChainHandler, request: Any):
        print(f"[LOG] {handler.name} 开始处理请求: {request}")
    
    def on_processing_complete(self, handler: MediatedChainHandler, request: Any, result: bool):
        status = "成功" if result else "失败"
        print(f"[LOG] {handler.name} 处理{status}: {request}")
    
    def on_forwarding(self, from_handler: MediatedChainHandler, 
                     to_handler: MediatedChainHandler, request: Any):
        print(f"[LOG] {from_handler.name} 转发请求到 {to_handler.name}: {request}")
    
    def on_processing_failed(self, handler: MediatedChainHandler, request: Any):
        print(f"[LOG] {handler.name} 处理失败,无后续处理者: {request}")

class MetricsObserver(ChainObserver):
    """指标观察者"""
    
    def __init__(self):
        self.metrics = {
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'forwarded_requests': 0
        }
    
    def on_processing_start(self, handler: MediatedChainHandler, request: Any):
        self.metrics['total_requests'] += 1
    
    def on_processing_complete(self, handler: MediatedChainHandler, request: Any, result: bool):
        if result:
            self.metrics['successful_requests'] += 1
        else:
            self.metrics['failed_requests'] += 1
    
    def on_forwarding(self, from_handler: MediatedChainHandler, 
                     to_handler: MediatedChainHandler, request: Any):
        self.metrics['forwarded_requests'] += 1
    
    def on_processing_failed(self, handler: MediatedChainHandler, request: Any):
        self.metrics['failed_requests'] += 1
    
    def get_metrics(self) -> Dict:
        return self.metrics.copy()

def demonstrate_combined_patterns():
    """演示责任链模式与中介者模式的组合使用"""
    print("=== 责任链模式与中介者模式组合演示 ===")
    
    # 创建中介者
    mediator = ChainMediator()
    
    # 创建观察者
    logging_observer = LoggingObserver()
    metrics_observer = MetricsObserver()
    
    mediator.add_observer(logging_observer)
    mediator.add_observer(metrics_observer)
    
    # 创建处理者(这里简化实现)
    class SimpleHandler(MediatedChainHandler):
        def __init__(self, name: str, mediator: ChainMediator, max_value: int):
            super().__init__(name, mediator)
            self.max_value = max_value
        
        def can_handle(self, request: Any) -> bool:
            return isinstance(request, int) and request <= self.max_value
        
        def process_request(self, request: Any) -> bool:
            print(f"{self.name} 处理请求: {request}")
            return True
    
    # 创建处理链
    handler1 = SimpleHandler("低级处理者", mediator, 10)
    handler2 = SimpleHandler("中级处理者", mediator, 50)
    handler3 = SimpleHandler("高级处理者", mediator, 100)
    
    handler1.set_next(handler2).set_next(handler3)
    
    # 注册处理者
    mediator.register_handler(handler1)
    mediator.register_handler(handler2)
    mediator.register_handler(handler3)
    
    # 处理请求
    requests = [5, 25, 75, 150]
    
    for request in requests:
        print(f"\n--- 处理请求: {request} ---")
        handler1.handle_request(request)
    
    # 显示统计信息
    print("\n=== 处理统计 ===")
    stats = mediator.get_processing_statistics()
    print(f"总事件数: {stats['total_events']}")
    print(f"事件统计: {stats['event_counts']}")
    print(f"处理者统计: {stats['handler_statistics']}")
    
    print("\n=== 指标统计 ===")
    metrics = metrics_observer.get_metrics()
    for key, value in metrics.items():
        print(f"{key}: {value}")

if __name__ == "__main__":
    demonstrate_combined_patterns()

11.5 高级应用技巧

11.5.1 责任链模式优化技巧

  1. 处理者缓存:缓存处理者实例减少创建开销
  2. 链路优化:根据请求类型动态调整链路顺序
  3. 并行处理:支持多个处理者并行处理请求
  4. 链路监控:监控链路性能和处理效率
  5. 异常处理:完善的异常处理和恢复机制

11.5.2 中介者模式优化技巧

  1. 中介者分层:将复杂中介者分解为多个层次
  2. 事件驱动:使用事件驱动机制减少直接调用
  3. 异步处理:支持异步消息处理
  4. 状态管理:合理管理中介者的状态
  5. 性能监控:监控中介者的性能瓶颈

11.5.3 内存和性能优化

class OptimizedChainManager:
    """优化的责任链管理器"""
    
    def __init__(self):
        self._handler_cache = {}
        self._chain_cache = {}
        self._performance_metrics = {}
    
    def get_handler(self, handler_type: str, config: Dict):
        """获取处理者(带缓存)"""
        cache_key = f"{handler_type}_{hash(str(config))}"
        
        if cache_key not in self._handler_cache:
            # 根据类型创建处理者
            self._handler_cache[cache_key] = self._create_handler(handler_type, config)
        
        return self._handler_cache[cache_key]
    
    def build_optimized_chain(self, request_type: str, handlers_config: List[Dict]):
        """构建优化的处理链"""
        cache_key = f"{request_type}_{hash(str(handlers_config))}"
        
        if cache_key not in self._chain_cache:
            # 根据历史性能数据优化链路顺序
            optimized_config = self._optimize_chain_order(request_type, handlers_config)
            
            # 构建链路
            chain = self._build_chain(optimized_config)
            self._chain_cache[cache_key] = chain
        
        return self._chain_cache[cache_key]
    
    def _optimize_chain_order(self, request_type: str, handlers_config: List[Dict]) -> List[Dict]:
        """根据性能数据优化链路顺序"""
        if request_type not in self._performance_metrics:
            return handlers_config
        
        metrics = self._performance_metrics[request_type]
        
        # 根据成功率和处理时间排序
        def sort_key(config):
            handler_name = config['name']
            if handler_name in metrics:
                success_rate = metrics[handler_name].get('success_rate', 0)
                avg_time = metrics[handler_name].get('avg_processing_time', float('inf'))
                return (-success_rate, avg_time)  # 成功率高、时间短的优先
            return (0, float('inf'))
        
        return sorted(handlers_config, key=sort_key)

class OptimizedMediator:
    """优化的中介者"""
    
    def __init__(self):
        self._event_queue = []
        self._async_handlers = {}
        self._performance_monitor = PerformanceMonitor()
    
    def notify_async(self, sender: Component, event: str, data: Any = None):
        """异步通知"""
        event_obj = {
            'sender': sender,
            'event': event,
            'data': data,
            'timestamp': time.time()
        }
        
        self._event_queue.append(event_obj)
        self._process_event_queue()
    
    def register_async_handler(self, event_type: str, handler: Callable):
        """注册异步处理器"""
        if event_type not in self._async_handlers:
            self._async_handlers[event_type] = []
        
        self._async_handlers[event_type].append(handler)
    
    def _process_event_queue(self):
        """处理事件队列"""
        while self._event_queue:
            event = self._event_queue.pop(0)
            self._handle_event(event)
    
    def _handle_event(self, event: Dict):
        """处理单个事件"""
        event_type = event['event']
        
        start_time = time.time()
        
        if event_type in self._async_handlers:
            for handler in self._async_handlers[event_type]:
                try:
                    handler(event)
                except Exception as e:
                    print(f"事件处理错误: {e}")
        
        processing_time = time.time() - start_time
        self._performance_monitor.record_event_processing(event_type, processing_time)

class PerformanceMonitor:
    """性能监控器"""
    
    def __init__(self):
        self._metrics = {}
    
    def record_event_processing(self, event_type: str, processing_time: float):
        """记录事件处理性能"""
        if event_type not in self._metrics:
            self._metrics[event_type] = {
                'total_count': 0,
                'total_time': 0,
                'avg_time': 0,
                'max_time': 0,
                'min_time': float('inf')
            }
        
        metrics = self._metrics[event_type]
        metrics['total_count'] += 1
        metrics['total_time'] += processing_time
        metrics['avg_time'] = metrics['total_time'] / metrics['total_count']
        metrics['max_time'] = max(metrics['max_time'], processing_time)
        metrics['min_time'] = min(metrics['min_time'], processing_time)
    
    def get_performance_report(self) -> Dict:
        """获取性能报告"""
        return self._metrics.copy()

11.6 实际应用案例

11.6.1 Web请求处理管道

class HTTPRequest:
    """HTTP请求对象"""
    
    def __init__(self, method: str, path: str, headers: Dict, body: str = ""):
        self.method = method
        self.path = path
        self.headers = headers
        self.body = body
        self.context = {}  # 用于在处理器间传递数据
    
    def __str__(self):
        return f"{self.method} {self.path}"

class HTTPResponse:
    """HTTP响应对象"""
    
    def __init__(self, status_code: int = 200, headers: Dict = None, body: str = ""):
        self.status_code = status_code
        self.headers = headers or {}
        self.body = body

class MiddlewareHandler(ABC):
    """中间件处理器基类"""
    
    def __init__(self, name: str):
        self.name = name
        self._next_handler: Optional['MiddlewareHandler'] = None
    
    def set_next(self, handler: 'MiddlewareHandler') -> 'MiddlewareHandler':
        self._next_handler = handler
        return handler
    
    def handle(self, request: HTTPRequest) -> Optional[HTTPResponse]:
        """处理请求"""
        # 前置处理
        response = self.before_process(request)
        if response:
            return response
        
        # 调用下一个处理器
        if self._next_handler:
            response = self._next_handler.handle(request)
        else:
            response = HTTPResponse(404, body="Not Found")
        
        # 后置处理
        return self.after_process(request, response)
    
    @abstractmethod
    def before_process(self, request: HTTPRequest) -> Optional[HTTPResponse]:
        """前置处理,返回Response表示提前结束"""
        pass
    
    def after_process(self, request: HTTPRequest, response: HTTPResponse) -> HTTPResponse:
        """后置处理"""
        return response

class AuthenticationMiddleware(MiddlewareHandler):
    """认证中间件"""
    
    def __init__(self):
        super().__init__("认证中间件")
        self.protected_paths = ["/api/", "/admin/"]
    
    def before_process(self, request: HTTPRequest) -> Optional[HTTPResponse]:
        # 检查是否需要认证
        needs_auth = any(request.path.startswith(path) for path in self.protected_paths)
        
        if needs_auth:
            auth_header = request.headers.get('Authorization')
            if not auth_header or not self._validate_token(auth_header):
                return HTTPResponse(401, body="Unauthorized")
            
            # 将用户信息添加到上下文
            request.context['user'] = self._get_user_from_token(auth_header)
        
        return None
    
    def _validate_token(self, auth_header: str) -> bool:
        # 模拟token验证
        return auth_header.startswith("Bearer ") and len(auth_header) > 10
    
    def _get_user_from_token(self, auth_header: str) -> Dict:
        # 模拟从token获取用户信息
        return {'id': 1, 'username': 'user1', 'role': 'admin'}

class RateLimitMiddleware(MiddlewareHandler):
    """限流中间件"""
    
    def __init__(self, max_requests: int = 100, time_window: int = 60):
        super().__init__("限流中间件")
        self.max_requests = max_requests
        self.time_window = time_window
        self.request_counts = {}  # IP -> [(timestamp, count)]
    
    def before_process(self, request: HTTPRequest) -> Optional[HTTPResponse]:
        client_ip = request.headers.get('X-Real-IP', '127.0.0.1')
        current_time = time.time()
        
        # 清理过期记录
        if client_ip in self.request_counts:
            self.request_counts[client_ip] = [
                (ts, count) for ts, count in self.request_counts[client_ip]
                if current_time - ts < self.time_window
            ]
        
        # 计算当前时间窗口内的请求数
        if client_ip not in self.request_counts:
            self.request_counts[client_ip] = []
        
        current_requests = sum(count for _, count in self.request_counts[client_ip])
        
        if current_requests >= self.max_requests:
            return HTTPResponse(429, body="Too Many Requests")
        
        # 记录当前请求
        self.request_counts[client_ip].append((current_time, 1))
        
        return None

class LoggingMiddleware(MiddlewareHandler):
    """日志中间件"""
    
    def __init__(self):
        super().__init__("日志中间件")
    
    def before_process(self, request: HTTPRequest) -> Optional[HTTPResponse]:
        print(f"[REQUEST] {request} - Headers: {request.headers}")
        request.context['start_time'] = time.time()
        return None
    
    def after_process(self, request: HTTPRequest, response: HTTPResponse) -> HTTPResponse:
        processing_time = time.time() - request.context.get('start_time', 0)
        print(f"[RESPONSE] {request} - Status: {response.status_code} - Time: {processing_time:.3f}s")
        return response

class CORSMiddleware(MiddlewareHandler):
    """CORS中间件"""
    
    def __init__(self, allowed_origins: List[str] = None):
        super().__init__("CORS中间件")
        self.allowed_origins = allowed_origins or ['*']
    
    def before_process(self, request: HTTPRequest) -> Optional[HTTPResponse]:
        # 处理预检请求
        if request.method == 'OPTIONS':
            return HTTPResponse(200, headers=self._get_cors_headers(request))
        
        return None
    
    def after_process(self, request: HTTPRequest, response: HTTPResponse) -> HTTPResponse:
        # 添加CORS头
        cors_headers = self._get_cors_headers(request)
        response.headers.update(cors_headers)
        return response
    
    def _get_cors_headers(self, request: HTTPRequest) -> Dict[str, str]:
        origin = request.headers.get('Origin', '')
        
        if '*' in self.allowed_origins or origin in self.allowed_origins:
            return {
                'Access-Control-Allow-Origin': origin or '*',
                'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE, OPTIONS',
                'Access-Control-Allow-Headers': 'Content-Type, Authorization',
                'Access-Control-Max-Age': '86400'
            }
        
        return {}

class RouteHandler(MiddlewareHandler):
    """路由处理器"""
    
    def __init__(self):
        super().__init__("路由处理器")
        self.routes = {
            '/': self._handle_home,
            '/api/users': self._handle_users,
            '/api/orders': self._handle_orders,
            '/admin/dashboard': self._handle_admin_dashboard
        }
    
    def before_process(self, request: HTTPRequest) -> Optional[HTTPResponse]:
        handler = self.routes.get(request.path)
        
        if handler:
            return handler(request)
        
        return HTTPResponse(404, body="Not Found")
    
    def _handle_home(self, request: HTTPRequest) -> HTTPResponse:
        return HTTPResponse(200, body="Welcome to the API")
    
    def _handle_users(self, request: HTTPRequest) -> HTTPResponse:
        user = request.context.get('user')
        if user:
            return HTTPResponse(200, body=f"Hello, {user['username']}!")
        return HTTPResponse(200, body="Users endpoint")
    
    def _handle_orders(self, request: HTTPRequest) -> HTTPResponse:
        return HTTPResponse(200, body="Orders endpoint")
    
    def _handle_admin_dashboard(self, request: HTTPRequest) -> HTTPResponse:
        user = request.context.get('user')
        if user and user.get('role') == 'admin':
            return HTTPResponse(200, body="Admin Dashboard")
        return HTTPResponse(403, body="Forbidden")

class WebServer:
    """Web服务器"""
    
    def __init__(self):
        self.middleware_chain = self._build_middleware_chain()
    
    def _build_middleware_chain(self) -> MiddlewareHandler:
        """构建中间件链"""
        logging_middleware = LoggingMiddleware()
        cors_middleware = CORSMiddleware(['http://localhost:3000', 'https://example.com'])
        rate_limit_middleware = RateLimitMiddleware(max_requests=10, time_window=60)
        auth_middleware = AuthenticationMiddleware()
        route_handler = RouteHandler()
        
        # 构建处理链
        logging_middleware.set_next(cors_middleware) \
                         .set_next(rate_limit_middleware) \
                         .set_next(auth_middleware) \
                         .set_next(route_handler)
        
        return logging_middleware
    
    def handle_request(self, request: HTTPRequest) -> HTTPResponse:
        """处理HTTP请求"""
        return self.middleware_chain.handle(request)

def demonstrate_web_middleware():
    """演示Web中间件责任链"""
    print("=== Web请求处理管道演示 ===")
    
    server = WebServer()
    
    # 测试请求
    test_requests = [
        HTTPRequest('GET', '/', {'X-Real-IP': '192.168.1.1'}),
        HTTPRequest('GET', '/api/users', {
            'X-Real-IP': '192.168.1.1',
            'Authorization': 'Bearer valid_token_123'
        }),
        HTTPRequest('GET', '/api/users', {'X-Real-IP': '192.168.1.1'}),  # 无认证
        HTTPRequest('GET', '/admin/dashboard', {
            'X-Real-IP': '192.168.1.2',
            'Authorization': 'Bearer valid_token_123'
        }),
        HTTPRequest('OPTIONS', '/api/users', {
            'Origin': 'http://localhost:3000'
        }),
    ]
    
    # 处理请求
    for i, request in enumerate(test_requests, 1):
        print(f"\n--- 请求 {i} ---")
        response = server.handle_request(request)
        print(f"响应: {response.status_code} - {response.body}")
        if response.headers:
            print(f"响应头: {response.headers}")
    
    # 测试限流
    print("\n--- 限流测试 ---")
    for i in range(12):  # 超过限制的10个请求
        request = HTTPRequest('GET', '/', {'X-Real-IP': '192.168.1.3'})
        response = server.handle_request(request)
        if response.status_code == 429:
            print(f"请求 {i+1}: 被限流")
            break
        else:
            print(f"请求 {i+1}: 正常处理")

if __name__ == "__main__":
    demonstrate_web_middleware()

11.7 本章总结

11.7.1 核心概念回顾

责任链模式: - 将请求的发送者和接收者解耦 - 多个对象都有机会处理请求 - 请求沿着链传递直到被处理 - 支持动态配置处理者链

中介者模式: - 定义对象间的交互方式 - 将多对多关系转为一对多关系 - 集中管理复杂的交互逻辑 - 提高对象的复用性和可维护性

11.7.2 最佳实践

  1. 责任链模式最佳实践:

    • 合理设计链的长度和顺序
    • 实现完善的异常处理机制
    • 考虑性能优化和缓存策略
    • 提供链路监控和调试功能
  2. 中介者模式最佳实践:

    • 避免中介者过于复杂
    • 使用事件驱动减少耦合
    • 实现异步处理提高性能
    • 合理分层管理复杂交互

11.7.3 实际应用建议

  1. 选择合适的模式:

    • 根据问题的性质选择模式
    • 考虑系统的复杂度和扩展性
    • 评估维护成本和性能影响
  2. 模式组合使用:

    • 责任链可以与中介者结合
    • 使用观察者模式增强监控
    • 结合策略模式优化处理逻辑

11.7.4 注意事项

  1. 责任链模式注意事项:

    • 避免链过长影响性能
    • 确保请求能被正确处理
    • 实现适当的错误处理机制
    • 考虑并发访问的线程安全
  2. 中介者模式注意事项:

    • 避免中介者成为性能瓶颈
    • 防止中介者逻辑过于复杂
    • 合理管理对象的生命周期
    • 考虑系统的可测试性

11.8 练习题

11.8.1 基础练习

  1. 责任链模式练习:

    • 实现一个异常处理责任链系统
    • 实现一个数据验证责任链
  2. 中介者模式练习:

    • 实现一个简单的聊天室系统
    • 实现一个表单验证中介者

11.8.2 进阶练习

  1. 责任链模式进阶:

    • 实现一个支持并行处理的责任链
    • 实现一个可配置的动态责任链系统
  2. 中介者模式进阶:

    • 实现一个支持插件的中介者系统
    • 实现一个分布式中介者架构

11.8.3 思考题

  1. 如何在责任链模式中实现请求的优先级处理?
  2. 中介者模式中如何处理循环依赖问题?
  3. 如何设计一个既支持责任链又支持中介者的混合系统?
  4. 在什么情况下应该考虑将责任链模式和中介者模式结合使用?

下一章预告: 第十二章将学习迭代器模式和访问者模式,探讨对象遍历和操作的设计技巧。 “`