SSE协议

在这里插入图片描述

SSE,全称是 Server-Sent Events,是一种 服务器主动推送消息 到浏览器(客户端)的一种通信协议,基于 HTTP 单向流。

简单理解就是:

  • 客户端发起一个普通的 HTTP 请求(通常是 GET)。

  • 服务器保持这个连接不断开,持续地、实时地往客户端推送数据(类似实时通知、消息推送)。

  • 客户端收到数据后可以及时处理显示。

主要特点

  • 单向通信:服务器 → 客户端(客户端不能主动通过这个连接回传数据,只能发起新请求)。

  • 基于文本格式,数据流以 text/event-stream 的 MIME 类型传输。

  • 轻量、简单,不需要像 WebSocket 那样升级协议。

  • 自动重连(浏览器原生支持,断了会自动重连)。

  • 有序(服务器推送的消息默认是顺序到达的)。

同为浏览器推送技术,相较于 WebSocket 而言,Server-Sent Events (简称SSE)更少被人知晓,具体实践也较少。

原因有两点:

  • WebSocket 比 SSE 更强大,Websocket 在客户端和服务器之间建立了双向的实时通信。而 SSE 只支持从服务器到客户端的单向实时通信。
  • WebSocket 在浏览器方面支持更广(详见下图),IE / Edge 几乎根本不支持 SSE。

然而,就第一点而言,与 WebSocket 相比,SSE 也有独特的优势。

  • SSE 的浏览器端实现内置断线重连和消息追踪的功能,WebSocket 也能实现,但是不在协议设计范围内,需要手动处理。
  • SSE 实现简单,完全复用现有的 HTTP 协议,而 WebSocket 是相对独立于 HTTP 的一套标准,跨平台实现较为复杂。

协议实现

SSE 协议很简单,本质上是一个客户端发起的 HTTP Get 请求,服务器在接到该请求后,返回 200 OK 状态,同时附带以下 Headers:

Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
  • SSE 的 MIME Type 规定为 text/event-stream
  • SSE 肯定不允许缓存
  • SSE 是一个一直打开的 TCP 连接,所以 Connection 为 Keep-Alive

传输格式

每一条消息长这样:

data: 这是推送的数据
id: 1234
event: message
retry: 10000
  • data: 是发送的内容(可以多行)
  • id: 是消息 ID(浏览器会记下来,用于断线续传)
  • event: 可以指定事件类型(配合前端 addEventListener 监听不同事件)

每条消息 以两个换行符(\n\n)结尾,标志一条消息结束。

data 字段

数据内容用data字段表示。

data:  message\n\n

如果数据很长,可以分成多行,最后一行用\n\n结尾,前面行都用\n结尾。

data: begin message\n
data: continue message\n\n

下面是一个发送 JSON 数据的例子。

data: {\n
data: "foo": "bar",\n
data: "baz", 555\n
data: }\n\n

id 字段

数据标识符用id字段表示,相当于每一条数据的编号。

id: msg1\n
data: message\n\n

浏览器用lastEventId属性读取这个值。一旦连接断线,浏览器会发送一个 HTTP 头,里面包含一个特殊的Last-Event-ID头信息,将这个值发送回来,用来帮助服务器端重建连接。因此,这个头信息可以被视为一种同步机制。

event 字段

event字段表示自定义的事件类型,默认是message事件。浏览器可以用addEventListener()监听该事件。

event: foo\n
data: a foo event\n\n

data: an unnamed event\n\n

event: bar\n
data: a bar event\n\n

上面的代码创造了三条信息。第一条的名字是foo,触发浏览器的foo事件;第二条未取名,表示默认类型,触发浏览器的message事件;第三条是bar,触发浏览器的bar事件。

  • 在 SSE 协议规范WHATWG EventSource spec)里,event: 是一个合法的标准字段。
  • event: 后面的内容(事件名)是可以随便取的,你可以叫 updatenew-messageheartbeatanything-you-want
  • 客户端可以通过 addEventListener("事件名", handler) 来分别监听不同类型的事件。

也就是说,SSE协议只规定了格式,但没有限制你具体的 event 名字

