7.1 位图(Bitmap)

7.1.1 位图概述

Redis位图(Bitmap)是一种特殊的字符串类型,可以对字符串的每一位进行操作。位图非常适合存储布尔值信息,具有极高的空间效率。

7.1.2 位图特点

  • 空间高效: 每个位只占用1bit空间
  • 快速操作: SETBIT/GETBIT的时间复杂度为O(1);BITCOUNT、BITOP等需要扫描字符串,复杂度为O(N)
  • 大容量: 单个字符串最大512MB,因此一个位图最多可存储2^32个位
  • 原子操作: 所有位操作都是原子的
  • 统计功能: 支持快速统计值为1的位数(BITCOUNT)

7.1.3 应用场景

  • 用户签到: 记录用户每日签到状态
  • 在线状态: 记录用户在线/离线状态
  • 权限控制: 记录用户权限位
  • 活跃统计: 统计日活、月活用户
  • 布隆过滤器: 实现概率性数据结构

7.2 位图基本操作

7.2.1 设置和获取位

# 设置位值
SETBIT key offset value
SETBIT user:1001:signin 0 1    # 设置第0天签到
SETBIT user:1001:signin 1 1    # 设置第1天签到
SETBIT user:1001:signin 5 1    # 设置第5天签到

# 获取位值
GETBIT key offset
GETBIT user:1001:signin 0      # 返回1
GETBIT user:1001:signin 2      # 返回0(未签到)

# 统计位数
BITCOUNT key [start end]
BITCOUNT user:1001:signin      # 统计总签到天数
BITCOUNT user:1001:signin 0 0  # start/end默认是字节偏移:统计第0个字节,即前8天的签到次数
BITCOUNT user:1001:signin 0 6 BIT  # Redis 7.0+:加BIT参数按位统计,即前7天签到次数

# 查找第一个设置的位
BITPOS key bit [start] [end]   # start/end默认同样是字节偏移,返回值是绝对的位偏移
BITPOS user:1001:signin 1      # 查找第一个签到的位置
BITPOS user:1001:signin 0      # 查找第一个未签到的位置

7.2.2 位运算操作

# 位运算
BITOP operation destkey key [key ...]

# 创建测试数据
SETBIT user:1001 0 1
SETBIT user:1001 1 1
SETBIT user:1001 2 0

SETBIT user:1002 0 1
SETBIT user:1002 1 0
SETBIT user:1002 2 1

# AND运算(交集)
BITOP AND result user:1001 user:1002
GETBIT result 0  # 返回1(两个用户都在第0天活跃)
GETBIT result 1  # 返回0(第1天不是都活跃)

# OR运算(并集)
BITOP OR result user:1001 user:1002
BITCOUNT result  # 统计至少有一个用户活跃的天数

# XOR运算(异或)
BITOP XOR result user:1001 user:1002

# NOT运算(取反)
BITOP NOT result user:1001

7.3 Python位图实现

7.3.1 基础位图类

import redis
from typing import List, Dict, Optional, Set, Tuple
from datetime import datetime, timedelta
import calendar

class RedisBitmap:
    """Thin wrapper around a single Redis bitmap key.

    ``set_bit``/``get_bit``/``set_range``/``get_range``/``to_list`` take *bit*
    offsets. The ``start``/``end`` arguments of ``count`` and ``find_first``
    are forwarded unchanged to BITCOUNT/BITPOS. Redis interprets those as
    *byte* offsets by default.
    """

    def __init__(self, redis_client: redis.Redis, key: str):
        self.redis_client = redis_client
        self.key = key

    def set_bit(self, offset: int, value: int) -> int:
        """Set the bit at ``offset`` to ``value``; return what SETBIT returns."""
        return self.redis_client.setbit(self.key, offset, value)

    def get_bit(self, offset: int) -> int:
        """Return the bit stored at ``offset``."""
        return self.redis_client.getbit(self.key, offset)

    def count(self, start: int = None, end: int = None) -> int:
        """Count bits set to 1.

        ``start``/``end`` (a BITCOUNT byte range) are used only when *both*
        are given; otherwise the whole key is counted.
        """
        if start is not None and end is not None:
            return self.redis_client.bitcount(self.key, start, end)
        return self.redis_client.bitcount(self.key)

    def find_first(self, bit: int, start: int = None, end: int = None) -> int:
        """Return the offset of the first bit equal to ``bit`` (BITPOS), or -1.

        ``start``/``end`` are BITPOS byte indexes. ``end`` is ignored unless
        ``start`` is also given.
        """
        if start is not None and end is not None:
            return self.redis_client.bitpos(self.key, bit, start, end)
        elif start is not None:
            return self.redis_client.bitpos(self.key, bit, start)
        return self.redis_client.bitpos(self.key, bit)

    def set_range(self, start: int, end: int, value: int = 1):
        """Set every bit in the inclusive bit range [start, end] to ``value``.

        All SETBIT commands are sent in one pipelined round trip rather than
        one round trip per bit.
        """
        pipe = self.redis_client.pipeline()
        for offset in range(start, end + 1):
            pipe.setbit(self.key, offset, value)
        pipe.execute()

    def get_range(self, start: int, end: int) -> List[int]:
        """Return the bits in the inclusive bit range [start, end], in one round trip."""
        pipe = self.redis_client.pipeline()
        for offset in range(start, end + 1):
            pipe.getbit(self.key, offset)
        return list(pipe.execute())

    def clear(self) -> bool:
        """Delete the key; return True if something was deleted."""
        return self.redis_client.delete(self.key) > 0

    def size(self) -> int:
        """Return the bitmap size in bytes.

        Uses STRLEN instead of GET. GET returns the raw bytes, and a client
        created with ``decode_responses=True`` would try to decode them as
        text, which fails or miscounts for arbitrary bit patterns.
        """
        return self.redis_client.strlen(self.key)

    def to_list(self, max_offset: int = None) -> List[int]:
        """Return the bits from offset 0 up to ``max_offset`` (inclusive).

        When ``max_offset`` is None, the list runs up to and including the
        *last* bit set to 1. An empty list is returned if no bit is set.
        """
        if max_offset is None:
            byte_length = self.redis_client.strlen(self.key)
            if byte_length == 0:
                return []
            bits = self.get_range(0, byte_length * 8 - 1)
            # Drop the trailing zeros after the last set bit.
            while bits and bits[-1] == 0:
                bits.pop()
            return bits

        return self.get_range(0, max_offset)

