package broker import ( "fmt" "log/slog" "net/http" "os" "time" ) // the amount of time to wait when pushing a message to // a slow client or a client that closed after `range clients` started. // const patience time.Duration = time.Second * 1 type ( NotificationEvent struct { EventName string Payload string } NotifierChan chan NotificationEvent Broker struct { // Events are pushed to this channel by the main events-gathering routine Notifier NotifierChan log *slog.Logger } ) func NewBroker() (broker *Broker) { // Instantiate a broker return &Broker{ Notifier: make(NotifierChan, 100), log: slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{ Level: slog.LevelDebug, AddSource: true, })), } } var Notifier *Broker // for use in different packages func init() { Notifier = NewBroker() } func (broker *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Headers (keep these as-is) w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") // w.Header().Set("Access-Control-Allow-Origin", "*") origin := r.Header.Get("Origin") if origin == "" { origin = "*" // Fallback for non-browser clients } w.Header().Set("Access-Control-Allow-Origin", origin) w.Header().Set("Access-Control-Allow-Credentials", "true") ctx := r.Context() // browser can close sse on its own; ping every 2s to prevent heartbeat := time.NewTicker(2 * time.Second) defer heartbeat.Stop() for { select { case <-ctx.Done(): broker.log.Debug("broker: got ctx done") // Client disconnected return case event := <-broker.Notifier: broker.log.Debug("got event", "event", event) _, err := fmt.Fprintf(w, "event: %s\ndata: %s\n\n", event.EventName, event.Payload) if err != nil { broker.log.Error("failed to write event", "error", err) // Client disconnected return } w.(http.Flusher).Flush() case <-heartbeat.C: // Send SSE heartbeat comment if _, err := fmt.Fprint(w, ":\n\n"); err != nil { broker.log.Error("failed to write heartbeat", "error", err) return // Client disconnected } w.(http.Flusher).Flush() } } }