10.1 MySQL安全基础

10.1.1 安全威胁与风险评估

class MySQLSecurityThreats:
    def __init__(self):
        self.threats = {
            "外部威胁": {
                "SQL注入攻击": {
                    "描述": "通过恶意SQL代码获取或破坏数据",
                    "风险等级": "高",
                    "影响": ["数据泄露", "数据篡改", "权限提升"],
                    "防护措施": [
                        "使用参数化查询",
                        "输入验证和过滤",
                        "最小权限原则",
                        "定期安全审计"
                    ]
                },
                "暴力破解攻击": {
                    "描述": "通过尝试大量密码组合获取访问权限",
                    "风险等级": "中",
                    "影响": ["账户被盗", "未授权访问"],
                    "防护措施": [
                        "强密码策略",
                        "账户锁定机制",
                        "连接限制",
                        "监控异常登录"
                    ]
                },
                "网络嗅探": {
                    "描述": "截获网络传输中的敏感信息",
                    "风险等级": "中",
                    "影响": ["密码泄露", "数据窃取"],
                    "防护措施": [
                        "SSL/TLS加密",
                        "VPN连接",
                        "网络隔离",
                        "加密传输"
                    ]
                }
            },
            "内部威胁": {
                "权限滥用": {
                    "描述": "内部用户超越授权范围访问数据",
                    "风险等级": "高",
                    "影响": ["数据泄露", "内部欺诈"],
                    "防护措施": [
                        "最小权限原则",
                        "权限定期审查",
                        "操作审计",
                        "职责分离"
                    ]
                },
                "误操作": {
                    "描述": "无意中的错误操作导致数据损失",
                    "风险等级": "中",
                    "影响": ["数据丢失", "服务中断"],
                    "防护措施": [
                        "操作培训",
                        "变更管理",
                        "备份策略",
                        "操作审批"
                    ]
                }
            },
            "系统威胁": {
                "配置错误": {
                    "描述": "不安全的系统配置导致安全漏洞",
                    "风险等级": "高",
                    "影响": ["系统入侵", "数据泄露"],
                    "防护措施": [
                        "安全配置基线",
                        "配置审计",
                        "自动化部署",
                        "安全扫描"
                    ]
                },
                "软件漏洞": {
                    "描述": "MySQL软件本身的安全漏洞",
                    "风险等级": "高",
                    "影响": ["系统入侵", "权限提升"],
                    "防护措施": [
                        "及时更新补丁",
                        "漏洞扫描",
                        "版本管理",
                        "安全监控"
                    ]
                }
            }
        }
    
    def show_threat_analysis(self):
        """显示威胁分析"""
        print("MySQL安全威胁分析")
        print("=" * 60)
        
        for category, threats in self.threats.items():
            print(f"\n{category}:")
            print("-" * 40)
            
            for threat_name, details in threats.items():
                print(f"\n威胁: {threat_name}")
                print(f"描述: {details['描述']}")
                print(f"风险等级: {details['风险等级']}")
                
                print("影响:")
                for impact in details['影响']:
                    print(f"  • {impact}")
                
                print("防护措施:")
                for measure in details['防护措施']:
                    print(f"  ✓ {measure}")

# 安全威胁分析演示
security_threats = MySQLSecurityThreats()
security_threats.show_threat_analysis()

10.1.2 安全配置基线

1. 基础安全配置

-- MySQL安全配置基线

-- 1. 删除匿名用户
DELETE FROM mysql.user WHERE User='';

-- 2. 删除test数据库
DROP DATABASE IF EXISTS test;
DELETE FROM mysql.db WHERE Db='test' OR Db='test\_%';

-- 3. 禁用远程root登录
DELETE FROM mysql.user WHERE User='root' AND Host NOT IN ('localhost', '127.0.0.1', '::1');

-- 4. 设置root密码(如果未设置)
ALTER USER 'root'@'localhost' IDENTIFIED BY 'StrongPassword123!';

-- 5. 刷新权限
FLUSH PRIVILEGES;

-- 6. 创建专用管理用户
CREATE USER 'admin'@'localhost' IDENTIFIED BY 'AdminPassword123!';
GRANT ALL PRIVILEGES ON *.* TO 'admin'@'localhost' WITH GRANT OPTION;

-- 7. 创建应用用户(最小权限)
CREATE USER 'app_user'@'%' IDENTIFIED BY 'AppPassword123!';
GRANT SELECT, INSERT, UPDATE, DELETE ON app_database.* TO 'app_user'@'%';

-- 8. 创建只读用户
CREATE USER 'readonly'@'%' IDENTIFIED BY 'ReadOnlyPassword123!';
GRANT SELECT ON app_database.* TO 'readonly'@'%';

-- 9. 创建备份用户
CREATE USER 'backup'@'localhost' IDENTIFIED BY 'BackupPassword123!';
GRANT SELECT, LOCK TABLES, SHOW VIEW, EVENT, TRIGGER ON *.* TO 'backup'@'localhost';
GRANT RELOAD, PROCESS ON *.* TO 'backup'@'localhost';

2. my.cnf安全配置

# MySQL安全配置文件 (my.cnf)

[mysqld]
# 基础安全设置
user = mysql
port = 3306
bind-address = 127.0.0.1  # 仅本地访问,生产环境根据需要调整

# 禁用危险功能
local-infile = 0
show-database = 0
safe-user-create = 1
secure-file-priv = /var/lib/mysql-files/

# 连接安全
max_connections = 100
max_connect_errors = 10
max_user_connections = 50

# 密码验证
validate_password.policy = MEDIUM
validate_password.length = 8
validate_password.mixed_case_count = 1
validate_password.number_count = 1
validate_password.special_char_count = 1

# SSL配置
ssl-ca = /etc/mysql/ssl/ca-cert.pem
ssl-cert = /etc/mysql/ssl/server-cert.pem
ssl-key = /etc/mysql/ssl/server-key.pem
require_secure_transport = ON

# 日志配置
general_log = 1
general_log_file = /var/log/mysql/general.log
log_error = /var/log/mysql/error.log
slow_query_log = 1
slow_query_log_file = /var/log/mysql/slow.log
long_query_time = 2
log_queries_not_using_indexes = 1

# 二进制日志
log-bin = /var/log/mysql/mysql-bin
binlog_format = ROW
expire_logs_days = 7
max_binlog_size = 100M

# 其他安全设置
skip-symbolic-links
chroot = /var/lib/mysql
old_passwords = 0
sql_mode = STRICT_TRANS_TABLES,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION

[mysql]
# 客户端安全设置
ssl-ca = /etc/mysql/ssl/ca-cert.pem
ssl-cert = /etc/mysql/ssl/client-cert.pem
ssl-key = /etc/mysql/ssl/client-key.pem

[mysqldump]
# 备份安全设置
single-transaction
routines
triggers

3. 安全配置检查脚本

#!/bin/bash
# MySQL安全配置检查脚本

echo "=== MySQL安全配置检查 ==="
echo "检查时间: $(date)"
echo

# 配置参数
MYSQL_USER="root"
MYSQL_PASS=""
CHECK_RESULT="/tmp/mysql_security_check_$(date +%Y%m%d_%H%M%S).log"

# 创建检查结果文件
exec > >(tee -a $CHECK_RESULT)
exec 2>&1

echo "安全检查结果: $CHECK_RESULT"
echo

