Compare commits
2 Commits
master
...
enha/sse-t
Author | SHA1 | Date | |
---|---|---|---|
9fc36eb7ea | |||
8f9865db3f |
@ -4,12 +4,13 @@ import (
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"os"
|
||||
"time"
|
||||
)
|
||||
|
||||
// the amount of time to wait when pushing a message to
|
||||
// a slow client or a client that closed after `range clients` started.
|
||||
const patience time.Duration = time.Second * 1
|
||||
// const patience time.Duration = time.Second * 1
|
||||
|
||||
type (
|
||||
NotificationEvent struct {
|
||||
@ -20,22 +21,18 @@ type (
|
||||
Broker struct {
|
||||
// Events are pushed to this channel by the main events-gathering routine
|
||||
Notifier NotifierChan
|
||||
// New client connections
|
||||
newClients chan NotifierChan
|
||||
// Closed client connections
|
||||
closingClients chan NotifierChan
|
||||
// Client connections registry
|
||||
clients map[NotifierChan]struct{}
|
||||
log *slog.Logger
|
||||
}
|
||||
)
|
||||
|
||||
func NewBroker() (broker *Broker) {
|
||||
// Instantiate a broker
|
||||
return &Broker{
|
||||
Notifier: make(NotifierChan, 1),
|
||||
newClients: make(chan NotifierChan),
|
||||
closingClients: make(chan NotifierChan),
|
||||
clients: make(map[NotifierChan]struct{}),
|
||||
Notifier: make(NotifierChan, 100),
|
||||
log: slog.New(slog.NewJSONHandler(os.Stderr, &slog.HandlerOptions{
|
||||
Level: slog.LevelDebug,
|
||||
AddSource: true,
|
||||
})),
|
||||
}
|
||||
}
|
||||
|
||||
@ -44,7 +41,6 @@ var Notifier *Broker
|
||||
// for use in different packages
|
||||
func init() {
|
||||
Notifier = NewBroker()
|
||||
go Notifier.Listen()
|
||||
}
|
||||
|
||||
func (broker *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
@ -59,9 +55,6 @@ func (broker *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
w.Header().Set("Access-Control-Allow-Origin", origin)
|
||||
w.Header().Set("Access-Control-Allow-Credentials", "true")
|
||||
messageChan := make(NotifierChan, 10) // Buffered channel
|
||||
broker.newClients <- messageChan
|
||||
defer func() { broker.closingClients <- messageChan }()
|
||||
ctx := r.Context()
|
||||
// browser can close sse on its own; ping every 2s to prevent
|
||||
heartbeat := time.NewTicker(2 * time.Second)
|
||||
@ -69,12 +62,14 @@ func (broker *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
broker.log.Debug("broker: got ctx done")
|
||||
// Client disconnected
|
||||
return
|
||||
case event := <-messageChan:
|
||||
case event := <-broker.Notifier:
|
||||
broker.log.Debug("got event", "event", event)
|
||||
_, err := fmt.Fprintf(w, "event: %s\ndata: %s\n\n", event.EventName, event.Payload)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
broker.log.Error("failed to write event", "error", err)
|
||||
// Client disconnected
|
||||
return
|
||||
}
|
||||
@ -82,45 +77,10 @@ func (broker *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
case <-heartbeat.C:
|
||||
// Send SSE heartbeat comment
|
||||
if _, err := fmt.Fprint(w, ":\n\n"); err != nil {
|
||||
broker.log.Error("failed to write heartbeat", "error", err)
|
||||
return // Client disconnected
|
||||
}
|
||||
w.(http.Flusher).Flush()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Listen for new notifications and redistribute them to clients
|
||||
func (broker *Broker) Listen() {
|
||||
slog.Info("Broker listener started")
|
||||
for {
|
||||
slog.Info("Broker waiting for event")
|
||||
select {
|
||||
case s := <-broker.newClients:
|
||||
// A new client has connected.
|
||||
// Register their message channel
|
||||
broker.clients[s] = struct{}{}
|
||||
slog.Info("Client added", "clients listening", len(broker.clients))
|
||||
case s := <-broker.closingClients:
|
||||
// A client has dettached and we want to
|
||||
// stop sending them messages.
|
||||
delete(broker.clients, s)
|
||||
slog.Info("Client removed", "clients listening", len(broker.clients))
|
||||
case event := <-broker.Notifier:
|
||||
slog.Info("Received new event", "event", event.EventName, "payload", event.Payload)
|
||||
// We got a new event from the outside!
|
||||
// Send event to all connected clients
|
||||
slog.Info("Broadcasting event to clients", "client_count", len(broker.clients))
|
||||
for clientMessageChan := range broker.clients {
|
||||
slog.Info("Sending event to client", "client", clientMessageChan)
|
||||
select {
|
||||
case clientMessageChan <- event:
|
||||
slog.Info("Successfully sent event to client", "client", clientMessageChan)
|
||||
case <-time.After(patience):
|
||||
delete(broker.clients, clientMessageChan)
|
||||
slog.Warn("Client timed out, removed", "client", clientMessageChan, "clients listening", len(broker.clients))
|
||||
}
|
||||
}
|
||||
slog.Info("Finished broadcasting event")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -13,6 +13,12 @@
|
||||
</head>
|
||||
<body>
|
||||
<div id="ancestor" hx-ext="sse" sse-connect="/sub/sse">
|
||||
<script type="text/javascript">
|
||||
document.body.addEventListener('htmx:sseError', function (e) {
|
||||
// do something before the event data is swapped in
|
||||
console.log(e)
|
||||
})
|
||||
</script>
|
||||
<div id="main-content">
|
||||
{{template "main" .}}
|
||||
</div>
|
||||
|
@ -22,6 +22,7 @@
|
||||
{{template "roomlist" .List}}
|
||||
</div>
|
||||
{{else}}
|
||||
<div id="sse-listener" sse-connect="/sub/sse" hx-trigger="sse:roomupdate_{{.State.RoomID}}" hx-get="/room" hx-target="#room-interier" hx-swap="none" style="display:none;"></div>
|
||||
<div id="room">
|
||||
{{template "room" .}}
|
||||
</div>
|
||||
|
@ -1,5 +1,5 @@
|
||||
{{define "room"}}
|
||||
<div id="interier" hx-get="/" hx-trigger="sse:roomupdate_{{.State.RoomID}}" class=space-y-2>
|
||||
<div id="room-interier" class=space-y-2>
|
||||
<div id="headwrapper" class="grid grid-cols-1 md:grid-cols-5 md:gap-4">
|
||||
<div id="meta" class="md:col-span-1 border-2 rounded-lg text-center space-y-2">
|
||||
<p>Hello {{.State.Username}};</p>
|
||||
|
@ -334,3 +334,19 @@ func HandleRemoveBot(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
notify(models.NotifyRoomUpdatePrefix+fi.Room.ID, "")
|
||||
}
|
||||
|
||||
func HandleGetRoom(w http.ResponseWriter, r *http.Request) {
|
||||
fi, err := getFullInfoByCtx(r.Context())
|
||||
if err != nil {
|
||||
abortWithError(w, err.Error())
|
||||
return
|
||||
}
|
||||
tmpl, err := template.ParseGlob("components/*.html")
|
||||
if err != nil {
|
||||
abortWithError(w, err.Error())
|
||||
return
|
||||
}
|
||||
if err := tmpl.ExecuteTemplate(w, "room", fi); err != nil {
|
||||
log.Error("failed to execute template", "error", err)
|
||||
}
|
||||
}
|
||||
|
1
main.go
1
main.go
@ -92,6 +92,7 @@ func ListenToRequests(port string) *http.Server {
|
||||
mux.HandleFunc("GET /add-bot", handlers.HandleAddBot)
|
||||
mux.HandleFunc("GET /remove-bot", handlers.HandleRemoveBot)
|
||||
mux.HandleFunc("GET /mark-card", handlers.HandleMarkCard)
|
||||
mux.HandleFunc("GET /room", handlers.HandleGetRoom)
|
||||
// special
|
||||
mux.HandleFunc("GET /renotify-bot", handlers.HandleRenotifyBot)
|
||||
// sse
|
||||
|
Reference in New Issue
Block a user