8.1 发布订阅概述
8.1.1 什么是发布订阅
Redis发布订阅(Pub/Sub)是一种消息通信模式,发送者(发布者)发送消息,接收者(订阅者)接收消息。发布者和订阅者之间通过频道(Channel)进行解耦。
8.1.2 核心概念
- 发布者(Publisher): 发送消息的客户端
- 订阅者(Subscriber): 接收消息的客户端
- 频道(Channel): 消息传递的通道
- 模式订阅(Pattern Subscribe): 支持通配符的频道订阅
8.1.3 特点
- 实时性: 消息实时推送,延迟极低
- 解耦性: 发布者和订阅者互不感知
- 多对多: 支持多个发布者和订阅者
- 无持久化: 消息不会被持久化存储
- 即发即失: 消息只会投递给发布时刻在线的订阅者;如果当时没有订阅者(或订阅者已断开连接),消息会直接丢失,之后无法补收
8.1.4 应用场景
- 实时通知: 系统状态变更通知
- 聊天系统: 实时消息推送
- 事件驱动: 微服务间事件通信
- 缓存失效: 缓存更新通知
- 监控告警: 系统监控和告警
- 直播弹幕: 实时弹幕推送
8.2 基本命令
8.2.1 发布消息
# 发布消息到频道
PUBLISH channel message
PUBLISH news "Breaking: Redis 7.0 Released!"
PUBLISH chat:room1 "Hello, everyone!"
PUBLISH notifications '{"type":"alert","message":"System maintenance"}'
# 返回值是接收到消息的订阅者数量
PUBLISH empty_channel "test" # 返回0,没有订阅者
8.2.2 订阅频道
# 订阅一个或多个频道
SUBSCRIBE channel [channel ...]
SUBSCRIBE news
SUBSCRIBE chat:room1 chat:room2 notifications
# 订阅后会收到确认消息
# 1) "subscribe"
# 2) "news"
# 3) (integer) 1 # 当前订阅的频道数
# 接收消息格式
# 1) "message"
# 2) "news" # 频道名
# 3) "Breaking: Redis 7.0 Released!" # 消息内容
8.2.3 模式订阅
# 使用通配符订阅多个频道
PSUBSCRIBE pattern [pattern ...]
PSUBSCRIBE chat:* # 订阅所有以chat:开头的频道
PSUBSCRIBE news.* # 订阅所有以news.开头的频道
PSUBSCRIBE user:*:login # 订阅用户登录事件
# 模式订阅的消息格式
# 1) "pmessage"
# 2) "chat:*" # 匹配的模式
# 3) "chat:room1" # 实际的频道名
# 4) "Hello!" # 消息内容
8.2.4 取消订阅
# 取消订阅指定频道
UNSUBSCRIBE [channel [channel ...]]
UNSUBSCRIBE news
UNSUBSCRIBE # 取消所有频道订阅
# 取消模式订阅
PUNSUBSCRIBE [pattern [pattern ...]]
PUNSUBSCRIBE chat:*
PUNSUBSCRIBE # 取消所有模式订阅
8.2.5 查看订阅信息
# 查看活跃的频道(至少有一个订阅者)
PUBSUB CHANNELS [pattern]
PUBSUB CHANNELS # 查看所有活跃频道
PUBSUB CHANNELS chat:* # 查看匹配模式的活跃频道
# 查看频道的订阅者数量
PUBSUB NUMSUB [channel [channel ...]]
PUBSUB NUMSUB news chat:room1
# 查看模式订阅的数量
PUBSUB NUMPAT
8.3 Python实现
8.3.1 基础发布订阅类
import redis
import json
import threading
import time
from typing import Dict, List, Callable, Any, Optional
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
class MessageType(Enum):
    """Frame kinds found in the ``type`` field of a raw pub/sub message.

    The values are the literal type strings, so ``MessageType(raw['type'])``
    maps a raw frame onto a member.
    """

    SUBSCRIBE = "subscribe"        # confirmation of SUBSCRIBE
    UNSUBSCRIBE = "unsubscribe"    # confirmation of UNSUBSCRIBE
    MESSAGE = "message"            # payload delivered via a channel subscription
    PMESSAGE = "pmessage"          # payload delivered via a pattern subscription
    PSUBSCRIBE = "psubscribe"      # confirmation of PSUBSCRIBE
    PUNSUBSCRIBE = "punsubscribe"  # confirmation of PUNSUBSCRIBE
@dataclass
class PubSubMessage:
    """A pub/sub payload in the form handed to subscriber callbacks."""

    type: MessageType
    channel: str
    data: Any
    # Only set for messages that arrived through a pattern subscription.
    pattern: Optional[str] = None
    # Filled in by __post_init__ when the caller does not provide one.
    timestamp: datetime = None

    def __post_init__(self):
        # Keep a caller-supplied timestamp untouched.
        if self.timestamp is not None:
            return
        self.timestamp = datetime.now()
class RedisPublisher:
    """Convenience wrapper around ``redis_client.publish``.

    Every method returns whatever the client's ``publish`` returns, i.e. the
    number of subscribers that received the message.
    """

    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client

    def publish(self, channel: str, message: Any) -> int:
        """Publish ``message`` to ``channel``.

        ``dict``/``list`` payloads are serialised to JSON (non-ASCII kept
        as-is); ``str`` and ``bytes`` are sent unchanged; anything else is
        converted with ``str()``.
        """
        if isinstance(message, (dict, list)):
            message = json.dumps(message, ensure_ascii=False)
        elif not isinstance(message, (str, bytes)):
            # bytes must be passed through untouched: str(b"x") would publish
            # the literal text "b'x'" instead of the payload.
            message = str(message)
        return self.redis_client.publish(channel, message)

    def publish_json(self, channel: str, data: Dict[str, Any]) -> int:
        """Serialise ``data`` to JSON and publish it to ``channel``."""
        # Serialise here (not in publish) so non-container values are still
        # JSON-encoded; publish() then forwards the resulting str unchanged.
        return self.publish(channel, json.dumps(data, ensure_ascii=False))

    def publish_event(self, event_type: str, event_data: Dict[str, Any]) -> int:
        """Publish an event envelope on the ``events:<event_type>`` channel.

        The envelope carries ``type``, ``data`` and an ISO-format local
        ``timestamp``.
        """
        event = {
            'type': event_type,
            'data': event_data,
            'timestamp': datetime.now().isoformat(),
        }
        return self.publish_json(f"events:{event_type}", event)

    def broadcast(self, channels: List[str], message: Any) -> Dict[str, int]:
        """Publish the same message to each channel.

        Returns a mapping of channel name to that channel's receiver count.
        """
        return {channel: self.publish(channel, message) for channel in channels}
class RedisSubscriber:
    """Receives Redis pub/sub messages and dispatches them to callbacks.

    Handlers are kept per channel (``message_handlers``) and per pattern
    (``pattern_handlers``). ``start_listening`` polls the pub/sub connection
    on a daemon thread and calls the matching handlers with a
    ``PubSubMessage``.

    Works with clients created with either ``decode_responses=False``
    (fields arrive as ``bytes``) or ``decode_responses=True`` (fields arrive
    as ``str``).
    """

    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
        self.pubsub = self.redis_client.pubsub()
        self.message_handlers: Dict[str, List[Callable]] = {}
        self.pattern_handlers: Dict[str, List[Callable]] = {}
        self.running = False
        self.thread = None

    def subscribe(self, channel: str, handler: Callable[[PubSubMessage], None] = None):
        """Subscribe to ``channel``; optionally register ``handler`` for it."""
        self.pubsub.subscribe(channel)
        if handler:
            self.add_handler(channel, handler)

    def psubscribe(self, pattern: str, handler: Callable[[PubSubMessage], None] = None):
        """Subscribe to ``pattern``; optionally register ``handler`` for it."""
        self.pubsub.psubscribe(pattern)
        if handler:
            self.add_pattern_handler(pattern, handler)

    def unsubscribe(self, channel: str = None):
        """Unsubscribe from ``channel`` (or all channels) and drop its handlers."""
        if channel:
            self.pubsub.unsubscribe(channel)
            self.message_handlers.pop(channel, None)
        else:
            self.pubsub.unsubscribe()
            self.message_handlers.clear()

    def punsubscribe(self, pattern: str = None):
        """Unsubscribe from ``pattern`` (or all patterns) and drop its handlers."""
        if pattern:
            self.pubsub.punsubscribe(pattern)
            self.pattern_handlers.pop(pattern, None)
        else:
            self.pubsub.punsubscribe()
            self.pattern_handlers.clear()

    def add_handler(self, channel: str, handler: Callable[[PubSubMessage], None]):
        """Register an additional handler for ``channel`` without subscribing."""
        self.message_handlers.setdefault(channel, []).append(handler)

    def add_pattern_handler(self, pattern: str, handler: Callable[[PubSubMessage], None]):
        """Register an additional handler for ``pattern`` without subscribing."""
        self.pattern_handlers.setdefault(pattern, []).append(handler)

    @staticmethod
    def _to_text(value: Any) -> Any:
        """Decode ``bytes`` as UTF-8; return any other value unchanged.

        Clients created with ``decode_responses=True`` already deliver ``str``,
        which has no ``.decode()``; calling it unconditionally raised on every
        message and nothing was ever dispatched.
        """
        if isinstance(value, (bytes, bytearray)):
            return value.decode('utf-8')
        return value

    @staticmethod
    def _run_handlers(handlers: List[Callable], message: Any, label: str):
        """Call each handler; one failing handler must not block the others."""
        for handler in handlers:
            try:
                handler(message)
            except Exception as e:
                print(f"Handler error for {label}: {e}")

    def _process_message(self, raw_message: Dict[str, Any]):
        """Turn one raw pub/sub frame into a ``PubSubMessage`` and dispatch it.

        Subscription confirmations and frame types not listed in
        ``MessageType`` are ignored.
        """
        try:
            msg_type = MessageType(self._to_text(raw_message['type']))
        except ValueError:
            # Unknown frame type: nothing to dispatch. Previously this raised
            # into the listen loop, which logged it and stalled for a second.
            return

        if msg_type is MessageType.MESSAGE:
            message = PubSubMessage(
                type=msg_type,
                channel=self._to_text(raw_message['channel']),
                data=self._to_text(raw_message['data']),
            )
            self._run_handlers(
                self.message_handlers.get(message.channel, []),
                message,
                f"channel {message.channel}",
            )
        elif msg_type is MessageType.PMESSAGE:
            pattern = self._to_text(raw_message['pattern'])
            message = PubSubMessage(
                type=msg_type,
                channel=self._to_text(raw_message['channel']),
                data=self._to_text(raw_message['data']),
                pattern=pattern,
            )
            self._run_handlers(
                self.pattern_handlers.get(pattern, []),
                message,
                f"pattern {pattern}",
            )
        # Remaining types are (un)subscribe confirmations: no payload.

    def start_listening(self):
        """Start the background polling thread (no-op if already running)."""
        if self.running:
            return
        self.running = True
        self.thread = threading.Thread(target=self._listen_loop, daemon=True)
        self.thread.start()

    def _listen_loop(self):
        """Poll for messages until ``running`` is cleared."""
        while self.running:
            try:
                # The 1s timeout bounds how long stop_listening has to wait.
                message = self.pubsub.get_message(timeout=1.0)
                if message:
                    self._process_message(message)
            except Exception as e:
                print(f"Listen loop error: {e}")
                # Back off so a persistent failure does not spin the CPU.
                time.sleep(1)

    def stop_listening(self):
        """Ask the polling thread to stop and wait up to 5 seconds for it."""
        self.running = False
        # A handler may call this from the polling thread itself; joining the
        # current thread would raise RuntimeError.
        if self.thread and self.thread is not threading.current_thread():
            self.thread.join(timeout=5)

    def close(self):
        """Stop listening and close the pub/sub connection."""
        self.stop_listening()
        self.pubsub.close()
