6.1 有序集合概述

6.1.1 什么是有序集合

Redis有序集合(Sorted Set,简称ZSet)是一种特殊的数据结构,它结合了集合和列表的特点: - 像集合一样,元素是唯一的 - 像列表一样,元素是有序的 - 每个元素都关联一个分数(score),用于排序

6.1.2 有序集合特点

  • 唯一性: 成员不能重复
  • 有序性: 按分数从小到大排序
  • 双重索引: 既可以按成员查找,也可以按分数范围查找
  • 高效操作: 插入、删除、查找的时间复杂度都是O(log N)
  • 范围查询: 支持按分数或排名范围查询

6.1.3 应用场景

  • 排行榜系统: 游戏积分榜、销售排行榜
  • 优先级队列: 任务调度、消息队列
  • 时间序列数据: 按时间戳排序的事件
  • 搜索结果排序: 按相关性分数排序
  • 限流和计数: 滑动窗口计数器

6.1.4 内部实现

Redis有序集合使用两种数据结构: - 跳跃表(Skip List): 用于范围查询和排序 - 哈希表(Hash Table): 用于O(1)时间复杂度的成员查找

6.2 基本操作

6.2.1 添加和更新

# 添加单个成员
ZADD key score member
ZADD leaderboard 100 "player1"
ZADD leaderboard 85 "player2"
ZADD leaderboard 92 "player3"

# 添加多个成员
ZADD key score1 member1 score2 member2 ...
ZADD leaderboard 78 "player4" 95 "player5" 88 "player6"

# 条件添加(仅当成员不存在时)
ZADD key NX score member
ZADD leaderboard NX 90 "player1"  # 不会更新,因为player1已存在

# 条件添加(仅当成员存在时)
ZADD key XX score member
ZADD leaderboard XX 105 "player1"  # 更新player1的分数

# 返回变化的成员数量
ZADD key CH score member
ZADD leaderboard CH 99 "player7"  # 返回1(新增)
ZADD leaderboard CH 101 "player1"  # 返回1(分数被修改的更新同样计入CH)

# 增量更新
ZINCRBY key increment member
ZINCRBY leaderboard 10 "player1"  # player1分数增加10
ZINCRBY leaderboard -5 "player2"  # player2分数减少5

6.2.2 查询操作

# 获取成员分数
ZSCORE key member
ZSCORE leaderboard "player1"

# 获取成员排名(从0开始,分数从小到大)
ZRANK key member
ZRANK leaderboard "player1"

# 获取成员排名(从0开始,分数从大到小)
ZREVRANK key member
ZREVRANK leaderboard "player1"

# 获取有序集合大小
ZCARD key
ZCARD leaderboard

# 按排名范围获取成员(分数从小到大)
ZRANGE key start stop [WITHSCORES]
ZRANGE leaderboard 0 2          # 获取前3名(分数最低的)
ZRANGE leaderboard 0 2 WITHSCORES  # 同时返回分数
ZRANGE leaderboard -3 -1        # 获取后3名

# 按排名范围获取成员(分数从大到小)
ZREVRANGE key start stop [WITHSCORES]
ZREVRANGE leaderboard 0 2 WITHSCORES  # 获取前3名(分数最高的)

