Redis作为内存数据库,在生产环境中需要特别关注安全配置。本章将详细介绍Redis的安全机制、配置方法和最佳实践。
## 12.1 Redis安全概述
### 12.1.1 安全威胁
- 未授权访问:默认配置下Redis不需要密码
- 数据泄露:敏感数据可能被恶意访问
- 命令注入:危险命令可能被滥用
- 网络攻击:未加密的网络传输
- 权限提升:利用Redis进行系统攻击
### 12.1.2 安全原则
- 最小权限原则:只授予必要的权限
- 深度防御:多层安全防护
- 定期审计:监控和审计访问日志
- 及时更新:保持Redis版本更新
- 网络隔离:限制网络访问
## 12.2 身份认证和授权
### 12.2.1 密码认证
import redis
import hashlib
import secrets
from typing import Dict, List, Optional, Any
import logging
from datetime import datetime, timedelta
class RedisAuthManager:
    """Manage the Redis server password (``requirepass``).

    Covers password-policy validation, random password generation, PBKDF2
    hashing for the reuse history, rotation, expiry checks and a capped
    security-event log. The history and the event log are stored in Redis
    under ``redis:password_history`` and ``redis:security_events``.
    """

    # Shared by the generator and the validator so the two cannot drift apart.
    _SPECIAL_CHARS = '!@#$%^&*()_+-=[]{}|;:,.<>?'
    _PBKDF2_ITERATIONS = 100000

    def __init__(self, redis_client: "redis.Redis"):
        """Keep the client and install the default password policy."""
        self.redis = redis_client
        self.logger = logging.getLogger(__name__)
        # Password policy configuration
        self.password_policy = {
            'min_length': 12,
            'require_uppercase': True,
            'require_lowercase': True,
            'require_numbers': True,
            'require_special_chars': True,
            'max_age_days': 90,
            'history_count': 5
        }

    def generate_secure_password(self, length: int = 16) -> str:
        """Return a random password of ``length`` characters that passes the policy.

        Raises:
            ValueError: if ``length`` is too short for any candidate to ever
                satisfy the policy (the retry loop would otherwise never end).
        """
        import string

        policy = self.password_policy
        required_classes = sum(
            1
            for flag in ('require_uppercase', 'require_lowercase',
                         'require_numbers', 'require_special_chars')
            if policy[flag]
        )
        shortest_valid = max(policy['min_length'], required_classes)
        if length < shortest_valid:
            raise ValueError(
                f'Password length must be at least {shortest_valid} to satisfy the policy'
            )

        alphabet = (
            string.ascii_lowercase +
            string.ascii_uppercase +
            string.digits +
            self._SPECIAL_CHARS
        )
        # Rejection sampling: draw until every required character class appears.
        while True:
            candidate = ''.join(secrets.choice(alphabet) for _ in range(length))
            if self._validate_password_policy(candidate):
                return candidate

    def _validate_password_policy(self, password: str) -> bool:
        """Return True when ``password`` meets length and character-class rules."""
        policy = self.password_policy
        if len(password) < policy['min_length']:
            return False
        if policy['require_uppercase'] and not any(c.isupper() for c in password):
            return False
        if policy['require_lowercase'] and not any(c.islower() for c in password):
            return False
        if policy['require_numbers'] and not any(c.isdigit() for c in password):
            return False
        if policy['require_special_chars']:
            if not any(c in self._SPECIAL_CHARS for c in password):
                return False
        return True

    def hash_password(self, password: str, salt: Optional[str] = None) -> Dict[str, Any]:
        """Hash ``password`` with PBKDF2-HMAC-SHA256.

        A fresh random salt is generated when ``salt`` is None. The returned
        dict holds ``hash`` (hex), ``salt``, ``algorithm`` and ``iterations``.
        """
        if salt is None:
            salt = secrets.token_hex(32)
        digest = hashlib.pbkdf2_hmac(
            'sha256',
            password.encode('utf-8'),
            salt.encode('utf-8'),
            self._PBKDF2_ITERATIONS
        )
        return {
            'hash': digest.hex(),
            'salt': salt,
            'algorithm': 'pbkdf2_sha256',
            'iterations': self._PBKDF2_ITERATIONS
        }

    def verify_password(self, password: str, stored_hash: Dict[str, Any]) -> bool:
        """Return True when ``password`` reproduces the hash in ``stored_hash``."""
        computed_hash = hashlib.pbkdf2_hmac(
            'sha256',
            password.encode('utf-8'),
            stored_hash['salt'].encode('utf-8'),
            stored_hash['iterations']
        )
        # Constant-time comparison so timing does not reveal matching prefixes.
        return secrets.compare_digest(computed_hash.hex(), stored_hash['hash'])

    def set_redis_password(self, password: str) -> Dict[str, Any]:
        """Validate ``password`` and apply it via ``CONFIG SET requirepass``.

        Returns a dict with ``success`` plus ``message`` or ``error``.
        """
        try:
            if not self._validate_password_policy(password):
                return {
                    'success': False,
                    'error': 'Password does not meet policy requirements'
                }
            self.redis.config_set('requirepass', password)
            self._log_security_event('password_set', {
                'timestamp': datetime.now().isoformat(),
                'action': 'password_configured'
            })
            return {
                'success': True,
                'message': 'Password set successfully'
            }
        except Exception as e:
            self.logger.error(f"Failed to set Redis password: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def rotate_password(self, new_password: str) -> Dict[str, Any]:
        """Set ``new_password`` unless it matches an entry in the password history.

        On success the new hash is appended to the history and a
        ``password_rotated`` event is logged. Returns the same result shape
        as ``set_redis_password``.
        """
        try:
            for old_hash in self._get_password_history():
                if self.verify_password(new_password, old_hash):
                    return {
                        'success': False,
                        'error': 'Password was used recently'
                    }
            result = self.set_redis_password(new_password)
            if result['success']:
                self._update_password_history(self.hash_password(new_password))
                self._log_security_event('password_rotated', {
                    'timestamp': datetime.now().isoformat(),
                    'action': 'password_rotated'
                })
            return result
        except Exception as e:
            self.logger.error(f"Failed to rotate password: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def _get_password_history(self) -> List[Dict[str, Any]]:
        """Load the stored hash history; returns [] when absent or unreadable."""
        try:
            history_data = self.redis.get('redis:password_history')
            if history_data:
                import json
                return json.loads(history_data)
            return []
        except Exception as e:
            # Best effort: an unreadable history is treated as empty, but the
            # failure is logged because it disables the reuse check.
            self.logger.warning(f"Failed to read password history: {e}")
            return []

    def _update_password_history(self, password_hash: Dict[str, Any]) -> None:
        """Append ``password_hash`` (stamped with ``created_at``) to the history.

        Only the newest ``history_count`` entries are kept. The caller's dict
        is copied, not modified.
        """
        try:
            import json
            history = self._get_password_history()
            entry = dict(password_hash)
            entry['created_at'] = datetime.now().isoformat()
            history.append(entry)
            keep = self.password_policy['history_count']
            if len(history) > keep:
                history = history[-keep:]
            self.redis.set('redis:password_history', json.dumps(history))
        except Exception as e:
            self.logger.error(f"Failed to update password history: {e}")

    def _log_security_event(self, event_type: str, event_data: Dict[str, Any]) -> None:
        """Push a JSON event onto ``redis:security_events`` (newest 1000 kept)."""
        try:
            import json
            event = {
                'type': event_type,
                'timestamp': datetime.now().isoformat(),
                'data': event_data
            }
            self.redis.lpush('redis:security_events', json.dumps(event))
            self.redis.ltrim('redis:security_events', 0, 999)
        except Exception as e:
            self.logger.error(f"Failed to log security event: {e}")

    def check_password_expiry(self) -> Dict[str, Any]:
        """Report whether the newest password in the history has expired.

        ``days_until_expiry`` is the whole-day part of the remaining time;
        ``expired`` becomes True only once the expiry instant has passed.
        """
        try:
            history = self._get_password_history()
            if not history:
                return {
                    'expired': False,
                    'message': 'No password history found'
                }
            created_at = datetime.fromisoformat(history[-1]['created_at'])
            expiry_date = created_at + timedelta(days=self.password_policy['max_age_days'])
            now = datetime.now()
            return {
                # Compare instants: ``.days <= 0`` would flag the password as
                # expired during its whole final day.
                'expired': now >= expiry_date,
                'days_until_expiry': (expiry_date - now).days,
                'expiry_date': expiry_date.isoformat(),
                'created_at': created_at.isoformat()
            }
        except Exception as e:
            self.logger.error(f"Failed to check password expiry: {e}")
            return {
                'expired': False,
                'error': str(e)
            }
### 12.2.2 ACL访问控制列表
```python
class RedisACLManager:
    """Create, modify, list and audit Redis ACL users.

    All server interaction goes through ``execute_command('ACL', ...)``.
    Replies are decoded to ``str`` so the class works with clients created
    with or without ``decode_responses=True``.
    """

    def __init__(self, redis_client: "redis.Redis"):
        """Keep the client and define the predefined role templates."""
        self.redis = redis_client
        self.logger = logging.getLogger(__name__)
        # Predefined roles
        self.predefined_roles = {
            'admin': {
                'commands': ['*'],
                'keys': ['*'],
                'description': 'Full administrative access'
            },
            'read_only': {
                'commands': ['get', 'mget', 'exists', 'keys', 'scan', 'type', 'ttl'],
                'keys': ['*'],
                'description': 'Read-only access to all keys'
            },
            'app_user': {
                'commands': ['get', 'set', 'del', 'exists', 'expire', 'ttl'],
                'keys': ['app:*'],
                'description': 'Application user with limited access'
            },
            'cache_user': {
                'commands': ['get', 'set', 'del', 'exists', 'expire'],
                'keys': ['cache:*'],
                'description': 'Cache operations only'
            }
        }

    @staticmethod
    def _to_text(value: Any) -> Any:
        """Decode bytes to str, recursing into lists, tuples and dicts."""
        if isinstance(value, bytes):
            return value.decode('utf-8', errors='replace')
        if isinstance(value, (list, tuple)):
            return [RedisACLManager._to_text(item) for item in value]
        if isinstance(value, dict):
            return {
                RedisACLManager._to_text(k): RedisACLManager._to_text(v)
                for k, v in value.items()
            }
        return value

    def create_user(self, username: str, password: str,
                    role: str = 'app_user',
                    custom_rules: Optional[Dict[str, List[str]]] = None) -> Dict[str, Any]:
        """Create an ACL user from a predefined role or from ``custom_rules``.

        A predefined ``role`` takes precedence; ``custom_rules`` is used only
        when ``role`` is unknown. The ``rules`` list in the result has the
        password entry masked so the plaintext does not leak into logs.
        """
        try:
            if self._user_exists(username):
                return {
                    'success': False,
                    'error': f'User {username} already exists'
                }
            if role in self.predefined_roles:
                role_config = self.predefined_roles[role]
            elif custom_rules:
                role_config = custom_rules
            else:
                return {
                    'success': False,
                    'error': f'Unknown role {role} and no custom rules provided'
                }
            acl_rules = self._build_acl_rules(username, password, role_config)
            self.redis.execute_command('ACL', 'SETUSER', username, *acl_rules)
            self._log_acl_event('user_created', {
                'username': username,
                'role': role,
                'timestamp': datetime.now().isoformat()
            })
            return {
                'success': True,
                'username': username,
                'role': role,
                # Same rules as sent to the server, minus the plaintext password.
                'rules': ['>***' if rule.startswith('>') else rule for rule in acl_rules]
            }
        except Exception as e:
            self.logger.error(f"Failed to create user {username}: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def _user_exists(self, username: str) -> bool:
        """Return True when ``ACL LIST`` contains an entry for ``username``.

        Returns False (and logs a warning) when the listing itself fails.
        """
        try:
            entries = self._to_text(self.redis.execute_command('ACL', 'LIST'))
            prefix = f'user {username} '
            return any(entry.startswith(prefix) for entry in entries)
        except Exception as e:
            self.logger.warning(f"Failed to check whether user {username} exists: {e}")
            return False

    def _build_acl_rules(self, username: str, password: str,
                         role_config: Dict[str, List[str]]) -> List[str]:
        """Translate a role config into ``ACL SETUSER`` rule tokens."""
        rules = ['on', f'>{password}']  # enable the user, then add the password

        commands = role_config.get('commands', [])
        if '*' in commands:
            rules.append('+@all')
        else:
            # Deny everything first, then allow the listed commands.
            rules.append('-@all')
            rules.extend(f'+{cmd}' for cmd in commands)

        keys = role_config.get('keys', [])
        if '*' in keys:
            rules.append('~*')
        else:
            # Clear existing key patterns before adding the role's own.
            rules.append('resetkeys')
            rules.extend(f'~{key_pattern}' for key_pattern in keys)
        return rules

    def update_user_password(self, username: str, new_password: str) -> Dict[str, Any]:
        """Replace all of ``username``'s passwords with ``new_password``."""
        try:
            if not self._user_exists(username):
                return {
                    'success': False,
                    'error': f'User {username} does not exist'
                }
            # ``>pw`` alone only ADDS a password; ``resetpass`` first drops the
            # previous ones so the old password stops working.
            self.redis.execute_command(
                'ACL', 'SETUSER', username, 'resetpass', f'>{new_password}'
            )
            self._log_acl_event('password_updated', {
                'username': username,
                'timestamp': datetime.now().isoformat()
            })
            return {
                'success': True,
                'message': f'Password updated for user {username}'
            }
        except Exception as e:
            self.logger.error(f"Failed to update password for {username}: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def disable_user(self, username: str) -> Dict[str, Any]:
        """Turn ``username`` off (``ACL SETUSER <user> off``)."""
        try:
            if not self._user_exists(username):
                return {
                    'success': False,
                    'error': f'User {username} does not exist'
                }
            self.redis.execute_command('ACL', 'SETUSER', username, 'off')
            self._log_acl_event('user_disabled', {
                'username': username,
                'timestamp': datetime.now().isoformat()
            })
            return {
                'success': True,
                'message': f'User {username} disabled'
            }
        except Exception as e:
            self.logger.error(f"Failed to disable user {username}: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def delete_user(self, username: str) -> Dict[str, Any]:
        """Remove ``username`` with ``ACL DELUSER``."""
        try:
            if not self._user_exists(username):
                return {
                    'success': False,
                    'error': f'User {username} does not exist'
                }
            self.redis.execute_command('ACL', 'DELUSER', username)
            self._log_acl_event('user_deleted', {
                'username': username,
                'timestamp': datetime.now().isoformat()
            })
            return {
                'success': True,
                'message': f'User {username} deleted'
            }
        except Exception as e:
            self.logger.error(f"Failed to delete user {username}: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def list_users(self) -> Dict[str, Any]:
        """Return every ``ACL LIST`` entry as ``{username, status, raw_info}``."""
        try:
            users = []
            for user_info in self._to_text(self.redis.execute_command('ACL', 'LIST')):
                parts = user_info.split(' ')
                if len(parts) >= 2 and parts[0] == 'user':
                    users.append({
                        'username': parts[1],
                        # Look only at the flags after the name, so a user
                        # literally called "on" is not reported as enabled.
                        'status': 'on' if 'on' in parts[2:] else 'off',
                        'raw_info': user_info
                    })
            return {
                'success': True,
                'users': users,
                'total_count': len(users)
            }
        except Exception as e:
            self.logger.error(f"Failed to list users: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def get_user_info(self, username: str) -> Dict[str, Any]:
        """Return the ``ACL GETUSER`` reply for ``username`` as a dict.

        A flat ``[field, value, ...]`` reply is paired up; a reply that is
        already a mapping is used as-is. A nil reply is reported as a missing
        user.
        """
        try:
            reply = self.redis.execute_command('ACL', 'GETUSER', username)
            if reply is None:
                return {
                    'success': False,
                    'error': f'User {username} does not exist'
                }
            reply = self._to_text(reply)
            if isinstance(reply, dict):
                parsed_info = dict(reply)
            else:
                # A trailing field without a value maps to None.
                values = list(reply[1::2]) + [None]
                parsed_info = dict(zip(reply[0::2], values))
            return {
                'success': True,
                'username': username,
                'info': parsed_info
            }
        except Exception as e:
            self.logger.error(f"Failed to get user info for {username}: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def _log_acl_event(self, event_type: str, event_data: Dict[str, Any]) -> None:
        """Push a JSON event onto ``redis:acl_events`` (newest 1000 kept)."""
        try:
            import json
            event = {
                'type': event_type,
                'timestamp': datetime.now().isoformat(),
                'data': event_data
            }
            self.redis.lpush('redis:acl_events', json.dumps(event))
            self.redis.ltrim('redis:acl_events', 0, 999)
        except Exception as e:
            self.logger.error(f"Failed to log ACL event: {e}")

    def audit_user_permissions(self) -> Dict[str, Any]:
        """Analyse every user's permissions and attach a risk level to each."""
        try:
            users_result = self.list_users()
            if not users_result['success']:
                return users_result
            audit_results = []
            for user in users_result['users']:
                info_result = self.get_user_info(user['username'])
                if not info_result['success']:
                    continue
                analysis = self._analyze_permissions(info_result['info'])
                audit_results.append({
                    'username': user['username'],
                    'status': user['status'],
                    'permissions': analysis,
                    'risk_level': self._assess_risk_level(analysis)
                })
            return {
                'success': True,
                'audit_results': audit_results,
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            self.logger.error(f"Failed to audit user permissions: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    @staticmethod
    def _as_rule_string(value: Any, prefix: str = '') -> str:
        """Normalise a GETUSER field (str, bytes or list) to one rule string.

        List items that lack ``prefix`` get it prepended.
        NOTE(review): assumes list-form ``keys`` replies carry bare patterns
        without the leading ``~`` - confirm against the target server version.
        """
        if value is None:
            return ''
        if isinstance(value, bytes):
            return value.decode('utf-8', errors='replace')
        if isinstance(value, (list, tuple)):
            tokens = []
            for item in value:
                text = item.decode('utf-8', errors='replace') if isinstance(item, bytes) else str(item)
                if prefix and not text.startswith(prefix):
                    text = prefix + text
                tokens.append(text)
            return ' '.join(tokens)
        return str(value)

    def _analyze_permissions(self, user_info: Dict[str, Any]) -> Dict[str, Any]:
        """Summarise command and key permissions from a parsed GETUSER reply."""
        import re

        commands = self._as_rule_string(user_info.get('commands', ''))
        keys = self._as_rule_string(user_info.get('keys', ''), prefix='~')

        has_all_commands = '+@all' in commands
        if has_all_commands:
            dangerous_commands = ['ALL_COMMANDS']
        else:
            risky_commands = ['flushdb', 'flushall', 'config', 'eval', 'script', 'shutdown']
            dangerous_commands = [cmd for cmd in risky_commands if f'+{cmd}' in commands]

        # Whole-token match: "~*foo" is a restricted pattern, not "all keys".
        has_all_keys = '~*' in keys.split()
        if has_all_keys:
            key_patterns = ['*']
        else:
            key_patterns = re.findall(r'~([^\s]+)', keys)

        return {
            'has_all_commands': has_all_commands,
            'dangerous_commands': dangerous_commands,
            'has_all_keys': has_all_keys,
            'key_patterns': key_patterns,
            'command_count': len(dangerous_commands) if not has_all_commands else 'unlimited',
            'key_pattern_count': len(key_patterns)
        }

    def _assess_risk_level(self, permissions: Dict[str, Any]) -> str:
        """Score the analysis and map it to high / medium / low / minimal."""
        risk_score = 0
        if permissions['has_all_commands']:
            risk_score += 50
        else:
            risk_score += len(permissions['dangerous_commands']) * 10

        if permissions['has_all_keys']:
            risk_score += 30
        else:
            # Wildcard patterns weigh more than exact key names.
            for pattern in permissions['key_patterns']:
                risk_score += 10 if '*' in pattern else 2

        if risk_score >= 70:
            return 'high'
        if risk_score >= 40:
            return 'medium'
        if risk_score >= 20:
            return 'low'
        return 'minimal'
## 12.3 网络安全
### 12.3.1 TLS/SSL加密
import ssl
import socket
from typing import Dict, Any, Optional
class RedisSSLManager:
    """Hold TLS settings for a Redis endpoint and open TLS connections to it."""

    def __init__(self, host: str = 'localhost', port: int = 6380):
        """Store the endpoint and start with strict verification defaults."""
        self.host = host
        self.port = port
        self.logger = logging.getLogger(__name__)
        # SSL configuration
        self.ssl_config = {
            'cert_file': None,
            'key_file': None,
            'ca_file': None,
            'verify_mode': ssl.CERT_REQUIRED,
            'check_hostname': True
        }

    def configure_ssl(self, cert_file: str, key_file: str,
                      ca_file: Optional[str] = None,
                      verify_mode: int = ssl.CERT_REQUIRED,
                      check_hostname: bool = True) -> Dict[str, Any]:
        """Check that the given files exist and store the TLS settings.

        Returns ``success`` plus either a copy of the stored config or an
        ``error`` naming the first missing file.
        """
        try:
            import os
            if not os.path.exists(cert_file):
                return {
                    'success': False,
                    'error': f'Certificate file not found: {cert_file}'
                }
            if not os.path.exists(key_file):
                return {
                    'success': False,
                    'error': f'Key file not found: {key_file}'
                }
            if ca_file and not os.path.exists(ca_file):
                return {
                    'success': False,
                    'error': f'CA file not found: {ca_file}'
                }
            self.ssl_config.update({
                'cert_file': cert_file,
                'key_file': key_file,
                'ca_file': ca_file,
                'verify_mode': verify_mode,
                'check_hostname': check_hostname
            })
            return {
                'success': True,
                'message': 'SSL configuration updated',
                'config': self.ssl_config.copy()
            }
        except Exception as e:
            self.logger.error(f"Failed to configure SSL: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def create_ssl_context(self) -> ssl.SSLContext:
        """Build an ``ssl.SSLContext`` from the stored settings."""
        context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
        # check_hostname is assigned before verify_mode, as in the original.
        context.check_hostname = self.ssl_config['check_hostname']
        context.verify_mode = self.ssl_config['verify_mode']
        if self.ssl_config['cert_file'] and self.ssl_config['key_file']:
            context.load_cert_chain(
                self.ssl_config['cert_file'],
                self.ssl_config['key_file']
            )
        if self.ssl_config['ca_file']:
            context.load_verify_locations(self.ssl_config['ca_file'])
        return context

    def create_secure_connection(self, password: Optional[str] = None) -> Dict[str, Any]:
        """Open a TLS connection, ping the server and return the client.

        The stored settings are passed through redis-py's ``ssl_*`` keyword
        arguments instead of a pre-built context.
        NOTE(review): the original passed ``ssl_context=``, which the
        synchronous ``redis.Redis`` constructor is not known to accept -
        confirm against the installed redis-py version.
        """
        try:
            # Fail early on an invalid certificate/key/CA combination.
            self.create_ssl_context()

            connection_kwargs = {
                'host': self.host,
                'port': self.port,
                'password': password,
                'ssl': True,
                'ssl_cert_reqs': self.ssl_config['verify_mode'],
                'ssl_check_hostname': self.ssl_config['check_hostname'],
                'decode_responses': True
            }
            if self.ssl_config['cert_file']:
                connection_kwargs['ssl_certfile'] = self.ssl_config['cert_file']
            if self.ssl_config['key_file']:
                connection_kwargs['ssl_keyfile'] = self.ssl_config['key_file']
            if self.ssl_config['ca_file']:
                connection_kwargs['ssl_ca_certs'] = self.ssl_config['ca_file']

            redis_client = redis.Redis(**connection_kwargs)
            redis_client.ping()
            return {
                'success': True,
                'client': redis_client,
                'ssl_info': self._get_ssl_info(redis_client)
            }
        except Exception as e:
            self.logger.error(f"Failed to create secure connection: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def _get_ssl_info(self, redis_client: "redis.Redis") -> Dict[str, Any]:
        """Return peer certificate, cipher and TLS version of a pooled connection.

        The borrowed connection is always handed back to the pool.
        """
        try:
            pool = redis_client.connection_pool
            connection = pool.get_connection('ping')
            try:
                sock = getattr(connection, '_sock', None)
                if sock is not None and hasattr(sock, 'getpeercert'):
                    return {
                        'certificate': sock.getpeercert(),
                        'cipher': sock.cipher(),
                        'ssl_version': sock.version()
                    }
                return {'info': 'SSL connection established'}
            finally:
                # Without this the connection stays checked out forever.
                pool.release(connection)
        except Exception as e:
            return {'error': str(e)}

    def validate_certificate(self, cert_file: str) -> Dict[str, Any]:
        """Parse a PEM certificate with pyOpenSSL and report identity and expiry."""
        try:
            import OpenSSL.crypto
            from datetime import timezone

            with open(cert_file, 'rb') as f:
                cert_data = f.read()
            cert = OpenSSL.crypto.load_certificate(
                OpenSSL.crypto.FILETYPE_PEM, cert_data
            )
            subject = cert.get_subject()
            issuer = cert.get_issuer()

            not_after = cert.get_notAfter().decode('ascii')
            expiry_date = datetime.strptime(not_after, '%Y%m%d%H%M%SZ')
            # notAfter is UTC ("Z" suffix), so compare against naive UTC now
            # instead of local time.
            now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
            days_until_expiry = (expiry_date - now_utc).days
            return {
                'success': True,
                'certificate_info': {
                    'subject': dict(subject.get_components()),
                    'issuer': dict(issuer.get_components()),
                    'serial_number': cert.get_serial_number(),
                    'version': cert.get_version(),
                    'not_before': cert.get_notBefore().decode('ascii'),
                    'not_after': not_after,
                    'days_until_expiry': days_until_expiry,
                    # Instant comparison; ``.days <= 0`` is True a day early.
                    'expired': now_utc >= expiry_date
                }
            }
        except Exception as e:
            self.logger.error(f"Failed to validate certificate: {e}")
            return {
                'success': False,
                'error': str(e)
            }
### 12.3.2 网络访问控制
```python
class RedisNetworkSecurity:
    """Apply network-related Redis settings and inspect client connections."""

    def __init__(self, redis_client: "redis.Redis"):
        """Keep the client and record the default network settings."""
        self.redis = redis_client
        self.logger = logging.getLogger(__name__)
        # Network security configuration
        self.security_config = {
            'bind_interfaces': ['127.0.0.1'],
            'protected_mode': True,
            'tcp_keepalive': 300,
            'timeout': 0,
            'tcp_backlog': 511
        }

    def configure_network_security(self, config: Dict[str, Any]) -> Dict[str, Any]:
        """Apply the settings present in ``config``.

        Recognised keys: ``bind_interfaces``, ``protected_mode``,
        ``tcp_keepalive``, ``timeout``. ``results`` holds one sub-result per
        key; the overall ``success`` is True only if every sub-step succeeded.
        """
        try:
            steps = (
                ('bind_interfaces', self._configure_bind_interfaces),
                ('protected_mode', self._configure_protected_mode),
                ('tcp_keepalive', self._configure_tcp_keepalive),
                ('timeout', self._configure_timeout),
            )
            results = {
                key: apply(config[key]) for key, apply in steps if key in config
            }
            return {
                'success': all(step['success'] for step in results.values()),
                'results': results
            }
        except Exception as e:
            self.logger.error(f"Failed to configure network security: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def _configure_bind_interfaces(self, interfaces: List[str]) -> Dict[str, Any]:
        """Set ``bind`` to the valid IP addresses in ``interfaces``.

        Invalid entries are logged and dropped; fails when none remain.
        """
        try:
            import ipaddress
            valid_interfaces = []
            for interface in interfaces:
                try:
                    ipaddress.ip_address(interface)
                    valid_interfaces.append(interface)
                except ValueError:
                    self.logger.warning(f"Invalid IP address: {interface}")
            if not valid_interfaces:
                return {
                    'success': False,
                    'error': 'No valid IP addresses provided'
                }
            self.redis.config_set('bind', ' '.join(valid_interfaces))
            self.security_config['bind_interfaces'] = valid_interfaces
            return {
                'success': True,
                'interfaces': valid_interfaces
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }

    def _configure_protected_mode(self, enabled: bool) -> Dict[str, Any]:
        """Set ``protected-mode`` to yes/no."""
        try:
            self.redis.config_set('protected-mode', 'yes' if enabled else 'no')
            self.security_config['protected_mode'] = enabled
            return {
                'success': True,
                'protected_mode': enabled
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }

    def _configure_tcp_keepalive(self, keepalive: int) -> Dict[str, Any]:
        """Set ``tcp-keepalive``."""
        try:
            self.redis.config_set('tcp-keepalive', keepalive)
            self.security_config['tcp_keepalive'] = keepalive
            return {
                'success': True,
                'tcp_keepalive': keepalive
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }

    def _configure_timeout(self, timeout: int) -> Dict[str, Any]:
        """Set the client idle ``timeout``."""
        try:
            self.redis.config_set('timeout', timeout)
            self.security_config['timeout'] = timeout
            return {
                'success': True,
                'timeout': timeout
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }

    def setup_firewall_rules(self, allowed_ips: List[str],
                             redis_port: int = 6379) -> Dict[str, Any]:
        """Append iptables rules that restrict ``redis_port`` to ``allowed_ips`` (example).

        Every address is validated before any rule is applied, so bad input
        cannot leave a half-configured chain. Rules are appended with ``-A``:
        calling this twice adds the new ACCEPT rules after the earlier DROP
        rule, where they will not take effect.
        """
        try:
            # Example implementation; adapt to the firewall system in use.
            import ipaddress
            import subprocess

            invalid = []
            for ip in allowed_ips:
                try:
                    ipaddress.ip_network(ip, strict=False)
                except ValueError:
                    invalid.append(str(ip))
            if invalid:
                return {
                    'success': False,
                    'error': f"Invalid IP address or network: {', '.join(invalid)}",
                    'warning': 'No firewall rules were changed'
                }

            rules_applied = []
            # Allow loopback traffic
            subprocess.run([
                'iptables', '-A', 'INPUT', '-i', 'lo', '-j', 'ACCEPT'
            ], check=True)
            rules_applied.append('Allow loopback interface')
            # Allow each listed source to reach the Redis port
            for ip in allowed_ips:
                subprocess.run([
                    'iptables', '-A', 'INPUT', '-p', 'tcp',
                    '--dport', str(redis_port), '-s', ip, '-j', 'ACCEPT'
                ], check=True)
                rules_applied.append(f'Allow {ip} to access port {redis_port}')
            # Drop everything else aimed at the Redis port
            subprocess.run([
                'iptables', '-A', 'INPUT', '-p', 'tcp',
                '--dport', str(redis_port), '-j', 'DROP'
            ], check=True)
            rules_applied.append(f'Drop other access to port {redis_port}')
            return {
                'success': True,
                'rules_applied': rules_applied,
                'warning': 'This is a basic example. Adjust for your firewall system.'
            }
        except Exception as e:
            self.logger.error(f"Failed to setup firewall rules: {e}")
            return {
                'success': False,
                'error': str(e),
                'warning': 'Firewall configuration requires appropriate permissions'
            }

    def monitor_connections(self) -> Dict[str, Any]:
        """Summarise ``CLIENT LIST``: totals, distinct IPs, names, suspicious clients."""
        try:
            clients_info = self.redis.client_list()
            unique_ips = set()
            connection_types = {}
            suspicious_connections = []
            for client in clients_info:
                addr = client.get('addr', '')
                if ':' in addr:
                    # Split on the LAST colon and drop brackets so IPv6
                    # addresses such as "[::1]:6379" are handled.
                    unique_ips.add(addr.rsplit(':', 1)[0].strip('[]'))
                # Unnamed clients report an empty name; count them as unknown.
                client_type = client.get('name') or 'unknown'
                connection_types[client_type] = connection_types.get(client_type, 0) + 1
                if self._is_suspicious_connection(client):
                    suspicious_connections.append(client)
            return {
                'success': True,
                'analysis': {
                    'total_connections': len(clients_info),
                    'unique_ips': sorted(unique_ips),
                    'connection_types': connection_types,
                    'suspicious_connections': suspicious_connections
                },
                'raw_data': clients_info
            }
        except Exception as e:
            self.logger.error(f"Failed to monitor connections: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    @staticmethod
    def _as_int(value: Any, default: int = 0) -> int:
        """Convert a CLIENT LIST field to int; fields may arrive as strings."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return default

    def _is_suspicious_connection(self, client: Dict[str, Any]) -> bool:
        """Flag clients that are old, long idle, or carry a suspicious name."""
        if self._as_int(client.get('age', 0)) > 3600:  # connected for over 1 hour
            return True
        if self._as_int(client.get('idle', 0)) > 1800:  # idle for over 30 minutes
            return True
        name = str(client.get('name') or '').lower()
        suspicious_names = ['admin', 'root', 'test', 'scanner']
        return any(sus_name in name for sus_name in suspicious_names)

    def kill_suspicious_connections(self) -> Dict[str, Any]:
        """Kill every client that ``monitor_connections`` flagged as suspicious."""
        try:
            monitor_result = self.monitor_connections()
            if not monitor_result['success']:
                return monitor_result
            killed_connections = []
            for client in monitor_result['analysis']['suspicious_connections']:
                client_id = client.get('id')
                if not client_id:
                    continue
                try:
                    self.redis.client_kill_filter(_id=client_id)
                    killed_connections.append(client)
                except Exception as e:
                    self.logger.warning(f"Failed to kill client {client_id}: {e}")
            return {
                'success': True,
                'killed_connections': killed_connections,
                'total_killed': len(killed_connections)
            }
        except Exception as e:
            self.logger.error(f"Failed to kill suspicious connections: {e}")
            return {
                'success': False,
                'error': str(e)
            }
## 12.4 命令安全
### 12.4.1 危险命令管理
class RedisCommandSecurity:
    """Disable risky Redis commands and observe which commands clients run."""

    def __init__(self, redis_client: "redis.Redis"):
        """Keep the client and define the dangerous-command categories."""
        self.redis = redis_client
        self.logger = logging.getLogger(__name__)
        # Dangerous commands by category
        self.dangerous_commands = {
            'critical': [
                'FLUSHDB', 'FLUSHALL', 'SHUTDOWN', 'DEBUG',
                'CONFIG', 'EVAL', 'EVALSHA', 'SCRIPT'
            ],
            'administrative': [
                'CLIENT', 'MONITOR', 'SYNC', 'PSYNC',
                'REPLCONF', 'RESTORE', 'MIGRATE'
            ],
            'potentially_harmful': [
                'KEYS', 'SCAN', 'DEL', 'UNLINK', 'EXPIRE',
                'EXPIREAT', 'PEXPIRE', 'PEXPIREAT'
            ]
        }
        # Command alias mapping
        self.command_aliases = {
            'FLUSHDB': ['flushdb', 'FLUSHDB'],
            'FLUSHALL': ['flushall', 'FLUSHALL'],
            'CONFIG': ['config', 'CONFIG'],
            'EVAL': ['eval', 'EVAL'],
            'SCRIPT': ['script', 'SCRIPT']
        }

    def _rename_to_random(self, command: str) -> bool:
        """Try to rename ``command`` to a random name; return True on success.

        NOTE(review): ``rename-command`` is documented as a redis.conf
        directive; the server is expected to reject it via CONFIG SET -
        confirm, and consider ACL rules for runtime restrictions.
        """
        random_name = self._generate_random_string(32)
        try:
            self.redis.config_set(f'rename-command {command}', random_name)
            return True
        except Exception as e:
            self.logger.warning(f"Failed to disable command {command}: {e}")
            return False

    def disable_dangerous_commands(self, command_categories: Optional[List[str]] = None) -> Dict[str, Any]:
        """Attempt to disable every command in the given categories.

        Defaults to the ``critical`` category. Commands that could not be
        disabled are listed under ``failed_commands`` so the caller can tell
        "nothing was disabled" from "everything was disabled".
        """
        try:
            if command_categories is None:
                command_categories = ['critical']
            disabled_commands = []
            failed_commands = []
            for category in command_categories:
                for command in self.dangerous_commands.get(category, []):
                    if self._rename_to_random(command):
                        disabled_commands.append(command)
                    else:
                        failed_commands.append(command)
            self._log_command_event('commands_disabled', {
                'disabled_commands': disabled_commands,
                'categories': command_categories,
                'timestamp': datetime.now().isoformat()
            })
            return {
                'success': True,
                'disabled_commands': disabled_commands,
                'total_disabled': len(disabled_commands),
                'failed_commands': failed_commands
            }
        except Exception as e:
            self.logger.error(f"Failed to disable dangerous commands: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def create_command_whitelist(self, allowed_commands: List[str]) -> Dict[str, Any]:
        """Attempt to disable every known command not in ``allowed_commands``.

        Matching is case-insensitive. ``failed_commands`` lists the commands
        that could not be disabled.
        """
        try:
            all_commands = self._get_all_redis_commands()
            # Built once; the original rebuilt this list for every command.
            allowed = {cmd.upper() for cmd in allowed_commands}
            disabled_count = 0
            failed_commands = []
            for command in all_commands:
                if command.upper() in allowed:
                    continue
                if self._rename_to_random(command):
                    disabled_count += 1
                else:
                    failed_commands.append(command)
            self._log_command_event('whitelist_created', {
                'allowed_commands': allowed_commands,
                'disabled_count': disabled_count,
                'timestamp': datetime.now().isoformat()
            })
            return {
                'success': True,
                'allowed_commands': allowed_commands,
                'disabled_count': disabled_count,
                'total_commands': len(all_commands),
                'failed_commands': failed_commands
            }
        except Exception as e:
            self.logger.error(f"Failed to create command whitelist: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def _get_all_redis_commands(self) -> List[str]:
        """Return a basic, hard-coded list of Redis commands (not exhaustive)."""
        return [
            'GET', 'SET', 'DEL', 'EXISTS', 'EXPIRE', 'TTL', 'TYPE',
            'MGET', 'MSET', 'INCR', 'DECR', 'INCRBY', 'DECRBY',
            'LPUSH', 'RPUSH', 'LPOP', 'RPOP', 'LLEN', 'LRANGE',
            'SADD', 'SREM', 'SMEMBERS', 'SCARD', 'SISMEMBER',
            'ZADD', 'ZREM', 'ZRANGE', 'ZCARD', 'ZSCORE',
            'HSET', 'HGET', 'HDEL', 'HKEYS', 'HVALS', 'HGETALL',
            'FLUSHDB', 'FLUSHALL', 'CONFIG', 'EVAL', 'SCRIPT',
            'MONITOR', 'CLIENT', 'DEBUG', 'SHUTDOWN'
        ]

    def _generate_random_string(self, length: int) -> str:
        """Return ``length`` random ASCII letters and digits."""
        import string
        import secrets
        chars = string.ascii_letters + string.digits
        return ''.join(secrets.choice(chars) for _ in range(length))

    def monitor_command_usage(self, duration_seconds: int = 60) -> Dict[str, Any]:
        """Record the commands seen through MONITOR for ``duration_seconds``.

        A daemon thread consumes the monitor stream while the caller sleeps.
        Per-command stats hold ``count``, ``first_seen``, ``last_seen`` and
        the sorted list of client addresses.
        """
        try:
            import time
            import threading

            command_stats = {}
            stats_lock = threading.Lock()
            stop_event = threading.Event()

            def monitor_worker():
                try:
                    # redis-py's monitor object is a context manager whose
                    # listen() generator yields one dict per command.
                    with self.redis.monitor() as monitor:
                        for command_info in monitor.listen():
                            if stop_event.is_set():
                                break
                            command_parts = command_info['command'].split()
                            if not command_parts:
                                continue
                            command_name = command_parts[0].upper()
                            seen_at = datetime.now().isoformat()
                            client = command_info.get(
                                'client_address',
                                command_info.get('client_addr', 'unknown')
                            )
                            with stats_lock:
                                entry = command_stats.setdefault(command_name, {
                                    'count': 0,
                                    'first_seen': seen_at,
                                    'last_seen': None,
                                    'clients': set()
                                })
                                entry['count'] += 1
                                entry['last_seen'] = seen_at
                                entry['clients'].add(client)
                except Exception as e:
                    self.logger.warning(f"Command monitor stopped: {e}")

            monitor_thread = threading.Thread(target=monitor_worker)
            monitor_thread.daemon = True
            monitor_thread.start()

            time.sleep(duration_seconds)

            stop_event.set()
            monitor_thread.join(timeout=5)

            # Snapshot under the lock: the worker may still be running if
            # join() timed out while listen() was blocked.
            with stats_lock:
                snapshot = {
                    name: {**entry, 'clients': sorted(entry['clients'])}
                    for name, entry in command_stats.items()
                }

            return {
                'success': True,
                'monitoring_duration': duration_seconds,
                'command_stats': snapshot,
                'analysis': self._analyze_command_usage(snapshot)
            }
        except Exception as e:
            self.logger.error(f"Failed to monitor command usage: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def _analyze_command_usage(self, command_stats: Dict[str, Any]) -> Dict[str, Any]:
        """Summarise collected stats: totals, top 10, dangerous commands seen."""
        total_commands = sum(stats['count'] for stats in command_stats.values())
        most_used = sorted(
            command_stats.items(),
            key=lambda item: item[1]['count'],
            reverse=True
        )[:10]
        dangerous_used = [
            {
                'command': cmd_name,
                'category': category,
                'count': stats['count'],
                'clients': stats['clients']
            }
            for cmd_name, stats in command_stats.items()
            for category, commands in self.dangerous_commands.items()
            if cmd_name in commands
        ]
        return {
            'total_commands_executed': total_commands,
            'unique_commands': len(command_stats),
            'most_used_commands': most_used,
            'dangerous_commands_used': dangerous_used,
            'risk_level': 'high' if dangerous_used else 'low'
        }

    def _log_command_event(self, event_type: str, event_data: Dict[str, Any]) -> None:
        """Push a JSON event onto ``redis:command_security_events`` (newest 1000 kept)."""
        try:
            import json
            event = {
                'type': event_type,
                'timestamp': datetime.now().isoformat(),
                'data': event_data
            }
            self.redis.lpush('redis:command_security_events', json.dumps(event))
            self.redis.ltrim('redis:command_security_events', 0, 999)
        except Exception as e:
            self.logger.error(f"Failed to log command security event: {e}")
### 12.4.2 脚本安全
```python
class RedisScriptSecurity:
def __init__(self, redis_client: "redis.Redis"):
    """Keep the client and define the Lua script security policy.

    ``forbidden_patterns`` are regular expressions. They are written as raw
    strings (the non-raw ``'os\\.'`` form relies on an invalid escape
    sequence) and anchored on word boundaries so that identifiers such as
    ``pos.x`` or ``profile`` are not reported.
    """
    self.redis = redis_client
    self.logger = logging.getLogger(__name__)
    # Script security policy
    self.script_policy = {
        'max_script_size': 8192,  # maximum script size in bytes
        'max_execution_time': 5000,  # maximum execution time in milliseconds
        'allowed_functions': [
            'redis.call', 'redis.pcall', 'redis.log',
            'table.insert', 'table.remove', 'string.sub',
            'math.floor', 'math.ceil', 'tonumber', 'tostring'
        ],
        'forbidden_patterns': [
            r'\bos\.', r'\bio\.', r'\bfile\b', r'\brequire\b', r'\bdofile\b',
            r'\bloadfile\b', r'\bloadstring\b', r'\bdebug\.',
            r'\bpackage\.', r'\bmodule\b', r'\bgetfenv\b', r'\bsetfenv\b'
        ]
    }
def validate_script(self, script: str) -> Dict[str, Any]:
    """Run all static safety checks on a Lua script.

    Checks the byte size against ``max_script_size``, searches for every
    forbidden pattern (case-insensitively), then delegates to the function,
    Redis-call and loop analysers. Returns ``{'success': True,
    'validation': ...}``, or ``{'success': False, 'error': ...}`` if any
    step raises.
    """
    try:
        import re

        policy = self.script_policy
        byte_size = len(script.encode('utf-8'))
        report = {
            'is_safe': True,
            'issues': [],
            'warnings': [],
            'script_info': {
                'size': byte_size,
                'lines': script.count('\n') + 1,
                'functions_used': [],
                'redis_calls': []
            }
        }

        # Size limit
        size_limit = policy['max_script_size']
        if byte_size > size_limit:
            report['is_safe'] = False
            report['issues'].append(
                f"Script size ({byte_size} bytes) "
                f"exceeds maximum allowed ({size_limit} bytes)"
            )

        # Forbidden patterns
        for pattern in policy['forbidden_patterns']:
            found = re.findall(pattern, script, re.IGNORECASE)
            if not found:
                continue
            report['is_safe'] = False
            report['issues'].append(
                f"Forbidden pattern '{pattern}' found: {found}"
            )

        # Deeper analysis; each helper updates the report in place.
        for analyse in (self._analyze_script_functions,
                        self._analyze_redis_calls,
                        self._check_infinite_loops):
            analyse(script, report)

        return {
            'success': True,
            'validation': report
        }
    except Exception as e:
        self.logger.error(f"Failed to validate script: {e}")
        return {
            'success': False,
            'error': str(e)
        }
def _analyze_script_functions(self, script: str, results: Dict[str, Any]) -> None:
    """Record the functions a script calls and warn about those not allowed.

    A "call" is any identifier (optionally ``a.b``) followed by ``(``. Lua
    keywords that can precede a parenthesis (``if (``, ``while (`` ...) are
    ignored. ``functions_used`` lists each function once, in order of first
    appearance, and each disallowed function produces a single warning.
    A function is accepted when it equals an allowed name or starts with one.
    """
    import re

    function_pattern = r'([a-zA-Z_][a-zA-Z0-9_]*(?:\.[a-zA-Z_][a-zA-Z0-9_]*)?)\s*\('
    lua_keywords = {
        'and', 'elseif', 'for', 'function', 'if', 'in', 'not', 'or',
        'return', 'until', 'while'
    }
    # dict.fromkeys keeps first-seen order while removing duplicates.
    functions = [
        name
        for name in dict.fromkeys(re.findall(function_pattern, script))
        if name not in lua_keywords
    ]
    results['script_info']['functions_used'] = functions

    allowed_functions = self.script_policy['allowed_functions']
    for func in functions:
        if any(func.startswith(allowed_func) for allowed_func in allowed_functions):
            continue
        results['warnings'].append(
            f"Potentially unsafe function used: {func}"
        )
def _analyze_redis_calls(self, script: str, results: Dict[str, Any]) -> None:
"""分析Redis调用"""
import re
# 查找redis.call和redis.pcall
redis_call_pattern = r'redis\.(p?call)\s*\(\s*["\']([^"\']*)["\']
calls = re.findall(redis_call_pattern, script, re.IGNORECASE)
redis_calls = []
for call_type, command in calls:
redis_calls.append({
'type': f'redis.{call_type}call',
'command': command.upper()
})
results['script_info']['redis_calls'] = redis_calls
# 检查危险的Redis命令
dangerous_commands = ['FLUSHDB', 'FLUSHALL', 'CONFIG', 'EVAL', 'SCRIPT']
for call in redis_calls:
if call['command'] in dangerous_commands:
results['is_safe'] = False
results['issues'].append(
f"Dangerous Redis command in script: {call['command']}"
)
def _check_infinite_loops(self, script: str, results: Dict[str, Any]) -> None:
"""检查潜在的无限循环"""
import re
# 检查while循环
while_pattern = r'while\s+.*\s+do'
while_loops = re.findall(while_pattern, script, re.IGNORECASE)
if while_loops:
results['warnings'].append(
f"Found {len(while_loops)} while loop(s). Ensure they have proper exit conditions."
)
# 检查for循环
for_pattern = r'for\s+.*\s+do'
for_loops = re.findall(for_pattern, script, re.IGNORECASE)
if for_loops:
results['script_info']['loop_count'] = len(for_loops)
def execute_safe_script(self, script: str, keys: List[str] = None,
args: List[str] = None) -> Dict[str, Any]:
"""安全执行脚本"""
try:
# 验证脚本
validation_result = self.validate_script(script)
if not validation_result['success']:
return validation_result
if not validation_result['validation']['is_safe']:
return {
'success': False,
'error': 'Script failed security validation',
'issues': validation_result['validation']['issues']
}
# 设置执行超时
original_timeout = None
try:
# 获取当前超时设置
config = self.redis.config_get('lua-time-limit')
if config:
original_timeout = config.get('lua-time-limit')
# 设置脚本执行超时
self.redis.config_set('lua-time-limit', self.script_policy['max_execution_time'])
# 执行脚本
start_time = datetime.now()
result = self.redis.eval(script, len(keys or []), *(keys or []), *(args or []))
execution_time = (datetime.now() - start_time).total_seconds() * 1000
# 记录脚本执行事件
self._log_script_event('script_executed', {
'script_hash': hashlib.sha256(script.encode()).hexdigest(),
'execution_time_ms': execution_time,
'keys_count': len(keys or []),
'args_count': len(args or []),
'timestamp': datetime.now().isoformat()
})
return {
'success': True,
'result': result,
'execution_time_ms': execution_time,
'validation': validation_result['validation']
}
finally:
# 恢复原始超时设置
if original_timeout is not None:
self.redis.config_set('lua-time-limit', original_timeout)
except Exception as e:
self.logger.error(f"Failed to execute script safely: {e}")
return {
'success': False,
'error': str(e)
}
def _log_script_event(self, event_type: str, event_data: Dict[str, Any]) -> None:
"""记录脚本安全事件"""
try:
event = {
'type': event_type,
'timestamp': datetime.now().isoformat(),
'data': event_data
}
import json
self.redis.lpush('redis:script_security_events', json.dumps(event))
# 只保留最近1000条事件
self.redis.ltrim('redis:script_security_events', 0, 999)
except Exception as e:
self.logger.error(f"Failed to log script security event: {e}")
```

## 12.5 数据保护

### 12.5.1 数据加密

```python
class RedisDataEncryption:
    """Store values in Redis encrypted with AES-GCM.

    Ciphertext, IV and authentication tag are base64-encoded, serialized as
    JSON and written under ``encrypted:<key>``.
    """

    def __init__(self, redis_client: "redis.Redis", encryption_key: Optional[str] = None):
        """Keep the client and set up the key.

        Args:
            redis_client: Client used to read and write encrypted entries.
            encryption_key: Key material as text; it is UTF-8 encoded and the
                first 32 bytes are used as the AES key. A random key is
                generated when omitted or empty.
        """
        self.redis = redis_client
        self.logger = logging.getLogger(__name__)
        # Encryption parameters.
        self.encryption_config = {
            'algorithm': 'AES-256-GCM',
            'key_size': 32,  # 256 bits
            'iv_size': 16,  # 128 bits
            'tag_size': 16  # 128 bits
        }
        if encryption_key:
            self.encryption_key = encryption_key.encode('utf-8')
        else:
            self.encryption_key = self._generate_encryption_key()

    def _generate_encryption_key(self) -> bytes:
        """Return 32 random bytes, URL-safe base64 encoded (44 ASCII bytes).

        Only the first 32 bytes of the returned value are used as the AES key.
        """
        import base64

        return base64.urlsafe_b64encode(secrets.token_bytes(32))

    def _encrypt_with_key(self, data: str, key: bytes) -> Dict[str, str]:
        """Encrypt *data* with an explicit key; raises on failure."""
        from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
        import base64
        import os

        iv = os.urandom(self.encryption_config['iv_size'])
        encryptor = Cipher(
            algorithms.AES(key[:32]),  # use at most 32 bytes of key material
            modes.GCM(iv)
        ).encryptor()
        ciphertext = encryptor.update(data.encode('utf-8')) + encryptor.finalize()
        return {
            'ciphertext': base64.b64encode(ciphertext).decode('utf-8'),
            'iv': base64.b64encode(iv).decode('utf-8'),
            'tag': base64.b64encode(encryptor.tag).decode('utf-8'),
            'algorithm': self.encryption_config['algorithm']
        }

    def _decrypt_with_key(self, encrypted_data: Dict[str, Any], key: bytes) -> str:
        """Decrypt a payload produced by ``_encrypt_with_key``; raises on failure."""
        from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
        import base64

        ciphertext = base64.b64decode(encrypted_data['ciphertext'])
        iv = base64.b64decode(encrypted_data['iv'])
        tag = base64.b64decode(encrypted_data['tag'])
        decryptor = Cipher(
            algorithms.AES(key[:32]),
            modes.GCM(iv, tag)
        ).decryptor()
        plaintext = decryptor.update(ciphertext) + decryptor.finalize()
        return plaintext.decode('utf-8')

    def encrypt_data(self, data: str) -> Dict[str, Any]:
        """Encrypt *data* with the current key.

        Returns:
            ``{'success': True, 'encrypted_data': {...}}`` or
            ``{'success': False, 'error': str}``.
        """
        try:
            return {
                'success': True,
                'encrypted_data': self._encrypt_with_key(data, self.encryption_key)
            }
        except Exception as e:
            self.logger.error(f"Failed to encrypt data: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def decrypt_data(self, encrypted_data: Dict[str, Any]) -> Dict[str, Any]:
        """Decrypt a payload with the current key.

        Returns:
            ``{'success': True, 'decrypted_data': str}`` or
            ``{'success': False, 'error': str}``.
        """
        try:
            return {
                'success': True,
                'decrypted_data': self._decrypt_with_key(encrypted_data, self.encryption_key)
            }
        except Exception as e:
            self.logger.error(f"Failed to decrypt data: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def set_encrypted(self, key: str, value: str, ex: Optional[int] = None) -> Dict[str, Any]:
        """Encrypt *value* and store it under ``encrypted:<key>``.

        Args:
            key: Logical key name (the ``encrypted:`` prefix is added here).
            value: Plaintext to store.
            ex: Optional expiry in seconds; a falsy value stores without expiry.
        """
        try:
            import json

            encryption_result = self.encrypt_data(value)
            if not encryption_result['success']:
                return encryption_result

            encrypted_value = json.dumps(encryption_result['encrypted_data'])
            encrypted_key = f"encrypted:{key}"
            if ex:
                self.redis.setex(encrypted_key, ex, encrypted_value)
            else:
                self.redis.set(encrypted_key, encrypted_value)
            return {
                'success': True,
                'key': encrypted_key,
                'encrypted': True
            }
        except Exception as e:
            self.logger.error(f"Failed to set encrypted data: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def get_encrypted(self, key: str) -> Dict[str, Any]:
        """Load ``encrypted:<key>`` and return the decrypted value.

        Returns:
            ``{'success': True, 'value': str, 'encrypted': True}``, or a
            failure dict (``'Key not found'`` when the entry does not exist).
        """
        try:
            import json

            encrypted_value = self.redis.get(f"encrypted:{key}")
            if encrypted_value is None:
                return {
                    'success': False,
                    'error': 'Key not found'
                }

            decryption_result = self.decrypt_data(json.loads(encrypted_value))
            if not decryption_result['success']:
                return decryption_result
            return {
                'success': True,
                'value': decryption_result['decrypted_data'],
                'encrypted': True
            }
        except Exception as e:
            self.logger.error(f"Failed to get encrypted data: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def rotate_encryption_key(self, new_key: Optional[str] = None) -> Dict[str, Any]:
        """Re-encrypt every ``encrypted:*`` entry with a new key.

        Each entry is decrypted with the key that was active when the call
        started and encrypted with the new key, so a success on one entry
        cannot break decryption of the next. The instance switches to the
        new key afterwards unless every entry failed. A remaining TTL on an
        entry is carried over to the rewritten value.

        Args:
            new_key: New key material as text; generated when omitted.

        Returns:
            ``{'success': True, 'rotated_keys', 'failed_keys',
            'total_processed'}`` or ``{'success': False, 'error': str}``.
        """
        try:
            import json

            if new_key:
                new_encryption_key = new_key.encode('utf-8')
            else:
                new_encryption_key = self._generate_encryption_key()
            old_encryption_key = self.encryption_key

            encrypted_keys = self.redis.keys('encrypted:*')
            rotated_keys = []
            failed_keys = []
            for encrypted_key in encrypted_keys:
                try:
                    encrypted_value = self.redis.get(encrypted_key)
                    if not encrypted_value:
                        # Entry vanished or is empty: nothing to rotate.
                        continue
                    plaintext = self._decrypt_with_key(
                        json.loads(encrypted_value), old_encryption_key
                    )
                    new_encrypted_value = json.dumps(
                        self._encrypt_with_key(plaintext, new_encryption_key)
                    )
                    # A plain SET would drop the expiry, so re-apply it.
                    remaining_ms = self.redis.pttl(encrypted_key)
                    if remaining_ms and remaining_ms > 0:
                        self.redis.set(encrypted_key, new_encrypted_value, px=remaining_ms)
                    else:
                        self.redis.set(encrypted_key, new_encrypted_value)
                    rotated_keys.append(encrypted_key)
                except Exception as e:
                    self.logger.warning(f"Failed to rotate key for {encrypted_key}: {e}")
                    failed_keys.append(encrypted_key)

            # Keep the old key only when nothing could be rotated.
            if rotated_keys or not failed_keys:
                self.encryption_key = new_encryption_key

            self._log_encryption_event('key_rotated', {
                'rotated_keys_count': len(rotated_keys),
                'failed_keys_count': len(failed_keys),
                'timestamp': datetime.now().isoformat()
            })
            return {
                'success': True,
                'rotated_keys': rotated_keys,
                'failed_keys': failed_keys,
                'total_processed': len(encrypted_keys)
            }
        except Exception as e:
            self.logger.error(f"Failed to rotate encryption key: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def _log_encryption_event(self, event_type: str, event_data: Dict[str, Any]) -> None:
        """Push an event onto ``redis:encryption_events`` (newest first, 1000 kept)."""
        try:
            import json

            event = {
                'type': event_type,
                'timestamp': datetime.now().isoformat(),
                'data': event_data
            }
            self.redis.lpush('redis:encryption_events', json.dumps(event))
            # Keep only the most recent 1000 events.
            self.redis.ltrim('redis:encryption_events', 0, 999)
        except Exception as e:
            self.logger.error(f"Failed to log encryption event: {e}")
```

### 12.5.2 敏感数据处理

```python
class RedisSensitiveDataManager:
    """Classify, mask, store and audit potentially sensitive string values."""

    def __init__(self, redis_client: "redis.Redis"):
        """Keep the client and define detection patterns and classification levels.

        Args:
            redis_client: Client used for storing values, auditing and event logging.
        """
        self.redis = redis_client
        self.logger = logging.getLogger(__name__)
        # Regular expressions used to detect sensitive content.
        self.sensitive_patterns = {
            'credit_card': r'\b(?:\d{4}[-\s]?){3}\d{4}\b',
            'ssn': r'\b\d{3}-\d{2}-\d{4}\b',
            # [A-Za-z], not [A-Z|a-z]: a '|' inside a character class is literal.
            'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
            'phone': r'\b\d{3}-\d{3}-\d{4}\b',
            'ip_address': r'\b(?:\d{1,3}\.){3}\d{1,3}\b',
            'password': r'(?i)password[\s]*[:=][\s]*[\S]+'
        }
        # Classification levels with their retention and encryption settings.
        self.data_classification = {
            'public': {'retention_days': 365, 'encryption_required': False},
            'internal': {'retention_days': 180, 'encryption_required': True},
            'confidential': {'retention_days': 90, 'encryption_required': True},
            'restricted': {'retention_days': 30, 'encryption_required': True}
        }

    @staticmethod
    def _to_text(value: Any) -> Any:
        """Decode ``bytes`` to ``str`` (UTF-8, undecodable bytes replaced); pass others through."""
        if isinstance(value, bytes):
            return value.decode('utf-8', errors='replace')
        return value

    def classify_data(self, data: str) -> Dict[str, Any]:
        """Score *data* against the sensitive patterns and pick a classification.

        Credit card and SSN matches add 50 to the risk score, e-mail and phone
        add 20, any other pattern adds 10. Scores of 80+, 50+ and 20+ map to
        ``restricted``, ``confidential`` and ``internal``; lower is ``public``.

        Returns:
            ``{'success': True, 'classification': {...}}`` or
            ``{'success': False, 'error': str}``.
        """
        try:
            import re

            classification_result = {
                'classification': 'public',
                'sensitive_patterns_found': [],
                'risk_score': 0,
                'recommendations': []
            }
            for pattern_name, pattern in self.sensitive_patterns.items():
                matches = re.findall(pattern, data, re.IGNORECASE)
                if not matches:
                    continue
                classification_result['sensitive_patterns_found'].append({
                    'pattern': pattern_name,
                    'matches_count': len(matches),
                    'sample_matches': matches[:3]  # first three matches only
                })
                if pattern_name in ('credit_card', 'ssn'):
                    classification_result['risk_score'] += 50
                elif pattern_name in ('email', 'phone'):
                    classification_result['risk_score'] += 20
                else:
                    classification_result['risk_score'] += 10

            risk_score = classification_result['risk_score']
            if risk_score >= 80:
                classification_result['classification'] = 'restricted'
            elif risk_score >= 50:
                classification_result['classification'] = 'confidential'
            elif risk_score >= 20:
                classification_result['classification'] = 'internal'

            classification_result['recommendations'] = self._generate_recommendations(
                classification_result
            )
            return {
                'success': True,
                'classification': classification_result
            }
        except Exception as e:
            self.logger.error(f"Failed to classify data: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def _generate_recommendations(self, classification_result: Dict[str, Any]) -> List[str]:
        """Build recommendation texts for a classification result."""
        recommendations = []
        config = self.data_classification[classification_result['classification']]
        if config['encryption_required']:
            recommendations.append('Enable encryption for this data')
        recommendations.append(
            f"Set retention period to {config['retention_days']} days"
        )
        if classification_result['sensitive_patterns_found']:
            recommendations.append('Consider data masking or tokenization')
            recommendations.append('Implement access logging and monitoring')
        if classification_result['risk_score'] >= 50:
            recommendations.append('Restrict access to authorized personnel only')
            recommendations.append('Enable audit logging')
        return recommendations

    def mask_sensitive_data(self, data: str, mask_char: str = '*') -> Dict[str, Any]:
        """Mask credit card numbers, SSNs and e-mail addresses in *data*.

        Credit cards keep the first and last four digits; every digit in
        between is replaced by *mask_char* and separators are kept. SSNs
        become ``XXX-XX-<last four>``. E-mail user names keep their first and
        last character (names of up to two characters are fully masked).

        Returns:
            ``{'success': True, 'masked_data', 'masking_applied',
            'original_length', 'masked_length'}`` or a failure dict.
        """
        try:
            import re

            masking_applied = []

            def mask_credit_card(match):
                # Mask every middle digit so that no extra digits leak when
                # the number is written without separators.
                middle = ''.join(
                    mask_char if ch.isdigit() else ch for ch in match.group(2)
                )
                return f"{match.group(1)}{middle}{match.group(3)}"

            def mask_ssn(match):
                return f"XXX-XX-{match.group(3)}"

            def mask_email(match):
                username = match.group(1)
                domain = match.group(2)
                if len(username) > 2:
                    masked_username = username[0] + mask_char * (len(username) - 2) + username[-1]
                else:
                    masked_username = mask_char * len(username)
                return f"{masked_username}@{domain}"

            # Applied in this order; each rule sees the output of the previous one.
            rules = (
                ('credit_card',
                 r'\b(\d{4})([-\s]?\d{4}[-\s]?\d{4}[-\s]?)(\d{4})\b',
                 mask_credit_card),
                ('ssn', r'\b(\d{3})-(\d{2})-(\d{4})\b', mask_ssn),
                ('email',
                 r'\b([A-Za-z0-9._%+-]+)@([A-Za-z0-9.-]+\.[A-Za-z]{2,})\b',
                 mask_email),
            )
            masked_data = data
            for rule_name, pattern, replacer in rules:
                masked_data, replaced = re.subn(pattern, replacer, masked_data)
                if replaced:
                    masking_applied.append(rule_name)

            return {
                'success': True,
                'masked_data': masked_data,
                'masking_applied': masking_applied,
                'original_length': len(data),
                'masked_length': len(masked_data)
            }
        except Exception as e:
            self.logger.error(f"Failed to mask sensitive data: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def set_with_classification(self, key: str, value: str,
                                classification: Optional[str] = None) -> Dict[str, Any]:
        """Store *value* under ``<classification>:<key>`` with the level's retention as TTL.

        The value is written as given; ``encryption_required`` is only
        recorded in the logged event, no encryption happens here.

        Args:
            key: Logical key name (the classification prefix is added here).
            value: Value to store.
            classification: Level name; derived with ``classify_data`` when omitted.
        """
        try:
            if classification is None:
                classify_result = self.classify_data(value)
                if not classify_result['success']:
                    return classify_result
                classification = classify_result['classification']['classification']

            if classification not in self.data_classification:
                return {
                    'success': False,
                    'error': f'Unknown classification: {classification}'
                }
            config = self.data_classification[classification]

            classified_key = f"{classification}:{key}"
            retention_seconds = config['retention_days'] * 24 * 3600
            self.redis.setex(classified_key, retention_seconds, value)

            self._log_data_event('data_classified', {
                'key': classified_key,
                'classification': classification,
                'retention_days': config['retention_days'],
                'encryption_required': config['encryption_required'],
                'timestamp': datetime.now().isoformat()
            })
            return {
                'success': True,
                'key': classified_key,
                'classification': classification,
                'retention_days': config['retention_days'],
                'expires_at': (datetime.now() + timedelta(days=config['retention_days'])).isoformat()
            }
        except Exception as e:
            self.logger.error(f"Failed to set classified data: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def audit_sensitive_data(self) -> Dict[str, Any]:
        """Scan all keys and report classified, unclassified-sensitive and soon-expiring data.

        Key names returned by the client may be ``str`` or ``bytes``; they are
        reported as text, while the raw name is used for Redis calls.

        Returns:
            ``{'success': True, 'audit': {...}, 'timestamp': str}`` or
            ``{'success': False, 'error': str}``.
        """
        try:
            audit_results = {
                'total_keys': 0,
                'classified_data': {},
                'unclassified_data': [],
                'expiring_soon': [],
                'recommendations': []
            }

            all_keys = self.redis.keys('*')
            audit_results['total_keys'] = len(all_keys)

            for classification in self.data_classification:
                classified_keys = self.redis.keys(f'{classification}:*')
                audit_results['classified_data'][classification] = {
                    'count': len(classified_keys),
                    # first ten keys only
                    'keys': [self._to_text(k) for k in classified_keys[:10]]
                }

            class_prefixes = tuple(f'{cls}:' for cls in self.data_classification)
            for raw_key in all_keys:
                key = self._to_text(raw_key)

                # Entries expiring within the next 24 hours.
                ttl = self.redis.ttl(raw_key)
                if 0 < ttl < 86400:
                    audit_results['expiring_soon'].append({
                        'key': key,
                        'ttl_seconds': ttl,
                        'expires_at': (datetime.now() + timedelta(seconds=ttl)).isoformat()
                    })

                if key.startswith(class_prefixes):
                    continue

                # Unclassified entry: inspect its value for sensitive content.
                try:
                    value = self.redis.get(raw_key)
                    if value and isinstance(value, (str, bytes)):
                        if isinstance(value, bytes):
                            value = value.decode('utf-8', errors='ignore')
                        classify_result = self.classify_data(value)
                        if (classify_result['success'] and
                                classify_result['classification']['classification'] != 'public'):
                            audit_results['unclassified_data'].append({
                                'key': key,
                                'suggested_classification': classify_result['classification']['classification'],
                                'risk_score': classify_result['classification']['risk_score']
                            })
                except Exception:
                    # e.g. the key holds a non-string type; skip it.
                    continue

            if audit_results['unclassified_data']:
                audit_results['recommendations'].append(
                    f"Classify {len(audit_results['unclassified_data'])} unclassified sensitive keys"
                )
            if audit_results['expiring_soon']:
                audit_results['recommendations'].append(
                    f"Review {len(audit_results['expiring_soon'])} keys expiring within 24 hours"
                )
            return {
                'success': True,
                'audit': audit_results,
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            self.logger.error(f"Failed to audit sensitive data: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def _log_data_event(self, event_type: str, event_data: Dict[str, Any]) -> None:
        """Push an event onto ``redis:data_protection_events`` (newest first, 1000 kept)."""
        try:
            import json

            event = {
                'type': event_type,
                'timestamp': datetime.now().isoformat(),
                'data': event_data
            }
            self.redis.lpush('redis:data_protection_events', json.dumps(event))
            # Keep only the most recent 1000 events.
            self.redis.ltrim('redis:data_protection_events', 0, 999)
        except Exception as e:
            self.logger.error(f"Failed to log data protection event: {e}")
```

## 12.6 监控和审计

### 12.6.1 安全监控

```python
class RedisSecurityMonitor:
    """Background checks for authentication failures, suspicious commands,
    connections and performance, with alerts stored in Redis lists."""

    def __init__(self, redis_client: "redis.Redis"):
        """Keep the client and define monitoring settings and alert thresholds.

        Args:
            redis_client: Client used for reading server state and event
                lists and for writing alerts.
        """
        self.redis = redis_client
        self.logger = logging.getLogger(__name__)
        # Monitoring configuration.
        self.monitor_config = {
            'max_failed_attempts': 5,
            'lockout_duration': 300,  # 5 minutes
            'suspicious_commands': [
                'FLUSHDB', 'FLUSHALL', 'CONFIG', 'EVAL',
                'SCRIPT', 'DEBUG', 'SHUTDOWN'
            ],
            'rate_limits': {
                'commands_per_minute': 1000,
                'connections_per_minute': 100
            }
        }
        # Alert thresholds.
        self.alert_thresholds = {
            'failed_auth_rate': 10,  # failed authentications per minute
            'suspicious_command_rate': 5,  # suspicious commands per minute
            'memory_usage_percent': 90,  # memory usage in percent
            'connection_count': 1000,  # number of connections
            'slow_query_threshold': 1000  # slow query threshold in milliseconds
        }

    @staticmethod
    def _as_int(value: Any, default: int = 0) -> int:
        """Convert *value* to ``int``; return *default* when that is not possible."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return default

    @staticmethod
    def _to_text(value: Any) -> str:
        """Render ``bytes``, ``str`` or a sequence of command parts as one string."""
        if isinstance(value, bytes):
            return value.decode('utf-8', errors='replace')
        if isinstance(value, str):
            return value
        if isinstance(value, (list, tuple)):
            return ' '.join(RedisSecurityMonitor._to_text(part) for part in value)
        return str(value)

    def start_monitoring(self) -> Dict[str, Any]:
        """Start one daemon thread per monitor and log a ``monitoring_started`` event.

        Returns:
            ``{'success': True, 'message', 'active_monitors'}`` or
            ``{'success': False, 'error': str}``.
        """
        try:
            import threading

            monitor_threads = [
                threading.Thread(target=target, daemon=True)
                for target in (
                    self._monitor_authentication,
                    self._monitor_commands,
                    self._monitor_connections,
                    self._monitor_performance,
                )
            ]
            for thread in monitor_threads:
                thread.start()

            self._log_security_event('monitoring_started', {
                'monitor_threads': len(monitor_threads),
                'timestamp': datetime.now().isoformat()
            })
            return {
                'success': True,
                'message': 'Security monitoring started',
                'active_monitors': len(monitor_threads)
            }
        except Exception as e:
            self.logger.error(f"Failed to start monitoring: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def _run_monitor_loop(self, check: Any, interval_seconds: int, error_label: str) -> None:
        """Call *check* forever, sleeping *interval_seconds* between runs.

        Errors are logged per iteration so that one failing check does not
        end the monitor thread.
        """
        import time

        while True:
            try:
                check()
            except Exception as e:
                self.logger.error(f"{error_label}: {e}")
            time.sleep(interval_seconds)

    def _monitor_authentication(self) -> None:
        """Check failed authentications once per minute."""
        self._run_monitor_loop(self._check_authentication, 60,
                               "Authentication monitoring error")

    def _monitor_commands(self) -> None:
        """Check suspicious command events once per minute."""
        self._run_monitor_loop(self._check_commands, 60,
                               "Command monitoring error")

    def _monitor_connections(self) -> None:
        """Check client connections every 30 seconds."""
        self._run_monitor_loop(self._check_connections, 30,
                               "Connection monitoring error")

    def _monitor_performance(self) -> None:
        """Check memory usage and slow queries once per minute."""
        self._run_monitor_loop(self._check_performance, 60,
                               "Performance monitoring error")

    def _check_authentication(self) -> None:
        """Alert when last-minute failed authentications exceed the threshold."""
        failed_attempts = self._get_failed_auth_attempts()
        threshold = self.alert_thresholds['failed_auth_rate']
        if failed_attempts > threshold:
            self._trigger_alert('high_failed_auth_rate', {
                'failed_attempts': failed_attempts,
                'threshold': threshold,
                'timestamp': datetime.now().isoformat()
            })

    def _check_commands(self) -> None:
        """Alert when last-minute suspicious commands exceed the threshold."""
        suspicious_commands = self._get_suspicious_commands()
        threshold = self.alert_thresholds['suspicious_command_rate']
        if len(suspicious_commands) > threshold:
            self._trigger_alert('suspicious_commands_detected', {
                'commands': suspicious_commands,
                'count': len(suspicious_commands),
                'threshold': threshold,
                'timestamp': datetime.now().isoformat()
            })

    def _check_connections(self) -> None:
        """Alert on a high connection count and on suspicious connections."""
        client_info = self.redis.client_list()
        connection_count = len(client_info)
        threshold = self.alert_thresholds['connection_count']
        if connection_count > threshold:
            self._trigger_alert('high_connection_count', {
                'connection_count': connection_count,
                'threshold': threshold,
                'timestamp': datetime.now().isoformat()
            })

        suspicious_connections = self._analyze_connections(client_info)
        if suspicious_connections:
            self._trigger_alert('suspicious_connections', {
                'connections': suspicious_connections,
                'timestamp': datetime.now().isoformat()
            })

    def _check_performance(self) -> None:
        """Alert on high memory usage (when maxmemory is set) and on slow queries."""
        info = self.redis.info('memory')
        used_memory = info.get('used_memory', 0)
        max_memory = info.get('maxmemory', 0)
        if max_memory > 0:
            memory_usage_percent = (used_memory / max_memory) * 100
            threshold = self.alert_thresholds['memory_usage_percent']
            if memory_usage_percent > threshold:
                self._trigger_alert('high_memory_usage', {
                    'memory_usage_percent': memory_usage_percent,
                    'used_memory': used_memory,
                    'max_memory': max_memory,
                    'threshold': threshold,
                    'timestamp': datetime.now().isoformat()
                })

        slow_queries = self._get_slow_queries()
        if slow_queries:
            self._trigger_alert('slow_queries_detected', {
                'slow_queries': slow_queries,
                'count': len(slow_queries),
                'timestamp': datetime.now().isoformat()
            })

    def _count_failed_auth_since(self, window: timedelta) -> int:
        """Count ``auth_failed`` events in ``redis:security_events`` newer than now - *window*.

        Entries that cannot be parsed are skipped. Redis errors propagate.
        """
        import json

        earliest = datetime.now() - window
        failed_attempts = 0
        for event_data in self.redis.lrange('redis:security_events', 0, -1):
            try:
                event = json.loads(event_data)
                event_time = datetime.fromisoformat(event['timestamp'])
                if event_time >= earliest and event['type'] == 'auth_failed':
                    failed_attempts += 1
            except Exception:
                continue
        return failed_attempts

    def _get_failed_auth_attempts(self) -> int:
        """Return the number of failed authentications in the last minute (0 on error)."""
        try:
            return self._count_failed_auth_since(timedelta(minutes=1))
        except Exception as e:
            self.logger.error(f"Failed to get auth attempts: {e}")
            return 0

    def _get_suspicious_commands(self) -> List[Dict[str, Any]]:
        """Return ``suspicious_command`` event payloads from the last minute ([] on error)."""
        try:
            import json

            one_minute_ago = datetime.now() - timedelta(minutes=1)
            suspicious_commands = []
            for event_data in self.redis.lrange('redis:command_security_events', 0, -1):
                try:
                    event = json.loads(event_data)
                    event_time = datetime.fromisoformat(event['timestamp'])
                    if (event_time >= one_minute_ago and
                            event['type'] == 'suspicious_command'):
                        suspicious_commands.append(event['data'])
                except Exception:
                    continue
            return suspicious_commands
        except Exception as e:
            self.logger.error(f"Failed to get suspicious commands: {e}")
            return []

    def _analyze_connections(self, client_info: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Flag connections that are old, long idle, or carry a suspicious name.

        ``age`` and ``idle`` are converted to integers first, because CLIENT
        LIST fields may arrive as strings.

        Args:
            client_info: One dict per client, as returned by ``client_list()``.

        Returns:
            A list of findings; empty on error.
        """
        suspicious_connections = []
        try:
            for client in client_info:
                # Connected for more than one hour.
                age = self._as_int(client.get('age', 0))
                if age > 3600:
                    suspicious_connections.append({
                        'type': 'long_connection',
                        'client_id': client.get('id'),
                        'addr': client.get('addr'),
                        'age': age,
                        'reason': 'Connection age exceeds 1 hour'
                    })

                # Idle for more than 30 minutes.
                idle = self._as_int(client.get('idle', 0))
                if idle > 1800:
                    suspicious_connections.append({
                        'type': 'idle_connection',
                        'client_id': client.get('id'),
                        'addr': client.get('addr'),
                        'idle': idle,
                        'reason': 'Connection idle time exceeds 30 minutes'
                    })

                # Client name containing a suspicious keyword.
                name = client.get('name', '')
                if name and any(suspicious in name.lower()
                                for suspicious in ('bot', 'crawler', 'scanner')):
                    suspicious_connections.append({
                        'type': 'suspicious_name',
                        'client_id': client.get('id'),
                        'addr': client.get('addr'),
                        'name': name,
                        'reason': 'Suspicious client name detected'
                    })
            return suspicious_connections
        except Exception as e:
            self.logger.error(f"Failed to analyze connections: {e}")
            return []

    def _get_slow_queries(self) -> List[Dict[str, Any]]:
        """Return the entries of the last 10 slow-log records above the threshold.

        The command is rendered as text whether the client returns it as
        ``str``, ``bytes`` or a list of parts. Returns [] on error.
        """
        try:
            # Slow-log durations are in microseconds; the threshold is in ms.
            threshold_us = self.alert_thresholds['slow_query_threshold'] * 1000
            slow_queries = []
            for entry in self.redis.slowlog_get(10):
                if entry['duration'] > threshold_us:
                    # NOTE(review): newer redis-py releases appear to expose
                    # the address as 'client_address' - confirm; both names
                    # are accepted here.
                    client_addr = entry.get(
                        'client_address', entry.get('client_addr', 'unknown')
                    )
                    slow_queries.append({
                        'id': entry['id'],
                        'timestamp': entry['start_time'],
                        'duration_ms': entry['duration'] / 1000,
                        'command': self._to_text(entry['command']),
                        'client_addr': self._to_text(client_addr)
                    })
            return slow_queries
        except Exception as e:
            self.logger.error(f"Failed to get slow queries: {e}")
            return []

    def _trigger_alert(self, alert_type: str, alert_data: Dict[str, Any]) -> None:
        """Store an alert in ``redis:security_alerts``, log an event and emit a warning."""
        try:
            import json

            alert = {
                'type': alert_type,
                'severity': self._get_alert_severity(alert_type),
                'timestamp': datetime.now().isoformat(),
                'data': alert_data
            }
            self.redis.lpush('redis:security_alerts', json.dumps(alert))
            # Keep only the most recent 1000 alerts.
            self.redis.ltrim('redis:security_alerts', 0, 999)
            self._log_security_event('alert_triggered', alert)
            # Notification hook: e-mail, SMS etc. could be integrated here.
            self.logger.warning(f"Security alert triggered: {alert_type} - {alert_data}")
        except Exception as e:
            self.logger.error(f"Failed to trigger alert: {e}")

    def _get_alert_severity(self, alert_type: str) -> str:
        """Map an alert type to its severity; unknown types are ``'medium'``."""
        severity_map = {
            'high_failed_auth_rate': 'high',
            'suspicious_commands_detected': 'critical',
            'high_connection_count': 'medium',
            'suspicious_connections': 'medium',
            'high_memory_usage': 'high',
            'slow_queries_detected': 'low'
        }
        return severity_map.get(alert_type, 'medium')

    def get_security_dashboard(self) -> Dict[str, Any]:
        """Collect the ten latest alerts plus server and security metrics.

        Returns:
            ``{'success': True, 'dashboard': {...}}`` or
            ``{'success': False, 'error': str}``.
        """
        try:
            import json

            alerts = []
            for alert_data in self.redis.lrange('redis:security_alerts', 0, 9):
                try:
                    alerts.append(json.loads(alert_data))
                except Exception:
                    continue

            info = self.redis.info()
            client_info = self.redis.client_list()

            dashboard_data = {
                'system_status': {
                    'uptime_seconds': info.get('uptime_in_seconds', 0),
                    'connected_clients': info.get('connected_clients', 0),
                    'used_memory_human': info.get('used_memory_human', '0B'),
                    'total_commands_processed': info.get('total_commands_processed', 0),
                    'keyspace_hits': info.get('keyspace_hits', 0),
                    'keyspace_misses': info.get('keyspace_misses', 0)
                },
                'security_metrics': {
                    'recent_alerts_count': len(alerts),
                    'critical_alerts': len([a for a in alerts if a.get('severity') == 'critical']),
                    'high_alerts': len([a for a in alerts if a.get('severity') == 'high']),
                    'active_connections': len(client_info),
                    'failed_auth_attempts_last_hour': self._get_failed_auth_attempts_last_hour()
                },
                'recent_alerts': alerts,
                'timestamp': datetime.now().isoformat()
            }
            return {
                'success': True,
                'dashboard': dashboard_data
            }
        except Exception as e:
            self.logger.error(f"Failed to get security dashboard: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def _get_failed_auth_attempts_last_hour(self) -> int:
        """Return the number of failed authentications in the last hour (0 on error)."""
        try:
            return self._count_failed_auth_since(timedelta(hours=1))
        except Exception as e:
            self.logger.error(f"Failed to get failed auth attempts: {e}")
            return 0

    def _log_security_event(self, event_type: str, event_data: Dict[str, Any]) -> None:
        """Push an event onto ``redis:security_events`` (newest first, 1000 kept)."""
        try:
            import json

            event = {
                'type': event_type,
                'timestamp': datetime.now().isoformat(),
                'data': event_data
            }
            self.redis.lpush('redis:security_events', json.dumps(event))
            # Keep only the most recent 1000 events.
            self.redis.ltrim('redis:security_events', 0, 999)
        except Exception as e:
            self.logger.error(f"Failed to log security event: {e}")
```

### 12.6.2 审计日志

```python
class RedisAuditLogger:
def __init__(self, redis_client: redis.Redis):
self.redis = redis_client
self.logger = logging.getLogger(__name__)
# 审计配置
self.audit_config = {
'log_all_commands': False,
'log_sensitive_commands': True,
'log_failed_operations': True,
'log_admin_operations': True,
'retention_days': 90
}
# 需要审计的敏感命令
self.sensitive_commands = [
'AUTH', 'CONFIG', 'FLUSHDB', 'FLUSHALL',
'EVAL', 'EVALSHA', 'SCRIPT', 'DEBUG',
'SHUTDOWN', 'CLIENT', 'MONITOR'
]
def log_command_execution(self, command: str, args: List[str],
                          client_info: Dict[str, Any],
                          result: Any = None,
                          error: Optional[str] = None) -> None:
    """Write an audit record for a command when the audit settings ask for it.

    A record is written when all commands are logged, or the command is in
    the sensitive list, or the call failed, or it is an admin command, each
    subject to its switch in ``audit_config``. Records go to the list
    ``audit:<YYYY-MM-DD>``, whose expiry is refreshed to the retention period.

    Args:
        command: Command name (upper-cased before use).
        args: Command arguments; sanitized before being stored.
        client_info: Mapping read for ``addr``, ``name``, ``user`` and
            ``execution_time``.
        result: Command result; only its type name is stored.
        error: Error text when the command failed, otherwise ``None``.
    """
    try:
        name = command.upper()
        settings = self.audit_config

        # Decide whether this call is worth a record.
        if settings['log_all_commands']:
            wanted = True
        elif settings['log_sensitive_commands'] and name in self.sensitive_commands:
            wanted = True
        elif settings['log_failed_operations'] and error is not None:
            wanted = True
        elif settings['log_admin_operations'] and self._is_admin_command(name):
            wanted = True
        else:
            wanted = False
        if not wanted:
            return

        # Assemble the record.
        audit_record = {'timestamp': datetime.now().isoformat(), 'command': name}
        audit_record['args'] = self._sanitize_args(name, args)
        audit_record['client_info'] = {
            'addr': client_info.get('addr', 'unknown'),
            'name': client_info.get('name', ''),
            'user': client_info.get('user', 'default')
        }
        audit_record['execution_time_ms'] = client_info.get('execution_time', 0)
        audit_record['success'] = error is None
        audit_record['error'] = error
        if result is not None:
            audit_record['result_type'] = type(result).__name__
        else:
            audit_record['result_type'] = None

        # Store it in the list for the current day.
        import json
        day = datetime.now().strftime('%Y-%m-%d')
        audit_key = f"audit:{day}"
        self.redis.lpush(audit_key, json.dumps(audit_record))

        # Refresh the expiry of the day's list.
        self.redis.expire(audit_key, settings['retention_days'] * 24 * 3600)
    except Exception as e:
        self.logger.error(f"Failed to log command execution: {e}")
def _is_admin_command(self, command: str) -> bool:
"""判断是否为管理命令"""
admin_commands = [
'CONFIG', 'CLIENT', 'DEBUG', 'MONITOR',
'SHUTDOWN', 'SAVE', 'BGSAVE', 'LASTSAVE',
'SLAVEOF', 'ROLE', 'SYNC', 'PSYNC'
]
return command in admin_commands
def _sanitize_args(self, command: str, args: List[str]) -> List[str]:
"""清理敏感参数"""
try:
sanitized_args = args.copy()
# 对AUTH命令的密码参数进行脱敏
if command == 'AUTH' and len(sanitized_args) > 0:
sanitized_args[0] = '***REDACTED***'
# 对CONFIG SET命令中的敏感配置进行脱敏
elif command == 'CONFIG' and len(sanitized_args) >= 3 and sanitized_args[0].upper() == 'SET':
sensitive_configs = ['requirepass', 'masterauth']
if sanitized_args[1].lower() in sensitive_configs:
sanitized_args[2] = '***REDACTED***'
return sanitized_args
except Exception:
return ['***ERROR_SANITIZING***']
def search_audit_logs(self, start_date: Optional[str] = None,
                      end_date: Optional[str] = None,
                      command: Optional[str] = None,
                      client_addr: Optional[str] = None,
                      user: Optional[str] = None,
                      failed_only: bool = False,
                      limit: Optional[int] = 100) -> Dict[str, Any]:
    """Search the per-day audit lists (``audit:<YYYY-MM-DD>``) for matching records.

    Args:
        start_date: First day to search, ``YYYY-MM-DD``; defaults to seven
            days ago.
        end_date: Last day to search (inclusive), ``YYYY-MM-DD``; defaults
            to today.
        command: Only records of this command (compared in upper case).
        client_addr: Only records whose client address contains this text.
        user: Only records of this user.
        failed_only: Only records with ``success`` false.
        limit: Maximum number of records returned, newest first. ``None``
            returns every match; negative values are treated as 0.

    Returns:
        ``{'success': True, 'search_criteria', 'total_records_searched',
        'matching_records_count', 'records'}``, or
        ``{'success': False, 'error': str}`` (for example on a malformed
        date). ``matching_records_count`` counts all matches, not just the
        returned ones.
    """
    try:
        import json

        today = datetime.now()
        if start_date is None:
            start_date = (today - timedelta(days=7)).strftime('%Y-%m-%d')
        if end_date is None:
            end_date = today.strftime('%Y-%m-%d')

        # Every day from start_date to end_date, inclusive.
        first_day = datetime.strptime(start_date, '%Y-%m-%d')
        last_day = datetime.strptime(end_date, '%Y-%m-%d')
        search_dates = []
        current_day = first_day
        while current_day <= last_day:
            search_dates.append(current_day.strftime('%Y-%m-%d'))
            current_day += timedelta(days=1)

        wanted_command = command.upper() if command else None
        matching_records = []
        total_records = 0
        for date in search_dates:
            for record_data in self.redis.lrange(f"audit:{date}", 0, -1):
                try:
                    record = json.loads(record_data)
                    total_records += 1

                    if wanted_command and record['command'] != wanted_command:
                        continue
                    if client_addr and client_addr not in record['client_info']['addr']:
                        continue
                    if user and record['client_info']['user'] != user:
                        continue
                    if failed_only and record['success']:
                        continue
                    matching_records.append(record)
                except Exception:
                    # Skip entries that are malformed or lack expected fields.
                    continue

        # Newest first.
        matching_records.sort(key=lambda x: x['timestamp'], reverse=True)

        if limit is None:
            returned_records = matching_records
        else:
            returned_records = matching_records[:max(limit, 0)]

        return {
            'success': True,
            'search_criteria': {
                'start_date': start_date,
                'end_date': end_date,
                'command': command,
                'client_addr': client_addr,
                'user': user,
                'failed_only': failed_only
            },
            'total_records_searched': total_records,
            'matching_records_count': len(matching_records),
            'records': returned_records
        }
    except Exception as e:
        self.logger.error(f"Failed to search audit logs: {e}")
        return {
            'success': False,
            'error': str(e)
        }
def generate_audit_report(self, start_date: Optional[str] = None,
end_date: Optional[str] = None) -> Dict[str, Any]:
"""生成审计报告"""
try:
# 搜索所有审计记录
search_result = self.search_audit_logs(start_date, end_date)
if not search_result['success']:
return search_result
records = search_result['records']
# 统计分析
report = {
'period': {
'start_date': search_result['search_criteria']['start_date'],
'end_date': search_result['search_criteria']['end_date']
},
'summary': {
'total_operations': len(records),
'successful_operations': len([r for r in records if r['success']]),
'failed_operations': len([r for r in records if not r['success']]),
'unique_clients': len(set(r['client_info']['addr'] for r in records)),
'unique_users': len(set(r['client_info']['user'] for r in records))
},
'command_statistics': {},
'client_statistics': {},
'user_statistics': {},
'failed_operations': [],
'suspicious_activities': []
}
# 命令统计
command_counts = {}
for record in records:
command = record['command']
if command not in command_counts:
command_counts[command] = {'total': 0, 'failed': 0}
command_counts[command]['total'] += 1
if not record['success']:
command_counts[command]['failed'] += 1
report['command_statistics'] = dict(sorted(
command_counts.items(),
key=lambda x: x[1]['total'],
reverse=True
)[:10]) # 前10个最常用命令
# 客户端统计
client_counts = {}
for record in records:
client = record['client_info']['addr']
if client not in client_counts:
client_counts[client] = {'total': 0, 'failed': 0}
client_counts[client]['total'] += 1
if not record['success']:
client_counts[client]['failed'] += 1
report['client_statistics'] = dict(sorted(
client_counts.items(),
key=lambda x: x[1]['total'],
reverse=True
)[:10]) # 前10个最活跃客户端
# 用户统计
user_counts = {}
for record in records:
user = record['client_info']['user']
if user not in user_counts:
user_counts[user] = {'total': 0, 'failed': 0}
user_counts[user]['total'] += 1
if not record['success']:
user_counts[user]['failed'] += 1
report['user_statistics'] = user_counts
# 失败操作
report['failed_operations'] = [
r for r in records if not r['success']
][:20] # 最近20个失败操作
# 可疑活动检测
report['suspicious_activities'] = self._detect_suspicious_activities(records)
return {
'success': True,
'report': report,
'generated_at': datetime.now().isoformat()
}
except Exception as e:
self.logger.error(f"Failed to generate audit report: {e}")
return {
'success': False,
'error': str(e)
}
def _detect_suspicious_activities(self, records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""检测可疑活动"""
suspicious_activities = []
try:
# 检测频繁失败的客户端
client_failures = {}
for record in records:
if not record['success']:
client = record['client_info']['addr']
if client not in client_failures:
client_failures[client] = 0
client_failures[client] += 1
for client, failure_count in client_failures.items():
if failure_count >= 10: # 失败次数超过10次
suspicious_activities.append({
'type': 'frequent_failures',
'client': client,
'failure_count': failure_count,
'description': f'Client {client} has {failure_count} failed operations'
})
# 检测异常时间的操作
for record in records:
record_time = datetime.fromisoformat(record['timestamp'])
hour = record_time.hour
# 检测深夜操作(凌晨2-6点)
if 2 <= hour <= 6 and record['command'] in self.sensitive_commands:
suspicious_activities.append({
'type': 'off_hours_sensitive_operation',
'timestamp': record['timestamp'],
'command': record['command'],
'client': record['client_info']['addr'],
'description': f'Sensitive command {record["command"]} executed during off hours'
})
# 检测大量数据删除操作
deletion_commands = ['DEL', 'UNLINK', 'FLUSHDB', 'FLUSHALL']
deletion_count = len([r for r in records if r['command'] in deletion_commands])
if deletion_count >= 50: # 删除操作超过50次
suspicious_activities.append({
'type': 'excessive_deletions',
'deletion_count': deletion_count,
'description': f'Excessive deletion operations detected: {deletion_count} operations'
})
return suspicious_activities
except Exception as e:
self.logger.error(f"Failed to detect suspicious activities: {e}")
return []
12.7 最佳实践
12.7.1 安全配置清单
class RedisSecurityChecklist:
    """Checklist-style security assessment for a Redis server.

    Each category in ``security_checks`` maps check names to a short
    description. ``run_security_assessment`` executes every check, scores
    one point per passed check and derives an overall security level.

    NOTE(review): checks that cannot be automated from a client connection
    (data protection, system, and most command/monitoring checks) report
    ``passed: True`` with a "manual verification" note, so they raise the
    score without having verified anything. Confirm whether that is
    intended before relying on the percentage.
    """

    def __init__(self, redis_client: "redis.Redis"):
        self.redis = redis_client
        self.logger = logging.getLogger(__name__)
        # Security check items: category -> {check name: description}.
        self.security_checks = {
            'authentication': {
                'password_set': 'Check if password authentication is enabled',
                'strong_password': 'Verify password strength',
                'acl_enabled': 'Check if ACL is configured'
            },
            'network': {
                'bind_address': 'Verify bind address configuration',
                'protected_mode': 'Check if protected mode is enabled',
                'tls_enabled': 'Verify TLS/SSL encryption',
                'port_security': 'Check if default port is changed'
            },
            'commands': {
                'dangerous_commands_disabled': 'Verify dangerous commands are disabled',
                'command_renaming': 'Check if sensitive commands are renamed',
                'script_security': 'Verify Lua script security settings'
            },
            'data_protection': {
                'encryption_at_rest': 'Check data encryption configuration',
                'sensitive_data_handling': 'Verify sensitive data protection',
                'backup_security': 'Check backup encryption and access controls'
            },
            'monitoring': {
                'logging_enabled': 'Verify audit logging is configured',
                'monitoring_setup': 'Check security monitoring configuration',
                'alerting_configured': 'Verify security alerting is set up'
            },
            'system': {
                'file_permissions': 'Check Redis file permissions',
                'user_privileges': 'Verify Redis runs with minimal privileges',
                'system_updates': 'Check for security updates'
            }
        }

    def run_security_assessment(self) -> Dict[str, Any]:
        """Run every configured check and aggregate the results.

        Returns:
            ``{'success': True, 'assessment': ...}`` where the assessment
            holds per-category results, the summed score, the collected
            recommendations and critical issues, ``score_percentage`` and
            ``security_level``. On an unexpected error:
            ``{'success': False, 'error': <message>}``.
        """
        try:
            assessment_results = {
                'overall_score': 0,
                'max_score': 0,
                'categories': {},
                'recommendations': [],
                'critical_issues': [],
                'timestamp': datetime.now().isoformat()
            }

            for category, checks in self.security_checks.items():
                category_result = self._assess_category(category, checks)
                assessment_results['categories'][category] = category_result
                # Accumulate the score.
                assessment_results['overall_score'] += category_result['score']
                assessment_results['max_score'] += category_result['max_score']
                # Collect recommendations and critical issues.
                assessment_results['recommendations'].extend(category_result['recommendations'])
                assessment_results['critical_issues'].extend(category_result['critical_issues'])

            # Derive the overall level; guard against an empty checklist.
            if assessment_results['max_score'] > 0:
                score_percentage = (
                    assessment_results['overall_score'] / assessment_results['max_score']
                ) * 100
                assessment_results['security_level'] = self._get_security_level(score_percentage)
                assessment_results['score_percentage'] = score_percentage
            else:
                assessment_results['security_level'] = 'unknown'
                assessment_results['score_percentage'] = 0

            return {
                'success': True,
                'assessment': assessment_results
            }
        except Exception as e:
            self.logger.error(f"Failed to run security assessment: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def _assess_category(self, category: str, checks: Dict[str, str]) -> Dict[str, Any]:
        """Run all checks of one category and score one point per pass."""
        category_result = {
            'score': 0,
            'max_score': len(checks),
            'passed_checks': [],
            'failed_checks': [],
            'recommendations': [],
            'critical_issues': []
        }

        for check_name, check_description in checks.items():
            check_result = self._execute_security_check(category, check_name)

            if check_result['passed']:
                category_result['score'] += 1
                category_result['passed_checks'].append({
                    'name': check_name,
                    'description': check_description
                })
                continue

            details = check_result.get('details', '')
            category_result['failed_checks'].append({
                'name': check_name,
                'description': check_description,
                'details': details
            })
            if check_result.get('recommendation'):
                category_result['recommendations'].append(check_result['recommendation'])
            if check_result.get('critical', False):
                category_result['critical_issues'].append({
                    'check': check_name,
                    'description': check_description,
                    'details': details
                })

        return category_result

    def _execute_security_check(self, category: str, check_name: str) -> Dict[str, Any]:
        """Dispatch one check to its category handler.

        Any exception raised by a handler is converted into a failed check
        so a single broken probe cannot abort the whole assessment.
        """
        handlers = {
            'authentication': self._check_authentication,
            'network': self._check_network,
            'commands': self._check_commands,
            'data_protection': self._check_data_protection,
            'monitoring': self._check_monitoring,
            'system': self._check_system,
        }
        handler = handlers.get(category)
        if handler is None:
            return {'passed': False, 'details': 'Unknown check category'}
        try:
            return handler(check_name)
        except Exception as e:
            return {
                'passed': False,
                'details': f'Check execution failed: {str(e)}'
            }

    def _check_authentication(self, check_name: str) -> Dict[str, Any]:
        """Check authentication-related configuration."""
        if check_name == 'password_set':
            config = self.redis.config_get('requirepass')
            if config and config.get('requirepass'):
                return {'passed': True}
            return {
                'passed': False,
                'critical': True,
                'details': 'No password authentication configured',
                'recommendation': 'Set a strong password using CONFIG SET requirepass'
            }

        if check_name == 'strong_password':
            # Only the presence of a password is checked here; its strength
            # is not evaluated.
            config = self.redis.config_get('requirepass')
            if config and config.get('requirepass'):
                return {
                    'passed': True,
                    'details': 'Password is set (strength cannot be verified)'
                }
            return {
                'passed': False,
                'critical': True,
                'details': 'No password set',
                'recommendation': 'Set a strong password with at least 12 characters'
            }

        if check_name == 'acl_enabled':
            try:
                users = self.redis.acl_list()
            except Exception:
                return {
                    'passed': False,
                    'details': 'ACL commands not available or failed',
                    'recommendation': 'Upgrade to Redis 6.0+ and configure ACL'
                }
            # More than one entry means users beyond "default" exist.
            if len(users) > 1:
                return {'passed': True}
            return {
                'passed': False,
                'details': 'Only default ACL user exists',
                'recommendation': 'Configure ACL users with specific permissions'
            }

        return {'passed': False, 'details': 'Unknown authentication check'}

    def _check_network(self, check_name: str) -> Dict[str, Any]:
        """Check network-related configuration."""
        if check_name == 'bind_address':
            config = self.redis.config_get('bind')
            bind_addresses = (config.get('bind') or '').split()
            # Redis 6.2+ allows a "-" prefix ("ignore if unavailable"), as
            # in the default "* -::*"; strip it before comparing.
            normalized = {addr.lstrip('-') for addr in bind_addresses}
            wildcards = {'0.0.0.0', '*', '::', '::*'}
            # An empty bind directive means Redis listens on every
            # interface, so it is treated exactly like an explicit wildcard.
            if not bind_addresses or normalized & wildcards:
                return {
                    'passed': False,
                    'critical': True,
                    'details': 'Redis is bound to all interfaces',
                    'recommendation': 'Bind Redis to specific IP addresses only'
                }
            if '127.0.0.1' in normalized:
                return {'passed': True}
            return {
                'passed': False,
                'details': f'Bind configuration: {bind_addresses}',
                'recommendation': 'Review bind address configuration'
            }

        if check_name == 'protected_mode':
            config = self.redis.config_get('protected-mode')
            if config.get('protected-mode') == 'yes':
                return {'passed': True}
            return {
                'passed': False,
                'critical': True,
                'details': 'Protected mode is disabled',
                'recommendation': 'Enable protected mode: CONFIG SET protected-mode yes'
            }

        if check_name == 'tls_enabled':
            # A non-zero tls-port is taken as "TLS configured"; adjust to
            # the actual deployment if TLS is terminated elsewhere.
            config = self.redis.config_get('tls-port')
            if config.get('tls-port', '0') != '0':
                return {'passed': True}
            return {
                'passed': False,
                'details': 'TLS is not configured',
                'recommendation': 'Configure TLS encryption for secure communication'
            }

        if check_name == 'port_security':
            config = self.redis.config_get('port')
            if config.get('port', '6379') != '6379':
                return {'passed': True}
            return {
                'passed': False,
                'details': 'Using default port 6379',
                'recommendation': 'Consider changing to a non-standard port'
            }

        return {'passed': False, 'details': 'Unknown network check'}

    def _check_commands(self, check_name: str) -> Dict[str, Any]:
        """Check command-level hardening.

        Only CONFIG is actually probed (via a harmless ``CONFIG GET``).
        The other listed commands are destructive, so they are not
        executed and therefore never reported as enabled by this check.
        """
        if check_name == 'dangerous_commands_disabled':
            dangerous_commands = ['FLUSHDB', 'FLUSHALL', 'CONFIG', 'DEBUG', 'SHUTDOWN']
            enabled_dangerous = []
            for cmd in dangerous_commands:
                if cmd != 'CONFIG':
                    # No side-effect-free probe for this command.
                    continue
                try:
                    self.redis.config_get('timeout')
                except Exception:
                    # Command is disabled, renamed, or not permitted.
                    continue
                enabled_dangerous.append(cmd)

            if enabled_dangerous:
                return {
                    'passed': False,
                    'critical': True,
                    'details': f'Dangerous commands still enabled: {enabled_dangerous}',
                    'recommendation': 'Disable or rename dangerous commands'
                }
            return {'passed': True}

        return {'passed': True, 'details': 'Command check completed'}

    def _check_data_protection(self, check_name: str) -> Dict[str, Any]:
        """Placeholder: data-protection checks are not automated here."""
        return {
            'passed': True,
            'details': 'Data protection check requires manual verification'
        }

    def _check_monitoring(self, check_name: str) -> Dict[str, Any]:
        """Check monitoring configuration."""
        if check_name == 'logging_enabled':
            # Look for at least one audit key. SCAN is used instead of KEYS
            # because KEYS walks the whole keyspace in one blocking call.
            first_audit_key = next(
                iter(self.redis.scan_iter(match='audit:*', count=100)), None
            )
            if first_audit_key is not None:
                return {'passed': True}
            return {
                'passed': False,
                'details': 'No audit logs found',
                'recommendation': 'Enable audit logging'
            }
        return {'passed': True, 'details': 'Monitoring check completed'}

    def _check_system(self, check_name: str) -> Dict[str, Any]:
        """Placeholder: system-level checks are not automated here."""
        return {
            'passed': True,
            'details': 'System check requires manual verification'
        }

    def _get_security_level(self, score_percentage: float) -> str:
        """Map a percentage score to a named security level."""
        thresholds = (
            (90, 'excellent'),
            (80, 'good'),
            (70, 'fair'),
            (60, 'poor'),
        )
        for minimum, level in thresholds:
            if score_percentage >= minimum:
                return level
        return 'critical'

    def generate_security_report(self) -> Dict[str, Any]:
        """Run the assessment and wrap it in a report structure.

        Returns:
            ``{'success': True, 'report': ...}`` or the failure dict of the
            underlying assessment / ``{'success': False, 'error': ...}``.
        """
        try:
            assessment_result = self.run_security_assessment()
            if not assessment_result['success']:
                return assessment_result
            assessment = assessment_result['assessment']

            report = {
                'executive_summary': {
                    'security_level': assessment['security_level'],
                    'overall_score': f"{assessment['score_percentage']:.1f}%",
                    'critical_issues_count': len(assessment['critical_issues']),
                    'recommendations_count': len(assessment['recommendations'])
                },
                'detailed_results': assessment['categories'],
                'critical_issues': assessment['critical_issues'],
                'recommendations': assessment['recommendations'],
                'next_steps': self._generate_next_steps(assessment),
                'generated_at': assessment['timestamp']
            }
            return {
                'success': True,
                'report': report
            }
        except Exception as e:
            self.logger.error(f"Failed to generate security report: {e}")
            return {
                'success': False,
                'error': str(e)
            }

    def _generate_next_steps(self, assessment: Dict[str, Any]) -> List[str]:
        """Suggest follow-up actions based on the security level.

        An ``URGENT`` item is put first whenever critical issues exist.
        Levels not listed below (e.g. ``'unknown'``) yield no level-based
        steps.
        """
        maintain = [
            'Maintain current security posture',
            'Regular security assessments',
            'Stay updated with security best practices',
            'Consider security automation'
        ]
        steps_by_level = {
            'critical': [
                'Immediately address all critical security issues',
                'Implement basic authentication and access controls',
                'Review and secure network configuration',
                'Schedule regular security assessments'
            ],
            'poor': [
                'Address critical issues within 24 hours',
                'Implement recommended security configurations',
                'Set up monitoring and alerting',
                'Plan for comprehensive security review'
            ],
            'fair': [
                'Address remaining security gaps',
                'Enhance monitoring and logging',
                'Review and update security policies',
                'Consider advanced security features'
            ],
            'good': maintain,
            'excellent': maintain,
        }
        next_steps = list(steps_by_level.get(assessment['security_level'], []))

        if assessment['critical_issues']:
            next_steps.insert(0, 'URGENT: Address critical security vulnerabilities immediately')
        return next_steps
12.7.2 安全配置模板
class RedisSecurityTemplate:
    """Ready-made Redis security configuration templates."""

    @staticmethod
    def get_production_config() -> Dict[str, str]:
        """Return a hardened configuration template for production.

        Every ``rename-command`` target gets a fresh random suffix on each
        call, so two calls never return the same renamed command names.
        """
        # Basic security settings.
        basics = {
            'requirepass': 'your-strong-password-here',
            'protected-mode': 'yes',
            'bind': '127.0.0.1',  # bind to a specific IP
            'port': '6380',  # non-default port
        }
        # Network settings.
        network = {
            'tcp-keepalive': '300',
            'timeout': '300',
            'tcp-backlog': '511',
        }
        # Memory and performance.
        resources = {
            'maxmemory-policy': 'allkeys-lru',
            'maxclients': '1000',
        }
        # Persistence safety.
        persistence = {
            'save': '900 1 300 10 60 10000',
            'stop-writes-on-bgsave-error': 'yes',
            'rdbcompression': 'yes',
            'rdbchecksum': 'yes',
        }
        # Logging.
        log_settings = {
            'loglevel': 'notice',
            'syslog-enabled': 'yes',
            'syslog-ident': 'redis',
        }
        # Dangerous commands are renamed to unguessable names.
        renamed = ('FLUSHDB', 'FLUSHALL', 'CONFIG', 'DEBUG', 'SHUTDOWN', 'EVAL', 'SCRIPT')
        restrictions = {
            'rename-command ' + name: name + '_RENAMED_' + secrets.token_hex(16)
            for name in renamed
        }

        template: Dict[str, str] = {}
        for group in (basics, network, resources, persistence, log_settings, restrictions):
            template.update(group)
        return template

    @staticmethod
    def get_development_config() -> Dict[str, str]:
        """Return a relaxed configuration template for development."""
        template = dict([
            # Basic settings.
            ('requirepass', 'dev-password-123'),
            ('protected-mode', 'yes'),
            ('bind', '127.0.0.1'),
            # Developer convenience.
            ('timeout', '0'),
            ('loglevel', 'debug'),
            # Restrict dangerous commands.
            ('rename-command FLUSHALL', 'FLUSHALL_DEV'),
            ('rename-command CONFIG', 'CONFIG_DEV'),
        ])
        return template

    @staticmethod
    def get_acl_config() -> List[str]:
        """Return the ACL SETUSER statements of the template, in order."""
        admin_user = 'ACL SETUSER admin on >admin-strong-password ~* &* +@all'
        # Application user: restricted to the app:* key pattern.
        app_user = 'ACL SETUSER app on >app-password ~app:* +@read +@write -@dangerous'
        readonly_user = 'ACL SETUSER readonly on >readonly-password ~* +@read -@write -@dangerous'
        monitor_user = 'ACL SETUSER monitor on >monitor-password ~* +info +ping +client +config|get'
        # Finally switch the default user off.
        disable_default = 'ACL SETUSER default off'
        return [admin_user, app_user, readonly_user, monitor_user, disable_default]
12.8 总结
Redis安全配置是确保数据安全和系统稳定的关键环节。本章详细介绍了Redis的安全威胁、安全原则以及全面的安全配置方案。
12.8.1 核心安全要点
身份认证和授权
- 启用密码认证
- 配置ACL访问控制
- 实施最小权限原则
网络安全
- 配置TLS/SSL加密
- 限制网络访问
- 使用防火墙和VPN
命令安全
- 禁用或重命名危险命令
- 实施脚本安全策略
- 监控命令执行
数据保护
- 实施数据加密
- 敏感数据处理
- 数据分类管理
监控和审计
- 安全事件监控
- 审计日志记录
- 告警机制
12.8.2 最佳实践建议
定期安全评估
- 使用安全检查清单
- 进行渗透测试
- 更新安全配置
持续监控
- 实时安全监控
- 异常行为检测
- 自动化响应
安全培训
- 团队安全意识培训
- 了解最新安全威胁
- 应急响应演练
12.8.3 参考资料
下一章预告:第13章将介绍Redis的性能优化和调优技巧,包括内存优化、网络优化、持久化优化等内容。