14.1 电商系统中的Redis应用

14.1.1 购物车系统

```python
import redis
import json
import time
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
import uuid

@dataclass
class CartItem:
    """One product line in a shopping cart."""

    product_id: str
    product_name: str
    price: float
    quantity: int
    sku_id: str
    # Free-form product options; None means "no attributes".
    attributes: Optional[Dict[str, Any]] = None
    # ISO-8601 timestamp string; None when the caller did not supply one.
    added_time: Optional[str] = None

class ShoppingCartService:
    """Shopping cart kept in one Redis hash per user.

    The hash key is ``cart:<user_id>``; each field is a product ID and each
    value is a JSON-encoded item dict. Public methods do not raise on Redis
    or decoding errors: they print the error and return a failure value.
    """

    def __init__(self, redis_client: redis.Redis):
        """Keep the client and set the key prefix and cart lifetime."""
        self.redis_client = redis_client
        self.cart_prefix = "cart:"
        self.cart_ttl = 7 * 24 * 3600  # seconds (7 days)

    def _get_cart_key(self, user_id: str) -> str:
        """Return the Redis key of the cart hash for ``user_id``."""
        return f"{self.cart_prefix}{user_id}"

    def _upsert_item(self, user_id: str, item_data: Dict[str, Any]) -> None:
        """Insert ``item_data`` into a cart, or add its quantity to the stored item.

        Shared by ``add_item`` and ``merge_carts``. Errors propagate to the
        caller. The read and the write are separate commands, so two
        concurrent writers for the same product can lose an update.
        """
        cart_key = self._get_cart_key(user_id)
        product_id = item_data['product_id']
        now = datetime.now().isoformat()

        existing_item = self.redis_client.hget(cart_key, product_id)
        if existing_item:
            # Product already in the cart: only the quantity changes.
            stored = json.loads(existing_item)
            stored['quantity'] += item_data['quantity']
        else:
            stored = {
                'product_id': product_id,
                'product_name': item_data['product_name'],
                'price': item_data['price'],
                'quantity': item_data['quantity'],
                'sku_id': item_data['sku_id'],
                'attributes': item_data.get('attributes') or {},
                # Keep the original timestamp when an item is carried over
                # from another cart.
                'added_time': item_data.get('added_time') or now,
            }
        stored['updated_time'] = now

        self.redis_client.hset(cart_key, product_id, json.dumps(stored))
        self.redis_client.expire(cart_key, self.cart_ttl)

    def add_item(self, user_id: str, item: CartItem) -> bool:
        """Add ``item`` to the user's cart.

        If the product is already present its quantity is increased.
        Returns False for a non-positive quantity or on any error.
        """
        try:
            if item.quantity <= 0:
                return False

            self._upsert_item(user_id, {
                'product_id': item.product_id,
                'product_name': item.product_name,
                'price': item.price,
                'quantity': item.quantity,
                'sku_id': item.sku_id,
                'attributes': item.attributes,
            })
            return True

        except Exception as e:
            print(f"添加商品到购物车失败: {e}")
            return False

    def remove_item(self, user_id: str, product_id: str) -> bool:
        """Remove a product from the cart; True only if it was present."""
        try:
            cart_key = self._get_cart_key(user_id)
            return bool(self.redis_client.hdel(cart_key, product_id))
        except Exception as e:
            print(f"移除购物车商品失败: {e}")
            return False

    def update_quantity(self, user_id: str, product_id: str, quantity: int) -> bool:
        """Set the quantity of a product that is already in the cart.

        A quantity of zero or less removes the product. Returns False when
        the product is not in the cart or on any error.
        """
        try:
            cart_key = self._get_cart_key(user_id)

            if quantity <= 0:
                # A cart line with no units is meaningless; drop it instead
                # of storing a zero/negative quantity.
                return bool(self.redis_client.hdel(cart_key, product_id))

            item_data = self.redis_client.hget(cart_key, product_id)
            if not item_data:
                return False

            item_info = json.loads(item_data)
            item_info['quantity'] = quantity
            item_info['updated_time'] = datetime.now().isoformat()

            self.redis_client.hset(cart_key, product_id, json.dumps(item_info))
            # Refresh the TTL like every other write to the cart.
            self.redis_client.expire(cart_key, self.cart_ttl)
            return True

        except Exception as e:
            print(f"更新商品数量失败: {e}")
            return False

    def get_cart(self, user_id: str) -> Dict[str, Any]:
        """Return the cart items together with total quantity and amount.

        On error an empty cart with the same keys is returned.
        """
        try:
            cart_key = self._get_cart_key(user_id)
            cart_data = self.redis_client.hgetall(cart_key)

            items = [json.loads(raw) for raw in cart_data.values()]
            total_amount = sum(i['price'] * i['quantity'] for i in items)
            total_quantity = sum(i['quantity'] for i in items)

            return {
                'user_id': user_id,
                'items': items,
                'total_quantity': total_quantity,
                'total_amount': round(total_amount, 2),
                'updated_time': datetime.now().isoformat()
            }

        except Exception as e:
            print(f"获取购物车失败: {e}")
            return {
                'user_id': user_id,
                'items': [],
                'total_quantity': 0,
                'total_amount': 0,
                'updated_time': datetime.now().isoformat()
            }

    def clear_cart(self, user_id: str) -> bool:
        """Delete the whole cart; True unless Redis raised."""
        try:
            cart_key = self._get_cart_key(user_id)
            self.redis_client.delete(cart_key)
            return True
        except Exception as e:
            print(f"清空购物车失败: {e}")
            return False

    def merge_carts(self, source_user_id: str, target_user_id: str) -> bool:
        """Move every item of the source cart into the target cart (e.g. at login).

        Each item is removed from the source only after it has been written
        to the target, so a failure part-way through loses nothing and a
        retry does not count already-moved items twice.
        """
        if source_user_id == target_user_id:
            # Merging a cart into itself would double every quantity and
            # then delete the cart.
            return True

        try:
            source_key = self._get_cart_key(source_user_id)
            source_items = self.redis_client.hgetall(source_key)

            for field, raw in source_items.items():
                self._upsert_item(target_user_id, json.loads(raw))
                self.redis_client.hdel(source_key, field)

            return True

        except Exception as e:
            print(f"合并购物车失败: {e}")
            return False

14.1.2 商品库存管理

”`python class InventoryService: “”“库存管理服务”“”

