package kvm import ( "fmt" "io" "strings" "sync/atomic" "time" "github.com/pion/webrtc/v4" "go.bug.st/serial" ) /* ---------- SINK (terminal output) ---------- */ type Sink interface { SendText(s string) error } type dataChannelSink struct{ dataChannel *webrtc.DataChannel } func (sink dataChannelSink) SendText(str string) error { return sink.dataChannel.SendText(str) } /* ---------- NORMALIZATION (applies to RX & TX) ---------- */ type NormalizeMode int const ( ModeCaret NormalizeMode = iota // ^C ^M ^? ModeNames // , , , … ModeHex // \x1B ) type CRLFMode int const ( CRLFAsIs CRLFMode = iota CRLF_LF CRLF_CR CRLF_CRLF CRLF_LFCR ) type NormOptions struct { Mode NormalizeMode CRLF CRLFMode TabRender string // e.g. " " or "" to keep '\t' PreserveANSI bool ShowNLTag bool // <- NEW: also print a visible tag for CR/LF } func normalize(in []byte, opt NormOptions) string { var out strings.Builder esc := byte(0x1B) for i := 0; i < len(in); { b := in[i] // ANSI preservation (CSI/OSC) if opt.PreserveANSI && b == esc && i+1 < len(in) { if in[i+1] == '[' { // CSI j := i + 2 for j < len(in) { c := in[j] if c >= 0x40 && c <= 0x7E { j++ break } j++ } out.Write(in[i:j]) i = j continue } else if in[i+1] == ']' { // OSC ... BEL or ST j := i + 2 for j < len(in) { if in[j] == 0x07 { j++ break } // BEL if j+1 < len(in) && in[j] == esc && in[j+1] == '\\' { j += 2 break } // ST j++ } out.Write(in[i:j]) i = j continue } } // CR/LF normalization (emit real newline(s), optionally tag them visibly) if b == '\r' || b == '\n' { // detect pair (CRLF or LFCR) isPair := i+1 < len(in) && ((b == '\r' && in[i+1] == '\n') || (b == '\n' && in[i+1] == '\r')) // optional visible tag of what we *saw* if opt.ShowNLTag { if isPair { if b == '\r' { // saw CRLF out.WriteString("") } else { // saw LFCR out.WriteString("") } } else { if b == '\r' { out.WriteString("") } else { out.WriteString("") } } } // now emit the actual newline(s) per the normalization mode switch opt.CRLF { case CRLFAsIs: if isPair { out.WriteByte(b) out.WriteByte(in[i+1]) i += 2 } else { out.WriteByte(b) i++ } case CRLF_LF: if isPair { i += 2 } else { i++ } out.WriteByte('\n') case CRLF_CR: if isPair { i += 2 } else { i++ } out.WriteByte('\r') case CRLF_CRLF: if isPair { i += 2 } else { i++ } out.WriteString("\r\n") // (fixed to actually write CRLF) case CRLF_LFCR: if isPair { i += 2 } else { i++ } out.WriteString("\n\r") } continue } // Tabs if b == '\t' { if opt.TabRender != "" { out.WriteString(opt.TabRender) } else { out.WriteByte('\t') } i++ continue } // Controls if b < 0x20 || b == 0x7F { switch opt.Mode { case ModeCaret: if b == 0x7F { out.WriteString("^?") } else { out.WriteByte('^') out.WriteByte(byte('@' + b)) } case ModeNames: names := map[byte]string{ 0: "NUL", 1: "SOH", 2: "STX", 3: "ETX", 4: "EOT", 5: "ENQ", 6: "ACK", 7: "BEL", 8: "BS", 9: "TAB", 10: "LF", 11: "VT", 12: "FF", 13: "CR", 14: "SO", 15: "SI", 16: "DLE", 17: "DC1", 18: "DC2", 19: "DC3", 20: "DC4", 21: "NAK", 22: "SYN", 23: "ETB", 24: "CAN", 25: "EM", 26: "SUB", 27: "ESC", 28: "FS", 29: "GS", 30: "RS", 31: "US", 127: "DEL", } if n, ok := names[b]; ok { out.WriteString("<" + n + ">") } else { out.WriteString(fmt.Sprintf("0x%02X", b)) } case ModeHex: out.WriteString(fmt.Sprintf("\\x%02X", b)) } i++ continue } out.WriteByte(b) i++ } return out.String() } /* ---------- CONSOLE BROKER (ordering + normalization + RX/TX) ---------- */ type consoleEventKind int const ( evRX consoleEventKind = iota evTX // local echo after a successful write ) type consoleEvent struct { kind consoleEventKind data []byte } type ConsoleBroker struct { sink Sink in chan consoleEvent done chan struct{} // pause control terminalPaused bool pauseCh chan bool // buffered output while paused bufLines []string bufBytes int maxBufLines int maxBufBytes int // line-aware echo rxAtLineEnd bool txLineActive bool // true if we’re mid-line (prefix already written) pendingTX *consoleEvent quietTimer *time.Timer quietAfter time.Duration // normalization norm NormOptions // labels labelRX string labelTX string } func NewConsoleBroker(s Sink, norm NormOptions) *ConsoleBroker { return &ConsoleBroker{ sink: s, in: make(chan consoleEvent, 256), done: make(chan struct{}), pauseCh: make(chan bool, 8), terminalPaused: false, rxAtLineEnd: true, txLineActive: false, quietAfter: 120 * time.Millisecond, norm: norm, labelRX: "RX", labelTX: "TX", // reasonable defaults; tweak as you like maxBufLines: 5000, maxBufBytes: 1 << 20, // 1 MiB } } func (b *ConsoleBroker) Start() { go b.loop() } func (b *ConsoleBroker) Close() { close(b.done) } func (b *ConsoleBroker) SetSink(s Sink) { b.sink = s } func (b *ConsoleBroker) SetNormOptions(norm NormOptions) { b.norm = norm } func (b *ConsoleBroker) SetTerminalPaused(v bool) { if b == nil { return } // send to broker loop to avoid data races select { case b.pauseCh <- v: default: b.pauseCh <- v } } func (b *ConsoleBroker) Enqueue(ev consoleEvent) { b.in <- ev // blocking is fine; adjust if you want drop semantics } func (b *ConsoleBroker) loop() { scopedLogger := serialLogger.With().Str("service", "Serial Console Broker").Logger() for { select { case <-b.done: return case v := <-b.pauseCh: // apply pause state was := b.terminalPaused b.terminalPaused = v if was && !v { // we just unpaused: flush buffered output in order scopedLogger.Info().Msg("Terminal unpaused; flushing buffered output") b.flushBuffer() } else if !was && v { scopedLogger.Info().Msg("Terminal paused; buffering output") } case ev := <-b.in: switch ev.kind { case evRX: scopedLogger.Info().Msg("Processing RX data from serial port") b.handleRX(ev.data) case evTX: scopedLogger.Info().Msg("Processing TX echo request") b.handleTX(ev.data) } case <-b.quietCh(): if b.pendingTX != nil { b.emitToTerminal(b.lineSep()) // use CRLF policy b.flushPendingTX() b.rxAtLineEnd = true b.txLineActive = false } } } } func (b *ConsoleBroker) quietCh() <-chan time.Time { if b.quietTimer != nil { return b.quietTimer.C } return make(<-chan time.Time) } func (b *ConsoleBroker) startQuietTimer() { if b.quietTimer == nil { b.quietTimer = time.NewTimer(b.quietAfter) } else { b.quietTimer.Reset(b.quietAfter) } } func (b *ConsoleBroker) stopQuietTimer() { if b.quietTimer != nil { if !b.quietTimer.Stop() { select { case <-b.quietTimer.C: default: } } } } func (b *ConsoleBroker) handleRX(data []byte) { scopedLogger := serialLogger.With().Str("service", "Serial Console Broker RX handler").Logger() if b.sink == nil || len(data) == 0 { return } // If we’re mid TX line, end it before RX if b.txLineActive { b.emitToTerminal(b.lineSep()) b.txLineActive = false } text := normalize(data, b.norm) if text == "" { return } scopedLogger.Info().Msg("Emitting RX data to sink (with per-line prefixes)") // Prefix every line, regardless of how the EOLs look lines := splitAfterAnyEOL(text, b.norm.CRLF) // Start from the broker's current RX line state atLineEnd := b.rxAtLineEnd for _, line := range lines { if line == "" { continue } if atLineEnd { // New physical line -> prefix with RX: b.emitToTerminal(fmt.Sprintf("%s: %s", b.labelRX, line)) } else { // Continuation of previous RX line -> no extra RX: prefix b.emitToTerminal(line) } // Update line-end state based on this piece atLineEnd = endsWithEOL(line, b.norm.CRLF) } // Persist state for next RX chunk b.rxAtLineEnd = atLineEnd if b.pendingTX != nil && b.rxAtLineEnd { b.flushPendingTX() b.stopQuietTimer() } } func (b *ConsoleBroker) handleTX(data []byte) { scopedLogger := serialLogger.With().Str("service", "Serial Console Broker TX handler").Logger() if b.sink == nil || len(data) == 0 { return } if b.rxAtLineEnd && b.pendingTX == nil { scopedLogger.Info().Msg("Emitting TX data to sink immediately") b.emitTX(data) return } scopedLogger.Info().Msg("Queuing TX data to emit after RX line completion or quiet period") b.pendingTX = &consoleEvent{kind: evTX, data: append([]byte(nil), data...)} b.startQuietTimer() } func (b *ConsoleBroker) emitTX(data []byte) { scopedLogger := serialLogger.With().Str("service", "Serial Console Broker TX emiter").Logger() if len(data) == 0 { return } text := normalize(data, b.norm) if text == "" { return } // Check if we’re in the middle of a TX line if !b.txLineActive { // Start new TX line with prefix scopedLogger.Info().Msg("Emitting TX data to sink with prefix") b.emitToTerminal(fmt.Sprintf("%s: %s", b.labelTX, text)) b.txLineActive = true } else { // Continue current line (no prefix) scopedLogger.Info().Msg("Emitting TX data to sink without prefix") b.emitToTerminal(text) } // If the data ends with a newline, mark TX line as complete if strings.HasSuffix(text, "\r") || strings.HasSuffix(text, "\n") { b.txLineActive = false } } func (b *ConsoleBroker) flushPendingTX() { if b.pendingTX == nil { return } b.emitTX(b.pendingTX.data) b.pendingTX = nil b.txLineActive = false } func (b *ConsoleBroker) lineSep() string { switch b.norm.CRLF { case CRLF_CRLF: return "\r\n" case CRLF_LFCR: return "\n\r" case CRLF_CR: return "\r" case CRLF_LF: return "\n" default: return "\n" } } // splitAfterAnyEOL splits text into lines keeping the EOL with each piece. // For CRLFAsIs it treats \r, \n, \r\n, and \n\r as EOLs. // For other modes it uses the normalized separator. func splitAfterAnyEOL(text string, mode CRLFMode) []string { if text == "" { return nil } // Fast path for normalized modes switch mode { case CRLF_LF: return strings.SplitAfter(text, "\n") case CRLF_CR: return strings.SplitAfter(text, "\r") case CRLF_CRLF: return strings.SplitAfter(text, "\r\n") case CRLF_LFCR: return strings.SplitAfter(text, "\n\r") } // CRLFAsIs: scan bytes and treat \r, \n, \r\n, \n\r as one boundary b := []byte(text) var parts []string start := 0 for i := 0; i < len(b); i++ { if b[i] == '\r' || b[i] == '\n' { j := i + 1 // coalesce pair if the next is the "other" newline if j < len(b) && ((b[i] == '\r' && b[j] == '\n') || (b[i] == '\n' && b[j] == '\r')) { j++ } parts = append(parts, string(b[start:j])) start = j i = j - 1 // advance past the EOL (or pair) } } if start < len(b) { parts = append(parts, string(b[start:])) } return parts } func endsWithEOL(s string, mode CRLFMode) bool { if s == "" { return false } switch mode { case CRLF_CRLF: return strings.HasSuffix(s, "\r\n") case CRLF_LFCR: return strings.HasSuffix(s, "\n\r") case CRLF_LF: return strings.HasSuffix(s, "\n") case CRLF_CR: return strings.HasSuffix(s, "\r") default: // AsIs: any of \r, \n, \r\n, \n\r return strings.HasSuffix(s, "\r\n") || strings.HasSuffix(s, "\n\r") || strings.HasSuffix(s, "\n") || strings.HasSuffix(s, "\r") } } func (b *ConsoleBroker) emitToTerminal(s string) { if b.sink == nil || s == "" { return } if b.terminalPaused { b.enqueueBuffered(s) return } _ = b.sink.SendText(s) } func (b *ConsoleBroker) enqueueBuffered(s string) { b.bufLines = append(b.bufLines, s) b.bufBytes += len(s) // trim if over limits (drop oldest) for b.bufBytes > b.maxBufBytes || len(b.bufLines) > b.maxBufLines { if len(b.bufLines) == 0 { break } b.bufBytes -= len(b.bufLines[0]) b.bufLines = b.bufLines[1:] } } func (b *ConsoleBroker) flushBuffer() { if b.sink == nil || len(b.bufLines) == 0 { b.bufLines = nil b.bufBytes = 0 return } for _, s := range b.bufLines { _ = b.sink.SendText(s) } b.bufLines = nil b.bufBytes = 0 } /* ---------- SERIAL MUX (single reader/writer, emits to broker) ---------- */ type txFrame struct { payload []byte // should include terminator already source string // "webrtc" | "button" echo bool // request TX echo (subject to global toggle) } type SerialMux struct { port serial.Port txQ chan txFrame done chan struct{} broker *ConsoleBroker echoEnabled atomic.Bool // controlled via SetEchoEnabled } func NewSerialMux(p serial.Port, broker *ConsoleBroker) *SerialMux { m := &SerialMux{ port: p, txQ: make(chan txFrame, 128), done: make(chan struct{}), broker: broker, } return m } func (m *SerialMux) Start() { go m.reader() go m.writer() } func (m *SerialMux) Close() { close(m.done) } func (m *SerialMux) SetEchoEnabled(v bool) { m.echoEnabled.Store(v) } func (m *SerialMux) Enqueue(payload []byte, source string, requestEcho bool) { serialLogger.Info().Str("src", source).Bool("echo", requestEcho).Msg("Enqueuing TX data to serial port") m.txQ <- txFrame{payload: append([]byte(nil), payload...), source: source, echo: requestEcho} } func (m *SerialMux) reader() { scopedLogger := serialLogger.With().Str("service", "SerialMux reader").Logger() buf := make([]byte, 4096) for { select { case <-m.done: return default: n, err := m.port.Read(buf) if err != nil { if err != io.EOF { serialLogger.Warn().Err(err).Msg("serial read failed") } time.Sleep(50 * time.Millisecond) continue } if n > 0 && m.broker != nil { scopedLogger.Info().Msg("Sending RX data to console broker") m.broker.Enqueue(consoleEvent{kind: evRX, data: append([]byte(nil), buf[:n]...)}) } } } } func (m *SerialMux) writer() { scopedLogger := serialLogger.With().Str("service", "SerialMux writer").Logger() for { select { case <-m.done: return case f := <-m.txQ: scopedLogger.Info().Msg("Writing TX data to serial port") if _, err := m.port.Write(f.payload); err != nil { scopedLogger.Warn().Err(err).Str("src", f.source).Msg("serial write failed") continue } // echo (if requested AND globally enabled) if f.echo && m.echoEnabled.Load() && m.broker != nil { scopedLogger.Info().Msg("Sending TX echo to console broker") m.broker.Enqueue(consoleEvent{kind: evTX, data: append([]byte(nil), f.payload...)}) } } } }