7.1 位图(Bitmap)
7.1.1 位图概述
Redis位图(Bitmap)是一种特殊的字符串类型,可以对字符串的每一位进行操作。位图非常适合存储布尔值信息,具有极高的空间效率。
7.1.2 位图特点
- 空间高效: 每个位只占用1bit空间
- 快速操作: 位操作的时间复杂度为O(1)
- 大容量: 理论上可以存储2^32个位
- 原子操作: 所有位操作都是原子的
- 统计功能: 支持快速统计位数
7.1.3 应用场景
- 用户签到: 记录用户每日签到状态
- 在线状态: 记录用户在线/离线状态
- 权限控制: 记录用户权限位
- 活跃统计: 统计日活、月活用户
- 布隆过滤器: 实现概率性数据结构
7.2 位图基本操作
7.2.1 设置和获取位
# 设置位值
SETBIT key offset value
SETBIT user:1001:signin 0 1 # 设置第0天签到
SETBIT user:1001:signin 1 1 # 设置第1天签到
SETBIT user:1001:signin 5 1 # 设置第5天签到
# 获取位值
GETBIT key offset
GETBIT user:1001:signin 0 # 返回1
GETBIT user:1001:signin 2 # 返回0(未签到)
# 统计位数
BITCOUNT key [start end]
BITCOUNT user:1001:signin # 统计总签到天数
BITCOUNT user:1001:signin 0 0 # 统计第0个字节(即前8天)的签到次数;start/end默认以字节为单位,Redis 7.0+可用 "BITCOUNT user:1001:signin 0 6 BIT" 按位统计前7天
# 查找第一个设置的位
BITPOS key bit [start] [end]
BITPOS user:1001:signin 1 # 查找第一个签到的位置
BITPOS user:1001:signin 0 # 查找第一个未签到的位置
7.2.2 位运算操作
# 位运算
BITOP operation destkey key [key ...]
# 创建测试数据
SETBIT user:1001 0 1
SETBIT user:1001 1 1
SETBIT user:1001 2 0
SETBIT user:1002 0 1
SETBIT user:1002 1 0
SETBIT user:1002 2 1
# AND运算(交集)
BITOP AND result user:1001 user:1002
GETBIT result 0 # 返回1(两个用户都在第0天活跃)
GETBIT result 1 # 返回0(第1天不是都活跃)
# OR运算(并集)
BITOP OR result user:1001 user:1002
BITCOUNT result # 统计至少有一个用户活跃的天数
# XOR运算(异或)
BITOP XOR result user:1001 user:1002
# NOT运算(取反)
BITOP NOT result user:1001
7.3 Python位图实现
7.3.1 基础位图类
import redis
from typing import List, Dict, Optional, Set, Tuple
from datetime import datetime, timedelta
import calendar
class RedisBitmap:
    """Convenience wrapper around a single Redis bitmap key.

    Every method issues commands against ``self.key`` through the supplied
    client; nothing is cached locally.
    """

    def __init__(self, redis_client: "redis.Redis", key: str):
        """Bind the wrapper to ``key`` on ``redis_client``."""
        self.redis_client = redis_client
        self.key = key

    def set_bit(self, offset: int, value: int) -> int:
        """Set the bit at ``offset`` to ``value`` and return the SETBIT reply."""
        return self.redis_client.setbit(self.key, offset, value)

    def get_bit(self, offset: int) -> int:
        """Return the bit stored at ``offset``."""
        return self.redis_client.getbit(self.key, offset)

    def count(self, start: Optional[int] = None, end: Optional[int] = None) -> int:
        """Count the bits set to 1.

        ``start``/``end`` are passed to BITCOUNT, where they are *byte*
        indexes (negative values count from the end). If only one bound is
        given, the other defaults to the matching end of the string instead
        of the range being silently ignored.
        """
        if start is None and end is None:
            return self.redis_client.bitcount(self.key)
        first_byte = 0 if start is None else start
        last_byte = -1 if end is None else end
        return self.redis_client.bitcount(self.key, first_byte, last_byte)

    def find_first(self, bit: int, start: Optional[int] = None,
                   end: Optional[int] = None) -> int:
        """Return the position of the first bit equal to ``bit`` (BITPOS reply).

        ``start``/``end`` are byte indexes, as in BITPOS. An ``end`` given
        without ``start`` searches from byte 0.
        """
        if start is None and end is None:
            return self.redis_client.bitpos(self.key, bit)
        if end is None:
            return self.redis_client.bitpos(self.key, bit, start)
        return self.redis_client.bitpos(self.key, bit, 0 if start is None else start, end)

    def set_range(self, start: int, end: int, value: int = 1):
        """Set every bit in the inclusive offset range ``start..end`` to ``value``."""
        # Queue all SETBITs on one pipeline so the range costs a single round trip.
        pipe = self.redis_client.pipeline()
        for offset in range(start, end + 1):
            pipe.setbit(self.key, offset, value)
        pipe.execute()

    def get_range(self, start: int, end: int) -> List[int]:
        """Return the bits in the inclusive offset range ``start..end``."""
        pipe = self.redis_client.pipeline()
        for offset in range(start, end + 1):
            pipe.getbit(self.key, offset)
        return list(pipe.execute())

    def clear(self) -> bool:
        """Delete the bitmap key; return True if a key was actually removed."""
        return self.redis_client.delete(self.key) > 0

    def size(self) -> int:
        """Return the bitmap size in bytes.

        Uses STRLEN rather than GET: the value is binary, so fetching it
        through a client created with ``decode_responses=True`` can fail to
        decode or report a character count instead of a byte count.
        """
        return self.redis_client.strlen(self.key)

    def to_list(self, max_offset: Optional[int] = None) -> List[int]:
        """Return the bitmap as a list of 0/1 values starting at offset 0.

        With ``max_offset`` the list covers offsets ``0..max_offset``.
        Without it the list ends at the last bit set to 1; an empty or
        all-zero bitmap yields ``[]``.
        """
        if max_offset is not None:
            return self.get_range(0, max_offset)
        total_bits = self.size() * 8
        if total_bits == 0:
            return []
        bits = self.get_range(0, total_bits - 1)
        # Drop the zero padding after the last set bit.
        while bits and bits[-1] == 0:
            bits.pop()
        return bits
