4.1 列表类型概述

4.1.1 什么是Redis列表

Redis列表(List)是一个有序的字符串集合,支持在列表的两端进行插入和删除操作。列表中的元素可以重复,并且保持插入时的顺序。

4.1.2 列表的特点

  1. 有序性

    • 元素按插入顺序排列
    • 支持按索引访问
    • 索引从0开始,支持负索引
  2. 双端操作

    • 支持从左端(头部)操作
    • 支持从右端(尾部)操作
    • 操作时间复杂度为O(1)
  3. 动态长度

    • 列表长度可动态变化
    • 最大长度为2^32-1个元素
    • 内存使用随元素数量变化

4.1.3 应用场景

  1. 消息队列

    • 任务队列
    • 消息传递
    • 事件处理
  2. 时间线功能

    • 社交媒体动态
    • 操作日志
    • 历史记录
  3. 栈和队列

    • LIFO栈结构
    • FIFO队列结构
    • 双端队列

4.2 基本列表操作

4.2.1 插入操作

# 从左端插入(头部插入)
LPUSH key element [element ...]
LPUSH mylist "first"
LPUSH mylist "second" "third"
# 列表内容:["third", "second", "first"]

# 从右端插入(尾部插入)
RPUSH key element [element ...]
RPUSH mylist "fourth"
RPUSH mylist "fifth" "sixth"
# 列表内容:["third", "second", "first", "fourth", "fifth", "sixth"]

# 在指定元素前/后插入
LINSERT key BEFORE|AFTER pivot element
LINSERT mylist BEFORE "first" "zero"
# 在"first"前插入"zero"

LINSERT mylist AFTER "sixth" "seventh"
# 在"sixth"后插入"seventh"

4.2.2 获取操作

# 获取指定范围的元素
LRANGE key start stop
LRANGE mylist 0 -1
# 获取所有元素

LRANGE mylist 0 2
# 获取前3个元素

LRANGE mylist -3 -1
# 获取最后3个元素

# 获取指定索引的元素
LINDEX key index
LINDEX mylist 0
# 获取第一个元素

LINDEX mylist -1
# 获取最后一个元素

# 获取列表长度
LLEN key
LLEN mylist
# 返回列表元素数量

4.2.3 删除操作

# 从左端删除(头部删除)
LPOP key [count]
LPOP mylist
# 删除并返回第一个元素

LPOP mylist 2
# 删除并返回前2个元素

# 从右端删除(尾部删除)
RPOP key [count]
RPOP mylist
# 删除并返回最后一个元素

RPOP mylist 2
# 删除并返回最后2个元素

# 删除指定值的元素
LREM key count element
LREM mylist 1 "second"
# 从头开始删除1个"second"

LREM mylist -1 "third"
# 从尾开始删除1个"third"

LREM mylist 0 "fourth"
# 删除所有"fourth"

4.2.4 修改操作

# 设置指定索引的元素值
LSET key index element
LSET mylist 0 "new_first"
# 设置第一个元素为"new_first"

LSET mylist -1 "new_last"
# 设置最后一个元素为"new_last"

# 保留指定范围的元素
LTRIM key start stop
LTRIM mylist 1 3
# 只保留索引1到3的元素

4.3 阻塞操作

4.3.1 阻塞弹出

# 阻塞式左端弹出
BLPOP key [key ...] timeout
BLPOP mylist 10
# 如果列表为空,等待10秒

BLPOP list1 list2 list3 5
# 从多个列表中弹出,等待5秒

# 阻塞式右端弹出
BRPOP key [key ...] timeout
BRPOP mylist 0
# 无限等待直到有元素

4.3.2 阻塞移动

# 阻塞式从一个列表移动到另一个列表
BRPOPLPUSH source destination timeout
BRPOPLPUSH source_list dest_list 10
# 从source_list右端弹出,插入到dest_list左端

# 非阻塞版本
RPOPLPUSH source destination
RPOPLPUSH source_list dest_list

4.4 实际应用示例

4.4.1 任务队列系统

import redis
import json
import time
import threading
from typing import Dict, Any, Optional, List, Callable
from datetime import datetime
import uuid

