package audio import ( "context" "fmt" "sync/atomic" "time" "github.com/jetkvm/kvm/internal/logging" "github.com/pion/webrtc/v4" "github.com/pion/webrtc/v4/pkg/media" "github.com/rs/zerolog" ) type OutputRelay struct { source *AudioSource audioTrack *webrtc.TrackLocalStaticSample ctx context.Context cancel context.CancelFunc logger zerolog.Logger running atomic.Bool sample media.Sample stopped chan struct{} framesRelayed atomic.Uint32 framesDropped atomic.Uint32 } func NewOutputRelay(source *AudioSource, audioTrack *webrtc.TrackLocalStaticSample) *OutputRelay { ctx, cancel := context.WithCancel(context.Background()) logger := logging.GetDefaultLogger().With().Str("component", "audio-output-relay").Logger() return &OutputRelay{ source: source, audioTrack: audioTrack, ctx: ctx, cancel: cancel, logger: logger, stopped: make(chan struct{}), sample: media.Sample{ Duration: 20 * time.Millisecond, }, } } func (r *OutputRelay) Start() error { if r.running.Swap(true) { return fmt.Errorf("output relay already running") } go r.relayLoop() r.logger.Debug().Msg("output relay started") return nil } func (r *OutputRelay) Stop() { if !r.running.Swap(false) { return } r.cancel() <-r.stopped r.logger.Debug(). Uint32("frames_relayed", r.framesRelayed.Load()). Uint32("frames_dropped", r.framesDropped.Load()). Msg("output relay stopped") } func (r *OutputRelay) relayLoop() { defer close(r.stopped) const maxRetries = 10 const maxConsecutiveWriteFailures = 50 // Allow some WebRTC write failures before reconnecting retryDelay := 1 * time.Second consecutiveFailures := 0 consecutiveWriteFailures := 0 for r.running.Load() { // Connect if not connected if !(*r.source).IsConnected() { if err := (*r.source).Connect(); err != nil { if consecutiveFailures++; consecutiveFailures >= maxRetries { r.logger.Error().Int("failures", consecutiveFailures).Msg("Max retries exceeded, stopping relay") return } r.logger.Debug().Err(err).Int("failures", consecutiveFailures).Msg("Connection failed, retrying") time.Sleep(retryDelay) retryDelay = min(retryDelay*2, 30*time.Second) continue } consecutiveFailures = 0 retryDelay = 1 * time.Second } // Read message from source msgType, payload, err := (*r.source).ReadMessage() if err != nil { if !r.running.Load() { break } if consecutiveFailures++; consecutiveFailures >= maxRetries { r.logger.Error().Int("failures", consecutiveFailures).Msg("Max read retries exceeded, stopping relay") return } r.logger.Warn().Err(err).Int("failures", consecutiveFailures).Msg("Read error, reconnecting") (*r.source).Disconnect() time.Sleep(retryDelay) retryDelay = min(retryDelay*2, 30*time.Second) continue } // Reset retry state on successful read consecutiveFailures = 0 retryDelay = 1 * time.Second // Write audio sample to WebRTC if msgType == ipcMsgTypeOpus && len(payload) > 0 { r.sample.Data = payload if err := r.audioTrack.WriteSample(r.sample); err != nil { r.framesDropped.Add(1) consecutiveWriteFailures++ // Log warning on first failure and every 10th failure if consecutiveWriteFailures == 1 || consecutiveWriteFailures%10 == 0 { r.logger.Warn(). Err(err). Int("consecutive_failures", consecutiveWriteFailures). Msg("Failed to write sample to WebRTC") } // If too many consecutive write failures, reconnect source if consecutiveWriteFailures >= maxConsecutiveWriteFailures { r.logger.Error(). Int("failures", consecutiveWriteFailures). Msg("Too many consecutive WebRTC write failures, reconnecting source") (*r.source).Disconnect() consecutiveWriteFailures = 0 consecutiveFailures = 0 } } else { r.framesRelayed.Add(1) consecutiveWriteFailures = 0 // Reset on successful write } } } } type InputRelay struct { source *AudioSource ctx context.Context cancel context.CancelFunc logger zerolog.Logger running atomic.Bool } func NewInputRelay(source *AudioSource) *InputRelay { ctx, cancel := context.WithCancel(context.Background()) logger := logging.GetDefaultLogger().With().Str("component", "audio-input-relay").Logger() return &InputRelay{ source: source, ctx: ctx, cancel: cancel, logger: logger, } } func (r *InputRelay) Start() error { if r.running.Swap(true) { return fmt.Errorf("input relay already running") } r.logger.Debug().Msg("input relay started") return nil } func (r *InputRelay) Stop() { if !r.running.Swap(false) { return } r.cancel() r.logger.Debug().Msg("input relay stopped") }