RAG检索增强生成实战

构建企业级知识问答系统

Posted by Feng Yu on October 6, 2024

大语言模型虽然强大,但存在致命弱点:知识截止日期幻觉问题。当你问 GPT-4 关于 2024 年最新事件时,它只能”一脸茫然”;当涉及企业私有数据时,通用模型更是无能为力。

RAG(Retrieval-Augmented Generation,检索增强生成) 为此提供了完美解决方案:它让 LLM 能够”查阅资料”再回答问题,就像给模型配备了一个”外挂知识库”。

RAG 已经成为企业级 AI 应用的标准架构。本文将深入剖析 RAG 的工作原理,并提供从零开始构建生产级系统的完整指南。


一、RAG 核心原理:从”闭卷考试”到”开卷考试”

1.1 传统 LLM vs RAG

传统 LLM(闭卷考试)

用户问题 → LLM → 基于参数化记忆生成答案

问题

  • ❌ 知识过时(训练数据截止于某个时间点)
  • ❌ 无法访问私有数据
  • ❌ 容易产生幻觉(编造事实)
  • ❌ 难以追溯信息来源

RAG 系统(开卷考试)

用户问题 → 向量检索 → 获取相关文档 → LLM(问题 + 文档) → 生成答案

优势

  • ✅ 实时更新知识(只需更新文档库)
  • ✅ 整合私有数据
  • ✅ 答案有据可查(可追溯到源文档)
  • ✅ 降低幻觉率

1.2 RAG 工作流程详解

graph LR
    A[用户提问] --> B[问题向量化]
    B --> C[向量数据库检索]
    C --> D[Top-K 相关文档]
    D --> E[重排序]
    E --> F[构造 Prompt]
    F --> G[LLM 生成答案]
    G --> H[返回结果 + 引用]

核心组件

  1. 文档处理管道
    • 文档加载 → 文本清洗 → 分块(Chunking)→ 向量化(Embedding)→ 存储
  2. 检索管道
    • 问题向量化 → 相似度搜索 → 召回相关片段 → 重排序
  3. 生成管道
    • 构造 Prompt → LLM 推理 → 后处理 → 返回答案

二、文档处理:从原始数据到向量数据库

2.1 文档加载与清洗

from langchain.document_loaders import (
    PyPDFLoader,
    UnstructuredMarkdownLoader,
    TextLoader,
    UnstructuredHTMLLoader
)

def load_documents(file_path):
    """根据文件类型选择加载器"""
    loaders = {
        '.pdf': PyPDFLoader,
        '.md': UnstructuredMarkdownLoader,
        '.txt': TextLoader,
        '.html': UnstructuredHTMLLoader
    }
    
    file_ext = os.path.splitext(file_path)[1]
    loader_class = loaders.get(file_ext, TextLoader)
    
    loader = loader_class(file_path)
    documents = loader.load()
    
    return documents

# 文本清洗
import re

def clean_text(text):
    """清洗文本中的噪声"""
    # 移除多余空白
    text = re.sub(r'\s+', ' ', text)
    
    # 移除特殊字符(保留标点)
    text = re.sub(r'[^\w\s\u4e00-\u9fff.,!?;:,。!?;:]', '', text)
    
    # 移除页眉页脚(常见模式)
    text = re.sub(r'第\s*\d+\s*页', '', text)
    text = re.sub(r'Page\s+\d+', '', text, flags=re.IGNORECASE)
    
    return text.strip()

2.2 文档分块(Chunking)策略

为什么需要分块?

  • Embedding 模型有长度限制(通常 512 tokens)
  • 小块更精确,大块更完整 → 需要平衡

策略对比

策略 适用场景 优点 缺点
固定长度 通用文本 简单高效 可能割裂语义
句子边界 自然语言 保持语义完整 长度不均
段落边界 结构化文档 逻辑清晰 可能过长
语义分块 高质量需求 最优语义完整性 计算成本高

代码实现