class RedisTaskQueue:
    def __init__(self, host='localhost', port=6379, db=0, password=None):
        self.redis_client = redis.Redis(
            host=host, 
            port=port, 
            db=db, 
            password=password,
            decode_responses=True
        )
        self.task_queue = "task_queue"
        self.processing_queue = "processing_queue"
        self.completed_queue = "completed_queue"
        self.failed_queue = "failed_queue"
        self.task_data_prefix = "task_data"
        self.worker_prefix = "worker"
        
        # 任务处理器注册表
        self.task_handlers: Dict[str, Callable] = {}
        
        # 工作线程控制
        self.running = False
        self.worker_threads: List[threading.Thread] = []
    
    def _get_task_data_key(self, task_id: str) -> str:
        return f"{self.task_data_prefix}:{task_id}"
    
    def _get_worker_key(self, worker_id: str) -> str:
        return f"{self.worker_prefix}:{worker_id}"
    
    def register_handler(self, task_type: str, handler: Callable):
        """注册任务处理器"""
        self.task_handlers[task_type] = handler
    
    def enqueue_task(self, task_type: str, task_data: Dict[str, Any], 
                    priority: int = 0, delay: int = 0) -> str:
        """添加任务到队列"""
        try:
            task_id = str(uuid.uuid4())
            
            # 创建任务信息
            task_info = {
                'task_id': task_id,
                'task_type': task_type,
                'priority': priority,
                'created_at': datetime.now().isoformat(),
                'status': 'pending',
                'retry_count': 0,
                'max_retries': 3
            }
            
            # 存储任务数据
            task_data_key = self._get_task_data_key(task_id)
            self.redis_client.hmset(task_data_key, {
                'task_info': json.dumps(task_info),
                'task_data': json.dumps(task_data)
            })
            
            # 设置任务数据过期时间(24小时)
            self.redis_client.expire(task_data_key, 86400)
            
            if delay > 0:
                # 延迟任务
                self.redis_client.zadd('delayed_tasks', {
                    task_id: time.time() + delay
                })
            else:
                # 立即执行的任务
                if priority > 0:
                    # 高优先级任务插入到队列前面
                    self.redis_client.lpush(self.task_queue, task_id)
                else:
                    # 普通任务插入到队列后面
                    self.redis_client.rpush(self.task_queue, task_id)
            
            return task_id
        except Exception as e:
            print(f"添加任务失败: {e}")
            return ""
    
    def dequeue_task(self, timeout: int = 10) -> Optional[Dict[str, Any]]:
        """从队列获取任务"""
        try:
            # 阻塞式获取任务
            result = self.redis_client.blpop(self.task_queue, timeout=timeout)
            
            if not result:
                return None
            
            _, task_id = result
            
            # 获取任务数据
            task_data_key = self._get_task_data_key(task_id)
            task_raw_data = self.redis_client.hgetall(task_data_key)
            
            if not task_raw_data:
                return None
            
            # 解析任务数据
            task_info = json.loads(task_raw_data['task_info'])
            task_data = json.loads(task_raw_data['task_data'])
            
            # 移动到处理队列
            self.redis_client.lpush(self.processing_queue, task_id)
            
            # 更新任务状态
            task_info['status'] = 'processing'
            task_info['started_at'] = datetime.now().isoformat()
            
            self.redis_client.hset(task_data_key, 'task_info', json.dumps(task_info))
            
            return {
                'task_id': task_id,
                'task_info': task_info,
                'task_data': task_data
            }
        except Exception as e:
            print(f"获取任务失败: {e}")
            return None
    
    def complete_task(self, task_id: str, result: Dict[str, Any] = None) -> bool:
        """标记任务完成"""
        try:
            task_data_key = self._get_task_data_key(task_id)
            task_raw_data = self.redis_client.hgetall(task_data_key)
            
            if not task_raw_data:
                return False
            
            # 更新任务信息
            task_info = json.loads(task_raw_data['task_info'])
            task_info['status'] = 'completed'
            task_info['completed_at'] = datetime.now().isoformat()
            
            if result:
                task_info['result'] = result
            
            # 更新任务数据
            self.redis_client.hset(task_data_key, 'task_info', json.dumps(task_info))
            
            # 从处理队列移除,添加到完成队列
            self.redis_client.lrem(self.processing_queue, 1, task_id)
            self.redis_client.lpush(self.completed_queue, task_id)
            
            # 限制完成队列长度
            self.redis_client.ltrim(self.completed_queue, 0, 999)
            
            return True
        except Exception as e:
            print(f"完成任务失败: {e}")
            return False
    
    def fail_task(self, task_id: str, error: str, retry: bool = True) -> bool:
        """标记任务失败"""
        try:
            task_data_key = self._get_task_data_key(task_id)
            task_raw_data = self.redis_client.hgetall(task_data_key)
            
            if not task_raw_data:
                return False
            
            # 更新任务信息
            task_info = json.loads(task_raw_data['task_info'])
            task_info['retry_count'] += 1
            task_info['last_error'] = error
            task_info['failed_at'] = datetime.now().isoformat()
            
            # 从处理队列移除
            self.redis_client.lrem(self.processing_queue, 1, task_id)
            
            # 检查是否需要重试
            if retry and task_info['retry_count'] < task_info['max_retries']:
                task_info['status'] = 'retrying'
                # 重新加入队列(延迟重试)
                retry_delay = min(60 * (2 ** task_info['retry_count']), 3600)  # 指数退避
                self.redis_client.zadd('delayed_tasks', {
                    task_id: time.time() + retry_delay
                })
            else:
                task_info['status'] = 'failed'
                # 添加到失败队列
                self.redis_client.lpush(self.failed_queue, task_id)
                self.redis_client.ltrim(self.failed_queue, 0, 999)
            
            # 更新任务数据
            self.redis_client.hset(task_data_key, 'task_info', json.dumps(task_info))
            
            return True
        except Exception as e:
            print(f"失败任务处理失败: {e}")
            return False
    
    def process_delayed_tasks(self):
        """处理延迟任务"""
        try:
            current_time = time.time()
            
            # 获取到期的延迟任务
            ready_tasks = self.redis_client.zrangebyscore(
                'delayed_tasks', 0, current_time
            )
            
            for task_id in ready_tasks:
                # 移动到主队列
                self.redis_client.rpush(self.task_queue, task_id)
                # 从延迟队列移除
                self.redis_client.zrem('delayed_tasks', task_id)
        except Exception as e:
            print(f"处理延迟任务失败: {e}")
    
    def worker(self, worker_id: str):
        """工作线程"""
        worker_key = self._get_worker_key(worker_id)
        
        # 注册工作线程
        self.redis_client.hset(worker_key, 'started_at', datetime.now().isoformat())
        self.redis_client.hset(worker_key, 'status', 'running')
        self.redis_client.expire(worker_key, 300)  # 5分钟过期
        
        print(f"工作线程 {worker_id} 启动")
        
        while self.running:
            try:
                # 处理延迟任务
                self.process_delayed_tasks()
                
                # 获取任务
                task = self.dequeue_task(timeout=5)
                
                if not task:
                    continue
                
                task_id = task['task_id']
                task_info = task['task_info']
                task_data = task['task_data']
                task_type = task_info['task_type']
                
                print(f"工作线程 {worker_id} 处理任务 {task_id} (类型: {task_type})")
                
                # 更新工作线程状态
                self.redis_client.hset(worker_key, 'current_task', task_id)
                self.redis_client.hset(worker_key, 'last_activity', datetime.now().isoformat())
                self.redis_client.expire(worker_key, 300)
                
                # 执行任务
                if task_type in self.task_handlers:
                    try:
                        handler = self.task_handlers[task_type]
                        result = handler(task_data)
                        self.complete_task(task_id, result)
                        print(f"任务 {task_id} 完成")
                    except Exception as e:
                        error_msg = f"任务执行失败: {str(e)}"
                        self.fail_task(task_id, error_msg)
                        print(f"任务 {task_id} 失败: {error_msg}")
                else:
                    error_msg = f"未知任务类型: {task_type}"
                    self.fail_task(task_id, error_msg, retry=False)
                    print(f"任务 {task_id} 失败: {error_msg}")
                
                # 清除当前任务
                self.redis_client.hdel(worker_key, 'current_task')
                
            except Exception as e:
                print(f"工作线程 {worker_id} 错误: {e}")
                time.sleep(1)
        
        # 清理工作线程信息
        self.redis_client.delete(worker_key)
        print(f"工作线程 {worker_id} 停止")
    
    def start_workers(self, num_workers: int = 2):
        """启动工作线程"""
        self.running = True
        
        for i in range(num_workers):
            worker_id = f"worker_{i+1}"
            thread = threading.Thread(target=self.worker, args=(worker_id,))
            thread.daemon = True
            thread.start()
            self.worker_threads.append(thread)
        
        print(f"启动了 {num_workers} 个工作线程")
    
    def stop_workers(self):
        """停止工作线程"""
        self.running = False
        
        for thread in self.worker_threads:
            thread.join(timeout=5)
        
        self.worker_threads.clear()
        print("所有工作线程已停止")
    
    def get_queue_stats(self) -> Dict[str, int]:
        """获取队列统计信息"""
        try:
            return {
                'pending': self.redis_client.llen(self.task_queue),
                'processing': self.redis_client.llen(self.processing_queue),
                'completed': self.redis_client.llen(self.completed_queue),
                'failed': self.redis_client.llen(self.failed_queue),
                'delayed': self.redis_client.zcard('delayed_tasks')
            }
        except Exception as e:
            print(f"获取统计信息失败: {e}")
            return {}
    
    def get_task_status(self, task_id: str) -> Optional[Dict[str, Any]]:
        """获取任务状态"""
        try:
            task_data_key = self._get_task_data_key(task_id)
            task_raw_data = self.redis_client.hgetall(task_data_key)
            
            if not task_raw_data:
                return None
            
            return json.loads(task_raw_data['task_info'])
        except Exception as e:
            print(f"获取任务状态失败: {e}")
            return None
    
    def clear_completed_tasks(self, keep_count: int = 100):
        """清理已完成的任务"""
        try:
            # 保留最近的完成任务
            self.redis_client.ltrim(self.completed_queue, 0, keep_count - 1)
            
            # 清理过期的任务数据
            completed_tasks = self.redis_client.lrange(self.completed_queue, keep_count, -1)
            for task_id in completed_tasks:
                task_data_key = self._get_task_data_key(task_id)
                self.redis_client.delete(task_data_key)
            
            print(f"清理了 {len(completed_tasks)} 个已完成任务")
        except Exception as e:
            print(f"清理任务失败: {e}")

