学习目标

通过本章学习，你将能够：

- 理解 Nacos 配置管理的核心原理
- 掌握配置的增删改查操作
- 实现配置的动态推送和监听
- 配置多环境管理和版本控制
- 实现配置加密和权限控制

1. 配置管理核心概念

1.1 配置模型

from enum import Enum
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Callable, Any
import time
import json
import hashlib
import threading
import copy
from datetime import datetime

class ConfigType(Enum):
    """Content formats a configuration entry can be tagged with."""
    TEXT = "text"                # plain text
    JSON = "json"                # JSON format
    XML = "xml"                  # XML format
    YAML = "yaml"                # YAML format
    PROPERTIES = "properties"    # Properties format
    HTML = "html"                # HTML format

class ConfigStatus(Enum):
    """Lifecycle states of a configuration entry."""
    ACTIVE = "ACTIVE"            # active
    INACTIVE = "INACTIVE"        # inactive
    DELETED = "DELETED"          # deleted (entries are flagged, not removed)
    DRAFT = "DRAFT"              # draft

class ChangeType(Enum):
    """Kinds of change recorded in the configuration history."""
    CREATE = "CREATE"            # created
    UPDATE = "UPDATE"            # updated
    DELETE = "DELETE"            # deleted
    ROLLBACK = "ROLLBACK"        # rolled back

@dataclass
class ConfigInfo:
    """A single configuration entry together with its metadata.

    ``md5`` is not a constructor argument: it is derived from ``content``
    right after construction and again on every ``update_content`` call.
    """
    data_id: str                 # configuration ID
    group: str                   # configuration group
    namespace_id: str = "public" # namespace
    content: str = ""            # configuration content
    config_type: ConfigType = ConfigType.TEXT
    description: str = ""        # configuration description
    tags: List[str] = field(default_factory=list)
    app_name: str = ""           # application name
    status: ConfigStatus = ConfigStatus.ACTIVE
    create_time: float = field(default_factory=time.time)
    update_time: float = field(default_factory=time.time)
    create_user: str = "system"
    update_user: str = "system"
    md5: str = field(init=False)
    version: int = 1

    def __post_init__(self):
        # md5 is excluded from __init__, so fill it in from the content here.
        self.md5 = self.calculate_md5()

    def calculate_md5(self) -> str:
        """Return the hex MD5 digest of the UTF-8 encoded content."""
        digest = hashlib.md5()
        digest.update(self.content.encode('utf-8'))
        return digest.hexdigest()

    def update_content(self, new_content: str, user: str = "system"):
        """Replace the content and refresh md5, update time/user and version.

        Args:
            new_content: The new configuration content.
            user: Name recorded as ``update_user``.
        """
        self.content = new_content
        self.update_user = user
        self.update_time = time.time()
        self.version = self.version + 1
        # Recompute last; the digest depends only on the content set above.
        self.md5 = self.calculate_md5()

@dataclass
class ConfigHistory:
    """Snapshot of a configuration taken when a change is recorded."""
    id: str                      # ConfigManager._save_history builds it as "<config key>#<timestamp in ms>"
    data_id: str
    group: str
    namespace_id: str
    content: str                 # configuration content at snapshot time
    md5: str                     # md5 of the config at snapshot time
    change_type: ChangeType      # kind of change that produced this record
    create_time: float           # time.time() value when the record was created
    create_user: str             # user the change is attributed to
    version: int                 # version of the config at snapshot time
    description: str = ""

@dataclass
class ConfigListener:
    """A callback registered for one (data_id, group, namespace_id) triple."""
    data_id: str
    group: str
    namespace_id: str
    callback: Callable[[ConfigInfo], None]  # called with the changed config
    md5: str = ""  # md5 this listener last saw; a differing config md5 triggers the callback
    last_check_time: float = field(default_factory=time.time)  # refreshed after each successful callback

