Skip to Content
Hi-Agent v1.0 · 全新上线 · 一门关于 Agent 工程的系统课程
课程01 · Chat1.2 说透流式输出

1.2 说透流式输出

在第一小节实现的 Chat 能力里,你应该已经能清楚看到 LLM 是怎么回复我们的了:你发出一句话,程序把这句话交给模型;通常等待几秒钟之后,模型会“唰”地一下,一次性吐出一大段文字。然后你再发一段文字,再等待几秒钟,模型继续一次性返回一段内容。如此循环,直到你输入 “exit” 指令,程序结束。

这种体验在刚开始写 Demo 的时候,问题并不明显。毕竟它确实能跑,也确实能回复,看起来已经完成了“聊天”这件事。但是扪心自问:你显然并不习惯这样的输出。如今市面上大部分 AI 软件、聊天应用,哪怕是网页版的 ChatGPT,采用的都是打字机式的输出——一个字一个字、一个 token 一个 token 地往外蹦,像是有人在你面前实时地组织语言、敲出回复。

那么,一次性输出和打字机式输出,差别真有那么大吗?从功能上讲,两者最终给出的完整内容完全一样;但从体验和工程实现的角度看,它们几乎是两个物种。

先说说我们刚刚写的那种“一次性返回”模式。它的好处是简单——发起一次请求,等几秒钟,拿到完整结果,渲染到屏幕上,完事。但它有几个致命的短板:

  • 第一,用户在这几秒钟里面对一个空白或转圈的界面,心里没底,不知道模型是在思考、卡住了还是已经忘了自己;
  • 第二,如果生成的内容很长(比如几百上千字),用户要等很久才能看到开头,这种“首字延迟”会明显削弱交互的流畅感;
  • 第三,一旦网络波动或服务端超时,整个请求就可能失败,连半成品都拿不到。

而打字机式输出,也就是流式输出(Streaming),恰好能解决这些问题。它允许模型一边生成一边把已经得到的文字推送给前端,用户几乎不需要等待就能看到第一个字,并且能实时观察模型的“思考过程”。那种感觉就像和真人聊天——对方还没说完一整句话,你已经能从前半句里捕捉到意图了。

不过,流式输出可不像看上去那么简单。从 HTTP 协议怎么保持长连接,到后端如何逐块推送数据,再到前端怎样把零散的片段拼接成通顺的句子——每一个环节都有不少坑要填。很多初学者在 Demo 里写了一次性的非流式调用,就觉得“LLM 集成不过如此”,结果一上生产、一要做真正的人机对话体验,就被流式输出折腾得焦头烂额。

这一篇,我们就来把流式输出彻底说透。我们会从最常见的 SSE(Server-Sent Events)讲起,一步步带你在后端实现真正的流式响应,同时在前端优雅地渲染出打字机效果。你还会学到如何处理流式传输中的异常、如何做到“边生成边思考”的高级技巧,以及——当一些老旧的 SDK 不支持流式时,你可以怎样“徒手”完成协议层的数据解析。

WebSocket VS SSE

在开始讲SSE之前,我们需要先讲清楚,为什么SSE在AI Agent时代,成为了Agent开发,前后端交互的一个必选协议。

你可能会想:要实现“打字机效果”,让模型逐字推送,难道不能用 WebSocket 吗?WebSocket 是全双工的长连接,理论上比 SSE 更强大——既能从服务端收数据,也能随时给服务端发数据,岂不更灵活?没错,很多实时应用(聊天室、游戏、协同编辑)确实首选 WebSocket。但在 AI Agent 的流式对话场景里,SSE 反而成了更简洁、更务实的选择。这背后有几个关键原因。

Agent Loop 流程图

1. 通信方向决定复杂度

AI Agent 的典型交互模式是:客户端发起一次请求,服务端以流式形式返回一次响应。这是一个典型的“半双工”场景——不需要在收到模型回复的过程中,再向服务端额外发送数据。换句话说,每次对话轮次都是“请求-流式响应”的原子操作。

