Enha: simplify sse (worsened event recieving)

This commit is contained in:
Grail Finder
2025-07-12 20:33:41 +03:00
parent a934d07be3
commit 8f9865db3f
6 changed files with 37 additions and 46 deletions

View File

@ -4,6 +4,7 @@ import (
"fmt"
"log/slog"
"net/http"
"os"
"time"
)
@ -20,22 +21,18 @@ type (
Broker struct {
// Events are pushed to this channel by the main events-gathering routine
Notifier NotifierChan
// New client connections
newClients chan NotifierChan
// Closed client connections
closingClients chan NotifierChan
// Client connections registry
clients map[NotifierChan]struct{}
log *slog.Logger
}
)
func NewBroker() (broker *Broker) {
// Instantiate a broker
return &Broker{
Notifier: make(NotifierChan, 1),
newClients: make(chan NotifierChan),
closingClients: make(chan NotifierChan),
clients: make(map[NotifierChan]struct{}),
Notifier: make(NotifierChan, 100),
log: slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{
Level: slog.LevelDebug,
AddSource: true,
})),
}
}
@ -44,7 +41,6 @@ var Notifier *Broker
// for use in different packages
func init() {
Notifier = NewBroker()
go Notifier.Listen()
}
func (broker *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@ -59,9 +55,6 @@ func (broker *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
w.Header().Set("Access-Control-Allow-Origin", origin)
w.Header().Set("Access-Control-Allow-Credentials", "true")
messageChan := make(NotifierChan, 10) // Buffered channel
broker.newClients <- messageChan
defer func() { broker.closingClients <- messageChan }()
ctx := r.Context()
// browser can close sse on its own; ping every 2s to prevent
heartbeat := time.NewTicker(2 * time.Second)
@ -69,12 +62,14 @@ func (broker *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
for {
select {
case <-ctx.Done():
broker.log.Debug("broker: got ctx done")
// Client disconnected
return
case event := <-messageChan:
case event := <-broker.Notifier:
broker.log.Debug("got event", "event", event)
_, err := fmt.Fprintf(w, "event: %s\ndata: %s\n\n", event.EventName, event.Payload)
if err != nil {
fmt.Println(err)
broker.log.Error("failed to write event", "error", err)
// Client disconnected
return
}
@ -82,38 +77,10 @@ func (broker *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
case <-heartbeat.C:
// Send SSE heartbeat comment
if _, err := fmt.Fprint(w, ":\n\n"); err != nil {
broker.log.Error("failed to write heartbeat", "error", err)
return // Client disconnected
}
w.(http.Flusher).Flush()
}
}
}
// Listen for new notifications and redistribute them to clients
func (broker *Broker) Listen() {
for {
select {
case s := <-broker.newClients:
// A new client has connected.
// Register their message channel
broker.clients[s] = struct{}{}
slog.Info("Client added", "clients listening", len(broker.clients))
case s := <-broker.closingClients:
// A client has dettached and we want to
// stop sending them messages.
delete(broker.clients, s)
slog.Info("Client removed", "clients listening", len(broker.clients))
case event := <-broker.Notifier:
// We got a new event from the outside!
// Send event to all connected clients
for clientMessageChan := range broker.clients {
select {
case clientMessageChan <- event:
case <-time.After(patience):
delete(broker.clients, clientMessageChan)
slog.Info("Client was removed", "clients listening", len(broker.clients))
}
}
}
}
}