本章概述
本章将详细介绍RAG系统开发所需的环境搭建和工具配置,包括Python环境准备、核心依赖库安装、向量数据库配置、以及开发调试工具的使用。通过本章学习,您将能够搭建完整的RAG开发环境。
学习目标
- 掌握RAG开发环境的搭建方法
- 了解核心依赖库的安装和配置
- 学习向量数据库的部署和使用
- 熟悉开发调试工具和最佳实践
1. 开发环境准备
1.1 Python环境配置
# requirements.txt - RAG项目依赖清单
"""
# 核心框架
langchain==0.1.0
langchain-community==0.0.10
langchain-openai==0.0.5
# 向量数据库
chromadb==0.4.22
faiss-cpu==1.7.4
pinecone-client==3.0.0
weaviate-client==3.25.3
# 嵌入模型
sentence-transformers==2.2.2
openai==1.6.1
huggingface-hub==0.20.2
transformers==4.36.2
# 文档处理
pypdf2==3.0.1
python-docx==1.1.0
beautifulsoup4==4.12.2
markdown==3.5.2
# 数据处理
numpy==1.24.3
pandas==2.0.3
scipy==1.11.4
# Web框架
fastapi==0.104.1
streamlit==1.29.0
flask==3.0.0
# 监控和日志
wandb==0.16.1
mlflow==2.9.2
loguru==0.7.2
# 开发工具
jupyter==1.0.0
ipykernel==6.27.1
pytest==7.4.3
black==23.12.1
flake8==6.1.0
"""
class EnvironmentSetup:
    """Manage the setup of a local RAG development environment.

    Bundles the interpreter version check, virtual-environment creation
    hints, dependency-installation steps and a .env template. Commands
    are only described/printed for the reader, never executed.
    """

    def __init__(self):
        # Minimum supported interpreter and the package lists used by the guide.
        self.python_version = "3.9+"
        self.required_packages = [
            "langchain", "chromadb", "sentence-transformers",
            "openai", "fastapi", "streamlit"
        ]
        self.optional_packages = [
            "faiss-cpu", "pinecone-client", "weaviate-client"
        ]

    def check_python_version(self):
        """Return True when the running interpreter is Python 3.9 or newer."""
        import sys
        version = sys.version_info
        # Tuple comparison accepts any future major release as well;
        # the previous `major == 3 and minor >= 9` check would have
        # rejected a hypothetical Python 4.
        if version >= (3, 9):
            print(f"✓ Python版本检查通过: {version.major}.{version.minor}.{version.micro}")
            return True
        print(f"✗ Python版本过低: {version.major}.{version.minor}.{version.micro}")
        print("请升级到Python 3.9或更高版本")
        return False

    def create_virtual_environment(self, env_name="rag_env"):
        """Describe the commands needed to create and activate a venv.

        Returns a dict with the env name, the platform-specific activation
        command and the full command list. Nothing is executed.
        """
        import os
        # Windows venvs activate via Scripts\activate; POSIX shells
        # source bin/activate. Computed once instead of twice.
        activation = (f"{env_name}/Scripts/activate" if os.name == 'nt'
                      else f"source {env_name}/bin/activate")
        commands = [
            f"python -m venv {env_name}",
            activation,
            "pip install --upgrade pip"
        ]
        print(f"创建虚拟环境: {env_name}")
        for cmd in commands:
            print(f"执行: {cmd}")
        return {
            "env_name": env_name,
            "activation_script": activation,
            "commands": commands
        }

    def install_dependencies(self, requirements_file="requirements.txt"):
        """Return the ordered installation steps for the requirements file.

        Each step is a dict with "step", "command" and "description" keys.
        """
        installation_steps = [
            {
                "step": "基础依赖安装",
                "command": f"pip install -r {requirements_file}",
                "description": "安装所有必需的Python包"
            },
            {
                "step": "验证安装",
                "command": "pip list | grep -E 'langchain|chromadb|sentence-transformers'",
                "description": "验证核心包是否正确安装"
            },
            {
                "step": "环境测试",
                "command": "python -c 'import langchain; import chromadb; print(\"环境配置成功\")'",
                "description": "测试导入核心模块"
            }
        ]
        return installation_steps

    def generate_env_file(self):
        """Return the .env template text with placeholders for all secrets."""
        env_template = """
# .env - 环境变量配置文件
# OpenAI API配置
OPENAI_API_KEY=your_openai_api_key_here
OPENAI_API_BASE=https://api.openai.com/v1
# Hugging Face配置
HUGGINGFACE_API_TOKEN=your_hf_token_here
# 向量数据库配置
# Pinecone
PINECONE_API_KEY=your_pinecone_api_key
PINECONE_ENVIRONMENT=your_pinecone_environment
# Weaviate
WEAVIATE_URL=http://localhost:8080
WEAVIATE_API_KEY=your_weaviate_api_key
# ChromaDB
CHROMA_HOST=localhost
CHROMA_PORT=8000
# 应用配置
APP_ENV=development
LOG_LEVEL=INFO
MAX_TOKENS=1000
TEMPERATURE=0.7
# 文件路径
DATA_DIR=./data
LOGS_DIR=./logs
MODELS_DIR=./models
"""
        return env_template