# 1. 检查匿名用户
echo "1. 检查匿名用户:"
ANON_USERS=$(mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SELECT User, Host FROM mysql.user WHERE User='';" 2>/dev/null | wc -l)
if [ $ANON_USERS -gt 1 ]; then
    echo "❌ 发现匿名用户,存在安全风险"
    mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SELECT User, Host FROM mysql.user WHERE User='';"
else
    echo "✅ 未发现匿名用户"
fi

# 2. 检查test数据库
echo "\n2. 检查test数据库:"
TEST_DB=$(mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SHOW DATABASES LIKE 'test';" 2>/dev/null | wc -l)
if [ $TEST_DB -gt 1 ]; then
    echo "❌ 发现test数据库,建议删除"
else
    echo "✅ test数据库已删除"
fi

# 3. 检查root远程登录
echo "\n3. 检查root远程登录:"
REMOTE_ROOT=$(mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SELECT User, Host FROM mysql.user WHERE User='root' AND Host NOT IN ('localhost', '127.0.0.1', '::1');" 2>/dev/null | wc -l)
if [ $REMOTE_ROOT -gt 1 ]; then
    echo "❌ root用户允许远程登录,存在安全风险"
    mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SELECT User, Host FROM mysql.user WHERE User='root' AND Host NOT IN ('localhost', '127.0.0.1', '::1');"
else
    echo "✅ root用户仅允许本地登录"
fi

# 4. 检查空密码用户
echo "\n4. 检查空密码用户:"
EMPTY_PASS=$(mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SELECT User, Host FROM mysql.user WHERE authentication_string='';" 2>/dev/null | wc -l)
if [ $EMPTY_PASS -gt 1 ]; then
    echo "❌ 发现空密码用户,存在安全风险"
    mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SELECT User, Host FROM mysql.user WHERE authentication_string='';"
else
    echo "✅ 未发现空密码用户"
fi

# 5. 检查SSL配置
echo "\n5. 检查SSL配置:"
SSL_STATUS=$(mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SHOW VARIABLES LIKE 'have_ssl';" 2>/dev/null | grep YES | wc -l)
if [ $SSL_STATUS -eq 1 ]; then
    echo "✅ SSL已启用"
    mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SHOW VARIABLES LIKE '%ssl%';"
else
    echo "❌ SSL未启用,建议启用SSL加密"
fi

# 6. 检查日志配置
echo "\n6. 检查日志配置:"
GENERAL_LOG=$(mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SHOW VARIABLES LIKE 'general_log';" 2>/dev/null | grep ON | wc -l)
SLOW_LOG=$(mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SHOW VARIABLES LIKE 'slow_query_log';" 2>/dev/null | grep ON | wc -l)

if [ $GENERAL_LOG -eq 1 ]; then
    echo "✅ 一般查询日志已启用"
else
    echo "⚠️  一般查询日志未启用"
fi

if [ $SLOW_LOG -eq 1 ]; then
    echo "✅ 慢查询日志已启用"
else
    echo "⚠️  慢查询日志未启用"
fi

# 7. 检查文件权限
echo "\n7. 检查文件权限:"
if [ -d "/var/lib/mysql" ]; then
    MYSQL_DIR_PERM=$(stat -c "%a" /var/lib/mysql)
    MYSQL_DIR_OWNER=$(stat -c "%U:%G" /var/lib/mysql)
    echo "MySQL数据目录权限: $MYSQL_DIR_PERM ($MYSQL_DIR_OWNER)"
    
    if [ "$MYSQL_DIR_OWNER" = "mysql:mysql" ]; then
        echo "✅ MySQL数据目录所有者正确"
    else
        echo "❌ MySQL数据目录所有者不正确"
    fi
fi

# 8. 检查网络配置
echo "\n8. 检查网络配置:"
BIND_ADDRESS=$(mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SHOW VARIABLES LIKE 'bind_address';" 2>/dev/null | awk 'NR==2{print $2}')
PORT=$(mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SHOW VARIABLES LIKE 'port';" 2>/dev/null | awk 'NR==2{print $2}')

echo "绑定地址: $BIND_ADDRESS"
echo "端口: $PORT"

if [ "$BIND_ADDRESS" = "127.0.0.1" ] || [ "$BIND_ADDRESS" = "localhost" ]; then
    echo "✅ 仅绑定本地地址,安全性较高"
else
    echo "⚠️  绑定了外部地址,请确保网络安全"
fi

echo
echo "=== 安全检查完成 ==="
echo "详细结果已保存到: $CHECK_RESULT"

10.2 用户管理与权限控制

10.2.1 用户账户管理

1. 用户创建与管理

-- 用户账户管理最佳实践

-- 1. 创建用户(指定主机)
CREATE USER 'username'@'localhost' IDENTIFIED BY 'StrongPassword123!';
CREATE USER 'username'@'192.168.1.%' IDENTIFIED BY 'StrongPassword123!';
CREATE USER 'username'@'%.example.com' IDENTIFIED BY 'StrongPassword123!';

-- 2. 修改用户密码
ALTER USER 'username'@'localhost' IDENTIFIED BY 'NewStrongPassword123!';

-- 3. 设置密码过期
ALTER USER 'username'@'localhost' PASSWORD EXPIRE;
ALTER USER 'username'@'localhost' PASSWORD EXPIRE INTERVAL 90 DAY;
ALTER USER 'username'@'localhost' PASSWORD EXPIRE NEVER;

-- 4. 账户锁定/解锁
ALTER USER 'username'@'localhost' ACCOUNT LOCK;
ALTER USER 'username'@'localhost' ACCOUNT UNLOCK;

-- 5. 设置连接限制
ALTER USER 'username'@'localhost' 
WITH MAX_QUERIES_PER_HOUR 1000
     MAX_UPDATES_PER_HOUR 100
     MAX_CONNECTIONS_PER_HOUR 10
     MAX_USER_CONNECTIONS 5;

-- 6. 查看用户信息
SELECT User, Host, account_locked, password_expired, password_lifetime 
FROM mysql.user;

-- 7. 删除用户
DROP USER 'username'@'localhost';

-- 8. 重命名用户
RENAME USER 'old_username'@'localhost' TO 'new_username'@'localhost';

2. 密码策略配置

-- 密码验证组件配置

-- 1. 安装密码验证插件
INSTALL PLUGIN validate_password SONAME 'validate_password.so';

-- 2. 查看密码策略设置
SHOW VARIABLES LIKE 'validate_password%';

-- 3. 配置密码策略
SET GLOBAL validate_password.policy = 'STRONG';
SET GLOBAL validate_password.length = 12;
SET GLOBAL validate_password.mixed_case_count = 2;
SET GLOBAL validate_password.number_count = 2;
SET GLOBAL validate_password.special_char_count = 2;
SET GLOBAL validate_password.dictionary_file = '/usr/share/mysql/dictionary.txt';

-- 4. 测试密码强度
SELECT VALIDATE_PASSWORD_STRENGTH('WeakPass');
SELECT VALIDATE_PASSWORD_STRENGTH('StrongPassword123!');

-- 5. 查看密码验证要求
SHOW STATUS LIKE 'validate_password%';

3. 用户管理脚本

class MySQLUserManager:
    def __init__(self, host='localhost', user='root', password=''):
        self.host = host
        self.user = user
        self.password = password
        self.connection = None
    
    def connect(self):
        """连接到MySQL"""
        import mysql.connector
        try:
            self.connection = mysql.connector.connect(
                host=self.host,
                user=self.user,
                password=self.password
            )
            return True
        except Exception as e:
            print(f"连接失败: {e}")
            return False
    
    def create_user(self, username, host, password, max_connections=10):
        """创建用户"""
        if not self.connection:
            return False
        
        try:
            cursor = self.connection.cursor()
            
            # 创建用户
            create_sql = f"CREATE USER '{username}'@'{host}' IDENTIFIED BY '{password}'"
            cursor.execute(create_sql)
            
            # 设置连接限制
            limit_sql = f"""
                ALTER USER '{username}'@'{host}' 
                WITH MAX_USER_CONNECTIONS {max_connections}
            """
            cursor.execute(limit_sql)
            
            self.connection.commit()
            print(f"用户 {username}@{host} 创建成功")
            return True
            
        except Exception as e:
            print(f"创建用户失败: {e}")
            return False
        finally:
            cursor.close()
    
    def grant_privileges(self, username, host, database, privileges):
        """授予权限"""
        if not self.connection:
            return False
        
        try:
            cursor = self.connection.cursor()
            
            # 构建权限字符串
            if isinstance(privileges, list):
                priv_str = ', '.join(privileges)
            else:
                priv_str = privileges
            
            grant_sql = f"GRANT {priv_str} ON {database}.* TO '{username}'@'{host}'"
            cursor.execute(grant_sql)
            
            self.connection.commit()
            print(f"权限授予成功: {username}@{host} -> {priv_str} on {database}")
            return True
            
        except Exception as e:
            print(f"授予权限失败: {e}")
            return False
        finally:
            cursor.close()
    
    def revoke_privileges(self, username, host, database, privileges):
        """撤销权限"""
        if not self.connection:
            return False
        
        try:
            cursor = self.connection.cursor()
            
            # 构建权限字符串
            if isinstance(privileges, list):
                priv_str = ', '.join(privileges)
            else:
                priv_str = privileges
            
            revoke_sql = f"REVOKE {priv_str} ON {database}.* FROM '{username}'@'{host}'"
            cursor.execute(revoke_sql)
            
            self.connection.commit()
            print(f"权限撤销成功: {username}@{host} -> {priv_str} on {database}")
            return True
            
        except Exception as e:
            print(f"撤销权限失败: {e}")
            return False
        finally:
            cursor.close()
    
    def list_users(self):
        """列出所有用户"""
        if not self.connection:
            return []
        
        try:
            cursor = self.connection.cursor()
            
            query = """
                SELECT User, Host, account_locked, password_expired, 
                       password_lifetime, max_user_connections
                FROM mysql.user 
                ORDER BY User, Host
            """
            cursor.execute(query)
            
            users = cursor.fetchall()
            
            print("MySQL用户列表:")
            print("-" * 80)
            print(f"{'用户名':<15} {'主机':<20} {'锁定':<6} {'密码过期':<8} {'密码生命周期':<12} {'最大连接':<8}")
            print("-" * 80)
            
            for user in users:
                username, host, locked, expired, lifetime, max_conn = user
                locked_str = "是" if locked == 'Y' else "否"
                expired_str = "是" if expired == 'Y' else "否"
                lifetime_str = str(lifetime) if lifetime else "无限制"
                max_conn_str = str(max_conn) if max_conn else "无限制"
                
                print(f"{username:<15} {host:<20} {locked_str:<6} {expired_str:<8} {lifetime_str:<12} {max_conn_str:<8}")
            
            return users
            
        except Exception as e:
            print(f"查询用户失败: {e}")
            return []
        finally:
            cursor.close()
    
    def show_user_privileges(self, username, host):
        """显示用户权限"""
        if not self.connection:
            return
        
        try:
            cursor = self.connection.cursor()
            
            # 查询全局权限
            global_query = f"SHOW GRANTS FOR '{username}'@'{host}'"
            cursor.execute(global_query)
            
            grants = cursor.fetchall()
            
            print(f"用户 {username}@{host} 的权限:")
            print("-" * 50)
            
            for grant in grants:
                print(grant[0])
            
        except Exception as e:
            print(f"查询权限失败: {e}")
        finally:
            cursor.close()
    
    def close(self):
        """关闭连接"""
        if self.connection:
            self.connection.close()

# 用户管理演示
user_manager = MySQLUserManager()
if user_manager.connect():
    # 创建应用用户
    user_manager.create_user('app_user', 'localhost', 'AppPassword123!', 20)
    
    # 授予权限
    user_manager.grant_privileges('app_user', 'localhost', 'myapp', 
                                 ['SELECT', 'INSERT', 'UPDATE', 'DELETE'])
    
    # 列出用户
    user_manager.list_users()
    
    # 显示用户权限
    user_manager.show_user_privileges('app_user', 'localhost')
    
    user_manager.close()

10.2.2 权限系统详解

1. MySQL权限层次

-- MySQL权限层次结构

-- 1. 全局权限(影响整个MySQL服务器)
GRANT ALL PRIVILEGES ON *.* TO 'admin'@'localhost';
GRANT SUPER, PROCESS, RELOAD ON *.* TO 'dba'@'localhost';

-- 2. 数据库权限(影响特定数据库)
GRANT ALL PRIVILEGES ON myapp.* TO 'app_admin'@'localhost';
GRANT CREATE, DROP, ALTER ON myapp.* TO 'schema_manager'@'localhost';

-- 3. 表权限(影响特定表)
GRANT SELECT, INSERT, UPDATE ON myapp.users TO 'app_user'@'localhost';
GRANT SELECT ON myapp.products TO 'readonly_user'@'localhost';

-- 4. 列权限(影响特定列)
GRANT SELECT (id, name, email) ON myapp.users TO 'limited_user'@'localhost';
GRANT UPDATE (last_login) ON myapp.users TO 'login_updater'@'localhost';

-- 5. 存储过程权限
GRANT EXECUTE ON PROCEDURE myapp.calculate_total TO 'proc_user'@'localhost';
GRANT EXECUTE ON FUNCTION myapp.get_discount TO 'func_user'@'localhost';

2. 权限类型详解

class MySQLPrivileges:
    def __init__(self):
        self.privileges = {
            "数据权限": {
                "SELECT": {
                    "描述": "查询数据",
                    "适用对象": ["表", "列", "视图"],
                    "安全考虑": "最基础的权限,但要注意敏感数据的访问控制"
                },
                "INSERT": {
                    "描述": "插入数据",
                    "适用对象": ["表", "列"],
                    "安全考虑": "可能导致数据污染,需要输入验证"
                },
                "UPDATE": {
                    "描述": "更新数据",
                    "适用对象": ["表", "列"],
                    "安全考虑": "可能导致数据篡改,需要严格控制"
                },
                "DELETE": {
                    "描述": "删除数据",
                    "适用对象": ["表"],
                    "安全考虑": "高风险权限,可能导致数据丢失"
                }
            },
            "结构权限": {
                "CREATE": {
                    "描述": "创建数据库和表",
                    "适用对象": ["数据库", "表"],
                    "安全考虑": "可能消耗系统资源,需要限制"
                },
                "ALTER": {
                    "描述": "修改表结构",
                    "适用对象": ["表"],
                    "安全考虑": "可能影响应用程序,需要变更管理"
                },
                "DROP": {
                    "描述": "删除数据库和表",
                    "适用对象": ["数据库", "表"],
                    "安全考虑": "极高风险权限,可能导致数据丢失"
                },
                "INDEX": {
                    "描述": "创建和删除索引",
                    "适用对象": ["表"],
                    "安全考虑": "可能影响性能,需要DBA审核"
                }
            },
            "管理权限": {
                "GRANT OPTION": {
                    "描述": "授予权限给其他用户",
                    "适用对象": ["所有对象"],
                    "安全考虑": "高风险权限,可能导致权限扩散"
                },
                "SUPER": {
                    "描述": "超级用户权限",
                    "适用对象": ["全局"],
                    "安全考虑": "最高权限,仅限DBA使用"
                },
                "PROCESS": {
                    "描述": "查看所有进程",
                    "适用对象": ["全局"],
                    "安全考虑": "可能泄露敏感信息"
                },
                "RELOAD": {
                    "描述": "重新加载权限表",
                    "适用对象": ["全局"],
                    "安全考虑": "可能影响系统稳定性"
                }
            },
            "特殊权限": {
                "FILE": {
                    "描述": "读写服务器文件",
                    "适用对象": ["全局"],
                    "安全考虑": "极高风险,可能导致系统入侵"
                },
                "LOCK TABLES": {
                    "描述": "锁定表",
                    "适用对象": ["数据库"],
                    "安全考虑": "可能影响系统性能"
                },
                "EXECUTE": {
                    "描述": "执行存储过程和函数",
                    "适用对象": ["存储过程", "函数"],
                    "安全考虑": "需要审核存储过程的安全性"
                }
            }
        }
    
    def show_privilege_guide(self):
        """显示权限指南"""
        print("MySQL权限管理指南")
        print("=" * 60)
        
        for category, privs in self.privileges.items():
            print(f"\n{category}:")
            print("-" * 40)
            
            for priv_name, details in privs.items():
                print(f"\n权限: {priv_name}")
                print(f"描述: {details['描述']}")
                print(f"适用对象: {', '.join(details['适用对象'])}")
                print(f"安全考虑: {details['安全考虑']}")

# 权限指南演示
priv_guide = MySQLPrivileges()
priv_guide.show_privilege_guide()

3. 权限审计与监控

-- 权限审计查询

-- 1. 查看所有用户及其权限
SELECT 
    User, 
    Host, 
    Select_priv, 
    Insert_priv, 
    Update_priv, 
    Delete_priv,
    Create_priv, 
    Drop_priv, 
    Reload_priv, 
    Shutdown_priv,
    Process_priv, 
    File_priv, 
    Grant_priv, 
    References_priv,
    Index_priv, 
    Alter_priv, 
    Show_db_priv, 
    Super_priv,
    Create_tmp_table_priv, 
    Lock_tables_priv, 
    Execute_priv,
    Repl_slave_priv, 
    Repl_client_priv, 
    Create_view_priv,
    Show_view_priv, 
    Create_routine_priv, 
    Alter_routine_priv,
    Create_user_priv, 
    Event_priv, 
    Trigger_priv
FROM mysql.user
ORDER BY User, Host;

-- 2. 查看数据库级权限
SELECT 
    User, 
    Host, 
    Db, 
    Select_priv, 
    Insert_priv, 
    Update_priv, 
    Delete_priv,
    Create_priv, 
    Drop_priv, 
    Grant_priv, 
    References_priv,
    Index_priv, 
    Alter_priv, 
    Create_tmp_table_priv, 
    Lock_tables_priv,
    Create_view_priv, 
    Show_view_priv, 
    Create_routine_priv,
    Alter_routine_priv, 
    Execute_priv, 
    Event_priv, 
    Trigger_priv
FROM mysql.db
ORDER BY User, Host, Db;

-- 3. 查看表级权限
SELECT 
    User, 
    Host, 
    Db, 
    Table_name,
    Table_priv, 
    Column_priv
FROM mysql.tables_priv
ORDER BY User, Host, Db, Table_name;

-- 4. 查看列级权限
SELECT 
    User, 
    Host, 
    Db, 
    Table_name, 
    Column_name,
    Column_priv
FROM mysql.columns_priv
ORDER BY User, Host, Db, Table_name, Column_name;

-- 5. 查看存储过程权限
SELECT 
    User, 
    Host, 
    Db, 
    Routine_name, 
    Routine_type,
    Proc_priv
FROM mysql.procs_priv
ORDER BY User, Host, Db, Routine_name;

-- 6. 查看具有特殊权限的用户
SELECT User, Host, 'SUPER' as Privilege
FROM mysql.user 
WHERE Super_priv = 'Y'
UNION
SELECT User, Host, 'FILE' as Privilege
FROM mysql.user 
WHERE File_priv = 'Y'
UNION
SELECT User, Host, 'PROCESS' as Privilege
FROM mysql.user 
WHERE Process_priv = 'Y'
UNION
SELECT User, Host, 'RELOAD' as Privilege
FROM mysql.user 
WHERE Reload_priv = 'Y'
ORDER BY User, Host, Privilege;

-- 7. 查看具有GRANT权限的用户
SELECT User, Host, 'GLOBAL' as Level
FROM mysql.user 
WHERE Grant_priv = 'Y'
UNION
SELECT User, Host, CONCAT('DATABASE: ', Db) as Level
FROM mysql.db 
WHERE Grant_priv = 'Y'
UNION
SELECT User, Host, CONCAT('TABLE: ', Db, '.', Table_name) as Level
FROM mysql.tables_priv 
WHERE Table_priv LIKE '%Grant%'
ORDER BY User, Host, Level;

4. 权限审计脚本

#!/bin/bash
# MySQL权限审计脚本

echo "=== MySQL权限审计报告 ==="
echo "审计时间: $(date)"
echo

# 配置参数
MYSQL_USER="root"
MYSQL_PASS=""
AUDIT_RESULT="/tmp/mysql_privilege_audit_$(date +%Y%m%d_%H%M%S).log"

# 创建审计结果文件
exec > >(tee -a $AUDIT_RESULT)
exec 2>&1

echo "审计结果: $AUDIT_RESULT"
echo

# 1. 具有SUPER权限的用户
echo "1. 具有SUPER权限的用户:"
mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SELECT User, Host FROM mysql.user WHERE Super_priv = 'Y';" 2>/dev/null
echo

# 2. 具有FILE权限的用户
echo "2. 具有FILE权限的用户:"
mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SELECT User, Host FROM mysql.user WHERE File_priv = 'Y';" 2>/dev/null
echo

# 3. 具有GRANT权限的用户
echo "3. 具有GRANT权限的用户:"
mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SELECT User, Host FROM mysql.user WHERE Grant_priv = 'Y';" 2>/dev/null
echo

# 4. 空密码用户
echo "4. 空密码用户:"
mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SELECT User, Host FROM mysql.user WHERE authentication_string = '';" 2>/dev/null
echo

# 5. 远程root用户
echo "5. 远程root用户:"
mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SELECT User, Host FROM mysql.user WHERE User = 'root' AND Host NOT IN ('localhost', '127.0.0.1', '::1');" 2>/dev/null
echo

# 6. 用户连接统计
echo "6. 当前用户连接统计:"
mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SELECT user, host, COUNT(*) as connections FROM information_schema.processlist GROUP BY user, host ORDER BY connections DESC;" 2>/dev/null
echo

# 7. 权限变更历史(如果启用了审计日志)
echo "7. 最近的权限相关操作:"
if [ -f "/var/log/mysql/audit.log" ]; then
    grep -i "grant\|revoke\|create user\|drop user" /var/log/mysql/audit.log | tail -10
else
    echo "审计日志未启用或文件不存在"
fi

echo
echo "=== 权限审计完成 ==="
echo "详细结果已保存到: $AUDIT_RESULT"

10.3 SSL/TLS加密配置

10.3.1 SSL证书生成与配置

1. 生成SSL证书

#!/bin/bash
# MySQL SSL证书生成脚本

SSL_DIR="/etc/mysql/ssl"
CERT_DAYS=3650

# 创建SSL目录
sudo mkdir -p $SSL_DIR
cd $SSL_DIR

echo "=== 生成MySQL SSL证书 ==="

# 1. 生成CA私钥
echo "1. 生成CA私钥..."
sudo openssl genrsa 2048 > ca-key.pem

# 2. 生成CA证书
echo "2. 生成CA证书..."
sudo openssl req -new -x509 -nodes -days $CERT_DAYS -key ca-key.pem -out ca-cert.pem << EOF
CN
Beijing
Beijing
MySQL CA
MySQL CA
MySQL CA
ca@mysql.local
EOF

# 3. 生成服务器私钥
echo "3. 生成服务器私钥..."
sudo openssl req -newkey rsa:2048 -days $CERT_DAYS -nodes -keyout server-key.pem -out server-req.pem << EOF
CN
Beijing
Beijing
MySQL Server
MySQL Server
MySQL Server
server@mysql.local


EOF

# 4. 生成服务器证书
echo "4. 生成服务器证书..."
sudo openssl rsa -in server-key.pem -out server-key.pem
sudo openssl x509 -req -in server-req.pem -days $CERT_DAYS -CA ca-cert.pem -CAkey ca-key.pem -set_serial 01 -out server-cert.pem

# 5. 生成客户端私钥
echo "5. 生成客户端私钥..."
sudo openssl req -newkey rsa:2048 -days $CERT_DAYS -nodes -keyout client-key.pem -out client-req.pem << EOF
CN
Beijing
Beijing
MySQL Client
MySQL Client
MySQL Client
client@mysql.local


EOF

# 6. 生成客户端证书
echo "6. 生成客户端证书..."
sudo openssl rsa -in client-key.pem -out client-key.pem
sudo openssl x509 -req -in client-req.pem -days $CERT_DAYS -CA ca-cert.pem -CAkey ca-key.pem -set_serial 02 -out client-cert.pem

# 7. 验证证书
echo "7. 验证证书..."
sudo openssl verify -CAfile ca-cert.pem server-cert.pem client-cert.pem

# 8. 设置文件权限
echo "8. 设置文件权限..."
sudo chown mysql:mysql $SSL_DIR/*
sudo chmod 600 $SSL_DIR/*-key.pem
sudo chmod 644 $SSL_DIR/*-cert.pem $SSL_DIR/ca-cert.pem

# 9. 清理临时文件
echo "9. 清理临时文件..."
sudo rm -f $SSL_DIR/*-req.pem

echo "SSL证书生成完成!"
echo "证书位置: $SSL_DIR"
ls -la $SSL_DIR

2. MySQL SSL配置

# MySQL SSL配置 (my.cnf)

[mysqld]
# SSL基础配置
ssl-ca = /etc/mysql/ssl/ca-cert.pem
ssl-cert = /etc/mysql/ssl/server-cert.pem
ssl-key = /etc/mysql/ssl/server-key.pem

# 强制SSL连接(可选)
require_secure_transport = ON

# SSL密码套件配置
ssl-cipher = ECDHE-RSA-AES128-GCM-SHA256:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-RSA-AES128-SHA256:ECDHE-RSA-AES256-SHA384

# TLS版本配置
tls_version = TLSv1.2,TLSv1.3

[mysql]
# 客户端SSL配置
ssl-ca = /etc/mysql/ssl/ca-cert.pem
ssl-cert = /etc/mysql/ssl/client-cert.pem
ssl-key = /etc/mysql/ssl/client-key.pem

[mysqldump]
# mysqldump SSL配置
ssl-ca = /etc/mysql/ssl/ca-cert.pem
ssl-cert = /etc/mysql/ssl/client-cert.pem
ssl-key = /etc/mysql/ssl/client-key.pem

3. SSL连接测试

-- SSL连接状态检查

-- 1. 检查SSL是否启用
SHOW VARIABLES LIKE 'have_ssl';
SHOW VARIABLES LIKE 'ssl_%';

-- 2. 查看当前连接的SSL状态
SHOW STATUS LIKE 'Ssl_%';

-- 3. 查看SSL连接信息
SELECT 
    CONNECTION_ID() as connection_id,
    USER() as current_user,
    @@ssl_cipher as ssl_cipher,
    @@ssl_version as ssl_version;

-- 4. 强制用户使用SSL
ALTER USER 'secure_user'@'%' REQUIRE SSL;
ALTER USER 'cert_user'@'%' REQUIRE X509;
ALTER USER 'specific_cert_user'@'%' REQUIRE ISSUER '/CN=MySQL CA';

-- 5. 查看用户SSL要求
SELECT User, Host, ssl_type, ssl_cipher, x509_issuer, x509_subject
FROM mysql.user
WHERE ssl_type != '';

4. SSL连接脚本

class MySQLSSLConnection:
    def __init__(self, host='localhost', port=3306, user='root', password=''):
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.ssl_config = {
            'ca': '/etc/mysql/ssl/ca-cert.pem',
            'cert': '/etc/mysql/ssl/client-cert.pem',
            'key': '/etc/mysql/ssl/client-key.pem'
        }
    
    def connect_with_ssl(self, verify_cert=True):
        """使用SSL连接MySQL"""
        import mysql.connector
        import ssl
        
        try:
            # SSL配置
            ssl_config = self.ssl_config.copy()
            if not verify_cert:
                ssl_config['verify_cert'] = False
                ssl_config['verify_identity'] = False
            
            # 建立连接
            connection = mysql.connector.connect(
                host=self.host,
                port=self.port,
                user=self.user,
                password=self.password,
                ssl_disabled=False,
                **ssl_config
            )
            
            print("SSL连接建立成功")
            return connection
            
        except Exception as e:
            print(f"SSL连接失败: {e}")
            return None
    
    def check_ssl_status(self, connection):
        """检查SSL连接状态"""
        if not connection:
            return
        
        try:
            cursor = connection.cursor()
            
            # 检查SSL状态
            queries = [
                "SHOW STATUS LIKE 'Ssl_cipher'",
                "SHOW STATUS LIKE 'Ssl_version'",
                "SHOW STATUS LIKE 'Ssl_verify_mode'",
                "SHOW STATUS LIKE 'Ssl_verify_depth'"
            ]
            
            print("SSL连接状态:")
            print("-" * 30)
            
            for query in queries:
                cursor.execute(query)
                result = cursor.fetchone()
                if result:
                    print(f"{result[0]}: {result[1]}")
            
            # 检查连接加密状态
            cursor.execute("SELECT CONNECTION_ID(), @@ssl_cipher, @@ssl_version")
            result = cursor.fetchone()
            if result:
                conn_id, cipher, version = result
                print(f"\n连接ID: {conn_id}")
                print(f"加密套件: {cipher}")
                print(f"TLS版本: {version}")
            
        except Exception as e:
            print(f"检查SSL状态失败: {e}")
        finally:
            cursor.close()
    
    def test_ssl_requirements(self, connection):
        """测试SSL要求"""
        if not connection:
            return
        
        try:
            cursor = connection.cursor()
            
            # 查看用户SSL要求
            cursor.execute("""
                SELECT User, Host, ssl_type, ssl_cipher, x509_issuer, x509_subject
                FROM mysql.user
                WHERE ssl_type != '' OR ssl_cipher != '' OR x509_issuer != '' OR x509_subject != ''
            """)
            
            results = cursor.fetchall()
            
            if results:
                print("\n用户SSL要求:")
                print("-" * 50)
                print(f"{'用户':<15} {'主机':<20} {'SSL类型':<10} {'密码套件':<15}")
                print("-" * 50)
                
                for result in results:
                    user, host, ssl_type, ssl_cipher, issuer, subject = result
                    print(f"{user:<15} {host:<20} {ssl_type:<10} {ssl_cipher:<15}")
            else:
                print("\n没有用户设置SSL要求")
            
        except Exception as e:
            print(f"检查SSL要求失败: {e}")
        finally:
            cursor.close()

# SSL连接测试
ssl_conn = MySQLSSLConnection()
connection = ssl_conn.connect_with_ssl()

if connection:
    ssl_conn.check_ssl_status(connection)
    ssl_conn.test_ssl_requirements(connection)
    connection.close()

10.6 防火墙与网络安全

10.6.1 MySQL防火墙配置

1. 企业版防火墙插件

-- MySQL企业版防火墙配置

-- 1. 安装防火墙插件
INSTALL PLUGIN mysql_firewall SONAME 'firewall.so';
INSTALL PLUGIN mysql_firewall_users SONAME 'firewall.so';
INSTALL PLUGIN mysql_firewall_whitelist SONAME 'firewall.so';

-- 2. 查看防火墙状态
SHOW PLUGINS;
SELECT PLUGIN_NAME, PLUGIN_STATUS FROM INFORMATION_SCHEMA.PLUGINS 
WHERE PLUGIN_NAME LIKE '%firewall%';

-- 3. 启用防火墙
SET GLOBAL mysql_firewall_mode = ON;
SET GLOBAL mysql_firewall_trace = ON;

-- 4. 为用户启用防火墙
CALL mysql.sp_set_firewall_mode('app_user@localhost', 'RECORDING');

-- 5. 查看防火墙模式
SELECT * FROM mysql.firewall_users;

-- 6. 切换到保护模式
CALL mysql.sp_set_firewall_mode('app_user@localhost', 'PROTECTING');

-- 7. 查看防火墙规则
SELECT * FROM mysql.firewall_whitelist WHERE USERHOST = 'app_user@localhost';

-- 8. 手动添加防火墙规则
CALL mysql.sp_firewall_group_enlist('app_group', 'SELECT * FROM users WHERE id = ?');
CALL mysql.sp_firewall_group_enlist('app_group', 'UPDATE users SET last_login = NOW() WHERE id = ?');

-- 9. 查看被阻止的查询
SELECT * FROM performance_schema.events_statements_history 
WHERE MESSAGE_TEXT LIKE '%Firewall%';

2. 自定义SQL防火墙

import re
import logging
from datetime import datetime, timedelta
from collections import defaultdict, Counter

class MySQLCustomFirewall:
    def __init__(self):
        self.blocked_patterns = [
            # SQL注入模式
            r"(?i)union\s+select",
            r"(?i)or\s+1\s*=\s*1",
            r"(?i)and\s+1\s*=\s*1",
            r"(?i)drop\s+table",
            r"(?i)delete\s+from\s+\w+\s+where\s+1\s*=\s*1",
            r"(?i)update\s+\w+\s+set\s+.*\s+where\s+1\s*=\s*1",
            
            # 危险函数
            r"(?i)load_file\s*\(",
            r"(?i)into\s+outfile",
            r"(?i)into\s+dumpfile",
            r"(?i)system\s*\(",
            r"(?i)exec\s*\(",
            
            # 信息泄露
            r"(?i)information_schema\.(tables|columns|schemata)",
            r"(?i)show\s+(tables|databases|columns)",
            r"(?i)describe\s+\w+",
            
            # 权限提升
            r"(?i)grant\s+all",
            r"(?i)grant\s+.*\s+with\s+grant\s+option",
            r"(?i)create\s+user.*identified\s+by",
        ]
        
        self.rate_limits = {
            'queries_per_minute': 100,
            'connections_per_minute': 10,
            'failed_logins_per_minute': 5
        }
        
        self.user_activity = defaultdict(list)
        self.blocked_queries = []
        
        # 配置日志
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('/var/log/mysql/firewall.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def is_sql_malicious(self, sql_query):
        """检查SQL查询是否恶意"""
        sql_query = sql_query.strip()
        
        # 检查恶意模式
        for pattern in self.blocked_patterns:
            if re.search(pattern, sql_query, re.IGNORECASE):
                return True, f"匹配恶意模式: {pattern}"
        
        # 检查查询长度(可能的缓冲区溢出)
        if len(sql_query) > 10000:
            return True, "查询长度过长"
        
        # 检查嵌套查询深度
        nested_count = sql_query.upper().count('SELECT')
        if nested_count > 5:
            return True, "嵌套查询过深"
        
        # 检查注释(可能隐藏恶意代码)
        comment_patterns = [r'/\*.*?\*/', r'--.*$', r'#.*$']
        for pattern in comment_patterns:
            if re.search(pattern, sql_query, re.MULTILINE):
                # 允许正常注释,但记录可疑注释
                if re.search(r'(?i)(union|select|drop|delete|update).*?(/\*|--|#)', sql_query):
                    return True, "可疑注释模式"
        
        return False, ""
    
    def check_rate_limit(self, user, host, action_type):
        """检查速率限制"""
        user_key = f"{user}@{host}"
        current_time = datetime.now()
        cutoff_time = current_time - timedelta(minutes=1)
        
        # 清理过期记录
        self.user_activity[user_key] = [
            (timestamp, action) for timestamp, action in self.user_activity[user_key]
            if timestamp > cutoff_time
        ]
        
        # 统计当前分钟内的活动
        recent_actions = [action for timestamp, action in self.user_activity[user_key]]
        action_count = recent_actions.count(action_type)
        
        # 检查限制
        limit_key = f"{action_type}_per_minute"
        if limit_key in self.rate_limits:
            if action_count >= self.rate_limits[limit_key]:
                return False, f"超过速率限制: {action_count}/{self.rate_limits[limit_key]} {action_type}/分钟"
        
        # 记录活动
        self.user_activity[user_key].append((current_time, action_type))
        return True, ""
    
    def validate_query(self, user, host, sql_query, connection_id=None):
        """验证查询"""
        validation_result = {
            'allowed': True,
            'reason': '',
            'risk_level': 'low',
            'timestamp': datetime.now()
        }
        
        # 检查SQL恶意模式
        is_malicious, malicious_reason = self.is_sql_malicious(sql_query)
        if is_malicious:
            validation_result.update({
                'allowed': False,
                'reason': f"恶意SQL检测: {malicious_reason}",
                'risk_level': 'high'
            })
            
            # 记录被阻止的查询
            self.blocked_queries.append({
                'user': user,
                'host': host,
                'sql': sql_query,
                'reason': malicious_reason,
                'timestamp': datetime.now(),
                'connection_id': connection_id
            })
            
            self.logger.warning(
                f"阻止恶意查询 - 用户: {user}@{host}, 原因: {malicious_reason}, SQL: {sql_query[:100]}..."
            )
            
            return validation_result
        
        # 检查速率限制
        rate_allowed, rate_reason = self.check_rate_limit(user, host, 'queries')
        if not rate_allowed:
            validation_result.update({
                'allowed': False,
                'reason': rate_reason,
                'risk_level': 'medium'
            })
            
            self.logger.warning(
                f"阻止查询(速率限制) - 用户: {user}@{host}, 原因: {rate_reason}"
            )
            
            return validation_result
        
        # 查询通过验证
        self.logger.info(f"允许查询 - 用户: {user}@{host}, SQL: {sql_query[:50]}...")
        return validation_result
    
    def validate_connection(self, user, host, password_hash=None):
        """验证连接"""
        validation_result = {
            'allowed': True,
            'reason': '',
            'risk_level': 'low',
            'timestamp': datetime.now()
        }
        
        # 检查连接速率限制
        rate_allowed, rate_reason = self.check_rate_limit(user, host, 'connections')
        if not rate_allowed:
            validation_result.update({
                'allowed': False,
                'reason': rate_reason,
                'risk_level': 'medium'
            })
            
            self.logger.warning(
                f"阻止连接(速率限制) - 用户: {user}@{host}, 原因: {rate_reason}"
            )
            
            return validation_result
        
        # 检查可疑主机
        suspicious_hosts = ['192.168.1.100', '10.0.0.50']  # 示例黑名单
        if host in suspicious_hosts:
            validation_result.update({
                'allowed': False,
                'reason': f"主机在黑名单中: {host}",
                'risk_level': 'high'
            })
            
            self.logger.error(f"阻止连接(黑名单主机) - 用户: {user}@{host}")
            return validation_result
        
        self.logger.info(f"允许连接 - 用户: {user}@{host}")
        return validation_result
    
    def get_firewall_statistics(self, hours=24):
        """获取防火墙统计信息"""
        cutoff_time = datetime.now() - timedelta(hours=hours)
        
        # 统计被阻止的查询
        recent_blocked = [
            query for query in self.blocked_queries
            if query['timestamp'] > cutoff_time
        ]
        
        # 按原因分组
        block_reasons = Counter(query['reason'] for query in recent_blocked)
        
        # 按用户分组
        blocked_users = Counter(f"{query['user']}@{query['host']}" for query in recent_blocked)
        
        # 统计用户活动
        total_queries = 0
        active_users = set()
        
        for user_key, activities in self.user_activity.items():
            recent_activities = [
                activity for timestamp, activity in activities
                if timestamp > cutoff_time
            ]
            if recent_activities:
                active_users.add(user_key)
                total_queries += len([a for a in recent_activities if a == 'queries'])
        
        return {
            'total_blocked_queries': len(recent_blocked),
            'total_queries': total_queries,
            'active_users': len(active_users),
            'block_reasons': dict(block_reasons),
            'blocked_users': dict(blocked_users),
            'block_rate': len(recent_blocked) / max(total_queries, 1) * 100
        }
    
    def generate_firewall_report(self, hours=24):
        """生成防火墙报告"""
        stats = self.get_firewall_statistics(hours)
        
        print(f"\n=== MySQL防火墙报告 ===")
        print(f"报告时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"统计时间范围: 最近{hours}小时")
        print("=" * 40)
        
        print(f"\n📊 总体统计:")
        print(f"  总查询数: {stats['total_queries']}")
        print(f"  被阻止查询: {stats['total_blocked_queries']}")
        print(f"  阻止率: {stats['block_rate']:.2f}%")
        print(f"  活跃用户: {stats['active_users']}")
        
        if stats['block_reasons']:
            print(f"\n🚫 阻止原因统计:")
            for reason, count in stats['block_reasons'].items():
                print(f"  {reason}: {count} 次")
        
        if stats['blocked_users']:
            print(f"\n👤 被阻止用户统计:")
            for user, count in stats['blocked_users'].items():
                print(f"  {user}: {count} 次")
        
        # 安全建议
        print(f"\n💡 安全建议:")
        if stats['block_rate'] > 10:
            print("  • 阻止率较高,可能存在攻击行为")
        if stats['total_blocked_queries'] > 100:
            print("  • 大量查询被阻止,建议检查攻击源")
        if len(stats['blocked_users']) > 5:
            print("  • 多个用户被阻止,可能需要调整防火墙规则")
        
        return stats

# 防火墙使用示例
firewall = MySQLCustomFirewall()

# 测试恶意查询
malicious_queries = [
    "SELECT * FROM users UNION SELECT * FROM admin_users",
    "SELECT * FROM products WHERE id = 1 OR 1=1",
    "DROP TABLE users; --",
    "SELECT LOAD_FILE('/etc/passwd')"
]

for query in malicious_queries:
    result = firewall.validate_query('test_user', '192.168.1.10', query)
    print(f"查询: {query[:50]}...")
    print(f"结果: {'允许' if result['allowed'] else '阻止'} - {result['reason']}")
    print()

# 生成防火墙报告
report = firewall.generate_firewall_report()

10.6.2 网络层安全配置

1. iptables防火墙规则

#!/bin/bash
# MySQL网络安全配置脚本

echo "=== 配置MySQL网络安全 ==="

# MySQL端口
MYSQL_PORT=3306

# 允许的IP地址段
ALLOWED_NETWORKS=(
    "192.168.1.0/24"    # 内网
    "10.0.0.0/8"        # VPN网络
    "172.16.0.0/12"     # 私有网络
)

# 管理员IP
ADMIN_IPS=(
    "192.168.1.100"
    "10.0.0.50"
)

# 1. 清除现有规则
echo "清除现有iptables规则..."
iptables -F
iptables -X
iptables -t nat -F
iptables -t nat -X
iptables -t mangle -F
iptables -t mangle -X

# 2. 设置默认策略
echo "设置默认策略..."
iptables -P INPUT DROP
iptables -P FORWARD DROP
iptables -P OUTPUT ACCEPT

# 3. 允许本地回环
iptables -A INPUT -i lo -j ACCEPT
iptables -A OUTPUT -o lo -j ACCEPT

# 4. 允许已建立的连接
iptables -A INPUT -m state --state ESTABLISHED,RELATED -j ACCEPT

# 5. 允许SSH(管理员IP)
echo "配置SSH访问规则..."
for admin_ip in "${ADMIN_IPS[@]}"; do
    iptables -A INPUT -p tcp -s $admin_ip --dport 22 -j ACCEPT
    echo "  允许SSH访问: $admin_ip"
done

# 6. 配置MySQL访问规则
echo "配置MySQL访问规则..."

# 允许指定网络访问MySQL
for network in "${ALLOWED_NETWORKS[@]}"; do
    iptables -A INPUT -p tcp -s $network --dport $MYSQL_PORT -j ACCEPT
    echo "  允许MySQL访问: $network"
done

# 7. 防止DDoS攻击
echo "配置DDoS防护..."

# 限制新连接速率
iptables -A INPUT -p tcp --dport $MYSQL_PORT -m state --state NEW -m limit --limit 10/minute --limit-burst 5 -j ACCEPT
iptables -A INPUT -p tcp --dport $MYSQL_PORT -m state --state NEW -j DROP

# 防止SYN flood攻击
iptables -A INPUT -p tcp --syn -m limit --limit 1/s --limit-burst 3 -j ACCEPT
iptables -A INPUT -p tcp --syn -j DROP

# 8. 记录被拒绝的连接
echo "配置日志记录..."
iptables -A INPUT -p tcp --dport $MYSQL_PORT -j LOG --log-prefix "MySQL-REJECT: " --log-level 4
iptables -A INPUT -p tcp --dport $MYSQL_PORT -j REJECT

# 9. 保存规则
echo "保存iptables规则..."
if command -v iptables-save >/dev/null 2>&1; then
    iptables-save > /etc/iptables/rules.v4
    echo "规则已保存到 /etc/iptables/rules.v4"
fi

# 10. 显示当前规则
echo "\n当前iptables规则:"
iptables -L -n -v

echo "\n=== MySQL网络安全配置完成 ==="

2. 网络监控脚本

#!/bin/bash
# MySQL网络连接监控脚本

MYSQL_PORT=3306
LOG_FILE="/var/log/mysql/network_monitor.log"
ALERT_THRESHOLD=50

echo "=== MySQL网络监控启动 ==="
echo "监控端口: $MYSQL_PORT"
echo "日志文件: $LOG_FILE"
echo "告警阈值: $ALERT_THRESHOLD 连接/分钟"
echo

# 创建日志目录
mkdir -p $(dirname $LOG_FILE)

# 监控函数
monitor_connections() {
    local timestamp=$(date '+%Y-%m-%d %H:%M:%S')
    
    # 获取当前MySQL连接
    local mysql_connections=$(netstat -an | grep ":$MYSQL_PORT" | grep ESTABLISHED | wc -l)
    
    # 获取连接来源IP统计
    local connection_ips=$(netstat -an | grep ":$MYSQL_PORT" | grep ESTABLISHED | 
                          awk '{print $5}' | cut -d: -f1 | sort | uniq -c | sort -nr)
    
    # 记录到日志
    echo "[$timestamp] MySQL连接数: $mysql_connections" >> $LOG_FILE
    
    # 检查是否超过阈值
    if [ $mysql_connections -gt $ALERT_THRESHOLD ]; then
        echo "[$timestamp] 警告: MySQL连接数过高 ($mysql_connections > $ALERT_THRESHOLD)" >> $LOG_FILE
        
        # 发送告警
        send_alert "MySQL连接数过高" "当前连接数: $mysql_connections"
    fi
    
    # 检查可疑连接
    echo "$connection_ips" | while read count ip; do
        if [ "$count" -gt 10 ]; then
            echo "[$timestamp] 可疑IP: $ip 有 $count 个连接" >> $LOG_FILE
            
            # 检查是否为已知恶意IP
            check_malicious_ip "$ip" "$count"
        fi
    done
    
    # 显示实时状态
    echo "[$timestamp] 连接监控:"
    echo "  总连接数: $mysql_connections"
    echo "  连接来源统计:"
    echo "$connection_ips" | head -5 | while read count ip; do
        echo "    $ip: $count 个连接"
    done
    echo
}

# 检查恶意IP
check_malicious_ip() {
    local ip=$1
    local count=$2
    
    # 恶意IP黑名单(示例)
    local blacklist=("192.168.1.100" "10.0.0.50")
    
    for bad_ip in "${blacklist[@]}"; do
        if [ "$ip" = "$bad_ip" ]; then
            echo "[$(date '+%Y-%m-%d %H:%M:%S')] 检测到黑名单IP: $ip" >> $LOG_FILE
            
            # 自动阻止IP
            block_ip "$ip"
            return
        fi
    done
    
    # 检查连接数是否异常
    if [ "$count" -gt 20 ]; then
        echo "[$(date '+%Y-%m-%d %H:%M:%S')] 异常连接数IP: $ip ($count 连接)" >> $LOG_FILE
        send_alert "异常连接数" "IP $ip 有 $count 个连接"
    fi
}

# 阻止IP
block_ip() {
    local ip=$1
    
    echo "阻止恶意IP: $ip"
    iptables -A INPUT -s $ip -j DROP
    
    # 记录阻止操作
    echo "[$(date '+%Y-%m-%d %H:%M:%S')] 已阻止IP: $ip" >> $LOG_FILE
    
    # 发送告警
    send_alert "IP已被阻止" "恶意IP $ip 已被自动阻止"
}

# 发送告警
send_alert() {
    local alert_type="$1"
    local alert_message="$2"
    local timestamp=$(date '+%Y-%m-%d %H:%M:%S')
    
    echo "[$timestamp] 告警: $alert_type - $alert_message"
    
    # 发送邮件(需要配置邮件服务)
    if command -v mail >/dev/null 2>&1; then
        echo "时间: $timestamp\n类型: $alert_type\n详情: $alert_message" | \
        mail -s "MySQL网络安全告警: $alert_type" "admin@example.com"
    fi
    
    # 记录到系统日志
    logger "MySQL Network Alert: $alert_type - $alert_message"
}

# 清理旧日志
cleanup_logs() {
    find $(dirname $LOG_FILE) -name "*.log" -mtime +7 -delete
}

# 主监控循环
while true; do
    monitor_connections
    
    # 每小时清理一次日志
    if [ $(($(date +%M))) -eq 0 ]; then
        cleanup_logs
    fi
    
    sleep 60
done

10.7 安全最佳实践

10.7.1 安全配置检查清单

class MySQLSecurityAudit:
    def __init__(self, connection):
        self.connection = connection
        self.security_issues = []
        self.recommendations = []
    
    def check_user_security(self):
        """检查用户安全配置"""
        try:
            cursor = self.connection.cursor(dictionary=True)
            
            print("\n🔐 用户安全检查:")
            print("-" * 30)
            
            # 1. 检查空密码用户
            cursor.execute("""
                SELECT User, Host FROM mysql.user 
                WHERE authentication_string = '' OR Password = ''
            """)
            
            empty_password_users = cursor.fetchall()
            if empty_password_users:
                issue = f"发现 {len(empty_password_users)} 个空密码用户"
                self.security_issues.append(issue)
                print(f"  ❌ {issue}")
                for user in empty_password_users:
                    print(f"     - {user['User']}@{user['Host']}")
                self.recommendations.append("为所有用户设置强密码")
            else:
                print("  ✅ 未发现空密码用户")
            
            # 2. 检查默认用户
            cursor.execute("""
                SELECT User, Host FROM mysql.user 
                WHERE User IN ('', 'root', 'test', 'guest', 'admin')
            """)
            
            default_users = cursor.fetchall()
            risky_users = [u for u in default_users if u['User'] in ['', 'test', 'guest']]
            
            if risky_users:
                issue = f"发现 {len(risky_users)} 个危险默认用户"
                self.security_issues.append(issue)
                print(f"  ❌ {issue}")
                for user in risky_users:
                    print(f"     - {user['User']}@{user['Host']}")
                self.recommendations.append("删除不必要的默认用户")
            else:
                print("  ✅ 未发现危险默认用户")
            
            # 3. 检查远程root用户
            cursor.execute("""
                SELECT User, Host FROM mysql.user 
                WHERE User = 'root' AND Host != 'localhost' AND Host != '127.0.0.1'
            """)
            
            remote_root_users = cursor.fetchall()
            if remote_root_users:
                issue = f"发现 {len(remote_root_users)} 个远程root用户"
                self.security_issues.append(issue)
                print(f"  ❌ {issue}")
                for user in remote_root_users:
                    print(f"     - {user['User']}@{user['Host']}")
                self.recommendations.append("禁用远程root登录")
            else:
                print("  ✅ 未发现远程root用户")
            
            # 4. 检查过度权限用户
            cursor.execute("""
                SELECT User, Host FROM mysql.user 
                WHERE Super_priv = 'Y' OR File_priv = 'Y' OR Process_priv = 'Y'
            """)
            
            privileged_users = cursor.fetchall()
            if len(privileged_users) > 2:  # 通常只有root和备份用户需要超级权限
                issue = f"发现 {len(privileged_users)} 个高权限用户(可能过多)"
                self.security_issues.append(issue)
                print(f"  ⚠️  {issue}")
                self.recommendations.append("审查用户权限,遵循最小权限原则")
            else:
                print(f"  ✅ 高权限用户数量合理 ({len(privileged_users)} 个)")
            
        except Exception as e:
            print(f"用户安全检查失败: {e}")
        finally:
            cursor.close()
    
    def check_configuration_security(self):
        """检查配置安全"""
        try:
            cursor = self.connection.cursor(dictionary=True)
            
            print("\n⚙️  配置安全检查:")
            print("-" * 30)
            
            # 1. 检查SSL配置
            cursor.execute("SHOW VARIABLES LIKE 'have_ssl'")
            ssl_status = cursor.fetchone()
            
            if ssl_status and ssl_status['Value'] == 'YES':
                print("  ✅ SSL已启用")
            else:
                issue = "SSL未启用"
                self.security_issues.append(issue)
                print(f"  ❌ {issue}")
                self.recommendations.append("启用SSL加密连接")
            
            # 2. 检查日志配置
            cursor.execute("SHOW VARIABLES LIKE 'general_log'")
            general_log = cursor.fetchone()
            
            cursor.execute("SHOW VARIABLES LIKE 'log_bin'")
            binary_log = cursor.fetchone()
            
            if general_log and general_log['Value'] == 'ON':
                print("  ✅ 通用日志已启用")
            else:
                print("  ⚠️  通用日志未启用")
                self.recommendations.append("考虑启用通用日志以便审计")
            
            if binary_log and binary_log['Value'] == 'ON':
                print("  ✅ 二进制日志已启用")
            else:
                print("  ⚠️  二进制日志未启用")
                self.recommendations.append("启用二进制日志以便数据恢复")
            
            # 3. 检查网络配置
            cursor.execute("SHOW VARIABLES LIKE 'bind_address'")
            bind_address = cursor.fetchone()
            
            if bind_address and bind_address['Value'] not in ['0.0.0.0', '*']:
                print(f"  ✅ 绑定地址配置安全: {bind_address['Value']}")
            else:
                issue = "MySQL绑定到所有网络接口"
                self.security_issues.append(issue)
                print(f"  ❌ {issue}")
                self.recommendations.append("配置bind_address限制网络访问")
            
            # 4. 检查密码验证插件
            cursor.execute("SHOW VARIABLES LIKE 'validate_password%'")
            password_validation = cursor.fetchall()
            
            if password_validation:
                print("  ✅ 密码验证插件已配置")
                for var in password_validation:
                    if var['Variable_name'] == 'validate_password.policy':
                        print(f"     策略: {var['Value']}")
            else:
                issue = "密码验证插件未启用"
                self.security_issues.append(issue)
                print(f"  ❌ {issue}")
                self.recommendations.append("启用密码验证插件强化密码策略")
            
        except Exception as e:
            print(f"配置安全检查失败: {e}")
        finally:
            cursor.close()
    
    def check_database_security(self):
        """检查数据库安全"""
        try:
            cursor = self.connection.cursor(dictionary=True)
            
            print("\n🗄️  数据库安全检查:")
            print("-" * 30)
            
            # 1. 检查test数据库
            cursor.execute("SHOW DATABASES LIKE 'test'")
            test_db = cursor.fetchone()
            
            if test_db:
                issue = "test数据库仍然存在"
                self.security_issues.append(issue)
                print(f"  ❌ {issue}")
                self.recommendations.append("删除test数据库")
            else:
                print("  ✅ test数据库已删除")
            
            # 2. 检查数据库权限
            cursor.execute("""
                SELECT DISTINCT User, Host, Db FROM mysql.db 
                WHERE Db = 'mysql' AND User != 'root'
            """)
            
            mysql_db_access = cursor.fetchall()
            if mysql_db_access:
                issue = f"发现 {len(mysql_db_access)} 个非root用户可访问mysql数据库"
                self.security_issues.append(issue)
                print(f"  ❌ {issue}")
                for access in mysql_db_access:
                    print(f"     - {access['User']}@{access['Host']}")
                self.recommendations.append("限制对mysql系统数据库的访问")
            else:
                print("  ✅ mysql数据库访问权限配置安全")
            
            # 3. 检查表加密状态
            cursor.execute("""
                SELECT TABLE_SCHEMA, TABLE_NAME, CREATE_OPTIONS 
                FROM information_schema.TABLES 
                WHERE TABLE_TYPE = 'BASE TABLE' 
                AND TABLE_SCHEMA NOT IN ('information_schema', 'performance_schema', 'mysql', 'sys')
                LIMIT 10
            """)
            
            tables = cursor.fetchall()
            encrypted_tables = [t for t in tables if 'ENCRYPTION' in str(t.get('CREATE_OPTIONS', ''))]
            
            if encrypted_tables:
                print(f"  ✅ 发现 {len(encrypted_tables)} 个加密表")
            else:
                print("  ⚠️  未发现加密表")
                self.recommendations.append("考虑对敏感数据表启用加密")
            
        except Exception as e:
            print(f"数据库安全检查失败: {e}")
        finally:
            cursor.close()
    
    def generate_security_report(self):
        """生成安全报告"""
        print("\n=== MySQL安全审计报告 ===")
        print(f"审计时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print("=" * 50)
        
        # 执行所有检查
        self.check_user_security()
        self.check_configuration_security()
        self.check_database_security()
        
        # 汇总结果
        print("\n📋 安全问题汇总:")
        print("-" * 30)
        
        if self.security_issues:
            print(f"发现 {len(self.security_issues)} 个安全问题:")
            for i, issue in enumerate(self.security_issues, 1):
                print(f"  {i}. {issue}")
        else:
            print("✅ 未发现严重安全问题")
        
        print("\n💡 安全建议:")
        print("-" * 30)
        
        if self.recommendations:
            for i, rec in enumerate(self.recommendations, 1):
                print(f"  {i}. {rec}")
        else:
            print("✅ 当前配置符合安全最佳实践")
        
        # 安全评分
        total_checks = 10  # 总检查项数
        security_score = max(0, (total_checks - len(self.security_issues)) / total_checks * 100)
        
        print(f"\n🏆 安全评分: {security_score:.1f}/100")
        
        if security_score >= 90:
            print("安全等级: 优秀 ✅")
        elif security_score >= 70:
            print("安全等级: 良好 ⚠️")
        elif security_score >= 50:
            print("安全等级: 一般 ⚠️")
        else:
            print("安全等级: 危险 ❌")
        
        return {
            'security_score': security_score,
            'issues_count': len(self.security_issues),
            'recommendations_count': len(self.recommendations),
            'issues': self.security_issues,
            'recommendations': self.recommendations
        }

# 安全审计使用示例
import mysql.connector

connection = mysql.connector.connect(
    host='localhost',
    user='root',
    password='password'
)

audit = MySQLSecurityAudit(connection)
report = audit.generate_security_report()

connection.close()

10.7.2 安全维护计划

class MySQLSecurityMaintenance:
    def __init__(self):
        self.maintenance_tasks = {
            'daily': [
                '检查审计日志异常',
                '监控失败登录尝试',
                '检查系统资源使用',
                '验证备份完整性'
            ],
            'weekly': [
                '更新密码策略',
                '审查用户权限',
                '检查SSL证书有效期',
                '分析安全日志趋势',
                '测试灾难恢复流程'
            ],
            'monthly': [
                '全面安全审计',
                '更新安全补丁',
                '权限清理和优化',
                '安全配置基线检查',
                '渗透测试评估'
            ],
            'quarterly': [
                '安全策略评估',
                '员工安全培训',
                '第三方安全评估',
                '灾难恢复演练',
                '合规性检查'
            ]
        }
    
    def generate_maintenance_schedule(self):
        """生成维护计划"""
        print("\n=== MySQL安全维护计划 ===")
        print(f"制定时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print("=" * 50)
        
        for frequency, tasks in self.maintenance_tasks.items():
            print(f"\n📅 {frequency.upper()} 维护任务:")
            print("-" * 30)
            
            for i, task in enumerate(tasks, 1):
                print(f"  {i}. {task}")
        
        print("\n💡 维护建议:")
        print("-" * 30)
        print("  • 建立自动化监控和告警机制")
        print("  • 定期备份和测试恢复流程")
        print("  • 保持MySQL版本更新")
        print("  • 建立安全事件响应流程")
        print("  • 定期进行安全培训")
        
        return self.maintenance_tasks

# 维护计划示例
maintenance = MySQLSecurityMaintenance()
schedule = maintenance.generate_maintenance_schedule()

10.8 总结

本章详细介绍了MySQL的安全与权限管理,涵盖了以下核心内容:

10.8.1 核心要点回顾

  1. 安全基础

    • 安全威胁识别与风险评估
    • 安全配置基线建立
    • 系统安全加固措施
  2. 用户管理与权限控制

    • 用户账户生命周期管理
    • 细粒度权限分配
    • 权限审计与监控
  3. SSL/TLS加密

    • 传输层安全配置
    • 证书管理与轮换
    • 强制加密连接
  4. 数据加密

    • 透明数据加密(TDE)
    • 应用层字段加密
    • 密钥管理策略
  5. 审计日志与监控

    • 审计日志配置与分析
    • 实时安全监控
    • 异常行为检测
  6. 防火墙与网络安全

    • MySQL防火墙配置
    • 网络层访问控制
    • DDoS防护措施
  7. 安全最佳实践

    • 安全配置检查清单
    • 定期安全审计
    • 维护计划制定

10.8.2 实施建议

  1. 分阶段实施

    • 第一阶段:基础安全配置
    • 第二阶段:权限管理优化
    • 第三阶段:加密和审计
    • 第四阶段:高级防护措施
  2. 持续改进

    • 定期安全评估
    • 及时更新安全策略
    • 跟踪安全威胁趋势
    • 优化安全配置
  3. 团队协作

    • 建立安全责任制
    • 定期安全培训
    • 制定应急响应流程
    • 加强安全意识

10.8.3 下一步学习方向

掌握了MySQL安全与权限管理后,建议继续学习:

  • 第11章:MySQL备份与恢复 - 学习数据保护和灾难恢复
  • 第12章:MySQL高可用架构 - 了解集群和复制技术
  • 第13章:MySQL运维自动化 - 掌握自动化运维工具
  • 第14章:MySQL故障排除 - 学习问题诊断和解决

通过本章的学习,您应该能够: - 建立完整的MySQL安全体系 - 实施有效的权限管理策略 - 配置数据加密和传输安全 - 建立安全监控和审计机制 - 制定安全维护和应急响应计划

安全是一个持续的过程,需要不断学习新的威胁和防护技术,保持对安全最佳实践的关注和应用。

10.3.2 传输加密最佳实践

1. 加密配置优化

-- 传输加密配置优化

-- 1. 设置强制SSL连接
SET GLOBAL require_secure_transport = ON;

-- 2. 配置TLS版本
SET GLOBAL tls_version = 'TLSv1.2,TLSv1.3';

-- 3. 查看支持的密码套件
SHOW STATUS LIKE 'Ssl_cipher_list';

-- 4. 为不同用户设置不同的SSL要求

-- 普通SSL连接
CREATE USER 'ssl_user'@'%' IDENTIFIED BY 'password' REQUIRE SSL;

-- 需要客户端证书
CREATE USER 'cert_user'@'%' IDENTIFIED BY 'password' REQUIRE X509;

-- 指定证书颁发者
CREATE USER 'issuer_user'@'%' IDENTIFIED BY 'password' 
REQUIRE ISSUER '/C=CN/ST=Beijing/L=Beijing/O=MySQL CA/CN=MySQL CA';

-- 指定证书主题
CREATE USER 'subject_user'@'%' IDENTIFIED BY 'password' 
REQUIRE SUBJECT '/C=CN/ST=Beijing/L=Beijing/O=MySQL Client/CN=MySQL Client';

-- 指定密码套件
CREATE USER 'cipher_user'@'%' IDENTIFIED BY 'password' 
REQUIRE CIPHER 'ECDHE-RSA-AES256-GCM-SHA384';

-- 组合要求
CREATE USER 'secure_user'@'%' IDENTIFIED BY 'password' 
REQUIRE SSL AND X509 AND CIPHER 'ECDHE-RSA-AES256-GCM-SHA384';

2. SSL监控脚本

#!/bin/bash
# MySQL SSL连接监控脚本

echo "=== MySQL SSL连接监控 ==="
echo "监控时间: $(date)"
echo

# 配置参数
MYSQL_USER="root"
MYSQL_PASS=""
MONITOR_RESULT="/tmp/mysql_ssl_monitor_$(date +%Y%m%d_%H%M%S).log"

# 创建监控结果文件
exec > >(tee -a $MONITOR_RESULT)
exec 2>&1

echo "监控结果: $MONITOR_RESULT"
echo

# 1. SSL状态检查
echo "1. SSL状态检查:"
mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SHOW VARIABLES LIKE 'have_ssl';" 2>/dev/null
mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SHOW VARIABLES LIKE 'ssl_%';" 2>/dev/null
echo

# 2. 当前SSL连接统计
echo "2. 当前SSL连接统计:"
mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SHOW STATUS LIKE 'Ssl_%';" 2>/dev/null
echo

# 3. 活跃连接的SSL状态
echo "3. 活跃连接的SSL状态:"
mysql -u$MYSQL_USER -p$MYSQL_PASS -e "
SELECT 
    ID,
    USER,
    HOST,
    DB,
    COMMAND,
    TIME,
    STATE,
    CASE 
        WHEN CONNECTION_TYPE = 'SSL/TLS' THEN 'SSL'
        ELSE 'Non-SSL'
    END AS CONNECTION_TYPE
FROM information_schema.PROCESSLIST
WHERE USER != 'system user'
ORDER BY CONNECTION_TYPE, TIME DESC;
" 2>/dev/null
echo

# 4. SSL证书信息
echo "4. SSL证书信息:"
if [ -f "/etc/mysql/ssl/server-cert.pem" ]; then
    echo "服务器证书信息:"
    openssl x509 -in /etc/mysql/ssl/server-cert.pem -text -noout | grep -E "Subject:|Issuer:|Not Before:|Not After:"
    echo
    
    # 检查证书过期时间
    CERT_EXPIRY=$(openssl x509 -in /etc/mysql/ssl/server-cert.pem -enddate -noout | cut -d= -f2)
    EXPIRY_TIMESTAMP=$(date -d "$CERT_EXPIRY" +%s)
    CURRENT_TIMESTAMP=$(date +%s)
    DAYS_TO_EXPIRY=$(( (EXPIRY_TIMESTAMP - CURRENT_TIMESTAMP) / 86400 ))
    
    echo "证书过期检查:"
    echo "过期时间: $CERT_EXPIRY"
    echo "剩余天数: $DAYS_TO_EXPIRY 天"
    
    if [ $DAYS_TO_EXPIRY -lt 30 ]; then
        echo "⚠️  证书将在30天内过期,请及时更新!"
    elif [ $DAYS_TO_EXPIRY -lt 7 ]; then
        echo "❌ 证书将在7天内过期,请立即更新!"
    else
        echo "✅ 证书有效期充足"
    fi
else
    echo "❌ 服务器证书文件不存在"
fi
echo

# 5. 非SSL连接警告
echo "5. 非SSL连接检查:"
NON_SSL_CONNECTIONS=$(mysql -u$MYSQL_USER -p$MYSQL_PASS -e "
SELECT COUNT(*) as non_ssl_count
FROM information_schema.PROCESSLIST
WHERE USER != 'system user' AND CONNECTION_TYPE != 'SSL/TLS';
" 2>/dev/null | tail -1)

if [ "$NON_SSL_CONNECTIONS" -gt 0 ]; then
    echo "⚠️  发现 $NON_SSL_CONNECTIONS 个非SSL连接"
    mysql -u$MYSQL_USER -p$MYSQL_PASS -e "
    SELECT ID, USER, HOST, DB, CONNECTION_TYPE
    FROM information_schema.PROCESSLIST
    WHERE USER != 'system user' AND CONNECTION_TYPE != 'SSL/TLS';
    " 2>/dev/null
else
    echo "✅ 所有连接都使用SSL加密"
fi

echo
echo "=== SSL监控完成 ==="
echo "详细结果已保存到: $MONITOR_RESULT"

10.4 数据加密

10.4.1 透明数据加密(TDE)

1. InnoDB表空间加密

-- InnoDB表空间加密配置

-- 1. 启用加密插件
INSTALL PLUGIN keyring_file SONAME 'keyring_file.so';

-- 2. 查看加密状态
SHOW PLUGINS;
SELECT PLUGIN_NAME, PLUGIN_STATUS FROM INFORMATION_SCHEMA.PLUGINS 
WHERE PLUGIN_NAME LIKE 'keyring%';

-- 3. 创建加密表
CREATE TABLE encrypted_users (
    id INT PRIMARY KEY AUTO_INCREMENT,
    username VARCHAR(50) NOT NULL,
    email VARCHAR(100),
    password_hash VARCHAR(255),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB ENCRYPTION='Y';

-- 4. 修改现有表启用加密
ALTER TABLE users ENCRYPTION='Y';

-- 5. 禁用表加密
ALTER TABLE users ENCRYPTION='N';

-- 6. 查看表加密状态
SELECT 
    TABLE_SCHEMA,
    TABLE_NAME,
    CREATE_OPTIONS
FROM INFORMATION_SCHEMA.TABLES 
WHERE CREATE_OPTIONS LIKE '%ENCRYPTION%';

-- 7. 设置默认加密
SET GLOBAL default_table_encryption = ON;

-- 8. 查看加密相关变量
SHOW VARIABLES LIKE '%encrypt%';
SHOW VARIABLES LIKE '%keyring%';

2. 二进制日志加密

-- 二进制日志加密配置

-- 1. 启用二进制日志加密
SET GLOBAL binlog_encryption = ON;

-- 2. 查看二进制日志加密状态
SHOW VARIABLES LIKE 'binlog_encryption';

-- 3. 轮换二进制日志加密密钥
ALTER INSTANCE ROTATE BINLOG MASTER KEY;

-- 4. 查看加密密钥信息
SELECT * FROM performance_schema.keyring_keys;

3. 加密配置文件

# MySQL加密配置 (my.cnf)

[mysqld]
# Keyring插件配置
early-plugin-load = keyring_file.so
keyring_file_data = /var/lib/mysql-keyring/keyring

# 表空间加密
default_table_encryption = ON
innodb_redo_log_encrypt = ON
innodb_undo_log_encrypt = ON

# 二进制日志加密
binlog_encryption = ON
binlog_rotate_encryption_master_key_at_startup = ON

# 临时文件加密
big_tables = 1
tmp_table_size = 32M
max_heap_table_size = 32M

4. 加密管理脚本

class MySQLEncryptionManager:
    def __init__(self, host='localhost', user='root', password=''):
        self.host = host
        self.user = user
        self.password = password
        self.connection = None
    
    def connect(self):
        """连接到MySQL"""
        import mysql.connector
        try:
            self.connection = mysql.connector.connect(
                host=self.host,
                user=self.user,
                password=self.password
            )
            return True
        except Exception as e:
            print(f"连接失败: {e}")
            return False
    
    def check_encryption_support(self):
        """检查加密支持"""
        if not self.connection:
            return False
        
        try:
            cursor = self.connection.cursor()
            
            print("加密支持检查:")
            print("-" * 40)
            
            # 检查keyring插件
            cursor.execute("""
                SELECT PLUGIN_NAME, PLUGIN_STATUS 
                FROM INFORMATION_SCHEMA.PLUGINS 
                WHERE PLUGIN_NAME LIKE 'keyring%'
            """)
            
            plugins = cursor.fetchall()
            if plugins:
                print("Keyring插件状态:")
                for plugin_name, status in plugins:
                    print(f"  {plugin_name}: {status}")
            else:
                print("❌ 未安装keyring插件")
            
            # 检查加密变量
            encryption_vars = [
                'default_table_encryption',
                'binlog_encryption',
                'innodb_redo_log_encrypt',
                'innodb_undo_log_encrypt'
            ]
            
            print("\n加密配置状态:")
            for var in encryption_vars:
                cursor.execute(f"SHOW VARIABLES LIKE '{var}'")
                result = cursor.fetchone()
                if result:
                    print(f"  {result[0]}: {result[1]}")
            
            return True
            
        except Exception as e:
            print(f"检查加密支持失败: {e}")
            return False
        finally:
            cursor.close()
    
    def list_encrypted_tables(self):
        """列出加密表"""
        if not self.connection:
            return []
        
        try:
            cursor = self.connection.cursor()
            
            cursor.execute("""
                SELECT 
                    TABLE_SCHEMA,
                    TABLE_NAME,
                    CREATE_OPTIONS
                FROM INFORMATION_SCHEMA.TABLES 
                WHERE CREATE_OPTIONS LIKE '%ENCRYPTION%'
                ORDER BY TABLE_SCHEMA, TABLE_NAME
            """)
            
            tables = cursor.fetchall()
            
            if tables:
                print("加密表列表:")
                print("-" * 60)
                print(f"{'数据库':<20} {'表名':<25} {'加密选项':<15}")
                print("-" * 60)
                
                for schema, table, options in tables:
                    print(f"{schema:<20} {table:<25} {options:<15}")
            else:
                print("未找到加密表")
            
            return tables
            
        except Exception as e:
            print(f"查询加密表失败: {e}")
            return []
        finally:
            cursor.close()
    
    def encrypt_table(self, database, table):
        """加密表"""
        if not self.connection:
            return False
        
        try:
            cursor = self.connection.cursor()
            
            # 检查表是否存在
            cursor.execute(f"USE {database}")
            cursor.execute(f"SHOW TABLES LIKE '{table}'")
            
            if not cursor.fetchone():
                print(f"表 {database}.{table} 不存在")
                return False
            
            # 启用表加密
            alter_sql = f"ALTER TABLE {database}.{table} ENCRYPTION='Y'"
            cursor.execute(alter_sql)
            
            self.connection.commit()
            print(f"表 {database}.{table} 加密成功")
            return True
            
        except Exception as e:
            print(f"加密表失败: {e}")
            return False
        finally:
            cursor.close()
    
    def decrypt_table(self, database, table):
        """解密表"""
        if not self.connection:
            return False
        
        try:
            cursor = self.connection.cursor()
            
            # 禁用表加密
            alter_sql = f"ALTER TABLE {database}.{table} ENCRYPTION='N'"
            cursor.execute(alter_sql)
            
            self.connection.commit()
            print(f"表 {database}.{table} 解密成功")
            return True
            
        except Exception as e:
            print(f"解密表失败: {e}")
            return False
        finally:
            cursor.close()
    
    def rotate_master_key(self):
        """轮换主密钥"""
        if not self.connection:
            return False
        
        try:
            cursor = self.connection.cursor()
            
            # 轮换二进制日志主密钥
            cursor.execute("ALTER INSTANCE ROTATE BINLOG MASTER KEY")
            
            # 轮换InnoDB主密钥
            cursor.execute("ALTER INSTANCE ROTATE INNODB MASTER KEY")
            
            self.connection.commit()
            print("主密钥轮换成功")
            return True
            
        except Exception as e:
            print(f"轮换主密钥失败: {e}")
            return False
        finally:
            cursor.close()
    
    def show_keyring_keys(self):
        """显示密钥环密钥"""
        if not self.connection:
            return
        
        try:
            cursor = self.connection.cursor()
            
            cursor.execute("SELECT * FROM performance_schema.keyring_keys")
            keys = cursor.fetchall()
            
            if keys:
                print("密钥环密钥:")
                print("-" * 50)
                print(f"{'密钥ID':<30} {'密钥所有者':<20}")
                print("-" * 50)
                
                for key_id, key_owner in keys:
                    print(f"{key_id:<30} {key_owner:<20}")
            else:
                print("密钥环中没有密钥")
            
        except Exception as e:
            print(f"查询密钥失败: {e}")
        finally:
            cursor.close()
    
    def close(self):
        """关闭连接"""
        if self.connection:
            self.connection.close()

# 加密管理演示
enc_manager = MySQLEncryptionManager()
if enc_manager.connect():
    enc_manager.check_encryption_support()
    enc_manager.list_encrypted_tables()
    enc_manager.show_keyring_keys()
    enc_manager.close()

10.4.2 应用层数据加密

1. 字段级加密

-- 字段级加密函数

-- 1. 使用AES加密函数
SELECT AES_ENCRYPT('sensitive_data', 'encryption_key') as encrypted_data;
SELECT AES_DECRYPT(encrypted_column, 'encryption_key') as decrypted_data FROM table_name;

-- 2. 创建带加密字段的表
CREATE TABLE user_profiles (
    id INT PRIMARY KEY AUTO_INCREMENT,
    username VARCHAR(50) NOT NULL,
    email VARCHAR(100),
    -- 加密存储敏感信息
    encrypted_ssn VARBINARY(255),
    encrypted_phone VARBINARY(255),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_username (username)
);

-- 3. 插入加密数据
INSERT INTO user_profiles (username, email, encrypted_ssn, encrypted_phone)
VALUES (
    'john_doe',
    'john@example.com',
    AES_ENCRYPT('123-45-6789', SHA2('encryption_key_ssn', 256)),
    AES_ENCRYPT('555-1234', SHA2('encryption_key_phone', 256))
);

-- 4. 查询解密数据
SELECT 
    id,
    username,
    email,
    AES_DECRYPT(encrypted_ssn, SHA2('encryption_key_ssn', 256)) as ssn,
    AES_DECRYPT(encrypted_phone, SHA2('encryption_key_phone', 256)) as phone
FROM user_profiles
WHERE username = 'john_doe';

-- 5. 创建加密视图
CREATE VIEW user_profiles_decrypted AS
SELECT 
    id,
    username,
    email,
    CAST(AES_DECRYPT(encrypted_ssn, SHA2('encryption_key_ssn', 256)) AS CHAR) as ssn,
    CAST(AES_DECRYPT(encrypted_phone, SHA2('encryption_key_phone', 256)) AS CHAR) as phone,
    created_at
FROM user_profiles;

2. 加密函数封装

class MySQLFieldEncryption:
    def __init__(self, connection, master_key):
        self.connection = connection
        self.master_key = master_key
    
    def generate_field_key(self, field_name):
        """为特定字段生成加密密钥"""
        import hashlib
        combined = f"{self.master_key}_{field_name}"
        return hashlib.sha256(combined.encode()).hexdigest()
    
    def encrypt_field(self, data, field_name):
        """加密字段数据"""
        if not data:
            return None
        
        try:
            cursor = self.connection.cursor()
            field_key = self.generate_field_key(field_name)
            
            # 使用MySQL的AES_ENCRYPT函数
            cursor.execute(
                "SELECT AES_ENCRYPT(%s, %s) as encrypted_data",
                (data, field_key)
            )
            
            result = cursor.fetchone()
            return result[0] if result else None
            
        except Exception as e:
            print(f"加密失败: {e}")
            return None
        finally:
            cursor.close()
    
    def decrypt_field(self, encrypted_data, field_name):
        """解密字段数据"""
        if not encrypted_data:
            return None
        
        try:
            cursor = self.connection.cursor()
            field_key = self.generate_field_key(field_name)
            
            # 使用MySQL的AES_DECRYPT函数
            cursor.execute(
                "SELECT AES_DECRYPT(%s, %s) as decrypted_data",
                (encrypted_data, field_key)
            )
            
            result = cursor.fetchone()
            if result and result[0]:
                return result[0].decode('utf-8')
            return None
            
        except Exception as e:
            print(f"解密失败: {e}")
            return None
        finally:
            cursor.close()
    
    def create_encrypted_table(self, table_name, schema):
        """创建带加密字段的表"""
        try:
            cursor = self.connection.cursor()
            
            # 构建CREATE TABLE语句
            columns = []
            for col_name, col_def in schema.items():
                if col_def.get('encrypted', False):
                    # 加密字段使用VARBINARY类型
                    columns.append(f"{col_name} VARBINARY(255)")
                else:
                    columns.append(f"{col_name} {col_def['type']}")
            
            create_sql = f"CREATE TABLE {table_name} ({', '.join(columns)})"
            cursor.execute(create_sql)
            
            self.connection.commit()
            print(f"加密表 {table_name} 创建成功")
            return True
            
        except Exception as e:
            print(f"创建加密表失败: {e}")
            return False
        finally:
            cursor.close()
    
    def insert_encrypted_data(self, table_name, data, schema):
        """插入加密数据"""
        try:
            cursor = self.connection.cursor()
            
            # 处理加密字段
            processed_data = {}
            for field_name, value in data.items():
                if schema.get(field_name, {}).get('encrypted', False):
                    # 加密敏感字段
                    processed_data[field_name] = self.encrypt_field(value, field_name)
                else:
                    processed_data[field_name] = value
            
            # 构建INSERT语句
            columns = list(processed_data.keys())
            placeholders = ', '.join(['%s'] * len(columns))
            values = list(processed_data.values())
            
            insert_sql = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({placeholders})"
            cursor.execute(insert_sql, values)
            
            self.connection.commit()
            print(f"加密数据插入成功")
            return cursor.lastrowid
            
        except Exception as e:
            print(f"插入加密数据失败: {e}")
            return None
        finally:
            cursor.close()
    
    def select_decrypted_data(self, table_name, schema, where_clause=None, params=None):
        """查询并解密数据"""
        try:
            cursor = self.connection.cursor(dictionary=True)
            
            # 构建SELECT语句
            select_sql = f"SELECT * FROM {table_name}"
            if where_clause:
                select_sql += f" WHERE {where_clause}"
            
            cursor.execute(select_sql, params or [])
            results = cursor.fetchall()
            
            # 解密敏感字段
            decrypted_results = []
            for row in results:
                decrypted_row = {}
                for field_name, value in row.items():
                    if schema.get(field_name, {}).get('encrypted', False):
                        # 解密敏感字段
                        decrypted_row[field_name] = self.decrypt_field(value, field_name)
                    else:
                        decrypted_row[field_name] = value
                decrypted_results.append(decrypted_row)
            
            return decrypted_results
            
        except Exception as e:
            print(f"查询解密数据失败: {e}")
            return []
        finally:
            cursor.close()

# 字段加密使用示例
import mysql.connector

# 连接数据库
connection = mysql.connector.connect(
    host='localhost',
    user='root',
    password='password',
    database='test_db'
)

# 创建加密管理器
encryption = MySQLFieldEncryption(connection, 'master_encryption_key_2024')

# 定义表结构
user_schema = {
    'id': {'type': 'INT PRIMARY KEY AUTO_INCREMENT', 'encrypted': False},
    'username': {'type': 'VARCHAR(50) NOT NULL', 'encrypted': False},
    'email': {'type': 'VARCHAR(100)', 'encrypted': False},
    'ssn': {'type': 'VARCHAR(20)', 'encrypted': True},
    'phone': {'type': 'VARCHAR(20)', 'encrypted': True},
    'created_at': {'type': 'TIMESTAMP DEFAULT CURRENT_TIMESTAMP', 'encrypted': False}
}

# 创建加密表
encryption.create_encrypted_table('secure_users', user_schema)

# 插入加密数据
user_data = {
    'username': 'john_doe',
    'email': 'john@example.com',
    'ssn': '123-45-6789',
    'phone': '555-1234'
}

user_id = encryption.insert_encrypted_data('secure_users', user_data, user_schema)

# 查询解密数据
results = encryption.select_decrypted_data(
    'secure_users', 
    user_schema, 
    'username = %s', 
    ['john_doe']
)

for user in results:
    print(f"用户: {user['username']}, SSN: {user['ssn']}, 电话: {user['phone']}")

connection.close()

10.5 审计日志与监控

10.5.1 MySQL审计日志配置

1. 审计日志插件安装

-- 安装审计日志插件

-- 1. 安装audit_log插件(MySQL Enterprise版本)
INSTALL PLUGIN audit_log SONAME 'audit_log.so';

-- 2. 查看插件状态
SHOW PLUGINS;
SELECT PLUGIN_NAME, PLUGIN_STATUS FROM INFORMATION_SCHEMA.PLUGINS 
WHERE PLUGIN_NAME = 'audit_log';

-- 3. 配置审计日志
SET GLOBAL audit_log_policy = 'ALL';
SET GLOBAL audit_log_format = 'JSON';
SET GLOBAL audit_log_file = '/var/log/mysql/audit.log';
SET GLOBAL audit_log_rotate_on_size = 1073741824; -- 1GB
SET GLOBAL audit_log_rotations = 10;

-- 4. 查看审计配置
SHOW VARIABLES LIKE 'audit_log%';

2. 审计日志配置文件

# MySQL审计日志配置 (my.cnf)

[mysqld]
# 审计日志插件
plugin-load-add = audit_log.so

# 审计日志基础配置
audit_log_policy = ALL
audit_log_format = JSON
audit_log_file = /var/log/mysql/audit.log

# 审计日志轮换
audit_log_rotate_on_size = 1073741824  # 1GB
audit_log_rotations = 10
audit_log_flush = ON

# 审计过滤配置
audit_log_connection_policy = ALL
audit_log_statement_policy = ALL

# 排除系统用户
audit_log_exclude_accounts = 'mysql.session@localhost,mysql.sys@localhost'

# 包含特定用户(可选)
# audit_log_include_accounts = 'admin@localhost,app_user@%'

# 审计日志缓冲
audit_log_buffer_size = 1048576  # 1MB

3. 审计日志分析脚本

import json
import re
from datetime import datetime, timedelta
from collections import defaultdict, Counter

class MySQLAuditLogAnalyzer:
    def __init__(self, log_file_path):
        self.log_file_path = log_file_path
        self.events = []
        self.load_events()
    
    def load_events(self):
        """加载审计日志事件"""
        try:
            with open(self.log_file_path, 'r', encoding='utf-8') as f:
                for line in f:
                    line = line.strip()
                    if line:
                        try:
                            event = json.loads(line)
                            self.events.append(event)
                        except json.JSONDecodeError:
                            continue
            print(f"加载了 {len(self.events)} 个审计事件")
        except FileNotFoundError:
            print(f"审计日志文件不存在: {self.log_file_path}")
        except Exception as e:
            print(f"加载审计日志失败: {e}")
    
    def analyze_login_attempts(self, hours=24):
        """分析登录尝试"""
        cutoff_time = datetime.now() - timedelta(hours=hours)
        
        login_events = []
        for event in self.events:
            if event.get('class') == 'connection':
                event_time = datetime.fromisoformat(event.get('timestamp', '').replace('Z', '+00:00'))
                if event_time >= cutoff_time:
                    login_events.append(event)
        
        # 统计登录尝试
        successful_logins = [e for e in login_events if e.get('event') == 'connect']
        failed_logins = [e for e in login_events if e.get('event') == 'connect' and e.get('connection_id') == 0]
        
        print(f"\n最近{hours}小时登录分析:")
        print("-" * 40)
        print(f"总登录尝试: {len(login_events)}")
        print(f"成功登录: {len(successful_logins)}")
        print(f"失败登录: {len(failed_logins)}")
        
        # 按用户统计
        user_stats = Counter()
        for event in login_events:
            user = event.get('user', 'unknown')
            host = event.get('host', 'unknown')
            user_stats[f"{user}@{host}"] += 1
        
        print("\n用户登录统计:")
        for user, count in user_stats.most_common(10):
            print(f"  {user}: {count} 次")
        
        return {
            'total_attempts': len(login_events),
            'successful': len(successful_logins),
            'failed': len(failed_logins),
            'user_stats': dict(user_stats)
        }
    
    def analyze_privilege_changes(self, hours=24):
        """分析权限变更"""
        cutoff_time = datetime.now() - timedelta(hours=hours)
        
        privilege_keywords = ['GRANT', 'REVOKE', 'CREATE USER', 'DROP USER', 'ALTER USER']
        privilege_events = []
        
        for event in self.events:
            if event.get('class') == 'general':
                event_time = datetime.fromisoformat(event.get('timestamp', '').replace('Z', '+00:00'))
                if event_time >= cutoff_time:
                    sql_text = event.get('general_data', {}).get('sql_command', '').upper()
                    if any(keyword in sql_text for keyword in privilege_keywords):
                        privilege_events.append(event)
        
        print(f"\n最近{hours}小时权限变更分析:")
        print("-" * 40)
        print(f"权限变更事件: {len(privilege_events)}")
        
        # 按操作类型统计
        operation_stats = Counter()
        for event in privilege_events:
            sql_text = event.get('general_data', {}).get('sql_command', '').upper()
            for keyword in privilege_keywords:
                if keyword in sql_text:
                    operation_stats[keyword] += 1
                    break
        
        print("\n操作类型统计:")
        for operation, count in operation_stats.items():
            print(f"  {operation}: {count} 次")
        
        # 显示详细事件
        print("\n详细权限变更事件:")
        for event in privilege_events[-10:]:  # 显示最近10个事件
            timestamp = event.get('timestamp', '')
            user = event.get('user', 'unknown')
            sql_command = event.get('general_data', {}).get('sql_command', '')
            print(f"  {timestamp} - {user}: {sql_command[:100]}...")
        
        return privilege_events
    
    def analyze_suspicious_activities(self, hours=24):
        """分析可疑活动"""
        cutoff_time = datetime.now() - timedelta(hours=hours)
        
        suspicious_patterns = {
            'SQL注入尝试': [r'UNION.*SELECT', r'OR.*1=1', r'DROP.*TABLE', r'INSERT.*INTO.*VALUES'],
            '权限提升': [r'GRANT.*ALL', r'SUPER', r'FILE'],
            '数据导出': [r'SELECT.*INTO.*OUTFILE', r'LOAD_FILE'],
            '批量操作': [r'DELETE.*FROM.*WHERE.*1=1', r'UPDATE.*SET.*WHERE.*1=1']
        }
        
        suspicious_events = defaultdict(list)
        
        for event in self.events:
            if event.get('class') == 'general':
                event_time = datetime.fromisoformat(event.get('timestamp', '').replace('Z', '+00:00'))
                if event_time >= cutoff_time:
                    sql_text = event.get('general_data', {}).get('sql_command', '').upper()
                    
                    for category, patterns in suspicious_patterns.items():
                        for pattern in patterns:
                            if re.search(pattern, sql_text, re.IGNORECASE):
                                suspicious_events[category].append(event)
                                break
        
        print(f"\n最近{hours}小时可疑活动分析:")
        print("-" * 40)
        
        total_suspicious = sum(len(events) for events in suspicious_events.values())
        print(f"可疑事件总数: {total_suspicious}")
        
        for category, events in suspicious_events.items():
            if events:
                print(f"\n{category}: {len(events)} 个事件")
                for event in events[-3:]:  # 显示最近3个事件
                    timestamp = event.get('timestamp', '')
                    user = event.get('user', 'unknown')
                    sql_command = event.get('general_data', {}).get('sql_command', '')
                    print(f"  {timestamp} - {user}: {sql_command[:80]}...")
        
        return dict(suspicious_events)
    
    def generate_security_report(self, hours=24):
        """生成安全报告"""
        print(f"\n=== MySQL安全审计报告 ===")
        print(f"报告时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"分析时间范围: 最近{hours}小时")
        print("=" * 50)
        
        # 登录分析
        login_analysis = self.analyze_login_attempts(hours)
        
        # 权限变更分析
        privilege_analysis = self.analyze_privilege_changes(hours)
        
        # 可疑活动分析
        suspicious_analysis = self.analyze_suspicious_activities(hours)
        
        # 安全评分
        security_score = 100
        
        # 根据失败登录次数扣分
        failed_logins = login_analysis.get('failed', 0)
        if failed_logins > 10:
            security_score -= min(20, failed_logins)
        
        # 根据权限变更次数扣分
        privilege_changes = len(privilege_analysis)
        if privilege_changes > 5:
            security_score -= min(15, privilege_changes * 2)
        
        # 根据可疑活动扣分
        total_suspicious = sum(len(events) for events in suspicious_analysis.values())
        if total_suspicious > 0:
            security_score -= min(30, total_suspicious * 5)
        
        security_score = max(0, security_score)
        
        print(f"\n安全评分: {security_score}/100")
        if security_score >= 90:
            print("安全状态: 优秀 ✅")
        elif security_score >= 70:
            print("安全状态: 良好 ⚠️")
        elif security_score >= 50:
            print("安全状态: 一般 ⚠️")
        else:
            print("安全状态: 危险 ❌")
        
        # 安全建议
        print("\n安全建议:")
        if failed_logins > 10:
            print("  • 检查是否存在暴力破解攻击")
        if privilege_changes > 5:
            print("  • 审查权限变更的合理性")
        if total_suspicious > 0:
            print("  • 立即调查可疑活动")
        if security_score < 70:
            print("  • 加强访问控制和监控")
        
        return {
            'security_score': security_score,
            'login_analysis': login_analysis,
            'privilege_changes': len(privilege_analysis),
            'suspicious_activities': total_suspicious
        }

# 审计日志分析使用示例
analyzer = MySQLAuditLogAnalyzer('/var/log/mysql/audit.log')
report = analyzer.generate_security_report(24)

4. 审计日志监控脚本

#!/bin/bash
# MySQL审计日志实时监控脚本

AUDIT_LOG="/var/log/mysql/audit.log"
ALERT_EMAIL="admin@example.com"
TEMP_DIR="/tmp/mysql_audit"
ALERT_THRESHOLD=5

# 创建临时目录
mkdir -p $TEMP_DIR

echo "=== MySQL审计日志监控启动 ==="
echo "监控文件: $AUDIT_LOG"
echo "告警邮箱: $ALERT_EMAIL"
echo "开始时间: $(date)"
echo

# 监控函数
monitor_audit_log() {
    local log_file=$1
    local last_position_file="$TEMP_DIR/last_position"
    
    # 获取上次读取位置
    if [ -f "$last_position_file" ]; then
        last_position=$(cat "$last_position_file")
    else
        last_position=0
    fi
    
    # 获取当前文件大小
    current_size=$(stat -c%s "$log_file" 2>/dev/null || echo 0)
    
    if [ $current_size -gt $last_position ]; then
        # 读取新增内容
        tail -c +$((last_position + 1)) "$log_file" | while IFS= read -r line; do
            if [ -n "$line" ]; then
                analyze_audit_event "$line"
            fi
        done
        
        # 更新位置
        echo $current_size > "$last_position_file"
    fi
}

# 分析审计事件
analyze_audit_event() {
    local event_line="$1"
    local alert_file="$TEMP_DIR/alerts_$(date +%Y%m%d)"
    
    # 检查可疑活动
    if echo "$event_line" | grep -qi "UNION.*SELECT\|OR.*1=1\|DROP.*TABLE"; then
        echo "$(date): SQL注入尝试 - $event_line" >> "$alert_file"
        send_alert "SQL注入尝试" "$event_line"
    fi
    
    if echo "$event_line" | grep -qi "GRANT.*ALL\|SUPER\|FILE"; then
        echo "$(date): 权限提升尝试 - $event_line" >> "$alert_file"
        send_alert "权限提升尝试" "$event_line"
    fi
    
    if echo "$event_line" | grep -qi "SELECT.*INTO.*OUTFILE\|LOAD_FILE"; then
        echo "$(date): 数据导出尝试 - $event_line" >> "$alert_file"
        send_alert "数据导出尝试" "$event_line"
    fi
    
    # 检查失败登录
    if echo "$event_line" | grep -qi '"event":"connect"' && echo "$event_line" | grep -qi '"connection_id":0'; then
        echo "$(date): 登录失败 - $event_line" >> "$alert_file"
        
        # 统计失败登录次数
        local failed_count=$(grep "登录失败" "$alert_file" | wc -l)
        if [ $failed_count -ge $ALERT_THRESHOLD ]; then
            send_alert "频繁登录失败" "检测到 $failed_count 次登录失败"
        fi
    fi
}

# 发送告警
send_alert() {
    local alert_type="$1"
    local alert_content="$2"
    local timestamp=$(date '+%Y-%m-%d %H:%M:%S')
    
    echo "[$timestamp] 安全告警: $alert_type"
    echo "详情: $alert_content"
    echo
    
    # 发送邮件告警(需要配置邮件服务)
    if command -v mail >/dev/null 2>&1; then
        echo "时间: $timestamp\n类型: $alert_type\n详情: $alert_content" | \
        mail -s "MySQL安全告警: $alert_type" "$ALERT_EMAIL"
    fi
    
    # 记录到系统日志
    logger "MySQL Security Alert: $alert_type - $alert_content"
}

# 清理旧文件
cleanup_old_files() {
    find "$TEMP_DIR" -name "alerts_*" -mtime +7 -delete
}

# 主监控循环
while true; do
    if [ -f "$AUDIT_LOG" ]; then
        monitor_audit_log "$AUDIT_LOG"
    else
        echo "审计日志文件不存在: $AUDIT_LOG"
        sleep 60
        continue
    fi
    
    # 每小时清理一次旧文件
    if [ $(($(date +%M))) -eq 0 ]; then
        cleanup_old_files
    fi
    
    sleep 10
done

10.5.2 性能监控与安全监控集成

1. 安全监控指标

-- 安全监控关键指标

-- 1. 连接监控
SELECT 
    USER,
    HOST,
    COUNT(*) as connection_count,
    MAX(TIME) as max_time,
    AVG(TIME) as avg_time
FROM information_schema.PROCESSLIST 
WHERE USER NOT IN ('system user', 'event_scheduler')
GROUP BY USER, HOST
ORDER BY connection_count DESC;

-- 2. 失败连接统计
SELECT 
    VARIABLE_NAME,
    VARIABLE_VALUE
FROM performance_schema.global_status 
WHERE VARIABLE_NAME IN (
    'Aborted_connects',
    'Aborted_clients',
    'Connection_errors_max_connections',
    'Connection_errors_internal',
    'Connection_errors_tcpwrap'
);

-- 3. 权限使用统计
SELECT 
    USER,
    HOST,
    COUNT(*) as query_count,
    COUNT(DISTINCT SQL_TEXT) as unique_queries
FROM performance_schema.events_statements_history_long
WHERE EVENT_NAME = 'statement/sql/select'
GROUP BY USER, HOST
ORDER BY query_count DESC
LIMIT 10;

-- 4. 敏感操作监控
SELECT 
    USER,
    HOST,
    SQL_TEXT,
    TIMER_START,
    TIMER_END
FROM performance_schema.events_statements_history_long
WHERE SQL_TEXT REGEXP 'GRANT|REVOKE|CREATE USER|DROP USER|ALTER USER'
ORDER BY TIMER_START DESC
LIMIT 20;

-- 5. 文件访问监控
SELECT 
    USER,
    HOST,
    SQL_TEXT
FROM performance_schema.events_statements_history_long
WHERE SQL_TEXT REGEXP 'LOAD_FILE|INTO OUTFILE|INTO DUMPFILE'
ORDER BY TIMER_START DESC;

2. 安全监控仪表板

class MySQLSecurityDashboard:
    def __init__(self, connection):
        self.connection = connection
    
    def get_connection_security_metrics(self):
        """获取连接安全指标"""
        try:
            cursor = self.connection.cursor(dictionary=True)
            
            # 当前连接统计
            cursor.execute("""
                SELECT 
                    USER,
                    HOST,
                    COUNT(*) as connection_count,
                    MAX(TIME) as max_time,
                    AVG(TIME) as avg_time
                FROM information_schema.PROCESSLIST 
                WHERE USER NOT IN ('system user', 'event_scheduler')
                GROUP BY USER, HOST
                ORDER BY connection_count DESC
            """)
            
            current_connections = cursor.fetchall()
            
            # 连接错误统计
            cursor.execute("""
                SELECT 
                    VARIABLE_NAME,
                    VARIABLE_VALUE
                FROM performance_schema.global_status 
                WHERE VARIABLE_NAME IN (
                    'Aborted_connects',
                    'Aborted_clients',
                    'Connection_errors_max_connections',
                    'Connection_errors_internal'
                )
            """)
            
            connection_errors = {row['VARIABLE_NAME']: int(row['VARIABLE_VALUE']) 
                               for row in cursor.fetchall()}
            
            return {
                'current_connections': current_connections,
                'connection_errors': connection_errors
            }
            
        except Exception as e:
            print(f"获取连接安全指标失败: {e}")
            return {}
        finally:
            cursor.close()
    
    def get_privilege_usage_metrics(self):
        """获取权限使用指标"""
        try:
            cursor = self.connection.cursor(dictionary=True)
            
            # 用户查询统计
            cursor.execute("""
                SELECT 
                    USER,
                    HOST,
                    COUNT(*) as query_count,
                    COUNT(DISTINCT DIGEST_TEXT) as unique_queries
                FROM performance_schema.events_statements_summary_by_user_by_event_name
                WHERE EVENT_NAME LIKE 'statement/sql/%'
                GROUP BY USER, HOST
                ORDER BY query_count DESC
                LIMIT 10
            """)
            
            user_activity = cursor.fetchall()
            
            # 敏感操作统计
            cursor.execute("""
                SELECT 
                    USER,
                    HOST,
                    DIGEST_TEXT,
                    COUNT_STAR as execution_count,
                    SUM_TIMER_WAIT/1000000000 as total_time_seconds
                FROM performance_schema.events_statements_summary_by_digest
                WHERE DIGEST_TEXT REGEXP 'GRANT|REVOKE|CREATE USER|DROP USER|ALTER USER'
                ORDER BY COUNT_STAR DESC
                LIMIT 20
            """)
            
            sensitive_operations = cursor.fetchall()
            
            return {
                'user_activity': user_activity,
                'sensitive_operations': sensitive_operations
            }
            
        except Exception as e:
            print(f"获取权限使用指标失败: {e}")
            return {}
        finally:
            cursor.close()
    
    def get_security_violations(self):
        """获取安全违规事件"""
        try:
            cursor = self.connection.cursor(dictionary=True)
            
            # 可疑SQL模式
            suspicious_patterns = [
                "UNION.*SELECT",
                "OR.*1=1",
                "DROP.*TABLE",
                "LOAD_FILE",
                "INTO.*OUTFILE"
            ]
            
            violations = []
            
            for pattern in suspicious_patterns:
                cursor.execute(f"""
                    SELECT 
                        USER,
                        HOST,
                        DIGEST_TEXT,
                        COUNT_STAR as execution_count,
                        FIRST_SEEN,
                        LAST_SEEN
                    FROM performance_schema.events_statements_summary_by_digest
                    WHERE DIGEST_TEXT REGEXP '{pattern}'
                    ORDER BY LAST_SEEN DESC
                    LIMIT 5
                """)
                
                pattern_violations = cursor.fetchall()
                for violation in pattern_violations:
                    violation['violation_type'] = pattern
                    violations.append(violation)
            
            return violations
            
        except Exception as e:
            print(f"获取安全违规事件失败: {e}")
            return []
        finally:
            cursor.close()
    
    def generate_security_dashboard(self):
        """生成安全仪表板"""
        print("\n=== MySQL安全监控仪表板 ===")
        print(f"更新时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print("=" * 50)
        
        # 连接安全指标
        connection_metrics = self.get_connection_security_metrics()
        
        print("\n📊 连接安全指标:")
        print("-" * 30)
        
        current_connections = connection_metrics.get('current_connections', [])
        if current_connections:
            print(f"活跃连接数: {len(current_connections)}")
            print("\n用户连接分布:")
            for conn in current_connections[:5]:
                print(f"  {conn['USER']}@{conn['HOST']}: {conn['connection_count']} 个连接")
        
        connection_errors = connection_metrics.get('connection_errors', {})
        if connection_errors:
            print("\n连接错误统计:")
            for error_type, count in connection_errors.items():
                if count > 0:
                    print(f"  {error_type}: {count}")
        
        # 权限使用指标
        privilege_metrics = self.get_privilege_usage_metrics()
        
        print("\n🔐 权限使用指标:")
        print("-" * 30)
        
        user_activity = privilege_metrics.get('user_activity', [])
        if user_activity:
            print("用户活跃度 (Top 5):")
            for user in user_activity[:5]:
                print(f"  {user['USER']}@{user['HOST']}: {user['query_count']} 次查询")
        
        sensitive_operations = privilege_metrics.get('sensitive_operations', [])
        if sensitive_operations:
            print("\n敏感操作统计:")
            for op in sensitive_operations[:5]:
                print(f"  {op['DIGEST_TEXT'][:50]}...: {op['execution_count']} 次")
        
        # 安全违规事件
        violations = self.get_security_violations()
        
        print("\n⚠️  安全违规事件:")
        print("-" * 30)
        
        if violations:
            print(f"检测到 {len(violations)} 个潜在安全违规事件")
            for violation in violations[:5]:
                print(f"  类型: {violation['violation_type']}")
                print(f"  用户: {violation['USER']}@{violation['HOST']}")
                print(f"  执行次数: {violation['execution_count']}")
                print(f"  最后执行: {violation['LAST_SEEN']}")
                print()
        else:
            print("✅ 未检测到安全违规事件")
        
        # 安全建议
        print("\n💡 安全建议:")
        print("-" * 30)
        
        total_errors = sum(connection_errors.values())
        if total_errors > 100:
            print("  • 连接错误较多,建议检查网络和认证配置")
        
        if len(violations) > 0:
            print("  • 发现可疑SQL模式,建议立即调查")
        
        if len(current_connections) > 50:
            print("  • 当前连接数较多,建议检查连接池配置")
        
        if not violations and total_errors < 10:
            print("  ✅ 当前安全状态良好")
        
        return {
            'connection_metrics': connection_metrics,
            'privilege_metrics': privilege_metrics,
            'violations': violations
        }

# 安全仪表板使用示例
import mysql.connector

connection = mysql.connector.connect(
    host='localhost',
    user='root',
    password='password'
)

dashboard = MySQLSecurityDashboard(connection)
metrics = dashboard.generate_security_dashboard()

connection.close()

”`