// Package broker fans server-sent events (SSE) out to connected HTTP clients.
package broker

import (
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"strings"
	"time"
)

// patience is how long Listen waits when pushing an event to a slow client,
// or to a client that went away after the broadcast loop had already started.
const patience = time.Second

type (
	// NotificationEvent is a single event delivered to every connected client.
	NotificationEvent struct {
		// EventName is written as the SSE "event:" field.
		EventName string
		// Payload is written as one SSE "data:" field per line.
		Payload string
	}

	// NotifierChan carries events to a broker or to one connected client.
	NotifierChan chan NotificationEvent

	// Broker keeps the registry of connected clients and redistributes
	// every event received on Notifier to all of them.
	Broker struct {
		// Events are pushed to this channel by the main events-gathering routine
		Notifier NotifierChan

		// New client connections
		newClients chan NotifierChan

		// Closed client connections
		closingClients chan NotifierChan

		// Client connections registry. It is only touched by Listen and the
		// helpers Listen calls, so it needs no lock.
		clients map[NotifierChan]struct{}
	}
)

// sseNewlines normalizes every line-break style to "\n" so that a payload can
// be split into individual "data:" lines.
var sseNewlines = strings.NewReplacer("\r\n", "\n", "\r", "\n")

// NewBroker returns a broker with an empty client registry. The caller must
// run Listen (usually in its own goroutine) for events to be distributed.
func NewBroker() *Broker {
	return &Broker{
		Notifier:       make(NotifierChan, 1),
		newClients:     make(chan NotifierChan),
		closingClients: make(chan NotifierChan),
		clients:        make(map[NotifierChan]struct{}),
	}
}

// Notifier is the package-wide broker, for use from other packages.
var Notifier *Broker

// init creates the package-wide broker and starts its Listen loop.
func init() {
	Notifier = NewBroker()
	go Notifier.Listen()
}

// ServeHTTP streams events to one client as text/event-stream until the
// request context is cancelled or a write to the client fails.
//
// It answers 500 when the ResponseWriter cannot flush, because events could
// then not be pushed to the client as they happen.
func (b *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// Check up front: an unchecked type assertion would panic in the middle
	// of the stream.
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "streaming unsupported", http.StatusInternalServerError)
		return
	}

	// Headers (keep these as-is)
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("Access-Control-Allow-Origin", "*")

	ctx := r.Context()
	messageChan := make(NotifierChan)

	// Listen may be busy with a broadcast; do not keep waiting for it once
	// the client is gone.
	select {
	case b.newClients <- messageChan:
	case <-ctx.Done():
		return
	}
	defer b.unregister(messageChan)

	// Send the headers right away instead of holding them back until the
	// first event is written.
	flusher.Flush()

	for {
		select {
		case <-ctx.Done():
			// Client disconnected
			return
		case event := <-messageChan:
			if err := writeEvent(w, event); err != nil {
				// Client disconnected
				slog.Info("Client write failed", "error", err)
				return
			}
			flusher.Flush()
		}
	}
}

// unregister hands client to Listen for removal from the registry. While it
// waits for Listen to accept the removal it discards any event Listen is
// still trying to deliver to this client, so a departing client does not hold
// up a broadcast for the full patience interval.
func (b *Broker) unregister(client NotifierChan) {
	for {
		select {
		case b.closingClients <- client:
			return
		case <-client:
		}
	}
}

// writeEvent writes event to w in SSE wire format using a single Write.
// Every line of the payload gets its own "data:" field; a raw line break
// inside one field would otherwise corrupt the framing of the event. For a
// payload without line breaks the output is "event: NAME\ndata: PAYLOAD\n\n".
func writeEvent(w io.Writer, event NotificationEvent) error {
	var sb strings.Builder
	sb.WriteString("event: ")
	sb.WriteString(event.EventName)
	sb.WriteByte('\n')
	for _, line := range strings.Split(sseNewlines.Replace(event.Payload), "\n") {
		sb.WriteString("data: ")
		sb.WriteString(line)
		sb.WriteByte('\n')
	}
	// The blank line terminates the event.
	sb.WriteByte('\n')

	if _, err := io.WriteString(w, sb.String()); err != nil {
		return fmt.Errorf("writing event %q: %w", event.EventName, err)
	}
	return nil
}

// Listen registers and removes clients and redistributes every event received
// on Notifier to all registered clients. It never returns.
func (b *Broker) Listen() {
	for {
		select {
		case client := <-b.newClients:
			// A new client has connected.
			// Register their message channel
			b.clients[client] = struct{}{}
			slog.Info("Client added", "clients listening", len(b.clients))
		case client := <-b.closingClients:
			// A client has detached and we want to
			// stop sending them messages.
			delete(b.clients, client)
			slog.Info("Client removed", "clients listening", len(b.clients))
		case event := <-b.Notifier:
			// We got a new event from the outside!
			b.broadcast(event)
		}
	}
}

// broadcast sends event to every registered client in turn, skipping any
// client that does not accept it within patience.
func (b *Broker) broadcast(event NotificationEvent) {
	for client := range b.clients {
		timer := time.NewTimer(patience)
		select {
		case client <- event:
			// Delivered; release the timer instead of letting it run out.
			timer.Stop()
		case <-timer.C:
			slog.Info("Client was skipped", "clients listening", len(b.clients))
		}
	}
}