kvm/internal/audio/relay.go

173 lines
4.2 KiB
Go

package audio
import (
"context"
"fmt"
"sync/atomic"
"time"
"github.com/jetkvm/kvm/internal/logging"
"github.com/pion/webrtc/v4"
"github.com/pion/webrtc/v4/pkg/media"
"github.com/rs/zerolog"
)
// OutputRelay forwards audio frames read from an AudioSource to a WebRTC
// local sample track. The forwarding runs in a background goroutine that is
// launched by Start and torn down by Stop.
type OutputRelay struct {
	// Endpoints of the relay: frames are read from source and written to
	// audioTrack.
	source     *AudioSource
	audioTrack *webrtc.TrackLocalStaticSample

	// ctx and cancel are created together in NewOutputRelay; Stop invokes
	// cancel.
	ctx    context.Context
	cancel context.CancelFunc

	logger zerolog.Logger
	// running is true between a successful Start and the matching Stop.
	running atomic.Bool
	// sample is reused for every frame; only its Data field is replaced
	// before each write.
	sample media.Sample
	// stopped is closed by relayLoop when it returns, letting Stop wait for
	// the goroutine to finish.
	stopped chan struct{}

	// Counters reported in the log line emitted by Stop.
	framesRelayed, framesDropped atomic.Uint32
}
// NewOutputRelay builds an idle OutputRelay that will read from source and
// write to audioTrack once Start is called.
//
// The relay gets its own cancellable context, a logger tagged with the
// "audio-output-relay" component, and a reusable sample whose duration is
// fixed at 20ms.
func NewOutputRelay(source *AudioSource, audioTrack *webrtc.TrackLocalStaticSample) *OutputRelay {
	relay := &OutputRelay{
		source:     source,
		audioTrack: audioTrack,
		stopped:    make(chan struct{}),
	}

	relay.ctx, relay.cancel = context.WithCancel(context.Background())
	relay.logger = logging.GetDefaultLogger().
		With().
		Str("component", "audio-output-relay").
		Logger()
	relay.sample.Duration = 20 * time.Millisecond

	return relay
}
// Start launches the relay goroutine.
//
// It returns an error if the relay is already marked as running; otherwise it
// flips the running flag, spawns relayLoop, and returns nil.
//
// NOTE(review): the stopped channel is only created in NewOutputRelay and is
// closed when relayLoop exits, so calling Start again after Stop looks unsafe
// — confirm that callers create a fresh relay instead of restarting one.
func (r *OutputRelay) Start() error {
	// Atomically claim the running flag; failure means someone else holds it.
	if claimed := r.running.CompareAndSwap(false, true); !claimed {
		return fmt.Errorf("output relay already running")
	}

	go r.relayLoop()

	r.logger.Debug().Msg("output relay started")
	return nil
}
// Stop halts the relay and blocks until relayLoop has returned.
//
// It is a no-op when the relay is not marked as running. Otherwise it clears
// the running flag, cancels the relay context, waits for the stopped channel
// to be closed, and logs the relayed/dropped frame counters.
func (r *OutputRelay) Stop() {
	wasRunning := r.running.CompareAndSwap(true, false)
	if !wasRunning {
		return
	}

	r.cancel()
	<-r.stopped // closed by relayLoop on exit

	relayed, dropped := r.framesRelayed.Load(), r.framesDropped.Load()
	r.logger.Debug().
		Uint32("frames_relayed", relayed).
		Uint32("frames_dropped", dropped).
		Msg("output relay stopped")
}
// relayLoop pumps Opus frames from the audio source into the WebRTC track
// until the running flag is cleared, the relay context is cancelled during a
// backoff, or the retry budget is exhausted.
//
// Each iteration connects the source if it is not connected, reads one
// message, and — for non-empty Opus payloads — writes it to audioTrack.
// Connect and read failures share one consecutive-failure counter and an
// exponential backoff (1s, doubling up to 30s); the loop gives up after
// maxRetries consecutive failures. WebRTC write failures are counted
// separately and cause a source disconnect after
// maxConsecutiveWriteFailures in a row.
//
// The stopped channel is closed on every exit path so Stop can wait for the
// goroutine to finish.
func (r *OutputRelay) relayLoop() {
	defer close(r.stopped)

	const (
		maxRetries                  = 10
		maxConsecutiveWriteFailures = 50 // Allow some WebRTC write failures before reconnecting
		initialRetryDelay           = 1 * time.Second
		maxRetryDelay               = 30 * time.Second
	)

	// A nil channel blocks forever in a select, so a relay that was built
	// without a context degrades to a plain, uninterruptible wait.
	var done <-chan struct{}
	if r.ctx != nil {
		done = r.ctx.Done()
	}

	// backoff waits for d, returning false if the relay context is cancelled
	// first. Using a timer instead of time.Sleep keeps Stop from blocking for
	// the full retry delay while this goroutine is idle between attempts.
	backoff := func(d time.Duration) bool {
		timer := time.NewTimer(d)
		defer timer.Stop()
		select {
		case <-done:
			return false
		case <-timer.C:
			return true
		}
	}

	retryDelay := initialRetryDelay
	consecutiveFailures := 0
	consecutiveWriteFailures := 0

	for r.running.Load() {
		if !(*r.source).IsConnected() {
			if err := (*r.source).Connect(); err != nil {
				if consecutiveFailures++; consecutiveFailures >= maxRetries {
					r.logger.Error().Int("failures", consecutiveFailures).Msg("Max retries exceeded, stopping relay")
					return
				}
				r.logger.Debug().Err(err).Int("failures", consecutiveFailures).Msg("Connection failed, retrying")
				if !backoff(retryDelay) {
					return
				}
				retryDelay = min(retryDelay*2, maxRetryDelay)
				continue
			}
			consecutiveFailures = 0
			retryDelay = initialRetryDelay
		}

		msgType, payload, err := (*r.source).ReadMessage()
		if err != nil {
			// A read error after Stop cleared the flag is expected; exit quietly.
			if !r.running.Load() {
				return
			}
			if consecutiveFailures++; consecutiveFailures >= maxRetries {
				r.logger.Error().Int("failures", consecutiveFailures).Msg("Max read retries exceeded, stopping relay")
				return
			}
			r.logger.Warn().Err(err).Int("failures", consecutiveFailures).Msg("Read error, reconnecting")
			(*r.source).Disconnect()
			if !backoff(retryDelay) {
				return
			}
			retryDelay = min(retryDelay*2, maxRetryDelay)
			continue
		}
		consecutiveFailures = 0
		retryDelay = initialRetryDelay

		// Only non-empty Opus messages are forwarded; everything else is skipped.
		if msgType != ipcMsgTypeOpus || len(payload) == 0 {
			continue
		}

		r.sample.Data = payload
		if err := r.audioTrack.WriteSample(r.sample); err != nil {
			r.framesDropped.Add(1)
			consecutiveWriteFailures++

			// Log warning on first failure and every 10th failure
			if consecutiveWriteFailures == 1 || consecutiveWriteFailures%10 == 0 {
				r.logger.Warn().
					Err(err).
					Int("consecutive_failures", consecutiveWriteFailures).
					Msg("Failed to write sample to WebRTC")
			}

			if consecutiveWriteFailures >= maxConsecutiveWriteFailures {
				r.logger.Error().
					Int("failures", consecutiveWriteFailures).
					Msg("Too many consecutive WebRTC write failures, reconnecting source")
				// Dropping the connection makes the next iteration reconnect.
				(*r.source).Disconnect()
				consecutiveWriteFailures = 0
				consecutiveFailures = 0
			}
			continue
		}

		r.framesRelayed.Add(1)
		consecutiveWriteFailures = 0
	}
}
// InputRelay is the input-side counterpart of OutputRelay. In the code shown
// here it only tracks whether it has been started and logs start/stop
// transitions.
type InputRelay struct {
	logger zerolog.Logger
	// running is true between a successful Start and the matching Stop.
	running atomic.Bool
}
// NewInputRelay returns an idle InputRelay whose logger is tagged with the
// "audio-input-relay" component.
func NewInputRelay() *InputRelay {
	relay := new(InputRelay)
	relay.logger = logging.GetDefaultLogger().
		With().
		Str("component", "audio-input-relay").
		Logger()
	return relay
}
// Start marks the input relay as running.
//
// It returns an error if the relay is already marked as running; otherwise it
// sets the flag, logs the transition, and returns nil.
func (r *InputRelay) Start() error {
	// Atomically claim the running flag; failure means it was already set.
	if claimed := r.running.CompareAndSwap(false, true); !claimed {
		return fmt.Errorf("input relay already running")
	}

	r.logger.Debug().Msg("input relay started")
	return nil
}
// Stop clears the running flag and logs the transition. It is a no-op when
// the relay is not marked as running.
func (r *InputRelay) Stop() {
	wasRunning := r.running.CompareAndSwap(true, false)
	if !wasRunning {
		return
	}

	r.logger.Debug().Msg("input relay stopped")
}