Feat: add sse broker

This commit is contained in:
Grail Finder
2025-05-03 13:18:51 +03:00
parent 8d85d0612c
commit 0fbc106f9a
5 changed files with 140 additions and 3 deletions

117
broker/sse.go Normal file
View File

@ -0,0 +1,117 @@
package broker
import (
"fmt"
"log/slog"
"net/http"
"strings"
"time"
)
// the amount of time to wait when pushing a message to
// a slow client or a client that closed after `range clients` started.
const patience time.Duration = time.Second * 1
type (
NotificationEvent struct {
EventName string
Payload string
}
NotifierChan chan NotificationEvent
Broker struct {
// Events are pushed to this channel by the main events-gathering routine
Notifier NotifierChan
// New client connections
newClients chan NotifierChan
// Closed client connections
closingClients chan NotifierChan
// Client connections registry
clients map[NotifierChan]struct{}
}
)
func NewBroker() (broker *Broker) {
// Instantiate a broker
return &Broker{
Notifier: make(NotifierChan, 1),
newClients: make(chan NotifierChan),
closingClients: make(chan NotifierChan),
clients: make(map[NotifierChan]struct{}),
}
}
// func (broker *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// w.Header().Set("Content-Type", "text/event-stream")
// w.Header().Set("Cache-Control", "no-cache")
// w.Header().Set("Connection", "keep-alive")
// w.Header().Set("Access-Control-Allow-Origin", "*")
// // Each connection registers its own message channel with the Broker's connections registry
// messageChan := make(NotifierChan)
// // Signal the broker that we have a new connection
// broker.newClients <- messageChan
// // Remove this client from the map of connected clients
// // when this handler exits.
// defer func() {
// broker.closingClients <- messageChan
// }()
// // c.Stream(func(w io.Writer) bool {
// for {
// // Emit Server Sent Events compatible
// event := <-messageChan
// fmt.Fprintf(w, "event:%s; data:%s\n", event.EventName, event.Payload)
// // c.SSEvent(event.EventName, event.Payload)
// w.(http.Flusher).Flush()
// }
// }
func (broker *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Headers (keep these as-is)
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
messageChan := make(NotifierChan)
broker.newClients <- messageChan
defer func() { broker.closingClients <- messageChan }()
for {
event := <-messageChan
// // Proper SSE formatting
// fmt.Fprintf(w, "event: %s\n", event.EventName) // Event name line
// fmt.Fprintf(w, "data: %s\n\n", event.Payload) // Data line + empty line
// Alternative for multi-line data:
fmt.Fprintf(w, "event: %s\n", event.EventName)
for _, line := range strings.Split(event.Payload, "\n") {
fmt.Fprintf(w, "data: %s\n", line)
}
fmt.Fprintf(w, "\n")
w.(http.Flusher).Flush()
}
}
// Listen for new notifications and redistribute them to clients
func (broker *Broker) Listen() {
for {
select {
case s := <-broker.newClients:
// A new client has connected.
// Register their message channel
broker.clients[s] = struct{}{}
slog.Info("Client added", "clients listening", len(broker.clients))
case s := <-broker.closingClients:
// A client has dettached and we want to
// stop sending them messages.
delete(broker.clients, s)
slog.Info("Client removed", "clients listening", len(broker.clients))
case event := <-broker.Notifier:
// We got a new event from the outside!
// Send event to all connected clients
for clientMessageChan := range broker.clients {
select {
case clientMessageChan <- event:
case <-time.After(patience):
slog.Info("Client was skipped", "clients listening", len(broker.clients))
}
}
}
}
}

View File

@ -2,6 +2,7 @@ package handlers
import (
"context"
"encoding/json"
"golias/models"
)
@ -9,6 +10,18 @@ func createRoom(ctx context.Context, req *models.RoomReq) (*models.Room, error)
return nil, nil
}
func getRoomByID(roomID string) (*models.Room, error) {
roomBytes, err := memcache.Get(models.CacheRoomPrefix + roomID)
if err != nil {
return nil, err
}
resp := &models.Room{}
if err := json.Unmarshal(roomBytes, &resp); err != nil {
return nil, err
}
return resp, nil
}
// context
func getRoomIDFromCtx(ctx context.Context) string {
id, _ := ctx.Value(models.CtxRoomIDKey).(string)

View File

@ -40,6 +40,8 @@ func HandleShowColor(w http.ResponseWriter, r *http.Request) {
abortWithError(w, "failed to get room")
return
}
log.Debug("got room", "room", room)
// update room score
color, exists := roundWords[word]
log.Debug("got show-color request", "word", word, "color", color)
if !exists {

View File

@ -6,6 +6,7 @@ import (
"crypto/sha256"
"encoding/base64"
"errors"
"golias/models"
"net/http"
"time"
)
@ -91,9 +92,9 @@ func GetSession(next http.Handler) http.Handler {
return
}
ctx := context.WithValue(r.Context(),
"username", userSession.Username)
models.CtxUsernameKey, userSession.Username)
ctx = context.WithValue(r.Context(),
"session", userSession)
models.CtxSessionKey, userSession)
if err := cacheSetSession(sessionToken,
userSession); err != nil {
msg := "failed to marshal user session"

View File

@ -1,5 +1,9 @@
package models
var (
CtxRoomIDKey = "current_room"
CtxRoomIDKey = "current_room"
CtxUsernameKey = "username"
CtxSessionKey = "session"
// cache
CacheRoomPrefix = "room#"
)