class BitmapOperations:
    """BITOP-based set algebra over several bitmap keys."""

    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client

    def bitop_and(self, dest_key: str, *keys: str) -> int:
        """Store the bitwise AND of ``keys`` in ``dest_key``; return BITOP's reply."""
        return self.redis_client.bitop('AND', dest_key, *keys)

    def bitop_or(self, dest_key: str, *keys: str) -> int:
        """Store the bitwise OR of ``keys`` in ``dest_key``; return BITOP's reply."""
        return self.redis_client.bitop('OR', dest_key, *keys)

    def bitop_xor(self, dest_key: str, *keys: str) -> int:
        """Store the bitwise XOR of ``keys`` in ``dest_key``; return BITOP's reply."""
        return self.redis_client.bitop('XOR', dest_key, *keys)

    def bitop_not(self, dest_key: str, key: str) -> int:
        """Store the bitwise NOT of ``key`` in ``dest_key``; return BITOP's reply."""
        return self.redis_client.bitop('NOT', dest_key, key)

    def intersection(self, *keys: str) -> Set[int]:
        """Return the bit offsets that are set in *every* one of ``keys``."""
        return self._combine(self.bitop_and, "and", keys)

    def union(self, *keys: str) -> Set[int]:
        """Return the bit offsets that are set in *at least one* of ``keys``."""
        return self._combine(self.bitop_or, "or", keys)

    def _combine(self, bitop, label: str, keys: Tuple[str, ...]) -> Set[int]:
        """Apply ``bitop`` into a temporary key, collect its set bits, then delete it."""
        import uuid

        if not keys:
            return set()

        # A random suffix keeps concurrent callers from sharing a temp key.
        # A seconds-resolution timestamp would not guarantee that.
        temp_key = f"temp_{label}_{uuid.uuid4().hex}"
        try:
            bitop(temp_key, *keys)
            return self._set_bit_offsets(temp_key)
        finally:
            self.redis_client.delete(temp_key)

    def _set_bit_offsets(self, key: str) -> Set[int]:
        """Return every bit offset whose value is 1 in ``key``.

        BITPOS's ``start`` argument is a *byte* index, not a bit index.
        BITPOS is therefore only used to jump to the next byte that contains
        a 1. The remaining bits of that byte are then read with one pipelined
        batch of GETBITs.
        """
        offsets: Set[int] = set()
        byte_start = 0
        while True:
            pos = self.redis_client.bitpos(key, 1, byte_start)
            if pos == -1:
                break
            offsets.add(pos)
            byte_index = pos // 8
            # Bits of this byte before ``pos`` are 0, because BITPOS returned
            # the first 1 at or after byte_start. Only the bits after it need
            # checking.
            tail = range(pos + 1, byte_index * 8 + 8)
            if tail:
                pipe = self.redis_client.pipeline(transaction=False)
                for offset in tail:
                    pipe.getbit(key, offset)
                offsets.update(
                    offset for offset, bit in zip(tail, pipe.execute()) if bit
                )
            byte_start = byte_index + 1
        return offsets

# 使用示例
if __name__ == "__main__":
    redis_client = redis.Redis(decode_responses=True)

    user_activity = RedisBitmap(redis_client, "user:1001:activity")
    user1 = RedisBitmap(redis_client, "user:1001:signin")
    user2 = RedisBitmap(redis_client, "user:1002:signin")

    try:
        # Mark activity on days 1, 3 and 6 (bit offsets are 0-based).
        user_activity.set_bit(0, 1)
        user_activity.set_bit(2, 1)
        user_activity.set_bit(5, 1)

        # Query single days.
        print(f"第1天活跃: {user_activity.get_bit(0)}")
        print(f"第2天活跃: {user_activity.get_bit(1)}")

        # Count active days.
        total_active_days = user_activity.count()
        print(f"总活跃天数: {total_active_days}")

        # First active day.
        first_active = user_activity.find_first(1)
        print(f"第一个活跃天: {first_active}")

        # Sign-in data: user1 signs in on days 1-3, user2 on days 2 and 4.
        user1.set_range(0, 2, 1)
        user2.set_bit(1, 1)
        user2.set_bit(3, 1)

        bitops = BitmapOperations(redis_client)

        # Days on which both users signed in.
        common_days = bitops.intersection("user:1001:signin", "user:1002:signin")
        print(f"共同签到天数: {common_days}")

        # Days on which at least one user signed in.
        any_signin_days = bitops.union("user:1001:signin", "user:1002:signin")
        print(f"任一用户签到天数: {any_signin_days}")
    finally:
        # Remove the demo keys even if one of the steps above raised.
        user_activity.clear()
        user1.clear()
        user2.clear()

7.3.2 用户签到系统

import redis
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from enum import Enum

class SigninRewardType(Enum):
    """Category of a sign-in reward; each member's value is its lowercase name."""

    POINTS = "points"
    COINS = "coins"
    ITEMS = "items"

@dataclass
class SigninReward:
    """One reward entry produced by a sign-in."""

    reward_type: SigninRewardType  # category of the reward
    amount: int  # quantity of ``reward_type`` granted
    description: str  # human-readable label shown to the user

@dataclass
class SigninStats:
    """Aggregated sign-in figures for one user."""

    total_days: int  # total number of days signed in
    current_streak: int  # length of the current consecutive run
    max_streak: int  # longest consecutive run found
    this_month_days: int  # days signed in during the current month
    last_signin_date: Optional[datetime]  # None when no sign-in was found

