From 0fbc106f9a945575d327c9631912e3638a772b28 Mon Sep 17 00:00:00 2001 From: Grail Finder Date: Sat, 3 May 2025 13:18:51 +0300 Subject: [PATCH] Feat: add sse broker --- broker/sse.go | 117 +++++++++++++++++++++++++++++++++++++++++ handlers/actions.go | 13 +++++ handlers/elements.go | 2 + handlers/middleware.go | 5 +- models/keys.go | 6 ++- 5 files changed, 140 insertions(+), 3 deletions(-) create mode 100644 broker/sse.go diff --git a/broker/sse.go b/broker/sse.go new file mode 100644 index 0000000..4293dd8 --- /dev/null +++ b/broker/sse.go @@ -0,0 +1,117 @@ +package broker + +import ( + "fmt" + "log/slog" + "net/http" + "strings" + "time" +) + +// the amount of time to wait when pushing a message to +// a slow client or a client that closed after `range clients` started. +const patience time.Duration = time.Second * 1 + +type ( + NotificationEvent struct { + EventName string + Payload string + } + NotifierChan chan NotificationEvent + Broker struct { + // Events are pushed to this channel by the main events-gathering routine + Notifier NotifierChan + // New client connections + newClients chan NotifierChan + // Closed client connections + closingClients chan NotifierChan + // Client connections registry + clients map[NotifierChan]struct{} + } +) + +func NewBroker() (broker *Broker) { + // Instantiate a broker + return &Broker{ + Notifier: make(NotifierChan, 1), + newClients: make(chan NotifierChan), + closingClients: make(chan NotifierChan), + clients: make(map[NotifierChan]struct{}), + } +} + +// func (broker *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) { +// w.Header().Set("Content-Type", "text/event-stream") +// w.Header().Set("Cache-Control", "no-cache") +// w.Header().Set("Connection", "keep-alive") +// w.Header().Set("Access-Control-Allow-Origin", "*") +// // Each connection registers its own message channel with the Broker's connections registry +// messageChan := make(NotifierChan) +// // Signal the broker that we have a new connection +// broker.newClients <- messageChan +// // Remove this client from the map of connected clients +// // when this handler exits. +// defer func() { +// broker.closingClients <- messageChan +// }() +// // c.Stream(func(w io.Writer) bool { +// for { +// // Emit Server Sent Events compatible +// event := <-messageChan +// fmt.Fprintf(w, "event:%s; data:%s\n", event.EventName, event.Payload) +// // c.SSEvent(event.EventName, event.Payload) +// w.(http.Flusher).Flush() +// } +// } + +func (broker *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) { + // Headers (keep these as-is) + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("Access-Control-Allow-Origin", "*") + messageChan := make(NotifierChan) + broker.newClients <- messageChan + defer func() { broker.closingClients <- messageChan }() + for { + event := <-messageChan + // // Proper SSE formatting + // fmt.Fprintf(w, "event: %s\n", event.EventName) // Event name line + // fmt.Fprintf(w, "data: %s\n\n", event.Payload) // Data line + empty line + // Alternative for multi-line data: + fmt.Fprintf(w, "event: %s\n", event.EventName) + for _, line := range strings.Split(event.Payload, "\n") { + fmt.Fprintf(w, "data: %s\n", line) + } + fmt.Fprintf(w, "\n") + w.(http.Flusher).Flush() + } +} + +// Listen for new notifications and redistribute them to clients +func (broker *Broker) Listen() { + for { + select { + case s := <-broker.newClients: + // A new client has connected. + // Register their message channel + broker.clients[s] = struct{}{} + slog.Info("Client added", "clients listening", len(broker.clients)) + case s := <-broker.closingClients: + // A client has dettached and we want to + // stop sending them messages. + delete(broker.clients, s) + slog.Info("Client removed", "clients listening", len(broker.clients)) + case event := <-broker.Notifier: + // We got a new event from the outside! + // Send event to all connected clients + for clientMessageChan := range broker.clients { + select { + case clientMessageChan <- event: + case <-time.After(patience): + slog.Info("Client was skipped", "clients listening", len(broker.clients)) + } + } + } + } +} diff --git a/handlers/actions.go b/handlers/actions.go index 34a4a0f..f2e91fc 100644 --- a/handlers/actions.go +++ b/handlers/actions.go @@ -2,6 +2,7 @@ package handlers import ( "context" + "encoding/json" "golias/models" ) @@ -9,6 +10,18 @@ func createRoom(ctx context.Context, req *models.RoomReq) (*models.Room, error) return nil, nil } +func getRoomByID(roomID string) (*models.Room, error) { + roomBytes, err := memcache.Get(models.CacheRoomPrefix + roomID) + if err != nil { + return nil, err + } + resp := &models.Room{} + if err := json.Unmarshal(roomBytes, &resp); err != nil { + return nil, err + } + return resp, nil +} + // context func getRoomIDFromCtx(ctx context.Context) string { id, _ := ctx.Value(models.CtxRoomIDKey).(string) diff --git a/handlers/elements.go b/handlers/elements.go index c5659af..0ae3905 100644 --- a/handlers/elements.go +++ b/handlers/elements.go @@ -40,6 +40,8 @@ func HandleShowColor(w http.ResponseWriter, r *http.Request) { abortWithError(w, "failed to get room") return } + log.Debug("got room", "room", room) + // update room score color, exists := roundWords[word] log.Debug("got show-color request", "word", word, "color", color) if !exists { diff --git a/handlers/middleware.go b/handlers/middleware.go index dfdae06..02dbe8f 100644 --- a/handlers/middleware.go +++ b/handlers/middleware.go @@ -6,6 +6,7 @@ import ( "crypto/sha256" "encoding/base64" "errors" + "golias/models" "net/http" "time" ) @@ -91,9 +92,9 @@ func GetSession(next http.Handler) http.Handler { return } ctx := context.WithValue(r.Context(), - "username", userSession.Username) + models.CtxUsernameKey, userSession.Username) ctx = context.WithValue(r.Context(), - "session", userSession) + models.CtxSessionKey, userSession) if err := cacheSetSession(sessionToken, userSession); err != nil { msg := "failed to marshal user session" diff --git a/models/keys.go b/models/keys.go index 5fae370..52a0665 100644 --- a/models/keys.go +++ b/models/keys.go @@ -1,5 +1,9 @@ package models var ( - CtxRoomIDKey = "current_room" + CtxRoomIDKey = "current_room" + CtxUsernameKey = "username" + CtxSessionKey = "session" + // cache + CacheRoomPrefix = "room#" )