Files
tether/client/main.go
Claude Opus 4.7 7e63ffd357 client: chirp offer every 5s while unpaired
Solves the late-subscriber problem — browsers that load the page after
the peer's startup offer would never see one. Now the peer re-broadcasts
the offer every 5 seconds until the DataChannel opens, then stops.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-21 01:03:06 -05:00

294 lines
7.8 KiB
Go

// tether-client v0.3: SSE listener with optional WebRTC peer.
//
// Default flow (SSE only): subscribe to /api/stream, write incoming
// clipboard messages to the OS clipboard. Works on Win/Linux/macOS.
//
// With -rtc: also act as a WebRTC peer (Pion). Sends an SDP offer via
// the signaling bus, accepts the browser's answer, then receives
// clipboard payloads over a DataChannel — true P2P after ICE.
package main
import (
"bufio"
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"io"
"log"
"net/http"
"os"
"runtime"
"strings"
"sync"
"time"
"github.com/atotto/clipboard"
"github.com/pion/webrtc/v4"
)
// peerID is the value send stamps into Message.From on every outgoing
// message. handleMessage skips signal messages whose From equals peerID.
const peerID = "tether-client"
// defaultLabel builds the fallback client label "<GOOS>-sse-<role>", where
// <GOOS> is the operating system this binary was compiled for.
func defaultLabel(role string) string {
	const layout = "%s-sse-%s"
	return fmt.Sprintf(layout, runtime.GOOS, role)
}
// Message is the JSON envelope exchanged with the server over /api/send and
// /api/stream (must match server).
type Message struct {
// Type selects the handler: handleMessage treats "" and "clipboard" as
// clipboard text and "signal" as WebRTC signaling.
Type string `json:"type,omitempty"`
// Text is the clipboard payload for clipboard messages.
Text string `json:"text,omitempty"`
// Signal is the raw JSON of a SignalPayload; it is decoded by runRTCPeer.
Signal json.RawMessage `json:"signal,omitempty"`
// From identifies the sending peer; send sets it to peerID.
From string `json:"from,omitempty"`
// Source is the sender's client label; handleMessage compares it with
// myLabel to avoid re-copying this client's own clipboard messages.
Source string `json:"source,omitempty"`
// TS is formatted with time.UnixMilli by handleMessage, i.e. it is read as
// Unix milliseconds. send never sets it — presumably the server stamps it;
// TODO confirm.
TS int64 `json:"ts"`
}
// SignalPayload is the WebRTC signaling payload carried, JSON-encoded, in
// Message.Signal. Kind says which of the optional fields is meaningful.
type SignalPayload struct {
Kind string `json:"kind"` // "offer" | "answer" | "ice"
// SDP carries the session description for "offer" and "answer" payloads.
SDP *webrtc.SessionDescription `json:"sdp,omitempty"`
// Candidate carries one ICE candidate for "ice" payloads.
Candidate *webrtc.ICECandidateInit `json:"candidate,omitempty"`
}
// Process-wide settings. They are written by main during startup and read by
// the message handlers afterwards.
var (
// noClipboard (-no-clipboard) disables writes to the OS clipboard; main also
// forces it to true when the clipboard package reports the platform as
// unsupported.
noClipboard bool
// myLabel is the effective client label (the -label flag or a generated
// default).
myLabel string
// useRTC (-rtc) enables the WebRTC peer alongside the SSE listener.
useRTC bool
)
// main parses the command-line flags and then runs in one of two modes:
//
//   - -send TEXT: post TEXT once as a clipboard message and exit.
//   - otherwise: subscribe to the server's SSE stream forever, reconnecting
//     after every disconnect, and (with -rtc) also run the WebRTC peer in a
//     background goroutine.
func main() {
	// Every reconnect waits this long, whether the stream failed or was closed
	// cleanly, so a server that keeps ending the stream is never hit in a
	// tight loop.
	const reconnectDelay = 3 * time.Second

	server := flag.String("server", "https://tether.pecord.io", "tether-server base URL")
	label := flag.String("label", "", "X-Tether-Client label (default: <os>-sse-<role>)")
	sendText := flag.String("send", "", "send this text and exit (otherwise listen)")
	flag.BoolVar(&noClipboard, "no-clipboard", false, "don't write incoming messages to the OS clipboard")
	flag.BoolVar(&useRTC, "rtc", false, "enable WebRTC peer (uses signaling bus to negotiate)")
	flag.Parse()

	// Endpoint paths are appended as "/api/...", so strip trailing slashes to
	// avoid producing "//api/..." URLs.
	base := strings.TrimRight(*server, "/")

	// Derive a label from the mode when none was given.
	if *label == "" {
		switch {
		case *sendText != "":
			*label = defaultLabel("sender")
		case useRTC:
			*label = fmt.Sprintf("%s-rtc-listener", runtime.GOOS)
		default:
			*label = defaultLabel("listener")
		}
	}
	myLabel = *label

	// One-shot send mode.
	if *sendText != "" {
		send(base, *label, *sendText, "clipboard", nil)
		return
	}

	if !noClipboard && clipboard.Unsupported {
		fmt.Fprintln(os.Stderr, "tether-client: OS clipboard unsupported on this platform; falling back to stdout-only")
		noClipboard = true
	}

	if useRTC {
		go runRTCPeer(base)
	}

	for {
		if err := listen(base, *label); err != nil {
			log.Printf("stream error: %v — reconnecting in 3s", err)
		} else {
			// listen returns nil when the server ends the stream cleanly.
			log.Printf("stream closed by server — reconnecting in 3s")
		}
		time.Sleep(reconnectDelay)
	}
}
// send POSTs a single Message to <server>/api/send.
//
// label is used both as Message.Source and as the X-Tether-Source header;
// msgType becomes Message.Type; signal, when non-nil, must be valid JSON and
// is embedded as Message.Signal. Failures are logged, not returned. On
// success "sent." is printed for every message type except "signal".
func send(server, label, text, msgType string, signal json.RawMessage) {
	// send is called from the WebRTC ICE callback and from the chirp loop, so
	// a hung request must not block its caller indefinitely.
	const timeout = 10 * time.Second

	m := Message{Type: msgType, Text: text, Source: label, From: peerID, Signal: signal}
	body, err := json.Marshal(m)
	if err != nil {
		// json.RawMessage is validated on encode, so a bad signal lands here.
		log.Printf("send: encode message: %v", err)
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, server+"/api/send", bytes.NewReader(body))
	if err != nil {
		// A malformed server URL is reported instead of panicking on a nil
		// request below.
		log.Printf("send: build request: %v", err)
		return
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Tether-Source", label)

	r, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Printf("send: %v", err)
		return
	}
	defer r.Body.Close()
	// Drain the body so the transport can reuse the connection for the next
	// send. The result is deliberately ignored: the status code is all that
	// matters here.
	_, _ = io.Copy(io.Discard, r.Body)

	if r.StatusCode >= 300 {
		log.Printf("send: HTTP %d", r.StatusCode)
		return
	}
	if msgType != "signal" {
		fmt.Println("sent.")
	}
}
// listen subscribes to the SSE stream at <server>/api/stream, identifying
// itself with the X-Tether-Client header, and passes every decoded message to
// handleMessage (clipboard messages → OS clipboard, signal messages → WebRTC
// peer if enabled).
//
// It blocks until the stream ends. It returns nil when the server closes the
// stream cleanly, and an error if the request cannot be built or sent, the
// status is not 200, or reading the stream fails.
func listen(server, label string) error {
	req, err := http.NewRequest(http.MethodGet, server+"/api/stream", nil)
	if err != nil {
		// A malformed server URL is reported instead of panicking on a nil
		// request below.
		return fmt.Errorf("building stream request: %w", err)
	}
	req.Header.Set("X-Tether-Client", label)

	r, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer r.Body.Close()
	if r.StatusCode != http.StatusOK {
		return fmt.Errorf("HTTP %d", r.StatusCode)
	}
	fmt.Fprintf(os.Stderr, "tether-client: connected to %s as %q (rtc=%v)\n", server, label, useRTC)

	return dispatchSSE(r.Body)
}

