Chapter Overview
Evaluation and optimization are critical stages of RAG system development: they directly determine the system's real-world effectiveness and user experience. This chapter covers evaluation metrics for RAG systems, testing methods, performance optimization strategies, and how to build an evaluation process that supports continuous improvement.
Learning Objectives
- Master the evaluation metrics and methods for RAG systems
- Learn automated and human evaluation techniques
- Understand A/B testing and online evaluation strategies
- Become familiar with performance optimization and system tuning techniques
- Master continuous monitoring and improvement methods
1. Evaluation Metric System
1.1 Retrieval Evaluation Metrics
# src/evaluation/retrieval_metrics.py - Retrieval evaluation metrics
from typing import List, Dict, Any, Tuple, Optional
import numpy as np
from dataclasses import dataclass
import logging
from collections import defaultdict
from ..vectorstore.vector_manager import VectorSearchResult
@dataclass
class RetrievalEvaluation:
"""检索评估结果"""
precision_at_k: Dict[int, float]
recall_at_k: Dict[int, float]
f1_at_k: Dict[int, float]
map_score: float # Mean Average Precision
ndcg_at_k: Dict[int, float] # Normalized Discounted Cumulative Gain
mrr: float # Mean Reciprocal Rank
hit_rate_at_k: Dict[int, float]
class RetrievalEvaluator:
"""检索评估器"""
def __init__(self, k_values: List[int] = None):
self.k_values = k_values or [1, 3, 5, 10, 20]
def evaluate(self,
predictions: List[List[VectorSearchResult]],
ground_truths: List[List[str]]) -> RetrievalEvaluation:
"""评估检索结果"""
if len(predictions) != len(ground_truths):
raise ValueError("预测结果和真实标签数量不匹配")
# 计算各项指标
precision_at_k = self._calculate_precision_at_k(predictions, ground_truths)
recall_at_k = self._calculate_recall_at_k(predictions, ground_truths)
f1_at_k = self._calculate_f1_at_k(precision_at_k, recall_at_k)
map_score = self._calculate_map(predictions, ground_truths)
ndcg_at_k = self._calculate_ndcg_at_k(predictions, ground_truths)
mrr = self._calculate_mrr(predictions, ground_truths)
hit_rate_at_k = self._calculate_hit_rate_at_k(predictions, ground_truths)
return RetrievalEvaluation(
precision_at_k=precision_at_k,
recall_at_k=recall_at_k,
f1_at_k=f1_at_k,
map_score=map_score,
ndcg_at_k=ndcg_at_k,
mrr=mrr,
hit_rate_at_k=hit_rate_at_k
)
def _calculate_precision_at_k(self,
predictions: List[List[VectorSearchResult]],
ground_truths: List[List[str]]) -> Dict[int, float]:
"""计算Precision@K"""
precision_scores = {k: [] for k in self.k_values}
for pred_results, true_ids in zip(predictions, ground_truths):
true_set = set(true_ids)
for k in self.k_values:
pred_ids = [result.chunk_id for result in pred_results[:k]]
relevant_retrieved = len(set(pred_ids).intersection(true_set))
# If fewer than k results were retrieved, the missing positions count as misses
# (the denominator stays k), rather than forcing the score to 0
precision = relevant_retrieved / k
precision_scores[k].append(precision)
return {k: np.mean(scores) for k, scores in precision_scores.items()}
def _calculate_recall_at_k(self,
predictions: List[List[VectorSearchResult]],
ground_truths: List[List[str]]) -> Dict[int, float]:
"""计算Recall@K"""
recall_scores = {k: [] for k in self.k_values}
for pred_results, true_ids in zip(predictions, ground_truths):
true_set = set(true_ids)
if not true_set: # avoid division by zero when there are no relevant chunks
for k in self.k_values:
recall_scores[k].append(0.0)
continue
for k in self.k_values:
pred_ids = [result.chunk_id for result in pred_results[:k]]
relevant_retrieved = len(set(pred_ids).intersection(true_set))
recall = relevant_retrieved / len(true_set)
recall_scores[k].append(recall)
return {k: np.mean(scores) for k, scores in recall_scores.items()}
def _calculate_f1_at_k(self,
precision_at_k: Dict[int, float],
recall_at_k: Dict[int, float]) -> Dict[int, float]:
"""计算F1@K"""
f1_scores = {}
for k in self.k_values:
p = precision_at_k[k]
r = recall_at_k[k]
if p + r == 0:
f1_scores[k] = 0.0
else:
f1_scores[k] = 2 * p * r / (p + r)
return f1_scores
def _calculate_map(self,
predictions: List[List[VectorSearchResult]],
ground_truths: List[List[str]]) -> float:
"""计算Mean Average Precision"""
ap_scores = []
for pred_results, true_ids in zip(predictions, ground_truths):
true_set = set(true_ids)
if not true_set:
ap_scores.append(0.0)
continue
# Compute Average Precision for this query
relevant_count = 0
precision_sum = 0.0
for i, result in enumerate(pred_results):
if result.chunk_id in true_set:
relevant_count += 1
precision_at_i = relevant_count / (i + 1)
precision_sum += precision_at_i
if relevant_count == 0:
ap_scores.append(0.0)
else:
ap_scores.append(precision_sum / len(true_set))
return np.mean(ap_scores)
def _calculate_ndcg_at_k(self,
predictions: List[List[VectorSearchResult]],
ground_truths: List[List[str]]) -> Dict[int, float]:
"""计算NDCG@K"""
ndcg_scores = {k: [] for k in self.k_values}
for pred_results, true_ids in zip(predictions, ground_truths):
true_set = set(true_ids)
for k in self.k_values:
# Compute DCG
dcg = 0.0
for i, result in enumerate(pred_results[:k]):
relevance = 1.0 if result.chunk_id in true_set else 0.0
dcg += relevance / np.log2(i + 2) # rank = i + 1, so the discount is log2(rank + 1)
# Compute IDCG (the ideal DCG)
ideal_relevances = [1.0] * min(len(true_set), k)
idcg = sum(rel / np.log2(i + 2) for i, rel in enumerate(ideal_relevances))
# Compute NDCG
if idcg == 0:
ndcg_scores[k].append(0.0)
else:
ndcg_scores[k].append(dcg / idcg)
return {k: np.mean(scores) for k, scores in ndcg_scores.items()}
def _calculate_mrr(self,
predictions: List[List[VectorSearchResult]],
ground_truths: List[List[str]]) -> float:
"""计算Mean Reciprocal Rank"""
rr_scores = []
for pred_results, true_ids in zip(predictions, ground_truths):
true_set = set(true_ids)
# Find the rank of the first relevant result
for i, result in enumerate(pred_results):
if result.chunk_id in true_set:
rr_scores.append(1.0 / (i + 1))
break
else:
rr_scores.append(0.0) # no relevant result was retrieved
return np.mean(rr_scores)
def _calculate_hit_rate_at_k(self,
predictions: List[List[VectorSearchResult]],
ground_truths: List[List[str]]) -> Dict[int, float]:
"""计算Hit Rate@K"""
hit_scores = {k: [] for k in self.k_values}
for pred_results, true_ids in zip(predictions, ground_truths):
true_set = set(true_ids)
for k in self.k_values:
pred_ids = [result.chunk_id for result in pred_results[:k]]
hit = 1.0 if set(pred_ids).intersection(true_set) else 0.0
hit_scores[k].append(hit)
return {k: np.mean(scores) for k, scores in hit_scores.items()}
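The evaluator can be smoke-tested without a vector store. The snippet below is a minimal, illustrative sketch (not part of the project code): it feeds in hand-built rankings through a stand-in object that only provides the chunk_id and score attributes the evaluator actually reads, and assumes RetrievalEvaluator is importable from the module above. In the real project you would pass the VectorSearchResult objects returned by the retriever.
# demo_retrieval_eval.py - illustrative usage sketch
from dataclasses import dataclass

@dataclass
class _FakeResult:
    """Stand-in with the two attributes RetrievalEvaluator reads."""
    chunk_id: str
    score: float

evaluator = RetrievalEvaluator(k_values=[1, 3])
predictions = [
    [_FakeResult("c1", 0.9), _FakeResult("c7", 0.8), _FakeResult("c3", 0.7)],  # query 1
    [_FakeResult("c5", 0.95), _FakeResult("c2", 0.60)],                        # query 2
]
ground_truths = [["c1", "c3"], ["c9"]]  # relevant chunk IDs per query

report = evaluator.evaluate(predictions, ground_truths)
print(report.precision_at_k[1])  # query 1: 1.0, query 2: 0.0 -> mean 0.5
print(report.recall_at_k[3])     # query 1: 2/2,  query 2: 0/1 -> mean 0.5
print(report.mrr)                # first relevant at rank 1 / none -> mean 0.5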
1.2 Generation Evaluation Metrics
# src/evaluation/generation_metrics.py - Generation evaluation metrics
from typing import List, Dict, Any, Optional
import numpy as np
from dataclasses import dataclass
import logging
from ..generation.generator import GenerationResult
@dataclass
class GenerationEvaluation:
"""生成评估结果"""
bleu_score: float
rouge_scores: Dict[str, float]
bert_score: float
semantic_similarity: float
factual_accuracy: float
relevance_score: float
coherence_score: float
fluency_score: float
class GenerationEvaluator:
"""生成评估器"""
def __init__(self, embedding_model=None):
self.embedding_model = embedding_model
def evaluate(self,
predictions: List[str],
references: List[str],
contexts: List[str] = None) -> GenerationEvaluation:
"""评估生成结果"""
if len(predictions) != len(references):
raise ValueError("预测结果和参考答案数量不匹配")
# 计算各项指标
bleu_score = self._calculate_bleu(predictions, references)
rouge_scores = self._calculate_rouge(predictions, references)
bert_score = self._calculate_bert_score(predictions, references)
semantic_similarity = self._calculate_semantic_similarity(predictions, references)
factual_accuracy = self._calculate_factual_accuracy(predictions, references, contexts)
relevance_score = self._calculate_relevance(predictions, contexts)
coherence_score = self._calculate_coherence(predictions)
fluency_score = self._calculate_fluency(predictions)
return GenerationEvaluation(
bleu_score=bleu_score,
rouge_scores=rouge_scores,
bert_score=bert_score,
semantic_similarity=semantic_similarity,
factual_accuracy=factual_accuracy,
relevance_score=relevance_score,
coherence_score=coherence_score,
fluency_score=fluency_score
)
def _calculate_bleu(self, predictions: List[str], references: List[str]) -> float:
"""计算BLEU分数"""
try:
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
import nltk
# Make sure the required NLTK data is available
try:
nltk.data.find('tokenizers/punkt')
except LookupError:
nltk.download('punkt')
smoothing = SmoothingFunction().method1
bleu_scores = []
for pred, ref in zip(predictions, references):
# Tokenize (whitespace/Punkt tokenization is crude for Chinese; a dedicated segmenter such as jieba gives more meaningful BLEU scores)
pred_tokens = nltk.word_tokenize(pred.lower())
ref_tokens = [nltk.word_tokenize(ref.lower())]
# Compute BLEU
score = sentence_bleu(ref_tokens, pred_tokens, smoothing_function=smoothing)
bleu_scores.append(score)
return np.mean(bleu_scores)
except ImportError:
logging.warning("NLTK未安装,跳过BLEU计算")
return 0.0
except Exception as e:
logging.error(f"BLEU计算失败: {e}")
return 0.0
def _calculate_rouge(self, predictions: List[str], references: List[str]) -> Dict[str, float]:
"""计算ROUGE分数"""
try:
from rouge_score import rouge_scorer
scorer = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True)
rouge_scores = {'rouge1': [], 'rouge2': [], 'rougeL': []}
for pred, ref in zip(predictions, references):
scores = scorer.score(ref, pred)
for metric in rouge_scores.keys():
rouge_scores[metric].append(scores[metric].fmeasure)
return {metric: np.mean(scores) for metric, scores in rouge_scores.items()}
except ImportError:
logging.warning("rouge-score未安装,跳过ROUGE计算")
return {'rouge1': 0.0, 'rouge2': 0.0, 'rougeL': 0.0}
except Exception as e:
logging.error(f"ROUGE计算失败: {e}")
return {'rouge1': 0.0, 'rouge2': 0.0, 'rougeL': 0.0}
def _calculate_bert_score(self, predictions: List[str], references: List[str]) -> float:
"""计算BERTScore"""
try:
from bert_score import score
P, R, F1 = score(predictions, references, lang='zh', verbose=False)
return F1.mean().item()
except ImportError:
logging.warning("bert-score未安装,跳过BERTScore计算")
return 0.0
except Exception as e:
logging.error(f"BERTScore计算失败: {e}")
return 0.0
def _calculate_semantic_similarity(self, predictions: List[str], references: List[str]) -> float:
"""计算语义相似度"""
if not self.embedding_model:
return 0.0
try:
# Obtain embeddings
pred_embeddings = self.embedding_model.encode(predictions)
ref_embeddings = self.embedding_model.encode(references)
# Compute cosine similarity
from sklearn.metrics.pairwise import cosine_similarity
similarities = []
for pred_emb, ref_emb in zip(pred_embeddings, ref_embeddings):
sim = cosine_similarity([pred_emb], [ref_emb])[0][0]
similarities.append(sim)
return np.mean(similarities)
except Exception as e:
logging.error(f"语义相似度计算失败: {e}")
return 0.0
def _calculate_factual_accuracy(self,
predictions: List[str],
references: List[str],
contexts: List[str] = None) -> float:
"""计算事实准确性"""
# 简化的事实准确性评估(实际应用中可以使用更复杂的方法)
accuracy_scores = []
for pred, ref in zip(predictions, references):
# Extract key entities and numbers
pred_entities = self._extract_entities(pred)
ref_entities = self._extract_entities(ref)
# Compute entity overlap
if not ref_entities:
accuracy_scores.append(1.0) # treat as accurate when the reference contains no entities
else:
matched = len(pred_entities.intersection(ref_entities))
accuracy = matched / len(ref_entities)
accuracy_scores.append(accuracy)
return np.mean(accuracy_scores)
def _extract_entities(self, text: str) -> set:
"""提取实体(简化版本)"""
import re
entities = set()
# Extract numbers
numbers = re.findall(r'\d+(?:\.\d+)?', text)
entities.update(numbers)
# Extract proper nouns (capitalized words)
proper_nouns = re.findall(r'\b[A-Z][a-z]+\b', text)
entities.update(proper_nouns)
# Extract Chinese entities (simplified: runs of two or more Chinese characters)
chinese_entities = re.findall(r'[\u4e00-\u9fff]{2,}', text)
entities.update(chinese_entities)
return entities
def _calculate_relevance(self, predictions: List[str], contexts: List[str] = None) -> float:
"""计算相关性分数"""
if not contexts:
return 0.5 # 默认值
relevance_scores = []
for pred, context in zip(predictions, contexts):
# Relevance of the predicted answer to the retrieved context
if self.embedding_model:
try:
pred_emb = self.embedding_model.encode([pred])[0]
context_emb = self.embedding_model.encode([context])[0]
from sklearn.metrics.pairwise import cosine_similarity
relevance = cosine_similarity([pred_emb], [context_emb])[0][0]
relevance_scores.append(relevance)
except Exception:
relevance_scores.append(0.5)
else:
# Fall back to simple lexical overlap
pred_words = set(pred.lower().split())
context_words = set(context.lower().split())
if not pred_words:
relevance_scores.append(0.0)
else:
overlap = len(pred_words.intersection(context_words))
relevance = overlap / len(pred_words)
relevance_scores.append(relevance)
return np.mean(relevance_scores)
def _calculate_coherence(self, predictions: List[str]) -> float:
"""计算连贯性分数"""
coherence_scores = []
for pred in predictions:
sentences = pred.split('。') # naive sentence splitting on the Chinese full stop
sentences = [s.strip() for s in sentences if s.strip()]
if len(sentences) <= 1:
coherence_scores.append(1.0)
continue
# Similarity between adjacent sentences
if self.embedding_model:
try:
sent_embeddings = self.embedding_model.encode(sentences)
from sklearn.metrics.pairwise import cosine_similarity
similarities = []
for i in range(len(sentences) - 1):
sim = cosine_similarity([sent_embeddings[i]], [sent_embeddings[i+1]])[0][0]
similarities.append(sim)
coherence = np.mean(similarities)
coherence_scores.append(coherence)
except Exception:
coherence_scores.append(0.5)
else:
# No embedding model available; fall back to a neutral score
coherence_scores.append(0.5)
return np.mean(coherence_scores)
def _calculate_fluency(self, predictions: List[str]) -> float:
"""计算流畅性分数"""
fluency_scores = []
for pred in predictions:
# Simple heuristic fluency check
score = 1.0
# Penalize heavy word repetition
words = pred.split()
if words:
unique_words = set(words)
repetition_ratio = len(words) / len(unique_words)
if repetition_ratio > 2.0: # excessive word repetition
score -= 0.3
# Penalize sentences that are too short or too long
sentences = pred.split('。')
sentence_lengths = [len(s.split()) for s in sentences if s.strip()]
avg_length = np.mean(sentence_lengths) if sentence_lengths else 0.0
if avg_length < 3 or avg_length > 50: # sentences too short or too long
score -= 0.2
# Check for sentence-ending punctuation
if not any(p in pred for p in '。!?'):
score -= 0.2
fluency_scores.append(max(0.0, score))
return np.mean(fluency_scores)
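A quick way to exercise the generation evaluator is to run it without an embedding model, in which case the embedding-based metrics fall back to lexical heuristics or neutral defaults; passing any object with an encode() method (for example a sentence-transformers model) enables semantic similarity, relevance, and coherence. BLEU, ROUGE, and BERTScore are skipped with a warning when their optional dependencies are not installed. The example below is an illustrative sketch, not a benchmark, and assumes GenerationEvaluator is importable from the module above.
# demo_generation_eval.py - illustrative usage sketch
evaluator = GenerationEvaluator(embedding_model=None)

predictions = ["该系统于2023年发布,检索精度达到85%。"]
references = ["该系统在2023年发布,检索精度约为85%。"]
contexts = ["产品说明:该系统于2023年发布,内部测试的检索精度为85%。"]

report = evaluator.evaluate(predictions, references, contexts)
print(report.factual_accuracy)   # entity/number overlap with the reference
print(report.relevance_score)    # lexical overlap with the context (no embeddings)
print(report.fluency_score)      # heuristic repetition/length/punctuation checks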
2. Automated Evaluation System
2.1 Evaluation Pipeline
# src/evaluation/evaluation_pipeline.py - Evaluation pipeline
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass, asdict
import json
import logging
from pathlib import Path
from datetime import datetime
from .retrieval_metrics import RetrievalEvaluator, RetrievalEvaluation
from .generation_metrics import GenerationEvaluator, GenerationEvaluation
from ..vectorstore.vector_manager import VectorSearchResult
@dataclass
class EvaluationDataset:
"""评估数据集"""
queries: List[str]
ground_truth_chunks: List[List[str]] # relevant chunk IDs for each query
reference_answers: List[str] # reference answers
contexts: List[str] = None # optional context passages
metadata: Dict[str, Any] = None
def __post_init__(self):
if self.contexts is None:
self.contexts = [""] * len(self.queries)
if self.metadata is None:
self.metadata = {}
@dataclass
class EvaluationResult:
"""评估结果"""
dataset_name: str
timestamp: str
retrieval_metrics: RetrievalEvaluation
generation_metrics: GenerationEvaluation
overall_score: float
detailed_results: List[Dict[str, Any]]
config: Dict[str, Any]
class EvaluationPipeline:
"""评估管道"""
def __init__(self,
retrieval_evaluator: RetrievalEvaluator,
generation_evaluator: GenerationEvaluator,
output_dir: str = "./evaluation_results"):
self.retrieval_evaluator = retrieval_evaluator
self.generation_evaluator = generation_evaluator
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
def evaluate_rag_system(self,
rag_system,
dataset: EvaluationDataset,
dataset_name: str = "default") -> EvaluationResult:
"""评估RAG系统"""
logging.info(f"开始评估RAG系统,数据集: {dataset_name}")
# 收集预测结果
retrieval_predictions = []
generation_predictions = []
detailed_results = []
for i, query in enumerate(dataset.queries):
try:
# Query the RAG system
rag_result = rag_system.query(query)
# Extract the retrieval results
retrieval_results = rag_result.get('retrieval_results', [])
retrieval_predictions.append(retrieval_results)
# Extract the generated answer
generated_answer = rag_result.get('answer', '')
generation_predictions.append(generated_answer)
# Record per-query details
detailed_result = {
'query_id': i,
'query': query,
'retrieved_chunks': [r.chunk_id for r in retrieval_results],
'generated_answer': generated_answer,
'reference_answer': dataset.reference_answers[i],
'ground_truth_chunks': dataset.ground_truth_chunks[i],
'retrieval_scores': [r.score for r in retrieval_results]
}
detailed_results.append(detailed_result)
except Exception as e:
logging.error(f"查询 {i} 评估失败: {e}")
# 添加空结果
retrieval_predictions.append([])
generation_predictions.append("")
detailed_results.append({
'query_id': i,
'query': query,
'error': str(e)
})
# Evaluate retrieval performance
retrieval_metrics = self.retrieval_evaluator.evaluate(
retrieval_predictions,
dataset.ground_truth_chunks
)
# Evaluate generation performance
generation_metrics = self.generation_evaluator.evaluate(
generation_predictions,
dataset.reference_answers,
dataset.contexts
)
# Compute the overall score
overall_score = self._calculate_overall_score(retrieval_metrics, generation_metrics)
# Assemble the evaluation result
result = EvaluationResult(
dataset_name=dataset_name,
timestamp=datetime.now().isoformat(),
retrieval_metrics=retrieval_metrics,
generation_metrics=generation_metrics,
overall_score=overall_score,
detailed_results=detailed_results,
config=self._get_system_config(rag_system)
)
# Save the results
self._save_results(result)
logging.info(f"Evaluation finished, overall score: {overall_score:.3f}")
return result
def _calculate_overall_score(self,
retrieval_metrics: RetrievalEvaluation,
generation_metrics: GenerationEvaluation) -> float:
"""计算总体分数"""
# 检索分数(权重0.4)
retrieval_score = (
retrieval_metrics.precision_at_k.get(5, 0) * 0.3 +
retrieval_metrics.recall_at_k.get(5, 0) * 0.3 +
retrieval_metrics.ndcg_at_k.get(5, 0) * 0.4
)
# Generation score (weight 0.6)
generation_score = (
generation_metrics.semantic_similarity * 0.25 +
generation_metrics.factual_accuracy * 0.25 +
generation_metrics.relevance_score * 0.25 +
generation_metrics.coherence_score * 0.25
)
# Weighted overall score
overall_score = retrieval_score * 0.4 + generation_score * 0.6
return overall_score
def _get_system_config(self, rag_system) -> Dict[str, Any]:
"""获取系统配置"""
try:
return {
'retrieval_config': getattr(rag_system, 'retrieval_config', {}),
'generation_config': getattr(rag_system, 'generation_config', {}),
'chunking_config': getattr(rag_system, 'chunking_config', {})
}
except Exception:
return {}
def _save_results(self, result: EvaluationResult):
"""保存评估结果"""
timestamp = result.timestamp.replace(':', '-').replace('.', '-')
filename = f"{result.dataset_name}_{timestamp}.json"
filepath = self.output_dir / filename
# Convert to a JSON-serializable structure
result_dict = {
'dataset_name': result.dataset_name,
'timestamp': result.timestamp,
'overall_score': result.overall_score,
'retrieval_metrics': asdict(result.retrieval_metrics),
'generation_metrics': asdict(result.generation_metrics),
'detailed_results': result.detailed_results,
'config': result.config
}
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(result_dict, f, ensure_ascii=False, indent=2)
logging.info(f"评估结果已保存到: {filepath}")
def load_dataset(self, dataset_path: str) -> EvaluationDataset:
"""加载评估数据集"""
with open(dataset_path, 'r', encoding='utf-8') as f:
data = json.load(f)
return EvaluationDataset(
queries=data['queries'],
ground_truth_chunks=data['ground_truth_chunks'],
reference_answers=data['reference_answers'],
contexts=data.get('contexts'),
metadata=data.get('metadata', {})
)
def create_dataset(self,
queries: List[str],
ground_truth_chunks: List[List[str]],
reference_answers: List[str],
contexts: List[str] = None,
save_path: str = None) -> EvaluationDataset:
"""创建评估数据集"""
dataset = EvaluationDataset(
queries=queries,
ground_truth_chunks=ground_truth_chunks,
reference_answers=reference_answers,
contexts=contexts
)
if save_path:
dataset_dict = {
'queries': dataset.queries,
'ground_truth_chunks': dataset.ground_truth_chunks,
'reference_answers': dataset.reference_answers,
'contexts': dataset.contexts,
'metadata': dataset.metadata
}
with open(save_path, 'w', encoding='utf-8') as f:
json.dump(dataset_dict, f, ensure_ascii=False, indent=2)
logging.info(f"数据集已保存到: {save_path}")
return dataset
def compare_results(self, result_files: List[str]) -> Dict[str, Any]:
"""比较多个评估结果"""
results = []
for file_path in result_files:
with open(file_path, 'r', encoding='utf-8') as f:
result = json.load(f)
results.append(result)
# Build the comparison report
comparison = {
'summary': {
'num_results': len(results),
'datasets': [r['dataset_name'] for r in results],
'timestamps': [r['timestamp'] for r in results]
},
'overall_scores': [r['overall_score'] for r in results],
'best_result_index': max(range(len(results)), key=lambda i: results[i]['overall_score']),
'metric_comparison': self._compare_metrics(results)
}
return comparison
def _compare_metrics(self, results: List[Dict[str, Any]]) -> Dict[str, Any]:
"""比较指标"""
comparison = {
'retrieval_metrics': {},
'generation_metrics': {}
}
# Compare retrieval metrics
for metric in ['precision_at_k', 'recall_at_k', 'ndcg_at_k']:
comparison['retrieval_metrics'][metric] = {
'values': [r['retrieval_metrics'][metric] for r in results],
'best_index': max(range(len(results)),
key=lambda i: sum(r['retrieval_metrics'][metric].values()))
}
# Compare generation metrics
for metric in ['semantic_similarity', 'factual_accuracy', 'relevance_score', 'coherence_score']:
values = [r['generation_metrics'][metric] for r in results]
comparison['generation_metrics'][metric] = {
'values': values,
'best_index': max(range(len(results)), key=lambda i: values[i])
}
return comparison
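Putting the two evaluators and the pipeline together looks roughly like the sketch below. The RAG system object (my_rag_system here) is hypothetical; the only contract the pipeline relies on is a query(question) method returning a dict with 'answer' and 'retrieval_results' keys, as read in evaluate_rag_system(). The class names are assumed to be importable from the evaluation modules above.
# demo_pipeline.py - wiring sketch, illustrative only
pipeline = EvaluationPipeline(
    retrieval_evaluator=RetrievalEvaluator(k_values=[1, 3, 5]),
    generation_evaluator=GenerationEvaluator(embedding_model=None),
    output_dir="./evaluation_results"
)

# Build (and optionally persist) a small evaluation dataset
dataset = pipeline.create_dataset(
    queries=["什么是RAG?"],
    ground_truth_chunks=[["chunk_001", "chunk_007"]],
    reference_answers=["RAG是一种将检索与生成结合的问答架构。"],
    contexts=["RAG(Retrieval-Augmented Generation)先检索相关文档,再基于文档生成答案。"],
    save_path="./evaluation_results/demo_dataset.json"
)

result = pipeline.evaluate_rag_system(my_rag_system, dataset, dataset_name="demo")
print(f"overall score: {result.overall_score:.3f}")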
3. A/B Testing Framework
3.1 Online Evaluation System
# src/evaluation/ab_testing.py - A/B testing framework
from typing import List, Dict, Any, Optional, Callable, Tuple
from dataclasses import dataclass
import random
import logging
import json
from datetime import datetime, timedelta
from collections import defaultdict
import numpy as np
from scipy import stats
@dataclass
class ABTestConfig:
"""A/B测试配置"""
test_name: str
traffic_split: Dict[str, float] # traffic allocation ratios
min_sample_size: int = 100
confidence_level: float = 0.95
test_duration_days: int = 7
primary_metric: str = "overall_score"
secondary_metrics: List[str] = None
def __post_init__(self):
if self.secondary_metrics is None:
self.secondary_metrics = ["response_time", "user_satisfaction"]
# Validate the traffic split
total_traffic = sum(self.traffic_split.values())
if abs(total_traffic - 1.0) > 0.01:
raise ValueError(f"Traffic split must sum to 1.0, got: {total_traffic}")
@dataclass
class ABTestResult:
"""A/B测试结果"""
test_name: str
variant_name: str
sample_size: int
metric_values: Dict[str, List[float]]
mean_values: Dict[str, float]
confidence_intervals: Dict[str, Tuple[float, float]]
class ABTestManager:
"""A/B测试管理器"""
def __init__(self, config: ABTestConfig):
self.config = config
self.variants = {} # the RAG system behind each variant
self.results = defaultdict(lambda: defaultdict(list)) # collected metric values per variant
self.user_assignments = {} # user-to-variant assignments
self.test_start_time = datetime.now()
def register_variant(self, variant_name: str, rag_system):
"""注册测试变体"""
if variant_name not in self.config.traffic_split:
raise ValueError(f"变体 {variant_name} 不在流量分配配置中")
self.variants[variant_name] = rag_system
logging.info(f"已注册变体: {variant_name}")
def assign_user_to_variant(self, user_id: str) -> str:
"""为用户分配变体"""
if user_id in self.user_assignments:
return self.user_assignments[user_id]
# Deterministic, restart-stable assignment based on a hash of the user ID
# (the built-in hash() is salted per process, so use hashlib instead)
import hashlib
digest = int(hashlib.md5(user_id.encode("utf-8")).hexdigest(), 16)
rand_val = (digest % 10**8) / 10**8
cumulative_prob = 0.0
for variant_name, prob in self.config.traffic_split.items():
cumulative_prob += prob
if rand_val <= cumulative_prob:
self.user_assignments[user_id] = variant_name
return variant_name
# Fall back to the first variant
default_variant = list(self.config.traffic_split.keys())[0]
self.user_assignments[user_id] = default_variant
return default_variant
def process_query(self, user_id: str, query: str) -> Dict[str, Any]:
"""处理用户查询并记录结果"""
variant_name = self.assign_user_to_variant(user_id)
if variant_name not in self.variants:
raise ValueError(f"变体 {variant_name} 未注册")
rag_system = self.variants[variant_name]
# Record the start time
start_time = datetime.now()
try:
# Execute the query
result = rag_system.query(query)
# Record the response time
response_time = (datetime.now() - start_time).total_seconds()
# Record metrics
self._record_metrics(variant_name, {
'response_time': response_time,
'success': True,
'query_length': len(query),
'answer_length': len(result.get('answer', '')),
'num_retrieved_docs': len(result.get('retrieval_results', []))
})
return {
'variant': variant_name,
'result': result,
'response_time': response_time
}
except Exception as e:
# Record the failure
response_time = (datetime.now() - start_time).total_seconds()
self._record_metrics(variant_name, {
'response_time': response_time,
'success': False,
'error': str(e)
})
raise
def record_user_feedback(self, user_id: str, satisfaction_score: float):
"""记录用户反馈"""
if user_id not in self.user_assignments:
logging.warning(f"用户 {user_id} 未分配变体")
return
variant_name = self.user_assignments[user_id]
self._record_metrics(variant_name, {
'user_satisfaction': satisfaction_score
})
def _record_metrics(self, variant_name: str, metrics: Dict[str, Any]):
"""记录指标数据"""
for metric_name, value in metrics.items():
if isinstance(value, (int, float)):
self.results[variant_name][metric_name].append(value)
def analyze_results(self) -> Dict[str, Any]:
"""分析A/B测试结果"""
analysis = {
'test_config': {
'test_name': self.config.test_name,
'start_time': self.test_start_time.isoformat(),
'duration_days': (datetime.now() - self.test_start_time).days,
'traffic_split': self.config.traffic_split
},
'variant_results': {},
'statistical_tests': {},
'recommendations': []
}
# Analyze each variant's results
variant_results = {}
for variant_name in self.variants.keys():
variant_results[variant_name] = self._analyze_variant(variant_name)
analysis['variant_results'] = variant_results
# Run statistical significance tests
analysis['statistical_tests'] = self._perform_statistical_tests(variant_results)
# Generate recommendations
analysis['recommendations'] = self._generate_recommendations(analysis)
return analysis
def _analyze_variant(self, variant_name: str) -> ABTestResult:
"""分析单个变体的结果"""
variant_data = self.results[variant_name]
metric_values = {}
mean_values = {}
confidence_intervals = {}
for metric_name, values in variant_data.items():
if values:
metric_values[metric_name] = values
mean_values[metric_name] = np.mean(values)
# Compute the confidence interval
if len(values) > 1:
ci = stats.t.interval(
self.config.confidence_level,
len(values) - 1,
loc=np.mean(values),
scale=stats.sem(values)
)
confidence_intervals[metric_name] = ci
else:
confidence_intervals[metric_name] = (mean_values[metric_name], mean_values[metric_name])
sample_size = len(variant_data.get(self.config.primary_metric, []))
return ABTestResult(
test_name=self.config.test_name,
variant_name=variant_name,
sample_size=sample_size,
metric_values=metric_values,
mean_values=mean_values,
confidence_intervals=confidence_intervals
)
def _perform_statistical_tests(self, variant_results: Dict[str, ABTestResult]) -> Dict[str, Any]:
"""执行统计检验"""
statistical_tests = {}
variant_names = list(variant_results.keys())
if len(variant_names) < 2:
return statistical_tests
# Pairwise comparisons between variants for each metric
for metric_name in [self.config.primary_metric] + self.config.secondary_metrics:
metric_tests = {}
for i in range(len(variant_names)):
for j in range(i + 1, len(variant_names)):
variant_a = variant_names[i]
variant_b = variant_names[j]
values_a = variant_results[variant_a].metric_values.get(metric_name, [])
values_b = variant_results[variant_b].metric_values.get(metric_name, [])
if len(values_a) >= self.config.min_sample_size and len(values_b) >= self.config.min_sample_size:
# Run an independent two-sample t-test
t_stat, p_value = stats.ttest_ind(values_a, values_b)
# Compute the effect size (Cohen's d)
pooled_std = np.sqrt(((len(values_a) - 1) * np.var(values_a, ddof=1) +
(len(values_b) - 1) * np.var(values_b, ddof=1)) /
(len(values_a) + len(values_b) - 2))
if pooled_std > 0:
cohens_d = (np.mean(values_a) - np.mean(values_b)) / pooled_std
else:
cohens_d = 0.0
metric_tests[f"{variant_a}_vs_{variant_b}"] = {
't_statistic': t_stat,
'p_value': p_value,
'cohens_d': cohens_d,
'significant': p_value < (1 - self.config.confidence_level),
'sample_sizes': [len(values_a), len(values_b)]
}
if metric_tests:
statistical_tests[metric_name] = metric_tests
return statistical_tests
def _generate_recommendations(self, analysis: Dict[str, Any]) -> List[str]:
"""生成测试建议"""
recommendations = []
variant_results = analysis['variant_results']
statistical_tests = analysis['statistical_tests']
# Check sample sizes
for variant_name, result in variant_results.items():
if result.sample_size < self.config.min_sample_size:
recommendations.append(
f"Variant {variant_name} has a sample size ({result.sample_size}) below the minimum requirement ({self.config.min_sample_size}); continue collecting data"
)
# Check significance on the primary metric
primary_metric_tests = statistical_tests.get(self.config.primary_metric, {})
if primary_metric_tests:
best_variant = max(variant_results.keys(),
key=lambda v: variant_results[v].mean_values.get(self.config.primary_metric, 0))
significant_improvements = []
for test_name, test_result in primary_metric_tests.items():
if test_result['significant'] and abs(test_result['cohens_d']) > 0.2: # at least a small effect size
significant_improvements.append(test_name)
if significant_improvements:
recommendations.append(
f"Variant {best_variant} performs best on the primary metric {self.config.primary_metric}; consider adopting it"
)
else:
recommendations.append(
"The variants show no significant difference on the primary metric; continue testing or consider other optimization directions"
)
# Check the test duration
test_duration = (datetime.now() - self.test_start_time).days
if test_duration < self.config.test_duration_days:
recommendations.append(
f"The test has run for {test_duration} days, less than the recommended {self.config.test_duration_days} days; keep it running"
)
return recommendations
def save_results(self, filepath: str):
"""保存测试结果"""
analysis = self.analyze_results()
with open(filepath, 'w', encoding='utf-8') as f:
json.dump(analysis, f, ensure_ascii=False, indent=2, default=str)
logging.info(f"A/B测试结果已保存到: {filepath}")
Chapter Summary
This chapter took a detailed look at evaluation and optimization techniques for RAG systems:
Key Points
Evaluation metric system:
- Retrieval metrics: Precision@K, Recall@K, NDCG, MAP, and others (standard definitions below)
- Generation metrics: BLEU, ROUGE, BERTScore, semantic similarity, and others
- Combined evaluation: an overall score that aggregates metrics across dimensions
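For reference, the retrieval metrics implemented in this chapter use binary relevance and follow the standard definitions. For a single query, let $R$ be the set of relevant chunks and $T_K$ the top-$K$ retrieved chunks:

$$\text{Precision@}K = \frac{|R \cap T_K|}{K}, \qquad \text{Recall@}K = \frac{|R \cap T_K|}{|R|}, \qquad \text{AP} = \frac{1}{|R|} \sum_{i:\, rel_i = 1} \text{Precision@}i$$

$$\text{DCG@}K = \sum_{i=1}^{K} \frac{rel_i}{\log_2(i+1)}, \qquad \text{NDCG@}K = \frac{\text{DCG@}K}{\text{IDCG@}K}, \qquad \text{MRR} = \frac{1}{|Q|} \sum_{q \in Q} \frac{1}{\text{rank}_q}$$

MAP averages AP over the query set $Q$, and MRR uses the rank of the first relevant result per query; these are exactly the quantities computed by RetrievalEvaluator above.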
Automated evaluation:
- Evaluation pipeline: a standardized evaluation workflow
- Dataset management: creating and maintaining evaluation data
- Result comparison: side-by-side analysis of multiple system versions
A/B testing framework:
- Online evaluation: testing with real user traffic
- Statistical analysis: principled significance testing
- Decision support: data-driven optimization recommendations
Best Practices
Evaluation design:
- Build a high-quality evaluation dataset
- Choose metrics that fit the task
- Balance automated and human evaluation
Continuous optimization:
- Establish a regular evaluation cadence
- Monitor key performance indicators
- Keep improving based on user feedback
Experiment management:
- Design A/B tests rigorously
- Ensure a sufficient sample size (a quick power calculation is sketched below)
- Account for long-term effects and overall user experience
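To make the sample-size point concrete, the sketch below estimates how many samples each variant needs before a difference of a given effect size can plausibly be detected. It is a rough normal-approximation power calculation for a two-sided, two-sample comparison (the kind of t-test ABTestManager runs), not project code; effect_size is Cohen's d.
# sample_size.py - rough power calculation (illustrative)
import numpy as np
from scipy.stats import norm

def required_sample_size(effect_size: float, alpha: float = 0.05, power: float = 0.8) -> int:
    """Approximate per-variant sample size for a two-sided two-sample test."""
    z_alpha = norm.ppf(1 - alpha / 2)  # critical value for the significance level
    z_beta = norm.ppf(power)           # critical value for the desired power
    n = 2 * ((z_alpha + z_beta) / effect_size) ** 2
    return int(np.ceil(n))

print(required_sample_size(0.2))  # small effect:  ~393 samples per variant
print(required_sample_size(0.5))  # medium effect: ~63 samples per variant
A small effect already requires far more than the min_sample_size=100 default used earlier, so that threshold should be treated as a floor for early sanity checks rather than a stopping rule.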
The next chapter covers deployment and operations for RAG systems, including system architecture design, performance optimization, and monitoring and alerting.