class UserSigninSystem:
    """Daily sign-in tracking backed by one Redis bitmap per user per month.

    Each month lives under ``signin:{user_id}:{year}:{month:02d}``. Bit
    ``day - 1`` is set when the user signed in on that day of the month.
    """

    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
        self.signin_prefix = "signin"
        self.user_stats_prefix = "signin_stats"  # not used by any method below

        # Streak length -> reward. Key 1 is the one-off first-sign-in reward.
        # The other entries are paid whenever the current streak reaches a
        # multiple of their length (see _calculate_rewards).
        self.daily_rewards = {
            1: SigninReward(SigninRewardType.POINTS, 10, "首次签到奖励"),
            7: SigninReward(SigninRewardType.COINS, 50, "连续7天奖励"),
            30: SigninReward(SigninRewardType.ITEMS, 1, "连续30天奖励")
        }

    def _get_signin_key(self, user_id: str, year: int, month: int) -> str:
        """Return the bitmap key holding ``user_id``'s sign-ins for one month."""
        return f"{self.signin_prefix}:{user_id}:{year}:{month:02d}"

    def _get_day_offset(self, date: datetime) -> int:
        """Return the 0-based bit offset of ``date`` inside its month's bitmap."""
        return date.day - 1

    def _signed_in_flags(self, user_id: str, dates: List[datetime]) -> List[bool]:
        """Return the sign-in flag for each of ``dates``, fetched in one pipelined round trip."""
        if not dates:
            return []
        pipe = self.redis_client.pipeline(transaction=False)
        for day in dates:
            pipe.getbit(self._get_signin_key(user_id, day.year, day.month),
                        self._get_day_offset(day))
        return [bool(bit) for bit in pipe.execute()]

    def signin(self, user_id: str, date: datetime = None) -> Dict[str, any]:
        """Record a sign-in for ``user_id`` on ``date`` (defaults to now).

        Returns a dict with ``success``, ``message`` and ``date``. On success
        it also contains the refreshed ``stats`` and the earned ``rewards``.
        """
        if date is None:
            date = datetime.now()

        signin_key = self._get_signin_key(user_id, date.year, date.month)

        # SETBIT returns the previous bit, so check-and-set is one atomic
        # command. Two concurrent sign-ins for the same day cannot both win.
        already_signed = self.redis_client.setbit(signin_key, self._get_day_offset(date), 1)
        if already_signed:
            return {
                'success': False,
                'message': '今日已签到',
                'date': date.strftime('%Y-%m-%d')
            }

        # Keep each monthly bitmap for 400 days (about 13 months).
        self.redis_client.expire(signin_key, 86400 * 400)

        stats = self._update_stats(user_id, date)
        rewards = self._calculate_rewards(stats)

        return {
            'success': True,
            'message': '签到成功',
            'date': date.strftime('%Y-%m-%d'),
            'stats': stats,
            'rewards': rewards
        }

    def is_signed_in(self, user_id: str, date: datetime = None) -> bool:
        """Return whether ``user_id`` signed in on ``date`` (defaults to now)."""
        if date is None:
            date = datetime.now()

        signin_key = self._get_signin_key(user_id, date.year, date.month)
        return bool(self.redis_client.getbit(signin_key, self._get_day_offset(date)))

    def get_month_signin_status(self, user_id: str, year: int, month: int) -> List[bool]:
        """Return one flag per day of the month (index 0 = day 1), in one round trip."""
        import calendar

        signin_key = self._get_signin_key(user_id, year, month)
        days_in_month = calendar.monthrange(year, month)[1]

        pipe = self.redis_client.pipeline(transaction=False)
        for day in range(days_in_month):
            pipe.getbit(signin_key, day)
        return [bool(bit) for bit in pipe.execute()]

    def get_signin_calendar(self, user_id: str, year: int, month: int) -> Dict[str, any]:
        """Return a week-by-week calendar of the month with per-day sign-in flags.

        Padding cells from ``calendar.monthcalendar`` are returned as
        ``{'day': 0, 'signed': False}``.
        """
        import calendar

        signin_status = self.get_month_signin_status(user_id, year, month)
        signin_key = self._get_signin_key(user_id, year, month)
        month_signin_count = self.redis_client.bitcount(signin_key)

        calendar_data = []
        for week in calendar.monthcalendar(year, month):
            week_data = []
            for day in week:
                if day == 0:
                    week_data.append({'day': 0, 'signed': False})
                else:
                    signed = signin_status[day - 1] if day <= len(signin_status) else False
                    week_data.append({'day': day, 'signed': signed})
            calendar_data.append(week_data)

        return {
            'year': year,
            'month': month,
            'calendar': calendar_data,
            'month_signin_count': month_signin_count,
            'total_days': len(signin_status)
        }

    def get_signin_stats(self, user_id: str) -> SigninStats:
        """Compute sign-in statistics for ``user_id``.

        ``total_days`` only covers the previous and the current calendar year
        (months after the current one are skipped).
        """
        now = datetime.now()

        total_days = 0
        for year in range(now.year - 1, now.year + 1):
            for month in range(1, 13):
                if year == now.year and month > now.month:
                    break
                signin_key = self._get_signin_key(user_id, year, month)
                total_days += self.redis_client.bitcount(signin_key)

        current_streak = self._calculate_current_streak(user_id, now)
        max_streak = self._calculate_max_streak(user_id)

        this_month_key = self._get_signin_key(user_id, now.year, now.month)
        this_month_days = self.redis_client.bitcount(this_month_key)

        last_signin_date = self._get_last_signin_date(user_id)

        return SigninStats(
            total_days=total_days,
            current_streak=current_streak,
            max_streak=max_streak,
            this_month_days=this_month_days,
            last_signin_date=last_signin_date
        )

    def _calculate_current_streak(self, user_id: str, end_date: datetime) -> int:
        """Count consecutive signed-in days ending at ``end_date`` (looks back at most 365 days)."""
        dates = [end_date - timedelta(days=i) for i in range(365)]
        streak = 0
        for signed in self._signed_in_flags(user_id, dates):
            if not signed:
                break
            streak += 1
        return streak

    def _calculate_max_streak(self, user_id: str) -> int:
        """Return the longest consecutive run over the last 365 days (plus today).

        This is a simplified version; older history is not examined.
        """
        now = datetime.now()
        # Oldest first: now-365d, now-364d, ..., now.
        dates = [now - timedelta(days=365 - i) for i in range(366)]

        max_streak = 0
        run = 0
        for signed in self._signed_in_flags(user_id, dates):
            run = run + 1 if signed else 0
            max_streak = max(max_streak, run)
        return max_streak

    def _get_last_signin_date(self, user_id: str) -> Optional[datetime]:
        """Return the most recent signed-in day within the last 365 days, or None.

        The returned datetime keeps the current time of day; only its date
        part is meaningful.
        """
        now = datetime.now()
        dates = [now - timedelta(days=i) for i in range(365)]
        for day, signed in zip(dates, self._signed_in_flags(user_id, dates)):
            if signed:
                return day
        return None

    def _update_stats(self, user_id: str, date: datetime) -> SigninStats:
        """Return fresh stats; ``date`` is currently unused (stats are recomputed, not cached)."""
        return self.get_signin_stats(user_id)

    def _calculate_rewards(self, stats: SigninStats) -> List[SigninReward]:
        """Return the rewards earned by the sign-in that produced ``stats``.

        Every sign-in earns the daily reward. The 1-day entry of
        ``daily_rewards`` is the one-off first-sign-in reward, paid when
        ``total_days == 1``. Each other entry is paid when the current streak
        is a positive multiple of its length.
        """
        rewards = [SigninReward(SigninRewardType.POINTS, 10, "每日签到奖励")]

        for days, reward in self.daily_rewards.items():
            if days == 1:
                # Applying the streak rule here would match every streak
                # (n % 1 == 0). The "first sign-in" reward would then be paid
                # on every single sign-in.
                if stats.total_days == 1:
                    rewards.append(reward)
            elif stats.current_streak >= days and stats.current_streak % days == 0:
                rewards.append(reward)

        return rewards

    def get_signin_ranking(self, limit: int = 10) -> List[Dict[str, any]]:
        """Return the top ``limit`` users of the ``signin_ranking`` sorted set.

        The sorted set is only written by ``update_ranking``; ``signin`` does
        not call it.
        """
        ranking_key = "signin_ranking"
        results = self.redis_client.zrevrange(ranking_key, 0, limit-1, withscores=True)

        ranking = []
        for i, (user_id, score) in enumerate(results):
            ranking.append({
                'rank': i + 1,
                'user_id': user_id,
                'total_signin_days': int(score)
            })

        return ranking

    def update_ranking(self, user_id: str):
        """Store ``user_id``'s current ``total_days`` as their ranking score."""
        stats = self.get_signin_stats(user_id)
        ranking_key = "signin_ranking"
        self.redis_client.zadd(ranking_key, {user_id: stats.total_days})

