Chapter Overview
Intelligent chunking is a critical stage in a RAG system: it directly affects retrieval precision and generation quality. This chapter takes a close look at several chunking strategies, vectorization techniques, and similarity computation methods, and at how to optimize chunking results.
Learning Objectives
- Master multiple text chunking strategies and algorithms
- Learn vectorization techniques and how to choose an embedding model
- Understand similarity computation and retrieval optimization methods
- Become familiar with chunk quality evaluation and tuning techniques
- Master the use and management of vector databases
1. Intelligent Chunking Strategies
1.1 Base Chunker
# src/chunking/base_chunker.py - base chunker
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
import hashlib
import re

from ..data.loaders import Document


@dataclass
class Chunk:
    """Data structure for a text chunk."""
    content: str
    metadata: Dict[str, Any]
    chunk_id: Optional[str] = None
    doc_id: Optional[str] = None
    start_index: Optional[int] = None
    end_index: Optional[int] = None

    def __post_init__(self):
        # Derive a stable short ID from the content and metadata if none was given
        if self.chunk_id is None:
            self.chunk_id = hashlib.md5(
                (self.content + str(self.metadata)).encode()
            ).hexdigest()[:12]
class BaseChunker(ABC):
    """Base class for chunkers."""
    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    @abstractmethod
    def split_text(self, text: str) -> List[str]:
        """Split text into pieces."""
        pass

    def create_chunks(self, document: Document) -> List[Chunk]:
        """Create chunks from a document."""
        text_chunks = self.split_text(document.content)
        chunks = []
        for i, chunk_text in enumerate(text_chunks):
            chunk_metadata = document.metadata.copy()
            chunk_metadata.update({
                "chunk_index": i,
                "total_chunks": len(text_chunks),
                "chunk_size": len(chunk_text),
                "word_count": len(chunk_text.split()),
                "source_doc_id": document.doc_id
            })
            chunk = Chunk(
                content=chunk_text,
                metadata=chunk_metadata,
                doc_id=document.doc_id,
                start_index=self._get_start_index(document.content, chunk_text, i),
                end_index=self._get_end_index(document.content, chunk_text, i)
            )
            chunks.append(chunk)
        return chunks

    def _get_start_index(self, full_text: str, chunk_text: str, chunk_index: int) -> int:
        """Locate the chunk's start position in the original text."""
        # str.find returns -1 when the chunk text was altered (e.g. stripped),
        # so fall back to an estimate based on chunk size and overlap
        index = full_text.find(chunk_text)
        if index != -1:
            return index
        return chunk_index * (self.chunk_size - self.chunk_overlap)

    def _get_end_index(self, full_text: str, chunk_text: str, chunk_index: int) -> int:
        """Locate the chunk's end position in the original text."""
        start_index = self._get_start_index(full_text, chunk_text, chunk_index)
        return start_index + len(chunk_text)
class FixedSizeChunker(BaseChunker):
    """Fixed-size chunker."""
    def split_text(self, text: str) -> List[str]:
        """Split text into fixed-size chunks."""
        if not text:
            return []
        chunks = []
        start = 0
        while start < len(text):
            end = min(start + self.chunk_size, len(text))
            # If this is not the last chunk, try to break at a word boundary
            if end < len(text):
                boundary = end
                while boundary > start and text[boundary] not in ' \n\t':
                    boundary -= 1
                # If no suitable break point was found, keep the original cut position
                if boundary > start:
                    end = boundary
            chunk = text[start:end].strip()
            if chunk:
                chunks.append(chunk)
            if end >= len(text):
                break
            # Start the next chunk with overlap, always advancing to avoid an infinite loop
            start = max(end - self.chunk_overlap, start + 1)
        return chunks
class SentenceChunker(BaseChunker):
    """Sentence-level chunker."""
    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200, language: str = "zh"):
        super().__init__(chunk_size, chunk_overlap)
        self.language = language
        self._compile_sentence_patterns()

    def _compile_sentence_patterns(self):
        """Compile the sentence-splitting pattern."""
        if self.language == "zh":
            # Chinese sentence boundaries
            self.sentence_pattern = re.compile(r'[。!?;\n]+')
        else:
            # English sentence boundaries
            self.sentence_pattern = re.compile(r'[.!?;\n]+')

    def split_text(self, text: str) -> List[str]:
        """Split text by sentences."""
        if not text:
            return []
        # Split into sentences
        sentences = self.sentence_pattern.split(text)
        sentences = [s.strip() for s in sentences if s.strip()]
        if not sentences:
            return [text]
        chunks = []
        current_chunk = ""
        current_size = 0
        for sentence in sentences:
            sentence_size = len(sentence)
            # If adding this sentence would exceed the chunk size, close the current chunk
            if current_size + sentence_size > self.chunk_size and current_chunk:
                chunks.append(current_chunk.strip())
                # Carry overlap text from the closed chunk into the next one
                if self.chunk_overlap > 0:
                    overlap_text = self._get_overlap_text(current_chunk, self.chunk_overlap)
                    current_chunk = overlap_text + " " + sentence
                else:
                    current_chunk = sentence
                current_size = len(current_chunk)
            else:
                if current_chunk:
                    current_chunk += " " + sentence
                else:
                    current_chunk = sentence
                current_size = len(current_chunk)
        # Append the final chunk
        if current_chunk.strip():
            chunks.append(current_chunk.strip())
        return chunks

    def _get_overlap_text(self, text: str, overlap_size: int) -> str:
        """Take overlap text from the tail of a chunk."""
        if len(text) <= overlap_size:
            return text
        overlap_text = text[-overlap_size:]
        # Sentence punctuation was removed during splitting, so avoid starting
        # the overlap mid-word by cutting at the first whitespace instead
        pos = overlap_text.find(' ')
        if pos != -1:
            return overlap_text[pos + 1:]
        return overlap_text
class SemanticChunker(BaseChunker):
    """Semantic chunker."""
    def __init__(self,
                 chunk_size: int = 1000,
                 chunk_overlap: int = 200,
                 similarity_threshold: float = 0.7,
                 embedding_model=None):
        super().__init__(chunk_size, chunk_overlap)
        self.similarity_threshold = similarity_threshold
        self.embedding_model = embedding_model

    def split_text(self, text: str) -> List[str]:
        """Split text based on semantic similarity."""
        if not text or not self.embedding_model:
            # Fall back to sentence chunking when no embedding model is available
            return SentenceChunker(self.chunk_size, self.chunk_overlap).split_text(text)
        # First split into sentences
        sentence_chunker = SentenceChunker()
        sentences = sentence_chunker.sentence_pattern.split(text)
        sentences = [s.strip() for s in sentences if s.strip()]
        if len(sentences) <= 1:
            return [text]
        # Compute sentence embeddings
        embeddings = self._get_embeddings(sentences)
        # Merge sentences based on similarity
        chunks = []
        current_chunk_sentences = [sentences[0]]
        current_chunk_embeddings = [embeddings[0]]
        current_chunk_size = len(sentences[0])
        for i in range(1, len(sentences)):
            sentence = sentences[i]
            sentence_size = len(sentence)
            # Similarity between this sentence and the current chunk
            # (mean of the embeddings of the sentences already in the chunk)
            similarity = self._calculate_similarity(
                embeddings[i],
                self._get_chunk_embedding(current_chunk_embeddings)
            )
            # Merge only if the sentence is similar enough and the chunk still has room
            should_merge = (
                similarity >= self.similarity_threshold and
                current_chunk_size + sentence_size <= self.chunk_size
            )
            if should_merge:
                current_chunk_sentences.append(sentence)
                current_chunk_embeddings.append(embeddings[i])
                current_chunk_size += sentence_size
            else:
                # Close the current chunk and start a new one
                chunks.append(" ".join(current_chunk_sentences))
                current_chunk_sentences = [sentence]
                current_chunk_embeddings = [embeddings[i]]
                current_chunk_size = sentence_size
        # Append the final chunk
        if current_chunk_sentences:
            chunks.append(" ".join(current_chunk_sentences))
        return chunks

    def _get_embeddings(self, sentences: List[str]) -> List:
        """Compute sentence embeddings."""
        try:
            return self.embedding_model.encode(sentences)
        except Exception as e:
            print(f"Failed to compute embeddings: {e}")
            return [None] * len(sentences)

    def _get_chunk_embedding(self, embeddings: List) -> Any:
        """Mean embedding of the sentences currently in the chunk."""
        try:
            import numpy as np
            valid_embeddings = [emb for emb in embeddings if emb is not None]
            if valid_embeddings:
                return np.mean(valid_embeddings, axis=0)
            return None
        except Exception:
            return None

    def _calculate_similarity(self, emb1, emb2) -> float:
        """Cosine similarity between two embeddings."""
        try:
            import numpy as np
            from sklearn.metrics.pairwise import cosine_similarity
            if emb1 is None or emb2 is None:
                return 0.0
            return float(cosine_similarity([emb1], [emb2])[0][0])
        except Exception:
            return 0.0
class ChunkerFactory:
    """Chunker factory."""
    _chunkers = {
        "fixed": FixedSizeChunker,
        "sentence": SentenceChunker,
        "semantic": SemanticChunker
    }

    @classmethod
    def create_chunker(cls, chunker_type: str, **kwargs) -> BaseChunker:
        """Create a chunker of the given type."""
        if chunker_type not in cls._chunkers:
            raise ValueError(f"Unsupported chunker type: {chunker_type}")
        chunker_class = cls._chunkers[chunker_type]
        return chunker_class(**kwargs)

    @classmethod
    def get_available_chunkers(cls) -> List[str]:
        """List the available chunker types."""
        return list(cls._chunkers.keys())
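To make the factory interface concrete, here is a minimal usage sketch. It assumes that `Document` from `src/data/loaders.py` can be constructed with `content`, `metadata`, and `doc_id` (the fields used by `create_chunks` above); the sample text and import paths are illustrative only.
# example_chunking.py - minimal usage sketch (paths and Document constructor are assumptions)
from src.data.loaders import Document                   # assumed import path
from src.chunking.base_chunker import ChunkerFactory    # assumed import path

doc = Document(                                          # hypothetical sample document
    content="RAG systems retrieve relevant chunks before generation. "
            "Good chunking keeps each chunk semantically self-contained. "
            "Overlap preserves context across chunk boundaries.",
    metadata={"source": "example.txt"},
    doc_id="doc-001",
)

# Sentence-level chunking with a small chunk size for demonstration
chunker = ChunkerFactory.create_chunker("sentence", chunk_size=120, chunk_overlap=30, language="en")
for chunk in chunker.create_chunks(doc):
    print(chunk.chunk_id, chunk.metadata["chunk_index"], len(chunk.content))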
2. Vectorization Techniques
2.1 Embedding Model Management
# src/embeddings/embedding_manager.py - embedding model management
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional, Union
import numpy as np
from dataclasses import dataclass
import logging
from pathlib import Path


@dataclass
class EmbeddingConfig:
    """Embedding configuration."""
    model_name: str
    model_path: Optional[str] = None
    device: str = "cpu"  # cpu, cuda, mps
    batch_size: int = 32
    max_length: int = 512
    normalize: bool = True
    cache_embeddings: bool = True
    cache_dir: Optional[str] = None
class BaseEmbeddingModel(ABC):
    """Base class for embedding models."""
    def __init__(self, config: EmbeddingConfig):
        self.config = config
        self.model = None
        self.tokenizer = None
        self._setup_cache()

    def _setup_cache(self):
        """Set up the in-memory embedding cache."""
        if self.config.cache_embeddings:
            cache_dir = self.config.cache_dir or "./cache/embeddings"
            self.cache_dir = Path(cache_dir)
            self.cache_dir.mkdir(parents=True, exist_ok=True)
            self.embedding_cache = {}
        else:
            self.embedding_cache = None

    @abstractmethod
    def load_model(self):
        """Load the model."""
        pass

    @abstractmethod
    def encode(self, texts: Union[str, List[str]]) -> np.ndarray:
        """Encode texts into embeddings."""
        pass

    def get_embedding_dimension(self) -> int:
        """Return the embedding dimension."""
        test_embedding = self.encode(["test"])
        return test_embedding.shape[1]

    def _get_cache_key(self, text: str) -> str:
        """Build a cache key for a text."""
        import hashlib
        return hashlib.md5(text.encode()).hexdigest()

    def _get_cached_embedding(self, text: str) -> Optional[np.ndarray]:
        """Look up a cached embedding."""
        if not self.embedding_cache:
            return None
        cache_key = self._get_cache_key(text)
        return self.embedding_cache.get(cache_key)

    def _cache_embedding(self, text: str, embedding: np.ndarray):
        """Store an embedding in the cache."""
        if self.embedding_cache is not None:
            cache_key = self._get_cache_key(text)
            self.embedding_cache[cache_key] = embedding
class SentenceTransformerModel(BaseEmbeddingModel):
    """Sentence Transformers model."""
    def load_model(self):
        """Load a Sentence Transformers model."""
        try:
            from sentence_transformers import SentenceTransformer
            model_path = self.config.model_path or self.config.model_name
            self.model = SentenceTransformer(model_path, device=self.config.device)
            logging.info(f"Loaded Sentence Transformers model: {model_path}")
        except ImportError:
            raise RuntimeError("Please install sentence-transformers: pip install sentence-transformers")
        except Exception as e:
            raise RuntimeError(f"Failed to load model: {e}")

    def encode(self, texts: Union[str, List[str]]) -> np.ndarray:
        """Encode texts."""
        if self.model is None:
            self.load_model()
        # Accept a single string as well as a list
        if isinstance(texts, str):
            texts = [texts]
        if not texts:
            return np.array([])
        # Check the cache first
        cached_embeddings = []
        uncached_texts = []
        uncached_indices = []
        for i, text in enumerate(texts):
            cached_emb = self._get_cached_embedding(text)
            if cached_emb is not None:
                cached_embeddings.append((i, cached_emb))
            else:
                uncached_texts.append(text)
                uncached_indices.append(i)
        # Encode texts that were not cached
        if uncached_texts:
            try:
                new_embeddings = self.model.encode(
                    uncached_texts,
                    batch_size=self.config.batch_size,
                    normalize_embeddings=self.config.normalize,
                    convert_to_numpy=True
                )
                # Cache the new embeddings
                for text, embedding in zip(uncached_texts, new_embeddings):
                    self._cache_embedding(text, embedding)
            except Exception as e:
                logging.error(f"Failed to encode texts: {e}")
                raise
        else:
            new_embeddings = np.array([])
        # Merge cached and newly computed embeddings, preserving input order.
        # Derive the dimension from embeddings we already have rather than calling
        # get_embedding_dimension(), which would recurse back into encode().
        if uncached_texts:
            dimension = new_embeddings.shape[1]
        else:
            dimension = cached_embeddings[0][1].shape[0]
        all_embeddings = np.zeros((len(texts), dimension))
        # Fill in cached embeddings
        for idx, embedding in cached_embeddings:
            all_embeddings[idx] = embedding
        # Fill in new embeddings
        for i, idx in enumerate(uncached_indices):
            all_embeddings[idx] = new_embeddings[i]
        return all_embeddings
class OpenAIEmbeddingModel(BaseEmbeddingModel):
    """OpenAI embedding model (uses the legacy openai<1.0 SDK interface)."""
    def __init__(self, config: EmbeddingConfig, api_key: str):
        super().__init__(config)
        self.api_key = api_key
        self.client = None

    def load_model(self):
        """Initialize the OpenAI client."""
        try:
            import openai
            openai.api_key = self.api_key
            self.client = openai
            logging.info(f"Initialized OpenAI client: {self.config.model_name}")
        except ImportError:
            raise RuntimeError("Please install the openai library: pip install openai")

    def encode(self, texts: Union[str, List[str]]) -> np.ndarray:
        """Encode texts via the OpenAI API."""
        if self.client is None:
            self.load_model()
        if isinstance(texts, str):
            texts = [texts]
        embeddings = []
        for text in texts:
            # Check the cache first
            cached_emb = self._get_cached_embedding(text)
            if cached_emb is not None:
                embeddings.append(cached_emb)
                continue
            try:
                # Note: max_length truncates by characters here, not tokens
                response = self.client.Embedding.create(
                    model=self.config.model_name,
                    input=text[:self.config.max_length]
                )
                embedding = np.array(response['data'][0]['embedding'])
                if self.config.normalize:
                    embedding = embedding / np.linalg.norm(embedding)
                self._cache_embedding(text, embedding)
                embeddings.append(embedding)
            except Exception as e:
                logging.error(f"OpenAI API call failed: {e}")
                # Fall back to a zero vector
                embedding = np.zeros(1536)  # dimension of text-embedding-ada-002
                embeddings.append(embedding)
        return np.array(embeddings)
class EmbeddingModelFactory:
    """Embedding model factory."""
    _models = {
        "sentence_transformer": SentenceTransformerModel,
        "openai": OpenAIEmbeddingModel
    }

    @classmethod
    def create_model(cls, model_type: str, config: EmbeddingConfig, **kwargs) -> BaseEmbeddingModel:
        """Create an embedding model of the given type."""
        if model_type not in cls._models:
            raise ValueError(f"Unsupported model type: {model_type}")
        model_class = cls._models[model_type]
        return model_class(config, **kwargs)

    @classmethod
    def get_available_models(cls) -> List[str]:
        """List the available model types."""
        return list(cls._models.keys())
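A minimal usage sketch for the factory above. The checkpoint name `all-MiniLM-L6-v2` is just one commonly used sentence-transformers model and, like the import path and sample inputs, is an assumption for illustration.
# example_embeddings.py - minimal usage sketch (model name and paths are assumptions)
from src.embeddings.embedding_manager import EmbeddingConfig, EmbeddingModelFactory  # assumed import path

config = EmbeddingConfig(
    model_name="all-MiniLM-L6-v2",   # assumed checkpoint; any sentence-transformers model works
    device="cpu",
    batch_size=16,
    normalize=True,
)
model = EmbeddingModelFactory.create_model("sentence_transformer", config)

texts = [
    "What is retrieval-augmented generation?",
    "Chunking splits documents into retrievable units.",
]
vectors = model.encode(texts)                     # shape: (2, embedding_dimension)
print(vectors.shape, model.get_embedding_dimension())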
3. Vector Database Integration
3.1 Vector Store Management
# src/vectorstore/vector_manager.py - vector store management
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional
import numpy as np
from dataclasses import dataclass
import logging
import json


@dataclass
class VectorSearchResult:
    """A single vector search result."""
    chunk_id: str
    content: str
    metadata: Dict[str, Any]
    score: float
    embedding: Optional[np.ndarray] = None
class BaseVectorStore(ABC):
    """Base class for vector stores."""
    def __init__(self, dimension: int, collection_name: str = "default"):
        self.dimension = dimension
        self.collection_name = collection_name

    @abstractmethod
    def add_vectors(self,
                    vectors: np.ndarray,
                    metadatas: List[Dict[str, Any]],
                    ids: List[str]):
        """Add vectors."""
        pass

    @abstractmethod
    def search(self,
               query_vector: np.ndarray,
               top_k: int = 10,
               filter_dict: Optional[Dict[str, Any]] = None) -> List[VectorSearchResult]:
        """Search for similar vectors."""
        pass

    @abstractmethod
    def delete_vectors(self, ids: List[str]):
        """Delete vectors."""
        pass

    @abstractmethod
    def get_vector_count(self) -> int:
        """Return the number of stored vectors."""
        pass
class ChromaVectorStore(BaseVectorStore):
    """Chroma vector store."""
    def __init__(self, dimension: int, collection_name: str = "default", persist_directory: str = "./chroma_db"):
        super().__init__(dimension, collection_name)
        self.persist_directory = persist_directory
        self.client = None
        self.collection = None
        self._initialize()

    def _initialize(self):
        """Initialize the Chroma client."""
        try:
            import chromadb
            from chromadb.config import Settings
            self.client = chromadb.PersistentClient(
                path=self.persist_directory,
                settings=Settings(anonymized_telemetry=False)
            )
            # Get the collection if it exists, otherwise create it
            try:
                self.collection = self.client.get_collection(name=self.collection_name)
            except Exception:
                self.collection = self.client.create_collection(
                    name=self.collection_name,
                    metadata={"dimension": self.dimension}
                )
            logging.info(f"Initialized Chroma vector store: {self.collection_name}")
        except ImportError:
            raise RuntimeError("Please install chromadb: pip install chromadb")
        except Exception as e:
            raise RuntimeError(f"Failed to initialize Chroma: {e}")

    def add_vectors(self, vectors: np.ndarray, metadatas: List[Dict[str, Any]], ids: List[str]):
        """Add vectors to Chroma."""
        try:
            # The chunk text is not passed through this interface, so store a JSON
            # copy of the metadata in Chroma's documents field
            documents = [json.dumps(meta, ensure_ascii=False) for meta in metadatas]
            self.collection.add(
                embeddings=vectors.tolist(),
                documents=documents,
                metadatas=metadatas,
                ids=ids
            )
            logging.info(f"Added {len(ids)} vectors to Chroma")
        except Exception as e:
            logging.error(f"Failed to add vectors to Chroma: {e}")
            raise

    def search(self, query_vector: np.ndarray, top_k: int = 10, filter_dict: Optional[Dict[str, Any]] = None) -> List[VectorSearchResult]:
        """Search in Chroma."""
        try:
            results = self.collection.query(
                query_embeddings=[query_vector.tolist()],
                n_results=top_k,
                where=filter_dict
            )
            search_results = []
            for i in range(len(results['ids'][0])):
                result = VectorSearchResult(
                    chunk_id=results['ids'][0][i],
                    content=results['documents'][0][i] if results['documents'][0] else "",
                    metadata=results['metadatas'][0][i] if results['metadatas'][0] else {},
                    score=1 - results['distances'][0][i],  # convert distance to a similarity score
                    embedding=np.array(results['embeddings'][0][i]) if results.get('embeddings') else None
                )
                search_results.append(result)
            return search_results
        except Exception as e:
            logging.error(f"Chroma search failed: {e}")
            return []

    def delete_vectors(self, ids: List[str]):
        """Delete vectors from Chroma."""
        try:
            self.collection.delete(ids=ids)
            logging.info(f"Deleted {len(ids)} vectors from Chroma")
        except Exception as e:
            logging.error(f"Failed to delete vectors from Chroma: {e}")

    def get_vector_count(self) -> int:
        """Return the number of vectors in Chroma."""
        try:
            return self.collection.count()
        except Exception as e:
            logging.error(f"Failed to get Chroma vector count: {e}")
            return 0
class FAISSVectorStore(BaseVectorStore):
    """FAISS vector store."""
    def __init__(self, dimension: int, collection_name: str = "default", index_type: str = "flat"):
        super().__init__(dimension, collection_name)
        self.index_type = index_type
        self.index = None
        self.id_to_metadata = {}
        self.id_to_chunk_id = {}
        self._initialize()

    def _initialize(self):
        """Initialize the FAISS index."""
        try:
            import faiss
            if self.index_type == "flat":
                self.index = faiss.IndexFlatIP(self.dimension)  # inner-product index
            elif self.index_type == "ivf":
                quantizer = faiss.IndexFlatIP(self.dimension)
                self.index = faiss.IndexIVFFlat(quantizer, self.dimension, 100)
            else:
                raise ValueError(f"Unsupported index type: {self.index_type}")
            logging.info(f"Initialized FAISS index: {self.index_type}")
        except ImportError:
            raise RuntimeError("Please install faiss: pip install faiss-cpu or pip install faiss-gpu")
        except Exception as e:
            raise RuntimeError(f"Failed to initialize FAISS: {e}")

    def add_vectors(self, vectors: np.ndarray, metadatas: List[Dict[str, Any]], ids: List[str]):
        """Add vectors to FAISS."""
        try:
            # Normalize vectors so that inner product equals cosine similarity
            normalized_vectors = vectors / np.linalg.norm(vectors, axis=1, keepdims=True)
            normalized_vectors = normalized_vectors.astype(np.float32)
            # IVF indexes must be trained before vectors can be added
            if not self.index.is_trained:
                self.index.train(normalized_vectors)
            start_id = self.index.ntotal
            self.index.add(normalized_vectors)
            # Keep metadata and external chunk IDs keyed by FAISS row id
            for i, (chunk_id, metadata) in enumerate(zip(ids, metadatas)):
                faiss_id = start_id + i
                self.id_to_metadata[faiss_id] = metadata
                self.id_to_chunk_id[faiss_id] = chunk_id
            logging.info(f"Added {len(ids)} vectors to FAISS")
        except Exception as e:
            logging.error(f"Failed to add vectors to FAISS: {e}")
            raise

    def search(self, query_vector: np.ndarray, top_k: int = 10, filter_dict: Optional[Dict[str, Any]] = None) -> List[VectorSearchResult]:
        """Search in FAISS."""
        try:
            # Normalize the query vector
            normalized_query = query_vector / np.linalg.norm(query_vector)
            scores, indices = self.index.search(
                normalized_query.reshape(1, -1).astype(np.float32),
                top_k
            )
            search_results = []
            for score, idx in zip(scores[0], indices[0]):
                if idx == -1:  # FAISS returns -1 for missing results
                    continue
                metadata = self.id_to_metadata.get(int(idx), {})
                # Post-filtering may return fewer than top_k results
                if filter_dict and not self._match_filter(metadata, filter_dict):
                    continue
                result = VectorSearchResult(
                    chunk_id=self.id_to_chunk_id.get(int(idx), str(idx)),
                    content="",  # FAISS stores only vectors; chunk text is kept elsewhere
                    metadata=metadata,
                    score=float(score)
                )
                search_results.append(result)
            return search_results
        except Exception as e:
            logging.error(f"FAISS search failed: {e}")
            return []

    def _match_filter(self, metadata: Dict[str, Any], filter_dict: Dict[str, Any]) -> bool:
        """Check whether metadata matches the filter conditions."""
        for key, value in filter_dict.items():
            if key not in metadata or metadata[key] != value:
                return False
        return True

    def delete_vectors(self, ids: List[str]):
        """FAISS does not support in-place deletion; rebuild the index instead."""
        logging.warning("FAISS does not support deletion; consider rebuilding the index")

    def get_vector_count(self) -> int:
        """Return the number of vectors in FAISS."""
        return self.index.ntotal if self.index else 0
class VectorStoreFactory:
    """Vector store factory."""
    _stores = {
        "chroma": ChromaVectorStore,
        "faiss": FAISSVectorStore
    }

    @classmethod
    def create_store(cls, store_type: str, dimension: int, **kwargs) -> BaseVectorStore:
        """Create a vector store of the given type."""
        if store_type not in cls._stores:
            raise ValueError(f"Unsupported store type: {store_type}")
        store_class = cls._stores[store_type]
        return store_class(dimension, **kwargs)

    @classmethod
    def get_available_stores(cls) -> List[str]:
        """List the available store types."""
        return list(cls._stores.keys())
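A minimal usage sketch for the vector store factory. It assumes `faiss-cpu` is installed and uses random vectors as stand-ins for real chunk embeddings; the import path is an assumption as well.
# example_vectorstore.py - minimal usage sketch (random vectors stand in for real embeddings)
import numpy as np
from src.vectorstore.vector_manager import VectorStoreFactory   # assumed import path

dimension = 384                                  # must match the embedding model's output dimension
store = VectorStoreFactory.create_store("faiss", dimension=dimension, index_type="flat")

# Index three random vectors with simple metadata and IDs
vectors = np.random.rand(3, dimension).astype(np.float32)
metadatas = [{"source_doc_id": "doc-001", "chunk_index": i} for i in range(3)]
ids = [f"chunk-{i}" for i in range(3)]
store.add_vectors(vectors, metadatas, ids)

# Query with the first vector; metadata filtering is applied after retrieval
results = store.search(vectors[0], top_k=2, filter_dict={"source_doc_id": "doc-001"})
for r in results:
    print(r.chunk_id, round(r.score, 3), r.metadata)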
Chapter Summary
This chapter covered the intelligent chunking and vectorization components of a RAG system.
Key Points
Chunking strategies:
- Fixed-size chunking: simple and efficient, suitable for general use
- Sentence-level chunking: preserves semantic completeness
- Semantic chunking: splits intelligently based on content similarity
Vectorization:
- Support for multiple embedding models (Sentence Transformers, OpenAI, etc.)
- An embedding cache to avoid recomputation
- Batch processing and error handling
Vector stores:
- Chroma: an easy-to-use vector database
- FAISS: high-performance similarity search
- A unified interface design
Best Practices
Chunking:
- Choose a chunking strategy appropriate to the document type
- Set chunk size and overlap sensibly
- Preserve semantic completeness
Vectorization:
- Choose an embedding model suited to your domain
- Use caching to avoid redundant computation
- Batch requests to improve throughput
Storage selection:
- Use Chroma for small-scale data
- Consider FAISS for large-scale data
- Choose the index type based on query requirements (a combined sketch follows below)
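To tie the pieces together, here is a minimal end-to-end indexing sketch under the same assumptions as the earlier examples (import paths, the `Document` constructor, and the model checkpoint are illustrative, not prescribed by this codebase).
# example_pipeline.py - minimal end-to-end sketch (paths, Document fields, and model name are assumptions)
from src.data.loaders import Document
from src.chunking.base_chunker import ChunkerFactory
from src.embeddings.embedding_manager import EmbeddingConfig, EmbeddingModelFactory
from src.vectorstore.vector_manager import VectorStoreFactory

doc = Document(content="Retrieval-augmented generation combines a retriever with a generator.",
               metadata={"source": "example.txt"}, doc_id="doc-001")  # hypothetical document

# 1. Chunk: sentence-level chunking with moderate overlap
chunker = ChunkerFactory.create_chunker("sentence", chunk_size=500, chunk_overlap=100, language="en")
chunks = chunker.create_chunks(doc)

# 2. Embed: encode all chunk texts in one batch
config = EmbeddingConfig(model_name="all-MiniLM-L6-v2")  # assumed checkpoint
model = EmbeddingModelFactory.create_model("sentence_transformer", config)
vectors = model.encode([c.content for c in chunks])

# 3. Store: index vectors together with chunk metadata and IDs
store = VectorStoreFactory.create_store("chroma", dimension=vectors.shape[1])
store.add_vectors(vectors, [c.metadata for c in chunks], [c.chunk_id for c in chunks])

# 4. Query: embed the question and retrieve the most similar chunks
hits = store.search(model.encode("What does this document say about chunking?")[0], top_k=3)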
In the next chapter we will study retrieval and generation in RAG systems, including retrieval strategy optimization and generation model integration.