class ConfigManager:
    """配置管理器"""
    
    def __init__(self):
        self.configs: Dict[str, ConfigInfo] = {}
        self.histories: Dict[str, List[ConfigHistory]] = {}
        self.listeners: List[ConfigListener] = []
        self.encrypted_keys: Dict[str, str] = {}  # 加密密钥
        self._lock = threading.RLock()
        self._listener_thread = None
        self._start_listener_thread()
    
    def publish_config(self, config_info: ConfigInfo) -> bool:
        """Publish a configuration, creating it or replacing the stored one.

        A deep copy of ``config_info`` is stored under its
        namespace/group/data_id key. If an entry already exists under that
        key it is written to the history first; the published config is then
        written to the history as well and matching listeners are notified.

        Args:
            config_info: Configuration to publish. Its ``update_time`` is
                refreshed in place.

        Returns:
            Always ``True``.
        """
        key = self._get_config_key(
            config_info.data_id, config_info.group, config_info.namespace_id
        )

        with self._lock:
            previous = self.configs.get(key)

            if previous:
                # Replacing an entry: snapshot the one being replaced.
                change = ChangeType.UPDATE
                self._save_history(previous, change)
            else:
                change = ChangeType.CREATE

            config_info.update_time = time.time()
            # Keep a private copy so later edits by the caller don't leak in.
            self.configs[key] = copy.deepcopy(config_info)

            # Record the version that was just published, then fan out.
            self._save_history(config_info, change)
            self._notify_listeners(config_info)

            print(f"✅ 配置发布成功: {key} (版本: {config_info.version})")
            return True
    
    def get_config(self, data_id: str, group: str, 
                  namespace_id: str = "public") -> Optional[ConfigInfo]:
        """获取配置"""
        config_key = self._get_config_key(data_id, group, namespace_id)
        
        with self._lock:
            config = self.configs.get(config_key)
            if config and config.status == ConfigStatus.ACTIVE:
                print(f"📖 获取配置: {config_key}")
                return copy.deepcopy(config)
            
            print(f"❌ 配置不存在: {config_key}")
            return None
    
    def delete_config(self, data_id: str, group: str, 
                     namespace_id: str = "public", user: str = "system") -> bool:
        """删除配置"""
        config_key = self._get_config_key(data_id, group, namespace_id)
        
        with self._lock:
            config = self.configs.get(config_key)
            if not config:
                print(f"❌ 配置不存在,无法删除: {config_key}")
                return False
            
            # 保存删除历史
            self._save_history(config, ChangeType.DELETE, user)
            
            # 标记为已删除
            config.status = ConfigStatus.DELETED
            config.update_time = time.time()
            config.update_user = user
            
            # 通知监听器
            self._notify_listeners(config)
            
            print(f"✅ 配置删除成功: {config_key}")
            return True
    
    def list_configs(self, namespace_id: str = "public", 
                    group: str = None, 
                    data_id_pattern: str = None) -> List[ConfigInfo]:
        """列出配置"""
        with self._lock:
            configs = []
            for config in self.configs.values():
                if config.status == ConfigStatus.DELETED:
                    continue
                
                if config.namespace_id != namespace_id:
                    continue
                
                if group and config.group != group:
                    continue
                
                if data_id_pattern and data_id_pattern not in config.data_id:
                    continue
                
                configs.append(copy.deepcopy(config))
            
            print(f"📋 列出配置: {len(configs)} 个")
            return configs
    
    def add_listener(self, data_id: str, group: str, 
                    callback: Callable[[ConfigInfo], None],
                    namespace_id: str = "public"):
        """Register ``callback`` for changes of one configuration.

        The listener starts with the md5 of the config as currently returned
        by ``get_config`` (or ``""`` when there is none), so it is only called
        once the md5 differs from that starting value.
        """
        current = self.get_config(data_id, group, namespace_id)
        known_md5 = "" if current is None else current.md5

        with self._lock:
            self.listeners.append(
                ConfigListener(
                    data_id=data_id,
                    group=group,
                    namespace_id=namespace_id,
                    callback=callback,
                    md5=known_md5,
                )
            )

        print(f"👂 添加配置监听器: {self._get_config_key(data_id, group, namespace_id)}")
    
    def remove_listener(self, data_id: str, group: str, 
                       namespace_id: str = "public"):
        """移除配置监听器"""
        with self._lock:
            self.listeners = [
                listener for listener in self.listeners
                if not (listener.data_id == data_id and 
                       listener.group == group and 
                       listener.namespace_id == namespace_id)
            ]
        
        print(f"🔇 移除配置监听器: {self._get_config_key(data_id, group, namespace_id)}")
    
    def get_config_history(self, data_id: str, group: str, 
                          namespace_id: str = "public") -> List[ConfigHistory]:
        """获取配置历史"""
        config_key = self._get_config_key(data_id, group, namespace_id)
        
        with self._lock:
            histories = self.histories.get(config_key, [])
            print(f"📚 获取配置历史: {config_key} -> {len(histories)} 条记录")
            return copy.deepcopy(histories)
    
    def rollback_config(self, data_id: str, group: str, 
                       target_version: int,
                       namespace_id: str = "public", 
                       user: str = "system") -> bool:
        """回滚配置"""
        config_key = self._get_config_key(data_id, group, namespace_id)
        
        with self._lock:
            # 查找目标版本
            histories = self.histories.get(config_key, [])
            target_history = None
            
            for history in histories:
                if history.version == target_version:
                    target_history = history
                    break
            
            if not target_history:
                print(f"❌ 未找到目标版本: {target_version}")
                return False
            
            # 创建回滚配置
            current_config = self.configs.get(config_key)
            if not current_config:
                print(f"❌ 当前配置不存在: {config_key}")
                return False
            
            # 保存当前配置到历史
            self._save_history(current_config, ChangeType.ROLLBACK, user)
            
            # 回滚配置内容
            current_config.content = target_history.content
            current_config.md5 = target_history.md5
            current_config.update_time = time.time()
            current_config.update_user = user
            current_config.version += 1
            
            # 保存回滚后的配置到历史
            self._save_history(current_config, ChangeType.ROLLBACK, user)
            
            # 通知监听器
            self._notify_listeners(current_config)
            
            print(f"✅ 配置回滚成功: {config_key} -> 版本 {target_version}")
            return True
    
    def encrypt_config(self, data_id: str, group: str, 
                      namespace_id: str = "public", 
                      encryption_key: str = None) -> bool:
        """加密配置"""
        config_key = self._get_config_key(data_id, group, namespace_id)
        
        with self._lock:
            config = self.configs.get(config_key)
            if not config:
                print(f"❌ 配置不存在: {config_key}")
                return False
            
            if not encryption_key:
                encryption_key = self._generate_encryption_key()
            
            # 简化的加密实现(实际应使用 AES 等算法)
            encrypted_content = self._simple_encrypt(config.content, encryption_key)
            
            # 保存加密前的历史
            self._save_history(config, ChangeType.UPDATE)
            
            # 更新配置内容
            config.content = encrypted_content
            config.md5 = config.calculate_md5()
            config.update_time = time.time()
            
            # 保存加密密钥
            self.encrypted_keys[config_key] = encryption_key
            
            print(f"🔐 配置加密成功: {config_key}")
            return True
    
    def decrypt_config(self, data_id: str, group: str, 
                      namespace_id: str = "public") -> Optional[str]:
        """解密配置"""
        config_key = self._get_config_key(data_id, group, namespace_id)
        
        with self._lock:
            config = self.configs.get(config_key)
            if not config:
                return None
            
            encryption_key = self.encrypted_keys.get(config_key)
            if not encryption_key:
                # 配置未加密,直接返回
                return config.content
            
            # 解密配置内容
            decrypted_content = self._simple_decrypt(config.content, encryption_key)
            print(f"🔓 配置解密: {config_key}")
            return decrypted_content
    
    def _get_config_key(self, data_id: str, group: str, namespace_id: str) -> str:
        """生成配置键"""
        return f"{namespace_id}#{group}#{data_id}"
    
    def _save_history(self, config: ConfigInfo, change_type: ChangeType, 
                     user: str = None):
        """Append a snapshot of ``config`` to its history.

        Args:
            config: Configuration whose current state is recorded.
            change_type: Kind of change the record is labelled with.
            user: Name stored as ``create_user``; falls back to
                ``config.update_user`` when falsy.

        Only the 100 most recent records per config key are kept.
        """
        key = self._get_config_key(config.data_id, config.group, config.namespace_id)

        record = ConfigHistory(
            id=f"{key}#{int(time.time() * 1000)}",
            data_id=config.data_id,
            group=config.group,
            namespace_id=config.namespace_id,
            content=config.content,
            md5=config.md5,
            change_type=change_type,
            create_time=time.time(),
            create_user=user or config.update_user,
            version=config.version
        )

        records = self.histories.setdefault(key, [])
        records.append(record)

        # Cap the history at the 100 newest records.
        if len(records) > 100:
            self.histories[key] = records[-100:]
    
    def _notify_listeners(self, config: ConfigInfo):
        """通知配置监听器"""
        config_key = self._get_config_key(config.data_id, config.group, config.namespace_id)
        
        for listener in self.listeners:
            if (listener.data_id == config.data_id and 
                listener.group == config.group and 
                listener.namespace_id == config.namespace_id):
                
                if listener.md5 != config.md5:
                    try:
                        listener.callback(config)
                        listener.md5 = config.md5
                        listener.last_check_time = time.time()
                        print(f"📢 通知监听器: {config_key}")
                    except Exception as e:
                        print(f"❌ 监听器回调失败: {config_key} - {e}")
    
    def _start_listener_thread(self):
        """启动监听器线程"""
        def listener_worker():
            while True:
                try:
                    with self._lock:
                        for listener in self.listeners:
                            config = self.get_config(listener.data_id, 
                                                   listener.group, 
                                                   listener.namespace_id)
                            if config and listener.md5 != config.md5:
                                listener.callback(config)
                                listener.md5 = config.md5
                                listener.last_check_time = time.time()
                    
                    time.sleep(5)  # 每 5 秒检查一次
                except Exception as e:
                    print(f"❌ 监听器线程异常: {e}")
                    time.sleep(5)
        
        self._listener_thread = threading.Thread(target=listener_worker, daemon=True)
        self._listener_thread.start()
        print("🎧 配置监听器线程已启动")
    
    def _generate_encryption_key(self) -> str:
        """生成加密密钥"""
        import secrets
        return secrets.token_hex(16)
    
    def _simple_encrypt(self, content: str, key: str) -> str:
        """简单加密(实际应使用 AES)"""
        # 这里只是示例,实际应使用专业的加密算法
        import base64
        encrypted = ""
        for i, char in enumerate(content):
            key_char = key[i % len(key)]
            encrypted_char = chr((ord(char) + ord(key_char)) % 256)
            encrypted += encrypted_char
        return base64.b64encode(encrypted.encode('latin-1')).decode('ascii')
    
    def _simple_decrypt(self, encrypted_content: str, key: str) -> str:
        """简单解密"""
        import base64
        try:
            encrypted = base64.b64decode(encrypted_content).decode('latin-1')
            decrypted = ""
            for i, char in enumerate(encrypted):
                key_char = key[i % len(key)]
                decrypted_char = chr((ord(char) - ord(key_char)) % 256)
                decrypted += decrypted_char
            return decrypted
        except Exception:
            return encrypted_content  # 解密失败,返回原内容
    
    def get_stats(self) -> Dict[str, Any]:
        """获取统计信息"""
        with self._lock:
            stats = {
                "total_configs": len([c for c in self.configs.values() 
                                     if c.status != ConfigStatus.DELETED]),
                "deleted_configs": len([c for c in self.configs.values() 
                                       if c.status == ConfigStatus.DELETED]),
                "total_listeners": len(self.listeners),
                "total_histories": sum(len(h) for h in self.histories.values()),
                "encrypted_configs": len(self.encrypted_keys),
                "namespaces": list(set(c.namespace_id for c in self.configs.values())),
                "groups": list(set(c.group for c in self.configs.values())),
                "config_types": {}
            }
            
            # 统计配置类型
            for config in self.configs.values():
                if config.status != ConfigStatus.DELETED:
                    config_type = config.config_type.value
                    stats["config_types"][config_type] = stats["config_types"].get(config_type, 0) + 1
            
            return stats