class BitmapOperations:
    """BITOP helpers plus set-style queries over several bitmap keys."""

    def __init__(self, redis_client: "redis.Redis"):
        """Store the client used for every command."""
        self.redis_client = redis_client

    def bitop_and(self, dest_key: str, *keys: str) -> int:
        """Store the bitwise AND of ``keys`` in ``dest_key``; return the BITOP reply."""
        return self.redis_client.bitop('AND', dest_key, *keys)

    def bitop_or(self, dest_key: str, *keys: str) -> int:
        """Store the bitwise OR of ``keys`` in ``dest_key``; return the BITOP reply."""
        return self.redis_client.bitop('OR', dest_key, *keys)

    def bitop_xor(self, dest_key: str, *keys: str) -> int:
        """Store the bitwise XOR of ``keys`` in ``dest_key``; return the BITOP reply."""
        return self.redis_client.bitop('XOR', dest_key, *keys)

    def bitop_not(self, dest_key: str, key: str) -> int:
        """Store the bitwise NOT of ``key`` in ``dest_key``; return the BITOP reply."""
        return self.redis_client.bitop('NOT', dest_key, key)

    def intersection(self, *keys: str) -> Set[int]:
        """Return the offsets whose bit is 1 in every key (empty set if no keys)."""
        return self._positions_after_bitop('AND', keys)

    def union(self, *keys: str) -> Set[int]:
        """Return the offsets whose bit is 1 in at least one key (empty set if no keys)."""
        return self._positions_after_bitop('OR', keys)

    def _positions_after_bitop(self, operation: str, keys) -> Set[int]:
        """Run ``operation`` into a throwaway key and return its set-bit offsets."""
        if not keys:
            return set()
        import uuid

        # A random suffix keeps concurrent callers from sharing (and deleting)
        # each other's scratch key, which a second-resolution timestamp allows.
        temp_key = f"temp_{operation.lower()}_{uuid.uuid4().hex}"
        try:
            self.redis_client.bitop(operation, temp_key, *keys)
            return self._set_bit_positions(temp_key)
        finally:
            self.redis_client.delete(temp_key)

    def _set_bit_positions(self, key: str) -> Set[int]:
        """Collect the offsets of all bits set to 1 in ``key``."""
        positions = set()
        byte_index = 0
        while True:
            # BITPOS takes a *byte* start index, so jump byte by byte and
            # inspect the rest of each hit byte with GETBIT.
            pos = self.redis_client.bitpos(key, 1, byte_index)
            if pos == -1:
                break
            positions.add(pos)
            byte_index = pos // 8
            for offset in range(pos + 1, (byte_index + 1) * 8):
                if self.redis_client.getbit(key, offset):
                    positions.add(offset)
            byte_index += 1
        return positions
# Usage example
if __name__ == "__main__":
    client = redis.Redis(decode_responses=True)

    # One bitmap for a single user's activity: offset N is day N + 1.
    user_activity = RedisBitmap(client, "user:1001:activity")
    for active_offset in (0, 2, 5):  # days 1, 3 and 6
        user_activity.set_bit(active_offset, 1)

    # Read individual days back.
    print(f"第1天活跃: {user_activity.get_bit(0)}")
    print(f"第2天活跃: {user_activity.get_bit(1)}")

    # Total number of active days.
    total_active_days = user_activity.count()
    print(f"总活跃天数: {total_active_days}")

    # Position of the first active day.
    first_active = user_activity.find_first(1)
    print(f"第一个活跃天: {first_active}")

    # Two sign-in bitmaps to combine with BITOP.
    first_user = RedisBitmap(client, "user:1001:signin")
    second_user = RedisBitmap(client, "user:1002:signin")
    first_user.set_range(0, 2, 1)  # signed in on each of the first 3 days
    for signed_offset in (1, 3):  # days 2 and 4
        second_user.set_bit(signed_offset, 1)

    combiner = BitmapOperations(client)

    # Days on which both users signed in.
    common_days = combiner.intersection("user:1001:signin", "user:1002:signin")
    print(f"共同签到天数: {common_days}")

    # Days on which at least one user signed in.
    any_signin_days = combiner.union("user:1001:signin", "user:1002:signin")
    print(f"任一用户签到天数: {any_signin_days}")

    # Remove the demo keys.
    for bitmap in (user_activity, first_user, second_user):
        bitmap.clear()
7.3.2 用户签到系统
import redis
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
class SigninRewardType(Enum):
    """Kinds of reward a sign-in can grant; each value is the stored string form."""

    POINTS = "points"
    COINS = "coins"
    ITEMS = "items"
@dataclass
class SigninReward:
    """A single reward entry: what is granted, how much, and a display text."""

    reward_type: SigninRewardType  # category of the reward
    amount: int  # quantity granted
    description: str  # human-readable label
@dataclass
class SigninStats:
    """Aggregate sign-in figures for one user."""

    total_days: int  # number of days signed in
    current_streak: int  # length of the ongoing run of consecutive days
    max_streak: int  # longest run of consecutive days found
    this_month_days: int  # days signed in during the current month
    last_signin_date: Optional[datetime]  # most recent sign-in, None if none found
