你知不知道DeepSeek对话中像打字机一样的流式输出效果是怎么实现的?AI聊天项目实战经验:流式输出的前后端完整实现!(RAG,Langchain,Python)
来看这条简单清晰的逻辑链:LLMs_chain.astream(langchain中chain的API:流式输出LLM的一个个回复块「原料chat_sew.stream_chat (生成器,转换chunk为初步的SSE格式字典「初步加工产品)stream_response_generator(生成器,转换SSE协议字符串丨打包成快递)StreamingResponse(FastAPI用于流式传输响
你好呀~,很高兴你来。 我是一名大三计算机小菜,我在这里分享我的平凡经历。谢谢你的到来。
技术选型
- 前端框架:Vue
- 后端框架: FastAPI
- LLM 框架: Langchain
- 流式协议: Server-Sent Events (SSE)
- 核心 Langchain API:
Runnable.astream()
- FastAPI 响应: `StreamingResponse
- 前端 SSE 客户端:
@microsoft/fetch-event-source
核心逻辑链、数据链、思维链、
LLMs_chain.astream
(langchain中chain的API:流式输出LLM的一个个回复块|原料)
—> chat_sev.stream_chat
(生成器,转换chunk为初步的SSE格式字典|初步加工产品)
—> stream_response_generator
(生成器,转换SSE协议字符串|打包成快递)
—> StreamingResponse
(FastAPI用于流式传输响应体的响应类,以SSE协议发送数据块到客户端|快递员)
—> |前端-fetchEventSource
(第三方库方法,接受SSE数据,累加到ref|收货、组装)
—> Vue响应式更新
—-> 用户眼睛:哇sai!流式输出~
f-流式输出-后端实现
想要实现流式输出,其实很简单。
我们先前搭建了聊天链、RAG 链,并且使用 invoke 方法来传入参数,接收返回的 LLM 回复。
而现在我们只需要三步 ↓
- invoke 改用
astream
方法 --这时我们的链返回从一次性输出结果变为输出一个个chunk
- 处理分块
chunk
:–根据你定义的链的数据结构,解析出你要的数据放到content_piece
- 把一个个
content_piece
yield
出去!
你是不是感到头昏脑胀?astream
是什么?SSE
是什么?? 这个yield
又是啥???
Don’t worry! 让我们逐一击破这些概念↓
基本概念|前置知识
生成器 (Generator)?
生成器是一种特殊的 迭代器
(Iterator
)。
迭代器 是你可以逐个访问其元素的对象(比如在 for
循环中使用)。列表、元组、字典、字符串等都是可迭代对象,但它们不是迭代器本身。你可以通过调用 iter()
函数从可迭代对象获取迭代器。迭代器有一个 next() 方法,每次调用它会返回下一个元素,如果没有更多元素了,会引发 StopIteration
异常。
生成器 是一种创建迭代器的简单而强大的方法。它看起来像一个普通的函数,但关键区别在于它使用 yield 关键字来返回值,而不是 return
。
yield
是啥?
yield
是一个 Python 关键字,它有两个主要作用:
- 定义生成器函数: 任何包含
yield
语句的函数都会自动成为一个生成器函数。调用这个函数不会立即执行函数体,而是返回一个生成器对象(也就是一个迭代器)。 - 返回值并暂停: 当生成器函数的执行遇到
yield
语句时:yield
后面的表达式的值会被返回给调用者(即正在迭代该生成器的代码)。- 函数的执行会在此处暂停,并保存当前的所有状态(包括局部变量)。
- 当调用者请求下一个值时,函数会从暂停的地方恢复执行,直到遇到下一个
yield
或函数结束。
- 对比学习:
yield
vsreturn
:
-
return
会彻底终止函数的执行,并返回一个值(或None
)。函数的状态不会被保存。 -
yield
只会暂停函数的执行,并返回一个值。函数的状态会被保存,以便下次可以恢复。一个生成器函数可以有多个yield
语句。
yield
和生成器
,它们俩两个的关系?
- 关系:
yield
是用来创建生成器的语法核心。一个函数因为包含了yield
而成为生成器函数,调用它则得到生成器(迭代器)。
- 打个比方~:
-
生成器函数 (含
yield
的函数): 扮演“蓝图”或“工厂”的角色,用于定义如何按需生成一系列值。 -
yield
关键字: 扮演“暂停点”和“返回值发射器”的角色。它控制着值的生成和函数执行的暂停/恢复。 -
生成器对象 (调用生成器函数的结果): 扮演“迭代器”的角色。它实现了迭代协议(
__iter__()
和__next__()
),允许你通过循环或其他方式逐个获取由yield
产生的值。
-
StreamingResponse
- yield: 在 FastAPI 的 StreamingResponse 中使用生成器时,每次
yield
一个值,这个值就会被发送到客户端。这使得我们可以逐步发送数据,实现流式传输。
使用方法示例:
yield {"type": "chunk", "data": content_piece}
这将推送一个字典,包含了我们的content_piece
SSE (Server-Sent Events)
- SSE 是一种简单、标准的单向通信协议,非常适合将服务器更新推送到客户端。
- 对比认知:HTTP 也是一种通信协议
invoke 改用astream
方法
Langchain
的 astream()
方法允许我们以异步迭代的方式获取 Runnable
链(包括 LLM 调用)的输出块。astream()
返回一个异步迭代器 (AsyncIterator
)。
该方法会以块的形式流式传输最终输出
from langchain.chat_models import ChatAnthropic
model = ChatAnthropic()
chunks = []
async for chunk in model.astream("你好。告诉我一些关于你自己的事情"):
chunks.append(chunk)
print(chunk.content, end="|", flush=True)
print:
你好|!| 我| 的名字| 是| 克劳德|。| 我| 是|一个|由|人类|创建|的|AI|助手|,|旨在|有所帮助|、|无害|和|诚实|。||
了解了这些前置知识,你是否感觉更清晰明了了呢?
Now, Let’s make it !
实践
处理流式数据分块chunk
:
- 在
stream_chat
中,我们异步迭代astream()
返回的chunk
。- 关键处理: 需要判断
chunk
的类型。因为我们的基础链可能是普通的 LLM 调用(输出AIMessageChunk
等BaseMessage
类型)或 RAG 检索链(输出包含answer
和context
的字典Dict
),需要从中正确提取有效文本内容。 - 代码示例片段:
async for chunk in stream_iterator: content_piece = "" if isinstance(chunk, BaseMessage): if hasattr(chunk, 'content'): content_piece = chunk.content elif isinstance(chunk, dict): if 'answer' in chunk: answer_part = chunk['answer'] if isinstance(answer_part, str): content_piece = answer_part elif isinstance(answer_part, BaseMessage) and hasattr(answer_part, 'content'): content_piece = answer_part.content # ... (处理其他类型或记录日志) if content_piece: yield {"type": "chunk", "data": content_piece}
- 关键处理: 需要判断
结构化输出:
- 为了方便前端处理,我们将流式输出构造成结构化的字典。
- 在流开始时,首先
yield
一个包含上下文信息(如知识库名称)的字典:{"type": "context", "data": context_display_name}
。 - 对于每个文本块,
yield
一个:{"type": "chunk", "data": content_piece}
【核心数据–LLM输出数据】 - 如果处理过程中发生异常,
yield
一个错误信息:{"type": "error", "data": error_message}
。
- 在流开始时,首先
把一个个content_piece
yield
出去!
在路由中(src\router\chatRouter.py)
- 使用
StreamingResponse
:- 把路由的响应类型设置为 FastAPI 的
StreamingResponse
。 StreamingResponse
接收一个异步生成器函数作为其content
参数。
- 把路由的响应类型设置为 FastAPI 的
@ChatRouter.post(
"/stream",
summary="AI Chat (Streaming)",
description="与 AI 进行流式对话,可选使用知识库。",
)
async def chat_stream_endpoint(
request: ChatRequest, chat_sev: ChatSev = Depends(get_chat_service)
):
"""处理流式聊天请求。"""
if request.knowledge_config:
logging.info(f", kb_id={request.knowledge_config.knowledge_base_id}")
return StreamingResponse(
stream_response_generator(chat_sev, request), media_type="text/event-stream"
)
- 异步生成器 (
stream_response_generator
):- 创建一个
async def stream_response_generator(...)
函数。 - 此函数调用
ChatSev
实例的stream_chat
方法。 - 它迭代
stream_chat
返回的结构化字典。 - 将每个字典转换为 SSE 格式的字符串 (
data: json.dumps(chunk_dict)\n\n
) 并yield
出去。——在 FastAPI 的 StreamingResponse 中使用生成器时,每次 yield 一个值,这个值就会被发送到客户端。而在 - 设置
StreamingResponse
的media_type
为text/event-stream
。
- 创建一个
async def stream_response_generator(chat_sev: ChatSev, request_data: ChatRequest):
"""异步生成器,用于 StreamingResponse,产生 SSE 格式的事件。"""
logging.info(f"开始为 session_id={request_data.session_id} 生成流式响应")
try:
async for chunk_dict in chat_sev.stream_chat(
question=request_data.question,
api_key=request_data.llm_config.api_key,
supplier=request_data.llm_config.supplier,
model=request_data.llm_config.model,
session_id=request_data.session_id,
knowledge_base_id=request_data.knowledge_config.knowledge_base_id
if request_data.knowledge_config
else None,
filter_by_file_md5=request_data.knowledge_config.filter_by_file_md5
if request_data.knowledge_config
else None,
search_k=request_data.knowledge_config.search_k
if request_data.knowledge_config
else 3,
max_length=None,
temperature=request_data.llm_config.temperature,
):
event_type = chunk_dict.get("type", "message")
yield f"data: {json.dumps(chunk_dict)}\n\n"
logging.debug(
f"Sent chunk: {chunk_dict['type']} for session {request_data.session_id}"
)
except Exception as e:
logging.error(
f"在 stream_response_generator 中发生错误 (session: {request_data.session_id}): {e}",
exc_info=True,
)
error_payload = json.dumps(
{"type": "error", "data": f"流处理中发生严重错误: {e}"}
)
yield f"data: {error_payload}\n\n"
finally:
logging.info(f"结束为 session_id={request_data.session_id} 的流式响应")
这样我们后端的流式输出接口就完成了【详细源码见附录->src/service/ChatSev.py -def stream_chat、src/service/ChatSev.py、src/router/ChatRouter.py】
接着就是前端来接受处理 SSE 了
f-流式输出-前端实现
- 后端接口
POST /chat/stream
返回 text/event-stream
类型的数据。 - 事件流包含不同类型的 JSON 数据块,通过
type
字段区分 (context
,chunk
,error
)。 - 由于 Axios 不直接支持 SSE,且我们需要发送 POST 请求体,这里我们选择
@microsoft/fetch-event-source
库来处理前端的 SSE 连接。
关键词
fetchEventSource
()——发送和处理 SSE 请求—— 参数 Accept: ‘text/event-stream’【核心】AbortController
——通过signal
与fetchEventSource
挂钩——用于可手动终止 SSE 请求实现用户手动终止 AI 回复【额外功能】fetchEventSource
()中的事件处理回调函数:
- 处理连接 (onopen): 确认连接成功建立且响应类型正确。
- 处理消息(核心) (onmessage): 循环接收事件,解析 JSON,根据 type 更新 UI(累加 chunk 或显示通知/错误)。——
accumulatedContent += parsedData.data// 将新块追加到累积内容
- 处理关闭 (onclose): 连接正常结束时重置状态。
- 处理错误 (onerror): 区分用户取消 (AbortError) 和其他错误,进行相应处理并决定是否阻止重连。
安装库 @microsoft/fetch-event-source
pnpm add @microsoft/fetch-event-source
修改 src/views/chatPage.vue 文件来实现流式响应的处理
- 主要改动包括:
- 引入 fetchEventSource。
- 创建一个新的 ref
isStreaming
来跟踪流式响应的状态。 - 创建一个 ref
abortController
来控制请求的取消。 - 重构 sendMessage 函数,使用
fetchEventSource
调用API,并添加处理 SSE 事件 (onopen, onmessage, onclose, onerror) 的逻辑。 - 在
onmessage
中,根据事件类型 (context, chunk, error) 更新消息列表或显示错误。对于chunk
,我们会累加到最后一条 AI 消息的内容中。 - 添加错误处理和状态更新逻辑。
引入库和状态管理
import { fetchEventSource } from '@microsoft/fetch-event-source'
import { ref } from 'vue'
// ... 其他 import
const isStreaming = ref(false) // 跟踪流式响应状态
const abortController = ref(null) // 控制请求取消
const messages = ref([]) // 存储聊天消息
// ... 其他 refs 和 store
- 重构 sendMessage
函数:
- 请求前准备: 检查必要条件(模型、会话),取消上一个请求(如果存在),创建新的
AbortController
,准备用户消息和请求体messagePayload
。 - 状态更新: 清空输入框,设置
isStreaming.value = true
,添加 AI 消息占位符到messages
列表。 - 调用
fetchEventSource
: 这是核心逻辑,配置method
,headers
,body
,signal
。
核心代码
// 8.2 onmessage: 每次收到服务器发送的事件 (data: ...\n\n) 时调用
onmessage(event) {
console.log('Received SSE data:', event.data)
try {
const parsedData = JSON.parse(event.data) // 解析收到的 JSON 字符串
// 找到对应的 AI 消息占位符
const aiMessageIndex = messages.value.findIndex((msg) => msg.id === aiMessageId)
if (aiMessageIndex === -1) {
// 如果找不到,可能消息已被删除或出现异常
console.warn('AI message placeholder not found.')
return
}
// 根据事件类型处理
if (parsedData.type === 'context') {
// 处理上下文信息,这里使用了 ElNotification 弹出通知
console.log('Context received:', parsedData.data)
ElNotification({ title: 'Context', message: parsedData.data, type: 'info' })
} else if (parsedData.type === 'chunk') {
// 核心:处理文本块
accumulatedContent += parsedData.data // 将新块追加到累积内容
// 更新 Vue ref 中对应消息的内容,触发界面响应式更新
messages.value[aiMessageIndex].content = accumulatedContent
scrollToBottom() // 收到新内容,自动滚动到底部
完整sendMessage
函数:
const sendMessage = async () => {
// 1. 前置检查 (Guard Clauses)
if (!OneapiStore.selectedModel) {
ElMessage.info('请先选择一个模型')
return
}
if (!currentSession.value) {
ElMessage.info('请先选择一个话题')
return
}
if (!inputMessage.value.trim() || isStreaming.value) return // 防止重复发送或在流式传输时发送
// 2. 取消之前的流式请求 (如果存在)
if (abortController.value) {
abortController.value.abort()
console.log('Previous stream aborted.')
}
// 3. 初始化本次请求
abortController.value = new AbortController() // 创建新的 AbortController
const userMessageContent = inputMessage.value.trim()
const userMessage = {
// 准备用户消息对象
id: Date.now(),
type: 'human',
content: userMessageContent,
}
messages.value.push(userMessage) // 将用户消息添加到聊天记录
// 4. 构建请求体 (Payload)
const messagePayload = {
question: userMessageContent,
session_id: currentSession.value?._id,
chat_config: chat_config.value,
llm_config: llm_config.value,
...(knowledge_config.value ? { knowledge_config: knowledge_config.value } : {}), // 动态添加知识库配置
}
// 5. 更新 UI 状态 (开始流式传输)
inputMessage.value = '' // 清空输入框
isStreaming.value = true // 设置为流式状态 (会禁用输入框,改变发送按钮)
// 6. 添加 AI 消息占位符
const aiMessageId = Date.now() + 1 // 为 AI 回复生成唯一 ID
messages.value.push({
id: aiMessageId,
type: 'ai',
content: '', // 初始内容为空,等待后续 chunk 更新
})
let accumulatedContent = '' // 用于累积收到的文本块
// 7. 调用 fetchEventSource 发起 SSE 请求
try {
console.log('发送流式消息 payload:', messagePayload)
await fetchEventSource(baseURL + '/chat/stream', {
// baseURL 来自 @/utils/request
method: 'POST', // 使用 POST 方法
headers: {
'Content-Type': 'application/json', // 告知后端发送的是 JSON
Accept: 'text/event-stream', // 表明希望接收 SSE 流
},
body: JSON.stringify(messagePayload), // 将 JS 对象序列化为 JSON 字符串
signal: abortController.value.signal, // 关联 AbortController,用于取消请求
// 8. 事件处理回调函数
// 8.1 onopen: 连接成功建立时调用
onopen(response) {
if (
response.ok && // 确保 HTTP 状态码是 2xx
response.headers.get('content-type')?.includes('text/event-stream') // 确认响应类型正确
) {
console.log('SSE connection opened.')
// 连接成功,AI 消息占位符已添加,等待 onmessage
} else {
// 如果连接不成功或响应类型不对,则认为失败
isStreaming.value = false // 重置流式状态
throw new Error(`Failed to connect: ${response.status} ${response.statusText}`) // 抛出错误,会被外层 catch 或 onerror 捕获
}
},
// 8.2 onmessage: 每次收到服务器发送的事件 (data: ...\n\n) 时调用
onmessage(event) {
console.log('Received SSE data:', event.data)
try {
const parsedData = JSON.parse(event.data) // 解析收到的 JSON 字符串
// 找到对应的 AI 消息占位符
const aiMessageIndex = messages.value.findIndex((msg) => msg.id === aiMessageId)
if (aiMessageIndex === -1) {
// 如果找不到,可能消息已被删除或出现异常
console.warn('AI message placeholder not found.')
return
}
// 根据事件类型处理
if (parsedData.type === 'context') {
// 处理上下文信息,这里使用了 ElNotification 弹出通知
console.log('Context received:', parsedData.data)
ElNotification({ title: 'Context', message: parsedData.data, type: 'info' })
} else if (parsedData.type === 'chunk') {
// 核心:处理文本块
accumulatedContent += parsedData.data // 将新块追加到累积内容
// 更新 Vue ref 中对应消息的内容,触发界面响应式更新
messages.value[aiMessageIndex].content = accumulatedContent
scrollToBottom() // 收到新内容,自动滚动到底部
} else if (parsedData.type === 'error') {
// 处理流内由后端报告的错误
console.error('Stream error reported:', parsedData.data)
messages.value[aiMessageIndex].content += `\n\n**错误:** ${parsedData.data}` // 将错误信息附加到消息末尾
ElMessage.error(`流式响应出错: ${parsedData.data}`)
// 发生错误,尝试中止连接
if (abortController.value) {
abortController.value.abort()
}
}
} catch (e) {
// 处理 JSON 解析失败的情况
console.error('Failed to parse SSE data:', e, 'Raw data:', event.data)
const aiMessageIndex = messages.value.findIndex((msg) => msg.id === aiMessageId)
if (aiMessageIndex !== -1 && event.data && typeof event.data === 'string') {
// 尝试将原始错误数据附加到消息中
messages.value[aiMessageIndex].content +=
`\n\n**解析错误,原始数据:** ${event.data}`
}
ElMessage.error('接收到无效的数据格式')
// 解析错误,中止连接
if (abortController.value) {
abortController.value.abort()
}
}
},
// 8.3 onclose: 连接正常关闭时调用 (服务器关闭或客户端调用 abort())
onclose() {
console.log('SSE connection closed.')
isStreaming.value = false // 重置流式状态
abortController.value = null // 重置 AbortController 引用
scrollToBottom() // 确保最后滚动到底部
},
// 8.4 onerror: 发生错误时调用 (网络错误、onopen/onmessage 中抛出的错误、AbortError)
onerror(err) {
console.error('SSE error:', err)
isStreaming.value = false // 只要出错,就重置流式状态 (除了 AbortError)
// 找到 AI 消息
const aiMessageIndex = messages.value.findIndex((msg) => msg.id === aiMessageId)
// 特殊处理 AbortError (用户手动取消)
if (err.name === 'AbortError') {
console.log('Stream aborted by user.')
// 如果 AI 消息还是空的,就把它从列表里移除
if (aiMessageIndex !== -1 && !messages.value[aiMessageIndex].content) {
messages.value.splice(aiMessageIndex, 1)
}
// **重要:** 对于 AbortError,我们直接 return,不执行后续错误处理,
// 也不抛出错误,以防止库尝试重连。状态重置由 handleStopStreaming 或 onclose 处理。
return
}
// 处理其他类型的错误 (网络、连接等)
ElMessage.error(`连接错误: ${err.message || '未知错误'}`)
if (aiMessageIndex !== -1) {
// 在 AI 消息末尾附加错误信息
messages.value[aiMessageIndex].content +=
`\n\n**连接错误:** ${err.message || '未知错误'}`
} else {
// 如果连 AI 占位符都没有(可能 onopen 就失败了),则添加一条错误消息
messages.value.push({
id: Date.now(),
type: 'ai',
content: `**连接错误:** ${err.message || '未知错误'}`,
})
}
abortController.value = null // 重置 AbortController 引用
// **重要:** 对于非 AbortError,必须抛出错误 (throw err) 或不返回。
// 这是 fetchEventSource 库的设计,抛出错误会阻止它默认的重连尝试。
throw err
},
})
} catch (err) {
// 9. 捕获 fetchEventSource 启动时的错误
// 这个 catch 主要捕获 fetchEventSource 启动时就发生的错误,
// 例如 DNS 解析失败、网络连接无法建立等,这些错误发生在 onopen 之前。
// onerror 中 throw 的错误也会在这里被捕获,但我们主要处理非 AbortError。
console.error('Error initiating SSE request:', err)
isStreaming.value = false // 确保重置状态
if (err.name !== 'AbortError') {
// AbortError 已在 onerror 处理
ElMessage.error(`请求失败: ${err.message || '未知错误'}`)
// 标记用户消息发送失败
const userMessageIndex = messages.value.findIndex(
(msg) => msg.id === userMessage.id,
)
if (userMessageIndex !== -1) {
messages.value[userMessageIndex].content += ' (发送失败)'
}
// 移除可能已添加的 AI 占位符
const aiMessageIndex = messages.value.findIndex((msg) => msg.id === aiMessageId)
if (aiMessageIndex !== -1) {
messages.value.splice(aiMessageIndex, 1)
}
}
abortController.value = null // 重置 AbortController 引用
}
}
- 效果展示:
到这里教学就结束啦!
核心源码和本文pdf下载
我把更多核心源码、仓库地址放在这里了,欢迎大家0积分下载:【免费】langchain项目如何实现流式输出经验分享前端流式输出.pdf
参考文献:
关于作者
- CSDN 大三小白新手菜鸟咸鱼本科生长期更新强烈建议不要关注!
作者的其他文章
RAG调优|AI聊天|知识库问答
- 你是一名平平无奇的大三生,你投递了简历和上线的项目链接,结果HR真打开链接看!结果还报错登不进去QAQ!【RAG知识库问答系统】新增模型混用提示和报错排查【用户反馈与优化-2025.04.28-CSDN博客
- 你知不知道像打字机一样的流式输出效果是怎么实现的?AI聊天项目实战经验:流式输出的前后端完整实现!图文解说与源码地址(LangcahinAI,RAG,fastapi,Vue,python,SSE)-CSDN博客
- 【豆包写的标题…】《震惊!重排序为啥是 RAG 调优杀手锏?大学生实战项目,0 基础也能白嫖学起来》(Langchain-CSDN博客
- 【Langchain】RAG 优化:提高语义完整性、向量相关性、召回率–从字符分割到语义分块 (SemanticChunker)-CSDN博客
- 【RAG】向量?知识库的底层原理:向量数据库の技术鉴赏 | HNSW(导航小世界)、LSH、K-means-CSDN博客
Agent
docker
Python
前端
nginx
好用插件

GitCode 天启AI是一款由 GitCode 团队打造的智能助手,基于先进的LLM(大语言模型)与多智能体 Agent 技术构建,致力于为用户提供高效、智能、多模态的创作与开发支持。它不仅支持自然语言对话,还具备处理文件、生成 PPT、撰写分析报告、开发 Web 应用等多项能力,真正做到“一句话,让 Al帮你完成复杂任务”。
更多推荐
所有评论(0)