9.1 章节概述
本章将深入学习两个重要的行为型设计模式:观察者模式和命令模式。这两个模式在现代软件开发中应用广泛,特别是在事件驱动编程、GUI应用程序、以及分布式系统中发挥着重要作用。
9.1.1 学习目标
- 理解观察者模式的设计思想和实现机制
- 掌握命令模式的核心概念和应用场景
- 学会在实际项目中应用这两种模式
- 了解模式的变体和最佳实践
- 能够选择合适的模式解决具体问题
9.1.2 应用场景预览
- 观察者模式:事件系统、MVC架构、发布-订阅系统
- 命令模式:撤销/重做功能、宏命令、队列操作
9.2 观察者模式(Observer Pattern)
9.2.1 模式定义与动机
定义: 观察者模式定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并被自动更新。
动机: - 对象间存在一对多的依赖关系 - 当一个对象状态改变时,需要通知多个对象 - 希望降低对象间的耦合度 - 支持广播通信
9.2.2 模式结构
观察者模式包含以下角色:
- Subject(主题/被观察者):维护观察者列表,提供注册和删除观察者的方法
- Observer(观察者):定义更新接口,当主题状态改变时被调用
- ConcreteSubject(具体主题):存储状态,状态改变时通知观察者
- ConcreteObserver(具体观察者):实现更新接口,保持与主题状态的一致性
9.2.3 Python实现示例:股票价格监控系统
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional
from datetime import datetime
from enum import Enum
import threading
import time
import random
# 观察者接口
class Observer(ABC):
"""观察者抽象基类"""
@abstractmethod
def update(self, subject: 'Subject', *args, **kwargs) -> None:
"""当主题状态改变时被调用"""
pass
@abstractmethod
def get_observer_id(self) -> str:
"""获取观察者ID"""
pass
# 主题接口
class Subject(ABC):
"""主题抽象基类"""
def __init__(self):
self._observers: List[Observer] = []
self._lock = threading.Lock()
def attach(self, observer: Observer) -> bool:
"""添加观察者"""
with self._lock:
if observer not in self._observers:
self._observers.append(observer)
print(f"观察者 {observer.get_observer_id()} 已注册")
return True
return False
def detach(self, observer: Observer) -> bool:
"""移除观察者"""
with self._lock:
if observer in self._observers:
self._observers.remove(observer)
print(f"观察者 {observer.get_observer_id()} 已移除")
return True
return False
def notify(self, *args, **kwargs) -> None:
"""通知所有观察者"""
with self._lock:
observers_copy = self._observers.copy()
for observer in observers_copy:
try:
observer.update(self, *args, **kwargs)
except Exception as e:
print(f"通知观察者 {observer.get_observer_id()} 时发生错误: {e}")
def get_observer_count(self) -> int:
"""获取观察者数量"""
with self._lock:
return len(self._observers)
# 股票价格变化类型
class PriceChangeType(Enum):
INCREASE = "上涨"
DECREASE = "下跌"
STABLE = "持平"
# 具体主题:股票
class Stock(Subject):
"""股票类(具体主题)"""
def __init__(self, symbol: str, name: str, initial_price: float):
super().__init__()
self._symbol = symbol
self._name = name
self._price = initial_price
self._previous_price = initial_price
self._change_history: List[Dict[str, Any]] = []
self._last_update = datetime.now()
@property
def symbol(self) -> str:
return self._symbol
@property
def name(self) -> str:
return self._name
@property
def price(self) -> float:
return self._price
@property
def previous_price(self) -> float:
return self._previous_price
@property
def change_percentage(self) -> float:
"""计算价格变化百分比"""
if self._previous_price == 0:
return 0
return ((self._price - self._previous_price) / self._previous_price) * 100
@property
def change_type(self) -> PriceChangeType:
"""获取价格变化类型"""
if self._price > self._previous_price:
return PriceChangeType.INCREASE
elif self._price < self._previous_price:
return PriceChangeType.DECREASE
else:
return PriceChangeType.STABLE
def set_price(self, new_price: float) -> None:
"""设置新价格并通知观察者"""
if new_price < 0:
raise ValueError("股票价格不能为负数")
self._previous_price = self._price
self._price = new_price
self._last_update = datetime.now()
# 记录变化历史
change_record = {
"timestamp": self._last_update,
"previous_price": self._previous_price,
"new_price": self._price,
"change_percentage": self.change_percentage,
"change_type": self.change_type
}
self._change_history.append(change_record)
# 通知观察者
self.notify(
price_change=change_record,
stock_info=self.get_stock_info()
)
def get_stock_info(self) -> Dict[str, Any]:
"""获取股票信息"""
return {
"symbol": self._symbol,
"name": self._name,
"current_price": self._price,
"previous_price": self._previous_price,
"change_percentage": self.change_percentage,
"change_type": self.change_type.value,
"last_update": self._last_update,
"observer_count": self.get_observer_count()
}
def get_change_history(self, limit: Optional[int] = None) -> List[Dict[str, Any]]:
"""获取价格变化历史"""
if limit:
return self._change_history[-limit:]
return self._change_history.copy()
# 具体观察者:投资者
class Investor(Observer):
"""投资者类(具体观察者)"""
def __init__(self, name: str, investment_strategy: str = "保守型"):
self._name = name
self._investment_strategy = investment_strategy
self._portfolio: Dict[str, Dict[str, Any]] = {} # 投资组合
self._notifications: List[Dict[str, Any]] = [] # 通知历史
self._alert_thresholds: Dict[str, float] = {} # 价格警报阈值
def get_observer_id(self) -> str:
return f"Investor-{self._name}"
def update(self, subject: Subject, *args, **kwargs) -> None:
"""接收股票价格更新通知"""
if isinstance(subject, Stock):
price_change = kwargs.get('price_change')
stock_info = kwargs.get('stock_info')
if price_change and stock_info:
self._process_price_update(subject, price_change, stock_info)
def _process_price_update(self, stock: Stock, price_change: Dict[str, Any], stock_info: Dict[str, Any]) -> None:
"""处理价格更新"""
notification = {
"timestamp": datetime.now(),
"stock_symbol": stock.symbol,
"stock_name": stock.name,
"price_change": price_change,
"stock_info": stock_info,
"investor_action": self._determine_action(stock, price_change)
}
self._notifications.append(notification)
# 检查价格警报
self._check_price_alerts(stock, price_change)
# 输出通知信息
self._display_notification(notification)
def _determine_action(self, stock: Stock, price_change: Dict[str, Any]) -> str:
"""根据投资策略确定行动"""
change_percentage = price_change['change_percentage']
change_type = price_change['change_type']
if self._investment_strategy == "激进型":
if change_type == PriceChangeType.INCREASE and change_percentage > 5:
return "考虑卖出获利"
elif change_type == PriceChangeType.DECREASE and change_percentage < -5:
return "考虑抄底买入"
elif self._investment_strategy == "保守型":
if change_type == PriceChangeType.INCREASE and change_percentage > 10:
return "考虑部分获利了结"
elif change_type == PriceChangeType.DECREASE and change_percentage < -10:
return "考虑止损"
return "继续观察"
def _check_price_alerts(self, stock: Stock, price_change: Dict[str, Any]) -> None:
"""检查价格警报"""
symbol = stock.symbol
current_price = price_change['new_price']
if symbol in self._alert_thresholds:
threshold = self._alert_thresholds[symbol]
if current_price <= threshold:
print(f"🚨 价格警报!{stock.name}({symbol}) 价格 ${current_price:.2f} 已达到警报阈值 ${threshold:.2f}")
def _display_notification(self, notification: Dict[str, Any]) -> None:
"""显示通知信息"""
stock_info = notification['stock_info']
price_change = notification['price_change']
change_symbol = "📈" if price_change['change_type'] == PriceChangeType.INCREASE else "📉" if price_change['change_type'] == PriceChangeType.DECREASE else "➡️"
print(f"{change_symbol} [{self._name}] {stock_info['name']}({stock_info['symbol']}) ")
print(f" 价格: ${price_change['previous_price']:.2f} → ${price_change['new_price']:.2f} ")
print(f" 变化: {price_change['change_percentage']:.2f}% | 策略: {self._investment_strategy} | 行动: {notification['investor_action']}")
def set_price_alert(self, stock_symbol: str, threshold: float) -> None:
"""设置价格警报"""
self._alert_thresholds[stock_symbol] = threshold
print(f"为 {stock_symbol} 设置价格警报阈值: ${threshold:.2f}")
def add_to_portfolio(self, stock_symbol: str, shares: int, purchase_price: float) -> None:
"""添加股票到投资组合"""
self._portfolio[stock_symbol] = {
"shares": shares,
"purchase_price": purchase_price,
"purchase_date": datetime.now()
}
print(f"添加 {shares} 股 {stock_symbol} 到投资组合,购买价格: ${purchase_price:.2f}")
def get_portfolio_value(self, current_prices: Dict[str, float]) -> Dict[str, Any]:
"""计算投资组合价值"""
total_value = 0
total_cost = 0
portfolio_details = []
for symbol, holding in self._portfolio.items():
if symbol in current_prices:
current_price = current_prices[symbol]
shares = holding['shares']
purchase_price = holding['purchase_price']
current_value = shares * current_price
cost = shares * purchase_price
profit_loss = current_value - cost
profit_loss_percentage = (profit_loss / cost) * 100 if cost > 0 else 0
portfolio_details.append({
"symbol": symbol,
"shares": shares,
"purchase_price": purchase_price,
"current_price": current_price,
"current_value": current_value,
"cost": cost,
"profit_loss": profit_loss,
"profit_loss_percentage": profit_loss_percentage
})
total_value += current_value
total_cost += cost
total_profit_loss = total_value - total_cost
total_profit_loss_percentage = (total_profit_loss / total_cost) * 100 if total_cost > 0 else 0
return {
"investor_name": self._name,
"total_value": total_value,
"total_cost": total_cost,
"total_profit_loss": total_profit_loss,
"total_profit_loss_percentage": total_profit_loss_percentage,
"holdings": portfolio_details
}
def get_notification_history(self, limit: Optional[int] = None) -> List[Dict[str, Any]]:
"""获取通知历史"""
if limit:
return self._notifications[-limit:]
return self._notifications.copy()
# 具体观察者:交易系统
class TradingSystem(Observer):
"""自动交易系统(具体观察者)"""
def __init__(self, system_name: str):
self._system_name = system_name
self._trading_rules: List[Dict[str, Any]] = []
self._executed_trades: List[Dict[str, Any]] = []
self._enabled = True
def get_observer_id(self) -> str:
return f"TradingSystem-{self._system_name}"
def update(self, subject: Subject, *args, **kwargs) -> None:
"""接收股票价格更新并执行交易规则"""
if not self._enabled:
return
if isinstance(subject, Stock):
price_change = kwargs.get('price_change')
stock_info = kwargs.get('stock_info')
if price_change and stock_info:
self._evaluate_trading_rules(subject, price_change, stock_info)
def _evaluate_trading_rules(self, stock: Stock, price_change: Dict[str, Any], stock_info: Dict[str, Any]) -> None:
"""评估交易规则"""
for rule in self._trading_rules:
if self._rule_matches(rule, stock, price_change, stock_info):
self._execute_trade(rule, stock, price_change, stock_info)
def _rule_matches(self, rule: Dict[str, Any], stock: Stock, price_change: Dict[str, Any], stock_info: Dict[str, Any]) -> bool:
"""检查规则是否匹配"""
# 检查股票符号
if rule.get('stock_symbol') and rule['stock_symbol'] != stock.symbol:
return False
# 检查价格变化百分比
change_percentage = price_change['change_percentage']
if 'min_change_percentage' in rule and change_percentage < rule['min_change_percentage']:
return False
if 'max_change_percentage' in rule and change_percentage > rule['max_change_percentage']:
return False
# 检查价格范围
current_price = price_change['new_price']
if 'min_price' in rule and current_price < rule['min_price']:
return False
if 'max_price' in rule and current_price > rule['max_price']:
return False
# 检查变化类型
if 'change_type' in rule and price_change['change_type'] != rule['change_type']:
return False
return True
def _execute_trade(self, rule: Dict[str, Any], stock: Stock, price_change: Dict[str, Any], stock_info: Dict[str, Any]) -> None:
"""执行交易"""
trade = {
"timestamp": datetime.now(),
"rule_id": rule['id'],
"stock_symbol": stock.symbol,
"stock_name": stock.name,
"action": rule['action'],
"quantity": rule.get('quantity', 100),
"price": price_change['new_price'],
"reason": f"价格变化 {price_change['change_percentage']:.2f}%"
}
self._executed_trades.append(trade)
print(f"🤖 [{self._system_name}] 执行交易:")
print(f" {trade['action']} {trade['quantity']} 股 {trade['stock_name']}({trade['stock_symbol']})")
print(f" 价格: ${trade['price']:.2f} | 原因: {trade['reason']}")
def add_trading_rule(self, rule_id: str, action: str, **conditions) -> None:
"""添加交易规则"""
rule = {
"id": rule_id,
"action": action, # "BUY" 或 "SELL"
**conditions
}
self._trading_rules.append(rule)
print(f"添加交易规则: {rule_id} - {action}")
def enable_trading(self) -> None:
"""启用交易"""
self._enabled = True
print(f"交易系统 {self._system_name} 已启用")
def disable_trading(self) -> None:
"""禁用交易"""
self._enabled = False
print(f"交易系统 {self._system_name} 已禁用")
def get_trade_history(self, limit: Optional[int] = None) -> List[Dict[str, Any]]:
"""获取交易历史"""
if limit:
return self._executed_trades[-limit:]
return self._executed_trades.copy()
# 股票市场模拟器
class StockMarketSimulator:
"""股票市场模拟器"""
def __init__(self):
self._stocks: Dict[str, Stock] = {}
self._running = False
self._simulation_thread: Optional[threading.Thread] = None
def add_stock(self, symbol: str, name: str, initial_price: float) -> Stock:
"""添加股票"""
stock = Stock(symbol, name, initial_price)
self._stocks[symbol] = stock
print(f"添加股票: {name}({symbol}) 初始价格: ${initial_price:.2f}")
return stock
def get_stock(self, symbol: str) -> Optional[Stock]:
"""获取股票"""
return self._stocks.get(symbol)
def start_simulation(self, duration_seconds: int = 30, update_interval: float = 2.0) -> None:
"""开始市场模拟"""
if self._running:
print("模拟已在运行中")
return
self._running = True
self._simulation_thread = threading.Thread(
target=self._simulate_market,
args=(duration_seconds, update_interval)
)
self._simulation_thread.start()
print(f"开始股票市场模拟,持续 {duration_seconds} 秒")
def stop_simulation(self) -> None:
"""停止市场模拟"""
self._running = False
if self._simulation_thread:
self._simulation_thread.join()
print("股票市场模拟已停止")
def _simulate_market(self, duration_seconds: int, update_interval: float) -> None:
"""模拟市场价格变化"""
start_time = time.time()
while self._running and (time.time() - start_time) < duration_seconds:
for stock in self._stocks.values():
# 生成随机价格变化
change_factor = random.uniform(-0.05, 0.05) # -5% 到 +5%
new_price = stock.price * (1 + change_factor)
new_price = max(0.01, new_price) # 确保价格不为负
stock.set_price(new_price)
time.sleep(update_interval)
self._running = False
def get_current_prices(self) -> Dict[str, float]:
"""获取当前所有股票价格"""
return {symbol: stock.price for symbol, stock in self._stocks.items()}
def get_market_summary(self) -> Dict[str, Any]:
"""获取市场摘要"""
summary = {
"total_stocks": len(self._stocks),
"stocks": [],
"market_status": "运行中" if self._running else "已停止"
}
for stock in self._stocks.values():
stock_info = stock.get_stock_info()
summary["stocks"].append(stock_info)
return summary
# 使用示例
def demonstrate_observer_pattern():
"""演示观察者模式"""
print("=== 观察者模式演示:股票价格监控系统 ===")
# 创建股票市场模拟器
market = StockMarketSimulator()
# 添加股票
apple_stock = market.add_stock("AAPL", "苹果公司", 150.00)
google_stock = market.add_stock("GOOGL", "谷歌", 2800.00)
tesla_stock = market.add_stock("TSLA", "特斯拉", 800.00)
# 创建投资者观察者
conservative_investor = Investor("张三", "保守型")
aggressive_investor = Investor("李四", "激进型")
# 创建交易系统观察者
trading_system = TradingSystem("AlgoTrader-1")
# 为投资者设置投资组合和警报
conservative_investor.add_to_portfolio("AAPL", 100, 145.00)
conservative_investor.add_to_portfolio("GOOGL", 10, 2750.00)
conservative_investor.set_price_alert("AAPL", 140.00)
aggressive_investor.add_to_portfolio("TSLA", 50, 780.00)
aggressive_investor.set_price_alert("TSLA", 750.00)
# 为交易系统添加交易规则
trading_system.add_trading_rule(
"buy_dip_apple",
"BUY",
stock_symbol="AAPL",
max_change_percentage=-3.0,
quantity=50
)
trading_system.add_trading_rule(
"sell_high_tesla",
"SELL",
stock_symbol="TSLA",
min_change_percentage=5.0,
quantity=25
)
# 注册观察者
apple_stock.attach(conservative_investor)
apple_stock.attach(aggressive_investor)
apple_stock.attach(trading_system)
google_stock.attach(conservative_investor)
google_stock.attach(trading_system)
tesla_stock.attach(aggressive_investor)
tesla_stock.attach(trading_system)
print("\n=== 开始市场模拟 ===")
# 手动触发一些价格变化进行演示
print("\n--- 手动价格变化演示 ---")
apple_stock.set_price(145.50) # 小幅下跌
time.sleep(1)
tesla_stock.set_price(840.00) # 上涨5%
time.sleep(1)
apple_stock.set_price(140.00) # 触发警报
time.sleep(1)
google_stock.set_price(2720.00) # 下跌约3%
time.sleep(1)
# 显示投资组合价值
print("\n=== 投资组合价值 ===")
current_prices = market.get_current_prices()
conservative_portfolio = conservative_investor.get_portfolio_value(current_prices)
print(f"\n{conservative_portfolio['investor_name']} 的投资组合:")
print(f"总价值: ${conservative_portfolio['total_value']:.2f}")
print(f"总成本: ${conservative_portfolio['total_cost']:.2f}")
print(f"盈亏: ${conservative_portfolio['total_profit_loss']:.2f} ({conservative_portfolio['total_profit_loss_percentage']:.2f}%)")
aggressive_portfolio = aggressive_investor.get_portfolio_value(current_prices)
print(f"\n{aggressive_portfolio['investor_name']} 的投资组合:")
print(f"总价值: ${aggressive_portfolio['total_value']:.2f}")
print(f"总成本: ${aggressive_portfolio['total_cost']:.2f}")
print(f"盈亏: ${aggressive_portfolio['total_profit_loss']:.2f} ({aggressive_portfolio['total_profit_loss_percentage']:.2f}%)")
# 显示交易历史
print("\n=== 自动交易历史 ===")
trades = trading_system.get_trade_history()
for i, trade in enumerate(trades, 1):
print(f"{i}. {trade['timestamp'].strftime('%H:%M:%S')} - {trade['action']} {trade['quantity']} {trade['stock_symbol']} @ ${trade['price']:.2f}")
# 移除观察者演示
print("\n--- 移除观察者演示 ---")
apple_stock.detach(aggressive_investor)
apple_stock.set_price(142.00) # 李四不会收到通知
# 显示市场摘要
print("\n=== 市场摘要 ===")
market_summary = market.get_market_summary()
print(f"股票总数: {market_summary['total_stocks']}")
print(f"市场状态: {market_summary['market_status']}")
for stock_info in market_summary['stocks']:
print(f"{stock_info['name']}({stock_info['symbol']}): ${stock_info['current_price']:.2f} ({stock_info['change_percentage']:+.2f}%) - {stock_info['observer_count']} 个观察者")
if __name__ == "__main__":
demonstrate_observer_pattern()
9.2.4 观察者模式的优缺点
优点: 1. 松耦合:主题和观察者之间松耦合,可以独立变化 2. 动态关系:可以在运行时建立对象间的联系 3. 广播通信:支持一对多的通信机制 4. 符合开闭原则:增加新的观察者不需要修改主题
缺点: 1. 性能问题:观察者过多时通知会影响性能 2. 内存泄漏:观察者没有正确移除可能导致内存泄漏 3. 循环依赖:观察者和主题间可能形成循环依赖 4. 调试困难:间接调用使得调试变得复杂
9.2.5 适用场景
- 事件系统:GUI事件处理、游戏事件系统
- MVC架构:模型变化时通知视图更新
- 发布-订阅系统:消息队列、事件总线
- 数据绑定:数据变化时自动更新UI
9.3 命令模式(Command Pattern)
9.3.1 模式定义与动机
定义: 命令模式将一个请求封装为一个对象,从而使你可用不同的请求对客户进行参数化;对请求排队或记录请求日志,以及支持可撤销的操作。
动机: - 需要将请求调用者和接收者解耦 - 需要支持撤销/重做操作 - 需要支持日志记录和事务处理 - 需要支持宏命令(组合命令)
9.3.2 模式结构
命令模式包含以下角色:
- Command(抽象命令):定义执行操作的接口
- ConcreteCommand(具体命令):实现具体的操作,绑定接收者
- Receiver(接收者):执行具体操作的对象
- Invoker(调用者):调用命令对象执行请求
- Client(客户端):创建具体命令对象并设置接收者
9.3.3 Python实现示例:文本编辑器
from abc import ABC, abstractmethod
from typing import List, Optional, Dict, Any
from datetime import datetime
from enum import Enum
import copy
# 命令接口
class Command(ABC):
"""命令抽象基类"""
@abstractmethod
def execute(self) -> bool:
"""执行命令"""
pass
@abstractmethod
def undo(self) -> bool:
"""撤销命令"""
pass
@abstractmethod
def get_description(self) -> str:
"""获取命令描述"""
pass
def can_merge_with(self, other: 'Command') -> bool:
"""是否可以与其他命令合并"""
return False
def merge_with(self, other: 'Command') -> 'Command':
"""与其他命令合并"""
raise NotImplementedError("此命令不支持合并")
# 接收者:文本文档
class TextDocument:
"""文本文档类(接收者)"""
def __init__(self, initial_content: str = ""):
self._content = initial_content
self._cursor_position = len(initial_content)
self._selection_start: Optional[int] = None
self._selection_end: Optional[int] = None
self._modification_count = 0
self._last_modified = datetime.now()
@property
def content(self) -> str:
return self._content
@property
def cursor_position(self) -> int:
return self._cursor_position
@property
def length(self) -> int:
return len(self._content)
@property
def has_selection(self) -> bool:
return self._selection_start is not None and self._selection_end is not None
def get_selected_text(self) -> str:
"""获取选中的文本"""
if not self.has_selection:
return ""
start = min(self._selection_start, self._selection_end)
end = max(self._selection_start, self._selection_end)
return self._content[start:end]
def set_cursor_position(self, position: int) -> None:
"""设置光标位置"""
self._cursor_position = max(0, min(position, len(self._content)))
self._clear_selection()
def set_selection(self, start: int, end: int) -> None:
"""设置选择范围"""
self._selection_start = max(0, min(start, len(self._content)))
self._selection_end = max(0, min(end, len(self._content)))
self._cursor_position = self._selection_end
def _clear_selection(self) -> None:
"""清除选择"""
self._selection_start = None
self._selection_end = None
def insert_text(self, position: int, text: str) -> None:
"""在指定位置插入文本"""
position = max(0, min(position, len(self._content)))
self._content = self._content[:position] + text + self._content[position:]
self._cursor_position = position + len(text)
self._clear_selection()
self._mark_modified()
def delete_text(self, start: int, end: int) -> str:
"""删除指定范围的文本,返回被删除的文本"""
start = max(0, min(start, len(self._content)))
end = max(start, min(end, len(self._content)))
deleted_text = self._content[start:end]
self._content = self._content[:start] + self._content[end:]
self._cursor_position = start
self._clear_selection()
self._mark_modified()
return deleted_text
def replace_text(self, start: int, end: int, new_text: str) -> str:
"""替换指定范围的文本,返回被替换的文本"""
old_text = self.delete_text(start, end)
self.insert_text(start, new_text)
return old_text
def _mark_modified(self) -> None:
"""标记文档已修改"""
self._modification_count += 1
self._last_modified = datetime.now()
def get_document_info(self) -> Dict[str, Any]:
"""获取文档信息"""
return {
"length": len(self._content),
"cursor_position": self._cursor_position,
"has_selection": self.has_selection,
"selected_text": self.get_selected_text(),
"modification_count": self._modification_count,
"last_modified": self._last_modified
}
def get_preview(self, max_length: int = 50) -> str:
"""获取文档预览"""
if len(self._content) <= max_length:
return self._content
return self._content[:max_length] + "..."
# 具体命令:插入文本
class InsertTextCommand(Command):
"""插入文本命令"""
def __init__(self, document: TextDocument, position: int, text: str):
self._document = document
self._position = position
self._text = text
self._executed = False
def execute(self) -> bool:
"""执行插入操作"""
try:
self._document.insert_text(self._position, self._text)
self._executed = True
return True
except Exception as e:
print(f"插入文本失败: {e}")
return False
def undo(self) -> bool:
"""撤销插入操作"""
if not self._executed:
return False
try:
self._document.delete_text(self._position, self._position + len(self._text))
self._executed = False
return True
except Exception as e:
print(f"撤销插入失败: {e}")
return False
def get_description(self) -> str:
preview = self._text[:20] + "..." if len(self._text) > 20 else self._text
return f"插入文本 '{preview}' 在位置 {self._position}"
def can_merge_with(self, other: Command) -> bool:
"""检查是否可以与其他插入命令合并"""
if not isinstance(other, InsertTextCommand):
return False
# 只有连续的插入操作才能合并
return (other._position == self._position + len(self._text) and
len(self._text) < 100 and len(other._text) < 100)
def merge_with(self, other: Command) -> Command:
"""与其他插入命令合并"""
if not self.can_merge_with(other):
raise ValueError("无法合并这两个命令")
merged_text = self._text + other._text
return InsertTextCommand(self._document, self._position, merged_text)
# 具体命令:删除文本
class DeleteTextCommand(Command):
"""删除文本命令"""
def __init__(self, document: TextDocument, start: int, end: int):
self._document = document
self._start = start
self._end = end
self._deleted_text: Optional[str] = None
self._executed = False
def execute(self) -> bool:
"""执行删除操作"""
try:
self._deleted_text = self._document.delete_text(self._start, self._end)
self._executed = True
return True
except Exception as e:
print(f"删除文本失败: {e}")
return False
def undo(self) -> bool:
"""撤销删除操作"""
if not self._executed or self._deleted_text is None:
return False
try:
self._document.insert_text(self._start, self._deleted_text)
self._executed = False
return True
except Exception as e:
print(f"撤销删除失败: {e}")
return False
def get_description(self) -> str:
if self._deleted_text:
preview = self._deleted_text[:20] + "..." if len(self._deleted_text) > 20 else self._deleted_text
return f"删除文本 '{preview}' 从位置 {self._start} 到 {self._end}"
return f"删除文本从位置 {self._start} 到 {self._end}"
# 具体命令:替换文本
class ReplaceTextCommand(Command):
"""替换文本命令"""
def __init__(self, document: TextDocument, start: int, end: int, new_text: str):
self._document = document
self._start = start
self._end = end
self._new_text = new_text
self._old_text: Optional[str] = None
self._executed = False
def execute(self) -> bool:
"""执行替换操作"""
try:
self._old_text = self._document.replace_text(self._start, self._end, self._new_text)
self._executed = True
return True
except Exception as e:
print(f"替换文本失败: {e}")
return False
def undo(self) -> bool:
"""撤销替换操作"""
if not self._executed or self._old_text is None:
return False
try:
self._document.replace_text(self._start, self._start + len(self._new_text), self._old_text)
self._executed = False
return True
except Exception as e:
print(f"撤销替换失败: {e}")
return False
def get_description(self) -> str:
old_preview = self._old_text[:15] + "..." if self._old_text and len(self._old_text) > 15 else (self._old_text or "")
new_preview = self._new_text[:15] + "..." if len(self._new_text) > 15 else self._new_text
return f"替换 '{old_preview}' 为 '{new_preview}'"
# 宏命令:组合命令
class MacroCommand(Command):
"""宏命令(组合命令)"""
def __init__(self, name: str, commands: List[Command] = None):
self._name = name
self._commands: List[Command] = commands or []
self._executed_commands: List[Command] = []
def add_command(self, command: Command) -> None:
"""添加命令"""
self._commands.append(command)
def execute(self) -> bool:
"""执行所有命令"""
self._executed_commands.clear()
for command in self._commands:
if command.execute():
self._executed_commands.append(command)
else:
# 如果某个命令执行失败,撤销已执行的命令
self._undo_executed_commands()
return False
return True
def undo(self) -> bool:
"""撤销所有已执行的命令"""
return self._undo_executed_commands()
def _undo_executed_commands(self) -> bool:
"""撤销已执行的命令"""
success = True
# 逆序撤销
for command in reversed(self._executed_commands):
if not command.undo():
success = False
self._executed_commands.clear()
return success
def get_description(self) -> str:
return f"宏命令 '{self._name}' ({len(self._commands)} 个子命令)"
def get_command_count(self) -> int:
return len(self._commands)
# 调用者:命令管理器
class CommandManager:
"""命令管理器(调用者)"""
def __init__(self, max_history_size: int = 100):
self._history: List[Command] = []
self._current_index = -1
self._max_history_size = max_history_size
self._command_merging_enabled = True
def execute_command(self, command: Command) -> bool:
"""执行命令"""
# 尝试与上一个命令合并
if (self._command_merging_enabled and
self._current_index >= 0 and
self._current_index < len(self._history)):
last_command = self._history[self._current_index]
if last_command.can_merge_with(command):
try:
merged_command = last_command.merge_with(command)
# 撤销上一个命令
last_command.undo()
# 执行合并后的命令
if merged_command.execute():
self._history[self._current_index] = merged_command
return True
else:
# 如果合并命令执行失败,重新执行原命令
last_command.execute()
return False
except Exception:
# 合并失败,继续正常执行
pass
# 执行命令
if not command.execute():
return False
# 清除当前位置之后的历史记录
self._history = self._history[:self._current_index + 1]
# 添加到历史记录
self._history.append(command)
self._current_index += 1
# 限制历史记录大小
if len(self._history) > self._max_history_size:
self._history.pop(0)
self._current_index -= 1
return True
def undo(self) -> bool:
"""撤销操作"""
if not self.can_undo():
return False
command = self._history[self._current_index]
if command.undo():
self._current_index -= 1
return True
return False
def redo(self) -> bool:
"""重做操作"""
if not self.can_redo():
return False
command = self._history[self._current_index + 1]
if command.execute():
self._current_index += 1
return True
return False
def can_undo(self) -> bool:
"""是否可以撤销"""
return self._current_index >= 0
def can_redo(self) -> bool:
"""是否可以重做"""
return self._current_index < len(self._history) - 1
def get_history(self) -> List[str]:
"""获取命令历史描述"""
history = []
for i, command in enumerate(self._history):
prefix = "→ " if i == self._current_index else " "
history.append(f"{prefix}{command.get_description()}")
return history
def clear_history(self) -> None:
"""清除历史记录"""
self._history.clear()
self._current_index = -1
def enable_command_merging(self) -> None:
"""启用命令合并"""
self._command_merging_enabled = True
def disable_command_merging(self) -> None:
"""禁用命令合并"""
self._command_merging_enabled = False
def get_statistics(self) -> Dict[str, Any]:
"""获取统计信息"""
return {
"total_commands": len(self._history),
"current_position": self._current_index + 1,
"can_undo": self.can_undo(),
"can_redo": self.can_redo(),
"merging_enabled": self._command_merging_enabled,
"max_history_size": self._max_history_size
}
# 文本编辑器类
class TextEditor:
"""文本编辑器"""
def __init__(self):
self._document = TextDocument()
self._command_manager = CommandManager()
self._clipboard = ""
def insert_text(self, text: str) -> bool:
"""插入文本"""
command = InsertTextCommand(self._document, self._document.cursor_position, text)
return self._command_manager.execute_command(command)
def delete_selection(self) -> bool:
"""删除选中的文本"""
if not self._document.has_selection:
return False
selected_text = self._document.get_selected_text()
start = min(self._document._selection_start, self._document._selection_end)
end = max(self._document._selection_start, self._document._selection_end)
command = DeleteTextCommand(self._document, start, end)
return self._command_manager.execute_command(command)
def replace_selection(self, new_text: str) -> bool:
"""替换选中的文本"""
if not self._document.has_selection:
return self.insert_text(new_text)
start = min(self._document._selection_start, self._document._selection_end)
end = max(self._document._selection_start, self._document._selection_end)
command = ReplaceTextCommand(self._document, start, end, new_text)
return self._command_manager.execute_command(command)
def cut(self) -> bool:
"""剪切"""
if not self._document.has_selection:
return False
self._clipboard = self._document.get_selected_text()
return self.delete_selection()
def copy(self) -> bool:
"""复制"""
if not self._document.has_selection:
return False
self._clipboard = self._document.get_selected_text()
return True
def paste(self) -> bool:
"""粘贴"""
if not self._clipboard:
return False
if self._document.has_selection:
return self.replace_selection(self._clipboard)
else:
return self.insert_text(self._clipboard)
def undo(self) -> bool:
"""撤销"""
return self._command_manager.undo()
def redo(self) -> bool:
"""重做"""
return self._command_manager.redo()
def select_all(self) -> None:
"""全选"""
self._document.set_selection(0, self._document.length)
def set_cursor_position(self, position: int) -> None:
"""设置光标位置"""
self._document.set_cursor_position(position)
def set_selection(self, start: int, end: int) -> None:
"""设置选择范围"""
self._document.set_selection(start, end)
def create_macro(self, name: str, commands: List[str]) -> MacroCommand:
"""创建宏命令"""
macro = MacroCommand(name)
for cmd_str in commands:
if cmd_str.startswith("insert:"):
text = cmd_str[7:] # 移除 "insert:" 前缀
command = InsertTextCommand(self._document, self._document.cursor_position, text)
macro.add_command(command)
# 可以添加更多命令类型的解析
return macro
def execute_macro(self, macro: MacroCommand) -> bool:
"""执行宏命令"""
return self._command_manager.execute_command(macro)
def get_content(self) -> str:
"""获取文档内容"""
return self._document.content
def get_document_info(self) -> Dict[str, Any]:
"""获取文档信息"""
return self._document.get_document_info()
def get_command_history(self) -> List[str]:
"""获取命令历史"""
return self._command_manager.get_history()
def get_statistics(self) -> Dict[str, Any]:
"""获取编辑器统计信息"""
doc_info = self.get_document_info()
cmd_stats = self._command_manager.get_statistics()
return {
"document": doc_info,
"commands": cmd_stats,
"clipboard_length": len(self._clipboard)
}
# 使用示例
def demonstrate_command_pattern():
"""演示命令模式"""
print("=== 命令模式演示:文本编辑器 ===")
# 创建文本编辑器
editor = TextEditor()
print("\n--- 基本编辑操作 ---")
# 插入文本
editor.insert_text("Hello, ")
print(f"插入后: '{editor.get_content()}'")
editor.insert_text("World!")
print(f"再次插入后: '{editor.get_content()}'")
# 选择文本并替换
editor.set_selection(7, 12) # 选择 "World"
print(f"选中文本: '{editor._document.get_selected_text()}'")
editor.replace_selection("Python")
print(f"替换后: '{editor.get_content()}'")
# 添加更多文本
editor.set_cursor_position(len(editor.get_content()))
editor.insert_text(" Programming is fun!")
print(f"添加文本后: '{editor.get_content()}'")
print("\n--- 撤销/重做操作 ---")
# 显示命令历史
print("命令历史:")
for i, cmd in enumerate(editor.get_command_history(), 1):
print(f" {i}. {cmd}")
# 撤销操作
print("\n执行撤销操作:")
for i in range(3):
if editor.undo():
print(f"撤销 {i+1}: '{editor.get_content()}'")
else:
print(f"无法继续撤销")
break
# 重做操作
print("\n执行重做操作:")
for i in range(2):
if editor.redo():
print(f"重做 {i+1}: '{editor.get_content()}'")
else:
print(f"无法继续重做")
break
print("\n--- 剪切、复制、粘贴操作 ---")
# 选择并复制
editor.set_selection(0, 5) # 选择 "Hello"
editor.copy()
print(f"复制文本: '{editor._clipboard}'")
# 移动光标到末尾并粘贴
editor.set_cursor_position(len(editor.get_content()))
editor.paste()
print(f"粘贴后: '{editor.get_content()}'")
# 选择并剪切
editor.set_selection(len(editor.get_content()) - 5, len(editor.get_content())) # 选择最后的 "Hello"
editor.cut()
print(f"剪切后: '{editor.get_content()}'")
print(f"剪贴板: '{editor._clipboard}'")
print("\n--- 宏命令演示 ---")
# 创建宏命令
macro_commands = [
"insert: Welcome to ",
"insert:Python ",
"insert:programming!"
]
macro = editor.create_macro("欢迎消息", macro_commands)
print(f"创建宏命令: {macro.get_description()}")
# 执行宏命令
editor.execute_macro(macro)
print(f"执行宏命令后: '{editor.get_content()}'")
# 撤销宏命令(一次撤销整个宏)
editor.undo()
print(f"撤销宏命令后: '{editor.get_content()}'")
print("\n--- 编辑器统计信息 ---")
stats = editor.get_statistics()
print(f"文档长度: {stats['document']['length']}")
print(f"光标位置: {stats['document']['cursor_position']}")
print(f"修改次数: {stats['document']['modification_count']}")
print(f"命令总数: {stats['commands']['total_commands']}")
print(f"当前位置: {stats['commands']['current_position']}")
print(f"可撤销: {stats['commands']['can_undo']}")
print(f"可重做: {stats['commands']['can_redo']}")
print("\n--- 最终命令历史 ---")
for i, cmd in enumerate(editor.get_command_history(), 1):
print(f" {i}. {cmd}")
if __name__ == "__main__":
demonstrate_command_pattern()
9.3.4 Java实现示例:图形编辑器
import java.util.*;
import java.time.LocalDateTime;
// 命令接口
interface Command {
boolean execute();
boolean undo();
String getDescription();
default boolean canMergeWith(Command other) {
return false;
}
default Command mergeWith(Command other) {
throw new UnsupportedOperationException("此命令不支持合并");
}
}
// 接收者:图形对象
class Shape {
private String id;
private String type;
private double x, y;
private double width, height;
private String color;
private boolean visible;
public Shape(String id, String type, double x, double y, double width, double height) {
this.id = id;
this.type = type;
this.x = x;
this.y = y;
this.width = width;
this.height = height;
this.color = "black";
this.visible = true;
}
// Getters and Setters
public String getId() { return id; }
public String getType() { return type; }
public double getX() { return x; }
public void setX(double x) { this.x = x; }
public double getY() { return y; }
public void setY(double y) { this.y = y; }
public double getWidth() { return width; }
public void setWidth(double width) { this.width = width; }
public double getHeight() { return height; }
public void setHeight(double height) { this.height = height; }
public String getColor() { return color; }
public void setColor(String color) { this.color = color; }
public boolean isVisible() { return visible; }
public void setVisible(boolean visible) { this.visible = visible; }
public void move(double deltaX, double deltaY) {
this.x += deltaX;
this.y += deltaY;
}
public void resize(double newWidth, double newHeight) {
this.width = newWidth;
this.height = newHeight;
}
@Override
public String toString() {
return String.format("%s[%s] at (%.1f,%.1f) size %.1fx%.1f color=%s visible=%s",
type, id, x, y, width, height, color, visible);
}
public Shape clone() {
Shape cloned = new Shape(id + "_copy", type, x, y, width, height);
cloned.setColor(color);
cloned.setVisible(visible);
return cloned;
}
}
// 接收者:画布
class Canvas {
private Map<String, Shape> shapes;
private List<String> layerOrder;
public Canvas() {
this.shapes = new HashMap<>();
this.layerOrder = new ArrayList<>();
}
public void addShape(Shape shape) {
shapes.put(shape.getId(), shape);
layerOrder.add(shape.getId());
}
public Shape removeShape(String shapeId) {
Shape removed = shapes.remove(shapeId);
layerOrder.remove(shapeId);
return removed;
}
public Shape getShape(String shapeId) {
return shapes.get(shapeId);
}
public List<Shape> getAllShapes() {
return layerOrder.stream()
.map(shapes::get)
.filter(Objects::nonNull)
.collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
}
public void clear() {
shapes.clear();
layerOrder.clear();
}
public int getShapeCount() {
return shapes.size();
}
public void printCanvas() {
System.out.println("画布内容:");
if (shapes.isEmpty()) {
System.out.println(" (空画布)");
} else {
for (String shapeId : layerOrder) {
Shape shape = shapes.get(shapeId);
if (shape != null) {
System.out.println(" " + shape);
}
}
}
}
}
// 具体命令:创建图形
class CreateShapeCommand implements Command {
private Canvas canvas;
private Shape shape;
private boolean executed;
public CreateShapeCommand(Canvas canvas, Shape shape) {
this.canvas = canvas;
this.shape = shape;
this.executed = false;
}
@Override
public boolean execute() {
try {
canvas.addShape(shape);
executed = true;
return true;
} catch (Exception e) {
System.err.println("创建图形失败: " + e.getMessage());
return false;
}
}
@Override
public boolean undo() {
if (!executed) return false;
try {
canvas.removeShape(shape.getId());
executed = false;
return true;
} catch (Exception e) {
System.err.println("撤销创建失败: " + e.getMessage());
return false;
}
}
@Override
public String getDescription() {
return "创建 " + shape.getType() + " [" + shape.getId() + "]";
}
}
// 具体命令:删除图形
class DeleteShapeCommand implements Command {
private Canvas canvas;
private String shapeId;
private Shape deletedShape;
private boolean executed;
public DeleteShapeCommand(Canvas canvas, String shapeId) {
this.canvas = canvas;
this.shapeId = shapeId;
this.executed = false;
}
@Override
public boolean execute() {
try {
deletedShape = canvas.removeShape(shapeId);
executed = (deletedShape != null);
return executed;
} catch (Exception e) {
System.err.println("删除图形失败: " + e.getMessage());
return false;
}
}
@Override
public boolean undo() {
if (!executed || deletedShape == null) return false;
try {
canvas.addShape(deletedShape);
executed = false;
return true;
} catch (Exception e) {
System.err.println("撤销删除失败: " + e.getMessage());
return false;
}
}
@Override
public String getDescription() {
return "删除图形 [" + shapeId + "]";
}
}
// 具体命令:移动图形
class MoveShapeCommand implements Command {
private Canvas canvas;
private String shapeId;
private double deltaX, deltaY;
private boolean executed;
public MoveShapeCommand(Canvas canvas, String shapeId, double deltaX, double deltaY) {
this.canvas = canvas;
this.shapeId = shapeId;
this.deltaX = deltaX;
this.deltaY = deltaY;
this.executed = false;
}
@Override
public boolean execute() {
Shape shape = canvas.getShape(shapeId);
if (shape == null) return false;
try {
shape.move(deltaX, deltaY);
executed = true;
return true;
} catch (Exception e) {
System.err.println("移动图形失败: " + e.getMessage());
return false;
}
}
@Override
public boolean undo() {
if (!executed) return false;
Shape shape = canvas.getShape(shapeId);
if (shape == null) return false;
try {
shape.move(-deltaX, -deltaY);
executed = false;
return true;
} catch (Exception e) {
System.err.println("撤销移动失败: " + e.getMessage());
return false;
}
}
@Override
public String getDescription() {
return String.format("移动 [%s] (%.1f, %.1f)", shapeId, deltaX, deltaY);
}
@Override
public boolean canMergeWith(Command other) {
if (!(other instanceof MoveShapeCommand)) return false;
MoveShapeCommand otherMove = (MoveShapeCommand) other;
return this.shapeId.equals(otherMove.shapeId);
}
@Override
public Command mergeWith(Command other) {
if (!canMergeWith(other)) {
throw new IllegalArgumentException("无法合并这两个命令");
}
MoveShapeCommand otherMove = (MoveShapeCommand) other;
return new MoveShapeCommand(canvas, shapeId,
deltaX + otherMove.deltaX, deltaY + otherMove.deltaY);
}
}
// 宏命令
class MacroCommand implements Command {
private String name;
private List<Command> commands;
private List<Command> executedCommands;
public MacroCommand(String name) {
this.name = name;
this.commands = new ArrayList<>();
this.executedCommands = new ArrayList<>();
}
public void addCommand(Command command) {
commands.add(command);
}
@Override
public boolean execute() {
executedCommands.clear();
for (Command command : commands) {
if (command.execute()) {
executedCommands.add(command);
} else {
// 执行失败,撤销已执行的命令
undoExecutedCommands();
return false;
}
}
return true;
}
@Override
public boolean undo() {
return undoExecutedCommands();
}
private boolean undoExecutedCommands() {
boolean success = true;
// 逆序撤销
for (int i = executedCommands.size() - 1; i >= 0; i--) {
if (!executedCommands.get(i).undo()) {
success = false;
}
}
executedCommands.clear();
return success;
}
@Override
public String getDescription() {
return "宏命令 '" + name + "' (" + commands.size() + " 个子命令)";
}
}
// 命令管理器
class CommandManager {
private List<Command> history;
private int currentIndex;
private int maxHistorySize;
private boolean mergingEnabled;
public CommandManager(int maxHistorySize) {
this.history = new ArrayList<>();
this.currentIndex = -1;
this.maxHistorySize = maxHistorySize;
this.mergingEnabled = true;
}
public boolean executeCommand(Command command) {
// 尝试与上一个命令合并
if (mergingEnabled && currentIndex >= 0 && currentIndex < history.size()) {
Command lastCommand = history.get(currentIndex);
if (lastCommand.canMergeWith(command)) {
try {
Command mergedCommand = lastCommand.mergeWith(command);
// 撤销上一个命令
lastCommand.undo();
// 执行合并后的命令
if (mergedCommand.execute()) {
history.set(currentIndex, mergedCommand);
return true;
} else {
// 合并命令执行失败,重新执行原命令
lastCommand.execute();
return false;
}
} catch (Exception e) {
// 合并失败,继续正常执行
}
}
}
// 执行命令
if (!command.execute()) {
return false;
}
// 清除当前位置之后的历史记录
if (currentIndex < history.size() - 1) {
history.subList(currentIndex + 1, history.size()).clear();
}
// 添加到历史记录
history.add(command);
currentIndex++;
// 限制历史记录大小
if (history.size() > maxHistorySize) {
history.remove(0);
currentIndex--;
}
return true;
}
public boolean undo() {
if (!canUndo()) return false;
Command command = history.get(currentIndex);
if (command.undo()) {
currentIndex--;
return true;
}
return false;
}
public boolean redo() {
if (!canRedo()) return false;
Command command = history.get(currentIndex + 1);
if (command.execute()) {
currentIndex++;
return true;
}
return false;
}
public boolean canUndo() {
return currentIndex >= 0;
}
public boolean canRedo() {
return currentIndex < history.size() - 1;
}
public List<String> getHistory() {
List<String> result = new ArrayList<>();
for (int i = 0; i < history.size(); i++) {
String prefix = (i == currentIndex) ? "→ " : " ";
result.add(prefix + history.get(i).getDescription());
}
return result;
}
public void clearHistory() {
history.clear();
currentIndex = -1;
}
public void setMergingEnabled(boolean enabled) {
this.mergingEnabled = enabled;
}
}
// 图形编辑器
class GraphicsEditor {
private Canvas canvas;
private CommandManager commandManager;
public GraphicsEditor() {
this.canvas = new Canvas();
this.commandManager = new CommandManager(50);
}
public boolean createShape(String id, String type, double x, double y, double width, double height) {
Shape shape = new Shape(id, type, x, y, width, height);
Command command = new CreateShapeCommand(canvas, shape);
return commandManager.executeCommand(command);
}
public boolean deleteShape(String shapeId) {
Command command = new DeleteShapeCommand(canvas, shapeId);
return commandManager.executeCommand(command);
}
public boolean moveShape(String shapeId, double deltaX, double deltaY) {
Command command = new MoveShapeCommand(canvas, shapeId, deltaX, deltaY);
return commandManager.executeCommand(command);
}
public boolean undo() {
return commandManager.undo();
}
public boolean redo() {
return commandManager.redo();
}
public MacroCommand createMacro(String name) {
return new MacroCommand(name);
}
public boolean executeMacro(MacroCommand macro) {
return commandManager.executeCommand(macro);
}
public void printCanvas() {
canvas.printCanvas();
}
public void printHistory() {
System.out.println("命令历史:");
List<String> history = commandManager.getHistory();
for (int i = 0; i < history.size(); i++) {
System.out.println(" " + (i + 1) + ". " + history.get(i));
}
}
public Canvas getCanvas() {
return canvas;
}
}
// 演示类
public class CommandPatternDemo {
public static void main(String[] args) {
System.out.println("=== 命令模式演示:图形编辑器 ===");
GraphicsEditor editor = new GraphicsEditor();
System.out.println("\n--- 创建图形 ---");
editor.createShape("rect1", "矩形", 10, 10, 100, 50);
editor.createShape("circle1", "圆形", 150, 20, 60, 60);
editor.createShape("line1", "直线", 50, 100, 200, 2);
editor.printCanvas();
System.out.println("\n--- 移动图形 ---");
editor.moveShape("rect1", 20, 30);
editor.moveShape("circle1", -10, 15);
editor.printCanvas();
System.out.println("\n--- 删除图形 ---");
editor.deleteShape("line1");
editor.printCanvas();
System.out.println("\n--- 撤销操作 ---");
editor.printHistory();
System.out.println("\n执行撤销:");
for (int i = 0; i < 3; i++) {
if (editor.undo()) {
System.out.println("撤销 " + (i + 1) + ":");
editor.printCanvas();
} else {
System.out.println("无法继续撤销");
break;
}
}
System.out.println("\n--- 重做操作 ---");
for (int i = 0; i < 2; i++) {
if (editor.redo()) {
System.out.println("重做 " + (i + 1) + ":");
editor.printCanvas();
} else {
System.out.println("无法继续重做");
break;
}
}
System.out.println("\n--- 宏命令演示 ---");
MacroCommand createShapesMacro = editor.createMacro("创建多个图形");
createShapesMacro.addCommand(new CreateShapeCommand(editor.getCanvas(),
new Shape("rect2", "矩形", 200, 200, 80, 40)));
createShapesMacro.addCommand(new CreateShapeCommand(editor.getCanvas(),
new Shape("circle2", "圆形", 300, 220, 50, 50)));
createShapesMacro.addCommand(new MoveShapeCommand(editor.getCanvas(), "rect2", 10, -10));
System.out.println("执行宏命令: " + createShapesMacro.getDescription());
editor.executeMacro(createShapesMacro);
editor.printCanvas();
System.out.println("\n撤销宏命令:");
editor.undo();
editor.printCanvas();
System.out.println("\n--- 最终命令历史 ---");
editor.printHistory();
}
}
9.3.5 命令模式的优缺点
优点: 1. 解耦调用者和接收者:调用者不需要知道接收者的具体实现 2. 支持撤销/重做:命令对象可以存储状态信息,支持撤销操作 3. 支持宏命令:可以将多个命令组合成复合命令 4. 支持日志和事务:可以记录命令执行历史,支持事务回滚 5. 支持队列操作:命令可以被存储、排队、延迟执行
缺点: 1. 类数量增加:每个具体操作都需要一个命令类 2. 内存开销:需要存储命令历史和状态信息 3. 复杂性增加:简单操作也需要通过命令模式实现 4. 性能影响:额外的对象创建和方法调用开销
9.3.6 适用场景
- 撤销/重做功能:文本编辑器、图形编辑器
- 宏命令:批处理操作、脚本执行
- 队列操作:任务调度、异步处理
- 日志记录:操作审计、事务管理
- 参数化对象:需要用不同请求参数化对象
9.4 观察者模式与命令模式对比分析
9.4.1 相同点
- 行为型模式:都属于行为型设计模式
- 解耦设计:都能够降低对象间的耦合度
- 动态性:都支持运行时的动态配置
- 扩展性:都符合开闭原则,易于扩展
9.4.2 不同点
对比维度 | 观察者模式 | 命令模式 |
---|---|---|
主要目的 | 对象间一对多依赖关系 | 请求封装和参数化 |
通信方式 | 推送通知(广播) | 拉取执行(调用) |
时间特性 | 实时响应 | 可延迟执行 |
状态管理 | 主题维护状态 | 命令封装状态 |
撤销支持 | 不直接支持 | 天然支持 |
执行方式 | 自动触发 | 手动调用 |
应用场景 | 事件系统、数据绑定 | 撤销重做、宏操作 |
9.4.3 选择指南
选择观察者模式的情况: - 需要实现事件驱动的架构 - 对象状态变化需要通知多个依赖对象 - 需要实现发布-订阅机制 - 希望实现松耦合的通信机制
选择命令模式的情况: - 需要支持撤销/重做操作 - 需要实现宏命令或批处理 - 需要对请求进行排队、记录或延迟执行 - 需要将调用者与接收者完全解耦
9.4.4 组合使用
观察者模式和命令模式可以结合使用:
class CommandExecutedEvent:
"""命令执行事件"""
def __init__(self, command, success, timestamp):
self.command = command
self.success = success
self.timestamp = timestamp
class CommandManagerWithObserver(Subject):
"""支持观察者的命令管理器"""
def __init__(self):
super().__init__()
self._command_manager = CommandManager()
def execute_command(self, command):
"""执行命令并通知观察者"""
success = self._command_manager.execute_command(command)
# 通知观察者命令执行结果
event = CommandExecutedEvent(command, success, datetime.now())
self.notify(event=event)
return success
def undo(self):
"""撤销命令并通知观察者"""
success = self._command_manager.undo()
if success:
event = CommandExecutedEvent("UNDO", success, datetime.now())
self.notify(event=event)
return success
class CommandLogger(Observer):
"""命令日志记录器"""
def __init__(self):
self._logs = []
def get_observer_id(self):
return "CommandLogger"
def update(self, subject, *args, **kwargs):
event = kwargs.get('event')
if event:
log_entry = f"{event.timestamp}: {event.command} - {'成功' if event.success else '失败'}"
self._logs.append(log_entry)
print(f"[日志] {log_entry}")
9.5 本章总结
9.5.1 核心概念回顾
观察者模式: - 定义对象间一对多的依赖关系 - 当主题状态改变时,自动通知所有观察者 - 实现了发布-订阅机制 - 支持动态添加和移除观察者
命令模式: - 将请求封装为对象 - 支持撤销/重做操作 - 可以实现宏命令和队列操作 - 完全解耦调用者和接收者
9.5.2 最佳实践
观察者模式最佳实践:
- 避免观察者过多导致性能问题
- 正确管理观察者的生命周期,防止内存泄漏
- 使用弱引用避免循环依赖
- 考虑异步通知机制
命令模式最佳实践:
- 合理设计命令的粒度
- 实现命令合并机制减少内存使用
- 限制命令历史的大小
- 考虑命令的序列化和持久化
9.5.3 实际应用建议
技术选型:
- 根据具体需求选择合适的模式
- 考虑模式的组合使用
- 评估性能和内存影响
设计决策:
- 明确对象间的关系和职责
- 考虑系统的扩展性和维护性
- 平衡灵活性和复杂性
9.5.4 注意事项
观察者模式注意事项:
- 避免观察者链过长
- 处理观察者异常,避免影响其他观察者
- 考虑线程安全问题
命令模式注意事项:
- 避免命令类过多导致的类爆炸
- 合理管理命令历史的内存使用
- 考虑命令执行失败的处理策略
9.6 练习题
9.6.1 基础练习
观察者模式练习:
- 实现一个简单的新闻发布系统,支持订阅者订阅不同类型的新闻
- 实现一个温度监控系统,当温度超过阈值时通知多个设备
命令模式练习:
- 实现一个简单的计算器,支持撤销/重做功能
- 实现一个文件操作系统,支持创建、删除、移动文件的撤销操作
9.6.2 进阶练习
观察者模式进阶:
- 实现一个支持优先级的观察者系统
- 实现一个异步观察者模式,支持非阻塞通知
命令模式进阶:
- 实现一个支持命令序列化的系统,可以保存和恢复操作历史
- 实现一个分布式命令执行系统
9.6.3 思考题
- 如何在观察者模式中处理观察者执行异常的情况?
- 命令模式中如何处理命令执行失败后的回滚操作?
- 如何设计一个既支持观察者模式又支持命令模式的系统架构?
- 在什么情况下应该考虑使用事件总线而不是直接的观察者模式?
下一章预告: 第十章将学习状态模式和策略模式的高级应用,包括状态机的设计、策略的动态切换等内容。