class UserSigninSystem:
    """Daily sign-in tracker that keeps one Redis bitmap per user per month.

    Bit ``day - 1`` of key ``signin:{user_id}:{year}:{month:02d}`` is 1 when
    the user signed in on that day of the month.
    """

    def __init__(self, redis_client: redis.Redis):
        """Store the client and the reward configuration."""
        self.redis_client = redis_client
        self.signin_prefix = "signin"
        self.user_stats_prefix = "signin_stats"
        # Milestone rewards keyed by day count. The entry for 1 is the
        # first-ever sign-in; larger keys are consecutive-day milestones.
        self.daily_rewards = {
            1: SigninReward(SigninRewardType.POINTS, 10, "首次签到奖励"),
            7: SigninReward(SigninRewardType.COINS, 50, "连续7天奖励"),
            30: SigninReward(SigninRewardType.ITEMS, 1, "连续30天奖励")
        }

    def _get_signin_key(self, user_id: str, year: int, month: int) -> str:
        """Build the bitmap key for ``user_id`` in the given month."""
        return f"{self.signin_prefix}:{user_id}:{year}:{month:02d}"

    def _get_day_offset(self, date: datetime) -> int:
        """Return the zero-based bit offset of ``date`` within its month."""
        return date.day - 1

    def _get_signin_flags(self, user_id: str, end_date: datetime, days: int) -> List[bool]:
        """Return sign-in flags for ``days`` days ending at ``end_date``, newest first.

        All GETBITs are queued on one pipeline so the whole window costs a
        single round trip instead of one per day.
        """
        pipe = self.redis_client.pipeline(transaction=False)
        for days_back in range(days):
            day = end_date - timedelta(days=days_back)
            pipe.getbit(self._get_signin_key(user_id, day.year, day.month),
                        self._get_day_offset(day))
        return [bool(bit) for bit in pipe.execute()]

    def signin(self, user_id: str, date: Optional[datetime] = None) -> Dict[str, object]:
        """Record a sign-in for ``date`` (default: now).

        Returns a dict with ``success``, ``message`` and ``date``; on success
        it also carries ``stats`` (as of ``date``) and ``rewards``.
        """
        if date is None:
            date = datetime.now()
        signin_key = self._get_signin_key(user_id, date.year, date.month)
        # SETBIT replies with the previous bit, so one command both records
        # the sign-in and detects a duplicate. A separate GETBIT check would
        # let two concurrent requests both succeed.
        already_signed = self.redis_client.setbit(signin_key, self._get_day_offset(date), 1)
        if already_signed:
            return {
                'success': False,
                'message': '今日已签到',
                'date': date.strftime('%Y-%m-%d')
            }
        # Keep the month bitmap for 400 days (roughly 13 months).
        self.redis_client.expire(signin_key, 86400 * 400)
        stats = self._update_stats(user_id, date)
        rewards = self._calculate_rewards(stats)
        return {
            'success': True,
            'message': '签到成功',
            'date': date.strftime('%Y-%m-%d'),
            'stats': stats,
            'rewards': rewards
        }

    def is_signed_in(self, user_id: str, date: Optional[datetime] = None) -> bool:
        """Return True if the user signed in on ``date`` (default: now)."""
        if date is None:
            date = datetime.now()
        signin_key = self._get_signin_key(user_id, date.year, date.month)
        return bool(self.redis_client.getbit(signin_key, self._get_day_offset(date)))

    def get_month_signin_status(self, user_id: str, year: int, month: int) -> List[bool]:
        """Return one flag per day of the month, in day order."""
        import calendar

        signin_key = self._get_signin_key(user_id, year, month)
        days_in_month = calendar.monthrange(year, month)[1]
        pipe = self.redis_client.pipeline(transaction=False)
        for day_offset in range(days_in_month):
            pipe.getbit(signin_key, day_offset)
        return [bool(bit) for bit in pipe.execute()]

    def get_signin_calendar(self, user_id: str, year: int, month: int) -> Dict[str, object]:
        """Return the month laid out in weeks with a ``signed`` flag per day.

        Padding cells from ``calendar.monthcalendar`` are reported as
        ``{'day': 0, 'signed': False}``.
        """
        import calendar

        signin_status = self.get_month_signin_status(user_id, year, month)
        signin_key = self._get_signin_key(user_id, year, month)
        month_signin_count = self.redis_client.bitcount(signin_key)

        calendar_data = []
        for week in calendar.monthcalendar(year, month):
            week_data = []
            for day in week:
                signed = 0 < day <= len(signin_status) and signin_status[day - 1]
                week_data.append({'day': day, 'signed': bool(signed)})
            calendar_data.append(week_data)
        return {
            'year': year,
            'month': month,
            'calendar': calendar_data,
            'month_signin_count': month_signin_count,
            'total_days': len(signin_status)
        }

    def get_signin_stats(self, user_id: str, as_of: Optional[datetime] = None) -> SigninStats:
        """Compute sign-in statistics as of ``as_of`` (default: now).

        ``total_days`` sums the month bitmaps from January of the previous
        year through the month of ``as_of``.
        """
        now = datetime.now() if as_of is None else as_of

        pipe = self.redis_client.pipeline(transaction=False)
        for year in (now.year - 1, now.year):
            last_month = now.month if year == now.year else 12
            for month in range(1, last_month + 1):
                pipe.bitcount(self._get_signin_key(user_id, year, month))
        total_days = sum(pipe.execute())

        this_month_key = self._get_signin_key(user_id, now.year, now.month)
        return SigninStats(
            total_days=total_days,
            current_streak=self._calculate_current_streak(user_id, now),
            max_streak=self._calculate_max_streak(user_id, now),
            this_month_days=self.redis_client.bitcount(this_month_key),
            last_signin_date=self._get_last_signin_date(user_id, now)
        )

    def _calculate_current_streak(self, user_id: str, end_date: datetime) -> int:
        """Count consecutive signed-in days ending at ``end_date`` (at most 365)."""
        streak = 0
        for signed in self._get_signin_flags(user_id, end_date, 365):
            if not signed:
                break
            streak += 1
        return streak

    def _calculate_max_streak(self, user_id: str, end_date: Optional[datetime] = None) -> int:
        """Return the longest run of signed-in days in the 366 days ending at ``end_date``.

        Simplified: only the most recent year is scanned, not the full history.
        """
        if end_date is None:
            end_date = datetime.now()
        max_streak = 0
        current_streak = 0
        # The scan direction does not affect the length of the longest run.
        for signed in self._get_signin_flags(user_id, end_date, 366):
            current_streak = current_streak + 1 if signed else 0
            max_streak = max(max_streak, current_streak)
        return max_streak

    def _get_last_signin_date(self, user_id: str,
                              end_date: Optional[datetime] = None) -> Optional[datetime]:
        """Return the most recent signed-in day within 365 days of ``end_date``, or None."""
        if end_date is None:
            end_date = datetime.now()
        for days_back, signed in enumerate(self._get_signin_flags(user_id, end_date, 365)):
            if signed:
                return end_date - timedelta(days=days_back)
        return None

    def _update_stats(self, user_id: str, date: datetime) -> SigninStats:
        """Return fresh statistics as of ``date``.

        The stats could be cached in Redis here to avoid recomputation.
        """
        return self.get_signin_stats(user_id, date)

    def _calculate_rewards(self, stats: SigninStats) -> List[SigninReward]:
        """Return the rewards earned by the sign-in described by ``stats``."""
        # Base reward for every sign-in.
        rewards = [SigninReward(SigninRewardType.POINTS, 10, "每日签到奖励")]
        for days, reward in self.daily_rewards.items():
            if days <= 1:
                # ``streak % 1 == 0`` is always true, which would grant the
                # first-sign-in reward every day; tie it to the first counted
                # sign-in instead.
                earned = stats.total_days == 1
            else:
                earned = stats.current_streak >= days and stats.current_streak % days == 0
            if earned:
                rewards.append(reward)
        return rewards

    def get_signin_ranking(self, limit: int = 10) -> List[Dict[str, object]]:
        """Return the top ``limit`` entries of the ``signin_ranking`` sorted set."""
        ranking_key = "signin_ranking"
        results = self.redis_client.zrevrange(ranking_key, 0, limit - 1, withscores=True)
        return [
            {'rank': rank, 'user_id': user_id, 'total_signin_days': int(score)}
            for rank, (user_id, score) in enumerate(results, start=1)
        ]

    def update_ranking(self, user_id: str):
        """Write the user's current ``total_days`` as their ranking score."""
        stats = self.get_signin_stats(user_id)
        ranking_key = "signin_ranking"
        self.redis_client.zadd(ranking_key, {user_id: stats.total_days})
