diff --git a/broker/sse.go b/broker/sse.go index 4293dd8..3b4ab79 100644 --- a/broker/sse.go +++ b/broker/sse.go @@ -4,7 +4,6 @@ import ( "fmt" "log/slog" "net/http" - "strings" "time" ) @@ -73,18 +72,21 @@ func (broker *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) { messageChan := make(NotifierChan) broker.newClients <- messageChan defer func() { broker.closingClients <- messageChan }() + ctx := r.Context() for { - event := <-messageChan - // // Proper SSE formatting - // fmt.Fprintf(w, "event: %s\n", event.EventName) // Event name line - // fmt.Fprintf(w, "data: %s\n\n", event.Payload) // Data line + empty line - // Alternative for multi-line data: - fmt.Fprintf(w, "event: %s\n", event.EventName) - for _, line := range strings.Split(event.Payload, "\n") { - fmt.Fprintf(w, "data: %s\n", line) + select { + case <-ctx.Done(): + // Client disconnected + return + case event := <-messageChan: + _, err := fmt.Fprintf(w, "event: %s\ndata: %s\n\n", event.EventName, event.Payload) + if err != nil { + fmt.Println(err) + // Client disconnected + return + } + w.(http.Flusher).Flush() } - fmt.Fprintf(w, "\n") - w.(http.Flusher).Flush() } } diff --git a/components/index.html b/components/index.html index 4b0f94a..79b16f6 100644 --- a/components/index.html +++ b/components/index.html @@ -1,4 +1,8 @@ {{define "main"}} +
+ Contents of this box will be updated in real time + with every SSE message received from the chatroom. +
{{ if not . }} {{template "login"}} diff --git a/handlers/auth.go b/handlers/auth.go index 7482944..b565243 100644 --- a/handlers/auth.go +++ b/handlers/auth.go @@ -78,6 +78,7 @@ func HandleFrontLogin(w http.ResponseWriter, r *http.Request) { fi := &models.FullInfo{ State: userstate, } + fi.List = listPublicRooms() // save state to cache if err := saveState(cleanName, userstate); err != nil { // if err := saveFullInfo(fi); err != nil { diff --git a/handlers/handlers.go b/handlers/handlers.go index 4fcc60b..87eb978 100644 --- a/handlers/handlers.go +++ b/handlers/handlers.go @@ -1,18 +1,22 @@ package handlers import ( + "fmt" + "golias/broker" "golias/config" "golias/pkg/cache" "html/template" "log/slog" "net/http" "os" + "time" ) var ( log *slog.Logger cfg *config.Config memcache cache.Cache + Notifier *broker.Broker ) func init() { @@ -22,6 +26,20 @@ func init() { })) memcache = cache.MemCache cfg = config.LoadConfigOrDefault("") + Notifier = broker.NewBroker() + go Notifier.Listen() + ticker := time.NewTicker(2 * time.Second) + go func() { + counter := 0 + for { + <-ticker.C + Notifier.Notifier <- broker.NotificationEvent{ + EventName: "test", + Payload: fmt.Sprintf("%v test call of notifier", counter), + } + counter++ + } + }() } var roundWords = map[string]string{ @@ -69,10 +87,8 @@ func HandleHome(w http.ResponseWriter, r *http.Request) { } } if fi != nil && fi.Room == nil { - log.Debug("loading list") fi.List = listPublicRooms() } - log.Debug("data debug", "fi", fi) if err := tmpl.ExecuteTemplate(w, "base", fi); err != nil { log.Error("failed to exec templ;", "error", err, "templ", "base") } diff --git a/handlers/middleware.go b/handlers/middleware.go index 1b0c355..1da5b12 100644 --- a/handlers/middleware.go +++ b/handlers/middleware.go @@ -11,29 +11,16 @@ import ( "time" ) -// responseWriterWrapper wraps http.ResponseWriter to capture status code -type responseWriterWrapper struct { - http.ResponseWriter - status int -} - -func (w *responseWriterWrapper) WriteHeader(status int) { - w.status = status - w.ResponseWriter.WriteHeader(status) -} - // LogRequests logs all HTTP requests with method, path and duration func LogRequests(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { start := time.Now() // Wrap response writer to capture status code - ww := &responseWriterWrapper{ResponseWriter: w} - next.ServeHTTP(ww, r) + next.ServeHTTP(w, r) duration := time.Since(start) log.Debug("request completed", "method", r.Method, "path", r.URL.RequestURI(), - "status", ww.status, "duration", duration.String(), ) }) diff --git a/main.go b/main.go index 1dd8529..dc6d91d 100644 --- a/main.go +++ b/main.go @@ -11,10 +11,11 @@ import ( func ListenToRequests(port string) error { mux := http.NewServeMux() server := &http.Server{ - Handler: handlers.LogRequests(handlers.GetSession(mux)), - Addr: port, - ReadTimeout: time.Second * 5, - WriteTimeout: time.Second * 5, + Handler: handlers.LogRequests(handlers.GetSession(mux)), + Addr: port, + ReadTimeout: time.Second * 5, + // WriteTimeout: time.Second * 5, + WriteTimeout: 0, // sse streaming } fs := http.FileServer(http.Dir("assets/")) mux.Handle("GET /assets/", http.StripPrefix("/assets/", fs)) @@ -34,6 +35,8 @@ func ListenToRequests(port string) error { mux.HandleFunc("GET /room/hideform", handlers.HandleHideCreateForm) mux.HandleFunc("GET /word/show-color", handlers.HandleShowColor) mux.HandleFunc("POST /check/name", handlers.HandleNameCheck) + // sse + mux.Handle("GET /sub/sse", handlers.Notifier) slog.Info("Listening", "addr", port) return server.ListenAndServe() }