5.1 集合概述

5.1.1 什么是Redis集合

Redis集合(Set)是一个无序的字符串集合,其中每个元素都是唯一的。集合支持添加、删除和测试成员的存在性等操作,还支持集合间的交集、并集、差集等数学运算。

5.1.2 集合特点

  • 唯一性: 集合中的元素不能重复
  • 无序性: 元素没有固定的顺序
  • 高效性: 添加、删除、查找的时间复杂度都是O(1)
  • 集合运算: 支持交集、并集、差集等运算
  • 原子性: 所有操作都是原子的

5.1.3 应用场景

  • 标签系统: 为文章、用户等添加标签
  • 好友关系: 存储用户的好友列表
  • 权限管理: 存储用户的权限集合
  • 去重处理: 对数据进行去重
  • 推荐系统: 计算用户兴趣交集
  • 在线用户: 跟踪当前在线用户

5.2 基本操作

5.2.1 添加和删除元素

# 添加元素到集合
SADD myset "apple"
SADD myset "banana" "orange"
SADD myset "apple"  # 重复元素不会被添加

# 删除集合中的元素
SREM myset "banana"
SREM myset "grape" "apple"  # 可以删除多个元素

# 随机删除并返回一个元素
SPOP myset
SPOP myset 2  # 删除并返回2个元素

# 移动元素到另一个集合
SMOVE myset otherset "orange"

5.2.2 查询操作

# 获取集合中的所有元素
SMEMBERS myset

# 检查元素是否存在
SISMEMBER myset "apple"

# 获取集合的大小
SCARD myset

# 随机获取元素(不删除)
SRANDMEMBER myset
SRANDMEMBER myset 3  # 获取3个随机元素
SRANDMEMBER myset -3  # 允许重复的3个随机元素

5.2.3 扫描操作

# 扫描集合元素
SSCAN myset 0
SSCAN myset 0 MATCH "a*"  # 匹配以'a'开头的元素
SSCAN myset 0 COUNT 10    # 每次返回大约10个元素

5.3 集合运算

5.3.1 交集运算

# 创建测试集合
SADD set1 "a" "b" "c" "d"
SADD set2 "c" "d" "e" "f"
SADD set3 "d" "e" "f" "g"

# 计算交集
SINTER set1 set2          # 返回 {"c", "d"}
SINTER set1 set2 set3     # 返回 {"d"}

# 计算交集并存储到新集合
SINTERSTORE result set1 set2
SMEMBERS result

# 获取交集的大小(不返回具体元素)
# Redis 7.0+
# SINTERCARD 2 set1 set2

5.3.2 并集运算

# 计算并集
SUNION set1 set2          # 返回 {"a", "b", "c", "d", "e", "f"}
SUNION set1 set2 set3     # 返回 {"a", "b", "c", "d", "e", "f", "g"}

# 计算并集并存储到新集合
SUNIONSTORE result set1 set2
SMEMBERS result

5.3.3 差集运算

# 计算差集(在set1中但不在set2中的元素)
SDIFF set1 set2           # 返回 {"a", "b"}
SDIFF set2 set1           # 返回 {"e", "f"}

# 计算差集并存储到新集合
SDIFFSTORE result set1 set2
SMEMBERS result

5.4 实际应用示例

5.4.1 标签系统

import redis
from typing import Set, List, Dict, Any

class TagSystem:
    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
        self.tag_prefix = "tags"
        self.item_prefix = "item_tags"
    
    def add_tags(self, item_id: str, tags: List[str]):
        """为项目添加标签"""
        item_key = f"{self.item_prefix}:{item_id}"
        
        # 添加标签到项目
        if tags:
            self.redis_client.sadd(item_key, *tags)
        
        # 为每个标签添加项目引用
        for tag in tags:
            tag_key = f"{self.tag_prefix}:{tag}"
            self.redis_client.sadd(tag_key, item_id)
    
    def remove_tags(self, item_id: str, tags: List[str]):
        """从项目中移除标签"""
        item_key = f"{self.item_prefix}:{item_id}"
        
        # 从项目中移除标签
        if tags:
            self.redis_client.srem(item_key, *tags)
        
        # 从标签中移除项目引用
        for tag in tags:
            tag_key = f"{self.tag_prefix}:{tag}"
            self.redis_client.srem(tag_key, item_id)
            
            # 如果标签没有项目了,删除标签
            if self.redis_client.scard(tag_key) == 0:
                self.redis_client.delete(tag_key)
    
    def get_item_tags(self, item_id: str) -> Set[str]:
        """获取项目的所有标签"""
        item_key = f"{self.item_prefix}:{item_id}"
        return self.redis_client.smembers(item_key)
    
    def get_items_by_tag(self, tag: str) -> Set[str]:
        """获取具有指定标签的所有项目"""
        tag_key = f"{self.tag_prefix}:{tag}"
        return self.redis_client.smembers(tag_key)
    
    def find_items_with_all_tags(self, tags: List[str]) -> Set[str]:
        """查找同时具有所有指定标签的项目(交集)"""
        if not tags:
            return set()
        
        tag_keys = [f"{self.tag_prefix}:{tag}" for tag in tags]
        return self.redis_client.sinter(*tag_keys)
    
    def find_items_with_any_tags(self, tags: List[str]) -> Set[str]:
        """查找具有任意指定标签的项目(并集)"""
        if not tags:
            return set()
        
        tag_keys = [f"{self.tag_prefix}:{tag}" for tag in tags]
        return self.redis_client.sunion(*tag_keys)
    
    def get_related_items(self, item_id: str, min_common_tags: int = 1) -> Dict[str, int]:
        """获取相关项目(基于共同标签数量)"""
        item_tags = self.get_item_tags(item_id)
        if not item_tags:
            return {}
        
        # 获取所有具有这些标签的项目
        related_items = self.find_items_with_any_tags(list(item_tags))
        related_items.discard(item_id)  # 排除自己
        
        # 计算每个相关项目的共同标签数量
        result = {}
        for related_item in related_items:
            related_tags = self.get_item_tags(related_item)
            common_count = len(item_tags.intersection(related_tags))
            if common_count >= min_common_tags:
                result[related_item] = common_count
        
        # 按共同标签数量排序
        return dict(sorted(result.items(), key=lambda x: x[1], reverse=True))
    
    def get_popular_tags(self, limit: int = 10) -> List[Dict[str, Any]]:
        """获取热门标签"""
        # 扫描所有标签键
        tag_stats = []
        for key in self.redis_client.scan_iter(match=f"{self.tag_prefix}:*"):
            tag = key.split(':', 1)[1]
            count = self.redis_client.scard(key)
            if count > 0:
                tag_stats.append({'tag': tag, 'count': count})
        
        # 按使用次数排序
        tag_stats.sort(key=lambda x: x['count'], reverse=True)
        return tag_stats[:limit]
    
    def cleanup_empty_tags(self):
        """清理空标签"""
        deleted_count = 0
        for key in self.redis_client.scan_iter(match=f"{self.tag_prefix}:*"):
            if self.redis_client.scard(key) == 0:
                self.redis_client.delete(key)
                deleted_count += 1
        return deleted_count

