The single shared mesh is replaced by per-session rooms. Visit / and the server mints a random 8-hex-char id, redirects to /r/<id>. That URL IS the session — share the link (or scan the QR code now shown on the page) on another device to join the same room. Bus is now sharded per room. Rooms are created implicitly on first subscribe and GC'd 5 minutes after the last subscriber leaves. No accounts, no persistence, no server-side state beyond the in-memory bus map. Server: - New endpoints: /, /r/<id>, /api/send?room=, /api/stream?room= - Room manager with lazy creation + idle GC - Metrics now labelled by room - New gauge tether_active_rooms Client (Go): - -room flag (accepts bare id OR full /r/<id> URL — paste-friendly) - All API calls now scope to the room - The always-on ct210-rtc-peer systemd unit is disabled — sessions are user-initiated; the user runs tether-client with -room when they want their laptop in a particular session Browser (HTML): - Reads room from /r/<id> path - Shows QR code + URL + "copy link" button at top - "+ new session" link in header to start a fresh room Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
447 lines
11 KiB
Go
447 lines
11 KiB
Go
// tether-client v0.4: SSE listener with optional WebRTC mesh peer.
//
// Default flow (SSE only): subscribe to /api/stream and write incoming
// clipboard messages to the OS clipboard. Works on Windows, Linux and macOS.
//
// With -rtc: become a WebRTC peer that maintains one RTCPeerConnection
// per active browser (mesh, not star). Symmetric "presence" chirps on
// the signaling bus let participants discover each other and
// auto-upgrade from SSE to a direct DataChannel. After pairing, clipboard
// text flows directly peer-to-peer with DTLS encryption.
package main
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"crypto/rand"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"flag"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"runtime"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/atotto/clipboard"
|
|
"github.com/pion/webrtc/v4"
|
|
)
|
|
|
|
// peerID identifies this client instance on the bus: the fixed prefix
// "tether-client-" followed by 12 random hex characters. It is generated
// once, when the package is initialised.
var peerID = "tether-client-" + randomID(6)

