Files
gralias/broker/sse.go
2025-07-17 23:02:51 +03:00

94 lines
2.4 KiB
Go

package broker
import (
"fmt"
"log/slog"
"net/http"
"os"
"time"
)
// patience (currently disabled) is the amount of time to wait when pushing a
// message to a slow client, or to a client that closed after `range clients`
// started.
// const patience time.Duration = time.Second * 1
// NotificationEvent is one message handed to the broker. ServeHTTP writes
// EventName after the SSE "event:" field and Payload after the "data:" field.
type NotificationEvent struct {
	EventName string
	Payload   string
}

// NotifierChan is a channel that carries NotificationEvent values.
type NotifierChan chan NotificationEvent

// Broker holds the event channel read by the SSE handler together with the
// logger the handler reports to.
type Broker struct {
	// Notifier receives the events pushed by the main events-gathering
	// routine.
	Notifier NotifierChan

	// log is the structured logger used by the broker's methods.
	log *slog.Logger
}
// NewBroker returns a Broker whose Notifier channel buffers up to 100 events
// and whose logger writes JSON records to os.Stderr at debug level with
// source locations attached.
func NewBroker() (broker *Broker) {
	handlerOpts := &slog.HandlerOptions{
		AddSource: true,
		Level:     slog.LevelDebug,
	}
	jsonHandler := slog.NewJSONHandler(os.Stderr, handlerOpts)

	broker = &Broker{
		Notifier: make(NotifierChan, 100),
		log:      slog.New(jsonHandler),
	}
	return broker
}
// Notifier is the package-level Broker intended for use from other packages.
// It is nil until this package's init function has run.
var Notifier *Broker
// init assigns a Broker built by NewBroker to Notifier.
func init() {
Notifier = NewBroker()
}
// ServeHTTP streams broker events to one client as Server-Sent Events.
//
// It sets the SSE and CORS response headers, flushes them so the client sees
// the stream open immediately, and then loops until the request context is
// cancelled, the Notifier channel is closed, or a write fails. Every event
// received from broker.Notifier is written eventRepeats times, repeatDelay
// apart, and an SSE comment line is sent every heartbeatInterval so that an
// idle connection keeps carrying traffic.
//
// If w does not implement http.Flusher the handler answers with
// 500 Internal Server Error instead of panicking, because events cannot be
// pushed to the client without flushing.
//
// NOTE(review): all concurrent ServeHTTP calls receive from the same Notifier
// channel, so a given event is consumed by exactly one of them. Confirm
// whether only a single client is expected, or whether fan-out is needed.
//
// NOTE(review): the request's Origin header is echoed back together with
// Access-Control-Allow-Credentials: true, which lets any origin make
// credentialed requests. Consider checking Origin against an allow-list.
func (broker *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	const (
		// heartbeatInterval is how often an SSE comment is sent while idle.
		heartbeatInterval = 8 * time.Second
		// eventRepeats is how many times each event is written to the client.
		eventRepeats = 10
		// repeatDelay is the pause between two repeats of the same event.
		repeatDelay = 20 * time.Millisecond
	)

	// Check for flushing support before any SSE header is set, so the error
	// response below is a plain, well-formed 500.
	flusher, ok := w.(http.Flusher)
	if !ok {
		broker.log.Error("response writer does not support flushing")
		http.Error(w, "streaming unsupported", http.StatusInternalServerError)
		return
	}

	header := w.Header()
	header.Set("Content-Type", "text/event-stream")
	header.Set("Cache-Control", "no-cache")
	header.Set("Connection", "keep-alive")

	origin := r.Header.Get("Origin")
	if origin == "" {
		origin = "*" // Fallback for non-browser clients
	}
	header.Set("Access-Control-Allow-Origin", origin)
	header.Set("Access-Control-Allow-Credentials", "true")
	// The response depends on the request's Origin header, so caches must
	// not reuse it across origins.
	header.Add("Vary", "Origin")

	// Send the headers now rather than with the first event or heartbeat.
	flusher.Flush()

	ctx := r.Context()

	heartbeat := time.NewTicker(heartbeatInterval)
	defer heartbeat.Stop()

	for {
		select {
		case <-ctx.Done():
			// Client disconnected
			broker.log.Debug("broker: got ctx done")
			return

		case event, open := <-broker.Notifier:
			if !open {
				// A closed channel would otherwise yield zero-value events
				// forever and turn this loop into a busy spin.
				broker.log.Debug("broker: notifier channel closed")
				return
			}
			broker.log.Debug("got event", "event", event)

			for i := 0; i < eventRepeats; i++ {
				if err := writeSSEEvent(w, event); err != nil {
					broker.log.Error("write failed", "error", err)
					return
				}
				flusher.Flush()

				// Pause between repeats, but stop as soon as the client
				// goes away.
				select {
				case <-time.After(repeatDelay):
				case <-ctx.Done():
					return
				}
			}

		case <-heartbeat.C:
			// Send SSE heartbeat comment
			if _, err := fmt.Fprint(w, ":\n\n"); err != nil {
				broker.log.Error("failed to write heartbeat", "error", err)
				return // Client disconnected
			}
			flusher.Flush()
		}
	}
}

// writeSSEEvent writes event to w as one SSE frame: an "event:" line, one
// "data:" line per line of the payload, and a terminating blank line. A
// payload containing "\n", "\r" or "\r\n" is split at those line breaks,
// because a raw line break inside a data field would end the field (or the
// whole frame) early. For a single-line payload the bytes written are
// "event: <name>\ndata: <payload>\n\n".
//
// NOTE(review): EventName is written as-is and is assumed to contain no line
// breaks; confirm against the code that produces events.
func writeSSEEvent(w http.ResponseWriter, event NotificationEvent) error {
	if _, err := fmt.Fprintf(w, "event: %s\n", event.EventName); err != nil {
		return err
	}

	rest := event.Payload
	for {
		// Find the end of the current payload line.
		end := 0
		for end < len(rest) && rest[end] != '\n' && rest[end] != '\r' {
			end++
		}
		if _, err := fmt.Fprintf(w, "data: %s\n", rest[:end]); err != nil {
			return err
		}
		if end == len(rest) {
			break
		}
		// Treat "\r\n" as a single line break.
		if rest[end] == '\r' && end+1 < len(rest) && rest[end+1] == '\n' {
			end++
		}
		rest = rest[end+1:]
	}

	// The blank line dispatches the event on the client side.
	_, err := fmt.Fprint(w, "\n")
	return err
}