7.1 训练策略概述

7.1.1 训练流程设计

Text2SQL模型训练是一个系统性工程,需要统筹模型初始化、优化器与学习率调度、损失函数设计、训练循环以及检查点管理等多个环节。下面的训练器类将这些基础组件封装在一起:

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from transformers import AutoTokenizer, AutoModel
from typing import Dict, List, Tuple, Optional
import wandb
import logging
from tqdm import tqdm

class Text2SQLTrainer:
    """Text2SQL模型训练器"""
    
    def __init__(self, config: Dict):
        self.config = config
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        
        # 初始化日志
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)
        
        # 初始化wandb(可选)
        if config.get('use_wandb', False):
            wandb.init(
                project=config.get('project_name', 'text2sql'),
                config=config
            )
        
        # 训练状态
        self.global_step = 0
        self.best_score = 0.0
        self.patience_counter = 0
    
    def setup_model(self, model_class, model_config: Dict):
        """设置模型"""
        self.model = model_class(**model_config)
        self.model.to(self.device)
        
        # 模型参数统计
        total_params = sum(p.numel() for p in self.model.parameters())
        trainable_params = sum(p.numel() for p in self.model.parameters() if p.requires_grad)
        
        self.logger.info(f"总参数量: {total_params:,}")
        self.logger.info(f"可训练参数量: {trainable_params:,}")
        
        return self.model
    
    def setup_optimizer(self, optimizer_config: Dict):
        """设置优化器"""
        optimizer_type = optimizer_config.get('type', 'AdamW')
        learning_rate = optimizer_config.get('learning_rate', 5e-5)
        weight_decay = optimizer_config.get('weight_decay', 0.01)
        
        if optimizer_type == 'AdamW':
            self.optimizer = optim.AdamW(
                self.model.parameters(),
                lr=learning_rate,
                weight_decay=weight_decay,
                betas=optimizer_config.get('betas', (0.9, 0.999)),
                eps=optimizer_config.get('eps', 1e-8)
            )
        elif optimizer_type == 'Adam':
            self.optimizer = optim.Adam(
                self.model.parameters(),
                lr=learning_rate,
                weight_decay=weight_decay
            )
        else:
            raise ValueError(f"不支持的优化器类型: {optimizer_type}")
        
        return self.optimizer
    
    def setup_scheduler(self, scheduler_config: Dict, num_training_steps: int):
        """设置学习率调度器"""
        scheduler_type = scheduler_config.get('type', 'linear')
        
        if scheduler_type == 'linear':
            from transformers import get_linear_schedule_with_warmup
            num_warmup_steps = int(num_training_steps * scheduler_config.get('warmup_ratio', 0.1))
            self.scheduler = get_linear_schedule_with_warmup(
                self.optimizer,
                num_warmup_steps=num_warmup_steps,
                num_training_steps=num_training_steps
            )
        elif scheduler_type == 'cosine':
            from transformers import get_cosine_schedule_with_warmup
            num_warmup_steps = int(num_training_steps * scheduler_config.get('warmup_ratio', 0.1))
            self.scheduler = get_cosine_schedule_with_warmup(
                self.optimizer,
                num_warmup_steps=num_warmup_steps,
                num_training_steps=num_training_steps
            )
        elif scheduler_type == 'step':
            self.scheduler = optim.lr_scheduler.StepLR(
                self.optimizer,
                step_size=scheduler_config.get('step_size', 1000),
                gamma=scheduler_config.get('gamma', 0.9)
            )
        else:
            self.scheduler = None
        
        return self.scheduler
    
    def compute_loss(self, outputs: Dict, labels: torch.Tensor) -> torch.Tensor:
        """计算损失"""
        loss_type = self.config.get('loss_type', 'cross_entropy')
        
        if loss_type == 'cross_entropy':
            logits = outputs['logits']
            loss_fct = nn.CrossEntropyLoss(ignore_index=-100)
            
            # 错位对齐:用第t个位置的logits预测第t+1个token,随后展平送入损失函数
            shift_logits = logits[..., :-1, :].contiguous()
            shift_labels = labels[..., 1:].contiguous()
            
            loss = loss_fct(
                shift_logits.view(-1, shift_logits.size(-1)),
                shift_labels.view(-1)
            )
        
        elif loss_type == 'focal':
            # Focal Loss实现
            logits = outputs['logits']
            alpha = self.config.get('focal_alpha', 1.0)
            gamma = self.config.get('focal_gamma', 2.0)
            
            ce_loss = nn.CrossEntropyLoss(ignore_index=-100, reduction='none')
            shift_logits = logits[..., :-1, :].contiguous()
            shift_labels = labels[..., 1:].contiguous()
            
            ce = ce_loss(
                shift_logits.view(-1, shift_logits.size(-1)),
                shift_labels.view(-1)
            )
            
            pt = torch.exp(-ce)
            focal_loss = alpha * (1 - pt) ** gamma * ce
            loss = focal_loss.mean()
        
        else:
            raise ValueError(f"不支持的损失函数类型: {loss_type}")
        
        return loss
    
    def train_epoch(self, train_dataloader: DataLoader, epoch: int) -> Dict[str, float]:
        """训练一个epoch"""
        self.model.train()
        total_loss = 0.0
        num_batches = len(train_dataloader)
        
        progress_bar = tqdm(
            train_dataloader, 
            desc=f"Epoch {epoch}",
            leave=False
        )
        
        for batch_idx, batch in enumerate(progress_bar):
            # 将数据移到设备
            batch = {k: v.to(self.device) for k, v in batch.items()}
            
            # 前向传播
            outputs = self.model(
                input_ids=batch['input_ids'],
                attention_mask=batch['attention_mask']
            )
            
            # 计算损失
            loss = self.compute_loss(outputs, batch['labels'])
            
            # 反向传播
            loss.backward()
            
            # 梯度裁剪
            if self.config.get('max_grad_norm', 0) > 0:
                torch.nn.utils.clip_grad_norm_(
                    self.model.parameters(),
                    self.config['max_grad_norm']
                )
            
            # 优化器步骤
            self.optimizer.step()
            if self.scheduler:
                self.scheduler.step()
            self.optimizer.zero_grad()
            
            # 更新统计信息
            total_loss += loss.item()
            self.global_step += 1
            
            # 更新进度条
            progress_bar.set_postfix({
                'loss': f'{loss.item():.4f}',
                'avg_loss': f'{total_loss / (batch_idx + 1):.4f}',
                'lr': f'{self.optimizer.param_groups[0]["lr"]:.2e}'
            })
            
            # 记录到wandb
            if self.config.get('use_wandb', False) and self.global_step % 100 == 0:
                wandb.log({
                    'train/loss': loss.item(),
                    'train/learning_rate': self.optimizer.param_groups[0]['lr'],
                    'train/global_step': self.global_step
                })
        
        avg_loss = total_loss / num_batches
        return {'avg_loss': avg_loss}
    
    def validate(self, val_dataloader: DataLoader) -> Dict[str, float]:
        """验证模型"""
        self.model.eval()
        total_loss = 0.0
        num_batches = len(val_dataloader)
        
        with torch.no_grad():
            for batch in tqdm(val_dataloader, desc="Validation", leave=False):
                batch = {k: v.to(self.device) for k, v in batch.items()}
                
                outputs = self.model(
                    input_ids=batch['input_ids'],
                    attention_mask=batch['attention_mask']
                )
                
                loss = self.compute_loss(outputs, batch['labels'])
                total_loss += loss.item()
        
        avg_loss = total_loss / num_batches
        return {'avg_loss': avg_loss}
    
    def save_checkpoint(self, epoch: int, metrics: Dict[str, float], 
                       checkpoint_dir: str, is_best: bool = False):
        """保存检查点"""
        import os
        
        checkpoint = {
            'epoch': epoch,
            'global_step': self.global_step,
            'model_state_dict': self.model.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'metrics': metrics,
            'config': self.config
        }
        
        if self.scheduler:
            checkpoint['scheduler_state_dict'] = self.scheduler.state_dict()
        
        # 保存最新检查点
        latest_path = os.path.join(checkpoint_dir, 'latest_checkpoint.pt')
        torch.save(checkpoint, latest_path)
        
        # 保存最佳检查点
        if is_best:
            best_path = os.path.join(checkpoint_dir, 'best_checkpoint.pt')
            torch.save(checkpoint, best_path)
            self.logger.info(f"保存最佳模型到 {best_path}")
    
    def load_checkpoint(self, checkpoint_path: str):
        """加载检查点"""
        checkpoint = torch.load(checkpoint_path, map_location=self.device)
        
        self.model.load_state_dict(checkpoint['model_state_dict'])
        self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        
        if self.scheduler and 'scheduler_state_dict' in checkpoint:
            self.scheduler.load_state_dict(checkpoint['scheduler_state_dict'])
        
        self.global_step = checkpoint.get('global_step', 0)
        
        self.logger.info(f"从 {checkpoint_path} 加载检查点")
        return checkpoint.get('epoch', 0), checkpoint.get('metrics', {})

# 使用示例
training_config = {
    'learning_rate': 5e-5,
    'batch_size': 16,
    'num_epochs': 10,
    'max_grad_norm': 1.0,
    'loss_type': 'cross_entropy',
    'use_wandb': False,
    'project_name': 'text2sql-training'
}

trainer = Text2SQLTrainer(training_config)
print("训练器初始化完成")

7.1.2 分阶段训练策略

class MultiStageTrainer(Text2SQLTrainer):
    """多阶段训练器"""
    
    def __init__(self, config: Dict):
        super().__init__(config)
        self.training_stages = config.get('training_stages', [])
        self.current_stage = 0
        # 多阶段训练以验证损失作为最佳指标,越小越好,因此覆盖基类的初始值
        self.best_score = float('inf')
    
    def setup_stage(self, stage_config: Dict):
        """设置训练阶段"""
        self.logger.info(f"设置训练阶段: {stage_config['name']}")
        
        # 更新学习率
        if 'learning_rate' in stage_config:
            for param_group in self.optimizer.param_groups:
                param_group['lr'] = stage_config['learning_rate']
        
        # 冻结/解冻参数
        if 'freeze_layers' in stage_config:
            self.freeze_layers(stage_config['freeze_layers'])
        
        if 'unfreeze_layers' in stage_config:
            self.unfreeze_layers(stage_config['unfreeze_layers'])
        
        # 更新损失函数
        if 'loss_config' in stage_config:
            self.config.update(stage_config['loss_config'])
    
    def freeze_layers(self, layer_patterns: List[str]):
        """冻结指定层"""
        for name, param in self.model.named_parameters():
            for pattern in layer_patterns:
                if pattern in name:
                    param.requires_grad = False
                    self.logger.info(f"冻结层: {name}")
    
    def unfreeze_layers(self, layer_patterns: List[str]):
        """解冻指定层"""
        for name, param in self.model.named_parameters():
            for pattern in layer_patterns:
                if pattern in name:
                    param.requires_grad = True
                    self.logger.info(f"解冻层: {name}")
    
    def train_multistage(self, train_dataloader: DataLoader, 
                        val_dataloader: DataLoader, checkpoint_dir: str):
        """多阶段训练"""
        for stage_idx, stage_config in enumerate(self.training_stages):
            self.current_stage = stage_idx
            self.logger.info(f"开始阶段 {stage_idx + 1}: {stage_config['name']}")
            
            # 设置当前阶段
            self.setup_stage(stage_config)
            
            # 训练当前阶段
            num_epochs = stage_config.get('num_epochs', 5)
            for epoch in range(num_epochs):
                # 训练
                train_metrics = self.train_epoch(train_dataloader, epoch)
                
                # 验证
                val_metrics = self.validate(val_dataloader)
                
                # 记录日志
                self.logger.info(
                    f"阶段 {stage_idx + 1}, Epoch {epoch + 1}: "
                    f"Train Loss: {train_metrics['avg_loss']:.4f}, "
                    f"Val Loss: {val_metrics['avg_loss']:.4f}"
                )
                
                # 保存检查点
                is_best = val_metrics['avg_loss'] < self.best_score
                if is_best:
                    self.best_score = val_metrics['avg_loss']
                
                self.save_checkpoint(
                    epoch, val_metrics, checkpoint_dir, is_best
                )

# 多阶段训练配置示例
multistage_config = {
    'learning_rate': 5e-5,
    'batch_size': 16,
    'training_stages': [
        {
            'name': '预训练阶段',
            'num_epochs': 3,
            'learning_rate': 1e-4,
            'freeze_layers': ['encoder.layer.0', 'encoder.layer.1'],
            'loss_config': {'loss_type': 'cross_entropy'}
        },
        {
            'name': '微调阶段',
            'num_epochs': 5,
            'learning_rate': 5e-5,
            'unfreeze_layers': ['encoder.layer.0', 'encoder.layer.1'],
            'loss_config': {'loss_type': 'focal', 'focal_gamma': 2.0}
        },
        {
            'name': '精调阶段',
            'num_epochs': 2,
            'learning_rate': 1e-5,
            'loss_config': {'loss_type': 'cross_entropy'}
        }
    ]
}

multistage_trainer = MultiStageTrainer(multistage_config)
print("多阶段训练器初始化完成")

7.2 优化技术

7.2.1 梯度优化策略

class AdvancedOptimizer:
    """高级优化器"""
    
    def __init__(self, model: nn.Module, config: Dict):
        self.model = model
        self.config = config
        self.setup_optimizer()
    
    def setup_optimizer(self):
        """设置优化器"""
        # 分层学习率
        if self.config.get('use_layerwise_lr', False):
            self.optimizer = self.setup_layerwise_optimizer()
        else:
            self.optimizer = self.setup_standard_optimizer()
    
    def setup_layerwise_optimizer(self):
        """设置分层学习率优化器"""
        base_lr = self.config.get('learning_rate', 5e-5)
        lr_decay = self.config.get('layerwise_lr_decay', 0.9)
        
        parameters = []
        
        # 为不同层设置不同学习率
        for name, param in self.model.named_parameters():
            if not param.requires_grad:
                continue
            
            # 计算层深度
            layer_depth = self.get_layer_depth(name)
            layer_lr = base_lr * (lr_decay ** layer_depth)
            
            parameters.append({
                'params': param,
                'lr': layer_lr,
                'name': name
            })
        
        return optim.AdamW(
            parameters,
            weight_decay=self.config.get('weight_decay', 0.01)
        )
    
    def get_layer_depth(self, param_name: str) -> int:
        """获取参数层深度"""
        import re
        
        if 'embeddings' in param_name:
            return 0
        
        # 提取编码器层号,例如 encoder.layer.3 -> 深度4
        match = re.search(r'encoder\.layer\.(\d+)', param_name)
        if match:
            return int(match.group(1)) + 1
        
        if 'decoder' in param_name:
            return 12  # 假设编码器有12层
        
        return 6  # 其余参数默认按中间层处理
    
    def setup_standard_optimizer(self):
        """设置标准优化器"""
        optimizer_type = self.config.get('optimizer_type', 'AdamW')
        
        if optimizer_type == 'AdamW':
            return optim.AdamW(
                self.model.parameters(),
                lr=self.config.get('learning_rate', 5e-5),
                weight_decay=self.config.get('weight_decay', 0.01),
                betas=self.config.get('betas', (0.9, 0.999))
            )
        elif optimizer_type == 'RAdam':
            # 需要安装: pip install torch-optimizer
            import torch_optimizer as optim_extra
            return optim_extra.RAdam(
                self.model.parameters(),
                lr=self.config.get('learning_rate', 5e-5),
                weight_decay=self.config.get('weight_decay', 0.01)
            )
        elif optimizer_type == 'Lookahead':
            # Lookahead优化器
            import torch_optimizer as optim_extra
            base_optimizer = optim.AdamW(
                self.model.parameters(),
                lr=self.config.get('learning_rate', 5e-5)
            )
            return optim_extra.Lookahead(base_optimizer, k=5, alpha=0.5)
        else:
            raise ValueError(f"不支持的优化器: {optimizer_type}")

class GradientAccumulator:
    """梯度累积器"""
    
    def __init__(self, accumulation_steps: int = 4):
        self.accumulation_steps = accumulation_steps
        self.current_step = 0
    
    def should_update(self) -> bool:
        """是否应该更新参数"""
        self.current_step += 1
        return self.current_step % self.accumulation_steps == 0
    
    def scale_loss(self, loss: torch.Tensor) -> torch.Tensor:
        """缩放损失"""
        return loss / self.accumulation_steps

class MixedPrecisionTrainer:
    """混合精度训练器"""
    
    def __init__(self, model: nn.Module, optimizer, config: Dict):
        self.model = model
        self.optimizer = optimizer
        self.config = config
        
        # 初始化GradScaler
        self.scaler = torch.cuda.amp.GradScaler(
            enabled=config.get('use_fp16', False)
        )
    
    def training_step(self, batch: Dict, accumulator: GradientAccumulator) -> torch.Tensor:
        """训练步骤"""
        with torch.cuda.amp.autocast(enabled=self.config.get('use_fp16', False)):
            outputs = self.model(
                input_ids=batch['input_ids'],
                attention_mask=batch['attention_mask']
            )
            
            # 计算损失
            loss = self.compute_loss(outputs, batch['labels'])
            
            # 梯度累积
            loss = accumulator.scale_loss(loss)
        
        # 反向传播
        self.scaler.scale(loss).backward()
        
        # 参数更新
        if accumulator.should_update():
            # 梯度裁剪
            if self.config.get('max_grad_norm', 0) > 0:
                self.scaler.unscale_(self.optimizer)
                torch.nn.utils.clip_grad_norm_(
                    self.model.parameters(),
                    self.config['max_grad_norm']
                )
            
            # 优化器步骤
            self.scaler.step(self.optimizer)
            self.scaler.update()
            self.optimizer.zero_grad()
        
        return loss
    
    def compute_loss(self, outputs: Dict, labels: torch.Tensor) -> torch.Tensor:
        """计算损失(简化版)"""
        logits = outputs['logits']
        loss_fct = nn.CrossEntropyLoss(ignore_index=-100)
        
        shift_logits = logits[..., :-1, :].contiguous()
        shift_labels = labels[..., 1:].contiguous()
        
        loss = loss_fct(
            shift_logits.view(-1, shift_logits.size(-1)),
            shift_labels.view(-1)
        )
        
        return loss

# 使用示例
optimizer_config = {
    'optimizer_type': 'AdamW',
    'learning_rate': 5e-5,
    'weight_decay': 0.01,
    'use_layerwise_lr': True,
    'layerwise_lr_decay': 0.9,
    'use_fp16': True,
    'max_grad_norm': 1.0
}

# 假设已有模型
# advanced_optimizer = AdvancedOptimizer(model, optimizer_config)
# accumulator = GradientAccumulator(accumulation_steps=4)
# mp_trainer = MixedPrecisionTrainer(model, advanced_optimizer.optimizer, optimizer_config)

print("高级优化器配置完成")

7.2.2 正则化技术

class RegularizationTechniques:
    """正则化技术集合"""
    
    def __init__(self, config: Dict):
        self.config = config
    
    def apply_dropout(self, model: nn.Module):
        """应用Dropout"""
        dropout_rate = self.config.get('dropout_rate', 0.1)
        
        for module in model.modules():
            if isinstance(module, nn.Dropout):
                module.p = dropout_rate
    
    def apply_weight_decay(self, optimizer):
        """应用权重衰减"""
        weight_decay = self.config.get('weight_decay', 0.01)
        
        for param_group in optimizer.param_groups:
            param_group['weight_decay'] = weight_decay
    
    def label_smoothing_loss(self, logits: torch.Tensor, labels: torch.Tensor, 
                           smoothing: float = 0.1) -> torch.Tensor:
        """标签平滑损失(假设logits形状为[N, vocab_size],labels形状为[N])"""
        vocab_size = logits.size(-1)
        confidence = 1.0 - smoothing
        
        # 填充位置(-100)不能直接用于scatter,先记录掩码并替换为合法索引
        mask = (labels != -100).float()
        safe_labels = labels.clamp(min=0)
        
        # 创建平滑标签
        smooth_labels = torch.zeros_like(logits)
        smooth_labels.fill_(smoothing / (vocab_size - 1))
        smooth_labels.scatter_(1, safe_labels.unsqueeze(1), confidence)
        
        # 平滑标签与对数概率的负内积即为平滑交叉熵
        log_probs = torch.log_softmax(logits, dim=-1)
        loss = -torch.sum(smooth_labels * log_probs, dim=-1)
        
        # 屏蔽填充位置后求平均
        loss = loss * mask
        return loss.sum() / mask.sum().clamp(min=1.0)
    
    def temporal_ensembling_loss(self, current_outputs: torch.Tensor, 
                                ema_outputs: torch.Tensor, 
                                consistency_weight: float = 1.0) -> torch.Tensor:
        """时间集成损失"""
        mse_loss = nn.MSELoss()
        consistency_loss = mse_loss(current_outputs, ema_outputs)
        return consistency_weight * consistency_loss
    
    def adversarial_training_loss(self, model: nn.Module, inputs: Dict, 
                                 epsilon: float = 0.01) -> torch.Tensor:
        """对抗训练损失"""
        # 获取嵌入层
        embeddings = model.get_input_embeddings()
        
        # 计算原始输出
        original_outputs = model(**inputs)
        original_loss = self.compute_loss(original_outputs, inputs['labels'])
        
        # 计算梯度
        embeddings_grad = torch.autograd.grad(
            original_loss, embeddings.weight, 
            retain_graph=True, create_graph=True
        )[0]
        
        # 生成对抗扰动
        noise = epsilon * embeddings_grad.sign()
        
        # 应用扰动
        perturbed_embeddings = embeddings.weight + noise
        
        # 临时替换嵌入
        original_weight = embeddings.weight.data.clone()
        embeddings.weight.data = perturbed_embeddings
        
        # 计算对抗输出
        adv_outputs = model(**inputs)
        adv_loss = self.compute_loss(adv_outputs, inputs['labels'])
        
        # 恢复原始嵌入
        embeddings.weight.data = original_weight
        
        # 返回组合损失
        return original_loss + 0.1 * adv_loss
    
    def compute_loss(self, outputs: Dict, labels: torch.Tensor) -> torch.Tensor:
        """计算基础损失"""
        logits = outputs['logits']
        loss_fct = nn.CrossEntropyLoss(ignore_index=-100)
        
        shift_logits = logits[..., :-1, :].contiguous()
        shift_labels = labels[..., 1:].contiguous()
        
        loss = loss_fct(
            shift_logits.view(-1, shift_logits.size(-1)),
            shift_labels.view(-1)
        )
        
        return loss

class EMAModel:
    """指数移动平均模型"""
    
    def __init__(self, model: nn.Module, decay: float = 0.999):
        self.model = model
        self.decay = decay
        self.shadow = {}
        self.backup = {}
        
        # 初始化影子参数
        for name, param in model.named_parameters():
            if param.requires_grad:
                self.shadow[name] = param.data.clone()
    
    def update(self):
        """更新EMA参数"""
        for name, param in self.model.named_parameters():
            if param.requires_grad and name in self.shadow:
                self.shadow[name] = (
                    self.decay * self.shadow[name] + 
                    (1 - self.decay) * param.data
                )
    
    def apply_shadow(self):
        """应用影子参数"""
        for name, param in self.model.named_parameters():
            if param.requires_grad and name in self.shadow:
                self.backup[name] = param.data.clone()
                param.data = self.shadow[name]
    
    def restore(self):
        """恢复原始参数"""
        for name, param in self.model.named_parameters():
            if param.requires_grad and name in self.backup:
                param.data = self.backup[name]
        self.backup = {}

# 使用示例
regularization_config = {
    'dropout_rate': 0.1,
    'weight_decay': 0.01,
    'label_smoothing': 0.1,
    'use_adversarial': True,
    'adversarial_epsilon': 0.01
}

reg_techniques = RegularizationTechniques(regularization_config)
print("正则化技术初始化完成")

7.3 超参数调优

7.3.1 网格搜索与随机搜索

import itertools
import random
from typing import Dict, List, Any, Callable
import json
import os

class HyperparameterTuner:
    """超参数调优器"""
    
    def __init__(self, config: Dict):
        self.config = config
        self.results = []
    
    def grid_search(self, param_grid: Dict[str, List], 
                   train_func: Callable, eval_func: Callable) -> Dict:
        """网格搜索"""
        # 生成所有参数组合
        param_names = list(param_grid.keys())
        param_values = list(param_grid.values())
        
        best_score = float('-inf')
        best_params = None
        
        for combination in itertools.product(*param_values):
            params = dict(zip(param_names, combination))
            
            print(f"测试参数组合: {params}")
            
            # 训练模型
            model = train_func(params)
            
            # 评估模型
            score = eval_func(model, params)
            
            # 记录结果
            result = {
                'params': params,
                'score': score
            }
            self.results.append(result)
            
            # 更新最佳结果
            if score > best_score:
                best_score = score
                best_params = params
            
            print(f"得分: {score:.4f}")
        
        return {
            'best_params': best_params,
            'best_score': best_score,
            'all_results': self.results
        }
    
    def random_search(self, param_distributions: Dict[str, Callable], 
                     n_trials: int, train_func: Callable, 
                     eval_func: Callable) -> Dict:
        """随机搜索"""
        best_score = float('-inf')
        best_params = None
        
        for trial in range(n_trials):
            # 随机采样参数
            params = {}
            for param_name, distribution in param_distributions.items():
                params[param_name] = distribution()
            
            print(f"试验 {trial + 1}/{n_trials}: {params}")
            
            # 训练和评估
            model = train_func(params)
            score = eval_func(model, params)
            
            # 记录结果
            result = {
                'trial': trial,
                'params': params,
                'score': score
            }
            self.results.append(result)
            
            # 更新最佳结果
            if score > best_score:
                best_score = score
                best_params = params
            
            print(f"得分: {score:.4f}")
        
        return {
            'best_params': best_params,
            'best_score': best_score,
            'all_results': self.results
        }
    
    def save_results(self, filepath: str):
        """保存调优结果"""
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(self.results, f, indent=2, ensure_ascii=False)
    
    def load_results(self, filepath: str):
        """加载调优结果"""
        with open(filepath, 'r', encoding='utf-8') as f:
            self.results = json.load(f)

class BayesianOptimizer:
    """贝叶斯优化器"""
    
    def __init__(self, config: Dict):
        self.config = config
        self.results = []
    
    def optimize(self, objective_func: Callable, param_bounds: Dict, 
                n_trials: int = 50) -> Dict:
        """贝叶斯优化"""
        try:
            from skopt import gp_minimize
            from skopt.space import Real, Integer, Categorical
            from skopt.utils import use_named_args
        except ImportError:
            raise ImportError("请安装scikit-optimize: pip install scikit-optimize")
        
        # 定义搜索空间
        dimensions = []
        param_names = []
        
        for param_name, bounds in param_bounds.items():
            param_names.append(param_name)
            
            if isinstance(bounds, tuple) and len(bounds) == 2:
                if isinstance(bounds[0], int) and isinstance(bounds[1], int):
                    dimensions.append(Integer(bounds[0], bounds[1], name=param_name))
                else:
                    dimensions.append(Real(bounds[0], bounds[1], name=param_name))
            elif isinstance(bounds, list):
                dimensions.append(Categorical(bounds, name=param_name))
            else:
                raise ValueError(f"不支持的参数边界类型: {type(bounds)}")
        
        @use_named_args(dimensions)
        def objective(**params):
            score = objective_func(params)
            self.results.append({
                'params': params,
                'score': score
            })
            return -score  # skopt最小化目标函数
        
        # 执行优化
        result = gp_minimize(
            func=objective,
            dimensions=dimensions,
            n_calls=n_trials,
            random_state=42
        )
        
        # 提取最佳参数
        best_params = dict(zip(param_names, result.x))
        best_score = -result.fun
        
        return {
            'best_params': best_params,
            'best_score': best_score,
            'optimization_result': result,
            'all_results': self.results
        }

# 超参数搜索空间定义
def create_param_distributions():
    """创建参数分布"""
    return {
        'learning_rate': lambda: random.uniform(1e-5, 1e-3),
        'batch_size': lambda: random.choice([8, 16, 32, 64]),
        'dropout_rate': lambda: random.uniform(0.1, 0.5),
        'weight_decay': lambda: random.uniform(0.001, 0.1),
        'warmup_ratio': lambda: random.uniform(0.05, 0.2),
        'max_grad_norm': lambda: random.uniform(0.5, 2.0)
    }

def create_param_grid():
    """创建参数网格"""
    return {
        'learning_rate': [1e-5, 5e-5, 1e-4, 5e-4],
        'batch_size': [16, 32],
        'dropout_rate': [0.1, 0.2, 0.3],
        'weight_decay': [0.01, 0.05, 0.1]
    }

def create_param_bounds():
    """创建参数边界(贝叶斯优化)"""
    return {
        'learning_rate': (1e-5, 1e-3),
        'batch_size': [8, 16, 32, 64],
        'dropout_rate': (0.1, 0.5),
        'weight_decay': (0.001, 0.1),
        'warmup_ratio': (0.05, 0.2)
    }

# 模拟训练和评估函数
def mock_train_function(params: Dict) -> Any:
    """模拟训练函数"""
    print(f"使用参数训练模型: {params}")
    # 这里应该是实际的模型训练代码
    return f"model_with_{params}"

def mock_eval_function(model: Any, params: Dict) -> float:
    """模拟评估函数"""
    # 这里应该是实际的模型评估代码
    # 返回一个模拟的准确率分数
    base_score = 0.7
    lr_bonus = min(params.get('learning_rate', 5e-5) * 10000, 0.1)
    dropout_penalty = params.get('dropout_rate', 0.1) * 0.1
    
    score = base_score + lr_bonus - dropout_penalty + random.uniform(-0.05, 0.05)
    return max(0, min(1, score))

# 使用示例
tuner = HyperparameterTuner({})

# 网格搜索示例
print("开始网格搜索...")
param_grid = create_param_grid()
grid_results = tuner.grid_search(
    param_grid, mock_train_function, mock_eval_function
)
print(f"网格搜索最佳参数: {grid_results['best_params']}")
print(f"网格搜索最佳得分: {grid_results['best_score']:.4f}")

# 随机搜索示例
print("\n开始随机搜索...")
tuner_random = HyperparameterTuner({})
param_distributions = create_param_distributions()
random_results = tuner_random.random_search(
    param_distributions, n_trials=20, 
    train_func=mock_train_function, eval_func=mock_eval_function
)
print(f"随机搜索最佳参数: {random_results['best_params']}")
print(f"随机搜索最佳得分: {random_results['best_score']:.4f}")

7.3.2 早停与学习率调度

class EarlyStopping:
    """早停机制"""
    
    def __init__(self, patience: int = 7, min_delta: float = 0.001, 
                 mode: str = 'min', restore_best_weights: bool = True):
        self.patience = patience
        self.min_delta = min_delta
        self.mode = mode
        self.restore_best_weights = restore_best_weights
        
        self.best_score = None
        self.counter = 0
        self.best_weights = None
        self.early_stop = False
        
        if mode == 'min':
            self.monitor_op = lambda current, best: current < best - min_delta
        else:
            self.monitor_op = lambda current, best: current > best + min_delta
    
    def __call__(self, score: float, model: nn.Module) -> bool:
        """检查是否应该早停"""
        if self.best_score is None:
            self.best_score = score
            self.save_checkpoint(model)
        elif self.monitor_op(score, self.best_score):
            self.best_score = score
            self.counter = 0
            self.save_checkpoint(model)
        else:
            self.counter += 1
            
        if self.counter >= self.patience:
            self.early_stop = True
            if self.restore_best_weights:
                self.restore_checkpoint(model)
        
        return self.early_stop
    
    def save_checkpoint(self, model: nn.Module):
        """保存最佳权重"""
        if self.restore_best_weights:
            self.best_weights = {k: v.cpu().clone() 
                               for k, v in model.state_dict().items()}
    
    def restore_checkpoint(self, model: nn.Module):
        """恢复最佳权重"""
        if self.best_weights is not None:
            device = next(model.parameters()).device
            model.load_state_dict({
                k: v.to(device) for k, v in self.best_weights.items()
            })

class LearningRateScheduler:
    """学习率调度器"""
    
    def __init__(self, optimizer, config: Dict):
        self.optimizer = optimizer
        self.config = config
        self.scheduler = self.create_scheduler()
    
    def create_scheduler(self):
        """创建调度器"""
        scheduler_type = self.config.get('type', 'cosine')
        
        if scheduler_type == 'cosine':
            return optim.lr_scheduler.CosineAnnealingLR(
                self.optimizer,
                T_max=self.config.get('T_max', 100),
                eta_min=self.config.get('eta_min', 1e-6)
            )
        
        elif scheduler_type == 'reduce_on_plateau':
            return optim.lr_scheduler.ReduceLROnPlateau(
                self.optimizer,
                mode=self.config.get('mode', 'min'),
                factor=self.config.get('factor', 0.5),
                patience=self.config.get('patience', 5),
                min_lr=self.config.get('min_lr', 1e-6)
            )
        
        elif scheduler_type == 'exponential':
            return optim.lr_scheduler.ExponentialLR(
                self.optimizer,
                gamma=self.config.get('gamma', 0.95)
            )
        
        elif scheduler_type == 'step':
            return optim.lr_scheduler.StepLR(
                self.optimizer,
                step_size=self.config.get('step_size', 30),
                gamma=self.config.get('gamma', 0.1)
            )
        
        elif scheduler_type == 'cyclic':
            return optim.lr_scheduler.CyclicLR(
                self.optimizer,
                base_lr=self.config.get('base_lr', 1e-5),
                max_lr=self.config.get('max_lr', 1e-3),
                step_size_up=self.config.get('step_size_up', 2000),
                mode=self.config.get('mode', 'triangular')
            )
        
        else:
            raise ValueError(f"不支持的调度器类型: {scheduler_type}")
    
    def step(self, metrics: float = None):
        """调度器步进"""
        if isinstance(self.scheduler, optim.lr_scheduler.ReduceLROnPlateau):
            if metrics is not None:
                self.scheduler.step(metrics)
        else:
            self.scheduler.step()
    
    def get_lr(self) -> float:
        """获取当前学习率"""
        return self.optimizer.param_groups[0]['lr']

import math

class WarmupScheduler:
    """预热调度器"""
    
    def __init__(self, optimizer, warmup_steps: int, 
                 total_steps: int, initial_lr: float = 1e-7):
        self.optimizer = optimizer
        self.warmup_steps = warmup_steps
        self.total_steps = total_steps
        self.initial_lr = initial_lr
        self.base_lr = optimizer.param_groups[0]['lr']
        self.current_step = 0
    
    def step(self):
        """更新学习率"""
        self.current_step += 1
        
        if self.current_step <= self.warmup_steps:
            # 预热阶段:线性增长
            lr = self.initial_lr + (self.base_lr - self.initial_lr) * \
                 (self.current_step / self.warmup_steps)
        else:
            # 衰减阶段:余弦衰减
            progress = (self.current_step - self.warmup_steps) / \
                      (self.total_steps - self.warmup_steps)
            lr = self.base_lr * 0.5 * (1 + math.cos(math.pi * progress))
        
        for param_group in self.optimizer.param_groups:
            param_group['lr'] = lr
    
    def get_lr(self) -> float:
        """获取当前学习率"""
        return self.optimizer.param_groups[0]['lr']

# 使用示例

# 早停配置
early_stopping = EarlyStopping(
    patience=5,
    min_delta=0.001,
    mode='min',
    restore_best_weights=True
)

# 学习率调度器配置
scheduler_config = {
    'type': 'cosine',
    'T_max': 100,
    'eta_min': 1e-6
}

# 模拟训练循环
class MockModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.linear = nn.Linear(10, 1)
        self.device = 'cpu'

mock_model = MockModel()
mock_optimizer = optim.AdamW(mock_model.parameters(), lr=1e-3)

lr_scheduler = LearningRateScheduler(mock_optimizer, scheduler_config)
warmup_scheduler = WarmupScheduler(mock_optimizer, warmup_steps=100, total_steps=1000)

print("训练调度器初始化完成")
print(f"初始学习率: {lr_scheduler.get_lr():.2e}")

# 模拟训练过程
for epoch in range(10):
    # 模拟验证损失
    val_loss = 1.0 - epoch * 0.1 + random.uniform(-0.05, 0.05)
    
    # 检查早停
    if early_stopping(val_loss, mock_model):
        print(f"在第 {epoch + 1} 轮触发早停")
        break
    
    # 更新学习率
    lr_scheduler.step(val_loss)
    
    print(f"Epoch {epoch + 1}: Val Loss = {val_loss:.4f}, LR = {lr_scheduler.get_lr():.2e}")

print("训练完成")

7.4 分布式训练

7.4.1 数据并行训练

import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data.distributed import DistributedSampler
import os

class DistributedTrainer:
    """分布式训练器"""
    
    def __init__(self, config: Dict):
        self.config = config
        self.world_size = config.get('world_size', 1)
        self.backend = config.get('backend', 'nccl')
    
    def setup(self, rank: int):
        """设置分布式环境"""
        os.environ['MASTER_ADDR'] = self.config.get('master_addr', 'localhost')
        os.environ['MASTER_PORT'] = self.config.get('master_port', '12355')
        
        # 初始化进程组
        dist.init_process_group(
            backend=self.backend,
            rank=rank,
            world_size=self.world_size
        )
        
        # 设置GPU
        torch.cuda.set_device(rank)
    
    def cleanup(self):
        """清理分布式环境"""
        dist.destroy_process_group()
    
    def create_distributed_model(self, model: nn.Module, rank: int) -> DDP:
        """创建分布式模型"""
        model = model.to(rank)
        ddp_model = DDP(model, device_ids=[rank])
        return ddp_model
    
    def create_distributed_dataloader(self, dataset, batch_size: int, 
                                    rank: int) -> DataLoader:
        """创建分布式数据加载器"""
        sampler = DistributedSampler(
            dataset,
            num_replicas=self.world_size,
            rank=rank,
            shuffle=True
        )
        
        dataloader = DataLoader(
            dataset,
            batch_size=batch_size,
            sampler=sampler,
            num_workers=4,
            pin_memory=True
        )
        
        return dataloader
    
    def train_distributed(self, rank: int, model_class, dataset, config: Dict):
        """分布式训练主函数"""
        # 设置分布式环境
        self.setup(rank)
        
        try:
            # 创建模型
            model = model_class(**config.get('model_config', {}))
            ddp_model = self.create_distributed_model(model, rank)
            
            # 创建优化器
            optimizer = optim.AdamW(
                ddp_model.parameters(),
                lr=config.get('learning_rate', 5e-5)
            )
            
            # 创建数据加载器
            dataloader = self.create_distributed_dataloader(
                dataset, config.get('batch_size', 16), rank
            )
            
            # 训练循环
            ddp_model.train()
            for epoch in range(config.get('num_epochs', 10)):
                # 设置epoch(用于随机性)
                dataloader.sampler.set_epoch(epoch)
                
                epoch_loss = 0.0
                for batch_idx, batch in enumerate(dataloader):
                    # 将数据移到GPU
                    batch = {k: v.to(rank) for k, v in batch.items()}
                    
                    # 前向传播
                    outputs = ddp_model(
                        input_ids=batch['input_ids'],
                        attention_mask=batch['attention_mask']
                    )
                    
                    # 计算损失
                    loss = self.compute_loss(outputs, batch['labels'])
                    
                    # 反向传播
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()
                    
                    epoch_loss += loss.item()
                    
                    if rank == 0 and batch_idx % 100 == 0:
                        print(f"Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item():.4f}")
                
                # 同步所有进程的损失
                avg_loss = self.reduce_tensor(torch.tensor(epoch_loss).to(rank))
                
                if rank == 0:
                    print(f"Epoch {epoch} 平均损失: {avg_loss / len(dataloader):.4f}")
        
        finally:
            self.cleanup()
    
    def reduce_tensor(self, tensor: torch.Tensor) -> torch.Tensor:
        """跨进程归约张量"""
        dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
        return tensor / self.world_size
    
    def compute_loss(self, outputs: Dict, labels: torch.Tensor) -> torch.Tensor:
        """计算损失"""
        logits = outputs['logits']
        loss_fct = nn.CrossEntropyLoss(ignore_index=-100)
        
        shift_logits = logits[..., :-1, :].contiguous()
        shift_labels = labels[..., 1:].contiguous()
        
        loss = loss_fct(
            shift_logits.view(-1, shift_logits.size(-1)),
            shift_labels.view(-1)
        )
        
        return loss

def run_distributed_training(model_class, dataset, config: Dict):
    """启动分布式训练"""
    world_size = config.get('world_size', torch.cuda.device_count())
    
    trainer = DistributedTrainer(config)
    
    mp.spawn(
        trainer.train_distributed,
        args=(model_class, dataset, config),
        nprocs=world_size,
        join=True
    )

# 使用示例
distributed_config = {
    'world_size': 2,
    'backend': 'nccl',
    'master_addr': 'localhost',
    'master_port': '12355',
    'learning_rate': 5e-5,
    'batch_size': 16,
    'num_epochs': 5,
    'model_config': {}
}

print("分布式训练器配置完成")

7.4.2 模型并行训练

class ModelParallelTrainer:
    """模型并行训练器"""
    
    def __init__(self, config: Dict):
        self.config = config
        self.device_map = config.get('device_map', {})
    
    def split_model(self, model: nn.Module) -> nn.Module:
        """分割模型到不同设备"""
        # 示例:将编码器和解码器分别放在不同GPU上
        if hasattr(model, 'encoder') and hasattr(model, 'decoder'):
            model.encoder = model.encoder.to('cuda:0')
            model.decoder = model.decoder.to('cuda:1')
        
        return model
    
    def forward_with_model_parallel(self, model: nn.Module, 
                                  input_ids: torch.Tensor, 
                                  attention_mask: torch.Tensor) -> Dict:
        """模型并行前向传播"""
        # 输入在第一个设备
        input_ids = input_ids.to('cuda:0')
        attention_mask = attention_mask.to('cuda:0')
        
        # 编码器前向传播
        encoder_outputs = model.encoder(
            input_ids=input_ids,
            attention_mask=attention_mask
        )
        
        # 将编码器输出移到解码器设备
        encoder_hidden_states = encoder_outputs.last_hidden_state.to('cuda:1')
        attention_mask = attention_mask.to('cuda:1')
        
        # 解码器前向传播
        decoder_outputs = model.decoder(
            encoder_hidden_states=encoder_hidden_states,
            attention_mask=attention_mask
        )
        
        return {
            'logits': decoder_outputs.logits,
            'encoder_outputs': encoder_outputs,
            'decoder_outputs': decoder_outputs
        }
    
    def train_with_model_parallel(self, model: nn.Module, dataloader: DataLoader, 
                                 optimizer, num_epochs: int):
        """模型并行训练"""
        model = self.split_model(model)
        model.train()
        
        for epoch in range(num_epochs):
            total_loss = 0.0
            
            for batch_idx, batch in enumerate(dataloader):
                # 前向传播
                outputs = self.forward_with_model_parallel(
                    model, batch['input_ids'], batch['attention_mask']
                )
                
                # 计算损失(标签需要在正确的设备上)
                labels = batch['labels'].to('cuda:1')
                loss = self.compute_loss(outputs, labels)
                
                # 反向传播
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                
                total_loss += loss.item()
                
                if batch_idx % 100 == 0:
                    print(f"Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item():.4f}")
            
            avg_loss = total_loss / len(dataloader)
            print(f"Epoch {epoch} 平均损失: {avg_loss:.4f}")
    
    def compute_loss(self, outputs: Dict, labels: torch.Tensor) -> torch.Tensor:
        """计算损失"""
        logits = outputs['logits']
        loss_fct = nn.CrossEntropyLoss(ignore_index=-100)
        
        shift_logits = logits[..., :-1, :].contiguous()
        shift_labels = labels[..., 1:].contiguous()
        
        loss = loss_fct(
            shift_logits.view(-1, shift_logits.size(-1)),
            shift_labels.view(-1)
        )
        
        return loss

class PipelineParallelTrainer:
    """流水线并行训练器"""
    
    def __init__(self, config: Dict):
        self.config = config
        self.num_stages = config.get('num_stages', 2)
        self.micro_batch_size = config.get('micro_batch_size', 4)
    
    def create_pipeline_stages(self, model: nn.Module) -> List[nn.Module]:
        """创建流水线阶段"""
        stages = []
        
        # 简单示例:将模型分为两个阶段
        if hasattr(model, 'encoder') and hasattr(model, 'decoder'):
            # 阶段1:嵌入层 + 编码器
            stage1 = nn.Sequential(
                model.embeddings,
                model.encoder
            ).to('cuda:0')
            
            # 阶段2:解码器 + 输出层
            stage2 = nn.Sequential(
                model.decoder,
                model.lm_head
            ).to('cuda:1')
            
            stages = [stage1, stage2]
        
        return stages
    
    def pipeline_forward(self, stages: List[nn.Module], 
                        micro_batches: List[Dict]) -> List[torch.Tensor]:
        """流水线前向传播"""
        outputs = []
        
        for micro_batch in micro_batches:
            x = micro_batch['input_ids'].to('cuda:0')
            
            # 通过各个阶段
            for stage_idx, stage in enumerate(stages):
                if stage_idx > 0:
                    x = x.to(f'cuda:{stage_idx}')
                x = stage(x)
            
            outputs.append(x)
        
        return outputs

# 使用示例
model_parallel_config = {
    'device_map': {
        'encoder': 'cuda:0',
        'decoder': 'cuda:1'
    },
    'num_stages': 2,
    'micro_batch_size': 4
}

mp_trainer = ModelParallelTrainer(model_parallel_config)
print("模型并行训练器配置完成")

7.5 性能监控与调试

7.5.1 训练监控系统

import psutil
import GPUtil
import time
from collections import defaultdict, deque
import matplotlib.pyplot as plt
from typing import Dict, List, Optional

class TrainingMonitor:
    """训练监控器"""
    
    def __init__(self, config: Dict):
        self.config = config
        self.metrics_history = defaultdict(deque)
        self.start_time = time.time()
        
        # 监控配置
        self.monitor_gpu = config.get('monitor_gpu', True)
        self.monitor_memory = config.get('monitor_memory', True)
        self.history_length = config.get('history_length', 1000)
        
        # 初始化wandb(如果配置)
        if config.get('use_wandb', False):
            import wandb
            self.wandb = wandb
        else:
            self.wandb = None
    
    def log_metrics(self, metrics: Dict[str, float], step: int):
        """记录指标"""
        timestamp = time.time()
        
        # 添加系统指标
        system_metrics = self.get_system_metrics()
        metrics.update(system_metrics)
        
        # 存储到历史记录
        for key, value in metrics.items():
            self.metrics_history[key].append((timestamp, step, value))
            
            # 限制历史长度
            if len(self.metrics_history[key]) > self.history_length:
                self.metrics_history[key].popleft()
        
        # 记录到wandb
        if self.wandb:
            self.wandb.log(metrics, step=step)
        
        # 打印关键指标
        self.print_metrics(metrics, step)
    
    def get_system_metrics(self) -> Dict[str, float]:
        """获取系统指标"""
        metrics = {}
        
        # CPU和内存使用率
        if self.monitor_memory:
            cpu_percent = psutil.cpu_percent()
            memory = psutil.virtual_memory()
            
            metrics.update({
                'system/cpu_percent': cpu_percent,
                'system/memory_percent': memory.percent,
                'system/memory_available_gb': memory.available / (1024**3)
            })
        
        # GPU指标
        if self.monitor_gpu and torch.cuda.is_available():
            try:
                gpus = GPUtil.getGPUs()
                for i, gpu in enumerate(gpus):
                    metrics.update({
                        f'gpu_{i}/utilization': gpu.load * 100,
                        f'gpu_{i}/memory_percent': gpu.memoryUtil * 100,
                        f'gpu_{i}/memory_used_mb': gpu.memoryUsed,
                        f'gpu_{i}/temperature': gpu.temperature
                    })
            except Exception as e:
                print(f"GPU监控错误: {e}")
        
        return metrics
    
    def print_metrics(self, metrics: Dict[str, float], step: int):
        """打印指标"""
        elapsed_time = time.time() - self.start_time
        
        # 选择关键指标打印
        key_metrics = {
            k: v for k, v in metrics.items() 
            if any(keyword in k.lower() for keyword in ['loss', 'accuracy', 'lr'])
        }
        
        print(f"Step {step} ({elapsed_time:.1f}s): {key_metrics}")
    
    def plot_metrics(self, metric_names: List[str], save_path: Optional[str] = None):
        """绘制指标图表"""
        fig, axes = plt.subplots(len(metric_names), 1, figsize=(12, 4 * len(metric_names)))
        
        if len(metric_names) == 1:
            axes = [axes]
        
        for i, metric_name in enumerate(metric_names):
            if metric_name in self.metrics_history:
                history = list(self.metrics_history[metric_name])
                timestamps, steps, values = zip(*history)
                
                axes[i].plot(steps, values)
                axes[i].set_title(f'{metric_name} over time')
                axes[i].set_xlabel('Step')
                axes[i].set_ylabel(metric_name)
                axes[i].grid(True)
        
        plt.tight_layout()
        
        if save_path:
            plt.savefig(save_path)
        else:
            plt.show()
    
    def get_summary_stats(self, metric_name: str) -> Dict[str, float]:
        """获取指标统计信息"""
        if metric_name not in self.metrics_history:
            return {}
        
        values = [v for _, _, v in self.metrics_history[metric_name]]
        
        return {
            'mean': sum(values) / len(values),
            'min': min(values),
            'max': max(values),
            'latest': values[-1] if values else 0
        }
    
    def detect_anomalies(self, metric_name: str, threshold: float = 2.0) -> List[int]:
        """检测异常值"""
        if metric_name not in self.metrics_history:
            return []
        
        values = [v for _, _, v in self.metrics_history[metric_name]]
        steps = [s for _, s, _ in self.metrics_history[metric_name]]
        
        if len(values) < 10:
            return []
        
        # 计算移动平均和标准差
        window_size = min(50, len(values) // 4)
        anomalies = []
        
        for i in range(window_size, len(values)):
            window = values[i-window_size:i]
            mean_val = sum(window) / len(window)
            std_val = (sum((x - mean_val) ** 2 for x in window) / len(window)) ** 0.5
            
            if abs(values[i] - mean_val) > threshold * std_val:
                anomalies.append(steps[i])
        
        return anomalies

class PerformanceProfiler:
    """性能分析器"""
    
    def __init__(self, config: Dict):
        self.config = config
        self.profiler = None
        self.profile_dir = config.get('profile_dir', './profiles')
        
        # 创建分析目录
        os.makedirs(self.profile_dir, exist_ok=True)
    
    def start_profiling(self, activities: List[str] = None):
        """开始性能分析"""
        if activities is None:
            activities = ['cpu', 'cuda']
        
        self.profiler = torch.profiler.profile(
            activities=[getattr(torch.profiler.ProfilerActivity, act.upper()) 
                       for act in activities],
            schedule=torch.profiler.schedule(
                wait=self.config.get('wait', 1),
                warmup=self.config.get('warmup', 1),
                active=self.config.get('active', 3),
                repeat=self.config.get('repeat', 2)
            ),
            on_trace_ready=torch.profiler.tensorboard_trace_handler(self.profile_dir),
            record_shapes=True,
            profile_memory=True,
            with_stack=True
        )
        
        self.profiler.start()
    
    def step(self):
        """分析器步进"""
        if self.profiler:
            self.profiler.step()
    
    def stop_profiling(self):
        """停止性能分析"""
        if self.profiler:
            self.profiler.stop()
            self.profiler = None
    
    def analyze_memory_usage(self, model: nn.Module) -> Dict[str, float]:
        """分析内存使用"""
        if not torch.cuda.is_available():
            return {}
        
        torch.cuda.empty_cache()
        torch.cuda.reset_peak_memory_stats()
        
        # 模拟一次前向传播(假设模型接受形状为[batch, seq_len]的token id输入)
        dummy_input = torch.randint(0, 1000, (1, 512), device='cuda')
        
        with torch.no_grad():
            _ = model(dummy_input)
        
        memory_stats = {
            'allocated_mb': torch.cuda.memory_allocated() / (1024**2),
            'reserved_mb': torch.cuda.memory_reserved() / (1024**2),
            'peak_allocated_mb': torch.cuda.max_memory_allocated() / (1024**2),
            'peak_reserved_mb': torch.cuda.max_memory_reserved() / (1024**2)
        }
        
        return memory_stats

# 使用示例
monitor_config = {
    'monitor_gpu': True,
    'monitor_memory': True,
    'history_length': 1000,
    'use_wandb': False
}

monitor = TrainingMonitor(monitor_config)
profiler = PerformanceProfiler({'profile_dir': './profiles'})

print("性能监控系统初始化完成")

7.5.2 调试技巧

class DebuggingTools:
    """调试工具集合"""
    
    def __init__(self, config: Dict):
        self.config = config
        self.gradient_history = []
        self.activation_history = []
    
    def check_gradients(self, model: nn.Module, threshold: float = 1e-7) -> Dict:
        """检查梯度"""
        gradient_info = {
            'zero_gradients': [],
            'large_gradients': [],
            'nan_gradients': [],
            'gradient_norms': {}
        }
        
        for name, param in model.named_parameters():
            if param.grad is not None:
                grad_norm = param.grad.norm().item()
                gradient_info['gradient_norms'][name] = grad_norm
                
                # 检查零梯度
                if grad_norm < threshold:
                    gradient_info['zero_gradients'].append(name)
                
                # 检查大梯度
                if grad_norm > 10.0:
                    gradient_info['large_gradients'].append(name)
                
                # Check for NaN gradients
                if torch.isnan(param.grad).any():
                    gradient_info['nan_gradients'].append(name)
        
        return gradient_info
    
    def register_hooks(self, model: nn.Module):
        """注册调试钩子"""
        def gradient_hook(name):
            def hook(grad):
                if torch.isnan(grad).any():
                    print(f"NaN梯度检测到: {name}")
                if grad.norm() > 100:
                    print(f"大梯度检测到: {name}, norm: {grad.norm().item():.2f}")
                return grad
            return hook
        
        def activation_hook(name):
            def hook(module, input, output):
                if torch.isnan(output).any():
                    print(f"NaN激活检测到: {name}")
                self.activation_history.append({
                    'name': name,
                    'mean': output.mean().item(),
                    'std': output.std().item(),
                    'min': output.min().item(),
                    'max': output.max().item()
                })
            return hook
        
        # Register gradient hooks
        for name, param in model.named_parameters():
            if param.requires_grad:
                param.register_hook(gradient_hook(name))
        
        # Register forward hooks on activation-producing layers
        for name, module in model.named_modules():
            if isinstance(module, (nn.Linear, nn.Conv1d, nn.Conv2d)):
                module.register_forward_hook(activation_hook(name))
    
    def visualize_gradients(self, model: nn.Module, save_path: Optional[str] = None):
        """Visualize gradient distributions"""
        gradients = []
        names = []
        
        for name, param in model.named_parameters():
            if param.grad is not None:
                gradients.append(param.grad.flatten().detach().cpu().numpy())
                names.append(name)
        
        if not gradients:
            print("没有找到梯度信息")
            return
        
        fig, axes = plt.subplots(len(gradients), 1, figsize=(12, 3 * len(gradients)))
        
        if len(gradients) == 1:
            axes = [axes]
        
        for i, (grad, name) in enumerate(zip(gradients, names)):
            axes[i].hist(grad, bins=50, alpha=0.7)
            axes[i].set_title(f'Gradient distribution: {name}')
            axes[i].set_xlabel('Gradient value')
            axes[i].set_ylabel('Frequency')
        
        plt.tight_layout()
        
        if save_path:
            plt.savefig(save_path)
        else:
            plt.show()
    
    def check_model_weights(self, model: nn.Module) -> Dict:
        """检查模型权重"""
        weight_info = {
            'weight_norms': {},
            'zero_weights': [],
            'large_weights': [],
            'nan_weights': []
        }
        
        for name, param in model.named_parameters():
            weight_norm = param.norm().item()
            weight_info['weight_norms'][name] = weight_norm
            
            # Check for (near-)zero weights
            if weight_norm < 1e-7:
                weight_info['zero_weights'].append(name)
            
            # Check for large weights
            if weight_norm > 100:
                weight_info['large_weights'].append(name)
            
            # Check for NaN weights
            if torch.isnan(param).any():
                weight_info['nan_weights'].append(name)
        
        return weight_info
    
    def save_checkpoint_with_debug_info(self, model: nn.Module, optimizer, 
                                       epoch: int, loss: float, 
                                       checkpoint_path: str):
        """保存包含调试信息的检查点"""
        gradient_info = self.check_gradients(model)
        weight_info = self.check_model_weights(model)
        
        checkpoint = {
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': loss,
            'gradient_info': gradient_info,
            'weight_info': weight_info,
            'activation_history': self.activation_history[-100:],  # keep the last 100 activation records
            'config': self.config
        }
        
        torch.save(checkpoint, checkpoint_path)
        print(f"调试检查点已保存到: {checkpoint_path}")

# Usage example
debug_tools = DebuggingTools({})
print("Debugging tools initialized")

7.6 Training Tips and Best Practices

7.6.1 Training Stability Techniques

class TrainingStabilizer:
    """Training stability utilities"""
    
    def __init__(self, config: Dict):
        self.config = config
    
    def apply_weight_initialization(self, model: nn.Module):
        """应用权重初始化"""
        for module in model.modules():
            if isinstance(module, nn.Linear):
                # Xavier (Glorot) uniform initialization
                nn.init.xavier_uniform_(module.weight)
                if module.bias is not None:
                    nn.init.zeros_(module.bias)
            
            elif isinstance(module, nn.Embedding):
                # Normal initialization for embeddings
                nn.init.normal_(module.weight, mean=0, std=0.02)
            
            elif isinstance(module, (nn.LayerNorm, nn.BatchNorm1d)):
                # Normalization layers: unit weight, zero bias
                nn.init.ones_(module.weight)
                nn.init.zeros_(module.bias)
    
    def apply_gradient_clipping(self, model: nn.Module, max_norm: float = 1.0):
        """应用梯度裁剪"""
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm)
    
    def check_loss_explosion(self, loss: float, threshold: float = 100.0) -> bool:
        """Check for loss explosion (divergence or NaN)"""
        # NaN is the only float that compares unequal to itself
        return loss > threshold or loss != loss
    
    def apply_loss_scaling(self, loss: torch.Tensor, scale: float = 1.0) -> torch.Tensor:
        """应用损失缩放"""
        return loss * scale
    
    def warmup_learning_rate(self, optimizer, current_step: int, 
                           warmup_steps: int, base_lr: float):
        """预热学习率"""
        if current_step < warmup_steps:
            lr = base_lr * (current_step / warmup_steps)
            for param_group in optimizer.param_groups:
                param_group['lr'] = lr

class DataAugmentation:
    """Data augmentation techniques for token sequences"""
    
    def __init__(self, config: Dict):
        self.config = config
    
    def token_dropout(self, input_ids: torch.Tensor, 
                     dropout_rate: float = 0.1) -> torch.Tensor:
        """Token dropout: randomly zero out token ids (assumes id 0 is a pad/unknown token)"""
        mask = torch.rand(input_ids.shape, device=input_ids.device) > dropout_rate
        return input_ids * mask.long()
    
    def token_replacement(self, input_ids: torch.Tensor, 
                         vocab_size: int, replacement_rate: float = 0.1) -> torch.Tensor:
        """Token replacement: swap random positions for random vocabulary ids"""
        mask = torch.rand(input_ids.shape, device=input_ids.device) < replacement_rate
        random_tokens = torch.randint(0, vocab_size, input_ids.shape, device=input_ids.device)
        return torch.where(mask, random_tokens, input_ids)
    
    def sequence_shuffling(self, input_ids: torch.Tensor, 
                          shuffle_rate: float = 0.1) -> torch.Tensor:
        """Sequence shuffling: shuffle a short random window in place"""
        batch_size, seq_len = input_ids.shape
        
        if seq_len <= 10:
            return input_ids  # sequence too short for a 10-token window
        
        for i in range(batch_size):
            if torch.rand(1).item() < shuffle_rate:
                # Pick a random 10-token window and shuffle it
                start = torch.randint(0, seq_len - 10, (1,)).item()
                end = min(start + 10, seq_len)
                
                subseq = input_ids[i, start:end]
                shuffled_indices = torch.randperm(len(subseq))
                input_ids[i, start:end] = subseq[shuffled_indices]
        
        return input_ids

# Usage example
stabilizer = TrainingStabilizer({})
augmentation = DataAugmentation({})

print("Training stability tools initialized")

7.7 Chapter Summary

This chapter covered training and optimization techniques for Text2SQL models. The main topics were:

7.7.1 Key Points

  1. Training strategy

    • Designed a complete training pipeline covering data loading, model forward passes, loss computation, and parameter updates
    • Implemented a multi-stage training strategy supporting pre-training, fine-tuning, and refinement phases
    • Provided a flexible configuration system that makes it easy to adjust training parameters
  2. Optimization techniques

    • Implemented several optimizers, including AdamW, RAdam, and Lookahead
    • Supported layer-wise learning rates, gradient accumulation, and mixed-precision training
    • Integrated multiple regularization techniques such as dropout, weight decay, and label smoothing
  3. Hyperparameter tuning

    • Provided tuning methods including grid search, random search, and Bayesian optimization
    • Implemented early stopping and several learning-rate scheduling strategies
    • Supported automated hyperparameter search and result analysis
  4. Distributed training

    • Implemented data-parallel and model-parallel training
    • Supported multi-GPU and multi-node training
    • Provided a basic framework for pipeline-parallel training
  5. Performance monitoring

    • Built a complete training monitoring system
    • Implemented profiling and debugging tools
    • Supported real-time metric logging and visualization

7.7.2 Practical Recommendations

  1. Training preparation

    • Choose a sensible batch size and learning rate
    • Use a warm-up schedule to stabilize the early phase of training
    • Save checkpoints regularly to guard against unexpected interruptions
  2. Optimization strategy

    • Pick an optimizer appropriate for the model size
    • Use gradient clipping to prevent exploding gradients
    • Apply regularization judiciously to avoid overfitting
  3. Monitoring and debugging

    • Monitor training metrics and system resources in real time
    • Inspect gradient and weight distributions regularly
    • Use visualization tools to analyze the training process
  4. Performance optimization

    • Use mixed-precision training to speed up training (see the sketch after this list)
    • Configure data loaders carefully to improve I/O efficiency
    • Choose a parallelism strategy that matches your hardware
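
As a concrete illustration of the mixed-precision point above, here is a minimal sketch using PyTorch's automatic mixed precision (torch.cuda.amp); `model`, `optimizer`, `loss_fn`, and `train_loader` are hypothetical placeholders from the surrounding training code.

# Sketch: mixed-precision training with torch.cuda.amp
# (model, optimizer, loss_fn, train_loader are assumed placeholders).
scaler = torch.cuda.amp.GradScaler()

for input_ids, labels in train_loader:
    optimizer.zero_grad()
    with torch.cuda.amp.autocast():              # run the forward pass in reduced precision
        loss = loss_fn(model(input_ids), labels)
    scaler.scale(loss).backward()                # scale the loss to avoid float16 underflow
    scaler.unscale_(optimizer)                   # unscale so gradient clipping sees true magnitudes
    torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
    scaler.step(optimizer)                       # skips the update if inf/NaN gradients were found
    scaler.update()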

7.7.3 Next Steps

After completing this chapter, we suggest that you:

  1. Try out different training strategies and compare their effects
  2. Apply these optimization techniques to real datasets
  3. Study more advanced distributed training techniques
  4. Dig deeper into domain-specific training tricks

After working through this chapter, you should be able to:

  • Design and implement a complete Text2SQL model training pipeline
  • Select and configure appropriate optimization strategies
  • Perform effective hyperparameter tuning
  • Monitor and debug the training process
  • Apply distributed training techniques to large-scale data

These skills will give you a solid foundation for deeper research and practical application in the Text2SQL field.