4.1 Overview of the List Type
4.1.1 What Is a Redis List
A Redis list (List) is an ordered collection of strings that supports insertion and removal at both ends. Elements may repeat, and the list preserves insertion order.
4.1.2 Characteristics of Lists
Ordered
- Elements are kept in insertion order
- Elements can be accessed by index
- Indexes start at 0, and negative indexes are supported
Double-ended operations
- Operations at the left end (head)
- Operations at the right end (tail)
- Head and tail operations run in O(1) time
Dynamic length
- The length of a list changes dynamically
- Maximum length is 2^32 - 1 elements
- Memory usage grows and shrinks with the number of elements
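These properties are easy to see from redis-py (a minimal sketch; demo is a hypothetical key):
import redis

r = redis.Redis(decode_responses=True)
r.delete("demo")
r.rpush("demo", "a", "b", "a")  # duplicates allowed, insertion order preserved
print(r.lindex("demo", -1))     # negative index: last element -> "a"
print(r.llen("demo"))           # the list grew dynamically to length 3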
4.1.3 Typical Use Cases
Message queues
- Task queues
- Message passing
- Event processing
Timelines
- Social media feeds
- Operation logs
- History records
Stacks and queues
- LIFO stack structures
- FIFO queue structures
- Double-ended queues (a deque sketch appears at the end of section 4.5.2)
4.2 Basic List Operations
4.2.1 Insertion
# Push from the left (insert at the head)
LPUSH key element [element ...]
LPUSH mylist "first"
LPUSH mylist "second" "third"
# List contents: ["third", "second", "first"]
# Push from the right (insert at the tail)
RPUSH key element [element ...]
RPUSH mylist "fourth"
RPUSH mylist "fifth" "sixth"
# List contents: ["third", "second", "first", "fourth", "fifth", "sixth"]
# Insert before/after a given element
LINSERT key BEFORE|AFTER pivot element
LINSERT mylist BEFORE "first" "zero"
# Insert "zero" before "first"
LINSERT mylist AFTER "sixth" "seventh"
# Insert "seventh" after "sixth"
4.2.2 Reading Elements
# Get a range of elements
LRANGE key start stop
LRANGE mylist 0 -1
# All elements
LRANGE mylist 0 2
# The first 3 elements
LRANGE mylist -3 -1
# The last 3 elements
# Get the element at a given index
LINDEX key index
LINDEX mylist 0
# The first element
LINDEX mylist -1
# The last element
# Get the list length
LLEN key
LLEN mylist
# Returns the number of elements
4.2.3 Deletion
# Pop from the left (remove at the head)
LPOP key [count]
LPOP mylist
# Remove and return the first element
LPOP mylist 2
# Remove and return the first 2 elements (the count argument requires Redis 6.2+)
# Pop from the right (remove at the tail)
RPOP key [count]
RPOP mylist
# Remove and return the last element
RPOP mylist 2
# Remove and return the last 2 elements
# Remove elements equal to a given value
LREM key count element
LREM mylist 1 "second"
# Starting from the head, remove 1 occurrence of "second"
LREM mylist -1 "third"
# Starting from the tail, remove 1 occurrence of "third"
LREM mylist 0 "fourth"
# Remove all occurrences of "fourth"
4.2.4 Updating
# Set the element at a given index
LSET key index element
LSET mylist 0 "new_first"
# Set the first element to "new_first"
LSET mylist -1 "new_last"
# Set the last element to "new_last"
# Keep only a range of elements
LTRIM key start stop
LTRIM mylist 1 3
# Keep only the elements at indexes 1 through 3
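LPUSH combined with LTRIM is the standard pattern for a capped "most recent N" list. A minimal sketch (recent:<user_id> is a hypothetical key scheme):
import redis

r = redis.Redis(decode_responses=True)

def record_view(user_id: str, item_id: str, max_items: int = 100):
    # Prepend the newest item, then trim so only the latest
    # max_items entries survive.
    key = f"recent:{user_id}"
    pipe = r.pipeline()
    pipe.lpush(key, item_id)
    pipe.ltrim(key, 0, max_items - 1)
    pipe.execute()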
4.3 Blocking Operations
4.3.1 Blocking Pop
# Blocking pop from the left
BLPOP key [key ...] timeout
BLPOP mylist 10
# If the list is empty, wait up to 10 seconds
BLPOP list1 list2 list3 5
# Pop from whichever of several lists has data, waiting up to 5 seconds
# Blocking pop from the right
BRPOP key [key ...] timeout
BRPOP mylist 0
# Wait indefinitely until an element arrives
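In Python, a blocking consumer is usually a loop around blpop, which returns a (key, value) tuple or None on timeout (a sketch assuming a hypothetical queue:jobs key):
import redis

r = redis.Redis(decode_responses=True)

while True:
    item = r.blpop("queue:jobs", timeout=5)
    if item is None:
        continue  # timed out with no data; wait again
    queue_name, job = item
    print(f"got {job} from {queue_name}")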
4.3.2 Blocking Move
# Blocking move from one list to another
BRPOPLPUSH source destination timeout
BRPOPLPUSH source_list dest_list 10
# Pop from the right end of source_list and push onto the left end of dest_list
# Non-blocking version
RPOPLPUSH source destination
RPOPLPUSH source_list dest_list
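Note that RPOPLPUSH and BRPOPLPUSH are deprecated as of Redis 6.2 in favor of the more general LMOVE and BLMOVE, which let you choose the side on each list. A hedged redis-py equivalent of the example above (requires Redis 6.2+):
import redis

r = redis.Redis(decode_responses=True)
# Atomically move one element from the right end of source_list
# to the left end of dest_list, blocking up to 10 seconds.
moved = r.blmove("source_list", "dest_list", timeout=10, src="RIGHT", dest="LEFT")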
4.4 Practical Examples
4.4.1 A Task Queue System
import redis
import json
import time
import threading
from typing import Dict, Any, Optional, List, Callable
from datetime import datetime
import uuid

class RedisTaskQueue:
    def __init__(self, host='localhost', port=6379, db=0, password=None):
        self.redis_client = redis.Redis(
            host=host,
            port=port,
            db=db,
            password=password,
            decode_responses=True
        )
        self.task_queue = "task_queue"
        self.processing_queue = "processing_queue"
        self.completed_queue = "completed_queue"
        self.failed_queue = "failed_queue"
        self.task_data_prefix = "task_data"
        self.worker_prefix = "worker"
        # Registry of task handlers
        self.task_handlers: Dict[str, Callable] = {}
        # Worker thread control
        self.running = False
        self.worker_threads: List[threading.Thread] = []

    def _get_task_data_key(self, task_id: str) -> str:
        return f"{self.task_data_prefix}:{task_id}"

    def _get_worker_key(self, worker_id: str) -> str:
        return f"{self.worker_prefix}:{worker_id}"

    def register_handler(self, task_type: str, handler: Callable):
        """Register a handler for a task type."""
        self.task_handlers[task_type] = handler

    def enqueue_task(self, task_type: str, task_data: Dict[str, Any],
                     priority: int = 0, delay: int = 0) -> str:
        """Add a task to the queue."""
        try:
            task_id = str(uuid.uuid4())
            # Build the task metadata
            task_info = {
                'task_id': task_id,
                'task_type': task_type,
                'priority': priority,
                'created_at': datetime.now().isoformat(),
                'status': 'pending',
                'retry_count': 0,
                'max_retries': 3
            }
            # Store the task data (HSET with mapping; HMSET is deprecated)
            task_data_key = self._get_task_data_key(task_id)
            self.redis_client.hset(task_data_key, mapping={
                'task_info': json.dumps(task_info),
                'task_data': json.dumps(task_data)
            })
            # Expire the task data after 24 hours
            self.redis_client.expire(task_data_key, 86400)
            if delay > 0:
                # Delayed task: park it in a sorted set keyed by due time
                self.redis_client.zadd('delayed_tasks', {
                    task_id: time.time() + delay
                })
            else:
                # Task to run immediately
                if priority > 0:
                    # High-priority tasks jump to the front of the queue
                    self.redis_client.lpush(self.task_queue, task_id)
                else:
                    # Normal tasks go to the back of the queue
                    self.redis_client.rpush(self.task_queue, task_id)
            return task_id
        except Exception as e:
            print(f"Failed to enqueue task: {e}")
            return ""
    def dequeue_task(self, timeout: int = 10) -> Optional[Dict[str, Any]]:
        """Fetch a task from the queue."""
        try:
            # Blocking fetch from the head of the queue
            result = self.redis_client.blpop(self.task_queue, timeout=timeout)
            if not result:
                return None
            _, task_id = result
            # Load the task data
            task_data_key = self._get_task_data_key(task_id)
            task_raw_data = self.redis_client.hgetall(task_data_key)
            if not task_raw_data:
                return None
            # Parse the task data
            task_info = json.loads(task_raw_data['task_info'])
            task_data = json.loads(task_raw_data['task_data'])
            # Track it in the processing queue. Note: BLPOP + LPUSH is not
            # atomic; a crash between the two calls loses track of the task
            # (see the BLMOVE-based sketch after this example).
            self.redis_client.lpush(self.processing_queue, task_id)
            # Update the task status
            task_info['status'] = 'processing'
            task_info['started_at'] = datetime.now().isoformat()
            self.redis_client.hset(task_data_key, 'task_info', json.dumps(task_info))
            return {
                'task_id': task_id,
                'task_info': task_info,
                'task_data': task_data
            }
        except Exception as e:
            print(f"Failed to dequeue task: {e}")
            return None
    def complete_task(self, task_id: str, result: Optional[Dict[str, Any]] = None) -> bool:
        """Mark a task as completed."""
        try:
            task_data_key = self._get_task_data_key(task_id)
            task_raw_data = self.redis_client.hgetall(task_data_key)
            if not task_raw_data:
                return False
            # Update the task metadata
            task_info = json.loads(task_raw_data['task_info'])
            task_info['status'] = 'completed'
            task_info['completed_at'] = datetime.now().isoformat()
            if result:
                task_info['result'] = result
            self.redis_client.hset(task_data_key, 'task_info', json.dumps(task_info))
            # Remove from the processing queue, add to the completed queue
            self.redis_client.lrem(self.processing_queue, 1, task_id)
            self.redis_client.lpush(self.completed_queue, task_id)
            # Cap the completed queue length
            self.redis_client.ltrim(self.completed_queue, 0, 999)
            return True
        except Exception as e:
            print(f"Failed to complete task: {e}")
            return False
    def fail_task(self, task_id: str, error: str, retry: bool = True) -> bool:
        """Mark a task as failed, optionally scheduling a retry."""
        try:
            task_data_key = self._get_task_data_key(task_id)
            task_raw_data = self.redis_client.hgetall(task_data_key)
            if not task_raw_data:
                return False
            # Update the task metadata
            task_info = json.loads(task_raw_data['task_info'])
            task_info['retry_count'] += 1
            task_info['last_error'] = error
            task_info['failed_at'] = datetime.now().isoformat()
            # Remove from the processing queue
            self.redis_client.lrem(self.processing_queue, 1, task_id)
            # Decide whether to retry
            if retry and task_info['retry_count'] < task_info['max_retries']:
                task_info['status'] = 'retrying'
                # Re-enqueue with exponential backoff, capped at 1 hour
                retry_delay = min(60 * (2 ** task_info['retry_count']), 3600)
                self.redis_client.zadd('delayed_tasks', {
                    task_id: time.time() + retry_delay
                })
            else:
                task_info['status'] = 'failed'
                # Add to the failed queue
                self.redis_client.lpush(self.failed_queue, task_id)
                self.redis_client.ltrim(self.failed_queue, 0, 999)
            self.redis_client.hset(task_data_key, 'task_info', json.dumps(task_info))
            return True
        except Exception as e:
            print(f"Failed to record task failure: {e}")
            return False
    def process_delayed_tasks(self):
        """Move due delayed tasks onto the main queue."""
        try:
            current_time = time.time()
            # Find delayed tasks whose due time has passed
            ready_tasks = self.redis_client.zrangebyscore(
                'delayed_tasks', 0, current_time
            )
            for task_id in ready_tasks:
                # ZREM first: it returns 1 only for the worker that actually
                # removed the member, so concurrent workers cannot enqueue
                # the same task twice.
                if self.redis_client.zrem('delayed_tasks', task_id):
                    self.redis_client.rpush(self.task_queue, task_id)
        except Exception as e:
            print(f"Failed to process delayed tasks: {e}")
    def worker(self, worker_id: str):
        """Worker thread loop."""
        worker_key = self._get_worker_key(worker_id)
        # Register the worker
        self.redis_client.hset(worker_key, 'started_at', datetime.now().isoformat())
        self.redis_client.hset(worker_key, 'status', 'running')
        self.redis_client.expire(worker_key, 300)  # expire after 5 minutes
        print(f"Worker {worker_id} started")
        while self.running:
            try:
                # Promote any due delayed tasks
                self.process_delayed_tasks()
                # Fetch a task
                task = self.dequeue_task(timeout=5)
                if not task:
                    continue
                task_id = task['task_id']
                task_info = task['task_info']
                task_data = task['task_data']
                task_type = task_info['task_type']
                print(f"Worker {worker_id} processing task {task_id} (type: {task_type})")
                # Update the worker's status record
                self.redis_client.hset(worker_key, 'current_task', task_id)
                self.redis_client.hset(worker_key, 'last_activity', datetime.now().isoformat())
                self.redis_client.expire(worker_key, 300)
                # Run the task
                if task_type in self.task_handlers:
                    try:
                        handler = self.task_handlers[task_type]
                        result = handler(task_data)
                        self.complete_task(task_id, result)
                        print(f"Task {task_id} completed")
                    except Exception as e:
                        error_msg = f"Task execution failed: {str(e)}"
                        self.fail_task(task_id, error_msg)
                        print(f"Task {task_id} failed: {error_msg}")
                else:
                    error_msg = f"Unknown task type: {task_type}"
                    self.fail_task(task_id, error_msg, retry=False)
                    print(f"Task {task_id} failed: {error_msg}")
                # Clear the current-task marker
                self.redis_client.hdel(worker_key, 'current_task')
            except Exception as e:
                print(f"Worker {worker_id} error: {e}")
                time.sleep(1)
        # Clean up the worker record
        self.redis_client.delete(worker_key)
        print(f"Worker {worker_id} stopped")
    def start_workers(self, num_workers: int = 2):
        """Start the worker threads."""
        self.running = True
        for i in range(num_workers):
            worker_id = f"worker_{i+1}"
            thread = threading.Thread(target=self.worker, args=(worker_id,))
            thread.daemon = True
            thread.start()
            self.worker_threads.append(thread)
        print(f"Started {num_workers} workers")

    def stop_workers(self):
        """Stop the worker threads."""
        self.running = False
        for thread in self.worker_threads:
            thread.join(timeout=5)
        self.worker_threads.clear()
        print("All workers stopped")
    def get_queue_stats(self) -> Dict[str, int]:
        """Return queue length statistics."""
        try:
            return {
                'pending': self.redis_client.llen(self.task_queue),
                'processing': self.redis_client.llen(self.processing_queue),
                'completed': self.redis_client.llen(self.completed_queue),
                'failed': self.redis_client.llen(self.failed_queue),
                'delayed': self.redis_client.zcard('delayed_tasks')
            }
        except Exception as e:
            print(f"Failed to get queue stats: {e}")
            return {}

    def get_task_status(self, task_id: str) -> Optional[Dict[str, Any]]:
        """Return the stored metadata for a task."""
        try:
            task_data_key = self._get_task_data_key(task_id)
            task_raw_data = self.redis_client.hgetall(task_data_key)
            if not task_raw_data:
                return None
            return json.loads(task_raw_data['task_info'])
        except Exception as e:
            print(f"Failed to get task status: {e}")
            return None

    def clear_completed_tasks(self, keep_count: int = 100):
        """Prune completed tasks, keeping only the most recent ones."""
        try:
            # Read the overflow BEFORE trimming; trimming first would make
            # the subsequent LRANGE return nothing to clean up.
            completed_tasks = self.redis_client.lrange(self.completed_queue, keep_count, -1)
            for task_id in completed_tasks:
                task_data_key = self._get_task_data_key(task_id)
                self.redis_client.delete(task_data_key)
            # Now keep only the most recent entries
            self.redis_client.ltrim(self.completed_queue, 0, keep_count - 1)
            print(f"Cleaned up {len(completed_tasks)} completed tasks")
        except Exception as e:
            print(f"Failed to clean up tasks: {e}")
# Example task handlers
def send_email_handler(task_data: Dict[str, Any]) -> Dict[str, Any]:
    """Handler for email tasks."""
    to_email = task_data.get('to_email')
    subject = task_data.get('subject')
    content = task_data.get('content')
    # Simulate sending an email
    print(f"Sending email to {to_email}: {subject}")
    time.sleep(2)  # simulated processing time
    return {
        'sent_at': datetime.now().isoformat(),
        'to_email': to_email,
        'status': 'sent'
    }

def process_image_handler(task_data: Dict[str, Any]) -> Dict[str, Any]:
    """Handler for image-processing tasks."""
    image_url = task_data.get('image_url')
    operations = task_data.get('operations', [])
    # Simulate image processing
    print(f"Processing image {image_url}: {operations}")
    time.sleep(5)  # simulated processing time
    return {
        'processed_at': datetime.now().isoformat(),
        'original_url': image_url,
        'processed_url': f"processed_{image_url}",
        'operations': operations
    }

def generate_report_handler(task_data: Dict[str, Any]) -> Dict[str, Any]:
    """Handler for report-generation tasks."""
    report_type = task_data.get('report_type')
    date_range = task_data.get('date_range')
    # Simulate report generation
    print(f"Generating report: {report_type} ({date_range})")
    time.sleep(10)  # simulated processing time
    return {
        'generated_at': datetime.now().isoformat(),
        'report_type': report_type,
        'date_range': date_range,
        'file_path': f"/reports/{report_type}_{int(time.time())}.pdf"
    }
# Usage example
if __name__ == "__main__":
    # Create the task queue
    task_queue = RedisTaskQueue()
    # Register task handlers
    task_queue.register_handler('send_email', send_email_handler)
    task_queue.register_handler('process_image', process_image_handler)
    task_queue.register_handler('generate_report', generate_report_handler)
    # Start workers
    task_queue.start_workers(num_workers=3)
    try:
        # Enqueue some tasks
        email_task_id = task_queue.enqueue_task('send_email', {
            'to_email': 'user@example.com',
            'subject': 'Welcome!',
            'content': 'Thanks for signing up for our service!'
        })
        print(f"Email task id: {email_task_id}")
        image_task_id = task_queue.enqueue_task('process_image', {
            'image_url': 'https://example.com/image.jpg',
            'operations': ['resize', 'watermark', 'compress']
        }, priority=1)  # high priority
        print(f"Image task id: {image_task_id}")
        report_task_id = task_queue.enqueue_task('generate_report', {
            'report_type': 'monthly_sales',
            'date_range': '2024-01'
        }, delay=30)  # run after a 30-second delay
        print(f"Report task id: {report_task_id}")
        # Monitor the queues
        for i in range(60):
            stats = task_queue.get_queue_stats()
            print(f"Queue stats: {stats}")
            # Periodically check task statuses
            if i % 10 == 0:
                for task_id in [email_task_id, image_task_id, report_task_id]:
                    status = task_queue.get_task_status(task_id)
                    if status:
                        print(f"Task {task_id} status: {status['status']}")
            time.sleep(1)
            # Exit once everything is done
            if stats['pending'] == 0 and stats['processing'] == 0 and stats['delayed'] == 0:
                break
    except KeyboardInterrupt:
        print("\nInterrupted, stopping...")
    finally:
        # Stop workers
        task_queue.stop_workers()
        # Final statistics
        final_stats = task_queue.get_queue_stats()
        print(f"Final queue stats: {final_stats}")
4.4.2 A Social Media Timeline
import redis
import json
from typing import Dict, Any, Optional, List
from datetime import datetime
import time

class RedisTimeline:
    def __init__(self, host='localhost', port=6379, db=0):
        self.redis_client = redis.Redis(
            host=host,
            port=port,
            db=db,
            decode_responses=True
        )
        self.timeline_prefix = "timeline"
        self.post_prefix = "post"
        self.user_prefix = "user"
        self.following_prefix = "following"
        self.followers_prefix = "followers"

    def _get_timeline_key(self, user_id: str) -> str:
        return f"{self.timeline_prefix}:{user_id}"

    def _get_post_key(self, post_id: str) -> str:
        return f"{self.post_prefix}:{post_id}"

    def _get_following_key(self, user_id: str) -> str:
        return f"{self.following_prefix}:{user_id}"

    def _get_followers_key(self, user_id: str) -> str:
        return f"{self.followers_prefix}:{user_id}"

    def create_post(self, user_id: str, content: str,
                    media_urls: List[str] = None, tags: List[str] = None) -> str:
        """Create a post and fan it out to follower timelines."""
        try:
            post_id = f"{user_id}_{int(time.time() * 1000)}"
            # Build the post record
            post_data = {
                'post_id': post_id,
                'user_id': user_id,
                'content': content,
                'media_urls': json.dumps(media_urls or []),
                'tags': json.dumps(tags or []),
                'created_at': datetime.now().isoformat(),
                'likes_count': 0,
                'comments_count': 0,
                'shares_count': 0
            }
            # Store the post (HSET with mapping; HMSET is deprecated)
            post_key = self._get_post_key(post_id)
            self.redis_client.hset(post_key, mapping=post_data)
            # Expire the post after 30 days
            self.redis_client.expire(post_key, 30 * 24 * 3600)
            # Add to the author's own timeline
            user_timeline = self._get_timeline_key(user_id)
            self.redis_client.lpush(user_timeline, post_id)
            # Cap the timeline length
            self.redis_client.ltrim(user_timeline, 0, 999)
            # Push to follower timelines (fan-out on write)
            self._push_to_followers(user_id, post_id)
            return post_id
        except Exception as e:
            print(f"Failed to create post: {e}")
            return ""
    def _push_to_followers(self, user_id: str, post_id: str):
        """Push a post id onto every follower's timeline."""
        try:
            followers_key = self._get_followers_key(user_id)
            followers = self.redis_client.smembers(followers_key)
            # Batch the writes with a pipeline
            pipe = self.redis_client.pipeline()
            for follower_id in followers:
                follower_timeline = self._get_timeline_key(follower_id)
                pipe.lpush(follower_timeline, post_id)
                pipe.ltrim(follower_timeline, 0, 999)
            pipe.execute()
        except Exception as e:
            print(f"Failed to push to followers: {e}")

    def get_timeline(self, user_id: str, start: int = 0,
                     count: int = 20) -> List[Dict[str, Any]]:
        """Fetch a page of a user's timeline."""
        try:
            timeline_key = self._get_timeline_key(user_id)
            post_ids = self.redis_client.lrange(timeline_key, start, start + count - 1)
            posts = []
            for post_id in post_ids:
                post = self.get_post(post_id)
                if post:
                    posts.append(post)
            return posts
        except Exception as e:
            print(f"Failed to get timeline: {e}")
            return []
    def get_post(self, post_id: str) -> Optional[Dict[str, Any]]:
        """Fetch a post's details."""
        try:
            post_key = self._get_post_key(post_id)
            post_data = self.redis_client.hgetall(post_key)
            if not post_data:
                return None
            # Parse the JSON fields
            post_data['media_urls'] = json.loads(post_data.get('media_urls', '[]'))
            post_data['tags'] = json.loads(post_data.get('tags', '[]'))
            # Convert the numeric fields
            for field in ['likes_count', 'comments_count', 'shares_count']:
                post_data[field] = int(post_data.get(field, 0))
            return post_data
        except Exception as e:
            print(f"Failed to get post: {e}")
            return None

    def delete_post(self, user_id: str, post_id: str) -> bool:
        """Delete a post (only allowed for its author)."""
        try:
            post = self.get_post(post_id)
            if not post or post['user_id'] != user_id:
                return False
            # Delete the post record
            post_key = self._get_post_key(post_id)
            self.redis_client.delete(post_key)
            # Remove it from timelines
            self._remove_from_timelines(post_id)
            return True
        except Exception as e:
            print(f"Failed to delete post: {e}")
            return False

    def _remove_from_timelines(self, post_id: str):
        """Remove a post id from every timeline. Demo only: scanning all
        timeline keys is O(number of users)."""
        try:
            for key in self.redis_client.scan_iter(match=f"{self.timeline_prefix}:*"):
                self.redis_client.lrem(key, 0, post_id)
        except Exception as e:
            print(f"Failed to remove post from timelines: {e}")
    def follow_user(self, follower_id: str, following_id: str) -> bool:
        """Follow a user."""
        try:
            if follower_id == following_id:
                return False
            # Record the relationship on both sides
            following_key = self._get_following_key(follower_id)
            followers_key = self._get_followers_key(following_id)
            pipe = self.redis_client.pipeline()
            pipe.sadd(following_key, following_id)
            pipe.sadd(followers_key, follower_id)
            pipe.execute()
            # Backfill the follower's timeline with recent posts
            self._sync_timeline_on_follow(follower_id, following_id)
            return True
        except Exception as e:
            print(f"Failed to follow user: {e}")
            return False

    def unfollow_user(self, follower_id: str, following_id: str) -> bool:
        """Unfollow a user."""
        try:
            # Remove the relationship on both sides
            following_key = self._get_following_key(follower_id)
            followers_key = self._get_followers_key(following_id)
            pipe = self.redis_client.pipeline()
            pipe.srem(following_key, following_id)
            pipe.srem(followers_key, follower_id)
            pipe.execute()
            # Remove the unfollowed user's posts from the timeline
            self._remove_user_posts_from_timeline(follower_id, following_id)
            return True
        except Exception as e:
            print(f"Failed to unfollow user: {e}")
            return False
    def _sync_timeline_on_follow(self, follower_id: str, following_id: str):
        """Backfill the follower's timeline when a follow happens."""
        try:
            # Fetch the followed user's recent posts
            following_timeline = self._get_timeline_key(following_id)
            recent_posts = self.redis_client.lrange(following_timeline, 0, 49)
            if recent_posts:
                follower_timeline = self._get_timeline_key(follower_id)
                # Prepend oldest-first so the newest ends up on top.
                # Note: this stacks the backfilled posts at the head; it
                # does not merge them with existing entries by timestamp.
                for post_id in reversed(recent_posts):
                    self.redis_client.lpush(follower_timeline, post_id)
                # Cap the timeline length
                self.redis_client.ltrim(follower_timeline, 0, 999)
        except Exception as e:
            print(f"Failed to sync timeline: {e}")

    def _remove_user_posts_from_timeline(self, user_id: str, target_user_id: str):
        """Remove one user's posts from another user's timeline."""
        try:
            timeline_key = self._get_timeline_key(user_id)
            timeline_posts = self.redis_client.lrange(timeline_key, 0, -1)
            # Post ids have the form "<user_id>_<millis>", so a prefix
            # match identifies the target user's posts
            posts_to_remove = []
            for post_id in timeline_posts:
                if post_id.startswith(f"{target_user_id}_"):
                    posts_to_remove.append(post_id)
            # Remove them
            for post_id in posts_to_remove:
                self.redis_client.lrem(timeline_key, 0, post_id)
        except Exception as e:
            print(f"Failed to remove user's posts: {e}")
    def like_post(self, user_id: str, post_id: str) -> bool:
        """Like a post."""
        try:
            post_key = self._get_post_key(post_id)
            if not self.redis_client.exists(post_key):
                return False
            # SADD returns 0 if the member already exists, which doubles as
            # an atomic "already liked?" check (a separate SISMEMBER check
            # would race with concurrent likers).
            like_key = f"likes:{post_id}"
            if not self.redis_client.sadd(like_key, user_id):
                return False
            pipe = self.redis_client.pipeline()
            pipe.hincrby(post_key, 'likes_count', 1)
            pipe.expire(like_key, 30 * 24 * 3600)  # expire after 30 days
            pipe.execute()
            return True
        except Exception as e:
            print(f"Failed to like post: {e}")
            return False

    def unlike_post(self, user_id: str, post_id: str) -> bool:
        """Remove a like."""
        try:
            post_key = self._get_post_key(post_id)
            like_key = f"likes:{post_id}"
            # SREM returns 0 if there was nothing to remove
            if not self.redis_client.srem(like_key, user_id):
                return False
            self.redis_client.hincrby(post_key, 'likes_count', -1)
            return True
        except Exception as e:
            print(f"Failed to unlike post: {e}")
            return False
    def get_user_stats(self, user_id: str) -> Dict[str, int]:
        """Return follow/follower/timeline counts for a user."""
        try:
            following_key = self._get_following_key(user_id)
            followers_key = self._get_followers_key(user_id)
            timeline_key = self._get_timeline_key(user_id)
            return {
                'following_count': self.redis_client.scard(following_key),
                'followers_count': self.redis_client.scard(followers_key),
                # Note: the home timeline mixes the user's own posts with
                # followed users' posts, so this is a timeline length, not
                # an authored-post count.
                'posts_count': self.redis_client.llen(timeline_key)
            }
        except Exception as e:
            print(f"Failed to get user stats: {e}")
            return {}

    def get_trending_posts(self, hours: int = 24, limit: int = 10) -> List[Dict[str, Any]]:
        """Return trending posts. Simplified: scans all post keys; a real
        implementation should maintain scores incrementally."""
        try:
            trending_posts = []
            # Scan recent posts
            cutoff_time = time.time() - (hours * 3600)
            for key in self.redis_client.scan_iter(match=f"{self.post_prefix}:*"):
                post_data = self.redis_client.hgetall(key)
                if post_data:
                    created_at = datetime.fromisoformat(post_data['created_at'])
                    if created_at.timestamp() > cutoff_time:
                        # Simple weighted engagement score
                        likes = int(post_data.get('likes_count', 0))
                        comments = int(post_data.get('comments_count', 0))
                        shares = int(post_data.get('shares_count', 0))
                        score = likes + (comments * 2) + (shares * 3)
                        post_data['trending_score'] = score
                        post_data['media_urls'] = json.loads(post_data.get('media_urls', '[]'))
                        post_data['tags'] = json.loads(post_data.get('tags', '[]'))
                        trending_posts.append(post_data)
            # Sort by score
            trending_posts.sort(key=lambda x: x['trending_score'], reverse=True)
            return trending_posts[:limit]
        except Exception as e:
            print(f"Failed to get trending posts: {e}")
            return []
    def search_posts(self, keyword: str, limit: int = 20) -> List[Dict[str, Any]]:
        """Search posts by keyword (demo only: full scan of post keys)."""
        try:
            matching_posts = []
            for key in self.redis_client.scan_iter(match=f"{self.post_prefix}:*"):
                post_data = self.redis_client.hgetall(key)
                if post_data:
                    content = post_data.get('content', '').lower()
                    tags = json.loads(post_data.get('tags', '[]'))
                    # Match the keyword against content and tags
                    if (keyword.lower() in content or
                            any(keyword.lower() in tag.lower() for tag in tags)):
                        post_data['media_urls'] = json.loads(post_data.get('media_urls', '[]'))
                        post_data['tags'] = tags
                        for field in ['likes_count', 'comments_count', 'shares_count']:
                            post_data[field] = int(post_data.get(field, 0))
                        matching_posts.append(post_data)
            # Newest first
            matching_posts.sort(
                key=lambda x: datetime.fromisoformat(x['created_at']),
                reverse=True
            )
            return matching_posts[:limit]
        except Exception as e:
            print(f"Failed to search posts: {e}")
            return []
# Usage example
if __name__ == "__main__":
    timeline = RedisTimeline()
    # Create some posts
    post1_id = timeline.create_post(
        user_id="user_alice",
        content="Lovely weather today! Took a walk in the park.",
        media_urls=["https://example.com/park.jpg"],
        tags=["weather", "walk", "park"]
    )
    print(f"Alice's post id: {post1_id}")
    post2_id = timeline.create_post(
        user_id="user_bob",
        content="Just finished a great project!",
        tags=["work", "project", "achievement"]
    )
    print(f"Bob's post id: {post2_id}")
    post3_id = timeline.create_post(
        user_id="user_charlie",
        content="Sharing an interesting tech article",
        media_urls=["https://example.com/article.pdf"],
        tags=["tech", "sharing", "learning"]
    )
    print(f"Charlie's post id: {post3_id}")
    # Set up follow relationships
    timeline.follow_user("user_alice", "user_bob")
    timeline.follow_user("user_alice", "user_charlie")
    timeline.follow_user("user_bob", "user_alice")
    print("Follow relationships created")
    # Fetch Alice's timeline
    alice_timeline = timeline.get_timeline("user_alice", count=10)
    print(f"\nAlice's timeline ({len(alice_timeline)} posts):")
    for post in alice_timeline:
        print(f"- {post['user_id']}: {post['content'][:50]}...")
    # Like some posts
    timeline.like_post("user_alice", post2_id)
    timeline.like_post("user_bob", post1_id)
    timeline.like_post("user_charlie", post1_id)
    print("\nLikes recorded")
    # User statistics
    alice_stats = timeline.get_user_stats("user_alice")
    print(f"\nAlice's stats: {alice_stats}")
    # Search posts
    tech_posts = timeline.search_posts("tech", limit=5)
    print(f"\nPosts matching 'tech' ({len(tech_posts)} found):")
    for post in tech_posts:
        print(f"- {post['user_id']}: {post['content']}")
    # Trending posts
    trending = timeline.get_trending_posts(hours=24, limit=5)
    print(f"\nTrending posts ({len(trending)} found):")
    for post in trending:
        print(f"- {post['user_id']}: {post['content'][:50]}... (score: {post['trending_score']})")
    # Post details
    post_detail = timeline.get_post(post1_id)
    if post_detail:
        print(f"\nPost details:")
        print(f"Content: {post_detail['content']}")
        print(f"Likes: {post_detail['likes_count']}")
        print(f"Tags: {post_detail['tags']}")
        print(f"Created at: {post_detail['created_at']}")
4.4.3 An Operation Log System
import redis
import json
from typing import Dict, Any, Optional, List
from datetime import datetime
from enum import Enum
import uuid

class LogLevel(Enum):
    DEBUG = "DEBUG"
    INFO = "INFO"
    WARNING = "WARNING"
    ERROR = "ERROR"
    CRITICAL = "CRITICAL"

class RedisLogger:
    def __init__(self, host='localhost', port=6379, db=0):
        self.redis_client = redis.Redis(
            host=host,
            port=port,
            db=db,
            decode_responses=True
        )
        # All keys are built through _get_log_key as "logs:<type>:<id>"
        self.log_prefix = "logs"

    def _get_log_key(self, log_type: str, identifier: str = "global") -> str:
        return f"{self.log_prefix}:{log_type}:{identifier}"

    def _create_log_entry(self, level: LogLevel, message: str,
                          context: Dict[str, Any] = None) -> Dict[str, Any]:
        """Build a log entry."""
        return {
            'id': str(uuid.uuid4()),
            'timestamp': datetime.now().isoformat(),
            'level': level.value,
            'message': message,
            'context': context or {},
            'created_at': datetime.now().timestamp()
        }
    def log(self, level: LogLevel, message: str,
            app_name: str = None, user_id: str = None,
            context: Dict[str, Any] = None):
        """Write a log entry to the relevant lists."""
        try:
            log_entry = self._create_log_entry(level, message, context)
            log_json = json.dumps(log_entry)
            # Track every key we write so we can refresh TTLs at the end
            # (referencing app_key/user_key unconditionally would raise
            # NameError when app_name or user_id is not supplied).
            keys_written = []
            # Global log
            global_key = self._get_log_key("global")
            self.redis_client.lpush(global_key, log_json)
            self.redis_client.ltrim(global_key, 0, 9999)  # keep the latest 10000
            keys_written.append(global_key)
            # Per-application log
            if app_name:
                app_key = self._get_log_key("app", app_name)
                self.redis_client.lpush(app_key, log_json)
                self.redis_client.ltrim(app_key, 0, 4999)  # keep the latest 5000
                keys_written.append(app_key)
            # Per-user log
            if user_id:
                user_key = self._get_log_key("user", user_id)
                self.redis_client.lpush(user_key, log_json)
                self.redis_client.ltrim(user_key, 0, 999)  # keep the latest 1000
                keys_written.append(user_key)
            # Errors also go to a dedicated list
            if level in [LogLevel.ERROR, LogLevel.CRITICAL]:
                error_key = self._get_log_key("error")
                self.redis_client.lpush(error_key, log_json)
                self.redis_client.ltrim(error_key, 0, 1999)  # keep the latest 2000
                keys_written.append(error_key)
            # Refresh TTLs
            for key in keys_written:
                self.redis_client.expire(key, 7 * 24 * 3600)  # 7 days
        except Exception as e:
            print(f"Failed to write log: {e}")
    def debug(self, message: str, app_name: str = None,
              user_id: str = None, context: Dict[str, Any] = None):
        """Log at DEBUG level."""
        self.log(LogLevel.DEBUG, message, app_name, user_id, context)

    def info(self, message: str, app_name: str = None,
             user_id: str = None, context: Dict[str, Any] = None):
        """Log at INFO level."""
        self.log(LogLevel.INFO, message, app_name, user_id, context)

    def warning(self, message: str, app_name: str = None,
                user_id: str = None, context: Dict[str, Any] = None):
        """Log at WARNING level."""
        self.log(LogLevel.WARNING, message, app_name, user_id, context)

    def error(self, message: str, app_name: str = None,
              user_id: str = None, context: Dict[str, Any] = None):
        """Log at ERROR level."""
        self.log(LogLevel.ERROR, message, app_name, user_id, context)

    def critical(self, message: str, app_name: str = None,
                 user_id: str = None, context: Dict[str, Any] = None):
        """Log at CRITICAL level."""
        self.log(LogLevel.CRITICAL, message, app_name, user_id, context)
    def get_logs(self, log_type: str = "global", identifier: str = "global",
                 start: int = 0, count: int = 100,
                 level_filter: LogLevel = None) -> List[Dict[str, Any]]:
        """Fetch a page of logs, optionally filtered by level."""
        try:
            log_key = self._get_log_key(log_type, identifier)
            log_entries = self.redis_client.lrange(log_key, start, start + count - 1)
            logs = []
            for entry in log_entries:
                log_data = json.loads(entry)
                # Level filter. Note: filtering happens after the page is
                # fetched, so a filtered page may hold fewer than count entries.
                if level_filter and log_data['level'] != level_filter.value:
                    continue
                logs.append(log_data)
            return logs
        except Exception as e:
            print(f"Failed to get logs: {e}")
            return []

    def get_app_logs(self, app_name: str, start: int = 0, count: int = 100,
                     level_filter: LogLevel = None) -> List[Dict[str, Any]]:
        """Fetch application logs."""
        return self.get_logs("app", app_name, start, count, level_filter)

    def get_user_logs(self, user_id: str, start: int = 0, count: int = 100,
                      level_filter: LogLevel = None) -> List[Dict[str, Any]]:
        """Fetch user logs."""
        return self.get_logs("user", user_id, start, count, level_filter)

    def get_error_logs(self, start: int = 0, count: int = 100) -> List[Dict[str, Any]]:
        """Fetch error logs."""
        return self.get_logs("error", "global", start, count)
    def search_logs(self, keyword: str, log_type: str = "global",
                    identifier: str = "global", limit: int = 100) -> List[Dict[str, Any]]:
        """Search logs for a keyword (linear scan of the list)."""
        try:
            log_key = self._get_log_key(log_type, identifier)
            all_logs = self.redis_client.lrange(log_key, 0, -1)
            matching_logs = []
            for entry in all_logs:
                log_data = json.loads(entry)
                # Search in the message and the serialized context
                message = log_data.get('message', '').lower()
                context_str = json.dumps(log_data.get('context', {})).lower()
                if keyword.lower() in message or keyword.lower() in context_str:
                    matching_logs.append(log_data)
                    if len(matching_logs) >= limit:
                        break
            return matching_logs
        except Exception as e:
            print(f"Failed to search logs: {e}")
            return []

    def get_log_stats(self, log_type: str = "global",
                      identifier: str = "global") -> Dict[str, int]:
        """Count log entries by level."""
        try:
            log_key = self._get_log_key(log_type, identifier)
            all_logs = self.redis_client.lrange(log_key, 0, -1)
            stats = {
                'total': 0,
                'DEBUG': 0,
                'INFO': 0,
                'WARNING': 0,
                'ERROR': 0,
                'CRITICAL': 0
            }
            for entry in all_logs:
                log_data = json.loads(entry)
                level = log_data.get('level', 'INFO')
                stats['total'] += 1
                stats[level] = stats.get(level, 0) + 1
            return stats
        except Exception as e:
            print(f"Failed to get log stats: {e}")
            return {}
    def audit_log(self, action: str, user_id: str, resource: str,
                  details: Dict[str, Any] = None, ip_address: str = None):
        """Write an audit log entry."""
        try:
            audit_entry = {
                'id': str(uuid.uuid4()),
                'timestamp': datetime.now().isoformat(),
                'action': action,
                'user_id': user_id,
                'resource': resource,
                'details': details or {},
                'ip_address': ip_address,
                'created_at': datetime.now().timestamp()
            }
            audit_key = self._get_log_key("audit")
            self.redis_client.lpush(audit_key, json.dumps(audit_entry))
            self.redis_client.ltrim(audit_key, 0, 9999)  # keep the latest 10000
            self.redis_client.expire(audit_key, 90 * 24 * 3600)  # 90 days
            # Also record in the per-user audit log
            user_audit_key = self._get_log_key("user_audit", user_id)
            self.redis_client.lpush(user_audit_key, json.dumps(audit_entry))
            self.redis_client.ltrim(user_audit_key, 0, 999)  # keep the latest 1000
            self.redis_client.expire(user_audit_key, 90 * 24 * 3600)  # 90 days
        except Exception as e:
            print(f"Failed to write audit log: {e}")

    def get_audit_logs(self, start: int = 0, count: int = 100,
                       user_id: str = None) -> List[Dict[str, Any]]:
        """Fetch audit logs, globally or for a single user."""
        try:
            if user_id:
                audit_key = self._get_log_key("user_audit", user_id)
            else:
                audit_key = self._get_log_key("audit")
            audit_entries = self.redis_client.lrange(audit_key, start, start + count - 1)
            return [json.loads(entry) for entry in audit_entries]
        except Exception as e:
            print(f"Failed to get audit logs: {e}")
            return []
    def clear_old_logs(self, days: int = 7):
        """Drop log entries older than the cutoff."""
        try:
            cutoff_time = datetime.now().timestamp() - (days * 24 * 3600)
            # Clean each class of log key
            log_patterns = [
                f"{self.log_prefix}:global:*",
                f"{self.log_prefix}:app:*",
                f"{self.log_prefix}:user:*",
                f"{self.log_prefix}:error:*"
            ]
            for pattern in log_patterns:
                for key in self.redis_client.scan_iter(match=pattern):
                    self._clean_log_key(key, cutoff_time)
        except Exception as e:
            print(f"Failed to clear old logs: {e}")

    def _clean_log_key(self, key: str, cutoff_time: float):
        """Rewrite one log key, keeping only entries newer than the cutoff."""
        try:
            all_logs = self.redis_client.lrange(key, 0, -1)
            valid_logs = []
            for entry in all_logs:
                log_data = json.loads(entry)
                if log_data.get('created_at', 0) > cutoff_time:
                    valid_logs.append(entry)
            # Rebuild the list inside a MULTI/EXEC pipeline so readers never
            # observe the key half-rewritten. RPUSH (not LPUSH) preserves the
            # newest-first order that LRANGE returned.
            pipe = self.redis_client.pipeline()
            pipe.delete(key)
            if valid_logs:
                pipe.rpush(key, *valid_logs)
            pipe.execute()
        except Exception as e:
            print(f"Failed to clean log key: {e}")
# Usage example
if __name__ == "__main__":
    logger = RedisLogger()
    # Log at various levels
    logger.info("Application started", app_name="myapp", context={"version": "1.0.0"})
    logger.debug("Debug details", app_name="myapp", context={"module": "auth"})
    logger.warning("Memory usage is high", app_name="myapp", context={"memory_usage": "85%"})
    logger.error("Database connection failed", app_name="myapp", context={
        "error_code": "DB_CONN_FAILED",
        "retry_count": 3
    })
    # Log a user action
    logger.info("User logged in", app_name="myapp", user_id="user_123", context={
        "ip_address": "192.168.1.100",
        "user_agent": "Mozilla/5.0..."
    })
    # Write an audit entry
    logger.audit_log(
        action="DELETE_USER",
        user_id="admin_001",
        resource="user:user_456",
        details={"reason": "policy violation", "operator": "admin_001"},
        ip_address="192.168.1.10"
    )
    # Application logs
    app_logs = logger.get_app_logs("myapp", count=10)
    print(f"\nApplication logs ({len(app_logs)} entries):")
    for log in app_logs:
        print(f"[{log['level']}] {log['timestamp']}: {log['message']}")
    # Error logs
    error_logs = logger.get_error_logs(count=5)
    print(f"\nError logs ({len(error_logs)} entries):")
    for log in error_logs:
        print(f"[{log['level']}] {log['timestamp']}: {log['message']}")
    # Search
    search_results = logger.search_logs("Database", limit=5)
    print(f"\nLogs matching 'Database' ({len(search_results)} entries):")
    for log in search_results:
        print(f"[{log['level']}] {log['message']}")
    # Statistics
    stats = logger.get_log_stats()
    print(f"\nLog stats: {stats}")
    # Audit logs
    audit_logs = logger.get_audit_logs(count=5)
    print(f"\nAudit logs ({len(audit_logs)} entries):")
    for audit in audit_logs:
        print(f"{audit['timestamp']}: {audit['user_id']} {audit['action']} {audit['resource']}")
4.5 Implementing Stacks and Queues
4.5.1 A Stack (LIFO)
import redis
from typing import Any, Optional

class RedisStack:
    def __init__(self, name: str, redis_client: redis.Redis):
        self.name = name
        self.redis_client = redis_client
        self.key = f"stack:{name}"

    def push(self, item: Any) -> int:
        """Push onto the stack."""
        return self.redis_client.lpush(self.key, str(item))

    def pop(self) -> Optional[str]:
        """Pop from the stack (returns None when empty)."""
        return self.redis_client.lpop(self.key)

    def peek(self) -> Optional[str]:
        """Look at the top element without removing it."""
        return self.redis_client.lindex(self.key, 0)

    def size(self) -> int:
        """Number of elements on the stack."""
        return self.redis_client.llen(self.key)

    def is_empty(self) -> bool:
        """Whether the stack is empty."""
        return self.size() == 0

    def clear(self):
        """Remove the stack entirely."""
        self.redis_client.delete(self.key)
# Usage example
redis_client = redis.Redis(decode_responses=True)
stack = RedisStack("my_stack", redis_client)
# Push
stack.push("first")
stack.push("second")
stack.push("third")
print(f"Stack size: {stack.size()}")
# Peek at the top
print(f"Top element: {stack.peek()}")
# Pop everything
while not stack.is_empty():
    item = stack.pop()
    print(f"Popped: {item}")
4.5.2 A Queue (FIFO)
import redis
from typing import Any, Optional

class RedisQueue:
    def __init__(self, name: str, redis_client: redis.Redis):
        self.name = name
        self.redis_client = redis_client
        self.key = f"queue:{name}"

    def enqueue(self, item: Any) -> int:
        """Add to the back of the queue."""
        return self.redis_client.rpush(self.key, str(item))

    def dequeue(self) -> Optional[str]:
        """Remove from the front of the queue (None when empty)."""
        return self.redis_client.lpop(self.key)

    def dequeue_blocking(self, timeout: int = 0) -> Optional[str]:
        """Blocking dequeue; timeout=0 waits forever."""
        result = self.redis_client.blpop(self.key, timeout=timeout)
        return result[1] if result else None

    def front(self) -> Optional[str]:
        """Look at the front element."""
        return self.redis_client.lindex(self.key, 0)

    def rear(self) -> Optional[str]:
        """Look at the back element."""
        return self.redis_client.lindex(self.key, -1)

    def size(self) -> int:
        """Number of elements in the queue."""
        return self.redis_client.llen(self.key)

    def is_empty(self) -> bool:
        """Whether the queue is empty."""
        return self.size() == 0

    def clear(self):
        """Remove the queue entirely."""
        self.redis_client.delete(self.key)
# Usage example
queue = RedisQueue("my_queue", redis_client)
# Enqueue
queue.enqueue("task1")
queue.enqueue("task2")
queue.enqueue("task3")
print(f"Queue size: {queue.size()}")
# Peek at both ends
print(f"Front: {queue.front()}")
print(f"Rear: {queue.rear()}")
# Drain the queue
while not queue.is_empty():
    item = queue.dequeue()
    print(f"Dequeued: {item}")
4.6 Performance Optimization
4.6.1 List Encoding Optimization
# Inspect a list's encoding
redis-cli
> OBJECT ENCODING mylist
# Tuning parameters
# redis.conf
list-max-ziplist-size -2   # each quicklist node holds at most 8 KB
list-compress-depth 1      # leave 1 node uncompressed at each end
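Since Redis 7.0 the ziplist encoding has been replaced by listpack, and the parameter is also available under the name list-max-listpack-size; OBJECT ENCODING typically reports listpack for small lists and quicklist for large ones, depending on the server version. Checking the encoding from Python (a small sketch):
import redis

r = redis.Redis(decode_responses=True)
r.rpush("enc_demo", "a", "b", "c")
print(r.object("encoding", "enc_demo"))  # e.g. 'listpack' on Redis 7+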
4.6.2 Memory Optimization
import redis

class OptimizedRedisList:
    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client

    def batch_push(self, key: str, items: list, batch_size: int = 1000):
        """Push a large dataset in batches to limit command size.
        Note: LPUSH prepends, so the final list order is the reverse of
        items; use RPUSH if insertion order must be preserved."""
        for i in range(0, len(items), batch_size):
            batch = items[i:i + batch_size]
            self.redis_client.lpush(key, *batch)

    def memory_efficient_scan(self, key: str, chunk_size: int = 100):
        """Iterate a large list in chunks instead of LRANGE key 0 -1."""
        total_length = self.redis_client.llen(key)
        for start in range(0, total_length, chunk_size):
            end = min(start + chunk_size - 1, total_length - 1)
            chunk = self.redis_client.lrange(key, start, end)
            yield chunk

    def trim_with_backup(self, key: str, start: int, stop: int,
                         backup_key: str = None):
        """Trim a list, optionally copying the removed elements to a backup.
        Note: the reads and the LTRIM are separate commands, so concurrent
        writers can slip elements in between."""
        if backup_key:
            # Back up the head segment being removed (RPUSH preserves order)
            if start > 0:
                backup_data = self.redis_client.lrange(key, 0, start - 1)
                if backup_data:
                    self.redis_client.rpush(backup_key, *backup_data)
            # Back up the tail segment being removed
            list_len = self.redis_client.llen(key)
            if stop < list_len - 1:
                backup_data = self.redis_client.lrange(key, stop + 1, -1)
                if backup_data:
                    self.redis_client.rpush(backup_key, *backup_data)
        # Perform the trim
        self.redis_client.ltrim(key, start, stop)
# Usage example
optimized_list = OptimizedRedisList(redis_client)
# Batched bulk insert
large_data = [f"item_{i}" for i in range(10000)]
optimized_list.batch_push("large_list", large_data)
# Chunked scan
for chunk in optimized_list.memory_efficient_scan("large_list", 500):
    print(f"Processing {len(chunk)} items")
# Trim with backup
optimized_list.trim_with_backup("large_list", 100, 199, "backup_list")
4.6.3 Pipelined Operations
import redis
from typing import List, Any

class PipelinedListOperations:
    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client

    def bulk_operations(self, operations: List[dict]):
        """Run a batch of list commands in a single round trip."""
        pipe = self.redis_client.pipeline()
        for op in operations:
            cmd = op['command']
            key = op['key']
            if cmd == 'lpush':
                pipe.lpush(key, *op['values'])
            elif cmd == 'rpush':
                pipe.rpush(key, *op['values'])
            elif cmd == 'lpop':
                pipe.lpop(key)
            elif cmd == 'rpop':
                pipe.rpop(key)
            elif cmd == 'llen':
                pipe.llen(key)
            elif cmd == 'lrange':
                pipe.lrange(key, op['start'], op['stop'])
        return pipe.execute()
    def multi_list_stats(self, keys: List[str]) -> dict:
        """Collect length/first/last for several lists in one round trip."""
        pipe = self.redis_client.pipeline()
        for key in keys:
            pipe.llen(key)
            pipe.lindex(key, 0)   # first element
            pipe.lindex(key, -1)  # last element
        results = pipe.execute()
        stats = {}
        for i, key in enumerate(keys):
            base_idx = i * 3
            stats[key] = {
                'length': results[base_idx],
                'first': results[base_idx + 1],
                'last': results[base_idx + 2]
            }
        return stats
# Usage example
pipelined_ops = PipelinedListOperations(redis_client)
# Batched operations
operations = [
    {'command': 'lpush', 'key': 'list1', 'values': ['a', 'b', 'c']},
    {'command': 'rpush', 'key': 'list2', 'values': ['x', 'y', 'z']},
    {'command': 'llen', 'key': 'list1'},
    {'command': 'llen', 'key': 'list2'}
]
results = pipelined_ops.bulk_operations(operations)
print(f"Bulk operation results: {results}")
# Multi-list statistics
stats = pipelined_ops.multi_list_stats(['list1', 'list2', 'list3'])
for key, stat in stats.items():
    print(f"{key}: length={stat['length']}, first={stat['first']}, last={stat['last']}")
4.7 Monitoring and Debugging
4.7.1 List Monitoring
import redis
import time
from typing import Dict, Any

class ListMonitor:
    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client

    def get_list_info(self, key: str) -> Dict[str, Any]:
        """Collect detailed information about a list key."""
        info = {
            'exists': self.redis_client.exists(key),
            'type': self.redis_client.type(key),
            'length': 0,
            'encoding': None,
            'memory_usage': 0,
            'ttl': self.redis_client.ttl(key)
        }
        if info['exists']:
            info['length'] = self.redis_client.llen(key)
            info['encoding'] = self.redis_client.object('encoding', key)
            # MEMORY USAGE requires Redis 4.0+
            try:
                info['memory_usage'] = self.redis_client.memory_usage(key)
            except Exception:
                info['memory_usage'] = 'N/A'
        return info
    def monitor_list_performance(self, key: str, duration: int = 60):
        """Sample a list's length and memory once per second."""
        start_time = time.time()
        samples = []
        while time.time() - start_time < duration:
            sample = {
                'timestamp': time.time(),
                'length': self.redis_client.llen(key),
                'memory': self.redis_client.memory_usage(key) if self.redis_client.exists(key) else 0
            }
            samples.append(sample)
            time.sleep(1)
        return self._analyze_samples(samples)

    def _analyze_samples(self, samples: list) -> Dict[str, Any]:
        """Summarize the collected samples."""
        if not samples:
            return {}
        lengths = [s['length'] for s in samples]
        memories = [s['memory'] for s in samples if s['memory'] > 0]
        analysis = {
            'duration': samples[-1]['timestamp'] - samples[0]['timestamp'],
            'sample_count': len(samples),
            'length_stats': {
                'min': min(lengths),
                'max': max(lengths),
                'avg': sum(lengths) / len(lengths),
                'final': lengths[-1]
            }
        }
        if memories:
            analysis['memory_stats'] = {
                'min': min(memories),
                'max': max(memories),
                'avg': sum(memories) / len(memories),
                'final': memories[-1]
            }
        return analysis
    def diagnose_list_issues(self, key: str) -> Dict[str, Any]:
        """Flag common problems with a list key."""
        info = self.get_list_info(key)
        issues = []
        recommendations = []
        # Size check
        if info['length'] > 100000:
            issues.append("List is very large and may hurt performance")
            recommendations.append("Consider sharding, or prune periodically with LTRIM")
        # Encoding check ('linkedlist' only appears on Redis < 3.2;
        # modern versions use quicklist/listpack)
        if info['encoding'] == 'linkedlist':
            issues.append("Linked-list encoding in use; memory efficiency is poor")
            recommendations.append("Tune the list-max-ziplist-size parameter")
        # Memory check
        if isinstance(info['memory_usage'], int) and info['memory_usage'] > 10 * 1024 * 1024:  # 10 MB
            issues.append("Memory usage is high")
            recommendations.append("Consider compressing values or sharding")
        # TTL check
        if info['ttl'] == -1 and info['length'] > 1000:
            issues.append("Large list has no expiration set")
            recommendations.append("Set an appropriate TTL to avoid unbounded memory growth")
        return {
            'info': info,
            'issues': issues,
            'recommendations': recommendations
        }
# Usage example
monitor = ListMonitor(redis_client)
# List information
info = monitor.get_list_info("my_list")
print(f"List info: {info}")
# Diagnostics
diagnosis = monitor.diagnose_list_issues("my_list")
print(f"\nDiagnosis:")
print(f"Issues: {diagnosis['issues']}")
print(f"Recommendations: {diagnosis['recommendations']}")
# Performance monitoring (run against a live workload)
# performance = monitor.monitor_list_performance("my_list", 30)
# print(f"\nPerformance analysis: {performance}")
4.8 Summary
The Redis list type is a powerful and flexible data structure with the following characteristics:
4.8.1 Core Features
- Ordered: elements keep their insertion order
- Duplicates allowed: the same value can appear multiple times
- Double-ended: efficient operations at both the head and the tail
- Blocking operations: blocking pops for consumer patterns
- Atomicity: each individual list command executes atomically
4.8.2 Suitable Scenarios
- Message queues: reliable task queue systems
- Timelines: user feeds, article lists, and the like
- Logging systems: storing and managing application logs
- Stacks and queues: LIFO and FIFO data structures
- Recently used: maintaining most-recently-accessed records
4.8.3 Performance Tips
- Encoding: configure the ziplist/listpack parameters sensibly
- Batching: use pipelines to cut network round trips
- Memory management: prune regularly and set TTLs
- Monitoring: detect and resolve performance problems early
4.8.4 Best Practices
- Choose the command that matches the access pattern
- Avoid LRANGE over an entire large list
- Use LTRIM to cap list sizes
- Use blocking operations to build efficient queues
- Monitor list performance and memory usage regularly
The Redis list type gives developers a solid building block for high-performance, scalable applications; used correctly, it can noticeably improve a system's responsiveness and throughput.