From 0f2aa9abe4f35b88889e07aec2135ac2d495ceb4 Mon Sep 17 00:00:00 2001 From: Alex P Date: Tue, 9 Sep 2025 10:16:53 +0000 Subject: [PATCH] feat(audio): improve socket handling and validation performance - Add retry logic for socket file removal and listener creation - Optimize message writing by combining header and data writes - Move socket paths from temp dir to /var/run - Refactor OPUS parameter lookup to use map for better readability - Simplify validation functions for better performance in hotpaths --- internal/audio/core_validation.go | 86 +++++-------------------------- internal/audio/ipc_unified.go | 58 +++++++++++++++------ internal/audio/quality_presets.go | 61 ++++++++-------------- 3 files changed, 77 insertions(+), 128 deletions(-) diff --git a/internal/audio/core_validation.go b/internal/audio/core_validation.go index 5836abdd..d0318a1c 100644 --- a/internal/audio/core_validation.go +++ b/internal/audio/core_validation.go @@ -72,23 +72,12 @@ func ValidateZeroCopyFrame(frame *ZeroCopyAudioFrame) error { } // ValidateBufferSize validates buffer size parameters with enhanced boundary checks -// Optimized to use AudioConfigCache for frequently accessed values +// Optimized for minimal overhead in hotpath func ValidateBufferSize(size int) error { if size <= 0 { return fmt.Errorf("%w: buffer size %d must be positive", ErrInvalidBufferSize, size) } - - // Fast path: Check against cached max frame size - cache := Config - maxFrameSize := cache.MaxAudioFrameSize - - // Most common case: validating a buffer that's sized for audio frames - if maxFrameSize > 0 && size <= maxFrameSize { - return nil - } - - // Use SocketMaxBuffer as the upper limit for general buffer validation - // This allows for socket buffers while still preventing extremely large allocations + // Single boundary check using pre-cached value if size > Config.SocketMaxBuffer { return fmt.Errorf("%w: buffer size %d exceeds maximum %d", ErrInvalidBufferSize, size, Config.SocketMaxBuffer) @@ -219,91 +208,42 @@ func ValidateOutputIPCConfig(sampleRate, channels, frameSize int) error { } // ValidateSampleRate validates audio sample rate values -// Optimized to use AudioConfigCache for frequently accessed values +// Optimized for minimal overhead in hotpath func ValidateSampleRate(sampleRate int) error { if sampleRate <= 0 { return fmt.Errorf("%w: sample rate %d must be positive", ErrInvalidSampleRate, sampleRate) } - - // Fast path: Check against cached sample rate first - cache := Config - cachedRate := cache.SampleRate - - // Most common case: validating against the current sample rate - if sampleRate == cachedRate { - return nil - } - - // Slower path: check against all valid rates - validRates := Config.ValidSampleRates - for _, rate := range validRates { + // Direct validation against valid rates + for _, rate := range Config.ValidSampleRates { if sampleRate == rate { return nil } } - return fmt.Errorf("%w: sample rate %d not in supported rates %v", - ErrInvalidSampleRate, sampleRate, validRates) + return fmt.Errorf("%w: sample rate %d not in valid rates %v", + ErrInvalidSampleRate, sampleRate, Config.ValidSampleRates) } // ValidateChannelCount validates audio channel count -// Optimized to use AudioConfigCache for frequently accessed values +// Optimized for minimal overhead in hotpath func ValidateChannelCount(channels int) error { if channels <= 0 { return fmt.Errorf("%w: channel count %d must be positive", ErrInvalidChannels, channels) } - - // Fast path: Check against cached channels first - cache := Config - cachedChannels := cache.Channels - - // Most common case: validating against the current channel count - if channels == cachedChannels { - return nil - } - - // Fast path: Check against cached max channels - cachedMaxChannels := cache.MaxChannels - if cachedMaxChannels > 0 && channels <= cachedMaxChannels { - return nil - } - - // Slow path: Use current config values - updatedMaxChannels := cache.MaxChannels - if channels > updatedMaxChannels { + // Direct boundary check + if channels > Config.MaxChannels { return fmt.Errorf("%w: channel count %d exceeds maximum %d", - ErrInvalidChannels, channels, updatedMaxChannels) + ErrInvalidChannels, channels, Config.MaxChannels) } return nil } // ValidateBitrate validates audio bitrate values (expects kbps) -// Optimized to use AudioConfigCache for frequently accessed values +// Optimized for minimal overhead in hotpath func ValidateBitrate(bitrate int) error { if bitrate <= 0 { return fmt.Errorf("%w: bitrate %d must be positive", ErrInvalidBitrate, bitrate) } - - // Fast path: Check against cached bitrate values - cache := Config - minBitrate := cache.MinOpusBitrate - maxBitrate := cache.MaxOpusBitrate - - // If we have valid cached values, use them - if minBitrate > 0 && maxBitrate > 0 { - // Convert kbps to bps for comparison with config limits - bitrateInBps := bitrate * 1000 - if bitrateInBps < minBitrate { - return fmt.Errorf("%w: bitrate %d kbps (%d bps) below minimum %d bps", - ErrInvalidBitrate, bitrate, bitrateInBps, minBitrate) - } - if bitrateInBps > maxBitrate { - return fmt.Errorf("%w: bitrate %d kbps (%d bps) exceeds maximum %d bps", - ErrInvalidBitrate, bitrate, bitrateInBps, maxBitrate) - } - return nil - } - - // Convert kbps to bps for comparison with config limits + // Direct boundary check with single conversion bitrateInBps := bitrate * 1000 if bitrateInBps < Config.MinOpusBitrate { return fmt.Errorf("%w: bitrate %d kbps (%d bps) below minimum %d bps", diff --git a/internal/audio/ipc_unified.go b/internal/audio/ipc_unified.go index 4a5a2b88..79315560 100644 --- a/internal/audio/ipc_unified.go +++ b/internal/audio/ipc_unified.go @@ -8,6 +8,7 @@ import ( "net" "os" "path/filepath" + "strings" "sync" "sync/atomic" "time" @@ -157,15 +158,38 @@ func (s *UnifiedAudioServer) Start() error { return fmt.Errorf("server already running") } - // Remove existing socket file - if err := os.Remove(s.socketPath); err != nil && !os.IsNotExist(err) { - return fmt.Errorf("failed to remove existing socket: %w", err) + // Remove existing socket file with retry logic + for i := 0; i < 3; i++ { + if err := os.Remove(s.socketPath); err != nil && !os.IsNotExist(err) { + s.logger.Warn().Err(err).Int("attempt", i+1).Msg("failed to remove existing socket file, retrying") + time.Sleep(100 * time.Millisecond) + continue + } + break + } + + // Create listener with retry on address already in use + var listener net.Listener + var err error + for i := 0; i < 3; i++ { + listener, err = net.Listen("unix", s.socketPath) + if err == nil { + break + } + + // If address is still in use, try to remove socket file again + if strings.Contains(err.Error(), "address already in use") { + s.logger.Warn().Err(err).Int("attempt", i+1).Msg("socket address in use, attempting cleanup and retry") + os.Remove(s.socketPath) + time.Sleep(200 * time.Millisecond) + continue + } + + return fmt.Errorf("failed to create unix socket: %w", err) } - // Create listener - listener, err := net.Listen("unix", s.socketPath) if err != nil { - return fmt.Errorf("failed to create listener: %w", err) + return fmt.Errorf("failed to create unix socket after retries: %w", err) } s.listener = listener @@ -367,14 +391,18 @@ func (s *UnifiedAudioServer) SendFrame(frame []byte) error { func (s *UnifiedAudioServer) writeMessage(conn net.Conn, msg *UnifiedIPCMessage) error { header := EncodeMessageHeader(msg.Magic, uint8(msg.Type), msg.Length, msg.Timestamp) - if _, err := conn.Write(header); err != nil { - return fmt.Errorf("failed to write header: %w", err) - } - - // Write data if present + // Optimize: Use single write for header+data to reduce system calls if msg.Length > 0 && msg.Data != nil { - if _, err := conn.Write(msg.Data); err != nil { - return fmt.Errorf("failed to write data: %w", err) + // Pre-allocate combined buffer to avoid copying + combined := make([]byte, len(header)+len(msg.Data)) + copy(combined, header) + copy(combined[len(header):], msg.Data) + if _, err := conn.Write(combined); err != nil { + return fmt.Errorf("failed to write message: %w", err) + } + } else { + if _, err := conn.Write(header); err != nil { + return fmt.Errorf("failed to write header: %w", err) } } @@ -642,9 +670,9 @@ func (c *UnifiedAudioClient) calculateAdaptiveDelay(attempt int, initialDelay, m // Helper functions for socket paths func getInputSocketPath() string { - return filepath.Join(os.TempDir(), inputSocketName) + return filepath.Join("/var/run", inputSocketName) } func getOutputSocketPath() string { - return filepath.Join(os.TempDir(), outputSocketName) + return filepath.Join("/var/run", outputSocketName) } diff --git a/internal/audio/quality_presets.go b/internal/audio/quality_presets.go index 8548a85f..0b495413 100644 --- a/internal/audio/quality_presets.go +++ b/internal/audio/quality_presets.go @@ -260,6 +260,16 @@ func GetAudioConfig() AudioConfig { return currentConfig } +// Simplified OPUS parameter lookup table +var opusParams = map[AudioQuality]struct { + complexity, vbr, signalType, bandwidth, dtx int +}{ + AudioQualityLow: {Config.AudioQualityLowOpusComplexity, Config.AudioQualityLowOpusVBR, Config.AudioQualityLowOpusSignalType, Config.AudioQualityLowOpusBandwidth, Config.AudioQualityLowOpusDTX}, + AudioQualityMedium: {Config.AudioQualityMediumOpusComplexity, Config.AudioQualityMediumOpusVBR, Config.AudioQualityMediumOpusSignalType, Config.AudioQualityMediumOpusBandwidth, Config.AudioQualityMediumOpusDTX}, + AudioQualityHigh: {Config.AudioQualityHighOpusComplexity, Config.AudioQualityHighOpusVBR, Config.AudioQualityHighOpusSignalType, Config.AudioQualityHighOpusBandwidth, Config.AudioQualityHighOpusDTX}, + AudioQualityUltra: {Config.AudioQualityUltraOpusComplexity, Config.AudioQualityUltraOpusVBR, Config.AudioQualityUltraOpusSignalType, Config.AudioQualityUltraOpusBandwidth, Config.AudioQualityUltraOpusDTX}, +} + // SetMicrophoneQuality updates the current microphone quality configuration func SetMicrophoneQuality(quality AudioQuality) { // Validate audio quality parameter @@ -274,40 +284,11 @@ func SetMicrophoneQuality(quality AudioQuality) { if config, exists := presets[quality]; exists { currentMicrophoneConfig = config - // Get OPUS parameters for the selected quality - var complexity, vbr, signalType, bandwidth, dtx int - switch quality { - case AudioQualityLow: - complexity = Config.AudioQualityLowOpusComplexity - vbr = Config.AudioQualityLowOpusVBR - signalType = Config.AudioQualityLowOpusSignalType - bandwidth = Config.AudioQualityLowOpusBandwidth - dtx = Config.AudioQualityLowOpusDTX - case AudioQualityMedium: - complexity = Config.AudioQualityMediumOpusComplexity - vbr = Config.AudioQualityMediumOpusVBR - signalType = Config.AudioQualityMediumOpusSignalType - bandwidth = Config.AudioQualityMediumOpusBandwidth - dtx = Config.AudioQualityMediumOpusDTX - case AudioQualityHigh: - complexity = Config.AudioQualityHighOpusComplexity - vbr = Config.AudioQualityHighOpusVBR - signalType = Config.AudioQualityHighOpusSignalType - bandwidth = Config.AudioQualityHighOpusBandwidth - dtx = Config.AudioQualityHighOpusDTX - case AudioQualityUltra: - complexity = Config.AudioQualityUltraOpusComplexity - vbr = Config.AudioQualityUltraOpusVBR - signalType = Config.AudioQualityUltraOpusSignalType - bandwidth = Config.AudioQualityUltraOpusBandwidth - dtx = Config.AudioQualityUltraOpusDTX - default: - // Use medium quality as fallback - complexity = Config.AudioQualityMediumOpusComplexity - vbr = Config.AudioQualityMediumOpusVBR - signalType = Config.AudioQualityMediumOpusSignalType - bandwidth = Config.AudioQualityMediumOpusBandwidth - dtx = Config.AudioQualityMediumOpusDTX + // Get OPUS parameters using lookup table + params, exists := opusParams[quality] + if !exists { + // Fallback to medium quality + params = opusParams[AudioQualityMedium] } // Update audio input subprocess configuration dynamically without restart @@ -315,7 +296,7 @@ func SetMicrophoneQuality(quality AudioQuality) { // Set new OPUS configuration for future restarts if supervisor := GetAudioInputSupervisor(); supervisor != nil { - supervisor.SetOpusConfig(config.Bitrate*1000, complexity, vbr, signalType, bandwidth, dtx) + supervisor.SetOpusConfig(config.Bitrate*1000, params.complexity, params.vbr, params.signalType, params.bandwidth, params.dtx) // Check if microphone is active but IPC control is broken inputManager := getAudioInputManager() @@ -336,11 +317,11 @@ func SetMicrophoneQuality(quality AudioQuality) { Channels: config.Channels, FrameSize: int(config.FrameSize.Milliseconds() * int64(config.SampleRate) / 1000), // Convert ms to samples Bitrate: config.Bitrate * 1000, // Convert kbps to bps - Complexity: complexity, - VBR: vbr, - SignalType: signalType, - Bandwidth: bandwidth, - DTX: dtx, + Complexity: params.complexity, + VBR: params.vbr, + SignalType: params.signalType, + Bandwidth: params.bandwidth, + DTX: params.dtx, } if err := supervisor.SendOpusConfig(opusConfig); err != nil {