5.1 集合概述
5.1.1 什么是Redis集合
Redis集合(Set)是一个无序的字符串集合,其中每个元素都是唯一的。集合支持添加、删除和测试成员的存在性等操作,还支持集合间的交集、并集、差集等数学运算。
5.1.2 集合特点
- 唯一性: 集合中的元素不能重复
- 无序性: 元素没有固定的顺序
- 高效性: 单个元素的添加、删除、查找的时间复杂度都是O(1)(一次操作N个元素时为O(N))
- 集合运算: 支持交集、并集、差集等运算
- 原子性: 每条集合命令的执行都是原子的(多条命令的组合需要借助事务或Lua脚本才能保证原子性)
5.1.3 应用场景
- 标签系统: 为文章、用户等添加标签
- 好友关系: 存储用户的好友列表
- 权限管理: 存储用户的权限集合
- 去重处理: 对数据进行去重
- 推荐系统: 计算用户兴趣交集
- 在线用户: 跟踪当前在线用户
5.2 基本操作
5.2.1 添加和删除元素
# 添加元素到集合
SADD myset "apple"
SADD myset "banana" "orange"
SADD myset "apple" # 重复元素不会被添加
# 删除集合中的元素
SREM myset "banana"
SREM myset "grape" "apple" # 可以删除多个元素
# 随机删除并返回一个元素
SPOP myset
SPOP myset 2 # 删除并返回2个元素
# 移动元素到另一个集合
SMOVE myset otherset "orange"
5.2.2 查询操作
# 获取集合中的所有元素
SMEMBERS myset
# 检查元素是否存在
SISMEMBER myset "apple"
# 获取集合的大小
SCARD myset
# 随机获取元素(不删除)
SRANDMEMBER myset
SRANDMEMBER myset 3 # 获取3个随机元素
SRANDMEMBER myset -3 # 允许重复的3个随机元素
5.2.3 扫描操作
# 扫描集合元素
SSCAN myset 0
SSCAN myset 0 MATCH "a*" # 匹配以'a'开头的元素
SSCAN myset 0 COUNT 10 # 每次返回大约10个元素
5.3 集合运算
5.3.1 交集运算
# 创建测试集合
SADD set1 "a" "b" "c" "d"
SADD set2 "c" "d" "e" "f"
SADD set3 "d" "e" "f" "g"
# 计算交集
SINTER set1 set2 # 返回 {"c", "d"}
SINTER set1 set2 set3 # 返回 {"d"}
# 计算交集并存储到新集合
SINTERSTORE result set1 set2
SMEMBERS result
# 获取交集的大小(不返回具体元素)
# Redis 7.0+
# SINTERCARD 2 set1 set2
5.3.2 并集运算
# 计算并集
SUNION set1 set2 # 返回 {"a", "b", "c", "d", "e", "f"}
SUNION set1 set2 set3 # 返回 {"a", "b", "c", "d", "e", "f", "g"}
# 计算并集并存储到新集合
SUNIONSTORE result set1 set2
SMEMBERS result
5.3.3 差集运算
# 计算差集(在set1中但不在set2中的元素)
SDIFF set1 set2 # 返回 {"a", "b"}
SDIFF set2 set1 # 返回 {"e", "f"}
# 计算差集并存储到新集合
SDIFFSTORE result set1 set2
SMEMBERS result
5.4 实际应用示例
5.4.1 标签系统
import redis
from typing import Set, List, Dict, Any
class TagSystem:
    """Two-way tag index stored in Redis sets.

    ``item_tags:<item_id>`` holds the tags of one item and ``tags:<tag>``
    holds the items that carry one tag; both directions are written together.

    Key names returned by ``scan_iter`` are parsed with ``str.split``, so the
    client is expected to be created with ``decode_responses=True``.
    """

    def __init__(self, redis_client: "redis.Redis"):
        self.redis_client = redis_client
        self.tag_prefix = "tags"
        self.item_prefix = "item_tags"

    def _item_key(self, item_id: str) -> str:
        """Return the key of the set holding the tags of *item_id*."""
        return f"{self.item_prefix}:{item_id}"

    def _tag_key(self, tag: str) -> str:
        """Return the key of the set holding the items tagged *tag*."""
        return f"{self.tag_prefix}:{tag}"

    def add_tags(self, item_id: str, tags: List[str]):
        """Attach *tags* to *item_id* and index the item under every tag.

        An empty *tags* list is a no-op (SADD requires at least one member).
        """
        if not tags:
            return
        self.redis_client.sadd(self._item_key(item_id), *tags)
        for tag in tags:
            self.redis_client.sadd(self._tag_key(tag), item_id)

    def remove_tags(self, item_id: str, tags: List[str]):
        """Detach *tags* from *item_id* and drop the item from each tag's index."""
        if not tags:
            return
        self.redis_client.srem(self._item_key(item_id), *tags)
        for tag in tags:
            # Redis removes a set key as soon as its last member is removed,
            # so no SCARD + DEL follow-up is needed. Issuing one would also be
            # racy: a concurrent SADD between the two calls would be deleted.
            self.redis_client.srem(self._tag_key(tag), item_id)

    def get_item_tags(self, item_id: str) -> Set[str]:
        """Return every tag attached to *item_id* (empty set if none)."""
        return set(self.redis_client.smembers(self._item_key(item_id)))

    def get_items_by_tag(self, tag: str) -> Set[str]:
        """Return every item carrying *tag* (empty set if none)."""
        return set(self.redis_client.smembers(self._tag_key(tag)))

    def find_items_with_all_tags(self, tags: List[str]) -> Set[str]:
        """Return the items that carry every tag in *tags* (set intersection)."""
        if not tags:
            return set()
        return set(self.redis_client.sinter(*(self._tag_key(tag) for tag in tags)))

    def find_items_with_any_tags(self, tags: List[str]) -> Set[str]:
        """Return the items that carry at least one tag in *tags* (set union)."""
        if not tags:
            return set()
        return set(self.redis_client.sunion(*(self._tag_key(tag) for tag in tags)))

    def get_related_items(self, item_id: str, min_common_tags: int = 1) -> Dict[str, int]:
        """Rank other items by how many tags they share with *item_id*.

        Returns a dict ``{other_item: shared_tag_count}`` ordered by descending
        count, containing only items with at least *min_common_tags* in common.
        """
        item_tags = self.get_item_tags(item_id)
        if not item_tags:
            return {}
        candidates = self.find_items_with_any_tags(list(item_tags))
        candidates.discard(item_id)  # an item is not related to itself
        shared_counts = {}
        for other in candidates:
            shared = len(item_tags & self.get_item_tags(other))
            if shared >= min_common_tags:
                shared_counts[other] = shared
        ranked = sorted(shared_counts.items(), key=lambda pair: pair[1], reverse=True)
        return dict(ranked)

    def get_popular_tags(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Return up to *limit* ``{'tag', 'count'}`` dicts, most used first.

        Walks every ``tags:*`` key with SCAN and reads its cardinality.
        """
        tag_stats = []
        for key in self.redis_client.scan_iter(match=f"{self.tag_prefix}:*"):
            # maxsplit=1 keeps tags that themselves contain ':' intact.
            tag = key.split(':', 1)[1]
            count = self.redis_client.scard(key)
            if count > 0:
                tag_stats.append({'tag': tag, 'count': count})
        tag_stats.sort(key=lambda stat: stat['count'], reverse=True)
        return tag_stats[:limit]

    def cleanup_empty_tags(self):
        """Delete ``tags:*`` keys whose cardinality is 0; return how many were deleted.

        The count is taken from DEL's own reply, so a key that disappeared
        between SCARD and DEL is not counted.
        """
        deleted_count = 0
        for key in self.redis_client.scan_iter(match=f"{self.tag_prefix}:*"):
            if self.redis_client.scard(key) == 0:
                deleted_count += self.redis_client.delete(key)
        return deleted_count
# Usage example
if __name__ == "__main__":
    redis_client = redis.Redis(decode_responses=True)
    tag_system = TagSystem(redis_client)

    # Tag a few articles
    tag_system.add_tags("article_1", ["python", "redis", "database", "tutorial"])
    tag_system.add_tags("article_2", ["python", "web", "flask", "tutorial"])
    tag_system.add_tags("article_3", ["redis", "cache", "performance", "database"])
    tag_system.add_tags("article_4", ["python", "data-science", "pandas"])

    # Tags of one article
    print(f"文章1的标签: {tag_system.get_item_tags('article_1')}")

    # Articles carrying one tag
    print(f"Python相关文章: {tag_system.get_items_by_tag('python')}")

    # Articles carrying several tags at once
    common_articles = tag_system.find_items_with_all_tags(["python", "tutorial"])
    print(f"同时具有python和tutorial标签的文章: {common_articles}")

    # Related articles
    related = tag_system.get_related_items("article_1", min_common_tags=1)
    print(f"与文章1相关的文章: {related}")

    # Most used tags
    popular_tags = tag_system.get_popular_tags(5)
    print(f"热门标签: {popular_tags}")

    # Remove a tag
    tag_system.remove_tags("article_1", ["tutorial"])
    print(f"移除tutorial标签后,文章1的标签: {tag_system.get_item_tags('article_1')}")
5.4.2 好友关系系统
import redis
from typing import Set, List, Dict, Optional
class FriendshipSystem:
    """Friend and follower graphs stored in Redis sets.

    * ``friends:<user>`` -- symmetric friendships (written on both sides).
    * ``followers:<user>`` / ``following:<user>`` -- one-way follow edges,
      stored from both ends so either direction can be read directly.

    ``find_influencers`` parses key names with ``str.split`` and therefore
    expects a client created with ``decode_responses=True``.
    """

    def __init__(self, redis_client: "redis.Redis"):
        self.redis_client = redis_client
        self.friends_prefix = "friends"
        self.followers_prefix = "followers"
        self.following_prefix = "following"

    def add_friend(self, user1: str, user2: str):
        """Record a mutual friendship between *user1* and *user2*."""
        self.redis_client.sadd(f"{self.friends_prefix}:{user1}", user2)
        self.redis_client.sadd(f"{self.friends_prefix}:{user2}", user1)

    def remove_friend(self, user1: str, user2: str):
        """Remove the friendship between *user1* and *user2* on both sides."""
        self.redis_client.srem(f"{self.friends_prefix}:{user1}", user2)
        self.redis_client.srem(f"{self.friends_prefix}:{user2}", user1)

    def follow(self, follower: str, following: str):
        """Make *follower* follow *following* (one-way edge)."""
        self.redis_client.sadd(f"{self.followers_prefix}:{following}", follower)
        self.redis_client.sadd(f"{self.following_prefix}:{follower}", following)

    def unfollow(self, follower: str, following: str):
        """Remove the follow edge from *follower* to *following*."""
        self.redis_client.srem(f"{self.followers_prefix}:{following}", follower)
        self.redis_client.srem(f"{self.following_prefix}:{follower}", following)

    def get_friends(self, user: str) -> Set[str]:
        """Return the friends of *user*."""
        return set(self.redis_client.smembers(f"{self.friends_prefix}:{user}"))

    def get_followers(self, user: str) -> Set[str]:
        """Return the users following *user*."""
        return set(self.redis_client.smembers(f"{self.followers_prefix}:{user}"))

    def get_following(self, user: str) -> Set[str]:
        """Return the users *user* follows."""
        return set(self.redis_client.smembers(f"{self.following_prefix}:{user}"))

    def is_friend(self, user1: str, user2: str) -> bool:
        """Return True if *user2* is in the friend set of *user1*."""
        # bool() honours the annotation whether the client replies 0/1 or a bool.
        return bool(self.redis_client.sismember(f"{self.friends_prefix}:{user1}", user2))

    def is_following(self, follower: str, following: str) -> bool:
        """Return True if *follower* follows *following*."""
        return bool(self.redis_client.sismember(f"{self.following_prefix}:{follower}", following))

    def get_mutual_friends(self, user1: str, user2: str) -> Set[str]:
        """Return the friends *user1* and *user2* have in common (SINTER)."""
        return set(self.redis_client.sinter(
            f"{self.friends_prefix}:{user1}",
            f"{self.friends_prefix}:{user2}",
        ))

    def suggest_friends(self, user: str, limit: int = 10) -> List[Dict[str, object]]:
        """Suggest friends-of-friends, ranked by number of mutual friends.

        Each entry is ``{'user', 'mutual_friends', 'score'}`` where ``score``
        is the number of mutual friends. Existing friends and *user* itself
        are never suggested. Ties are broken by user name and
        ``mutual_friends`` is sorted, so the output is deterministic.
        """
        user_friends = self.get_friends(user)
        mutual_by_candidate: Dict[str, set] = {}
        for friend in user_friends:
            for candidate in self.get_friends(friend):
                if candidate == user or candidate in user_friends:
                    continue
                mutual_by_candidate.setdefault(candidate, set()).add(friend)
        ranked = [
            {'user': candidate, 'mutual_friends': sorted(mutuals), 'score': len(mutuals)}
            for candidate, mutuals in mutual_by_candidate.items()
        ]
        ranked.sort(key=lambda entry: (-entry['score'], entry['user']))
        return ranked[:limit]

    def get_user_stats(self, user: str) -> Dict[str, int]:
        """Return friend / follower / following counts for *user*.

        Uses SCARD so that only the counts travel over the wire, not the
        member lists.
        """
        return {
            'friends_count': self.redis_client.scard(f"{self.friends_prefix}:{user}"),
            'followers_count': self.redis_client.scard(f"{self.followers_prefix}:{user}"),
            'following_count': self.redis_client.scard(f"{self.following_prefix}:{user}"),
        }

    def find_influencers(self, min_followers: int = 100) -> List[Dict[str, object]]:
        """Return users with at least *min_followers* followers, largest first.

        Walks every ``followers:*`` key with SCAN; each entry is
        ``{'user', 'followers_count'}``.
        """
        influencers = []
        for key in self.redis_client.scan_iter(match=f"{self.followers_prefix}:*"):
            followers_count = self.redis_client.scard(key)
            if followers_count >= min_followers:
                influencers.append({
                    'user': key.split(':', 1)[1],
                    'followers_count': followers_count,
                })
        influencers.sort(key=lambda entry: entry['followers_count'], reverse=True)
        return influencers
# Usage example
if __name__ == "__main__":
    redis_client = redis.Redis(decode_responses=True)
    friendship = FriendshipSystem(redis_client)

    # Create friendships
    friendship.add_friend("alice", "bob")
    friendship.add_friend("alice", "charlie")
    friendship.add_friend("bob", "david")
    friendship.add_friend("charlie", "david")

    # Create follow edges
    friendship.follow("eve", "alice")
    friendship.follow("frank", "alice")
    friendship.follow("alice", "bob")

    # Read relationships
    print(f"Alice的好友: {friendship.get_friends('alice')}")
    print(f"Alice的粉丝: {friendship.get_followers('alice')}")
    print(f"Alice关注的人: {friendship.get_following('alice')}")

    # Check relationships
    print(f"Alice和Bob是好友吗: {friendship.is_friend('alice', 'bob')}")
    print(f"Eve关注Alice吗: {friendship.is_following('eve', 'alice')}")

    # Mutual friends
    mutual = friendship.get_mutual_friends("alice", "david")
    print(f"Alice和David的共同好友: {mutual}")

    # Friend suggestions
    suggestions = friendship.suggest_friends("alice", 3)
    print(f"为Alice推荐的好友: {suggestions}")

    # Per-user counters
    stats = friendship.get_user_stats("alice")
    print(f"Alice的统计信息: {stats}")
5.4.3 权限管理系统
import redis
from typing import Set, List, Dict, Optional
from enum import Enum
class PermissionType(Enum):
    """Permission names; the enum *value* is the string stored in Redis."""

    READ = "read"
    WRITE = "write"
    DELETE = "delete"
    ADMIN = "admin"
    EXECUTE = "execute"


class PermissionSystem:
    """Direct and role-based permissions stored in Redis sets.

    * ``user_perms:<user>:<resource>`` -- permissions granted directly.
    * ``role_perms:<role>:<resource>`` -- permissions granted to a role.
    * ``user_roles:<user>`` -- roles assigned to a user.

    The key layout uses ':' as separator, so a user, role or resource that
    itself contains ':' makes reverse lookups from key names ambiguous.
    Key names are sliced as ``str``; use ``decode_responses=True``.
    """

    def __init__(self, redis_client: "redis.Redis"):
        self.redis_client = redis_client
        self.user_permissions_prefix = "user_perms"
        self.role_permissions_prefix = "role_perms"
        self.user_roles_prefix = "user_roles"
        self.resource_permissions_prefix = "resource_perms"

    @staticmethod
    def _escape_glob(value: str) -> str:
        """Backslash-escape Redis glob metacharacters so *value* matches literally.

        Without this a caller-supplied name such as ``"*"`` would turn a
        per-user SCAN pattern into one that matches every user.
        """
        return "".join("\\" + ch if ch in "*?[]\\" else ch for ch in value)

    def _user_perm_key(self, user: str, resource: str) -> str:
        """Return the key of the direct-permission set for (*user*, *resource*)."""
        return f"{self.user_permissions_prefix}:{user}:{resource}"

    def _role_perm_key(self, role: str, resource: str) -> str:
        """Return the key of the permission set for (*role*, *resource*)."""
        return f"{self.role_permissions_prefix}:{role}:{resource}"

    def _user_roles_key(self, user: str) -> str:
        """Return the key of the role set of *user*."""
        return f"{self.user_roles_prefix}:{user}"

    def _owners_with(self, prefix: str, resource: str, member: str) -> Set[str]:
        """Return ``x`` for every ``<prefix>:<x>:<resource>`` set containing *member*."""
        head = f"{prefix}:"
        tail = f":{resource}"
        owners = set()
        for key in self.redis_client.scan_iter(match=f"{head}*{self._escape_glob(tail)}"):
            if self.redis_client.sismember(key, member):
                # Strip the known prefix and suffix instead of splitting on
                # ':' so that an owner name containing ':' is not truncated.
                owners.add(key[len(head):len(key) - len(tail)])
        return owners

    def _resources_with(self, prefix: str, owner: str, member: str) -> Set[str]:
        """Return ``r`` for every ``<prefix>:<owner>:<r>`` set containing *member*."""
        head = f"{prefix}:{owner}:"
        resources = set()
        for key in self.redis_client.scan_iter(match=f"{self._escape_glob(head)}*"):
            if self.redis_client.sismember(key, member):
                resources.add(key[len(head):])
        return resources

    def grant_permission(self, user: str, resource: str, permission: PermissionType):
        """Grant *permission* on *resource* directly to *user*."""
        self.redis_client.sadd(self._user_perm_key(user, resource), permission.value)

    def revoke_permission(self, user: str, resource: str, permission: PermissionType):
        """Revoke a directly granted *permission* on *resource* from *user*."""
        self.redis_client.srem(self._user_perm_key(user, resource), permission.value)

    def assign_role(self, user: str, role: str):
        """Assign *role* to *user*."""
        self.redis_client.sadd(self._user_roles_key(user), role)

    def remove_role(self, user: str, role: str):
        """Remove *role* from *user*."""
        self.redis_client.srem(self._user_roles_key(user), role)

    def grant_role_permission(self, role: str, resource: str, permission: PermissionType):
        """Grant *permission* on *resource* to *role*."""
        self.redis_client.sadd(self._role_perm_key(role, resource), permission.value)

    def revoke_role_permission(self, role: str, resource: str, permission: PermissionType):
        """Revoke *permission* on *resource* from *role*."""
        self.redis_client.srem(self._role_perm_key(role, resource), permission.value)

    def has_permission(self, user: str, resource: str, permission: PermissionType) -> bool:
        """Return True if *user* holds *permission* on *resource*, directly or via a role."""
        wanted = permission.value
        if self.redis_client.sismember(self._user_perm_key(user, resource), wanted):
            return True
        return any(
            self.redis_client.sismember(self._role_perm_key(role, resource), wanted)
            for role in self.get_user_roles(user)
        )

    def get_user_permissions(self, user: str, resource: str) -> Set[str]:
        """Return the union of direct and role-derived permissions of *user* on *resource*."""
        granted = set(self.redis_client.smembers(self._user_perm_key(user, resource)))
        for role in self.get_user_roles(user):
            granted.update(self.redis_client.smembers(self._role_perm_key(role, resource)))
        return granted

    def get_user_roles(self, user: str) -> Set[str]:
        """Return the roles assigned to *user*."""
        return set(self.redis_client.smembers(self._user_roles_key(user)))

    def get_role_permissions(self, role: str, resource: str) -> Set[str]:
        """Return the permissions *role* holds on *resource*."""
        return set(self.redis_client.smembers(self._role_perm_key(role, resource)))

    def get_users_with_permission(self, resource: str, permission: PermissionType) -> Set[str]:
        """Return every user holding *permission* on *resource*, directly or via a role.

        This walks the keyspace with SCAN. The ``user_roles:*`` keys are
        scanned once in total, not once per matching role.
        """
        wanted = permission.value
        users = self._owners_with(self.user_permissions_prefix, resource, wanted)
        granting_roles = self._owners_with(self.role_permissions_prefix, resource, wanted)
        if granting_roles:
            head = f"{self.user_roles_prefix}:"
            for key in self.redis_client.scan_iter(match=f"{head}*"):
                if granting_roles & set(self.redis_client.smembers(key)):
                    users.add(key[len(head):])
        return users

    def check_access(self, user: str, resource: str, required_permissions: List[PermissionType]) -> Dict[str, bool]:
        """Return ``{permission_value: granted}`` for each entry of *required_permissions*."""
        return {
            permission.value: self.has_permission(user, resource, permission)
            for permission in required_permissions
        }

    def get_user_accessible_resources(self, user: str, permission: PermissionType) -> List[str]:
        """Return the sorted resources on which *user* holds *permission*, directly or via a role."""
        wanted = permission.value
        accessible = self._resources_with(self.user_permissions_prefix, user, wanted)
        for role in self.get_user_roles(user):
            accessible |= self._resources_with(self.role_permissions_prefix, role, wanted)
        return sorted(accessible)

    def cleanup_user_permissions(self, user: str):
        """Delete the role set and every direct-permission set of *user*.

        *user* is glob-escaped before being placed in the SCAN pattern, so a
        name containing ``*`` or ``?`` cannot match other users' keys.
        """
        self.redis_client.delete(self._user_roles_key(user))
        head = f"{self.user_permissions_prefix}:{user}:"
        for key in self.redis_client.scan_iter(match=f"{self._escape_glob(head)}*"):
            self.redis_client.delete(key)
# Usage example
if __name__ == "__main__":
    redis_client = redis.Redis(decode_responses=True)
    perm_system = PermissionSystem(redis_client)

    # Define role permissions
    perm_system.grant_role_permission("admin", "database", PermissionType.READ)
    perm_system.grant_role_permission("admin", "database", PermissionType.WRITE)
    perm_system.grant_role_permission("admin", "database", PermissionType.DELETE)
    perm_system.grant_role_permission("editor", "articles", PermissionType.READ)
    perm_system.grant_role_permission("editor", "articles", PermissionType.WRITE)
    perm_system.grant_role_permission("viewer", "articles", PermissionType.READ)

    # Assign roles
    perm_system.assign_role("alice", "admin")
    perm_system.assign_role("bob", "editor")
    perm_system.assign_role("charlie", "viewer")

    # Grant a direct permission
    perm_system.grant_permission("bob", "reports", PermissionType.READ)

    # Check permissions
    print(f"Alice对数据库有删除权限吗: {perm_system.has_permission('alice', 'database', PermissionType.DELETE)}")
    print(f"Bob对文章有写权限吗: {perm_system.has_permission('bob', 'articles', PermissionType.WRITE)}")
    print(f"Charlie对文章有写权限吗: {perm_system.has_permission('charlie', 'articles', PermissionType.WRITE)}")

    # Effective permissions of a user
    alice_perms = perm_system.get_user_permissions("alice", "database")
    print(f"Alice对数据库的权限: {alice_perms}")

    # Roles of a user
    bob_roles = perm_system.get_user_roles("bob")
    print(f"Bob的角色: {bob_roles}")

    # Check several permissions at once
    access_check = perm_system.check_access("bob", "articles", [PermissionType.READ, PermissionType.WRITE, PermissionType.DELETE])
    print(f"Bob对文章的访问权限: {access_check}")

    # Users holding a given permission
    users_with_write = perm_system.get_users_with_permission("articles", PermissionType.WRITE)
    print(f"对文章有写权限的用户: {users_with_write}")

    # Resources a user can access
    bob_resources = perm_system.get_user_accessible_resources("bob", PermissionType.READ)
    print(f"Bob有读权限的资源: {bob_resources}")
5.4.4 在线用户跟踪
import redis
import time
from typing import Set, List, Dict, Optional
from datetime import datetime, timedelta
class OnlineUserTracker:
    """Presence, session and room membership tracking on Redis sets.

    * ``online_users`` -- set of users currently online.
    * ``user_sessions:<user>`` -- open session ids of one user (expiring).
    * ``room_users:<room>`` -- members of one room.
    * ``user_activity:<user>`` -- Unix timestamp of the last activity (expiring).

    Room ids are recovered from key names with ``str.split``; use a client
    created with ``decode_responses=True``.
    """

    # Lifetime, in seconds, of the session set and of the activity marker.
    SESSION_TTL_SECONDS = 3600
    ACTIVITY_TTL_SECONDS = 3600

    def __init__(self, redis_client: "redis.Redis"):
        self.redis_client = redis_client
        self.online_users_key = "online_users"
        self.user_sessions_prefix = "user_sessions"
        self.room_users_prefix = "room_users"
        self.user_activity_prefix = "user_activity"

    def user_online(self, user_id: str, session_id: Optional[str] = None, room_id: Optional[str] = None):
        """Mark *user_id* online, optionally recording a session and joining a room."""
        self.redis_client.sadd(self.online_users_key, user_id)
        if session_id:
            session_key = f"{self.user_sessions_prefix}:{user_id}"
            self.redis_client.sadd(session_key, session_id)
            self.redis_client.expire(session_key, self.SESSION_TTL_SECONDS)
        if room_id:
            self.join_room(user_id, room_id)
        self.update_activity(user_id)

    def user_offline(self, user_id: str, session_id: Optional[str] = None):
        """Close one session of *user_id*, or force the user fully offline.

        With *session_id*, only that session is closed; the user stays online
        (and in their rooms) while any other session remains. When the last
        session closes, or when *session_id* is omitted, the user is removed
        from the online set, their session set is dropped and they leave
        every room.
        """
        session_key = f"{self.user_sessions_prefix}:{user_id}"
        if session_id:
            self.redis_client.srem(session_key, session_id)
            if self.redis_client.scard(session_key) > 0:
                # Another device/tab is still connected: not offline yet.
                return
        self.redis_client.srem(self.online_users_key, user_id)
        self.redis_client.delete(session_key)
        self.leave_all_rooms(user_id)

    def join_room(self, user_id: str, room_id: str):
        """Add *user_id* to *room_id* and refresh their activity timestamp."""
        self.redis_client.sadd(f"{self.room_users_prefix}:{room_id}", user_id)
        self.update_activity(user_id)

    def leave_room(self, user_id: str, room_id: str):
        """Remove *user_id* from *room_id*."""
        self.redis_client.srem(f"{self.room_users_prefix}:{room_id}", user_id)

    def leave_all_rooms(self, user_id: str):
        """Remove *user_id* from every ``room_users:*`` set (full SCAN)."""
        for room_key in self.redis_client.scan_iter(match=f"{self.room_users_prefix}:*"):
            self.redis_client.srem(room_key, user_id)

    def update_activity(self, user_id: str):
        """Store the current Unix time as the last activity of *user_id*.

        Value and TTL are written by one SET ... EX command, so the key can
        never be left behind without an expiry.
        """
        activity_key = f"{self.user_activity_prefix}:{user_id}"
        self.redis_client.set(activity_key, int(time.time()), ex=self.ACTIVITY_TTL_SECONDS)

    def is_user_online(self, user_id: str) -> bool:
        """Return True if *user_id* is in the online set."""
        return bool(self.redis_client.sismember(self.online_users_key, user_id))

    def get_online_users(self) -> Set[str]:
        """Return every user in the online set."""
        return set(self.redis_client.smembers(self.online_users_key))

    def get_online_count(self) -> int:
        """Return the number of users in the online set."""
        return self.redis_client.scard(self.online_users_key)

    def get_room_users(self, room_id: str) -> Set[str]:
        """Return every member of *room_id*, online or not."""
        return set(self.redis_client.smembers(f"{self.room_users_prefix}:{room_id}"))

    def get_room_online_users(self, room_id: str) -> Set[str]:
        """Return the members of *room_id* that are also in the online set."""
        room_key = f"{self.room_users_prefix}:{room_id}"
        return set(self.redis_client.sinter(room_key, self.online_users_key))

    def get_user_rooms(self, user_id: str) -> List[str]:
        """Return the ids of every room containing *user_id* (full SCAN)."""
        return [
            room_key.split(':', 1)[1]
            for room_key in self.redis_client.scan_iter(match=f"{self.room_users_prefix}:*")
            if self.redis_client.sismember(room_key, user_id)
        ]

    def get_common_rooms(self, user1: str, user2: str) -> List[str]:
        """Return the ids of the rooms that contain both users."""
        return list(set(self.get_user_rooms(user1)) & set(self.get_user_rooms(user2)))

    def get_user_sessions(self, user_id: str) -> Set[str]:
        """Return the recorded session ids of *user_id*."""
        return set(self.redis_client.smembers(f"{self.user_sessions_prefix}:{user_id}"))

    def get_user_last_activity(self, user_id: str) -> Optional[datetime]:
        """Return the last activity of *user_id* as a naive local datetime, or None."""
        timestamp = self.redis_client.get(f"{self.user_activity_prefix}:{user_id}")
        if not timestamp:
            return None
        return datetime.fromtimestamp(int(timestamp))

    def cleanup_inactive_users(self, inactive_minutes: int = 30):
        """Force offline every online user idle for more than *inactive_minutes*.

        A user whose activity marker is missing (never set, or expired) is
        treated as inactive. Returns the list of users taken offline.
        """
        cutoff_time = time.time() - (inactive_minutes * 60)
        inactive_users = []
        for user_id in self.get_online_users():
            last_activity = self.redis_client.get(f"{self.user_activity_prefix}:{user_id}")
            if not last_activity or int(last_activity) < cutoff_time:
                inactive_users.append(user_id)
                self.user_offline(user_id)
        return inactive_users

    def get_room_stats(self) -> Dict[str, Dict[str, int]]:
        """Return total / online / offline member counts for every room."""
        stats = {}
        for room_key in self.redis_client.scan_iter(match=f"{self.room_users_prefix}:*"):
            room_id = room_key.split(':', 1)[1]
            total_users = self.redis_client.scard(room_key)
            online_users = len(self.get_room_online_users(room_id))
            stats[room_id] = {
                'total_users': total_users,
                'online_users': online_users,
                'offline_users': total_users - online_users
            }
        return stats

    def broadcast_to_room(self, room_id: str, message: str, exclude_user: str = None) -> List[str]:
        """Return the online members of *room_id* that should receive *message*.

        Nothing is sent here and *message* is not used; delivery is left to
        the caller. *exclude_user*, if given, is removed from the recipients.
        """
        recipients = self.get_room_online_users(room_id)
        if exclude_user:
            recipients.discard(exclude_user)
        return list(recipients)
# Usage example
if __name__ == "__main__":
    redis_client = redis.Redis(decode_responses=True)
    tracker = OnlineUserTracker(redis_client)

    # Users come online
    tracker.user_online("user1", "session1", "room1")
    tracker.user_online("user2", "session2", "room1")
    tracker.user_online("user3", "session3", "room2")
    tracker.user_online("user1", "session4", "room2")  # user1 is now in two rooms

    # Presence queries
    print(f"用户1是否在线: {tracker.is_user_online('user1')}")
    print(f"在线用户总数: {tracker.get_online_count()}")
    print(f"所有在线用户: {tracker.get_online_users()}")

    # Room queries
    print(f"房间1的用户: {tracker.get_room_users('room1')}")
    print(f"房间1的在线用户: {tracker.get_room_online_users('room1')}")

    # Per-user queries
    print(f"用户1所在的房间: {tracker.get_user_rooms('user1')}")
    print(f"用户1的会话: {tracker.get_user_sessions('user1')}")

    # Rooms shared by two users
    common_rooms = tracker.get_common_rooms("user1", "user2")
    print(f"用户1和用户2的共同房间: {common_rooms}")

    # Room statistics
    room_stats = tracker.get_room_stats()
    print(f"房间统计: {room_stats}")

    # Broadcast recipients
    recipients = tracker.broadcast_to_room("room1", "Hello everyone!", exclude_user="user1")
    print(f"房间1广播消息的接收者: {recipients}")

    # A user goes offline
    tracker.user_offline("user2", "session2")
    print(f"用户2下线后,房间1的在线用户: {tracker.get_room_online_users('room1')}")

    # Clean up inactive users; the 1 s sleep makes every timestamp older than "now"
    time.sleep(1)
    inactive_users = tracker.cleanup_inactive_users(0)  # threshold 0 = clean up immediately
    print(f"清理的不活跃用户: {inactive_users}")
5.5 性能优化
5.5.1 集合编码优化
# 查看集合编码
redis-cli
> OBJECT ENCODING myset
# 配置优化参数(redis.conf)
# 注意: redis.conf 不支持行尾注释,注释必须单独成行
# 整数集合最大元素数
set-max-intset-entries 512
5.5.2 内存优化
import redis
from typing import Set, List, Iterator
class OptimizedRedisSet:
    """Helpers for working with large Redis sets in bounded-size steps."""

    def __init__(self, redis_client: "redis.Redis"):
        self.redis_client = redis_client

    @staticmethod
    def _batches(members: List[str], batch_size: int) -> Iterator[List[str]]:
        """Yield consecutive slices of *members* holding at most *batch_size* items.

        Raises ``ValueError`` when *batch_size* is not positive; a negative
        step would otherwise make the loop silently process nothing.
        """
        if batch_size <= 0:
            raise ValueError("batch_size must be a positive integer")
        for start in range(0, len(members), batch_size):
            yield members[start:start + batch_size]

    def batch_add(self, key: str, members: List[str], batch_size: int = 1000):
        """SADD *members* to *key*, at most *batch_size* members per command."""
        for batch in self._batches(members, batch_size):
            self.redis_client.sadd(key, *batch)

    def batch_remove(self, key: str, members: List[str], batch_size: int = 1000):
        """SREM *members* from *key*, at most *batch_size* members per command."""
        for batch in self._batches(members, batch_size):
            self.redis_client.srem(key, *batch)

    def scan_members(self, key: str, pattern: str = "*", count: int = 10) -> Iterator[str]:
        """Lazily yield the members of *key* matching *pattern* using SSCAN.

        *count* is passed to SSCAN as the per-call work hint. Iteration ends
        when the server returns cursor 0.
        """
        cursor = 0
        while True:
            cursor, members = self.redis_client.sscan(key, cursor, match=pattern, count=count)
            yield from members
            if cursor == 0:
                break

    def efficient_intersection(self, keys: List[str], store_key: str = None) -> Set[str]:
        """Return the intersection of the sets at *keys*.

        With *store_key* the result is also written there via SINTERSTORE.
        The keys are passed through unchanged: the order of the operands
        cannot change the result of an intersection, so looking up each
        cardinality first would only add one round trip per key.
        """
        if not keys:
            return set()
        if store_key:
            self.redis_client.sinterstore(store_key, *keys)
            return set(self.redis_client.smembers(store_key))
        return set(self.redis_client.sinter(*keys))

    def memory_efficient_union(self, keys: List[str], store_key: str) -> int:
        """Store the union of *keys* at *store_key* and return its cardinality.

        SUNIONSTORE keeps the members on the server; only the count is returned.
        """
        if not keys:
            return 0
        return self.redis_client.sunionstore(store_key, *keys)

    def compress_set(self, key: str, compressed_key: str, compression_ratio: float = 0.1):
        """Copy a random sample of *key* into *compressed_key*.

        The sample holds ``int(size * compression_ratio)`` members, but at
        least one. Members are added to *compressed_key* without clearing it
        first. Returns the number of members sampled.
        """
        total_size = self.redis_client.scard(key)
        sample_size = max(1, int(total_size * compression_ratio))
        sampled_members = self.redis_client.srandmember(key, sample_size)
        if not sampled_members:
            return 0
        self.redis_client.sadd(compressed_key, *sampled_members)
        return len(sampled_members)
# Usage example
if __name__ == "__main__":
    # Create the client here so the snippet runs on its own.
    redis_client = redis.Redis(decode_responses=True)
    optimized_set = OptimizedRedisSet(redis_client)

    # Bulk insert
    large_members = [f"member_{i}" for i in range(10000)]
    optimized_set.batch_add("large_set", large_members)

    # Scan members: count=100 is only the per-call SSCAN hint; printing stops after 10
    print("扫描前100个成员:")
    count = 0
    for member in optimized_set.scan_members("large_set", count=100):
        print(member)
        count += 1
        if count >= 10:  # show the first 10 only
            break

    # Intersection
    optimized_set.batch_add("set1", ["a", "b", "c", "d"])
    optimized_set.batch_add("set2", ["c", "d", "e", "f"])
    intersection = optimized_set.efficient_intersection(["set1", "set2"])
    print(f"交集结果: {intersection}")

    # Down-sample a set
    compressed_size = optimized_set.compress_set("large_set", "compressed_set", 0.01)
    print(f"压缩后大小: {compressed_size}")
5.5.3 管道化操作
import redis
from typing import List, Dict, Any
class PipelinedSetOperations:
    """Set operations batched through a Redis pipeline (one round trip per batch)."""

    # Commands addressed by op['key'] plus op['members'].
    _MEMBER_COMMANDS = ('sadd', 'srem')
    # Commands addressed by op['key'] alone.
    _KEY_COMMANDS = ('scard', 'smembers')
    # Commands addressed by op['keys'].
    _MULTI_KEY_COMMANDS = ('sinter', 'sunion', 'sdiff')

    def __init__(self, redis_client: "redis.Redis"):
        self.redis_client = redis_client

    def bulk_set_operations(self, operations: List[Dict[str, Any]]) -> List[Any]:
        """Run *operations* in one pipeline and return one result per operation.

        Each operation is a dict with a ``'command'`` entry and:

        * ``sadd`` / ``srem``: ``'key'`` and ``'members'``
        * ``scard`` / ``smembers``: ``'key'``
        * ``sismember``: ``'key'`` and ``'member'``
        * ``sinter`` / ``sunion`` / ``sdiff``: ``'keys'`` (no ``'key'`` needed)

        Raises ``ValueError`` for an unknown command and ``KeyError`` for a
        missing field. All operations are validated before anything is
        queued, so a bad entry never results in a partially sent batch or in
        results that no longer line up with *operations*.
        """
        queued = []
        for op in operations:
            cmd = op['command']
            if cmd in self._MEMBER_COMMANDS:
                args = (op['key'], *op['members'])
            elif cmd in self._KEY_COMMANDS:
                args = (op['key'],)
            elif cmd == 'sismember':
                args = (op['key'], op['member'])
            elif cmd in self._MULTI_KEY_COMMANDS:
                args = tuple(op['keys'])
            else:
                raise ValueError(f"unsupported set command: {cmd!r}")
            queued.append((cmd, args))

        pipe = self.redis_client.pipeline()
        for cmd, args in queued:
            getattr(pipe, cmd)(*args)
        return pipe.execute()

    def multi_set_stats(self, keys: List[str]) -> Dict[str, Dict[str, Any]]:
        """Return ``{'exists', 'size', 'samples'}`` for each key using one pipeline.

        ``samples`` holds up to three random members (empty list if none).
        """
        pipe = self.redis_client.pipeline()
        for key in keys:
            pipe.exists(key)
            pipe.scard(key)
            pipe.srandmember(key, 3)
        results = pipe.execute()

        stats = {}
        for index, key in enumerate(keys):
            # Three replies were queued per key, in the order above.
            exists, size, samples = results[index * 3:index * 3 + 3]
            stats[key] = {
                'exists': bool(exists),
                'size': size,
                'samples': samples or []
            }
        return stats

    def batch_membership_check(self, key: str, members: List[str]) -> Dict[str, bool]:
        """Return ``{member: is_member}`` for *members* of *key* using one pipeline."""
        pipe = self.redis_client.pipeline()
        for member in members:
            pipe.sismember(key, member)
        return {member: bool(found) for member, found in zip(members, pipe.execute())}

    def efficient_set_comparison(self, set1: str, set2: str) -> Dict[str, Any]:
        """Compare two sets using one pipeline.

        Returns sizes, intersection, union, both differences, and the Jaccard
        similarity ``|A & B| / |A | B|`` (0 when the union is empty).
        """
        pipe = self.redis_client.pipeline()
        pipe.scard(set1)
        pipe.scard(set2)
        pipe.sinter(set1, set2)
        pipe.sunion(set1, set2)
        pipe.sdiff(set1, set2)
        pipe.sdiff(set2, set1)
        size1, size2, intersection, union, only1, only2 = pipe.execute()
        return {
            'set1_size': size1,
            'set2_size': size2,
            'intersection': intersection,
            'union': union,
            'set1_only': only1,
            'set2_only': only2,
            'intersection_size': len(intersection),
            'union_size': len(union),
            'jaccard_similarity': len(intersection) / len(union) if union else 0
        }
# Usage example
if __name__ == "__main__":
    # Create the client here so the snippet runs on its own.
    redis_client = redis.Redis(decode_responses=True)
    pipelined_ops = PipelinedSetOperations(redis_client)

    # Batched operations
    operations = [
        {'command': 'sadd', 'key': 'set1', 'members': ['a', 'b', 'c']},
        {'command': 'sadd', 'key': 'set2', 'members': ['c', 'd', 'e']},
        {'command': 'scard', 'key': 'set1'},
        {'command': 'scard', 'key': 'set2'},
        {'command': 'sinter', 'keys': ['set1', 'set2']}
    ]
    results = pipelined_ops.bulk_set_operations(operations)
    print(f"批量操作结果: {results}")

    # Statistics for several sets
    stats = pipelined_ops.multi_set_stats(['set1', 'set2', 'set3'])
    for key, stat in stats.items():
        print(f"{key}: 存在={stat['exists']}, 大小={stat['size']}, 样本={stat['samples']}")

    # Batched membership check
    membership = pipelined_ops.batch_membership_check('set1', ['a', 'b', 'x', 'y'])
    print(f"成员检查结果: {membership}")

    # Set comparison
    comparison = pipelined_ops.efficient_set_comparison('set1', 'set2')
    print(f"集合比较结果: {comparison}")
5.6 监控和调试
5.6.1 集合监控
import redis
import time
from typing import Dict, Any, List
class SetMonitor:
    """Inspection and diagnostics helpers for Redis sets."""

    def __init__(self, redis_client: "redis.Redis"):
        self.redis_client = redis_client

    @staticmethod
    def _is_set(key_type: Any) -> bool:
        """Return True if a TYPE reply denotes a set (str or bytes reply)."""
        return key_type in ('set', b'set')

    def get_set_info(self, key: str) -> Dict[str, Any]:
        """Return existence, type, size, encoding, memory usage and TTL of *key*.

        ``size`` and ``samples`` are filled in only when the key holds a set,
        so inspecting a key of another type reports its ``type`` without
        running set commands against it. ``memory_usage`` is ``'N/A'`` when
        the server rejects MEMORY USAGE.
        """
        info = {
            'exists': self.redis_client.exists(key),
            'type': self.redis_client.type(key),
            'size': 0,
            'encoding': None,
            'memory_usage': 0,
            'ttl': self.redis_client.ttl(key)
        }
        if not info['exists']:
            return info

        info['encoding'] = self.redis_client.object('encoding', key)
        # MEMORY USAGE needs Redis 4.0+; older servers answer with an error.
        try:
            info['memory_usage'] = self.redis_client.memory_usage(key)
        except redis.RedisError:
            info['memory_usage'] = 'N/A'

        if self._is_set(info['type']):
            info['size'] = self.redis_client.scard(key)
            if info['size'] > 0:
                info['samples'] = self.redis_client.srandmember(key, min(5, info['size']))
        return info

    def analyze_set_distribution(self, pattern: str = "*") -> Dict[str, Any]:
        """Summarise size, encoding and memory of every set whose key matches *pattern*.

        Returns ``{'error': 'No sets found'}`` when nothing matches. Memory
        figures are best effort: keys for which MEMORY USAGE fails or returns
        nothing contribute 0.
        """
        sizes = []
        encodings = {}
        total_memory = 0
        for key in self.redis_client.scan_iter(match=pattern):
            if not self._is_set(self.redis_client.type(key)):
                continue
            sizes.append(self.redis_client.scard(key))
            encoding = self.redis_client.object('encoding', key)
            encodings[encoding] = encodings.get(encoding, 0) + 1
            try:
                # The reply is empty when the key vanished after the SCAN step.
                total_memory += self.redis_client.memory_usage(key) or 0
            except redis.RedisError:
                # Deliberately best effort: MEMORY USAGE may be unavailable.
                pass

        if not sizes:
            return {'error': 'No sets found'}
        set_count = len(sizes)
        return {
            'total_sets': set_count,
            'size_stats': {
                'min': min(sizes),
                'max': max(sizes),
                'avg': sum(sizes) / set_count,
                'total': sum(sizes)
            },
            'encoding_distribution': encodings,
            'total_memory': total_memory,
            'avg_memory_per_set': total_memory / set_count
        }

    def monitor_set_operations(self, key: str, duration: int = 60, interval: float = 1.0) -> Dict[str, Any]:
        """Sample SCARD of *key* every *interval* seconds for *duration* seconds.

        Blocks the caller for roughly *duration* seconds and returns the
        summary produced by ``_analyze_operation_samples``.
        """
        start_time = time.time()
        samples = [{'timestamp': start_time, 'size': self.redis_client.scard(key)}]
        while time.time() - start_time < duration:
            time.sleep(interval)
            current_size = self.redis_client.scard(key)
            samples.append({'timestamp': time.time(), 'size': current_size})
        return self._analyze_operation_samples(samples)

    def _analyze_operation_samples(self, samples: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Summarise a series of ``{'timestamp', 'size'}`` samples.

        Returns ``{}`` for fewer than two samples. ``total_changes`` is the sum
        of absolute size differences between consecutive samples and
        ``net_change`` is their signed sum.
        """
        if len(samples) < 2:
            return {}
        sizes = [sample['size'] for sample in samples]
        changes = [later - earlier for earlier, later in zip(sizes, sizes[1:])]
        return {
            'duration': samples[-1]['timestamp'] - samples[0]['timestamp'],
            'initial_size': sizes[0],
            'final_size': sizes[-1],
            'size_change': sizes[-1] - sizes[0],
            'max_size': max(sizes),
            'min_size': min(sizes),
            'avg_size': sum(sizes) / len(sizes),
            'total_changes': sum(abs(change) for change in changes),
            'net_change': sum(changes)
        }

    def diagnose_set_issues(self, key: str) -> Dict[str, Any]:
        """Run heuristic checks on the set at *key*.

        Returns ``{'info', 'issues', 'recommendations'}``, or an ``'error'``
        dict when the key is missing or does not hold a set.
        """
        info = self.get_set_info(key)
        if not info['exists']:
            return {'error': 'Set does not exist'}
        if not self._is_set(info['type']):
            return {'error': 'Key is not a set'}

        issues = []
        recommendations = []

        # Size
        if info['size'] > 100000:
            issues.append("集合过大,可能影响性能")
            recommendations.append("考虑分片或使用SSCAN分批处理")

        # Encoding
        if info['encoding'] == 'hashtable' and info['size'] < 512:
            issues.append("小集合使用hashtable编码,内存效率较低")
            recommendations.append("检查set-max-intset-entries配置")

        # Memory (skipped when memory_usage is 'N/A' or missing)
        if isinstance(info['memory_usage'], int):
            avg_member_size = info['memory_usage'] / info['size'] if info['size'] > 0 else 0
            if avg_member_size > 100:  # more than 100 bytes per member on average
                issues.append("成员平均大小较大")
                recommendations.append("考虑压缩成员数据或使用引用")

        # TTL: the check treats -1 as "no expiry set"
        if info['ttl'] == -1 and info['size'] > 10000:
            issues.append("大集合没有设置过期时间")
            recommendations.append("设置合适的TTL避免内存泄漏")

        return {
            'info': info,
            'issues': issues,
            'recommendations': recommendations
        }

    def compare_sets(self, key1: str, key2: str) -> Dict[str, Any]:
        """Compare two sets: overlap sizes, Jaccard similarity and overlap ratio.

        Returns an ``'error'`` dict when either key is missing or is not a set.
        ``overlap_ratio`` is the intersection size over the smaller set's size.
        """
        info1 = self.get_set_info(key1)
        info2 = self.get_set_info(key2)
        if not (info1['exists'] and info2['exists']):
            return {'error': 'One or both sets do not exist'}
        if not (self._is_set(info1['type']) and self._is_set(info2['type'])):
            return {'error': 'One or both keys are not sets'}

        intersection = self.redis_client.sinter(key1, key2)
        union = self.redis_client.sunion(key1, key2)
        diff1 = self.redis_client.sdiff(key1, key2)
        diff2 = self.redis_client.sdiff(key2, key1)
        smaller_size = min(info1['size'], info2['size'])
        return {
            'set1_info': info1,
            'set2_info': info2,
            'intersection_size': len(intersection),
            'union_size': len(union),
            'set1_only_size': len(diff1),
            'set2_only_size': len(diff2),
            'jaccard_similarity': len(intersection) / len(union) if union else 0,
            'overlap_ratio': len(intersection) / smaller_size if smaller_size > 0 else 0
        }
# Usage example
if __name__ == "__main__":
    # Create the client here so the snippet runs on its own.
    redis_client = redis.Redis(decode_responses=True)
    monitor = SetMonitor(redis_client)

    # Inspect one set
    info = monitor.get_set_info("my_set")
    print(f"集合信息: {info}")

    # Analyse the distribution of all sets matching a pattern
    distribution = monitor.analyze_set_distribution("set*")
    print(f"集合分布分析: {distribution}")

    # Diagnose one set
    diagnosis = monitor.diagnose_set_issues("my_set")
    print(f"诊断结果: {diagnosis}")

    # Compare two sets, if both exist
    if redis_client.exists("set1") and redis_client.exists("set2"):
        comparison = monitor.compare_sets("set1", "set2")
        print(f"集合比较: {comparison}")
5.7 总结
Redis集合类型是一个功能强大的数据结构,具有以下特点:
5.7.1 核心特性
- 唯一性: 自动去重,确保元素唯一
- 无序性: 元素没有固定顺序,但操作高效
- 集合运算: 支持交集、并集、差集等数学运算
- 高效查询: O(1)时间复杂度的成员检查
- 原子操作: 每条集合命令的执行都是原子的(跨多条命令的组合操作需要使用事务或Lua脚本)
5.7.2 适用场景
- 标签系统: 为内容添加和管理标签
- 社交网络: 管理好友关系和关注关系
- 权限控制: 存储和检查用户权限
- 去重处理: 对数据进行自动去重
- 在线状态: 跟踪用户在线状态和房间成员
- 推荐系统: 基于共同兴趣进行推荐
5.7.3 性能优化要点
- 编码优化: 合理配置intset参数
- 批量操作: 使用管道减少网络开销
- 集合运算: 优先使用STORE命令避免数据传输
- 扫描操作: 使用SSCAN处理大集合
- 内存管理: 定期清理和监控内存使用
5.7.4 最佳实践
- 根据数据特点选择合适的编码方式
- 避免在大集合上进行频繁的SMEMBERS操作
- 使用集合运算的STORE版本减少网络传输
- 合理设置TTL防止内存泄漏
- 定期监控集合大小和性能指标
- 对于大型集合考虑分片策略
Redis集合类型为开发者提供了处理唯一性数据和集合运算的强大工具,正确使用能够显著简化应用逻辑并提升性能。