0 入门
0.1 作为javaweb选手,为什么不选择spring AI、langchain4j而是langchain?
作为 JavaWeb 选手,我并不是排斥 Spring AI 或 LangChain4j,而是区分学习阶段和系统架构阶段。
在当前主流企业架构中,大模型能力通常会被独立拆成 LLM 服务,以微服务形式存在,Java 服务通过 HTTP / RPC 调用。这种情况下,Spring AI 把 LLM 嵌进 Spring 容器的一体化优势就不明显了。
我选择先系统学习 LangChain,主要是因为它在 Python 生态里定义了大模型应用的核心抽象和设计范式,比如 Prompt、Chain、Agent、Tool、Memory,这些已经成为事实上的行业标准。理解 LangChain,本质是在理解 LLM 服务的内部架构,而不是某个语言的实现方式。
在真正工程落地时,如果 LLM 能力需要直接集成在 Java 应用中,我会优先考虑 Spring AI 或 LangChain4j;但如果是独立的大模型微服务,我更关注服务边界、接口设计和演进能力,而不是是否使用 Java 框架本身。
0.2 简介
LangChain 是一个「大模型应用编排框架」。
关注:
- 输入怎么组织
- 中间步骤怎么串
- 输出怎么稳定
0.3 核心模块划分
LangChain 核心模块一览
| 模块 | 作用 | 解决的问题 | 典型类 / 接口 | 是否底层核心 |
|---|---|---|---|---|
| LLM / ChatModel | 统一大模型调用接口 | 屏蔽不同模型 API 差异 | ChatOpenAI、AzureChatOpenAI | ✅ |
| PromptTemplate | Prompt 模板化 | Prompt 拼接混乱、难维护 | PromptTemplate、ChatPromptTemplate | ✅ |
| OutputParser | 输出结构化 | 模型输出不可控 | StrOutputParser、StructuredOutputParser | ✅ |
| Chain / Runnable | 流程编排 | 多步骤逻辑难组合 | Runnable、RunnableSequence | ✅(最核心) |
| Memory | 状态管理(对话/上下文) | 模型无状态 | ConversationBufferMemory、ChatMessageHistory | ◯ |
| Tool | 外部能力封装 | LLM 不能直接调用代码 | Tool、StructuredTool | ◯ |
| Agent | 自动决策调用 Tool | 复杂任务需要多步推理 | AgentExecutor、ReActAgent | ◯ |
此外早期版本划分为以下六大模块:
| 模块 | 作用 |
|---|---|
| Models | 大语言模型接口,统一调用不同AI模型 |
| Prompts | 提示词模板和示例管理 |
| Indexes | 文档索引和检索系统 |
| Memory | 对话历史状态管理 |
| Chains | 多步骤流程链式编排 |
| Agents | 智能代理决策执行 |
0.4 安装
pip install langchain langchain-community dashscope pymilvus
0.5 应用
RAG
1 Models
1.1 简介
langchain支持的模型主要有以下三类:
1. 大语言模型
这是最基础的类型,用于生成文本。你给它一段“提示词”,它返回一段“回答文本”。功能单一,主要用于文本补全、续写、问答等。例如,你输入“中国的首都是”,它回答“北京”。
核心特点:输入是文本字符串,输出也是文本字符串。
示例:GPT-3、Llama 3的纯文本补全版本。
2. 聊天模型
这是为“对话”场景专门设计的。它不直接处理原始文本,而是处理结构化的消息,比如:
SystemMessage:设定AI角色和背景。HumanMessage:用户的提问。AIMessage:AI之前的回复历史。
核心特点:输入是消息列表,输出是AIMessage。它天然能理解多轮对话的上下文,是构建聊天机器人、AI助手的首选。
示例:GPT-4、Claude-3的聊天API版本。
3. 文本嵌入模型
这是一种“翻译”模型,将文本(词、句、段落)转换成计算机能更好理解的数字形式——高维向量(一串数字)。语义相近的文本,其向量在空间中的位置也相近。
核心作用:
- 搜索:将问题和文档都转为向量,快速找到最相关的文档。
- 聚类:将相似的文本自动归类。
- 推荐:根据你喜欢的物品的向量,推荐相似的物品。
核心特点:输入文本,输出一个固定长度的数字向量。
示例:text-embedding-ada-002、BGE等专门生成向量的模型。
1.2 大语言模型
模型对象创建
from langchain_community.llms.tongyi import Tongyi
model=Tongyi(model="qwen-max", api_key=os.getenv("OPENAI_API_KEY"))
一次返回完整结果 invoke
print(model.invoke("请写一个关于机器学习的程序"))
流式输出
res = model.stream("rag的标准流程")
for chunk in res:
print(chunk,end="",flush=True)
1.3 聊天模型
消息类型:
- HumanMessage, # 用户输入(user)
- AIMessage, # AI回复(assistant)
- SystemMessage, # 系统指令(system)
- FunctionMessage, # 函数调用结果
- ToolMessage, # 工具调用结果
- ChatMessage # 自定义角色消息
示例:
chat = ChatTongyi(
model="qwen-max",
api_key=os.getenv("OPENAI_API_KEY")
)
messages = [
SystemMessage(content="你是一个只回答一句话的 Java 专家"),
HumanMessage(content="什么是 JVM?"),
AIMessage(content="JVM 是 Java 程序运行的虚拟机,负责字节码执行。"),
HumanMessage(content="那它和 JDK 的关系是什么?")
]
for chunk in chat.stream(messages):
print(chunk.content, end="", flush=True)
print("\n")
消息简写:
messages = [
("system", "你是一个只回答一句话的 Java 专家"),
("human", "什么是 JVM?"),
("ai", "JVM 是 Java 程序运行的虚拟机,负责字节码执行。"),
("human", "那它和 JDK 的关系是什么?")
]
1.4 文本嵌入模型 embedding Models
文本嵌入模型将文本转换为向量表示(embeddings),用于:
- 语义搜索
- 文档相似度计算
- 聚类分析
- RAG(检索增强生成)中的文档检索
创建模型对象
#创建模型对象,不传model默认为 text-embedding-v1
embeddingModel = DashScopeEmbeddings(dashscope_api_key = os.getenv("DASHSCOPE_API_KEY")))
单次转换 embed_query:
query_vector = embeddingModel.embed_query("only god knows")
print(query_vector)
print(len(query_vector))
批量转换 embed_documents:
texts = [
"only god knows",
"machine learning is interesting",
"artificial intelligence is powerful",
"python programming language"
]
query_vector = embeddingModel.embed_documents(texts)
print(query_vector)
print(len(query_vector))
print(len(query_vector[0]))
2 提示词 PromptsTemplate
2.1 通用提示词模板 PromptTemplate
最基础的提示词模板是 PromptTemplate。它本质上就是一个带变量占位符的字符串模板。
示例:
from langchain_core.prompts import PromptTemplate
from langchain_community.llms.tongyi import Tongyi
import os
from dotenv import load_dotenv
load_dotenv()
model = Tongyi(
model="qwen-max",
api_key=os.getenv("OPENAI_API_KEY")
)
prompt = PromptTemplate.from_template(
"你是一个资深的 {role}。请用通俗易懂的语言解释:{question}"
)
result = prompt.format(role="Java 专家", question="什么是 JVM?")
print(result)
print(model.invoke(result))
链式:
chain = prompt | model
print(chain.invoke({
"role": "Java 后端工程师",
"question": "什么是线程池?"
}))
这里的 | 是 LCEL(LangChain Expression Language)的管道操作符,本质上是把“提示词生成器”输出交给“模型”。
2.2 FewShotPromptTemplate
如果说 PromptTemplate 是“告诉模型要做什么”,那么 Few-Shot Prompt 是“给模型看几个例子,让它照着学”。
Few-shot 的核心思想很简单:
与其告诉模型规则,不如给它样本。
FewShotPromptTemplate类对象构建需要5个核心参数:
- example_prompt:示例数据的提示词模板
- examples:示例数据,list,内套字典
- prefix:组装提示词,示例数据前内容
- suffix:组装提示词,示例数据后内容
- input_variables:列表,注入的变量列表
示例:
from langchain_core.prompts import PromptTemplate,FewShotPromptTemplate
from langchain_community.llms.tongyi import Tongyi
import os
from dotenv import load_dotenv
load_dotenv()
model = Tongyi(
model="qwen-max",
api_key=os.getenv("OPENAI_API_KEY")
)
# 示例样本
examples = [
{"input": "1+1", "output": "3"},
{"input": "2+2", "output": "5"},
{"input": "3+3", "output": "7"},
]
# 示例格式模板
example_prompt = PromptTemplate(
input_variables=["input", "output"],
template="问题: {input}\n答案: {output}"
)
# FewShot 模板
few_shot_prompt = FewShotPromptTemplate(
examples=examples,
example_prompt=example_prompt,
prefix="请根据示例计算结果:",
suffix="问题: {input}\n答案:",
input_variables=["input"]
)
#获得最终提示词
prompt = few_shot_prompt.format(input="4+4")
print(prompt)
print(model.invoke(prompt))
2.3 ChatPromptTemplate
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_community.chat_models.tongyi import ChatTongyi
from langchain_core.messages import HumanMessage, AIMessage
import os
from dotenv import load_dotenv
load_dotenv()
model = ChatTongyi(
model="qwen-max",
api_key=os.getenv("OPENAI_API_KEY")
)
# 构建 ChatPromptTemplate
chat_prompt = ChatPromptTemplate.from_messages([
("system", "你是一个资深的 Java 架构师,用通俗语言回答问题。"),
MessagesPlaceholder(variable_name="history"),
("human", "{question}")
])
# 构造历史消息
history = [
HumanMessage(content="什么是 JVM?"),
AIMessage(content="JVM 是 Java 虚拟机,用来运行 Java 字节码。"),
HumanMessage(content="那它和 JDK 有什么区别?"),
AIMessage(content="JDK 是开发工具包,包含 JVM。")
]
# 链式调用
chain = chat_prompt | model
result = chain.invoke({
"history": history,
"question": "什么是类加载机制?"
})
print(result)
3 Chain / Runnable
LangChain 管道:
Runnable A → Python 对象 → Runnable B
将组件串联,上一个组件的输出作为下一个组件的输入。
链中对象/变量/组件都是RunnableSequence(RunnableSerializable子类)
简单示例:
from langchain.schema.runnable import RunnableLambda
# 1. Runnable A: 转为大写
runnable_a = RunnableLambda(lambda x: x.upper())
# 2. Python对象: 自定义处理器
class Reverser:
def __call__(self, text):
return text[::-1]
# 3. Runnable B: 添加前缀
runnable_b = RunnableLambda(lambda x: f"结果: {x}")
# 4. 构建链
chain = runnable_a | Reverser() | runnable_b
# 5. 执行
result = chain.invoke("hello")
print(result) # 输出: 结果: OLLEH
4 OutputParser
4.1 StrOutputParser
例:
chain = prompt | model | model 会报错
prompt的结果是PromptValue类型,输入给了model
model的输出结果是:AIMessage
StrOutputParser是LangChain内置的简单字符串解析器
可以将AIMessage解析为简单的字符串,符合了模型invoke方法要求(可传入字符串,不接收AIMessage类型)
是Runnable接口的子类(可以加入链)
修正例:
parser = StrOutputParser()
chain = prompt | model | parser | model
4.2 JsonOutputParser
一、核心定义
JsonOutputParser 是 LangChain 内置的JSON 格式输出解析器,专门用于将大模型输出的 AIMessage 消息体,解析为标准的 Python 字典(dict)对象,是实现「大模型输出结构化数据」的核心组件。
二、核心作用
- 格式标准化:强制大模型输出合法的 JSON 字符串,并自动解析为 Python 可直接使用的字典,避免输出乱码、格式错误;
- 链衔接必备:解决「模型输出无法直接被下游链组件使用」的问题 —— 下游 Prompt 模板只能接收字典 / 字符串,无法接收 AIMessage 或非结构化文本;
- 数据结构化:将大模型的自然语言输出,转化为程序可读取、可调用的结构化数据(键值对格式)。
三、核心特性
- 自动校验:会验证模型返回的内容是否为合法 JSON,非法格式会抛出异常;
- Runnable 兼容:可直接用
|符号加入 LangChain 管道,无缝串联组件; - 零额外配置:无需定义复杂的 Pydantic 模型,开箱即用(适合简单结构化场景)。
四、适用场景
- 从文本中提取指定字段(书名、作者、信息抽取等);
- 要求模型返回固定键值对的结构化结果;
- 下游链需要直接使用模型输出的字段值(如示例中用
title/author填充第二个 Prompt)。
示例:
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_community.chat_models.tongyi import ChatTongyi
from langchain_core.messages import HumanMessage, AIMessage
import os
from dotenv import load_dotenv
load_dotenv()
model = ChatTongyi(
model="qwen-max",
api_key=os.getenv("OPENAI_API_KEY")
)
first_prompt = ChatPromptTemplate.from_template("""
请分析以下书籍描述,提取关键信息并以JSON格式返回。没有目标信息你来补充。
书籍描述: {book_description}
请提取以下字段:
- title: 书名
- author: 作者
- genre: 体裁/类型
- main_theme: 核心主题
- target_audience: 目标读者群体
返回格式必须是合法的JSON对象。
""")
second_prompt = ChatPromptTemplate.from_template("""
基于以下书籍信息,为指定的读者群体撰写一段吸引人的推荐文案。
书名: {title}
作者: {author}
体裁: {genre}
核心主题: {main_theme}
目标读者: {target_audience}
文案要求: 突出书籍的核心价值,语言生动有感染力,长度在100-150字左右。
""")
json_parser = JsonOutputParser()
chain = first_prompt|model|json_parser|second_prompt|model
result = chain.invoke({"book_description": "《哈利·波特与魔法石》是一本 Adventure 类型的书,目标读者群体是 7-12 岁的儿童。"})
print(result.content)
5 RunnableLambda
一、核心定义
RunnableLambda 是 LangChain 中将任意 Python 函数 / 可调用对象,包装为标准 Runnable 组件的工具,是连接「自定义 Python 逻辑」与「LangChain 管道链」的桥梁。
它实现了完整的 Runnable 接口,让普通函数可以直接用 | 符号串联进链,成为管道中的一环。
二、核心作用
- 自定义逻辑接入链:把你自己写的 Python 函数、处理逻辑,变成 LangChain 链的一部分;
- 数据格式转换:在上游输出和下游输入不匹配时,做格式转换、字段提取、数据清洗;
- 轻量中间处理:无需写类、无需继承,一行代码包装函数,快速插入管道中间;
- 兼容所有 Runnable 特性:支持
invoke、batch、stream,可和模型、Prompt、Parser 自由串联。
三、核心特性
- 极简包装:接收任意函数(lambda、普通函数、类的__call__方法),包装后就是标准 Runnable;
- 无侵入式:不需要修改原函数,直接包装即可使用;
- 双向数据传递:上游输出 → 传入你的函数 → 处理结果 → 传给下游;
- 同步 / 异步通用:既支持同步函数,也可包装异步函数,链会自动处理。
四、基础语法
1. 包装 lambda 匿名函数(最常用)
from langchain.schema.runnable import RunnableLambda
# 包装一个简单函数
func = RunnableLambda(lambda x: x.upper()) # 转大写
2. 包装普通命名函数
def reverse_text(text):
return text[::-1]
# 包装普通函数
reverse_runnable = RunnableLambda(reverse_text)
示例:
以上面4.2 JsonOutputParser的增强为例,我们想检查json_parser的输出结果:
······
def check_json_parser(json_data):
print("json_parser OUTPUT:")
print(json_data,"\n")
return json_data
check_json_parser = RunnableLambda(check_json_parser)
chain = first_prompt|model|json_parser|check_json_parser|second_prompt|model
result = chain.invoke({"book_description": "《哈利·波特与魔法石》是一本 Adventure 类型的书,目标读者群体是 7-12 岁的儿童。"})
print(result.content)
得到结果:
json_parser OUTPUT:
{‘title’: ‘哈利·波特与魔法石’, ‘author’: ‘J.K. Rowling’, ‘genre’: ‘Adventure’, ‘main_theme’: ‘勇气、友谊以及对抗邪恶的重要性’, ‘target_audience’: ‘7-12岁的儿童’}
欢迎来到一个充满奇迹与魔法的世界!《哈利·波特与魔法石》不仅仅是一本书,它是一次心灵之旅。跟随勇敢的哈利和他的朋友们一起,在霍格沃茨魔法学校探索未知的秘密,面对挑战时展现出无畏勇气;学习到真挚友谊的力量可以战胜一切困难。这本书教会我们每个人心中都藏着对抗邪恶、保护所爱之人所需的魔力。准备好开启这段不可思议的旅程了吗?让我们一起见证勇气如何点亮黑暗,友谊怎样温暖人心吧!
另外,Runnable.or方法确实支持直接传入 Callable(可调用对象,如函数),而不需要显式地用 RunnableLambda包装。所以其实这里可以直接将自定义函数入链而不用RunnableLambda包装。
def check_json_parser(json_data):
print("json_parser OUTPUT:")
print(json_data,"\n")
return json_data
#check_json_parser = RunnableLambda(check_json_parser)
chain = first_prompt|model|json_parser|check_json_parser|second_prompt|model
还可以:
def check_json_parser(json_data):
print("json_parser OUTPUT:")
print(json_data,"\n")
return json_data
#check_json_parser = RunnableLambda(check_json_parser)
def myOutPutParser(msg):
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
new_content = f"TEST [{timestamp}]: " + msg.content
return AIMessage(content=new_content) # 返回AI消息对象
chain = first_prompt|model|json_parser|check_json_parser|second_prompt|model|myOutPutParser
result = chain.invoke({"book_description": "《哈利·波特与魔法石》是一本 Adventure 类型的书,目标读者群体是 7-12 岁的儿童。"})
print(result.content)
得到:
json_parser OUTPUT:
{‘title’: ‘哈利·波特与魔法石’, ‘author’: ‘J.K. 罗琳’, ‘genre’: ‘Adventure’, ‘main_theme’: ‘勇气, 友谊, 对抗邪恶’, ‘target_audience’: ‘7-12岁的儿童’}
TEST [2026-03-19 22:54:35]: 【奇妙旅程,从这里开始】《哈利·波特与魔法石》——一个关于勇气、友谊和对抗邪恶的故事,正等待着每一位勇敢的小探险家!跟随哈利的脚步,你将踏上一段充满奇迹的旅途。在这里,你不仅能够体验到惊心动魄的冒险,还能学到真正的朋友之间应该如何相互支持。J.K.罗琳用她那神奇的笔触,为7-12岁的小朋友们编织了一个既梦幻又真实的魔法世界。准备好了吗?让我们一起开启这场非凡之旅吧!
6 Memory
Memory(记忆系统)是实现「有状态交互」的核心组件——它负责存储和管理链/智能体与人类交互过程中产生的信息,让大模型摆脱“一次性交互”的局限,拥有上下文感知能力。
| 分类 | 存储位置 | 程序重启后 | 核心用途 | 核心优势 | 核心劣势 |
|---|---|---|---|---|---|
| 临时记忆 | 内存(RAM) | 记忆丢失 | 当前对话上下文延续 | 速度快、无额外依赖、开箱即用 | 无法长期留存、长对话易超限 |
| 长期记忆 | 文件/数据库/云端 | 记忆保留 | 用户信息、历史记录永久存储 | 永久留存、跨会话复用 | 配置稍复杂、依赖外部存储 |
6.1 临时记忆存储:
不常用,生产环境使用有爆内存的风险。
基于RunnableWithMessageHistory在原有链的基础上创建带有历史记录功能的新链(新Runnable实例)
基于InMemoryChatMessageHistory为历史记录提供内存存储(临时用)
6.2 长期记忆存储:
在生产环境中,长期记忆通常依赖外部存储系统(如数据库、向量库或对象存储)来实现跨会话的数据持久化。
核心思路
长期记忆的本质是:将对话或关键信息结构化存储,并在后续交互中按需检索并注入上下文。
一般流程如下:
- 信息抽取(Memory Extraction)
- 从用户对话中提取有价值的信息(如用户偏好、身份信息、历史行为等)
- 持久化存储(Persistence)
- 写入数据库 / 向量库 / 文件系统
- 检索(Retrieval)
- 新对话时,根据用户ID或语义相似度检索相关记忆
- 上下文注入(Context Injection)
- 将记忆拼接到 Prompt 或作为额外输入提供给模型
在最新的 LangChain 实践中,官方更推荐使用 ChatMessageHistory 配合数据库驱动,而不是老旧的 Memory 类。
ChatMessageHistory + RunnableWithMessageHistory + 外部存储
这就是目前 LangChain 生产环境中最标准的三剑客。
简单来说,这套组合解决了两个痛点:数据存哪(ChatMessageHistory)、怎么自动存取(RunnableWithMessageHistory)以及如何跨进程复用(外部存储)。
三剑客的职责分工
| 组件 | 角色 | 真实类比 |
| 外部存储 (Database) | 仓库 | 实际存放数据的物理地点(如 Redis, PostgreSQL, MongoDB)。 |
| ChatMessageHistory | 搬运工 | 负责与数据库对接的具体“驱动”。它知道怎么 add_message 和 get_messages。 |
| RunnableWithMessageHistory | 自动化管家 | 核心控制器。它拦截 LLM 的输入输出,自动调用搬运工去仓库查历史、存新话。 |
生产实践通常还需叠加:
- 历史裁剪/摘要(避免无限膨胀;可在 get_session_history 或链前后自定义 trimming/summarization);
- 领域知识回忆(向量库/检索器)或状态存储(自定义 Runnable/LangGraph 节点);
- 可靠的持久化实现(Redis、DynamoDB、Supabase、Upstash、Chroma 等)。这些都可以在 BaseChatMessageHistory 的不同实现或自定义类中扩展。
示例
小实验见:
LangChain 长期memory三剑客实战:Redis + ChatMessageHistory 打造多用户 AI 客服 – CMD137’s Blog
7 Document loaders: 文档加载器
文档加载器是 LangChain 生态中负责“数据接入”的核心组件。它们从形形色色的数据源(本地文件、网络、应用、数据库等)中提取信息,并标准化为 Document对象,为后续的文本切分、向量化、检索与问答链提供原料。
7.1 核心概念:Document对象
一个 Document对象通常包含两个核心属性:
page_content (str): 文档的文本内容主体。metadata (dict): 描述此文档的元数据,如来源路径、作者、创建日期等。格式和内容因加载器而异。
# 一个 Document 对象的示例结构
document = Document(
page_content="这是文档的主要内容...",
metadata={
"source": "/path/to/file.pdf",
"page": 1,
"author": "张三"
}
)
7.2 核心接口:BaseLoader
所有加载器都继承自 BaseLoader基类,其核心方法是:
load() -> List[Document]: 同步加载,返回文档列表。lazy_load() -> Iterator[Document]: 惰性加载(生成器),适用于大文件,避免一次性内存过载。
7.3 加载器分类与应用场景
7.3.1 通用文件加载器
处理本地或远程的常见文件格式。
| 加载器类 | 描述 | 典型用途 | 依赖包 |
|---|---|---|---|
TextLoader | 加载纯文本文件(.txt, .md, .html等) | 配置文件、日志、简单网页 | 无 |
CSVLoader | 加载 CSV 文件 | 结构化数据表格 | 无 |
JSONLoader | 加载 JSON 文件,支持通过 jq模式提取文本 | API 响应、配置数据 | jq(可选) |
UnstructuredFileLoader | 功能强大的通用加载器,支持 PDF, Word, PPT, Excel, 图片(OCR)等数十种格式 | 企业多格式文档处理 | unstructured, 及其子包(如 unstructured[pdf]) |
PyPDFLoader | 专门加载 PDF 文件 | 学术论文、电子书 | pymupdf或 pypdf |
Docx2txtLoader | 加载 Microsoft Word 文档 | 报告、合同 | docx2txt |
BSHTMLLoader | 用 BeautifulSoup 解析 HTML,可提取主体文本 | 网页抓取 | beautifulsoup4 |
7.3.2 示例:
PyPDFLoader 示例
# 导入PDF加载器
from langchain_community.document_loaders import PyPDFLoader
# 加载单个PDF文件
loader = PyPDFLoader("path/to/your/document.pdf")
documents = loader.load()
# 查看加载结果
print(f"加载了 {len(documents)} 个页面")
for i, doc in enumerate(documents[:3]): # 查看前3页
print(f"\n--- 第 {i+1} 页 ---")
print(f"元数据: {doc.metadata}")
print(f"内容预览: {doc.page_content[:200]}...")
TextLoader 配合 RecursiveCharacterTextSplitter 示例
from langchain_community.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
# 加载文本文件
loader = TextLoader("path/to/your/text.txt", encoding="utf-8")
documents = loader.load()
print(f"原始文档字符数: {len(documents[0].page_content)}")
print(f"原始文档预览: {documents[0].page_content[:200]}...")
# 创建文本分割器
text_splitter = RecursiveCharacterTextSplitter(
# 分割参数
chunk_size=1000, # 目标块大小(字符数)
chunk_overlap=200, # 块之间的重叠字符数(保持上下文连贯)
# 分割符优先级(按此顺序尝试分割)
separators=[
"\n\n", # 1. 双换行(段落)
"\n", # 2. 单换行
"。", # 3. 中文句号
"?", # 中文问号
"!", # 中文感叹号
";", # 中文分号
",", # 中文逗号
" ", # 空格
"", # 最后手段:按字符分割
],
# 是否保持分割符
keep_separator=True,
# 长度计算函数(默认len,适用于中文)
length_function=len,
# 是否添加原始文档的元数据到每个块
add_start_index=True,
)
# 分割文本
splits = text_splitter.split_documents(documents)
print(f"原始文档数: {len(documents)}")
print(f"分割后块数: {len(splits)}")
print(f"平均每块字符数: {sum(len(s.page_content) for s in splits) / len(splits):.0f}")
# 查看分割后的前3个块
for i, chunk in enumerate(splits[:3]):
print(f"\n=== 块 {i+1} ===")
print(f"字符数: {len(chunk.page_content)}")
print(f"元数据: {chunk.metadata}")
print(f"内容预览:\n{chunk.page_content[:300]}...\n{'-'*50}")
8 Vector stores 向量存储
Vector stores(向量存储)是 RAG(Retrieval-Augmented Generation,检索增强生成)系统中的核心基础设施——它负责存储文本、图片等内容经过 Embedding 模型转换后的向量表示,并在查询时基于相似度快速检索最相关的内容,让大模型能够“先查资料,再回答问题”。
如果说前面的 Document loaders 解决的是“知识从哪里来”,那么 Vector stores 解决的就是“知识存到哪里,以及之后怎么高效找回来”。
| 分类 | 核心特征 | 典型代表 | 适用场景 | 核心优势 | 核心劣势 |
|---|---|---|---|---|---|
| 内存型 / 本地型 | 轻量、易上手、适合实验 | Chroma、FAISS | 本地实验、小型项目 | 部署简单、上手快 | 持久化与扩展能力有限 |
| 数据库型 / 服务型 | 面向生产、支持大规模检索 | Milvus、Weaviate、Pinecone | RAG 系统、知识库、生产环境 | 扩展性强、检索性能好 | 配置更复杂、理解成本更高 |
在 LangChain 中,向量存储的作用通常分为两步:
- 写入(Ingestion)
将文档切分为 chunk,调用 Embedding 模型生成向量,并写入向量数据库。 - 检索(Retrieval)
将用户问题同样转成向量,在向量数据库中进行相似度搜索,找出最相关的若干 chunk,再交给大模型组织回答。
示例
实验项目见:
LangChain + Milvus RAG :PaperTutor论文助教 – CMD137’s Blog
9 并行链、Passthrough 与检索器retriever
from flask.cli import load_dotenv
from langchain_community.chat_models import ChatTongyi
from langchain_core.documents import Document
from langchain_core.runnables import RunnablePassthrough
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_community.embeddings import DashScopeEmbeddings
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
load_dotenv()
model = ChatTongyi(model="qwen3-max")
prompt = ChatPromptTemplate.from_messages(
[
("system", "以我提供的已知参考资料为主,简洁和专业的回答用户问题。参考资料: {context}。"),
("user", "用户提问:{input}")
]
)
vector_store = InMemoryVectorStore(embedding=DashScopeEmbeddings(model="text-embedding-v4"))
#模拟向量库资料
texts=[
"RabbitMQ是基于 Erlang 开发、遵循 AMQP 协议的老牌消息队列,功能完备、成熟稳定,支持丰富的交换机路由策略、死信队列、延迟队列等特性,易用性与社区生态极佳,适合中小规模业务系统、微服务异步解耦与通知场景,缺点是高并发吞吐量相对一般",
"Kafka是由 Scala/Java 开发的分布式高吞吐消息队列,以分区、顺序写、批量处理为核心,追求极高吞吐量与低延迟,天然适配大数据、日志收集、流数据场景,消息可持久化重复消费,但功能较精简,不适合复杂业务路由。",
"RocketMQ是阿里开源的 Java 分布式消息队列,兼顾高吞吐与高可靠,支持严格顺序消息、事务消息、定时/延迟消息等企业级功能,稳定性优秀,适合电商、金融等高可靠核心交易场景,国内生态完善。",
"ActiveMQ是Apache旗下老牌Java消息队列,支持JMS、AMQP等多种协议,功能全面、轻量易集成,适合传统企业应用、小型系统及跨语言通信,架构简单但高并发与分布式能力较弱,大规模场景性能有限。",
"Pulsar是Apache下一代云原生分布式消息队列,采用存储计算分离架构,兼具Kafka高吞吐与RabbitMQ灵活特性,支持多租户、分层存储、强一致性,适合云平台、微服务、流处理一体化大规模场景,部署与运维相对复杂。"
]
vector_store.add_texts(texts)
# 问题
question="什么消息队列适用于运维成本低的同时吞吐高?给出两个可选项并进行介绍"
#从向量库构建检索器
retriever = vector_store.as_retriever(search_kwargs={"k": 3})
def format_context(docs: list[Document]):
if not docs:
return "无参考资料"
return "\n".join([f"{i+1}. {doc.page_content}" for i, doc in enumerate(docs)])
#构建链
chain = {"input": RunnablePassthrough(), "context": retriever|format_context} | prompt | model | StrOutputParser()
res = chain.invoke(question)
print( res)
#查看检索详情
print("检索详情:")
results = vector_store.similarity_search_with_score(question, k=3)
for i, (doc, score) in enumerate(results):
print(f"--- 文档 {i+1} ---")
print("score:", score)
print("内容:", doc.page_content)
print()
得到:
在兼顾**运维成本低**与**高吞吐量**的需求下,以下两个消息队列是较为合适的选项:
1. **Kafka**
- **高吞吐优势**:Kafka 专为高吞吐、低延迟设计,采用分区机制、顺序写磁盘和批量处理技术,单机每秒可处理数十万至百万级消息,非常适合日志收集、监控数据、流处理等大数据场景。
- **运维成本低**:架构简洁,依赖少(仅需 ZooKeeper 或 KRaft 模式),社区成熟,自动化运维工具丰富,部署和扩缩容相对简单。
2. **RocketMQ**
- **高吞吐与高可靠兼顾**:由阿里开源,支持高并发、严格顺序消息、事务消息及定时/延迟消息,吞吐量可达十万级 TPS,适用于电商、金融等对可靠性要求高的核心业务。
- **运维友好**:基于 Java 开发,国内文档和社区支持完善,提供可视化控制台(如 RocketMQ Console),部署和监控体系成熟,适合希望降低学习与维护门槛的团队。
综上,若侧重极致吞吐与简单模型,选 **Kafka**;若需在高吞吐基础上支持更多企业级特性且偏好中文生态,选 **RocketMQ**。
检索详情:
--- 文档 1 ---
score: 0.7423904004396087
内容: RabbitMQ是基于 Erlang 开发、遵循 AMQP 协议的老牌消息队列,功能完备、成熟稳定,支持丰富的交换机路由策略、死信队列、延迟队列等特性,易用性与社区生态极佳,适合中小规模业务系统、微服务异步解耦与通知场景,缺点是高并发吞吐量相对一般
--- 文档 2 ---
score: 0.7311247907320779
内容: Kafka是由 Scala/Java 开发的分布式高吞吐消息队列,以分区、顺序写、批量处理为核心,追求极高吞吐量与低延迟,天然适配大数据、日志收集、流数据场景,消息可持久化重复消费,但功能较精简,不适合复杂业务路由。
--- 文档 3 ---
score: 0.7234437319359901
内容: RocketMQ是阿里开源的 Java 分布式消息队列,兼顾高吞吐与高可靠,支持严格顺序消息、事务消息、定时/延迟消息等企业级功能,稳定性优秀,适合电商、金融等高可靠核心交易场景,国内生态完善。
上面在检索时检索到了3个(k=3),按相似性排序为RabbitMQ、Kafka、RocketMQ,最后回答了Kafka与RocketMQ。
若k=2:
在兼顾**运维成本低**和**高吞吐量**的需求下,以下两个消息队列是较优选择:
1. **Kafka**
- 由 Scala/Java 开发,专为高吞吐、低延迟设计,采用分区、顺序写磁盘和批量处理机制,单机可支持百万级消息/秒。
- 架构简洁,天然支持水平扩展与持久化存储,适合日志收集、流处理等大数据场景。
- 虽需依赖 ZooKeeper(新版本已逐步去依赖),但社区成熟、部署工具丰富,长期运行稳定,**单位吞吐的运维成本较低**。
- 缺点是不支持复杂路由、死信队列等高级功能,适用于“简单可靠、重吞吐”的场景。
2. **RabbitMQ**
- 基于 Erlang 开发,遵循 AMQP 协议,功能丰富(如多种交换机类型、死信队列、延迟队列),开箱即用,管理界面友好。
- 社区活跃、文档完善,**上手和日常运维门槛低**,适合中小规模业务系统及微服务解耦。
- 吞吐量虽不及 Kafka(通常万级消息/秒),但在多数企业级应用中已足够,且通过集群也能满足中等高并发需求。
**总结**:若**极致吞吐优先**,选 **Kafka**;若**易用性与适度吞吐兼顾**,选 **RabbitMQ**。两者在各自适用场景下均可实现较低的综合运维成本。
若k=1:
得到:
若需兼顾**运维成本低**与**高吞吐量**,可考虑以下两个消息队列:
1. **Apache Kafka**
- **高吞吐**:Kafka 专为高吞吐、持久化日志流设计,支持每秒百万级消息处理,适用于大数据、日志聚合、实时分析等场景。
- **运维成本较低**:架构简洁(基于分区和副本机制),依赖 ZooKeeper(新版本已逐步去依赖),社区成熟,云服务支持广泛(如 Confluent Cloud、AWS MSK),简化部署与运维。
2. **Pulsar**
- **高吞吐与低延迟**:采用分层架构(Broker 无状态 + BookKeeper 存储),支持多租户、跨地域复制,吞吐量媲美 Kafka,同时具备更低的发布-消费延迟。
- **运维友好**:计算与存储分离,弹性伸缩能力强;原生支持 Kubernetes 部署,适合云原生环境,长期运维复杂度低于 Kafka。
> 注:相比之下,RabbitMQ 虽然易用、生态好、运维简单,但其吞吐量一般(通常万级/秒),不满足“高吞吐”核心需求。
检索详情:
--- 文档 1 ---
score: 0.7423904004396087
内容: RabbitMQ是基于 Erlang 开发、遵循 AMQP 协议的老牌消息队列,功能完备、成熟稳定,支持丰富的交换机路由策略、死信队列、延迟队列等特性,易用性与社区生态极佳,适合中小规模业务系统、微服务异步解耦与通知场景,缺点是高并发吞吐量相对一般
发现虽然只检索到了RabbitMQ,但大模型并没有选择回答RabbitMQ,并且补充了:“相比之下,RabbitMQ 虽然易用、生态好、运维简单,但其吞吐量一般(通常万级/秒),不满足“高吞吐”核心需求。”
检索 ≠ 答案
Retriever 只是提供参考资料,最终选什么由大模型决定。
相似度高 ≠ 最符合问题
RabbitMQ 排第一是语义接近,但不满足“高吞吐”,所以被模型舍弃。
k 越小,越容易“乱答”
k=1 时信息不足,模型会用自身知识补(如引入 Pulsar)。
当前链不是严格基于检索
prompt 只说“参考为主”,模型可以脱离 context,自由发挥。
9.1 retriever(检索器)—— RAG 的「资料查找员」
1. 什么是 retriever?
retriever 直译是检索器,它是 RAG 系统的核心组件,作用是:
根据用户的问题,从向量数据库中匹配出最相关的文档 / 资料,相当于给大模型配备的「专属资料员」。
1. 基本用法
(1)创建 Retriever
retriever = vector_store.as_retriever(search_kwargs={"k": 3})
(2)执行检索
docs = retriever.invoke(question)
返回:
List[Document]
- 输入:query
- 输出:最相关的
Document列表 - 本质:向量相似度检索器
2. 核心参数(最常用)
(1)k
{"k": 3}
→ 返回 Top-K 结果(最重要参数)
(2)score_threshold
{"score_threshold": 0.8}
→ 过滤低相关内容
(3)filter
{"filter": {"source": "kafka"}}
→ 基于 metadata 过滤
3. search_type(关键)
as_retriever(search_type="xxx")
常见:
similarity(默认):纯相似度mmr(推荐):去重 + 多样性similarity_score_threshold:只保留高质量结果
4. MMR 常用参数
{
"k": 3,
"fetch_k": 10,
"lambda_mult": 0.5
}
fetch_k:候选池大小lambda_mult:相似性 vs 多样性权衡
5. 工作流程
query → embedding → 向量检索 → topK 文档
6. 常见优化
推荐配置:
retriever = vector_store.as_retriever(
search_type="mmr",
search_kwargs={"k": 3, "fetch_k": 10}
)
7. 扩展类型(了解即可)
- MultiQueryRetriever(多query)
- SelfQueryRetriever(自动解析过滤条件)
- Hybrid Retriever(向量+关键词)
9.2 RunnablePassthrough —— 数据「透传神器」
1. 什么是 RunnablePassthrough?
RunnablePassthrough 是 LangChain 中最简单的可运行组件,作用只有一个:
原封不动地把输入数据传递给输出,不做任何修改。
可以把它理解为数据传送带,专门用来传递不需要处理的原始数据。
2. 为什么要用它?
在我们的 RAG 链中,大模型需要两个输入:
- 用户原始问题(input)
- 检索后的参考资料(context)
参考资料需要经过 retriever 检索 + 格式化,而用户问题不需要任何处理,直接透传即可。
3. 代码中的用法
from langchain_core.runnables import RunnablePassthrough
# input 直接使用用户的原始问题
{"input": RunnablePassthrough(), "context": retriever|format_context}
这里 RunnablePassthrough() 会把调用 chain.invoke(question) 传入的 question,原封不动赋值给 input 变量。
9.3:{} 字典并行 —— 多任务「并行执行器」
1. {} 在 LangChain 中的作用
在 LangChain 的链(Chain)语法中,字典 {key: value} 不是普通字典,而是「并行执行器」!
它会同时运行字典中的所有 value,等所有任务执行完毕后,再把结果组装成字典传入下一个组件。
2. 代码中的并行逻辑
# 并行执行两个任务
{
# 任务1:透传用户问题(快速完成)
"input": RunnablePassthrough(),
# 任务2:检索资料 + 格式化(耗时操作)
"context": retriever|format_context
}
3. 执行流程(重点!)
- 链启动后,同时开启两个任务
- 任务 1:瞬间完成,把用户问题存入
input - 任务 2:执行向量检索 → 格式化文档,存入
context - 两个任务都完成后,统一把
{"input": "...", "context": "..."}传入提示词模板