From: Skullheadx Date: Mon, 8 Jun 2026 01:16:38 +0000 (-0400) Subject: begin ws integration X-Git-Url: http://git.skullheadx.com/projects/static/git-logo.png?a=commitdiff_plain;h=972aba5c9b051d28fd698fd99b1d2868816e314e;p=monopoly-web.git begin ws integration --- diff --git a/game/game.go b/game/game.go index 51d10bf..56a0502 100644 --- a/game/game.go +++ b/game/game.go @@ -1,7 +1,17 @@ package game import ( + "context" + "errors" + "github.com/coder/websocket" + "golang.org/x/time/rate" + "io" + "log" "math/rand/v2" + "net" + "net/http" + "sync" + "time" ) func initTurn(pID PlayerID, inJail bool) Turn { @@ -204,3 +214,153 @@ func (ctx *Context) getPropID(spaceID SpaceID) PropertyID { } panic("Space is not an ownable property") } + +type MonopolyServer struct { + subscriberMessageBuffer int + publishLimiter *rate.Limiter + + logf func(f string, v ...any) + + serveMux http.ServeMux + + subscribersMu sync.Mutex + subscribers map[*subscriber]struct{} +} + +func NewMonopolyServer() *MonopolyServer { + ms := &MonopolyServer{ + subscriberMessageBuffer: 16, + logf: log.Printf, + subscribers: make(map[*subscriber]struct{}), + publishLimiter: rate.NewLimiter(rate.Every(time.Millisecond*100), 8), + } + ms.serveMux.Handle("/", http.FileServer(http.Dir("../public/"))) + ms.serveMux.HandleFunc("/subscribe", ms.subscribeHandler) + ms.serveMux.HandleFunc("/publish", ms.publishHandler) + + return ms +} + +type subscriber struct { + msgs chan []byte + closeSlow func() +} + +func (ms *MonopolyServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { + ms.serveMux.ServeHTTP(w, r) +} + +func (ms *MonopolyServer) subscribeHandler(w http.ResponseWriter, r *http.Request) { + err := ms.subscribe(w, r) + if errors.Is(err, context.Canceled) { + return + } + + if websocket.CloseStatus(err) == websocket.StatusNormalClosure || websocket.CloseStatus(err) == websocket.StatusGoingAway { + return + } + + if err != nil { + ms.logf("%v", err) + return + } +} + +func (ms *MonopolyServer) publishHandler(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + http.Error(w, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed) + return + } + + body := http.MaxBytesReader(w, r.Body, 8192) + msg, err := io.ReadAll(body) + if err != nil { + http.Error(w, http.StatusText(http.StatusRequestEntityTooLarge), http.StatusRequestEntityTooLarge) + return + } + + ms.publish(msg) + w.WriteHeader(http.StatusAccepted) +} + +func (ms *MonopolyServer) subscribe(w http.ResponseWriter, r *http.Request) error { + var mu sync.Mutex + var c *websocket.Conn + var closed bool + s := &subscriber{ + msgs: make(chan []byte, ms.subscriberMessageBuffer), + closeSlow: func() { + mu.Lock() + defer mu.Unlock() + closed = true + if c != nil { + c.Close(websocket.StatusPolicyViolation, "connection too slow to keep up with messages") + } + }, + } + ms.addSubscriber(s) + defer ms.deleteSubscriber(s) + + c2, err := websocket.Accept(w, r, nil) + if err != nil { + return err + } + + mu.Lock() + if closed { + mu.Unlock() + return net.ErrClosed + } + + c = c2 + mu.Unlock() + defer c.CloseNow() + + ctx := c.CloseRead(context.Background()) + for { + select { + case msg := <-s.msgs: + err := writeTimeout(ctx, time.Second*5, c, msg) + if err != nil { + return err + } + case <-ctx.Done(): + return ctx.Err() + } + } +} + +func (ms *MonopolyServer) publish(msg []byte) { + ms.subscribersMu.Lock() + defer ms.subscribersMu.Unlock() + + ms.publishLimiter.Wait(context.Background()) + + for s := range ms.subscribers { + select { + case s.msgs <- msg: + default: + go s.closeSlow() + } + } + +} + +func (ms *MonopolyServer) addSubscriber(s *subscriber) { + ms.subscribersMu.Lock() + ms.subscribers[s] = struct{}{} + ms.subscribersMu.Unlock() +} + +func (ms *MonopolyServer) deleteSubscriber(s *subscriber) { + ms.subscribersMu.Lock() + delete(ms.subscribers, s) + ms.subscribersMu.Unlock() +} + +func writeTimeout(ctx context.Context, timeout time.Duration, c *websocket.Conn, msg []byte) error { + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + return c.Write(ctx, websocket.MessageText, msg) +} diff --git a/go.mod b/go.mod index 127ba66..f5d43a3 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,8 @@ module monopoly-web go 1.26.2 + +require ( + github.com/coder/websocket v1.8.14 + golang.org/x/time v0.15.0 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..aa6ad6f --- /dev/null +++ b/go.sum @@ -0,0 +1,4 @@ +github.com/coder/websocket v1.8.14 h1:9L0p0iKiNOibykf283eHkKUHHrpG7f65OE3BhhO7v9g= +github.com/coder/websocket v1.8.14/go.mod h1:NX3SzP+inril6yawo5CQXx8+fk145lPDC6pumgx0mVg= +golang.org/x/time v0.15.0 h1:bbrp8t3bGUeFOx08pvsMYRTCVSMk89u4tKbNOZbp88U= +golang.org/x/time v0.15.0/go.mod h1:Y4YMaQmXwGQZoFaVFk4YpCt4FLQMYKZe9oeV/f4MSno= diff --git a/main.go b/main.go index 1c48c06..a1f05b6 100644 --- a/main.go +++ b/main.go @@ -1,12 +1,17 @@ package main import ( - "fmt" "io" "log" "math/rand/v2" "monopoly-web/game" "net/http" + + "context" + "net" + "os" + "os/signal" + "time" ) type Room struct { @@ -24,20 +29,20 @@ func initRoom() Room { } } -func main() { - fmt.Println("monopoly-web backend") - - room := initRoom() - - // register routes - http.HandleFunc("/health", healthHandler) - http.HandleFunc("/api/v1/roll", room.rollDiceHandler) - http.HandleFunc("POST /api/v1/turn", room.endTurnHandler) - http.HandleFunc("POST /api/v1/exit-jail", room.exitJailHandler) - - // listen and serve - log.Fatal(http.ListenAndServe(":8080", nil)) -} +// func main() { +// fmt.Println("monopoly-web backend") +// +// room := initRoom() +// +// // register routes +// http.HandleFunc("/health", healthHandler) +// http.HandleFunc("/api/v1/roll", room.rollDiceHandler) +// http.HandleFunc("POST /api/v1/turn", room.endTurnHandler) +// http.HandleFunc("POST /api/v1/exit-jail", room.exitJailHandler) +// +// // listen and serve +// log.Fatal(http.ListenAndServe(":8080", nil)) +// } func healthHandler(w http.ResponseWriter, req *http.Request) { io.WriteString(w, "Status: healthy\n") @@ -93,3 +98,46 @@ func (r *Room) exitJailHandler(w http.ResponseWriter, req *http.Request) { w.WriteHeader(http.StatusOK) w.Write([]byte(`{"status": "success", "message": "Action processed"}`)) } + +func main() { + log.SetFlags(0) + + err := run() + if err != nil { + log.Fatal(err) + } +} + +func run() error { + l, err := net.Listen("tcp", "127.0.0.1:8443") + if err != nil { + return err + } + log.Printf("listening on ws://%v", l.Addr()) + + cs := newMonopolyServer() + s := &http.Server{ + Handler: cs, + ReadTimeout: time.Second * 10, + WriteTimeout: time.Second * 10, + } + + errc := make(chan error, 1) + go func() { + errc <- s.Serve(l) + }() + + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, os.Interrupt) + select { + case err := <-errc: + log.Printf("failed to serve: %v", err) + case sig := <-sigs: + log.Printf("terminating %v", sig) + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + return s.Shutdown(ctx) +} diff --git a/public/index.css b/public/index.css new file mode 100644 index 0000000..f0eb172 --- /dev/null +++ b/public/index.css @@ -0,0 +1,81 @@ +body { + width: 100vw; + min-width: 320px; +} + +#root { + padding: 40px 20px; + max-width: 600px; + margin: auto; + height: 100vh; + + display: flex; + flex-direction: column; + align-items: center; + justify-content: center; +} + +#root>*+* { + margin: 20px 0 0 0; +} + +/* 100vh on safari does not include the bottom bar. */ +@supports (-webkit-overflow-scrolling: touch) { + #root { + height: 85vh; + } +} + +#message-log { + width: 100%; + flex-grow: 1; + overflow: auto; +} + +#message-log p:first-child { + margin: 0; +} + +#message-log>*+* { + margin: 10px 0 0 0; +} + +#publish-form-container { + width: 100%; +} + +#publish-form { + width: 100%; + display: flex; + height: 40px; +} + +#publish-form>*+* { + margin: 0 0 0 10px; +} + +#publish-form input[type='text'] { + flex-grow: 1; + + -moz-appearance: none; + -webkit-appearance: none; + word-break: normal; + border-radius: 5px; + border: 1px solid #ccc; +} + +#publish-form input[type='submit'] { + color: white; + background-color: black; + border-radius: 5px; + padding: 5px 10px; + border: none; +} + +#publish-form input[type='submit']:hover { + background-color: red; +} + +#publish-form input[type='submit']:active { + background-color: red; +} diff --git a/public/index.html b/public/index.html new file mode 100644 index 0000000..7038342 --- /dev/null +++ b/public/index.html @@ -0,0 +1,25 @@ + + + + + github.com/coder/websocket - Chat Example + + + + + + + + +
+
+
+
+ + +
+
+
+ + + diff --git a/public/index.js b/public/index.js new file mode 100644 index 0000000..aabfa80 --- /dev/null +++ b/public/index.js @@ -0,0 +1,70 @@ +; (() => { + let expectingMessage = false + function dial() { + const conn = new WebSocket(`ws://${location.host}/subscribe`) + + conn.addEventListener('close', ev => { + appendLog(`WebSocket Disconnected code: ${ev.code}, reason: ${ev.reason}`, true) + if (ev.code !== 1001) { + appendLog('Reconnecting in 1s', true) + setTimeout(dial, 1000) + } + }) + conn.addEventListener('open', ev => { + console.info('websocket connected') + }) + + conn.addEventListener('message', ev => { + if (typeof ev.data !== 'string') { + console.error('unexpected message type', typeof ev.data) + return + } + const p = appendLog(ev.data) + if (expectingMessage) { + p.scrollIntoView() + expectingMessage = false + } + }) + } + dial() + + const messageLog = document.getElementById('message-log') + const publishForm = document.getElementById('publish-form') + const messageInput = document.getElementById('message-input') + + function appendLog(text, error) { + const p = document.createElement('p') + p.innerText = `${new Date().toLocaleTimeString()}: ${text}` + if (error) { + p.style.color = 'red' + p.style.fontStyle = 'bold' + } + messageLog.append(p) + return p + } + appendLog('Submit a message to get started!') + + + publishForm.onsubmit = async ev => { + ev.preventDefault() + + const msg = messageInput.value + if (msg === '') { + return + } + messageInput.value = '' + + expectingMessage = true + try { + const resp = await fetch('/publish', { + method: 'POST', + body: msg, + }) + if (resp.status !== 202) { + throw new Error(`Unexpected HTTP Status ${resp.status} ${resp.statusText}`) + } + } catch (err) { + appendLog(`Publish failed: ${err.message}`, true) + } + } +})()