Redis作为内存数据库,在生产环境中需要特别关注安全配置。本章将详细介绍Redis的安全机制、配置方法和最佳实践。

## 12.1 Redis安全概述

### 12.1.1 安全威胁

  1. 未授权访问:默认配置下Redis不需要密码
  2. 数据泄露:敏感数据可能被恶意访问
  3. 命令注入:危险命令可能被滥用
  4. 网络攻击:未加密的网络传输
  5. 权限提升:利用Redis进行系统攻击

### 12.1.2 安全原则

  1. 最小权限原则:只授予必要的权限
  2. 深度防御:多层安全防护
  3. 定期审计:监控和审计访问日志
  4. 及时更新:保持Redis版本更新
  5. 网络隔离:限制网络访问

## 12.2 身份认证和授权

### 12.2.1 密码认证

import redis
import hashlib
import secrets
from typing import Dict, List, Optional, Any
import logging
from datetime import datetime, timedelta

class RedisAuthManager:
    """Manage the Redis ``requirepass`` password: policy, hashing, rotation, expiry.

    Password-history entries are stored as JSON under ``redis:password_history``
    and security events as a capped list under ``redis:security_events``, both
    through the client passed to the constructor.
    """

    # Characters counted as "special" by the policy. Shared by generation and
    # validation so the two can never disagree.
    _SPECIAL_CHARS = '!@#$%^&*()_+-=[]{}|;:,.<>?'

    # Iteration count used when creating new PBKDF2-HMAC-SHA256 hashes.
    _PBKDF2_ITERATIONS = 100000

    def __init__(self, redis_client: "redis.Redis"):
        """Store the client and set up the logger and the password policy."""
        self.redis = redis_client
        self.logger = logging.getLogger(__name__)

        # Password policy configuration
        self.password_policy = {
            'min_length': 12,
            'require_uppercase': True,
            'require_lowercase': True,
            'require_numbers': True,
            'require_special_chars': True,
            'max_age_days': 90,
            'history_count': 5
        }

    def generate_secure_password(self, length: int = 16) -> str:
        """Generate a random password that satisfies ``self.password_policy``.

        Args:
            length: Number of characters in the generated password.

        Returns:
            A password drawn with ``secrets.choice`` that passes
            ``_validate_password_policy``.

        Raises:
            ValueError: If ``length`` is too small for any password of that
                length to satisfy the policy. Without this check the
                generate-and-validate loop below would never terminate.
        """
        import string

        policy = self.password_policy
        required_classes = sum(
            1
            for flag in ('require_uppercase', 'require_lowercase',
                         'require_numbers', 'require_special_chars')
            if policy[flag]
        )
        minimum_length = max(policy['min_length'], required_classes)
        if length < minimum_length:
            raise ValueError(
                f'length must be at least {minimum_length} to satisfy the password policy'
            )

        alphabet = (
            string.ascii_lowercase +
            string.ascii_uppercase +
            string.digits +
            self._SPECIAL_CHARS
        )

        # Rejection sampling: draw candidates until one meets the policy.
        while True:
            password = ''.join(secrets.choice(alphabet) for _ in range(length))
            if self._validate_password_policy(password):
                return password

    def _validate_password_policy(self, password: str) -> bool:
        """Return True when ``password`` meets every enabled policy rule."""
        policy = self.password_policy

        if len(password) < policy['min_length']:
            return False

        if policy['require_uppercase'] and not any(c.isupper() for c in password):
            return False

        if policy['require_lowercase'] and not any(c.islower() for c in password):
            return False

        if policy['require_numbers'] and not any(c.isdigit() for c in password):
            return False

        if policy['require_special_chars']:
            if not any(c in self._SPECIAL_CHARS for c in password):
                return False

        return True

    def hash_password(self, password: str, salt: Optional[str] = None) -> Dict[str, Any]:
        """Hash ``password`` with PBKDF2-HMAC-SHA256.

        Args:
            password: Plaintext password.
            salt: Salt text; a fresh ``secrets.token_hex(32)`` value is used
                when omitted. The salt is fed to PBKDF2 as UTF-8 text.

        Returns:
            Dict with ``hash`` (hex string), ``salt``, ``algorithm`` and
            ``iterations`` (int).
        """
        if salt is None:
            salt = secrets.token_hex(32)

        digest = hashlib.pbkdf2_hmac(
            'sha256',
            password.encode('utf-8'),
            salt.encode('utf-8'),
            self._PBKDF2_ITERATIONS
        )

        return {
            'hash': digest.hex(),
            'salt': salt,
            'algorithm': 'pbkdf2_sha256',
            'iterations': self._PBKDF2_ITERATIONS
        }

    def verify_password(self, password: str, stored_hash: Dict[str, Any]) -> bool:
        """Check ``password`` against a record produced by ``hash_password``.

        The comparison is constant-time so the result does not leak how many
        leading characters of the hash matched.

        Raises:
            KeyError: If ``stored_hash`` lacks ``salt``, ``iterations`` or ``hash``.
        """
        computed = hashlib.pbkdf2_hmac(
            'sha256',
            password.encode('utf-8'),
            stored_hash['salt'].encode('utf-8'),
            # int() tolerates records whose iteration count was stored as text.
            int(stored_hash['iterations'])
        )

        # Compare as bytes: compare_digest rejects non-ASCII str arguments.
        return secrets.compare_digest(
            computed.hex().encode('ascii'),
            str(stored_hash['hash']).encode('utf-8')
        )

    def set_redis_password(self, password: str) -> Dict[str, Any]:
        """Validate ``password`` and apply it via ``CONFIG SET requirepass``.

        Returns:
            ``{'success': True, 'message': ...}`` on success, otherwise
            ``{'success': False, 'error': ...}``. Exceptions are logged and
            reported in the result rather than raised.
        """
        try:
            if not self._validate_password_policy(password):
                return {
                    'success': False,
                    'error': 'Password does not meet policy requirements'
                }

            self.redis.config_set('requirepass', password)

            # Record that a password was configured (never the password itself).
            self._log_security_event('password_set', {
                'timestamp': datetime.now().isoformat(),
                'action': 'password_configured'
            })

            return {
                'success': True,
                'message': 'Password set successfully'
            }

        except Exception as e:
            self.logger.error(f"Failed to set Redis password: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def rotate_password(self, new_password: str) -> Dict[str, Any]:
        """Replace the Redis password, refusing passwords found in the history.

        Returns:
            The result dict of ``set_redis_password``, or a failure dict when
            the password was used recently or an exception occurred.
        """
        try:
            password_history = self._get_password_history()

            # Reject reuse of any password still present in the history.
            if any(self.verify_password(new_password, old_hash)
                   for old_hash in password_history):
                return {
                    'success': False,
                    'error': 'Password was used recently'
                }

            result = self.set_redis_password(new_password)
            if result['success']:
                # Hash only once the password has actually been accepted.
                self._update_password_history(self.hash_password(new_password))

                self._log_security_event('password_rotated', {
                    'timestamp': datetime.now().isoformat(),
                    'action': 'password_rotated'
                })

            return result

        except Exception as e:
            self.logger.error(f"Failed to rotate password: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def _get_password_history(self) -> List[Dict[str, Any]]:
        """Load the stored password-hash history; return [] if absent or unreadable."""
        try:
            history_data = self.redis.get('redis:password_history')
            if history_data:
                import json
                return json.loads(history_data)
            return []
        except Exception as e:
            # Still best-effort, but leave a trace: an unreadable history
            # means the reuse check in rotate_password sees no old passwords.
            self.logger.warning(f"Failed to read password history: {e}")
            return []

    def _update_password_history(self, password_hash: Dict[str, Any]) -> None:
        """Append a hash record to the history, keeping the newest ``history_count``."""
        try:
            history = self._get_password_history()

            # Work on a copy so the caller's dict is not modified.
            entry = dict(password_hash)
            entry['created_at'] = datetime.now().isoformat()
            history.append(entry)

            keep = self.password_policy['history_count']
            if len(history) > keep:
                history = history[-keep:]

            import json
            self.redis.set('redis:password_history', json.dumps(history))

        except Exception as e:
            self.logger.error(f"Failed to update password history: {e}")

    def _log_security_event(self, event_type: str, event_data: Dict[str, Any]) -> None:
        """Push a JSON event onto ``redis:security_events`` and trim to 1000 entries."""
        try:
            event = {
                'type': event_type,
                'timestamp': datetime.now().isoformat(),
                'data': event_data
            }

            import json
            self.redis.lpush('redis:security_events', json.dumps(event))

            # Keep only the 1000 most recent events
            self.redis.ltrim('redis:security_events', 0, 999)

        except Exception as e:
            self.logger.error(f"Failed to log security event: {e}")

    def check_password_expiry(self) -> Dict[str, Any]:
        """Report whether the newest password in the history exceeded ``max_age_days``.

        Returns:
            Dict with ``expired``, ``days_until_expiry``, ``expiry_date`` and
            ``created_at``; or ``expired: False`` plus ``message``/``error``
            when there is no history or the check fails.
        """
        try:
            history = self._get_password_history()
            if not history:
                return {
                    'expired': False,
                    'message': 'No password history found'
                }

            # The newest entry is last in the list.
            latest_password = history[-1]
            created_at = datetime.fromisoformat(latest_password['created_at'])

            now = datetime.now()
            expiry_date = created_at + timedelta(days=self.password_policy['max_age_days'])
            remaining = expiry_date - now

            return {
                # Compare instants directly: ``remaining.days`` is already 0
                # during the final 24 hours, when the password is still valid.
                'expired': now >= expiry_date,
                'days_until_expiry': remaining.days,
                'expiry_date': expiry_date.isoformat(),
                'created_at': created_at.isoformat()
            }

        except Exception as e:
            self.logger.error(f"Failed to check password expiry: {e}")
            return {
                'expired': False,
                'error': str(e)
            }

### 12.2.2 ACL访问控制列表

```python
class RedisACLManager:
    """Create, modify, list and audit Redis ACL users through ``ACL`` commands.

    ACL events are recorded as JSON in the capped list ``redis:acl_events``
    via the client passed to the constructor.
    """

    def __init__(self, redis_client: "redis.Redis"):
        """Store the client, set up the logger and define the built-in roles."""
        self.redis = redis_client
        self.logger = logging.getLogger(__name__)

        # Predefined roles
        self.predefined_roles = {
            'admin': {
                'commands': ['*'],
                'keys': ['*'],
                'description': 'Full administrative access'
            },
            'read_only': {
                'commands': ['get', 'mget', 'exists', 'keys', 'scan', 'type', 'ttl'],
                'keys': ['*'],
                'description': 'Read-only access to all keys'
            },
            'app_user': {
                'commands': ['get', 'set', 'del', 'exists', 'expire', 'ttl'],
                'keys': ['app:*'],
                'description': 'Application user with limited access'
            },
            'cache_user': {
                'commands': ['get', 'set', 'del', 'exists', 'expire'],
                'keys': ['cache:*'],
                'description': 'Cache operations only'
            }
        }

    @staticmethod
    def _decode(value: Any) -> Any:
        """Recursively convert ``bytes`` in a reply to ``str``.

        Replies are bytes when the client was created without
        ``decode_responses=True``; the parsing code in this class works on
        text, so every reply is normalised through this helper first.
        """
        if isinstance(value, bytes):
            return value.decode('utf-8', errors='replace')
        if isinstance(value, (list, tuple)):
            return [RedisACLManager._decode(item) for item in value]
        if isinstance(value, dict):
            return {
                RedisACLManager._decode(k): RedisACLManager._decode(v)
                for k, v in value.items()
            }
        return value

    def create_user(self, username: str, password: str,
                   role: str = 'app_user',
                   custom_rules: Optional[Dict[str, List[str]]] = None) -> Dict[str, Any]:
        """Create an ACL user from a predefined role or from ``custom_rules``.

        Args:
            username: Name of the user to create.
            password: Password assigned to the user.
            role: Key of ``self.predefined_roles``; takes precedence over
                ``custom_rules`` when it names a predefined role.
            custom_rules: Dict with ``commands`` and ``keys`` lists, used
                only when ``role`` is not predefined.

        Returns:
            ``{'success': True, 'username', 'role', 'rules'}`` or
            ``{'success': False, 'error'}``. The password rule inside
            ``rules`` is masked so the plaintext password does not travel
            with the result (e.g. into logs).
        """
        try:
            if self._user_exists(username):
                return {
                    'success': False,
                    'error': f'User {username} already exists'
                }

            if role in self.predefined_roles:
                role_config = self.predefined_roles[role]
            elif custom_rules:
                role_config = custom_rules
            else:
                return {
                    'success': False,
                    'error': f'Unknown role {role} and no custom rules provided'
                }

            acl_rules = self._build_acl_rules(username, password, role_config)

            self.redis.execute_command('ACL', 'SETUSER', username, *acl_rules)

            self._log_acl_event('user_created', {
                'username': username,
                'role': role,
                'timestamp': datetime.now().isoformat()
            })

            return {
                'success': True,
                'username': username,
                'role': role,
                # Only the password rule starts with '>'.
                'rules': ['>***' if rule.startswith('>') else rule
                          for rule in acl_rules]
            }

        except Exception as e:
            self.logger.error(f"Failed to create user {username}: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def _user_exists(self, username: str) -> bool:
        """Return True if ``ACL LIST`` contains an entry for ``username``.

        Returns False when the lookup itself fails; the failure is logged so
        it can be told apart from a user that is really absent.
        """
        try:
            entries = self._decode(self.redis.execute_command('ACL', 'LIST'))
            prefix = f'user {username} '
            return any(entry.startswith(prefix) for entry in entries)
        except Exception as e:
            self.logger.warning(f"Failed to check whether user {username} exists: {e}")
            return False

    def _build_acl_rules(self, username: str, password: str,
                        role_config: Dict[str, List[str]]) -> List[str]:
        """Translate a role configuration into ``ACL SETUSER`` rule tokens.

        Returns:
            ``['on', '>password', <command rules...>, <key rules...>]``.
        """
        rules = ['on', f'>{password}']  # enable the user and set its password

        commands = role_config.get('commands', [])
        if '*' in commands:
            rules.append('+@all')
        else:
            # Deny everything first, then allow the listed commands.
            rules.append('-@all')
            rules.extend(f'+{cmd}' for cmd in commands)

        keys = role_config.get('keys', [])
        if '*' in keys:
            rules.append('~*')
        else:
            # Clear key patterns, then add the listed ones.
            rules.append('resetkeys')
            rules.extend(f'~{key_pattern}' for key_pattern in keys)

        return rules

    def update_user_password(self, username: str, new_password: str) -> Dict[str, Any]:
        """Replace all of a user's passwords with ``new_password``.

        ``>password`` alone only adds a password to the user's list, which
        would leave the previous password usable; ``resetpass`` clears the
        list first so only the new password remains valid.
        """
        try:
            if not self._user_exists(username):
                return {
                    'success': False,
                    'error': f'User {username} does not exist'
                }

            self.redis.execute_command(
                'ACL', 'SETUSER', username, 'resetpass', f'>{new_password}'
            )

            self._log_acl_event('password_updated', {
                'username': username,
                'timestamp': datetime.now().isoformat()
            })

            return {
                'success': True,
                'message': f'Password updated for user {username}'
            }

        except Exception as e:
            self.logger.error(f"Failed to update password for {username}: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def disable_user(self, username: str) -> Dict[str, Any]:
        """Disable a user with ``ACL SETUSER <username> off``."""
        try:
            if not self._user_exists(username):
                return {
                    'success': False,
                    'error': f'User {username} does not exist'
                }

            self.redis.execute_command('ACL', 'SETUSER', username, 'off')

            self._log_acl_event('user_disabled', {
                'username': username,
                'timestamp': datetime.now().isoformat()
            })

            return {
                'success': True,
                'message': f'User {username} disabled'
            }

        except Exception as e:
            self.logger.error(f"Failed to disable user {username}: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def delete_user(self, username: str) -> Dict[str, Any]:
        """Delete a user with ``ACL DELUSER``."""
        try:
            if not self._user_exists(username):
                return {
                    'success': False,
                    'error': f'User {username} does not exist'
                }

            self.redis.execute_command('ACL', 'DELUSER', username)

            self._log_acl_event('user_deleted', {
                'username': username,
                'timestamp': datetime.now().isoformat()
            })

            return {
                'success': True,
                'message': f'User {username} deleted'
            }

        except Exception as e:
            self.logger.error(f"Failed to delete user {username}: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def list_users(self) -> Dict[str, Any]:
        """List users parsed from ``ACL LIST``.

        Returns:
            ``{'success': True, 'users': [{'username', 'status', 'raw_info'}],
            'total_count'}`` or ``{'success': False, 'error'}``.
        """
        try:
            users_info = self._decode(self.redis.execute_command('ACL', 'LIST'))
            users = []

            for user_info in users_info:
                parts = user_info.split(' ')
                if len(parts) >= 2 and parts[0] == 'user':
                    users.append({
                        'username': parts[1],
                        # Look only at the flags after the name, so a user
                        # literally called "on" is not reported as enabled.
                        'status': 'on' if 'on' in parts[2:] else 'off',
                        'raw_info': user_info
                    })

            return {
                'success': True,
                'users': users,
                'total_count': len(users)
            }

        except Exception as e:
            self.logger.error(f"Failed to list users: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def get_user_info(self, username: str) -> Dict[str, Any]:
        """Fetch ``ACL GETUSER`` output for ``username`` as a field -> value dict.

        Returns:
            ``{'success': True, 'username', 'info'}`` or
            ``{'success': False, 'error'}`` (including when the server
            returns no data for the user).
        """
        try:
            raw_reply = self.redis.execute_command('ACL', 'GETUSER', username)
            if raw_reply is None:
                return {
                    'success': False,
                    'error': f'User {username} does not exist'
                }

            reply = self._decode(raw_reply)
            if isinstance(reply, dict):
                # NOTE(review): some protocol/client setups may already
                # deliver a mapping instead of a flat list - confirm.
                parsed_info = dict(reply)
            else:
                # Flat reply: field names at even positions, values at odd.
                parsed_info = {}
                for i in range(0, len(reply), 2):
                    parsed_info[reply[i]] = reply[i + 1] if i + 1 < len(reply) else None

            return {
                'success': True,
                'username': username,
                'info': parsed_info
            }

        except Exception as e:
            self.logger.error(f"Failed to get user info for {username}: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def _log_acl_event(self, event_type: str, event_data: Dict[str, Any]) -> None:
        """Push a JSON event onto ``redis:acl_events`` and trim to 1000 entries."""
        try:
            event = {
                'type': event_type,
                'timestamp': datetime.now().isoformat(),
                'data': event_data
            }

            import json
            self.redis.lpush('redis:acl_events', json.dumps(event))

            # Keep only the 1000 most recent events
            self.redis.ltrim('redis:acl_events', 0, 999)

        except Exception as e:
            self.logger.error(f"Failed to log ACL event: {e}")

    def audit_user_permissions(self) -> Dict[str, Any]:
        """Analyse every listed user's permissions and attach a risk level.

        Users whose details cannot be fetched are left out of the results.
        """
        try:
            users_result = self.list_users()
            if not users_result['success']:
                return users_result

            audit_results = []

            for user in users_result['users']:
                username = user['username']
                user_info_result = self.get_user_info(username)
                if not user_info_result['success']:
                    continue

                permissions_analysis = self._analyze_permissions(user_info_result['info'])
                audit_results.append({
                    'username': username,
                    'status': user['status'],
                    'permissions': permissions_analysis,
                    'risk_level': self._assess_risk_level(permissions_analysis)
                })

            return {
                'success': True,
                'audit_results': audit_results,
                'timestamp': datetime.now().isoformat()
            }

        except Exception as e:
            self.logger.error(f"Failed to audit user permissions: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def _analyze_permissions(self, user_info: Dict[str, Any]) -> Dict[str, Any]:
        """Summarise the ``commands`` and ``keys`` fields of a parsed GETUSER reply.

        Returns:
            Dict with ``has_all_commands``, ``dangerous_commands``,
            ``has_all_keys``, ``key_patterns``, ``command_count`` (number of
            dangerous commands, or ``'unlimited'``) and ``key_pattern_count``.
        """
        import re

        commands = self._decode(user_info.get('commands', '')) or ''
        if isinstance(commands, list):
            commands = ' '.join(str(item) for item in commands)

        has_all_commands = '+@all' in commands.split()
        if has_all_commands:
            dangerous_commands = ['ALL_COMMANDS']
        else:
            risky_commands = ['flushdb', 'flushall', 'config', 'eval', 'script', 'shutdown']
            # Deliberately a substring match: '+eval' also flags '+evalsha'
            # and '+config' also flags '+config|get'.
            dangerous_commands = [cmd for cmd in risky_commands if f'+{cmd}' in commands]

        keys = self._decode(user_info.get('keys', '')) or ''
        if isinstance(keys, list):
            # NOTE(review): some server versions appear to report keys as a
            # list of patterns rather than one '~pattern ...' string - confirm.
            key_patterns = [
                pattern[1:] if pattern.startswith('~') else pattern
                for pattern in (str(item) for item in keys)
            ]
        else:
            key_patterns = re.findall(r'~([^\s]+)', keys)

        # Only an exact '*' pattern grants every key; a substring test would
        # also match narrower patterns such as '~*:cache'.
        has_all_keys = '*' in key_patterns
        if has_all_keys:
            key_patterns = ['*']

        return {
            'has_all_commands': has_all_commands,
            'dangerous_commands': dangerous_commands,
            'has_all_keys': has_all_keys,
            'key_patterns': key_patterns,
            'command_count': len(dangerous_commands) if not has_all_commands else 'unlimited',
            'key_pattern_count': len(key_patterns)
        }

    def _assess_risk_level(self, permissions: Dict[str, Any]) -> str:
        """Score a permission summary and map it to high/medium/low/minimal.

        Scoring: all commands +50, otherwise +10 per dangerous command; all
        keys +30, otherwise +10 per wildcard pattern and +2 per literal one.
        Thresholds: >=70 high, >=40 medium, >=20 low, else minimal.
        """
        risk_score = 0

        if permissions['has_all_commands']:
            risk_score += 50
        else:
            risk_score += len(permissions['dangerous_commands']) * 10

        if permissions['has_all_keys']:
            risk_score += 30
        else:
            for pattern in permissions['key_patterns']:
                risk_score += 10 if '*' in pattern else 2

        if risk_score >= 70:
            return 'high'
        elif risk_score >= 40:
            return 'medium'
        elif risk_score >= 20:
            return 'low'
        else:
            return 'minimal'

```

## 12.3 网络安全

### 12.3.1 TLS/SSL加密

import ssl
import socket
from typing import Dict, Any, Optional

class RedisSSLManager:
    """Hold TLS settings for a Redis endpoint and open TLS-protected connections."""

    def __init__(self, host: str = 'localhost', port: int = 6380):
        """Store the endpoint and initialise the TLS settings with strict defaults."""
        self.host = host
        self.port = port
        self.logger = logging.getLogger(__name__)

        # SSL configuration
        self.ssl_config = {
            'cert_file': None,
            'key_file': None,
            'ca_file': None,
            'verify_mode': ssl.CERT_REQUIRED,
            'check_hostname': True
        }

    def configure_ssl(self, cert_file: str, key_file: str,
                     ca_file: Optional[str] = None,
                     verify_mode: int = ssl.CERT_REQUIRED,
                     check_hostname: bool = True) -> Dict[str, Any]:
        """Check that the given files exist and store the TLS settings.

        Returns:
            ``{'success': True, 'message', 'config'}`` with a copy of the
            stored settings, or ``{'success': False, 'error'}`` naming the
            first missing file.
        """
        try:
            import os

            if not os.path.exists(cert_file):
                return {
                    'success': False,
                    'error': f'Certificate file not found: {cert_file}'
                }

            if not os.path.exists(key_file):
                return {
                    'success': False,
                    'error': f'Key file not found: {key_file}'
                }

            if ca_file and not os.path.exists(ca_file):
                return {
                    'success': False,
                    'error': f'CA file not found: {ca_file}'
                }

            self.ssl_config.update({
                'cert_file': cert_file,
                'key_file': key_file,
                'ca_file': ca_file,
                'verify_mode': verify_mode,
                'check_hostname': check_hostname
            })

            return {
                'success': True,
                'message': 'SSL configuration updated',
                'config': self.ssl_config.copy()
            }

        except Exception as e:
            self.logger.error(f"Failed to configure SSL: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def create_ssl_context(self) -> ssl.SSLContext:
        """Build an ``ssl.SSLContext`` from the stored TLS settings."""
        context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)

        # check_hostname is assigned before verify_mode: disabling
        # verification is rejected while hostname checking is still on.
        context.check_hostname = self.ssl_config['check_hostname']
        context.verify_mode = self.ssl_config['verify_mode']

        # Client certificate and key, when configured
        if self.ssl_config['cert_file'] and self.ssl_config['key_file']:
            context.load_cert_chain(
                self.ssl_config['cert_file'],
                self.ssl_config['key_file']
            )

        # Additional CA certificates, when configured
        if self.ssl_config['ca_file']:
            context.load_verify_locations(self.ssl_config['ca_file'])

        return context

    def create_secure_connection(self, password: Optional[str] = None) -> Dict[str, Any]:
        """Open a TLS connection to Redis and verify it with ``PING``.

        The stored TLS settings are passed through the client's ``ssl_*``
        keyword arguments; the client constructor has no ``ssl_context``
        parameter, so a prebuilt context cannot be handed over.

        Returns:
            ``{'success': True, 'client', 'ssl_info'}`` or
            ``{'success': False, 'error'}``.
        """
        try:
            redis_client = redis.Redis(
                host=self.host,
                port=self.port,
                password=password,
                ssl=True,
                ssl_certfile=self.ssl_config['cert_file'],
                ssl_keyfile=self.ssl_config['key_file'],
                ssl_ca_certs=self.ssl_config['ca_file'],
                ssl_cert_reqs=self.ssl_config['verify_mode'],
                ssl_check_hostname=self.ssl_config['check_hostname'],
                decode_responses=True
            )

            # Test the connection
            redis_client.ping()

            return {
                'success': True,
                'client': redis_client,
                'ssl_info': self._get_ssl_info(redis_client)
            }

        except Exception as e:
            self.logger.error(f"Failed to create secure connection: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def _get_ssl_info(self, redis_client: "redis.Redis") -> Dict[str, Any]:
        """Describe the TLS session of a pooled connection.

        Returns:
            ``certificate``/``cipher``/``ssl_version`` when the connection
            exposes a TLS socket, a generic ``info`` entry otherwise, or
            ``{'error': ...}`` if inspection fails.
        """
        pool = None
        connection = None
        try:
            pool = redis_client.connection_pool
            connection = pool.get_connection('ping')

            sock = getattr(connection, '_sock', None)
            if sock is not None and hasattr(sock, 'getpeercert'):
                return {
                    'certificate': sock.getpeercert(),
                    'cipher': sock.cipher(),
                    'ssl_version': sock.version()
                }

            return {'info': 'SSL connection established'}

        except Exception as e:
            return {'error': str(e)}

        finally:
            # Hand the borrowed connection back; otherwise every call would
            # permanently take one connection out of the pool.
            if connection is not None:
                try:
                    pool.release(connection)
                except Exception as e:
                    self.logger.warning(f"Failed to release connection: {e}")

    def validate_certificate(self, cert_file: str) -> Dict[str, Any]:
        """Read a PEM certificate and report its identity and expiry.

        Returns:
            ``{'success': True, 'certificate_info': {...}}`` or
            ``{'success': False, 'error'}``.
        """
        try:
            import OpenSSL.crypto
            from datetime import timezone

            with open(cert_file, 'rb') as f:
                cert_data = f.read()

            cert = OpenSSL.crypto.load_certificate(
                OpenSSL.crypto.FILETYPE_PEM, cert_data
            )

            subject = cert.get_subject()
            issuer = cert.get_issuer()

            not_after = cert.get_notAfter().decode('ascii')
            expiry_date = datetime.strptime(not_after, '%Y%m%d%H%M%SZ')

            # The trailing 'Z' marks the timestamp as UTC, so compare against
            # UTC "now" (made naive to match expiry_date), not local time.
            now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
            remaining = expiry_date - now_utc

            return {
                'success': True,
                'certificate_info': {
                    'subject': dict(subject.get_components()),
                    'issuer': dict(issuer.get_components()),
                    'serial_number': cert.get_serial_number(),
                    'version': cert.get_version(),
                    'not_before': cert.get_notBefore().decode('ascii'),
                    'not_after': not_after,
                    'days_until_expiry': remaining.days,
                    # ``remaining.days`` is already 0 during the last 24
                    # hours of validity, so compare the instants directly.
                    'expired': now_utc >= expiry_date
                }
            }

        except Exception as e:
            self.logger.error(f"Failed to validate certificate: {e}")
            return {
                'success': False,
                'error': str(e)
            }

### 12.3.2 网络访问控制

```python
class RedisNetworkSecurity:
    """Apply and audit network-level hardening for a Redis server.

    The class pushes network options to the server with ``CONFIG SET``,
    installs example iptables rules, and inspects ``CLIENT LIST`` output for
    connections that look suspicious. Public methods report their outcome as
    a dict with a boolean ``'success'`` key rather than raising.
    """

    # Substrings that mark a client name as suspicious (case-insensitive).
    _SUSPICIOUS_NAME_PARTS = ('admin', 'root', 'test', 'scanner')
    # Connections older than one hour are flagged.
    _MAX_CONNECTION_AGE = 3600
    # Connections idle for more than 30 minutes are flagged.
    _MAX_IDLE_TIME = 1800

    def __init__(self, redis_client: "redis.Redis"):
        """Store the client and seed the locally tracked network settings.

        Args:
            redis_client: Client used for every server call.
        """
        self.redis = redis_client
        self.logger = logging.getLogger(__name__)

        # Initial defaults; entries are overwritten whenever one of the
        # ``_configure_*`` helpers applies a new value successfully.
        self.security_config = {
            'bind_interfaces': ['127.0.0.1'],
            'protected_mode': True,
            'tcp_keepalive': 300,
            'timeout': 0,
            'tcp_backlog': 511
        }

    def configure_network_security(self, config: Dict[str, Any]) -> Dict[str, Any]:
        """Apply every supported option present in ``config``.

        Args:
            config: May contain ``bind_interfaces``, ``protected_mode``,
                ``tcp_keepalive`` and ``timeout``; other keys are ignored.

        Returns:
            ``{'success': bool, 'results': {option: per-option result}}``.
            ``success`` is True only when every attempted option succeeded;
            otherwise an ``'error'`` entry names the options that failed.
        """
        try:
            # Fixed order: bind, protected mode, keepalive, timeout.
            handlers = (
                ('bind_interfaces', self._configure_bind_interfaces),
                ('protected_mode', self._configure_protected_mode),
                ('tcp_keepalive', self._configure_tcp_keepalive),
                ('timeout', self._configure_timeout),
            )
            results = {
                option: handler(config[option])
                for option, handler in handlers
                if option in config
            }

            # The helpers swallow their own exceptions, so the aggregate flag
            # has to be derived from the per-option results.
            failed = [
                option for option, outcome in results.items()
                if not outcome.get('success')
            ]
            response = {
                'success': not failed,
                'results': results
            }
            if failed:
                response['error'] = 'Failed to apply: ' + ', '.join(failed)
            return response

        except Exception as e:
            self.logger.error(f"Failed to configure network security: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def _configure_bind_interfaces(self, interfaces: List[str]) -> Dict[str, Any]:
        """Validate the addresses and set them as the server's ``bind`` list.

        Entries that are not valid IP addresses are logged and skipped; the
        call fails only when no valid entry remains or ``CONFIG SET`` fails.
        """
        try:
            import ipaddress

            valid_interfaces = []
            for interface in interfaces:
                try:
                    ipaddress.ip_address(interface)
                except ValueError:
                    self.logger.warning(f"Invalid IP address: {interface}")
                else:
                    valid_interfaces.append(interface)

            if not valid_interfaces:
                return {
                    'success': False,
                    'error': 'No valid IP addresses provided'
                }

            # The addresses are sent as one space-separated value.
            self.redis.config_set('bind', ' '.join(valid_interfaces))
            self.security_config['bind_interfaces'] = valid_interfaces

            return {
                'success': True,
                'interfaces': valid_interfaces
            }

        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }

    def _apply_setting(self, option: str, wire_value: Any,
                       state_key: str, value: Any) -> Dict[str, Any]:
        """Send one ``CONFIG SET`` and mirror the value in ``security_config``.

        Args:
            option: Server-side option name.
            wire_value: Value actually sent to the server.
            state_key: Key used both in ``security_config`` and in the result.
            value: Caller-facing value stored and echoed back.
        """
        try:
            self.redis.config_set(option, wire_value)
            self.security_config[state_key] = value
            return {
                'success': True,
                state_key: value
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }

    def _configure_protected_mode(self, enabled: bool) -> Dict[str, Any]:
        """Turn protected mode on or off (sent as ``yes`` / ``no``)."""
        return self._apply_setting(
            'protected-mode', 'yes' if enabled else 'no',
            'protected_mode', enabled
        )

    def _configure_tcp_keepalive(self, keepalive: int) -> Dict[str, Any]:
        """Set the ``tcp-keepalive`` option."""
        return self._apply_setting(
            'tcp-keepalive', keepalive, 'tcp_keepalive', keepalive
        )

    def _configure_timeout(self, timeout: int) -> Dict[str, Any]:
        """Set the client ``timeout`` option."""
        return self._apply_setting('timeout', timeout, 'timeout', timeout)

    def setup_firewall_rules(self, allowed_ips: List[str],
                           redis_port: int = 6379) -> Dict[str, Any]:
        """Install example iptables rules restricting access to the Redis port.

        Appends, in order: accept loopback traffic, accept each allowed
        source on ``redis_port``, drop everything else on ``redis_port``.
        This is a sample; adapt it to the firewall system actually in use.

        Args:
            allowed_ips: IPv4 addresses or CIDR networks allowed to connect.
            redis_port: TCP port Redis listens on.

        Returns:
            ``{'success': True, 'rules_applied': [...], 'warning': ...}`` or
            ``{'success': False, 'error': ..., 'warning': ...}``. Invalid
            input is rejected before any rule is installed.
        """
        try:
            import ipaddress
            import subprocess

            # Validate everything before running iptables so that a bad entry
            # cannot leave a half-applied rule set behind.
            problems = []
            port_ok = (
                isinstance(redis_port, int)
                and not isinstance(redis_port, bool)
                and 1 <= redis_port <= 65535
            )
            if not port_ok:
                problems.append(f'Invalid port: {redis_port!r}')
            for ip in allowed_ips:
                try:
                    if not isinstance(ip, str):
                        raise ValueError(ip)
                    # IPv4 only: the rules are installed with iptables;
                    # IPv6 sources would need ip6tables instead.
                    ipaddress.IPv4Network(ip, strict=False)
                except ValueError:
                    problems.append(f'Invalid IPv4 address or network: {ip!r}')

            if problems:
                message = '; '.join(problems)
                self.logger.error(f"Failed to setup firewall rules: {message}")
                return {
                    'success': False,
                    'error': message,
                    'warning': 'No firewall rules were applied'
                }

            rules_applied = []

            def append_rule(rule_args: List[str], description: str) -> None:
                # Argument list, no shell: values are never interpreted as
                # shell syntax. ``check=True`` aborts on the first failure.
                subprocess.run(['iptables', '-A', 'INPUT', *rule_args], check=True)
                rules_applied.append(description)

            # Existing rules are deliberately left in place (no flush).
            append_rule(['-i', 'lo', '-j', 'ACCEPT'], 'Allow loopback interface')

            for ip in allowed_ips:
                append_rule(
                    ['-p', 'tcp', '--dport', str(redis_port), '-s', ip, '-j', 'ACCEPT'],
                    f'Allow {ip} to access port {redis_port}'
                )

            # Must come last: rules are appended, so the ACCEPT rules above
            # sit before this DROP in the chain.
            append_rule(
                ['-p', 'tcp', '--dport', str(redis_port), '-j', 'DROP'],
                f'Drop other access to port {redis_port}'
            )

            return {
                'success': True,
                'rules_applied': rules_applied,
                'warning': 'This is a basic example. Adjust for your firewall system.'
            }

        except Exception as e:
            self.logger.error(f"Failed to setup firewall rules: {e}")
            return {
                'success': False,
                'error': str(e),
                'warning': 'Firewall configuration requires appropriate permissions'
            }

    @staticmethod
    def _extract_ip(addr: Any) -> str:
        """Return the host part of a ``host:port`` address, or ``''``.

        Splits on the last colon and strips square brackets so that IPv6
        forms such as ``[::1]:6379`` yield ``::1``.
        """
        host, separator, _port = str(addr).rpartition(':')
        return host.strip('[]') if separator else ''

    @staticmethod
    def _as_int(value: Any, default: int = 0) -> int:
        """Convert ``value`` to int, returning ``default`` when impossible."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return default

    def monitor_connections(self) -> Dict[str, Any]:
        """Summarise the server's current client connections.

        Returns:
            ``{'success': True, 'analysis': {...}, 'raw_data': clients}``
            where ``analysis`` holds the total connection count, the sorted
            unique client IPs, a count per client name and the list of
            clients flagged by ``_is_suspicious_connection``; or
            ``{'success': False, 'error': ...}``.
        """
        try:
            clients_info = self.redis.client_list()

            unique_ips = set()
            connection_types = {}
            suspicious_connections = []

            for client in clients_info:
                ip = self._extract_ip(client.get('addr', ''))
                if ip:
                    unique_ips.add(ip)

                # Connections are grouped by the client-assigned name.
                client_type = client.get('name', 'unknown')
                connection_types[client_type] = connection_types.get(client_type, 0) + 1

                if self._is_suspicious_connection(client):
                    suspicious_connections.append(client)

            return {
                'success': True,
                'analysis': {
                    'total_connections': len(clients_info),
                    # A sorted list is deterministic and JSON-serialisable.
                    'unique_ips': sorted(unique_ips),
                    'connection_types': connection_types,
                    'suspicious_connections': suspicious_connections
                },
                'raw_data': clients_info
            }

        except Exception as e:
            self.logger.error(f"Failed to monitor connections: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def _is_suspicious_connection(self, client: Dict[str, Any]) -> bool:
        """Return True when a client entry trips one of the heuristics.

        A client is flagged when its ``age`` exceeds one hour, its ``idle``
        time exceeds 30 minutes, or its name contains one of the suspicious
        substrings. Numeric fields are coerced because redis-py's
        ``client_list()`` delivers them as strings; unparsable values count
        as 0.
        """
        if self._as_int(client.get('age', 0)) > self._MAX_CONNECTION_AGE:
            return True

        if self._as_int(client.get('idle', 0)) > self._MAX_IDLE_TIME:
            return True

        name = str(client.get('name') or '').lower()
        return any(part in name for part in self._SUSPICIOUS_NAME_PARTS)

    def kill_suspicious_connections(self) -> Dict[str, Any]:
        """Terminate every connection currently flagged as suspicious.

        Returns:
            The failed ``monitor_connections`` result if monitoring fails;
            otherwise ``{'success': True, 'killed_connections': [...],
            'total_killed': n}``. A client that cannot be killed is logged
            and skipped.
        """
        try:
            monitor_result = self.monitor_connections()
            if not monitor_result['success']:
                return monitor_result

            killed_connections = []
            for client in monitor_result['analysis']['suspicious_connections']:
                client_id = client.get('id')
                if not client_id:
                    continue
                try:
                    self.redis.client_kill_filter(_id=client_id)
                except Exception as e:
                    self.logger.warning(f"Failed to kill client {client_id}: {e}")
                else:
                    killed_connections.append(client)

            return {
                'success': True,
                'killed_connections': killed_connections,
                'total_killed': len(killed_connections)
            }

        except Exception as e:
            self.logger.error(f"Failed to kill suspicious connections: {e}")
            return {
                'success': False,
                'error': str(e)
            }

12.4 命令安全

12.4.1 危险命令管理

class RedisCommandSecurity:
    """Restrict and observe the use of dangerous Redis commands.

    Provides helpers to disable commands by renaming them to random names,
    to enforce a command whitelist, to sample live traffic with ``MONITOR``
    and to record security events in a capped Redis list.
    """

    def __init__(self, redis_client: "redis.Redis"):
        """Store the client and define the dangerous-command catalogue.

        Args:
            redis_client: Client used for every server call.
        """
        self.redis = redis_client
        self.logger = logging.getLogger(__name__)

        # Dangerous commands grouped by severity category.
        self.dangerous_commands = {
            'critical': [
                'FLUSHDB', 'FLUSHALL', 'SHUTDOWN', 'DEBUG',
                'CONFIG', 'EVAL', 'EVALSHA', 'SCRIPT'
            ],
            'administrative': [
                'CLIENT', 'MONITOR', 'SYNC', 'PSYNC',
                'REPLCONF', 'RESTORE', 'MIGRATE'
            ],
            'potentially_harmful': [
                'KEYS', 'SCAN', 'DEL', 'UNLINK', 'EXPIRE',
                'EXPIREAT', 'PEXPIRE', 'PEXPIREAT'
            ]
        }

        # Spelling variants per command (lower- and upper-case forms).
        self.command_aliases = {
            'FLUSHDB': ['flushdb', 'FLUSHDB'],
            'FLUSHALL': ['flushall', 'FLUSHALL'],
            'CONFIG': ['config', 'CONFIG'],
            'EVAL': ['eval', 'EVAL'],
            'SCRIPT': ['script', 'SCRIPT']
        }

    def _rename_commands(self, commands: List[str]) -> Dict[str, List[str]]:
        """Try to rename each command to a random 32-character name.

        Returns:
            ``{'disabled': [...], 'failed': [...]}`` preserving input order.
        """
        # NOTE(review): Redis documents ``rename-command`` as a redis.conf
        # directive, so servers are expected to reject it through CONFIG SET.
        # Failures are therefore collected and reported instead of being
        # hidden; consider ACL rules (Redis 6+) for runtime restrictions.
        disabled, failed = [], []
        for command in commands:
            random_name = self._generate_random_string(32)
            try:
                self.redis.config_set(f'rename-command {command}', random_name)
            except Exception as e:
                self.logger.warning(f"Failed to disable command {command}: {e}")
                failed.append(command)
            else:
                disabled.append(command)
        return {'disabled': disabled, 'failed': failed}

    def disable_dangerous_commands(self, command_categories: Optional[List[str]] = None) -> Dict[str, Any]:
        """Disable every command in the selected danger categories.

        Args:
            command_categories: Keys of ``self.dangerous_commands``; defaults
                to ``['critical']``. Unknown categories are ignored.

        Returns:
            ``{'success', 'disabled_commands', 'total_disabled',
            'failed_commands'}``. ``success`` is True only when no command
            failed to be disabled. On an unexpected error
            ``{'success': False, 'error': ...}``.
        """
        try:
            if command_categories is None:
                command_categories = ['critical']

            targets = [
                command
                for category in command_categories
                if category in self.dangerous_commands
                for command in self.dangerous_commands[category]
            ]
            outcome = self._rename_commands(targets)
            disabled_commands = outcome['disabled']

            self._log_command_event('commands_disabled', {
                'disabled_commands': disabled_commands,
                'categories': command_categories,
                'timestamp': datetime.now().isoformat()
            })

            return {
                'success': not outcome['failed'],
                'disabled_commands': disabled_commands,
                'total_disabled': len(disabled_commands),
                'failed_commands': outcome['failed']
            }

        except Exception as e:
            self.logger.error(f"Failed to disable dangerous commands: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def create_command_whitelist(self, allowed_commands: List[str]) -> Dict[str, Any]:
        """Disable every known command that is not in ``allowed_commands``.

        Args:
            allowed_commands: Command names to keep; matched
                case-insensitively against ``_get_all_redis_commands()``.

        Returns:
            ``{'success', 'allowed_commands', 'disabled_count',
            'total_commands', 'failed_commands'}``. ``success`` is True only
            when every non-whitelisted command was disabled. On an
            unexpected error ``{'success': False, 'error': ...}``.
        """
        try:
            all_commands = self._get_all_redis_commands()

            # Normalise the whitelist once instead of once per command.
            allowed_upper = {cmd.upper() for cmd in allowed_commands}
            commands_to_disable = [
                command for command in all_commands
                if command.upper() not in allowed_upper
            ]

            outcome = self._rename_commands(commands_to_disable)
            disabled_count = len(outcome['disabled'])

            self._log_command_event('whitelist_created', {
                'allowed_commands': allowed_commands,
                'disabled_count': disabled_count,
                'timestamp': datetime.now().isoformat()
            })

            return {
                'success': not outcome['failed'],
                'allowed_commands': allowed_commands,
                'disabled_count': disabled_count,
                'total_commands': len(all_commands),
                'failed_commands': outcome['failed']
            }

        except Exception as e:
            self.logger.error(f"Failed to create command whitelist: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def _get_all_redis_commands(self) -> List[str]:
        """Return the built-in list of commands the whitelist works against.

        This is a basic, hard-coded list; a real deployment may need a more
        complete one.
        """
        return [
            'GET', 'SET', 'DEL', 'EXISTS', 'EXPIRE', 'TTL', 'TYPE',
            'MGET', 'MSET', 'INCR', 'DECR', 'INCRBY', 'DECRBY',
            'LPUSH', 'RPUSH', 'LPOP', 'RPOP', 'LLEN', 'LRANGE',
            'SADD', 'SREM', 'SMEMBERS', 'SCARD', 'SISMEMBER',
            'ZADD', 'ZREM', 'ZRANGE', 'ZCARD', 'ZSCORE',
            'HSET', 'HGET', 'HDEL', 'HKEYS', 'HVALS', 'HGETALL',
            'FLUSHDB', 'FLUSHALL', 'CONFIG', 'EVAL', 'SCRIPT',
            'MONITOR', 'CLIENT', 'DEBUG', 'SHUTDOWN'
        ]

    def _generate_random_string(self, length: int) -> str:
        """Return ``length`` random ASCII letters/digits from ``secrets``."""
        import secrets
        import string

        chars = string.ascii_letters + string.digits
        return ''.join(secrets.choice(chars) for _ in range(length))

    def monitor_command_usage(self, duration_seconds: int = 60) -> Dict[str, Any]:
        """Sample server traffic with ``MONITOR`` and summarise it.

        A daemon thread consumes the monitor stream while the calling thread
        sleeps for ``duration_seconds``; the worker is then asked to stop and
        given up to 5 seconds to finish.

        Args:
            duration_seconds: How long to collect commands.

        Returns:
            ``{'success': True, 'monitoring_duration', 'command_stats',
            'analysis'}`` where ``command_stats`` maps each upper-cased
            command name to its count, first/last seen ISO timestamps and
            the sorted client addresses that issued it. If the monitor
            stream fails, ``{'success': False, 'error': ...}``.
        """
        try:
            import threading
            import time

            command_stats = {}
            stop_requested = threading.Event()
            worker_errors = []

            def monitor_worker():
                try:
                    # redis-py's monitor() returns a context manager; the
                    # command dicts come from its listen() generator.
                    with self.redis.monitor() as monitor:
                        for command_info in monitor.listen():
                            if stop_requested.is_set():
                                break

                            command_parts = command_info['command'].split()
                            if not command_parts:
                                continue
                            command_name = command_parts[0].upper()

                            stats = command_stats.setdefault(command_name, {
                                'count': 0,
                                'first_seen': datetime.now().isoformat(),
                                'last_seen': None,
                                'clients': set()
                            })
                            stats['count'] += 1
                            stats['last_seen'] = datetime.now().isoformat()
                            # redis-py reports the peer as 'client_address';
                            # 'client_addr' is kept as a fallback.
                            stats['clients'].add(
                                command_info.get(
                                    'client_address',
                                    command_info.get('client_addr', 'unknown')
                                )
                            )
                except Exception as e:
                    # Surface worker failures to the caller instead of
                    # silently returning empty statistics.
                    worker_errors.append(e)

            monitor_thread = threading.Thread(target=monitor_worker, daemon=True)
            monitor_thread.start()

            time.sleep(duration_seconds)

            # The worker only notices the flag when the next command arrives,
            # hence the bounded join; being a daemon it cannot block exit.
            stop_requested.set()
            monitor_thread.join(timeout=5)

            if worker_errors:
                raise worker_errors[0]

            # Build a JSON-friendly copy (sets become sorted lists).
            serializable_stats = {
                cmd_name: {**stats, 'clients': sorted(stats['clients'])}
                for cmd_name, stats in command_stats.items()
            }

            return {
                'success': True,
                'monitoring_duration': duration_seconds,
                'command_stats': serializable_stats,
                'analysis': self._analyze_command_usage(serializable_stats)
            }

        except Exception as e:
            self.logger.error(f"Failed to monitor command usage: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def _analyze_command_usage(self, command_stats: Dict[str, Any]) -> Dict[str, Any]:
        """Derive totals, the top-10 commands and dangerous-command usage.

        Args:
            command_stats: Mapping of command name to a dict holding at
                least ``'count'`` and ``'clients'``.

        Returns:
            Totals, ``most_used_commands`` as ``(name, stats)`` pairs sorted
            by count descending, the dangerous commands seen (one entry per
            matching category) and ``risk_level`` (``'high'`` if any
            dangerous command was seen, else ``'low'``).
        """
        total_commands = sum(stats['count'] for stats in command_stats.values())

        most_used = sorted(
            command_stats.items(),
            key=lambda item: item[1]['count'],
            reverse=True
        )[:10]

        dangerous_used = [
            {
                'command': cmd_name,
                'category': category,
                'count': stats['count'],
                'clients': stats['clients']
            }
            for cmd_name, stats in command_stats.items()
            for category, commands in self.dangerous_commands.items()
            if cmd_name in commands
        ]

        return {
            'total_commands_executed': total_commands,
            'unique_commands': len(command_stats),
            'most_used_commands': most_used,
            'dangerous_commands_used': dangerous_used,
            'risk_level': 'high' if dangerous_used else 'low'
        }

    def _log_command_event(self, event_type: str, event_data: Dict[str, Any]) -> None:
        """Push a JSON event onto ``redis:command_security_events``.

        The list is trimmed to its 1000 most recent entries. Failures are
        logged and never propagated (best-effort audit trail).
        """
        try:
            import json

            event = {
                'type': event_type,
                'timestamp': datetime.now().isoformat(),
                'data': event_data
            }

            self.redis.lpush('redis:command_security_events', json.dumps(event))

            # Keep only the newest 1000 events.
            self.redis.ltrim('redis:command_security_events', 0, 999)

        except Exception as e:
            self.logger.error(f"Failed to log command security event: {e}")

### 12.4.2 脚本安全

```python
class RedisScriptSecurity:
    """Statically vet Lua scripts and run the ones that pass.

    Validation is purely textual (size limit, forbidden regex patterns,
    called function names, Redis commands named in ``redis.call`` /
    ``redis.pcall``). Scripts that pass are executed with ``EVAL`` and the
    execution is recorded in a capped Redis list.
    """

    def __init__(self, redis_client: "redis.Redis"):
        """Store the client and define the script policy.

        Args:
            redis_client: Client used for every server call.
        """
        self.redis = redis_client
        self.logger = logging.getLogger(__name__)

        self.script_policy = {
            'max_script_size': 8192,  # maximum script size in bytes
            'max_execution_time': 5000,  # value written to lua-time-limit (ms)
            'allowed_functions': [
                'redis.call', 'redis.pcall', 'redis.log',
                'table.insert', 'table.remove', 'string.sub',
                'math.floor', 'math.ceil', 'tonumber', 'tostring'
            ],
            # Regular expressions; raw strings keep the backslashes literal
            # without relying on invalid escape sequences.
            'forbidden_patterns': [
                r'os\.', r'io\.', 'file', 'require', 'dofile',
                'loadfile', 'loadstring', r'debug\.',
                r'package\.', 'module', 'getfenv', 'setfenv'
            ]
        }

    def validate_script(self, script: str) -> Dict[str, Any]:
        """Check a Lua script against the script policy.

        Args:
            script: Lua source text.

        Returns:
            ``{'success': True, 'validation': {...}}`` where ``validation``
            holds ``is_safe``, the blocking ``issues``, non-blocking
            ``warnings`` and ``script_info`` (byte size, line count,
            functions used, Redis calls, optionally ``loop_count``); or
            ``{'success': False, 'error': ...}`` if the analysis itself
            fails.
        """
        try:
            import re

            script_size = len(script.encode('utf-8'))
            max_size = self.script_policy['max_script_size']

            validation_results = {
                'is_safe': True,
                'issues': [],
                'warnings': [],
                'script_info': {
                    'size': script_size,
                    'lines': len(script.split('\n')),
                    'functions_used': [],
                    'redis_calls': []
                }
            }

            if script_size > max_size:
                validation_results['is_safe'] = False
                validation_results['issues'].append(
                    f"Script size ({script_size} bytes) "
                    f"exceeds maximum allowed ({max_size} bytes)"
                )

            # Any case-insensitive hit on a forbidden pattern is blocking.
            for pattern in self.script_policy['forbidden_patterns']:
                matches = re.findall(pattern, script, re.IGNORECASE)
                if matches:
                    validation_results['is_safe'] = False
                    validation_results['issues'].append(
                        f"Forbidden pattern '{pattern}' found: {matches}"
                    )

            self._analyze_script_functions(script, validation_results)
            self._analyze_redis_calls(script, validation_results)
            self._check_infinite_loops(script, validation_results)

            return {
                'success': True,
                'validation': validation_results
            }

        except Exception as e:
            self.logger.error(f"Failed to validate script: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def _analyze_script_functions(self, script: str, results: Dict[str, Any]) -> None:
        """Record called function names and warn about unlisted ones.

        Every ``name(`` or ``name.member(`` occurrence counts as a call.
        A name passes when it equals, or starts with, an entry of
        ``allowed_functions``; anything else adds one warning per distinct
        name. Warnings do not change ``is_safe``.
        """
        import re

        function_pattern = r'([a-zA-Z_][a-zA-Z0-9_]*(?:\.[a-zA-Z_][a-zA-Z0-9_]*)?)\s*\('
        # dict.fromkeys de-duplicates while keeping first-seen order.
        functions = list(dict.fromkeys(re.findall(function_pattern, script)))

        results['script_info']['functions_used'] = functions

        allowed_functions = self.script_policy['allowed_functions']
        for func in functions:
            if any(func.startswith(allowed) for allowed in allowed_functions):
                continue
            results['warnings'].append(
                f"Potentially unsafe function used: {func}"
            )

    def _analyze_redis_calls(self, script: str, results: Dict[str, Any]) -> None:
        """Record ``redis.call`` / ``redis.pcall`` usage and flag dangerous commands.

        Only calls whose first argument is a quoted string literal are
        detected. A dangerous command marks the script unsafe and adds an
        issue.
        """
        import re

        # Group 1: "call" or "pcall"; group 2: the quoted command name.
        redis_call_pattern = r"""redis\.(p?call)\s*\(\s*["']([^"']*)["']"""
        calls = re.findall(redis_call_pattern, script, re.IGNORECASE)

        redis_calls = [
            {
                # Group 1 already contains "call"/"pcall"; do not append it again.
                'type': f'redis.{call_type.lower()}',
                'command': command.upper()
            }
            for call_type, command in calls
        ]

        results['script_info']['redis_calls'] = redis_calls

        dangerous_commands = ['FLUSHDB', 'FLUSHALL', 'CONFIG', 'EVAL', 'SCRIPT']
        for call in redis_calls:
            if call['command'] in dangerous_commands:
                results['is_safe'] = False
                results['issues'].append(
                    f"Dangerous Redis command in script: {call['command']}"
                )

    def _check_infinite_loops(self, script: str, results: Dict[str, Any]) -> None:
        """Warn about ``while`` loops and count ``for`` loops.

        The check is textual: it cannot tell whether a loop terminates, so
        ``while`` loops only produce a warning and ``for`` loops are merely
        counted in ``script_info['loop_count']``.
        """
        import re

        while_loops = re.findall(r'while\s+.*\s+do', script, re.IGNORECASE)
        if while_loops:
            results['warnings'].append(
                f"Found {len(while_loops)} while loop(s). Ensure they have proper exit conditions."
            )

        for_loops = re.findall(r'for\s+.*\s+do', script, re.IGNORECASE)
        if for_loops:
            results['script_info']['loop_count'] = len(for_loops)

    def execute_safe_script(self, script: str, keys: List[str] = None,
                           args: List[str] = None) -> Dict[str, Any]:
        """Validate a script and, if it is considered safe, run it with EVAL.

        While the script runs, the server's ``lua-time-limit`` is set to the
        policy's ``max_execution_time``; the previous value is restored
        afterwards. This goes through ``CONFIG SET``, so it is a server
        setting rather than a per-call option.

        Args:
            script: Lua source text.
            keys: Key names passed to the script (``KEYS``).
            args: Additional arguments passed to the script (``ARGV``).

        Returns:
            ``{'success': True, 'result', 'execution_time_ms',
            'validation'}`` on success. If validation itself fails, its
            failure result is returned; if the script is judged unsafe,
            ``{'success': False, 'error', 'issues'}``; on any execution
            error ``{'success': False, 'error': ...}``.
        """
        try:
            validation_result = self.validate_script(script)
            if not validation_result['success']:
                return validation_result

            validation = validation_result['validation']
            if not validation['is_safe']:
                return {
                    'success': False,
                    'error': 'Script failed security validation',
                    'issues': validation['issues']
                }

            script_keys = keys or []
            script_args = args or []

            original_timeout = None
            try:
                # Remember the current limit so it can be put back.
                config = self.redis.config_get('lua-time-limit')
                if config:
                    original_timeout = config.get('lua-time-limit')

                self.redis.config_set('lua-time-limit', self.script_policy['max_execution_time'])

                start_time = datetime.now()
                result = self.redis.eval(script, len(script_keys), *script_keys, *script_args)
                execution_time = (datetime.now() - start_time).total_seconds() * 1000

                # Only a hash of the script is logged, not its source.
                self._log_script_event('script_executed', {
                    'script_hash': hashlib.sha256(script.encode()).hexdigest(),
                    'execution_time_ms': execution_time,
                    'keys_count': len(script_keys),
                    'args_count': len(script_args),
                    'timestamp': datetime.now().isoformat()
                })

                return {
                    'success': True,
                    'result': result,
                    'execution_time_ms': execution_time,
                    'validation': validation
                }

            finally:
                if original_timeout is not None:
                    # A failed restore must not replace the script's result
                    # or hide the error that ended the script.
                    try:
                        self.redis.config_set('lua-time-limit', original_timeout)
                    except Exception as restore_error:
                        self.logger.warning(
                            f"Failed to restore lua-time-limit: {restore_error}"
                        )

        except Exception as e:
            self.logger.error(f"Failed to execute script safely: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def _log_script_event(self, event_type: str, event_data: Dict[str, Any]) -> None:
        """Push a JSON event onto ``redis:script_security_events``.

        The list is trimmed to its 1000 most recent entries. Failures are
        logged and never propagated (best-effort audit trail).
        """
        try:
            import json

            event = {
                'type': event_type,
                'timestamp': datetime.now().isoformat(),
                'data': event_data
            }

            self.redis.lpush('redis:script_security_events', json.dumps(event))

            # Keep only the newest 1000 events.
            self.redis.ltrim('redis:script_security_events', 0, 999)

        except Exception as e:
            self.logger.error(f"Failed to log script security event: {e}")

12.5 数据保护

12.5.1 数据加密

class RedisDataEncryption:
    """Application-side AES-GCM encryption for string values stored in Redis.

    Each value is encrypted before it is written and stored as a JSON document
    holding the base64-encoded ciphertext, IV and authentication tag under the
    key ``encrypted:<key>``.

    Key handling: at most the first 32 bytes of the key material are handed to
    AES, so a caller-supplied key must be 16, 24 or at least 32 bytes long once
    UTF-8 encoded; any other length makes encryption and decryption report
    failure. When no key is supplied a random 32-byte key is generated; it
    lives only in this instance, so data written with it cannot be read back
    by another instance.
    """

    # Key lengths (in bytes) accepted by AES.
    _VALID_AES_KEY_SIZES = (16, 24, 32)

    def __init__(self, redis_client: "redis.Redis", encryption_key: Optional[str] = None):
        """Bind the Redis client and set up the encryption key.

        Args:
            redis_client: Client used for all reads and writes.
            encryption_key: Optional key material (see the class docstring for
                the accepted lengths). A random key is generated when omitted.
        """
        self.redis = redis_client
        self.logger = logging.getLogger(__name__)

        # Cipher parameters. Defined before the key because key generation
        # reads ``key_size``.
        self.encryption_config = {
            'algorithm': 'AES-256-GCM',
            'key_size': 32,  # 256 bits
            'iv_size': 16,   # 128 bits
            'tag_size': 16   # 128 bits
        }

        if encryption_key:
            self.encryption_key = encryption_key.encode('utf-8')
        else:
            self.encryption_key = self._generate_encryption_key()

    def _generate_encryption_key(self) -> bytes:
        """Return ``key_size`` random bytes from the OS CSPRNG.

        A Fernet key is deliberately not used here: it is 44 bytes of base64
        text, and truncating that text to 32 bytes would leave only about
        192 bits of entropy in a nominally 256-bit key.
        """
        import os
        return os.urandom(self.encryption_config['key_size'])

    def _build_cipher(self, key: bytes, iv: bytes, tag: Optional[bytes] = None):
        """Create the AES-GCM cipher object for ``key`` (truncated to ``key_size``)."""
        from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

        aes_key = key[:self.encryption_config['key_size']]
        return Cipher(algorithms.AES(aes_key), modes.GCM(iv, tag))

    def _encrypt_with_key(self, data: str, key: bytes) -> Dict[str, str]:
        """Encrypt ``data`` with an explicit key; raises on any failure."""
        import base64
        import os

        # A fresh random IV per message; GCM must never reuse an IV with the
        # same key.
        iv = os.urandom(self.encryption_config['iv_size'])
        encryptor = self._build_cipher(key, iv).encryptor()
        ciphertext = encryptor.update(data.encode('utf-8')) + encryptor.finalize()

        return {
            'ciphertext': base64.b64encode(ciphertext).decode('utf-8'),
            'iv': base64.b64encode(iv).decode('utf-8'),
            'tag': base64.b64encode(encryptor.tag).decode('utf-8'),
            'algorithm': self.encryption_config['algorithm']
        }

    def _decrypt_with_key(self, encrypted_data: Dict[str, Any], key: bytes) -> str:
        """Decrypt a stored payload with an explicit key; raises on any failure."""
        import base64

        ciphertext = base64.b64decode(encrypted_data['ciphertext'])
        iv = base64.b64decode(encrypted_data['iv'])
        tag = base64.b64decode(encrypted_data['tag'])

        decryptor = self._build_cipher(key, iv, tag).decryptor()
        # finalize() verifies the authentication tag and raises when the
        # ciphertext was tampered with or the key is wrong.
        plaintext = decryptor.update(ciphertext) + decryptor.finalize()
        return plaintext.decode('utf-8')

    def encrypt_data(self, data: str) -> Dict[str, Any]:
        """Encrypt ``data`` with the current key.

        Returns:
            ``{'success': True, 'encrypted_data': {...}}`` where the payload
            holds base64 ``ciphertext``, ``iv``, ``tag`` and the ``algorithm``
            name, or ``{'success': False, 'error': str}`` on failure.
        """
        try:
            return {
                'success': True,
                'encrypted_data': self._encrypt_with_key(data, self.encryption_key)
            }

        except Exception as e:
            self.logger.error(f"Failed to encrypt data: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def decrypt_data(self, encrypted_data: Dict[str, Any]) -> Dict[str, Any]:
        """Decrypt a payload produced by :meth:`encrypt_data` with the current key.

        Returns:
            ``{'success': True, 'decrypted_data': str}`` or
            ``{'success': False, 'error': str}`` on failure (including a failed
            authentication-tag check).
        """
        try:
            return {
                'success': True,
                'decrypted_data': self._decrypt_with_key(encrypted_data, self.encryption_key)
            }

        except Exception as e:
            self.logger.error(f"Failed to decrypt data: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def set_encrypted(self, key: str, value: str, ex: Optional[int] = None) -> Dict[str, Any]:
        """Encrypt ``value`` and store it under ``encrypted:<key>``.

        Args:
            key: Logical key name (the ``encrypted:`` prefix is added here).
            value: Plaintext to protect.
            ex: Optional expiry in seconds; a falsy value stores without expiry.

        Returns:
            ``{'success': True, 'key': <stored key>, 'encrypted': True}`` or a
            failure dict with ``'error'``.
        """
        try:
            import json

            encryption_result = self.encrypt_data(value)
            if not encryption_result['success']:
                return encryption_result

            encrypted_value = json.dumps(encryption_result['encrypted_data'])

            # The prefix marks the value as encrypted and lets key rotation
            # find every encrypted entry.
            encrypted_key = f"encrypted:{key}"

            if ex:
                self.redis.setex(encrypted_key, ex, encrypted_value)
            else:
                self.redis.set(encrypted_key, encrypted_value)

            return {
                'success': True,
                'key': encrypted_key,
                'encrypted': True
            }

        except Exception as e:
            self.logger.error(f"Failed to set encrypted data: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def get_encrypted(self, key: str) -> Dict[str, Any]:
        """Fetch and decrypt the value stored under ``encrypted:<key>``.

        Returns:
            ``{'success': True, 'value': str, 'encrypted': True}``;
            ``{'success': False, 'error': 'Key not found'}`` when the key does
            not exist; or another failure dict when decoding/decryption fails.
        """
        try:
            import json

            encrypted_key = f"encrypted:{key}"
            encrypted_value = self.redis.get(encrypted_key)

            if encrypted_value is None:
                return {
                    'success': False,
                    'error': 'Key not found'
                }

            encrypted_data = json.loads(encrypted_value)

            decryption_result = self.decrypt_data(encrypted_data)
            if not decryption_result['success']:
                return decryption_result

            return {
                'success': True,
                'value': decryption_result['decrypted_data'],
                'encrypted': True
            }

        except Exception as e:
            self.logger.error(f"Failed to get encrypted data: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def rotate_encryption_key(self, new_key: Optional[str] = None) -> Dict[str, Any]:
        """Re-encrypt every ``encrypted:*`` value under a new key.

        Every value is decrypted with the key that was active when rotation
        started and re-encrypted with the new key; the instance switches to the
        new key only after the whole pass. Remaining TTLs are preserved.

        Args:
            new_key: Optional new key material; a random key is generated when
                omitted. Must be 16, 24 or at least 32 bytes once UTF-8 encoded.

        Returns:
            On completion ``{'success': True, 'rotated_keys': [...],
            'failed_keys': [...], 'total_processed': int, 'key_switched': bool}``.
            ``key_switched`` is False only when nothing could be rotated and at
            least one key failed, in which case the old key stays active. Keys
            listed in ``failed_keys`` remain encrypted under the previous key.
            ``{'success': False, 'error': str}`` when the new key is unusable or
            the rotation could not run at all.
        """
        try:
            import json

            if new_key:
                new_encryption_key = new_key.encode('utf-8')
            else:
                new_encryption_key = self._generate_encryption_key()

            # Reject an unusable key before touching any stored data.
            key_size = self.encryption_config['key_size']
            if len(new_encryption_key[:key_size]) not in self._VALID_AES_KEY_SIZES:
                return {
                    'success': False,
                    'error': 'New encryption key must be 16, 24 or at least 32 bytes long'
                }

            old_encryption_key = self.encryption_key

            # SCAN does not block the server the way KEYS does, but it may
            # report a key more than once; dict.fromkeys de-duplicates while
            # keeping order so no value is rotated twice.
            encrypted_keys = list(dict.fromkeys(self.redis.scan_iter(match='encrypted:*')))

            rotated_keys = []
            failed_keys = []

            for encrypted_key in encrypted_keys:
                try:
                    encrypted_value = self.redis.get(encrypted_key)
                    if not encrypted_value:
                        # Expired or emptied since the scan; nothing to rotate.
                        continue

                    plaintext = self._decrypt_with_key(
                        json.loads(encrypted_value), old_encryption_key
                    )
                    new_encrypted_value = json.dumps(
                        self._encrypt_with_key(plaintext, new_encryption_key)
                    )

                    # A plain SET would silently drop the expiry set by setex.
                    remaining_ms = self.redis.pttl(encrypted_key)
                    if remaining_ms == -2:
                        # Key expired in the meantime; do not resurrect it.
                        continue
                    if remaining_ms is not None and remaining_ms >= 0:
                        self.redis.set(encrypted_key, new_encrypted_value, px=max(remaining_ms, 1))
                    else:
                        self.redis.set(encrypted_key, new_encrypted_value)

                    rotated_keys.append(encrypted_key)

                except Exception as e:
                    self.logger.warning(f"Failed to rotate key for {encrypted_key}: {e}")
                    failed_keys.append(encrypted_key)

            # Keep the old key only when nothing was re-encrypted and something
            # failed: in that case every stored value still needs the old key.
            key_switched = bool(rotated_keys) or not failed_keys
            if key_switched:
                self.encryption_key = new_encryption_key
            else:
                self.logger.error("Key rotation aborted: no key could be re-encrypted")

            self._log_encryption_event('key_rotated', {
                'rotated_keys_count': len(rotated_keys),
                'failed_keys_count': len(failed_keys),
                'key_switched': key_switched,
                'timestamp': datetime.now().isoformat()
            })

            return {
                'success': True,
                'rotated_keys': rotated_keys,
                'failed_keys': failed_keys,
                'total_processed': len(encrypted_keys),
                'key_switched': key_switched
            }

        except Exception as e:
            self.logger.error(f"Failed to rotate encryption key: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def _log_encryption_event(self, event_type: str, event_data: Dict[str, Any]) -> None:
        """Push an event onto ``redis:encryption_events`` (capped at 1000 entries).

        Failures are logged and never raised.
        """
        try:
            import json

            event = {
                'type': event_type,
                'timestamp': datetime.now().isoformat(),
                'data': event_data
            }

            self.redis.lpush('redis:encryption_events', json.dumps(event))

            # Keep only the 1000 most recent events.
            self.redis.ltrim('redis:encryption_events', 0, 999)

        except Exception as e:
            self.logger.error(f"Failed to log encryption event: {e}")

### 12.5.2 敏感数据处理

```python
class RedisSensitiveDataManager:
    """Classify, mask and audit sensitive string data stored in Redis.

    Detection is regex based (credit card numbers, SSNs, e-mail addresses,
    phone numbers, IP addresses and inline passwords). Every classification
    level in ``data_classification`` maps to a retention period and an
    "encryption required" flag.
    """

    def __init__(self, redis_client: "redis.Redis"):
        """Bind the Redis client and define detection patterns and policies."""
        self.redis = redis_client
        self.logger = logging.getLogger(__name__)

        # Regular expressions used to detect sensitive content.
        self.sensitive_patterns = {
            'credit_card': r'\b(?:\d{4}[-\s]?){3}\d{4}\b',
            'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
            # The TLD class is [A-Za-z]; a "|" inside a character class would
            # be a literal pipe, not an alternation.
            'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
            'phone': r'\b\d{3}-\d{3}-\d{4}\b',
            'ip_address': r'\b(?:\d{1,3}\.){3}\d{1,3}\b',
            'password': r'(?i)password[\s]*[:=][\s]*[\S]+'
        }

        # Policy per classification level.
        self.data_classification = {
            'public': {'retention_days': 365, 'encryption_required': False},
            'internal': {'retention_days': 180, 'encryption_required': True},
            'confidential': {'retention_days': 90, 'encryption_required': True},
            'restricted': {'retention_days': 30, 'encryption_required': True}
        }

    @staticmethod
    def _as_text(value: Any) -> Any:
        """Decode ``bytes`` (as returned by a non-decoding client) to ``str``."""
        if isinstance(value, bytes):
            return value.decode('utf-8', errors='replace')
        return value

    def classify_data(self, data: str) -> Dict[str, Any]:
        """Score ``data`` against the sensitive patterns and pick a level.

        Each pattern that matches adds to the risk score once (50 for credit
        card/SSN, 20 for e-mail/phone, 10 otherwise). Scores of 80+, 50+ and
        20+ map to ``restricted``, ``confidential`` and ``internal``;
        everything else is ``public``.

        Returns:
            ``{'success': True, 'classification': {...}}`` with the level, the
            patterns found, the risk score and recommendations, or
            ``{'success': False, 'error': str}``.
        """
        try:
            import re

            classification_result = {
                'classification': 'public',
                'sensitive_patterns_found': [],
                'risk_score': 0,
                'recommendations': []
            }

            for pattern_name, pattern in self.sensitive_patterns.items():
                matches = re.findall(pattern, data, re.IGNORECASE)
                if not matches:
                    continue

                # NOTE: sample_matches holds the raw matched text; callers
                # should avoid logging or persisting it.
                classification_result['sensitive_patterns_found'].append({
                    'pattern': pattern_name,
                    'matches_count': len(matches),
                    'sample_matches': matches[:3]  # first three matches only
                })

                if pattern_name in ['credit_card', 'ssn']:
                    classification_result['risk_score'] += 50
                elif pattern_name in ['email', 'phone']:
                    classification_result['risk_score'] += 20
                else:
                    classification_result['risk_score'] += 10

            if classification_result['risk_score'] >= 80:
                classification_result['classification'] = 'restricted'
            elif classification_result['risk_score'] >= 50:
                classification_result['classification'] = 'confidential'
            elif classification_result['risk_score'] >= 20:
                classification_result['classification'] = 'internal'

            classification_result['recommendations'] = self._generate_recommendations(
                classification_result
            )

            return {
                'success': True,
                'classification': classification_result
            }

        except Exception as e:
            self.logger.error(f"Failed to classify data: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def _generate_recommendations(self, classification_result: Dict[str, Any]) -> List[str]:
        """Build handling recommendations for a classification result."""
        recommendations = []

        classification = classification_result['classification']
        config = self.data_classification[classification]

        if config['encryption_required']:
            recommendations.append('Enable encryption for this data')

        recommendations.append(
            f"Set retention period to {config['retention_days']} days"
        )

        if classification_result['sensitive_patterns_found']:
            recommendations.append('Consider data masking or tokenization')
            recommendations.append('Implement access logging and monitoring')

        if classification_result['risk_score'] >= 50:
            recommendations.append('Restrict access to authorized personnel only')
            recommendations.append('Enable audit logging')

        return recommendations

    def mask_sensitive_data(self, data: str, mask_char: str = '*') -> Dict[str, Any]:
        """Mask credit card numbers, SSNs and e-mail addresses in ``data``.

        * Credit cards keep the first and last four digits; every digit in
          between is replaced by ``mask_char`` and separators are kept.
        * SSNs become ``XXX-XX-<last four>``.
        * E-mail user names keep their first and last character (names of one
          or two characters are masked completely); the domain is kept.

        Returns:
            ``{'success': True, 'masked_data': str, 'masking_applied': [...],
            'original_length': int, 'masked_length': int}`` or
            ``{'success': False, 'error': str}``.
        """
        try:
            import re

            def mask_credit_card(match):
                # Mask each middle digit individually so that no digit leaks
                # regardless of whether separators are present.
                hidden = ''.join(
                    mask_char if ch.isdigit() else ch for ch in match.group(2)
                )
                return f"{match.group(1)}{hidden}{match.group(3)}"

            def mask_ssn(match):
                return f"XXX-XX-{match.group(3)}"

            def mask_email(match):
                username = match.group(1)
                domain = match.group(2)
                if len(username) > 2:
                    masked_username = username[0] + mask_char * (len(username) - 2) + username[-1]
                else:
                    masked_username = mask_char * len(username)
                return f"{masked_username}@{domain}"

            # Applied in this order; each rule works on the previous output.
            masking_rules = (
                ('credit_card',
                 r'\b(\d{4})([-\s]?\d{4}[-\s]?\d{4}[-\s]?)(\d{4})\b',
                 mask_credit_card),
                ('ssn',
                 r'\b(\d{3})-(\d{2})-(\d{4})\b',
                 mask_ssn),
                ('email',
                 r'\b([A-Za-z0-9._%+-]+)@([A-Za-z0-9.-]+\.[A-Za-z]{2,})\b',
                 mask_email),
            )

            masked_data = data
            masking_applied = []

            for rule_name, pattern, replacer in masking_rules:
                masked_data, replaced = re.subn(pattern, replacer, masked_data)
                if replaced:
                    masking_applied.append(rule_name)

            return {
                'success': True,
                'masked_data': masked_data,
                'masking_applied': masking_applied,
                'original_length': len(data),
                'masked_length': len(masked_data)
            }

        except Exception as e:
            self.logger.error(f"Failed to mask sensitive data: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def set_with_classification(self, key: str, value: str,
                              classification: Optional[str] = None) -> Dict[str, Any]:
        """Store ``value`` under ``<classification>:<key>`` with the level's retention.

        When ``classification`` is omitted the value is classified
        automatically. The key expires after the level's ``retention_days``.

        NOTE: the value is written as given. ``encryption_required`` is only
        reported (in the event log and via a warning); this method does not
        encrypt, so callers must encrypt such values themselves.

        Returns:
            ``{'success': True, 'key', 'classification', 'retention_days',
            'expires_at'}`` or ``{'success': False, 'error': str}``.
        """
        try:
            if classification is None:
                classify_result = self.classify_data(value)
                if not classify_result['success']:
                    return classify_result
                classification = classify_result['classification']['classification']

            if classification not in self.data_classification:
                return {
                    'success': False,
                    'error': f'Unknown classification: {classification}'
                }

            config = self.data_classification[classification]

            classified_key = f"{classification}:{key}"

            if config['encryption_required']:
                # Make the policy gap visible instead of silently storing
                # plaintext for a level that demands encryption.
                self.logger.warning(
                    f"Classification '{classification}' requires encryption but "
                    f"{classified_key} is stored without it"
                )

            retention_seconds = config['retention_days'] * 24 * 3600
            self.redis.setex(classified_key, retention_seconds, value)

            self._log_data_event('data_classified', {
                'key': classified_key,
                'classification': classification,
                'retention_days': config['retention_days'],
                'encryption_required': config['encryption_required'],
                'timestamp': datetime.now().isoformat()
            })

            return {
                'success': True,
                'key': classified_key,
                'classification': classification,
                'retention_days': config['retention_days'],
                'expires_at': (datetime.now() + timedelta(days=config['retention_days'])).isoformat()
            }

        except Exception as e:
            self.logger.error(f"Failed to set classified data: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def _inspect_unclassified_key(self, raw_key: Any, key_name: str) -> Optional[Dict[str, Any]]:
        """Return an audit finding when an unclassified key holds sensitive text.

        Returns None when the value is missing, is not a string value, cannot
        be read, or classifies as ``public``.
        """
        try:
            value = self.redis.get(raw_key)
        except Exception as e:
            # Best effort: e.g. GET on a list or hash key is rejected by Redis.
            self.logger.debug(f"Skipping {key_name} during audit: {e}")
            return None

        if not value or not isinstance(value, (str, bytes)):
            return None
        if isinstance(value, bytes):
            value = value.decode('utf-8', errors='ignore')

        classify_result = self.classify_data(value)
        if not classify_result['success']:
            return None

        details = classify_result['classification']
        if details['classification'] == 'public':
            return None

        return {
            'key': key_name,
            'suggested_classification': details['classification'],
            'risk_score': details['risk_score']
        }

    def audit_sensitive_data(self) -> Dict[str, Any]:
        """Audit all keys for classification coverage and upcoming expiry.

        Reports, with key names as ``str``: the number of keys per
        classification prefix (first ten names each), keys expiring within 24
        hours, and unclassified keys whose value looks sensitive.

        Returns:
            ``{'success': True, 'audit': {...}, 'timestamp': str}`` or
            ``{'success': False, 'error': str}``.
        """
        try:
            audit_results = {
                'total_keys': 0,
                'classified_data': {},
                'unclassified_data': [],
                'expiring_soon': [],
                'recommendations': []
            }

            # SCAN avoids blocking the server like KEYS does; it may return a
            # key more than once, so de-duplicate while keeping order.
            all_keys = list(dict.fromkeys(self.redis.scan_iter()))
            audit_results['total_keys'] = len(all_keys)

            class_names = list(self.data_classification)
            keys_by_class = {name: [] for name in class_names}

            for raw_key in all_keys:
                # Keys are bytes unless the client decodes responses; compare
                # and report them as text.
                key_name = self._as_text(raw_key)

                owner = next(
                    (name for name in class_names if key_name.startswith(f'{name}:')),
                    None
                )
                if owner is not None:
                    keys_by_class[owner].append(key_name)

                ttl = self.redis.ttl(raw_key)
                if ttl is not None and 0 < ttl < 86400:  # expires within 24 hours
                    audit_results['expiring_soon'].append({
                        'key': key_name,
                        'ttl_seconds': ttl,
                        'expires_at': (datetime.now() + timedelta(seconds=ttl)).isoformat()
                    })

                if owner is None:
                    finding = self._inspect_unclassified_key(raw_key, key_name)
                    if finding is not None:
                        audit_results['unclassified_data'].append(finding)

            for name in class_names:
                audit_results['classified_data'][name] = {
                    'count': len(keys_by_class[name]),
                    'keys': keys_by_class[name][:10]  # first ten keys only
                }

            if audit_results['unclassified_data']:
                audit_results['recommendations'].append(
                    f"Classify {len(audit_results['unclassified_data'])} unclassified sensitive keys"
                )

            if audit_results['expiring_soon']:
                audit_results['recommendations'].append(
                    f"Review {len(audit_results['expiring_soon'])} keys expiring within 24 hours"
                )

            return {
                'success': True,
                'audit': audit_results,
                'timestamp': datetime.now().isoformat()
            }

        except Exception as e:
            self.logger.error(f"Failed to audit sensitive data: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def _log_data_event(self, event_type: str, event_data: Dict[str, Any]) -> None:
        """Push an event onto ``redis:data_protection_events`` (capped at 1000).

        Failures are logged and never raised.
        """
        try:
            import json

            event = {
                'type': event_type,
                'timestamp': datetime.now().isoformat(),
                'data': event_data
            }

            self.redis.lpush('redis:data_protection_events', json.dumps(event))

            # Keep only the 1000 most recent events.
            self.redis.ltrim('redis:data_protection_events', 0, 999)

        except Exception as e:
            self.logger.error(f"Failed to log data protection event: {e}")

12.6 监控和审计

12.6.1 安全监控

class RedisSecurityMonitor:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.logger = logging.getLogger(__name__)
        
        # 监控配置
        self.monitor_config = {
            'max_failed_attempts': 5,
            'lockout_duration': 300,  # 5分钟
            'suspicious_commands': [
                'FLUSHDB', 'FLUSHALL', 'CONFIG', 'EVAL',
                'SCRIPT', 'DEBUG', 'SHUTDOWN'
            ],
            'rate_limits': {
                'commands_per_minute': 1000,
                'connections_per_minute': 100
            }
        }
        
        # 告警阈值
        self.alert_thresholds = {
            'failed_auth_rate': 10,  # 每分钟失败认证次数
            'suspicious_command_rate': 5,  # 每分钟可疑命令次数
            'memory_usage_percent': 90,  # 内存使用率
            'connection_count': 1000,  # 连接数
            'slow_query_threshold': 1000  # 慢查询阈值(毫秒)
        }
    
    def start_monitoring(self) -> Dict[str, Any]:
        """启动安全监控"""
        try:
            import threading
            import time
            
            # 启动监控线程
            monitor_threads = [
                threading.Thread(target=self._monitor_authentication, daemon=True),
                threading.Thread(target=self._monitor_commands, daemon=True),
                threading.Thread(target=self._monitor_connections, daemon=True),
                threading.Thread(target=self._monitor_performance, daemon=True)
            ]
            
            for thread in monitor_threads:
                thread.start()
            
            # 记录监控启动事件
            self._log_security_event('monitoring_started', {
                'monitor_threads': len(monitor_threads),
                'timestamp': datetime.now().isoformat()
            })
            
            return {
                'success': True,
                'message': 'Security monitoring started',
                'active_monitors': len(monitor_threads)
            }
            
        except Exception as e:
            self.logger.error(f"Failed to start monitoring: {e}")
            return {
                'success': False,
                'error': str(e)
            }
    
    def _monitor_authentication(self) -> None:
        """Check once a minute for a spike in failed authentication attempts.

        Runs forever and is meant to be the target of a daemon thread. An
        alert is raised when the number of failures in the last minute exceeds
        ``alert_thresholds['failed_auth_rate']``. An error during one check is
        logged and the loop continues with the next cycle, so a transient
        failure does not end monitoring.
        """
        import time

        while True:
            try:
                failed_attempts = self._get_failed_auth_attempts()

                if failed_attempts > self.alert_thresholds['failed_auth_rate']:
                    self._trigger_alert('high_failed_auth_rate', {
                        'failed_attempts': failed_attempts,
                        'threshold': self.alert_thresholds['failed_auth_rate'],
                        'timestamp': datetime.now().isoformat()
                    })

            except Exception as e:
                # Keep the monitor alive; the next cycle retries.
                self.logger.error(f"Authentication monitoring error: {e}")

            time.sleep(60)  # check once per minute
    
    def _monitor_commands(self) -> None:
        """Check once a minute for bursts of suspicious commands.

        Runs forever and is meant to be the target of a daemon thread. An
        alert is raised when more suspicious commands were recorded in the
        last minute than ``alert_thresholds['suspicious_command_rate']``. An
        error during one check is logged and the loop continues, so a
        transient failure does not end monitoring.
        """
        import time

        while True:
            try:
                suspicious_commands = self._get_suspicious_commands()

                if len(suspicious_commands) > self.alert_thresholds['suspicious_command_rate']:
                    self._trigger_alert('suspicious_commands_detected', {
                        'commands': suspicious_commands,
                        'count': len(suspicious_commands),
                        'threshold': self.alert_thresholds['suspicious_command_rate'],
                        'timestamp': datetime.now().isoformat()
                    })

            except Exception as e:
                # Keep the monitor alive; the next cycle retries.
                self.logger.error(f"Command monitoring error: {e}")

            time.sleep(60)  # check once per minute
    
    def _monitor_connections(self) -> None:
        """Inspect client connections every 30 seconds and alert on anomalies.

        Runs forever and is meant to be the target of a daemon thread. Alerts
        are raised when the number of connections exceeds
        ``alert_thresholds['connection_count']`` and when
        ``_analyze_connections`` reports suspicious connections. An error
        during one check (for example a dropped Redis connection) is logged
        and the loop continues, so a transient failure does not end monitoring.
        """
        import time

        while True:
            try:
                client_info = self.redis.client_list()
                connection_count = len(client_info)

                if connection_count > self.alert_thresholds['connection_count']:
                    self._trigger_alert('high_connection_count', {
                        'connection_count': connection_count,
                        'threshold': self.alert_thresholds['connection_count'],
                        'timestamp': datetime.now().isoformat()
                    })

                suspicious_connections = self._analyze_connections(client_info)
                if suspicious_connections:
                    self._trigger_alert('suspicious_connections', {
                        'connections': suspicious_connections,
                        'timestamp': datetime.now().isoformat()
                    })

            except Exception as e:
                # Keep the monitor alive; the next cycle retries.
                self.logger.error(f"Connection monitoring error: {e}")

            time.sleep(30)  # check every 30 seconds
    
    def _monitor_performance(self) -> None:
        """Check memory usage and slow queries once a minute.

        Runs forever and is meant to be the target of a daemon thread. Memory
        usage is only evaluated when ``maxmemory`` is configured (non-zero);
        an alert is raised above ``alert_thresholds['memory_usage_percent']``.
        A second alert is raised whenever ``_get_slow_queries`` returns
        entries. An error during one check is logged and the loop continues,
        so a transient failure does not end monitoring.
        """
        import time

        while True:
            try:
                info = self.redis.info('memory')
                used_memory = info.get('used_memory', 0)
                max_memory = info.get('maxmemory', 0)

                # maxmemory == 0 means "no limit"; a percentage is undefined.
                if max_memory > 0:
                    memory_usage_percent = (used_memory / max_memory) * 100

                    if memory_usage_percent > self.alert_thresholds['memory_usage_percent']:
                        self._trigger_alert('high_memory_usage', {
                            'memory_usage_percent': memory_usage_percent,
                            'used_memory': used_memory,
                            'max_memory': max_memory,
                            'threshold': self.alert_thresholds['memory_usage_percent'],
                            'timestamp': datetime.now().isoformat()
                        })

                slow_queries = self._get_slow_queries()
                if slow_queries:
                    self._trigger_alert('slow_queries_detected', {
                        'slow_queries': slow_queries,
                        'count': len(slow_queries),
                        'timestamp': datetime.now().isoformat()
                    })

            except Exception as e:
                # Keep the monitor alive; the next cycle retries.
                self.logger.error(f"Performance monitoring error: {e}")

            time.sleep(60)  # check once per minute
    
    def _get_failed_auth_attempts(self) -> int:
        """获取失败的认证尝试次数"""
        try:
            # 从安全事件日志中统计失败认证
            events = self.redis.lrange('redis:security_events', 0, -1)
            failed_attempts = 0
            
            current_time = datetime.now()
            one_minute_ago = current_time - timedelta(minutes=1)
            
            for event_data in events:
                try:
                    import json
                    event = json.loads(event_data)
                    
                    event_time = datetime.fromisoformat(event['timestamp'])
                    if (event_time >= one_minute_ago and 
                        event['type'] == 'auth_failed'):
                        failed_attempts += 1
                        
                except Exception:
                    continue
            
            return failed_attempts
            
        except Exception as e:
            self.logger.error(f"Failed to get auth attempts: {e}")
            return 0
    
    def _get_suspicious_commands(self) -> List[Dict[str, Any]]:
        """获取可疑命令执行记录"""
        try:
            # 从命令安全事件日志中获取可疑命令
            events = self.redis.lrange('redis:command_security_events', 0, -1)
            suspicious_commands = []
            
            current_time = datetime.now()
            one_minute_ago = current_time - timedelta(minutes=1)
            
            for event_data in events:
                try:
                    import json
                    event = json.loads(event_data)
                    
                    event_time = datetime.fromisoformat(event['timestamp'])
                    if (event_time >= one_minute_ago and 
                        event['type'] == 'suspicious_command'):
                        suspicious_commands.append(event['data'])
                        
                except Exception:
                    continue
            
            return suspicious_commands
            
        except Exception as e:
            self.logger.error(f"Failed to get suspicious commands: {e}")
            return []
    
    def _analyze_connections(self, client_info: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """分析连接信息,识别可疑连接"""
        suspicious_connections = []
        
        try:
            for client in client_info:
                # 检查连接时间过长的客户端
                age = client.get('age', 0)
                if age > 3600:  # 超过1小时
                    suspicious_connections.append({
                        'type': 'long_connection',
                        'client_id': client.get('id'),
                        'addr': client.get('addr'),
                        'age': age,
                        'reason': 'Connection age exceeds 1 hour'
                    })
                
                # 检查空闲时间过长的客户端
                idle = client.get('idle', 0)
                if idle > 1800:  # 空闲超过30分钟
                    suspicious_connections.append({
                        'type': 'idle_connection',
                        'client_id': client.get('id'),
                        'addr': client.get('addr'),
                        'idle': idle,
                        'reason': 'Connection idle time exceeds 30 minutes'
                    })
                
                # 检查异常的客户端名称
                name = client.get('name', '')
                if name and any(suspicious in name.lower() for suspicious in ['bot', 'crawler', 'scanner']):
                    suspicious_connections.append({
                        'type': 'suspicious_name',
                        'client_id': client.get('id'),
                        'addr': client.get('addr'),
                        'name': name,
                        'reason': 'Suspicious client name detected'
                    })
            
            return suspicious_connections
            
        except Exception as e:
            self.logger.error(f"Failed to analyze connections: {e}")
            return []
    
    def _get_slow_queries(self) -> List[Dict[str, Any]]:
        """获取慢查询记录"""
        try:
            slow_queries = []
            
            # 获取慢查询日志
            slowlog = self.redis.slowlog_get(10)  # 获取最近10条慢查询
            
            for entry in slowlog:
                if entry['duration'] > self.alert_thresholds['slow_query_threshold'] * 1000:  # 转换为微秒
                    slow_queries.append({
                        'id': entry['id'],
                        'timestamp': entry['start_time'],
                        'duration_ms': entry['duration'] / 1000,  # 转换为毫秒
                        'command': ' '.join(entry['command']),
                        'client_addr': entry.get('client_addr', 'unknown')
                    })
            
            return slow_queries
            
        except Exception as e:
            self.logger.error(f"Failed to get slow queries: {e}")
            return []
    
    def _trigger_alert(self, alert_type: str, alert_data: Dict[str, Any]) -> None:
        """触发安全告警"""
        try:
            alert = {
                'type': alert_type,
                'severity': self._get_alert_severity(alert_type),
                'timestamp': datetime.now().isoformat(),
                'data': alert_data
            }
            
            # 记录告警
            import json
            self.redis.lpush('redis:security_alerts', json.dumps(alert))
            
            # 只保留最近1000条告警
            self.redis.ltrim('redis:security_alerts', 0, 999)
            
            # 记录安全事件
            self._log_security_event('alert_triggered', alert)
            
            # 发送通知(这里可以集成邮件、短信等通知方式)
            self.logger.warning(f"Security alert triggered: {alert_type} - {alert_data}")
            
        except Exception as e:
            self.logger.error(f"Failed to trigger alert: {e}")
    
    def _get_alert_severity(self, alert_type: str) -> str:
        """获取告警严重级别"""
        severity_map = {
            'high_failed_auth_rate': 'high',
            'suspicious_commands_detected': 'critical',
            'high_connection_count': 'medium',
            'suspicious_connections': 'medium',
            'high_memory_usage': 'high',
            'slow_queries_detected': 'low'
        }
        
        return severity_map.get(alert_type, 'medium')
    
    def get_security_dashboard(self) -> Dict[str, Any]:
        """Assemble a snapshot of server status and recent security alerts.

        Returns:
            ``{'success': True, 'dashboard': {...}}`` where the dashboard
            holds ``system_status`` (selected INFO fields),
            ``security_metrics`` (alert and connection counters), the decoded
            ``recent_alerts`` and a ``timestamp``; or
            ``{'success': False, 'error': <message>}`` when any step raises.
        """
        try:
            import json

            # Only the ten newest alerts feed the dashboard; entries that are
            # not valid JSON are ignored.
            alerts = []
            for raw_alert in self.redis.lrange('redis:security_alerts', 0, 9):
                try:
                    decoded = json.loads(raw_alert)
                except Exception:
                    continue
                alerts.append(decoded)

            # Server statistics and the current client connections.
            info = self.redis.info()
            client_info = self.redis.client_list()

            def alerts_with(severity: str) -> int:
                return sum(1 for alert in alerts if alert.get('severity') == severity)

            system_status = {'uptime_seconds': info.get('uptime_in_seconds', 0)}
            for field, default in (
                ('connected_clients', 0),
                ('used_memory_human', '0B'),
                ('total_commands_processed', 0),
                ('keyspace_hits', 0),
                ('keyspace_misses', 0),
            ):
                system_status[field] = info.get(field, default)

            security_metrics = {
                'recent_alerts_count': len(alerts),
                'critical_alerts': alerts_with('critical'),
                'high_alerts': alerts_with('high'),
                'active_connections': len(client_info),
                'failed_auth_attempts_last_hour': self._get_failed_auth_attempts_last_hour(),
            }

            return {
                'success': True,
                'dashboard': {
                    'system_status': system_status,
                    'security_metrics': security_metrics,
                    'recent_alerts': alerts,
                    'timestamp': datetime.now().isoformat(),
                },
            }

        except Exception as e:
            self.logger.error(f"Failed to get security dashboard: {e}")
            return {'success': False, 'error': str(e)}
    
    def _get_failed_auth_attempts_last_hour(self) -> int:
        """获取最近一小时的失败认证尝试次数"""
        try:
            events = self.redis.lrange('redis:security_events', 0, -1)
            failed_attempts = 0
            
            current_time = datetime.now()
            one_hour_ago = current_time - timedelta(hours=1)
            
            for event_data in events:
                try:
                    import json
                    event = json.loads(event_data)
                    
                    event_time = datetime.fromisoformat(event['timestamp'])
                    if (event_time >= one_hour_ago and 
                        event['type'] == 'auth_failed'):
                        failed_attempts += 1
                        
                except Exception:
                    continue
            
            return failed_attempts
            
        except Exception as e:
            self.logger.error(f"Failed to get failed auth attempts: {e}")
            return 0
    
    def _log_security_event(self, event_type: str, event_data: Dict[str, Any]) -> None:
        """记录安全事件"""
        try:
            event = {
                'type': event_type,
                'timestamp': datetime.now().isoformat(),
                'data': event_data
            }
            
            import json
            self.redis.lpush('redis:security_events', json.dumps(event))
            
            # 只保留最近1000条事件
            self.redis.ltrim('redis:security_events', 0, 999)
            
        except Exception as e:
            self.logger.error(f"Failed to log security event: {e}")

### 12.6.2 审计日志

```python
class RedisAuditLogger:
    """Audit trail for Redis command executions, persisted in Redis itself.

    Each record is a JSON document pushed onto a per-day list named
    ``audit:YYYY-MM-DD``. The TTL of that list is reset to ``retention_days``
    on every write.
    """

    # Placeholder written in place of secrets.
    _REDACTED = '***REDACTED***'
    # Upper bound on the records returned by ``search_audit_logs``.
    _MAX_SEARCH_RESULTS = 100

    def __init__(self, redis_client: redis.Redis):
        """Bind the logger to a Redis client and set the default audit policy.

        Args:
            redis_client: Client used both to store and to read audit records.
        """
        self.redis = redis_client
        self.logger = logging.getLogger(__name__)

        # Audit policy; a command is recorded when any enabled rule matches.
        self.audit_config = {
            'log_all_commands': False,
            'log_sensitive_commands': True,
            'log_failed_operations': True,
            'log_admin_operations': True,
            'retention_days': 90
        }

        # Commands that are always treated as security sensitive.
        self.sensitive_commands = [
            'AUTH', 'CONFIG', 'FLUSHDB', 'FLUSHALL',
            'EVAL', 'EVALSHA', 'SCRIPT', 'DEBUG',
            'SHUTDOWN', 'CLIENT', 'MONITOR'
        ]

    def log_command_execution(self, command: str, args: List[str],
                            client_info: Dict[str, Any],
                            result: Any = None,
                            error: Optional[str] = None) -> None:
        """Record one command execution if the audit policy selects it.

        Args:
            command: Command name; compared case-insensitively.
            args: Command arguments; secrets are redacted before storage.
            client_info: Caller details; ``addr``, ``name``, ``user`` and
                ``execution_time`` are read, each with a default.
            result: Command result; only its type name is stored.
            error: Error message when the command failed, else ``None``.

        Failures while writing the record are logged and never raised.
        """
        try:
            command_upper = command.upper()
            config = self.audit_config

            should_log = (
                config['log_all_commands'] or
                (config['log_sensitive_commands'] and command_upper in self.sensitive_commands) or
                (config['log_failed_operations'] and error is not None) or
                (config['log_admin_operations'] and self._is_admin_command(command_upper))
            )

            if not should_log:
                return

            # One clock reading serves both the record and the key, so a
            # write around midnight cannot land in the wrong day's list.
            now = datetime.now()

            audit_record = {
                'timestamp': now.isoformat(),
                'command': command_upper,
                'args': self._sanitize_args(command_upper, args),
                'client_info': {
                    'addr': client_info.get('addr', 'unknown'),
                    'name': client_info.get('name', ''),
                    'user': client_info.get('user', 'default')
                },
                'execution_time_ms': client_info.get('execution_time', 0),
                'success': error is None,
                'error': error,
                'result_type': type(result).__name__ if result is not None else None
            }

            import json
            audit_key = f"audit:{now.strftime('%Y-%m-%d')}"
            self.redis.lpush(audit_key, json.dumps(audit_record))

            # Refresh the retention window of the day's list.
            retention_seconds = config['retention_days'] * 24 * 3600
            self.redis.expire(audit_key, retention_seconds)

        except Exception as e:
            self.logger.error(f"Failed to log command execution: {e}")

    def _is_admin_command(self, command: str) -> bool:
        """Return True when ``command`` (upper case) is an administrative command."""
        admin_commands = [
            'CONFIG', 'CLIENT', 'DEBUG', 'MONITOR',
            'SHUTDOWN', 'SAVE', 'BGSAVE', 'LASTSAVE',
            'SLAVEOF', 'ROLE', 'SYNC', 'PSYNC'
        ]
        return command in admin_commands

    def _sanitize_args(self, command: str, args: List[str]) -> List[str]:
        """Return a copy of ``args`` with credentials replaced by a placeholder.

        * ``AUTH password`` - the password is redacted.
        * ``AUTH username password`` - the username is kept and everything
          after it is redacted.
        * ``CONFIG SET name value [name value ...]`` - the value of every
          ``requirepass`` / ``masterauth`` pair is redacted.

        Args:
            command: Upper-case command name.
            args: Original arguments; the input is not modified.

        Returns:
            The sanitized copy, or ``['***ERROR_SANITIZING***']`` when the
            arguments cannot be processed.
        """
        try:
            sanitized_args = list(args)

            if command == 'AUTH' and len(sanitized_args) > 0:
                # With a single argument it is the password; with more, the
                # first one is the username and the rest is secret.
                first_secret = 0 if len(sanitized_args) == 1 else 1
                for position in range(first_secret, len(sanitized_args)):
                    sanitized_args[position] = self._REDACTED

            elif command == 'CONFIG' and len(sanitized_args) >= 3 and sanitized_args[0].upper() == 'SET':
                sensitive_configs = ['requirepass', 'masterauth']
                # Walk the name/value pairs that follow the SET keyword.
                for position in range(1, len(sanitized_args) - 1, 2):
                    if sanitized_args[position].lower() in sensitive_configs:
                        sanitized_args[position + 1] = self._REDACTED

            return sanitized_args

        except Exception:
            return ['***ERROR_SANITIZING***']

    def _collect_audit_records(self, start_date: Optional[str] = None,
                               end_date: Optional[str] = None,
                               command: Optional[str] = None,
                               client_addr: Optional[str] = None,
                               user: Optional[str] = None,
                               failed_only: bool = False) -> Dict[str, Any]:
        """Load and filter every audit record in a date range, without a cap.

        Returns a dict with the resolved ``start_date`` / ``end_date``, the
        number of records examined (``total_records``) and all matching
        ``records`` sorted newest first. Exceptions propagate to the caller.
        """
        import json

        # Default window: the last seven days up to today.
        today = datetime.now()
        if start_date is None:
            start_date = (today - timedelta(days=7)).strftime('%Y-%m-%d')
        if end_date is None:
            end_date = today.strftime('%Y-%m-%d')

        current_date = datetime.strptime(start_date, '%Y-%m-%d')
        end_date_obj = datetime.strptime(end_date, '%Y-%m-%d')
        wanted_command = command.upper() if command else None

        matching_records = []
        total_records = 0

        while current_date <= end_date_obj:
            audit_key = f"audit:{current_date.strftime('%Y-%m-%d')}"
            current_date += timedelta(days=1)

            for record_data in self.redis.lrange(audit_key, 0, -1):
                try:
                    record = json.loads(record_data)
                    total_records += 1

                    if wanted_command and record['command'] != wanted_command:
                        continue
                    # The address filter is a substring match.
                    if client_addr and client_addr not in record['client_info']['addr']:
                        continue
                    if user and record['client_info']['user'] != user:
                        continue
                    if failed_only and record['success']:
                        continue

                    matching_records.append(record)

                except Exception:
                    # A malformed record must not abort the whole search.
                    continue

        matching_records.sort(key=lambda r: r['timestamp'], reverse=True)

        return {
            'start_date': start_date,
            'end_date': end_date,
            'total_records': total_records,
            'records': matching_records
        }

    def search_audit_logs(self, start_date: Optional[str] = None,
                         end_date: Optional[str] = None,
                         command: Optional[str] = None,
                         client_addr: Optional[str] = None,
                         user: Optional[str] = None,
                         failed_only: bool = False) -> Dict[str, Any]:
        """Search the audit trail.

        Args:
            start_date: First day (``YYYY-MM-DD``); defaults to 7 days ago.
            end_date: Last day (``YYYY-MM-DD``); defaults to today.
            command: Only records of this command (case-insensitive).
            client_addr: Only records whose client address contains this text.
            user: Only records of this user.
            failed_only: Only failed operations.

        Returns:
            On success a dict with ``search_criteria``,
            ``total_records_searched``, ``matching_records_count`` (the full
            match count) and ``records`` (at most the 100 newest matches).
            On failure ``{'success': False, 'error': <message>}``.
        """
        try:
            collected = self._collect_audit_records(
                start_date, end_date, command, client_addr, user, failed_only
            )
            matching_records = collected['records']

            return {
                'success': True,
                'search_criteria': {
                    'start_date': collected['start_date'],
                    'end_date': collected['end_date'],
                    'command': command,
                    'client_addr': client_addr,
                    'user': user,
                    'failed_only': failed_only
                },
                'total_records_searched': collected['total_records'],
                'matching_records_count': len(matching_records),
                'records': matching_records[:self._MAX_SEARCH_RESULTS]
            }

        except Exception as e:
            self.logger.error(f"Failed to search audit logs: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    @staticmethod
    def _count_by(records: List[Dict[str, Any]], key_of: Any) -> Dict[Any, Dict[str, int]]:
        """Count total and failed operations per key, in first-seen key order."""
        counts = {}
        for record in records:
            bucket = counts.setdefault(key_of(record), {'total': 0, 'failed': 0})
            bucket['total'] += 1
            if not record['success']:
                bucket['failed'] += 1
        return counts

    @staticmethod
    def _busiest(counts: Dict[Any, Dict[str, int]], limit: int = 10) -> Dict[Any, Dict[str, int]]:
        """Keep the ``limit`` entries with the highest totals, busiest first."""
        ranked = sorted(counts.items(), key=lambda item: item[1]['total'], reverse=True)
        return dict(ranked[:limit])

    def generate_audit_report(self, start_date: Optional[str] = None,
                            end_date: Optional[str] = None) -> Dict[str, Any]:
        """Build a statistical report over the audit trail.

        The statistics cover every record in the period; they are not limited
        to the 100-record page that ``search_audit_logs`` returns.

        Args:
            start_date: First day (``YYYY-MM-DD``); defaults to 7 days ago.
            end_date: Last day (``YYYY-MM-DD``); defaults to today.

        Returns:
            ``{'success': True, 'report': {...}, 'generated_at': <iso time>}``
            or ``{'success': False, 'error': <message>}``.
        """
        try:
            collected = self._collect_audit_records(start_date, end_date)
            records = collected['records']
            failed_records = [r for r in records if not r['success']]

            report = {
                'period': {
                    'start_date': collected['start_date'],
                    'end_date': collected['end_date']
                },
                'summary': {
                    'total_operations': len(records),
                    'successful_operations': len(records) - len(failed_records),
                    'failed_operations': len(failed_records),
                    'unique_clients': len({r['client_info']['addr'] for r in records}),
                    'unique_users': len({r['client_info']['user'] for r in records})
                },
                # Ten most used commands and ten most active clients.
                'command_statistics': self._busiest(
                    self._count_by(records, lambda r: r['command'])
                ),
                'client_statistics': self._busiest(
                    self._count_by(records, lambda r: r['client_info']['addr'])
                ),
                'user_statistics': self._count_by(records, lambda r: r['client_info']['user']),
                # Records are sorted newest first, so this is the latest 20.
                'failed_operations': failed_records[:20],
                'suspicious_activities': self._detect_suspicious_activities(records)
            }

            return {
                'success': True,
                'report': report,
                'generated_at': datetime.now().isoformat()
            }

        except Exception as e:
            self.logger.error(f"Failed to generate audit report: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def _detect_suspicious_activities(self, records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Scan audit records for suspicious patterns.

        Reported patterns:

        * ``frequent_failures`` - a client with 10 or more failed operations
        * ``off_hours_sensitive_operation`` - a sensitive command whose
          timestamp hour is between 2 and 6 inclusive
        * ``excessive_deletions`` - 50 or more DEL/UNLINK/FLUSHDB/FLUSHALL
          operations in total

        Returns:
            The findings, or an empty list when detection fails (logged).
        """
        suspicious_activities = []

        try:
            # Failed operations per client address.
            client_failures = {}
            for record in records:
                if not record['success']:
                    client = record['client_info']['addr']
                    client_failures[client] = client_failures.get(client, 0) + 1

            for client, failure_count in client_failures.items():
                if failure_count >= 10:
                    suspicious_activities.append({
                        'type': 'frequent_failures',
                        'client': client,
                        'failure_count': failure_count,
                        'description': f'Client {client} has {failure_count} failed operations'
                    })

            # Sensitive commands issued during the night.
            for record in records:
                hour = datetime.fromisoformat(record['timestamp']).hour

                if 2 <= hour <= 6 and record['command'] in self.sensitive_commands:
                    suspicious_activities.append({
                        'type': 'off_hours_sensitive_operation',
                        'timestamp': record['timestamp'],
                        'command': record['command'],
                        'client': record['client_info']['addr'],
                        'description': f'Sensitive command {record["command"]} executed during off hours'
                    })

            # Unusually many deletion commands.
            deletion_commands = ['DEL', 'UNLINK', 'FLUSHDB', 'FLUSHALL']
            deletion_count = sum(1 for r in records if r['command'] in deletion_commands)

            if deletion_count >= 50:
                suspicious_activities.append({
                    'type': 'excessive_deletions',
                    'deletion_count': deletion_count,
                    'description': f'Excessive deletion operations detected: {deletion_count} operations'
                })

            return suspicious_activities

        except Exception as e:
            self.logger.error(f"Failed to detect suspicious activities: {e}")
            return []

```

12.7 最佳实践

12.7.1 安全配置清单

class RedisSecurityChecklist:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.logger = logging.getLogger(__name__)
        
        # 安全检查项
        self.security_checks = {
            'authentication': {
                'password_set': 'Check if password authentication is enabled',
                'strong_password': 'Verify password strength',
                'acl_enabled': 'Check if ACL is configured'
            },
            'network': {
                'bind_address': 'Verify bind address configuration',
                'protected_mode': 'Check if protected mode is enabled',
                'tls_enabled': 'Verify TLS/SSL encryption',
                'port_security': 'Check if default port is changed'
            },
            'commands': {
                'dangerous_commands_disabled': 'Verify dangerous commands are disabled',
                'command_renaming': 'Check if sensitive commands are renamed',
                'script_security': 'Verify Lua script security settings'
            },
            'data_protection': {
                'encryption_at_rest': 'Check data encryption configuration',
                'sensitive_data_handling': 'Verify sensitive data protection',
                'backup_security': 'Check backup encryption and access controls'
            },
            'monitoring': {
                'logging_enabled': 'Verify audit logging is configured',
                'monitoring_setup': 'Check security monitoring configuration',
                'alerting_configured': 'Verify security alerting is set up'
            },
            'system': {
                'file_permissions': 'Check Redis file permissions',
                'user_privileges': 'Verify Redis runs with minimal privileges',
                'system_updates': 'Check for security updates'
            }
        }
    
    def run_security_assessment(self) -> Dict[str, Any]:
        """Run every configured security check and aggregate the outcome.

        Returns:
            ``{'success': True, 'assessment': {...}}`` where the assessment
            carries the summed ``overall_score`` / ``max_score``, the
            per-category results, all recommendations and critical issues,
            a ``timestamp``, the ``security_level`` and the
            ``score_percentage``. When no check contributes to the maximum
            score the level is ``'unknown'`` and the percentage 0.
            On any exception ``{'success': False, 'error': <message>}``.
        """
        try:
            started_at = datetime.now().isoformat()
            total_score = 0
            total_max = 0
            categories = {}
            recommendations = []
            critical_issues = []

            # Evaluate each category and fold its result into the totals.
            for category, checks in self.security_checks.items():
                outcome = self._assess_category(category, checks)
                categories[category] = outcome
                total_score += outcome['score']
                total_max += outcome['max_score']
                recommendations.extend(outcome['recommendations'])
                critical_issues.extend(outcome['critical_issues'])

            # Derive the overall level from the share of passed checks.
            if total_max > 0:
                score_percentage = (total_score / total_max) * 100
                security_level = self._get_security_level(score_percentage)
            else:
                score_percentage = 0
                security_level = 'unknown'

            return {
                'success': True,
                'assessment': {
                    'overall_score': total_score,
                    'max_score': total_max,
                    'categories': categories,
                    'recommendations': recommendations,
                    'critical_issues': critical_issues,
                    'timestamp': started_at,
                    'security_level': security_level,
                    'score_percentage': score_percentage,
                },
            }

        except Exception as e:
            self.logger.error(f"Failed to run security assessment: {e}")
            return {'success': False, 'error': str(e)}
    
    def _assess_category(self, category: str, checks: Dict[str, str]) -> Dict[str, Any]:
        """评估特定类别的安全性"""
        category_result = {
            'score': 0,
            'max_score': len(checks),
            'passed_checks': [],
            'failed_checks': [],
            'recommendations': [],
            'critical_issues': []
        }
        
        # 执行具体的安全检查
        for check_name, check_description in checks.items():
            check_result = self._execute_security_check(category, check_name)
            
            if check_result['passed']:
                category_result['score'] += 1
                category_result['passed_checks'].append({
                    'name': check_name,
                    'description': check_description
                })
            else:
                category_result['failed_checks'].append({
                    'name': check_name,
                    'description': check_description,
                    'details': check_result.get('details', '')
                })
                
                # 添加建议
                if check_result.get('recommendation'):
                    category_result['recommendations'].append(check_result['recommendation'])
                
                # 检查是否为关键问题
                if check_result.get('critical', False):
                    category_result['critical_issues'].append({
                        'check': check_name,
                        'description': check_description,
                        'details': check_result.get('details', '')
                    })
        
        return category_result
    
    def _execute_security_check(self, category: str, check_name: str) -> Dict[str, Any]:
        """执行具体的安全检查"""
        try:
            # 根据类别和检查名称执行相应的检查
            if category == 'authentication':
                return self._check_authentication(check_name)
            elif category == 'network':
                return self._check_network(check_name)
            elif category == 'commands':
                return self._check_commands(check_name)
            elif category == 'data_protection':
                return self._check_data_protection(check_name)
            elif category == 'monitoring':
                return self._check_monitoring(check_name)
            elif category == 'system':
                return self._check_system(check_name)
            else:
                return {'passed': False, 'details': 'Unknown check category'}
                
        except Exception as e:
            return {
                'passed': False,
                'details': f'Check execution failed: {str(e)}'
            }
    
    def _check_authentication(self, check_name: str) -> Dict[str, Any]:
        """检查认证相关配置"""
        if check_name == 'password_set':
            config = self.redis.config_get('requirepass')
            if config and config.get('requirepass'):
                return {'passed': True}
            else:
                return {
                    'passed': False,
                    'critical': True,
                    'details': 'No password authentication configured',
                    'recommendation': 'Set a strong password using CONFIG SET requirepass'
                }
        
        elif check_name == 'strong_password':
            # 这里只能检查是否设置了密码,无法检查密码强度
            config = self.redis.config_get('requirepass')
            if config and config.get('requirepass'):
                return {
                    'passed': True,
                    'details': 'Password is set (strength cannot be verified)'
                }
            else:
                return {
                    'passed': False,
                    'critical': True,
                    'details': 'No password set',
                    'recommendation': 'Set a strong password with at least 12 characters'
                }
        
        elif check_name == 'acl_enabled':
            try:
                # 尝试获取ACL用户列表
                users = self.redis.acl_list()
                if len(users) > 1:  # 除了default用户外还有其他用户
                    return {'passed': True}
                else:
                    return {
                        'passed': False,
                        'details': 'Only default ACL user exists',
                        'recommendation': 'Configure ACL users with specific permissions'
                    }
            except Exception:
                return {
                    'passed': False,
                    'details': 'ACL commands not available or failed',
                    'recommendation': 'Upgrade to Redis 6.0+ and configure ACL'
                }
        
        return {'passed': False, 'details': 'Unknown authentication check'}
    
    def _check_network(self, check_name: str) -> Dict[str, Any]:
        """检查网络相关配置"""
        if check_name == 'bind_address':
            config = self.redis.config_get('bind')
            bind_addresses = config.get('bind', '').split()
            
            if '0.0.0.0' in bind_addresses or '*' in bind_addresses:
                return {
                    'passed': False,
                    'critical': True,
                    'details': 'Redis is bound to all interfaces',
                    'recommendation': 'Bind Redis to specific IP addresses only'
                }
            elif bind_addresses and '127.0.0.1' in bind_addresses:
                return {'passed': True}
            else:
                return {
                    'passed': False,
                    'details': f'Bind configuration: {bind_addresses}',
                    'recommendation': 'Review bind address configuration'
                }
        
        elif check_name == 'protected_mode':
            config = self.redis.config_get('protected-mode')
            if config.get('protected-mode') == 'yes':
                return {'passed': True}
            else:
                return {
                    'passed': False,
                    'critical': True,
                    'details': 'Protected mode is disabled',
                    'recommendation': 'Enable protected mode: CONFIG SET protected-mode yes'
                }
        
        elif check_name == 'tls_enabled':
            # 检查TLS配置(这需要根据实际部署情况调整)
            config = self.redis.config_get('tls-port')
            if config.get('tls-port', '0') != '0':
                return {'passed': True}
            else:
                return {
                    'passed': False,
                    'details': 'TLS is not configured',
                    'recommendation': 'Configure TLS encryption for secure communication'
                }
        
        elif check_name == 'port_security':
            config = self.redis.config_get('port')
            port = config.get('port', '6379')
            if port != '6379':
                return {'passed': True}
            else:
                return {
                    'passed': False,
                    'details': 'Using default port 6379',
                    'recommendation': 'Consider changing to a non-standard port'
                }
        
        return {'passed': False, 'details': 'Unknown network check'}
    
    def _check_commands(self, check_name: str) -> Dict[str, Any]:
        """检查命令安全配置"""
        if check_name == 'dangerous_commands_disabled':
            # 检查危险命令是否被禁用
            dangerous_commands = ['FLUSHDB', 'FLUSHALL', 'CONFIG', 'DEBUG', 'SHUTDOWN']
            enabled_dangerous = []
            
            for cmd in dangerous_commands:
                try:
                    # 尝试执行命令来检查是否被禁用
                    # 这里使用一个安全的方法来检查
                    if cmd == 'CONFIG':
                        self.redis.config_get('timeout')
                        enabled_dangerous.append(cmd)
                except Exception:
                    # 命令被禁用或重命名
                    pass
            
            if enabled_dangerous:
                return {
                    'passed': False,
                    'critical': True,
                    'details': f'Dangerous commands still enabled: {enabled_dangerous}',
                    'recommendation': 'Disable or rename dangerous commands'
                }
            else:
                return {'passed': True}
        
        return {'passed': True, 'details': 'Command check completed'}
    
    def _check_data_protection(self, check_name: str) -> Dict[str, Any]:
        """检查数据保护配置"""
        # 这些检查通常需要访问系统级配置
        return {
            'passed': True,
            'details': 'Data protection check requires manual verification'
        }
    
    def _check_monitoring(self, check_name: str) -> Dict[str, Any]:
        """检查监控配置"""
        if check_name == 'logging_enabled':
            # 检查是否有审计日志
            audit_keys = self.redis.keys('audit:*')
            if audit_keys:
                return {'passed': True}
            else:
                return {
                    'passed': False,
                    'details': 'No audit logs found',
                    'recommendation': 'Enable audit logging'
                }
        
        return {'passed': True, 'details': 'Monitoring check completed'}
    
    def _check_system(self, check_name: str) -> Dict[str, Any]:
        """检查系统级配置"""
        # 系统级检查通常需要操作系统权限
        return {
            'passed': True,
            'details': 'System check requires manual verification'
        }
    
    def _get_security_level(self, score_percentage: float) -> str:
        """根据分数确定安全等级"""
        if score_percentage >= 90:
            return 'excellent'
        elif score_percentage >= 80:
            return 'good'
        elif score_percentage >= 70:
            return 'fair'
        elif score_percentage >= 60:
            return 'poor'
        else:
            return 'critical'
    
    def generate_security_report(self) -> Dict[str, Any]:
        """Build a security report from a fresh security assessment.

        Runs ``run_security_assessment()``. If that result is not successful
        it is returned unchanged. Otherwise the assessment is repackaged into
        a report with an executive summary, the per-category results, the
        critical issues, the recommendations, suggested next steps and the
        assessment timestamp.

        Returns:
            ``{'success': True, 'report': {...}}`` on success, the failed
            assessment result as-is, or ``{'success': False, 'error': str}``
            when any exception is raised along the way (the error is also
            logged).
        """
        try:
            outcome = self.run_security_assessment()
            if not outcome['success']:
                return outcome

            data = outcome['assessment']

            # Headline figures first.
            summary = {
                'security_level': data['security_level'],
                'overall_score': f"{data['score_percentage']:.1f}%",
                'critical_issues_count': len(data['critical_issues']),
                'recommendations_count': len(data['recommendations'])
            }

            # Then the detailed sections, filled in one at a time.
            report: Dict[str, Any] = {'executive_summary': summary}
            report['detailed_results'] = data['categories']
            report['critical_issues'] = data['critical_issues']
            report['recommendations'] = data['recommendations']
            report['next_steps'] = self._generate_next_steps(data)
            report['generated_at'] = data['timestamp']

            return {'success': True, 'report': report}

        except Exception as e:
            self.logger.error(f"Failed to generate security report: {e}")
            return {'success': False, 'error': str(e)}
    
    def _generate_next_steps(self, assessment: Dict[str, Any]) -> List[str]:
        """生成下一步行动建议"""
        next_steps = []
        
        # 根据安全等级提供建议
        security_level = assessment['security_level']
        
        if security_level == 'critical':
            next_steps.extend([
                'Immediately address all critical security issues',
                'Implement basic authentication and access controls',
                'Review and secure network configuration',
                'Schedule regular security assessments'
            ])
        elif security_level == 'poor':
            next_steps.extend([
                'Address critical issues within 24 hours',
                'Implement recommended security configurations',
                'Set up monitoring and alerting',
                'Plan for comprehensive security review'
            ])
        elif security_level == 'fair':
            next_steps.extend([
                'Address remaining security gaps',
                'Enhance monitoring and logging',
                'Review and update security policies',
                'Consider advanced security features'
            ])
        elif security_level in ['good', 'excellent']:
            next_steps.extend([
                'Maintain current security posture',
                'Regular security assessments',
                'Stay updated with security best practices',
                'Consider security automation'
            ])
        
        # 添加基于关键问题的具体建议
        if assessment['critical_issues']:
            next_steps.insert(0, 'URGENT: Address critical security vulnerabilities immediately')
        
        return next_steps

12.7.2 安全配置模板

class RedisSecurityTemplate:
    """Ready-made Redis security configuration templates."""

    @staticmethod
    def get_production_config() -> Dict[str, str]:
        """Return the hardened configuration template for production.

        The password is a placeholder to be replaced. Each dangerous command
        is mapped to a new name ending in a fresh random hex token, so the
        rename values differ on every call.

        Returns:
            Mapping of configuration directive to value.
        """
        settings: Dict[str, str] = {}

        # Basic security settings.
        settings['requirepass'] = 'your-strong-password-here'
        settings['protected-mode'] = 'yes'
        settings['bind'] = '127.0.0.1'  # bind to a specific IP
        settings['port'] = '6380'  # non-default port

        # Network security.
        settings['tcp-keepalive'] = '300'
        settings['timeout'] = '300'
        settings['tcp-backlog'] = '511'

        # Memory and performance.
        settings['maxmemory-policy'] = 'allkeys-lru'
        settings['maxclients'] = '1000'

        # Persistence safety.
        settings['save'] = '900 1 300 10 60 10000'
        settings['stop-writes-on-bgsave-error'] = 'yes'
        settings['rdbcompression'] = 'yes'
        settings['rdbchecksum'] = 'yes'

        # Logging.
        settings['loglevel'] = 'notice'
        settings['syslog-enabled'] = 'yes'
        settings['syslog-ident'] = 'redis'

        # Security restrictions: rename dangerous commands to random names.
        renamed_commands = (
            'FLUSHDB', 'FLUSHALL', 'CONFIG', 'DEBUG', 'SHUTDOWN', 'EVAL', 'SCRIPT',
        )
        for command in renamed_commands:
            settings[f'rename-command {command}'] = (
                f'{command}_RENAMED_' + secrets.token_hex(16)
            )

        return settings

    @staticmethod
    def get_development_config() -> Dict[str, str]:
        """Return the lighter configuration template for development.

        Returns:
            Mapping of configuration directive to value.
        """
        # Basic settings.
        basics = {
            'requirepass': 'dev-password-123',
            'protected-mode': 'yes',
            'bind': '127.0.0.1',
        }
        # Developer-convenience settings.
        convenience = {
            'timeout': '0',
            'loglevel': 'debug',
        }
        # Dangerous commands are still renamed, to fixed names.
        restrictions = {
            'rename-command FLUSHALL': 'FLUSHALL_DEV',
            'rename-command CONFIG': 'CONFIG_DEV',
        }
        return {**basics, **convenience, **restrictions}

    @staticmethod
    def get_acl_config() -> List[str]:
        """Return the ACL template as a list of ``ACL SETUSER`` commands.

        Returns:
            Command strings for an admin, an application, a read-only and a
            monitoring user, followed by one that turns the default user off.
        """
        # Administrator.
        admin_rule = 'ACL SETUSER admin on >admin-strong-password ~* &* +@all'
        # Application user, limited to its own key pattern.
        app_rule = 'ACL SETUSER app on >app-password ~app:* +@read +@write -@dangerous'
        # Read-only user.
        readonly_rule = 'ACL SETUSER readonly on >readonly-password ~* +@read -@write -@dangerous'
        # Monitoring user.
        monitor_rule = 'ACL SETUSER monitor on >monitor-password ~* +info +ping +client +config|get'
        # Turn off the default user.
        default_rule = 'ACL SETUSER default off'

        return [admin_rule, app_rule, readonly_rule, monitor_rule, default_rule]

12.8 总结

Redis安全配置是确保数据安全和系统稳定的关键环节。本章详细介绍了Redis的安全威胁、安全原则以及全面的安全配置方案。

12.8.1 核心安全要点

  1. 身份认证和授权

    • 启用密码认证
    • 配置ACL访问控制
    • 实施最小权限原则
  2. 网络安全

    • 配置TLS/SSL加密
    • 限制网络访问
    • 使用防火墙和VPN
  3. 命令安全

    • 禁用或重命名危险命令
    • 实施脚本安全策略
    • 监控命令执行
  4. 数据保护

    • 实施数据加密
    • 敏感数据处理
    • 数据分类管理
  5. 监控和审计

    • 安全事件监控
    • 审计日志记录
    • 告警机制

12.8.2 最佳实践建议

  1. 定期安全评估

    • 使用安全检查清单
    • 进行渗透测试
    • 更新安全配置
  2. 持续监控

    • 实时安全监控
    • 异常行为检测
    • 自动化响应
  3. 安全培训

    • 团队安全意识培训
    • 了解最新安全威胁
    • 应急响应演练

12.8.3 参考资料

    • Redis 官方文档:Security(安全总览)
    • Redis 官方文档:ACL(访问控制列表)
    • Redis 官方文档:TLS(传输加密)
    • redis-py 官方文档

下一章预告:第13章将介绍Redis的性能优化和调优技巧,包括内存优化、网络优化、持久化优化等内容。