WebSocket 虽然是全双工,但这份“全能”是有代价的:需要自己设计协议(比如定义消息类型、分帧、重连逻辑),需要处理连接状态管理(心跳、断线重连、消息确认),服务端还得维护每个 WebSocket 会话。而 SSE 天然就是为“客户端发起一次 HTTP 请求,服务端多次推送数据”设计的——它基于标准 HTTP,使用简单的 text/event-stream Content-Type,客户端通过 EventSource API 就能直接消费。代码量和心智负担都小一个数量级。

2. 拥抱 HTTP 生态,无须额外端口

SSE 走的是标准 HTTP/HTTPS 端口,可以无缝利用现有的负载均衡、反向代理(Nginx、Traefik)、认证拦截器、日志中间件。 这些基础设施对 HTTP 早已优化得炉火纯青。而 WebSocket 需要单独处理升级请求,某些云环境或企业防火墙可能对 WebSocket 的长连接有限制,甚至干脆阻断。 SSE 则没有这些烦恼,只要 HTTP 能通,SSE 就能通。

3. 自动重连机制

SSE EventSource 客户端内置了断线重连。如果网络抖动导致连接断开,浏览器会自动重新发起连接,并且可以通过 Last-Event-ID 让服务端补发丢失的消息。 这对于生成式 AI 尤其重要——模型可能已经生成了几百个 token,网络闪断几秒就全部丢失,体验会非常糟糕。 WebSocket 你需要自己实现心跳和重连,还要自己记录消息序号,容易出错。SSE 把这点内置好了,开箱即用。

4. 带宽与性能:单向流的天然优势

对于纯文本的 token 流,SSE 的头部开销远小于 WebSocket。WebSocket 每个消息帧至少 2-14 字节的额外开销,而且为保证可靠性往往需要额外的控制帧。 SSE 每个数据块就是 data: …\n\n 这样极简的格式,甚至可以按行分块。当模型以极高的频率吐出 token(比如每秒几十个小 token),SSE 的性能表现更稳定。

传统 Web 应用里,WebSocket 常用于双向实时交互(比如股票行情,服务端主动推价格;或者游戏,客户端和服务端互相发指令)。 但在 Agent 场景下,每个“用户提示 → 模型回复”是一个明确的事务,且回复过程可以纯粹是一个单向数据流。Agent 偶尔需要“中途调用工具”或“要求客户端补充信息”, 这些确实需要双向通信——但现代的设计往往把这类交互拆成多个独立的 HTTP 请求(比如函数调用单独发一次请求),而不是嵌在同一个长连接里。 保持简单:SSE 搞定流式文本,其他交互走普通 HTTP。这种关注点分离让系统更清晰。

拆解 SSE 协议格式

在动手写代码之前,我们先把 SSE 协议本身拆开来看个清楚。只有理解了线路上真正传输的是什么,你才能在遇到奇怪的 bug(比如数据被截断、前端收不到消息)时,快速定位问题。

Agent Loop 流程图

SSE(Server-Sent Events) 的规范定义在 HTML Living Standard 中,它极其简单——简单到整个协议可以用几句话描述清楚:

  • 服务端返回的 Content-Type 必须是 text/event-stream。
  • 客户端(浏览器 EventSource 或自定义 HTTP 客户端)会保持连接打开。
  • 服务端以 UTF-8 编码的纯文本块持续发送消息,每个消息由若干字段组成,字段之间用换行分隔,消息之间用两个换行符 \n\n 隔开。

一条 SSE 消息长这样:

SSE 消息示例
field: value\n field: value\n \n

每个字段的格式是 字段名: 值,末尾跟一个换行 \n(或 \r\n,但规范推荐 \n)。消息结束时再额外加一个换行。

具体来说,协议定义了四个标准字段名:

字段名含义是否必须
data消息的实际数据载荷推荐(每条消息至少一个)
event自定义事件类型,前端可监听特定类型可选
id消息 ID,用于断线重连时传递 Last-Event-ID可选
retry重连时间(毫秒),客户端断线后等待多久重新连接可选

