9.1 章节概述

本章将深入学习两个重要的行为型设计模式:观察者模式和命令模式。这两个模式在现代软件开发中应用广泛,特别是在事件驱动编程、GUI应用程序、以及分布式系统中发挥着重要作用。

9.1.1 学习目标

  1. 理解观察者模式的设计思想和实现机制
  2. 掌握命令模式的核心概念和应用场景
  3. 学会在实际项目中应用这两种模式
  4. 了解模式的变体和最佳实践
  5. 能够选择合适的模式解决具体问题

9.1.2 应用场景预览

  • 观察者模式:事件系统、MVC架构、发布-订阅系统
  • 命令模式:撤销/重做功能、宏命令、队列操作

9.2 观察者模式(Observer Pattern)

9.2.1 模式定义与动机

定义: 观察者模式定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并被自动更新。

动机: - 对象间存在一对多的依赖关系 - 当一个对象状态改变时,需要通知多个对象 - 希望降低对象间的耦合度 - 支持广播通信

9.2.2 模式结构

观察者模式包含以下角色:

  1. Subject(主题/被观察者):维护观察者列表,提供注册和删除观察者的方法
  2. Observer(观察者):定义更新接口,当主题状态改变时被调用
  3. ConcreteSubject(具体主题):存储状态,状态改变时通知观察者
  4. ConcreteObserver(具体观察者):实现更新接口,保持与主题状态的一致性

9.2.3 Python实现示例:股票价格监控系统

from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional
from datetime import datetime
from enum import Enum
import threading
import time
import random

# 观察者接口
class Observer(ABC):
    """观察者抽象基类"""
    
    @abstractmethod
    def update(self, subject: 'Subject', *args, **kwargs) -> None:
        """当主题状态改变时被调用"""
        pass
    
    @abstractmethod
    def get_observer_id(self) -> str:
        """获取观察者ID"""
        pass

# 主题接口
class Subject(ABC):
    """主题抽象基类"""
    
    def __init__(self):
        self._observers: List[Observer] = []
        self._lock = threading.Lock()
    
    def attach(self, observer: Observer) -> bool:
        """添加观察者"""
        with self._lock:
            if observer not in self._observers:
                self._observers.append(observer)
                print(f"观察者 {observer.get_observer_id()} 已注册")
                return True
            return False
    
    def detach(self, observer: Observer) -> bool:
        """移除观察者"""
        with self._lock:
            if observer in self._observers:
                self._observers.remove(observer)
                print(f"观察者 {observer.get_observer_id()} 已移除")
                return True
            return False
    
    def notify(self, *args, **kwargs) -> None:
        """通知所有观察者"""
        with self._lock:
            observers_copy = self._observers.copy()
        
        for observer in observers_copy:
            try:
                observer.update(self, *args, **kwargs)
            except Exception as e:
                print(f"通知观察者 {observer.get_observer_id()} 时发生错误: {e}")
    
    def get_observer_count(self) -> int:
        """获取观察者数量"""
        with self._lock:
            return len(self._observers)

# 股票价格变化类型
class PriceChangeType(Enum):
    INCREASE = "上涨"
    DECREASE = "下跌"
    STABLE = "持平"

# 具体主题:股票
class Stock(Subject):
    """股票类(具体主题)"""
    
    def __init__(self, symbol: str, name: str, initial_price: float):
        super().__init__()
        self._symbol = symbol
        self._name = name
        self._price = initial_price
        self._previous_price = initial_price
        self._change_history: List[Dict[str, Any]] = []
        self._last_update = datetime.now()
    
    @property
    def symbol(self) -> str:
        return self._symbol
    
    @property
    def name(self) -> str:
        return self._name
    
    @property
    def price(self) -> float:
        return self._price
    
    @property
    def previous_price(self) -> float:
        return self._previous_price
    
    @property
    def change_percentage(self) -> float:
        """计算价格变化百分比"""
        if self._previous_price == 0:
            return 0
        return ((self._price - self._previous_price) / self._previous_price) * 100
    
    @property
    def change_type(self) -> PriceChangeType:
        """获取价格变化类型"""
        if self._price > self._previous_price:
            return PriceChangeType.INCREASE
        elif self._price < self._previous_price:
            return PriceChangeType.DECREASE
        else:
            return PriceChangeType.STABLE
    
    def set_price(self, new_price: float) -> None:
        """设置新价格并通知观察者"""
        if new_price < 0:
            raise ValueError("股票价格不能为负数")
        
        self._previous_price = self._price
        self._price = new_price
        self._last_update = datetime.now()
        
        # 记录变化历史
        change_record = {
            "timestamp": self._last_update,
            "previous_price": self._previous_price,
            "new_price": self._price,
            "change_percentage": self.change_percentage,
            "change_type": self.change_type
        }
        self._change_history.append(change_record)
        
        # 通知观察者
        self.notify(
            price_change=change_record,
            stock_info=self.get_stock_info()
        )
    
    def get_stock_info(self) -> Dict[str, Any]:
        """获取股票信息"""
        return {
            "symbol": self._symbol,
            "name": self._name,
            "current_price": self._price,
            "previous_price": self._previous_price,
            "change_percentage": self.change_percentage,
            "change_type": self.change_type.value,
            "last_update": self._last_update,
            "observer_count": self.get_observer_count()
        }
    
    def get_change_history(self, limit: Optional[int] = None) -> List[Dict[str, Any]]:
        """获取价格变化历史"""
        if limit:
            return self._change_history[-limit:]
        return self._change_history.copy()