# 使用示例
if __name__ == "__main__":
    redis_client = redis.Redis(decode_responses=True)
    tag_system = TagSystem(redis_client)
    
    # 为文章添加标签
    tag_system.add_tags("article_1", ["python", "redis", "database", "tutorial"])
    tag_system.add_tags("article_2", ["python", "web", "flask", "tutorial"])
    tag_system.add_tags("article_3", ["redis", "cache", "performance", "database"])
    tag_system.add_tags("article_4", ["python", "data-science", "pandas"])
    
    # 查询文章标签
    print(f"文章1的标签: {tag_system.get_item_tags('article_1')}")
    
    # 查询具有特定标签的文章
    print(f"Python相关文章: {tag_system.get_items_by_tag('python')}")
    
    # 查找同时具有多个标签的文章
    common_articles = tag_system.find_items_with_all_tags(["python", "tutorial"])
    print(f"同时具有python和tutorial标签的文章: {common_articles}")
    
    # 获取相关文章
    related = tag_system.get_related_items("article_1", min_common_tags=1)
    print(f"与文章1相关的文章: {related}")
    
    # 获取热门标签
    popular_tags = tag_system.get_popular_tags(5)
    print(f"热门标签: {popular_tags}")
    
    # 移除标签
    tag_system.remove_tags("article_1", ["tutorial"])
    print(f"移除tutorial标签后,文章1的标签: {tag_system.get_item_tags('article_1')}")

5.4.2 好友关系系统

import redis
from typing import Set, List, Dict, Optional

class FriendshipSystem:
    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
        self.friends_prefix = "friends"
        self.followers_prefix = "followers"
        self.following_prefix = "following"
    
    def add_friend(self, user1: str, user2: str):
        """添加双向好友关系"""
        friends_key1 = f"{self.friends_prefix}:{user1}"
        friends_key2 = f"{self.friends_prefix}:{user2}"
        
        self.redis_client.sadd(friends_key1, user2)
        self.redis_client.sadd(friends_key2, user1)
    
    def remove_friend(self, user1: str, user2: str):
        """移除双向好友关系"""
        friends_key1 = f"{self.friends_prefix}:{user1}"
        friends_key2 = f"{self.friends_prefix}:{user2}"
        
        self.redis_client.srem(friends_key1, user2)
        self.redis_client.srem(friends_key2, user1)
    
    def follow(self, follower: str, following: str):
        """关注用户(单向关系)"""
        followers_key = f"{self.followers_prefix}:{following}"
        following_key = f"{self.following_prefix}:{follower}"
        
        self.redis_client.sadd(followers_key, follower)
        self.redis_client.sadd(following_key, following)
    
    def unfollow(self, follower: str, following: str):
        """取消关注"""
        followers_key = f"{self.followers_prefix}:{following}"
        following_key = f"{self.following_prefix}:{follower}"
        
        self.redis_client.srem(followers_key, follower)
        self.redis_client.srem(following_key, following)
    
    def get_friends(self, user: str) -> Set[str]:
        """获取用户的好友列表"""
        friends_key = f"{self.friends_prefix}:{user}"
        return self.redis_client.smembers(friends_key)
    
    def get_followers(self, user: str) -> Set[str]:
        """获取用户的粉丝列表"""
        followers_key = f"{self.followers_prefix}:{user}"
        return self.redis_client.smembers(followers_key)
    
    def get_following(self, user: str) -> Set[str]:
        """获取用户关注的人"""
        following_key = f"{self.following_prefix}:{user}"
        return self.redis_client.smembers(following_key)
    
    def is_friend(self, user1: str, user2: str) -> bool:
        """检查是否为好友"""
        friends_key = f"{self.friends_prefix}:{user1}"
        return self.redis_client.sismember(friends_key, user2)
    
    def is_following(self, follower: str, following: str) -> bool:
        """检查是否关注"""
        following_key = f"{self.following_prefix}:{follower}"
        return self.redis_client.sismember(following_key, following)
    
    def get_mutual_friends(self, user1: str, user2: str) -> Set[str]:
        """获取共同好友"""
        friends_key1 = f"{self.friends_prefix}:{user1}"
        friends_key2 = f"{self.friends_prefix}:{user2}"
        return self.redis_client.sinter(friends_key1, friends_key2)
    
    def suggest_friends(self, user: str, limit: int = 10) -> List[Dict[str, any]]:
        """好友推荐(基于共同好友)"""
        user_friends = self.get_friends(user)
        suggestions = {}
        
        # 遍历每个好友的好友列表
        for friend in user_friends:
            friend_friends = self.get_friends(friend)
            
            for potential_friend in friend_friends:
                if potential_friend != user and potential_friend not in user_friends:
                    if potential_friend not in suggestions:
                        suggestions[potential_friend] = {
                            'user': potential_friend,
                            'mutual_friends': set(),
                            'score': 0
                        }
                    
                    suggestions[potential_friend]['mutual_friends'].add(friend)
                    suggestions[potential_friend]['score'] += 1
        
        # 转换为列表并排序
        result = []
        for suggestion in suggestions.values():
            suggestion['mutual_friends'] = list(suggestion['mutual_friends'])
            result.append(suggestion)
        
        result.sort(key=lambda x: x['score'], reverse=True)
        return result[:limit]
    
    def get_user_stats(self, user: str) -> Dict[str, int]:
        """获取用户统计信息"""
        return {
            'friends_count': len(self.get_friends(user)),
            'followers_count': len(self.get_followers(user)),
            'following_count': len(self.get_following(user))
        }
    
    def find_influencers(self, min_followers: int = 100) -> List[Dict[str, any]]:
        """查找影响者(粉丝数量多的用户)"""
        influencers = []
        
        # 扫描所有粉丝键
        for key in self.redis_client.scan_iter(match=f"{self.followers_prefix}:*"):
            user = key.split(':', 1)[1]
            followers_count = self.redis_client.scard(key)
            
            if followers_count >= min_followers:
                influencers.append({
                    'user': user,
                    'followers_count': followers_count
                })
        
        # 按粉丝数量排序
        influencers.sort(key=lambda x: x['followers_count'], reverse=True)
        return influencers

