diff --git a/internal/audio/cgo_audio.go b/internal/audio/cgo_audio.go index 1590184a..68719a57 100644 --- a/internal/audio/cgo_audio.go +++ b/internal/audio/cgo_audio.go @@ -911,46 +911,28 @@ func updateCacheIfNeeded(cache *AudioConfigCache) { } func cgoAudioReadEncode(buf []byte) (int, error) { - cache := GetCachedConfig() - updateCacheIfNeeded(cache) - - // Fast validation with cached values - avoid lock with atomic access - minRequired := cache.GetMinReadEncodeBuffer() - - // Buffer validation - use pre-allocated error for common case - if len(buf) < minRequired { - // Use pre-allocated error for common case, only create custom error for edge cases - if len(buf) > 0 { - return 0, newBufferTooSmallError(len(buf), minRequired) - } - return 0, cache.GetBufferTooSmallError() + // Minimal buffer validation - assume caller provides correct size + if len(buf) == 0 { + return 0, errEmptyBuffer } - // Skip initialization check for now to avoid CGO compilation issues - - // Direct CGO call with minimal overhead - unsafe.Pointer(&slice[0]) is safe for validated non-empty buffers + // Direct CGO call - hotpath optimization n := C.jetkvm_audio_read_encode(unsafe.Pointer(&buf[0])) - // Fast path for success case + // Fast path for success if n > 0 { return int(n), nil } - // Handle error cases - use static error codes to reduce allocations + // Error handling with static errors if n < 0 { - // Common error cases - switch n { - case -1: + if n == -1 { return 0, errAudioInitFailed - case -2: - return 0, errAudioReadEncode - default: - return 0, newAudioReadEncodeError(int(n)) } + return 0, errAudioReadEncode } - // n == 0 case - return 0, nil // No data available + return 0, nil } // Audio playback functions @@ -972,58 +954,25 @@ func cgoAudioPlaybackClose() { C.jetkvm_audio_playback_close() } -func cgoAudioDecodeWrite(buf []byte) (n int, err error) { - // Fast validation with AudioConfigCache - cache := GetCachedConfig() - // Only update cache if expired - avoid unnecessary overhead - // Use proper locking to avoid race condition - if cache.initialized.Load() { - cache.mutex.RLock() - cacheExpired := time.Since(cache.lastUpdate) > cache.cacheExpiry - cache.mutex.RUnlock() - if cacheExpired { - cache.Update() - } - } else { - cache.Update() - } - - // Optimized buffer validation +func cgoAudioDecodeWrite(buf []byte) (int, error) { + // Minimal validation - assume caller provides correct size if len(buf) == 0 { return 0, errEmptyBuffer } - // Use cached max buffer size with atomic access - maxAllowed := cache.GetMaxDecodeWriteBuffer() - if len(buf) > maxAllowed { - // Use pre-allocated error for common case - if len(buf) == maxAllowed+1 { - return 0, cache.GetBufferTooLargeError() - } - return 0, newBufferTooLargeError(len(buf), maxAllowed) - } + // Direct CGO call - hotpath optimization + n := int(C.jetkvm_audio_decode_write(unsafe.Pointer(&buf[0]), C.int(len(buf)))) - // Direct CGO call with minimal overhead - unsafe.Pointer(&slice[0]) is safe for validated non-empty buffers - n = int(C.jetkvm_audio_decode_write(unsafe.Pointer(&buf[0]), C.int(len(buf)))) - - // Fast path for success case + // Fast path for success if n >= 0 { return n, nil } - // Handle error cases with static error codes - switch n { - case -1: - n = 0 - err = errAudioInitFailed - case -2: - n = 0 - err = errAudioDecodeWrite - default: - n = 0 - err = newAudioDecodeWriteError(n) + // Error handling with static errors + if n == -1 { + return 0, errAudioInitFailed } - return + return 0, errAudioDecodeWrite } // updateOpusEncoderParams dynamically updates OPUS encoder parameters @@ -1111,77 +1060,22 @@ func DecodeWriteWithPooledBuffer(data []byte) (int, error) { // BatchReadEncode reads and encodes multiple audio frames in a single batch // with optimized zero-copy frame management and batch reference counting func BatchReadEncode(batchSize int) ([][]byte, error) { - cache := GetCachedConfig() - updateCacheIfNeeded(cache) - - // Calculate total buffer size needed for batch - frameSize := cache.GetMinReadEncodeBuffer() - totalSize := frameSize * batchSize - - // Get a single large buffer for all frames - batchBuffer := GetBufferFromPool(totalSize) - defer ReturnBufferToPool(batchBuffer) - - // Pre-allocate zero-copy frames for batch processing - zeroCopyFrames := make([]*ZeroCopyAudioFrame, 0, batchSize) - for i := 0; i < batchSize; i++ { - frame := GetZeroCopyFrame() - zeroCopyFrames = append(zeroCopyFrames, frame) - } - // Use batch reference counting for efficient cleanup - defer func() { - if _, err := BatchReleaseFrames(zeroCopyFrames); err != nil { - // Log release error but don't fail the operation - _ = err - } - }() - - // Batch AddRef all frames at once to reduce atomic operation overhead - err := BatchAddRefFrames(zeroCopyFrames) - if err != nil { - return nil, err - } - - // Track batch processing statistics - only if enabled - var startTime time.Time - // Batch time tracking removed - trackTime := false - if trackTime { - startTime = time.Now() - } - batchProcessingCount.Add(1) - - // Process frames in batch using zero-copy frames + // Simple batch processing without complex overhead frames := make([][]byte, 0, batchSize) - for i := 0; i < batchSize; i++ { - // Calculate offset for this frame in the batch buffer - offset := i * frameSize - frameBuf := batchBuffer[offset : offset+frameSize] + frameSize := 4096 // Fixed frame size for performance - // Process this frame - n, err := cgoAudioReadEncode(frameBuf) + for i := 0; i < batchSize; i++ { + buf := make([]byte, frameSize) + n, err := cgoAudioReadEncode(buf) if err != nil { - // Return partial batch on error if i > 0 { - batchFrameCount.Add(int64(i)) - if trackTime { - batchProcessingTime.Add(time.Since(startTime).Microseconds()) - } - return frames, nil + return frames, nil // Return partial batch } return nil, err } - - // Use zero-copy frame for efficient memory management - frame := zeroCopyFrames[i] - frame.SetDataDirect(frameBuf[:n]) // Direct assignment without copy - frames = append(frames, frame.Data()) - } - - // Update statistics - batchFrameCount.Add(int64(len(frames))) - if trackTime { - batchProcessingTime.Add(time.Since(startTime).Microseconds()) + if n > 0 { + frames = append(frames, buf[:n]) + } } return frames, nil diff --git a/internal/audio/core_handlers.go b/internal/audio/core_handlers.go index 5bc3137e..69d7ec91 100644 --- a/internal/audio/core_handlers.go +++ b/internal/audio/core_handlers.go @@ -29,11 +29,9 @@ func (s *AudioControlService) MuteAudio(muted bool) error { supervisor := GetAudioOutputSupervisor() if supervisor != nil { supervisor.Stop() - s.logger.Info().Msg("audio output supervisor stopped") } StopAudioRelay() SetAudioMuted(true) - s.logger.Info().Msg("audio output muted (subprocess and relay stopped)") } else { // Unmute: Start audio output subprocess and relay if !s.sessionProvider.IsSessionActive() { @@ -44,10 +42,9 @@ func (s *AudioControlService) MuteAudio(muted bool) error { if supervisor != nil { err := supervisor.Start() if err != nil { - s.logger.Error().Err(err).Msg("failed to start audio output supervisor during unmute") + s.logger.Debug().Err(err).Msg("failed to start audio output supervisor") return err } - s.logger.Info().Msg("audio output supervisor started") } // Start audio relay diff --git a/internal/audio/ipc_input.go b/internal/audio/ipc_input.go index b8a43086..2893051e 100644 --- a/internal/audio/ipc_input.go +++ b/internal/audio/ipc_input.go @@ -688,32 +688,28 @@ func (aic *AudioInputClient) Disconnect() { // SendFrame sends an Opus frame to the audio input server func (aic *AudioInputClient) SendFrame(frame []byte) error { + // Fast path validation + if len(frame) == 0 { + return nil + } + aic.mtx.Lock() - defer aic.mtx.Unlock() - if !aic.running || aic.conn == nil { - return fmt.Errorf("not connected to audio input server") - } - - frameLen := len(frame) - if frameLen == 0 { - return nil // Empty frame, ignore - } - - // Inline frame validation to reduce function call overhead - if frameLen > maxFrameSize { - return ErrFrameDataTooLarge + aic.mtx.Unlock() + return fmt.Errorf("not connected") } + // Direct message creation without timestamp overhead msg := &InputIPCMessage{ - Magic: inputMagicNumber, - Type: InputMessageTypeOpusFrame, - Length: uint32(frameLen), - Timestamp: time.Now().UnixNano(), - Data: frame, + Magic: inputMagicNumber, + Type: InputMessageTypeOpusFrame, + Length: uint32(len(frame)), + Data: frame, } - return aic.writeMessage(msg) + err := aic.writeMessage(msg) + aic.mtx.Unlock() + return err } // SendFrameZeroCopy sends a zero-copy Opus frame to the audio input server diff --git a/internal/audio/quality_presets.go b/internal/audio/quality_presets.go index 0d50bc8d..8548a85f 100644 --- a/internal/audio/quality_presets.go +++ b/internal/audio/quality_presets.go @@ -312,7 +312,6 @@ func SetMicrophoneQuality(quality AudioQuality) { // Update audio input subprocess configuration dynamically without restart logger := logging.GetDefaultLogger().With().Str("component", "audio").Logger() - logger.Info().Int("quality", int(quality)).Msg("updating audio input quality settings dynamically") // Set new OPUS configuration for future restarts if supervisor := GetAudioInputSupervisor(); supervisor != nil { @@ -321,12 +320,11 @@ func SetMicrophoneQuality(quality AudioQuality) { // Check if microphone is active but IPC control is broken inputManager := getAudioInputManager() if inputManager.IsRunning() && !supervisor.IsConnected() { - logger.Info().Msg("microphone active but IPC disconnected, attempting to reconnect control channel") // Reconnect the IPC control channel supervisor.Stop() time.Sleep(50 * time.Millisecond) if err := supervisor.Start(); err != nil { - logger.Warn().Err(err).Msg("failed to reconnect IPC control channel") + logger.Debug().Err(err).Msg("failed to reconnect IPC control channel") } } @@ -345,9 +343,8 @@ func SetMicrophoneQuality(quality AudioQuality) { DTX: dtx, } - logger.Info().Interface("opusConfig", opusConfig).Msg("sending Opus configuration to audio input subprocess") if err := supervisor.SendOpusConfig(opusConfig); err != nil { - logger.Warn().Err(err).Msg("failed to send dynamic Opus config update via IPC, falling back to subprocess restart") + logger.Debug().Err(err).Msg("failed to send dynamic Opus config update via IPC") // Fallback to subprocess restart if IPC update fails supervisor.Stop() if err := supervisor.Start(); err != nil { diff --git a/internal/audio/webrtc_relay.go b/internal/audio/webrtc_relay.go index 6a338564..a8c37a19 100644 --- a/internal/audio/webrtc_relay.go +++ b/internal/audio/webrtc_relay.go @@ -5,6 +5,7 @@ import ( "fmt" "reflect" "sync" + "sync/atomic" "time" "github.com/jetkvm/kvm/internal/logging" @@ -118,9 +119,7 @@ func (r *AudioRelay) IsMuted() bool { // GetStats returns relay statistics func (r *AudioRelay) GetStats() (framesRelayed, framesDropped int64) { - r.mutex.RLock() - defer r.mutex.RUnlock() - return r.framesRelayed, r.framesDropped + return atomic.LoadInt64(&r.framesRelayed), atomic.LoadInt64(&r.framesDropped) } // UpdateTrack updates the WebRTC audio track for the relay @@ -132,34 +131,43 @@ func (r *AudioRelay) UpdateTrack(audioTrack AudioTrackWriter) { func (r *AudioRelay) relayLoop() { defer r.wg.Done() - r.logger.Debug().Msg("Audio relay loop started") var maxConsecutiveErrors = Config.MaxConsecutiveErrors consecutiveErrors := 0 + backoffDelay := time.Millisecond * 10 + maxBackoff := time.Second * 5 for { select { case <-r.ctx.Done(): - r.logger.Debug().Msg("audio relay loop stopping") return default: frame, err := r.client.ReceiveFrame() if err != nil { consecutiveErrors++ - r.logger.Error().Err(err).Int("consecutive_errors", consecutiveErrors).Msg("error reading frame from audio output server") r.incrementDropped() + // Exponential backoff for stability if consecutiveErrors >= maxConsecutiveErrors { - r.logger.Error().Int("consecutive_errors", consecutiveErrors).Int("max_errors", maxConsecutiveErrors).Msg("too many consecutive read errors, stopping audio relay") + // Attempt reconnection + if r.attemptReconnection() { + consecutiveErrors = 0 + backoffDelay = time.Millisecond * 10 + continue + } return } - time.Sleep(Config.ShortSleepDuration) + + time.Sleep(backoffDelay) + if backoffDelay < maxBackoff { + backoffDelay *= 2 + } continue } consecutiveErrors = 0 + backoffDelay = time.Millisecond * 10 if err := r.forwardToWebRTC(frame); err != nil { - r.logger.Warn().Err(err).Msg("failed to forward frame to webrtc") r.incrementDropped() } else { r.incrementRelayed() @@ -218,14 +226,24 @@ func (r *AudioRelay) forwardToWebRTC(frame []byte) error { // incrementRelayed atomically increments the relayed frames counter func (r *AudioRelay) incrementRelayed() { - r.mutex.Lock() - r.framesRelayed++ - r.mutex.Unlock() + atomic.AddInt64(&r.framesRelayed, 1) } // incrementDropped atomically increments the dropped frames counter func (r *AudioRelay) incrementDropped() { - r.mutex.Lock() - r.framesDropped++ - r.mutex.Unlock() + atomic.AddInt64(&r.framesDropped, 1) +} + +// attemptReconnection tries to reconnect the audio client for stability +func (r *AudioRelay) attemptReconnection() bool { + if r.client == nil { + return false + } + + // Disconnect and reconnect + r.client.Disconnect() + time.Sleep(time.Millisecond * 100) + + err := r.client.Connect() + return err == nil }