# 配置模板管理器
class ConfigTemplateManager:
    """配置模板管理器"""
    
    def __init__(self):
        """Build the built-in template catalogue.

        ``self.templates`` maps category -> template name -> a dict holding
        the template's ``type`` (a ConfigType) and its ``content`` text.
        """
        # NOTE(review): none of the contents below contain "${...}"
        # placeholders, so variables passed to create_config_from_template
        # leave these built-in templates unchanged.
        self.templates = {
            "database": {
                "mysql": {
                    "type": ConfigType.PROPERTIES,
                    "content": """# MySQL 数据库配置
spring.datasource.url=jdbc:mysql://localhost:3306/mydb
spring.datasource.username=root
spring.datasource.password=password
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

# 连接池配置
spring.datasource.hikari.maximum-pool-size=20
spring.datasource.hikari.minimum-idle=5
spring.datasource.hikari.connection-timeout=30000
spring.datasource.hikari.idle-timeout=600000
spring.datasource.hikari.max-lifetime=1800000"""
                },
                "redis": {
                    "type": ConfigType.PROPERTIES,
                    "content": """# Redis 配置
spring.redis.host=localhost
spring.redis.port=6379
spring.redis.password=
spring.redis.database=0

# 连接池配置
spring.redis.lettuce.pool.max-active=8
spring.redis.lettuce.pool.max-idle=8
spring.redis.lettuce.pool.min-idle=0
spring.redis.lettuce.pool.max-wait=-1ms"""
                }
            },
            "logging": {
                "logback": {
                    "type": ConfigType.XML,
                    "content": """<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>
    
    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>logs/application.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>logs/application.%d{yyyy-MM-dd}.log</fileNamePattern>
            <maxHistory>30</maxHistory>
        </rollingPolicy>
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>
    
    <root level="INFO">
        <appender-ref ref="STDOUT" />
        <appender-ref ref="FILE" />
    </root>
</configuration>"""
                }
            },
            "microservice": {
                "gateway": {
                    "type": ConfigType.YAML,
                    "content": """# Spring Cloud Gateway 配置
spring:
  cloud:
    gateway:
      routes:
        - id: user-service
          uri: lb://user-service
          predicates:
            - Path=/api/users/**
          filters:
            - StripPrefix=1
        - id: order-service
          uri: lb://order-service
          predicates:
            - Path=/api/orders/**
          filters:
            - StripPrefix=1
      globalcors:
        cors-configurations:
          '[/**]':
            allowedOrigins: "*"
            allowedMethods:
              - GET
              - POST
              - PUT
              - DELETE
            allowedHeaders: "*"
            allowCredentials: true"""
                }
            }
        }
    
    def get_template(self, category: str, template_name: str) -> Optional[Dict[str, Any]]:
        """获取配置模板"""
        return self.templates.get(category, {}).get(template_name)
    
    def list_templates(self) -> Dict[str, List[str]]:
        """列出所有模板"""
        result = {}
        for category, templates in self.templates.items():
            result[category] = list(templates.keys())
        return result
    
    def create_config_from_template(self, category: str, template_name: str, 
                                   data_id: str, group: str, 
                                   namespace_id: str = "public",
                                   variables: Dict[str, str] = None) -> Optional[ConfigInfo]:
        """从模板创建配置"""
        template = self.get_template(category, template_name)
        if not template:
            return None
        
        content = template["content"]
        
        # 替换变量
        if variables:
            for key, value in variables.items():
                content = content.replace(f"${{{key}}}", value)
        
        config = ConfigInfo(
            data_id=data_id,
            group=group,
            namespace_id=namespace_id,
            content=content,
            config_type=template["type"],
            description=f"从模板 {category}/{template_name} 创建"
        )
        
        return config

# Usage example: walks through publish, read, listen, update, history,
# templates, encryption and statistics against an in-memory ConfigManager.
if __name__ == "__main__":
    # Create the managers (ConfigManager also starts its listener thread)
    config_manager = ConfigManager()
    template_manager = ConfigTemplateManager()

    print("=== 配置管理示例 ===")

    # 1. Publish a config
    database_config = ConfigInfo(
        data_id="database.properties",
        group="DEFAULT_GROUP",
        namespace_id="dev",
        content="""spring.datasource.url=jdbc:mysql://localhost:3306/testdb
spring.datasource.username=root
spring.datasource.password=123456""",
        config_type=ConfigType.PROPERTIES,
        description="数据库配置"
    )

    config_manager.publish_config(database_config)

    # 2. Read the config back
    retrieved_config = config_manager.get_config("database.properties", "DEFAULT_GROUP", "dev")
    if retrieved_config:
        print(f"配置内容: {retrieved_config.content[:50]}...")

    # 3. Register a listener
    def config_change_callback(config: ConfigInfo):
        print(f"🔔 配置变更通知: {config.data_id} (版本: {config.version})")
        print(f"   新内容: {config.content[:50]}...")

    config_manager.add_listener("database.properties", "DEFAULT_GROUP", 
                               config_change_callback, "dev")

    # 4. Update the config (update_content bumps the version, then republish)
    time.sleep(1)
    database_config.update_content(
        """spring.datasource.url=jdbc:mysql://localhost:3306/proddb
spring.datasource.username=admin
spring.datasource.password=admin123
spring.datasource.hikari.maximum-pool-size=20""",
        "admin"
    )
    config_manager.publish_config(database_config)

    # 5. Inspect the history
    time.sleep(1)
    histories = config_manager.get_config_history("database.properties", "DEFAULT_GROUP", "dev")
    print(f"\n配置历史记录: {len(histories)} 条")
    for history in histories[-3:]:  # show the 3 most recent records
        print(f"  版本 {history.version}: {history.change_type.value} by {history.create_user}")

    # 6. Create a config from a template
    print("\n=== 模板配置示例 ===")

    # List the available templates
    templates = template_manager.list_templates()
    print(f"可用模板: {templates}")

    # Create a Redis config from the template
    redis_config = template_manager.create_config_from_template(
        "database", "redis", 
        "redis.properties", "DEFAULT_GROUP", "dev",
        variables={
            "host": "redis.example.com",
            "port": "6379",
            "password": "redis123"
        }
    )

    if redis_config:
        config_manager.publish_config(redis_config)

    # 7. Encrypt a config
    print("\n=== 配置加密示例 ===")
    config_manager.encrypt_config("database.properties", "DEFAULT_GROUP", "dev")

    # Decrypt it again
    decrypted_content = config_manager.decrypt_config("database.properties", "DEFAULT_GROUP", "dev")
    print(f"解密后内容: {decrypted_content[:50]}...")

    # 8. Statistics
    print("\n=== 统计信息 ===")
    stats = config_manager.get_stats()
    print(f"总配置数: {stats['total_configs']}")
    print(f"监听器数: {stats['total_listeners']}")
    print(f"历史记录数: {stats['total_histories']}")
    print(f"加密配置数: {stats['encrypted_configs']}")
    print(f"配置类型分布: {stats['config_types']}")

    # Give the listener thread a moment before the script exits
    time.sleep(2)

2. Java 客户端配置管理

2.1 基础配置操作

// Java 配置管理示例
import com.alibaba.nacos.api.NacosFactory;
import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.alibaba.nacos.api.exception.NacosException;

import java.io.StringReader;
import java.util.Properties;
import java.util.concurrent.Executor;

public class ConfigManagementExample {
    
    private ConfigService configService;
    
    public ConfigManagementExample() throws NacosException {
        // 1. 配置 Nacos 连接
        Properties properties = new Properties();
        properties.setProperty("serverAddr", "127.0.0.1:8848");
        properties.setProperty("namespace", "dev");
        properties.setProperty("username", "nacos");
        properties.setProperty("password", "nacos");
        
        // 2. 创建配置服务
        this.configService = NacosFactory.createConfigService(properties);
    }
    
    public void publishConfig() throws NacosException {
        String dataId = "application.properties";
        String group = "DEFAULT_GROUP";
        String content = """# 应用配置
server.port=8080
spring.application.name=demo-service

# 数据库配置
spring.datasource.url=jdbc:mysql://localhost:3306/demo
spring.datasource.username=root
spring.datasource.password=123456

# Redis 配置
spring.redis.host=localhost
spring.redis.port=6379
spring.redis.database=0
""";
        
        boolean result = configService.publishConfig(dataId, group, content);
        System.out.println("发布配置结果: " + result);
    }
    
    public String getConfig() throws NacosException {
        String dataId = "application.properties";
        String group = "DEFAULT_GROUP";
        long timeoutMs = 5000;
        
        String content = configService.getConfig(dataId, group, timeoutMs);
        System.out.println("获取配置内容: \n" + content);
        return content;
    }
    
