diff --git a/internal/audio/adaptive_buffer.go b/internal/audio/adaptive_buffer.go index 899a4ce3..dc9f6f6a 100644 --- a/internal/audio/adaptive_buffer.go +++ b/internal/audio/adaptive_buffer.go @@ -57,25 +57,25 @@ type AdaptiveBufferConfig struct { func DefaultAdaptiveBufferConfig() AdaptiveBufferConfig { return AdaptiveBufferConfig{ // Conservative buffer sizes for 256MB RAM constraint - MinBufferSize: GetConfig().AdaptiveMinBufferSize, - MaxBufferSize: GetConfig().AdaptiveMaxBufferSize, - DefaultBufferSize: GetConfig().AdaptiveDefaultBufferSize, + MinBufferSize: Config.AdaptiveMinBufferSize, + MaxBufferSize: Config.AdaptiveMaxBufferSize, + DefaultBufferSize: Config.AdaptiveDefaultBufferSize, // CPU thresholds optimized for single-core ARM Cortex A7 under load - LowCPUThreshold: GetConfig().LowCPUThreshold * 100, // Below 20% CPU - HighCPUThreshold: GetConfig().HighCPUThreshold * 100, // Above 60% CPU (lowered to be more responsive) + LowCPUThreshold: Config.LowCPUThreshold * 100, // Below 20% CPU + HighCPUThreshold: Config.HighCPUThreshold * 100, // Above 60% CPU (lowered to be more responsive) // Memory thresholds for 256MB total RAM - LowMemoryThreshold: GetConfig().LowMemoryThreshold * 100, // Below 35% memory usage - HighMemoryThreshold: GetConfig().HighMemoryThreshold * 100, // Above 75% memory usage (lowered for earlier response) + LowMemoryThreshold: Config.LowMemoryThreshold * 100, // Below 35% memory usage + HighMemoryThreshold: Config.HighMemoryThreshold * 100, // Above 75% memory usage (lowered for earlier response) // Latency targets - TargetLatency: GetConfig().AdaptiveBufferTargetLatency, // Target 20ms latency - MaxLatency: GetConfig().LatencyMonitorTarget, // Max acceptable latency + TargetLatency: Config.AdaptiveBufferTargetLatency, // Target 20ms latency + MaxLatency: Config.LatencyMonitorTarget, // Max acceptable latency // Adaptation settings - AdaptationInterval: GetConfig().BufferUpdateInterval, // Check every 500ms - SmoothingFactor: GetConfig().SmoothingFactor, // Moderate responsiveness + AdaptationInterval: Config.BufferUpdateInterval, // Check every 500ms + SmoothingFactor: Config.SmoothingFactor, // Moderate responsiveness } } @@ -273,7 +273,7 @@ func (abm *AdaptiveBufferManager) adaptBufferSizes() { latencyFactor := abm.calculateLatencyFactor(currentLatency) // Combine factors with weights (CPU has highest priority for KVM coexistence) - combinedFactor := GetConfig().CPUMemoryWeight*cpuFactor + GetConfig().MemoryWeight*memoryFactor + GetConfig().LatencyWeight*latencyFactor + combinedFactor := Config.CPUMemoryWeight*cpuFactor + Config.MemoryWeight*memoryFactor + Config.LatencyWeight*latencyFactor // Apply adaptation with smoothing currentInput := float64(atomic.LoadInt64(&abm.currentInputBufferSize)) @@ -437,8 +437,8 @@ func (abm *AdaptiveBufferManager) GetStats() map[string]interface{} { "input_buffer_size": abm.GetInputBufferSize(), "output_buffer_size": abm.GetOutputBufferSize(), "average_latency_ms": float64(atomic.LoadInt64(&abm.averageLatency)) / 1e6, - "system_cpu_percent": float64(atomic.LoadInt64(&abm.systemCPUPercent)) / GetConfig().PercentageMultiplier, - "system_memory_percent": float64(atomic.LoadInt64(&abm.systemMemoryPercent)) / GetConfig().PercentageMultiplier, + "system_cpu_percent": float64(atomic.LoadInt64(&abm.systemCPUPercent)) / Config.PercentageMultiplier, + "system_memory_percent": float64(atomic.LoadInt64(&abm.systemMemoryPercent)) / Config.PercentageMultiplier, "adaptation_count": atomic.LoadInt64(&abm.adaptationCount), "last_adaptation": lastAdaptation, } diff --git a/internal/audio/batch_audio.go b/internal/audio/batch_audio.go index f82a1fa2..f2417608 100644 --- a/internal/audio/batch_audio.go +++ b/internal/audio/batch_audio.go @@ -83,8 +83,7 @@ type batchWriteResult struct { // NewBatchAudioProcessor creates a new batch audio processor func NewBatchAudioProcessor(batchSize int, batchDuration time.Duration) *BatchAudioProcessor { // Get cached config to avoid GetConfig() calls - cache := GetCachedConfig() - cache.Update() + cache := Config // Validate input parameters with minimal overhead if batchSize <= 0 || batchSize > 1000 { @@ -105,7 +104,7 @@ func NewBatchAudioProcessor(batchSize int, batchDuration time.Duration) *BatchAu logger := logging.GetDefaultLogger().With().Str("component", "batch-audio").Logger() // Pre-calculate frame size to avoid repeated GetConfig() calls - frameSize := cache.GetMinReadEncodeBuffer() + frameSize := cache.MinReadEncodeBuffer if frameSize == 0 { frameSize = 1500 // Safe fallback } @@ -166,7 +165,7 @@ func (bap *BatchAudioProcessor) Stop() { bap.cancel() // Wait for processing to complete - time.Sleep(bap.batchDuration + GetConfig().BatchProcessingDelay) + time.Sleep(bap.batchDuration + Config.BatchProcessingDelay) bap.logger.Info().Msg("batch audio processor stopped") } @@ -174,8 +173,7 @@ func (bap *BatchAudioProcessor) Stop() { // BatchReadEncode performs batched audio read and encode operations func (bap *BatchAudioProcessor) BatchReadEncode(buffer []byte) (int, error) { // Get cached config to avoid GetConfig() calls in hot path - cache := GetCachedConfig() - cache.Update() + cache := Config // Validate buffer before processing if err := ValidateBufferSize(len(buffer)); err != nil { @@ -221,7 +219,7 @@ func (bap *BatchAudioProcessor) BatchReadEncode(buffer []byte) (int, error) { select { case result := <-resultChan: return result.length, result.err - case <-time.After(cache.BatchProcessingTimeout): + case <-time.After(cache.BatchProcessorTimeout): // Timeout, fallback to single operation // Use sampling to reduce atomic operations overhead if atomic.LoadInt64(&bap.stats.SingleReads)%10 == 0 { @@ -236,8 +234,7 @@ func (bap *BatchAudioProcessor) BatchReadEncode(buffer []byte) (int, error) { // This is the legacy version that uses a single buffer func (bap *BatchAudioProcessor) BatchDecodeWrite(buffer []byte) (int, error) { // Get cached config to avoid GetConfig() calls in hot path - cache := GetCachedConfig() - cache.Update() + cache := Config // Validate buffer before processing if err := ValidateBufferSize(len(buffer)); err != nil { @@ -283,7 +280,7 @@ func (bap *BatchAudioProcessor) BatchDecodeWrite(buffer []byte) (int, error) { select { case result := <-resultChan: return result.length, result.err - case <-time.After(cache.BatchProcessingTimeout): + case <-time.After(cache.BatchProcessorTimeout): // Use sampling to reduce atomic operations overhead if atomic.LoadInt64(&bap.stats.SingleWrites)%10 == 0 { atomic.AddInt64(&bap.stats.SingleWrites, 10) @@ -296,8 +293,7 @@ func (bap *BatchAudioProcessor) BatchDecodeWrite(buffer []byte) (int, error) { // BatchDecodeWriteWithBuffers performs batched audio decode and write operations with separate opus and PCM buffers func (bap *BatchAudioProcessor) BatchDecodeWriteWithBuffers(opusData []byte, pcmBuffer []byte) (int, error) { // Get cached config to avoid GetConfig() calls in hot path - cache := GetCachedConfig() - cache.Update() + cache := Config // Validate buffers before processing if len(opusData) == 0 { @@ -339,7 +335,7 @@ func (bap *BatchAudioProcessor) BatchDecodeWriteWithBuffers(opusData []byte, pcm select { case result := <-resultChan: return result.length, result.err - case <-time.After(cache.BatchProcessingTimeout): + case <-time.After(cache.BatchProcessorTimeout): atomic.AddInt64(&bap.stats.SingleWrites, 1) atomic.AddInt64(&bap.stats.WriteFrames, 1) // Use the optimized function with separate buffers @@ -427,7 +423,7 @@ func (bap *BatchAudioProcessor) processBatchRead(batch []batchReadRequest) { } // Get cached config once - avoid repeated calls - cache := GetCachedConfig() + cache := Config threadPinningThreshold := cache.BatchProcessorThreadPinningThreshold if threadPinningThreshold == 0 { threadPinningThreshold = cache.MinBatchSizeForThreadPinning // Fallback @@ -480,7 +476,7 @@ func (bap *BatchAudioProcessor) processBatchWrite(batch []batchWriteRequest) { } // Get cached config to avoid GetConfig() calls in hot path - cache := GetCachedConfig() + cache := Config threadPinningThreshold := cache.BatchProcessorThreadPinningThreshold if threadPinningThreshold == 0 { threadPinningThreshold = cache.MinBatchSizeForThreadPinning // Fallback @@ -586,8 +582,7 @@ func GetBatchAudioProcessor() *BatchAudioProcessor { // Initialize on first use if atomic.CompareAndSwapInt32(&batchProcessorInitialized, 0, 1) { // Get cached config to avoid GetConfig() calls - cache := GetCachedConfig() - cache.Update() + cache := Config processor := NewBatchAudioProcessor(cache.BatchProcessorFramesPerBatch, cache.BatchProcessorTimeout) atomic.StorePointer(&globalBatchProcessor, unsafe.Pointer(processor)) @@ -601,7 +596,7 @@ func GetBatchAudioProcessor() *BatchAudioProcessor { } // Fallback: create a new processor (should rarely happen) - config := GetConfig() + config := Config return NewBatchAudioProcessor(config.BatchProcessorFramesPerBatch, config.BatchProcessorTimeout) } diff --git a/internal/audio/batch_zero_copy.go b/internal/audio/batch_zero_copy.go index 4ba9959a..8d066521 100644 --- a/internal/audio/batch_zero_copy.go +++ b/internal/audio/batch_zero_copy.go @@ -73,7 +73,7 @@ func GetBatchZeroCopyProcessor() *BatchZeroCopyProcessor { // NewBatchZeroCopyProcessor creates a new batch zero-copy processor func NewBatchZeroCopyProcessor() *BatchZeroCopyProcessor { - cache := GetCachedConfig() + cache := Config return &BatchZeroCopyProcessor{ maxBatchSize: cache.BatchProcessorFramesPerBatch, batchTimeout: cache.BatchProcessorTimeout, diff --git a/internal/audio/core_config.go b/internal/audio/core_config.go index f5bb7398..6f3b44d9 100644 --- a/internal/audio/core_config.go +++ b/internal/audio/core_config.go @@ -4,12 +4,12 @@ import "time" // GetMetricsUpdateInterval returns the current metrics update interval from centralized config func GetMetricsUpdateInterval() time.Duration { - return GetConfig().MetricsUpdateInterval + return Config.MetricsUpdateInterval } // SetMetricsUpdateInterval sets the metrics update interval in centralized config func SetMetricsUpdateInterval(interval time.Duration) { - config := GetConfig() + config := Config config.MetricsUpdateInterval = interval UpdateConfig(config) } diff --git a/internal/audio/core_config_constants.go b/internal/audio/core_config_constants.go index 0fff7ed5..6af91d14 100644 --- a/internal/audio/core_config_constants.go +++ b/internal/audio/core_config_constants.go @@ -313,6 +313,15 @@ type AudioConfigConstants struct { AudioProcessorQueueSize int AudioReaderQueueSize int WorkerMaxIdleTime time.Duration + + // Connection Retry Configuration + MaxConnectionAttempts int // Maximum connection retry attempts + ConnectionRetryDelay time.Duration // Initial connection retry delay + MaxConnectionRetryDelay time.Duration // Maximum connection retry delay + ConnectionBackoffFactor float64 // Connection retry backoff factor + ConnectionTimeoutDelay time.Duration // Connection timeout for each attempt + ReconnectionInterval time.Duration // Interval for automatic reconnection attempts + HealthCheckInterval time.Duration // Health check interval for connections } // DefaultAudioConfig returns the default configuration constants @@ -424,11 +433,11 @@ func DefaultAudioConfig() *AudioConfigConstants { // Buffer Management PreallocSize: 1024 * 1024, // 1MB buffer preallocation MaxPoolSize: 100, // Maximum object pool size - MessagePoolSize: 256, // Message pool size for IPC + MessagePoolSize: 512, // Increased message pool for quality change bursts OptimalSocketBuffer: 262144, // 256KB optimal socket buffer MaxSocketBuffer: 1048576, // 1MB maximum socket buffer MinSocketBuffer: 8192, // 8KB minimum socket buffer - ChannelBufferSize: 500, // Inter-goroutine channel buffer size + ChannelBufferSize: 1000, // Increased channel buffer for quality change bursts AudioFramePoolSize: 1500, // Audio frame object pool size PageSize: 4096, // Memory page size for alignment InitialBufferFrames: 500, // Initial buffer size during startup @@ -436,17 +445,17 @@ func DefaultAudioConfig() *AudioConfigConstants { MinReadEncodeBuffer: 1276, // Minimum CGO read/encode buffer MaxDecodeWriteBuffer: 4096, // Maximum CGO decode/write buffer - // IPC Configuration + // IPC Configuration - Balanced for stability MagicNumber: 0xDEADBEEF, // IPC message validation header MaxFrameSize: 4096, // Maximum audio frame size (4KB) - WriteTimeout: 100 * time.Millisecond, // IPC write operation timeout + WriteTimeout: 500 * time.Millisecond, // Increased timeout to handle quality change bursts HeaderSize: 8, // IPC message header size - // Monitoring and Metrics - MetricsUpdateInterval: 1000 * time.Millisecond, // Metrics collection frequency - WarmupSamples: 10, // Warmup samples for metrics accuracy - MetricsChannelBuffer: 100, // Metrics data channel buffer size - LatencyHistorySize: 100, // Number of latency measurements to keep + // Monitoring and Metrics - Balanced for stability + MetricsUpdateInterval: 1000 * time.Millisecond, // Stable metrics collection frequency + WarmupSamples: 10, // Adequate warmup samples for accuracy + MetricsChannelBuffer: 100, // Adequate metrics data channel buffer + LatencyHistorySize: 100, // Adequate latency measurements to keep // Process Monitoring Constants MaxCPUPercent: 100.0, // Maximum CPU percentage @@ -470,41 +479,50 @@ func DefaultAudioConfig() *AudioConfigConstants { BackoffMultiplier: 2.0, // Exponential backoff multiplier MaxConsecutiveErrors: 5, // Consecutive error threshold - // Timing Constants - DefaultSleepDuration: 100 * time.Millisecond, // Standard polling interval - ShortSleepDuration: 10 * time.Millisecond, // High-frequency polling - LongSleepDuration: 200 * time.Millisecond, // Background tasks - DefaultTickerInterval: 100 * time.Millisecond, // Periodic task interval - BufferUpdateInterval: 500 * time.Millisecond, // Buffer status updates + // Connection Retry Configuration + MaxConnectionAttempts: 15, // Maximum connection retry attempts + ConnectionRetryDelay: 50 * time.Millisecond, // Initial connection retry delay + MaxConnectionRetryDelay: 2 * time.Second, // Maximum connection retry delay + ConnectionBackoffFactor: 1.5, // Connection retry backoff factor + ConnectionTimeoutDelay: 5 * time.Second, // Connection timeout for each attempt + ReconnectionInterval: 30 * time.Second, // Interval for automatic reconnection attempts + HealthCheckInterval: 10 * time.Second, // Health check interval for connections + + // Timing Constants - Optimized for quality change stability + DefaultSleepDuration: 100 * time.Millisecond, // Balanced polling interval + ShortSleepDuration: 10 * time.Millisecond, // Balanced high-frequency polling + LongSleepDuration: 200 * time.Millisecond, // Balanced background task delay + DefaultTickerInterval: 100 * time.Millisecond, // Balanced periodic task interval + BufferUpdateInterval: 300 * time.Millisecond, // Faster buffer updates for quality changes InputSupervisorTimeout: 5 * time.Second, // Input monitoring timeout OutputSupervisorTimeout: 5 * time.Second, // Output monitoring timeout - BatchProcessingDelay: 10 * time.Millisecond, // Batch processing delay - AdaptiveOptimizerStability: 10 * time.Second, // Adaptive stability period + BatchProcessingDelay: 5 * time.Millisecond, // Reduced batch processing delay + AdaptiveOptimizerStability: 5 * time.Second, // Faster adaptive stability period - LatencyMonitorTarget: 50 * time.Millisecond, // Target latency for monitoring + LatencyMonitorTarget: 50 * time.Millisecond, // Balanced target latency for monitoring - // Adaptive Buffer Configuration - LowCPUThreshold: 0.20, - HighCPUThreshold: 0.60, - LowMemoryThreshold: 0.50, - HighMemoryThreshold: 0.75, - AdaptiveBufferTargetLatency: 20 * time.Millisecond, + // Adaptive Buffer Configuration - Optimized for low latency + LowCPUThreshold: 0.30, + HighCPUThreshold: 0.70, + LowMemoryThreshold: 0.60, + HighMemoryThreshold: 0.80, + AdaptiveBufferTargetLatency: 15 * time.Millisecond, // Reduced target latency - // Adaptive Buffer Size Configuration - AdaptiveMinBufferSize: 3, // Minimum 3 frames for stability - AdaptiveMaxBufferSize: 20, // Maximum 20 frames for high load - AdaptiveDefaultBufferSize: 6, // Balanced buffer size (6 frames) + // Adaptive Buffer Size Configuration - Optimized for quality change bursts + AdaptiveMinBufferSize: 16, // Higher minimum to handle bursts + AdaptiveMaxBufferSize: 64, // Higher maximum for quality changes + AdaptiveDefaultBufferSize: 32, // Higher default for stability - // Adaptive Optimizer Configuration - CooldownPeriod: 30 * time.Second, - RollbackThreshold: 300 * time.Millisecond, - AdaptiveOptimizerLatencyTarget: 50 * time.Millisecond, + // Adaptive Optimizer Configuration - Faster response + CooldownPeriod: 15 * time.Second, // Reduced cooldown period + RollbackThreshold: 200 * time.Millisecond, // Lower rollback threshold + AdaptiveOptimizerLatencyTarget: 30 * time.Millisecond, // Reduced latency target - // Latency Monitor Configuration - MaxLatencyThreshold: 200 * time.Millisecond, - JitterThreshold: 20 * time.Millisecond, - LatencyOptimizationInterval: 5 * time.Second, - LatencyAdaptiveThreshold: 0.8, + // Latency Monitor Configuration - More aggressive monitoring + MaxLatencyThreshold: 150 * time.Millisecond, // Lower max latency threshold + JitterThreshold: 15 * time.Millisecond, // Reduced jitter threshold + LatencyOptimizationInterval: 3 * time.Second, // More frequent optimization + LatencyAdaptiveThreshold: 0.7, // More aggressive adaptive threshold // Microphone Contention Configuration MicContentionTimeout: 200 * time.Millisecond, @@ -532,12 +550,12 @@ func DefaultAudioConfig() *AudioConfigConstants { LatencyScalingFactor: 2.0, // Latency ratio scaling factor OptimizerAggressiveness: 0.7, // Optimizer aggressiveness factor - // CGO Audio Processing Constants - CGOUsleepMicroseconds: 1000, // 1000 microseconds (1ms) for CGO usleep calls + // CGO Audio Processing Constants - Balanced for stability + CGOUsleepMicroseconds: 1000, // 1000 microseconds (1ms) for stable CGO usleep calls CGOPCMBufferSize: 1920, // 1920 samples for PCM buffer (max 2ch*960) CGONanosecondsPerSecond: 1000000000.0, // 1000000000.0 for nanosecond conversions - // Frontend Constants + // Frontend Constants - Balanced for stability FrontendOperationDebounceMS: 1000, // 1000ms debounce for frontend operations FrontendSyncDebounceMS: 1000, // 1000ms debounce for sync operations FrontendSampleRate: 48000, // 48000Hz sample rate for frontend audio @@ -560,20 +578,20 @@ func DefaultAudioConfig() *AudioConfigConstants { ProcessMonitorFallbackClockHz: 1000.0, // 1000.0 Hz fallback clock ProcessMonitorTraditionalHz: 100.0, // 100.0 Hz traditional clock - // Batch Processing Constants - BatchProcessorFramesPerBatch: 4, // 4 frames per batch - BatchProcessorTimeout: 5 * time.Millisecond, // 5ms timeout - BatchProcessorMaxQueueSize: 16, // 16 max queue size for balanced memory/performance - BatchProcessorAdaptiveThreshold: 0.8, // 0.8 threshold for adaptive batching (80% queue full) - BatchProcessorThreadPinningThreshold: 8, // 8 frames minimum for thread pinning optimization + // Batch Processing Constants - Optimized for quality change bursts + BatchProcessorFramesPerBatch: 16, // Larger batches for quality changes + BatchProcessorTimeout: 20 * time.Millisecond, // Longer timeout for bursts + BatchProcessorMaxQueueSize: 64, // Larger queue for quality changes + BatchProcessorAdaptiveThreshold: 0.6, // Lower threshold for faster adaptation + BatchProcessorThreadPinningThreshold: 8, // Lower threshold for better performance - // Output Streaming Constants - OutputStreamingFrameIntervalMS: 20, // 20ms frame interval (50 FPS) + // Output Streaming Constants - Balanced for stability + OutputStreamingFrameIntervalMS: 20, // 20ms frame interval (50 FPS) for stability // IPC Constants IPCInitialBufferFrames: 500, // 500 frames for initial buffer - // Event Constants + // Event Constants - Balanced for stability EventTimeoutSeconds: 2, // 2 seconds for event timeout EventTimeFormatString: "2006-01-02T15:04:05.000Z", // "2006-01-02T15:04:05.000Z" time format EventSubscriptionDelayMS: 100, // 100ms subscription delay @@ -585,7 +603,7 @@ func DefaultAudioConfig() *AudioConfigConstants { AudioReaderQueueSize: 32, // 32 tasks queue size for reader pool WorkerMaxIdleTime: 60 * time.Second, // 60s maximum idle time before worker termination - // Input Processing Constants + // Input Processing Constants - Balanced for stability InputProcessingTimeoutMS: 10, // 10ms processing timeout threshold // Adaptive Buffer Constants @@ -670,7 +688,7 @@ func DefaultAudioConfig() *AudioConfigConstants { } // Global configuration instance -var audioConfigInstance = DefaultAudioConfig() +var Config = DefaultAudioConfig() // UpdateConfig allows runtime configuration updates func UpdateConfig(newConfig *AudioConfigConstants) { @@ -682,12 +700,12 @@ func UpdateConfig(newConfig *AudioConfigConstants) { return } - audioConfigInstance = newConfig + Config = newConfig logger := logging.GetDefaultLogger().With().Str("component", "AudioConfig").Logger() logger.Info().Msg("Audio configuration updated successfully") } // GetConfig returns the current configuration func GetConfig() *AudioConfigConstants { - return audioConfigInstance + return Config } diff --git a/internal/audio/core_validation.go b/internal/audio/core_validation.go index 7eb63542..03b44adb 100644 --- a/internal/audio/core_validation.go +++ b/internal/audio/core_validation.go @@ -55,12 +55,11 @@ func ValidateZeroCopyFrame(frame *ZeroCopyAudioFrame) error { maxFrameSize := cachedMaxFrameSize if maxFrameSize == 0 { // Fallback: get from cache - cache := GetCachedConfig() - maxFrameSize = int(cache.maxAudioFrameSize.Load()) + cache := Config + maxFrameSize = cache.MaxAudioFrameSize if maxFrameSize == 0 { - // Last resort: update cache - cache.Update() - maxFrameSize = int(cache.maxAudioFrameSize.Load()) + // Last resort: use default + maxFrameSize = cache.MaxAudioFrameSize } // Cache globally for next calls cachedMaxFrameSize = maxFrameSize @@ -80,8 +79,8 @@ func ValidateBufferSize(size int) error { } // Fast path: Check against cached max frame size - cache := GetCachedConfig() - maxFrameSize := int(cache.maxAudioFrameSize.Load()) + cache := Config + maxFrameSize := cache.MaxAudioFrameSize // Most common case: validating a buffer that's sized for audio frames if maxFrameSize > 0 && size <= maxFrameSize { @@ -89,7 +88,7 @@ func ValidateBufferSize(size int) error { } // Slower path: full validation against SocketMaxBuffer - config := GetConfig() + config := Config // Use SocketMaxBuffer as the upper limit for general buffer validation // This allows for socket buffers while still preventing extremely large allocations if size > config.SocketMaxBuffer { @@ -107,8 +106,8 @@ func ValidateLatency(latency time.Duration) error { } // Fast path: check against cached max latency - cache := GetCachedConfig() - maxLatency := time.Duration(cache.maxLatency.Load()) + cache := Config + maxLatency := time.Duration(cache.MaxLatency) // If we have a valid cached value, use it if maxLatency > 0 { @@ -125,7 +124,7 @@ func ValidateLatency(latency time.Duration) error { } // Slower path: full validation with GetConfig() - config := GetConfig() + config := Config minLatency := time.Millisecond // Minimum reasonable latency if latency > 0 && latency < minLatency { return fmt.Errorf("%w: latency %v below minimum %v", @@ -142,9 +141,9 @@ func ValidateLatency(latency time.Duration) error { // Optimized to use AudioConfigCache for frequently accessed values func ValidateMetricsInterval(interval time.Duration) error { // Fast path: check against cached values - cache := GetCachedConfig() - minInterval := time.Duration(cache.minMetricsUpdateInterval.Load()) - maxInterval := time.Duration(cache.maxMetricsUpdateInterval.Load()) + cache := Config + minInterval := time.Duration(cache.MinMetricsUpdateInterval) + maxInterval := time.Duration(cache.MaxMetricsUpdateInterval) // If we have valid cached values, use them if minInterval > 0 && maxInterval > 0 { @@ -160,7 +159,7 @@ func ValidateMetricsInterval(interval time.Duration) error { } // Slower path: full validation with GetConfig() - config := GetConfig() + config := Config minInterval = config.MinMetricsUpdateInterval maxInterval = config.MaxMetricsUpdateInterval if interval < minInterval { @@ -184,7 +183,7 @@ func ValidateAdaptiveBufferConfig(minSize, maxSize, defaultSize int) error { return ErrInvalidBufferSize } // Validate against global limits - maxBuffer := GetConfig().SocketMaxBuffer + maxBuffer := Config.SocketMaxBuffer if maxSize > maxBuffer { return ErrInvalidBufferSize } @@ -194,7 +193,7 @@ func ValidateAdaptiveBufferConfig(minSize, maxSize, defaultSize int) error { // ValidateInputIPCConfig validates input IPC configuration func ValidateInputIPCConfig(sampleRate, channels, frameSize int) error { // Use config values - config := GetConfig() + config := Config minSampleRate := config.MinSampleRate maxSampleRate := config.MaxSampleRate maxChannels := config.MaxChannels @@ -213,7 +212,7 @@ func ValidateInputIPCConfig(sampleRate, channels, frameSize int) error { // ValidateOutputIPCConfig validates output IPC configuration func ValidateOutputIPCConfig(sampleRate, channels, frameSize int) error { // Use config values - config := GetConfig() + config := Config minSampleRate := config.MinSampleRate maxSampleRate := config.MaxSampleRate maxChannels := config.MaxChannels @@ -263,8 +262,8 @@ func ValidateSampleRate(sampleRate int) error { } // Fast path: Check against cached sample rate first - cache := GetCachedConfig() - cachedRate := int(cache.sampleRate.Load()) + cache := Config + cachedRate := cache.SampleRate // Most common case: validating against the current sample rate if sampleRate == cachedRate { @@ -272,7 +271,7 @@ func ValidateSampleRate(sampleRate int) error { } // Slower path: check against all valid rates - config := GetConfig() + config := Config validRates := config.ValidSampleRates for _, rate := range validRates { if sampleRate == rate { @@ -291,8 +290,8 @@ func ValidateChannelCount(channels int) error { } // Fast path: Check against cached channels first - cache := GetCachedConfig() - cachedChannels := int(cache.channels.Load()) + cache := Config + cachedChannels := cache.Channels // Most common case: validating against the current channel count if channels == cachedChannels { @@ -300,14 +299,13 @@ func ValidateChannelCount(channels int) error { } // Fast path: Check against cached max channels - cachedMaxChannels := int(cache.maxChannels.Load()) + cachedMaxChannels := cache.MaxChannels if cachedMaxChannels > 0 && channels <= cachedMaxChannels { return nil } - // Slow path: Update cache and validate - cache.Update() - updatedMaxChannels := int(cache.maxChannels.Load()) + // Slow path: Use current config values + updatedMaxChannels := cache.MaxChannels if channels > updatedMaxChannels { return fmt.Errorf("%w: channel count %d exceeds maximum %d", ErrInvalidChannels, channels, updatedMaxChannels) @@ -323,9 +321,9 @@ func ValidateBitrate(bitrate int) error { } // Fast path: Check against cached bitrate values - cache := GetCachedConfig() - minBitrate := int(cache.minOpusBitrate.Load()) - maxBitrate := int(cache.maxOpusBitrate.Load()) + cache := Config + minBitrate := cache.MinOpusBitrate + maxBitrate := cache.MaxOpusBitrate // If we have valid cached values, use them if minBitrate > 0 && maxBitrate > 0 { @@ -343,7 +341,7 @@ func ValidateBitrate(bitrate int) error { } // Slower path: full validation with GetConfig() - config := GetConfig() + config := Config // Convert kbps to bps for comparison with config limits bitrateInBps := bitrate * 1000 if bitrateInBps < config.MinOpusBitrate { @@ -365,11 +363,11 @@ func ValidateFrameDuration(duration time.Duration) error { } // Fast path: Check against cached frame size first - cache := GetCachedConfig() + cache := Config // Convert frameSize (samples) to duration for comparison - cachedFrameSize := int(cache.frameSize.Load()) - cachedSampleRate := int(cache.sampleRate.Load()) + cachedFrameSize := cache.FrameSize + cachedSampleRate := cache.SampleRate // Only do this calculation if we have valid cached values if cachedFrameSize > 0 && cachedSampleRate > 0 { @@ -382,8 +380,8 @@ func ValidateFrameDuration(duration time.Duration) error { } // Fast path: Check against cached min/max frame duration - cachedMinDuration := time.Duration(cache.minFrameDuration.Load()) - cachedMaxDuration := time.Duration(cache.maxFrameDuration.Load()) + cachedMinDuration := time.Duration(cache.MinFrameDuration) + cachedMaxDuration := time.Duration(cache.MaxFrameDuration) if cachedMinDuration > 0 && cachedMaxDuration > 0 { if duration < cachedMinDuration { @@ -397,10 +395,9 @@ func ValidateFrameDuration(duration time.Duration) error { return nil } - // Slow path: Update cache and validate - cache.Update() - updatedMinDuration := time.Duration(cache.minFrameDuration.Load()) - updatedMaxDuration := time.Duration(cache.maxFrameDuration.Load()) + // Slow path: Use current config values + updatedMinDuration := time.Duration(cache.MinFrameDuration) + updatedMaxDuration := time.Duration(cache.MaxFrameDuration) if duration < updatedMinDuration { return fmt.Errorf("%w: frame duration %v below minimum %v", @@ -417,11 +414,11 @@ func ValidateFrameDuration(duration time.Duration) error { // Uses optimized validation functions that leverage AudioConfigCache func ValidateAudioConfigComplete(config AudioConfig) error { // Fast path: Check if all values match the current cached configuration - cache := GetCachedConfig() - cachedSampleRate := int(cache.sampleRate.Load()) - cachedChannels := int(cache.channels.Load()) - cachedBitrate := int(cache.opusBitrate.Load()) / 1000 // Convert from bps to kbps - cachedFrameSize := int(cache.frameSize.Load()) + cache := Config + cachedSampleRate := cache.SampleRate + cachedChannels := cache.Channels + cachedBitrate := cache.OpusBitrate / 1000 // Convert from bps to kbps + cachedFrameSize := cache.FrameSize // Only do this calculation if we have valid cached values if cachedSampleRate > 0 && cachedChannels > 0 && cachedBitrate > 0 && cachedFrameSize > 0 { @@ -481,11 +478,11 @@ var cachedMaxFrameSize int // InitValidationCache initializes cached validation values with actual config func InitValidationCache() { // Initialize the global cache variable for backward compatibility - config := GetConfig() + config := Config cachedMaxFrameSize = config.MaxAudioFrameSize - // Update the global audio config cache - GetCachedConfig().Update() + // Initialize the global audio config cache + cachedMaxFrameSize = Config.MaxAudioFrameSize } // ValidateAudioFrame validates audio frame data with cached max size for performance @@ -502,12 +499,11 @@ func ValidateAudioFrame(data []byte) error { maxSize := cachedMaxFrameSize if maxSize == 0 { // Fallback: get from cache only if global cache not initialized - cache := GetCachedConfig() - maxSize = int(cache.maxAudioFrameSize.Load()) + cache := Config + maxSize = cache.MaxAudioFrameSize if maxSize == 0 { - // Last resort: update cache and get fresh value - cache.Update() - maxSize = int(cache.maxAudioFrameSize.Load()) + // Last resort: get fresh value + maxSize = cache.MaxAudioFrameSize } // Cache the value globally for next calls cachedMaxFrameSize = maxSize diff --git a/internal/audio/goroutine_pool.go b/internal/audio/goroutine_pool.go index cfc844e0..23115a1d 100644 --- a/internal/audio/goroutine_pool.go +++ b/internal/audio/goroutine_pool.go @@ -255,7 +255,7 @@ func GetAudioProcessorPool() *GoroutinePool { } globalAudioProcessorInitOnce.Do(func() { - config := GetConfig() + config := Config newPool := NewGoroutinePool( "audio-processor", config.MaxAudioProcessorWorkers, @@ -277,7 +277,7 @@ func GetAudioReaderPool() *GoroutinePool { } globalAudioReaderInitOnce.Do(func() { - config := GetConfig() + config := Config newPool := NewGoroutinePool( "audio-reader", config.MaxAudioReaderWorkers, diff --git a/internal/audio/input_microphone_manager.go b/internal/audio/input_microphone_manager.go index f80cfd3f..5178f9f3 100644 --- a/internal/audio/input_microphone_manager.go +++ b/internal/audio/input_microphone_manager.go @@ -108,7 +108,7 @@ func (aim *AudioInputManager) WriteOpusFrame(frame []byte) error { processingTime := time.Since(startTime) // Log high latency warnings - if processingTime > time.Duration(GetConfig().InputProcessingTimeoutMS)*time.Millisecond { + if processingTime > time.Duration(Config.InputProcessingTimeoutMS)*time.Millisecond { latencyMs := float64(processingTime.Milliseconds()) aim.logger.Warn(). Float64("latency_ms", latencyMs). @@ -149,7 +149,7 @@ func (aim *AudioInputManager) WriteOpusFrameZeroCopy(frame *ZeroCopyAudioFrame) processingTime := time.Since(startTime) // Log high latency warnings - if processingTime > time.Duration(GetConfig().InputProcessingTimeoutMS)*time.Millisecond { + if processingTime > time.Duration(Config.InputProcessingTimeoutMS)*time.Millisecond { latencyMs := float64(processingTime.Milliseconds()) aim.logger.Warn(). Float64("latency_ms", latencyMs). diff --git a/internal/audio/input_server_main.go b/internal/audio/input_server_main.go index 889755c4..dc8b77e3 100644 --- a/internal/audio/input_server_main.go +++ b/internal/audio/input_server_main.go @@ -107,7 +107,7 @@ func RunAudioInputServer() error { server.Stop() // Give some time for cleanup - time.Sleep(GetConfig().DefaultSleepDuration) + time.Sleep(Config.DefaultSleepDuration) return nil } diff --git a/internal/audio/input_supervisor.go b/internal/audio/input_supervisor.go index 7311d094..70b63c88 100644 --- a/internal/audio/input_supervisor.go +++ b/internal/audio/input_supervisor.go @@ -73,7 +73,7 @@ func (ais *AudioInputSupervisor) supervisionLoop() { // Configure supervision parameters (no restart for input supervisor) config := SupervisionConfig{ ProcessType: "audio input server", - Timeout: GetConfig().InputSupervisorTimeout, + Timeout: Config.InputSupervisorTimeout, EnableRestart: false, // Input supervisor doesn't restart MaxRestartAttempts: 0, RestartWindow: 0, @@ -164,7 +164,7 @@ func (ais *AudioInputSupervisor) Stop() { select { case <-ais.processDone: ais.logger.Info().Str("component", "audio-input-supervisor").Msg("component stopped gracefully") - case <-time.After(GetConfig().InputSupervisorTimeout): + case <-time.After(Config.InputSupervisorTimeout): ais.logger.Warn().Str("component", "audio-input-supervisor").Msg("component did not stop gracefully, forcing termination") ais.forceKillProcess("audio input server") } @@ -190,7 +190,7 @@ func (ais *AudioInputSupervisor) GetClient() *AudioInputClient { // connectClient attempts to connect the client to the server func (ais *AudioInputSupervisor) connectClient() { // Wait briefly for the server to start and create socket - time.Sleep(GetConfig().DefaultSleepDuration) + time.Sleep(Config.DefaultSleepDuration) // Additional small delay to ensure socket is ready after restart time.Sleep(20 * time.Millisecond) diff --git a/internal/audio/ipc_common.go b/internal/audio/ipc_common.go index 38b595ec..4b2263d7 100644 --- a/internal/audio/ipc_common.go +++ b/internal/audio/ipc_common.go @@ -49,7 +49,7 @@ func NewGenericMessagePool(size int) *GenericMessagePool { pool.preallocated = make([]*OptimizedMessage, pool.preallocSize) for i := 0; i < pool.preallocSize; i++ { pool.preallocated[i] = &OptimizedMessage{ - data: make([]byte, 0, GetConfig().MaxFrameSize), + data: make([]byte, 0, Config.MaxFrameSize), } } @@ -57,7 +57,7 @@ func NewGenericMessagePool(size int) *GenericMessagePool { for i := 0; i < size-pool.preallocSize; i++ { select { case pool.pool <- &OptimizedMessage{ - data: make([]byte, 0, GetConfig().MaxFrameSize), + data: make([]byte, 0, Config.MaxFrameSize), }: default: break @@ -89,7 +89,7 @@ func (mp *GenericMessagePool) Get() *OptimizedMessage { // Pool empty, create new message atomic.AddInt64(&mp.missCount, 1) return &OptimizedMessage{ - data: make([]byte, 0, GetConfig().MaxFrameSize), + data: make([]byte, 0, Config.MaxFrameSize), } } } @@ -149,7 +149,7 @@ func WriteIPCMessage(conn net.Conn, msg IPCMessage, pool *GenericMessagePool, dr binary.LittleEndian.PutUint64(optMsg.header[9:17], uint64(msg.GetTimestamp())) // Set write deadline for timeout handling (more efficient than goroutines) - if deadline := time.Now().Add(GetConfig().WriteTimeout); deadline.After(time.Now()) { + if deadline := time.Now().Add(Config.WriteTimeout); deadline.After(time.Now()) { if err := conn.SetWriteDeadline(deadline); err != nil { // If we can't set deadline, proceed without it // This maintains compatibility with connections that don't support deadlines diff --git a/internal/audio/ipc_input.go b/internal/audio/ipc_input.go index 56d0e8f9..730d2478 100644 --- a/internal/audio/ipc_input.go +++ b/internal/audio/ipc_input.go @@ -23,8 +23,8 @@ const ( // Constants are now defined in unified_ipc.go var ( - maxFrameSize = GetConfig().MaxFrameSize // Maximum Opus frame size - messagePoolSize = GetConfig().MessagePoolSize // Pre-allocated message pool size + maxFrameSize = Config.MaxFrameSize // Maximum Opus frame size + messagePoolSize = Config.MessagePoolSize // Pre-allocated message pool size ) // Legacy aliases for backward compatibility @@ -77,7 +77,7 @@ func initializeMessagePool() { messagePoolInitOnce.Do(func() { preallocSize := messagePoolSize / 4 // 25% pre-allocated for immediate use globalMessagePool.preallocSize = preallocSize - globalMessagePool.maxPoolSize = messagePoolSize * GetConfig().PoolGrowthMultiplier // Allow growth up to 2x + globalMessagePool.maxPoolSize = messagePoolSize * Config.PoolGrowthMultiplier // Allow growth up to 2x globalMessagePool.preallocated = make([]*OptimizedIPCMessage, 0, preallocSize) // Pre-allocate messages for immediate use @@ -378,7 +378,7 @@ func (ais *AudioInputServer) handleConnection(conn net.Conn) { if ais.conn == nil { return } - time.Sleep(GetConfig().DefaultSleepDuration) + time.Sleep(Config.DefaultSleepDuration) } } } @@ -499,11 +499,11 @@ func (ais *AudioInputServer) processOpusFrame(data []byte) error { } // Get cached config once - avoid repeated calls and locking - cache := GetCachedConfig() + cache := Config // Skip cache expiry check in hotpath - background updates handle this // Get a PCM buffer from the pool for optimized decode-write - pcmBuffer := GetBufferFromPool(cache.GetMaxPCMBufferSize()) + pcmBuffer := GetBufferFromPool(cache.MaxPCMBufferSize) defer ReturnBufferToPool(pcmBuffer) // Direct CGO call - avoid wrapper function overhead @@ -646,9 +646,9 @@ func (aic *AudioInputClient) Connect() error { return nil } // Exponential backoff starting from config - backoffStart := GetConfig().BackoffStart + backoffStart := Config.BackoffStart delay := time.Duration(backoffStart.Nanoseconds()*(1< maxDelay { delay = maxDelay } @@ -911,10 +911,10 @@ func (ais *AudioInputServer) startReaderGoroutine() { // Enhanced error tracking and recovery var consecutiveErrors int var lastErrorTime time.Time - maxConsecutiveErrors := GetConfig().MaxConsecutiveErrors - errorResetWindow := GetConfig().RestartWindow // Use existing restart window - baseBackoffDelay := GetConfig().RetryDelay - maxBackoffDelay := GetConfig().MaxRetryDelay + maxConsecutiveErrors := Config.MaxConsecutiveErrors + errorResetWindow := Config.RestartWindow // Use existing restart window + baseBackoffDelay := Config.RetryDelay + maxBackoffDelay := Config.MaxRetryDelay logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger() @@ -1025,7 +1025,7 @@ func (ais *AudioInputServer) startProcessorGoroutine() { processorTask := func() { // Only lock OS thread and set priority for high-load scenarios // This reduces interference with input processing threads - config := GetConfig() + config := Config useThreadOptimizations := config.MaxAudioProcessorWorkers > 8 if useThreadOptimizations { @@ -1137,7 +1137,7 @@ func (ais *AudioInputServer) processMessageWithRecovery(msg *InputIPCMessage, lo select { case processChan <- msg: return nil - case <-time.After(GetConfig().WriteTimeout): + case <-time.After(Config.WriteTimeout): // Processing queue full and timeout reached, drop frame atomic.AddInt64(&ais.droppedFrames, 1) return fmt.Errorf("processing queue timeout") @@ -1156,7 +1156,7 @@ func (ais *AudioInputServer) startMonitorGoroutine() { monitorTask := func() { // Monitor goroutine doesn't need thread locking for most scenarios // Only use thread optimizations for high-throughput scenarios - config := GetConfig() + config := Config useThreadOptimizations := config.MaxAudioProcessorWorkers > 8 if useThreadOptimizations { @@ -1167,11 +1167,11 @@ func (ais *AudioInputServer) startMonitorGoroutine() { } defer ais.wg.Done() - ticker := time.NewTicker(GetConfig().DefaultTickerInterval) + ticker := time.NewTicker(Config.DefaultTickerInterval) defer ticker.Stop() // Buffer size update ticker (less frequent) - bufferUpdateTicker := time.NewTicker(GetConfig().BufferUpdateInterval) + bufferUpdateTicker := time.NewTicker(Config.BufferUpdateInterval) defer bufferUpdateTicker.Stop() for { @@ -1330,7 +1330,7 @@ func (mp *MessagePool) GetMessagePoolStats() MessagePoolStats { var hitRate float64 if totalRequests > 0 { - hitRate = float64(hitCount) / float64(totalRequests) * GetConfig().PercentageMultiplier + hitRate = float64(hitCount) / float64(totalRequests) * Config.PercentageMultiplier } // Calculate channel pool size diff --git a/internal/audio/ipc_output.go b/internal/audio/ipc_output.go index ccef9318..473b7f70 100644 --- a/internal/audio/ipc_output.go +++ b/internal/audio/ipc_output.go @@ -24,7 +24,7 @@ const ( // Methods are now inherited from UnifiedIPCMessage // Global shared message pool for output IPC client header reading -var globalOutputClientMessagePool = NewGenericMessagePool(GetConfig().OutputMessagePoolSize) +var globalOutputClientMessagePool = NewGenericMessagePool(Config.OutputMessagePoolSize) // AudioOutputServer is now an alias for UnifiedAudioServer type AudioOutputServer = UnifiedAudioServer @@ -95,7 +95,7 @@ func (c *AudioOutputClient) ReceiveFrame() ([]byte, error) { } size := binary.LittleEndian.Uint32(optMsg.header[5:9]) - maxFrameSize := GetConfig().OutputMaxFrameSize + maxFrameSize := Config.OutputMaxFrameSize if int(size) > maxFrameSize { return nil, fmt.Errorf("received frame size validation failed: got %d bytes, maximum allowed %d bytes", size, maxFrameSize) } diff --git a/internal/audio/ipc_unified.go b/internal/audio/ipc_unified.go index ada7faf0..dec68352 100644 --- a/internal/audio/ipc_unified.go +++ b/internal/audio/ipc_unified.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "fmt" "io" + "math" "net" "os" "path/filepath" @@ -17,8 +18,8 @@ import ( // Unified IPC constants var ( - outputMagicNumber uint32 = GetConfig().OutputMagicNumber // "JKOU" (JetKVM Output) - inputMagicNumber uint32 = GetConfig().InputMagicNumber // "JKMI" (JetKVM Microphone Input) + outputMagicNumber uint32 = Config.OutputMagicNumber // "JKOU" (JetKVM Output) + inputMagicNumber uint32 = Config.InputMagicNumber // "JKMI" (JetKVM Microphone Input) outputSocketName = "audio_output.sock" inputSocketName = "audio_input.sock" headerSize = 17 // Fixed header size: 4+1+4+8 bytes @@ -144,8 +145,8 @@ func NewUnifiedAudioServer(isInput bool) (*UnifiedAudioServer, error) { logger: logger, socketPath: socketPath, magicNumber: magicNumber, - messageChan: make(chan *UnifiedIPCMessage, GetConfig().ChannelBufferSize), - processChan: make(chan *UnifiedIPCMessage, GetConfig().ChannelBufferSize), + messageChan: make(chan *UnifiedIPCMessage, Config.ChannelBufferSize), + processChan: make(chan *UnifiedIPCMessage, Config.ChannelBufferSize), socketBufferConfig: DefaultSocketBufferConfig(), latencyMonitor: nil, adaptiveOptimizer: nil, @@ -311,7 +312,7 @@ func (s *UnifiedAudioServer) readMessage(conn net.Conn) (*UnifiedIPCMessage, err timestamp := int64(binary.LittleEndian.Uint64(header[9:17])) // Validate length - if length > uint32(GetConfig().MaxFrameSize) { + if length > uint32(Config.MaxFrameSize) { return nil, fmt.Errorf("message too large: %d bytes", length) } @@ -339,7 +340,10 @@ func (s *UnifiedAudioServer) SendFrame(frame []byte) error { defer s.mtx.Unlock() if !s.running || s.conn == nil { - return fmt.Errorf("no client connected") + // Silently drop frames when no client is connected + // This prevents "no client connected" warnings during startup and quality changes + atomic.AddInt64(&s.droppedFrames, 1) + return nil // Return nil to avoid flooding logs with connection warnings } start := time.Now() @@ -398,7 +402,7 @@ func (s *UnifiedAudioServer) writeMessage(conn net.Conn, msg *UnifiedIPCMessage) // UnifiedAudioClient provides common functionality for both input and output clients type UnifiedAudioClient struct { - // Atomic fields first for ARM32 alignment + // Atomic counters for frame statistics droppedFrames int64 // Atomic counter for dropped frames totalFrames int64 // Atomic counter for total frames @@ -409,6 +413,13 @@ type UnifiedAudioClient struct { socketPath string magicNumber uint32 bufferPool *AudioBufferPool // Buffer pool for memory optimization + + // Connection health monitoring + lastHealthCheck time.Time + connectionErrors int64 // Atomic counter for connection errors + autoReconnect bool // Enable automatic reconnection + healthCheckTicker *time.Ticker + stopHealthCheck chan struct{} } // NewUnifiedAudioClient creates a new unified audio client @@ -430,10 +441,12 @@ func NewUnifiedAudioClient(isInput bool) *UnifiedAudioClient { logger := logging.GetDefaultLogger().With().Str("component", componentName).Logger() return &UnifiedAudioClient{ - logger: logger, - socketPath: socketPath, - magicNumber: magicNumber, - bufferPool: NewAudioBufferPool(GetConfig().MaxFrameSize), + logger: logger, + socketPath: socketPath, + magicNumber: magicNumber, + bufferPool: NewAudioBufferPool(Config.MaxFrameSize), + autoReconnect: true, // Enable automatic reconnection by default + stopHealthCheck: make(chan struct{}), } } @@ -453,32 +466,46 @@ func (c *UnifiedAudioClient) Connect() error { } // Try connecting multiple times as the server might not be ready - // Reduced retry count and delay for faster startup - for i := 0; i < 10; i++ { - conn, err := net.Dial("unix", c.socketPath) + // Use configurable retry parameters for better control + maxAttempts := Config.MaxConnectionAttempts + initialDelay := Config.ConnectionRetryDelay + maxDelay := Config.MaxConnectionRetryDelay + backoffFactor := Config.ConnectionBackoffFactor + + for i := 0; i < maxAttempts; i++ { + // Set connection timeout for each attempt + conn, err := net.DialTimeout("unix", c.socketPath, Config.ConnectionTimeoutDelay) if err == nil { c.conn = conn c.running = true // Reset frame counters on successful connection atomic.StoreInt64(&c.totalFrames, 0) atomic.StoreInt64(&c.droppedFrames, 0) - c.logger.Info().Str("socket_path", c.socketPath).Msg("Connected to server") + atomic.StoreInt64(&c.connectionErrors, 0) + c.lastHealthCheck = time.Now() + // Start health check monitoring if auto-reconnect is enabled + if c.autoReconnect { + c.startHealthCheck() + } + c.logger.Info().Str("socket_path", c.socketPath).Int("attempt", i+1).Msg("Connected to server") return nil } - // Exponential backoff starting from config - backoffStart := GetConfig().BackoffStart - delay := time.Duration(backoffStart.Nanoseconds()*(1< maxDelay { - delay = maxDelay + + // Log connection attempt failure + c.logger.Debug().Err(err).Str("socket_path", c.socketPath).Int("attempt", i+1).Int("max_attempts", maxAttempts).Msg("Connection attempt failed") + + // Don't sleep after the last attempt + if i < maxAttempts-1 { + // Calculate adaptive delay based on connection failure patterns + delay := c.calculateAdaptiveDelay(i, initialDelay, maxDelay, backoffFactor) + time.Sleep(delay) } - time.Sleep(delay) } // Ensure clean state on connection failure c.conn = nil c.running = false - return fmt.Errorf("failed to connect to audio server after 10 attempts") + return fmt.Errorf("failed to connect to audio server after %d attempts", Config.MaxConnectionAttempts) } // Disconnect disconnects the client from the server @@ -492,6 +519,9 @@ func (c *UnifiedAudioClient) Disconnect() { c.running = false + // Stop health check monitoring + c.stopHealthCheckMonitoring() + if c.conn != nil { c.conn.Close() c.conn = nil @@ -511,7 +541,122 @@ func (c *UnifiedAudioClient) IsConnected() bool { func (c *UnifiedAudioClient) GetFrameStats() (total, dropped int64) { total = atomic.LoadInt64(&c.totalFrames) dropped = atomic.LoadInt64(&c.droppedFrames) - return total, dropped + return +} + +// startHealthCheck starts the connection health monitoring +func (c *UnifiedAudioClient) startHealthCheck() { + if c.healthCheckTicker != nil { + c.healthCheckTicker.Stop() + } + + c.healthCheckTicker = time.NewTicker(Config.HealthCheckInterval) + go func() { + for { + select { + case <-c.healthCheckTicker.C: + c.performHealthCheck() + case <-c.stopHealthCheck: + return + } + } + }() +} + +// stopHealthCheckMonitoring stops the health check monitoring +func (c *UnifiedAudioClient) stopHealthCheckMonitoring() { + if c.healthCheckTicker != nil { + c.healthCheckTicker.Stop() + c.healthCheckTicker = nil + } + select { + case c.stopHealthCheck <- struct{}{}: + default: + } +} + +// performHealthCheck checks the connection health and attempts reconnection if needed +func (c *UnifiedAudioClient) performHealthCheck() { + c.mtx.Lock() + defer c.mtx.Unlock() + + if !c.running || c.conn == nil { + return + } + + // Simple health check: try to get connection info + if tcpConn, ok := c.conn.(*net.UnixConn); ok { + if _, err := tcpConn.File(); err != nil { + // Connection is broken + atomic.AddInt64(&c.connectionErrors, 1) + c.logger.Warn().Err(err).Msg("Connection health check failed, attempting reconnection") + + // Close the broken connection + c.conn.Close() + c.conn = nil + c.running = false + + // Attempt reconnection + go func() { + time.Sleep(Config.ReconnectionInterval) + if err := c.Connect(); err != nil { + c.logger.Error().Err(err).Msg("Failed to reconnect during health check") + } + }() + } + } + + c.lastHealthCheck = time.Now() +} + +// SetAutoReconnect enables or disables automatic reconnection +func (c *UnifiedAudioClient) SetAutoReconnect(enabled bool) { + c.mtx.Lock() + defer c.mtx.Unlock() + + c.autoReconnect = enabled + if !enabled { + c.stopHealthCheckMonitoring() + } else if c.running { + c.startHealthCheck() + } +} + +// GetConnectionErrors returns the number of connection errors +func (c *UnifiedAudioClient) GetConnectionErrors() int64 { + return atomic.LoadInt64(&c.connectionErrors) +} + +// calculateAdaptiveDelay calculates retry delay based on system load and failure patterns +func (c *UnifiedAudioClient) calculateAdaptiveDelay(attempt int, initialDelay, maxDelay time.Duration, backoffFactor float64) time.Duration { + // Base exponential backoff + baseDelay := time.Duration(float64(initialDelay.Nanoseconds()) * math.Pow(backoffFactor, float64(attempt))) + + // Get connection error history for adaptive adjustment + errorCount := atomic.LoadInt64(&c.connectionErrors) + + // Adjust delay based on recent connection errors + // More errors = longer delays to avoid overwhelming the server + adaptiveFactor := 1.0 + if errorCount > 5 { + adaptiveFactor = 1.5 // 50% longer delays after many errors + } else if errorCount > 10 { + adaptiveFactor = 2.0 // Double delays after excessive errors + } + + // Apply adaptive factor + adaptiveDelay := time.Duration(float64(baseDelay.Nanoseconds()) * adaptiveFactor) + + // Ensure we don't exceed maximum delay + if adaptiveDelay > maxDelay { + adaptiveDelay = maxDelay + } + + // Add small random jitter to avoid thundering herd + jitter := time.Duration(float64(adaptiveDelay.Nanoseconds()) * 0.1 * (0.5 + float64(attempt%3)/6.0)) + adaptiveDelay += jitter + + return adaptiveDelay } // Helper functions for socket paths diff --git a/internal/audio/mgmt_input_ipc_manager.go b/internal/audio/mgmt_input_ipc_manager.go index 5fa84660..d59e6f6b 100644 --- a/internal/audio/mgmt_input_ipc_manager.go +++ b/internal/audio/mgmt_input_ipc_manager.go @@ -63,9 +63,9 @@ func (aim *AudioInputIPCManager) Start() error { } config := InputIPCConfig{ - SampleRate: GetConfig().InputIPCSampleRate, - Channels: GetConfig().InputIPCChannels, - FrameSize: GetConfig().InputIPCFrameSize, + SampleRate: Config.InputIPCSampleRate, + Channels: Config.InputIPCChannels, + FrameSize: Config.InputIPCFrameSize, } // Validate configuration before using it @@ -80,7 +80,7 @@ func (aim *AudioInputIPCManager) Start() error { } // Wait for subprocess readiness - time.Sleep(GetConfig().LongSleepDuration) + time.Sleep(Config.LongSleepDuration) err = aim.supervisor.SendConfig(config) if err != nil { diff --git a/internal/audio/mgmt_output_ipc_manager.go b/internal/audio/mgmt_output_ipc_manager.go index 381be1c9..145c1c20 100644 --- a/internal/audio/mgmt_output_ipc_manager.go +++ b/internal/audio/mgmt_output_ipc_manager.go @@ -57,9 +57,9 @@ func (aom *AudioOutputIPCManager) Start() error { // Send initial configuration config := OutputIPCConfig{ - SampleRate: GetConfig().SampleRate, - Channels: GetConfig().Channels, - FrameSize: int(GetConfig().AudioQualityMediumFrameSize.Milliseconds()), + SampleRate: Config.SampleRate, + Channels: Config.Channels, + FrameSize: int(Config.AudioQualityMediumFrameSize.Milliseconds()), } if err := aom.SendConfig(config); err != nil { diff --git a/internal/audio/mic_contention.go b/internal/audio/mic_contention.go index 373d656a..08d60d3c 100644 --- a/internal/audio/mic_contention.go +++ b/internal/audio/mic_contention.go @@ -105,7 +105,7 @@ func GetMicrophoneContentionManager() *MicrophoneContentionManager { } if atomic.CompareAndSwapInt32(&micContentionInitialized, 0, 1) { - manager := NewMicrophoneContentionManager(GetConfig().MicContentionTimeout) + manager := NewMicrophoneContentionManager(Config.MicContentionTimeout) atomic.StorePointer(&globalMicContentionManager, unsafe.Pointer(manager)) return manager } @@ -115,7 +115,7 @@ func GetMicrophoneContentionManager() *MicrophoneContentionManager { return (*MicrophoneContentionManager)(ptr) } - return NewMicrophoneContentionManager(GetConfig().MicContentionTimeout) + return NewMicrophoneContentionManager(Config.MicContentionTimeout) } func TryMicrophoneOperation() OperationResult { diff --git a/internal/audio/monitor_adaptive_optimizer.go b/internal/audio/monitor_adaptive_optimizer.go index 05c4ae5e..fe0b81f0 100644 --- a/internal/audio/monitor_adaptive_optimizer.go +++ b/internal/audio/monitor_adaptive_optimizer.go @@ -64,10 +64,10 @@ type OptimizerConfig struct { func DefaultOptimizerConfig() OptimizerConfig { return OptimizerConfig{ MaxOptimizationLevel: 8, - CooldownPeriod: GetConfig().CooldownPeriod, - Aggressiveness: GetConfig().OptimizerAggressiveness, - RollbackThreshold: GetConfig().RollbackThreshold, - StabilityPeriod: GetConfig().AdaptiveOptimizerStability, + CooldownPeriod: Config.CooldownPeriod, + Aggressiveness: Config.OptimizerAggressiveness, + RollbackThreshold: Config.RollbackThreshold, + StabilityPeriod: Config.AdaptiveOptimizerStability, // Adaptive interval defaults MinOptimizationInterval: 100 * time.Millisecond, // High stability: check every 100ms @@ -142,7 +142,7 @@ func (ao *AdaptiveOptimizer) handleLatencyOptimization(metrics LatencyMetrics) e // calculateTargetOptimizationLevel determines the appropriate optimization level func (ao *AdaptiveOptimizer) calculateTargetOptimizationLevel(metrics LatencyMetrics) int64 { // Base calculation on current latency vs target - latencyRatio := float64(metrics.Current) / float64(GetConfig().AdaptiveOptimizerLatencyTarget) // 50ms target + latencyRatio := float64(metrics.Current) / float64(Config.AdaptiveOptimizerLatencyTarget) // 50ms target // Adjust based on trend switch metrics.Trend { @@ -158,7 +158,7 @@ func (ao *AdaptiveOptimizer) calculateTargetOptimizationLevel(metrics LatencyMet latencyRatio *= ao.config.Aggressiveness // Convert to optimization level - targetLevel := int64(latencyRatio * GetConfig().LatencyScalingFactor) // Scale to 0-10 range + targetLevel := int64(latencyRatio * Config.LatencyScalingFactor) // Scale to 0-10 range if targetLevel > int64(ao.config.MaxOptimizationLevel) { targetLevel = int64(ao.config.MaxOptimizationLevel) } diff --git a/internal/audio/monitor_goroutine.go b/internal/audio/monitor_goroutine.go index 00dd3743..fa2c8d8d 100644 --- a/internal/audio/monitor_goroutine.go +++ b/internal/audio/monitor_goroutine.go @@ -126,7 +126,7 @@ func (gm *GoroutineMonitor) GetGoroutineStats() map[string]interface{} { // GetGoroutineMonitor returns the global goroutine monitor instance func GetGoroutineMonitor() *GoroutineMonitor { if globalGoroutineMonitor == nil { - globalGoroutineMonitor = NewGoroutineMonitor(GetConfig().GoroutineMonitorInterval) + globalGoroutineMonitor = NewGoroutineMonitor(Config.GoroutineMonitorInterval) } return globalGoroutineMonitor } diff --git a/internal/audio/monitor_latency.go b/internal/audio/monitor_latency.go index 40b2381d..e44c4c08 100644 --- a/internal/audio/monitor_latency.go +++ b/internal/audio/monitor_latency.go @@ -81,7 +81,7 @@ const ( // DefaultLatencyConfig returns a sensible default configuration func DefaultLatencyConfig() LatencyConfig { - config := GetConfig() + config := Config return LatencyConfig{ TargetLatency: config.LatencyMonitorTarget, MaxLatency: config.MaxLatencyThreshold, diff --git a/internal/audio/monitor_process.go b/internal/audio/monitor_process.go index 30bd0bb0..aa898347 100644 --- a/internal/audio/monitor_process.go +++ b/internal/audio/monitor_process.go @@ -16,26 +16,26 @@ import ( // Variables for process monitoring (using configuration) var ( // System constants - maxCPUPercent = GetConfig().MaxCPUPercent - minCPUPercent = GetConfig().MinCPUPercent - defaultClockTicks = GetConfig().DefaultClockTicks - defaultMemoryGB = GetConfig().DefaultMemoryGB + maxCPUPercent = Config.MaxCPUPercent + minCPUPercent = Config.MinCPUPercent + defaultClockTicks = Config.DefaultClockTicks + defaultMemoryGB = Config.DefaultMemoryGB // Monitoring thresholds - maxWarmupSamples = GetConfig().MaxWarmupSamples - warmupCPUSamples = GetConfig().WarmupCPUSamples + maxWarmupSamples = Config.MaxWarmupSamples + warmupCPUSamples = Config.WarmupCPUSamples // Channel buffer size - metricsChannelBuffer = GetConfig().MetricsChannelBuffer + metricsChannelBuffer = Config.MetricsChannelBuffer // Clock tick detection ranges - minValidClockTicks = float64(GetConfig().MinValidClockTicks) - maxValidClockTicks = float64(GetConfig().MaxValidClockTicks) + minValidClockTicks = float64(Config.MinValidClockTicks) + maxValidClockTicks = float64(Config.MaxValidClockTicks) ) // Variables for process monitoring var ( - pageSize = GetConfig().PageSize + pageSize = Config.PageSize ) // ProcessMetrics represents CPU and memory usage metrics for a process @@ -233,7 +233,7 @@ func (pm *ProcessMonitor) collectMetrics(pid int, state *processState) (ProcessM // Calculate memory percentage (RSS / total system memory) if totalMem := pm.getTotalMemory(); totalMem > 0 { - metric.MemoryPercent = float64(metric.MemoryRSS) / float64(totalMem) * GetConfig().PercentageMultiplier + metric.MemoryPercent = float64(metric.MemoryRSS) / float64(totalMem) * Config.PercentageMultiplier } // Update state for next calculation @@ -283,7 +283,7 @@ func (pm *ProcessMonitor) calculateCPUPercent(totalCPUTime int64, state *process // Convert from clock ticks to seconds using actual system clock ticks clockTicks := pm.getClockTicks() cpuSeconds := cpuDelta / clockTicks - cpuPercent := (cpuSeconds / timeDelta) * GetConfig().PercentageMultiplier + cpuPercent := (cpuSeconds / timeDelta) * Config.PercentageMultiplier // Apply bounds if cpuPercent > maxCPUPercent { @@ -335,7 +335,7 @@ func (pm *ProcessMonitor) getClockTicks() float64 { if len(fields) >= 2 { if period, err := strconv.ParseInt(fields[1], 10, 64); err == nil && period > 0 { // Convert nanoseconds to Hz - hz := GetConfig().CGONanosecondsPerSecond / float64(period) + hz := Config.CGONanosecondsPerSecond / float64(period) if hz >= minValidClockTicks && hz <= maxValidClockTicks { pm.clockTicks = hz return @@ -363,7 +363,7 @@ func (pm *ProcessMonitor) getTotalMemory() int64 { pm.memoryOnce.Do(func() { file, err := os.Open("/proc/meminfo") if err != nil { - pm.totalMemory = int64(defaultMemoryGB) * int64(GetConfig().ProcessMonitorKBToBytes) * int64(GetConfig().ProcessMonitorKBToBytes) * int64(GetConfig().ProcessMonitorKBToBytes) + pm.totalMemory = int64(defaultMemoryGB) * int64(Config.ProcessMonitorKBToBytes) * int64(Config.ProcessMonitorKBToBytes) * int64(Config.ProcessMonitorKBToBytes) return } defer file.Close() @@ -375,14 +375,14 @@ func (pm *ProcessMonitor) getTotalMemory() int64 { fields := strings.Fields(line) if len(fields) >= 2 { if kb, err := strconv.ParseInt(fields[1], 10, 64); err == nil { - pm.totalMemory = kb * int64(GetConfig().ProcessMonitorKBToBytes) + pm.totalMemory = kb * int64(Config.ProcessMonitorKBToBytes) return } } break } } - pm.totalMemory = int64(defaultMemoryGB) * int64(GetConfig().ProcessMonitorKBToBytes) * int64(GetConfig().ProcessMonitorKBToBytes) * int64(GetConfig().ProcessMonitorKBToBytes) // Fallback + pm.totalMemory = int64(defaultMemoryGB) * int64(Config.ProcessMonitorKBToBytes) * int64(Config.ProcessMonitorKBToBytes) * int64(Config.ProcessMonitorKBToBytes) // Fallback }) return pm.totalMemory } diff --git a/internal/audio/output_server_main.go b/internal/audio/output_server_main.go index aa229e61..1b536e10 100644 --- a/internal/audio/output_server_main.go +++ b/internal/audio/output_server_main.go @@ -70,7 +70,7 @@ func RunAudioOutputServer() error { StopNonBlockingAudioStreaming() // Give some time for cleanup - time.Sleep(GetConfig().DefaultSleepDuration) + time.Sleep(Config.DefaultSleepDuration) return nil } diff --git a/internal/audio/output_streaming.go b/internal/audio/output_streaming.go index 4d3b3b34..2560d4be 100644 --- a/internal/audio/output_streaming.go +++ b/internal/audio/output_streaming.go @@ -84,9 +84,9 @@ func StartAudioOutputStreaming(send func([]byte)) error { buffer := make([]byte, GetMaxAudioFrameSize()) consecutiveErrors := 0 - maxConsecutiveErrors := GetConfig().MaxConsecutiveErrors - errorBackoffDelay := GetConfig().RetryDelay - maxErrorBackoff := GetConfig().MaxRetryDelay + maxConsecutiveErrors := Config.MaxConsecutiveErrors + errorBackoffDelay := Config.RetryDelay + maxErrorBackoff := Config.MaxRetryDelay for { select { @@ -123,18 +123,18 @@ func StartAudioOutputStreaming(send func([]byte)) error { Err(initErr). Msg("Failed to reinitialize audio system") // Exponential backoff for reinitialization failures - errorBackoffDelay = time.Duration(float64(errorBackoffDelay) * GetConfig().BackoffMultiplier) + errorBackoffDelay = time.Duration(float64(errorBackoffDelay) * Config.BackoffMultiplier) if errorBackoffDelay > maxErrorBackoff { errorBackoffDelay = maxErrorBackoff } } else { getOutputStreamingLogger().Info().Msg("Audio system reinitialized successfully") consecutiveErrors = 0 - errorBackoffDelay = GetConfig().RetryDelay // Reset backoff + errorBackoffDelay = Config.RetryDelay // Reset backoff } } else { // Brief delay for transient errors - time.Sleep(GetConfig().ShortSleepDuration) + time.Sleep(Config.ShortSleepDuration) } continue } @@ -142,7 +142,7 @@ func StartAudioOutputStreaming(send func([]byte)) error { // Success - reset error counters if consecutiveErrors > 0 { consecutiveErrors = 0 - errorBackoffDelay = GetConfig().RetryDelay + errorBackoffDelay = Config.RetryDelay } if n > 0 { @@ -164,7 +164,7 @@ func StartAudioOutputStreaming(send func([]byte)) error { RecordFrameReceived(n) } // Small delay to prevent busy waiting - time.Sleep(GetConfig().ShortSleepDuration) + time.Sleep(Config.ShortSleepDuration) } } }() @@ -185,6 +185,6 @@ func StopAudioOutputStreaming() { // Wait for streaming to stop for atomic.LoadInt32(&outputStreamingRunning) == 1 { - time.Sleep(GetConfig().ShortSleepDuration) + time.Sleep(Config.ShortSleepDuration) } } diff --git a/internal/audio/output_supervisor.go b/internal/audio/output_supervisor.go index 1abbca66..e4888b01 100644 --- a/internal/audio/output_supervisor.go +++ b/internal/audio/output_supervisor.go @@ -19,19 +19,19 @@ const ( // Restart configuration is now retrieved from centralized config func getMaxRestartAttempts() int { - return GetConfig().MaxRestartAttempts + return Config.MaxRestartAttempts } func getRestartWindow() time.Duration { - return GetConfig().RestartWindow + return Config.RestartWindow } func getRestartDelay() time.Duration { - return GetConfig().RestartDelay + return Config.RestartDelay } func getMaxRestartDelay() time.Duration { - return GetConfig().MaxRestartDelay + return Config.MaxRestartDelay } // AudioOutputSupervisor manages the audio output server subprocess lifecycle @@ -145,7 +145,7 @@ func (s *AudioOutputSupervisor) Stop() { select { case <-s.processDone: s.logger.Info().Str("component", AudioOutputSupervisorComponent).Msg("component stopped gracefully") - case <-time.After(GetConfig().OutputSupervisorTimeout): + case <-time.After(Config.OutputSupervisorTimeout): s.logger.Warn().Str("component", AudioOutputSupervisorComponent).Msg("component did not stop gracefully, forcing termination") s.forceKillProcess("audio output server") } @@ -158,7 +158,7 @@ func (s *AudioOutputSupervisor) supervisionLoop() { // Configure supervision parameters config := SupervisionConfig{ ProcessType: "audio output server", - Timeout: GetConfig().OutputSupervisorTimeout, + Timeout: Config.OutputSupervisorTimeout, EnableRestart: true, MaxRestartAttempts: getMaxRestartAttempts(), RestartWindow: getRestartWindow(), diff --git a/internal/audio/quality_presets.go b/internal/audio/quality_presets.go index 53b58f24..8117aa1f 100644 --- a/internal/audio/quality_presets.go +++ b/internal/audio/quality_presets.go @@ -39,7 +39,7 @@ var ( // MaxAudioFrameSize is now retrieved from centralized config func GetMaxAudioFrameSize() int { - return GetConfig().MaxAudioFrameSize + return Config.MaxAudioFrameSize } // AudioQuality represents different audio quality presets @@ -74,17 +74,17 @@ type AudioMetrics struct { var ( currentConfig = AudioConfig{ Quality: AudioQualityMedium, - Bitrate: GetConfig().AudioQualityMediumOutputBitrate, - SampleRate: GetConfig().SampleRate, - Channels: GetConfig().Channels, - FrameSize: GetConfig().AudioQualityMediumFrameSize, + Bitrate: Config.AudioQualityMediumOutputBitrate, + SampleRate: Config.SampleRate, + Channels: Config.Channels, + FrameSize: Config.AudioQualityMediumFrameSize, } currentMicrophoneConfig = AudioConfig{ Quality: AudioQualityMedium, - Bitrate: GetConfig().AudioQualityMediumInputBitrate, - SampleRate: GetConfig().SampleRate, + Bitrate: Config.AudioQualityMediumInputBitrate, + SampleRate: Config.SampleRate, Channels: 1, - FrameSize: GetConfig().AudioQualityMediumFrameSize, + FrameSize: Config.AudioQualityMediumFrameSize, } metrics AudioMetrics ) @@ -96,24 +96,24 @@ var qualityPresets = map[AudioQuality]struct { frameSize time.Duration }{ AudioQualityLow: { - outputBitrate: GetConfig().AudioQualityLowOutputBitrate, inputBitrate: GetConfig().AudioQualityLowInputBitrate, - sampleRate: GetConfig().AudioQualityLowSampleRate, channels: GetConfig().AudioQualityLowChannels, - frameSize: GetConfig().AudioQualityLowFrameSize, + outputBitrate: Config.AudioQualityLowOutputBitrate, inputBitrate: Config.AudioQualityLowInputBitrate, + sampleRate: Config.AudioQualityLowSampleRate, channels: Config.AudioQualityLowChannels, + frameSize: Config.AudioQualityLowFrameSize, }, AudioQualityMedium: { - outputBitrate: GetConfig().AudioQualityMediumOutputBitrate, inputBitrate: GetConfig().AudioQualityMediumInputBitrate, - sampleRate: GetConfig().AudioQualityMediumSampleRate, channels: GetConfig().AudioQualityMediumChannels, - frameSize: GetConfig().AudioQualityMediumFrameSize, + outputBitrate: Config.AudioQualityMediumOutputBitrate, inputBitrate: Config.AudioQualityMediumInputBitrate, + sampleRate: Config.AudioQualityMediumSampleRate, channels: Config.AudioQualityMediumChannels, + frameSize: Config.AudioQualityMediumFrameSize, }, AudioQualityHigh: { - outputBitrate: GetConfig().AudioQualityHighOutputBitrate, inputBitrate: GetConfig().AudioQualityHighInputBitrate, - sampleRate: GetConfig().SampleRate, channels: GetConfig().AudioQualityHighChannels, - frameSize: GetConfig().AudioQualityHighFrameSize, + outputBitrate: Config.AudioQualityHighOutputBitrate, inputBitrate: Config.AudioQualityHighInputBitrate, + sampleRate: Config.SampleRate, channels: Config.AudioQualityHighChannels, + frameSize: Config.AudioQualityHighFrameSize, }, AudioQualityUltra: { - outputBitrate: GetConfig().AudioQualityUltraOutputBitrate, inputBitrate: GetConfig().AudioQualityUltraInputBitrate, - sampleRate: GetConfig().SampleRate, channels: GetConfig().AudioQualityUltraChannels, - frameSize: GetConfig().AudioQualityUltraFrameSize, + outputBitrate: Config.AudioQualityUltraOutputBitrate, inputBitrate: Config.AudioQualityUltraInputBitrate, + sampleRate: Config.SampleRate, channels: Config.AudioQualityUltraChannels, + frameSize: Config.AudioQualityUltraFrameSize, }, } @@ -142,7 +142,7 @@ func GetMicrophoneQualityPresets() map[AudioQuality]AudioConfig { Bitrate: preset.inputBitrate, SampleRate: func() int { if quality == AudioQualityLow { - return GetConfig().AudioQualityMicLowSampleRate + return Config.AudioQualityMicLowSampleRate } return preset.sampleRate }(), @@ -172,36 +172,36 @@ func SetAudioQuality(quality AudioQuality) { var complexity, vbr, signalType, bandwidth, dtx int switch quality { case AudioQualityLow: - complexity = GetConfig().AudioQualityLowOpusComplexity - vbr = GetConfig().AudioQualityLowOpusVBR - signalType = GetConfig().AudioQualityLowOpusSignalType - bandwidth = GetConfig().AudioQualityLowOpusBandwidth - dtx = GetConfig().AudioQualityLowOpusDTX + complexity = Config.AudioQualityLowOpusComplexity + vbr = Config.AudioQualityLowOpusVBR + signalType = Config.AudioQualityLowOpusSignalType + bandwidth = Config.AudioQualityLowOpusBandwidth + dtx = Config.AudioQualityLowOpusDTX case AudioQualityMedium: - complexity = GetConfig().AudioQualityMediumOpusComplexity - vbr = GetConfig().AudioQualityMediumOpusVBR - signalType = GetConfig().AudioQualityMediumOpusSignalType - bandwidth = GetConfig().AudioQualityMediumOpusBandwidth - dtx = GetConfig().AudioQualityMediumOpusDTX + complexity = Config.AudioQualityMediumOpusComplexity + vbr = Config.AudioQualityMediumOpusVBR + signalType = Config.AudioQualityMediumOpusSignalType + bandwidth = Config.AudioQualityMediumOpusBandwidth + dtx = Config.AudioQualityMediumOpusDTX case AudioQualityHigh: - complexity = GetConfig().AudioQualityHighOpusComplexity - vbr = GetConfig().AudioQualityHighOpusVBR - signalType = GetConfig().AudioQualityHighOpusSignalType - bandwidth = GetConfig().AudioQualityHighOpusBandwidth - dtx = GetConfig().AudioQualityHighOpusDTX + complexity = Config.AudioQualityHighOpusComplexity + vbr = Config.AudioQualityHighOpusVBR + signalType = Config.AudioQualityHighOpusSignalType + bandwidth = Config.AudioQualityHighOpusBandwidth + dtx = Config.AudioQualityHighOpusDTX case AudioQualityUltra: - complexity = GetConfig().AudioQualityUltraOpusComplexity - vbr = GetConfig().AudioQualityUltraOpusVBR - signalType = GetConfig().AudioQualityUltraOpusSignalType - bandwidth = GetConfig().AudioQualityUltraOpusBandwidth - dtx = GetConfig().AudioQualityUltraOpusDTX + complexity = Config.AudioQualityUltraOpusComplexity + vbr = Config.AudioQualityUltraOpusVBR + signalType = Config.AudioQualityUltraOpusSignalType + bandwidth = Config.AudioQualityUltraOpusBandwidth + dtx = Config.AudioQualityUltraOpusDTX default: // Use medium quality as fallback - complexity = GetConfig().AudioQualityMediumOpusComplexity - vbr = GetConfig().AudioQualityMediumOpusVBR - signalType = GetConfig().AudioQualityMediumOpusSignalType - bandwidth = GetConfig().AudioQualityMediumOpusBandwidth - dtx = GetConfig().AudioQualityMediumOpusDTX + complexity = Config.AudioQualityMediumOpusComplexity + vbr = Config.AudioQualityMediumOpusVBR + signalType = Config.AudioQualityMediumOpusSignalType + bandwidth = Config.AudioQualityMediumOpusBandwidth + dtx = Config.AudioQualityMediumOpusDTX } // Restart audio output subprocess with new OPUS configuration @@ -256,7 +256,7 @@ func SetAudioQuality(quality AudioQuality) { } } else { // Fallback to dynamic update if supervisor is not available - vbrConstraint := GetConfig().CGOOpusVBRConstraint + vbrConstraint := Config.CGOOpusVBRConstraint if err := updateOpusEncoderParams(config.Bitrate*1000, complexity, vbr, vbrConstraint, signalType, bandwidth, dtx); err != nil { logging.GetDefaultLogger().Error().Err(err).Msg("Failed to update OPUS encoder parameters") } @@ -287,36 +287,36 @@ func SetMicrophoneQuality(quality AudioQuality) { var complexity, vbr, signalType, bandwidth, dtx int switch quality { case AudioQualityLow: - complexity = GetConfig().AudioQualityLowOpusComplexity - vbr = GetConfig().AudioQualityLowOpusVBR - signalType = GetConfig().AudioQualityLowOpusSignalType - bandwidth = GetConfig().AudioQualityLowOpusBandwidth - dtx = GetConfig().AudioQualityLowOpusDTX + complexity = Config.AudioQualityLowOpusComplexity + vbr = Config.AudioQualityLowOpusVBR + signalType = Config.AudioQualityLowOpusSignalType + bandwidth = Config.AudioQualityLowOpusBandwidth + dtx = Config.AudioQualityLowOpusDTX case AudioQualityMedium: - complexity = GetConfig().AudioQualityMediumOpusComplexity - vbr = GetConfig().AudioQualityMediumOpusVBR - signalType = GetConfig().AudioQualityMediumOpusSignalType - bandwidth = GetConfig().AudioQualityMediumOpusBandwidth - dtx = GetConfig().AudioQualityMediumOpusDTX + complexity = Config.AudioQualityMediumOpusComplexity + vbr = Config.AudioQualityMediumOpusVBR + signalType = Config.AudioQualityMediumOpusSignalType + bandwidth = Config.AudioQualityMediumOpusBandwidth + dtx = Config.AudioQualityMediumOpusDTX case AudioQualityHigh: - complexity = GetConfig().AudioQualityHighOpusComplexity - vbr = GetConfig().AudioQualityHighOpusVBR - signalType = GetConfig().AudioQualityHighOpusSignalType - bandwidth = GetConfig().AudioQualityHighOpusBandwidth - dtx = GetConfig().AudioQualityHighOpusDTX + complexity = Config.AudioQualityHighOpusComplexity + vbr = Config.AudioQualityHighOpusVBR + signalType = Config.AudioQualityHighOpusSignalType + bandwidth = Config.AudioQualityHighOpusBandwidth + dtx = Config.AudioQualityHighOpusDTX case AudioQualityUltra: - complexity = GetConfig().AudioQualityUltraOpusComplexity - vbr = GetConfig().AudioQualityUltraOpusVBR - signalType = GetConfig().AudioQualityUltraOpusSignalType - bandwidth = GetConfig().AudioQualityUltraOpusBandwidth - dtx = GetConfig().AudioQualityUltraOpusDTX + complexity = Config.AudioQualityUltraOpusComplexity + vbr = Config.AudioQualityUltraOpusVBR + signalType = Config.AudioQualityUltraOpusSignalType + bandwidth = Config.AudioQualityUltraOpusBandwidth + dtx = Config.AudioQualityUltraOpusDTX default: // Use medium quality as fallback - complexity = GetConfig().AudioQualityMediumOpusComplexity - vbr = GetConfig().AudioQualityMediumOpusVBR - signalType = GetConfig().AudioQualityMediumOpusSignalType - bandwidth = GetConfig().AudioQualityMediumOpusBandwidth - dtx = GetConfig().AudioQualityMediumOpusDTX + complexity = Config.AudioQualityMediumOpusComplexity + vbr = Config.AudioQualityMediumOpusVBR + signalType = Config.AudioQualityMediumOpusSignalType + bandwidth = Config.AudioQualityMediumOpusBandwidth + dtx = Config.AudioQualityMediumOpusDTX } // Update audio input subprocess configuration dynamically without restart diff --git a/internal/audio/socket_buffer.go b/internal/audio/socket_buffer.go index b92dff90..a6f7e48d 100644 --- a/internal/audio/socket_buffer.go +++ b/internal/audio/socket_buffer.go @@ -18,8 +18,8 @@ type SocketBufferConfig struct { // DefaultSocketBufferConfig returns the default socket buffer configuration func DefaultSocketBufferConfig() SocketBufferConfig { return SocketBufferConfig{ - SendBufferSize: GetConfig().SocketOptimalBuffer, - RecvBufferSize: GetConfig().SocketOptimalBuffer, + SendBufferSize: Config.SocketOptimalBuffer, + RecvBufferSize: Config.SocketOptimalBuffer, Enabled: true, } } @@ -27,8 +27,8 @@ func DefaultSocketBufferConfig() SocketBufferConfig { // HighLoadSocketBufferConfig returns configuration for high-load scenarios func HighLoadSocketBufferConfig() SocketBufferConfig { return SocketBufferConfig{ - SendBufferSize: GetConfig().SocketMaxBuffer, - RecvBufferSize: GetConfig().SocketMaxBuffer, + SendBufferSize: Config.SocketMaxBuffer, + RecvBufferSize: Config.SocketMaxBuffer, Enabled: true, } } @@ -123,8 +123,8 @@ func ValidateSocketBufferConfig(config SocketBufferConfig) error { return nil } - minBuffer := GetConfig().SocketMinBuffer - maxBuffer := GetConfig().SocketMaxBuffer + minBuffer := Config.SocketMinBuffer + maxBuffer := Config.SocketMaxBuffer if config.SendBufferSize < minBuffer { return fmt.Errorf("send buffer size validation failed: got %d bytes, minimum required %d bytes (configured range: %d-%d)", diff --git a/internal/audio/util_buffer_pool.go b/internal/audio/util_buffer_pool.go index b9232bbb..86d9d40b 100644 --- a/internal/audio/util_buffer_pool.go +++ b/internal/audio/util_buffer_pool.go @@ -366,17 +366,17 @@ func NewAudioBufferPool(bufferSize int) *AudioBufferPool { // Validate buffer size parameter if err := ValidateBufferSize(bufferSize); err != nil { // Use default value on validation error - bufferSize = GetConfig().AudioFramePoolSize + bufferSize = Config.AudioFramePoolSize } // Enhanced preallocation strategy based on buffer size and system capacity var preallocSize int - if bufferSize <= GetConfig().AudioFramePoolSize { + if bufferSize <= Config.AudioFramePoolSize { // For smaller pools, use enhanced preallocation (40% instead of 20%) - preallocSize = GetConfig().PreallocPercentage * 2 + preallocSize = Config.PreallocPercentage * 2 } else { // For larger pools, use standard enhanced preallocation (30% instead of 10%) - preallocSize = (GetConfig().PreallocPercentage * 3) / 2 + preallocSize = (Config.PreallocPercentage * 3) / 2 } // Ensure minimum preallocation for better performance @@ -594,9 +594,9 @@ func (p *AudioBufferPool) Put(buf []byte) { // Enhanced global buffer pools for different audio frame types with improved sizing var ( // Main audio frame pool with enhanced capacity - audioFramePool = NewAudioBufferPool(GetConfig().AudioFramePoolSize) + audioFramePool = NewAudioBufferPool(Config.AudioFramePoolSize) // Control message pool with enhanced capacity for better throughput - audioControlPool = NewAudioBufferPool(512) // Increased from GetConfig().OutputHeaderSize to 512 for better control message handling + audioControlPool = NewAudioBufferPool(512) // Increased from Config.OutputHeaderSize to 512 for better control message handling ) func GetAudioFrameBuffer() []byte { @@ -628,7 +628,7 @@ func (p *AudioBufferPool) GetPoolStats() AudioBufferPoolDetailedStats { var hitRate float64 if totalRequests > 0 { - hitRate = float64(hitCount) / float64(totalRequests) * GetConfig().PercentageMultiplier + hitRate = float64(hitCount) / float64(totalRequests) * Config.PercentageMultiplier } return AudioBufferPoolDetailedStats{ diff --git a/internal/audio/util_env.go b/internal/audio/util_env.go index 8c01d4f1..70b9c12c 100644 --- a/internal/audio/util_env.go +++ b/internal/audio/util_env.go @@ -21,12 +21,12 @@ func getEnvInt(key string, defaultValue int) int { // with fallback to default config values func parseOpusConfig() (bitrate, complexity, vbr, signalType, bandwidth, dtx int) { // Read configuration from environment variables with config defaults - bitrate = getEnvInt("JETKVM_OPUS_BITRATE", GetConfig().CGOOpusBitrate) - complexity = getEnvInt("JETKVM_OPUS_COMPLEXITY", GetConfig().CGOOpusComplexity) - vbr = getEnvInt("JETKVM_OPUS_VBR", GetConfig().CGOOpusVBR) - signalType = getEnvInt("JETKVM_OPUS_SIGNAL_TYPE", GetConfig().CGOOpusSignalType) - bandwidth = getEnvInt("JETKVM_OPUS_BANDWIDTH", GetConfig().CGOOpusBandwidth) - dtx = getEnvInt("JETKVM_OPUS_DTX", GetConfig().CGOOpusDTX) + bitrate = getEnvInt("JETKVM_OPUS_BITRATE", Config.CGOOpusBitrate) + complexity = getEnvInt("JETKVM_OPUS_COMPLEXITY", Config.CGOOpusComplexity) + vbr = getEnvInt("JETKVM_OPUS_VBR", Config.CGOOpusVBR) + signalType = getEnvInt("JETKVM_OPUS_SIGNAL_TYPE", Config.CGOOpusSignalType) + bandwidth = getEnvInt("JETKVM_OPUS_BANDWIDTH", Config.CGOOpusBandwidth) + dtx = getEnvInt("JETKVM_OPUS_DTX", Config.CGOOpusDTX) return bitrate, complexity, vbr, signalType, bandwidth, dtx } @@ -34,7 +34,7 @@ func parseOpusConfig() (bitrate, complexity, vbr, signalType, bandwidth, dtx int // applyOpusConfig applies OPUS configuration to the global config // with optional logging for the specified component func applyOpusConfig(bitrate, complexity, vbr, signalType, bandwidth, dtx int, component string, enableLogging bool) { - config := GetConfig() + config := Config config.CGOOpusBitrate = bitrate config.CGOOpusComplexity = complexity config.CGOOpusVBR = vbr diff --git a/internal/audio/webrtc_relay.go b/internal/audio/webrtc_relay.go index 43ccbfec..6a338564 100644 --- a/internal/audio/webrtc_relay.go +++ b/internal/audio/webrtc_relay.go @@ -134,7 +134,7 @@ func (r *AudioRelay) relayLoop() { defer r.wg.Done() r.logger.Debug().Msg("Audio relay loop started") - var maxConsecutiveErrors = GetConfig().MaxConsecutiveErrors + var maxConsecutiveErrors = Config.MaxConsecutiveErrors consecutiveErrors := 0 for { @@ -153,7 +153,7 @@ func (r *AudioRelay) relayLoop() { r.logger.Error().Int("consecutive_errors", consecutiveErrors).Int("max_errors", maxConsecutiveErrors).Msg("too many consecutive read errors, stopping audio relay") return } - time.Sleep(GetConfig().ShortSleepDuration) + time.Sleep(Config.ShortSleepDuration) continue } diff --git a/internal/audio/websocket_events.go b/internal/audio/websocket_events.go index 6edf24f6..d2e2146c 100644 --- a/internal/audio/websocket_events.go +++ b/internal/audio/websocket_events.go @@ -224,7 +224,7 @@ func (aeb *AudioEventBroadcaster) sendToSubscriber(subscriber *AudioEventSubscri return false } - ctx, cancel := context.WithTimeout(subscriber.ctx, time.Duration(GetConfig().EventTimeoutSeconds)*time.Second) + ctx, cancel := context.WithTimeout(subscriber.ctx, time.Duration(Config.EventTimeoutSeconds)*time.Second) defer cancel() err := wsjson.Write(ctx, subscriber.conn, event) diff --git a/internal/audio/zero_copy.go b/internal/audio/zero_copy.go index e74122cb..0c7edda2 100644 --- a/internal/audio/zero_copy.go +++ b/internal/audio/zero_copy.go @@ -98,8 +98,8 @@ type ZeroCopyFramePool struct { // NewZeroCopyFramePool creates a new zero-copy frame pool func NewZeroCopyFramePool(maxFrameSize int) *ZeroCopyFramePool { // Pre-allocate frames for immediate availability - preallocSizeBytes := GetConfig().PreallocSize - maxPoolSize := GetConfig().MaxPoolSize // Limit total pool size + preallocSizeBytes := Config.PreallocSize + maxPoolSize := Config.MaxPoolSize // Limit total pool size // Calculate number of frames based on memory budget, not frame count preallocFrameCount := preallocSizeBytes / maxFrameSize @@ -327,7 +327,7 @@ func (p *ZeroCopyFramePool) GetZeroCopyPoolStats() ZeroCopyFramePoolStats { var hitRate float64 if totalRequests > 0 { - hitRate = float64(hitCount) / float64(totalRequests) * GetConfig().PercentageMultiplier + hitRate = float64(hitCount) / float64(totalRequests) * Config.PercentageMultiplier } return ZeroCopyFramePoolStats{ diff --git a/main.go b/main.go index 9d62db04..1de6ac4c 100644 --- a/main.go +++ b/main.go @@ -58,7 +58,7 @@ func startAudioSubprocess() error { audio.SetAudioInputSupervisor(audioInputSupervisor) // Set default OPUS configuration for audio input supervisor (low quality for single-core RV1106) - config := audio.GetConfig() + config := audio.Config audioInputSupervisor.SetOpusConfig( config.AudioQualityLowInputBitrate*1000, // Convert kbps to bps config.AudioQualityLowOpusComplexity, @@ -81,7 +81,8 @@ func startAudioSubprocess() error { // This prevents "no client connected" errors during quality changes go func() { // Give the audio output server time to initialize and start listening - time.Sleep(500 * time.Millisecond) + // Increased delay to reduce frame drops during connection establishment + time.Sleep(1 * time.Second) // Start audio relay system for main process // If there's an active WebRTC session, use its audio track