7.1 Neo4j Cluster Architecture

7.1.1 Cluster Architecture Overview

Neo4j Enterprise Edition provides Causal Clustering, which supports high availability and horizontal read scaling. (This is the Neo4j 4.x architecture; Neo4j 5 renames the roles to primary and secondary servers.) The cluster architecture comprises:

  • Core Servers: handle all writes and coordinate the cluster
  • Read Replicas: serve read-only queries to scale out read throughput
  • The Raft protocol: ensures data consistency and performs leader election among the Core Servers
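
Client applications do not normally pick cluster members by hand: the official Python driver's neo4j:// routing scheme fetches the cluster's routing table and sends writes to the leader and reads to followers and read replicas. A minimal sketch (host name and credentials are placeholders):

from neo4j import GraphDatabase

# The neo4j:// scheme enables routing: the driver discovers the topology
# and routes each transaction to an appropriate cluster member.
driver = GraphDatabase.driver("neo4j://neo4j-core-1:7687",
                              auth=("neo4j", "password"))

with driver.session() as session:
    # routed to the leader
    session.write_transaction(
        lambda tx: tx.run("CREATE (:Ping {at: timestamp()})").consume())
    # routed to a follower or read replica
    total = session.read_transaction(
        lambda tx: tx.run("MATCH (p:Ping) RETURN count(p) AS c").single()["c"])
    print(f"Ping nodes: {total}")

driver.close()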

7.1.2 Cluster Components

from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum
from datetime import datetime
import time

class NodeRole(Enum):
    """节点角色"""
    LEADER = "LEADER"
    FOLLOWER = "FOLLOWER"
    READ_REPLICA = "READ_REPLICA"
    UNKNOWN = "UNKNOWN"

class NodeStatus(Enum):
    """节点状态"""
    ONLINE = "ONLINE"
    OFFLINE = "OFFLINE"
    STARTING = "STARTING"
    STOPPING = "STOPPING"
    UNKNOWN = "UNKNOWN"

@dataclass
class ClusterNode:
    """Information about a single cluster member"""
    id: str                      # cluster member id (falls back to the server name)
    name: str                    # logical server name from the configuration
    address: str
    role: NodeRole
    status: NodeStatus
    last_seen: datetime
    database_count: int = 0
    lag_ms: Optional[int] = None
    
class Neo4jClusterManager:
    """Neo4j集群管理器"""
    
    def __init__(self, cluster_config: Dict[str, Any]):
        """
        初始化集群管理器
        
        Args:
            cluster_config: 集群配置信息
        """
        self.cluster_config = cluster_config
        self.core_servers = cluster_config.get('core_servers', [])
        self.read_replicas = cluster_config.get('read_replicas', [])
        self.cluster_name = cluster_config.get('cluster_name', 'neo4j-cluster')
        self.username = cluster_config.get('username', 'neo4j')
        self.password = cluster_config.get('password', 'password')
        
        # connection pool: one driver per server
        self.drivers = {}
        self._initialize_connections()
    
    def _initialize_connections(self):
        """初始化连接"""
        from neo4j import GraphDatabase
        
        all_servers = self.core_servers + self.read_replicas
        
        for server in all_servers:
            try:
                driver = GraphDatabase.driver(
                    server['bolt_uri'],
                    auth=(self.username, self.password)
                )
                self.drivers[server['name']] = driver
                print(f"Connected to {server['name']} at {server['bolt_uri']}")
            except Exception as e:
                print(f"Failed to connect to {server['name']}: {e}")
    
    def get_cluster_overview(self) -> Dict[str, Any]:
        """获取集群概览"""
        cluster_info = {
            'cluster_name': self.cluster_name,
            'nodes': [],
            'leader_count': 0,
            'follower_count': 0,
            'read_replica_count': 0,
            'online_count': 0,
            'offline_count': 0,
            'total_databases': 0
        }
        
        # inspect every node
        for server in self.core_servers + self.read_replicas:
            node_info = self._get_node_info(server)
            cluster_info['nodes'].append(node_info)
            
            if node_info.status == NodeStatus.ONLINE:
                cluster_info['online_count'] += 1
                cluster_info['total_databases'] += node_info.database_count
                
                if node_info.role == NodeRole.LEADER:
                    cluster_info['leader_count'] += 1
                elif node_info.role == NodeRole.FOLLOWER:
                    cluster_info['follower_count'] += 1
                elif node_info.role == NodeRole.READ_REPLICA:
                    cluster_info['read_replica_count'] += 1
            else:
                cluster_info['offline_count'] += 1
        
        return cluster_info
    
    def _get_node_info(self, server_config: Dict[str, str]) -> ClusterNode:
        """获取节点信息"""
        server_name = server_config['name']
        
        try:
            driver = self.drivers.get(server_name)
            if not driver:
                return ClusterNode(
                    id=server_name,
                    name=server_name,
                    address=server_config['bolt_uri'],
                    role=NodeRole.UNKNOWN,
                    status=NodeStatus.OFFLINE,
                    last_seen=datetime.now()
                )
            
            with driver.session() as session:
                # node role; in Neo4j 4.x the role is reported per database
                role_result = session.run("CALL dbms.cluster.role('neo4j') YIELD role RETURN role")
                role_record = role_result.single()
                role_str = role_record['role'] if role_record else 'UNKNOWN'
                
                try:
                    role = NodeRole(role_str)
                except ValueError:
                    role = NodeRole.UNKNOWN
                
                # count distinct databases (SHOW DATABASES may return one
                # row per database per cluster member)
                db_result = session.run("SHOW DATABASES")
                database_count = len({record["name"] for record in db_result})
                
                # cluster overview, used to resolve this member's cluster id
                overview_result = session.run(
                    "CALL dbms.cluster.overview() YIELD id, addresses RETURN id, addresses"
                )
                
                node_id = server_name
                for record in overview_result:
                    if server_config['bolt_uri'] in record['addresses']:
                        node_id = record['id']
                        break
                
                return ClusterNode(
                    id=node_id,
                    name=server_name,
                    address=server_config['bolt_uri'],
                    role=role,
                    status=NodeStatus.ONLINE,
                    last_seen=datetime.now(),
                    database_count=database_count
                )
        
        except Exception as e:
            print(f"Error getting info for {server_name}: {e}")
            return ClusterNode(
                id=server_name,
                name=server_name,
                address=server_config['bolt_uri'],
                role=NodeRole.UNKNOWN,
                status=NodeStatus.OFFLINE,
                last_seen=datetime.now()
            )
    
    def check_cluster_health(self) -> Dict[str, Any]:
        """检查集群健康状态"""
        overview = self.get_cluster_overview()
        
        health_status = {
            'overall_health': 'unknown',
            'issues': [],
            'recommendations': [],
            'cluster_info': overview
        }
        
        # exactly one leader is expected
        if overview['leader_count'] == 0:
            health_status['issues'].append('No leader found in cluster')
            health_status['overall_health'] = 'critical'
        elif overview['leader_count'] > 1:
            health_status['issues'].append('Multiple leaders detected')
            health_status['overall_health'] = 'critical'
        
        # a Raft majority of core servers must be online
        core_online = overview['leader_count'] + overview['follower_count']
        core_total = len(self.core_servers)
        
        if core_online < (core_total // 2 + 1):
            health_status['issues'].append('Insufficient core servers for quorum')
            health_status['overall_health'] = 'critical'
        
        # offline nodes degrade the cluster
        if overview['offline_count'] > 0:
            health_status['issues'].append(f"{overview['offline_count']} nodes are offline")
            if health_status['overall_health'] != 'critical':
                health_status['overall_health'] = 'warning'
        
        # recommendations
        if overview['offline_count'] > 0:
            health_status['recommendations'].append('Investigate and restart offline nodes')
        
        if overview['read_replica_count'] == 0 and len(self.read_replicas) > 0:
            health_status['recommendations'].append('Check read replica connectivity')
        
        # no issues found: the cluster is healthy
        if not health_status['issues']:
            health_status['overall_health'] = 'healthy'
        
        return health_status
    
    def perform_leader_election(self) -> Dict[str, Any]:
        """执行领导者选举(仅在必要时)"""
        overview = self.get_cluster_overview()
        
        if overview['leader_count'] == 1:
            return {
                'success': True,
                'message': 'Leader already exists, no election needed',
                'leader': next(node for node in overview['nodes'] if node.role == NodeRole.LEADER)
            }
        
        # Raft elects automatically; poll the core servers until a leader appears
        for server in self.core_servers:
            try:
                driver = self.drivers.get(server['name'])
                if driver:
                    with driver.session() as session:
                        # confirm the node responds to admin procedures
                        session.run("CALL dbms.cluster.overview()")
                        
                        # give the election time to settle
                        time.sleep(5)
                        
                        # re-check the cluster state
                        new_overview = self.get_cluster_overview()
                        if new_overview['leader_count'] == 1:
                            leader = next(node for node in new_overview['nodes'] if node.role == NodeRole.LEADER)
                            return {
                                'success': True,
                                'message': 'Leader election completed',
                                'leader': leader
                            }
            except Exception as e:
                print(f"Failed to trigger election on {server['name']}: {e}")
        
        return {
            'success': False,
            'message': 'Failed to complete leader election',
            'leader': None
        }
    
    def add_read_replica(self, replica_config: Dict[str, str]) -> Dict[str, Any]:
        """添加只读副本"""
        try:
            # validate the configuration
            required_fields = ['name', 'bolt_uri', 'http_uri']
            for field in required_fields:
                if field not in replica_config:
                    return {
                        'success': False,
                        'message': f'Missing required field: {field}'
                    }
            
            # try to reach the new replica
            from neo4j import GraphDatabase
            
            driver = GraphDatabase.driver(
                replica_config['bolt_uri'],
                auth=(self.username, self.password)
            )
            
            # smoke-test the connection
            with driver.session() as session:
                session.run("RETURN 1")
            
            # record it in the local configuration
            self.read_replicas.append(replica_config)
            self.drivers[replica_config['name']] = driver
            
            return {
                'success': True,
                'message': f'Read replica {replica_config["name"]} added successfully'
            }
        
        except Exception as e:
            return {
                'success': False,
                'message': f'Failed to add read replica: {e}'
            }
    
    def remove_node(self, node_name: str) -> Dict[str, Any]:
        """移除节点"""
        try:
            # locate the node
            node_config = None
            is_core = False
            
            for server in self.core_servers:
                if server['name'] == node_name:
                    node_config = server
                    is_core = True
                    break
            
            if not node_config:
                for server in self.read_replicas:
                    if server['name'] == node_name:
                        node_config = server
                        break
            
            if not node_config:
                return {
                    'success': False,
                    'message': f'Node {node_name} not found'
                }
            
            # refuse removals that would break the Raft quorum
            if is_core:
                overview = self.get_cluster_overview()
                remaining_cores = overview['leader_count'] + overview['follower_count'] - 1
                
                if remaining_cores < (len(self.core_servers) // 2 + 1):
                    return {
                        'success': False,
                        'message': 'Cannot remove core server: would break quorum'
                    }
            
            # close the driver
            if node_name in self.drivers:
                self.drivers[node_name].close()
                del self.drivers[node_name]
            
            # drop it from the configuration
            if is_core:
                self.core_servers = [s for s in self.core_servers if s['name'] != node_name]
            else:
                self.read_replicas = [s for s in self.read_replicas if s['name'] != node_name]
            
            return {
                'success': True,
                'message': f'Node {node_name} removed successfully'
            }
        
        except Exception as e:
            return {
                'success': False,
                'message': f'Failed to remove node: {e}'
            }
    
    def get_cluster_metrics(self) -> Dict[str, Any]:
        """获取集群指标"""
        metrics = {
            'timestamp': datetime.now(),
            'cluster_overview': self.get_cluster_overview(),
            'node_metrics': {},
            'cluster_health': self.check_cluster_health()
        }
        
        # per-node metrics, keyed by server name
        for server in self.core_servers + self.read_replicas:
            node_metrics = self._collect_node_metrics(server)
            metrics['node_metrics'][server['name']] = node_metrics
        
        return metrics
    
    def _collect_node_metrics(self, server_config: Dict[str, str]) -> Dict[str, Any]:
        """收集节点指标"""
        server_name = server_config['name']
        
        try:
            driver = self.drivers.get(server_name)
            if not driver:
                return {'status': 'offline', 'error': 'No connection'}
            
            with driver.session() as session:
                metrics = {'status': 'online'}
                
                # Neo4j version
                try:
                    version_result = session.run("CALL dbms.components() YIELD name, versions RETURN name, versions")
                    for record in version_result:
                        if record['name'] == 'Neo4j Kernel':
                            metrics['neo4j_version'] = record['versions'][0]
                            break
                except Exception:
                    pass
                
                # databases
                try:
                    db_result = session.run("SHOW DATABASES")
                    databases = list(db_result)
                    metrics['database_count'] = len(databases)
                    metrics['databases'] = [db['name'] for db in databases]
                except Exception:
                    pass
                
                # active transactions
                try:
                    tx_result = session.run(
                        "CALL dbms.listTransactions() YIELD transactionId, currentQuery, status RETURN count(*) as tx_count"
                    )
                    tx_record = tx_result.single()
                    if tx_record:
                        metrics['active_transactions'] = tx_record['tx_count']
                except Exception:
                    pass
                
                # active client connections
                try:
                    conn_result = session.run(
                        "CALL dbms.listConnections() YIELD connectionId, connectTime, connector RETURN count(*) as conn_count"
                    )
                    conn_record = conn_result.single()
                    if conn_record:
                        metrics['active_connections'] = conn_record['conn_count']
                except Exception:
                    pass
                
                return metrics
        
        except Exception as e:
            return {'status': 'error', 'error': str(e)}
    
    def generate_cluster_report(self) -> str:
        """生成集群报告"""
        metrics = self.get_cluster_metrics()
        overview = metrics['cluster_overview']
        health = metrics['cluster_health']
        
        report = []
        report.append(f"Neo4j Cluster Report - {self.cluster_name}")
        report.append("=" * 50)
        report.append(f"Generated: {metrics['timestamp'].strftime('%Y-%m-%d %H:%M:%S')}")
        report.append("")
        
        # cluster overview
        report.append("Cluster Overview:")
        report.append("-" * 20)
        report.append(f"Total Nodes: {len(overview['nodes'])}")
        report.append(f"Online Nodes: {overview['online_count']}")
        report.append(f"Offline Nodes: {overview['offline_count']}")
        report.append(f"Leaders: {overview['leader_count']}")
        report.append(f"Followers: {overview['follower_count']}")
        report.append(f"Read Replicas: {overview['read_replica_count']}")
        report.append(f"Total Databases: {overview['total_databases']}")
        report.append("")
        
        # health status
        report.append("Health Status:")
        report.append("-" * 15)
        report.append(f"Overall Health: {health['overall_health'].upper()}")
        
        if health['issues']:
            report.append("Issues:")
            for issue in health['issues']:
                report.append(f"  - {issue}")
        
        if health['recommendations']:
            report.append("Recommendations:")
            for rec in health['recommendations']:
                report.append(f"  - {rec}")
        
        report.append("")
        
        # node details
        report.append("Node Details:")
        report.append("-" * 15)
        
        for node in overview['nodes']:
            report.append(f"Node: {node.id}")
            report.append(f"  Address: {node.address}")
            report.append(f"  Role: {node.role.value}")
            report.append(f"  Status: {node.status.value}")
            report.append(f"  Last Seen: {node.last_seen.strftime('%Y-%m-%d %H:%M:%S')}")
            
            # node_metrics is keyed by server name, not by cluster id
            node_metrics = metrics['node_metrics'].get(node.name, {})
            if 'database_count' in node_metrics:
                report.append(f"  Databases: {node_metrics['database_count']}")
            if 'active_transactions' in node_metrics:
                report.append(f"  Active Transactions: {node_metrics['active_transactions']}")
            if 'active_connections' in node_metrics:
                report.append(f"  Active Connections: {node_metrics['active_connections']}")
            
            report.append("")
        
        return "\n".join(report)
    
    def close(self):
        """关闭所有连接"""
        for driver in self.drivers.values():
            if driver:
                driver.close()
        print("All cluster connections closed")

# Usage example
cluster_config = {
    'cluster_name': 'production-cluster',
    'username': 'neo4j',
    'password': 'secure_password',
    'core_servers': [
        {
            'name': 'core-1',
            'bolt_uri': 'bolt://neo4j-core-1:7687',
            'http_uri': 'http://neo4j-core-1:7474'
        },
        {
            'name': 'core-2',
            'bolt_uri': 'bolt://neo4j-core-2:7687',
            'http_uri': 'http://neo4j-core-2:7474'
        },
        {
            'name': 'core-3',
            'bolt_uri': 'bolt://neo4j-core-3:7687',
            'http_uri': 'http://neo4j-core-3:7474'
        }
    ],
    'read_replicas': [
        {
            'name': 'replica-1',
            'bolt_uri': 'bolt://neo4j-replica-1:7687',
            'http_uri': 'http://neo4j-replica-1:7474'
        },
        {
            'name': 'replica-2',
            'bolt_uri': 'bolt://neo4j-replica-2:7687',
            'http_uri': 'http://neo4j-replica-2:7474'
        }
    ]
}

cluster_manager = Neo4jClusterManager(cluster_config)

# cluster overview
overview = cluster_manager.get_cluster_overview()
print(f"集群名称: {overview['cluster_name']}")
print(f"在线节点: {overview['online_count']}/{len(overview['nodes'])}")
print(f"Leader: {overview['leader_count']}, Followers: {overview['follower_count']}")
print(f"只读副本: {overview['read_replica_count']}")

# check cluster health
health = cluster_manager.check_cluster_health()
print(f"\n集群健康状态: {health['overall_health']}")
if health['issues']:
    print("问题:")
    for issue in health['issues']:
        print(f"  - {issue}")

# generate the cluster report
report = cluster_manager.generate_cluster_report()
print("\n" + report)

cluster_manager.close()
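
The quorum checks in the manager above are plain Raft majority arithmetic: a cluster of n core servers keeps accepting writes as long as n // 2 + 1 of them are reachable, so 3 cores tolerate one failure and 5 cores tolerate two. A quick sketch of the calculation:

def raft_fault_tolerance(core_count: int) -> int:
    """Number of core servers that may fail while a write quorum survives."""
    quorum = core_count // 2 + 1
    return core_count - quorum

for n in (3, 5, 7):
    print(f"{n} cores -> quorum {n // 2 + 1}, tolerates {raft_fault_tolerance(n)} failure(s)")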

7.2 Cluster Deployment Configuration

7.2.1 Core Server Configuration

import yaml
from typing import Dict, List, Any
from pathlib import Path

class Neo4jClusterDeployer:
    """Neo4j集群部署器"""
    
    def __init__(self, deployment_config: Dict[str, Any]):
        self.config = deployment_config
        self.cluster_name = deployment_config.get('cluster_name', 'neo4j-cluster')
        self.base_dir = Path(deployment_config.get('base_dir', '/opt/neo4j'))
        
    def generate_core_server_config(self, server_id: int, server_config: Dict[str, Any]) -> str:
        """生成核心服务器配置"""
        
        config_template = f"""
# Neo4j Core Server Configuration - {server_config['name']}
# Generated automatically - do not edit manually

#*****************************************************************
# Network connector configuration
#*****************************************************************

# Bolt connector
dbms.connector.bolt.enabled=true
dbms.connector.bolt.listen_address={server_config['host']}:7687
dbms.connector.bolt.advertised_address={server_config['advertised_host']}:7687

# HTTP connector
dbms.connector.http.enabled=true
dbms.connector.http.listen_address={server_config['host']}:7474
dbms.connector.http.advertised_address={server_config['advertised_host']}:7474

# HTTPS connector
dbms.connector.https.enabled=true
dbms.connector.https.listen_address={server_config['host']}:7473
dbms.connector.https.advertised_address={server_config['advertised_host']}:7473

#*****************************************************************
# Causal Clustering Configuration (Neo4j 4.x setting names)
#*****************************************************************

# Run this instance as a Core member
dbms.mode=CORE

# Discovery: static list of core discovery endpoints
causal_clustering.discovery_type=LIST
causal_clustering.initial_discovery_members={','.join(self._get_discovery_endpoints())}

# Discovery addresses for this server (core #{server_id})
causal_clustering.discovery_advertised_address={server_config['advertised_host']}:5000
causal_clustering.discovery_listen_address={server_config['host']}:5000

# Raft settings
causal_clustering.raft_advertised_address={server_config['advertised_host']}:7000
causal_clustering.raft_listen_address={server_config['host']}:7000

# Cluster size constraints
causal_clustering.minimum_core_cluster_size_at_formation={self.config.get('min_core_size', 3)}
causal_clustering.minimum_core_cluster_size_at_runtime={self.config.get('min_core_runtime', 3)}

# Catchup and election settings
causal_clustering.catch_up_client_inactivity_timeout=20s
causal_clustering.leader_election_timeout=7s

#*****************************************************************
# Memory Settings
#*****************************************************************

# Java heap size
dbms.memory.heap.initial_size={server_config.get('heap_initial', '1G')}
dbms.memory.heap.max_size={server_config.get('heap_max', '2G')}

# Page cache
dbms.memory.pagecache.size={server_config.get('pagecache_size', '1G')}

#*****************************************************************
# Security Configuration
#*****************************************************************

# Authentication
dbms.security.auth_enabled=true
dbms.security.auth_provider=native

# Authorization
dbms.security.authorization_enabled=true

# SSL/TLS
dbms.ssl.policy.bolt.enabled=true
dbms.ssl.policy.bolt.base_directory=certificates/bolt
dbms.ssl.policy.bolt.private_key=private.key
dbms.ssl.policy.bolt.public_certificate=public.crt

dbms.ssl.policy.https.enabled=true
dbms.ssl.policy.https.base_directory=certificates/https
dbms.ssl.policy.https.private_key=private.key
dbms.ssl.policy.https.public_certificate=public.crt

dbms.ssl.policy.cluster.enabled=true
dbms.ssl.policy.cluster.base_directory=certificates/cluster
dbms.ssl.policy.cluster.private_key=private.key
dbms.ssl.policy.cluster.public_certificate=public.crt

#*****************************************************************
# Logging Configuration
#*****************************************************************

# Debug log level
dbms.logs.debug.level=INFO

# Log rotation
dbms.logs.query.enabled=true
dbms.logs.query.threshold=0
dbms.logs.query.rotation.keep_number=10
dbms.logs.query.rotation.size=20M

# Security log
dbms.logs.security.level=INFO

#*****************************************************************
# Performance Settings
#*****************************************************************

# Query timeout
dbms.transaction.timeout=60s
dbms.transaction.concurrent.maximum=1000

# Connection settings
dbms.connector.bolt.thread_pool_min_size=5
dbms.connector.bolt.thread_pool_max_size=400

# Backup settings
dbms.backup.enabled=true
dbms.backup.listen_address={server_config['host']}:6362

#*****************************************************************
# Data Directory Configuration
#*****************************************************************

# Data directories
dbms.directories.data={self.base_dir}/data
dbms.directories.logs={self.base_dir}/logs
dbms.directories.certificates={self.base_dir}/certificates
dbms.directories.import={self.base_dir}/import
"""
        return config_template
    
    def generate_read_replica_config(self, replica_config: Dict[str, Any]) -> str:
        """生成只读副本配置"""
        
        config_template = f"""
# Neo4j Read Replica Configuration - {replica_config['name']}
# Generated automatically - do not edit manually

#*****************************************************************
# Network connector configuration
#*****************************************************************

# Bolt connector
dbms.connector.bolt.enabled=true
dbms.connector.bolt.listen_address={replica_config['host']}:7687
dbms.connector.bolt.advertised_address={replica_config['advertised_host']}:7687

# HTTP connector
dbms.connector.http.enabled=true
dbms.connector.http.listen_address={replica_config['host']}:7474
dbms.connector.http.advertised_address={replica_config['advertised_host']}:7474

# HTTPS connector
dbms.connector.https.enabled=true
dbms.connector.https.listen_address={replica_config['host']}:7473
dbms.connector.https.advertised_address={replica_config['advertised_host']}:7473

#*****************************************************************
# Read Replica Configuration (Neo4j 4.x setting names)
#*****************************************************************

# Run this instance as a Read Replica
dbms.mode=READ_REPLICA

# Discovery: static list of core discovery endpoints
causal_clustering.discovery_type=LIST
causal_clustering.initial_discovery_members={','.join(self._get_discovery_endpoints())}

# Discovery addresses for this server (replica #{replica_config['server_id']})
causal_clustering.discovery_advertised_address={replica_config['advertised_host']}:5000
causal_clustering.discovery_listen_address={replica_config['host']}:5000

# Catchup settings: pull transactions from a random core server
causal_clustering.catch_up_client_inactivity_timeout=20s
causal_clustering.upstream_selection_strategy=connect-to-random-core-server

#*****************************************************************
# Memory Settings
#*****************************************************************

# Java heap size
dbms.memory.heap.initial_size={replica_config.get('heap_initial', '1G')}
dbms.memory.heap.max_size={replica_config.get('heap_max', '2G')}

# Page cache
dbms.memory.pagecache.size={replica_config.get('pagecache_size', '1G')}

#*****************************************************************
# Security Configuration
#*****************************************************************

# Authentication
dbms.security.auth_enabled=true
dbms.security.auth_provider=native

# Authorization
dbms.security.authorization_enabled=true

# SSL/TLS
dbms.ssl.policy.bolt.enabled=true
dbms.ssl.policy.bolt.base_directory=certificates/bolt
dbms.ssl.policy.bolt.private_key=private.key
dbms.ssl.policy.bolt.public_certificate=public.crt

dbms.ssl.policy.https.enabled=true
dbms.ssl.policy.https.base_directory=certificates/https
dbms.ssl.policy.https.private_key=private.key
dbms.ssl.policy.https.public_certificate=public.crt

dbms.ssl.policy.cluster.enabled=true
dbms.ssl.policy.cluster.base_directory=certificates/cluster
dbms.ssl.policy.cluster.private_key=private.key
dbms.ssl.policy.cluster.public_certificate=public.crt

#*****************************************************************
# Logging Configuration
#*****************************************************************

# Debug log level
dbms.logs.debug.level=INFO

# Query logging
dbms.logs.query.enabled=true
dbms.logs.query.threshold=0
dbms.logs.query.rotation.keep_number=10
dbms.logs.query.rotation.size=20M

#*****************************************************************
# Performance Settings
#*****************************************************************

# Query timeout
dbms.transaction.timeout=60s
dbms.transaction.concurrent.maximum=1000

# Connection settings
dbms.connector.bolt.thread_pool_min_size=5
dbms.connector.bolt.thread_pool_max_size=400

#*****************************************************************
# Data Directory Configuration
#*****************************************************************

# Data directories
dbms.directories.data={self.base_dir}/data
dbms.directories.logs={self.base_dir}/logs
dbms.directories.certificates={self.base_dir}/certificates
dbms.directories.import={self.base_dir}/import
"""
        return config_template
    
    def _get_discovery_endpoints(self) -> List[str]:
        """获取发现端点列表"""
        endpoints = []
        
        for server in self.config.get('core_servers', []):
            endpoint = f"{server['advertised_host']}:5000"
            endpoints.append(endpoint)
        
        return endpoints
    
    def generate_docker_compose(self) -> str:
        """生成Docker Compose配置"""
        
        compose_config = {
            'version': '3.8',
            'services': {},
            'networks': {
                'neo4j-cluster': {
                    'driver': 'bridge'
                }
            },
            'volumes': {}
        }
        
        # core servers
        for i, server in enumerate(self.config.get('core_servers', []), 1):
            service_name = server['name']
            
            # named volumes for data and logs
            compose_config['volumes'][f"{service_name}-data"] = None
            compose_config['volumes'][f"{service_name}-logs"] = None
            
            # service definition; neo4j.conf keys map to env vars ('.' -> '_', '_' -> '__')
            compose_config['services'][service_name] = {
                'image': f"neo4j:{self.config.get('neo4j_version', '4.4-enterprise')}",
                'hostname': server['name'],
                'container_name': service_name,
                'environment': {
                    'NEO4J_AUTH': f"neo4j/{self.config.get('password', 'password')}",
                    'NEO4J_ACCEPT_LICENSE_AGREEMENT': 'yes',
                    'NEO4J_dbms_mode': 'CORE',
                    'NEO4J_causal__clustering_initial__discovery__members': ','.join([
                        f"{s['name']}:5000" for s in self.config.get('core_servers', [])
                    ]),
                    'NEO4J_causal__clustering_discovery__advertised__address': f"{server['name']}:5000",
                    'NEO4J_causal__clustering_raft__advertised__address': f"{server['name']}:7000",
                    'NEO4J_dbms_connector_bolt_advertised__address': f"{server['name']}:7687",
                    'NEO4J_dbms_connector_http_advertised__address': f"{server['name']}:7474",
                    'NEO4J_dbms_memory_heap_initial__size': server.get('heap_initial', '1G'),
                    'NEO4J_dbms_memory_heap_max__size': server.get('heap_max', '2G'),
                    'NEO4J_dbms_memory_pagecache_size': server.get('pagecache_size', '1G')
                },
                'ports': [
                    f"{7474 + i - 1}:7474",  # HTTP
                    f"{7687 + i - 1}:7687",  # Bolt
                    f"{7473 + i - 1}:7473"   # HTTPS
                ],
                'volumes': [
                    f"{service_name}-data:/data",
                    f"{service_name}-logs:/logs"
                ],
                'networks': ['neo4j-cluster'],
                'restart': 'unless-stopped',
                'healthcheck': {
                    # NEO4J_AUTH is "user/password"; strip the user for cypher-shell.
                    # "$$" defers expansion to the container shell, not Compose.
                    'test': ['CMD-SHELL', 'cypher-shell -u neo4j -p "$${NEO4J_AUTH#*/}" "RETURN 1"'],
                    'interval': '30s',
                    'timeout': '10s',
                    'retries': 3,
                    'start_period': '60s'
                }
            }
        
        # read replicas
        for i, replica in enumerate(self.config.get('read_replicas', []), 1):
            service_name = replica['name']
            
            # named volumes for data and logs
            compose_config['volumes'][f"{service_name}-data"] = None
            compose_config['volumes'][f"{service_name}-logs"] = None
            
            # service definition
            compose_config['services'][service_name] = {
                'image': f"neo4j:{self.config.get('neo4j_version', '4.4-enterprise')}",
                'hostname': replica['name'],
                'container_name': service_name,
                'environment': {
                    'NEO4J_AUTH': f"neo4j/{self.config.get('password', 'password')}",
                    'NEO4J_ACCEPT_LICENSE_AGREEMENT': 'yes',
                    'NEO4J_dbms_mode': 'READ_REPLICA',
                    'NEO4J_causal__clustering_initial__discovery__members': ','.join([
                        f"{s['name']}:5000" for s in self.config.get('core_servers', [])
                    ]),
                    'NEO4J_causal__clustering_discovery__advertised__address': f"{replica['name']}:5000",
                    'NEO4J_dbms_connector_bolt_advertised__address': f"{replica['name']}:7687",
                    'NEO4J_dbms_connector_http_advertised__address': f"{replica['name']}:7474",
                    'NEO4J_dbms_memory_heap_initial__size': replica.get('heap_initial', '1G'),
                    'NEO4J_dbms_memory_heap_max__size': replica.get('heap_max', '2G'),
                    'NEO4J_dbms_memory_pagecache_size': replica.get('pagecache_size', '1G')
                },
                'ports': [
                    f"{8474 + i - 1}:7474",  # HTTP
                    f"{8687 + i - 1}:7687",  # Bolt
                    f"{8473 + i - 1}:7473"   # HTTPS
                ],
                'volumes': [
                    f"{service_name}-data:/data",
                    f"{service_name}-logs:/logs"
                ],
                'networks': ['neo4j-cluster'],
                'restart': 'unless-stopped',
                'depends_on': [server['name'] for server in self.config.get('core_servers', [])],
                'healthcheck': {
                    'test': ['CMD-SHELL', 'cypher-shell -u neo4j -p "$${NEO4J_AUTH#*/}" "RETURN 1"'],
                    'interval': '30s',
                    'timeout': '10s',
                    'retries': 3,
                    'start_period': '120s'  # replicas need longer to catch up
                }
            }
        
        return yaml.dump(compose_config, default_flow_style=False, sort_keys=False)
    
    def generate_kubernetes_manifests(self) -> Dict[str, str]:
        """生成Kubernetes部署清单"""
        manifests = {}
        
        # ConfigMap for Neo4j configuration
        configmap = {
            'apiVersion': 'v1',
            'kind': 'ConfigMap',
            'metadata': {
                'name': f'{self.cluster_name}-config',
                'namespace': self.config.get('namespace', 'default')
            },
            'data': {
                'NEO4J_AUTH': f"neo4j/{self.config.get('password', 'password')}",
                'NEO4J_ACCEPT_LICENSE_AGREEMENT': 'yes'
            }
        }
        manifests['configmap.yaml'] = yaml.dump(configmap)
        
        # StatefulSet for core servers
        core_statefulset = {
            'apiVersion': 'apps/v1',
            'kind': 'StatefulSet',
            'metadata': {
                'name': f'{self.cluster_name}-core',
                'namespace': self.config.get('namespace', 'default')
            },
            'spec': {
                'serviceName': f'{self.cluster_name}-core-headless',
                'replicas': len(self.config.get('core_servers', [])),
                'selector': {
                    'matchLabels': {
                        'app': f'{self.cluster_name}-core'
                    }
                },
                'template': {
                    'metadata': {
                        'labels': {
                            'app': f'{self.cluster_name}-core'
                        }
                    },
                    'spec': {
                        'containers': [{
                            'name': 'neo4j',
                            'image': f"neo4j:{self.config.get('neo4j_version', '5.0-enterprise')}",
                            'env': [
                                {'name': 'NEO4J_AUTH', 'valueFrom': {'configMapKeyRef': {'name': f'{self.cluster_name}-config', 'key': 'NEO4J_AUTH'}}},
                                {'name': 'NEO4J_ACCEPT_LICENSE_AGREEMENT', 'value': 'yes'},
                                {'name': 'NEO4J_dbms_mode', 'value': 'CORE'},
                                {'name': 'NEO4J_causal__clustering_initial__discovery__members', 'value': ','.join([
                                    f'{self.cluster_name}-core-{i}.{self.cluster_name}-core-headless:5000' 
                                    for i in range(len(self.config.get('core_servers', [])))
                                ])}
                            ],
                            'ports': [
                                {'containerPort': 7474, 'name': 'http'},
                                {'containerPort': 7687, 'name': 'bolt'},
                                {'containerPort': 5000, 'name': 'discovery'},
                                {'containerPort': 7000, 'name': 'raft'}
                            ],
                            'volumeMounts': [
                                {'name': 'data', 'mountPath': '/data'},
                                {'name': 'logs', 'mountPath': '/logs'}
                            ],
                            'resources': {
                                'requests': {
                                    'memory': self.config.get('core_memory_request', '2Gi'),
                                    'cpu': self.config.get('core_cpu_request', '1')
                                },
                                'limits': {
                                    'memory': self.config.get('core_memory_limit', '4Gi'),
                                    'cpu': self.config.get('core_cpu_limit', '2')
                                }
                            }
                        }]
                    }
                },
                'volumeClaimTemplates': [
                    {
                        'metadata': {'name': 'data'},
                        'spec': {
                            'accessModes': ['ReadWriteOnce'],
                            'resources': {
                                'requests': {
                                    'storage': self.config.get('core_storage_size', '10Gi')
                                }
                            }
                        }
                    },
                    {
                        'metadata': {'name': 'logs'},
                        'spec': {
                            'accessModes': ['ReadWriteOnce'],
                            'resources': {
                                'requests': {
                                    'storage': '5Gi'
                                }
                            }
                        }
                    }
                ]
            }
        }
        manifests['core-statefulset.yaml'] = yaml.dump(core_statefulset)
        
        # Service for core servers
        core_service = {
            'apiVersion': 'v1',
            'kind': 'Service',
            'metadata': {
                'name': f'{self.cluster_name}-core-headless',
                'namespace': self.config.get('namespace', 'default')
            },
            'spec': {
                'clusterIP': 'None',
                'selector': {
                    'app': f'{self.cluster_name}-core'
                },
                'ports': [
                    {'port': 7474, 'name': 'http'},
                    {'port': 7687, 'name': 'bolt'},
                    {'port': 5000, 'name': 'discovery'},
                    {'port': 7000, 'name': 'raft'}
                ]
            }
        }
        manifests['core-service.yaml'] = yaml.dump(core_service)
        
        return manifests
    
    def deploy_to_directory(self, output_dir: str):
        """部署配置文件到目录"""
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)
        
        # core server configs
        for i, server in enumerate(self.config.get('core_servers', []), 1):
            config_content = self.generate_core_server_config(i, server)
            config_file = output_path / f"{server['name']}-neo4j.conf"
            config_file.write_text(config_content)
            print(f"Generated core server config: {config_file}")
        
        # read replica configs
        for replica in self.config.get('read_replicas', []):
            config_content = self.generate_read_replica_config(replica)
            config_file = output_path / f"{replica['name']}-neo4j.conf"
            config_file.write_text(config_content)
            print(f"Generated read replica config: {config_file}")
        
        # Docker Compose file
        compose_content = self.generate_docker_compose()
        compose_file = output_path / 'docker-compose.yml'
        compose_file.write_text(compose_content)
        print(f"Generated Docker Compose: {compose_file}")
        
        # Kubernetes manifests
        k8s_manifests = self.generate_kubernetes_manifests()
        k8s_dir = output_path / 'kubernetes'
        k8s_dir.mkdir(exist_ok=True)
        
        for filename, content in k8s_manifests.items():
            manifest_file = k8s_dir / filename
            manifest_file.write_text(content)
            print(f"Generated Kubernetes manifest: {manifest_file}")
        
        print(f"\nDeployment files generated in: {output_path}")

# Usage example
deployment_config = {
    'cluster_name': 'production-cluster',
    'neo4j_version': '4.4-enterprise',
    'password': 'secure_password',
    'namespace': 'neo4j',
    'min_core_size': 3,
    'min_core_runtime': 3,
    'base_dir': '/opt/neo4j',
    'core_servers': [
        {
            'name': 'neo4j-core-1',
            'host': '0.0.0.0',
            'advertised_host': 'neo4j-core-1.example.com',
            'heap_initial': '2G',
            'heap_max': '4G',
            'pagecache_size': '2G'
        },
        {
            'name': 'neo4j-core-2',
            'host': '0.0.0.0',
            'advertised_host': 'neo4j-core-2.example.com',
            'heap_initial': '2G',
            'heap_max': '4G',
            'pagecache_size': '2G'
        },
        {
            'name': 'neo4j-core-3',
            'host': '0.0.0.0',
            'advertised_host': 'neo4j-core-3.example.com',
            'heap_initial': '2G',
            'heap_max': '4G',
            'pagecache_size': '2G'
        }
    ],
    'read_replicas': [
        {
            'name': 'neo4j-replica-1',
            'server_id': 101,
            'host': '0.0.0.0',
            'advertised_host': 'neo4j-replica-1.example.com',
            'heap_initial': '1G',
            'heap_max': '2G',
            'pagecache_size': '1G'
        },
        {
            'name': 'neo4j-replica-2',
            'server_id': 102,
            'host': '0.0.0.0',
            'advertised_host': 'neo4j-replica-2.example.com',
            'heap_initial': '1G',
            'heap_max': '2G',
            'pagecache_size': '1G'
        }
    ],
    'core_memory_request': '2Gi',
    'core_memory_limit': '4Gi',
    'core_cpu_request': '1',
    'core_cpu_limit': '2',
    'core_storage_size': '20Gi'
}

deployer = Neo4jClusterDeployer(deployment_config)

# generate the deployment files
deployer.deploy_to_directory('./neo4j-cluster-deployment')

print("\n集群部署配置生成完成!")
print("请按照以下步骤部署集群:")
print("1. 检查生成的配置文件")
print("2. 根据环境选择Docker Compose或Kubernetes部署")
print("3. 启动核心服务器")
print("4. 等待集群形成")
print("5. 启动只读副本")
print("6. 验证集群状态")

7.3 Load Balancing and Failover

7.3.1 Load Balancing Strategies

import random
import time
from typing import List, Dict, Any, Optional, Callable
from dataclasses import dataclass
from enum import Enum
from datetime import datetime, timedelta
import threading
import logging

class LoadBalancingStrategy(Enum):
    """负载均衡策略"""
    ROUND_ROBIN = "round_robin"
    RANDOM = "random"
    LEAST_CONNECTIONS = "least_connections"
    WEIGHTED_ROUND_ROBIN = "weighted_round_robin"
    HEALTH_BASED = "health_based"

@dataclass
class ServerHealth:
    """服务器健康状态"""
    server_name: str
    is_healthy: bool
    response_time_ms: float
    active_connections: int
    last_check: datetime
    error_count: int = 0
    weight: float = 1.0

class Neo4jLoadBalancer:
    """Neo4j负载均衡器"""
    
    def __init__(self, cluster_config: Dict[str, Any]):
        self.cluster_config = cluster_config
        self.strategy = LoadBalancingStrategy(cluster_config.get('strategy', 'round_robin'))
        self.health_check_interval = cluster_config.get('health_check_interval', 30)
        self.max_retries = cluster_config.get('max_retries', 3)
        self.timeout = cluster_config.get('timeout', 10)
        
        # server lists
        self.read_servers = cluster_config.get('read_servers', [])
        self.write_servers = cluster_config.get('write_servers', [])
        
        # health tracking
        self.server_health: Dict[str, ServerHealth] = {}
        self.current_index = 0
        self.lock = threading.Lock()
        
        # connection pools
        self.drivers = {}
        
        # logging must be configured before the first connection attempt
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

        # initialization
        self._initialize_health_tracking()
        self._initialize_connections()
        self._start_health_monitoring()
    
    def _initialize_health_tracking(self):
        """初始化健康状态跟踪"""
        all_servers = self.read_servers + self.write_servers
        
        for server in all_servers:
            self.server_health[server['name']] = ServerHealth(
                server_name=server['name'],
                is_healthy=True,
                response_time_ms=0.0,
                active_connections=0,
                last_check=datetime.now(),
                weight=server.get('weight', 1.0)
            )
    
    def _initialize_connections(self):
        """初始化连接池"""
        from neo4j import GraphDatabase
        
        all_servers = self.read_servers + self.write_servers
        
        for server in all_servers:
            try:
                driver = GraphDatabase.driver(
                    server['uri'],
                    auth=(server.get('username', 'neo4j'), server.get('password', 'password')),
                    max_connection_lifetime=3600,
                    max_connection_pool_size=50,
                    connection_timeout=self.timeout
                )
                self.drivers[server['name']] = driver
                self.logger.info(f"Initialized connection pool for {server['name']}")
            except Exception as e:
                self.logger.error(f"Failed to initialize connection for {server['name']}: {e}")
                self.server_health[server['name']].is_healthy = False
    
    def _start_health_monitoring(self):
        """启动健康监控"""
        def health_check_loop():
            while True:
                self._perform_health_checks()
                time.sleep(self.health_check_interval)
        
        health_thread = threading.Thread(target=health_check_loop, daemon=True)
        health_thread.start()
        self.logger.info("Health monitoring started")
    
    def _perform_health_checks(self):
        """执行健康检查"""
        all_servers = self.read_servers + self.write_servers
        
        for server in all_servers:
            server_name = server['name']
            health = self.server_health[server_name]
            
            try:
                start_time = time.time()
                active_connections = 0

                driver = self.drivers.get(server_name)
                if not driver:
                    raise RuntimeError("no driver initialized for this server")

                with driver.session() as session:
                    # simple liveness query
                    list(session.run("RETURN 1 AS health_check"))

                    # current client connection count
                    conn_result = session.run(
                        "CALL dbms.listConnections() YIELD connectionId RETURN count(*) AS conn_count"
                    )
                    conn_record = conn_result.single()
                    active_connections = conn_record['conn_count'] if conn_record else 0

                response_time = (time.time() - start_time) * 1000  # milliseconds
                
                # record the successful probe
                with self.lock:
                    health.is_healthy = True
                    health.response_time_ms = response_time
                    health.active_connections = active_connections
                    health.last_check = datetime.now()
                    health.error_count = 0
                
                self.logger.debug(f"Health check passed for {server_name}: {response_time:.2f}ms")
            
            except Exception as e:
                with self.lock:
                    health.is_healthy = False
                    health.error_count += 1
                    health.last_check = datetime.now()
                
                self.logger.warning(f"Health check failed for {server_name}: {e}")
    
    def get_read_server(self) -> Optional[Dict[str, Any]]:
        """Pick a healthy read server according to the configured strategy"""
        healthy_servers = [
            server for server in self.read_servers
            if self.server_health[server['name']].is_healthy
        ]

        if not healthy_servers:
            self.logger.error("No healthy read servers available")
            return None

        return self._select_server(healthy_servers)

    def get_write_server(self) -> Optional[Dict[str, Any]]:
        """Pick a healthy write server according to the configured strategy"""
        healthy_servers = [
            server for server in self.write_servers
            if self.server_health[server['name']].is_healthy
        ]

        if not healthy_servers:
            self.logger.error("No healthy write servers available")
            return None

        return self._select_server(healthy_servers)
    
    def _select_server(self, servers: List[Dict[str, Any]]) -> Dict[str, Any]:
        """根据策略选择服务器"""
        if self.strategy == LoadBalancingStrategy.ROUND_ROBIN:
            return self._round_robin_select(servers)
        elif self.strategy == LoadBalancingStrategy.RANDOM:
            return self._random_select(servers)
        elif self.strategy == LoadBalancingStrategy.LEAST_CONNECTIONS:
            return self._least_connections_select(servers)
        elif self.strategy == LoadBalancingStrategy.WEIGHTED_ROUND_ROBIN:
            return self._weighted_round_robin_select(servers)
        elif self.strategy == LoadBalancingStrategy.HEALTH_BASED:
            return self._health_based_select(servers)
        else:
            return self._round_robin_select(servers)
    
    def _round_robin_select(self, servers: List[Dict[str, Any]]) -> Dict[str, Any]:
        """轮询选择"""
        with self.lock:
            server = servers[self.current_index % len(servers)]
            self.current_index += 1
            return server
    
    def _random_select(self, servers: List[Dict[str, Any]]) -> Dict[str, Any]:
        """随机选择"""
        return random.choice(servers)
    
    def _least_connections_select(self, servers: List[Dict[str, Any]]) -> Dict[str, Any]:
        """最少连接选择"""
        min_connections = float('inf')
        selected_server = servers[0]
        
        for server in servers:
            health = self.server_health[server['name']]
            if health.active_connections < min_connections:
                min_connections = health.active_connections
                selected_server = server
        
        return selected_server
    
    def _weighted_round_robin_select(self, servers: List[Dict[str, Any]]) -> Dict[str, Any]:
        """加权轮询选择"""
        total_weight = sum(self.server_health[server['name']].weight for server in servers)
        
        if total_weight == 0:
            return self._round_robin_select(servers)
        
        # sample a point in the cumulative weight range
        rand_weight = random.uniform(0, total_weight)
        current_weight = 0
        
        for server in servers:
            current_weight += self.server_health[server['name']].weight
            if rand_weight <= current_weight:
                return server
        
        return servers[-1]  # fallback
    
    def _health_based_select(self, servers: List[Dict[str, Any]]) -> Dict[str, Any]:
        """基于健康状态选择"""
        # 计算每个服务器的得分(响应时间越低,连接数越少,得分越高)
        scored_servers = []
        
        for server in servers:
            health = self.server_health[server['name']]
            
            # score combines inverse response time and inverse connection count
            response_score = 1000 / (health.response_time_ms + 1)    # lower latency, higher score
            connection_score = 100 / (health.active_connections + 1)  # fewer connections, higher score
            
            total_score = response_score + connection_score
            scored_servers.append((server, total_score))
        
        # pick the highest-scoring server
        scored_servers.sort(key=lambda x: x[1], reverse=True)
        return scored_servers[0][0]
    
    def execute_read_query(self, query: str, parameters: Dict[str, Any] = None) -> Any:
        """执行读查询"""
        return self._execute_query_with_retry(query, parameters, is_write=False)
    
    def execute_write_query(self, query: str, parameters: Dict[str, Any] = None) -> Any:
        """执行写查询"""
        return self._execute_query_with_retry(query, parameters, is_write=True)
    
    def _execute_query_with_retry(self, query: str, parameters: Dict[str, Any] = None, is_write: bool = False) -> Any:
        """带重试的查询执行"""
        parameters = parameters or {}
        
        for attempt in range(self.max_retries):
            try:
                # pick a server for this attempt
                server = self.get_write_server() if is_write else self.get_read_server()
                
                if not server:
                    raise Exception("No healthy servers available")
                
                # run the query in a managed transaction
                driver = self.drivers[server['name']]

                with driver.session() as session:
                    # consume the result inside the transaction function:
                    # records cannot be fetched after the transaction closes
                    if is_write:
                        return session.write_transaction(
                            lambda tx: list(tx.run(query, parameters)))
                    else:
                        return session.read_transaction(
                            lambda tx: list(tx.run(query, parameters)))
            
            except Exception as e:
                self.logger.warning(f"Query attempt {attempt + 1} failed: {e}")
                
                if attempt == self.max_retries - 1:
                    raise e
                
                # back off before retrying
                time.sleep(0.5 * (attempt + 1))
        
        raise Exception("All retry attempts failed")
    
    def get_cluster_status(self) -> Dict[str, Any]:
        """获取集群状态"""
        status = {
            'timestamp': datetime.now(),
            'strategy': self.strategy.value,
            'total_servers': len(self.read_servers) + len(self.write_servers),
            'healthy_servers': 0,
            'unhealthy_servers': 0,
            'read_servers': [],
            'write_servers': [],
            'average_response_time': 0.0
        }
        
        total_response_time = 0
        healthy_count = 0
        
        # read server status
        for server in self.read_servers:
            health = self.server_health[server['name']]
            server_status = {
                'name': server['name'],
                'uri': server['uri'],
                'is_healthy': health.is_healthy,
                'response_time_ms': health.response_time_ms,
                'active_connections': health.active_connections,
                'last_check': health.last_check,
                'error_count': health.error_count,
                'weight': health.weight
            }
            status['read_servers'].append(server_status)
            
            if health.is_healthy:
                healthy_count += 1
                total_response_time += health.response_time_ms
            else:
                status['unhealthy_servers'] += 1
        
        # write server status
        for server in self.write_servers:
            health = self.server_health[server['name']]
            server_status = {
                'name': server['name'],
                'uri': server['uri'],
                'is_healthy': health.is_healthy,
                'response_time_ms': health.response_time_ms,
                'active_connections': health.active_connections,
                'last_check': health.last_check,
                'error_count': health.error_count,
                'weight': health.weight
            }
            status['write_servers'].append(server_status)
            
            if health.is_healthy:
                healthy_count += 1
                total_response_time += health.response_time_ms
            else:
                status['unhealthy_servers'] += 1
        
        status['healthy_servers'] = healthy_count
        
        if healthy_count > 0:
            status['average_response_time'] = total_response_time / healthy_count
        
        return status
    
    def update_server_weight(self, server_name: str, weight: float):
        """Update the routing weight of a server."""
        if server_name in self.server_health:
            with self.lock:
                self.server_health[server_name].weight = weight
            self.logger.info(f"Updated weight for {server_name} to {weight}")
        else:
            self.logger.error(f"Server {server_name} not found")
    
    def close(self):
        """Shut down the load balancer and release all driver connections."""
        for driver in self.drivers.values():
            if driver:
                driver.close()
        self.logger.info("Load balancer closed")

class Neo4jFailoverManager:
    """Neo4j failover manager"""
    
    def __init__(self, cluster_config: Dict[str, Any]):
        self.cluster_config = cluster_config
        self.primary_servers = cluster_config.get('primary_servers', [])
        self.backup_servers = cluster_config.get('backup_servers', [])
        self.failover_timeout = cluster_config.get('failover_timeout', 30)
        self.recovery_check_interval = cluster_config.get('recovery_check_interval', 60)
        
        # Currently active servers
        self.active_servers = self.primary_servers.copy()
        self.failed_servers = []
        
        # Failover and recovery callbacks
        self.failover_callbacks: List[Callable] = []
        self.recovery_callbacks: List[Callable] = []
        
        # Logging must be configured before the monitoring thread starts,
        # because that thread logs through self.logger
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)
        
        # Lock and background recovery monitoring
        self.lock = threading.Lock()
        self._start_recovery_monitoring()
    
    def add_failover_callback(self, callback: Callable):
        """Register a callback invoked after a failover."""
        self.failover_callbacks.append(callback)
    
    def add_recovery_callback(self, callback: Callable):
        """Register a callback invoked after a server recovers."""
        self.recovery_callbacks.append(callback)
    
    def handle_server_failure(self, failed_server: Dict[str, Any]):
        """Handle a server failure reported by a health monitor."""
        with self.lock:
            if failed_server in self.active_servers:
                self.active_servers.remove(failed_server)
                self.failed_servers.append({
                    'server': failed_server,
                    'failed_at': datetime.now(),
                    'recovery_attempts': 0
                })
                
                self.logger.error(f"Server {failed_server['name']} failed, initiating failover")
                
                # Attempt failover to a backup server
                self._perform_failover(failed_server)
    
    def _perform_failover(self, failed_server: Dict[str, Any]):
        """Promote an available backup server to replace a failed one."""
        # Find a backup server that is not itself marked as failed
        available_backup = None
        
        for backup in self.backup_servers:
            if backup not in [fs['server'] for fs in self.failed_servers]:
                available_backup = backup
                break
        
        if available_backup:
            # Activate the backup server
            self.active_servers.append(available_backup)
            self.backup_servers.remove(available_backup)
            
            self.logger.info(f"Activated backup server {available_backup['name']} to replace {failed_server['name']}")
            
            # Run the failover callbacks
            for callback in self.failover_callbacks:
                try:
                    callback(failed_server, available_backup)
                except Exception as e:
                    self.logger.error(f"Failover callback failed: {e}")
        else:
            self.logger.critical(f"No backup servers available for failover from {failed_server['name']}")
    
    def _start_recovery_monitoring(self):
        """Start the background thread that watches for server recovery."""
        def recovery_loop():
            while True:
                self._check_failed_servers_recovery()
                time.sleep(self.recovery_check_interval)
        
        recovery_thread = threading.Thread(target=recovery_loop, daemon=True)
        recovery_thread.start()
        self.logger.info("Recovery monitoring started")
    
    def _check_failed_servers_recovery(self):
        """Probe failed servers and reactivate any that have recovered."""
        with self.lock:
            recovered_servers = []
            
            for failed_info in self.failed_servers:
                server = failed_info['server']
                
                if self._test_server_health(server):
                    # The server is reachable again
                    recovered_servers.append(failed_info)
                    self.logger.info(f"Server {server['name']} has recovered")
                    
                    # Run the recovery callbacks
                    for callback in self.recovery_callbacks:
                        try:
                            callback(server)
                        except Exception as e:
                            self.logger.error(f"Recovery callback failed: {e}")
                else:
                    # Count the failed recovery attempt
                    failed_info['recovery_attempts'] += 1
            
            # Remove recovered servers from the failed list
            for recovered in recovered_servers:
                self.failed_servers.remove(recovered)
                
                # Decide whether to put the server back into rotation
                if self._should_reactivate_server(recovered['server']):
                    self._reactivate_server(recovered['server'])
    
    def _test_server_health(self, server: Dict[str, Any]) -> bool:
        """Probe a server with a trivial query to check availability."""
        try:
            from neo4j import GraphDatabase
            
            driver = GraphDatabase.driver(
                server['uri'],
                auth=(server.get('username', 'neo4j'), server.get('password', 'password')),
                connection_timeout=5
            )
            
            with driver.session() as session:
                session.run("RETURN 1")
            
            driver.close()
            return True
        
        except Exception:
            return False
    
    def _should_reactivate_server(self, server: Dict[str, Any]) -> bool:
        """Decide whether a recovered server should rejoin the active set."""
        # Simple policy: reactivate whenever there are fewer active
        # servers than configured primaries
        desired_active_count = len(self.primary_servers)
        current_active_count = len(self.active_servers)
        
        return current_active_count < desired_active_count
    
    def _reactivate_server(self, server: Dict[str, Any]):
        """Put a recovered server back into rotation."""
        # If a backup server is currently standing in, swap it back out.
        # An activated backup was removed from self.backup_servers, so the
        # membership check must use the originally configured backup list
        configured_backups = self.cluster_config.get('backup_servers', [])
        backup_to_deactivate = None
        
        for active in self.active_servers:
            if active in configured_backups:
                backup_to_deactivate = active
                break
        
        if backup_to_deactivate:
            # Return the stand-in to the backup pool
            self.active_servers.remove(backup_to_deactivate)
            self.backup_servers.append(backup_to_deactivate)
            
            self.logger.info(f"Deactivated backup server {backup_to_deactivate['name']}")
        
        # Reactivate the original server
        self.active_servers.append(server)
        self.logger.info(f"Reactivated server {server['name']}")
    
    def get_failover_status(self) -> Dict[str, Any]:
        """Return the current failover state."""
        with self.lock:
            return {
                'timestamp': datetime.now(),
                'active_servers': [s['name'] for s in self.active_servers],
                'failed_servers': [
                    {
                        'name': fs['server']['name'],
                        'failed_at': fs['failed_at'],
                        'recovery_attempts': fs['recovery_attempts']
                    }
                    for fs in self.failed_servers
                ],
                'backup_servers': [s['name'] for s in self.backup_servers],
                'total_primary_servers': len(self.primary_servers),
                'total_backup_servers': len(self.cluster_config.get('backup_servers', [])),
                'failover_events': len(self.failed_servers)
            }
    
    def manual_failover(self, from_server: str, to_server: str) -> Dict[str, Any]:
        """Manually fail over from an active server to a backup server."""
        with self.lock:
            # Locate the source server among the active servers
            source_server = None
            for server in self.active_servers:
                if server['name'] == from_server:
                    source_server = server
                    break
            
            if not source_server:
                return {
                    'success': False,
                    'message': f'Source server {from_server} not found in active servers'
                }
            
            # Locate the target server among the backup servers
            target_server = None
            for server in self.backup_servers:
                if server['name'] == to_server:
                    target_server = server
                    break
            
            if not target_server:
                return {
                    'success': False,
                    'message': f'Target server {to_server} not found in backup servers'
                }
            
            # Swap the two servers
            self.active_servers.remove(source_server)
            self.backup_servers.remove(target_server)
            
            self.active_servers.append(target_server)
            self.backup_servers.append(source_server)
            
            self.logger.info(f"Manual failover completed: {from_server} -> {to_server}")
            
            return {
                'success': True,
                'message': f'Manual failover completed from {from_server} to {to_server}'
            }
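
Note that handle_server_failure is passive: it only reacts when something reports a failure. A minimal wiring sketch for an external health monitor is shown below; the poll loop and its interval are illustrative assumptions, and it reuses the manager's internal _test_server_health probe:

import threading
import time

def start_failure_monitor(manager: 'Neo4jFailoverManager', interval: int = 10):
    """Illustrative poll loop feeding failures into the failover manager."""
    def loop():
        while True:
            # Snapshot under the lock; failover may mutate the list concurrently
            with manager.lock:
                servers = list(manager.active_servers)
            for server in servers:
                if not manager._test_server_health(server):
                    manager.handle_server_failure(server)
            time.sleep(interval)
    threading.Thread(target=loop, daemon=True).start()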

# Usage example
load_balancer_config = {
    'strategy': 'health_based',
    'health_check_interval': 30,
    'max_retries': 3,
    'timeout': 10,
    'read_servers': [
        {
            'name': 'read-replica-1',
            'uri': 'bolt://neo4j-replica-1:7687',
            'username': 'neo4j',
            'password': 'password',
            'weight': 1.0
        },
        {
            'name': 'read-replica-2',
            'uri': 'bolt://neo4j-replica-2:7687',
            'username': 'neo4j',
            'password': 'password',
            'weight': 1.5
        }
    ],
    'write_servers': [
        {
            'name': 'core-leader',
            'uri': 'bolt://neo4j-core-1:7687',
            'username': 'neo4j',
            'password': 'password',
            'weight': 1.0
        }
    ]
}

failover_config = {
    'primary_servers': [
        {
            'name': 'core-1',
            'uri': 'bolt://neo4j-core-1:7687',
            'username': 'neo4j',
            'password': 'password'
        },
        {
            'name': 'core-2',
            'uri': 'bolt://neo4j-core-2:7687',
            'username': 'neo4j',
            'password': 'password'
        }
    ],
    'backup_servers': [
        {
            'name': 'backup-1',
            'uri': 'bolt://neo4j-backup-1:7687',
            'username': 'neo4j',
            'password': 'password'
        }
    ],
    'failover_timeout': 30,
    'recovery_check_interval': 60
}

# Create the load balancer
load_balancer = Neo4jLoadBalancer(load_balancer_config)

# Create the failover manager
failover_manager = Neo4jFailoverManager(failover_config)

# Define failover and recovery callbacks
def on_failover(failed_server, backup_server):
    print(f"Failover: {failed_server['name']} -> {backup_server['name']}")
    # Notification, logging, etc. can be hooked in here

def on_recovery(recovered_server):
    print(f"Server recovered: {recovered_server['name']}")
    # Post-recovery handling can be hooked in here

failover_manager.add_failover_callback(on_failover)
failover_manager.add_recovery_callback(on_recovery)

# Query examples
try:
    # Read query
    read_result = load_balancer.execute_read_query(
        "MATCH (n:Person) RETURN n.name LIMIT 10"
    )
    print(f"Read query result: {len(read_result)} records")
    
    # Write query
    write_result = load_balancer.execute_write_query(
        "CREATE (p:Person {name: $name, created_at: datetime()}) RETURN p",
        {'name': 'Test User'}
    )
    print(f"Write query result: {len(write_result)} records")
    
    # Cluster status
    cluster_status = load_balancer.get_cluster_status()
    print("\nCluster status:")
    print(f"Healthy servers: {cluster_status['healthy_servers']}/{cluster_status['total_servers']}")
    print(f"Average response time: {cluster_status['average_response_time']:.2f}ms")
    
    # Failover status
    failover_status = failover_manager.get_failover_status()
    print("\nFailover status:")
    print(f"Active servers: {failover_status['active_servers']}")
    print(f"Failed servers: {len(failover_status['failed_servers'])}")
    print(f"Backup servers: {failover_status['backup_servers']}")

except Exception as e:
    print(f"Error: {e}")

finally:
    load_balancer.close()
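
Failover can also be triggered by hand, for example before planned maintenance on a core server. A short sketch using the server names defined in failover_config above:

# Manual failover sketch: promote 'backup-1' and demote 'core-1'
# (server names come from failover_config above)
result = failover_manager.manual_failover('core-1', 'backup-1')
print(result['message'])
if result['success']:
    print(failover_manager.get_failover_status()['active_servers'])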

7.4 Data Backup and Recovery

7.4.1 Backup Strategies

import os
import shutil
import subprocess
import json
import gzip
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional
from pathlib import Path
from dataclasses import dataclass
from enum import Enum
import threading
import time
import logging

class BackupType(Enum):
    """Backup type"""
    FULL = "full"
    INCREMENTAL = "incremental"
    DIFFERENTIAL = "differential"

class BackupStatus(Enum):
    """Backup status"""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

@dataclass
class BackupJob:
    """Backup job"""
    id: str
    backup_type: BackupType
    database_name: str
    backup_path: str
    status: BackupStatus
    created_at: datetime
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    file_size: int = 0
    error_message: Optional[str] = None
    retention_days: int = 30

class Neo4jBackupManager:
    """Neo4j backup manager"""
    
    def __init__(self, backup_config: Dict[str, Any]):
        self.config = backup_config
        self.neo4j_home = backup_config.get('neo4j_home', '/opt/neo4j')
        self.backup_dir = Path(backup_config.get('backup_dir', '/opt/neo4j/backups'))
        self.backup_dir.mkdir(parents=True, exist_ok=True)
        
        # Connection settings
        self.bolt_uri = backup_config.get('bolt_uri', 'bolt://localhost:7687')
        self.username = backup_config.get('username', 'neo4j')
        self.password = backup_config.get('password', 'password')
        
        # Backup settings
        self.compression_enabled = backup_config.get('compression', True)
        self.parallel_backup = backup_config.get('parallel_backup', True)
        self.max_concurrent_backups = backup_config.get('max_concurrent_backups', 2)
        self.default_retention_days = backup_config.get('retention_days', 30)
        
        # Job bookkeeping
        self.backup_jobs: Dict[str, BackupJob] = {}
        self.running_jobs = 0
        self.lock = threading.Lock()
        
        # Logging must be set up before the scheduler thread starts,
        # because that thread logs through self.logger
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)
        
        # Scheduling
        self.schedule_config = backup_config.get('schedule', {})
        self._start_scheduler()
    
    def create_backup(self, database_name: str, backup_type: BackupType = BackupType.FULL, 
                     retention_days: Optional[int] = None) -> str:
        """Create and start a backup job; returns the job id."""
        job_id = f"backup_{database_name}_{backup_type.value}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        
        # Derive the backup file path
        backup_filename = f"{job_id}.backup"
        if self.compression_enabled:
            backup_filename += ".gz"
        
        backup_path = self.backup_dir / backup_filename
        
        # Create the job record
        backup_job = BackupJob(
            id=job_id,
            backup_type=backup_type,
            database_name=database_name,
            backup_path=str(backup_path),
            status=BackupStatus.PENDING,
            created_at=datetime.now(),
            retention_days=retention_days or self.default_retention_days
        )
        
        with self.lock:
            self.backup_jobs[job_id] = backup_job
        
        # Kick off the job on a worker thread
        self._start_backup_job(backup_job)
        
        self.logger.info(f"Created backup job {job_id} for database {database_name}")
        return job_id
    
    def _start_backup_job(self, backup_job: BackupJob):
        """Run a backup job on a worker thread."""
        def backup_worker():
            # Wait for a concurrency slot instead of silently dropping the job
            while True:
                with self.lock:
                    if self.running_jobs < self.max_concurrent_backups:
                        self.running_jobs += 1
                        backup_job.status = BackupStatus.RUNNING
                        backup_job.started_at = datetime.now()
                        break
                time.sleep(1)
            
            try:
                self._execute_backup(backup_job)
                
                with self.lock:
                    backup_job.status = BackupStatus.COMPLETED
                    backup_job.completed_at = datetime.now()
                    
                    # Record the size of the finished backup
                    if os.path.exists(backup_job.backup_path):
                        backup_job.file_size = os.path.getsize(backup_job.backup_path)
                
                self.logger.info(f"Backup job {backup_job.id} completed successfully")
            
            except Exception as e:
                with self.lock:
                    backup_job.status = BackupStatus.FAILED
                    backup_job.error_message = str(e)
                    backup_job.completed_at = datetime.now()
                
                self.logger.error(f"Backup job {backup_job.id} failed: {e}")
            
            finally:
                with self.lock:
                    self.running_jobs -= 1
        
        backup_thread = threading.Thread(target=backup_worker)
        backup_thread.start()
    
    def _execute_backup(self, backup_job: BackupJob):
        """Dispatch to the appropriate backup routine."""
        if backup_job.backup_type == BackupType.FULL:
            self._execute_full_backup(backup_job)
        elif backup_job.backup_type == BackupType.INCREMENTAL:
            self._execute_incremental_backup(backup_job)
        elif backup_job.backup_type == BackupType.DIFFERENTIAL:
            self._execute_differential_backup(backup_job)
    
    def _execute_full_backup(self, backup_job: BackupJob):
        """Run a full backup via neo4j-admin."""
        # Build the backup command; neo4j-admin flag names vary between
        # Neo4j major versions, so verify against your installation's docs
        backup_cmd = [
            f"{self.neo4j_home}/bin/neo4j-admin",
            "database", "backup",
            "--database", backup_job.database_name,
            "--to-path", str(self.backup_dir),
            "--backup-name", backup_job.id
        ]
        
        # Point the tool at the source server
        if self.bolt_uri:
            backup_cmd.extend(["--from", self.bolt_uri])
        
        # Run the command
        self.logger.info(f"Executing full backup: {' '.join(backup_cmd)}")
        
        result = subprocess.run(
            backup_cmd,
            capture_output=True,
            text=True,
            timeout=3600  # 1-hour timeout
        )
        
        if result.returncode != 0:
            raise Exception(f"Backup command failed: {result.stderr}")
        
        # Compress the result
        if self.compression_enabled:
            self._compress_backup(backup_job)
    
    def _execute_incremental_backup(self, backup_job: BackupJob):
        """Run an incremental backup based on the latest full backup."""
        # Find the most recent full backup to use as the base
        last_full_backup = self._find_last_full_backup(backup_job.database_name)
        
        if not last_full_backup:
            self.logger.warning(f"No full backup found for {backup_job.database_name}, performing full backup instead")
            self._execute_full_backup(backup_job)
            return
        
        # Build the incremental backup command (flag names vary by version)
        backup_cmd = [
            f"{self.neo4j_home}/bin/neo4j-admin",
            "database", "backup",
            "--database", backup_job.database_name,
            "--to-path", str(self.backup_dir),
            "--backup-name", backup_job.id,
            "--incremental-base", last_full_backup.backup_path
        ]
        
        if self.bolt_uri:
            backup_cmd.extend(["--from", self.bolt_uri])
        
        self.logger.info(f"Executing incremental backup: {' '.join(backup_cmd)}")
        
        result = subprocess.run(
            backup_cmd,
            capture_output=True,
            text=True,
            timeout=1800  # 30-minute timeout
        )
        
        if result.returncode != 0:
            raise Exception(f"Incremental backup command failed: {result.stderr}")
        
        if self.compression_enabled:
            self._compress_backup(backup_job)
    
    def _execute_differential_backup(self, backup_job: BackupJob):
        """Run a differential backup against the latest full backup."""
        # Differential logic resembles incremental backup, but the base is
        # always the most recent full backup rather than the last backup
        last_full_backup = self._find_last_full_backup(backup_job.database_name)
        
        if not last_full_backup:
            self.logger.warning(f"No full backup found for {backup_job.database_name}, performing full backup instead")
            self._execute_full_backup(backup_job)
            return
        
        # Use the full backup as the base (flag names vary by version)
        backup_cmd = [
            f"{self.neo4j_home}/bin/neo4j-admin",
            "database", "backup",
            "--database", backup_job.database_name,
            "--to-path", str(self.backup_dir),
            "--backup-name", backup_job.id,
            "--differential-base", last_full_backup.backup_path
        ]
        
        if self.bolt_uri:
            backup_cmd.extend(["--from", self.bolt_uri])
        
        self.logger.info(f"Executing differential backup: {' '.join(backup_cmd)}")
        
        result = subprocess.run(
            backup_cmd,
            capture_output=True,
            text=True,
            timeout=1800
        )
        
        if result.returncode != 0:
            raise Exception(f"Differential backup command failed: {result.stderr}")
        
        if self.compression_enabled:
            self._compress_backup(backup_job)
    
    def _compress_backup(self, backup_job: BackupJob):
        """Gzip the backup artifact and delete the uncompressed original."""
        # backup_path ends with ".gz" when compression is enabled; strip only
        # that suffix (str.replace could corrupt paths containing ".gz")
        if not backup_job.backup_path.endswith('.gz'):
            return
        original_path = backup_job.backup_path[:-3]
        
        if os.path.exists(original_path):
            with open(original_path, 'rb') as f_in:
                with gzip.open(backup_job.backup_path, 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)
            
            # Remove the uncompressed original
            os.remove(original_path)
            self.logger.info(f"Compressed backup file: {backup_job.backup_path}")
    
    def _find_last_full_backup(self, database_name: str) -> Optional[BackupJob]:
        """Find the most recent completed full backup for a database."""
        full_backups = [
            job for job in self.backup_jobs.values()
            if (job.database_name == database_name and 
                job.backup_type == BackupType.FULL and 
                job.status == BackupStatus.COMPLETED)
        ]
        
        if not full_backups:
            return None
        
        # Sort by creation time and return the newest
        full_backups.sort(key=lambda x: x.created_at, reverse=True)
        return full_backups[0]
    
    def restore_backup(self, backup_job_id: str, target_database: str, 
                      force: bool = False) -> Dict[str, Any]:
        """Restore a completed backup into the target database."""
        backup_job = self.backup_jobs.get(backup_job_id)
        
        if not backup_job:
            return {
                'success': False,
                'message': f'Backup job {backup_job_id} not found'
            }
        
        if backup_job.status != BackupStatus.COMPLETED:
            return {
                'success': False,
                'message': f'Backup job {backup_job_id} is not completed'
            }
        
        if not os.path.exists(backup_job.backup_path):
            return {
                'success': False,
                'message': f'Backup file {backup_job.backup_path} not found'
            }
        
        try:
            # Decompress the backup file if needed
            restore_path = backup_job.backup_path
            if backup_job.backup_path.endswith('.gz'):
                restore_path = backup_job.backup_path[:-3]
                
                with gzip.open(backup_job.backup_path, 'rb') as f_in:
                    with open(restore_path, 'wb') as f_out:
                        shutil.copyfileobj(f_in, f_out)
            
            # Build the restore command; the target database must be
            # stopped before restoring
            restore_cmd = [
                f"{self.neo4j_home}/bin/neo4j-admin",
                "database", "restore",
                "--from-path", restore_path,
                "--database", target_database
            ]
            
            if force:
                restore_cmd.append("--overwrite-destination")
            
            self.logger.info(f"Executing restore: {' '.join(restore_cmd)}")
            
            result = subprocess.run(
                restore_cmd,
                capture_output=True,
                text=True,
                timeout=3600
            )
            
            if result.returncode != 0:
                return {
                    'success': False,
                    'message': f'Restore command failed: {result.stderr}'
                }
            
            # Clean up the temporary decompressed file
            if restore_path != backup_job.backup_path and os.path.exists(restore_path):
                os.remove(restore_path)
            
            self.logger.info(f"Successfully restored backup {backup_job_id} to database {target_database}")
            
            return {
                'success': True,
                'message': f'Successfully restored backup to database {target_database}'
            }
        
        except Exception as e:
            self.logger.error(f"Restore failed: {e}")
            return {
                'success': False,
                'message': f'Restore failed: {e}'
            }
    
    def cleanup_old_backups(self):
        """Delete backups that have exceeded their retention period."""
        current_time = datetime.now()
        deleted_count = 0
        
        for job_id, backup_job in list(self.backup_jobs.items()):
            if backup_job.status == BackupStatus.COMPLETED:
                age_days = (current_time - backup_job.created_at).days
                
                if age_days > backup_job.retention_days:
                    try:
                        # Delete the backup file
                        if os.path.exists(backup_job.backup_path):
                            os.remove(backup_job.backup_path)
                        
                        # Drop the job record
                        del self.backup_jobs[job_id]
                        deleted_count += 1
                        
                        self.logger.info(f"Deleted expired backup: {backup_job.backup_path}")
                    
                    except Exception as e:
                        self.logger.error(f"Failed to delete backup {backup_job.backup_path}: {e}")
        
        self.logger.info(f"Cleanup completed: deleted {deleted_count} expired backups")
        return deleted_count
    
    def get_backup_status(self, job_id: Optional[str] = None) -> Dict[str, Any]:
        """Return the status of a single job, or a summary of all jobs."""
        if job_id:
            backup_job = self.backup_jobs.get(job_id)
            if not backup_job:
                return {'error': f'Backup job {job_id} not found'}
            
            return {
                'id': backup_job.id,
                'backup_type': backup_job.backup_type.value,
                'database_name': backup_job.database_name,
                'status': backup_job.status.value,
                'created_at': backup_job.created_at.isoformat(),
                'started_at': backup_job.started_at.isoformat() if backup_job.started_at else None,
                'completed_at': backup_job.completed_at.isoformat() if backup_job.completed_at else None,
                'file_size': backup_job.file_size,
                'backup_path': backup_job.backup_path,
                'error_message': backup_job.error_message,
                'retention_days': backup_job.retention_days
            }
        else:
            # Summary across all backup jobs
            return {
                'total_jobs': len(self.backup_jobs),
                'running_jobs': self.running_jobs,
                'completed_jobs': len([j for j in self.backup_jobs.values() if j.status == BackupStatus.COMPLETED]),
                'failed_jobs': len([j for j in self.backup_jobs.values() if j.status == BackupStatus.FAILED]),
                'jobs': [
                    {
                        'id': job.id,
                        'database_name': job.database_name,
                        'backup_type': job.backup_type.value,
                        'status': job.status.value,
                        'created_at': job.created_at.isoformat(),
                        'file_size': job.file_size
                    }
                    for job in sorted(self.backup_jobs.values(), key=lambda x: x.created_at, reverse=True)
                ]
            }
    
    def _start_scheduler(self):
        """Start the scheduled-backup thread."""
        if not self.schedule_config:
            return
        
        def scheduler_loop():
            while True:
                try:
                    self._check_scheduled_backups()
                except Exception as e:
                    self.logger.error(f"Scheduler error: {e}")
                
                time.sleep(3600)  # poll once per hour
        
        scheduler_thread = threading.Thread(target=scheduler_loop, daemon=True)
        scheduler_thread.start()
        self.logger.info("Backup scheduler started")
    
    def _check_scheduled_backups(self):
        """Run any schedules that are due."""
        current_time = datetime.now()
        
        for schedule_name, schedule_info in self.schedule_config.items():
            database_name = schedule_info.get('database')
            backup_type = BackupType(schedule_info.get('type', 'full'))
            cron_expression = schedule_info.get('cron')
            
            if self._should_run_backup(schedule_name, cron_expression, current_time):
                self.logger.info(f"Running scheduled backup: {schedule_name}")
                self.create_backup(database_name, backup_type)
    
    def _should_run_backup(self, schedule_name: str, cron_expression: str, current_time: datetime) -> bool:
        """Decide whether a schedule is due (simplified cron handling)."""
        # Only the two expressions used in this chapter are recognized; a real
        # cron parser is needed for anything else (see the sketch below).
        # Because the scheduler polls hourly, we match on the hour only;
        # also requiring minute == 0 would almost never fire.
        if cron_expression == "0 2 * * *":  # daily at 02:00
            return current_time.hour == 2
        
        if cron_expression == "0 * * * *":  # hourly
            return True
        
        return False
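
If the third-party croniter package is available (an assumption; install it with pip install croniter), general cron expressions can be evaluated against the polling window instead of being hard-coded. A minimal sketch:

from datetime import datetime, timedelta
from croniter import croniter  # third-party: pip install croniter

def cron_is_due(cron_expression: str, now: datetime,
                window: timedelta = timedelta(hours=1)) -> bool:
    """True if the expression last fired within the polling window."""
    last_fire = croniter(cron_expression, now).get_prev(datetime)
    return now - last_fire < window

# Example: a 02:00 daily schedule, polled at 02:30 with an hourly window
print(cron_is_due("0 2 * * *", datetime(2024, 1, 1, 2, 30)))  # True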

# Usage example
backup_config = {
    'neo4j_home': '/opt/neo4j',
    'backup_dir': '/opt/neo4j/backups',
    'bolt_uri': 'bolt://localhost:7687',
    'username': 'neo4j',
    'password': 'password',
    'compression': True,
    'parallel_backup': True,
    'max_concurrent_backups': 2,
    'retention_days': 30,
    'schedule': {
        'daily_full_backup': {
            'database': 'neo4j',
            'type': 'full',
            'cron': '0 2 * * *'  # daily at 02:00
        },
        'hourly_incremental': {
            'database': 'neo4j',
            'type': 'incremental',
            'cron': '0 * * * *'  # hourly
        }
    }
}

backup_manager = Neo4jBackupManager(backup_config)

# Create a full backup
full_backup_id = backup_manager.create_backup('neo4j', BackupType.FULL)
print(f"Created full backup job: {full_backup_id}")

# Give the backup a moment to run; production code should poll
# get_backup_status() until the job leaves the RUNNING state
time.sleep(5)

# Check the backup status
status = backup_manager.get_backup_status(full_backup_id)
print(f"Backup status: {status['status']}")

# Create an incremental backup
incremental_backup_id = backup_manager.create_backup('neo4j', BackupType.INCREMENTAL)
print(f"Created incremental backup job: {incremental_backup_id}")

# Summarize all backup jobs
all_status = backup_manager.get_backup_status()
print(f"\nTotal backup jobs: {all_status['total_jobs']}")
print(f"Completed jobs: {all_status['completed_jobs']}")
print(f"Running jobs: {all_status['running_jobs']}")

# Remove expired backups
deleted_count = backup_manager.cleanup_old_backups()
print(f"\nDeleted {deleted_count} expired backups")

# Restore example
if status['status'] == 'completed':
    restore_result = backup_manager.restore_backup(
        full_backup_id, 
        'restored_neo4j', 
        force=True
    )
    print(f"\nRestore result: {restore_result['message']}")

7.5 Chapter Summary

Core Concepts

  1. Cluster architecture

    • Causal Cluster
    • Core servers and read replicas
    • Raft protocol and consistency guarantees (see the quorum sketch after this list)
  2. Cluster deployment

    • Configuration file generation and management
    • Docker and Kubernetes deployment
    • Network and security configuration
  3. Load balancing

    • Multiple load-balancing strategies
    • Health checks and monitoring
    • Connection pool management
  4. Failover

    • Automatic failure detection
    • Backup server activation
    • Recovery monitoring
  5. Backup and recovery

    • Full, incremental, and differential backups
    • Automated backup scheduling
    • Backup compression and cleanup
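
Raft's consistency guarantee rests on majority quorums, which is why the core-server count matters so much. A tiny illustration of the arithmetic:

def raft_fault_tolerance(cores: int):
    """Raft commits via majority quorum: quorum = n // 2 + 1,
    so a cluster survives n - quorum simultaneous core failures."""
    quorum = cores // 2 + 1
    return quorum, cores - quorum

for n in (3, 4, 5):
    quorum, tolerated = raft_fault_tolerance(n)
    print(f"{n} cores: quorum {quorum}, tolerates {tolerated} failure(s)")
# 3 cores tolerate 1 failure; 4 cores still only 1; 5 cores tolerate 2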

Best Practices

  1. Cluster planning

    • At least 3 core servers to ensure high availability (see the configuration sketch after this list)
    • Read replicas sized to absorb the read load
    • Account for network latency and data-center distribution
  2. Monitoring and alerting

    • Monitor cluster health in real time
    • Alert on key metrics
    • Verify backup integrity regularly
  3. Capacity planning

    • Estimate data growth and query load
    • Allocate hardware resources accordingly
    • Define a scale-out strategy
  4. Security configuration

    • Enable SSL/TLS encryption
    • Configure firewall rules
    • Rotate passwords and certificates regularly
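
The planning and security items above translate directly into server configuration. The sketch below generates a minimal core-server neo4j.conf; the setting names follow Neo4j 4.x causal-clustering conventions and may differ in other versions, and the hosts and certificate paths are placeholders:

def render_core_config(host: str, peers: list) -> str:
    """Render a minimal neo4j.conf for one core server of a 3-core cluster.
    Setting names follow Neo4j 4.x; verify against your version's docs."""
    lines = [
        "dbms.mode=CORE",
        "causal_clustering.minimum_core_cluster_size_at_formation=3",
        f"causal_clustering.initial_discovery_members={','.join(peers)}",
        f"dbms.default_advertised_address={host}",
        # Security hardening: encrypt Bolt traffic
        "dbms.ssl.policy.bolt.enabled=true",
        "dbms.ssl.policy.bolt.base_directory=certificates/bolt",
    ]
    return "\n".join(lines)

peers = ["core1:5000", "core2:5000", "core3:5000"]  # discovery endpoints
print(render_core_config("core1", peers))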

Exercises

  1. Design a deployment plan for a 3-node Neo4j cluster, including network configuration and security settings.

  2. Implement a custom load-balancing strategy that selects the optimal server based on query type.

  3. Write a failover test script that simulates a node failure and verifies automatic recovery.

  4. Design a backup strategy, including the scheduling and retention policy for full and incremental backups.

  5. Implement a cluster monitoring dashboard showing real-time cluster status and performance metrics.
