Skip to content

扩展点:ApplicationRunner 与 HealthIndicator

Component 外,ACE 还提供两个轻量扩展点,满足"启动后执行一次"和"健康检查"的常见需求。

ApplicationRunner

ApplicationRunner 接口对标 Spring 的 ApplicationRunner:服务在所有组件 onReady() 之后、开始监听端口之前,按 order 升序依次调用所有实现类的 run() 方法。

cangjie
public interface ApplicationRunner {
    func order(): Int64 { 0 }   // 执行顺序,升序
    func run(): Unit
}

使用方式

@Service 类上实现 ApplicationRunner,框架自动发现并调用:

cangjie
import ace_framework.*
import ace_framework_runtime.*

@Service
public class DatabaseMigrationRunner <: ApplicationRunner {
    public func order(): Int64 { 10 }  // 比默认 0 晚,确保连接池已就绪

    public func run(): Unit {
        println("[Startup] 执行数据库迁移...")
        MigrationManager.runPending()
        println("[Startup] 迁移完成")
    }
}
cangjie
@Service
public class CacheWarmupRunner <: ApplicationRunner {
    @Inject
    var cacheService: CacheService = ...

    public func order(): Int64 { 20 }  // 在迁移之后执行

    public func run(): Unit {
        cacheService.warmup()
    }
}

执行顺序

多个 ApplicationRunnerorder 升序串行执行(非并发):

order=0  → DefaultRunner(若存在)
order=10 → DatabaseMigrationRunner
order=20 → CacheWarmupRunner

端口开始监听

典型用途

场景说明
数据库迁移启动前检查并执行 pending migration
缓存预热提前加载热点数据,避免冷启动时的缓存穿透
外部服务探活验证第三方 API 可达,启动失败快速报错
定时任务初始化注册需要动态配置的任务
配置校验启动时校验必填配置,而非等到第一个请求才发现

TIP

run() 若抛出异常,异常会向上冒泡并导致进程启动失败——这正是"fail-fast"的预期行为。若某个 runner 失败不应中断启动,在 run() 内部自行 try/catch

HealthIndicator

HealthIndicator/health/ready(K8s readiness probe)提供自定义健康检查项。框架聚合所有已注册 HealthIndicator 的状态,任意一个 Down 则端点返回 503

cangjie
public interface HealthIndicator {
    func name(): String
    func check(): HealthStatus
}

HealthStatus 可携带 UpDownUnknown 三种状态及可选的 detail 说明:

cangjie
HealthStatus(Up)
HealthStatus(Down, "connection refused: redis:6379")
HealthStatus(Unknown, "pending initial check")

使用方式

cangjie
import ace_framework_runtime.*

public class RedisHealthIndicator <: HealthIndicator {
    let client: RedisClient

    public init(client: RedisClient) {
        this.client = client
    }

    public func name(): String { "redis" }

    public func check(): HealthStatus {
        try {
            client.ping()
            return HealthStatus(Up)
        } catch (e: Exception) {
            return HealthStatus(Down, e.message)
        }
    }
}

通过 registerHealthIndicator 注册(通常在 Component.setup 中):

cangjie
public func setup(ctx: ComponentContext): Unit {
    let client = RedisClient(host, port)
    ctx.provide("RedisClient", {=> client})
    registerHealthIndicator(RedisHealthIndicator(client))
}

响应格式

GET /health/ready 聚合所有指标:

json
// 全部 UP → 200
{
  "status": "UP",
  "components": {
    "redis":    { "status": "UP" },
    "database": { "status": "UP" }
  }
}

// 任意 DOWN → 503
{
  "status": "DOWN",
  "components": {
    "redis":    { "status": "DOWN", "detail": "connection refused" },
    "database": { "status": "UP" }
  }
}

GET /health/live(liveness probe)不聚合 HealthIndicator,只要进程存活即返回 200 {"status":"UP"}

内置端点一览

路径说明
/health简单存活检查,向后兼容
/health/liveK8s liveness probe(进程存活)
/health/readyK8s readiness probe(聚合 HealthIndicator)

check() 的性能

check() 每次 readiness probe 请求都会调用,不要在其中执行高延迟操作。建议:

  • 简单 ping(< 50ms)直接执行
  • 复杂检查用后台协程定期刷新,check() 只返回缓存状态

完整示例:带健康检查的数据库组件

cangjie
package myapp.components

import ace_framework_runtime.*

@Component
public class DatabaseComponent {
    var pool_: ?ConnectionPool = None

    public func name(): String { "database" }

    public func defaults(): Array<(String, String)> {
        [
            ("db.url",      "sqlite://app.db"),
            ("db.poolSize", "10")
        ]
    }

    public func setup(ctx: ComponentContext): Unit {
        let cfg  = ctx.namespace("db")
        let url  = cfg.getOr("url", "sqlite://app.db")
        let size = cfg.getInt("poolSize", 10)
        let pool = ConnectionPool(url, size)
        pool_ = Some(pool)
        ctx.provide("ConnectionPool", {=> pool})
        registerHealthIndicator(DatabaseHealthIndicator(pool))
    }

    public func onStop(): Unit {
        match (pool_) {
            case Some(p) => p.close()
            case None => ()
        }
    }
}

public class DatabaseHealthIndicator <: HealthIndicator {
    let pool: ConnectionPool

    public init(pool: ConnectionPool) {
        this.pool = pool
    }

    public func name(): String { "database" }

    public func check(): HealthStatus {
        try {
            pool.ping()
            return HealthStatus(Up)
        } catch (e: Exception) {
            return HealthStatus(Down, e.message)
        }
    }
}

// 数据迁移 Runner(在 database 组件就绪后执行)
@Service
public class MigrationRunner <: ApplicationRunner {
    public func order(): Int64 { 5 }

    public func run(): Unit {
        let pool = (resolveBean("ConnectionPool") as ConnectionPool).getOrThrow()
        Migrator(pool).runPending()
    }
}

基于 Apache-2.0 许可证发布