Enha: defer finalizeRespStats
This commit is contained in:
28
bot.go
28
bot.go
@@ -485,30 +485,28 @@ func monitorModelLoad(modelID string) {
|
|||||||
// extractDetailedErrorFromBytes extracts detailed error information from response body bytes
|
// extractDetailedErrorFromBytes extracts detailed error information from response body bytes
|
||||||
func extractDetailedErrorFromBytes(body []byte, statusCode int) string {
|
func extractDetailedErrorFromBytes(body []byte, statusCode int) string {
|
||||||
// Try to parse as JSON to extract detailed error information
|
// Try to parse as JSON to extract detailed error information
|
||||||
var errorResponse map[string]interface{}
|
var errorResponse map[string]any
|
||||||
if err := json.Unmarshal(body, &errorResponse); err == nil {
|
if err := json.Unmarshal(body, &errorResponse); err == nil {
|
||||||
// Check if it's an error response with detailed information
|
// Check if it's an error response with detailed information
|
||||||
if errorData, ok := errorResponse["error"]; ok {
|
if errorData, ok := errorResponse["error"]; ok {
|
||||||
if errorMap, ok := errorData.(map[string]interface{}); ok {
|
if errorMap, ok := errorData.(map[string]any); ok {
|
||||||
var errorMsg string
|
var errorMsg string
|
||||||
if msg, ok := errorMap["message"]; ok {
|
if msg, ok := errorMap["message"]; ok {
|
||||||
errorMsg = fmt.Sprintf("%v", msg)
|
errorMsg = fmt.Sprintf("%v", msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
var details []string
|
var details []string
|
||||||
if code, ok := errorMap["code"]; ok {
|
if code, ok := errorMap["code"]; ok {
|
||||||
details = append(details, fmt.Sprintf("Code: %v", code))
|
details = append(details, fmt.Sprintf("Code: %v", code))
|
||||||
}
|
}
|
||||||
|
|
||||||
if metadata, ok := errorMap["metadata"]; ok {
|
if metadata, ok := errorMap["metadata"]; ok {
|
||||||
// Handle metadata which might contain raw error details
|
// Handle metadata which might contain raw error details
|
||||||
if metadataMap, ok := metadata.(map[string]interface{}); ok {
|
if metadataMap, ok := metadata.(map[string]any); ok {
|
||||||
if raw, ok := metadataMap["raw"]; ok {
|
if raw, ok := metadataMap["raw"]; ok {
|
||||||
// Parse the raw error string if it's JSON
|
// Parse the raw error string if it's JSON
|
||||||
var rawError map[string]interface{}
|
var rawError map[string]any
|
||||||
if rawStr, ok := raw.(string); ok && json.Unmarshal([]byte(rawStr), &rawError) == nil {
|
if rawStr, ok := raw.(string); ok && json.Unmarshal([]byte(rawStr), &rawError) == nil {
|
||||||
if rawErrorData, ok := rawError["error"]; ok {
|
if rawErrorData, ok := rawError["error"]; ok {
|
||||||
if rawErrorMap, ok := rawErrorData.(map[string]interface{}); ok {
|
if rawErrorMap, ok := rawErrorData.(map[string]any); ok {
|
||||||
if rawMsg, ok := rawErrorMap["message"]; ok {
|
if rawMsg, ok := rawErrorMap["message"]; ok {
|
||||||
return fmt.Sprintf("API Error: %s", rawMsg)
|
return fmt.Sprintf("API Error: %s", rawMsg)
|
||||||
}
|
}
|
||||||
@@ -519,16 +517,13 @@ func extractDetailedErrorFromBytes(body []byte, statusCode int) string {
|
|||||||
}
|
}
|
||||||
details = append(details, fmt.Sprintf("Metadata: %v", metadata))
|
details = append(details, fmt.Sprintf("Metadata: %v", metadata))
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(details) > 0 {
|
if len(details) > 0 {
|
||||||
return fmt.Sprintf("API Error: %s (%s)", errorMsg, strings.Join(details, ", "))
|
return fmt.Sprintf("API Error: %s (%s)", errorMsg, strings.Join(details, ", "))
|
||||||
}
|
}
|
||||||
|
|
||||||
return "API Error: " + errorMsg
|
return "API Error: " + errorMsg
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If not a structured error response, return the raw body with status
|
// If not a structured error response, return the raw body with status
|
||||||
return fmt.Sprintf("HTTP Status: %d, Response Body: %s", statusCode, string(body))
|
return fmt.Sprintf("HTTP Status: %d, Response Body: %s", statusCode, string(body))
|
||||||
}
|
}
|
||||||
@@ -600,7 +595,7 @@ func sendMsgToLLM(body io.Reader) {
|
|||||||
streamDone <- true
|
streamDone <- true
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
//
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
reader := bufio.NewReader(resp.Body)
|
reader := bufio.NewReader(resp.Body)
|
||||||
counter := uint32(0)
|
counter := uint32(0)
|
||||||
@@ -608,6 +603,9 @@ func sendMsgToLLM(body io.Reader) {
|
|||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
hasReasoning := false
|
hasReasoning := false
|
||||||
reasoningSent := false
|
reasoningSent := false
|
||||||
|
defer func() {
|
||||||
|
finalizeRespStats(tokenCount, startTime)
|
||||||
|
}()
|
||||||
for {
|
for {
|
||||||
var (
|
var (
|
||||||
answerText string
|
answerText string
|
||||||
@@ -617,7 +615,6 @@ func sendMsgToLLM(body io.Reader) {
|
|||||||
// to stop from spiriling in infinity read of bad bytes that happens with poor connection
|
// to stop from spiriling in infinity read of bad bytes that happens with poor connection
|
||||||
if cfg.ChunkLimit > 0 && counter > cfg.ChunkLimit {
|
if cfg.ChunkLimit > 0 && counter > cfg.ChunkLimit {
|
||||||
logger.Warn("response hit chunk limit", "limit", cfg.ChunkLimit)
|
logger.Warn("response hit chunk limit", "limit", cfg.ChunkLimit)
|
||||||
finalizeRespStats(tokenCount, startTime)
|
|
||||||
streamDone <- true
|
streamDone <- true
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
@@ -641,7 +638,6 @@ func sendMsgToLLM(body io.Reader) {
|
|||||||
logger.Error("failed to notify", "error", err)
|
logger.Error("failed to notify", "error", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
finalizeRespStats(tokenCount, startTime)
|
|
||||||
streamDone <- true
|
streamDone <- true
|
||||||
break
|
break
|
||||||
// }
|
// }
|
||||||
@@ -657,7 +653,6 @@ func sendMsgToLLM(body io.Reader) {
|
|||||||
line = line[6:]
|
line = line[6:]
|
||||||
logger.Debug("debugging resp", "line", string(line))
|
logger.Debug("debugging resp", "line", string(line))
|
||||||
if bytes.Equal(line, []byte("[DONE]\n")) {
|
if bytes.Equal(line, []byte("[DONE]\n")) {
|
||||||
finalizeRespStats(tokenCount, startTime)
|
|
||||||
streamDone <- true
|
streamDone <- true
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
@@ -671,7 +666,6 @@ func sendMsgToLLM(body io.Reader) {
|
|||||||
if err := notifyUser("LLM Response Error", "Failed to parse LLM response: "+err.Error()); err != nil {
|
if err := notifyUser("LLM Response Error", "Failed to parse LLM response: "+err.Error()); err != nil {
|
||||||
logger.Error("failed to notify user", "error", err)
|
logger.Error("failed to notify user", "error", err)
|
||||||
}
|
}
|
||||||
finalizeRespStats(tokenCount, startTime)
|
|
||||||
streamDone <- true
|
streamDone <- true
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
@@ -695,7 +689,6 @@ func sendMsgToLLM(body io.Reader) {
|
|||||||
chunkChan <- answerText
|
chunkChan <- answerText
|
||||||
tokenCount++
|
tokenCount++
|
||||||
}
|
}
|
||||||
finalizeRespStats(tokenCount, startTime)
|
|
||||||
streamDone <- true
|
streamDone <- true
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
@@ -734,8 +727,8 @@ func sendMsgToLLM(body io.Reader) {
|
|||||||
if chunkParser.GetAPIType() == models.APITypeCompletion &&
|
if chunkParser.GetAPIType() == models.APITypeCompletion &&
|
||||||
slices.Contains(stopStrings, answerText) {
|
slices.Contains(stopStrings, answerText) {
|
||||||
logger.Debug("stop string detected on client side for completion endpoint", "stop_string", answerText)
|
logger.Debug("stop string detected on client side for completion endpoint", "stop_string", answerText)
|
||||||
finalizeRespStats(tokenCount, startTime)
|
|
||||||
streamDone <- true
|
streamDone <- true
|
||||||
|
break
|
||||||
}
|
}
|
||||||
if answerText != "" {
|
if answerText != "" {
|
||||||
chunkChan <- answerText
|
chunkChan <- answerText
|
||||||
@@ -751,7 +744,6 @@ func sendMsgToLLM(body io.Reader) {
|
|||||||
if interruptResp { // read bytes, so it would not get into beginning of the next req
|
if interruptResp { // read bytes, so it would not get into beginning of the next req
|
||||||
interruptResp = false
|
interruptResp = false
|
||||||
logger.Info("interrupted bot response", "chunk_counter", counter)
|
logger.Info("interrupted bot response", "chunk_counter", counter)
|
||||||
finalizeRespStats(tokenCount, startTime)
|
|
||||||
streamDone <- true
|
streamDone <- true
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user