    public void addConfigListener() throws NacosException {
        String dataId = "application.properties";
        String group = "DEFAULT_GROUP";
        
        configService.addListener(dataId, group, new Listener() {
            @Override
            public Executor getExecutor() {
                return null; // 使用默认线程池
            }
            
            @Override
            public void receiveConfigInfo(String configInfo) {
                System.out.println("🔔 配置变更通知:");
                System.out.println("新配置内容: \n" + configInfo);
                
                // 处理配置变更逻辑
                handleConfigChange(configInfo);
            }
        });
        
        System.out.println("✅ 配置监听器添加成功");
    }
    
    private void handleConfigChange(String configInfo) {
        // 解析配置
        Properties props = new Properties();
        try {
            props.load(new StringReader(configInfo));
            
            // 更新应用配置
            String serverPort = props.getProperty("server.port");
            String appName = props.getProperty("spring.application.name");
            
            System.out.println("更新服务端口: " + serverPort);
            System.out.println("更新应用名称: " + appName);
            
            // 这里可以实现热更新逻辑
            // 例如:重新初始化数据源、更新缓存配置等
            
        } catch (Exception e) {
            System.err.println("处理配置变更失败: " + e.getMessage());
        }
    }
    
    public void removeConfig() throws NacosException {
        String dataId = "application.properties";
        String group = "DEFAULT_GROUP";
        
        boolean result = configService.removeConfig(dataId, group);
        System.out.println("删除配置结果: " + result);
    }
    
    public static void main(String[] args) throws Exception {
        ConfigManagementExample example = new ConfigManagementExample();
        
        // 发布配置
        example.publishConfig();
        
        // 获取配置
        example.getConfig();
        
        // 添加监听器
        example.addConfigListener();
        
        // 等待配置变更
        Thread.sleep(60000);
    }
}

2.2 Spring Boot 集成

// Spring Boot configuration class: binds application.properties as a Nacos
// property source and exposes three of its values through getters.
@Configuration
@NacosPropertySource(dataId = "application.properties", autoRefreshed = true)
public class NacosConfig {

    // "${name:default}" supplies a fallback; datasourceUrl declares none.
    @NacosValue(value = "${server.port:8080}", autoRefreshed = true)
    private int serverPort;

    @NacosValue(value = "${spring.application.name:demo}", autoRefreshed = true)
    private String applicationName;

    @NacosValue(value = "${spring.datasource.url}", autoRefreshed = true)
    private String datasourceUrl;

    // Getters (this class defines no setters)
    public int getServerPort() {
        return serverPort;
    }

    public String getApplicationName() {
        return applicationName;
    }

    public String getDatasourceUrl() {
        return datasourceUrl;
    }
}

// Configuration change listener
// NOTE(review): `log` is not declared in this snippet; presumably it comes
// from a logger field or logging annotation defined elsewhere; confirm.
@Component
public class ConfigChangeListener {

    /**
     * Annotated as the Nacos config listener for application.properties:
     * prints the new content and hands it to the change handler.
     */
    @NacosConfigListener(dataId = "application.properties")
    public void onConfigChange(String newContent) {
        System.out.println("🔔 配置发生变更:");
        System.out.println(newContent);

        handleConfigChange(newContent);
    }

    /**
     * Parses the content as properties and runs the three update steps in
     * order. Any failure is logged and not rethrown.
     */
    private void handleConfigChange(String configContent) {
        final Properties parsed = new Properties();
        try {
            parsed.load(new StringReader(configContent));

            updateDataSourceConfig(parsed);   // data source settings
            updateCacheConfig(parsed);        // cache settings
            publishConfigChangeEvent(parsed); // change event
        } catch (Exception e) {
            log.error("处理配置变更失败", e);
        }
    }

    private void updateDataSourceConfig(Properties props) {
        // Placeholder: data source update logic goes here
    }

    private void updateCacheConfig(Properties props) {
        // Placeholder: cache update logic goes here
    }

    private void publishConfigChangeEvent(Properties props) {
        // Placeholder: publish the configuration change event here
    }
}

// Dynamic configuration service: caches the properties of
// application.properties at startup and publishes updates through Nacos.
// NOTE(review): `log` is not declared in this snippet; presumably it comes
// from a logger field or logging annotation defined elsewhere; confirm.
@Service
public class DynamicConfigService {

    @Autowired
    private ConfigService configService;

    private final Map<String, Object> configCache = new ConcurrentHashMap<>();

    /** Fills the cache once after construction. */
    @PostConstruct
    public void init() {
        loadAllConfigs();
    }

    /**
     * Returns the cached value for {@code key} if it is an instance of
     * {@code type}, otherwise {@code defaultValue}.
     *
     * <p>loadAllConfigs stores every value as a String, so only type tokens
     * a String is an instance of can match entries loaded there.
     */
    public <T> T getConfig(String key, Class<T> type, T defaultValue) {
        Object cached = configCache.get(key);
        if (cached == null || !type.isInstance(cached)) {
            return defaultValue;
        }
        return type.cast(cached);
    }

    /**
     * Publishes new content for the given config and logs the outcome.
     * A NacosException is logged, not rethrown. The local cache is not
     * touched by this method.
     */
    public void updateConfig(String dataId, String group, String content) {
        try {
            if (configService.publishConfig(dataId, group, content)) {
                log.info("配置更新成功: {}/{}", group, dataId);
            } else {
                log.error("配置更新失败: {}/{}", group, dataId);
            }
        } catch (NacosException e) {
            log.error("配置更新异常: {}/{}", group, dataId, e);
        }
    }

    /**
     * Reads application.properties (DEFAULT_GROUP, 5 s timeout) and copies
     * each property into the cache. Failures are logged, not rethrown.
     */
    private void loadAllConfigs() {
        try {
            String raw = configService.getConfig("application.properties", 
                "DEFAULT_GROUP", 5000);
            if (raw == null) {
                return;
            }

            Properties parsed = new Properties();
            parsed.load(new StringReader(raw));

            for (String name : parsed.stringPropertyNames()) {
                configCache.put(name, parsed.getProperty(name));
            }
        } catch (Exception e) {
            log.error("加载配置失败", e);
        }
    }
}

2.3 配置加密解密

// Configuration encryption utility: AES encrypt/decrypt with Base64 text form.
// NOTE(review): TRANSFORMATION selects ECB mode, in which identical plaintext
// blocks produce identical ciphertext blocks, and the @Value default embeds a
// fixed fallback key. Both deserve a security review before production use.
@Component
public class ConfigEncryptionUtil {

    private static final String ALGORITHM = "AES";
    private static final String TRANSFORMATION = "AES/ECB/PKCS5Padding";
    private static final String CHARSET = "UTF-8";

    @Value("${nacos.config.encryption.key:defaultkey123456}")
    private String encryptionKey;

    /**
     * Encrypts the text and returns the ciphertext Base64-encoded.
     *
     * @throws RuntimeException wrapping any failure during encryption
     */
    public String encrypt(String plainText) {
        try {
            byte[] cipherBytes = newCipher(Cipher.ENCRYPT_MODE)
                .doFinal(plainText.getBytes(CHARSET));
            return Base64.getEncoder().encodeToString(cipherBytes);
        } catch (Exception e) {
            throw new RuntimeException("配置加密失败", e);
        }
    }

    /**
     * Decodes the Base64 text, decrypts it and returns the plain text.
     *
     * @throws RuntimeException wrapping any failure during decryption
     */
    public String decrypt(String encryptedText) {
        try {
            byte[] cipherBytes = Base64.getDecoder().decode(encryptedText);
            byte[] plainBytes = newCipher(Cipher.DECRYPT_MODE).doFinal(cipherBytes);
            return new String(plainBytes, CHARSET);
        } catch (Exception e) {
            throw new RuntimeException("配置解密失败", e);
        }
    }

    /**
     * Heuristic check: true when the text is decodable Base64, its length is
     * a multiple of 4 and it contains only Base64 alphabet characters.
     * Plain text that happens to meet these conditions is reported as
     * encrypted too.
     */
    public boolean isEncrypted(String text) {
        try {
            Base64.getDecoder().decode(text);
        } catch (Exception e) {
            return false;
        }
        return text.length() % 4 == 0 && text.matches("[A-Za-z0-9+/=]+");
    }