// randomID reads n bytes from crypto/rand and returns them as 2*n lowercase
// hexadecimal characters.
func randomID(n int) string {
	raw := make([]byte, n)
	// The result of rand.Read is deliberately discarded; whatever is in the
	// buffer afterwards is what gets encoded.
	_, _ = rand.Read(raw)

	encoded := make([]byte, hex.EncodedLen(len(raw)))
	hex.Encode(encoded, raw)
	return string(encoded)
}
|
|
|
|
// defaultLabel builds the fallback client label "<os>-sse-<role>", where
// <os> is runtime.GOOS and role is supplied by the caller.
func defaultLabel(role string) string {
	return fmt.Sprintf("%s-sse-%s", runtime.GOOS, role)
}
|
|
|
|
// Message is the JSON envelope exchanged with the server; its field names
// and tags must stay in step with the server's definition.
type Message struct {
	// Type selects the handler: "clipboard" (or empty), "presence" or "signal".
	Type string `json:"type,omitempty"`
	// Text is the clipboard payload for clipboard messages.
	Text string `json:"text,omitempty"`
	// Signal holds the still-encoded SignalPayload of a "signal" message.
	Signal json.RawMessage `json:"signal,omitempty"`
	// From is the sender's peer id; it is used to drop our own echoes.
	From string `json:"from,omitempty"`
	// To is the peer id a signal is addressed to; empty means untargeted.
	To string `json:"to,omitempty"`
	// Role is sent with presence messages (this client sends "peer").
	Role string `json:"role,omitempty"`
	// Source is the human-readable label of the sender.
	Source string `json:"source,omitempty"`
	// Room is the id of the room the message belongs to.
	Room string `json:"room,omitempty"`
	// TS is read as milliseconds since the Unix epoch when a message is
	// displayed. This client never sets it — presumably the server does.
	TS int64 `json:"ts"`
}
|
|
|
|
// Signal payload carried in Message.Signal.
|
|
type SignalPayload struct {
|
|
Kind string `json:"kind"` // "offer" | "answer" | "ice"
|
|
SDP *webrtc.SessionDescription `json:"sdp,omitempty"`
|
|
Candidate *webrtc.ICECandidateInit `json:"candidate,omitempty"`
|
|
}
|
|
|
|
// Process-wide settings. main fills them in from the command line before
// any message is sent or received.
var (
	// noClipboard disables writing incoming text to the OS clipboard;
	// useRTC enables the WebRTC mesh peer.
	noClipboard, useRTC bool

	// myLabel is the label sent with our requests, rtcServer the server base
	// URL used for WebRTC signaling, and myRoom the id of the joined room.
	myLabel, rtcServer, myRoom string
)
|
|
|
|
func main() {
|
|
server := flag.String("server", "https://tether.pecord.io", "tether-server base URL")
|
|
label := flag.String("label", "", "X-Tether-Client label (default: <os>-sse-<role>)")
|
|
sendText := flag.String("send", "", "send this text and exit (otherwise listen)")
|
|
roomFlag := flag.String("room", "", "room id to join (or full URL like https://tether.pecord.io/r/<id>)")
|
|
flag.BoolVar(&noClipboard, "no-clipboard", false, "don't write incoming messages to the OS clipboard")
|
|
flag.BoolVar(&useRTC, "rtc", false, "act as a WebRTC mesh peer")
|
|
flag.Parse()
|
|
|
|
// Accept either bare id or full URL
|
|
myRoom = *roomFlag
|
|
if strings.Contains(myRoom, "/r/") {
|
|
parts := strings.SplitN(myRoom, "/r/", 2)
|
|
if len(parts) == 2 {
|
|
myRoom = strings.SplitN(parts[1], "/", 2)[0]
|
|
}
|
|
}
|
|
if myRoom == "" {
|
|
fmt.Fprintln(os.Stderr, "tether-client: -room is required (e.g. -room abc12345 or paste the /r/<id> URL)")
|
|
os.Exit(2)
|
|
}
|
|
|
|
if *label == "" {
|
|
if *sendText != "" {
|
|
*label = defaultLabel("sender")
|
|
} else if useRTC {
|
|
*label = fmt.Sprintf("%s-rtc-listener", runtime.GOOS)
|
|
} else {
|
|
*label = defaultLabel("listener")
|
|
}
|
|
}
|
|
myLabel = *label
|
|
rtcServer = *server
|
|
|
|
if *sendText != "" {
|
|
send(*server, *label, *sendText, "clipboard", nil)
|
|
return
|
|
}
|
|
|
|
if !noClipboard && clipboard.Unsupported {
|
|
fmt.Fprintln(os.Stderr, "tether-client: OS clipboard unsupported on this platform; falling back to stdout-only")
|
|
noClipboard = true
|
|
}
|
|
|
|
if useRTC {
|
|
go presenceChirpLoop(*server)
|
|
}
|
|
|
|
for {
|
|
if err := listen(*server, *label); err != nil {
|
|
log.Printf("stream error: %v — reconnecting in 3s", err)
|
|
time.Sleep(3 * time.Second)
|
|
}
|
|
}
|
|
}
|
|
|
|
func send(server, label, text, msgType string, signal json.RawMessage) {
|
|
m := Message{Type: msgType, Text: text, Source: label, From: peerID, Signal: signal}
|
|
postMessage(server, label, m)
|
|
}
|
|
|
|
func sendMessage(server, label string, m Message) {
|
|
if m.From == "" {
|
|
m.From = peerID
|
|
}
|
|
if m.Source == "" {
|
|
m.Source = label
|
|
}
|
|
postMessage(server, label, m)
|
|
}
|
|
|
|
func postMessage(server, label string, m Message) {
|
|
if m.Room == "" {
|
|
m.Room = myRoom
|
|
}
|
|
body, _ := json.Marshal(m)
|
|
req, _ := http.NewRequest("POST", server+"/api/send?room="+myRoom, bytes.NewReader(body))
|
|
req.Header.Set("Content-Type", "application/json")
|
|
req.Header.Set("X-Tether-Source", label)
|
|
r, err := http.DefaultClient.Do(req)
|
|
if err != nil {
|
|
log.Printf("send: %v", err)
|
|
return
|
|
}
|
|
defer r.Body.Close()
|
|
if r.StatusCode >= 300 {
|
|
log.Printf("send: HTTP %d", r.StatusCode)
|
|
return
|
|
}
|
|
if m.Type == "clipboard" && m.Text != "" {
|
|
fmt.Println("sent.")
|
|
}
|
|
}
|
|
|
|
// listen subscribes to the SSE stream and dispatches messages to handlers.
|
|
func listen(server, label string) error {
|
|
req, _ := http.NewRequest("GET", server+"/api/stream?room="+myRoom, nil)
|
|
req.Header.Set("X-Tether-Client", label)
|
|
r, err := http.DefaultClient.Do(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer r.Body.Close()
|
|
if r.StatusCode != 200 {
|
|
return fmt.Errorf("HTTP %d", r.StatusCode)
|
|
}
|
|
fmt.Fprintf(os.Stderr, "tether-client: connected to %s as %q (rtc=%v, peerID=%s)\n",
|
|
server, label, useRTC, peerID)
|
|
sc := bufio.NewScanner(r.Body)
|
|
sc.Buffer(make([]byte, 1<<20), 1<<20)
|
|
var ev, data string
|
|
for sc.Scan() {
|
|
line := sc.Text()
|
|
switch {
|
|
case strings.HasPrefix(line, "event: "):
|
|
ev = strings.TrimPrefix(line, "event: ")
|
|
case strings.HasPrefix(line, "data: "):
|
|
data = strings.TrimPrefix(line, "data: ")
|
|
case line == "":
|
|
if data != "" {
|
|
var m Message
|
|
if err := json.Unmarshal([]byte(data), &m); err == nil {
|
|
handleMessage(ev, m)
|
|
}
|
|
}
|
|
ev, data = "", ""
|
|
}
|
|
}
|
|
if err := sc.Err(); err != nil && err != io.EOF {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func handleMessage(ev string, m Message) {
|
|
// Ignore our own messages (echo).
|
|
if m.From == peerID {
|
|
return
|
|
}
|
|
switch m.Type {
|
|
case "", "clipboard":
|
|
ts := time.UnixMilli(m.TS).Format("15:04:05")
|
|
fmt.Printf("\n────── %s from %s ──────\n%s\n", ts, m.Source, m.Text)
|
|
if !noClipboard {
|
|
if err := clipboard.WriteAll(m.Text); err != nil {
|
|
fmt.Fprintf(os.Stderr, " ! clipboard write error: %v\n", err)
|
|
} else {
|
|
fmt.Fprintln(os.Stderr, " → clipboard updated")
|
|
}
|
|
}
|
|
case "presence":
|
|
if useRTC {
|
|
onPresence(m)
|
|
}
|
|
case "signal":
|
|
if useRTC {
|
|
onSignal(m)
|
|
}
|
|
}
|
|
}
|
|
|
|
// ── WebRTC mesh ─────────────────────────────────────────────────────────────
|
|
|
|
type remotePeer struct {
|
|
pc *webrtc.PeerConnection
|
|
dc *webrtc.DataChannel
|
|
lastSeen time.Time
|
|
state string // last connection state
|
|
}
|
|
|
|
var (
|
|
peers = make(map[string]*remotePeer) // keyed by remote peerID
|
|
peersMu sync.Mutex
|
|
)
|
|
|
|
func getPeer(id string) (*remotePeer, bool) {
|
|
peersMu.Lock()
|
|
defer peersMu.Unlock()
|
|
p, ok := peers[id]
|
|
return p, ok
|
|
}
|
|
|
|
func setPeer(id string, p *remotePeer) {
|
|
peersMu.Lock()
|
|
defer peersMu.Unlock()
|
|
peers[id] = p
|
|
}
|
|
|
|
func removePeer(id string) {
|
|
peersMu.Lock()
|
|
defer peersMu.Unlock()
|
|
if p, ok := peers[id]; ok {
|
|
if p.pc != nil {
|
|
_ = p.pc.Close()
|
|
}
|
|
delete(peers, id)
|
|
}
|
|
}
|
|
|
|
// presenceChirpLoop broadcasts our own presence every 10s and sweeps stale peers.
|
|
func presenceChirpLoop(server string) {
|
|
announce := func() {
|
|
sendMessage(server, myLabel, Message{
|
|
Type: "presence",
|
|
Role: "peer",
|
|
From: peerID,
|
|
})
|
|
}
|
|
announce()
|
|
t := time.NewTicker(10 * time.Second)
|
|
sweep := time.NewTicker(15 * time.Second)
|
|
defer t.Stop()
|
|
defer sweep.Stop()
|
|
for {
|
|
select {
|
|
case <-t.C:
|
|
announce()
|
|
case <-sweep.C:
|
|
peersMu.Lock()
|
|
for id, p := range peers {
|
|
if time.Since(p.lastSeen) > 45*time.Second {
|
|
fmt.Fprintf(os.Stderr, "rtc: peer %s timed out — tearing down\n", id[:8])
|
|
if p.pc != nil {
|
|
_ = p.pc.Close()
|
|
}
|
|
delete(peers, id)
|
|
}
|
|
}
|
|
peersMu.Unlock()
|
|
}
|
|
}
|
|
}
|
|
|
|
// onPresence: a remote participant announced themselves. If we don't yet
|
|
// have a peer connection to them, create one + send a targeted offer.
|
|
func onPresence(m Message) {
|
|
if m.From == "" || m.From == peerID {
|
|
return
|
|
}
|
|
p, exists := getPeer(m.From)
|
|
if exists {
|
|
// just refresh lastSeen
|
|
peersMu.Lock()
|
|
p.lastSeen = time.Now()
|
|
peersMu.Unlock()
|
|
return
|
|
}
|
|
// New peer — initiate
|
|
fmt.Fprintf(os.Stderr, "rtc: discovered new peer %s (role=%s) — sending offer\n",
|
|
m.From[:min(len(m.From), 8)], m.Role)
|
|
initiateOffer(m.From)
|
|
}
|
|
|
|
func initiateOffer(remoteID string) {
|
|
api := webrtc.NewAPI()
|
|
pc, err := api.NewPeerConnection(webrtc.Configuration{
|
|
ICEServers: []webrtc.ICEServer{{URLs: []string{"stun:stun.l.google.com:19302"}}},
|
|
})
|
|
if err != nil {
|
|
log.Printf("rtc: new pc for %s: %v", remoteID[:8], err)
|
|
return
|
|
}
|
|
dc, err := pc.CreateDataChannel("tether", nil)
|
|
if err != nil {
|
|
log.Printf("rtc: dc for %s: %v", remoteID[:8], err)
|
|
_ = pc.Close()
|
|
return
|
|
}
|
|
|
|
rp := &remotePeer{pc: pc, dc: dc, lastSeen: time.Now()}
|
|
setPeer(remoteID, rp)
|
|
|
|
dc.OnOpen(func() {
|
|
fmt.Fprintf(os.Stderr, "rtc: DataChannel OPEN with %s — P2P live\n", remoteID[:8])
|
|
})
|
|
dc.OnMessage(func(msg webrtc.DataChannelMessage) {
|
|
text := string(msg.Data)
|
|
fmt.Printf("\n[rtc:%s] %s\n", remoteID[:8], text)
|
|
if !noClipboard {
|
|
if err := clipboard.WriteAll(text); err == nil {
|
|
fmt.Fprintln(os.Stderr, " → clipboard updated (via rtc)")
|
|
}
|
|
}
|
|
})
|
|
pc.OnICECandidate(func(c *webrtc.ICECandidate) {
|
|
if c == nil {
|
|
return
|
|
}
|
|
init := c.ToJSON()
|
|
payload, _ := json.Marshal(SignalPayload{Kind: "ice", Candidate: &init})
|
|
sendMessage(rtcServer, myLabel, Message{
|
|
Type: "signal",
|
|
From: peerID,
|
|
To: remoteID,
|
|
Signal: payload,
|
|
})
|
|
})
|
|
pc.OnConnectionStateChange(func(s webrtc.PeerConnectionState) {
|
|
peersMu.Lock()
|
|
if p, ok := peers[remoteID]; ok {
|
|
p.state = s.String()
|
|
}
|
|
peersMu.Unlock()
|
|
fmt.Fprintf(os.Stderr, "rtc: %s state=%s\n", remoteID[:8], s)
|
|
if s == webrtc.PeerConnectionStateFailed || s == webrtc.PeerConnectionStateClosed {
|
|
removePeer(remoteID)
|
|
}
|
|
})
|
|
|
|
offer, err := pc.CreateOffer(nil)
|
|
if err != nil {
|
|
log.Printf("rtc: create offer for %s: %v", remoteID[:8], err)
|
|
return
|
|
}
|
|
if err := pc.SetLocalDescription(offer); err != nil {
|
|
log.Printf("rtc: set local desc for %s: %v", remoteID[:8], err)
|
|
return
|
|
}
|
|
payload, _ := json.Marshal(SignalPayload{Kind: "offer", SDP: &offer})
|
|
sendMessage(rtcServer, myLabel, Message{
|
|
Type: "signal",
|
|
From: peerID,
|
|
To: remoteID,
|
|
Signal: payload,
|
|
})
|
|
}
|
|
|
|
// onSignal: routed to the relevant peer connection by `To` field.
|
|
func onSignal(m Message) {
|
|
// Only handle signals targeted at us (or untargeted = legacy).
|
|
if m.To != "" && m.To != peerID {
|
|
return
|
|
}
|
|
var sp SignalPayload
|
|
if err := json.Unmarshal(m.Signal, &sp); err != nil {
|
|
return
|
|
}
|
|
p, ok := getPeer(m.From)
|
|
if !ok {
|
|
// Could be an answer/ice from someone we don't know about yet —
|
|
// shouldn't happen in normal flow because they wouldn't answer
|
|
// without first seeing our offer. Drop.
|
|
return
|
|
}
|
|
switch sp.Kind {
|
|
case "answer":
|
|
if sp.SDP != nil {
|
|
if err := p.pc.SetRemoteDescription(*sp.SDP); err != nil {
|
|
if !strings.Contains(err.Error(), "stable->SetRemote(answer)->stable") {
|
|
log.Printf("rtc: set remote desc for %s: %v", m.From[:8], err)
|
|
}
|
|
} else {
|
|
fmt.Fprintf(os.Stderr, "rtc: answer applied for %s\n", m.From[:8])
|
|
}
|
|
}
|
|
case "ice":
|
|
if sp.Candidate != nil {
|
|
if err := p.pc.AddICECandidate(*sp.Candidate); err != nil {
|
|
if !strings.Contains(err.Error(), "remote description is not set") {
|
|
log.Printf("rtc: add ice for %s: %v", m.From[:8], err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// min returns the smaller of a and b.
func min(a, b int) int {
	smaller := b
	if a < b {
		smaller = a
	}
	return smaller
}
|