10.1 本章概述
本章将深入探讨状态模式和策略模式的高级应用,包括状态机的设计、策略的动态切换、模式的组合使用等内容。这两种模式都属于行为型设计模式,在处理对象行为变化方面有着重要作用。
10.1.1 学习目标
- 深入理解状态模式的核心思想和实现方式
- 掌握策略模式的高级应用技巧
- 学会设计复杂的状态机系统
- 了解两种模式的组合使用方法
- 掌握模式在实际项目中的应用技巧
10.1.2 应用场景
- 状态模式:游戏角色状态、订单流程、设备控制、工作流引擎
- 策略模式:算法选择、支付方式、数据处理、业务规则
10.2 状态模式深入解析
10.2.1 状态模式定义
状态模式(State Pattern)允许对象在内部状态改变时改变它的行为,对象看起来好像修改了它的类。这种模式将状态封装成独立的类,并将动作委托到代表当前状态的对象。
10.2.2 状态模式的动机
在软件开发中,我们经常遇到对象需要根据不同状态表现出不同行为的情况。传统的做法是使用大量的条件语句,但这会导致代码难以维护和扩展。状态模式通过将每个状态封装成独立的类来解决这个问题。
10.2.3 状态模式结构
┌─────────────────┐
│ Context │
│ - state │
│ + request() │
│ + setState() │
└─────────────────┘
│
▼
┌─────────────────┐
│ State │
│ + handle() │
└─────────────────┘
△
│
┌────┴────┐
│ │
┌───▼───┐ ┌──▼────┐
│StateA │ │StateB │
│handle()│ │handle()│
└───────┘ └───────┘
10.2.4 Python实现:游戏角色状态系统
from abc import ABC, abstractmethod
from enum import Enum
import time
import random
class ActionType(Enum):
"""动作类型枚举"""
ATTACK = "attack"
DEFEND = "defend"
MOVE = "move"
REST = "rest"
USE_SKILL = "use_skill"
TAKE_DAMAGE = "take_damage"
HEAL = "heal"
class GameCharacterState(ABC):
"""游戏角色状态抽象基类"""
@abstractmethod
def get_state_name(self):
"""获取状态名称"""
pass
@abstractmethod
def handle_action(self, character, action_type, **kwargs):
"""处理动作"""
pass
@abstractmethod
def can_transition_to(self, new_state):
"""检查是否可以转换到新状态"""
pass
def on_enter(self, character):
"""进入状态时的处理"""
print(f"[状态] {character.name} 进入 {self.get_state_name()} 状态")
def on_exit(self, character):
"""离开状态时的处理"""
print(f"[状态] {character.name} 离开 {self.get_state_name()} 状态")
class NormalState(GameCharacterState):
"""正常状态"""
def get_state_name(self):
return "正常"
def handle_action(self, character, action_type, **kwargs):
if action_type == ActionType.ATTACK:
damage = kwargs.get('damage', 10)
target = kwargs.get('target')
if target:
print(f"{character.name} 攻击 {target.name},造成 {damage} 点伤害")
target.take_damage(damage)
# 攻击后可能进入疲劳状态
if random.random() < 0.3:
character.change_state(FatiguedState())
elif action_type == ActionType.DEFEND:
print(f"{character.name} 进入防御姿态")
character.change_state(DefendingState())
elif action_type == ActionType.MOVE:
distance = kwargs.get('distance', 1)
print(f"{character.name} 移动了 {distance} 米")
elif action_type == ActionType.USE_SKILL:
skill_name = kwargs.get('skill_name', '普通技能')
mp_cost = kwargs.get('mp_cost', 20)
if character.mp >= mp_cost:
character.mp -= mp_cost
print(f"{character.name} 使用技能:{skill_name}")
# 使用技能后进入冷却状态
character.change_state(CooldownState())
else:
print(f"{character.name} MP不足,无法使用技能")
elif action_type == ActionType.TAKE_DAMAGE:
damage = kwargs.get('damage', 0)
character.hp -= damage
print(f"{character.name} 受到 {damage} 点伤害,剩余HP: {character.hp}")
if character.hp <= 0:
character.change_state(DeadState())
elif character.hp < character.max_hp * 0.3:
character.change_state(CriticalState())
elif action_type == ActionType.REST:
print(f"{character.name} 开始休息")
character.change_state(RestingState())
def can_transition_to(self, new_state):
# 正常状态可以转换到任何其他状态
return not isinstance(new_state, NormalState)
class DefendingState(GameCharacterState):
"""防御状态"""
def __init__(self):
self.defense_bonus = 0.5 # 防御加成
self.duration = 3 # 防御持续时间
self.start_time = time.time()
def get_state_name(self):
return "防御"
def handle_action(self, character, action_type, **kwargs):
# 检查防御状态是否过期
if time.time() - self.start_time > self.duration:
print(f"{character.name} 防御状态结束")
character.change_state(NormalState())
return
if action_type == ActionType.TAKE_DAMAGE:
damage = kwargs.get('damage', 0)
reduced_damage = int(damage * (1 - self.defense_bonus))
character.hp -= reduced_damage
print(f"{character.name} 防御中,伤害减少到 {reduced_damage} 点,剩余HP: {character.hp}")
if character.hp <= 0:
character.change_state(DeadState())
elif action_type == ActionType.ATTACK:
print(f"{character.name} 在防御状态下无法攻击")
else:
print(f"{character.name} 在防御状态下无法执行 {action_type.value} 动作")
def can_transition_to(self, new_state):
return isinstance(new_state, (NormalState, DeadState, CriticalState))
class CriticalState(GameCharacterState):
"""危险状态(生命值过低)"""
def get_state_name(self):
return "危险"
def handle_action(self, character, action_type, **kwargs):
if action_type == ActionType.ATTACK:
# 危险状态下攻击力降低
damage = kwargs.get('damage', 10) * 0.7
target = kwargs.get('target')
if target:
print(f"{character.name} 虚弱地攻击 {target.name},造成 {int(damage)} 点伤害")
target.take_damage(int(damage))
elif action_type == ActionType.HEAL:
heal_amount = kwargs.get('heal_amount', 20)
character.hp = min(character.max_hp, character.hp + heal_amount)
print(f"{character.name} 恢复了 {heal_amount} 点生命值,当前HP: {character.hp}")
if character.hp >= character.max_hp * 0.3:
character.change_state(NormalState())
elif action_type == ActionType.TAKE_DAMAGE:
damage = kwargs.get('damage', 0)
character.hp -= damage
print(f"{character.name} 受到 {damage} 点伤害,剩余HP: {character.hp}")
if character.hp <= 0:
character.change_state(DeadState())
elif action_type == ActionType.REST:
print(f"{character.name} 在危险状态下开始紧急休息")
character.change_state(RestingState())
else:
print(f"{character.name} 在危险状态下行动受限")
def can_transition_to(self, new_state):
return True
class FatiguedState(GameCharacterState):
"""疲劳状态"""
def __init__(self):
self.recovery_time = 5 # 恢复时间
self.start_time = time.time()
def get_state_name(self):
return "疲劳"
def handle_action(self, character, action_type, **kwargs):
# 检查是否自动恢复
if time.time() - self.start_time > self.recovery_time:
print(f"{character.name} 从疲劳状态中恢复")
character.change_state(NormalState())
return
if action_type == ActionType.ATTACK:
print(f"{character.name} 太疲劳了,攻击失败")
elif action_type == ActionType.REST:
print(f"{character.name} 开始休息以恢复体力")
character.change_state(RestingState())
elif action_type == ActionType.TAKE_DAMAGE:
damage = kwargs.get('damage', 0)
character.hp -= damage
print(f"{character.name} 受到 {damage} 点伤害,剩余HP: {character.hp}")
if character.hp <= 0:
character.change_state(DeadState())
elif character.hp < character.max_hp * 0.3:
character.change_state(CriticalState())
else:
print(f"{character.name} 疲劳中,无法执行 {action_type.value} 动作")
def can_transition_to(self, new_state):
return True
class CooldownState(GameCharacterState):
"""技能冷却状态"""
def __init__(self):
self.cooldown_time = 3 # 冷却时间
self.start_time = time.time()
def get_state_name(self):
return "技能冷却"
def handle_action(self, character, action_type, **kwargs):
# 检查冷却是否结束
if time.time() - self.start_time > self.cooldown_time:
print(f"{character.name} 技能冷却结束")
character.change_state(NormalState())
return
if action_type == ActionType.USE_SKILL:
remaining_time = self.cooldown_time - (time.time() - self.start_time)
print(f"{character.name} 技能还在冷却中,剩余 {remaining_time:.1f} 秒")
elif action_type == ActionType.ATTACK:
damage = kwargs.get('damage', 10)
target = kwargs.get('target')
if target:
print(f"{character.name} 普通攻击 {target.name},造成 {damage} 点伤害")
target.take_damage(damage)
elif action_type == ActionType.TAKE_DAMAGE:
damage = kwargs.get('damage', 0)
character.hp -= damage
print(f"{character.name} 受到 {damage} 点伤害,剩余HP: {character.hp}")
if character.hp <= 0:
character.change_state(DeadState())
elif character.hp < character.max_hp * 0.3:
character.change_state(CriticalState())
else:
# 其他动作正常执行
character.get_state().handle_action(character, action_type, **kwargs)
def can_transition_to(self, new_state):
return True
class RestingState(GameCharacterState):
"""休息状态"""
def __init__(self):
self.rest_duration = 4 # 休息持续时间
self.start_time = time.time()
self.hp_recovery_rate = 5 # 每秒恢复的HP
self.mp_recovery_rate = 10 # 每秒恢复的MP
def get_state_name(self):
return "休息"
def handle_action(self, character, action_type, **kwargs):
# 计算休息时间和恢复量
elapsed_time = time.time() - self.start_time
if elapsed_time >= self.rest_duration:
# 休息结束
total_hp_recovery = int(self.hp_recovery_rate * self.rest_duration)
total_mp_recovery = int(self.mp_recovery_rate * self.rest_duration)
character.hp = min(character.max_hp, character.hp + total_hp_recovery)
character.mp = min(character.max_mp, character.mp + total_mp_recovery)
print(f"{character.name} 休息结束,恢复了 {total_hp_recovery} HP 和 {total_mp_recovery} MP")
character.change_state(NormalState())
return
if action_type == ActionType.TAKE_DAMAGE:
damage = kwargs.get('damage', 0)
character.hp -= damage
print(f"{character.name} 休息被打断!受到 {damage} 点伤害")
if character.hp <= 0:
character.change_state(DeadState())
else:
character.change_state(NormalState())
else:
print(f"{character.name} 正在休息中,无法执行其他动作")
def can_transition_to(self, new_state):
return True
class DeadState(GameCharacterState):
"""死亡状态"""
def get_state_name(self):
return "死亡"
def handle_action(self, character, action_type, **kwargs):
if action_type == ActionType.HEAL:
# 复活
heal_amount = kwargs.get('heal_amount', 0)
if heal_amount >= 50: # 需要强力治疗才能复活
character.hp = heal_amount
print(f"{character.name} 复活了!当前HP: {character.hp}")
character.change_state(NormalState())
else:
print(f"{character.name} 已死亡,需要更强的治疗才能复活")
else:
print(f"{character.name} 已死亡,无法执行任何动作")
def can_transition_to(self, new_state):
return isinstance(new_state, NormalState)
class GameCharacter:
"""游戏角色类"""
def __init__(self, name, max_hp=100, max_mp=50):
self.name = name
self.max_hp = max_hp
self.max_mp = max_mp
self.hp = max_hp
self.mp = max_mp
self._state = NormalState()
self._state_history = []
def get_state(self):
return self._state
def change_state(self, new_state):
"""改变状态"""
if self._state.can_transition_to(new_state):
self._state.on_exit(self)
self._state_history.append((self._state.get_state_name(), time.time()))
self._state = new_state
self._state.on_enter(self)
else:
print(f"无法从 {self._state.get_state_name()} 状态转换到 {new_state.get_state_name()} 状态")
def perform_action(self, action_type, **kwargs):
"""执行动作"""
print(f"\n--- {self.name} 尝试执行 {action_type.value} ---")
self._state.handle_action(self, action_type, **kwargs)
self._show_status()
def take_damage(self, damage):
"""受到伤害"""
self.perform_action(ActionType.TAKE_DAMAGE, damage=damage)
def _show_status(self):
"""显示状态信息"""
print(f"状态: {self._state.get_state_name()} | HP: {self.hp}/{self.max_hp} | MP: {self.mp}/{self.max_mp}")
def get_state_history(self):
"""获取状态历史"""
return self._state_history
# 使用示例
def demonstrate_state_pattern():
"""演示状态模式"""
print("=== 状态模式演示:游戏角色状态系统 ===")
# 创建角色
hero = GameCharacter("英雄", max_hp=100, max_mp=50)
enemy = GameCharacter("敌人", max_hp=80, max_mp=30)
print(f"\n初始状态:")
hero._show_status()
# 测试各种动作和状态转换
print("\n=== 战斗场景 ===")
# 英雄攻击敌人
hero.perform_action(ActionType.ATTACK, damage=15, target=enemy)
# 英雄使用技能
hero.perform_action(ActionType.USE_SKILL, skill_name="火球术", mp_cost=20)
# 在冷却状态下尝试再次使用技能
time.sleep(1)
hero.perform_action(ActionType.USE_SKILL, skill_name="冰箭术", mp_cost=15)
# 等待冷却结束
time.sleep(3)
hero.perform_action(ActionType.ATTACK, damage=12, target=enemy)
# 英雄进入防御状态
hero.perform_action(ActionType.DEFEND)
# 在防御状态下受到攻击
time.sleep(1)
hero.take_damage(20)
# 等待防御状态结束
time.sleep(3)
hero.perform_action(ActionType.ATTACK, damage=10, target=enemy)
# 英雄受到大量伤害进入危险状态
hero.take_damage(60)
# 在危险状态下攻击
hero.perform_action(ActionType.ATTACK, damage=15, target=enemy)
# 治疗恢复
hero.perform_action(ActionType.HEAL, heal_amount=30)
# 休息恢复
hero.perform_action(ActionType.REST)
# 休息被打断
time.sleep(2)
hero.take_damage(10)
print("\n=== 状态历史 ===")
for state_name, timestamp in hero.get_state_history():
print(f"状态: {state_name} - 时间: {time.ctime(timestamp)}")
if __name__ == "__main__":
demonstrate_state_pattern()
10.2.5 状态模式的优缺点
优点: 1. 消除条件语句:避免大量的if-else或switch语句 2. 状态转换明确:状态之间的转换逻辑清晰 3. 易于扩展:添加新状态不影响现有代码 4. 职责分离:每个状态类只负责自己的行为 5. 符合开闭原则:对扩展开放,对修改关闭
缺点: 1. 类数量增加:每个状态都需要一个类 2. 状态转换复杂:复杂的状态机可能难以理解 3. 内存开销:需要维护状态对象 4. 设计复杂度:需要仔细设计状态转换规则
10.2.6 适用场景
- 对象行为依赖于状态:根据状态执行不同行为
- 状态转换复杂:有多个状态和复杂的转换规则
- 避免条件语句:替代大量的条件判断
- 状态机实现:工作流、游戏状态、设备控制
10.3 策略模式高级应用
10.3.1 策略模式回顾
策略模式定义了一系列算法,把它们一个个封装起来,并且使它们可相互替换。策略模式让算法的变化独立于使用算法的客户。
10.3.2 动态策略选择系统
from abc import ABC, abstractmethod
from typing import Dict, List, Any, Optional
import json
import time
from dataclasses import dataclass
from enum import Enum
class DataFormat(Enum):
"""数据格式枚举"""
JSON = "json"
XML = "xml"
CSV = "csv"
YAML = "yaml"
class CompressionLevel(Enum):
"""压缩级别枚举"""
NONE = 0
LOW = 1
MEDIUM = 5
HIGH = 9
@dataclass
class ProcessingContext:
"""处理上下文"""
data_size: int
data_format: DataFormat
performance_priority: bool # True表示优先性能,False表示优先压缩率
network_bandwidth: int # 网络带宽 (Mbps)
cpu_cores: int
memory_available: int # 可用内存 (MB)
class DataProcessingStrategy(ABC):
"""数据处理策略抽象基类"""
@abstractmethod
def get_strategy_name(self) -> str:
"""获取策略名称"""
pass
@abstractmethod
def process_data(self, data: Any, context: ProcessingContext) -> Dict[str, Any]:
"""处理数据"""
pass
@abstractmethod
def get_performance_metrics(self, context: ProcessingContext) -> Dict[str, float]:
"""获取性能指标"""
pass
@abstractmethod
def is_suitable(self, context: ProcessingContext) -> bool:
"""判断策略是否适合当前上下文"""
pass
class FastProcessingStrategy(DataProcessingStrategy):
"""快速处理策略"""
def get_strategy_name(self) -> str:
return "快速处理"
def process_data(self, data: Any, context: ProcessingContext) -> Dict[str, Any]:
start_time = time.time()
# 模拟快速处理(最小压缩)
processed_data = {
'original_data': data,
'compression_level': CompressionLevel.NONE.value,
'processing_time': 0.1, # 模拟处理时间
'compressed_size': context.data_size * 0.95 # 几乎无压缩
}
processing_time = time.time() - start_time
processed_data['actual_processing_time'] = processing_time
return processed_data
def get_performance_metrics(self, context: ProcessingContext) -> Dict[str, float]:
return {
'processing_speed': 10.0, # 处理速度评分
'compression_ratio': 1.0, # 压缩比
'memory_usage': context.data_size * 0.1, # 内存使用
'cpu_usage': 20.0 # CPU使用率
}
def is_suitable(self, context: ProcessingContext) -> bool:
return (
context.performance_priority and
context.network_bandwidth > 100 and # 高带宽环境
context.data_size < 1000000 # 小数据量
)
class BalancedProcessingStrategy(DataProcessingStrategy):
"""平衡处理策略"""
def get_strategy_name(self) -> str:
return "平衡处理"
def process_data(self, data: Any, context: ProcessingContext) -> Dict[str, Any]:
start_time = time.time()
# 模拟平衡处理(中等压缩)
processed_data = {
'original_data': data,
'compression_level': CompressionLevel.MEDIUM.value,
'processing_time': 0.5, # 模拟处理时间
'compressed_size': context.data_size * 0.6 # 中等压缩
}
processing_time = time.time() - start_time
processed_data['actual_processing_time'] = processing_time
return processed_data
def get_performance_metrics(self, context: ProcessingContext) -> Dict[str, float]:
return {
'processing_speed': 6.0,
'compression_ratio': 0.6,
'memory_usage': context.data_size * 0.2,
'cpu_usage': 50.0
}
def is_suitable(self, context: ProcessingContext) -> bool:
return (
context.network_bandwidth >= 50 and
context.memory_available >= 512 and
context.data_size <= 10000000
)
class HighCompressionStrategy(DataProcessingStrategy):
"""高压缩策略"""
def get_strategy_name(self) -> str:
return "高压缩处理"
def process_data(self, data: Any, context: ProcessingContext) -> Dict[str, Any]:
start_time = time.time()
# 模拟高压缩处理
processed_data = {
'original_data': data,
'compression_level': CompressionLevel.HIGH.value,
'processing_time': 2.0, # 模拟处理时间
'compressed_size': context.data_size * 0.3 # 高压缩
}
processing_time = time.time() - start_time
processed_data['actual_processing_time'] = processing_time
return processed_data
def get_performance_metrics(self, context: ProcessingContext) -> Dict[str, float]:
return {
'processing_speed': 2.0,
'compression_ratio': 0.3,
'memory_usage': context.data_size * 0.4,
'cpu_usage': 80.0
}
def is_suitable(self, context: ProcessingContext) -> bool:
return (
not context.performance_priority and
context.network_bandwidth < 50 and # 低带宽环境
context.cpu_cores >= 4 # 多核CPU
)
class ParallelProcessingStrategy(DataProcessingStrategy):
"""并行处理策略"""
def get_strategy_name(self) -> str:
return "并行处理"
def process_data(self, data: Any, context: ProcessingContext) -> Dict[str, Any]:
start_time = time.time()
# 模拟并行处理
parallel_factor = min(context.cpu_cores, 8) # 最多使用8个核心
processed_data = {
'original_data': data,
'compression_level': CompressionLevel.MEDIUM.value,
'processing_time': 1.0 / parallel_factor, # 并行加速
'compressed_size': context.data_size * 0.5,
'parallel_workers': parallel_factor
}
processing_time = time.time() - start_time
processed_data['actual_processing_time'] = processing_time
return processed_data
def get_performance_metrics(self, context: ProcessingContext) -> Dict[str, float]:
parallel_factor = min(context.cpu_cores, 8)
return {
'processing_speed': 8.0 * parallel_factor / 4, # 基于并行度的速度
'compression_ratio': 0.5,
'memory_usage': context.data_size * 0.3 * parallel_factor,
'cpu_usage': 70.0
}
def is_suitable(self, context: ProcessingContext) -> bool:
return (
context.cpu_cores >= 4 and
context.memory_available >= 1024 and
context.data_size >= 1000000 # 大数据量
)
class AdaptiveProcessingStrategy(DataProcessingStrategy):
"""自适应处理策略"""
def __init__(self):
self._performance_history = []
self._adaptation_threshold = 5 # 适应阈值
def get_strategy_name(self) -> str:
return "自适应处理"
def process_data(self, data: Any, context: ProcessingContext) -> Dict[str, Any]:
start_time = time.time()
# 根据历史性能动态调整参数
compression_level = self._calculate_optimal_compression(context)
processed_data = {
'original_data': data,
'compression_level': compression_level,
'processing_time': self._estimate_processing_time(context, compression_level),
'compressed_size': context.data_size * (1 - compression_level / 10),
'adaptation_factor': len(self._performance_history)
}
processing_time = time.time() - start_time
processed_data['actual_processing_time'] = processing_time
# 记录性能历史
self._performance_history.append({
'context': context,
'compression_level': compression_level,
'processing_time': processing_time
})
# 限制历史记录数量
if len(self._performance_history) > 100:
self._performance_history = self._performance_history[-50:]
return processed_data
def _calculate_optimal_compression(self, context: ProcessingContext) -> int:
"""计算最优压缩级别"""
if len(self._performance_history) < self._adaptation_threshold:
# 历史数据不足,使用默认策略
if context.performance_priority:
return CompressionLevel.LOW.value
else:
return CompressionLevel.MEDIUM.value
# 基于历史性能数据计算最优压缩级别
similar_contexts = [
record for record in self._performance_history[-20:]
if self._is_similar_context(record['context'], context)
]
if similar_contexts:
# 找到性能最好的压缩级别
best_record = min(similar_contexts, key=lambda x: x['processing_time'])
return best_record['compression_level']
# 没有相似上下文,使用启发式规则
if context.network_bandwidth < 10:
return CompressionLevel.HIGH.value
elif context.performance_priority:
return CompressionLevel.LOW.value
else:
return CompressionLevel.MEDIUM.value
def _is_similar_context(self, ctx1: ProcessingContext, ctx2: ProcessingContext) -> bool:
"""判断两个上下文是否相似"""
size_ratio = min(ctx1.data_size, ctx2.data_size) / max(ctx1.data_size, ctx2.data_size)
bandwidth_ratio = min(ctx1.network_bandwidth, ctx2.network_bandwidth) / max(ctx1.network_bandwidth, ctx2.network_bandwidth)
return (
size_ratio > 0.8 and
bandwidth_ratio > 0.8 and
ctx1.performance_priority == ctx2.performance_priority
)
def _estimate_processing_time(self, context: ProcessingContext, compression_level: int) -> float:
"""估算处理时间"""
base_time = context.data_size / 1000000 # 基础时间
compression_factor = 1 + compression_level / 10 # 压缩因子
cpu_factor = 4 / context.cpu_cores # CPU因子
return base_time * compression_factor * cpu_factor
def get_performance_metrics(self, context: ProcessingContext) -> Dict[str, float]:
compression_level = self._calculate_optimal_compression(context)
return {
'processing_speed': 10 - compression_level,
'compression_ratio': 1 - compression_level / 10,
'memory_usage': context.data_size * (0.1 + compression_level / 50),
'cpu_usage': 30 + compression_level * 5
}
def is_suitable(self, context: ProcessingContext) -> bool:
# 自适应策略适用于所有场景
return True
class StrategySelector:
"""策略选择器"""
def __init__(self):
self._strategies: Dict[str, DataProcessingStrategy] = {
'fast': FastProcessingStrategy(),
'balanced': BalancedProcessingStrategy(),
'high_compression': HighCompressionStrategy(),
'parallel': ParallelProcessingStrategy(),
'adaptive': AdaptiveProcessingStrategy()
}
self._selection_history = []
def select_strategy(self, context: ProcessingContext,
strategy_name: Optional[str] = None) -> DataProcessingStrategy:
"""选择策略"""
if strategy_name and strategy_name in self._strategies:
selected_strategy = self._strategies[strategy_name]
else:
selected_strategy = self._auto_select_strategy(context)
# 记录选择历史
self._selection_history.append({
'context': context,
'strategy': selected_strategy.get_strategy_name(),
'timestamp': time.time()
})
return selected_strategy
def _auto_select_strategy(self, context: ProcessingContext) -> DataProcessingStrategy:
"""自动选择策略"""
suitable_strategies = [
(name, strategy) for name, strategy in self._strategies.items()
if strategy.is_suitable(context)
]
if not suitable_strategies:
# 没有合适的策略,返回自适应策略
return self._strategies['adaptive']
# 根据性能指标选择最佳策略
best_strategy = None
best_score = -1
for name, strategy in suitable_strategies:
metrics = strategy.get_performance_metrics(context)
score = self._calculate_strategy_score(metrics, context)
if score > best_score:
best_score = score
best_strategy = strategy
return best_strategy or self._strategies['adaptive']
def _calculate_strategy_score(self, metrics: Dict[str, float],
context: ProcessingContext) -> float:
"""计算策略评分"""
if context.performance_priority:
# 优先考虑处理速度
speed_weight = 0.6
compression_weight = 0.2
memory_weight = 0.1
cpu_weight = 0.1
else:
# 优先考虑压缩率
speed_weight = 0.2
compression_weight = 0.6
memory_weight = 0.1
cpu_weight = 0.1
# 归一化指标
speed_score = min(metrics['processing_speed'] / 10, 1.0)
compression_score = 1 - metrics['compression_ratio'] # 压缩率越高分数越高
memory_score = max(0, 1 - metrics['memory_usage'] / context.memory_available)
cpu_score = max(0, 1 - metrics['cpu_usage'] / 100)
total_score = (
speed_score * speed_weight +
compression_score * compression_weight +
memory_score * memory_weight +
cpu_score * cpu_weight
)
return total_score
def get_strategy_recommendations(self, context: ProcessingContext) -> List[Dict[str, Any]]:
"""获取策略推荐"""
recommendations = []
for name, strategy in self._strategies.items():
metrics = strategy.get_performance_metrics(context)
score = self._calculate_strategy_score(metrics, context)
recommendations.append({
'strategy_name': strategy.get_strategy_name(),
'strategy_key': name,
'score': score,
'metrics': metrics,
'suitable': strategy.is_suitable(context)
})
# 按评分排序
recommendations.sort(key=lambda x: x['score'], reverse=True)
return recommendations
class DataProcessor:
"""数据处理器"""
def __init__(self):
self._strategy_selector = StrategySelector()
self._current_strategy: Optional[DataProcessingStrategy] = None
def process(self, data: Any, context: ProcessingContext,
strategy_name: Optional[str] = None) -> Dict[str, Any]:
"""处理数据"""
# 选择策略
self._current_strategy = self._strategy_selector.select_strategy(context, strategy_name)
print(f"选择策略: {self._current_strategy.get_strategy_name()}")
# 处理数据
result = self._current_strategy.process_data(data, context)
# 添加策略信息
result['strategy_used'] = self._current_strategy.get_strategy_name()
result['context'] = context
return result
def get_strategy_recommendations(self, context: ProcessingContext) -> List[Dict[str, Any]]:
"""获取策略推荐"""
return self._strategy_selector.get_strategy_recommendations(context)
def analyze_performance(self, results: List[Dict[str, Any]]) -> Dict[str, Any]:
"""分析性能"""
if not results:
return {}
strategy_performance = {}
for result in results:
strategy_name = result['strategy_used']
if strategy_name not in strategy_performance:
strategy_performance[strategy_name] = {
'count': 0,
'total_time': 0,
'total_compression': 0,
'avg_time': 0,
'avg_compression': 0
}
perf = strategy_performance[strategy_name]
perf['count'] += 1
perf['total_time'] += result['actual_processing_time']
perf['total_compression'] += result['compressed_size'] / result['context'].data_size
# 计算平均值
for strategy_name, perf in strategy_performance.items():
perf['avg_time'] = perf['total_time'] / perf['count']
perf['avg_compression'] = perf['total_compression'] / perf['count']
return strategy_performance
# 使用示例
def demonstrate_advanced_strategy_pattern():
"""演示高级策略模式"""
print("=== 高级策略模式演示:动态数据处理系统 ===")
processor = DataProcessor()
# 测试不同的处理场景
scenarios = [
{
'name': '小文件高性能场景',
'context': ProcessingContext(
data_size=50000,
data_format=DataFormat.JSON,
performance_priority=True,
network_bandwidth=200,
cpu_cores=4,
memory_available=2048
),
'data': {'small_data': 'test' * 100}
},
{
'name': '大文件低带宽场景',
'context': ProcessingContext(
data_size=5000000,
data_format=DataFormat.XML,
performance_priority=False,
network_bandwidth=10,
cpu_cores=8,
memory_available=4096
),
'data': {'large_data': 'content' * 10000}
},
{
'name': '多核并行场景',
'context': ProcessingContext(
data_size=2000000,
data_format=DataFormat.CSV,
performance_priority=True,
network_bandwidth=100,
cpu_cores=16,
memory_available=8192
),
'data': {'parallel_data': list(range(10000))}
}
]
results = []
for scenario in scenarios:
print(f"\n=== {scenario['name']} ===")
# 获取策略推荐
recommendations = processor.get_strategy_recommendations(scenario['context'])
print("\n策略推荐排序:")
for i, rec in enumerate(recommendations[:3], 1):
print(f"{i}. {rec['strategy_name']} (评分: {rec['score']:.3f})")
print(f" 适合度: {'是' if rec['suitable'] else '否'}")
print(f" 性能指标: 速度={rec['metrics']['processing_speed']:.1f}, "
f"压缩率={rec['metrics']['compression_ratio']:.1f}")
# 自动选择策略处理
result = processor.process(scenario['data'], scenario['context'])
results.append(result)
print(f"\n处理结果:")
print(f"使用策略: {result['strategy_used']}")
print(f"处理时间: {result['actual_processing_time']:.4f}秒")
print(f"压缩后大小: {result['compressed_size']:.0f} bytes")
print(f"压缩率: {(1 - result['compressed_size']/scenario['context'].data_size)*100:.1f}%")
# 性能分析
print("\n=== 性能分析 ===")
performance_analysis = processor.analyze_performance(results)
for strategy_name, perf in performance_analysis.items():
print(f"\n策略: {strategy_name}")
print(f"使用次数: {perf['count']}")
print(f"平均处理时间: {perf['avg_time']:.4f}秒")
print(f"平均压缩率: {(1-perf['avg_compression'])*100:.1f}%")
if __name__ == "__main__":
demonstrate_advanced_strategy_pattern()
10.3.3 策略模式的优缺点
优点: 1. 算法可替换:运行时可以动态切换算法 2. 避免条件语句:消除大量的if-else或switch语句 3. 易于扩展:添加新策略不影响现有代码 4. 算法独立:每个策略都是独立的,便于测试和维护 5. 符合开闭原则:对扩展开放,对修改关闭
缺点: 1. 策略类增多:每个算法都需要一个策略类 2. 客户端复杂:客户端需要了解不同策略的区别 3. 通信开销:策略和上下文之间可能有额外的通信开销 4. 对象数量增加:运行时需要维护多个策略对象
10.3.4 适用场景
- 算法选择:需要在运行时选择不同的算法
- 条件语句替代:替代复杂的条件判断逻辑
- 算法变化频繁:算法经常需要修改或扩展
- 算法保密:不同的算法实现需要保密
- 性能优化:根据环境动态选择最优算法
10.4 状态模式与策略模式对比分析
10.4.1 相同点
- 行为型模式:都属于行为型设计模式
- 多态应用:都使用多态来实现不同的行为
- 消除条件语句:都能避免大量的条件判断
- 易于扩展:都符合开闭原则,便于添加新的实现
- 封装变化:都将变化的部分封装起来
10.4.2 不同点
对比维度 | 状态模式 | 策略模式 |
---|---|---|
主要目的 | 管理对象状态变化 | 封装算法族 |
变化原因 | 内部状态改变 | 外部选择不同算法 |
转换控制 | 状态自己控制转换 | 客户端控制选择 |
对象关系 | 状态间有转换关系 | 策略间相互独立 |
生命周期 | 状态可能短暂存在 | 策略通常长期存在 |
上下文依赖 | 强依赖上下文状态 | 弱依赖上下文 |
使用方式 | 自动状态转换 | 手动策略选择 |
10.4.3 选择指南
选择状态模式的情况: - 对象行为依赖于内部状态 - 状态之间有明确的转换规则 - 需要避免大量的状态相关条件语句 - 状态转换逻辑复杂
选择策略模式的情况: - 需要在运行时选择算法 - 有多种方式实现同一功能 - 算法变化频繁 - 需要隐藏算法的具体实现
10.4.4 组合使用
状态模式和策略模式可以结合使用,创建更强大的系统:
class StatefulStrategy(ABC):
"""有状态的策略接口"""
@abstractmethod
def execute(self, context, data):
"""执行策略"""
pass
@abstractmethod
def get_next_state(self, context, result):
"""根据执行结果获取下一个状态"""
pass
class ProcessingState(ABC):
"""处理状态接口"""
@abstractmethod
def process(self, context, data):
"""处理数据"""
pass
@abstractmethod
def select_strategy(self, context):
"""选择策略"""
pass
class InitialProcessingState(ProcessingState):
"""初始处理状态"""
def process(self, context, data):
print("初始处理阶段")
strategy = self.select_strategy(context)
result = strategy.execute(context, data)
# 根据结果转换状态
next_state = strategy.get_next_state(context, result)
if next_state:
context.change_state(next_state)
return result
def select_strategy(self, context):
# 根据上下文选择初始处理策略
if context.data_size > 1000000:
return ParallelInitialStrategy()
else:
return SimpleInitialStrategy()
class ValidationProcessingState(ProcessingState):
"""验证处理状态"""
def process(self, context, data):
print("验证处理阶段")
strategy = self.select_strategy(context)
result = strategy.execute(context, data)
next_state = strategy.get_next_state(context, result)
if next_state:
context.change_state(next_state)
return result
def select_strategy(self, context):
# 根据数据类型选择验证策略
if context.data_format == DataFormat.JSON:
return JsonValidationStrategy()
elif context.data_format == DataFormat.XML:
return XmlValidationStrategy()
else:
return GenericValidationStrategy()
class StatefulProcessingContext:
"""有状态的处理上下文"""
def __init__(self, data_size, data_format):
self.data_size = data_size
self.data_format = data_format
self._state = InitialProcessingState()
self._processing_history = []
def change_state(self, new_state):
"""改变状态"""
self._processing_history.append(self._state.__class__.__name__)
self._state = new_state
print(f"状态转换到: {new_state.__class__.__name__}")
def process(self, data):
"""处理数据"""
return self._state.process(self, data)
def get_processing_history(self):
"""获取处理历史"""
return self._processing_history
# 具体策略实现
class SimpleInitialStrategy(StatefulStrategy):
def execute(self, context, data):
print("执行简单初始处理")
return {'processed': True, 'method': 'simple'}
def get_next_state(self, context, result):
if result['processed']:
return ValidationProcessingState()
return None
class ParallelInitialStrategy(StatefulStrategy):
def execute(self, context, data):
print("执行并行初始处理")
return {'processed': True, 'method': 'parallel'}
def get_next_state(self, context, result):
if result['processed']:
return ValidationProcessingState()
return None
class JsonValidationStrategy(StatefulStrategy):
def execute(self, context, data):
print("执行JSON验证")
return {'validated': True, 'format': 'json'}
def get_next_state(self, context, result):
# 验证完成,处理结束
return None
class XmlValidationStrategy(StatefulStrategy):
def execute(self, context, data):
print("执行XML验证")
return {'validated': True, 'format': 'xml'}
def get_next_state(self, context, result):
return None
class GenericValidationStrategy(StatefulStrategy):
def execute(self, context, data):
print("执行通用验证")
return {'validated': True, 'format': 'generic'}
def get_next_state(self, context, result):
return None
def demonstrate_combined_patterns():
"""演示状态模式与策略模式的组合使用"""
print("=== 状态模式与策略模式组合演示 ===")
# 小数据JSON处理
print("\n--- 小数据JSON处理 ---")
context1 = StatefulProcessingContext(50000, DataFormat.JSON)
result1 = context1.process({'data': 'small json data'})
print(f"处理结果: {result1}")
print(f"处理历史: {context1.get_processing_history()}")
# 大数据XML处理
print("\n--- 大数据XML处理 ---")
context2 = StatefulProcessingContext(2000000, DataFormat.XML)
result2 = context2.process({'data': 'large xml data'})
print(f"处理结果: {result2}")
print(f"处理历史: {context2.get_processing_history()}")
if __name__ == "__main__":
demonstrate_combined_patterns()
10.5 高级应用技巧
10.5.1 状态机设计最佳实践
- 状态转换表:使用表格明确定义状态转换规则
- 状态验证:在状态转换前进行有效性检查
- 状态持久化:支持状态的保存和恢复
- 状态监控:记录状态转换历史,便于调试
- 异常处理:定义异常状态和恢复机制
10.5.2 策略选择优化
- 性能基准测试:建立策略性能基准
- 自适应选择:根据历史性能动态调整
- 策略缓存:缓存策略实例减少创建开销
- 策略组合:支持多个策略的组合使用
- 策略降级:在策略失败时提供备选方案
10.5.3 内存和性能优化
class OptimizedStateManager:
"""优化的状态管理器"""
def __init__(self):
self._state_pool = {} # 状态对象池
self._transition_cache = {} # 转换缓存
def get_state(self, state_class):
"""从对象池获取状态"""
state_name = state_class.__name__
if state_name not in self._state_pool:
self._state_pool[state_name] = state_class()
return self._state_pool[state_name]
def can_transition(self, from_state, to_state):
"""检查状态转换(带缓存)"""
cache_key = (from_state.__class__.__name__, to_state.__class__.__name__)
if cache_key not in self._transition_cache:
self._transition_cache[cache_key] = from_state.can_transition_to(to_state)
return self._transition_cache[cache_key]
class OptimizedStrategyRegistry:
"""优化的策略注册表"""
def __init__(self):
self._strategies = {}
self._strategy_cache = {}
self._performance_metrics = {}
def register_strategy(self, name, strategy_class):
"""注册策略"""
self._strategies[name] = strategy_class
def get_strategy(self, name, context_hash=None):
"""获取策略(带缓存)"""
cache_key = (name, context_hash)
if cache_key not in self._strategy_cache:
if name in self._strategies:
self._strategy_cache[cache_key] = self._strategies[name]()
else:
raise ValueError(f"未知策略: {name}")
return self._strategy_cache[cache_key]
def update_performance_metrics(self, strategy_name, metrics):
"""更新性能指标"""
if strategy_name not in self._performance_metrics:
self._performance_metrics[strategy_name] = []
self._performance_metrics[strategy_name].append(metrics)
# 保持最近100条记录
if len(self._performance_metrics[strategy_name]) > 100:
self._performance_metrics[strategy_name] = self._performance_metrics[strategy_name][-50:]
10.6 实际应用案例
10.6.1 游戏AI状态机
class GameAIState(ABC):
"""游戏AI状态基类"""
@abstractmethod
def update(self, ai_agent, delta_time):
"""更新状态"""
pass
@abstractmethod
def on_enter(self, ai_agent):
"""进入状态"""
pass
@abstractmethod
def on_exit(self, ai_agent):
"""离开状态"""
pass
class PatrolState(GameAIState):
"""巡逻状态"""
def __init__(self):
self.patrol_points = [(0, 0), (10, 0), (10, 10), (0, 10)]
self.current_target = 0
def update(self, ai_agent, delta_time):
# 移动到巡逻点
target = self.patrol_points[self.current_target]
if ai_agent.move_towards(target, delta_time):
self.current_target = (self.current_target + 1) % len(self.patrol_points)
# 检查是否发现敌人
if ai_agent.detect_enemy():
ai_agent.change_state(ChaseState())
def on_enter(self, ai_agent):
print(f"{ai_agent.name} 开始巡逻")
def on_exit(self, ai_agent):
print(f"{ai_agent.name} 停止巡逻")
class ChaseState(GameAIState):
"""追击状态"""
def __init__(self):
self.chase_duration = 0
self.max_chase_time = 10.0
def update(self, ai_agent, delta_time):
self.chase_duration += delta_time
enemy = ai_agent.get_nearest_enemy()
if enemy:
if ai_agent.get_distance_to(enemy) < ai_agent.attack_range:
ai_agent.change_state(AttackState())
else:
ai_agent.move_towards(enemy.position, delta_time)
else:
# 失去目标,返回巡逻
ai_agent.change_state(PatrolState())
# 追击超时
if self.chase_duration > self.max_chase_time:
ai_agent.change_state(PatrolState())
def on_enter(self, ai_agent):
print(f"{ai_agent.name} 开始追击")
self.chase_duration = 0
def on_exit(self, ai_agent):
print(f"{ai_agent.name} 停止追击")
class AttackState(GameAIState):
"""攻击状态"""
def __init__(self):
self.attack_cooldown = 0
self.attack_interval = 1.0
def update(self, ai_agent, delta_time):
self.attack_cooldown -= delta_time
enemy = ai_agent.get_nearest_enemy()
if enemy:
if ai_agent.get_distance_to(enemy) <= ai_agent.attack_range:
if self.attack_cooldown <= 0:
ai_agent.attack(enemy)
self.attack_cooldown = self.attack_interval
else:
# 敌人超出攻击范围,继续追击
ai_agent.change_state(ChaseState())
else:
# 没有敌人,返回巡逻
ai_agent.change_state(PatrolState())
def on_enter(self, ai_agent):
print(f"{ai_agent.name} 开始攻击")
def on_exit(self, ai_agent):
print(f"{ai_agent.name} 停止攻击")
10.6.2 支付系统策略模式
class PaymentStrategy(ABC):
"""支付策略接口"""
@abstractmethod
def pay(self, amount, currency, payment_info):
"""执行支付"""
pass
@abstractmethod
def validate_payment_info(self, payment_info):
"""验证支付信息"""
pass
@abstractmethod
def get_transaction_fee(self, amount, currency):
"""获取交易费用"""
pass
class CreditCardPayment(PaymentStrategy):
"""信用卡支付"""
def pay(self, amount, currency, payment_info):
if not self.validate_payment_info(payment_info):
raise ValueError("信用卡信息无效")
fee = self.get_transaction_fee(amount, currency)
total_amount = amount + fee
# 模拟信用卡支付
print(f"信用卡支付: {amount} {currency}, 手续费: {fee} {currency}")
return {
'success': True,
'transaction_id': f"CC_{int(time.time())}",
'amount': amount,
'fee': fee,
'total': total_amount
}
def validate_payment_info(self, payment_info):
required_fields = ['card_number', 'expiry_date', 'cvv', 'holder_name']
return all(field in payment_info for field in required_fields)
def get_transaction_fee(self, amount, currency):
return amount * 0.025 # 2.5% 手续费
class PayPalPayment(PaymentStrategy):
"""PayPal支付"""
def pay(self, amount, currency, payment_info):
if not self.validate_payment_info(payment_info):
raise ValueError("PayPal信息无效")
fee = self.get_transaction_fee(amount, currency)
total_amount = amount + fee
print(f"PayPal支付: {amount} {currency}, 手续费: {fee} {currency}")
return {
'success': True,
'transaction_id': f"PP_{int(time.time())}",
'amount': amount,
'fee': fee,
'total': total_amount
}
def validate_payment_info(self, payment_info):
return 'email' in payment_info and 'password' in payment_info
def get_transaction_fee(self, amount, currency):
return amount * 0.029 + 0.30 # 2.9% + $0.30
class CryptocurrencyPayment(PaymentStrategy):
"""加密货币支付"""
def pay(self, amount, currency, payment_info):
if not self.validate_payment_info(payment_info):
raise ValueError("加密货币钱包信息无效")
fee = self.get_transaction_fee(amount, currency)
total_amount = amount + fee
print(f"加密货币支付: {amount} {currency}, 网络费: {fee} {currency}")
return {
'success': True,
'transaction_id': f"CRYPTO_{int(time.time())}",
'amount': amount,
'fee': fee,
'total': total_amount
}
def validate_payment_info(self, payment_info):
return 'wallet_address' in payment_info and 'private_key' in payment_info
def get_transaction_fee(self, amount, currency):
return 0.001 # 固定网络费用
class PaymentProcessor:
"""支付处理器"""
def __init__(self):
self._strategies = {
'credit_card': CreditCardPayment(),
'paypal': PayPalPayment(),
'cryptocurrency': CryptocurrencyPayment()
}
def process_payment(self, payment_method, amount, currency, payment_info):
"""处理支付"""
if payment_method not in self._strategies:
raise ValueError(f"不支持的支付方式: {payment_method}")
strategy = self._strategies[payment_method]
return strategy.pay(amount, currency, payment_info)
def get_payment_options(self, amount, currency):
"""获取支付选项"""
options = []
for method, strategy in self._strategies.items():
fee = strategy.get_transaction_fee(amount, currency)
options.append({
'method': method,
'fee': fee,
'total': amount + fee
})
return sorted(options, key=lambda x: x['total'])
10.7 本章总结
10.7.1 核心概念回顾
状态模式: - 将对象的状态封装成独立的类 - 状态转换由状态对象自己控制 - 适用于复杂的状态机实现 - 消除了大量的条件判断语句
策略模式: - 将算法封装成独立的策略类 - 策略选择由客户端控制 - 适用于算法的动态选择 - 支持算法的运行时替换
10.7.2 最佳实践
状态模式最佳实践:
- 明确定义状态转换规则
- 使用状态对象池优化性能
- 实现状态的持久化和恢复
- 添加状态转换的日志记录
策略模式最佳实践:
- 建立策略性能基准测试
- 实现策略的自适应选择
- 使用策略注册表管理策略
- 支持策略的组合使用
10.7.3 实际应用建议
选择合适的模式:
- 根据问题的本质选择模式
- 考虑系统的复杂度和维护成本
- 评估性能和内存影响
模式组合使用:
- 状态模式可以与策略模式结合
- 在状态转换中使用策略选择
- 创建更灵活的系统架构
10.7.4 注意事项
状态模式注意事项:
- 避免状态类过多导致的复杂性
- 正确处理状态转换的异常情况
- 考虑状态对象的生命周期管理
策略模式注意事项:
- 避免策略选择逻辑过于复杂
- 合理管理策略对象的创建和销毁
- 考虑策略执行的线程安全性
10.8 练习题
10.8.1 基础练习
状态模式练习:
- 实现一个简单的订单状态机(待支付、已支付、已发货、已完成、已取消)
- 实现一个电梯控制系统的状态机
策略模式练习:
- 实现一个图像压缩系统,支持不同的压缩算法
- 实现一个路径规划系统,支持不同的路径算法
10.8.2 进阶练习
状态模式进阶:
- 实现一个支持状态持久化的工作流引擎
- 实现一个多线程安全的状态机
策略模式进阶:
- 实现一个自适应的负载均衡策略系统
- 实现一个支持策略组合的数据处理框架
10.8.3 思考题
- 如何在状态模式中处理状态转换的回滚操作?
- 策略模式中如何实现策略的热插拔功能?
- 如何设计一个既支持状态转换又支持策略选择的系统?
- 在什么情况下应该考虑使用状态模式和策略模式的组合?
下一章预告: 第十一章将学习责任链模式和中介者模式,探讨对象间的通信和协作机制。 “`