diff --git a/client/main.go b/client/main.go index 98954fe..165eb93 100644 --- a/client/main.go +++ b/client/main.go @@ -53,6 +53,7 @@ type Message struct { To string `json:"to,omitempty"` Role string `json:"role,omitempty"` Source string `json:"source,omitempty"` + Room string `json:"room,omitempty"` TS int64 `json:"ts"` } @@ -68,16 +69,31 @@ var ( myLabel string useRTC bool rtcServer string + myRoom string ) func main() { server := flag.String("server", "https://tether.pecord.io", "tether-server base URL") label := flag.String("label", "", "X-Tether-Client label (default: -sse-)") sendText := flag.String("send", "", "send this text and exit (otherwise listen)") + roomFlag := flag.String("room", "", "room id to join (or full URL like https://tether.pecord.io/r/)") flag.BoolVar(&noClipboard, "no-clipboard", false, "don't write incoming messages to the OS clipboard") flag.BoolVar(&useRTC, "rtc", false, "act as a WebRTC mesh peer") flag.Parse() + // Accept either bare id or full URL + myRoom = *roomFlag + if strings.Contains(myRoom, "/r/") { + parts := strings.SplitN(myRoom, "/r/", 2) + if len(parts) == 2 { + myRoom = strings.SplitN(parts[1], "/", 2)[0] + } + } + if myRoom == "" { + fmt.Fprintln(os.Stderr, "tether-client: -room is required (e.g. -room abc12345 or paste the /r/ URL)") + os.Exit(2) + } + if *label == "" { if *sendText != "" { *label = defaultLabel("sender") @@ -128,8 +144,11 @@ func sendMessage(server, label string, m Message) { } func postMessage(server, label string, m Message) { + if m.Room == "" { + m.Room = myRoom + } body, _ := json.Marshal(m) - req, _ := http.NewRequest("POST", server+"/api/send", bytes.NewReader(body)) + req, _ := http.NewRequest("POST", server+"/api/send?room="+myRoom, bytes.NewReader(body)) req.Header.Set("Content-Type", "application/json") req.Header.Set("X-Tether-Source", label) r, err := http.DefaultClient.Do(req) @@ -149,7 +168,7 @@ func postMessage(server, label string, m Message) { // listen subscribes to the SSE stream and dispatches messages to handlers. func listen(server, label string) error { - req, _ := http.NewRequest("GET", server+"/api/stream", nil) + req, _ := http.NewRequest("GET", server+"/api/stream?room="+myRoom, nil) req.Header.Set("X-Tether-Client", label) r, err := http.DefaultClient.Do(req) if err != nil { diff --git a/server/main.go b/server/main.go index 71b4c7c..4dd46a8 100644 --- a/server/main.go +++ b/server/main.go @@ -1,20 +1,33 @@ -// tether-server v0.3: HTTP+SSE relay with extensible message envelope. +// tether-server v0.5: room-scoped, stateless mesh signaling. // -// The same /api/send + /api/stream pipeline carries TWO message kinds: -// - "clipboard" — the user-facing payload (text) -// - "signal" — WebRTC SDP/ICE for peer negotiation +// Each session is a "room" identified by a short random ID. The room +// exists implicitly while at least one subscriber is connected; goes +// away once empty (no persistence, no accounts). // -// Peers filter by .Type on the client side. Server is neutral relay. +// URL shape: +// / → landing; JS picks a room id, redirects to /r/ +// /r/ → mesh page scoped to room +// /api/send → POST { type, text|signal, from, to, source, room } +// /api/stream → SSE; query ?room= +// /metrics → Prometheus +// +// Message types on the bus: +// "clipboard" – user payload +// "signal" – WebRTC SDP / ICE (envelope.signal) +// "presence" – chirp; { from, role, ... } package main import ( + "crypto/rand" "embed" + "encoding/hex" "encoding/json" "flag" "fmt" "io/fs" "log" "net/http" + "strings" "sync" "time" @@ -23,17 +36,20 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" ) -//go:embed web/index.html +//go:embed web var webFS embed.FS -// Message envelope. Type defaults to "clipboard" for backward compat. +// Message envelope. type Message struct { - Type string `json:"type,omitempty"` // "clipboard" | "signal" - Text string `json:"text,omitempty"` // clipboard text - Signal json.RawMessage `json:"signal,omitempty"` // {kind:offer|answer|ice, ...} - From string `json:"from,omitempty"` // sender peer id (for signal filtering) - Source string `json:"source,omitempty"` // human-readable label - TS int64 `json:"ts"` + Type string `json:"type,omitempty"` + Text string `json:"text,omitempty"` + Signal json.RawMessage `json:"signal,omitempty"` + From string `json:"from,omitempty"` + To string `json:"to,omitempty"` + Role string `json:"role,omitempty"` + Source string `json:"source,omitempty"` + Room string `json:"room,omitempty"` + TS int64 `json:"ts"` } type bus struct { @@ -48,7 +64,6 @@ func (b *bus) subscribe(label string) chan Message { ch := make(chan Message, 32) b.mu.Lock() b.clients[ch] = label - // only replay clipboard messages — signals are time-sensitive for _, m := range b.history { if m.Type == "" || m.Type == "clipboard" { select { @@ -58,7 +73,6 @@ func (b *bus) subscribe(label string) chan Message { } } b.mu.Unlock() - subscribers.Inc() return ch } @@ -67,7 +81,6 @@ func (b *bus) unsubscribe(ch chan Message) { delete(b.clients, ch) b.mu.Unlock() close(ch) - subscribers.Dec() } func (b *bus) publish(m Message) { @@ -87,23 +100,92 @@ func (b *bus) publish(m Message) { } } +func (b *bus) size() int { + b.mu.Lock() + defer b.mu.Unlock() + return len(b.clients) +} + +// Room registry — implicit creation, GC empty rooms after a grace period. +type rooms struct { + mu sync.Mutex + byID map[string]*bus + emptyAt map[string]time.Time + graceTime time.Duration +} + +func newRooms() *rooms { + return &rooms{ + byID: map[string]*bus{}, + emptyAt: map[string]time.Time{}, + graceTime: 5 * time.Minute, + } +} + +func (r *rooms) get(id string) *bus { + r.mu.Lock() + defer r.mu.Unlock() + b, ok := r.byID[id] + if !ok { + b = newBus() + r.byID[id] = b + activeRooms.Inc() + } + delete(r.emptyAt, id) + return b +} + +func (r *rooms) noteEmpty(id string) { + r.mu.Lock() + defer r.mu.Unlock() + if b, ok := r.byID[id]; ok && b.size() == 0 { + r.emptyAt[id] = time.Now() + } +} + +func (r *rooms) gcLoop() { + t := time.NewTicker(time.Minute) + defer t.Stop() + for range t.C { + r.mu.Lock() + for id, since := range r.emptyAt { + if time.Since(since) > r.graceTime { + delete(r.byID, id) + delete(r.emptyAt, id) + activeRooms.Dec() + } + } + r.mu.Unlock() + } +} + +func newRoomID() string { + b := make([]byte, 4) // 8 hex chars — 4 billion combinations, short enough to share + _, _ = rand.Read(b) + return hex.EncodeToString(b) +} + // Prometheus metrics var ( messages = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "tether_messages_total", - Help: "Total messages published to the broadcast bus, by source and type.", - }, []string{"source", "type"}) + Help: "Total messages published, by source / type / room.", + }, []string{"source", "type", "room"}) bytesIn = promauto.NewCounter(prometheus.CounterOpts{ Name: "tether_message_bytes_total", Help: "Total bytes of clipboard text published.", }) + activeRooms = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "tether_active_rooms", + Help: "Currently-active rooms (have at least one subscriber).", + }) subscribers = promauto.NewGauge(prometheus.GaugeOpts{ Name: "tether_active_subscribers", - Help: "Number of currently-connected SSE subscribers.", + Help: "Total SSE subscribers across all rooms.", }) publishLatency = promauto.NewHistogram(prometheus.HistogramOpts{ Name: "tether_publish_duration_seconds", - Help: "Latency of the publish() fan-out, including channel sends.", + Help: "Latency of publish() fan-out.", Buckets: prometheus.ExponentialBuckets(0.0001, 4, 8), }) ) @@ -112,13 +194,40 @@ func main() { addr := flag.String("addr", ":8765", "listen address") flag.Parse() - b := newBus() + rm := newRooms() + go rm.gcLoop() sub, _ := fs.Sub(webFS, "web") mux := http.NewServeMux() - mux.Handle("/", http.FileServer(http.FS(sub))) + mux.Handle("/static/", http.FileServer(http.FS(sub))) mux.Handle("/metrics", promhttp.Handler()) + // Landing: pick a room and 302 to /r/. + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/" { + http.Redirect(w, r, "/r/"+newRoomID(), http.StatusFound) + return + } + http.NotFound(w, r) + }) + + // Room page — same HTML for any room id. Client reads room from URL. + mux.HandleFunc("/r/", func(w http.ResponseWriter, r *http.Request) { + // Accept /r/ only (no further path segments) + id := strings.TrimPrefix(r.URL.Path, "/r/") + if id == "" || strings.Contains(id, "/") { + http.Redirect(w, r, "/r/"+newRoomID(), http.StatusFound) + return + } + w.Header().Set("Content-Type", "text/html; charset=utf-8") + f, err := webFS.ReadFile("web/index.html") + if err != nil { + http.Error(w, "missing index.html", http.StatusInternalServerError) + return + } + w.Write(f) + }) + mux.HandleFunc("/api/send", func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "POST only", http.StatusMethodNotAllowed) @@ -129,6 +238,13 @@ func main() { http.Error(w, "bad json", http.StatusBadRequest) return } + if m.Room == "" { + m.Room = r.URL.Query().Get("room") + } + if m.Room == "" { + http.Error(w, "missing room", http.StatusBadRequest) + return + } if m.Type == "" { m.Type = "clipboard" } @@ -140,15 +256,13 @@ func main() { } m.TS = time.Now().UnixMilli() + b := rm.get(m.Room) t0 := time.Now() b.publish(m) publishLatency.Observe(time.Since(t0).Seconds()) - messages.WithLabelValues(m.Source, m.Type).Inc() + messages.WithLabelValues(m.Source, m.Type, m.Room).Inc() if m.Type == "clipboard" { bytesIn.Add(float64(len(m.Text))) - log.Printf("publish clipboard: %s len=%d", m.Source, len(m.Text)) - } else { - log.Printf("publish %s: from=%s", m.Type, m.From) } w.WriteHeader(http.StatusNoContent) }) @@ -159,6 +273,11 @@ func main() { http.Error(w, "no flusher", http.StatusInternalServerError) return } + room := r.URL.Query().Get("room") + if room == "" { + http.Error(w, "missing ?room=", http.StatusBadRequest) + return + } w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") @@ -168,9 +287,18 @@ func main() { if label == "" { label = r.RemoteAddr } + + b := rm.get(room) ch := b.subscribe(label) - defer b.unsubscribe(ch) - log.Printf("subscribe: %s", label) + subscribers.Inc() + defer func() { + b.unsubscribe(ch) + subscribers.Dec() + if b.size() == 0 { + rm.noteEmpty(room) + } + }() + log.Printf("subscribe room=%s label=%s", room, label) ka := time.NewTicker(30 * time.Second) defer ka.Stop() @@ -178,15 +306,14 @@ func main() { for { select { case <-r.Context().Done(): - log.Printf("unsubscribe: %s", label) return case m := <-ch: bs, _ := json.Marshal(m) - eventName := m.Type - if eventName == "" { - eventName = "clipboard" + ev := m.Type + if ev == "" { + ev = "clipboard" } - fmt.Fprintf(w, "event: %s\ndata: %s\n\n", eventName, bs) + fmt.Fprintf(w, "event: %s\ndata: %s\n\n", ev, bs) fl.Flush() case <-ka.C: fmt.Fprintf(w, ": keepalive\n\n") diff --git a/server/web/index.html b/server/web/index.html index 4ff6855..2961d2c 100644 --- a/server/web/index.html +++ b/server/web/index.html @@ -16,17 +16,46 @@ padding: 18px 16px env(safe-area-inset-bottom); gap: 14px; } - header { display: flex; align-items: baseline; gap: 8px; } + header { display: flex; align-items: baseline; gap: 8px; flex-wrap: wrap; } h1 { margin: 0; font-size: 22px; font-weight: 600; letter-spacing: -0.5px; } .tag { font-size: 11px; color: #888; letter-spacing: 0.5px; text-transform: uppercase; } - .pill { - font-size: 10px; padding: 2px 8px; border-radius: 999px; - background: #1f1f1f; color: #888; letter-spacing: 0.4px; - } + .pill { font-size: 10px; padding: 2px 8px; border-radius: 999px; background: #1f1f1f; color: #888; letter-spacing: 0.4px; } .pill.live { background: #052e16; color: #4ade80; } .pill.connecting { background: #1f2937; color: #fbbf24; } + .room { + font: 12px ui-monospace, "SF Mono", monospace; + padding: 2px 8px; border-radius: 6px; + background: #1a1a1a; color: #a3a3a3; + cursor: pointer; user-select: all; + } + .room:hover { color: #f5f5f5; } .row { display: flex; flex-direction: column; gap: 6px; } label { font-size: 11px; color: #888; letter-spacing: 0.4px; text-transform: uppercase; } + + .share { + display: flex; gap: 12px; align-items: center; + background: #131313; border: 1px solid #1f1f1f; border-radius: 10px; + padding: 12px; + } + .share .qr { + width: 110px; height: 110px; + flex-shrink: 0; + background: #fff; padding: 8px; border-radius: 6px; + } + .share .qr img { width: 100%; height: 100%; } + .share .info { display: flex; flex-direction: column; gap: 6px; min-width: 0; } + .share .info small { color: #888; font-size: 11px; letter-spacing: 0.4px; text-transform: uppercase; } + .share .info .url { + font: 13px ui-monospace, "SF Mono", monospace; color: #d4d4d4; + word-break: break-all; user-select: all; + } + .share .info button { + align-self: flex-start; font-size: 12px; padding: 6px 12px; + background: #1a1a1a; color: #d4d4d4; + border: 1px solid #2a2a2a; border-radius: 6px; cursor: pointer; + } + .share .info button:hover { background: #222; } + textarea { width: 100%; min-height: 130px; resize: vertical; font: 15px -apple-system, ui-monospace, "SF Mono", monospace; @@ -63,7 +92,7 @@ } .meta { font-size: 11px; color: #666; margin-top: 4px; } .meta .rtc-badge { color: #4ade80; } - .peers { font-size: 11px; color: #666; margin-top: 4px; display: flex; flex-wrap: wrap; gap: 6px; } + .peers { font-size: 11px; color: #666; display: flex; flex-wrap: wrap; gap: 6px; } .peers .peer { background: #131313; padding: 2px 8px; border-radius: 999px; border: 1px solid #1f1f1f; } .peers .peer.rtc { color: #4ade80; border-color: #052e16; } footer { @@ -78,8 +107,19 @@

tether

mesh clipboard sse + + + new session + +
@@ -99,32 +139,41 @@