120 lines
		
	
	
		
			3.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			120 lines
		
	
	
		
			3.2 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package broker
 | |
| 
 | |
import (
	"fmt"
	"log/slog"
	"net/http"
	"strings"
	"time"
)
 | |
| 
 | |
// patience bounds how long the broker waits on a single client while pushing
// a message: it covers slow clients as well as clients that closed after the
// broadcast loop over the registry had already started.
const patience = 1 * time.Second
 | |
| 
 | |
// NotificationEvent is one named event together with its string payload.
type NotificationEvent struct {
	EventName string
	Payload   string
}

// NotifierChan is a channel that carries NotificationEvent values.
type NotifierChan chan NotificationEvent

// Broker fans events out to a set of registered client channels.
type Broker struct {
	// Notifier is the inbound channel; the main events-gathering routine
	// pushes events here.
	Notifier NotifierChan
	// newClients carries the channels of newly connected clients.
	newClients chan NotifierChan
	// closingClients carries the channels of clients that have closed.
	closingClients chan NotifierChan
	// clients is the registry of currently connected client channels.
	clients map[NotifierChan]struct{}
}
 | |
| 
 | |
| func NewBroker() (broker *Broker) {
 | |
| 	// Instantiate a broker
 | |
| 	return &Broker{
 | |
| 		Notifier:       make(NotifierChan, 1),
 | |
| 		newClients:     make(chan NotifierChan),
 | |
| 		closingClients: make(chan NotifierChan),
 | |
| 		clients:        make(map[NotifierChan]struct{}),
 | |
| 	}
 | |
| }
 | |
| 
 | |
| var Notifier *Broker
 | |
| 
 | |
| // for use in different packages
 | |
| func init() {
 | |
| 	Notifier = NewBroker()
 | |
| 	go Notifier.Listen()
 | |
| }
 | |
| 
 | |
| func (broker *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 | |
| 	// Headers (keep these as-is)
 | |
| 	w.Header().Set("Content-Type", "text/event-stream")
 | |
| 	w.Header().Set("Cache-Control", "no-cache")
 | |
| 	w.Header().Set("Connection", "keep-alive")
 | |
| 	// w.Header().Set("Access-Control-Allow-Origin", "*")
 | |
| 	origin := r.Header.Get("Origin")
 | |
| 	if origin == "" {
 | |
| 		origin = "*" // Fallback for non-browser clients
 | |
| 	}
 | |
| 	w.Header().Set("Access-Control-Allow-Origin", origin)
 | |
| 	w.Header().Set("Access-Control-Allow-Credentials", "true")
 | |
| 	messageChan := make(NotifierChan)
 | |
| 	broker.newClients <- messageChan
 | |
| 	defer func() { broker.closingClients <- messageChan }()
 | |
| 	ctx := r.Context()
 | |
| 	// browser can close sse on its own; ping every 2s to prevent
 | |
| 	heartbeat := time.NewTicker(2 * time.Second)
 | |
| 	defer heartbeat.Stop()
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-ctx.Done():
 | |
| 			// Client disconnected
 | |
| 			return
 | |
| 		case event := <-messageChan:
 | |
| 			_, err := fmt.Fprintf(w, "event: %s\ndata: %s\n\n", event.EventName, event.Payload)
 | |
| 			if err != nil {
 | |
| 				fmt.Println(err)
 | |
| 				// Client disconnected
 | |
| 				return
 | |
| 			}
 | |
| 			w.(http.Flusher).Flush()
 | |
| 		case <-heartbeat.C:
 | |
| 			// Send SSE heartbeat comment
 | |
| 			if _, err := w.Write([]byte(": heartbeat\n\n")); err != nil {
 | |
| 				return // Client disconnected
 | |
| 			}
 | |
| 			w.(http.Flusher).Flush()
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Listen for new notifications and redistribute them to clients
 | |
| func (broker *Broker) Listen() {
 | |
| 	for {
 | |
| 		select {
 | |
| 		case s := <-broker.newClients:
 | |
| 			// A new client has connected.
 | |
| 			// Register their message channel
 | |
| 			broker.clients[s] = struct{}{}
 | |
| 			slog.Info("Client added", "clients listening", len(broker.clients))
 | |
| 		case s := <-broker.closingClients:
 | |
| 			// A client has dettached and we want to
 | |
| 			// stop sending them messages.
 | |
| 			delete(broker.clients, s)
 | |
| 			slog.Info("Client removed", "clients listening", len(broker.clients))
 | |
| 		case event := <-broker.Notifier:
 | |
| 			// We got a new event from the outside!
 | |
| 			// Send event to all connected clients
 | |
| 			for clientMessageChan := range broker.clients {
 | |
| 				select {
 | |
| 				case clientMessageChan <- event:
 | |
| 				case <-time.After(patience):
 | |
| 					delete(broker.clients, clientMessageChan)
 | |
| 					slog.Info("Client was removed", "clients listening", len(broker.clients))
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| }
 | 
