RAG 2.0实战:2026年最新检索增强生成架构#
引言#
检索增强生成(Retrieval-Augmented Generation, RAG)自2020年被Facebook AI Research首次提出以来,已经成为大语言模型(LLM)应用中最重要的范式之一。到2026年,RAG已经从最初简单的"检索+拼接+生成"模式,演进到了一个全新的阶段——RAG 2.0。
本文将全面剖析RAG 2.0的核心架构,涵盖混合搜索、重排序(Reranking)、知识图谱增强RAG(Graph RAG)、智能体驱动RAG(Agentic RAG)等最新技术,并提供完整的Python代码实战。无论你是刚接触RAG的新手,还是想要升级现有系统的资深工程师,这篇文章都将为你提供清晰的路线图。
一、从RAG 1.0到RAG 2.0:架构演进之路#
1.1 RAG 1.0的局限性#
RAG 1.0的核心流程非常简单:
用户查询 → 向量检索 → 拼接上下文 → LLM生成回答这种朴素的实现方式存在几个关键问题:
- 检索质量不稳定:纯向量语义搜索在关键词匹配场景下表现不佳
- 上下文窗口浪费:简单拼接所有检索结果,包含大量冗余信息
- 缺乏推理能力:无法处理需要多跳推理的复杂问题
- 无自我纠正机制:检索到错误文档时,模型会"自信地"给出错误答案
1.2 RAG 2.0的核心改进#
RAG 2.0引入了多项关键改进:
| 特性 | RAG 1.0 | RAG 2.0 |
|---|---|---|
| 检索方式 | 纯向量搜索 | 混合搜索(向量+关键词+图谱) |
| 结果处理 | 直接拼接 | 智能重排序+压缩 |
| 推理能力 | 单跳 | 多跳推理(Agentic RAG) |
| 自我纠正 | 无 | 自动验证+回溯 |
| 知识整合 | 平面文档 | 知识图谱+层次化索引 |
二、向量数据库选型:2026年主流方案对比#
在构建RAG系统时,向量数据库是最关键的基础设施之一。以下是对2026年四大主流向量数据库的详细对比:
2.1 主流向量数据库对比#
| 特性 | Pinecone | Weaviate | Chroma | Milvus |
|---|---|---|---|---|
| 部署方式 | 全托管云服务 | 自托管/云 | 嵌入式/轻量服务 | 自托管/云 |
| 延迟 | 极低(<10ms) | 低(<20ms) | 极低(本地) | 低(<15ms) |
| 最大向量数 | 100亿+ | 10亿+ | 千万级 | 100亿+ |
| 混合搜索 | ✅ 原生支持 | ✅ BM25+向量 | ⚠️ 基础支持 | ✅ 原生支持 |
| 多租户 | ✅ | ✅ | ⚠️ | ✅ |
| 价格 | 按量付费 | 开源免费/云付费 | 完全开源 | 开源/企业版 |
| 适用场景 | 生产级大规模 | 功能丰富型 | 快速原型 | 超大规模 |
选型建议:
- 快速原型/个人项目:Chroma——零配置,pip install即可使用
- 中小规模生产环境:Weaviate——功能全面,社区活跃
- 大规模生产环境:Milvus——高并发,分布式架构成熟
- 全托管无运维:Pinecone——开箱即用,自动扩缩容
2.2 Milvus快速上手#
以下是使用Milvus作为向量数据库的完整示例:
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility
from sentence_transformers import SentenceTransformer
import numpy as np
# 连接Milvus
connections.connect("default", host="localhost", port="19530")
# 定义集合Schema
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1536),
FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=512),
]
schema = CollectionSchema(fields, description="RAG 2.0 document store")
collection = Collection("rag_documents", schema)
# 创建混合索引:向量索引 + 标量索引
index_params = {
"metric_type": "COSINE",
"index_type": "HNSW",
"params": {"M": 16, "efConstruction": 256}
}
collection.create_index("embedding", index_params)
collection.create_index("source", {"index_type": "TRIE"})
# 加载集合到内存
collection.load()三、混合搜索(Hybrid Search):RAG 2.0的核心引擎#
3.1 为什么需要混合搜索?#
纯向量搜索擅长捕捉语义相似性,但在精确关键词匹配上表现不佳。例如:
- 查询"RFC 7231"——向量搜索可能返回与HTTP相关但不是RFC 7231的内容
- 查询"Python 3.12的新特性"——向量搜索可能返回Python 3.11甚至3.10的内容
混合搜索结合了稠密向量搜索(语义匹配)和稀疏向量搜索(关键词匹配,如BM25),取两者之长。
3.2 混合搜索实现#
import numpy as np
from sentence_transformers import SentenceTransformer
from rank_bm25 import BM25Okapi
from pymilvus import Collection
from typing import List, Dict, Tuple
import jieba
class HybridSearchEngine:
"""RAG 2.0 混合搜索引擎:稠密向量 + 稀疏BM25 + RRF融合"""
def __init__(self, collection_name: str = "rag_documents"):
self.dense_model = SentenceTransformer("BAAI/bge-large-zh-v1.5")
self.collection = Collection(collection_name)
self.reranker = None # 延迟加载重排序模型
def dense_search(self, query: str, top_k: int = 20) -> List[Dict]:
"""稠密向量搜索:语义相似性"""
embedding = self.dense_model.encode(query).tolist()
self.collection.load()
results = self.collection.search(
data=[embedding],
anns_field="embedding",
param={"metric_type": "COSINE", "params": {"ef": 128}},
limit=top_k,
output_fields=["text", "source"]
)
return [
{
"id": hit.id,
"text": hit.entity.get("text"),
"source": hit.entity.get("source"),
"score": hit.score,
"method": "dense"
}
for hit in results[0]
]
def sparse_search(self, query: str, corpus: List[str], top_k: int = 20) -> List[Dict]:
"""稀疏搜索:BM25关键词匹配"""
# 中文需要分词
tokenized_corpus = [list(jieba.cut(doc)) for doc in corpus]
tokenized_query = list(jieba.cut(query))
bm25 = BM25Okapi(tokenized_corpus)
scores = bm25.get_scores(tokenized_query)
top_indices = np.argsort(scores)[::-1][:top_k]
return [
{
"text": corpus[idx],
"score": float(scores[idx]),
"method": "sparse",
"index": idx
}
for idx in top_indices
]
def reciprocal_rank_fusion(
self,
results_lists: List[List[Dict]],
k: int = 60
) -> List[Dict]:
"""Reciprocal Rank Fusion (RRF) 融合多路检索结果"""
fused_scores = {}
for results in results_lists:
for rank, item in enumerate(results):
doc_id = item.get("id", item.get("text", ""))
if doc_id not in fused_scores:
fused_scores[doc_id] = {"item": item, "score": 0.0}
fused_scores[doc_id]["score"] += 1.0 / (k + rank + 1)
# 按融合分数排序
sorted_results = sorted(
fused_scores.values(),
key=lambda x: x["score"],
reverse=True
)
return [item["item"] for item in sorted_results]
def hybrid_search(self, query: str, corpus: List[str], top_k: int = 10) -> List[Dict]:
"""执行混合搜索"""
dense_results = self.dense_search(query, top_k=20)
sparse_results = self.sparse_search(query, corpus, top_k=20)
# RRF融合
fused = self.reciprocal_rank_fusion([dense_results, sparse_results])
return fused[:top_k]
# 使用示例
engine = HybridSearchEngine()
corpus = [
"RAG 2.0架构采用混合搜索策略,结合稠密和稀疏向量",
"Milvus是2026年最受欢迎的开源向量数据库之一",
"Graph RAG通过知识图谱增强检索质量",
"Agentic RAG使用智能体来协调多步检索推理",
]
results = engine.hybrid_search("什么是混合搜索?", corpus, top_k=3)
for r in results:
print(f"[{r.get('method', 'fused')}] {r['text'][:50]}... (score: {r.get('score', 'N/A')})")四、智能重排序(Reranking)#
4.1 为什么需要重排序?#
混合搜索虽然提升了召回率,但返回的候选集中仍然可能包含相关性较低的文档。重排序(Reranking)作为第二阶段,使用更精细的模型对候选文档进行重新排序。
4.2 Cross-Encoder重排序实现#
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
from typing import List, Dict
class Reranker:
"""RAG 2.0 重排序器:使用Cross-Encoder模型进行精细排序"""
def __init__(self, model_name: str = "BAAI/bge-reranker-v2.5-gemma2-lightweight"):
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.model = AutoModelForSequenceClassification.from_pretrained(model_name)
self.model.eval()
@torch.no_grad()
def rerank(self, query: str, documents: List[Dict], top_k: int = 5) -> List[Dict]:
"""对候选文档进行重排序"""
pairs = [(query, doc["text"]) for doc in documents]
inputs = self.tokenizer(
[p[0] for p in pairs],
[p[1] for p in pairs],
padding=True,
truncation=True,
max_length=512,
return_tensors="pt"
)
scores = self.model(**inputs).logits.squeeze(-1)
scores = torch.sigmoid(scores).numpy()
# 将分数附加到文档并重新排序
for doc, score in zip(documents, scores):
doc["rerank_score"] = float(score)
reranked = sorted(documents, key=lambda x: x["rerank_score"], reverse=True)
return reranked[:top_k]
# 在混合搜索管道中集成重排序
class RAG2Pipeline:
"""完整的RAG 2.0检索管道"""
def __init__(self):
self.search_engine = HybridSearchEngine()
self.reranker = Reranker()
def retrieve(self, query: str, corpus: List[str], final_k: int = 5) -> List[Dict]:
"""三阶段检索:混合搜索 → 重排序 → 精选"""
# 第一阶段:混合搜索获取候选集
candidates = self.search_engine.hybrid_search(query, corpus, top_k=20)
print(f"第一阶段:混合搜索返回 {len(candidates)} 个候选")
# 第二阶段:Cross-Encoder重排序
reranked = self.reranker.rerank(query, candidates, top_k=final_k)
print(f"第二阶段:重排序保留 {len(reranked)} 个文档")
return reranked五、Graph RAG:知识图谱增强检索#
5.1 Graph RAG的核心思想#
传统的RAG将文档视为独立的文本块,忽略了文档之间的关系。Graph RAG通过构建和利用知识图谱,能够:
- 捕捉实体之间的关系(如"公司A收购了公司B")
- 支持多跳推理(如"A的CEO毕业于哪所大学?")
- 提供结构化的上下文信息
5.2 Graph RAG实现#
import networkx as nx
from typing import List, Dict, Tuple, Set
import requests
import json
class GraphRAG:
"""RAG 2.0 知识图谱增强检索"""
def __init__(self):
self.graph = nx.DiGraph()
self.entity_index = {} # entity -> [chunk_ids]
def build_graph_from_chunks(self, chunks: List[Dict]) -> None:
"""从文本块中提取实体和关系,构建知识图谱"""
for chunk in chunks:
chunk_id = chunk["id"]
text = chunk["text"]
# 使用LLM提取实体和关系(通过XiDao API)
entities, relations = self._extract_entities_relations(text)
# 添加实体节点
for entity in entities:
if not self.graph.has_node(entity["name"]):
self.graph.add_node(
entity["name"],
type=entity["type"],
description=entity.get("description", "")
)
# 更新实体索引
if entity["name"] not in self.entity_index:
self.entity_index[entity["name"]] = []
self.entity_index[entity["name"]].append(chunk_id)
# 添加关系边
for rel in relations:
self.graph.add_edge(
rel["source"],
rel["target"],
relation=rel["relation"],
chunk_id=chunk_id
)
def _extract_entities_relations(self, text: str) -> Tuple[List, List]:
"""使用XiDao API调用LLM提取实体和关系"""
response = requests.post(
"https://api.xidao.online/v1/chat/completions",
headers={
"Authorization": "Bearer YOUR_XIDAO_API_KEY",
"Content-Type": "application/json"
},
json={
"model": "claude-4.7-sonnet",
"messages": [
{
"role": "system",
"content": "你是一个知识图谱构建助手。从文本中提取实体和关系,以JSON格式返回。"
},
{
"role": "user",
"content": f"""从以下文本中提取实体和关系:
{text}
返回JSON格式:
{{
"entities": [{{"name": "实体名", "type": "类型", "description": "描述"}}],
"relations": [{{"source": "源实体", "target": "目标实体", "relation": "关系"}}]
}}"""
}
],
"temperature": 0.1,
"max_tokens": 2000
}
)
result = response.json()
content = result["choices"][0]["message"]["content"]
parsed = json.loads(content)
return parsed.get("entities", []), parsed.get("relations", [])
def graph_enhanced_search(self, query: str, top_k: int = 5) -> List[str]:
"""图增强搜索:结合实体链接和图遍历"""
# 提取查询中的实体
query_entities = self._extract_query_entities(query)
# 从图中扩展相关实体(2跳邻居)
related_entities: Set[str] = set()
for entity in query_entities:
if entity in self.graph:
related_entities.add(entity)
# 1跳邻居
for neighbor in self.graph.neighbors(entity):
related_entities.add(neighbor)
# 2跳邻居
for second_hop in self.graph.neighbors(neighbor):
related_entities.add(second_hop)
# 收集相关的文本块ID
relevant_chunk_ids = set()
for entity in related_entities:
if entity in self.entity_index:
relevant_chunk_ids.update(self.entity_index[entity])
return list(relevant_chunk_ids)[:top_k]
def get_subgraph_context(self, query: str) -> str:
"""获取与查询相关的子图上下文,作为LLM的额外输入"""
query_entities = self._extract_query_entities(query)
context_lines = []
for entity in query_entities:
if entity in self.graph:
# 获取实体属性
node_data = self.graph.nodes[entity]
context_lines.append(f"【{entity}】类型:{node_data.get('type', '未知')}")
# 获取关系
for _, target, data in self.graph.edges(entity, data=True):
rel = data.get("relation", "相关")
context_lines.append(f" → {rel} → {target}")
return "\n".join(context_lines) if context_lines else "未找到相关图谱信息"
def _extract_query_entities(self, query: str) -> List[str]:
"""从查询中提取实体(简化实现)"""
entities = []
for entity in self.entity_index:
if entity in query:
entities.append(entity)
return entities六、Agentic RAG:智能体驱动的自适应检索#
6.1 Agentic RAG的核心理念#
Agentic RAG是2026年最前沿的RAG架构范式。它不再被动地执行"检索→生成",而是让一个智能体(Agent)主动决定:
- 是否需要检索:简单问题直接由LLM回答
- 如何检索:选择最适合的检索策略(向量/关键词/图谱)
- 是否需要更多证据:如果当前检索结果不足以回答,自动发起二次检索
- 是否需要分解问题:将复杂问题拆解为子问题分别检索
6.2 Agentic RAG完整实现#
from typing import List, Dict, Optional, Literal
from dataclasses import dataclass, field
import requests
import json
@dataclass
class RAGState:
"""RAG智能体的状态"""
original_query: str = ""
sub_queries: List[str] = field(default_factory=list)
retrieved_docs: List[Dict] = field(default_factory=list)
intermediate_answers: List[str] = field(default_factory=list)
final_answer: str = ""
iteration: int = 0
max_iterations: int = 5
confidence: float = 0.0
class AgenticRAG:
"""
RAG 2.0 Agentic RAG 实现
使用LLM智能体自主决策检索策略
"""
def __init__(self, xidao_api_key: str):
self.api_key = xidao_api_key
self.api_url = "https://api.xidao.online/v1/chat/completions"
self.pipeline = RAG2Pipeline()
self.graph_rag = GraphRAG()
def _call_llm(self, messages: List[Dict], model: str = "gpt-5.5", temperature: float = 0.1) -> str:
"""通过XiDao API调用LLM"""
response = requests.post(
self.api_url,
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
},
json={
"model": model,
"messages": messages,
"temperature": temperature,
"max_tokens": 4096
}
)
result = response.json()
return result["choices"][0]["message"]["content"]
def plan(self, state: RAGState) -> RAGState:
"""规划阶段:决定如何处理查询"""
planning_prompt = f"""你是一个RAG系统的规划智能体。分析以下用户查询,并决定最佳的处理策略。
用户查询:{state.original_query}
可选策略:
1. DIRECT_ANSWER - 查询简单,无需检索,直接回答
2. SINGLE_SEARCH - 需要一次检索
3. MULTI_SEARCH - 需要多角度检索
4. DECOMPOSE - 复杂问题需要分解为子问题
5. GRAPH_SEARCH - 涉及实体关系,需要图谱检索
请返回JSON格式:
{{"strategy": "策略名称", "reasoning": "理由", "sub_queries": ["子查询1", "子查询2"], "search_type": "dense/sparse/hybrid/graph"}}"""
response = self._call_llm([
{"role": "system", "content": "你是一个智能检索规划器。"},
{"role": "user", "content": planning_prompt}
])
plan = json.loads(response)
state.sub_queries = plan.get("sub_queries", [state.original_query])
print(f"📋 规划决策: {plan['strategy']} - {plan['reasoning']}")
return state
def retrieve(self, state: RAGState, corpus: List[str]) -> RAGState:
"""检索阶段:根据规划执行检索"""
all_docs = []
for sub_query in state.sub_queries:
docs = self.pipeline.retrieve(sub_query, corpus, final_k=5)
all_docs.extend(docs)
# 去重
seen_texts = set()
unique_docs = []
for doc in all_docs:
if doc["text"] not in seen_texts:
seen_texts.add(doc["text"])
unique_docs.append(doc)
state.retrieved_docs = unique_docs
print(f"🔍 检索到 {len(unique_docs)} 个唯一文档")
return state
def evaluate(self, state: RAGState) -> RAGState:
"""评估阶段:判断检索结果是否充分"""
docs_text = "\n---\n".join([d["text"] for d in state.retrieved_docs])
eval_prompt = f"""评估以下检索结果是否足以回答用户查询。
用户查询:{state.original_query}
检索结果:
{docs_text}
返回JSON格式:
{{"confidence": 0.0到1.0的置信度, "sufficient": true/false, "missing_info": "缺失的信息(如有)"}}"""
response = self._call_llm([
{"role": "system", "content": "你是检索质量评估器。"},
{"role": "user", "content": eval_prompt}
])
evaluation = json.loads(response)
state.confidence = evaluation["confidence"]
print(f"📊 评估结果: 置信度={state.confidence}, 充分={evaluation['sufficient']}")
return state
def generate(self, state: RAGState) -> RAGState:
"""生成阶段:基于检索结果生成回答"""
docs_text = "\n\n".join([
f"[来源: {d.get('source', '未知')}]\n{d['text']}"
for d in state.retrieved_docs
])
generate_prompt = f"""基于以下检索到的文档,回答用户的问题。如果文档中没有足够的信息,请明确指出。
用户问题:{state.original_query}
参考文档:
{docs_text}
要求:
1. 直接回答问题,不要多余的话
2. 引用具体来源
3. 如果信息不足,诚实说明"""
state.final_answer = self._call_llm([
{"role": "system", "content": "你是一个专业的知识助手,严格基于提供的文档回答问题。"},
{"role": "user", "content": generate_prompt}
], model="claude-4.7-sonnet")
return state
def run(self, query: str, corpus: List[str]) -> str:
"""运行完整的Agentic RAG流程"""
state = RAGState(original_query=query)
while state.iteration < state.max_iterations:
state.iteration += 1
print(f"\n{'='*50}")
print(f"🔄 迭代 {state.iteration}")
print(f"{'='*50}")
# 1. 规划
state = self.plan(state)
# 2. 检索
state = self.retrieve(state, corpus)
# 3. 评估
state = self.evaluate(state)
# 4. 如果置信度足够高,生成最终答案
if state.confidence >= 0.7:
state = self.generate(state)
print(f"\n✅ 最终答案(置信度: {state.confidence}):")
return state.final_answer
# 5. 否则继续迭代(可能需要调整查询策略)
print(f"⚠️ 置信度不足({state.confidence}), 继续迭代...")
# 达到最大迭代次数,基于已有结果生成
state = self.generate(state)
return state.final_answer
# 使用示例
if __name__ == "__main__":
agentic_rag = AgenticRAG(xidao_api_key="YOUR_XIDAO_API_KEY")
corpus = [
"RAG 2.0在2026年已成为企业级AI应用的标准架构...",
"混合搜索结合了BM25和向量搜索的优势...",
"Graph RAG通过知识图谱增强了多跳推理能力...",
"Agentic RAG使用LLM智能体来动态规划检索策略...",
]
answer = agentic_rag.run(
query="RAG 2.0相比1.0有哪些关键改进?在企业场景中如何选型?",
corpus=corpus
)
print(answer)七、完整RAG 2.0系统集成#
7.1 使用XiDao API的完整RAG管道#
"""
RAG 2.0 完整系统:集成混合搜索 + 重排序 + Graph RAG + Agentic RAG
使用XiDao API作为LLM后端
"""
import os
from dataclasses import dataclass
@dataclass
class RAG2Config:
"""RAG 2.0 系统配置"""
# XiDao API配置
xidao_api_key: str = os.getenv("XIDAO_API_KEY", "")
xidao_api_url: str = "https://api.xidao.online/v1/chat/completions"
# 模型配置
generation_model: str = "claude-4.7-sonnet"
planning_model: str = "gpt-5.5"
embedding_model: str = "BAAI/bge-large-zh-v1.5"
reranker_model: str = "BAAI/bge-reranker-v2.5-gemma2-lightweight"
# 检索配置
dense_top_k: int = 20
sparse_top_k: int = 20
rerank_top_k: int = 5
hybrid_rrf_k: int = 60
# 向量数据库配置
vector_db: str = "milvus" # milvus/weaviate/chroma/pinecone
milvus_host: str = "localhost"
milvus_port: int = 19530
# Agentic RAG配置
max_iterations: int = 5
confidence_threshold: float = 0.7
class RAG2System:
"""RAG 2.0 完整系统"""
def __init__(self, config: RAG2Config):
self.config = config
self.search_engine = HybridSearchEngine()
self.reranker = Reranker(model_name=config.reranker_model)
self.graph_rag = GraphRAG()
self.agent = AgenticRAG(xidao_api_key=config.xidao_api_key)
def ingest_documents(self, documents: List[Dict]) -> None:
"""文档摄入:分块 → 向量化 → 索引 → 图谱构建"""
from langchain.text_splitter import RecursiveCharacterTextSplitter
splitter = RecursiveCharacterTextSplitter(
chunk_size=512,
chunk_overlap=64,
separators=["\n\n", "\n", "。", "!", "?", ".", "!", "?"]
)
all_chunks = []
for doc in documents:
chunks = splitter.split_text(doc["content"])
for i, chunk in enumerate(chunks):
all_chunks.append({
"id": f"{doc['id']}_{i}",
"text": chunk,
"source": doc.get("source", "unknown")
})
# 构建知识图谱
print("🕸️ 构建知识图谱...")
self.graph_rag.build_graph_from_chunks(all_chunks)
print(f"✅ 图谱构建完成: {self.graph_rag.graph.number_of_nodes()} 节点, "
f"{self.graph_rag.graph.number_of_edges()} 条边")
# 向量索引已在search_engine中自动处理
print(f"✅ 文档摄入完成: {len(all_chunks)} 个文本块")
def query(self, question: str, corpus: List[str]) -> str:
"""处理用户查询"""
return self.agent.run(question, corpus)
# 快速启动示例
if __name__ == "__main__":
config = RAG2Config(
xidao_api_key="YOUR_XIDAO_API_KEY",
generation_model="claude-4.7-sonnet",
vector_db="milvus"
)
system = RAG2System(config)
# 摄入文档
documents = [
{
"id": "doc_001",
"content": "RAG 2.0是2026年最先进的检索增强生成架构...",
"source": "技术博客"
}
]
system.ingest_documents(documents)
# 查询
answer = system.query("如何从RAG 1.0迁移到RAG 2.0?")
print(f"\n📝 回答:{answer}")八、性能优化与最佳实践#
8.1 分块策略优化#
# 语义分块:基于句子嵌入相似度的智能分块
class SemanticChunker:
"""基于语义相似度的智能分块器"""
def __init__(self, similarity_threshold: float = 0.75, max_chunk_size: int = 512):
self.threshold = similarity_threshold
self.max_size = max_chunk_size
self.model = SentenceTransformer("BAAI/bge-large-zh-v1.5")
def chunk(self, text: str) -> List[str]:
sentences = self._split_sentences(text)
if not sentences:
return []
embeddings = self.model.encode(sentences)
chunks = []
current_chunk = [sentences[0]]
current_embedding = embeddings[0]
for i in range(1, len(sentences)):
similarity = np.dot(embeddings[i], current_embedding) / (
np.linalg.norm(embeddings[i]) * np.linalg.norm(current_embedding)
)
chunk_text = " ".join(current_chunk)
if similarity >= self.threshold and len(chunk_text) + len(sentences[i]) < self.max_size:
current_chunk.append(sentences[i])
# 更新chunk embedding(加权平均)
current_embedding = (current_embedding * len(current_chunk[:-1]) + embeddings[i]) / len(current_chunk)
else:
chunks.append(chunk_text)
current_chunk = [sentences[i]]
current_embedding = embeddings[i]
if current_chunk:
chunks.append(" ".join(current_chunk))
return chunks
def _split_sentences(self, text: str) -> List[str]:
import re
sentences = re.split(r'(?<=[。!?.!?])\s*', text)
return [s.strip() for s in sentences if s.strip()]8.2 上下文压缩#
class ContextCompressor:
"""上下文压缩:减少冗余,保留关键信息"""
def __init__(self, xidao_api_key: str):
self.api_key = xidao_api_key
def compress(self, query: str, documents: List[Dict], max_tokens: int = 2000) -> str:
"""使用LLM压缩和整合检索结果"""
docs_text = "\n\n".join([f"文档{i+1}: {d['text']}" for i, d in enumerate(documents)])
response = requests.post(
"https://api.xidao.online/v1/chat/completions",
headers={
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
},
json={
"model": "gpt-5.5",
"messages": [
{
"role": "system",
"content": "你是一个信息压缩专家。从文档中提取与查询最相关的信息,用精炼的中文输出。"
},
{
"role": "user",
"content": f"查询:{query}\n\n文档:\n{docs_text}\n\n请压缩并整合与查询相关的关键信息。"
}
],
"temperature": 0.1,
"max_tokens": max_tokens
}
)
return response.json()["choices"][0]["message"]["content"]九、2026年RAG技术趋势展望#
9.1 主流模型支持#
2026年的RAG系统已经能够充分利用最新一代模型的强大能力:
- Claude 4.7 Sonnet:优秀的长上下文理解(支持100万token),适合处理大量检索文档
- GPT-5.5:强大的推理和规划能力,是Agentic RAG的理想选择
- Gemini 2.5 Pro:多模态RAG的最佳选择,支持图文混合检索
- Qwen 3.5:中文场景下的首选模型,性价比极高
9.2 未来发展方向#
- 端到端学习:检索器和生成器联合训练,自动优化整个管道
- 多模态RAG:不仅检索文本,还能检索图像、表格、代码
- 实时RAG:支持实时数据流的增量索引和检索
- 个性化RAG:根据用户历史和偏好定制检索策略
- 可信RAG:增强事实验证和来源追溯能力
十、总结#
RAG 2.0代表了检索增强生成技术的重大飞跃。通过混合搜索提升召回率,通过重排序提升精确度,通过Graph RAG支持复杂推理,通过Agentic RAG实现自适应检索策略,2026年的RAG系统已经能够处理前所未有的复杂查询。
关键要点回顾:
- 混合搜索是基础:结合稠密向量和稀疏BM25,使用RRF融合
- 重排序是关键:Cross-Encoder模型能显著提升最终结果质量
- Graph RAG是突破:知识图谱让RAG具备多跳推理能力
- Agentic RAG是趋势:智能体驱动的自适应检索是未来方向
- 选好向量数据库:根据规模和场景选择Milvus/Weaviate/Chroma/Pinecone
- 善用XiDao API:统一的LLM调用接口简化开发流程
开始构建你的RAG 2.0系统吧!
本文作者:XiDao | 发布日期:2026年5月1日
如果觉得本文对你有帮助,欢迎分享给更多开发者。有任何问题或建议,欢迎在评论区留言讨论。