    /** Builds a cipher for the configured key, initialised in the given mode. */
    private Cipher newCipher(int mode) throws Exception {
        SecretKeySpec keySpec = new SecretKeySpec(
            encryptionKey.getBytes(CHARSET), ALGORITHM);

        Cipher cipher = Cipher.getInstance(TRANSFORMATION);
        cipher.init(mode, keySpec);
        return cipher;
    }
}

// 加密配置处理器
@Component
public class EncryptedConfigProcessor {
    
    @Autowired
    private ConfigEncryptionUtil encryptionUtil;
    
    @Autowired
    private ConfigService configService;
    
    /**
     * Publishes a configuration, encrypting the content first when requested.
     * The outcome is only logged; exceptions are caught and logged.
     *
     * @param dataId  configuration id
     * @param group   configuration group
     * @param content plain configuration content
     * @param encrypt whether to encrypt the content before publishing
     */
    public void publishEncryptedConfig(String dataId, String group, 
                                      String content, boolean encrypt) {
        try {
            String payload = content;
            if (encrypt) {
                payload = encryptionUtil.encrypt(content);
            }
            
            boolean published = configService.publishConfig(dataId, group, payload);
            if (!published) {
                log.error("加密配置发布失败: {}/{}", group, dataId);
                return;
            }
            log.info("加密配置发布成功: {}/{}", group, dataId);
        } catch (Exception e) {
            log.error("发布加密配置异常", e);
        }
    }
    
    /**
     * Fetches a configuration (5 s timeout) and decrypts it when it looks
     * encrypted.
     *
     * @param dataId configuration id
     * @param group  configuration group
     * @return the decrypted content, the raw content when it is absent or
     *         does not look encrypted, or {@code null} when any step throws
     */
    public String getDecryptedConfig(String dataId, String group) {
        try {
            String raw = configService.getConfig(dataId, group, 5000);
            boolean needsDecryption = raw != null && encryptionUtil.isEncrypted(raw);
            if (!needsDecryption) {
                return raw;
            }
            return encryptionUtil.decrypt(raw);
        } catch (Exception e) {
            log.error("获取解密配置异常", e);
            return null;
        }
    }
}

3. Go 客户端配置管理

3.1 基础配置操作

// Go 配置管理示例
package main

import (
    "context"
    "fmt"
    "log"
    "strings"
    "sync"
    "time"

    "github.com/nacos-group/nacos-sdk-go/clients"
    "github.com/nacos-group/nacos-sdk-go/clients/config_client"
    "github.com/nacos-group/nacos-sdk-go/common/constant"
    "github.com/nacos-group/nacos-sdk-go/vo"
)

// ConfigManager bundles a Nacos config client with a local content cache
// and the bookkeeping for registered listeners.
type ConfigManager struct {
    // Nacos config client used for every remote call.
    client      config_client.IConfigClient
    // Cancel functions of registered listeners, keyed by "group#dataId".
    listeners   map[string]context.CancelFunc
    // Last known configuration content, keyed by "group#dataId".
    configCache sync.Map
    // Read/write lock; presumably intended to guard listeners — confirm against the methods.
    mutex       sync.RWMutex
}

// NewConfigManager builds a ConfigManager connected to a local Nacos server
// (127.0.0.1:8848, namespace "dev"). It returns an error when the config
// client cannot be created.
func NewConfigManager() (*ConfigManager, error) {
    // Client-side settings: namespace, timeout, directories and credentials.
    clientCfg := constant.ClientConfig{
        NamespaceId:         "dev",
        TimeoutMs:           5000,
        NotLoadCacheAtStart: true,
        LogDir:              "/tmp/nacos/log",
        CacheDir:            "/tmp/nacos/cache",
        LogLevel:            "info",
        Username:            "nacos",
        Password:            "nacos",
    }

    // Server side: a single local instance.
    servers := []constant.ServerConfig{
        {IpAddr: "127.0.0.1", Port: 8848},
    }

    params := map[string]interface{}{
        "serverConfigs": servers,
        "clientConfig":  clientCfg,
    }

    nacosClient, err := clients.CreateConfigClient(params)
    if err != nil {
        return nil, fmt.Errorf("创建配置客户端失败: %v", err)
    }

    manager := &ConfigManager{
        client:    nacosClient,
        listeners: make(map[string]context.CancelFunc),
    }
    return manager, nil
}

// PublishConfig publishes content for group/dataId and, on success, stores
// it in the local cache. It returns an error when the client call fails or
// reports false.
func (cm *ConfigManager) PublishConfig(dataId, group, content string) error {
    param := vo.ConfigParam{
        DataId:  dataId,
        Group:   group,
        Content: content,
    }

    ok, err := cm.client.PublishConfig(param)
    switch {
    case err != nil:
        return fmt.Errorf("发布配置失败: %v", err)
    case !ok:
        return fmt.Errorf("发布配置失败: 返回 false")
    }

    // Keep the local cache in step with what was just published.
    cm.configCache.Store(group+"#"+dataId, content)

    log.Printf("✅ 配置发布成功: %s/%s", group, dataId)
    return nil
}

// GetConfig fetches the content of group/dataId from the server and caches
// it. When the server call fails, the cached copy is returned if there is
// one; otherwise the error is returned.
func (cm *ConfigManager) GetConfig(dataId, group string) (string, error) {
    key := group + "#" + dataId

    content, err := cm.client.GetConfig(vo.ConfigParam{
        DataId: dataId,
        Group:  group,
    })

    if err == nil {
        // Fresh value from the server: remember it for later fallbacks.
        cm.configCache.Store(key, content)
        log.Printf("📖 获取配置成功: %s/%s", group, dataId)
        return content, nil
    }

    // Server call failed: fall back to the last cached value.
    cached, hit := cm.configCache.Load(key)
    if !hit {
        return "", fmt.Errorf("获取配置失败: %v", err)
    }
    log.Printf("📖 从缓存获取配置: %s/%s", group, dataId)
    return cached.(string), nil
}

// DeleteConfig removes group/dataId on the server and drops it from the
// local cache. It returns an error when the client call fails or reports
// false.
func (cm *ConfigManager) DeleteConfig(dataId, group string) error {
    param := vo.ConfigParam{DataId: dataId, Group: group}

    ok, err := cm.client.DeleteConfig(param)
    switch {
    case err != nil:
        return fmt.Errorf("删除配置失败: %v", err)
    case !ok:
        return fmt.Errorf("删除配置失败: 返回 false")
    }

    // The entry no longer exists remotely, so evict the cached copy too.
    cm.configCache.Delete(group + "#" + dataId)

    log.Printf("✅ 配置删除成功: %s/%s", group, dataId)
    return nil
}

// ListenConfig registers a change listener for group/dataId.
//
// On every change notification the local cache is updated first and then
// callback (if non-nil) is invoked. A listener previously registered through
// this manager for the same key has its cancel function called before the
// new one is recorded. It returns an error when the client rejects the
// listener, in which case no bookkeeping entry is left behind.
func (cm *ConfigManager) ListenConfig(dataId, group string, 
    callback func(namespace, group, dataId, data string)) error {
    
    listenerKey := fmt.Sprintf("%s#%s", group, dataId)
    
    // Only the cancel function is kept; the context value itself is not
    // consumed anywhere, so it must not be bound to a name (an unused local
    // is a compile error in Go).
    _, cancel := context.WithCancel(context.Background())
    
    // The listeners map is shared between ListenConfig and CancelListen,
    // so every access goes through the mutex.
    cm.mutex.Lock()
    if previous, exists := cm.listeners[listenerKey]; exists {
        previous()
    }
    cm.listeners[listenerKey] = cancel
    cm.mutex.Unlock()
    
    // The lock is not held across the client call so that a callback that
    // re-enters the manager cannot deadlock.
    err := cm.client.ListenConfig(vo.ConfigParam{
        DataId: dataId,
        Group:  group,
        OnChange: func(namespace, group, dataId, data string) {
            log.Printf("🔔 配置变更通知: %s/%s", group, dataId)
            
            // Refresh the cache before handing the value to the callback.
            cacheKey := fmt.Sprintf("%s#%s", group, dataId)
            cm.configCache.Store(cacheKey, data)
            
            if callback != nil {
                callback(namespace, group, dataId, data)
            }
        },
    })
    
    if err != nil {
        // Release the context and drop the entry recorded above.
        cancel()
        cm.mutex.Lock()
        delete(cm.listeners, listenerKey)
        cm.mutex.Unlock()
        return fmt.Errorf("添加配置监听器失败: %v", err)
    }
    
    log.Printf("👂 配置监听器添加成功: %s/%s", group, dataId)
    return nil
}

