113 lines
		
	
	
		
			2.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			113 lines
		
	
	
		
			2.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package broker
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"fmt"
 | |
| 	"log/slog"
 | |
| 	"net/http"
 | |
| 	"os"
 | |
| 	"time"
 | |
| )
 | |
| 
 | |
| // the amount of time to wait when pushing a message to
 | |
| // a slow client or a client that closed after `range clients` started.
 | |
| // const patience time.Duration = time.Second * 1
 | |
| 
 | |
| type (
 | |
| 	NotificationEvent struct {
 | |
| 		EventName string
 | |
| 		Payload   string
 | |
| 	}
 | |
| 	NotifierChan chan NotificationEvent
 | |
| 	Broker       struct {
 | |
| 		// Events are pushed to this channel by the main events-gathering routine
 | |
| 		Notifier  NotifierChan
 | |
| 		log       *slog.Logger
 | |
| 		addClient chan NotifierChan
 | |
| 		clients   map[NotifierChan]struct{}
 | |
| 	}
 | |
| )
 | |
| 
 | |
| func NewBroker() (broker *Broker) {
 | |
| 	// Instantiate a broker
 | |
| 	return &Broker{
 | |
| 		Notifier: make(NotifierChan, 100),
 | |
| 		log: slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{
 | |
| 			Level:     slog.LevelDebug,
 | |
| 			AddSource: true,
 | |
| 		})),
 | |
| 		addClient: make(chan NotifierChan, 10),
 | |
| 		clients:   map[NotifierChan]struct{}{},
 | |
| 	}
 | |
| }
 | |
| 
 | |
| var Notifier *Broker
 | |
| 
 | |
| // for use in different packages
 | |
| func init() {
 | |
| 	Notifier = NewBroker()
 | |
| }
 | |
| 
 | |
| func (broker *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 | |
| 	// Headers (keep these as-is)
 | |
| 	w.Header().Set("Content-Type", "text/event-stream")
 | |
| 	w.Header().Set("Cache-Control", "no-cache")
 | |
| 	w.Header().Set("Connection", "keep-alive")
 | |
| 	// w.Header().Set("Access-Control-Allow-Origin", "*")
 | |
| 	origin := r.Header.Get("Origin")
 | |
| 	if origin == "" {
 | |
| 		origin = "*" // Fallback for non-browser clients
 | |
| 	}
 | |
| 	w.Header().Set("Access-Control-Allow-Origin", origin)
 | |
| 	w.Header().Set("Access-Control-Allow-Credentials", "true")
 | |
| 	ctx := r.Context()
 | |
| 	msgChan := make(NotifierChan, 10)
 | |
| 	broker.addClient <- msgChan
 | |
| 	// browser can close sse on its own; ping every 2s to prevent
 | |
| 	heartbeat := time.NewTicker(8 * time.Second)
 | |
| 	defer heartbeat.Stop()
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-ctx.Done():
 | |
| 			broker.log.Debug("broker: got ctx done")
 | |
| 			// Client disconnected
 | |
| 			return
 | |
| 		case event := <-broker.Notifier:
 | |
| 			broker.log.Debug("got event", "event", event)
 | |
| 			for i := 0; i < 10; i++ { // Repeat 3 times
 | |
| 				_, err := fmt.Fprintf(w, "event: %s\ndata: %s\n\n", event.EventName, event.Payload)
 | |
| 				if err != nil {
 | |
| 					broker.log.Error("write failed", "error", err)
 | |
| 					return
 | |
| 				}
 | |
| 				w.(http.Flusher).Flush()
 | |
| 				// Short delay between sends (non-blocking)
 | |
| 				select {
 | |
| 				case <-time.After(20 * time.Millisecond): // Adjust delay as needed
 | |
| 				case <-ctx.Done():
 | |
| 					return
 | |
| 				}
 | |
| 			}
 | |
| 		case <-heartbeat.C:
 | |
| 			// Send SSE heartbeat comment
 | |
| 			if _, err := fmt.Fprint(w, ":\n\n"); err != nil {
 | |
| 				broker.log.Error("failed to write heartbeat", "error", err)
 | |
| 				return // Client disconnected
 | |
| 			}
 | |
| 			w.(http.Flusher).Flush()
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (broker *Broker) Listen(ctx context.Context) {
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-ctx.Done():
 | |
| 			return
 | |
| 		case clientChan := <-broker.addClient:
 | |
| 			// mutex
 | |
| 			broker.clients[clientChan] = struct{}{}
 | |
| 		}
 | |
| 	}
 | |
| }
 | 