# 使用示例
if __name__ == "__main__":
    redis_client = redis.Redis(decode_responses=True)
    friendship = FriendshipSystem(redis_client)
    
    # 建立好友关系
    friendship.add_friend("alice", "bob")
    friendship.add_friend("alice", "charlie")
    friendship.add_friend("bob", "david")
    friendship.add_friend("charlie", "david")
    
    # 建立关注关系
    friendship.follow("eve", "alice")
    friendship.follow("frank", "alice")
    friendship.follow("alice", "bob")
    
    # 查询关系
    print(f"Alice的好友: {friendship.get_friends('alice')}")
    print(f"Alice的粉丝: {friendship.get_followers('alice')}")
    print(f"Alice关注的人: {friendship.get_following('alice')}")
    
    # 检查关系
    print(f"Alice和Bob是好友吗: {friendship.is_friend('alice', 'bob')}")
    print(f"Eve关注Alice吗: {friendship.is_following('eve', 'alice')}")
    
    # 共同好友
    mutual = friendship.get_mutual_friends("alice", "david")
    print(f"Alice和David的共同好友: {mutual}")
    
    # 好友推荐
    suggestions = friendship.suggest_friends("alice", 3)
    print(f"为Alice推荐的好友: {suggestions}")
    
    # 用户统计
    stats = friendship.get_user_stats("alice")
    print(f"Alice的统计信息: {stats}")

5.4.3 权限管理系统

import redis
from typing import Set, List, Dict, Optional
from enum import Enum

class PermissionType(Enum):
    READ = "read"
    WRITE = "write"
    DELETE = "delete"
    ADMIN = "admin"
    EXECUTE = "execute"