# Usage example
if __name__ == "__main__":
    # Message handlers
    def news_handler(message: PubSubMessage):
        print(f"[NEWS] {message.timestamp}: {message.data}")

    def chat_handler(message: PubSubMessage):
        print(f"[CHAT] {message.channel}: {message.data}")

    def event_handler(message: PubSubMessage):
        try:
            event_data = json.loads(message.data)
        except json.JSONDecodeError:
            print(f"[EVENT] Invalid JSON: {message.data}")
        else:
            print(f"[EVENT] {event_data['type']}: {event_data['data']}")

    # Note: do not enable decode_responses here
    redis_client = redis.Redis(decode_responses=False)

    # One publisher and one subscriber sharing the client
    publisher = RedisPublisher(redis_client)
    subscriber = RedisSubscriber(redis_client)

    # Subscribe to one channel and two patterns
    subscriber.subscribe("news", news_handler)
    subscriber.psubscribe("chat:*", chat_handler)
    subscriber.psubscribe("events:*", event_handler)

    # Start the background listener
    subscriber.start_listening()

    # Give the subscriptions time to take effect before publishing
    time.sleep(1)
    publisher.publish("news", "Redis发布订阅功能介绍")
    publisher.publish("chat:room1", "大家好!")
    publisher.publish("chat:room2", "欢迎来到聊天室")

    # Publish an event
    login_details = {
        "user_id": "user123",
        "ip": "192.168.1.100",
        "timestamp": datetime.now().isoformat()
    }
    publisher.publish_event("user_login", login_details)

    # Wait for the messages to be handled
    time.sleep(2)

    # Clean up
    subscriber.close()
8.3.2 聊天室系统
import redis
import json
import uuid
from typing import Dict, List, Optional, Set
from dataclasses import dataclass, asdict
from datetime import datetime
from enum import Enum
class MessageType(Enum):
    """Content kinds a chat message can carry.

    The value is the string written to the ``message_type`` field when a
    message is serialised.
    """

    TEXT = "text"
    IMAGE = "image"
    FILE = "file"
    SYSTEM = "system"  # used for the join/leave notices authored by "system"
    EMOJI = "emoji"
class UserStatus(Enum):
    """Presence states of a chat user; the value is the stored string form."""

    ONLINE = "online"
    OFFLINE = "offline"
    AWAY = "away"
    BUSY = "busy"
@dataclass
class ChatUser:
    """Profile and presence information for one chat participant."""

    user_id: str
    username: str
    avatar: str = ""
    status: UserStatus = UserStatus.ONLINE
    # Filled in by __post_init__ when the caller does not provide one.
    last_seen: datetime = None

    def __post_init__(self):
        # Keep a caller-supplied value untouched.
        if self.last_seen is not None:
            return
        self.last_seen = datetime.now()
@dataclass
class ChatMessage:
    """One chat message, with conversion to and from a JSON-friendly dict."""

    message_id: str
    room_id: str
    user_id: str
    username: str
    message_type: MessageType
    content: str
    timestamp: datetime
    reply_to: Optional[str] = None
    metadata: Optional[Dict] = None

    def to_dict(self) -> Dict:
        """Return a dict of all fields with JSON-serialisable values.

        ``timestamp`` becomes an ISO-format string and ``message_type`` its
        enum value.
        """
        data = asdict(self)
        data['timestamp'] = self.timestamp.isoformat()
        data['message_type'] = self.message_type.value
        return data

    @classmethod
    def from_dict(cls, data: Dict) -> 'ChatMessage':
        """Build a message from a dict shaped like the output of ``to_dict``.

        Raises ``KeyError`` when ``timestamp`` or ``message_type`` is missing,
        ``ValueError`` when either cannot be parsed, and ``TypeError`` for
        unexpected or missing constructor fields.
        """
        # Work on a shallow copy: converting fields in place would leave the
        # caller's dict holding datetime/enum objects instead of the strings
        # it passed in.
        fields = dict(data)
        fields['timestamp'] = datetime.fromisoformat(fields['timestamp'])
        fields['message_type'] = MessageType(fields['message_type'])
        return cls(**fields)
@dataclass
class ChatRoom:
    """Static description of a chat room."""

    room_id: str
    name: str
    description: str = ""
    created_by: str = ""
    # Filled in by __post_init__ when the caller does not provide one.
    created_at: datetime = None
    max_users: int = 100
    is_private: bool = False

    def __post_init__(self):
        # Keep a caller-supplied value untouched.
        if self.created_at is not None:
            return
        self.created_at = datetime.now()