def __init__(self, redis_client: redis.Redis):
    self.redis_client = redis_client
    self.inventory_prefix = "inventory:"
    self.lock_prefix = "lock:inventory:"
    self.reserved_prefix = "reserved:"

def _get_inventory_key(self, product_id: str) -> str:
    """获取库存键名"""
    return f"{self.inventory_prefix}{product_id}"

def _get_lock_key(self, product_id: str) -> str:
    """获取锁键名"""
    return f"{self.lock_prefix}{product_id}"

def _get_reserved_key(self, product_id: str) -> str:
    """获取预留库存键名"""
    return f"{self.reserved_prefix}{product_id}"

def set_inventory(self, product_id: str, quantity: int) -> bool:
    """设置商品库存"""
    try:
        inventory_key = self._get_inventory_key(product_id)
        self.redis_client.set(inventory_key, quantity)
        return True
    except Exception as e:
        print(f"设置库存失败: {e}")
        return False

def get_inventory(self, product_id: str) -> int:
    """获取商品库存"""
    try:
        inventory_key = self._get_inventory_key(product_id)
        inventory = self.redis_client.get(inventory_key)
        return int(inventory) if inventory else 0
    except Exception as e:
        print(f"获取库存失败: {e}")
        return 0

def reserve_inventory(self, product_id: str, quantity: int, order_id: str) -> bool:
    """预留库存"""
    lock_key = self._get_lock_key(product_id)

    # 使用Lua脚本确保原子性
    lua_script = """
    local inventory_key = KEYS[1]
    local reserved_key = KEYS[2]
    local lock_key = KEYS[3]
    local quantity = tonumber(ARGV[1])
    local order_id = ARGV[2]
    local lock_timeout = tonumber(ARGV[3])

    -- 获取分布式锁
    local lock_acquired = redis.call('SET', lock_key, order_id, 'PX', lock_timeout, 'NX')
    if not lock_acquired then
        return {0, 'lock_failed'}
    end

    -- 检查库存
    local current_inventory = tonumber(redis.call('GET', inventory_key) or 0)
    local current_reserved = tonumber(redis.call('GET', reserved_key) or 0)
    local available = current_inventory - current_reserved

    if available >= quantity then
        -- 增加预留库存
        redis.call('INCRBY', reserved_key, quantity)
        redis.call('EXPIRE', reserved_key, 1800)  -- 30分钟过期

        -- 记录预留信息
        local reservation_key = 'reservation:' .. order_id
        redis.call('HSET', reservation_key, 'product_id', KEYS[1], 'quantity', quantity, 'timestamp', ARGV[4])
        redis.call('EXPIRE', reservation_key, 1800)

        -- 释放锁
        redis.call('DEL', lock_key)
        return {1, 'success'}
    else
        -- 释放锁
        redis.call('DEL', lock_key)
        return {0, 'insufficient_inventory'}
    end
    """

    try:
        inventory_key = self._get_inventory_key(product_id)
        reserved_key = self._get_reserved_key(product_id)

        result = self.redis_client.eval(
            lua_script,
            3,
            inventory_key,
            reserved_key,
            lock_key,
            quantity,
            order_id,
            5000,  # 锁超时时间(毫秒)
            int(time.time())
        )

        return result[0] == 1

    except Exception as e:
        print(f"预留库存失败: {e}")
        return False