# 任务处理器示例
def send_email_handler(task_data: Dict[str, Any]) -> Dict[str, Any]:
    """发送邮件任务处理器"""
    to_email = task_data.get('to_email')
    subject = task_data.get('subject')
    content = task_data.get('content')
    
    # 模拟发送邮件
    print(f"发送邮件到 {to_email}: {subject}")
    time.sleep(2)  # 模拟处理时间
    
    return {
        'sent_at': datetime.now().isoformat(),
        'to_email': to_email,
        'status': 'sent'
    }

def process_image_handler(task_data: Dict[str, Any]) -> Dict[str, Any]:
    """图片处理任务处理器"""
    image_url = task_data.get('image_url')
    operations = task_data.get('operations', [])
    
    # 模拟图片处理
    print(f"处理图片 {image_url}: {operations}")
    time.sleep(5)  # 模拟处理时间
    
    return {
        'processed_at': datetime.now().isoformat(),
        'original_url': image_url,
        'processed_url': f"processed_{image_url}",
        'operations': operations
    }

def generate_report_handler(task_data: Dict[str, Any]) -> Dict[str, Any]:
    """生成报告任务处理器"""
    report_type = task_data.get('report_type')
    date_range = task_data.get('date_range')
    
    # 模拟报告生成
    print(f"生成报告: {report_type} ({date_range})")
    time.sleep(10)  # 模拟处理时间
    
    return {
        'generated_at': datetime.now().isoformat(),
        'report_type': report_type,
        'date_range': date_range,
        'file_path': f"/reports/{report_type}_{int(time.time())}.pdf"
    }

# 使用示例
if __name__ == "__main__":
    # 创建任务队列
    task_queue = RedisTaskQueue()
    
    # 注册任务处理器
    task_queue.register_handler('send_email', send_email_handler)
    task_queue.register_handler('process_image', process_image_handler)
    task_queue.register_handler('generate_report', generate_report_handler)
    
    # 启动工作线程
    task_queue.start_workers(num_workers=3)
    
    try:
        # 添加一些任务
        email_task_id = task_queue.enqueue_task('send_email', {
            'to_email': 'user@example.com',
            'subject': '欢迎注册',
            'content': '感谢您注册我们的服务!'
        })
        print(f"邮件任务ID: {email_task_id}")
        
        image_task_id = task_queue.enqueue_task('process_image', {
            'image_url': 'https://example.com/image.jpg',
            'operations': ['resize', 'watermark', 'compress']
        }, priority=1)  # 高优先级
        print(f"图片处理任务ID: {image_task_id}")
        
        report_task_id = task_queue.enqueue_task('generate_report', {
            'report_type': 'monthly_sales',
            'date_range': '2024-01'
        }, delay=30)  # 延迟30秒执行
        print(f"报告生成任务ID: {report_task_id}")
        
        # 监控队列状态
        for i in range(60):
            stats = task_queue.get_queue_stats()
            print(f"队列状态: {stats}")
            
            # 检查任务状态
            if i % 10 == 0:
                for task_id in [email_task_id, image_task_id, report_task_id]:
                    status = task_queue.get_task_status(task_id)
                    if status:
                        print(f"任务 {task_id} 状态: {status['status']}")
            
            time.sleep(1)
            
            # 如果所有任务都完成了,退出
            if stats['pending'] == 0 and stats['processing'] == 0 and stats['delayed'] == 0:
                break
    
    except KeyboardInterrupt:
        print("\n收到中断信号,正在停止...")
    
    finally:
        # 停止工作线程
        task_queue.stop_workers()
        
        # 显示最终统计
        final_stats = task_queue.get_queue_stats()
        print(f"最终队列状态: {final_stats}")

4.4.2 社交媒体时间线

import redis
import json
from typing import Dict, Any, Optional, List
from datetime import datetime
import time