class PermissionSystem:
    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
        self.user_permissions_prefix = "user_perms"
        self.role_permissions_prefix = "role_perms"
        self.user_roles_prefix = "user_roles"
        self.resource_permissions_prefix = "resource_perms"
    
    def grant_permission(self, user: str, resource: str, permission: PermissionType):
        """授予用户对资源的权限"""
        user_key = f"{self.user_permissions_prefix}:{user}:{resource}"
        self.redis_client.sadd(user_key, permission.value)
    
    def revoke_permission(self, user: str, resource: str, permission: PermissionType):
        """撤销用户对资源的权限"""
        user_key = f"{self.user_permissions_prefix}:{user}:{resource}"
        self.redis_client.srem(user_key, permission.value)
    
    def assign_role(self, user: str, role: str):
        """为用户分配角色"""
        user_roles_key = f"{self.user_roles_prefix}:{user}"
        self.redis_client.sadd(user_roles_key, role)
    
    def remove_role(self, user: str, role: str):
        """移除用户角色"""
        user_roles_key = f"{self.user_roles_prefix}:{user}"
        self.redis_client.srem(user_roles_key, role)
    
    def grant_role_permission(self, role: str, resource: str, permission: PermissionType):
        """为角色授予权限"""
        role_key = f"{self.role_permissions_prefix}:{role}:{resource}"
        self.redis_client.sadd(role_key, permission.value)
    
    def revoke_role_permission(self, role: str, resource: str, permission: PermissionType):
        """撤销角色权限"""
        role_key = f"{self.role_permissions_prefix}:{role}:{resource}"
        self.redis_client.srem(role_key, permission.value)
    
    def has_permission(self, user: str, resource: str, permission: PermissionType) -> bool:
        """检查用户是否有特定权限"""
        # 检查直接权限
        user_key = f"{self.user_permissions_prefix}:{user}:{resource}"
        if self.redis_client.sismember(user_key, permission.value):
            return True
        
        # 检查角色权限
        user_roles = self.get_user_roles(user)
        for role in user_roles:
            role_key = f"{self.role_permissions_prefix}:{role}:{resource}"
            if self.redis_client.sismember(role_key, permission.value):
                return True
        
        return False
    
    def get_user_permissions(self, user: str, resource: str) -> Set[str]:
        """获取用户对资源的所有权限"""
        # 直接权限
        user_key = f"{self.user_permissions_prefix}:{user}:{resource}"
        direct_perms = self.redis_client.smembers(user_key)
        
        # 角色权限
        role_perms = set()
        user_roles = self.get_user_roles(user)
        for role in user_roles:
            role_key = f"{self.role_permissions_prefix}:{role}:{resource}"
            role_perms.update(self.redis_client.smembers(role_key))
        
        return direct_perms.union(role_perms)
    
    def get_user_roles(self, user: str) -> Set[str]:
        """获取用户的所有角色"""
        user_roles_key = f"{self.user_roles_prefix}:{user}"
        return self.redis_client.smembers(user_roles_key)
    
    def get_role_permissions(self, role: str, resource: str) -> Set[str]:
        """获取角色对资源的权限"""
        role_key = f"{self.role_permissions_prefix}:{role}:{resource}"
        return self.redis_client.smembers(role_key)
    
    def get_users_with_permission(self, resource: str, permission: PermissionType) -> Set[str]:
        """获取对资源有特定权限的所有用户"""
        users_with_permission = set()
        
        # 扫描直接权限
        pattern = f"{self.user_permissions_prefix}:*:{resource}"
        for key in self.redis_client.scan_iter(match=pattern):
            if self.redis_client.sismember(key, permission.value):
                user = key.split(':')[1]
                users_with_permission.add(user)
        
        # 扫描角色权限
        role_pattern = f"{self.role_permissions_prefix}:*:{resource}"
        roles_with_permission = set()
        
        for key in self.redis_client.scan_iter(match=role_pattern):
            if self.redis_client.sismember(key, permission.value):
                role = key.split(':')[1]
                roles_with_permission.add(role)
        
        # 查找具有这些角色的用户
        for role in roles_with_permission:
            user_roles_pattern = f"{self.user_roles_prefix}:*"
            for key in self.redis_client.scan_iter(match=user_roles_pattern):
                if self.redis_client.sismember(key, role):
                    user = key.split(':')[1]
                    users_with_permission.add(user)
        
        return users_with_permission
    
    def check_access(self, user: str, resource: str, required_permissions: List[PermissionType]) -> Dict[str, bool]:
        """检查用户对资源的多个权限"""
        result = {}
        for permission in required_permissions:
            result[permission.value] = self.has_permission(user, resource, permission)
        return result
    
    def get_user_accessible_resources(self, user: str, permission: PermissionType) -> List[str]:
        """获取用户有特定权限的所有资源"""
        accessible_resources = set()
        
        # 检查直接权限
        pattern = f"{self.user_permissions_prefix}:{user}:*"
        for key in self.redis_client.scan_iter(match=pattern):
            if self.redis_client.sismember(key, permission.value):
                resource = key.split(':', 2)[2]
                accessible_resources.add(resource)
        
        # 检查角色权限
        user_roles = self.get_user_roles(user)
        for role in user_roles:
            role_pattern = f"{self.role_permissions_prefix}:{role}:*"
            for key in self.redis_client.scan_iter(match=role_pattern):
                if self.redis_client.sismember(key, permission.value):
                    resource = key.split(':', 2)[2]
                    accessible_resources.add(resource)
        
        return list(accessible_resources)
    
    def cleanup_user_permissions(self, user: str):
        """清理用户的所有权限和角色"""
        # 删除用户角色
        user_roles_key = f"{self.user_roles_prefix}:{user}"
        self.redis_client.delete(user_roles_key)
        
        # 删除用户权限
        pattern = f"{self.user_permissions_prefix}:{user}:*"
        for key in self.redis_client.scan_iter(match=pattern):
            self.redis_client.delete(key)

# 使用示例
if __name__ == "__main__":
    redis_client = redis.Redis(decode_responses=True)
    perm_system = PermissionSystem(redis_client)
    
    # 创建角色和权限
    perm_system.grant_role_permission("admin", "database", PermissionType.READ)
    perm_system.grant_role_permission("admin", "database", PermissionType.WRITE)
    perm_system.grant_role_permission("admin", "database", PermissionType.DELETE)
    
    perm_system.grant_role_permission("editor", "articles", PermissionType.READ)
    perm_system.grant_role_permission("editor", "articles", PermissionType.WRITE)
    
    perm_system.grant_role_permission("viewer", "articles", PermissionType.READ)
    
    # 分配角色
    perm_system.assign_role("alice", "admin")
    perm_system.assign_role("bob", "editor")
    perm_system.assign_role("charlie", "viewer")
    
    # 授予直接权限
    perm_system.grant_permission("bob", "reports", PermissionType.READ)
    
    # 检查权限
    print(f"Alice对数据库有删除权限吗: {perm_system.has_permission('alice', 'database', PermissionType.DELETE)}")
    print(f"Bob对文章有写权限吗: {perm_system.has_permission('bob', 'articles', PermissionType.WRITE)}")
    print(f"Charlie对文章有写权限吗: {perm_system.has_permission('charlie', 'articles', PermissionType.WRITE)}")
    
    # 获取用户权限
    alice_perms = perm_system.get_user_permissions("alice", "database")
    print(f"Alice对数据库的权限: {alice_perms}")
    
    # 获取用户角色
    bob_roles = perm_system.get_user_roles("bob")
    print(f"Bob的角色: {bob_roles}")
    
    # 检查多个权限
    access_check = perm_system.check_access("bob", "articles", [PermissionType.READ, PermissionType.WRITE, PermissionType.DELETE])
    print(f"Bob对文章的访问权限: {access_check}")
    
    # 获取有特定权限的用户
    users_with_write = perm_system.get_users_with_permission("articles", PermissionType.WRITE)
    print(f"对文章有写权限的用户: {users_with_write}")
    
    # 获取用户可访问的资源
    bob_resources = perm_system.get_user_accessible_resources("bob", PermissionType.READ)
    print(f"Bob有读权限的资源: {bob_resources}")

