package broker import ( "fmt" "log/slog" "net/http" "strings" "time" ) // the amount of time to wait when pushing a message to // a slow client or a client that closed after `range clients` started. const patience time.Duration = time.Second * 1 type ( NotificationEvent struct { EventName string Payload string } NotifierChan chan NotificationEvent Broker struct { // Events are pushed to this channel by the main events-gathering routine Notifier NotifierChan // New client connections newClients chan NotifierChan // Closed client connections closingClients chan NotifierChan // Client connections registry clients map[NotifierChan]struct{} } ) func NewBroker() (broker *Broker) { // Instantiate a broker return &Broker{ Notifier: make(NotifierChan, 1), newClients: make(chan NotifierChan), closingClients: make(chan NotifierChan), clients: make(map[NotifierChan]struct{}), } } // func (broker *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) { // w.Header().Set("Content-Type", "text/event-stream") // w.Header().Set("Cache-Control", "no-cache") // w.Header().Set("Connection", "keep-alive") // w.Header().Set("Access-Control-Allow-Origin", "*") // // Each connection registers its own message channel with the Broker's connections registry // messageChan := make(NotifierChan) // // Signal the broker that we have a new connection // broker.newClients <- messageChan // // Remove this client from the map of connected clients // // when this handler exits. // defer func() { // broker.closingClients <- messageChan // }() // // c.Stream(func(w io.Writer) bool { // for { // // Emit Server Sent Events compatible // event := <-messageChan // fmt.Fprintf(w, "event:%s; data:%s\n", event.EventName, event.Payload) // // c.SSEvent(event.EventName, event.Payload) // w.(http.Flusher).Flush() // } // } func (broker *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Headers (keep these as-is) w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.Header().Set("Access-Control-Allow-Origin", "*") messageChan := make(NotifierChan) broker.newClients <- messageChan defer func() { broker.closingClients <- messageChan }() for { event := <-messageChan // // Proper SSE formatting // fmt.Fprintf(w, "event: %s\n", event.EventName) // Event name line // fmt.Fprintf(w, "data: %s\n\n", event.Payload) // Data line + empty line // Alternative for multi-line data: fmt.Fprintf(w, "event: %s\n", event.EventName) for _, line := range strings.Split(event.Payload, "\n") { fmt.Fprintf(w, "data: %s\n", line) } fmt.Fprintf(w, "\n") w.(http.Flusher).Flush() } } // Listen for new notifications and redistribute them to clients func (broker *Broker) Listen() { for { select { case s := <-broker.newClients: // A new client has connected. // Register their message channel broker.clients[s] = struct{}{} slog.Info("Client added", "clients listening", len(broker.clients)) case s := <-broker.closingClients: // A client has dettached and we want to // stop sending them messages. delete(broker.clients, s) slog.Info("Client removed", "clients listening", len(broker.clients)) case event := <-broker.Notifier: // We got a new event from the outside! // Send event to all connected clients for clientMessageChan := range broker.clients { select { case clientMessageChan <- event: case <-time.After(patience): slog.Info("Client was skipped", "clients listening", len(broker.clients)) } } } } }