The single shared mesh is replaced by per-session rooms. Visit / and the server mints a random 8-hex-char id, redirects to /r/<id>. That URL IS the session — share the link (or scan the QR code now shown on the page) on another device to join the same room. Bus is now sharded per room. Rooms are created implicitly on first subscribe and GC'd 5 minutes after the last subscriber leaves. No accounts, no persistence, no server-side state beyond the in-memory bus map. Server: - New endpoints: /, /r/<id>, /api/send?room=, /api/stream?room= - Room manager with lazy creation + idle GC - Metrics now labelled by room - New gauge tether_active_rooms Client (Go): - -room flag (accepts bare id OR full /r/<id> URL — paste-friendly) - All API calls now scope to the room - The always-on ct210-rtc-peer systemd unit is disabled — sessions are user-initiated; the user runs tether-client with -room when they want their laptop in a particular session Browser (HTML): - Reads room from /r/<id> path - Shows QR code + URL + "copy link" button at top - "+ new session" link in header to start a fresh room Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
332 lines
7.8 KiB
Go
332 lines
7.8 KiB
Go
// tether-server v0.5: room-scoped, stateless mesh signaling.
|
||
//
|
||
// Each session is a "room" identified by a short random ID. The room
|
||
// exists implicitly while at least one subscriber is connected; goes
|
||
// away once empty (no persistence, no accounts).
|
||
//
|
||
// URL shape:
|
||
// / → landing; JS picks a room id, redirects to /r/<id>
|
||
// /r/<id> → mesh page scoped to room <id>
|
||
// /api/send → POST { type, text|signal, from, to, source, room }
|
||
// /api/stream → SSE; query ?room=<id>
|
||
// /metrics → Prometheus
|
||
//
|
||
// Message types on the bus:
|
||
// "clipboard" – user payload
|
||
// "signal" – WebRTC SDP / ICE (envelope.signal)
|
||
// "presence" – chirp; { from, role, ... }
|
||
package main
|
||
|
||
import (
|
||
"crypto/rand"
|
||
"embed"
|
||
"encoding/hex"
|
||
"encoding/json"
|
||
"flag"
|
||
"fmt"
|
||
"io/fs"
|
||
"log"
|
||
"net/http"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
|
||
"github.com/prometheus/client_golang/prometheus"
|
||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||
)
|
||
|
||
//go:embed web
|
||
var webFS embed.FS
|
||
|
||
// Message envelope.
|
||
type Message struct {
|
||
Type string `json:"type,omitempty"`
|
||
Text string `json:"text,omitempty"`
|
||
Signal json.RawMessage `json:"signal,omitempty"`
|
||
From string `json:"from,omitempty"`
|
||
To string `json:"to,omitempty"`
|
||
Role string `json:"role,omitempty"`
|
||
Source string `json:"source,omitempty"`
|
||
Room string `json:"room,omitempty"`
|
||
TS int64 `json:"ts"`
|
||
}
|
||
|
||
type bus struct {
|
||
mu sync.Mutex
|
||
clients map[chan Message]string
|
||
history []Message
|
||
}
|
||
|
||
func newBus() *bus { return &bus{clients: map[chan Message]string{}} }
|
||
|
||
func (b *bus) subscribe(label string) chan Message {
|
||
ch := make(chan Message, 32)
|
||
b.mu.Lock()
|
||
b.clients[ch] = label
|
||
for _, m := range b.history {
|
||
if m.Type == "" || m.Type == "clipboard" {
|
||
select {
|
||
case ch <- m:
|
||
default:
|
||
}
|
||
}
|
||
}
|
||
b.mu.Unlock()
|
||
return ch
|
||
}
|
||
|
||
func (b *bus) unsubscribe(ch chan Message) {
|
||
b.mu.Lock()
|
||
delete(b.clients, ch)
|
||
b.mu.Unlock()
|
||
close(ch)
|
||
}
|
||
|
||
func (b *bus) publish(m Message) {
|
||
b.mu.Lock()
|
||
defer b.mu.Unlock()
|
||
if m.Type == "" || m.Type == "clipboard" {
|
||
b.history = append(b.history, m)
|
||
if len(b.history) > 10 {
|
||
b.history = b.history[len(b.history)-10:]
|
||
}
|
||
}
|
||
for ch := range b.clients {
|
||
select {
|
||
case ch <- m:
|
||
default:
|
||
}
|
||
}
|
||
}
|
||
|
||
func (b *bus) size() int {
|
||
b.mu.Lock()
|
||
defer b.mu.Unlock()
|
||
return len(b.clients)
|
||
}
|
||
|
||
// Room registry — implicit creation, GC empty rooms after a grace period.
|
||
type rooms struct {
|
||
mu sync.Mutex
|
||
byID map[string]*bus
|
||
emptyAt map[string]time.Time
|
||
graceTime time.Duration
|
||
}
|
||
|
||
func newRooms() *rooms {
|
||
return &rooms{
|
||
byID: map[string]*bus{},
|
||
emptyAt: map[string]time.Time{},
|
||
graceTime: 5 * time.Minute,
|
||
}
|
||
}
|
||
|
||
func (r *rooms) get(id string) *bus {
|
||
r.mu.Lock()
|
||
defer r.mu.Unlock()
|
||
b, ok := r.byID[id]
|
||
if !ok {
|
||
b = newBus()
|
||
r.byID[id] = b
|
||
activeRooms.Inc()
|
||
}
|
||
delete(r.emptyAt, id)
|
||
return b
|
||
}
|
||
|
||
func (r *rooms) noteEmpty(id string) {
|
||
r.mu.Lock()
|
||
defer r.mu.Unlock()
|
||
if b, ok := r.byID[id]; ok && b.size() == 0 {
|
||
r.emptyAt[id] = time.Now()
|
||
}
|
||
}
|
||
|
||
func (r *rooms) gcLoop() {
|
||
t := time.NewTicker(time.Minute)
|
||
defer t.Stop()
|
||
for range t.C {
|
||
r.mu.Lock()
|
||
for id, since := range r.emptyAt {
|
||
if time.Since(since) > r.graceTime {
|
||
delete(r.byID, id)
|
||
delete(r.emptyAt, id)
|
||
activeRooms.Dec()
|
||
}
|
||
}
|
||
r.mu.Unlock()
|
||
}
|
||
}
|
||
|
||
func newRoomID() string {
|
||
b := make([]byte, 4) // 8 hex chars — 4 billion combinations, short enough to share
|
||
_, _ = rand.Read(b)
|
||
return hex.EncodeToString(b)
|
||
}
|
||
|
||
// Prometheus metrics
|
||
var (
|
||
messages = promauto.NewCounterVec(prometheus.CounterOpts{
|
||
Name: "tether_messages_total",
|
||
Help: "Total messages published, by source / type / room.",
|
||
}, []string{"source", "type", "room"})
|
||
bytesIn = promauto.NewCounter(prometheus.CounterOpts{
|
||
Name: "tether_message_bytes_total",
|
||
Help: "Total bytes of clipboard text published.",
|
||
})
|
||
activeRooms = promauto.NewGauge(prometheus.GaugeOpts{
|
||
Name: "tether_active_rooms",
|
||
Help: "Currently-active rooms (have at least one subscriber).",
|
||
})
|
||
subscribers = promauto.NewGauge(prometheus.GaugeOpts{
|
||
Name: "tether_active_subscribers",
|
||
Help: "Total SSE subscribers across all rooms.",
|
||
})
|
||
publishLatency = promauto.NewHistogram(prometheus.HistogramOpts{
|
||
Name: "tether_publish_duration_seconds",
|
||
Help: "Latency of publish() fan-out.",
|
||
Buckets: prometheus.ExponentialBuckets(0.0001, 4, 8),
|
||
})
|
||
)
|
||
|
||
func main() {
|
||
addr := flag.String("addr", ":8765", "listen address")
|
||
flag.Parse()
|
||
|
||
rm := newRooms()
|
||
go rm.gcLoop()
|
||
|
||
sub, _ := fs.Sub(webFS, "web")
|
||
mux := http.NewServeMux()
|
||
mux.Handle("/static/", http.FileServer(http.FS(sub)))
|
||
mux.Handle("/metrics", promhttp.Handler())
|
||
|
||
// Landing: pick a room and 302 to /r/<id>.
|
||
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
|
||
if r.URL.Path == "/" {
|
||
http.Redirect(w, r, "/r/"+newRoomID(), http.StatusFound)
|
||
return
|
||
}
|
||
http.NotFound(w, r)
|
||
})
|
||
|
||
// Room page — same HTML for any room id. Client reads room from URL.
|
||
mux.HandleFunc("/r/", func(w http.ResponseWriter, r *http.Request) {
|
||
// Accept /r/<id> only (no further path segments)
|
||
id := strings.TrimPrefix(r.URL.Path, "/r/")
|
||
if id == "" || strings.Contains(id, "/") {
|
||
http.Redirect(w, r, "/r/"+newRoomID(), http.StatusFound)
|
||
return
|
||
}
|
||
w.Header().Set("Content-Type", "text/html; charset=utf-8")
|
||
f, err := webFS.ReadFile("web/index.html")
|
||
if err != nil {
|
||
http.Error(w, "missing index.html", http.StatusInternalServerError)
|
||
return
|
||
}
|
||
w.Write(f)
|
||
})
|
||
|
||
mux.HandleFunc("/api/send", func(w http.ResponseWriter, r *http.Request) {
|
||
if r.Method != http.MethodPost {
|
||
http.Error(w, "POST only", http.StatusMethodNotAllowed)
|
||
return
|
||
}
|
||
var m Message
|
||
if err := json.NewDecoder(r.Body).Decode(&m); err != nil {
|
||
http.Error(w, "bad json", http.StatusBadRequest)
|
||
return
|
||
}
|
||
if m.Room == "" {
|
||
m.Room = r.URL.Query().Get("room")
|
||
}
|
||
if m.Room == "" {
|
||
http.Error(w, "missing room", http.StatusBadRequest)
|
||
return
|
||
}
|
||
if m.Type == "" {
|
||
m.Type = "clipboard"
|
||
}
|
||
if m.Source == "" {
|
||
m.Source = r.Header.Get("X-Tether-Source")
|
||
if m.Source == "" {
|
||
m.Source = "web"
|
||
}
|
||
}
|
||
m.TS = time.Now().UnixMilli()
|
||
|
||
b := rm.get(m.Room)
|
||
t0 := time.Now()
|
||
b.publish(m)
|
||
publishLatency.Observe(time.Since(t0).Seconds())
|
||
messages.WithLabelValues(m.Source, m.Type, m.Room).Inc()
|
||
if m.Type == "clipboard" {
|
||
bytesIn.Add(float64(len(m.Text)))
|
||
}
|
||
w.WriteHeader(http.StatusNoContent)
|
||
})
|
||
|
||
mux.HandleFunc("/api/stream", func(w http.ResponseWriter, r *http.Request) {
|
||
fl, ok := w.(http.Flusher)
|
||
if !ok {
|
||
http.Error(w, "no flusher", http.StatusInternalServerError)
|
||
return
|
||
}
|
||
room := r.URL.Query().Get("room")
|
||
if room == "" {
|
||
http.Error(w, "missing ?room=", http.StatusBadRequest)
|
||
return
|
||
}
|
||
w.Header().Set("Content-Type", "text/event-stream")
|
||
w.Header().Set("Cache-Control", "no-cache")
|
||
w.Header().Set("Connection", "keep-alive")
|
||
w.Header().Set("X-Accel-Buffering", "no")
|
||
|
||
label := r.Header.Get("X-Tether-Client")
|
||
if label == "" {
|
||
label = r.RemoteAddr
|
||
}
|
||
|
||
b := rm.get(room)
|
||
ch := b.subscribe(label)
|
||
subscribers.Inc()
|
||
defer func() {
|
||
b.unsubscribe(ch)
|
||
subscribers.Dec()
|
||
if b.size() == 0 {
|
||
rm.noteEmpty(room)
|
||
}
|
||
}()
|
||
log.Printf("subscribe room=%s label=%s", room, label)
|
||
|
||
ka := time.NewTicker(30 * time.Second)
|
||
defer ka.Stop()
|
||
|
||
for {
|
||
select {
|
||
case <-r.Context().Done():
|
||
return
|
||
case m := <-ch:
|
||
bs, _ := json.Marshal(m)
|
||
ev := m.Type
|
||
if ev == "" {
|
||
ev = "clipboard"
|
||
}
|
||
fmt.Fprintf(w, "event: %s\ndata: %s\n\n", ev, bs)
|
||
fl.Flush()
|
||
case <-ka.C:
|
||
fmt.Fprintf(w, ": keepalive\n\n")
|
||
fl.Flush()
|
||
}
|
||
}
|
||
})
|
||
|
||
mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
|
||
w.Write([]byte("ok"))
|
||
})
|
||
|
||
log.Printf("tether-server listening on %s", *addr)
|
||
log.Fatal(http.ListenAndServe(*addr, mux))
|
||
}
|