6.1 性能优化基础

6.1.1 性能瓶颈识别

import json
import re
import time
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional

import psutil
from neo4j import GraphDatabase

@dataclass
class QueryPerformance:
    """A single per-query performance sample recorded by Neo4jPerformanceMonitor."""
    query: str                # Cypher text as submitted (without the PROFILE prefix)
    execution_time: float     # client-side wall-clock seconds
    db_hits: int              # db hits from the PROFILE plan (0 if unavailable)
    page_cache_hits: int      # page cache hits from the result summary (0 if unavailable)
    page_cache_misses: int    # page cache misses from the result summary (0 if unavailable)
    result_count: int         # number of records returned
    memory_usage: int         # client-process RSS delta in bytes (rough proxy, may be noisy)
    timestamp: datetime       # when the sample was taken

@dataclass
class SystemMetrics:
    """A host-level resource usage sample taken via psutil."""
    cpu_usage: float          # CPU utilization percent over the sampling interval
    memory_usage: float       # virtual memory utilization percent
    disk_io_read: int         # cumulative bytes read since boot
    disk_io_write: int        # cumulative bytes written since boot
    network_io_sent: int      # cumulative bytes sent since boot
    network_io_recv: int      # cumulative bytes received since boot
    timestamp: datetime       # when the sample was taken

