feat: collect model performance metrics (#4635)
This commit is contained in:
@@ -0,0 +1,261 @@
|
||||
package perfmetrics
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/QuantumNous/new-api/common"
|
||||
"github.com/QuantumNous/new-api/model"
|
||||
relaycommon "github.com/QuantumNous/new-api/relay/common"
|
||||
"github.com/QuantumNous/new-api/setting/perf_metrics_setting"
|
||||
)
|
||||
|
||||
var hotBuckets sync.Map
|
||||
|
||||
const seriesSchema = "dbcd0a3c01b55203"
|
||||
|
||||
func Init() {
|
||||
go flushLoop()
|
||||
}
|
||||
|
||||
func RecordRelaySample(info *relaycommon.RelayInfo, success bool) {
|
||||
if info == nil {
|
||||
return
|
||||
}
|
||||
now := time.Now()
|
||||
hasTtft := info.IsStream && info.HasSendResponse()
|
||||
ttftMs := int64(0)
|
||||
if hasTtft {
|
||||
ttftMs = info.FirstResponseTime.Sub(info.StartTime).Milliseconds()
|
||||
}
|
||||
Record(Sample{
|
||||
Model: info.OriginModelName,
|
||||
Group: info.UsingGroup,
|
||||
LatencyMs: now.Sub(info.StartTime).Milliseconds(),
|
||||
TtftMs: ttftMs,
|
||||
HasTtft: hasTtft,
|
||||
Success: success,
|
||||
})
|
||||
}
|
||||
|
||||
func Record(sample Sample) {
|
||||
setting := perf_metrics_setting.GetSetting()
|
||||
if !setting.Enabled || sample.Model == "" {
|
||||
return
|
||||
}
|
||||
if sample.Group == "" {
|
||||
sample.Group = "default"
|
||||
}
|
||||
if sample.LatencyMs < 0 {
|
||||
sample.LatencyMs = 0
|
||||
}
|
||||
|
||||
key := bucketKey{
|
||||
model: sample.Model,
|
||||
group: sample.Group,
|
||||
bucketTs: bucketStart(time.Now().Unix()),
|
||||
}
|
||||
actual, _ := hotBuckets.LoadOrStore(key, &atomicBucket{})
|
||||
actual.(*atomicBucket).add(sample)
|
||||
recordRedis(key, sample)
|
||||
}
|
||||
|
||||
func Query(params QueryParams) (QueryResult, error) {
|
||||
if params.Hours <= 0 {
|
||||
params.Hours = 24
|
||||
}
|
||||
if params.Hours > 24*30 {
|
||||
params.Hours = 24 * 30
|
||||
}
|
||||
endTs := time.Now().Unix()
|
||||
startTs := endTs - int64(params.Hours)*3600
|
||||
|
||||
merged := map[bucketKey]counters{}
|
||||
rows, err := model.GetPerfMetrics(params.Model, params.Group, startTs, endTs)
|
||||
if err != nil {
|
||||
return QueryResult{}, err
|
||||
}
|
||||
for _, row := range rows {
|
||||
mergeCounters(merged, bucketKey{
|
||||
model: row.ModelName,
|
||||
group: row.Group,
|
||||
bucketTs: row.BucketTs,
|
||||
}, counters{
|
||||
requestCount: row.RequestCount,
|
||||
successCount: row.SuccessCount,
|
||||
totalLatencyMs: row.TotalLatencyMs,
|
||||
ttftSumMs: row.TtftSumMs,
|
||||
ttftCount: row.TtftCount,
|
||||
})
|
||||
}
|
||||
|
||||
hotBuckets.Range(func(key, value any) bool {
|
||||
k := key.(bucketKey)
|
||||
if k.model != params.Model || k.bucketTs < startTs || k.bucketTs > endTs {
|
||||
return true
|
||||
}
|
||||
if params.Group != "" && k.group != params.Group {
|
||||
return true
|
||||
}
|
||||
mergeCounters(merged, k, value.(*atomicBucket).snapshot())
|
||||
return true
|
||||
})
|
||||
|
||||
return buildQueryResult(params.Model, merged), nil
|
||||
}
|
||||
|
||||
func bucketStart(ts int64) int64 {
|
||||
bucketSeconds := perf_metrics_setting.GetBucketSeconds()
|
||||
if bucketSeconds <= 0 {
|
||||
bucketSeconds = 3600
|
||||
}
|
||||
return ts - (ts % bucketSeconds)
|
||||
}
|
||||
|
||||
func mergeCounters(merged map[bucketKey]counters, key bucketKey, value counters) {
|
||||
if value.requestCount == 0 {
|
||||
return
|
||||
}
|
||||
current := merged[key]
|
||||
current.requestCount += value.requestCount
|
||||
current.successCount += value.successCount
|
||||
current.totalLatencyMs += value.totalLatencyMs
|
||||
current.ttftSumMs += value.ttftSumMs
|
||||
current.ttftCount += value.ttftCount
|
||||
merged[key] = current
|
||||
}
|
||||
|
||||
func buildQueryResult(modelName string, merged map[bucketKey]counters) QueryResult {
|
||||
groupBuckets := map[string]map[int64]counters{}
|
||||
for key, value := range merged {
|
||||
if value.requestCount == 0 {
|
||||
continue
|
||||
}
|
||||
if _, ok := groupBuckets[key.group]; !ok {
|
||||
groupBuckets[key.group] = map[int64]counters{}
|
||||
}
|
||||
groupBuckets[key.group][key.bucketTs] = value
|
||||
}
|
||||
|
||||
groups := make([]string, 0, len(groupBuckets))
|
||||
for group := range groupBuckets {
|
||||
groups = append(groups, group)
|
||||
}
|
||||
sort.Strings(groups)
|
||||
|
||||
results := make([]GroupResult, 0, len(groups))
|
||||
for _, group := range groups {
|
||||
buckets := groupBuckets[group]
|
||||
timestamps := make([]int64, 0, len(buckets))
|
||||
for ts := range buckets {
|
||||
timestamps = append(timestamps, ts)
|
||||
}
|
||||
sort.Slice(timestamps, func(i, j int) bool {
|
||||
return timestamps[i] < timestamps[j]
|
||||
})
|
||||
|
||||
total := counters{}
|
||||
series := make([]BucketPoint, 0, len(timestamps))
|
||||
for _, ts := range timestamps {
|
||||
value := buckets[ts]
|
||||
total.requestCount += value.requestCount
|
||||
total.successCount += value.successCount
|
||||
total.totalLatencyMs += value.totalLatencyMs
|
||||
total.ttftSumMs += value.ttftSumMs
|
||||
total.ttftCount += value.ttftCount
|
||||
series = append(series, bucketPoint(ts, value))
|
||||
}
|
||||
|
||||
results = append(results, GroupResult{
|
||||
Group: group,
|
||||
AvgTtftMs: avg(total.ttftSumMs, total.ttftCount),
|
||||
AvgLatencyMs: avg(total.totalLatencyMs, total.requestCount),
|
||||
SuccessRate: successRate(total),
|
||||
RequestCount: total.requestCount,
|
||||
SuccessCount: total.successCount,
|
||||
TtftCount: total.ttftCount,
|
||||
Series: series,
|
||||
})
|
||||
}
|
||||
|
||||
return QueryResult{
|
||||
ModelName: modelName,
|
||||
SeriesSchema: seriesSchema,
|
||||
Groups: results,
|
||||
}
|
||||
}
|
||||
|
||||
func bucketPoint(ts int64, value counters) BucketPoint {
|
||||
return BucketPoint{
|
||||
Ts: ts,
|
||||
AvgTtftMs: avg(value.ttftSumMs, value.ttftCount),
|
||||
AvgLatencyMs: avg(value.totalLatencyMs, value.requestCount),
|
||||
SuccessRate: successRate(value),
|
||||
Count: value.requestCount,
|
||||
SuccessCount: value.successCount,
|
||||
TtftCount: value.ttftCount,
|
||||
}
|
||||
}
|
||||
|
||||
func avg(sum int64, count int64) int64 {
|
||||
if count <= 0 {
|
||||
return 0
|
||||
}
|
||||
return sum / count
|
||||
}
|
||||
|
||||
func successRate(value counters) float64 {
|
||||
if value.requestCount <= 0 {
|
||||
return 0
|
||||
}
|
||||
return float64(value.successCount) / float64(value.requestCount) * 100
|
||||
}
|
||||
|
||||
func recordRedis(key bucketKey, sample Sample) {
|
||||
if !common.RedisEnabled || common.RDB == nil {
|
||||
return
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
defer cancel()
|
||||
|
||||
redisKey := redisBucketKey(key)
|
||||
pipe := common.RDB.TxPipeline()
|
||||
pipe.HIncrBy(ctx, redisKey, "req", 1)
|
||||
if sample.Success {
|
||||
pipe.HIncrBy(ctx, redisKey, "ok", 1)
|
||||
}
|
||||
if sample.LatencyMs > 0 {
|
||||
pipe.HIncrBy(ctx, redisKey, "lat", sample.LatencyMs)
|
||||
}
|
||||
if sample.HasTtft && sample.TtftMs >= 0 {
|
||||
pipe.HIncrBy(ctx, redisKey, "ttft", sample.TtftMs)
|
||||
pipe.HIncrBy(ctx, redisKey, "ttft_n", 1)
|
||||
}
|
||||
pipe.Expire(ctx, redisKey, time.Hour)
|
||||
_, _ = pipe.Exec(ctx)
|
||||
}
|
||||
|
||||
func mergeRedisActiveBuckets(merged map[bucketKey]counters, params QueryParams, startTs int64, endTs int64) {
|
||||
if !common.RedisEnabled || common.RDB == nil || params.Model == "" || params.Group == "" {
|
||||
return
|
||||
}
|
||||
active := bucketStart(time.Now().Unix())
|
||||
if active < startTs || active > endTs {
|
||||
return
|
||||
}
|
||||
key := bucketKey{model: params.Model, group: params.Group, bucketTs: active}
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
defer cancel()
|
||||
values, err := common.RDB.HGetAll(ctx, redisBucketKey(key)).Result()
|
||||
if err != nil || len(values) == 0 {
|
||||
return
|
||||
}
|
||||
mergeCounters(merged, key, redisCounters(values))
|
||||
}
|
||||
|
||||
func redisBucketKey(key bucketKey) string {
|
||||
return fmt.Sprintf("perf:%s:%s:%d", key.model, key.group, key.bucketTs)
|
||||
}
|
||||
Reference in New Issue
Block a user