6.1 有序集合概述
6.1.1 什么是有序集合
Redis有序集合(Sorted Set,简称ZSet)是一种特殊的数据结构,它结合了集合和列表的特点: - 像集合一样,元素是唯一的 - 像列表一样,元素是有序的 - 每个元素都关联一个分数(score),用于排序
6.1.2 有序集合特点
- 唯一性: 成员不能重复
- 有序性: 按分数从小到大排序
- 双重索引: 既可以按成员查找,也可以按分数范围查找
- 高效操作: 插入、删除、查找的时间复杂度都是O(log N)
- 范围查询: 支持按分数或排名范围查询
6.1.3 应用场景
- 排行榜系统: 游戏积分榜、销售排行榜
- 优先级队列: 任务调度、消息队列
- 时间序列数据: 按时间戳排序的事件
- 搜索结果排序: 按相关性分数排序
- 限流和计数: 滑动窗口计数器
6.1.4 内部实现
Redis有序集合使用两种数据结构: - 跳跃表(Skip List): 用于范围查询和排序 - 哈希表(Hash Table): 用于O(1)时间复杂度的成员查找
6.2 基本操作
6.2.1 添加和更新
# 添加单个成员
ZADD key score member
ZADD leaderboard 100 "player1"
ZADD leaderboard 85 "player2"
ZADD leaderboard 92 "player3"
# 添加多个成员
ZADD key score1 member1 score2 member2 ...
ZADD leaderboard 78 "player4" 95 "player5" 88 "player6"
# 条件添加(仅当成员不存在时)
ZADD key NX score member
ZADD leaderboard NX 90 "player1" # 不会更新,因为player1已存在
# 条件添加(仅当成员存在时)
ZADD key XX score member
ZADD leaderboard XX 105 "player1" # 更新player1的分数
# 返回变化的成员数量
ZADD key CH score member
ZADD leaderboard CH 99 "player7" # 返回1(新增)
ZADD leaderboard CH 101 "player1" # 返回1(CH统计新增和被修改的成员,player1的分数被修改)
# 增量更新
ZINCRBY key increment member
ZINCRBY leaderboard 10 "player1" # player1分数增加10
ZINCRBY leaderboard -5 "player2" # player2分数减少5
6.2.2 查询操作
# 获取成员分数
ZSCORE key member
ZSCORE leaderboard "player1"
# 获取成员排名(从0开始,分数从小到大)
ZRANK key member
ZRANK leaderboard "player1"
# 获取成员排名(从0开始,分数从大到小)
ZREVRANK key member
ZREVRANK leaderboard "player1"
# 获取有序集合大小
ZCARD key
ZCARD leaderboard
# 按排名范围获取成员(分数从小到大)
ZRANGE key start stop [WITHSCORES]
ZRANGE leaderboard 0 2 # 获取前3名(分数最低的)
ZRANGE leaderboard 0 2 WITHSCORES # 同时返回分数
ZRANGE leaderboard -3 -1 # 获取后3名
# 按排名范围获取成员(分数从大到小)
ZREVRANGE key start stop [WITHSCORES]
ZREVRANGE leaderboard 0 2 WITHSCORES # 获取前3名(分数最高的)
# 按分数范围获取成员
ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
ZRANGEBYSCORE leaderboard 80 100 # 分数在80-100之间的成员
ZRANGEBYSCORE leaderboard (80 100 # 分数在80-100之间(不包含80)
ZRANGEBYSCORE leaderboard 80 +inf # 分数大于等于80的成员
ZRANGEBYSCORE leaderboard -inf 100 # 分数小于等于100的成员
ZRANGEBYSCORE leaderboard 80 100 LIMIT 0 5 # 限制返回5个结果
# 按分数范围获取成员(逆序)
ZREVRANGEBYSCORE key max min [WITHSCORES] [LIMIT offset count]
ZREVRANGEBYSCORE leaderboard 100 80 WITHSCORES
# 按分数范围计数
ZCOUNT key min max
ZCOUNT leaderboard 80 100
# 按字典序范围获取成员(当分数相同时)
ZRANGEBYLEX key min max [LIMIT offset count]
ZRANGEBYLEX leaderboard [a [z # 成员名在a-z之间
6.2.3 删除操作
# 删除成员
ZREM key member [member ...]
ZREM leaderboard "player1"
ZREM leaderboard "player2" "player3"
# 按排名范围删除
ZREMRANGEBYRANK key start stop
ZREMRANGEBYRANK leaderboard 0 2 # 删除前3名(分数最低的)
ZREMRANGEBYRANK leaderboard -3 -1 # 删除后3名
# 按分数范围删除
ZREMRANGEBYSCORE key min max
ZREMRANGEBYSCORE leaderboard 0 50 # 删除分数在0-50之间的成员
# 按字典序范围删除
ZREMRANGEBYLEX key min max
ZREMRANGEBYLEX leaderboard [a [m # 删除成员名在a-m之间的成员
6.2.4 集合运算
# 并集
ZUNIONSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE SUM|MIN|MAX]
ZUNIONSTORE result 2 set1 set2 # 简单并集
ZUNIONSTORE result 2 set1 set2 WEIGHTS 1 2 # 加权并集
ZUNIONSTORE result 2 set1 set2 AGGREGATE MAX # 取最大分数
# 交集
ZINTERSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE SUM|MIN|MAX]
ZINTERSTORE result 2 set1 set2
ZINTERSTORE result 2 set1 set2 WEIGHTS 0.5 1.5
ZINTERSTORE result 2 set1 set2 AGGREGATE MIN
6.3 Python实现示例
6.3.1 基础有序集合类
import redis
from typing import List, Tuple, Optional, Dict, Any, Union
import time
class RedisSortedSet:
    """Object-oriented wrapper around a single Redis sorted-set (ZSET) key.

    Every method delegates to the matching Z* command on the key supplied
    at construction time; instances hold no state beyond the client and
    the key name.
    """

    def __init__(self, redis_client: redis.Redis, key: str):
        self.redis_client = redis_client
        self.key = key

    def add(self, score: float, member: str, **kwargs) -> int:
        """Add or update one member.

        Extra keyword arguments (nx=True, xx=True, ch=True, ...) are
        forwarded to ZADD. Returns the number of newly added members
        (or added+changed when ch=True).
        """
        return self.redis_client.zadd(self.key, {member: score}, **kwargs)

    def add_multiple(self, score_members: Dict[str, float], **kwargs) -> int:
        """Add or update several members with a single ZADD."""
        return self.redis_client.zadd(self.key, score_members, **kwargs)

    def increment(self, member: str, amount: float = 1) -> float:
        """Atomically add *amount* to the member's score; returns the new score."""
        return self.redis_client.zincrby(self.key, amount, member)

    def score(self, member: str) -> Optional[float]:
        """Return the member's score, or None when it is not in the set."""
        return self.redis_client.zscore(self.key, member)

    def rank(self, member: str, reverse: bool = False) -> Optional[int]:
        """Return the member's 0-based rank (None if absent).

        reverse=False ranks by ascending score (ZRANK); reverse=True by
        descending score (ZREVRANK).
        """
        if reverse:
            return self.redis_client.zrevrank(self.key, member)
        return self.redis_client.zrank(self.key, member)

    def size(self) -> int:
        """Return the number of members (ZCARD)."""
        return self.redis_client.zcard(self.key)

    def range_by_rank(self, start: int = 0, end: int = -1,
                      reverse: bool = False, withscores: bool = False) -> List:
        """Return members in the inclusive 0-based rank window [start, end]."""
        if reverse:
            return self.redis_client.zrevrange(self.key, start, end, withscores=withscores)
        return self.redis_client.zrange(self.key, start, end, withscores=withscores)

    def range_by_score(self, min_score: Union[float, str] = '-inf',
                       max_score: Union[float, str] = '+inf',
                       reverse: bool = False, withscores: bool = False,
                       offset: int = None, count: int = None) -> List:
        """Return members whose score lies in [min_score, max_score].

        *offset* and *count* map to the LIMIT clause; both must be given
        for the limit to take effect.
        """
        kwargs = {'withscores': withscores}
        if offset is not None and count is not None:
            kwargs['start'] = offset
            kwargs['num'] = count
        if reverse:
            return self.redis_client.zrevrangebyscore(self.key, max_score, min_score, **kwargs)
        return self.redis_client.zrangebyscore(self.key, min_score, max_score, **kwargs)

    def count_by_score(self, min_score: Union[float, str] = '-inf',
                       max_score: Union[float, str] = '+inf') -> int:
        """Count members whose score lies in [min_score, max_score] (ZCOUNT)."""
        return self.redis_client.zcount(self.key, min_score, max_score)

    def remove(self, *members: str) -> int:
        """Remove the given members; returns how many were removed."""
        return self.redis_client.zrem(self.key, *members)

    def remove_by_rank(self, start: int, end: int) -> int:
        """Remove members in the rank window [start, end]."""
        return self.redis_client.zremrangebyrank(self.key, start, end)

    def remove_by_score(self, min_score: Union[float, str],
                        max_score: Union[float, str]) -> int:
        """Remove members whose score lies in [min_score, max_score]."""
        return self.redis_client.zremrangebyscore(self.key, min_score, max_score)

    def exists(self, member: str) -> bool:
        """Return True when the member is present (ZSCORE is not None)."""
        return self.score(member) is not None

    def get_all(self, withscores: bool = True) -> List:
        """Return every member, lowest score first."""
        return self.range_by_rank(withscores=withscores)

    def get_top(self, n: int, withscores: bool = True) -> List:
        """Return the n highest-scoring members, best first.

        Fixed edge case: n <= 0 now returns [] — previously the rank
        window became (0, -1), which made ZREVRANGE return the entire set.
        """
        if n <= 0:
            return []
        return self.range_by_rank(0, n - 1, reverse=True, withscores=withscores)

    def get_bottom(self, n: int, withscores: bool = True) -> List:
        """Return the n lowest-scoring members; [] when n <= 0 (same fix as get_top)."""
        if n <= 0:
            return []
        return self.range_by_rank(0, n - 1, withscores=withscores)

    def clear(self) -> bool:
        """Delete the whole set; True when the key existed."""
        return self.redis_client.delete(self.key) > 0
# Usage example
if __name__ == "__main__":
    redis_client = redis.Redis(decode_responses=True)
    leaderboard = RedisSortedSet(redis_client, "game_leaderboard")
    # Add individual player scores
    leaderboard.add(100, "player1")
    leaderboard.add(85, "player2")
    leaderboard.add(92, "player3")
    # Bulk insert via a single ZADD
    leaderboard.add_multiple({
        "player4": 78,
        "player5": 95,
        "player6": 88
    })
    # Atomically increase a score
    new_score = leaderboard.increment("player1", 10)
    print(f"player1新分数: {new_score}")
    # Rank (descending, 0-based) and score lookups
    rank = leaderboard.rank("player1", reverse=True)  # rank with the highest score first
    score = leaderboard.score("player1")
    print(f"player1排名: {rank + 1}, 分数: {score}")
    # Top 3 by score
    top3 = leaderboard.get_top(3)
    print(f"前3名: {top3}")
    # Players scoring between 80 and 100 (inclusive)
    players_80_100 = leaderboard.range_by_score(80, 100, withscores=True)
    print(f"分数80-100的玩家: {players_80_100}")
    # How many players scored 90 or more
    high_score_count = leaderboard.count_by_score(90, '+inf')
    print(f"高分玩家数量: {high_score_count}")
6.3.2 游戏排行榜系统
import time
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import List, Dict, Optional, Tuple, Any

import redis
class LeaderboardType(Enum):
    """Leaderboard aggregation periods; values appear in the Redis key names."""
    DAILY = "daily"
    WEEKLY = "weekly"
    MONTHLY = "monthly"
    ALL_TIME = "all_time"
@dataclass
class PlayerScore:
    """One player's position on a single leaderboard."""
    player_id: str
    score: float
    rank: int         # 1-based rank (1 = best)
    timestamp: float  # unix time when this snapshot was taken
@dataclass
class LeaderboardEntry:
    """One display row of a leaderboard listing."""
    player_id: str
    player_name: str
    score: float
    rank: int        # 1-based rank
    change: int = 0  # rank movement (positive = moved up); 0 when not computed
class GameLeaderboard:
    """Multi-period game leaderboard (daily / weekly / monthly / all-time).

    Storage layout (all keys include the game id):
      - one ZSET per period:      member = player_id, score = best score
      - display names:            HASH player_id -> name
      - per-player history ZSET:  member = "<timestamp>:<score>",
                                  score  = submission timestamp

    Bug fix vs. the original version: history entries were stored with
    the *game score* as the ZSET score and the bare timestamp as the
    member, yet both history readers (get_score_history,
    get_top_players_by_period) issue ZRANGEBYSCORE with a *time* range —
    so time-window queries could never match. History is now scored by
    timestamp, with the game score encoded inside the member.
    """

    def __init__(self, redis_client: redis.Redis, game_id: str):
        self.redis_client = redis_client
        self.game_id = game_id
        # One ZSET per leaderboard period.
        self.leaderboards = {
            LeaderboardType.DAILY: f"leaderboard:{game_id}:daily",
            LeaderboardType.WEEKLY: f"leaderboard:{game_id}:weekly",
            LeaderboardType.MONTHLY: f"leaderboard:{game_id}:monthly",
            LeaderboardType.ALL_TIME: f"leaderboard:{game_id}:all_time"
        }
        # HASH: player_id -> display name.
        self.player_info_key = f"players:{game_id}"
        # Prefix for per-player score-history ZSETs.
        self.score_history_prefix = f"score_history:{game_id}"

    @staticmethod
    def _encode_history_member(timestamp: float, score: float) -> str:
        """Pack one history sample into a unique ZSET member string."""
        return f"{timestamp}:{score}"

    @staticmethod
    def _decode_history_member(member: str) -> Tuple[float, float]:
        """Inverse of _encode_history_member -> (timestamp, score).

        Splitting on the first ':' is safe: float repr contains no ':'.
        """
        timestamp_str, score_str = member.split(':', 1)
        return float(timestamp_str), float(score_str)

    def submit_score(self, player_id: str, player_name: str, score: float) -> Dict[str, Any]:
        """Record a score and report old/new 1-based ranks per period.

        A leaderboard entry is only overwritten when the new score beats
        the stored one ("personal best" semantics); a history sample is
        always appended.
        """
        timestamp = time.time()
        # Remember the display name
        self.redis_client.hset(self.player_info_key, player_id, player_name)
        # Snapshot current 1-based ranks so we can report movement later
        old_ranks = {}
        for lb_type in LeaderboardType:
            old_rank = self.redis_client.zrevrank(self.leaderboards[lb_type], player_id)
            old_ranks[lb_type] = old_rank + 1 if old_rank is not None else None
        pipe = self.redis_client.pipeline()
        # Update each board only when the new score is an improvement
        for lb_type in LeaderboardType:
            key = self.leaderboards[lb_type]
            current_score = self.redis_client.zscore(key, player_id)
            if current_score is None or score > current_score:
                pipe.zadd(key, {player_id: score})
        # Append to the history ZSET, scored by submission time
        history_key = f"{self.score_history_prefix}:{player_id}"
        pipe.zadd(history_key, {self._encode_history_member(timestamp, score): timestamp})
        # Refresh retention windows.  NOTE: the TTL is reset on every
        # submission, so an active board effectively never expires.
        pipe.expire(self.leaderboards[LeaderboardType.DAILY], 86400)      # 1 day
        pipe.expire(self.leaderboards[LeaderboardType.WEEKLY], 604800)    # 7 days
        pipe.expire(self.leaderboards[LeaderboardType.MONTHLY], 2592000)  # 30 days
        pipe.expire(history_key, 2592000)                                 # 30 days
        pipe.execute()
        # Re-read ranks and compute movement (positive delta = moved up)
        new_ranks = {}
        rank_changes = {}
        for lb_type in LeaderboardType:
            new_rank = self.redis_client.zrevrank(self.leaderboards[lb_type], player_id)
            new_ranks[lb_type] = new_rank + 1 if new_rank is not None else None
            if old_ranks[lb_type] and new_ranks[lb_type]:
                rank_changes[lb_type] = old_ranks[lb_type] - new_ranks[lb_type]
            else:
                rank_changes[lb_type] = 0
        return {
            'player_id': player_id,
            'score': score,
            'timestamp': timestamp,
            'old_ranks': old_ranks,
            'new_ranks': new_ranks,
            'rank_changes': rank_changes
        }

    def get_leaderboard(self, lb_type: LeaderboardType, start: int = 0,
                        end: int = 9, with_rank_change: bool = False) -> List[LeaderboardEntry]:
        """Return entries for the 0-based rank window [start, end], best first."""
        key = self.leaderboards[lb_type]
        results = self.redis_client.zrevrange(key, start, end, withscores=True)
        entries = []
        for i, (player_id, score) in enumerate(results):
            # Resolve the display name, falling back to a synthetic one
            player_name = self.redis_client.hget(self.player_info_key, player_id)
            if not player_name:
                player_name = f"Player_{player_id}"
            rank = start + i + 1
            change = 0
            if with_rank_change:
                change = self._calculate_rank_change(player_id, lb_type, rank)
            entries.append(LeaderboardEntry(
                player_id=player_id,
                player_name=player_name,
                score=score,
                rank=rank,
                change=change
            ))
        return entries

    def get_player_rank(self, player_id: str, lb_type: LeaderboardType) -> Optional[PlayerScore]:
        """Return one player's 1-based rank and score, or None when absent."""
        key = self.leaderboards[lb_type]
        rank = self.redis_client.zrevrank(key, player_id)
        score = self.redis_client.zscore(key, player_id)
        if rank is None or score is None:
            return None
        return PlayerScore(
            player_id=player_id,
            score=score,
            rank=rank + 1,
            timestamp=time.time()
        )

    def get_players_around(self, player_id: str, lb_type: LeaderboardType,
                           range_size: int = 5) -> List[LeaderboardEntry]:
        """Return entries surrounding a player (range_size above and below)."""
        key = self.leaderboards[lb_type]
        player_rank = self.redis_client.zrevrank(key, player_id)
        if player_rank is None:
            return []
        start = max(0, player_rank - range_size)
        end = player_rank + range_size
        return self.get_leaderboard(lb_type, start, end)

    def get_score_history(self, player_id: str, days: int = 7) -> List[Tuple[datetime, float]]:
        """Return (submission time, score) pairs from the last *days* days."""
        history_key = f"{self.score_history_prefix}:{player_id}"
        end_time = time.time()
        start_time = end_time - (days * 86400)
        # History is scored by timestamp, so this is a true time-range query
        results = self.redis_client.zrangebyscore(history_key, start_time, end_time)
        history = []
        for member in results:
            timestamp, score = self._decode_history_member(member)
            history.append((datetime.fromtimestamp(timestamp), score))
        return history

    def get_leaderboard_stats(self, lb_type: LeaderboardType) -> Dict[str, Any]:
        """Return simple descriptive statistics for one leaderboard."""
        key = self.leaderboards[lb_type]
        total_players = self.redis_client.zcard(key)
        if total_players == 0:
            return {'total_players': 0}
        all_scores = [score for _, score in self.redis_client.zrange(key, 0, -1, withscores=True)]
        return {
            'total_players': total_players,
            'highest_score': max(all_scores),
            'lowest_score': min(all_scores),
            'average_score': sum(all_scores) / len(all_scores),
            # Upper median for even-sized boards
            'median_score': sorted(all_scores)[len(all_scores) // 2]
        }

    def reset_leaderboard(self, lb_type: LeaderboardType) -> bool:
        """Delete one leaderboard; True when the key existed."""
        key = self.leaderboards[lb_type]
        return self.redis_client.delete(key) > 0

    def _calculate_rank_change(self, player_id: str, lb_type: LeaderboardType, current_rank: int) -> int:
        """Placeholder for rank-movement tracking (e.g. vs. yesterday's board)."""
        return 0

    def get_top_players_by_period(self, period_hours: int = 24, limit: int = 10) -> List[LeaderboardEntry]:
        """Build a temporary board from score history and return its top entries.

        Scans every player's history ZSET (assumes decode_responses=True
        so keys are str — TODO confirm at the call site), ranks each
        player by their best score inside the window, and always cleans
        up the temporary key.
        """
        temp_key = f"temp_leaderboard:{self.game_id}:{int(time.time())}"
        try:
            end_time = time.time()
            start_time = end_time - (period_hours * 3600)
            for key in self.redis_client.scan_iter(match=f"{self.score_history_prefix}:*"):
                player_id = key.split(':')[-1]
                # All samples for this player inside the time window
                members = self.redis_client.zrangebyscore(key, start_time, end_time)
                if members:
                    max_score = max(self._decode_history_member(m)[1] for m in members)
                    self.redis_client.zadd(temp_key, {player_id: max_score})
            results = self.redis_client.zrevrange(temp_key, 0, limit - 1, withscores=True)
            entries = []
            for i, (player_id, score) in enumerate(results):
                player_name = self.redis_client.hget(self.player_info_key, player_id) or f"Player_{player_id}"
                entries.append(LeaderboardEntry(
                    player_id=player_id,
                    player_name=player_name,
                    score=score,
                    rank=i + 1
                ))
            return entries
        finally:
            # Always drop the temporary key
            self.redis_client.delete(temp_key)
# Usage example
if __name__ == "__main__":
    redis_client = redis.Redis(decode_responses=True)
    leaderboard = GameLeaderboard(redis_client, "puzzle_game")
    # Submit a few scores
    result = leaderboard.submit_score("user123", "Alice", 1500)
    print(f"分数提交结果: {result}")
    result = leaderboard.submit_score("user456", "Bob", 1200)
    print(f"分数提交结果: {result}")
    result = leaderboard.submit_score("user789", "Charlie", 1800)
    print(f"分数提交结果: {result}")
    # Daily top 10
    daily_top = leaderboard.get_leaderboard(LeaderboardType.DAILY, 0, 9)
    print("\n日榜前10名:")
    for entry in daily_top:
        print(f"{entry.rank}. {entry.player_name}: {entry.score}")
    # A single player's rank and score
    player_rank = leaderboard.get_player_rank("user123", LeaderboardType.DAILY)
    if player_rank:
        print(f"\nAlice的排名: {player_rank.rank}, 分数: {player_rank.score}")
    # Players ranked around Alice (2 on each side)
    around_players = leaderboard.get_players_around("user123", LeaderboardType.DAILY, 2)
    print("\nAlice周围的玩家:")
    for entry in around_players:
        print(f"{entry.rank}. {entry.player_name}: {entry.score}")
    # Aggregate statistics for the daily board
    stats = leaderboard.get_leaderboard_stats(LeaderboardType.DAILY)
    print(f"\n排行榜统计: {stats}")
6.4 实际应用场景
6.4.1 任务优先级队列
import redis
import json
import time
from typing import Optional, List, Dict, Any
from dataclasses import dataclass, asdict
from enum import Enum
class TaskPriority(Enum):
    """Task urgency levels; a higher value means more urgent."""
    LOW = 1
    NORMAL = 5
    HIGH = 10
    URGENT = 20
    CRITICAL = 50
class TaskStatus(Enum):
    """Lifecycle states of a queued task."""
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
@dataclass
class Task:
    """One unit of work; serialized to JSON via asdict() for storage."""
    task_id: str                # unique id, used as the ZSET member
    task_type: str              # handler selector, e.g. "send_email"
    payload: Dict[str, Any]     # handler-specific arguments
    priority: TaskPriority
    created_at: float           # unix timestamp of creation
    scheduled_at: Optional[float] = None  # unix time for delayed execution; None = run ASAP
    max_retries: int = 3
    retry_count: int = 0        # incremented on each failed attempt
    timeout: int = 300          # processing deadline in seconds (5 minutes)
class PriorityTaskQueue:
    """Priority task queue built on Redis sorted sets.

    Queues:
      - pending:    ZSET, score = ordering key (see _priority_score) or the
                    absolute due timestamp for delayed tasks
      - processing: ZSET, score = deadline (dequeue time + task.timeout)
      - completed / failed: ZSET, score = completion / failure time
    Task payloads live as JSON strings under task_data:<queue>:<task_id>.

    Bug fix vs. the original version: dequeue() uses BZPOPMIN, which pops
    the member with the LOWEST score, but scores were computed as
    ``priority * 1_000_000 - created_at`` — making LOW-priority tasks pop
    first. Scores are now negated (see _priority_score) so higher
    priority, and earlier creation within a priority, pop first.
    """

    def __init__(self, redis_client: redis.Redis, queue_name: str):
        self.redis_client = redis_client
        self.queue_name = queue_name
        self.pending_queue = f"queue:{queue_name}:pending"
        self.processing_queue = f"queue:{queue_name}:processing"
        self.completed_queue = f"queue:{queue_name}:completed"
        self.failed_queue = f"queue:{queue_name}:failed"
        self.task_data_prefix = f"task_data:{queue_name}"
        self.worker_heartbeat_prefix = f"worker_heartbeat:{queue_name}"

    @staticmethod
    def _priority_score(priority_value: int, timestamp: float, retry_count: int = 0) -> float:
        """Ordering key for the pending queue: smaller pops first.

        Higher priority gives a more negative base; an earlier timestamp
        gives a smaller score; each retry adds a penalty so retried tasks
        sort after fresh tasks of the same priority.
        """
        return -(priority_value * 1_000_000) + timestamp + retry_count * 1000

    def enqueue(self, task: Task) -> bool:
        """Store the task payload and add its id to the pending queue."""
        try:
            task_key = f"{self.task_data_prefix}:{task.task_id}"
            task_data = asdict(task)
            task_data['priority'] = task.priority.value  # enums are not JSON-serializable
            self.redis_client.setex(task_key, 3600, json.dumps(task_data))  # payload expires in 1 h
            score = self._priority_score(task.priority.value, task.created_at)
            # Delayed task: use the absolute due time as the score. It is
            # always larger than any runnable score (now - priority*1e6),
            # so delayed tasks sort after every runnable task until due.
            if task.scheduled_at and task.scheduled_at > time.time():
                score = task.scheduled_at
            self.redis_client.zadd(self.pending_queue, {task.task_id: score})
            return True
        except Exception as e:
            print(f"入队失败: {e}")
            return False

    def dequeue(self, worker_id: str, timeout: int = 10) -> Optional[Task]:
        """Blocking pop of the most urgent runnable task.

        Returns None on timeout, when the popped task is a delayed task
        that is not yet due (it is pushed back), or when its payload has
        expired.
        """
        try:
            result = self.redis_client.bzpopmin(self.pending_queue, timeout=timeout)
            if not result:
                return None
            _, task_id, score = result
            # A score in the future can only be a delayed task's due time
            if score > time.time():
                # Not due yet: push it back and report "nothing runnable"
                self.redis_client.zadd(self.pending_queue, {task_id: score})
                return None
            task = self._get_task_data(task_id)
            if not task:
                return None
            # Track the task in the processing queue, scored by its deadline
            processing_score = time.time() + task.timeout
            self.redis_client.zadd(self.processing_queue, {task_id: processing_score})
            self._update_worker_heartbeat(worker_id, task_id)
            return task
        except Exception as e:
            print(f"出队失败: {e}")
            return None

    def complete_task(self, task_id: str, worker_id: str, result: Dict[str, Any] = None) -> bool:
        """Mark a task finished and optionally store its result for 1 h."""
        try:
            self.redis_client.zrem(self.processing_queue, task_id)
            self.redis_client.zadd(self.completed_queue, {task_id: time.time()})
            if result:
                result_key = f"{self.task_data_prefix}:{task_id}:result"
                self.redis_client.setex(result_key, 3600, json.dumps(result))
            self._clear_worker_heartbeat(worker_id)
            return True
        except Exception as e:
            print(f"完成任务失败: {e}")
            return False

    def fail_task(self, task_id: str, worker_id: str, error: str, retry: bool = True) -> bool:
        """Handle a failed task: re-enqueue with a retry penalty, or park it.

        Returns False when the task payload can no longer be loaded.
        """
        try:
            task = self._get_task_data(task_id)
            if not task:
                return False
            self.redis_client.zrem(self.processing_queue, task_id)
            if retry and task.retry_count < task.max_retries:
                task.retry_count += 1
                # Persist the bumped retry counter
                task_key = f"{self.task_data_prefix}:{task_id}"
                task_data = asdict(task)
                task_data['priority'] = task.priority.value
                self.redis_client.setex(task_key, 3600, json.dumps(task_data))
                # Re-enqueue; the retry penalty sorts it after fresh tasks
                # of the same priority (same scoring scheme as enqueue).
                retry_score = self._priority_score(task.priority.value, time.time(), task.retry_count)
                self.redis_client.zadd(self.pending_queue, {task_id: retry_score})
            else:
                # Out of retries (or retry disabled): park in the failed queue
                self.redis_client.zadd(self.failed_queue, {task_id: time.time()})
                error_key = f"{self.task_data_prefix}:{task_id}:error"
                self.redis_client.setex(error_key, 3600, error)
            self._clear_worker_heartbeat(worker_id)
            return True
        except Exception as e:
            print(f"任务失败处理失败: {e}")
            return False

    def get_queue_stats(self) -> Dict[str, int]:
        """Return the member count of each of the four queues."""
        return {
            'pending': self.redis_client.zcard(self.pending_queue),
            'processing': self.redis_client.zcard(self.processing_queue),
            'completed': self.redis_client.zcard(self.completed_queue),
            'failed': self.redis_client.zcard(self.failed_queue)
        }

    def get_pending_tasks(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Return up to *limit* pending tasks in pop order, with their scores."""
        tasks = []
        results = self.redis_client.zrange(self.pending_queue, 0, limit - 1, withscores=True)
        for task_id, score in results:
            task = self._get_task_data(task_id)
            if task:
                task_info = asdict(task)
                task_info['score'] = score
                # A future score can only be a delayed task's due time
                task_info['scheduled_time'] = score if score > time.time() else None
                tasks.append(task_info)
        return tasks

    def cleanup_timeout_tasks(self) -> int:
        """Fail (and possibly retry) every task whose processing deadline passed."""
        current_time = time.time()
        timeout_tasks = self.redis_client.zrangebyscore(
            self.processing_queue, 0, current_time
        )
        cleaned_count = 0
        for task_id in timeout_tasks:
            self.fail_task(task_id, "system", "Task timeout", retry=True)
            cleaned_count += 1
        return cleaned_count

    def _get_task_data(self, task_id: str) -> Optional[Task]:
        """Load and deserialize one task payload; None when expired or corrupt."""
        task_key = f"{self.task_data_prefix}:{task_id}"
        task_data = self.redis_client.get(task_key)
        if not task_data:
            return None
        try:
            data = json.loads(task_data)
            data['priority'] = TaskPriority(data['priority'])  # back to the enum
            return Task(**data)
        except Exception as e:
            print(f"解析任务数据失败: {e}")
            return None

    def _update_worker_heartbeat(self, worker_id: str, task_id: str):
        """Record which task a worker currently holds; entry expires in 60 s."""
        heartbeat_key = f"{self.worker_heartbeat_prefix}:{worker_id}"
        heartbeat_data = {
            'task_id': task_id,
            'timestamp': time.time()
        }
        self.redis_client.setex(heartbeat_key, 60, json.dumps(heartbeat_data))

    def _clear_worker_heartbeat(self, worker_id: str):
        """Drop a worker's heartbeat entry."""
        heartbeat_key = f"{self.worker_heartbeat_prefix}:{worker_id}"
        self.redis_client.delete(heartbeat_key)
# Usage example
if __name__ == "__main__":
    redis_client = redis.Redis(decode_responses=True)
    queue = PriorityTaskQueue(redis_client, "email_queue")
    # Build some tasks
    task1 = Task(
        task_id="task_001",
        task_type="send_email",
        payload={"to": "user@example.com", "subject": "Welcome"},
        priority=TaskPriority.NORMAL,
        created_at=time.time()
    )
    task2 = Task(
        task_id="task_002",
        task_type="send_sms",
        payload={"phone": "+1234567890", "message": "Verification code: 123456"},
        priority=TaskPriority.URGENT,
        created_at=time.time()
    )
    # A delayed task
    task3 = Task(
        task_id="task_003",
        task_type="send_reminder",
        payload={"user_id": "123", "message": "Meeting in 1 hour"},
        priority=TaskPriority.HIGH,
        created_at=time.time(),
        scheduled_at=time.time() + 3600  # run one hour from now
    )
    # Enqueue them
    queue.enqueue(task1)
    queue.enqueue(task2)
    queue.enqueue(task3)
    print(f"队列统计: {queue.get_queue_stats()}")
    # Simulate one worker
    worker_id = "worker_001"
    # The highest-priority runnable task pops first
    task = queue.dequeue(worker_id, timeout=1)
    if task:
        print(f"处理任务: {task.task_id}, 类型: {task.task_type}, 优先级: {task.priority}")
        # Pretend to do the work
        time.sleep(1)
        # Acknowledge completion and store the result
        queue.complete_task(task.task_id, worker_id, {"status": "sent"})
        print(f"任务 {task.task_id} 完成")
    # Inspect what is still pending
    pending_tasks = queue.get_pending_tasks(5)
    print(f"\n待处理任务: {len(pending_tasks)}")
    for task_info in pending_tasks:
        print(f"- {task_info['task_id']}: {task_info['task_type']} (优先级: {task_info['priority']})")
    print(f"\n最终队列统计: {queue.get_queue_stats()}")
6.4.2 时间序列数据存储
import redis
import time
import json
from typing import List, Dict, Any, Optional, Tuple
from datetime import datetime, timedelta
from dataclasses import dataclass
from enum import Enum
class MetricType(Enum):
    """Supported metric kinds (declared for callers; not referenced by the code below)."""
    COUNTER = "counter"
    GAUGE = "gauge"
    HISTOGRAM = "histogram"
@dataclass
class DataPoint:
    """One time-series sample."""
    timestamp: float  # unix time; used as the ZSET score
    value: float
    tags: Optional[Dict[str, str]] = None  # optional labels; part of the series key
@dataclass
class MetricSummary:
    """Aggregate statistics over a set of samples."""
    count: int
    sum: float
    min: float
    max: float
    avg: float
class TimeSeriesDB:
    """Minimal time-series store: one ZSET per (metric, tag-set) series.

    Each data point is a JSON-blob member scored by its timestamp, so
    time-range queries map directly onto ZRANGEBYSCORE.
    """

    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
        self.metrics_prefix = "metrics"    # prefix for data-point ZSETs
        self.metadata_prefix = "metadata"  # prefix for per-metric metadata strings

    def _get_metric_key(self, metric_name: str, tags: Dict[str, str] = None) -> str:
        """Build the series key; tags are sorted so the key is canonical."""
        if tags:
            tag_str = ":".join([f"{k}={v}" for k, v in sorted(tags.items())])
            return f"{self.metrics_prefix}:{metric_name}:{tag_str}"
        return f"{self.metrics_prefix}:{metric_name}"

    def write_point(self, metric_name: str, value: float,
                    timestamp: float = None, tags: Dict[str, str] = None) -> bool:
        """Write one sample (defaults to "now"); returns True on success.

        NOTE(review): two writes with identical value, timestamp and tags
        produce the same member and collapse into a single entry.
        """
        if timestamp is None:
            timestamp = time.time()
        try:
            key = self._get_metric_key(metric_name, tags)
            # The timestamp is the ZSET score; the JSON blob is the member.
            score = timestamp
            member = json.dumps({
                'value': value,
                'timestamp': timestamp,
                'tags': tags or {}
            })
            self.redis_client.zadd(key, {member: score})
            # Refresh the metric's metadata record
            self._update_metadata(metric_name, tags)
            return True
        except Exception as e:
            print(f"写入数据点失败: {e}")
            return False

    def write_points(self, metric_name: str, points: List[DataPoint],
                     tags: Dict[str, str] = None) -> int:
        """Batch-write samples through one pipeline.

        Per-point tags are merged over the call-level tags, so points may
        land in different series. Returns the sum of ZADD replies (the
        number of newly added members).
        """
        # NOTE(review): this key is never used; each point derives its own
        # key from the merged tags below.
        key = self._get_metric_key(metric_name, tags)
        try:
            pipe = self.redis_client.pipeline()
            for point in points:
                combined_tags = {**(tags or {}), **(point.tags or {})}
                point_key = self._get_metric_key(metric_name, combined_tags)
                member = json.dumps({
                    'value': point.value,
                    'timestamp': point.timestamp,
                    'tags': combined_tags
                })
                pipe.zadd(point_key, {member: point.timestamp})
            results = pipe.execute()
            return sum(results)
        except Exception as e:
            print(f"批量写入失败: {e}")
            return 0

    def query_range(self, metric_name: str, start_time: float, end_time: float,
                    tags: Dict[str, str] = None, limit: int = None) -> List[DataPoint]:
        """Return samples with start_time <= timestamp <= end_time."""
        key = self._get_metric_key(metric_name, tags)
        try:
            kwargs = {'withscores': True}
            if limit:
                # Map limit onto ZRANGEBYSCORE's "LIMIT offset count"
                kwargs['start'] = 0
                kwargs['num'] = limit
            results = self.redis_client.zrangebyscore(
                key, start_time, end_time, **kwargs
            )
            points = []
            for member, score in results:
                try:
                    data = json.loads(member)
                    points.append(DataPoint(
                        timestamp=data['timestamp'],
                        value=data['value'],
                        tags=data.get('tags', {})
                    ))
                except json.JSONDecodeError:
                    # Skip members that are not valid JSON
                    continue
            return points
        except Exception as e:
            print(f"查询失败: {e}")
            return []

    def query_latest(self, metric_name: str, count: int = 1,
                     tags: Dict[str, str] = None) -> List[DataPoint]:
        """Return the *count* most recent samples, newest first."""
        key = self._get_metric_key(metric_name, tags)
        try:
            results = self.redis_client.zrevrange(key, 0, count-1, withscores=True)
            points = []
            for member, score in results:
                try:
                    data = json.loads(member)
                    points.append(DataPoint(
                        timestamp=data['timestamp'],
                        value=data['value'],
                        tags=data.get('tags', {})
                    ))
                except json.JSONDecodeError:
                    continue
            return points
        except Exception as e:
            print(f"查询最新数据失败: {e}")
            return []

    def aggregate(self, metric_name: str, start_time: float, end_time: float,
                  tags: Dict[str, str] = None) -> Optional[MetricSummary]:
        """Summarize a time range; None when it contains no samples."""
        points = self.query_range(metric_name, start_time, end_time, tags)
        if not points:
            return None
        values = [p.value for p in points]
        return MetricSummary(
            count=len(values),
            sum=sum(values),
            min=min(values),
            max=max(values),
            avg=sum(values) / len(values)
        )

    def downsample(self, metric_name: str, start_time: float, end_time: float,
                   interval: int, aggregation: str = 'avg',
                   tags: Dict[str, str] = None) -> List[Tuple[float, float]]:
        """Bucket samples into *interval*-second windows and aggregate each.

        *aggregation* is one of 'avg', 'sum', 'min', 'max', 'count';
        anything else falls back to the average. Returns
        (bucket_start, value) pairs in chronological order.
        """
        points = self.query_range(metric_name, start_time, end_time, tags)
        if not points:
            return []
        # Group values by bucket start (timestamp floored to the interval)
        buckets = {}
        for point in points:
            bucket_time = int(point.timestamp // interval) * interval
            if bucket_time not in buckets:
                buckets[bucket_time] = []
            buckets[bucket_time].append(point.value)
        # Reduce every bucket with the requested aggregation
        result = []
        for bucket_time in sorted(buckets.keys()):
            values = buckets[bucket_time]
            if aggregation == 'avg':
                agg_value = sum(values) / len(values)
            elif aggregation == 'sum':
                agg_value = sum(values)
            elif aggregation == 'min':
                agg_value = min(values)
            elif aggregation == 'max':
                agg_value = max(values)
            elif aggregation == 'count':
                agg_value = len(values)
            else:
                agg_value = sum(values) / len(values)  # default: average
            result.append((bucket_time, agg_value))
        return result

    def delete_range(self, metric_name: str, start_time: float, end_time: float,
                     tags: Dict[str, str] = None) -> int:
        """Delete samples in the time range; returns the number removed."""
        key = self._get_metric_key(metric_name, tags)
        try:
            return self.redis_client.zremrangebyscore(key, start_time, end_time)
        except Exception as e:
            print(f"删除数据失败: {e}")
            return 0

    def cleanup_old_data(self, metric_name: str, retention_days: int,
                         tags: Dict[str, str] = None) -> int:
        """Drop everything older than the retention window."""
        cutoff_time = time.time() - (retention_days * 86400)
        return self.delete_range(metric_name, 0, cutoff_time, tags)

    def get_metrics_list(self) -> List[str]:
        """Return the distinct metric names.

        NOTE(review): KEYS blocks the server on large keyspaces; SCAN
        would be preferable in production.
        """
        pattern = f"{self.metrics_prefix}:*"
        keys = self.redis_client.keys(pattern)
        metrics = set()
        for key in keys:
            # Key layout is metrics:<name>[:<tags>] — the name is field 1
            parts = key.split(':')
            if len(parts) >= 2:
                metrics.add(parts[1])
        return list(metrics)

    def get_metric_info(self, metric_name: str) -> Dict[str, Any]:
        """Return series count, total points and the time span for a metric."""
        pattern = f"{self.metrics_prefix}:{metric_name}*"
        keys = self.redis_client.keys(pattern)
        total_points = 0
        series_count = len(keys)
        earliest_time = None
        latest_time = None
        for key in keys:
            count = self.redis_client.zcard(key)
            total_points += count
            if count > 0:
                # First/last member by score = oldest/newest sample
                earliest = self.redis_client.zrange(key, 0, 0, withscores=True)
                latest = self.redis_client.zrange(key, -1, -1, withscores=True)
                if earliest:
                    _, early_score = earliest[0]
                    if earliest_time is None or early_score < earliest_time:
                        earliest_time = early_score
                if latest:
                    _, late_score = latest[0]
                    if latest_time is None or late_score > latest_time:
                        latest_time = late_score
        return {
            'metric_name': metric_name,
            'series_count': series_count,
            'total_points': total_points,
            'earliest_time': earliest_time,
            'latest_time': latest_time,
            'time_range_hours': (latest_time - earliest_time) / 3600 if earliest_time and latest_time else 0
        }

    def _update_metadata(self, metric_name: str, tags: Dict[str, str] = None):
        """Record the last write time and tag keys for a metric (24 h TTL)."""
        metadata_key = f"{self.metadata_prefix}:{metric_name}"
        metadata = {
            'last_updated': time.time(),
            'tags': list(tags.keys()) if tags else []
        }
        self.redis_client.setex(metadata_key, 86400, json.dumps(metadata))
# Usage example
if __name__ == "__main__":
    redis_client = redis.Redis(decode_responses=True)
    tsdb = TimeSeriesDB(redis_client)
    # CPU-usage samples for one host/core
    current_time = time.time()
    # Single-point writes
    tsdb.write_point("cpu_usage", 75.5, current_time, {"host": "server1", "cpu": "0"})
    tsdb.write_point("cpu_usage", 82.3, current_time + 60, {"host": "server1", "cpu": "0"})
    # Batch write through one pipeline
    points = [
        DataPoint(current_time + 120, 68.9, {"host": "server1", "cpu": "0"}),
        DataPoint(current_time + 180, 91.2, {"host": "server1", "cpu": "0"}),
        DataPoint(current_time + 240, 77.8, {"host": "server1", "cpu": "0"})
    ]
    tsdb.write_points("cpu_usage", points)
    # Time-range query
    data_points = tsdb.query_range(
        "cpu_usage",
        current_time,
        current_time + 300,
        {"host": "server1", "cpu": "0"}
    )
    print(f"查询到 {len(data_points)} 个数据点:")
    for point in data_points:
        dt = datetime.fromtimestamp(point.timestamp)
        print(f" {dt.strftime('%H:%M:%S')}: {point.value}%")
    # Aggregate statistics over the same window
    summary = tsdb.aggregate(
        "cpu_usage",
        current_time,
        current_time + 300,
        {"host": "server1", "cpu": "0"}
    )
    if summary:
        print(f"\n统计信息:")
        print(f" 数据点数: {summary.count}")
        print(f" 平均值: {summary.avg:.2f}%")
        print(f" 最小值: {summary.min:.2f}%")
        print(f" 最大值: {summary.max:.2f}%")
    # Downsample to one point per 2 minutes
    downsampled = tsdb.downsample(
        "cpu_usage",
        current_time,
        current_time + 300,
        120,  # 2-minute buckets
        'avg',
        {"host": "server1", "cpu": "0"}
    )
    print(f"\n降采样结果 (2分钟间隔):")
    for timestamp, value in downsampled:
        dt = datetime.fromtimestamp(timestamp)
        print(f" {dt.strftime('%H:%M:%S')}: {value:.2f}%")
    # Metric metadata / span summary
    info = tsdb.get_metric_info("cpu_usage")
    print(f"\n指标信息: {info}")
6.5 性能优化
6.5.1 有序集合编码优化
# 查看有序集合编码
redis-cli
> OBJECT ENCODING myzset
# 配置优化参数
# redis.conf
zset-max-ziplist-entries 128 # ziplist最大元素数
zset-max-ziplist-value 64 # ziplist最大值长度
6.5.2 内存优化
import redis
from typing import List, Dict, Any, Iterator, Tuple, Union
class OptimizedRedisSortedSet:
    """Memory- and round-trip-conscious helpers for large sorted sets."""

    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client

    def batch_add(self, key: str, score_members: Dict[str, float], batch_size: int = 1000):
        """Add a large member->score mapping in chunks to bound command size."""
        items = list(score_members.items())
        for i in range(0, len(items), batch_size):
            batch = dict(items[i:i + batch_size])
            self.redis_client.zadd(key, batch)

    def memory_efficient_scan(self, key: str, pattern: str = "*", count: int = 10) -> Iterator[Tuple[str, float]]:
        """Lazily yield (member, score) pairs via incremental ZSCAN.

        *count* is only a per-round-trip hint to the server; duplicates
        are possible if the set changes while scanning.
        """
        cursor = 0
        while True:
            cursor, members = self.redis_client.zscan(key, cursor, match=pattern, count=count)
            for member, score in members:
                yield member, score
            # ZSCAN returns cursor 0 once the iteration is complete
            if cursor == 0:
                break

    def trim_by_rank(self, key: str, start: int, end: int, backup_key: str = None) -> int:
        """Keep only ranks [start, end]; everything else is removed.

        When *backup_key* is given, the removed members are copied there
        first. Returns the number of removed members.
        """
        if backup_key:
            # Copy the soon-to-be-removed head and tail before deleting
            removed_data = self.redis_client.zrange(key, 0, start-1, withscores=True)
            removed_data.extend(self.redis_client.zrange(key, end+1, -1, withscores=True))
            if removed_data:
                backup_dict = {member: score for member, score in removed_data}
                self.redis_client.zadd(backup_key, backup_dict)
        # Remove the head, then the tail. After the head removal every
        # remaining rank shifts down by removed1 — hence end+1-removed1.
        removed1 = self.redis_client.zremrangebyrank(key, 0, start-1) if start > 0 else 0
        removed2 = self.redis_client.zremrangebyrank(key, end+1-removed1, -1)
        return removed1 + removed2

    def compress_scores(self, key: str, precision: int = 2) -> int:
        """Round every score to *precision* decimals and rewrite the set.

        NOTE(review): delete-then-readd is not atomic — a concurrent
        reader can observe an empty set, and a crash in between loses the
        data. Returns the number of rewritten members.
        """
        all_data = self.redis_client.zrange(key, 0, -1, withscores=True)
        if not all_data:
            return 0
        # Drop the original set
        self.redis_client.delete(key)
        # Re-add with rounded scores
        compressed_data = {}
        for member, score in all_data:
            compressed_score = round(score, precision)
            compressed_data[member] = compressed_score
        self.redis_client.zadd(key, compressed_data)
        return len(compressed_data)
class PipelinedSortedSetOperations:
    """Helpers that batch several sorted-set commands into one pipeline round trip."""

    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client

    def batch_operations(self, operations: List[Dict[str, Any]]) -> List[Any]:
        """Queue every described operation on a pipeline and execute once.

        Each dict needs a 'type' ('zadd' | 'zrem' | 'zincrby' | 'zrange' |
        'zrank'), a 'key', and the arguments for that command. Replies come
        back in submission order; specs with an unknown type are skipped.
        """
        dispatch = {
            'zadd': lambda p, s: p.zadd(s['key'], s['score_members']),
            'zrem': lambda p, s: p.zrem(s['key'], *s['members']),
            'zincrby': lambda p, s: p.zincrby(s['key'], s['amount'], s['member']),
            'zrange': lambda p, s: p.zrange(
                s['key'], s['start'], s['end'],
                withscores=s.get('withscores', False)),
            'zrank': lambda p, s: p.zrank(s['key'], s['member']),
        }
        pipe = self.redis_client.pipeline()
        for spec in operations:
            handler = dispatch.get(spec['type'])
            if handler is not None:
                handler(pipe, spec)
        return pipe.execute()

    def multi_set_stats(self, keys: List[str]) -> Dict[str, Dict[str, Any]]:
        """Fetch size plus min/max score for every key in one round trip."""
        pipe = self.redis_client.pipeline()
        for key in keys:
            pipe.zcard(key)                            # element count
            pipe.zrange(key, 0, 0, withscores=True)    # lowest-scored entry
            pipe.zrange(key, -1, -1, withscores=True)  # highest-scored entry
        replies = pipe.execute()
        stats = {}
        # Replies arrive as consecutive (size, min, max) triples per key
        for idx, key in enumerate(keys):
            size, lowest, highest = replies[3 * idx: 3 * idx + 3]
            stats[key] = {
                'size': size,
                'min_score': lowest[0][1] if lowest else None,
                'max_score': highest[0][1] if highest else None
            }
        return stats
# 使用示例
# Usage example: requires a reachable local Redis server.
if __name__ == "__main__":
    redis_client = redis.Redis(decode_responses=True)
    optimized_zset = OptimizedRedisSortedSet(redis_client)
    # Bulk-load 10k members, 500 per ZADD round-trip.
    large_dataset = {f"member_{i}": i * 1.5 for i in range(10000)}
    optimized_zset.batch_add("large_leaderboard", large_dataset, batch_size=500)
    # Memory-efficient scan: iterate lazily instead of fetching the whole set.
    print("扫描前100个成员:")
    count = 0
    for member, score in optimized_zset.memory_efficient_scan("large_leaderboard", count=100):
        if count < 5:  # only display the first 5
            print(f" {member}: {score}")
        count += 1
        if count >= 100:
            break
    # Keep ranks 0-999 (top 1000); everything else is backed up, then removed.
    removed_count = optimized_zset.trim_by_rank(
        "large_leaderboard", 0, 999, "leaderboard_backup"
    )
    print(f"删除了 {removed_count} 个成员")
    # Round scores to 1 decimal place to reduce memory.
    compressed_count = optimized_zset.compress_scores("large_leaderboard", precision=1)
    print(f"压缩了 {compressed_count} 个成员的分数")
6.5.3 管道化操作
# Pipelined-operations example: several commands in one network round-trip.
# NOTE(review): relies on `redis_client` created in the example above.
pipelined_ops = PipelinedSortedSetOperations(redis_client)
# Describe a heterogeneous batch of sorted-set commands.
operations = [
    {'type': 'zadd', 'key': 'scores1', 'score_members': {'player1': 100, 'player2': 200}},
    {'type': 'zadd', 'key': 'scores2', 'score_members': {'player3': 150, 'player4': 250}},
    {'type': 'zrange', 'key': 'scores1', 'start': 0, 'end': -1, 'withscores': True},
    {'type': 'zrank', 'key': 'scores1', 'member': 'player1'}
]
results = pipelined_ops.batch_operations(operations)
print(f"批量操作结果: {results}")
# Size / min / max statistics for several sets at once.
stats = pipelined_ops.multi_set_stats(['scores1', 'scores2', 'large_leaderboard'])
print(f"多集合统计: {stats}")
6.6 监控和调试
6.6.1 有序集合监控
import redis
import time
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
@dataclass
class SortedSetInfo:
    """Point-in-time snapshot of a sorted set's size, memory and score range."""
    key: str                      # Redis key of the sorted set
    size: int                     # member count (ZCARD)
    memory_usage: int             # bytes reported by MEMORY USAGE (0 when unavailable)
    encoding: str                 # internal encoding from OBJECT ENCODING
    min_score: Optional[float]    # lowest score; None when the set is empty
    max_score: Optional[float]    # highest score; None when the set is empty
    score_range: Optional[float]  # max_score - min_score; None when the set is empty
class SortedSetMonitor:
    """Monitoring, profiling and diagnostic utilities for Redis sorted sets."""

    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client

    def get_sortedset_info(self, key: str) -> Optional[SortedSetInfo]:
        """Collect size, memory usage, encoding and score range for *key*.

        Returns:
            A SortedSetInfo, or None when the key is missing or a query fails.
        """
        if not self.redis_client.exists(key):
            return None
        try:
            size = self.redis_client.zcard(key)
            # MEMORY USAGE may return None; normalize to 0 bytes.
            memory_usage = self.redis_client.memory_usage(key) or 0
            encoding = self.redis_client.object('encoding', key)
            min_score = None
            max_score = None
            score_range = None
            if size > 0:
                # First/last ranked members carry the extreme scores.
                min_data = self.redis_client.zrange(key, 0, 0, withscores=True)
                max_data = self.redis_client.zrange(key, -1, -1, withscores=True)
                if min_data and max_data:
                    min_score = min_data[0][1]
                    max_score = max_data[0][1]
                    score_range = max_score - min_score
            return SortedSetInfo(
                key=key,
                size=size,
                memory_usage=memory_usage,
                encoding=encoding,
                min_score=min_score,
                max_score=max_score,
                score_range=score_range
            )
        except Exception as e:
            print(f"获取有序集合信息失败: {e}")
            return None

    def analyze_score_distribution(self, key: str, buckets: int = 10) -> Dict[str, Any]:
        """Histogram the set's scores into *buckets* equal-width ranges.

        Returns:
            Dict with per-bucket counts and percentages, or an 'error' entry
            when the set is empty or missing.
        """
        info = self.get_sortedset_info(key)
        if not info or info.size == 0:
            return {'error': '集合为空或不存在'}
        if info.score_range == 0:
            # All members share one score: a single degenerate bucket.
            return {
                'buckets': [{'range': f'{info.min_score}', 'count': info.size}],
                'total': info.size
            }
        # Equal-width bucketing across [min_score, max_score].
        bucket_size = info.score_range / buckets
        distribution = []
        for i in range(buckets):
            min_range = info.min_score + (i * bucket_size)
            max_range = info.min_score + ((i + 1) * bucket_size)
            if i == buckets - 1:  # last bucket includes the maximum score
                count = self.redis_client.zcount(key, min_range, max_range)
            else:
                # '(' prefix makes the upper bound exclusive in ZCOUNT, so
                # adjacent buckets never double-count boundary scores.
                count = self.redis_client.zcount(key, min_range, f'({max_range}')
            distribution.append({
                'range': f'{min_range:.2f} - {max_range:.2f}',
                'count': count,
                'percentage': (count / info.size) * 100
            })
        return {
            'buckets': distribution,
            'total': info.size,
            'min_score': info.min_score,
            'max_score': info.max_score
        }

    def monitor_operations(self, key: str, duration: int = 60) -> Dict[str, Any]:
        """Micro-benchmark basic commands against a temporary '<key>_test' set.

        NOTE(review): *duration* is only echoed back in the result — the
        method does not sample for that long. `start_time` and the
        `operations` counter dict below are unused placeholders for a
        fuller monitoring implementation.
        """
        start_time = time.time()
        operations = {
            'zadd': 0,
            'zrem': 0,
            'zrange': 0,
            'zrank': 0,
            'zscore': 0
        }
        # A real monitor would integrate Redis MONITOR or INFO commandstats;
        # this simplified version just times a few synthetic operations.
        test_data = {f'test_member_{i}': i for i in range(1000)}
        # Time a bulk ZADD of 1000 members.
        add_start = time.time()
        self.redis_client.zadd(f'{key}_test', test_data)
        add_time = time.time() - add_start
        # Time a 100-member range query.
        query_start = time.time()
        self.redis_client.zrange(f'{key}_test', 0, 99)
        query_time = time.time() - query_start
        # Time 100 individual rank lookups (one round-trip each).
        rank_start = time.time()
        for i in range(100):
            self.redis_client.zrank(f'{key}_test', f'test_member_{i}')
        rank_time = time.time() - rank_start
        # Remove the temporary benchmark key.
        self.redis_client.delete(f'{key}_test')
        return {
            'duration': duration,
            'performance': {
                'add_1000_members': f'{add_time:.4f}s',
                'query_100_members': f'{query_time:.4f}s',
                'rank_100_queries': f'{rank_time:.4f}s'
            },
            'operations_per_second': {
                'add': 1000 / add_time,
                'query': 100 / query_time,
                'rank': 100 / rank_time
            }
        }

    def diagnose_issues(self, key: str) -> List[Dict[str, str]]:
        """Run heuristic health checks; return a list of {'type', 'message'} findings."""
        issues = []
        info = self.get_sortedset_info(key)
        if not info:
            return [{'type': 'error', 'message': '集合不存在'}]
        # Size check: warn on sets beyond 100k members.
        if info.size > 100000:
            issues.append({
                'type': 'warning',
                'message': f'集合过大 ({info.size} 个成员),可能影响性能'
            })
        # Memory check: warn above 100MB.
        if info.memory_usage > 100 * 1024 * 1024:  # 100MB
            issues.append({
                'type': 'warning',
                'message': f'内存使用过高 ({info.memory_usage / 1024 / 1024:.2f}MB)'
            })
        # Encoding check: a small set on skiplist hints at misconfigured
        # conversion thresholds. NOTE(review): Redis 7+ reports 'listpack'
        # rather than 'ziplist' for the compact encoding — confirm against
        # the target server version.
        if info.encoding == 'skiplist' and info.size < 128:
            issues.append({
                'type': 'info',
                'message': '小集合使用skiplist编码,考虑调整zset-max-ziplist-entries参数'
            })
        # Score-range check: very wide ranges can lose float precision.
        if info.score_range and info.score_range > 1e10:
            issues.append({
                'type': 'warning',
                'message': f'分数范围过大 ({info.score_range:.2e}),可能影响精度'
            })
        if not issues:
            issues.append({
                'type': 'info',
                'message': '未发现明显问题'
            })
        return issues

    def compare_sortedsets(self, key1: str, key2: str) -> Dict[str, Any]:
        """Compare two sorted sets: sizes, intersection, one-sided differences.

        Builds temporary keys for the set algebra and always deletes them.
        """
        info1 = self.get_sortedset_info(key1)
        info2 = self.get_sortedset_info(key2)
        if not info1 or not info2:
            return {'error': '一个或多个集合不存在'}
        # Temporary keys for the intersection and the two differences.
        # NOTE(review): second-resolution timestamps can collide under
        # concurrent calls — verify this is acceptable for your workload.
        temp_inter = f'temp_inter_{int(time.time())}'
        temp_diff1 = f'temp_diff1_{int(time.time())}'
        temp_diff2 = f'temp_diff2_{int(time.time())}'
        try:
            # Intersection via ZINTERSTORE (scores summed by default).
            inter_size = self.redis_client.zinterstore(temp_inter, [key1, key2])
            # Difference key1 - key2: copy key1, then remove shared members.
            self.redis_client.zunionstore(temp_diff1, [key1])
            if inter_size > 0:
                # Strip the intersection members out of the copy.
                inter_members = self.redis_client.zrange(temp_inter, 0, -1)
                if inter_members:
                    self.redis_client.zrem(temp_diff1, *inter_members)
            diff1_size = self.redis_client.zcard(temp_diff1)
            # Difference key2 - key1, built the same way.
            self.redis_client.zunionstore(temp_diff2, [key2])
            if inter_size > 0:
                inter_members = self.redis_client.zrange(temp_inter, 0, -1)
                if inter_members:
                    self.redis_client.zrem(temp_diff2, *inter_members)
            diff2_size = self.redis_client.zcard(temp_diff2)
            return {
                'set1': {
                    'key': key1,
                    'size': info1.size,
                    'memory_usage': info1.memory_usage
                },
                'set2': {
                    'key': key2,
                    'size': info2.size,
                    'memory_usage': info2.memory_usage
                },
                'intersection_size': inter_size,
                'difference_sizes': {
                    f'{key1}_only': diff1_size,
                    f'{key2}_only': diff2_size
                },
                # Overlap ratio: shared members over the larger set's size.
                'similarity': inter_size / max(info1.size, info2.size) if max(info1.size, info2.size) > 0 else 0
            }
        finally:
            # Always clean up the temporary keys, even on error.
            self.redis_client.delete(temp_inter, temp_diff1, temp_diff2)
# 使用示例
# Usage example: requires a reachable local Redis server.
if __name__ == "__main__":
    redis_client = redis.Redis(decode_responses=True)
    monitor = SortedSetMonitor(redis_client)
    # Seed a test leaderboard: 1000 players with slightly varied scores.
    test_data = {f'player_{i}': i * 10 + (i % 7) * 0.5 for i in range(1000)}
    redis_client.zadd('test_leaderboard', test_data)
    # Basic set information.
    info = monitor.get_sortedset_info('test_leaderboard')
    if info:
        print(f"集合信息:")
        print(f" 大小: {info.size}")
        print(f" 内存使用: {info.memory_usage} bytes")
        print(f" 编码: {info.encoding}")
        print(f" 分数范围: {info.min_score} - {info.max_score}")
    # Score distribution across 5 equal-width buckets.
    distribution = monitor.analyze_score_distribution('test_leaderboard', buckets=5)
    print(f"\n分数分布:")
    for bucket in distribution['buckets']:
        print(f" {bucket['range']}: {bucket['count']} ({bucket['percentage']:.1f}%)")
    # Quick operation micro-benchmarks.
    performance = monitor.monitor_operations('test_leaderboard')
    print(f"\n性能测试:")
    for op, time_taken in performance['performance'].items():
        print(f" {op}: {time_taken}")
    # Heuristic issue diagnosis.
    issues = monitor.diagnose_issues('test_leaderboard')
    print(f"\n诊断结果:")
    for issue in issues:
        print(f" [{issue['type'].upper()}] {issue['message']}")
    # Clean up the demo key.
    redis_client.delete('test_leaderboard')
6.7 总结
Redis有序集合是一种功能强大的数据结构,结合了集合的唯一性和列表的有序性。通过本章的学习,我们了解了:
6.7.1 核心特性
- 双重索引: 支持按成员和分数两种方式快速查找
- 自动排序: 根据分数自动维护元素顺序
- 高效操作: O(log N)的时间复杂度
- 范围查询: 支持按分数和排名范围查询
- 集合运算: 支持交集、并集等操作
6.7.2 适用场景
- 排行榜系统: 游戏积分、销售排行等
- 优先级队列: 任务调度、消息队列
- 时间序列: 按时间戳排序的数据
- 搜索排序: 按相关性分数排序
- 实时统计: 滑动窗口计数等
6.7.3 性能优化要点
- 编码优化: 合理配置ziplist参数
- 批量操作: 使用管道减少网络开销
- 内存管理: 定期清理和压缩数据
- 分片策略: 大数据集考虑分片存储
6.7.4 最佳实践
- 合理设计分数: 确保分数能正确反映排序需求
- 定期维护: 清理过期数据,控制集合大小
- 监控性能: 关注内存使用和操作延迟
- 错误处理: 处理并发更新和数据一致性问题