class RedisTimeline:
    def __init__(self, host='localhost', port=6379, db=0):
        self.redis_client = redis.Redis(
            host=host, 
            port=port, 
            db=db,
            decode_responses=True
        )
        self.timeline_prefix = "timeline"
        self.post_prefix = "post"
        self.user_prefix = "user"
        self.following_prefix = "following"
        self.followers_prefix = "followers"
    
    def _get_timeline_key(self, user_id: str) -> str:
        return f"{self.timeline_prefix}:{user_id}"
    
    def _get_post_key(self, post_id: str) -> str:
        return f"{self.post_prefix}:{post_id}"
    
    def _get_following_key(self, user_id: str) -> str:
        return f"{self.following_prefix}:{user_id}"
    
    def _get_followers_key(self, user_id: str) -> str:
        return f"{self.followers_prefix}:{user_id}"
    
    def create_post(self, user_id: str, content: str, 
                   media_urls: List[str] = None, tags: List[str] = None) -> str:
        """创建帖子"""
        try:
            post_id = f"{user_id}_{int(time.time() * 1000)}"
            
            # 创建帖子数据
            post_data = {
                'post_id': post_id,
                'user_id': user_id,
                'content': content,
                'media_urls': json.dumps(media_urls or []),
                'tags': json.dumps(tags or []),
                'created_at': datetime.now().isoformat(),
                'likes_count': 0,
                'comments_count': 0,
                'shares_count': 0
            }
            
            # 存储帖子数据
            post_key = self._get_post_key(post_id)
            self.redis_client.hmset(post_key, post_data)
            
            # 设置帖子过期时间(30天)
            self.redis_client.expire(post_key, 30 * 24 * 3600)
            
            # 添加到用户自己的时间线
            user_timeline = self._get_timeline_key(user_id)
            self.redis_client.lpush(user_timeline, post_id)
            
            # 限制时间线长度
            self.redis_client.ltrim(user_timeline, 0, 999)
            
            # 推送到粉丝的时间线
            self._push_to_followers(user_id, post_id)
            
            return post_id
        except Exception as e:
            print(f"创建帖子失败: {e}")
            return ""
    
    def _push_to_followers(self, user_id: str, post_id: str):
        """推送帖子到粉丝时间线"""
        try:
            followers_key = self._get_followers_key(user_id)
            followers = self.redis_client.smembers(followers_key)
            
            # 使用管道批量操作
            pipe = self.redis_client.pipeline()
            
            for follower_id in followers:
                follower_timeline = self._get_timeline_key(follower_id)
                pipe.lpush(follower_timeline, post_id)
                pipe.ltrim(follower_timeline, 0, 999)
            
            pipe.execute()
        except Exception as e:
            print(f"推送到粉丝失败: {e}")
    
    def get_timeline(self, user_id: str, start: int = 0, 
                    count: int = 20) -> List[Dict[str, Any]]:
        """获取用户时间线"""
        try:
            timeline_key = self._get_timeline_key(user_id)
            post_ids = self.redis_client.lrange(timeline_key, start, start + count - 1)
            
            posts = []
            for post_id in post_ids:
                post = self.get_post(post_id)
                if post:
                    posts.append(post)
            
            return posts
        except Exception as e:
            print(f"获取时间线失败: {e}")
            return []
    
    def get_post(self, post_id: str) -> Optional[Dict[str, Any]]:
        """获取帖子详情"""
        try:
            post_key = self._get_post_key(post_id)
            post_data = self.redis_client.hgetall(post_key)
            
            if not post_data:
                return None
            
            # 解析JSON字段
            post_data['media_urls'] = json.loads(post_data.get('media_urls', '[]'))
            post_data['tags'] = json.loads(post_data.get('tags', '[]'))
            
            # 转换数值字段
            for field in ['likes_count', 'comments_count', 'shares_count']:
                post_data[field] = int(post_data.get(field, 0))
            
            return post_data
        except Exception as e:
            print(f"获取帖子失败: {e}")
            return None
    
    def delete_post(self, user_id: str, post_id: str) -> bool:
        """删除帖子"""
        try:
            post = self.get_post(post_id)
            if not post or post['user_id'] != user_id:
                return False
            
            # 删除帖子数据
            post_key = self._get_post_key(post_id)
            self.redis_client.delete(post_key)
            
            # 从时间线中移除
            self._remove_from_timelines(post_id)
            
            return True
        except Exception as e:
            print(f"删除帖子失败: {e}")
            return False
    
    def _remove_from_timelines(self, post_id: str):
        """从所有时间线中移除帖子"""
        try:
            # 扫描所有时间线
            for key in self.redis_client.scan_iter(match=f"{self.timeline_prefix}:*"):
                self.redis_client.lrem(key, 0, post_id)
        except Exception as e:
            print(f"从时间线移除帖子失败: {e}")
    
    def follow_user(self, follower_id: str, following_id: str) -> bool:
        """关注用户"""
        try:
            if follower_id == following_id:
                return False
            
            # 添加关注关系
            following_key = self._get_following_key(follower_id)
            followers_key = self._get_followers_key(following_id)
            
            pipe = self.redis_client.pipeline()
            pipe.sadd(following_key, following_id)
            pipe.sadd(followers_key, follower_id)
            pipe.execute()
            
            # 将被关注用户的最近帖子添加到关注者时间线
            self._sync_timeline_on_follow(follower_id, following_id)
            
            return True
        except Exception as e:
            print(f"关注用户失败: {e}")
            return False
    
    def unfollow_user(self, follower_id: str, following_id: str) -> bool:
        """取消关注用户"""
        try:
            # 移除关注关系
            following_key = self._get_following_key(follower_id)
            followers_key = self._get_followers_key(following_id)
            
            pipe = self.redis_client.pipeline()
            pipe.srem(following_key, following_id)
            pipe.srem(followers_key, follower_id)
            pipe.execute()
            
            # 从时间线中移除被取消关注用户的帖子
            self._remove_user_posts_from_timeline(follower_id, following_id)
            
            return True
        except Exception as e:
            print(f"取消关注失败: {e}")
            return False
    
    def _sync_timeline_on_follow(self, follower_id: str, following_id: str):
        """关注时同步时间线"""
        try:
            # 获取被关注用户的最近帖子
            following_timeline = self._get_timeline_key(following_id)
            recent_posts = self.redis_client.lrange(following_timeline, 0, 49)
            
            if recent_posts:
                follower_timeline = self._get_timeline_key(follower_id)
                
                # 将帖子添加到关注者时间线(保持时间顺序)
                for post_id in reversed(recent_posts):
                    self.redis_client.lpush(follower_timeline, post_id)
                
                # 限制时间线长度
                self.redis_client.ltrim(follower_timeline, 0, 999)
        except Exception as e:
            print(f"同步时间线失败: {e}")
    
    def _remove_user_posts_from_timeline(self, user_id: str, target_user_id: str):
        """从时间线中移除特定用户的帖子"""
        try:
            timeline_key = self._get_timeline_key(user_id)
            timeline_posts = self.redis_client.lrange(timeline_key, 0, -1)
            
            # 找出需要移除的帖子
            posts_to_remove = []
            for post_id in timeline_posts:
                if post_id.startswith(f"{target_user_id}_"):
                    posts_to_remove.append(post_id)
            
            # 移除帖子
            for post_id in posts_to_remove:
                self.redis_client.lrem(timeline_key, 0, post_id)
        except Exception as e:
            print(f"移除用户帖子失败: {e}")
    
    def like_post(self, user_id: str, post_id: str) -> bool:
        """点赞帖子"""
        try:
            post_key = self._get_post_key(post_id)
            
            if not self.redis_client.exists(post_key):
                return False
            
            # 检查是否已经点赞
            like_key = f"likes:{post_id}"
            if self.redis_client.sismember(like_key, user_id):
                return False
            
            # 添加点赞记录
            pipe = self.redis_client.pipeline()
            pipe.sadd(like_key, user_id)
            pipe.hincrby(post_key, 'likes_count', 1)
            pipe.expire(like_key, 30 * 24 * 3600)  # 30天过期
            pipe.execute()
            
            return True
        except Exception as e:
            print(f"点赞失败: {e}")
            return False
    
    def unlike_post(self, user_id: str, post_id: str) -> bool:
        """取消点赞"""
        try:
            post_key = self._get_post_key(post_id)
            like_key = f"likes:{post_id}"
            
            if not self.redis_client.sismember(like_key, user_id):
                return False
            
            # 移除点赞记录
            pipe = self.redis_client.pipeline()
            pipe.srem(like_key, user_id)
            pipe.hincrby(post_key, 'likes_count', -1)
            pipe.execute()
            
            return True
        except Exception as e:
            print(f"取消点赞失败: {e}")
            return False
    
    def get_user_stats(self, user_id: str) -> Dict[str, int]:
        """获取用户统计信息"""
        try:
            following_key = self._get_following_key(user_id)
            followers_key = self._get_followers_key(user_id)
            timeline_key = self._get_timeline_key(user_id)
            
            return {
                'following_count': self.redis_client.scard(following_key),
                'followers_count': self.redis_client.scard(followers_key),
                'posts_count': self.redis_client.llen(timeline_key)
            }
        except Exception as e:
            print(f"获取用户统计失败: {e}")
            return {}
    
    def get_trending_posts(self, hours: int = 24, limit: int = 10) -> List[Dict[str, Any]]:
        """获取热门帖子"""
        try:
            # 这里简化实现,实际应该基于时间窗口和互动数据
            trending_posts = []
            
            # 扫描最近的帖子
            cutoff_time = time.time() - (hours * 3600)
            
            for key in self.redis_client.scan_iter(match=f"{self.post_prefix}:*"):
                post_data = self.redis_client.hgetall(key)
                if post_data:
                    created_at = datetime.fromisoformat(post_data['created_at'])
                    if created_at.timestamp() > cutoff_time:
                        # 计算热度分数(简化算法)
                        likes = int(post_data.get('likes_count', 0))
                        comments = int(post_data.get('comments_count', 0))
                        shares = int(post_data.get('shares_count', 0))
                        
                        score = likes + (comments * 2) + (shares * 3)
                        
                        post_data['trending_score'] = score
                        post_data['media_urls'] = json.loads(post_data.get('media_urls', '[]'))
                        post_data['tags'] = json.loads(post_data.get('tags', '[]'))
                        
                        trending_posts.append(post_data)
            
            # 按热度排序
            trending_posts.sort(key=lambda x: x['trending_score'], reverse=True)
            
            return trending_posts[:limit]
        except Exception as e:
            print(f"获取热门帖子失败: {e}")
            return []
    
    def search_posts(self, keyword: str, limit: int = 20) -> List[Dict[str, Any]]:
        """搜索帖子"""
        try:
            matching_posts = []
            
            for key in self.redis_client.scan_iter(match=f"{self.post_prefix}:*"):
                post_data = self.redis_client.hgetall(key)
                if post_data:
                    content = post_data.get('content', '').lower()
                    tags = json.loads(post_data.get('tags', '[]'))
                    
                    # 检查关键词是否在内容或标签中
                    if (keyword.lower() in content or 
                        any(keyword.lower() in tag.lower() for tag in tags)):
                        
                        post_data['media_urls'] = json.loads(post_data.get('media_urls', '[]'))
                        post_data['tags'] = tags
                        
                        for field in ['likes_count', 'comments_count', 'shares_count']:
                            post_data[field] = int(post_data.get(field, 0))
                        
                        matching_posts.append(post_data)
            
            # 按创建时间排序
            matching_posts.sort(
                key=lambda x: datetime.fromisoformat(x['created_at']), 
                reverse=True
            )
            
            return matching_posts[:limit]
        except Exception as e:
            print(f"搜索帖子失败: {e}")
            return []