# 具体观察者:投资者
class Investor(Observer):
    """投资者类(具体观察者)"""
    
    def __init__(self, name: str, investment_strategy: str = "保守型"):
        self._name = name
        self._investment_strategy = investment_strategy
        self._portfolio: Dict[str, Dict[str, Any]] = {}  # 投资组合
        self._notifications: List[Dict[str, Any]] = []  # 通知历史
        self._alert_thresholds: Dict[str, float] = {}  # 价格警报阈值
    
    def get_observer_id(self) -> str:
        return f"Investor-{self._name}"
    
    def update(self, subject: Subject, *args, **kwargs) -> None:
        """接收股票价格更新通知"""
        if isinstance(subject, Stock):
            price_change = kwargs.get('price_change')
            stock_info = kwargs.get('stock_info')
            
            if price_change and stock_info:
                self._process_price_update(subject, price_change, stock_info)
    
    def _process_price_update(self, stock: Stock, price_change: Dict[str, Any], stock_info: Dict[str, Any]) -> None:
        """处理价格更新"""
        notification = {
            "timestamp": datetime.now(),
            "stock_symbol": stock.symbol,
            "stock_name": stock.name,
            "price_change": price_change,
            "stock_info": stock_info,
            "investor_action": self._determine_action(stock, price_change)
        }
        
        self._notifications.append(notification)
        
        # 检查价格警报
        self._check_price_alerts(stock, price_change)
        
        # 输出通知信息
        self._display_notification(notification)
    
    def _determine_action(self, stock: Stock, price_change: Dict[str, Any]) -> str:
        """根据投资策略确定行动"""
        change_percentage = price_change['change_percentage']
        change_type = price_change['change_type']
        
        if self._investment_strategy == "激进型":
            if change_type == PriceChangeType.INCREASE and change_percentage > 5:
                return "考虑卖出获利"
            elif change_type == PriceChangeType.DECREASE and change_percentage < -5:
                return "考虑抄底买入"
        elif self._investment_strategy == "保守型":
            if change_type == PriceChangeType.INCREASE and change_percentage > 10:
                return "考虑部分获利了结"
            elif change_type == PriceChangeType.DECREASE and change_percentage < -10:
                return "考虑止损"
        
        return "继续观察"
    
    def _check_price_alerts(self, stock: Stock, price_change: Dict[str, Any]) -> None:
        """检查价格警报"""
        symbol = stock.symbol
        current_price = price_change['new_price']
        
        if symbol in self._alert_thresholds:
            threshold = self._alert_thresholds[symbol]
            if current_price <= threshold:
                print(f"🚨 价格警报!{stock.name}({symbol}) 价格 ${current_price:.2f} 已达到警报阈值 ${threshold:.2f}")
    
    def _display_notification(self, notification: Dict[str, Any]) -> None:
        """显示通知信息"""
        stock_info = notification['stock_info']
        price_change = notification['price_change']
        
        change_symbol = "📈" if price_change['change_type'] == PriceChangeType.INCREASE else "📉" if price_change['change_type'] == PriceChangeType.DECREASE else "➡️"
        
        print(f"{change_symbol} [{self._name}] {stock_info['name']}({stock_info['symbol']}) ")
        print(f"   价格: ${price_change['previous_price']:.2f} → ${price_change['new_price']:.2f} ")
        print(f"   变化: {price_change['change_percentage']:.2f}% | 策略: {self._investment_strategy} | 行动: {notification['investor_action']}")
    
    def set_price_alert(self, stock_symbol: str, threshold: float) -> None:
        """设置价格警报"""
        self._alert_thresholds[stock_symbol] = threshold
        print(f"为 {stock_symbol} 设置价格警报阈值: ${threshold:.2f}")
    
    def add_to_portfolio(self, stock_symbol: str, shares: int, purchase_price: float) -> None:
        """添加股票到投资组合"""
        self._portfolio[stock_symbol] = {
            "shares": shares,
            "purchase_price": purchase_price,
            "purchase_date": datetime.now()
        }
        print(f"添加 {shares} 股 {stock_symbol} 到投资组合,购买价格: ${purchase_price:.2f}")
    
    def get_portfolio_value(self, current_prices: Dict[str, float]) -> Dict[str, Any]:
        """计算投资组合价值"""
        total_value = 0
        total_cost = 0
        portfolio_details = []
        
        for symbol, holding in self._portfolio.items():
            if symbol in current_prices:
                current_price = current_prices[symbol]
                shares = holding['shares']
                purchase_price = holding['purchase_price']
                
                current_value = shares * current_price
                cost = shares * purchase_price
                profit_loss = current_value - cost
                profit_loss_percentage = (profit_loss / cost) * 100 if cost > 0 else 0
                
                portfolio_details.append({
                    "symbol": symbol,
                    "shares": shares,
                    "purchase_price": purchase_price,
                    "current_price": current_price,
                    "current_value": current_value,
                    "cost": cost,
                    "profit_loss": profit_loss,
                    "profit_loss_percentage": profit_loss_percentage
                })
                
                total_value += current_value
                total_cost += cost
        
        total_profit_loss = total_value - total_cost
        total_profit_loss_percentage = (total_profit_loss / total_cost) * 100 if total_cost > 0 else 0
        
        return {
            "investor_name": self._name,
            "total_value": total_value,
            "total_cost": total_cost,
            "total_profit_loss": total_profit_loss,
            "total_profit_loss_percentage": total_profit_loss_percentage,
            "holdings": portfolio_details
        }
    
    def get_notification_history(self, limit: Optional[int] = None) -> List[Dict[str, Any]]:
        """获取通知历史"""
        if limit:
            return self._notifications[-limit:]
        return self._notifications.copy()

# 具体观察者:交易系统
class TradingSystem(Observer):
    """自动交易系统(具体观察者)"""
    
    def __init__(self, system_name: str):
        self._system_name = system_name
        self._trading_rules: List[Dict[str, Any]] = []
        self._executed_trades: List[Dict[str, Any]] = []
        self._enabled = True
    
    def get_observer_id(self) -> str:
        return f"TradingSystem-{self._system_name}"
    
    def update(self, subject: Subject, *args, **kwargs) -> None:
        """接收股票价格更新并执行交易规则"""
        if not self._enabled:
            return
        
        if isinstance(subject, Stock):
            price_change = kwargs.get('price_change')
            stock_info = kwargs.get('stock_info')
            
            if price_change and stock_info:
                self._evaluate_trading_rules(subject, price_change, stock_info)
    
    def _evaluate_trading_rules(self, stock: Stock, price_change: Dict[str, Any], stock_info: Dict[str, Any]) -> None:
        """评估交易规则"""
        for rule in self._trading_rules:
            if self._rule_matches(rule, stock, price_change, stock_info):
                self._execute_trade(rule, stock, price_change, stock_info)
    
    def _rule_matches(self, rule: Dict[str, Any], stock: Stock, price_change: Dict[str, Any], stock_info: Dict[str, Any]) -> bool:
        """检查规则是否匹配"""
        # 检查股票符号
        if rule.get('stock_symbol') and rule['stock_symbol'] != stock.symbol:
            return False
        
        # 检查价格变化百分比
        change_percentage = price_change['change_percentage']
        if 'min_change_percentage' in rule and change_percentage < rule['min_change_percentage']:
            return False
        if 'max_change_percentage' in rule and change_percentage > rule['max_change_percentage']:
            return False
        
        # 检查价格范围
        current_price = price_change['new_price']
        if 'min_price' in rule and current_price < rule['min_price']:
            return False
        if 'max_price' in rule and current_price > rule['max_price']:
            return False
        
        # 检查变化类型
        if 'change_type' in rule and price_change['change_type'] != rule['change_type']:
            return False
        
        return True
    
    def _execute_trade(self, rule: Dict[str, Any], stock: Stock, price_change: Dict[str, Any], stock_info: Dict[str, Any]) -> None:
        """执行交易"""
        trade = {
            "timestamp": datetime.now(),
            "rule_id": rule['id'],
            "stock_symbol": stock.symbol,
            "stock_name": stock.name,
            "action": rule['action'],
            "quantity": rule.get('quantity', 100),
            "price": price_change['new_price'],
            "reason": f"价格变化 {price_change['change_percentage']:.2f}%"
        }
        
        self._executed_trades.append(trade)
        
        print(f"🤖 [{self._system_name}] 执行交易:")
        print(f"   {trade['action']} {trade['quantity']} 股 {trade['stock_name']}({trade['stock_symbol']})")
        print(f"   价格: ${trade['price']:.2f} | 原因: {trade['reason']}")
    
    def add_trading_rule(self, rule_id: str, action: str, **conditions) -> None:
        """添加交易规则"""
        rule = {
            "id": rule_id,
            "action": action,  # "BUY" 或 "SELL"
            **conditions
        }
        self._trading_rules.append(rule)
        print(f"添加交易规则: {rule_id} - {action}")
    
    def enable_trading(self) -> None:
        """启用交易"""
        self._enabled = True
        print(f"交易系统 {self._system_name} 已启用")
    
    def disable_trading(self) -> None:
        """禁用交易"""
        self._enabled = False
        print(f"交易系统 {self._system_name} 已禁用")
    
    def get_trade_history(self, limit: Optional[int] = None) -> List[Dict[str, Any]]:
        """获取交易历史"""
        if limit:
            return self._executed_trades[-limit:]
        return self._executed_trades.copy()

