10.1 本章概述

本章将深入探讨状态模式和策略模式的高级应用,包括状态机的设计、策略的动态切换、模式的组合使用等内容。这两种模式都属于行为型设计模式,在处理对象行为变化方面有着重要作用。

10.1.1 学习目标

  • 深入理解状态模式的核心思想和实现方式
  • 掌握策略模式的高级应用技巧
  • 学会设计复杂的状态机系统
  • 了解两种模式的组合使用方法
  • 掌握模式在实际项目中的应用技巧

10.1.2 应用场景

  • 状态模式:游戏角色状态、订单流程、设备控制、工作流引擎
  • 策略模式:算法选择、支付方式、数据处理、业务规则

10.2 状态模式深入解析

10.2.1 状态模式定义

状态模式(State Pattern)允许对象在内部状态改变时改变它的行为,对象看起来好像修改了它的类。这种模式将状态封装成独立的类,并将动作委托到代表当前状态的对象。

10.2.2 状态模式的动机

在软件开发中,我们经常遇到对象需要根据不同状态表现出不同行为的情况。传统的做法是使用大量的条件语句,但这会导致代码难以维护和扩展。状态模式通过将每个状态封装成独立的类来解决这个问题。

10.2.3 状态模式结构

┌─────────────────┐
│    Context      │
│  - state        │
│  + request()    │
│  + setState()   │
└─────────────────┘
         │
         ▼
┌─────────────────┐
│     State       │
│ + handle()      │
└─────────────────┘
         △
         │
    ┌────┴────┐
    │         │
┌───▼───┐ ┌──▼────┐
│StateA │ │StateB │
│handle()│ │handle()│
└───────┘ └───────┘

10.2.4 Python实现:游戏角色状态系统

from abc import ABC, abstractmethod
from enum import Enum
import time
import random

class ActionType(Enum):
    """动作类型枚举"""
    ATTACK = "attack"
    DEFEND = "defend"
    MOVE = "move"
    REST = "rest"
    USE_SKILL = "use_skill"
    TAKE_DAMAGE = "take_damage"
    HEAL = "heal"

class GameCharacterState(ABC):
    """游戏角色状态抽象基类"""
    
    @abstractmethod
    def get_state_name(self):
        """获取状态名称"""
        pass
    
    @abstractmethod
    def handle_action(self, character, action_type, **kwargs):
        """处理动作"""
        pass
    
    @abstractmethod
    def can_transition_to(self, new_state):
        """检查是否可以转换到新状态"""
        pass
    
    def on_enter(self, character):
        """进入状态时的处理"""
        print(f"[状态] {character.name} 进入 {self.get_state_name()} 状态")
    
    def on_exit(self, character):
        """离开状态时的处理"""
        print(f"[状态] {character.name} 离开 {self.get_state_name()} 状态")

class NormalState(GameCharacterState):
    """正常状态"""
    
    def get_state_name(self):
        return "正常"
    
    def handle_action(self, character, action_type, **kwargs):
        if action_type == ActionType.ATTACK:
            damage = kwargs.get('damage', 10)
            target = kwargs.get('target')
            if target:
                print(f"{character.name} 攻击 {target.name},造成 {damage} 点伤害")
                target.take_damage(damage)
                # 攻击后可能进入疲劳状态
                if random.random() < 0.3:
                    character.change_state(FatiguedState())
        
        elif action_type == ActionType.DEFEND:
            print(f"{character.name} 进入防御姿态")
            character.change_state(DefendingState())
        
        elif action_type == ActionType.MOVE:
            distance = kwargs.get('distance', 1)
            print(f"{character.name} 移动了 {distance} 米")
        
        elif action_type == ActionType.USE_SKILL:
            skill_name = kwargs.get('skill_name', '普通技能')
            mp_cost = kwargs.get('mp_cost', 20)
            if character.mp >= mp_cost:
                character.mp -= mp_cost
                print(f"{character.name} 使用技能:{skill_name}")
                # 使用技能后进入冷却状态
                character.change_state(CooldownState())
            else:
                print(f"{character.name} MP不足,无法使用技能")
        
        elif action_type == ActionType.TAKE_DAMAGE:
            damage = kwargs.get('damage', 0)
            character.hp -= damage
            print(f"{character.name} 受到 {damage} 点伤害,剩余HP: {character.hp}")
            
            if character.hp <= 0:
                character.change_state(DeadState())
            elif character.hp < character.max_hp * 0.3:
                character.change_state(CriticalState())
        
        elif action_type == ActionType.REST:
            print(f"{character.name} 开始休息")
            character.change_state(RestingState())
    
    def can_transition_to(self, new_state):
        # 正常状态可以转换到任何其他状态
        return not isinstance(new_state, NormalState)

class DefendingState(GameCharacterState):
    """防御状态"""
    
    def __init__(self):
        self.defense_bonus = 0.5  # 防御加成
        self.duration = 3  # 防御持续时间
        self.start_time = time.time()
    
    def get_state_name(self):
        return "防御"
    
    def handle_action(self, character, action_type, **kwargs):
        # 检查防御状态是否过期
        if time.time() - self.start_time > self.duration:
            print(f"{character.name} 防御状态结束")
            character.change_state(NormalState())
            return
        
        if action_type == ActionType.TAKE_DAMAGE:
            damage = kwargs.get('damage', 0)
            reduced_damage = int(damage * (1 - self.defense_bonus))
            character.hp -= reduced_damage
            print(f"{character.name} 防御中,伤害减少到 {reduced_damage} 点,剩余HP: {character.hp}")
            
            if character.hp <= 0:
                character.change_state(DeadState())
        
        elif action_type == ActionType.ATTACK:
            print(f"{character.name} 在防御状态下无法攻击")
        
        else:
            print(f"{character.name} 在防御状态下无法执行 {action_type.value} 动作")
    
    def can_transition_to(self, new_state):
        return isinstance(new_state, (NormalState, DeadState, CriticalState))

class CriticalState(GameCharacterState):
    """危险状态(生命值过低)"""
    
    def get_state_name(self):
        return "危险"
    
    def handle_action(self, character, action_type, **kwargs):
        if action_type == ActionType.ATTACK:
            # 危险状态下攻击力降低
            damage = kwargs.get('damage', 10) * 0.7
            target = kwargs.get('target')
            if target:
                print(f"{character.name} 虚弱地攻击 {target.name},造成 {int(damage)} 点伤害")
                target.take_damage(int(damage))
        
        elif action_type == ActionType.HEAL:
            heal_amount = kwargs.get('heal_amount', 20)
            character.hp = min(character.max_hp, character.hp + heal_amount)
            print(f"{character.name} 恢复了 {heal_amount} 点生命值,当前HP: {character.hp}")
            
            if character.hp >= character.max_hp * 0.3:
                character.change_state(NormalState())
        
        elif action_type == ActionType.TAKE_DAMAGE:
            damage = kwargs.get('damage', 0)
            character.hp -= damage
            print(f"{character.name} 受到 {damage} 点伤害,剩余HP: {character.hp}")
            
            if character.hp <= 0:
                character.change_state(DeadState())
        
        elif action_type == ActionType.REST:
            print(f"{character.name} 在危险状态下开始紧急休息")
            character.change_state(RestingState())
        
        else:
            print(f"{character.name} 在危险状态下行动受限")
    
    def can_transition_to(self, new_state):
        return True