# 使用示例
if __name__ == "__main__":
    timeline = RedisTimeline()
    
    # 创建一些用户帖子
    post1_id = timeline.create_post(
        user_id="user_alice",
        content="今天天气真好!去公园散步了。",
        media_urls=["https://example.com/park.jpg"],
        tags=["天气", "散步", "公园"]
    )
    print(f"Alice的帖子ID: {post1_id}")
    
    post2_id = timeline.create_post(
        user_id="user_bob",
        content="刚刚完成了一个很棒的项目!",
        tags=["工作", "项目", "成就"]
    )
    print(f"Bob的帖子ID: {post2_id}")
    
    post3_id = timeline.create_post(
        user_id="user_charlie",
        content="分享一个有趣的技术文章",
        media_urls=["https://example.com/article.pdf"],
        tags=["技术", "分享", "学习"]
    )
    print(f"Charlie的帖子ID: {post3_id}")
    
    # 建立关注关系
    timeline.follow_user("user_alice", "user_bob")
    timeline.follow_user("user_alice", "user_charlie")
    timeline.follow_user("user_bob", "user_alice")
    print("关注关系已建立")
    
    # 获取Alice的时间线
    alice_timeline = timeline.get_timeline("user_alice", count=10)
    print(f"\nAlice的时间线 ({len(alice_timeline)} 条帖子):")
    for post in alice_timeline:
        print(f"- {post['user_id']}: {post['content'][:50]}...")
    
    # 点赞帖子
    timeline.like_post("user_alice", post2_id)
    timeline.like_post("user_bob", post1_id)
    timeline.like_post("user_charlie", post1_id)
    print("\n点赞操作完成")
    
    # 获取用户统计
    alice_stats = timeline.get_user_stats("user_alice")
    print(f"\nAlice的统计信息: {alice_stats}")
    
    # 搜索帖子
    tech_posts = timeline.search_posts("技术", limit=5)
    print(f"\n搜索'技术'相关帖子 ({len(tech_posts)} 条):")
    for post in tech_posts:
        print(f"- {post['user_id']}: {post['content']}")
    
    # 获取热门帖子
    trending = timeline.get_trending_posts(hours=24, limit=5)
    print(f"\n热门帖子 ({len(trending)} 条):")
    for post in trending:
        print(f"- {post['user_id']}: {post['content'][:50]}... (热度: {post['trending_score']})")
    
    # 获取帖子详情
    post_detail = timeline.get_post(post1_id)
    if post_detail:
        print(f"\n帖子详情:")
        print(f"内容: {post_detail['content']}")
        print(f"点赞数: {post_detail['likes_count']}")
        print(f"标签: {post_detail['tags']}")
        print(f"创建时间: {post_detail['created_at']}")

4.4.3 操作日志系统

import redis
import json
from typing import Dict, Any, Optional, List
from datetime import datetime
from enum import Enum
import uuid

class LogLevel(Enum):
    DEBUG = "DEBUG"
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"

