// Package broker implements a small Server-Sent Events (SSE) hub: events
// pushed onto a Broker's Notifier channel are fanned out to every HTTP client
// currently connected through Broker.ServeHTTP.
package broker

import (
	"fmt"
	"log/slog"
	"net/http"
	"strings"
	"time"
)

const (
	// patience is the amount of time to wait when pushing a message to
	// a slow client or a client that closed after `range clients` started.
	patience time.Duration = time.Second * 1

	// heartbeatInterval is how often an SSE comment line is written to an
	// otherwise idle connection.
	heartbeatInterval = 15 * time.Second
)

var (
	// payloadBreaks normalizes CRLF and lone CR to LF so a payload can be
	// split into one SSE "data:" field per line.
	payloadBreaks = strings.NewReplacer("\r\n", "\n", "\r", "\n")

	// nameBreaks removes line breaks from an event name; the SSE "event:"
	// field is a single line and a break would end it early.
	nameBreaks = strings.NewReplacer("\r", "", "\n", "")
)

type (
	// NotificationEvent is a single message delivered to SSE clients.
	NotificationEvent struct {
		EventName string // written as the SSE "event:" field
		Payload   string // written as one or more SSE "data:" fields
	}

	// NotifierChan carries NotificationEvents.
	NotifierChan chan NotificationEvent

	// Broker registers client channels and redistributes incoming events
	// to all of them. The clients map is only touched from Listen.
	Broker struct {
		// Events are pushed to this channel by the main events-gathering routine
		Notifier NotifierChan

		// New client connections
		newClients chan NotifierChan

		// Closed client connections
		closingClients chan NotifierChan

		// Client connections registry
		clients map[NotifierChan]struct{}
	}
)

// NewBroker returns a Broker with its channels and client registry allocated.
// It does not start the dispatch loop; the caller must run Listen.
func NewBroker() *Broker {
	return &Broker{
		Notifier:       make(NotifierChan, 1),
		newClients:     make(chan NotifierChan),
		closingClients: make(chan NotifierChan),
		clients:        make(map[NotifierChan]struct{}),
	}
}

// Notifier is the package-level Broker, for use in different packages.
// It is created and its Listen loop started in init.
var Notifier *Broker

func init() {
	Notifier = NewBroker()
	go Notifier.Listen()
}

// formatSSE renders event as one SSE message. Every line of the payload gets
// its own "data:" field so that payloads containing line breaks do not corrupt
// the stream framing; line breaks are dropped from the event name. For a
// single-line payload the output is "event: <name>\ndata: <payload>\n\n".
func formatSSE(event NotificationEvent) string {
	var sb strings.Builder
	fmt.Fprintf(&sb, "event: %s\n", nameBreaks.Replace(event.EventName))
	for _, line := range strings.Split(payloadBreaks.Replace(event.Payload), "\n") {
		fmt.Fprintf(&sb, "data: %s\n", line)
	}
	sb.WriteByte('\n') // blank line terminates the message
	return sb.String()
}

// ServeHTTP streams events to one client as text/event-stream until the
// request context is cancelled, a write fails, or the broker evicts the client.
// It responds with 500 if w does not implement http.Flusher.
func (b *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// Streaming needs explicit flushes; check once instead of asserting
	// (and possibly panicking) on every write.
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "streaming unsupported", http.StatusInternalServerError)
		return
	}

	// Headers (keep these as-is)
	// NOTE(review): the request Origin is reflected together with
	// Allow-Credentials: true, which admits any origin — confirm this is intended.
	header := w.Header()
	header.Set("Content-Type", "text/event-stream")
	header.Set("Cache-Control", "no-cache")
	header.Set("Connection", "keep-alive")
	origin := r.Header.Get("Origin")
	if origin == "" {
		origin = "*" // Fallback for non-browser clients
	}
	header.Set("Access-Control-Allow-Origin", origin)
	header.Set("Access-Control-Allow-Credentials", "true")

	ctx := r.Context() // browser can close sse on its own

	// Register with the dispatch loop, giving up if the client goes away
	// before the broker accepts the registration.
	messageChan := make(NotifierChan)
	select {
	case b.newClients <- messageChan:
	case <-ctx.Done():
		return
	}
	defer func() {
		b.closingClients <- messageChan
	}()

	// Push the headers out now so the client sees the stream open without
	// waiting for the first event or heartbeat.
	flusher.Flush()

	heartbeat := time.NewTicker(heartbeatInterval)
	defer heartbeat.Stop()

	for {
		select {
		case <-ctx.Done():
			// Client disconnected
			return

		case event, open := <-messageChan:
			if !open {
				// The broker evicted this client for being too slow.
				return
			}
			if _, err := w.Write([]byte(formatSSE(event))); err != nil {
				slog.Warn("SSE write failed", "error", err)
				return
			}
			flusher.Flush()

		case <-heartbeat.C:
			// Send SSE heartbeat comment
			if _, err := w.Write([]byte(": heartbeat\n\n")); err != nil {
				return // Client disconnected
			}
			flusher.Flush()
		}
	}
}

// Listen runs the dispatch loop: it registers and unregisters client channels
// and redistributes every event received on Notifier to all registered
// clients. It never returns.
func (b *Broker) Listen() {
	for {
		select {
		case c := <-b.newClients:
			// A new client has connected.
			// Register their message channel
			b.clients[c] = struct{}{}
			slog.Info("Client added", "clients listening", len(b.clients))

		case c := <-b.closingClients:
			// A client has detached and we want to
			// stop sending them messages.
			if _, registered := b.clients[c]; !registered {
				// Already evicted during a broadcast.
				continue
			}
			delete(b.clients, c)
			slog.Info("Client removed", "clients listening", len(b.clients))

		case event := <-b.Notifier:
			// We got a new event from the outside!
			b.broadcast(event)
		}
	}
}

// broadcast sends event to every registered client, waiting at most patience
// for each one. A client that does not accept the event in time is removed
// from the registry and its channel is closed so its handler terminates.
func (b *Broker) broadcast(event NotificationEvent) {
	for c := range b.clients {
		select {
		case c <- event:
		case <-time.After(patience):
			delete(b.clients, c)
			// Only this loop sends on c and it is no longer registered,
			// so closing here cannot race with another send.
			close(c)
			slog.Info("Client was removed", "clients listening", len(b.clients))
		}
	}
}