# 股票市场模拟器
class StockMarketSimulator:
    """股票市场模拟器"""
    
    def __init__(self):
        self._stocks: Dict[str, Stock] = {}
        self._running = False
        self._simulation_thread: Optional[threading.Thread] = None
    
    def add_stock(self, symbol: str, name: str, initial_price: float) -> Stock:
        """添加股票"""
        stock = Stock(symbol, name, initial_price)
        self._stocks[symbol] = stock
        print(f"添加股票: {name}({symbol}) 初始价格: ${initial_price:.2f}")
        return stock
    
    def get_stock(self, symbol: str) -> Optional[Stock]:
        """获取股票"""
        return self._stocks.get(symbol)
    
    def start_simulation(self, duration_seconds: int = 30, update_interval: float = 2.0) -> None:
        """开始市场模拟"""
        if self._running:
            print("模拟已在运行中")
            return
        
        self._running = True
        self._simulation_thread = threading.Thread(
            target=self._simulate_market,
            args=(duration_seconds, update_interval)
        )
        self._simulation_thread.start()
        print(f"开始股票市场模拟,持续 {duration_seconds} 秒")
    
    def stop_simulation(self) -> None:
        """停止市场模拟"""
        self._running = False
        if self._simulation_thread:
            self._simulation_thread.join()
        print("股票市场模拟已停止")
    
    def _simulate_market(self, duration_seconds: int, update_interval: float) -> None:
        """模拟市场价格变化"""
        start_time = time.time()
        
        while self._running and (time.time() - start_time) < duration_seconds:
            for stock in self._stocks.values():
                # 生成随机价格变化
                change_factor = random.uniform(-0.05, 0.05)  # -5% 到 +5%
                new_price = stock.price * (1 + change_factor)
                new_price = max(0.01, new_price)  # 确保价格不为负
                
                stock.set_price(new_price)
            
            time.sleep(update_interval)
        
        self._running = False
    
    def get_current_prices(self) -> Dict[str, float]:
        """获取当前所有股票价格"""
        return {symbol: stock.price for symbol, stock in self._stocks.items()}
    
    def get_market_summary(self) -> Dict[str, Any]:
        """获取市场摘要"""
        summary = {
            "total_stocks": len(self._stocks),
            "stocks": [],
            "market_status": "运行中" if self._running else "已停止"
        }
        
        for stock in self._stocks.values():
            stock_info = stock.get_stock_info()
            summary["stocks"].append(stock_info)
        
        return summary

# 使用示例
def demonstrate_observer_pattern():
    """演示观察者模式"""
    print("=== 观察者模式演示:股票价格监控系统 ===")
    
    # 创建股票市场模拟器
    market = StockMarketSimulator()
    
    # 添加股票
    apple_stock = market.add_stock("AAPL", "苹果公司", 150.00)
    google_stock = market.add_stock("GOOGL", "谷歌", 2800.00)
    tesla_stock = market.add_stock("TSLA", "特斯拉", 800.00)
    
    # 创建投资者观察者
    conservative_investor = Investor("张三", "保守型")
    aggressive_investor = Investor("李四", "激进型")
    
    # 创建交易系统观察者
    trading_system = TradingSystem("AlgoTrader-1")
    
    # 为投资者设置投资组合和警报
    conservative_investor.add_to_portfolio("AAPL", 100, 145.00)
    conservative_investor.add_to_portfolio("GOOGL", 10, 2750.00)
    conservative_investor.set_price_alert("AAPL", 140.00)
    
    aggressive_investor.add_to_portfolio("TSLA", 50, 780.00)
    aggressive_investor.set_price_alert("TSLA", 750.00)
    
    # 为交易系统添加交易规则
    trading_system.add_trading_rule(
        "buy_dip_apple",
        "BUY",
        stock_symbol="AAPL",
        max_change_percentage=-3.0,
        quantity=50
    )
    
    trading_system.add_trading_rule(
        "sell_high_tesla",
        "SELL",
        stock_symbol="TSLA",
        min_change_percentage=5.0,
        quantity=25
    )
    
    # 注册观察者
    apple_stock.attach(conservative_investor)
    apple_stock.attach(aggressive_investor)
    apple_stock.attach(trading_system)
    
    google_stock.attach(conservative_investor)
    google_stock.attach(trading_system)
    
    tesla_stock.attach(aggressive_investor)
    tesla_stock.attach(trading_system)
    
    print("\n=== 开始市场模拟 ===")
    
    # 手动触发一些价格变化进行演示
    print("\n--- 手动价格变化演示 ---")
    apple_stock.set_price(145.50)  # 小幅下跌
    time.sleep(1)
    
    tesla_stock.set_price(840.00)  # 上涨5%
    time.sleep(1)
    
    apple_stock.set_price(140.00)  # 触发警报
    time.sleep(1)
    
    google_stock.set_price(2720.00)  # 下跌约3%
    time.sleep(1)
    
    # 显示投资组合价值
    print("\n=== 投资组合价值 ===")
    current_prices = market.get_current_prices()
    
    conservative_portfolio = conservative_investor.get_portfolio_value(current_prices)
    print(f"\n{conservative_portfolio['investor_name']} 的投资组合:")
    print(f"总价值: ${conservative_portfolio['total_value']:.2f}")
    print(f"总成本: ${conservative_portfolio['total_cost']:.2f}")
    print(f"盈亏: ${conservative_portfolio['total_profit_loss']:.2f} ({conservative_portfolio['total_profit_loss_percentage']:.2f}%)")
    
    aggressive_portfolio = aggressive_investor.get_portfolio_value(current_prices)
    print(f"\n{aggressive_portfolio['investor_name']} 的投资组合:")
    print(f"总价值: ${aggressive_portfolio['total_value']:.2f}")
    print(f"总成本: ${aggressive_portfolio['total_cost']:.2f}")
    print(f"盈亏: ${aggressive_portfolio['total_profit_loss']:.2f} ({aggressive_portfolio['total_profit_loss_percentage']:.2f}%)")
    
    # 显示交易历史
    print("\n=== 自动交易历史 ===")
    trades = trading_system.get_trade_history()
    for i, trade in enumerate(trades, 1):
        print(f"{i}. {trade['timestamp'].strftime('%H:%M:%S')} - {trade['action']} {trade['quantity']} {trade['stock_symbol']} @ ${trade['price']:.2f}")
    
    # 移除观察者演示
    print("\n--- 移除观察者演示 ---")
    apple_stock.detach(aggressive_investor)
    apple_stock.set_price(142.00)  # 李四不会收到通知
    
    # 显示市场摘要
    print("\n=== 市场摘要 ===")
    market_summary = market.get_market_summary()
    print(f"股票总数: {market_summary['total_stocks']}")
    print(f"市场状态: {market_summary['market_status']}")
    for stock_info in market_summary['stocks']:
        print(f"{stock_info['name']}({stock_info['symbol']}): ${stock_info['current_price']:.2f} ({stock_info['change_percentage']:+.2f}%) - {stock_info['observer_count']} 个观察者")

if __name__ == "__main__":
    demonstrate_observer_pattern()

9.2.4 观察者模式的优缺点

优点: 1. 松耦合:主题和观察者之间松耦合,可以独立变化 2. 动态关系:可以在运行时建立对象间的联系 3. 广播通信:支持一对多的通信机制 4. 符合开闭原则:增加新的观察者不需要修改主题

缺点: 1. 性能问题:观察者过多时通知会影响性能 2. 内存泄漏:观察者没有正确移除可能导致内存泄漏 3. 循环依赖:观察者和主题间可能形成循环依赖 4. 调试困难:间接调用使得调试变得复杂

9.2.5 适用场景

  1. 事件系统:GUI事件处理、游戏事件系统
  2. MVC架构:模型变化时通知视图更新
  3. 发布-订阅系统:消息队列、事件总线
  4. 数据绑定:数据变化时自动更新UI

9.3 命令模式(Command Pattern)

9.3.1 模式定义与动机

定义: 命令模式将一个请求封装为一个对象,从而使你可用不同的请求对客户进行参数化;对请求排队或记录请求日志,以及支持可撤销的操作。

动机: - 需要将请求调用者和接收者解耦 - 需要支持撤销/重做操作 - 需要支持日志记录和事务处理 - 需要支持宏命令(组合命令)

9.3.2 模式结构

命令模式包含以下角色:

  1. Command(抽象命令):定义执行操作的接口
  2. ConcreteCommand(具体命令):实现具体的操作,绑定接收者
  3. Receiver(接收者):执行具体操作的对象
  4. Invoker(调用者):调用命令对象执行请求
  5. Client(客户端):创建具体命令对象并设置接收者

9.3.3 Python实现示例:文本编辑器

from abc import ABC, abstractmethod
from typing import List, Optional, Dict, Any
from datetime import datetime
from enum import Enum
import copy

