2 Commits

Author SHA1 Message Date
Grail Finder
34cd4ac141 Fix: ragflow 2026-02-24 14:24:57 +03:00
Grail Finder
343366b12d Fix: tag tables 2026-02-24 12:28:17 +03:00
2 changed files with 157 additions and 90 deletions

View File

@@ -23,7 +23,6 @@ var (
ErrRAGStatus = "some error occurred; failed to transfer data to vector db"
)
type RAG struct {
logger *slog.Logger
store storage.FullRepo
@@ -122,10 +121,11 @@ func (r *RAG) LoadRAG(fpath string) error {
batchCh = make(chan map[int][]string, maxChSize)
vectorCh = make(chan []models.VectorRow, maxChSize)
errCh = make(chan error, 1)
doneCh = make(chan struct{})
wg = new(sync.WaitGroup)
lock = new(sync.Mutex)
)
defer close(doneCh)
defer close(errCh)
defer close(batchCh)
@@ -156,18 +156,20 @@ func (r *RAG) LoadRAG(fpath string) error {
for w := 0; w < int(r.cfg.RAGWorkers); w++ {
go func(workerID int) {
defer wg.Done()
r.batchToVectorAsync(lock, workerID, batchCh, vectorCh, errCh, path.Base(fpath))
r.batchToVectorAsync(workerID, batchCh, vectorCh, errCh, doneCh, path.Base(fpath))
}(w)
}
// Use a goroutine to close the batchCh when all batches are sent
// Close batchCh to signal workers no more data is coming
close(batchCh)
// Wait for all workers to finish, then close vectorCh
go func() {
wg.Wait()
close(vectorCh) // Close vectorCh when all workers are done
close(vectorCh)
}()
// Check for errors from workers
// Use a non-blocking check for errors
// Check for errors from workers - this will block until an error occurs or all workers finish
select {
case err := <-errCh:
if err != nil {
@@ -179,12 +181,28 @@ func (r *RAG) LoadRAG(fpath string) error {
}
// Write vectors to storage - this will block until vectorCh is closed
return r.writeVectors(vectorCh)
return r.writeVectors(vectorCh, errCh)
}
func (r *RAG) writeVectors(vectorCh chan []models.VectorRow) error {
func (r *RAG) writeVectors(vectorCh chan []models.VectorRow, errCh chan error) error {
// Use a select to handle both vectorCh and errCh
for {
for batch := range vectorCh {
select {
case err := <-errCh:
if err != nil {
r.logger.Error("error during RAG processing in writeVectors", "error", err)
return err
}
case batch, ok := <-vectorCh:
if !ok {
r.logger.Debug("vector channel closed, finished writing vectors")
select {
case LongJobStatusCh <- FinishedRAGStatus:
default:
r.logger.Warn("LongJobStatusCh channel is full or closed, dropping status message", "message", FinishedRAGStatus)
}
return nil
}
for _, vector := range batch {
if err := r.storage.WriteVector(&vector); err != nil {
r.logger.Error("failed to write vector to DB", "error", err, "slug", vector.Slug)
@@ -192,74 +210,57 @@ func (r *RAG) writeVectors(vectorCh chan []models.VectorRow) error {
case LongJobStatusCh <- ErrRAGStatus:
default:
r.logger.Warn("LongJobStatusCh channel is full or closed, dropping status message", "message", ErrRAGStatus)
// Channel is full or closed, ignore the message to prevent panic
}
return err // Stop the entire RAG operation on DB error
return err
}
}
r.logger.Debug("wrote batch to db", "size", len(batch), "vector_chan_len", len(vectorCh))
if len(vectorCh) == 0 {
r.logger.Debug("finished writing vectors")
select {
case LongJobStatusCh <- FinishedRAGStatus:
default:
r.logger.Warn("LongJobStatusCh channel is full or closed, dropping status message", "message", FinishedRAGStatus)
// Channel is full or closed, ignore the message to prevent panic
}
return nil
}
r.logger.Debug("wrote batch to db", "size", len(batch))
}
}
}
func (r *RAG) batchToVectorAsync(lock *sync.Mutex, id int, inputCh <-chan map[int][]string,
vectorCh chan<- []models.VectorRow, errCh chan error, filename string) {
func (r *RAG) batchToVectorAsync(id int, inputCh <-chan map[int][]string,
vectorCh chan<- []models.VectorRow, errCh chan error, doneCh <-chan struct{}, filename string) {
var err error
defer func() {
// For errCh, make sure we only send if there's actually an error and the channel can accept it
if err != nil {
select {
case errCh <- err:
default:
// errCh might be full or closed, log but don't panic
r.logger.Warn("errCh channel full or closed, skipping error propagation", "worker", id, "error", err)
}
}
}()
for {
lock.Lock()
if len(inputCh) == 0 {
lock.Unlock()
return
}
select {
case linesMap := <-inputCh:
case <-doneCh:
r.logger.Debug("worker received done signal", "worker", id)
return
case linesMap, ok := <-inputCh:
if !ok {
r.logger.Debug("input channel closed, worker exiting", "worker", id)
return
}
for leftI, lines := range linesMap {
select {
case <-doneCh:
return
default:
}
if err := r.fetchEmb(lines, errCh, vectorCh, fmt.Sprintf("%s_%d", filename, leftI), filename); err != nil {
r.logger.Error("error fetching embeddings", "error", err, "worker", id)
lock.Unlock()
return
}
}
lock.Unlock()
case err = <-errCh:
r.logger.Error("got an error from error channel", "error", err)
lock.Unlock()
return
default:
lock.Unlock()
}
r.logger.Debug("processed batch", "batches#", len(inputCh), "worker#", id)
statusMsg := fmt.Sprintf("converted to vector; batches: %d, worker#: %d", len(inputCh), id)
select {
case LongJobStatusCh <- statusMsg:
default:
r.logger.Warn("LongJobStatusCh channel full or closed, dropping status message", "message", statusMsg)
// Channel is full or closed, ignore the message to prevent panic
r.logger.Debug("processed batch", "worker#", id)
statusMsg := fmt.Sprintf("converted to vector; worker#: %d", id)
select {
case LongJobStatusCh <- statusMsg:
default:
r.logger.Warn("LongJobStatusCh channel full or closed, dropping status message", "message", statusMsg)
}
}
}
}

144
tables.go
View File

@@ -236,9 +236,20 @@ func makeChatTable(chatMap map[string]models.Chat) *tview.Table {
}
// nolint:unused
func formatSize(size int64) string {
units := []string{"B", "KB", "MB", "GB", "TB"}
i := 0
s := float64(size)
for s >= 1024 && i < len(units)-1 {
s /= 1024
i++
}
return fmt.Sprintf("%.1f%s", s, units[i])
}
func makeRAGTable(fileList []string) *tview.Flex {
actions := []string{"load", "delete"}
rows, cols := len(fileList), len(actions)+1
rows, cols := len(fileList), len(actions)+2
fileTable := tview.NewTable().
SetBorders(true)
longStatusView := tview.NewTextView()
@@ -252,39 +263,62 @@ func makeRAGTable(fileList []string) *tview.Flex {
AddItem(fileTable, 0, 60, true)
// Add the exit option as the first row (row 0)
fileTable.SetCell(0, 0,
tview.NewTableCell("Exit RAG manager").
tview.NewTableCell("File Name").
SetTextColor(tcell.ColorWhite).
SetAlign(tview.AlignCenter).
SetSelectable(false))
fileTable.SetCell(0, 1,
tview.NewTableCell("(Close without action)").
SetTextColor(tcell.ColorGray).
tview.NewTableCell("Preview").
SetTextColor(tcell.ColorWhite).
SetAlign(tview.AlignCenter).
SetSelectable(false))
fileTable.SetCell(0, 2,
tview.NewTableCell("exit").
SetTextColor(tcell.ColorGray).
SetAlign(tview.AlignCenter))
tview.NewTableCell("Load").
SetTextColor(tcell.ColorWhite).
SetAlign(tview.AlignCenter).
SetSelectable(false))
fileTable.SetCell(0, 3,
tview.NewTableCell("Delete").
SetTextColor(tcell.ColorWhite).
SetAlign(tview.AlignCenter).
SetSelectable(false))
// Add the file rows starting from row 1
for r := 0; r < rows; r++ {
for c := 0; c < cols; c++ {
color := tcell.ColorWhite
switch {
case c < 1:
fileTable.SetCell(r+1, c, // +1 to account for the exit row at index 0
case c == 0:
fileTable.SetCell(r+1, c,
tview.NewTableCell(fileList[r]).
SetTextColor(color).
SetAlign(tview.AlignCenter).
SetSelectable(false))
case c == 1: // Action description column - not selectable
fileTable.SetCell(r+1, c, // +1 to account for the exit row at index 0
tview.NewTableCell("(Action)").
case c == 1:
fpath := path.Join(cfg.RAGDir, fileList[r])
if fi, err := os.Stat(fpath); err == nil {
size := fi.Size()
modTime := fi.ModTime()
preview := fmt.Sprintf("%s | %s", formatSize(size), modTime.Format("2006-01-02 15:04"))
fileTable.SetCell(r+1, c,
tview.NewTableCell(preview).
SetTextColor(color).
SetAlign(tview.AlignCenter).
SetSelectable(false))
} else {
fileTable.SetCell(r+1, c,
tview.NewTableCell("error").
SetTextColor(color).
SetAlign(tview.AlignCenter).
SetSelectable(false))
}
case c == 2:
fileTable.SetCell(r+1, c,
tview.NewTableCell("load").
SetTextColor(color).
SetAlign(tview.AlignCenter).
SetSelectable(false))
default: // Action button column - selectable
fileTable.SetCell(r+1, c, // +1 to account for the exit row at index 0
tview.NewTableCell(actions[c-1]).
SetAlign(tview.AlignCenter))
default:
fileTable.SetCell(r+1, c,
tview.NewTableCell("delete").
SetTextColor(color).
SetAlign(tview.AlignCenter))
}
@@ -318,7 +352,7 @@ func makeRAGTable(fileList []string) *tview.Flex {
}()
fileTable.Select(0, 0).
SetFixed(1, 1).
SetSelectable(true, false).
SetSelectable(true, true).
SetSelectedStyle(tcell.StyleDefault.Background(tcell.ColorGray).Foreground(tcell.ColorWhite)).
SetDoneFunc(func(key tcell.Key) {
if key == tcell.KeyEsc || key == tcell.KeyF1 || key == tcell.Key('x') || key == tcell.KeyCtrlX {
@@ -335,6 +369,8 @@ func makeRAGTable(fileList []string) *tview.Flex {
}
// defer pages.RemovePage(RAGPage)
tc := fileTable.GetCell(row, column)
tc.SetTextColor(tcell.ColorRed)
fileTable.SetSelectable(false, false)
// Check if the selected row is the exit row (row 0) - do this first to avoid index issues
if row == 0 {
pages.RemovePage(RAGPage)
@@ -385,7 +421,7 @@ func makeRAGTable(fileList []string) *tview.Flex {
func makeLoadedRAGTable(fileList []string) *tview.Flex {
actions := []string{"delete"}
rows, cols := len(fileList), len(actions)+1
rows, cols := len(fileList), len(actions)+2
// Add 1 extra row for the "exit" option at the top
fileTable := tview.NewTable().
SetBorders(true)
@@ -400,39 +436,61 @@ func makeLoadedRAGTable(fileList []string) *tview.Flex {
AddItem(fileTable, 0, 60, true)
// Add the exit option as the first row (row 0)
fileTable.SetCell(0, 0,
tview.NewTableCell("Exit Loaded Files manager").
tview.NewTableCell("File Name").
SetTextColor(tcell.ColorWhite).
SetAlign(tview.AlignCenter).
SetSelectable(false))
fileTable.SetCell(0, 1,
tview.NewTableCell("(Close without action)").
SetTextColor(tcell.ColorGray).
tview.NewTableCell("Preview").
SetTextColor(tcell.ColorWhite).
SetAlign(tview.AlignCenter).
SetSelectable(false))
fileTable.SetCell(0, 2,
tview.NewTableCell("exit").
SetTextColor(tcell.ColorGray).
SetAlign(tview.AlignCenter))
tview.NewTableCell("Load").
SetTextColor(tcell.ColorWhite).
SetAlign(tview.AlignCenter).
SetSelectable(false))
fileTable.SetCell(0, 3,
tview.NewTableCell("Delete").
SetTextColor(tcell.ColorWhite).
SetAlign(tview.AlignCenter).
SetSelectable(false))
// Add the file rows starting from row 1
for r := 0; r < rows; r++ {
for c := 0; c < cols; c++ {
color := tcell.ColorWhite
switch {
case c < 1:
fileTable.SetCell(r+1, c, // +1 to account for the exit row at index 0
case c == 0:
fileTable.SetCell(r+1, c,
tview.NewTableCell(fileList[r]).
SetTextColor(color).
SetAlign(tview.AlignCenter).
SetSelectable(false))
case c == 1: // Action description column - not selectable
fileTable.SetCell(r+1, c, // +1 to account for the exit row at index 0
tview.NewTableCell("(Action)").
case c == 1:
if fi, err := os.Stat(fileList[r]); err == nil {
size := fi.Size()
modTime := fi.ModTime()
preview := fmt.Sprintf("%s | %s", formatSize(size), modTime.Format("2006-01-02 15:04"))
fileTable.SetCell(r+1, c,
tview.NewTableCell(preview).
SetTextColor(color).
SetAlign(tview.AlignCenter).
SetSelectable(false))
} else {
fileTable.SetCell(r+1, c,
tview.NewTableCell("error").
SetTextColor(color).
SetAlign(tview.AlignCenter).
SetSelectable(false))
}
case c == 2:
fileTable.SetCell(r+1, c,
tview.NewTableCell("load").
SetTextColor(color).
SetAlign(tview.AlignCenter).
SetSelectable(false))
default: // Action button column - selectable
fileTable.SetCell(r+1, c, // +1 to account for the exit row at index 0
tview.NewTableCell(actions[c-1]).
SetAlign(tview.AlignCenter))
default:
fileTable.SetCell(r+1, c,
tview.NewTableCell("delete").
SetTextColor(color).
SetAlign(tview.AlignCenter))
}
@@ -440,7 +498,7 @@ func makeLoadedRAGTable(fileList []string) *tview.Flex {
}
fileTable.Select(0, 0).
SetFixed(1, 1).
SetSelectable(true, false).
SetSelectable(true, true).
SetSelectedStyle(tcell.StyleDefault.Background(tcell.ColorGray).Foreground(tcell.ColorWhite)).
SetDoneFunc(func(key tcell.Key) {
if key == tcell.KeyEsc || key == tcell.KeyF1 || key == tcell.Key('x') || key == tcell.KeyCtrlX {
@@ -456,6 +514,8 @@ func makeLoadedRAGTable(fileList []string) *tview.Flex {
return
}
tc := fileTable.GetCell(row, column)
tc.SetTextColor(tcell.ColorRed)
fileTable.SetSelectable(false, false)
// Check if the selected row is the exit row (row 0) - do this first to avoid index issues
if row == 0 {
pages.RemovePage(RAGLoadedPage)
@@ -533,7 +593,7 @@ func makeAgentTable(agentList []string) *tview.Table {
}
chatActTable.Select(0, 0).
SetFixed(1, 1).
SetSelectable(true, false).
SetSelectable(true, true).
SetSelectedStyle(tcell.StyleDefault.Background(tcell.ColorGray).Foreground(tcell.ColorWhite)).
SetDoneFunc(func(key tcell.Key) {
if key == tcell.KeyEsc || key == tcell.KeyF1 || key == tcell.Key('x') {
@@ -549,6 +609,8 @@ func makeAgentTable(agentList []string) *tview.Table {
return
}
tc := chatActTable.GetCell(row, column)
tc.SetTextColor(tcell.ColorRed)
chatActTable.SetSelectable(false, false)
selected := agentList[row]
// notification := fmt.Sprintf("chat: %s; action: %s", selectedChat, tc.Text)
switch tc.Text {
@@ -630,7 +692,7 @@ func makeCodeBlockTable(codeBlocks []string) *tview.Table {
}
table.Select(0, 0).
SetFixed(1, 1).
SetSelectable(true, false).
SetSelectable(true, true).
SetSelectedStyle(tcell.StyleDefault.Background(tcell.ColorGray).Foreground(tcell.ColorWhite)).
SetDoneFunc(func(key tcell.Key) {
if key == tcell.KeyEsc || key == tcell.KeyF1 || key == tcell.Key('x') {
@@ -646,6 +708,8 @@ func makeCodeBlockTable(codeBlocks []string) *tview.Table {
return
}
tc := table.GetCell(row, column)
tc.SetTextColor(tcell.ColorRed)
table.SetSelectable(false, false)
selected := codeBlocks[row]
// notification := fmt.Sprintf("chat: %s; action: %s", selectedChat, tc.Text)
switch tc.Text {
@@ -702,7 +766,7 @@ func makeImportChatTable(filenames []string) *tview.Table {
}
chatActTable.Select(0, 0).
SetFixed(1, 1).
SetSelectable(true, false).
SetSelectable(true, true).
SetSelectedStyle(tcell.StyleDefault.Background(tcell.ColorGray).Foreground(tcell.ColorWhite)).
SetDoneFunc(func(key tcell.Key) {
if key == tcell.KeyEsc || key == tcell.KeyF1 || key == tcell.Key('x') {
@@ -718,6 +782,8 @@ func makeImportChatTable(filenames []string) *tview.Table {
return
}
tc := chatActTable.GetCell(row, column)
tc.SetTextColor(tcell.ColorRed)
chatActTable.SetSelectable(false, false)
selected := filenames[row]
// notification := fmt.Sprintf("chat: %s; action: %s", selectedChat, tc.Text)
switch tc.Text {