5.4.4 在线用户跟踪

import redis
import time
from typing import Set, List, Dict, Optional
from datetime import datetime, timedelta

class OnlineUserTracker:
    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
        self.online_users_key = "online_users"
        self.user_sessions_prefix = "user_sessions"
        self.room_users_prefix = "room_users"
        self.user_activity_prefix = "user_activity"
    
    def user_online(self, user_id: str, session_id: str = None, room_id: str = None):
        """用户上线"""
        # 添加到在线用户集合
        self.redis_client.sadd(self.online_users_key, user_id)
        
        # 记录会话信息
        if session_id:
            session_key = f"{self.user_sessions_prefix}:{user_id}"
            self.redis_client.sadd(session_key, session_id)
            self.redis_client.expire(session_key, 3600)  # 1小时过期
        
        # 加入房间
        if room_id:
            self.join_room(user_id, room_id)
        
        # 更新活动时间
        self.update_activity(user_id)
    
    def user_offline(self, user_id: str, session_id: str = None):
        """用户下线"""
        # 从在线用户集合中移除
        self.redis_client.srem(self.online_users_key, user_id)
        
        # 移除会话
        if session_id:
            session_key = f"{self.user_sessions_prefix}:{user_id}"
            self.redis_client.srem(session_key, session_id)
            
            # 如果没有其他会话,完全下线
            if self.redis_client.scard(session_key) == 0:
                self.redis_client.delete(session_key)
                # 从所有房间中移除
                self.leave_all_rooms(user_id)
    
    def join_room(self, user_id: str, room_id: str):
        """加入房间"""
        room_key = f"{self.room_users_prefix}:{room_id}"
        self.redis_client.sadd(room_key, user_id)
        self.update_activity(user_id)
    
    def leave_room(self, user_id: str, room_id: str):
        """离开房间"""
        room_key = f"{self.room_users_prefix}:{room_id}"
        self.redis_client.srem(room_key, user_id)
    
    def leave_all_rooms(self, user_id: str):
        """离开所有房间"""
        # 扫描所有房间,移除用户
        for key in self.redis_client.scan_iter(match=f"{self.room_users_prefix}:*"):
            self.redis_client.srem(key, user_id)
    
    def update_activity(self, user_id: str):
        """更新用户活动时间"""
        activity_key = f"{self.user_activity_prefix}:{user_id}"
        self.redis_client.set(activity_key, int(time.time()))
        self.redis_client.expire(activity_key, 3600)  # 1小时过期
    
    def is_user_online(self, user_id: str) -> bool:
        """检查用户是否在线"""
        return self.redis_client.sismember(self.online_users_key, user_id)
    
    def get_online_users(self) -> Set[str]:
        """获取所有在线用户"""
        return self.redis_client.smembers(self.online_users_key)
    
    def get_online_count(self) -> int:
        """获取在线用户数量"""
        return self.redis_client.scard(self.online_users_key)
    
    def get_room_users(self, room_id: str) -> Set[str]:
        """获取房间内的用户"""
        room_key = f"{self.room_users_prefix}:{room_id}"
        return self.redis_client.smembers(room_key)
    
    def get_room_online_users(self, room_id: str) -> Set[str]:
        """获取房间内的在线用户"""
        room_key = f"{self.room_users_prefix}:{room_id}"
        return self.redis_client.sinter(room_key, self.online_users_key)
    
    def get_user_rooms(self, user_id: str) -> List[str]:
        """获取用户所在的房间"""
        rooms = []
        for key in self.redis_client.scan_iter(match=f"{self.room_users_prefix}:*"):
            if self.redis_client.sismember(key, user_id):
                room_id = key.split(':', 1)[1]
                rooms.append(room_id)
        return rooms
    
    def get_common_rooms(self, user1: str, user2: str) -> List[str]:
        """获取两个用户的共同房间"""
        user1_rooms = set(self.get_user_rooms(user1))
        user2_rooms = set(self.get_user_rooms(user2))
        return list(user1_rooms.intersection(user2_rooms))
    
    def get_user_sessions(self, user_id: str) -> Set[str]:
        """获取用户的所有会话"""
        session_key = f"{self.user_sessions_prefix}:{user_id}"
        return self.redis_client.smembers(session_key)
    
    def get_user_last_activity(self, user_id: str) -> Optional[datetime]:
        """获取用户最后活动时间"""
        activity_key = f"{self.user_activity_prefix}:{user_id}"
        timestamp = self.redis_client.get(activity_key)
        if timestamp:
            return datetime.fromtimestamp(int(timestamp))
        return None
    
    def cleanup_inactive_users(self, inactive_minutes: int = 30):
        """清理不活跃用户"""
        cutoff_time = time.time() - (inactive_minutes * 60)
        inactive_users = []
        
        online_users = self.get_online_users()
        for user_id in online_users:
            activity_key = f"{self.user_activity_prefix}:{user_id}"
            last_activity = self.redis_client.get(activity_key)
            
            if not last_activity or int(last_activity) < cutoff_time:
                inactive_users.append(user_id)
                self.user_offline(user_id)
        
        return inactive_users
    
    def get_room_stats(self) -> Dict[str, Dict[str, int]]:
        """获取房间统计信息"""
        stats = {}
        
        for key in self.redis_client.scan_iter(match=f"{self.room_users_prefix}:*"):
            room_id = key.split(':', 1)[1]
            total_users = self.redis_client.scard(key)
            online_users = len(self.get_room_online_users(room_id))
            
            stats[room_id] = {
                'total_users': total_users,
                'online_users': online_users,
                'offline_users': total_users - online_users
            }
        
        return stats
    
    def broadcast_to_room(self, room_id: str, message: str, exclude_user: str = None) -> List[str]:
        """向房间广播消息(返回接收用户列表)"""
        room_users = self.get_room_online_users(room_id)
        
        if exclude_user:
            room_users.discard(exclude_user)
        
        # 这里只返回用户列表,实际广播逻辑需要在应用层实现
        return list(room_users)