# 命令接口
class Command(ABC):
    """命令抽象基类"""
    
    @abstractmethod
    def execute(self) -> bool:
        """执行命令"""
        pass
    
    @abstractmethod
    def undo(self) -> bool:
        """撤销命令"""
        pass
    
    @abstractmethod
    def get_description(self) -> str:
        """获取命令描述"""
        pass
    
    def can_merge_with(self, other: 'Command') -> bool:
        """是否可以与其他命令合并"""
        return False
    
    def merge_with(self, other: 'Command') -> 'Command':
        """与其他命令合并"""
        raise NotImplementedError("此命令不支持合并")

# 接收者:文本文档
class TextDocument:
    """文本文档类(接收者)"""
    
    def __init__(self, initial_content: str = ""):
        self._content = initial_content
        self._cursor_position = len(initial_content)
        self._selection_start: Optional[int] = None
        self._selection_end: Optional[int] = None
        self._modification_count = 0
        self._last_modified = datetime.now()
    
    @property
    def content(self) -> str:
        return self._content
    
    @property
    def cursor_position(self) -> int:
        return self._cursor_position
    
    @property
    def length(self) -> int:
        return len(self._content)
    
    @property
    def has_selection(self) -> bool:
        return self._selection_start is not None and self._selection_end is not None
    
    def get_selected_text(self) -> str:
        """获取选中的文本"""
        if not self.has_selection:
            return ""
        start = min(self._selection_start, self._selection_end)
        end = max(self._selection_start, self._selection_end)
        return self._content[start:end]
    
    def set_cursor_position(self, position: int) -> None:
        """设置光标位置"""
        self._cursor_position = max(0, min(position, len(self._content)))
        self._clear_selection()
    
    def set_selection(self, start: int, end: int) -> None:
        """设置选择范围"""
        self._selection_start = max(0, min(start, len(self._content)))
        self._selection_end = max(0, min(end, len(self._content)))
        self._cursor_position = self._selection_end
    
    def _clear_selection(self) -> None:
        """清除选择"""
        self._selection_start = None
        self._selection_end = None
    
    def insert_text(self, position: int, text: str) -> None:
        """在指定位置插入文本"""
        position = max(0, min(position, len(self._content)))
        self._content = self._content[:position] + text + self._content[position:]
        self._cursor_position = position + len(text)
        self._clear_selection()
        self._mark_modified()
    
    def delete_text(self, start: int, end: int) -> str:
        """删除指定范围的文本,返回被删除的文本"""
        start = max(0, min(start, len(self._content)))
        end = max(start, min(end, len(self._content)))
        
        deleted_text = self._content[start:end]
        self._content = self._content[:start] + self._content[end:]
        self._cursor_position = start
        self._clear_selection()
        self._mark_modified()
        
        return deleted_text
    
    def replace_text(self, start: int, end: int, new_text: str) -> str:
        """替换指定范围的文本,返回被替换的文本"""
        old_text = self.delete_text(start, end)
        self.insert_text(start, new_text)
        return old_text
    
    def _mark_modified(self) -> None:
        """标记文档已修改"""
        self._modification_count += 1
        self._last_modified = datetime.now()
    
    def get_document_info(self) -> Dict[str, Any]:
        """获取文档信息"""
        return {
            "length": len(self._content),
            "cursor_position": self._cursor_position,
            "has_selection": self.has_selection,
            "selected_text": self.get_selected_text(),
            "modification_count": self._modification_count,
            "last_modified": self._last_modified
        }
    
    def get_preview(self, max_length: int = 50) -> str:
        """获取文档预览"""
        if len(self._content) <= max_length:
            return self._content
        return self._content[:max_length] + "..."

# 具体命令:插入文本
class InsertTextCommand(Command):
    """插入文本命令"""
    
    def __init__(self, document: TextDocument, position: int, text: str):
        self._document = document
        self._position = position
        self._text = text
        self._executed = False
    
    def execute(self) -> bool:
        """执行插入操作"""
        try:
            self._document.insert_text(self._position, self._text)
            self._executed = True
            return True
        except Exception as e:
            print(f"插入文本失败: {e}")
            return False
    
    def undo(self) -> bool:
        """撤销插入操作"""
        if not self._executed:
            return False
        
        try:
            self._document.delete_text(self._position, self._position + len(self._text))
            self._executed = False
            return True
        except Exception as e:
            print(f"撤销插入失败: {e}")
            return False
    
    def get_description(self) -> str:
        preview = self._text[:20] + "..." if len(self._text) > 20 else self._text
        return f"插入文本 '{preview}' 在位置 {self._position}"
    
    def can_merge_with(self, other: Command) -> bool:
        """检查是否可以与其他插入命令合并"""
        if not isinstance(other, InsertTextCommand):
            return False
        
        # 只有连续的插入操作才能合并
        return (other._position == self._position + len(self._text) and 
                len(self._text) < 100 and len(other._text) < 100)
    
    def merge_with(self, other: Command) -> Command:
        """与其他插入命令合并"""
        if not self.can_merge_with(other):
            raise ValueError("无法合并这两个命令")
        
        merged_text = self._text + other._text
        return InsertTextCommand(self._document, self._position, merged_text)

# 具体命令:删除文本
class DeleteTextCommand(Command):
    """删除文本命令"""
    
    def __init__(self, document: TextDocument, start: int, end: int):
        self._document = document
        self._start = start
        self._end = end
        self._deleted_text: Optional[str] = None
        self._executed = False
    
    def execute(self) -> bool:
        """执行删除操作"""
        try:
            self._deleted_text = self._document.delete_text(self._start, self._end)
            self._executed = True
            return True
        except Exception as e:
            print(f"删除文本失败: {e}")
            return False
    
    def undo(self) -> bool:
        """撤销删除操作"""
        if not self._executed or self._deleted_text is None:
            return False
        
        try:
            self._document.insert_text(self._start, self._deleted_text)
            self._executed = False
            return True
        except Exception as e:
            print(f"撤销删除失败: {e}")
            return False
    
    def get_description(self) -> str:
        if self._deleted_text:
            preview = self._deleted_text[:20] + "..." if len(self._deleted_text) > 20 else self._deleted_text
            return f"删除文本 '{preview}' 从位置 {self._start} 到 {self._end}"
        return f"删除文本从位置 {self._start} 到 {self._end}"

# 具体命令:替换文本
class ReplaceTextCommand(Command):
    """替换文本命令"""
    
    def __init__(self, document: TextDocument, start: int, end: int, new_text: str):
        self._document = document
        self._start = start
        self._end = end
        self._new_text = new_text
        self._old_text: Optional[str] = None
        self._executed = False
    
    def execute(self) -> bool:
        """执行替换操作"""
        try:
            self._old_text = self._document.replace_text(self._start, self._end, self._new_text)
            self._executed = True
            return True
        except Exception as e:
            print(f"替换文本失败: {e}")
            return False
    
    def undo(self) -> bool:
        """撤销替换操作"""
        if not self._executed or self._old_text is None:
            return False
        
        try:
            self._document.replace_text(self._start, self._start + len(self._new_text), self._old_text)
            self._executed = False
            return True
        except Exception as e:
            print(f"撤销替换失败: {e}")
            return False
    
    def get_description(self) -> str:
        old_preview = self._old_text[:15] + "..." if self._old_text and len(self._old_text) > 15 else (self._old_text or "")
        new_preview = self._new_text[:15] + "..." if len(self._new_text) > 15 else self._new_text
        return f"替换 '{old_preview}' 为 '{new_preview}'"