服务器发送的数据:

event: user-message
data: {"user":"Alice","msg":"Hi there"}

event: system-alert
data: {"level":"warning","message":"Server is hot"}

event: heartbeat
data: ping

前端可以这样监听不同的事件:

const es = new EventSource("/stream");

// 监听普通消息
es.addEventListener("user-message", e => {
  console.log("收到用户消息:", e.data);
});

// 监听系统警告
es.addEventListener("system-alert", e => {
  console.warn("系统警告:", e.data);
});

// 监听心跳
es.addEventListener("heartbeat", e => {
  console.log("心跳包:", e.data);
});

// 监听默认事件(没有 event: 字段时)
es.onmessage = e => {
  console.log("默认消息:", e.data);
};
  • 如果服务器没有指定 event: xxx,那浏览器默认就是 message 事件(onmessage 触发)。
  • 如果有指定 event: xxx,就要用 addEventListener("xxx", handler) 来监听。
  • event: 字段必须在 data: 字段前面,否则它只对下一条消息生效。

retry 字段

服务器可以用retry字段,指定浏览器重新发起连接的时间间隔。

retry: 10000\n

两种情况会导致浏览器重新发起连接:一种是时间间隔到期,二是由于网络错误等原因,导致连接出错。

前后端实现

前端代码,使用浏览器原生提供的方法即可:

const url = '/xx/xxx'
// 1. 创建实例
var source = new EventSource(url)

// 2. 事件监听
// 建立连接后,触发`open` 事件
source.addEventListener('open', (e) => {
    console.log('open', e)
})
// 收到消息,触发`message` 事件
source.addEventListener('message', (e) => {
    console.log('message', e)
})
// 发生错误,触发`error` 事件
source.addEventListener('error', (e) => {
    console.log('error', e)
})
// 自定义事件
source.addEventListener('eventName', (e) => {
  // ...
}, false)

// 3. 关闭链接
source.close()

上面的url可以与当前网址同域,也可以跨域。跨域时,可以指定第二个参数,打开withCredentials属性,表示是否一起发送 Cookie。

var source = new EventSource(url, { withCredentials: true });

EventSource实例的readyState属性,表明连接的当前状态。该属性只读,可以取以下值。

0:相当于常量EventSource.CONNECTING,表示连接还未建立,或者断线正在重连。
1:相当于常量EventSource.OPEN,表示连接已经建立,可以接受数据。
2:相当于常量EventSource.CLOSED,表示连接已断,且不会重连。

后端相对简单:

package main

import (
	"fmt"
	"net/http"
	"time"
)

func sseHandler(w http.ResponseWriter, r *http.Request) {
	// 设置必要的 Header
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("Access-Control-Allow-Origin", "*") // 跨域支持(如果有需要)

	// 确保支持 Flush
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "Streaming unsupported!", http.StatusInternalServerError)
		return
	}

	// 这里是你的推送逻辑:每秒发一条消息
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	// 如果客户端断开,这里可以检测
	ctx := r.Context()

	for {
		select {
		case <-ctx.Done():
			fmt.Println("客户端断开连接")
			return
		case t := <-ticker.C:
			// 注意!每条消息需要以两个换行符 \n\n 结尾
			fmt.Fprintf(w, "event: tick\n")
			fmt.Fprintf(w, "data: %s\n\n", t.Format(time.RFC3339))

			// 刷新到客户端
			flusher.Flush()
		}
	}
}

func main() {
	http.HandleFunc("/sse", sseHandler)
	fmt.Println("SSE 服务启动在 http://localhost:8080/sse")
	http.ListenAndServe(":8080", nil)
}

对于go也有对应的三方包,

在这里插入图片描述
使用非常简单:

import "github.com/gin-contrib/sse"

func httpHandler(w http.ResponseWriter, req *http.Request) {
  // data can be a primitive like a string, an integer or a float
  sse.Encode(w, sse.Event{
    Event: "message",
    Data:  "some data\nmore data",
  })

  // also a complex type, like a map, a struct or a slice
  sse.Encode(w, sse.Event{
    Id:    "124",
    Event: "message",
    Data: map[string]interface{}{
      "user":    "manu",
      "date":    time.Now().Unix(),
      "content": "hi!",
    },
  })
}

