From 817d69c425982c4e31753681c477003634f2b010 Mon Sep 17 00:00:00 2001 From: Grail Finder Date: Sun, 3 Aug 2025 16:13:51 +0300 Subject: [PATCH] Enha: sse update --- broker/sse.go | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/broker/sse.go b/broker/sse.go index a40692d..ba67e2e 100644 --- a/broker/sse.go +++ b/broker/sse.go @@ -1,6 +1,7 @@ package broker import ( + "context" "fmt" "log/slog" "net/http" @@ -20,8 +21,10 @@ type ( NotifierChan chan NotificationEvent Broker struct { // Events are pushed to this channel by the main events-gathering routine - Notifier NotifierChan - log *slog.Logger + Notifier NotifierChan + log *slog.Logger + addClient chan NotifierChan + clients map[NotifierChan]struct{} } ) @@ -33,6 +36,8 @@ func NewBroker() (broker *Broker) { Level: slog.LevelDebug, AddSource: true, })), + addClient: make(chan NotifierChan, 10), + clients: map[NotifierChan]struct{}{}, } } @@ -56,6 +61,8 @@ func (broker *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.Header().Set("Access-Control-Allow-Origin", origin) w.Header().Set("Access-Control-Allow-Credentials", "true") ctx := r.Context() + msgChan := make(NotifierChan, 10) + broker.addClient <- msgChan // browser can close sse on its own; ping every 2s to prevent heartbeat := time.NewTicker(8 * time.Second) defer heartbeat.Stop() @@ -91,3 +98,15 @@ func (broker *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } } + +func (broker *Broker) Listen(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case clientChan := <-broker.addClient: + // mutex + broker.clients[clientChan] = struct{}{} + } + } +}