# 宏命令:组合命令
class MacroCommand(Command):
    """宏命令(组合命令)"""
    
    def __init__(self, name: str, commands: List[Command] = None):
        self._name = name
        self._commands: List[Command] = commands or []
        self._executed_commands: List[Command] = []
    
    def add_command(self, command: Command) -> None:
        """添加命令"""
        self._commands.append(command)
    
    def execute(self) -> bool:
        """执行所有命令"""
        self._executed_commands.clear()
        
        for command in self._commands:
            if command.execute():
                self._executed_commands.append(command)
            else:
                # 如果某个命令执行失败,撤销已执行的命令
                self._undo_executed_commands()
                return False
        
        return True
    
    def undo(self) -> bool:
        """撤销所有已执行的命令"""
        return self._undo_executed_commands()
    
    def _undo_executed_commands(self) -> bool:
        """撤销已执行的命令"""
        success = True
        # 逆序撤销
        for command in reversed(self._executed_commands):
            if not command.undo():
                success = False
        
        self._executed_commands.clear()
        return success
    
    def get_description(self) -> str:
        return f"宏命令 '{self._name}' ({len(self._commands)} 个子命令)"
    
    def get_command_count(self) -> int:
        return len(self._commands)

# 调用者:命令管理器
class CommandManager:
    """命令管理器(调用者)"""
    
    def __init__(self, max_history_size: int = 100):
        self._history: List[Command] = []
        self._current_index = -1
        self._max_history_size = max_history_size
        self._command_merging_enabled = True
    
    def execute_command(self, command: Command) -> bool:
        """执行命令"""
        # 尝试与上一个命令合并
        if (self._command_merging_enabled and 
            self._current_index >= 0 and 
            self._current_index < len(self._history)):
            
            last_command = self._history[self._current_index]
            if last_command.can_merge_with(command):
                try:
                    merged_command = last_command.merge_with(command)
                    # 撤销上一个命令
                    last_command.undo()
                    # 执行合并后的命令
                    if merged_command.execute():
                        self._history[self._current_index] = merged_command
                        return True
                    else:
                        # 如果合并命令执行失败,重新执行原命令
                        last_command.execute()
                        return False
                except Exception:
                    # 合并失败,继续正常执行
                    pass
        
        # 执行命令
        if not command.execute():
            return False
        
        # 清除当前位置之后的历史记录
        self._history = self._history[:self._current_index + 1]
        
        # 添加到历史记录
        self._history.append(command)
        self._current_index += 1
        
        # 限制历史记录大小
        if len(self._history) > self._max_history_size:
            self._history.pop(0)
            self._current_index -= 1
        
        return True
    
    def undo(self) -> bool:
        """撤销操作"""
        if not self.can_undo():
            return False
        
        command = self._history[self._current_index]
        if command.undo():
            self._current_index -= 1
            return True
        
        return False
    
    def redo(self) -> bool:
        """重做操作"""
        if not self.can_redo():
            return False
        
        command = self._history[self._current_index + 1]
        if command.execute():
            self._current_index += 1
            return True
        
        return False
    
    def can_undo(self) -> bool:
        """是否可以撤销"""
        return self._current_index >= 0
    
    def can_redo(self) -> bool:
        """是否可以重做"""
        return self._current_index < len(self._history) - 1
    
    def get_history(self) -> List[str]:
        """获取命令历史描述"""
        history = []
        for i, command in enumerate(self._history):
            prefix = "→ " if i == self._current_index else "  "
            history.append(f"{prefix}{command.get_description()}")
        return history
    
    def clear_history(self) -> None:
        """清除历史记录"""
        self._history.clear()
        self._current_index = -1
    
    def enable_command_merging(self) -> None:
        """启用命令合并"""
        self._command_merging_enabled = True
    
    def disable_command_merging(self) -> None:
        """禁用命令合并"""
        self._command_merging_enabled = False
    
    def get_statistics(self) -> Dict[str, Any]:
        """获取统计信息"""
        return {
            "total_commands": len(self._history),
            "current_position": self._current_index + 1,
            "can_undo": self.can_undo(),
            "can_redo": self.can_redo(),
            "merging_enabled": self._command_merging_enabled,
            "max_history_size": self._max_history_size
        }

# 文本编辑器类
class TextEditor:
    """文本编辑器"""
    
    def __init__(self):
        self._document = TextDocument()
        self._command_manager = CommandManager()
        self._clipboard = ""
    
    def insert_text(self, text: str) -> bool:
        """插入文本"""
        command = InsertTextCommand(self._document, self._document.cursor_position, text)
        return self._command_manager.execute_command(command)
    
    def delete_selection(self) -> bool:
        """删除选中的文本"""
        if not self._document.has_selection:
            return False
        
        selected_text = self._document.get_selected_text()
        start = min(self._document._selection_start, self._document._selection_end)
        end = max(self._document._selection_start, self._document._selection_end)
        
        command = DeleteTextCommand(self._document, start, end)
        return self._command_manager.execute_command(command)
    
    def replace_selection(self, new_text: str) -> bool:
        """替换选中的文本"""
        if not self._document.has_selection:
            return self.insert_text(new_text)
        
        start = min(self._document._selection_start, self._document._selection_end)
        end = max(self._document._selection_start, self._document._selection_end)
        
        command = ReplaceTextCommand(self._document, start, end, new_text)
        return self._command_manager.execute_command(command)
    
    def cut(self) -> bool:
        """剪切"""
        if not self._document.has_selection:
            return False
        
        self._clipboard = self._document.get_selected_text()
        return self.delete_selection()
    
    def copy(self) -> bool:
        """复制"""
        if not self._document.has_selection:
            return False
        
        self._clipboard = self._document.get_selected_text()
        return True
    
    def paste(self) -> bool:
        """粘贴"""
        if not self._clipboard:
            return False
        
        if self._document.has_selection:
            return self.replace_selection(self._clipboard)
        else:
            return self.insert_text(self._clipboard)
    
    def undo(self) -> bool:
        """撤销"""
        return self._command_manager.undo()
    
    def redo(self) -> bool:
        """重做"""
        return self._command_manager.redo()
    
    def select_all(self) -> None:
        """全选"""
        self._document.set_selection(0, self._document.length)
    
    def set_cursor_position(self, position: int) -> None:
        """设置光标位置"""
        self._document.set_cursor_position(position)
    
    def set_selection(self, start: int, end: int) -> None:
        """设置选择范围"""
        self._document.set_selection(start, end)
    
    def create_macro(self, name: str, commands: List[str]) -> MacroCommand:
        """创建宏命令"""
        macro = MacroCommand(name)
        
        for cmd_str in commands:
            if cmd_str.startswith("insert:"):
                text = cmd_str[7:]  # 移除 "insert:" 前缀
                command = InsertTextCommand(self._document, self._document.cursor_position, text)
                macro.add_command(command)
            # 可以添加更多命令类型的解析
        
        return macro
    
    def execute_macro(self, macro: MacroCommand) -> bool:
        """执行宏命令"""
        return self._command_manager.execute_command(macro)
    
    def get_content(self) -> str:
        """获取文档内容"""
        return self._document.content
    
    def get_document_info(self) -> Dict[str, Any]:
        """获取文档信息"""
        return self._document.get_document_info()
    
    def get_command_history(self) -> List[str]:
        """获取命令历史"""
        return self._command_manager.get_history()
    
    def get_statistics(self) -> Dict[str, Any]:
        """获取编辑器统计信息"""
        doc_info = self.get_document_info()
        cmd_stats = self._command_manager.get_statistics()
        
        return {
            "document": doc_info,
            "commands": cmd_stats,
            "clipboard_length": len(self._clipboard)
        }

