From 7995908c87693d9587fbd6652ef6a9cb416982d8 Mon Sep 17 00:00:00 2001 From: "Claude Opus 4.7" Date: Thu, 21 May 2026 01:09:24 -0500 Subject: [PATCH] v0.4: symmetric presence chirps + per-peer mesh Both Go peer and browser now broadcast {type:"presence", from, role} every 10s on the bus. When either side sees a presence from someone they don't yet have a RTCPeerConnection to, they initiate a new one targeted at that specific peerID via the new "to" field on signal messages. Each side keeps a map instead of the v0.3 single-connection model. This means: - N browsers can pair with M peers (true mesh) - New tabs auto-discover existing peers via their next 10s chirp - Restarts and network blips recover within 10s instead of needing a manual browser refresh - 45s lastSeen timeout sweeps disconnected peers and tears down their PeerConnection The browser UI now shows a row of peer chips that flip green when their DataChannel opens. The pill shows "rtc" if *any* peer is open, else "negotiating" if any are in progress, else "sse". Go side regenerates a random peerID per process start (was static). Co-Authored-By: Claude Opus 4.7 --- client/main.go | 294 ++++++++++++++++++++++++++++++------------ server/web/index.html | 157 ++++++++++++++-------- 2 files changed, 319 insertions(+), 132 deletions(-) diff --git a/client/main.go b/client/main.go index 0c6d930..98954fe 100644 --- a/client/main.go +++ b/client/main.go @@ -1,17 +1,20 @@ -// tether-client v0.3: SSE listener with optional WebRTC peer. +// tether-client v0.4: SSE listener with optional WebRTC mesh peer. // // Default flow (SSE only): subscribe to /api/stream, write incoming // clipboard messages to the OS clipboard. Works on Win/Linux/macOS. // -// With -rtc: also act as a WebRTC peer (Pion). Sends an SDP offer via -// the signaling bus, accepts the browser's answer, then receives -// clipboard payloads over a DataChannel — true P2P after ICE. +// With -rtc: become a WebRTC peer that maintains one RTCPeerConnection +// per active browser (mesh, not star). Symmetric "presence" chirps on +// the signaling bus let participants discover each other and +// auto-upgrade SSE → direct DataChannel. After pairing, clipboard text +// flows direct peer-to-peer with DTLS encryption. package main import ( "bufio" "bytes" - "context" + "crypto/rand" + "encoding/hex" "encoding/json" "flag" "fmt" @@ -28,7 +31,14 @@ import ( "github.com/pion/webrtc/v4" ) -const peerID = "tether-client" +// peerID — each client instance gets a stable random id for the bus. +var peerID = "tether-client-" + randomID(6) + +func randomID(n int) string { + b := make([]byte, n) + _, _ = rand.Read(b) + return hex.EncodeToString(b) +} func defaultLabel(role string) string { return fmt.Sprintf("%s-%s-%s", runtime.GOOS, "sse", role) @@ -40,13 +50,15 @@ type Message struct { Text string `json:"text,omitempty"` Signal json.RawMessage `json:"signal,omitempty"` From string `json:"from,omitempty"` + To string `json:"to,omitempty"` + Role string `json:"role,omitempty"` Source string `json:"source,omitempty"` TS int64 `json:"ts"` } // Signal payload carried in Message.Signal. type SignalPayload struct { - Kind string `json:"kind"` // "offer" | "answer" | "ice" + Kind string `json:"kind"` // "offer" | "answer" | "ice" SDP *webrtc.SessionDescription `json:"sdp,omitempty"` Candidate *webrtc.ICECandidateInit `json:"candidate,omitempty"` } @@ -55,6 +67,7 @@ var ( noClipboard bool myLabel string useRTC bool + rtcServer string ) func main() { @@ -62,7 +75,7 @@ func main() { label := flag.String("label", "", "X-Tether-Client label (default: -sse-)") sendText := flag.String("send", "", "send this text and exit (otherwise listen)") flag.BoolVar(&noClipboard, "no-clipboard", false, "don't write incoming messages to the OS clipboard") - flag.BoolVar(&useRTC, "rtc", false, "enable WebRTC peer (uses signaling bus to negotiate)") + flag.BoolVar(&useRTC, "rtc", false, "act as a WebRTC mesh peer") flag.Parse() if *label == "" { @@ -75,6 +88,7 @@ func main() { } } myLabel = *label + rtcServer = *server if *sendText != "" { send(*server, *label, *sendText, "clipboard", nil) @@ -87,7 +101,7 @@ func main() { } if useRTC { - go runRTCPeer(*server) + go presenceChirpLoop(*server) } for { @@ -100,6 +114,20 @@ func main() { func send(server, label, text, msgType string, signal json.RawMessage) { m := Message{Type: msgType, Text: text, Source: label, From: peerID, Signal: signal} + postMessage(server, label, m) +} + +func sendMessage(server, label string, m Message) { + if m.From == "" { + m.From = peerID + } + if m.Source == "" { + m.Source = label + } + postMessage(server, label, m) +} + +func postMessage(server, label string, m Message) { body, _ := json.Marshal(m) req, _ := http.NewRequest("POST", server+"/api/send", bytes.NewReader(body)) req.Header.Set("Content-Type", "application/json") @@ -114,13 +142,12 @@ func send(server, label, text, msgType string, signal json.RawMessage) { log.Printf("send: HTTP %d", r.StatusCode) return } - if msgType != "signal" { + if m.Type == "clipboard" && m.Text != "" { fmt.Println("sent.") } } -// listen subscribes to the SSE stream. Clipboard messages → OS clipboard. -// Signal messages → forwarded to the WebRTC peer (if enabled). +// listen subscribes to the SSE stream and dispatches messages to handlers. func listen(server, label string) error { req, _ := http.NewRequest("GET", server+"/api/stream", nil) req.Header.Set("X-Tether-Client", label) @@ -132,7 +159,8 @@ func listen(server, label string) error { if r.StatusCode != 200 { return fmt.Errorf("HTTP %d", r.StatusCode) } - fmt.Fprintf(os.Stderr, "tether-client: connected to %s as %q (rtc=%v)\n", server, label, useRTC) + fmt.Fprintf(os.Stderr, "tether-client: connected to %s as %q (rtc=%v, peerID=%s)\n", + server, label, useRTC, peerID) sc := bufio.NewScanner(r.Body) sc.Buffer(make([]byte, 1<<20), 1<<20) var ev, data string @@ -160,134 +188,240 @@ func listen(server, label string) error { } func handleMessage(ev string, m Message) { + // Ignore our own messages (echo). + if m.From == peerID { + return + } switch m.Type { case "", "clipboard": ts := time.UnixMilli(m.TS).Format("15:04:05") fmt.Printf("\n────── %s from %s ──────\n%s\n", ts, m.Source, m.Text) - if !noClipboard && m.Source != myLabel { + if !noClipboard { if err := clipboard.WriteAll(m.Text); err != nil { fmt.Fprintf(os.Stderr, " ! clipboard write error: %v\n", err) } else { fmt.Fprintln(os.Stderr, " → clipboard updated") } } + case "presence": + if useRTC { + onPresence(m) + } case "signal": - if useRTC && m.From != peerID { - incomingSignal <- m + if useRTC { + onSignal(m) } } } -// ── WebRTC ──────────────────────────────────────────────────────────────── +// ── WebRTC mesh ───────────────────────────────────────────────────────────── + +type remotePeer struct { + pc *webrtc.PeerConnection + dc *webrtc.DataChannel + lastSeen time.Time + state string // last connection state +} var ( - incomingSignal = make(chan Message, 16) - rtcConnected = make(chan struct{}) + peers = make(map[string]*remotePeer) // keyed by remote peerID + peersMu sync.Mutex ) -func runRTCPeer(server string) { - api := webrtc.NewAPI() - config := webrtc.Configuration{ - ICEServers: []webrtc.ICEServer{ - {URLs: []string{"stun:stun.l.google.com:19302"}}, - }, +func getPeer(id string) (*remotePeer, bool) { + peersMu.Lock() + defer peersMu.Unlock() + p, ok := peers[id] + return p, ok +} + +func setPeer(id string, p *remotePeer) { + peersMu.Lock() + defer peersMu.Unlock() + peers[id] = p +} + +func removePeer(id string) { + peersMu.Lock() + defer peersMu.Unlock() + if p, ok := peers[id]; ok { + if p.pc != nil { + _ = p.pc.Close() + } + delete(peers, id) } - pc, err := api.NewPeerConnection(config) - if err != nil { - log.Printf("rtc: new peer connection: %v", err) +} + +// presenceChirpLoop broadcasts our own presence every 10s and sweeps stale peers. +func presenceChirpLoop(server string) { + announce := func() { + sendMessage(server, myLabel, Message{ + Type: "presence", + Role: "peer", + From: peerID, + }) + } + announce() + t := time.NewTicker(10 * time.Second) + sweep := time.NewTicker(15 * time.Second) + defer t.Stop() + defer sweep.Stop() + for { + select { + case <-t.C: + announce() + case <-sweep.C: + peersMu.Lock() + for id, p := range peers { + if time.Since(p.lastSeen) > 45*time.Second { + fmt.Fprintf(os.Stderr, "rtc: peer %s timed out — tearing down\n", id[:8]) + if p.pc != nil { + _ = p.pc.Close() + } + delete(peers, id) + } + } + peersMu.Unlock() + } + } +} + +// onPresence: a remote participant announced themselves. If we don't yet +// have a peer connection to them, create one + send a targeted offer. +func onPresence(m Message) { + if m.From == "" || m.From == peerID { return } + p, exists := getPeer(m.From) + if exists { + // just refresh lastSeen + peersMu.Lock() + p.lastSeen = time.Now() + peersMu.Unlock() + return + } + // New peer — initiate + fmt.Fprintf(os.Stderr, "rtc: discovered new peer %s (role=%s) — sending offer\n", + m.From[:min(len(m.From), 8)], m.Role) + initiateOffer(m.From) +} +func initiateOffer(remoteID string) { + api := webrtc.NewAPI() + pc, err := api.NewPeerConnection(webrtc.Configuration{ + ICEServers: []webrtc.ICEServer{{URLs: []string{"stun:stun.l.google.com:19302"}}}, + }) + if err != nil { + log.Printf("rtc: new pc for %s: %v", remoteID[:8], err) + return + } dc, err := pc.CreateDataChannel("tether", nil) if err != nil { - log.Printf("rtc: create datachannel: %v", err) + log.Printf("rtc: dc for %s: %v", remoteID[:8], err) + _ = pc.Close() return } - var once sync.Once + rp := &remotePeer{pc: pc, dc: dc, lastSeen: time.Now()} + setPeer(remoteID, rp) + dc.OnOpen(func() { - fmt.Fprintln(os.Stderr, "rtc: DataChannel OPEN — P2P live") - once.Do(func() { close(rtcConnected) }) + fmt.Fprintf(os.Stderr, "rtc: DataChannel OPEN with %s — P2P live\n", remoteID[:8]) }) dc.OnMessage(func(msg webrtc.DataChannelMessage) { text := string(msg.Data) - fmt.Printf("\n[rtc] %s\n", text) + fmt.Printf("\n[rtc:%s] %s\n", remoteID[:8], text) if !noClipboard { if err := clipboard.WriteAll(text); err == nil { fmt.Fprintln(os.Stderr, " → clipboard updated (via rtc)") } } }) - pc.OnICECandidate(func(c *webrtc.ICECandidate) { if c == nil { return } init := c.ToJSON() payload, _ := json.Marshal(SignalPayload{Kind: "ice", Candidate: &init}) - send(server, myLabel, "", "signal", payload) + sendMessage(rtcServer, myLabel, Message{ + Type: "signal", + From: peerID, + To: remoteID, + Signal: payload, + }) }) pc.OnConnectionStateChange(func(s webrtc.PeerConnectionState) { - fmt.Fprintf(os.Stderr, "rtc: state=%s\n", s) + peersMu.Lock() + if p, ok := peers[remoteID]; ok { + p.state = s.String() + } + peersMu.Unlock() + fmt.Fprintf(os.Stderr, "rtc: %s state=%s\n", remoteID[:8], s) + if s == webrtc.PeerConnectionStateFailed || s == webrtc.PeerConnectionStateClosed { + removePeer(remoteID) + } }) - // Create offer + post once immediately offer, err := pc.CreateOffer(nil) if err != nil { - log.Printf("rtc: create offer: %v", err) + log.Printf("rtc: create offer for %s: %v", remoteID[:8], err) return } if err := pc.SetLocalDescription(offer); err != nil { - log.Printf("rtc: set local desc: %v", err) + log.Printf("rtc: set local desc for %s: %v", remoteID[:8], err) return } payload, _ := json.Marshal(SignalPayload{Kind: "offer", SDP: &offer}) - send(server, myLabel, "", "signal", payload) - fmt.Fprintln(os.Stderr, "rtc: offer posted, will chirp every 5s until paired...") + sendMessage(rtcServer, myLabel, Message{ + Type: "signal", + From: peerID, + To: remoteID, + Signal: payload, + }) +} - // "Chirp": re-post the offer every 5s until DataChannel opens. Catches - // late-joining browsers without needing server-side history of signals. - go func() { - t := time.NewTicker(5 * time.Second) - defer t.Stop() - for { - select { - case <-rtcConnected: - fmt.Fprintln(os.Stderr, "rtc: paired — chirping stopped") - return - case <-t.C: - p, _ := json.Marshal(SignalPayload{Kind: "offer", SDP: &offer}) - send(server, myLabel, "", "signal", p) +// onSignal: routed to the relevant peer connection by `To` field. +func onSignal(m Message) { + // Only handle signals targeted at us (or untargeted = legacy). + if m.To != "" && m.To != peerID { + return + } + var sp SignalPayload + if err := json.Unmarshal(m.Signal, &sp); err != nil { + return + } + p, ok := getPeer(m.From) + if !ok { + // Could be an answer/ice from someone we don't know about yet — + // shouldn't happen in normal flow because they wouldn't answer + // without first seeing our offer. Drop. + return + } + switch sp.Kind { + case "answer": + if sp.SDP != nil { + if err := p.pc.SetRemoteDescription(*sp.SDP); err != nil { + if !strings.Contains(err.Error(), "stable->SetRemote(answer)->stable") { + log.Printf("rtc: set remote desc for %s: %v", m.From[:8], err) + } + } else { + fmt.Fprintf(os.Stderr, "rtc: answer applied for %s\n", m.From[:8]) } } - }() - - // Process incoming signals - for { - select { - case msg := <-incomingSignal: - var sp SignalPayload - if err := json.Unmarshal(msg.Signal, &sp); err != nil { - continue - } - switch sp.Kind { - case "answer": - if sp.SDP != nil { - if err := pc.SetRemoteDescription(*sp.SDP); err != nil { - log.Printf("rtc: set remote desc: %v", err) - } else { - fmt.Fprintln(os.Stderr, "rtc: answer applied") - } - } - case "ice": - if sp.Candidate != nil { - if err := pc.AddICECandidate(*sp.Candidate); err != nil { - log.Printf("rtc: add ice: %v", err) - } + case "ice": + if sp.Candidate != nil { + if err := p.pc.AddICECandidate(*sp.Candidate); err != nil { + if !strings.Contains(err.Error(), "remote description is not set") { + log.Printf("rtc: add ice for %s: %v", m.From[:8], err) } } - case <-context.Background().Done(): - return } } } + +func min(a, b int) int { + if a < b { + return a + } + return b +} diff --git a/server/web/index.html b/server/web/index.html index bbf6ce4..4ff6855 100644 --- a/server/web/index.html +++ b/server/web/index.html @@ -63,6 +63,9 @@ } .meta { font-size: 11px; color: #666; margin-top: 4px; } .meta .rtc-badge { color: #4ade80; } + .peers { font-size: 11px; color: #666; margin-top: 4px; display: flex; flex-wrap: wrap; gap: 6px; } + .peers .peer { background: #131313; padding: 2px 8px; border-radius: 999px; border: 1px solid #1f1f1f; } + .peers .peer.rtc { color: #4ade80; border-color: #052e16; } footer { margin-top: auto; padding-top: 8px; font-size: 11px; color: #555; text-align: center; @@ -73,12 +76,14 @@

tether

- phone ↔ laptop + mesh clipboard sse
+
+
- +
@@ -89,18 +94,18 @@
idle
-

received from laptop

+

received