WebSocket
ACE 的 WebSocket 支持基于 ace-websocket 模块,使用 @WsController 宏将一个类声明为 WebSocket 端点,框架自动处理 HTTP Upgrade 握手、帧收发循环,开发者只需实现生命周期回调。
依赖
在成员模块的 cjpm.toml 中引入 ace-websocket:
toml
[dependencies]
ace-websocket = { path = "../../ace-websocket" }快速上手
cangjie
import ace_framework.*
import ace_framework_macros.*
import ace_websocket.*
@WsController["/ws/echo"]
public class EchoController {
@OnOpen
public func onOpen(session: WsSession): Unit {
session.send("connected")
}
@OnMessage
public func onMessage(message: String, session: WsSession): Unit {
session.send("echo: ${message}")
}
@OnClose
public func onClose(session: WsSession): Unit {
println("connection closed")
}
}访问 ws://host:8080/ws/echo 即可建立连接。
生命周期宏
| 宏 | 方法签名 | 触发时机 |
|---|---|---|
@OnOpen | func xxx(session: WsSession): Unit | 握手成功、连接建立后 |
@OnMessage | func xxx(message: String, session: WsSession): Unit | 收到文本帧 |
@OnClose | func xxx(session: WsSession): Unit | 连接关闭时 |
@OnError | func xxx(session: WsSession, error: Exception): Unit | 帧循环抛异常时 |
所有回调均为可选,未标注则使用空实现,只重写需要的即可。
参数顺序
@OnMessage 的方法签名是 message 在前,session 在后:
cangjie
func onMessage(message: String, session: WsSession): UnitWsSession API
WsSession 封装单个 WebSocket 连接:
| 方法 | 说明 |
|---|---|
send(text: String) | 发送文本帧 |
sendBytes(data: Array<UInt8>) | 发送二进制帧 |
close() | 主动关闭连接(发送 Close 帧) |
广播示例
框架不内置广播机制,推荐在 @WsController 类中维护线程安全的 Session 集合。由于 WsSession 无内置 ID,用对象引用直接管理:
cangjie
import ace_framework.*
import ace_framework_macros.*
import ace_websocket.*
import std.sync.*
import std.collection.*
@WsController["/ws/broadcast"]
public class BroadcastController {
var sessions: ArrayList<WsSession> = ArrayList()
let mu: ReentrantMutex = ReentrantMutex()
@OnOpen
public func onOpen(session: WsSession): Unit {
synchronized(mu) {
sessions.append(session)
}
broadcast("新用户加入")
}
@OnMessage
public func onMessage(message: String, session: WsSession): Unit {
broadcast(message)
}
@OnClose
public func onClose(session: WsSession): Unit {
synchronized(mu) {
// 按引用相等移除
var i = 0
while (i < sessions.size) {
if (sessions[i] === session) {
sessions.remove(i)
break
}
i++
}
}
}
@OnError
public func onError(session: WsSession, error: Exception): Unit {
println("ws error: ${error}")
}
private func broadcast(msg: String): Unit {
let snapshot: Array<WsSession>
synchronized(mu) {
snapshot = sessions.toArray()
}
for (s in snapshot) {
try {
s.send(msg)
} catch (_: Exception) {
()
}
}
}
}@WsController 是单例
@WsController 标注的类被框架以 Singleton 方式实例化,因此 sessions 字段在所有连接间共享,加锁是必要的。
聊天室示例
cangjie
import ace_framework.*
import ace_framework_macros.*
import ace_websocket.*
import std.sync.*
import std.collection.*
@WsController["/ws/chat"]
public class ChatRoomController {
var rooms: HashMap<String, ArrayList<WsSession>> = HashMap()
var names: HashMap<WsSession, String> = HashMap()
let mu: ReentrantMutex = ReentrantMutex()
@OnOpen
public func onOpen(session: WsSession): Unit {
let name = "guest-${rooms.size}"
synchronized(mu) {
names[session] = name
if (!rooms.contains("default")) {
rooms["default"] = ArrayList()
}
rooms["default"]!.append(session)
}
session.send("{\"type\":\"welcome\",\"name\":\"${name}\"}")
broadcastRoom("default", "{\"type\":\"join\",\"name\":\"${name}\"}", exclude: session)
}
@OnMessage
public func onMessage(message: String, session: WsSession): Unit {
let name: String
synchronized(mu) {
name = names[session] ?? "unknown"
}
broadcastRoom("default", "{\"type\":\"msg\",\"from\":\"${name}\",\"text\":\"${jsonEscape(message)}\"}")
}
@OnClose
public func onClose(session: WsSession): Unit {
let name: String
synchronized(mu) {
name = names[session] ?? "unknown"
names.remove(session)
let list = rooms["default"]
if (list != None) {
var i = 0
while (i < list!.size) {
if (list![i] === session) { list!.remove(i); break }
i++
}
}
}
broadcastRoom("default", "{\"type\":\"leave\",\"name\":\"${name}\"}")
}
private func broadcastRoom(room: String, msg: String, exclude: ?WsSession = None): Unit {
let list: Array<WsSession>
synchronized(mu) {
list = rooms[room]?.toArray() ?? []
}
for (s in list) {
if (exclude == None || !(s === exclude!)) {
try { s.send(msg) } catch (_: Exception) { () }
}
}
}
}客户端(浏览器):
javascript
const ws = new WebSocket("ws://localhost:8080/ws/chat")
ws.onmessage = e => console.log(JSON.parse(e.data))
ws.send("大家好!")与 HTTP 路由共存
WebSocket 端点与 HTTP 路由共用同一端口,框架在洋葱之前根据 Upgrade: websocket 请求头自动分流,WebSocket 请求不经过洋葱中间件(含认证中间件):
cangjie
main(): Int64 {
// @WsController 自动注册,无需手动 use
AceApplication.run()
return 0
}鉴权注意事项
WebSocket 升级请求在洋葱之前拦截,因此 JWT 中间件等洋葱鉴权对 WS 无效。如需鉴权,在 @OnOpen 内读取握手请求的 Query 参数携带的 token 并手动验证:
cangjie
@OnOpen
public func onOpen(session: WsSession): Unit {
// 握手阶段的 Query 参数暂不可直接从 session 读取;
// 建议在 WebSocket URL 中携带 token 并在连接后首帧验证
}反向代理
Nginx 需额外配置才能转发 Upgrade 头:
nginx
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";