此项目演示 LangChain 在生产环境中推荐的“ChatMessageHistory + RunnableWithMessageHistory + 外部存储(Redis)”组合,构建一个多用户 AI 客服接口。所有历史记录都按照 user_id 维度缓存在 Redis 中,适合接入 Web/小程序/客服工作台等场景;模型侧默认调用通义千问 qwen-max。
- LangChain LCEL:
ChatPromptTemplate+MessagesPlaceholder+RunnableWithMessageHistory自动注入历史。 - RedisChatMessageHistory:复用社区实现,支持 TTL、水平扩展和数据淘汰策略。
- FastAPI:提供
/chat与/health接口,方便容器化部署或与 API 网关集成。
为什么不用旧版 Memory? 旧 Memory API 不支持异步 + 流式 + 多 Session 隔离,官方仅建议在原型阶段使用,本示例体现了推荐做法。
代码要点
app/main.py中的conversation_chain定义了主链路:Prompt -> ChatTongyi -> StrOutputParser。RunnableWithMessageHistory负责在每次调用前后自动追加消息;history_messages_key="history"要与MessagesPlaceholder一致。get_history(session_id)返回RedisChatMessageHistory,可替换成 Postgres/Mongo 等自定义实现,只要继承BaseChatMessageHistory。ttl让 Redis 自动清理长时间未活跃的对话,适合客服场景。
源码:
import logging
import os
import traceback
from datetime import datetime
from typing import Optional
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from langchain_community.chat_message_histories import (
RedisChatMessageHistory,
)
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_community.chat_models.tongyi import ChatTongyi
from pydantic import BaseModel
load_dotenv()
app = FastAPI(title="AI Customer Service Demo", version="0.1.0")
logger = logging.getLogger("ai-customer-service")
#构造 Prompt(提示模板)
# - system 消息固定客服人设
# - MessagesPlaceholder 是历史消息的占位符,稍后会由 RunnableWithMessageHistory 自动填充
# - human 消息留给实时输入
SYSTEM_PROMPT = (
"你是一个细致耐心的 AI 客服,负责回答用户关于产品政策、订购与故障排查的问题。"
"遇到需要人工介入的情况时,先整理已知信息,再建议转人工。"
f"当前时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}。"
)
prompt = ChatPromptTemplate.from_messages(
[
("system", SYSTEM_PROMPT),
MessagesPlaceholder(variable_name="history"),
("human", "{user_input}"),
]
)
model = ChatTongyi(
model=os.getenv("TONGYI_MODEL", "qwen-max"),
api_key=os.getenv("OPENAI_API_KEY"),
temperature=0.2,
)
# Prompt + Model + OutputParser 合成最基础的对话链条
base_chain = prompt | model | StrOutputParser()
# 长期记忆存储(Redis)
# - 通过 RedisChatMessageHistory 把每个 user_id 的消息写入 Redis 列表
# - ttl 控制消息保留时间(默认 7 天),也可以调成按需清理
# - REDIS_URL 可带密码、数据库 index,适合生产环境部署
redis_url = os.getenv("REDIS_URL", "redis://:123456@localhost:6379/0")
redis_ttl_seconds = int(os.getenv("REDIS_TTL_SECONDS", str(60 * 60 * 24 * 7)))
def get_history(session_id: str) -> RedisChatMessageHistory:
if not session_id:
raise ValueError("session_id must not be empty")
return RedisChatMessageHistory(
session_id=session_id,
url=redis_url,
ttl=redis_ttl_seconds,
)
# RunnableWithMessageHistory = “胶水”:
# - 每次调用前在 prompt 里注入历史;调用后将最新 AI/Human 消息写回 Redis
# - input_messages_key 对应 prompt 中 human 部分的变量名
# - history_messages_key 对应 MessagesPlaceholder 的变量名
conversation_chain = RunnableWithMessageHistory(
base_chain,
get_session_history=get_history,
input_messages_key="user_input",
history_messages_key="history",
)
class ChatRequest(BaseModel):
user_id: str
message: str
locale: Optional[str] = None
class ChatResponse(BaseModel):
user_id: str
reply: str
@app.get("/health")
async def health_check():
return {"status": "ok"}
@app.post("/chat", response_model=ChatResponse)
async def chat_endpoint(payload: ChatRequest):
if not payload.user_id.strip():
raise HTTPException(status_code=400, detail="user_id is required")
try:
reply = await conversation_chain.ainvoke(
{"user_input": payload.message},
config={"configurable": {"session_id": payload.user_id}},
)
except Exception as exc: # pragma: no cover - demo level error surfacing
tb = traceback.format_exc()
logger.error("LLM 调用失败: %s\n%s", exc, tb)
raise HTTPException(
status_code=500,
detail={
"error": str(exc),
"hint": "检查 OPENAI_API_KEY 是否为有效的 DashScope Key 或 Redis 连接是否可用。",
"traceback": tb,
},
) from exc
return ChatResponse(user_id=payload.user_id, reply=reply)
结果:
C:\Users\29924>curl -X POST http://127.0.0.1:8000/chat -H "Content-Type: application/json" -d "{\"user_id\": \"zhangsan\", \"message\": \"我的猫叫咪咪,它是一只三花猫\"}"
{"user_id":"zhangsan","reply":"谢谢您的分享!咪咪听起来是个非常可爱的名字,三花猫的颜色组合通常也非常漂亮。如果您有关于照顾咪咪的问题,比如饮食、健康或是行为习惯等方面的疑问,我很乐意帮助提供一些建议。或者,您有其他想要了解的内容吗?"}
C:\Users\29924>curl -X POST http://127.0.0.1:8000/chat -H "Content-Type: application/json" -d "{\"user_id\": \"lisi\", \"message\": \"我计划下个月去北京旅游\"}"
{"user_id":"lisi","reply":"那听起来很令人兴奋!北京是一个充满历史和文化的城市,有很多值得游览的地方。如果您需要一些建议来规划您的旅行,比如必去的景点、美食推荐或是住宿建议等,我很乐意为您提供帮助。另外,如果您有关于交通安排、天气情况或任何其他方面的疑问,也请随时告诉我!您现在有什么具体想要了解的信息吗?"}
C:\Users\29924>curl -X POST http://127.0.0.1:8000/chat -H "Content-Type: application/json" -d "{\"user_id\": \"zhangsan\", \"message\": \"它平时喜欢吃什么?\"}"
{"user_id":"zhangsan","reply":"三花猫和其他猫咪一样,对食物的喜好可能会有所不同,但一般来说,健康的饮食应该包括高质量的猫粮和适量的湿粮。这里有一些关于猫咪饮食的一般性建议:\n\n1. 高质量猫粮:选择一种营养均衡、适合咪咪年龄(幼猫、成猫或老年猫)以及健康状况的猫粮非常重要。市面上有许多品牌提供不同类型的猫粮,您可以根据咪咪的具体需求来挑选。\n\n2. 湿粮:湿粮可以为猫咪提供额外的水分,并且很多猫咪都喜欢它的口感。湿粮也是增加猫咪饮食多样性的好方法之一。不过需要注意的是,不要让湿粮成为猫咪唯一的食物来源,因为干粮有助于清洁牙齿。\n\n3. 新鲜肉类:一些猫咪喜欢偶尔吃点新鲜煮熟的鸡肉、鱼肉等。确保这些肉类是无骨且未经过调味处理的。\n\n4. 零食:适量给予猫咪专用的小零食作为奖励是可以的,但要控制好分量,避免过量导致肥胖或其他健康问题。\n\n5. 水:保持充足的清水供应对于维持猫咪良好的健康状态至关重要。\n\n值得注意的是,每只猫咪的身体状况和个人偏好都不同,在改变其饮食习惯前最好先咨询兽医的意见。如果发现咪咪有任何不寻常的行为或健康问题,请及时联系专业人士进行检查。希望这些建议能帮助到您!"}
C:\Users\29924>curl -X POST http://127.0.0.1:8000/chat -H "Content-Type: application/json" -d "{\"user_id\": \"lisi\", \"message\": \"它平时喜欢吃什么?\"}"
{"user_id":"lisi","reply":"看起来您可能是在询问某个人或某个特定对象的饮食偏好,但您的问题中没有明确指出是“谁”或“什么”。如果您能提供更多背景信息,比如是指北京的当地人、某种动物还是其他什么,我就能更好地为您提供相关信息。请告诉我更多细节,好吗?"}
重启服务后:
C:\Users\29924> curl -X POST http://127.0.0.1:8000/chat -H "Content-Type: application/json" -d "{\"user_id\": \"zhangsan\", \"message\": \"它叫什么来着?\"}"
{"user_id":"zhangsan","reply":"您的猫咪叫咪咪,是一只三花猫。如果您还有其他关于咪咪的问题或需要更多帮助,请随时告诉我!"}
这5次API调用清晰地展示了系统的多用户记忆与隔离机制:
- 首次对话建立不同上下文:
zhangsan聊“我的猫叫咪咪”lisi聊“计划去北京旅游”
- 后续相同问题验证隔离效果:
- zhangsan 问“它平时喜欢吃什么?” → AI准确理解“它”指猫“咪咪”,给出详细养猫建议
- lisi 同样问“它平时喜欢吃什么?” → AI困惑,因历史中无“它”所指对象,要求更多信息
- 重启服务后验证历史记录持久化(记忆为长期而非短期):
- zhangsan 问“它叫什么来着?” → 服务重启后的AI依然可以准确回答“它”叫“咪咪”
核心证明:相同的模糊提问,因用户不同,系统从各自独立的Redis历史中提取上下文,给出了完全不同的回答。这证实了:
- 会话隔离:用户历史互不干扰
- 长期记忆:能记住用户之前的对话
- 自动上下文管理:
RunnableWithMessageHistory正确注入专属历史
讲解:
| 组件 | 作用 | 本质 |
|---|---|---|
| 外部存储 | 数据持久化 | Redis / DB / 向量库 |
| ChatMessageHistory | 数据访问层 | CRUD 封装 |
| RunnableWithMessageHistory | 生命周期管理 | 自动注入 + 自动存储 |
ChatMessageHistory
LangChain 已经提供了常用实现:
InMemoryChatMessageHistory(内存)RedisChatMessageHistorySQLChatMessageHistory
一般使用自定义/扩展的ChatMessageHistory继承BaseChatMessageHistory,需要至少实现三个方法:
add_message:在每次对话发生时写入新消息,并在写入过程中可附带裁剪、摘要、向量化或信息抽取等处理。get_messages:在模型调用前读取并组织上下文,通常会组合“历史摘要 + 最近对话 + 检索结果”等内容返回。clear:清空当前会话的所有已存储记忆,用于重置上下文或结束会话。
下面是一个典型的自定义示例:
class EnhancedRedisChatMessageHistory:
def __init__(
self,
session_id: str,
redis_client: Redis,
ttl: Optional[int] = None,
max_messages: int = 20, # 最多保留最近 N 条对话(防超长)
prefix: str = "chat:history" # Redis key 前缀(多业务隔离)
):
# 1. 基础校验(生产必备)
if not session_id:
raise ValueError("session_id 不能为空")
if not isinstance(redis_client, Redis):
raise TypeError("redis_client 必须是合法的 Redis 实例")
# 2. 核心配置
self.session_id = session_id
self.redis_client = redis_client
self.ttl = ttl
self.max_messages = max_messages
self.key = f"{prefix}:{session_id}" # 最终 Redis key
def add_message(self, message: BaseMessage) -> None:
"""添加单条消息(LangChain 标准接口)"""
try:
# 读取现有历史
history = self.messages
# 追加新消息
history.append(message)
# 截断:只保留最近 N 条(防止 token 爆炸)
if self.max_messages > 0:
history = history[-self.max_messages:]
# 序列化存储
self._save_history(history)
logger.debug(f"会话 {self.session_id} 新增消息成功,当前总数:{len(history)}")
except RedisError as e:
logger.error(f"Redis 存储消息失败:{str(e)}", exc_info=True)
raise # 生产可根据需求选择是否吞异常
def _save_history(self, messages: List[BaseMessage]) -> None:
"""内部方法:序列化 + 存储 + 设置过期时间"""
try:
# 转成可序列化格式
dict_messages = messages_to_dict(messages)
# 存入 Redis
self.redis_client.set(
name=self.key,
value=json.dumps(dict_messages, ensure_ascii=False),
ex=self.ttl
)
except Exception as e:
logger.error(f"序列化/存储失败:{str(e)}", exc_info=True)
raise
def clear(self) -> None:
"""清空历史(LangChain 标准接口)"""
try:
self.redis_client.delete(self.key)
logger.info(f"会话 {self.session_id} 历史已清空")
except RedisError as e:
logger.error(f"清空历史失败:{str(e)}", exc_info=True)
raise
@property
def messages(self) -> List[BaseMessage]:
"""读取历史(LangChain 标准接口)"""
try:
# 从 Redis 读取
data = self.redis_client.get(self.key)
if not data:
return []
# 反序列化
dict_messages = json.loads(data)
return messages_from_dict(dict_messages)
except RedisError as e:
logger.error(f"读取历史失败:{str(e)}", exc_info=True)
return [] # 读失败返回空,不阻塞主流程
except json.JSONDecodeError:
logger.error(f"历史数据格式损坏:{self.key}")
self.clear()
return []
def add_user_message(self, message: str) -> None:
"""快捷添加用户消息(兼容 LangChain)"""
from langchain.schema import HumanMessage
self.add_message(HumanMessage(content=message))
def add_ai_message(self, message: str) -> None:
"""快捷添加 AI 消息(兼容 LangChain)"""
from langchain.schema import AIMessage
self.add_message(AIMessage(content=message))
RunnableWithMessageHistory
给“原本无记忆的链(base_chain)”增加多轮对话能力,且不侵入原链代码。
在调用前后,自动帮你“读历史 + 注入 + 写回”
2. 原理(执行流程)
每次请求实际做三步:
1)根据 session_id 获取历史
history = get_history(session_id)
2)把历史插入 Prompt
→ 填充 MessagesPlaceholder("history")
3)模型返回后写回(写回的是整个链的结果)
history.add_user_message(...)
history.add_ai_message(...)
3. 参数为什么必须这样写
(1)get_session_history
def get_history(session_id) -> RedisChatMessageHistory
作用:
把 session_id 映射到对应的 Redis 记录(实现多用户隔离)
(2)input_messages_key
input_messages_key="user_input"
作用:
指明“用户输入在哪”,用于写回历史
(3)history_messages_key
history_messages_key="history"
作用:
指明“历史插入到 Prompt 的哪个位置”