5.1 结构型模式概述
5.1.1 什么是结构型模式
结构型设计模式关注如何将类和对象组合成更大的结构。这些模式通过继承和组合来组织接口和实现,从而获得新的功能。结构型模式解决的是”如何组合”的问题,而不是”如何创建”的问题。
核心思想: - 通过组合获得比各部分之和更强大的功能 - 简化复杂系统的结构 - 提供统一的接口来访问复杂的子系统 - 在不改变现有代码的情况下扩展功能
5.1.2 结构型模式的分类
结构型模式主要包括以下7种:
模式名称 | 主要目的 | 适用场景 | 关键特点 |
---|---|---|---|
适配器模式 | 接口转换 | 系统集成、遗留代码复用 | 让不兼容的接口协同工作 |
桥接模式 | 分离抽象与实现 | 多维度变化、跨平台开发 | 避免继承爆炸 |
组合模式 | 树形结构处理 | 文件系统、UI组件 | 统一处理单个对象和组合对象 |
装饰器模式 | 动态添加功能 | 功能扩展、中间件 | 不改变原有结构的前提下增强功能 |
外观模式 | 简化复杂接口 | 子系统封装、API设计 | 提供统一的高层接口 |
享元模式 | 共享细粒度对象 | 大量相似对象、内存优化 | 通过共享减少内存使用 |
代理模式 | 控制对象访问 | 远程调用、缓存、权限控制 | 为其他对象提供代理以控制访问 |
5.1.3 结构型模式的优势
降低耦合度
- 通过接口和抽象类减少直接依赖
- 使系统更容易维护和扩展
提高复用性
- 通过组合而非继承实现功能扩展
- 避免类继承层次过深的问题
简化复杂性
- 隐藏子系统的复杂性
- 提供简洁的接口
增强灵活性
- 运行时动态组合对象
- 支持功能的动态添加和移除
5.2 适配器模式(Adapter Pattern)
5.2.1 模式定义与动机
定义: 适配器模式将一个类的接口转换成客户希望的另一个接口。适配器让原本接口不兼容的类可以合作无间。
动机: - 系统集成时遇到接口不兼容的问题 - 复用现有类,但其接口与需求不匹配 - 第三方库的接口与系统接口不一致 - 遗留系统的现代化改造
5.2.2 模式结构
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ Client │───▶│ Target │ │ Adaptee │
│ │ │ (interface) │ │ (existing) │
└─────────────┘ └─────────────┘ └─────────────┘
▲ ▲
│ │
┌─────────────┐ │
│ Adapter │────────────┘
│ │
└─────────────┘
角色说明: - Target(目标接口): 客户所期待的接口 - Adaptee(被适配者): 需要适配的现有接口 - Adapter(适配器): 实现Target接口,并持有Adaptee的实例 - Client(客户端): 使用Target接口的代码
5.2.3 实现方式
对象适配器(推荐)
# Python实现 - 对象适配器
from abc import ABC, abstractmethod
from typing import List, Dict, Any
import json
import xml.etree.ElementTree as ET
# 目标接口:统一的数据处理接口
class DataProcessor(ABC):
"""数据处理器的目标接口"""
@abstractmethod
def process_data(self, data: str) -> Dict[str, Any]:
"""处理数据并返回标准格式"""
pass
@abstractmethod
def get_format_info(self) -> str:
"""获取支持的数据格式信息"""
pass
# 被适配者1:JSON处理器(现有的类)
class JSONProcessor:
"""现有的JSON处理器"""
def parse_json(self, json_string: str) -> dict:
"""解析JSON字符串"""
try:
return json.loads(json_string)
except json.JSONDecodeError as e:
raise ValueError(f"Invalid JSON format: {e}")
def get_json_info(self) -> str:
return "JSON Data Processor v1.0"
# 被适配者2:XML处理器(现有的类)
class XMLProcessor:
"""现有的XML处理器"""
def parse_xml(self, xml_string: str) -> dict:
"""解析XML字符串"""
try:
root = ET.fromstring(xml_string)
return self._xml_to_dict(root)
except ET.ParseError as e:
raise ValueError(f"Invalid XML format: {e}")
def _xml_to_dict(self, element) -> dict:
"""将XML元素转换为字典"""
result = {}
# 处理属性
if element.attrib:
result['@attributes'] = element.attrib
# 处理文本内容
if element.text and element.text.strip():
if len(element) == 0: # 叶子节点
return element.text.strip()
else:
result['#text'] = element.text.strip()
# 处理子元素
for child in element:
child_data = self._xml_to_dict(child)
if child.tag in result:
if not isinstance(result[child.tag], list):
result[child.tag] = [result[child.tag]]
result[child.tag].append(child_data)
else:
result[child.tag] = child_data
return result
def get_xml_info(self) -> str:
return "XML Data Processor v2.1"
# 被适配者3:CSV处理器(现有的类)
class CSVProcessor:
"""现有的CSV处理器"""
def parse_csv(self, csv_string: str, delimiter: str = ',') -> List[Dict[str, str]]:
"""解析CSV字符串"""
lines = csv_string.strip().split('\n')
if not lines:
return []
headers = [h.strip() for h in lines[0].split(delimiter)]
result = []
for line in lines[1:]:
if line.strip():
values = [v.strip() for v in line.split(delimiter)]
row = {}
for i, header in enumerate(headers):
row[header] = values[i] if i < len(values) else ''
result.append(row)
return result
def get_csv_info(self) -> str:
return "CSV Data Processor v1.5"
# 适配器1:JSON适配器
class JSONAdapter(DataProcessor):
"""JSON处理器的适配器"""
def __init__(self, json_processor: JSONProcessor):
self._json_processor = json_processor
def process_data(self, data: str) -> Dict[str, Any]:
"""适配JSON处理器的接口"""
parsed_data = self._json_processor.parse_json(data)
return {
'format': 'JSON',
'data': parsed_data,
'metadata': {
'processor': self._json_processor.get_json_info(),
'timestamp': self._get_timestamp(),
'data_type': type(parsed_data).__name__
}
}
def get_format_info(self) -> str:
return f"JSON Adapter - {self._json_processor.get_json_info()}"
def _get_timestamp(self) -> str:
from datetime import datetime
return datetime.now().isoformat()
# 适配器2:XML适配器
class XMLAdapter(DataProcessor):
"""XML处理器的适配器"""
def __init__(self, xml_processor: XMLProcessor):
self._xml_processor = xml_processor
def process_data(self, data: str) -> Dict[str, Any]:
"""适配XML处理器的接口"""
parsed_data = self._xml_processor.parse_xml(data)
return {
'format': 'XML',
'data': parsed_data,
'metadata': {
'processor': self._xml_processor.get_xml_info(),
'timestamp': self._get_timestamp(),
'data_type': type(parsed_data).__name__
}
}
def get_format_info(self) -> str:
return f"XML Adapter - {self._xml_processor.get_xml_info()}"
def _get_timestamp(self) -> str:
from datetime import datetime
return datetime.now().isoformat()
# 适配器3:CSV适配器
class CSVAdapter(DataProcessor):
"""CSV处理器的适配器"""
def __init__(self, csv_processor: CSVProcessor, delimiter: str = ','):
self._csv_processor = csv_processor
self._delimiter = delimiter
def process_data(self, data: str) -> Dict[str, Any]:
"""适配CSV处理器的接口"""
parsed_data = self._csv_processor.parse_csv(data, self._delimiter)
return {
'format': 'CSV',
'data': parsed_data,
'metadata': {
'processor': self._csv_processor.get_csv_info(),
'timestamp': self._get_timestamp(),
'delimiter': self._delimiter,
'row_count': len(parsed_data),
'data_type': type(parsed_data).__name__
}
}
def get_format_info(self) -> str:
return f"CSV Adapter - {self._csv_processor.get_csv_info()}"
def _get_timestamp(self) -> str:
from datetime import datetime
return datetime.now().isoformat()
# 客户端代码:统一的数据处理系统
class UnifiedDataProcessingSystem:
"""统一的数据处理系统"""
def __init__(self):
self._processors: Dict[str, DataProcessor] = {}
def register_processor(self, name: str, processor: DataProcessor):
"""注册数据处理器"""
self._processors[name] = processor
print(f"Registered processor: {name} - {processor.get_format_info()}")
def process_data(self, processor_name: str, data: str) -> Dict[str, Any]:
"""使用指定的处理器处理数据"""
if processor_name not in self._processors:
raise ValueError(f"Unknown processor: {processor_name}")
processor = self._processors[processor_name]
return processor.process_data(data)
def get_available_processors(self) -> List[str]:
"""获取可用的处理器列表"""
return list(self._processors.keys())
def process_multiple_formats(self, data_samples: Dict[str, str]) -> Dict[str, Any]:
"""处理多种格式的数据"""
results = {}
for format_name, data in data_samples.items():
try:
result = self.process_data(format_name, data)
results[format_name] = {
'success': True,
'result': result
}
except Exception as e:
results[format_name] = {
'success': False,
'error': str(e)
}
return results
# 使用示例和演示
def demonstrate_adapter_pattern():
print("=== Adapter Pattern Demo ===")
print()
# 创建被适配者实例
json_processor = JSONProcessor()
xml_processor = XMLProcessor()
csv_processor = CSVProcessor()
# 创建适配器
json_adapter = JSONAdapter(json_processor)
xml_adapter = XMLAdapter(xml_processor)
csv_adapter = CSVAdapter(csv_processor)
# 创建统一处理系统
system = UnifiedDataProcessingSystem()
# 注册处理器
system.register_processor('json', json_adapter)
system.register_processor('xml', xml_adapter)
system.register_processor('csv', csv_adapter)
print(f"\nAvailable processors: {system.get_available_processors()}")
print()
# 准备测试数据
test_data = {
'json': '''
{
"users": [
{"id": 1, "name": "Alice", "email": "alice@example.com"},
{"id": 2, "name": "Bob", "email": "bob@example.com"}
],
"total": 2
}
''',
'xml': '''
<users>
<user id="1">
<name>Alice</name>
<email>alice@example.com</email>
</user>
<user id="2">
<name>Bob</name>
<email>bob@example.com</email>
</user>
</users>
''',
'csv': '''
id,name,email
1,Alice,alice@example.com
2,Bob,bob@example.com
3,Charlie,charlie@example.com
'''
}
# 处理多种格式的数据
print("Processing multiple data formats:")
print("-" * 50)
results = system.process_multiple_formats(test_data)
for format_name, result in results.items():
print(f"\n{format_name.upper()} Processing Result:")
if result['success']:
processed_data = result['result']
print(f" Format: {processed_data['format']}")
print(f" Processor: {processed_data['metadata']['processor']}")
print(f" Timestamp: {processed_data['metadata']['timestamp']}")
print(f" Data Type: {processed_data['metadata']['data_type']}")
if format_name == 'csv':
print(f" Row Count: {processed_data['metadata']['row_count']}")
print(f" Delimiter: {processed_data['metadata']['delimiter']}")
print(f" Data Preview: {str(processed_data['data'])[:100]}...")
else:
print(f" Error: {result['error']}")
print("\n" + "="*60)
print("Adapter Pattern Benefits Demonstrated:")
print("1. Unified interface for different data formats")
print("2. Existing processors reused without modification")
print("3. Easy to add new format support")
print("4. Client code remains unchanged when adding new formats")
if __name__ == "__main__":
demonstrate_adapter_pattern()
类适配器(继承方式)
// Java实现 - 类适配器
// 目标接口
interface MediaPlayer {
void play(String audioType, String fileName);
}
// 被适配者接口
interface AdvancedMediaPlayer {
void playVlc(String fileName);
void playMp4(String fileName);
}
// 具体的被适配者
class VlcPlayer implements AdvancedMediaPlayer {
@Override
public void playVlc(String fileName) {
System.out.println("Playing vlc file. Name: " + fileName);
}
@Override
public void playMp4(String fileName) {
// 空实现
}
}
class Mp4Player implements AdvancedMediaPlayer {
@Override
public void playVlc(String fileName) {
// 空实现
}
@Override
public void playMp4(String fileName) {
System.out.println("Playing mp4 file. Name: " + fileName);
}
}
// 适配器类
class MediaAdapter implements MediaPlayer {
private AdvancedMediaPlayer advancedMusicPlayer;
public MediaAdapter(String audioType) {
if (audioType.equalsIgnoreCase("vlc")) {
advancedMusicPlayer = new VlcPlayer();
} else if (audioType.equalsIgnoreCase("mp4")) {
advancedMusicPlayer = new Mp4Player();
}
}
@Override
public void play(String audioType, String fileName) {
if (audioType.equalsIgnoreCase("vlc")) {
advancedMusicPlayer.playVlc(fileName);
} else if (audioType.equalsIgnoreCase("mp4")) {
advancedMusicPlayer.playMp4(fileName);
}
}
}
// 客户端
class AudioPlayer implements MediaPlayer {
private MediaAdapter mediaAdapter;
@Override
public void play(String audioType, String fileName) {
// 内置支持mp3格式
if (audioType.equalsIgnoreCase("mp3")) {
System.out.println("Playing mp3 file. Name: " + fileName);
}
// 通过适配器支持其他格式
else if (audioType.equalsIgnoreCase("vlc") ||
audioType.equalsIgnoreCase("mp4")) {
mediaAdapter = new MediaAdapter(audioType);
mediaAdapter.play(audioType, fileName);
} else {
System.out.println("Invalid media. " + audioType + " format not supported");
}
}
}
5.2.4 适配器模式的变体
双向适配器
# 双向适配器示例
class BidirectionalAdapter(DataProcessor):
"""双向适配器:既可以作为JSON处理器,也可以作为XML处理器"""
def __init__(self, json_processor: JSONProcessor, xml_processor: XMLProcessor):
self._json_processor = json_processor
self._xml_processor = xml_processor
self._current_format = 'json' # 默认格式
def set_format(self, format_type: str):
"""设置当前处理格式"""
if format_type.lower() in ['json', 'xml']:
self._current_format = format_type.lower()
else:
raise ValueError(f"Unsupported format: {format_type}")
def process_data(self, data: str) -> Dict[str, Any]:
"""根据当前格式处理数据"""
if self._current_format == 'json':
return self._process_as_json(data)
else:
return self._process_as_xml(data)
def _process_as_json(self, data: str) -> Dict[str, Any]:
parsed_data = self._json_processor.parse_json(data)
return {
'format': 'JSON',
'data': parsed_data,
'adapter_type': 'bidirectional',
'metadata': {
'processor': self._json_processor.get_json_info(),
'timestamp': self._get_timestamp()
}
}
def _process_as_xml(self, data: str) -> Dict[str, Any]:
parsed_data = self._xml_processor.parse_xml(data)
return {
'format': 'XML',
'data': parsed_data,
'adapter_type': 'bidirectional',
'metadata': {
'processor': self._xml_processor.get_xml_info(),
'timestamp': self._get_timestamp()
}
}
def get_format_info(self) -> str:
return f"Bidirectional Adapter (Current: {self._current_format.upper()})"
def _get_timestamp(self) -> str:
from datetime import datetime
return datetime.now().isoformat()
缓存适配器
# 缓存适配器示例
class CachingAdapter(DataProcessor):
"""带缓存功能的适配器"""
def __init__(self, base_processor: DataProcessor, cache_size: int = 100):
self._base_processor = base_processor
self._cache = {}
self._cache_size = cache_size
self._access_order = [] # LRU缓存
def process_data(self, data: str) -> Dict[str, Any]:
"""带缓存的数据处理"""
# 生成缓存键
cache_key = self._generate_cache_key(data)
# 检查缓存
if cache_key in self._cache:
self._update_access_order(cache_key)
cached_result = self._cache[cache_key].copy()
cached_result['metadata']['cache_hit'] = True
cached_result['metadata']['cached_at'] = self._cache[cache_key]['metadata']['timestamp']
return cached_result
# 缓存未命中,处理数据
result = self._base_processor.process_data(data)
result['metadata']['cache_hit'] = False
# 存入缓存
self._add_to_cache(cache_key, result)
return result
def _generate_cache_key(self, data: str) -> str:
"""生成缓存键"""
import hashlib
return hashlib.md5(data.encode()).hexdigest()
def _add_to_cache(self, key: str, value: Dict[str, Any]):
"""添加到缓存(LRU策略)"""
if len(self._cache) >= self._cache_size:
# 移除最久未使用的项
oldest_key = self._access_order.pop(0)
del self._cache[oldest_key]
self._cache[key] = value.copy()
self._access_order.append(key)
def _update_access_order(self, key: str):
"""更新访问顺序"""
self._access_order.remove(key)
self._access_order.append(key)
def get_format_info(self) -> str:
cache_stats = f"Cache: {len(self._cache)}/{self._cache_size}"
return f"Caching Adapter - {self._base_processor.get_format_info()} ({cache_stats})"
def get_cache_stats(self) -> Dict[str, Any]:
"""获取缓存统计信息"""
return {
'cache_size': len(self._cache),
'max_cache_size': self._cache_size,
'cache_keys': list(self._cache.keys())
}
5.2.5 适配器模式的优缺点
优点: 1. 复用现有代码: 无需修改现有类即可使其适配新接口 2. 分离关注点: 将接口转换逻辑与业务逻辑分离 3. 开闭原则: 对扩展开放,对修改关闭 4. 灵活性: 可以同时适配多个不兼容的接口
缺点: 1. 增加复杂性: 引入额外的抽象层 2. 性能开销: 额外的方法调用和对象创建 3. 维护成本: 需要维护适配器代码
5.2.6 适用场景
- 系统集成: 整合第三方库或遗留系统
- 接口标准化: 统一不同供应商的API
- 版本兼容: 新旧版本接口的兼容处理
- 数据格式转换: 不同数据格式之间的转换
- 跨平台开发: 适配不同平台的API差异
5.3 装饰器模式(Decorator Pattern)
5.3.1 模式定义与动机
定义: 装饰器模式动态地给一个对象添加一些额外的职责。就增加功能来说,装饰器模式相比生成子类更为灵活。
动机: - 需要在运行时动态地给对象添加功能 - 避免使用继承导致的类爆炸问题 - 需要撤销某个功能的添加 - 需要组合多种功能
5.3.2 模式结构
┌─────────────┐
│ Component │
│ (abstract) │
└─────────────┘
▲
│
┌─────────────┐ ┌─────────────┐
│ Concrete │ │ Decorator │
│ Component │ │ (abstract) │
└─────────────┘ └─────────────┘
▲
│
┌─────────────┐
│ Concrete │
│ Decorator │
└─────────────┘
角色说明: - Component: 定义对象的接口,可以给这些对象动态添加职责 - ConcreteComponent: 具体组件,实现Component接口的对象 - Decorator: 装饰器抽象类,持有Component对象的引用 - ConcreteDecorator: 具体装饰器,给组件添加具体的职责
5.3.3 Python实现示例
# Python装饰器模式实现
from abc import ABC, abstractmethod
from typing import Dict, Any, List
import time
import json
from datetime import datetime
# 组件接口
class DataService(ABC):
"""数据服务组件接口"""
@abstractmethod
def get_data(self, query: str) -> Dict[str, Any]:
"""获取数据"""
pass
@abstractmethod
def save_data(self, data: Dict[str, Any]) -> bool:
"""保存数据"""
pass
# 具体组件
class DatabaseService(DataService):
"""数据库服务(具体组件)"""
def __init__(self, db_name: str = "default_db"):
self.db_name = db_name
self._data_store = {} # 模拟数据库
def get_data(self, query: str) -> Dict[str, Any]:
"""从数据库获取数据"""
print(f"[DatabaseService] Executing query: {query}")
# 模拟数据库查询
time.sleep(0.1) # 模拟查询延迟
if query in self._data_store:
return {
'status': 'success',
'data': self._data_store[query],
'source': 'database',
'db_name': self.db_name
}
else:
return {
'status': 'not_found',
'data': None,
'source': 'database',
'db_name': self.db_name
}
def save_data(self, data: Dict[str, Any]) -> bool:
"""保存数据到数据库"""
key = data.get('key', str(len(self._data_store)))
self._data_store[key] = data
print(f"[DatabaseService] Saved data with key: {key}")
return True
# 装饰器基类
class DataServiceDecorator(DataService):
"""数据服务装饰器基类"""
def __init__(self, service: DataService):
self._service = service
def get_data(self, query: str) -> Dict[str, Any]:
return self._service.get_data(query)
def save_data(self, data: Dict[str, Any]) -> bool:
return self._service.save_data(data)
# 具体装饰器1:缓存装饰器
class CacheDecorator(DataServiceDecorator):
"""缓存装饰器"""
def __init__(self, service: DataService, cache_ttl: int = 300):
super().__init__(service)
self._cache = {}
self._cache_ttl = cache_ttl # 缓存生存时间(秒)
def get_data(self, query: str) -> Dict[str, Any]:
# 检查缓存
cache_key = self._generate_cache_key(query)
cached_result = self._get_from_cache(cache_key)
if cached_result is not None:
print(f"[CacheDecorator] Cache hit for query: {query}")
cached_result['cache_hit'] = True
return cached_result
print(f"[CacheDecorator] Cache miss for query: {query}")
# 缓存未命中,调用原始服务
result = super().get_data(query)
result['cache_hit'] = False
# 存入缓存
self._add_to_cache(cache_key, result)
return result
def _generate_cache_key(self, query: str) -> str:
import hashlib
return hashlib.md5(query.encode()).hexdigest()
def _get_from_cache(self, cache_key: str) -> Dict[str, Any]:
if cache_key in self._cache:
cache_entry = self._cache[cache_key]
if time.time() - cache_entry['timestamp'] < self._cache_ttl:
return cache_entry['data'].copy()
else:
# 缓存过期,删除
del self._cache[cache_key]
return None
def _add_to_cache(self, cache_key: str, data: Dict[str, Any]):
self._cache[cache_key] = {
'data': data.copy(),
'timestamp': time.time()
}
def get_cache_stats(self) -> Dict[str, Any]:
"""获取缓存统计信息"""
valid_entries = 0
current_time = time.time()
for entry in self._cache.values():
if current_time - entry['timestamp'] < self._cache_ttl:
valid_entries += 1
return {
'total_entries': len(self._cache),
'valid_entries': valid_entries,
'cache_ttl': self._cache_ttl
}
# 具体装饰器2:日志装饰器
class LoggingDecorator(DataServiceDecorator):
"""日志装饰器"""
def __init__(self, service: DataService, log_level: str = "INFO"):
super().__init__(service)
self._log_level = log_level
self._logs = []
def get_data(self, query: str) -> Dict[str, Any]:
start_time = time.time()
self._log(f"Starting get_data operation for query: {query}")
try:
result = super().get_data(query)
execution_time = time.time() - start_time
self._log(f"get_data completed successfully in {execution_time:.3f}s")
# 添加日志信息到结果中
result['execution_time'] = execution_time
result['logged'] = True
return result
except Exception as e:
execution_time = time.time() - start_time
self._log(f"get_data failed after {execution_time:.3f}s: {str(e)}", "ERROR")
raise
def save_data(self, data: Dict[str, Any]) -> bool:
start_time = time.time()
self._log(f"Starting save_data operation")
try:
result = super().save_data(data)
execution_time = time.time() - start_time
self._log(f"save_data completed successfully in {execution_time:.3f}s")
return result
except Exception as e:
execution_time = time.time() - start_time
self._log(f"save_data failed after {execution_time:.3f}s: {str(e)}", "ERROR")
raise
def _log(self, message: str, level: str = None):
level = level or self._log_level
timestamp = datetime.now().isoformat()
log_entry = f"[{timestamp}] [{level}] [LoggingDecorator] {message}"
print(log_entry)
self._logs.append({
'timestamp': timestamp,
'level': level,
'message': message
})
def get_logs(self) -> List[Dict[str, str]]:
"""获取日志记录"""
return self._logs.copy()
# 具体装饰器3:性能监控装饰器
class PerformanceDecorator(DataServiceDecorator):
"""性能监控装饰器"""
def __init__(self, service: DataService):
super().__init__(service)
self._metrics = {
'get_data_calls': 0,
'save_data_calls': 0,
'total_get_time': 0.0,
'total_save_time': 0.0,
'get_data_errors': 0,
'save_data_errors': 0
}
def get_data(self, query: str) -> Dict[str, Any]:
start_time = time.time()
self._metrics['get_data_calls'] += 1
try:
result = super().get_data(query)
execution_time = time.time() - start_time
self._metrics['total_get_time'] += execution_time
# 添加性能信息
result['performance'] = {
'execution_time': execution_time,
'operation': 'get_data',
'call_count': self._metrics['get_data_calls']
}
return result
except Exception as e:
self._metrics['get_data_errors'] += 1
execution_time = time.time() - start_time
self._metrics['total_get_time'] += execution_time
raise
def save_data(self, data: Dict[str, Any]) -> bool:
start_time = time.time()
self._metrics['save_data_calls'] += 1
try:
result = super().save_data(data)
execution_time = time.time() - start_time
self._metrics['total_save_time'] += execution_time
return result
except Exception as e:
self._metrics['save_data_errors'] += 1
execution_time = time.time() - start_time
self._metrics['total_save_time'] += execution_time
raise
def get_performance_metrics(self) -> Dict[str, Any]:
"""获取性能指标"""
metrics = self._metrics.copy()
# 计算平均时间
if metrics['get_data_calls'] > 0:
metrics['avg_get_time'] = metrics['total_get_time'] / metrics['get_data_calls']
else:
metrics['avg_get_time'] = 0.0
if metrics['save_data_calls'] > 0:
metrics['avg_save_time'] = metrics['total_save_time'] / metrics['save_data_calls']
else:
metrics['avg_save_time'] = 0.0
# 计算错误率
if metrics['get_data_calls'] > 0:
metrics['get_error_rate'] = metrics['get_data_errors'] / metrics['get_data_calls']
else:
metrics['get_error_rate'] = 0.0
if metrics['save_data_calls'] > 0:
metrics['save_error_rate'] = metrics['save_data_errors'] / metrics['save_data_calls']
else:
metrics['save_error_rate'] = 0.0
return metrics
# 具体装饰器4:重试装饰器
class RetryDecorator(DataServiceDecorator):
"""重试装饰器"""
def __init__(self, service: DataService, max_retries: int = 3, retry_delay: float = 1.0):
super().__init__(service)
self._max_retries = max_retries
self._retry_delay = retry_delay
def get_data(self, query: str) -> Dict[str, Any]:
last_exception = None
for attempt in range(self._max_retries + 1):
try:
result = super().get_data(query)
# 添加重试信息
result['retry_info'] = {
'attempt': attempt + 1,
'max_retries': self._max_retries,
'success': True
}
if attempt > 0:
print(f"[RetryDecorator] Operation succeeded on attempt {attempt + 1}")
return result
except Exception as e:
last_exception = e
if attempt < self._max_retries:
print(f"[RetryDecorator] Attempt {attempt + 1} failed: {str(e)}. Retrying in {self._retry_delay}s...")
time.sleep(self._retry_delay)
else:
print(f"[RetryDecorator] All {self._max_retries + 1} attempts failed")
# 所有重试都失败了
raise last_exception
def save_data(self, data: Dict[str, Any]) -> bool:
last_exception = None
for attempt in range(self._max_retries + 1):
try:
result = super().save_data(data)
if attempt > 0:
print(f"[RetryDecorator] Save operation succeeded on attempt {attempt + 1}")
return result
except Exception as e:
last_exception = e
if attempt < self._max_retries:
print(f"[RetryDecorator] Save attempt {attempt + 1} failed: {str(e)}. Retrying in {self._retry_delay}s...")
time.sleep(self._retry_delay)
else:
print(f"[RetryDecorator] All {self._max_retries + 1} save attempts failed")
# 所有重试都失败了
raise last_exception
# 使用示例和演示
def demonstrate_decorator_pattern():
print("=== Decorator Pattern Demo ===")
print()
# 创建基础服务
base_service = DatabaseService("user_db")
# 逐步添加装饰器
print("1. Testing base service:")
print("-" * 30)
# 先保存一些测试数据
test_data = {
'key': 'user_123',
'name': 'Alice',
'email': 'alice@example.com',
'age': 30
}
base_service.save_data(test_data)
result = base_service.get_data('user_123')
print(f"Base service result: {result}")
print()
# 添加缓存装饰器
print("2. Adding cache decorator:")
print("-" * 30)
cached_service = CacheDecorator(base_service, cache_ttl=60)
# 第一次查询(缓存未命中)
result1 = cached_service.get_data('user_123')
print(f"First query (cache miss): cache_hit = {result1.get('cache_hit')}")
# 第二次查询(缓存命中)
result2 = cached_service.get_data('user_123')
print(f"Second query (cache hit): cache_hit = {result2.get('cache_hit')}")
print(f"Cache stats: {cached_service.get_cache_stats()}")
print()
# 添加日志装饰器
print("3. Adding logging decorator:")
print("-" * 30)
logged_service = LoggingDecorator(cached_service)
result3 = logged_service.get_data('user_123')
print(f"Logged query result: execution_time = {result3.get('execution_time'):.3f}s")
print()
# 添加性能监控装饰器
print("4. Adding performance decorator:")
print("-" * 30)
perf_service = PerformanceDecorator(logged_service)
# 执行多次查询
for i in range(3):
result = perf_service.get_data('user_123')
print(f"Query {i+1}: execution_time = {result['performance']['execution_time']:.3f}s")
print(f"Performance metrics: {perf_service.get_performance_metrics()}")
print()
# 添加重试装饰器
print("5. Adding retry decorator:")
print("-" * 30)
retry_service = RetryDecorator(perf_service, max_retries=2, retry_delay=0.5)
# 正常查询
result4 = retry_service.get_data('user_123')
print(f"Retry query result: {result4['retry_info']}")
print()
# 完整的装饰器链演示
print("6. Complete decorator chain:")
print("-" * 30)
print("Service chain: DatabaseService -> CacheDecorator -> LoggingDecorator -> PerformanceDecorator -> RetryDecorator")
# 查询不存在的数据
result5 = retry_service.get_data('nonexistent_user')
print(f"Non-existent query result: {result5['status']}")
print("\n" + "="*60)
print("Decorator Pattern Benefits Demonstrated:")
print("1. Dynamic functionality addition without modifying original class")
print("2. Flexible combination of different decorators")
print("3. Single responsibility principle - each decorator has one purpose")
print("4. Easy to add/remove functionality at runtime")
print("5. Transparent to client - same interface throughout")
if __name__ == "__main__":
demonstrate_decorator_pattern()
下一节预告: 接下来我们将继续学习外观模式和代理模式,探讨如何简化复杂系统的接口以及如何控制对象的访问。