class ChatSystem:
def __init__(self, redis_client: redis.Redis):
self.redis_client = redis_client
self.publisher = RedisPublisher(redis_client)
self.rooms_key = "chat:rooms"
self.users_key = "chat:users"
self.room_users_prefix = "chat:room_users"
self.user_rooms_prefix = "chat:user_rooms"
self.message_history_prefix = "chat:history"
def create_room(self, room: ChatRoom) -> bool:
"""创建聊天室"""
room_data = asdict(room)
room_data['created_at'] = room.created_at.isoformat()
# 保存房间信息
self.redis_client.hset(self.rooms_key, room.room_id, json.dumps(room_data))
# 发布房间创建事件
self.publisher.publish_event("room_created", {
"room_id": room.room_id,
"name": room.name,
"created_by": room.created_by
})
return True
def get_room(self, room_id: str) -> Optional[ChatRoom]:
"""获取聊天室信息"""
room_data = self.redis_client.hget(self.rooms_key, room_id)
if not room_data:
return None
data = json.loads(room_data)
data['created_at'] = datetime.fromisoformat(data['created_at'])
return ChatRoom(**data)
def list_rooms(self) -> List[ChatRoom]:
"""获取所有聊天室"""
rooms_data = self.redis_client.hgetall(self.rooms_key)
rooms = []
for room_id, room_data in rooms_data.items():
data = json.loads(room_data)
data['created_at'] = datetime.fromisoformat(data['created_at'])
rooms.append(ChatRoom(**data))
return rooms
def join_room(self, room_id: str, user: ChatUser) -> bool:
"""用户加入聊天室"""
room = self.get_room(room_id)
if not room:
return False
room_users_key = f"{self.room_users_prefix}:{room_id}"
user_rooms_key = f"{self.user_rooms_prefix}:{user.user_id}"
# 检查房间人数限制
current_users = self.redis_client.scard(room_users_key)
if current_users >= room.max_users:
return False
# 添加用户到房间
self.redis_client.sadd(room_users_key, user.user_id)
self.redis_client.sadd(user_rooms_key, room_id)
# 保存用户信息
user_data = asdict(user)
user_data['last_seen'] = user.last_seen.isoformat()
user_data['status'] = user.status.value
self.redis_client.hset(self.users_key, user.user_id, json.dumps(user_data))
# 发布用户加入事件
self.publisher.publish(f"chat:room:{room_id}", json.dumps({
"type": "user_joined",
"user_id": user.user_id,
"username": user.username,
"timestamp": datetime.now().isoformat()
}))
# 发送系统消息
system_message = ChatMessage(
message_id=str(uuid.uuid4()),
room_id=room_id,
user_id="system",
username="系统",
message_type=MessageType.SYSTEM,
content=f"{user.username} 加入了聊天室",
timestamp=datetime.now()
)
self._save_message(system_message)
return True
def leave_room(self, room_id: str, user_id: str) -> bool:
"""用户离开聊天室"""
room_users_key = f"{self.room_users_prefix}:{room_id}"
user_rooms_key = f"{self.user_rooms_prefix}:{user_id}"
# 从房间移除用户
removed = self.redis_client.srem(room_users_key, user_id)
self.redis_client.srem(user_rooms_key, room_id)
if removed:
# 获取用户信息
user_data = self.redis_client.hget(self.users_key, user_id)
username = "未知用户"
if user_data:
user_info = json.loads(user_data)
username = user_info.get('username', '未知用户')
# 发布用户离开事件
self.publisher.publish(f"chat:room:{room_id}", json.dumps({
"type": "user_left",
"user_id": user_id,
"username": username,
"timestamp": datetime.now().isoformat()
}))
# 发送系统消息
system_message = ChatMessage(
message_id=str(uuid.uuid4()),
room_id=room_id,
user_id="system",
username="系统",
message_type=MessageType.SYSTEM,
content=f"{username} 离开了聊天室",
timestamp=datetime.now()
)
self._save_message(system_message)
return bool(removed)
def send_message(self, message: ChatMessage) -> bool:
"""发送消息"""
# 验证用户是否在房间中
room_users_key = f"{self.room_users_prefix}:{message.room_id}"
if not self.redis_client.sismember(room_users_key, message.user_id):
return False
# 保存消息
self._save_message(message)
# 发布消息到房间频道
self.publisher.publish(f"chat:room:{message.room_id}", json.dumps(message.to_dict()))
# 更新用户最后活跃时间
self._update_user_activity(message.user_id)
return True
def _save_message(self, message: ChatMessage):
"""保存消息到历史记录"""
history_key = f"{self.message_history_prefix}:{message.room_id}"
# 使用有序集合保存消息,分数为时间戳
score = message.timestamp.timestamp()
self.redis_client.zadd(history_key, {json.dumps(message.to_dict()): score})
# 保留最近1000条消息
self.redis_client.zremrangebyrank(history_key, 0, -1001)
# 设置过期时间(30天)
self.redis_client.expire(history_key, 86400 * 30)
def get_message_history(self, room_id: str, limit: int = 50, before_timestamp: datetime = None) -> List[ChatMessage]:
"""获取消息历史"""
history_key = f"{self.message_history_prefix}:{room_id}"
if before_timestamp:
max_score = before_timestamp.timestamp()
messages_data = self.redis_client.zrevrangebyscore(
history_key, max_score, '-inf', start=0, num=limit
)
else:
messages_data = self.redis_client.zrevrange(history_key, 0, limit-1)
messages = []
for msg_data in messages_data:
try:
msg_dict = json.loads(msg_data)
messages.append(ChatMessage.from_dict(msg_dict))
except (json.JSONDecodeError, TypeError):
continue
return list(reversed(messages)) # 按时间正序返回
def get_room_users(self, room_id: str) -> List[ChatUser]:
"""获取房间用户列表"""
room_users_key = f"{self.room_users_prefix}:{room_id}"
user_ids = self.redis_client.smembers(room_users_key)
users = []
for user_id in user_ids:
user_data = self.redis_client.hget(self.users_key, user_id)
if user_data:
try:
data = json.loads(user_data)
data['last_seen'] = datetime.fromisoformat(data['last_seen'])
data['status'] = UserStatus(data['status'])
users.append(ChatUser(**data))
except (json.JSONDecodeError, ValueError):
continue
return users
def get_user_rooms(self, user_id: str) -> List[ChatRoom]:
"""获取用户加入的房间列表"""
user_rooms_key = f"{self.user_rooms_prefix}:{user_id}"
room_ids = self.redis_client.smembers(user_rooms_key)
rooms = []
for room_id in room_ids:
room = self.get_room(room_id)
if room:
rooms.append(room)
return rooms
def update_user_status(self, user_id: str, status: UserStatus) -> bool:
"""更新用户状态"""
user_data = self.redis_client.hget(self.users_key, user_id)
if not user_data:
return False
data = json.loads(user_data)
data['status'] = status.value
data['last_seen'] = datetime.now().isoformat()
self.redis_client.hset(self.users_key, user_id, json.dumps(data))
# 通知用户状态变更
user_rooms = self.get_user_rooms(user_id)
for room in user_rooms:
self.publisher.publish(f"chat:room:{room.room_id}", json.dumps({
"type": "user_status_changed",
"user_id": user_id,
"status": status.value,
"timestamp": datetime.now().isoformat()
}))
return True
def _update_user_activity(self, user_id: str):
"""更新用户活跃时间"""
user_data = self.redis_client.hget(self.users_key, user_id)
if user_data:
data = json.loads(user_data)
data['last_seen'] = datetime.now().isoformat()
self.redis_client.hset(self.users_key, user_id, json.dumps(data))
def search_messages(self, room_id: str, keyword: str, limit: int = 20) -> List[ChatMessage]:
"""搜索消息"""
history_key = f"{self.message_history_prefix}:{room_id}"
all_messages = self.redis_client.zrevrange(history_key, 0, -1)
matching_messages = []
for msg_data in all_messages:
try:
msg_dict = json.loads(msg_data)
if keyword.lower() in msg_dict.get('content', '').lower():
matching_messages.append(ChatMessage.from_dict(msg_dict))
if len(matching_messages) >= limit:
break
except (json.JSONDecodeError, TypeError):
continue
return matching_messages
def get_room_stats(self, room_id: str) -> Dict[str, any]:
"""获取房间统计信息"""
room = self.get_room(room_id)
if not room:
return {}
room_users_key = f"{self.room_users_prefix}:{room_id}"
history_key = f"{self.message_history_prefix}:{room_id}"
# 当前在线用户数
current_users = self.redis_client.scard(room_users_key)
# 消息总数
total_messages = self.redis_client.zcard(history_key)
# 今日消息数
today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
today_messages = self.redis_client.zcount(
history_key, today_start.timestamp(), '+inf'
)
return {
'room_id': room_id,
'room_name': room.name,
'current_users': current_users,
'max_users': room.max_users,
'total_messages': total_messages,
'today_messages': today_messages,
'created_at': room.created_at.isoformat()
}
# Usage example
if __name__ == "__main__":
    redis_client = redis.Redis(decode_responses=True)
    chat_system = ChatSystem(redis_client)

    # Create a chat room
    room = ChatRoom(
        room_id="room_001",
        name="技术讨论",
        description="讨论技术相关话题",
        created_by="admin",
        max_users=50,
    )
    chat_system.create_room(room)

    # Create two users and have both join the room
    user1 = ChatUser(user_id="user_001", username="张三", avatar="avatar1.jpg")
    user2 = ChatUser(user_id="user_002", username="李四", avatar="avatar2.jpg")
    chat_system.join_room("room_001", user1)
    chat_system.join_room("room_001", user2)

    # Send a message and a reply to it
    message1 = ChatMessage(
        message_id=str(uuid.uuid4()),
        room_id="room_001",
        user_id="user_001",
        username="张三",
        message_type=MessageType.TEXT,
        content="大家好!",
        timestamp=datetime.now(),
    )
    message2 = ChatMessage(
        message_id=str(uuid.uuid4()),
        room_id="room_001",
        user_id="user_002",
        username="李四",
        message_type=MessageType.TEXT,
        content="你好,张三!",
        timestamp=datetime.now(),
        reply_to=message1.message_id,
    )
    chat_system.send_message(message1)
    chat_system.send_message(message2)

    # Print the message history
    history = chat_system.get_message_history("room_001")
    print(f"消息历史 ({len(history)} 条):")
    for msg in history:
        sent_at = msg.timestamp.strftime('%H:%M:%S')
        print(f"  [{sent_at}] {msg.username}: {msg.content}")

    # Print the room members
    users = chat_system.get_room_users("room_001")
    print(f"\n房间用户 ({len(users)} 人):")
    for user in users:
        print(f"  {user.username} ({user.status.value})")

    # Print the room statistics
    stats = chat_system.get_room_stats("room_001")
    print(f"\n房间统计:")
    print(f"  当前用户: {stats['current_users']}/{stats['max_users']}")
    print(f"  总消息数: {stats['total_messages']}")
    print(f"  今日消息: {stats['today_messages']}")
8.3.3 实时通知系统
import redis
import json
import uuid
from typing import Dict, List, Optional, Set, Callable
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta
from enum import Enum
class NotificationType(Enum):
    """Category of a notification; the value is its serialised string form."""

    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    SUCCESS = "success"
    SYSTEM = "system"
class NotificationPriority(Enum):
    """Urgency of a notification; the value is its serialised string form."""

    LOW = "low"
    NORMAL = "normal"
    HIGH = "high"
    URGENT = "urgent"