# Usage example
if __name__ == "__main__":
    client = redis.Redis(decode_responses=True)
    signin_system = UserSigninSystem(client)
    user_id = "user_1001"

    # Sign the user in for today.
    result = signin_system.signin(user_id)
    print(f"签到结果: {result}")

    # Confirm today's sign-in is recorded.
    is_signed = signin_system.is_signed_in(user_id)
    print(f"今日已签到: {is_signed}")

    # Calendar view of the current month.
    now = datetime.now()
    calendar_data = signin_system.get_signin_calendar(user_id, now.year, now.month)
    print(f"\n本月签到日历:")
    print(f"签到天数: {calendar_data['month_signin_count']}/{calendar_data['total_days']}")

    # Aggregate statistics.
    stats = signin_system.get_signin_stats(user_id)
    print(f"\n签到统计:")
    print(f"总签到天数: {stats.total_days}")
    print(f"当前连续: {stats.current_streak}天")
    print(f"最大连续: {stats.max_streak}天")
    print(f"本月签到: {stats.this_month_days}天")
    if stats.last_signin_date:
        print(f"最后签到: {stats.last_signin_date.strftime('%Y-%m-%d')}")
7.3.3 实时活跃用户统计
import redis
from datetime import datetime, timedelta
from typing import Dict, List, Set, Optional
from dataclasses import dataclass
from enum import Enum
class TimeWindow(Enum):
    """Aggregation granularities; each value is the name used in Redis keys."""

    HOURLY = "hourly"
    DAILY = "daily"
    WEEKLY = "weekly"
    MONTHLY = "monthly"
@dataclass
class ActivityStats:
    """Activity figures for one time window at one point in time."""

    window: TimeWindow  # granularity the figures refer to
    timestamp: datetime  # moment identifying the window
    active_users: int  # users active in the window
    new_users: int  # active now but not in the previous window
    returning_users: int  # active now and in the previous window