class FatiguedState(GameCharacterState):
    """疲劳状态"""
    
    def __init__(self):
        self.recovery_time = 5  # 恢复时间
        self.start_time = time.time()
    
    def get_state_name(self):
        return "疲劳"
    
    def handle_action(self, character, action_type, **kwargs):
        # 检查是否自动恢复
        if time.time() - self.start_time > self.recovery_time:
            print(f"{character.name} 从疲劳状态中恢复")
            character.change_state(NormalState())
            return
        
        if action_type == ActionType.ATTACK:
            print(f"{character.name} 太疲劳了,攻击失败")
        
        elif action_type == ActionType.REST:
            print(f"{character.name} 开始休息以恢复体力")
            character.change_state(RestingState())
        
        elif action_type == ActionType.TAKE_DAMAGE:
            damage = kwargs.get('damage', 0)
            character.hp -= damage
            print(f"{character.name} 受到 {damage} 点伤害,剩余HP: {character.hp}")
            
            if character.hp <= 0:
                character.change_state(DeadState())
            elif character.hp < character.max_hp * 0.3:
                character.change_state(CriticalState())
        
        else:
            print(f"{character.name} 疲劳中,无法执行 {action_type.value} 动作")
    
    def can_transition_to(self, new_state):
        return True

class CooldownState(GameCharacterState):
    """技能冷却状态"""
    
    def __init__(self):
        self.cooldown_time = 3  # 冷却时间
        self.start_time = time.time()
    
    def get_state_name(self):
        return "技能冷却"
    
    def handle_action(self, character, action_type, **kwargs):
        # 检查冷却是否结束
        if time.time() - self.start_time > self.cooldown_time:
            print(f"{character.name} 技能冷却结束")
            character.change_state(NormalState())
            return
        
        if action_type == ActionType.USE_SKILL:
            remaining_time = self.cooldown_time - (time.time() - self.start_time)
            print(f"{character.name} 技能还在冷却中,剩余 {remaining_time:.1f} 秒")
        
        elif action_type == ActionType.ATTACK:
            damage = kwargs.get('damage', 10)
            target = kwargs.get('target')
            if target:
                print(f"{character.name} 普通攻击 {target.name},造成 {damage} 点伤害")
                target.take_damage(damage)
        
        elif action_type == ActionType.TAKE_DAMAGE:
            damage = kwargs.get('damage', 0)
            character.hp -= damage
            print(f"{character.name} 受到 {damage} 点伤害,剩余HP: {character.hp}")
            
            if character.hp <= 0:
                character.change_state(DeadState())
            elif character.hp < character.max_hp * 0.3:
                character.change_state(CriticalState())
        
        else:
            # 其他动作正常执行
            character.get_state().handle_action(character, action_type, **kwargs)
    
    def can_transition_to(self, new_state):
        return True

class RestingState(GameCharacterState):
    """休息状态"""
    
    def __init__(self):
        self.rest_duration = 4  # 休息持续时间
        self.start_time = time.time()
        self.hp_recovery_rate = 5  # 每秒恢复的HP
        self.mp_recovery_rate = 10  # 每秒恢复的MP
    
    def get_state_name(self):
        return "休息"
    
    def handle_action(self, character, action_type, **kwargs):
        # 计算休息时间和恢复量
        elapsed_time = time.time() - self.start_time
        
        if elapsed_time >= self.rest_duration:
            # 休息结束
            total_hp_recovery = int(self.hp_recovery_rate * self.rest_duration)
            total_mp_recovery = int(self.mp_recovery_rate * self.rest_duration)
            
            character.hp = min(character.max_hp, character.hp + total_hp_recovery)
            character.mp = min(character.max_mp, character.mp + total_mp_recovery)
            
            print(f"{character.name} 休息结束,恢复了 {total_hp_recovery} HP 和 {total_mp_recovery} MP")
            character.change_state(NormalState())
            return
        
        if action_type == ActionType.TAKE_DAMAGE:
            damage = kwargs.get('damage', 0)
            character.hp -= damage
            print(f"{character.name} 休息被打断!受到 {damage} 点伤害")
            
            if character.hp <= 0:
                character.change_state(DeadState())
            else:
                character.change_state(NormalState())
        
        else:
            print(f"{character.name} 正在休息中,无法执行其他动作")
    
    def can_transition_to(self, new_state):
        return True

class DeadState(GameCharacterState):
    """死亡状态"""
    
    def get_state_name(self):
        return "死亡"
    
    def handle_action(self, character, action_type, **kwargs):
        if action_type == ActionType.HEAL:
            # 复活
            heal_amount = kwargs.get('heal_amount', 0)
            if heal_amount >= 50:  # 需要强力治疗才能复活
                character.hp = heal_amount
                print(f"{character.name} 复活了!当前HP: {character.hp}")
                character.change_state(NormalState())
            else:
                print(f"{character.name} 已死亡,需要更强的治疗才能复活")
        else:
            print(f"{character.name} 已死亡,无法执行任何动作")
    
    def can_transition_to(self, new_state):
        return isinstance(new_state, NormalState)

class GameCharacter:
    """游戏角色类"""
    
    def __init__(self, name, max_hp=100, max_mp=50):
        self.name = name
        self.max_hp = max_hp
        self.max_mp = max_mp
        self.hp = max_hp
        self.mp = max_mp
        self._state = NormalState()
        self._state_history = []
    
    def get_state(self):
        return self._state
    
    def change_state(self, new_state):
        """改变状态"""
        if self._state.can_transition_to(new_state):
            self._state.on_exit(self)
            self._state_history.append((self._state.get_state_name(), time.time()))
            self._state = new_state
            self._state.on_enter(self)
        else:
            print(f"无法从 {self._state.get_state_name()} 状态转换到 {new_state.get_state_name()} 状态")
    
    def perform_action(self, action_type, **kwargs):
        """执行动作"""
        print(f"\n--- {self.name} 尝试执行 {action_type.value} ---")
        self._state.handle_action(self, action_type, **kwargs)
        self._show_status()
    
    def take_damage(self, damage):
        """受到伤害"""
        self.perform_action(ActionType.TAKE_DAMAGE, damage=damage)
    
    def _show_status(self):
        """显示状态信息"""
        print(f"状态: {self._state.get_state_name()} | HP: {self.hp}/{self.max_hp} | MP: {self.mp}/{self.max_mp}")
    
    def get_state_history(self):
        """获取状态历史"""
        return self._state_history

