Fix(tts): mutex use

This commit is contained in:
Grail Finder
2026-02-02 11:11:07 +03:00
parent 0e6d2747cd
commit fcb4b99332

View File

@@ -16,6 +16,7 @@ import (
"regexp" "regexp"
"strings" "strings"
"time" "time"
"sync"
google_translate_tts "github.com/GrailFinder/google-translate-tts" google_translate_tts "github.com/GrailFinder/google-translate-tts"
"github.com/GrailFinder/google-translate-tts/handlers" "github.com/GrailFinder/google-translate-tts/handlers"
@@ -77,6 +78,7 @@ type Orator interface {
// impl https://github.com/remsky/Kokoro-FastAPI // impl https://github.com/remsky/Kokoro-FastAPI
type KokoroOrator struct { type KokoroOrator struct {
logger *slog.Logger logger *slog.Logger
mu sync.Mutex
URL string URL string
Format models.AudioFormat Format models.AudioFormat
Stream bool Stream bool
@@ -93,6 +95,7 @@ type KokoroOrator struct {
// Google Translate TTS implementation // Google Translate TTS implementation
type GoogleTranslateOrator struct { type GoogleTranslateOrator struct {
logger *slog.Logger logger *slog.Logger
mu sync.Mutex
speech *google_translate_tts.Speech speech *google_translate_tts.Speech
currentStream *beep.Ctrl currentStream *beep.Ctrl
currentDone chan bool currentDone chan bool
@@ -109,6 +112,7 @@ func (o *KokoroOrator) stoproutine() {
for len(TTSTextChan) > 0 { for len(TTSTextChan) > 0 {
<-TTSTextChan <-TTSTextChan
} }
o.mu.Lock()
o.textBuffer.Reset() o.textBuffer.Reset()
if o.currentDone != nil { if o.currentDone != nil {
select { select {
@@ -118,6 +122,7 @@ func (o *KokoroOrator) stoproutine() {
} }
} }
o.interrupt = true o.interrupt = true
o.mu.Unlock()
} }
} }
@@ -128,21 +133,24 @@ func (o *KokoroOrator) readroutine() {
for { for {
select { select {
case chunk := <-TTSTextChan: case chunk := <-TTSTextChan:
o.mu.Lock()
o.interrupt = false o.interrupt = false
// sentenceBuf.WriteString(chunk)
// text := sentenceBuf.String()
_, err := o.textBuffer.WriteString(chunk) _, err := o.textBuffer.WriteString(chunk)
if err != nil { if err != nil {
o.logger.Warn("failed to write to stringbuilder", "error", err) o.logger.Warn("failed to write to stringbuilder", "error", err)
o.mu.Unlock()
continue continue
} }
text := o.textBuffer.String() text := o.textBuffer.String()
o.mu.Unlock()
sentences := tokenizer.Tokenize(text) sentences := tokenizer.Tokenize(text)
o.logger.Debug("adding chunk", "chunk", chunk, "text", text, "sen-len", len(sentences)) o.logger.Debug("adding chunk", "chunk", chunk, "text", text, "sen-len", len(sentences))
for i, sentence := range sentences { for i, sentence := range sentences {
if i == len(sentences)-1 { // last sentence if i == len(sentences)-1 { // last sentence
o.mu.Lock()
o.textBuffer.Reset() o.textBuffer.Reset()
_, err := o.textBuffer.WriteString(sentence.Text) _, err := o.textBuffer.WriteString(sentence.Text)
o.mu.Unlock()
if err != nil { if err != nil {
o.logger.Warn("failed to write to stringbuilder", "error", err) o.logger.Warn("failed to write to stringbuilder", "error", err)
continue continue
@@ -163,7 +171,9 @@ func (o *KokoroOrator) readroutine() {
// lln is done get the whole message out // lln is done get the whole message out
if len(TTSTextChan) > 0 { // otherwise might get stuck if len(TTSTextChan) > 0 { // otherwise might get stuck
for chunk := range TTSTextChan { for chunk := range TTSTextChan {
o.mu.Lock()
_, err := o.textBuffer.WriteString(chunk) _, err := o.textBuffer.WriteString(chunk)
o.mu.Unlock()
if err != nil { if err != nil {
o.logger.Warn("failed to write to stringbuilder", "error", err) o.logger.Warn("failed to write to stringbuilder", "error", err)
continue continue
@@ -174,16 +184,21 @@ func (o *KokoroOrator) readroutine() {
} }
} }
// flush remaining text // flush remaining text
o.mu.Lock()
remaining := o.textBuffer.String() remaining := o.textBuffer.String()
remaining = cleanText(remaining) remaining = cleanText(remaining)
o.textBuffer.Reset() o.textBuffer.Reset()
o.mu.Unlock()
if remaining == "" { if remaining == "" {
continue continue
} }
o.logger.Debug("calling Speak with remainder", "rem", remaining) o.logger.Debug("calling Speak with remainder", "rem", remaining)
sentencesRem := tokenizer.Tokenize(remaining) sentencesRem := tokenizer.Tokenize(remaining)
for _, rs := range sentencesRem { // to avoid dumping large volume of text for _, rs := range sentencesRem { // to avoid dumping large volume of text
if o.interrupt { o.mu.Lock()
interrupt := o.interrupt
o.mu.Unlock()
if interrupt {
break break
} }
if err := o.Speak(rs.Text); err != nil { if err := o.Speak(rs.Text); err != nil {
@@ -240,6 +255,9 @@ func (o *KokoroOrator) GetLogger() *slog.Logger {
} }
func (o *KokoroOrator) requestSound(text string) (io.ReadCloser, error) { func (o *KokoroOrator) requestSound(text string) (io.ReadCloser, error) {
if o.URL == "" {
return nil, fmt.Errorf("TTS URL is empty")
}
payload := map[string]interface{}{ payload := map[string]interface{}{
"input": text, "input": text,
"voice": o.Voice, "voice": o.Voice,
@@ -291,14 +309,18 @@ func (o *KokoroOrator) Speak(text string) error {
o.logger.Debug("failed to init speaker", "error", err) o.logger.Debug("failed to init speaker", "error", err)
} }
done := make(chan bool) done := make(chan bool)
o.mu.Lock()
o.currentDone = done o.currentDone = done
// Create controllable stream and store reference
o.currentStream = &beep.Ctrl{Streamer: beep.Seq(streamer, beep.Callback(func() { o.currentStream = &beep.Ctrl{Streamer: beep.Seq(streamer, beep.Callback(func() {
o.mu.Lock()
close(done) close(done)
o.currentStream = nil o.currentStream = nil
o.currentDone = nil
o.mu.Unlock()
})), Paused: false} })), Paused: false}
o.mu.Unlock()
speaker.Play(o.currentStream) speaker.Play(o.currentStream)
<-o.currentDone <-done
return nil return nil
} }
@@ -307,6 +329,8 @@ func (o *KokoroOrator) Stop() {
o.logger.Debug("attempted to stop orator", "orator", o) o.logger.Debug("attempted to stop orator", "orator", o)
speaker.Lock() speaker.Lock()
defer speaker.Unlock() defer speaker.Unlock()
o.mu.Lock()
defer o.mu.Unlock()
if o.currentStream != nil { if o.currentStream != nil {
// o.currentStream.Paused = true // o.currentStream.Paused = true
o.currentStream.Streamer = nil o.currentStream.Streamer = nil
@@ -322,6 +346,7 @@ func (o *GoogleTranslateOrator) stoproutine() {
for len(TTSTextChan) > 0 { for len(TTSTextChan) > 0 {
<-TTSTextChan <-TTSTextChan
} }
o.mu.Lock()
o.textBuffer.Reset() o.textBuffer.Reset()
if o.currentDone != nil { if o.currentDone != nil {
select { select {
@@ -331,6 +356,7 @@ func (o *GoogleTranslateOrator) stoproutine() {
} }
} }
o.interrupt = true o.interrupt = true
o.mu.Unlock()
} }
} }
@@ -339,19 +365,24 @@ func (o *GoogleTranslateOrator) readroutine() {
for { for {
select { select {
case chunk := <-TTSTextChan: case chunk := <-TTSTextChan:
o.mu.Lock()
o.interrupt = false o.interrupt = false
_, err := o.textBuffer.WriteString(chunk) _, err := o.textBuffer.WriteString(chunk)
if err != nil { if err != nil {
o.logger.Warn("failed to write to stringbuilder", "error", err) o.logger.Warn("failed to write to stringbuilder", "error", err)
o.mu.Unlock()
continue continue
} }
text := o.textBuffer.String() text := o.textBuffer.String()
o.mu.Unlock()
sentences := tokenizer.Tokenize(text) sentences := tokenizer.Tokenize(text)
o.logger.Debug("adding chunk", "chunk", chunk, "text", text, "sen-len", len(sentences)) o.logger.Debug("adding chunk", "chunk", chunk, "text", text, "sen-len", len(sentences))
for i, sentence := range sentences { for i, sentence := range sentences {
if i == len(sentences)-1 { // last sentence if i == len(sentences)-1 { // last sentence
o.mu.Lock()
o.textBuffer.Reset() o.textBuffer.Reset()
_, err := o.textBuffer.WriteString(sentence.Text) _, err := o.textBuffer.WriteString(sentence.Text)
o.mu.Unlock()
if err != nil { if err != nil {
o.logger.Warn("failed to write to stringbuilder", "error", err) o.logger.Warn("failed to write to stringbuilder", "error", err)
continue continue
@@ -372,7 +403,9 @@ func (o *GoogleTranslateOrator) readroutine() {
// lln is done get the whole message out // lln is done get the whole message out
if len(TTSTextChan) > 0 { // otherwise might get stuck if len(TTSTextChan) > 0 { // otherwise might get stuck
for chunk := range TTSTextChan { for chunk := range TTSTextChan {
o.mu.Lock()
_, err := o.textBuffer.WriteString(chunk) _, err := o.textBuffer.WriteString(chunk)
o.mu.Unlock()
if err != nil { if err != nil {
o.logger.Warn("failed to write to stringbuilder", "error", err) o.logger.Warn("failed to write to stringbuilder", "error", err)
continue continue
@@ -382,16 +415,21 @@ func (o *GoogleTranslateOrator) readroutine() {
} }
} }
} }
o.mu.Lock()
remaining := o.textBuffer.String() remaining := o.textBuffer.String()
remaining = cleanText(remaining) remaining = cleanText(remaining)
o.textBuffer.Reset() o.textBuffer.Reset()
o.mu.Unlock()
if remaining == "" { if remaining == "" {
continue continue
} }
o.logger.Debug("calling Speak with remainder", "rem", remaining) o.logger.Debug("calling Speak with remainder", "rem", remaining)
sentencesRem := tokenizer.Tokenize(remaining) sentencesRem := tokenizer.Tokenize(remaining)
for _, rs := range sentencesRem { // to avoid dumping large volume of text for _, rs := range sentencesRem { // to avoid dumping large volume of text
if o.interrupt { o.mu.Lock()
interrupt := o.interrupt
o.mu.Unlock()
if interrupt {
break break
} }
if err := o.Speak(rs.Text); err != nil { if err := o.Speak(rs.Text); err != nil {
@@ -434,14 +472,18 @@ func (o *GoogleTranslateOrator) Speak(text string) error {
o.logger.Debug("failed to init speaker", "error", err) o.logger.Debug("failed to init speaker", "error", err)
} }
done := make(chan bool) done := make(chan bool)
o.mu.Lock()
o.currentDone = done o.currentDone = done
// Create controllable stream and store reference
o.currentStream = &beep.Ctrl{Streamer: beep.Seq(playbackStreamer, beep.Callback(func() { o.currentStream = &beep.Ctrl{Streamer: beep.Seq(playbackStreamer, beep.Callback(func() {
o.mu.Lock()
close(done) close(done)
o.currentStream = nil o.currentStream = nil
o.currentDone = nil
o.mu.Unlock()
})), Paused: false} })), Paused: false}
o.mu.Unlock()
speaker.Play(o.currentStream) speaker.Play(o.currentStream)
<-o.currentDone // wait for playback to complete <-done // wait for playback to complete
return nil return nil
} }
@@ -449,6 +491,8 @@ func (o *GoogleTranslateOrator) Stop() {
o.logger.Debug("attempted to stop google translate orator") o.logger.Debug("attempted to stop google translate orator")
speaker.Lock() speaker.Lock()
defer speaker.Unlock() defer speaker.Unlock()
o.mu.Lock()
defer o.mu.Unlock()
if o.currentStream != nil { if o.currentStream != nil {
o.currentStream.Streamer = nil o.currentStream.Streamer = nil
} }