v0.4: symmetric presence chirps + per-peer mesh

Both Go peer and browser now broadcast {type:"presence", from, role}
every 10s on the bus. When either side sees a presence from someone
they don't yet have a RTCPeerConnection to, they initiate a new one
targeted at that specific peerID via the new "to" field on signal
messages. Each side keeps a map<peerID, RTCPeerConnection> instead of
the v0.3 single-connection model.

This means:
- N browsers can pair with M peers (true mesh)
- New tabs auto-discover existing peers via their next 10s chirp
- Restarts and network blips recover within 10s instead of needing
  a manual browser refresh
- 45s lastSeen timeout sweeps disconnected peers and tears down their
  PeerConnection

The browser UI now shows a row of peer chips that flip green when their
DataChannel opens. The pill shows "rtc" if *any* peer is open, else
"negotiating" if any are in progress, else "sse".

Go side regenerates a random peerID per process start (was static).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Claude Opus 4.7
2026-05-21 01:09:24 -05:00
parent 6ea7ed579e
commit 7995908c87
2 changed files with 319 additions and 132 deletions

View File

@@ -1,17 +1,20 @@
// tether-client v0.3: SSE listener with optional WebRTC peer.
// tether-client v0.4: SSE listener with optional WebRTC mesh peer.
//
// Default flow (SSE only): subscribe to /api/stream, write incoming
// clipboard messages to the OS clipboard. Works on Win/Linux/macOS.
//
// With -rtc: also act as a WebRTC peer (Pion). Sends an SDP offer via
// the signaling bus, accepts the browser's answer, then receives
// clipboard payloads over a DataChannel — true P2P after ICE.
// With -rtc: become a WebRTC peer that maintains one RTCPeerConnection
// per active browser (mesh, not star). Symmetric "presence" chirps on
// the signaling bus let participants discover each other and
// auto-upgrade SSE → direct DataChannel. After pairing, clipboard text
// flows direct peer-to-peer with DTLS encryption.
package main
import (
"bufio"
"bytes"
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"flag"
"fmt"
@@ -28,7 +31,14 @@ import (
"github.com/pion/webrtc/v4"
)
const peerID = "tether-client"
// peerID — each client instance gets a stable random id for the bus.
var peerID = "tether-client-" + randomID(6)
func randomID(n int) string {
b := make([]byte, n)
_, _ = rand.Read(b)
return hex.EncodeToString(b)
}
func defaultLabel(role string) string {
return fmt.Sprintf("%s-%s-%s", runtime.GOOS, "sse", role)
@@ -40,13 +50,15 @@ type Message struct {
Text string `json:"text,omitempty"`
Signal json.RawMessage `json:"signal,omitempty"`
From string `json:"from,omitempty"`
To string `json:"to,omitempty"`
Role string `json:"role,omitempty"`
Source string `json:"source,omitempty"`
TS int64 `json:"ts"`
}
// Signal payload carried in Message.Signal.
type SignalPayload struct {
Kind string `json:"kind"` // "offer" | "answer" | "ice"
Kind string `json:"kind"` // "offer" | "answer" | "ice"
SDP *webrtc.SessionDescription `json:"sdp,omitempty"`
Candidate *webrtc.ICECandidateInit `json:"candidate,omitempty"`
}
@@ -55,6 +67,7 @@ var (
noClipboard bool
myLabel string
useRTC bool
rtcServer string
)
func main() {
@@ -62,7 +75,7 @@ func main() {
label := flag.String("label", "", "X-Tether-Client label (default: <os>-sse-<role>)")
sendText := flag.String("send", "", "send this text and exit (otherwise listen)")
flag.BoolVar(&noClipboard, "no-clipboard", false, "don't write incoming messages to the OS clipboard")
flag.BoolVar(&useRTC, "rtc", false, "enable WebRTC peer (uses signaling bus to negotiate)")
flag.BoolVar(&useRTC, "rtc", false, "act as a WebRTC mesh peer")
flag.Parse()
if *label == "" {
@@ -75,6 +88,7 @@ func main() {
}
}
myLabel = *label
rtcServer = *server
if *sendText != "" {
send(*server, *label, *sendText, "clipboard", nil)
@@ -87,7 +101,7 @@ func main() {
}
if useRTC {
go runRTCPeer(*server)
go presenceChirpLoop(*server)
}
for {
@@ -100,6 +114,20 @@ func main() {
func send(server, label, text, msgType string, signal json.RawMessage) {
m := Message{Type: msgType, Text: text, Source: label, From: peerID, Signal: signal}
postMessage(server, label, m)
}
func sendMessage(server, label string, m Message) {
if m.From == "" {
m.From = peerID
}
if m.Source == "" {
m.Source = label
}
postMessage(server, label, m)
}
func postMessage(server, label string, m Message) {
body, _ := json.Marshal(m)
req, _ := http.NewRequest("POST", server+"/api/send", bytes.NewReader(body))
req.Header.Set("Content-Type", "application/json")
@@ -114,13 +142,12 @@ func send(server, label, text, msgType string, signal json.RawMessage) {
log.Printf("send: HTTP %d", r.StatusCode)
return
}
if msgType != "signal" {
if m.Type == "clipboard" && m.Text != "" {
fmt.Println("sent.")
}
}
// listen subscribes to the SSE stream. Clipboard messages → OS clipboard.
// Signal messages → forwarded to the WebRTC peer (if enabled).
// listen subscribes to the SSE stream and dispatches messages to handlers.
func listen(server, label string) error {
req, _ := http.NewRequest("GET", server+"/api/stream", nil)
req.Header.Set("X-Tether-Client", label)
@@ -132,7 +159,8 @@ func listen(server, label string) error {
if r.StatusCode != 200 {
return fmt.Errorf("HTTP %d", r.StatusCode)
}
fmt.Fprintf(os.Stderr, "tether-client: connected to %s as %q (rtc=%v)\n", server, label, useRTC)
fmt.Fprintf(os.Stderr, "tether-client: connected to %s as %q (rtc=%v, peerID=%s)\n",
server, label, useRTC, peerID)
sc := bufio.NewScanner(r.Body)
sc.Buffer(make([]byte, 1<<20), 1<<20)
var ev, data string
@@ -160,134 +188,240 @@ func listen(server, label string) error {
}
func handleMessage(ev string, m Message) {
// Ignore our own messages (echo).
if m.From == peerID {
return
}
switch m.Type {
case "", "clipboard":
ts := time.UnixMilli(m.TS).Format("15:04:05")
fmt.Printf("\n────── %s from %s ──────\n%s\n", ts, m.Source, m.Text)
if !noClipboard && m.Source != myLabel {
if !noClipboard {
if err := clipboard.WriteAll(m.Text); err != nil {
fmt.Fprintf(os.Stderr, " ! clipboard write error: %v\n", err)
} else {
fmt.Fprintln(os.Stderr, " → clipboard updated")
}
}
case "presence":
if useRTC {
onPresence(m)
}
case "signal":
if useRTC && m.From != peerID {
incomingSignal <- m
if useRTC {
onSignal(m)
}
}
}
// ── WebRTC ────────────────────────────────────────────────────────────────
// ── WebRTC mesh ─────────────────────────────────────────────────────────────
type remotePeer struct {
pc *webrtc.PeerConnection
dc *webrtc.DataChannel
lastSeen time.Time
state string // last connection state
}
var (
incomingSignal = make(chan Message, 16)
rtcConnected = make(chan struct{})
peers = make(map[string]*remotePeer) // keyed by remote peerID
peersMu sync.Mutex
)
func runRTCPeer(server string) {
api := webrtc.NewAPI()
config := webrtc.Configuration{
ICEServers: []webrtc.ICEServer{
{URLs: []string{"stun:stun.l.google.com:19302"}},
},
func getPeer(id string) (*remotePeer, bool) {
peersMu.Lock()
defer peersMu.Unlock()
p, ok := peers[id]
return p, ok
}
func setPeer(id string, p *remotePeer) {
peersMu.Lock()
defer peersMu.Unlock()
peers[id] = p
}
func removePeer(id string) {
peersMu.Lock()
defer peersMu.Unlock()
if p, ok := peers[id]; ok {
if p.pc != nil {
_ = p.pc.Close()
}
delete(peers, id)
}
pc, err := api.NewPeerConnection(config)
if err != nil {
log.Printf("rtc: new peer connection: %v", err)
}
// presenceChirpLoop broadcasts our own presence every 10s and sweeps stale peers.
func presenceChirpLoop(server string) {
announce := func() {
sendMessage(server, myLabel, Message{
Type: "presence",
Role: "peer",
From: peerID,
})
}
announce()
t := time.NewTicker(10 * time.Second)
sweep := time.NewTicker(15 * time.Second)
defer t.Stop()
defer sweep.Stop()
for {
select {
case <-t.C:
announce()
case <-sweep.C:
peersMu.Lock()
for id, p := range peers {
if time.Since(p.lastSeen) > 45*time.Second {
fmt.Fprintf(os.Stderr, "rtc: peer %s timed out — tearing down\n", id[:8])
if p.pc != nil {
_ = p.pc.Close()
}
delete(peers, id)
}
}
peersMu.Unlock()
}
}
}
// onPresence: a remote participant announced themselves. If we don't yet
// have a peer connection to them, create one + send a targeted offer.
func onPresence(m Message) {
if m.From == "" || m.From == peerID {
return
}
p, exists := getPeer(m.From)
if exists {
// just refresh lastSeen
peersMu.Lock()
p.lastSeen = time.Now()
peersMu.Unlock()
return
}
// New peer — initiate
fmt.Fprintf(os.Stderr, "rtc: discovered new peer %s (role=%s) — sending offer\n",
m.From[:min(len(m.From), 8)], m.Role)
initiateOffer(m.From)
}
func initiateOffer(remoteID string) {
api := webrtc.NewAPI()
pc, err := api.NewPeerConnection(webrtc.Configuration{
ICEServers: []webrtc.ICEServer{{URLs: []string{"stun:stun.l.google.com:19302"}}},
})
if err != nil {
log.Printf("rtc: new pc for %s: %v", remoteID[:8], err)
return
}
dc, err := pc.CreateDataChannel("tether", nil)
if err != nil {
log.Printf("rtc: create datachannel: %v", err)
log.Printf("rtc: dc for %s: %v", remoteID[:8], err)
_ = pc.Close()
return
}
var once sync.Once
rp := &remotePeer{pc: pc, dc: dc, lastSeen: time.Now()}
setPeer(remoteID, rp)
dc.OnOpen(func() {
fmt.Fprintln(os.Stderr, "rtc: DataChannel OPEN — P2P live")
once.Do(func() { close(rtcConnected) })
fmt.Fprintf(os.Stderr, "rtc: DataChannel OPEN with %s — P2P live\n", remoteID[:8])
})
dc.OnMessage(func(msg webrtc.DataChannelMessage) {
text := string(msg.Data)
fmt.Printf("\n[rtc] %s\n", text)
fmt.Printf("\n[rtc:%s] %s\n", remoteID[:8], text)
if !noClipboard {
if err := clipboard.WriteAll(text); err == nil {
fmt.Fprintln(os.Stderr, " → clipboard updated (via rtc)")
}
}
})
pc.OnICECandidate(func(c *webrtc.ICECandidate) {
if c == nil {
return
}
init := c.ToJSON()
payload, _ := json.Marshal(SignalPayload{Kind: "ice", Candidate: &init})
send(server, myLabel, "", "signal", payload)
sendMessage(rtcServer, myLabel, Message{
Type: "signal",
From: peerID,
To: remoteID,
Signal: payload,
})
})
pc.OnConnectionStateChange(func(s webrtc.PeerConnectionState) {
fmt.Fprintf(os.Stderr, "rtc: state=%s\n", s)
peersMu.Lock()
if p, ok := peers[remoteID]; ok {
p.state = s.String()
}
peersMu.Unlock()
fmt.Fprintf(os.Stderr, "rtc: %s state=%s\n", remoteID[:8], s)
if s == webrtc.PeerConnectionStateFailed || s == webrtc.PeerConnectionStateClosed {
removePeer(remoteID)
}
})
// Create offer + post once immediately
offer, err := pc.CreateOffer(nil)
if err != nil {
log.Printf("rtc: create offer: %v", err)
log.Printf("rtc: create offer for %s: %v", remoteID[:8], err)
return
}
if err := pc.SetLocalDescription(offer); err != nil {
log.Printf("rtc: set local desc: %v", err)
log.Printf("rtc: set local desc for %s: %v", remoteID[:8], err)
return
}
payload, _ := json.Marshal(SignalPayload{Kind: "offer", SDP: &offer})
send(server, myLabel, "", "signal", payload)
fmt.Fprintln(os.Stderr, "rtc: offer posted, will chirp every 5s until paired...")
sendMessage(rtcServer, myLabel, Message{
Type: "signal",
From: peerID,
To: remoteID,
Signal: payload,
})
}
// "Chirp": re-post the offer every 5s until DataChannel opens. Catches
// late-joining browsers without needing server-side history of signals.
go func() {
t := time.NewTicker(5 * time.Second)
defer t.Stop()
for {
select {
case <-rtcConnected:
fmt.Fprintln(os.Stderr, "rtc: paired — chirping stopped")
return
case <-t.C:
p, _ := json.Marshal(SignalPayload{Kind: "offer", SDP: &offer})
send(server, myLabel, "", "signal", p)
// onSignal: routed to the relevant peer connection by `To` field.
func onSignal(m Message) {
// Only handle signals targeted at us (or untargeted = legacy).
if m.To != "" && m.To != peerID {
return
}
var sp SignalPayload
if err := json.Unmarshal(m.Signal, &sp); err != nil {
return
}
p, ok := getPeer(m.From)
if !ok {
// Could be an answer/ice from someone we don't know about yet —
// shouldn't happen in normal flow because they wouldn't answer
// without first seeing our offer. Drop.
return
}
switch sp.Kind {
case "answer":
if sp.SDP != nil {
if err := p.pc.SetRemoteDescription(*sp.SDP); err != nil {
if !strings.Contains(err.Error(), "stable->SetRemote(answer)->stable") {
log.Printf("rtc: set remote desc for %s: %v", m.From[:8], err)
}
} else {
fmt.Fprintf(os.Stderr, "rtc: answer applied for %s\n", m.From[:8])
}
}
}()
// Process incoming signals
for {
select {
case msg := <-incomingSignal:
var sp SignalPayload
if err := json.Unmarshal(msg.Signal, &sp); err != nil {
continue
}
switch sp.Kind {
case "answer":
if sp.SDP != nil {
if err := pc.SetRemoteDescription(*sp.SDP); err != nil {
log.Printf("rtc: set remote desc: %v", err)
} else {
fmt.Fprintln(os.Stderr, "rtc: answer applied")
}
}
case "ice":
if sp.Candidate != nil {
if err := pc.AddICECandidate(*sp.Candidate); err != nil {
log.Printf("rtc: add ice: %v", err)
}
case "ice":
if sp.Candidate != nil {
if err := p.pc.AddICECandidate(*sp.Candidate); err != nil {
if !strings.Contains(err.Error(), "remote description is not set") {
log.Printf("rtc: add ice for %s: %v", m.From[:8], err)
}
}
case <-context.Background().Done():
return
}
}
}
func min(a, b int) int {
if a < b {
return a
}
return b
}