# 使用示例
def demonstrate_state_pattern():
    """演示状态模式"""
    print("=== 状态模式演示:游戏角色状态系统 ===")
    
    # 创建角色
    hero = GameCharacter("英雄", max_hp=100, max_mp=50)
    enemy = GameCharacter("敌人", max_hp=80, max_mp=30)
    
    print(f"\n初始状态:")
    hero._show_status()
    
    # 测试各种动作和状态转换
    print("\n=== 战斗场景 ===")
    
    # 英雄攻击敌人
    hero.perform_action(ActionType.ATTACK, damage=15, target=enemy)
    
    # 英雄使用技能
    hero.perform_action(ActionType.USE_SKILL, skill_name="火球术", mp_cost=20)
    
    # 在冷却状态下尝试再次使用技能
    time.sleep(1)
    hero.perform_action(ActionType.USE_SKILL, skill_name="冰箭术", mp_cost=15)
    
    # 等待冷却结束
    time.sleep(3)
    hero.perform_action(ActionType.ATTACK, damage=12, target=enemy)
    
    # 英雄进入防御状态
    hero.perform_action(ActionType.DEFEND)
    
    # 在防御状态下受到攻击
    time.sleep(1)
    hero.take_damage(20)
    
    # 等待防御状态结束
    time.sleep(3)
    hero.perform_action(ActionType.ATTACK, damage=10, target=enemy)
    
    # 英雄受到大量伤害进入危险状态
    hero.take_damage(60)
    
    # 在危险状态下攻击
    hero.perform_action(ActionType.ATTACK, damage=15, target=enemy)
    
    # 治疗恢复
    hero.perform_action(ActionType.HEAL, heal_amount=30)
    
    # 休息恢复
    hero.perform_action(ActionType.REST)
    
    # 休息被打断
    time.sleep(2)
    hero.take_damage(10)
    
    print("\n=== 状态历史 ===")
    for state_name, timestamp in hero.get_state_history():
        print(f"状态: {state_name} - 时间: {time.ctime(timestamp)}")

if __name__ == "__main__":
    demonstrate_state_pattern()

10.2.5 状态模式的优缺点

优点: 1. 消除条件语句:避免大量的if-else或switch语句 2. 状态转换明确:状态之间的转换逻辑清晰 3. 易于扩展:添加新状态不影响现有代码 4. 职责分离:每个状态类只负责自己的行为 5. 符合开闭原则:对扩展开放,对修改关闭

缺点: 1. 类数量增加:每个状态都需要一个类 2. 状态转换复杂:复杂的状态机可能难以理解 3. 内存开销:需要维护状态对象 4. 设计复杂度:需要仔细设计状态转换规则

10.2.6 适用场景

  1. 对象行为依赖于状态:根据状态执行不同行为
  2. 状态转换复杂:有多个状态和复杂的转换规则
  3. 避免条件语句:替代大量的条件判断
  4. 状态机实现:工作流、游戏状态、设备控制

10.3 策略模式高级应用

10.3.1 策略模式回顾

策略模式定义了一系列算法,把它们一个个封装起来,并且使它们可相互替换。策略模式让算法的变化独立于使用算法的客户。

10.3.2 动态策略选择系统

from abc import ABC, abstractmethod
from typing import Dict, List, Any, Optional
import json
import time
from dataclasses import dataclass
from enum import Enum

class DataFormat(Enum):
    """数据格式枚举"""
    JSON = "json"
    XML = "xml"
    CSV = "csv"
    YAML = "yaml"

class CompressionLevel(Enum):
    """压缩级别枚举"""
    NONE = 0
    LOW = 1
    MEDIUM = 5
    HIGH = 9

@dataclass
class ProcessingContext:
    """处理上下文"""
    data_size: int
    data_format: DataFormat
    performance_priority: bool  # True表示优先性能,False表示优先压缩率
    network_bandwidth: int  # 网络带宽 (Mbps)
    cpu_cores: int
    memory_available: int  # 可用内存 (MB)

class DataProcessingStrategy(ABC):
    """数据处理策略抽象基类"""
    
    @abstractmethod
    def get_strategy_name(self) -> str:
        """获取策略名称"""
        pass
    
    @abstractmethod
    def process_data(self, data: Any, context: ProcessingContext) -> Dict[str, Any]:
        """处理数据"""
        pass
    
    @abstractmethod
    def get_performance_metrics(self, context: ProcessingContext) -> Dict[str, float]:
        """获取性能指标"""
        pass
    
    @abstractmethod
    def is_suitable(self, context: ProcessingContext) -> bool:
        """判断策略是否适合当前上下文"""
        pass

class FastProcessingStrategy(DataProcessingStrategy):
    """快速处理策略"""
    
    def get_strategy_name(self) -> str:
        return "快速处理"
    
    def process_data(self, data: Any, context: ProcessingContext) -> Dict[str, Any]:
        start_time = time.time()
        
        # 模拟快速处理(最小压缩)
        processed_data = {
            'original_data': data,
            'compression_level': CompressionLevel.NONE.value,
            'processing_time': 0.1,  # 模拟处理时间
            'compressed_size': context.data_size * 0.95  # 几乎无压缩
        }
        
        processing_time = time.time() - start_time
        processed_data['actual_processing_time'] = processing_time
        
        return processed_data
    
    def get_performance_metrics(self, context: ProcessingContext) -> Dict[str, float]:
        return {
            'processing_speed': 10.0,  # 处理速度评分
            'compression_ratio': 1.0,  # 压缩比
            'memory_usage': context.data_size * 0.1,  # 内存使用
            'cpu_usage': 20.0  # CPU使用率
        }
    
    def is_suitable(self, context: ProcessingContext) -> bool:
        return (
            context.performance_priority and
            context.network_bandwidth > 100 and  # 高带宽环境
            context.data_size < 1000000  # 小数据量
        )

