Fix: sse changes

This commit is contained in:
Grail Finder
2025-05-10 09:01:51 +03:00
parent 2f4891473b
commit 416cc63ec0
6 changed files with 44 additions and 31 deletions

View File

@ -4,7 +4,6 @@ import (
"fmt" "fmt"
"log/slog" "log/slog"
"net/http" "net/http"
"strings"
"time" "time"
) )
@ -73,19 +72,22 @@ func (broker *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
messageChan := make(NotifierChan) messageChan := make(NotifierChan)
broker.newClients <- messageChan broker.newClients <- messageChan
defer func() { broker.closingClients <- messageChan }() defer func() { broker.closingClients <- messageChan }()
ctx := r.Context()
for { for {
event := <-messageChan select {
// // Proper SSE formatting case <-ctx.Done():
// fmt.Fprintf(w, "event: %s\n", event.EventName) // Event name line // Client disconnected
// fmt.Fprintf(w, "data: %s\n\n", event.Payload) // Data line + empty line return
// Alternative for multi-line data: case event := <-messageChan:
fmt.Fprintf(w, "event: %s\n", event.EventName) _, err := fmt.Fprintf(w, "event: %s\ndata: %s\n\n", event.EventName, event.Payload)
for _, line := range strings.Split(event.Payload, "\n") { if err != nil {
fmt.Fprintf(w, "data: %s\n", line) fmt.Println(err)
// Client disconnected
return
} }
fmt.Fprintf(w, "\n")
w.(http.Flusher).Flush() w.(http.Flusher).Flush()
} }
}
} }
// Listen for new notifications and redistribute them to clients // Listen for new notifications and redistribute them to clients

View File

@ -1,4 +1,8 @@
{{define "main"}} {{define "main"}}
<div hx-ext="sse" sse-connect="/sub/sse" sse-swap="test">
Contents of this box will be updated in real time
with every SSE message received from the chatroom.
</div>
<!-- user has no username -> login form --> <!-- user has no username -> login form -->
{{ if not . }} {{ if not . }}
{{template "login"}} {{template "login"}}

View File

@ -78,6 +78,7 @@ func HandleFrontLogin(w http.ResponseWriter, r *http.Request) {
fi := &models.FullInfo{ fi := &models.FullInfo{
State: userstate, State: userstate,
} }
fi.List = listPublicRooms()
// save state to cache // save state to cache
if err := saveState(cleanName, userstate); err != nil { if err := saveState(cleanName, userstate); err != nil {
// if err := saveFullInfo(fi); err != nil { // if err := saveFullInfo(fi); err != nil {

View File

@ -1,18 +1,22 @@
package handlers package handlers
import ( import (
"fmt"
"golias/broker"
"golias/config" "golias/config"
"golias/pkg/cache" "golias/pkg/cache"
"html/template" "html/template"
"log/slog" "log/slog"
"net/http" "net/http"
"os" "os"
"time"
) )
var ( var (
log *slog.Logger log *slog.Logger
cfg *config.Config cfg *config.Config
memcache cache.Cache memcache cache.Cache
Notifier *broker.Broker
) )
func init() { func init() {
@ -22,6 +26,20 @@ func init() {
})) }))
memcache = cache.MemCache memcache = cache.MemCache
cfg = config.LoadConfigOrDefault("") cfg = config.LoadConfigOrDefault("")
Notifier = broker.NewBroker()
go Notifier.Listen()
ticker := time.NewTicker(2 * time.Second)
go func() {
counter := 0
for {
<-ticker.C
Notifier.Notifier <- broker.NotificationEvent{
EventName: "test",
Payload: fmt.Sprintf("%v test call of notifier", counter),
}
counter++
}
}()
} }
var roundWords = map[string]string{ var roundWords = map[string]string{
@ -69,10 +87,8 @@ func HandleHome(w http.ResponseWriter, r *http.Request) {
} }
} }
if fi != nil && fi.Room == nil { if fi != nil && fi.Room == nil {
log.Debug("loading list")
fi.List = listPublicRooms() fi.List = listPublicRooms()
} }
log.Debug("data debug", "fi", fi)
if err := tmpl.ExecuteTemplate(w, "base", fi); err != nil { if err := tmpl.ExecuteTemplate(w, "base", fi); err != nil {
log.Error("failed to exec templ;", "error", err, "templ", "base") log.Error("failed to exec templ;", "error", err, "templ", "base")
} }

View File

@ -11,29 +11,16 @@ import (
"time" "time"
) )
// responseWriterWrapper wraps http.ResponseWriter to capture status code
type responseWriterWrapper struct {
http.ResponseWriter
status int
}
func (w *responseWriterWrapper) WriteHeader(status int) {
w.status = status
w.ResponseWriter.WriteHeader(status)
}
// LogRequests logs all HTTP requests with method, path and duration // LogRequests logs all HTTP requests with method, path and duration
func LogRequests(next http.Handler) http.Handler { func LogRequests(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now() start := time.Now()
// Wrap response writer to capture status code // Wrap response writer to capture status code
ww := &responseWriterWrapper{ResponseWriter: w} next.ServeHTTP(w, r)
next.ServeHTTP(ww, r)
duration := time.Since(start) duration := time.Since(start)
log.Debug("request completed", log.Debug("request completed",
"method", r.Method, "method", r.Method,
"path", r.URL.RequestURI(), "path", r.URL.RequestURI(),
"status", ww.status,
"duration", duration.String(), "duration", duration.String(),
) )
}) })

View File

@ -14,7 +14,8 @@ func ListenToRequests(port string) error {
Handler: handlers.LogRequests(handlers.GetSession(mux)), Handler: handlers.LogRequests(handlers.GetSession(mux)),
Addr: port, Addr: port,
ReadTimeout: time.Second * 5, ReadTimeout: time.Second * 5,
WriteTimeout: time.Second * 5, // WriteTimeout: time.Second * 5,
WriteTimeout: 0, // sse streaming
} }
fs := http.FileServer(http.Dir("assets/")) fs := http.FileServer(http.Dir("assets/"))
mux.Handle("GET /assets/", http.StripPrefix("/assets/", fs)) mux.Handle("GET /assets/", http.StripPrefix("/assets/", fs))
@ -34,6 +35,8 @@ func ListenToRequests(port string) error {
mux.HandleFunc("GET /room/hideform", handlers.HandleHideCreateForm) mux.HandleFunc("GET /room/hideform", handlers.HandleHideCreateForm)
mux.HandleFunc("GET /word/show-color", handlers.HandleShowColor) mux.HandleFunc("GET /word/show-color", handlers.HandleShowColor)
mux.HandleFunc("POST /check/name", handlers.HandleNameCheck) mux.HandleFunc("POST /check/name", handlers.HandleNameCheck)
// sse
mux.Handle("GET /sub/sse", handlers.Notifier)
slog.Info("Listening", "addr", port) slog.Info("Listening", "addr", port)
return server.ListenAndServe() return server.ListenAndServe()
} }