10.1 MySQL安全基础
10.1.1 安全威胁与风险评估
class MySQLSecurityThreats:
def __init__(self):
self.threats = {
"外部威胁": {
"SQL注入攻击": {
"描述": "通过恶意SQL代码获取或破坏数据",
"风险等级": "高",
"影响": ["数据泄露", "数据篡改", "权限提升"],
"防护措施": [
"使用参数化查询",
"输入验证和过滤",
"最小权限原则",
"定期安全审计"
]
},
"暴力破解攻击": {
"描述": "通过尝试大量密码组合获取访问权限",
"风险等级": "中",
"影响": ["账户被盗", "未授权访问"],
"防护措施": [
"强密码策略",
"账户锁定机制",
"连接限制",
"监控异常登录"
]
},
"网络嗅探": {
"描述": "截获网络传输中的敏感信息",
"风险等级": "中",
"影响": ["密码泄露", "数据窃取"],
"防护措施": [
"SSL/TLS加密",
"VPN连接",
"网络隔离",
"加密传输"
]
}
},
"内部威胁": {
"权限滥用": {
"描述": "内部用户超越授权范围访问数据",
"风险等级": "高",
"影响": ["数据泄露", "内部欺诈"],
"防护措施": [
"最小权限原则",
"权限定期审查",
"操作审计",
"职责分离"
]
},
"误操作": {
"描述": "无意中的错误操作导致数据损失",
"风险等级": "中",
"影响": ["数据丢失", "服务中断"],
"防护措施": [
"操作培训",
"变更管理",
"备份策略",
"操作审批"
]
}
},
"系统威胁": {
"配置错误": {
"描述": "不安全的系统配置导致安全漏洞",
"风险等级": "高",
"影响": ["系统入侵", "数据泄露"],
"防护措施": [
"安全配置基线",
"配置审计",
"自动化部署",
"安全扫描"
]
},
"软件漏洞": {
"描述": "MySQL软件本身的安全漏洞",
"风险等级": "高",
"影响": ["系统入侵", "权限提升"],
"防护措施": [
"及时更新补丁",
"漏洞扫描",
"版本管理",
"安全监控"
]
}
}
}
def show_threat_analysis(self):
"""显示威胁分析"""
print("MySQL安全威胁分析")
print("=" * 60)
for category, threats in self.threats.items():
print(f"\n{category}:")
print("-" * 40)
for threat_name, details in threats.items():
print(f"\n威胁: {threat_name}")
print(f"描述: {details['描述']}")
print(f"风险等级: {details['风险等级']}")
print("影响:")
for impact in details['影响']:
print(f" • {impact}")
print("防护措施:")
for measure in details['防护措施']:
print(f" ✓ {measure}")
# 安全威胁分析演示
security_threats = MySQLSecurityThreats()
security_threats.show_threat_analysis()
10.1.2 安全配置基线
1. 基础安全配置
-- MySQL安全配置基线
-- 1. 删除匿名用户
DELETE FROM mysql.user WHERE User='';
-- 2. 删除test数据库
DROP DATABASE IF EXISTS test;
DELETE FROM mysql.db WHERE Db='test' OR Db='test\_%';
-- 3. 禁用远程root登录
DELETE FROM mysql.user WHERE User='root' AND Host NOT IN ('localhost', '127.0.0.1', '::1');
-- 4. 设置root密码(如果未设置)
ALTER USER 'root'@'localhost' IDENTIFIED BY 'StrongPassword123!';
-- 5. 刷新权限
FLUSH PRIVILEGES;
-- 6. 创建专用管理用户
CREATE USER 'admin'@'localhost' IDENTIFIED BY 'AdminPassword123!';
GRANT ALL PRIVILEGES ON *.* TO 'admin'@'localhost' WITH GRANT OPTION;
-- 7. 创建应用用户(最小权限)
CREATE USER 'app_user'@'%' IDENTIFIED BY 'AppPassword123!';
GRANT SELECT, INSERT, UPDATE, DELETE ON app_database.* TO 'app_user'@'%';
-- 8. 创建只读用户
CREATE USER 'readonly'@'%' IDENTIFIED BY 'ReadOnlyPassword123!';
GRANT SELECT ON app_database.* TO 'readonly'@'%';
-- 9. 创建备份用户
CREATE USER 'backup'@'localhost' IDENTIFIED BY 'BackupPassword123!';
GRANT SELECT, LOCK TABLES, SHOW VIEW, EVENT, TRIGGER ON *.* TO 'backup'@'localhost';
GRANT RELOAD, PROCESS ON *.* TO 'backup'@'localhost';
2. my.cnf安全配置
# MySQL安全配置文件 (my.cnf)
[mysqld]
# 基础安全设置
user = mysql
port = 3306
bind-address = 127.0.0.1 # 仅本地访问,生产环境根据需要调整
# 禁用危险功能
local-infile = 0
show-database = 0
safe-user-create = 1
secure-file-priv = /var/lib/mysql-files/
# 连接安全
max_connections = 100
max_connect_errors = 10
max_user_connections = 50
# 密码验证
validate_password.policy = MEDIUM
validate_password.length = 8
validate_password.mixed_case_count = 1
validate_password.number_count = 1
validate_password.special_char_count = 1
# SSL配置
ssl-ca = /etc/mysql/ssl/ca-cert.pem
ssl-cert = /etc/mysql/ssl/server-cert.pem
ssl-key = /etc/mysql/ssl/server-key.pem
require_secure_transport = ON
# 日志配置
general_log = 1
general_log_file = /var/log/mysql/general.log
log_error = /var/log/mysql/error.log
slow_query_log = 1
slow_query_log_file = /var/log/mysql/slow.log
long_query_time = 2
log_queries_not_using_indexes = 1
# 二进制日志
log-bin = /var/log/mysql/mysql-bin
binlog_format = ROW
expire_logs_days = 7
max_binlog_size = 100M
# 其他安全设置
skip-symbolic-links
chroot = /var/lib/mysql
old_passwords = 0
sql_mode = STRICT_TRANS_TABLES,ERROR_FOR_DIVISION_BY_ZERO,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION
[mysql]
# 客户端安全设置
ssl-ca = /etc/mysql/ssl/ca-cert.pem
ssl-cert = /etc/mysql/ssl/client-cert.pem
ssl-key = /etc/mysql/ssl/client-key.pem
[mysqldump]
# 备份安全设置
single-transaction
routines
triggers
3. 安全配置检查脚本
#!/bin/bash
# MySQL安全配置检查脚本
echo "=== MySQL安全配置检查 ==="
echo "检查时间: $(date)"
echo
# 配置参数
MYSQL_USER="root"
MYSQL_PASS=""
CHECK_RESULT="/tmp/mysql_security_check_$(date +%Y%m%d_%H%M%S).log"
# 创建检查结果文件
exec > >(tee -a $CHECK_RESULT)
exec 2>&1
echo "安全检查结果: $CHECK_RESULT"
echo
# 1. 检查匿名用户
echo "1. 检查匿名用户:"
ANON_USERS=$(mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SELECT User, Host FROM mysql.user WHERE User='';" 2>/dev/null | wc -l)
if [ $ANON_USERS -gt 1 ]; then
echo "❌ 发现匿名用户,存在安全风险"
mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SELECT User, Host FROM mysql.user WHERE User='';"
else
echo "✅ 未发现匿名用户"
fi
# 2. 检查test数据库
echo "\n2. 检查test数据库:"
TEST_DB=$(mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SHOW DATABASES LIKE 'test';" 2>/dev/null | wc -l)
if [ $TEST_DB -gt 1 ]; then
echo "❌ 发现test数据库,建议删除"
else
echo "✅ test数据库已删除"
fi
# 3. 检查root远程登录
echo "\n3. 检查root远程登录:"
REMOTE_ROOT=$(mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SELECT User, Host FROM mysql.user WHERE User='root' AND Host NOT IN ('localhost', '127.0.0.1', '::1');" 2>/dev/null | wc -l)
if [ $REMOTE_ROOT -gt 1 ]; then
echo "❌ root用户允许远程登录,存在安全风险"
mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SELECT User, Host FROM mysql.user WHERE User='root' AND Host NOT IN ('localhost', '127.0.0.1', '::1');"
else
echo "✅ root用户仅允许本地登录"
fi
# 4. 检查空密码用户
echo "\n4. 检查空密码用户:"
EMPTY_PASS=$(mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SELECT User, Host FROM mysql.user WHERE authentication_string='';" 2>/dev/null | wc -l)
if [ $EMPTY_PASS -gt 1 ]; then
echo "❌ 发现空密码用户,存在安全风险"
mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SELECT User, Host FROM mysql.user WHERE authentication_string='';"
else
echo "✅ 未发现空密码用户"
fi
# 5. 检查SSL配置
echo "\n5. 检查SSL配置:"
SSL_STATUS=$(mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SHOW VARIABLES LIKE 'have_ssl';" 2>/dev/null | grep YES | wc -l)
if [ $SSL_STATUS -eq 1 ]; then
echo "✅ SSL已启用"
mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SHOW VARIABLES LIKE '%ssl%';"
else
echo "❌ SSL未启用,建议启用SSL加密"
fi
# 6. 检查日志配置
echo "\n6. 检查日志配置:"
GENERAL_LOG=$(mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SHOW VARIABLES LIKE 'general_log';" 2>/dev/null | grep ON | wc -l)
SLOW_LOG=$(mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SHOW VARIABLES LIKE 'slow_query_log';" 2>/dev/null | grep ON | wc -l)
if [ $GENERAL_LOG -eq 1 ]; then
echo "✅ 一般查询日志已启用"
else
echo "⚠️ 一般查询日志未启用"
fi
if [ $SLOW_LOG -eq 1 ]; then
echo "✅ 慢查询日志已启用"
else
echo "⚠️ 慢查询日志未启用"
fi
# 7. 检查文件权限
echo "\n7. 检查文件权限:"
if [ -d "/var/lib/mysql" ]; then
MYSQL_DIR_PERM=$(stat -c "%a" /var/lib/mysql)
MYSQL_DIR_OWNER=$(stat -c "%U:%G" /var/lib/mysql)
echo "MySQL数据目录权限: $MYSQL_DIR_PERM ($MYSQL_DIR_OWNER)"
if [ "$MYSQL_DIR_OWNER" = "mysql:mysql" ]; then
echo "✅ MySQL数据目录所有者正确"
else
echo "❌ MySQL数据目录所有者不正确"
fi
fi
# 8. 检查网络配置
echo "\n8. 检查网络配置:"
BIND_ADDRESS=$(mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SHOW VARIABLES LIKE 'bind_address';" 2>/dev/null | awk 'NR==2{print $2}')
PORT=$(mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SHOW VARIABLES LIKE 'port';" 2>/dev/null | awk 'NR==2{print $2}')
echo "绑定地址: $BIND_ADDRESS"
echo "端口: $PORT"
if [ "$BIND_ADDRESS" = "127.0.0.1" ] || [ "$BIND_ADDRESS" = "localhost" ]; then
echo "✅ 仅绑定本地地址,安全性较高"
else
echo "⚠️ 绑定了外部地址,请确保网络安全"
fi
echo
echo "=== 安全检查完成 ==="
echo "详细结果已保存到: $CHECK_RESULT"
10.2 用户管理与权限控制
10.2.1 用户账户管理
1. 用户创建与管理
-- 用户账户管理最佳实践
-- 1. 创建用户(指定主机)
CREATE USER 'username'@'localhost' IDENTIFIED BY 'StrongPassword123!';
CREATE USER 'username'@'192.168.1.%' IDENTIFIED BY 'StrongPassword123!';
CREATE USER 'username'@'%.example.com' IDENTIFIED BY 'StrongPassword123!';
-- 2. 修改用户密码
ALTER USER 'username'@'localhost' IDENTIFIED BY 'NewStrongPassword123!';
-- 3. 设置密码过期
ALTER USER 'username'@'localhost' PASSWORD EXPIRE;
ALTER USER 'username'@'localhost' PASSWORD EXPIRE INTERVAL 90 DAY;
ALTER USER 'username'@'localhost' PASSWORD EXPIRE NEVER;
-- 4. 账户锁定/解锁
ALTER USER 'username'@'localhost' ACCOUNT LOCK;
ALTER USER 'username'@'localhost' ACCOUNT UNLOCK;
-- 5. 设置连接限制
ALTER USER 'username'@'localhost'
WITH MAX_QUERIES_PER_HOUR 1000
MAX_UPDATES_PER_HOUR 100
MAX_CONNECTIONS_PER_HOUR 10
MAX_USER_CONNECTIONS 5;
-- 6. 查看用户信息
SELECT User, Host, account_locked, password_expired, password_lifetime
FROM mysql.user;
-- 7. 删除用户
DROP USER 'username'@'localhost';
-- 8. 重命名用户
RENAME USER 'old_username'@'localhost' TO 'new_username'@'localhost';
2. 密码策略配置
-- 密码验证组件配置
-- 1. 安装密码验证插件
INSTALL PLUGIN validate_password SONAME 'validate_password.so';
-- 2. 查看密码策略设置
SHOW VARIABLES LIKE 'validate_password%';
-- 3. 配置密码策略
SET GLOBAL validate_password.policy = 'STRONG';
SET GLOBAL validate_password.length = 12;
SET GLOBAL validate_password.mixed_case_count = 2;
SET GLOBAL validate_password.number_count = 2;
SET GLOBAL validate_password.special_char_count = 2;
SET GLOBAL validate_password.dictionary_file = '/usr/share/mysql/dictionary.txt';
-- 4. 测试密码强度
SELECT VALIDATE_PASSWORD_STRENGTH('WeakPass');
SELECT VALIDATE_PASSWORD_STRENGTH('StrongPassword123!');
-- 5. 查看密码验证要求
SHOW STATUS LIKE 'validate_password%';
3. 用户管理脚本
class MySQLUserManager:
def __init__(self, host='localhost', user='root', password=''):
self.host = host
self.user = user
self.password = password
self.connection = None
def connect(self):
"""连接到MySQL"""
import mysql.connector
try:
self.connection = mysql.connector.connect(
host=self.host,
user=self.user,
password=self.password
)
return True
except Exception as e:
print(f"连接失败: {e}")
return False
def create_user(self, username, host, password, max_connections=10):
"""创建用户"""
if not self.connection:
return False
try:
cursor = self.connection.cursor()
# 创建用户
create_sql = f"CREATE USER '{username}'@'{host}' IDENTIFIED BY '{password}'"
cursor.execute(create_sql)
# 设置连接限制
limit_sql = f"""
ALTER USER '{username}'@'{host}'
WITH MAX_USER_CONNECTIONS {max_connections}
"""
cursor.execute(limit_sql)
self.connection.commit()
print(f"用户 {username}@{host} 创建成功")
return True
except Exception as e:
print(f"创建用户失败: {e}")
return False
finally:
cursor.close()
def grant_privileges(self, username, host, database, privileges):
"""授予权限"""
if not self.connection:
return False
try:
cursor = self.connection.cursor()
# 构建权限字符串
if isinstance(privileges, list):
priv_str = ', '.join(privileges)
else:
priv_str = privileges
grant_sql = f"GRANT {priv_str} ON {database}.* TO '{username}'@'{host}'"
cursor.execute(grant_sql)
self.connection.commit()
print(f"权限授予成功: {username}@{host} -> {priv_str} on {database}")
return True
except Exception as e:
print(f"授予权限失败: {e}")
return False
finally:
cursor.close()
def revoke_privileges(self, username, host, database, privileges):
"""撤销权限"""
if not self.connection:
return False
try:
cursor = self.connection.cursor()
# 构建权限字符串
if isinstance(privileges, list):
priv_str = ', '.join(privileges)
else:
priv_str = privileges
revoke_sql = f"REVOKE {priv_str} ON {database}.* FROM '{username}'@'{host}'"
cursor.execute(revoke_sql)
self.connection.commit()
print(f"权限撤销成功: {username}@{host} -> {priv_str} on {database}")
return True
except Exception as e:
print(f"撤销权限失败: {e}")
return False
finally:
cursor.close()
def list_users(self):
"""列出所有用户"""
if not self.connection:
return []
try:
cursor = self.connection.cursor()
query = """
SELECT User, Host, account_locked, password_expired,
password_lifetime, max_user_connections
FROM mysql.user
ORDER BY User, Host
"""
cursor.execute(query)
users = cursor.fetchall()
print("MySQL用户列表:")
print("-" * 80)
print(f"{'用户名':<15} {'主机':<20} {'锁定':<6} {'密码过期':<8} {'密码生命周期':<12} {'最大连接':<8}")
print("-" * 80)
for user in users:
username, host, locked, expired, lifetime, max_conn = user
locked_str = "是" if locked == 'Y' else "否"
expired_str = "是" if expired == 'Y' else "否"
lifetime_str = str(lifetime) if lifetime else "无限制"
max_conn_str = str(max_conn) if max_conn else "无限制"
print(f"{username:<15} {host:<20} {locked_str:<6} {expired_str:<8} {lifetime_str:<12} {max_conn_str:<8}")
return users
except Exception as e:
print(f"查询用户失败: {e}")
return []
finally:
cursor.close()
def show_user_privileges(self, username, host):
"""显示用户权限"""
if not self.connection:
return
try:
cursor = self.connection.cursor()
# 查询全局权限
global_query = f"SHOW GRANTS FOR '{username}'@'{host}'"
cursor.execute(global_query)
grants = cursor.fetchall()
print(f"用户 {username}@{host} 的权限:")
print("-" * 50)
for grant in grants:
print(grant[0])
except Exception as e:
print(f"查询权限失败: {e}")
finally:
cursor.close()
def close(self):
"""关闭连接"""
if self.connection:
self.connection.close()
# 用户管理演示
user_manager = MySQLUserManager()
if user_manager.connect():
# 创建应用用户
user_manager.create_user('app_user', 'localhost', 'AppPassword123!', 20)
# 授予权限
user_manager.grant_privileges('app_user', 'localhost', 'myapp',
['SELECT', 'INSERT', 'UPDATE', 'DELETE'])
# 列出用户
user_manager.list_users()
# 显示用户权限
user_manager.show_user_privileges('app_user', 'localhost')
user_manager.close()
10.2.2 权限系统详解
1. MySQL权限层次
-- MySQL权限层次结构
-- 1. 全局权限(影响整个MySQL服务器)
GRANT ALL PRIVILEGES ON *.* TO 'admin'@'localhost';
GRANT SUPER, PROCESS, RELOAD ON *.* TO 'dba'@'localhost';
-- 2. 数据库权限(影响特定数据库)
GRANT ALL PRIVILEGES ON myapp.* TO 'app_admin'@'localhost';
GRANT CREATE, DROP, ALTER ON myapp.* TO 'schema_manager'@'localhost';
-- 3. 表权限(影响特定表)
GRANT SELECT, INSERT, UPDATE ON myapp.users TO 'app_user'@'localhost';
GRANT SELECT ON myapp.products TO 'readonly_user'@'localhost';
-- 4. 列权限(影响特定列)
GRANT SELECT (id, name, email) ON myapp.users TO 'limited_user'@'localhost';
GRANT UPDATE (last_login) ON myapp.users TO 'login_updater'@'localhost';
-- 5. 存储过程权限
GRANT EXECUTE ON PROCEDURE myapp.calculate_total TO 'proc_user'@'localhost';
GRANT EXECUTE ON FUNCTION myapp.get_discount TO 'func_user'@'localhost';
2. 权限类型详解
class MySQLPrivileges:
def __init__(self):
self.privileges = {
"数据权限": {
"SELECT": {
"描述": "查询数据",
"适用对象": ["表", "列", "视图"],
"安全考虑": "最基础的权限,但要注意敏感数据的访问控制"
},
"INSERT": {
"描述": "插入数据",
"适用对象": ["表", "列"],
"安全考虑": "可能导致数据污染,需要输入验证"
},
"UPDATE": {
"描述": "更新数据",
"适用对象": ["表", "列"],
"安全考虑": "可能导致数据篡改,需要严格控制"
},
"DELETE": {
"描述": "删除数据",
"适用对象": ["表"],
"安全考虑": "高风险权限,可能导致数据丢失"
}
},
"结构权限": {
"CREATE": {
"描述": "创建数据库和表",
"适用对象": ["数据库", "表"],
"安全考虑": "可能消耗系统资源,需要限制"
},
"ALTER": {
"描述": "修改表结构",
"适用对象": ["表"],
"安全考虑": "可能影响应用程序,需要变更管理"
},
"DROP": {
"描述": "删除数据库和表",
"适用对象": ["数据库", "表"],
"安全考虑": "极高风险权限,可能导致数据丢失"
},
"INDEX": {
"描述": "创建和删除索引",
"适用对象": ["表"],
"安全考虑": "可能影响性能,需要DBA审核"
}
},
"管理权限": {
"GRANT OPTION": {
"描述": "授予权限给其他用户",
"适用对象": ["所有对象"],
"安全考虑": "高风险权限,可能导致权限扩散"
},
"SUPER": {
"描述": "超级用户权限",
"适用对象": ["全局"],
"安全考虑": "最高权限,仅限DBA使用"
},
"PROCESS": {
"描述": "查看所有进程",
"适用对象": ["全局"],
"安全考虑": "可能泄露敏感信息"
},
"RELOAD": {
"描述": "重新加载权限表",
"适用对象": ["全局"],
"安全考虑": "可能影响系统稳定性"
}
},
"特殊权限": {
"FILE": {
"描述": "读写服务器文件",
"适用对象": ["全局"],
"安全考虑": "极高风险,可能导致系统入侵"
},
"LOCK TABLES": {
"描述": "锁定表",
"适用对象": ["数据库"],
"安全考虑": "可能影响系统性能"
},
"EXECUTE": {
"描述": "执行存储过程和函数",
"适用对象": ["存储过程", "函数"],
"安全考虑": "需要审核存储过程的安全性"
}
}
}
def show_privilege_guide(self):
"""显示权限指南"""
print("MySQL权限管理指南")
print("=" * 60)
for category, privs in self.privileges.items():
print(f"\n{category}:")
print("-" * 40)
for priv_name, details in privs.items():
print(f"\n权限: {priv_name}")
print(f"描述: {details['描述']}")
print(f"适用对象: {', '.join(details['适用对象'])}")
print(f"安全考虑: {details['安全考虑']}")
# 权限指南演示
priv_guide = MySQLPrivileges()
priv_guide.show_privilege_guide()
3. 权限审计与监控
-- 权限审计查询
-- 1. 查看所有用户及其权限
SELECT
User,
Host,
Select_priv,
Insert_priv,
Update_priv,
Delete_priv,
Create_priv,
Drop_priv,
Reload_priv,
Shutdown_priv,
Process_priv,
File_priv,
Grant_priv,
References_priv,
Index_priv,
Alter_priv,
Show_db_priv,
Super_priv,
Create_tmp_table_priv,
Lock_tables_priv,
Execute_priv,
Repl_slave_priv,
Repl_client_priv,
Create_view_priv,
Show_view_priv,
Create_routine_priv,
Alter_routine_priv,
Create_user_priv,
Event_priv,
Trigger_priv
FROM mysql.user
ORDER BY User, Host;
-- 2. 查看数据库级权限
SELECT
User,
Host,
Db,
Select_priv,
Insert_priv,
Update_priv,
Delete_priv,
Create_priv,
Drop_priv,
Grant_priv,
References_priv,
Index_priv,
Alter_priv,
Create_tmp_table_priv,
Lock_tables_priv,
Create_view_priv,
Show_view_priv,
Create_routine_priv,
Alter_routine_priv,
Execute_priv,
Event_priv,
Trigger_priv
FROM mysql.db
ORDER BY User, Host, Db;
-- 3. 查看表级权限
SELECT
User,
Host,
Db,
Table_name,
Table_priv,
Column_priv
FROM mysql.tables_priv
ORDER BY User, Host, Db, Table_name;
-- 4. 查看列级权限
SELECT
User,
Host,
Db,
Table_name,
Column_name,
Column_priv
FROM mysql.columns_priv
ORDER BY User, Host, Db, Table_name, Column_name;
-- 5. 查看存储过程权限
SELECT
User,
Host,
Db,
Routine_name,
Routine_type,
Proc_priv
FROM mysql.procs_priv
ORDER BY User, Host, Db, Routine_name;
-- 6. 查看具有特殊权限的用户
SELECT User, Host, 'SUPER' as Privilege
FROM mysql.user
WHERE Super_priv = 'Y'
UNION
SELECT User, Host, 'FILE' as Privilege
FROM mysql.user
WHERE File_priv = 'Y'
UNION
SELECT User, Host, 'PROCESS' as Privilege
FROM mysql.user
WHERE Process_priv = 'Y'
UNION
SELECT User, Host, 'RELOAD' as Privilege
FROM mysql.user
WHERE Reload_priv = 'Y'
ORDER BY User, Host, Privilege;
-- 7. 查看具有GRANT权限的用户
SELECT User, Host, 'GLOBAL' as Level
FROM mysql.user
WHERE Grant_priv = 'Y'
UNION
SELECT User, Host, CONCAT('DATABASE: ', Db) as Level
FROM mysql.db
WHERE Grant_priv = 'Y'
UNION
SELECT User, Host, CONCAT('TABLE: ', Db, '.', Table_name) as Level
FROM mysql.tables_priv
WHERE Table_priv LIKE '%Grant%'
ORDER BY User, Host, Level;
4. 权限审计脚本
#!/bin/bash
# MySQL权限审计脚本
echo "=== MySQL权限审计报告 ==="
echo "审计时间: $(date)"
echo
# 配置参数
MYSQL_USER="root"
MYSQL_PASS=""
AUDIT_RESULT="/tmp/mysql_privilege_audit_$(date +%Y%m%d_%H%M%S).log"
# 创建审计结果文件
exec > >(tee -a $AUDIT_RESULT)
exec 2>&1
echo "审计结果: $AUDIT_RESULT"
echo
# 1. 具有SUPER权限的用户
echo "1. 具有SUPER权限的用户:"
mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SELECT User, Host FROM mysql.user WHERE Super_priv = 'Y';" 2>/dev/null
echo
# 2. 具有FILE权限的用户
echo "2. 具有FILE权限的用户:"
mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SELECT User, Host FROM mysql.user WHERE File_priv = 'Y';" 2>/dev/null
echo
# 3. 具有GRANT权限的用户
echo "3. 具有GRANT权限的用户:"
mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SELECT User, Host FROM mysql.user WHERE Grant_priv = 'Y';" 2>/dev/null
echo
# 4. 空密码用户
echo "4. 空密码用户:"
mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SELECT User, Host FROM mysql.user WHERE authentication_string = '';" 2>/dev/null
echo
# 5. 远程root用户
echo "5. 远程root用户:"
mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SELECT User, Host FROM mysql.user WHERE User = 'root' AND Host NOT IN ('localhost', '127.0.0.1', '::1');" 2>/dev/null
echo
# 6. 用户连接统计
echo "6. 当前用户连接统计:"
mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SELECT user, host, COUNT(*) as connections FROM information_schema.processlist GROUP BY user, host ORDER BY connections DESC;" 2>/dev/null
echo
# 7. 权限变更历史(如果启用了审计日志)
echo "7. 最近的权限相关操作:"
if [ -f "/var/log/mysql/audit.log" ]; then
grep -i "grant\|revoke\|create user\|drop user" /var/log/mysql/audit.log | tail -10
else
echo "审计日志未启用或文件不存在"
fi
echo
echo "=== 权限审计完成 ==="
echo "详细结果已保存到: $AUDIT_RESULT"
10.3 SSL/TLS加密配置
10.3.1 SSL证书生成与配置
1. 生成SSL证书
#!/bin/bash
# MySQL SSL证书生成脚本
SSL_DIR="/etc/mysql/ssl"
CERT_DAYS=3650
# 创建SSL目录
sudo mkdir -p $SSL_DIR
cd $SSL_DIR
echo "=== 生成MySQL SSL证书 ==="
# 1. 生成CA私钥
echo "1. 生成CA私钥..."
sudo openssl genrsa 2048 > ca-key.pem
# 2. 生成CA证书
echo "2. 生成CA证书..."
sudo openssl req -new -x509 -nodes -days $CERT_DAYS -key ca-key.pem -out ca-cert.pem << EOF
CN
Beijing
Beijing
MySQL CA
MySQL CA
MySQL CA
ca@mysql.local
EOF
# 3. 生成服务器私钥
echo "3. 生成服务器私钥..."
sudo openssl req -newkey rsa:2048 -days $CERT_DAYS -nodes -keyout server-key.pem -out server-req.pem << EOF
CN
Beijing
Beijing
MySQL Server
MySQL Server
MySQL Server
server@mysql.local
EOF
# 4. 生成服务器证书
echo "4. 生成服务器证书..."
sudo openssl rsa -in server-key.pem -out server-key.pem
sudo openssl x509 -req -in server-req.pem -days $CERT_DAYS -CA ca-cert.pem -CAkey ca-key.pem -set_serial 01 -out server-cert.pem
# 5. 生成客户端私钥
echo "5. 生成客户端私钥..."
sudo openssl req -newkey rsa:2048 -days $CERT_DAYS -nodes -keyout client-key.pem -out client-req.pem << EOF
CN
Beijing
Beijing
MySQL Client
MySQL Client
MySQL Client
client@mysql.local
EOF
# 6. 生成客户端证书
echo "6. 生成客户端证书..."
sudo openssl rsa -in client-key.pem -out client-key.pem
sudo openssl x509 -req -in client-req.pem -days $CERT_DAYS -CA ca-cert.pem -CAkey ca-key.pem -set_serial 02 -out client-cert.pem
# 7. 验证证书
echo "7. 验证证书..."
sudo openssl verify -CAfile ca-cert.pem server-cert.pem client-cert.pem
# 8. 设置文件权限
echo "8. 设置文件权限..."
sudo chown mysql:mysql $SSL_DIR/*
sudo chmod 600 $SSL_DIR/*-key.pem
sudo chmod 644 $SSL_DIR/*-cert.pem $SSL_DIR/ca-cert.pem
# 9. 清理临时文件
echo "9. 清理临时文件..."
sudo rm -f $SSL_DIR/*-req.pem
echo "SSL证书生成完成!"
echo "证书位置: $SSL_DIR"
ls -la $SSL_DIR
2. MySQL SSL配置
# MySQL SSL配置 (my.cnf)
[mysqld]
# SSL基础配置
ssl-ca = /etc/mysql/ssl/ca-cert.pem
ssl-cert = /etc/mysql/ssl/server-cert.pem
ssl-key = /etc/mysql/ssl/server-key.pem
# 强制SSL连接(可选)
require_secure_transport = ON
# SSL密码套件配置
ssl-cipher = ECDHE-RSA-AES128-GCM-SHA256:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-RSA-AES128-SHA256:ECDHE-RSA-AES256-SHA384
# TLS版本配置
tls_version = TLSv1.2,TLSv1.3
[mysql]
# 客户端SSL配置
ssl-ca = /etc/mysql/ssl/ca-cert.pem
ssl-cert = /etc/mysql/ssl/client-cert.pem
ssl-key = /etc/mysql/ssl/client-key.pem
[mysqldump]
# mysqldump SSL配置
ssl-ca = /etc/mysql/ssl/ca-cert.pem
ssl-cert = /etc/mysql/ssl/client-cert.pem
ssl-key = /etc/mysql/ssl/client-key.pem
3. SSL连接测试
-- SSL连接状态检查
-- 1. 检查SSL是否启用
SHOW VARIABLES LIKE 'have_ssl';
SHOW VARIABLES LIKE 'ssl_%';
-- 2. 查看当前连接的SSL状态
SHOW STATUS LIKE 'Ssl_%';
-- 3. 查看SSL连接信息
SELECT
CONNECTION_ID() as connection_id,
USER() as current_user,
@@ssl_cipher as ssl_cipher,
@@ssl_version as ssl_version;
-- 4. 强制用户使用SSL
ALTER USER 'secure_user'@'%' REQUIRE SSL;
ALTER USER 'cert_user'@'%' REQUIRE X509;
ALTER USER 'specific_cert_user'@'%' REQUIRE ISSUER '/CN=MySQL CA';
-- 5. 查看用户SSL要求
SELECT User, Host, ssl_type, ssl_cipher, x509_issuer, x509_subject
FROM mysql.user
WHERE ssl_type != '';
4. SSL连接脚本
class MySQLSSLConnection:
def __init__(self, host='localhost', port=3306, user='root', password=''):
self.host = host
self.port = port
self.user = user
self.password = password
self.ssl_config = {
'ca': '/etc/mysql/ssl/ca-cert.pem',
'cert': '/etc/mysql/ssl/client-cert.pem',
'key': '/etc/mysql/ssl/client-key.pem'
}
def connect_with_ssl(self, verify_cert=True):
"""使用SSL连接MySQL"""
import mysql.connector
import ssl
try:
# SSL配置
ssl_config = self.ssl_config.copy()
if not verify_cert:
ssl_config['verify_cert'] = False
ssl_config['verify_identity'] = False
# 建立连接
connection = mysql.connector.connect(
host=self.host,
port=self.port,
user=self.user,
password=self.password,
ssl_disabled=False,
**ssl_config
)
print("SSL连接建立成功")
return connection
except Exception as e:
print(f"SSL连接失败: {e}")
return None
def check_ssl_status(self, connection):
"""检查SSL连接状态"""
if not connection:
return
try:
cursor = connection.cursor()
# 检查SSL状态
queries = [
"SHOW STATUS LIKE 'Ssl_cipher'",
"SHOW STATUS LIKE 'Ssl_version'",
"SHOW STATUS LIKE 'Ssl_verify_mode'",
"SHOW STATUS LIKE 'Ssl_verify_depth'"
]
print("SSL连接状态:")
print("-" * 30)
for query in queries:
cursor.execute(query)
result = cursor.fetchone()
if result:
print(f"{result[0]}: {result[1]}")
# 检查连接加密状态
cursor.execute("SELECT CONNECTION_ID(), @@ssl_cipher, @@ssl_version")
result = cursor.fetchone()
if result:
conn_id, cipher, version = result
print(f"\n连接ID: {conn_id}")
print(f"加密套件: {cipher}")
print(f"TLS版本: {version}")
except Exception as e:
print(f"检查SSL状态失败: {e}")
finally:
cursor.close()
def test_ssl_requirements(self, connection):
"""测试SSL要求"""
if not connection:
return
try:
cursor = connection.cursor()
# 查看用户SSL要求
cursor.execute("""
SELECT User, Host, ssl_type, ssl_cipher, x509_issuer, x509_subject
FROM mysql.user
WHERE ssl_type != '' OR ssl_cipher != '' OR x509_issuer != '' OR x509_subject != ''
""")
results = cursor.fetchall()
if results:
print("\n用户SSL要求:")
print("-" * 50)
print(f"{'用户':<15} {'主机':<20} {'SSL类型':<10} {'密码套件':<15}")
print("-" * 50)
for result in results:
user, host, ssl_type, ssl_cipher, issuer, subject = result
print(f"{user:<15} {host:<20} {ssl_type:<10} {ssl_cipher:<15}")
else:
print("\n没有用户设置SSL要求")
except Exception as e:
print(f"检查SSL要求失败: {e}")
finally:
cursor.close()
# SSL连接测试
ssl_conn = MySQLSSLConnection()
connection = ssl_conn.connect_with_ssl()
if connection:
ssl_conn.check_ssl_status(connection)
ssl_conn.test_ssl_requirements(connection)
connection.close()
10.6 防火墙与网络安全
10.6.1 MySQL防火墙配置
1. 企业版防火墙插件
-- MySQL企业版防火墙配置
-- 1. 安装防火墙插件
INSTALL PLUGIN mysql_firewall SONAME 'firewall.so';
INSTALL PLUGIN mysql_firewall_users SONAME 'firewall.so';
INSTALL PLUGIN mysql_firewall_whitelist SONAME 'firewall.so';
-- 2. 查看防火墙状态
SHOW PLUGINS;
SELECT PLUGIN_NAME, PLUGIN_STATUS FROM INFORMATION_SCHEMA.PLUGINS
WHERE PLUGIN_NAME LIKE '%firewall%';
-- 3. 启用防火墙
SET GLOBAL mysql_firewall_mode = ON;
SET GLOBAL mysql_firewall_trace = ON;
-- 4. 为用户启用防火墙
CALL mysql.sp_set_firewall_mode('app_user@localhost', 'RECORDING');
-- 5. 查看防火墙模式
SELECT * FROM mysql.firewall_users;
-- 6. 切换到保护模式
CALL mysql.sp_set_firewall_mode('app_user@localhost', 'PROTECTING');
-- 7. 查看防火墙规则
SELECT * FROM mysql.firewall_whitelist WHERE USERHOST = 'app_user@localhost';
-- 8. 手动添加防火墙规则
CALL mysql.sp_firewall_group_enlist('app_group', 'SELECT * FROM users WHERE id = ?');
CALL mysql.sp_firewall_group_enlist('app_group', 'UPDATE users SET last_login = NOW() WHERE id = ?');
-- 9. 查看被阻止的查询
SELECT * FROM performance_schema.events_statements_history
WHERE MESSAGE_TEXT LIKE '%Firewall%';
2. 自定义SQL防火墙
import re
import logging
from datetime import datetime, timedelta
from collections import defaultdict, Counter
class MySQLCustomFirewall:
def __init__(self):
self.blocked_patterns = [
# SQL注入模式
r"(?i)union\s+select",
r"(?i)or\s+1\s*=\s*1",
r"(?i)and\s+1\s*=\s*1",
r"(?i)drop\s+table",
r"(?i)delete\s+from\s+\w+\s+where\s+1\s*=\s*1",
r"(?i)update\s+\w+\s+set\s+.*\s+where\s+1\s*=\s*1",
# 危险函数
r"(?i)load_file\s*\(",
r"(?i)into\s+outfile",
r"(?i)into\s+dumpfile",
r"(?i)system\s*\(",
r"(?i)exec\s*\(",
# 信息泄露
r"(?i)information_schema\.(tables|columns|schemata)",
r"(?i)show\s+(tables|databases|columns)",
r"(?i)describe\s+\w+",
# 权限提升
r"(?i)grant\s+all",
r"(?i)grant\s+.*\s+with\s+grant\s+option",
r"(?i)create\s+user.*identified\s+by",
]
self.rate_limits = {
'queries_per_minute': 100,
'connections_per_minute': 10,
'failed_logins_per_minute': 5
}
self.user_activity = defaultdict(list)
self.blocked_queries = []
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('/var/log/mysql/firewall.log'),
logging.StreamHandler()
]
)
self.logger = logging.getLogger(__name__)
def is_sql_malicious(self, sql_query):
"""检查SQL查询是否恶意"""
sql_query = sql_query.strip()
# 检查恶意模式
for pattern in self.blocked_patterns:
if re.search(pattern, sql_query, re.IGNORECASE):
return True, f"匹配恶意模式: {pattern}"
# 检查查询长度(可能的缓冲区溢出)
if len(sql_query) > 10000:
return True, "查询长度过长"
# 检查嵌套查询深度
nested_count = sql_query.upper().count('SELECT')
if nested_count > 5:
return True, "嵌套查询过深"
# 检查注释(可能隐藏恶意代码)
comment_patterns = [r'/\*.*?\*/', r'--.*$', r'#.*$']
for pattern in comment_patterns:
if re.search(pattern, sql_query, re.MULTILINE):
# 允许正常注释,但记录可疑注释
if re.search(r'(?i)(union|select|drop|delete|update).*?(/\*|--|#)', sql_query):
return True, "可疑注释模式"
return False, ""
def check_rate_limit(self, user, host, action_type):
"""检查速率限制"""
user_key = f"{user}@{host}"
current_time = datetime.now()
cutoff_time = current_time - timedelta(minutes=1)
# 清理过期记录
self.user_activity[user_key] = [
(timestamp, action) for timestamp, action in self.user_activity[user_key]
if timestamp > cutoff_time
]
# 统计当前分钟内的活动
recent_actions = [action for timestamp, action in self.user_activity[user_key]]
action_count = recent_actions.count(action_type)
# 检查限制
limit_key = f"{action_type}_per_minute"
if limit_key in self.rate_limits:
if action_count >= self.rate_limits[limit_key]:
return False, f"超过速率限制: {action_count}/{self.rate_limits[limit_key]} {action_type}/分钟"
# 记录活动
self.user_activity[user_key].append((current_time, action_type))
return True, ""
def validate_query(self, user, host, sql_query, connection_id=None):
"""验证查询"""
validation_result = {
'allowed': True,
'reason': '',
'risk_level': 'low',
'timestamp': datetime.now()
}
# 检查SQL恶意模式
is_malicious, malicious_reason = self.is_sql_malicious(sql_query)
if is_malicious:
validation_result.update({
'allowed': False,
'reason': f"恶意SQL检测: {malicious_reason}",
'risk_level': 'high'
})
# 记录被阻止的查询
self.blocked_queries.append({
'user': user,
'host': host,
'sql': sql_query,
'reason': malicious_reason,
'timestamp': datetime.now(),
'connection_id': connection_id
})
self.logger.warning(
f"阻止恶意查询 - 用户: {user}@{host}, 原因: {malicious_reason}, SQL: {sql_query[:100]}..."
)
return validation_result
# 检查速率限制
rate_allowed, rate_reason = self.check_rate_limit(user, host, 'queries')
if not rate_allowed:
validation_result.update({
'allowed': False,
'reason': rate_reason,
'risk_level': 'medium'
})
self.logger.warning(
f"阻止查询(速率限制) - 用户: {user}@{host}, 原因: {rate_reason}"
)
return validation_result
# 查询通过验证
self.logger.info(f"允许查询 - 用户: {user}@{host}, SQL: {sql_query[:50]}...")
return validation_result
def validate_connection(self, user, host, password_hash=None):
"""验证连接"""
validation_result = {
'allowed': True,
'reason': '',
'risk_level': 'low',
'timestamp': datetime.now()
}
# 检查连接速率限制
rate_allowed, rate_reason = self.check_rate_limit(user, host, 'connections')
if not rate_allowed:
validation_result.update({
'allowed': False,
'reason': rate_reason,
'risk_level': 'medium'
})
self.logger.warning(
f"阻止连接(速率限制) - 用户: {user}@{host}, 原因: {rate_reason}"
)
return validation_result
# 检查可疑主机
suspicious_hosts = ['192.168.1.100', '10.0.0.50'] # 示例黑名单
if host in suspicious_hosts:
validation_result.update({
'allowed': False,
'reason': f"主机在黑名单中: {host}",
'risk_level': 'high'
})
self.logger.error(f"阻止连接(黑名单主机) - 用户: {user}@{host}")
return validation_result
self.logger.info(f"允许连接 - 用户: {user}@{host}")
return validation_result
def get_firewall_statistics(self, hours=24):
"""获取防火墙统计信息"""
cutoff_time = datetime.now() - timedelta(hours=hours)
# 统计被阻止的查询
recent_blocked = [
query for query in self.blocked_queries
if query['timestamp'] > cutoff_time
]
# 按原因分组
block_reasons = Counter(query['reason'] for query in recent_blocked)
# 按用户分组
blocked_users = Counter(f"{query['user']}@{query['host']}" for query in recent_blocked)
# 统计用户活动
total_queries = 0
active_users = set()
for user_key, activities in self.user_activity.items():
recent_activities = [
activity for timestamp, activity in activities
if timestamp > cutoff_time
]
if recent_activities:
active_users.add(user_key)
total_queries += len([a for a in recent_activities if a == 'queries'])
return {
'total_blocked_queries': len(recent_blocked),
'total_queries': total_queries,
'active_users': len(active_users),
'block_reasons': dict(block_reasons),
'blocked_users': dict(blocked_users),
'block_rate': len(recent_blocked) / max(total_queries, 1) * 100
}
def generate_firewall_report(self, hours=24):
"""生成防火墙报告"""
stats = self.get_firewall_statistics(hours)
print(f"\n=== MySQL防火墙报告 ===")
print(f"报告时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"统计时间范围: 最近{hours}小时")
print("=" * 40)
print(f"\n📊 总体统计:")
print(f" 总查询数: {stats['total_queries']}")
print(f" 被阻止查询: {stats['total_blocked_queries']}")
print(f" 阻止率: {stats['block_rate']:.2f}%")
print(f" 活跃用户: {stats['active_users']}")
if stats['block_reasons']:
print(f"\n🚫 阻止原因统计:")
for reason, count in stats['block_reasons'].items():
print(f" {reason}: {count} 次")
if stats['blocked_users']:
print(f"\n👤 被阻止用户统计:")
for user, count in stats['blocked_users'].items():
print(f" {user}: {count} 次")
# 安全建议
print(f"\n💡 安全建议:")
if stats['block_rate'] > 10:
print(" • 阻止率较高,可能存在攻击行为")
if stats['total_blocked_queries'] > 100:
print(" • 大量查询被阻止,建议检查攻击源")
if len(stats['blocked_users']) > 5:
print(" • 多个用户被阻止,可能需要调整防火墙规则")
return stats
# 防火墙使用示例
firewall = MySQLCustomFirewall()
# 测试恶意查询
malicious_queries = [
"SELECT * FROM users UNION SELECT * FROM admin_users",
"SELECT * FROM products WHERE id = 1 OR 1=1",
"DROP TABLE users; --",
"SELECT LOAD_FILE('/etc/passwd')"
]
for query in malicious_queries:
result = firewall.validate_query('test_user', '192.168.1.10', query)
print(f"查询: {query[:50]}...")
print(f"结果: {'允许' if result['allowed'] else '阻止'} - {result['reason']}")
print()
# 生成防火墙报告
report = firewall.generate_firewall_report()
10.6.2 网络层安全配置
1. iptables防火墙规则
#!/bin/bash
# MySQL网络安全配置脚本
echo "=== 配置MySQL网络安全 ==="
# MySQL端口
MYSQL_PORT=3306
# 允许的IP地址段
ALLOWED_NETWORKS=(
"192.168.1.0/24" # 内网
"10.0.0.0/8" # VPN网络
"172.16.0.0/12" # 私有网络
)
# 管理员IP
ADMIN_IPS=(
"192.168.1.100"
"10.0.0.50"
)
# 1. 清除现有规则
echo "清除现有iptables规则..."
iptables -F
iptables -X
iptables -t nat -F
iptables -t nat -X
iptables -t mangle -F
iptables -t mangle -X
# 2. 设置默认策略
echo "设置默认策略..."
iptables -P INPUT DROP
iptables -P FORWARD DROP
iptables -P OUTPUT ACCEPT
# 3. 允许本地回环
iptables -A INPUT -i lo -j ACCEPT
iptables -A OUTPUT -o lo -j ACCEPT
# 4. 允许已建立的连接
iptables -A INPUT -m state --state ESTABLISHED,RELATED -j ACCEPT
# 5. 允许SSH(管理员IP)
echo "配置SSH访问规则..."
for admin_ip in "${ADMIN_IPS[@]}"; do
iptables -A INPUT -p tcp -s $admin_ip --dport 22 -j ACCEPT
echo " 允许SSH访问: $admin_ip"
done
# 6. 配置MySQL访问规则
echo "配置MySQL访问规则..."
# 允许指定网络访问MySQL
for network in "${ALLOWED_NETWORKS[@]}"; do
iptables -A INPUT -p tcp -s $network --dport $MYSQL_PORT -j ACCEPT
echo " 允许MySQL访问: $network"
done
# 7. 防止DDoS攻击
echo "配置DDoS防护..."
# 限制新连接速率
iptables -A INPUT -p tcp --dport $MYSQL_PORT -m state --state NEW -m limit --limit 10/minute --limit-burst 5 -j ACCEPT
iptables -A INPUT -p tcp --dport $MYSQL_PORT -m state --state NEW -j DROP
# 防止SYN flood攻击
iptables -A INPUT -p tcp --syn -m limit --limit 1/s --limit-burst 3 -j ACCEPT
iptables -A INPUT -p tcp --syn -j DROP
# 8. 记录被拒绝的连接
echo "配置日志记录..."
iptables -A INPUT -p tcp --dport $MYSQL_PORT -j LOG --log-prefix "MySQL-REJECT: " --log-level 4
iptables -A INPUT -p tcp --dport $MYSQL_PORT -j REJECT
# 9. 保存规则
echo "保存iptables规则..."
if command -v iptables-save >/dev/null 2>&1; then
iptables-save > /etc/iptables/rules.v4
echo "规则已保存到 /etc/iptables/rules.v4"
fi
# 10. 显示当前规则
echo "\n当前iptables规则:"
iptables -L -n -v
echo "\n=== MySQL网络安全配置完成 ==="
2. 网络监控脚本
#!/bin/bash
# MySQL网络连接监控脚本
MYSQL_PORT=3306
LOG_FILE="/var/log/mysql/network_monitor.log"
ALERT_THRESHOLD=50
echo "=== MySQL网络监控启动 ==="
echo "监控端口: $MYSQL_PORT"
echo "日志文件: $LOG_FILE"
echo "告警阈值: $ALERT_THRESHOLD 连接/分钟"
echo
# 创建日志目录
mkdir -p $(dirname $LOG_FILE)
# 监控函数
monitor_connections() {
local timestamp=$(date '+%Y-%m-%d %H:%M:%S')
# 获取当前MySQL连接
local mysql_connections=$(netstat -an | grep ":$MYSQL_PORT" | grep ESTABLISHED | wc -l)
# 获取连接来源IP统计
local connection_ips=$(netstat -an | grep ":$MYSQL_PORT" | grep ESTABLISHED |
awk '{print $5}' | cut -d: -f1 | sort | uniq -c | sort -nr)
# 记录到日志
echo "[$timestamp] MySQL连接数: $mysql_connections" >> $LOG_FILE
# 检查是否超过阈值
if [ $mysql_connections -gt $ALERT_THRESHOLD ]; then
echo "[$timestamp] 警告: MySQL连接数过高 ($mysql_connections > $ALERT_THRESHOLD)" >> $LOG_FILE
# 发送告警
send_alert "MySQL连接数过高" "当前连接数: $mysql_connections"
fi
# 检查可疑连接
echo "$connection_ips" | while read count ip; do
if [ "$count" -gt 10 ]; then
echo "[$timestamp] 可疑IP: $ip 有 $count 个连接" >> $LOG_FILE
# 检查是否为已知恶意IP
check_malicious_ip "$ip" "$count"
fi
done
# 显示实时状态
echo "[$timestamp] 连接监控:"
echo " 总连接数: $mysql_connections"
echo " 连接来源统计:"
echo "$connection_ips" | head -5 | while read count ip; do
echo " $ip: $count 个连接"
done
echo
}
# 检查恶意IP
check_malicious_ip() {
local ip=$1
local count=$2
# 恶意IP黑名单(示例)
local blacklist=("192.168.1.100" "10.0.0.50")
for bad_ip in "${blacklist[@]}"; do
if [ "$ip" = "$bad_ip" ]; then
echo "[$(date '+%Y-%m-%d %H:%M:%S')] 检测到黑名单IP: $ip" >> $LOG_FILE
# 自动阻止IP
block_ip "$ip"
return
fi
done
# 检查连接数是否异常
if [ "$count" -gt 20 ]; then
echo "[$(date '+%Y-%m-%d %H:%M:%S')] 异常连接数IP: $ip ($count 连接)" >> $LOG_FILE
send_alert "异常连接数" "IP $ip 有 $count 个连接"
fi
}
# 阻止IP
block_ip() {
local ip=$1
echo "阻止恶意IP: $ip"
iptables -A INPUT -s $ip -j DROP
# 记录阻止操作
echo "[$(date '+%Y-%m-%d %H:%M:%S')] 已阻止IP: $ip" >> $LOG_FILE
# 发送告警
send_alert "IP已被阻止" "恶意IP $ip 已被自动阻止"
}
# 发送告警
send_alert() {
local alert_type="$1"
local alert_message="$2"
local timestamp=$(date '+%Y-%m-%d %H:%M:%S')
echo "[$timestamp] 告警: $alert_type - $alert_message"
# 发送邮件(需要配置邮件服务)
if command -v mail >/dev/null 2>&1; then
echo "时间: $timestamp\n类型: $alert_type\n详情: $alert_message" | \
mail -s "MySQL网络安全告警: $alert_type" "admin@example.com"
fi
# 记录到系统日志
logger "MySQL Network Alert: $alert_type - $alert_message"
}
# 清理旧日志
cleanup_logs() {
find $(dirname $LOG_FILE) -name "*.log" -mtime +7 -delete
}
# 主监控循环
while true; do
monitor_connections
# 每小时清理一次日志
if [ $(($(date +%M))) -eq 0 ]; then
cleanup_logs
fi
sleep 60
done
10.7 安全最佳实践
10.7.1 安全配置检查清单
class MySQLSecurityAudit:
def __init__(self, connection):
self.connection = connection
self.security_issues = []
self.recommendations = []
def check_user_security(self):
"""检查用户安全配置"""
try:
cursor = self.connection.cursor(dictionary=True)
print("\n🔐 用户安全检查:")
print("-" * 30)
# 1. 检查空密码用户
cursor.execute("""
SELECT User, Host FROM mysql.user
WHERE authentication_string = '' OR Password = ''
""")
empty_password_users = cursor.fetchall()
if empty_password_users:
issue = f"发现 {len(empty_password_users)} 个空密码用户"
self.security_issues.append(issue)
print(f" ❌ {issue}")
for user in empty_password_users:
print(f" - {user['User']}@{user['Host']}")
self.recommendations.append("为所有用户设置强密码")
else:
print(" ✅ 未发现空密码用户")
# 2. 检查默认用户
cursor.execute("""
SELECT User, Host FROM mysql.user
WHERE User IN ('', 'root', 'test', 'guest', 'admin')
""")
default_users = cursor.fetchall()
risky_users = [u for u in default_users if u['User'] in ['', 'test', 'guest']]
if risky_users:
issue = f"发现 {len(risky_users)} 个危险默认用户"
self.security_issues.append(issue)
print(f" ❌ {issue}")
for user in risky_users:
print(f" - {user['User']}@{user['Host']}")
self.recommendations.append("删除不必要的默认用户")
else:
print(" ✅ 未发现危险默认用户")
# 3. 检查远程root用户
cursor.execute("""
SELECT User, Host FROM mysql.user
WHERE User = 'root' AND Host != 'localhost' AND Host != '127.0.0.1'
""")
remote_root_users = cursor.fetchall()
if remote_root_users:
issue = f"发现 {len(remote_root_users)} 个远程root用户"
self.security_issues.append(issue)
print(f" ❌ {issue}")
for user in remote_root_users:
print(f" - {user['User']}@{user['Host']}")
self.recommendations.append("禁用远程root登录")
else:
print(" ✅ 未发现远程root用户")
# 4. 检查过度权限用户
cursor.execute("""
SELECT User, Host FROM mysql.user
WHERE Super_priv = 'Y' OR File_priv = 'Y' OR Process_priv = 'Y'
""")
privileged_users = cursor.fetchall()
if len(privileged_users) > 2: # 通常只有root和备份用户需要超级权限
issue = f"发现 {len(privileged_users)} 个高权限用户(可能过多)"
self.security_issues.append(issue)
print(f" ⚠️ {issue}")
self.recommendations.append("审查用户权限,遵循最小权限原则")
else:
print(f" ✅ 高权限用户数量合理 ({len(privileged_users)} 个)")
except Exception as e:
print(f"用户安全检查失败: {e}")
finally:
cursor.close()
def check_configuration_security(self):
"""检查配置安全"""
try:
cursor = self.connection.cursor(dictionary=True)
print("\n⚙️ 配置安全检查:")
print("-" * 30)
# 1. 检查SSL配置
cursor.execute("SHOW VARIABLES LIKE 'have_ssl'")
ssl_status = cursor.fetchone()
if ssl_status and ssl_status['Value'] == 'YES':
print(" ✅ SSL已启用")
else:
issue = "SSL未启用"
self.security_issues.append(issue)
print(f" ❌ {issue}")
self.recommendations.append("启用SSL加密连接")
# 2. 检查日志配置
cursor.execute("SHOW VARIABLES LIKE 'general_log'")
general_log = cursor.fetchone()
cursor.execute("SHOW VARIABLES LIKE 'log_bin'")
binary_log = cursor.fetchone()
if general_log and general_log['Value'] == 'ON':
print(" ✅ 通用日志已启用")
else:
print(" ⚠️ 通用日志未启用")
self.recommendations.append("考虑启用通用日志以便审计")
if binary_log and binary_log['Value'] == 'ON':
print(" ✅ 二进制日志已启用")
else:
print(" ⚠️ 二进制日志未启用")
self.recommendations.append("启用二进制日志以便数据恢复")
# 3. 检查网络配置
cursor.execute("SHOW VARIABLES LIKE 'bind_address'")
bind_address = cursor.fetchone()
if bind_address and bind_address['Value'] not in ['0.0.0.0', '*']:
print(f" ✅ 绑定地址配置安全: {bind_address['Value']}")
else:
issue = "MySQL绑定到所有网络接口"
self.security_issues.append(issue)
print(f" ❌ {issue}")
self.recommendations.append("配置bind_address限制网络访问")
# 4. 检查密码验证插件
cursor.execute("SHOW VARIABLES LIKE 'validate_password%'")
password_validation = cursor.fetchall()
if password_validation:
print(" ✅ 密码验证插件已配置")
for var in password_validation:
if var['Variable_name'] == 'validate_password.policy':
print(f" 策略: {var['Value']}")
else:
issue = "密码验证插件未启用"
self.security_issues.append(issue)
print(f" ❌ {issue}")
self.recommendations.append("启用密码验证插件强化密码策略")
except Exception as e:
print(f"配置安全检查失败: {e}")
finally:
cursor.close()
def check_database_security(self):
"""检查数据库安全"""
try:
cursor = self.connection.cursor(dictionary=True)
print("\n🗄️ 数据库安全检查:")
print("-" * 30)
# 1. 检查test数据库
cursor.execute("SHOW DATABASES LIKE 'test'")
test_db = cursor.fetchone()
if test_db:
issue = "test数据库仍然存在"
self.security_issues.append(issue)
print(f" ❌ {issue}")
self.recommendations.append("删除test数据库")
else:
print(" ✅ test数据库已删除")
# 2. 检查数据库权限
cursor.execute("""
SELECT DISTINCT User, Host, Db FROM mysql.db
WHERE Db = 'mysql' AND User != 'root'
""")
mysql_db_access = cursor.fetchall()
if mysql_db_access:
issue = f"发现 {len(mysql_db_access)} 个非root用户可访问mysql数据库"
self.security_issues.append(issue)
print(f" ❌ {issue}")
for access in mysql_db_access:
print(f" - {access['User']}@{access['Host']}")
self.recommendations.append("限制对mysql系统数据库的访问")
else:
print(" ✅ mysql数据库访问权限配置安全")
# 3. 检查表加密状态
cursor.execute("""
SELECT TABLE_SCHEMA, TABLE_NAME, CREATE_OPTIONS
FROM information_schema.TABLES
WHERE TABLE_TYPE = 'BASE TABLE'
AND TABLE_SCHEMA NOT IN ('information_schema', 'performance_schema', 'mysql', 'sys')
LIMIT 10
""")
tables = cursor.fetchall()
encrypted_tables = [t for t in tables if 'ENCRYPTION' in str(t.get('CREATE_OPTIONS', ''))]
if encrypted_tables:
print(f" ✅ 发现 {len(encrypted_tables)} 个加密表")
else:
print(" ⚠️ 未发现加密表")
self.recommendations.append("考虑对敏感数据表启用加密")
except Exception as e:
print(f"数据库安全检查失败: {e}")
finally:
cursor.close()
def generate_security_report(self):
"""生成安全报告"""
print("\n=== MySQL安全审计报告 ===")
print(f"审计时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 50)
# 执行所有检查
self.check_user_security()
self.check_configuration_security()
self.check_database_security()
# 汇总结果
print("\n📋 安全问题汇总:")
print("-" * 30)
if self.security_issues:
print(f"发现 {len(self.security_issues)} 个安全问题:")
for i, issue in enumerate(self.security_issues, 1):
print(f" {i}. {issue}")
else:
print("✅ 未发现严重安全问题")
print("\n💡 安全建议:")
print("-" * 30)
if self.recommendations:
for i, rec in enumerate(self.recommendations, 1):
print(f" {i}. {rec}")
else:
print("✅ 当前配置符合安全最佳实践")
# 安全评分
total_checks = 10 # 总检查项数
security_score = max(0, (total_checks - len(self.security_issues)) / total_checks * 100)
print(f"\n🏆 安全评分: {security_score:.1f}/100")
if security_score >= 90:
print("安全等级: 优秀 ✅")
elif security_score >= 70:
print("安全等级: 良好 ⚠️")
elif security_score >= 50:
print("安全等级: 一般 ⚠️")
else:
print("安全等级: 危险 ❌")
return {
'security_score': security_score,
'issues_count': len(self.security_issues),
'recommendations_count': len(self.recommendations),
'issues': self.security_issues,
'recommendations': self.recommendations
}
# 安全审计使用示例
import mysql.connector
connection = mysql.connector.connect(
host='localhost',
user='root',
password='password'
)
audit = MySQLSecurityAudit(connection)
report = audit.generate_security_report()
connection.close()
10.7.2 安全维护计划
class MySQLSecurityMaintenance:
def __init__(self):
self.maintenance_tasks = {
'daily': [
'检查审计日志异常',
'监控失败登录尝试',
'检查系统资源使用',
'验证备份完整性'
],
'weekly': [
'更新密码策略',
'审查用户权限',
'检查SSL证书有效期',
'分析安全日志趋势',
'测试灾难恢复流程'
],
'monthly': [
'全面安全审计',
'更新安全补丁',
'权限清理和优化',
'安全配置基线检查',
'渗透测试评估'
],
'quarterly': [
'安全策略评估',
'员工安全培训',
'第三方安全评估',
'灾难恢复演练',
'合规性检查'
]
}
def generate_maintenance_schedule(self):
"""生成维护计划"""
print("\n=== MySQL安全维护计划 ===")
print(f"制定时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 50)
for frequency, tasks in self.maintenance_tasks.items():
print(f"\n📅 {frequency.upper()} 维护任务:")
print("-" * 30)
for i, task in enumerate(tasks, 1):
print(f" {i}. {task}")
print("\n💡 维护建议:")
print("-" * 30)
print(" • 建立自动化监控和告警机制")
print(" • 定期备份和测试恢复流程")
print(" • 保持MySQL版本更新")
print(" • 建立安全事件响应流程")
print(" • 定期进行安全培训")
return self.maintenance_tasks
# 维护计划示例
maintenance = MySQLSecurityMaintenance()
schedule = maintenance.generate_maintenance_schedule()
10.8 总结
本章详细介绍了MySQL的安全与权限管理,涵盖了以下核心内容:
10.8.1 核心要点回顾
安全基础
- 安全威胁识别与风险评估
- 安全配置基线建立
- 系统安全加固措施
用户管理与权限控制
- 用户账户生命周期管理
- 细粒度权限分配
- 权限审计与监控
SSL/TLS加密
- 传输层安全配置
- 证书管理与轮换
- 强制加密连接
数据加密
- 透明数据加密(TDE)
- 应用层字段加密
- 密钥管理策略
审计日志与监控
- 审计日志配置与分析
- 实时安全监控
- 异常行为检测
防火墙与网络安全
- MySQL防火墙配置
- 网络层访问控制
- DDoS防护措施
安全最佳实践
- 安全配置检查清单
- 定期安全审计
- 维护计划制定
10.8.2 实施建议
分阶段实施
- 第一阶段:基础安全配置
- 第二阶段:权限管理优化
- 第三阶段:加密和审计
- 第四阶段:高级防护措施
持续改进
- 定期安全评估
- 及时更新安全策略
- 跟踪安全威胁趋势
- 优化安全配置
团队协作
- 建立安全责任制
- 定期安全培训
- 制定应急响应流程
- 加强安全意识
10.8.3 下一步学习方向
掌握了MySQL安全与权限管理后,建议继续学习:
- 第11章:MySQL备份与恢复 - 学习数据保护和灾难恢复
- 第12章:MySQL高可用架构 - 了解集群和复制技术
- 第13章:MySQL运维自动化 - 掌握自动化运维工具
- 第14章:MySQL故障排除 - 学习问题诊断和解决
通过本章的学习,您应该能够: - 建立完整的MySQL安全体系 - 实施有效的权限管理策略 - 配置数据加密和传输安全 - 建立安全监控和审计机制 - 制定安全维护和应急响应计划
安全是一个持续的过程,需要不断学习新的威胁和防护技术,保持对安全最佳实践的关注和应用。
10.3.2 传输加密最佳实践
1. 加密配置优化
-- 传输加密配置优化
-- 1. 设置强制SSL连接
SET GLOBAL require_secure_transport = ON;
-- 2. 配置TLS版本
SET GLOBAL tls_version = 'TLSv1.2,TLSv1.3';
-- 3. 查看支持的密码套件
SHOW STATUS LIKE 'Ssl_cipher_list';
-- 4. 为不同用户设置不同的SSL要求
-- 普通SSL连接
CREATE USER 'ssl_user'@'%' IDENTIFIED BY 'password' REQUIRE SSL;
-- 需要客户端证书
CREATE USER 'cert_user'@'%' IDENTIFIED BY 'password' REQUIRE X509;
-- 指定证书颁发者
CREATE USER 'issuer_user'@'%' IDENTIFIED BY 'password'
REQUIRE ISSUER '/C=CN/ST=Beijing/L=Beijing/O=MySQL CA/CN=MySQL CA';
-- 指定证书主题
CREATE USER 'subject_user'@'%' IDENTIFIED BY 'password'
REQUIRE SUBJECT '/C=CN/ST=Beijing/L=Beijing/O=MySQL Client/CN=MySQL Client';
-- 指定密码套件
CREATE USER 'cipher_user'@'%' IDENTIFIED BY 'password'
REQUIRE CIPHER 'ECDHE-RSA-AES256-GCM-SHA384';
-- 组合要求
CREATE USER 'secure_user'@'%' IDENTIFIED BY 'password'
REQUIRE SSL AND X509 AND CIPHER 'ECDHE-RSA-AES256-GCM-SHA384';
2. SSL监控脚本
#!/bin/bash
# MySQL SSL连接监控脚本
echo "=== MySQL SSL连接监控 ==="
echo "监控时间: $(date)"
echo
# 配置参数
MYSQL_USER="root"
MYSQL_PASS=""
MONITOR_RESULT="/tmp/mysql_ssl_monitor_$(date +%Y%m%d_%H%M%S).log"
# 创建监控结果文件
exec > >(tee -a $MONITOR_RESULT)
exec 2>&1
echo "监控结果: $MONITOR_RESULT"
echo
# 1. SSL状态检查
echo "1. SSL状态检查:"
mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SHOW VARIABLES LIKE 'have_ssl';" 2>/dev/null
mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SHOW VARIABLES LIKE 'ssl_%';" 2>/dev/null
echo
# 2. 当前SSL连接统计
echo "2. 当前SSL连接统计:"
mysql -u$MYSQL_USER -p$MYSQL_PASS -e "SHOW STATUS LIKE 'Ssl_%';" 2>/dev/null
echo
# 3. 活跃连接的SSL状态
echo "3. 活跃连接的SSL状态:"
mysql -u$MYSQL_USER -p$MYSQL_PASS -e "
SELECT
ID,
USER,
HOST,
DB,
COMMAND,
TIME,
STATE,
CASE
WHEN CONNECTION_TYPE = 'SSL/TLS' THEN 'SSL'
ELSE 'Non-SSL'
END AS CONNECTION_TYPE
FROM information_schema.PROCESSLIST
WHERE USER != 'system user'
ORDER BY CONNECTION_TYPE, TIME DESC;
" 2>/dev/null
echo
# 4. SSL证书信息
echo "4. SSL证书信息:"
if [ -f "/etc/mysql/ssl/server-cert.pem" ]; then
echo "服务器证书信息:"
openssl x509 -in /etc/mysql/ssl/server-cert.pem -text -noout | grep -E "Subject:|Issuer:|Not Before:|Not After:"
echo
# 检查证书过期时间
CERT_EXPIRY=$(openssl x509 -in /etc/mysql/ssl/server-cert.pem -enddate -noout | cut -d= -f2)
EXPIRY_TIMESTAMP=$(date -d "$CERT_EXPIRY" +%s)
CURRENT_TIMESTAMP=$(date +%s)
DAYS_TO_EXPIRY=$(( (EXPIRY_TIMESTAMP - CURRENT_TIMESTAMP) / 86400 ))
echo "证书过期检查:"
echo "过期时间: $CERT_EXPIRY"
echo "剩余天数: $DAYS_TO_EXPIRY 天"
if [ $DAYS_TO_EXPIRY -lt 30 ]; then
echo "⚠️ 证书将在30天内过期,请及时更新!"
elif [ $DAYS_TO_EXPIRY -lt 7 ]; then
echo "❌ 证书将在7天内过期,请立即更新!"
else
echo "✅ 证书有效期充足"
fi
else
echo "❌ 服务器证书文件不存在"
fi
echo
# 5. 非SSL连接警告
echo "5. 非SSL连接检查:"
NON_SSL_CONNECTIONS=$(mysql -u$MYSQL_USER -p$MYSQL_PASS -e "
SELECT COUNT(*) as non_ssl_count
FROM information_schema.PROCESSLIST
WHERE USER != 'system user' AND CONNECTION_TYPE != 'SSL/TLS';
" 2>/dev/null | tail -1)
if [ "$NON_SSL_CONNECTIONS" -gt 0 ]; then
echo "⚠️ 发现 $NON_SSL_CONNECTIONS 个非SSL连接"
mysql -u$MYSQL_USER -p$MYSQL_PASS -e "
SELECT ID, USER, HOST, DB, CONNECTION_TYPE
FROM information_schema.PROCESSLIST
WHERE USER != 'system user' AND CONNECTION_TYPE != 'SSL/TLS';
" 2>/dev/null
else
echo "✅ 所有连接都使用SSL加密"
fi
echo
echo "=== SSL监控完成 ==="
echo "详细结果已保存到: $MONITOR_RESULT"
10.4 数据加密
10.4.1 透明数据加密(TDE)
1. InnoDB表空间加密
-- InnoDB表空间加密配置
-- 1. 启用加密插件
INSTALL PLUGIN keyring_file SONAME 'keyring_file.so';
-- 2. 查看加密状态
SHOW PLUGINS;
SELECT PLUGIN_NAME, PLUGIN_STATUS FROM INFORMATION_SCHEMA.PLUGINS
WHERE PLUGIN_NAME LIKE 'keyring%';
-- 3. 创建加密表
CREATE TABLE encrypted_users (
id INT PRIMARY KEY AUTO_INCREMENT,
username VARCHAR(50) NOT NULL,
email VARCHAR(100),
password_hash VARCHAR(255),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB ENCRYPTION='Y';
-- 4. 修改现有表启用加密
ALTER TABLE users ENCRYPTION='Y';
-- 5. 禁用表加密
ALTER TABLE users ENCRYPTION='N';
-- 6. 查看表加密状态
SELECT
TABLE_SCHEMA,
TABLE_NAME,
CREATE_OPTIONS
FROM INFORMATION_SCHEMA.TABLES
WHERE CREATE_OPTIONS LIKE '%ENCRYPTION%';
-- 7. 设置默认加密
SET GLOBAL default_table_encryption = ON;
-- 8. 查看加密相关变量
SHOW VARIABLES LIKE '%encrypt%';
SHOW VARIABLES LIKE '%keyring%';
2. 二进制日志加密
-- 二进制日志加密配置
-- 1. 启用二进制日志加密
SET GLOBAL binlog_encryption = ON;
-- 2. 查看二进制日志加密状态
SHOW VARIABLES LIKE 'binlog_encryption';
-- 3. 轮换二进制日志加密密钥
ALTER INSTANCE ROTATE BINLOG MASTER KEY;
-- 4. 查看加密密钥信息
SELECT * FROM performance_schema.keyring_keys;
3. 加密配置文件
# MySQL加密配置 (my.cnf)
[mysqld]
# Keyring插件配置
early-plugin-load = keyring_file.so
keyring_file_data = /var/lib/mysql-keyring/keyring
# 表空间加密
default_table_encryption = ON
innodb_redo_log_encrypt = ON
innodb_undo_log_encrypt = ON
# 二进制日志加密
binlog_encryption = ON
binlog_rotate_encryption_master_key_at_startup = ON
# 临时文件加密
big_tables = 1
tmp_table_size = 32M
max_heap_table_size = 32M
4. 加密管理脚本
class MySQLEncryptionManager:
def __init__(self, host='localhost', user='root', password=''):
self.host = host
self.user = user
self.password = password
self.connection = None
def connect(self):
"""连接到MySQL"""
import mysql.connector
try:
self.connection = mysql.connector.connect(
host=self.host,
user=self.user,
password=self.password
)
return True
except Exception as e:
print(f"连接失败: {e}")
return False
def check_encryption_support(self):
"""检查加密支持"""
if not self.connection:
return False
try:
cursor = self.connection.cursor()
print("加密支持检查:")
print("-" * 40)
# 检查keyring插件
cursor.execute("""
SELECT PLUGIN_NAME, PLUGIN_STATUS
FROM INFORMATION_SCHEMA.PLUGINS
WHERE PLUGIN_NAME LIKE 'keyring%'
""")
plugins = cursor.fetchall()
if plugins:
print("Keyring插件状态:")
for plugin_name, status in plugins:
print(f" {plugin_name}: {status}")
else:
print("❌ 未安装keyring插件")
# 检查加密变量
encryption_vars = [
'default_table_encryption',
'binlog_encryption',
'innodb_redo_log_encrypt',
'innodb_undo_log_encrypt'
]
print("\n加密配置状态:")
for var in encryption_vars:
cursor.execute(f"SHOW VARIABLES LIKE '{var}'")
result = cursor.fetchone()
if result:
print(f" {result[0]}: {result[1]}")
return True
except Exception as e:
print(f"检查加密支持失败: {e}")
return False
finally:
cursor.close()
def list_encrypted_tables(self):
"""列出加密表"""
if not self.connection:
return []
try:
cursor = self.connection.cursor()
cursor.execute("""
SELECT
TABLE_SCHEMA,
TABLE_NAME,
CREATE_OPTIONS
FROM INFORMATION_SCHEMA.TABLES
WHERE CREATE_OPTIONS LIKE '%ENCRYPTION%'
ORDER BY TABLE_SCHEMA, TABLE_NAME
""")
tables = cursor.fetchall()
if tables:
print("加密表列表:")
print("-" * 60)
print(f"{'数据库':<20} {'表名':<25} {'加密选项':<15}")
print("-" * 60)
for schema, table, options in tables:
print(f"{schema:<20} {table:<25} {options:<15}")
else:
print("未找到加密表")
return tables
except Exception as e:
print(f"查询加密表失败: {e}")
return []
finally:
cursor.close()
def encrypt_table(self, database, table):
"""加密表"""
if not self.connection:
return False
try:
cursor = self.connection.cursor()
# 检查表是否存在
cursor.execute(f"USE {database}")
cursor.execute(f"SHOW TABLES LIKE '{table}'")
if not cursor.fetchone():
print(f"表 {database}.{table} 不存在")
return False
# 启用表加密
alter_sql = f"ALTER TABLE {database}.{table} ENCRYPTION='Y'"
cursor.execute(alter_sql)
self.connection.commit()
print(f"表 {database}.{table} 加密成功")
return True
except Exception as e:
print(f"加密表失败: {e}")
return False
finally:
cursor.close()
def decrypt_table(self, database, table):
"""解密表"""
if not self.connection:
return False
try:
cursor = self.connection.cursor()
# 禁用表加密
alter_sql = f"ALTER TABLE {database}.{table} ENCRYPTION='N'"
cursor.execute(alter_sql)
self.connection.commit()
print(f"表 {database}.{table} 解密成功")
return True
except Exception as e:
print(f"解密表失败: {e}")
return False
finally:
cursor.close()
def rotate_master_key(self):
"""轮换主密钥"""
if not self.connection:
return False
try:
cursor = self.connection.cursor()
# 轮换二进制日志主密钥
cursor.execute("ALTER INSTANCE ROTATE BINLOG MASTER KEY")
# 轮换InnoDB主密钥
cursor.execute("ALTER INSTANCE ROTATE INNODB MASTER KEY")
self.connection.commit()
print("主密钥轮换成功")
return True
except Exception as e:
print(f"轮换主密钥失败: {e}")
return False
finally:
cursor.close()
def show_keyring_keys(self):
"""显示密钥环密钥"""
if not self.connection:
return
try:
cursor = self.connection.cursor()
cursor.execute("SELECT * FROM performance_schema.keyring_keys")
keys = cursor.fetchall()
if keys:
print("密钥环密钥:")
print("-" * 50)
print(f"{'密钥ID':<30} {'密钥所有者':<20}")
print("-" * 50)
for key_id, key_owner in keys:
print(f"{key_id:<30} {key_owner:<20}")
else:
print("密钥环中没有密钥")
except Exception as e:
print(f"查询密钥失败: {e}")
finally:
cursor.close()
def close(self):
"""关闭连接"""
if self.connection:
self.connection.close()
# 加密管理演示
enc_manager = MySQLEncryptionManager()
if enc_manager.connect():
enc_manager.check_encryption_support()
enc_manager.list_encrypted_tables()
enc_manager.show_keyring_keys()
enc_manager.close()
10.4.2 应用层数据加密
1. 字段级加密
-- 字段级加密函数
-- 1. 使用AES加密函数
SELECT AES_ENCRYPT('sensitive_data', 'encryption_key') as encrypted_data;
SELECT AES_DECRYPT(encrypted_column, 'encryption_key') as decrypted_data FROM table_name;
-- 2. 创建带加密字段的表
CREATE TABLE user_profiles (
id INT PRIMARY KEY AUTO_INCREMENT,
username VARCHAR(50) NOT NULL,
email VARCHAR(100),
-- 加密存储敏感信息
encrypted_ssn VARBINARY(255),
encrypted_phone VARBINARY(255),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
INDEX idx_username (username)
);
-- 3. 插入加密数据
INSERT INTO user_profiles (username, email, encrypted_ssn, encrypted_phone)
VALUES (
'john_doe',
'john@example.com',
AES_ENCRYPT('123-45-6789', SHA2('encryption_key_ssn', 256)),
AES_ENCRYPT('555-1234', SHA2('encryption_key_phone', 256))
);
-- 4. 查询解密数据
SELECT
id,
username,
email,
AES_DECRYPT(encrypted_ssn, SHA2('encryption_key_ssn', 256)) as ssn,
AES_DECRYPT(encrypted_phone, SHA2('encryption_key_phone', 256)) as phone
FROM user_profiles
WHERE username = 'john_doe';
-- 5. 创建加密视图
CREATE VIEW user_profiles_decrypted AS
SELECT
id,
username,
email,
CAST(AES_DECRYPT(encrypted_ssn, SHA2('encryption_key_ssn', 256)) AS CHAR) as ssn,
CAST(AES_DECRYPT(encrypted_phone, SHA2('encryption_key_phone', 256)) AS CHAR) as phone,
created_at
FROM user_profiles;
2. 加密函数封装
class MySQLFieldEncryption:
def __init__(self, connection, master_key):
self.connection = connection
self.master_key = master_key
def generate_field_key(self, field_name):
"""为特定字段生成加密密钥"""
import hashlib
combined = f"{self.master_key}_{field_name}"
return hashlib.sha256(combined.encode()).hexdigest()
def encrypt_field(self, data, field_name):
"""加密字段数据"""
if not data:
return None
try:
cursor = self.connection.cursor()
field_key = self.generate_field_key(field_name)
# 使用MySQL的AES_ENCRYPT函数
cursor.execute(
"SELECT AES_ENCRYPT(%s, %s) as encrypted_data",
(data, field_key)
)
result = cursor.fetchone()
return result[0] if result else None
except Exception as e:
print(f"加密失败: {e}")
return None
finally:
cursor.close()
def decrypt_field(self, encrypted_data, field_name):
"""解密字段数据"""
if not encrypted_data:
return None
try:
cursor = self.connection.cursor()
field_key = self.generate_field_key(field_name)
# 使用MySQL的AES_DECRYPT函数
cursor.execute(
"SELECT AES_DECRYPT(%s, %s) as decrypted_data",
(encrypted_data, field_key)
)
result = cursor.fetchone()
if result and result[0]:
return result[0].decode('utf-8')
return None
except Exception as e:
print(f"解密失败: {e}")
return None
finally:
cursor.close()
def create_encrypted_table(self, table_name, schema):
"""创建带加密字段的表"""
try:
cursor = self.connection.cursor()
# 构建CREATE TABLE语句
columns = []
for col_name, col_def in schema.items():
if col_def.get('encrypted', False):
# 加密字段使用VARBINARY类型
columns.append(f"{col_name} VARBINARY(255)")
else:
columns.append(f"{col_name} {col_def['type']}")
create_sql = f"CREATE TABLE {table_name} ({', '.join(columns)})"
cursor.execute(create_sql)
self.connection.commit()
print(f"加密表 {table_name} 创建成功")
return True
except Exception as e:
print(f"创建加密表失败: {e}")
return False
finally:
cursor.close()
def insert_encrypted_data(self, table_name, data, schema):
"""插入加密数据"""
try:
cursor = self.connection.cursor()
# 处理加密字段
processed_data = {}
for field_name, value in data.items():
if schema.get(field_name, {}).get('encrypted', False):
# 加密敏感字段
processed_data[field_name] = self.encrypt_field(value, field_name)
else:
processed_data[field_name] = value
# 构建INSERT语句
columns = list(processed_data.keys())
placeholders = ', '.join(['%s'] * len(columns))
values = list(processed_data.values())
insert_sql = f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES ({placeholders})"
cursor.execute(insert_sql, values)
self.connection.commit()
print(f"加密数据插入成功")
return cursor.lastrowid
except Exception as e:
print(f"插入加密数据失败: {e}")
return None
finally:
cursor.close()
def select_decrypted_data(self, table_name, schema, where_clause=None, params=None):
"""查询并解密数据"""
try:
cursor = self.connection.cursor(dictionary=True)
# 构建SELECT语句
select_sql = f"SELECT * FROM {table_name}"
if where_clause:
select_sql += f" WHERE {where_clause}"
cursor.execute(select_sql, params or [])
results = cursor.fetchall()
# 解密敏感字段
decrypted_results = []
for row in results:
decrypted_row = {}
for field_name, value in row.items():
if schema.get(field_name, {}).get('encrypted', False):
# 解密敏感字段
decrypted_row[field_name] = self.decrypt_field(value, field_name)
else:
decrypted_row[field_name] = value
decrypted_results.append(decrypted_row)
return decrypted_results
except Exception as e:
print(f"查询解密数据失败: {e}")
return []
finally:
cursor.close()
# 字段加密使用示例
import mysql.connector
# 连接数据库
connection = mysql.connector.connect(
host='localhost',
user='root',
password='password',
database='test_db'
)
# 创建加密管理器
encryption = MySQLFieldEncryption(connection, 'master_encryption_key_2024')
# 定义表结构
user_schema = {
'id': {'type': 'INT PRIMARY KEY AUTO_INCREMENT', 'encrypted': False},
'username': {'type': 'VARCHAR(50) NOT NULL', 'encrypted': False},
'email': {'type': 'VARCHAR(100)', 'encrypted': False},
'ssn': {'type': 'VARCHAR(20)', 'encrypted': True},
'phone': {'type': 'VARCHAR(20)', 'encrypted': True},
'created_at': {'type': 'TIMESTAMP DEFAULT CURRENT_TIMESTAMP', 'encrypted': False}
}
# 创建加密表
encryption.create_encrypted_table('secure_users', user_schema)
# 插入加密数据
user_data = {
'username': 'john_doe',
'email': 'john@example.com',
'ssn': '123-45-6789',
'phone': '555-1234'
}
user_id = encryption.insert_encrypted_data('secure_users', user_data, user_schema)
# 查询解密数据
results = encryption.select_decrypted_data(
'secure_users',
user_schema,
'username = %s',
['john_doe']
)
for user in results:
print(f"用户: {user['username']}, SSN: {user['ssn']}, 电话: {user['phone']}")
connection.close()
10.5 审计日志与监控
10.5.1 MySQL审计日志配置
1. 审计日志插件安装
-- 安装审计日志插件
-- 1. 安装audit_log插件(MySQL Enterprise版本)
INSTALL PLUGIN audit_log SONAME 'audit_log.so';
-- 2. 查看插件状态
SHOW PLUGINS;
SELECT PLUGIN_NAME, PLUGIN_STATUS FROM INFORMATION_SCHEMA.PLUGINS
WHERE PLUGIN_NAME = 'audit_log';
-- 3. 配置审计日志
SET GLOBAL audit_log_policy = 'ALL';
SET GLOBAL audit_log_format = 'JSON';
SET GLOBAL audit_log_file = '/var/log/mysql/audit.log';
SET GLOBAL audit_log_rotate_on_size = 1073741824; -- 1GB
SET GLOBAL audit_log_rotations = 10;
-- 4. 查看审计配置
SHOW VARIABLES LIKE 'audit_log%';
2. 审计日志配置文件
# MySQL审计日志配置 (my.cnf)
[mysqld]
# 审计日志插件
plugin-load-add = audit_log.so
# 审计日志基础配置
audit_log_policy = ALL
audit_log_format = JSON
audit_log_file = /var/log/mysql/audit.log
# 审计日志轮换
audit_log_rotate_on_size = 1073741824 # 1GB
audit_log_rotations = 10
audit_log_flush = ON
# 审计过滤配置
audit_log_connection_policy = ALL
audit_log_statement_policy = ALL
# 排除系统用户
audit_log_exclude_accounts = 'mysql.session@localhost,mysql.sys@localhost'
# 包含特定用户(可选)
# audit_log_include_accounts = 'admin@localhost,app_user@%'
# 审计日志缓冲
audit_log_buffer_size = 1048576 # 1MB
3. 审计日志分析脚本
import json
import re
from datetime import datetime, timedelta
from collections import defaultdict, Counter
class MySQLAuditLogAnalyzer:
def __init__(self, log_file_path):
self.log_file_path = log_file_path
self.events = []
self.load_events()
def load_events(self):
"""加载审计日志事件"""
try:
with open(self.log_file_path, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if line:
try:
event = json.loads(line)
self.events.append(event)
except json.JSONDecodeError:
continue
print(f"加载了 {len(self.events)} 个审计事件")
except FileNotFoundError:
print(f"审计日志文件不存在: {self.log_file_path}")
except Exception as e:
print(f"加载审计日志失败: {e}")
def analyze_login_attempts(self, hours=24):
"""分析登录尝试"""
cutoff_time = datetime.now() - timedelta(hours=hours)
login_events = []
for event in self.events:
if event.get('class') == 'connection':
event_time = datetime.fromisoformat(event.get('timestamp', '').replace('Z', '+00:00'))
if event_time >= cutoff_time:
login_events.append(event)
# 统计登录尝试
successful_logins = [e for e in login_events if e.get('event') == 'connect']
failed_logins = [e for e in login_events if e.get('event') == 'connect' and e.get('connection_id') == 0]
print(f"\n最近{hours}小时登录分析:")
print("-" * 40)
print(f"总登录尝试: {len(login_events)}")
print(f"成功登录: {len(successful_logins)}")
print(f"失败登录: {len(failed_logins)}")
# 按用户统计
user_stats = Counter()
for event in login_events:
user = event.get('user', 'unknown')
host = event.get('host', 'unknown')
user_stats[f"{user}@{host}"] += 1
print("\n用户登录统计:")
for user, count in user_stats.most_common(10):
print(f" {user}: {count} 次")
return {
'total_attempts': len(login_events),
'successful': len(successful_logins),
'failed': len(failed_logins),
'user_stats': dict(user_stats)
}
def analyze_privilege_changes(self, hours=24):
"""分析权限变更"""
cutoff_time = datetime.now() - timedelta(hours=hours)
privilege_keywords = ['GRANT', 'REVOKE', 'CREATE USER', 'DROP USER', 'ALTER USER']
privilege_events = []
for event in self.events:
if event.get('class') == 'general':
event_time = datetime.fromisoformat(event.get('timestamp', '').replace('Z', '+00:00'))
if event_time >= cutoff_time:
sql_text = event.get('general_data', {}).get('sql_command', '').upper()
if any(keyword in sql_text for keyword in privilege_keywords):
privilege_events.append(event)
print(f"\n最近{hours}小时权限变更分析:")
print("-" * 40)
print(f"权限变更事件: {len(privilege_events)}")
# 按操作类型统计
operation_stats = Counter()
for event in privilege_events:
sql_text = event.get('general_data', {}).get('sql_command', '').upper()
for keyword in privilege_keywords:
if keyword in sql_text:
operation_stats[keyword] += 1
break
print("\n操作类型统计:")
for operation, count in operation_stats.items():
print(f" {operation}: {count} 次")
# 显示详细事件
print("\n详细权限变更事件:")
for event in privilege_events[-10:]: # 显示最近10个事件
timestamp = event.get('timestamp', '')
user = event.get('user', 'unknown')
sql_command = event.get('general_data', {}).get('sql_command', '')
print(f" {timestamp} - {user}: {sql_command[:100]}...")
return privilege_events
def analyze_suspicious_activities(self, hours=24):
"""分析可疑活动"""
cutoff_time = datetime.now() - timedelta(hours=hours)
suspicious_patterns = {
'SQL注入尝试': [r'UNION.*SELECT', r'OR.*1=1', r'DROP.*TABLE', r'INSERT.*INTO.*VALUES'],
'权限提升': [r'GRANT.*ALL', r'SUPER', r'FILE'],
'数据导出': [r'SELECT.*INTO.*OUTFILE', r'LOAD_FILE'],
'批量操作': [r'DELETE.*FROM.*WHERE.*1=1', r'UPDATE.*SET.*WHERE.*1=1']
}
suspicious_events = defaultdict(list)
for event in self.events:
if event.get('class') == 'general':
event_time = datetime.fromisoformat(event.get('timestamp', '').replace('Z', '+00:00'))
if event_time >= cutoff_time:
sql_text = event.get('general_data', {}).get('sql_command', '').upper()
for category, patterns in suspicious_patterns.items():
for pattern in patterns:
if re.search(pattern, sql_text, re.IGNORECASE):
suspicious_events[category].append(event)
break
print(f"\n最近{hours}小时可疑活动分析:")
print("-" * 40)
total_suspicious = sum(len(events) for events in suspicious_events.values())
print(f"可疑事件总数: {total_suspicious}")
for category, events in suspicious_events.items():
if events:
print(f"\n{category}: {len(events)} 个事件")
for event in events[-3:]: # 显示最近3个事件
timestamp = event.get('timestamp', '')
user = event.get('user', 'unknown')
sql_command = event.get('general_data', {}).get('sql_command', '')
print(f" {timestamp} - {user}: {sql_command[:80]}...")
return dict(suspicious_events)
def generate_security_report(self, hours=24):
"""生成安全报告"""
print(f"\n=== MySQL安全审计报告 ===")
print(f"报告时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"分析时间范围: 最近{hours}小时")
print("=" * 50)
# 登录分析
login_analysis = self.analyze_login_attempts(hours)
# 权限变更分析
privilege_analysis = self.analyze_privilege_changes(hours)
# 可疑活动分析
suspicious_analysis = self.analyze_suspicious_activities(hours)
# 安全评分
security_score = 100
# 根据失败登录次数扣分
failed_logins = login_analysis.get('failed', 0)
if failed_logins > 10:
security_score -= min(20, failed_logins)
# 根据权限变更次数扣分
privilege_changes = len(privilege_analysis)
if privilege_changes > 5:
security_score -= min(15, privilege_changes * 2)
# 根据可疑活动扣分
total_suspicious = sum(len(events) for events in suspicious_analysis.values())
if total_suspicious > 0:
security_score -= min(30, total_suspicious * 5)
security_score = max(0, security_score)
print(f"\n安全评分: {security_score}/100")
if security_score >= 90:
print("安全状态: 优秀 ✅")
elif security_score >= 70:
print("安全状态: 良好 ⚠️")
elif security_score >= 50:
print("安全状态: 一般 ⚠️")
else:
print("安全状态: 危险 ❌")
# 安全建议
print("\n安全建议:")
if failed_logins > 10:
print(" • 检查是否存在暴力破解攻击")
if privilege_changes > 5:
print(" • 审查权限变更的合理性")
if total_suspicious > 0:
print(" • 立即调查可疑活动")
if security_score < 70:
print(" • 加强访问控制和监控")
return {
'security_score': security_score,
'login_analysis': login_analysis,
'privilege_changes': len(privilege_analysis),
'suspicious_activities': total_suspicious
}
# 审计日志分析使用示例
analyzer = MySQLAuditLogAnalyzer('/var/log/mysql/audit.log')
report = analyzer.generate_security_report(24)
4. 审计日志监控脚本
#!/bin/bash
# MySQL审计日志实时监控脚本
AUDIT_LOG="/var/log/mysql/audit.log"
ALERT_EMAIL="admin@example.com"
TEMP_DIR="/tmp/mysql_audit"
ALERT_THRESHOLD=5
# 创建临时目录
mkdir -p $TEMP_DIR
echo "=== MySQL审计日志监控启动 ==="
echo "监控文件: $AUDIT_LOG"
echo "告警邮箱: $ALERT_EMAIL"
echo "开始时间: $(date)"
echo
# 监控函数
monitor_audit_log() {
local log_file=$1
local last_position_file="$TEMP_DIR/last_position"
# 获取上次读取位置
if [ -f "$last_position_file" ]; then
last_position=$(cat "$last_position_file")
else
last_position=0
fi
# 获取当前文件大小
current_size=$(stat -c%s "$log_file" 2>/dev/null || echo 0)
if [ $current_size -gt $last_position ]; then
# 读取新增内容
tail -c +$((last_position + 1)) "$log_file" | while IFS= read -r line; do
if [ -n "$line" ]; then
analyze_audit_event "$line"
fi
done
# 更新位置
echo $current_size > "$last_position_file"
fi
}
# 分析审计事件
analyze_audit_event() {
local event_line="$1"
local alert_file="$TEMP_DIR/alerts_$(date +%Y%m%d)"
# 检查可疑活动
if echo "$event_line" | grep -qi "UNION.*SELECT\|OR.*1=1\|DROP.*TABLE"; then
echo "$(date): SQL注入尝试 - $event_line" >> "$alert_file"
send_alert "SQL注入尝试" "$event_line"
fi
if echo "$event_line" | grep -qi "GRANT.*ALL\|SUPER\|FILE"; then
echo "$(date): 权限提升尝试 - $event_line" >> "$alert_file"
send_alert "权限提升尝试" "$event_line"
fi
if echo "$event_line" | grep -qi "SELECT.*INTO.*OUTFILE\|LOAD_FILE"; then
echo "$(date): 数据导出尝试 - $event_line" >> "$alert_file"
send_alert "数据导出尝试" "$event_line"
fi
# 检查失败登录
if echo "$event_line" | grep -qi '"event":"connect"' && echo "$event_line" | grep -qi '"connection_id":0'; then
echo "$(date): 登录失败 - $event_line" >> "$alert_file"
# 统计失败登录次数
local failed_count=$(grep "登录失败" "$alert_file" | wc -l)
if [ $failed_count -ge $ALERT_THRESHOLD ]; then
send_alert "频繁登录失败" "检测到 $failed_count 次登录失败"
fi
fi
}
# 发送告警
send_alert() {
local alert_type="$1"
local alert_content="$2"
local timestamp=$(date '+%Y-%m-%d %H:%M:%S')
echo "[$timestamp] 安全告警: $alert_type"
echo "详情: $alert_content"
echo
# 发送邮件告警(需要配置邮件服务)
if command -v mail >/dev/null 2>&1; then
echo "时间: $timestamp\n类型: $alert_type\n详情: $alert_content" | \
mail -s "MySQL安全告警: $alert_type" "$ALERT_EMAIL"
fi
# 记录到系统日志
logger "MySQL Security Alert: $alert_type - $alert_content"
}
# 清理旧文件
cleanup_old_files() {
find "$TEMP_DIR" -name "alerts_*" -mtime +7 -delete
}
# 主监控循环
while true; do
if [ -f "$AUDIT_LOG" ]; then
monitor_audit_log "$AUDIT_LOG"
else
echo "审计日志文件不存在: $AUDIT_LOG"
sleep 60
continue
fi
# 每小时清理一次旧文件
if [ $(($(date +%M))) -eq 0 ]; then
cleanup_old_files
fi
sleep 10
done
10.5.2 性能监控与安全监控集成
1. 安全监控指标
-- 安全监控关键指标
-- 1. 连接监控
SELECT
USER,
HOST,
COUNT(*) as connection_count,
MAX(TIME) as max_time,
AVG(TIME) as avg_time
FROM information_schema.PROCESSLIST
WHERE USER NOT IN ('system user', 'event_scheduler')
GROUP BY USER, HOST
ORDER BY connection_count DESC;
-- 2. 失败连接统计
SELECT
VARIABLE_NAME,
VARIABLE_VALUE
FROM performance_schema.global_status
WHERE VARIABLE_NAME IN (
'Aborted_connects',
'Aborted_clients',
'Connection_errors_max_connections',
'Connection_errors_internal',
'Connection_errors_tcpwrap'
);
-- 3. 权限使用统计
SELECT
USER,
HOST,
COUNT(*) as query_count,
COUNT(DISTINCT SQL_TEXT) as unique_queries
FROM performance_schema.events_statements_history_long
WHERE EVENT_NAME = 'statement/sql/select'
GROUP BY USER, HOST
ORDER BY query_count DESC
LIMIT 10;
-- 4. 敏感操作监控
SELECT
USER,
HOST,
SQL_TEXT,
TIMER_START,
TIMER_END
FROM performance_schema.events_statements_history_long
WHERE SQL_TEXT REGEXP 'GRANT|REVOKE|CREATE USER|DROP USER|ALTER USER'
ORDER BY TIMER_START DESC
LIMIT 20;
-- 5. 文件访问监控
SELECT
USER,
HOST,
SQL_TEXT
FROM performance_schema.events_statements_history_long
WHERE SQL_TEXT REGEXP 'LOAD_FILE|INTO OUTFILE|INTO DUMPFILE'
ORDER BY TIMER_START DESC;
2. 安全监控仪表板
class MySQLSecurityDashboard:
def __init__(self, connection):
self.connection = connection
def get_connection_security_metrics(self):
"""获取连接安全指标"""
try:
cursor = self.connection.cursor(dictionary=True)
# 当前连接统计
cursor.execute("""
SELECT
USER,
HOST,
COUNT(*) as connection_count,
MAX(TIME) as max_time,
AVG(TIME) as avg_time
FROM information_schema.PROCESSLIST
WHERE USER NOT IN ('system user', 'event_scheduler')
GROUP BY USER, HOST
ORDER BY connection_count DESC
""")
current_connections = cursor.fetchall()
# 连接错误统计
cursor.execute("""
SELECT
VARIABLE_NAME,
VARIABLE_VALUE
FROM performance_schema.global_status
WHERE VARIABLE_NAME IN (
'Aborted_connects',
'Aborted_clients',
'Connection_errors_max_connections',
'Connection_errors_internal'
)
""")
connection_errors = {row['VARIABLE_NAME']: int(row['VARIABLE_VALUE'])
for row in cursor.fetchall()}
return {
'current_connections': current_connections,
'connection_errors': connection_errors
}
except Exception as e:
print(f"获取连接安全指标失败: {e}")
return {}
finally:
cursor.close()
def get_privilege_usage_metrics(self):
"""获取权限使用指标"""
try:
cursor = self.connection.cursor(dictionary=True)
# 用户查询统计
cursor.execute("""
SELECT
USER,
HOST,
COUNT(*) as query_count,
COUNT(DISTINCT DIGEST_TEXT) as unique_queries
FROM performance_schema.events_statements_summary_by_user_by_event_name
WHERE EVENT_NAME LIKE 'statement/sql/%'
GROUP BY USER, HOST
ORDER BY query_count DESC
LIMIT 10
""")
user_activity = cursor.fetchall()
# 敏感操作统计
cursor.execute("""
SELECT
USER,
HOST,
DIGEST_TEXT,
COUNT_STAR as execution_count,
SUM_TIMER_WAIT/1000000000 as total_time_seconds
FROM performance_schema.events_statements_summary_by_digest
WHERE DIGEST_TEXT REGEXP 'GRANT|REVOKE|CREATE USER|DROP USER|ALTER USER'
ORDER BY COUNT_STAR DESC
LIMIT 20
""")
sensitive_operations = cursor.fetchall()
return {
'user_activity': user_activity,
'sensitive_operations': sensitive_operations
}
except Exception as e:
print(f"获取权限使用指标失败: {e}")
return {}
finally:
cursor.close()
def get_security_violations(self):
"""获取安全违规事件"""
try:
cursor = self.connection.cursor(dictionary=True)
# 可疑SQL模式
suspicious_patterns = [
"UNION.*SELECT",
"OR.*1=1",
"DROP.*TABLE",
"LOAD_FILE",
"INTO.*OUTFILE"
]
violations = []
for pattern in suspicious_patterns:
cursor.execute(f"""
SELECT
USER,
HOST,
DIGEST_TEXT,
COUNT_STAR as execution_count,
FIRST_SEEN,
LAST_SEEN
FROM performance_schema.events_statements_summary_by_digest
WHERE DIGEST_TEXT REGEXP '{pattern}'
ORDER BY LAST_SEEN DESC
LIMIT 5
""")
pattern_violations = cursor.fetchall()
for violation in pattern_violations:
violation['violation_type'] = pattern
violations.append(violation)
return violations
except Exception as e:
print(f"获取安全违规事件失败: {e}")
return []
finally:
cursor.close()
def generate_security_dashboard(self):
"""生成安全仪表板"""
print("\n=== MySQL安全监控仪表板 ===")
print(f"更新时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("=" * 50)
# 连接安全指标
connection_metrics = self.get_connection_security_metrics()
print("\n📊 连接安全指标:")
print("-" * 30)
current_connections = connection_metrics.get('current_connections', [])
if current_connections:
print(f"活跃连接数: {len(current_connections)}")
print("\n用户连接分布:")
for conn in current_connections[:5]:
print(f" {conn['USER']}@{conn['HOST']}: {conn['connection_count']} 个连接")
connection_errors = connection_metrics.get('connection_errors', {})
if connection_errors:
print("\n连接错误统计:")
for error_type, count in connection_errors.items():
if count > 0:
print(f" {error_type}: {count}")
# 权限使用指标
privilege_metrics = self.get_privilege_usage_metrics()
print("\n🔐 权限使用指标:")
print("-" * 30)
user_activity = privilege_metrics.get('user_activity', [])
if user_activity:
print("用户活跃度 (Top 5):")
for user in user_activity[:5]:
print(f" {user['USER']}@{user['HOST']}: {user['query_count']} 次查询")
sensitive_operations = privilege_metrics.get('sensitive_operations', [])
if sensitive_operations:
print("\n敏感操作统计:")
for op in sensitive_operations[:5]:
print(f" {op['DIGEST_TEXT'][:50]}...: {op['execution_count']} 次")
# 安全违规事件
violations = self.get_security_violations()
print("\n⚠️ 安全违规事件:")
print("-" * 30)
if violations:
print(f"检测到 {len(violations)} 个潜在安全违规事件")
for violation in violations[:5]:
print(f" 类型: {violation['violation_type']}")
print(f" 用户: {violation['USER']}@{violation['HOST']}")
print(f" 执行次数: {violation['execution_count']}")
print(f" 最后执行: {violation['LAST_SEEN']}")
print()
else:
print("✅ 未检测到安全违规事件")
# 安全建议
print("\n💡 安全建议:")
print("-" * 30)
total_errors = sum(connection_errors.values())
if total_errors > 100:
print(" • 连接错误较多,建议检查网络和认证配置")
if len(violations) > 0:
print(" • 发现可疑SQL模式,建议立即调查")
if len(current_connections) > 50:
print(" • 当前连接数较多,建议检查连接池配置")
if not violations and total_errors < 10:
print(" ✅ 当前安全状态良好")
return {
'connection_metrics': connection_metrics,
'privilege_metrics': privilege_metrics,
'violations': violations
}
# 安全仪表板使用示例
import mysql.connector
connection = mysql.connector.connect(
host='localhost',
user='root',
password='password'
)
dashboard = MySQLSecurityDashboard(connection)
metrics = dashboard.generate_security_dashboard()
connection.close()
”`