Compare commits

2 Commits

Author SHA1 Message Date
9fc36eb7ea Merge branch 'master' into enha/sse-try 2025-07-15 16:13:59 +03:00
8f9865db3f Enha: simplify sse (worsened event recieving) 2025-07-12 20:33:41 +03:00
6 changed files with 38 additions and 54 deletions

View File

@ -4,12 +4,13 @@ import (
"fmt"
"log/slog"
"net/http"
"os"
"time"
)
// the amount of time to wait when pushing a message to
// a slow client or a client that closed after `range clients` started.
const patience time.Duration = time.Second * 1
// const patience time.Duration = time.Second * 1
type (
NotificationEvent struct {
@ -20,22 +21,18 @@ type (
Broker struct {
// Events are pushed to this channel by the main events-gathering routine
Notifier NotifierChan
// New client connections
newClients chan NotifierChan
// Closed client connections
closingClients chan NotifierChan
// Client connections registry
clients map[NotifierChan]struct{}
log *slog.Logger
}
)
func NewBroker() (broker *Broker) {
// Instantiate a broker
return &Broker{
Notifier: make(NotifierChan, 1),
newClients: make(chan NotifierChan),
closingClients: make(chan NotifierChan),
clients: make(map[NotifierChan]struct{}),
Notifier: make(NotifierChan, 100),
log: slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{
Level: slog.LevelDebug,
AddSource: true,
})),
}
}
@ -44,7 +41,6 @@ var Notifier *Broker
// for use in different packages
func init() {
Notifier = NewBroker()
go Notifier.Listen()
}
func (broker *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@ -59,9 +55,6 @@ func (broker *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
w.Header().Set("Access-Control-Allow-Origin", origin)
w.Header().Set("Access-Control-Allow-Credentials", "true")
messageChan := make(NotifierChan, 10) // Buffered channel
broker.newClients <- messageChan
defer func() { broker.closingClients <- messageChan }()
ctx := r.Context()
// browser can close sse on its own; ping every 2s to prevent
heartbeat := time.NewTicker(2 * time.Second)
@ -69,12 +62,14 @@ func (broker *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
for {
select {
case <-ctx.Done():
broker.log.Debug("broker: got ctx done")
// Client disconnected
return
case event := <-messageChan:
case event := <-broker.Notifier:
broker.log.Debug("got event", "event", event)
_, err := fmt.Fprintf(w, "event: %s\ndata: %s\n\n", event.EventName, event.Payload)
if err != nil {
fmt.Println(err)
broker.log.Error("failed to write event", "error", err)
// Client disconnected
return
}
@ -82,45 +77,10 @@ func (broker *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
case <-heartbeat.C:
// Send SSE heartbeat comment
if _, err := fmt.Fprint(w, ":\n\n"); err != nil {
broker.log.Error("failed to write heartbeat", "error", err)
return // Client disconnected
}
w.(http.Flusher).Flush()
}
}
}
// Listen for new notifications and redistribute them to clients
func (broker *Broker) Listen() {
slog.Info("Broker listener started")
for {
slog.Info("Broker waiting for event")
select {
case s := <-broker.newClients:
// A new client has connected.
// Register their message channel
broker.clients[s] = struct{}{}
slog.Info("Client added", "clients listening", len(broker.clients))
case s := <-broker.closingClients:
// A client has dettached and we want to
// stop sending them messages.
delete(broker.clients, s)
slog.Info("Client removed", "clients listening", len(broker.clients))
case event := <-broker.Notifier:
slog.Info("Received new event", "event", event.EventName, "payload", event.Payload)
// We got a new event from the outside!
// Send event to all connected clients
slog.Info("Broadcasting event to clients", "client_count", len(broker.clients))
for clientMessageChan := range broker.clients {
slog.Info("Sending event to client", "client", clientMessageChan)
select {
case clientMessageChan <- event:
slog.Info("Successfully sent event to client", "client", clientMessageChan)
case <-time.After(patience):
delete(broker.clients, clientMessageChan)
slog.Warn("Client timed out, removed", "client", clientMessageChan, "clients listening", len(broker.clients))
}
}
slog.Info("Finished broadcasting event")
}
}
}

View File

@ -13,6 +13,12 @@
</head>
<body>
<div id="ancestor" hx-ext="sse" sse-connect="/sub/sse">
<script type="text/javascript">
document.body.addEventListener('htmx:sseError', function (e) {
// do something before the event data is swapped in
console.log(e)
})
</script>
<div id="main-content">
{{template "main" .}}
</div>

View File

@ -22,6 +22,7 @@
{{template "roomlist" .List}}
</div>
{{else}}
<div id="sse-listener" sse-connect="/sub/sse" hx-trigger="sse:roomupdate_{{.State.RoomID}}" hx-get="/room" hx-target="#room-interier" hx-swap="none" style="display:none;"></div>
<div id="room">
{{template "room" .}}
</div>

View File

@ -1,5 +1,5 @@
{{define "room"}}
<div id="interier" hx-get="/" hx-trigger="sse:roomupdate_{{.State.RoomID}}" class=space-y-2>
<div id="room-interier" class=space-y-2>
<div id="headwrapper" class="grid grid-cols-1 md:grid-cols-5 md:gap-4">
<div id="meta" class="md:col-span-1 border-2 rounded-lg text-center space-y-2">
<p>Hello {{.State.Username}};</p>

View File

@ -334,3 +334,19 @@ func HandleRemoveBot(w http.ResponseWriter, r *http.Request) {
}
notify(models.NotifyRoomUpdatePrefix+fi.Room.ID, "")
}
func HandleGetRoom(w http.ResponseWriter, r *http.Request) {
fi, err := getFullInfoByCtx(r.Context())
if err != nil {
abortWithError(w, err.Error())
return
}
tmpl, err := template.ParseGlob("components/*.html")
if err != nil {
abortWithError(w, err.Error())
return
}
if err := tmpl.ExecuteTemplate(w, "room", fi); err != nil {
log.Error("failed to execute template", "error", err)
}
}

View File

@ -92,6 +92,7 @@ func ListenToRequests(port string) *http.Server {
mux.HandleFunc("GET /add-bot", handlers.HandleAddBot)
mux.HandleFunc("GET /remove-bot", handlers.HandleRemoveBot)
mux.HandleFunc("GET /mark-card", handlers.HandleMarkCard)
mux.HandleFunc("GET /room", handlers.HandleGetRoom)
// special
mux.HandleFunc("GET /renotify-bot", handlers.HandleRenotifyBot)
// sse