def confirm_reservation(self, order_id: str) -> bool:
    """确认预留(扣减库存)"""
    lua_script = """
    local reservation_key = 'reservation:' .. ARGV[1]
    local reservation_data = redis.call('HGETALL', reservation_key)

    if #reservation_data == 0 then
        return {0, 'reservation_not_found'}
    end

    local product_id = reservation_data[2]
    local quantity = tonumber(reservation_data[4])

    local inventory_key = 'inventory:' .. product_id
    local reserved_key = 'reserved:' .. product_id

    -- 扣减实际库存
    redis.call('DECRBY', inventory_key, quantity)

    -- 减少预留库存
    redis.call('DECRBY', reserved_key, quantity)

    -- 删除预留记录
    redis.call('DEL', reservation_key)

    return {1, 'success'}
    """

    try:
        result = self.redis_client.eval(lua_script, 0, order_id)
        return result[0] == 1
    except Exception as e:
        print(f"确认预留失败: {e}")
        return False

def cancel_reservation(self, order_id: str) -> bool:
    """取消预留"""
    lua_script = """
    local reservation_key = 'reservation:' .. ARGV[1]
    local reservation_data = redis.call('HGETALL', reservation_key)

    if #reservation_data == 0 then
        return {0, 'reservation_not_found'}
    end

    local product_id = reservation_data[2]
    local quantity = tonumber(reservation_data[4])

    local reserved_key = 'reserved:' .. product_id

    -- 减少预留库存
    redis.call('DECRBY', reserved_key, quantity)

    -- 删除预留记录
    redis.call('DEL', reservation_key)

    return {1, 'success'}
    """

    try:
        result = self.redis_client.eval(lua_script, 0, order_id)
        return result[0] == 1
    except Exception as e:
        print(f"取消预留失败: {e}")
        return False

14.2 社交媒体系统

14.2.1 用户关注系统

class SocialFollowService:
    """Follow graph stored as Redis sets.

    ``following:<user>`` holds the users that ``<user>`` follows and
    ``followers:<user>`` holds the users following ``<user>``. Public methods
    print the error and return a failure value instead of raising.
    """

    def __init__(self, redis_client: redis.Redis):
        """Keep the client and set the key prefixes."""
        self.redis_client = redis_client
        self.following_prefix = "following:"  # users this user follows
        self.followers_prefix = "followers:"  # users following this user
        self.feed_prefix = "feed:"  # activity feed

    @staticmethod
    def _decode_members(members, limit: Optional[int] = None) -> List[str]:
        """Turn a set reply into a list of ``str``, optionally truncated.

        Without ``decode_responses=True`` the client returns bytes; callers
        build key names from these IDs, so they must be text.
        """
        decoded = [m.decode() if isinstance(m, bytes) else m for m in members]
        return decoded if limit is None else decoded[:limit]

    def follow_user(self, user_id: str, target_user_id: str) -> bool:
        """Make ``user_id`` follow ``target_user_id``.

        Returns False when both IDs are equal or on error.
        """
        if user_id == target_user_id:
            return False

        try:
            pipe = self.redis_client.pipeline()
            pipe.sadd(f"{self.following_prefix}{user_id}", target_user_id)
            pipe.sadd(f"{self.followers_prefix}{target_user_id}", user_id)
            # Remember when the follow happened; the record expires after 1 year.
            pipe.set(
                f"follow_time:{user_id}:{target_user_id}",
                int(time.time()),
                ex=365 * 24 * 3600,
            )
            pipe.execute()
            return True

        except Exception as e:
            print(f"关注用户失败: {e}")
            return False

    def unfollow_user(self, user_id: str, target_user_id: str) -> bool:
        """Remove the follow relation in both directions; True unless Redis raised."""
        try:
            pipe = self.redis_client.pipeline()
            pipe.srem(f"{self.following_prefix}{user_id}", target_user_id)
            pipe.srem(f"{self.followers_prefix}{target_user_id}", user_id)
            pipe.delete(f"follow_time:{user_id}:{target_user_id}")
            pipe.execute()
            return True

        except Exception as e:
            print(f"取消关注失败: {e}")
            return False

    def is_following(self, user_id: str, target_user_id: str) -> bool:
        """Return True if ``user_id`` follows ``target_user_id``."""
        try:
            following_key = f"{self.following_prefix}{user_id}"
            # Normalise to bool; some client versions reply with 0/1.
            return bool(self.redis_client.sismember(following_key, target_user_id))
        except Exception as e:
            print(f"检查关注状态失败: {e}")
            return False

    def get_following_list(self, user_id: str, limit: int = 100) -> List[str]:
        """Return up to ``limit`` user IDs that ``user_id`` follows (unordered)."""
        try:
            following_key = f"{self.following_prefix}{user_id}"
            return self._decode_members(self.redis_client.smembers(following_key), limit)
        except Exception as e:
            print(f"获取关注列表失败: {e}")
            return []

    def get_followers_list(self, user_id: str, limit: int = 100) -> List[str]:
        """Return up to ``limit`` user IDs following ``user_id`` (unordered)."""
        try:
            followers_key = f"{self.followers_prefix}{user_id}"
            return self._decode_members(self.redis_client.smembers(followers_key), limit)
        except Exception as e:
            print(f"获取粉丝列表失败: {e}")
            return []

    def get_mutual_follows(self, user_id: str, target_user_id: str) -> List[str]:
        """Return the user IDs followed by both users."""
        try:
            mutual_follows = self.redis_client.sinter(
                f"{self.following_prefix}{user_id}",
                f"{self.following_prefix}{target_user_id}",
            )
            return self._decode_members(mutual_follows)

        except Exception as e:
            print(f"获取共同关注失败: {e}")
            return []

    def get_user_stats(self, user_id: str) -> Dict[str, int]:
        """Return the following/follower counts; both 0 on error."""
        try:
            return {
                'following_count': self.redis_client.scard(f"{self.following_prefix}{user_id}"),
                'followers_count': self.redis_client.scard(f"{self.followers_prefix}{user_id}")
            }

        except Exception as e:
            print(f"获取用户统计失败: {e}")
            return {'following_count': 0, 'followers_count': 0}