# 按分数范围获取成员
ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
ZRANGEBYSCORE leaderboard 80 100        # 分数在80-100之间的成员
ZRANGEBYSCORE leaderboard (80 100       # 分数在80-100之间(不包含80)
ZRANGEBYSCORE leaderboard 80 +inf       # 分数大于等于80的成员
ZRANGEBYSCORE leaderboard -inf 100      # 分数小于等于100的成员
ZRANGEBYSCORE leaderboard 80 100 LIMIT 0 5  # 限制返回5个结果

# 按分数范围获取成员(逆序)
ZREVRANGEBYSCORE key max min [WITHSCORES] [LIMIT offset count]
ZREVRANGEBYSCORE leaderboard 100 80 WITHSCORES

# 按分数范围计数
ZCOUNT key min max
ZCOUNT leaderboard 80 100

# 按字典序范围获取成员(当分数相同时)
ZRANGEBYLEX key min max [LIMIT offset count]
ZRANGEBYLEX leaderboard [a [z  # 成员名在a-z之间

6.2.3 删除操作

# 删除成员
ZREM key member [member ...]
ZREM leaderboard "player1"
ZREM leaderboard "player2" "player3"

# 按排名范围删除
ZREMRANGEBYRANK key start stop
ZREMRANGEBYRANK leaderboard 0 2    # 删除前3名(分数最低的)
ZREMRANGEBYRANK leaderboard -3 -1  # 删除后3名

# 按分数范围删除
ZREMRANGEBYSCORE key min max
ZREMRANGEBYSCORE leaderboard 0 50  # 删除分数在0-50之间的成员

# 按字典序范围删除
ZREMRANGEBYLEX key min max
ZREMRANGEBYLEX leaderboard [a [m   # 删除成员名在a-m之间的成员

6.2.4 集合运算

# 并集
ZUNIONSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE SUM|MIN|MAX]
ZUNIONSTORE result 2 set1 set2              # 简单并集
ZUNIONSTORE result 2 set1 set2 WEIGHTS 1 2  # 加权并集
ZUNIONSTORE result 2 set1 set2 AGGREGATE MAX # 取最大分数

# 交集
ZINTERSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [AGGREGATE SUM|MIN|MAX]
ZINTERSTORE result 2 set1 set2
ZINTERSTORE result 2 set1 set2 WEIGHTS 0.5 1.5
ZINTERSTORE result 2 set1 set2 AGGREGATE MIN

6.3 Python实现示例

6.3.1 基础有序集合类

import redis
from typing import List, Tuple, Optional, Dict, Any, Union
import time

class RedisSortedSet:
    """Thin wrapper around a single Redis sorted set (ZSET).

    All methods delegate to redis-py commands against one fixed key.
    Members are strings and scores are floats; score bounds may also be
    the Redis strings '-inf' / '+inf' / '(x' (exclusive).
    """

    def __init__(self, redis_client: redis.Redis, key: str):
        self.redis_client = redis_client
        self.key = key

    def add(self, score: float, member: str, **kwargs) -> int:
        """Add one member; kwargs pass through to ZADD (e.g. nx=True, xx=True, ch=True).

        Returns the number of members newly added (0 if it already existed).
        """
        return self.redis_client.zadd(self.key, {member: score}, **kwargs)

    def add_multiple(self, score_members: Dict[str, float], **kwargs) -> int:
        """Add many members at once; mapping is {member: score}."""
        return self.redis_client.zadd(self.key, score_members, **kwargs)

    def increment(self, member: str, amount: float = 1) -> float:
        """Atomically add `amount` to a member's score; returns the new score."""
        return self.redis_client.zincrby(self.key, amount, member)

    def score(self, member: str) -> Optional[float]:
        """Return the member's score, or None if the member is absent."""
        return self.redis_client.zscore(self.key, member)

    def rank(self, member: str, reverse: bool = False) -> Optional[int]:
        """Return the member's 0-based rank (ascending score by default,
        descending when reverse=True), or None if absent."""
        if reverse:
            return self.redis_client.zrevrank(self.key, member)
        return self.redis_client.zrank(self.key, member)

    def size(self) -> int:
        """Return the number of members in the set (ZCARD)."""
        return self.redis_client.zcard(self.key)

    def range_by_rank(self, start: int = 0, end: int = -1,
                     reverse: bool = False, withscores: bool = False) -> List:
        """Return members in the inclusive 0-based rank range [start, end].

        Negative indices count from the end, as in ZRANGE.
        """
        if reverse:
            return self.redis_client.zrevrange(self.key, start, end, withscores=withscores)
        return self.redis_client.zrange(self.key, start, end, withscores=withscores)

    def range_by_score(self, min_score: Union[float, str] = '-inf',
                      max_score: Union[float, str] = '+inf',
                      reverse: bool = False, withscores: bool = False,
                      offset: Optional[int] = None, count: Optional[int] = None) -> List:
        """Return members whose score lies in [min_score, max_score].

        `count` caps the number of results; `offset` skips results first.
        Fix: previously a lone `count` (without `offset`) was silently
        ignored — it now applies with an implicit offset of 0.
        """
        kwargs = {'withscores': withscores}
        if count is not None:
            kwargs['start'] = offset if offset is not None else 0
            kwargs['num'] = count

        if reverse:
            return self.redis_client.zrevrangebyscore(self.key, max_score, min_score, **kwargs)
        return self.redis_client.zrangebyscore(self.key, min_score, max_score, **kwargs)

    def count_by_score(self, min_score: Union[float, str] = '-inf',
                      max_score: Union[float, str] = '+inf') -> int:
        """Count members whose score lies in [min_score, max_score] (ZCOUNT)."""
        return self.redis_client.zcount(self.key, min_score, max_score)

    def remove(self, *members: str) -> int:
        """Remove the given members; returns how many were actually removed."""
        return self.redis_client.zrem(self.key, *members)

    def remove_by_rank(self, start: int, end: int) -> int:
        """Remove members in the inclusive rank range [start, end]."""
        return self.redis_client.zremrangebyrank(self.key, start, end)

    def remove_by_score(self, min_score: Union[float, str],
                       max_score: Union[float, str]) -> int:
        """Remove members whose score lies in [min_score, max_score]."""
        return self.redis_client.zremrangebyscore(self.key, min_score, max_score)

    def exists(self, member: str) -> bool:
        """True if the member is present (i.e. has a score)."""
        return self.score(member) is not None

    def get_all(self, withscores: bool = True) -> List:
        """Return every member, ascending by score."""
        return self.range_by_rank(withscores=withscores)

    def get_top(self, n: int, withscores: bool = True) -> List:
        """Return the N members with the highest scores, best first."""
        return self.range_by_rank(0, n-1, reverse=True, withscores=withscores)

    def get_bottom(self, n: int, withscores: bool = True) -> List:
        """Return the N members with the lowest scores, worst first."""
        return self.range_by_rank(0, n-1, withscores=withscores)

    def clear(self) -> bool:
        """Delete the whole set; True if the key existed."""
        return self.redis_client.delete(self.key) > 0

# Usage example (requires a running Redis server)
if __name__ == "__main__":
    client = redis.Redis(decode_responses=True)
    board = RedisSortedSet(client, "game_leaderboard")

    # Seed individual player scores.
    for player, pts in (("player1", 100), ("player2", 85), ("player3", 92)):
        board.add(pts, player)

    # Bulk insert the rest of the roster.
    board.add_multiple({
        "player4": 78,
        "player5": 95,
        "player6": 88
    })

    # Bump a score and show the result.
    new_score = board.increment("player1", 10)
    print(f"player1新分数: {new_score}")

    # Rank (descending, so 0 = best) and current score.
    rank = board.rank("player1", reverse=True)
    score = board.score("player1")
    print(f"player1排名: {rank + 1}, 分数: {score}")

    # Highest three scorers.
    top3 = board.get_top(3)
    print(f"前3名: {top3}")

    # Everyone scoring between 80 and 100 inclusive.
    players_80_100 = board.range_by_score(80, 100, withscores=True)
    print(f"分数80-100的玩家: {players_80_100}")

    # How many players scored at least 90.
    high_score_count = board.count_by_score(90, '+inf')
    print(f"高分玩家数量: {high_score_count}")

6.3.2 游戏排行榜系统

import time
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple

import redis

class LeaderboardType(Enum):
    """Time windows; each value is the key suffix of its leaderboard ZSET."""
    DAILY = "daily"
    WEEKLY = "weekly"
    MONTHLY = "monthly"
    ALL_TIME = "all_time"

@dataclass
class PlayerScore:
    """Snapshot of one player's standing at query time."""
    player_id: str
    score: float
    rank: int  # 1-based, highest score first
    timestamp: float  # unix time the snapshot was taken

@dataclass
class LeaderboardEntry:
    """One row of a rendered leaderboard."""
    player_id: str
    player_name: str
    score: float
    rank: int  # 1-based display rank
    change: int = 0  # rank change (positive = moved up)

class GameLeaderboard:
    """Multi-window game leaderboard backed by Redis sorted sets.

    One ZSET per time window (daily/weekly/monthly/all-time) maps player_id
    to the player's best score.  Display names live in a hash, and every raw
    submission is appended to a per-player history ZSET whose *score* is the
    submission timestamp (member format "timestamp:score"), so history can
    be queried by time range with ZRANGEBYSCORE.
    """

    def __init__(self, redis_client: redis.Redis, game_id: str):
        self.redis_client = redis_client
        self.game_id = game_id
        # One sorted-set key per leaderboard window.
        self.leaderboards = {
            LeaderboardType.DAILY: f"leaderboard:{game_id}:daily",
            LeaderboardType.WEEKLY: f"leaderboard:{game_id}:weekly",
            LeaderboardType.MONTHLY: f"leaderboard:{game_id}:monthly",
            LeaderboardType.ALL_TIME: f"leaderboard:{game_id}:all_time"
        }
        self.player_info_key = f"players:{game_id}"             # hash: player_id -> name
        self.score_history_prefix = f"score_history:{game_id}"  # per-player history ZSETs

    @staticmethod
    def _history_member(timestamp: float, score: float) -> str:
        """Encode one history entry; the ZSET score is the timestamp itself."""
        return f"{timestamp}:{score}"

    @staticmethod
    def _parse_history_member(member: str) -> Tuple[float, float]:
        """Decode a history member back into (timestamp, score)."""
        ts_str, value_str = member.split(':', 1)
        return float(ts_str), float(value_str)

    def submit_score(self, player_id: str, player_name: str, score: float) -> Dict[str, Any]:
        """Record a score; each window keeps only the player's best.

        Returns a dict with the submission plus old/new 1-based ranks per
        window and the rank delta (positive = moved up).
        """
        timestamp = time.time()

        # Store/refresh the player's display name.
        self.redis_client.hset(self.player_info_key, player_id, player_name)

        # Capture current 1-based ranks so rank changes can be reported.
        old_ranks = {}
        for lb_type in LeaderboardType:
            old_rank = self.redis_client.zrevrank(self.leaderboards[lb_type], player_id)
            old_ranks[lb_type] = old_rank + 1 if old_rank is not None else None

        # Batch all writes in a pipeline.
        pipe = self.redis_client.pipeline()

        # Only update a window when the new score beats the stored one.
        for lb_type in LeaderboardType:
            key = self.leaderboards[lb_type]
            current_score = self.redis_client.zscore(key, player_id)

            if current_score is None or score > current_score:
                pipe.zadd(key, {player_id: score})

        # BUGFIX: the history ZSET must be scored by TIMESTAMP (not by the
        # game score) so that zrangebyscore can select entries by time range.
        # The game score is carried inside the member instead.
        history_key = f"{self.score_history_prefix}:{player_id}"
        pipe.zadd(history_key, {self._history_member(timestamp, score): timestamp})

        # Refresh TTLs so time-limited windows eventually expire.
        pipe.expire(self.leaderboards[LeaderboardType.DAILY], 86400)      # 1 day
        pipe.expire(self.leaderboards[LeaderboardType.WEEKLY], 604800)    # 7 days
        pipe.expire(self.leaderboards[LeaderboardType.MONTHLY], 2592000)  # 30 days
        pipe.expire(history_key, 2592000)  # 30 days

        pipe.execute()

        # Read back the new ranks and compute deltas.
        new_ranks = {}
        rank_changes = {}

        for lb_type in LeaderboardType:
            new_rank = self.redis_client.zrevrank(self.leaderboards[lb_type], player_id)
            new_ranks[lb_type] = new_rank + 1 if new_rank is not None else None

            if old_ranks[lb_type] and new_ranks[lb_type]:
                rank_changes[lb_type] = old_ranks[lb_type] - new_ranks[lb_type]
            else:
                rank_changes[lb_type] = 0

        return {
            'player_id': player_id,
            'score': score,
            'timestamp': timestamp,
            'old_ranks': old_ranks,
            'new_ranks': new_ranks,
            'rank_changes': rank_changes
        }

    def get_leaderboard(self, lb_type: LeaderboardType, start: int = 0,
                       end: int = 9, with_rank_change: bool = False) -> List[LeaderboardEntry]:
        """Return entries ranked [start, end] (0-based, highest score first)."""
        key = self.leaderboards[lb_type]

        results = self.redis_client.zrevrange(key, start, end, withscores=True)

        entries = []
        for i, (player_id, score) in enumerate(results):
            # Fall back to a synthetic name if the player hash has no entry.
            player_name = self.redis_client.hget(self.player_info_key, player_id)
            if not player_name:
                player_name = f"Player_{player_id}"

            rank = start + i + 1
            change = 0

            if with_rank_change:
                change = self._calculate_rank_change(player_id, lb_type, rank)

            entries.append(LeaderboardEntry(
                player_id=player_id,
                player_name=player_name,
                score=score,
                rank=rank,
                change=change
            ))

        return entries

    def get_player_rank(self, player_id: str, lb_type: LeaderboardType) -> Optional[PlayerScore]:
        """Return the player's 1-based rank and score, or None if absent."""
        key = self.leaderboards[lb_type]

        rank = self.redis_client.zrevrank(key, player_id)
        score = self.redis_client.zscore(key, player_id)

        if rank is None or score is None:
            return None

        return PlayerScore(
            player_id=player_id,
            score=score,
            rank=rank + 1,
            timestamp=time.time()
        )

    def get_players_around(self, player_id: str, lb_type: LeaderboardType,
                          range_size: int = 5) -> List[LeaderboardEntry]:
        """Return the players ranked within range_size places of the player."""
        key = self.leaderboards[lb_type]

        player_rank = self.redis_client.zrevrank(key, player_id)
        if player_rank is None:
            return []

        start = max(0, player_rank - range_size)
        end = player_rank + range_size

        return self.get_leaderboard(lb_type, start, end)

    def get_score_history(self, player_id: str, days: int = 7) -> List[Tuple[datetime, float]]:
        """Return (datetime, score) submissions from the last `days` days."""
        history_key = f"{self.score_history_prefix}:{player_id}"

        end_time = time.time()
        start_time = end_time - (days * 86400)

        # The history ZSET is scored by timestamp, so this is a time-range query.
        members = self.redis_client.zrangebyscore(history_key, start_time, end_time)

        history = []
        for member in members:
            ts, value = self._parse_history_member(member)
            history.append((datetime.fromtimestamp(ts), value))

        return history

    def get_leaderboard_stats(self, lb_type: LeaderboardType) -> Dict[str, Any]:
        """Return count / max / min / mean / median of the window's scores."""
        key = self.leaderboards[lb_type]

        total_players = self.redis_client.zcard(key)
        if total_players == 0:
            return {'total_players': 0}

        all_scores = [score for _, score in self.redis_client.zrange(key, 0, -1, withscores=True)]

        return {
            'total_players': total_players,
            'highest_score': max(all_scores),
            'lowest_score': min(all_scores),
            'average_score': sum(all_scores) / len(all_scores),
            'median_score': sorted(all_scores)[len(all_scores) // 2]
        }

    def reset_leaderboard(self, lb_type: LeaderboardType) -> bool:
        """Delete the window's ZSET; True if a key was actually removed."""
        key = self.leaderboards[lb_type]
        return self.redis_client.delete(key) > 0

    def _calculate_rank_change(self, player_id: str, lb_type: LeaderboardType, current_rank: int) -> int:
        """Placeholder for rank-change computation (e.g. vs. yesterday's rank)."""
        return 0

    def get_top_players_by_period(self, period_hours: int = 24, limit: int = 10) -> List[LeaderboardEntry]:
        """Build a leaderboard from each player's best score within the period."""
        # Scratch ZSET; always deleted in the finally block below.
        temp_key = f"temp_leaderboard:{self.game_id}:{int(time.time())}"

        try:
            end_time = time.time()
            start_time = end_time - (period_hours * 3600)

            # Scan every player's history and keep the period's best score.
            for key in self.redis_client.scan_iter(match=f"{self.score_history_prefix}:*"):
                player_id = key.split(':')[-1]

                members = self.redis_client.zrangebyscore(key, start_time, end_time)

                if members:
                    max_score = max(self._parse_history_member(m)[1] for m in members)
                    self.redis_client.zadd(temp_key, {player_id: max_score})

            results = self.redis_client.zrevrange(temp_key, 0, limit-1, withscores=True)

            entries = []
            for i, (player_id, score) in enumerate(results):
                player_name = self.redis_client.hget(self.player_info_key, player_id) or f"Player_{player_id}"
                entries.append(LeaderboardEntry(
                    player_id=player_id,
                    player_name=player_name,
                    score=score,
                    rank=i + 1
                ))

            return entries

        finally:
            # Always drop the scratch key.
            self.redis_client.delete(temp_key)

# Usage example (requires a running Redis server)
if __name__ == "__main__":
    redis_client = redis.Redis(decode_responses=True)
    leaderboard = GameLeaderboard(redis_client, "puzzle_game")

    # Submit a few scores; each submission echoes its rank report.
    for uid, name, points in (
        ("user123", "Alice", 1500),
        ("user456", "Bob", 1200),
        ("user789", "Charlie", 1800),
    ):
        result = leaderboard.submit_score(uid, name, points)
        print(f"分数提交结果: {result}")

    # Top 10 of the daily board.
    daily_top = leaderboard.get_leaderboard(LeaderboardType.DAILY, 0, 9)
    print("\n日榜前10名:")
    for entry in daily_top:
        print(f"{entry.rank}. {entry.player_name}: {entry.score}")

    # One player's standing.
    player_rank = leaderboard.get_player_rank("user123", LeaderboardType.DAILY)
    if player_rank:
        print(f"\nAlice的排名: {player_rank.rank}, 分数: {player_rank.score}")

    # Neighbourhood of ±2 places around Alice.
    around_players = leaderboard.get_players_around("user123", LeaderboardType.DAILY, 2)
    print("\nAlice周围的玩家:")
    for entry in around_players:
        print(f"{entry.rank}. {entry.player_name}: {entry.score}")

    # Aggregate statistics for the daily window.
    stats = leaderboard.get_leaderboard_stats(LeaderboardType.DAILY)
    print(f"\n排行榜统计: {stats}")

6.4 实际应用场景

6.4.1 任务优先级队列

import redis
import json
import time
from typing import Optional, List, Dict, Any
from dataclasses import dataclass, asdict
from enum import Enum

class TaskPriority(Enum):
    """Task priority levels; a larger value means more urgent."""
    LOW = 1
    NORMAL = 5
    HIGH = 10
    URGENT = 20
    CRITICAL = 50

class TaskStatus(Enum):
    """Lifecycle states of a queued task."""
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class Task:
    """A unit of work carried through the priority queue."""
    task_id: str
    task_type: str
    payload: Dict[str, Any]
    priority: TaskPriority
    created_at: float  # unix time of creation
    scheduled_at: Optional[float] = None  # unix time for delayed execution; None = run ASAP
    max_retries: int = 3
    retry_count: int = 0
    timeout: int = 300  # per-attempt processing timeout in seconds (5 minutes)

class PriorityTaskQueue:
    """Redis-backed priority task queue built on sorted sets.

    The pending queue is a ZSET consumed with BZPOPMIN, i.e. the member with
    the LOWEST score is dequeued first.  Immediate tasks are therefore scored
    `created_at - priority * 1000000`, so a higher priority (and, within one
    priority, an earlier creation time) yields a smaller score.  Delayed
    tasks use their absolute `scheduled_at` as the score, which is larger
    than any immediate-task score, so they sort after ready work; `dequeue`
    pushes back any popped task whose scheduled time has not yet arrived.
    """

    def __init__(self, redis_client: redis.Redis, queue_name: str):
        self.redis_client = redis_client
        self.queue_name = queue_name
        self.pending_queue = f"queue:{queue_name}:pending"
        self.processing_queue = f"queue:{queue_name}:processing"
        self.completed_queue = f"queue:{queue_name}:completed"
        self.failed_queue = f"queue:{queue_name}:failed"
        self.task_data_prefix = f"task_data:{queue_name}"
        self.worker_heartbeat_prefix = f"worker_heartbeat:{queue_name}"

    def _priority_score(self, priority_value: int, reference_time: float) -> float:
        """Score for an immediate task: smaller = dequeued sooner (BZPOPMIN)."""
        return reference_time - priority_value * 1000000

    def enqueue(self, task: Task) -> bool:
        """Enqueue a task; returns True on success."""
        try:
            # Persist the task payload (enum stored by value for JSON).
            task_key = f"{self.task_data_prefix}:{task.task_id}"
            task_data = asdict(task)
            task_data['priority'] = task.priority.value

            self.redis_client.setex(task_key, 3600, json.dumps(task_data))  # 1h TTL

            # BUGFIX: BZPOPMIN pops the LOWEST score first, so high-priority
            # tasks need LOW scores.  The previous formula
            # (priority * 1000000 - created_at) inverted the ordering and
            # dequeued low-priority tasks first.
            score = self._priority_score(task.priority.value, task.created_at)

            # Delayed task: use the absolute scheduled time as the score.
            if task.scheduled_at and task.scheduled_at > time.time():
                score = task.scheduled_at

            self.redis_client.zadd(self.pending_queue, {task.task_id: score})

            return True
        except Exception as e:
            print(f"入队失败: {e}")
            return False

    def dequeue(self, worker_id: str, timeout: int = 10) -> Optional[Task]:
        """Blocking dequeue of the highest-priority ready task.

        Returns None on timeout, when the popped task is not yet due (it is
        pushed back), or when its stored payload has expired.
        """
        try:
            # Blocking pop of the lowest-scored (most urgent) member.
            result = self.redis_client.bzpopmin(self.pending_queue, timeout=timeout)

            if not result:
                return None

            _, task_id, score = result

            # Delayed task not due yet: put it back and report nothing ready.
            if score > time.time():
                self.redis_client.zadd(self.pending_queue, {task_id: score})
                return None

            task = self._get_task_data(task_id)
            if not task:
                return None

            # Track in the processing ZSET, scored by its visibility deadline.
            processing_score = time.time() + task.timeout
            self.redis_client.zadd(self.processing_queue, {task_id: processing_score})

            # Record which worker picked the task up.
            self._update_worker_heartbeat(worker_id, task_id)

            return task

        except Exception as e:
            print(f"出队失败: {e}")
            return None

    def complete_task(self, task_id: str, worker_id: str, result: Optional[Dict[str, Any]] = None) -> bool:
        """Mark a task as successfully finished; optionally store its result."""
        try:
            # Out of processing, into the completed ZSET (scored by finish time).
            self.redis_client.zrem(self.processing_queue, task_id)

            self.redis_client.zadd(self.completed_queue, {task_id: time.time()})

            if result:
                result_key = f"{self.task_data_prefix}:{task_id}:result"
                self.redis_client.setex(result_key, 3600, json.dumps(result))

            self._clear_worker_heartbeat(worker_id)

            return True

        except Exception as e:
            print(f"完成任务失败: {e}")
            return False

    def fail_task(self, task_id: str, worker_id: str, error: str, retry: bool = True) -> bool:
        """Handle a failed task: re-queue it (until max_retries) or bury it."""
        try:
            task = self._get_task_data(task_id)
            if not task:
                return False

            self.redis_client.zrem(self.processing_queue, task_id)

            # Retry if allowed and attempts remain.
            if retry and task.retry_count < task.max_retries:
                task.retry_count += 1

                # Persist the bumped retry counter.
                task_key = f"{self.task_data_prefix}:{task_id}"
                task_data = asdict(task)
                task_data['priority'] = task.priority.value
                self.redis_client.setex(task_key, 3600, json.dumps(task_data))

                # Re-queue with a retry penalty: each retry ADDS to the score,
                # pushing the task later (BZPOPMIN pops low scores first).
                retry_score = self._priority_score(task.priority.value, time.time()) + task.retry_count * 1000
                self.redis_client.zadd(self.pending_queue, {task_id: retry_score})
            else:
                # Out of retries: bury in the failed ZSET with the error text.
                self.redis_client.zadd(self.failed_queue, {task_id: time.time()})

                error_key = f"{self.task_data_prefix}:{task_id}:error"
                self.redis_client.setex(error_key, 3600, error)

            self._clear_worker_heartbeat(worker_id)

            return True

        except Exception as e:
            print(f"任务失败处理失败: {e}")
            return False

    def get_queue_stats(self) -> Dict[str, int]:
        """Return the size of each queue stage."""
        return {
            'pending': self.redis_client.zcard(self.pending_queue),
            'processing': self.redis_client.zcard(self.processing_queue),
            'completed': self.redis_client.zcard(self.completed_queue),
            'failed': self.redis_client.zcard(self.failed_queue)
        }

    def get_pending_tasks(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Return up to `limit` pending tasks, next-to-run first."""
        tasks = []
        results = self.redis_client.zrange(self.pending_queue, 0, limit-1, withscores=True)

        for task_id, score in results:
            task = self._get_task_data(task_id)
            if task:
                task_info = asdict(task)
                task_info['score'] = score
                # Only delayed tasks carry a future score.
                task_info['scheduled_time'] = score if score > time.time() else None
                tasks.append(task_info)

        return tasks

    def cleanup_timeout_tasks(self) -> int:
        """Re-queue (or bury) tasks whose processing deadline has passed."""
        current_time = time.time()

        # Processing entries are scored by deadline; past-due ones timed out.
        timeout_tasks = self.redis_client.zrangebyscore(
            self.processing_queue, 0, current_time
        )

        cleaned_count = 0
        for task_id in timeout_tasks:
            self.fail_task(task_id, "system", "Task timeout", retry=True)
            cleaned_count += 1

        return cleaned_count

    def _get_task_data(self, task_id: str) -> Optional[Task]:
        """Load and decode a stored Task, or None if missing/corrupt."""
        task_key = f"{self.task_data_prefix}:{task_id}"
        task_data = self.redis_client.get(task_key)

        if not task_data:
            return None

        try:
            data = json.loads(task_data)
            data['priority'] = TaskPriority(data['priority'])  # value -> enum
            return Task(**data)
        except Exception as e:
            print(f"解析任务数据失败: {e}")
            return None

    def _update_worker_heartbeat(self, worker_id: str, task_id: str):
        """Record which task a worker holds; expires after 60s of silence."""
        heartbeat_key = f"{self.worker_heartbeat_prefix}:{worker_id}"
        heartbeat_data = {
            'task_id': task_id,
            'timestamp': time.time()
        }
        self.redis_client.setex(heartbeat_key, 60, json.dumps(heartbeat_data))

    def _clear_worker_heartbeat(self, worker_id: str):
        """Drop a worker's heartbeat key."""
        heartbeat_key = f"{self.worker_heartbeat_prefix}:{worker_id}"
        self.redis_client.delete(heartbeat_key)

# Usage example (requires a running Redis server)
if __name__ == "__main__":
    redis_client = redis.Redis(decode_responses=True)
    queue = PriorityTaskQueue(redis_client, "email_queue")

    # Three sample tasks, the last one scheduled an hour out.
    tasks = [
        Task(
            task_id="task_001",
            task_type="send_email",
            payload={"to": "user@example.com", "subject": "Welcome"},
            priority=TaskPriority.NORMAL,
            created_at=time.time()
        ),
        Task(
            task_id="task_002",
            task_type="send_sms",
            payload={"phone": "+1234567890", "message": "Verification code: 123456"},
            priority=TaskPriority.URGENT,
            created_at=time.time()
        ),
        Task(
            task_id="task_003",
            task_type="send_reminder",
            payload={"user_id": "123", "message": "Meeting in 1 hour"},
            priority=TaskPriority.HIGH,
            created_at=time.time(),
            scheduled_at=time.time() + 3600  # delayed: run in one hour
        ),
    ]

    for pending in tasks:
        queue.enqueue(pending)

    print(f"队列统计: {queue.get_queue_stats()}")

    # Simulate one worker consuming the queue.
    worker_id = "worker_001"

    # Highest-priority ready task comes out first.
    task = queue.dequeue(worker_id, timeout=1)
    if task:
        print(f"处理任务: {task.task_id}, 类型: {task.task_type}, 优先级: {task.priority}")

        time.sleep(1)  # pretend to do the work

        queue.complete_task(task.task_id, worker_id, {"status": "sent"})
        print(f"任务 {task.task_id} 完成")

    # Inspect what is still waiting.
    pending_tasks = queue.get_pending_tasks(5)
    print(f"\n待处理任务: {len(pending_tasks)}")
    for task_info in pending_tasks:
        print(f"- {task_info['task_id']}: {task_info['task_type']} (优先级: {task_info['priority']})")

    print(f"\n最终队列统计: {queue.get_queue_stats()}")

6.4.2 时间序列数据存储

import redis
import time
import json
from typing import List, Dict, Any, Optional, Tuple
from datetime import datetime, timedelta
from dataclasses import dataclass
from enum import Enum

class MetricType(Enum):
    """Kinds of metrics the time-series store distinguishes."""
    COUNTER = "counter"
    GAUGE = "gauge"
    HISTOGRAM = "histogram"

@dataclass
class DataPoint:
    """One time-series sample."""
    timestamp: float  # unix time of the sample
    value: float
    tags: Optional[Dict[str, str]] = None  # per-point labels, merged with metric-level tags on write

@dataclass
class MetricSummary:
    """Aggregate statistics over a set of data points."""
    count: int
    sum: float
    min: float
    max: float
    avg: float

class TimeSeriesDB:
    """A minimal time-series store backed by Redis sorted sets.

    Each (metric, tag-set) pair maps to one sorted set whose members are
    JSON-encoded samples and whose scores are the sample timestamps, so
    time-range queries become ZRANGEBYSCORE calls.
    """

    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
        self.metrics_prefix = "metrics"    # key prefix for data series
        self.metadata_prefix = "metadata"  # key prefix for per-metric metadata

    def _get_metric_key(self, metric_name: str, tags: Dict[str, str] = None) -> str:
        """Build the Redis key for a metric/tag combination.

        Tags are sorted so the same tag set always yields the same key.
        """
        if tags:
            tag_str = ":".join([f"{k}={v}" for k, v in sorted(tags.items())])
            return f"{self.metrics_prefix}:{metric_name}:{tag_str}"
        return f"{self.metrics_prefix}:{metric_name}"

    @staticmethod
    def _decode_members(results) -> List[DataPoint]:
        """Decode (member, score) pairs from Redis into DataPoint objects.

        Members that are not valid JSON are skipped silently.
        """
        points = []
        for member, _score in results:
            try:
                data = json.loads(member)
            except json.JSONDecodeError:
                continue
            points.append(DataPoint(
                timestamp=data['timestamp'],
                value=data['value'],
                tags=data.get('tags', {})
            ))
        return points

    def write_point(self, metric_name: str, value: float,
                   timestamp: float = None, tags: Dict[str, str] = None) -> bool:
        """Write a single sample; returns True on success.

        NOTE: the member encodes value + timestamp + tags, so two samples
        with identical payloads at the same timestamp collapse into one
        entry (sorted-set members are unique).  The original comment
        claimed microseconds were appended to disambiguate — they are not.
        """
        if timestamp is None:
            timestamp = time.time()

        try:
            key = self._get_metric_key(metric_name, tags)

            # Timestamp is the score (sort order); the JSON payload is the member.
            member = json.dumps({
                'value': value,
                'timestamp': timestamp,
                'tags': tags or {}
            })

            self.redis_client.zadd(key, {member: timestamp})

            # Refresh per-metric metadata (last write time, known tag names).
            self._update_metadata(metric_name, tags)

            return True

        except Exception as e:
            print(f"写入数据点失败: {e}")
            return False

    def write_points(self, metric_name: str, points: List[DataPoint],
                    tags: Dict[str, str] = None) -> int:
        """Batch-write samples through a single pipeline round trip.

        Per-point tags are merged over the call-level ``tags`` (point tags
        win on conflict), so points may land in different series.  Returns
        the sum of the ZADD results, i.e. the number of newly added
        members (an update of an existing member counts as 0).
        """
        try:
            pipe = self.redis_client.pipeline()

            for point in points:
                # Point-level tags override call-level tags on conflict.
                combined_tags = {**(tags or {}), **(point.tags or {})}
                point_key = self._get_metric_key(metric_name, combined_tags)

                member = json.dumps({
                    'value': point.value,
                    'timestamp': point.timestamp,
                    'tags': combined_tags
                })

                pipe.zadd(point_key, {member: point.timestamp})

            results = pipe.execute()
            return sum(results)

        except Exception as e:
            print(f"批量写入失败: {e}")
            return 0

    def query_range(self, metric_name: str, start_time: float, end_time: float,
                   tags: Dict[str, str] = None, limit: int = None) -> List[DataPoint]:
        """Return samples with start_time <= timestamp <= end_time, ascending.

        ``limit`` caps the number of returned points.  An explicit None
        check is used so that ``limit=0`` genuinely means "no results"
        (the previous truthiness test silently ignored it).
        """
        key = self._get_metric_key(metric_name, tags)

        try:
            kwargs = {'withscores': True}
            if limit is not None:
                kwargs['start'] = 0
                kwargs['num'] = limit

            results = self.redis_client.zrangebyscore(
                key, start_time, end_time, **kwargs
            )
            return self._decode_members(results)

        except Exception as e:
            print(f"查询失败: {e}")
            return []

    def query_latest(self, metric_name: str, count: int = 1,
                    tags: Dict[str, str] = None) -> List[DataPoint]:
        """Return the ``count`` most recent samples, newest first."""
        key = self._get_metric_key(metric_name, tags)

        try:
            results = self.redis_client.zrevrange(key, 0, count - 1, withscores=True)
            return self._decode_members(results)

        except Exception as e:
            print(f"查询最新数据失败: {e}")
            return []

    def aggregate(self, metric_name: str, start_time: float, end_time: float,
                 tags: Dict[str, str] = None) -> Optional[MetricSummary]:
        """Compute count/sum/min/max/avg over a time range.

        Returns None when the range holds no samples.
        """
        points = self.query_range(metric_name, start_time, end_time, tags)

        if not points:
            return None

        values = [p.value for p in points]

        return MetricSummary(
            count=len(values),
            sum=sum(values),
            min=min(values),
            max=max(values),
            avg=sum(values) / len(values)
        )

    def downsample(self, metric_name: str, start_time: float, end_time: float,
                  interval: int, aggregation: str = 'avg',
                  tags: Dict[str, str] = None) -> List[Tuple[float, float]]:
        """Bucket samples into ``interval``-second windows and aggregate each.

        ``aggregation`` is one of 'avg' (default), 'sum', 'min', 'max',
        'count'; unknown names fall back to 'avg'.  Returns
        (bucket_start_time, aggregated_value) pairs in chronological order.
        """
        points = self.query_range(metric_name, start_time, end_time, tags)

        if not points:
            return []

        # Group values by the start of their time bucket.
        buckets: Dict[int, List[float]] = {}
        for point in points:
            bucket_time = int(point.timestamp // interval) * interval
            buckets.setdefault(bucket_time, []).append(point.value)

        # Dispatch table keeps the aggregation choice out of the loop.
        aggregators = {
            'avg': lambda vs: sum(vs) / len(vs),
            'sum': sum,
            'min': min,
            'max': max,
            'count': len,
        }
        agg_fn = aggregators.get(aggregation, aggregators['avg'])

        return [(bucket_time, agg_fn(buckets[bucket_time]))
                for bucket_time in sorted(buckets)]

    def delete_range(self, metric_name: str, start_time: float, end_time: float,
                    tags: Dict[str, str] = None) -> int:
        """Delete samples in [start_time, end_time]; returns the removed count."""
        key = self._get_metric_key(metric_name, tags)

        try:
            return self.redis_client.zremrangebyscore(key, start_time, end_time)
        except Exception as e:
            print(f"删除数据失败: {e}")
            return 0

    def cleanup_old_data(self, metric_name: str, retention_days: int,
                        tags: Dict[str, str] = None) -> int:
        """Drop all samples older than ``retention_days`` days."""
        cutoff_time = time.time() - (retention_days * 86400)
        return self.delete_range(metric_name, 0, cutoff_time, tags)

    def get_metrics_list(self) -> List[str]:
        """List the distinct metric names found under the metrics prefix.

        Uses SCAN instead of KEYS so the server is never blocked on large
        keyspaces.  Assumes the client was created with
        decode_responses=True, as the examples do (keys arrive as str).
        """
        pattern = f"{self.metrics_prefix}:*"

        metrics = set()
        for key in self.redis_client.scan_iter(match=pattern):
            # Key layout is "metrics:<name>[:tags...]"; the name is part 1.
            parts = key.split(':')
            if len(parts) >= 2:
                metrics.add(parts[1])

        return list(metrics)

    def get_metric_info(self, metric_name: str) -> Dict[str, Any]:
        """Summarize a metric: series count, total points, and time span.

        Matches the untagged series key exactly plus any tagged variants
        ("metrics:<name>:..."), so metrics whose names merely share a
        prefix (e.g. "cpu" vs "cpu_usage") are no longer conflated, as
        they were with the old "metrics:<name>*" pattern.
        """
        keys = list(self.redis_client.scan_iter(
            match=f"{self.metrics_prefix}:{metric_name}:*"
        ))
        exact_key = f"{self.metrics_prefix}:{metric_name}"
        if self.redis_client.exists(exact_key):
            keys.append(exact_key)

        total_points = 0
        series_count = len(keys)
        earliest_time = None
        latest_time = None

        for key in keys:
            count = self.redis_client.zcard(key)
            total_points += count

            if count > 0:
                # First and last ranks carry the series' time extremes.
                earliest = self.redis_client.zrange(key, 0, 0, withscores=True)
                latest = self.redis_client.zrange(key, -1, -1, withscores=True)

                if earliest:
                    _, early_score = earliest[0]
                    if earliest_time is None or early_score < earliest_time:
                        earliest_time = early_score

                if latest:
                    _, late_score = latest[0]
                    if latest_time is None or late_score > latest_time:
                        latest_time = late_score

        return {
            'metric_name': metric_name,
            'series_count': series_count,
            'total_points': total_points,
            'earliest_time': earliest_time,
            'latest_time': latest_time,
            'time_range_hours': (latest_time - earliest_time) / 3600 if earliest_time and latest_time else 0
        }

    def _update_metadata(self, metric_name: str, tags: Dict[str, str] = None):
        """Record the last write time and known tag names, with a 24h TTL."""
        metadata_key = f"{self.metadata_prefix}:{metric_name}"
        metadata = {
            'last_updated': time.time(),
            'tags': list(tags.keys()) if tags else []
        }
        self.redis_client.setex(metadata_key, 86400, json.dumps(metadata))

# Usage example
if __name__ == "__main__":
    redis_client = redis.Redis(decode_responses=True)
    tsdb = TimeSeriesDB(redis_client)

    # Write CPU-usage samples anchored at "now".
    base_ts = time.time()

    # Single-point writes
    tsdb.write_point("cpu_usage", 75.5, base_ts, {"host": "server1", "cpu": "0"})
    tsdb.write_point("cpu_usage", 82.3, base_ts + 60, {"host": "server1", "cpu": "0"})

    # Batch write: three more samples at one-minute spacing
    batch = [
        DataPoint(base_ts + offset, load, {"host": "server1", "cpu": "0"})
        for offset, load in ((120, 68.9), (180, 91.2), (240, 77.8))
    ]
    tsdb.write_points("cpu_usage", batch)

    # Query the written range back
    samples = tsdb.query_range(
        "cpu_usage",
        base_ts,
        base_ts + 300,
        {"host": "server1", "cpu": "0"}
    )

    print(f"查询到 {len(samples)} 个数据点:")
    for sample in samples:
        stamp = datetime.fromtimestamp(sample.timestamp)
        print(f"  {stamp.strftime('%H:%M:%S')}: {sample.value}%")

    # Aggregate statistics over the same window
    stats = tsdb.aggregate(
        "cpu_usage",
        base_ts,
        base_ts + 300,
        {"host": "server1", "cpu": "0"}
    )

    if stats:
        print(f"\n统计信息:")
        print(f"  数据点数: {stats.count}")
        print(f"  平均值: {stats.avg:.2f}%")
        print(f"  最小值: {stats.min:.2f}%")
        print(f"  最大值: {stats.max:.2f}%")

    # Downsample to one point per 2 minutes
    buckets = tsdb.downsample(
        "cpu_usage",
        base_ts,
        base_ts + 300,
        120,  # 2-minute interval
        'avg',
        {"host": "server1", "cpu": "0"}
    )

    print(f"\n降采样结果 (2分钟间隔):")
    for bucket_ts, bucket_val in buckets:
        stamp = datetime.fromtimestamp(bucket_ts)
        print(f"  {stamp.strftime('%H:%M:%S')}: {bucket_val:.2f}%")

    # Metric-level summary information
    info = tsdb.get_metric_info("cpu_usage")
    print(f"\n指标信息: {info}")

6.5 性能优化

6.5.1 有序集合编码优化

# 查看有序集合编码
redis-cli
> OBJECT ENCODING myzset

# 配置优化参数
# redis.conf
zset-max-ziplist-entries 128    # ziplist最大元素数
zset-max-ziplist-value 64       # ziplist最大值长度
# 注:自 Redis 7.0 起,ziplist 已被 listpack 取代,对应参数更名为
# zset-max-listpack-entries / zset-max-listpack-value(旧参数名仍作为别名保留)

6.5.2 内存优化

import redis
from typing import List, Dict, Any, Iterator, Tuple, Union

class OptimizedRedisSortedSet:
    """Memory- and bandwidth-conscious helpers for large sorted sets."""

    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client

    def batch_add(self, key: str, score_members: Dict[str, float], batch_size: int = 1000):
        """Add members in chunks so no single ZADD payload gets too large.

        ``score_members`` maps member -> score.
        """
        items = list(score_members.items())

        for i in range(0, len(items), batch_size):
            batch = dict(items[i:i + batch_size])
            self.redis_client.zadd(key, batch)

    def memory_efficient_scan(self, key: str, pattern: str = "*", count: int = 10) -> Iterator[Tuple[str, float]]:
        """Yield (member, score) pairs via ZSCAN without loading the whole set.

        ``count`` is only a hint to the server about batch size; the scan
        is complete when the cursor returns to 0.
        """
        cursor = 0
        while True:
            cursor, members = self.redis_client.zscan(key, cursor, match=pattern, count=count)
            for member, score in members:
                yield member, score
            if cursor == 0:
                break

    def trim_by_rank(self, key: str, start: int, end: int, backup_key: str = None) -> int:
        """Keep only ranks [start, end]; everything outside is removed.

        If ``backup_key`` is given, removed members are copied there first.
        Returns the number of removed members.
        """
        if backup_key:
            removed_data = []
            # Head segment only exists when start > 0.  (BUG FIX: with
            # start == 0 the old code ran ZRANGE key 0 -1 and backed up
            # the ENTIRE set as "removed".)
            if start > 0:
                removed_data.extend(self.redis_client.zrange(key, 0, start - 1, withscores=True))
            # Tail segment: everything ranked after `end`.
            removed_data.extend(self.redis_client.zrange(key, end + 1, -1, withscores=True))
            if removed_data:
                backup_dict = {member: score for member, score in removed_data}
                self.redis_client.zadd(backup_key, backup_dict)

        # Remove the head first; survivors shift down by `removed1` ranks,
        # hence the adjusted start index for the tail removal.
        removed1 = self.redis_client.zremrangebyrank(key, 0, start - 1) if start > 0 else 0
        removed2 = self.redis_client.zremrangebyrank(key, end + 1 - removed1, -1)
        return removed1 + removed2

    def compress_scores(self, key: str, precision: int = 2) -> int:
        """Round every score to ``precision`` decimals and rewrite the set.

        Rounding lets nearby scores share a compact representation.  The
        delete + re-add pair is sent through one pipeline so the window in
        which the key is missing is minimized — note this is still not
        atomic against concurrent writers.
        Returns the number of members rewritten.
        """
        all_data = self.redis_client.zrange(key, 0, -1, withscores=True)

        if not all_data:
            return 0

        compressed_data = {member: round(score, precision) for member, score in all_data}

        # Pipeline the swap so both commands reach the server together.
        pipe = self.redis_client.pipeline()
        pipe.delete(key)
        pipe.zadd(key, compressed_data)
        pipe.execute()

        return len(compressed_data)

class PipelinedSortedSetOperations:
    """Run many sorted-set commands over a single pipeline round trip."""

    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client

    def batch_operations(self, operations: List[Dict[str, Any]]) -> List[Any]:
        """Queue the given operations on one pipeline and execute them.

        Each operation dict needs 'type' and 'key' plus the fields of that
        command (e.g. 'score_members' for zadd).  Results come back in the
        same order as ``operations``.

        Raises:
            ValueError: on an unrecognized 'type'.  (Previously unknown
            types were silently skipped, which misaligned the results
            list with the input operations.)
        """
        pipe = self.redis_client.pipeline()

        for op in operations:
            op_type = op['type']
            key = op['key']

            if op_type == 'zadd':
                pipe.zadd(key, op['score_members'])
            elif op_type == 'zrem':
                pipe.zrem(key, *op['members'])
            elif op_type == 'zincrby':
                pipe.zincrby(key, op['amount'], op['member'])
            elif op_type == 'zrange':
                pipe.zrange(key, op['start'], op['end'], withscores=op.get('withscores', False))
            elif op_type == 'zrank':
                pipe.zrank(key, op['member'])
            else:
                raise ValueError(f"unsupported operation type: {op_type!r}")

        return pipe.execute()

    def multi_set_stats(self, keys: List[str]) -> Dict[str, Dict[str, Any]]:
        """Fetch size / min score / max score for several sets in one trip.

        Three commands are queued per key, so the flat results list is
        consumed in strides of three.
        """
        pipe = self.redis_client.pipeline()

        for key in keys:
            pipe.zcard(key)                            # size
            pipe.zrange(key, 0, 0, withscores=True)    # lowest-scored member
            pipe.zrange(key, -1, -1, withscores=True)  # highest-scored member

        results = pipe.execute()

        stats = {}
        for i, key in enumerate(keys):
            base_idx = i * 3
            size = results[base_idx]
            min_data = results[base_idx + 1]
            max_data = results[base_idx + 2]

            stats[key] = {
                'size': size,
                'min_score': min_data[0][1] if min_data else None,
                'max_score': max_data[0][1] if max_data else None
            }

        return stats

# Usage example
if __name__ == "__main__":
    redis_client = redis.Redis(decode_responses=True)
    optimized_zset = OptimizedRedisSortedSet(redis_client)

    # Bulk-load a large data set in chunks of 500
    large_dataset = {f"member_{i}": i * 1.5 for i in range(10000)}
    optimized_zset.batch_add("large_leaderboard", large_dataset, batch_size=500)

    # Stream through the set without materializing it
    print("扫描前100个成员:")
    scanner = optimized_zset.memory_efficient_scan("large_leaderboard", count=100)
    for seen, (member, score) in enumerate(scanner, start=1):
        if seen <= 5:  # show only the first five
            print(f"  {member}: {score}")
        if seen >= 100:
            break

    # Keep the top 1000 ranks; back up and drop the rest
    removed_count = optimized_zset.trim_by_rank(
        "large_leaderboard", 0, 999, "leaderboard_backup"
    )
    print(f"删除了 {removed_count} 个成员")

    # Round score precision down to one decimal
    compressed_count = optimized_zset.compress_scores("large_leaderboard", precision=1)
    print(f"压缩了 {compressed_count} 个成员的分数")

6.5.3 管道化操作

# Pipelined-operation example
pipelined_ops = PipelinedSortedSetOperations(redis_client)

# A mixed batch: two writes, one range read, one rank lookup
ops = [
    {'type': 'zadd', 'key': 'scores1', 'score_members': {'player1': 100, 'player2': 200}},
    {'type': 'zadd', 'key': 'scores2', 'score_members': {'player3': 150, 'player4': 250}},
    {'type': 'zrange', 'key': 'scores1', 'start': 0, 'end': -1, 'withscores': True},
    {'type': 'zrank', 'key': 'scores1', 'member': 'player1'}
]

results = pipelined_ops.batch_operations(ops)
print(f"批量操作结果: {results}")

# Per-set statistics for several sets at once
stats = pipelined_ops.multi_set_stats(['scores1', 'scores2', 'large_leaderboard'])
print(f"多集合统计: {stats}")

6.6 监控和调试

6.6.1 有序集合监控

import redis
import time
from typing import Dict, List, Any, Optional
from dataclasses import dataclass

@dataclass
class SortedSetInfo:
    """Snapshot of a sorted set's size, memory footprint and score span."""
    key: str
    size: int                     # member count (ZCARD)
    memory_usage: int             # bytes (MEMORY USAGE), 0 when unavailable
    encoding: str                 # internal encoding, e.g. 'skiplist'
    min_score: Optional[float]    # None when the set is empty
    max_score: Optional[float]    # None when the set is empty
    score_range: Optional[float]  # max_score - min_score, None when empty

class SortedSetMonitor:
    """Inspection, profiling and diagnostics helpers for sorted sets."""

    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client

    def get_sortedset_info(self, key: str) -> Optional[SortedSetInfo]:
        """Collect size, memory, encoding and score-range info for ``key``.

        Returns None when the key does not exist or a command fails.
        """
        if not self.redis_client.exists(key):
            return None

        try:
            size = self.redis_client.zcard(key)
            memory_usage = self.redis_client.memory_usage(key) or 0
            encoding = self.redis_client.object('encoding', key)

            min_score = None
            max_score = None
            score_range = None

            if size > 0:
                # First and last ranks carry the extreme scores.
                min_data = self.redis_client.zrange(key, 0, 0, withscores=True)
                max_data = self.redis_client.zrange(key, -1, -1, withscores=True)

                if min_data and max_data:
                    min_score = min_data[0][1]
                    max_score = max_data[0][1]
                    score_range = max_score - min_score

            return SortedSetInfo(
                key=key,
                size=size,
                memory_usage=memory_usage,
                encoding=encoding,
                min_score=min_score,
                max_score=max_score,
                score_range=score_range
            )

        except Exception as e:
            print(f"获取有序集合信息失败: {e}")
            return None

    def analyze_score_distribution(self, key: str, buckets: int = 10) -> Dict[str, Any]:
        """Histogram the set's scores into ``buckets`` equal-width ranges."""
        info = self.get_sortedset_info(key)
        if not info or info.size == 0:
            return {'error': '集合为空或不存在'}

        # All scores identical: a single degenerate bucket.
        if info.score_range == 0:
            return {
                'buckets': [{'range': f'{info.min_score}', 'count': info.size}],
                'total': info.size
            }

        bucket_size = info.score_range / buckets
        distribution = []

        for i in range(buckets):
            min_range = info.min_score + (i * bucket_size)
            max_range = info.min_score + ((i + 1) * bucket_size)

            if i == buckets - 1:
                # Last bucket is closed on both ends so the max score lands in it.
                count = self.redis_client.zcount(key, min_range, max_range)
            else:
                # The '(' prefix makes the upper bound exclusive (Redis syntax).
                count = self.redis_client.zcount(key, min_range, f'({max_range}')

            distribution.append({
                'range': f'{min_range:.2f} - {max_range:.2f}',
                'count': count,
                'percentage': (count / info.size) * 100
            })

        return {
            'buckets': distribution,
            'total': info.size,
            'min_score': info.min_score,
            'max_score': info.max_score
        }

    def monitor_operations(self, key: str, duration: int = 60) -> Dict[str, Any]:
        """Micro-benchmark basic operations against a throwaway test set.

        Writes 1000 members to '<key>_test', times ZADD / ZRANGE / ZRANK,
        then deletes the test key.  ``duration`` is only echoed back; a
        real implementation would sample live command stats for that long.
        """
        test_key = f'{key}_test'
        test_data = {f'test_member_{i}': i for i in range(1000)}

        # perf_counter is a monotonic high-resolution clock suited to timing.
        add_start = time.perf_counter()
        self.redis_client.zadd(test_key, test_data)
        add_time = time.perf_counter() - add_start

        query_start = time.perf_counter()
        self.redis_client.zrange(test_key, 0, 99)
        query_time = time.perf_counter() - query_start

        rank_start = time.perf_counter()
        for i in range(100):
            self.redis_client.zrank(test_key, f'test_member_{i}')
        rank_time = time.perf_counter() - rank_start

        # Clean up the benchmark data.
        self.redis_client.delete(test_key)

        def _rate(ops: int, elapsed: float) -> float:
            # Guard against a zero reading from a coarse clock
            # (the old code could raise ZeroDivisionError here).
            return ops / elapsed if elapsed > 0 else float('inf')

        return {
            'duration': duration,
            'performance': {
                'add_1000_members': f'{add_time:.4f}s',
                'query_100_members': f'{query_time:.4f}s',
                'rank_100_queries': f'{rank_time:.4f}s'
            },
            'operations_per_second': {
                'add': _rate(1000, add_time),
                'query': _rate(100, query_time),
                'rank': _rate(100, rank_time)
            }
        }

    def diagnose_issues(self, key: str) -> List[Dict[str, str]]:
        """Flag common problems: oversized sets, heavy memory, odd encoding."""
        issues = []
        info = self.get_sortedset_info(key)

        if not info:
            return [{'type': 'error', 'message': '集合不存在'}]

        # Very large member counts make range scans expensive.
        if info.size > 100000:
            issues.append({
                'type': 'warning',
                'message': f'集合过大 ({info.size} 个成员),可能影响性能'
            })

        # More than 100MB in a single key is a hot-spot hazard.
        if info.memory_usage > 100 * 1024 * 1024:
            issues.append({
                'type': 'warning',
                'message': f'内存使用过高 ({info.memory_usage / 1024 / 1024:.2f}MB)'
            })

        # Small sets are normally stored in the compact list encoding.
        if info.encoding == 'skiplist' and info.size < 128:
            issues.append({
                'type': 'info',
                'message': '小集合使用skiplist编码,考虑调整zset-max-ziplist-entries参数'
            })

        # Huge score spans can lose precision (scores are double floats).
        if info.score_range and info.score_range > 1e10:
            issues.append({
                'type': 'warning',
                'message': f'分数范围过大 ({info.score_range:.2e}),可能影响精度'
            })

        if not issues:
            issues.append({
                'type': 'info',
                'message': '未发现明显问题'
            })

        return issues

    def compare_sortedsets(self, key1: str, key2: str) -> Dict[str, Any]:
        """Compare two sets: sizes, intersection, exclusive members, similarity.

        Uses temporary keys for the set algebra and always deletes them,
        even on error.
        """
        info1 = self.get_sortedset_info(key1)
        info2 = self.get_sortedset_info(key2)

        if not info1 or not info2:
            return {'error': '一个或多个集合不存在'}

        suffix = int(time.time())
        temp_inter = f'temp_inter_{suffix}'
        temp_diff1 = f'temp_diff1_{suffix}'
        temp_diff2 = f'temp_diff2_{suffix}'

        try:
            # Intersection size via ZINTERSTORE.
            inter_size = self.redis_client.zinterstore(temp_inter, [key1, key2])

            # Fetch the shared members once; both differences reuse them.
            inter_members = self.redis_client.zrange(temp_inter, 0, -1) if inter_size > 0 else []

            diff1_size = self._exclusive_size(key1, temp_diff1, inter_members)
            diff2_size = self._exclusive_size(key2, temp_diff2, inter_members)

            return {
                'set1': {
                    'key': key1,
                    'size': info1.size,
                    'memory_usage': info1.memory_usage
                },
                'set2': {
                    'key': key2,
                    'size': info2.size,
                    'memory_usage': info2.memory_usage
                },
                'intersection_size': inter_size,
                'difference_sizes': {
                    f'{key1}_only': diff1_size,
                    f'{key2}_only': diff2_size
                },
                'similarity': inter_size / max(info1.size, info2.size) if max(info1.size, info2.size) > 0 else 0
            }

        finally:
            # Always drop the temporary keys.
            self.redis_client.delete(temp_inter, temp_diff1, temp_diff2)

    def _exclusive_size(self, source_key: str, temp_key: str, shared_members: List[str]) -> int:
        """Copy ``source_key`` to ``temp_key``, drop shared members, return the count."""
        self.redis_client.zunionstore(temp_key, [source_key])
        if shared_members:
            self.redis_client.zrem(temp_key, *shared_members)
        return self.redis_client.zcard(temp_key)

# Usage example
if __name__ == "__main__":
    redis_client = redis.Redis(decode_responses=True)
    monitor = SortedSetMonitor(redis_client)

    # Seed a leaderboard with 1000 players
    seed = {f'player_{i}': i * 10 + (i % 7) * 0.5 for i in range(1000)}
    redis_client.zadd('test_leaderboard', seed)

    # Basic set information
    info = monitor.get_sortedset_info('test_leaderboard')
    if info:
        print(f"集合信息:")
        print(f"  大小: {info.size}")
        print(f"  内存使用: {info.memory_usage} bytes")
        print(f"  编码: {info.encoding}")
        print(f"  分数范围: {info.min_score} - {info.max_score}")

    # Score histogram with five buckets
    distribution = monitor.analyze_score_distribution('test_leaderboard', buckets=5)
    print(f"\n分数分布:")
    for bucket in distribution['buckets']:
        print(f"  {bucket['range']}: {bucket['count']} ({bucket['percentage']:.1f}%)")

    # Quick operation benchmark
    performance = monitor.monitor_operations('test_leaderboard')
    print(f"\n性能测试:")
    for op_name, elapsed in performance['performance'].items():
        print(f"  {op_name}: {elapsed}")

    # Health diagnostics
    issues = monitor.diagnose_issues('test_leaderboard')
    print(f"\n诊断结果:")
    for issue in issues:
        print(f"  [{issue['type'].upper()}] {issue['message']}")

    # Clean up
    redis_client.delete('test_leaderboard')

6.7 总结

Redis有序集合是一种功能强大的数据结构,结合了集合的唯一性和列表的有序性。通过本章的学习,我们了解了:

6.7.1 核心特性

  • 双重索引: 支持按成员和分数两种方式快速查找
  • 自动排序: 根据分数自动维护元素顺序
  • 高效操作: O(log N)的时间复杂度
  • 范围查询: 支持按分数和排名范围查询
  • 集合运算: 支持交集、并集等操作

6.7.2 适用场景

  • 排行榜系统: 游戏积分、销售排行等
  • 优先级队列: 任务调度、消息队列
  • 时间序列: 按时间戳排序的数据
  • 搜索排序: 按相关性分数排序
  • 实时统计: 滑动窗口计数等

6.7.3 性能优化要点

  • 编码优化: 合理配置ziplist参数
  • 批量操作: 使用管道减少网络开销
  • 内存管理: 定期清理和压缩数据
  • 分片策略: 大数据集考虑分片存储

6.7.4 最佳实践

  • 合理设计分数: 确保分数能正确反映排序需求
  • 定期维护: 清理过期数据,控制集合大小
  • 监控性能: 关注内存使用和操作延迟
  • 错误处理: 处理并发更新和数据一致性问题

参考资料