// CancelListen removes the listener registered for group/dataId, if any.
// It calls the stored cancel function, then asks the client to stop
// listening. A failure of the client call is logged rather than silently
// dropped. Calling it for a key without a listener is a no-op.
func (cm *ConfigManager) CancelListen(dataId, group string) {
    listenerKey := fmt.Sprintf("%s#%s", group, dataId)
    
    // Take the entry out of the shared map under the lock; the remote call
    // below runs without holding it.
    cm.mutex.Lock()
    cancel, exists := cm.listeners[listenerKey]
    if exists {
        delete(cm.listeners, listenerKey)
    }
    cm.mutex.Unlock()
    
    if !exists {
        return
    }
    cancel()
    
    err := cm.client.CancelListenConfig(vo.ConfigParam{
        DataId: dataId,
        Group:  group,
    })
    if err != nil {
        log.Printf("⚠️ 取消配置监听失败: %s/%s: %v", group, dataId, err)
        return
    }
    
    log.Printf("🔇 取消配置监听: %s/%s", group, dataId)
}

// 配置模板
// ConfigTemplate describes a reusable configuration skeleton.
type ConfigTemplate struct {
    // Human-readable template name.
    Name        string
    // Short description of what the template is for.
    Description string
    // Template body that gets published as configuration content.
    Content     string
    // Content format label, e.g. "properties".
    Type        string
}

// configTemplates holds the built-in templates, keyed by template name.
//
// Placeholders use the form {{name}} or {{name:default}}. The default is
// used when the caller supplies no value for name, so rendering a template
// without variables yields a complete configuration.
var configTemplates = map[string]ConfigTemplate{
    "database-mysql": {
        Name:        "MySQL 数据库配置",
        Description: "MySQL 数据库连接配置模板",
        Type:        "properties",
        Content: `# MySQL 数据库配置
db.host={{host:localhost}}
db.port={{port:3306}}
db.name={{database:mydb}}
db.username={{username:root}}
db.password={{password:password}}

# 连接池配置
db.pool.maxActive=20
db.pool.maxIdle=10
db.pool.minIdle=5
db.pool.maxWait=60000`,
    },
    "redis-cache": {
        Name:        "Redis 缓存配置",
        Description: "Redis 缓存服务配置模板",
        Type:        "properties",
        Content: `# Redis 配置
redis.host=localhost
redis.port=6379
redis.password=
redis.database=0
redis.timeout=5000

# 连接池配置
redis.pool.maxActive=100
redis.pool.maxIdle=20
redis.pool.minIdle=5
redis.pool.maxWait=3000`,
    },
    "logging-config": {
        Name:        "日志配置",
        Description: "应用日志配置模板",
        Type:        "properties",
        Content: `# 日志配置
log.level=INFO
log.file.path=./logs/app.log
log.file.maxSize=100MB
log.file.maxHistory=30

# 控制台输出
log.console.enabled=true
log.console.pattern=%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n`,
    },
}

// renderTemplate substitutes {{name}} / {{name:default}} placeholders in
// content with values from variables.
//
// The text is scanned once from left to right, so substituted values are
// never re-scanned and the result does not depend on map iteration order.
// A placeholder whose full text is a key of variables is replaced by that
// value; otherwise the part before the first ':' is the variable name and
// the rest is its default. It returns an error when a placeholder has
// neither a supplied value nor a default. An unterminated "{{" is copied
// through unchanged.
func renderTemplate(content string, variables map[string]string) (string, error) {
    var rendered strings.Builder
    rest := content
    
    for {
        start := strings.Index(rest, "{{")
        if start < 0 {
            break
        }
        end := strings.Index(rest[start:], "}}")
        if end < 0 {
            break
        }
        end += start
        
        token := rest[start+2 : end]
        value, ok := variables[token]
        if !ok {
            name, fallback, hasDefault := token, "", false
            if sep := strings.Index(token, ":"); sep >= 0 {
                name, fallback, hasDefault = token[:sep], token[sep+1:], true
            }
            value, ok = variables[name]
            if !ok {
                if !hasDefault {
                    return "", fmt.Errorf("模板变量未提供: %s", name)
                }
                value = fallback
            }
        }
        
        rendered.WriteString(rest[:start])
        rendered.WriteString(value)
        rest = rest[end+2:]
    }
    
    rendered.WriteString(rest)
    return rendered.String(), nil
}

// CreateConfigFromTemplate renders the named template with variables and
// publishes the result as group/dataId. It returns an error when the
// template does not exist, a placeholder cannot be resolved, or publishing
// fails. Entries of variables that match no placeholder are ignored.
func (cm *ConfigManager) CreateConfigFromTemplate(templateName, dataId, group string, 
    variables map[string]string) error {
    
    template, exists := configTemplates[templateName]
    if !exists {
        return fmt.Errorf("模板不存在: %s", templateName)
    }
    
    content, err := renderTemplate(template.Content, variables)
    if err != nil {
        return err
    }
    
    return cm.PublishConfig(dataId, group, content)
}

// ListTemplates returns the built-in templates keyed by template name.
// The result is a copy, so callers may modify it without affecting the
// package-level template registry.
func (cm *ConfigManager) ListTemplates() map[string]ConfigTemplate {
    templates := make(map[string]ConfigTemplate, len(configTemplates))
    for name, tpl := range configTemplates {
        templates[name] = tpl
    }
    return templates
}

// 配置热更新处理器
// ConfigHotReloader dispatches configuration change notifications to
// handlers registered per configuration key.
type ConfigHotReloader struct {
    // Manager used to subscribe to configuration changes.
    configManager *ConfigManager
    // Handlers keyed by configuration key ("group#dataId").
    handlers      map[string]func(string)
    // Guards handlers.
    mutex         sync.RWMutex
}

// NewConfigHotReloader returns a reloader bound to cm with an empty handler
// table.
func NewConfigHotReloader(cm *ConfigManager) *ConfigHotReloader {
    reloader := new(ConfigHotReloader)
    reloader.configManager = cm
    reloader.handlers = map[string]func(string){}
    return reloader
}

// RegisterHandler stores handler under configKey, replacing any handler
// already registered for that key, and logs the registration. The handler
// table is modified while holding the write lock.
func (chr *ConfigHotReloader) RegisterHandler(configKey string, handler func(string)) {
    chr.mutex.Lock()
    defer chr.mutex.Unlock()
    
    chr.handlers[configKey] = handler
    log.Printf("🔧 注册配置处理器: %s", configKey)
}

// StartWatching subscribes to changes of group/dataId. On each notification
// the handler registered under "group#dataId" (if any) is called with the
// new content. The handler is looked up at notification time, so it may be
// registered after watching has started.
func (chr *ConfigHotReloader) StartWatching(dataId, group string) error {
    configKey := group + "#" + dataId

    onChange := func(_, _, _, data string) {
        // Hold the read lock only for the lookup, not while the handler runs.
        chr.mutex.RLock()
        handler, registered := chr.handlers[configKey]
        chr.mutex.RUnlock()

        if !registered {
            return
        }
        log.Printf("🔄 执行配置热更新: %s", configKey)
        handler(data)
    }

    return chr.configManager.ListenConfig(dataId, group, onChange)
}