class RealTimeActivityTracker:
    """Track active users per hour/day/week/month with one bitmap per window.

    Each user maps to a fixed bit offset; a set bit means "active in that
    window". Distinct users can share an offset, so counts are approximate.
    """

    # Retention per window, in seconds (7 days / 90 days / 1 year / 2 years).
    _WINDOW_TTL_SECONDS = {
        TimeWindow.HOURLY: 86400 * 7,
        TimeWindow.DAILY: 86400 * 90,
        TimeWindow.WEEKLY: 86400 * 365,
        TimeWindow.MONTHLY: 86400 * 730,
    }
    _FIRST_SEEN_TTL_SECONDS = 86400 * 730  # 2 years
    _OFFSET_SPACE = 2 ** 20  # about one million distinct offsets

    def __init__(self, redis_client: redis.Redis):
        """Store the client and the key prefixes."""
        self.redis_client = redis_client
        self.activity_prefix = "activity"
        self.user_first_seen_prefix = "user_first_seen"

    def _get_activity_key(self, window: TimeWindow, timestamp: datetime) -> str:
        """Build the bitmap key for ``window`` containing ``timestamp``.

        Raises:
            ValueError: if ``window`` is not a known ``TimeWindow`` member.
        """
        if window == TimeWindow.HOURLY:
            return f"{self.activity_prefix}:hourly:{timestamp.strftime('%Y%m%d%H')}"
        if window == TimeWindow.DAILY:
            return f"{self.activity_prefix}:daily:{timestamp.strftime('%Y%m%d')}"
        if window == TimeWindow.WEEKLY:
            # Weekly keys are named after the Monday that starts the week.
            week_start = timestamp - timedelta(days=timestamp.weekday())
            return f"{self.activity_prefix}:weekly:{week_start.strftime('%Y%m%d')}"
        if window == TimeWindow.MONTHLY:
            return f"{self.activity_prefix}:monthly:{timestamp.strftime('%Y%m')}"
        raise ValueError(f"unsupported time window: {window!r}")

    def _get_user_offset(self, user_id: str) -> int:
        """Map ``user_id`` to a bit offset in ``[0, 2**20)``.

        Uses CRC32 rather than the built-in ``hash()``: string hashes are
        randomized per interpreter process, so ``hash()`` would send the same
        user to a different bit after every restart. Collisions are still
        possible; production code may need an explicit id-to-offset mapping.
        """
        import zlib

        return zlib.crc32(user_id.encode("utf-8")) % self._OFFSET_SPACE

    def record_user_activity(self, user_id: str,
                             timestamp: Optional[datetime] = None) -> Dict[str, bool]:
        """Mark the user active in every window containing ``timestamp``.

        Returns a dict keyed by window value; True means this call was the
        first to set the user's bit in that window.
        """
        if timestamp is None:
            timestamp = datetime.now()
        user_offset = self._get_user_offset(user_id)
        results = {}
        for window in TimeWindow:
            activity_key = self._get_activity_key(window, timestamp)
            # SETBIT replies with the previous bit, so "was the user already
            # active?" needs no separate (racy) GETBIT.
            was_active = bool(self.redis_client.setbit(activity_key, user_offset, 1))
            self.redis_client.expire(activity_key, self._WINDOW_TTL_SECONDS[window])
            results[window.value] = not was_active

        # Record the first-seen time once; NX + EX makes check, write and TTL
        # a single atomic command.
        first_seen_key = f"{self.user_first_seen_prefix}:{user_id}"
        self.redis_client.set(first_seen_key, timestamp.timestamp(),
                              ex=self._FIRST_SEEN_TTL_SECONDS, nx=True)
        return results

    def get_active_users_count(self, window: TimeWindow,
                               timestamp: Optional[datetime] = None) -> int:
        """Return the number of set bits in the window containing ``timestamp``."""
        if timestamp is None:
            timestamp = datetime.now()
        return self.redis_client.bitcount(self._get_activity_key(window, timestamp))

    def get_activity_stats(self, window: TimeWindow,
                           timestamp: Optional[datetime] = None) -> ActivityStats:
        """Return activity figures for the window containing ``timestamp``.

        ``new_users``/``returning_users`` are computed for DAILY only (against
        the previous day) and are 0 for every other window.
        """
        if timestamp is None:
            timestamp = datetime.now()
        current_key = self._get_activity_key(window, timestamp)
        active_users = self.redis_client.bitcount(current_key)

        new_users = 0
        returning_users = 0
        if window == TimeWindow.DAILY:
            import uuid

            yesterday_key = self._get_activity_key(window, timestamp - timedelta(days=1))
            temp_returning_key = f"temp_returning_{uuid.uuid4().hex}"
            try:
                # Returning users = active today AND active yesterday.
                self.redis_client.bitop('AND', temp_returning_key, current_key, yesterday_key)
                returning_users = self.redis_client.bitcount(temp_returning_key)
            finally:
                self.redis_client.delete(temp_returning_key)
            # New users = active today but not yesterday. Derived by
            # subtraction: "NOT yesterday" is only as long as yesterday's
            # string, so ANDing it with today drops every user whose offset
            # lies beyond that length.
            new_users = max(0, active_users - returning_users)

        return ActivityStats(
            window=window,
            timestamp=timestamp,
            active_users=active_users,
            new_users=new_users,
            returning_users=returning_users
        )

    def get_activity_trend(self, window: TimeWindow, days: int = 7) -> List[ActivityStats]:
        """Return stats for the last ``days`` periods of ``window``, oldest first.

        Despite its name, ``days`` counts periods: hours for HOURLY, days for
        DAILY, weeks for WEEKLY and calendar months for MONTHLY.
        """
        trends = []
        now = datetime.now()
        for i in range(days):
            if window == TimeWindow.DAILY:
                target_date = now - timedelta(days=i)
            elif window == TimeWindow.HOURLY:
                target_date = now - timedelta(hours=i)
            elif window == TimeWindow.MONTHLY:
                # Step whole calendar months; day=1 avoids invalid dates.
                month_number = now.year * 12 + (now.month - 1) - i
                year, month_index = divmod(month_number, 12)
                target_date = now.replace(year=year, month=month_index + 1, day=1)
            else:
                target_date = now - timedelta(days=i * 7)  # weekly
            trends.append(self.get_activity_stats(window, target_date))
        trends.reverse()
        return trends

    def get_user_activity_pattern(self, user_id: str, days: int = 30) -> Dict[str, object]:
        """Summarize one user's daily activity over the last ``days`` days."""
        user_offset = self._get_user_offset(user_id)
        now = datetime.now()
        dates = [now - timedelta(days=i) for i in range(days)]

        # One pipelined round trip for all daily lookups.
        pipe = self.redis_client.pipeline(transaction=False)
        for date in dates:
            pipe.getbit(self._get_activity_key(TimeWindow.DAILY, date), user_offset)
        flags = pipe.execute()

        # Chronological order (oldest first).
        pattern = [
            {'date': date.strftime('%Y-%m-%d'), 'active': bool(flag)}
            for date, flag in zip(reversed(dates), reversed(flags))
        ]
        active_days = sum(1 for entry in pattern if entry['active'])

        max_streak = 0
        current_streak = 0
        for entry in pattern:
            current_streak = current_streak + 1 if entry['active'] else 0
            max_streak = max(max_streak, current_streak)

        first_seen_key = f"{self.user_first_seen_prefix}:{user_id}"
        first_seen_timestamp = self.redis_client.get(first_seen_key)
        first_seen = None
        if first_seen_timestamp:
            first_seen = datetime.fromtimestamp(float(first_seen_timestamp))

        return {
            'user_id': user_id,
            'days_analyzed': days,
            'active_days': active_days,
            # Guard the empty window instead of dividing by zero.
            'activity_rate': active_days / days if days > 0 else 0.0,
            'max_consecutive_days': max_streak,
            'first_seen': first_seen.strftime('%Y-%m-%d') if first_seen else None,
            'pattern': pattern
        }

    def get_cohort_analysis(self, start_date: datetime,
                            cohort_size_days: int = 7) -> Dict[str, object]:
        """Return the cohort time slices from ``start_date`` up to now.

        Simplified placeholder: ``initial_users`` and ``retention`` are not
        computed; a real implementation would assign users to cohorts by
        their first-seen time.

        Raises:
            ValueError: if ``cohort_size_days`` is less than 1 (the slicing
                loop could never terminate).
        """
        if cohort_size_days < 1:
            raise ValueError("cohort_size_days must be at least 1")
        cohorts = []
        current_date = start_date
        end_date = datetime.now() - timedelta(days=cohort_size_days)
        while current_date <= end_date:
            cohort_end = current_date + timedelta(days=cohort_size_days - 1)
            cohorts.append({
                'cohort_start': current_date.strftime('%Y-%m-%d'),
                'cohort_end': cohort_end.strftime('%Y-%m-%d'),
                'initial_users': 0,  # cohort membership not implemented yet
                'retention': []  # per-period retention rates
            })
            current_date += timedelta(days=cohort_size_days)
        return {
            'cohort_size_days': cohort_size_days,
            'cohorts': cohorts
        }

    def cleanup_old_data(self, days_to_keep: int = 90):
        """Delete hourly and daily bitmaps just older than ``days_to_keep`` days.

        Covers one extra week of hourly keys and one extra month of daily
        keys beyond the cutoff.
        """
        now = datetime.now()
        stale_keys = [
            self._get_activity_key(TimeWindow.HOURLY, now - timedelta(hours=hours_back))
            for hours_back in range(days_to_keep * 24, days_to_keep * 24 + 168)
        ]
        stale_keys.extend(
            self._get_activity_key(TimeWindow.DAILY, now - timedelta(days=days_back))
            for days_back in range(days_to_keep, days_to_keep + 30)
        )
        if stale_keys:
            self.redis_client.delete(*stale_keys)
