From 32805849d683fb3347650173932d4f1f4ac613b4 Mon Sep 17 00:00:00 2001 From: xujiantop-crypto Date: Fri, 5 Jun 2026 12:18:57 +0800 Subject: [PATCH] fix: reuse stream scanner buffer in channel handlers (#5225) --- relay/channel/cloudflare/relay_cloudflare.go | 2 +- relay/channel/cohere/relay-cohere.go | 6 ++++-- relay/channel/coze/relay-coze.go | 2 +- relay/channel/ollama/relay-ollama.go | 4 ++-- relay/channel/ollama/stream.go | 3 +-- relay/channel/tencent/relay-tencent.go | 2 +- relay/channel/zhipu/relay-zhipu.go | 5 ++++- relay/helper/stream_scanner.go | 9 +++++++-- relay/helper/stream_scanner_test.go | 17 +++++++++++++++++ 9 files changed, 38 insertions(+), 12 deletions(-) diff --git a/relay/channel/cloudflare/relay_cloudflare.go b/relay/channel/cloudflare/relay_cloudflare.go index a543c8fd..589ff126 100644 --- a/relay/channel/cloudflare/relay_cloudflare.go +++ b/relay/channel/cloudflare/relay_cloudflare.go @@ -30,7 +30,7 @@ func convertCf2CompletionsRequest(textRequest dto.GeneralOpenAIRequest) *CfReque } func cfStreamHandler(c *gin.Context, info *relaycommon.RelayInfo, resp *http.Response) (*types.NewAPIError, *dto.Usage) { - scanner := bufio.NewScanner(resp.Body) + scanner := helper.NewStreamScanner(resp.Body) scanner.Split(bufio.ScanLines) helper.SetEventStreamHeaders(c) diff --git a/relay/channel/cohere/relay-cohere.go b/relay/channel/cohere/relay-cohere.go index c205e106..7b47789f 100644 --- a/relay/channel/cohere/relay-cohere.go +++ b/relay/channel/cohere/relay-cohere.go @@ -1,7 +1,6 @@ package cohere import ( - "bufio" "encoding/json" "io" "net/http" @@ -86,7 +85,7 @@ func cohereStreamHandler(c *gin.Context, info *relaycommon.RelayInfo, resp *http createdTime := common.GetTimestamp() usage := &dto.Usage{} responseText := "" - scanner := bufio.NewScanner(resp.Body) + scanner := helper.NewStreamScanner(resp.Body) scanner.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) { if atEOF && len(data) == 0 { return 0, nil, nil @@ -106,6 +105,9 @@ func cohereStreamHandler(c *gin.Context, info *relaycommon.RelayInfo, resp *http data := scanner.Text() dataChan <- data } + if err := scanner.Err(); err != nil { + common.SysLog("error reading stream: " + err.Error()) + } stopChan <- true }() helper.SetEventStreamHeaders(c) diff --git a/relay/channel/coze/relay-coze.go b/relay/channel/coze/relay-coze.go index 69ebd8a6..c2db5c60 100644 --- a/relay/channel/coze/relay-coze.go +++ b/relay/channel/coze/relay-coze.go @@ -98,7 +98,7 @@ func cozeChatHandler(c *gin.Context, info *relaycommon.RelayInfo, resp *http.Res } func cozeChatStreamHandler(c *gin.Context, info *relaycommon.RelayInfo, resp *http.Response) (*dto.Usage, *types.NewAPIError) { - scanner := bufio.NewScanner(resp.Body) + scanner := helper.NewStreamScanner(resp.Body) scanner.Split(bufio.ScanLines) helper.SetEventStreamHeaders(c) id := helper.GetResponseID(c) diff --git a/relay/channel/ollama/relay-ollama.go b/relay/channel/ollama/relay-ollama.go index 975c244c..06e4d94c 100644 --- a/relay/channel/ollama/relay-ollama.go +++ b/relay/channel/ollama/relay-ollama.go @@ -1,7 +1,6 @@ package ollama import ( - "bufio" "encoding/json" "fmt" "io" @@ -12,6 +11,7 @@ import ( "github.com/QuantumNous/new-api/common" "github.com/QuantumNous/new-api/dto" relaycommon "github.com/QuantumNous/new-api/relay/common" + "github.com/QuantumNous/new-api/relay/helper" "github.com/QuantumNous/new-api/service" "github.com/QuantumNous/new-api/types" @@ -397,7 +397,7 @@ func PullOllamaModelStream(baseURL, apiKey, modelName string, progressCallback f } // 读取流式响应 - scanner := bufio.NewScanner(response.Body) + scanner := helper.NewStreamScanner(response.Body) successful := false for scanner.Scan() { line := scanner.Text() diff --git a/relay/channel/ollama/stream.go b/relay/channel/ollama/stream.go index 43e024de..ee3a8034 100644 --- a/relay/channel/ollama/stream.go +++ b/relay/channel/ollama/stream.go @@ -1,7 +1,6 @@ package ollama import ( - "bufio" "encoding/json" "fmt" "io" @@ -70,7 +69,7 @@ func ollamaStreamHandler(c *gin.Context, info *relaycommon.RelayInfo, resp *http defer service.CloseResponseBodyGracefully(resp) helper.SetEventStreamHeaders(c) - scanner := bufio.NewScanner(resp.Body) + scanner := helper.NewStreamScanner(resp.Body) usage := &dto.Usage{} var model = info.UpstreamModelName var responseId = common.GetUUID() diff --git a/relay/channel/tencent/relay-tencent.go b/relay/channel/tencent/relay-tencent.go index 0343f578..4cda7541 100644 --- a/relay/channel/tencent/relay-tencent.go +++ b/relay/channel/tencent/relay-tencent.go @@ -92,7 +92,7 @@ func streamResponseTencent2OpenAI(TencentResponse *TencentChatResponse) *dto.Cha func tencentStreamHandler(c *gin.Context, info *relaycommon.RelayInfo, resp *http.Response) (*dto.Usage, *types.NewAPIError) { var responseText string - scanner := bufio.NewScanner(resp.Body) + scanner := helper.NewStreamScanner(resp.Body) scanner.Split(bufio.ScanLines) helper.SetEventStreamHeaders(c) diff --git a/relay/channel/zhipu/relay-zhipu.go b/relay/channel/zhipu/relay-zhipu.go index c3c96a05..6754c02c 100644 --- a/relay/channel/zhipu/relay-zhipu.go +++ b/relay/channel/zhipu/relay-zhipu.go @@ -157,7 +157,7 @@ func streamMetaResponseZhipu2OpenAI(zhipuResponse *ZhipuStreamMetaResponse) (*dt func zhipuStreamHandler(c *gin.Context, info *relaycommon.RelayInfo, resp *http.Response) (*dto.Usage, *types.NewAPIError) { var usage *dto.Usage - scanner := bufio.NewScanner(resp.Body) + scanner := helper.NewStreamScanner(resp.Body) scanner.Split(bufio.ScanLines) dataChan := make(chan string) metaChan := make(chan string) @@ -180,6 +180,9 @@ func zhipuStreamHandler(c *gin.Context, info *relaycommon.RelayInfo, resp *http. } } } + if err := scanner.Err(); err != nil { + common.SysLog("error reading stream: " + err.Error()) + } stopChan <- true }() helper.SetEventStreamHeaders(c) diff --git a/relay/helper/stream_scanner.go b/relay/helper/stream_scanner.go index 1d44b804..dbc7c8c4 100644 --- a/relay/helper/stream_scanner.go +++ b/relay/helper/stream_scanner.go @@ -34,6 +34,12 @@ func getScannerBufferSize() int { return DefaultMaxScannerBufferSize } +func NewStreamScanner(reader io.Reader) *bufio.Scanner { + scanner := bufio.NewScanner(reader) + scanner.Buffer(make([]byte, InitialScannerBufferSize), getScannerBufferSize()) + return scanner +} + func StreamScannerHandler(c *gin.Context, resp *http.Response, info *relaycommon.RelayInfo, dataHandler func(data string, sr *StreamResult)) { if resp == nil || dataHandler == nil { @@ -54,7 +60,7 @@ func StreamScannerHandler(c *gin.Context, resp *http.Response, info *relaycommon var ( stopChan = make(chan bool, 3) // 增加缓冲区避免阻塞 - scanner = bufio.NewScanner(resp.Body) + scanner = NewStreamScanner(resp.Body) ticker = time.NewTicker(streamingTimeout) pingTicker *time.Ticker writeMutex sync.Mutex // Mutex to protect concurrent writes @@ -104,7 +110,6 @@ func StreamScannerHandler(c *gin.Context, resp *http.Response, info *relaycommon close(stopChan) }() - scanner.Buffer(make([]byte, InitialScannerBufferSize), getScannerBufferSize()) scanner.Split(bufio.ScanLines) SetEventStreamHeaders(c) diff --git a/relay/helper/stream_scanner_test.go b/relay/helper/stream_scanner_test.go index 9d6f3bb4..d1577266 100644 --- a/relay/helper/stream_scanner_test.go +++ b/relay/helper/stream_scanner_test.go @@ -1,6 +1,7 @@ package helper import ( + "bufio" "fmt" "io" "net/http" @@ -81,6 +82,22 @@ func TestStreamScannerHandler_NilInputs(t *testing.T) { StreamScannerHandler(c, &http.Response{Body: io.NopCloser(strings.NewReader(""))}, info, nil) } +func TestNewStreamScanner_AllowsLargeStreamLine(t *testing.T) { + oldBufferMB := constant.StreamScannerMaxBufferMB + constant.StreamScannerMaxBufferMB = 1 + t.Cleanup(func() { + constant.StreamScannerMaxBufferMB = oldBufferMB + }) + + payload := strings.Repeat("x", 128<<10) + scanner := NewStreamScanner(strings.NewReader("data: " + payload + "\n")) + scanner.Split(bufio.ScanLines) + + require.True(t, scanner.Scan()) + assert.Equal(t, "data: "+payload, scanner.Text()) + require.NoError(t, scanner.Err()) +} + func TestStreamScannerHandler_EmptyBody(t *testing.T) { t.Parallel()