From: Skullheadx Date: Tue, 9 Jun 2026 01:37:33 +0000 (-0400) Subject: fix multi websocket issue X-Git-Url: http://git.skullheadx.com/links.html?a=commitdiff_plain;h=4ec44dc4f38dd37736561558063d81f1452b33c5;p=monopoly-web.git fix multi websocket issue --- diff --git a/game/game.go b/game/game.go index e2dc5d3..d800363 100644 --- a/game/game.go +++ b/game/game.go @@ -4,26 +4,28 @@ import ( "context" ) -func (ms *MonopolyServer) addSubscriber(s *subscriber, userUUID string) { +func (ms *MServer) addSubscriber(s *MSub, userUUID string) { ms.subscribersMu.Lock() - ms.subscribers[s] = userUUID + ms.Subscribers[s] = userUUID + ms.Users[userUUID].Subscription = s ms.subscribersMu.Unlock() } -func (ms *MonopolyServer) deleteSubscriber(s *subscriber) { +func (ms *MServer) deleteSubscriber(s *MSub) { ms.subscribersMu.Lock() - delete(ms.subscribers, s) + delete(ms.Users, ms.Subscribers[s]) + delete(ms.Subscribers, s) ms.subscribersMu.Unlock() } -func (ms *MonopolyServer) start() { +func (ms *MServer) start() { ms.subscribersMu.Lock() defer ms.subscribersMu.Unlock() ms.publishLimiter.Wait(context.Background()) var players []Player - for _, userUUID := range ms.subscribers { + for _, userUUID := range ms.Subscribers { players = append(players, InitPlayer(userUUID)) } @@ -33,24 +35,24 @@ func (ms *MonopolyServer) start() { ms.gameCtxMu.Unlock() msg := []byte{MSG_START} - for s := range ms.subscribers { + for s := range ms.Subscribers { select { - case s.msgs <- msg: + case s.Msgs <- msg: default: - go s.closeSlow() + go s.CloseSlow() } } } -func (ms *MonopolyServer) roll() { +func (ms *MServer) roll() { ms.gameCtx.RollDice() ms.gameCtx.ProcessMovement() ms.gameCtx.logGameCtx() } -func (ms *MonopolyServer) buy() { +func (ms *MServer) buy() { ms.gameCtx.Buy() ms.gameCtx.logGameCtx() diff --git a/game/handlers.go b/game/handlers.go index ef43a61..ece1024 100644 --- a/game/handlers.go +++ b/game/handlers.go @@ -3,16 +3,17 @@ package game import ( "context" "errors" - "github.com/coder/websocket" - "github.com/google/uuid" "io" "net" "net/http" "sync" "time" + + "github.com/coder/websocket" + "github.com/google/uuid" ) -func (ms *MonopolyServer) subscribeHandler(w http.ResponseWriter, r *http.Request) { +func (ms *MServer) subscribeHandler(w http.ResponseWriter, r *http.Request) { err := ms.subscribe(w, r) if errors.Is(err, context.Canceled) { return @@ -28,7 +29,7 @@ func (ms *MonopolyServer) subscribeHandler(w http.ResponseWriter, r *http.Reques } } -func (ms *MonopolyServer) loggedInHandler(w http.ResponseWriter, r *http.Request) { +func (ms *MServer) loggedInHandler(w http.ResponseWriter, r *http.Request) { if r.Method != "GET" { http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed) return @@ -47,7 +48,7 @@ func (ms *MonopolyServer) loggedInHandler(w http.ResponseWriter, r *http.Request userUUID := cookie.Value - _, ok := ms.users[userUUID] + _, ok := ms.Users[userUUID] if !ok { http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound) return @@ -56,7 +57,7 @@ func (ms *MonopolyServer) loggedInHandler(w http.ResponseWriter, r *http.Request w.WriteHeader(http.StatusOK) } -func (ms *MonopolyServer) loginHandler(w http.ResponseWriter, r *http.Request) { +func (ms *MServer) loginHandler(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed) return @@ -68,9 +69,32 @@ func (ms *MonopolyServer) loginHandler(w http.ResponseWriter, r *http.Request) { http.Error(w, http.StatusText(http.StatusRequestEntityTooLarge), http.StatusRequestEntityTooLarge) return } + + cookie, err := r.Cookie("user") + if err != http.ErrNoCookie { + cookieUserUUID := cookie.Value + _, ok := ms.Users[cookieUserUUID] + if !ok { // user has cookie, player does not exist + // http.Error(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized) + // return + } else { // user has cookie, player exists + ms.Users[cookieUserUUID].Username = string(username) + w.WriteHeader(http.StatusOK) + return + } + } + + if ms.gameCtx != nil { + http.Error(w, http.StatusText(http.StatusConflict), http.StatusConflict) + return + } + userUUID := uuid.NewString() - ms.users[userUUID] = string(username) + ms.Users[userUUID] = &MUser{ + Username: string(username), + Subscription: nil, + } http.SetCookie(w, &http.Cookie{ Name: "user", @@ -85,7 +109,95 @@ func (ms *MonopolyServer) loginHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } -func (ms *MonopolyServer) startHandler(w http.ResponseWriter, r *http.Request) { +func (ms *MServer) subscribe(w http.ResponseWriter, r *http.Request) error { + cookie, err := r.Cookie("user") + if err != nil { + if errors.Is(err, http.ErrNoCookie) { + http.Error(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized) + return err + } + + http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) + return err + } + + userUUID := cookie.Value + + if mUser, exists := ms.Users[userUUID]; exists { + if mUser.Subscription != nil { + http.Error(w, "Can only have one websocket connection at a time", http.StatusConflict) + return errors.New("Can only have one websocket connection at a time") + } + } else { + http.Error(w, "User does not exist", http.StatusNotFound) + return errors.New("User not found") + } + + var mu sync.Mutex + var c *websocket.Conn + var closed bool + + s := &MSub{ + Msgs: make(chan []byte, ms.subscriberMessageBuffer), + CloseSlow: func() { + mu.Lock() + defer mu.Unlock() + closed = true + if c != nil { + c.Close(websocket.StatusPolicyViolation, "connection too slow to keep up with messages") + } + }, + } + ms.addSubscriber(s, userUUID) + + defer ms.deleteSubscriber(s) + + c2, err := websocket.Accept(w, r, nil) + if err != nil { + return err + } + + mu.Lock() + if closed { + mu.Unlock() + return net.ErrClosed + } + + c = c2 + mu.Unlock() + defer c.CloseNow() + + ctx := c.CloseRead(context.Background()) + for { + select { + case msg := <-s.Msgs: + err := writeTimeout(ctx, time.Second*5, c, msg) + if err != nil { + return err + } + case <-ctx.Done(): + return ctx.Err() + } + } +} + +// func (ms *MonopolyServer) publish(msg []byte) { +// ms.subscribersMu.Lock() +// defer ms.subscribersMu.Unlock() +// +// ms.publishLimiter.Wait(context.Background()) +// +// for s := range ms.subscribers { +// select { +// case s.msgs <- msg: +// default: +// go s.closeSlow() +// } +// } +// +// } + +func (ms *MServer) startHandler(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed) return @@ -103,7 +215,7 @@ func (ms *MonopolyServer) startHandler(w http.ResponseWriter, r *http.Request) { } userUUID := cookie.Value - _, ok := ms.users[userUUID] + _, ok := ms.Users[userUUID] if !ok { http.Error(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized) return @@ -119,7 +231,7 @@ func (ms *MonopolyServer) startHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusAccepted) } -func (ms *MonopolyServer) buyHandler(w http.ResponseWriter, r *http.Request) { +func (ms *MServer) buyHandler(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed) return @@ -138,7 +250,7 @@ func (ms *MonopolyServer) buyHandler(w http.ResponseWriter, r *http.Request) { userUUID := cookie.Value - _, ok := ms.users[userUUID] + _, ok := ms.Users[userUUID] if !ok { http.Error(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized) return @@ -157,7 +269,7 @@ func (ms *MonopolyServer) buyHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } -func (ms *MonopolyServer) rollHandler(w http.ResponseWriter, r *http.Request) { +func (ms *MServer) rollHandler(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed) return @@ -176,7 +288,7 @@ func (ms *MonopolyServer) rollHandler(w http.ResponseWriter, r *http.Request) { userUUID := cookie.Value - _, ok := ms.users[userUUID] + _, ok := ms.Users[userUUID] if !ok { http.Error(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized) return @@ -193,80 +305,3 @@ func (ms *MonopolyServer) rollHandler(w http.ResponseWriter, r *http.Request) { ms.roll() w.WriteHeader(http.StatusOK) } - -func (ms *MonopolyServer) subscribe(w http.ResponseWriter, r *http.Request) error { - cookie, err := r.Cookie("user") - if err != nil { - if err == http.ErrNoCookie { - http.Error(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized) - return err - } - - http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) - return err - } - - userUUID := cookie.Value - - var mu sync.Mutex - var c *websocket.Conn - var closed bool - - s := &subscriber{ - msgs: make(chan []byte, ms.subscriberMessageBuffer), - closeSlow: func() { - mu.Lock() - defer mu.Unlock() - closed = true - if c != nil { - c.Close(websocket.StatusPolicyViolation, "connection too slow to keep up with messages") - } - }, - } - ms.addSubscriber(s, userUUID) - defer ms.deleteSubscriber(s) - - c2, err := websocket.Accept(w, r, nil) - if err != nil { - return err - } - - mu.Lock() - if closed { - mu.Unlock() - return net.ErrClosed - } - - c = c2 - mu.Unlock() - defer c.CloseNow() - - ctx := c.CloseRead(context.Background()) - for { - select { - case msg := <-s.msgs: - err := writeTimeout(ctx, time.Second*5, c, msg) - if err != nil { - return err - } - case <-ctx.Done(): - return ctx.Err() - } - } -} - -// func (ms *MonopolyServer) publish(msg []byte) { -// ms.subscribersMu.Lock() -// defer ms.subscribersMu.Unlock() -// -// ms.publishLimiter.Wait(context.Background()) -// -// for s := range ms.subscribers { -// select { -// case s.msgs <- msg: -// default: -// go s.closeSlow() -// } -// } -// -// } diff --git a/game/helpers.go b/game/helpers.go index c6c1081..9381bba 100644 --- a/game/helpers.go +++ b/game/helpers.go @@ -160,3 +160,17 @@ func (ctx *Context) logGameCtx() { }, "", " ") fmt.Println(string(data)) } + +func (ms *MServer) logUsers() { + fmt.Println("Users:") + for k, v := range ms.Users { + fmt.Printf("Key:%s, val:%v\n", k, v) + } +} + +func (ms *MServer) logMsubs() { + fmt.Println("Subs:") + for k, v := range ms.Subscribers { + fmt.Printf("Key:%p, val:%s\n", k, v) + } +} diff --git a/game/server.go b/game/server.go index f420e60..ff256e9 100644 --- a/game/server.go +++ b/game/server.go @@ -10,12 +10,12 @@ import ( "time" ) -func NewMonopolyServer() *MonopolyServer { - ms := &MonopolyServer{ +func NewMonopolyServer() *MServer { + ms := &MServer{ subscriberMessageBuffer: 16, logf: log.Printf, - subscribers: make(map[*subscriber]string), - users: make(map[string]string), + Subscribers: make(map[*MSub]string), + Users: make(map[string]*MUser), publishLimiter: rate.NewLimiter(rate.Every(time.Millisecond*100), 8), gameCtx: nil, randSeed: rand.NewPCG(20, 26), @@ -32,7 +32,7 @@ func NewMonopolyServer() *MonopolyServer { return ms } -func (ms *MonopolyServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (ms *MServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { ms.serveMux.ServeHTTP(w, r) } func writeTimeout(ctx context.Context, timeout time.Duration, c *websocket.Conn, msg []byte) error { diff --git a/game/types.go b/game/types.go index 5f7d9dd..980ce3c 100644 --- a/game/types.go +++ b/game/types.go @@ -193,7 +193,7 @@ type Context struct { Properties Properties } -type MonopolyServer struct { +type MServer struct { subscriberMessageBuffer int publishLimiter *rate.Limiter @@ -202,17 +202,22 @@ type MonopolyServer struct { serveMux http.ServeMux subscribersMu sync.Mutex - subscribers map[*subscriber]string + Subscribers map[*MSub]string // uuid to username - users map[string]string + Users map[string]*MUser gameCtxMu sync.Mutex gameCtx *Context randSeed *rand.PCG } -type subscriber struct { - msgs chan []byte - closeSlow func() +type MSub struct { + Msgs chan []byte + CloseSlow func() +} + +type MUser struct { + Username string + Subscription *MSub } diff --git a/public/index.js b/public/index.js index c6af839..22280f4 100644 --- a/public/index.js +++ b/public/index.js @@ -1,12 +1,16 @@ ; (() => { let connected = false function dial() { + if (connected) { + return + } + const conn = new WebSocket(`ws://${location.host}/subscribe`) conn.addEventListener('close', ev => { console.error(`WebSocket Disconnected code: ${ev.code}, reason: ${ev.reason}`) connected = false - if (ev.code !== 1001) { + if (ev.code !== 1001 && ev.code !== 404) { console.error('Reconnecting in 1s') setTimeout(dial, 1000) }