@dataclass
class Notification:
    """One user notification, with conversion to and from a JSON-friendly dict."""

    notification_id: str
    user_id: str
    title: str
    content: str
    notification_type: NotificationType
    priority: NotificationPriority
    timestamp: datetime
    read: bool = False
    action_url: Optional[str] = None
    metadata: Optional[Dict] = None
    expires_at: Optional[datetime] = None

    def to_dict(self) -> Dict:
        """Return a dict of all fields with JSON-serialisable values.

        Datetimes become ISO-format strings and enums their values;
        ``expires_at`` stays None when unset.
        """
        data = asdict(self)
        data['timestamp'] = self.timestamp.isoformat()
        data['notification_type'] = self.notification_type.value
        data['priority'] = self.priority.value
        if self.expires_at:
            data['expires_at'] = self.expires_at.isoformat()
        return data

    @classmethod
    def from_dict(cls, data: Dict) -> 'Notification':
        """Build a notification from a dict shaped like the output of ``to_dict``.

        Raises ``KeyError`` for a missing required field, ``ValueError`` for an
        unparsable timestamp or unknown enum value, and ``TypeError`` for
        unexpected or missing constructor fields.
        """
        # Work on a shallow copy: converting fields in place would leave the
        # caller's dict holding datetime/enum objects instead of the strings
        # it passed in.
        fields = dict(data)
        fields['timestamp'] = datetime.fromisoformat(fields['timestamp'])
        fields['notification_type'] = NotificationType(fields['notification_type'])
        fields['priority'] = NotificationPriority(fields['priority'])
        if fields.get('expires_at'):
            fields['expires_at'] = datetime.fromisoformat(fields['expires_at'])
        return cls(**fields)
class NotificationSystem:
    """Stores per-user notifications in Redis and pushes them over pub/sub.

    Key layout (prefixes are set in ``__init__``):

    * ``notifications:<id>``         - notification JSON, 30-day TTL
    * ``user_notifications:<user>``  - sorted set of notification ids scored
      by timestamp (newest 1000 kept)
    * ``unread_count:<user>``        - integer counter, 30-day TTL

    Pub/sub channels: ``notifications:user:<user>``,
    ``notifications:user:<user>:read`` and ``notifications:priority:<level>``.
    """

    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
        self.publisher = RedisPublisher(redis_client)
        self.notifications_prefix = "notifications"
        self.user_notifications_prefix = "user_notifications"
        self.notification_settings_prefix = "notification_settings"
        self.unread_count_prefix = "unread_count"

    def _decrement_unread(self, user_id: str):
        """Lower the user's unread counter by one, never below zero."""
        unread_key = f"{self.unread_count_prefix}:{user_id}"
        current_count = self.redis_client.get(unread_key)
        if current_count and int(current_count) > 0:
            self.redis_client.decr(unread_key)

    def send_notification(self, notification: Notification) -> bool:
        """Store ``notification``, update the unread counter and push it live.

        HIGH and URGENT notifications are additionally published on
        ``notifications:priority:<level>``. Always returns True.
        """
        payload = json.dumps(notification.to_dict())

        # Store the notification body (expires after 30 days).
        notification_key = f"{self.notifications_prefix}:{notification.notification_id}"
        self.redis_client.setex(notification_key, 86400 * 30, payload)

        # Index it under the user, keeping only the newest 1000 ids.
        user_notifications_key = f"{self.user_notifications_prefix}:{notification.user_id}"
        score = notification.timestamp.timestamp()
        self.redis_client.zadd(user_notifications_key, {notification.notification_id: score})
        self.redis_client.zremrangebyrank(user_notifications_key, 0, -1001)

        if not notification.read:
            unread_key = f"{self.unread_count_prefix}:{notification.user_id}"
            self.redis_client.incr(unread_key)
            self.redis_client.expire(unread_key, 86400 * 30)

        # Real-time push to the user's channel.
        self.publisher.publish(f"notifications:user:{notification.user_id}", payload)

        if notification.priority in [NotificationPriority.HIGH, NotificationPriority.URGENT]:
            self.publisher.publish(
                f"notifications:priority:{notification.priority.value}", payload
            )
        return True

    def send_bulk_notification(self, user_ids: List[str], title: str, content: str,
                               notification_type: NotificationType = NotificationType.INFO,
                               priority: NotificationPriority = NotificationPriority.NORMAL,
                               action_url: str = None, metadata: Dict = None) -> List[str]:
        """Send the same notification to each user.

        Returns the ids of the notifications that were sent, one per user.
        """
        notification_ids = []
        for user_id in user_ids:
            notification_id = str(uuid.uuid4())
            notification = Notification(
                notification_id=notification_id,
                user_id=user_id,
                title=title,
                content=content,
                notification_type=notification_type,
                priority=priority,
                timestamp=datetime.now(),
                action_url=action_url,
                metadata=metadata
            )
            if self.send_notification(notification):
                notification_ids.append(notification_id)
        return notification_ids

    def get_user_notifications(self, user_id: str, limit: int = 20,
                               offset: int = 0, unread_only: bool = False) -> List[Notification]:
        """Return one page of the user's notifications, newest first.

        ``unread_only`` filters the fetched page, so the result may hold fewer
        than ``limit`` items. Ids whose body has expired are skipped.
        """
        # limit=0 with offset=0 used to become zrevrange(0, -1), which
        # returns every notification instead of none.
        if limit <= 0:
            return []

        user_notifications_key = f"{self.user_notifications_prefix}:{user_id}"
        notification_ids = self.redis_client.zrevrange(
            user_notifications_key, offset, offset + limit - 1
        )

        notifications = []
        for notification_id in notification_ids:
            notification = self.get_notification(notification_id)
            if not notification:
                continue
            if unread_only and notification.read:
                continue
            notifications.append(notification)
        return notifications

    def get_notification(self, notification_id: str) -> Optional[Notification]:
        """Return the stored notification, or None if missing or unparsable."""
        notification_key = f"{self.notifications_prefix}:{notification_id}"
        notification_data = self.redis_client.get(notification_key)
        if not notification_data:
            return None
        try:
            return Notification.from_dict(json.loads(notification_data))
        except (json.JSONDecodeError, TypeError, ValueError):
            return None

    def mark_as_read(self, notification_id: str) -> bool:
        """Mark one notification as read and publish the change.

        Returns False when the notification does not exist or is already read.
        Rewriting the body restarts its 30-day TTL.
        """
        notification = self.get_notification(notification_id)
        if not notification or notification.read:
            return False

        notification.read = True
        notification_key = f"{self.notifications_prefix}:{notification_id}"
        self.redis_client.setex(
            notification_key,
            86400 * 30,
            json.dumps(notification.to_dict())
        )

        self._decrement_unread(notification.user_id)

        # Let live clients update their view.
        self.publisher.publish(
            f"notifications:user:{notification.user_id}:read",
            json.dumps({
                "notification_id": notification_id,
                "read": True,
                "timestamp": datetime.now().isoformat()
            })
        )
        return True

    def mark_all_as_read(self, user_id: str) -> int:
        """Mark every indexed notification of the user as read.

        Returns how many notifications changed state; the unread counter is
        reset afterwards regardless.
        """
        user_notifications_key = f"{self.user_notifications_prefix}:{user_id}"
        notification_ids = self.redis_client.zrange(user_notifications_key, 0, -1)

        read_count = sum(
            1 for notification_id in notification_ids
            if self.mark_as_read(notification_id)
        )

        self.redis_client.delete(f"{self.unread_count_prefix}:{user_id}")
        return read_count

    def get_unread_count(self, user_id: str) -> int:
        """Return the user's unread counter (0 when the key is absent)."""
        count = self.redis_client.get(f"{self.unread_count_prefix}:{user_id}")
        return int(count) if count else 0

    def delete_notification(self, notification_id: str) -> bool:
        """Delete a notification and its index entry.

        Returns False when the notification does not exist. The unread counter
        is lowered if the notification was still unread.
        """
        notification = self.get_notification(notification_id)
        if not notification:
            return False

        self.redis_client.delete(f"{self.notifications_prefix}:{notification_id}")
        user_notifications_key = f"{self.user_notifications_prefix}:{notification.user_id}"
        self.redis_client.zrem(user_notifications_key, notification_id)

        if not notification.read:
            self._decrement_unread(notification.user_id)
        return True

    def cleanup_expired_notifications(self) -> int:
        """Delete notifications whose ``expires_at`` is in the past.

        Simplified implementation: scans every user's index and loads each
        notification. Returns the number of notifications deleted.
        """
        cleaned_count = 0
        now = datetime.now()

        pattern = f"{self.user_notifications_prefix}:*"
        for key in self.redis_client.scan_iter(match=pattern):
            for notification_id in self.redis_client.zrange(key, 0, -1):
                notification = self.get_notification(notification_id)
                if not notification or not notification.expires_at:
                    continue
                if notification.expires_at < now and self.delete_notification(notification_id):
                    cleaned_count += 1
        return cleaned_count

    def get_notification_stats(self, user_id: str) -> Dict[str, object]:
        """Return totals, unread/read counts, today's count and a per-type tally.

        "Today" starts at local midnight. The per-type tally loads every
        indexed notification.
        """
        user_notifications_key = f"{self.user_notifications_prefix}:{user_id}"

        total_notifications = self.redis_client.zcard(user_notifications_key)
        unread_count = self.get_unread_count(user_id)

        today_start = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
        today_notifications = self.redis_client.zcount(
            user_notifications_key, today_start.timestamp(), '+inf'
        )

        type_stats = {}
        for notification_id in self.redis_client.zrange(user_notifications_key, 0, -1):
            notification = self.get_notification(notification_id)
            if notification:
                type_name = notification.notification_type.value
                type_stats[type_name] = type_stats.get(type_name, 0) + 1

        return {
            'user_id': user_id,
            'total_notifications': total_notifications,
            'unread_count': unread_count,
            # The index is trimmed to 1000 entries without touching the unread
            # counter, so the difference can go negative; clamp it.
            'read_count': max(total_notifications - unread_count, 0),
            'today_notifications': today_notifications,
            'type_stats': type_stats
        }