# 使用示例
if __name__ == "__main__":
    redis_client = redis.Redis(decode_responses=True)
    signin_system = UserSigninSystem(redis_client)
    user_id = "user_1001"

    # Sign in, then confirm today's flag.
    result = signin_system.signin(user_id)
    print(f"签到结果: {result}")
    is_signed = signin_system.is_signed_in(user_id)
    print(f"今日已签到: {is_signed}")

    # Summary of this month's calendar.
    now = datetime.now()
    calendar_data = signin_system.get_signin_calendar(user_id, now.year, now.month)
    print("\n本月签到日历:")
    print("签到天数: {}/{}".format(calendar_data['month_signin_count'],
                                 calendar_data['total_days']))

    # Overall statistics, printed as one block of lines.
    stats = signin_system.get_signin_stats(user_id)
    summary_lines = [
        "\n签到统计:",
        f"总签到天数: {stats.total_days}",
        f"当前连续: {stats.current_streak}天",
        f"最大连续: {stats.max_streak}天",
        f"本月签到: {stats.this_month_days}天",
    ]
    if stats.last_signin_date:
        summary_lines.append(f"最后签到: {stats.last_signin_date.strftime('%Y-%m-%d')}")
    print("\n".join(summary_lines))

7.3.3 实时活跃用户统计

import redis
from datetime import datetime, timedelta
from typing import Dict, List, Set, Optional
from dataclasses import dataclass
from enum import Enum

class TimeWindow(Enum):
    """Aggregation granularity for activity bitmaps; values are lowercase names."""

    HOURLY = "hourly"
    DAILY = "daily"
    WEEKLY = "weekly"
    MONTHLY = "monthly"

@dataclass
class ActivityStats:
    """Activity figures for one time window."""

    window: TimeWindow  # granularity these figures refer to
    timestamp: datetime  # point in time that selects the window
    active_users: int  # users marked active in the window
    new_users: int  # users counted as new
    returning_users: int  # users counted as returning