14.2.2 动态流系统

@dataclass
class Post:
    """A post published to the social feed."""

    post_id: str
    user_id: str
    content: str
    # Optional attachments and labels; None means "none given".
    media_urls: Optional[List[str]] = None
    tags: Optional[List[str]] = None
    # ISO-8601 timestamp string; None until one is assigned.
    created_time: Optional[str] = None
    like_count: int = 0
    comment_count: int = 0

class SocialFeedService:
    """Fan-out-on-write feed: each post is pushed to every follower's timeline.

    Keys: ``post:<id>`` (hash with the post fields), ``user_posts:<user>`` and
    ``timeline:<user>`` (sorted sets of post IDs scored by publish time),
    ``likes:<id>`` (set of users who liked the post). Public methods print
    the error and return a failure value instead of raising.
    """

    def __init__(self, redis_client: redis.Redis, follow_service: SocialFollowService):
        """Keep the client and the follow service, and set the key prefixes."""
        self.redis_client = redis_client
        self.follow_service = follow_service
        self.user_posts_prefix = "user_posts:"
        self.timeline_prefix = "timeline:"
        self.post_prefix = "post:"
        self.likes_prefix = "likes:"

    @staticmethod
    def _to_text(value):
        """Decode a bytes reply to ``str``; leave anything else untouched."""
        return value.decode() if isinstance(value, bytes) else value

    def _parse_post(self, post_data) -> Optional[Dict[str, Any]]:
        """Convert a raw ``HGETALL`` reply into a post dict, or None if empty.

        Raises ValueError when a JSON or integer field is malformed.
        """
        if not post_data:
            return None

        result = {}
        for key, value in post_data.items():
            key = self._to_text(key)
            value = self._to_text(value)

            if key in ('media_urls', 'tags'):
                result[key] = json.loads(value)
            elif key in ('like_count', 'comment_count'):
                result[key] = int(value)
            else:
                result[key] = value

        return result

    def publish_post(self, post: Post) -> bool:
        """Store ``post`` and push its ID to the timelines of up to 10000 followers.

        Assigns a fresh ``post_id`` and ``created_time`` on the given object.
        """
        try:
            post.post_id = str(uuid.uuid4())
            post.created_time = datetime.now().isoformat()
            # One timestamp for every sorted-set score written below.
            now = time.time()

            post_key = f"{self.post_prefix}{post.post_id}"
            post_data = {
                'post_id': post.post_id,
                'user_id': post.user_id,
                'content': post.content,
                'media_urls': json.dumps(post.media_urls or []),
                'tags': json.dumps(post.tags or []),
                'created_time': post.created_time,
                'like_count': 0,
                'comment_count': 0
            }

            followers = self.follow_service.get_followers_list(post.user_id, 10000)

            pipe = self.redis_client.pipeline()

            # HSET with mapping= replaces the deprecated HMSET.
            pipe.hset(post_key, mapping=post_data)
            pipe.expire(post_key, 30 * 24 * 3600)  # 30 days

            user_posts_key = f"{self.user_posts_prefix}{post.user_id}"
            pipe.zadd(user_posts_key, {post.post_id: now})
            pipe.zremrangebyrank(user_posts_key, 0, -1001)  # keep newest 1000

            for follower_id in followers:
                # Follower IDs may arrive as bytes; formatting bytes into the
                # key would yield "timeline:b'...'".
                timeline_key = f"{self.timeline_prefix}{self._to_text(follower_id)}"
                pipe.zadd(timeline_key, {post.post_id: now})
                pipe.zremrangebyrank(timeline_key, 0, -501)  # keep newest 500
                pipe.expire(timeline_key, 7 * 24 * 3600)  # 7 days

            pipe.execute()
            return True

        except Exception as e:
            print(f"发布动态失败: {e}")
            return False

    def get_user_timeline(self, user_id: str, limit: int = 20, offset: int = 0) -> List[Dict[str, Any]]:
        """Return up to ``limit`` posts of the user's timeline, newest first.

        Posts whose hash no longer exists or cannot be parsed are skipped.
        """
        if limit <= 0:
            # offset + limit - 1 would become -1 (= "to the end") for limit 0.
            return []

        try:
            timeline_key = f"{self.timeline_prefix}{user_id}"
            post_ids = self.redis_client.zrevrange(
                timeline_key, offset, offset + limit - 1
            )
            if not post_ids:
                return []

            # Fetch all post hashes in one round trip.
            pipe = self.redis_client.pipeline(transaction=False)
            for post_id in post_ids:
                pipe.hgetall(f"{self.post_prefix}{self._to_text(post_id)}")

            posts = []
            for raw in pipe.execute():
                try:
                    parsed = self._parse_post(raw)
                except ValueError:
                    continue
                if parsed:
                    posts.append(parsed)

            return posts

        except Exception as e:
            print(f"获取时间线失败: {e}")
            return []

    def get_post(self, post_id: str) -> Optional[Dict[str, Any]]:
        """Return the post as a dict, or None if it is missing or on error."""
        try:
            post_key = f"{self.post_prefix}{post_id}"
            return self._parse_post(self.redis_client.hgetall(post_key))

        except Exception as e:
            print(f"获取帖子失败: {e}")
            return None

    def like_post(self, user_id: str, post_id: str) -> bool:
        """Record a like; False if the post is missing or already liked by the user."""
        try:
            likes_key = f"{self.likes_prefix}{post_id}"
            post_key = f"{self.post_prefix}{post_id}"

            # HINCRBY on a missing hash would create a post with only a counter.
            if not self.redis_client.exists(post_key):
                return False

            # SADD reports whether the member was new, so the membership test
            # and the insert are one atomic step.
            if not self.redis_client.sadd(likes_key, user_id):
                return False

            pipe = self.redis_client.pipeline()
            pipe.expire(likes_key, 30 * 24 * 3600)
            pipe.hincrby(post_key, 'like_count', 1)
            pipe.execute()
            return True

        except Exception as e:
            print(f"点赞失败: {e}")
            return False

    def unlike_post(self, user_id: str, post_id: str) -> bool:
        """Remove a like; False if the user had not liked the post."""
        try:
            likes_key = f"{self.likes_prefix}{post_id}"
            post_key = f"{self.post_prefix}{post_id}"

            # SREM reports whether the member existed.
            if not self.redis_client.srem(likes_key, user_id):
                return False

            if self.redis_client.exists(post_key):
                self.redis_client.hincrby(post_key, 'like_count', -1)
            return True

        except Exception as e:
            print(f"取消点赞失败: {e}")
            return False