class BalancedProcessingStrategy(DataProcessingStrategy):
    """平衡处理策略"""
    
    def get_strategy_name(self) -> str:
        return "平衡处理"
    
    def process_data(self, data: Any, context: ProcessingContext) -> Dict[str, Any]:
        start_time = time.time()
        
        # 模拟平衡处理(中等压缩)
        processed_data = {
            'original_data': data,
            'compression_level': CompressionLevel.MEDIUM.value,
            'processing_time': 0.5,  # 模拟处理时间
            'compressed_size': context.data_size * 0.6  # 中等压缩
        }
        
        processing_time = time.time() - start_time
        processed_data['actual_processing_time'] = processing_time
        
        return processed_data
    
    def get_performance_metrics(self, context: ProcessingContext) -> Dict[str, float]:
        return {
            'processing_speed': 6.0,
            'compression_ratio': 0.6,
            'memory_usage': context.data_size * 0.2,
            'cpu_usage': 50.0
        }
    
    def is_suitable(self, context: ProcessingContext) -> bool:
        return (
            context.network_bandwidth >= 50 and
            context.memory_available >= 512 and
            context.data_size <= 10000000
        )

class HighCompressionStrategy(DataProcessingStrategy):
    """高压缩策略"""
    
    def get_strategy_name(self) -> str:
        return "高压缩处理"
    
    def process_data(self, data: Any, context: ProcessingContext) -> Dict[str, Any]:
        start_time = time.time()
        
        # 模拟高压缩处理
        processed_data = {
            'original_data': data,
            'compression_level': CompressionLevel.HIGH.value,
            'processing_time': 2.0,  # 模拟处理时间
            'compressed_size': context.data_size * 0.3  # 高压缩
        }
        
        processing_time = time.time() - start_time
        processed_data['actual_processing_time'] = processing_time
        
        return processed_data
    
    def get_performance_metrics(self, context: ProcessingContext) -> Dict[str, float]:
        return {
            'processing_speed': 2.0,
            'compression_ratio': 0.3,
            'memory_usage': context.data_size * 0.4,
            'cpu_usage': 80.0
        }
    
    def is_suitable(self, context: ProcessingContext) -> bool:
        return (
            not context.performance_priority and
            context.network_bandwidth < 50 and  # 低带宽环境
            context.cpu_cores >= 4  # 多核CPU
        )

class ParallelProcessingStrategy(DataProcessingStrategy):
    """并行处理策略"""
    
    def get_strategy_name(self) -> str:
        return "并行处理"
    
    def process_data(self, data: Any, context: ProcessingContext) -> Dict[str, Any]:
        start_time = time.time()
        
        # 模拟并行处理
        parallel_factor = min(context.cpu_cores, 8)  # 最多使用8个核心
        
        processed_data = {
            'original_data': data,
            'compression_level': CompressionLevel.MEDIUM.value,
            'processing_time': 1.0 / parallel_factor,  # 并行加速
            'compressed_size': context.data_size * 0.5,
            'parallel_workers': parallel_factor
        }
        
        processing_time = time.time() - start_time
        processed_data['actual_processing_time'] = processing_time
        
        return processed_data
    
    def get_performance_metrics(self, context: ProcessingContext) -> Dict[str, float]:
        parallel_factor = min(context.cpu_cores, 8)
        return {
            'processing_speed': 8.0 * parallel_factor / 4,  # 基于并行度的速度
            'compression_ratio': 0.5,
            'memory_usage': context.data_size * 0.3 * parallel_factor,
            'cpu_usage': 70.0
        }
    
    def is_suitable(self, context: ProcessingContext) -> bool:
        return (
            context.cpu_cores >= 4 and
            context.memory_available >= 1024 and
            context.data_size >= 1000000  # 大数据量
        )

class AdaptiveProcessingStrategy(DataProcessingStrategy):
    """自适应处理策略"""
    
    def __init__(self):
        self._performance_history = []
        self._adaptation_threshold = 5  # 适应阈值
    
    def get_strategy_name(self) -> str:
        return "自适应处理"
    
    def process_data(self, data: Any, context: ProcessingContext) -> Dict[str, Any]:
        start_time = time.time()
        
        # 根据历史性能动态调整参数
        compression_level = self._calculate_optimal_compression(context)
        
        processed_data = {
            'original_data': data,
            'compression_level': compression_level,
            'processing_time': self._estimate_processing_time(context, compression_level),
            'compressed_size': context.data_size * (1 - compression_level / 10),
            'adaptation_factor': len(self._performance_history)
        }
        
        processing_time = time.time() - start_time
        processed_data['actual_processing_time'] = processing_time
        
        # 记录性能历史
        self._performance_history.append({
            'context': context,
            'compression_level': compression_level,
            'processing_time': processing_time
        })
        
        # 限制历史记录数量
        if len(self._performance_history) > 100:
            self._performance_history = self._performance_history[-50:]
        
        return processed_data
    
    def _calculate_optimal_compression(self, context: ProcessingContext) -> int:
        """计算最优压缩级别"""
        if len(self._performance_history) < self._adaptation_threshold:
            # 历史数据不足,使用默认策略
            if context.performance_priority:
                return CompressionLevel.LOW.value
            else:
                return CompressionLevel.MEDIUM.value
        
        # 基于历史性能数据计算最优压缩级别
        similar_contexts = [
            record for record in self._performance_history[-20:]
            if self._is_similar_context(record['context'], context)
        ]
        
        if similar_contexts:
            # 找到性能最好的压缩级别
            best_record = min(similar_contexts, key=lambda x: x['processing_time'])
            return best_record['compression_level']
        
        # 没有相似上下文,使用启发式规则
        if context.network_bandwidth < 10:
            return CompressionLevel.HIGH.value
        elif context.performance_priority:
            return CompressionLevel.LOW.value
        else:
            return CompressionLevel.MEDIUM.value
    
    def _is_similar_context(self, ctx1: ProcessingContext, ctx2: ProcessingContext) -> bool:
        """判断两个上下文是否相似"""
        size_ratio = min(ctx1.data_size, ctx2.data_size) / max(ctx1.data_size, ctx2.data_size)
        bandwidth_ratio = min(ctx1.network_bandwidth, ctx2.network_bandwidth) / max(ctx1.network_bandwidth, ctx2.network_bandwidth)
        
        return (
            size_ratio > 0.8 and
            bandwidth_ratio > 0.8 and
            ctx1.performance_priority == ctx2.performance_priority
        )
    
    def _estimate_processing_time(self, context: ProcessingContext, compression_level: int) -> float:
        """估算处理时间"""
        base_time = context.data_size / 1000000  # 基础时间
        compression_factor = 1 + compression_level / 10  # 压缩因子
        cpu_factor = 4 / context.cpu_cores  # CPU因子
        
        return base_time * compression_factor * cpu_factor
    
    def get_performance_metrics(self, context: ProcessingContext) -> Dict[str, float]:
        compression_level = self._calculate_optimal_compression(context)
        
        return {
            'processing_speed': 10 - compression_level,
            'compression_ratio': 1 - compression_level / 10,
            'memory_usage': context.data_size * (0.1 + compression_level / 50),
            'cpu_usage': 30 + compression_level * 5
        }
    
    def is_suitable(self, context: ProcessingContext) -> bool:
        # 自适应策略适用于所有场景
        return True

