feat(audio): improve audio quality handling and recovery mechanisms

- Add server stats reset and frame drop recovery functions
- Implement global audio server instance management
- Add WebRTC audio track replacement capability
- Improve audio relay initialization with retry logic
- Enhance quality change handling with adaptive buffer management
- Add global helper functions for audio quality control
This commit is contained in:
Alex P 2025-09-08 12:48:22 +00:00
parent 219c972e33
commit 91f9dba4c6
8 changed files with 268 additions and 31 deletions

View File

@ -22,6 +22,14 @@ func initAudioControlService() {
audio.SetCurrentSessionCallback(func() audio.AudioTrackWriter {
return GetCurrentSessionAudioTrack()
})
// Set up callback for audio relay to replace WebRTC audio track
audio.SetTrackReplacementCallback(func(newTrack audio.AudioTrackWriter) error {
if track, ok := newTrack.(*webrtc.TrackLocalStaticSample); ok {
return ReplaceCurrentSessionAudioTrack(track)
}
return nil
})
}
}
@ -92,6 +100,60 @@ func ConnectRelayToCurrentSession() error {
return nil
}
// ReplaceCurrentSessionAudioTrack replaces the audio track in the current WebRTC session
func ReplaceCurrentSessionAudioTrack(newTrack *webrtc.TrackLocalStaticSample) error {
if currentSession == nil {
return nil // No session to update
}
err := currentSession.ReplaceAudioTrack(newTrack)
if err != nil {
logger.Error().Err(err).Msg("failed to replace audio track in current session")
return err
}
logger.Info().Msg("successfully replaced audio track in current session")
return nil
}
// SetAudioQuality is a global helper to set audio output quality
func SetAudioQuality(quality audio.AudioQuality) error {
initAudioControlService()
audioControlService.SetAudioQuality(quality)
return nil
}
// SetMicrophoneQuality is a global helper to set microphone quality
func SetMicrophoneQuality(quality audio.AudioQuality) error {
initAudioControlService()
audioControlService.SetMicrophoneQuality(quality)
return nil
}
// GetAudioQualityPresets is a global helper to get available audio quality presets
func GetAudioQualityPresets() map[audio.AudioQuality]audio.AudioConfig {
initAudioControlService()
return audioControlService.GetAudioQualityPresets()
}
// GetMicrophoneQualityPresets is a global helper to get available microphone quality presets
func GetMicrophoneQualityPresets() map[audio.AudioQuality]audio.AudioConfig {
initAudioControlService()
return audioControlService.GetMicrophoneQualityPresets()
}
// GetCurrentAudioQuality is a global helper to get current audio quality configuration
func GetCurrentAudioQuality() audio.AudioConfig {
initAudioControlService()
return audioControlService.GetCurrentAudioQuality()
}
// GetCurrentMicrophoneQuality is a global helper to get current microphone quality configuration
func GetCurrentMicrophoneQuality() audio.AudioConfig {
initAudioControlService()
return audioControlService.GetCurrentMicrophoneQuality()
}
// handleAudioMute handles POST /audio/mute requests
func handleAudioMute(c *gin.Context) {
type muteReq struct {
@ -202,10 +264,8 @@ func handleAudioStatus(c *gin.Context) {
// handleAudioQuality handles GET requests for audio quality presets
func handleAudioQuality(c *gin.Context) {
initAudioControlService()
presets := audioControlService.GetAudioQualityPresets()
current := audioControlService.GetCurrentAudioQuality()
presets := GetAudioQualityPresets()
current := GetCurrentAudioQuality()
c.JSON(200, gin.H{
"presets": presets,
@ -224,16 +284,17 @@ func handleSetAudioQuality(c *gin.Context) {
return
}
initAudioControlService()
// Convert int to AudioQuality type
quality := audio.AudioQuality(req.Quality)
// Set the audio quality
audioControlService.SetAudioQuality(quality)
// Set the audio quality using global convenience function
if err := SetAudioQuality(quality); err != nil {
c.JSON(500, gin.H{"error": err.Error()})
return
}
// Return the updated configuration
current := audioControlService.GetCurrentAudioQuality()
current := GetCurrentAudioQuality()
c.JSON(200, gin.H{
"success": true,
"config": current,
@ -242,9 +303,9 @@ func handleSetAudioQuality(c *gin.Context) {
// handleMicrophoneQuality handles GET requests for microphone quality presets
func handleMicrophoneQuality(c *gin.Context) {
initAudioControlService()
presets := audioControlService.GetMicrophoneQualityPresets()
current := audioControlService.GetCurrentMicrophoneQuality()
presets := GetMicrophoneQualityPresets()
current := GetCurrentMicrophoneQuality()
c.JSON(200, gin.H{
"presets": presets,
"current": current,
@ -258,21 +319,22 @@ func handleSetMicrophoneQuality(c *gin.Context) {
}
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
c.JSON(400, gin.H{"error": err.Error()})
return
}
initAudioControlService()
// Convert int to AudioQuality type
quality := audio.AudioQuality(req.Quality)
// Set the microphone quality
audioControlService.SetMicrophoneQuality(quality)
// Set the microphone quality using global convenience function
if err := SetMicrophoneQuality(quality); err != nil {
c.JSON(500, gin.H{"error": err.Error()})
return
}
// Return the updated configuration
current := audioControlService.GetCurrentMicrophoneQuality()
c.JSON(http.StatusOK, gin.H{
current := GetCurrentMicrophoneQuality()
c.JSON(200, gin.H{
"success": true,
"config": current,
})

View File

@ -177,6 +177,19 @@ func (abm *AdaptiveBufferManager) UpdateLatency(latency time.Duration) {
}
}
// BoostBuffersForQualityChange immediately increases buffer sizes to handle quality change bursts
// This bypasses the normal adaptive algorithm for emergency situations
func (abm *AdaptiveBufferManager) BoostBuffersForQualityChange() {
// Immediately set buffers to maximum size to handle quality change frame bursts
maxSize := int64(abm.config.MaxBufferSize)
atomic.StoreInt64(&abm.currentInputBufferSize, maxSize)
atomic.StoreInt64(&abm.currentOutputBufferSize, maxSize)
abm.logger.Info().
Int("buffer_size", int(maxSize)).
Msg("Boosted buffers to maximum size for quality change")
}
// adaptationLoop is the main loop that adjusts buffer sizes
func (abm *AdaptiveBufferManager) adaptationLoop() {
defer abm.wg.Done()

View File

@ -19,6 +19,28 @@ import (
"github.com/jetkvm/kvm/internal/logging"
)
// Global audio input server instance
var globalAudioInputServer *AudioInputServer
// GetGlobalAudioInputServer returns the global audio input server instance
func GetGlobalAudioInputServer() *AudioInputServer {
return globalAudioInputServer
}
// ResetGlobalAudioInputServerStats resets the global audio input server stats
func ResetGlobalAudioInputServerStats() {
if globalAudioInputServer != nil {
globalAudioInputServer.ResetServerStats()
}
}
// RecoverGlobalAudioInputServer attempts to recover from dropped frames
func RecoverGlobalAudioInputServer() {
if globalAudioInputServer != nil {
globalAudioInputServer.RecoverFromDroppedFrames()
}
}
// getEnvInt reads an integer from environment variable with a default value
// RunAudioInputServer runs the audio input server subprocess
@ -56,6 +78,9 @@ func RunAudioInputServer() error {
}
defer server.Close()
// Store globally for access by other functions
globalAudioInputServer = server
err = server.Start()
if err != nil {
logger.Error().Err(err).Msg("failed to start audio input server")

View File

@ -878,6 +878,28 @@ func (aic *AudioInputClient) ResetStats() {
ResetFrameStats(&aic.totalFrames, &aic.droppedFrames)
}
// ResetServerStats resets server frame statistics
func (ais *AudioInputServer) ResetServerStats() {
atomic.StoreInt64(&ais.totalFrames, 0)
atomic.StoreInt64(&ais.droppedFrames, 0)
}
// RecoverFromDroppedFrames attempts to recover when too many frames are dropped
func (ais *AudioInputServer) RecoverFromDroppedFrames() {
total := atomic.LoadInt64(&ais.totalFrames)
dropped := atomic.LoadInt64(&ais.droppedFrames)
// If more than 50% of frames are dropped, attempt recovery
if total > 100 && dropped > total/2 {
logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger()
logger.Warn().Int64("total", total).Int64("dropped", dropped).Msg("high drop rate detected, attempting recovery")
// Reset stats and update buffer size from adaptive manager
ais.ResetServerStats()
ais.UpdateBufferSize()
}
}
// startReaderGoroutine starts the message reader using the goroutine pool
func (ais *AudioInputServer) startReaderGoroutine() {
ais.wg.Add(1)

View File

@ -209,15 +209,50 @@ func SetAudioQuality(quality AudioQuality) {
logger := logging.GetDefaultLogger().With().Str("component", "audio").Logger()
logger.Info().Int("quality", int(quality)).Msg("restarting audio output subprocess with new quality settings")
// Immediately boost adaptive buffer sizes to handle quality change frame burst
// This prevents "Message channel full, dropping frame" warnings during transitions
adaptiveManager := GetAdaptiveBufferManager()
if adaptiveManager != nil {
// Immediately set buffers to maximum size for quality change
adaptiveManager.BoostBuffersForQualityChange()
logger.Debug().Msg("boosted adaptive buffers for quality change")
}
// Set new OPUS configuration
supervisor.SetOpusConfig(config.Bitrate*1000, complexity, vbr, signalType, bandwidth, dtx)
// Stop current subprocess
supervisor.Stop()
// Wait for supervisor to fully stop before starting again
// This prevents race conditions and audio breakage
for i := 0; i < 50; i++ { // Wait up to 5 seconds
if !supervisor.IsRunning() {
break
}
time.Sleep(100 * time.Millisecond)
}
if supervisor.IsRunning() {
logger.Warn().Msg("supervisor did not stop within timeout, proceeding anyway")
}
// Start subprocess with new configuration
if err := supervisor.Start(); err != nil {
logger.Error().Err(err).Msg("failed to restart audio output subprocess")
} else {
logger.Info().Int("quality", int(quality)).Msg("audio output subprocess restarted successfully with new quality")
// Reset audio input server stats after quality change
// Allow adaptive buffer manager to naturally adjust buffer sizes
go func() {
time.Sleep(2 * time.Second) // Wait for quality change to settle
// Reset audio input server stats to clear persistent warnings
ResetGlobalAudioInputServerStats()
// Attempt recovery if microphone is still having issues
time.Sleep(1 * time.Second)
RecoverGlobalAudioInputServer()
}()
}
} else {
// Fallback to dynamic update if supervisor is not available
@ -289,6 +324,15 @@ func SetMicrophoneQuality(quality AudioQuality) {
logger := logging.GetDefaultLogger().With().Str("component", "audio").Logger()
logger.Info().Int("quality", int(quality)).Msg("updating audio input subprocess quality settings dynamically")
// Immediately boost adaptive buffer sizes to handle quality change frame burst
// This prevents "Message channel full, dropping frame" warnings during transitions
adaptiveManager := GetAdaptiveBufferManager()
if adaptiveManager != nil {
// Immediately set buffers to maximum size for quality change
adaptiveManager.BoostBuffersForQualityChange()
logger.Debug().Msg("boosted adaptive buffers for quality change")
}
// Set new OPUS configuration for future restarts
supervisor.SetOpusConfig(config.Bitrate*1000, complexity, vbr, signalType, bandwidth, dtx)
@ -317,6 +361,17 @@ func SetMicrophoneQuality(quality AudioQuality) {
}
} else {
logger.Info().Msg("audio input quality updated dynamically with complete Opus configuration")
// Reset audio input server stats after config update
// Allow adaptive buffer manager to naturally adjust buffer sizes
go func() {
time.Sleep(2 * time.Second) // Wait for quality change to settle
// Reset audio input server stats to clear persistent warnings
ResetGlobalAudioInputServerStats()
// Attempt recovery if microphone is still having issues
time.Sleep(1 * time.Second)
RecoverGlobalAudioInputServer()
}()
}
} else {
logger.Info().Bool("supervisor_running", supervisor.IsRunning()).Msg("audio input subprocess not connected, configuration will apply on next start")

View File

@ -101,25 +101,54 @@ func UpdateAudioRelayTrack(audioTrack AudioTrackWriter) error {
return err
}
globalRelay = relay
// Replace the track in the WebRTC session if callback is available
if trackReplacementCallback != nil {
if err := trackReplacementCallback(audioTrack); err != nil {
// Log error but don't fail the relay start
// The relay can still work even if WebRTC track replacement fails
_ = err // Suppress linter warning
}
}
return nil
}
// Update the track in the existing relay
globalRelay.UpdateTrack(audioTrack)
// Replace the track in the WebRTC session if callback is available
if trackReplacementCallback != nil {
if err := trackReplacementCallback(audioTrack); err != nil {
// Log error but don't fail the track update
// The relay can still work even if WebRTC track replacement fails
_ = err // Suppress linter warning
}
}
return nil
}
// CurrentSessionCallback is a function type for getting the current session's audio track
type CurrentSessionCallback func() AudioTrackWriter
// TrackReplacementCallback is a function type for replacing the WebRTC audio track
type TrackReplacementCallback func(AudioTrackWriter) error
// currentSessionCallback holds the callback function to get the current session's audio track
var currentSessionCallback CurrentSessionCallback
// trackReplacementCallback holds the callback function to replace the WebRTC audio track
var trackReplacementCallback TrackReplacementCallback
// SetCurrentSessionCallback sets the callback function to get the current session's audio track
func SetCurrentSessionCallback(callback CurrentSessionCallback) {
currentSessionCallback = callback
}
// SetTrackReplacementCallback sets the callback function to replace the WebRTC audio track
func SetTrackReplacementCallback(callback TrackReplacementCallback) {
trackReplacementCallback = callback
}
// connectRelayToCurrentSession connects the audio relay to the current WebRTC session's audio track
// This is used when restarting the relay during unmute operations
func connectRelayToCurrentSession() error {

36
main.go
View File

@ -77,19 +77,31 @@ func startAudioSubprocess() error {
func(pid int) {
logger.Info().Int("pid", pid).Msg("audio server process started")
// Start audio relay system for main process
// If there's an active WebRTC session, use its audio track
var audioTrack *webrtc.TrackLocalStaticSample
if currentSession != nil && currentSession.AudioTrack != nil {
audioTrack = currentSession.AudioTrack
logger.Info().Msg("restarting audio relay with existing WebRTC audio track")
} else {
logger.Info().Msg("starting audio relay without WebRTC track (will be updated when session is created)")
}
// Wait for audio output server to be fully ready before starting relay
// This prevents "no client connected" errors during quality changes
go func() {
// Give the audio output server time to initialize and start listening
time.Sleep(500 * time.Millisecond)
if err := audio.StartAudioRelay(audioTrack); err != nil {
logger.Error().Err(err).Msg("failed to start audio relay")
}
// Start audio relay system for main process
// If there's an active WebRTC session, use its audio track
var audioTrack *webrtc.TrackLocalStaticSample
if currentSession != nil && currentSession.AudioTrack != nil {
audioTrack = currentSession.AudioTrack
logger.Info().Msg("restarting audio relay with existing WebRTC audio track")
} else {
logger.Info().Msg("starting audio relay without WebRTC track (will be updated when session is created)")
}
if err := audio.StartAudioRelay(audioTrack); err != nil {
logger.Error().Err(err).Msg("failed to start audio relay")
// Retry once after additional delay if initial attempt fails
time.Sleep(1 * time.Second)
if err := audio.StartAudioRelay(audioTrack); err != nil {
logger.Error().Err(err).Msg("failed to start audio relay after retry")
}
}
}()
},
// onProcessExit
func(pid int, exitCode int, crashed bool) {

View File

@ -4,6 +4,7 @@ import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"net"
"runtime"
"strings"
@ -24,6 +25,7 @@ type Session struct {
peerConnection *webrtc.PeerConnection
VideoTrack *webrtc.TrackLocalStaticSample
AudioTrack *webrtc.TrackLocalStaticSample
AudioRtpSender *webrtc.RTPSender
ControlChannel *webrtc.DataChannel
RPCChannel *webrtc.DataChannel
HidChannel *webrtc.DataChannel
@ -261,6 +263,7 @@ func newSession(config SessionConfig) (*Session, error) {
return nil, err
}
audioRtpSender := audioTransceiver.Sender()
session.AudioRtpSender = audioRtpSender
// Handle incoming audio track (microphone from browser)
peerConnection.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
@ -410,6 +413,22 @@ func (s *Session) stopAudioProcessor() {
s.audioWg.Wait()
}
// ReplaceAudioTrack replaces the current audio track with a new one
func (s *Session) ReplaceAudioTrack(newTrack *webrtc.TrackLocalStaticSample) error {
if s.AudioRtpSender == nil {
return fmt.Errorf("audio RTP sender not available")
}
// Replace the track using the RTP sender
if err := s.AudioRtpSender.ReplaceTrack(newTrack); err != nil {
return fmt.Errorf("failed to replace audio track: %w", err)
}
// Update the session's audio track reference
s.AudioTrack = newTrack
return nil
}
func drainRtpSender(rtpSender *webrtc.RTPSender) {
// Lock to OS thread to isolate RTCP processing
runtime.LockOSThread()