class RealTimeActivityTracker:
    """Track active users per hour/day/week/month with one Redis bitmap per window.

    Each user is mapped to a fixed bit offset (see ``_get_user_offset``). That
    bit is set in every window bitmap the activity timestamp falls into.
    """

    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
        self.activity_prefix = "activity"
        self.user_first_seen_prefix = "user_first_seen"

    def _get_activity_key(self, window: TimeWindow, timestamp: datetime) -> str:
        """Return the bitmap key for the ``window`` period containing ``timestamp``.

        Weekly keys are named after the Monday that starts the week.
        """
        if window == TimeWindow.HOURLY:
            return f"{self.activity_prefix}:hourly:{timestamp.strftime('%Y%m%d%H')}"
        elif window == TimeWindow.DAILY:
            return f"{self.activity_prefix}:daily:{timestamp.strftime('%Y%m%d')}"
        elif window == TimeWindow.WEEKLY:
            week_start = timestamp - timedelta(days=timestamp.weekday())
            return f"{self.activity_prefix}:weekly:{week_start.strftime('%Y%m%d')}"
        elif window == TimeWindow.MONTHLY:
            return f"{self.activity_prefix}:monthly:{timestamp.strftime('%Y%m')}"

    def _get_user_offset(self, user_id: str) -> int:
        """Map ``user_id`` to a bit offset in [0, 2**20).

        Uses CRC-32 rather than the built-in ``hash()``. ``hash()`` of a str is
        randomized per interpreter process (PYTHONHASHSEED), so the same user
        would land on different bits in different processes or after a
        restart. Distinct users can still collide on one slot, which makes
        the counts approximate (collisions under-count).
        """
        import zlib
        return zlib.crc32(user_id.encode('utf-8')) % (2 ** 20)

    def record_user_activity(self, user_id: str, timestamp: datetime = None) -> Dict[str, bool]:
        """Mark ``user_id`` active at ``timestamp`` (defaults to now) in every window.

        Returns ``{window.value: is_first_activity_in_window}``.
        """
        if timestamp is None:
            timestamp = datetime.now()

        user_offset = self._get_user_offset(user_id)
        retention_days = {
            TimeWindow.HOURLY: 7,
            TimeWindow.DAILY: 90,
            TimeWindow.WEEKLY: 365,
            TimeWindow.MONTHLY: 730,
        }

        # One round trip for all windows. SETBIT returns the previous bit, so
        # "was the user already active?" is answered atomically by the write.
        windows = list(TimeWindow)
        pipe = self.redis_client.pipeline(transaction=False)
        for window in windows:
            activity_key = self._get_activity_key(window, timestamp)
            pipe.setbit(activity_key, user_offset, 1)
            pipe.expire(activity_key, 86400 * retention_days[window])
        replies = pipe.execute()
        # Replies alternate SETBIT, EXPIRE for each window; keep the SETBIT ones.
        results = {
            window.value: not bool(previous)
            for window, previous in zip(windows, replies[::2])
        }

        # Record the first-seen time once (kept 2 years). SET NX makes the
        # existence check and the write a single atomic command.
        first_seen_key = f"{self.user_first_seen_prefix}:{user_id}"
        self.redis_client.set(first_seen_key, timestamp.timestamp(), nx=True, ex=86400 * 730)

        return results

    def get_active_users_count(self, window: TimeWindow, timestamp: datetime = None) -> int:
        """Return the number of set bits in the ``window`` bitmap containing ``timestamp``."""
        if timestamp is None:
            timestamp = datetime.now()

        activity_key = self._get_activity_key(window, timestamp)
        return self.redis_client.bitcount(activity_key)

    def get_activity_stats(self, window: TimeWindow, timestamp: datetime = None) -> ActivityStats:
        """Return active/new/returning counts for the ``window`` containing ``timestamp``.

        New/returning are only computed for DAILY (relative to the previous
        day) and are 0 for other windows. "Returning" means active on both
        days; "new" means active today but not yesterday.
        """
        import uuid

        if timestamp is None:
            timestamp = datetime.now()

        current_key = self._get_activity_key(window, timestamp)
        active_users = self.redis_client.bitcount(current_key)

        new_users = 0
        returning_users = 0

        if window == TimeWindow.DAILY:
            yesterday_key = self._get_activity_key(window, timestamp - timedelta(days=1))

            # A random suffix prevents concurrent callers from sharing the
            # temp key.
            temp_key = f"temp_returning_{uuid.uuid4().hex}"
            try:
                self.redis_client.bitop('AND', temp_key, current_key, yesterday_key)
                returning_users = self.redis_client.bitcount(temp_key)
            finally:
                self.redis_client.delete(temp_key)

            # |today AND NOT yesterday| == |today| - |today AND yesterday|.
            # BITOP NOT only inverts yesterday's bitmap up to its own length,
            # and gives an empty result when yesterday's key is missing. Users
            # at higher offsets were therefore never counted as new.
            new_users = active_users - returning_users

        return ActivityStats(
            window=window,
            timestamp=timestamp,
            active_users=active_users,
            new_users=new_users,
            returning_users=returning_users
        )

    def get_activity_trend(self, window: TimeWindow, days: int = 7) -> List[ActivityStats]:
        """Return stats for the last ``days`` periods of ``window``, oldest first.

        Despite its name, ``days`` is the number of periods: hours for HOURLY,
        days for DAILY, weeks for WEEKLY and calendar months for MONTHLY.
        """
        trends = []
        now = datetime.now()

        for i in range(days):
            if window == TimeWindow.DAILY:
                target_date = now - timedelta(days=i)
            elif window == TimeWindow.HOURLY:
                target_date = now - timedelta(hours=i)
            elif window == TimeWindow.MONTHLY:
                # Step back whole calendar months. Stepping by 7 days would
                # select the same month several times in a row.
                month_index = now.year * 12 + (now.month - 1) - i
                target_date = now.replace(year=month_index // 12,
                                          month=month_index % 12 + 1, day=1)
            else:
                target_date = now - timedelta(days=i * 7)

            trends.append(self.get_activity_stats(window, target_date))

        return list(reversed(trends))

    def get_user_activity_pattern(self, user_id: str, days: int = 30) -> Dict[str, any]:
        """Return ``user_id``'s daily activity over the last ``days`` days (oldest first).

        ``activity_rate`` is 0.0 when ``days`` is not positive.
        """
        user_offset = self._get_user_offset(user_id)
        now = datetime.now()
        # Oldest first: now-(days-1), ..., now.
        dates = [now - timedelta(days=i) for i in range(days - 1, -1, -1)]

        pipe = self.redis_client.pipeline(transaction=False)
        for date in dates:
            pipe.getbit(self._get_activity_key(TimeWindow.DAILY, date), user_offset)
        flags = pipe.execute()

        pattern = [
            {'date': date.strftime('%Y-%m-%d'), 'active': bool(flag)}
            for date, flag in zip(dates, flags)
        ]

        active_days = sum(1 for p in pattern if p['active'])

        max_streak = 0
        current_streak = 0
        for p in pattern:
            current_streak = current_streak + 1 if p['active'] else 0
            max_streak = max(max_streak, current_streak)

        first_seen_key = f"{self.user_first_seen_prefix}:{user_id}"
        first_seen_timestamp = self.redis_client.get(first_seen_key)
        first_seen = None
        if first_seen_timestamp:
            first_seen = datetime.fromtimestamp(float(first_seen_timestamp))

        return {
            'user_id': user_id,
            'days_analyzed': days,
            'active_days': active_days,
            'activity_rate': active_days / days if days > 0 else 0.0,
            'max_consecutive_days': max_streak,
            'first_seen': first_seen.strftime('%Y-%m-%d') if first_seen else None,
            'pattern': pattern
        }

    def get_cohort_analysis(self, start_date: datetime, cohort_size_days: int = 7) -> Dict[str, any]:
        """Return cohort boundaries from ``start_date`` onward (placeholder).

        Cohort membership and retention are not implemented yet:
        ``initial_users`` is always 0 and ``retention`` is always empty.
        """
        cohorts = []
        current_date = start_date
        end_date = datetime.now() - timedelta(days=cohort_size_days)

        while current_date <= end_date:
            cohort_end = current_date + timedelta(days=cohort_size_days - 1)
            # TODO: derive cohort members from the first-seen timestamps.
            cohorts.append({
                'cohort_start': current_date.strftime('%Y-%m-%d'),
                'cohort_end': cohort_end.strftime('%Y-%m-%d'),
                'initial_users': 0,
                'retention': []
            })
            current_date += timedelta(days=cohort_size_days)

        return {
            'cohort_size_days': cohort_size_days,
            'cohorts': cohorts
        }

    def cleanup_old_data(self, days_to_keep: int = 90):
        """Delete hourly and daily keys just older than ``days_to_keep`` days.

        Hourly keys are removed for one extra week and daily keys for one
        extra month beyond the cutoff. Each group is removed with a single DEL.
        """
        now = datetime.now()

        hourly_keys = [
            self._get_activity_key(TimeWindow.HOURLY, now - timedelta(hours=i))
            for i in range(days_to_keep * 24, days_to_keep * 24 + 168)
        ]
        if hourly_keys:
            self.redis_client.delete(*hourly_keys)

        daily_keys = [
            self._get_activity_key(TimeWindow.DAILY, now - timedelta(days=i))
            for i in range(days_to_keep, days_to_keep + 30)
        ]
        if daily_keys:
            self.redis_client.delete(*daily_keys)