# 使用示例
if __name__ == "__main__":
    redis_client = redis.Redis(decode_responses=True)
    tracker = OnlineUserTracker(redis_client)
    
    # 用户上线
    tracker.user_online("user1", "session1", "room1")
    tracker.user_online("user2", "session2", "room1")
    tracker.user_online("user3", "session3", "room2")
    tracker.user_online("user1", "session4", "room2")  # 用户1同时在两个房间
    
    # 查询在线状态
    print(f"用户1是否在线: {tracker.is_user_online('user1')}")
    print(f"在线用户总数: {tracker.get_online_count()}")
    print(f"所有在线用户: {tracker.get_online_users()}")
    
    # 查询房间信息
    print(f"房间1的用户: {tracker.get_room_users('room1')}")
    print(f"房间1的在线用户: {tracker.get_room_online_users('room1')}")
    
    # 查询用户信息
    print(f"用户1所在的房间: {tracker.get_user_rooms('user1')}")
    print(f"用户1的会话: {tracker.get_user_sessions('user1')}")
    
    # 共同房间
    common_rooms = tracker.get_common_rooms("user1", "user2")
    print(f"用户1和用户2的共同房间: {common_rooms}")
    
    # 房间统计
    room_stats = tracker.get_room_stats()
    print(f"房间统计: {room_stats}")
    
    # 广播消息
    recipients = tracker.broadcast_to_room("room1", "Hello everyone!", exclude_user="user1")
    print(f"房间1广播消息的接收者: {recipients}")
    
    # 用户下线
    tracker.user_offline("user2", "session2")
    print(f"用户2下线后,房间1的在线用户: {tracker.get_room_online_users('room1')}")
    
    # 清理不活跃用户
    time.sleep(1)
    inactive_users = tracker.cleanup_inactive_users(0)  # 立即清理
    print(f"清理的不活跃用户: {inactive_users}")

5.5 性能优化

5.5.1 集合编码优化

# 查看集合编码
redis-cli
> OBJECT ENCODING myset

# 配置优化参数
# redis.conf
set-max-intset-entries 512    # 整数集合最大元素数

5.5.2 内存优化

import redis
from typing import Set, List, Iterator

class OptimizedRedisSet:
    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
    
    def batch_add(self, key: str, members: List[str], batch_size: int = 1000):
        """批量添加成员"""
        for i in range(0, len(members), batch_size):
            batch = members[i:i + batch_size]
            self.redis_client.sadd(key, *batch)
    
    def batch_remove(self, key: str, members: List[str], batch_size: int = 1000):
        """批量删除成员"""
        for i in range(0, len(members), batch_size):
            batch = members[i:i + batch_size]
            self.redis_client.srem(key, *batch)
    
    def scan_members(self, key: str, pattern: str = "*", count: int = 10) -> Iterator[str]:
        """扫描集合成员"""
        cursor = 0
        while True:
            cursor, members = self.redis_client.sscan(key, cursor, match=pattern, count=count)
            for member in members:
                yield member
            if cursor == 0:
                break
    
    def efficient_intersection(self, keys: List[str], store_key: str = None) -> Set[str]:
        """高效交集计算"""
        if not keys:
            return set()
        
        # 按集合大小排序,从小到大
        key_sizes = [(key, self.redis_client.scard(key)) for key in keys]
        key_sizes.sort(key=lambda x: x[1])
        sorted_keys = [key for key, _ in key_sizes]
        
        if store_key:
            self.redis_client.sinterstore(store_key, *sorted_keys)
            return self.redis_client.smembers(store_key)
        else:
            return self.redis_client.sinter(*sorted_keys)
    
    def memory_efficient_union(self, keys: List[str], store_key: str) -> int:
        """内存高效的并集计算"""
        if not keys:
            return 0
        
        # 使用SUNIONSTORE避免传输大量数据
        return self.redis_client.sunionstore(store_key, *keys)
    
    def compress_set(self, key: str, compressed_key: str, compression_ratio: float = 0.1):
        """压缩集合(保留部分元素)"""
        total_size = self.redis_client.scard(key)
        sample_size = max(1, int(total_size * compression_ratio))
        
        # 随机采样
        sampled_members = self.redis_client.srandmember(key, sample_size)
        if sampled_members:
            self.redis_client.sadd(compressed_key, *sampled_members)
        
        return len(sampled_members) if sampled_members else 0

# 使用示例
optimized_set = OptimizedRedisSet(redis_client)

# 批量操作
large_members = [f"member_{i}" for i in range(10000)]
optimized_set.batch_add("large_set", large_members)

# 扫描成员
print("扫描前100个成员:")
count = 0
for member in optimized_set.scan_members("large_set", count=100):
    print(member)
    count += 1
    if count >= 10:  # 只显示前10个
        break

# 高效交集
optimized_set.batch_add("set1", ["a", "b", "c", "d"])
optimized_set.batch_add("set2", ["c", "d", "e", "f"])
intersection = optimized_set.efficient_intersection(["set1", "set2"])
print(f"交集结果: {intersection}")

# 压缩集合
compressed_size = optimized_set.compress_set("large_set", "compressed_set", 0.01)
print(f"压缩后大小: {compressed_size}")

5.5.3 管道化操作

import redis
from typing import List, Dict, Any