# 使用示例
def demonstrate_command_pattern():
    """演示命令模式"""
    print("=== 命令模式演示:文本编辑器 ===")
    
    # 创建文本编辑器
    editor = TextEditor()
    
    print("\n--- 基本编辑操作 ---")
    
    # 插入文本
    editor.insert_text("Hello, ")
    print(f"插入后: '{editor.get_content()}'")
    
    editor.insert_text("World!")
    print(f"再次插入后: '{editor.get_content()}'")
    
    # 选择文本并替换
    editor.set_selection(7, 12)  # 选择 "World"
    print(f"选中文本: '{editor._document.get_selected_text()}'")
    
    editor.replace_selection("Python")
    print(f"替换后: '{editor.get_content()}'")
    
    # 添加更多文本
    editor.set_cursor_position(len(editor.get_content()))
    editor.insert_text(" Programming is fun!")
    print(f"添加文本后: '{editor.get_content()}'")
    
    print("\n--- 撤销/重做操作 ---")
    
    # 显示命令历史
    print("命令历史:")
    for i, cmd in enumerate(editor.get_command_history(), 1):
        print(f"  {i}. {cmd}")
    
    # 撤销操作
    print("\n执行撤销操作:")
    for i in range(3):
        if editor.undo():
            print(f"撤销 {i+1}: '{editor.get_content()}'")
        else:
            print(f"无法继续撤销")
            break
    
    # 重做操作
    print("\n执行重做操作:")
    for i in range(2):
        if editor.redo():
            print(f"重做 {i+1}: '{editor.get_content()}'")
        else:
            print(f"无法继续重做")
            break
    
    print("\n--- 剪切、复制、粘贴操作 ---")
    
    # 选择并复制
    editor.set_selection(0, 5)  # 选择 "Hello"
    editor.copy()
    print(f"复制文本: '{editor._clipboard}'")
    
    # 移动光标到末尾并粘贴
    editor.set_cursor_position(len(editor.get_content()))
    editor.paste()
    print(f"粘贴后: '{editor.get_content()}'")
    
    # 选择并剪切
    editor.set_selection(len(editor.get_content()) - 5, len(editor.get_content()))  # 选择最后的 "Hello"
    editor.cut()
    print(f"剪切后: '{editor.get_content()}'")
    print(f"剪贴板: '{editor._clipboard}'")
    
    print("\n--- 宏命令演示 ---")
    
    # 创建宏命令
    macro_commands = [
        "insert: Welcome to ",
        "insert:Python ",
        "insert:programming!"
    ]
    
    macro = editor.create_macro("欢迎消息", macro_commands)
    print(f"创建宏命令: {macro.get_description()}")
    
    # 执行宏命令
    editor.execute_macro(macro)
    print(f"执行宏命令后: '{editor.get_content()}'")
    
    # 撤销宏命令(一次撤销整个宏)
    editor.undo()
    print(f"撤销宏命令后: '{editor.get_content()}'")
    
    print("\n--- 编辑器统计信息 ---")
    stats = editor.get_statistics()
    print(f"文档长度: {stats['document']['length']}")
    print(f"光标位置: {stats['document']['cursor_position']}")
    print(f"修改次数: {stats['document']['modification_count']}")
    print(f"命令总数: {stats['commands']['total_commands']}")
    print(f"当前位置: {stats['commands']['current_position']}")
    print(f"可撤销: {stats['commands']['can_undo']}")
    print(f"可重做: {stats['commands']['can_redo']}")
    
    print("\n--- 最终命令历史 ---")
    for i, cmd in enumerate(editor.get_command_history(), 1):
        print(f"  {i}. {cmd}")

if __name__ == "__main__":
    demonstrate_command_pattern()

9.3.4 Java实现示例:图形编辑器

import java.util.*;
import java.time.LocalDateTime;

// 命令接口
interface Command {
    boolean execute();
    boolean undo();
    String getDescription();
    
    default boolean canMergeWith(Command other) {
        return false;
    }
    
    default Command mergeWith(Command other) {
        throw new UnsupportedOperationException("此命令不支持合并");
    }
}

// 接收者:图形对象
class Shape {
    private String id;
    private String type;
    private double x, y;
    private double width, height;
    private String color;
    private boolean visible;
    
    public Shape(String id, String type, double x, double y, double width, double height) {
        this.id = id;
        this.type = type;
        this.x = x;
        this.y = y;
        this.width = width;
        this.height = height;
        this.color = "black";
        this.visible = true;
    }
    
    // Getters and Setters
    public String getId() { return id; }
    public String getType() { return type; }
    public double getX() { return x; }
    public void setX(double x) { this.x = x; }
    public double getY() { return y; }
    public void setY(double y) { this.y = y; }
    public double getWidth() { return width; }
    public void setWidth(double width) { this.width = width; }
    public double getHeight() { return height; }
    public void setHeight(double height) { this.height = height; }
    public String getColor() { return color; }
    public void setColor(String color) { this.color = color; }
    public boolean isVisible() { return visible; }
    public void setVisible(boolean visible) { this.visible = visible; }
    
    public void move(double deltaX, double deltaY) {
        this.x += deltaX;
        this.y += deltaY;
    }
    
    public void resize(double newWidth, double newHeight) {
        this.width = newWidth;
        this.height = newHeight;
    }
    
    @Override
    public String toString() {
        return String.format("%s[%s] at (%.1f,%.1f) size %.1fx%.1f color=%s visible=%s",
                type, id, x, y, width, height, color, visible);
    }
    
    public Shape clone() {
        Shape cloned = new Shape(id + "_copy", type, x, y, width, height);
        cloned.setColor(color);
        cloned.setVisible(visible);
        return cloned;
    }
}

// 接收者:画布
class Canvas {
    private Map<String, Shape> shapes;
    private List<String> layerOrder;
    
    public Canvas() {
        this.shapes = new HashMap<>();
        this.layerOrder = new ArrayList<>();
    }
    
    public void addShape(Shape shape) {
        shapes.put(shape.getId(), shape);
        layerOrder.add(shape.getId());
    }
    
    public Shape removeShape(String shapeId) {
        Shape removed = shapes.remove(shapeId);
        layerOrder.remove(shapeId);
        return removed;
    }
    
    public Shape getShape(String shapeId) {
        return shapes.get(shapeId);
    }
    
    public List<Shape> getAllShapes() {
        return layerOrder.stream()
                .map(shapes::get)
                .filter(Objects::nonNull)
                .collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
    }
    
    public void clear() {
        shapes.clear();
        layerOrder.clear();
    }
    
    public int getShapeCount() {
        return shapes.size();
    }
    
    public void printCanvas() {
        System.out.println("画布内容:");
        if (shapes.isEmpty()) {
            System.out.println("  (空画布)");
        } else {
            for (String shapeId : layerOrder) {
                Shape shape = shapes.get(shapeId);
                if (shape != null) {
                    System.out.println("  " + shape);
                }
            }
        }
    }
}

// 具体命令:创建图形
class CreateShapeCommand implements Command {
    private Canvas canvas;
    private Shape shape;
    private boolean executed;
    
    public CreateShapeCommand(Canvas canvas, Shape shape) {
        this.canvas = canvas;
        this.shape = shape;
        this.executed = false;
    }
    
    @Override
    public boolean execute() {
        try {
            canvas.addShape(shape);
            executed = true;
            return true;
        } catch (Exception e) {
            System.err.println("创建图形失败: " + e.getMessage());
            return false;
        }
    }
    
    @Override
    public boolean undo() {
        if (!executed) return false;
        
        try {
            canvas.removeShape(shape.getId());
            executed = false;
            return true;
        } catch (Exception e) {
            System.err.println("撤销创建失败: " + e.getMessage());
            return false;
        }
    }
    
    @Override
    public String getDescription() {
        return "创建 " + shape.getType() + " [" + shape.getId() + "]";
    }
}

// 具体命令:删除图形
class DeleteShapeCommand implements Command {
    private Canvas canvas;
    private String shapeId;
    private Shape deletedShape;
    private boolean executed;
    
    public DeleteShapeCommand(Canvas canvas, String shapeId) {
        this.canvas = canvas;
        this.shapeId = shapeId;
        this.executed = false;
    }
    
    @Override
    public boolean execute() {
        try {
            deletedShape = canvas.removeShape(shapeId);
            executed = (deletedShape != null);
            return executed;
        } catch (Exception e) {
            System.err.println("删除图形失败: " + e.getMessage());
            return false;
        }
    }
    
    @Override
    public boolean undo() {
        if (!executed || deletedShape == null) return false;
        
        try {
            canvas.addShape(deletedShape);
            executed = false;
            return true;
        } catch (Exception e) {
            System.err.println("撤销删除失败: " + e.getMessage());
            return false;
        }
    }
    