class StrategySelector:
    """策略选择器"""
    
    def __init__(self):
        self._strategies: Dict[str, DataProcessingStrategy] = {
            'fast': FastProcessingStrategy(),
            'balanced': BalancedProcessingStrategy(),
            'high_compression': HighCompressionStrategy(),
            'parallel': ParallelProcessingStrategy(),
            'adaptive': AdaptiveProcessingStrategy()
        }
        self._selection_history = []
    
    def select_strategy(self, context: ProcessingContext, 
                      strategy_name: Optional[str] = None) -> DataProcessingStrategy:
        """选择策略"""
        if strategy_name and strategy_name in self._strategies:
            selected_strategy = self._strategies[strategy_name]
        else:
            selected_strategy = self._auto_select_strategy(context)
        
        # 记录选择历史
        self._selection_history.append({
            'context': context,
            'strategy': selected_strategy.get_strategy_name(),
            'timestamp': time.time()
        })
        
        return selected_strategy
    
    def _auto_select_strategy(self, context: ProcessingContext) -> DataProcessingStrategy:
        """自动选择策略"""
        suitable_strategies = [
            (name, strategy) for name, strategy in self._strategies.items()
            if strategy.is_suitable(context)
        ]
        
        if not suitable_strategies:
            # 没有合适的策略,返回自适应策略
            return self._strategies['adaptive']
        
        # 根据性能指标选择最佳策略
        best_strategy = None
        best_score = -1
        
        for name, strategy in suitable_strategies:
            metrics = strategy.get_performance_metrics(context)
            score = self._calculate_strategy_score(metrics, context)
            
            if score > best_score:
                best_score = score
                best_strategy = strategy
        
        return best_strategy or self._strategies['adaptive']
    
    def _calculate_strategy_score(self, metrics: Dict[str, float], 
                                context: ProcessingContext) -> float:
        """计算策略评分"""
        if context.performance_priority:
            # 优先考虑处理速度
            speed_weight = 0.6
            compression_weight = 0.2
            memory_weight = 0.1
            cpu_weight = 0.1
        else:
            # 优先考虑压缩率
            speed_weight = 0.2
            compression_weight = 0.6
            memory_weight = 0.1
            cpu_weight = 0.1
        
        # 归一化指标
        speed_score = min(metrics['processing_speed'] / 10, 1.0)
        compression_score = 1 - metrics['compression_ratio']  # 压缩率越高分数越高
        memory_score = max(0, 1 - metrics['memory_usage'] / context.memory_available)
        cpu_score = max(0, 1 - metrics['cpu_usage'] / 100)
        
        total_score = (
            speed_score * speed_weight +
            compression_score * compression_weight +
            memory_score * memory_weight +
            cpu_score * cpu_weight
        )
        
        return total_score
    
    def get_strategy_recommendations(self, context: ProcessingContext) -> List[Dict[str, Any]]:
        """获取策略推荐"""
        recommendations = []
        
        for name, strategy in self._strategies.items():
            metrics = strategy.get_performance_metrics(context)
            score = self._calculate_strategy_score(metrics, context)
            
            recommendations.append({
                'strategy_name': strategy.get_strategy_name(),
                'strategy_key': name,
                'score': score,
                'metrics': metrics,
                'suitable': strategy.is_suitable(context)
            })
        
        # 按评分排序
        recommendations.sort(key=lambda x: x['score'], reverse=True)
        return recommendations

class DataProcessor:
    """数据处理器"""
    
    def __init__(self):
        self._strategy_selector = StrategySelector()
        self._current_strategy: Optional[DataProcessingStrategy] = None
    
    def process(self, data: Any, context: ProcessingContext, 
               strategy_name: Optional[str] = None) -> Dict[str, Any]:
        """处理数据"""
        # 选择策略
        self._current_strategy = self._strategy_selector.select_strategy(context, strategy_name)
        
        print(f"选择策略: {self._current_strategy.get_strategy_name()}")
        
        # 处理数据
        result = self._current_strategy.process_data(data, context)
        
        # 添加策略信息
        result['strategy_used'] = self._current_strategy.get_strategy_name()
        result['context'] = context
        
        return result
    
    def get_strategy_recommendations(self, context: ProcessingContext) -> List[Dict[str, Any]]:
        """获取策略推荐"""
        return self._strategy_selector.get_strategy_recommendations(context)
    
    def analyze_performance(self, results: List[Dict[str, Any]]) -> Dict[str, Any]:
        """分析性能"""
        if not results:
            return {}
        
        strategy_performance = {}
        
        for result in results:
            strategy_name = result['strategy_used']
            if strategy_name not in strategy_performance:
                strategy_performance[strategy_name] = {
                    'count': 0,
                    'total_time': 0,
                    'total_compression': 0,
                    'avg_time': 0,
                    'avg_compression': 0
                }
            
            perf = strategy_performance[strategy_name]
            perf['count'] += 1
            perf['total_time'] += result['actual_processing_time']
            perf['total_compression'] += result['compressed_size'] / result['context'].data_size
        
        # 计算平均值
        for strategy_name, perf in strategy_performance.items():
            perf['avg_time'] = perf['total_time'] / perf['count']
            perf['avg_compression'] = perf['total_compression'] / perf['count']
        
        return strategy_performance

