diff --git a/broker/sse.go b/broker/sse.go index 0ff5da6..468d100 100644 --- a/broker/sse.go +++ b/broker/sse.go @@ -52,11 +52,20 @@ func (broker *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") - w.Header().Set("Access-Control-Allow-Origin", "*") + // w.Header().Set("Access-Control-Allow-Origin", "*") + origin := r.Header.Get("Origin") + if origin == "" { + origin = "*" // Fallback for non-browser clients + } + w.Header().Set("Access-Control-Allow-Origin", origin) + w.Header().Set("Access-Control-Allow-Credentials", "true") messageChan := make(NotifierChan) broker.newClients <- messageChan defer func() { broker.closingClients <- messageChan }() ctx := r.Context() + // browser can close sse on its own + heartbeat := time.NewTicker(15 * time.Second) + defer heartbeat.Stop() for { select { case <-ctx.Done(): @@ -70,6 +79,12 @@ func (broker *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } w.(http.Flusher).Flush() + case <-heartbeat.C: + // Send SSE heartbeat comment + if _, err := w.Write([]byte(": heartbeat\n\n")); err != nil { + return // Client disconnected + } + w.(http.Flusher).Flush() } } } @@ -95,7 +110,8 @@ func (broker *Broker) Listen() { select { case clientMessageChan <- event: case <-time.After(patience): - slog.Info("Client was skipped", "clients listening", len(broker.clients)) + delete(broker.clients, clientMessageChan) + slog.Info("Client was removed", "clients listening", len(broker.clients)) } } }