|
|
@@ -1,6 +1,7 @@
|
|
|
|
package broker
|
|
|
|
package broker
|
|
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
import (
|
|
|
|
|
|
|
|
"context"
|
|
|
|
"fmt"
|
|
|
|
"fmt"
|
|
|
|
"log/slog"
|
|
|
|
"log/slog"
|
|
|
|
"net/http"
|
|
|
|
"net/http"
|
|
|
@@ -22,6 +23,8 @@ type (
|
|
|
|
// Events are pushed to this channel by the main events-gathering routine
|
|
|
|
// Events are pushed to this channel by the main events-gathering routine
|
|
|
|
Notifier NotifierChan
|
|
|
|
Notifier NotifierChan
|
|
|
|
log *slog.Logger
|
|
|
|
log *slog.Logger
|
|
|
|
|
|
|
|
addClient chan NotifierChan
|
|
|
|
|
|
|
|
clients map[NotifierChan]struct{}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
@@ -33,6 +36,8 @@ func NewBroker() (broker *Broker) {
|
|
|
|
Level: slog.LevelDebug,
|
|
|
|
Level: slog.LevelDebug,
|
|
|
|
AddSource: true,
|
|
|
|
AddSource: true,
|
|
|
|
})),
|
|
|
|
})),
|
|
|
|
|
|
|
|
addClient: make(chan NotifierChan, 10),
|
|
|
|
|
|
|
|
clients: map[NotifierChan]struct{}{},
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@@ -56,8 +61,10 @@ func (broker *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|
|
|
w.Header().Set("Access-Control-Allow-Origin", origin)
|
|
|
|
w.Header().Set("Access-Control-Allow-Origin", origin)
|
|
|
|
w.Header().Set("Access-Control-Allow-Credentials", "true")
|
|
|
|
w.Header().Set("Access-Control-Allow-Credentials", "true")
|
|
|
|
ctx := r.Context()
|
|
|
|
ctx := r.Context()
|
|
|
|
|
|
|
|
msgChan := make(NotifierChan, 10)
|
|
|
|
|
|
|
|
broker.addClient <- msgChan
|
|
|
|
// browser can close sse on its own; ping every 2s to prevent
|
|
|
|
// browser can close sse on its own; ping every 2s to prevent
|
|
|
|
heartbeat := time.NewTicker(2 * time.Second)
|
|
|
|
heartbeat := time.NewTicker(8 * time.Second)
|
|
|
|
defer heartbeat.Stop()
|
|
|
|
defer heartbeat.Stop()
|
|
|
|
for {
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
select {
|
|
|
@@ -67,13 +74,20 @@ func (broker *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|
|
|
return
|
|
|
|
return
|
|
|
|
case event := <-broker.Notifier:
|
|
|
|
case event := <-broker.Notifier:
|
|
|
|
broker.log.Debug("got event", "event", event)
|
|
|
|
broker.log.Debug("got event", "event", event)
|
|
|
|
|
|
|
|
for i := 0; i < 10; i++ { // Repeat 3 times
|
|
|
|
_, err := fmt.Fprintf(w, "event: %s\ndata: %s\n\n", event.EventName, event.Payload)
|
|
|
|
_, err := fmt.Fprintf(w, "event: %s\ndata: %s\n\n", event.EventName, event.Payload)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
broker.log.Error("failed to write event", "error", err)
|
|
|
|
broker.log.Error("write failed", "error", err)
|
|
|
|
// Client disconnected
|
|
|
|
|
|
|
|
return
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
w.(http.Flusher).Flush()
|
|
|
|
w.(http.Flusher).Flush()
|
|
|
|
|
|
|
|
// Short delay between sends (non-blocking)
|
|
|
|
|
|
|
|
select {
|
|
|
|
|
|
|
|
case <-time.After(20 * time.Millisecond): // Adjust delay as needed
|
|
|
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
|
|
|
return
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
case <-heartbeat.C:
|
|
|
|
case <-heartbeat.C:
|
|
|
|
// Send SSE heartbeat comment
|
|
|
|
// Send SSE heartbeat comment
|
|
|
|
if _, err := fmt.Fprint(w, ":\n\n"); err != nil {
|
|
|
|
if _, err := fmt.Fprint(w, ":\n\n"); err != nil {
|
|
|
@@ -84,3 +98,15 @@ func (broker *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
func (broker *Broker) Listen(ctx context.Context) {
|
|
|
|
|
|
|
|
for {
|
|
|
|
|
|
|
|
select {
|
|
|
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
|
|
|
|
return
|
|
|
|
|
|
|
|
case clientChan := <-broker.addClient:
|
|
|
|
|
|
|
|
// mutex
|
|
|
|
|
|
|
|
broker.clients[clientChan] = struct{}{}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|