Chapter Overview

Intelligent chunking is a key stage in a RAG system and directly affects retrieval precision and generation quality. This chapter takes a close look at several chunking strategies, vectorization techniques, and similarity computation methods, and at how to tune chunking for better results.

Learning Objectives

  • Master multiple text chunking strategies and algorithms
  • Learn vectorization techniques and how to choose an embedding model
  • Understand similarity computation and retrieval optimization methods
  • Become familiar with chunk quality evaluation and tuning techniques
  • Learn to use and manage vector databases

1. Intelligent Chunking Strategies

1.1 Base Chunker

# src/chunking/base_chunker.py - base chunker
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional, Union
from dataclasses import dataclass
import re
import math
from ..data.loaders import Document

@dataclass
class Chunk:
    """文本块数据结构"""
    content: str
    metadata: Dict[str, Any]
    chunk_id: Optional[str] = None
    doc_id: Optional[str] = None
    start_index: Optional[int] = None
    end_index: Optional[int] = None
    
    def __post_init__(self):
        if self.chunk_id is None:
            import hashlib
            self.chunk_id = hashlib.md5(
                (self.content + str(self.metadata)).encode()
            ).hexdigest()[:12]

class BaseChunker(ABC):
    """分块器基类"""
    
    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
    
    @abstractmethod
    def split_text(self, text: str) -> List[str]:
        """分割文本"""
        pass
    
    def create_chunks(self, document: Document) -> List[Chunk]:
        """创建文本块"""
        text_chunks = self.split_text(document.content)
        chunks = []
        
        for i, chunk_text in enumerate(text_chunks):
            chunk_metadata = document.metadata.copy()
            chunk_metadata.update({
                "chunk_index": i,
                "total_chunks": len(text_chunks),
                "chunk_size": len(chunk_text),
                "word_count": len(chunk_text.split()),
                "source_doc_id": document.doc_id
            })
            
            chunk = Chunk(
                content=chunk_text,
                metadata=chunk_metadata,
                doc_id=document.doc_id,
                start_index=self._get_start_index(document.content, chunk_text, i),
                end_index=self._get_end_index(document.content, chunk_text, i)
            )
            chunks.append(chunk)
        
        return chunks
    
    def _get_start_index(self, full_text: str, chunk_text: str, chunk_index: int) -> int:
        """Get the start position of the chunk in the original text"""
        # str.find returns -1 (it does not raise) when the chunk text cannot be
        # located, e.g. because whitespace was stripped; fall back to an estimate.
        index = full_text.find(chunk_text)
        if index != -1:
            return index
        return chunk_index * (self.chunk_size - self.chunk_overlap)

    def _get_end_index(self, full_text: str, chunk_text: str, chunk_index: int) -> int:
        """Get the end position of the chunk in the original text"""
        start_index = self._get_start_index(full_text, chunk_text, chunk_index)
        return start_index + len(chunk_text)

class FixedSizeChunker(BaseChunker):
    """Fixed-size chunker"""

    def split_text(self, text: str) -> List[str]:
        """Split text into fixed-size chunks"""
        if not text:
            return []

        chunks = []
        start = 0

        while start < len(text):
            end = start + self.chunk_size

            # If this is not the last chunk, try to split on a word boundary
            if end < len(text):
                # Walk backwards looking for whitespace
                while end > start and text[end] not in ' \n\t':
                    end -= 1

                # If no suitable split point was found, fall back to the original position
                if end == start:
                    end = start + self.chunk_size

            chunk = text[start:end].strip()
            if chunk:
                chunks.append(chunk)

            # Compute the start of the next chunk (taking overlap into account);
            # always move forward, otherwise a boundary-adjusted end with
            # end - overlap <= start would loop forever
            next_start = end - self.chunk_overlap
            start = next_start if next_start > start else end

        return chunks

class SentenceChunker(BaseChunker):
    """Sentence-level chunker"""

    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200, language: str = "zh"):
        super().__init__(chunk_size, chunk_overlap)
        self.language = language
        self._compile_sentence_patterns()

    def _compile_sentence_patterns(self):
        """Compile sentence-splitting patterns"""
        if self.language == "zh":
            # Chinese sentence boundaries
            self.sentence_pattern = re.compile(r'[。!?;\n]+')
        else:
            # English sentence boundaries
            self.sentence_pattern = re.compile(r'[.!?;\n]+')

    def split_text(self, text: str) -> List[str]:
        """Split text by sentences"""
        if not text:
            return []

        # Split into sentences
        sentences = self.sentence_pattern.split(text)
        sentences = [s.strip() for s in sentences if s.strip()]

        if not sentences:
            return [text]

        chunks = []
        current_chunk = ""
        current_size = 0

        for sentence in sentences:
            sentence_size = len(sentence)

            # If adding this sentence would exceed the chunk size
            if current_size + sentence_size > self.chunk_size and current_chunk:
                chunks.append(current_chunk.strip())

                # Handle overlap
                if self.chunk_overlap > 0:
                    overlap_text = self._get_overlap_text(current_chunk, self.chunk_overlap)
                    current_chunk = overlap_text + " " + sentence
                    current_size = len(current_chunk)
                else:
                    current_chunk = sentence
                    current_size = sentence_size
            else:
                if current_chunk:
                    current_chunk += " " + sentence
                else:
                    current_chunk = sentence
                current_size += sentence_size

        # Add the final chunk
        if current_chunk.strip():
            chunks.append(current_chunk.strip())

        return chunks

    def _get_overlap_text(self, text: str, overlap_size: int) -> str:
        """Get the trailing overlap text"""
        if len(text) <= overlap_size:
            return text

        # Take the tail of the text and try to cut it at a sentence boundary
        overlap_text = text[-overlap_size:]

        # Look for the start of a sentence
        for pattern in ['. ', '。 ', '! ', '! ', '? ', '? ']:
            pos = overlap_text.find(pattern)
            if pos != -1:
                return overlap_text[pos + len(pattern):]

        return overlap_text

class SemanticChunker(BaseChunker):
    """Semantic chunker"""

    def __init__(self, 
                 chunk_size: int = 1000, 
                 chunk_overlap: int = 200,
                 similarity_threshold: float = 0.7,
                 embedding_model=None):
        super().__init__(chunk_size, chunk_overlap)
        self.similarity_threshold = similarity_threshold
        self.embedding_model = embedding_model

    def split_text(self, text: str) -> List[str]:
        """Split text based on semantic similarity"""
        if not text or not self.embedding_model:
            # Fall back to sentence chunking
            return SentenceChunker(self.chunk_size, self.chunk_overlap).split_text(text)

        # First split into sentences
        sentence_chunker = SentenceChunker()
        sentences = sentence_chunker.sentence_pattern.split(text)
        sentences = [s.strip() for s in sentences if s.strip()]

        if len(sentences) <= 1:
            return [text]

        # Compute sentence embeddings
        embeddings = self._get_embeddings(sentences)

        # Merge sentences based on similarity
        chunks = []
        current_start = 0  # index of the first sentence in the current chunk
        current_chunk_sentences = [sentences[0]]
        current_chunk_size = len(sentences[0])

        for i in range(1, len(sentences)):
            sentence = sentences[i]
            sentence_size = len(sentence)

            # Similarity between this sentence and the current chunk
            # (only the embeddings of sentences in the current chunk are averaged)
            similarity = self._calculate_similarity(
                embeddings[i], 
                self._get_chunk_embedding(current_chunk_sentences, embeddings[current_start:i])
            )

            # Decide whether to merge
            should_merge = (
                similarity >= self.similarity_threshold and 
                current_chunk_size + sentence_size <= self.chunk_size
            )

            if should_merge:
                current_chunk_sentences.append(sentence)
                current_chunk_size += sentence_size
            else:
                # Finalize the current chunk
                chunks.append(" ".join(current_chunk_sentences))
                current_chunk_sentences = [sentence]
                current_chunk_size = sentence_size
                current_start = i

        # Add the final chunk
        if current_chunk_sentences:
            chunks.append(" ".join(current_chunk_sentences))

        return chunks

    def _get_embeddings(self, sentences: List[str]) -> List:
        """Get sentence embeddings"""
        try:
            return self.embedding_model.encode(sentences)
        except Exception as e:
            print(f"Failed to compute embeddings: {e}")
            return [None] * len(sentences)

    def _get_chunk_embedding(self, sentences: List[str], embeddings: List) -> Any:
        """Get the mean embedding of a chunk"""
        try:
            import numpy as np
            valid_embeddings = [emb for emb in embeddings if emb is not None]
            if valid_embeddings:
                return np.mean(valid_embeddings, axis=0)
            return None
        except Exception:
            return None

    def _calculate_similarity(self, emb1, emb2) -> float:
        """Compute cosine similarity"""
        try:
            from sklearn.metrics.pairwise import cosine_similarity

            if emb1 is None or emb2 is None:
                return 0.0

            return cosine_similarity([emb1], [emb2])[0][0]
        except Exception:
            return 0.0

class ChunkerFactory:
    """Chunker factory"""

    _chunkers = {
        "fixed": FixedSizeChunker,
        "sentence": SentenceChunker,
        "semantic": SemanticChunker
    }

    @classmethod
    def create_chunker(cls, chunker_type: str, **kwargs) -> BaseChunker:
        """Create a chunker"""
        if chunker_type not in cls._chunkers:
            raise ValueError(f"Unsupported chunker type: {chunker_type}")

        chunker_class = cls._chunkers[chunker_type]
        return chunker_class(**kwargs)

    @classmethod
    def get_available_chunkers(cls) -> List[str]:
        """Get the available chunker types"""
        return list(cls._chunkers.keys())
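
To tie the pieces together, here is a minimal usage sketch. It assumes the project layout implied by the file path comment above (so the module is importable as src.chunking.base_chunker) and that the Document dataclass from the previous chapter exposes the content, metadata, and doc_id fields that create_chunks relies on; both are assumptions, not something this chapter defines.

# example_chunking.py - hypothetical usage sketch for ChunkerFactory
from src.chunking.base_chunker import ChunkerFactory
from src.data.loaders import Document  # assumed fields: content, metadata, doc_id

doc = Document(
    content=("Retrieval-augmented generation first retrieves relevant passages and then "
             "feeds them to a language model. Chunk quality directly affects recall. "
             "Chunk size trades off semantic completeness against retrieval granularity."),
    metadata={"source": "notes.md"},
    doc_id="doc-001",
)

# Create a sentence-level chunker via the factory; tune size/overlap per corpus
chunker = ChunkerFactory.create_chunker("sentence", chunk_size=120, chunk_overlap=30, language="en")
chunks = chunker.create_chunks(doc)

for chunk in chunks:
    print(chunk.chunk_id, chunk.metadata["chunk_index"], len(chunk.content))

Switching to the semantic strategy only requires passing "semantic" together with an embedding_model (for example the SentenceTransformerModel defined in the next section); without one, SemanticChunker falls back to sentence chunking.
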

2. Vectorization Techniques

2.1 Embedding Model Management

# src/embeddings/embedding_manager.py - embedding model management
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional, Union
import numpy as np
from dataclasses import dataclass
import logging
from pathlib import Path

@dataclass
class EmbeddingConfig:
    """嵌入配置"""
    model_name: str
    model_path: Optional[str] = None
    device: str = "cpu"  # cpu, cuda, mps
    batch_size: int = 32
    max_length: int = 512
    normalize: bool = True
    cache_embeddings: bool = True
    cache_dir: Optional[str] = None

class BaseEmbeddingModel(ABC):
    """Base class for embedding models"""

    def __init__(self, config: EmbeddingConfig):
        self.config = config
        self.model = None
        self.tokenizer = None
        self._setup_cache()

    def _setup_cache(self):
        """Set up the embedding cache"""
        if self.config.cache_embeddings:
            cache_dir = self.config.cache_dir or "./cache/embeddings"
            self.cache_dir = Path(cache_dir)
            self.cache_dir.mkdir(parents=True, exist_ok=True)
            self.embedding_cache = {}
        else:
            self.embedding_cache = None
    
    @abstractmethod
    def load_model(self):
        """Load the model"""
        pass

    @abstractmethod
    def encode(self, texts: Union[str, List[str]]) -> np.ndarray:
        """Encode texts into embeddings"""
        pass

    def get_embedding_dimension(self) -> int:
        """Get the embedding dimension"""
        test_embedding = self.encode(["test"])
        return test_embedding.shape[1]
    
    def _get_cache_key(self, text: str) -> str:
        """Build a cache key"""
        import hashlib
        return hashlib.md5(text.encode()).hexdigest()

    def _get_cached_embedding(self, text: str) -> Optional[np.ndarray]:
        """Look up a cached embedding"""
        if self.embedding_cache is None:
            return None

        cache_key = self._get_cache_key(text)
        return self.embedding_cache.get(cache_key)

    def _cache_embedding(self, text: str, embedding: np.ndarray):
        """Cache an embedding"""
        if self.embedding_cache is not None:
            cache_key = self._get_cache_key(text)
            self.embedding_cache[cache_key] = embedding

class SentenceTransformerModel(BaseEmbeddingModel):
    """Sentence Transformers model"""

    def load_model(self):
        """Load a Sentence Transformers model"""
        try:
            from sentence_transformers import SentenceTransformer

            model_path = self.config.model_path or self.config.model_name
            self.model = SentenceTransformer(model_path, device=self.config.device)

            logging.info(f"Loaded Sentence Transformers model: {model_path}")

        except ImportError:
            raise RuntimeError("Please install sentence-transformers: pip install sentence-transformers")
        except Exception as e:
            raise RuntimeError(f"Failed to load model: {e}")
    
    def encode(self, texts: Union[str, List[str]]) -> np.ndarray:
        """Encode texts"""
        if self.model is None:
            self.load_model()

        # Handle a single string
        if isinstance(texts, str):
            texts = [texts]

        # Check the cache
        cached_embeddings = []
        uncached_texts = []
        uncached_indices = []

        for i, text in enumerate(texts):
            cached_emb = self._get_cached_embedding(text)
            if cached_emb is not None:
                cached_embeddings.append((i, cached_emb))
            else:
                uncached_texts.append(text)
                uncached_indices.append(i)

        # Encode uncached texts
        if uncached_texts:
            try:
                new_embeddings = self.model.encode(
                    uncached_texts,
                    batch_size=self.config.batch_size,
                    normalize_embeddings=self.config.normalize,
                    convert_to_numpy=True
                )

                # Cache the new embeddings
                for text, embedding in zip(uncached_texts, new_embeddings):
                    self._cache_embedding(text, embedding)

            except Exception as e:
                logging.error(f"Failed to encode texts: {e}")
                raise
        else:
            new_embeddings = np.array([])

        # Infer the embedding dimension from the results themselves
        # (calling get_embedding_dimension() here would recurse back into encode())
        if uncached_texts:
            dimension = new_embeddings.shape[1]
        elif cached_embeddings:
            dimension = cached_embeddings[0][1].shape[0]
        else:
            return np.array([])

        # Merge cached and newly computed embeddings in input order
        all_embeddings = np.zeros((len(texts), dimension))

        for idx, embedding in cached_embeddings:
            all_embeddings[idx] = embedding

        for i, idx in enumerate(uncached_indices):
            all_embeddings[idx] = new_embeddings[i]

        return all_embeddings

class OpenAIEmbeddingModel(BaseEmbeddingModel):
    """OpenAI embedding model (uses the legacy openai<1.0 SDK interface)"""

    def __init__(self, config: EmbeddingConfig, api_key: str):
        super().__init__(config)
        self.api_key = api_key
        self.client = None

    def load_model(self):
        """Initialize the OpenAI client"""
        try:
            import openai
            openai.api_key = self.api_key
            self.client = openai
            logging.info(f"Initialized OpenAI client: {self.config.model_name}")

        except ImportError:
            raise RuntimeError("Please install the openai library: pip install openai")

    def encode(self, texts: Union[str, List[str]]) -> np.ndarray:
        """Encode texts via the OpenAI API"""
        if self.client is None:
            self.load_model()

        if isinstance(texts, str):
            texts = [texts]

        embeddings = []

        for text in texts:
            # Check the cache
            cached_emb = self._get_cached_embedding(text)
            if cached_emb is not None:
                embeddings.append(cached_emb)
                continue

            try:
                response = self.client.Embedding.create(
                    model=self.config.model_name,
                    input=text[:self.config.max_length]
                )

                embedding = np.array(response['data'][0]['embedding'])

                if self.config.normalize:
                    embedding = embedding / np.linalg.norm(embedding)

                self._cache_embedding(text, embedding)
                embeddings.append(embedding)

            except Exception as e:
                logging.error(f"OpenAI API call failed: {e}")
                # Fall back to a zero vector
                embedding = np.zeros(1536)  # dimension of text-embedding-ada-002
                embeddings.append(embedding)

        return np.array(embeddings)

class EmbeddingModelFactory:
    """Embedding model factory"""

    _models = {
        "sentence_transformer": SentenceTransformerModel,
        "openai": OpenAIEmbeddingModel
    }

    @classmethod
    def create_model(cls, model_type: str, config: EmbeddingConfig, **kwargs) -> BaseEmbeddingModel:
        """Create an embedding model"""
        if model_type not in cls._models:
            raise ValueError(f"Unsupported model type: {model_type}")

        model_class = cls._models[model_type]
        return model_class(config, **kwargs)

    @classmethod
    def get_available_models(cls) -> List[str]:
        """Get the available model types"""
        return list(cls._models.keys())
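
A short usage sketch for the embedding manager, under the same layout assumption as before; the model name below is just a widely used example, not something this project prescribes.

# example_embedding.py - hypothetical usage sketch for the embedding manager
from src.embeddings.embedding_manager import EmbeddingConfig, EmbeddingModelFactory

config = EmbeddingConfig(
    model_name="all-MiniLM-L6-v2",  # example Sentence Transformers model; swap in your own
    device="cpu",
    batch_size=16,
    normalize=True,
)

model = EmbeddingModelFactory.create_model("sentence_transformer", config)

texts = ["What is retrieval-augmented generation?",
         "RAG retrieves documents before generating an answer."]
vectors = model.encode(texts)
print(vectors.shape)  # (2, 384) for this particular model

Because BaseEmbeddingModel keeps an in-memory cache keyed by an MD5 of the text, encoding the same strings again returns cached vectors instead of re-running the model.
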

3. Vector Database Integration

3.1 Vector Store Management

# src/vectorstore/vector_manager.py - vector store management
from abc import ABC, abstractmethod
from typing import List, Dict, Any, Optional, Tuple
import numpy as np
from dataclasses import dataclass
import logging
import json
from pathlib import Path

@dataclass
class VectorSearchResult:
    """向量搜索结果"""
    chunk_id: str
    content: str
    metadata: Dict[str, Any]
    score: float
    embedding: Optional[np.ndarray] = None

class BaseVectorStore(ABC):
    """Base class for vector stores"""

    def __init__(self, dimension: int, collection_name: str = "default"):
        self.dimension = dimension
        self.collection_name = collection_name

    @abstractmethod
    def add_vectors(self, 
                   vectors: np.ndarray, 
                   metadatas: List[Dict[str, Any]], 
                   ids: List[str]):
        """Add vectors"""
        pass

    @abstractmethod
    def search(self, 
              query_vector: np.ndarray, 
              top_k: int = 10,
              filter_dict: Optional[Dict[str, Any]] = None) -> List[VectorSearchResult]:
        """Search for similar vectors"""
        pass

    @abstractmethod
    def delete_vectors(self, ids: List[str]):
        """Delete vectors"""
        pass

    @abstractmethod
    def get_vector_count(self) -> int:
        """Get the number of stored vectors"""
        pass

class ChromaVectorStore(BaseVectorStore):
    """Chroma vector store"""

    def __init__(self, dimension: int, collection_name: str = "default", persist_directory: str = "./chroma_db"):
        super().__init__(dimension, collection_name)
        self.persist_directory = persist_directory
        self.client = None
        self.collection = None
        self._initialize()

    def _initialize(self):
        """Initialize the Chroma client"""
        try:
            import chromadb
            from chromadb.config import Settings

            self.client = chromadb.PersistentClient(
                path=self.persist_directory,
                settings=Settings(anonymized_telemetry=False)
            )

            # Get or create the collection
            try:
                self.collection = self.client.get_collection(name=self.collection_name)
            except Exception:
                self.collection = self.client.create_collection(
                    name=self.collection_name,
                    metadata={"dimension": self.dimension}
                )

            logging.info(f"Initialized Chroma vector store: {self.collection_name}")

        except ImportError:
            raise RuntimeError("Please install chromadb: pip install chromadb")
        except Exception as e:
            raise RuntimeError(f"Failed to initialize Chroma: {e}")
    
    def add_vectors(self, vectors: np.ndarray, metadatas: List[Dict[str, Any]], ids: List[str]):
        """Add vectors to Chroma"""
        try:
            # Store the serialized metadata as the document text
            # (alternatively, pass the chunk content itself here)
            documents = [json.dumps(meta, ensure_ascii=False) for meta in metadatas]

            self.collection.add(
                embeddings=vectors.tolist(),
                documents=documents,
                metadatas=metadatas,
                ids=ids
            )

            logging.info(f"Added {len(ids)} vectors to Chroma")

        except Exception as e:
            logging.error(f"Failed to add vectors to Chroma: {e}")
            raise
    
    def search(self, query_vector: np.ndarray, top_k: int = 10, filter_dict: Optional[Dict[str, Any]] = None) -> List[VectorSearchResult]:
        """Search in Chroma"""
        try:
            results = self.collection.query(
                query_embeddings=[query_vector.tolist()],
                n_results=top_k,
                where=filter_dict
            )

            search_results = []

            for i in range(len(results['ids'][0])):
                result = VectorSearchResult(
                    chunk_id=results['ids'][0][i],
                    content=results['documents'][0][i] if results['documents'][0] else "",
                    metadata=results['metadatas'][0][i] if results['metadatas'][0] else {},
                    score=1 - results['distances'][0][i],  # convert distance to a similarity-style score
                    embedding=np.array(results['embeddings'][0][i]) if results.get('embeddings') else None
                )
                search_results.append(result)

            return search_results

        except Exception as e:
            logging.error(f"Chroma search failed: {e}")
            return []
    
    def delete_vectors(self, ids: List[str]):
        """Delete vectors from Chroma"""
        try:
            self.collection.delete(ids=ids)
            logging.info(f"Deleted {len(ids)} vectors from Chroma")
        except Exception as e:
            logging.error(f"Failed to delete vectors from Chroma: {e}")

    def get_vector_count(self) -> int:
        """Get the number of vectors in Chroma"""
        try:
            return self.collection.count()
        except Exception as e:
            logging.error(f"Failed to get Chroma vector count: {e}")
            return 0

class FAISSVectorStore(BaseVectorStore):
    """FAISS vector store"""

    def __init__(self, dimension: int, collection_name: str = "default", index_type: str = "flat"):
        super().__init__(dimension, collection_name)
        self.index_type = index_type
        self.index = None
        self.id_to_metadata = {}
        self.id_to_chunk_id = {}
        self._initialize()

    def _initialize(self):
        """Initialize the FAISS index"""
        try:
            import faiss

            if self.index_type == "flat":
                self.index = faiss.IndexFlatIP(self.dimension)  # inner-product index
            elif self.index_type == "ivf":
                quantizer = faiss.IndexFlatIP(self.dimension)
                self.index = faiss.IndexIVFFlat(quantizer, self.dimension, 100)
            else:
                raise ValueError(f"Unsupported index type: {self.index_type}")

            logging.info(f"Initialized FAISS index: {self.index_type}")

        except ImportError:
            raise RuntimeError("Please install faiss: pip install faiss-cpu or pip install faiss-gpu")
        except Exception as e:
            raise RuntimeError(f"Failed to initialize FAISS: {e}")
    
    def add_vectors(self, vectors: np.ndarray, metadatas: List[Dict[str, Any]], ids: List[str]):
        """Add vectors to FAISS"""
        try:
            # Normalize the vectors (so inner product equals cosine similarity)
            normalized_vectors = vectors / np.linalg.norm(vectors, axis=1, keepdims=True)
            normalized_vectors = normalized_vectors.astype(np.float32)

            # IVF indexes must be trained before vectors can be added
            if hasattr(self.index, "is_trained") and not self.index.is_trained:
                self.index.train(normalized_vectors)

            # Add to the index
            self.index.add(normalized_vectors)

            # Store metadata and the mapping from FAISS position to chunk id
            start_id = self.index.ntotal - len(vectors)
            for i, (chunk_id, metadata) in enumerate(zip(ids, metadatas)):
                faiss_id = start_id + i
                self.id_to_metadata[faiss_id] = metadata
                self.id_to_chunk_id[faiss_id] = chunk_id

            logging.info(f"Added {len(ids)} vectors to FAISS")

        except Exception as e:
            logging.error(f"Failed to add vectors to FAISS: {e}")
            raise
    
    def search(self, query_vector: np.ndarray, top_k: int = 10, filter_dict: Optional[Dict[str, Any]] = None) -> List[VectorSearchResult]:
        """Search in FAISS"""
        try:
            # Normalize the query vector
            normalized_query = query_vector / np.linalg.norm(query_vector)

            # Search
            scores, indices = self.index.search(
                normalized_query.reshape(1, -1).astype(np.float32), 
                top_k
            )

            search_results = []

            for score, idx in zip(scores[0], indices[0]):
                if idx == -1:  # FAISS returns -1 for empty slots
                    continue

                idx = int(idx)
                metadata = self.id_to_metadata.get(idx, {})

                # Apply the metadata filter (note: filtering happens after retrieval,
                # so fewer than top_k results may be returned)
                if filter_dict and not self._match_filter(metadata, filter_dict):
                    continue

                result = VectorSearchResult(
                    chunk_id=self.id_to_chunk_id.get(idx, str(idx)),
                    # the chunk text itself is not stored in this wrapper;
                    # fall back to the metadata if it was placed there
                    content=metadata.get("content", ""),
                    metadata=metadata,
                    score=float(score)
                )
                search_results.append(result)

            return search_results

        except Exception as e:
            logging.error(f"FAISS search failed: {e}")
            return []
    
    def _match_filter(self, metadata: Dict[str, Any], filter_dict: Dict[str, Any]) -> bool:
        """Check whether the metadata matches the filter conditions"""
        for key, value in filter_dict.items():
            if key not in metadata or metadata[key] != value:
                return False
        return True

    def delete_vectors(self, ids: List[str]):
        """FAISS does not support in-place deletion; rebuild the index instead"""
        logging.warning("FAISS does not support deletion; consider rebuilding the index")

    def get_vector_count(self) -> int:
        """Get the number of vectors in FAISS"""
        return self.index.ntotal if self.index else 0

class VectorStoreFactory:
    """Vector store factory"""

    _stores = {
        "chroma": ChromaVectorStore,
        "faiss": FAISSVectorStore
    }

    @classmethod
    def create_store(cls, store_type: str, dimension: int, **kwargs) -> BaseVectorStore:
        """Create a vector store"""
        if store_type not in cls._stores:
            raise ValueError(f"Unsupported store type: {store_type}")

        store_class = cls._stores[store_type]
        return store_class(dimension, **kwargs)

    @classmethod
    def get_available_stores(cls) -> List[str]:
        """Get the available store types"""
        return list(cls._stores.keys())
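
Finally, a hypothetical end-to-end sketch wiring the three factories together: chunk a document, embed the chunks, index them, and run a query. The file name, model name, and project layout are illustrative assumptions; the chunk text is copied into the metadata so that FAISS search results carry it.

# example_pipeline.py - hypothetical sketch: chunk -> embed -> index -> search
from src.chunking.base_chunker import ChunkerFactory
from src.data.loaders import Document  # assumed fields: content, metadata, doc_id
from src.embeddings.embedding_manager import EmbeddingConfig, EmbeddingModelFactory
from src.vectorstore.vector_manager import VectorStoreFactory

# 1. Chunk a document
doc = Document(content=open("sample.txt", encoding="utf-8").read(),
               metadata={"source": "sample.txt"}, doc_id="doc-001")
chunker = ChunkerFactory.create_chunker("sentence", chunk_size=300, chunk_overlap=50, language="en")
chunks = chunker.create_chunks(doc)

# 2. Embed the chunks
config = EmbeddingConfig(model_name="all-MiniLM-L6-v2")  # example model name
model = EmbeddingModelFactory.create_model("sentence_transformer", config)
vectors = model.encode([c.content for c in chunks])

# 3. Index the vectors in a flat FAISS store
store = VectorStoreFactory.create_store("faiss", dimension=vectors.shape[1], index_type="flat")
store.add_vectors(
    vectors,
    metadatas=[{**c.metadata, "content": c.content} for c in chunks],
    ids=[c.chunk_id for c in chunks],
)

# 4. Query
query_vector = model.encode("How does chunk overlap affect retrieval?")[0]
for hit in store.search(query_vector, top_k=3):
    print(f"{hit.score:.3f}", hit.chunk_id, hit.content[:80])

The same pipeline works with the Chroma store by swapping "faiss" for "chroma"; only the persistence and filtering behaviour differ.
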

Chapter Summary

This chapter took a deep dive into intelligent chunking and vectorization for RAG systems:

Key Takeaways

  1. Chunking strategies

    • Fixed-size chunking: simple and efficient, suitable for general use
    • Sentence-level chunking: preserves semantic completeness
    • Semantic chunking: similarity-driven, content-aware splitting
  2. Vectorization techniques

    • Support for multiple embedding models (Sentence Transformers, OpenAI, and more)
    • An embedding cache to improve efficiency
    • Batch processing and error handling
  3. Vector storage

    • Chroma: an easy-to-use vector database
    • FAISS: high-performance similarity search
    • A unified interface design

Best Practices

  1. Chunking optimization

    • Choose a chunking strategy that matches the document type
    • Set chunk size and overlap sensibly (see the sketch after this list for example starting values)
    • Keep semantic completeness in mind
  2. Vectorization optimization

    • Choose an embedding model suited to your domain
    • Use caching to avoid recomputing embeddings
    • Batch requests to improve throughput
  3. Storage selection

    • Use Chroma for small-scale data
    • Consider FAISS for large-scale data
    • Pick the index type based on your query requirements
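
As a concrete reference for the chunk-size and overlap guidance above, here is a small, hypothetical preset table; the numbers are rule-of-thumb starting values, not recommendations made by this chapter, and should be validated against your own retrieval metrics.

# chunking_presets.py - illustrative starting values only; tune on your own data
CHUNKING_PRESETS = {
    # short, factual documents (FAQ entries, tickets): small chunks, little overlap
    "faq":     {"chunker_type": "sentence", "chunk_size": 300,  "chunk_overlap": 30},
    # prose-heavy documents (manuals, articles): medium chunks, moderate overlap
    "article": {"chunker_type": "sentence", "chunk_size": 800,  "chunk_overlap": 150},
    # long technical reports: larger, semantically merged chunks
    "report":  {"chunker_type": "semantic", "chunk_size": 1200, "chunk_overlap": 200},
}

def build_chunker(doc_type: str, **overrides):
    """Create a chunker from a preset, allowing per-call overrides (e.g. embedding_model)."""
    from src.chunking.base_chunker import ChunkerFactory
    params = {**CHUNKING_PRESETS.get(doc_type, CHUNKING_PRESETS["article"]), **overrides}
    return ChunkerFactory.create_chunker(params.pop("chunker_type"), **params)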

In the next chapter we will look at retrieval and generation in RAG systems, including retrieval strategy optimization and generation model integration.