# 使用示例
if __name__ == "__main__":
    redis_client = redis.Redis(decode_responses=True)
    tracker = RealTimeActivityTracker(redis_client)

    # 100 candidate users; the first 50 are marked active today.
    users = [f"user_{i}" for i in range(1000, 1100)]
    for user_id in users[:50]:
        result = tracker.record_user_activity(user_id)
        print(f"用户 {user_id} 活跃记录: {result}")

    # Daily active users.
    daily_active = tracker.get_active_users_count(TimeWindow.DAILY)
    print(f"\n今日活跃用户数: {daily_active}")

    # Detailed daily figures.
    stats = tracker.get_activity_stats(TimeWindow.DAILY)
    print("今日统计:")
    for label, value in (("活跃用户", stats.active_users),
                         ("新用户", stats.new_users),
                         ("回归用户", stats.returning_users)):
        print(f"  {label}: {value}")

    # Seven-day trend.
    trends = tracker.get_activity_trend(TimeWindow.DAILY, days=7)
    print("\n7天活跃趋势:")
    for trend in trends:
        day_label = trend.timestamp.strftime('%Y-%m-%d')
        print(f"  {day_label}: {trend.active_users} 活跃用户")

    # One user's activity pattern.
    user_pattern = tracker.get_user_activity_pattern("user_1001", days=7)
    print("\n用户活跃模式:")
    print(f"  活跃天数: {user_pattern['active_days']}/{user_pattern['days_analyzed']}")
    print(f"  活跃率: {user_pattern['activity_rate']:.2%}")
    print(f"  最大连续活跃: {user_pattern['max_consecutive_days']}天")

7.4 HyperLogLog

7.4.1 HyperLogLog概述

HyperLogLog是一种概率性数据结构,用于估算集合的基数(不重复元素的数量)。它具有以下特点:

  • 内存高效: 每个键最多占用约12KB内存(基数较小时使用稀疏编码,占用更少)
  • 高精度: 标准误差约为0.81%
  • 可合并: 支持多个HyperLogLog合并
  • 适合大数据: 可以处理数十亿级别的数据

7.4.2 HyperLogLog基本操作

# 添加元素
PFADD key element [element ...]
PFADD unique_visitors "user1" "user2" "user3"
PFADD unique_visitors "user1" "user4"  # user1重复,不会增加计数

# 获取基数估算
PFCOUNT key [key ...]
PFCOUNT unique_visitors  # 返回估算的唯一访客数

# 合并多个HyperLogLog
PFMERGE destkey sourcekey [sourcekey ...]
PFMERGE total_visitors page1_visitors page2_visitors
PFCOUNT total_visitors  # 返回合并后的唯一访客数

7.4.3 Python HyperLogLog实现

import redis
from typing import List, Dict, Set, Optional
from datetime import datetime, timedelta
from dataclasses import dataclass
from enum import Enum

class MetricType(Enum):
    """Kind of distinct-count metric; the value is used inside HyperLogLog key names."""

    UV = "unique_visitors"  # distinct visitors
    IP = "unique_ips"  # distinct IP addresses
    USER = "unique_users"  # distinct users
    DEVICE = "unique_devices"  # distinct devices

@dataclass
class UniqueStats:
    """A distinct-count estimate for one metric over one period."""

    metric_type: MetricType  # which metric was counted
    period: str  # period identifier the estimate refers to
    estimated_count: int  # HyperLogLog cardinality estimate
    timestamp: datetime  # when the estimate was taken

