120 lines
		
	
	
		
			3.5 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			120 lines
		
	
	
		
			3.5 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package broker
 | |
| 
 | |
| import (
 | |
| 	"fmt"
 | |
| 	"log/slog"
 | |
| 	"net/http"
 | |
| 	"time"
 | |
| )
 | |
| 
 | |
// patience is the longest the broker will block while handing one event to a
// single client. It covers slow clients as well as clients that went away
// after the broadcast loop over the registry had already started.
const patience = time.Second
 | |
| 
 | |
// NotificationEvent is one event to be delivered to clients: a name plus an
// opaque string payload.
type NotificationEvent struct {
	EventName string
	Payload   string
}

// NotifierChan is a channel that carries NotificationEvent values.
type NotifierChan chan NotificationEvent

// Broker fans events received on Notifier out to every registered client
// channel.
type Broker struct {
	// Notifier is the inbound channel; the main events-gathering routine
	// pushes events here.
	Notifier NotifierChan

	// newClients receives the channel of each newly connected client.
	newClients chan NotifierChan

	// closingClients receives the channel of each client that has gone away.
	closingClients chan NotifierChan

	// clients is the registry of currently connected client channels,
	// used as a set.
	clients map[NotifierChan]struct{}
}
 | |
| 
 | |
| func NewBroker() (broker *Broker) {
 | |
| 	// Instantiate a broker
 | |
| 	return &Broker{
 | |
| 		Notifier:       make(NotifierChan, 1),
 | |
| 		newClients:     make(chan NotifierChan),
 | |
| 		closingClients: make(chan NotifierChan),
 | |
| 		clients:        make(map[NotifierChan]struct{}),
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // func (broker *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 | |
| // 	w.Header().Set("Content-Type", "text/event-stream")
 | |
| // 	w.Header().Set("Cache-Control", "no-cache")
 | |
| // 	w.Header().Set("Connection", "keep-alive")
 | |
| // 	w.Header().Set("Access-Control-Allow-Origin", "*")
 | |
| // 	// Each connection registers its own message channel with the Broker's connections registry
 | |
| // 	messageChan := make(NotifierChan)
 | |
| // 	// Signal the broker that we have a new connection
 | |
| // 	broker.newClients <- messageChan
 | |
| // 	// Remove this client from the map of connected clients
 | |
| // 	// when this handler exits.
 | |
| // 	defer func() {
 | |
| // 		broker.closingClients <- messageChan
 | |
| // 	}()
 | |
| // 	// c.Stream(func(w io.Writer) bool {
 | |
| // 	for {
 | |
| // 		// Emit Server Sent Events compatible
 | |
| // 		event := <-messageChan
 | |
| // 		fmt.Fprintf(w, "event:%s; data:%s\n", event.EventName, event.Payload)
 | |
| // 		// c.SSEvent(event.EventName, event.Payload)
 | |
| // 		w.(http.Flusher).Flush()
 | |
| // 	}
 | |
| // }
 | |
| 
 | |
| func (broker *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 | |
| 	// Headers (keep these as-is)
 | |
| 	w.Header().Set("Content-Type", "text/event-stream")
 | |
| 	w.Header().Set("Cache-Control", "no-cache")
 | |
| 	w.Header().Set("Connection", "keep-alive")
 | |
| 	w.Header().Set("Access-Control-Allow-Origin", "*")
 | |
| 	messageChan := make(NotifierChan)
 | |
| 	broker.newClients <- messageChan
 | |
| 	defer func() { broker.closingClients <- messageChan }()
 | |
| 	ctx := r.Context()
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-ctx.Done():
 | |
| 			// Client disconnected
 | |
| 			return
 | |
| 		case event := <-messageChan:
 | |
| 			_, err := fmt.Fprintf(w, "event: %s\ndata: %s\n\n", event.EventName, event.Payload)
 | |
| 			if err != nil {
 | |
| 				fmt.Println(err)
 | |
| 				// Client disconnected
 | |
| 				return
 | |
| 			}
 | |
| 			w.(http.Flusher).Flush()
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Listen for new notifications and redistribute them to clients
 | |
| func (broker *Broker) Listen() {
 | |
| 	for {
 | |
| 		select {
 | |
| 		case s := <-broker.newClients:
 | |
| 			// A new client has connected.
 | |
| 			// Register their message channel
 | |
| 			broker.clients[s] = struct{}{}
 | |
| 			slog.Info("Client added", "clients listening", len(broker.clients))
 | |
| 		case s := <-broker.closingClients:
 | |
| 			// A client has dettached and we want to
 | |
| 			// stop sending them messages.
 | |
| 			delete(broker.clients, s)
 | |
| 			slog.Info("Client removed", "clients listening", len(broker.clients))
 | |
| 		case event := <-broker.Notifier:
 | |
| 			// We got a new event from the outside!
 | |
| 			// Send event to all connected clients
 | |
| 			for clientMessageChan := range broker.clients {
 | |
| 				select {
 | |
| 				case clientMessageChan <- event:
 | |
| 				case <-time.After(patience):
 | |
| 					slog.Info("Client was skipped", "clients listening", len(broker.clients))
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| }
 | 