// dispatchSSE reads a text/event-stream from body and calls handleMessage for
// every event whose data decodes as a Message.
//
// Field lines are parsed as "name:value" with one optional space after the
// colon, and consecutive data lines of one event are joined with "\n". Lines
// starting with ":" (comments) and unknown fields are ignored. A single line
// may be at most 1 MiB.
func dispatchSSE(body io.Reader) error {
	sc := bufio.NewScanner(body)
	sc.Buffer(make([]byte, 1<<20), 1<<20)

	var (
		ev   string   // value of the event's last "event" field
		data []string // "data" field values of the event being assembled
	)
	for sc.Scan() {
		line := sc.Text()

		// A blank line terminates the current event.
		if line == "" {
			if payload := strings.Join(data, "\n"); payload != "" {
				var m Message
				if err := json.Unmarshal([]byte(payload), &m); err != nil {
					log.Printf("stream: skipping malformed message: %v", err)
				} else {
					handleMessage(ev, m)
				}
			}
			ev, data = "", data[:0]
			continue
		}

		field, value, _ := strings.Cut(line, ":")
		value = strings.TrimPrefix(value, " ")
		switch field {
		case "event":
			ev = value
		case "data":
			data = append(data, value)
		}
	}
	// Scanner.Err is nil at a clean end of stream; io.EOF is never reported.
	return sc.Err()
}
// handleMessage processes one message received from the SSE stream.
//
// Clipboard messages (Type "" or "clipboard") are printed to stdout and,
// unless clipboard writes are disabled or the message came from this client's
// own label, copied to the OS clipboard. Signal messages from other peers are
// queued for the WebRTC peer when -rtc is enabled. Other types are ignored.
//
// ev is the SSE event name; it is currently unused.
func handleMessage(ev string, m Message) {
	switch m.Type {
	case "", "clipboard":
		ts := time.UnixMilli(m.TS).Format("15:04:05")
		fmt.Printf("\n────── %s from %s ──────\n%s\n", ts, m.Source, m.Text)
		if noClipboard || m.Source == myLabel {
			return
		}
		if err := clipboard.WriteAll(m.Text); err != nil {
			fmt.Fprintf(os.Stderr, " ! clipboard write error: %v\n", err)
			return
		}
		fmt.Fprintln(os.Stderr, " → clipboard updated")

	case "signal":
		if !useRTC || m.From == peerID {
			return
		}
		// Never block here: this runs on the SSE read loop. If the WebRTC peer
		// has stopped consuming (runRTCPeer returns on setup errors) or the
		// queue is full, a blocking send would stall clipboard delivery too.
		select {
		case incomingSignal <- m:
		default:
			log.Printf("rtc: signal queue full — dropping signal from %q", m.From)
		}
	}
}
// ── WebRTC ────────────────────────────────────────────────────────────────
var (
// incomingSignal carries signal messages from handleMessage (the SSE read
// loop) to runRTCPeer. It is buffered with room for 16 messages.
incomingSignal = make(chan Message, 16)
// rtcConnected is closed by runRTCPeer when the DataChannel opens; the
// offer re-posting ("chirp") goroutine stops once it is closed.
rtcConnected = make(chan struct{})
)
func runRTCPeer(server string) {
api := webrtc.NewAPI()
config := webrtc.Configuration{
ICEServers: []webrtc.ICEServer{
{URLs: []string{"stun:stun.l.google.com:19302"}},
},
}
pc, err := api.NewPeerConnection(config)
if err != nil {
log.Printf("rtc: new peer connection: %v", err)
return
}
dc, err := pc.CreateDataChannel("tether", nil)
if err != nil {
log.Printf("rtc: create datachannel: %v", err)
return
}
var once sync.Once
dc.OnOpen(func() {
fmt.Fprintln(os.Stderr, "rtc: DataChannel OPEN — P2P live")
once.Do(func() { close(rtcConnected) })
})
dc.OnMessage(func(msg webrtc.DataChannelMessage) {
text := string(msg.Data)
fmt.Printf("\n[rtc] %s\n", text)
if !noClipboard {
if err := clipboard.WriteAll(text); err == nil {
fmt.Fprintln(os.Stderr, " → clipboard updated (via rtc)")
}
}
})
pc.OnICECandidate(func(c *webrtc.ICECandidate) {
if c == nil {
return
}
init := c.ToJSON()
payload, _ := json.Marshal(SignalPayload{Kind: "ice", Candidate: &init})
send(server, myLabel, "", "signal", payload)
})
pc.OnConnectionStateChange(func(s webrtc.PeerConnectionState) {
fmt.Fprintf(os.Stderr, "rtc: state=%s\n", s)
})
// Create offer + post once immediately
offer, err := pc.CreateOffer(nil)
if err != nil {
log.Printf("rtc: create offer: %v", err)
return
}
if err := pc.SetLocalDescription(offer); err != nil {
log.Printf("rtc: set local desc: %v", err)
return
}
payload, _ := json.Marshal(SignalPayload{Kind: "offer", SDP: &offer})
send(server, myLabel, "", "signal", payload)
fmt.Fprintln(os.Stderr, "rtc: offer posted, will chirp every 5s until paired...")
// "Chirp": re-post the offer every 5s until DataChannel opens. Catches
// late-joining browsers without needing server-side history of signals.
go func() {
t := time.NewTicker(5 * time.Second)
defer t.Stop()
for {
select {
case <-rtcConnected:
fmt.Fprintln(os.Stderr, "rtc: paired — chirping stopped")
return
case <-t.C:
p, _ := json.Marshal(SignalPayload{Kind: "offer", SDP: &offer})
send(server, myLabel, "", "signal", p)
}
}
}()
// Process incoming signals
for {
select {
case msg := <-incomingSignal:
var sp SignalPayload
if err := json.Unmarshal(msg.Signal, &sp); err != nil {
continue
}
switch sp.Kind {
case "answer":
if sp.SDP != nil {
if err := pc.SetRemoteDescription(*sp.SDP); err != nil {
log.Printf("rtc: set remote desc: %v", err)
} else {
fmt.Fprintln(os.Stderr, "rtc: answer applied")
}
}
case "ice":
if sp.Candidate != nil {
if err := pc.AddICECandidate(*sp.Candidate); err != nil {
log.Printf("rtc: add ice: %v", err)
}
}
}
case <-context.Background().Done():
return
}
}
}