class PipelinedSetOperations:
    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
    
    def bulk_set_operations(self, operations: List[Dict[str, Any]]) -> List[Any]:
        """批量集合操作"""
        pipe = self.redis_client.pipeline()
        
        for op in operations:
            cmd = op['command']
            key = op['key']
            
            if cmd == 'sadd':
                pipe.sadd(key, *op['members'])
            elif cmd == 'srem':
                pipe.srem(key, *op['members'])
            elif cmd == 'scard':
                pipe.scard(key)
            elif cmd == 'sismember':
                pipe.sismember(key, op['member'])
            elif cmd == 'smembers':
                pipe.smembers(key)
            elif cmd == 'sinter':
                pipe.sinter(*op['keys'])
            elif cmd == 'sunion':
                pipe.sunion(*op['keys'])
            elif cmd == 'sdiff':
                pipe.sdiff(*op['keys'])
        
        return pipe.execute()
    
    def multi_set_stats(self, keys: List[str]) -> Dict[str, Dict[str, Any]]:
        """多集合统计"""
        pipe = self.redis_client.pipeline()
        
        for key in keys:
            pipe.exists(key)
            pipe.scard(key)
            pipe.srandmember(key, 3)  # 获取3个随机样本
        
        results = pipe.execute()
        
        stats = {}
        for i, key in enumerate(keys):
            base_idx = i * 3
            stats[key] = {
                'exists': bool(results[base_idx]),
                'size': results[base_idx + 1],
                'samples': results[base_idx + 2] or []
            }
        
        return stats
    
    def batch_membership_check(self, key: str, members: List[str]) -> Dict[str, bool]:
        """批量成员检查"""
        pipe = self.redis_client.pipeline()
        
        for member in members:
            pipe.sismember(key, member)
        
        results = pipe.execute()
        
        return dict(zip(members, results))
    
    def efficient_set_comparison(self, set1: str, set2: str) -> Dict[str, Any]:
        """高效集合比较"""
        pipe = self.redis_client.pipeline()
        
        # 获取基本信息
        pipe.scard(set1)
        pipe.scard(set2)
        
        # 计算集合运算
        pipe.sinter(set1, set2)  # 交集
        pipe.sunion(set1, set2)  # 并集
        pipe.sdiff(set1, set2)   # set1 - set2
        pipe.sdiff(set2, set1)   # set2 - set1
        
        results = pipe.execute()
        
        return {
            'set1_size': results[0],
            'set2_size': results[1],
            'intersection': results[2],
            'union': results[3],
            'set1_only': results[4],
            'set2_only': results[5],
            'intersection_size': len(results[2]),
            'union_size': len(results[3]),
            'jaccard_similarity': len(results[2]) / len(results[3]) if results[3] else 0
        }

# 使用示例
pipelined_ops = PipelinedSetOperations(redis_client)

# 批量操作
operations = [
    {'command': 'sadd', 'key': 'set1', 'members': ['a', 'b', 'c']},
    {'command': 'sadd', 'key': 'set2', 'members': ['c', 'd', 'e']},
    {'command': 'scard', 'key': 'set1'},
    {'command': 'scard', 'key': 'set2'},
    {'command': 'sinter', 'keys': ['set1', 'set2']}
]

results = pipelined_ops.bulk_set_operations(operations)
print(f"批量操作结果: {results}")

# 多集合统计
stats = pipelined_ops.multi_set_stats(['set1', 'set2', 'set3'])
for key, stat in stats.items():
    print(f"{key}: 存在={stat['exists']}, 大小={stat['size']}, 样本={stat['samples']}")

# 批量成员检查
membership = pipelined_ops.batch_membership_check('set1', ['a', 'b', 'x', 'y'])
print(f"成员检查结果: {membership}")

# 集合比较
comparison = pipelined_ops.efficient_set_comparison('set1', 'set2')
print(f"集合比较结果: {comparison}")

5.6 监控和调试

5.6.1 集合监控

import redis
import time
from typing import Dict, Any, List