    @Override
    public String getDescription() {
        return "删除图形 [" + shapeId + "]";
    }
}

// 具体命令:移动图形
class MoveShapeCommand implements Command {
    private Canvas canvas;
    private String shapeId;
    private double deltaX, deltaY;
    private boolean executed;
    
    public MoveShapeCommand(Canvas canvas, String shapeId, double deltaX, double deltaY) {
        this.canvas = canvas;
        this.shapeId = shapeId;
        this.deltaX = deltaX;
        this.deltaY = deltaY;
        this.executed = false;
    }
    
    @Override
    public boolean execute() {
        Shape shape = canvas.getShape(shapeId);
        if (shape == null) return false;
        
        try {
            shape.move(deltaX, deltaY);
            executed = true;
            return true;
        } catch (Exception e) {
            System.err.println("移动图形失败: " + e.getMessage());
            return false;
        }
    }
    
    @Override
    public boolean undo() {
        if (!executed) return false;
        
        Shape shape = canvas.getShape(shapeId);
        if (shape == null) return false;
        
        try {
            shape.move(-deltaX, -deltaY);
            executed = false;
            return true;
        } catch (Exception e) {
            System.err.println("撤销移动失败: " + e.getMessage());
            return false;
        }
    }
    
    @Override
    public String getDescription() {
        return String.format("移动 [%s] (%.1f, %.1f)", shapeId, deltaX, deltaY);
    }
    
    @Override
    public boolean canMergeWith(Command other) {
        if (!(other instanceof MoveShapeCommand)) return false;
        
        MoveShapeCommand otherMove = (MoveShapeCommand) other;
        return this.shapeId.equals(otherMove.shapeId);
    }
    
    @Override
    public Command mergeWith(Command other) {
        if (!canMergeWith(other)) {
            throw new IllegalArgumentException("无法合并这两个命令");
        }
        
        MoveShapeCommand otherMove = (MoveShapeCommand) other;
        return new MoveShapeCommand(canvas, shapeId, 
                deltaX + otherMove.deltaX, deltaY + otherMove.deltaY);
    }
}

// 宏命令
class MacroCommand implements Command {
    private String name;
    private List<Command> commands;
    private List<Command> executedCommands;
    
    public MacroCommand(String name) {
        this.name = name;
        this.commands = new ArrayList<>();
        this.executedCommands = new ArrayList<>();
    }
    
    public void addCommand(Command command) {
        commands.add(command);
    }
    
    @Override
    public boolean execute() {
        executedCommands.clear();
        
        for (Command command : commands) {
            if (command.execute()) {
                executedCommands.add(command);
            } else {
                // 执行失败,撤销已执行的命令
                undoExecutedCommands();
                return false;
            }
        }
        
        return true;
    }
    
    @Override
    public boolean undo() {
        return undoExecutedCommands();
    }
    
    private boolean undoExecutedCommands() {
        boolean success = true;
        // 逆序撤销
        for (int i = executedCommands.size() - 1; i >= 0; i--) {
            if (!executedCommands.get(i).undo()) {
                success = false;
            }
        }
        executedCommands.clear();
        return success;
    }
    
    @Override
    public String getDescription() {
        return "宏命令 '" + name + "' (" + commands.size() + " 个子命令)";
    }
}

// 命令管理器
class CommandManager {
    private List<Command> history;
    private int currentIndex;
    private int maxHistorySize;
    private boolean mergingEnabled;
    
    public CommandManager(int maxHistorySize) {
        this.history = new ArrayList<>();
        this.currentIndex = -1;
        this.maxHistorySize = maxHistorySize;
        this.mergingEnabled = true;
    }
    
    public boolean executeCommand(Command command) {
        // 尝试与上一个命令合并
        if (mergingEnabled && currentIndex >= 0 && currentIndex < history.size()) {
            Command lastCommand = history.get(currentIndex);
            if (lastCommand.canMergeWith(command)) {
                try {
                    Command mergedCommand = lastCommand.mergeWith(command);
                    // 撤销上一个命令
                    lastCommand.undo();
                    // 执行合并后的命令
                    if (mergedCommand.execute()) {
                        history.set(currentIndex, mergedCommand);
                        return true;
                    } else {
                        // 合并命令执行失败,重新执行原命令
                        lastCommand.execute();
                        return false;
                    }
                } catch (Exception e) {
                    // 合并失败,继续正常执行
                }
            }
        }
        
        // 执行命令
        if (!command.execute()) {
            return false;
        }
        
        // 清除当前位置之后的历史记录
        if (currentIndex < history.size() - 1) {
            history.subList(currentIndex + 1, history.size()).clear();
        }
        
        // 添加到历史记录
        history.add(command);
        currentIndex++;
        
        // 限制历史记录大小
        if (history.size() > maxHistorySize) {
            history.remove(0);
            currentIndex--;
        }
        
        return true;
    }
    
    public boolean undo() {
        if (!canUndo()) return false;
        
        Command command = history.get(currentIndex);
        if (command.undo()) {
            currentIndex--;
            return true;
        }
        
        return false;
    }
    
    public boolean redo() {
        if (!canRedo()) return false;
        
        Command command = history.get(currentIndex + 1);
        if (command.execute()) {
            currentIndex++;
            return true;
        }
        
        return false;
    }
    
    public boolean canUndo() {
        return currentIndex >= 0;
    }
    
    public boolean canRedo() {
        return currentIndex < history.size() - 1;
    }
    
    public List<String> getHistory() {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < history.size(); i++) {
            String prefix = (i == currentIndex) ? "→ " : "  ";
            result.add(prefix + history.get(i).getDescription());
        }
        return result;
    }
    
    public void clearHistory() {
        history.clear();
        currentIndex = -1;
    }
    
    public void setMergingEnabled(boolean enabled) {
        this.mergingEnabled = enabled;
    }
}

// 图形编辑器
class GraphicsEditor {
    private Canvas canvas;
    private CommandManager commandManager;
    
    public GraphicsEditor() {
        this.canvas = new Canvas();
        this.commandManager = new CommandManager(50);
    }
    
    public boolean createShape(String id, String type, double x, double y, double width, double height) {
        Shape shape = new Shape(id, type, x, y, width, height);
        Command command = new CreateShapeCommand(canvas, shape);
        return commandManager.executeCommand(command);
    }
    
    public boolean deleteShape(String shapeId) {
        Command command = new DeleteShapeCommand(canvas, shapeId);
        return commandManager.executeCommand(command);
    }
    
    public boolean moveShape(String shapeId, double deltaX, double deltaY) {
        Command command = new MoveShapeCommand(canvas, shapeId, deltaX, deltaY);
        return commandManager.executeCommand(command);
    }
    
    public boolean undo() {
        return commandManager.undo();
    }
    
    public boolean redo() {
        return commandManager.redo();
    }
    
    public MacroCommand createMacro(String name) {
        return new MacroCommand(name);
    }
    
    public boolean executeMacro(MacroCommand macro) {
        return commandManager.executeCommand(macro);
    }
    
    public void printCanvas() {
        canvas.printCanvas();
    }
    
    public void printHistory() {
        System.out.println("命令历史:");
        List<String> history = commandManager.getHistory();
        for (int i = 0; i < history.size(); i++) {
            System.out.println("  " + (i + 1) + ". " + history.get(i));
        }
    }
    
    public Canvas getCanvas() {
        return canvas;
    }
}

