12.1 性能优化概述
12.1.1 性能优化的层次结构
MySQL性能优化是一个系统性工程,需要从多个层面进行考虑和实施:
应用层优化
├── SQL查询优化
├── 连接池管理
├── 缓存策略
└── 业务逻辑优化
数据库层优化
├── 索引设计与优化
├── 查询执行计划优化
├── 存储引擎选择
└── 数据库结构设计
服务器层优化
├── MySQL配置参数调优
├── 内存分配优化
├── I/O性能优化
└── 并发控制优化
系统层优化
├── 操作系统参数调优
├── 硬件配置优化
├── 网络性能优化
└── 存储系统优化
12.1.2 性能分析工具
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MySQL性能分析工具集
提供全面的MySQL性能分析和监控功能
"""
import mysql.connector
import time
import json
import subprocess
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
import logging
import argparse
from dataclasses import dataclass
@dataclass
class PerformanceMetric:
"""性能指标数据类"""
name: str
value: float
unit: str
timestamp: datetime
threshold_warning: Optional[float] = None
threshold_critical: Optional[float] = None
@property
def status(self) -> str:
"""获取指标状态"""
if self.threshold_critical and self.value >= self.threshold_critical:
return 'critical'
elif self.threshold_warning and self.value >= self.threshold_warning:
return 'warning'
else:
return 'ok'
class MySQLPerformanceAnalyzer:
"""MySQL性能分析器"""
def __init__(self, host='localhost', port=3306, user='root', password='', database=None):
self.host = host
self.port = port
self.user = user
self.password = password
self.database = database
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
self.logger = logging.getLogger(__name__)
# 性能阈值配置
self.thresholds = {
'cpu_usage': {'warning': 70.0, 'critical': 90.0},
'memory_usage': {'warning': 80.0, 'critical': 95.0},
'disk_usage': {'warning': 80.0, 'critical': 90.0},
'connections': {'warning': 80.0, 'critical': 95.0},
'slow_queries': {'warning': 10.0, 'critical': 50.0},
'lock_waits': {'warning': 5.0, 'critical': 20.0}
}
def get_connection(self):
"""获取MySQL连接"""
try:
connection = mysql.connector.connect(
host=self.host,
port=self.port,
user=self.user,
password=self.password,
database=self.database
)
return connection
except mysql.connector.Error as e:
self.logger.error(f"MySQL连接失败: {e}")
return None
def get_server_status(self) -> Dict[str, PerformanceMetric]:
"""获取服务器状态指标"""
connection = self.get_connection()
if not connection:
return {}
metrics = {}
now = datetime.now()
try:
cursor = connection.cursor(dictionary=True)
# 获取服务器状态变量
cursor.execute("SHOW GLOBAL STATUS")
status_vars = {row['Variable_name']: row['Value'] for row in cursor.fetchall()}
# 连接数指标
max_connections = int(status_vars.get('max_connections', 0))
threads_connected = int(status_vars.get('Threads_connected', 0))
connection_usage = (threads_connected / max_connections * 100) if max_connections > 0 else 0
metrics['connections'] = PerformanceMetric(
name='连接使用率',
value=connection_usage,
unit='%',
timestamp=now,
threshold_warning=self.thresholds['connections']['warning'],
threshold_critical=self.thresholds['connections']['critical']
)
# 慢查询指标
slow_queries = int(status_vars.get('Slow_queries', 0))
queries = int(status_vars.get('Queries', 1))
slow_query_rate = (slow_queries / queries * 100) if queries > 0 else 0
metrics['slow_queries'] = PerformanceMetric(
name='慢查询率',
value=slow_query_rate,
unit='%',
timestamp=now,
threshold_warning=self.thresholds['slow_queries']['warning'],
threshold_critical=self.thresholds['slow_queries']['critical']
)
# 锁等待指标
table_locks_waited = int(status_vars.get('Table_locks_waited', 0))
table_locks_immediate = int(status_vars.get('Table_locks_immediate', 1))
lock_wait_rate = (table_locks_waited / (table_locks_waited + table_locks_immediate) * 100)
metrics['lock_waits'] = PerformanceMetric(
name='锁等待率',
value=lock_wait_rate,
unit='%',
timestamp=now,
threshold_warning=self.thresholds['lock_waits']['warning'],
threshold_critical=self.thresholds['lock_waits']['critical']
)
# 缓冲池命中率
innodb_buffer_pool_reads = int(status_vars.get('Innodb_buffer_pool_reads', 0))
innodb_buffer_pool_read_requests = int(status_vars.get('Innodb_buffer_pool_read_requests', 1))
buffer_hit_rate = ((innodb_buffer_pool_read_requests - innodb_buffer_pool_reads) /
innodb_buffer_pool_read_requests * 100) if innodb_buffer_pool_read_requests > 0 else 0
metrics['buffer_hit_rate'] = PerformanceMetric(
name='缓冲池命中率',
value=buffer_hit_rate,
unit='%',
timestamp=now,
threshold_warning=95.0,
threshold_critical=90.0
)
# QPS (每秒查询数)
cursor.execute("SHOW GLOBAL STATUS LIKE 'Uptime'")
uptime = int(cursor.fetchone()['Value'])
qps = queries / uptime if uptime > 0 else 0
metrics['qps'] = PerformanceMetric(
name='每秒查询数',
value=qps,
unit='queries/sec',
timestamp=now
)
# TPS (每秒事务数)
com_commit = int(status_vars.get('Com_commit', 0))
com_rollback = int(status_vars.get('Com_rollback', 0))
tps = (com_commit + com_rollback) / uptime if uptime > 0 else 0
metrics['tps'] = PerformanceMetric(
name='每秒事务数',
value=tps,
unit='transactions/sec',
timestamp=now
)
except mysql.connector.Error as e:
self.logger.error(f"获取服务器状态失败: {e}")
finally:
connection.close()
return metrics
def get_innodb_status(self) -> Dict[str, PerformanceMetric]:
"""获取InnoDB状态指标"""
connection = self.get_connection()
if not connection:
return {}
metrics = {}
now = datetime.now()
try:
cursor = connection.cursor()
# 获取InnoDB状态
cursor.execute("SHOW ENGINE INNODB STATUS")
innodb_status = cursor.fetchone()[2] # Status列
# 解析InnoDB状态信息
lines = innodb_status.split('\n')
# 查找关键指标
for i, line in enumerate(lines):
# 缓冲池信息
if 'Buffer pool size' in line:
# 解析缓冲池大小和使用情况
pass
# 日志信息
if 'Log sequence number' in line:
# 解析日志序列号
pass
# 锁信息
if 'TRANSACTIONS' in line:
# 解析事务和锁信息
pass
# 获取InnoDB变量
cursor.execute("SHOW GLOBAL STATUS LIKE 'Innodb_%'")
innodb_vars = {row[0]: row[1] for row in cursor.fetchall()}
# InnoDB缓冲池使用率
buffer_pool_pages_total = int(innodb_vars.get('Innodb_buffer_pool_pages_total', 1))
buffer_pool_pages_free = int(innodb_vars.get('Innodb_buffer_pool_pages_free', 0))
buffer_pool_usage = ((buffer_pool_pages_total - buffer_pool_pages_free) /
buffer_pool_pages_total * 100) if buffer_pool_pages_total > 0 else 0
metrics['innodb_buffer_usage'] = PerformanceMetric(
name='InnoDB缓冲池使用率',
value=buffer_pool_usage,
unit='%',
timestamp=now,
threshold_warning=80.0,
threshold_critical=95.0
)
# InnoDB日志写入
log_writes = int(innodb_vars.get('Innodb_log_writes', 0))
os_log_written = int(innodb_vars.get('Innodb_os_log_written', 0))
metrics['innodb_log_writes'] = PerformanceMetric(
name='InnoDB日志写入次数',
value=log_writes,
unit='writes',
timestamp=now
)
except mysql.connector.Error as e:
self.logger.error(f"获取InnoDB状态失败: {e}")
finally:
connection.close()
return metrics
def get_slow_queries(self, limit=10) -> List[Dict]:
"""获取慢查询信息"""
connection = self.get_connection()
if not connection:
return []
slow_queries = []
try:
cursor = connection.cursor(dictionary=True)
# 检查慢查询日志是否启用
cursor.execute("SHOW VARIABLES LIKE 'slow_query_log'")
slow_log_enabled = cursor.fetchone()
if slow_log_enabled and slow_log_enabled['Value'] == 'ON':
# 从performance_schema获取慢查询
query = """
SELECT
DIGEST_TEXT as query_text,
COUNT_STAR as exec_count,
AVG_TIMER_WAIT/1000000000 as avg_time_sec,
MAX_TIMER_WAIT/1000000000 as max_time_sec,
SUM_ROWS_EXAMINED as total_rows_examined,
SUM_ROWS_SENT as total_rows_sent
FROM performance_schema.events_statements_summary_by_digest
WHERE DIGEST_TEXT IS NOT NULL
ORDER BY AVG_TIMER_WAIT DESC
LIMIT %s
"""
cursor.execute(query, (limit,))
slow_queries = cursor.fetchall()
except mysql.connector.Error as e:
self.logger.error(f"获取慢查询失败: {e}")
finally:
connection.close()
return slow_queries
def get_table_statistics(self, database_name=None) -> List[Dict]:
"""获取表统计信息"""
connection = self.get_connection()
if not connection:
return []
table_stats = []
try:
cursor = connection.cursor(dictionary=True)
# 构建查询
query = """
SELECT
table_schema,
table_name,
table_rows,
data_length,
index_length,
(data_length + index_length) as total_size,
avg_row_length
FROM information_schema.tables
WHERE table_type = 'BASE TABLE'
"""
params = []
if database_name:
query += " AND table_schema = %s"
params.append(database_name)
query += " ORDER BY (data_length + index_length) DESC"
cursor.execute(query, params)
table_stats = cursor.fetchall()
except mysql.connector.Error as e:
self.logger.error(f"获取表统计信息失败: {e}")
finally:
connection.close()
return table_stats
def analyze_query_performance(self, query: str) -> Dict:
"""分析查询性能"""
connection = self.get_connection()
if not connection:
return {}
analysis = {
'query': query,
'execution_plan': [],
'execution_time': 0,
'rows_examined': 0,
'recommendations': []
}
try:
cursor = connection.cursor(dictionary=True)
# 获取执行计划
cursor.execute(f"EXPLAIN {query}")
analysis['execution_plan'] = cursor.fetchall()
# 分析执行计划
for step in analysis['execution_plan']:
# 检查全表扫描
if step.get('type') == 'ALL':
analysis['recommendations'].append(
f"表 {step['table']} 进行了全表扫描,建议添加索引"
)
# 检查临时表
if step.get('Extra') and 'Using temporary' in step['Extra']:
analysis['recommendations'].append(
"查询使用了临时表,可能影响性能"
)
# 检查文件排序
if step.get('Extra') and 'Using filesort' in step['Extra']:
analysis['recommendations'].append(
"查询使用了文件排序,建议优化ORDER BY子句"
)
# 执行查询并测量时间
start_time = time.time()
cursor.execute(query)
cursor.fetchall() # 获取所有结果
end_time = time.time()
analysis['execution_time'] = end_time - start_time
except mysql.connector.Error as e:
self.logger.error(f"查询性能分析失败: {e}")
analysis['error'] = str(e)
finally:
connection.close()
return analysis
def generate_performance_report(self) -> Dict:
"""生成性能报告"""
self.logger.info("生成MySQL性能报告")
report = {
'timestamp': datetime.now().isoformat(),
'server_info': {
'host': self.host,
'port': self.port
},
'server_metrics': self.get_server_status(),
'innodb_metrics': self.get_innodb_status(),
'slow_queries': self.get_slow_queries(),
'table_statistics': self.get_table_statistics(),
'recommendations': []
}
# 生成建议
recommendations = []
# 检查连接使用率
if 'connections' in report['server_metrics']:
conn_metric = report['server_metrics']['connections']
if conn_metric.status == 'critical':
recommendations.append("连接使用率过高,建议增加max_connections或优化连接池")
elif conn_metric.status == 'warning':
recommendations.append("连接使用率较高,建议监控连接数变化")
# 检查慢查询率
if 'slow_queries' in report['server_metrics']:
slow_metric = report['server_metrics']['slow_queries']
if slow_metric.status == 'critical':
recommendations.append("慢查询率过高,建议优化SQL查询和索引")
# 检查缓冲池命中率
if 'buffer_hit_rate' in report['server_metrics']:
buffer_metric = report['server_metrics']['buffer_hit_rate']
if buffer_metric.value < 95:
recommendations.append("缓冲池命中率较低,建议增加innodb_buffer_pool_size")
report['recommendations'] = recommendations
return report
def save_report(self, report: Dict, filename: str = None):
"""保存性能报告"""
if not filename:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f"mysql_performance_report_{timestamp}.json"
# 转换PerformanceMetric对象为字典
def convert_metrics(obj):
if isinstance(obj, PerformanceMetric):
return {
'name': obj.name,
'value': obj.value,
'unit': obj.unit,
'timestamp': obj.timestamp.isoformat(),
'status': obj.status,
'threshold_warning': obj.threshold_warning,
'threshold_critical': obj.threshold_critical
}
elif isinstance(obj, dict):
return {k: convert_metrics(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [convert_metrics(item) for item in obj]
else:
return obj
converted_report = convert_metrics(report)
with open(filename, 'w', encoding='utf-8') as f:
json.dump(converted_report, f, indent=2, ensure_ascii=False)
self.logger.info(f"性能报告已保存到: {filename}")
def main():
parser = argparse.ArgumentParser(description='MySQL性能分析工具')
parser.add_argument('--host', default='localhost', help='MySQL主机地址')
parser.add_argument('--port', type=int, default=3306, help='MySQL端口')
parser.add_argument('--user', default='root', help='MySQL用户名')
parser.add_argument('--password', default='', help='MySQL密码')
parser.add_argument('--database', help='数据库名称')
parser.add_argument('--action', choices=['status', 'slow', 'analyze', 'report'],
default='report', help='执行的操作')
parser.add_argument('--query', help='要分析的SQL查询')
parser.add_argument('--output', help='输出文件名')
args = parser.parse_args()
# 创建性能分析器
analyzer = MySQLPerformanceAnalyzer(
host=args.host,
port=args.port,
user=args.user,
password=args.password,
database=args.database
)
if args.action == 'status':
# 显示服务器状态
metrics = analyzer.get_server_status()
for name, metric in metrics.items():
print(f"{metric.name}: {metric.value} {metric.unit} ({metric.status})")
elif args.action == 'slow':
# 显示慢查询
slow_queries = analyzer.get_slow_queries()
print(f"发现 {len(slow_queries)} 个慢查询:")
for i, query in enumerate(slow_queries, 1):
print(f"\n{i}. 平均执行时间: {query['avg_time_sec']:.2f}秒")
print(f" 执行次数: {query['exec_count']}")
print(f" 查询: {query['query_text'][:100]}...")
elif args.action == 'analyze' and args.query:
# 分析特定查询
analysis = analyzer.analyze_query_performance(args.query)
print(f"查询执行时间: {analysis['execution_time']:.4f}秒")
print("\n执行计划:")
for step in analysis['execution_plan']:
print(f" 表: {step['table']}, 类型: {step['type']}, 行数: {step['rows']}")
if analysis['recommendations']:
print("\n优化建议:")
for rec in analysis['recommendations']:
print(f" - {rec}")
elif args.action == 'report':
# 生成完整报告
report = analyzer.generate_performance_report()
analyzer.save_report(report, args.output)
# 显示摘要
print("MySQL性能报告摘要:")
print(f"生成时间: {report['timestamp']}")
print(f"服务器: {report['server_info']['host']}:{report['server_info']['port']}")
if report['recommendations']:
print("\n主要建议:")
for rec in report['recommendations']:
print(f" - {rec}")
if __name__ == '__main__':
main()
12.2 查询优化
12.2.1 SQL查询优化基础
1. 查询执行过程分析
-- 查询执行过程示例
-- 1. 语法解析
-- 2. 查询优化
-- 3. 执行计划生成
-- 4. 查询执行
-- 使用EXPLAIN分析查询执行计划
EXPLAIN SELECT
u.username,
p.title,
p.created_at
FROM users u
JOIN posts p ON u.id = p.user_id
WHERE u.status = 'active'
AND p.created_at >= '2024-01-01'
ORDER BY p.created_at DESC
LIMIT 10;
-- 使用EXPLAIN FORMAT=JSON获取详细信息
EXPLAIN FORMAT=JSON
SELECT
u.username,
COUNT(p.id) as post_count,
AVG(p.view_count) as avg_views
FROM users u
LEFT JOIN posts p ON u.id = p.user_id
WHERE u.created_at >= '2023-01-01'
GROUP BY u.id, u.username
HAVING post_count > 5
ORDER BY avg_views DESC;
-- 使用EXPLAIN ANALYZE (MySQL 8.0+)
EXPLAIN ANALYZE
SELECT
category,
COUNT(*) as product_count,
AVG(price) as avg_price
FROM products
WHERE status = 'active'
AND price BETWEEN 100 AND 1000
GROUP BY category
ORDER BY avg_price DESC;
2. 索引优化策略
-- 创建高效索引的示例
-- 1. 单列索引
CREATE INDEX idx_users_email ON users(email);
CREATE INDEX idx_posts_created_at ON posts(created_at);
CREATE INDEX idx_products_price ON products(price);
-- 2. 复合索引(注意列的顺序)
-- 遵循最左前缀原则
CREATE INDEX idx_posts_user_status_created ON posts(user_id, status, created_at);
CREATE INDEX idx_orders_customer_date_status ON orders(customer_id, order_date, status);
-- 3. 覆盖索引(包含查询所需的所有列)
CREATE INDEX idx_products_category_cover ON products(category, name, price, status);
-- 4. 前缀索引(适用于长字符串)
CREATE INDEX idx_articles_title_prefix ON articles(title(20));
CREATE INDEX idx_users_description_prefix ON users(description(50));
-- 5. 函数索引(MySQL 8.0+)
CREATE INDEX idx_users_email_lower ON users((LOWER(email)));
CREATE INDEX idx_orders_year ON orders((YEAR(order_date)));
-- 6. 部分索引(带WHERE条件)
CREATE INDEX idx_posts_active ON posts(created_at) WHERE status = 'published';
-- 索引使用分析
SHOW INDEX FROM posts;
-- 查看索引使用情况
SELECT
table_schema,
table_name,
index_name,
column_name,
seq_in_index,
cardinality
FROM information_schema.statistics
WHERE table_schema = 'your_database'
ORDER BY table_name, index_name, seq_in_index;
-- 检查未使用的索引
SELECT
object_schema,
object_name,
index_name
FROM performance_schema.table_io_waits_summary_by_index_usage
WHERE index_name IS NOT NULL
AND count_star = 0
AND object_schema = 'your_database';
12.2.2 查询重写技巧
-- 查询重写优化示例
-- 1. 避免SELECT *
-- 不好的写法
SELECT * FROM users WHERE status = 'active';
-- 好的写法
SELECT id, username, email, created_at
FROM users
WHERE status = 'active';
-- 2. 使用LIMIT限制结果集
-- 不好的写法
SELECT username FROM users ORDER BY created_at DESC;
-- 好的写法
SELECT username
FROM users
ORDER BY created_at DESC
LIMIT 20;
-- 3. 优化子查询
-- 不好的写法(相关子查询)
SELECT u.username
FROM users u
WHERE EXISTS (
SELECT 1 FROM posts p
WHERE p.user_id = u.id
AND p.status = 'published'
);
-- 好的写法(使用JOIN)
SELECT DISTINCT u.username
FROM users u
INNER JOIN posts p ON u.id = p.user_id
WHERE p.status = 'published';
-- 4. 优化OR条件
-- 不好的写法
SELECT * FROM products
WHERE category = 'electronics'
OR category = 'computers'
OR category = 'phones';
-- 好的写法
SELECT * FROM products
WHERE category IN ('electronics', 'computers', 'phones');
-- 5. 避免在WHERE子句中使用函数
-- 不好的写法
SELECT * FROM orders
WHERE YEAR(order_date) = 2024;
-- 好的写法
SELECT * FROM orders
WHERE order_date >= '2024-01-01'
AND order_date < '2025-01-01';
-- 6. 优化LIKE查询
-- 不好的写法
SELECT * FROM products WHERE name LIKE '%phone%';
-- 好的写法(如果可能的话)
SELECT * FROM products WHERE name LIKE 'phone%';
-- 或者使用全文索引
ALTER TABLE products ADD FULLTEXT(name, description);
SELECT * FROM products
WHERE MATCH(name, description) AGAINST('phone' IN NATURAL LANGUAGE MODE);
-- 7. 优化分页查询
-- 不好的写法(大偏移量)
SELECT * FROM posts
ORDER BY created_at DESC
LIMIT 10000, 20;
-- 好的写法(使用游标分页)
SELECT * FROM posts
WHERE created_at < '2024-01-01 12:00:00'
ORDER BY created_at DESC
LIMIT 20;
-- 8. 优化COUNT查询
-- 不好的写法
SELECT COUNT(*) FROM large_table;
-- 好的写法(如果不需要精确计数)
SELECT table_rows
FROM information_schema.tables
WHERE table_schema = 'your_database'
AND table_name = 'large_table';
-- 9. 使用UNION ALL代替UNION
-- 如果确定没有重复记录
SELECT username FROM active_users
UNION ALL
SELECT username FROM inactive_users;
-- 10. 优化GROUP BY和ORDER BY
-- 确保GROUP BY和ORDER BY使用相同的列顺序
SELECT category, COUNT(*) as cnt
FROM products
GROUP BY category
ORDER BY category; -- 与GROUP BY相同的顺序
12.2.3 查询优化工具
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
SQL查询优化分析工具
自动分析SQL查询并提供优化建议
"""
import re
import mysql.connector
from typing import List, Dict, Tuple
import logging
from dataclasses import dataclass
@dataclass
class OptimizationSuggestion:
"""优化建议数据类"""
type: str # 建议类型:index, rewrite, config
priority: str # 优先级:high, medium, low
description: str # 建议描述
example: str = "" # 示例代码
impact: str = "" # 预期影响
class SQLOptimizer:
"""SQL查询优化器"""
def __init__(self, connection_config: Dict):
self.connection_config = connection_config
self.logger = logging.getLogger(__name__)
# 优化规则模式
self.optimization_patterns = {
'select_star': r'SELECT\s+\*\s+FROM',
'no_limit': r'SELECT.*FROM.*(?!LIMIT)',
'like_leading_wildcard': r"LIKE\s+['\"]%",
'function_in_where': r'WHERE.*\w+\(',
'or_conditions': r'WHERE.*OR.*OR',
'subquery_exists': r'WHERE\s+EXISTS\s*\(',
'count_star': r'COUNT\(\*\)',
'order_by_rand': r'ORDER\s+BY\s+RAND\(\)'
}
def get_connection(self):
"""获取数据库连接"""
return mysql.connector.connect(**self.connection_config)
def analyze_query_structure(self, query: str) -> List[OptimizationSuggestion]:
"""分析查询结构并提供优化建议"""
suggestions = []
query_upper = query.upper()
# 检查SELECT *
if re.search(self.optimization_patterns['select_star'], query_upper, re.IGNORECASE):
suggestions.append(OptimizationSuggestion(
type='rewrite',
priority='medium',
description='避免使用SELECT *,明确指定需要的列',
example='SELECT id, name, email FROM users;',
impact='减少网络传输和内存使用'
))
# 检查缺少LIMIT
if 'SELECT' in query_upper and 'LIMIT' not in query_upper:
suggestions.append(OptimizationSuggestion(
type='rewrite',
priority='high',
description='添加LIMIT子句限制结果集大小',
example='SELECT * FROM users ORDER BY created_at DESC LIMIT 100;',
impact='防止返回过多数据,提高响应速度'
))
# 检查LIKE前导通配符
if re.search(self.optimization_patterns['like_leading_wildcard'], query, re.IGNORECASE):
suggestions.append(OptimizationSuggestion(
type='index',
priority='high',
description='避免LIKE查询使用前导通配符,考虑使用全文索引',
example='ALTER TABLE table_name ADD FULLTEXT(column_name);',
impact='避免全表扫描,大幅提升查询性能'
))
# 检查WHERE子句中的函数
if re.search(self.optimization_patterns['function_in_where'], query_upper, re.IGNORECASE):
suggestions.append(OptimizationSuggestion(
type='rewrite',
priority='high',
description='避免在WHERE子句中使用函数,考虑重写查询',
example='WHERE date_column >= "2024-01-01" AND date_column < "2025-01-01"',
impact='允许使用索引,显著提升查询性能'
))
# 检查多个OR条件
if re.search(self.optimization_patterns['or_conditions'], query_upper, re.IGNORECASE):
suggestions.append(OptimizationSuggestion(
type='rewrite',
priority='medium',
description='将多个OR条件重写为IN子句',
example='WHERE column IN (value1, value2, value3)',
impact='提高查询效率和可读性'
))
# 检查EXISTS子查询
if re.search(self.optimization_patterns['subquery_exists'], query_upper, re.IGNORECASE):
suggestions.append(OptimizationSuggestion(
type='rewrite',
priority='medium',
description='考虑将EXISTS子查询重写为JOIN',
example='SELECT DISTINCT t1.* FROM table1 t1 JOIN table2 t2 ON t1.id = t2.ref_id',
impact='可能提高查询性能,特别是大数据集'
))
# 检查ORDER BY RAND()
if re.search(self.optimization_patterns['order_by_rand'], query_upper, re.IGNORECASE):
suggestions.append(OptimizationSuggestion(
type='rewrite',
priority='high',
description='避免使用ORDER BY RAND(),考虑应用层随机化',
example='SELECT * FROM table WHERE id >= RAND() * (SELECT MAX(id) FROM table) LIMIT 1',
impact='避免全表排序,大幅提升性能'
))
return suggestions
def analyze_execution_plan(self, query: str) -> Dict:
"""分析查询执行计划"""
connection = self.get_connection()
analysis = {
'query': query,
'execution_plan': [],
'issues': [],
'suggestions': []
}
try:
cursor = connection.cursor(dictionary=True)
# 获取执行计划
cursor.execute(f"EXPLAIN {query}")
execution_plan = cursor.fetchall()
analysis['execution_plan'] = execution_plan
# 分析执行计划中的问题
for step in execution_plan:
table = step.get('table', '')
select_type = step.get('select_type', '')
type_access = step.get('type', '')
extra = step.get('Extra', '')
rows = step.get('rows', 0)
# 检查全表扫描
if type_access == 'ALL':
analysis['issues'].append(f"表 {table} 进行全表扫描")
analysis['suggestions'].append(OptimizationSuggestion(
type='index',
priority='high',
description=f'为表 {table} 添加适当的索引',
example=f'CREATE INDEX idx_{table}_column ON {table}(column_name);',
impact='避免全表扫描,显著提升查询速度'
))
# 检查大量行扫描
if rows and rows > 10000:
analysis['issues'].append(f"表 {table} 扫描行数过多: {rows}")
analysis['suggestions'].append(OptimizationSuggestion(
type='rewrite',
priority='medium',
description='优化查询条件,减少扫描行数',
impact='减少I/O操作,提升查询效率'
))
# 检查临时表
if 'Using temporary' in extra:
analysis['issues'].append(f"查询使用临时表")
analysis['suggestions'].append(OptimizationSuggestion(
type='index',
priority='medium',
description='优化GROUP BY或ORDER BY,避免使用临时表',
impact='减少内存使用和磁盘I/O'
))
# 检查文件排序
if 'Using filesort' in extra:
analysis['issues'].append(f"查询使用文件排序")
analysis['suggestions'].append(OptimizationSuggestion(
type='index',
priority='medium',
description='为ORDER BY列添加索引',
impact='避免排序操作,提升查询速度'
))
# 检查子查询
if select_type in ['DEPENDENT SUBQUERY', 'UNCACHEABLE SUBQUERY']:
analysis['issues'].append(f"存在低效的子查询: {select_type}")
analysis['suggestions'].append(OptimizationSuggestion(
type='rewrite',
priority='high',
description='将子查询重写为JOIN',
impact='避免重复执行子查询,提升性能'
))
except mysql.connector.Error as e:
analysis['error'] = str(e)
self.logger.error(f"执行计划分析失败: {e}")
finally:
connection.close()
return analysis
def suggest_indexes(self, query: str) -> List[str]:
"""基于查询建议索引"""
suggestions = []
# 简单的索引建议逻辑
# 提取WHERE子句中的列
where_match = re.search(r'WHERE\s+(.+?)(?:GROUP|ORDER|LIMIT|$)', query, re.IGNORECASE | re.DOTALL)
if where_match:
where_clause = where_match.group(1)
# 提取列名(简化版本)
column_pattern = r'(\w+\.)?([a-zA-Z_]\w*)\s*[=<>!]'
columns = re.findall(column_pattern, where_clause)
for table_prefix, column in columns:
if table_prefix:
table = table_prefix.rstrip('.')
suggestions.append(f"CREATE INDEX idx_{table}_{column} ON {table}({column});")
else:
suggestions.append(f"CREATE INDEX idx_{column} ON table_name({column});")
# 提取ORDER BY子句中的列
order_match = re.search(r'ORDER\s+BY\s+([^\s]+)', query, re.IGNORECASE)
if order_match:
order_column = order_match.group(1)
suggestions.append(f"CREATE INDEX idx_order_{order_column} ON table_name({order_column});")
return list(set(suggestions)) # 去重
def optimize_query(self, query: str) -> Dict:
"""综合优化分析"""
self.logger.info(f"开始优化查询: {query[:100]}...")
optimization_result = {
'original_query': query,
'structure_suggestions': self.analyze_query_structure(query),
'execution_analysis': self.analyze_execution_plan(query),
'index_suggestions': self.suggest_indexes(query),
'priority_actions': []
}
# 汇总高优先级建议
all_suggestions = optimization_result['structure_suggestions'] + \
optimization_result['execution_analysis'].get('suggestions', [])
high_priority = [s for s in all_suggestions if s.priority == 'high']
optimization_result['priority_actions'] = high_priority
return optimization_result
def generate_optimization_report(self, queries: List[str]) -> Dict:
"""生成优化报告"""
report = {
'timestamp': datetime.now().isoformat(),
'total_queries': len(queries),
'query_analyses': [],
'summary': {
'high_priority_issues': 0,
'medium_priority_issues': 0,
'low_priority_issues': 0,
'suggested_indexes': set()
}
}
for query in queries:
analysis = self.optimize_query(query)
report['query_analyses'].append(analysis)
# 统计问题数量
all_suggestions = analysis['structure_suggestions'] + \
analysis['execution_analysis'].get('suggestions', [])
for suggestion in all_suggestions:
if suggestion.priority == 'high':
report['summary']['high_priority_issues'] += 1
elif suggestion.priority == 'medium':
report['summary']['medium_priority_issues'] += 1
else:
report['summary']['low_priority_issues'] += 1
# 收集索引建议
report['summary']['suggested_indexes'].update(analysis['index_suggestions'])
# 转换set为list以便JSON序列化
report['summary']['suggested_indexes'] = list(report['summary']['suggested_indexes'])
return report
# 使用示例
if __name__ == '__main__':
# 配置数据库连接
config = {
'host': 'localhost',
'port': 3306,
'user': 'root',
'password': 'password',
'database': 'test_db'
}
optimizer = SQLOptimizer(config)
# 测试查询
test_queries = [
"SELECT * FROM users WHERE YEAR(created_at) = 2024",
"SELECT u.name FROM users u WHERE EXISTS (SELECT 1 FROM posts p WHERE p.user_id = u.id)",
"SELECT * FROM products WHERE name LIKE '%phone%' ORDER BY RAND() LIMIT 10"
]
# 生成优化报告
report = optimizer.generate_optimization_report(test_queries)
print(f"分析了 {report['total_queries']} 个查询")
print(f"发现 {report['summary']['high_priority_issues']} 个高优先级问题")
print(f"建议创建 {len(report['summary']['suggested_indexes'])} 个索引")
12.3 配置优化
12.3.1 MySQL配置参数调优
#!/bin/bash
# MySQL配置优化脚本
# 根据服务器硬件自动生成优化的MySQL配置
set -euo pipefail
# 配置文件路径
MYSQL_CONFIG="/etc/mysql/mysql.conf.d/mysqld.cnf"
BACKUP_CONFIG="/etc/mysql/mysql.conf.d/mysqld.cnf.backup.$(date +%Y%m%d_%H%M%S)"
LOG_FILE="/var/log/mysql_optimization.log"
# 日志函数
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a "$LOG_FILE"
}
# 获取系统信息
get_system_info() {
log "获取系统信息..."
# 内存信息(GB)
TOTAL_MEMORY=$(free -g | awk '/^Mem:/{print $2}')
if [ "$TOTAL_MEMORY" -eq 0 ]; then
TOTAL_MEMORY=1
fi
# CPU核心数
CPU_CORES=$(nproc)
# 磁盘信息
DISK_TYPE="HDD" # 默认为HDD
if lsblk -d -o name,rota | grep -q "0"; then
DISK_TYPE="SSD"
fi
log "系统配置: ${TOTAL_MEMORY}GB内存, ${CPU_CORES}核CPU, ${DISK_TYPE}存储"
}
# 计算InnoDB缓冲池大小
calculate_innodb_buffer_pool_size() {
# 建议使用70-80%的可用内存
if [ "$TOTAL_MEMORY" -le 4 ]; then
BUFFER_POOL_SIZE="${TOTAL_MEMORY}G"
elif [ "$TOTAL_MEMORY" -le 16 ]; then
BUFFER_POOL_SIZE="$((TOTAL_MEMORY * 3 / 4))G"
else
BUFFER_POOL_SIZE="$((TOTAL_MEMORY * 4 / 5))G"
fi
log "InnoDB缓冲池大小: $BUFFER_POOL_SIZE"
}
# 计算连接数配置
calculate_connection_settings() {
# 基于CPU核心数和内存计算
MAX_CONNECTIONS=$((CPU_CORES * 50))
if [ "$MAX_CONNECTIONS" -lt 100 ]; then
MAX_CONNECTIONS=100
elif [ "$MAX_CONNECTIONS" -gt 1000 ]; then
MAX_CONNECTIONS=1000
fi
THREAD_CACHE_SIZE=$((CPU_CORES * 2))
if [ "$THREAD_CACHE_SIZE" -lt 8 ]; then
THREAD_CACHE_SIZE=8
elif [ "$THREAD_CACHE_SIZE" -gt 64 ]; then
THREAD_CACHE_SIZE=64
fi
log "连接配置: max_connections=$MAX_CONNECTIONS, thread_cache_size=$THREAD_CACHE_SIZE"
}
# 计算查询缓存配置
calculate_query_cache_settings() {
# MySQL 8.0已移除查询缓存,这里为兼容性保留
MYSQL_VERSION=$(mysql --version | grep -oP '\d+\.\d+' | head -1)
MAJOR_VERSION=$(echo "$MYSQL_VERSION" | cut -d. -f1)
if [ "$MAJOR_VERSION" -lt 8 ]; then
if [ "$TOTAL_MEMORY" -le 4 ]; then
QUERY_CACHE_SIZE="64M"
elif [ "$TOTAL_MEMORY" -le 16 ]; then
QUERY_CACHE_SIZE="128M"
else
QUERY_CACHE_SIZE="256M"
fi
QUERY_CACHE_TYPE=1
else
QUERY_CACHE_SIZE="0"
QUERY_CACHE_TYPE=0
fi
log "查询缓存配置: query_cache_size=$QUERY_CACHE_SIZE"
}
# 计算InnoDB日志配置
calculate_innodb_log_settings() {
# 基于缓冲池大小计算日志文件大小
BUFFER_POOL_MB=$(echo "$BUFFER_POOL_SIZE" | sed 's/G/*1024/g' | bc)
LOG_FILE_SIZE=$((BUFFER_POOL_MB / 4))
if [ "$LOG_FILE_SIZE" -lt 64 ]; then
LOG_FILE_SIZE=64
elif [ "$LOG_FILE_SIZE" -gt 2048 ]; then
LOG_FILE_SIZE=2048
fi
INNODB_LOG_FILE_SIZE="${LOG_FILE_SIZE}M"
INNODB_LOG_BUFFER_SIZE="16M"
if [ "$TOTAL_MEMORY" -gt 8 ]; then
INNODB_LOG_BUFFER_SIZE="32M"
fi
log "InnoDB日志配置: log_file_size=$INNODB_LOG_FILE_SIZE, log_buffer_size=$INNODB_LOG_BUFFER_SIZE"
}
# 计算其他InnoDB配置
calculate_other_innodb_settings() {
# InnoDB实例数(通常等于CPU核心数,但不超过64)
INNODB_BUFFER_POOL_INSTANCES=$CPU_CORES
if [ "$INNODB_BUFFER_POOL_INSTANCES" -gt 64 ]; then
INNODB_BUFFER_POOL_INSTANCES=64
fi
# InnoDB读写线程数
INNODB_READ_IO_THREADS=$((CPU_CORES / 2))
INNODB_WRITE_IO_THREADS=$((CPU_CORES / 2))
if [ "$INNODB_READ_IO_THREADS" -lt 4 ]; then
INNODB_READ_IO_THREADS=4
fi
if [ "$INNODB_WRITE_IO_THREADS" -lt 4 ]; then
INNODB_WRITE_IO_THREADS=4
fi
# 根据存储类型调整刷新设置
if [ "$DISK_TYPE" = "SSD" ]; then
INNODB_FLUSH_METHOD="O_DIRECT"
INNODB_IO_CAPACITY=2000
INNODB_IO_CAPACITY_MAX=4000
else
INNODB_FLUSH_METHOD="O_DIRECT"
INNODB_IO_CAPACITY=200
INNODB_IO_CAPACITY_MAX=400
fi
log "其他InnoDB配置: buffer_pool_instances=$INNODB_BUFFER_POOL_INSTANCES, io_capacity=$INNODB_IO_CAPACITY"
}
# 生成优化配置
generate_optimized_config() {
log "生成优化配置文件..."
cat > "/tmp/mysql_optimized.cnf" << EOF
# MySQL优化配置
# 生成时间: $(date)
# 系统配置: ${TOTAL_MEMORY}GB内存, ${CPU_CORES}核CPU, ${DISK_TYPE}存储
[mysqld]
# 基本设置
port = 3306
socket = /var/run/mysqld/mysqld.sock
pid-file = /var/run/mysqld/mysqld.pid
datadir = /var/lib/mysql
tmpdir = /tmp
# 字符集设置
character-set-server = utf8mb4
collation-server = utf8mb4_unicode_ci
init-connect = 'SET NAMES utf8mb4'
# 连接设置
max_connections = $MAX_CONNECTIONS
max_connect_errors = 1000
thread_cache_size = $THREAD_CACHE_SIZE
back_log = $((MAX_CONNECTIONS / 2))
# 查询缓存设置(MySQL 5.7及以下)
query_cache_type = $QUERY_CACHE_TYPE
query_cache_size = $QUERY_CACHE_SIZE
query_cache_limit = 2M
# 表缓存设置
table_open_cache = $((MAX_CONNECTIONS * 2))
table_definition_cache = $((MAX_CONNECTIONS * 2))
# 排序和临时表设置
sort_buffer_size = 2M
read_buffer_size = 1M
read_rnd_buffer_size = 2M
join_buffer_size = 2M
tmp_table_size = 64M
max_heap_table_size = 64M
# 慢查询日志
slow_query_log = 1
slow_query_log_file = /var/log/mysql/slow.log
long_query_time = 2
log_queries_not_using_indexes = 1
log_slow_admin_statements = 1
# 错误日志
log_error = /var/log/mysql/error.log
log_error_verbosity = 2
# 二进制日志
log_bin = /var/log/mysql/mysql-bin
binlog_format = ROW
binlog_expire_logs_seconds = 604800 # 7天
max_binlog_size = 100M
sync_binlog = 1
# InnoDB设置
default_storage_engine = InnoDB
innodb_buffer_pool_size = $BUFFER_POOL_SIZE
innodb_buffer_pool_instances = $INNODB_BUFFER_POOL_INSTANCES
innodb_log_file_size = $INNODB_LOG_FILE_SIZE
innodb_log_buffer_size = $INNODB_LOG_BUFFER_SIZE
innodb_flush_log_at_trx_commit = 1
innodb_flush_method = $INNODB_FLUSH_METHOD
innodb_file_per_table = 1
innodb_read_io_threads = $INNODB_READ_IO_THREADS
innodb_write_io_threads = $INNODB_WRITE_IO_THREADS
innodb_io_capacity = $INNODB_IO_CAPACITY
innodb_io_capacity_max = $INNODB_IO_CAPACITY_MAX
innodb_thread_concurrency = 0
innodb_lock_wait_timeout = 50
innodb_rollback_on_timeout = 1
innodb_print_all_deadlocks = 1
innodb_autoinc_lock_mode = 2
# MyISAM设置(如果使用)
key_buffer_size = 32M
myisam_sort_buffer_size = 8M
myisam_max_sort_file_size = 1G
myisam_repair_threads = 1
# 网络设置
max_allowed_packet = 64M
interactive_timeout = 28800
wait_timeout = 28800
# 安全设置
local_infile = 0
skip_name_resolve = 1
[mysql]
default-character-set = utf8mb4
[client]
default-character-set = utf8mb4
port = 3306
socket = /var/run/mysqld/mysqld.sock
EOF
log "优化配置已生成到 /tmp/mysql_optimized.cnf"
}
# 备份当前配置
backup_current_config() {
if [ -f "$MYSQL_CONFIG" ]; then
log "备份当前配置到 $BACKUP_CONFIG"
cp "$MYSQL_CONFIG" "$BACKUP_CONFIG"
else
log "警告: 未找到当前配置文件 $MYSQL_CONFIG"
fi
}
# 应用新配置
apply_new_config() {
log "应用新配置..."
# 检查MySQL是否运行
if systemctl is-active --quiet mysql; then
log "停止MySQL服务..."
systemctl stop mysql
MYSQL_WAS_RUNNING=true
else
MYSQL_WAS_RUNNING=false
fi
# 复制新配置
cp "/tmp/mysql_optimized.cnf" "$MYSQL_CONFIG"
# 设置正确的权限
chown mysql:mysql "$MYSQL_CONFIG"
chmod 644 "$MYSQL_CONFIG"
# 启动MySQL服务
if [ "$MYSQL_WAS_RUNNING" = true ]; then
log "启动MySQL服务..."
systemctl start mysql
# 等待MySQL启动
sleep 5
# 检查MySQL状态
if systemctl is-active --quiet mysql; then
log "MySQL服务启动成功"
else
log "错误: MySQL服务启动失败,恢复备份配置"
cp "$BACKUP_CONFIG" "$MYSQL_CONFIG"
systemctl start mysql
exit 1
fi
fi
}
# 验证配置
validate_config() {
log "验证MySQL配置..."
# 检查MySQL是否可以连接
if mysql -e "SELECT 1" > /dev/null 2>&1; then
log "MySQL连接测试成功"
else
log "错误: MySQL连接测试失败"
return 1
fi
# 显示关键配置参数
log "当前配置参数:"
mysql -e "
SELECT
@@max_connections as max_connections,
@@innodb_buffer_pool_size as buffer_pool_size,
@@thread_cache_size as thread_cache_size,
@@innodb_log_file_size as log_file_size;
" 2>/dev/null | tee -a "$LOG_FILE"
}
# 生成配置报告
generate_config_report() {
log "生成配置优化报告..."
REPORT_FILE="/tmp/mysql_optimization_report_$(date +%Y%m%d_%H%M%S).txt"
cat > "$REPORT_FILE" << EOF
MySQL配置优化报告
生成时间: $(date)
系统信息:
- 内存: ${TOTAL_MEMORY}GB
- CPU核心: ${CPU_CORES}
- 存储类型: ${DISK_TYPE}
主要优化配置:
- InnoDB缓冲池大小: $BUFFER_POOL_SIZE
- 最大连接数: $MAX_CONNECTIONS
- 线程缓存大小: $THREAD_CACHE_SIZE
- InnoDB日志文件大小: $INNODB_LOG_FILE_SIZE
- InnoDB缓冲池实例数: $INNODB_BUFFER_POOL_INSTANCES
- InnoDB I/O容量: $INNODB_IO_CAPACITY
配置文件位置:
- 当前配置: $MYSQL_CONFIG
- 备份配置: $BACKUP_CONFIG
- 优化配置: /tmp/mysql_optimized.cnf
建议:
1. 监控MySQL性能指标,根据实际负载调整配置
2. 定期检查慢查询日志,优化SQL查询
3. 监控InnoDB缓冲池命中率,确保在95%以上
4. 根据业务增长调整max_connections参数
5. 定期备份配置文件,便于回滚
EOF
log "优化报告已生成: $REPORT_FILE"
}
# 主函数
main() {
log "开始MySQL配置优化..."
# 检查权限
if [ "$EUID" -ne 0 ]; then
echo "请以root权限运行此脚本"
exit 1
fi
# 获取系统信息
get_system_info
# 计算优化参数
calculate_innodb_buffer_pool_size
calculate_connection_settings
calculate_query_cache_settings
calculate_innodb_log_settings
calculate_other_innodb_settings
# 生成配置
generate_optimized_config
# 询问是否应用配置
echo
read -p "是否应用新配置?这将重启MySQL服务 (y/N): " -n 1 -r
echo
if [[ $REPLY =~ ^[Yy]$ ]]; then
backup_current_config
apply_new_config
validate_config
else
log "配置未应用,优化配置已保存到 /tmp/mysql_optimized.cnf"
fi
# 生成报告
generate_config_report
log "MySQL配置优化完成"
}
# 执行主函数
main "$@"
12.3.2 性能参数监控
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MySQL性能参数实时监控系统
监控关键性能指标并提供告警
"""
import mysql.connector
import time
import json
import smtplib
from email.mime.text import MimeText
from email.mime.multipart import MimeMultipart
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import logging
import threading
import queue
from dataclasses import dataclass, asdict
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
from collections import deque
@dataclass
class PerformanceAlert:
"""性能告警数据类"""
metric_name: str
current_value: float
threshold_value: float
severity: str # critical, warning, info
timestamp: datetime
message: str
class MySQLPerformanceMonitor:
"""MySQL性能监控器"""
def __init__(self, connection_config: Dict, alert_config: Dict = None):
self.connection_config = connection_config
self.alert_config = alert_config or {}
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
self.logger = logging.getLogger(__name__)
# 监控状态
self.is_monitoring = False
self.monitor_thread = None
self.alert_queue = queue.Queue()
# 性能数据存储(最近1000个数据点)
self.performance_data = {
'timestamps': deque(maxlen=1000),
'cpu_usage': deque(maxlen=1000),
'memory_usage': deque(maxlen=1000),
'connections': deque(maxlen=1000),
'qps': deque(maxlen=1000),
'tps': deque(maxlen=1000),
'buffer_hit_rate': deque(maxlen=1000),
'slow_queries': deque(maxlen=1000)
}
# 告警阈值配置
self.thresholds = {
'connections': {'warning': 80, 'critical': 95},
'buffer_hit_rate': {'warning': 95, 'critical': 90},
'slow_query_rate': {'warning': 5, 'critical': 10},
'cpu_usage': {'warning': 70, 'critical': 90},
'memory_usage': {'warning': 80, 'critical': 95},
'disk_usage': {'warning': 80, 'critical': 90},
'replication_lag': {'warning': 60, 'critical': 300} # 秒
}
# 告警历史(避免重复告警)
self.alert_history = {}
self.alert_cooldown = 300 # 5分钟冷却期
def get_connection(self):
"""获取MySQL连接"""
try:
return mysql.connector.connect(**self.connection_config)
except mysql.connector.Error as e:
self.logger.error(f"MySQL连接失败: {e}")
return None
def collect_performance_metrics(self) -> Dict:
"""收集性能指标"""
connection = self.get_connection()
if not connection:
return {}
metrics = {}
timestamp = datetime.now()
try:
cursor = connection.cursor(dictionary=True)
# 获取全局状态变量
cursor.execute("SHOW GLOBAL STATUS")
status_vars = {row['Variable_name']: row['Value'] for row in cursor.fetchall()}
# 获取全局变量
cursor.execute("SHOW GLOBAL VARIABLES")
global_vars = {row['Variable_name']: row['Value'] for row in cursor.fetchall()}
# 计算连接使用率
max_connections = int(global_vars.get('max_connections', 1))
threads_connected = int(status_vars.get('Threads_connected', 0))
connection_usage = (threads_connected / max_connections * 100)
# 计算QPS
cursor.execute("SHOW GLOBAL STATUS LIKE 'Uptime'")
uptime = int(cursor.fetchone()['Value'])
queries = int(status_vars.get('Queries', 0))
qps = queries / uptime if uptime > 0 else 0
# 计算TPS
com_commit = int(status_vars.get('Com_commit', 0))
com_rollback = int(status_vars.get('Com_rollback', 0))
tps = (com_commit + com_rollback) / uptime if uptime > 0 else 0
# 计算缓冲池命中率
buffer_reads = int(status_vars.get('Innodb_buffer_pool_reads', 0))
buffer_read_requests = int(status_vars.get('Innodb_buffer_pool_read_requests', 1))
buffer_hit_rate = ((buffer_read_requests - buffer_reads) / buffer_read_requests * 100) if buffer_read_requests > 0 else 0
# 计算慢查询率
slow_queries = int(status_vars.get('Slow_queries', 0))
slow_query_rate = (slow_queries / queries * 100) if queries > 0 else 0
# 获取复制延迟(如果是从库)
replication_lag = 0
try:
cursor.execute("SHOW SLAVE STATUS")
slave_status = cursor.fetchone()
if slave_status:
replication_lag = slave_status.get('Seconds_Behind_Master', 0) or 0
except:
pass
metrics = {
'timestamp': timestamp,
'connections': {
'current': threads_connected,
'max': max_connections,
'usage_percent': connection_usage
},
'performance': {
'qps': qps,
'tps': tps,
'buffer_hit_rate': buffer_hit_rate,
'slow_query_rate': slow_query_rate
},
'replication': {
'lag_seconds': replication_lag
},
'innodb': {
'buffer_pool_pages_total': int(status_vars.get('Innodb_buffer_pool_pages_total', 0)),
'buffer_pool_pages_free': int(status_vars.get('Innodb_buffer_pool_pages_free', 0)),
'buffer_pool_pages_dirty': int(status_vars.get('Innodb_buffer_pool_pages_dirty', 0)),
'log_waits': int(status_vars.get('Innodb_log_waits', 0)),
'row_lock_waits': int(status_vars.get('Innodb_row_lock_waits', 0))
}
}
except mysql.connector.Error as e:
self.logger.error(f"收集性能指标失败: {e}")
finally:
connection.close()
return metrics
def check_thresholds(self, metrics: Dict) -> List[PerformanceAlert]:
"""检查阈值并生成告警"""
alerts = []
timestamp = metrics.get('timestamp', datetime.now())
# 检查连接使用率
conn_usage = metrics.get('connections', {}).get('usage_percent', 0)
if conn_usage >= self.thresholds['connections']['critical']:
alerts.append(PerformanceAlert(
metric_name='connections',
current_value=conn_usage,
threshold_value=self.thresholds['connections']['critical'],
severity='critical',
timestamp=timestamp,
message=f'连接使用率过高: {conn_usage:.1f}%'
))
elif conn_usage >= self.thresholds['connections']['warning']:
alerts.append(PerformanceAlert(
metric_name='connections',
current_value=conn_usage,
threshold_value=self.thresholds['connections']['warning'],
severity='warning',
timestamp=timestamp,
message=f'连接使用率较高: {conn_usage:.1f}%'
))
# 检查缓冲池命中率
buffer_hit_rate = metrics.get('performance', {}).get('buffer_hit_rate', 100)
if buffer_hit_rate <= self.thresholds['buffer_hit_rate']['critical']:
alerts.append(PerformanceAlert(
metric_name='buffer_hit_rate',
current_value=buffer_hit_rate,
threshold_value=self.thresholds['buffer_hit_rate']['critical'],
severity='critical',
timestamp=timestamp,
message=f'缓冲池命中率过低: {buffer_hit_rate:.1f}%'
))
elif buffer_hit_rate <= self.thresholds['buffer_hit_rate']['warning']:
alerts.append(PerformanceAlert(
metric_name='buffer_hit_rate',
current_value=buffer_hit_rate,
threshold_value=self.thresholds['buffer_hit_rate']['warning'],
severity='warning',
timestamp=timestamp,
message=f'缓冲池命中率较低: {buffer_hit_rate:.1f}%'
))
# 检查慢查询率
slow_query_rate = metrics.get('performance', {}).get('slow_query_rate', 0)
if slow_query_rate >= self.thresholds['slow_query_rate']['critical']:
alerts.append(PerformanceAlert(
metric_name='slow_query_rate',
current_value=slow_query_rate,
threshold_value=self.thresholds['slow_query_rate']['critical'],
severity='critical',
timestamp=timestamp,
message=f'慢查询率过高: {slow_query_rate:.1f}%'
))
elif slow_query_rate >= self.thresholds['slow_query_rate']['warning']:
alerts.append(PerformanceAlert(
metric_name='slow_query_rate',
current_value=slow_query_rate,
threshold_value=self.thresholds['slow_query_rate']['warning'],
severity='warning',
timestamp=timestamp,
message=f'慢查询率较高: {slow_query_rate:.1f}%'
))
# 检查复制延迟
replication_lag = metrics.get('replication', {}).get('lag_seconds', 0)
if replication_lag >= self.thresholds['replication_lag']['critical']:
alerts.append(PerformanceAlert(
metric_name='replication_lag',
current_value=replication_lag,
threshold_value=self.thresholds['replication_lag']['critical'],
severity='critical',
timestamp=timestamp,
message=f'复制延迟过高: {replication_lag}秒'
))
elif replication_lag >= self.thresholds['replication_lag']['warning']:
alerts.append(PerformanceAlert(
metric_name='replication_lag',
current_value=replication_lag,
threshold_value=self.thresholds['replication_lag']['warning'],
severity='warning',
timestamp=timestamp,
message=f'复制延迟较高: {replication_lag}秒'
))
return alerts
def should_send_alert(self, alert: PerformanceAlert) -> bool:
"""检查是否应该发送告警(避免重复告警)"""
alert_key = f"{alert.metric_name}_{alert.severity}"
now = datetime.now()
if alert_key in self.alert_history:
last_alert_time = self.alert_history[alert_key]
if (now - last_alert_time).total_seconds() < self.alert_cooldown:
return False
self.alert_history[alert_key] = now
return True
def send_email_alert(self, alert: PerformanceAlert):
"""发送邮件告警"""
if not self.alert_config.get('email'):
return
email_config = self.alert_config['email']
try:
msg = MimeMultipart()
msg['From'] = email_config['from']
msg['To'] = ', '.join(email_config['to'])
msg['Subject'] = f"MySQL性能告警 - {alert.severity.upper()}"
body = f"""
MySQL性能告警通知
告警级别: {alert.severity.upper()}
指标名称: {alert.metric_name}
当前值: {alert.current_value}
阈值: {alert.threshold_value}
告警时间: {alert.timestamp.strftime('%Y-%m-%d %H:%M:%S')}
告警信息: {alert.message}
服务器信息:
主机: {self.connection_config['host']}
端口: {self.connection_config['port']}
请及时处理!
"""
msg.attach(MimeText(body, 'plain', 'utf-8'))
server = smtplib.SMTP(email_config['smtp_server'], email_config['smtp_port'])
if email_config.get('use_tls'):
server.starttls()
if email_config.get('username'):
server.login(email_config['username'], email_config['password'])
server.send_message(msg)
server.quit()
self.logger.info(f"告警邮件已发送: {alert.message}")
except Exception as e:
self.logger.error(f"发送告警邮件失败: {e}")
def process_alerts(self):
"""处理告警队列"""
while self.is_monitoring:
try:
alert = self.alert_queue.get(timeout=1)
if self.should_send_alert(alert):
self.logger.warning(f"性能告警: {alert.message}")
# 发送邮件告警
if self.alert_config.get('email'):
self.send_email_alert(alert)
# 可以添加其他告警方式(短信、钉钉等)
self.alert_queue.task_done()
except queue.Empty:
continue
except Exception as e:
self.logger.error(f"处理告警失败: {e}")
def store_metrics(self, metrics: Dict):
"""存储性能指标数据"""
timestamp = metrics.get('timestamp')
if not timestamp:
return
self.performance_data['timestamps'].append(timestamp)
self.performance_data['connections'].append(
metrics.get('connections', {}).get('usage_percent', 0)
)
self.performance_data['qps'].append(
metrics.get('performance', {}).get('qps', 0)
)
self.performance_data['tps'].append(
metrics.get('performance', {}).get('tps', 0)
)
self.performance_data['buffer_hit_rate'].append(
metrics.get('performance', {}).get('buffer_hit_rate', 0)
)
self.performance_data['slow_queries'].append(
metrics.get('performance', {}).get('slow_query_rate', 0)
)
def generate_performance_chart(self, output_file: str = None):
"""生成性能图表"""
if not output_file:
output_file = f"mysql_performance_{datetime.now().strftime('%Y%m%d_%H%M%S')}.png"
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
fig.suptitle('MySQL性能监控图表', fontsize=16)
timestamps = list(self.performance_data['timestamps'])
if not timestamps:
self.logger.warning("没有性能数据可用于生成图表")
return
# 连接使用率
axes[0, 0].plot(timestamps, list(self.performance_data['connections']), 'b-')
axes[0, 0].set_title('连接使用率 (%)')
axes[0, 0].set_ylabel('使用率')
axes[0, 0].grid(True)
# QPS
axes[0, 1].plot(timestamps, list(self.performance_data['qps']), 'g-')
axes[0, 1].set_title('每秒查询数 (QPS)')
axes[0, 1].set_ylabel('查询数')
axes[0, 1].grid(True)
# 缓冲池命中率
axes[1, 0].plot(timestamps, list(self.performance_data['buffer_hit_rate']), 'r-')
axes[1, 0].set_title('缓冲池命中率 (%)')
axes[1, 0].set_ylabel('命中率')
axes[1, 0].grid(True)
# TPS
axes[1, 1].plot(timestamps, list(self.performance_data['tps']), 'm-')
axes[1, 1].set_title('每秒事务数 (TPS)')
axes[1, 1].set_ylabel('事务数')
axes[1, 1].grid(True)
# 格式化时间轴
for ax in axes.flat:
ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M'))
ax.xaxis.set_major_locator(mdates.MinuteLocator(interval=10))
plt.setp(ax.xaxis.get_majorticklabels(), rotation=45)
plt.tight_layout()
plt.savefig(output_file, dpi=300, bbox_inches='tight')
plt.close()
self.logger.info(f"性能图表已保存到: {output_file}")
def monitor_loop(self, interval: int = 60):
"""监控主循环"""
self.logger.info(f"开始MySQL性能监控,间隔: {interval}秒")
while self.is_monitoring:
try:
# 收集性能指标
metrics = self.collect_performance_metrics()
if not metrics:
time.sleep(interval)
continue
# 存储指标数据
self.store_metrics(metrics)
# 检查告警阈值
alerts = self.check_thresholds(metrics)
for alert in alerts:
self.alert_queue.put(alert)
# 记录性能指标
conn_usage = metrics.get('connections', {}).get('usage_percent', 0)
qps = metrics.get('performance', {}).get('qps', 0)
buffer_hit_rate = metrics.get('performance', {}).get('buffer_hit_rate', 0)
self.logger.info(
f"性能指标 - 连接: {conn_usage:.1f}%, QPS: {qps:.1f}, "
f"缓冲池命中率: {buffer_hit_rate:.1f}%"
)
time.sleep(interval)
except Exception as e:
self.logger.error(f"监控循环异常: {e}")
time.sleep(interval)
def start_monitoring(self, interval: int = 60):
"""启动监控"""
if self.is_monitoring:
self.logger.warning("监控已在运行中")
return
self.is_monitoring = True
# 启动告警处理线程
alert_thread = threading.Thread(target=self.process_alerts, daemon=True)
alert_thread.start()
# 启动监控线程
self.monitor_thread = threading.Thread(
target=self.monitor_loop,
args=(interval,),
daemon=True
)
self.monitor_thread.start()
self.logger.info("MySQL性能监控已启动")
def stop_monitoring(self):
"""停止监控"""
if not self.is_monitoring:
self.logger.warning("监控未在运行")
return
self.is_monitoring = False
if self.monitor_thread:
self.monitor_thread.join(timeout=5)
self.logger.info("MySQL性能监控已停止")
def get_current_status(self) -> Dict:
"""获取当前状态摘要"""
metrics = self.collect_performance_metrics()
if not metrics:
return {}
return {
'timestamp': metrics['timestamp'].isoformat(),
'connections': metrics.get('connections', {}),
'performance': metrics.get('performance', {}),
'replication': metrics.get('replication', {}),
'monitoring_status': 'running' if self.is_monitoring else 'stopped'
}
# 使用示例
if __name__ == '__main__':
# 数据库连接配置
db_config = {
'host': 'localhost',
'port': 3306,
'user': 'monitor_user',
'password': 'monitor_password',
'database': 'mysql'
}
# 告警配置
alert_config = {
'email': {
'smtp_server': 'smtp.gmail.com',
'smtp_port': 587,
'use_tls': True,
'from': 'monitor@example.com',
'to': ['admin@example.com'],
'username': 'monitor@example.com',
'password': 'email_password'
}
}
# 创建监控器
monitor = MySQLPerformanceMonitor(db_config, alert_config)
try:
# 启动监控
monitor.start_monitoring(interval=30) # 30秒间隔
# 运行一段时间
time.sleep(300) # 5分钟
# 生成性能图表
monitor.generate_performance_chart()
# 显示当前状态
status = monitor.get_current_status()
print(json.dumps(status, indent=2, default=str))
except KeyboardInterrupt:
print("\n收到中断信号,停止监控...")
finally:
monitor.stop_monitoring()
12.4 性能调优实践
12.4.1 综合性能调优方案
#!/bin/bash
# MySQL综合性能调优脚本
# 包含硬件检测、配置优化、索引分析、查询优化等
set -euo pipefail
# 配置变量
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
LOG_FILE="/var/log/mysql_tuning.log"
REPORT_DIR="/tmp/mysql_tuning_$(date +%Y%m%d_%H%M%S)"
MYSQL_USER="root"
MYSQL_PASSWORD=""
MYSQL_HOST="localhost"
MYSQL_PORT="3306"
# 创建报告目录
mkdir -p "$REPORT_DIR"
# 日志函数
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $1" | tee -a "$LOG_FILE"
}
# 错误处理
error_exit() {
log "错误: $1"
exit 1
}
# 检查MySQL连接
check_mysql_connection() {
log "检查MySQL连接..."
if ! mysql -h"$MYSQL_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -e "SELECT 1" >/dev/null 2>&1; then
error_exit "无法连接到MySQL服务器"
fi
log "MySQL连接正常"
}
# 收集系统信息
collect_system_info() {
log "收集系统信息..."
cat > "$REPORT_DIR/system_info.txt" << EOF
系统信息收集报告
生成时间: $(date)
=== 硬件信息 ===
CPU信息:
$(lscpu | grep -E '^(Architecture|CPU|Thread|Core|Socket)')
内存信息:
$(free -h)
磁盘信息:
$(df -h)
磁盘I/O统计:
$(iostat -x 1 3 2>/dev/null || echo "iostat未安装")
=== 操作系统信息 ===
$(uname -a)
内核参数:
$(sysctl -a | grep -E '(vm\.|net\.|kernel\.)' | head -20)
=== MySQL版本信息 ===
$(mysql -h"$MYSQL_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -e "SELECT VERSION();")
EOF
log "系统信息已收集到: $REPORT_DIR/system_info.txt"
}
# 分析MySQL配置
analyze_mysql_config() {
log "分析MySQL配置..."
mysql -h"$MYSQL_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -e "
SELECT
VARIABLE_NAME,
VARIABLE_VALUE
FROM performance_schema.global_variables
WHERE VARIABLE_NAME IN (
'innodb_buffer_pool_size',
'max_connections',
'thread_cache_size',
'query_cache_size',
'innodb_log_file_size',
'innodb_log_buffer_size',
'sort_buffer_size',
'read_buffer_size',
'join_buffer_size',
'tmp_table_size',
'max_heap_table_size'
)
ORDER BY VARIABLE_NAME;
" > "$REPORT_DIR/mysql_config.txt"
log "MySQL配置分析完成: $REPORT_DIR/mysql_config.txt"
}
# 分析性能状态
analyze_performance_status() {
log "分析性能状态..."
# 连接状态
mysql -h"$MYSQL_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -e "
SELECT
'Connection Analysis' as Category,
CONCAT('Current: ', @@GLOBAL.threads_connected,
' / Max: ', @@GLOBAL.max_connections,
' (', ROUND(@@GLOBAL.threads_connected/@@GLOBAL.max_connections*100, 2), '%)') as Value
UNION ALL
SELECT
'Thread Cache Hit Rate',
CONCAT(ROUND((1 - (SELECT VARIABLE_VALUE FROM performance_schema.global_status WHERE VARIABLE_NAME = 'Threads_created') /
(SELECT VARIABLE_VALUE FROM performance_schema.global_status WHERE VARIABLE_NAME = 'Connections')) * 100, 2), '%')
UNION ALL
SELECT
'InnoDB Buffer Pool Hit Rate',
CONCAT(ROUND((1 - (SELECT VARIABLE_VALUE FROM performance_schema.global_status WHERE VARIABLE_NAME = 'Innodb_buffer_pool_reads') /
(SELECT VARIABLE_VALUE FROM performance_schema.global_status WHERE VARIABLE_NAME = 'Innodb_buffer_pool_read_requests')) * 100, 2), '%')
UNION ALL
SELECT
'Query Cache Hit Rate',
CASE
WHEN (SELECT VARIABLE_VALUE FROM performance_schema.global_status WHERE VARIABLE_NAME = 'Qcache_hits') = 0 THEN 'N/A (Query Cache Disabled)'
ELSE CONCAT(ROUND((SELECT VARIABLE_VALUE FROM performance_schema.global_status WHERE VARIABLE_NAME = 'Qcache_hits') /
((SELECT VARIABLE_VALUE FROM performance_schema.global_status WHERE VARIABLE_NAME = 'Qcache_hits') +
(SELECT VARIABLE_VALUE FROM performance_schema.global_status WHERE VARIABLE_NAME = 'Com_select')) * 100, 2), '%')
END;
" > "$REPORT_DIR/performance_status.txt"
log "性能状态分析完成: $REPORT_DIR/performance_status.txt"
}
# 分析慢查询
analyze_slow_queries() {
log "分析慢查询..."
# 检查慢查询日志是否启用
SLOW_LOG_ENABLED=$(mysql -h"$MYSQL_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -sN -e "SELECT @@GLOBAL.slow_query_log;")
if [ "$SLOW_LOG_ENABLED" = "1" ]; then
# 从performance_schema获取慢查询统计
mysql -h"$MYSQL_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -e "
SELECT
SCHEMA_NAME as 'Database',
DIGEST_TEXT as 'Query Pattern',
COUNT_STAR as 'Exec Count',
ROUND(AVG_TIMER_WAIT/1000000000, 3) as 'Avg Time (sec)',
ROUND(MAX_TIMER_WAIT/1000000000, 3) as 'Max Time (sec)',
ROUND(SUM_TIMER_WAIT/1000000000, 3) as 'Total Time (sec)',
SUM_ROWS_EXAMINED as 'Total Rows Examined',
SUM_ROWS_SENT as 'Total Rows Sent'
FROM performance_schema.events_statements_summary_by_digest
WHERE DIGEST_TEXT IS NOT NULL
AND AVG_TIMER_WAIT > 1000000000 -- 大于1秒的查询
ORDER BY AVG_TIMER_WAIT DESC
LIMIT 20;
" > "$REPORT_DIR/slow_queries.txt"
else
echo "慢查询日志未启用" > "$REPORT_DIR/slow_queries.txt"
fi
log "慢查询分析完成: $REPORT_DIR/slow_queries.txt"
}
# 分析表和索引
analyze_tables_indexes() {
log "分析表和索引..."
# 获取所有数据库(排除系统数据库)
DATABASES=$(mysql -h"$MYSQL_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -sN -e "
SHOW DATABASES;
" | grep -vE '^(information_schema|performance_schema|mysql|sys)$')
# 分析每个数据库的表
for db in $DATABASES; do
log "分析数据库: $db"
# 表大小统计
mysql -h"$MYSQL_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -e "
SELECT
table_name as 'Table',
ROUND(((data_length + index_length) / 1024 / 1024), 2) as 'Size (MB)',
table_rows as 'Rows',
ROUND(((data_length) / 1024 / 1024), 2) as 'Data (MB)',
ROUND(((index_length) / 1024 / 1024), 2) as 'Index (MB)',
ROUND((index_length / data_length), 2) as 'Index Ratio'
FROM information_schema.tables
WHERE table_schema = '$db'
AND table_type = 'BASE TABLE'
ORDER BY (data_length + index_length) DESC;
" > "$REPORT_DIR/tables_${db}.txt"
# 未使用的索引
mysql -h"$MYSQL_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -e "
SELECT
object_schema as 'Database',
object_name as 'Table',
index_name as 'Index'
FROM performance_schema.table_io_waits_summary_by_index_usage
WHERE object_schema = '$db'
AND index_name IS NOT NULL
AND index_name != 'PRIMARY'
AND count_star = 0
ORDER BY object_name, index_name;
" > "$REPORT_DIR/unused_indexes_${db}.txt"
done
log "表和索引分析完成"
}
# 生成优化建议
generate_recommendations() {
log "生成优化建议..."
cat > "$REPORT_DIR/recommendations.txt" << 'EOF'
MySQL性能优化建议
==================
基于分析结果的优化建议:
1. 配置优化建议
- 检查 innodb_buffer_pool_size 是否为可用内存的70-80%
- 确保 max_connections 设置合理,避免过高或过低
- 调整 thread_cache_size 以提高连接复用率
- 优化临时表相关参数 tmp_table_size 和 max_heap_table_size
2. 索引优化建议
- 删除未使用的索引以减少写入开销
- 为慢查询中的WHERE条件添加适当索引
- 检查复合索引的列顺序是否符合最左前缀原则
- 考虑为大表的ORDER BY列添加索引
3. 查询优化建议
- 重写慢查询,避免全表扫描
- 使用LIMIT限制结果集大小
- 避免在WHERE子句中使用函数
- 优化子查询,考虑改写为JOIN
4. 硬件优化建议
- 考虑使用SSD存储以提高I/O性能
- 增加内存以提高缓冲池命中率
- 使用RAID配置提高磁盘性能和可靠性
5. 监控建议
- 启用慢查询日志并定期分析
- 监控关键性能指标(QPS、TPS、连接数等)
- 设置性能告警阈值
- 定期检查错误日志
6. 维护建议
- 定期执行ANALYZE TABLE更新统计信息
- 清理过期的二进制日志
- 定期备份和恢复测试
- 保持MySQL版本更新
EOF
log "优化建议已生成: $REPORT_DIR/recommendations.txt"
}
# 生成性能调优脚本
generate_tuning_script() {
log "生成性能调优脚本..."
# 获取当前系统信息
TOTAL_MEMORY_GB=$(free -g | awk '/^Mem:/{print $2}')
CPU_CORES=$(nproc)
# 计算推荐配置
BUFFER_POOL_SIZE=$((TOTAL_MEMORY_GB * 3 / 4))
if [ "$BUFFER_POOL_SIZE" -lt 1 ]; then
BUFFER_POOL_SIZE=1
fi
MAX_CONNECTIONS=$((CPU_CORES * 50))
if [ "$MAX_CONNECTIONS" -lt 100 ]; then
MAX_CONNECTIONS=100
elif [ "$MAX_CONNECTIONS" -gt 1000 ]; then
MAX_CONNECTIONS=1000
fi
cat > "$REPORT_DIR/apply_tuning.sql" << EOF
-- MySQL性能调优SQL脚本
-- 生成时间: $(date)
-- 系统配置: ${TOTAL_MEMORY_GB}GB内存, ${CPU_CORES}核CPU
-- 设置全局变量(需要重启MySQL生效的参数请修改配置文件)
-- 连接相关
SET GLOBAL max_connections = $MAX_CONNECTIONS;
SET GLOBAL thread_cache_size = $((CPU_CORES * 2));
-- 缓冲区相关
SET GLOBAL sort_buffer_size = 2097152; -- 2MB
SET GLOBAL read_buffer_size = 1048576; -- 1MB
SET GLOBAL join_buffer_size = 2097152; -- 2MB
-- 临时表相关
SET GLOBAL tmp_table_size = 67108864; -- 64MB
SET GLOBAL max_heap_table_size = 67108864; -- 64MB
-- 慢查询日志
SET GLOBAL slow_query_log = 1;
SET GLOBAL long_query_time = 2;
SET GLOBAL log_queries_not_using_indexes = 1;
-- InnoDB相关
SET GLOBAL innodb_lock_wait_timeout = 50;
SET GLOBAL innodb_rollback_on_timeout = 1;
SET GLOBAL innodb_print_all_deadlocks = 1;
-- 显示当前设置
SELECT 'Configuration Applied' as Status;
SHOW VARIABLES LIKE 'max_connections';
SHOW VARIABLES LIKE 'thread_cache_size';
SHOW VARIABLES LIKE 'slow_query_log';
EOF
log "调优脚本已生成: $REPORT_DIR/apply_tuning.sql"
}
# 生成监控脚本
generate_monitoring_script() {
log "生成监控脚本..."
cat > "$REPORT_DIR/monitor_performance.sh" << 'EOF'
#!/bin/bash
# MySQL性能监控脚本
MYSQL_USER="root"
MYSQL_PASSWORD=""
MYSQL_HOST="localhost"
MYSQL_PORT="3306"
echo "MySQL性能监控报告 - $(date)"
echo "======================================"
# 连接状态
echo "\n=== 连接状态 ==="
mysql -h"$MYSQL_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -e "
SELECT
CONCAT('当前连接: ', @@GLOBAL.threads_connected, ' / ', @@GLOBAL.max_connections,
' (', ROUND(@@GLOBAL.threads_connected/@@GLOBAL.max_connections*100, 1), '%)');
"
# QPS和TPS
echo "\n=== QPS/TPS ==="
mysql -h"$MYSQL_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -e "
SELECT
ROUND((SELECT VARIABLE_VALUE FROM performance_schema.global_status WHERE VARIABLE_NAME = 'Queries') /
(SELECT VARIABLE_VALUE FROM performance_schema.global_status WHERE VARIABLE_NAME = 'Uptime'), 2) as 'QPS',
ROUND(((SELECT VARIABLE_VALUE FROM performance_schema.global_status WHERE VARIABLE_NAME = 'Com_commit') +
(SELECT VARIABLE_VALUE FROM performance_schema.global_status WHERE VARIABLE_NAME = 'Com_rollback')) /
(SELECT VARIABLE_VALUE FROM performance_schema.global_status WHERE VARIABLE_NAME = 'Uptime'), 2) as 'TPS';
"
# 缓冲池状态
echo "\n=== InnoDB缓冲池 ==="
mysql -h"$MYSQL_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -e "
SELECT
ROUND((1 - (SELECT VARIABLE_VALUE FROM performance_schema.global_status WHERE VARIABLE_NAME = 'Innodb_buffer_pool_reads') /
(SELECT VARIABLE_VALUE FROM performance_schema.global_status WHERE VARIABLE_NAME = 'Innodb_buffer_pool_read_requests')) * 100, 2) as 'Buffer Pool Hit Rate (%)',
ROUND((SELECT VARIABLE_VALUE FROM performance_schema.global_status WHERE VARIABLE_NAME = 'Innodb_buffer_pool_pages_dirty') /
(SELECT VARIABLE_VALUE FROM performance_schema.global_status WHERE VARIABLE_NAME = 'Innodb_buffer_pool_pages_total') * 100, 2) as 'Dirty Pages (%)';
"
# 慢查询
echo "\n=== 慢查询 ==="
mysql -h"$MYSQL_HOST" -P"$MYSQL_PORT" -u"$MYSQL_USER" -p"$MYSQL_PASSWORD" -e "
SELECT
(SELECT VARIABLE_VALUE FROM performance_schema.global_status WHERE VARIABLE_NAME = 'Slow_queries') as 'Total Slow Queries',
ROUND((SELECT VARIABLE_VALUE FROM performance_schema.global_status WHERE VARIABLE_NAME = 'Slow_queries') /
(SELECT VARIABLE_VALUE FROM performance_schema.global_status WHERE VARIABLE_NAME = 'Queries') * 100, 4) as 'Slow Query Rate (%)';
"
EOF
chmod +x "$REPORT_DIR/monitor_performance.sh"
log "监控脚本已生成: $REPORT_DIR/monitor_performance.sh"
}
# 生成最终报告
generate_final_report() {
log "生成最终报告..."
cat > "$REPORT_DIR/README.md" << EOF
# MySQL性能调优报告
生成时间: $(date)
服务器: $MYSQL_HOST:$MYSQL_PORT
## 报告文件说明
- **system_info.txt**: 系统硬件和软件信息
- **mysql_config.txt**: 当前MySQL配置参数
- **performance_status.txt**: 性能状态分析
- **slow_queries.txt**: 慢查询分析
- **tables_*.txt**: 各数据库的表大小统计
- **unused_indexes_*.txt**: 未使用的索引列表
- **recommendations.txt**: 优化建议
- **apply_tuning.sql**: 可执行的调优SQL脚本
- **monitor_performance.sh**: 性能监控脚本
## 使用说明
1. 查看各个分析文件了解当前状态
2. 阅读 recommendations.txt 中的优化建议
3. 根据需要执行 apply_tuning.sql 中的调优命令
4. 使用 monitor_performance.sh 持续监控性能
## 注意事项
- 在生产环境应用配置更改前请先在测试环境验证
- 某些配置参数需要重启MySQL服务才能生效
- 建议在业务低峰期进行调优操作
- 调优后需要持续监控性能变化
EOF
log "最终报告已生成: $REPORT_DIR/README.md"
}
# 主函数
main() {
log "开始MySQL性能调优分析..."
# 检查依赖
if ! command -v mysql &> /dev/null; then
error_exit "MySQL客户端未安装"
fi
# 检查连接
check_mysql_connection
# 执行分析
collect_system_info
analyze_mysql_config
analyze_performance_status
analyze_slow_queries
analyze_tables_indexes
# 生成建议和脚本
generate_recommendations
generate_tuning_script
generate_monitoring_script
generate_final_report
log "MySQL性能调优分析完成"
log "报告目录: $REPORT_DIR"
echo
echo "性能调优分析完成!"
echo "报告目录: $REPORT_DIR"
echo "请查看 $REPORT_DIR/README.md 了解详细信息"
}
# 解析命令行参数
while [[ $# -gt 0 ]]; do
case $1 in
-h|--host)
MYSQL_HOST="$2"
shift 2
;;
-P|--port)
MYSQL_PORT="$2"
shift 2
;;
-u|--user)
MYSQL_USER="$2"
shift 2
;;
-p|--password)
MYSQL_PASSWORD="$2"
shift 2
;;
--help)
echo "用法: $0 [选项]"
echo "选项:"
echo " -h, --host HOST MySQL主机地址 (默认: localhost)"
echo " -P, --port PORT MySQL端口 (默认: 3306)"
echo " -u, --user USER MySQL用户名 (默认: root)"
echo " -p, --password PASS MySQL密码"
echo " --help 显示此帮助信息"
exit 0
;;
*)
error_exit "未知参数: $1"
;;
esac
done
# 执行主函数
main
12.5 总结
本章详细介绍了MySQL性能优化与监控的各个方面,从理论基础到实践应用,为数据库管理员和开发人员提供了全面的性能调优指南。
12.5.1 核心要点回顾
1. 性能优化层次
- 应用层优化: SQL查询优化、连接池管理、缓存策略
- 数据库层优化: 索引设计、查询执行计划、存储引擎选择
- 服务器层优化: MySQL配置参数、内存分配、I/O优化
- 系统层优化: 操作系统调优、硬件配置、网络优化
2. 查询优化技术
- 执行计划分析: 使用EXPLAIN分析查询性能
- 索引策略: 单列索引、复合索引、覆盖索引、前缀索引
- 查询重写: 避免SELECT *、优化子查询、使用LIMIT
- 自动化工具: SQL优化分析器、性能建议生成
3. 配置参数调优
- 内存配置: innodb_buffer_pool_size、sort_buffer_size
- 连接配置: max_connections、thread_cache_size
- I/O配置: innodb_io_capacity、innodb_flush_method
- 日志配置: innodb_log_file_size、slow_query_log
4. 性能监控体系
- 关键指标: QPS、TPS、连接数、缓冲池命中率
- 告警机制: 阈值设置、邮件通知、实时监控
- 可视化: 性能图表、趋势分析、历史数据
- 自动化: 监控脚本、报告生成、问题诊断
12.5.2 实施建议
1. 分阶段实施
第一阶段: 基础监控
├── 启用慢查询日志
├── 配置性能监控
├── 建立基线指标
└── 设置基本告警
第二阶段: 配置优化
├── 分析当前配置
├── 根据硬件调整参数
├── 测试配置变更
└── 监控性能变化
第三阶段: 查询优化
├── 分析慢查询
├── 优化索引设计
├── 重写低效查询
└── 验证优化效果
第四阶段: 持续优化
├── 定期性能评估
├── 容量规划
├── 架构优化
└── 新技术应用
2. 最佳实践
- 监控先行: 建立完善的监控体系
- 测试验证: 在测试环境验证所有变更
- 渐进优化: 逐步实施,避免大幅变更
- 文档记录: 记录所有配置变更和优化过程
- 定期评估: 定期检查优化效果和新的优化机会
3. 常见问题处理
- 连接数过高: 检查连接池配置、优化应用连接管理
- 缓冲池命中率低: 增加innodb_buffer_pool_size
- 慢查询增多: 分析查询模式、优化索引
- 锁等待频繁: 优化事务逻辑、减少锁持有时间
- I/O瓶颈: 优化存储配置、考虑硬件升级
12.5.3 下一步学习方向
- 高可用架构: MySQL主从复制、集群部署
- 分库分表: 水平分片、垂直分片策略
- 读写分离: 负载均衡、数据一致性
- 云数据库: RDS、Aurora等云服务
- 新存储引擎: TokuDB、RocksDB等
- 大数据集成: 与Hadoop、Spark的集成
通过本章的学习,您应该能够: - 建立完整的MySQL性能监控体系 - 识别和解决常见的性能问题 - 制定适合的性能优化策略 - 使用自动化工具进行性能分析 - 持续优化数据库性能
性能优化是一个持续的过程,需要根据业务发展和技术演进不断调整和完善。下一章我们将学习MySQL高可用架构设计,探讨如何构建稳定可靠的数据库服务。