Skip to content

Server-Sent Events (SSE)

SSE(Server-Sent Events)是基于 HTTP 的单向推流协议:服务器持续向客户端发送事件,客户端只读不写。对于实时日志、进度通知、行情推送等场景,SSE 比 WebSocket 更轻量,原生支持断线重连。

两种写法

方式一:返回 SseStream(推荐)

控制器方法直接返回 SseStream,框架自动设置 Content-Type: text/event-stream 并以 chunked 方式推送:

cangjie
import ace_framework.*
import ace_framework_macros.*
import ace_web.*

@Controller
public class EventController {
    @Get["/events"]
    public func events(): SseStream {
        return SseStream { sink =>
            var i = 0
            while (i < 10) {
                sink.send("count: ${i}")
                Thread.sleep(1000 * Duration.millisecond)
                i++
            }
        }
    }
}

SseStream 的生产者闭包 (SseSink) -> Unit 在流结束(闭包返回)时自动关闭连接。

方式二:sse() 函数(手动控制)

当需要读取 ctx 中的请求参数时,使用 sse() 函数:

cangjie
@Get["/stream"]
public func stream(ctx: Context): Unit {
    let topic = ctx.queryParam("topic") ?? "default"
    sse(ctx) { sink =>
        sink.sendEvent("subscribed", topic)
        for (msg in messageQueue(topic)) {
            sink.send(msg)
        }
    }
}

SseSink API

方法说明
send(data: String)发送匿名事件(data: ... 帧)
sendEvent(event: String, data: String)发送具名事件(event: ... + data: ...
comment(text: String)发送注释行(: text),常用于保活心跳
raw(bytes: Array<UInt8>)直接写入原始字节(需自行保证 SSE 帧格式)

帧格式

SSE 协议每条事件以空行结束:

data: hello world\n
\n
event: tick\n
data: {"count":1}\n
\n
: heartbeat\n
\n

data 含换行符时自动拆分为多行 data: 字段(ACE 已处理)。

具名事件

cangjie
SseStream { sink =>
    sink.sendEvent("open", "{\"status\":\"connected\"}")

    for (item in feed()) {
        sink.sendEvent("item", item.toJson())
    }

    sink.sendEvent("close", "{\"status\":\"done\"}")
}

客户端按事件名监听:

javascript
const es = new EventSource("/events")

es.addEventListener("open",  e => console.log("connected:", e.data))
es.addEventListener("item",  e => console.log("item:", JSON.parse(e.data)))
es.addEventListener("close", e => { console.log("done"); es.close() })

保活心跳

长连接期间,中间代理(Nginx 等)可能因超时断开连接。定时发送注释帧保持活跃:

cangjie
SseStream { sink =>
    var seq = 0
    while (true) {
        let msg = fetchLatestMessage()
        match (msg) {
            case Some(m) =>
                sink.sendEvent("message", m)
                seq++
            case None =>
                sink.comment("heartbeat")  // 不触发客户端 onmessage
        }
        Thread.sleep(15000 * Duration.millisecond)
    }
}

实时进度推送示例

cangjie
import ace_framework.*
import ace_framework_macros.*
import ace_web.*

@Controller
public class JobController {
    @Inject
    var jobService: JobService = ...

    @Post["/jobs"]
    public func submit(body: String): String {
        let jobId = jobService.submit(body)
        return "{\"jobId\":\"${jobId}\"}"
    }

    @Get["/jobs/:id/progress"]
    public func progress(id: String): SseStream {
        return SseStream { sink =>
            sink.sendEvent("start", "{\"jobId\":\"${id}\"}")
            var pct = 0
            while (pct < 100) {
                pct = jobService.getProgress(id)
                sink.sendEvent("progress", "{\"pct\":${pct}}")
                if (pct >= 100) { break }
                Thread.sleep(500 * Duration.millisecond)
            }
            sink.sendEvent("done", "{\"jobId\":\"${id}\"}")
        }
    }
}

客户端:

javascript
const es = new EventSource(`/jobs/${jobId}/progress`)
es.addEventListener("progress", e => updateProgressBar(JSON.parse(e.data).pct))
es.addEventListener("done",     e => { es.close(); alert("完成!") })

注意事项

与 WebSocket 的选择

SSEWebSocket
方向服务器→客户端双向
协议HTTP/1.1WS
断线重连浏览器自动重试需手动处理
适合场景通知、日志、进度聊天、游戏、协作

gzip 压缩

SSE 是流式响应(StreamBody),gzip() 中间件不对流式响应压缩,无需额外配置。

反向代理

Nginx 需关闭缓冲,否则事件会被攒批后一次性发送:

nginx
proxy_buffering off;
proxy_cache off;
proxy_set_header Connection '';
proxy_http_version 1.1;
chunked_transfer_encoding on;

基于 Apache-2.0 许可证发布