为什么选择 gin-contrib/sse

  1. 与 Gin 的无缝集成
    gin-contrib/sse 是 Gin 官方维护的中间件之一,专为与 Gin 框架协作而设计。它与 Gin 的上下文 (*gin.Context) 紧密结合,简化了 SSE 的实现过程。

  2. 简化的 API
    该库提供了 c.SSEvent(event string, data interface{}) 方法,允许开发者轻松发送事件数据,无需手动设置响应头或处理连接管理。

  3. 自动处理连接生命周期
    gin-contrib/sse 自动处理连接的打开和关闭,减少了开发者需要关注的细节。

  4. 支持事件 ID 和重连机制
    该库支持设置事件 ID 和重连时间,符合 SSE 的标准规范,增强了消息的可靠性和客户端的容错能力。

  5. 广泛的社区支持
    作为 Gin 官方提供的中间件之一,gin-contrib/sse 拥有广泛的社区支持和文档资源,易于学习和使用。

以下是使用 gin-contrib/sse 实现 SSE 的示例:

package main

import (
    "github.com/gin-gonic/gin"
    "github.com/gin-contrib/sse"
)

func main() {
    r := gin.Default()

    r.GET("/events", func(c *gin.Context) {
        c.Stream(func(w io.Writer) bool {
            sse.Encode(w, sse.Event{
                Event: "message",
                Data:  "Hello, SSE!",
            })
            return true
        })
    })

    r.Run(":8080")
}

与原生 Go SSE 的对比

特性 原生 Go SSE 实现 gin-contrib/sse
与框架集成 需要手动设置响应头和连接管理 与 Gin 无缝集成,简化实现
API 简洁性 需要手动编码事件格式 提供 c.SSEvent 等简洁方法
连接生命周期管理 需要手动管理连接的打开和关闭 自动处理连接的打开和关闭
事件 ID 和重连 需要手动实现 内置支持事件 ID 和重连机制
社区支持 取决于使用的库 作为 Gin 官方中间件,拥有广泛的社区支持

使用案例

FastAPI + SSE-STARLETTE 模拟大模型推理流

在使用 ChatGPT 时,发现输入 prompt 后,页面是逐步给出回复的,起初以为使用了 WebSckets 持久化连接协议,查看其网络请求,发现这个接口的通信方式并非传统的 http 接口或者 WebSockets,而是基于 EventStream 的事件流,像打字机一样,一段一段的返回答案。

ChatGPT 是一个基于深度学习的大型语言模型,处理自然语言需要大量的计算资源和时间,响应速度肯定比普通的读数据库要慢的多,普通 http 接口等待时间过长,显然并不合适。对于这种单项对话场景,ChagtGPT 将先计算出的数据“推送”给用户,边计算边返回,避免用户因为等待时间过长关闭页面。而这,可以采用 SSE 技术。

在这里插入图片描述

而现在很多大模型 API 服务(像 OpenAI 的 ChatGPT-API、各种 LLMs 推理服务)基本上都是:

  • 后端 Python
  • 框架FastAPI(因为支持 ASGI,可以异步高效处理流式返回)
  • 返回SSE流
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio
import datetime

app = FastAPI()

async def event_generator():
    while True:
        # 等待1秒
        await asyncio.sleep(1)
        # 当前时间
        now = datetime.datetime.now().isoformat()
        # 注意SSE格式要求两个 \n\n 结尾
        yield f"event: tick\ndata: {now}\n\n"

@app.get("/sse")
async def sse_endpoint(request: Request):
    # 检测客户端断开连接
    async def server_sent_events():
        async for event in event_generator():
            if await request.is_disconnected():
                print("客户端断开连接")
                break
            yield event

    return StreamingResponse(
        server_sent_events(),
        media_type="text/event-stream"
    )

Python 生态里还提供了专门的 SSE 辅助库,最有名的是:

