// tether-server v0.3: HTTP+SSE relay with extensible message envelope. // // The same /api/send + /api/stream pipeline carries TWO message kinds: // - "clipboard" — the user-facing payload (text) // - "signal" — WebRTC SDP/ICE for peer negotiation // // Peers filter by .Type on the client side. Server is neutral relay. package main import ( "embed" "encoding/json" "flag" "fmt" "io/fs" "log" "net/http" "sync" "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" ) //go:embed web/index.html var webFS embed.FS // Message envelope. Type defaults to "clipboard" for backward compat. type Message struct { Type string `json:"type,omitempty"` // "clipboard" | "signal" Text string `json:"text,omitempty"` // clipboard text Signal json.RawMessage `json:"signal,omitempty"` // {kind:offer|answer|ice, ...} From string `json:"from,omitempty"` // sender peer id (for signal filtering) Source string `json:"source,omitempty"` // human-readable label TS int64 `json:"ts"` } type bus struct { mu sync.Mutex clients map[chan Message]string history []Message } func newBus() *bus { return &bus{clients: map[chan Message]string{}} } func (b *bus) subscribe(label string) chan Message { ch := make(chan Message, 32) b.mu.Lock() b.clients[ch] = label // only replay clipboard messages — signals are time-sensitive for _, m := range b.history { if m.Type == "" || m.Type == "clipboard" { select { case ch <- m: default: } } } b.mu.Unlock() subscribers.Inc() return ch } func (b *bus) unsubscribe(ch chan Message) { b.mu.Lock() delete(b.clients, ch) b.mu.Unlock() close(ch) subscribers.Dec() } func (b *bus) publish(m Message) { b.mu.Lock() defer b.mu.Unlock() if m.Type == "" || m.Type == "clipboard" { b.history = append(b.history, m) if len(b.history) > 10 { b.history = b.history[len(b.history)-10:] } } for ch := range b.clients { select { case ch <- m: default: } } } // Prometheus metrics var ( messages = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "tether_messages_total", Help: "Total messages published to the broadcast bus, by source and type.", }, []string{"source", "type"}) bytesIn = promauto.NewCounter(prometheus.CounterOpts{ Name: "tether_message_bytes_total", Help: "Total bytes of clipboard text published.", }) subscribers = promauto.NewGauge(prometheus.GaugeOpts{ Name: "tether_active_subscribers", Help: "Number of currently-connected SSE subscribers.", }) publishLatency = promauto.NewHistogram(prometheus.HistogramOpts{ Name: "tether_publish_duration_seconds", Help: "Latency of the publish() fan-out, including channel sends.", Buckets: prometheus.ExponentialBuckets(0.0001, 4, 8), }) ) func main() { addr := flag.String("addr", ":8765", "listen address") flag.Parse() b := newBus() sub, _ := fs.Sub(webFS, "web") mux := http.NewServeMux() mux.Handle("/", http.FileServer(http.FS(sub))) mux.Handle("/metrics", promhttp.Handler()) mux.HandleFunc("/api/send", func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "POST only", http.StatusMethodNotAllowed) return } var m Message if err := json.NewDecoder(r.Body).Decode(&m); err != nil { http.Error(w, "bad json", http.StatusBadRequest) return } if m.Type == "" { m.Type = "clipboard" } if m.Source == "" { m.Source = r.Header.Get("X-Tether-Source") if m.Source == "" { m.Source = "phone" } } m.TS = time.Now().UnixMilli() t0 := time.Now() b.publish(m) publishLatency.Observe(time.Since(t0).Seconds()) messages.WithLabelValues(m.Source, m.Type).Inc() if m.Type == "clipboard" { bytesIn.Add(float64(len(m.Text))) log.Printf("publish clipboard: %s len=%d", m.Source, len(m.Text)) } else { log.Printf("publish %s: from=%s", m.Type, m.From) } w.WriteHeader(http.StatusNoContent) }) mux.HandleFunc("/api/stream", func(w http.ResponseWriter, r *http.Request) { fl, ok := w.(http.Flusher) if !ok { http.Error(w, "no flusher", http.StatusInternalServerError) return } w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.Header().Set("X-Accel-Buffering", "no") label := r.Header.Get("X-Tether-Client") if label == "" { label = r.RemoteAddr } ch := b.subscribe(label) defer b.unsubscribe(ch) log.Printf("subscribe: %s", label) ka := time.NewTicker(30 * time.Second) defer ka.Stop() for { select { case <-r.Context().Done(): log.Printf("unsubscribe: %s", label) return case m := <-ch: bs, _ := json.Marshal(m) eventName := m.Type if eventName == "" { eventName = "clipboard" } fmt.Fprintf(w, "event: %s\ndata: %s\n\n", eventName, bs) fl.Flush() case <-ka.C: fmt.Fprintf(w, ": keepalive\n\n") fl.Flush() } } }) mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { w.Write([]byte("ok")) }) log.Printf("tether-server listening on %s", *addr) log.Fatal(http.ListenAndServe(*addr, mux)) }