class RedisLogger:
    def __init__(self, host='localhost', port=6379, db=0):
        self.redis_client = redis.Redis(
            host=host, 
            port=port, 
            db=db,
            decode_responses=True
        )
        self.log_prefix = "logs"
        self.app_logs_prefix = "app_logs"
        self.user_logs_prefix = "user_logs"
        self.error_logs_prefix = "error_logs"
        self.audit_logs_prefix = "audit_logs"
    
    def _get_log_key(self, log_type: str, identifier: str = "global") -> str:
        return f"{self.log_prefix}:{log_type}:{identifier}"
    
    def _create_log_entry(self, level: LogLevel, message: str, 
                         context: Dict[str, Any] = None) -> Dict[str, Any]:
        """创建日志条目"""
        return {
            'id': str(uuid.uuid4()),
            'timestamp': datetime.now().isoformat(),
            'level': level.value,
            'message': message,
            'context': context or {},
            'created_at': datetime.now().timestamp()
        }
    
    def log(self, level: LogLevel, message: str, 
           app_name: str = None, user_id: str = None, 
           context: Dict[str, Any] = None):
        """记录日志"""
        try:
            log_entry = self._create_log_entry(level, message, context)
            log_json = json.dumps(log_entry)
            
            # 添加到全局日志
            global_key = self._get_log_key("global")
            self.redis_client.lpush(global_key, log_json)
            self.redis_client.ltrim(global_key, 0, 9999)  # 保留最近10000条
            
            # 添加到应用日志
            if app_name:
                app_key = self._get_log_key("app", app_name)
                self.redis_client.lpush(app_key, log_json)
                self.redis_client.ltrim(app_key, 0, 4999)  # 保留最近5000条
            
            # 添加到用户日志
            if user_id:
                user_key = self._get_log_key("user", user_id)
                self.redis_client.lpush(user_key, log_json)
                self.redis_client.ltrim(user_key, 0, 999)  # 保留最近1000条
            
            # 错误日志单独存储
            if level in [LogLevel.ERROR, LogLevel.CRITICAL]:
                error_key = self._get_log_key("error")
                self.redis_client.lpush(error_key, log_json)
                self.redis_client.ltrim(error_key, 0, 1999)  # 保留最近2000条
            
            # 设置过期时间
            for key in [global_key, app_key, user_key, error_key]:
                if key and self.redis_client.exists(key):
                    self.redis_client.expire(key, 7 * 24 * 3600)  # 7天过期
        
        except Exception as e:
            print(f"记录日志失败: {e}")
    
    def debug(self, message: str, app_name: str = None, 
             user_id: str = None, context: Dict[str, Any] = None):
        """记录DEBUG日志"""
        self.log(LogLevel.DEBUG, message, app_name, user_id, context)
    
    def info(self, message: str, app_name: str = None, 
            user_id: str = None, context: Dict[str, Any] = None):
        """记录INFO日志"""
        self.log(LogLevel.INFO, message, app_name, user_id, context)
    
    def warning(self, message: str, app_name: str = None, 
               user_id: str = None, context: Dict[str, Any] = None):
        """记录WARNING日志"""
        self.log(LogLevel.WARNING, message, app_name, user_id, context)
    
    def error(self, message: str, app_name: str = None, 
             user_id: str = None, context: Dict[str, Any] = None):
        """记录ERROR日志"""
        self.log(LogLevel.ERROR, message, app_name, user_id, context)
    
    def critical(self, message: str, app_name: str = None, 
                user_id: str = None, context: Dict[str, Any] = None):
        """记录CRITICAL日志"""
        self.log(LogLevel.CRITICAL, message, app_name, user_id, context)
    
    def get_logs(self, log_type: str = "global", identifier: str = "global", 
                start: int = 0, count: int = 100, 
                level_filter: LogLevel = None) -> List[Dict[str, Any]]:
        """获取日志"""
        try:
            log_key = self._get_log_key(log_type, identifier)
            log_entries = self.redis_client.lrange(log_key, start, start + count - 1)
            
            logs = []
            for entry in log_entries:
                log_data = json.loads(entry)
                
                # 级别过滤
                if level_filter and log_data['level'] != level_filter.value:
                    continue
                
                logs.append(log_data)
            
            return logs
        except Exception as e:
            print(f"获取日志失败: {e}")
            return []
    
    def get_app_logs(self, app_name: str, start: int = 0, count: int = 100, 
                    level_filter: LogLevel = None) -> List[Dict[str, Any]]:
        """获取应用日志"""
        return self.get_logs("app", app_name, start, count, level_filter)
    
    def get_user_logs(self, user_id: str, start: int = 0, count: int = 100, 
                     level_filter: LogLevel = None) -> List[Dict[str, Any]]:
        """获取用户日志"""
        return self.get_logs("user", user_id, start, count, level_filter)
    
    def get_error_logs(self, start: int = 0, count: int = 100) -> List[Dict[str, Any]]:
        """获取错误日志"""
        return self.get_logs("error", "global", start, count)
    
    def search_logs(self, keyword: str, log_type: str = "global", 
                   identifier: str = "global", limit: int = 100) -> List[Dict[str, Any]]:
        """搜索日志"""
        try:
            log_key = self._get_log_key(log_type, identifier)
            all_logs = self.redis_client.lrange(log_key, 0, -1)
            
            matching_logs = []
            for entry in all_logs:
                log_data = json.loads(entry)
                
                # 在消息和上下文中搜索关键词
                message = log_data.get('message', '').lower()
                context_str = json.dumps(log_data.get('context', {})).lower()
                
                if keyword.lower() in message or keyword.lower() in context_str:
                    matching_logs.append(log_data)
                    
                    if len(matching_logs) >= limit:
                        break
            
            return matching_logs
        except Exception as e:
            print(f"搜索日志失败: {e}")
            return []
    
    def get_log_stats(self, log_type: str = "global", 
                     identifier: str = "global") -> Dict[str, int]:
        """获取日志统计"""
        try:
            log_key = self._get_log_key(log_type, identifier)
            all_logs = self.redis_client.lrange(log_key, 0, -1)
            
            stats = {
                'total': 0,
                'DEBUG': 0,
                'INFO': 0,
                'WARNING': 0,
                'ERROR': 0,
                'CRITICAL': 0
            }
            
            for entry in all_logs:
                log_data = json.loads(entry)
                level = log_data.get('level', 'INFO')
                stats['total'] += 1
                stats[level] = stats.get(level, 0) + 1
            
            return stats
        except Exception as e:
            print(f"获取日志统计失败: {e}")
            return {}
    
    def audit_log(self, action: str, user_id: str, resource: str, 
                 details: Dict[str, Any] = None, ip_address: str = None):
        """记录审计日志"""
        try:
            audit_entry = {
                'id': str(uuid.uuid4()),
                'timestamp': datetime.now().isoformat(),
                'action': action,
                'user_id': user_id,
                'resource': resource,
                'details': details or {},
                'ip_address': ip_address,
                'created_at': datetime.now().timestamp()
            }
            
            audit_key = self._get_log_key("audit")
            self.redis_client.lpush(audit_key, json.dumps(audit_entry))
            self.redis_client.ltrim(audit_key, 0, 9999)  # 保留最近10000条
            self.redis_client.expire(audit_key, 90 * 24 * 3600)  # 90天过期
            
            # 同时记录到用户审计日志
            user_audit_key = self._get_log_key("user_audit", user_id)
            self.redis_client.lpush(user_audit_key, json.dumps(audit_entry))
            self.redis_client.ltrim(user_audit_key, 0, 999)  # 保留最近1000条
            self.redis_client.expire(user_audit_key, 90 * 24 * 3600)  # 90天过期
        
        except Exception as e:
            print(f"记录审计日志失败: {e}")
    
    def get_audit_logs(self, start: int = 0, count: int = 100, 
                      user_id: str = None) -> List[Dict[str, Any]]:
        """获取审计日志"""
        try:
            if user_id:
                audit_key = self._get_log_key("user_audit", user_id)
            else:
                audit_key = self._get_log_key("audit")
            
            audit_entries = self.redis_client.lrange(audit_key, start, start + count - 1)
            
            audits = []
            for entry in audit_entries:
                audit_data = json.loads(entry)
                audits.append(audit_data)
            
            return audits
        except Exception as e:
            print(f"获取审计日志失败: {e}")
            return []
    
    def clear_old_logs(self, days: int = 7):
        """清理旧日志"""
        try:
            cutoff_time = datetime.now().timestamp() - (days * 24 * 3600)
            
            # 清理各类日志
            log_patterns = [
                f"{self.log_prefix}:global:*",
                f"{self.log_prefix}:app:*",
                f"{self.log_prefix}:user:*",
                f"{self.log_prefix}:error:*"
            ]
            
            for pattern in log_patterns:
                for key in self.redis_client.scan_iter(match=pattern):
                    self._clean_log_key(key, cutoff_time)
        
        except Exception as e:
            print(f"清理旧日志失败: {e}")
    
    def _clean_log_key(self, key: str, cutoff_time: float):
        """清理单个日志键的旧记录"""
        try:
            all_logs = self.redis_client.lrange(key, 0, -1)
            valid_logs = []
            
            for entry in all_logs:
                log_data = json.loads(entry)
                if log_data.get('created_at', 0) > cutoff_time:
                    valid_logs.append(entry)
            
            # 重新设置日志
            if valid_logs:
                self.redis_client.delete(key)
                self.redis_client.lpush(key, *valid_logs)
            else:
                self.redis_client.delete(key)
        
        except Exception as e:
            print(f"清理日志键失败: {e}")