1. 单行消息

只发送一行数据:

单行消息示例
data: hello world\n \n

注意末尾必须有两个 \n:第一个 \n 结束字段行,第二个 \n 表示消息结束。

客户端收到这条消息时,会触发 message 事件,event.data 的值就是 “hello world”。

2. 多行数据:如何发送带换行的文本

如果模型的回复里本身就包含换行符(比如生成一段代码或列表),你需要将每一行单独用 data: 前缀,并且这些行会由客户端自动拼接。

例如服务端发送:

多行消息示例
data: 第一行内容\n data: 第二行内容\n data: 第三行内容\n \n

客户端最终收到的 event.data 会是 “第一行内容\n第二行内容\n第三行内容”(换行符会保留)。这种设计使得我们不需要对 payload 做额外的转义,直接按行拆分即可。

3. 自定义事件类型

默认所有消息都触发 message 事件。如果你希望前端区分不同种类的数据(例如模型生成的主文本 vs 工具调用指令 vs 状态更新),可以用 event 字段:

自定义事件消息
event: token\n data: 你好\n \n event: tool_call\n data: {"name": "search", "args": {...}}\n \n event: done\n data: \n \n

前端可以分别监听 token、tool_call、done 事件,做不同的 UI 处理。

4. 消息 ID 与自动重连

SSE 最实用的特性之一就是内置重连。当连接意外断开,浏览器会自动重新发起请求,并在请求头中携带 Last-Event-ID(值等于最后收到的那条消息的 id)。

服务端可以这样给消息编号:

消息ID
id: 42\n data: 某条重要消息\n \n

客户端断线后重连时,会在请求头加上 Last-Event-ID: 42。服务端据此可以从第 43 条消息开始继续推送,而不是从头开始。这对于长生成任务非常关键——你不想让用户因为一次网络抖动就丢失已经生成的几百个 token。

5. 重连时间 retry

retry 字段告诉客户端如果连接断开了,应该等待多少毫秒后再尝试重连。

retry
retry: 3000\n \n

这个字段可以随时发送,客户端收到后会更新自己的重连计时器。通常服务端可以在连接建立之初先发送一次默认重连间隔,比如 3000ms。

6. 注释行(心跳保活)

有些浏览器或代理有超时机制,如果长时间没有数据传输,可能会主动断开连接。为了解决这个问题,服务端可以定期发送注释行(以冒号开头,后面可以跟任意文本)。

心跳保活
: heartbeat\n\n

注释行不会触发任何客户端事件,也不会被当作数据存储,纯粹是为了保持连接活跃。注意它也必须以 \n\n 结尾。

  • 冒号后面必须跟一个空格吗?规范说“可以有空格”,但实践上建议跟一个空格,避免解析歧义。
  • 字段名大小写:规范要求字段名比较是大小写不敏感的,但实践中最好全小写。
  • 换行符混用:虽然规范推荐 \n,但大多数实现同时支持 \r\n。为了兼容性,建议统一用 \n。
  • 不能发送二进制数据:SSE 是纯文本协议。要传二进制,需 base64 编码,或改用 WebSocket。
  • 最大消息长度:没有硬性限制,但前端 EventSource 的实现通常会把整个消息加载到内存中。如果模型一次生成几 MB 的文本,建议在应用层做分块(比如每 1000 个 token 发一条 data 消息)。

有工具介入的SSE过程

我们前面讲的是简单的SSE协议体和流程,当然,单纯的用来Chat就已经足够了,因为在单纯的聊天中,我们只需要自定义各种事件就足矣覆盖大部分场景,但是,我们现在要做的是Agent。

这是整个流式架构里最考验工程功底的部分,也是 SSE + Function Calling 结合时最容易被忽视的坑。 当模型决定调用一个工具的时候(比如 read_file、search、execute_command),它并不是直接吐出一个完整的 JSON 对象。原因很简单:模型是自回归生成的,每次只输出一个 token。你收到的是按 token 切分的碎片——一个 token 可能只是半个单词,也可能是一个花括号的一部分。 拿一个典型的 tool_use 块来说,它最终的完整形态是:

