Compare commits

...

2 Commits

Author SHA1 Message Date
Grail Finder
817d69c425 Enha: sse update 2025-08-03 16:13:51 +03:00
Grail Finder
acc3f11ee3 Enha: remove extra connection 2025-07-17 23:02:51 +03:00
2 changed files with 36 additions and 10 deletions

View File

@@ -1,6 +1,7 @@
package broker package broker
import ( import (
"context"
"fmt" "fmt"
"log/slog" "log/slog"
"net/http" "net/http"
@@ -22,6 +23,8 @@ type (
// Events are pushed to this channel by the main events-gathering routine // Events are pushed to this channel by the main events-gathering routine
Notifier NotifierChan Notifier NotifierChan
log *slog.Logger log *slog.Logger
addClient chan NotifierChan
clients map[NotifierChan]struct{}
} }
) )
@@ -33,6 +36,8 @@ func NewBroker() (broker *Broker) {
Level: slog.LevelDebug, Level: slog.LevelDebug,
AddSource: true, AddSource: true,
})), })),
addClient: make(chan NotifierChan, 10),
clients: map[NotifierChan]struct{}{},
} }
} }
@@ -56,8 +61,10 @@ func (broker *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", origin) w.Header().Set("Access-Control-Allow-Origin", origin)
w.Header().Set("Access-Control-Allow-Credentials", "true") w.Header().Set("Access-Control-Allow-Credentials", "true")
ctx := r.Context() ctx := r.Context()
msgChan := make(NotifierChan, 10)
broker.addClient <- msgChan
// browser can close sse on its own; ping every 2s to prevent // browser can close sse on its own; ping every 2s to prevent
heartbeat := time.NewTicker(2 * time.Second) heartbeat := time.NewTicker(8 * time.Second)
defer heartbeat.Stop() defer heartbeat.Stop()
for { for {
select { select {
@@ -67,13 +74,20 @@ func (broker *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return return
case event := <-broker.Notifier: case event := <-broker.Notifier:
broker.log.Debug("got event", "event", event) broker.log.Debug("got event", "event", event)
for i := 0; i < 10; i++ { // Repeat 3 times
_, err := fmt.Fprintf(w, "event: %s\ndata: %s\n\n", event.EventName, event.Payload) _, err := fmt.Fprintf(w, "event: %s\ndata: %s\n\n", event.EventName, event.Payload)
if err != nil { if err != nil {
broker.log.Error("failed to write event", "error", err) broker.log.Error("write failed", "error", err)
// Client disconnected
return return
} }
w.(http.Flusher).Flush() w.(http.Flusher).Flush()
// Short delay between sends (non-blocking)
select {
case <-time.After(20 * time.Millisecond): // Adjust delay as needed
case <-ctx.Done():
return
}
}
case <-heartbeat.C: case <-heartbeat.C:
// Send SSE heartbeat comment // Send SSE heartbeat comment
if _, err := fmt.Fprint(w, ":\n\n"); err != nil { if _, err := fmt.Fprint(w, ":\n\n"); err != nil {
@@ -84,3 +98,15 @@ func (broker *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
} }
} }
} }
func (broker *Broker) Listen(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case clientChan := <-broker.addClient:
// mutex
broker.clients[clientChan] = struct{}{}
}
}
}

View File

@@ -53,7 +53,7 @@
</div> </div>
<hr/> <hr/>
<div class="grid grid-cols-1 md:grid-cols-5 md:gap-4"> <div class="grid grid-cols-1 md:grid-cols-5 md:gap-4">
<div hx-get="/actionhistory" hx-trigger="sse:backlog_{{.Room.ID}}" class="md:col-span-1"> <div hx-get="/actionhistory" class="md:col-span-1">
{{template "actionhistory" .Room.ActionHistory}} {{template "actionhistory" .Room.ActionHistory}}
</div> </div>
<div id="cardtable" class="md:col-span-3"> <div id="cardtable" class="md:col-span-3">