# Usage example
if __name__ == "__main__":
    client = redis.Redis(decode_responses=True)
    tracker = RealTimeActivityTracker(client)

    # Simulate activity: 100 candidate users, the first 50 are active today.
    candidate_ids = [f"user_{i}" for i in range(1000, 1100)]
    active_today = candidate_ids[:50]
    for user_id in active_today:
        result = tracker.record_user_activity(user_id)
        print(f"用户 {user_id} 活跃记录: {result}")

    # Today's active-user count.
    daily_active = tracker.get_active_users_count(TimeWindow.DAILY)
    print(f"\n今日活跃用户数: {daily_active}")

    # Detailed figures for today.
    stats = tracker.get_activity_stats(TimeWindow.DAILY)
    print(f"今日统计:")
    print(f" 活跃用户: {stats.active_users}")
    print(f" 新用户: {stats.new_users}")
    print(f" 回归用户: {stats.returning_users}")

    # Seven-day trend, oldest first.
    trends = tracker.get_activity_trend(TimeWindow.DAILY, days=7)
    print(f"\n7天活跃趋势:")
    for trend in trends:
        print(f" {trend.timestamp.strftime('%Y-%m-%d')}: {trend.active_users} 活跃用户")

    # One user's activity pattern over the last week.
    user_pattern = tracker.get_user_activity_pattern("user_1001", days=7)
    print(f"\n用户活跃模式:")
    print(f" 活跃天数: {user_pattern['active_days']}/{user_pattern['days_analyzed']}")
    print(f" 活跃率: {user_pattern['activity_rate']:.2%}")
    print(f" 最大连续活跃: {user_pattern['max_consecutive_days']}天")
7.4 HyperLogLog
7.4.1 HyperLogLog概述
HyperLogLog是一种概率性数据结构,用于估算集合的基数(不重复元素的数量)。它具有以下特点:
- 内存高效: 每个键最多占用约12KB内存(基数较小时采用稀疏编码,占用更少)
- 高精度: 标准误差小于1%
- 可合并: 支持多个HyperLogLog合并
- 适合大数据: 可以处理数十亿级别的数据
7.4.2 HyperLogLog基本操作
# 添加元素
PFADD key element [element ...]
PFADD unique_visitors "user1" "user2" "user3"
PFADD unique_visitors "user1" "user4" # user1重复,不会增加计数
# 获取基数估算
PFCOUNT key [key ...]
PFCOUNT unique_visitors # 返回估算的唯一访客数
# 合并多个HyperLogLog
PFMERGE destkey sourcekey [sourcekey ...]
PFMERGE total_visitors page1_visitors page2_visitors
PFCOUNT total_visitors # 返回合并后的唯一访客数
7.4.3 Python HyperLogLog实现
import redis
from typing import List, Dict, Set, Optional
from datetime import datetime, timedelta
from dataclasses import dataclass
from enum import Enum
class MetricType(Enum):
    """Kinds of distinct-count metric; each value is used inside the Redis key."""

    UV = "unique_visitors"  # distinct visitors
    IP = "unique_ips"  # distinct IP addresses
    USER = "unique_users"  # distinct users
    DEVICE = "unique_devices"  # distinct devices
@dataclass
class UniqueStats:
    """One estimated distinct count for a metric and period."""

    metric_type: MetricType  # which metric was counted
    period: str  # "<period_type>:<period_key>" label
    estimated_count: int  # cardinality estimate
    timestamp: datetime  # moment the period refers to
