// tether-server v0.5: room-scoped, stateless mesh signaling. // // Each session is a "room" identified by a short random ID. The room // exists implicitly while at least one subscriber is connected; goes // away once empty (no persistence, no accounts). // // URL shape: // / → landing; JS picks a room id, redirects to /r/ // /r/ → mesh page scoped to room // /api/send → POST { type, text|signal, from, to, source, room } // /api/stream → SSE; query ?room= // /metrics → Prometheus // // Message types on the bus: // "clipboard" – user payload // "signal" – WebRTC SDP / ICE (envelope.signal) // "presence" – chirp; { from, role, ... } package main import ( "crypto/rand" "embed" "encoding/hex" "encoding/json" "flag" "fmt" "io/fs" "log" "net/http" "strings" "sync" "time" "github.com/mileusna/useragent" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" ) // labelFromUA returns a short, human-friendly platform label like // "iphone-safari", "macos-chrome", "windows-firefox". Falls back to "web". func labelFromUA(uaStr string) string { ua := useragent.Parse(uaStr) platform := strings.ToLower(ua.OS) switch { case ua.IsIOS() && strings.Contains(strings.ToLower(ua.Device), "ipad"): platform = "ipad" case ua.IsIOS(): platform = "iphone" case ua.IsAndroid(): platform = "android" case ua.IsMacOS(): platform = "macos" case ua.IsWindows(): platform = "windows" case ua.IsLinux(): platform = "linux" } browser := strings.ToLower(ua.Name) browser = strings.ReplaceAll(browser, " ", "") if platform == "" && browser == "" { return "web" } if browser == "" { return platform } if platform == "" { return browser } return platform + "-" + browser } //go:embed web var webFS embed.FS // Message envelope. 
type Message struct {
	// Type selects the kind of message. An empty Type is treated the same
	// as "clipboard" by the bus.
	Type string `json:"type,omitempty"`
	// Text is the textual payload of the message.
	Text string `json:"text,omitempty"`
	// Signal is an opaque JSON payload that is relayed without decoding.
	Signal json.RawMessage `json:"signal,omitempty"`
	// From and To are client-supplied identifiers; the bus does not
	// interpret them and always fans out to every subscriber.
	From string `json:"from,omitempty"`
	To   string `json:"to,omitempty"`
	// Role is client-supplied and not interpreted by the bus.
	Role string `json:"role,omitempty"`
	// Source is a short label describing the sender.
	Source string `json:"source,omitempty"`
	// Room is the id of the room the message belongs to.
	Room string `json:"room,omitempty"`
	// TS is a timestamp; it is always serialized, even when zero.
	TS int64 `json:"ts"`
}

const (
	// subscriberBuffer is the capacity of each subscriber's channel. A
	// subscriber that falls more than this many messages behind starts
	// losing messages rather than stalling the publisher.
	subscriberBuffer = 32

	// historyLimit is how many recent clipboard messages are retained and
	// replayed to new subscribers. It must stay <= subscriberBuffer so a
	// full replay always fits into a freshly created channel.
	historyLimit = 10
)

// replayable reports whether m is kept in the history and replayed to late
// joiners. Only clipboard messages (including untyped ones) qualify.
func replayable(m Message) bool {
	return m.Type == "" || m.Type == "clipboard"
}

// bus is an in-memory fan-out hub: every published message is offered to
// every current subscriber. All fields are guarded by mu.
type bus struct {
	mu      sync.Mutex
	clients map[chan Message]string // subscriber channel → label
	history []Message               // most recent replayable messages, oldest first
}

// newBus returns an empty bus that is ready for use.
func newBus() *bus {
	return &bus{clients: map[chan Message]string{}}
}

// subscribe registers a new subscriber identified by label and returns the
// channel on which it will receive messages. The retained clipboard history
// is queued on the channel before subscribe returns. Release the
// subscription with unsubscribe.
func (b *bus) subscribe(label string) chan Message {
	ch := make(chan Message, subscriberBuffer)

	b.mu.Lock()
	defer b.mu.Unlock()

	b.clients[ch] = label
	for _, m := range b.history {
		if !replayable(m) {
			continue
		}
		// Non-blocking: never stall while holding the lock.
		select {
		case ch <- m:
		default:
		}
	}
	return ch
}

// unsubscribe removes ch from the bus and closes it. It is safe to call more
// than once for the same channel, and a channel that is not registered with
// this bus is left untouched, so a repeated call can no longer panic with
// "close of closed channel".
func (b *bus) unsubscribe(ch chan Message) {
	b.mu.Lock()
	_, registered := b.clients[ch]
	delete(b.clients, ch)
	b.mu.Unlock()

	// publish only sends to registered channels while holding mu, so once
	// the entry is gone nothing can send on ch and closing it is safe.
	if registered {
		close(ch)
	}
}

// publish offers m to every subscriber without blocking; a subscriber whose
// buffer is full misses the message. Replayable messages are also appended
// to the history, which is capped at historyLimit entries.
func (b *bus) publish(m Message) {
	b.mu.Lock()
	defer b.mu.Unlock()

	if replayable(m) {
		b.history = append(b.history, m)
		if extra := len(b.history) - historyLimit; extra > 0 {
			// Shift the survivors down and zero the vacated tail so the
			// evicted payloads are not kept alive by the backing array.
			kept := copy(b.history, b.history[extra:])
			for i := kept; i < len(b.history); i++ {
				b.history[i] = Message{}
			}
			b.history = b.history[:kept]
		}
	}

	for ch := range b.clients {
		select {
		case ch <- m:
		default:
		}
	}
}

// size returns the current number of subscribers.
func (b *bus) size() int {
	b.mu.Lock()
	defer b.mu.Unlock()
	return len(b.clients)
}

