Skip to content

WebSocket

ACE 的 WebSocket 支持基于 ace-websocket 模块,使用 @WsController 宏将一个类声明为 WebSocket 端点,框架自动处理 HTTP Upgrade 握手、帧收发循环,开发者只需实现生命周期回调。

依赖

在成员模块的 cjpm.toml 中引入 ace-websocket

toml
[dependencies]
ace-websocket = { path = "../../ace-websocket" }

快速上手

cangjie
import ace_framework.*
import ace_framework_macros.*
import ace_websocket.*

@WsController["/ws/echo"]
public class EchoController {
    @OnOpen
    public func onOpen(session: WsSession): Unit {
        session.send("connected")
    }

    @OnMessage
    public func onMessage(message: String, session: WsSession): Unit {
        session.send("echo: ${message}")
    }

    @OnClose
    public func onClose(session: WsSession): Unit {
        println("connection closed")
    }
}

访问 ws://host:8080/ws/echo 即可建立连接。

生命周期宏

方法签名触发时机
@OnOpenfunc xxx(session: WsSession): Unit握手成功、连接建立后
@OnMessagefunc xxx(message: String, session: WsSession): Unit收到文本帧
@OnClosefunc xxx(session: WsSession): Unit连接关闭时
@OnErrorfunc xxx(session: WsSession, error: Exception): Unit帧循环抛异常时

所有回调均为可选,未标注则使用空实现,只重写需要的即可。

参数顺序

@OnMessage 的方法签名是 message 在前,session 在后

cangjie
func onMessage(message: String, session: WsSession): Unit

WsSession API

WsSession 封装单个 WebSocket 连接:

方法说明
send(text: String)发送文本帧
sendBytes(data: Array<UInt8>)发送二进制帧
close()主动关闭连接(发送 Close 帧)

广播示例

框架不内置广播机制,推荐在 @WsController 类中维护线程安全的 Session 集合。由于 WsSession 无内置 ID,用对象引用直接管理:

cangjie
import ace_framework.*
import ace_framework_macros.*
import ace_websocket.*
import std.sync.*
import std.collection.*

@WsController["/ws/broadcast"]
public class BroadcastController {
    var sessions: ArrayList<WsSession> = ArrayList()
    let mu: ReentrantMutex = ReentrantMutex()

    @OnOpen
    public func onOpen(session: WsSession): Unit {
        synchronized(mu) {
            sessions.append(session)
        }
        broadcast("新用户加入")
    }

    @OnMessage
    public func onMessage(message: String, session: WsSession): Unit {
        broadcast(message)
    }

    @OnClose
    public func onClose(session: WsSession): Unit {
        synchronized(mu) {
            // 按引用相等移除
            var i = 0
            while (i < sessions.size) {
                if (sessions[i] === session) {
                    sessions.remove(i)
                    break
                }
                i++
            }
        }
    }

    @OnError
    public func onError(session: WsSession, error: Exception): Unit {
        println("ws error: ${error}")
    }

    private func broadcast(msg: String): Unit {
        let snapshot: Array<WsSession>
        synchronized(mu) {
            snapshot = sessions.toArray()
        }
        for (s in snapshot) {
            try {
                s.send(msg)
            } catch (_: Exception) {
                ()
            }
        }
    }
}

@WsController 是单例

@WsController 标注的类被框架以 Singleton 方式实例化,因此 sessions 字段在所有连接间共享,加锁是必要的。

聊天室示例

cangjie
import ace_framework.*
import ace_framework_macros.*
import ace_websocket.*
import std.sync.*
import std.collection.*

@WsController["/ws/chat"]
public class ChatRoomController {
    var rooms: HashMap<String, ArrayList<WsSession>> = HashMap()
    var names: HashMap<WsSession, String> = HashMap()
    let mu: ReentrantMutex = ReentrantMutex()

    @OnOpen
    public func onOpen(session: WsSession): Unit {
        let name = "guest-${rooms.size}"
        synchronized(mu) {
            names[session] = name
            if (!rooms.contains("default")) {
                rooms["default"] = ArrayList()
            }
            rooms["default"]!.append(session)
        }
        session.send("{\"type\":\"welcome\",\"name\":\"${name}\"}")
        broadcastRoom("default", "{\"type\":\"join\",\"name\":\"${name}\"}", exclude: session)
    }

    @OnMessage
    public func onMessage(message: String, session: WsSession): Unit {
        let name: String
        synchronized(mu) {
            name = names[session] ?? "unknown"
        }
        broadcastRoom("default", "{\"type\":\"msg\",\"from\":\"${name}\",\"text\":\"${jsonEscape(message)}\"}")
    }

    @OnClose
    public func onClose(session: WsSession): Unit {
        let name: String
        synchronized(mu) {
            name = names[session] ?? "unknown"
            names.remove(session)
            let list = rooms["default"]
            if (list != None) {
                var i = 0
                while (i < list!.size) {
                    if (list![i] === session) { list!.remove(i); break }
                    i++
                }
            }
        }
        broadcastRoom("default", "{\"type\":\"leave\",\"name\":\"${name}\"}")
    }

    private func broadcastRoom(room: String, msg: String, exclude: ?WsSession = None): Unit {
        let list: Array<WsSession>
        synchronized(mu) {
            list = rooms[room]?.toArray() ?? []
        }
        for (s in list) {
            if (exclude == None || !(s === exclude!)) {
                try { s.send(msg) } catch (_: Exception) { () }
            }
        }
    }
}

客户端(浏览器):

javascript
const ws = new WebSocket("ws://localhost:8080/ws/chat")
ws.onmessage = e => console.log(JSON.parse(e.data))
ws.send("大家好!")

与 HTTP 路由共存

WebSocket 端点与 HTTP 路由共用同一端口,框架在洋葱之前根据 Upgrade: websocket 请求头自动分流,WebSocket 请求不经过洋葱中间件(含认证中间件):

cangjie
main(): Int64 {
    // @WsController 自动注册,无需手动 use
    AceApplication.run()
    return 0
}

鉴权注意事项

WebSocket 升级请求在洋葱之前拦截,因此 JWT 中间件等洋葱鉴权对 WS 无效。如需鉴权,在 @OnOpen 内读取握手请求的 Query 参数携带的 token 并手动验证:

cangjie
@OnOpen
public func onOpen(session: WsSession): Unit {
    // 握手阶段的 Query 参数暂不可直接从 session 读取;
    // 建议在 WebSocket URL 中携带 token 并在连接后首帧验证
}

反向代理

Nginx 需额外配置才能转发 Upgrade 头:

nginx
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";

基于 Apache-2.0 许可证发布