feat(default): add real rankings data

This commit is contained in:
CaIon
2026-05-06 18:20:02 +08:00
parent 0f9f094a48
commit f8cf9c57c4
41 changed files with 1498 additions and 1912 deletions
+4
View File
@@ -47,6 +47,8 @@ func flushCompletedBuckets() {
TotalLatencyMs: drained.totalLatencyMs,
TtftSumMs: drained.ttftSumMs,
TtftCount: drained.ttftCount,
OutputTokens: drained.outputTokens,
GenerationMs: drained.generationMs,
})
if err != nil {
bucket.addCounters(drained)
@@ -82,6 +84,8 @@ func redisCounters(values map[string]string) counters {
totalLatencyMs: parseRedisInt(values["lat"]),
ttftSumMs: parseRedisInt(values["ttft"]),
ttftCount: parseRedisInt(values["ttft_n"]),
outputTokens: parseRedisInt(values["out"]),
generationMs: parseRedisInt(values["gen_ms"]),
}
}
+38 -13
View File
@@ -15,13 +15,15 @@ import (
var hotBuckets sync.Map
// seriesSchema is a stable client cache/schema marker. Do not change it when
// hiding fields or making response-only privacy hardening changes.
const seriesSchema = "dbcd0a3c01b55203"
// Init starts flushLoop in a new goroutine and returns immediately.
// NOTE(review): nothing visible here stops that goroutine or guards against
// repeated calls — confirm Init is invoked exactly once at startup.
func Init() {
go flushLoop()
}
func RecordRelaySample(info *relaycommon.RelayInfo, success bool) {
func RecordRelaySample(info *relaycommon.RelayInfo, success bool, outputTokens int64) {
if info == nil {
return
}
@@ -31,13 +33,23 @@ func RecordRelaySample(info *relaycommon.RelayInfo, success bool) {
if hasTtft {
ttftMs = info.FirstResponseTime.Sub(info.StartTime).Milliseconds()
}
latencyMs := now.Sub(info.StartTime).Milliseconds()
generationMs := latencyMs
if hasTtft {
generationMs = now.Sub(info.FirstResponseTime).Milliseconds()
}
if generationMs <= 0 {
generationMs = latencyMs
}
Record(Sample{
Model: info.OriginModelName,
Group: info.UsingGroup,
LatencyMs: now.Sub(info.StartTime).Milliseconds(),
TtftMs: ttftMs,
HasTtft: hasTtft,
Success: success,
Model: info.OriginModelName,
Group: info.UsingGroup,
LatencyMs: latencyMs,
TtftMs: ttftMs,
HasTtft: hasTtft,
Success: success,
OutputTokens: outputTokens,
GenerationMs: generationMs,
})
}
@@ -89,6 +101,8 @@ func Query(params QueryParams) (QueryResult, error) {
totalLatencyMs: row.TotalLatencyMs,
ttftSumMs: row.TtftSumMs,
ttftCount: row.TtftCount,
outputTokens: row.OutputTokens,
generationMs: row.GenerationMs,
})
}
@@ -125,6 +139,8 @@ func mergeCounters(merged map[bucketKey]counters, key bucketKey, value counters)
current.totalLatencyMs += value.totalLatencyMs
current.ttftSumMs += value.ttftSumMs
current.ttftCount += value.ttftCount
current.outputTokens += value.outputTokens
current.generationMs += value.generationMs
merged[key] = current
}
@@ -166,6 +182,8 @@ func buildQueryResult(modelName string, merged map[bucketKey]counters) QueryResu
total.totalLatencyMs += value.totalLatencyMs
total.ttftSumMs += value.ttftSumMs
total.ttftCount += value.ttftCount
total.outputTokens += value.outputTokens
total.generationMs += value.generationMs
series = append(series, bucketPoint(ts, value))
}
@@ -174,9 +192,7 @@ func buildQueryResult(modelName string, merged map[bucketKey]counters) QueryResu
AvgTtftMs: avg(total.ttftSumMs, total.ttftCount),
AvgLatencyMs: avg(total.totalLatencyMs, total.requestCount),
SuccessRate: successRate(total),
RequestCount: total.requestCount,
SuccessCount: total.successCount,
TtftCount: total.ttftCount,
AvgTps: avgTps(total),
Series: series,
})
}
@@ -194,9 +210,7 @@ func bucketPoint(ts int64, value counters) BucketPoint {
AvgTtftMs: avg(value.ttftSumMs, value.ttftCount),
AvgLatencyMs: avg(value.totalLatencyMs, value.requestCount),
SuccessRate: successRate(value),
Count: value.requestCount,
SuccessCount: value.successCount,
TtftCount: value.ttftCount,
AvgTps: avgTps(value),
}
}
@@ -214,6 +228,13 @@ func successRate(value counters) float64 {
return float64(value.successCount) / float64(value.requestCount) * 100
}
// avgTps reports the average output throughput, in tokens per second, for the
// accumulated counters. It returns 0 unless both the output token total and
// the generation time total are positive, which also rules out a division by
// zero.
func avgTps(value counters) float64 {
	tokens, elapsedMs := value.outputTokens, value.generationMs
	if tokens > 0 && elapsedMs > 0 {
		// Generation time is accumulated in milliseconds; convert to seconds
		// before dividing so the result is tokens per second.
		seconds := float64(elapsedMs) / 1000
		return float64(tokens) / seconds
	}
	return 0
}
func recordRedis(key bucketKey, sample Sample) {
if !common.RedisEnabled || common.RDB == nil {
return
@@ -234,6 +255,10 @@ func recordRedis(key bucketKey, sample Sample) {
pipe.HIncrBy(ctx, redisKey, "ttft", sample.TtftMs)
pipe.HIncrBy(ctx, redisKey, "ttft_n", 1)
}
if sample.OutputTokens > 0 && sample.GenerationMs > 0 {
pipe.HIncrBy(ctx, redisKey, "out", sample.OutputTokens)
pipe.HIncrBy(ctx, redisKey, "gen_ms", sample.GenerationMs)
}
pipe.Expire(ctx, redisKey, time.Hour)
_, _ = pipe.Exec(ctx)
}
+28 -12
View File
@@ -8,12 +8,14 @@ type Store interface {
}
type Sample struct {
Model string
Group string
LatencyMs int64
TtftMs int64
HasTtft bool
Success bool
Model string
Group string
LatencyMs int64
TtftMs int64
HasTtft bool
Success bool
OutputTokens int64
GenerationMs int64
}
type QueryParams struct {
@@ -27,9 +29,7 @@ type BucketPoint struct {
AvgTtftMs int64 `json:"avg_ttft_ms"`
AvgLatencyMs int64 `json:"avg_latency_ms"`
SuccessRate float64 `json:"success_rate"`
Count int64 `json:"count"`
SuccessCount int64 `json:"success_count"`
TtftCount int64 `json:"ttft_count"`
AvgTps float64 `json:"avg_tps"`
}
type GroupResult struct {
@@ -37,9 +37,7 @@ type GroupResult struct {
AvgTtftMs int64 `json:"avg_ttft_ms"`
AvgLatencyMs int64 `json:"avg_latency_ms"`
SuccessRate float64 `json:"success_rate"`
RequestCount int64 `json:"request_count"`
SuccessCount int64 `json:"success_count"`
TtftCount int64 `json:"ttft_count"`
AvgTps float64 `json:"avg_tps"`
Series []BucketPoint `json:"series"`
}
@@ -61,6 +59,8 @@ type counters struct {
totalLatencyMs int64
ttftSumMs int64
ttftCount int64
outputTokens int64
generationMs int64
}
type atomicBucket struct {
@@ -69,6 +69,8 @@ type atomicBucket struct {
totalLatencyMs atomic.Int64
ttftSumMs atomic.Int64
ttftCount atomic.Int64
outputTokens atomic.Int64
generationMs atomic.Int64
}
func (b *atomicBucket) add(sample Sample) {
@@ -83,6 +85,10 @@ func (b *atomicBucket) add(sample Sample) {
b.ttftSumMs.Add(sample.TtftMs)
b.ttftCount.Add(1)
}
if sample.OutputTokens > 0 && sample.GenerationMs > 0 {
b.outputTokens.Add(sample.OutputTokens)
b.generationMs.Add(sample.GenerationMs)
}
}
func (b *atomicBucket) snapshot() counters {
@@ -92,6 +98,8 @@ func (b *atomicBucket) snapshot() counters {
totalLatencyMs: b.totalLatencyMs.Load(),
ttftSumMs: b.ttftSumMs.Load(),
ttftCount: b.ttftCount.Load(),
outputTokens: b.outputTokens.Load(),
generationMs: b.generationMs.Load(),
}
}
@@ -102,6 +110,8 @@ func (b *atomicBucket) drain() counters {
totalLatencyMs: b.totalLatencyMs.Swap(0),
ttftSumMs: b.ttftSumMs.Swap(0),
ttftCount: b.ttftCount.Swap(0),
outputTokens: b.outputTokens.Swap(0),
generationMs: b.generationMs.Swap(0),
}
}
@@ -121,4 +131,10 @@ func (b *atomicBucket) addCounters(c counters) {
if c.ttftCount != 0 {
b.ttftCount.Add(c.ttftCount)
}
if c.outputTokens != 0 {
b.outputTokens.Add(c.outputTokens)
}
if c.generationMs != 0 {
b.generationMs.Add(c.generationMs)
}
}