# 使用示例
def demonstrate_advanced_strategy_pattern():
    """演示高级策略模式"""
    print("=== 高级策略模式演示:动态数据处理系统 ===")
    
    processor = DataProcessor()
    
    # 测试不同的处理场景
    scenarios = [
        {
            'name': '小文件高性能场景',
            'context': ProcessingContext(
                data_size=50000,
                data_format=DataFormat.JSON,
                performance_priority=True,
                network_bandwidth=200,
                cpu_cores=4,
                memory_available=2048
            ),
            'data': {'small_data': 'test' * 100}
        },
        {
            'name': '大文件低带宽场景',
            'context': ProcessingContext(
                data_size=5000000,
                data_format=DataFormat.XML,
                performance_priority=False,
                network_bandwidth=10,
                cpu_cores=8,
                memory_available=4096
            ),
            'data': {'large_data': 'content' * 10000}
        },
        {
            'name': '多核并行场景',
            'context': ProcessingContext(
                data_size=2000000,
                data_format=DataFormat.CSV,
                performance_priority=True,
                network_bandwidth=100,
                cpu_cores=16,
                memory_available=8192
            ),
            'data': {'parallel_data': list(range(10000))}
        }
    ]
    
    results = []
    
    for scenario in scenarios:
        print(f"\n=== {scenario['name']} ===")
        
        # 获取策略推荐
        recommendations = processor.get_strategy_recommendations(scenario['context'])
        print("\n策略推荐排序:")
        for i, rec in enumerate(recommendations[:3], 1):
            print(f"{i}. {rec['strategy_name']} (评分: {rec['score']:.3f})")
            print(f"   适合度: {'是' if rec['suitable'] else '否'}")
            print(f"   性能指标: 速度={rec['metrics']['processing_speed']:.1f}, "
                  f"压缩率={rec['metrics']['compression_ratio']:.1f}")
        
        # 自动选择策略处理
        result = processor.process(scenario['data'], scenario['context'])
        results.append(result)
        
        print(f"\n处理结果:")
        print(f"使用策略: {result['strategy_used']}")
        print(f"处理时间: {result['actual_processing_time']:.4f}秒")
        print(f"压缩后大小: {result['compressed_size']:.0f} bytes")
        print(f"压缩率: {(1 - result['compressed_size']/scenario['context'].data_size)*100:.1f}%")
    
    # 性能分析
    print("\n=== 性能分析 ===")
    performance_analysis = processor.analyze_performance(results)
    
    for strategy_name, perf in performance_analysis.items():
        print(f"\n策略: {strategy_name}")
        print(f"使用次数: {perf['count']}")
        print(f"平均处理时间: {perf['avg_time']:.4f}秒")
        print(f"平均压缩率: {(1-perf['avg_compression'])*100:.1f}%")

if __name__ == "__main__":
    demonstrate_advanced_strategy_pattern()

10.3.3 策略模式的优缺点

优点: 1. 算法可替换:运行时可以动态切换算法 2. 避免条件语句:消除大量的if-else或switch语句 3. 易于扩展:添加新策略不影响现有代码 4. 算法独立:每个策略都是独立的,便于测试和维护 5. 符合开闭原则:对扩展开放,对修改关闭

缺点: 1. 策略类增多:每个算法都需要一个策略类 2. 客户端复杂:客户端需要了解不同策略的区别 3. 通信开销:策略和上下文之间可能有额外的通信开销 4. 对象数量增加:运行时需要维护多个策略对象

10.3.4 适用场景

  1. 算法选择:需要在运行时选择不同的算法
  2. 条件语句替代:替代复杂的条件判断逻辑
  3. 算法变化频繁:算法经常需要修改或扩展
  4. 算法保密:不同的算法实现需要保密
  5. 性能优化:根据环境动态选择最优算法

10.4 状态模式与策略模式对比分析

10.4.1 相同点

  1. 行为型模式:都属于行为型设计模式
  2. 多态应用:都使用多态来实现不同的行为
  3. 消除条件语句:都能避免大量的条件判断
  4. 易于扩展:都符合开闭原则,便于添加新的实现
  5. 封装变化:都将变化的部分封装起来

10.4.2 不同点

对比维度 状态模式 策略模式
主要目的 管理对象状态变化 封装算法族
变化原因 内部状态改变 外部选择不同算法
转换控制 状态自己控制转换 客户端控制选择
对象关系 状态间有转换关系 策略间相互独立
生命周期 状态可能短暂存在 策略通常长期存在
上下文依赖 强依赖上下文状态 弱依赖上下文
使用方式 自动状态转换 手动策略选择

10.4.3 选择指南

选择状态模式的情况: - 对象行为依赖于内部状态 - 状态之间有明确的转换规则 - 需要避免大量的状态相关条件语句 - 状态转换逻辑复杂

选择策略模式的情况: - 需要在运行时选择算法 - 有多种方式实现同一功能 - 算法变化频繁 - 需要隐藏算法的具体实现

10.4.4 组合使用

状态模式和策略模式可以结合使用,创建更强大的系统:

class StatefulStrategy(ABC):
    """有状态的策略接口"""
    
    @abstractmethod
    def execute(self, context, data):
        """执行策略"""
        pass
    
    @abstractmethod
    def get_next_state(self, context, result):
        """根据执行结果获取下一个状态"""
        pass

class ProcessingState(ABC):
    """处理状态接口"""
    
    @abstractmethod
    def process(self, context, data):
        """处理数据"""
        pass
    
    @abstractmethod
    def select_strategy(self, context):
        """选择策略"""
        pass

class InitialProcessingState(ProcessingState):
    """初始处理状态"""
    
    def process(self, context, data):
        print("初始处理阶段")
        strategy = self.select_strategy(context)
        result = strategy.execute(context, data)
        
        # 根据结果转换状态
        next_state = strategy.get_next_state(context, result)
        if next_state:
            context.change_state(next_state)
        
        return result
    
    def select_strategy(self, context):
        # 根据上下文选择初始处理策略
        if context.data_size > 1000000:
            return ParallelInitialStrategy()
        else:
            return SimpleInitialStrategy()

class ValidationProcessingState(ProcessingState):
    """验证处理状态"""
    
    def process(self, context, data):
        print("验证处理阶段")
        strategy = self.select_strategy(context)
        result = strategy.execute(context, data)
        
        next_state = strategy.get_next_state(context, result)
        if next_state:
            context.change_state(next_state)
        
        return result
    
    def select_strategy(self, context):
        # 根据数据类型选择验证策略
        if context.data_format == DataFormat.JSON:
            return JsonValidationStrategy()
        elif context.data_format == DataFormat.XML:
            return XmlValidationStrategy()
        else:
            return GenericValidationStrategy()

class StatefulProcessingContext:
    """有状态的处理上下文"""
    
    def __init__(self, data_size, data_format):
        self.data_size = data_size
        self.data_format = data_format
        self._state = InitialProcessingState()
        self._processing_history = []
    
    def change_state(self, new_state):
        """改变状态"""
        self._processing_history.append(self._state.__class__.__name__)
        self._state = new_state
        print(f"状态转换到: {new_state.__class__.__name__}")
    
    def process(self, data):
        """处理数据"""
        return self._state.process(self, data)
    
    def get_processing_history(self):
        """获取处理历史"""
        return self._processing_history

# 具体策略实现
class SimpleInitialStrategy(StatefulStrategy):
    def execute(self, context, data):
        print("执行简单初始处理")
        return {'processed': True, 'method': 'simple'}
    
    def get_next_state(self, context, result):
        if result['processed']:
            return ValidationProcessingState()
        return None

class ParallelInitialStrategy(StatefulStrategy):
    def execute(self, context, data):
        print("执行并行初始处理")
        return {'processed': True, 'method': 'parallel'}
    
    def get_next_state(self, context, result):
        if result['processed']:
            return ValidationProcessingState()
        return None

class JsonValidationStrategy(StatefulStrategy):
    def execute(self, context, data):
        print("执行JSON验证")
        return {'validated': True, 'format': 'json'}
    
    def get_next_state(self, context, result):
        # 验证完成,处理结束
        return None

