Feat: show stats
This commit is contained in:
51
bot.go
51
bot.go
@@ -46,6 +46,7 @@ var (
|
||||
ragger *rag.RAG
|
||||
chunkParser ChunkParser
|
||||
lastToolCall *models.FuncCall
|
||||
lastRespStats *models.ResponseStats
|
||||
//nolint:unused // TTS_ENABLED conditionally uses this
|
||||
orator Orator
|
||||
asr STT
|
||||
@@ -532,6 +533,19 @@ func extractDetailedErrorFromBytes(body []byte, statusCode int) string {
|
||||
return fmt.Sprintf("HTTP Status: %d, Response Body: %s", statusCode, string(body))
|
||||
}
|
||||
|
||||
func finalizeRespStats(tokenCount int, startTime time.Time) {
|
||||
duration := time.Since(startTime).Seconds()
|
||||
var tps float64
|
||||
if duration > 0 {
|
||||
tps = float64(tokenCount) / duration
|
||||
}
|
||||
lastRespStats = &models.ResponseStats{
|
||||
Tokens: tokenCount,
|
||||
Duration: duration,
|
||||
TokensPerSec: tps,
|
||||
}
|
||||
}
|
||||
|
||||
// sendMsgToLLM expects streaming resp
|
||||
func sendMsgToLLM(body io.Reader) {
|
||||
choseChunkParser()
|
||||
@@ -590,6 +604,8 @@ func sendMsgToLLM(body io.Reader) {
|
||||
defer resp.Body.Close()
|
||||
reader := bufio.NewReader(resp.Body)
|
||||
counter := uint32(0)
|
||||
tokenCount := 0
|
||||
startTime := time.Now()
|
||||
hasReasoning := false
|
||||
reasoningSent := false
|
||||
for {
|
||||
@@ -601,6 +617,7 @@ func sendMsgToLLM(body io.Reader) {
|
||||
// to stop from spiriling in infinity read of bad bytes that happens with poor connection
|
||||
if cfg.ChunkLimit > 0 && counter > cfg.ChunkLimit {
|
||||
logger.Warn("response hit chunk limit", "limit", cfg.ChunkLimit)
|
||||
finalizeRespStats(tokenCount, startTime)
|
||||
streamDone <- true
|
||||
break
|
||||
}
|
||||
@@ -624,6 +641,7 @@ func sendMsgToLLM(body io.Reader) {
|
||||
logger.Error("failed to notify", "error", err)
|
||||
}
|
||||
}
|
||||
finalizeRespStats(tokenCount, startTime)
|
||||
streamDone <- true
|
||||
break
|
||||
// }
|
||||
@@ -639,6 +657,7 @@ func sendMsgToLLM(body io.Reader) {
|
||||
line = line[6:]
|
||||
logger.Debug("debugging resp", "line", string(line))
|
||||
if bytes.Equal(line, []byte("[DONE]\n")) {
|
||||
finalizeRespStats(tokenCount, startTime)
|
||||
streamDone <- true
|
||||
break
|
||||
}
|
||||
@@ -652,6 +671,7 @@ func sendMsgToLLM(body io.Reader) {
|
||||
if err := notifyUser("LLM Response Error", "Failed to parse LLM response: "+err.Error()); err != nil {
|
||||
logger.Error("failed to notify user", "error", err)
|
||||
}
|
||||
finalizeRespStats(tokenCount, startTime)
|
||||
streamDone <- true
|
||||
break
|
||||
}
|
||||
@@ -667,12 +687,15 @@ func sendMsgToLLM(body io.Reader) {
|
||||
// Close the thinking block if we were streaming reasoning and haven't closed it yet
|
||||
if hasReasoning && !reasoningSent {
|
||||
chunkChan <- "</think>"
|
||||
tokenCount++
|
||||
}
|
||||
if chunk.Chunk != "" {
|
||||
logger.Warn("text inside of finish llmchunk", "chunk", chunk, "counter", counter)
|
||||
answerText = strings.ReplaceAll(chunk.Chunk, "\n\n", "\n")
|
||||
chunkChan <- answerText
|
||||
tokenCount++
|
||||
}
|
||||
finalizeRespStats(tokenCount, startTime)
|
||||
streamDone <- true
|
||||
break
|
||||
}
|
||||
@@ -684,12 +707,14 @@ func sendMsgToLLM(body io.Reader) {
|
||||
if !hasReasoning {
|
||||
// First reasoning chunk - send opening tag
|
||||
chunkChan <- "<think>"
|
||||
tokenCount++
|
||||
hasReasoning = true
|
||||
}
|
||||
// Stream reasoning content immediately
|
||||
answerText = strings.ReplaceAll(chunk.Reasoning, "\n\n", "\n")
|
||||
if answerText != "" {
|
||||
chunkChan <- answerText
|
||||
tokenCount++
|
||||
}
|
||||
}
|
||||
|
||||
@@ -697,6 +722,7 @@ func sendMsgToLLM(body io.Reader) {
|
||||
if chunk.Chunk != "" && hasReasoning && !reasoningSent {
|
||||
// Close the thinking block before sending actual content
|
||||
chunkChan <- "</think>"
|
||||
tokenCount++
|
||||
reasoningSent = true
|
||||
}
|
||||
|
||||
@@ -708,10 +734,12 @@ func sendMsgToLLM(body io.Reader) {
|
||||
if chunkParser.GetAPIType() == models.APITypeCompletion &&
|
||||
slices.Contains(stopStrings, answerText) {
|
||||
logger.Debug("stop string detected on client side for completion endpoint", "stop_string", answerText)
|
||||
finalizeRespStats(tokenCount, startTime)
|
||||
streamDone <- true
|
||||
}
|
||||
if answerText != "" {
|
||||
chunkChan <- answerText
|
||||
tokenCount++
|
||||
}
|
||||
openAIToolChan <- chunk.ToolChunk
|
||||
if chunk.FuncName != "" {
|
||||
@@ -723,6 +751,7 @@ func sendMsgToLLM(body io.Reader) {
|
||||
if interruptResp { // read bytes, so it would not get into beginning of the next req
|
||||
interruptResp = false
|
||||
logger.Info("interrupted bot response", "chunk_counter", counter)
|
||||
finalizeRespStats(tokenCount, startTime)
|
||||
streamDone <- true
|
||||
break
|
||||
}
|
||||
@@ -914,7 +943,6 @@ out:
|
||||
textView.ScrollToEnd()
|
||||
}
|
||||
case <-streamDone:
|
||||
// drain any remaining chunks from chunkChan before exiting
|
||||
for len(chunkChan) > 0 {
|
||||
chunk := <-chunkChan
|
||||
fmt.Fprint(textView, chunk)
|
||||
@@ -923,31 +951,40 @@ out:
|
||||
textView.ScrollToEnd()
|
||||
}
|
||||
if cfg.TTS_ENABLED {
|
||||
// Send chunk to audio stream handler
|
||||
TTSTextChan <- chunk
|
||||
}
|
||||
}
|
||||
if cfg.TTS_ENABLED {
|
||||
// msg is done; flush it down
|
||||
TTSFlushChan <- true
|
||||
}
|
||||
break out
|
||||
}
|
||||
}
|
||||
var msgStats *models.ResponseStats
|
||||
if lastRespStats != nil {
|
||||
msgStats = &models.ResponseStats{
|
||||
Tokens: lastRespStats.Tokens,
|
||||
Duration: lastRespStats.Duration,
|
||||
TokensPerSec: lastRespStats.TokensPerSec,
|
||||
}
|
||||
lastRespStats = nil
|
||||
}
|
||||
botRespMode = false
|
||||
// numbers in chatbody and displayed must be the same
|
||||
if r.Resume {
|
||||
chatBody.Messages[len(chatBody.Messages)-1].Content += respText.String()
|
||||
// lastM.Content = lastM.Content + respText.String()
|
||||
// Process the updated message to check for known_to tags in resumed response
|
||||
updatedMsg := chatBody.Messages[len(chatBody.Messages)-1]
|
||||
processedMsg := processMessageTag(&updatedMsg)
|
||||
chatBody.Messages[len(chatBody.Messages)-1] = *processedMsg
|
||||
if msgStats != nil && chatBody.Messages[len(chatBody.Messages)-1].Role != cfg.ToolRole {
|
||||
chatBody.Messages[len(chatBody.Messages)-1].Stats = msgStats
|
||||
}
|
||||
} else {
|
||||
// Message was already added at the start, just process it for known_to tags
|
||||
chatBody.Messages[msgIdx].Content = respText.String()
|
||||
processedMsg := processMessageTag(&chatBody.Messages[msgIdx])
|
||||
chatBody.Messages[msgIdx] = *processedMsg
|
||||
if msgStats != nil && chatBody.Messages[msgIdx].Role != cfg.ToolRole {
|
||||
chatBody.Messages[msgIdx].Stats = msgStats
|
||||
}
|
||||
stopTTSIfNotForUser(&chatBody.Messages[msgIdx])
|
||||
}
|
||||
cleanChatBody()
|
||||
|
||||
Reference in New Issue
Block a user