class SetMonitor:
    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
    
    def get_set_info(self, key: str) -> Dict[str, Any]:
        """获取集合详细信息"""
        info = {
            'exists': self.redis_client.exists(key),
            'type': self.redis_client.type(key),
            'size': 0,
            'encoding': None,
            'memory_usage': 0,
            'ttl': self.redis_client.ttl(key)
        }
        
        if info['exists']:
            info['size'] = self.redis_client.scard(key)
            info['encoding'] = self.redis_client.object('encoding', key)
            
            # 内存使用(Redis 4.0+)
            try:
                info['memory_usage'] = self.redis_client.memory_usage(key)
            except:
                info['memory_usage'] = 'N/A'
            
            # 获取样本数据
            if info['size'] > 0:
                info['samples'] = self.redis_client.srandmember(key, min(5, info['size']))
        
        return info
    
    def analyze_set_distribution(self, pattern: str = "*") -> Dict[str, Any]:
        """分析集合分布"""
        sizes = []
        encodings = {}
        total_memory = 0
        set_count = 0
        
        for key in self.redis_client.scan_iter(match=pattern):
            if self.redis_client.type(key) == 'set':
                set_count += 1
                size = self.redis_client.scard(key)
                sizes.append(size)
                
                encoding = self.redis_client.object('encoding', key)
                encodings[encoding] = encodings.get(encoding, 0) + 1
                
                try:
                    memory = self.redis_client.memory_usage(key)
                    total_memory += memory
                except:
                    pass
        
        if not sizes:
            return {'error': 'No sets found'}
        
        return {
            'total_sets': set_count,
            'size_stats': {
                'min': min(sizes),
                'max': max(sizes),
                'avg': sum(sizes) / len(sizes),
                'total': sum(sizes)
            },
            'encoding_distribution': encodings,
            'total_memory': total_memory,
            'avg_memory_per_set': total_memory / set_count if set_count > 0 else 0
        }
    
    def monitor_set_operations(self, key: str, duration: int = 60) -> Dict[str, Any]:
        """监控集合操作"""
        start_time = time.time()
        initial_size = self.redis_client.scard(key)
        samples = [{'timestamp': start_time, 'size': initial_size}]
        
        while time.time() - start_time < duration:
            time.sleep(1)
            current_size = self.redis_client.scard(key)
            samples.append({
                'timestamp': time.time(),
                'size': current_size
            })
        
        return self._analyze_operation_samples(samples)
    
    def _analyze_operation_samples(self, samples: List[Dict[str, Any]]) -> Dict[str, Any]:
        """分析操作样本"""
        if len(samples) < 2:
            return {}
        
        sizes = [s['size'] for s in samples]
        changes = [sizes[i] - sizes[i-1] for i in range(1, len(sizes))]
        
        return {
            'duration': samples[-1]['timestamp'] - samples[0]['timestamp'],
            'initial_size': sizes[0],
            'final_size': sizes[-1],
            'size_change': sizes[-1] - sizes[0],
            'max_size': max(sizes),
            'min_size': min(sizes),
            'avg_size': sum(sizes) / len(sizes),
            'total_changes': sum(abs(c) for c in changes),
            'net_change': sum(changes)
        }
    
    def diagnose_set_issues(self, key: str) -> Dict[str, Any]:
        """诊断集合问题"""
        info = self.get_set_info(key)
        issues = []
        recommendations = []
        
        if not info['exists']:
            return {'error': 'Set does not exist'}
        
        # 检查大小
        if info['size'] > 100000:
            issues.append("集合过大,可能影响性能")
            recommendations.append("考虑分片或使用SSCAN分批处理")
        
        # 检查编码
        if info['encoding'] == 'hashtable' and info['size'] < 512:
            issues.append("小集合使用hashtable编码,内存效率较低")
            recommendations.append("检查set-max-intset-entries配置")
        
        # 检查内存使用
        if isinstance(info['memory_usage'], int):
            avg_member_size = info['memory_usage'] / info['size'] if info['size'] > 0 else 0
            if avg_member_size > 100:  # 平均每个成员超过100字节
                issues.append("成员平均大小较大")
                recommendations.append("考虑压缩成员数据或使用引用")
        
        # 检查TTL
        if info['ttl'] == -1 and info['size'] > 10000:
            issues.append("大集合没有设置过期时间")
            recommendations.append("设置合适的TTL避免内存泄漏")
        
        return {
            'info': info,
            'issues': issues,
            'recommendations': recommendations
        }
    
    def compare_sets(self, key1: str, key2: str) -> Dict[str, Any]:
        """比较两个集合"""
        info1 = self.get_set_info(key1)
        info2 = self.get_set_info(key2)
        
        if not (info1['exists'] and info2['exists']):
            return {'error': 'One or both sets do not exist'}
        
        # 计算集合运算
        intersection = self.redis_client.sinter(key1, key2)
        union = self.redis_client.sunion(key1, key2)
        diff1 = self.redis_client.sdiff(key1, key2)
        diff2 = self.redis_client.sdiff(key2, key1)
        
        return {
            'set1_info': info1,
            'set2_info': info2,
            'intersection_size': len(intersection),
            'union_size': len(union),
            'set1_only_size': len(diff1),
            'set2_only_size': len(diff2),
            'jaccard_similarity': len(intersection) / len(union) if union else 0,
            'overlap_ratio': len(intersection) / min(info1['size'], info2['size']) if min(info1['size'], info2['size']) > 0 else 0
        }

# 使用示例
monitor = SetMonitor(redis_client)

# 获取集合信息
info = monitor.get_set_info("my_set")
print(f"集合信息: {info}")

# 分析集合分布
distribution = monitor.analyze_set_distribution("set*")
print(f"集合分布分析: {distribution}")

# 诊断问题
diagnosis = monitor.diagnose_set_issues("my_set")
print(f"诊断结果: {diagnosis}")

# 比较集合
if redis_client.exists("set1") and redis_client.exists("set2"):
    comparison = monitor.compare_sets("set1", "set2")
    print(f"集合比较: {comparison}")

5.7 总结

Redis集合类型是一个功能强大的数据结构,具有以下特点:

5.7.1 核心特性

  • 唯一性: 自动去重,确保元素唯一
  • 无序性: 元素没有固定顺序,但操作高效
  • 集合运算: 支持交集、并集、差集等数学运算
  • 高效查询: O(1)时间复杂度的成员检查
  • 原子操作: 所有操作都是原子的

5.7.2 适用场景

  • 标签系统: 为内容添加和管理标签
  • 社交网络: 管理好友关系和关注关系
  • 权限控制: 存储和检查用户权限
  • 去重处理: 对数据进行自动去重
  • 在线状态: 跟踪用户在线状态和房间成员
  • 推荐系统: 基于共同兴趣进行推荐

5.7.3 性能优化要点

  • 编码优化: 合理配置intset参数
  • 批量操作: 使用管道减少网络开销
  • 集合运算: 优先使用STORE命令避免数据传输
  • 扫描操作: 使用SSCAN处理大集合
  • 内存管理: 定期清理和监控内存使用

5.7.4 最佳实践

  • 根据数据特点选择合适的编码方式
  • 避免在大集合上进行频繁的SMEMBERS操作
  • 使用集合运算的STORE版本减少网络传输
  • 合理设置TTL防止内存泄漏
  • 定期监控集合大小和性能指标
  • 对于大型集合考虑分片策略

Redis集合类型为开发者提供了处理唯一性数据和集合运算的强大工具,正确使用能够显著简化应用逻辑并提升性能。

参考资料