class XmlValidationStrategy(StatefulStrategy):
    def execute(self, context, data):
        print("执行XML验证")
        return {'validated': True, 'format': 'xml'}
    
    def get_next_state(self, context, result):
        return None

class GenericValidationStrategy(StatefulStrategy):
    def execute(self, context, data):
        print("执行通用验证")
        return {'validated': True, 'format': 'generic'}
    
    def get_next_state(self, context, result):
        return None

def demonstrate_combined_patterns():
    """演示状态模式与策略模式的组合使用"""
    print("=== 状态模式与策略模式组合演示 ===")
    
    # 小数据JSON处理
    print("\n--- 小数据JSON处理 ---")
    context1 = StatefulProcessingContext(50000, DataFormat.JSON)
    result1 = context1.process({'data': 'small json data'})
    print(f"处理结果: {result1}")
    print(f"处理历史: {context1.get_processing_history()}")
    
    # 大数据XML处理
    print("\n--- 大数据XML处理 ---")
    context2 = StatefulProcessingContext(2000000, DataFormat.XML)
    result2 = context2.process({'data': 'large xml data'})
    print(f"处理结果: {result2}")
    print(f"处理历史: {context2.get_processing_history()}")

if __name__ == "__main__":
    demonstrate_combined_patterns()

10.5 高级应用技巧

10.5.1 状态机设计最佳实践

  1. 状态转换表:使用表格明确定义状态转换规则
  2. 状态验证:在状态转换前进行有效性检查
  3. 状态持久化:支持状态的保存和恢复
  4. 状态监控:记录状态转换历史,便于调试
  5. 异常处理:定义异常状态和恢复机制

10.5.2 策略选择优化

  1. 性能基准测试:建立策略性能基准
  2. 自适应选择:根据历史性能动态调整
  3. 策略缓存:缓存策略实例减少创建开销
  4. 策略组合:支持多个策略的组合使用
  5. 策略降级:在策略失败时提供备选方案

10.5.3 内存和性能优化

class OptimizedStateManager:
    """优化的状态管理器"""
    
    def __init__(self):
        self._state_pool = {}  # 状态对象池
        self._transition_cache = {}  # 转换缓存
    
    def get_state(self, state_class):
        """从对象池获取状态"""
        state_name = state_class.__name__
        if state_name not in self._state_pool:
            self._state_pool[state_name] = state_class()
        return self._state_pool[state_name]
    
    def can_transition(self, from_state, to_state):
        """检查状态转换(带缓存)"""
        cache_key = (from_state.__class__.__name__, to_state.__class__.__name__)
        
        if cache_key not in self._transition_cache:
            self._transition_cache[cache_key] = from_state.can_transition_to(to_state)
        
        return self._transition_cache[cache_key]

class OptimizedStrategyRegistry:
    """优化的策略注册表"""
    
    def __init__(self):
        self._strategies = {}
        self._strategy_cache = {}
        self._performance_metrics = {}
    
    def register_strategy(self, name, strategy_class):
        """注册策略"""
        self._strategies[name] = strategy_class
    
    def get_strategy(self, name, context_hash=None):
        """获取策略(带缓存)"""
        cache_key = (name, context_hash)
        
        if cache_key not in self._strategy_cache:
            if name in self._strategies:
                self._strategy_cache[cache_key] = self._strategies[name]()
            else:
                raise ValueError(f"未知策略: {name}")
        
        return self._strategy_cache[cache_key]
    
    def update_performance_metrics(self, strategy_name, metrics):
        """更新性能指标"""
        if strategy_name not in self._performance_metrics:
            self._performance_metrics[strategy_name] = []
        
        self._performance_metrics[strategy_name].append(metrics)
        
        # 保持最近100条记录
        if len(self._performance_metrics[strategy_name]) > 100:
            self._performance_metrics[strategy_name] = self._performance_metrics[strategy_name][-50:]

10.6 实际应用案例

10.6.1 游戏AI状态机

class GameAIState(ABC):
    """游戏AI状态基类"""
    
    @abstractmethod
    def update(self, ai_agent, delta_time):
        """更新状态"""
        pass
    
    @abstractmethod
    def on_enter(self, ai_agent):
        """进入状态"""
        pass
    
    @abstractmethod
    def on_exit(self, ai_agent):
        """离开状态"""
        pass

class PatrolState(GameAIState):
    """巡逻状态"""
    
    def __init__(self):
        self.patrol_points = [(0, 0), (10, 0), (10, 10), (0, 10)]
        self.current_target = 0
    
    def update(self, ai_agent, delta_time):
        # 移动到巡逻点
        target = self.patrol_points[self.current_target]
        
        if ai_agent.move_towards(target, delta_time):
            self.current_target = (self.current_target + 1) % len(self.patrol_points)
        
        # 检查是否发现敌人
        if ai_agent.detect_enemy():
            ai_agent.change_state(ChaseState())
    
    def on_enter(self, ai_agent):
        print(f"{ai_agent.name} 开始巡逻")
    
    def on_exit(self, ai_agent):
        print(f"{ai_agent.name} 停止巡逻")

class ChaseState(GameAIState):
    """追击状态"""
    
    def __init__(self):
        self.chase_duration = 0
        self.max_chase_time = 10.0
    
    def update(self, ai_agent, delta_time):
        self.chase_duration += delta_time
        
        enemy = ai_agent.get_nearest_enemy()
        if enemy:
            if ai_agent.get_distance_to(enemy) < ai_agent.attack_range:
                ai_agent.change_state(AttackState())
            else:
                ai_agent.move_towards(enemy.position, delta_time)
        else:
            # 失去目标,返回巡逻
            ai_agent.change_state(PatrolState())
        
        # 追击超时
        if self.chase_duration > self.max_chase_time:
            ai_agent.change_state(PatrolState())
    
    def on_enter(self, ai_agent):
        print(f"{ai_agent.name} 开始追击")
        self.chase_duration = 0
    
    def on_exit(self, ai_agent):
        print(f"{ai_agent.name} 停止追击")

class AttackState(GameAIState):
    """攻击状态"""
    
    def __init__(self):
        self.attack_cooldown = 0
        self.attack_interval = 1.0
    
    def update(self, ai_agent, delta_time):
        self.attack_cooldown -= delta_time
        
        enemy = ai_agent.get_nearest_enemy()
        if enemy:
            if ai_agent.get_distance_to(enemy) <= ai_agent.attack_range:
                if self.attack_cooldown <= 0:
                    ai_agent.attack(enemy)
                    self.attack_cooldown = self.attack_interval
            else:
                # 敌人超出攻击范围,继续追击
                ai_agent.change_state(ChaseState())
        else:
            # 没有敌人,返回巡逻
            ai_agent.change_state(PatrolState())
    
    def on_enter(self, ai_agent):
        print(f"{ai_agent.name} 开始攻击")
    
    def on_exit(self, ai_agent):
        print(f"{ai_agent.name} 停止攻击")