库名 说明
sse-starlette 基于 Starlette(FastAPI的底层框架),专门为 FastAPI/FastASGI 写的 SSE 工具。
flask-sse 专门给 Flask 用户用的,封装了 Redis PubSub,适合广播场景。
python-sse 一个小型独立库,纯粹处理 SSE 协议格式,不依赖具体 Web 框架。

🔥 举个 sse-starlette 用法示范(适合 FastAPI)

首先安装:

pip install sse-starlette

然后代码很简单:

from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
import asyncio
import datetime

app = FastAPI()

async def event_publisher():
    while True:
        await asyncio.sleep(1)
        yield {
            "event": "tick",
            "data": datetime.datetime.now().isoformat()
        }

@app.get("/sse")
async def sse():
    return EventSourceResponse(event_publisher())

🔥 这个库帮你自动做了什么?

功能 原本要手动做的事
帮你正确格式化 event:data:\n\n 你自己就不用手动 yield f"event: xx\ndata: yy\n\n"
自动设置 Content-Type: text/event-stream
支持 request.is_disconnected() 检测 防止死循环
支持传 retry: 字段(控制断线重连时间)
支持 id: 字段(让前端从上次断开位置继续接收)

基本就是 开箱即用,专门为 SSE 而生,而且和 FastAPI 非常搭配。✨

🖥 代码:FastAPI + SSE-STARLETTE 模拟大模型推理流
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
import asyncio
import random

app = FastAPI()

# 模拟的 LLM 推理,每次产出一个 "token"
async def fake_llm_stream(prompt: str):
    fake_tokens = [
        "Hello", ",", " this", " is", " a", " simulated", " response", ".", 
        " Thank", " you", " for", " using", " our", " AI", " model", "!"
    ]

    for token in fake_tokens:
        # 每隔随机 100~400ms 推一个 token
        await asyncio.sleep(random.uniform(0.1, 0.4))
        yield {
            "event": "token",              # 自定义事件名:token
            "data": token
        }

    # 推送一个结束标志(可以不推)
    yield {
        "event": "end",
        "data": "[DONE]"
    }

@app.get("/chat/stream")
async def chat_stream(prompt: str):
    # 每次访问 /chat/stream?prompt=xxx,就返回一个 Streaming Response
    return EventSourceResponse(fake_llm_stream(prompt))

🛠 前端测试 HTML

<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="UTF-8">
  <title>LLM Stream Demo</title>
</head>
<body>
  <h1>LLM 流式输出 Demo</h1>
  <div id="output" style="white-space: pre-wrap; font-family: monospace;"></div>

  <script>
    const prompt = "你好,请介绍一下自己";
    const es = new EventSource(`http://localhost:8000/chat/stream?prompt=${encodeURIComponent(prompt)}`);

    es.addEventListener("token", (e) => {
      document.getElementById("output").textContent += e.data;
    });

    es.addEventListener("end", (e) => {
      console.log("流式输出结束:", e.data);
      es.close();
    });

    es.onerror = (e) => {
      console.error("连接出错", e);
      es.close();
    };
  </script>
</body>
</html>

🔥 效果

  • 你一打开页面,它就调用 /chat/stream
  • 后端像大模型那样一块块流式返回 Token。
  • 浏览器前端实时接收、一字字拼出来
  • 最后收到 event: end,自动关闭连接。

就跟你用 OpenAI ChatCompletion stream=True 一模一样的体验!🎯

📦 依赖安装

别忘了装必要依赖哦:

pip install fastapi sse-starlette uvicorn

然后运行:

uvicorn your_file_name:app --reload

(记得把 your_file_name 换成你保存的 Python 文件名)

Logo

GitCode 天启AI是一款由 GitCode 团队打造的智能助手,基于先进的LLM(大语言模型)与多智能体 Agent 技术构建,致力于为用户提供高效、智能、多模态的创作与开发支持。它不仅支持自然语言对话,还具备处理文件、生成 PPT、撰写分析报告、开发 Web 应用等多项能力,真正做到“一句话,让 Al帮你完成复杂任务”。

更多推荐