14.1 电商系统中的Redis应用
14.1.1 购物车系统
```python
import redis
import json
import time
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
import uuid
@dataclass
class CartItem:
    """One product line in a shopping cart."""

    product_id: str
    product_name: str
    price: float
    quantity: int
    sku_id: str
    # Optional free-form attributes; None means "no attributes given".
    attributes: Optional[Dict[str, Any]] = None
    # Optional ISO-8601 string; None when the caller does not supply one.
    added_time: Optional[str] = None
class ShoppingCartService:
    """Shopping cart kept in one Redis hash per user.

    Each hash field is a product id and each value is the JSON-encoded cart
    line for that product. Every public method catches exceptions, prints
    them and returns a failure value instead of raising.
    """

    def __init__(self, redis_client: "redis.Redis"):
        self.redis_client = redis_client
        self.cart_prefix = "cart:"
        self.cart_ttl = 7 * 24 * 3600  # carts expire 7 days after the last write

    def _get_cart_key(self, user_id: str) -> str:
        """Return the Redis key of the user's cart hash."""
        return f"{self.cart_prefix}{user_id}"

    def _load_lines(self, user_id: str) -> List[Dict[str, Any]]:
        """Return every decoded cart line of the user; raises on any failure."""
        raw_cart = self.redis_client.hgetall(self._get_cart_key(user_id))
        return [json.loads(raw_line) for raw_line in raw_cart.values()]

    def _upsert_line(self, cart_key: str, line: Dict[str, Any]) -> None:
        """Insert a cart line, or add its quantity to the line already stored.

        Raises whatever the Redis client or JSON decoding raises.
        NOTE(review): this is a read-modify-write sequence, so two concurrent
        writers for the same product can lose an update.
        """
        now = datetime.now().isoformat()
        stored = self.redis_client.hget(cart_key, line['product_id'])
        if stored:
            record = json.loads(stored)
            record['quantity'] += line['quantity']
        else:
            record = {
                'product_id': line['product_id'],
                'product_name': line['product_name'],
                'price': line['price'],
                'quantity': line['quantity'],
                'sku_id': line['sku_id'],
                'attributes': line.get('attributes') or {},
                'added_time': now,
            }
        record['updated_time'] = now
        self.redis_client.hset(cart_key, line['product_id'], json.dumps(record))
        self.redis_client.expire(cart_key, self.cart_ttl)

    def add_item(self, user_id: str, item: "CartItem") -> bool:
        """Add an item to the cart, merging quantities with an existing line.

        Returns:
            True on success; False when the quantity is not positive or the
            write fails.
        """
        try:
            if item.quantity <= 0:
                # A zero/negative line would produce negative cart totals.
                raise ValueError(f"quantity must be positive, got {item.quantity}")
            self._upsert_line(self._get_cart_key(user_id), {
                'product_id': item.product_id,
                'product_name': item.product_name,
                'price': item.price,
                'quantity': item.quantity,
                'sku_id': item.sku_id,
                'attributes': item.attributes,
            })
            return True
        except Exception as e:
            print(f"添加商品到购物车失败: {e}")
            return False

    def remove_item(self, user_id: str, product_id: str) -> bool:
        """Remove a product from the cart; True only if a line was deleted."""
        try:
            removed = self.redis_client.hdel(self._get_cart_key(user_id), product_id)
            return bool(removed)
        except Exception as e:
            print(f"移除购物车商品失败: {e}")
            return False

    def update_quantity(self, user_id: str, product_id: str, quantity: int) -> bool:
        """Set the quantity of a product that is already in the cart.

        A quantity of 0 removes the line; a negative quantity is rejected.

        Returns:
            True when the line existed and was updated (or removed), else False.
        """
        try:
            if quantity < 0:
                raise ValueError(f"quantity must not be negative, got {quantity}")
            cart_key = self._get_cart_key(user_id)
            if quantity == 0:
                return bool(self.redis_client.hdel(cart_key, product_id))
            stored = self.redis_client.hget(cart_key, product_id)
            if not stored:
                return False
            record = json.loads(stored)
            record['quantity'] = quantity
            record['updated_time'] = datetime.now().isoformat()
            self.redis_client.hset(cart_key, product_id, json.dumps(record))
            # Refresh the TTL on every write, the same way add_item does.
            self.redis_client.expire(cart_key, self.cart_ttl)
            return True
        except Exception as e:
            print(f"更新商品数量失败: {e}")
            return False

    def get_cart(self, user_id: str) -> Dict[str, Any]:
        """Return the cart lines together with total quantity and amount.

        The returned dict always has the same keys; on failure it describes
        an empty cart.
        """
        try:
            items = self._load_lines(user_id)
            total_quantity = sum(line['quantity'] for line in items)
            total_amount = round(
                sum(line['price'] * line['quantity'] for line in items), 2
            )
        except Exception as e:
            print(f"获取购物车失败: {e}")
            items, total_quantity, total_amount = [], 0, 0
        return {
            'user_id': user_id,
            'items': items,
            'total_quantity': total_quantity,
            'total_amount': total_amount,
            'updated_time': datetime.now().isoformat(),
        }

    def clear_cart(self, user_id: str) -> bool:
        """Delete the user's whole cart."""
        try:
            self.redis_client.delete(self._get_cart_key(user_id))
            return True
        except Exception as e:
            print(f"清空购物车失败: {e}")
            return False

    def merge_carts(self, source_user_id: str, target_user_id: str) -> bool:
        """Move every line of the source cart into the target cart.

        Typically used at login to fold an anonymous cart into the user's
        cart. Each line is deleted from the source only after it has been
        written to the target, so a failure part-way through loses nothing
        and a retry does not double-count lines that were already moved.

        Returns:
            True when all lines were moved (or there was nothing to do).
        """
        if source_user_id == target_user_id:
            # Merging a cart into itself would double it and then delete it.
            return True
        try:
            source_key = self._get_cart_key(source_user_id)
            target_key = self._get_cart_key(target_user_id)
            for line in self._load_lines(source_user_id):
                self._upsert_line(target_key, line)
                self.redis_client.hdel(source_key, line['product_id'])
            self.redis_client.delete(source_key)
            return True
        except Exception as e:
            print(f"合并购物车失败: {e}")
            return False
14.1.2 商品库存管理
class InventoryService:
    """Stock levels and short-lived stock reservations stored in Redis.

    Keys used: ``inventory:<product>`` (stock on hand), ``reserved:<product>``
    (units currently reserved) and ``reservation:<order>`` (a hash describing
    one reservation). All multi-key updates run inside Lua scripts.
    """

    # Lifetime in seconds of a reservation record and of the reserved counter.
    RESERVATION_TTL_SECONDS = 1800
    # Expiry in milliseconds of the per-product lock key.
    LOCK_TIMEOUT_MS = 5000

    # KEYS: inventory, reserved, lock, reservation
    # ARGV: quantity, order_id, lock_timeout_ms, timestamp, product_id, ttl_seconds
    _RESERVE_SCRIPT = """
    local inventory_key = KEYS[1]
    local reserved_key = KEYS[2]
    local lock_key = KEYS[3]
    local reservation_key = KEYS[4]
    local quantity = tonumber(ARGV[1])
    local order_id = ARGV[2]
    local lock_timeout = tonumber(ARGV[3])
    local timestamp = ARGV[4]
    local product_id = ARGV[5]
    local ttl = tonumber(ARGV[6])

    -- Take the per-product lock; it is released before the script returns.
    local lock_acquired = redis.call('SET', lock_key, order_id, 'PX', lock_timeout, 'NX')
    if not lock_acquired then
        return {0, 'lock_failed'}
    end

    local current_inventory = tonumber(redis.call('GET', inventory_key) or 0)
    local current_reserved = tonumber(redis.call('GET', reserved_key) or 0)
    local result
    if current_inventory - current_reserved >= quantity then
        redis.call('INCRBY', reserved_key, quantity)
        redis.call('EXPIRE', reserved_key, ttl)
        -- Store the bare product id (not the inventory key) so that
        -- confirm/cancel can rebuild the inventory and reserved key names.
        redis.call('HSET', reservation_key, 'product_id', product_id,
                   'quantity', quantity, 'timestamp', timestamp)
        redis.call('EXPIRE', reservation_key, ttl)
        result = {1, 'success'}
    else
        result = {0, 'insufficient_inventory'}
    end
    redis.call('DEL', lock_key)
    return result
    """

    # KEYS: reservation
    # ARGV: inventory_prefix, reserved_prefix, deduct flag ('1' = also lower stock)
    _SETTLE_SCRIPT = """
    local reservation_key = KEYS[1]
    -- Read by field name; HGETALL position is not a stable contract.
    local fields = redis.call('HMGET', reservation_key, 'product_id', 'quantity')
    if not fields[1] or not fields[2] then
        return {0, 'reservation_not_found'}
    end
    local product_id = fields[1]
    local quantity = tonumber(fields[2])
    local reserved_key = ARGV[2] .. product_id

    if ARGV[3] == '1' then
        redis.call('DECRBY', ARGV[1] .. product_id, quantity)
    end

    -- Never let the reserved counter go negative (it may already have expired).
    local reserved = tonumber(redis.call('GET', reserved_key) or 0)
    if reserved > quantity then
        redis.call('DECRBY', reserved_key, quantity)
    else
        redis.call('DEL', reserved_key)
    end
    redis.call('DEL', reservation_key)
    return {1, 'success'}
    """

    def __init__(self, redis_client: "redis.Redis"):
        self.redis_client = redis_client
        self.inventory_prefix = "inventory:"
        self.lock_prefix = "lock:inventory:"
        self.reserved_prefix = "reserved:"

    def _get_inventory_key(self, product_id: str) -> str:
        """Return the key holding the product's stock on hand."""
        return f"{self.inventory_prefix}{product_id}"

    def _get_lock_key(self, product_id: str) -> str:
        """Return the key used as the product's reservation lock."""
        return f"{self.lock_prefix}{product_id}"

    def _get_reserved_key(self, product_id: str) -> str:
        """Return the key counting the product's reserved units."""
        return f"{self.reserved_prefix}{product_id}"

    def _get_reservation_key(self, order_id: str) -> str:
        """Return the key of the reservation hash for an order."""
        return f"reservation:{order_id}"

    def set_inventory(self, product_id: str, quantity: int) -> bool:
        """Overwrite the product's stock level; False if the write fails."""
        try:
            self.redis_client.set(self._get_inventory_key(product_id), quantity)
            return True
        except Exception as e:
            print(f"设置库存失败: {e}")
            return False

    def get_inventory(self, product_id: str) -> int:
        """Return the product's stock level, or 0 when unknown or on error."""
        try:
            stored = self.redis_client.get(self._get_inventory_key(product_id))
            return int(stored) if stored else 0
        except Exception as e:
            print(f"获取库存失败: {e}")
            return 0

    def reserve_inventory(self, product_id: str, quantity: int, order_id: str) -> bool:
        """Reserve ``quantity`` units of a product for an order.

        The reservation succeeds only when stock minus already-reserved units
        covers the request.

        NOTE(review): the reservation hash is keyed by order id alone, so a
        second reservation under the same order id overwrites the first.

        Returns:
            True when the units were reserved; False when the quantity is not
            positive, stock is insufficient, the lock is held, or Redis fails.
        """
        try:
            if quantity <= 0:
                # A non-positive request would raise the available stock.
                raise ValueError(f"quantity must be positive, got {quantity}")
            keys = [
                self._get_inventory_key(product_id),
                self._get_reserved_key(product_id),
                self._get_lock_key(product_id),
                self._get_reservation_key(order_id),
            ]
            result = self.redis_client.eval(
                self._RESERVE_SCRIPT,
                len(keys),
                *keys,
                quantity,
                order_id,
                self.LOCK_TIMEOUT_MS,
                int(time.time()),
                product_id,
                self.RESERVATION_TTL_SECONDS,
            )
            return result[0] == 1
        except Exception as e:
            print(f"预留库存失败: {e}")
            return False

    def _settle_reservation(self, order_id: str, deduct_inventory: bool) -> bool:
        """Release a reservation, optionally deducting it from stock; may raise."""
        # NOTE(review): the script derives the inventory/reserved key names
        # from the stored product id, so they are not declared in KEYS.
        result = self.redis_client.eval(
            self._SETTLE_SCRIPT,
            1,
            self._get_reservation_key(order_id),
            self.inventory_prefix,
            self.reserved_prefix,
            1 if deduct_inventory else 0,
        )
        return result[0] == 1

    def confirm_reservation(self, order_id: str) -> bool:
        """Turn a reservation into a real stock deduction.

        Returns:
            True on success; False when no reservation exists or Redis fails.
        """
        try:
            return self._settle_reservation(order_id, deduct_inventory=True)
        except Exception as e:
            print(f"确认预留失败: {e}")
            return False

    def cancel_reservation(self, order_id: str) -> bool:
        """Release a reservation without touching the stock level.

        Returns:
            True on success; False when no reservation exists or Redis fails.
        """
        try:
            return self._settle_reservation(order_id, deduct_inventory=False)
        except Exception as e:
            print(f"取消预留失败: {e}")
            return False
14.2 社交媒体系统
14.2.1 用户关注系统
class SocialFollowService:
    """Follow graph stored as Redis sets.

    ``following:<user>`` holds the ids a user follows and ``followers:<user>``
    the ids following that user. Every public method catches exceptions,
    prints them and returns a failure value instead of raising.
    """

    def __init__(self, redis_client: "redis.Redis"):
        self.redis_client = redis_client
        self.following_prefix = "following:"  # ids the user follows
        self.followers_prefix = "followers:"  # ids following the user
        self.feed_prefix = "feed:"            # activity feed

    @staticmethod
    def _to_str(value: Any) -> Any:
        """Decode a bytes reply to str; pass any other value through."""
        return value.decode() if isinstance(value, bytes) else value

    def _members(self, key: str, limit: Optional[int] = None) -> List[str]:
        """Return up to ``limit`` members of a set as strings; may raise."""
        members = list(self.redis_client.smembers(key))
        if limit is not None:
            members = members[:max(limit, 0)]
        return [self._to_str(member) for member in members]

    def follow_user(self, user_id: str, target_user_id: str) -> bool:
        """Make ``user_id`` follow ``target_user_id``.

        Returns:
            False when a user tries to follow themselves or Redis fails.
        """
        if user_id == target_user_id:
            return False
        try:
            follow_time_key = f"follow_time:{user_id}:{target_user_id}"
            pipe = self.redis_client.pipeline()
            pipe.sadd(f"{self.following_prefix}{user_id}", target_user_id)
            pipe.sadd(f"{self.followers_prefix}{target_user_id}", user_id)
            # Remember when the follow happened; the record expires after 1 year.
            pipe.set(follow_time_key, int(time.time()))
            pipe.expire(follow_time_key, 365 * 24 * 3600)
            pipe.execute()
            return True
        except Exception as e:
            print(f"关注用户失败: {e}")
            return False

    def unfollow_user(self, user_id: str, target_user_id: str) -> bool:
        """Remove the follow relation and its timestamp record."""
        try:
            pipe = self.redis_client.pipeline()
            pipe.srem(f"{self.following_prefix}{user_id}", target_user_id)
            pipe.srem(f"{self.followers_prefix}{target_user_id}", user_id)
            pipe.delete(f"follow_time:{user_id}:{target_user_id}")
            pipe.execute()
            return True
        except Exception as e:
            print(f"取消关注失败: {e}")
            return False

    def is_following(self, user_id: str, target_user_id: str) -> bool:
        """Return whether ``user_id`` follows ``target_user_id``."""
        try:
            following_key = f"{self.following_prefix}{user_id}"
            # Coerce to bool: the client may answer with 0/1.
            return bool(self.redis_client.sismember(following_key, target_user_id))
        except Exception as e:
            print(f"检查关注状态失败: {e}")
            return False

    def get_following_list(self, user_id: str, limit: int = 100) -> List[str]:
        """Return up to ``limit`` ids that the user follows, as strings."""
        try:
            return self._members(f"{self.following_prefix}{user_id}", limit)
        except Exception as e:
            print(f"获取关注列表失败: {e}")
            return []

    def get_followers_list(self, user_id: str, limit: int = 100) -> List[str]:
        """Return up to ``limit`` ids following the user, as strings."""
        try:
            return self._members(f"{self.followers_prefix}{user_id}", limit)
        except Exception as e:
            print(f"获取粉丝列表失败: {e}")
            return []

    def get_mutual_follows(self, user_id: str, target_user_id: str) -> List[str]:
        """Return the ids followed by both users, as strings."""
        try:
            shared = self.redis_client.sinter(
                f"{self.following_prefix}{user_id}",
                f"{self.following_prefix}{target_user_id}",
            )
            return [self._to_str(member) for member in shared]
        except Exception as e:
            print(f"获取共同关注失败: {e}")
            return []

    def get_user_stats(self, user_id: str) -> Dict[str, int]:
        """Return the user's following and follower counts (zeros on error)."""
        try:
            return {
                'following_count': self.redis_client.scard(f"{self.following_prefix}{user_id}"),
                'followers_count': self.redis_client.scard(f"{self.followers_prefix}{user_id}"),
            }
        except Exception as e:
            print(f"获取用户统计失败: {e}")
            return {'following_count': 0, 'followers_count': 0}
14.2.2 动态流系统
@dataclass
class Post:
    """A post published to the social feed."""

    post_id: str
    user_id: str
    content: str
    # Optional list fields; None means "none given".
    media_urls: Optional[List[str]] = None
    tags: Optional[List[str]] = None
    # Optional ISO-8601 string; None when the caller does not supply one.
    created_time: Optional[str] = None
    like_count: int = 0
    comment_count: int = 0
class SocialFeedService:
    """Posts, per-user timelines and likes stored in Redis.

    Posts live in ``post:<id>`` hashes, their ids are pushed into sorted sets
    (``user_posts:<author>`` and ``timeline:<follower>``) scored by publish
    time, and likes are kept in ``likes:<post>`` sets. Every public method
    catches exceptions, prints them and returns a failure value.
    """

    def __init__(self, redis_client: "redis.Redis", follow_service: "SocialFollowService"):
        self.redis_client = redis_client
        self.follow_service = follow_service
        self.user_posts_prefix = "user_posts:"
        self.timeline_prefix = "timeline:"
        self.post_prefix = "post:"
        self.likes_prefix = "likes:"

    @staticmethod
    def _to_str(value: Any) -> Any:
        """Decode a bytes reply to str; pass any other value through."""
        return value.decode() if isinstance(value, bytes) else value

    def publish_post(self, post: "Post") -> bool:
        """Store a post and push its id to the author's followers.

        Overwrites ``post.post_id`` with a fresh UUID and ``post.created_time``
        with the current time. The post is fanned out to at most 10000
        followers.
        """
        try:
            post.post_id = str(uuid.uuid4())
            post.created_time = datetime.now().isoformat()
            # One timestamp for every sorted set, so the post has the same
            # score in each list it appears in.
            published_at = time.time()

            post_key = f"{self.post_prefix}{post.post_id}"
            post_data = {
                'post_id': post.post_id,
                'user_id': post.user_id,
                'content': post.content,
                'media_urls': json.dumps(post.media_urls or []),
                'tags': json.dumps(post.tags or []),
                'created_time': post.created_time,
                'like_count': 0,
                'comment_count': 0,
            }

            pipe = self.redis_client.pipeline()
            # hset(mapping=...) replaces the deprecated hmset.
            pipe.hset(post_key, mapping=post_data)
            pipe.expire(post_key, 30 * 24 * 3600)  # posts expire after 30 days

            user_posts_key = f"{self.user_posts_prefix}{post.user_id}"
            pipe.zadd(user_posts_key, {post.post_id: published_at})
            pipe.zremrangebyrank(user_posts_key, 0, -1001)  # keep the newest 1000

            followers = self.follow_service.get_followers_list(post.user_id, 10000)
            for follower in followers:
                # Decode first: formatting raw bytes into the key would yield
                # "timeline:b'...'", which get_user_timeline never reads.
                timeline_key = f"{self.timeline_prefix}{self._to_str(follower)}"
                pipe.zadd(timeline_key, {post.post_id: published_at})
                pipe.zremrangebyrank(timeline_key, 0, -501)  # keep the newest 500
                pipe.expire(timeline_key, 7 * 24 * 3600)     # timelines expire after 7 days

            pipe.execute()
            return True
        except Exception as e:
            print(f"发布动态失败: {e}")
            return False

    def get_user_timeline(self, user_id: str, limit: int = 20, offset: int = 0) -> List[Dict[str, Any]]:
        """Return up to ``limit`` timeline posts, newest first.

        Ids whose post hash no longer exists are skipped, so fewer than
        ``limit`` posts may be returned.
        """
        if limit <= 0:
            # A stop index of offset-1 would be -1 at offset 0 and return everything.
            return []
        try:
            timeline_key = f"{self.timeline_prefix}{user_id}"
            post_ids = self.redis_client.zrevrange(timeline_key, offset, offset + limit - 1)
            loaded = (self.get_post(self._to_str(post_id)) for post_id in post_ids)
            return [post for post in loaded if post]
        except Exception as e:
            print(f"获取时间线失败: {e}")
            return []

    def get_post(self, post_id: str) -> Optional[Dict[str, Any]]:
        """Return a post as a dict with decoded field types, or None.

        ``media_urls`` and ``tags`` are JSON-decoded; ``like_count`` and
        ``comment_count`` are converted to int.
        """
        try:
            raw_post = self.redis_client.hgetall(f"{self.post_prefix}{post_id}")
            if not raw_post:
                return None
            post: Dict[str, Any] = {}
            for raw_field, raw_value in raw_post.items():
                field = self._to_str(raw_field)
                value = self._to_str(raw_value)
                if field in ('media_urls', 'tags'):
                    post[field] = json.loads(value)
                elif field in ('like_count', 'comment_count'):
                    post[field] = int(value)
                else:
                    post[field] = value
            return post
        except Exception as e:
            print(f"获取帖子失败: {e}")
            return None

    def like_post(self, user_id: str, post_id: str) -> bool:
        """Record a like from ``user_id`` on a post.

        Returns:
            False when the post does not exist, the user already liked it, or
            Redis fails.
        """
        try:
            likes_key = f"{self.likes_prefix}{post_id}"
            post_key = f"{self.post_prefix}{post_id}"
            if not self.redis_client.exists(post_key):
                # HINCRBY on a missing key would create a stray post hash.
                return False
            # SADD's reply says whether the like is new, so no separate
            # membership check is needed.
            if not self.redis_client.sadd(likes_key, user_id):
                return False
            pipe = self.redis_client.pipeline()
            pipe.expire(likes_key, 30 * 24 * 3600)
            pipe.hincrby(post_key, 'like_count', 1)
            pipe.execute()
            return True
        except Exception as e:
            print(f"点赞失败: {e}")
            return False

    def unlike_post(self, user_id: str, post_id: str) -> bool:
        """Remove the user's like from a post.

        Returns:
            False when the user had not liked the post or Redis fails.
        """
        try:
            likes_key = f"{self.likes_prefix}{post_id}"
            post_key = f"{self.post_prefix}{post_id}"
            if not self.redis_client.srem(likes_key, user_id):
                return False
            if self.redis_client.exists(post_key):
                self.redis_client.hincrby(post_key, 'like_count', -1)
            return True
        except Exception as e:
            print(f"取消点赞失败: {e}")
            return False
14.3 实时聊天系统
14.3.1 聊天室管理
class ChatRoomService:
    """Chat rooms, room membership and recent messages stored in Redis.

    Room metadata lives in a ``chatroom:<room>`` hash, current members in an
    ``online_users:<room>`` set and messages in a ``room_messages:<room>``
    sorted set scored by send time. Every public method catches exceptions,
    prints them and returns a failure value.
    """

    def __init__(self, redis_client: "redis.Redis"):
        self.redis_client = redis_client
        self.room_prefix = "chatroom:"
        self.user_rooms_prefix = "user_rooms:"
        self.room_messages_prefix = "room_messages:"
        self.online_users_prefix = "online_users:"

    @staticmethod
    def _to_str(value: Any) -> Any:
        """Decode a bytes reply to str; pass any other value through."""
        return value.decode() if isinstance(value, bytes) else value

    def create_room(self, room_id: str, room_name: str, creator_id: str,
                    room_type: str = 'public', max_users: int = 100) -> bool:
        """Write the room's metadata hash, with ``user_count`` set to 0."""
        try:
            room_data = {
                'room_id': room_id,
                'room_name': room_name,
                'creator_id': creator_id,
                'room_type': room_type,
                'max_users': max_users,
                'created_time': datetime.now().isoformat(),
                'user_count': 0,
            }
            # hset(mapping=...) replaces the deprecated hmset.
            self.redis_client.hset(f"{self.room_prefix}{room_id}", mapping=room_data)
            return True
        except Exception as e:
            print(f"创建聊天室失败: {e}")
            return False

    def join_room(self, room_id: str, user_id: str) -> bool:
        """Add a user to a room.

        Returns:
            False when the room does not exist, is full, or Redis fails.

        NOTE(review): the capacity check and the join are separate commands,
        so concurrent joins can exceed ``max_users``.
        """
        try:
            room_key = f"{self.room_prefix}{room_id}"
            online_users_key = f"{self.online_users_prefix}{room_id}"
            user_rooms_key = f"{self.user_rooms_prefix}{user_id}"
            join_time_key = f"join_time:{room_id}:{user_id}"

            if not self.redis_client.exists(room_key):
                return False
            max_users = int(self.redis_client.hget(room_key, 'max_users') or 100)
            if self.redis_client.scard(online_users_key) >= max_users:
                return False

            # SADD reports whether the user is new to the room; count the user
            # only then, so a repeated join does not inflate user_count.
            newly_joined = self.redis_client.sadd(online_users_key, user_id)

            pipe = self.redis_client.pipeline()
            if newly_joined:
                pipe.hincrby(room_key, 'user_count', 1)
            pipe.sadd(user_rooms_key, room_id)
            pipe.set(join_time_key, int(time.time()))
            pipe.expire(join_time_key, 24 * 3600)
            pipe.execute()
            return True
        except Exception as e:
            print(f"加入聊天室失败: {e}")
            return False

    def leave_room(self, room_id: str, user_id: str) -> bool:
        """Remove a user from a room; returns False only when Redis fails."""
        try:
            room_key = f"{self.room_prefix}{room_id}"
            online_users_key = f"{self.online_users_prefix}{room_id}"
            user_rooms_key = f"{self.user_rooms_prefix}{user_id}"

            # Decrement the counter only if the user really was in the room,
            # otherwise user_count would drift below the true value.
            was_member = self.redis_client.srem(online_users_key, user_id)

            pipe = self.redis_client.pipeline()
            if was_member:
                pipe.hincrby(room_key, 'user_count', -1)
            pipe.srem(user_rooms_key, room_id)
            pipe.delete(f"join_time:{room_id}:{user_id}")
            pipe.execute()
            return True
        except Exception as e:
            print(f"离开聊天室失败: {e}")
            return False

    def send_message(self, room_id: str, user_id: str, message: str,
                     message_type: str = 'text') -> Optional[str]:
        """Store a message in the room's history.

        Returns:
            The new message id, or None when the sender is not in the room or
            Redis fails.
        """
        try:
            online_users_key = f"{self.online_users_prefix}{room_id}"
            if not self.redis_client.sismember(online_users_key, user_id):
                return None

            message_id = str(uuid.uuid4())
            # Use one clock reading for the payload and the score, so that
            # paging by a message's 'timestamp' lines up with its position.
            sent_at = time.time()
            message_data = {
                'message_id': message_id,
                'room_id': room_id,
                'user_id': user_id,
                'message': message,
                'message_type': message_type,
                'timestamp': sent_at,
                'created_time': datetime.now().isoformat(),
            }

            messages_key = f"{self.room_messages_prefix}{room_id}"
            self.redis_client.zadd(messages_key, {json.dumps(message_data): sent_at})
            self.redis_client.zremrangebyrank(messages_key, 0, -1001)  # keep the newest 1000
            self.redis_client.expire(messages_key, 7 * 24 * 3600)      # history expires after 7 days
            return message_id
        except Exception as e:
            print(f"发送消息失败: {e}")
            return None

    def get_room_messages(self, room_id: str, limit: int = 50,
                          before_timestamp: Optional[float] = None) -> List[Dict[str, Any]]:
        """Return up to ``limit`` messages, newest first.

        Args:
            before_timestamp: when given, only messages sent strictly before
                this time are returned, so passing the ``timestamp`` of the
                oldest message already shown fetches the next page.
        """
        if limit <= 0:
            # zrevrange(key, 0, -1) would return the whole history.
            return []
        try:
            messages_key = f"{self.room_messages_prefix}{room_id}"
            if before_timestamp is not None:
                # "(" makes the upper bound exclusive.
                raw_messages = self.redis_client.zrevrangebyscore(
                    messages_key, f"({before_timestamp}", 0, start=0, num=limit
                )
            else:
                raw_messages = self.redis_client.zrevrange(messages_key, 0, limit - 1)

            messages = []
            for raw in raw_messages:
                try:
                    messages.append(json.loads(self._to_str(raw)))
                except json.JSONDecodeError:
                    continue  # skip entries that are not valid JSON
            return messages
        except Exception as e:
            print(f"获取消息失败: {e}")
            return []

    def get_online_users(self, room_id: str) -> List[str]:
        """Return the ids of the users currently in the room."""
        try:
            members = self.redis_client.smembers(f"{self.online_users_prefix}{room_id}")
            return [self._to_str(member) for member in members]
        except Exception as e:
            print(f"获取在线用户失败: {e}")
            return []

    def get_room_info(self, room_id: str) -> Optional[Dict[str, Any]]:
        """Return the room metadata plus a live ``online_count``, or None.

        ``max_users`` and ``user_count`` are converted to int; ``online_count``
        is the current size of the room's member set.
        """
        try:
            raw_room = self.redis_client.hgetall(f"{self.room_prefix}{room_id}")
            if not raw_room:
                return None
            info: Dict[str, Any] = {}
            for raw_field, raw_value in raw_room.items():
                field = self._to_str(raw_field)
                value = self._to_str(raw_value)
                info[field] = int(value) if field in ('max_users', 'user_count') else value
            info['online_count'] = self.redis_client.scard(
                f"{self.online_users_prefix}{room_id}"
            )
            return info
        except Exception as e:
            print(f"获取聊天室信息失败: {e}")
            return None
14.4 游戏排行榜系统
14.4.1 实时排行榜
class GameLeaderboardService:
    """Game leaderboards kept in Redis sorted sets.

    Each game type has a global board plus daily, weekly and monthly boards
    whose keys carry the current date, week or month. Every public method
    catches exceptions, prints them and returns a failure value.
    """

    def __init__(self, redis_client: "redis.Redis"):
        self.redis_client = redis_client
        self.leaderboard_prefix = "leaderboard:"
        self.user_score_prefix = "user_score:"
        self.daily_prefix = "daily:"
        self.weekly_prefix = "weekly:"
        self.monthly_prefix = "monthly:"

    @staticmethod
    def _to_str(value: Any) -> Any:
        """Decode a bytes reply to str; pass any other value through."""
        return value.decode() if isinstance(value, bytes) else value

    def _period_key(self, game_type: str, period: str,
                    now: Optional[datetime] = None) -> Optional[str]:
        """Return the sorted-set key of a board, or None for an unknown period.

        ``period`` is one of 'global', 'daily', 'weekly' or 'monthly'. The
        weekly key is derived from the Monday of the week containing ``now``.
        """
        now = now or datetime.now()
        base = self.leaderboard_prefix
        if period == 'global':
            return f"{base}{game_type}"
        if period == 'daily':
            return f"{base}{self.daily_prefix}{game_type}:{now.strftime('%Y-%m-%d')}"
        if period == 'weekly':
            week_start = now - timedelta(days=now.weekday())
            return f"{base}{self.weekly_prefix}{game_type}:{week_start.strftime('%Y-W%U')}"
        if period == 'monthly':
            return f"{base}{self.monthly_prefix}{game_type}:{now.strftime('%Y-%m')}"
        return None

    def _ranked_slice(self, key: str, start: int, stop: int,
                      with_scores: bool = True) -> List[Dict[str, Any]]:
        """Return board entries for 0-based ranks ``start``..``stop``; may raise."""
        if with_scores:
            rows = self.redis_client.zrevrange(key, start, stop, withscores=True)
            return [
                {'rank': start + i + 1, 'user_id': self._to_str(member), 'score': int(score)}
                for i, (member, score) in enumerate(rows)
            ]
        rows = self.redis_client.zrevrange(key, start, stop)
        return [
            {'rank': start + i + 1, 'user_id': self._to_str(member)}
            for i, member in enumerate(rows)
        ]

    def update_score(self, user_id: str, score: int, game_type: str = 'default') -> bool:
        """Set the user's score on all four boards and append it to their history.

        The score replaces the previous value on each board. The daily, weekly
        and monthly boards expire after 2, 14 and 60 days; the history keeps
        the latest 100 records for 30 days.
        """
        try:
            now = datetime.now()
            recorded_at = time.time()
            pipe = self.redis_client.pipeline()

            # (period, ttl in seconds); None means the board never expires.
            boards = (
                ('global', None),
                ('daily', 2 * 24 * 3600),
                ('weekly', 14 * 24 * 3600),
                ('monthly', 60 * 24 * 3600),
            )
            for period, ttl in boards:
                board_key = self._period_key(game_type, period, now)
                pipe.zadd(board_key, {user_id: score})
                if ttl is not None:
                    pipe.expire(board_key, ttl)

            user_score_key = f"{self.user_score_prefix}{user_id}:{game_type}"
            score_data = {
                'score': score,
                'timestamp': recorded_at,
                'date': now.isoformat(),
            }
            pipe.zadd(user_score_key, {json.dumps(score_data): recorded_at})
            pipe.zremrangebyrank(user_score_key, 0, -101)  # keep the latest 100 records
            pipe.expire(user_score_key, 30 * 24 * 3600)
            pipe.execute()
            return True
        except Exception as e:
            print(f"更新分数失败: {e}")
            return False

    def get_leaderboard(self, game_type: str = 'default', period: str = 'global',
                        limit: int = 100, with_scores: bool = True) -> List[Dict[str, Any]]:
        """Return the top ``limit`` entries of a board, best score first.

        Each entry has 'rank' (1-based) and 'user_id', plus 'score' when
        ``with_scores`` is true. Unknown periods and non-positive limits give
        an empty list.
        """
        try:
            key = self._period_key(game_type, period)
            # limit <= 0 would make the stop index negative and return the
            # whole board instead of nothing.
            if key is None or limit <= 0:
                return []
            return self._ranked_slice(key, 0, limit - 1, with_scores)
        except Exception as e:
            print(f"获取排行榜失败: {e}")
            return []

    def get_user_rank(self, user_id: str, game_type: str = 'default',
                      period: str = 'global') -> Optional[Dict[str, Any]]:
        """Return the user's score, 1-based rank and percentile on a board.

        Returns:
            None when the period is unknown, the user is not on the board, or
            Redis fails.
        """
        try:
            key = self._period_key(game_type, period)
            if key is None:
                return None
            score = self.redis_client.zscore(key, user_id)
            if score is None:
                return None
            rank = self.redis_client.zrevrank(key, user_id)
            if rank is None:
                return None
            total_users = self.redis_client.zcard(key)
            if total_users > 0:
                percentile = round((total_users - rank) / total_users * 100, 2)
            else:
                percentile = 0
            return {
                'user_id': user_id,
                'score': int(score),
                'rank': rank + 1,  # Redis ranks are 0-based
                'total_users': total_users,
                'percentile': percentile,
            }
        except Exception as e:
            print(f"获取用户排名失败: {e}")
            return None

    def get_user_score_history(self, user_id: str, game_type: str = 'default',
                               limit: int = 30) -> List[Dict[str, Any]]:
        """Return the user's latest ``limit`` score records, newest first."""
        if limit <= 0:
            # zrevrange(key, 0, -1) would return the whole history.
            return []
        try:
            user_score_key = f"{self.user_score_prefix}{user_id}:{game_type}"
            raw_records = self.redis_client.zrevrange(user_score_key, 0, limit - 1)
            history = []
            for raw in raw_records:
                try:
                    history.append(json.loads(self._to_str(raw)))
                except json.JSONDecodeError:
                    continue  # skip entries that are not valid JSON
            return history
        except Exception as e:
            print(f"获取分数历史失败: {e}")
            return []

    def get_nearby_ranks(self, user_id: str, game_type: str = 'default',
                         period: str = 'global', range_size: int = 5) -> List[Dict[str, Any]]:
        """Return the entries within ``range_size`` ranks of the user.

        Only the needed rank window is read from Redis, not the whole top of
        the board.
        """
        try:
            own = self.get_user_rank(user_id, game_type, period)
            if not own:
                return []
            first_rank = max(1, own['rank'] - range_size)
            last_rank = own['rank'] + range_size
            if last_rank < first_rank:
                return []
            key = self._period_key(game_type, period)
            return self._ranked_slice(key, first_rank - 1, last_rank - 1, True)
        except Exception as e:
            print(f"获取附近排名失败: {e}")
            return []
14.5 总结
本章通过多个实战案例展示了Redis在不同场景下的应用:
14.5.1 应用场景总结
- 电商系统:购物车管理、库存控制
- 社交媒体:关注系统、动态流
- 实时聊天:聊天室管理、消息存储
- 游戏系统:排行榜、分数统计
14.5.2 技术特点
- 高性能:利用Redis的内存存储优势
- 实时性:支持实时数据更新和查询
- 可扩展:支持大规模并发访问
- 数据一致性:使用Lua脚本保证原子操作
14.5.3 最佳实践
- 合理设计数据结构和键名规范
- 使用管道和批量操作提高性能
- 设置合适的过期时间管理内存
- 使用Lua脚本保证复杂操作的原子性
- 建立完善的监控和告警机制
14.5.4 参考资料
下一章将对本Redis教程进行总结与展望。