Server-Sent Events (SSE)
SSE(Server-Sent Events)是基于 HTTP 的单向推流协议:服务器持续向客户端发送事件,客户端只读不写。对于实时日志、进度通知、行情推送等场景,SSE 比 WebSocket 更轻量,原生支持断线重连。
两种写法
方式一:返回 SseStream(推荐)
控制器方法直接返回 SseStream,框架自动设置 Content-Type: text/event-stream 并以 chunked 方式推送:
cangjie
import ace_framework.*
import ace_framework_macros.*
import ace_web.*
@Controller
public class EventController {
@Get["/events"]
public func events(): SseStream {
return SseStream { sink =>
var i = 0
while (i < 10) {
sink.send("count: ${i}")
Thread.sleep(1000 * Duration.millisecond)
i++
}
}
}
}SseStream 的生产者闭包 (SseSink) -> Unit 在流结束(闭包返回)时自动关闭连接。
方式二:sse() 函数(手动控制)
当需要读取 ctx 中的请求参数时,使用 sse() 函数:
cangjie
@Get["/stream"]
public func stream(ctx: Context): Unit {
let topic = ctx.queryParam("topic") ?? "default"
sse(ctx) { sink =>
sink.sendEvent("subscribed", topic)
for (msg in messageQueue(topic)) {
sink.send(msg)
}
}
}SseSink API
| 方法 | 说明 |
|---|---|
send(data: String) | 发送匿名事件(data: ... 帧) |
sendEvent(event: String, data: String) | 发送具名事件(event: ... + data: ...) |
comment(text: String) | 发送注释行(: text),常用于保活心跳 |
raw(bytes: Array<UInt8>) | 直接写入原始字节(需自行保证 SSE 帧格式) |
帧格式
SSE 协议每条事件以空行结束:
data: hello world\n
\n
event: tick\n
data: {"count":1}\n
\n
: heartbeat\n
\ndata 含换行符时自动拆分为多行 data: 字段(ACE 已处理)。
具名事件
cangjie
SseStream { sink =>
sink.sendEvent("open", "{\"status\":\"connected\"}")
for (item in feed()) {
sink.sendEvent("item", item.toJson())
}
sink.sendEvent("close", "{\"status\":\"done\"}")
}客户端按事件名监听:
javascript
const es = new EventSource("/events")
es.addEventListener("open", e => console.log("connected:", e.data))
es.addEventListener("item", e => console.log("item:", JSON.parse(e.data)))
es.addEventListener("close", e => { console.log("done"); es.close() })保活心跳
长连接期间,中间代理(Nginx 等)可能因超时断开连接。定时发送注释帧保持活跃:
cangjie
SseStream { sink =>
var seq = 0
while (true) {
let msg = fetchLatestMessage()
match (msg) {
case Some(m) =>
sink.sendEvent("message", m)
seq++
case None =>
sink.comment("heartbeat") // 不触发客户端 onmessage
}
Thread.sleep(15000 * Duration.millisecond)
}
}实时进度推送示例
cangjie
import ace_framework.*
import ace_framework_macros.*
import ace_web.*
@Controller
public class JobController {
@Inject
var jobService: JobService = ...
@Post["/jobs"]
public func submit(body: String): String {
let jobId = jobService.submit(body)
return "{\"jobId\":\"${jobId}\"}"
}
@Get["/jobs/:id/progress"]
public func progress(id: String): SseStream {
return SseStream { sink =>
sink.sendEvent("start", "{\"jobId\":\"${id}\"}")
var pct = 0
while (pct < 100) {
pct = jobService.getProgress(id)
sink.sendEvent("progress", "{\"pct\":${pct}}")
if (pct >= 100) { break }
Thread.sleep(500 * Duration.millisecond)
}
sink.sendEvent("done", "{\"jobId\":\"${id}\"}")
}
}
}客户端:
javascript
const es = new EventSource(`/jobs/${jobId}/progress`)
es.addEventListener("progress", e => updateProgressBar(JSON.parse(e.data).pct))
es.addEventListener("done", e => { es.close(); alert("完成!") })注意事项
与 WebSocket 的选择
| SSE | WebSocket | |
|---|---|---|
| 方向 | 服务器→客户端 | 双向 |
| 协议 | HTTP/1.1 | WS |
| 断线重连 | 浏览器自动重试 | 需手动处理 |
| 适合场景 | 通知、日志、进度 | 聊天、游戏、协作 |
gzip 压缩
SSE 是流式响应(StreamBody),gzip() 中间件不对流式响应压缩,无需额外配置。
反向代理
Nginx 需关闭缓冲,否则事件会被攒批后一次性发送:
nginx
proxy_buffering off;
proxy_cache off;
proxy_set_header Connection '';
proxy_http_version 1.1;
chunked_transfer_encoding on;