from langchain.text_splitter import (
    RecursiveCharacterTextSplitter,
    SpacyTextSplitter,
    TokenTextSplitter
)

# 方法 1:递归字符分割(推荐)
def recursive_chunk(documents, chunk_size=500, chunk_overlap=50):
    """
    递归分割,优先按段落、句子、单词分割
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,  # 重叠部分保持上下文连续性
        separators=["\n\n", "\n", "。", "!", "?", " ", ""],
        length_function=len
    )
    
    chunks = splitter.split_documents(documents)
    return chunks

# 方法 2:语义分块
from sentence_transformers import SentenceTransformer
import numpy as np

class SemanticChunker:
    def __init__(self, model_name='paraphrase-multilingual-MiniLM-L12-v2'):
        self.model = SentenceTransformer(model_name)
        
    def chunk(self, text, threshold=0.5, max_chunk_size=1000):
        """基于语义相似度分块"""
        sentences = self._split_sentences(text)
        embeddings = self.model.encode(sentences)
        
        chunks = []
        current_chunk = [sentences[0]]
        
        for i in range(1, len(sentences)):
            # 计算与当前块的相似度
            similarity = np.dot(
                np.mean([embeddings[j] for j in range(len(current_chunk))], axis=0),
                embeddings[i]
            )
            
            # 判断是否继续添加到当前块
            if similarity > threshold and len(''.join(current_chunk)) < max_chunk_size:
                current_chunk.append(sentences[i])
            else:
                chunks.append(''.join(current_chunk))
                current_chunk = [sentences[i]]
        
        chunks.append(''.join(current_chunk))
        return chunks
    
    def _split_sentences(self, text):
        """句子分割"""
        import re
        sentences = re.split(r'([。!?\n])', text)
        return [''.join(i) for i in zip(sentences[0::2], sentences[1::2])]

# 方法 3:Token 级别分割(适配 LLM)
def token_chunk(documents, chunk_size=512, model_name='gpt-3.5-turbo'):
    """按 token 数量分割(与 LLM 对齐)"""
    splitter = TokenTextSplitter(
        encoding_name='cl100k_base',  # GPT-3.5/4 的 tokenizer
        chunk_size=chunk_size,
        chunk_overlap=50
    )
    
    return splitter.split_documents(documents)

最佳实践建议

# 针对不同文档类型的推荐配置
chunk_configs = {
    "技术文档": {
        "method": "recursive",
        "chunk_size": 800,
        "overlap": 100,
        "separators": ["\n## ", "\n### ", "\n\n", "\n"]
    },
    "对话记录": {
        "method": "sentence",
        "chunk_size": 300,
        "overlap": 0
    },
    "法律合同": {
        "method": "semantic",
        "threshold": 0.6,
        "max_chunk_size": 1000
    },
    "代码库": {
        "method": "code_aware",
        "chunk_size": 600,
        "overlap": 50,
        "separators": ["\nclass ", "\ndef ", "\n\n"]
    }
}

2.3 向量化(Embedding)

常用 Embedding 模型对比

模型 语言 维度 性能 速度 推荐场景
OpenAI text-embedding-3-large 多语言 3072 ⭐⭐⭐⭐⭐ 追求极致效果
bge-large-zh-v1.5 中文 1024 ⭐⭐⭐⭐⭐ 中文优先
sentence-transformers/all-MiniLM-L6-v2 英文 384 ⭐⭐⭐⭐ 轻量级部署
BAAI/bge-m3 多语言 1024 ⭐⭐⭐⭐⭐ 多语言混合

代码实现

from langchain.embeddings import (
    OpenAIEmbeddings,
    HuggingFaceEmbeddings
)

# 方案 1:OpenAI Embeddings(API 调用)
openai_embeddings = OpenAIEmbeddings(
    model="text-embedding-3-large",
    openai_api_key="your-api-key"
)

# 方案 2:本地部署开源模型
hf_embeddings = HuggingFaceEmbeddings(
    model_name="BAAI/bge-large-zh-v1.5",
    model_kwargs={'device': 'cuda'},
    encode_kwargs={'normalize_embeddings': True}  # L2 归一化
)

# 批量向量化
def batch_embed(texts, embeddings_model, batch_size=32):
    """批量处理提高效率"""
    all_embeddings = []
    
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i+batch_size]
        batch_embeddings = embeddings_model.embed_documents(batch)
        all_embeddings.extend(batch_embeddings)
    
    return all_embeddings

2.4 向量数据库选择与部署

主流向量数据库对比

数据库 部署方式 性能 易用性 适用规模
Chroma 本地/内存 ⭐⭐⭐ ⭐⭐⭐⭐⭐ 小型(< 100万)
FAISS 本地 ⭐⭐⭐⭐⭐ ⭐⭐⭐ 中大型
Milvus 分布式 ⭐⭐⭐⭐⭐ ⭐⭐⭐ 大规模生产
Pinecone 云服务 ⭐⭐⭐⭐ ⭐⭐⭐⭐⭐ 快速原型/生产
Weaviate 本地/云 ⭐⭐⭐⭐ ⭐⭐⭐⭐ 中大型
Qdrant 本地/云 ⭐⭐⭐⭐ ⭐⭐⭐⭐ 中型生产

快速上手:Chroma

from langchain.vectorstores import Chroma

# 创建向量数据库
vectorstore = Chroma.from_documents(
    documents=chunks,
    embedding=hf_embeddings,
    persist_directory="./chroma_db",  # 持久化路径
    collection_name="my_knowledge_base"
)

# 相似度搜索
query = "什么是 RAG?"
results = vectorstore.similarity_search(
    query,
    k=5  # 返回 Top-5
)

# 带分数的搜索
results_with_scores = vectorstore.similarity_search_with_score(
    query,
    k=5
)

for doc, score in results_with_scores:
    print(f"Score: {score:.4f}")
    print(f"Content: {doc.page_content[:100]}...\n")

生产级:Milvus

from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
from langchain.vectorstores import Milvus

# 连接 Milvus
connections.connect("default", host="localhost", port="19530")

# 创建集合
vectorstore = Milvus.from_documents(
    documents=chunks,
    embedding=hf_embeddings,
    connection_args={
        "host": "localhost",
        "port": "19530"
    },
    collection_name="enterprise_kb",
    index_params={
        "metric_type": "IP",  # Inner Product(内积,适合归一化向量)
        "index_type": "IVF_FLAT",
        "params": {"nlist": 128}
    },
    search_params={
        "metric_type": "IP",
        "params": {"nprobe": 10}
    }
)

# 混合搜索(向量 + 元数据过滤)
results = vectorstore.similarity_search(
    query,
    k=10,
    filter={"source": "official_docs", "date": {"$gt": "2024-01-01"}}
)

三、检索策略:从简单到高级

3.1 基础检索:Top-K 相似度搜索

def basic_retrieval(query, vectorstore, k=5):
    """最基础的检索"""
    return vectorstore.similarity_search(query, k=k)

问题

  • 检索结果可能包含冗余信息
  • 无法处理复杂查询(如多跳推理)

3.2 高级检索:重排序(Reranking)

原理:先用快速的向量检索召回候选集(如 Top-50),再用精细模型重排序选出 Top-5。

from sentence_transformers import CrossEncoder

class RerankerRetriever:
    def __init__(self, vectorstore, reranker_model='cross-encoder/ms-marco-MiniLM-L-12-v2'):
        self.vectorstore = vectorstore
        self.reranker = CrossEncoder(reranker_model)
        
    def retrieve(self, query, k=5, initial_k=50):
        # 步骤 1:向量检索(召回阶段)
        candidates = self.vectorstore.similarity_search(query, k=initial_k)
        
        # 步骤 2:重排序(精排阶段)
        pairs = [[query, doc.page_content] for doc in candidates]
        scores = self.reranker.predict(pairs)
        
        # 步骤 3:按分数排序并返回 Top-K
        ranked_results = sorted(
            zip(candidates, scores),
            key=lambda x: x[1],
            reverse=True
        )
        
        return [doc for doc, score in ranked_results[:k]]

# 使用
retriever = RerankerRetriever(vectorstore)
results = retriever.retrieve("RAG 的优势是什么?", k=5)

效果提升

  • 召回率(Recall@50):85% → 95%
  • 精确率(Precision@5):60% → 80%

3.3 混合检索:向量 + 关键词

from langchain.retrievers import BM25Retriever, EnsembleRetriever

class HybridRetriever:
    def __init__(self, documents, vectorstore):
        # BM25 关键词检索
        self.bm25_retriever = BM25Retriever.from_documents(documents)
        self.bm25_retriever.k = 10
        
        # 向量检索
        self.vector_retriever = vectorstore.as_retriever(search_kwargs={"k": 10})
        
        # 集成检索(加权融合)
        self.ensemble_retriever = EnsembleRetriever(
            retrievers=[self.bm25_retriever, self.vector_retriever],
            weights=[0.4, 0.6]  # BM25: 40%, Vector: 60%
        )
    
    def retrieve(self, query, k=5):
        # 获取混合结果
        results = self.ensemble_retriever.get_relevant_documents(query)
        return results[:k]

# 使用
hybrid_retriever = HybridRetriever(documents, vectorstore)
results = hybrid_retriever.retrieve("RAG 系统架构")

适用场景

  • 专业术语多的领域(医疗、法律)
  • 需要精确匹配的查询(产品型号、代码片段)

3.4 多查询检索(Query Expansion)

from langchain.llms import OpenAI
from langchain.retrievers.multi_query import MultiQueryRetriever

# 自动生成多个查询变体
llm = OpenAI(temperature=0)
multi_query_retriever = MultiQueryRetriever.from_llm(
    retriever=vectorstore.as_retriever(),
    llm=llm
)

# 示例
query = "什么是 RAG?"
# 自动生成的变体可能包括:
# - "RAG 的定义是什么?"
# - "解释一下检索增强生成"
# - "RAG 技术的工作原理"

results = multi_query_retriever.get_relevant_documents(query)

3.5 父文档检索(Parent Document Retriever)

问题:小块检索精确,但可能缺少上下文;大块上下文完整,但检索不精确。

解决方案:用小块检索,返回大块(父文档)。

from langchain.retrievers import ParentDocumentRetriever
from langchain.storage import InMemoryStore

# 父文档存储
parent_store = InMemoryStore()

# 创建父文档检索器
parent_retriever = ParentDocumentRetriever(
    vectorstore=vectorstore,
    docstore=parent_store,
    child_splitter=RecursiveCharacterTextSplitter(chunk_size=200),  # 小块用于检索
    parent_splitter=RecursiveCharacterTextSplitter(chunk_size=2000)  # 大块用于生成
)

# 添加文档
parent_retriever.add_documents(documents)

# 检索(返回父文档)
results = parent_retriever.get_relevant_documents("RAG 实现细节")

四、生成管道:构造 Prompt 与优化

4.1 基础 RAG Prompt

def build_rag_prompt(query, retrieved_docs):
    context = "\n\n".join([
        f"文档 {i+1}\n{doc.page_content}"
        for i, doc in enumerate(retrieved_docs)
    ])
    
    prompt = f"""