10.6.2 支付系统策略模式

class PaymentStrategy(ABC):
    """支付策略接口"""
    
    @abstractmethod
    def pay(self, amount, currency, payment_info):
        """执行支付"""
        pass
    
    @abstractmethod
    def validate_payment_info(self, payment_info):
        """验证支付信息"""
        pass
    
    @abstractmethod
    def get_transaction_fee(self, amount, currency):
        """获取交易费用"""
        pass

class CreditCardPayment(PaymentStrategy):
    """信用卡支付"""
    
    def pay(self, amount, currency, payment_info):
        if not self.validate_payment_info(payment_info):
            raise ValueError("信用卡信息无效")
        
        fee = self.get_transaction_fee(amount, currency)
        total_amount = amount + fee
        
        # 模拟信用卡支付
        print(f"信用卡支付: {amount} {currency}, 手续费: {fee} {currency}")
        return {
            'success': True,
            'transaction_id': f"CC_{int(time.time())}",
            'amount': amount,
            'fee': fee,
            'total': total_amount
        }
    
    def validate_payment_info(self, payment_info):
        required_fields = ['card_number', 'expiry_date', 'cvv', 'holder_name']
        return all(field in payment_info for field in required_fields)
    
    def get_transaction_fee(self, amount, currency):
        return amount * 0.025  # 2.5% 手续费

class PayPalPayment(PaymentStrategy):
    """PayPal支付"""
    
    def pay(self, amount, currency, payment_info):
        if not self.validate_payment_info(payment_info):
            raise ValueError("PayPal信息无效")
        
        fee = self.get_transaction_fee(amount, currency)
        total_amount = amount + fee
        
        print(f"PayPal支付: {amount} {currency}, 手续费: {fee} {currency}")
        return {
            'success': True,
            'transaction_id': f"PP_{int(time.time())}",
            'amount': amount,
            'fee': fee,
            'total': total_amount
        }
    
    def validate_payment_info(self, payment_info):
        return 'email' in payment_info and 'password' in payment_info
    
    def get_transaction_fee(self, amount, currency):
        return amount * 0.029 + 0.30  # 2.9% + $0.30

class CryptocurrencyPayment(PaymentStrategy):
    """加密货币支付"""
    
    def pay(self, amount, currency, payment_info):
        if not self.validate_payment_info(payment_info):
            raise ValueError("加密货币钱包信息无效")
        
        fee = self.get_transaction_fee(amount, currency)
        total_amount = amount + fee
        
        print(f"加密货币支付: {amount} {currency}, 网络费: {fee} {currency}")
        return {
            'success': True,
            'transaction_id': f"CRYPTO_{int(time.time())}",
            'amount': amount,
            'fee': fee,
            'total': total_amount
        }
    
    def validate_payment_info(self, payment_info):
        return 'wallet_address' in payment_info and 'private_key' in payment_info
    
    def get_transaction_fee(self, amount, currency):
        return 0.001  # 固定网络费用

class PaymentProcessor:
    """支付处理器"""
    
    def __init__(self):
        self._strategies = {
            'credit_card': CreditCardPayment(),
            'paypal': PayPalPayment(),
            'cryptocurrency': CryptocurrencyPayment()
        }
    
    def process_payment(self, payment_method, amount, currency, payment_info):
        """处理支付"""
        if payment_method not in self._strategies:
            raise ValueError(f"不支持的支付方式: {payment_method}")
        
        strategy = self._strategies[payment_method]
        return strategy.pay(amount, currency, payment_info)
    
    def get_payment_options(self, amount, currency):
        """获取支付选项"""
        options = []
        
        for method, strategy in self._strategies.items():
            fee = strategy.get_transaction_fee(amount, currency)
            options.append({
                'method': method,
                'fee': fee,
                'total': amount + fee
            })
        
        return sorted(options, key=lambda x: x['total'])

10.7 本章总结

10.7.1 核心概念回顾

状态模式: - 将对象的状态封装成独立的类 - 状态转换由状态对象自己控制 - 适用于复杂的状态机实现 - 消除了大量的条件判断语句

策略模式: - 将算法封装成独立的策略类 - 策略选择由客户端控制 - 适用于算法的动态选择 - 支持算法的运行时替换

10.7.2 最佳实践

  1. 状态模式最佳实践:

    • 明确定义状态转换规则
    • 使用状态对象池优化性能
    • 实现状态的持久化和恢复
    • 添加状态转换的日志记录
  2. 策略模式最佳实践:

    • 建立策略性能基准测试
    • 实现策略的自适应选择
    • 使用策略注册表管理策略
    • 支持策略的组合使用

10.7.3 实际应用建议

  1. 选择合适的模式:

    • 根据问题的本质选择模式
    • 考虑系统的复杂度和维护成本
    • 评估性能和内存影响
  2. 模式组合使用:

    • 状态模式可以与策略模式结合
    • 在状态转换中使用策略选择
    • 创建更灵活的系统架构

10.7.4 注意事项

  1. 状态模式注意事项:

    • 避免状态类过多导致的复杂性
    • 正确处理状态转换的异常情况
    • 考虑状态对象的生命周期管理
  2. 策略模式注意事项:

    • 避免策略选择逻辑过于复杂
    • 合理管理策略对象的创建和销毁
    • 考虑策略执行的线程安全性

10.8 练习题

10.8.1 基础练习

  1. 状态模式练习:

    • 实现一个简单的订单状态机(待支付、已支付、已发货、已完成、已取消)
    • 实现一个电梯控制系统的状态机
  2. 策略模式练习:

    • 实现一个图像压缩系统,支持不同的压缩算法
    • 实现一个路径规划系统,支持不同的路径算法

10.8.2 进阶练习

  1. 状态模式进阶:

    • 实现一个支持状态持久化的工作流引擎
    • 实现一个多线程安全的状态机
  2. 策略模式进阶:

    • 实现一个自适应的负载均衡策略系统
    • 实现一个支持策略组合的数据处理框架

10.8.3 思考题

  1. 如何在状态模式中处理状态转换的回滚操作?
  2. 策略模式中如何实现策略的热插拔功能?
  3. 如何设计一个既支持状态转换又支持策略选择的系统?
  4. 在什么情况下应该考虑使用状态模式和策略模式的组合?

下一章预告: 第十一章将学习责任链模式和中介者模式,探讨对象间的通信和协作机制。 “`