LangChain 长期memory三剑客实战:Redis + ChatMessageHistory 打造多用户 AI 客服

此项目演示 LangChain 在生产环境中推荐的“ChatMessageHistory + RunnableWithMessageHistory + 外部存储(Redis)”组合,构建一个多用户 AI 客服接口。所有历史记录都按照 user_id 维度缓存在 Redis 中,适合接入 Web/小程序/客服工作台等场景;模型侧默认调用通义千问 qwen-max

  • LangChain LCELChatPromptTemplate + MessagesPlaceholder + RunnableWithMessageHistory 自动注入历史。
  • RedisChatMessageHistory:复用社区实现,支持 TTL、水平扩展和数据淘汰策略。
  • FastAPI:提供 /chat 与 /health 接口,方便容器化部署或与 API 网关集成。

为什么不用旧版 Memory? 旧 Memory API 不支持异步 + 流式 + 多 Session 隔离,官方仅建议在原型阶段使用,本示例体现了推荐做法。

代码要点

  • app/main.py 中的 conversation_chain 定义了主链路:Prompt -> ChatTongyi -> StrOutputParser
  • RunnableWithMessageHistory 负责在每次调用前后自动追加消息;history_messages_key="history" 要与 MessagesPlaceholder 一致。
  • get_history(session_id) 返回 RedisChatMessageHistory,可替换成 Postgres/Mongo 等自定义实现,只要继承 BaseChatMessageHistory
  • ttl 让 Redis 自动清理长时间未活跃的对话,适合客服场景。

源码:

import logging
import os
import traceback
from datetime import datetime
from typing import Optional

from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException
from langchain_community.chat_message_histories import (
    RedisChatMessageHistory,
)
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_community.chat_models.tongyi import ChatTongyi
from pydantic import BaseModel

load_dotenv()

app = FastAPI(title="AI Customer Service Demo", version="0.1.0")
logger = logging.getLogger("ai-customer-service")

#构造 Prompt(提示模板)
#   - system 消息固定客服人设
#   - MessagesPlaceholder 是历史消息的占位符,稍后会由 RunnableWithMessageHistory 自动填充
#   - human 消息留给实时输入
SYSTEM_PROMPT = (
    "你是一个细致耐心的 AI 客服,负责回答用户关于产品政策、订购与故障排查的问题。"
    "遇到需要人工介入的情况时,先整理已知信息,再建议转人工。"
    f"当前时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}。"
)

prompt = ChatPromptTemplate.from_messages(
    [
        ("system", SYSTEM_PROMPT),
        MessagesPlaceholder(variable_name="history"),
        ("human", "{user_input}"),
    ]
)

model = ChatTongyi(
    model=os.getenv("TONGYI_MODEL", "qwen-max"),
    api_key=os.getenv("OPENAI_API_KEY"),
    temperature=0.2,
)

# Prompt + Model + OutputParser 合成最基础的对话链条
base_chain = prompt | model | StrOutputParser()

# 长期记忆存储(Redis)
#   - 通过 RedisChatMessageHistory 把每个 user_id 的消息写入 Redis 列表
#   - ttl 控制消息保留时间(默认 7 天),也可以调成按需清理
#   - REDIS_URL 可带密码、数据库 index,适合生产环境部署
redis_url = os.getenv("REDIS_URL", "redis://:123456@localhost:6379/0")
redis_ttl_seconds = int(os.getenv("REDIS_TTL_SECONDS", str(60 * 60 * 24 * 7)))

def get_history(session_id: str) -> RedisChatMessageHistory:
    if not session_id:
        raise ValueError("session_id must not be empty")
    return RedisChatMessageHistory(
        session_id=session_id,
        url=redis_url,
        ttl=redis_ttl_seconds,
    )

# RunnableWithMessageHistory = “胶水”:
#   - 每次调用前在 prompt 里注入历史;调用后将最新 AI/Human 消息写回 Redis
#   - input_messages_key 对应 prompt 中 human 部分的变量名
#   - history_messages_key 对应 MessagesPlaceholder 的变量名
conversation_chain = RunnableWithMessageHistory(
    base_chain,
    get_session_history=get_history,
    input_messages_key="user_input",
    history_messages_key="history",
)


class ChatRequest(BaseModel):
    user_id: str
    message: str
    locale: Optional[str] = None


class ChatResponse(BaseModel):
    user_id: str
    reply: str


@app.get("/health")
async def health_check():
    return {"status": "ok"}


@app.post("/chat", response_model=ChatResponse)
async def chat_endpoint(payload: ChatRequest):
    if not payload.user_id.strip():
        raise HTTPException(status_code=400, detail="user_id is required")

    try:
        reply = await conversation_chain.ainvoke(
            {"user_input": payload.message},
            config={"configurable": {"session_id": payload.user_id}},
        )
    except Exception as exc:  # pragma: no cover - demo level error surfacing
        tb = traceback.format_exc()
        logger.error("LLM 调用失败: %s\n%s", exc, tb)
        raise HTTPException(
            status_code=500,
            detail={
                "error": str(exc),
                "hint": "检查 OPENAI_API_KEY 是否为有效的 DashScope Key 或 Redis 连接是否可用。",
                "traceback": tb,
            },
        ) from exc

    return ChatResponse(user_id=payload.user_id, reply=reply)