class HyperLogLogAnalytics:
    """Unique-count analytics (UV and similar metrics) on Redis HyperLogLog.

    Every metric is kept in one HyperLogLog key per time bucket, named
    ``hll:<metric_type.value>:<period_type>:<period_key>``, for example
    ``hll:unique_visitors:daily:20240131``. The period types and their key
    formats are:

    * hourly: ``%Y%m%d%H``
    * daily: ``%Y%m%d``
    * weekly: ``%Y%m%d`` of that week's Monday
    * monthly: ``%Y%m``
    * yearly: ``%Y``
    """

    # Sliding TTL in seconds, re-applied to a bucket key after every write.
    # 'yearly' is intentionally absent: yearly buckets never expire.
    _PERIOD_TTLS = {
        'hourly': 86400 * 7,     # 7 days
        'daily': 86400 * 90,     # 90 days
        'weekly': 86400 * 365,   # 1 year
        'monthly': 86400 * 730,  # 2 years
    }

    def __init__(self, redis_client: redis.Redis):
        self.redis_client = redis_client
        self.hll_prefix = "hll"

    def _get_hll_key(self, metric_type: MetricType, period: str) -> str:
        """Build the HyperLogLog key for a metric and a "<type>:<key>" period string."""
        return f"{self.hll_prefix}:{metric_type.value}:{period}"

    @staticmethod
    def _period_keys(timestamp: datetime) -> Dict[str, str]:
        """Map every period type to the bucket key that *timestamp* falls into."""
        # weekday() is 0 for Monday, so this is the Monday of timestamp's week.
        week_start = timestamp - timedelta(days=timestamp.weekday())
        return {
            'hourly': timestamp.strftime('%Y%m%d%H'),
            'daily': timestamp.strftime('%Y%m%d'),
            'weekly': week_start.strftime('%Y%m%d'),
            'monthly': timestamp.strftime('%Y%m'),
            'yearly': timestamp.strftime('%Y'),
        }

    def _add_visitors(self, visitor_ids: List[str], timestamp: Optional[datetime]) -> Dict[str, int]:
        """PFADD *visitor_ids* into every UV bucket of *timestamp*.

        For each period type this refreshes the bucket TTL and reads back the
        current estimate.

        Returns:
            A dict mapping period type ('hourly', 'daily', ...) to the bucket's
            PFCOUNT after the insert.
        """
        if timestamp is None:
            timestamp = datetime.now()

        results = {}
        for period_type, period_key in self._period_keys(timestamp).items():
            hll_key = self._get_hll_key(MetricType.UV, f"{period_type}:{period_key}")

            self.redis_client.pfadd(hll_key, *visitor_ids)

            ttl = self._PERIOD_TTLS.get(period_type)
            if ttl is not None:
                self.redis_client.expire(hll_key, ttl)

            results[period_type] = self.redis_client.pfcount(hll_key)

        return results

    def add_unique_visitor(self, visitor_id: str, timestamp: Optional[datetime] = None) -> Dict[str, int]:
        """Record one visitor in all UV buckets (defaults to now).

        Returns a mapping of period type to the bucket's current estimate.
        """
        return self._add_visitors([visitor_id], timestamp)

    def add_multiple_visitors(self, visitor_ids: List[str], timestamp: Optional[datetime] = None) -> Dict[str, int]:
        """Record several visitors in all UV buckets with one PFADD per bucket.

        Returns a mapping of period type to the bucket's current estimate.
        """
        return self._add_visitors(visitor_ids, timestamp)

    def get_unique_count(self, metric_type: MetricType, period_type: str, period_key: str) -> int:
        """Return the estimated distinct count of one bucket.

        Redis PFCOUNT returns 0 for a missing key.
        """
        hll_key = self._get_hll_key(metric_type, f"{period_type}:{period_key}")
        return self.redis_client.pfcount(hll_key)

    def get_daily_stats(self, metric_type: MetricType, date: Optional[datetime] = None) -> UniqueStats:
        """Return the daily bucket statistics for *date* (defaults to now)."""
        if date is None:
            date = datetime.now()

        period_key = date.strftime('%Y%m%d')
        count = self.get_unique_count(metric_type, 'daily', period_key)

        return UniqueStats(
            metric_type=metric_type,
            period=f"daily:{period_key}",
            estimated_count=count,
            timestamp=date
        )

    def get_period_trend(self, metric_type: MetricType, period_type: str, days: int = 7) -> List[UniqueStats]:
        """Return the most recent *days* buckets of *period_type*, oldest first.

        Despite its name, *days* is the number of buckets:
        * 'daily' steps back one day per bucket.
        * 'hourly' steps back one hour per bucket.
        * 'monthly' steps back one calendar month; the timestamp is the first of the month.
        * 'yearly' steps back one year; the timestamp is Jan 1.
        * Any other period type is treated as weekly, keyed by Monday.
        """
        trends = []
        now = datetime.now()

        for i in range(days):
            if period_type == 'daily':
                target_date = now - timedelta(days=i)
                period_key = target_date.strftime('%Y%m%d')
            elif period_type == 'hourly':
                target_date = now - timedelta(hours=i)
                period_key = target_date.strftime('%Y%m%d%H')
            elif period_type == 'monthly':
                # Step back i calendar months using a zero-based month index.
                year, month0 = divmod(now.year * 12 + now.month - 1 - i, 12)
                target_date = datetime(year, month0 + 1, 1)
                period_key = target_date.strftime('%Y%m')
            elif period_type == 'yearly':
                target_date = datetime(now.year - i, 1, 1)
                period_key = target_date.strftime('%Y')
            else:
                target_date = now - timedelta(days=i * 7)
                period_key = (target_date - timedelta(days=target_date.weekday())).strftime('%Y%m%d')

            count = self.get_unique_count(metric_type, period_type, period_key)

            trends.append(UniqueStats(
                metric_type=metric_type,
                period=f"{period_type}:{period_key}",
                estimated_count=count,
                timestamp=target_date
            ))

        return list(reversed(trends))

    def merge_periods(self, metric_type: MetricType, period_type: str, period_keys: List[str],
                      dest_key: Optional[str] = None) -> int:
        """Estimate the distinct count across several buckets of one period type.

        Without *dest_key*, a multi-key PFCOUNT computes the union estimate
        directly and writes nothing. This replaces the old timestamp-named temp
        key, which collided between calls in the same second and lingered for
        an hour.

        With *dest_key*, the union is materialized there via PFMERGE. If
        *dest_key* already exists, its contents are part of the union. The
        key is set to expire after one hour.

        Returns 0 if none of the source buckets exist.
        """
        source_keys = [self._get_hll_key(metric_type, f"{period_type}:{key}") for key in period_keys]

        # Only merge buckets that actually exist.
        existing_keys = [key for key in source_keys if self.redis_client.exists(key)]

        if not existing_keys:
            return 0

        if dest_key is None:
            # PFCOUNT with several keys returns the cardinality of their union.
            return self.redis_client.pfcount(*existing_keys)

        self.redis_client.pfmerge(dest_key, *existing_keys)
        merged_count = self.redis_client.pfcount(dest_key)

        # The materialized result is temporary: expire it after one hour.
        self.redis_client.expire(dest_key, 3600)

        return merged_count

    def get_weekly_unique_visitors(self, start_date: Optional[datetime] = None) -> int:
        """Estimate the UV over the 7 daily buckets starting at *start_date*.

        Defaults to the Monday of the current week.
        """
        if start_date is None:
            now = datetime.now()  # read the clock once so both uses agree
            start_date = now - timedelta(days=now.weekday())

        daily_keys = [(start_date + timedelta(days=i)).strftime('%Y%m%d') for i in range(7)]

        return self.merge_periods(MetricType.UV, 'daily', daily_keys)

    def get_monthly_unique_visitors(self, year: Optional[int] = None, month: Optional[int] = None) -> int:
        """Estimate the UV over all daily buckets of the given month.

        A missing *year* or *month* is taken from the current date
        independently. Previously, supplying only one of them discarded it.
        """
        import calendar

        now = datetime.now()
        if year is None:
            year = now.year
        if month is None:
            month = now.month

        days_in_month = calendar.monthrange(year, month)[1]
        daily_keys = [datetime(year, month, day).strftime('%Y%m%d') for day in range(1, days_in_month + 1)]

        return self.merge_periods(MetricType.UV, 'daily', daily_keys)

    def compare_periods(self, metric_type: MetricType, period1: str, period2: str,
                        period_type: str = 'daily') -> Dict[str, Any]:
        """Compare two buckets of the same period type (default 'daily').

        Returns a dict with these entries:
        * 'period1' and 'period2': each a dict of the period key and its count.
        * 'merged_count': the union estimate, or 0 unless both buckets exist.
        * 'overlap_estimate': the estimated shared members.
        * 'overlap_rate': the overlap divided by the larger count.
        * 'growth_rate': (count2 - count1) / count1, or 0 when count1 is 0.
        """
        count1 = self.get_unique_count(metric_type, period_type, period1)
        count2 = self.get_unique_count(metric_type, period_type, period2)

        key1 = self._get_hll_key(metric_type, f"{period_type}:{period1}")
        key2 = self._get_hll_key(metric_type, f"{period_type}:{period2}")

        if self.redis_client.exists(key1) and self.redis_client.exists(key2):
            # Multi-key PFCOUNT gives the union estimate without a temp key.
            merged_count = self.redis_client.pfcount(key1, key2)

            # Inclusion-exclusion. HLL estimation error can push the raw value
            # below 0 or above the smaller set, so clamp it to a valid range.
            raw_overlap = count1 + count2 - merged_count
            overlap_estimate = max(0, min(raw_overlap, count1, count2))
            larger = max(count1, count2)
            overlap_rate = overlap_estimate / larger if larger > 0 else 0
        else:
            merged_count = 0
            overlap_estimate = 0
            overlap_rate = 0

        return {
            'period1': {'period': period1, 'count': count1},
            'period2': {'period': period2, 'count': count2},
            'merged_count': merged_count,
            'overlap_estimate': overlap_estimate,
            'overlap_rate': overlap_rate,
            'growth_rate': (count2 - count1) / count1 if count1 > 0 else 0
        }

    def get_analytics_dashboard(self) -> Dict[str, Any]:
        """Collect summary UV figures plus 7-day and 24-hour UV trends."""
        now = datetime.now()
        today = now.strftime('%Y%m%d')
        yesterday = (now - timedelta(days=1)).strftime('%Y%m%d')

        # Today and yesterday.
        today_uv = self.get_unique_count(MetricType.UV, 'daily', today)
        yesterday_uv = self.get_unique_count(MetricType.UV, 'daily', yesterday)

        # Current week and current month.
        week_uv = self.get_weekly_unique_visitors()
        month_uv = self.get_monthly_unique_visitors()

        # 7-day and 24-hour trends.
        daily_trend = self.get_period_trend(MetricType.UV, 'daily', 7)
        hourly_trend = self.get_period_trend(MetricType.UV, 'hourly', 24)

        return {
            'summary': {
                'today_uv': today_uv,
                'yesterday_uv': yesterday_uv,
                'week_uv': week_uv,
                'month_uv': month_uv,
                'daily_growth': (today_uv - yesterday_uv) / yesterday_uv if yesterday_uv > 0 else 0
            },
            'trends': {
                'daily': [{
                    'date': trend.timestamp.strftime('%Y-%m-%d'),
                    'uv': trend.estimated_count
                } for trend in daily_trend],
                'hourly': [{
                    'hour': trend.timestamp.strftime('%Y-%m-%d %H:00'),
                    'uv': trend.estimated_count
                } for trend in hourly_trend]
            }
        }