# 使用示例
if __name__ == "__main__":
    logger = RedisLogger()
    
    # 记录各种级别的日志
    logger.info("应用启动", app_name="myapp", context={"version": "1.0.0"})
    logger.debug("调试信息", app_name="myapp", context={"module": "auth"})
    logger.warning("内存使用率较高", app_name="myapp", context={"memory_usage": "85%"})
    logger.error("数据库连接失败", app_name="myapp", context={
        "error_code": "DB_CONN_FAILED",
        "retry_count": 3
    })
    
    # 记录用户操作日志
    logger.info("用户登录", app_name="myapp", user_id="user_123", context={
        "ip_address": "192.168.1.100",
        "user_agent": "Mozilla/5.0..."
    })
    
    # 记录审计日志
    logger.audit_log(
        action="DELETE_USER",
        user_id="admin_001",
        resource="user:user_456",
        details={"reason": "违规行为", "operator": "admin_001"},
        ip_address="192.168.1.10"
    )
    
    # 获取应用日志
    app_logs = logger.get_app_logs("myapp", count=10)
    print(f"\n应用日志 ({len(app_logs)} 条):")
    for log in app_logs:
        print(f"[{log['level']}] {log['timestamp']}: {log['message']}")
    
    # 获取错误日志
    error_logs = logger.get_error_logs(count=5)
    print(f"\n错误日志 ({len(error_logs)} 条):")
    for log in error_logs:
        print(f"[{log['level']}] {log['timestamp']}: {log['message']}")
    
    # 搜索日志
    search_results = logger.search_logs("数据库", limit=5)
    print(f"\n搜索'数据库'相关日志 ({len(search_results)} 条):")
    for log in search_results:
        print(f"[{log['level']}] {log['message']}")
    
    # 获取日志统计
    stats = logger.get_log_stats()
    print(f"\n日志统计: {stats}")
    
    # 获取审计日志
    audit_logs = logger.get_audit_logs(count=5)
    print(f"\n审计日志 ({len(audit_logs)} 条):")
    for audit in audit_logs:
        print(f"{audit['timestamp']}: {audit['user_id']} {audit['action']} {audit['resource']}")

4.5 栈和队列实现

4.5.1 栈(LIFO)实现

import redis
from typing import Any, Optional

class RedisStack:
    def __init__(self, name: str, redis_client: redis.Redis):
        self.name = name
        self.redis_client = redis_client
        self.key = f"stack:{name}"
    
    def push(self, item: Any) -> int:
        """入栈"""
        return self.redis_client.lpush(self.key, str(item))
    
    def pop(self) -> Optional[str]:
        """出栈"""
        return self.redis_client.lpop(self.key)
    
    def peek(self) -> Optional[str]:
        """查看栈顶元素"""
        return self.redis_client.lindex(self.key, 0)
    
    def size(self) -> int:
        """栈大小"""
        return self.redis_client.llen(self.key)
    
    def is_empty(self) -> bool:
        """是否为空"""
        return self.size() == 0
    
    def clear(self):
        """清空栈"""
        self.redis_client.delete(self.key)

# 使用示例
redis_client = redis.Redis(decode_responses=True)
stack = RedisStack("my_stack", redis_client)

# 入栈操作
stack.push("first")
stack.push("second")
stack.push("third")
print(f"栈大小: {stack.size()}")

# 查看栈顶
print(f"栈顶元素: {stack.peek()}")

# 出栈操作
while not stack.is_empty():
    item = stack.pop()
    print(f"出栈: {item}")

4.5.2 队列(FIFO)实现

import redis
from typing import Any, Optional

class RedisQueue:
    def __init__(self, name: str, redis_client: redis.Redis):
        self.name = name
        self.redis_client = redis_client
        self.key = f"queue:{name}"
    
    def enqueue(self, item: Any) -> int:
        """入队"""
        return self.redis_client.rpush(self.key, str(item))
    
    def dequeue(self) -> Optional[str]:
        """出队"""
        return self.redis_client.lpop(self.key)
    
    def dequeue_blocking(self, timeout: int = 0) -> Optional[str]:
        """阻塞出队"""
        result = self.redis_client.blpop(self.key, timeout=timeout)
        return result[1] if result else None
    
    def front(self) -> Optional[str]:
        """查看队首元素"""
        return self.redis_client.lindex(self.key, 0)
    
    def rear(self) -> Optional[str]:
        """查看队尾元素"""
        return self.redis_client.lindex(self.key, -1)
    
    def size(self) -> int:
        """队列大小"""
        return self.redis_client.llen(self.key)
    
    def is_empty(self) -> bool:
        """是否为空"""
        return self.size() == 0
    
    def clear(self):
        """清空队列"""
        self.redis_client.delete(self.key)

# 使用示例
queue = RedisQueue("my_queue", redis_client)

# 入队操作
queue.enqueue("task1")
queue.enqueue("task2")
queue.enqueue("task3")
print(f"队列大小: {queue.size()}")

# 查看队首和队尾
print(f"队首: {queue.front()}")
print(f"队尾: {queue.rear()}")

# 出队操作
while not queue.is_empty():
    item = queue.dequeue()
    print(f"出队: {item}")

4.6 性能优化

4.6.1 列表编码优化

# 查看列表编码
redis-cli
> OBJECT ENCODING mylist

# 配置优化参数
# redis.conf
list-max-ziplist-size -2      # 每个节点最大8KB
list-compress-depth 1         # 压缩深度

4.6.2 内存优化

import redis

class OptimizedRedisList:
    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
    
    def batch_push(self, key: str, items: list, batch_size: int = 1000):
        """批量推送优化"""
        for i in range(0, len(items), batch_size):
            batch = items[i:i + batch_size]
            self.redis_client.lpush(key, *batch)
    
    def memory_efficient_scan(self, key: str, chunk_size: int = 100):
        """内存高效的扫描"""
        total_length = self.redis_client.llen(key)
        
        for start in range(0, total_length, chunk_size):
            end = min(start + chunk_size - 1, total_length - 1)
            chunk = self.redis_client.lrange(key, start, end)
            yield chunk
    
    def trim_with_backup(self, key: str, start: int, stop: int, 
                        backup_key: str = None):
        """带备份的修剪"""
        if backup_key:
            # 备份被删除的数据
            if start > 0:
                backup_data = self.redis_client.lrange(key, 0, start - 1)
                if backup_data:
                    self.redis_client.lpush(backup_key, *backup_data)
            
            list_len = self.redis_client.llen(key)
            if stop < list_len - 1:
                backup_data = self.redis_client.lrange(key, stop + 1, -1)
                if backup_data:
                    self.redis_client.rpush(backup_key, *backup_data)
        
        # 执行修剪
        self.redis_client.ltrim(key, start, stop)

# 使用示例
optimized_list = OptimizedRedisList(redis_client)

# 批量操作
large_data = [f"item_{i}" for i in range(10000)]
optimized_list.batch_push("large_list", large_data)

# 内存高效扫描
for chunk in optimized_list.memory_efficient_scan("large_list", 500):
    print(f"处理 {len(chunk)} 个项目")