# Usage example
if __name__ == "__main__":
    setup = EnvironmentSetup()
    # Proceed only when the interpreter version is sufficient.
    if setup.check_python_version():
        print("\n开始环境搭建...")
        venv_info = setup.create_virtual_environment()
        print(f"虚拟环境信息: {venv_info}")
        install_steps = setup.install_dependencies()
        print("\n安装步骤:")
        for entry in install_steps:
            print(f"- {entry['step']}: {entry['command']}")
        env_content = setup.generate_env_file()
        print("\n环境变量配置模板已生成")
1.2 Docker环境配置
# Dockerfile - RAG应用容器化
FROM python:3.9-slim
# 设置工作目录
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y \
gcc \
g++ \
curl \
git \
&& rm -rf /var/lib/apt/lists/*
# 复制依赖文件
COPY requirements.txt .
# 安装Python依赖
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 创建必要目录
RUN mkdir -p data logs models
# 设置环境变量
ENV PYTHONPATH=/app
ENV APP_ENV=production
# 暴露端口
EXPOSE 8000
# 启动命令
CMD ["python", "app.py"]
# docker-compose.yml - 完整的RAG服务栈
version: '3.8'
services:
# RAG应用服务
rag-app:
build: .
ports:
- "8000:8000"
environment:
- APP_ENV=development
- CHROMA_HOST=chromadb
- CHROMA_PORT=8000
volumes:
- ./data:/app/data
- ./logs:/app/logs
depends_on:
- chromadb
- redis
networks:
- rag-network
# ChromaDB向量数据库
chromadb:
image: chromadb/chroma:latest
ports:
- "8001:8000"
volumes:
- chroma_data:/chroma/chroma
environment:
- CHROMA_SERVER_HOST=0.0.0.0
- CHROMA_SERVER_HTTP_PORT=8000
networks:
- rag-network
# Redis缓存
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis_data:/data
networks:
- rag-network
# Elasticsearch (可选)
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
environment:
- discovery.type=single-node
- xpack.security.enabled=false
ports:
- "9200:9200"
volumes:
- es_data:/usr/share/elasticsearch/data
networks:
- rag-network
# Jupyter Notebook开发环境
jupyter:
build:
context: .
dockerfile: Dockerfile.jupyter
ports:
- "8888:8888"
volumes:
- ./notebooks:/app/notebooks
- ./data:/app/data
environment:
- JUPYTER_ENABLE_LAB=yes
networks:
- rag-network
volumes:
chroma_data:
redis_data:
es_data:
networks:
rag-network:
driver: bridge
# Dockerfile.jupyter - Jupyter开发环境
FROM python:3.9-slim
WORKDIR /app
# 安装Jupyter和RAG依赖
RUN pip install jupyter jupyterlab
COPY requirements.txt .
RUN pip install -r requirements.txt
# 配置Jupyter
RUN jupyter lab --generate-config
RUN echo "c.ServerApp.ip = '0.0.0.0'" >> ~/.jupyter/jupyter_lab_config.py
RUN echo "c.ServerApp.allow_root = True" >> ~/.jupyter/jupyter_lab_config.py
RUN echo "c.ServerApp.token = ''" >> ~/.jupyter/jupyter_lab_config.py
EXPOSE 8888
CMD ["jupyter", "lab", "--port=8888", "--no-browser"]
2. 核心依赖库详解
2.1 LangChain框架
# langchain_setup.py - LangChain配置和使用
from langchain.embeddings import OpenAIEmbeddings, HuggingFaceEmbeddings
from langchain.llms import OpenAI
from langchain.chat_models import ChatOpenAI
from langchain.vectorstores import Chroma, FAISS
from langchain.document_loaders import TextLoader, PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains import RetrievalQA
from langchain.prompts import PromptTemplate
import os
class LangChainSetup:
    """Wire together the LangChain components of a RAG pipeline.

    Holds the embedding model, LLM, vector store and QA chain, with one
    setup method per component plus helpers for document ingestion and
    querying. get_setup_status() reports which components are ready.
    """

    def __init__(self):
        # All components start unset; they are filled in by setup_* calls.
        self.embeddings = None
        self.llm = None
        self.vectorstore = None
        self.qa_chain = None

    def setup_embeddings(self, provider="openai", model_name=None):
        """Configure the embedding model.

        provider: "openai" or "huggingface".
        model_name: optional HF model id; ignored for OpenAI.
        Raises ValueError for an unknown provider.
        """
        if provider == "openai":
            self.embeddings = OpenAIEmbeddings(
                openai_api_key=os.getenv("OPENAI_API_KEY")
            )
            print("✓ OpenAI嵌入模型已配置")
        elif provider == "huggingface":
            # Small, fast default that works well for demos.
            model_name = model_name or "sentence-transformers/all-MiniLM-L6-v2"
            self.embeddings = HuggingFaceEmbeddings(
                model_name=model_name
            )
            print(f"✓ HuggingFace嵌入模型已配置: {model_name}")
        else:
            raise ValueError(f"不支持的嵌入提供商: {provider}")
        return self.embeddings

    def setup_llm(self, provider="openai", model_name="gpt-3.5-turbo", temperature=0.7):
        """Configure the language model.

        temperature is now a parameter (default 0.7, matching the value
        that was previously hard-coded). Raises ValueError for an
        unknown provider.
        """
        if provider == "openai":
            # Chat-style models need ChatOpenAI; legacy completion
            # models go through the plain OpenAI wrapper.
            if "gpt-3.5" in model_name or "gpt-4" in model_name:
                self.llm = ChatOpenAI(
                    model_name=model_name,
                    temperature=temperature,
                    openai_api_key=os.getenv("OPENAI_API_KEY")
                )
            else:
                self.llm = OpenAI(
                    model_name=model_name,
                    temperature=temperature,
                    openai_api_key=os.getenv("OPENAI_API_KEY")
                )
            print(f"✓ OpenAI语言模型已配置: {model_name}")
        else:
            raise ValueError(f"不支持的LLM提供商: {provider}")
        return self.llm

    def setup_vectorstore(self, store_type="chroma", persist_directory="./chroma_db"):
        """Configure the vector store; requires embeddings to be set first."""
        if not self.embeddings:
            raise ValueError("请先配置嵌入模型")
        if store_type == "chroma":
            self.vectorstore = Chroma(
                embedding_function=self.embeddings,
                persist_directory=persist_directory
            )
            print(f"✓ ChromaDB向量存储已配置: {persist_directory}")
        elif store_type == "faiss":
            # FAISS indexes are built from documents, so nothing to do yet.
            print("✓ FAISS向量存储配置就绪(需要文档初始化)")
        else:
            raise ValueError(f"不支持的向量存储类型: {store_type}")
        return self.vectorstore

    def load_and_process_documents(self, file_paths, chunk_size=1000, chunk_overlap=200):
        """Load PDF/TXT files and split them into overlapping chunks."""
        documents = []
        for file_path in file_paths:
            if file_path.endswith('.pdf'):
                loader = PyPDFLoader(file_path)
            elif file_path.endswith('.txt'):
                loader = TextLoader(file_path)
            else:
                # Unsupported extensions are skipped, not fatal.
                print(f"跳过不支持的文件类型: {file_path}")
                continue
            docs = loader.load()
            documents.extend(docs)
            print(f"✓ 已加载文档: {file_path}")
        # Split with overlap so context is preserved across chunk borders.
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap
        )
        split_docs = text_splitter.split_documents(documents)
        print(f"✓ 文档已分割为 {len(split_docs)} 个块")
        return split_docs

    def create_qa_chain(self, chain_type="stuff"):
        """Build the RetrievalQA chain; requires LLM and vector store."""
        if not self.llm or not self.vectorstore:
            raise ValueError("请先配置LLM和向量存储")
        # Top-3 retrieval keeps the stuffed context small.
        retriever = self.vectorstore.as_retriever(
            search_kwargs={"k": 3}
        )
        prompt_template = """
请基于以下上下文信息回答问题。如果上下文中没有相关信息,请说明无法从提供的信息中找到答案。
上下文:
{context}
问题:{question}
回答:
"""
        prompt = PromptTemplate(
            template=prompt_template,
            input_variables=["context", "question"]
        )
        self.qa_chain = RetrievalQA.from_chain_type(
            llm=self.llm,
            chain_type=chain_type,
            retriever=retriever,
            chain_type_kwargs={"prompt": prompt},
            return_source_documents=True
        )
        print(f"✓ QA链已创建,类型: {chain_type}")
        return self.qa_chain

    def query(self, question):
        """Run a question through the QA chain; returns answer + sources."""
        if not self.qa_chain:
            raise ValueError("请先创建QA链")
        result = self.qa_chain({"query": question})
        return {
            "question": question,
            "answer": result["result"],
            "source_documents": result["source_documents"]
        }

    def get_setup_status(self):
        """Return a dict mapping component names to ✓/✗ readiness markers."""
        return {
            "嵌入模型": "✓" if self.embeddings else "✗",
            "语言模型": "✓" if self.llm else "✗",
            "向量存储": "✓" if self.vectorstore else "✗",
            "问答链": "✓" if self.qa_chain else "✗"
        }
# Usage example
if __name__ == "__main__":
    lc_setup = LangChainSetup()
    # Configure components (embeddings must come before the vector store).
    lc_setup.setup_embeddings(provider="huggingface")
    lc_setup.setup_llm(provider="openai")
    lc_setup.setup_vectorstore(store_type="chroma")
    # Report readiness of each component.
    status = lc_setup.get_setup_status()
    print("\n配置状态:")
    for component, status_icon in status.items():
        print(f"{component}: {status_icon}")
    # With documents available, the remaining steps would be:
    # documents = lc_setup.load_and_process_documents(["example.pdf"])
    # lc_setup.vectorstore.add_documents(documents)
    # lc_setup.create_qa_chain()
    # result = lc_setup.query("什么是RAG?")
2.2 向量数据库配置
# vector_databases.py - 多种向量数据库配置
import chromadb
import faiss
import numpy as np
from typing import List, Dict, Any
import json
class VectorDatabaseManager:
    """Unified manager for ChromaDB, FAISS and Pinecone vector stores.

    Each configured backend is registered in self.databases under its
    type key and used through the generic create/add/search/info methods.
    """

    def __init__(self):
        self.databases = {}   # backend key ("chroma"/"faiss"/"pinecone") -> state dict
        self.current_db = None

    def setup_chromadb(self, persist_directory="./chroma_db", host=None, port=None):
        """Configure ChromaDB (remote when host+port given, else local).

        Returns the client, or None on failure (the error is printed).
        """
        try:
            if host and port:
                client = chromadb.HttpClient(
                    host=host,
                    port=port
                )
                print(f"✓ 连接到远程ChromaDB: {host}:{port}")
            else:
                client = chromadb.PersistentClient(
                    path=persist_directory
                )
                print(f"✓ 本地ChromaDB已配置: {persist_directory}")
            self.databases["chroma"] = {
                "client": client,
                "type": "chroma",
                "collections": {}
            }
            return client
        except Exception as e:
            print(f"✗ ChromaDB配置失败: {e}")
            return None

    def setup_faiss(self, dimension=384, index_type="flat", nlist=100):
        """Configure a FAISS index.

        index_type: "flat" (exact L2), "ivf" (inverted file, needs
        training) or "hnsw" (graph-based ANN). nlist is the IVF cluster
        count (previously hard-coded to 100). Returns the index or None
        on failure.
        """
        try:
            if index_type == "flat":
                index = faiss.IndexFlatL2(dimension)
            elif index_type == "ivf":
                quantizer = faiss.IndexFlatL2(dimension)
                index = faiss.IndexIVFFlat(quantizer, dimension, nlist)
            elif index_type == "hnsw":
                # 32 = number of graph neighbors per node.
                index = faiss.IndexHNSWFlat(dimension, 32)
            else:
                raise ValueError(f"不支持的FAISS索引类型: {index_type}")
            self.databases["faiss"] = {
                "index": index,
                "type": "faiss",
                "dimension": dimension,
                "metadata": [],   # positional metadata, parallel to index order
                "trained": False
            }
            print(f"✓ FAISS索引已配置: {index_type}, 维度: {dimension}")
            return index
        except Exception as e:
            print(f"✗ FAISS配置失败: {e}")
            return None

    def setup_pinecone(self, api_key, environment, index_name):
        """Connect to an existing Pinecone index.

        NOTE(review): uses the legacy pinecone.init() API from
        pinecone-client; the index must already exist.
        """
        try:
            import pinecone
            pinecone.init(
                api_key=api_key,
                environment=environment
            )
            if index_name not in pinecone.list_indexes():
                print(f"索引 {index_name} 不存在,请先创建")
                return None
            index = pinecone.Index(index_name)
            self.databases["pinecone"] = {
                "index": index,
                "type": "pinecone",
                "index_name": index_name
            }
            print(f"✓ Pinecone已配置: {index_name}")
            return index
        except Exception as e:
            print(f"✗ Pinecone配置失败: {e}")
            return None

    def create_collection(self, db_type, collection_name, **kwargs):
        """Create a named collection on a configured backend.

        Raises ValueError when the backend is unconfigured or unsupported.
        """
        if db_type not in self.databases:
            raise ValueError(f"数据库类型 {db_type} 未配置")
        db_info = self.databases[db_type]
        if db_type == "chroma":
            client = db_info["client"]
            collection = client.create_collection(
                name=collection_name,
                metadata=kwargs.get("metadata", {})
            )
            db_info["collections"][collection_name] = collection
            print(f"✓ ChromaDB集合已创建: {collection_name}")
            return collection
        elif db_type == "faiss":
            # FAISS has no collection concept; the name is purely a label.
            print(f"✓ FAISS集合标识: {collection_name}")
            return collection_name
        else:
            raise ValueError(f"不支持的数据库类型: {db_type}")

    def add_vectors(self, db_type, collection_name, vectors, metadata=None, ids=None):
        """Insert vectors (with optional metadata and ids) into a backend."""
        if db_type not in self.databases:
            raise ValueError(f"数据库类型 {db_type} 未配置")
        db_info = self.databases[db_type]
        if db_type == "chroma":
            collection = db_info["collections"].get(collection_name)
            if not collection:
                raise ValueError(f"集合 {collection_name} 不存在")
            embeddings = vectors.tolist() if isinstance(vectors, np.ndarray) else vectors
            # Chroma stores the raw text alongside the embedding when present.
            documents = [meta.get("text", "") for meta in metadata] if metadata else None
            metadatas = metadata if metadata else None
            ids = ids if ids else [f"doc_{i}" for i in range(len(embeddings))]
            collection.add(
                embeddings=embeddings,
                documents=documents,
                metadatas=metadatas,
                ids=ids
            )
            print(f"✓ 已向ChromaDB添加 {len(embeddings)} 个向量")
        elif db_type == "faiss":
            index = db_info["index"]
            # IVF-style indexes must be trained once before adding vectors.
            if not db_info["trained"] and hasattr(index, 'train'):
                index.train(vectors)
                db_info["trained"] = True
                print("✓ FAISS索引已训练")
            index.add(vectors)
            # Metadata is kept positionally, parallel to the index order.
            if metadata:
                db_info["metadata"].extend(metadata)
            print(f"✓ 已向FAISS添加 {len(vectors)} 个向量")
        elif db_type == "pinecone":
            index = db_info["index"]
            vectors_to_upsert = []
            for i, vector in enumerate(vectors):
                vector_id = ids[i] if ids else f"vec_{i}"
                vector_metadata = metadata[i] if metadata else {}
                vectors_to_upsert.append({
                    "id": vector_id,
                    "values": vector.tolist() if isinstance(vector, np.ndarray) else vector,
                    "metadata": vector_metadata
                })
            index.upsert(vectors=vectors_to_upsert)
            print(f"✓ 已向Pinecone添加 {len(vectors)} 个向量")

    def search_vectors(self, db_type, collection_name, query_vector, top_k=5, **kwargs):
        """Nearest-neighbor search; result shape depends on the backend."""
        if db_type not in self.databases:
            raise ValueError(f"数据库类型 {db_type} 未配置")
        db_info = self.databases[db_type]
        if db_type == "chroma":
            collection = db_info["collections"].get(collection_name)
            if not collection:
                raise ValueError(f"集合 {collection_name} 不存在")
            results = collection.query(
                query_embeddings=[query_vector.tolist() if isinstance(query_vector, np.ndarray) else query_vector],
                n_results=top_k,
                include=["documents", "metadatas", "distances"]
            )
            return {
                "ids": results["ids"][0],
                "distances": results["distances"][0],
                "documents": results["documents"][0],
                "metadatas": results["metadatas"][0]
            }
        elif db_type == "faiss":
            index = db_info["index"]
            metadata = db_info["metadata"]
            # FAISS expects a 2-D query batch.
            query_vector = query_vector.reshape(1, -1) if query_vector.ndim == 1 else query_vector
            distances, indices = index.search(query_vector, top_k)
            return {
                "indices": indices[0].tolist(),
                "distances": distances[0].tolist(),
                # FAISS pads missing neighbors with -1; the previous
                # `i < len(metadata)` check let -1 silently fetch the
                # LAST metadata entry via negative indexing.
                "metadatas": [metadata[i] if 0 <= i < len(metadata) else {} for i in indices[0]]
            }
        elif db_type == "pinecone":
            index = db_info["index"]
            results = index.query(
                vector=query_vector.tolist() if isinstance(query_vector, np.ndarray) else query_vector,
                top_k=top_k,
                include_metadata=True
            )
            return {
                "matches": results["matches"]
            }

    def get_database_info(self, db_type):
        """Return a status/summary dict for a backend."""
        if db_type not in self.databases:
            return {"status": "未配置"}
        db_info = self.databases[db_type]
        if db_type == "chroma":
            # (the client handle itself is not needed for the summary;
            # the previous version bound it to an unused local)
            collections = list(db_info["collections"].keys())
            return {
                "status": "已配置",
                "type": "ChromaDB",
                "collections": collections,
                "collection_count": len(collections)
            }
        elif db_type == "faiss":
            index = db_info["index"]
            return {
                "status": "已配置",
                "type": "FAISS",
                "dimension": db_info["dimension"],
                "vector_count": index.ntotal,
                "trained": db_info["trained"]
            }
        elif db_type == "pinecone":
            return {
                "status": "已配置",
                "type": "Pinecone",
                "index_name": db_info["index_name"]
            }
        # Unreachable with the current setup_* methods, but explicit is
        # safer than the implicit None the previous version returned.
        return {"status": "已配置", "type": db_type}
# Usage example
if __name__ == "__main__":
    manager = VectorDatabaseManager()
    # ChromaDB (local, persistent).
    chroma_client = manager.setup_chromadb()
    if chroma_client:
        manager.create_collection("chroma", "test_collection")
    # FAISS (in-process index).
    faiss_index = manager.setup_faiss(dimension=384)
    # Random demo vectors plus per-vector metadata.
    test_vectors = np.random.rand(5, 384).astype('float32')
    test_metadata = [{"text": f"文档{i}", "source": f"source_{i}"} for i in range(5)]
    if chroma_client:
        manager.add_vectors("chroma", "test_collection", test_vectors, test_metadata)
    if faiss_index:
        manager.add_vectors("faiss", "test_collection", test_vectors, test_metadata)
    # Print a summary for each configured backend.
    for backend in ["chroma", "faiss"]:
        info = manager.get_database_info(backend)
        print(f"\n{backend.upper()} 信息:")
        for key, value in info.items():
            print(f" {key}: {value}")
3. 开发工具配置
3.1 IDE和编辑器配置
// .vscode/settings.json - VS Code配置
{
"python.defaultInterpreterPath": "./rag_env/Scripts/python",
"python.linting.enabled": true,
"python.linting.pylintEnabled": false,
"python.linting.flake8Enabled": true,
"python.formatting.provider": "black",
"python.formatting.blackArgs": ["--line-length=88"],
"python.testing.pytestEnabled": true,
"python.testing.pytestArgs": ["tests"],
"files.associations": {
"*.env": "properties"
},
"editor.formatOnSave": true,
"editor.codeActionsOnSave": {
"source.organizeImports": true
},
"jupyter.notebookFileRoot": "${workspaceFolder}",
"jupyter.defaultKernel": "rag_env"
}
// .vscode/launch.json - 调试配置
{
"version": "0.2.0",
"configurations": [
{
"name": "RAG App Debug",
"type": "python",
"request": "launch",
"program": "${workspaceFolder}/app.py",
"console": "integratedTerminal",
"env": {
"PYTHONPATH": "${workspaceFolder}",
"APP_ENV": "development"
},
"envFile": "${workspaceFolder}/.env"
},
{
"name": "RAG Tests",
"type": "python",
"request": "launch",
"module": "pytest",
"args": ["tests/", "-v"],
"console": "integratedTerminal",
"envFile": "${workspaceFolder}/.env"
},
{
"name": "Jupyter Notebook",
"type": "python",
"request": "launch",
"module": "jupyter",
"args": ["lab", "--port=8888"],
"console": "integratedTerminal"
}
]
}
3.2 代码质量工具
# .flake8 - Flake8配置
[flake8]
max-line-length = 88
extend-ignore = E203, W503
exclude =
.git,
__pycache__,
.venv,
rag_env,
build,
dist,
*.egg-info
per-file-ignores =
__init__.py:F401
tests/*:F401,F811
# pyproject.toml - Black和其他工具配置
[tool.black]
line-length = 88
target-version = ['py39']
include = '\.pyi?$'
extend-exclude = '''
(
/(
\.eggs
| \.git
| \.hg
| \.mypy_cache
| \.tox
| \.venv
| _build
| buck-out
| build
| dist
)/
)
'''
[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = ["test_*.py", "*_test.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]
addopts = "-v --tb=short --strict-markers"
markers = [
"slow: marks tests as slow (deselect with '-m \"not slow\"')",
"integration: marks tests as integration tests",
"unit: marks tests as unit tests"
]
[tool.coverage.run]
source = ["src"]
omit = [
"tests/*",
"*/venv/*",
"*/rag_env/*"
]
[tool.coverage.report]
exclude_lines = [
"pragma: no cover",
"def __repr__",
"raise AssertionError",
"raise NotImplementedError"
]
3.3 监控和日志配置
# logging_config.py - 日志配置
import logging
import sys
from pathlib import Path
from loguru import logger
import json
from datetime import datetime
class RAGLogger:
    """Configure loguru sinks for a RAG application.

    Creates four sinks: colorized console output, a rotating full log,
    a rotating error-only log and a structured JSONL log for analysis.
    """

    def __init__(self, log_dir="./logs", app_name="rag_app"):
        self.log_dir = Path(log_dir)
        self.app_name = app_name
        # parents=True so a nested log path (e.g. ./var/logs) also works;
        # the previous plain exist_ok=True failed on missing parents.
        self.log_dir.mkdir(parents=True, exist_ok=True)
        # Drop loguru's default stderr handler before adding our own.
        logger.remove()
        self._setup_loggers()

    def _setup_loggers(self):
        """Register the console, file, error and JSON sinks."""
        # Console: human-readable, colorized, INFO and above.
        logger.add(
            sys.stdout,
            format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
            level="INFO",
            colorize=True
        )
        # Main file: everything from DEBUG up, rotated and compressed.
        logger.add(
            self.log_dir / f"{self.app_name}.log",
            format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
            level="DEBUG",
            rotation="10 MB",
            retention="30 days",
            compression="zip"
        )
        # Errors also go to their own file with longer retention.
        logger.add(
            self.log_dir / f"{self.app_name}_error.log",
            format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
            level="ERROR",
            rotation="5 MB",
            retention="60 days"
        )
        # Structured JSONL sink for machine analysis.
        logger.add(
            self.log_dir / f"{self.app_name}_structured.jsonl",
            format=self._json_formatter,
            level="INFO",
            rotation="20 MB",
            retention="90 days"
        )

    def _json_formatter(self, record):
        """Loguru format callable producing one JSON object per line.

        A callable format must RETURN A TEMPLATE, not the final text:
        the previous version returned json.dumps(...) directly, so loguru
        re-interpreted the JSON's braces (and any braces in the message)
        as format fields, and the line lacked a trailing newline. The
        serialized record is therefore stashed in record["extra"] and the
        returned template merely references it.
        """
        log_entry = {
            "timestamp": record["time"].isoformat(),
            "level": record["level"].name,
            "logger": record["name"],
            "function": record["function"],
            "line": record["line"],
            "message": record["message"],
            "app": self.app_name
        }
        # Preserve any bound extra fields (excluding our own stash key).
        extra = {k: v for k, v in record["extra"].items() if k != "serialized"}
        if extra:
            log_entry["extra"] = extra
        # default=str keeps non-JSON-serializable extras from crashing the sink.
        record["extra"]["serialized"] = json.dumps(log_entry, ensure_ascii=False, default=str)
        return "{extra[serialized]}\n"

    def get_logger(self, name=None):
        """Return the shared logger, optionally with a bound `name` extra.

        NOTE(review): bind() stores `name` in record["extra"]; it does
        not replace the `{name}` (module) field the text formats use.
        """
        if name:
            return logger.bind(name=name)
        return logger

    def log_rag_operation(self, operation, query=None, result=None, duration=None, **kwargs):
        """Log one RAG operation with structured context fields."""
        extra_data = {
            "operation": operation,
            "query": query,
            "result_length": len(str(result)) if result else 0,
            "duration_ms": duration,
            **kwargs
        }
        logger.bind(**extra_data).info(f"RAG操作: {operation}")

    def log_performance_metrics(self, metrics):
        """Log a dict of performance metrics as a structured record."""
        logger.bind(metrics=metrics).info("性能指标")
# Usage example
if __name__ == "__main__":
    rag_logger = RAGLogger()
    logger = rag_logger.get_logger("test")
    # Exercise each severity level once.
    logger.info("RAG系统启动")
    logger.debug("调试信息")
    logger.warning("警告信息")
    logger.error("错误信息")
    # Structured record of a retrieval operation.
    rag_logger.log_rag_operation(
        operation="document_retrieval",
        query="什么是机器学习?",
        result="机器学习是...",
        duration=150,
        num_documents=3,
    )
    # Aggregate timing metrics.
    rag_logger.log_performance_metrics({
        "embedding_time": 50,
        "retrieval_time": 30,
        "generation_time": 200,
        "total_time": 280,
    })
4. 项目结构和最佳实践
4.1 推荐项目结构
rag_project/
├── src/
│ ├── __init__.py
│ ├── core/
│ │ ├── __init__.py
│ │ ├── embeddings.py
│ │ ├── retrievers.py
│ │ ├── generators.py
│ │ └── chains.py
│ ├── data/
│ │ ├── __init__.py
│ │ ├── loaders.py
│ │ ├── processors.py
│ │ └── splitters.py
│ ├── vectorstores/
│ │ ├── __init__.py
│ │ ├── chroma_store.py
│ │ ├── faiss_store.py
│ │ └── pinecone_store.py
│ ├── api/
│ │ ├── __init__.py
│ │ ├── routes.py
│ │ ├── models.py
│ │ └── middleware.py
│ └── utils/
│ ├── __init__.py
│ ├── config.py
│ ├── logging.py
│ └── helpers.py
├── tests/
│ ├── __init__.py
│ ├── unit/
│ ├── integration/
│ └── fixtures/
├── notebooks/
│ ├── exploration.ipynb
│ ├── evaluation.ipynb
│ └── examples.ipynb
├── data/
│ ├── raw/
│ ├── processed/
│ └── embeddings/
├── models/
│ ├── embeddings/
│ └── checkpoints/
├── logs/
├── docs/
│ ├── api.md
│ ├── setup.md
│ └── examples.md
├── scripts/
│ ├── setup_env.py
│ ├── process_data.py
│ └── deploy.py
├── config/
│ ├── development.yaml
│ ├── production.yaml
│ └── testing.yaml
├── requirements.txt
├── requirements-dev.txt
├── docker-compose.yml
├── Dockerfile
├── .env.example
├── .gitignore
├── README.md
└── pyproject.toml
4.2 配置管理
# src/utils/config.py - 配置管理
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Optional

import yaml
@dataclass
class EmbeddingConfig:
    """Embedding model settings."""
    provider: str = "huggingface"
    model_name: str = "sentence-transformers/all-MiniLM-L6-v2"
    dimension: int = 384      # output vector dimension
    batch_size: int = 32      # texts embedded per batch


@dataclass
class VectorStoreConfig:
    """Vector store settings; host/port only apply to remote stores."""
    provider: str = "chroma"
    persist_directory: str = "./chroma_db"
    collection_name: str = "default"
    # Previously annotated `str = None` / `int = None`, which is a type
    # error; Optional[...] is the correct annotation for a None default.
    host: Optional[str] = None
    port: Optional[int] = None


@dataclass
class LLMConfig:
    """Language model settings."""
    provider: str = "openai"
    model_name: str = "gpt-3.5-turbo"
    temperature: float = 0.7
    max_tokens: int = 1000
    api_key: Optional[str] = None  # usually injected via OPENAI_API_KEY


@dataclass
class APIConfig:
    """HTTP API server settings."""
    host: str = "0.0.0.0"
    port: int = 8000
    debug: bool = False
    cors_origins: Optional[list] = None


@dataclass
class RAGConfig:
    """Top-level configuration aggregating all component configs."""
    embedding: EmbeddingConfig
    vectorstore: VectorStoreConfig
    llm: LLMConfig
    api: APIConfig
    chunk_size: int = 1000
    chunk_overlap: int = 200
    retrieval_k: int = 3      # documents retrieved per query
    log_level: str = "INFO"
    log_dir: str = "./logs"
    data_dir: str = "./data"
class ConfigManager:
    """Load, merge and validate the RAG configuration.

    Precedence (lowest to highest): built-in dataclass defaults,
    the per-environment YAML file, then environment variables.
    """

    def __init__(self, config_path=None, env=None):
        # APP_ENV selects config/<env>.yaml unless an explicit path is given.
        self.env = env or os.getenv("APP_ENV", "development")
        self.config_path = config_path or f"config/{self.env}.yaml"
        self.config = self._load_config()

    def _load_config(self) -> RAGConfig:
        """Build a RAGConfig from defaults, the YAML file and env vars."""
        config_dict = {
            "embedding": {},
            "vectorstore": {},
            "llm": {},
            "api": {}
        }
        if Path(self.config_path).exists():
            with open(self.config_path, 'r', encoding='utf-8') as f:
                # safe_load returns None for an empty file; the previous
                # unguarded dict.update(None) raised TypeError.
                file_config = yaml.safe_load(f) or {}
            # NOTE: top-level update replaces whole sections rather than
            # deep-merging them; YAML sections must be complete.
            config_dict.update(file_config)
        self._override_from_env(config_dict)
        return RAGConfig(
            embedding=EmbeddingConfig(**config_dict.get("embedding", {})),
            vectorstore=VectorStoreConfig(**config_dict.get("vectorstore", {})),
            llm=LLMConfig(**config_dict.get("llm", {})),
            api=APIConfig(**config_dict.get("api", {})),
            **{k: v for k, v in config_dict.items()
               if k not in ["embedding", "vectorstore", "llm", "api"]}
        )

    def _override_from_env(self, config_dict):
        """Apply environment-variable overrides to config_dict in place."""
        # env var -> [section, key] (nested) or [key] (top-level).
        env_mappings = {
            "OPENAI_API_KEY": ["llm", "api_key"],
            "EMBEDDING_MODEL": ["embedding", "model_name"],
            "VECTOR_STORE": ["vectorstore", "provider"],
            "CHROMA_HOST": ["vectorstore", "host"],
            "CHROMA_PORT": ["vectorstore", "port"],
            "API_HOST": ["api", "host"],
            "API_PORT": ["api", "port"],
            "LOG_LEVEL": ["log_level"],
            "CHUNK_SIZE": ["chunk_size"],
            "RETRIEVAL_K": ["retrieval_k"]
        }
        for env_var, config_path in env_mappings.items():
            value = os.getenv(env_var)
            if not value:
                continue
            if len(config_path) == 1:
                key = config_path[0]
                # Numeric top-level settings are stored as ints.
                config_dict[key] = int(value) if key in ("chunk_size", "retrieval_k") else value
            else:
                section, key = config_path
                config_dict.setdefault(section, {})
                config_dict[section][key] = int(value) if key == "port" else value

    def get_config(self) -> RAGConfig:
        """Return the loaded configuration object."""
        return self.config

    def update_config(self, updates: Dict[str, Any]):
        """Dynamically update configuration (not implemented yet)."""
        pass

    def validate_config(self) -> bool:
        """Validate required settings and create missing directories.

        Prints each problem and returns False when anything is wrong;
        returns True otherwise.
        """
        errors = []
        # An OpenAI LLM without an API key cannot work.
        if self.config.llm.provider == "openai" and not self.config.llm.api_key:
            errors.append("OpenAI API密钥未配置")
        # A remote Chroma host needs a port as well.
        if self.config.vectorstore.provider == "chroma":
            if self.config.vectorstore.host and not self.config.vectorstore.port:
                errors.append("ChromaDB主机已配置但端口未配置")
        # Create the log/data directories when they are missing.
        for path_attr in ["log_dir", "data_dir"]:
            path = getattr(self.config, path_attr)
            if not Path(path).exists():
                try:
                    Path(path).mkdir(parents=True, exist_ok=True)
                except Exception as e:
                    errors.append(f"无法创建目录 {path}: {e}")
        if errors:
            for error in errors:
                print(f"配置错误: {error}")
            return False
        return True
# Usage example
if __name__ == "__main__":
    manager = ConfigManager()
    # Guard clause: report failure and stop, otherwise print a summary.
    if not manager.validate_config():
        print("✗ 配置验证失败")
    else:
        print("✓ 配置验证通过")
        cfg = manager.get_config()
        print(f"嵌入模型: {cfg.embedding.model_name}")
        print(f"向量存储: {cfg.vectorstore.provider}")
        print(f"LLM模型: {cfg.llm.model_name}")
        print(f"API端口: {cfg.api.port}")
5. 本章总结
本章详细介绍了RAG系统开发环境的搭建和配置,主要内容包括:
核心要点
环境准备
- Python 3.9+环境配置
- 虚拟环境创建和管理
- Docker容器化部署
依赖管理
- 核心库安装(LangChain、ChromaDB等)
- 可选组件配置(FAISS、Pinecone等)
- 开发工具集成
向量数据库
- ChromaDB本地和远程配置
- FAISS高性能索引
- Pinecone云服务集成
开发工具
- IDE配置和调试设置
- 代码质量工具
- 日志和监控系统
最佳实践
项目结构
- 模块化代码组织
- 清晰的目录层次
- 配置和代码分离
配置管理
- 环境变量优先级
- 多环境配置支持
- 配置验证机制
开发流程
- 版本控制最佳实践
- 自动化测试集成
- 持续集成部署
下一步
环境搭建完成后,下一章我们将学习文档处理与数据预处理,包括: - 多格式文档加载 - 文本清理和标准化 - 智能分块策略 - 元数据管理
通过本章的学习,您已经具备了开发RAG系统的基础环境,可以开始实际的开发工作。