本章概述

本章将详细介绍RAG系统开发所需的环境搭建和工具配置,包括Python环境准备、核心依赖库安装、向量数据库配置、以及开发调试工具的使用。通过本章学习,您将能够搭建完整的RAG开发环境。

学习目标

  • 掌握RAG开发环境的搭建方法
  • 了解核心依赖库的安装和配置
  • 学习向量数据库的部署和使用
  • 熟悉开发调试工具和最佳实践

1. 开发环境准备

1.1 Python环境配置

# requirements.txt - RAG项目依赖清单
"""
# 核心框架
langchain==0.1.0
langchain-community==0.0.10
langchain-openai==0.0.5

# 向量数据库
chromadb==0.4.22
faiss-cpu==1.7.4
pinecone-client==3.0.0
weaviate-client==3.25.3

# 嵌入模型
sentence-transformers==2.2.2
openai==1.6.1
huggingface-hub==0.20.2
transformers==4.36.2

# 文档处理
pypdf2==3.0.1
python-docx==1.1.0
beautifulsoup4==4.12.2
markdown==3.5.2

# 数据处理
numpy==1.24.3
pandas==2.0.3
scipy==1.11.4

# Web框架
fastapi==0.104.1
streamlit==1.29.0
flask==3.0.0

# 监控和日志
wandb==0.16.1
mlflow==2.9.2
loguru==0.7.2

# 开发工具
jupyter==1.0.0
ipykernel==6.27.1
pytest==7.4.3
black==23.12.1
flake8==6.1.0
"""

class EnvironmentSetup:
    """环境搭建管理类

    Helper for bootstrapping a RAG development environment: interpreter
    version checks, virtual-environment commands, dependency-installation
    steps and an .env template. Commands are described and printed, never
    executed, so this class is safe to run anywhere.
    """

    def __init__(self):
        # Informational minimum; the actual check uses sys.version_info.
        self.python_version = "3.9+"
        # Packages required for the core RAG workflow.
        self.required_packages = [
            "langchain", "chromadb", "sentence-transformers",
            "openai", "fastapi", "streamlit"
        ]
        # Alternative vector-store clients, installed on demand.
        self.optional_packages = [
            "faiss-cpu", "pinecone-client", "weaviate-client"
        ]
    
    def check_python_version(self):
        """检查Python版本

        Returns:
            bool: True when the running interpreter is Python 3.9 or newer.
        """
        import sys
        version = sys.version_info
        
        # Tuple comparison also accepts any future major version; the
        # previous `major == 3 and minor >= 9` test wrongly rejected it.
        if version >= (3, 9):
            print(f"✓ Python版本检查通过: {version.major}.{version.minor}.{version.micro}")
            return True
        print(f"✗ Python版本过低: {version.major}.{version.minor}.{version.micro}")
        print("请升级到Python 3.9或更高版本")
        return False
    
    def create_virtual_environment(self, env_name="rag_env"):
        """创建虚拟环境

        Args:
            env_name: directory name for the virtual environment.

        Returns:
            dict: env name, activation command, and the full command list.
        """
        import os
        
        # The activation command differs between Windows ('nt') and POSIX;
        # compute it once instead of repeating the conditional.
        activation = (
            f"{env_name}/Scripts/activate" if os.name == 'nt'
            else f"source {env_name}/bin/activate"
        )
        commands = [
            f"python -m venv {env_name}",
            activation,
            "pip install --upgrade pip"
        ]
        
        print(f"创建虚拟环境: {env_name}")
        for cmd in commands:
            print(f"执行: {cmd}")
        
        return {
            "env_name": env_name,
            "activation_script": activation,
            "commands": commands
        }
    
    def install_dependencies(self, requirements_file="requirements.txt"):
        """安装依赖包

        Args:
            requirements_file: path of the pip requirements manifest.

        Returns:
            list[dict]: ordered installation steps (step, command, description).
        """
        installation_steps = [
            {
                "step": "基础依赖安装",
                "command": f"pip install -r {requirements_file}",
                "description": "安装所有必需的Python包"
            },
            {
                "step": "验证安装",
                "command": "pip list | grep -E 'langchain|chromadb|sentence-transformers'",
                "description": "验证核心包是否正确安装"
            },
            {
                "step": "环境测试",
                "command": "python -c 'import langchain; import chromadb; print(\"环境配置成功\")'",
                "description": "测试导入核心模块"
            }
        ]
        
        return installation_steps
    
    def generate_env_file(self):
        """生成环境变量配置文件

        Returns:
            str: template text for a .env file with placeholder values.
        """
        env_template = """
# .env - 环境变量配置文件

# OpenAI API配置
OPENAI_API_KEY=your_openai_api_key_here
OPENAI_API_BASE=https://api.openai.com/v1

# Hugging Face配置
HUGGINGFACE_API_TOKEN=your_hf_token_here

# 向量数据库配置
# Pinecone
PINECONE_API_KEY=your_pinecone_api_key
PINECONE_ENVIRONMENT=your_pinecone_environment

# Weaviate
WEAVIATE_URL=http://localhost:8080
WEAVIATE_API_KEY=your_weaviate_api_key

# ChromaDB
CHROMA_HOST=localhost
CHROMA_PORT=8000

# 应用配置
APP_ENV=development
LOG_LEVEL=INFO
MAX_TOKENS=1000
TEMPERATURE=0.7

# 文件路径
DATA_DIR=./data
LOGS_DIR=./logs
MODELS_DIR=./models
        """
        
        return env_template