// main walks through the example: publish a configuration, read it back,
// register a hot-reload handler, start watching, create a configuration
// from a template, wait for notifications and finally stop listening.
func main() {
    configManager, err := NewConfigManager()
    if err != nil {
        log.Fatalf("创建配置管理器失败: %v", err)
    }
    hotReloader := NewConfigHotReloader(configManager)

    fmt.Println("=== Go 配置管理示例 ===")

    const (
        appDataId = "application.properties"
        appGroup  = "DEFAULT_GROUP"
    )

    // 1. Publish the application configuration.
    appConfig := `# 应用配置
app.name=demo-service
app.version=1.0.0
app.port=8080

# 数据库配置
db.host=localhost
db.port=3306
db.name=demo
db.username=root
db.password=123456`

    if err := configManager.PublishConfig(appDataId, appGroup, appConfig); err != nil {
        log.Printf("发布配置失败: %v", err)
    }

    // 2. Read it back.
    if content, err := configManager.GetConfig(appDataId, appGroup); err != nil {
        log.Printf("获取配置失败: %v", err)
    } else {
        fmt.Printf("配置内容:\n%s\n", content)
    }

    // 3. Register the change handler. Real hot-reload logic (reconnecting
    //    to the database, changing the service port, ...) would go here.
    onAppConfigChange := func(newContent string) {
        fmt.Println("🔔 配置发生变更:")
        fmt.Printf("新配置内容:\n%s\n", newContent)
    }
    hotReloader.RegisterHandler(appGroup+"#"+appDataId, onAppConfigChange)

    // 4. Start watching for changes.
    if err := hotReloader.StartWatching(appDataId, appGroup); err != nil {
        log.Printf("开始监听配置失败: %v", err)
    }

    // 5. Create a configuration from a template.
    fmt.Println("\n=== 模板配置示例 ===")

    var templateNames []string
    for name := range configManager.ListTemplates() {
        templateNames = append(templateNames, name)
    }
    fmt.Printf("可用模板: %v\n", templateNames)

    mysqlVariables := map[string]string{
        "host":     "mysql.example.com",
        "port":     "3306",
        "database": "production",
        "username": "admin",
        "password": "admin123",
    }

    err = configManager.CreateConfigFromTemplate("database-mysql",
        "mysql.properties", appGroup, mysqlVariables)
    if err != nil {
        log.Printf("从模板创建配置失败: %v", err)
    }

    // Give change notifications some time to arrive.
    fmt.Println("\n等待配置变更通知...")
    time.Sleep(30 * time.Second)

    // 6. Clean up.
    configManager.CancelListen(appDataId, appGroup)
    fmt.Println("\n程序结束")
}

4. 多环境配置管理

4.1 环境隔离策略

# 多环境配置管理
class EnvironmentManager:
    """Deploy, synchronize and compare configurations across environments.

    Each environment name ("dev", "test", "staging", "prod") is used as the
    namespace id of the configurations deployed to it.
    """

    def __init__(self, config_manager: "ConfigManager"):
        self.config_manager = config_manager
        # Supported environments: namespace id -> display name.
        self.environments = {
            "dev": "开发环境",
            "test": "测试环境", 
            "staging": "预发布环境",
            "prod": "生产环境"
        }
        # Successfully deployed configs, keyed by "env#group#data_id".
        self.env_configs = {}

    def deploy_config_to_env(self, env: str, config_info: "ConfigInfo") -> bool:
        """Publish an environment-specific copy of ``config_info``.

        The copy gets ``env`` as namespace id and content adjusted by
        ``_adjust_config_for_env``; ``config_info`` itself is not modified.

        Args:
            env: Target environment name.
            config_info: Configuration to deploy.

        Returns:
            The result of publishing, or ``False`` for an unsupported
            environment.
        """
        if env not in self.environments:
            print(f"❌ 不支持的环境: {env}")
            return False

        env_config = copy.deepcopy(config_info)
        env_config.namespace_id = env
        env_config.content = self._adjust_config_for_env(config_info.content, env)
        # The copy still carries the MD5 of the source content; recompute it
        # so the checksum matches the adjusted content being published.
        env_config.md5 = env_config.calculate_md5()

        success = self.config_manager.publish_config(env_config)

        if success:
            env_key = f"{env}#{config_info.group}#{config_info.data_id}"
            self.env_configs[env_key] = env_config
            print(f"✅ 配置部署到 {self.environments[env]} 成功")

        return success

    def _adjust_config_for_env(self, content: str, env: str) -> str:
        """Return ``content`` with the environment's substitutions applied.

        Substitutions are plain substring replacements applied one after
        another. An unknown ``env`` leaves the content unchanged.
        """
        # Per-environment replacement rules: old substring -> new substring.
        env_replacements = {
            "dev": {
                "localhost": "dev-db.example.com",
                "3306": "3306",
                "mydb": "dev_mydb",
                "INFO": "DEBUG"
            },
            "test": {
                "localhost": "test-db.example.com",
                "3306": "3306", 
                "mydb": "test_mydb",
                "INFO": "INFO"
            },
            "staging": {
                "localhost": "staging-db.example.com",
                "3306": "3306",
                "mydb": "staging_mydb",
                "INFO": "WARN"
            },
            "prod": {
                "localhost": "prod-db.example.com",
                "3306": "3306",
                "mydb": "prod_mydb", 
                "INFO": "ERROR"
            }
        }

        adjusted_content = content
        for old_value, new_value in env_replacements.get(env, {}).items():
            adjusted_content = adjusted_content.replace(old_value, new_value)

        return adjusted_content

    def sync_config_across_envs(self, source_env: str, target_envs: List[str], 
                               data_id: str, group: str) -> Dict[str, bool]:
        """Deploy the source environment's configuration to other environments.

        Args:
            source_env: Environment to read the configuration from.
            target_envs: Environments to deploy to; ``source_env`` is skipped.
            data_id: Configuration id.
            group: Configuration group.

        Returns:
            Deployment result per target environment. Empty when the source
            configuration does not exist.
        """
        results = {}

        source_config = self.config_manager.get_config(data_id, group, source_env)
        if not source_config:
            print(f"❌ 源环境配置不存在: {source_env}/{group}/{data_id}")
            return results

        for target_env in target_envs:
            if target_env == source_env:
                continue

            success = self.deploy_config_to_env(target_env, source_config)
            results[target_env] = success

            if success:
                print(f"✅ 配置同步成功: {source_env} -> {target_env}")
            else:
                print(f"❌ 配置同步失败: {source_env} -> {target_env}")

        return results

    def compare_env_configs(self, env1: str, env2: str, 
                           data_id: str, group: str) -> Dict[str, Any]:
        """Compare one configuration between two environments line by line.

        Lines are compared by position; a missing line counts as an empty
        string.

        Returns:
            ``{"error": ...}`` when either configuration is missing,
            otherwise a dict with ``env1``, ``env2``, ``differences`` (one
            entry per differing line, holding the 1-based line number and
            both lines keyed by environment name) and ``total_differences``.
        """
        config1 = self.config_manager.get_config(data_id, group, env1)
        config2 = self.config_manager.get_config(data_id, group, env2)

        if not config1 or not config2:
            return {"error": "配置不存在"}

        lines1 = config1.content.split('\n')
        lines2 = config2.content.split('\n')

        differences = []
        for i in range(max(len(lines1), len(lines2))):
            line1 = lines1[i] if i < len(lines1) else ""
            line2 = lines2[i] if i < len(lines2) else ""

            if line1 != line2:
                differences.append({
                    "line": i + 1,
                    env1: line1,
                    env2: line2
                })

        return {
            "env1": env1,
            "env2": env2,
            "differences": differences,
            "total_differences": len(differences)
        }

    def get_env_summary(self) -> Dict[str, Any]:
        """Summarize the configurations of every supported environment.

        Returns:
            Per environment: display name, number of configurations, count
            per configuration type and the list of groups in use.
        """
        summary = {}

        for env in self.environments:
            configs = self.config_manager.list_configs(env)
            summary[env] = {
                "name": self.environments[env],
                "config_count": len(configs),
                "config_types": {},
                "groups": set()
            }

            for config in configs:
                # Count configurations per type.
                config_type = config.config_type.value
                summary[env]["config_types"][config_type] = \
                    summary[env]["config_types"].get(config_type, 0) + 1

                # Collect distinct groups.
                summary[env]["groups"].add(config.group)

            # Expose the groups as a list rather than a set.
            summary[env]["groups"] = list(summary[env]["groups"])

        return summary

