Contents

SSE 实战:用 Server-Sent Events 实现实时推送,比 WebSocket 更轻量的服务端推送方案

大多数开发者选错了实时通信方案

提到"服务端主动推送",绝大多数开发者第一反应是 WebSocket。但在实际项目中,80% 的实时场景根本不需要双向通信——股票行情、通知推送、日志流、AI 流式输出,这些场景数据都是从服务端单向流向客户端。

WebSocket 是一把瑞士军刀,但你只是想开个瓶盖时,一把开瓶器就够了。Server-Sent Events (SSE) 就是那把开瓶器——基于 HTTP 协议、原生支持断线重连、不需要额外协议升级,浏览器原生支持 EventSource API,写起来比 WebSocket 简单得多。

SSE 的核心原理

SSE 的本质就是一个不会关闭的 HTTP 响应。服务端通过 text/event-stream 内容类型持续推送数据,客户端通过浏览器内置的 EventSource API 接收。

数据格式非常简单,每条消息以 data: 开头,以空行结尾:

1
2
3
4
5
6
data: {"price": 102.5}

data: {"price": 103.1}

event: alert
data: {"message": "价格突破105"}

几个关键特性:

  • 自动重连:连接断开后,浏览器会自动重连,并通过 Last-Event-ID 头告知服务端从哪条消息继续
  • 基于 HTTP:不需要协议升级,可以穿透绝大多数代理和 CDN
  • 单向通信:服务端 → 客户端,如果需要客户端发消息,用普通 HTTP 请求即可

Node.js 实现:一个实时价格推送服务

先来一个最直观的例子——模拟股票价格实时推送:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// server.js
import http from 'node:http';

const PORT = 3000;

// 模拟股票价格波动
function generatePrice() {
  return (100 + Math.random() * 10).toFixed(2);
}

const server = http.createServer((req, res) => {
  if (req.url === '/events') {
    // 设置 SSE 响应头
    res.writeHead(200, {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
      'X-Accel-Buffering': 'no', // 禁用 Nginx 缓冲
    });

    // 发送初始连接确认
    res.write(`data: ${JSON.stringify({ connected: true })}\n\n`);

    // 每秒推送一次价格
    const intervalId = setInterval(() => {
      const price = generatePrice();
      res.write(`data: ${JSON.stringify({ price, ts: Date.now() })}\n\n`);
    }, 1000);

    // 客户端断开时清理
    req.on('close', () => {
      clearInterval(intervalId);
      console.log('客户端断开连接');
    });
  } else {
    res.writeHead(200, { 'Content-Type': 'text/html' });
    res.end(`
      <!DOCTYPE html>
      <html>
      <body>
        <h2>实时股价</h2>
        <div id="price">连接中...</div>
        <script>
          const source = new EventSource('/events');
          source.onmessage = (e) => {
            const data = JSON.parse(e.data);
            document.getElementById('price').textContent = 
              '¥' + data.price;
          };
          source.onerror = () => {
            document.getElementById('price').textContent = '重连中...';
          };
        </script>
      </body>
      </html>
    `);
  }
});

server.listen(PORT, () => {
  console.log(`SSE 服务运行在 http://localhost:${PORT}`);
});

运行后访问 http://localhost:3000,你会看到价格每秒更新——总共不到 30 行核心代码。

生产级封装:带心跳和重连的 SSE 中间件

上面的例子能跑,但生产环境还需要心跳保活和断线续传。下面是一个 Express 中间件封装:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
// sse-middleware.js
class SSEConnection {
  constructor(res, id) {
    this.res = res;
    this.id = id;
    this.alive = true;
  }

  send(event, data) {
    if (!this.alive) return;
    if (event) this.res.write(`event: ${event}\n`);
    this.res.write(`id: ${this.id}\n`);
    this.res.write(`data: ${JSON.stringify(data)}\n\n`);
    this.id++;
  }

  close() {
    this.alive = false;
    this.res.end();
  }
}

// SSE 管理器
class SSEManager {
  constructor(heartbeatInterval = 30000) {
    this.clients = new Map();
    this.nextId = 1;
    this.heartbeatInterval = heartbeatInterval;
  }

  middleware() {
    return (req, res, next) => {
      if (req.url !== '/events') return next();

      res.writeHead(200, {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
        'X-Accel-Buffering': 'no',
      });

      const clientId = this.nextId++;
      const client = new SSEConnection(res, clientId);
      this.clients.set(clientId, client);

      // 发送心跳防止代理断开连接
      const heartbeat = setInterval(() => {
        client.res.write(': heartbeat\n\n');
      }, this.heartbeatInterval);

      console.log(`客户端 ${clientId} 已连接,当前在线: ${this.clients.size}`);

      req.on('close', () => {
        clearInterval(heartbeat);
        client.close();
        this.clients.delete(clientId);
        console.log(`客户端 ${clientId} 断开,当前在线: ${this.clients.size}`);
      });
    };
  }

  // 广播消息给所有客户端
  broadcast(event, data) {
    for (const [, client] of this.clients) {
      client.send(event, data);
    }
  }

  // 发送给指定客户端
  sendTo(clientId, event, data) {
    const client = this.clients.get(clientId);
    if (client) client.send(event, data);
  }

  get count() {
    return this.clients.size;
  }
}

export { SSEManager };

使用起来非常简洁:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// app.js
import express from 'express';
import { SSEManager } from './sse-middleware.js';

const app = express();
const sse = new SSEManager();

// 注册 SSE 中间件
app.use(sse.middleware());

// 普通 API 接口触发推送
app.post('/api/notify', express.json(), (req, res) => {
  const { title, message } = req.body;
  sse.broadcast('notification', { title, message, time: Date.now() });
  res.json({ ok: true, online: sse.count });
});

