大多数开发者选错了实时通信方案
提到"服务端主动推送",绝大多数开发者第一反应是 WebSocket。但在实际项目中,80% 的实时场景根本不需要双向通信——股票行情、通知推送、日志流、AI 流式输出,这些场景数据都是从服务端单向流向客户端。
WebSocket 是一把瑞士军刀,但你只是想开个瓶盖时,一把开瓶器就够了。Server-Sent Events (SSE) 就是那把开瓶器——基于 HTTP 协议、原生支持断线重连、不需要额外协议升级,浏览器原生支持 EventSource API,写起来比 WebSocket 简单得多。
SSE 的核心原理
SSE 的本质就是一个不会关闭的 HTTP 响应。服务端通过 text/event-stream 内容类型持续推送数据,客户端通过浏览器内置的 EventSource API 接收。
数据格式非常简单,每条消息以 data: 开头,以空行结尾:
1
2
3
4
5
6
|
data: {"price": 102.5}
data: {"price": 103.1}
event: alert
data: {"message": "价格突破105"}
|
几个关键特性:
- 自动重连:连接断开后,浏览器会自动重连,并通过
Last-Event-ID 头告知服务端从哪条消息继续
- 基于 HTTP:不需要协议升级,可以穿透绝大多数代理和 CDN
- 单向通信:服务端 → 客户端,如果需要客户端发消息,用普通 HTTP 请求即可
Node.js 实现:一个实时价格推送服务
先来一个最直观的例子——模拟股票价格实时推送:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
// server.js
import http from 'node:http';
const PORT = 3000;
// 模拟股票价格波动
function generatePrice() {
return (100 + Math.random() * 10).toFixed(2);
}
const server = http.createServer((req, res) => {
if (req.url === '/events') {
// 设置 SSE 响应头
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'X-Accel-Buffering': 'no', // 禁用 Nginx 缓冲
});
// 发送初始连接确认
res.write(`data: ${JSON.stringify({ connected: true })}\n\n`);
// 每秒推送一次价格
const intervalId = setInterval(() => {
const price = generatePrice();
res.write(`data: ${JSON.stringify({ price, ts: Date.now() })}\n\n`);
}, 1000);
// 客户端断开时清理
req.on('close', () => {
clearInterval(intervalId);
console.log('客户端断开连接');
});
} else {
res.writeHead(200, { 'Content-Type': 'text/html' });
res.end(`
<!DOCTYPE html>
<html>
<body>
<h2>实时股价</h2>
<div id="price">连接中...</div>
<script>
const source = new EventSource('/events');
source.onmessage = (e) => {
const data = JSON.parse(e.data);
document.getElementById('price').textContent =
'¥' + data.price;
};
source.onerror = () => {
document.getElementById('price').textContent = '重连中...';
};
</script>
</body>
</html>
`);
}
});
server.listen(PORT, () => {
console.log(`SSE 服务运行在 http://localhost:${PORT}`);
});
|
运行后访问 http://localhost:3000,你会看到价格每秒更新——总共不到 30 行核心代码。
生产级封装:带心跳和重连的 SSE 中间件
上面的例子能跑,但生产环境还需要心跳保活和断线续传。下面是一个 Express 中间件封装:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
|
// sse-middleware.js
class SSEConnection {
constructor(res, id) {
this.res = res;
this.id = id;
this.alive = true;
}
send(event, data) {
if (!this.alive) return;
if (event) this.res.write(`event: ${event}\n`);
this.res.write(`id: ${this.id}\n`);
this.res.write(`data: ${JSON.stringify(data)}\n\n`);
this.id++;
}
close() {
this.alive = false;
this.res.end();
}
}
// SSE 管理器
class SSEManager {
constructor(heartbeatInterval = 30000) {
this.clients = new Map();
this.nextId = 1;
this.heartbeatInterval = heartbeatInterval;
}
middleware() {
return (req, res, next) => {
if (req.url !== '/events') return next();
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'X-Accel-Buffering': 'no',
});
const clientId = this.nextId++;
const client = new SSEConnection(res, clientId);
this.clients.set(clientId, client);
// 发送心跳防止代理断开连接
const heartbeat = setInterval(() => {
client.res.write(': heartbeat\n\n');
}, this.heartbeatInterval);
console.log(`客户端 ${clientId} 已连接,当前在线: ${this.clients.size}`);
req.on('close', () => {
clearInterval(heartbeat);
client.close();
this.clients.delete(clientId);
console.log(`客户端 ${clientId} 断开,当前在线: ${this.clients.size}`);
});
};
}
// 广播消息给所有客户端
broadcast(event, data) {
for (const [, client] of this.clients) {
client.send(event, data);
}
}
// 发送给指定客户端
sendTo(clientId, event, data) {
const client = this.clients.get(clientId);
if (client) client.send(event, data);
}
get count() {
return this.clients.size;
}
}
export { SSEManager };
|
使用起来非常简洁:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
// app.js
import express from 'express';
import { SSEManager } from './sse-middleware.js';
const app = express();
const sse = new SSEManager();
// 注册 SSE 中间件
app.use(sse.middleware());
// 普通 API 接口触发推送
app.post('/api/notify', express.json(), (req, res) => {
const { title, message } = req.body;
sse.broadcast('notification', { title, message, time: Date.now() });
res.json({ ok: true, online: sse.count });
});
app.listen(3000);
|
Python 实现:FastAPI + SSE
Python 生态中,FastAPI 配合 sse-starlette 可以非常优雅地实现 SSE:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
# main.py
from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse
import asyncio
import json
import time
app = FastAPI()
# 全局消息队列
message_queue: asyncio.Queue = asyncio.Queue()
async def event_generator():
"""持续产生事件的生成器"""
while True:
# 模拟推送数据
data = {
"timestamp": time.time(),
"value": round(100 + (time.time() % 10), 2)
}
yield {
"event": "price",
"data": json.dumps(data),
"id": str(int(time.time() * 1000))
}
await asyncio.sleep(1)
@app.get("/stream")
async def stream(request: Request):
return EventSourceResponse(
event_generator(),
ping=30, # 每30秒发送心跳
)
@app.post("/push")
async def push_notification(payload: dict):
"""通过普通 HTTP 请求触发推送"""
await message_queue.put(payload)
return {"status": "ok"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=3000)
|
安装依赖只需一行:
1
|
pip install fastapi sse-starlette uvicorn
|
Go 实现:最高效的 SSE 服务端
Go 的 goroutine 模型天然适合管理大量 SSE 连接:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
|
// main.go
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"sync"
"time"
)
type Client struct {
id int
events chan string
}
type SSEHub struct {
mu sync.RWMutex
clients map[int]*Client
nextID int
}
func NewHub() *SSEHub {
return &SSEHub{
clients: make(map[int]*Client),
nextID: 1,
}
}
func (h *SSEHub) AddClient() *Client {
h.mu.Lock()
defer h.mu.Unlock()
c := &Client{id: h.nextID, events: make(chan string, 10)}
h.clients[c.id] = c
h.nextID++
log.Printf("客户端 %d 连接,当前在线: %d", c.id, len(h.clients))
return c
}
func (h *SSEHub) RemoveClient(c *Client) {
h.mu.Lock()
defer h.mu.Unlock()
delete(h.clients, c.id)
close(c.events)
log.Printf("客户端 %d 断开,当前在线: %d", c.id, len(h.clients))
}
func (h *SSEHub) Broadcast(data interface{}) {
h.mu.RLock()
defer h.mu.RUnlock()
jsonData, _ := json.Marshal(data)
msg := fmt.Sprintf("data: %s\n\n", jsonData)
for _, c := range h.clients {
select {
case c.events <- msg:
default:
// 客户端太慢,跳过
}
}
}
func sseHandler(hub *SSEHub) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
client := hub.AddClient()
defer hub.RemoveClient(client)
ctx := r.Context()
flusher := w.(http.Flusher)
for {
select {
case msg := <-client.events:
fmt.Fprint(w, msg)
flusher.Flush()
case <-ctx.Done():
return
}
}
}
}
func main() {
hub := NewHub()
go func() {
for {
hub.Broadcast(map[string]interface{}{
"price": fmt.Sprintf("%.2f", 100+time.Now().UnixNano()%1000/100.0),
"timestamp": time.Now().Unix(),
})
time.Sleep(time.Second)
}
}()
http.HandleFunc("/events", sseHandler(hub))
log.Println("SSE 服务启动在 :3000")
log.Fatal(http.ListenAndServe(":3000", nil))
}
|
SSE vs WebSocket:怎么选?
| 特性 |
SSE |
WebSocket |
| 通信方向 |
单向(服务端→客户端) |
双向 |
| 协议 |
HTTP/1.1 或 HTTP/2 |
独立协议 (ws://) |
| 自动重连 |
✅ 浏览器原生支持 |
❌ 需要手动实现 |
| 断线续传 |
✅ Last-Event-ID |
❌ 需要手动实现 |
| 数据格式 |
纯文本 |
文本或二进制 |
| 代理兼容 |
✅ 标准 HTTP |
⚠️ 部分代理不支持 |
| 浏览器支持 |
所有现代浏览器 |
所有现代浏览器 |
| 最大连接数 |
HTTP/1.1 有连接数限制 |
无特殊限制 |
选 SSE 的场景:
- 通知推送、消息提醒
- 实时数据看板(股价、监控面板)
- AI 流式输出(ChatGPT 类应用)
- 日志流、事件流
- 任何只需要服务端单向推送的场景
选 WebSocket 的场景:
- 聊天应用、协作编辑
- 多人实时游戏
- 需要客户端频繁发送数据的场景
- 需要二进制数据传输
Nginx 部署注意事项
如果用 Nginx 反向代理 SSE 服务,必须关闭缓冲,否则数据会被攒着一起发:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
server {
listen 80;
server_name api.example.com;
location /events {
proxy_pass http://127.0.0.1:3000;
proxy_http_version 1.1;
proxy_set_header Connection '';
proxy_buffering off; # 关键:禁用响应缓冲
proxy_cache off;
chunked_transfer_encoding off;
proxy_read_timeout 86400s; # 长连接超时设为24小时
}
}
|
总结
SSE 不是一个新东西,它是 HTML5 规范的一部分,被浏览器原生支持了十多年。但在 WebSocket 的光环下,它一直被低估。
下次需要实现"服务端推送"时,先问自己一个问题:客户端需要发送数据吗? 如果答案是"不需要"或"很少需要",SSE 就是更简单、更可靠的选择。少即是多,技术选型也是如此。