# 使用示例
if __name__ == "__main__":
    config_manager = ConfigManager()
    env_manager = EnvironmentManager(config_manager)

    print("=== 多环境配置管理示例 ===")

    # Base configuration shared by all environments.
    base_content = """# 应用配置
app.name=demo-service
app.port=8080

# 数据库配置
db.host=localhost
db.port=3306
db.name=mydb
db.username=root
db.password=123456

# 日志配置
log.level=INFO
log.file=/var/log/app.log"""
    base_config = ConfigInfo(
        data_id="application.properties",
        group="DEFAULT_GROUP",
        content=base_content,
        config_type=ConfigType.PROPERTIES,
    )

    # Deploy the base configuration to every environment.
    for target_env in ("dev", "test", "staging", "prod"):
        env_manager.deploy_config_to_env(target_env, base_config)

    # Compare dev against prod.
    print("\n=== 环境配置差异比较 ===")
    diff_result = env_manager.compare_env_configs(
        "dev", "prod", "application.properties", "DEFAULT_GROUP"
    )

    if "differences" in diff_result:
        print(f"发现 {diff_result['total_differences']} 处差异:")
        first_five = diff_result["differences"][:5]  # show at most five
        for diff in first_five:
            print(f"  行 {diff['line']}: {diff['dev']} -> {diff['prod']}")

    # Print the per-environment summary.
    print("\n=== 环境配置摘要 ===")
    for info in env_manager.get_env_summary().values():
        print(f"{info['name']}: {info['config_count']} 个配置")
        print(f"  配置类型: {info['config_types']}")
        print(f"  配置分组: {info['groups']}")

5. 配置版本控制与回滚

5.1 版本管理策略

# 配置版本控制管理器
class ConfigVersionManager:
    """Keep named version tags and release notes for configurations."""

    def __init__(self, config_manager: ConfigManager):
        self.config_manager = config_manager
        self.version_tags = {}   # config key -> {tag name -> snapshot}
        self.release_notes = {}  # config key -> list of release notes

    def create_version_tag(self, data_id: str, group: str, namespace_id: str,
                          tag: str, description: str = "") -> bool:
        """Snapshot the current configuration under ``tag``.

        An existing tag with the same name is overwritten.

        Returns:
            ``True`` when the tag was recorded, ``False`` when the
            configuration does not exist.
        """
        config = self.config_manager.get_config(data_id, group, namespace_id)
        if not config:
            print(f"❌ 配置不存在: {namespace_id}/{group}/{data_id}")
            return False

        config_key = self.config_manager._get_config_key(data_id, group, namespace_id)
        snapshot = {
            "version": config.version,
            "md5": config.md5,
            "content": config.content,
            "create_time": time.time(),
            "description": description
        }
        self.version_tags.setdefault(config_key, {})[tag] = snapshot

        print(f"✅ 版本标签创建成功: {tag} (版本: {config.version})")
        return True

    def rollback_to_tag(self, data_id: str, group: str, namespace_id: str,
                       tag: str, user: str = "system") -> bool:
        """Restore the content snapshotted under ``tag`` and publish it.

        Returns:
            The publish result, or ``False`` when the tag or the current
            configuration is missing.
        """
        config_key = self.config_manager._get_config_key(data_id, group, namespace_id)

        tags_for_config = self.version_tags.get(config_key)
        if tags_for_config is None:
            print(f"❌ 没有版本标签记录: {config_key}")
            return False

        if tag not in tags_for_config:
            print(f"❌ 版本标签不存在: {tag}")
            return False

        tag_info = tags_for_config[tag]

        current_config = self.config_manager.get_config(data_id, group, namespace_id)
        if not current_config:
            print(f"❌ 当前配置不存在: {config_key}")
            return False

        # Put the tagged content back and refresh the bookkeeping fields.
        current_config.content = tag_info["content"]
        current_config.update_time = time.time()
        current_config.update_user = user
        current_config.version += 1
        current_config.md5 = current_config.calculate_md5()

        success = self.config_manager.publish_config(current_config)
        if not success:
            return success

        print(f"✅ 回滚到标签版本成功: {tag}")
        # Record the rollback in the release notes.
        self.add_release_note(data_id, group, namespace_id, 
            f"回滚到标签版本: {tag}", user)
        return success

    def list_version_tags(self, data_id: str, group: str, 
                         namespace_id: str) -> List[Dict[str, Any]]:
        """Return the tags of a configuration, newest first (content omitted)."""
        config_key = self.config_manager._get_config_key(data_id, group, namespace_id)

        if config_key not in self.version_tags:
            return []

        tags = [
            {
                "tag": tag,
                "version": info["version"],
                "md5": info["md5"],
                "create_time": info["create_time"],
                "description": info["description"]
            }
            for tag, info in self.version_tags[config_key].items()
        ]
        return sorted(tags, key=lambda entry: entry["create_time"], reverse=True)

    def add_release_note(self, data_id: str, group: str, namespace_id: str,
                        note: str, user: str = "system"):
        """Append a release note, keeping only the 50 most recent."""
        config_key = self.config_manager._get_config_key(data_id, group, namespace_id)

        entry = {
            "note": note,
            "user": user,
            "time": time.time()
        }
        self.release_notes.setdefault(config_key, []).append(entry)

        # Cap the history at the 50 most recent entries.
        history = self.release_notes[config_key]
        if len(history) > 50:
            self.release_notes[config_key] = history[-50:]

    def get_release_notes(self, data_id: str, group: str, 
                         namespace_id: str) -> List[Dict[str, Any]]:
        """Return the release notes of a configuration (empty list if none)."""
        config_key = self.config_manager._get_config_key(data_id, group, namespace_id)
        return self.release_notes.get(config_key, [])

    def create_release(self, data_id: str, group: str, namespace_id: str,
                      version_name: str, release_note: str, 
                      user: str = "system") -> bool:
        """Tag the current configuration as a release and record a note.

        Returns:
            ``True`` when the tag was created, ``False`` otherwise.
        """
        success = self.create_version_tag(data_id, group, namespace_id, 
                                        version_name, f"发布版本: {version_name}")
        if not success:
            return success

        self.add_release_note(data_id, group, namespace_id, 
                            f"发布版本 {version_name}: {release_note}", user)
        print(f"✅ 发布版本创建成功: {version_name}")
        return success

# 使用示例
if __name__ == "__main__":
    config_manager = ConfigManager()
    version_manager = ConfigVersionManager(config_manager)

    print("=== 配置版本控制示例 ===")

    # Coordinates of the configuration used throughout the example.
    config_coords = ("app.properties", "DEFAULT_GROUP", "prod")

    # Publish the initial configuration.
    config = ConfigInfo(
        data_id="app.properties",
        group="DEFAULT_GROUP",
        namespace_id="prod",
        content="app.version=1.0.0\napp.debug=false",
        config_type=ConfigType.PROPERTIES,
    )
    config_manager.publish_config(config)

    # Tag it as v1.0.0.
    version_manager.create_version_tag(*config_coords, "v1.0.0", "初始发布版本")

    # Update the configuration and publish again.
    time.sleep(1)
    config.update_content("app.version=1.1.0\napp.debug=false\napp.feature.new=true")
    config_manager.publish_config(config)

    # Create the v1.1.0 release.
    version_manager.create_release(*config_coords, "v1.1.0", "添加新功能特性")

    # List the version tags.
    tags = version_manager.list_version_tags(*config_coords)
    print(f"\n版本标签列表: {len(tags)} 个")
    for tag in tags:
        print(f"  {tag['tag']}: 版本 {tag['version']} ({tag['description']})")

    # Roll back to v1.0.0.
    print("\n回滚到 v1.0.0...")
    version_manager.rollback_to_tag(*config_coords, "v1.0.0")

    # Show the three most recent release notes.
    notes = version_manager.get_release_notes(*config_coords)
    print(f"\n发布说明: {len(notes)} 条")
    recent_notes = notes[-3:]
    for note in recent_notes:
        print(f"  {note['user']}: {note['note']}")

6. 核心要点

6.1 配置管理最佳实践

  1. 配置分层管理

    • 按环境分离配置(dev/test/staging/prod)
    • 按功能模块组织配置
    • 使用命名空间隔离不同应用
  2. 配置安全

    • 敏感信息加密存储
    • 配置访问权限控制
    • 配置变更审计日志
  3. 配置版本控制

    • 配置变更历史记录
    • 版本标签管理
    • 快速回滚机制
  4. 配置监听与热更新

    • 实时配置变更通知
    • 应用配置热更新
    • 配置变更影响评估

6.2 性能优化建议

  1. 客户端缓存

    • 本地配置缓存
    • 缓存失效策略
    • 离线配置支持
  2. 网络优化

    • 长连接复用
    • 配置压缩传输
    • 批量配置获取
  3. 监听器优化

    • 合理的监听器数量
    • 异步配置处理
    • 监听器异常处理

7. 下一步学习

在下一章节中,我们将学习:

- Nacos 集群部署与高可用
- 配置中心的性能调优
- 与 Spring Cloud 的深度集成
- 配置中心的监控与运维