app.listen(3000);

Python 实现:FastAPI + SSE

Python 生态中,FastAPI 配合 sse-starlette 可以非常优雅地实现 SSE:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# main.py
from fastapi import FastAPI, Request
from sse_starlette.sse import EventSourceResponse
import asyncio
import json
import time

app = FastAPI()

# 全局消息队列
message_queue: asyncio.Queue = asyncio.Queue()


async def event_generator():
    """持续产生事件的生成器"""
    while True:
        # 模拟推送数据
        data = {
            "timestamp": time.time(),
            "value": round(100 + (time.time() % 10), 2)
        }
        yield {
            "event": "price",
            "data": json.dumps(data),
            "id": str(int(time.time() * 1000))
        }
        await asyncio.sleep(1)


@app.get("/stream")
async def stream(request: Request):
    return EventSourceResponse(
        event_generator(),
        ping=30,  # 每30秒发送心跳
    )


@app.post("/push")
async def push_notification(payload: dict):
    """通过普通 HTTP 请求触发推送"""
    await message_queue.put(payload)
    return {"status": "ok"}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=3000)

安装依赖只需一行:

1
pip install fastapi sse-starlette uvicorn

Go 实现:最高效的 SSE 服务端

Go 的 goroutine 模型天然适合管理大量 SSE 连接:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
// main.go
package main

import (
    "encoding/json"
    "fmt"
    "log"
    "net/http"
    "sync"
    "time"
)

type Client struct {
    id     int
    events chan string
}

type SSEHub struct {
    mu      sync.RWMutex
    clients map[int]*Client
    nextID  int
}

func NewHub() *SSEHub {
    return &SSEHub{
        clients: make(map[int]*Client),
        nextID:  1,
    }
}

func (h *SSEHub) AddClient() *Client {
    h.mu.Lock()
    defer h.mu.Unlock()
    c := &Client{id: h.nextID, events: make(chan string, 10)}
    h.clients[c.id] = c
    h.nextID++
    log.Printf("客户端 %d 连接,当前在线: %d", c.id, len(h.clients))
    return c
}

func (h *SSEHub) RemoveClient(c *Client) {
    h.mu.Lock()
    defer h.mu.Unlock()
    delete(h.clients, c.id)
    close(c.events)
    log.Printf("客户端 %d 断开,当前在线: %d", c.id, len(h.clients))
}

func (h *SSEHub) Broadcast(data interface{}) {
    h.mu.RLock()
    defer h.mu.RUnlock()
    jsonData, _ := json.Marshal(data)
    msg := fmt.Sprintf("data: %s\n\n", jsonData)
    for _, c := range h.clients {
        select {
        case c.events <- msg:
        default:
            // 客户端太慢,跳过
        }
    }
}

func sseHandler(hub *SSEHub) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        w.Header().Set("Content-Type", "text/event-stream")
        w.Header().Set("Cache-Control", "no-cache")
        w.Header().Set("Connection", "keep-alive")

        client := hub.AddClient()
        defer hub.RemoveClient(client)

        ctx := r.Context()
        flusher := w.(http.Flusher)

        for {
            select {
            case msg := <-client.events:
                fmt.Fprint(w, msg)
                flusher.Flush()
            case <-ctx.Done():
                return
            }
        }
    }
}

func main() {
    hub := NewHub()

    go func() {
        for {
            hub.Broadcast(map[string]interface{}{
                "price":    fmt.Sprintf("%.2f", 100+time.Now().UnixNano()%1000/100.0),
                "timestamp": time.Now().Unix(),
            })
            time.Sleep(time.Second)
        }
    }()

    http.HandleFunc("/events", sseHandler(hub))
    log.Println("SSE 服务启动在 :3000")
    log.Fatal(http.ListenAndServe(":3000", nil))
}

SSE vs WebSocket:怎么选?

特性 SSE WebSocket
通信方向 单向(服务端→客户端) 双向
协议 HTTP/1.1 或 HTTP/2 独立协议 (ws://)
自动重连 ✅ 浏览器原生支持 ❌ 需要手动实现
断线续传 ✅ Last-Event-ID ❌ 需要手动实现
数据格式 纯文本 文本或二进制
代理兼容 ✅ 标准 HTTP ⚠️ 部分代理不支持
浏览器支持 所有现代浏览器 所有现代浏览器
最大连接数 HTTP/1.1 有连接数限制 无特殊限制

选 SSE 的场景:

  • 通知推送、消息提醒
  • 实时数据看板(股价、监控面板)
  • AI 流式输出(ChatGPT 类应用)
  • 日志流、事件流
  • 任何只需要服务端单向推送的场景

选 WebSocket 的场景:

  • 聊天应用、协作编辑
  • 多人实时游戏
  • 需要客户端频繁发送数据的场景
  • 需要二进制数据传输

Nginx 部署注意事项

如果用 Nginx 反向代理 SSE 服务,必须关闭缓冲,否则数据会被攒着一起发:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
server {
    listen 80;
    server_name api.example.com;

    location /events {
        proxy_pass http://127.0.0.1:3000;
        proxy_http_version 1.1;
        proxy_set_header Connection '';
        proxy_buffering off;           # 关键:禁用响应缓冲
        proxy_cache off;
        chunked_transfer_encoding off;
        proxy_read_timeout 86400s;     # 长连接超时设为24小时
    }
}

总结

SSE 不是一个新东西,它是 HTML5 规范的一部分,被浏览器原生支持了十多年。但在 WebSocket 的光环下,它一直被低估。

下次需要实现"服务端推送"时,先问自己一个问题:客户端需要发送数据吗? 如果答案是"不需要"或"很少需要",SSE 就是更简单、更可靠的选择。少即是多,技术选型也是如此。