// 演示类
public class CommandPatternDemo {
    public static void main(String[] args) {
        System.out.println("=== 命令模式演示:图形编辑器 ===");
        
        GraphicsEditor editor = new GraphicsEditor();
        
        System.out.println("\n--- 创建图形 ---");
        editor.createShape("rect1", "矩形", 10, 10, 100, 50);
        editor.createShape("circle1", "圆形", 150, 20, 60, 60);
        editor.createShape("line1", "直线", 50, 100, 200, 2);
        editor.printCanvas();
        
        System.out.println("\n--- 移动图形 ---");
        editor.moveShape("rect1", 20, 30);
        editor.moveShape("circle1", -10, 15);
        editor.printCanvas();
        
        System.out.println("\n--- 删除图形 ---");
        editor.deleteShape("line1");
        editor.printCanvas();
        
        System.out.println("\n--- 撤销操作 ---");
        editor.printHistory();
        
        System.out.println("\n执行撤销:");
        for (int i = 0; i < 3; i++) {
            if (editor.undo()) {
                System.out.println("撤销 " + (i + 1) + ":");
                editor.printCanvas();
            } else {
                System.out.println("无法继续撤销");
                break;
            }
        }
        
        System.out.println("\n--- 重做操作 ---");
        for (int i = 0; i < 2; i++) {
            if (editor.redo()) {
                System.out.println("重做 " + (i + 1) + ":");
                editor.printCanvas();
            } else {
                System.out.println("无法继续重做");
                break;
            }
        }
        
        System.out.println("\n--- 宏命令演示 ---");
        MacroCommand createShapesMacro = editor.createMacro("创建多个图形");
        createShapesMacro.addCommand(new CreateShapeCommand(editor.getCanvas(), 
                new Shape("rect2", "矩形", 200, 200, 80, 40)));
        createShapesMacro.addCommand(new CreateShapeCommand(editor.getCanvas(), 
                new Shape("circle2", "圆形", 300, 220, 50, 50)));
        createShapesMacro.addCommand(new MoveShapeCommand(editor.getCanvas(), "rect2", 10, -10));
        
        System.out.println("执行宏命令: " + createShapesMacro.getDescription());
        editor.executeMacro(createShapesMacro);
        editor.printCanvas();
        
        System.out.println("\n撤销宏命令:");
        editor.undo();
        editor.printCanvas();
        
        System.out.println("\n--- 最终命令历史 ---");
         editor.printHistory();
     }
 }

9.3.5 命令模式的优缺点

优点: 1. 解耦调用者和接收者:调用者不需要知道接收者的具体实现 2. 支持撤销/重做:命令对象可以存储状态信息,支持撤销操作 3. 支持宏命令:可以将多个命令组合成复合命令 4. 支持日志和事务:可以记录命令执行历史,支持事务回滚 5. 支持队列操作:命令可以被存储、排队、延迟执行

缺点: 1. 类数量增加:每个具体操作都需要一个命令类 2. 内存开销:需要存储命令历史和状态信息 3. 复杂性增加:简单操作也需要通过命令模式实现 4. 性能影响:额外的对象创建和方法调用开销

9.3.6 适用场景

  1. 撤销/重做功能:文本编辑器、图形编辑器
  2. 宏命令:批处理操作、脚本执行
  3. 队列操作:任务调度、异步处理
  4. 日志记录:操作审计、事务管理
  5. 参数化对象:需要用不同请求参数化对象

9.4 观察者模式与命令模式对比分析

9.4.1 相同点

  1. 行为型模式:都属于行为型设计模式
  2. 解耦设计:都能够降低对象间的耦合度
  3. 动态性:都支持运行时的动态配置
  4. 扩展性:都符合开闭原则,易于扩展

9.4.2 不同点

对比维度 观察者模式 命令模式
主要目的 对象间一对多依赖关系 请求封装和参数化
通信方式 推送通知(广播) 拉取执行(调用)
时间特性 实时响应 可延迟执行
状态管理 主题维护状态 命令封装状态
撤销支持 不直接支持 天然支持
执行方式 自动触发 手动调用
应用场景 事件系统、数据绑定 撤销重做、宏操作

9.4.3 选择指南

选择观察者模式的情况: - 需要实现事件驱动的架构 - 对象状态变化需要通知多个依赖对象 - 需要实现发布-订阅机制 - 希望实现松耦合的通信机制

选择命令模式的情况: - 需要支持撤销/重做操作 - 需要实现宏命令或批处理 - 需要对请求进行排队、记录或延迟执行 - 需要将调用者与接收者完全解耦

9.4.4 组合使用

观察者模式和命令模式可以结合使用:

class CommandExecutedEvent:
    """命令执行事件"""
    def __init__(self, command, success, timestamp):
        self.command = command
        self.success = success
        self.timestamp = timestamp

class CommandManagerWithObserver(Subject):
    """支持观察者的命令管理器"""
    
    def __init__(self):
        super().__init__()
        self._command_manager = CommandManager()
    
    def execute_command(self, command):
        """执行命令并通知观察者"""
        success = self._command_manager.execute_command(command)
        
        # 通知观察者命令执行结果
        event = CommandExecutedEvent(command, success, datetime.now())
        self.notify(event=event)
        
        return success
    
    def undo(self):
        """撤销命令并通知观察者"""
        success = self._command_manager.undo()
        
        if success:
            event = CommandExecutedEvent("UNDO", success, datetime.now())
            self.notify(event=event)
        
        return success

class CommandLogger(Observer):
    """命令日志记录器"""
    
    def __init__(self):
        self._logs = []
    
    def get_observer_id(self):
        return "CommandLogger"
    
    def update(self, subject, *args, **kwargs):
        event = kwargs.get('event')
        if event:
            log_entry = f"{event.timestamp}: {event.command} - {'成功' if event.success else '失败'}"
            self._logs.append(log_entry)
            print(f"[日志] {log_entry}")

9.5 本章总结

9.5.1 核心概念回顾

观察者模式: - 定义对象间一对多的依赖关系 - 当主题状态改变时,自动通知所有观察者 - 实现了发布-订阅机制 - 支持动态添加和移除观察者

命令模式: - 将请求封装为对象 - 支持撤销/重做操作 - 可以实现宏命令和队列操作 - 完全解耦调用者和接收者

9.5.2 最佳实践

  1. 观察者模式最佳实践:

    • 避免观察者过多导致性能问题
    • 正确管理观察者的生命周期,防止内存泄漏
    • 使用弱引用避免循环依赖
    • 考虑异步通知机制
  2. 命令模式最佳实践:

    • 合理设计命令的粒度
    • 实现命令合并机制减少内存使用
    • 限制命令历史的大小
    • 考虑命令的序列化和持久化

9.5.3 实际应用建议

  1. 技术选型:

    • 根据具体需求选择合适的模式
    • 考虑模式的组合使用
    • 评估性能和内存影响
  2. 设计决策:

    • 明确对象间的关系和职责
    • 考虑系统的扩展性和维护性
    • 平衡灵活性和复杂性

9.5.4 注意事项

  1. 观察者模式注意事项:

    • 避免观察者链过长
    • 处理观察者异常,避免影响其他观察者
    • 考虑线程安全问题
  2. 命令模式注意事项:

    • 避免命令类过多导致的类爆炸
    • 合理管理命令历史的内存使用
    • 考虑命令执行失败的处理策略

9.6 练习题

9.6.1 基础练习

  1. 观察者模式练习:

    • 实现一个简单的新闻发布系统,支持订阅者订阅不同类型的新闻
    • 实现一个温度监控系统,当温度超过阈值时通知多个设备
  2. 命令模式练习:

    • 实现一个简单的计算器,支持撤销/重做功能
    • 实现一个文件操作系统,支持创建、删除、移动文件的撤销操作

9.6.2 进阶练习

  1. 观察者模式进阶:

    • 实现一个支持优先级的观察者系统
    • 实现一个异步观察者模式,支持非阻塞通知
  2. 命令模式进阶:

    • 实现一个支持命令序列化的系统,可以保存和恢复操作历史
    • 实现一个分布式命令执行系统

9.6.3 思考题

  1. 如何在观察者模式中处理观察者执行异常的情况?
  2. 命令模式中如何处理命令执行失败后的回滚操作?
  3. 如何设计一个既支持观察者模式又支持命令模式的系统架构?
  4. 在什么情况下应该考虑使用事件总线而不是直接的观察者模式?

下一章预告: 第十章将学习状态模式和策略模式的高级应用,包括状态机的设计、策略的动态切换等内容。