本章将通过几个完整的实战案例,展示Kafka在不同场景下的应用,包括日志收集系统、实时数据处理平台、微服务事件驱动架构等。

## 7.1 日志收集系统

### 7.1.1 系统架构

整个日志收集系统由三部分组成:`LogFileWatcher` 以类似 `tail -f` 的方式监控日志文件的增量内容;`LogParser` 将原始日志行解析为结构化的 `LogEntry`;`LogCollector` 负责批量缓冲并把解析结果发送到 Kafka,供下游的日志处理器消费。

```python
import json
import time
import threading
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from datetime import datetime
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
import logging
import os
import re
from pathlib import Path

@dataclass
class LogEntry:
    """日志条目"""
    timestamp: str
    level: str
    service: str
    host: str
    message: str
    thread_id: Optional[str] = None
    request_id: Optional[str] = None
    user_id: Optional[str] = None
    extra_fields: Optional[Dict[str, Any]] = None

@dataclass
class LogCollectorConfig:
    """日志收集器配置"""
    kafka_servers: List[str]
    topic: str
    log_files: List[str]
    service_name: str
    host_name: str
    batch_size: int = 100
    flush_interval: int = 5  # 秒

class LogFileWatcher:
    """日志文件监控器"""
    
    def __init__(self, file_path: str, callback):
        self.file_path = file_path
        self.callback = callback
        self.file_handle = None
        self.position = 0
        self.running = False
        self.thread = None
        self.logger = logging.getLogger(__name__)
    
    def start(self):
        """开始监控"""
        try:
            self.running = True
            self.thread = threading.Thread(target=self._watch_file, daemon=True)
            self.thread.start()
            self.logger.info(f"开始监控日志文件: {self.file_path}")
        except Exception as e:
            self.logger.error(f"启动文件监控失败: {e}")
    
    def stop(self):
        """停止监控"""
        self.running = False
        # 先等监控线程退出,再兜底关闭文件句柄,
        # 避免线程仍在读取时句柄被另一个线程关闭
        if self.thread:
            self.thread.join()
        if self.file_handle and not self.file_handle.closed:
            self.file_handle.close()
        self.logger.info(f"停止监控日志文件: {self.file_path}")
    
    def _watch_file(self):
        """监控文件变化"""
        try:
            # 打开文件并移动到末尾
            self.file_handle = open(self.file_path, 'r', encoding='utf-8')
            self.file_handle.seek(0, 2)  # 移动到文件末尾
            self.position = self.file_handle.tell()
            
            while self.running:
                # 检查文件是否有新内容
                current_size = os.path.getsize(self.file_path)
                
                if current_size > self.position:
                    # 读取新内容
                    self.file_handle.seek(self.position)
                    new_lines = self.file_handle.readlines()
                    
                    for line in new_lines:
                        line = line.strip()
                        if line:
                            self.callback(line)
                    
                    self.position = self.file_handle.tell()
                
                elif current_size < self.position:
                    # 文件被截断或重新创建,重新从头读取
                    # (注意:这种按文件大小判断的方式无法覆盖 logrotate
                    #  先改名再新建文件的场景,生产环境可按 inode 判断)
                    self.file_handle.close()
                    self.file_handle = open(self.file_path, 'r', encoding='utf-8')
                    self.position = 0
                
                time.sleep(0.1)  # 短暂休眠
        
        except Exception as e:
            self.logger.error(f"监控文件时发生错误: {e}")
        finally:
            if self.file_handle:
                self.file_handle.close()

class LogParser:
    """日志解析器"""
    
    def __init__(self, service_name: str, host_name: str):
        self.service_name = service_name
        self.host_name = host_name
        self.logger = logging.getLogger(__name__)
        
        # 常见日志格式的正则表达式
        self.patterns = {
            'apache': re.compile(
                r'(?P<ip>\S+) \S+ \S+ \[(?P<timestamp>[^\]]+)\] '
                r'"(?P<method>\S+) (?P<path>\S+) (?P<protocol>\S+)" '
                r'(?P<status>\d+) (?P<size>\S+)'
            ),
            'nginx': re.compile(
                r'(?P<ip>\S+) - \S+ \[(?P<timestamp>[^\]]+)\] '
                r'"(?P<method>\S+) (?P<path>\S+) (?P<protocol>\S+)" '
                r'(?P<status>\d+) (?P<size>\d+)'
            ),
            'application': re.compile(
                r'(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}[,.]\d{3}) '
                r'\[(?P<thread>[^\]]+)\] (?P<level>\w+)\s+(?P<logger>\S+) - '
                r'(?P<message>.*)'
            ),
            'json': re.compile(r'^\{.*\}$')
        }
    
    def parse_log_line(self, line: str) -> Optional[LogEntry]:
        """解析日志行"""
        try:
            # 尝试JSON格式
            if self.patterns['json'].match(line):
                return self._parse_json_log(line)
            
            # 尝试应用程序日志格式
            match = self.patterns['application'].match(line)
            if match:
                return self._parse_application_log(match)
            
            # 尝试Apache日志格式
            match = self.patterns['apache'].match(line)
            if match:
                return self._parse_apache_log(match)
            
            # 尝试Nginx日志格式
            match = self.patterns['nginx'].match(line)
            if match:
                return self._parse_nginx_log(match)
            
            # 如果都不匹配,创建通用日志条目
            return LogEntry(
                timestamp=datetime.now().isoformat(),
                level="INFO",
                service=self.service_name,
                host=self.host_name,
                message=line
            )
        
        except Exception as e:
            self.logger.error(f"解析日志行失败: {e}")
            return None
    
    def _parse_json_log(self, line: str) -> LogEntry:
        """解析JSON格式日志"""
        data = json.loads(line)
        
        return LogEntry(
            timestamp=data.get('timestamp', datetime.now().isoformat()),
            level=data.get('level', 'INFO'),
            service=data.get('service', self.service_name),
            host=data.get('host', self.host_name),
            message=data.get('message', ''),
            thread_id=data.get('thread_id'),
            request_id=data.get('request_id'),
            user_id=data.get('user_id'),
            extra_fields={k: v for k, v in data.items() 
                         if k not in ['timestamp', 'level', 'service', 'host', 'message', 
                                    'thread_id', 'request_id', 'user_id']}
        )
    
    def _parse_application_log(self, match) -> LogEntry:
        """解析应用程序日志"""
        groups = match.groupdict()
        
        return LogEntry(
            timestamp=groups['timestamp'],
            level=groups['level'],
            service=self.service_name,
            host=self.host_name,
            message=groups['message'],
            thread_id=groups.get('thread'),
            extra_fields={'logger': groups.get('logger')}
        )
    
    def _parse_apache_log(self, match) -> LogEntry:
        """解析Apache日志"""
        groups = match.groupdict()
        
        return LogEntry(
            timestamp=groups['timestamp'],
            level="INFO",
            service=self.service_name,
            host=self.host_name,
            message=f"{groups['method']} {groups['path']} {groups['status']}",
            extra_fields={
                'ip': groups['ip'],
                'method': groups['method'],
                'path': groups['path'],
                'protocol': groups['protocol'],
                'status': groups['status'],
                'size': groups['size']
            }
        )
    
    def _parse_nginx_log(self, match) -> LogEntry:
        """解析Nginx日志"""
        groups = match.groupdict()
        
        return LogEntry(
            timestamp=groups['timestamp'],
            level="INFO",
            service=self.service_name,
            host=self.host_name,
            message=f"{groups['method']} {groups['path']} {groups['status']}",
            extra_fields={
                'ip': groups['ip'],
                'method': groups['method'],
                'path': groups['path'],
                'protocol': groups['protocol'],
                'status': groups['status'],
                'size': groups['size']
            }
        )
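
# 用法示意:解析一行典型的应用程序日志(样例日志内容为假设)
def _demo_parse_log_line():
    parser = LogParser("web-service", "web-server-01")
    entry = parser.parse_log_line(
        "2024-01-15 10:23:45,123 [main] ERROR com.example.OrderService - 下单失败"
    )
    # 预期:entry.level == "ERROR"、entry.thread_id == "main"、
    # entry.extra_fields == {"logger": "com.example.OrderService"}
    return entry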

class LogCollector:
    """日志收集器"""
    
    def __init__(self, config: LogCollectorConfig):
        self.config = config
        self.producer = None
        self.parser = LogParser(config.service_name, config.host_name)
        self.watchers = []
        self.log_buffer = []
        self.buffer_lock = threading.Lock()
        self.running = False
        self.flush_thread = None
        self.logger = logging.getLogger(__name__)
    
    def start(self):
        """启动日志收集器"""
        try:
            # 创建Kafka生产者
            self.producer = KafkaProducer(
                bootstrap_servers=self.config.kafka_servers,
                value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode('utf-8'),
                key_serializer=lambda k: k.encode('utf-8') if k else None,
                acks='all',
                retries=3,
                batch_size=16384,
                linger_ms=10,
                buffer_memory=33554432
            )
            
            # 启动文件监控器
            for log_file in self.config.log_files:
                if os.path.exists(log_file):
                    watcher = LogFileWatcher(log_file, self._on_log_line)
                    watcher.start()
                    self.watchers.append(watcher)
                else:
                    self.logger.warning(f"日志文件不存在: {log_file}")
            
            # 启动缓冲区刷新线程
            self.running = True
            self.flush_thread = threading.Thread(target=self._flush_buffer, daemon=True)
            self.flush_thread.start()
            
            self.logger.info("日志收集器启动成功")
        
        except Exception as e:
            self.logger.error(f"启动日志收集器失败: {e}")
            raise
    
    def stop(self):
        """停止日志收集器"""
        try:
            self.running = False
            
            # 停止文件监控器
            for watcher in self.watchers:
                watcher.stop()
            
            # 先等待刷新线程结束,避免它在生产者关闭后继续发送
            if self.flush_thread:
                self.flush_thread.join()
            
            # 刷新剩余的日志
            with self.buffer_lock:
                self._flush_logs()
            
            # 关闭生产者
            if self.producer:
                self.producer.close()
            
            self.logger.info("日志收集器已停止")
        
        except Exception as e:
            self.logger.error(f"停止日志收集器失败: {e}")
    
    def _on_log_line(self, line: str):
        """处理新的日志行"""
        try:
            log_entry = self.parser.parse_log_line(line)
            if log_entry:
                with self.buffer_lock:
                    self.log_buffer.append(asdict(log_entry))
                    
                    # 如果缓冲区满了,立即刷新
                    if len(self.log_buffer) >= self.config.batch_size:
                        self._flush_logs()
        
        except Exception as e:
            self.logger.error(f"处理日志行失败: {e}")
    
    def _flush_buffer(self):
        """定期刷新缓冲区"""
        while self.running:
            try:
                time.sleep(self.config.flush_interval)
                with self.buffer_lock:
                    if self.log_buffer:
                        self._flush_logs()
            
            except Exception as e:
                self.logger.error(f"刷新缓冲区失败: {e}")
    
    def _flush_logs(self):
        """刷新日志到Kafka(调用方需持有 buffer_lock)"""
        if not self.log_buffer:
            return
        
        try:
            logs_to_send = self.log_buffer.copy()
            self.log_buffer.clear()
            
            for log_data in logs_to_send:
                # 使用服务名作为分区键
                key = f"{log_data['service']}:{log_data['host']}"
                
                future = self.producer.send(
                    self.config.topic,
                    value=log_data,
                    key=key
                )
                
                # 添加回调处理
                future.add_callback(self._on_send_success)
                future.add_errback(self._on_send_error)
            
            # 确保消息发送
            self.producer.flush()
            
            self.logger.debug(f"发送 {len(logs_to_send)} 条日志到Kafka")
        
        except Exception as e:
            self.logger.error(f"发送日志到Kafka失败: {e}")
    
    def _on_send_success(self, record_metadata):
        """发送成功回调"""
        self.logger.debug(f"日志发送成功: topic={record_metadata.topic}, "
                         f"partition={record_metadata.partition}, "
                         f"offset={record_metadata.offset}")
    
    def _on_send_error(self, exception):
        """发送失败回调"""
        self.logger.error(f"日志发送失败: {exception}")

### 7.1.2 日志处理器

```python
class LogProcessor:
    """日志处理器"""
    
    def __init__(self, kafka_servers: List[str], topic: str, 
                 consumer_group: str = "log-processor"):
        self.kafka_servers = kafka_servers
        self.topic = topic
        self.consumer_group = consumer_group
        self.consumer = None
        self.running = False
        self.logger = logging.getLogger(__name__)
        
        # 日志统计
        self.stats = {
            'total_logs': 0,
            'error_logs': 0,
            'warn_logs': 0,
            'info_logs': 0,
            'debug_logs': 0,
            'services': set(),
            'hosts': set()
        }
    
    def start(self):
        """启动日志处理器"""
        try:
            self.consumer = KafkaConsumer(
                self.topic,
                bootstrap_servers=self.kafka_servers,
                group_id=self.consumer_group,
                value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                key_deserializer=lambda k: k.decode('utf-8') if k else None,
                auto_offset_reset='latest',
                enable_auto_commit=True,
                auto_commit_interval_ms=1000
            )
            
            self.running = True
            self.logger.info("日志处理器启动成功")
            
            # 开始处理消息
            self._process_messages()
        
        except Exception as e:
            self.logger.error(f"启动日志处理器失败: {e}")
            raise
    
    def stop(self):
        """停止日志处理器"""
        try:
            self.running = False
            if self.consumer:
                self.consumer.close()
            self.logger.info("日志处理器已停止")
        
        except Exception as e:
            self.logger.error(f"停止日志处理器失败: {e}")
    
    def _process_messages(self):
        """处理消息"""
        try:
            for message in self.consumer:
                if not self.running:
                    break
                
                try:
                    log_data = message.value
                    self._process_log_entry(log_data)
                    
                except Exception as e:
                    self.logger.error(f"处理日志条目失败: {e}")
        
        except Exception as e:
            self.logger.error(f"处理消息失败: {e}")
    
    def _process_log_entry(self, log_data: Dict[str, Any]):
        """处理单个日志条目"""
        try:
            # 更新统计信息
            self.stats['total_logs'] += 1
            
            level = log_data.get('level', 'INFO').upper()
            if level == 'ERROR':
                self.stats['error_logs'] += 1
                self._handle_error_log(log_data)
            elif level == 'WARN' or level == 'WARNING':
                self.stats['warn_logs'] += 1
                self._handle_warn_log(log_data)
            elif level == 'INFO':
                self.stats['info_logs'] += 1
            elif level == 'DEBUG':
                self.stats['debug_logs'] += 1
            
            # 记录服务和主机
            self.stats['services'].add(log_data.get('service', 'unknown'))
            self.stats['hosts'].add(log_data.get('host', 'unknown'))
            
            # 检查异常模式
            self._check_anomaly_patterns(log_data)
            
            # 存储到数据库或搜索引擎(这里只是示例)
            self._store_log(log_data)
        
        except Exception as e:
            self.logger.error(f"处理日志条目失败: {e}")
    
    def _handle_error_log(self, log_data: Dict[str, Any]):
        """处理错误日志"""
        try:
            message = log_data.get('message', '')
            service = log_data.get('service', 'unknown')
            host = log_data.get('host', 'unknown')
            
            # 检查是否是严重错误
            critical_keywords = ['OutOfMemoryError', 'StackOverflowError', 
                               'DatabaseConnectionError', 'ServiceUnavailable']
            
            if any(keyword in message for keyword in critical_keywords):
                self._send_alert({
                    'type': 'critical_error',
                    'service': service,
                    'host': host,
                    'message': message,
                    'timestamp': log_data.get('timestamp')
                })
            
            self.logger.warning(f"错误日志 [{service}@{host}]: {message}")
        
        except Exception as e:
            self.logger.error(f"处理错误日志失败: {e}")
    
    def _handle_warn_log(self, log_data: Dict[str, Any]):
        """处理警告日志"""
        try:
            message = log_data.get('message', '')
            
            # 检查是否是性能警告
            if 'slow query' in message.lower() or 'timeout' in message.lower():
                self._send_alert({
                    'type': 'performance_warning',
                    'service': log_data.get('service'),
                    'host': log_data.get('host'),
                    'message': message,
                    'timestamp': log_data.get('timestamp')
                })
        
        except Exception as e:
            self.logger.error(f"处理警告日志失败: {e}")
    
    def _check_anomaly_patterns(self, log_data: Dict[str, Any]):
        """检查异常模式"""
        try:
            # 这里可以实现更复杂的异常检测逻辑
            # 例如:频率异常、模式匹配、机器学习检测等
            pass
        
        except Exception as e:
            self.logger.error(f"检查异常模式失败: {e}")
    
    def _store_log(self, log_data: Dict[str, Any]):
        """存储日志"""
        try:
            # 这里可以实现将日志存储到Elasticsearch、MongoDB等
            # 示例:只打印到控制台
            timestamp = log_data.get('timestamp')
            level = log_data.get('level')
            service = log_data.get('service')
            message = log_data.get('message')
            
            print(f"[{timestamp}] {level} {service}: {message}")
        
        except Exception as e:
            self.logger.error(f"存储日志失败: {e}")
    
    def _send_alert(self, alert_data: Dict[str, Any]):
        """发送告警"""
        try:
            # 这里可以实现发送邮件、短信、钉钉等告警
            self.logger.critical(f"告警: {alert_data}")
        
        except Exception as e:
            self.logger.error(f"发送告警失败: {e}")
    
    def get_stats(self) -> Dict[str, Any]:
        """获取统计信息"""
        stats = self.stats.copy()
        stats['services'] = list(stats['services'])
        stats['hosts'] = list(stats['hosts'])
        return stats

# 使用示例
if __name__ == "__main__":
    # 配置日志
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    
    print("=== 日志收集系统 ===")
    
    # 日志收集器配置
    collector_config = LogCollectorConfig(
        kafka_servers=['localhost:9092'],
        topic='application-logs',
        log_files=[
            '/var/log/application.log',
            '/var/log/nginx/access.log',
            '/var/log/nginx/error.log'
        ],
        service_name='web-service',
        host_name='web-server-01',
        batch_size=50,
        flush_interval=3
    )
    
    # 创建日志收集器和处理器(提前创建,保证 finally 中可以安全地 stop)
    collector = LogCollector(collector_config)
    processor = LogProcessor(
        kafka_servers=['localhost:9092'],
        topic='application-logs',
        consumer_group='log-processor-group'
    )
    
    try:
        print("\n1. 启动日志收集器")
        collector.start()
        
        # 在单独的线程中启动处理器(processor.start() 会阻塞在消费循环上)
        print("\n2. 启动日志处理器")
        processor_thread = threading.Thread(target=processor.start, daemon=True)
        processor_thread.start()
        
        # 模拟运行一段时间
        print("\n3. 系统运行中...")
        time.sleep(30)
        
        # 显示统计信息
        print("\n4. 统计信息")
        stats = processor.get_stats()
        print(f"总日志数: {stats['total_logs']}")
        print(f"错误日志: {stats['error_logs']}")
        print(f"警告日志: {stats['warn_logs']}")
        print(f"信息日志: {stats['info_logs']}")
        print(f"调试日志: {stats['debug_logs']}")
        print(f"服务列表: {stats['services']}")
        print(f"主机列表: {stats['hosts']}")
        
    except KeyboardInterrupt:
        print("\n收到中断信号,正在停止...")
    
    finally:
        # 停止收集器和处理器
        collector.stop()
        processor.stop()
        print("日志收集系统已停止")

## 7.2 实时数据处理平台

### 7.2.1 流处理架构

流处理架构由三个组件协作完成:`MetricAggregator` 在滑动时间窗口内缓存并聚合指标,`AlertEngine` 根据规则阈值和持续时间判断是否触发告警,`StreamProcessor` 从输入主题消费原始指标、完成聚合与趋势计算后写入输出主题。

```python
import json
import time
import threading
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from kafka import KafkaProducer, KafkaConsumer
from collections import defaultdict, deque
import statistics
import logging

@dataclass
class MetricData:
    """指标数据"""
    metric_name: str
    value: float
    timestamp: str
    tags: Dict[str, str]
    source: str

@dataclass
class AlertRule:
    """告警规则"""
    name: str
    metric_name: str
    condition: str  # >, <, >=, <=, ==
    threshold: float
    duration: int  # 持续时间(秒)
    tags_filter: Optional[Dict[str, str]] = None

@dataclass
class Alert:
    """告警"""
    rule_name: str
    metric_name: str
    current_value: float
    threshold: float
    condition: str
    timestamp: str
    tags: Dict[str, str]
    source: str

class MetricAggregator:
    """指标聚合器"""
    
    def __init__(self, window_size: int = 60):
        self.window_size = window_size  # 窗口大小(秒)
        self.metrics_buffer = defaultdict(deque)
        self.lock = threading.Lock()
        self.logger = logging.getLogger(__name__)
    
    def add_metric(self, metric: MetricData):
        """添加指标"""
        try:
            with self.lock:
                key = f"{metric.metric_name}:{metric.source}"
                
                # 添加到缓冲区
                self.metrics_buffer[key].append({
                    'value': metric.value,
                    'timestamp': datetime.fromisoformat(metric.timestamp),
                    'tags': metric.tags
                })
                
                # 清理过期数据
                self._cleanup_expired_data(key)
        
        except Exception as e:
            self.logger.error(f"添加指标失败: {e}")
    
    def get_aggregated_metrics(self, metric_name: str, source: str, 
                             agg_func: str = 'avg') -> Optional[float]:
        """获取聚合指标"""
        try:
            with self.lock:
                key = f"{metric_name}:{source}"
                
                if key not in self.metrics_buffer or not self.metrics_buffer[key]:
                    return None
                
                values = [item['value'] for item in self.metrics_buffer[key]]
                
                if agg_func == 'avg':
                    return statistics.mean(values)
                elif agg_func == 'sum':
                    return sum(values)
                elif agg_func == 'min':
                    return min(values)
                elif agg_func == 'max':
                    return max(values)
                elif agg_func == 'count':
                    return len(values)
                else:
                    return statistics.mean(values)
        
        except Exception as e:
            self.logger.error(f"获取聚合指标失败: {e}")
            return None
    
    def _cleanup_expired_data(self, key: str):
        """清理过期数据"""
        try:
            cutoff_time = datetime.now() - timedelta(seconds=self.window_size)
            
            while (self.metrics_buffer[key] and 
                   self.metrics_buffer[key][0]['timestamp'] < cutoff_time):
                self.metrics_buffer[key].popleft()
        
        except Exception as e:
            self.logger.error(f"清理过期数据失败: {e}")

class AlertEngine:
    """告警引擎"""
    
    def __init__(self, aggregator: MetricAggregator):
        self.aggregator = aggregator
        self.rules = []
        self.alert_states = {}  # 记录告警状态
        self.logger = logging.getLogger(__name__)
    
    def add_rule(self, rule: AlertRule):
        """添加告警规则"""
        self.rules.append(rule)
        self.alert_states[rule.name] = {
            'triggered': False,
            'trigger_time': None,
            'last_check': None
        }
        self.logger.info(f"添加告警规则: {rule.name}")
    
    def check_alerts(self, metric: MetricData) -> List[Alert]:
        """检查告警"""
        alerts = []
        
        try:
            for rule in self.rules:
                if rule.metric_name != metric.metric_name:
                    continue
                
                # 检查标签过滤
                if rule.tags_filter:
                    if not all(metric.tags.get(k) == v for k, v in rule.tags_filter.items()):
                        continue
                
                # 获取聚合值
                agg_value = self.aggregator.get_aggregated_metrics(
                    metric.metric_name, metric.source, 'avg'
                )
                
                if agg_value is None:
                    continue
                
                # 检查条件
                condition_met = self._evaluate_condition(
                    agg_value, rule.condition, rule.threshold
                )
                
                state = self.alert_states[rule.name]
                current_time = datetime.now()
                
                if condition_met:
                    if not state['triggered']:
                        # 首次触发
                        state['trigger_time'] = current_time
                        state['triggered'] = True
                    
                    # 检查是否达到持续时间
                    if (current_time - state['trigger_time']).total_seconds() >= rule.duration:
                        alert = Alert(
                            rule_name=rule.name,
                            metric_name=rule.metric_name,
                            current_value=agg_value,
                            threshold=rule.threshold,
                            condition=rule.condition,
                            timestamp=current_time.isoformat(),
                            tags=metric.tags,
                            source=metric.source
                        )
                        alerts.append(alert)
                        
                        # 重置状态以避免重复告警
                        state['trigger_time'] = current_time
                
                else:
                    # 条件不满足,重置状态
                    state['triggered'] = False
                    state['trigger_time'] = None
                
                state['last_check'] = current_time
        
        except Exception as e:
            self.logger.error(f"检查告警失败: {e}")
        
        return alerts
    
    def _evaluate_condition(self, value: float, condition: str, threshold: float) -> bool:
        """评估条件"""
        try:
            if condition == '>':
                return value > threshold
            elif condition == '<':
                return value < threshold
            elif condition == '>=':
                return value >= threshold
            elif condition == '<=':
                return value <= threshold
            elif condition == '==':
                return abs(value - threshold) < 0.001  # 浮点数比较
            else:
                return False
        
        except Exception as e:
            self.logger.error(f"评估条件失败: {e}")
            return False
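
# 告警语义说明:以 high_cpu_usage(cpu_usage > 80,duration=60)为例,
# 聚合均值需要连续 60 秒满足条件才会产出 Alert;条件一旦不满足立即重置计时,
# 产出告警后 trigger_time 会被刷新,相当于每个 duration 周期最多告警一次。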

class StreamProcessor:
    """流处理器"""
    
    def __init__(self, kafka_servers: List[str], input_topic: str, 
                 output_topic: str, consumer_group: str = "stream-processor"):
        self.kafka_servers = kafka_servers
        self.input_topic = input_topic
        self.output_topic = output_topic
        self.consumer_group = consumer_group
        
        self.consumer = None
        self.producer = None
        self.aggregator = MetricAggregator(window_size=60)
        self.alert_engine = AlertEngine(self.aggregator)
        
        self.running = False
        self.logger = logging.getLogger(__name__)
        
        # 添加默认告警规则
        self._setup_default_rules()
    
    def _setup_default_rules(self):
        """设置默认告警规则"""
        # CPU使用率告警
        self.alert_engine.add_rule(AlertRule(
            name="high_cpu_usage",
            metric_name="cpu_usage",
            condition=">",
            threshold=80.0,
            duration=60
        ))
        
        # 内存使用率告警
        self.alert_engine.add_rule(AlertRule(
            name="high_memory_usage",
            metric_name="memory_usage",
            condition=">",
            threshold=85.0,
            duration=60
        ))
        
        # 响应时间告警
        self.alert_engine.add_rule(AlertRule(
            name="slow_response_time",
            metric_name="response_time",
            condition=">",
            threshold=1000.0,  # 1秒
            duration=30
        ))
        
        # 错误率告警
        self.alert_engine.add_rule(AlertRule(
            name="high_error_rate",
            metric_name="error_rate",
            condition=">",
            threshold=5.0,  # 5%
            duration=60
        ))
    
    def start(self):
        """启动流处理器"""
        try:
            # 创建消费者
            self.consumer = KafkaConsumer(
                self.input_topic,
                bootstrap_servers=self.kafka_servers,
                group_id=self.consumer_group,
                value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                auto_offset_reset='latest',
                enable_auto_commit=True
            )
            
            # 创建生产者
            self.producer = KafkaProducer(
                bootstrap_servers=self.kafka_servers,
                value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode('utf-8'),
                acks='all',
                retries=3
            )
            
            self.running = True
            self.logger.info("流处理器启动成功")
            
            # 开始处理消息
            self._process_stream()
        
        except Exception as e:
            self.logger.error(f"启动流处理器失败: {e}")
            raise
    
    def stop(self):
        """停止流处理器"""
        try:
            self.running = False
            
            if self.consumer:
                self.consumer.close()
            
            if self.producer:
                self.producer.close()
            
            self.logger.info("流处理器已停止")
        
        except Exception as e:
            self.logger.error(f"停止流处理器失败: {e}")
    
    def _process_stream(self):
        """处理数据流"""
        try:
            for message in self.consumer:
                if not self.running:
                    break
                
                try:
                    # 解析指标数据
                    metric_data = MetricData(**message.value)
                    
                    # 添加到聚合器
                    self.aggregator.add_metric(metric_data)
                    
                    # 检查告警
                    alerts = self.alert_engine.check_alerts(metric_data)
                    
                    # 发送告警
                    for alert in alerts:
                        self._send_alert(alert)
                    
                    # 处理特定指标
                    processed_data = self._process_metric(metric_data)
                    
                    if processed_data:
                        # 发送处理后的数据
                        self.producer.send(
                            self.output_topic,
                            value=processed_data
                        )
                
                except Exception as e:
                    self.logger.error(f"处理消息失败: {e}")
        
        except Exception as e:
            self.logger.error(f"处理数据流失败: {e}")
    
    def _process_metric(self, metric: MetricData) -> Optional[Dict[str, Any]]:
        """处理指标数据"""
        try:
            # 计算聚合指标
            avg_value = self.aggregator.get_aggregated_metrics(
                metric.metric_name, metric.source, 'avg'
            )
            max_value = self.aggregator.get_aggregated_metrics(
                metric.metric_name, metric.source, 'max'
            )
            min_value = self.aggregator.get_aggregated_metrics(
                metric.metric_name, metric.source, 'min'
            )
            
            if avg_value is None:
                return None
            
            # 创建处理后的数据
            processed_data = {
                'metric_name': metric.metric_name,
                'source': metric.source,
                'timestamp': metric.timestamp,
                'tags': metric.tags,
                'current_value': metric.value,
                'avg_value': avg_value,
                'max_value': max_value,
                'min_value': min_value,
                'window_size': self.aggregator.window_size
            }
            
            # 添加趋势分析
            trend = self._calculate_trend(metric)
            if trend:
                processed_data['trend'] = trend
            
            return processed_data
        
        except Exception as e:
            self.logger.error(f"处理指标失败: {e}")
            return None
    
    def _calculate_trend(self, metric: MetricData) -> Optional[str]:
        """计算趋势"""
        try:
            # 简单的趋势计算逻辑
            key = f"{metric.metric_name}:{metric.source}"
            
            with self.aggregator.lock:
                if key not in self.aggregator.metrics_buffer:
                    return None
                
                buffer = self.aggregator.metrics_buffer[key]
                if len(buffer) < 3:
                    return None
                
                # 取最近3个值计算趋势
                recent_values = [item['value'] for item in list(buffer)[-3:]]
                
                if recent_values[2] > recent_values[1] > recent_values[0]:
                    return "increasing"
                elif recent_values[2] < recent_values[1] < recent_values[0]:
                    return "decreasing"
                else:
                    return "stable"
        
        except Exception as e:
            self.logger.error(f"计算趋势失败: {e}")
            return None
    
    def _send_alert(self, alert: Alert):
        """发送告警"""
        try:
            alert_data = asdict(alert)
            
            # 发送到告警主题
            self.producer.send(
                'alerts',
                value=alert_data
            )
            
            self.logger.warning(f"告警: {alert.rule_name} - {alert.metric_name} "
                              f"{alert.condition} {alert.threshold}, 当前值: {alert.current_value}")
        
        except Exception as e:
            self.logger.error(f"发送告警失败: {e}")

# 数据生成器(模拟指标数据)
class MetricDataGenerator:
    """指标数据生成器"""
    
    def __init__(self, kafka_servers: List[str], topic: str):
        self.kafka_servers = kafka_servers
        self.topic = topic
        self.producer = None
        self.running = False
        self.logger = logging.getLogger(__name__)
    
    def start(self):
        """启动数据生成器"""
        try:
            self.producer = KafkaProducer(
                bootstrap_servers=self.kafka_servers,
                value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode('utf-8')
            )
            
            self.running = True
            self.logger.info("数据生成器启动成功")
            
            # 开始生成数据
            self._generate_data()
        
        except Exception as e:
            self.logger.error(f"启动数据生成器失败: {e}")
            raise
    
    def stop(self):
        """停止数据生成器"""
        try:
            self.running = False
            if self.producer:
                self.producer.close()
            self.logger.info("数据生成器已停止")
        
        except Exception as e:
            self.logger.error(f"停止数据生成器失败: {e}")
    
    def _generate_data(self):
        """生成模拟数据"""
        import random
        
        sources = ['web-server-01', 'web-server-02', 'db-server-01', 'cache-server-01']
        
        while self.running:
            try:
                for source in sources:
                    # 生成CPU使用率
                    cpu_usage = random.uniform(20, 95)
                    self._send_metric('cpu_usage', cpu_usage, source, {'unit': 'percent'})
                    
                    # 生成内存使用率
                    memory_usage = random.uniform(30, 90)
                    self._send_metric('memory_usage', memory_usage, source, {'unit': 'percent'})
                    
                    # 生成响应时间
                    response_time = random.uniform(100, 2000)
                    self._send_metric('response_time', response_time, source, {'unit': 'ms'})
                    
                    # 生成错误率
                    error_rate = random.uniform(0, 10)
                    self._send_metric('error_rate', error_rate, source, {'unit': 'percent'})
                
                time.sleep(5)  # 每5秒生成一次数据
            
            except Exception as e:
                self.logger.error(f"生成数据失败: {e}")
    
    def _send_metric(self, metric_name: str, value: float, source: str, tags: Dict[str, str]):
        """发送指标"""
        try:
            metric = MetricData(
                metric_name=metric_name,
                value=value,
                timestamp=datetime.now().isoformat(),
                tags=tags,
                source=source
            )
            
            self.producer.send(
                self.topic,
                value=asdict(metric)
            )
        
        except Exception as e:
            self.logger.error(f"发送指标失败: {e}")

# 使用示例
if __name__ == "__main__":
    # 配置日志
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    
    print("=== 实时数据处理平台 ===")
    
    # 启动数据生成器
    print("\n1. 启动数据生成器")
    generator = MetricDataGenerator(
        kafka_servers=['localhost:9092'],
        topic='metrics'
    )
    
    generator_thread = threading.Thread(target=generator.start, daemon=True)
    generator_thread.start()
    
    # 启动流处理器
    print("\n2. 启动流处理器")
    processor = StreamProcessor(
        kafka_servers=['localhost:9092'],
        input_topic='metrics',
        output_topic='processed-metrics',
        consumer_group='stream-processor-group'
    )
    
    processor_thread = threading.Thread(target=processor.start, daemon=True)
    processor_thread.start()
    
    try:
        print("\n3. 系统运行中...")
        print("按 Ctrl+C 停止系统")
        
        while True:
            time.sleep(10)
            print(f"系统运行中... {datetime.now().strftime('%H:%M:%S')}")
    
    except KeyboardInterrupt:
        print("\n收到中断信号,正在停止...")
    
    finally:
        # 停止所有组件
        generator.stop()
        processor.stop()
        print("实时数据处理平台已停止")

## 7.3 微服务事件驱动架构

### 7.3.1 事件驱动架构设计

在事件驱动架构中,各微服务之间不直接调用,而是通过 `EventBus` 发布和订阅事件进行解耦:事件按类型映射到不同的 Kafka 主题,每个服务只需关心自己发布和订阅的事件。

```python
import json
import time
import uuid
import threading
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass, asdict
from datetime import datetime
from enum import Enum
from kafka import KafkaProducer, KafkaConsumer
import logging

class EventType(Enum):
    """事件类型"""
    USER_REGISTERED = "user.registered"
    USER_UPDATED = "user.updated"
    ORDER_CREATED = "order.created"
    ORDER_UPDATED = "order.updated"
    ORDER_CANCELLED = "order.cancelled"
    PAYMENT_PROCESSED = "payment.processed"
    PAYMENT_FAILED = "payment.failed"
    INVENTORY_UPDATED = "inventory.updated"
    NOTIFICATION_SENT = "notification.sent"

@dataclass
class Event:
    """事件基类"""
    event_id: str
    event_type: str
    timestamp: str
    source_service: str
    version: str
    data: Dict[str, Any]
    correlation_id: Optional[str] = None
    causation_id: Optional[str] = None

class EventBus:
    """事件总线"""
    
    def __init__(self, kafka_servers: List[str], service_name: str):
        self.kafka_servers = kafka_servers
        self.service_name = service_name
        self.producer = None
        self.consumers = {}
        self.event_handlers = {}
        self.running = False
        self.logger = logging.getLogger(__name__)
    
    def start(self):
        """启动事件总线"""
        try:
            # 创建生产者
            self.producer = KafkaProducer(
                bootstrap_servers=self.kafka_servers,
                value_serializer=lambda v: json.dumps(v, ensure_ascii=False).encode('utf-8'),
                key_serializer=lambda k: k.encode('utf-8') if k else None,
                acks='all',
                retries=3,
                enable_idempotence=True  # 幂等生产者需要较新版本的 kafka-python 支持
            )
            
            self.running = True
            self.logger.info(f"事件总线启动成功 - 服务: {self.service_name}")
        
        except Exception as e:
            self.logger.error(f"启动事件总线失败: {e}")
            raise
    
    def stop(self):
        """停止事件总线"""
        try:
            self.running = False
            
            # 停止所有消费者
            for consumer in self.consumers.values():
                consumer.close()
            
            # 关闭生产者
            if self.producer:
                self.producer.close()
            
            self.logger.info("事件总线已停止")
        
        except Exception as e:
            self.logger.error(f"停止事件总线失败: {e}")
    
    def publish(self, event: Event, topic: str = None):
        """发布事件"""
        try:
            if not topic:
                topic = self._get_topic_for_event(event.event_type)
            
            # 设置事件元数据
            if not event.event_id:
                event.event_id = str(uuid.uuid4())
            
            if not event.timestamp:
                event.timestamp = datetime.now().isoformat()
            
            if not event.source_service:
                event.source_service = self.service_name
            
            # 发送事件
            future = self.producer.send(
                topic,
                value=asdict(event),
                key=event.event_id
            )
            
            # 等待发送完成
            record_metadata = future.get(timeout=10)
            
            self.logger.info(f"事件发布成功: {event.event_type} -> {topic} "
                           f"(partition={record_metadata.partition}, offset={record_metadata.offset})")
        
        except Exception as e:
            self.logger.error(f"发布事件失败: {e}")
            raise
    
    def subscribe(self, event_type: str, handler: Callable[[Event], None], 
                 consumer_group: str = None):
        """订阅事件

        注意:多个服务共享同一个 EventBus 实例订阅同一事件时,
        应显式传入各自的 consumer_group,否则会落入同一消费组而分摊消息。
        """
        try:
            if not consumer_group:
                consumer_group = f"{self.service_name}-{event_type}"
            
            topic = self._get_topic_for_event(event_type)
            
            # 创建消费者
            consumer = KafkaConsumer(
                topic,
                bootstrap_servers=self.kafka_servers,
                group_id=consumer_group,
                value_deserializer=lambda m: json.loads(m.decode('utf-8')),
                auto_offset_reset='latest',
                enable_auto_commit=True
            )
            
            # 以 "事件类型:消费组" 作为键,避免同一事件类型的多个订阅互相覆盖
            key = f"{event_type}:{consumer_group}"
            self.consumers[key] = consumer
            self.event_handlers[key] = handler
            
            # 启动消费者线程
            thread = threading.Thread(
                target=self._consume_events,
                args=(event_type, consumer, handler),
                daemon=True
            )
            thread.start()
            
            self.logger.info(f"订阅事件成功: {event_type} -> {topic}")
        
        except Exception as e:
            self.logger.error(f"订阅事件失败: {e}")
            raise
    
    def _consume_events(self, event_type: str, consumer: KafkaConsumer, 
                       handler: Callable[[Event], None]):
        """消费事件"""
        try:
            for message in consumer:
                if not self.running:
                    break
                
                try:
                    # 解析事件
                    event_data = message.value
                    event = Event(**event_data)
                    
                    # 过滤事件类型
                    if event.event_type == event_type:
                        # 调用处理器
                        handler(event)
                        
                        self.logger.debug(f"处理事件: {event.event_type} - {event.event_id}")
                
                except Exception as e:
                    self.logger.error(f"处理事件失败: {e}")
        
        except Exception as e:
            self.logger.error(f"消费事件失败: {e}")
    
    def _get_topic_for_event(self, event_type: str) -> str:
        """获取事件对应的主题"""
        # 根据事件类型映射到主题
        topic_mapping = {
            'user.registered': 'user-events',
            'user.updated': 'user-events',
            'order.created': 'order-events',
            'order.updated': 'order-events',
            'order.cancelled': 'order-events',
            'payment.processed': 'payment-events',
            'payment.failed': 'payment-events',
            'inventory.updated': 'inventory-events',
            'notification.sent': 'notification-events'
        }
        
        return topic_mapping.get(event_type, 'default-events')
```
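
`EventBus` 的基本用法示意如下,沿用上文的类型定义与主题映射;处理函数、等待时长和事件数据均为示例假设:

```python
# 示意:发布并订阅一个 user.registered 事件(依赖上文定义的 EventBus/Event/EventType)
bus = EventBus(kafka_servers=['localhost:9092'], service_name='demo-service')
bus.start()

def on_user_registered(event: Event):
    print(f"收到事件 {event.event_type}: {event.data.get('name')}")

bus.subscribe(EventType.USER_REGISTERED.value, on_user_registered)
time.sleep(3)  # 等待消费组完成分区分配,否则 latest 偏移量可能错过随后的事件

bus.publish(Event(
    event_id=str(uuid.uuid4()),
    event_type=EventType.USER_REGISTERED.value,
    timestamp=datetime.now().isoformat(),
    source_service='demo-service',
    version='1.0',
    data={'user_id': '42', 'name': '示例用户'}  # 示例数据
))
```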

### 7.3.2 用户服务

```python
class UserService:
    """用户服务"""
    
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        self.users = {}  # 简单的内存存储
        self.logger = logging.getLogger(__name__)
        
        # 订阅相关事件
        self._setup_event_handlers()
    
    def _setup_event_handlers(self):
        """设置事件处理器"""
        # 这里可以订阅其他服务的事件
        pass
    
    def register_user(self, user_data: Dict[str, Any]) -> str:
        """注册用户"""
        try:
            user_id = str(uuid.uuid4())
            
            # 创建用户
            user = {
                'user_id': user_id,
                'email': user_data['email'],
                'name': user_data['name'],
                'created_at': datetime.now().isoformat(),
                'status': 'active'
            }
            
            self.users[user_id] = user
            
            # 发布用户注册事件
            event = Event(
                event_id=str(uuid.uuid4()),
                event_type=EventType.USER_REGISTERED.value,
                timestamp=datetime.now().isoformat(),
                source_service='user-service',
                version='1.0',
                data=user
            )
            
            self.event_bus.publish(event)
            
            self.logger.info(f"用户注册成功: {user_id}")
            return user_id
        
        except Exception as e:
            self.logger.error(f"用户注册失败: {e}")
            raise
    
    def update_user(self, user_id: str, update_data: Dict[str, Any]):
        """更新用户"""
        try:
            if user_id not in self.users:
                raise ValueError(f"用户不存在: {user_id}")
            
            # 更新用户信息
            user = self.users[user_id]
            user.update(update_data)
            user['updated_at'] = datetime.now().isoformat()
            
            # 发布用户更新事件
            event = Event(
                event_id=str(uuid.uuid4()),
                event_type=EventType.USER_UPDATED.value,
                timestamp=datetime.now().isoformat(),
                source_service='user-service',
                version='1.0',
                data={
                    'user_id': user_id,
                    'updated_fields': list(update_data.keys()),
                    'user': user
                }
            )
            
            self.event_bus.publish(event)
            
            self.logger.info(f"用户更新成功: {user_id}")
        
        except Exception as e:
            self.logger.error(f"用户更新失败: {e}")
            raise
    
    def get_user(self, user_id: str) -> Optional[Dict[str, Any]]:
        """获取用户"""
        return self.users.get(user_id)
```
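
用户服务的调用方式示意如下,其中 `event_bus` 为上文已启动的 `EventBus` 实例,用户数据为示例假设:

```python
# 示意:注册用户,内部会发布 user.registered 事件
user_service = UserService(event_bus)
user_id = user_service.register_user({
    'email': 'alice@example.com',  # 示例数据
    'name': 'Alice'
})
print(user_service.get_user(user_id)['status'])  # -> 'active'
```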

### 7.3.3 订单服务

```python
class OrderService:
    """订单服务"""
    
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        self.orders = {}  # 简单的内存存储
        self.logger = logging.getLogger(__name__)
        
        # 订阅相关事件
        self._setup_event_handlers()
    
    def _setup_event_handlers(self):
        """设置事件处理器"""
        # 订阅支付事件
        self.event_bus.subscribe(
            EventType.PAYMENT_PROCESSED.value,
            self._handle_payment_processed
        )
        
        self.event_bus.subscribe(
            EventType.PAYMENT_FAILED.value,
            self._handle_payment_failed
        )
    
    def create_order(self, order_data: Dict[str, Any]) -> str:
        """创建订单"""
        try:
            order_id = str(uuid.uuid4())
            
            # 创建订单
            order = {
                'order_id': order_id,
                'user_id': order_data['user_id'],
                'items': order_data['items'],
                'total_amount': order_data['total_amount'],
                'status': 'pending',
                'created_at': datetime.now().isoformat()
            }
            
            self.orders[order_id] = order
            
            # 发布订单创建事件
            event = Event(
                event_id=str(uuid.uuid4()),
                event_type=EventType.ORDER_CREATED.value,
                timestamp=datetime.now().isoformat(),
                source_service='order-service',
                version='1.0',
                data=order
            )
            
            self.event_bus.publish(event)
            
            self.logger.info(f"订单创建成功: {order_id}")
            return order_id
        
        except Exception as e:
            self.logger.error(f"订单创建失败: {e}")
            raise
    
    def cancel_order(self, order_id: str):
        """取消订单"""
        try:
            if order_id not in self.orders:
                raise ValueError(f"订单不存在: {order_id}")
            
            order = self.orders[order_id]
            
            if order['status'] not in ['pending', 'confirmed']:
                raise ValueError(f"订单状态不允许取消: {order['status']}")
            
            # 更新订单状态
            order['status'] = 'cancelled'
            order['cancelled_at'] = datetime.now().isoformat()
            
            # 发布订单取消事件
            event = Event(
                event_id=str(uuid.uuid4()),
                event_type=EventType.ORDER_CANCELLED.value,
                timestamp=datetime.now().isoformat(),
                source_service='order-service',
                version='1.0',
                data={
                    'order_id': order_id,
                    'user_id': order['user_id'],
                    'total_amount': order['total_amount']
                }
            )
            
            self.event_bus.publish(event)
            
            self.logger.info(f"订单取消成功: {order_id}")
        
        except Exception as e:
            self.logger.error(f"订单取消失败: {e}")
            raise
    
    def _handle_payment_processed(self, event: Event):
        """处理支付成功事件"""
        try:
            payment_data = event.data
            order_id = payment_data['order_id']
            
            if order_id in self.orders:
                order = self.orders[order_id]
                order['status'] = 'paid'
                order['paid_at'] = datetime.now().isoformat()
                
                # 发布订单更新事件
                update_event = Event(
                    event_id=str(uuid.uuid4()),
                    event_type=EventType.ORDER_UPDATED.value,
                    timestamp=datetime.now().isoformat(),
                    source_service='order-service',
                    version='1.0',
                    data={
                        'order_id': order_id,
                        'status': 'paid',
                        'order': order
                    },
                    correlation_id=event.correlation_id,
                    causation_id=event.event_id
                )
                
                self.event_bus.publish(update_event)
                
                self.logger.info(f"订单支付成功: {order_id}")
        
        except Exception as e:
            self.logger.error(f"处理支付成功事件失败: {e}")
    
    def _handle_payment_failed(self, event: Event):
        """处理支付失败事件"""
        try:
            payment_data = event.data
            order_id = payment_data['order_id']
            
            if order_id in self.orders:
                order = self.orders[order_id]
                order['status'] = 'payment_failed'
                order['payment_failed_at'] = datetime.now().isoformat()
                
                # 发布订单更新事件
                update_event = Event(
                    event_id=str(uuid.uuid4()),
                    event_type=EventType.ORDER_UPDATED.value,
                    timestamp=datetime.now().isoformat(),
                    source_service='order-service',
                    version='1.0',
                    data={
                        'order_id': order_id,
                        'status': 'payment_failed',
                        'order': order
                    },
                    correlation_id=event.correlation_id,
                    causation_id=event.event_id
                )
                
                self.event_bus.publish(update_event)
                
                self.logger.warning(f"订单支付失败: {order_id}")
        
        except Exception as e:
            self.logger.error(f"处理支付失败事件失败: {e}")
    
    def get_order(self, order_id: str) -> Optional[Dict[str, Any]]:
        """获取订单"""
        return self.orders.get(order_id)
```

### 7.3.4 支付服务

```python
class PaymentService:
    """支付服务"""
    
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        self.payments = {}  # 简单的内存存储
        self.logger = logging.getLogger(__name__)
        
        # 订阅相关事件
        self._setup_event_handlers()
    
    def _setup_event_handlers(self):
        """设置事件处理器"""
        # 订阅订单创建事件
        self.event_bus.subscribe(
            EventType.ORDER_CREATED.value,
            self._handle_order_created
        )
    
    def process_payment(self, payment_data: Dict[str, Any]) -> str:
        """处理支付"""
        try:
            payment_id = str(uuid.uuid4())
            
            # 模拟支付处理
            import random
            success = random.choice([True, True, True, False])  # 75%成功率
            
            payment = {
                'payment_id': payment_id,
                'order_id': payment_data['order_id'],
                'amount': payment_data['amount'],
                'method': payment_data.get('method', 'credit_card'),
                'status': 'success' if success else 'failed',
                'processed_at': datetime.now().isoformat()
            }
            
            self.payments[payment_id] = payment
            
            # 发布支付事件
            if success:
                event_type = EventType.PAYMENT_PROCESSED.value
                self.logger.info(f"支付处理成功: {payment_id}")
            else:
                event_type = EventType.PAYMENT_FAILED.value
                payment['error_message'] = "支付网关错误"
                self.logger.warning(f"支付处理失败: {payment_id}")
            
            event = Event(
                event_id=str(uuid.uuid4()),
                event_type=event_type,
                timestamp=datetime.now().isoformat(),
                source_service='payment-service',
                version='1.0',
                data=payment
            )
            
            self.event_bus.publish(event)
            
            return payment_id
        
        except Exception as e:
            self.logger.error(f"支付处理失败: {e}")
            raise
    
    def _handle_order_created(self, event: Event):
        """处理订单创建事件"""
        try:
            order_data = event.data
            
            # 自动处理支付(在实际场景中,这通常由用户触发)
            payment_data = {
                'order_id': order_data['order_id'],
                'amount': order_data['total_amount'],
                'method': 'auto_payment'
            }
            
            # 延迟处理支付(模拟异步处理)
            def delayed_payment():
                time.sleep(2)  # 模拟处理时间
                self.process_payment(payment_data)
            
            thread = threading.Thread(target=delayed_payment, daemon=True)
            thread.start()
            
            self.logger.info(f"开始处理订单支付: {order_data['order_id']}")
        
        except Exception as e:
            self.logger.error(f"处理订单创建事件失败: {e}")

### 7.3.5 通知服务

```python
class NotificationService:
    """通知服务"""
    
    def __init__(self, event_bus: EventBus):
        self.event_bus = event_bus
        self.notifications = {}  # 简单的内存存储
        self.logger = logging.getLogger(__name__)
        
        # 订阅相关事件
        self._setup_event_handlers()
    
    def _setup_event_handlers(self):
        """设置事件处理器"""
        # 订阅用户注册事件
        self.event_bus.subscribe(
            EventType.USER_REGISTERED.value,
            self._handle_user_registered
        )
        
        # 订阅订单事件
        self.event_bus.subscribe(
            EventType.ORDER_CREATED.value,
            self._handle_order_created
        )
        
        self.event_bus.subscribe(
            EventType.ORDER_UPDATED.value,
            self._handle_order_updated
        )
        
        # 订阅支付事件
        self.event_bus.subscribe(
            EventType.PAYMENT_PROCESSED.value,
            self._handle_payment_processed
        )
        
        self.event_bus.subscribe(
            EventType.PAYMENT_FAILED.value,
            self._handle_payment_failed
        )
    
    def send_notification(self, notification_data: Dict[str, Any]) -> str:
        """发送通知"""
        try:
            notification_id = str(uuid.uuid4())
            
            notification = {
                'notification_id': notification_id,
                'user_id': notification_data['user_id'],
                'type': notification_data['type'],
                'title': notification_data['title'],
                'message': notification_data['message'],
                'channel': notification_data.get('channel', 'email'),
                'status': 'sent',
                'sent_at': datetime.now().isoformat()
            }
            
            self.notifications[notification_id] = notification
            
            # 模拟发送通知
            self._send_notification(notification)
            
            # 发布通知发送事件
            event = Event(
                event_id=str(uuid.uuid4()),
                event_type=EventType.NOTIFICATION_SENT.value,
                timestamp=datetime.now().isoformat(),
                source_service='notification-service',
                version='1.0',
                data=notification
            )
            
            self.event_bus.publish(event)
            
            self.logger.info(f"通知发送成功: {notification_id}")
            return notification_id
        
        except Exception as e:
            self.logger.error(f"发送通知失败: {e}")
            raise
    
    def _send_notification(self, notification: Dict[str, Any]):
        """实际发送通知(模拟)"""
        channel = notification['channel']
        title = notification['title']
        message = notification['message']
        
        print(f"📧 [{channel.upper()}] {title}: {message}")
    
    def _handle_user_registered(self, event: Event):
        """处理用户注册事件"""
        try:
            user_data = event.data
            
            notification_data = {
                'user_id': user_data['user_id'],
                'type': 'welcome',
                'title': '欢迎注册',
                'message': f"欢迎 {user_data['name']} 注册我们的服务!",
                'channel': 'email'
            }
            
            self.send_notification(notification_data)
        
        except Exception as e:
            self.logger.error(f"处理用户注册事件失败: {e}")
    
    def _handle_order_created(self, event: Event):
        """处理订单创建事件"""
        try:
            order_data = event.data
            
            notification_data = {
                'user_id': order_data['user_id'],
                'type': 'order_confirmation',
                'title': '订单确认',
                'message': f"您的订单 {order_data['order_id']} 已创建,金额: ¥{order_data['total_amount']}",
                'channel': 'email'
            }
            
            self.send_notification(notification_data)
        
        except Exception as e:
            self.logger.error(f"处理订单创建事件失败: {e}")
    
    def _handle_order_updated(self, event: Event):
        """处理订单更新事件"""
        try:
            order_data = event.data
            order = order_data['order']
            status = order_data['status']
            
            if status == 'paid':
                notification_data = {
                    'user_id': order['user_id'],
                    'type': 'payment_success',
                    'title': '支付成功',
                    'message': f"您的订单 {order['order_id']} 支付成功!",
                    'channel': 'sms'
                }
                
                self.send_notification(notification_data)
        
        except Exception as e:
            self.logger.error(f"处理订单更新事件失败: {e}")
    
    def _handle_payment_processed(self, event: Event):
        """处理支付成功事件"""
        # 订单更新事件已经处理了支付成功通知
        pass
    
    def _handle_payment_failed(self, event: Event):
        """处理支付失败事件"""
        try:
            payment_data = event.data
            
            # 这里需要获取用户ID,实际场景中可能需要查询订单服务
            # 为了简化,我们假设可以从其他地方获取
            notification_data = {
                'user_id': 'unknown',  # 实际场景中需要查询
                'type': 'payment_failed',
                'title': '支付失败',
                'message': f"订单 {payment_data['order_id']} 支付失败,请重试",
                'channel': 'email'
            }
            
            # 在实际场景中,这里应该查询订单服务获取用户ID
            # self.send_notification(notification_data)
            
            self.logger.warning(f"支付失败通知: {payment_data['order_id']}")
        
        except Exception as e:
            self.logger.error(f"处理支付失败事件失败: {e}")

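The payment-failed handler above stops short because the PAYMENT_FAILED event carries no user id. Here is a minimal sketch of one way to close that gap, assuming an injected order_lookup callable (hypothetical, not part of the original design) that resolves an order by id:

# Hypothetical sketch: inject an order lookup so payment-failed
# notifications can be addressed to the right user.
from typing import Callable

class NotificationServiceWithLookup(NotificationService):
    
    def __init__(self, event_bus: EventBus,
                 order_lookup: Callable[[str], Optional[Dict[str, Any]]]):
        super().__init__(event_bus)
        self.order_lookup = order_lookup  # e.g. order_service.get_order
    
    def _handle_payment_failed(self, event: Event):
        try:
            payment_data = event.data
            order = self.order_lookup(payment_data['order_id'])
            if order is None:
                self.logger.warning(f"Order not found: {payment_data['order_id']}")
                return
            self.send_notification({
                'user_id': order['user_id'],
                'type': 'payment_failed',
                'title': 'Payment failed',
                'message': f"Payment for order {payment_data['order_id']} failed, please retry",
                'channel': 'email'
            })
        except Exception as e:
            self.logger.error(f"Failed to handle payment-failed event: {e}")

In this single-process demo the lookup can simply be order_service.get_order; across real service boundaries it would be an HTTP/gRPC call, or a local read model maintained by consuming ORDER_CREATED events.
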
# Usage example
if __name__ == "__main__":
    # Configure logging
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    
    print("=== Event-Driven Microservice Architecture ===")
    
    # Create the event buses
    print("\n1. Initializing event buses")
    user_event_bus = EventBus(['localhost:9092'], 'user-service')
    order_event_bus = EventBus(['localhost:9092'], 'order-service')
    payment_event_bus = EventBus(['localhost:9092'], 'payment-service')
    notification_event_bus = EventBus(['localhost:9092'], 'notification-service')
    
    # Start the event buses
    user_event_bus.start()
    order_event_bus.start()
    payment_event_bus.start()
    notification_event_bus.start()
    
    # Create the services
    print("\n2. Initializing microservices")
    user_service = UserService(user_event_bus)
    order_service = OrderService(order_event_bus)
    payment_service = PaymentService(payment_event_bus)
    notification_service = NotificationService(notification_event_bus)
    
    try:
        print("\n3. Simulating the business flow")
        
        # Register a user
        print("\n3.1 User registration")
        user_id = user_service.register_user({
            'email': 'user@example.com',
            'name': 'Zhang San'
        })
        
        time.sleep(2)  # Wait for event processing
        
        # Create an order
        print("\n3.2 Creating an order")
        order_id = order_service.create_order({
            'user_id': user_id,
            'items': [
                {'product_id': 'p1', 'quantity': 2, 'price': 100},
                {'product_id': 'p2', 'quantity': 1, 'price': 50}
            ],
            'total_amount': 250
        })
        
        time.sleep(5)  # Wait for payment processing
        
        # Check the order status (get_order may return None)
        print("\n3.3 Checking order status")
        order = order_service.get_order(order_id)
        if order:
            print(f"Order status: {order['status']}")
        
        # Simulate more traffic
        print("\n3.4 Creating more orders")
        for i in range(3):
            order_id = order_service.create_order({
                'user_id': user_id,
                'items': [{'product_id': f'p{i}', 'quantity': 1, 'price': 100}],
                'total_amount': 100
            })
            time.sleep(1)
        
        print("\n4. System running...")
        time.sleep(10)
        
    except KeyboardInterrupt:
        print("\nInterrupt received, stopping...")
    
    finally:
        # Stop all event buses
        user_event_bus.stop()
        order_event_bus.stop()
        payment_event_bus.stop()
        notification_event_bus.stop()
        print("Event-driven microservice architecture stopped")

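A note on the design choice above: each service constructs its own EventBus. Assuming EventBus passes the service name through as the Kafka consumer group id (its constructor arguments suggest this; the implementation appears earlier in the chapter), every service forms its own consumer group and therefore receives its own copy of every event, which is exactly the fan-out that inter-service pub/sub requires. A bare kafka-python illustration of the same idea (the topic name is assumed):

# Two consumers in *different* groups each receive every event.
# Two consumers in the *same* group would instead split the partitions.
from kafka import KafkaConsumer

order_consumer = KafkaConsumer(
    'events',                         # assumed topic name
    bootstrap_servers=['localhost:9092'],
    group_id='order-service',         # own group -> full copy of the stream
    value_deserializer=lambda v: json.loads(v.decode('utf-8'))
)

notification_consumer = KafkaConsumer(
    'events',
    bootstrap_servers=['localhost:9092'],
    group_id='notification-service',  # different group -> independent offsets
    value_deserializer=lambda v: json.loads(v.decode('utf-8'))
)
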
7.4 Chapter Summary

7.4.1 Key Takeaways

  1. Log collection system

    • File watching and real-time log collection
    • Multi-format log parsing (JSON, Apache, Nginx, etc.)
    • Batching and buffering
    • Log analysis and anomaly detection
    • Alerting and statistics
  2. Real-time data processing platform

    • Streaming data processing architecture
    • Metric aggregation and window computation
    • Real-time alerting engine
    • Trend analysis and anomaly detection
    • Multi-dimensional data analysis
  3. Event-driven microservice architecture

    • The event-bus design pattern
    • Asynchronous inter-service communication
    • Event sourcing and CQRS (see the sketch after this list)
    • Eventual consistency guarantees
    • Distributed transaction handling

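Event sourcing treats the event log as the source of truth: current state is never stored authoritatively but derived by replaying the log, while a CQRS read model folds the same events into a query-friendly view. A minimal sketch of that replay step, reusing this chapter's Event and EventType (the 'paid' transition mirrors how OrderService reacts to payment events; the failure status name is illustrative):

# Minimal event-sourcing sketch: derive an order's current state
# by folding over its event history in order.
def replay_order(events: List[Event]) -> Dict[str, Any]:
    state: Dict[str, Any] = {}
    for e in events:
        if e.event_type == EventType.ORDER_CREATED.value:
            state = dict(e.data, status='created')
        elif e.event_type == EventType.PAYMENT_PROCESSED.value:
            state['status'] = 'paid'
        elif e.event_type == EventType.PAYMENT_FAILED.value:
            state['status'] = 'payment_failed'  # illustrative status name
    return state

Because the fold is deterministic, the read model can be rebuilt at any time by re-consuming the Kafka topic from the beginning.
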
7.4.2 Best Practices

  1. System design principles

    • High availability and fault tolerance
    • Horizontal scalability
    • Monitoring and observability
    • Data consistency guarantees
  2. Performance optimization (see the producer sketch after this list)

    • Batching to reduce network overhead
    • Asynchronous processing to raise throughput
    • Caching to cut latency
    • Load balancing and partitioning strategy
  3. Operations

    • Thorough logging
    • Health checks and monitoring
    • Graceful startup and shutdown
    • Error handling and recovery

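Several of the performance items above map directly onto kafka-python producer settings: batch_size and linger_ms control batching, compression_type trades CPU for network bandwidth, and the message key drives the partitioning strategy. A hedged sketch (the topic name and key are illustrative):

# Sketch: a producer tuned for batching, with key-based partitioning.
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    batch_size=32 * 1024,      # accumulate up to 32 KB per partition batch
    linger_ms=20,              # wait up to 20 ms for a batch to fill
    compression_type='gzip',   # compress whole batches on the wire
    acks='all',                # favor durability over latency
    key_serializer=lambda k: k.encode('utf-8'),
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

# Keying by order id keeps one order's events on one partition,
# which preserves their relative order for consumers.
producer.send('events', key='order-123', value={'event_type': 'order.created'})
producer.flush()
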
7.4.3 Exercises

  1. Basics

    • Implement a simple log collector that supports multiple log formats
    • Build a real-time metrics monitoring system
    • Design an event-driven order processing flow
  2. Intermediate

    • Implement a distributed log aggregation system
    • Build a real-time data analytics platform
    • Design a transaction-handling mechanism for a microservice architecture
  3. Projects

    • Build a complete event-driven architecture for an e-commerce platform
    • Implement a large-scale log processing and analysis system
    • Design a highly available real-time monitoring platform

These hands-on cases show Kafka's reach across very different workloads. Whether the job is log collection, real-time data processing, or wiring up a microservice architecture, Kafka provides reliable, high-performance message delivery, making it a core building block of modern distributed systems.