14.3 实时聊天系统

14.3.1 聊天室管理

class ChatRoomService:
    """Chat rooms, membership and recent messages stored in Redis.

    Keys: ``chatroom:<room>`` (hash with room metadata), ``online_users:<room>``
    (set of members), ``user_rooms:<user>`` (set of rooms a user is in),
    ``room_messages:<room>`` (sorted set of JSON messages scored by send
    time). Public methods print the error and return a failure value instead
    of raising.
    """

    def __init__(self, redis_client: redis.Redis):
        """Keep the client and set the key prefixes."""
        self.redis_client = redis_client
        self.room_prefix = "chatroom:"
        self.user_rooms_prefix = "user_rooms:"
        self.room_messages_prefix = "room_messages:"
        self.online_users_prefix = "online_users:"

    @staticmethod
    def _to_text(value):
        """Decode a bytes reply to ``str``; leave anything else untouched."""
        return value.decode() if isinstance(value, bytes) else value

    def create_room(self, room_id: str, room_name: str, creator_id: str,
                    room_type: str = 'public', max_users: int = 100) -> bool:
        """Create a room; False if ``room_id`` is already taken or on error."""
        try:
            room_key = f"{self.room_prefix}{room_id}"

            # HSETNX claims the ID atomically, so an existing room is never
            # overwritten (which would also reset its user_count).
            if not self.redis_client.hsetnx(room_key, 'room_id', room_id):
                return False

            # HSET with mapping= replaces the deprecated HMSET.
            self.redis_client.hset(room_key, mapping={
                'room_name': room_name,
                'creator_id': creator_id,
                'room_type': room_type,
                'max_users': max_users,
                'created_time': datetime.now().isoformat(),
                'user_count': 0
            })
            return True

        except Exception as e:
            print(f"创建聊天室失败: {e}")
            return False

    def join_room(self, room_id: str, user_id: str) -> bool:
        """Add a user to a room.

        Returns True if the user is in the room afterwards (including when
        they already were), False if the room is missing or full, or on error.
        """
        try:
            room_key = f"{self.room_prefix}{room_id}"
            online_users_key = f"{self.online_users_prefix}{room_id}"
            user_rooms_key = f"{self.user_rooms_prefix}{user_id}"

            if not self.redis_client.exists(room_key):
                return False

            # Already a member: nothing to add and nothing to count again.
            if self.redis_client.sismember(online_users_key, user_id):
                return True

            max_users = int(self.redis_client.hget(room_key, 'max_users') or 100)
            if self.redis_client.scard(online_users_key) >= max_users:
                return False

            # Only bump the counter when SADD really inserted the user.
            newly_added = self.redis_client.sadd(online_users_key, user_id)

            pipe = self.redis_client.pipeline()
            if newly_added:
                pipe.hincrby(room_key, 'user_count', 1)
            pipe.sadd(user_rooms_key, room_id)
            # Join time is kept for 24 hours.
            pipe.set(f"join_time:{room_id}:{user_id}", int(time.time()), ex=24 * 3600)
            pipe.execute()
            return True

        except Exception as e:
            print(f"加入聊天室失败: {e}")
            return False

    def leave_room(self, room_id: str, user_id: str) -> bool:
        """Remove a user from a room; True unless Redis raised."""
        try:
            room_key = f"{self.room_prefix}{room_id}"
            online_users_key = f"{self.online_users_prefix}{room_id}"
            user_rooms_key = f"{self.user_rooms_prefix}{user_id}"

            # Decrement the counter only if the user really was a member,
            # otherwise user_count drifts below the true value.
            was_member = self.redis_client.srem(online_users_key, user_id)

            pipe = self.redis_client.pipeline()
            if was_member:
                pipe.hincrby(room_key, 'user_count', -1)
            pipe.srem(user_rooms_key, room_id)
            pipe.delete(f"join_time:{room_id}:{user_id}")
            pipe.execute()
            return True

        except Exception as e:
            print(f"离开聊天室失败: {e}")
            return False

    def send_message(self, room_id: str, user_id: str, message: str,
                     message_type: str = 'text') -> Optional[str]:
        """Store a message in the room and return its ID.

        Returns None if the sender is not a member of the room or on error.
        """
        try:
            online_users_key = f"{self.online_users_prefix}{room_id}"
            if not self.redis_client.sismember(online_users_key, user_id):
                return None

            # One clock reading for both the stored timestamp and the score,
            # so a message's 'timestamp' can be used as a pagination cursor.
            now = time.time()
            message_id = str(uuid.uuid4())
            message_data = {
                'message_id': message_id,
                'room_id': room_id,
                'user_id': user_id,
                'message': message,
                'message_type': message_type,
                'timestamp': now,
                'created_time': datetime.now().isoformat()
            }

            messages_key = f"{self.room_messages_prefix}{room_id}"
            pipe = self.redis_client.pipeline()
            pipe.zadd(messages_key, {json.dumps(message_data): now})
            pipe.zremrangebyrank(messages_key, 0, -1001)  # keep newest 1000
            pipe.expire(messages_key, 7 * 24 * 3600)  # 7 days
            pipe.execute()

            return message_id

        except Exception as e:
            print(f"发送消息失败: {e}")
            return None

    def get_room_messages(self, room_id: str, limit: int = 50,
                          before_timestamp: float = None) -> List[Dict[str, Any]]:
        """Return up to ``limit`` messages, newest first.

        Args:
            room_id: Room to read from.
            limit: Maximum number of messages; values <= 0 yield ``[]``.
            before_timestamp: If given, only messages sent strictly before
                this time are returned.
        """
        if limit <= 0:
            # limit - 1 would become -1 (= "to the end") for limit 0.
            return []

        try:
            messages_key = f"{self.room_messages_prefix}{room_id}"

            if before_timestamp is not None:
                # "(" makes the upper bound exclusive, so the message the
                # cursor came from is not returned again.
                messages = self.redis_client.zrevrangebyscore(
                    messages_key, f"({before_timestamp!r}", 0, start=0, num=limit
                )
            else:
                messages = self.redis_client.zrevrange(messages_key, 0, limit - 1)

            result = []
            for msg_data in messages:
                try:
                    result.append(json.loads(self._to_text(msg_data)))
                except json.JSONDecodeError:
                    continue

            return result

        except Exception as e:
            print(f"获取消息失败: {e}")
            return []

    def get_online_users(self, room_id: str) -> List[str]:
        """Return the IDs of the users currently in the room (unordered)."""
        try:
            online_users_key = f"{self.online_users_prefix}{room_id}"
            users = self.redis_client.smembers(online_users_key)
            return [self._to_text(user) for user in users]
        except Exception as e:
            print(f"获取在线用户失败: {e}")
            return []

    def get_room_info(self, room_id: str) -> Optional[Dict[str, Any]]:
        """Return the room metadata plus ``online_count``, or None if missing."""
        try:
            room_key = f"{self.room_prefix}{room_id}"
            room_data = self.redis_client.hgetall(room_key)

            if not room_data:
                return None

            result = {}
            for key, value in room_data.items():
                key = self._to_text(key)
                value = self._to_text(value)
                result[key] = int(value) if key in ('max_users', 'user_count') else value

            # Live member count taken from the set, independent of user_count.
            online_users_key = f"{self.online_users_prefix}{room_id}"
            result['online_count'] = self.redis_client.scard(online_users_key)

            return result

        except Exception as e:
            print(f"获取聊天室信息失败: {e}")
            return None