结果:

C:\Users\29924>curl -X POST http://127.0.0.1:8000/chat -H "Content-Type: application/json" -d "{\"user_id\": \"zhangsan\", \"message\": \"我的猫叫咪咪,它是一只三花猫\"}"
{"user_id":"zhangsan","reply":"谢谢您的分享!咪咪听起来是个非常可爱的名字,三花猫的颜色组合通常也非常漂亮。如果您有关于照顾咪咪的问题,比如饮食、健康或是行为习惯等方面的疑问,我很乐意帮助提供一些建议。或者,您有其他想要了解的内容吗?"}

C:\Users\29924>curl -X POST http://127.0.0.1:8000/chat -H "Content-Type: application/json" -d "{\"user_id\": \"lisi\", \"message\": \"我计划下个月去北京旅游\"}"
{"user_id":"lisi","reply":"那听起来很令人兴奋!北京是一个充满历史和文化的城市,有很多值得游览的地方。如果您需要一些建议来规划您的旅行,比如必去的景点、美食推荐或是住宿建议等,我很乐意为您提供帮助。另外,如果您有关于交通安排、天气情况或任何其他方面的疑问,也请随时告诉我!您现在有什么具体想要了解的信息吗?"}

C:\Users\29924>curl -X POST http://127.0.0.1:8000/chat -H "Content-Type: application/json" -d "{\"user_id\": \"zhangsan\", \"message\": \"它平时喜欢吃什么?\"}"
{"user_id":"zhangsan","reply":"三花猫和其他猫咪一样,对食物的喜好可能会有所不同,但一般来说,健康的饮食应该包括高质量的猫粮和适量的湿粮。这里有一些关于猫咪饮食的一般性建议:\n\n1. 高质量猫粮:选择一种营养均衡、适合咪咪年龄(幼猫、成猫或老年猫)以及健康状况的猫粮非常重要。市面上有许多品牌提供不同类型的猫粮,您可以根据咪咪的具体需求来挑选。\n\n2. 湿粮:湿粮可以为猫咪提供额外的水分,并且很多猫咪都喜欢它的口感。湿粮也是增加猫咪饮食多样性的好方法之一。不过需要注意的是,不要让湿粮成为猫咪唯一的食物来源,因为干粮有助于清洁牙齿。\n\n3. 新鲜肉类:一些猫咪喜欢偶尔吃点新鲜煮熟的鸡肉、鱼肉等。确保这些肉类是无骨且未经过调味处理的。\n\n4. 零食:适量给予猫咪专用的小零食作为奖励是可以的,但要控制好分量,避免过量导致肥胖或其他健康问题。\n\n5. 水:保持充足的清水供应对于维持猫咪良好的健康状态至关重要。\n\n值得注意的是,每只猫咪的身体状况和个人偏好都不同,在改变其饮食习惯前最好先咨询兽医的意见。如果发现咪咪有任何不寻常的行为或健康问题,请及时联系专业人士进行检查。希望这些建议能帮助到您!"}

C:\Users\29924>curl -X POST http://127.0.0.1:8000/chat -H "Content-Type: application/json" -d "{\"user_id\": \"lisi\", \"message\": \"它平时喜欢吃什么?\"}"
{"user_id":"lisi","reply":"看起来您可能是在询问某个人或某个特定对象的饮食偏好,但您的问题中没有明确指出是“谁”或“什么”。如果您能提供更多背景信息,比如是指北京的当地人、某种动物还是其他什么,我就能更好地为您提供相关信息。请告诉我更多细节,好吗?"}

重启服务后:

C:\Users\29924>  curl -X POST http://127.0.0.1:8000/chat -H "Content-Type: application/json" -d "{\"user_id\": \"zhangsan\", \"message\": \"它叫什么来着?\"}"
{"user_id":"zhangsan","reply":"您的猫咪叫咪咪,是一只三花猫。如果您还有其他关于咪咪的问题或需要更多帮助,请随时告诉我!"}

这5次API调用清晰地展示了系统的多用户记忆与隔离机制:

  1. 首次对话建立不同上下文
    • zhangsan聊“我的猫叫咪咪”
    • lisi聊“计划去北京旅游”
  2. 后续相同问题验证隔离效果
    • zhangsan​ 问“它平时喜欢吃什么?” → AI准确理解“它”指猫“咪咪”,给出详细养猫建议
    • lisi​ 同样问“它平时喜欢吃什么?” → AI困惑,因历史中无“它”所指对象,要求更多信息
  3. 重启服务后验证历史记录持久化(记忆为长期而非短期):
    • zhangsan​ 问“它叫什么来着?” → 服务重启后的AI依然可以准确回答“它”叫“咪咪”

核心证明:相同的模糊提问,因用户不同,系统从各自独立的Redis历史中提取上下文,给出了完全不同的回答。这证实了:

  • 会话隔离:用户历史互不干扰
  • 长期记忆:能记住用户之前的对话
  • 自动上下文管理RunnableWithMessageHistory正确注入专属历史