# 带备份的修剪
optimized_list.trim_with_backup("large_list", 100, 199, "backup_list")

4.6.3 管道化操作

import redis
from typing import List, Any

class PipelinedListOperations:
    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
    
    def bulk_operations(self, operations: List[dict]):
        """批量操作"""
        pipe = self.redis_client.pipeline()
        
        for op in operations:
            cmd = op['command']
            key = op['key']
            
            if cmd == 'lpush':
                pipe.lpush(key, *op['values'])
            elif cmd == 'rpush':
                pipe.rpush(key, *op['values'])
            elif cmd == 'lpop':
                pipe.lpop(key)
            elif cmd == 'rpop':
                pipe.rpop(key)
            elif cmd == 'llen':
                pipe.llen(key)
            elif cmd == 'lrange':
                pipe.lrange(key, op['start'], op['stop'])
        
        return pipe.execute()
    
    def multi_list_stats(self, keys: List[str]) -> dict:
        """多列表统计"""
        pipe = self.redis_client.pipeline()
        
        for key in keys:
            pipe.llen(key)
            pipe.lindex(key, 0)   # 第一个元素
            pipe.lindex(key, -1)  # 最后一个元素
        
        results = pipe.execute()
        
        stats = {}
        for i, key in enumerate(keys):
            base_idx = i * 3
            stats[key] = {
                'length': results[base_idx],
                'first': results[base_idx + 1],
                'last': results[base_idx + 2]
            }
        
        return stats

# 使用示例
pipelined_ops = PipelinedListOperations(redis_client)

# 批量操作
operations = [
    {'command': 'lpush', 'key': 'list1', 'values': ['a', 'b', 'c']},
    {'command': 'rpush', 'key': 'list2', 'values': ['x', 'y', 'z']},
    {'command': 'llen', 'key': 'list1'},
    {'command': 'llen', 'key': 'list2'}
]

results = pipelined_ops.bulk_operations(operations)
print(f"批量操作结果: {results}")

# 多列表统计
stats = pipelined_ops.multi_list_stats(['list1', 'list2', 'list3'])
for key, stat in stats.items():
    print(f"{key}: 长度={stat['length']}, 首={stat['first']}, 尾={stat['last']}")

4.7 监控和调试

4.7.1 列表监控

import redis
import time
from typing import Dict, Any

class ListMonitor:
    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
    
    def get_list_info(self, key: str) -> Dict[str, Any]:
        """获取列表详细信息"""
        info = {
            'exists': self.redis_client.exists(key),
            'type': self.redis_client.type(key),
            'length': 0,
            'encoding': None,
            'memory_usage': 0,
            'ttl': self.redis_client.ttl(key)
        }
        
        if info['exists']:
            info['length'] = self.redis_client.llen(key)
            info['encoding'] = self.redis_client.object('encoding', key)
            
            # 内存使用(Redis 4.0+)
            try:
                info['memory_usage'] = self.redis_client.memory_usage(key)
            except:
                info['memory_usage'] = 'N/A'
        
        return info
    
    def monitor_list_performance(self, key: str, duration: int = 60):
        """监控列表性能"""
        start_time = time.time()
        samples = []
        
        while time.time() - start_time < duration:
            sample = {
                'timestamp': time.time(),
                'length': self.redis_client.llen(key),
                'memory': self.redis_client.memory_usage(key) if self.redis_client.exists(key) else 0
            }
            samples.append(sample)
            time.sleep(1)
        
        return self._analyze_samples(samples)
    
    def _analyze_samples(self, samples: list) -> Dict[str, Any]:
        """分析性能样本"""
        if not samples:
            return {}
        
        lengths = [s['length'] for s in samples]
        memories = [s['memory'] for s in samples if s['memory'] > 0]
        
        analysis = {
            'duration': samples[-1]['timestamp'] - samples[0]['timestamp'],
            'sample_count': len(samples),
            'length_stats': {
                'min': min(lengths),
                'max': max(lengths),
                'avg': sum(lengths) / len(lengths),
                'final': lengths[-1]
            }
        }
        
        if memories:
            analysis['memory_stats'] = {
                'min': min(memories),
                'max': max(memories),
                'avg': sum(memories) / len(memories),
                'final': memories[-1] if memories else 0
            }
        
        return analysis
    
    def diagnose_list_issues(self, key: str) -> Dict[str, Any]:
        """诊断列表问题"""
        info = self.get_list_info(key)
        issues = []
        recommendations = []
        
        # 检查大小
        if info['length'] > 100000:
            issues.append("列表过大,可能影响性能")
            recommendations.append("考虑分片或使用LTRIM定期清理")
        
        # 检查编码
        if info['encoding'] == 'linkedlist':
            issues.append("使用链表编码,内存效率较低")
            recommendations.append("调整list-max-ziplist-size参数")
        
        # 检查内存使用
        if isinstance(info['memory_usage'], int) and info['memory_usage'] > 10 * 1024 * 1024:  # 10MB
            issues.append("内存使用过高")
            recommendations.append("考虑数据压缩或分片")
        
        # 检查TTL
        if info['ttl'] == -1 and info['length'] > 1000:
            issues.append("大列表没有设置过期时间")
            recommendations.append("设置合适的TTL避免内存泄漏")
        
        return {
            'info': info,
            'issues': issues,
            'recommendations': recommendations
        }

# 使用示例
monitor = ListMonitor(redis_client)

# 获取列表信息
info = monitor.get_list_info("my_list")
print(f"列表信息: {info}")

# 诊断问题
diagnosis = monitor.diagnose_list_issues("my_list")
print(f"\n诊断结果:")
print(f"问题: {diagnosis['issues']}")
print(f"建议: {diagnosis['recommendations']}")

# 性能监控(需要在实际环境中运行)
# performance = monitor.monitor_list_performance("my_list", 30)
# print(f"\n性能分析: {performance}")

4.8 总结

Redis列表类型是一个功能强大且灵活的数据结构,具有以下特点:

4.8.1 核心特性

  • 有序性: 保持元素插入顺序
  • 可重复: 允许重复元素
  • 双端操作: 支持头尾两端的高效操作
  • 阻塞操作: 支持阻塞式弹出操作
  • 原子性: 所有操作都是原子的

4.8.2 适用场景

  • 消息队列: 实现可靠的任务队列系统
  • 时间线: 存储用户动态、文章列表等
  • 日志系统: 存储和管理应用日志
  • 栈和队列: 实现LIFO和FIFO数据结构
  • 最近访问: 维护最近访问记录

4.8.3 性能优化要点

  • 编码优化: 合理配置ziplist参数
  • 批量操作: 使用管道减少网络开销
  • 内存管理: 定期清理和设置TTL
  • 监控诊断: 及时发现和解决性能问题

4.8.4 最佳实践

  • 根据使用场景选择合适的操作命令
  • 避免在大列表上进行LRANGE全量操作
  • 使用LTRIM控制列表大小
  • 合理使用阻塞操作实现高效队列
  • 定期监控列表性能和内存使用

Redis列表类型为开发者提供了构建高性能、可扩展应用的强大工具,正确使用能够显著提升系统的响应速度和处理能力。

参考资料