class NotificationSubscriber:
def __init__(self, redis_client: redis.Redis, user_id: str):
self.redis_client = redis_client
self.user_id = user_id
self.subscriber = RedisSubscriber(redis_client)
self.notification_handlers: List[Callable[[Notification], None]] = []
self.read_handlers: List[Callable[[str], None]] = []
def add_notification_handler(self, handler: Callable[[Notification], None]):
"""添加通知处理器"""
self.notification_handlers.append(handler)
def add_read_handler(self, handler: Callable[[str], None]):
"""添加已读处理器"""
self.read_handlers.append(handler)
def start_listening(self):
"""开始监听通知"""
# 订阅用户通知频道
self.subscriber.subscribe(
f"notifications:user:{self.user_id}",
self._handle_notification
)
# 订阅已读状态更新频道
self.subscriber.subscribe(
f"notifications:user:{self.user_id}:read",
self._handle_read_update
)
# 订阅高优先级通知
self.subscriber.psubscribe(
"notifications:priority:*",
self._handle_priority_notification
)
self.subscriber.start_listening()
def _handle_notification(self, message: PubSubMessage):
"""处理通知消息"""
try:
data = json.loads(message.data)
notification = Notification.from_dict(data)
for handler in self.notification_handlers:
handler(notification)
except (json.JSONDecodeError, TypeError, ValueError) as e:
print(f"Failed to parse notification: {e}")
def _handle_read_update(self, message: PubSubMessage):
"""处理已读状态更新"""
try:
data = json.loads(message.data)
notification_id = data.get('notification_id')
for handler in self.read_handlers:
handler(notification_id)
except (json.JSONDecodeError, TypeError) as e:
print(f"Failed to parse read update: {e}")
def _handle_priority_notification(self, message: PubSubMessage):
"""处理高优先级通知"""
try:
data = json.loads(message.data)
notification = Notification.from_dict(data)
# 只处理发给当前用户的高优先级通知
if notification.user_id == self.user_id:
print(f"[URGENT] {notification.title}: {notification.content}")
except (json.JSONDecodeError, TypeError, ValueError) as e:
print(f"Failed to parse priority notification: {e}")
def stop_listening(self):
"""停止监听"""
self.subscriber.stop_listening()
def close(self):
"""关闭订阅者"""
self.subscriber.close()
# Usage example
if __name__ == "__main__":
    redis_client = redis.Redis(decode_responses=True)
    notification_system = NotificationSystem(redis_client)

    # Create a subscriber for one user
    user_id = "user_123"
    subscriber = NotificationSubscriber(redis_client, user_id)

    # Callbacks
    def notification_handler(notification: Notification):
        kind = notification.notification_type.value.upper()
        print(f"[{kind}] {notification.title}")
        print(f"  内容: {notification.content}")
        print(f"  时间: {notification.timestamp.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"  优先级: {notification.priority.value}")
        if notification.action_url:
            print(f"  操作链接: {notification.action_url}")
        print()

    def read_handler(notification_id: str):
        print(f"通知 {notification_id} 已标记为已读")

    # Register the callbacks and start listening
    subscriber.add_notification_handler(notification_handler)
    subscriber.add_read_handler(read_handler)
    subscriber.start_listening()

    # Give the subscriptions time to take effect before sending
    import time
    time.sleep(1)

    # Plain informational notification
    notification1 = Notification(
        notification_id=str(uuid.uuid4()),
        user_id=user_id,
        title="欢迎使用系统",
        content="感谢您注册我们的服务,祝您使用愉快!",
        notification_type=NotificationType.INFO,
        priority=NotificationPriority.NORMAL,
        timestamp=datetime.now(),
    )
    notification_system.send_notification(notification1)

    # Warning notification
    notification2 = Notification(
        notification_id=str(uuid.uuid4()),
        user_id=user_id,
        title="账户安全提醒",
        content="检测到您的账户在异地登录,请确认是否为本人操作",
        notification_type=NotificationType.WARNING,
        priority=NotificationPriority.HIGH,
        timestamp=datetime.now(),
        action_url="/security/check",
    )
    notification_system.send_notification(notification2)

    # Urgent notification with an expiry time
    notification3 = Notification(
        notification_id=str(uuid.uuid4()),
        user_id=user_id,
        title="系统维护通知",
        content="系统将于今晚23:00-01:00进行维护,请提前保存工作",
        notification_type=NotificationType.SYSTEM,
        priority=NotificationPriority.URGENT,
        timestamp=datetime.now(),
        expires_at=datetime.now() + timedelta(hours=8),
    )
    notification_system.send_notification(notification3)

    # Wait for the messages to be handled
    time.sleep(2)

    # Print the notification statistics
    stats = notification_system.get_notification_stats(user_id)
    print(f"通知统计:")
    print(f"  总通知数: {stats['total_notifications']}")
    print(f"  未读通知: {stats['unread_count']}")
    print(f"  今日通知: {stats['today_notifications']}")
    print(f"  类型统计: {stats['type_stats']}")

    # Mark the first notification as read
    notification_system.mark_as_read(notification1.notification_id)
    time.sleep(1)

    # Print the user's notification list
    notifications = notification_system.get_user_notifications(user_id)
    print(f"\n用户通知列表 ({len(notifications)} 条):")
    for notif in notifications:
        status = "已读" if notif.read else "未读"
        print(f"  [{status}] {notif.title} - {notif.timestamp.strftime('%H:%M:%S')}")

    # Clean up
    subscriber.close()
8.4 性能优化
8.4.1 连接池优化
import redis
from redis.connection import ConnectionPool
from typing import Dict, List
class OptimizedPubSubManager:
    """Owns separate connection pools for publishing and subscribing.

    ``redis_config`` keys read: ``host`` (default ``'localhost'``), ``port``
    (6379), ``db`` (0), ``password`` (None), ``publisher_max_connections``
    (10) and ``subscriber_max_connections`` (20).
    """

    def __init__(self, redis_config: Dict[str, object]):
        # Dedicated pool for the publisher.
        self.publisher_pool = self._build_pool(
            redis_config, redis_config.get('publisher_max_connections', 10)
        )
        # Dedicated pool for subscribers.
        self.subscriber_pool = self._build_pool(
            redis_config, redis_config.get('subscriber_max_connections', 20)
        )
        self.publisher_client = redis.Redis(connection_pool=self.publisher_pool)
        self.subscriber_clients: List[redis.Redis] = []

    @staticmethod
    def _build_pool(redis_config: Dict[str, object], max_connections) -> ConnectionPool:
        """Create a pool from the shared settings with the given size limit."""
        return ConnectionPool(
            host=redis_config.get('host', 'localhost'),
            port=redis_config.get('port', 6379),
            db=redis_config.get('db', 0),
            password=redis_config.get('password'),
            max_connections=max_connections,
            retry_on_timeout=True,
            socket_keepalive=True,
            socket_keepalive_options={}
        )

    @staticmethod
    def _close_quietly(client, label: str):
        """Close ``client``, reporting (not raising) ordinary failures.

        Only ``Exception`` is caught: a bare ``except`` would also swallow
        KeyboardInterrupt and SystemExit during shutdown.
        """
        try:
            client.close()
        except Exception as e:
            print(f"Error closing {label} client: {e}")

    def get_publisher(self) -> redis.Redis:
        """Return the shared publisher client."""
        return self.publisher_client

    def get_subscriber(self) -> redis.Redis:
        """Create, remember and return a new client on the subscriber pool."""
        client = redis.Redis(connection_pool=self.subscriber_pool)
        self.subscriber_clients.append(client)
        return client

    def close_all(self):
        """Close every client handed out, then disconnect both pools.

        Closing is best-effort: a client that fails to close does not prevent
        the remaining clients and pools from being released.
        """
        for client in self.subscriber_clients:
            self._close_quietly(client, "subscriber")
        # Forget closed clients so a repeated call does not close them again.
        self.subscriber_clients.clear()

        self._close_quietly(self.publisher_client, "publisher")
        self.publisher_pool.disconnect()
        self.subscriber_pool.disconnect()
8.4.2 批量操作优化
import redis
from typing import List, Dict, Any
from datetime import datetime
class BatchPubSubOperations:
    """Pipeline-based helpers that batch several Pub/Sub commands together."""

    def __init__(self, redis_client: "redis.Redis"):
        """Store the client whose ``pipeline()`` is used for every batch."""
        self.redis_client = redis_client

    def batch_publish(self, messages: List[Dict[str, Any]]) -> List[int]:
        """Publish many messages through one pipeline.

        Args:
            messages: Dicts that each carry a ``'channel'`` and a
                ``'content'`` key.

        Returns:
            The pipeline results, one per message and in the same order.

        Raises:
            KeyError: If a message lacks ``'channel'`` or ``'content'``.
        """
        pipe = self.redis_client.pipeline()
        for msg in messages:
            pipe.publish(msg['channel'], msg['content'])
        return pipe.execute()

    def publish_to_multiple_channels(self, channels: List[str], message: str) -> Dict[str, int]:
        """Publish one message to several channels through one pipeline.

        Returns:
            Mapping of channel name to the pipeline result for that channel.
            A channel listed more than once keeps the result of its last
            occurrence.
        """
        pipe = self.redis_client.pipeline()
        for channel in channels:
            pipe.publish(channel, message)
        return dict(zip(channels, pipe.execute()))

    def batch_channel_info(self, channels: List[str]) -> Dict[str, Dict[str, Any]]:
        """Collect subscriber count and activity for several channels at once.

        Queues one ``pubsub_numsub`` per channel followed by a single
        ``pubsub_channels`` and ``pubsub_numpat`` on the same pipeline.

        Returns:
            Mapping of channel name to a dict with ``'subscriber_count'``,
            ``'is_active'`` (the channel appears in the active-channel list)
            and ``'pattern_subscriptions'`` (the global pattern count, which
            is the same for every channel).
        """
        pipe = self.redis_client.pipeline()
        for channel in channels:
            pipe.pubsub_numsub(channel)
        pipe.pubsub_channels()
        pipe.pubsub_numpat()

        # The last two results belong to the two trailing commands.
        *numsub_replies, active_reply, pattern_count = pipe.execute()

        # Channel names come back as bytes or str depending on the client's
        # decode_responses setting; normalise once so the lookup works for both.
        active_names = {self._as_text(name) for name in active_reply}

        return {
            channel: {
                'subscriber_count': self._subscriber_count(reply),
                'is_active': channel in active_names,
                'pattern_subscriptions': pattern_count,
            }
            for channel, reply in zip(channels, numsub_replies)
        }

    @staticmethod
    def _as_text(value):
        """Decode bytes as UTF-8; return any other value unchanged."""
        return value.decode('utf-8') if isinstance(value, bytes) else value

    @staticmethod
    def _subscriber_count(numsub_reply) -> int:
        """Extract the count from a single-channel NUMSUB reply.

        redis-py parses the reply into ``[(channel, count)]``; the raw flat
        form ``[channel, count]`` is accepted as well. The original code only
        handled the flat form and therefore always reported 0.
        """
        if not numsub_reply:
            return 0
        head = numsub_reply[0]
        if isinstance(head, (tuple, list)):
            return int(head[1])
        return int(numsub_reply[1]) if len(numsub_reply) > 1 else 0
### 8.4.3 消息压缩优化
```python
import gzip
import json
import base64
from typing import Any, Dict
class CompressedPublisher:
    """Publish messages inside a JSON envelope, gzip-compressing large ones.

    Every published payload is a JSON object with a ``'compressed'`` flag and
    a ``'data'`` field; ``decompress_message`` reverses the encoding.
    """

    def __init__(self, redis_client: "redis.Redis", compression_threshold: int = 1024):
        """Store the client and the size limit.

        Args:
            redis_client: Client whose ``publish`` method sends the envelope.
            compression_threshold: Messages whose UTF-8 size in bytes exceeds
                this value are candidates for compression.
        """
        self.redis_client = redis_client
        self.compression_threshold = compression_threshold

    def publish_compressed(self, channel: str, data: Any) -> int:
        """Serialise ``data``, compress it when worthwhile, and publish it.

        dict and list values are JSON-encoded; anything else is converted
        with ``str()``. A message above the threshold is gzip-compressed and
        base64-encoded, but only sent in that form when the encoded result is
        actually smaller than the original bytes.

        Returns:
            Whatever ``redis_client.publish`` returns.
        """
        if isinstance(data, (dict, list)):
            message = json.dumps(data, ensure_ascii=False)
        else:
            message = str(data)

        # Encode once and reuse the bytes for the size check, the compression
        # and the size metadata.
        raw = message.encode('utf-8')
        envelope = {'compressed': False, 'data': message}

        if len(raw) > self.compression_threshold:
            compressed = gzip.compress(raw)
            encoded = base64.b64encode(compressed).decode('ascii')
            # gzip overhead plus base64 expansion can make small or
            # incompressible payloads larger; keep the plain form then.
            if len(encoded) < len(raw):
                envelope = {
                    'compressed': True,
                    'data': encoded,
                    'original_size': len(raw),
                    'compressed_size': len(compressed),
                }

        return self.redis_client.publish(channel, json.dumps(envelope))

    @staticmethod
    def decompress_message(raw_message: str) -> str:
        """Return the original text carried by an envelope.

        Args:
            raw_message: The payload as received; ``json.loads`` is applied
                to it directly, so UTF-8 bytes are accepted as well as str.

        Returns:
            The decoded message text. If ``raw_message`` is not an envelope
            produced by ``publish_compressed`` (not JSON, not an object, no
            ``'data'`` field) or its compressed payload cannot be decoded,
            ``raw_message`` is returned unchanged.
        """
        try:
            envelope = json.loads(raw_message)
        except (TypeError, ValueError):
            # Not JSON at all (JSONDecodeError is a ValueError subclass).
            return raw_message

        if not isinstance(envelope, dict) or 'data' not in envelope:
            return raw_message

        if not envelope.get('compressed', False):
            return envelope['data']

        try:
            packed = base64.b64decode(envelope['data'])
            return gzip.decompress(packed).decode('utf-8')
        except Exception:
            # Best-effort by design: a corrupt payload falls back to the raw
            # message instead of failing the subscriber.
            return raw_message
```
### 8.4.4 连接重试机制
```python
import time
import logging
from typing import Callable, Optional
from functools import wraps
def _retrying(method: Callable) -> Callable:
    """Run a ResilientSubscriber method under that instance's retry policy.

    The original code applied the instance method ``with_retry(self, func)``
    as ``@with_retry`` inside the class body, which fails while the class is
    being created because no instance exists yet. This module-level
    decorator receives ``self`` at call time instead.
    """
    @wraps(method)
    def wrapper(self, *args, **kwargs):
        return self._call_with_retry(method, self, *args, **kwargs)
    return wrapper


class ResilientSubscriber:
    """Pub/Sub subscriber that retries connection failures with backoff."""

    def __init__(self, redis_client: "redis.Redis", max_retries: int = 5,
                 retry_delay: float = 1.0, backoff_factor: float = 2.0):
        """Store the client and the retry policy.

        Args:
            redis_client: Client used to create the PubSub object.
            max_retries: Retries after the first failed attempt.
            retry_delay: Seconds to wait before the first retry.
            backoff_factor: Multiplier applied to the delay after each retry.
        """
        self.redis_client = redis_client
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.backoff_factor = backoff_factor
        self.pubsub = None
        # channel name -> handler (or None) and pattern -> handler (or None)
        self.subscriptions = {}
        self.pattern_subscriptions = {}
        self.running = False
        self.logger = logging.getLogger(__name__)

    def with_retry(self, func: Callable) -> Callable:
        """Wrap an arbitrary callable in this instance's retry policy.

        Returns:
            A callable with ``func``'s signature that retries on
            ``redis.ConnectionError`` / ``redis.TimeoutError``.
        """
        @wraps(func)
        def wrapper(*args, **kwargs):
            return self._call_with_retry(func, *args, **kwargs)
        return wrapper

    def _call_with_retry(self, func: Callable, *args, **kwargs):
        """Call ``func`` and retry connection failures with backoff.

        Between attempts the method sleeps, multiplies the delay by
        ``backoff_factor`` and calls ``_reconnect()``.

        Raises:
            redis.ConnectionError, redis.TimeoutError: The error of the last
                attempt, once all attempts have failed.
            Exception: Any other error is logged and re-raised immediately.
        """
        delay = self.retry_delay
        retries = max(self.max_retries, 0)
        for attempt in range(retries + 1):
            try:
                return func(*args, **kwargs)
            except (redis.ConnectionError, redis.TimeoutError) as e:
                if attempt >= retries:
                    self.logger.error(f"All {self.max_retries + 1} attempts failed")
                    raise
                self.logger.warning(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay}s...")
                time.sleep(delay)
                delay *= self.backoff_factor
                try:
                    self._reconnect()
                except (redis.ConnectionError, redis.TimeoutError) as reconnect_error:
                    # A failed reconnect must not end the retry loop early;
                    # the next attempt surfaces the problem again.
                    self.logger.warning(f"Reconnect failed: {reconnect_error}")
            except Exception as e:
                self.logger.error(f"Non-recoverable error: {e}")
                raise

    def _reconnect(self):
        """Replace the PubSub object and re-issue every known subscription."""
        if self.pubsub is not None:
            try:
                self.pubsub.close()
            except Exception as e:
                # The old connection is being discarded anyway.
                self.logger.debug(f"Ignoring error while closing old pubsub: {e}")
        self.pubsub = self.redis_client.pubsub()
        # Subscribe by name only, exactly like subscribe()/psubscribe() do.
        # Passing the handlers as keyword arguments would make redis-py
        # dispatch them itself, so get_message() would return None and
        # _process_message() (with its error isolation) would be bypassed.
        if self.subscriptions:
            self.pubsub.subscribe(*self.subscriptions)
        if self.pattern_subscriptions:
            self.pubsub.psubscribe(*self.pattern_subscriptions)

    @_retrying
    def subscribe(self, channel: str, handler: Optional[Callable] = None):
        """Subscribe to a channel (with retry) and remember its handler."""
        if self.pubsub is None:
            self.pubsub = self.redis_client.pubsub()
        self.subscriptions[channel] = handler
        self.pubsub.subscribe(channel)

    @_retrying
    def psubscribe(self, pattern: str, handler: Optional[Callable] = None):
        """Subscribe to a pattern (with retry) and remember its handler."""
        if self.pubsub is None:
            self.pubsub = self.redis_client.pubsub()
        self.pattern_subscriptions[pattern] = handler
        self.pubsub.psubscribe(pattern)

    @_retrying
    def get_message(self, timeout: float = 1.0):
        """Fetch the next message (with retry).

        Raises:
            redis.ConnectionError: If nothing has been subscribed yet, so no
                PubSub object exists.
        """
        if self.pubsub is None:
            raise redis.ConnectionError("Not connected")
        return self.pubsub.get_message(timeout=timeout)

    def start_resilient_listening(self):
        """Poll for messages until ``stop_listening()`` is called.

        Blocks the calling thread. Errors are logged and followed by a short
        sleep so the loop never terminates because of them.
        """
        self.running = True
        while self.running:
            try:
                message = self.get_message()
                if message:
                    self._process_message(message)
            except (redis.ConnectionError, redis.TimeoutError) as e:
                self.logger.warning(f"Connection lost during listening: {e}")
                time.sleep(self.retry_delay)
            except Exception as e:
                self.logger.error(f"Unexpected error during listening: {e}")
                time.sleep(1)

    @staticmethod
    def _as_text(value):
        """Decode bytes as UTF-8; return any other value unchanged."""
        return value.decode('utf-8') if isinstance(value, bytes) else value

    def _process_message(self, message):
        """Dispatch a 'message' / 'pmessage' to its registered handler.

        Channel and pattern names may arrive as bytes or as str (clients
        created with ``decode_responses=True``); both are supported. Handler
        exceptions are logged and not propagated.
        """
        msg_type = message.get('type')
        if msg_type == 'message':
            channel = self._as_text(message['channel'])
            handler = self.subscriptions.get(channel)
            if handler:
                try:
                    handler(message)
                except Exception as e:
                    self.logger.error(f"Handler error for channel {channel}: {e}")
        elif msg_type == 'pmessage':
            pattern = self._as_text(message['pattern'])
            handler = self.pattern_subscriptions.get(pattern)
            if handler:
                try:
                    handler(message)
                except Exception as e:
                    self.logger.error(f"Handler error for pattern {pattern}: {e}")

    def stop_listening(self):
        """Ask the listening loop to exit after its current iteration."""
        self.running = False

    def close(self):
        """Stop listening and close the PubSub object if one exists."""
        self.stop_listening()
        if self.pubsub is not None:
            self.pubsub.close()
```
## 8.5 监控和调试
### 8.5.1 发布订阅监控
```python
import redis
import time
import json
from typing import Dict, List, Any
from datetime import datetime, timedelta
from dataclasses import dataclass
@dataclass
class PubSubMetrics:
    """One sample of Pub/Sub related server statistics."""

    timestamp: datetime          # local time at which the sample was taken
    active_channels: int         # number of entries returned by pubsub_channels()
    pattern_subscriptions: int   # value returned by pubsub_numpat()
    total_subscribers: int       # sum of NUMSUB counts over the active channels
    messages_per_second: float   # rate derived from PubSubMonitor.message_counts
    memory_usage: int            # 'used_memory' from info('memory')
    connection_count: int        # 'connected_clients' from info('clients')


class PubSubMonitor:
    """Collect Pub/Sub metrics from a Redis server and summarise them."""

    # Number of most recent samples kept in ``metrics_history``.
    _MAX_HISTORY = 100

    def __init__(self, redis_client: "redis.Redis"):
        """Store the client and start with an empty history."""
        self.redis_client = redis_client
        self.metrics_history: List[PubSubMetrics] = []
        # channel -> messages recorded since the last collect_metrics() call
        self.message_counts = {}
        self.last_check_time = time.time()

    def record_message(self, channel: str, count: int = 1) -> None:
        """Count ``count`` messages on ``channel`` for the next rate sample.

        Redis does not expose a published-message counter through the
        commands used here, so the application has to report messages itself.
        """
        self.message_counts[channel] = self.message_counts.get(channel, 0) + count

    @staticmethod
    def _as_text(value):
        """Decode bytes as UTF-8; return any other value unchanged."""
        return value.decode('utf-8') if isinstance(value, bytes) else value

    @staticmethod
    def _subscriber_counts(numsub_reply) -> List[int]:
        """Return the subscriber counts contained in a NUMSUB reply.

        redis-py parses the reply into ``[(channel, count), ...]``; the raw
        flat form ``[channel, count, ...]`` is accepted too. The original
        code assumed the flat form and added tuples to an integer.
        """
        if not numsub_reply:
            return []
        if isinstance(numsub_reply[0], (tuple, list)):
            return [int(count) for _, count in numsub_reply]
        return [int(count) for count in numsub_reply[1::2]]

    def collect_metrics(self) -> PubSubMetrics:
        """Take one sample, append it to the history and return it.

        The history keeps only the most recent ``_MAX_HISTORY`` samples.
        """
        current_time = time.time()

        active_channels = self.redis_client.pubsub_channels()
        pattern_count = self.redis_client.pubsub_numpat()

        total_subscribers = 0
        if active_channels:
            numsub_reply = self.redis_client.pubsub_numsub(*active_channels)
            total_subscribers = sum(self._subscriber_counts(numsub_reply))

        time_diff = current_time - self.last_check_time
        messages_per_second = 0.0
        if time_diff > 0:
            messages_per_second = self._calculate_message_rate(time_diff)

        memory_usage = self.redis_client.info('memory').get('used_memory', 0)
        connection_count = self.redis_client.info('clients').get('connected_clients', 0)

        metrics = PubSubMetrics(
            timestamp=datetime.now(),
            active_channels=len(active_channels),
            pattern_subscriptions=pattern_count,
            total_subscribers=total_subscribers,
            messages_per_second=messages_per_second,
            memory_usage=memory_usage,
            connection_count=connection_count,
        )
        self.metrics_history.append(metrics)
        self.last_check_time = current_time

        if len(self.metrics_history) > self._MAX_HISTORY:
            del self.metrics_history[:-self._MAX_HISTORY]
        return metrics

    def _calculate_message_rate(self, time_diff: float) -> float:
        """Return messages per second since the previous sample.

        Uses the counts reported through ``record_message`` and resets them.
        With nothing recorded the rate is 0.0, which is what the previous
        placeholder implementation always returned.
        """
        observed = sum(self.message_counts.values())
        self.message_counts.clear()
        return observed / time_diff if time_diff > 0 else 0.0

    def get_channel_details(self, channel: str) -> Dict[str, Any]:
        """Return subscriber count and activity flag for one channel."""
        counts = self._subscriber_counts(self.redis_client.pubsub_numsub(channel))
        subscriber_count = counts[0] if counts else 0

        # Names are bytes or str depending on decode_responses; compare as text.
        active_names = {
            self._as_text(name) for name in self.redis_client.pubsub_channels()
        }
        return {
            'channel': channel,
            'subscriber_count': subscriber_count,
            'is_active': channel in active_names,
            'last_checked': datetime.now().isoformat()
        }

    def get_system_overview(self) -> Dict[str, Any]:
        """Take a fresh sample and report it with the change since the last one.

        The trends are 0 when the history holds fewer than two samples.
        """
        latest_metrics = self.collect_metrics()

        channel_trend = 0
        subscriber_trend = 0
        if len(self.metrics_history) >= 2:
            prev_metrics = self.metrics_history[-2]
            channel_trend = latest_metrics.active_channels - prev_metrics.active_channels
            subscriber_trend = latest_metrics.total_subscribers - prev_metrics.total_subscribers

        return {
            'current_metrics': {
                'active_channels': latest_metrics.active_channels,
                'pattern_subscriptions': latest_metrics.pattern_subscriptions,
                'total_subscribers': latest_metrics.total_subscribers,
                'messages_per_second': latest_metrics.messages_per_second,
                'memory_usage_mb': latest_metrics.memory_usage / (1024 * 1024),
                'connection_count': latest_metrics.connection_count
            },
            'trends': {
                'channel_change': channel_trend,
                'subscriber_change': subscriber_trend
            },
            'timestamp': latest_metrics.timestamp.isoformat()
        }

    def generate_report(self, hours: int = 24) -> Dict[str, Any]:
        """Summarise the stored samples of the last ``hours`` hours.

        Returns:
            Averages and extremes for channels, subscribers and memory, or
            ``{'error': ...}`` when no stored sample falls into the period.
            No new sample is taken by this method.
        """
        cutoff_time = datetime.now() - timedelta(hours=hours)
        recent_metrics = [m for m in self.metrics_history if m.timestamp >= cutoff_time]
        if not recent_metrics:
            return {'error': 'No metrics available for the specified time period'}

        sample_count = len(recent_metrics)
        channel_values = [m.active_channels for m in recent_metrics]
        subscriber_values = [m.total_subscribers for m in recent_metrics]
        memory_values = [m.memory_usage for m in recent_metrics]

        return {
            'period': f'{hours} hours',
            'data_points': sample_count,
            'channels': {
                'average': round(sum(channel_values) / sample_count, 2),
                'maximum': max(channel_values),
                'minimum': min(channel_values)
            },
            'subscribers': {
                'average': round(sum(subscriber_values) / sample_count, 2),
                'maximum': max(subscriber_values)
            },
            'memory': {
                'average_mb': round(sum(memory_values) / sample_count / (1024 * 1024), 2),
                'maximum_mb': round(max(memory_values) / (1024 * 1024), 2)
            },
            'generated_at': datetime.now().isoformat()
        }
```
### 8.5.2 调试工具
```python
import redis
import json
import threading
import time
from typing import Dict, List, Callable, Any
from datetime import datetime
class PubSubDebugger:
    """Capture Pub/Sub traffic in a background thread for inspection."""

    # Number of most recent messages kept in ``message_log``.
    _MAX_LOG = 1000

    def __init__(self, redis_client: "redis.Redis"):
        """Store the client; monitoring starts only on ``start_monitoring``."""
        self.redis_client = redis_client
        self.message_log: List[Dict[str, Any]] = []
        self.interceptors: List[Callable] = []
        self.debug_subscriber = None
        self.monitoring = False
        self._monitor_thread = None
        # The log is written by the monitor thread and read by callers.
        self._log_lock = threading.Lock()

    def add_interceptor(self, interceptor: Callable[[Dict[str, Any]], None]):
        """Register a callback that receives every captured message record."""
        self.interceptors.append(interceptor)

    def start_monitoring(self, patterns: List[str] = None):
        """Subscribe to ``patterns`` and start the background capture thread.

        Args:
            patterns: Patterns to psubscribe to; ``['*']`` (everything) when
                omitted or empty.

        Does nothing if monitoring is already active.
        """
        if self.monitoring:
            return
        subscriber = self.redis_client.pubsub()
        for pattern in (patterns or ['*']):
            subscriber.psubscribe(pattern)
        # Flip the flag only after subscribing succeeded, so a failed start
        # can simply be retried.
        self.debug_subscriber = subscriber
        self.monitoring = True
        self._monitor_thread = threading.Thread(target=self._monitor_loop, daemon=True)
        self._monitor_thread.start()

    def _monitor_loop(self):
        """Poll the subscriber until ``monitoring`` is switched off."""
        while self.monitoring:
            try:
                message = self.debug_subscriber.get_message(timeout=1.0)
                if message and message['type'] == 'pmessage':
                    self._process_debug_message(message)
            except Exception as e:
                if not self.monitoring:
                    # Errors caused by shutting down are expected.
                    break
                print(f"Debug monitor error: {e}")
                time.sleep(1)

    @staticmethod
    def _as_text(value) -> str:
        """Return ``value`` as text; bytes are decoded as UTF-8.

        Undecodable bytes are replaced rather than raising, so binary
        payloads are still logged.
        """
        if isinstance(value, bytes):
            return value.decode('utf-8', errors='replace')
        return str(value)

    def _process_debug_message(self, message: Dict[str, Any]):
        """Turn a pmessage into a log record and pass it to the interceptors.

        Fields may be bytes or str (clients created with
        ``decode_responses=True``). ``data_size`` is the payload size in
        bytes in both cases.
        """
        payload = message['data']
        text = self._as_text(payload)
        if isinstance(payload, bytes):
            data_size = len(payload)
        else:
            data_size = len(text.encode('utf-8'))

        debug_info = {
            'timestamp': datetime.now().isoformat(),
            'pattern': self._as_text(message['pattern']),
            'channel': self._as_text(message['channel']),
            'data': text,
            'data_size': data_size
        }

        try:
            debug_info['parsed_data'] = json.loads(text)
            debug_info['data_type'] = 'json'
        except json.JSONDecodeError:
            debug_info['data_type'] = 'text'

        with self._log_lock:
            self.message_log.append(debug_info)
            if len(self.message_log) > self._MAX_LOG:
                del self.message_log[:-self._MAX_LOG]

        # Iterate over a copy so interceptors may be added concurrently.
        for interceptor in tuple(self.interceptors):
            try:
                interceptor(debug_info)
            except Exception as e:
                print(f"Interceptor error: {e}")

    def stop_monitoring(self):
        """Stop the capture thread and close the subscriber connection."""
        self.monitoring = False
        thread = self._monitor_thread
        self._monitor_thread = None
        # Let the loop finish its current poll (at most about one second)
        # before the connection it is reading from is closed.
        if thread is not None and thread is not threading.current_thread():
            thread.join(timeout=2.0)
        if self.debug_subscriber:
            self.debug_subscriber.close()
            self.debug_subscriber = None

    def _snapshot(self) -> List[Dict[str, Any]]:
        """Return a copy of the log taken under the lock."""
        with self._log_lock:
            return list(self.message_log)

    def get_message_log(self, limit: int = 100) -> List[Dict[str, Any]]:
        """Return up to ``limit`` of the most recent records, oldest first.

        A ``limit`` of zero or less returns an empty list (slicing with
        ``[-0:]`` would otherwise return the whole log).
        """
        if limit <= 0:
            return []
        return self._snapshot()[-limit:]

    def search_messages(self, keyword: str = None, channel: str = None,
                        since: datetime = None) -> List[Dict[str, Any]]:
        """Filter the log; every given criterion must match.

        Args:
            keyword: Case-insensitive substring looked up in the message
                text and in the channel name.
            channel: Exact channel name.
            since: Only records whose timestamp is not earlier than this.
        """
        needle = keyword.lower() if keyword else None
        results = []
        for msg in self._snapshot():
            if since and datetime.fromisoformat(msg['timestamp']) < since:
                continue
            if channel and msg['channel'] != channel:
                continue
            if needle and (needle not in msg['data'].lower()
                           and needle not in msg['channel'].lower()):
                continue
            results.append(msg)
        return results

    def analyze_traffic(self) -> Dict[str, Any]:
        """Aggregate the log per channel.

        Returns:
            Totals, the average message size and the ten busiest channels as
            ``(channel, stats)`` pairs, or ``{'error': ...}`` when the log is
            empty.
        """
        log = self._snapshot()
        if not log:
            return {'error': 'No messages to analyze'}

        channel_stats = {}
        total_size = 0
        for msg in log:
            stats = channel_stats.setdefault(msg['channel'], {'count': 0, 'total_size': 0})
            stats['count'] += 1
            stats['total_size'] += msg['data_size']
            total_size += msg['data_size']

        for stats in channel_stats.values():
            stats['avg_size'] = stats['total_size'] / stats['count']

        sorted_channels = sorted(
            channel_stats.items(),
            key=lambda item: item[1]['count'],
            reverse=True
        )
        return {
            'total_messages': len(log),
            'total_size_bytes': total_size,
            'avg_message_size': total_size / len(log),
            'unique_channels': len(channel_stats),
            'top_channels': sorted_channels[:10],
            'analysis_time': datetime.now().isoformat()
        }
# 使用示例
if __name__ == "__main__":
    # Demo: watch two channel families, publish two messages, then print the
    # collected metrics, the traffic analysis and a one-hour report.
    redis_client = redis.Redis(decode_responses=True)

    monitor = PubSubMonitor(redis_client)
    debugger = PubSubDebugger(redis_client)

    def debug_interceptor(message_info):
        """Warn about any captured message larger than 1024."""
        if message_info['data_size'] > 1024:
            print(f"[WARNING] Large message on {message_info['channel']}: {message_info['data_size']} bytes")

    debugger.add_interceptor(debug_interceptor)
    debugger.start_monitoring(['chat:*', 'notifications:*'])

    try:
        metrics = monitor.collect_metrics()
        print(f"当前指标:")
        print(f" 活跃频道: {metrics.active_channels}")
        print(f" 总订阅者: {metrics.total_subscribers}")
        print(f" 内存使用: {metrics.memory_usage / (1024*1024):.2f} MB")

        publisher = redis_client
        publisher.publish("chat:room1", "Hello, world!")
        publisher.publish("notifications:user123", json.dumps({
            "type": "info",
            "message": "Welcome!"
        }))

        # Give the background monitor thread time to receive the messages.
        time.sleep(2)

        traffic_analysis = debugger.analyze_traffic()
        print(f"\n流量分析:")
        if 'error' in traffic_analysis:
            # analyze_traffic() returns only an 'error' entry when nothing
            # was captured; indexing the statistics would raise KeyError.
            print(f" {traffic_analysis['error']}")
        else:
            print(f" 总消息数: {traffic_analysis['total_messages']}")
            print(f" 平均消息大小: {traffic_analysis['avg_message_size']:.2f} bytes")

        report = monitor.generate_report(hours=1)
        print(f"\n监控报告:")
        print(json.dumps(report, indent=2, ensure_ascii=False))
    finally:
        # Always stop the monitor so its subscriber connection is closed,
        # even when one of the steps above fails.
        debugger.stop_monitoring()
```
8.6 最佳实践
8.6.1 设计原则
频道命名规范
- 使用层次化命名: service:module:action
- 避免特殊字符和空格
- 保持命名一致性
消息格式标准化
- 使用JSON格式传输结构化数据
- 包含时间戳和消息类型
- 添加版本信息以支持向后兼容
错误处理
- 实现连接重试机制
- 处理消息解析错误
- 记录详细的错误日志
8.6.2 性能考虑
连接管理
- 使用连接池减少连接开销
- 为发布者和订阅者使用不同的连接
- 合理设置连接超时参数
消息优化
- 对大消息进行压缩
- 避免发送过于频繁的小消息
- 使用批量操作提高效率
内存管理
- 及时清理过期的订阅
- 限制消息历史记录的大小
- 监控内存使用情况
8.6.3 安全考虑
访问控制
- 使用Redis ACL限制频道访问
- 验证发布者身份
- 过滤敏感信息
消息验证
- 验证消息格式和内容
- 防止恶意消息注入
- 限制消息大小和频率
8.7 总结
Redis发布订阅是一个强大的实时消息传递机制,具有以下特点:
核心优势:
- 实时性强,延迟极低
- 支持多对多通信模式
- 实现简单,易于使用
- 支持模式匹配订阅
适用场景:
- 实时通知系统
- 聊天和即时消息
- 事件驱动架构
- 缓存失效通知
- 系统监控告警
注意事项:
- 消息不持久化,可能丢失
- 订阅者离线时无法接收消息
- 不支持消息确认机制
- 性能受网络和客户端处理能力影响
最佳实践:
- 合理设计频道命名规范
- 实现健壮的错误处理机制
- 优化连接和消息传输
- 做好监控和调试准备
- 考虑安全和访问控制
通过合理使用Redis发布订阅功能,可以构建高效、实时的消息传递系统,满足各种实时通信需求。