class HyperLogLogAnalytics:
    """Distinct-visitor analytics built on Redis HyperLogLog keys.

    Keys have the form ``hll:{metric}:{period_type}:{period_key}``.
    """

    # Retention per period type, in seconds. 'yearly' has no entry, so yearly
    # keys are stored without a TTL.
    _PERIOD_TTL_SECONDS = {
        'hourly': 86400 * 7,     # 7 days
        'daily': 86400 * 90,     # 90 days
        'weekly': 86400 * 365,   # 1 year
        'monthly': 86400 * 730,  # 2 years
    }

    def __init__(self, redis_client: redis.Redis):
        """Store the client and the key prefix."""
        self.redis_client = redis_client
        self.hll_prefix = "hll"

    def _get_hll_key(self, metric_type: MetricType, period: str) -> str:
        """Build the HyperLogLog key for ``metric_type`` and ``period``."""
        return f"{self.hll_prefix}:{metric_type.value}:{period}"

    def _period_keys(self, timestamp: datetime) -> Dict[str, str]:
        """Return the period key of ``timestamp`` at every granularity."""
        week_start = timestamp - timedelta(days=timestamp.weekday())
        return {
            'hourly': timestamp.strftime('%Y%m%d%H'),
            'daily': timestamp.strftime('%Y%m%d'),
            'weekly': week_start.strftime('%Y%m%d'),  # Monday of the week
            'monthly': timestamp.strftime('%Y%m'),
            'yearly': timestamp.strftime('%Y')
        }

    def add_unique_visitor(self, visitor_id: str,
                           timestamp: Optional[datetime] = None) -> Dict[str, int]:
        """Record one visitor; return the estimate per period type."""
        return self.add_multiple_visitors([visitor_id], timestamp)

    def add_multiple_visitors(self, visitor_ids: List[str],
                              timestamp: Optional[datetime] = None) -> Dict[str, int]:
        """Record ``visitor_ids`` in every period containing ``timestamp``.

        Returns the estimated count per period type after the insert. An
        empty ``visitor_ids`` writes nothing and only reads the counts.
        """
        if timestamp is None:
            timestamp = datetime.now()
        results = {}
        for period_type, period_key in self._period_keys(timestamp).items():
            hll_key = self._get_hll_key(MetricType.UV, f"{period_type}:{period_key}")
            if visitor_ids:
                self.redis_client.pfadd(hll_key, *visitor_ids)
                ttl = self._PERIOD_TTL_SECONDS.get(period_type)
                if ttl is not None:
                    self.redis_client.expire(hll_key, ttl)
            results[period_type] = self.redis_client.pfcount(hll_key)
        return results

    def get_unique_count(self, metric_type: MetricType, period_type: str,
                         period_key: str) -> int:
        """Return the estimated distinct count for one period."""
        hll_key = self._get_hll_key(metric_type, f"{period_type}:{period_key}")
        return self.redis_client.pfcount(hll_key)

    def get_daily_stats(self, metric_type: MetricType,
                        date: Optional[datetime] = None) -> UniqueStats:
        """Return the daily estimate for ``date`` (default: today)."""
        if date is None:
            date = datetime.now()
        period_key = date.strftime('%Y%m%d')
        return UniqueStats(
            metric_type=metric_type,
            period=f"daily:{period_key}",
            estimated_count=self.get_unique_count(metric_type, 'daily', period_key),
            timestamp=date
        )

    def get_period_trend(self, metric_type: MetricType, period_type: str,
                         days: int = 7) -> List[UniqueStats]:
        """Return estimates for the last ``days`` periods, oldest first.

        Despite its name, ``days`` counts periods of ``period_type``
        ('hourly', 'daily', 'weekly', 'monthly' or 'yearly').

        Raises:
            ValueError: if ``period_type`` is not one of the values above.
        """
        if period_type not in ('hourly', 'daily', 'weekly', 'monthly', 'yearly'):
            raise ValueError(f"unsupported period type: {period_type!r}")
        trends = []
        now = datetime.now()
        for i in range(days):
            if period_type == 'daily':
                target_date = now - timedelta(days=i)
                period_key = target_date.strftime('%Y%m%d')
            elif period_type == 'hourly':
                target_date = now - timedelta(hours=i)
                period_key = target_date.strftime('%Y%m%d%H')
            elif period_type == 'weekly':
                target_date = now - timedelta(days=i * 7)
                week_start = target_date - timedelta(days=target_date.weekday())
                period_key = week_start.strftime('%Y%m%d')
            elif period_type == 'monthly':
                # Step whole calendar months; day=1 avoids invalid dates.
                month_number = now.year * 12 + (now.month - 1) - i
                year, month_index = divmod(month_number, 12)
                target_date = now.replace(year=year, month=month_index + 1, day=1)
                period_key = target_date.strftime('%Y%m')
            else:  # yearly
                target_date = now.replace(year=now.year - i, month=1, day=1)
                period_key = target_date.strftime('%Y')
            trends.append(UniqueStats(
                metric_type=metric_type,
                period=f"{period_type}:{period_key}",
                estimated_count=self.get_unique_count(metric_type, period_type, period_key),
                timestamp=target_date
            ))
        trends.reverse()
        return trends

    def merge_periods(self, metric_type: MetricType, period_type: str,
                      period_keys: List[str], dest_key: Optional[str] = None) -> int:
        """Return the estimated distinct count across several periods.

        Without ``dest_key`` nothing is written: PFCOUNT over several keys
        returns the cardinality of their union directly. With ``dest_key``
        the existing sources are merged into it with PFMERGE and it is given
        a one-hour TTL; note that PFMERGE also keeps whatever ``dest_key``
        already held.
        """
        source_keys = [self._get_hll_key(metric_type, f"{period_type}:{key}")
                       for key in period_keys]
        if dest_key is None:
            # No scratch key means no collisions between calls in the same
            # second and nothing left behind to expire.
            return self.redis_client.pfcount(*source_keys) if source_keys else 0

        existing_keys = [key for key in source_keys if self.redis_client.exists(key)]
        if not existing_keys:
            return 0
        self.redis_client.pfmerge(dest_key, *existing_keys)
        merged_count = self.redis_client.pfcount(dest_key)
        self.redis_client.expire(dest_key, 3600)  # expire after one hour
        return merged_count

    def get_weekly_unique_visitors(self, start_date: Optional[datetime] = None) -> int:
        """Return distinct visitors over the 7 days starting at ``start_date``.

        Defaults to the Monday of the current week.
        """
        if start_date is None:
            now = datetime.now()
            start_date = now - timedelta(days=now.weekday())
        daily_keys = [(start_date + timedelta(days=i)).strftime('%Y%m%d') for i in range(7)]
        return self.merge_periods(MetricType.UV, 'daily', daily_keys)

    def get_monthly_unique_visitors(self, year: Optional[int] = None,
                                    month: Optional[int] = None) -> int:
        """Return distinct visitors for a calendar month.

        Falls back to the current month unless both ``year`` and ``month``
        are given.
        """
        if year is None or month is None:
            now = datetime.now()
            year, month = now.year, now.month
        import calendar

        days_in_month = calendar.monthrange(year, month)[1]
        daily_keys = [datetime(year, month, day).strftime('%Y%m%d')
                      for day in range(1, days_in_month + 1)]
        return self.merge_periods(MetricType.UV, 'daily', daily_keys)

    def compare_periods(self, metric_type: MetricType, period1: str,
                        period2: str) -> Dict[str, object]:
        """Compare two daily periods: counts, union, estimated overlap, growth.

        ``merged_count`` is the estimated size of the union of both days. The
        overlap comes from inclusion-exclusion on three estimates and is
        clamped at 0, because estimation error can push the raw value
        slightly negative.
        """
        key1 = self._get_hll_key(metric_type, f"daily:{period1}")
        key2 = self._get_hll_key(metric_type, f"daily:{period2}")
        count1 = self.redis_client.pfcount(key1)
        count2 = self.redis_client.pfcount(key2)
        # Multi-key PFCOUNT yields the union size without a temporary key.
        merged_count = self.redis_client.pfcount(key1, key2)

        overlap_estimate = max(0, count1 + count2 - merged_count)
        larger = max(count1, count2)
        overlap_rate = overlap_estimate / larger if larger > 0 else 0
        return {
            'period1': {'period': period1, 'count': count1},
            'period2': {'period': period2, 'count': count2},
            'merged_count': merged_count,
            'overlap_estimate': overlap_estimate,
            'overlap_rate': overlap_rate,
            'growth_rate': (count2 - count1) / count1 if count1 > 0 else 0
        }

    def get_analytics_dashboard(self) -> Dict[str, object]:
        """Return summary figures plus 7-day and 24-hour UV trends."""
        now = datetime.now()
        today = now.strftime('%Y%m%d')
        yesterday = (now - timedelta(days=1)).strftime('%Y%m%d')

        today_uv = self.get_unique_count(MetricType.UV, 'daily', today)
        yesterday_uv = self.get_unique_count(MetricType.UV, 'daily', yesterday)
        week_uv = self.get_weekly_unique_visitors()
        month_uv = self.get_monthly_unique_visitors()
        daily_trend = self.get_period_trend(MetricType.UV, 'daily', 7)
        hourly_trend = self.get_period_trend(MetricType.UV, 'hourly', 24)

        return {
            'summary': {
                'today_uv': today_uv,
                'yesterday_uv': yesterday_uv,
                'week_uv': week_uv,
                'month_uv': month_uv,
                'daily_growth': (today_uv - yesterday_uv) / yesterday_uv if yesterday_uv > 0 else 0
            },
            'trends': {
                'daily': [{
                    'date': trend.timestamp.strftime('%Y-%m-%d'),
                    'uv': trend.estimated_count
                } for trend in daily_trend],
                'hourly': [{
                    'hour': trend.timestamp.strftime('%Y-%m-%d %H:00'),
                    'uv': trend.estimated_count
                } for trend in hourly_trend]
            }
        }
