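8.1 Pub/Sub Overview

8.1.1 What Is Pub/Sub

Redis publish/subscribe (Pub/Sub) is a messaging pattern in which senders (publishers) send messages to a named channel, and receivers (subscribers) get every message sent to the channels they have subscribed to. Publishers and subscribers never address each other directly; the channel decouples them.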

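8.1.2 Core Concepts

  • Publisher: a client that sends messages to a channel
  • Subscriber: a client that receives messages from the channels it has subscribed to
  • Channel: the named route that carries messages from publishers to subscribers
  • Pattern subscription: a subscription to every channel whose name matches a glob-style pattern (wildcards such as *)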

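8.1.3 Characteristics

  • Real-time: messages are pushed to subscribers as soon as they are published; subscribers do not poll
  • Decoupled: publishers and subscribers are unaware of each other
  • Many-to-many: a channel can have any number of publishers and subscribers
  • No persistence: messages are never stored, so they cannot be replayed later
  • Fire-and-forget: delivery is at most once; a message published while a channel has no subscribers, or while a subscriber is disconnected, is lost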
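
The last two characteristics are easy to misread, so here is a minimal in-memory sketch of them. `ToyBroker` is a hypothetical stand-in written for this illustration, not Redis and not part of any library; it only mimics the delivery rule that a message goes to whoever is subscribed at that moment and is then discarded.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List


class ToyBroker:
    """In-memory stand-in that mimics Pub/Sub delivery semantics (not Redis itself)."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Callable[[str], None]]] = defaultdict(list)

    def subscribe(self, channel: str, callback: Callable[[str], None]) -> None:
        self._subscribers[channel].append(callback)

    def publish(self, channel: str, message: str) -> int:
        # Deliver to whoever is subscribed right now; nothing is stored for later.
        receivers = self._subscribers.get(channel, [])
        for callback in receivers:
            callback(message)
        return len(receivers)


broker = ToyBroker()
inbox: List[str] = []

missed = broker.publish("news", "sent before anyone subscribed")  # nobody is listening yet
broker.subscribe("news", inbox.append)
delivered = broker.publish("news", "sent after subscribing")

print(missed, delivered, inbox)  # 0 1 ['sent after subscribing']
```

The first message is gone for good: the late subscriber never sees it. If that is unacceptable for a use case, the message has to be stored somewhere else as well, which is what the chat history in section 8.3.2 does.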

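8.1.4 Use Cases

  • Real-time notifications: announcing system state changes
  • Chat systems: pushing messages as they are sent
  • Event-driven architectures: passing events between microservices
  • Cache invalidation: telling other nodes that a cached entry has changed
  • Monitoring and alerting: distributing system monitoring data and alerts
  • Live-stream comments (danmaku): pushing comments to viewers in real time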

8.2 Basic Commands

8.2.1 Publishing Messages

# Publish a message to a channel
PUBLISH channel message
PUBLISH news "Breaking: Redis 7.0 Released!"
PUBLISH chat:room1 "Hello, everyone!"
PUBLISH notifications '{"type":"alert","message":"System maintenance"}'

# The return value is the number of clients that received the message
PUBLISH empty_channel "test"  # returns 0: no subscribers

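8.2.2 Subscribing to Channels

# Subscribe to one or more channels
SUBSCRIBE channel [channel ...]
SUBSCRIBE news
SUBSCRIBE chat:room1 chat:room2 notifications

# Each subscription is acknowledged with a confirmation
# 1) "subscribe"
# 2) "news"
# 3) (integer) 1  # number of channels this client is now subscribed to

# Format of a received message
# 1) "message"
# 2) "news"  # channel name
# 3) "Breaking: Redis 7.0 Released!"  # message payload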

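8.2.3 Pattern Subscriptions

# Subscribe to every channel that matches a glob-style pattern
PSUBSCRIBE pattern [pattern ...]
PSUBSCRIBE chat:*        # all channels whose names start with chat:
PSUBSCRIBE news.*        # all channels whose names start with news.
PSUBSCRIBE user:*:login  # user login events, e.g. user:42:login

# Format of a message received through a pattern subscription
# 1) "pmessage"
# 2) "chat:*"     # the pattern that matched
# 3) "chat:room1" # the actual channel name
# 4) "Hello!"     # message payload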
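
To get a feel for which channels a pattern covers, the sketch below approximates the matching with Python's standard `fnmatch` module. This is an assumption made for illustration: `fnmatch` agrees with Redis on simple patterns built from `*` and `?`, but the two differ in details such as character-class negation and backslash escaping, so treat it as a rough model rather than the server's matcher. `matching_patterns` is a hypothetical helper name.

```python
from fnmatch import fnmatchcase
from typing import List


def matching_patterns(channel: str, patterns: List[str]) -> List[str]:
    """Return the patterns that would match `channel` (approximated with fnmatch)."""
    return [p for p in patterns if fnmatchcase(channel, p)]


patterns = ["chat:*", "news.*", "user:*:login"]

print(matching_patterns("chat:room1", patterns))     # ['chat:*']
print(matching_patterns("user:42:login", patterns))  # ['user:*:login']
print(matching_patterns("newsletter", patterns))     # [] because the "." in news.* is literal
```

Note that `*` also matches separator characters such as `:`; the colon is only a naming convention and has no special meaning in a pattern.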

8.2.4 Unsubscribing

# Unsubscribe from specific channels
UNSUBSCRIBE [channel [channel ...]]
UNSUBSCRIBE news
UNSUBSCRIBE  # unsubscribe from all channels

# Unsubscribe from patterns
PUNSUBSCRIBE [pattern [pattern ...]]
PUNSUBSCRIBE chat:*
PUNSUBSCRIBE  # unsubscribe from all patterns

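8.2.5 Inspecting Subscriptions

# List active channels (channels with at least one subscriber; pattern subscribers are not counted)
PUBSUB CHANNELS [pattern]
PUBSUB CHANNELS        # list all active channels
PUBSUB CHANNELS chat:* # list active channels that match the pattern

# Show the number of subscribers per channel (pattern subscribers are not counted)
PUBSUB NUMSUB [channel [channel ...]]
PUBSUB NUMSUB news chat:room1

# Show the number of distinct patterns that clients are subscribed to
PUBSUB NUMPAT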
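
The same three commands are exposed by redis-py as `pubsub_channels`, `pubsub_numsub` and `pubsub_numpat`. The helper below is a small sketch that combines them into one dictionary; `pubsub_snapshot` is a hypothetical name, and the client is passed in so that nothing connects to a server when the code is imported.

```python
from typing import Any, Dict


def pubsub_snapshot(client: Any, channel_pattern: str = "*") -> Dict[str, Any]:
    """Collect active channels, per-channel subscriber counts and the pattern count."""
    channels = list(client.pubsub_channels(channel_pattern))
    # pubsub_numsub returns (channel, count) pairs; skip the call when nothing is active
    counts = dict(client.pubsub_numsub(*channels)) if channels else {}
    return {
        "channels": channels,
        "subscribers": counts,
        "patterns": client.pubsub_numpat(),
    }


# Usage, assuming a reachable Redis server and the redis-py package:
#   import redis
#   print(pubsub_snapshot(redis.Redis(decode_responses=True), "chat:*"))
```

With `decode_responses=False` the channel names come back as bytes, so the dictionary keys are bytes as well.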

8.3 Python Implementation

8.3.1 Basic Pub/Sub Classes

import redis
import json
import threading
import time
from typing import Dict, List, Callable, Any, Optional
from dataclasses import dataclass
from datetime import datetime
from enum import Enum

class MessageType(Enum):
    SUBSCRIBE = "subscribe"
    UNSUBSCRIBE = "unsubscribe"
    MESSAGE = "message"
    PMESSAGE = "pmessage"
    PSUBSCRIBE = "psubscribe"
    PUNSUBSCRIBE = "punsubscribe"

@dataclass
class PubSubMessage:
    type: MessageType
    channel: str
    data: Any
    pattern: Optional[str] = None
    timestamp: Optional[datetime] = None
    
    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = datetime.now()

class RedisPublisher:
    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
    
    def publish(self, channel: str, message: Any) -> int:
        """Publish a message to a channel and return the number of receivers."""
        if isinstance(message, (dict, list)):
            message = json.dumps(message, ensure_ascii=False)
        elif not isinstance(message, str):
            message = str(message)
        
        return self.redis_client.publish(channel, message)
    
    def publish_json(self, channel: str, data: Dict[str, Any]) -> int:
        """Publish a dict as a JSON message."""
        message = json.dumps(data, ensure_ascii=False)
        return self.redis_client.publish(channel, message)
    
    def publish_event(self, event_type: str, event_data: Dict[str, Any]) -> int:
        """Wrap the data in an event envelope and publish it to events:<event_type>."""
        event = {
            'type': event_type,
            'data': event_data,
            'timestamp': datetime.now().isoformat()
        }
        return self.publish_json(f"events:{event_type}", event)
    
    def broadcast(self, channels: List[str], message: Any) -> Dict[str, int]:
        """Publish the same message to several channels; return the receiver count per channel."""
        results = {}
        for channel in channels:
            results[channel] = self.publish(channel, message)
        return results

class RedisSubscriber:
    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
        self.pubsub = self.redis_client.pubsub()
        self.message_handlers: Dict[str, List[Callable]] = {}
        self.pattern_handlers: Dict[str, List[Callable]] = {}
        self.running = False
        self.thread = None
    
    def subscribe(self, channel: str, handler: Callable[[PubSubMessage], None] = None):
        """Subscribe to a channel and optionally register a handler for it."""
        self.pubsub.subscribe(channel)
        
        if handler:
            if channel not in self.message_handlers:
                self.message_handlers[channel] = []
            self.message_handlers[channel].append(handler)
    
    def psubscribe(self, pattern: str, handler: Callable[[PubSubMessage], None] = None):
        """Subscribe to a pattern and optionally register a handler for it."""
        self.pubsub.psubscribe(pattern)
        
        if handler:
            if pattern not in self.pattern_handlers:
                self.pattern_handlers[pattern] = []
            self.pattern_handlers[pattern].append(handler)
    
    def unsubscribe(self, channel: str = None):
        """Unsubscribe from one channel, or from all channels if none is given."""
        if channel:
            self.pubsub.unsubscribe(channel)
            if channel in self.message_handlers:
                del self.message_handlers[channel]
        else:
            self.pubsub.unsubscribe()
            self.message_handlers.clear()
    
    def punsubscribe(self, pattern: str = None):
        """Unsubscribe from one pattern, or from all patterns if none is given."""
        if pattern:
            self.pubsub.punsubscribe(pattern)
            if pattern in self.pattern_handlers:
                del self.pattern_handlers[pattern]
        else:
            self.pubsub.punsubscribe()
            self.pattern_handlers.clear()
    
    def add_handler(self, channel: str, handler: Callable[[PubSubMessage], None]):
        """Register an additional handler for a channel."""
        if channel not in self.message_handlers:
            self.message_handlers[channel] = []
        self.message_handlers[channel].append(handler)
    
    def add_pattern_handler(self, pattern: str, handler: Callable[[PubSubMessage], None]):
        """Register an additional handler for a pattern."""
        if pattern not in self.pattern_handlers:
            self.pattern_handlers[pattern] = []
        self.pattern_handlers[pattern].append(handler)
    
    def _process_message(self, raw_message: Dict[str, Any]):
        """Dispatch a raw message from redis-py to the registered handlers."""
        msg_type = MessageType(raw_message['type'])

        if msg_type in [MessageType.SUBSCRIBE, MessageType.UNSUBSCRIBE, 
                       MessageType.PSUBSCRIBE, MessageType.PUNSUBSCRIBE]:
            # Subscribe/unsubscribe confirmations carry no payload to dispatch
            return

        # Build the message object
        if msg_type == MessageType.MESSAGE:
            message = PubSubMessage(
                type=msg_type,
                channel=raw_message['channel'].decode('utf-8'),
                data=raw_message['data'].decode('utf-8')
            )
            
            # Call the handlers registered for this channel
            handlers = self.message_handlers.get(message.channel, [])
            for handler in handlers:
                try:
                    handler(message)
                except Exception as e:
                    print(f"Handler error for channel {message.channel}: {e}")
        
        elif msg_type == MessageType.PMESSAGE:
            pattern = raw_message['pattern'].decode('utf-8')
            message = PubSubMessage(
                type=msg_type,
                channel=raw_message['channel'].decode('utf-8'),
                data=raw_message['data'].decode('utf-8'),
                pattern=pattern
            )
            
            # Call the handlers registered for this pattern
            handlers = self.pattern_handlers.get(pattern, [])
            for handler in handlers:
                try:
                    handler(message)
                except Exception as e:
                    print(f"Handler error for pattern {pattern}: {e}")
    
    def start_listening(self):
        """Start listening for messages in a background thread."""
        if self.running:
            return
        
        self.running = True
        self.thread = threading.Thread(target=self._listen_loop, daemon=True)
        self.thread.start()
    
    def _listen_loop(self):
        """Poll for messages until stop_listening() is called."""
        while self.running:
            try:
                message = self.pubsub.get_message(timeout=1.0)
                if message:
                    self._process_message(message)
            except Exception as e:
                print(f"Listen loop error: {e}")
                time.sleep(1)
    
    def stop_listening(self):
        """Stop the listener thread."""
        self.running = False
        if self.thread:
            self.thread.join(timeout=5)
    
    def close(self):
        """Stop listening and close the Pub/Sub connection."""
        self.stop_listening()
        self.pubsub.close()

# Usage example
if __name__ == "__main__":
    # Keep decode_responses=False: _process_message decodes the bytes itself
    redis_client = redis.Redis(decode_responses=False)

    # Create the publisher
    publisher = RedisPublisher(redis_client)

    # Create the subscriber
    subscriber = RedisSubscriber(redis_client)

    # Define the message handlers
    def news_handler(message: PubSubMessage):
        print(f"[NEWS] {message.timestamp}: {message.data}")

    def chat_handler(message: PubSubMessage):
        print(f"[CHAT] {message.channel}: {message.data}")

    def event_handler(message: PubSubMessage):
        try:
            event_data = json.loads(message.data)
            print(f"[EVENT] {event_data['type']}: {event_data['data']}")
        except json.JSONDecodeError:
            print(f"[EVENT] Invalid JSON: {message.data}")

    # Subscribe to channels and patterns
    subscriber.subscribe("news", news_handler)
    subscriber.psubscribe("chat:*", chat_handler)
    subscriber.psubscribe("events:*", event_handler)

    # Start listening
    subscriber.start_listening()

    # Publish messages
    time.sleep(1)  # give the subscriptions time to take effect

    publisher.publish("news", "Introduction to Redis Pub/Sub")
    publisher.publish("chat:room1", "Hello, everyone!")
    publisher.publish("chat:room2", "Welcome to the chat room")

    # Publish an event (publish_event adds its own timestamp to the envelope)
    publisher.publish_event("user_login", {
        "user_id": "user123",
        "ip": "192.168.1.100",
        "timestamp": datetime.now().isoformat()
    })

    # Wait for the messages to be handled
    time.sleep(2)

    # Clean up
    subscriber.close()
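
The subscriber above calls `.decode('utf-8')` on the channel and payload, which is why the example has to create its client with `decode_responses=False`. One way to lift that restriction, sketched here as an assumption rather than as part of the original classes, is to normalize the dictionary that redis-py's `get_message()` returns (keys `type`, `pattern`, `channel`, `data`) before building a `PubSubMessage`. `normalize_message` and `_to_text` are hypothetical helper names.

```python
from typing import Any, Dict, Optional


def _to_text(value: Any) -> Optional[str]:
    """Return text for bytes, str or int values; pass None through unchanged."""
    if value is None:
        return None
    if isinstance(value, bytes):
        return value.decode("utf-8", errors="replace")
    return str(value)


def normalize_message(raw: Dict[str, Any]) -> Dict[str, Optional[str]]:
    """Decode a raw Pub/Sub message dict regardless of the client's decode_responses setting."""
    return {
        "type": _to_text(raw.get("type")),
        "pattern": _to_text(raw.get("pattern")),
        "channel": _to_text(raw.get("channel")),
        "data": _to_text(raw.get("data")),
    }


raw_bytes = {"type": "message", "pattern": None, "channel": b"news", "data": b"hello"}
raw_text = {"type": "pmessage", "pattern": "chat:*", "channel": "chat:room1", "data": "hi"}

print(normalize_message(raw_bytes)["channel"], normalize_message(raw_bytes)["data"])  # news hello
print(normalize_message(raw_text)["pattern"], normalize_message(raw_text)["data"])    # chat:* hi
```

Decoding with `errors="replace"` is a design choice for the sketch: a payload that is not valid UTF-8 yields replacement characters instead of raising inside the listener thread. Binary payloads would need to be handled as bytes instead.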

8.3.2 Chat Room System

import redis
import json
import uuid
from typing import Dict, List, Optional, Set
from dataclasses import dataclass, asdict
from datetime import datetime
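from enum import Enum

# RedisPublisher is the class from section 8.3.1; import it from wherever that code is saved.
# Keep this listing in its own module: the MessageType enum below would otherwise shadow
# the Pub/Sub MessageType that RedisSubscriber in 8.3.1 relies on.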

class MessageType(Enum):
    TEXT = "text"
    IMAGE = "image"
    FILE = "file"
    SYSTEM = "system"
    EMOJI = "emoji"

class UserStatus(Enum):
    ONLINE = "online"
    OFFLINE = "offline"
    AWAY = "away"
    BUSY = "busy"

@dataclass
class ChatUser:
    user_id: str
    username: str
    avatar: str = ""
    status: UserStatus = UserStatus.ONLINE
    last_seen: Optional[datetime] = None
    
    def __post_init__(self):
        if self.last_seen is None:
            self.last_seen = datetime.now()

@dataclass
class ChatMessage:
    message_id: str
    room_id: str
    user_id: str
    username: str
    message_type: MessageType
    content: str
    timestamp: datetime
    reply_to: Optional[str] = None
    metadata: Optional[Dict] = None
    
    def to_dict(self) -> Dict:
        data = asdict(self)
        data['timestamp'] = self.timestamp.isoformat()
        data['message_type'] = self.message_type.value
        return data
    
    @classmethod
    def from_dict(cls, data: Dict) -> 'ChatMessage':
        data = dict(data)  # copy so the caller's dict is not modified
        data['timestamp'] = datetime.fromisoformat(data['timestamp'])
        data['message_type'] = MessageType(data['message_type'])
        return cls(**data)

@dataclass
class ChatRoom:
    room_id: str
    name: str
    description: str = ""
    created_by: str = ""
    created_at: Optional[datetime] = None
    max_users: int = 100
    is_private: bool = False
    
    def __post_init__(self):
        if self.created_at is None:
            self.created_at = datetime.now()

class ChatSystem:
    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
        self.publisher = RedisPublisher(redis_client)
        self.rooms_key = "chat:rooms"
        self.users_key = "chat:users"
        self.room_users_prefix = "chat:room_users"
        self.user_rooms_prefix = "chat:user_rooms"
        self.message_history_prefix = "chat:history"
    
    def create_room(self, room: ChatRoom) -> bool:
        """Create a chat room."""
        room_data = asdict(room)
        room_data['created_at'] = room.created_at.isoformat()

        # Save the room's metadata
        self.redis_client.hset(self.rooms_key, room.room_id, json.dumps(room_data))

        # Publish a room-created event
        self.publisher.publish_event("room_created", {
            "room_id": room.room_id,
            "name": room.name,
            "created_by": room.created_by
        })
        
        return True
    
    def get_room(self, room_id: str) -> Optional[ChatRoom]:
        """Return a chat room, or None if it does not exist."""
        room_data = self.redis_client.hget(self.rooms_key, room_id)
        if not room_data:
            return None
        
        data = json.loads(room_data)
        data['created_at'] = datetime.fromisoformat(data['created_at'])
        return ChatRoom(**data)
    
    def list_rooms(self) -> List[ChatRoom]:
        """Return all chat rooms."""
        rooms_data = self.redis_client.hgetall(self.rooms_key)
        rooms = []
        
        for room_id, room_data in rooms_data.items():
            data = json.loads(room_data)
            data['created_at'] = datetime.fromisoformat(data['created_at'])
            rooms.append(ChatRoom(**data))
        
        return rooms
    
    def join_room(self, room_id: str, user: ChatUser) -> bool:
        """Add a user to a chat room."""
        room = self.get_room(room_id)
        if not room:
            return False

        room_users_key = f"{self.room_users_prefix}:{room_id}"
        user_rooms_key = f"{self.user_rooms_prefix}:{user.user_id}"

        # Enforce the room's user limit (this check and the SADD below are not atomic)
        current_users = self.redis_client.scard(room_users_key)
        if current_users >= room.max_users:
            return False

        # Add the user to the room
        self.redis_client.sadd(room_users_key, user.user_id)
        self.redis_client.sadd(user_rooms_key, room_id)

        # Save the user's profile
        user_data = asdict(user)
        user_data['last_seen'] = user.last_seen.isoformat()
        user_data['status'] = user.status.value
        self.redis_client.hset(self.users_key, user.user_id, json.dumps(user_data))

        # Publish a user-joined event
        self.publisher.publish(f"chat:room:{room_id}", json.dumps({
            "type": "user_joined",
            "user_id": user.user_id,
            "username": user.username,
            "timestamp": datetime.now().isoformat()
        }))

        # Record a system message in the room history (stored only, not published)
        system_message = ChatMessage(
            message_id=str(uuid.uuid4()),
            room_id=room_id,
            user_id="system",
            username="System",
            message_type=MessageType.SYSTEM,
            content=f"{user.username} joined the room",
            timestamp=datetime.now()
        )
        self._save_message(system_message)
        
        return True
    
    def leave_room(self, room_id: str, user_id: str) -> bool:
        """Remove a user from a chat room."""
        room_users_key = f"{self.room_users_prefix}:{room_id}"
        user_rooms_key = f"{self.user_rooms_prefix}:{user_id}"

        # Remove the user from the room
        removed = self.redis_client.srem(room_users_key, user_id)
        self.redis_client.srem(user_rooms_key, room_id)

        if removed:
            # Look up the user's profile
            user_data = self.redis_client.hget(self.users_key, user_id)
            username = "Unknown user"
            if user_data:
                user_info = json.loads(user_data)
                username = user_info.get('username', 'Unknown user')

            # Publish a user-left event
            self.publisher.publish(f"chat:room:{room_id}", json.dumps({
                "type": "user_left",
                "user_id": user_id,
                "username": username,
                "timestamp": datetime.now().isoformat()
            }))

            # Record a system message in the room history (stored only, not published)
            system_message = ChatMessage(
                message_id=str(uuid.uuid4()),
                room_id=room_id,
                user_id="system",
                username="System",
                message_type=MessageType.SYSTEM,
                content=f"{username} left the room",
                timestamp=datetime.now()
            )
            self._save_message(system_message)
        
        return bool(removed)
    
    def send_message(self, message: ChatMessage) -> bool:
        """Store a message and publish it to the room's channel."""
        # Verify that the sender is a member of the room
        room_users_key = f"{self.room_users_prefix}:{message.room_id}"
        if not self.redis_client.sismember(room_users_key, message.user_id):
            return False

        # Save the message
        self._save_message(message)

        # Publish the message to the room's channel
        self.publisher.publish(f"chat:room:{message.room_id}", json.dumps(message.to_dict()))

        # Update the sender's last-active time
        self._update_user_activity(message.user_id)
        
        return True
    
    def _save_message(self, message: ChatMessage):
        """Save a message to the room's history."""
        history_key = f"{self.message_history_prefix}:{message.room_id}"

        # Store messages in a sorted set, scored by timestamp
        score = message.timestamp.timestamp()
        self.redis_client.zadd(history_key, {json.dumps(message.to_dict()): score})

        # Keep only the 1000 most recent messages
        self.redis_client.zremrangebyrank(history_key, 0, -1001)

        # Expire the history 30 days after the last saved message
        self.redis_client.expire(history_key, 86400 * 30)
    
    def get_message_history(self, room_id: str, limit: int = 50, before_timestamp: Optional[datetime] = None) -> List[ChatMessage]:
        """Return up to `limit` messages, optionally only those older than before_timestamp."""
        history_key = f"{self.message_history_prefix}:{room_id}"

        if before_timestamp:
            # "(" makes the upper bound exclusive, so the boundary message is not returned again
            max_score = f"({before_timestamp.timestamp()}"
            messages_data = self.redis_client.zrevrangebyscore(
                history_key, max_score, '-inf', start=0, num=limit
            )
        else:
            messages_data = self.redis_client.zrevrange(history_key, 0, limit-1)
        
        messages = []
        for msg_data in messages_data:
            try:
                msg_dict = json.loads(msg_data)
                messages.append(ChatMessage.from_dict(msg_dict))
            except (json.JSONDecodeError, TypeError):
                continue
        
        return list(reversed(messages))  # return in chronological order
    
    def get_room_users(self, room_id: str) -> List[ChatUser]:
        """Return the users in a room."""
        room_users_key = f"{self.room_users_prefix}:{room_id}"
        user_ids = self.redis_client.smembers(room_users_key)
        
        users = []
        for user_id in user_ids:
            user_data = self.redis_client.hget(self.users_key, user_id)
            if user_data:
                try:
                    data = json.loads(user_data)
                    data['last_seen'] = datetime.fromisoformat(data['last_seen'])
                    data['status'] = UserStatus(data['status'])
                    users.append(ChatUser(**data))
                except (json.JSONDecodeError, ValueError):
                    continue
        
        return users
    
    def get_user_rooms(self, user_id: str) -> List[ChatRoom]:
        """Return the rooms a user has joined."""
        user_rooms_key = f"{self.user_rooms_prefix}:{user_id}"
        room_ids = self.redis_client.smembers(user_rooms_key)
        
        rooms = []
        for room_id in room_ids:
            room = self.get_room(room_id)
            if room:
                rooms.append(room)
        
        return rooms
    
    def update_user_status(self, user_id: str, status: UserStatus) -> bool:
        """Update a user's status."""
        user_data = self.redis_client.hget(self.users_key, user_id)
        if not user_data:
            return False
        
        data = json.loads(user_data)
        data['status'] = status.value
        data['last_seen'] = datetime.now().isoformat()
        
        self.redis_client.hset(self.users_key, user_id, json.dumps(data))
        
        # Notify every room the user has joined about the status change
        user_rooms = self.get_user_rooms(user_id)
        for room in user_rooms:
            self.publisher.publish(f"chat:room:{room.room_id}", json.dumps({
                "type": "user_status_changed",
                "user_id": user_id,
                "status": status.value,
                "timestamp": datetime.now().isoformat()
            }))
        
        return True
    
    def _update_user_activity(self, user_id: str):
        """Update a user's last-active time."""
        user_data = self.redis_client.hget(self.users_key, user_id)
        if user_data:
            data = json.loads(user_data)
            data['last_seen'] = datetime.now().isoformat()
            self.redis_client.hset(self.users_key, user_id, json.dumps(data))
    
    def search_messages(self, room_id: str, keyword: str, limit: int = 20) -> List[ChatMessage]:
        """Search a room's stored history for a keyword (case-insensitive, newest first)."""
        history_key = f"{self.message_history_prefix}:{room_id}"
        all_messages = self.redis_client.zrevrange(history_key, 0, -1)
        
        matching_messages = []
        for msg_data in all_messages:
            try:
                msg_dict = json.loads(msg_data)
                if keyword.lower() in msg_dict.get('content', '').lower():
                    matching_messages.append(ChatMessage.from_dict(msg_dict))
                    if len(matching_messages) >= limit:
                        break
            except (json.JSONDecodeError, TypeError):
                continue
        
        return matching_messages
    
    def get_room_stats(self, room_id: str) -> Dict[str, object]:
        """Return statistics for a room."""
        room = self.get_room(room_id)
        if not room:
            return {}
        
        room_users_key = f"{self.room_users_prefix}:{room_id}"
        history_key = f"{self.message_history_prefix}:{room_id}"
        
        # Current number of room members
        current_users = self.redis_client.scard(room_users_key)

        # Total number of stored messages
        total_messages = self.redis_client.zcard(history_key)

        # Messages sent today
        today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
        today_messages = self.redis_client.zcount(
            history_key, today_start.timestamp(), '+inf'
        )
        
        return {
            'room_id': room_id,
            'room_name': room.name,
            'current_users': current_users,
            'max_users': room.max_users,
            'total_messages': total_messages,
            'today_messages': today_messages,
            'created_at': room.created_at.isoformat()
        }

# Usage example
if __name__ == "__main__":
    redis_client = redis.Redis(decode_responses=True)
    chat_system = ChatSystem(redis_client)

    # Create a chat room
    room = ChatRoom(
        room_id="room_001",
        name="Tech Talk",
        description="Discussion of technical topics",
        created_by="admin",
        max_users=50
    )
    chat_system.create_room(room)

    # Create users
    user1 = ChatUser(
        user_id="user_001",
        username="Zhang San",
        avatar="avatar1.jpg"
    )

    user2 = ChatUser(
        user_id="user_002",
        username="Li Si",
        avatar="avatar2.jpg"
    )

    # Users join the chat room
    chat_system.join_room("room_001", user1)
    chat_system.join_room("room_001", user2)

    # Send messages
    message1 = ChatMessage(
        message_id=str(uuid.uuid4()),
        room_id="room_001",
        user_id="user_001",
        username="Zhang San",
        message_type=MessageType.TEXT,
        content="Hello, everyone!",
        timestamp=datetime.now()
    )

    message2 = ChatMessage(
        message_id=str(uuid.uuid4()),
        room_id="room_001",
        user_id="user_002",
        username="Li Si",
        message_type=MessageType.TEXT,
        content="Hello, Zhang San!",
        timestamp=datetime.now(),
        reply_to=message1.message_id
    )

    chat_system.send_message(message1)
    chat_system.send_message(message2)

    # Fetch the message history
    history = chat_system.get_message_history("room_001")
    print(f"Message history ({len(history)} messages):")
    for msg in history:
        print(f"  [{msg.timestamp.strftime('%H:%M:%S')}] {msg.username}: {msg.content}")

    # Fetch the room's users
    users = chat_system.get_room_users("room_001")
    print(f"\nRoom users ({len(users)}):")
    for user in users:
        print(f"  {user.username} ({user.status.value})")

    # Fetch the room statistics
    stats = chat_system.get_room_stats("room_001")
    print("\nRoom statistics:")
    print(f"  Current users: {stats['current_users']}/{stats['max_users']}")
    print(f"  Total messages: {stats['total_messages']}")
    print(f"  Messages today: {stats['today_messages']}")
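
The history cap in `_save_message` relies on `ZREMRANGEBYRANK history_key 0 -1001`, and the negative stop rank is easy to get wrong by one. The sketch below models that call on a plain Python list so the arithmetic can be checked without a server. `trim_to_latest` is a hypothetical helper written for this illustration; it assumes the usual sorted-set ordering (ascending score, ties broken by member) and inclusive rank ranges.

```python
from typing import List, Tuple


def trim_to_latest(entries: List[Tuple[float, str]], keep: int) -> List[Tuple[float, str]]:
    """Model of ZREMRANGEBYRANK key 0 -(keep + 1): drop all but the `keep` highest-scored entries."""
    ordered = sorted(entries)          # ascending by score, then by member
    stop = len(ordered) - (keep + 1)   # absolute index of the negative stop rank
    if stop < 0:
        return ordered                 # fewer than keep + 1 entries: nothing is removed
    return ordered[stop + 1:]          # ranks 0..stop (inclusive) are removed


history = [(3.0, "c"), (1.0, "a"), (5.0, "e"), (2.0, "b"), (4.0, "d")]

print(trim_to_latest(history, keep=3))  # [(3.0, 'c'), (4.0, 'd'), (5.0, 'e')]
print(len(trim_to_latest(history, keep=10)))  # 5
```

With `keep=1000` the stop rank is -1001, which matches the value used in `_save_message`: the 1000 newest messages survive and everything older is removed.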

8.3.3 Real-Time Notification System

import redis
import json
import uuid
from typing import Dict, List, Optional, Set, Callable
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from enum import Enum

# RedisPublisher is the class from section 8.3.1; import it from wherever that code is saved.

class NotificationType(Enum):
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    SUCCESS = "success"
    SYSTEM = "system"

class NotificationPriority(Enum):
    LOW = "low"
    NORMAL = "normal"
    HIGH = "high"
    URGENT = "urgent"

@dataclass
class Notification:
    notification_id: str
    user_id: str
    title: str
    content: str
    notification_type: NotificationType
    priority: NotificationPriority
    timestamp: datetime
    read: bool = False
    action_url: Optional[str] = None
    metadata: Optional[Dict] = None
    expires_at: Optional[datetime] = None
    
    def to_dict(self) -> Dict:
        data = asdict(self)
        data['timestamp'] = self.timestamp.isoformat()
        data['notification_type'] = self.notification_type.value
        data['priority'] = self.priority.value
        if self.expires_at:
            data['expires_at'] = self.expires_at.isoformat()
        return data
    
    @classmethod
    def from_dict(cls, data: Dict) -> 'Notification':
        data = dict(data)  # copy so the caller's dict is not modified
        data['timestamp'] = datetime.fromisoformat(data['timestamp'])
        data['notification_type'] = NotificationType(data['notification_type'])
        data['priority'] = NotificationPriority(data['priority'])
        if data.get('expires_at'):
            data['expires_at'] = datetime.fromisoformat(data['expires_at'])
        return cls(**data)

class NotificationSystem:
    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
        self.publisher = RedisPublisher(redis_client)
        self.notifications_prefix = "notifications"
        self.user_notifications_prefix = "user_notifications"
        self.notification_settings_prefix = "notification_settings"
        self.unread_count_prefix = "unread_count"
    
    def send_notification(self, notification: Notification) -> bool:
        """Store a notification and push it to the user in real time."""
        # Save the notification
        notification_key = f"{self.notifications_prefix}:{notification.notification_id}"
        self.redis_client.setex(
            notification_key,
            86400 * 30,  # expires after 30 days
            json.dumps(notification.to_dict())
        )

        # Add it to the user's notification list
        user_notifications_key = f"{self.user_notifications_prefix}:{notification.user_id}"
        score = notification.timestamp.timestamp()
        self.redis_client.zadd(user_notifications_key, {notification.notification_id: score})

        # Keep only the 1000 most recent notifications
        self.redis_client.zremrangebyrank(user_notifications_key, 0, -1001)

        # Update the unread counter
        if not notification.read:
            unread_key = f"{self.unread_count_prefix}:{notification.user_id}"
            self.redis_client.incr(unread_key)
            self.redis_client.expire(unread_key, 86400 * 30)

        # Push the notification in real time
        self.publisher.publish(
            f"notifications:user:{notification.user_id}",
            json.dumps(notification.to_dict())
        )

        # High-priority and urgent notifications also go to a per-priority channel
        if notification.priority in [NotificationPriority.HIGH, NotificationPriority.URGENT]:
            self.publisher.publish(
                f"notifications:priority:{notification.priority.value}",
                json.dumps(notification.to_dict())
            )
        
        return True
    
    def send_bulk_notification(self, user_ids: List[str], title: str, content: str, 
                             notification_type: NotificationType = NotificationType.INFO,
                             priority: NotificationPriority = NotificationPriority.NORMAL,
                             action_url: str = None, metadata: Dict = None) -> List[str]:
        """Send the same notification to several users; return the IDs that were sent."""
        notification_ids = []
        
        for user_id in user_ids:
            notification_id = str(uuid.uuid4())
            notification = Notification(
                notification_id=notification_id,
                user_id=user_id,
                title=title,
                content=content,
                notification_type=notification_type,
                priority=priority,
                timestamp=datetime.now(),
                action_url=action_url,
                metadata=metadata
            )
            
            if self.send_notification(notification):
                notification_ids.append(notification_id)
        
        return notification_ids
    
    def get_user_notifications(self, user_id: str, limit: int = 20, 
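                             offset: int = 0, unread_only: bool = False) -> List[Notification]:
        """Return a page of the user's notifications, newest first.

        unread_only filters the page after it is fetched, so fewer than `limit` items may be returned.
        """
        user_notifications_key = f"{self.user_notifications_prefix}:{user_id}"

        # Fetch the notification IDs for this page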
        notification_ids = self.redis_client.zrevrange(
            user_notifications_key, offset, offset + limit - 1
        )
        
        notifications = []
        for notification_id in notification_ids:
            notification = self.get_notification(notification_id)
            if notification:
                if unread_only and notification.read:
                    continue
                notifications.append(notification)
        
        return notifications
    
    def get_notification(self, notification_id: str) -> Optional[Notification]:
        """Return a single notification, or None if it is missing or unreadable."""
        notification_key = f"{self.notifications_prefix}:{notification_id}"
        notification_data = self.redis_client.get(notification_key)
        
        if not notification_data:
            return None
        
        try:
            data = json.loads(notification_data)
            return Notification.from_dict(data)
        except (json.JSONDecodeError, TypeError, ValueError):
            return None
    
    def mark_as_read(self, notification_id: str) -> bool:
        """Mark a notification as read."""
        notification = self.get_notification(notification_id)
        if not notification or notification.read:
            return False
        
        # Update the stored notification (this also resets its 30-day expiry)
        notification.read = True
        notification_key = f"{self.notifications_prefix}:{notification_id}"
        self.redis_client.setex(
            notification_key,
            86400 * 30,
            json.dumps(notification.to_dict())
        )
        
        # Decrement the unread counter (the GET-then-DECR check is not atomic)
        unread_key = f"{self.unread_count_prefix}:{notification.user_id}"
        current_count = self.redis_client.get(unread_key)
        if current_count and int(current_count) > 0:
            self.redis_client.decr(unread_key)
        
        # Publish the read-status change
        self.publisher.publish(
            f"notifications:user:{notification.user_id}:read",
            json.dumps({
                "notification_id": notification_id,
                "read": True,
                "timestamp": datetime.now().isoformat()
            })
        )
        
        return True
    
    def mark_all_as_read(self, user_id: str) -> int:
        """Mark all of a user's notifications as read; return how many changed."""
        user_notifications_key = f"{self.user_notifications_prefix}:{user_id}"
        notification_ids = self.redis_client.zrange(user_notifications_key, 0, -1)
        
        read_count = 0
        for notification_id in notification_ids:
            if self.mark_as_read(notification_id):
                read_count += 1
        
        # Reset the unread counter
        unread_key = f"{self.unread_count_prefix}:{user_id}"
        self.redis_client.delete(unread_key)
        
        return read_count
    
    def get_unread_count(self, user_id: str) -> int:
        """Return the user's unread notification count."""
        unread_key = f"{self.unread_count_prefix}:{user_id}"
        count = self.redis_client.get(unread_key)
        return int(count) if count else 0
    
    def delete_notification(self, notification_id: str) -> bool:
        """Delete a notification."""
        notification = self.get_notification(notification_id)
        if not notification:
            return False
        
        # 删除通知数据
        notification_key = f"{self.notifications_prefix}:{notification_id}"
        self.redis_client.delete(notification_key)
        
        # 从用户通知列表中移除
        user_notifications_key = f"{self.user_notifications_prefix}:{notification.user_id}"
        self.redis_client.zrem(user_notifications_key, notification_id)
        
        # 如果是未读通知,减少未读计数
        if not notification.read:
            unread_key = f"{self.unread_count_prefix}:{notification.user_id}"
            current_count = self.redis_client.get(unread_key)
            if current_count and int(current_count) > 0:
                self.redis_client.decr(unread_key)
        
        return True
    
    def cleanup_expired_notifications(self) -> int:
        """Delete notifications whose expires_at has passed."""
        # Simplified implementation that walks every user's list. A production
        # system should use something more efficient, such as Redis key expiry
        # or a scheduled job.
        
        cleaned_count = 0
        now = datetime.now()
        
        # Iterate over every user's notification list
        pattern = f"{self.user_notifications_prefix}:*"
        for key in self.redis_client.scan_iter(match=pattern):
            notification_ids = self.redis_client.zrange(key, 0, -1)
            
            for notification_id in notification_ids:
                notification = self.get_notification(notification_id)
                if notification and notification.expires_at and notification.expires_at < now:
                    if self.delete_notification(notification_id):
                        cleaned_count += 1
        
        return cleaned_count
    
    def get_notification_stats(self, user_id: str) -> Dict[str, object]:
        """Return notification statistics for a user."""
        user_notifications_key = f"{self.user_notifications_prefix}:{user_id}"
        
        # Total number of notifications
        total_notifications = self.redis_client.zcard(user_notifications_key)
        
        # Number of unread notifications
        unread_count = self.get_unread_count(user_id)
        
        # Notifications received today (scores are assumed to be Unix timestamps)
        today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
        today_notifications = self.redis_client.zcount(
            user_notifications_key, today_start.timestamp(), '+inf'
        )
        
        # Count notifications by type
        type_stats = {}
        notification_ids = self.redis_client.zrange(user_notifications_key, 0, -1)
        
        for notification_id in notification_ids:
            notification = self.get_notification(notification_id)
            if notification:
                type_name = notification.notification_type.value
                type_stats[type_name] = type_stats.get(type_name, 0) + 1
        
        return {
            'user_id': user_id,
            'total_notifications': total_notifications,
            'unread_count': unread_count,
            'read_count': total_notifications - unread_count,
            'today_notifications': today_notifications,
            'type_stats': type_stats
        }

class NotificationSubscriber:
    def __init__(self, redis_client: redis.Redis, user_id: str):
        self.redis_client = redis_client
        self.user_id = user_id
        self.subscriber = RedisSubscriber(redis_client)
        self.notification_handlers: List[Callable[[Notification], None]] = []
        self.read_handlers: List[Callable[[str], None]] = []
    
    def add_notification_handler(self, handler: Callable[[Notification], None]):
        """Register a handler for incoming notifications."""
        self.notification_handlers.append(handler)
    
    def add_read_handler(self, handler: Callable[[str], None]):
        """Register a handler for read-status updates."""
        self.read_handlers.append(handler)
    
    def start_listening(self):
        """Start listening for notifications."""
        # Subscribe to the user's notification channel
        self.subscriber.subscribe(
            f"notifications:user:{self.user_id}",
            self._handle_notification
        )
        
        # Subscribe to the read-status update channel
        self.subscriber.subscribe(
            f"notifications:user:{self.user_id}:read",
            self._handle_read_update
        )
        
        # Subscribe to high-priority notifications
        self.subscriber.psubscribe(
            "notifications:priority:*",
            self._handle_priority_notification
        )
        
        self.subscriber.start_listening()
    
    def _handle_notification(self, message: PubSubMessage):
        """Handle a notification message."""
        try:
            data = json.loads(message.data)
            notification = Notification.from_dict(data)
            
            for handler in self.notification_handlers:
                handler(notification)
        except (json.JSONDecodeError, TypeError, ValueError) as e:
            print(f"Failed to parse notification: {e}")
    
    def _handle_read_update(self, message: PubSubMessage):
        """Handle a read-status update."""
        try:
            data = json.loads(message.data)
            notification_id = data.get('notification_id')
            
            for handler in self.read_handlers:
                handler(notification_id)
        except (json.JSONDecodeError, TypeError) as e:
            print(f"Failed to parse read update: {e}")
    
    def _handle_priority_notification(self, message: PubSubMessage):
        """Handle a high-priority notification."""
        try:
            data = json.loads(message.data)
            notification = Notification.from_dict(data)
            
            # Only handle high-priority notifications addressed to this user
            if notification.user_id == self.user_id:
                print(f"[URGENT] {notification.title}: {notification.content}")
        except (json.JSONDecodeError, TypeError, ValueError) as e:
            print(f"Failed to parse priority notification: {e}")
    
    def stop_listening(self):
        """Stop listening."""
        self.subscriber.stop_listening()
    
    def close(self):
        """Close the subscriber."""
        self.subscriber.close()

# Usage example
if __name__ == "__main__":
    redis_client = redis.Redis(decode_responses=True)
    notification_system = NotificationSystem(redis_client)
    
    # Create a notification subscriber
    user_id = "user_123"
    subscriber = NotificationSubscriber(redis_client, user_id)
    
    # Define the handlers
    def notification_handler(notification: Notification):
        print(f"[{notification.notification_type.value.upper()}] {notification.title}")
        print(f"  Content: {notification.content}")
        print(f"  Time: {notification.timestamp.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"  Priority: {notification.priority.value}")
        if notification.action_url:
            print(f"  Action URL: {notification.action_url}")
        print()
    
    def read_handler(notification_id: str):
        print(f"Notification {notification_id} marked as read")
    
    # Register the handlers
    subscriber.add_notification_handler(notification_handler)
    subscriber.add_read_handler(read_handler)
    
    # Start listening
    subscriber.start_listening()
    
    # Send several kinds of notifications
    import time
    time.sleep(1)  # Give the subscriptions time to take effect
    
    # Plain informational notification
    notification1 = Notification(
        notification_id=str(uuid.uuid4()),
        user_id=user_id,
        title="Welcome",
        content="Thank you for signing up. We hope you enjoy the service!",
        notification_type=NotificationType.INFO,
        priority=NotificationPriority.NORMAL,
        timestamp=datetime.now()
    )
    notification_system.send_notification(notification1)
    
    # Warning notification
    notification2 = Notification(
        notification_id=str(uuid.uuid4()),
        user_id=user_id,
        title="Account security alert",
        content="Your account was accessed from an unusual location. Please confirm it was you.",
        notification_type=NotificationType.WARNING,
        priority=NotificationPriority.HIGH,
        timestamp=datetime.now(),
        action_url="/security/check"
    )
    notification_system.send_notification(notification2)
    
    # Urgent notification
    notification3 = Notification(
        notification_id=str(uuid.uuid4()),
        user_id=user_id,
        title="Scheduled maintenance",
        content="The system will be under maintenance tonight from 23:00 to 01:00. Please save your work in advance.",
        notification_type=NotificationType.SYSTEM,
        priority=NotificationPriority.URGENT,
        timestamp=datetime.now(),
        expires_at=datetime.now() + timedelta(hours=8)
    )
    notification_system.send_notification(notification3)
    
    # Wait for the messages to be processed
    time.sleep(2)
    
    # Fetch notification statistics
    stats = notification_system.get_notification_stats(user_id)
    print("Notification statistics:")
    print(f"  Total notifications: {stats['total_notifications']}")
    print(f"  Unread notifications: {stats['unread_count']}")
    print(f"  Notifications today: {stats['today_notifications']}")
    print(f"  By type: {stats['type_stats']}")
    
    # Mark the first notification as read
    notification_system.mark_as_read(notification1.notification_id)
    
    time.sleep(1)
    
    # Fetch the user's notification list
    notifications = notification_system.get_user_notifications(user_id)
    print(f"\nUser notifications ({len(notifications)}):")
    for notif in notifications:
        status = "read" if notif.read else "unread"
        print(f"  [{status}] {notif.title} - {notif.timestamp.strftime('%H:%M:%S')}")
    
    # Clean up
    subscriber.close()
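
The comment in `cleanup_expired_notifications` notes that walking every user's list is a simplification. One possible alternative, sketched here under assumptions, keeps a sorted set whose scores are expiry timestamps, so a cleanup pass only reads entries that are already due. The key name `notifications:expiry_index` and the helpers `index_expiry` and `pop_expired_ids` are hypothetical and not part of the `NotificationSystem` class above; `client` is assumed to be a redis-py compatible client.

```python
from datetime import datetime
from typing import List

# Hypothetical key name; not defined by the NotificationSystem class above.
EXPIRY_INDEX_KEY = "notifications:expiry_index"


def index_expiry(client, notification_id: str, expires_at: datetime) -> None:
    """Record when a notification expires (score = Unix timestamp)."""
    client.zadd(EXPIRY_INDEX_KEY, {notification_id: expires_at.timestamp()})


def pop_expired_ids(client, now: datetime, batch_size: int = 100) -> List[str]:
    """Return up to batch_size IDs that are due and remove them from the index."""
    expired = client.zrangebyscore(
        EXPIRY_INDEX_KEY, "-inf", now.timestamp(), start=0, num=batch_size
    )
    if expired:
        client.zrem(EXPIRY_INDEX_KEY, *expired)
    return list(expired)
```

A cleanup job could call `pop_expired_ids` periodically and pass each returned ID to `delete_notification`. The read and the removal are two separate commands here, so concurrent cleaners may see the same IDs; a deployment with several workers would need additional coordination.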

8.4 Performance Optimization

8.4.1 Connection Pool Optimization

import redis
from redis.connection import ConnectionPool
from typing import Any, Dict, List

class OptimizedPubSubManager:
    def __init__(self, redis_config: Dict[str, Any]):
        # Dedicated connection pool for publishers
        self.publisher_pool = ConnectionPool(
            host=redis_config.get('host', 'localhost'),
            port=redis_config.get('port', 6379),
            db=redis_config.get('db', 0),
            password=redis_config.get('password'),
            max_connections=redis_config.get('publisher_max_connections', 10),
            retry_on_timeout=True,
            socket_keepalive=True,
            socket_keepalive_options={}
        )
        
        # Dedicated connection pool for subscribers
        self.subscriber_pool = ConnectionPool(
            host=redis_config.get('host', 'localhost'),
            port=redis_config.get('port', 6379),
            db=redis_config.get('db', 0),
            password=redis_config.get('password'),
            max_connections=redis_config.get('subscriber_max_connections', 20),
            retry_on_timeout=True,
            socket_keepalive=True,
            socket_keepalive_options={}
        )
        
        self.publisher_client = redis.Redis(connection_pool=self.publisher_pool)
        self.subscriber_clients: List[redis.Redis] = []
    
    def get_publisher(self) -> redis.Redis:
        """Return the shared publisher client."""
        return self.publisher_client
    
    def get_subscriber(self) -> redis.Redis:
        """Create and return a new subscriber client."""
        client = redis.Redis(connection_pool=self.subscriber_pool)
        self.subscriber_clients.append(client)
        return client
    
    def close_all(self):
        """Close all clients and disconnect both pools."""
        for client in self.subscriber_clients:
            try:
                client.close()
            except Exception:
                # Best-effort cleanup: keep closing the remaining clients
                pass
        
        try:
            self.publisher_client.close()
        except Exception:
            pass
        
        self.publisher_pool.disconnect()
        self.subscriber_pool.disconnect()
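
The two pools in `OptimizedPubSubManager` differ only in their `max_connections` setting. As a small sketch of how that duplication could be factored out, the hypothetical helper `build_pool_kwargs` below derives the keyword arguments for either role from one config dict; the defaults mirror the values used above, and the result is meant to be passed to `ConnectionPool(**kwargs)`.

```python
from typing import Any, Dict

# Default pool sizes per role, mirroring the values used in the class above.
_DEFAULT_MAX_CONNECTIONS = {"publisher": 10, "subscriber": 20}


def build_pool_kwargs(config: Dict[str, Any], role: str) -> Dict[str, Any]:
    """Build ConnectionPool keyword arguments for 'publisher' or 'subscriber'."""
    if role not in _DEFAULT_MAX_CONNECTIONS:
        raise ValueError(f"unknown role: {role!r}")
    return {
        "host": config.get("host", "localhost"),
        "port": config.get("port", 6379),
        "db": config.get("db", 0),
        "password": config.get("password"),
        # Looks up e.g. 'publisher_max_connections' in the config
        "max_connections": config.get(
            f"{role}_max_connections", _DEFAULT_MAX_CONNECTIONS[role]
        ),
        "retry_on_timeout": True,
        "socket_keepalive": True,
    }
```

With this helper, the constructor would reduce to two calls such as `ConnectionPool(**build_pool_kwargs(redis_config, "publisher"))`, keeping both pools in sync when a shared option changes.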

8.4.2 Batch Operations

import redis
from typing import List, Dict, Any
from datetime import datetime

class BatchPubSubOperations:
    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
    
    def batch_publish(self, messages: List[Dict[str, Any]]) -> List[int]:
        """Publish several messages in a single pipeline."""
        pipe = self.redis_client.pipeline()
        
        for msg in messages:
            channel = msg['channel']
            content = msg['content']
            pipe.publish(channel, content)
        
        results = pipe.execute()
        return results
    
    def publish_to_multiple_channels(self, channels: List[str], message: str) -> Dict[str, int]:
        """Publish one message to several channels."""
        pipe = self.redis_client.pipeline()
        
        for channel in channels:
            pipe.publish(channel, message)
        
        results = pipe.execute()
        return dict(zip(channels, results))
    
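    def batch_channel_info(self, channels: List[str]) -> Dict[str, Dict[str, Any]]:
        """Fetch subscriber counts and activity for several channels in one round trip."""
        pipe = self.redis_client.pipeline()
        
        # Queue one PUBSUB NUMSUB per channel
        for channel in channels:
            pipe.pubsub_numsub(channel)
        
        # Queue the list of active channels
        pipe.pubsub_channels()
        
        # Queue the number of pattern subscriptions
        pipe.pubsub_numpat()
        
        results = pipe.execute()
        
        def to_str(value):
            # Channel names are bytes unless the client uses decode_responses=True
            return value.decode('utf-8') if isinstance(value, bytes) else value
        
        # The last two results belong to PUBSUB CHANNELS and PUBSUB NUMPAT
        channel_info = {}
        active_channels = {to_str(name) for name in results[-2]}
        pattern_count = results[-1]
        
        for i, channel in enumerate(channels):
            # redis-py returns PUBSUB NUMSUB as a list of (channel, count) pairs
            numsub_result = results[i]
            subscriber_count = numsub_result[0][1] if numsub_result else 0
            
            channel_info[channel] = {
                'subscriber_count': subscriber_count,
                'is_active': channel in active_channels,
                'pattern_subscriptions': pattern_count
            }
        
        return channel_info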
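
`batch_publish` queues every message into one pipeline, which buffers the whole batch in client memory before sending. For very large batches, a chunked variant may be preferable. The sketch below is one way to do that; `chunked`, `batch_publish_chunked` and the default `chunk_size` of 500 are illustrative assumptions, and `client` is assumed to be a redis-py compatible client.

```python
from typing import Any, Dict, Iterator, List


def chunked(items: List[Any], size: int) -> Iterator[List[Any]]:
    """Yield consecutive slices of items with at most size elements each."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(items), size):
        yield items[start:start + size]


def batch_publish_chunked(client, messages: List[Dict[str, Any]],
                          chunk_size: int = 500) -> List[int]:
    """Publish messages in pipelines of at most chunk_size commands.

    Returns the receiver count reported by each PUBLISH, in input order.
    """
    receivers: List[int] = []
    for chunk in chunked(messages, chunk_size):
        # transaction=False sends the chunk as a plain pipeline without MULTI/EXEC
        pipe = client.pipeline(transaction=False)
        for msg in chunk:
            pipe.publish(msg["channel"], msg["content"])
        receivers.extend(pipe.execute())
    return receivers
```

Each chunk costs one network round trip, so the chunk size trades client-side buffering against the number of round trips.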

### 8.4.3 Message Compression

```python
import gzip
import json
import base64
from typing import Any, Dict

import redis

class CompressedPublisher:
    def __init__(self, redis_client: redis.Redis, compression_threshold: int = 1024):
        self.redis_client = redis_client
        self.compression_threshold = compression_threshold
    
    def publish_compressed(self, channel: str, data: Any) -> int:
        """Publish a message, compressing it when it exceeds the threshold."""
        # Serialize the data
        if isinstance(data, (dict, list)):
            message = json.dumps(data, ensure_ascii=False)
        else:
            message = str(data)
        
        # Decide whether compression is needed based on the UTF-8 size
        raw = message.encode('utf-8')
        if len(raw) > self.compression_threshold:
            # Compress, then base64-encode so the payload fits in a JSON string
            compressed = gzip.compress(raw)
            encoded = base64.b64encode(compressed).decode('ascii')
            
            # Wrap the payload in an envelope that flags it as compressed
            final_message = json.dumps({
                'compressed': True,
                'data': encoded,
                'original_size': len(raw),
                'compressed_size': len(compressed)
            })
        else:
            # Send small messages uncompressed
            final_message = json.dumps({
                'compressed': False,
                'data': message
            })
        
        return self.redis_client.publish(channel, final_message)
    
    @staticmethod
    def decompress_message(raw_message: str) -> str:
        """Unwrap an envelope produced by publish_compressed."""
        try:
            msg_data = json.loads(raw_message)
            
            if msg_data.get('compressed', False):
                # Decode and decompress the payload
                compressed_data = base64.b64decode(msg_data['data'])
                decompressed = gzip.decompress(compressed_data)
                return decompressed.decode('utf-8')
            else:
                # Payload was sent uncompressed
                return msg_data['data']
        except Exception:
            # If the message cannot be parsed, return it unchanged
            return raw_message
```
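
The envelope format can be checked without a Redis server. The sketch below mirrors the logic of `publish_compressed` and `decompress_message` in two standalone functions, `pack` and `unpack` (hypothetical names introduced only for this illustration), so the round trip and the size trade-off can be tested in isolation.

```python
import base64
import gzip
import json


def pack(message: str, threshold: int = 1024) -> str:
    """Wrap message in the envelope, compressing it when it exceeds threshold bytes."""
    raw = message.encode("utf-8")
    if len(raw) > threshold:
        compressed = gzip.compress(raw)
        return json.dumps({
            "compressed": True,
            "data": base64.b64encode(compressed).decode("ascii"),
            "original_size": len(raw),
            "compressed_size": len(compressed),
        })
    return json.dumps({"compressed": False, "data": message})


def unpack(envelope: str) -> str:
    """Return the original message from an envelope produced by pack."""
    msg = json.loads(envelope)
    if msg.get("compressed"):
        return gzip.decompress(base64.b64decode(msg["data"])).decode("utf-8")
    return msg["data"]
```

Base64 inflates the compressed bytes by roughly one third and the JSON wrapper adds a fixed overhead, so compression only pays off when gzip shrinks the payload by more than that. This is why a threshold is useful: short messages are sent as they are.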

### 8.4.4 Connection Retry

```python
import time
import logging
from typing import Callable, Optional
from functools import wraps

import redis


def with_retry(func: Callable) -> Callable:
    """Retry a ResilientSubscriber method on connection errors, with exponential backoff.

    Defined at module level so that it can be applied as @with_retry inside the
    class body; the retry settings are read from the instance at call time.
    """
    @wraps(func)
    def wrapper(self, *args, **kwargs):
        delay = self.retry_delay
        
        for attempt in range(self.max_retries + 1):
            try:
                return func(self, *args, **kwargs)
            except (redis.ConnectionError, redis.TimeoutError) as e:
                if attempt >= self.max_retries:
                    self.logger.error(f"All {self.max_retries + 1} attempts failed")
                    raise
                
                self.logger.warning(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay}s...")
                time.sleep(delay)
                delay *= self.backoff_factor
                
                # Re-establish the connection before the next attempt
                self._reconnect()
            except Exception as e:
                self.logger.error(f"Non-recoverable error: {e}")
                raise
    
    return wrapper


class ResilientSubscriber:
    def __init__(self, redis_client: redis.Redis, max_retries: int = 5, 
                 retry_delay: float = 1.0, backoff_factor: float = 2.0):
        self.redis_client = redis_client
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.backoff_factor = backoff_factor
        self.pubsub = None
        self.subscriptions = {}
        self.pattern_subscriptions = {}
        self.running = False
        self.logger = logging.getLogger(__name__)
    
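    def _reconnect(self):
        """Open a new Pub/Sub connection and restore the subscriptions."""
        try:
            if self.pubsub:
                self.pubsub.close()
        except Exception:
            # The old connection is already broken; ignore errors while closing it
            pass
        
        self.pubsub = self.redis_client.pubsub()
        
        # Resubscribe by name only, so messages are still returned by
        # get_message() and dispatched in _process_message()
        if self.subscriptions:
            self.pubsub.subscribe(*self.subscriptions)
        
        if self.pattern_subscriptions:
            self.pubsub.psubscribe(*self.pattern_subscriptions)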
    
    @with_retry
    def subscribe(self, channel: str, handler: Optional[Callable] = None):
        """Subscribe to a channel (with retry)."""
        if not self.pubsub:
            self.pubsub = self.redis_client.pubsub()
        
        self.subscriptions[channel] = handler
        self.pubsub.subscribe(channel)
    
    @with_retry
    def psubscribe(self, pattern: str, handler: Optional[Callable] = None):
        """Subscribe to a pattern (with retry)."""
        if not self.pubsub:
            self.pubsub = self.redis_client.pubsub()
        
        self.pattern_subscriptions[pattern] = handler
        self.pubsub.psubscribe(pattern)
    
    @with_retry
    def get_message(self, timeout: float = 1.0):
        """Fetch the next message (with retry)."""
        if not self.pubsub:
            raise redis.ConnectionError("Not connected")
        
        return self.pubsub.get_message(timeout=timeout)
    
    def start_resilient_listening(self):
        """Listen for messages, surviving connection failures."""
        self.running = True
        
        while self.running:
            try:
                message = self.get_message()
                if message:
                    self._process_message(message)
            except (redis.ConnectionError, redis.TimeoutError) as e:
                self.logger.warning(f"Connection lost during listening: {e}")
                time.sleep(self.retry_delay)
            except Exception as e:
                self.logger.error(f"Unexpected error during listening: {e}")
                time.sleep(1)
    
    def _process_message(self, message):
        """Dispatch a message to the handler registered for its channel or pattern."""
        def to_str(value):
            # Names are bytes unless the client uses decode_responses=True
            return value.decode('utf-8') if isinstance(value, bytes) else value
        
        msg_type = message.get('type')
        
        if msg_type == 'message':
            channel = to_str(message['channel'])
            handler = self.subscriptions.get(channel)
            if handler:
                try:
                    handler(message)
                except Exception as e:
                    self.logger.error(f"Handler error for channel {channel}: {e}")
        
        elif msg_type == 'pmessage':
            pattern = to_str(message['pattern'])
            handler = self.pattern_subscriptions.get(pattern)
            if handler:
                try:
                    handler(message)
                except Exception as e:
                    self.logger.error(f"Handler error for pattern {pattern}: {e}")
    
    def stop_listening(self):
        """Stop the listening loop."""
        self.running = False
    
    def close(self):
        """Stop listening and close the connection."""
        self.stop_listening()
        if self.pubsub:
            self.pubsub.close()
```
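
The retry decorator waits `retry_delay`, then multiplies the wait by `backoff_factor` after every failed attempt. The sketch below reproduces that schedule as a generator so it can be inspected on its own. The `max_delay` cap and the `jitter` option are additions not present in `ResilientSubscriber`; they are shown as assumptions about what a deployment with many subscribers might want, to avoid unbounded waits and synchronized reconnects.

```python
import random
from typing import Iterator, Optional


def backoff_delays(retry_delay: float = 1.0, backoff_factor: float = 2.0,
                   max_retries: int = 5, max_delay: Optional[float] = None,
                   jitter: float = 0.0,
                   rng: Optional[random.Random] = None) -> Iterator[float]:
    """Yield the wait time before each retry.

    max_delay and jitter are illustrative extensions, not part of the class above.
    """
    rng = rng or random.Random()
    delay = retry_delay
    for _ in range(max_retries):
        capped = delay if max_delay is None else min(delay, max_delay)
        if jitter:
            # Scale by a random factor in [1 - jitter, 1 + jitter]
            capped *= 1 + rng.uniform(-jitter, jitter)
        yield capped
        delay *= backoff_factor
```

With the defaults used by `ResilientSubscriber` (1.0 s initial delay, factor 2.0, 5 retries), `list(backoff_delays())` gives `[1.0, 2.0, 4.0, 8.0, 16.0]`, so a call that keeps failing gives up after about 31 seconds of waiting.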

## 8.5 Monitoring and Debugging

### 8.5.1 Pub/Sub Monitoring

```python
import redis
import time
import json
from typing import Dict, List, Any
from datetime import datetime, timedelta
from dataclasses import dataclass

@dataclass
class PubSubMetrics:
    timestamp: datetime
    active_channels: int
    pattern_subscriptions: int
    total_subscribers: int
    messages_per_second: float
    memory_usage: int
    connection_count: int

class PubSubMonitor:
    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
        self.metrics_history: List[PubSubMetrics] = []
        self.message_counts = {}
        self.last_check_time = time.time()
    
    def collect_metrics(self) -> PubSubMetrics:
        """Collect a snapshot of Pub/Sub metrics."""
        current_time = time.time()
        
        # Active channels (those with at least one subscriber)
        active_channels = self.redis_client.pubsub_channels()
        
        # Number of pattern subscriptions
        pattern_count = self.redis_client.pubsub_numpat()
        
        # Total subscribers across active channels; redis-py returns
        # PUBSUB NUMSUB as a list of (channel, count) pairs
        total_subscribers = 0
        if active_channels:
            numsub_results = self.redis_client.pubsub_numsub(*active_channels)
            total_subscribers = sum(count for _, count in numsub_results)
        
        # Message rate
        time_diff = current_time - self.last_check_time
        messages_per_second = 0.0
        if time_diff > 0:
            # Needs real message counting; see _calculate_message_rate
            messages_per_second = self._calculate_message_rate(time_diff)
        
        # Memory usage
        info = self.redis_client.info('memory')
        memory_usage = info.get('used_memory', 0)
        
        # Number of client connections
        clients_info = self.redis_client.info('clients')
        connection_count = clients_info.get('connected_clients', 0)
        
        metrics = PubSubMetrics(
            timestamp=datetime.now(),
            active_channels=len(active_channels),
            pattern_subscriptions=pattern_count,
            total_subscribers=total_subscribers,
            messages_per_second=messages_per_second,
            memory_usage=memory_usage,
            connection_count=connection_count
        )
        
        self.metrics_history.append(metrics)
        self.last_check_time = current_time
        
        # Keep only the 100 most recent snapshots
        if len(self.metrics_history) > 100:
            self.metrics_history.pop(0)
        
        return metrics
    
    def _calculate_message_rate(self, time_diff: float) -> float:
        """Compute the message rate (placeholder)."""
        # A real implementation has to track message counts itself.
        # This stub always returns 0.0.
        return 0.0
    
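    def get_channel_details(self, channel: str) -> Dict[str, Any]:
        """Return details for a single channel."""
        # Subscriber count; redis-py returns a list of (channel, count) pairs
        numsub_result = self.redis_client.pubsub_numsub(channel)
        subscriber_count = numsub_result[0][1] if numsub_result else 0
        
        # Check whether the channel is active; names are bytes unless the
        # client uses decode_responses=True
        active_channels = {
            name.decode('utf-8') if isinstance(name, bytes) else name
            for name in self.redis_client.pubsub_channels()
        }
        is_active = channel in active_channels
        
        return {
            'channel': channel,
            'subscriber_count': subscriber_count,
            'is_active': is_active,
            'last_checked': datetime.now().isoformat()
        }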
    
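    def get_system_overview(self) -> Dict[str, Any]:
        """Return current metrics and the change since the previous snapshot."""
        latest_metrics = self.collect_metrics()
        
        # Compute trends against the previous snapshot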
        if len(self.metrics_history) >= 2:
            prev_metrics = self.metrics_history[-2]
            channel_trend = latest_metrics.active_channels - prev_metrics.active_channels
            subscriber_trend = latest_metrics.total_subscribers - prev_metrics.total_subscribers
        else:
            channel_trend = 0
            subscriber_trend = 0
        
        return {
            'current_metrics': {
                'active_channels': latest_metrics.active_channels,
                'pattern_subscriptions': latest_metrics.pattern_subscriptions,
                'total_subscribers': latest_metrics.total_subscribers,
                'messages_per_second': latest_metrics.messages_per_second,
                'memory_usage_mb': latest_metrics.memory_usage / (1024 * 1024),
                'connection_count': latest_metrics.connection_count
            },
            'trends': {
                'channel_change': channel_trend,
                'subscriber_change': subscriber_trend
            },
            'timestamp': latest_metrics.timestamp.isoformat()
        }
    
    def generate_report(self, hours: int = 24) -> Dict[str, Any]:
        """Generate a monitoring report for the last `hours` hours."""
        cutoff_time = datetime.now() - timedelta(hours=hours)
        recent_metrics = [m for m in self.metrics_history if m.timestamp >= cutoff_time]
        
        if not recent_metrics:
            return {'error': 'No metrics available for the specified time period'}
        
        # Compute summary statistics
        avg_channels = sum(m.active_channels for m in recent_metrics) / len(recent_metrics)
        max_channels = max(m.active_channels for m in recent_metrics)
        min_channels = min(m.active_channels for m in recent_metrics)
        
        avg_subscribers = sum(m.total_subscribers for m in recent_metrics) / len(recent_metrics)
        max_subscribers = max(m.total_subscribers for m in recent_metrics)
        
        avg_memory = sum(m.memory_usage for m in recent_metrics) / len(recent_metrics)
        max_memory = max(m.memory_usage for m in recent_metrics)
        
        return {
            'period': f'{hours} hours',
            'data_points': len(recent_metrics),
            'channels': {
                'average': round(avg_channels, 2),
                'maximum': max_channels,
                'minimum': min_channels
            },
            'subscribers': {
                'average': round(avg_subscribers, 2),
                'maximum': max_subscribers
            },
            'memory': {
                'average_mb': round(avg_memory / (1024 * 1024), 2),
                'maximum_mb': round(max_memory / (1024 * 1024), 2)
            },
            'generated_at': datetime.now().isoformat()
        }
```
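
`_calculate_message_rate` is a stub because the `PUBSUB` introspection commands used above report subscriptions, not message counts, so the application has to count messages itself. The sketch below shows one way this could be done; `MessageRateCounter` is a hypothetical helper, and the idea is that a subscriber callback calls `record()` for each message while the monitor calls `rate_and_reset()` on each collection.

```python
import threading
import time
from typing import Callable


class MessageRateCounter:
    """Counts messages and reports the average rate since the last reset."""

    def __init__(self, clock: Callable[[], float] = time.monotonic):
        self._clock = clock
        self._lock = threading.Lock()  # record() may run on a listener thread
        self._count = 0
        self._last_reset = clock()

    def record(self, n: int = 1) -> None:
        """Register n received (or published) messages."""
        with self._lock:
            self._count += n

    def rate_and_reset(self) -> float:
        """Return messages per second since the last call, then start a new window."""
        with self._lock:
            now = self._clock()
            elapsed = now - self._last_reset
            rate = self._count / elapsed if elapsed > 0 else 0.0
            self._count = 0
            self._last_reset = now
            return rate
```

The injectable `clock` is only there to make the counter testable without sleeping; in normal use the default monotonic clock is sufficient.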

### 8.5.2 Debugging Tools

```python
import redis
import json
import threading
import time
from typing import Dict, List, Callable, Any
from datetime import datetime

class PubSubDebugger:
    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
        self.message_log: List[Dict[str, Any]] = []
        self.interceptors: List[Callable] = []
        self.debug_subscriber = None
        self.monitoring = False
    
    def add_interceptor(self, interceptor: Callable[[Dict[str, Any]], None]):
        """Register a message interceptor."""
        self.interceptors.append(interceptor)
    
    def start_monitoring(self, patterns: List[str] = None):
        """Start monitoring messages."""
        if self.monitoring:
            return
        
        self.monitoring = True
        self.debug_subscriber = self.redis_client.pubsub()
        
        # If no patterns are given, monitor every channel
        if not patterns:
            patterns = ['*']
        
        for pattern in patterns:
            self.debug_subscriber.psubscribe(pattern)
        
        # Start the monitoring thread
        monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
        monitor_thread.start()
    
    def _monitor_loop(self):
        """Poll for messages until monitoring is stopped."""
        while self.monitoring:
            try:
                message = self.debug_subscriber.get_message(timeout=1.0)
                if message and message['type'] == 'pmessage':
                    self._process_debug_message(message)
            except Exception as e:
                print(f"Debug monitor error: {e}")
                time.sleep(1)
    
    def _process_debug_message(self, message: Dict[str, Any]):
        """Record a monitored message and pass it to the interceptors."""
        def to_str(value):
            # Fields are bytes unless the client uses decode_responses=True
            return value.decode('utf-8') if isinstance(value, bytes) else value
        
        data = to_str(message['data'])
        debug_info = {
            'timestamp': datetime.now().isoformat(),
            'pattern': to_str(message['pattern']),
            'channel': to_str(message['channel']),
            'data': data,
            'data_size': len(data.encode('utf-8'))  # payload size in bytes
        }
        
        # Try to parse the payload as JSON
        try:
            parsed_data = json.loads(debug_info['data'])
            debug_info['parsed_data'] = parsed_data
            debug_info['data_type'] = 'json'
        except json.JSONDecodeError:
            debug_info['data_type'] = 'text'
        
        # Append to the log
        self.message_log.append(debug_info)
        
        # Keep only the 1000 most recent messages
        if len(self.message_log) > 1000:
            self.message_log.pop(0)
        
        # Call the interceptors
        for interceptor in self.interceptors:
            try:
                interceptor(debug_info)
            except Exception as e:
                print(f"Interceptor error: {e}")
    
    def stop_monitoring(self):
        """Stop monitoring."""
        self.monitoring = False
        if self.debug_subscriber:
            self.debug_subscriber.close()
    
    def get_message_log(self, limit: int = 100) -> List[Dict[str, Any]]:
        """Return the most recent log entries."""
        return self.message_log[-limit:]
    
    def search_messages(self, keyword: str = None, channel: str = None, 
                      since: datetime = None) -> List[Dict[str, Any]]:
        """Search the message log."""
        results = []
        
        for msg in self.message_log:
            # Filter by time
            if since:
                msg_time = datetime.fromisoformat(msg['timestamp'])
                if msg_time < since:
                    continue
            
            # Filter by channel
            if channel and msg['channel'] != channel:
                continue
            
            # Filter by keyword (case-insensitive, in payload or channel name)
            if keyword:
                if (keyword.lower() not in msg['data'].lower() and 
                    keyword.lower() not in msg['channel'].lower()):
                    continue
            
            results.append(msg)
        
        return results
    
    def analyze_traffic(self) -> Dict[str, Any]:
        """Analyze the logged message traffic."""
        if not self.message_log:
            return {'error': 'No messages to analyze'}
        
        # Aggregate per channel
        channel_stats = {}
        total_size = 0
        
        for msg in self.message_log:
            channel = msg['channel']
            size = msg['data_size']
            
            if channel not in channel_stats:
                channel_stats[channel] = {'count': 0, 'total_size': 0}
            
            channel_stats[channel]['count'] += 1
            channel_stats[channel]['total_size'] += size
            total_size += size
        
        # Compute the average message size per channel
        for stats in channel_stats.values():
            stats['avg_size'] = stats['total_size'] / stats['count']
        
        # Sort channels by message count, busiest first
        sorted_channels = sorted(
            channel_stats.items(), 
            key=lambda x: x[1]['count'], 
            reverse=True
        )
        
        return {
            'total_messages': len(self.message_log),
            'total_size_bytes': total_size,
            'avg_message_size': total_size / len(self.message_log),
            'unique_channels': len(channel_stats),
            'top_channels': sorted_channels[:10],
            'analysis_time': datetime.now().isoformat()
        }

# Usage example
if __name__ == "__main__":
    redis_client = redis.Redis(decode_responses=True)
    
    # Create the monitor and the debugger
    monitor = PubSubMonitor(redis_client)
    debugger = PubSubDebugger(redis_client)
    
    # Add a debugging interceptor
    def debug_interceptor(message_info):
        if message_info['data_size'] > 1024:  # Warn about large messages
            print(f"[WARNING] Large message on {message_info['channel']}: {message_info['data_size']} bytes")
    
    debugger.add_interceptor(debug_interceptor)
    
    # Start monitoring
    debugger.start_monitoring(['chat:*', 'notifications:*'])
    
    # Collect metrics
    metrics = monitor.collect_metrics()
    print("Current metrics:")
    print(f"  Active channels: {metrics.active_channels}")
    print(f"  Total subscribers: {metrics.total_subscribers}")
    print(f"  Memory usage: {metrics.memory_usage / (1024*1024):.2f} MB")
    
    # Publish a few sample messages
    publisher = redis_client
    publisher.publish("chat:room1", "Hello, world!")
    publisher.publish("notifications:user123", json.dumps({
        "type": "info",
        "message": "Welcome!"
    }))
    
    time.sleep(2)
    
    # Analyze the traffic (returns {'error': ...} if nothing was captured)
    traffic_analysis = debugger.analyze_traffic()
    if 'error' in traffic_analysis:
        print(f"\nTraffic analysis: {traffic_analysis['error']}")
    else:
        print("\nTraffic analysis:")
        print(f"  Total messages: {traffic_analysis['total_messages']}")
        print(f"  Average message size: {traffic_analysis['avg_message_size']:.2f} bytes")
    
    # Generate a monitoring report
    report = monitor.generate_report(hours=1)
    print("\nMonitoring report:")
    print(json.dumps(report, indent=2, ensure_ascii=False))
    
    # Clean up
    debugger.stop_monitoring()
```
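
`PubSubDebugger` trims its log with `list.pop(0)`, which shifts every remaining element each time the cap is reached. A `collections.deque` with `maxlen` drops the oldest entry automatically. The class below is a small sketch of that alternative; `BoundedMessageLog` is a hypothetical name and is not used by the debugger above.

```python
from collections import deque
from typing import Any, Deque, Dict, List


class BoundedMessageLog:
    """Keeps only the most recent max_entries log entries."""

    def __init__(self, max_entries: int = 1000):
        # A deque with maxlen discards the oldest entry when a new one arrives
        self._entries: Deque[Dict[str, Any]] = deque(maxlen=max_entries)

    def append(self, entry: Dict[str, Any]) -> None:
        self._entries.append(entry)

    def latest(self, limit: int = 100) -> List[Dict[str, Any]]:
        """Return up to limit entries, oldest first."""
        if limit <= 0:
            return []
        return list(self._entries)[-limit:]

    def __len__(self) -> int:
        return len(self._entries)
```

For a 1000-entry log the difference is small, but the deque also removes the explicit length check from the hot path.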

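8.6 Best Practices

8.6.1 Design Principles

  1. Channel naming conventions

    • Use hierarchical names: service:module:action
    • Avoid special characters and spaces
    • Keep naming consistent
  2. Standardized message format

    • Use JSON to carry structured data
    • Include a timestamp and a message type
    • Add a version field to support backward compatibility
  3. Error handling

    • Implement connection retry
    • Handle message parsing errors
    • Log errors in detail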
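
The naming and message-format guidelines above can be made concrete with two small helpers. This is a minimal sketch: `build_channel` and `make_envelope` are hypothetical names, the allowed character set for a segment (lowercase letters, digits, underscore, hyphen) is an assumption, and the envelope field names are one possible choice rather than a standard.

```python
import json
import re
from datetime import datetime, timezone
from typing import Any, Dict

# Assumed rule for a channel segment: lowercase letters, digits, '_' and '-'
_SEGMENT = re.compile(r"[a-z0-9_-]+")


def build_channel(service: str, module: str, action: str) -> str:
    """Build a hierarchical channel name of the form service:module:action."""
    parts = (service, module, action)
    for part in parts:
        if not _SEGMENT.fullmatch(part):
            raise ValueError(f"invalid channel segment: {part!r}")
    return ":".join(parts)


def make_envelope(msg_type: str, payload: Dict[str, Any], version: int = 1) -> str:
    """Serialize a payload with version, type and UTC timestamp metadata."""
    return json.dumps({
        "version": version,
        "type": msg_type,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "payload": payload,
    })
```

For example, `build_channel("order", "payment", "completed")` returns `"order:payment:completed"`. A consumer can inspect `version` before reading `payload`, which lets producers change the payload shape without breaking older subscribers.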

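8.6.2 Performance Considerations

  1. Connection management

    • Use connection pools to reduce connection overhead
    • Use separate connections for publishers and subscribers
    • Set connection timeouts appropriately
  2. Message optimization

    • Compress large messages
    • Avoid sending small messages too frequently
    • Use batch operations to improve throughput
  3. Memory management

    • Remove stale subscriptions promptly
    • Cap the size of any message history
    • Monitor memory usage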

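8.6.3 Security Considerations

  1. Access control

    • Use Redis ACLs to restrict channel access
    • Authenticate publishers
    • Filter out sensitive information
  2. Message validation

    • Validate message format and content
    • Guard against malicious message injection
    • Limit message size and rate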
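
The message-validation points can be illustrated with a small check that a subscriber runs before acting on a message. This is a sketch under assumptions: the 64 KiB limit, the required field names and the `InvalidMessage` exception are illustrative choices, not values prescribed by Redis. Channel-level access control is configured on the server through ACL rules and is not shown here.

```python
import json
from typing import Any, Dict, Tuple

MAX_MESSAGE_BYTES = 64 * 1024  # assumed limit; tune to the application
REQUIRED_FIELDS: Tuple[str, ...] = ("type", "timestamp", "payload")  # assumed schema


class InvalidMessage(ValueError):
    """Raised when an incoming message fails validation."""


def validate_message(raw: str, max_bytes: int = MAX_MESSAGE_BYTES) -> Dict[str, Any]:
    """Check size, JSON syntax and required fields; return the parsed message."""
    if len(raw.encode("utf-8")) > max_bytes:
        raise InvalidMessage("message exceeds size limit")
    try:
        data = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise InvalidMessage(f"not valid JSON: {exc}") from exc
    if not isinstance(data, dict):
        raise InvalidMessage("message must be a JSON object")
    missing = [field for field in REQUIRED_FIELDS if field not in data]
    if missing:
        raise InvalidMessage(f"missing fields: {', '.join(missing)}")
    return data
```

Checking the size before parsing keeps an oversized payload from being handed to the JSON parser at all. Rate limiting is a separate concern and would need per-publisher state that this sketch does not cover.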

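8.7 Summary

Redis Pub/Sub is a powerful mechanism for real-time message delivery. Its main characteristics are:

Strengths:

  • Real-time delivery with very low latency
  • Many-to-many communication
  • Simple to implement and use
  • Pattern-matching subscriptions

Typical use cases:

  • Real-time notification systems
  • Chat and instant messaging
  • Event-driven architectures
  • Cache invalidation notices
  • System monitoring and alerting

Caveats:

  • Messages are not persisted and can be lost
  • Subscribers that are offline do not receive messages published in the meantime
  • There is no message acknowledgement mechanism
  • Performance depends on the network and on how fast clients process messages

Best practices:

  • Design a consistent channel naming convention
  • Implement robust error handling
  • Optimize connections and message transfer
  • Prepare monitoring and debugging tools in advance
  • Plan for security and access control

Used with these constraints in mind, Redis Pub/Sub can support efficient real-time messaging across a wide range of communication needs.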

References