Enha: sse update
This commit is contained in:
		| @@ -1,6 +1,7 @@ | ||||
| package broker | ||||
|  | ||||
| import ( | ||||
| 	"context" | ||||
| 	"fmt" | ||||
| 	"log/slog" | ||||
| 	"net/http" | ||||
| @@ -20,8 +21,10 @@ type ( | ||||
| 	NotifierChan chan NotificationEvent | ||||
| 	Broker       struct { | ||||
| 		// Events are pushed to this channel by the main events-gathering routine | ||||
| 		Notifier NotifierChan | ||||
| 		log      *slog.Logger | ||||
| 		Notifier  NotifierChan | ||||
| 		log       *slog.Logger | ||||
| 		addClient chan NotifierChan | ||||
| 		clients   map[NotifierChan]struct{} | ||||
| 	} | ||||
| ) | ||||
|  | ||||
| @@ -33,6 +36,8 @@ func NewBroker() (broker *Broker) { | ||||
| 			Level:     slog.LevelDebug, | ||||
| 			AddSource: true, | ||||
| 		})), | ||||
| 		addClient: make(chan NotifierChan, 10), | ||||
| 		clients:   map[NotifierChan]struct{}{}, | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @@ -56,6 +61,8 @@ func (broker *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) { | ||||
| 	w.Header().Set("Access-Control-Allow-Origin", origin) | ||||
| 	w.Header().Set("Access-Control-Allow-Credentials", "true") | ||||
| 	ctx := r.Context() | ||||
| 	msgChan := make(NotifierChan, 10) | ||||
| 	broker.addClient <- msgChan | ||||
| 	// browser can close sse on its own; ping every 2s to prevent | ||||
| 	heartbeat := time.NewTicker(8 * time.Second) | ||||
| 	defer heartbeat.Stop() | ||||
| @@ -91,3 +98,15 @@ func (broker *Broker) ServeHTTP(w http.ResponseWriter, r *http.Request) { | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func (broker *Broker) Listen(ctx context.Context) { | ||||
| 	for { | ||||
| 		select { | ||||
| 		case <-ctx.Done(): | ||||
| 			return | ||||
| 		case clientChan := <-broker.addClient: | ||||
| 			// mutex | ||||
| 			broker.clients[clientChan] = struct{}{} | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Grail Finder
					Grail Finder