# Usage example
if __name__ == "__main__":
    client = redis.Redis(decode_responses=True)
    analytics = HyperLogLogAnalytics(client)

    # Simulated visitor data.
    import random

    # Today's visitors, added in one batch.
    today_visitors = [f"visitor_{i}" for i in range(1000, 1500)]
    result = analytics.add_multiple_visitors(today_visitors)
    print(f"今日访客添加结果: {result}")

    # Yesterday's visitors, added one at a time; ids 1000-1199 overlap with today.
    yesterday = datetime.now() - timedelta(days=1)
    for visitor_number in range(800, 1200):
        analytics.add_unique_visitor(f"visitor_{visitor_number}", yesterday)

    # Daily estimates.
    today_stats = analytics.get_daily_stats(MetricType.UV)
    print(f"\n今日独立访客: {today_stats.estimated_count}")
    yesterday_stats = analytics.get_daily_stats(MetricType.UV, yesterday)
    print(f"昨日独立访客: {yesterday_stats.estimated_count}")

    # Distinct visitors for the current week.
    week_uv = analytics.get_weekly_unique_visitors()
    print(f"本周独立访客: {week_uv}")

    # Seven-day trend, oldest first.
    trends = analytics.get_period_trend(MetricType.UV, 'daily', 7)
    print(f"\n7天访客趋势:")
    for trend in trends:
        print(f" {trend.timestamp.strftime('%Y-%m-%d')}: {trend.estimated_count}")

    # Yesterday (period1) versus today (period2).
    today_key = datetime.now().strftime('%Y%m%d')
    yesterday_key = yesterday.strftime('%Y%m%d')
    comparison = analytics.compare_periods(MetricType.UV, yesterday_key, today_key)
    print(f"\n今日vs昨日比较:")
    print(f" 昨日: {comparison['period1']['count']}")
    print(f" 今日: {comparison['period2']['count']}")
    print(f" 增长率: {comparison['growth_rate']:.2%}")
    print(f" 重叠率: {comparison['overlap_rate']:.2%}")

    # Dashboard summary.
    dashboard = analytics.get_analytics_dashboard()
    print(f"\n仪表板数据:")
    print(f" 今日UV: {dashboard['summary']['today_uv']}")
    print(f" 本周UV: {dashboard['summary']['week_uv']}")
    print(f" 本月UV: {dashboard['summary']['month_uv']}")
    print(f" 日增长率: {dashboard['summary']['daily_growth']:.2%}")
7.5 总结
通过本章的学习,我们深入了解了Redis的位图和HyperLogLog两种特殊数据结构:
7.5.1 位图特点和应用
- 高效存储: 每个位只占用1bit,适合大规模布尔值存储
- 快速操作: 支持位级别的快速操作和统计
- 应用场景: 用户签到、在线状态、权限控制、活跃统计等
7.5.2 HyperLogLog特点和应用
- 内存固定: 仅使用12KB内存,可处理数十亿数据
- 高精度: 标准误差小于1%的基数估算
- 可合并: 支持多个HyperLogLog的合并操作
- 应用场景: 独立访客统计、去重计数、大数据分析等
7.5.3 最佳实践
- 合理选择: 根据精度要求选择位图或HyperLogLog
- 内存优化: 位图适合ID连续、分布密集的数据(内存占用由最大偏移量决定,偏移量很大而数据稀疏时会浪费内存);HyperLogLog适合大基数统计
- 过期策略: 合理设置数据过期时间,避免内存泄漏
- 批量操作: 使用管道或批量命令提高性能
- 监控告警: 监控内存使用和操作性能
7.5.4 性能考虑
- 位图性能: 位操作时间复杂度为O(1),统计操作为O(n)
- HyperLogLog性能: 添加和计数操作都是O(1)时间复杂度
- 内存使用: 位图内存使用与最大偏移量相关,HyperLogLog固定12KB
- 网络开销: 批量操作可以减少网络往返次数
7.5.5 参考资料
下一章预告: 在下一章中,我们将学习Redis的发布订阅功能,了解如何实现实时消息传递和事件通知系统。