11.1 本章概述
11.1.1 学习目标
本章将深入学习两种重要的行为型设计模式:
责任链模式(Chain of Responsibility Pattern)
- 理解责任链模式的核心思想和应用场景
- 掌握责任链的构建和请求传递机制
- 学习不同类型的责任链实现方式
- 了解责任链模式在实际项目中的应用
中介者模式(Mediator Pattern)
- 理解中介者模式的设计理念和优势
- 掌握中介者与同事类的协作机制
- 学习复杂交互系统的解耦设计
- 了解中介者模式的实际应用场景
11.1.2 应用场景
责任链模式适用于: - 请求处理系统(审批流程、权限验证) - 事件处理链(GUI事件、中间件) - 数据处理管道(过滤器、转换器) - 异常处理机制(错误处理链)
中介者模式适用于: - 复杂的用户界面交互 - 多对象协作系统 - 聊天室、论坛等通信系统 - 工作流管理系统
11.2 责任链模式
11.2.1 模式定义
责任链模式是一种行为型设计模式,它为请求创建了一个接收者对象的链。这种模式给予请求的类型,对请求的发送者和接收者进行解耦。在这种模式中,通常每个接收者都包含对另一个接收者的引用。如果一个对象不能处理该请求,那么它会把相同的请求传给下一个接收者,依此类推。
11.2.2 模式动机
在软件开发中,我们经常遇到这样的情况: - 有多个对象可以处理同一个请求 - 具体哪个对象处理该请求在运行时确定 - 不希望请求发送者与接收者耦合 - 希望动态地指定处理请求的对象集合
责任链模式通过将请求的发送者和接收者解耦,让多个对象都有机会处理请求,直到有一个对象处理它为止。
11.2.3 模式结构
┌─────────────────┐ ┌─────────────────┐
│ Client │───▶│ Handler │
└─────────────────┘ │ (Abstract) │
│ + handleRequest │
│ + setNext │
└─────────────────┘
△
│
┌──────────┼──────────┐
│ │ │
┌───────▼──┐ ┌─────▼────┐ ┌───▼──────┐
│Handler1 │ │Handler2 │ │Handler3 │
│ │ │ │ │ │
└──────────┘ └──────────┘ └──────────┘
主要角色: 1. Handler(抽象处理者):定义处理请求的接口,包含指向下一个处理者的引用 2. ConcreteHandler(具体处理者):实现处理请求的具体逻辑 3. Client(客户端):创建责任链并发送请求
11.2.4 Python实现示例:请求审批系统
from abc import ABC, abstractmethod
from enum import Enum
from typing import Optional, Dict, Any
import time
class RequestType(Enum):
"""请求类型枚举"""
LEAVE = "请假申请"
EXPENSE = "费用报销"
PURCHASE = "采购申请"
PROMOTION = "晋升申请"
BUDGET = "预算申请"
class Request:
"""请求对象"""
def __init__(self, request_id: str, request_type: RequestType,
amount: float, description: str, requester: str):
self.request_id = request_id
self.request_type = request_type
self.amount = amount
self.description = description
self.requester = requester
self.timestamp = time.time()
self.processing_history = []
def add_processing_record(self, handler_name: str, action: str, comment: str = ""):
"""添加处理记录"""
self.processing_history.append({
'handler': handler_name,
'action': action,
'comment': comment,
'timestamp': time.time()
})
def __str__(self):
return f"Request({self.request_id}, {self.request_type.value}, {self.amount}, {self.requester})"
class ApprovalHandler(ABC):
"""抽象审批处理者"""
def __init__(self, name: str):
self.name = name
self._next_handler: Optional['ApprovalHandler'] = None
def set_next(self, handler: 'ApprovalHandler') -> 'ApprovalHandler':
"""设置下一个处理者"""
self._next_handler = handler
return handler
def handle_request(self, request: Request) -> bool:
"""处理请求"""
if self.can_handle(request):
return self.process_request(request)
elif self._next_handler:
print(f"{self.name} 无法处理请求,转发给下一个处理者")
request.add_processing_record(self.name, "转发", "超出处理权限")
return self._next_handler.handle_request(request)
else:
print(f"没有合适的处理者可以处理请求: {request}")
request.add_processing_record(self.name, "拒绝", "无合适处理者")
return False
@abstractmethod
def can_handle(self, request: Request) -> bool:
"""判断是否可以处理请求"""
pass
@abstractmethod
def process_request(self, request: Request) -> bool:
"""处理请求的具体逻辑"""
pass
class TeamLeaderHandler(ApprovalHandler):
"""团队领导处理者"""
def __init__(self):
super().__init__("团队领导")
self.max_amount = 5000
self.supported_types = {RequestType.LEAVE, RequestType.EXPENSE}
def can_handle(self, request: Request) -> bool:
"""团队领导可以处理小额请假和报销"""
return (request.request_type in self.supported_types and
request.amount <= self.max_amount)
def process_request(self, request: Request) -> bool:
"""处理请求"""
print(f"{self.name} 正在处理: {request}")
# 模拟审批逻辑
if request.request_type == RequestType.LEAVE and request.amount > 3:
print(f"{self.name}: 请假天数过多,需要上级审批")
request.add_processing_record(self.name, "转发", "请假天数超过权限")
return self._next_handler.handle_request(request) if self._next_handler else False
print(f"{self.name}: 请求已批准")
request.add_processing_record(self.name, "批准", "符合团队政策")
return True
class DepartmentManagerHandler(ApprovalHandler):
"""部门经理处理者"""
def __init__(self):
super().__init__("部门经理")
self.max_amount = 20000
self.supported_types = {RequestType.LEAVE, RequestType.EXPENSE, RequestType.PURCHASE}
def can_handle(self, request: Request) -> bool:
"""部门经理可以处理中等金额的多种请求"""
return (request.request_type in self.supported_types and
request.amount <= self.max_amount)
def process_request(self, request: Request) -> bool:
"""处理请求"""
print(f"{self.name} 正在处理: {request}")
# 模拟复杂的审批逻辑
if request.request_type == RequestType.PURCHASE:
if "紧急" not in request.description:
print(f"{self.name}: 非紧急采购需要详细评估")
# 可以添加额外的验证逻辑
print(f"{self.name}: 请求已批准")
request.add_processing_record(self.name, "批准", "符合部门政策")
return True
class DirectorHandler(ApprovalHandler):
"""总监处理者"""
def __init__(self):
super().__init__("总监")
self.max_amount = 100000
self.supported_types = {RequestType.EXPENSE, RequestType.PURCHASE,
RequestType.PROMOTION, RequestType.BUDGET}
def can_handle(self, request: Request) -> bool:
"""总监可以处理大额和重要请求"""
return (request.request_type in self.supported_types and
request.amount <= self.max_amount)
def process_request(self, request: Request) -> bool:
"""处理请求"""
print(f"{self.name} 正在处理: {request}")
# 模拟高级审批逻辑
if request.request_type == RequestType.PROMOTION:
print(f"{self.name}: 晋升申请需要HR部门配合审核")
# 可以触发其他流程
if request.amount > 50000:
print(f"{self.name}: 大额申请需要额外审核")
# 可以要求额外文档或审批
print(f"{self.name}: 请求已批准")
request.add_processing_record(self.name, "批准", "符合公司政策")
return True
class CEOHandler(ApprovalHandler):
"""CEO处理者"""
def __init__(self):
super().__init__("CEO")
def can_handle(self, request: Request) -> bool:
"""CEO可以处理所有请求"""
return True
def process_request(self, request: Request) -> bool:
"""处理请求"""
print(f"{self.name} 正在处理: {request}")
# CEO级别的审批逻辑
if request.amount > 500000:
print(f"{self.name}: 超大额申请需要董事会审批")
request.add_processing_record(self.name, "转发", "需要董事会审批")
return False
print(f"{self.name}: 请求已批准")
request.add_processing_record(self.name, "批准", "CEO最终批准")
return True
class ApprovalSystem:
"""审批系统"""
def __init__(self):
self.chain = self._build_approval_chain()
self.processed_requests = []
def _build_approval_chain(self) -> ApprovalHandler:
"""构建审批链"""
team_leader = TeamLeaderHandler()
department_manager = DepartmentManagerHandler()
director = DirectorHandler()
ceo = CEOHandler()
# 构建责任链
team_leader.set_next(department_manager).set_next(director).set_next(ceo)
return team_leader
def submit_request(self, request: Request) -> bool:
"""提交请求"""
print(f"\n=== 处理请求: {request} ===")
result = self.chain.handle_request(request)
self.processed_requests.append(request)
print(f"\n--- 处理历史 ---")
for record in request.processing_history:
print(f"{record['handler']}: {record['action']} - {record['comment']}")
return result
def get_statistics(self) -> Dict[str, Any]:
"""获取统计信息"""
total_requests = len(self.processed_requests)
approved_requests = sum(1 for req in self.processed_requests
if any(record['action'] == '批准' for record in req.processing_history))
handler_stats = {}
for request in self.processed_requests:
for record in request.processing_history:
handler = record['handler']
action = record['action']
if handler not in handler_stats:
handler_stats[handler] = {'批准': 0, '转发': 0, '拒绝': 0}
if action in handler_stats[handler]:
handler_stats[handler][action] += 1
return {
'total_requests': total_requests,
'approved_requests': approved_requests,
'approval_rate': approved_requests / total_requests if total_requests > 0 else 0,
'handler_statistics': handler_stats
}
def demonstrate_chain_of_responsibility():
"""演示责任链模式"""
print("=== 责任链模式演示:请求审批系统 ===")
# 创建审批系统
approval_system = ApprovalSystem()
# 创建各种请求
requests = [
Request("REQ001", RequestType.LEAVE, 2, "病假2天", "张三"),
Request("REQ002", RequestType.EXPENSE, 3000, "出差费用报销", "李四"),
Request("REQ003", RequestType.PURCHASE, 15000, "紧急采购办公设备", "王五"),
Request("REQ004", RequestType.PROMOTION, 0, "高级工程师晋升", "赵六"),
Request("REQ005", RequestType.BUDGET, 80000, "Q4市场推广预算", "钱七"),
Request("REQ006", RequestType.EXPENSE, 25000, "大型会议费用", "孙八"),
Request("REQ007", RequestType.LEAVE, 10, "年假申请", "周九"),
]
# 处理所有请求
for request in requests:
approval_system.submit_request(request)
# 显示统计信息
print("\n=== 系统统计信息 ===")
stats = approval_system.get_statistics()
print(f"总请求数: {stats['total_requests']}")
print(f"批准请求数: {stats['approved_requests']}")
print(f"批准率: {stats['approval_rate']:.2%}")
print("\n--- 各处理者统计 ---")
for handler, actions in stats['handler_statistics'].items():
print(f"{handler}: 批准={actions['批准']}, 转发={actions['转发']}, 拒绝={actions['拒绝']}")
if __name__ == "__main__":
demonstrate_chain_of_responsibility()
11.2.5 Java实现示例:日志处理系统
// 抽象日志处理器
abstract class LogHandler {
protected LogHandler nextHandler;
protected String name;
public LogHandler(String name) {
this.name = name;
}
public LogHandler setNext(LogHandler handler) {
this.nextHandler = handler;
return handler;
}
public void handle(LogLevel level, String message) {
if (canHandle(level)) {
processLog(level, message);
} else if (nextHandler != null) {
nextHandler.handle(level, message);
}
}
protected abstract boolean canHandle(LogLevel level);
protected abstract void processLog(LogLevel level, String message);
}
// 日志级别枚举
enum LogLevel {
DEBUG(1), INFO(2), WARN(3), ERROR(4), FATAL(5);
private final int level;
LogLevel(int level) {
this.level = level;
}
public int getLevel() {
return level;
}
}
// 控制台日志处理器
class ConsoleLogHandler extends LogHandler {
private LogLevel minLevel;
public ConsoleLogHandler(LogLevel minLevel) {
super("Console Logger");
this.minLevel = minLevel;
}
@Override
protected boolean canHandle(LogLevel level) {
return level.getLevel() >= minLevel.getLevel();
}
@Override
protected void processLog(LogLevel level, String message) {
System.out.println(String.format("[%s] %s: %s",
name, level, message));
}
}
// 文件日志处理器
class FileLogHandler extends LogHandler {
private LogLevel minLevel;
private String filename;
public FileLogHandler(LogLevel minLevel, String filename) {
super("File Logger");
this.minLevel = minLevel;
this.filename = filename;
}
@Override
protected boolean canHandle(LogLevel level) {
return level.getLevel() >= minLevel.getLevel();
}
@Override
protected void processLog(LogLevel level, String message) {
// 模拟写入文件
System.out.println(String.format("[%s] Writing to %s - %s: %s",
name, filename, level, message));
}
}
// 邮件日志处理器
class EmailLogHandler extends LogHandler {
private LogLevel minLevel;
private String emailAddress;
public EmailLogHandler(LogLevel minLevel, String emailAddress) {
super("Email Logger");
this.minLevel = minLevel;
this.emailAddress = emailAddress;
}
@Override
protected boolean canHandle(LogLevel level) {
return level.getLevel() >= minLevel.getLevel();
}
@Override
protected void processLog(LogLevel level, String message) {
// 模拟发送邮件
System.out.println(String.format("[%s] Sending email to %s - %s: %s",
name, emailAddress, level, message));
}
}
// 日志系统
class LoggingSystem {
private LogHandler handlerChain;
public LoggingSystem() {
buildHandlerChain();
}
private void buildHandlerChain() {
LogHandler consoleHandler = new ConsoleLogHandler(LogLevel.DEBUG);
LogHandler fileHandler = new FileLogHandler(LogLevel.INFO, "app.log");
LogHandler emailHandler = new EmailLogHandler(LogLevel.ERROR, "admin@company.com");
// 构建处理链
consoleHandler.setNext(fileHandler).setNext(emailHandler);
this.handlerChain = consoleHandler;
}
public void log(LogLevel level, String message) {
handlerChain.handle(level, message);
}
}
// 演示类
public class ChainOfResponsibilityDemo {
public static void main(String[] args) {
LoggingSystem logger = new LoggingSystem();
System.out.println("=== 责任链模式演示:日志处理系统 ===");
logger.log(LogLevel.DEBUG, "调试信息:用户登录");
logger.log(LogLevel.INFO, "信息:订单创建成功");
logger.log(LogLevel.WARN, "警告:内存使用率较高");
logger.log(LogLevel.ERROR, "错误:数据库连接失败");
logger.log(LogLevel.FATAL, "严重错误:系统崩溃");
}
}
11.2.6 责任链模式的优缺点
优点: 1. 降低耦合度:请求发送者和接收者解耦 2. 增强灵活性:可以动态地增加或删除处理者 3. 符合开闭原则:增加新的处理者不影响现有代码 4. 简化对象:每个处理者只需关心自己的处理逻辑 5. 责任分担:每个处理者承担明确的责任
缺点: 1. 性能影响:请求可能需要遍历整个链 2. 调试困难:运行时难以观察链的结构 3. 不保证处理:请求可能到达链尾仍未被处理 4. 系统性能:链过长会影响性能
11.2.7 适用场景
- 多个对象可以处理同一请求:但具体由哪个对象处理在运行时确定
- 不明确指定接收者:希望向多个对象中的一个提交请求
- 动态指定处理者集合:可处理请求的对象集合应被动态指定
- 请求处理的顺序很重要:需要按特定顺序尝试处理请求
11.3 中介者模式
11.3.1 模式定义
中介者模式是一种行为型设计模式,它定义了一个中介对象来封装一系列对象之间的交互。中介者使各对象不需要显式地相互引用,从而使其耦合松散,而且可以独立地改变它们之间的交互。
11.3.2 模式动机
在面向对象的软件设计中,对象之间的交互和通信是必不可少的,但是如果让对象之间直接进行交互,会导致: - 对象之间的耦合度过高 - 系统结构复杂,难以维护 - 对象的复用性降低 - 系统扩展困难
中介者模式通过引入一个中介对象来管理对象间的交互,从而降低了对象间的耦合度。
11.3.3 模式结构
┌─────────────────┐ ┌─────────────────┐
│ Colleague1 │───▶│ Mediator │◀───┐
└─────────────────┘ │ (Abstract) │ │
└─────────────────┘ │
┌─────────────────┐ △ │
│ Colleague2 │───────────┐│ │
└─────────────────┘ ││ │
││ │
┌─────────────────┐ ┌──────▼▼──────┐ │
│ Colleague3 │───▶│ ConcreteMediator │───┘
└─────────────────┘ └─────────────────┘
主要角色: 1. Mediator(抽象中介者):定义中介者接口 2. ConcreteMediator(具体中介者):实现中介者接口,协调各同事对象 3. Colleague(抽象同事类):定义同事类接口 4. ConcreteColleague(具体同事类):实现同事类,通过中介者与其他同事交互
11.3.4 Python实现示例:聊天室系统
from abc import ABC, abstractmethod
from typing import List, Dict, Optional
from datetime import datetime
from enum import Enum
class MessageType(Enum):
"""消息类型"""
TEXT = "文本"
IMAGE = "图片"
FILE = "文件"
SYSTEM = "系统"
class Message:
"""消息类"""
def __init__(self, sender: str, content: str, msg_type: MessageType = MessageType.TEXT):
self.sender = sender
self.content = content
self.msg_type = msg_type
self.timestamp = datetime.now()
self.message_id = f"{sender}_{int(self.timestamp.timestamp())}"
def __str__(self):
time_str = self.timestamp.strftime("%H:%M:%S")
return f"[{time_str}] {self.sender}: {self.content}"
class ChatMediator(ABC):
"""聊天中介者接口"""
@abstractmethod
def send_message(self, message: Message, sender: 'User') -> None:
"""发送消息"""
pass
@abstractmethod
def add_user(self, user: 'User') -> None:
"""添加用户"""
pass
@abstractmethod
def remove_user(self, user: 'User') -> None:
"""移除用户"""
pass
class User(ABC):
"""抽象用户类"""
def __init__(self, name: str, mediator: ChatMediator):
self.name = name
self.mediator = mediator
self.is_online = True
self.message_history: List[Message] = []
def send_message(self, content: str, msg_type: MessageType = MessageType.TEXT) -> None:
"""发送消息"""
if self.is_online:
message = Message(self.name, content, msg_type)
self.mediator.send_message(message, self)
else:
print(f"{self.name} 当前离线,无法发送消息")
@abstractmethod
def receive_message(self, message: Message) -> None:
"""接收消息"""
pass
def go_online(self) -> None:
"""上线"""
self.is_online = True
print(f"{self.name} 已上线")
def go_offline(self) -> None:
"""下线"""
self.is_online = False
print(f"{self.name} 已下线")
class RegularUser(User):
"""普通用户"""
def receive_message(self, message: Message) -> None:
"""接收消息"""
if self.is_online and message.sender != self.name:
self.message_history.append(message)
print(f"{self.name} 收到消息: {message}")
class AdminUser(User):
"""管理员用户"""
def __init__(self, name: str, mediator: ChatMediator):
super().__init__(name, mediator)
self.can_moderate = True
def receive_message(self, message: Message) -> None:
"""接收消息"""
if self.is_online:
self.message_history.append(message)
if message.sender != self.name:
print(f"[管理员] {self.name} 收到消息: {message}")
def moderate_user(self, username: str, action: str) -> None:
"""管理用户"""
if self.can_moderate:
system_message = Message("系统", f"管理员 {self.name} 对用户 {username} 执行了 {action}", MessageType.SYSTEM)
self.mediator.send_message(system_message, self)
class ChatRoom(ChatMediator):
"""聊天室(具体中介者)"""
def __init__(self, room_name: str):
self.room_name = room_name
self.users: Dict[str, User] = {}
self.message_history: List[Message] = []
self.banned_words = ["垃圾", "广告", "spam"]
self.max_users = 50
def add_user(self, user: User) -> None:
"""添加用户"""
if len(self.users) >= self.max_users:
print(f"聊天室 {self.room_name} 已满,无法添加用户 {user.name}")
return
if user.name not in self.users:
self.users[user.name] = user
join_message = Message("系统", f"{user.name} 加入了聊天室", MessageType.SYSTEM)
self.message_history.append(join_message)
# 通知所有在线用户
for existing_user in self.users.values():
if existing_user.is_online and existing_user.name != user.name:
existing_user.receive_message(join_message)
print(f"{user.name} 加入聊天室 {self.room_name}")
else:
print(f"用户 {user.name} 已在聊天室中")
def remove_user(self, user: User) -> None:
"""移除用户"""
if user.name in self.users:
del self.users[user.name]
leave_message = Message("系统", f"{user.name} 离开了聊天室", MessageType.SYSTEM)
self.message_history.append(leave_message)
# 通知所有在线用户
for remaining_user in self.users.values():
if remaining_user.is_online:
remaining_user.receive_message(leave_message)
print(f"{user.name} 离开聊天室 {self.room_name}")
def send_message(self, message: Message, sender: User) -> None:
"""发送消息"""
# 检查发送者是否在聊天室中
if sender.name not in self.users:
print(f"用户 {sender.name} 不在聊天室中,无法发送消息")
return
# 内容过滤
if self._is_message_appropriate(message):
self.message_history.append(message)
# 转发给所有在线用户
for user in self.users.values():
if user.is_online:
user.receive_message(message)
print(f"聊天室 {self.room_name} 广播消息: {message}")
else:
warning_message = Message("系统", f"{sender.name} 的消息包含不当内容,已被过滤", MessageType.SYSTEM)
sender.receive_message(warning_message)
def _is_message_appropriate(self, message: Message) -> bool:
"""检查消息是否合适"""
content_lower = message.content.lower()
return not any(banned_word in content_lower for banned_word in self.banned_words)
def get_online_users(self) -> List[str]:
"""获取在线用户列表"""
return [user.name for user in self.users.values() if user.is_online]
def get_room_statistics(self) -> Dict:
"""获取聊天室统计信息"""
total_messages = len(self.message_history)
system_messages = sum(1 for msg in self.message_history if msg.msg_type == MessageType.SYSTEM)
user_messages = total_messages - system_messages
user_message_counts = {}
for message in self.message_history:
if message.msg_type != MessageType.SYSTEM:
user_message_counts[message.sender] = user_message_counts.get(message.sender, 0) + 1
return {
'room_name': self.room_name,
'total_users': len(self.users),
'online_users': len(self.get_online_users()),
'total_messages': total_messages,
'user_messages': user_messages,
'system_messages': system_messages,
'user_message_counts': user_message_counts
}
class PrivateChat(ChatMediator):
"""私聊(具体中介者)"""
def __init__(self, user1: User, user2: User):
self.participants = {user1.name: user1, user2.name: user2}
self.message_history: List[Message] = []
self.chat_id = f"{user1.name}_{user2.name}"
def add_user(self, user: User) -> None:
"""私聊不支持添加用户"""
print("私聊不支持添加新用户")
def remove_user(self, user: User) -> None:
"""私聊不支持移除用户"""
print("私聊不支持移除用户")
def send_message(self, message: Message, sender: User) -> None:
"""发送私聊消息"""
if sender.name not in self.participants:
print(f"用户 {sender.name} 不在此私聊中")
return
self.message_history.append(message)
# 发送给另一个参与者
for user in self.participants.values():
if user.name != sender.name and user.is_online:
user.receive_message(message)
print(f"私聊 {self.chat_id} 消息: {message}")
def demonstrate_mediator_pattern():
"""演示中介者模式"""
print("=== 中介者模式演示:聊天室系统 ===")
# 创建聊天室
tech_room = ChatRoom("技术讨论")
# 创建用户
alice = RegularUser("Alice", tech_room)
bob = RegularUser("Bob", tech_room)
charlie = AdminUser("Charlie", tech_room)
david = RegularUser("David", tech_room)
# 用户加入聊天室
print("\n--- 用户加入聊天室 ---")
tech_room.add_user(alice)
tech_room.add_user(bob)
tech_room.add_user(charlie)
tech_room.add_user(david)
# 用户发送消息
print("\n--- 用户聊天 ---")
alice.send_message("大家好!我是新来的")
bob.send_message("欢迎Alice!")
charlie.send_message("欢迎加入技术讨论组")
# 用户下线
print("\n--- 用户状态变化 ---")
david.go_offline()
alice.send_message("David还在吗?")
# 管理员操作
print("\n--- 管理员操作 ---")
charlie.moderate_user("Bob", "警告")
# 内容过滤测试
print("\n--- 内容过滤测试 ---")
bob.send_message("这是垃圾信息")
# David重新上线
print("\n--- 用户重新上线 ---")
david.go_online()
david.send_message("我回来了!")
# 显示聊天室统计
print("\n--- 聊天室统计 ---")
stats = tech_room.get_room_statistics()
print(f"聊天室: {stats['room_name']}")
print(f"总用户数: {stats['total_users']}")
print(f"在线用户数: {stats['online_users']}")
print(f"总消息数: {stats['total_messages']}")
print(f"用户消息数: {stats['user_messages']}")
print(f"系统消息数: {stats['system_messages']}")
print("用户发言统计:", stats['user_message_counts'])
# 演示私聊
print("\n--- 私聊演示 ---")
private_chat = PrivateChat(alice, bob)
alice.mediator = private_chat
bob.mediator = private_chat
alice.send_message("Bob,我们私下聊聊")
bob.send_message("好的,Alice")
if __name__ == "__main__":
demonstrate_mediator_pattern()
11.3.5 Go实现示例:GUI组件交互
package main
import (
"fmt"
"strings"
"time"
)
// Mediator 中介者接口
type Mediator interface {
Notify(sender Component, event string)
}
// Component 组件接口
type Component interface {
SetMediator(mediator Mediator)
GetName() string
}
// BaseComponent 基础组件
type BaseComponent struct {
name string
mediator Mediator
}
func (c *BaseComponent) SetMediator(mediator Mediator) {
c.mediator = mediator
}
func (c *BaseComponent) GetName() string {
return c.name
}
// Button 按钮组件
type Button struct {
BaseComponent
enabled bool
text string
}
func NewButton(name, text string) *Button {
return &Button{
BaseComponent: BaseComponent{name: name},
enabled: true,
text: text,
}
}
func (b *Button) Click() {
if b.enabled {
fmt.Printf("按钮 %s 被点击\n", b.name)
b.mediator.Notify(b, "click")
} else {
fmt.Printf("按钮 %s 已禁用,无法点击\n", b.name)
}
}
func (b *Button) SetEnabled(enabled bool) {
b.enabled = enabled
status := "启用"
if !enabled {
status = "禁用"
}
fmt.Printf("按钮 %s 已%s\n", b.name, status)
}
func (b *Button) SetText(text string) {
b.text = text
fmt.Printf("按钮 %s 文本更新为: %s\n", b.name, text)
}
// TextBox 文本框组件
type TextBox struct {
BaseComponent
content string
maxLength int
}
func NewTextBox(name string, maxLength int) *TextBox {
return &TextBox{
BaseComponent: BaseComponent{name: name},
maxLength: maxLength,
}
}
func (t *TextBox) SetText(text string) {
if len(text) <= t.maxLength {
t.content = text
fmt.Printf("文本框 %s 内容更新为: %s\n", t.name, text)
t.mediator.Notify(t, "textChanged")
} else {
fmt.Printf("文本框 %s 内容超出最大长度限制\n", t.name)
}
}
func (t *TextBox) GetText() string {
return t.content
}
func (t *TextBox) Clear() {
t.content = ""
fmt.Printf("文本框 %s 已清空\n", t.name)
t.mediator.Notify(t, "textChanged")
}
// CheckBox 复选框组件
type CheckBox struct {
BaseComponent
checked bool
}
func NewCheckBox(name string) *CheckBox {
return &CheckBox{
BaseComponent: BaseComponent{name: name},
checked: false,
}
}
func (c *CheckBox) Toggle() {
c.checked = !c.checked
status := "选中"
if !c.checked {
status = "取消选中"
}
fmt.Printf("复选框 %s 已%s\n", c.name, status)
c.mediator.Notify(c, "stateChanged")
}
func (c *CheckBox) IsChecked() bool {
return c.checked
}
// ListBox 列表框组件
type ListBox struct {
BaseComponent
items []string
selectedIndex int
}
func NewListBox(name string) *ListBox {
return &ListBox{
BaseComponent: BaseComponent{name: name},
items: make([]string, 0),
selectedIndex: -1,
}
}
func (l *ListBox) AddItem(item string) {
l.items = append(l.items, item)
fmt.Printf("列表框 %s 添加项目: %s\n", l.name, item)
l.mediator.Notify(l, "itemAdded")
}
func (l *ListBox) SelectItem(index int) {
if index >= 0 && index < len(l.items) {
l.selectedIndex = index
fmt.Printf("列表框 %s 选中项目: %s\n", l.name, l.items[index])
l.mediator.Notify(l, "selectionChanged")
}
}
func (l *ListBox) GetSelectedItem() string {
if l.selectedIndex >= 0 && l.selectedIndex < len(l.items) {
return l.items[l.selectedIndex]
}
return ""
}
func (l *ListBox) Clear() {
l.items = make([]string, 0)
l.selectedIndex = -1
fmt.Printf("列表框 %s 已清空\n", l.name)
l.mediator.Notify(l, "cleared")
}
// DialogMediator 对话框中介者
type DialogMediator struct {
title string
nameBox *TextBox
emailBox *TextBox
submitBtn *Button
clearBtn *Button
agreeCheck *CheckBox
resultList *ListBox
}
func NewDialogMediator(title string) *DialogMediator {
dm := &DialogMediator{
title: title,
nameBox: NewTextBox("姓名输入框", 50),
emailBox: NewTextBox("邮箱输入框", 100),
submitBtn: NewButton("提交按钮", "提交"),
clearBtn: NewButton("清空按钮", "清空"),
agreeCheck: NewCheckBox("同意条款"),
resultList: NewListBox("结果列表"),
}
// 设置中介者
dm.nameBox.SetMediator(dm)
dm.emailBox.SetMediator(dm)
dm.submitBtn.SetMediator(dm)
dm.clearBtn.SetMediator(dm)
dm.agreeCheck.SetMediator(dm)
dm.resultList.SetMediator(dm)
// 初始状态:提交按钮禁用
dm.submitBtn.SetEnabled(false)
return dm
}
func (dm *DialogMediator) Notify(sender Component, event string) {
switch sender.GetName() {
case "姓名输入框", "邮箱输入框":
if event == "textChanged" {
dm.validateForm()
}
case "同意条款":
if event == "stateChanged" {
dm.validateForm()
}
case "提交按钮":
if event == "click" {
dm.submitForm()
}
case "清空按钮":
if event == "click" {
dm.clearForm()
}
}
}
func (dm *DialogMediator) validateForm() {
nameValid := len(strings.TrimSpace(dm.nameBox.GetText())) > 0
emailValid := dm.isValidEmail(dm.emailBox.GetText())
agreeChecked := dm.agreeCheck.IsChecked()
canSubmit := nameValid && emailValid && agreeChecked
dm.submitBtn.SetEnabled(canSubmit)
if canSubmit {
dm.submitBtn.SetText("提交")
} else {
reasons := make([]string, 0)
if !nameValid {
reasons = append(reasons, "姓名为空")
}
if !emailValid {
reasons = append(reasons, "邮箱格式错误")
}
if !agreeChecked {
reasons = append(reasons, "未同意条款")
}
dm.submitBtn.SetText(fmt.Sprintf("无法提交(%s)", strings.Join(reasons, ",")))
}
}
func (dm *DialogMediator) isValidEmail(email string) bool {
return strings.Contains(email, "@") && strings.Contains(email, ".")
}
func (dm *DialogMediator) submitForm() {
name := dm.nameBox.GetText()
email := dm.emailBox.GetText()
result := fmt.Sprintf("用户注册成功 - 姓名: %s, 邮箱: %s, 时间: %s",
name, email, time.Now().Format("15:04:05"))
dm.resultList.AddItem(result)
fmt.Printf("表单提交成功!\n")
// 提交后清空表单
dm.clearForm()
}
func (dm *DialogMediator) clearForm() {
dm.nameBox.Clear()
dm.emailBox.Clear()
dm.agreeCheck.checked = false
fmt.Printf("复选框 %s 已取消选中\n", dm.agreeCheck.name)
dm.validateForm()
}
// 模拟用户操作
func simulateUserInteraction(dm *DialogMediator) {
fmt.Println("\n=== 模拟用户交互 ===")
// 用户输入姓名
fmt.Println("\n--- 用户输入姓名 ---")
dm.nameBox.SetText("张三")
// 用户输入无效邮箱
fmt.Println("\n--- 用户输入无效邮箱 ---")
dm.emailBox.SetText("invalid-email")
// 用户同意条款
fmt.Println("\n--- 用户同意条款 ---")
dm.agreeCheck.Toggle()
// 用户输入有效邮箱
fmt.Println("\n--- 用户输入有效邮箱 ---")
dm.emailBox.SetText("zhangsan@example.com")
// 用户提交表单
fmt.Println("\n--- 用户提交表单 ---")
dm.submitBtn.Click()
// 用户再次填写表单
fmt.Println("\n--- 用户再次填写表单 ---")
dm.nameBox.SetText("李四")
dm.emailBox.SetText("lisi@test.com")
dm.agreeCheck.Toggle()
dm.submitBtn.Click()
// 用户清空表单
fmt.Println("\n--- 用户清空表单 ---")
dm.clearBtn.Click()
}
func main() {
fmt.Println("=== 中介者模式演示:GUI组件交互 ===")
// 创建对话框中介者
dialog := NewDialogMediator("用户注册对话框")
// 模拟用户交互
simulateUserInteraction(dialog)
fmt.Println("\n=== 演示结束 ===")
}
11.3.6 中介者模式的优缺点
优点: 1. 降低耦合度:减少对象间的直接依赖关系 2. 集中控制:将复杂的交互逻辑集中在中介者中 3. 提高复用性:同事对象可以独立变化和复用 4. 简化对象协议:多对多关系变为一对多关系 5. 符合迪米特法则:对象只与中介者通信
缺点: 1. 中介者复杂化:中介者可能变得过于复杂 2. 单点故障:中介者出问题会影响整个系统 3. 性能开销:增加了一层间接调用 4. 难以维护:复杂的中介者难以维护和扩展
11.3.7 适用场景
- 对象间交互复杂:一组对象以定义良好但复杂的方式进行通信
- 对象间依赖混乱:对象之间的依赖关系混乱且难以理解
- 对象难以复用:因为引用了太多其他对象而难以复用
- 行为分布在多个类中:想定制一个分布在多个类中的行为
11.4 责任链模式与中介者模式对比分析
11.4.1 相同点
- 行为型模式:都属于行为型设计模式
- 解耦目的:都旨在减少对象间的直接耦合
- 动态性:都支持运行时的动态配置
- 扩展性:都便于系统的扩展和维护
- 封装变化:都将变化的部分进行封装
11.4.2 不同点
对比维度 | 责任链模式 | 中介者模式 |
---|---|---|
主要目的 | 处理请求的传递 | 协调对象间交互 |
对象关系 | 链式结构 | 星型结构 |
通信方式 | 单向传递 | 双向通信 |
处理方式 | 顺序处理 | 集中协调 |
扩展方式 | 添加处理者 | 修改中介者 |
复杂度 | 链可能很长 | 中介者可能复杂 |
适用场景 | 请求处理流程 | 复杂对象交互 |
11.4.3 选择指南
选择责任链模式的情况: - 有多个对象可以处理同一请求 - 处理者的集合需要动态指定 - 不希望明确指定接收者 - 请求需要按顺序尝试处理
选择中介者模式的情况: - 对象间交互复杂且耦合度高 - 需要集中管理对象间的交互逻辑 - 对象的行为依赖于其他多个对象 - 希望提高对象的复用性
11.4.4 组合使用
责任链模式和中介者模式可以结合使用,创建更强大的系统:
class MediatedChainHandler(ABC):
"""支持中介者的责任链处理者"""
def __init__(self, name: str, mediator: 'ChainMediator'):
self.name = name
self.mediator = mediator
self._next_handler: Optional['MediatedChainHandler'] = None
def set_next(self, handler: 'MediatedChainHandler') -> 'MediatedChainHandler':
self._next_handler = handler
return handler
def handle_request(self, request: Any) -> bool:
# 通知中介者处理开始
self.mediator.notify_processing_start(self, request)
if self.can_handle(request):
result = self.process_request(request)
self.mediator.notify_processing_complete(self, request, result)
return result
elif self._next_handler:
self.mediator.notify_forwarding(self, self._next_handler, request)
return self._next_handler.handle_request(request)
else:
self.mediator.notify_processing_failed(self, request)
return False
@abstractmethod
def can_handle(self, request: Any) -> bool:
pass
@abstractmethod
def process_request(self, request: Any) -> bool:
pass
class ChainMediator:
"""责任链中介者"""
def __init__(self):
self.handlers: List[MediatedChainHandler] = []
self.processing_history: List[Dict] = []
self.observers: List['ChainObserver'] = []
def register_handler(self, handler: MediatedChainHandler):
"""注册处理者"""
self.handlers.append(handler)
def add_observer(self, observer: 'ChainObserver'):
"""添加观察者"""
self.observers.append(observer)
def notify_processing_start(self, handler: MediatedChainHandler, request: Any):
"""通知处理开始"""
event = {
'type': 'processing_start',
'handler': handler.name,
'request': str(request),
'timestamp': time.time()
}
self.processing_history.append(event)
for observer in self.observers:
observer.on_processing_start(handler, request)
def notify_processing_complete(self, handler: MediatedChainHandler, request: Any, result: bool):
"""通知处理完成"""
event = {
'type': 'processing_complete',
'handler': handler.name,
'request': str(request),
'result': result,
'timestamp': time.time()
}
self.processing_history.append(event)
for observer in self.observers:
observer.on_processing_complete(handler, request, result)
def notify_forwarding(self, from_handler: MediatedChainHandler,
to_handler: MediatedChainHandler, request: Any):
"""通知转发"""
event = {
'type': 'forwarding',
'from_handler': from_handler.name,
'to_handler': to_handler.name,
'request': str(request),
'timestamp': time.time()
}
self.processing_history.append(event)
for observer in self.observers:
observer.on_forwarding(from_handler, to_handler, request)
def notify_processing_failed(self, handler: MediatedChainHandler, request: Any):
"""通知处理失败"""
event = {
'type': 'processing_failed',
'handler': handler.name,
'request': str(request),
'timestamp': time.time()
}
self.processing_history.append(event)
for observer in self.observers:
observer.on_processing_failed(handler, request)
def get_processing_statistics(self) -> Dict:
"""获取处理统计"""
total_events = len(self.processing_history)
event_counts = {}
handler_stats = {}
for event in self.processing_history:
event_type = event['type']
event_counts[event_type] = event_counts.get(event_type, 0) + 1
if 'handler' in event:
handler = event['handler']
if handler not in handler_stats:
handler_stats[handler] = {'processed': 0, 'forwarded': 0, 'failed': 0}
if event_type == 'processing_complete':
handler_stats[handler]['processed'] += 1
elif event_type == 'forwarding':
handler_stats[handler]['forwarded'] += 1
elif event_type == 'processing_failed':
handler_stats[handler]['failed'] += 1
return {
'total_events': total_events,
'event_counts': event_counts,
'handler_statistics': handler_stats
}
class ChainObserver(ABC):
"""责任链观察者接口"""
@abstractmethod
def on_processing_start(self, handler: MediatedChainHandler, request: Any):
pass
@abstractmethod
def on_processing_complete(self, handler: MediatedChainHandler, request: Any, result: bool):
pass
@abstractmethod
def on_forwarding(self, from_handler: MediatedChainHandler,
to_handler: MediatedChainHandler, request: Any):
pass
@abstractmethod
def on_processing_failed(self, handler: MediatedChainHandler, request: Any):
pass
class LoggingObserver(ChainObserver):
"""日志观察者"""
def on_processing_start(self, handler: MediatedChainHandler, request: Any):
print(f"[LOG] {handler.name} 开始处理请求: {request}")
def on_processing_complete(self, handler: MediatedChainHandler, request: Any, result: bool):
status = "成功" if result else "失败"
print(f"[LOG] {handler.name} 处理{status}: {request}")
def on_forwarding(self, from_handler: MediatedChainHandler,
to_handler: MediatedChainHandler, request: Any):
print(f"[LOG] {from_handler.name} 转发请求到 {to_handler.name}: {request}")
def on_processing_failed(self, handler: MediatedChainHandler, request: Any):
print(f"[LOG] {handler.name} 处理失败,无后续处理者: {request}")
class MetricsObserver(ChainObserver):
"""指标观察者"""
def __init__(self):
self.metrics = {
'total_requests': 0,
'successful_requests': 0,
'failed_requests': 0,
'forwarded_requests': 0
}
def on_processing_start(self, handler: MediatedChainHandler, request: Any):
self.metrics['total_requests'] += 1
def on_processing_complete(self, handler: MediatedChainHandler, request: Any, result: bool):
if result:
self.metrics['successful_requests'] += 1
else:
self.metrics['failed_requests'] += 1
def on_forwarding(self, from_handler: MediatedChainHandler,
to_handler: MediatedChainHandler, request: Any):
self.metrics['forwarded_requests'] += 1
def on_processing_failed(self, handler: MediatedChainHandler, request: Any):
self.metrics['failed_requests'] += 1
def get_metrics(self) -> Dict:
return self.metrics.copy()
def demonstrate_combined_patterns():
"""演示责任链模式与中介者模式的组合使用"""
print("=== 责任链模式与中介者模式组合演示 ===")
# 创建中介者
mediator = ChainMediator()
# 创建观察者
logging_observer = LoggingObserver()
metrics_observer = MetricsObserver()
mediator.add_observer(logging_observer)
mediator.add_observer(metrics_observer)
# 创建处理者(这里简化实现)
class SimpleHandler(MediatedChainHandler):
def __init__(self, name: str, mediator: ChainMediator, max_value: int):
super().__init__(name, mediator)
self.max_value = max_value
def can_handle(self, request: Any) -> bool:
return isinstance(request, int) and request <= self.max_value
def process_request(self, request: Any) -> bool:
print(f"{self.name} 处理请求: {request}")
return True
# 创建处理链
handler1 = SimpleHandler("低级处理者", mediator, 10)
handler2 = SimpleHandler("中级处理者", mediator, 50)
handler3 = SimpleHandler("高级处理者", mediator, 100)
handler1.set_next(handler2).set_next(handler3)
# 注册处理者
mediator.register_handler(handler1)
mediator.register_handler(handler2)
mediator.register_handler(handler3)
# 处理请求
requests = [5, 25, 75, 150]
for request in requests:
print(f"\n--- 处理请求: {request} ---")
handler1.handle_request(request)
# 显示统计信息
print("\n=== 处理统计 ===")
stats = mediator.get_processing_statistics()
print(f"总事件数: {stats['total_events']}")
print(f"事件统计: {stats['event_counts']}")
print(f"处理者统计: {stats['handler_statistics']}")
print("\n=== 指标统计 ===")
metrics = metrics_observer.get_metrics()
for key, value in metrics.items():
print(f"{key}: {value}")
if __name__ == "__main__":
demonstrate_combined_patterns()
11.5 高级应用技巧
11.5.1 责任链模式优化技巧
- 处理者缓存:缓存处理者实例减少创建开销
- 链路优化:根据请求类型动态调整链路顺序
- 并行处理:支持多个处理者并行处理请求
- 链路监控:监控链路性能和处理效率
- 异常处理:完善的异常处理和恢复机制
11.5.2 中介者模式优化技巧
- 中介者分层:将复杂中介者分解为多个层次
- 事件驱动:使用事件驱动机制减少直接调用
- 异步处理:支持异步消息处理
- 状态管理:合理管理中介者的状态
- 性能监控:监控中介者的性能瓶颈
11.5.3 内存和性能优化
class OptimizedChainManager:
"""优化的责任链管理器"""
def __init__(self):
self._handler_cache = {}
self._chain_cache = {}
self._performance_metrics = {}
def get_handler(self, handler_type: str, config: Dict):
"""获取处理者(带缓存)"""
cache_key = f"{handler_type}_{hash(str(config))}"
if cache_key not in self._handler_cache:
# 根据类型创建处理者
self._handler_cache[cache_key] = self._create_handler(handler_type, config)
return self._handler_cache[cache_key]
def build_optimized_chain(self, request_type: str, handlers_config: List[Dict]):
"""构建优化的处理链"""
cache_key = f"{request_type}_{hash(str(handlers_config))}"
if cache_key not in self._chain_cache:
# 根据历史性能数据优化链路顺序
optimized_config = self._optimize_chain_order(request_type, handlers_config)
# 构建链路
chain = self._build_chain(optimized_config)
self._chain_cache[cache_key] = chain
return self._chain_cache[cache_key]
def _optimize_chain_order(self, request_type: str, handlers_config: List[Dict]) -> List[Dict]:
"""根据性能数据优化链路顺序"""
if request_type not in self._performance_metrics:
return handlers_config
metrics = self._performance_metrics[request_type]
# 根据成功率和处理时间排序
def sort_key(config):
handler_name = config['name']
if handler_name in metrics:
success_rate = metrics[handler_name].get('success_rate', 0)
avg_time = metrics[handler_name].get('avg_processing_time', float('inf'))
return (-success_rate, avg_time) # 成功率高、时间短的优先
return (0, float('inf'))
return sorted(handlers_config, key=sort_key)
class OptimizedMediator:
"""优化的中介者"""
def __init__(self):
self._event_queue = []
self._async_handlers = {}
self._performance_monitor = PerformanceMonitor()
def notify_async(self, sender: Component, event: str, data: Any = None):
"""异步通知"""
event_obj = {
'sender': sender,
'event': event,
'data': data,
'timestamp': time.time()
}
self._event_queue.append(event_obj)
self._process_event_queue()
def register_async_handler(self, event_type: str, handler: Callable):
"""注册异步处理器"""
if event_type not in self._async_handlers:
self._async_handlers[event_type] = []
self._async_handlers[event_type].append(handler)
def _process_event_queue(self):
"""处理事件队列"""
while self._event_queue:
event = self._event_queue.pop(0)
self._handle_event(event)
def _handle_event(self, event: Dict):
"""处理单个事件"""
event_type = event['event']
start_time = time.time()
if event_type in self._async_handlers:
for handler in self._async_handlers[event_type]:
try:
handler(event)
except Exception as e:
print(f"事件处理错误: {e}")
processing_time = time.time() - start_time
self._performance_monitor.record_event_processing(event_type, processing_time)
class PerformanceMonitor:
"""性能监控器"""
def __init__(self):
self._metrics = {}
def record_event_processing(self, event_type: str, processing_time: float):
"""记录事件处理性能"""
if event_type not in self._metrics:
self._metrics[event_type] = {
'total_count': 0,
'total_time': 0,
'avg_time': 0,
'max_time': 0,
'min_time': float('inf')
}
metrics = self._metrics[event_type]
metrics['total_count'] += 1
metrics['total_time'] += processing_time
metrics['avg_time'] = metrics['total_time'] / metrics['total_count']
metrics['max_time'] = max(metrics['max_time'], processing_time)
metrics['min_time'] = min(metrics['min_time'], processing_time)
def get_performance_report(self) -> Dict:
"""获取性能报告"""
return self._metrics.copy()
11.6 实际应用案例
11.6.1 Web请求处理管道
class HTTPRequest:
"""HTTP请求对象"""
def __init__(self, method: str, path: str, headers: Dict, body: str = ""):
self.method = method
self.path = path
self.headers = headers
self.body = body
self.context = {} # 用于在处理器间传递数据
def __str__(self):
return f"{self.method} {self.path}"
class HTTPResponse:
"""HTTP响应对象"""
def __init__(self, status_code: int = 200, headers: Dict = None, body: str = ""):
self.status_code = status_code
self.headers = headers or {}
self.body = body
class MiddlewareHandler(ABC):
"""中间件处理器基类"""
def __init__(self, name: str):
self.name = name
self._next_handler: Optional['MiddlewareHandler'] = None
def set_next(self, handler: 'MiddlewareHandler') -> 'MiddlewareHandler':
self._next_handler = handler
return handler
def handle(self, request: HTTPRequest) -> Optional[HTTPResponse]:
"""处理请求"""
# 前置处理
response = self.before_process(request)
if response:
return response
# 调用下一个处理器
if self._next_handler:
response = self._next_handler.handle(request)
else:
response = HTTPResponse(404, body="Not Found")
# 后置处理
return self.after_process(request, response)
@abstractmethod
def before_process(self, request: HTTPRequest) -> Optional[HTTPResponse]:
"""前置处理,返回Response表示提前结束"""
pass
def after_process(self, request: HTTPRequest, response: HTTPResponse) -> HTTPResponse:
"""后置处理"""
return response
class AuthenticationMiddleware(MiddlewareHandler):
"""认证中间件"""
def __init__(self):
super().__init__("认证中间件")
self.protected_paths = ["/api/", "/admin/"]
def before_process(self, request: HTTPRequest) -> Optional[HTTPResponse]:
# 检查是否需要认证
needs_auth = any(request.path.startswith(path) for path in self.protected_paths)
if needs_auth:
auth_header = request.headers.get('Authorization')
if not auth_header or not self._validate_token(auth_header):
return HTTPResponse(401, body="Unauthorized")
# 将用户信息添加到上下文
request.context['user'] = self._get_user_from_token(auth_header)
return None
def _validate_token(self, auth_header: str) -> bool:
# 模拟token验证
return auth_header.startswith("Bearer ") and len(auth_header) > 10
def _get_user_from_token(self, auth_header: str) -> Dict:
# 模拟从token获取用户信息
return {'id': 1, 'username': 'user1', 'role': 'admin'}
class RateLimitMiddleware(MiddlewareHandler):
"""限流中间件"""
def __init__(self, max_requests: int = 100, time_window: int = 60):
super().__init__("限流中间件")
self.max_requests = max_requests
self.time_window = time_window
self.request_counts = {} # IP -> [(timestamp, count)]
def before_process(self, request: HTTPRequest) -> Optional[HTTPResponse]:
client_ip = request.headers.get('X-Real-IP', '127.0.0.1')
current_time = time.time()
# 清理过期记录
if client_ip in self.request_counts:
self.request_counts[client_ip] = [
(ts, count) for ts, count in self.request_counts[client_ip]
if current_time - ts < self.time_window
]
# 计算当前时间窗口内的请求数
if client_ip not in self.request_counts:
self.request_counts[client_ip] = []
current_requests = sum(count for _, count in self.request_counts[client_ip])
if current_requests >= self.max_requests:
return HTTPResponse(429, body="Too Many Requests")
# 记录当前请求
self.request_counts[client_ip].append((current_time, 1))
return None
class LoggingMiddleware(MiddlewareHandler):
"""日志中间件"""
def __init__(self):
super().__init__("日志中间件")
def before_process(self, request: HTTPRequest) -> Optional[HTTPResponse]:
print(f"[REQUEST] {request} - Headers: {request.headers}")
request.context['start_time'] = time.time()
return None
def after_process(self, request: HTTPRequest, response: HTTPResponse) -> HTTPResponse:
processing_time = time.time() - request.context.get('start_time', 0)
print(f"[RESPONSE] {request} - Status: {response.status_code} - Time: {processing_time:.3f}s")
return response
class CORSMiddleware(MiddlewareHandler):
"""CORS中间件"""
def __init__(self, allowed_origins: List[str] = None):
super().__init__("CORS中间件")
self.allowed_origins = allowed_origins or ['*']
def before_process(self, request: HTTPRequest) -> Optional[HTTPResponse]:
# 处理预检请求
if request.method == 'OPTIONS':
return HTTPResponse(200, headers=self._get_cors_headers(request))
return None
def after_process(self, request: HTTPRequest, response: HTTPResponse) -> HTTPResponse:
# 添加CORS头
cors_headers = self._get_cors_headers(request)
response.headers.update(cors_headers)
return response
def _get_cors_headers(self, request: HTTPRequest) -> Dict[str, str]:
origin = request.headers.get('Origin', '')
if '*' in self.allowed_origins or origin in self.allowed_origins:
return {
'Access-Control-Allow-Origin': origin or '*',
'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE, OPTIONS',
'Access-Control-Allow-Headers': 'Content-Type, Authorization',
'Access-Control-Max-Age': '86400'
}
return {}
class RouteHandler(MiddlewareHandler):
"""路由处理器"""
def __init__(self):
super().__init__("路由处理器")
self.routes = {
'/': self._handle_home,
'/api/users': self._handle_users,
'/api/orders': self._handle_orders,
'/admin/dashboard': self._handle_admin_dashboard
}
def before_process(self, request: HTTPRequest) -> Optional[HTTPResponse]:
handler = self.routes.get(request.path)
if handler:
return handler(request)
return HTTPResponse(404, body="Not Found")
def _handle_home(self, request: HTTPRequest) -> HTTPResponse:
return HTTPResponse(200, body="Welcome to the API")
def _handle_users(self, request: HTTPRequest) -> HTTPResponse:
user = request.context.get('user')
if user:
return HTTPResponse(200, body=f"Hello, {user['username']}!")
return HTTPResponse(200, body="Users endpoint")
def _handle_orders(self, request: HTTPRequest) -> HTTPResponse:
return HTTPResponse(200, body="Orders endpoint")
def _handle_admin_dashboard(self, request: HTTPRequest) -> HTTPResponse:
user = request.context.get('user')
if user and user.get('role') == 'admin':
return HTTPResponse(200, body="Admin Dashboard")
return HTTPResponse(403, body="Forbidden")
class WebServer:
"""Web服务器"""
def __init__(self):
self.middleware_chain = self._build_middleware_chain()
def _build_middleware_chain(self) -> MiddlewareHandler:
"""构建中间件链"""
logging_middleware = LoggingMiddleware()
cors_middleware = CORSMiddleware(['http://localhost:3000', 'https://example.com'])
rate_limit_middleware = RateLimitMiddleware(max_requests=10, time_window=60)
auth_middleware = AuthenticationMiddleware()
route_handler = RouteHandler()
# 构建处理链
logging_middleware.set_next(cors_middleware) \
.set_next(rate_limit_middleware) \
.set_next(auth_middleware) \
.set_next(route_handler)
return logging_middleware
def handle_request(self, request: HTTPRequest) -> HTTPResponse:
"""处理HTTP请求"""
return self.middleware_chain.handle(request)
def demonstrate_web_middleware():
"""演示Web中间件责任链"""
print("=== Web请求处理管道演示 ===")
server = WebServer()
# 测试请求
test_requests = [
HTTPRequest('GET', '/', {'X-Real-IP': '192.168.1.1'}),
HTTPRequest('GET', '/api/users', {
'X-Real-IP': '192.168.1.1',
'Authorization': 'Bearer valid_token_123'
}),
HTTPRequest('GET', '/api/users', {'X-Real-IP': '192.168.1.1'}), # 无认证
HTTPRequest('GET', '/admin/dashboard', {
'X-Real-IP': '192.168.1.2',
'Authorization': 'Bearer valid_token_123'
}),
HTTPRequest('OPTIONS', '/api/users', {
'Origin': 'http://localhost:3000'
}),
]
# 处理请求
for i, request in enumerate(test_requests, 1):
print(f"\n--- 请求 {i} ---")
response = server.handle_request(request)
print(f"响应: {response.status_code} - {response.body}")
if response.headers:
print(f"响应头: {response.headers}")
# 测试限流
print("\n--- 限流测试 ---")
for i in range(12): # 超过限制的10个请求
request = HTTPRequest('GET', '/', {'X-Real-IP': '192.168.1.3'})
response = server.handle_request(request)
if response.status_code == 429:
print(f"请求 {i+1}: 被限流")
break
else:
print(f"请求 {i+1}: 正常处理")
if __name__ == "__main__":
demonstrate_web_middleware()
11.7 本章总结
11.7.1 核心概念回顾
责任链模式: - 将请求的发送者和接收者解耦 - 多个对象都有机会处理请求 - 请求沿着链传递直到被处理 - 支持动态配置处理者链
中介者模式: - 定义对象间的交互方式 - 将多对多关系转为一对多关系 - 集中管理复杂的交互逻辑 - 提高对象的复用性和可维护性
11.7.2 最佳实践
责任链模式最佳实践:
- 合理设计链的长度和顺序
- 实现完善的异常处理机制
- 考虑性能优化和缓存策略
- 提供链路监控和调试功能
中介者模式最佳实践:
- 避免中介者过于复杂
- 使用事件驱动减少耦合
- 实现异步处理提高性能
- 合理分层管理复杂交互
11.7.3 实际应用建议
选择合适的模式:
- 根据问题的性质选择模式
- 考虑系统的复杂度和扩展性
- 评估维护成本和性能影响
模式组合使用:
- 责任链可以与中介者结合
- 使用观察者模式增强监控
- 结合策略模式优化处理逻辑
11.7.4 注意事项
责任链模式注意事项:
- 避免链过长影响性能
- 确保请求能被正确处理
- 实现适当的错误处理机制
- 考虑并发访问的线程安全
中介者模式注意事项:
- 避免中介者成为性能瓶颈
- 防止中介者逻辑过于复杂
- 合理管理对象的生命周期
- 考虑系统的可测试性
11.8 练习题
11.8.1 基础练习
责任链模式练习:
- 实现一个异常处理责任链系统
- 实现一个数据验证责任链
中介者模式练习:
- 实现一个简单的聊天室系统
- 实现一个表单验证中介者
11.8.2 进阶练习
责任链模式进阶:
- 实现一个支持并行处理的责任链
- 实现一个可配置的动态责任链系统
中介者模式进阶:
- 实现一个支持插件的中介者系统
- 实现一个分布式中介者架构
11.8.3 思考题
- 如何在责任链模式中实现请求的优先级处理?
- 中介者模式中如何处理循环依赖问题?
- 如何设计一个既支持责任链又支持中介者的混合系统?
- 在什么情况下应该考虑将责任链模式和中介者模式结合使用?
下一章预告: 第十二章将学习迭代器模式和访问者模式,探讨对象遍历和操作的设计技巧。 “`