# Usage example
if __name__ == "__main__":
    redis_client = redis.Redis(decode_responses=True)
    analytics = HyperLogLogAnalytics(redis_client)

    # Today's simulated visitors: visitor_1000 .. visitor_1499
    today_visitors = [f"visitor_{i}" for i in range(1000, 1500)]
    result = analytics.add_multiple_visitors(today_visitors)
    print(f"今日访客添加结果: {result}")

    # Yesterday's simulated visitors: visitor_800 .. visitor_1199.
    # IDs 1000-1199 also visited today, so the comparison below shows overlap.
    yesterday = datetime.now() - timedelta(days=1)
    yesterday_visitors = [f"visitor_{i}" for i in range(800, 1200)]

    # One batched PFADD per bucket instead of a round trip per visitor;
    # the resulting HyperLogLog state is identical.
    analytics.add_multiple_visitors(yesterday_visitors, yesterday)

    # Per-day statistics
    today_stats = analytics.get_daily_stats(MetricType.UV)
    print(f"\n今日独立访客: {today_stats.estimated_count}")

    yesterday_stats = analytics.get_daily_stats(MetricType.UV, yesterday)
    print(f"昨日独立访客: {yesterday_stats.estimated_count}")

    # Unique visitors for the current week (Monday onward)
    week_uv = analytics.get_weekly_unique_visitors()
    print(f"本周独立访客: {week_uv}")

    # 7-day trend
    trends = analytics.get_period_trend(MetricType.UV, 'daily', 7)
    print(f"\n7天访客趋势:")
    for trend in trends:
        print(f"  {trend.timestamp.strftime('%Y-%m-%d')}: {trend.estimated_count}")

    # Compare yesterday (period1) with today (period2)
    today_key = datetime.now().strftime('%Y%m%d')
    yesterday_key = yesterday.strftime('%Y%m%d')
    comparison = analytics.compare_periods(MetricType.UV, yesterday_key, today_key)
    print(f"\n今日vs昨日比较:")
    print(f"  昨日: {comparison['period1']['count']}")
    print(f"  今日: {comparison['period2']['count']}")
    print(f"  增长率: {comparison['growth_rate']:.2%}")
    print(f"  重叠率: {comparison['overlap_rate']:.2%}")

    # Dashboard summary
    dashboard = analytics.get_analytics_dashboard()
    print(f"\n仪表板数据:")
    print(f"  今日UV: {dashboard['summary']['today_uv']}")
    print(f"  本周UV: {dashboard['summary']['week_uv']}")
    print(f"  本月UV: {dashboard['summary']['month_uv']}")
    print(f"  日增长率: {dashboard['summary']['daily_growth']:.2%}")

7.5 总结

通过本章的学习,我们深入了解了Redis的位图和HyperLogLog两种特殊数据结构:

7.5.1 位图特点和应用

  • 高效存储: 每个位只占用1bit,适合大规模布尔值存储
  • 快速操作: 支持位级别的快速操作和统计
  • 应用场景: 用户签到、在线状态、权限控制、活跃统计等

7.5.2 HyperLogLog特点和应用

  • 内存占用小: 每个键最多约12KB内存,可处理数十亿级别的数据
  • 高精度: 基数估算的标准误差约为0.81%
  • 可合并: 支持多个HyperLogLog的合并操作
  • 应用场景: 独立访客统计、去重计数、大数据分析等

7.5.3 最佳实践

  • 合理选择: 根据精度要求选择位图或HyperLogLog
  • 内存优化: 位图适合ID连续、较密集的数据(位图大小取决于最大偏移量,稀疏的大偏移量会浪费内存);HyperLogLog适合只需近似去重计数的大基数统计
  • 过期策略: 合理设置数据过期时间,避免内存泄漏
  • 批量操作: 使用管道或批量命令提高性能
  • 监控告警: 监控内存使用和操作性能

7.5.4 性能考虑

  • 位图性能: 位操作时间复杂度为O(1),统计操作为O(n)
  • HyperLogLog性能: PFADD和单键PFCOUNT为O(1);多键PFCOUNT需要先合并,开销与键的数量成正比
  • 内存使用: 位图内存使用与最大偏移量相关,HyperLogLog每个键最多约12KB
  • 网络开销: 批量操作可以减少网络往返次数

7.5.5 参考资料


下一章预告: 在下一章中,我们将学习Redis的发布订阅功能,了解如何实现实时消息传递和事件通知系统。