14.4 游戏排行榜系统

14.4.1 实时排行榜

class GameLeaderboardService:
    """Global, daily, weekly and monthly leaderboards kept in Redis sorted sets.

    Every score update is written to all four boards and appended to a
    per-user history. Public methods print the error and return a failure
    value instead of raising.
    """

    def __init__(self, redis_client: redis.Redis):
        """Keep the client and set the key prefixes."""
        self.redis_client = redis_client
        self.leaderboard_prefix = "leaderboard:"
        self.user_score_prefix = "user_score:"
        self.daily_prefix = "daily:"
        self.weekly_prefix = "weekly:"
        self.monthly_prefix = "monthly:"

    @staticmethod
    def _to_text(value):
        """Decode a bytes reply to ``str``; leave anything else untouched."""
        return value.decode() if isinstance(value, bytes) else value

    def _period_key(self, game_type: str, period: str,
                    now: Optional[datetime] = None) -> Optional[str]:
        """Return the sorted-set key for a board, or None for an unknown period.

        ``period`` is one of 'global', 'daily', 'weekly', 'monthly'. ``now``
        defaults to the current local time.
        """
        now = now or datetime.now()
        base = self.leaderboard_prefix

        if period == 'global':
            return f"{base}{game_type}"
        if period == 'daily':
            return f"{base}{self.daily_prefix}{game_type}:{now.strftime('%Y-%m-%d')}"
        if period == 'weekly':
            # Every day of a week maps to that week's Monday, so all of them
            # share one key.
            week_start = now - timedelta(days=now.weekday())
            return f"{base}{self.weekly_prefix}{game_type}:{week_start.strftime('%Y-W%U')}"
        if period == 'monthly':
            return f"{base}{self.monthly_prefix}{game_type}:{now.strftime('%Y-%m')}"
        return None

    def _ranked_entries(self, key: str, start: int, stop: int,
                        with_scores: bool = True) -> List[Dict[str, Any]]:
        """Read board positions ``start``..``stop`` (0-based, inclusive), best first.

        Each entry carries its 1-based ``rank`` and ``user_id``, plus an
        integer ``score`` when ``with_scores`` is true.
        """
        rows = self.redis_client.zrevrange(key, start, stop, withscores=with_scores)

        entries = []
        for rank, row in enumerate(rows, start=start + 1):
            if with_scores:
                member, score = row
                entries.append({
                    'rank': rank,
                    'user_id': self._to_text(member),
                    'score': int(score)
                })
            else:
                entries.append({'rank': rank, 'user_id': self._to_text(row)})
        return entries

    def update_score(self, user_id: str, score: int, game_type: str = 'default') -> bool:
        """Set the user's score on all four boards and append it to their history."""
        try:
            current_time = datetime.now()
            now_ts = time.time()

            pipe = self.redis_client.pipeline()

            pipe.zadd(self._period_key(game_type, 'global', current_time), {user_id: score})

            # Period boards expire some time after their period ends.
            for period, ttl_days in (('daily', 2), ('weekly', 14), ('monthly', 60)):
                period_key = self._period_key(game_type, period, current_time)
                pipe.zadd(period_key, {user_id: score})
                pipe.expire(period_key, ttl_days * 24 * 3600)

            # Score history: JSON records scored by write time.
            user_score_key = f"{self.user_score_prefix}{user_id}:{game_type}"
            score_data = {
                'score': score,
                'timestamp': now_ts,
                'date': current_time.isoformat()
            }
            pipe.zadd(user_score_key, {json.dumps(score_data): now_ts})
            pipe.zremrangebyrank(user_score_key, 0, -101)  # keep newest 100
            pipe.expire(user_score_key, 30 * 24 * 3600)

            pipe.execute()
            return True

        except Exception as e:
            print(f"更新分数失败: {e}")
            return False

    def get_leaderboard(self, game_type: str = 'default', period: str = 'global',
                        limit: int = 100, with_scores: bool = True) -> List[Dict[str, Any]]:
        """Return the top ``limit`` entries of a board, best first.

        Returns ``[]`` for an unknown period, a non-positive limit, or on error.
        """
        if limit <= 0:
            # limit - 1 would become -1 (= "to the end") for limit 0.
            return []

        try:
            key = self._period_key(game_type, period)
            if key is None:
                return []
            return self._ranked_entries(key, 0, limit - 1, with_scores)

        except Exception as e:
            print(f"获取排行榜失败: {e}")
            return []

    def get_user_rank(self, user_id: str, game_type: str = 'default',
                      period: str = 'global') -> Optional[Dict[str, Any]]:
        """Return the user's score, 1-based rank, board size and percentile.

        Returns None for an unknown period, a user not on the board, or on error.
        """
        try:
            key = self._period_key(game_type, period)
            if key is None:
                return None

            # One transaction so score, rank and size describe the same state.
            pipe = self.redis_client.pipeline()
            pipe.zscore(key, user_id)
            pipe.zrevrank(key, user_id)
            pipe.zcard(key)
            score, rank, total_users = pipe.execute()

            if score is None or rank is None:
                return None

            return {
                'user_id': user_id,
                'score': int(score),
                'rank': rank + 1,  # Redis ranks are 0-based
                'total_users': total_users,
                'percentile': round((total_users - rank) / total_users * 100, 2) if total_users > 0 else 0
            }

        except Exception as e:
            print(f"获取用户排名失败: {e}")
            return None

    def get_user_score_history(self, user_id: str, game_type: str = 'default',
                               limit: int = 30) -> List[Dict[str, Any]]:
        """Return up to ``limit`` of the user's most recent score records, newest first."""
        if limit <= 0:
            return []

        try:
            user_score_key = f"{self.user_score_prefix}{user_id}:{game_type}"
            score_records = self.redis_client.zrevrange(user_score_key, 0, limit - 1)

            result = []
            for record in score_records:
                try:
                    result.append(json.loads(self._to_text(record)))
                except json.JSONDecodeError:
                    continue

            return result

        except Exception as e:
            print(f"获取分数历史失败: {e}")
            return []

    def get_nearby_ranks(self, user_id: str, game_type: str = 'default',
                         period: str = 'global', range_size: int = 5) -> List[Dict[str, Any]]:
        """Return the entries within ``range_size`` places of the user, best first.

        Only the requested window is read from Redis, not the whole board
        above the user.
        """
        try:
            user_rank_info = self.get_user_rank(user_id, game_type, period)
            if not user_rank_info:
                return []

            user_rank = user_rank_info['rank']
            start_rank = max(1, user_rank - range_size)
            end_rank = user_rank + range_size
            if end_rank < start_rank:
                return []

            key = self._period_key(game_type, period)
            return self._ranked_entries(key, start_rank - 1, end_rank - 1)

        except Exception as e:
            print(f"获取附近排名失败: {e}")
            return []

14.5 总结

本章通过多个实战案例展示了Redis在不同场景下的应用:

14.5.1 应用场景总结

  1. 电商系统:购物车管理、库存控制
  2. 社交媒体:关注系统、动态流
  3. 实时聊天:聊天室管理、消息存储
  4. 游戏系统:排行榜、分数统计

14.5.2 技术特点

  • 高性能:利用Redis的内存存储优势
  • 实时性:支持实时数据更新和查询
  • 可扩展:支持大规模并发访问
  • 数据一致性:使用Lua脚本保证原子操作

14.5.3 最佳实践

  • 合理设计数据结构和键名规范
  • 使用管道和批量操作提高性能
  • 设置合适的过期时间管理内存
  • 使用Lua脚本保证复杂操作的原子性
  • 建立完善的监控和告警机制

14.5.4 参考资料


下一章将创建Redis教程的总结和展望。