class Neo4jPerformanceMonitor:
    """Collects query-level (PROFILE/EXPLAIN) and host-level performance
    metrics for a Neo4j instance and keeps an in-process history of samples."""

    def __init__(self, uri: str, username: str, password: str):
        self.driver = GraphDatabase.driver(uri, auth=(username, password))
        # In-memory history only; grows unbounded for the lifetime of the object.
        self.query_history: List[QueryPerformance] = []
        self.system_metrics: List[SystemMetrics] = []

    def close(self):
        """Close the underlying driver and its connection pool."""
        if self.driver:
            self.driver.close()

    def profile_query(self, query: str, parameters: Optional[Dict[str, Any]] = None) -> "QueryPerformance":
        """Execute *query* under PROFILE and record a QueryPerformance sample.

        Args:
            query: Cypher statement (without a PROFILE prefix).
            parameters: optional query parameters.

        Returns:
            The recorded sample (also appended to ``query_history``).
        """
        # Renamed local (was `profile_query`) so it no longer shadows this method.
        profiled_statement = f"PROFILE {query}"

        with self.driver.session() as session:
            start_time = time.time()
            start_memory = psutil.Process().memory_info().rss

            result = session.run(profiled_statement, parameters or {})
            records = list(result)  # materialize so result_count is exact

            end_time = time.time()
            end_memory = psutil.Process().memory_info().rss

            summary = result.consume()
            profile = summary.profile

            performance = QueryPerformance(
                query=query,
                execution_time=end_time - start_time,
                # getattr with defaults: attribute availability varies by driver version.
                db_hits=getattr(profile, 'db_hits', 0),
                page_cache_hits=getattr(summary.counters, 'page_cache_hits', 0),
                page_cache_misses=getattr(summary.counters, 'page_cache_misses', 0),
                result_count=len(records),
                # Client-process RSS delta in bytes — a rough proxy, not server memory.
                memory_usage=end_memory - start_memory,
                timestamp=datetime.now()
            )

            self.query_history.append(performance)
            return performance

    def explain_query(self, query: str, parameters: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Run *query* under EXPLAIN (planned, not executed) and return plan details."""
        # Renamed local (was `explain_query`) so it no longer shadows this method.
        explained_statement = f"EXPLAIN {query}"

        with self.driver.session() as session:
            result = session.run(explained_statement, parameters or {})
            summary = result.consume()

            return {
                'plan': summary.plan,
                'estimated_rows': getattr(summary.plan, 'estimated_rows', 0),
                'operator_type': getattr(summary.plan, 'operator_type', 'Unknown'),
                'identifiers': getattr(summary.plan, 'identifiers', []),
                'arguments': getattr(summary.plan, 'arguments', {})
            }

    def collect_system_metrics(self) -> "SystemMetrics":
        """Sample host CPU/memory/disk/network usage via psutil and record it."""
        # interval=1 blocks for one second to get a meaningful CPU percentage.
        cpu_percent = psutil.cpu_percent(interval=1)
        memory = psutil.virtual_memory()
        disk_io = psutil.disk_io_counters()      # may be None on some platforms
        network_io = psutil.net_io_counters()    # may be None on some platforms

        metrics = SystemMetrics(
            cpu_usage=cpu_percent,
            memory_usage=memory.percent,
            disk_io_read=disk_io.read_bytes if disk_io else 0,
            disk_io_write=disk_io.write_bytes if disk_io else 0,
            network_io_sent=network_io.bytes_sent if network_io else 0,
            network_io_recv=network_io.bytes_recv if network_io else 0,
            timestamp=datetime.now()
        )

        self.system_metrics.append(metrics)
        return metrics

    def get_database_metrics(self) -> Dict[str, Any]:
        """Collect database-wide statistics (counts, indexes, constraints).

        NOTE(review): db.indexes()/db.constraints() were removed in Neo4j 5
        (SHOW INDEXES / SHOW CONSTRAINTS replace them), and the per-label/type
        counts require the APOC plugin — failures are recorded per metric
        instead of aborting the whole sweep.
        """
        queries = {
            'node_count': "MATCH (n) RETURN count(n) as count",
            'relationship_count': "MATCH ()-[r]->() RETURN count(r) as count",
            'label_counts': "CALL db.labels() YIELD label CALL apoc.cypher.run('MATCH (n:' + label + ') RETURN count(n) as count', {}) YIELD value RETURN label, value.count as count",
            'relationship_type_counts': "CALL db.relationshipTypes() YIELD relationshipType CALL apoc.cypher.run('MATCH ()-[r:' + relationshipType + ']->() RETURN count(r) as count', {}) YIELD value RETURN relationshipType, value.count as count",
            'index_info': "CALL db.indexes() YIELD name, type, state, populationPercent RETURN name, type, state, populationPercent",
            'constraint_info': "CALL db.constraints() YIELD name, type, description RETURN name, type, description"
        }

        metrics: Dict[str, Any] = {}

        with self.driver.session() as session:
            for metric_name, query in queries.items():
                try:
                    result = session.run(query)
                    metrics[metric_name] = [record.data() for record in result]
                except Exception as e:
                    # Best-effort: record the failure and keep collecting the rest.
                    metrics[metric_name] = f"Error: {str(e)}"

        return metrics

    def analyze_slow_queries(self, threshold_seconds: float = 1.0) -> List["QueryPerformance"]:
        """Return recorded queries slower than *threshold_seconds*, slowest first."""
        slow_queries = [
            perf for perf in self.query_history
            if perf.execution_time > threshold_seconds
        ]
        slow_queries.sort(key=lambda perf: perf.execution_time, reverse=True)
        return slow_queries

    def generate_performance_report(self) -> Dict[str, Any]:
        """Summarize the query history, system samples and database metrics.

        Returns ``{"error": ...}`` when no queries have been profiled yet.
        """
        if not self.query_history:
            return {"error": "No query history available"}

        # Query statistics.
        execution_times = [q.execution_time for q in self.query_history]
        db_hits = [q.db_hits for q in self.query_history]

        query_stats = {
            'total_queries': len(self.query_history),
            'avg_execution_time': sum(execution_times) / len(execution_times),
            'max_execution_time': max(execution_times),
            'min_execution_time': min(execution_times),
            'avg_db_hits': sum(db_hits) / len(db_hits) if db_hits else 0,
            'slow_queries_count': len(self.analyze_slow_queries())
        }

        # System statistics (only if any samples were collected).
        system_stats = {}
        if self.system_metrics:
            cpu_usage = [m.cpu_usage for m in self.system_metrics]
            memory_usage = [m.memory_usage for m in self.system_metrics]

            system_stats = {
                'avg_cpu_usage': sum(cpu_usage) / len(cpu_usage),
                'max_cpu_usage': max(cpu_usage),
                'avg_memory_usage': sum(memory_usage) / len(memory_usage),
                'max_memory_usage': max(memory_usage)
            }

        return {
            'query_statistics': query_stats,
            'system_statistics': system_stats,
            'database_metrics': self.get_database_metrics(),
            'report_timestamp': datetime.now().isoformat()
        }

# Usage example — requires a reachable Neo4j instance with these credentials.
monitor = Neo4jPerformanceMonitor("bolt://localhost:7687", "neo4j", "password")

# Profile a variable-length friend-of-friend query (1..3 hops).
test_query = "MATCH (p:Person)-[:KNOWS*1..3]-(friend) RETURN p.name, count(friend) as friend_count ORDER BY friend_count DESC LIMIT 10"
performance = monitor.profile_query(test_query)
print(f"查询执行时间: {performance.execution_time:.4f}秒")
print(f"数据库命中次数: {performance.db_hits}")
print(f"结果数量: {performance.result_count}")

# Inspect the execution plan without actually running the query.
explain_result = monitor.explain_query(test_query)
print(f"\n查询计划: {explain_result['operator_type']}")
print(f"预估行数: {explain_result['estimated_rows']}")

# Sample host-level metrics (blocks ~1s for the CPU reading).
system_metrics = monitor.collect_system_metrics()
print(f"\nCPU使用率: {system_metrics.cpu_usage}%")
print(f"内存使用率: {system_metrics.memory_usage}%")

# Build an aggregate report from the history collected above.
report = monitor.generate_performance_report()
print(f"\n性能报告:")
print(f"平均查询时间: {report['query_statistics']['avg_execution_time']:.4f}秒")
print(f"慢查询数量: {report['query_statistics']['slow_queries_count']}")

6.1.2 索引优化

class Neo4jIndexOptimizer:
    """Suggests, creates and audits Neo4j indexes, driven by the query
    history accumulated in a Neo4jPerformanceMonitor."""

    def __init__(self, monitor: "Neo4jPerformanceMonitor"):
        self.monitor = monitor
        # Reuse the monitor's driver/connection pool.
        self.driver = monitor.driver

    def analyze_query_patterns(self) -> Dict[str, Any]:
        """Tally property accesses and label usages across recorded queries.

        NOTE(review): queries are uppercased before matching, so tallied
        names lose their original case.
        """
        patterns: Dict[str, Any] = {
            'property_access': {},
            'label_usage': {},
            'relationship_traversal': {},  # reserved — not populated yet
            'filter_conditions': {}        # reserved — not populated yet
        }

        for query_perf in self.monitor.query_history:
            query = query_perf.query.upper()

            # e.g. "P.NAME" — counts every textual occurrence (RETURN included).
            # (The `import re` that previously sat inside this loop is hoisted
            # to the module level.)
            for prop in re.findall(r'\w+\.\w+', query):
                patterns['property_access'][prop] = patterns['property_access'].get(prop, 0) + 1

            # e.g. ":PERSON" — matches node labels and relationship types alike.
            for label in re.findall(r':\w+', query):
                patterns['label_usage'][label] = patterns['label_usage'].get(label, 0) + 1

        return patterns

    def suggest_indexes(self) -> List[Dict[str, Any]]:
        """Propose index candidates from observed query patterns."""
        patterns = self.analyze_query_patterns()
        suggestions: List[Dict[str, Any]] = []

        # Frequently accessed properties are single-property index candidates.
        for property_access, frequency in patterns['property_access'].items():
            if frequency >= 3:
                parts = property_access.split('.')
                if len(parts) == 2:
                    suggestions.append({
                        'type': 'property_index',
                        'property': parts[1],
                        'frequency': frequency,
                        'priority': 'high' if frequency >= 10 else 'medium',
                        # NOTE(review): valid Cypher requires a label, e.g.
                        # CREATE INDEX FOR (n:Label) ON (n.prop); the label is
                        # unknown here and the property name was uppercased —
                        # review before executing.
                        'create_statement': f"CREATE INDEX FOR (n) ON (n.{parts[1]})"
                    })

        # Emit the composite-index advisory at most once (previously an
        # identical entry was appended per matching query).
        for query_perf in self.monitor.query_history:
            query = query_perf.query
            # Simplified pattern matching; a real implementation would parse Cypher.
            if 'WHERE' in query.upper():
                where_part = query.upper().split('WHERE')[1].split('RETURN')[0]
                if '=' in where_part:
                    suggestions.append({
                        'type': 'composite_index',
                        'description': 'Consider composite index for WHERE conditions',
                        'priority': 'medium',
                        'analysis_needed': True
                    })
                    break  # one advisory entry is enough

        return suggestions

    def create_index(self, index_statement: str) -> bool:
        """Execute an index DDL statement; return True on success."""
        try:
            with self.driver.session() as session:
                session.run(index_statement)
                print(f"索引创建成功: {index_statement}")
                return True
        except Exception as e:
            print(f"索引创建失败: {e}")
            return False

    def get_existing_indexes(self) -> List[Dict[str, Any]]:
        """List the database's indexes.

        NOTE(review): db.indexes() was removed in Neo4j 5 — use SHOW INDEXES there.
        """
        query = "CALL db.indexes() YIELD name, type, state, populationPercent, properties RETURN name, type, state, populationPercent, properties"

        with self.driver.session() as session:
            result = session.run(query)
            return [record.data() for record in result]

    def analyze_index_usage(self) -> Dict[str, Any]:
        """Group existing indexes by state and type and report population status."""
        indexes = self.get_existing_indexes()
        usage_analysis: Dict[str, Any] = {
            'total_indexes': len(indexes),
            'by_state': {},
            'by_type': {},
            'population_status': []
        }

        for index in indexes:
            # Group by state (ONLINE / POPULATING / FAILED / ...).
            state = index['state']
            usage_analysis['by_state'][state] = usage_analysis['by_state'].get(state, 0) + 1

            # Group by index type.
            index_type = index['type']
            usage_analysis['by_type'][index_type] = usage_analysis['by_type'].get(index_type, 0) + 1

            # Per-index population progress.
            usage_analysis['population_status'].append({
                'name': index['name'],
                'population_percent': index['populationPercent'],
                'is_fully_populated': index['populationPercent'] >= 100.0
            })

        return usage_analysis

    def optimize_indexes(self) -> Dict[str, Any]:
        """Combine suggestions and the audit into a create/drop/rebuild plan."""
        suggestions = self.suggest_indexes()
        existing_indexes = self.get_existing_indexes()

        optimization_plan: Dict[str, Any] = {
            'create_suggestions': [],
            'drop_suggestions': [],
            'rebuild_suggestions': []
        }

        # Only surface high-priority creation candidates.
        for suggestion in suggestions:
            if suggestion['priority'] == 'high':
                optimization_plan['create_suggestions'].append(suggestion)

        # FAILED indexes provide no value and should be dropped.
        for index in existing_indexes:
            if index['state'] == 'FAILED':
                optimization_plan['drop_suggestions'].append({
                    'name': index['name'],
                    'reason': 'Index is in FAILED state',
                    'drop_statement': f"DROP INDEX {index['name']}"
                })

        # ONLINE but under-populated indexes warrant monitoring or a rebuild.
        for index in existing_indexes:
            if index['populationPercent'] < 100.0 and index['state'] == 'ONLINE':
                optimization_plan['rebuild_suggestions'].append({
                    'name': index['name'],
                    'reason': f"Index population is {index['populationPercent']}%",
                    'action': 'Monitor or consider rebuilding'
                })

        return optimization_plan

# Usage example — reuses the `monitor` instance created above (live database).
optimizer = Neo4jIndexOptimizer(monitor)

# Mine the recorded query history for access patterns.
patterns = optimizer.analyze_query_patterns()
print("查询模式分析:")
print(f"属性访问: {patterns['property_access']}")
print(f"标签使用: {patterns['label_usage']}")

# Derive index suggestions from those patterns.
suggestions = optimizer.suggest_indexes()
print(f"\n索引建议 ({len(suggestions)} 个):")
for suggestion in suggestions:
    print(f"  类型: {suggestion['type']}, 优先级: {suggestion['priority']}")
    if 'create_statement' in suggestion:
        print(f"  创建语句: {suggestion['create_statement']}")

# Audit the indexes that already exist in the database.
index_analysis = optimizer.analyze_index_usage()
print(f"\n索引分析:")
print(f"总索引数: {index_analysis['total_indexes']}")
print(f"按状态分布: {index_analysis['by_state']}")
print(f"按类型分布: {index_analysis['by_type']}")

# Combine suggestions and the audit into an optimization plan.
optimization_plan = optimizer.optimize_indexes()
print(f"\n优化计划:")
print(f"建议创建: {len(optimization_plan['create_suggestions'])} 个索引")
print(f"建议删除: {len(optimization_plan['drop_suggestions'])} 个索引")
print(f"建议重建: {len(optimization_plan['rebuild_suggestions'])} 个索引")

6.2 查询优化

6.2.1 Cypher查询优化

class CypherQueryOptimizer:
    """Heuristic static analysis, rewriting and benchmarking of Cypher queries."""

    def __init__(self, monitor: "Neo4jPerformanceMonitor"):
        self.monitor = monitor
        # Reuse the monitor's driver/connection pool.
        self.driver = monitor.driver

    def analyze_query_complexity(self, query: str) -> Dict[str, Any]:
        """Score *query* on several heuristic complexity indicators.

        Returns a dict of per-feature counts plus a weighted
        'complexity_score' (higher = likely more expensive to plan/execute).
        """
        complexity_metrics = {
            'cartesian_products': 0,
            'variable_length_paths': 0,
            'aggregations': 0,
            'sorts': 0,
            'limits': 0,
            'subqueries': 0,
            'complexity_score': 0
        }

        query_upper = query.upper()

        # Several MATCH clauses without any WHERE: likely cartesian product.
        if 'MATCH' in query_upper:
            match_count = query_upper.count('MATCH')
            if match_count > 1 and 'WHERE' not in query_upper:
                complexity_metrics['cartesian_products'] = match_count - 1

        # Variable-length relationship patterns such as [:KNOWS*1..3].
        # (Module-level `re` — the per-call import was hoisted.)
        var_length_patterns = re.findall(r'\[.*?\*.*?\]', query)
        complexity_metrics['variable_length_paths'] = len(var_length_patterns)

        # Plain substring counts — approximate (e.g. COUNT inside an
        # identifier would also match).
        aggregation_functions = ['COUNT', 'SUM', 'AVG', 'MAX', 'MIN', 'COLLECT']
        for func in aggregation_functions:
            complexity_metrics['aggregations'] += query_upper.count(func)

        complexity_metrics['sorts'] = query_upper.count('ORDER BY')
        complexity_metrics['limits'] = query_upper.count('LIMIT')

        # EXISTS subqueries and procedure CALLs both add planning cost.
        complexity_metrics['subqueries'] = query_upper.count('EXISTS') + query_upper.count('CALL')

        # Weighted sum; cartesian products weigh heaviest.
        complexity_metrics['complexity_score'] = (
            complexity_metrics['cartesian_products'] * 10 +
            complexity_metrics['variable_length_paths'] * 5 +
            complexity_metrics['aggregations'] * 2 +
            complexity_metrics['sorts'] * 3 +
            complexity_metrics['subqueries'] * 4
        )

        return complexity_metrics

    def suggest_query_optimizations(self, query: str) -> List[Dict[str, Any]]:
        """Return a list of heuristic optimization suggestions for *query*."""
        suggestions = []
        query_upper = query.upper()
        complexity = self.analyze_query_complexity(query)

        # Missing WHERE with multiple MATCHes → cartesian-product risk.
        if 'MATCH' in query_upper and 'WHERE' not in query_upper and complexity['cartesian_products'] > 0:
            suggestions.append({
                'type': 'add_where_clause',
                'priority': 'high',
                'description': 'Consider adding WHERE clause to avoid cartesian products',
                'impact': 'Reduces result set size and improves performance'
            })

        # Unbounded variable-length patterns can traverse the whole graph.
        if complexity['variable_length_paths'] > 0:
            var_length_patterns = re.findall(r'\[.*?\*.*?\]', query)
            for pattern in var_length_patterns:
                if '..' not in pattern or '*]' in pattern:
                    suggestions.append({
                        'type': 'limit_variable_length',
                        'priority': 'high',
                        'description': f'Add upper bound to variable length pattern: {pattern}',
                        'impact': 'Prevents excessive graph traversal'
                    })

        # Equality filters benefit from indexes. Split on the uppercased text
        # so lowercase 'where'/'return' keywords also work — the original code
        # split the raw query and raised IndexError for lowercase keywords.
        if 'WHERE' in query_upper:
            where_part = query_upper.split('WHERE')[1].split('RETURN')[0] if 'RETURN' in query_upper else query_upper.split('WHERE')[1]
            if '=' in where_part:
                suggestions.append({
                    'type': 'use_index',
                    'priority': 'medium',
                    'description': 'Ensure indexes exist for equality conditions in WHERE clause',
                    'impact': 'Faster node lookup'
                })

        # ORDER BY without LIMIT sorts and returns the full result set.
        if 'ORDER BY' in query_upper and 'LIMIT' not in query_upper:
            suggestions.append({
                'type': 'add_limit',
                'priority': 'medium',
                'description': 'Consider adding LIMIT when using ORDER BY',
                'impact': 'Reduces sorting overhead for large result sets'
            })

        # NOTE(review): Cypher has no GROUP BY (grouping is implicit in
        # aggregating RETURN clauses); advisory kept for output compatibility.
        if complexity['aggregations'] > 0 and 'GROUP BY' not in query_upper:
            suggestions.append({
                'type': 'consider_grouping',
                'priority': 'low',
                'description': 'Consider if GROUP BY would be more appropriate than aggregation',
                'impact': 'Better performance for grouped aggregations'
            })

        return suggestions

    def rewrite_query_for_performance(self, query: str) -> Dict[str, Any]:
        """Apply simple textual rewrites and report what changed.

        Returns the original query, the rewritten query, the list of applied
        rewrites and a crude improvement estimate.
        """
        original_query = query
        optimized_query = query
        optimizations_applied = []

        # Rewrite 1: bound unbounded variable-length patterns with a default cap.
        var_length_patterns = re.findall(r'\[.*?\*.*?\]', optimized_query)
        for pattern in var_length_patterns:
            if '..' not in pattern and '*]' in pattern:
                new_pattern = pattern.replace('*]', '*1..5]')  # default upper bound
                optimized_query = optimized_query.replace(pattern, new_pattern)
                optimizations_applied.append(f"Added upper bound to variable length: {pattern} -> {new_pattern}")

        # Rewrite 2: put the most selective MATCH clauses first. Simplified
        # heuristic — an inline property filter `{...}` suggests selectivity;
        # a real implementation would need proper Cypher analysis.
        if optimized_query.upper().count('MATCH') > 1:
            lines = optimized_query.split('\n')
            match_lines = [line for line in lines if line.strip().upper().startswith('MATCH')]
            other_lines = [line for line in lines if not line.strip().upper().startswith('MATCH')]

            match_lines.sort(key=lambda x: 0 if '{' in x else 1)

            if match_lines != [line for line in lines if line.strip().upper().startswith('MATCH')]:
                optimized_query = '\n'.join(match_lines + other_lines)
                optimizations_applied.append("Reordered MATCH clauses for better selectivity")

        # Rewrite 3: cap ORDER BY result sets. Strip trailing whitespace first
        # so queries ending in a newline after ASC/DESC are still recognized
        # (the original endswith() check missed those).
        if 'ORDER BY' in optimized_query.upper() and 'LIMIT' not in optimized_query.upper():
            trimmed = optimized_query.rstrip()
            if trimmed.upper().endswith('DESC') or trimmed.upper().endswith('ASC'):
                optimized_query = trimmed + ' LIMIT 1000'  # default cap
                optimizations_applied.append("Added LIMIT to prevent large result sets")

        return {
            'original_query': original_query,
            'optimized_query': optimized_query,
            'optimizations_applied': optimizations_applied,
            'estimated_improvement': len(optimizations_applied) * 0.2  # crude estimate
        }

    def benchmark_query_variants(self, base_query: str, variants: List[str], 
                               parameters: Optional[Dict[str, Any]] = None, iterations: int = 3) -> Dict[str, Any]:
        """Profile *base_query* and each variant *iterations* times and compare.

        Each variant entry gets an 'improvement_ratio' relative to the base
        query (positive = faster than base). Runs against the live database.
        """
        results = {
            'base_query': {'query': base_query, 'performances': []},
            'variants': []
        }

        # Profile the baseline.
        for _ in range(iterations):
            results['base_query']['performances'].append(
                self.monitor.profile_query(base_query, parameters))

        # Profile every variant.
        for variant in variants:
            variant_result = {'query': variant, 'performances': []}
            for _ in range(iterations):
                variant_result['performances'].append(
                    self.monitor.profile_query(variant, parameters))
            results['variants'].append(variant_result)

        def calculate_stats(performances):
            # Aggregate timing / db-hit statistics across the runs.
            execution_times = [p.execution_time for p in performances]
            db_hits = [p.db_hits for p in performances]
            return {
                'avg_execution_time': sum(execution_times) / len(execution_times),
                'min_execution_time': min(execution_times),
                'max_execution_time': max(execution_times),
                'avg_db_hits': sum(db_hits) / len(db_hits) if db_hits else 0
            }

        results['base_query']['stats'] = calculate_stats(results['base_query']['performances'])

        for variant_result in results['variants']:
            variant_result['stats'] = calculate_stats(variant_result['performances'])

            # Relative improvement versus the baseline.
            base_time = results['base_query']['stats']['avg_execution_time']
            variant_time = variant_result['stats']['avg_execution_time']
            variant_result['improvement_ratio'] = (base_time - variant_time) / base_time if base_time > 0 else 0

        return results

    def generate_optimization_report(self, query: str) -> Dict[str, Any]:
        """Bundle complexity analysis, suggestions, rewrite and — when the
        query was actually rewritten — a live benchmark into one report."""
        complexity = self.analyze_query_complexity(query)
        suggestions = self.suggest_query_optimizations(query)
        rewrite_result = self.rewrite_query_for_performance(query)

        # Only benchmark (hits the database) when the rewrite changed the query.
        benchmark_result = None
        if rewrite_result['optimized_query'] != query:
            benchmark_result = self.benchmark_query_variants(
                query, 
                [rewrite_result['optimized_query']]
            )

        return {
            'original_query': query,
            'complexity_analysis': complexity,
            'optimization_suggestions': suggestions,
            'rewrite_result': rewrite_result,
            'benchmark_result': benchmark_result,
            'overall_recommendation': self._generate_overall_recommendation(complexity, suggestions)
        }

    def _generate_overall_recommendation(self, complexity: Dict[str, Any], 
                                       suggestions: List[Dict[str, Any]]) -> str:
        """Map the complexity score / suggestion mix to a one-line verdict."""
        if complexity['complexity_score'] > 20:
            return "High complexity query - consider breaking into smaller parts or adding more constraints"
        elif complexity['complexity_score'] > 10:
            return "Medium complexity query - review suggestions for potential optimizations"
        elif len([s for s in suggestions if s['priority'] == 'high']) > 0:
            return "Low complexity but has high-priority optimization opportunities"
        else:
            return "Query appears to be well-optimized"

# Usage example — note this rebinds `optimizer` from the index example above.
optimizer = CypherQueryOptimizer(monitor)

# A deliberately expensive query: unbounded variable-length expansion,
# two MATCH clauses, and ORDER BY without LIMIT.
complex_query = """
MATCH (p:Person)-[:KNOWS*]-(friend:Person)
MATCH (p)-[:WORKS_FOR]->(c:Company)
RETURN p.name, count(friend) as friend_count, c.name
ORDER BY friend_count DESC
"""

# Build the full report (benchmarks against the live DB when a rewrite applies).
report = optimizer.generate_optimization_report(complex_query)
print("查询优化报告:")
print(f"复杂度分数: {report['complexity_analysis']['complexity_score']}")
print(f"优化建议数量: {len(report['optimization_suggestions'])}")
print(f"总体建议: {report['overall_recommendation']}")

if report['rewrite_result']['optimizations_applied']:
    print("\n应用的优化:")
    for opt in report['rewrite_result']['optimizations_applied']:
        print(f"  - {opt}")
    
    print(f"\n优化后的查询:")
    print(report['rewrite_result']['optimized_query'])

if report['benchmark_result']:
    base_time = report['benchmark_result']['base_query']['stats']['avg_execution_time']
    variant_time = report['benchmark_result']['variants'][0]['stats']['avg_execution_time']
    improvement = (base_time - variant_time) / base_time * 100
    print(f"\n性能改进: {improvement:.1f}%")

6.3 内存和存储优化

6.3.1 内存配置优化

class Neo4jMemoryOptimizer:
    """Neo4j内存优化器"""
    
    def __init__(self, monitor: Neo4jPerformanceMonitor):
        # Share the monitor's driver so all tooling uses one connection pool.
        self.monitor = monitor
        self.driver = monitor.driver
    
    def analyze_memory_usage(self) -> Dict[str, Any]:
        """分析内存使用情况"""
        
        # 获取JVM内存信息
        jvm_query = """
        CALL dbms.queryJmx('java.lang:type=Memory') 
        YIELD attributes 
        RETURN attributes.HeapMemoryUsage.used as heap_used,
               attributes.HeapMemoryUsage.max as heap_max,
               attributes.NonHeapMemoryUsage.used as non_heap_used,
               attributes.NonHeapMemoryUsage.max as non_heap_max
        """
        
        # 获取页面缓存信息
        page_cache_query = """
        CALL dbms.queryJmx('org.neo4j:instance=kernel#0,name=Page cache') 
        YIELD attributes 
        RETURN attributes.BytesRead as bytes_read,
               attributes.BytesWritten as bytes_written,
               attributes.Hits as cache_hits,
               attributes.Faults as cache_faults
        """
        
        memory_info = {}
        
        try:
            with self.driver.session() as session:
                # JVM内存
                result = session.run(jvm_query)
                record = result.single()
                if record:
                    memory_info['jvm'] = {
                        'heap_used_mb': record['heap_used'] / (1024 * 1024),
                        'heap_max_mb': record['heap_max'] / (1024 * 1024),
                        'heap_usage_percent': (record['heap_used'] / record['heap_max']) * 100,
                        'non_heap_used_mb': record['non_heap_used'] / (1024 * 1024),
                        'non_heap_max_mb': record['non_heap_max'] / (1024 * 1024) if record['non_heap_max'] > 0 else 0
                    }
                
                # 页面缓存
                result = session.run(page_cache_query)
                record = result.single()
                if record:
                    total_operations = record['cache_hits'] + record['cache_faults']
                    memory_info['page_cache'] = {
                        'bytes_read_mb': record['bytes_read'] / (1024 * 1024),
                        'bytes_written_mb': record['bytes_written'] / (1024 * 1024),
                        'cache_hits': record['cache_hits'],
                        'cache_faults': record['cache_faults'],
                        'hit_ratio': (record['cache_hits'] / total_operations) * 100 if total_operations > 0 else 0
                    }
        
        except Exception as e:
            memory_info['error'] = f"Failed to retrieve memory info: {str(e)}"
        
        return memory_info
    
    def calculate_optimal_memory_settings(self, total_system_memory_gb: float, 
                                        database_size_gb: float) -> Dict[str, Any]:
        """计算最优内存设置"""
        
        # Neo4j内存分配建议
        # 页面缓存:数据库大小的50-75%,但不超过系统内存的50%
        # 堆内存:通常1-8GB,取决于并发查询数量
        
        recommendations = {}
        
        # 页面缓存计算
        page_cache_by_db_size = database_size_gb * 0.6  # 数据库大小的60%
        page_cache_by_system = total_system_memory_gb * 0.5  # 系统内存的50%
        recommended_page_cache = min(page_cache_by_db_size, page_cache_by_system)
        
        # 堆内存计算
        if total_system_memory_gb <= 8:
            recommended_heap = min(2, total_system_memory_gb * 0.25)
        elif total_system_memory_gb <= 32:
            recommended_heap = min(4, total_system_memory_gb * 0.2)
        else:
            recommended_heap = min(8, total_system_memory_gb * 0.15)
        
        # 操作系统预留
        os_reserved = max(1, total_system_memory_gb * 0.1)
        
        recommendations = {
            'page_cache_gb': round(recommended_page_cache, 2),
            'heap_memory_gb': round(recommended_heap, 2),
            'os_reserved_gb': round(os_reserved, 2),
            'total_neo4j_memory_gb': round(recommended_page_cache + recommended_heap, 2),
            'memory_utilization_percent': round(((recommended_page_cache + recommended_heap) / total_system_memory_gb) * 100, 1),
            'configuration_settings': {
                'dbms.memory.pagecache.size': f"{recommended_page_cache:.1f}g",
                'dbms.memory.heap.initial_size': f"{recommended_heap:.1f}g",
                'dbms.memory.heap.max_size': f"{recommended_heap:.1f}g"
            }
        }
        
        # 添加警告和建议
        warnings = []
        if recommendations['memory_utilization_percent'] > 80:
            warnings.append("High memory utilization - consider adding more RAM or reducing cache size")
        
        if recommended_page_cache < database_size_gb * 0.3:
            warnings.append("Page cache is small relative to database size - may impact performance")
        
        if recommended_heap < 1:
            warnings.append("Heap memory is very small - may cause GC issues with concurrent queries")
        
        recommendations['warnings'] = warnings
        
        return recommendations
    
    def monitor_gc_performance(self) -> Dict[str, Any]:
        """Collect garbage-collection statistics via the JMX endpoint.

        Returns a dict with one entry per collector plus aggregate
        collection counts/times; on failure the dict carries an
        'error' key instead of collector data.
        """
        
        gc_query = """
        CALL dbms.queryJmx('java.lang:type=GarbageCollector,name=*') 
        YIELD attributes, name 
        RETURN name, 
               attributes.CollectionCount as collection_count,
               attributes.CollectionTime as collection_time_ms
        """
        
        gc_info = {'collectors': [], 'total_collections': 0, 'total_time_ms': 0}
        
        try:
            with self.driver.session() as session:
                for row in session.run(gc_query):
                    count = row['collection_count']
                    elapsed = row['collection_time_ms']
                    gc_info['collectors'].append({
                        'name': row['name'],
                        'collection_count': count,
                        'collection_time_ms': elapsed,
                        # Avoid division by zero for collectors that never ran.
                        'avg_collection_time_ms': elapsed / count if count > 0 else 0
                    })
                    gc_info['total_collections'] += count
                    gc_info['total_time_ms'] += elapsed
                
                # Overall average per collection (GC overhead % would also need uptime).
                if gc_info['total_collections'] > 0:
                    gc_info['avg_collection_time_ms'] = gc_info['total_time_ms'] / gc_info['total_collections']
        
        except Exception as e:
            gc_info['error'] = f"Failed to retrieve GC info: {str(e)}"
        
        return gc_info
    
    def generate_memory_optimization_report(self, total_system_memory_gb: float, 
                                          database_size_gb: float) -> Dict[str, Any]:
        """Build a full memory optimization report.

        Combines the current usage snapshot, the recommended settings for the
        given system/database sizes, and GC statistics, then derives a list of
        actionable recommendations plus a one-line summary.
        """
        
        current_usage = self.analyze_memory_usage()
        optimal_settings = self.calculate_optimal_memory_settings(total_system_memory_gb, database_size_gb)
        gc_performance = self.monitor_gc_performance()
        
        # Compare the live configuration against the recommended one.
        recommendations: List[Dict[str, Any]] = []
        
        if 'jvm' in current_usage:
            jvm = current_usage['jvm']
            current_heap_gb = jvm['heap_max_mb'] / 1024
            recommended_heap_gb = optimal_settings['heap_memory_gb']
            
            # Only flag a heap-size mismatch larger than half a gigabyte.
            if abs(current_heap_gb - recommended_heap_gb) > 0.5:
                recommendations.append({
                    'type': 'heap_memory',
                    'current': f"{current_heap_gb:.1f}GB",
                    'recommended': f"{recommended_heap_gb:.1f}GB",
                    'action': 'Adjust heap memory allocation'
                })
            
            if jvm['heap_usage_percent'] > 80:
                recommendations.append({
                    'type': 'heap_usage',
                    'current': f"{jvm['heap_usage_percent']:.1f}%",
                    'action': 'High heap usage - consider increasing heap size or optimizing queries'
                })
        
        if 'page_cache' in current_usage:
            cache = current_usage['page_cache']
            if cache['hit_ratio'] < 90:
                recommendations.append({
                    'type': 'page_cache_hit_ratio',
                    'current': f"{cache['hit_ratio']:.1f}%",
                    'action': 'Low cache hit ratio - consider increasing page cache size'
                })
        
        return {
            'current_memory_usage': current_usage,
            'optimal_settings': optimal_settings,
            'gc_performance': gc_performance,
            'recommendations': recommendations,
            'summary': self._generate_memory_summary(current_usage, optimal_settings, recommendations)
        }
    
    def _generate_memory_summary(self, current_usage: Dict[str, Any], 
                               optimal_settings: Dict[str, Any], 
                               recommendations: List[Dict[str, Any]]) -> str:
        """生成内存优化总结"""
        
        if len(recommendations) == 0:
            return "Memory configuration appears to be optimal"
        elif len(recommendations) <= 2:
            return "Minor memory optimizations recommended"
        else:
            return "Significant memory optimizations needed for better performance"

# Usage example: exercise the memory optimizer against a live monitor.
# NOTE(review): `Neo4jMemoryOptimizer` and `monitor` are defined earlier in this file.
memory_optimizer = Neo4jMemoryOptimizer(monitor)

# Analyze current memory usage
memory_usage = memory_optimizer.analyze_memory_usage()
print("当前内存使用:")
if 'jvm' in memory_usage:
    print(f"  堆内存使用: {memory_usage['jvm']['heap_usage_percent']:.1f}%")
    print(f"  堆内存大小: {memory_usage['jvm']['heap_max_mb']:.0f}MB")

if 'page_cache' in memory_usage:
    print(f"  页面缓存命中率: {memory_usage['page_cache']['hit_ratio']:.1f}%")

# Compute the optimal settings for this machine/database combination
optimal_settings = memory_optimizer.calculate_optimal_memory_settings(
    total_system_memory_gb=16.0,  # 16GB of system RAM
    database_size_gb=5.0          # 5GB database
)

print(f"\n推荐内存配置:")
print(f"  页面缓存: {optimal_settings['page_cache_gb']}GB")
print(f"  堆内存: {optimal_settings['heap_memory_gb']}GB")
print(f"  内存利用率: {optimal_settings['memory_utilization_percent']}%")

if optimal_settings['warnings']:
    print("\n警告:")
    for warning in optimal_settings['warnings']:
        print(f"  - {warning}")

# Generate the full memory optimization report
report = memory_optimizer.generate_memory_optimization_report(
    total_system_memory_gb=16.0,
    database_size_gb=5.0
)

print(f"\n内存优化总结: {report['summary']}")
if report['recommendations']:
    print("\n优化建议:")
    for rec in report['recommendations']:
        print(f"  {rec['type']}: {rec['action']}")
6.3.2 存储优化

class Neo4jStorageOptimizer:
    """Storage-level optimizer for Neo4j.

    Analyzes store sizes and property usage (via APOC procedures) and
    derives storage optimization opportunities.
    """
    
    def __init__(self, monitor: Neo4jPerformanceMonitor):
        # Reuse the monitor's driver so all helpers share one connection pool.
        self.monitor = monitor
        self.driver = monitor.driver
    
    def analyze_storage_usage(self) -> Dict[str, Any]:
        """Analyze current storage usage.

        Returns a dict with 'store_sizes' (per-store sizes in MB),
        'data_stats' (node/relationship/label counts) and, when both
        queries succeed, 'averages' (approximate bytes per node and per
        relationship). On failure an 'error' key describes the problem.
        Requires the APOC plugin on the server.
        """
        
        storage_info = {}
        
        # Database store size information
        size_query = """
        CALL apoc.monitor.store() 
        YIELD logSize, stringStoreSize, arrayStoreSize, relStoreSize, propStoreSize, totalStoreSize
        RETURN logSize, stringStoreSize, arrayStoreSize, relStoreSize, propStoreSize, totalStoreSize
        """
        
        # Node and relationship statistics
        stats_query = """
        CALL apoc.meta.stats() 
        YIELD nodeCount, relCount, labelCount, relTypeCount, propertyKeyCount
        RETURN nodeCount, relCount, labelCount, relTypeCount, propertyKeyCount
        """
        
        try:
            with self.driver.session() as session:
                mb = 1024 * 1024
                
                # Store sizes
                size_record = session.run(size_query).single()
                if size_record:
                    storage_info['store_sizes'] = {
                        'log_size_mb': size_record['logSize'] / mb,
                        'string_store_mb': size_record['stringStoreSize'] / mb,
                        'array_store_mb': size_record['arrayStoreSize'] / mb,
                        'relationship_store_mb': size_record['relStoreSize'] / mb,
                        'property_store_mb': size_record['propStoreSize'] / mb,
                        'total_store_mb': size_record['totalStoreSize'] / mb
                    }
                
                # Data statistics
                stats_record = session.run(stats_query).single()
                if stats_record:
                    storage_info['data_stats'] = {
                        'node_count': stats_record['nodeCount'],
                        'relationship_count': stats_record['relCount'],
                        'label_count': stats_record['labelCount'],
                        'relationship_type_count': stats_record['relTypeCount'],
                        'property_key_count': stats_record['propertyKeyCount']
                    }
                    
                    # BUG FIX: the original read totalStoreSize/relStoreSize off the
                    # stats record, which does not yield those keys (they belong to
                    # the apoc.monitor.store() record) and would fail at runtime.
                    if size_record and stats_record['nodeCount'] > 0:
                        storage_info['averages'] = {
                            'avg_node_size_bytes': size_record['totalStoreSize'] / stats_record['nodeCount'],
                            'avg_relationship_size_bytes': (size_record['relStoreSize'] / stats_record['relCount']) if stats_record['relCount'] > 0 else 0
                        }
        
        except Exception as e:
            storage_info['error'] = f"Failed to retrieve storage info: {str(e)}"
        
        return storage_info
    
    def analyze_property_usage(self) -> Dict[str, Any]:
        """Analyze how node properties are used across labels.

        Returns a dict grouping properties by label, listing the most common
        and the unique-constrained properties (both sorted by occurrence
        count, descending), and the total number of property instances.
        Requires the APOC plugin on the server.
        """
        
        # Per-label property statistics
        property_query = """
        CALL apoc.meta.data() 
        YIELD label, other, type, property, count, unique
        WHERE type = 'node'
        RETURN label, property, count, 
               CASE WHEN unique THEN 'unique' ELSE 'non-unique' END as uniqueness
        ORDER BY count DESC
        """
        
        property_analysis = {
            'by_label': {},
            'most_common_properties': [],
            'unique_properties': [],
            'total_property_instances': 0
        }
        
        try:
            with self.driver.session() as session:
                for row in session.run(property_query):
                    label = row['label']
                    prop = row['property']
                    occurrences = row['count']
                    uniqueness = row['uniqueness']
                    
                    # Group by label.
                    property_analysis['by_label'].setdefault(label, []).append({
                        'property': prop,
                        'count': occurrences,
                        'uniqueness': uniqueness
                    })
                    
                    # Track the most common properties.
                    property_analysis['most_common_properties'].append({
                        'label': label,
                        'property': prop,
                        'count': occurrences
                    })
                    
                    # Properties backed by a uniqueness constraint.
                    if uniqueness == 'unique':
                        property_analysis['unique_properties'].append({
                            'label': label,
                            'property': prop,
                            'count': occurrences
                        })
                    
                    property_analysis['total_property_instances'] += occurrences
                
                # Sort both lists by occurrence count, most frequent first.
                by_count = lambda entry: entry['count']
                property_analysis['most_common_properties'].sort(key=by_count, reverse=True)
                property_analysis['unique_properties'].sort(key=by_count, reverse=True)
        
        except Exception as e:
            property_analysis['error'] = f"Failed to analyze properties: {str(e)}"
        
        return property_analysis
    
    def identify_storage_optimization_opportunities(self) -> List[Dict[str, Any]]:
        """识别存储优化机会"""
        
        storage_info = self.analyze_storage_usage()
        property_info = self.analyze_property_usage()
        
        opportunities = []
        
        # 检查存储分布
        if 'store_sizes' in storage_info:
            total_size = storage_info['store_sizes']['total_store_mb']
            string_ratio = storage_info['store_sizes']['string_store_mb'] / total_size
            array_ratio = storage_info['store_sizes']['array_store_mb'] / total_size
            
            if string_ratio > 0.3:
                opportunities.append({
                    'type': 'string_optimization',
                    'priority': 'medium',
                    'description': f'String store is {string_ratio:.1%} of total storage',
                    'recommendation': 'Consider normalizing long string properties or using shorter identifiers'
                })
            
            if array_ratio > 0.2:
                opportunities.append({
                    'type': 'array_optimization',
                    'priority': 'medium',
                    'description': f'Array store is {array_ratio:.1%} of total storage',
                    'recommendation': 'Review array properties for potential normalization'
                })
        
        # 检查属性使用模式
        if 'most_common_properties' in property_info:
            # 查找可能的冗余属性
            property_counts = {}
            for prop in property_info['most_common_properties']:
                prop_name = prop['property']
                if prop_name not in property_counts:
                    property_counts[prop_name] = 0
                property_counts[prop_name] += prop['count']
            
            # 查找在多个标签中重复的属性
            for prop_name, total_count in property_counts.items():
                labels_with_prop = [p['label'] for p in property_info['most_common_properties'] if p['property'] == prop_name]
                if len(labels_with_prop) > 3:  # 在超过3个标签中出现
                    opportunities.append({
                        'type': 'property_normalization',
                        'priority': 'low',
                        'description': f'Property "{prop_name}" appears in {len(labels_with_prop)} labels',
                        'recommendation': 'Consider creating a common base label or relationship for shared properties'
                    })
        
        # 检查数据密度
        if 'data_stats' in storage_usage:
    print(f"  节点数量: {storage_usage['data_stats']['node_count']:,}")
    print(f"  关系数量: {storage_usage['data_stats']['relationship_count']:,}")

# Identify optimization opportunities
opportunities = storage_optimizer.identify_storage_optimization_opportunities()
print(f"\n存储优化机会 ({len(opportunities)} 个):")
for opp in opportunities:
    print(f"  {opp['type']}: {opp['description']} (优先级: {opp['priority']})")
    print(f"    建议: {opp['recommendation']}")

# Generate the full storage optimization report
# NOTE(review): generate_storage_optimization_report is not defined in the
# visible portion of Neo4jStorageOptimizer — confirm it exists elsewhere.
report = storage_optimizer.generate_storage_optimization_report()
print(f"\n存储优化总结: {report['summary']}")

monitor.close()

6.4 集群监控与告警

6.4.1 集群健康监控

from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
import requests
import json
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import logging

@dataclass
class ClusterNode:
    """Cluster node information as observed by the monitor."""
    id: str  # cluster member id (falls back to the address when unknown)
    address: str  # bolt URI of the node
    role: str  # LEADER, FOLLOWER, READ_REPLICA
    status: str  # ONLINE, OFFLINE, UNKNOWN
    last_seen: datetime  # time of the last status probe
    lag_ms: Optional[int] = None  # replication lag in ms, when known

@dataclass
class AlertRule:
    """Alerting rule: fires when `<metric> <operator> <threshold>` holds."""
    name: str  # unique rule identifier
    metric: str  # metric key looked up in the collected metrics
    threshold: float
    operator: str  # '>', '<', '>=', '<=', '==', '!='
    severity: str  # 'critical', 'warning', 'info'
    description: str
    enabled: bool = True  # disabled rules are skipped during evaluation

@dataclass
class Alert:
    """A fired alert instance produced by rule evaluation."""
    rule_name: str  # name of the AlertRule that triggered
    severity: str
    message: str
    value: float  # observed metric value at trigger time
    threshold: float
    timestamp: datetime
    resolved: bool = False  # flipped to True once the condition clears

class Neo4jClusterMonitor:
    """Neo4j cluster monitor.

    Polls every configured cluster member over bolt, evaluates alert rules
    against the collected metrics, and tracks active/resolved alerts.
    """
    
    def __init__(self, cluster_nodes: List[Dict[str, str]], 
                 username: str, password: str):
        """
        Initialize the cluster monitor.
        
        Args:
            cluster_nodes: cluster node list [{'address': 'bolt://host:port', 'role': 'core'}]
            username: authentication user name
            password: authentication password
        """
        self.cluster_nodes = cluster_nodes
        self.username = username
        self.password = password
        self.drivers = {}  # address -> neo4j Driver (only reachable nodes get an entry)
        self.alert_rules: List[AlertRule] = []
        self.active_alerts: List[Alert] = []
        self.alert_history: List[Alert] = []
        
        # Open driver connections
        self._initialize_drivers()
        
        # Install the default alert rules
        self._setup_default_alert_rules()
        
        # Configure logging
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)
    
    def _initialize_drivers(self):
        """Open a driver connection to every configured node (best effort)."""
        from neo4j import GraphDatabase
        
        for node in self.cluster_nodes:
            try:
                driver = GraphDatabase.driver(
                    node['address'], 
                    auth=(self.username, self.password)
                )
                self.drivers[node['address']] = driver
                self.logger.info(f"Connected to {node['address']}")
            except Exception as e:
                # Unreachable nodes simply get no driver; probes report them OFFLINE.
                self.logger.error(f"Failed to connect to {node['address']}: {e}")
    
    def _setup_default_alert_rules(self):
        """Install the default alert rules (CPU, memory, cache, availability, lag)."""
        default_rules = [
            AlertRule(
                name="high_cpu_usage",
                metric="cpu_usage_percent",
                threshold=80.0,
                operator=">",
                severity="warning",
                description="CPU usage is above 80%"
            ),
            AlertRule(
                name="high_memory_usage",
                metric="memory_usage_percent",
                threshold=85.0,
                operator=">",
                severity="warning",
                description="Memory usage is above 85%"
            ),
            AlertRule(
                name="low_page_cache_hit_ratio",
                metric="page_cache_hit_ratio",
                threshold=90.0,
                operator="<",
                severity="warning",
                description="Page cache hit ratio is below 90%"
            ),
            AlertRule(
                name="node_offline",
                metric="node_status",
                threshold=0,
                operator="==",
                severity="critical",
                description="Cluster node is offline"
            ),
            AlertRule(
                name="high_replication_lag",
                metric="replication_lag_ms",
                threshold=5000.0,
                operator=">",
                severity="warning",
                description="Replication lag is above 5 seconds"
            )
        ]
        
        self.alert_rules.extend(default_rules)
    
    def get_cluster_status(self) -> Dict[str, Any]:
        """Return an aggregate view of cluster membership, roles and health."""
        cluster_status = {
            'nodes': [],  # list of ClusterNode objects
            'leader_count': 0,
            'follower_count': 0,
            'read_replica_count': 0,
            'online_nodes': 0,
            'offline_nodes': 0,
            'cluster_health': 'unknown'
        }
        
        for node_config in self.cluster_nodes:
            node_address = node_config['address']
            node_info = self._get_node_status(node_address)
            
            cluster_status['nodes'].append(node_info)
            
            if node_info.status == 'ONLINE':
                cluster_status['online_nodes'] += 1
                if node_info.role == 'LEADER':
                    cluster_status['leader_count'] += 1
                elif node_info.role == 'FOLLOWER':
                    cluster_status['follower_count'] += 1
                elif node_info.role == 'READ_REPLICA':
                    cluster_status['read_replica_count'] += 1
            else:
                cluster_status['offline_nodes'] += 1
        
        # Assess overall cluster health: exactly one leader and no offline
        # members is healthy; one leader with offline members is degraded.
        if cluster_status['leader_count'] == 1 and cluster_status['offline_nodes'] == 0:
            cluster_status['cluster_health'] = 'healthy'
        elif cluster_status['leader_count'] == 1 and cluster_status['offline_nodes'] > 0:
            cluster_status['cluster_health'] = 'degraded'
        else:
            cluster_status['cluster_health'] = 'unhealthy'
        
        return cluster_status
    
    def _get_node_status(self, node_address: str) -> ClusterNode:
        """Probe a single node for its role and availability.

        Any failure (no driver, query error) yields an OFFLINE ClusterNode
        rather than raising.
        """
        try:
            driver = self.drivers.get(node_address)
            if not driver:
                return ClusterNode(
                    id=node_address,
                    address=node_address,
                    role='UNKNOWN',
                    status='OFFLINE',
                    last_seen=datetime.now()
                )
            
            with driver.session() as session:
                # Ask the node for its own raft role
                role_query = "CALL dbms.cluster.role() YIELD role RETURN role"
                result = session.run(role_query)
                role_record = result.single()
                role = role_record['role'] if role_record else 'UNKNOWN'
                
                # Resolve this node's cluster member id from the overview
                overview_query = "CALL dbms.cluster.overview() YIELD id, addresses, role, groups RETURN id, addresses, role"
                result = session.run(overview_query)
                
                node_id = None
                for record in result:
                    if node_address in record['addresses']:
                        node_id = record['id']
                        break
                
                return ClusterNode(
                    id=node_id or node_address,
                    address=node_address,
                    role=role,
                    status='ONLINE',
                    last_seen=datetime.now()
                )
        
        except Exception as e:
            self.logger.error(f"Failed to get status for {node_address}: {e}")
            return ClusterNode(
                id=node_address,
                address=node_address,
                role='UNKNOWN',
                status='OFFLINE',
                last_seen=datetime.now()
            )
    
    def collect_cluster_metrics(self) -> Dict[str, Any]:
        """Collect the cluster status plus per-node metrics in one snapshot."""
        metrics = {
            'timestamp': datetime.now(),
            'cluster_status': self.get_cluster_status(),
            'node_metrics': {}
        }
        
        for node_config in self.cluster_nodes:
            node_address = node_config['address']
            node_metrics = self._collect_node_metrics(node_address)
            metrics['node_metrics'][node_address] = node_metrics
        
        return metrics
    
    def _collect_node_metrics(self, node_address: str) -> Dict[str, Any]:
        """Collect JMX-backed metrics from a single node.

        Each metric group is fetched independently so one failing JMX query
        does not lose the others; failures land in *_error keys.
        """
        try:
            driver = self.drivers.get(node_address)
            if not driver:
                return {'status': 'offline', 'error': 'No driver connection'}
            
            with driver.session() as session:
                metrics = {'status': 'online'}
                
                # JVM memory metrics
                try:
                    jvm_query = """
                    CALL dbms.queryJmx('java.lang:type=Memory') 
                    YIELD attributes 
                    RETURN attributes.HeapMemoryUsage.used as heap_used,
                           attributes.HeapMemoryUsage.max as heap_max
                    """
                    result = session.run(jvm_query)
                    record = result.single()
                    if record:
                        heap_usage_percent = (record['heap_used'] / record['heap_max']) * 100
                        metrics['memory_usage_percent'] = heap_usage_percent
                        metrics['heap_used_mb'] = record['heap_used'] / (1024 * 1024)
                        metrics['heap_max_mb'] = record['heap_max'] / (1024 * 1024)
                except Exception as e:
                    metrics['memory_error'] = str(e)
                
                # Page cache metrics
                try:
                    cache_query = """
                    CALL dbms.queryJmx('org.neo4j:instance=kernel#0,name=Page cache') 
                    YIELD attributes 
                    RETURN attributes.Hits as cache_hits,
                           attributes.Faults as cache_faults
                    """
                    result = session.run(cache_query)
                    record = result.single()
                    if record:
                        total_operations = record['cache_hits'] + record['cache_faults']
                        hit_ratio = (record['cache_hits'] / total_operations) * 100 if total_operations > 0 else 0
                        metrics['page_cache_hit_ratio'] = hit_ratio
                        metrics['page_cache_hits'] = record['cache_hits']
                        metrics['page_cache_faults'] = record['cache_faults']
                except Exception as e:
                    metrics['cache_error'] = str(e)
                
                # Transaction metrics
                try:
                    tx_query = """
                    CALL dbms.queryJmx('org.neo4j:instance=kernel#0,name=Transactions') 
                    YIELD attributes 
                    RETURN attributes.NumberOfOpenTransactions as open_transactions,
                           attributes.NumberOfCommittedTransactions as committed_transactions
                    """
                    result = session.run(tx_query)
                    record = result.single()
                    if record:
                        metrics['open_transactions'] = record['open_transactions']
                        metrics['committed_transactions'] = record['committed_transactions']
                except Exception as e:
                    metrics['transaction_error'] = str(e)
                
                return metrics
        
        except Exception as e:
            return {'status': 'error', 'error': str(e)}
    
    def check_alert_rules(self, metrics: Dict[str, Any]) -> List[Alert]:
        """Evaluate every enabled alert rule against a metrics snapshot.

        Returns the newly fired alerts; does not mutate monitor state
        (see process_alerts for deduplication and notification).
        """
        new_alerts = []
        
        for rule in self.alert_rules:
            if not rule.enabled:
                continue
            
            # Cluster-level metric: offline node count
            if rule.metric == 'node_status':
                offline_nodes = metrics['cluster_status']['offline_nodes']
                if self._evaluate_condition(offline_nodes, rule.operator, rule.threshold):
                    alert = Alert(
                        rule_name=rule.name,
                        severity=rule.severity,
                        message=f"{offline_nodes} nodes are offline",
                        value=offline_nodes,
                        threshold=rule.threshold,
                        timestamp=datetime.now()
                    )
                    new_alerts.append(alert)
            
            # Node-level metrics: check each node that reports this metric
            for node_address, node_metrics in metrics['node_metrics'].items():
                if rule.metric in node_metrics:
                    value = node_metrics[rule.metric]
                    if self._evaluate_condition(value, rule.operator, rule.threshold):
                        alert = Alert(
                            rule_name=rule.name,
                            severity=rule.severity,
                            message=f"Node {node_address}: {rule.description} (value: {value})",
                            value=value,
                            threshold=rule.threshold,
                            timestamp=datetime.now()
                        )
                        new_alerts.append(alert)
        
        return new_alerts
    
    def _evaluate_condition(self, value: float, operator: str, threshold: float) -> bool:
        """Evaluate a single `<value> <operator> <threshold>` condition.

        Unknown operators evaluate to False (rule never fires).
        """
        if operator == '>':
            return value > threshold
        elif operator == '<':
            return value < threshold
        elif operator == '>=':
            return value >= threshold
        elif operator == '<=':
            return value <= threshold
        elif operator == '==':
            return value == threshold
        elif operator == '!=':
            return value != threshold
        else:
            return False
    
    def process_alerts(self, new_alerts: List[Alert]):
        """Handle newly fired alerts: dedupe, record, and notify."""
        for alert in new_alerts:
            # Skip duplicates of a recent identical alert
            existing_alert = self._find_existing_alert(alert)
            if existing_alert:
                continue
            
            # Track as active and record in history
            self.active_alerts.append(alert)
            self.alert_history.append(alert)
            
            # Send the notification
            self._send_alert_notification(alert)
            
            self.logger.warning(f"New alert: {alert.rule_name} - {alert.message}")
    
    def _find_existing_alert(self, new_alert: Alert) -> Optional[Alert]:
        """Find a recent unresolved alert for the same rule, if any."""
        for alert in self.active_alerts:
            if (alert.rule_name == new_alert.rule_name and 
                not alert.resolved and
                abs((alert.timestamp - new_alert.timestamp).total_seconds()) < 300):  # duplicate within 5 minutes
                return alert
        return None
    
    def _send_alert_notification(self, alert: Alert):
        """Send an alert notification (currently logs only)."""
        # Multiple channels could be plugged in here: email, Slack, DingTalk, etc.
        notification_message = f"""
        告警名称: {alert.rule_name}
        严重程度: {alert.severity}
        告警信息: {alert.message}
        当前值: {alert.value}
        阈值: {alert.threshold}
        时间: {alert.timestamp.strftime('%Y-%m-%d %H:%M:%S')}
        """
        
        self.logger.info(f"Alert notification: {notification_message}")
        
        # Actual delivery logic would go here,
        # e.g. sending an email or calling a webhook.
    
    def resolve_alerts(self, metrics: Dict[str, Any]):
        """Mark active alerts whose condition no longer holds as resolved."""
        for alert in self.active_alerts:
            if alert.resolved:
                continue
            
            rule = next((r for r in self.alert_rules if r.name == alert.rule_name), None)
            if not rule:
                continue
            
            # Re-evaluate the rule; resolve when it no longer fires
            current_value = self._get_current_metric_value(alert, metrics)
            if current_value is not None:
                if not self._evaluate_condition(current_value, rule.operator, rule.threshold):
                    alert.resolved = True
                    self.logger.info(f"Alert resolved: {alert.rule_name} - {alert.message}")
    
    def _get_current_metric_value(self, alert: Alert, metrics: Dict[str, Any]) -> Optional[float]:
        """Look up the current value of the metric behind an alert.

        NOTE(review): for node-level metrics this returns the FIRST node that
        reports the metric, not necessarily the node that raised the alert.
        """
        rule = next((r for r in self.alert_rules if r.name == alert.rule_name), None)
        if not rule:
            return None
        
        if rule.metric == 'node_status':
            return metrics['cluster_status']['offline_nodes']
        
        # Search node-level metrics
        for node_metrics in metrics['node_metrics'].values():
            if rule.metric in node_metrics:
                return node_metrics[rule.metric]
        
        return None
    
    def get_active_alerts(self) -> List[Alert]:
        """Return all unresolved alerts."""
        return [alert for alert in self.active_alerts if not alert.resolved]
    
    def get_alert_summary(self) -> Dict[str, Any]:
        """Summarize active alerts by severity plus the most recent history."""
        active_alerts = self.get_active_alerts()
        
        summary = {
            'total_active_alerts': len(active_alerts),
            'critical_alerts': len([a for a in active_alerts if a.severity == 'critical']),
            'warning_alerts': len([a for a in active_alerts if a.severity == 'warning']),
            'info_alerts': len([a for a in active_alerts if a.severity == 'info']),
            'recent_alerts': sorted(self.alert_history[-10:], key=lambda x: x.timestamp, reverse=True)
        }
        
        return summary
    
    def start_monitoring(self, interval_seconds: int = 60):
        """Run the collect/evaluate/notify loop until interrupted."""
        import time
        
        self.logger.info(f"Starting cluster monitoring with {interval_seconds}s interval")
        
        try:
            while True:
                # Collect metrics
                metrics = self.collect_cluster_metrics()
                
                # Evaluate alert rules
                new_alerts = self.check_alert_rules(metrics)
                
                # Handle new alerts
                if new_alerts:
                    self.process_alerts(new_alerts)
                
                # Resolve recovered alerts
                self.resolve_alerts(metrics)
                
                # Wait for the next cycle
                time.sleep(interval_seconds)
        
        except KeyboardInterrupt:
            self.logger.info("Monitoring stopped by user")
        except Exception as e:
            self.logger.error(f"Monitoring error: {e}")
        finally:
            self.close()
    
    def close(self):
        """Close all driver connections."""
        for driver in self.drivers.values():
            if driver:
                driver.close()
        self.logger.info("All connections closed")

# Usage example: monitor a three-core cluster with one read replica.
cluster_nodes = [
    {'address': 'bolt://neo4j-core-1:7687', 'role': 'core'},
    {'address': 'bolt://neo4j-core-2:7687', 'role': 'core'},
    {'address': 'bolt://neo4j-core-3:7687', 'role': 'core'},
    {'address': 'bolt://neo4j-replica-1:7687', 'role': 'read_replica'}
]

monitor = Neo4jClusterMonitor(cluster_nodes, "neo4j", "password")

# Get cluster status
cluster_status = monitor.get_cluster_status()
print("集群状态:")
print(f"  在线节点: {cluster_status['online_nodes']}")
print(f"  离线节点: {cluster_status['offline_nodes']}")
print(f"  Leader数量: {cluster_status['leader_count']}")
print(f"  集群健康: {cluster_status['cluster_health']}")

# Collect metrics
metrics = monitor.collect_cluster_metrics()
print(f"\n指标收集时间: {metrics['timestamp']}")

# Check alerts
new_alerts = monitor.check_alert_rules(metrics)
if new_alerts:
    print(f"\n发现 {len(new_alerts)} 个新告警:")
    for alert in new_alerts:
        print(f"  {alert.severity}: {alert.message}")

# Get the alert summary
alert_summary = monitor.get_alert_summary()
print(f"\n告警摘要:")
print(f"  活跃告警: {alert_summary['total_active_alerts']}")
print(f"  严重告警: {alert_summary['critical_alerts']}")
print(f"  警告告警: {alert_summary['warning_alerts']}")

# Start continuous monitoring (in a real deployment)
# monitor.start_monitoring(interval_seconds=30)

monitor.close()

6.5 性能基准测试

6.5.1 基准测试框架

import concurrent.futures
import csv
import json
import statistics
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Callable, Dict, List, Tuple

@dataclass
class BenchmarkResult:
    """Aggregated outcome of one benchmark run (single-query or concurrent)."""
    test_name: str                 # human-readable benchmark identifier
    query: str                     # the Cypher text that was executed
    execution_times: List[float]   # per-execution wall-clock durations, seconds
    avg_time: float                # mean of execution_times (0 if all runs failed)
    min_time: float                # fastest single execution, seconds
    max_time: float                # slowest single execution, seconds
    median_time: float             # median execution time, seconds
    std_dev: float                 # sample std deviation (0 with fewer than 2 samples)
    throughput_qps: float          # successful queries per second of wall-clock time
    success_count: int             # executions that completed without raising
    error_count: int               # executions that raised
    errors: List[str]              # sample of error messages (producers keep first 10)
    timestamp: datetime            # when the benchmark finished

class Neo4jBenchmark:
    """Neo4j performance benchmark harness.

    Drives single-query and concurrent benchmarks through the Neo4j driver
    owned by a ``Neo4jPerformanceMonitor``, accumulates ``BenchmarkResult``
    records, and supports result comparison, export (JSON/CSV) and
    human-readable report generation.
    """

    def __init__(self, monitor: 'Neo4jPerformanceMonitor'):
        """Share the monitor's driver so all benchmarks use one connection pool."""
        self.monitor = monitor
        self.driver = monitor.driver
        self.results: List['BenchmarkResult'] = []

    @staticmethod
    def _summarize(execution_times: List[float], success_count: int,
                   total_time: float) -> Tuple[float, float, float, float, float, float]:
        """Return (avg, min, max, median, stdev, qps) for one timing sample.

        All six values are 0 when the sample is empty (every run failed).
        Throughput counts successful queries over total wall-clock time.
        """
        if not execution_times:
            return 0, 0, 0, 0, 0, 0
        avg_time = statistics.mean(execution_times)
        # stdev needs at least two samples.
        std_dev = statistics.stdev(execution_times) if len(execution_times) > 1 else 0
        throughput_qps = success_count / total_time if total_time > 0 else 0
        return (avg_time, min(execution_times), max(execution_times),
                statistics.median(execution_times), std_dev, throughput_qps)

    def run_single_query_benchmark(self, test_name: str, query: str,
                                 parameters: Dict[str, Any] = None,
                                 iterations: int = 100,
                                 warmup_iterations: int = 10) -> 'BenchmarkResult':
        """Benchmark one query executed sequentially.

        Runs ``warmup_iterations`` untimed executions first (to populate page
        and plan caches), then times ``iterations`` executions one by one.

        Args:
            test_name: Label stored with the result.
            query: Cypher statement to execute.
            parameters: Query parameters (defaults to empty).
            iterations: Number of timed executions.
            warmup_iterations: Number of untimed warm-up executions.

        Returns:
            The BenchmarkResult (also appended to ``self.results``).
        """
        print(f"Running benchmark: {test_name}")
        print(f"Query: {query[:100]}..." if len(query) > 100 else f"Query: {query}")
        print(f"Iterations: {iterations}, Warmup: {warmup_iterations}")

        # Warm-up phase: failures are reported but do not abort the benchmark.
        print("Warming up...")
        for i in range(warmup_iterations):
            try:
                with self.driver.session() as session:
                    session.run(query, parameters or {})
            except Exception as e:
                print(f"Warmup error {i+1}: {e}")

        # Timed phase.
        print("Running benchmark...")
        execution_times = []
        errors = []
        success_count = 0
        error_count = 0

        start_time = time.time()

        for i in range(iterations):
            try:
                query_start = time.time()
                with self.driver.session() as session:
                    result = session.run(query, parameters or {})
                    list(result)  # consume all records so timing covers result fetching
                query_end = time.time()

                execution_times.append(query_end - query_start)
                success_count += 1

                if (i + 1) % 10 == 0:
                    print(f"  Progress: {i + 1}/{iterations}")

            except Exception as e:
                error_count += 1
                errors.append(str(e))
                print(f"  Error {error_count}: {e}")

        total_time = time.time() - start_time

        avg_time, min_time, max_time, median_time, std_dev, throughput_qps = \
            self._summarize(execution_times, success_count, total_time)

        result = BenchmarkResult(
            test_name=test_name,
            query=query,
            execution_times=execution_times,
            avg_time=avg_time,
            min_time=min_time,
            max_time=max_time,
            median_time=median_time,
            std_dev=std_dev,
            throughput_qps=throughput_qps,
            success_count=success_count,
            error_count=error_count,
            errors=errors[:10],  # keep only the first 10 errors
            timestamp=datetime.now()
        )

        self.results.append(result)
        self._print_benchmark_result(result)

        return result

    def run_concurrent_benchmark(self, test_name: str, query: str,
                               parameters: Dict[str, Any] = None,
                               concurrent_users: int = 10,
                               queries_per_user: int = 10) -> 'BenchmarkResult':
        """Benchmark one query under concurrent load from a thread pool.

        Args:
            test_name: Label stored with the result.
            query: Cypher statement each simulated user executes.
            parameters: Query parameters (defaults to empty).
            concurrent_users: Number of worker threads.
            queries_per_user: Executions per worker.

        Returns:
            The BenchmarkResult (also appended to ``self.results``).
        """
        print(f"Running concurrent benchmark: {test_name}")
        print(f"Concurrent users: {concurrent_users}, Queries per user: {queries_per_user}")

        # Fixed: was annotated -> List[float] but returns (times, errors).
        def run_user_queries(user_id: int) -> Tuple[List[float], List[str]]:
            """Execute this user's share of queries; return (times, errors)."""
            user_times = []
            user_errors = []

            for _ in range(queries_per_user):
                try:
                    query_start = time.time()
                    # Each execution opens its own session: driver sessions
                    # must not be shared across threads.
                    with self.driver.session() as session:
                        rows = session.run(query, parameters or {})
                        list(rows)  # consume all records
                    user_times.append(time.time() - query_start)
                except Exception as e:
                    user_errors.append(str(e))

            return user_times, user_errors

        start_time = time.time()
        all_execution_times = []
        all_errors = []

        with concurrent.futures.ThreadPoolExecutor(max_workers=concurrent_users) as executor:
            futures = [executor.submit(run_user_queries, i) for i in range(concurrent_users)]

            for future in concurrent.futures.as_completed(futures):
                try:
                    user_times, user_errors = future.result()
                    all_execution_times.extend(user_times)
                    all_errors.extend(user_errors)
                except Exception as e:
                    all_errors.append(str(e))

        total_time = time.time() - start_time
        success_count = len(all_execution_times)
        error_count = len(all_errors)

        avg_time, min_time, max_time, median_time, std_dev, throughput_qps = \
            self._summarize(all_execution_times, success_count, total_time)

        result = BenchmarkResult(
            test_name=test_name,
            query=query,
            execution_times=all_execution_times,
            avg_time=avg_time,
            min_time=min_time,
            max_time=max_time,
            median_time=median_time,
            std_dev=std_dev,
            throughput_qps=throughput_qps,
            success_count=success_count,
            error_count=error_count,
            errors=all_errors[:10],
            timestamp=datetime.now()
        )

        self.results.append(result)
        self._print_benchmark_result(result)

        return result

    def _print_benchmark_result(self, result: 'BenchmarkResult'):
        """Print one result's statistics to stdout."""
        print(f"\n=== Benchmark Result: {result.test_name} ===")
        print(f"Success: {result.success_count}, Errors: {result.error_count}")
        print(f"Average time: {result.avg_time:.4f}s")
        print(f"Min time: {result.min_time:.4f}s")
        print(f"Max time: {result.max_time:.4f}s")
        print(f"Median time: {result.median_time:.4f}s")
        print(f"Std deviation: {result.std_dev:.4f}s")
        print(f"Throughput: {result.throughput_qps:.2f} QPS")

        if result.errors:
            print(f"Sample errors: {result.errors[:3]}")

    def run_benchmark_suite(self) -> Dict[str, 'BenchmarkResult']:
        """Run the predefined benchmark suite plus one concurrent test.

        A failing test is reported and skipped so one broken query does not
        abort the whole suite.

        Returns:
            Mapping of test name to its BenchmarkResult.
        """
        print("Starting Neo4j Benchmark Suite")
        print("=" * 50)

        # Iteration counts are scaled inversely to each query's expected cost.
        benchmark_tests = {
            # basic point lookup
            'simple_node_lookup': {
                'query': 'MATCH (n:Person {id: $id}) RETURN n',
                'parameters': {'id': 'person_1'},
                'iterations': 1000
            },

            # single-hop relationship traversal
            'relationship_traversal': {
                'query': 'MATCH (p:Person {id: $id})-[:KNOWS]->(friend) RETURN friend.name',
                'parameters': {'id': 'person_1'},
                'iterations': 500
            },

            # variable-length path expansion
            'variable_length_path': {
                'query': 'MATCH (p:Person {id: $id})-[:KNOWS*1..3]->(friend) RETURN DISTINCT friend.name',
                'parameters': {'id': 'person_1'},
                'iterations': 100
            },

            # aggregation over the whole graph
            'aggregation_query': {
                'query': 'MATCH (p:Person)-[:KNOWS]->(friend) RETURN p.name, count(friend) as friend_count ORDER BY friend_count DESC LIMIT 10',
                'parameters': {},
                'iterations': 50
            },

            # multi-pattern join
            'complex_join': {
                'query': '''MATCH (p:Person)-[:WORKS_FOR]->(c:Company)
                           MATCH (p)-[:KNOWS]->(colleague)-[:WORKS_FOR]->(c)
                           RETURN p.name, c.name, count(colleague) as colleagues
                           ORDER BY colleagues DESC LIMIT 20''',
                'parameters': {},
                'iterations': 20
            }
        }

        results = {}

        for test_name, test_config in benchmark_tests.items():
            try:
                result = self.run_single_query_benchmark(
                    test_name=test_name,
                    query=test_config['query'],
                    parameters=test_config['parameters'],
                    iterations=test_config['iterations']
                )
                results[test_name] = result

                # brief pause between tests so they don't interfere
                time.sleep(1)

            except Exception as e:
                print(f"Failed to run benchmark {test_name}: {e}")

        # concurrency test
        try:
            concurrent_result = self.run_concurrent_benchmark(
                test_name='concurrent_node_lookup',
                query='MATCH (n:Person {id: $id}) RETURN n',
                parameters={'id': 'person_1'},
                concurrent_users=10,
                queries_per_user=50
            )
            results['concurrent_test'] = concurrent_result
        except Exception as e:
            print(f"Failed to run concurrent benchmark: {e}")

        return results

    def compare_benchmarks(self, baseline_results: Dict[str, 'BenchmarkResult'],
                         current_results: Dict[str, 'BenchmarkResult']) -> Dict[str, Any]:
        """Compare two benchmark runs test by test.

        Tests present only in one of the two runs are ignored.

        Returns:
            Dict with 'improvements' (>5% faster), 'regressions' (>5% slower)
            and a 'summary' of counts.
        """
        comparison = {
            'improvements': [],
            'regressions': [],
            'summary': {}
        }

        for test_name in baseline_results:
            if test_name not in current_results:
                continue

            baseline = baseline_results[test_name]
            current = current_results[test_name]

            # Percentage change relative to the baseline; treated as 0 when the
            # baseline value is 0 (previously raised ZeroDivisionError).
            time_change = (((current.avg_time - baseline.avg_time) / baseline.avg_time) * 100
                           if baseline.avg_time else 0.0)
            throughput_change = (((current.throughput_qps - baseline.throughput_qps) / baseline.throughput_qps) * 100
                                 if baseline.throughput_qps else 0.0)

            test_comparison = {
                'test_name': test_name,
                'baseline_avg_time': baseline.avg_time,
                'current_avg_time': current.avg_time,
                'time_change_percent': time_change,
                'baseline_throughput': baseline.throughput_qps,
                'current_throughput': current.throughput_qps,
                'throughput_change_percent': throughput_change
            }

            if time_change < -5:  # more than 5% faster
                comparison['improvements'].append(test_comparison)
            elif time_change > 5:  # more than 5% slower
                comparison['regressions'].append(test_comparison)

        comparison['summary'] = {
            'total_tests': len(baseline_results),
            'improvements_count': len(comparison['improvements']),
            'regressions_count': len(comparison['regressions']),
            'stable_count': len(baseline_results) - len(comparison['improvements']) - len(comparison['regressions'])
        }

        return comparison

    def export_results(self, filename: str, format: str = 'json'):
        """Export accumulated results to ``filename`` as JSON or CSV.

        ``execution_times`` is deliberately omitted from both formats to keep
        exported files small.

        Args:
            filename: Destination path (overwritten if it exists).
            format: 'json' or 'csv'.  (Name shadows the builtin but is kept
                for backward compatibility with existing callers.)

        Raises:
            ValueError: If ``format`` is not supported (previously an unknown
                format was silently ignored while still printing success).
        """
        if format == 'json':
            with open(filename, 'w') as f:
                results_data = []
                for result in self.results:
                    result_dict = {
                        'test_name': result.test_name,
                        'query': result.query,
                        'avg_time': result.avg_time,
                        'min_time': result.min_time,
                        'max_time': result.max_time,
                        'median_time': result.median_time,
                        'std_dev': result.std_dev,
                        'throughput_qps': result.throughput_qps,
                        'success_count': result.success_count,
                        'error_count': result.error_count,
                        'timestamp': result.timestamp.isoformat()
                    }
                    results_data.append(result_dict)

                json.dump(results_data, f, indent=2)

        elif format == 'csv':
            with open(filename, 'w', newline='') as f:
                writer = csv.writer(f)
                writer.writerow([
                    'test_name', 'avg_time', 'min_time', 'max_time', 'median_time',
                    'std_dev', 'throughput_qps', 'success_count', 'error_count', 'timestamp'
                ])

                for result in self.results:
                    writer.writerow([
                        result.test_name, result.avg_time, result.min_time, result.max_time,
                        result.median_time, result.std_dev, result.throughput_qps,
                        result.success_count, result.error_count, result.timestamp.isoformat()
                    ])

        else:
            raise ValueError(f"Unsupported export format: {format}")

        # Bug fix: the message previously printed a literal "(unknown)"
        # instead of the actual filename.
        print(f"Results exported to {filename}")

    def generate_performance_report(self) -> str:
        """Build a plain-text report over all accumulated results."""
        if not self.results:
            return "No benchmark results available"

        report = []
        report.append("Neo4j Performance Benchmark Report")
        report.append("=" * 40)
        report.append(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        report.append(f"Total tests: {len(self.results)}")
        report.append("")

        # fastest first
        sorted_results = sorted(self.results, key=lambda x: x.avg_time)

        report.append("Test Results (sorted by average time):")
        report.append("-" * 40)

        for result in sorted_results:
            # Guard against 0 total attempts (previously ZeroDivisionError).
            attempts = result.success_count + result.error_count
            success_rate = (result.success_count / attempts * 100) if attempts else 0.0
            report.append(f"Test: {result.test_name}")
            report.append(f"  Average time: {result.avg_time:.4f}s")
            report.append(f"  Throughput: {result.throughput_qps:.2f} QPS")
            report.append(f"  Success rate: {success_rate:.1f}%")
            report.append("")

        avg_times = [r.avg_time for r in self.results]
        throughputs = [r.throughput_qps for r in self.results]

        report.append("Performance Summary:")
        report.append("-" * 20)
        report.append(f"Average query time: {statistics.mean(avg_times):.4f}s")
        report.append(f"Fastest query: {min(avg_times):.4f}s")
        report.append(f"Slowest query: {max(avg_times):.4f}s")
        report.append(f"Average throughput: {statistics.mean(throughputs):.2f} QPS")
        report.append(f"Peak throughput: {max(throughputs):.2f} QPS")

        return "\n".join(report)

# Example usage: run benchmarks through the shared performance monitor.
benchmark = Neo4jBenchmark(monitor)

# Single-query benchmark.
result = benchmark.run_single_query_benchmark(
    test_name="simple_lookup",
    query="MATCH (n:Person {id: $id}) RETURN n",
    parameters={'id': 'person_1'},
    iterations=100,
)

# Concurrency benchmark for the same lookup.
concurrent_result = benchmark.run_concurrent_benchmark(
    test_name="concurrent_lookup",
    query="MATCH (n:Person {id: $id}) RETURN n",
    parameters={'id': 'person_1'},
    concurrent_users=5,
    queries_per_user=20,
)

# Full suite (commented out; can take a while):
# suite_results = benchmark.run_benchmark_suite()

# Persist results in both supported formats.
for output_path, output_format in (('benchmark_results.json', 'json'),
                                   ('benchmark_results.csv', 'csv')):
    benchmark.export_results(output_path, output_format)

# Human-readable summary.
report = benchmark.generate_performance_report()
print("\n" + report)

6.6 章节总结

本章详细介绍了Neo4j的性能优化与监控技术,主要内容包括:

核心知识点

  1. 性能监控基础

    • 查询性能分析和执行计划解释
    • 系统指标收集和数据库性能监控
    • 慢查询识别和性能瓶颈分析
  2. 索引优化策略

    • 查询模式分析和索引建议生成
    • 索引使用情况监控和优化计划制定
    • 复合索引和属性索引的最佳实践
  3. 查询优化技术

    • Cypher查询复杂度分析
    • 查询重写和性能改进建议
    • 基准测试和查询变体比较
  4. 内存和存储优化

    • JVM内存配置和页面缓存优化
    • 垃圾回收性能监控
    • 存储使用分析和压缩策略
  5. 集群监控与告警

    • 集群健康状态监控
    • 告警规则配置和通知机制
    • 节点状态跟踪和故障检测
  6. 性能基准测试

    • 单查询和并发性能测试
    • 基准测试套件和结果比较
    • 性能报告生成和结果导出

最佳实践

  1. 监控策略

    • 建立全面的性能监控体系
    • 设置合理的告警阈值
    • 定期进行性能基准测试
  2. 优化方法

    • 优先优化高频查询
    • 合理配置内存和缓存
    • 定期维护索引和清理数据
  3. 运维管理

    • 建立性能基线和趋势分析
    • 实施预防性维护策略
    • 保持系统配置的最新最优

练习题

  1. 基础练习

    • 实现一个简单的查询性能监控器
    • 分析给定查询的执行计划和优化建议
    • 配置基本的内存设置
  2. 进阶练习

    • 设计一个完整的集群监控系统
    • 实现自动化的索引优化建议
    • 创建性能基准测试套件
  3. 实战项目

    • 为生产环境设计监控告警系统
    • 实施全面的性能优化方案
    • 建立性能回归测试流程

通过本章的学习,你应该能够: - 建立完整的Neo4j性能监控体系 - 识别和解决常见的性能问题 - 优化查询、索引和系统配置 - 实施有效的集群监控和告警机制 - 进行科学的性能基准测试和分析

这些技能对于维护高性能的Neo4j生产环境至关重要。

    if 'averages' in storage_info:
        avg_node_size = storage_info['averages']['avg_node_size_bytes']
        if avg_node_size > 1000:  # average node size exceeds 1 KB
            opportunities.append({
                'type': 'node_size_optimization',
                'priority': 'high',
                'description': f'Average node size is {avg_node_size:.0f} bytes',
                'recommendation': 'Consider moving large properties to separate nodes or external storage'
            })

    return opportunities

def suggest_compression_strategies(self) -> Dict[str, Any]:
    """Recommend compression and storage-optimization actions.

    Looks at the current storage-usage analysis and returns action lists
    grouped under 'property_compression', 'relationship_optimization' and
    'index_optimization'.
    """
    usage = self.analyze_storage_usage()

    plan = {
        'property_compression': [],
        'relationship_optimization': [],
        'index_optimization': []
    }

    if 'store_sizes' in usage:
        sizes = usage['store_sizes']
        total_mb = sizes['total_store_mb']

        # A string store above 100 MB suggests verbose or redundant string data.
        if sizes['string_store_mb'] > 100:
            plan['property_compression'].append({
                'type': 'string_compression',
                'description': 'Large string store detected',
                'actions': [
                    'Use shorter property names',
                    'Normalize repeated string values',
                    'Consider using integer IDs instead of string identifiers',
                    'Implement string interning for common values'
                ]
            })

        # Relationships dominating more than 40% of total storage.
        if sizes['relationship_store_mb'] > total_mb * 0.4:
            plan['relationship_optimization'].append({
                'type': 'relationship_compression',
                'description': 'Relationships consume large portion of storage',
                'actions': [
                    'Review relationship properties for necessity',
                    'Consider using relationship types instead of properties where possible',
                    'Implement relationship batching for similar connections'
                ]
            })

        # Routine index hygiene is always recommended when sizes are known.
        plan['index_optimization'].append({
            'type': 'index_maintenance',
            'description': 'Regular index maintenance',
            'actions': [
                'Drop unused indexes',
                'Rebuild fragmented indexes',
                'Consider composite indexes for multi-property queries'
            ]
        })

    return plan

def generate_storage_optimization_report(self) -> Dict[str, Any]:
    """Build a complete storage-optimization report.

    Combines storage usage, property usage, optimization opportunities and
    compression strategies with derived efficiency metrics and a one-line
    summary.

    Returns:
        Dict with keys 'storage_usage', 'property_usage',
        'optimization_opportunities', 'compression_strategies',
        'efficiency_metrics' and 'summary'.
    """
    storage_usage = self.analyze_storage_usage()
    property_usage = self.analyze_property_usage()
    opportunities = self.identify_storage_optimization_opportunities()
    compression_strategies = self.suggest_compression_strategies()

    # Derived efficiency metrics, only when both stat blocks are available.
    efficiency_metrics = {}
    if 'data_stats' in storage_usage and 'store_sizes' in storage_usage:
        total_entities = storage_usage['data_stats']['node_count'] + storage_usage['data_stats']['relationship_count']
        total_size_mb = storage_usage['store_sizes']['total_store_mb']

        # Compute the density once and guard against an empty store: the old
        # 'storage_density' expression divided by total_size_mb unguarded and
        # raised ZeroDivisionError when the store size was 0.
        entities_per_mb = total_entities / total_size_mb if total_size_mb > 0 else 0
        efficiency_metrics = {
            'entities_per_mb': entities_per_mb,
            'storage_density': ('high' if entities_per_mb > 10000
                                else 'medium' if entities_per_mb > 1000
                                else 'low')
        }

    return {
        'storage_usage': storage_usage,
        'property_usage': property_usage,
        'optimization_opportunities': opportunities,
        'compression_strategies': compression_strategies,
        'efficiency_metrics': efficiency_metrics,
        'summary': self._generate_storage_summary(opportunities, efficiency_metrics)
    }

def _generate_storage_summary(self, opportunities: List[Dict[str, Any]], 
                            efficiency_metrics: Dict[str, Any]) -> str:
    """生成存储优化总结"""

    high_priority_count = len([o for o in opportunities if o['priority'] == 'high'])

    if high_priority_count > 0:
        return f"Found {high_priority_count} high-priority storage optimization opportunities"
    elif len(opportunities) > 0:
        return f"Found {len(opportunities)} storage optimization opportunities"
    elif efficiency_metrics.get('storage_density') == 'low':
        return "Storage density is low - consider data model optimization"
    else:
        return "Storage usage appears to be efficient"

使用示例

storage_optimizer = Neo4jStorageOptimizer(monitor)

分析存储使用

storage_usage = storage_optimizer.analyze_storage_usage()
print("存储使用分析:")
if 'store_sizes' in storage_usage:
    print(f"  总存储大小: {storage_usage['store_sizes']['total_store_mb']:.1f}MB")
    print(f"  字符串存储: {storage_usage['store_sizes']['string_store_mb']:.1f}MB")
    print(f"  关系存储: {storage_usage['store_sizes']['relationship_store_mb']:.1f}MB")

if ‘data_stats