// tether-server: HTTP+SSE relay for phone↔client clipboard sync. // // v0.1: SSE-only relay with broadcast bus. // v0.2 (this): /metrics endpoint + signaling stubs (mailbox for WebRTC SDP/ICE). package main import ( "embed" "encoding/json" "flag" "fmt" "io/fs" "log" "net/http" "sync" "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/client_golang/prometheus/promhttp" ) //go:embed web/index.html var webFS embed.FS type Message struct { Text string `json:"text"` Source string `json:"source,omitempty"` TS int64 `json:"ts"` } type bus struct { mu sync.Mutex clients map[chan Message]string history []Message } func newBus() *bus { return &bus{clients: map[chan Message]string{}} } func (b *bus) subscribe(label string) chan Message { ch := make(chan Message, 16) b.mu.Lock() b.clients[ch] = label for _, m := range b.history { select { case ch <- m: default: } } b.mu.Unlock() subscribers.Inc() return ch } func (b *bus) unsubscribe(ch chan Message) { b.mu.Lock() delete(b.clients, ch) b.mu.Unlock() close(ch) subscribers.Dec() } func (b *bus) publish(m Message) { b.mu.Lock() defer b.mu.Unlock() b.history = append(b.history, m) if len(b.history) > 10 { b.history = b.history[len(b.history)-10:] } for ch := range b.clients { select { case ch <- m: default: } } } // Prometheus metrics -------------------------------------------------------- var ( messages = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "tether_messages_total", Help: "Total messages published to the broadcast bus, by source label.", }, []string{"source"}) bytesIn = promauto.NewCounter(prometheus.CounterOpts{ Name: "tether_message_bytes_total", Help: "Total bytes of message text published.", }) subscribers = promauto.NewGauge(prometheus.GaugeOpts{ Name: "tether_active_subscribers", Help: "Number of currently-connected SSE subscribers.", }) publishLatency = promauto.NewHistogram(prometheus.HistogramOpts{ Name: "tether_publish_duration_seconds", Help: "Latency of the publish() fan-out, including channel sends.", Buckets: prometheus.ExponentialBuckets(0.0001, 4, 8), // 0.1ms..1.6s }) ) // Signaling mailbox (v0.3 WebRTC scaffolding) -------------------------------- // Peers POST offers/answers/ICE candidates into a per-room mailbox, peers GET // to drain. Pure relay — no SDP parsing, no peer state on server. type signalBox struct { mu sync.Mutex rooms map[string][]json.RawMessage } func newSignalBox() *signalBox { return &signalBox{rooms: map[string][]json.RawMessage{}} } func (s *signalBox) post(room string, msg json.RawMessage) { s.mu.Lock() defer s.mu.Unlock() s.rooms[room] = append(s.rooms[room], msg) if len(s.rooms[room]) > 64 { s.rooms[room] = s.rooms[room][len(s.rooms[room])-64:] } } func (s *signalBox) drain(room string) []json.RawMessage { s.mu.Lock() defer s.mu.Unlock() out := s.rooms[room] delete(s.rooms, room) return out } // --------------------------------------------------------------------------- func main() { addr := flag.String("addr", ":8765", "listen address") flag.Parse() b := newBus() sig := newSignalBox() sub, _ := fs.Sub(webFS, "web") mux := http.NewServeMux() mux.Handle("/", http.FileServer(http.FS(sub))) mux.Handle("/metrics", promhttp.Handler()) mux.HandleFunc("/api/send", func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "POST only", http.StatusMethodNotAllowed) return } var m Message if err := json.NewDecoder(r.Body).Decode(&m); err != nil { http.Error(w, "bad json", http.StatusBadRequest) return } if m.Source == "" { m.Source = r.Header.Get("X-Tether-Source") if m.Source == "" { m.Source = "phone" } } m.TS = time.Now().UnixMilli() t0 := time.Now() b.publish(m) publishLatency.Observe(time.Since(t0).Seconds()) messages.WithLabelValues(m.Source).Inc() bytesIn.Add(float64(len(m.Text))) log.Printf("publish: %s len=%d", m.Source, len(m.Text)) w.WriteHeader(http.StatusNoContent) }) mux.HandleFunc("/api/stream", func(w http.ResponseWriter, r *http.Request) { fl, ok := w.(http.Flusher) if !ok { http.Error(w, "no flusher", http.StatusInternalServerError) return } w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.Header().Set("X-Accel-Buffering", "no") label := r.Header.Get("X-Tether-Client") if label == "" { label = r.RemoteAddr } ch := b.subscribe(label) defer b.unsubscribe(ch) log.Printf("subscribe: %s", label) ka := time.NewTicker(30 * time.Second) defer ka.Stop() for { select { case <-r.Context().Done(): log.Printf("unsubscribe: %s", label) return case m := <-ch: bs, _ := json.Marshal(m) fmt.Fprintf(w, "event: clipboard\ndata: %s\n\n", bs) fl.Flush() case <-ka.C: fmt.Fprintf(w, ": keepalive\n\n") fl.Flush() } } }) // WebRTC signaling: POST to add a message, GET to drain. // /api/signal/ is a dumb relay — peers exchange SDP + ICE via this. mux.HandleFunc("/api/signal/", func(w http.ResponseWriter, r *http.Request) { room := r.URL.Path[len("/api/signal/"):] if room == "" { http.Error(w, "missing room", http.StatusBadRequest) return } switch r.Method { case http.MethodPost: body, _ := func() (json.RawMessage, error) { var raw json.RawMessage err := json.NewDecoder(r.Body).Decode(&raw) return raw, err }() sig.post(room, body) w.WriteHeader(http.StatusNoContent) case http.MethodGet: out := sig.drain(room) if out == nil { out = []json.RawMessage{} } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(out) default: http.Error(w, "POST or GET", http.StatusMethodNotAllowed) } }) mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) { w.Write([]byte("ok")) }) log.Printf("tether-server listening on %s", *addr) log.Fatal(http.ListenAndServe(*addr, mux)) }