Server now parses the User-Agent header on /api/send when no explicit source is provided, producing labels like 'iphone-safari', 'macos-chrome', 'windows-firefox' so the feed shows where messages came from at a glance. Browser-side peerID is now stored in localStorage instead of being freshly random on each page load — refreshing or reopening the tab keeps the same identity for the same browser profile. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
366 lines
8.7 KiB
Go
366 lines
8.7 KiB
Go
// tether-server v0.5: room-scoped, stateless mesh signaling.
|
||
//
|
||
// Each session is a "room" identified by a short random ID. The room
|
||
// exists implicitly while at least one subscriber is connected; goes
|
||
// away once empty (no persistence, no accounts).
|
||
//
|
||
// URL shape:
|
||
// / → landing; JS picks a room id, redirects to /r/<id>
|
||
// /r/<id> → mesh page scoped to room <id>
|
||
// /api/send → POST { type, text|signal, from, to, source, room }
|
||
// /api/stream → SSE; query ?room=<id>
|
||
// /metrics → Prometheus
|
||
//
|
||
// Message types on the bus:
|
||
// "clipboard" – user payload
|
||
// "signal" – WebRTC SDP / ICE (envelope.signal)
|
||
// "presence" – chirp; { from, role, ... }
|
||
package main
|
||
|
||
import (
|
||
"crypto/rand"
|
||
"embed"
|
||
"encoding/hex"
|
||
"encoding/json"
|
||
"flag"
|
||
"fmt"
|
||
"io/fs"
|
||
"log"
|
||
"net/http"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
|
||
"github.com/mileusna/useragent"
|
||
"github.com/prometheus/client_golang/prometheus"
|
||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||
)
|
||
|
||
// labelFromUA returns a short, human-friendly platform label like
|
||
// "iphone-safari", "macos-chrome", "windows-firefox". Falls back to "web".
|
||
func labelFromUA(uaStr string) string {
|
||
ua := useragent.Parse(uaStr)
|
||
platform := strings.ToLower(ua.OS)
|
||
switch {
|
||
case ua.IsIOS() && strings.Contains(strings.ToLower(ua.Device), "ipad"):
|
||
platform = "ipad"
|
||
case ua.IsIOS():
|
||
platform = "iphone"
|
||
case ua.IsAndroid():
|
||
platform = "android"
|
||
case ua.IsMacOS():
|
||
platform = "macos"
|
||
case ua.IsWindows():
|
||
platform = "windows"
|
||
case ua.IsLinux():
|
||
platform = "linux"
|
||
}
|
||
browser := strings.ToLower(ua.Name)
|
||
browser = strings.ReplaceAll(browser, " ", "")
|
||
if platform == "" && browser == "" {
|
||
return "web"
|
||
}
|
||
if browser == "" {
|
||
return platform
|
||
}
|
||
if platform == "" {
|
||
return browser
|
||
}
|
||
return platform + "-" + browser
|
||
}
|
||
|
||
//go:embed web
|
||
var webFS embed.FS
|
||
|
||
// Message envelope.
|
||
type Message struct {
|
||
Type string `json:"type,omitempty"`
|
||
Text string `json:"text,omitempty"`
|
||
Signal json.RawMessage `json:"signal,omitempty"`
|
||
From string `json:"from,omitempty"`
|
||
To string `json:"to,omitempty"`
|
||
Role string `json:"role,omitempty"`
|
||
Source string `json:"source,omitempty"`
|
||
Room string `json:"room,omitempty"`
|
||
TS int64 `json:"ts"`
|
||
}
|
||
|
||
type bus struct {
|
||
mu sync.Mutex
|
||
clients map[chan Message]string
|
||
history []Message
|
||
}
|
||
|
||
func newBus() *bus { return &bus{clients: map[chan Message]string{}} }
|
||
|
||
func (b *bus) subscribe(label string) chan Message {
|
||
ch := make(chan Message, 32)
|
||
b.mu.Lock()
|
||
b.clients[ch] = label
|
||
for _, m := range b.history {
|
||
if m.Type == "" || m.Type == "clipboard" {
|
||
select {
|
||
case ch <- m:
|
||
default:
|
||
}
|
||
}
|
||
}
|
||
b.mu.Unlock()
|
||
return ch
|
||
}
|
||
|
||
func (b *bus) unsubscribe(ch chan Message) {
|
||
b.mu.Lock()
|
||
delete(b.clients, ch)
|
||
b.mu.Unlock()
|
||
close(ch)
|
||
}
|
||
|
||
func (b *bus) publish(m Message) {
|
||
b.mu.Lock()
|
||
defer b.mu.Unlock()
|
||
if m.Type == "" || m.Type == "clipboard" {
|
||
b.history = append(b.history, m)
|
||
if len(b.history) > 10 {
|
||
b.history = b.history[len(b.history)-10:]
|
||
}
|
||
}
|
||
for ch := range b.clients {
|
||
select {
|
||
case ch <- m:
|
||
default:
|
||
}
|
||
}
|
||
}
|
||
|
||
func (b *bus) size() int {
|
||
b.mu.Lock()
|
||
defer b.mu.Unlock()
|
||
return len(b.clients)
|
||
}
|
||
|
||
// Room registry — implicit creation, GC empty rooms after a grace period.
|
||
type rooms struct {
|
||
mu sync.Mutex
|
||
byID map[string]*bus
|
||
emptyAt map[string]time.Time
|
||
graceTime time.Duration
|
||
}
|
||
|
||
func newRooms() *rooms {
|
||
return &rooms{
|
||
byID: map[string]*bus{},
|
||
emptyAt: map[string]time.Time{},
|
||
graceTime: 5 * time.Minute,
|
||
}
|
||
}
|
||
|
||
func (r *rooms) get(id string) *bus {
|
||
r.mu.Lock()
|
||
defer r.mu.Unlock()
|
||
b, ok := r.byID[id]
|
||
if !ok {
|
||
b = newBus()
|
||
r.byID[id] = b
|
||
activeRooms.Inc()
|
||
}
|
||
delete(r.emptyAt, id)
|
||
return b
|
||
}
|
||
|
||
func (r *rooms) noteEmpty(id string) {
|
||
r.mu.Lock()
|
||
defer r.mu.Unlock()
|
||
if b, ok := r.byID[id]; ok && b.size() == 0 {
|
||
r.emptyAt[id] = time.Now()
|
||
}
|
||
}
|
||
|
||
func (r *rooms) gcLoop() {
|
||
t := time.NewTicker(time.Minute)
|
||
defer t.Stop()
|
||
for range t.C {
|
||
r.mu.Lock()
|
||
for id, since := range r.emptyAt {
|
||
if time.Since(since) > r.graceTime {
|
||
delete(r.byID, id)
|
||
delete(r.emptyAt, id)
|
||
activeRooms.Dec()
|
||
}
|
||
}
|
||
r.mu.Unlock()
|
||
}
|
||
}
|
||
|
||
func newRoomID() string {
|
||
b := make([]byte, 4) // 8 hex chars — 4 billion combinations, short enough to share
|
||
_, _ = rand.Read(b)
|
||
return hex.EncodeToString(b)
|
||
}
|
||
|
||
// Prometheus metrics
|
||
var (
|
||
messages = promauto.NewCounterVec(prometheus.CounterOpts{
|
||
Name: "tether_messages_total",
|
||
Help: "Total messages published, by source / type / room.",
|
||
}, []string{"source", "type", "room"})
|
||
bytesIn = promauto.NewCounter(prometheus.CounterOpts{
|
||
Name: "tether_message_bytes_total",
|
||
Help: "Total bytes of clipboard text published.",
|
||
})
|
||
activeRooms = promauto.NewGauge(prometheus.GaugeOpts{
|
||
Name: "tether_active_rooms",
|
||
Help: "Currently-active rooms (have at least one subscriber).",
|
||
})
|
||
subscribers = promauto.NewGauge(prometheus.GaugeOpts{
|
||
Name: "tether_active_subscribers",
|
||
Help: "Total SSE subscribers across all rooms.",
|
||
})
|
||
publishLatency = promauto.NewHistogram(prometheus.HistogramOpts{
|
||
Name: "tether_publish_duration_seconds",
|
||
Help: "Latency of publish() fan-out.",
|
||
Buckets: prometheus.ExponentialBuckets(0.0001, 4, 8),
|
||
})
|
||
)
|
||
|
||
func main() {
|
||
addr := flag.String("addr", ":8765", "listen address")
|
||
flag.Parse()
|
||
|
||
rm := newRooms()
|
||
go rm.gcLoop()
|
||
|
||
sub, _ := fs.Sub(webFS, "web")
|
||
mux := http.NewServeMux()
|
||
mux.Handle("/static/", http.FileServer(http.FS(sub)))
|
||
mux.Handle("/metrics", promhttp.Handler())
|
||
|
||
// Landing: pick a room and 302 to /r/<id>.
|
||
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
|
||
if r.URL.Path == "/" {
|
||
http.Redirect(w, r, "/r/"+newRoomID(), http.StatusFound)
|
||
return
|
||
}
|
||
http.NotFound(w, r)
|
||
})
|
||
|
||
// Room page — same HTML for any room id. Client reads room from URL.
|
||
mux.HandleFunc("/r/", func(w http.ResponseWriter, r *http.Request) {
|
||
// Accept /r/<id> only (no further path segments)
|
||
id := strings.TrimPrefix(r.URL.Path, "/r/")
|
||
if id == "" || strings.Contains(id, "/") {
|
||
http.Redirect(w, r, "/r/"+newRoomID(), http.StatusFound)
|
||
return
|
||
}
|
||
w.Header().Set("Content-Type", "text/html; charset=utf-8")
|
||
f, err := webFS.ReadFile("web/index.html")
|
||
if err != nil {
|
||
http.Error(w, "missing index.html", http.StatusInternalServerError)
|
||
return
|
||
}
|
||
w.Write(f)
|
||
})
|
||
|
||
mux.HandleFunc("/api/send", func(w http.ResponseWriter, r *http.Request) {
|
||
if r.Method != http.MethodPost {
|
||
http.Error(w, "POST only", http.StatusMethodNotAllowed)
|
||
return
|
||
}
|
||
var m Message
|
||
if err := json.NewDecoder(r.Body).Decode(&m); err != nil {
|
||
http.Error(w, "bad json", http.StatusBadRequest)
|
||
return
|
||
}
|
||
if m.Room == "" {
|
||
m.Room = r.URL.Query().Get("room")
|
||
}
|
||
if m.Room == "" {
|
||
http.Error(w, "missing room", http.StatusBadRequest)
|
||
return
|
||
}
|
||
if m.Type == "" {
|
||
m.Type = "clipboard"
|
||
}
|
||
if m.Source == "" {
|
||
m.Source = r.Header.Get("X-Tether-Source")
|
||
if m.Source == "" {
|
||
m.Source = labelFromUA(r.Header.Get("User-Agent"))
|
||
}
|
||
}
|
||
m.TS = time.Now().UnixMilli()
|
||
|
||
b := rm.get(m.Room)
|
||
t0 := time.Now()
|
||
b.publish(m)
|
||
publishLatency.Observe(time.Since(t0).Seconds())
|
||
messages.WithLabelValues(m.Source, m.Type, m.Room).Inc()
|
||
if m.Type == "clipboard" {
|
||
bytesIn.Add(float64(len(m.Text)))
|
||
}
|
||
w.WriteHeader(http.StatusNoContent)
|
||
})
|
||
|
||
mux.HandleFunc("/api/stream", func(w http.ResponseWriter, r *http.Request) {
|
||
fl, ok := w.(http.Flusher)
|
||
if !ok {
|
||
http.Error(w, "no flusher", http.StatusInternalServerError)
|
||
return
|
||
}
|
||
room := r.URL.Query().Get("room")
|
||
if room == "" {
|
||
http.Error(w, "missing ?room=", http.StatusBadRequest)
|
||
return
|
||
}
|
||
w.Header().Set("Content-Type", "text/event-stream")
|
||
w.Header().Set("Cache-Control", "no-cache")
|
||
w.Header().Set("Connection", "keep-alive")
|
||
w.Header().Set("X-Accel-Buffering", "no")
|
||
|
||
label := r.Header.Get("X-Tether-Client")
|
||
if label == "" {
|
||
label = r.RemoteAddr
|
||
}
|
||
|
||
b := rm.get(room)
|
||
ch := b.subscribe(label)
|
||
subscribers.Inc()
|
||
defer func() {
|
||
b.unsubscribe(ch)
|
||
subscribers.Dec()
|
||
if b.size() == 0 {
|
||
rm.noteEmpty(room)
|
||
}
|
||
}()
|
||
log.Printf("subscribe room=%s label=%s", room, label)
|
||
|
||
ka := time.NewTicker(30 * time.Second)
|
||
defer ka.Stop()
|
||
|
||
for {
|
||
select {
|
||
case <-r.Context().Done():
|
||
return
|
||
case m := <-ch:
|
||
bs, _ := json.Marshal(m)
|
||
ev := m.Type
|
||
if ev == "" {
|
||
ev = "clipboard"
|
||
}
|
||
fmt.Fprintf(w, "event: %s\ndata: %s\n\n", ev, bs)
|
||
fl.Flush()
|
||
case <-ka.C:
|
||
fmt.Fprintf(w, ": keepalive\n\n")
|
||
fl.Flush()
|
||
}
|
||
}
|
||
})
|
||
|
||
mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
|
||
w.Write([]byte("ok"))
|
||
})
|
||
|
||
log.Printf("tether-server listening on %s", *addr)
|
||
log.Fatal(http.ListenAndServe(*addr, mux))
|
||
}
|