请基于以下文档回答问题。如果文档中没有相关信息,请说"文档中未找到答案"。

文档:
{context}

问题:{query}

回答:
"""
    return prompt

4.2 高级 Prompt 工程

def advanced_rag_prompt(query, retrieved_docs, style="professional"):
    # 构造上下文(带来源标记)
    context_parts = []
    for i, doc in enumerate(retrieved_docs):
        source = doc.metadata.get('source', '未知来源')
        context_parts.append(
            f"[文档{i+1} - 来源: {source}]\n{doc.page_content}"
        )
    context = "\n\n".join(context_parts)
    
    # 风格指令
    style_instructions = {
        "professional": "请用专业、准确的语气回答",
        "casual": "请用通俗易懂的语言解释",
        "technical": "请提供技术细节和具体实现",
        "concise": "请用不超过 3 句话简洁回答"
    }
    
    prompt = f"""
你是一个专业的知识助手。请基于提供的文档回答问题。

指导原则:
1. 只使用文档中的信息,不要添加文档外的内容
2. 如果文档中没有答案,明确说明
3. {style_instructions.get(style, '')}
4. 在答案末尾注明引用的文档编号

参考文档:
{context}

用户问题:{query}

你的回答:
"""
    return prompt

4.3 流式生成与引用追踪

from langchain.chains import RetrievalQA
from langchain.llms import OpenAI

class StreamingRAG:
    def __init__(self, vectorstore, llm):
        self.qa_chain = RetrievalQA.from_chain_type(
            llm=llm,
            chain_type="stuff",
            retriever=vectorstore.as_retriever(search_kwargs={"k": 5}),
            return_source_documents=True
        )
    
    def query(self, question):
        result = self.qa_chain({"query": question})
        
        return {
            "answer": result["result"],
            "sources": [
                {
                    "content": doc.page_content[:200],
                    "metadata": doc.metadata
                }
                for doc in result["source_documents"]
            ]
        }

# 使用
rag_system = StreamingRAG(vectorstore, OpenAI(temperature=0))
response = rag_system.query("RAG 的核心优势?")

print(f"回答:{response['answer']}\n")
print("参考来源:")
for i, source in enumerate(response['sources']):
    print(f"{i+1}. {source['metadata'].get('source', 'Unknown')}")
    print(f"   {source['content']}...\n")

4.4 多轮对话支持

from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationalRetrievalChain

class ConversationalRAG:
    def __init__(self, vectorstore, llm):
        self.memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True,
            output_key="answer"
        )
        
        self.qa_chain = ConversationalRetrievalChain.from_llm(
            llm=llm,
            retriever=vectorstore.as_retriever(),
            memory=self.memory,
            return_source_documents=True
        )
    
    def chat(self, message):
        result = self.qa_chain({"question": message})
        return result["answer"]

# 使用
chat_rag = ConversationalRAG(vectorstore, OpenAI(temperature=0.7))

print(chat_rag.chat("什么是 RAG?"))
# 回答:RAG 是检索增强生成...

print(chat_rag.chat("它有哪些优势?"))  # 上下文理解"它"指 RAG
# 回答:RAG 的优势包括...

五、完整系统实现

5.1 端到端 RAG 系统

import os
from typing import List, Dict
from langchain.document_loaders import DirectoryLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma
from langchain.llms import OpenAI
from langchain.chains import RetrievalQA

class EnterpriseRAGSystem:
    def __init__(
        self,
        data_dir: str,
        persist_dir: str = "./vectorstore",
        embedding_model: str = "BAAI/bge-large-zh-v1.5"
    ):
        self.data_dir = data_dir
        self.persist_dir = persist_dir
        self.embedding_model = embedding_model
        
        # 初始化组件
        self.embeddings = None
        self.vectorstore = None
        self.qa_chain = None
        
    def build(self):
        """构建 RAG 系统"""
        print("步骤 1: 加载文档...")
        documents = self._load_documents()
        print(f"加载了 {len(documents)} 个文档")
        
        print("步骤 2: 文档分块...")
        chunks = self._chunk_documents(documents)
        print(f"生成了 {len(chunks)} 个文档块")
        
        print("步骤 3: 创建向量数据库...")
        self._create_vectorstore(chunks)
        print("向量数据库创建完成")
        
        print("步骤 4: 初始化 QA 链...")
        self._create_qa_chain()
        print("RAG 系统构建完成!")
        
    def _load_documents(self) -> List:
        """加载文档"""
        loader = DirectoryLoader(
            self.data_dir,
            glob="**/*.txt",
            show_progress=True
        )
        return loader.load()
    
    def _chunk_documents(self, documents: List) -> List:
        """文档分块"""
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=500,
            chunk_overlap=50,
            separators=["\n\n", "\n", "。", "!", "?", " ", ""]
        )
        return splitter.split_documents(documents)
    
    def _create_vectorstore(self, chunks: List):
        """创建向量数据库"""
        self.embeddings = HuggingFaceEmbeddings(
            model_name=self.embedding_model,
            model_kwargs={'device': 'cuda'},
            encode_kwargs={'normalize_embeddings': True}
        )
        
        self.vectorstore = Chroma.from_documents(
            documents=chunks,
            embedding=self.embeddings,
            persist_directory=self.persist_dir
        )
        self.vectorstore.persist()
    
    def _create_qa_chain(self):
        """创建 QA 链"""
        llm = OpenAI(temperature=0, model_name="gpt-3.5-turbo")
        
        self.qa_chain = RetrievalQA.from_chain_type(
            llm=llm,
            chain_type="stuff",
            retriever=self.vectorstore.as_retriever(
                search_kwargs={"k": 5}
            ),
            return_source_documents=True
        )
    
    def query(self, question: str) -> Dict:
        """查询"""
        if self.qa_chain is None:
            raise ValueError("请先调用 build() 构建系统")
        
        result = self.qa_chain({"query": question})
        
        return {
            "answer": result["result"],
            "sources": [
                {
                    "content": doc.page_content,
                    "source": doc.metadata.get("source", "Unknown")
                }
                for doc in result["source_documents"]
            ]
        }
    
    def load(self):
        """从磁盘加载已有的向量数据库"""
        self.embeddings = HuggingFaceEmbeddings(
            model_name=self.embedding_model
        )
        
        self.vectorstore = Chroma(
            persist_directory=self.persist_dir,
            embedding_function=self.embeddings
        )
        
        self._create_qa_chain()
        print("RAG 系统加载完成!")

# 使用示例
if __name__ == "__main__":
    # 构建新系统
    rag = EnterpriseRAGSystem(
        data_dir="./knowledge_base",
        persist_dir="./vectorstore"
    )
    rag.build()
    
    # 或加载已有系统
    # rag.load()
    
    # 查询
    response = rag.query("什么是 RAG?")
    print(f"回答:{response['answer']}\n")
    print("参考来源:")
    for i, source in enumerate(response['sources']):
        print(f"{i+1}. {source['source']}")

5.2 优化与监控

性能监控

import time
from functools import wraps

def monitor_performance(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        
        print(f"[性能] {func.__name__} 耗时: {end_time - start_time:.2f}s")
        return result
    return wrapper

class MonitoredRAG(EnterpriseRAGSystem):
    @monitor_performance
    def query(self, question: str) -> Dict:
        return super().query(question)

缓存机制

from functools import lru_cache
import hashlib

class CachedRAG(EnterpriseRAGSystem):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.cache = {}
    
    def query(self, question: str) -> Dict:
        # 生成问题的哈希值作为缓存键
        cache_key = hashlib.md5(question.encode()).hexdigest()
        
        if cache_key in self.cache:
            print("[缓存命中]")
            return self.cache[cache_key]
        
        result = super().query(question)
        self.cache[cache_key] = result
        return result

六、高级技巧与优化

6.1 假设性文档嵌入(HyDE)

原理:让 LLM 先生成一个”假想的答案”,用这个答案去检索,而不是直接用问题检索。

class HyDERetriever:
    def __init__(self, vectorstore, llm):
        self.vectorstore = vectorstore
        self.llm = llm
    
    def retrieve(self, query, k=5):
        # 步骤 1:生成假想文档
        hypothetical_doc_prompt = f"""