// Room registry — implicit creation, GC empty rooms after a grace period.
type rooms struct { mu sync.Mutex byID map[string]*bus emptyAt map[string]time.Time graceTime time.Duration } func newRooms() *rooms { return &rooms{ byID: map[string]*bus{}, emptyAt: map[string]time.Time{}, graceTime: 5 * time.Minute, } } func (r *rooms) get(id string) *bus { r.mu.Lock() defer r.mu.Unlock() b, ok := r.byID[id] if !ok { b = newBus() r.byID[id] = b activeRooms.Inc() } delete(r.emptyAt, id) return b } func (r *rooms) noteEmpty(id string) { r.mu.Lock() defer r.mu.Unlock() if b, ok := r.byID[id]; ok && b.size() == 0 { r.emptyAt[id] = time.Now() } } func (r *rooms) gcLoop() { t := time.NewTicker(time.Minute) defer t.Stop() for range t.C { r.mu.Lock() for id, since := range r.emptyAt { if time.Since(since) > r.graceTime { delete(r.byID, id) delete(r.emptyAt, id) activeRooms.Dec() } } r.mu.Unlock() } } func newRoomID() string { b := make([]byte, 4) // 8 hex chars — 4 billion combinations, short enough to share _, _ = rand.Read(b) return hex.EncodeToString(b) } // Prometheus metrics var ( messages = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "tether_messages_total", Help: "Total messages published, by source / type / room.", }, []string{"source", "type", "room"}) bytesIn = promauto.NewCounter(prometheus.CounterOpts{ Name: "tether_message_bytes_total", Help: "Total bytes of clipboard text published.", }) activeRooms = promauto.NewGauge(prometheus.GaugeOpts{ Name: "tether_active_rooms", Help: "Currently-active rooms (have at least one subscriber).", }) subscribers = promauto.NewGauge(prometheus.GaugeOpts{ Name: "tether_active_subscribers", Help: "Total SSE subscribers across all rooms.", }) publishLatency = promauto.NewHistogram(prometheus.HistogramOpts{ Name: "tether_publish_duration_seconds", Help: "Latency of publish() fan-out.", Buckets: prometheus.ExponentialBuckets(0.0001, 4, 8), }) ) func main() { addr := flag.String("addr", ":8765", "listen address") flag.Parse() rm := newRooms() go rm.gcLoop() sub, _ := fs.Sub(webFS, "web") mux := 
http.NewServeMux() mux.Handle("/static/", http.FileServer(http.FS(sub))) mux.Handle("/metrics", promhttp.Handler()) // Landing: pick a room and 302 to /r/. mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { if r.URL.Path == "/" { http.Redirect(w, r, "/r/"+newRoomID(), http.StatusFound) return } http.NotFound(w, r) }) // Room page — same HTML for any room id. Client reads room from URL. mux.HandleFunc("/r/", func(w http.ResponseWriter, r *http.Request) { // Accept /r/ only (no further path segments) id := strings.TrimPrefix(r.URL.Path, "/r/") if id == "" || strings.Contains(id, "/") { http.Redirect(w, r, "/r/"+newRoomID(), http.StatusFound) return } w.Header().Set("Content-Type", "text/html; charset=utf-8") f, err := webFS.ReadFile("web/index.html") if err != nil { http.Error(w, "missing index.html", http.StatusInternalServerError) return } w.Write(f) }) mux.HandleFunc("/api/send", func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "POST only", http.StatusMethodNotAllowed) return } var m Message if err := json.NewDecoder(r.Body).Decode(&m); err != nil { http.Error(w, "bad json", http.StatusBadRequest) return } if m.Room == "" { m.Room = r.URL.Query().Get("room") } if m.Room == "" { http.Error(w, "missing room", http.StatusBadRequest) return } if m.Type == "" { m.Type = "clipboard" } if m.Source == "" { m.Source = r.Header.Get("X-Tether-Source") if m.Source == "" { m.Source = labelFromUA(r.Header.Get("User-Agent")) } } m.TS = time.Now().UnixMilli() b := rm.get(m.Room) t0 := time.Now() b.publish(m) publishLatency.Observe(time.Since(t0).Seconds()) messages.WithLabelValues(m.Source, m.Type, m.Room).Inc() if m.Type == "clipboard" { bytesIn.Add(float64(len(m.Text))) } w.WriteHeader(http.StatusNoContent) }) mux.HandleFunc("/api/stream", func(w http.ResponseWriter, r *http.Request) { fl, ok := w.(http.Flusher) if !ok { http.Error(w, "no flusher", http.StatusInternalServerError) return } room := 
r.URL.Query().Get("room") if room == "" { http.Error(w, "missing ?room=", http.StatusBadRequest) return } w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.Header().Set("X-Accel-Buffering", "no") label := r.Header.Get("X-Tether-Client") if label == "" { label = r.RemoteAddr } b := rm.get(room) ch := b.subscribe(label) subscribers.Inc() defer func() { b.unsubscribe(ch) subscribers.Dec() if b.size() == 0 { rm.noteEmpty(room) } }() log.Printf("subscribe room=%s label=%s", room, label) ka := time.NewTicker(30 * time.Second) defer ka.Stop() for { select { case <-r.Context().Done(): return case m := <-ch: bs, _ := json.Marshal(m) ev := m.Type if ev == "" { ev = "clipboard" } fmt.Fprintf(w, "event: %s\ndata: %s\n\n", ev, bs) fl.Flush() case <-ka.C: fmt.Fprintf(w, ": keepalive\n\n") fl.Flush() } } }) mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { w.Write([]byte("ok")) }) log.Printf("tether-server listening on %s", *addr) log.Fatal(http.ListenAndServe(*addr, mux)) }