讲解:

组件作用本质
外部存储数据持久化Redis / DB / 向量库
ChatMessageHistory数据访问层CRUD 封装
RunnableWithMessageHistory生命周期管理自动注入 + 自动存储

ChatMessageHistory

LangChain 已经提供了常用实现:

  • InMemoryChatMessageHistory(内存)
  • RedisChatMessageHistory
  • SQLChatMessageHistory

一般使用自定义/扩展的ChatMessageHistory继承BaseChatMessageHistory,需要至少实现三个方法:

  • add_message:在每次对话发生时写入新消息,并在写入过程中可附带裁剪、摘要、向量化或信息抽取等处理。
  • get_messages:在模型调用前读取并组织上下文,通常会组合“历史摘要 + 最近对话 + 检索结果”等内容返回。
  • clear:清空当前会话的所有已存储记忆,用于重置上下文或结束会话。

下面是一个典型的自定义示例:

class EnhancedRedisChatMessageHistory:

    def __init__(
        self,
        session_id: str,
        redis_client: Redis,
        ttl: Optional[int] = None,
        max_messages: int = 20,  # 最多保留最近 N 条对话(防超长)
        prefix: str = "chat:history"  # Redis key 前缀(多业务隔离)
    ):
        # 1. 基础校验(生产必备)
        if not session_id:
            raise ValueError("session_id 不能为空")
        if not isinstance(redis_client, Redis):
            raise TypeError("redis_client 必须是合法的 Redis 实例")

        # 2. 核心配置
        self.session_id = session_id
        self.redis_client = redis_client
        self.ttl = ttl
        self.max_messages = max_messages
        self.key = f"{prefix}:{session_id}"  # 最终 Redis key

    def add_message(self, message: BaseMessage) -> None:
        """添加单条消息(LangChain 标准接口)"""
        try:
            # 读取现有历史
            history = self.messages
            # 追加新消息
            history.append(message)
            # 截断:只保留最近 N 条(防止 token 爆炸)
            if self.max_messages > 0:
                history = history[-self.max_messages:]
            # 序列化存储
            self._save_history(history)
            logger.debug(f"会话 {self.session_id} 新增消息成功,当前总数:{len(history)}")

        except RedisError as e:
            logger.error(f"Redis 存储消息失败:{str(e)}", exc_info=True)
            raise  # 生产可根据需求选择是否吞异常

    def _save_history(self, messages: List[BaseMessage]) -> None:
        """内部方法:序列化 + 存储 + 设置过期时间"""
        try:
            # 转成可序列化格式
            dict_messages = messages_to_dict(messages)
            # 存入 Redis
            self.redis_client.set(
                name=self.key,
                value=json.dumps(dict_messages, ensure_ascii=False),
                ex=self.ttl
            )
        except Exception as e:
            logger.error(f"序列化/存储失败:{str(e)}", exc_info=True)
            raise

    def clear(self) -> None:
        """清空历史(LangChain 标准接口)"""
        try:
            self.redis_client.delete(self.key)
            logger.info(f"会话 {self.session_id} 历史已清空")
        except RedisError as e:
            logger.error(f"清空历史失败:{str(e)}", exc_info=True)
            raise

    @property
    def messages(self) -> List[BaseMessage]:
        """读取历史(LangChain 标准接口)"""
        try:
            # 从 Redis 读取
            data = self.redis_client.get(self.key)
            if not data:
                return []

            # 反序列化
            dict_messages = json.loads(data)
            return messages_from_dict(dict_messages)

        except RedisError as e:
            logger.error(f"读取历史失败:{str(e)}", exc_info=True)
            return []  # 读失败返回空,不阻塞主流程
        except json.JSONDecodeError:
            logger.error(f"历史数据格式损坏:{self.key}")
            self.clear()
            return []

    def add_user_message(self, message: str) -> None:
        """快捷添加用户消息(兼容 LangChain)"""
        from langchain.schema import HumanMessage
        self.add_message(HumanMessage(content=message))

    def add_ai_message(self, message: str) -> None:
        """快捷添加 AI 消息(兼容 LangChain)"""
        from langchain.schema import AIMessage
        self.add_message(AIMessage(content=message))

RunnableWithMessageHistory

给“原本无记忆的链(base_chain)”增加多轮对话能力,且不侵入原链代码。

在调用前后,自动帮你“读历史 + 注入 + 写回”

2. 原理(执行流程)
每次请求实际做三步:

1)根据 session_id 获取历史

history = get_history(session_id)

2)把历史插入 Prompt
→ 填充 MessagesPlaceholder("history")

3)模型返回后写回(写回的是整个链的结果)

history.add_user_message(...)
history.add_ai_message(...)

3. 参数为什么必须这样写

(1)get_session_history

def get_history(session_id) -> RedisChatMessageHistory

作用:
session_id 映射到对应的 Redis 记录(实现多用户隔离)

(2)input_messages_key

input_messages_key="user_input"

作用:
指明“用户输入在哪”,用于写回历史

(3)history_messages_key

history_messages_key="history"

作用:
指明“历史插入到 Prompt 的哪个位置”

暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