请针对以下问题,写一段假想的答案(即使你不确定,也请编造一个合理的答案):

问题:{query}

假想答案:
"""
        hypothetical_doc = self.llm(hypothetical_doc_prompt)
        
        # 步骤 2:用假想文档检索
        results = self.vectorstore.similarity_search(hypothetical_doc, k=k)
        
        return results

# 使用
hyde_retriever = HyDERetriever(vectorstore, OpenAI(temperature=0.7))
results = hyde_retriever.retrieve("RAG 如何处理长文档?")

6.2 自查询检索(Self-Query)

问题:用户问题可能包含元数据过滤需求。

示例

  • “2024年后发表的关于 RAG 的论文”
  • “价格低于 1000 元的笔记本电脑推荐”
from langchain.chains.query_constructor.base import AttributeInfo
from langchain.retrievers.self_query.base import SelfQueryRetriever

# 定义元数据描述
metadata_field_info = [
    AttributeInfo(
        name="source",
        description="文档来源(官方文档、博客、论文等)",
        type="string"
    ),
    AttributeInfo(
        name="date",
        description="发布日期(格式:YYYY-MM-DD)",
        type="string"
    ),
    AttributeInfo(
        name="author",
        description="作者名称",
        type="string"
    )
]

document_content_description = "技术文档和教程"

# 创建自查询检索器
self_query_retriever = SelfQueryRetriever.from_llm(
    llm=OpenAI(temperature=0),
    vectorstore=vectorstore,
    document_contents=document_content_description,
    metadata_field_info=metadata_field_info,
    verbose=True
)

# 使用(会自动提取过滤条件)
results = self_query_retriever.get_relevant_documents(
    "2024年发布的关于 RAG 的文章"
)
# 自动转换为:
# filter = {"date": {"$gte": "2024-01-01"}}
# query = "RAG"

6.3 长上下文压缩

问题:检索到的文档可能很长,包含大量无关内容。

解决方案:用 LLM 压缩文档,只保留与问题相关的部分。

from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor

# 创建压缩器
compressor = LLMChainExtractor.from_llm(OpenAI(temperature=0))

# 包装原有检索器
compression_retriever = ContextualCompressionRetriever(
    base_compressor=compressor,
    base_retriever=vectorstore.as_retriever()
)

# 使用
query = "RAG 的核心组件有哪些?"
compressed_docs = compression_retriever.get_relevant_documents(query)

# compressed_docs 只包含与"核心组件"相关的句子

七、评估与优化

7.1 评估指标

检索阶段

  • Recall@K:Top-K 结果中包含正确文档的比例
  • MRR(Mean Reciprocal Rank):第一个正确结果的排名倒数的平均值
  • NDCG(Normalized Discounted Cumulative Gain):考虑排序质量的指标

生成阶段

  • Faithfulness:答案是否忠实于检索文档(无幻觉)
  • Answer Relevancy:答案与问题的相关性
  • Context Relevancy:检索文档与问题的相关性

7.2 使用 RAGAS 评估

from ragas import evaluate
from ragas.metrics import (
    faithfulness,
    answer_relevancy,
    context_relevancy,
    context_recall
)
from datasets import Dataset

# 准备评估数据
eval_data = {
    "question": ["什么是 RAG?", "RAG 的优势是什么?"],
    "answer": ["RAG 是检索增强生成...", "RAG 的优势包括..."],
    "contexts": [
        ["RAG 定义文档1", "RAG 定义文档2"],
        ["RAG 优势文档1", "RAG 优势文档2"]
    ],
    "ground_truths": [
        ["标准答案1"],
        ["标准答案2"]
    ]
}

dataset = Dataset.from_dict(eval_data)

# 评估
result = evaluate(
    dataset,
    metrics=[
        faithfulness,
        answer_relevancy,
        context_relevancy,
        context_recall
    ]
)

print(result)
# {
#     'faithfulness': 0.95,
#     'answer_relevancy': 0.88,
#     'context_relevancy': 0.82,
#     'context_recall': 0.90
# }

7.3 A/B 测试框架

class RAGABTest:
    def __init__(self, rag_a, rag_b, eval_dataset):
        self.rag_a = rag_a
        self.rag_b = rag_b
        self.eval_dataset = eval_dataset
    
    def run_test(self):
        results_a = []
        results_b = []
        
        for example in self.eval_dataset:
            # 测试版本 A
            response_a = self.rag_a.query(example['question'])
            score_a = self.evaluate_response(
                response_a['answer'],
                example['ground_truth']
            )
            results_a.append(score_a)
            
            # 测试版本 B
            response_b = self.rag_b.query(example['question'])
            score_b = self.evaluate_response(
                response_b['answer'],
                example['ground_truth']
            )
            results_b.append(score_b)
        
        return self.generate_report(results_a, results_b)
    
    def evaluate_response(self, answer, ground_truth):
        # 使用 BLEU、ROUGE 或语义相似度评估
        from sentence_transformers import SentenceTransformer, util
        model = SentenceTransformer('paraphrase-multilingual-MiniLM-L12-v2')
        
        emb1 = model.encode(answer)
        emb2 = model.encode(ground_truth)
        
        return util.cos_sim(emb1, emb2).item()

八、生产部署

8.1 FastAPI 服务

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()

# 全局加载 RAG 系统
rag_system = EnterpriseRAGSystem(
    data_dir="./knowledge_base",
    persist_dir="./vectorstore"
)
rag_system.load()

class QueryRequest(BaseModel):
    question: str
    top_k: int = 5

class QueryResponse(BaseModel):
    answer: str
    sources: list

@app.post("/query", response_model=QueryResponse)
async def query(request: QueryRequest):
    try:
        result = rag_system.query(request.question)
        return QueryResponse(**result)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health_check():
    return {"status": "healthy"}

# 运行:uvicorn main:app --host 0.0.0.0 --port 8000

8.2 Docker 部署

# Dockerfile
FROM python:3.10-slim

WORKDIR /app

# 安装依赖
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制代码
COPY . .

# 暴露端口
EXPOSE 8000

# 启动命令
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
# docker-compose.yml
version: '3.8'

services:
  rag-api:
    build: .
    ports:
      - "8000:8000"
    volumes:
      - ./vectorstore:/app/vectorstore
      - ./knowledge_base:/app/knowledge_base
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}

九、总结与最佳实践

核心要点

  1. 文档处理
    • 选择合适的分块策略(推荐 chunk_size=500, overlap=50)
    • 使用高质量 Embedding 模型(中文优选 bge-large-zh)
    • 保留元数据(来源、时间等)用于过滤
  2. 检索策略
    • 混合检索(向量 + BM25)提升召回率
    • 重排序(Reranker)提升精确率
    • 父文档检索平衡精确度和上下文完整性
  3. 生成优化
    • 明确指示模型只使用文档内容
    • 要求标注引用来源
    • 使用 Self-Consistency 提升稳定性
  4. 系统优化
    • 缓存常见问题
    • 批量处理提高吞吐量
    • 异步处理长时任务

避坑指南

常见错误

  1. 分块过大导致检索不精确
  2. 分块过小导致上下文丢失
  3. 未清洗文档噪声(页眉页脚、特殊符号)
  4. 检索结果未去重
  5. 未设置 Prompt 长度限制(超出 LLM 上下文窗口)

推荐实践

  1. 在真实数据上评估不同分块策略
  2. 使用重排序提升前 5 名结果质量
  3. 添加元数据过滤功能
  4. 记录查询日志用于优化
  5. 定期更新知识库

参考资源

论文

工具与框架

向量数据库

教程

下一篇文章将探讨 LLM Agent 开发指南,敬请期待!


💬 交流与讨论

⚠️ 尚未完成 Giscus 配置。请在 _config.yml 中设置 repo_idcategory_id 后重新部署,即可启用升级后的评论系统。

配置完成后,评论区将自动支持 Markdown 代码高亮与 LaTeX 数学公式渲染,访客回复会同步到 GitHub Discussions,并具备通知功能。