# Usage example
if __name__ == "__main__":
    env_setup = EnvironmentSetup()
    
    # Proceed only when the interpreter meets the minimum version.
    if env_setup.check_python_version():
        print("\n开始环境搭建...")
        
        # Describe the virtual-environment commands.
        venv_details = env_setup.create_virtual_environment()
        print(f"虚拟环境信息: {venv_details}")
        
        # List the dependency-installation steps.
        print("\n安装步骤:")
        for item in env_setup.install_dependencies():
            print(f"- {item['step']}: {item['command']}")
        
        # Produce the .env template text.
        _env_template = env_setup.generate_env_file()
        print("\n环境变量配置模板已生成")

1.2 Docker环境配置

# Dockerfile - containerize the RAG application
FROM python:3.9-slim

# Working directory inside the image
WORKDIR /app

# System dependencies (compilers are needed to build native wheels);
# clean the apt lists afterwards to keep the image small
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    curl \
    git \
    && rm -rf /var/lib/apt/lists/*

# Copy the dependency manifest first so this layer caches independently
COPY requirements.txt .

# Install Python dependencies without keeping the pip cache
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code
COPY . .

# Create the runtime directories the app expects
RUN mkdir -p data logs models

# Environment variables
ENV PYTHONPATH=/app
ENV APP_ENV=production

# Port served by the application
EXPOSE 8000

# Start command
CMD ["python", "app.py"]
# docker-compose.yml - full RAG service stack
version: '3.8'

services:
  # RAG application service
  rag-app:
    build: .
    ports:
      - "8000:8000"
    environment:
      - APP_ENV=development
      - CHROMA_HOST=chromadb
      - CHROMA_PORT=8000
    volumes:
      - ./data:/app/data
      - ./logs:/app/logs
    depends_on:
      - chromadb
      - redis
    networks:
      - rag-network

  # ChromaDB vector database (host port 8001 -> container port 8000)
  chromadb:
    image: chromadb/chroma:latest
    ports:
      - "8001:8000"
    volumes:
      - chroma_data:/chroma/chroma
    environment:
      - CHROMA_SERVER_HOST=0.0.0.0
      - CHROMA_SERVER_HTTP_PORT=8000
    networks:
      - rag-network

  # Redis cache
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    networks:
      - rag-network

  # Elasticsearch (optional; single-node dev setup with security disabled)
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
    ports:
      - "9200:9200"
    volumes:
      - es_data:/usr/share/elasticsearch/data
    networks:
      - rag-network

  # Jupyter Notebook development environment
  jupyter:
    build:
      context: .
      dockerfile: Dockerfile.jupyter
    ports:
      - "8888:8888"
    volumes:
      - ./notebooks:/app/notebooks
      - ./data:/app/data
    environment:
      - JUPYTER_ENABLE_LAB=yes
    networks:
      - rag-network

volumes:
  chroma_data:
  redis_data:
  es_data:

networks:
  rag-network:
    driver: bridge
# Dockerfile.jupyter - Jupyter development environment
FROM python:3.9-slim

WORKDIR /app

# Install Jupyter plus the RAG dependencies
RUN pip install jupyter jupyterlab
COPY requirements.txt .
RUN pip install -r requirements.txt

# Configure JupyterLab: listen on all interfaces, allow running as root,
# and disable the auth token (development use only — do not deploy as-is)
RUN jupyter lab --generate-config
RUN echo "c.ServerApp.ip = '0.0.0.0'" >> ~/.jupyter/jupyter_lab_config.py
RUN echo "c.ServerApp.allow_root = True" >> ~/.jupyter/jupyter_lab_config.py
RUN echo "c.ServerApp.token = ''" >> ~/.jupyter/jupyter_lab_config.py

EXPOSE 8888

CMD ["jupyter", "lab", "--port=8888", "--no-browser"]

2. 核心依赖库详解

2.1 LangChain框架

# langchain_setup.py - LangChain配置和使用
from langchain.embeddings import OpenAIEmbeddings, HuggingFaceEmbeddings
from langchain.llms import OpenAI
from langchain.chat_models import ChatOpenAI
from langchain.vectorstores import Chroma, FAISS
from langchain.document_loaders import TextLoader, PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
import os

class LangChainSetup:
    """Configuration helper that wires together LangChain components.

    Builds embeddings, an LLM, a vector store, and finally a RetrievalQA
    chain, tracking which pieces have been configured so far.
    """
    
    def __init__(self):
        # All components start unconfigured; the setup_* methods fill them in.
        self.embeddings = None
        self.llm = None
        self.vectorstore = None
        self.qa_chain = None
    
    def setup_embeddings(self, provider="openai", model_name=None):
        """Select and instantiate the embedding backend.

        Args:
            provider: "openai" or "huggingface".
            model_name: optional HF model id; ignored for OpenAI.

        Returns:
            The configured embeddings object.

        Raises:
            ValueError: for an unknown provider.
        """
        if provider == "huggingface":
            chosen = model_name or "sentence-transformers/all-MiniLM-L6-v2"
            self.embeddings = HuggingFaceEmbeddings(model_name=chosen)
            print(f"✓ HuggingFace嵌入模型已配置: {chosen}")
        elif provider == "openai":
            self.embeddings = OpenAIEmbeddings(
                openai_api_key=os.getenv("OPENAI_API_KEY")
            )
            print("✓ OpenAI嵌入模型已配置")
        else:
            raise ValueError(f"不支持的嵌入提供商: {provider}")
        
        return self.embeddings
    
    def setup_llm(self, provider="openai", model_name="gpt-3.5-turbo"):
        """Instantiate the language model.

        Chat-style models go through ChatOpenAI; legacy completion
        models use the plain OpenAI wrapper.

        Raises:
            ValueError: for an unknown provider.
        """
        if provider != "openai":
            raise ValueError(f"不支持的LLM提供商: {provider}")
        
        is_chat_model = "gpt-3.5" in model_name or "gpt-4" in model_name
        model_cls = ChatOpenAI if is_chat_model else OpenAI
        self.llm = model_cls(
            model_name=model_name,
            temperature=0.7,
            openai_api_key=os.getenv("OPENAI_API_KEY")
        )
        print(f"✓ OpenAI语言模型已配置: {model_name}")
        
        return self.llm
    
    def setup_vectorstore(self, store_type="chroma", persist_directory="./chroma_db"):
        """Prepare the vector store backend (embeddings must exist first).

        Raises:
            ValueError: when embeddings are missing or the type is unknown.
        """
        if not self.embeddings:
            raise ValueError("请先配置嵌入模型")
        
        if store_type == "chroma":
            self.vectorstore = Chroma(
                embedding_function=self.embeddings,
                persist_directory=persist_directory
            )
            print(f"✓ ChromaDB向量存储已配置: {persist_directory}")
        elif store_type == "faiss":
            # FAISS can only be materialized once documents are available.
            print("✓ FAISS向量存储配置就绪(需要文档初始化)")
        else:
            raise ValueError(f"不支持的向量存储类型: {store_type}")
        
        return self.vectorstore
    
    def load_and_process_documents(self, file_paths, chunk_size=1000, chunk_overlap=200):
        """Load PDF/TXT files and split them into overlapping chunks."""
        loader_by_suffix = {'.pdf': PyPDFLoader, '.txt': TextLoader}
        loaded_docs = []
        
        # Pick a loader per file by extension; unsupported types are skipped.
        for path in file_paths:
            loader_cls = next(
                (cls for suffix, cls in loader_by_suffix.items()
                 if path.endswith(suffix)),
                None
            )
            if loader_cls is None:
                print(f"跳过不支持的文件类型: {path}")
                continue
            
            loaded_docs.extend(loader_cls(path).load())
            print(f"✓ 已加载文档: {path}")
        
        # Split into overlapping chunks for retrieval.
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap
        )
        chunks = splitter.split_documents(loaded_docs)
        print(f"✓ 文档已分割为 {len(chunks)} 个块")
        
        return chunks
    
    def create_qa_chain(self, chain_type="stuff"):
        """Assemble the RetrievalQA chain from the configured LLM and store."""
        if not self.llm or not self.vectorstore:
            raise ValueError("请先配置LLM和向量存储")
        
        # Top-3 retrieval over the configured vector store.
        doc_retriever = self.vectorstore.as_retriever(
            search_kwargs={"k": 3}
        )
        
        # Custom prompt that keeps answers grounded in the retrieved context.
        template_text = """
        请基于以下上下文信息回答问题。如果上下文中没有相关信息,请说明无法从提供的信息中找到答案。
        
        上下文:
        {context}
        
        问题:{question}
        
        回答:
        """
        qa_prompt = PromptTemplate(
            template=template_text,
            input_variables=["context", "question"]
        )
        
        self.qa_chain = RetrievalQA.from_chain_type(
            llm=self.llm,
            chain_type=chain_type,
            retriever=doc_retriever,
            chain_type_kwargs={"prompt": qa_prompt},
            return_source_documents=True
        )
        
        print(f"✓ QA链已创建,类型: {chain_type}")
        return self.qa_chain
    
    def query(self, question):
        """Run a question through the QA chain and package the result."""
        if not self.qa_chain:
            raise ValueError("请先创建QA链")
        
        outcome = self.qa_chain({"query": question})
        return {
            "question": question,
            "answer": outcome["result"],
            "source_documents": outcome["source_documents"]
        }
    
    def get_setup_status(self):
        """Report which components are configured ("✓") versus missing ("✗")."""
        components = {
            "嵌入模型": self.embeddings,
            "语言模型": self.llm,
            "向量存储": self.vectorstore,
            "问答链": self.qa_chain
        }
        return {label: "✓" if obj else "✗" for label, obj in components.items()}

# Usage example
if __name__ == "__main__":
    # Wire up the LangChain helper with default components.
    rag_pipeline = LangChainSetup()
    
    rag_pipeline.setup_embeddings(provider="huggingface")
    rag_pipeline.setup_llm(provider="openai")
    rag_pipeline.setup_vectorstore(store_type="chroma")
    
    # Report which pieces are ready.
    print("\n配置状态:")
    for component, status_icon in rag_pipeline.get_setup_status().items():
        print(f"{component}: {status_icon}")
    
    # With documents available, the remaining steps would be:
    # documents = rag_pipeline.load_and_process_documents(["example.pdf"])
    # rag_pipeline.vectorstore.add_documents(documents)
    # rag_pipeline.create_qa_chain()
    # result = rag_pipeline.query("什么是RAG?")

2.2 向量数据库配置

# vector_databases.py - 多种向量数据库配置
import chromadb
import faiss
import numpy as np
from typing import List, Dict, Any
import json

class VectorDatabaseManager:
    """向量数据库管理器

    Unified facade over several vector-database backends (ChromaDB,
    FAISS, Pinecone): configuration, collection management, vector
    insertion and similarity search. Configured backends are stored in
    ``self.databases`` keyed by type name ("chroma" / "faiss" / "pinecone").
    """
    
    def __init__(self):
        # Registry of configured backends; empty until a setup_* call succeeds.
        self.databases = {}
        self.current_db = None
    
    def setup_chromadb(self, persist_directory="./chroma_db", host=None, port=None):
        """配置ChromaDB

        Connects to a remote server when both host and port are given,
        otherwise creates a local persistent client.

        Returns:
            The chromadb client, or None when configuration fails.
        """
        try:
            if host and port:
                # Remote server mode.
                client = chromadb.HttpClient(
                    host=host,
                    port=port
                )
                print(f"✓ 连接到远程ChromaDB: {host}:{port}")
            else:
                # Embedded mode with on-disk persistence.
                client = chromadb.PersistentClient(
                    path=persist_directory
                )
                print(f"✓ 本地ChromaDB已配置: {persist_directory}")
            
            self.databases["chroma"] = {
                "client": client,
                "type": "chroma",
                "collections": {}
            }
            
            return client
            
        except Exception as e:
            # Best-effort setup: report and let the caller decide.
            print(f"✗ ChromaDB配置失败: {e}")
            return None
    
    def setup_faiss(self, dimension=384, index_type="flat"):
        """配置FAISS

        Args:
            dimension: embedding vector dimensionality.
            index_type: "flat" (exact L2), "ivf" (inverted file, needs
                training) or "hnsw" (graph-based approximate search).

        Returns:
            The FAISS index, or None when configuration fails.
        """
        try:
            if index_type == "flat":
                index = faiss.IndexFlatL2(dimension)
            elif index_type == "ivf":
                # IVF needs a coarse quantizer; 100 = number of clusters.
                quantizer = faiss.IndexFlatL2(dimension)
                index = faiss.IndexIVFFlat(quantizer, dimension, 100)
            elif index_type == "hnsw":
                # 32 = number of graph neighbors per node.
                index = faiss.IndexHNSWFlat(dimension, 32)
            else:
                raise ValueError(f"不支持的FAISS索引类型: {index_type}")
            
            self.databases["faiss"] = {
                "index": index,
                "type": "faiss",
                "dimension": dimension,
                "metadata": [],      # parallel list: metadata[i] belongs to vector i
                "trained": False     # IVF indexes require training before add()
            }
            
            print(f"✓ FAISS索引已配置: {index_type}, 维度: {dimension}")
            return index
            
        except Exception as e:
            print(f"✗ FAISS配置失败: {e}")
            return None
    
    def setup_pinecone(self, api_key, environment, index_name):
        """配置Pinecone

        Uses the v3 client API (``Pinecone`` class), matching the pinned
        pinecone-client==3.0.0 — the legacy ``pinecone.init()`` entry point
        was removed in that release. ``environment`` is kept in the
        signature for backward compatibility but is not needed by v3.

        Returns:
            The index handle, or None when the index is missing or setup fails.
        """
        try:
            from pinecone import Pinecone
            
            pc = Pinecone(api_key=api_key)
            
            # v3 list_indexes() yields index descriptions with a "name" field.
            existing = [idx["name"] for idx in pc.list_indexes()]
            if index_name not in existing:
                print(f"索引 {index_name} 不存在,请先创建")
                return None
            
            index = pc.Index(index_name)
            
            self.databases["pinecone"] = {
                "index": index,
                "type": "pinecone",
                "index_name": index_name
            }
            
            print(f"✓ Pinecone已配置: {index_name}")
            return index
            
        except Exception as e:
            print(f"✗ Pinecone配置失败: {e}")
            return None
    
    def create_collection(self, db_type, collection_name, **kwargs):
        """创建集合

        For ChromaDB this creates a real collection; FAISS has no
        collection concept, so the name is only echoed back.

        Raises:
            ValueError: when the backend is unconfigured or unsupported.
        """
        if db_type not in self.databases:
            raise ValueError(f"数据库类型 {db_type} 未配置")
        
        db_info = self.databases[db_type]
        
        if db_type == "chroma":
            client = db_info["client"]
            collection = client.create_collection(
                name=collection_name,
                metadata=kwargs.get("metadata", {})
            )
            db_info["collections"][collection_name] = collection
            print(f"✓ ChromaDB集合已创建: {collection_name}")
            return collection
        
        elif db_type == "faiss":
            # FAISS indexes are flat; no explicit collection is needed.
            print(f"✓ FAISS集合标识: {collection_name}")
            return collection_name
        
        else:
            raise ValueError(f"不支持的数据库类型: {db_type}")
    
    def add_vectors(self, db_type, collection_name, vectors, metadata=None, ids=None):
        """添加向量

        Args:
            db_type: configured backend name.
            collection_name: target collection (ChromaDB only).
            vectors: ndarray or list of embedding vectors.
            metadata: optional per-vector metadata dicts; for ChromaDB the
                "text" field becomes the stored document.
            ids: optional per-vector ids; generated when omitted.

        Raises:
            ValueError: when the backend or collection is missing.
        """
        if db_type not in self.databases:
            raise ValueError(f"数据库类型 {db_type} 未配置")
        
        db_info = self.databases[db_type]
        
        if db_type == "chroma":
            collection = db_info["collections"].get(collection_name)
            if not collection:
                raise ValueError(f"集合 {collection_name} 不存在")
            
            # Normalize inputs to the plain-list form chroma expects.
            embeddings = vectors.tolist() if isinstance(vectors, np.ndarray) else vectors
            documents = [meta.get("text", "") for meta in metadata] if metadata else None
            metadatas = metadata if metadata else None
            ids = ids if ids else [f"doc_{i}" for i in range(len(embeddings))]
            
            collection.add(
                embeddings=embeddings,
                documents=documents,
                metadatas=metadatas,
                ids=ids
            )
            
            print(f"✓ 已向ChromaDB添加 {len(embeddings)} 个向量")
        
        elif db_type == "faiss":
            index = db_info["index"]
            
            # IVF-style indexes must be trained once before vectors are added.
            if not db_info["trained"] and hasattr(index, 'train'):
                index.train(vectors)
                db_info["trained"] = True
                print("✓ FAISS索引已训练")
            
            index.add(vectors)
            
            # Keep metadata aligned with insertion order (FAISS stores only vectors).
            if metadata:
                db_info["metadata"].extend(metadata)
            
            print(f"✓ 已向FAISS添加 {len(vectors)} 个向量")
        
        elif db_type == "pinecone":
            index = db_info["index"]
            
            # Pinecone upserts records of {id, values, metadata}.
            vectors_to_upsert = []
            for i, vector in enumerate(vectors):
                vector_id = ids[i] if ids else f"vec_{i}"
                vector_metadata = metadata[i] if metadata else {}
                
                vectors_to_upsert.append({
                    "id": vector_id,
                    "values": vector.tolist() if isinstance(vector, np.ndarray) else vector,
                    "metadata": vector_metadata
                })
            
            index.upsert(vectors=vectors_to_upsert)
            print(f"✓ 已向Pinecone添加 {len(vectors)} 个向量")
    
    def search_vectors(self, db_type, collection_name, query_vector, top_k=5, **kwargs):
        """搜索向量

        Args:
            db_type: configured backend name.
            collection_name: collection to search (ChromaDB only).
            query_vector: single query embedding (ndarray or list).
            top_k: number of nearest neighbors to return.

        Returns:
            dict: backend-specific result structure.

        Raises:
            ValueError: when the backend or collection is missing.
        """
        if db_type not in self.databases:
            raise ValueError(f"数据库类型 {db_type} 未配置")
        
        db_info = self.databases[db_type]
        
        if db_type == "chroma":
            collection = db_info["collections"].get(collection_name)
            if not collection:
                raise ValueError(f"集合 {collection_name} 不存在")
            
            results = collection.query(
                query_embeddings=[query_vector.tolist() if isinstance(query_vector, np.ndarray) else query_vector],
                n_results=top_k,
                include=["documents", "metadatas", "distances"]
            )
            
            # Chroma returns batched lists; unwrap the single-query batch.
            return {
                "ids": results["ids"][0],
                "distances": results["distances"][0],
                "documents": results["documents"][0],
                "metadatas": results["metadatas"][0]
            }
        
        elif db_type == "faiss":
            index = db_info["index"]
            metadata = db_info["metadata"]
            
            # FAISS expects a 2D batch of queries.
            query_vector = query_vector.reshape(1, -1) if query_vector.ndim == 1 else query_vector
            distances, indices = index.search(query_vector, top_k)
            
            return {
                "indices": indices[0].tolist(),
                "distances": distances[0].tolist(),
                # Guard against indices beyond the stored metadata list
                # (e.g. vectors added without metadata).
                "metadatas": [metadata[i] if i < len(metadata) else {} for i in indices[0]]
            }
        
        elif db_type == "pinecone":
            index = db_info["index"]
            
            results = index.query(
                vector=query_vector.tolist() if isinstance(query_vector, np.ndarray) else query_vector,
                top_k=top_k,
                include_metadata=True
            )
            
            # The v3 client returns a QueryResponse object; support both
            # mapping- and attribute-style access for robustness.
            matches = results["matches"] if isinstance(results, dict) else results.matches
            return {
                "matches": matches
            }
    
    def get_database_info(self, db_type):
        """获取数据库信息

        Returns:
            dict: status summary for the backend; ``{"status": "未配置"}``
            when it has not been set up.
        """
        if db_type not in self.databases:
            return {"status": "未配置"}
        
        db_info = self.databases[db_type]
        
        if db_type == "chroma":
            collections = list(db_info["collections"].keys())
            return {
                "status": "已配置",
                "type": "ChromaDB",
                "collections": collections,
                "collection_count": len(collections)
            }
        
        elif db_type == "faiss":
            index = db_info["index"]
            return {
                "status": "已配置",
                "type": "FAISS",
                "dimension": db_info["dimension"],
                "vector_count": index.ntotal,
                "trained": db_info["trained"]
            }
        
        elif db_type == "pinecone":
            return {
                "status": "已配置",
                "type": "Pinecone",
                "index_name": db_info["index_name"]
            }

# Usage example
if __name__ == "__main__":
    manager = VectorDatabaseManager()
    
    # Configure ChromaDB and, when available, a test collection.
    chroma_client = manager.setup_chromadb()
    if chroma_client:
        collection = manager.create_collection("chroma", "test_collection")
    
    # Configure a flat FAISS index.
    faiss_index = manager.setup_faiss(dimension=384)
    
    # Random vectors plus per-vector metadata for a smoke test.
    sample_vectors = np.random.rand(5, 384).astype('float32')
    sample_metadata = [
        {"text": f"文档{i}", "source": f"source_{i}"} for i in range(5)
    ]
    
    if chroma_client:
        manager.add_vectors("chroma", "test_collection", sample_vectors, sample_metadata)
    if faiss_index:
        manager.add_vectors("faiss", "test_collection", sample_vectors, sample_metadata)
    
    # Print a summary for each configured backend.
    for backend in ["chroma", "faiss"]:
        info = manager.get_database_info(backend)
        print(f"\n{backend.upper()} 信息:")
        for key, value in info.items():
            print(f"  {key}: {value}")

3. 开发工具配置

3.1 IDE和编辑器配置

// .vscode/settings.json - VS Code配置
{
    "python.defaultInterpreterPath": "./rag_env/Scripts/python",
    "python.linting.enabled": true,
    "python.linting.pylintEnabled": false,
    "python.linting.flake8Enabled": true,
    "python.formatting.provider": "black",
    "python.formatting.blackArgs": ["--line-length=88"],
    "python.testing.pytestEnabled": true,
    "python.testing.pytestArgs": ["tests"],
    "files.associations": {
        "*.env": "properties"
    },
    "editor.formatOnSave": true,
    "editor.codeActionsOnSave": {
        "source.organizeImports": true
    },
    "jupyter.notebookFileRoot": "${workspaceFolder}",
    "jupyter.defaultKernel": "rag_env"
}
// .vscode/launch.json - 调试配置
{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "RAG App Debug",
            "type": "python",
            "request": "launch",
            "program": "${workspaceFolder}/app.py",
            "console": "integratedTerminal",
            "env": {
                "PYTHONPATH": "${workspaceFolder}",
                "APP_ENV": "development"
            },
            "envFile": "${workspaceFolder}/.env"
        },
        {
            "name": "RAG Tests",
            "type": "python",
            "request": "launch",
            "module": "pytest",
            "args": ["tests/", "-v"],
            "console": "integratedTerminal",
            "envFile": "${workspaceFolder}/.env"
        },
        {
            "name": "Jupyter Notebook",
            "type": "python",
            "request": "launch",
            "module": "jupyter",
            "args": ["lab", "--port=8888"],
            "console": "integratedTerminal"
        }
    ]
}

3.2 代码质量工具

# .flake8 - Flake8配置
[flake8]
max-line-length = 88
extend-ignore = E203, W503
exclude = 
    .git,
    __pycache__,
    .venv,
    rag_env,
    build,
    dist,
    *.egg-info

per-file-ignores =
    __init__.py:F401
    tests/*:F401,F811
# pyproject.toml - Black和其他工具配置
[tool.black]
line-length = 88
target-version = ['py39']
include = '\.pyi?$'
extend-exclude = '''
(
  /(
      \.eggs
    | \.git
    | \.hg
    | \.mypy_cache
    | \.tox
    | \.venv
    | _build
    | buck-out
    | build
    | dist
  )/
)
'''

[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = ["test_*.py", "*_test.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]
addopts = "-v --tb=short --strict-markers"
markers = [
    "slow: marks tests as slow (deselect with '-m "not slow"')",
    "integration: marks tests as integration tests",
    "unit: marks tests as unit tests"
]

[tool.coverage.run]
source = ["src"]
omit = [
    "tests/*",
    "*/venv/*",
    "*/rag_env/*"
]

[tool.coverage.report]
exclude_lines = [
    "pragma: no cover",
    "def __repr__",
    "raise AssertionError",
    "raise NotImplementedError"
]

3.3 监控和日志配置

# logging_config.py - 日志配置
import logging
import sys
from pathlib import Path
from loguru import logger
import json
from datetime import datetime

class RAGLogger:
    """RAG system logging configuration built on loguru.

    Installs console, rotating-file, error-file and structured-JSONL
    sinks, and provides helpers for recording RAG operations and
    performance metrics as structured records.
    """
    
    def __init__(self, log_dir="./logs", app_name="rag_app"):
        """Create the log directory and install all sinks.

        Args:
            log_dir: directory where log files are written (created if missing).
            app_name: prefix for log file names and the "app" JSON field.
        """
        self.log_dir = Path(log_dir)
        self.app_name = app_name
        # parents=True so nested log paths (e.g. ./var/logs) also work.
        self.log_dir.mkdir(parents=True, exist_ok=True)
        
        # Drop loguru's default stderr handler so we fully control sinks.
        logger.remove()
        
        self._setup_loggers()
    
    def _setup_loggers(self):
        """Install the console, file, error and JSONL sinks."""
        # Console output: human-readable, colorized, INFO and above.
        logger.add(
            sys.stdout,
            format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
            level="INFO",
            colorize=True
        )
        
        # Full debug log with size-based rotation and compressed archives.
        logger.add(
            self.log_dir / f"{self.app_name}.log",
            format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
            level="DEBUG",
            rotation="10 MB",
            retention="30 days",
            compression="zip"
        )
        
        # Errors in a dedicated file, kept longer for post-mortems.
        logger.add(
            self.log_dir / f"{self.app_name}_error.log",
            format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
            level="ERROR",
            rotation="5 MB",
            retention="60 days"
        )
        
        # Structured JSON lines for downstream analysis.
        logger.add(
            self.log_dir / f"{self.app_name}_structured.jsonl",
            format=self._json_formatter,
            level="INFO",
            rotation="20 MB",
            retention="90 days"
        )
    
    def _json_formatter(self, record):
        """Build the JSONL line for a record (loguru dynamic formatter).

        When ``format=`` is a callable, loguru expects it to return a
        FORMAT TEMPLATE, not the final message. The previous version
        returned the serialized JSON directly, which made loguru treat
        every ``{``/``}`` inside the JSON as a format field (and omitted
        the trailing newline). The documented fix is to stash the
        serialized payload in ``record["extra"]`` and return a template
        that references it.
        """
        log_entry = {
            "timestamp": record["time"].isoformat(),
            "level": record["level"].name,
            "logger": record["name"],
            "function": record["function"],
            "line": record["line"],
            "message": record["message"],
            "app": self.app_name
        }
        
        # Carry through any bound context fields, excluding our own slot.
        extra = {k: v for k, v in record["extra"].items() if k != "serialized"}
        if extra:
            log_entry["extra"] = extra
        
        record["extra"]["serialized"] = json.dumps(log_entry, ensure_ascii=False)
        return "{extra[serialized]}\n"
    
    def get_logger(self, name=None):
        """Return the shared logger, optionally bound with a `name` field."""
        if name:
            return logger.bind(name=name)
        return logger
    
    def log_rag_operation(self, operation, query=None, result=None, duration=None, **kwargs):
        """Log one RAG pipeline operation with structured context fields.

        Args:
            operation: operation label, e.g. "document_retrieval".
            query: the user query, if any.
            result: operation result; only its stringified length is logged.
            duration: elapsed time in milliseconds.
            **kwargs: extra structured fields bound onto the record.
        """
        extra_data = {
            "operation": operation,
            "query": query,
            "result_length": len(str(result)) if result else 0,
            "duration_ms": duration,
            **kwargs
        }
        
        logger.bind(**extra_data).info(f"RAG操作: {operation}")
    
    def log_performance_metrics(self, metrics):
        """Log a dict of performance metrics as a structured record."""
        logger.bind(metrics=metrics).info("性能指标")

# Usage example
if __name__ == "__main__":
    # Install the sinks and grab a named logger.
    rag_logger = RAGLogger()
    logger = rag_logger.get_logger("test")
    
    # Emit one record per level to exercise every sink.
    for emit, text in [
        (logger.info, "RAG系统启动"),
        (logger.debug, "调试信息"),
        (logger.warning, "警告信息"),
        (logger.error, "错误信息"),
    ]:
        emit(text)
    
    # Structured record for a retrieval call.
    rag_logger.log_rag_operation(
        operation="document_retrieval",
        query="什么是机器学习?",
        result="机器学习是...",
        duration=150,
        num_documents=3
    )
    
    # Aggregate timing metrics.
    rag_logger.log_performance_metrics({
        "embedding_time": 50,
        "retrieval_time": 30,
        "generation_time": 200,
        "total_time": 280
    })

4. 项目结构和最佳实践

4.1 推荐项目结构

rag_project/
├── src/
│   ├── __init__.py
│   ├── core/
│   │   ├── __init__.py
│   │   ├── embeddings.py
│   │   ├── retrievers.py
│   │   ├── generators.py
│   │   └── chains.py
│   ├── data/
│   │   ├── __init__.py
│   │   ├── loaders.py
│   │   ├── processors.py
│   │   └── splitters.py
│   ├── vectorstores/
│   │   ├── __init__.py
│   │   ├── chroma_store.py
│   │   ├── faiss_store.py
│   │   └── pinecone_store.py
│   ├── api/
│   │   ├── __init__.py
│   │   ├── routes.py
│   │   ├── models.py
│   │   └── middleware.py
│   └── utils/
│       ├── __init__.py
│       ├── config.py
│       ├── logging.py
│       └── helpers.py
├── tests/
│   ├── __init__.py
│   ├── unit/
│   ├── integration/
│   └── fixtures/
├── notebooks/
│   ├── exploration.ipynb
│   ├── evaluation.ipynb
│   └── examples.ipynb
├── data/
│   ├── raw/
│   ├── processed/
│   └── embeddings/
├── models/
│   ├── embeddings/
│   └── checkpoints/
├── logs/
├── docs/
│   ├── api.md
│   ├── setup.md
│   └── examples.md
├── scripts/
│   ├── setup_env.py
│   ├── process_data.py
│   └── deploy.py
├── config/
│   ├── development.yaml
│   ├── production.yaml
│   └── testing.yaml
├── requirements.txt
├── requirements-dev.txt
├── docker-compose.yml
├── Dockerfile
├── .env.example
├── .gitignore
├── README.md
└── pyproject.toml

4.2 配置管理

# src/utils/config.py - 配置管理
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Optional

import yaml

@dataclass
class EmbeddingConfig:
    """Embedding model settings."""
    provider: str = "huggingface"  # "huggingface" or "openai"
    model_name: str = "sentence-transformers/all-MiniLM-L6-v2"
    dimension: int = 384  # embedding vector size
    batch_size: int = 32

@dataclass
class VectorStoreConfig:
    """Vector store connection settings."""
    provider: str = "chroma"
    persist_directory: str = "./chroma_db"
    collection_name: str = "default"
    # Remote-server settings; None (the default) means embedded/local mode.
    # Previously annotated as plain `str`/`int` although the default is None.
    host: Optional[str] = None
    port: Optional[int] = None

@dataclass
class LLMConfig:
    """Language model settings."""
    provider: str = "openai"
    model_name: str = "gpt-3.5-turbo"
    temperature: float = 0.7
    max_tokens: int = 1000
    # Usually injected from the OPENAI_API_KEY environment variable.
    api_key: Optional[str] = None

@dataclass
class APIConfig:
    """HTTP API server settings."""
    host: str = "0.0.0.0"
    port: int = 8000
    debug: bool = False
    # None means no explicit CORS origins are configured.
    cors_origins: Optional[list] = None

@dataclass
class RAGConfig:
    """Top-level RAG application configuration."""
    embedding: EmbeddingConfig
    vectorstore: VectorStoreConfig
    llm: LLMConfig
    api: APIConfig
    
    # Document chunking and retrieval parameters.
    chunk_size: int = 1000
    chunk_overlap: int = 200
    retrieval_k: int = 3
    
    # Logging and data paths.
    log_level: str = "INFO"
    log_dir: str = "./logs"
    data_dir: str = "./data"

class ConfigManager:
    """Configuration manager.

    Builds a :class:`RAGConfig` by layering, in increasing priority:
    dataclass defaults, a per-environment YAML file (``config/<env>.yaml``),
    and a fixed set of environment variables.
    """
    
    def __init__(self, config_path=None, env=None):
        """
        Args:
            config_path: Explicit YAML path; defaults to ``config/<env>.yaml``.
            env: Environment name; defaults to ``APP_ENV`` or "development".
        """
        self.env = env or os.getenv("APP_ENV", "development")
        self.config_path = config_path or f"config/{self.env}.yaml"
        self.config = self._load_config()
    
    def _load_config(self) -> RAGConfig:
        """Load and merge configuration from all sources into a RAGConfig."""
        # Layer 1: empty sections so each dataclass falls back to its defaults.
        config_dict = {
            "embedding": {},
            "vectorstore": {},
            "llm": {},
            "api": {}
        }
        
        # Layer 2: per-environment YAML file, if present.
        if Path(self.config_path).exists():
            with open(self.config_path, 'r', encoding='utf-8') as f:
                file_config = yaml.safe_load(f)
            # Fix: safe_load returns None for an empty file, which would make
            # dict.update() raise TypeError.
            if file_config:
                config_dict.update(file_config)
        
        # Layer 3: environment-variable overrides (highest priority).
        self._override_from_env(config_dict)
        
        # Assemble the typed config; unknown top-level keys pass straight
        # through to RAGConfig (e.g. chunk_size, log_level).
        return RAGConfig(
            embedding=EmbeddingConfig(**config_dict.get("embedding", {})),
            vectorstore=VectorStoreConfig(**config_dict.get("vectorstore", {})),
            llm=LLMConfig(**config_dict.get("llm", {})),
            api=APIConfig(**config_dict.get("api", {})),
            **{k: v for k, v in config_dict.items() 
               if k not in ["embedding", "vectorstore", "llm", "api"]}
        )
    
    def _override_from_env(self, config_dict):
        """Apply environment-variable overrides onto ``config_dict`` in place."""
        # Maps env var -> target: [key] for top-level, [section, key] nested.
        env_mappings = {
            "OPENAI_API_KEY": ["llm", "api_key"],
            "EMBEDDING_MODEL": ["embedding", "model_name"],
            "VECTOR_STORE": ["vectorstore", "provider"],
            "CHROMA_HOST": ["vectorstore", "host"],
            "CHROMA_PORT": ["vectorstore", "port"],
            "API_HOST": ["api", "host"],
            "API_PORT": ["api", "port"],
            "LOG_LEVEL": ["log_level"],
            "CHUNK_SIZE": ["chunk_size"],
            # Added: chunk_overlap exists on RAGConfig but previously could
            # not be overridden from the environment.
            "CHUNK_OVERLAP": ["chunk_overlap"],
            "RETRIEVAL_K": ["retrieval_k"]
        }
        
        # Keys whose string values must be coerced to int.
        int_keys = {"chunk_size", "chunk_overlap", "retrieval_k", "port"}
        
        for env_var, target in env_mappings.items():
            value = os.getenv(env_var)
            # NOTE: empty-string env vars are treated as unset (preserves the
            # original truthiness check).
            if not value:
                continue
            if len(target) == 1:
                # Top-level config key.
                key = target[0]
                config_dict[key] = int(value) if key in int_keys else value
            else:
                # Nested section key.
                section, key = target
                config_dict.setdefault(section, {})
                config_dict[section][key] = int(value) if key in int_keys else value
    
    def get_config(self) -> RAGConfig:
        """Return the loaded configuration object."""
        return self.config
    
    def update_config(self, updates: Dict[str, Any]):
        """Dynamically update configuration (not implemented yet)."""
        # TODO: implement runtime configuration updates.
        pass
    
    def validate_config(self) -> bool:
        """Validate the loaded configuration.

        Prints each problem found and returns True only when there are none.
        Side effect: creates ``log_dir`` and ``data_dir`` if missing.
        """
        errors = []
        
        # The OpenAI provider requires an API key.
        if self.config.llm.provider == "openai" and not self.config.llm.api_key:
            errors.append("OpenAI API密钥未配置")
        
        # A remote Chroma host needs an explicit port.
        if self.config.vectorstore.provider == "chroma":
            if self.config.vectorstore.host and not self.config.vectorstore.port:
                errors.append("ChromaDB主机已配置但端口未配置")
        
        # Ensure writable directories exist, creating them if needed.
        for path_attr in ["log_dir", "data_dir"]:
            path = getattr(self.config, path_attr)
            if not Path(path).exists():
                try:
                    Path(path).mkdir(parents=True, exist_ok=True)
                except Exception as e:
                    errors.append(f"无法创建目录 {path}: {e}")
        
        if errors:
            for error in errors:
                print(f"配置错误: {error}")
            return False
        
        return True

# 使用示例
if __name__ == "__main__":
    # Demo: build a manager, validate the config, then print key settings.
    manager = ConfigManager()

    if not manager.validate_config():
        print("✗ 配置验证失败")
    else:
        print("✓ 配置验证通过")

        cfg = manager.get_config()

        # Table-driven dump of the most important effective settings.
        summary = [
            ("嵌入模型", cfg.embedding.model_name),
            ("向量存储", cfg.vectorstore.provider),
            ("LLM模型", cfg.llm.model_name),
            ("API端口", cfg.api.port),
        ]
        for label, value in summary:
            print(f"{label}: {value}")

5. 本章总结

本章详细介绍了RAG系统开发环境的搭建和配置,主要内容包括:

核心要点

  1. 环境准备

    • Python 3.9+环境配置
    • 虚拟环境创建和管理
    • Docker容器化部署
  2. 依赖管理

    • 核心库安装(LangChain、ChromaDB等)
    • 可选组件配置(FAISS、Pinecone等)
    • 开发工具集成
  3. 向量数据库

    • ChromaDB本地和远程配置
    • FAISS高性能索引
    • Pinecone云服务集成
  4. 开发工具

    • IDE配置和调试设置
    • 代码质量工具
    • 日志和监控系统

最佳实践

  1. 项目结构

    • 模块化代码组织
    • 清晰的目录层次
    • 配置和代码分离
  2. 配置管理

    • 环境变量优先级
    • 多环境配置支持
    • 配置验证机制
  3. 开发流程

    • 版本控制最佳实践
    • 自动化测试集成
    • 持续集成部署

下一步

环境搭建完成后,下一章我们将学习文档处理与数据预处理,包括:

    • 多格式文档加载
    • 文本清理和标准化
    • 智能分块策略
    • 元数据管理

通过本章的学习,您已经具备了开发RAG系统的基础环境,可以开始实际的开发工作。