tool_use
{ "type": "tool_use", "name": "read_file", "input": { "file_path": "src/utils.ts" } }

但在流式输出中,模型会按顺序吐出成百上千个 token。通过 SSE 传递过来时,你会看到类似这样的事件序列(以 Anthropic Claude 的 API 为例,其他模型类似):

sse_tool
event: content_block_start data: {"type": "tool_use", "index": 0, "name": "read_file", "input": {}} event: content_block_delta data: {"type": "input_json_delta", "index": 0, "partial_json": "{\"file_"} event: content_block_delta data: {"type": "input_json_delta", "index": 0, "partial_json": "path\": \""} event: content_block_delta data: {"type": "input_json_delta", "index": 0, "partial_json": "src/uti"} event: content_block_delta data: {"type": "input_json_delta", "index": 0, "partial_json": "ls.ts\"}"} event: content_block_stop data: {"index": 0}

看到问题了吗?

  • content_block_start 里的 input 字段是一个空对象 ——这只是个占位符,告诉你“参数从这里开始”。
  • 真正的参数内容是通过 content_block_delta 事件里的一小片一小片 partial_json 推过来的。
  • 每一片 partial_json 都是不完整的 JSON 片段——{\"file_path\ 单独拎出来啥也不是。
  • 直到 content_block_stop 信号到来之前,你手里的永远是一个不完整的、无法解析的半成品。

1. 为什么不能“攒到 stop 再解析”?

最直观的思路就是:等收到 content_block_stop 之后,把所有 partial_json 按顺序拼成一个大字符串,再调用 JSON.parse()。这种方法绝对正确,也最容易实现。

但是——在 AI Agent 的场景下,这个“但是”很致命。因为模型的思考很慢,通常每秒只吐出 20~50 个 token。如果一个工具的参数有几十个 token(比如文件路径很长、查询语句很复杂),你可能要等 1~2 秒才能拿到完整 JSON。而 Agent 的核心价值之一就是快速响应,用户发出指令后,希望看到工具被尽早调用(尤其是那些可以预取或展示中间结果的工具)。

考虑两个极端场景:

  • 搜索工具:参数是一个查询字符串 “latest news about AI”。其实模型只输出 “latest news” 时,搜索引擎已经可以根据前缀做模糊匹配了。如果等完整参数再执行,白白浪费了时间。
  • 代码执行工具:参数是 { "code": "import os; os.system('rm -rf /')" }——这种绝对不能提前执行,必须等到完整参数验证无误后才能运行。

所以,工程上我们面临一个本质的权衡:延迟 vs 正确性/安全性。这就引出了几种不同的架构模式。

1.1 模式一:攒完整再执行(Safe but Slow)

最简单,也最稳妥:

Safe but Slow
let buffer = ""; let toolName = ""; let toolIndex = -1; onContentBlockDelta((partial_json) => { buffer += partial_json; }); onContentBlockStop(() => { const input = JSON.parse(buffer); // 现在 buffer 已经是完整 JSON executeTool(toolName, input); });
  • 优点:无脑,不会出错。
  • 缺点:每次工具调用都要等完整个 JSON 生成,增加了首工具延迟(First Tool Latency)。

1.2 模式二:流式增量解析(Fast but Tricky)

如果工具的调用参数是安全的(比如只读操作、搜索查询),我们可以采用增量 JSON 解析——收到一个片段就尝试解析一次,一旦字段值构建完整就立刻执行。

一些 JSON 解析库支持增量/流式模式,例如:

  • 我自己手写一个基于状态的解析器,跟踪 JSON 的 key 是否已经完整。
  • 或者使用 jsonpath + 部分解析(比如读到 file_path 的值完整后就直接取用)。

更常见的做法是利用 JSON 的括号计数 + 关键字段提前识别。比如:

流式增量解析
let buffer = ""; let filePathComplete = false; onContentBlockDelta((partial) => { buffer += partial; // 土办法:当看到 "file_path": " 后面的双引号闭合时,就认为值完整了 const match = buffer.match(/"file_path":\s*"([^"]*)"/); if (match && !filePathComplete) { const filePath = match[1]; preloadFile(filePath); // 提前发起文件读取(不等待其他参数) filePathComplete = true; } });

但这种手写的状态机非常脆弱,一旦模型输出格式有细微变化(比如多一个空格、换行)就会失效。更工业化的方案是使用真正的流式 JSON 解析器,比如:

  • stream-json(Node.js)
  • ijson(Python)
  • json-stream-parser(Rust)

它们允许你在 JSON 还没完全到达时,就注册回调、收取特定字段的值。例如:

ijson 示例
import ijson import sys # 假设 fragments 是一个迭代器,逐个产生 partial_json 字符串 parser = ijson.parse(fragments) for prefix, event, value in parser: if prefix == 'input.file_path' and event == 'string': print(f"file_path 提前得知: {value}") # 立即开始预加载文件

这种方式的极端优势:你可以在模型还在输出剩余参数的同时,并行执行一部分工具准备工作。比如文件路径确定后,立刻去读磁盘;模型还在输出 file_encoding、start_line 等额外参数时,文件已经读好了。 等到完整 JSON 到达,只需做最后的参数校验,工具结果几乎可以零延迟返回。

1.3 模式三:完全不等,直接执行流式工具(激进)

更激进的 Agent 框架甚至允许工具接收不完整的参数,由工具自己处理模糊匹配。比如一个搜索工具可以一边接收 query 片段,一边实时返回搜索结果中的候选项。这本质上把工具调用也流式化了——但这样做会大大增加复杂度,且要求工具本身具备增量计算能力。

目前主流生产环境(LangChain、Semantic Kernel)仍然以“攒完整再解析”为主,部分高级场景会引入“关键字段预提取”,极少数自研系统会做全流式工具执行。

当模型一次生成多个工具调用(tool use 数组)时,情况会更复杂。模型可能这样输出:

流式工具调用
content_block_start (tool_use #0, name="search") content_block_delta partial_json "{\"que" content_block_start (tool_use #1, name="read_file") content_block_delta partial_json "{\"pat" ...

不同工具的 JSON 碎片会交错发送。你需要维护每个 index 对应的独立 buffer,直到各自的 content_block_stop 出现。增量解析时更要小心——不能把 tool #0 的片段误喂给 tool #1。

2. ClaudeCode和OpenClaw的流式设计哲学

讲到这里,我们其实已经摸到了 Agent 流式输出里最核心的矛盾:

如果只是普通聊天,SSE 很简单。模型生成一点,后端推一点,前端拼一点,用户看到打字机效果,这事基本就成了。

但一旦工具介入,问题就完全变了。

因为模型输出的不再只有自然语言,还有工具调用;工具调用也不再只是展示给用户看的文本,而是会触发真实的系统行为:读文件、查资料、执行命令、修改代码、调用外部接口,甚至写入数据库。 这时候,我们就不能再把 SSE 理解成“传 token 的管道”。它更像是一条 Agent 的运行事件流。

所以,生产级 Agent 的流式输出,绝不只是“把模型生成的字一个一个推给前端”。 真正重要的是:

  • 模型什么时候开始说话;
  • 工具什么时候开始执行;
  • 哪些工具可以并发执行;
  • 哪些工具必须排队执行;
  • 工具执行结果如何返回给模型;
  • 用户界面应该看到哪些中间状态;
  • 出错时,错误应该影响当前工具,还是影响整个任务;
  • 最终的消息历史应该怎样保持可读、可调试、可复现。

这些问题,已经不是 SSE 协议本身能解决的了。SSE 只是底层通道,真正决定体验的是 Agent Runtime 的设计。 接下来我们就来看看:ClaudeCode和OpenClaw他们的SSE设计。

2.1 Claude Code:优先优化“执行效率”和“因果正确性”

Claude Code 本质上是一个 coding agent。它能读取代码库、编辑文件、运行命令,并和开发工具集成。
这类产品的核心目标不是“聊得像人”,而是“尽快正确地把活干完”。所以 Claude Code 的流式设计,重点不是把每个 token 都优雅地推给用户,而是围绕一个核心原则:

只要工具调用已经完整、参数已经确定,就尽早执行;但不能破坏工具之间的因果关系。

因此,ClaudeCode对于工具执行中的SSE,采用的就是‘流式增量解析’;但是,Claude Code 的关键不是“能并发就并发”,而是“安全地并发”。比如:

Claude Code 流式工具调用
Read src/utils.ts Read package.json Read tsconfig.json

这三个工具都是只读操作,互相不影响,当然可以并发。但如果是:

Claude Code 流式工具调用
Read src/utils.ts Edit src/utils.ts

那就不能乱来。Edit 很可能依赖 Read 的结果。再比如两个 Edit 同时改同一个文件,即使改的是第 10 行和第 20 行,也可能因为第一个修改改变了文件行号,导致第二个修改落到错误位置。

所以 Claude Code 的并发哲学是:

只读任务尽量并发,写入任务谨慎串行,危险命令强制隔离

2.2 OpenClaw:优先优化“跨渠道用户体验”和“消息可送达性”

OpenClaw 的场景和 Claude Code 不一样。

Claude Code 主要面对终端、IDE、代码工作区;OpenClaw 更像一个跨渠道 Agent 网关,要把 Agent 接到 Telegram、Discord、Slack、WhatsApp、微信、短信、浏览器等各种渠道里。

OpenClaw中它有两层 streaming:

  • 一层是 Block streaming,把助手输出按完成块发送成普通渠道消息;
  • 另一层是 Preview streaming,在 Telegram、Discord、Slack 等渠道中更新临时预览消息。

它也说明,目前不是把真正的 token delta 直接发成渠道消息,而是基于消息发送、编辑、追加来实现预览。

这就很关键了。对于网页 Chat UI 来说,token-by-token 很自然;但对于 Slack / Discord / Telegram 来说,token-by-token 是灾难:

OpenClaw 流式工具调用
你想 你想让 你想让我 你想让我帮 ...

这会导致:

  • 消息疯狂闪烁;
  • API 调用过多;
  • 触发平台限流;
  • 用户聊天窗口被刷屏;
  • Markdown 渲染不稳定;
  • 代码块可能被截断。

所以 OpenClaw 的核心哲学不是“越实时越好”,而是:

把模型的 token 流,翻译成适合具体渠道的消息流。

Agent Loop 流程图

SSE 怎么做“双向通信”?

讲到工具调用时,很容易冒出一个疑问:

如果 Agent 在执行工具之前需要用户确认怎么办?

比如 Claude Code 修改文件前,会提示你是否允许;一个业务 Agent 要扣款、发邮件、删除数据,也应该先让用户点一下“确认”。这看起来像是一个双向通信场景:服务端一边通过 SSE 往前端推送内容,前端一边还要把用户的确认结果传回服务端。

那是不是必须换成 WebSocket?

其实不需要。关键点在于:**工具审批通常不是发生在同一条 SSE 流的中间,而是发生在两次模型调用之间。**以 Claude 这类工具调用模型为例,当模型决定调用工具时,它会在当前响应里输出一个 tool_use 内容块。流式情况下,工具参数会通过 input_json_delta 一片片传过来;等工具调用块结束后,本轮模型响应也会以 stop_reason: "tool_use" 停下来。 这意味着:模型已经说完了,它现在等你执行工具。

所以整个流程可以拆成四步:

Agent Loop 流程图

所以,这里并不是在一条 SSE 连接里同时收发消息。

更准确地说,是:

工具调用流程
SSE:服务端 → 客户端,负责推送模型输出和工具请求 HTTP POST:客户端 → 服务端,负责提交用户确认和工具结果 新的 SSE:服务端 → 客户端,继续推送模型后续回复

给Hi-Agent接入流式输出

讲了这么多,接下来就来给我们的Hi-Agent接入流式输出;虽然我们这一节的文字内容很长,但主要还是想让大家深入的了解流式输出这部分内容,并不只是打字机式的输出这么简单, 要想真正的去实现一个自己的Agent,流式的处理也是一个相当复杂的点。

在我们原本的实现里,Chat接口全部都是阻塞的,需要完全等待AI返回结束后,输出String,那么流式的输出显然就不能这样来做了;

我们首先在OpenAiChatClient新增两个方法:

新增方法
public String streamChat(String userPrompt, Consumer<String> onDelta) public String streamChat(List<ChatMessage> messages, Consumer<String> onDelta)

OpenAiChatSession里新增方法:

新增方法
public String sendStreaming(String userInput, Consumer<String> onDelta)

这里刻意使用了 Consumer<String>,因为它足够简单,特别适合命令行场景:

  • SDK 每来一段内容,就回调一次
  • Main 收到这一段后直接 print
  • 同时底层仍然可以把整段回复重新拼起来,作为最终返回值

Consumer<T>可以理解成 Java 里的一个“处理器”。它接收一个参数,然后对这个参数做点事情,但不返回结果。比如 Consumer<String> 就表示:给我一段字符串,我拿到之后可以打印它、保存它、发送给前端,或者做其他处理。 在流式输出代码里,Consumer<String> onDelta 的意思就是:每当模型返回一小段文本,就把这段文本交给 onDelta 处理。

这样既保留了流式输出能力,也保留了“完整 assistant 回复”这个结果,后面写入会话历史时就很方便。

接下来就是调用接口了,阻塞式调用走的是:

client.chat().completions().create(params)

流式调用走的是:

client.chat().completions().createStreaming(params)

这个接口返回的是 StreamResponse<ChatCompletionChunk>。也就是说,服务端不是一次性把完整回答交回来,而是不断返回一个个 ChatCompletionChunk

那我们就需要一边回调增量内容,一边拼接最终结果:

streamChat(...)
public String streamChat(List<ChatMessage> messages, Consumer<String> onDelta) { Consumer<String> safeOnDelta = Objects.requireNonNull(onDelta, "onDelta must not be null"); ChatCompletionCreateParams params = buildParams(messages); StringBuilder reply = new StringBuilder(); try (StreamResponse<ChatCompletionChunk> streamResponse = client.chat() .completions() .createStreaming(params)) { streamResponse.stream() .flatMap(chunk -> chunk.choices().stream()) .map(ChatCompletionChunk.Choice::delta) .map(ChatCompletionChunk.Choice.Delta::content) .flatMap(Optional::stream) .filter(content -> !content.isEmpty()) .forEach(content -> { safeOnDelta.accept(content); reply.append(content); }); } if (reply.isEmpty()) { throw new IllegalStateException("AI response did not contain assistant content"); } return reply.toString(); }

streamChat(...) 的核心逻辑可以概括成下面这几步:

  1. 打开流式响应
  2. 遍历每个 chunk
  3. 从 chunk 中取出 assistant 的增量文本
  4. 每拿到一段文本,就执行 onDelta.accept(content)
  5. 同时把这段文本追加到 StringBuilder
  6. 流结束后返回完整 reply

这个设计非常重要,因为它同时满足了两个目标:

  • 对上层来说,可以实时拿到增量输出
  • 对会话层来说,仍然可以拿到一段完整 assistant 回复,便于写入 history

如果只做回调、不返回最终字符串,那么 session 层就很难知道最终完整回复是什么。

流式改造时,最容易被破坏的一点,就是 history 的一致性。原来的阻塞式语义是:

  • 先构造 requestMessages
  • 发请求
  • 成功后再把 userassistant 写入 history

这条规则在流式场景里必须保持不变。所以 sendStreaming(...) 的实现顺序是:

  1. 先创建当前轮的 userMessage

  2. 用旧 history + 新 userMessage 组装 requestMessages

  3. 调用 client.streamChat(...)

  4. 只有整个流成功结束后,才把:

    • userMessage
    • assistant 的完整 reply

    写入 history

这样做的结果是:

  • 如果流式请求成功,下一轮会带上完整上下文
  • 如果流式请求中途失败,控制台上已经打印的内容可以保留,但 history 不会被污染
streamChat(...)
public String sendStreaming(String userInput, Consumer<String> onDelta) { ChatMessage userMessage = new ChatMessage(ChatRole.USER, userInput); List<ChatMessage> requestMessages = new ArrayList<>(history); requestMessages.add(userMessage); String reply = client.streamChat(requestMessages, onDelta); history.add(userMessage); history.add(new ChatMessage(ChatRole.ASSISTANT, reply)); return reply; }

clientsession 都已经支持流式后,Main 的改动反而是最小的。

原来的写法是:

String reply = session.send(input); System.out.println("AI: " + reply);

现在改成:

System.out.print("AI: "); System.out.flush(); session.sendStreaming(input, delta -> { System.out.print(delta); System.out.flush(); }); System.out.println();

虽然 System.out.print(...) 会写入标准输出,但为了让终端更及时地显示内容,这里每次都显式调用了 flush()

这样可以尽量确保“收到一段就打印一段”。需要注意的是:

这能保证客户端不积压输出,但不能保证服务端一定按很细的粒度推送。 如果上游模型服务本身就是一小段一小段地返回 chunk,那么终端看到的也会是一小段一小段地出现。这是流式链路的真实表现,不是客户端额外做了缓冲。

总结

这一节我们把“流式输出”从体验、协议和工程实现三个层面完整拆了一遍。

一开始,我们只是想解决一个很直观的问题:为什么 AI 回复不能等几秒后一次性蹦出来,而应该像打字机一样逐步显示?答案很简单:流式输出能显著降低用户的等待感,让用户更早看到模型已经开始工作,也能在长文本生成时获得更自然的交互体验。

但继续往下看就会发现,流式输出并不只是“一个字一个字往前端推”。它背后牵涉到 SSE 协议格式、事件类型设计、心跳保活、断线重连、消息分块、工具调用、工具审批,以及不同模型提供商的流式协议差异。

在普通 Chat 场景里,SSE 的任务比较简单:服务端持续推送文本片段,客户端不断拼接展示即可。但到了 Agent 场景里,SSE 就不再只是文本流,而更像是一条 Agent 运行事件流。模型生成文字、准备调用工具、工具开始执行、工具执行完成、等待用户审批、继续下一轮模型调用,这些都应该被看作一个个事件。

所以我们重点讨论了两个成熟系统的设计思路:

  • Claude Code 更关注“执行效率”和“因果正确性”:工具调用一旦完整,就尽早执行;只读工具可以并发,写入工具要谨慎串行,危险命令必须隔离。
  • OpenClaw 更关注“跨渠道投递体验”:不要把 token 原样推给用户,而是把 token 流加工成自然、稳定、可读的消息块,避免聊天平台闪烁、刷屏、限流和 Markdown 渲染异常。

这两个方向共同说明了一件事:生产级 Agent Streaming,不只是文本流,而是事件流。

最后,我们回到自己的 Hi-Agent,实现了第一版流式输出能力。

这里最重要的设计点是:既要实时输出,也要保留完整回复。

实时输出解决的是用户体验问题;完整回复解决的是会话历史问题。如果只顾着把 delta 打印出去,却没有把最终 assistant 回复拼回来,下一轮对话就会丢失上下文。反过来,如果只保留完整回复,不做增量回调,那用户又回到了“一次性等待”的体验。

到这里,Hi-Agent 已经从一个阻塞式聊天 Demo,升级成了一个真正具备流式响应能力的 Chat 程序。

不过,这还只是第一步。我们下一小节再见。

本节源码63ac383f63ac383 feat: add streaming CLI chat tutorial从阻塞式调用到命令行流式输出