From 1d1658db15d168940cc219805bfbb2a7d6d709e1 Mon Sep 17 00:00:00 2001 From: Alex P Date: Mon, 8 Sep 2025 17:30:49 +0000 Subject: [PATCH] refactor(audio): replace GetConfig() calls with direct Config access This change replaces all instances of GetConfig() function calls with direct access to the Config variable throughout the audio package. The modification improves performance by eliminating function call overhead and simplifies the codebase by removing unnecessary indirection. The commit also includes minor optimizations in validation logic and connection handling, while maintaining all existing functionality. Error handling remains robust with appropriate fallbacks when config values are not available. Additional improvements include: - Enhanced connection health monitoring in UnifiedAudioClient - Optimized validation functions using cached config values - Reduced memory allocations in hot paths - Improved error recovery during quality changes --- internal/audio/adaptive_buffer.go | 28 +-- internal/audio/batch_audio.go | 31 ++- internal/audio/batch_zero_copy.go | 2 +- internal/audio/core_config.go | 4 +- internal/audio/core_config_constants.go | 124 +++++++----- internal/audio/core_validation.go | 102 +++++----- internal/audio/goroutine_pool.go | 4 +- internal/audio/input_microphone_manager.go | 4 +- internal/audio/input_server_main.go | 2 +- internal/audio/input_supervisor.go | 6 +- internal/audio/ipc_common.go | 8 +- internal/audio/ipc_input.go | 36 ++-- internal/audio/ipc_output.go | 4 +- internal/audio/ipc_unified.go | 193 ++++++++++++++++--- internal/audio/mgmt_input_ipc_manager.go | 8 +- internal/audio/mgmt_output_ipc_manager.go | 6 +- internal/audio/mic_contention.go | 4 +- internal/audio/monitor_adaptive_optimizer.go | 12 +- internal/audio/monitor_goroutine.go | 2 +- internal/audio/monitor_latency.go | 2 +- internal/audio/monitor_process.go | 32 +-- internal/audio/output_server_main.go | 2 +- internal/audio/output_streaming.go | 18 +- internal/audio/output_supervisor.go | 12 +- internal/audio/quality_presets.go | 144 +++++++------- internal/audio/socket_buffer.go | 12 +- internal/audio/util_buffer_pool.go | 14 +- internal/audio/util_env.go | 14 +- internal/audio/webrtc_relay.go | 4 +- internal/audio/websocket_events.go | 2 +- internal/audio/zero_copy.go | 6 +- main.go | 5 +- 32 files changed, 501 insertions(+), 346 deletions(-) diff --git a/internal/audio/adaptive_buffer.go b/internal/audio/adaptive_buffer.go index 899a4ce3..dc9f6f6a 100644 --- a/internal/audio/adaptive_buffer.go +++ b/internal/audio/adaptive_buffer.go @@ -57,25 +57,25 @@ type AdaptiveBufferConfig struct { func DefaultAdaptiveBufferConfig() AdaptiveBufferConfig { return AdaptiveBufferConfig{ // Conservative buffer sizes for 256MB RAM constraint - MinBufferSize: GetConfig().AdaptiveMinBufferSize, - MaxBufferSize: GetConfig().AdaptiveMaxBufferSize, - DefaultBufferSize: GetConfig().AdaptiveDefaultBufferSize, + MinBufferSize: Config.AdaptiveMinBufferSize, + MaxBufferSize: Config.AdaptiveMaxBufferSize, + DefaultBufferSize: Config.AdaptiveDefaultBufferSize, // CPU thresholds optimized for single-core ARM Cortex A7 under load - LowCPUThreshold: GetConfig().LowCPUThreshold * 100, // Below 20% CPU - HighCPUThreshold: GetConfig().HighCPUThreshold * 100, // Above 60% CPU (lowered to be more responsive) + LowCPUThreshold: Config.LowCPUThreshold * 100, // Below 20% CPU + HighCPUThreshold: Config.HighCPUThreshold * 100, // Above 60% CPU (lowered to be more responsive) // Memory thresholds for 256MB total RAM - LowMemoryThreshold: GetConfig().LowMemoryThreshold * 100, // Below 35% memory usage - HighMemoryThreshold: GetConfig().HighMemoryThreshold * 100, // Above 75% memory usage (lowered for earlier response) + LowMemoryThreshold: Config.LowMemoryThreshold * 100, // Below 35% memory usage + HighMemoryThreshold: Config.HighMemoryThreshold * 100, // Above 75% memory usage (lowered for earlier response) // Latency targets - TargetLatency: GetConfig().AdaptiveBufferTargetLatency, // Target 20ms latency - MaxLatency: GetConfig().LatencyMonitorTarget, // Max acceptable latency + TargetLatency: Config.AdaptiveBufferTargetLatency, // Target 20ms latency + MaxLatency: Config.LatencyMonitorTarget, // Max acceptable latency // Adaptation settings - AdaptationInterval: GetConfig().BufferUpdateInterval, // Check every 500ms - SmoothingFactor: GetConfig().SmoothingFactor, // Moderate responsiveness + AdaptationInterval: Config.BufferUpdateInterval, // Check every 500ms + SmoothingFactor: Config.SmoothingFactor, // Moderate responsiveness } } @@ -273,7 +273,7 @@ func (abm *AdaptiveBufferManager) adaptBufferSizes() { latencyFactor := abm.calculateLatencyFactor(currentLatency) // Combine factors with weights (CPU has highest priority for KVM coexistence) - combinedFactor := GetConfig().CPUMemoryWeight*cpuFactor + GetConfig().MemoryWeight*memoryFactor + GetConfig().LatencyWeight*latencyFactor + combinedFactor := Config.CPUMemoryWeight*cpuFactor + Config.MemoryWeight*memoryFactor + Config.LatencyWeight*latencyFactor // Apply adaptation with smoothing currentInput := float64(atomic.LoadInt64(&abm.currentInputBufferSize)) @@ -437,8 +437,8 @@ func (abm *AdaptiveBufferManager) GetStats() map[string]interface{} { "input_buffer_size": abm.GetInputBufferSize(), "output_buffer_size": abm.GetOutputBufferSize(), "average_latency_ms": float64(atomic.LoadInt64(&abm.averageLatency)) / 1e6, - "system_cpu_percent": float64(atomic.LoadInt64(&abm.systemCPUPercent)) / GetConfig().PercentageMultiplier, - "system_memory_percent": float64(atomic.LoadInt64(&abm.systemMemoryPercent)) / GetConfig().PercentageMultiplier, + "system_cpu_percent": float64(atomic.LoadInt64(&abm.systemCPUPercent)) / Config.PercentageMultiplier, + "system_memory_percent": float64(atomic.LoadInt64(&abm.systemMemoryPercent)) / Config.PercentageMultiplier, "adaptation_count": atomic.LoadInt64(&abm.adaptationCount), "last_adaptation": lastAdaptation, } diff --git a/internal/audio/batch_audio.go b/internal/audio/batch_audio.go index f82a1fa2..f2417608 100644 --- a/internal/audio/batch_audio.go +++ b/internal/audio/batch_audio.go @@ -83,8 +83,7 @@ type batchWriteResult struct { // NewBatchAudioProcessor creates a new batch audio processor func NewBatchAudioProcessor(batchSize int, batchDuration time.Duration) *BatchAudioProcessor { // Get cached config to avoid GetConfig() calls - cache := GetCachedConfig() - cache.Update() + cache := Config // Validate input parameters with minimal overhead if batchSize <= 0 || batchSize > 1000 { @@ -105,7 +104,7 @@ func NewBatchAudioProcessor(batchSize int, batchDuration time.Duration) *BatchAu logger := logging.GetDefaultLogger().With().Str("component", "batch-audio").Logger() // Pre-calculate frame size to avoid repeated GetConfig() calls - frameSize := cache.GetMinReadEncodeBuffer() + frameSize := cache.MinReadEncodeBuffer if frameSize == 0 { frameSize = 1500 // Safe fallback } @@ -166,7 +165,7 @@ func (bap *BatchAudioProcessor) Stop() { bap.cancel() // Wait for processing to complete - time.Sleep(bap.batchDuration + GetConfig().BatchProcessingDelay) + time.Sleep(bap.batchDuration + Config.BatchProcessingDelay) bap.logger.Info().Msg("batch audio processor stopped") } @@ -174,8 +173,7 @@ func (bap *BatchAudioProcessor) Stop() { // BatchReadEncode performs batched audio read and encode operations func (bap *BatchAudioProcessor) BatchReadEncode(buffer []byte) (int, error) { // Get cached config to avoid GetConfig() calls in hot path - cache := GetCachedConfig() - cache.Update() + cache := Config // Validate buffer before processing if err := ValidateBufferSize(len(buffer)); err != nil { @@ -221,7 +219,7 @@ func (bap *BatchAudioProcessor) BatchReadEncode(buffer []byte) (int, error) { select { case result := <-resultChan: return result.length, result.err - case <-time.After(cache.BatchProcessingTimeout): + case <-time.After(cache.BatchProcessorTimeout): // Timeout, fallback to single operation // Use sampling to reduce atomic operations overhead if atomic.LoadInt64(&bap.stats.SingleReads)%10 == 0 { @@ -236,8 +234,7 @@ func (bap *BatchAudioProcessor) BatchReadEncode(buffer []byte) (int, error) { // This is the legacy version that uses a single buffer func (bap *BatchAudioProcessor) BatchDecodeWrite(buffer []byte) (int, error) { // Get cached config to avoid GetConfig() calls in hot path - cache := GetCachedConfig() - cache.Update() + cache := Config // Validate buffer before processing if err := ValidateBufferSize(len(buffer)); err != nil { @@ -283,7 +280,7 @@ func (bap *BatchAudioProcessor) BatchDecodeWrite(buffer []byte) (int, error) { select { case result := <-resultChan: return result.length, result.err - case <-time.After(cache.BatchProcessingTimeout): + case <-time.After(cache.BatchProcessorTimeout): // Use sampling to reduce atomic operations overhead if atomic.LoadInt64(&bap.stats.SingleWrites)%10 == 0 { atomic.AddInt64(&bap.stats.SingleWrites, 10) @@ -296,8 +293,7 @@ func (bap *BatchAudioProcessor) BatchDecodeWrite(buffer []byte) (int, error) { // BatchDecodeWriteWithBuffers performs batched audio decode and write operations with separate opus and PCM buffers func (bap *BatchAudioProcessor) BatchDecodeWriteWithBuffers(opusData []byte, pcmBuffer []byte) (int, error) { // Get cached config to avoid GetConfig() calls in hot path - cache := GetCachedConfig() - cache.Update() + cache := Config // Validate buffers before processing if len(opusData) == 0 { @@ -339,7 +335,7 @@ func (bap *BatchAudioProcessor) BatchDecodeWriteWithBuffers(opusData []byte, pcm select { case result := <-resultChan: return result.length, result.err - case <-time.After(cache.BatchProcessingTimeout): + case <-time.After(cache.BatchProcessorTimeout): atomic.AddInt64(&bap.stats.SingleWrites, 1) atomic.AddInt64(&bap.stats.WriteFrames, 1) // Use the optimized function with separate buffers @@ -427,7 +423,7 @@ func (bap *BatchAudioProcessor) processBatchRead(batch []batchReadRequest) { } // Get cached config once - avoid repeated calls - cache := GetCachedConfig() + cache := Config threadPinningThreshold := cache.BatchProcessorThreadPinningThreshold if threadPinningThreshold == 0 { threadPinningThreshold = cache.MinBatchSizeForThreadPinning // Fallback @@ -480,7 +476,7 @@ func (bap *BatchAudioProcessor) processBatchWrite(batch []batchWriteRequest) { } // Get cached config to avoid GetConfig() calls in hot path - cache := GetCachedConfig() + cache := Config threadPinningThreshold := cache.BatchProcessorThreadPinningThreshold if threadPinningThreshold == 0 { threadPinningThreshold = cache.MinBatchSizeForThreadPinning // Fallback @@ -586,8 +582,7 @@ func GetBatchAudioProcessor() *BatchAudioProcessor { // Initialize on first use if atomic.CompareAndSwapInt32(&batchProcessorInitialized, 0, 1) { // Get cached config to avoid GetConfig() calls - cache := GetCachedConfig() - cache.Update() + cache := Config processor := NewBatchAudioProcessor(cache.BatchProcessorFramesPerBatch, cache.BatchProcessorTimeout) atomic.StorePointer(&globalBatchProcessor, unsafe.Pointer(processor)) @@ -601,7 +596,7 @@ func GetBatchAudioProcessor() *BatchAudioProcessor { } // Fallback: create a new processor (should rarely happen) - config := GetConfig() + config := Config return NewBatchAudioProcessor(config.BatchProcessorFramesPerBatch, config.BatchProcessorTimeout) } diff --git a/internal/audio/batch_zero_copy.go b/internal/audio/batch_zero_copy.go index 4ba9959a..8d066521 100644 --- a/internal/audio/batch_zero_copy.go +++ b/internal/audio/batch_zero_copy.go @@ -73,7 +73,7 @@ func GetBatchZeroCopyProcessor() *BatchZeroCopyProcessor { // NewBatchZeroCopyProcessor creates a new batch zero-copy processor func NewBatchZeroCopyProcessor() *BatchZeroCopyProcessor { - cache := GetCachedConfig() + cache := Config return &BatchZeroCopyProcessor{ maxBatchSize: cache.BatchProcessorFramesPerBatch, batchTimeout: cache.BatchProcessorTimeout, diff --git a/internal/audio/core_config.go b/internal/audio/core_config.go index f5bb7398..6f3b44d9 100644 --- a/internal/audio/core_config.go +++ b/internal/audio/core_config.go @@ -4,12 +4,12 @@ import "time" // GetMetricsUpdateInterval returns the current metrics update interval from centralized config func GetMetricsUpdateInterval() time.Duration { - return GetConfig().MetricsUpdateInterval + return Config.MetricsUpdateInterval } // SetMetricsUpdateInterval sets the metrics update interval in centralized config func SetMetricsUpdateInterval(interval time.Duration) { - config := GetConfig() + config := Config config.MetricsUpdateInterval = interval UpdateConfig(config) } diff --git a/internal/audio/core_config_constants.go b/internal/audio/core_config_constants.go index 0fff7ed5..6af91d14 100644 --- a/internal/audio/core_config_constants.go +++ b/internal/audio/core_config_constants.go @@ -313,6 +313,15 @@ type AudioConfigConstants struct { AudioProcessorQueueSize int AudioReaderQueueSize int WorkerMaxIdleTime time.Duration + + // Connection Retry Configuration + MaxConnectionAttempts int // Maximum connection retry attempts + ConnectionRetryDelay time.Duration // Initial connection retry delay + MaxConnectionRetryDelay time.Duration // Maximum connection retry delay + ConnectionBackoffFactor float64 // Connection retry backoff factor + ConnectionTimeoutDelay time.Duration // Connection timeout for each attempt + ReconnectionInterval time.Duration // Interval for automatic reconnection attempts + HealthCheckInterval time.Duration // Health check interval for connections } // DefaultAudioConfig returns the default configuration constants @@ -424,11 +433,11 @@ func DefaultAudioConfig() *AudioConfigConstants { // Buffer Management PreallocSize: 1024 * 1024, // 1MB buffer preallocation MaxPoolSize: 100, // Maximum object pool size - MessagePoolSize: 256, // Message pool size for IPC + MessagePoolSize: 512, // Increased message pool for quality change bursts OptimalSocketBuffer: 262144, // 256KB optimal socket buffer MaxSocketBuffer: 1048576, // 1MB maximum socket buffer MinSocketBuffer: 8192, // 8KB minimum socket buffer - ChannelBufferSize: 500, // Inter-goroutine channel buffer size + ChannelBufferSize: 1000, // Increased channel buffer for quality change bursts AudioFramePoolSize: 1500, // Audio frame object pool size PageSize: 4096, // Memory page size for alignment InitialBufferFrames: 500, // Initial buffer size during startup @@ -436,17 +445,17 @@ func DefaultAudioConfig() *AudioConfigConstants { MinReadEncodeBuffer: 1276, // Minimum CGO read/encode buffer MaxDecodeWriteBuffer: 4096, // Maximum CGO decode/write buffer - // IPC Configuration + // IPC Configuration - Balanced for stability MagicNumber: 0xDEADBEEF, // IPC message validation header MaxFrameSize: 4096, // Maximum audio frame size (4KB) - WriteTimeout: 100 * time.Millisecond, // IPC write operation timeout + WriteTimeout: 500 * time.Millisecond, // Increased timeout to handle quality change bursts HeaderSize: 8, // IPC message header size - // Monitoring and Metrics - MetricsUpdateInterval: 1000 * time.Millisecond, // Metrics collection frequency - WarmupSamples: 10, // Warmup samples for metrics accuracy - MetricsChannelBuffer: 100, // Metrics data channel buffer size - LatencyHistorySize: 100, // Number of latency measurements to keep + // Monitoring and Metrics - Balanced for stability + MetricsUpdateInterval: 1000 * time.Millisecond, // Stable metrics collection frequency + WarmupSamples: 10, // Adequate warmup samples for accuracy + MetricsChannelBuffer: 100, // Adequate metrics data channel buffer + LatencyHistorySize: 100, // Adequate latency measurements to keep // Process Monitoring Constants MaxCPUPercent: 100.0, // Maximum CPU percentage @@ -470,41 +479,50 @@ func DefaultAudioConfig() *AudioConfigConstants { BackoffMultiplier: 2.0, // Exponential backoff multiplier MaxConsecutiveErrors: 5, // Consecutive error threshold - // Timing Constants - DefaultSleepDuration: 100 * time.Millisecond, // Standard polling interval - ShortSleepDuration: 10 * time.Millisecond, // High-frequency polling - LongSleepDuration: 200 * time.Millisecond, // Background tasks - DefaultTickerInterval: 100 * time.Millisecond, // Periodic task interval - BufferUpdateInterval: 500 * time.Millisecond, // Buffer status updates + // Connection Retry Configuration + MaxConnectionAttempts: 15, // Maximum connection retry attempts + ConnectionRetryDelay: 50 * time.Millisecond, // Initial connection retry delay + MaxConnectionRetryDelay: 2 * time.Second, // Maximum connection retry delay + ConnectionBackoffFactor: 1.5, // Connection retry backoff factor + ConnectionTimeoutDelay: 5 * time.Second, // Connection timeout for each attempt + ReconnectionInterval: 30 * time.Second, // Interval for automatic reconnection attempts + HealthCheckInterval: 10 * time.Second, // Health check interval for connections + + // Timing Constants - Optimized for quality change stability + DefaultSleepDuration: 100 * time.Millisecond, // Balanced polling interval + ShortSleepDuration: 10 * time.Millisecond, // Balanced high-frequency polling + LongSleepDuration: 200 * time.Millisecond, // Balanced background task delay + DefaultTickerInterval: 100 * time.Millisecond, // Balanced periodic task interval + BufferUpdateInterval: 300 * time.Millisecond, // Faster buffer updates for quality changes InputSupervisorTimeout: 5 * time.Second, // Input monitoring timeout OutputSupervisorTimeout: 5 * time.Second, // Output monitoring timeout - BatchProcessingDelay: 10 * time.Millisecond, // Batch processing delay - AdaptiveOptimizerStability: 10 * time.Second, // Adaptive stability period + BatchProcessingDelay: 5 * time.Millisecond, // Reduced batch processing delay + AdaptiveOptimizerStability: 5 * time.Second, // Faster adaptive stability period - LatencyMonitorTarget: 50 * time.Millisecond, // Target latency for monitoring + LatencyMonitorTarget: 50 * time.Millisecond, // Balanced target latency for monitoring - // Adaptive Buffer Configuration - LowCPUThreshold: 0.20, - HighCPUThreshold: 0.60, - LowMemoryThreshold: 0.50, - HighMemoryThreshold: 0.75, - AdaptiveBufferTargetLatency: 20 * time.Millisecond, + // Adaptive Buffer Configuration - Optimized for low latency + LowCPUThreshold: 0.30, + HighCPUThreshold: 0.70, + LowMemoryThreshold: 0.60, + HighMemoryThreshold: 0.80, + AdaptiveBufferTargetLatency: 15 * time.Millisecond, // Reduced target latency - // Adaptive Buffer Size Configuration - AdaptiveMinBufferSize: 3, // Minimum 3 frames for stability - AdaptiveMaxBufferSize: 20, // Maximum 20 frames for high load - AdaptiveDefaultBufferSize: 6, // Balanced buffer size (6 frames) + // Adaptive Buffer Size Configuration - Optimized for quality change bursts + AdaptiveMinBufferSize: 16, // Higher minimum to handle bursts + AdaptiveMaxBufferSize: 64, // Higher maximum for quality changes + AdaptiveDefaultBufferSize: 32, // Higher default for stability - // Adaptive Optimizer Configuration - CooldownPeriod: 30 * time.Second, - RollbackThreshold: 300 * time.Millisecond, - AdaptiveOptimizerLatencyTarget: 50 * time.Millisecond, + // Adaptive Optimizer Configuration - Faster response + CooldownPeriod: 15 * time.Second, // Reduced cooldown period + RollbackThreshold: 200 * time.Millisecond, // Lower rollback threshold + AdaptiveOptimizerLatencyTarget: 30 * time.Millisecond, // Reduced latency target - // Latency Monitor Configuration - MaxLatencyThreshold: 200 * time.Millisecond, - JitterThreshold: 20 * time.Millisecond, - LatencyOptimizationInterval: 5 * time.Second, - LatencyAdaptiveThreshold: 0.8, + // Latency Monitor Configuration - More aggressive monitoring + MaxLatencyThreshold: 150 * time.Millisecond, // Lower max latency threshold + JitterThreshold: 15 * time.Millisecond, // Reduced jitter threshold + LatencyOptimizationInterval: 3 * time.Second, // More frequent optimization + LatencyAdaptiveThreshold: 0.7, // More aggressive adaptive threshold // Microphone Contention Configuration MicContentionTimeout: 200 * time.Millisecond, @@ -532,12 +550,12 @@ func DefaultAudioConfig() *AudioConfigConstants { LatencyScalingFactor: 2.0, // Latency ratio scaling factor OptimizerAggressiveness: 0.7, // Optimizer aggressiveness factor - // CGO Audio Processing Constants - CGOUsleepMicroseconds: 1000, // 1000 microseconds (1ms) for CGO usleep calls + // CGO Audio Processing Constants - Balanced for stability + CGOUsleepMicroseconds: 1000, // 1000 microseconds (1ms) for stable CGO usleep calls CGOPCMBufferSize: 1920, // 1920 samples for PCM buffer (max 2ch*960) CGONanosecondsPerSecond: 1000000000.0, // 1000000000.0 for nanosecond conversions - // Frontend Constants + // Frontend Constants - Balanced for stability FrontendOperationDebounceMS: 1000, // 1000ms debounce for frontend operations FrontendSyncDebounceMS: 1000, // 1000ms debounce for sync operations FrontendSampleRate: 48000, // 48000Hz sample rate for frontend audio @@ -560,20 +578,20 @@ func DefaultAudioConfig() *AudioConfigConstants { ProcessMonitorFallbackClockHz: 1000.0, // 1000.0 Hz fallback clock ProcessMonitorTraditionalHz: 100.0, // 100.0 Hz traditional clock - // Batch Processing Constants - BatchProcessorFramesPerBatch: 4, // 4 frames per batch - BatchProcessorTimeout: 5 * time.Millisecond, // 5ms timeout - BatchProcessorMaxQueueSize: 16, // 16 max queue size for balanced memory/performance - BatchProcessorAdaptiveThreshold: 0.8, // 0.8 threshold for adaptive batching (80% queue full) - BatchProcessorThreadPinningThreshold: 8, // 8 frames minimum for thread pinning optimization + // Batch Processing Constants - Optimized for quality change bursts + BatchProcessorFramesPerBatch: 16, // Larger batches for quality changes + BatchProcessorTimeout: 20 * time.Millisecond, // Longer timeout for bursts + BatchProcessorMaxQueueSize: 64, // Larger queue for quality changes + BatchProcessorAdaptiveThreshold: 0.6, // Lower threshold for faster adaptation + BatchProcessorThreadPinningThreshold: 8, // Lower threshold for better performance - // Output Streaming Constants - OutputStreamingFrameIntervalMS: 20, // 20ms frame interval (50 FPS) + // Output Streaming Constants - Balanced for stability + OutputStreamingFrameIntervalMS: 20, // 20ms frame interval (50 FPS) for stability // IPC Constants IPCInitialBufferFrames: 500, // 500 frames for initial buffer - // Event Constants + // Event Constants - Balanced for stability EventTimeoutSeconds: 2, // 2 seconds for event timeout EventTimeFormatString: "2006-01-02T15:04:05.000Z", // "2006-01-02T15:04:05.000Z" time format EventSubscriptionDelayMS: 100, // 100ms subscription delay @@ -585,7 +603,7 @@ func DefaultAudioConfig() *AudioConfigConstants { AudioReaderQueueSize: 32, // 32 tasks queue size for reader pool WorkerMaxIdleTime: 60 * time.Second, // 60s maximum idle time before worker termination - // Input Processing Constants + // Input Processing Constants - Balanced for stability InputProcessingTimeoutMS: 10, // 10ms processing timeout threshold // Adaptive Buffer Constants @@ -670,7 +688,7 @@ func DefaultAudioConfig() *AudioConfigConstants { } // Global configuration instance -var audioConfigInstance = DefaultAudioConfig() +var Config = DefaultAudioConfig() // UpdateConfig allows runtime configuration updates func UpdateConfig(newConfig *AudioConfigConstants) { @@ -682,12 +700,12 @@ func UpdateConfig(newConfig *AudioConfigConstants) { return } - audioConfigInstance = newConfig + Config = newConfig logger := logging.GetDefaultLogger().With().Str("component", "AudioConfig").Logger() logger.Info().Msg("Audio configuration updated successfully") } // GetConfig returns the current configuration func GetConfig() *AudioConfigConstants { - return audioConfigInstance + return Config } diff --git a/internal/audio/core_validation.go b/internal/audio/core_validation.go index 7eb63542..03b44adb 100644 --- a/internal/audio/core_validation.go +++ b/internal/audio/core_validation.go @@ -55,12 +55,11 @@ func ValidateZeroCopyFrame(frame *ZeroCopyAudioFrame) error { maxFrameSize := cachedMaxFrameSize if maxFrameSize == 0 { // Fallback: get from cache - cache := GetCachedConfig() - maxFrameSize = int(cache.maxAudioFrameSize.Load()) + cache := Config + maxFrameSize = cache.MaxAudioFrameSize if maxFrameSize == 0 { - // Last resort: update cache - cache.Update() - maxFrameSize = int(cache.maxAudioFrameSize.Load()) + // Last resort: use default + maxFrameSize = cache.MaxAudioFrameSize } // Cache globally for next calls cachedMaxFrameSize = maxFrameSize @@ -80,8 +79,8 @@ func ValidateBufferSize(size int) error { } // Fast path: Check against cached max frame size - cache := GetCachedConfig() - maxFrameSize := int(cache.maxAudioFrameSize.Load()) + cache := Config + maxFrameSize := cache.MaxAudioFrameSize // Most common case: validating a buffer that's sized for audio frames if maxFrameSize > 0 && size <= maxFrameSize { @@ -89,7 +88,7 @@ func ValidateBufferSize(size int) error { } // Slower path: full validation against SocketMaxBuffer - config := GetConfig() + config := Config // Use SocketMaxBuffer as the upper limit for general buffer validation // This allows for socket buffers while still preventing extremely large allocations if size > config.SocketMaxBuffer { @@ -107,8 +106,8 @@ func ValidateLatency(latency time.Duration) error { } // Fast path: check against cached max latency - cache := GetCachedConfig() - maxLatency := time.Duration(cache.maxLatency.Load()) + cache := Config + maxLatency := time.Duration(cache.MaxLatency) // If we have a valid cached value, use it if maxLatency > 0 { @@ -125,7 +124,7 @@ func ValidateLatency(latency time.Duration) error { } // Slower path: full validation with GetConfig() - config := GetConfig() + config := Config minLatency := time.Millisecond // Minimum reasonable latency if latency > 0 && latency < minLatency { return fmt.Errorf("%w: latency %v below minimum %v", @@ -142,9 +141,9 @@ func ValidateLatency(latency time.Duration) error { // Optimized to use AudioConfigCache for frequently accessed values func ValidateMetricsInterval(interval time.Duration) error { // Fast path: check against cached values - cache := GetCachedConfig() - minInterval := time.Duration(cache.minMetricsUpdateInterval.Load()) - maxInterval := time.Duration(cache.maxMetricsUpdateInterval.Load()) + cache := Config + minInterval := time.Duration(cache.MinMetricsUpdateInterval) + maxInterval := time.Duration(cache.MaxMetricsUpdateInterval) // If we have valid cached values, use them if minInterval > 0 && maxInterval > 0 { @@ -160,7 +159,7 @@ func ValidateMetricsInterval(interval time.Duration) error { } // Slower path: full validation with GetConfig() - config := GetConfig() + config := Config minInterval = config.MinMetricsUpdateInterval maxInterval = config.MaxMetricsUpdateInterval if interval < minInterval { @@ -184,7 +183,7 @@ func ValidateAdaptiveBufferConfig(minSize, maxSize, defaultSize int) error { return ErrInvalidBufferSize } // Validate against global limits - maxBuffer := GetConfig().SocketMaxBuffer + maxBuffer := Config.SocketMaxBuffer if maxSize > maxBuffer { return ErrInvalidBufferSize } @@ -194,7 +193,7 @@ func ValidateAdaptiveBufferConfig(minSize, maxSize, defaultSize int) error { // ValidateInputIPCConfig validates input IPC configuration func ValidateInputIPCConfig(sampleRate, channels, frameSize int) error { // Use config values - config := GetConfig() + config := Config minSampleRate := config.MinSampleRate maxSampleRate := config.MaxSampleRate maxChannels := config.MaxChannels @@ -213,7 +212,7 @@ func ValidateInputIPCConfig(sampleRate, channels, frameSize int) error { // ValidateOutputIPCConfig validates output IPC configuration func ValidateOutputIPCConfig(sampleRate, channels, frameSize int) error { // Use config values - config := GetConfig() + config := Config minSampleRate := config.MinSampleRate maxSampleRate := config.MaxSampleRate maxChannels := config.MaxChannels @@ -263,8 +262,8 @@ func ValidateSampleRate(sampleRate int) error { } // Fast path: Check against cached sample rate first - cache := GetCachedConfig() - cachedRate := int(cache.sampleRate.Load()) + cache := Config + cachedRate := cache.SampleRate // Most common case: validating against the current sample rate if sampleRate == cachedRate { @@ -272,7 +271,7 @@ func ValidateSampleRate(sampleRate int) error { } // Slower path: check against all valid rates - config := GetConfig() + config := Config validRates := config.ValidSampleRates for _, rate := range validRates { if sampleRate == rate { @@ -291,8 +290,8 @@ func ValidateChannelCount(channels int) error { } // Fast path: Check against cached channels first - cache := GetCachedConfig() - cachedChannels := int(cache.channels.Load()) + cache := Config + cachedChannels := cache.Channels // Most common case: validating against the current channel count if channels == cachedChannels { @@ -300,14 +299,13 @@ func ValidateChannelCount(channels int) error { } // Fast path: Check against cached max channels - cachedMaxChannels := int(cache.maxChannels.Load()) + cachedMaxChannels := cache.MaxChannels if cachedMaxChannels > 0 && channels <= cachedMaxChannels { return nil } - // Slow path: Update cache and validate - cache.Update() - updatedMaxChannels := int(cache.maxChannels.Load()) + // Slow path: Use current config values + updatedMaxChannels := cache.MaxChannels if channels > updatedMaxChannels { return fmt.Errorf("%w: channel count %d exceeds maximum %d", ErrInvalidChannels, channels, updatedMaxChannels) @@ -323,9 +321,9 @@ func ValidateBitrate(bitrate int) error { } // Fast path: Check against cached bitrate values - cache := GetCachedConfig() - minBitrate := int(cache.minOpusBitrate.Load()) - maxBitrate := int(cache.maxOpusBitrate.Load()) + cache := Config + minBitrate := cache.MinOpusBitrate + maxBitrate := cache.MaxOpusBitrate // If we have valid cached values, use them if minBitrate > 0 && maxBitrate > 0 { @@ -343,7 +341,7 @@ func ValidateBitrate(bitrate int) error { } // Slower path: full validation with GetConfig() - config := GetConfig() + config := Config // Convert kbps to bps for comparison with config limits bitrateInBps := bitrate * 1000 if bitrateInBps < config.MinOpusBitrate { @@ -365,11 +363,11 @@ func ValidateFrameDuration(duration time.Duration) error { } // Fast path: Check against cached frame size first - cache := GetCachedConfig() + cache := Config // Convert frameSize (samples) to duration for comparison - cachedFrameSize := int(cache.frameSize.Load()) - cachedSampleRate := int(cache.sampleRate.Load()) + cachedFrameSize := cache.FrameSize + cachedSampleRate := cache.SampleRate // Only do this calculation if we have valid cached values if cachedFrameSize > 0 && cachedSampleRate > 0 { @@ -382,8 +380,8 @@ func ValidateFrameDuration(duration time.Duration) error { } // Fast path: Check against cached min/max frame duration - cachedMinDuration := time.Duration(cache.minFrameDuration.Load()) - cachedMaxDuration := time.Duration(cache.maxFrameDuration.Load()) + cachedMinDuration := time.Duration(cache.MinFrameDuration) + cachedMaxDuration := time.Duration(cache.MaxFrameDuration) if cachedMinDuration > 0 && cachedMaxDuration > 0 { if duration < cachedMinDuration { @@ -397,10 +395,9 @@ func ValidateFrameDuration(duration time.Duration) error { return nil } - // Slow path: Update cache and validate - cache.Update() - updatedMinDuration := time.Duration(cache.minFrameDuration.Load()) - updatedMaxDuration := time.Duration(cache.maxFrameDuration.Load()) + // Slow path: Use current config values + updatedMinDuration := time.Duration(cache.MinFrameDuration) + updatedMaxDuration := time.Duration(cache.MaxFrameDuration) if duration < updatedMinDuration { return fmt.Errorf("%w: frame duration %v below minimum %v", @@ -417,11 +414,11 @@ func ValidateFrameDuration(duration time.Duration) error { // Uses optimized validation functions that leverage AudioConfigCache func ValidateAudioConfigComplete(config AudioConfig) error { // Fast path: Check if all values match the current cached configuration - cache := GetCachedConfig() - cachedSampleRate := int(cache.sampleRate.Load()) - cachedChannels := int(cache.channels.Load()) - cachedBitrate := int(cache.opusBitrate.Load()) / 1000 // Convert from bps to kbps - cachedFrameSize := int(cache.frameSize.Load()) + cache := Config + cachedSampleRate := cache.SampleRate + cachedChannels := cache.Channels + cachedBitrate := cache.OpusBitrate / 1000 // Convert from bps to kbps + cachedFrameSize := cache.FrameSize // Only do this calculation if we have valid cached values if cachedSampleRate > 0 && cachedChannels > 0 && cachedBitrate > 0 && cachedFrameSize > 0 { @@ -481,11 +478,11 @@ var cachedMaxFrameSize int // InitValidationCache initializes cached validation values with actual config func InitValidationCache() { // Initialize the global cache variable for backward compatibility - config := GetConfig() + config := Config cachedMaxFrameSize = config.MaxAudioFrameSize - // Update the global audio config cache - GetCachedConfig().Update() + // Initialize the global audio config cache + cachedMaxFrameSize = Config.MaxAudioFrameSize } // ValidateAudioFrame validates audio frame data with cached max size for performance @@ -502,12 +499,11 @@ func ValidateAudioFrame(data []byte) error { maxSize := cachedMaxFrameSize if maxSize == 0 { // Fallback: get from cache only if global cache not initialized - cache := GetCachedConfig() - maxSize = int(cache.maxAudioFrameSize.Load()) + cache := Config + maxSize = cache.MaxAudioFrameSize if maxSize == 0 { - // Last resort: update cache and get fresh value - cache.Update() - maxSize = int(cache.maxAudioFrameSize.Load()) + // Last resort: get fresh value + maxSize = cache.MaxAudioFrameSize } // Cache the value globally for next calls cachedMaxFrameSize = maxSize diff --git a/internal/audio/goroutine_pool.go b/internal/audio/goroutine_pool.go index cfc844e0..23115a1d 100644 --- a/internal/audio/goroutine_pool.go +++ b/internal/audio/goroutine_pool.go @@ -255,7 +255,7 @@ func GetAudioProcessorPool() *GoroutinePool { } globalAudioProcessorInitOnce.Do(func() { - config := GetConfig() + config := Config newPool := NewGoroutinePool( "audio-processor", config.MaxAudioProcessorWorkers, @@ -277,7 +277,7 @@ func GetAudioReaderPool() *GoroutinePool { } globalAudioReaderInitOnce.Do(func() { - config := GetConfig() + config := Config newPool := NewGoroutinePool( "audio-reader", config.MaxAudioReaderWorkers, diff --git a/internal/audio/input_microphone_manager.go b/internal/audio/input_microphone_manager.go index f80cfd3f..5178f9f3 100644 --- a/internal/audio/input_microphone_manager.go +++ b/internal/audio/input_microphone_manager.go @@ -108,7 +108,7 @@ func (aim *AudioInputManager) WriteOpusFrame(frame []byte) error { processingTime := time.Since(startTime) // Log high latency warnings - if processingTime > time.Duration(GetConfig().InputProcessingTimeoutMS)*time.Millisecond { + if processingTime > time.Duration(Config.InputProcessingTimeoutMS)*time.Millisecond { latencyMs := float64(processingTime.Milliseconds()) aim.logger.Warn(). Float64("latency_ms", latencyMs). @@ -149,7 +149,7 @@ func (aim *AudioInputManager) WriteOpusFrameZeroCopy(frame *ZeroCopyAudioFrame) processingTime := time.Since(startTime) // Log high latency warnings - if processingTime > time.Duration(GetConfig().InputProcessingTimeoutMS)*time.Millisecond { + if processingTime > time.Duration(Config.InputProcessingTimeoutMS)*time.Millisecond { latencyMs := float64(processingTime.Milliseconds()) aim.logger.Warn(). Float64("latency_ms", latencyMs). diff --git a/internal/audio/input_server_main.go b/internal/audio/input_server_main.go index 889755c4..dc8b77e3 100644 --- a/internal/audio/input_server_main.go +++ b/internal/audio/input_server_main.go @@ -107,7 +107,7 @@ func RunAudioInputServer() error { server.Stop() // Give some time for cleanup - time.Sleep(GetConfig().DefaultSleepDuration) + time.Sleep(Config.DefaultSleepDuration) return nil } diff --git a/internal/audio/input_supervisor.go b/internal/audio/input_supervisor.go index 7311d094..70b63c88 100644 --- a/internal/audio/input_supervisor.go +++ b/internal/audio/input_supervisor.go @@ -73,7 +73,7 @@ func (ais *AudioInputSupervisor) supervisionLoop() { // Configure supervision parameters (no restart for input supervisor) config := SupervisionConfig{ ProcessType: "audio input server", - Timeout: GetConfig().InputSupervisorTimeout, + Timeout: Config.InputSupervisorTimeout, EnableRestart: false, // Input supervisor doesn't restart MaxRestartAttempts: 0, RestartWindow: 0, @@ -164,7 +164,7 @@ func (ais *AudioInputSupervisor) Stop() { select { case <-ais.processDone: ais.logger.Info().Str("component", "audio-input-supervisor").Msg("component stopped gracefully") - case <-time.After(GetConfig().InputSupervisorTimeout): + case <-time.After(Config.InputSupervisorTimeout): ais.logger.Warn().Str("component", "audio-input-supervisor").Msg("component did not stop gracefully, forcing termination") ais.forceKillProcess("audio input server") } @@ -190,7 +190,7 @@ func (ais *AudioInputSupervisor) GetClient() *AudioInputClient { // connectClient attempts to connect the client to the server func (ais *AudioInputSupervisor) connectClient() { // Wait briefly for the server to start and create socket - time.Sleep(GetConfig().DefaultSleepDuration) + time.Sleep(Config.DefaultSleepDuration) // Additional small delay to ensure socket is ready after restart time.Sleep(20 * time.Millisecond) diff --git a/internal/audio/ipc_common.go b/internal/audio/ipc_common.go index 38b595ec..4b2263d7 100644 --- a/internal/audio/ipc_common.go +++ b/internal/audio/ipc_common.go @@ -49,7 +49,7 @@ func NewGenericMessagePool(size int) *GenericMessagePool { pool.preallocated = make([]*OptimizedMessage, pool.preallocSize) for i := 0; i < pool.preallocSize; i++ { pool.preallocated[i] = &OptimizedMessage{ - data: make([]byte, 0, GetConfig().MaxFrameSize), + data: make([]byte, 0, Config.MaxFrameSize), } } @@ -57,7 +57,7 @@ func NewGenericMessagePool(size int) *GenericMessagePool { for i := 0; i < size-pool.preallocSize; i++ { select { case pool.pool <- &OptimizedMessage{ - data: make([]byte, 0, GetConfig().MaxFrameSize), + data: make([]byte, 0, Config.MaxFrameSize), }: default: break @@ -89,7 +89,7 @@ func (mp *GenericMessagePool) Get() *OptimizedMessage { // Pool empty, create new message atomic.AddInt64(&mp.missCount, 1) return &OptimizedMessage{ - data: make([]byte, 0, GetConfig().MaxFrameSize), + data: make([]byte, 0, Config.MaxFrameSize), } } } @@ -149,7 +149,7 @@ func WriteIPCMessage(conn net.Conn, msg IPCMessage, pool *GenericMessagePool, dr binary.LittleEndian.PutUint64(optMsg.header[9:17], uint64(msg.GetTimestamp())) // Set write deadline for timeout handling (more efficient than goroutines) - if deadline := time.Now().Add(GetConfig().WriteTimeout); deadline.After(time.Now()) { + if deadline := time.Now().Add(Config.WriteTimeout); deadline.After(time.Now()) { if err := conn.SetWriteDeadline(deadline); err != nil { // If we can't set deadline, proceed without it // This maintains compatibility with connections that don't support deadlines diff --git a/internal/audio/ipc_input.go b/internal/audio/ipc_input.go index 56d0e8f9..730d2478 100644 --- a/internal/audio/ipc_input.go +++ b/internal/audio/ipc_input.go @@ -23,8 +23,8 @@ const ( // Constants are now defined in unified_ipc.go var ( - maxFrameSize = GetConfig().MaxFrameSize // Maximum Opus frame size - messagePoolSize = GetConfig().MessagePoolSize // Pre-allocated message pool size + maxFrameSize = Config.MaxFrameSize // Maximum Opus frame size + messagePoolSize = Config.MessagePoolSize // Pre-allocated message pool size ) // Legacy aliases for backward compatibility @@ -77,7 +77,7 @@ func initializeMessagePool() { messagePoolInitOnce.Do(func() { preallocSize := messagePoolSize / 4 // 25% pre-allocated for immediate use globalMessagePool.preallocSize = preallocSize - globalMessagePool.maxPoolSize = messagePoolSize * GetConfig().PoolGrowthMultiplier // Allow growth up to 2x + globalMessagePool.maxPoolSize = messagePoolSize * Config.PoolGrowthMultiplier // Allow growth up to 2x globalMessagePool.preallocated = make([]*OptimizedIPCMessage, 0, preallocSize) // Pre-allocate messages for immediate use @@ -378,7 +378,7 @@ func (ais *AudioInputServer) handleConnection(conn net.Conn) { if ais.conn == nil { return } - time.Sleep(GetConfig().DefaultSleepDuration) + time.Sleep(Config.DefaultSleepDuration) } } } @@ -499,11 +499,11 @@ func (ais *AudioInputServer) processOpusFrame(data []byte) error { } // Get cached config once - avoid repeated calls and locking - cache := GetCachedConfig() + cache := Config // Skip cache expiry check in hotpath - background updates handle this // Get a PCM buffer from the pool for optimized decode-write - pcmBuffer := GetBufferFromPool(cache.GetMaxPCMBufferSize()) + pcmBuffer := GetBufferFromPool(cache.MaxPCMBufferSize) defer ReturnBufferToPool(pcmBuffer) // Direct CGO call - avoid wrapper function overhead @@ -646,9 +646,9 @@ func (aic *AudioInputClient) Connect() error { return nil } // Exponential backoff starting from config - backoffStart := GetConfig().BackoffStart + backoffStart := Config.BackoffStart delay := time.Duration(backoffStart.Nanoseconds()*(1< maxDelay { delay = maxDelay } @@ -911,10 +911,10 @@ func (ais *AudioInputServer) startReaderGoroutine() { // Enhanced error tracking and recovery var consecutiveErrors int var lastErrorTime time.Time - maxConsecutiveErrors := GetConfig().MaxConsecutiveErrors - errorResetWindow := GetConfig().RestartWindow // Use existing restart window - baseBackoffDelay := GetConfig().RetryDelay - maxBackoffDelay := GetConfig().MaxRetryDelay + maxConsecutiveErrors := Config.MaxConsecutiveErrors + errorResetWindow := Config.RestartWindow // Use existing restart window + baseBackoffDelay := Config.RetryDelay + maxBackoffDelay := Config.MaxRetryDelay logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger() @@ -1025,7 +1025,7 @@ func (ais *AudioInputServer) startProcessorGoroutine() { processorTask := func() { // Only lock OS thread and set priority for high-load scenarios // This reduces interference with input processing threads - config := GetConfig() + config := Config useThreadOptimizations := config.MaxAudioProcessorWorkers > 8 if useThreadOptimizations { @@ -1137,7 +1137,7 @@ func (ais *AudioInputServer) processMessageWithRecovery(msg *InputIPCMessage, lo select { case processChan <- msg: return nil - case <-time.After(GetConfig().WriteTimeout): + case <-time.After(Config.WriteTimeout): // Processing queue full and timeout reached, drop frame atomic.AddInt64(&ais.droppedFrames, 1) return fmt.Errorf("processing queue timeout") @@ -1156,7 +1156,7 @@ func (ais *AudioInputServer) startMonitorGoroutine() { monitorTask := func() { // Monitor goroutine doesn't need thread locking for most scenarios // Only use thread optimizations for high-throughput scenarios - config := GetConfig() + config := Config useThreadOptimizations := config.MaxAudioProcessorWorkers > 8 if useThreadOptimizations { @@ -1167,11 +1167,11 @@ func (ais *AudioInputServer) startMonitorGoroutine() { } defer ais.wg.Done() - ticker := time.NewTicker(GetConfig().DefaultTickerInterval) + ticker := time.NewTicker(Config.DefaultTickerInterval) defer ticker.Stop() // Buffer size update ticker (less frequent) - bufferUpdateTicker := time.NewTicker(GetConfig().BufferUpdateInterval) + bufferUpdateTicker := time.NewTicker(Config.BufferUpdateInterval) defer bufferUpdateTicker.Stop() for { @@ -1330,7 +1330,7 @@ func (mp *MessagePool) GetMessagePoolStats() MessagePoolStats { var hitRate float64 if totalRequests > 0 { - hitRate = float64(hitCount) / float64(totalRequests) * GetConfig().PercentageMultiplier + hitRate = float64(hitCount) / float64(totalRequests) * Config.PercentageMultiplier } // Calculate channel pool size diff --git a/internal/audio/ipc_output.go b/internal/audio/ipc_output.go index ccef9318..473b7f70 100644 --- a/internal/audio/ipc_output.go +++ b/internal/audio/ipc_output.go @@ -24,7 +24,7 @@ const ( // Methods are now inherited from UnifiedIPCMessage // Global shared message pool for output IPC client header reading -var globalOutputClientMessagePool = NewGenericMessagePool(GetConfig().OutputMessagePoolSize) +var globalOutputClientMessagePool = NewGenericMessagePool(Config.OutputMessagePoolSize) // AudioOutputServer is now an alias for UnifiedAudioServer type AudioOutputServer = UnifiedAudioServer @@ -95,7 +95,7 @@ func (c *AudioOutputClient) ReceiveFrame() ([]byte, error) { } size := binary.LittleEndian.Uint32(optMsg.header[5:9]) - maxFrameSize := GetConfig().OutputMaxFrameSize + maxFrameSize := Config.OutputMaxFrameSize if int(size) > maxFrameSize { return nil, fmt.Errorf("received frame size validation failed: got %d bytes, maximum allowed %d bytes", size, maxFrameSize) } diff --git a/internal/audio/ipc_unified.go b/internal/audio/ipc_unified.go index ada7faf0..dec68352 100644 --- a/internal/audio/ipc_unified.go +++ b/internal/audio/ipc_unified.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "fmt" "io" + "math" "net" "os" "path/filepath" @@ -17,8 +18,8 @@ import ( // Unified IPC constants var ( - outputMagicNumber uint32 = GetConfig().OutputMagicNumber // "JKOU" (JetKVM Output) - inputMagicNumber uint32 = GetConfig().InputMagicNumber // "JKMI" (JetKVM Microphone Input) + outputMagicNumber uint32 = Config.OutputMagicNumber // "JKOU" (JetKVM Output) + inputMagicNumber uint32 = Config.InputMagicNumber // "JKMI" (JetKVM Microphone Input) outputSocketName = "audio_output.sock" inputSocketName = "audio_input.sock" headerSize = 17 // Fixed header size: 4+1+4+8 bytes @@ -144,8 +145,8 @@ func NewUnifiedAudioServer(isInput bool) (*UnifiedAudioServer, error) { logger: logger, socketPath: socketPath, magicNumber: magicNumber, - messageChan: make(chan *UnifiedIPCMessage, GetConfig().ChannelBufferSize), - processChan: make(chan *UnifiedIPCMessage, GetConfig().ChannelBufferSize), + messageChan: make(chan *UnifiedIPCMessage, Config.ChannelBufferSize), + processChan: make(chan *UnifiedIPCMessage, Config.ChannelBufferSize), socketBufferConfig: DefaultSocketBufferConfig(), latencyMonitor: nil, adaptiveOptimizer: nil, @@ -311,7 +312,7 @@ func (s *UnifiedAudioServer) readMessage(conn net.Conn) (*UnifiedIPCMessage, err timestamp := int64(binary.LittleEndian.Uint64(header[9:17])) // Validate length - if length > uint32(GetConfig().MaxFrameSize) { + if length > uint32(Config.MaxFrameSize) { return nil, fmt.Errorf("message too large: %d bytes", length) } @@ -339,7 +340,10 @@ func (s *UnifiedAudioServer) SendFrame(frame []byte) error { defer s.mtx.Unlock() if !s.running || s.conn == nil { - return fmt.Errorf("no client connected") + // Silently drop frames when no client is connected + // This prevents "no client connected" warnings during startup and quality changes + atomic.AddInt64(&s.droppedFrames, 1) + return nil // Return nil to avoid flooding logs with connection warnings } start := time.Now() @@ -398,7 +402,7 @@ func (s *UnifiedAudioServer) writeMessage(conn net.Conn, msg *UnifiedIPCMessage) // UnifiedAudioClient provides common functionality for both input and output clients type UnifiedAudioClient struct { - // Atomic fields first for ARM32 alignment + // Atomic counters for frame statistics droppedFrames int64 // Atomic counter for dropped frames totalFrames int64 // Atomic counter for total frames @@ -409,6 +413,13 @@ type UnifiedAudioClient struct { socketPath string magicNumber uint32 bufferPool *AudioBufferPool // Buffer pool for memory optimization + + // Connection health monitoring + lastHealthCheck time.Time + connectionErrors int64 // Atomic counter for connection errors + autoReconnect bool // Enable automatic reconnection + healthCheckTicker *time.Ticker + stopHealthCheck chan struct{} } // NewUnifiedAudioClient creates a new unified audio client @@ -430,10 +441,12 @@ func NewUnifiedAudioClient(isInput bool) *UnifiedAudioClient { logger := logging.GetDefaultLogger().With().Str("component", componentName).Logger() return &UnifiedAudioClient{ - logger: logger, - socketPath: socketPath, - magicNumber: magicNumber, - bufferPool: NewAudioBufferPool(GetConfig().MaxFrameSize), + logger: logger, + socketPath: socketPath, + magicNumber: magicNumber, + bufferPool: NewAudioBufferPool(Config.MaxFrameSize), + autoReconnect: true, // Enable automatic reconnection by default + stopHealthCheck: make(chan struct{}), } } @@ -453,32 +466,46 @@ func (c *UnifiedAudioClient) Connect() error { } // Try connecting multiple times as the server might not be ready - // Reduced retry count and delay for faster startup - for i := 0; i < 10; i++ { - conn, err := net.Dial("unix", c.socketPath) + // Use configurable retry parameters for better control + maxAttempts := Config.MaxConnectionAttempts + initialDelay := Config.ConnectionRetryDelay + maxDelay := Config.MaxConnectionRetryDelay + backoffFactor := Config.ConnectionBackoffFactor + + for i := 0; i < maxAttempts; i++ { + // Set connection timeout for each attempt + conn, err := net.DialTimeout("unix", c.socketPath, Config.ConnectionTimeoutDelay) if err == nil { c.conn = conn c.running = true // Reset frame counters on successful connection atomic.StoreInt64(&c.totalFrames, 0) atomic.StoreInt64(&c.droppedFrames, 0) - c.logger.Info().Str("socket_path", c.socketPath).Msg("Connected to server") + atomic.StoreInt64(&c.connectionErrors, 0) + c.lastHealthCheck = time.Now() + // Start health check monitoring if auto-reconnect is enabled + if c.autoReconnect { + c.startHealthCheck() + } + c.logger.Info().Str("socket_path", c.socketPath).Int("attempt", i+1).Msg("Connected to server") return nil } - // Exponential backoff starting from config - backoffStart := GetConfig().BackoffStart - delay := time.Duration(backoffStart.Nanoseconds()*(1< maxDelay { - delay = maxDelay + + // Log connection attempt failure + c.logger.Debug().Err(err).Str("socket_path", c.socketPath).Int("attempt", i+1).Int("max_attempts", maxAttempts).Msg("Connection attempt failed") + + // Don't sleep after the last attempt + if i < maxAttempts-1 { + // Calculate adaptive delay based on connection failure patterns + delay := c.calculateAdaptiveDelay(i, initialDelay, maxDelay, backoffFactor) + time.Sleep(delay) } - time.Sleep(delay) } // Ensure clean state on connection failure c.conn = nil c.running = false - return fmt.Errorf("failed to connect to audio server after 10 attempts") + return fmt.Errorf("failed to connect to audio server after %d attempts", Config.MaxConnectionAttempts) } // Disconnect disconnects the client from the server @@ -492,6 +519,9 @@ func (c *UnifiedAudioClient) Disconnect() { c.running = false + // Stop health check monitoring + c.stopHealthCheckMonitoring() + if c.conn != nil { c.conn.Close() c.conn = nil @@ -511,7 +541,122 @@ func (c *UnifiedAudioClient) IsConnected() bool { func (c *UnifiedAudioClient) GetFrameStats() (total, dropped int64) { total = atomic.LoadInt64(&c.totalFrames) dropped = atomic.LoadInt64(&c.droppedFrames) - return total, dropped + return +} + +// startHealthCheck starts the connection health monitoring +func (c *UnifiedAudioClient) startHealthCheck() { + if c.healthCheckTicker != nil { + c.healthCheckTicker.Stop() + } + + c.healthCheckTicker = time.NewTicker(Config.HealthCheckInterval) + go func() { + for { + select { + case <-c.healthCheckTicker.C: + c.performHealthCheck() + case <-c.stopHealthCheck: + return + } + } + }() +} + +// stopHealthCheckMonitoring stops the health check monitoring +func (c *UnifiedAudioClient) stopHealthCheckMonitoring() { + if c.healthCheckTicker != nil { + c.healthCheckTicker.Stop() + c.healthCheckTicker = nil + } + select { + case c.stopHealthCheck <- struct{}{}: + default: + } +} + +// performHealthCheck checks the connection health and attempts reconnection if needed +func (c *UnifiedAudioClient) performHealthCheck() { + c.mtx.Lock() + defer c.mtx.Unlock() + + if !c.running || c.conn == nil { + return + } + + // Simple health check: try to get connection info + if tcpConn, ok := c.conn.(*net.UnixConn); ok { + if _, err := tcpConn.File(); err != nil { + // Connection is broken + atomic.AddInt64(&c.connectionErrors, 1) + c.logger.Warn().Err(err).Msg("Connection health check failed, attempting reconnection") + + // Close the broken connection + c.conn.Close() + c.conn = nil + c.running = false + + // Attempt reconnection + go func() { + time.Sleep(Config.ReconnectionInterval) + if err := c.Connect(); err != nil { + c.logger.Error().Err(err).Msg("Failed to reconnect during health check") + } + }() + } + } + + c.lastHealthCheck = time.Now() +} + +// SetAutoReconnect enables or disables automatic reconnection +func (c *UnifiedAudioClient) SetAutoReconnect(enabled bool) { + c.mtx.Lock() + defer c.mtx.Unlock() + + c.autoReconnect = enabled + if !enabled { + c.stopHealthCheckMonitoring() + } else if c.running { + c.startHealthCheck() + } +} + +// GetConnectionErrors returns the number of connection errors +func (c *UnifiedAudioClient) GetConnectionErrors() int64 { + return atomic.LoadInt64(&c.connectionErrors) +} + +// calculateAdaptiveDelay calculates retry delay based on system load and failure patterns +func (c *UnifiedAudioClient) calculateAdaptiveDelay(attempt int, initialDelay, maxDelay time.Duration, backoffFactor float64) time.Duration { + // Base exponential backoff + baseDelay := time.Duration(float64(initialDelay.Nanoseconds()) * math.Pow(backoffFactor, float64(attempt))) + + // Get connection error history for adaptive adjustment + errorCount := atomic.LoadInt64(&c.connectionErrors) + + // Adjust delay based on recent connection errors + // More errors = longer delays to avoid overwhelming the server + adaptiveFactor := 1.0 + if errorCount > 5 { + adaptiveFactor = 1.5 // 50% longer delays after many errors + } else if errorCount > 10 { + adaptiveFactor = 2.0 // Double delays after excessive errors + } + + // Apply adaptive factor + adaptiveDelay := time.Duration(float64(baseDelay.Nanoseconds()) * adaptiveFactor) + + // Ensure we don't exceed maximum delay + if adaptiveDelay > maxDelay { + adaptiveDelay = maxDelay + } + + // Add small random jitter to avoid thundering herd + jitter := time.Duration(float64(adaptiveDelay.Nanoseconds()) * 0.1 * (0.5 + float64(attempt%3)/6.0)) + adaptiveDelay += jitter + + return adaptiveDelay } // Helper functions for socket paths diff --git a/internal/audio/mgmt_input_ipc_manager.go b/internal/audio/mgmt_input_ipc_manager.go index 5fa84660..d59e6f6b 100644 --- a/internal/audio/mgmt_input_ipc_manager.go +++ b/internal/audio/mgmt_input_ipc_manager.go @@ -63,9 +63,9 @@ func (aim *AudioInputIPCManager) Start() error { } config := InputIPCConfig{ - SampleRate: GetConfig().InputIPCSampleRate, - Channels: GetConfig().InputIPCChannels, - FrameSize: GetConfig().InputIPCFrameSize, + SampleRate: Config.InputIPCSampleRate, + Channels: Config.InputIPCChannels, + FrameSize: Config.InputIPCFrameSize, } // Validate configuration before using it @@ -80,7 +80,7 @@ func (aim *AudioInputIPCManager) Start() error { } // Wait for subprocess readiness - time.Sleep(GetConfig().LongSleepDuration) + time.Sleep(Config.LongSleepDuration) err = aim.supervisor.SendConfig(config) if err != nil { diff --git a/internal/audio/mgmt_output_ipc_manager.go b/internal/audio/mgmt_output_ipc_manager.go index 381be1c9..145c1c20 100644 --- a/internal/audio/mgmt_output_ipc_manager.go +++ b/internal/audio/mgmt_output_ipc_manager.go @@ -57,9 +57,9 @@ func (aom *AudioOutputIPCManager) Start() error { // Send initial configuration config := OutputIPCConfig{ - SampleRate: GetConfig().SampleRate, - Channels: GetConfig().Channels, - FrameSize: int(GetConfig().AudioQualityMediumFrameSize.Milliseconds()), + SampleRate: Config.SampleRate, + Channels: Config.Channels, + FrameSize: int(Config.AudioQualityMediumFrameSize.Milliseconds()), } if err := aom.SendConfig(config); err != nil { diff --git a/internal/audio/mic_contention.go b/internal/audio/mic_contention.go index 373d656a..08d60d3c 100644 --- a/internal/audio/mic_contention.go +++ b/internal/audio/mic_contention.go @@ -105,7 +105,7 @@ func GetMicrophoneContentionManager() *MicrophoneContentionManager { } if atomic.CompareAndSwapInt32(&micContentionInitialized, 0, 1) { - manager := NewMicrophoneContentionManager(GetConfig().MicContentionTimeout) + manager := NewMicrophoneContentionManager(Config.MicContentionTimeout) atomic.StorePointer(&globalMicContentionManager, unsafe.Pointer(manager)) return manager } @@ -115,7 +115,7 @@ func GetMicrophoneContentionManager() *MicrophoneContentionManager { return (*MicrophoneContentionManager)(ptr) } - return NewMicrophoneContentionManager(GetConfig().MicContentionTimeout) + return NewMicrophoneContentionManager(Config.MicContentionTimeout) } func TryMicrophoneOperation() OperationResult { diff --git a/internal/audio/monitor_adaptive_optimizer.go b/internal/audio/monitor_adaptive_optimizer.go index 05c4ae5e..fe0b81f0 100644 --- a/internal/audio/monitor_adaptive_optimizer.go +++ b/internal/audio/monitor_adaptive_optimizer.go @@ -64,10 +64,10 @@ type OptimizerConfig struct { func DefaultOptimizerConfig() OptimizerConfig { return OptimizerConfig{ MaxOptimizationLevel: 8, - CooldownPeriod: GetConfig().CooldownPeriod, - Aggressiveness: GetConfig().OptimizerAggressiveness, - RollbackThreshold: GetConfig().RollbackThreshold, - StabilityPeriod: GetConfig().AdaptiveOptimizerStability, + CooldownPeriod: Config.CooldownPeriod, + Aggressiveness: Config.OptimizerAggressiveness, + RollbackThreshold: Config.RollbackThreshold, + StabilityPeriod: Config.AdaptiveOptimizerStability, // Adaptive interval defaults MinOptimizationInterval: 100 * time.Millisecond, // High stability: check every 100ms @@ -142,7 +142,7 @@ func (ao *AdaptiveOptimizer) handleLatencyOptimization(metrics LatencyMetrics) e // calculateTargetOptimizationLevel determines the appropriate optimization level func (ao *AdaptiveOptimizer) calculateTargetOptimizationLevel(metrics LatencyMetrics) int64 { // Base calculation on current latency vs target - latencyRatio := float64(metrics.Current) / float64(GetConfig().AdaptiveOptimizerLatencyTarget) // 50ms target + latencyRatio := float64(metrics.Current) / float64(Config.AdaptiveOptimizerLatencyTarget) // 50ms target // Adjust based on trend switch metrics.Trend { @@ -158,7 +158,7 @@ func (ao *AdaptiveOptimizer) calculateTargetOptimizationLevel(metrics LatencyMet latencyRatio *= ao.config.Aggressiveness // Convert to optimization level - targetLevel := int64(latencyRatio * GetConfig().LatencyScalingFactor) // Scale to 0-10 range + targetLevel := int64(latencyRatio * Config.LatencyScalingFactor) // Scale to 0-10 range if targetLevel > int64(ao.config.MaxOptimizationLevel) { targetLevel = int64(ao.config.MaxOptimizationLevel) } diff --git a/internal/audio/monitor_goroutine.go b/internal/audio/monitor_goroutine.go index 00dd3743..fa2c8d8d 100644 --- a/internal/audio/monitor_goroutine.go +++ b/internal/audio/monitor_goroutine.go @@ -126,7 +126,7 @@ func (gm *GoroutineMonitor) GetGoroutineStats() map[string]interface{} { // GetGoroutineMonitor returns the global goroutine monitor instance func GetGoroutineMonitor() *GoroutineMonitor { if globalGoroutineMonitor == nil { - globalGoroutineMonitor = NewGoroutineMonitor(GetConfig().GoroutineMonitorInterval) + globalGoroutineMonitor = NewGoroutineMonitor(Config.GoroutineMonitorInterval) } return globalGoroutineMonitor } diff --git a/internal/audio/monitor_latency.go b/internal/audio/monitor_latency.go index 40b2381d..e44c4c08 100644 --- a/internal/audio/monitor_latency.go +++ b/internal/audio/monitor_latency.go @@ -81,7 +81,7 @@ const ( // DefaultLatencyConfig returns a sensible default configuration func DefaultLatencyConfig() LatencyConfig { - config := GetConfig() + config := Config return LatencyConfig{ TargetLatency: config.LatencyMonitorTarget, MaxLatency: config.MaxLatencyThreshold, diff --git a/internal/audio/monitor_process.go b/internal/audio/monitor_process.go index 30bd0bb0..aa898347 100644 --- a/internal/audio/monitor_process.go +++ b/internal/audio/monitor_process.go @@ -16,26 +16,26 @@ import ( // Variables for process monitoring (using configuration) var ( // System constants - maxCPUPercent = GetConfig().MaxCPUPercent - minCPUPercent = GetConfig().MinCPUPercent - defaultClockTicks = GetConfig().DefaultClockTicks - defaultMemoryGB = GetConfig().DefaultMemoryGB + maxCPUPercent = Config.MaxCPUPercent + minCPUPercent = Config.MinCPUPercent + defaultClockTicks = Config.DefaultClockTicks + defaultMemoryGB = Config.DefaultMemoryGB // Monitoring thresholds - maxWarmupSamples = GetConfig().MaxWarmupSamples - warmupCPUSamples = GetConfig().WarmupCPUSamples + maxWarmupSamples = Config.MaxWarmupSamples + warmupCPUSamples = Config.WarmupCPUSamples // Channel buffer size - metricsChannelBuffer = GetConfig().MetricsChannelBuffer + metricsChannelBuffer = Config.MetricsChannelBuffer // Clock tick detection ranges - minValidClockTicks = float64(GetConfig().MinValidClockTicks) - maxValidClockTicks = float64(GetConfig().MaxValidClockTicks) + minValidClockTicks = float64(Config.MinValidClockTicks) + maxValidClockTicks = float64(Config.MaxValidClockTicks) ) // Variables for process monitoring var ( - pageSize = GetConfig().PageSize + pageSize = Config.PageSize ) // ProcessMetrics represents CPU and memory usage metrics for a process @@ -233,7 +233,7 @@ func (pm *ProcessMonitor) collectMetrics(pid int, state *processState) (ProcessM // Calculate memory percentage (RSS / total system memory) if totalMem := pm.getTotalMemory(); totalMem > 0 { - metric.MemoryPercent = float64(metric.MemoryRSS) / float64(totalMem) * GetConfig().PercentageMultiplier + metric.MemoryPercent = float64(metric.MemoryRSS) / float64(totalMem) * Config.PercentageMultiplier } // Update state for next calculation @@ -283,7 +283,7 @@ func (pm *ProcessMonitor) calculateCPUPercent(totalCPUTime int64, state *process // Convert from clock ticks to seconds using actual system clock ticks clockTicks := pm.getClockTicks() cpuSeconds := cpuDelta / clockTicks - cpuPercent := (cpuSeconds / timeDelta) * GetConfig().PercentageMultiplier + cpuPercent := (cpuSeconds / timeDelta) * Config.PercentageMultiplier // Apply bounds if cpuPercent > maxCPUPercent { @@ -335,7 +335,7 @@ func (pm *ProcessMonitor) getClockTicks() float64 { if len(fields) >= 2 { if period, err := strconv.ParseInt(fields[1], 10, 64); err == nil && period > 0 { // Convert nanoseconds to Hz - hz := GetConfig().CGONanosecondsPerSecond / float64(period) + hz := Config.CGONanosecondsPerSecond / float64(period) if hz >= minValidClockTicks && hz <= maxValidClockTicks { pm.clockTicks = hz return @@ -363,7 +363,7 @@ func (pm *ProcessMonitor) getTotalMemory() int64 { pm.memoryOnce.Do(func() { file, err := os.Open("/proc/meminfo") if err != nil { - pm.totalMemory = int64(defaultMemoryGB) * int64(GetConfig().ProcessMonitorKBToBytes) * int64(GetConfig().ProcessMonitorKBToBytes) * int64(GetConfig().ProcessMonitorKBToBytes) + pm.totalMemory = int64(defaultMemoryGB) * int64(Config.ProcessMonitorKBToBytes) * int64(Config.ProcessMonitorKBToBytes) * int64(Config.ProcessMonitorKBToBytes) return } defer file.Close() @@ -375,14 +375,14 @@ func (pm *ProcessMonitor) getTotalMemory() int64 { fields := strings.Fields(line) if len(fields) >= 2 { if kb, err := strconv.ParseInt(fields[1], 10, 64); err == nil { - pm.totalMemory = kb * int64(GetConfig().ProcessMonitorKBToBytes) + pm.totalMemory = kb * int64(Config.ProcessMonitorKBToBytes) return } } break } } - pm.totalMemory = int64(defaultMemoryGB) * int64(GetConfig().ProcessMonitorKBToBytes) * int64(GetConfig().ProcessMonitorKBToBytes) * int64(GetConfig().ProcessMonitorKBToBytes) // Fallback + pm.totalMemory = int64(defaultMemoryGB) * int64(Config.ProcessMonitorKBToBytes) * int64(Config.ProcessMonitorKBToBytes) * int64(Config.ProcessMonitorKBToBytes) // Fallback }) return pm.totalMemory } diff --git a/internal/audio/output_server_main.go b/internal/audio/output_server_main.go index aa229e61..1b536e10 100644 --- a/internal/audio/output_server_main.go +++ b/internal/audio/output_server_main.go @@ -70,7 +70,7 @@ func RunAudioOutputServer() error { StopNonBlockingAudioStreaming() // Give some time for cleanup - time.Sleep(GetConfig().DefaultSleepDuration) + time.Sleep(Config.DefaultSleepDuration) return nil } diff --git a/internal/audio/output_streaming.go b/internal/audio/output_streaming.go index 4d3b3b34..2560d4be 100644 --- a/internal/audio/output_streaming.go +++ b/internal/audio/output_streaming.go @@ -84,9 +84,9 @@ func StartAudioOutputStreaming(send func([]byte)) error { buffer := make([]byte, GetMaxAudioFrameSize()) consecutiveErrors := 0 - maxConsecutiveErrors := GetConfig().MaxConsecutiveErrors - errorBackoffDelay := GetConfig().RetryDelay - maxErrorBackoff := GetConfig().MaxRetryDelay + maxConsecutiveErrors := Config.MaxConsecutiveErrors + errorBackoffDelay := Config.RetryDelay + maxErrorBackoff := Config.MaxRetryDelay for { select { @@ -123,18 +123,18 @@ func StartAudioOutputStreaming(send func([]byte)) error { Err(initErr). Msg("Failed to reinitialize audio system") // Exponential backoff for reinitialization failures - errorBackoffDelay = time.Duration(float64(errorBackoffDelay) * GetConfig().BackoffMultiplier) + errorBackoffDelay = time.Duration(float64(errorBackoffDelay) * Config.BackoffMultiplier) if errorBackoffDelay > maxErrorBackoff { errorBackoffDelay = maxErrorBackoff } } else { getOutputStreamingLogger().Info().Msg("Audio system reinitialized successfully") consecutiveErrors = 0 - errorBackoffDelay = GetConfig().RetryDelay // Reset backoff + errorBackoffDelay = Config.RetryDelay // Reset backoff } } else { // Brief delay for transient errors - time.Sleep(GetConfig().ShortSleepDuration) + time.Sleep(Config.ShortSleepDuration) } continue } @@ -142,7 +142,7 @@ func StartAudioOutputStreaming(send func([]byte)) error { // Success - reset error counters if consecutiveErrors > 0 { consecutiveErrors = 0 - errorBackoffDelay = GetConfig().RetryDelay + errorBackoffDelay = Config.RetryDelay } if n > 0 { @@ -164,7 +164,7 @@ func StartAudioOutputStreaming(send func([]byte)) error { RecordFrameReceived(n) } // Small delay to prevent busy waiting - time.Sleep(GetConfig().ShortSleepDuration) + time.Sleep(Config.ShortSleepDuration) } } }() @@ -185,6 +185,6 @@ func StopAudioOutputStreaming() { // Wait for streaming to stop for atomic.LoadInt32(&outputStreamingRunning) == 1 { - time.Sleep(GetConfig().ShortSleepDuration) + time.Sleep(Config.ShortSleepDuration) } } diff --git a/internal/audio/output_supervisor.go b/internal/audio/output_supervisor.go index 1abbca66..e4888b01 100644 --- a/internal/audio/output_supervisor.go +++ b/internal/audio/output_supervisor.go @@ -19,19 +19,19 @@ const ( // Restart configuration is now retrieved from centralized config func getMaxRestartAttempts() int { - return GetConfig().MaxRestartAttempts + return Config.MaxRestartAttempts } func getRestartWindow() time.Duration { - return GetConfig().RestartWindow + return Config.RestartWindow } func getRestartDelay() time.Duration { - return GetConfig().RestartDelay + return Config.RestartDelay } func getMaxRestartDelay() time.Duration { - return GetConfig().MaxRestartDelay + return Config.MaxRestartDelay } // AudioOutputSupervisor manages the audio output server subprocess lifecycle @@ -145,7 +145,7 @@ func (s *AudioOutputSupervisor) Stop() { select { case <-s.processDone: s.logger.Info().Str("component", AudioOutputSupervisorComponent).Msg("component stopped gracefully") - case <-time.After(GetConfig().OutputSupervisorTimeout): + case <-time.After(Config.OutputSupervisorTimeout): s.logger.Warn().Str("component", AudioOutputSupervisorComponent).Msg("component did not stop gracefully, forcing termination") s.forceKillProcess("audio output server") } @@ -158,7 +158,7 @@ func (s *AudioOutputSupervisor) supervisionLoop() { // Configure supervision parameters config := SupervisionConfig{ ProcessType: "audio output server", - Timeout: GetConfig().OutputSupervisorTimeout, + Timeout: Config.OutputSupervisorTimeout, EnableRestart: true, MaxRestartAttempts: getMaxRestartAttempts(), RestartWindow: getRestartWindow(), diff --git a/internal/audio/quality_presets.go b/internal/audio/quality_presets.go index 53b58f24..8117aa1f 100644 --- a/internal/audio/quality_presets.go +++ b/internal/audio/quality_presets.go @@ -39,7 +39,7 @@ var ( // MaxAudioFrameSize is now retrieved from centralized config func GetMaxAudioFrameSize() int { - return GetConfig().MaxAudioFrameSize + return Config.MaxAudioFrameSize } // AudioQuality represents different audio quality presets @@ -74,17 +74,17 @@ type AudioMetrics struct { var ( currentConfig = AudioConfig{ Quality: AudioQualityMedium, - Bitrate: GetConfig().AudioQualityMediumOutputBitrate, - SampleRate: GetConfig().SampleRate, - Channels: GetConfig().Channels, - FrameSize: GetConfig().AudioQualityMediumFrameSize, + Bitrate: Config.AudioQualityMediumOutputBitrate, + SampleRate: Config.SampleRate, + Channels: Config.Channels, + FrameSize: Config.AudioQualityMediumFrameSize, } currentMicrophoneConfig = AudioConfig{ Quality: AudioQualityMedium, - Bitrate: GetConfig().AudioQualityMediumInputBitrate, - SampleRate: GetConfig().SampleRate, + Bitrate: Config.AudioQualityMediumInputBitrate, + SampleRate: Config.SampleRate, Channels: 1, - FrameSize: GetConfig().AudioQualityMediumFrameSize, + FrameSize: Config.AudioQualityMediumFrameSize, } metrics AudioMetrics ) @@ -96,24 +96,24 @@ var qualityPresets = map[AudioQuality]struct { frameSize time.Duration }{ AudioQualityLow: { - outputBitrate: GetConfig().AudioQualityLowOutputBitrate, inputBitrate: GetConfig().AudioQualityLowInputBitrate, - sampleRate: GetConfig().AudioQualityLowSampleRate, channels: GetConfig().AudioQualityLowChannels, - frameSize: GetConfig().AudioQualityLowFrameSize, + outputBitrate: Config.AudioQualityLowOutputBitrate, inputBitrate: Config.AudioQualityLowInputBitrate, + sampleRate: Config.AudioQualityLowSampleRate, channels: Config.AudioQualityLowChannels, + frameSize: Config.AudioQualityLowFrameSize, }, AudioQualityMedium: { - outputBitrate: GetConfig().AudioQualityMediumOutputBitrate, inputBitrate: GetConfig().AudioQualityMediumInputBitrate, - sampleRate: GetConfig().AudioQualityMediumSampleRate, channels: GetConfig().AudioQualityMediumChannels, - frameSize: GetConfig().AudioQualityMediumFrameSize, + outputBitrate: Config.AudioQualityMediumOutputBitrate, inputBitrate: Config.AudioQualityMediumInputBitrate, + sampleRate: Config.AudioQualityMediumSampleRate, channels: Config.AudioQualityMediumChannels, + frameSize: Config.AudioQualityMediumFrameSize, }, AudioQualityHigh: { - outputBitrate: GetConfig().AudioQualityHighOutputBitrate, inputBitrate: GetConfig().AudioQualityHighInputBitrate, - sampleRate: GetConfig().SampleRate, channels: GetConfig().AudioQualityHighChannels, - frameSize: GetConfig().AudioQualityHighFrameSize, + outputBitrate: Config.AudioQualityHighOutputBitrate, inputBitrate: Config.AudioQualityHighInputBitrate, + sampleRate: Config.SampleRate, channels: Config.AudioQualityHighChannels, + frameSize: Config.AudioQualityHighFrameSize, }, AudioQualityUltra: { - outputBitrate: GetConfig().AudioQualityUltraOutputBitrate, inputBitrate: GetConfig().AudioQualityUltraInputBitrate, - sampleRate: GetConfig().SampleRate, channels: GetConfig().AudioQualityUltraChannels, - frameSize: GetConfig().AudioQualityUltraFrameSize, + outputBitrate: Config.AudioQualityUltraOutputBitrate, inputBitrate: Config.AudioQualityUltraInputBitrate, + sampleRate: Config.SampleRate, channels: Config.AudioQualityUltraChannels, + frameSize: Config.AudioQualityUltraFrameSize, }, } @@ -142,7 +142,7 @@ func GetMicrophoneQualityPresets() map[AudioQuality]AudioConfig { Bitrate: preset.inputBitrate, SampleRate: func() int { if quality == AudioQualityLow { - return GetConfig().AudioQualityMicLowSampleRate + return Config.AudioQualityMicLowSampleRate } return preset.sampleRate }(), @@ -172,36 +172,36 @@ func SetAudioQuality(quality AudioQuality) { var complexity, vbr, signalType, bandwidth, dtx int switch quality { case AudioQualityLow: - complexity = GetConfig().AudioQualityLowOpusComplexity - vbr = GetConfig().AudioQualityLowOpusVBR - signalType = GetConfig().AudioQualityLowOpusSignalType - bandwidth = GetConfig().AudioQualityLowOpusBandwidth - dtx = GetConfig().AudioQualityLowOpusDTX + complexity = Config.AudioQualityLowOpusComplexity + vbr = Config.AudioQualityLowOpusVBR + signalType = Config.AudioQualityLowOpusSignalType + bandwidth = Config.AudioQualityLowOpusBandwidth + dtx = Config.AudioQualityLowOpusDTX case AudioQualityMedium: - complexity = GetConfig().AudioQualityMediumOpusComplexity - vbr = GetConfig().AudioQualityMediumOpusVBR - signalType = GetConfig().AudioQualityMediumOpusSignalType - bandwidth = GetConfig().AudioQualityMediumOpusBandwidth - dtx = GetConfig().AudioQualityMediumOpusDTX + complexity = Config.AudioQualityMediumOpusComplexity + vbr = Config.AudioQualityMediumOpusVBR + signalType = Config.AudioQualityMediumOpusSignalType + bandwidth = Config.AudioQualityMediumOpusBandwidth + dtx = Config.AudioQualityMediumOpusDTX case AudioQualityHigh: - complexity = GetConfig().AudioQualityHighOpusComplexity - vbr = GetConfig().AudioQualityHighOpusVBR - signalType = GetConfig().AudioQualityHighOpusSignalType - bandwidth = GetConfig().AudioQualityHighOpusBandwidth - dtx = GetConfig().AudioQualityHighOpusDTX + complexity = Config.AudioQualityHighOpusComplexity + vbr = Config.AudioQualityHighOpusVBR + signalType = Config.AudioQualityHighOpusSignalType + bandwidth = Config.AudioQualityHighOpusBandwidth + dtx = Config.AudioQualityHighOpusDTX case AudioQualityUltra: - complexity = GetConfig().AudioQualityUltraOpusComplexity - vbr = GetConfig().AudioQualityUltraOpusVBR - signalType = GetConfig().AudioQualityUltraOpusSignalType - bandwidth = GetConfig().AudioQualityUltraOpusBandwidth - dtx = GetConfig().AudioQualityUltraOpusDTX + complexity = Config.AudioQualityUltraOpusComplexity + vbr = Config.AudioQualityUltraOpusVBR + signalType = Config.AudioQualityUltraOpusSignalType + bandwidth = Config.AudioQualityUltraOpusBandwidth + dtx = Config.AudioQualityUltraOpusDTX default: // Use medium quality as fallback - complexity = GetConfig().AudioQualityMediumOpusComplexity - vbr = GetConfig().AudioQualityMediumOpusVBR - signalType = GetConfig().AudioQualityMediumOpusSignalType - bandwidth = GetConfig().AudioQualityMediumOpusBandwidth - dtx = GetConfig().AudioQualityMediumOpusDTX + complexity = Config.AudioQualityMediumOpusComplexity + vbr = Config.AudioQualityMediumOpusVBR + signalType = Config.AudioQualityMediumOpusSignalType + bandwidth = Config.AudioQualityMediumOpusBandwidth + dtx = Config.AudioQualityMediumOpusDTX } // Restart audio output subprocess with new OPUS configuration @@ -256,7 +256,7 @@ func SetAudioQuality(quality AudioQuality) { } } else { // Fallback to dynamic update if supervisor is not available - vbrConstraint := GetConfig().CGOOpusVBRConstraint + vbrConstraint := Config.CGOOpusVBRConstraint if err := updateOpusEncoderParams(config.Bitrate*1000, complexity, vbr, vbrConstraint, signalType, bandwidth, dtx); err != nil { logging.GetDefaultLogger().Error().Err(err).Msg("Failed to update OPUS encoder parameters") } @@ -287,36 +287,36 @@ func SetMicrophoneQuality(quality AudioQuality) { var complexity, vbr, signalType, bandwidth, dtx int switch quality { case AudioQualityLow: - complexity = GetConfig().AudioQualityLowOpusComplexity - vbr = GetConfig().AudioQualityLowOpusVBR - signalType = GetConfig().AudioQualityLowOpusSignalType - bandwidth = GetConfig().AudioQualityLowOpusBandwidth - dtx = GetConfig().AudioQualityLowOpusDTX + complexity = Config.AudioQualityLowOpusComplexity + vbr = Config.AudioQualityLowOpusVBR + signalType = Config.AudioQualityLowOpusSignalType + bandwidth = Config.AudioQualityLowOpusBandwidth + dtx = Config.AudioQualityLowOpusDTX case AudioQualityMedium: - complexity = GetConfig().AudioQualityMediumOpusComplexity - vbr = GetConfig().AudioQualityMediumOpusVBR - signalType = GetConfig().AudioQualityMediumOpusSignalType - bandwidth = GetConfig().AudioQualityMediumOpusBandwidth - dtx = GetConfig().AudioQualityMediumOpusDTX + complexity = Config.AudioQualityMediumOpusComplexity + vbr = Config.AudioQualityMediumOpusVBR + signalType = Config.AudioQualityMediumOpusSignalType + bandwidth = Config.AudioQualityMediumOpusBandwidth + dtx = Config.AudioQualityMediumOpusDTX case AudioQualityHigh: - complexity = GetConfig().AudioQualityHighOpusComplexity - vbr = GetConfig().AudioQualityHighOpusVBR - signalType = GetConfig().AudioQualityHighOpusSignalType - bandwidth = GetConfig().AudioQualityHighOpusBandwidth - dtx = GetConfig().AudioQualityHighOpusDTX + complexity = Config.AudioQualityHighOpusComplexity + vbr = Config.AudioQualityHighOpusVBR + signalType = Config.AudioQualityHighOpusSignalType + bandwidth = Config.AudioQualityHighOpusBandwidth + dtx = Config.AudioQualityHighOpusDTX case AudioQualityUltra: - complexity = GetConfig().AudioQualityUltraOpusComplexity - vbr = GetConfig().AudioQualityUltraOpusVBR - signalType = GetConfig().AudioQualityUltraOpusSignalType - bandwidth = GetConfig().AudioQualityUltraOpusBandwidth - dtx = GetConfig().AudioQualityUltraOpusDTX + complexity = Config.AudioQualityUltraOpusComplexity + vbr = Config.AudioQualityUltraOpusVBR + signalType = Config.AudioQualityUltraOpusSignalType + bandwidth = Config.AudioQualityUltraOpusBandwidth + dtx = Config.AudioQualityUltraOpusDTX default: // Use medium quality as fallback - complexity = GetConfig().AudioQualityMediumOpusComplexity - vbr = GetConfig().AudioQualityMediumOpusVBR - signalType = GetConfig().AudioQualityMediumOpusSignalType - bandwidth = GetConfig().AudioQualityMediumOpusBandwidth - dtx = GetConfig().AudioQualityMediumOpusDTX + complexity = Config.AudioQualityMediumOpusComplexity + vbr = Config.AudioQualityMediumOpusVBR + signalType = Config.AudioQualityMediumOpusSignalType + bandwidth = Config.AudioQualityMediumOpusBandwidth + dtx = Config.AudioQualityMediumOpusDTX } // Update audio input subprocess configuration dynamically without restart diff --git a/internal/audio/socket_buffer.go b/internal/audio/socket_buffer.go index b92dff90..a6f7e48d 100644 --- a/internal/audio/socket_buffer.go +++ b/internal/audio/socket_buffer.go @@ -18,8 +18,8 @@ type SocketBufferConfig struct { // DefaultSocketBufferConfig returns the default socket buffer configuration func DefaultSocketBufferConfig() SocketBufferConfig { return SocketBufferConfig{ - SendBufferSize: GetConfig().SocketOptimalBuffer, - RecvBufferSize: GetConfig().SocketOptimalBuffer, + SendBufferSize: Config.SocketOptimalBuffer, + RecvBufferSize: Config.SocketOptimalBuffer, Enabled: true, } } @@ -27,8 +27,8 @@ func DefaultSocketBufferConfig() SocketBufferConfig { // HighLoadSocketBufferConfig returns configuration for high-load scenarios func HighLoadSocketBufferConfig() SocketBufferConfig { return SocketBufferConfig{ - SendBufferSize: GetConfig().SocketMaxBuffer, - RecvBufferSize: GetConfig().SocketMaxBuffer, + SendBufferSize: Config.SocketMaxBuffer, + RecvBufferSize: Config.SocketMaxBuffer, Enabled: true, } } @@ -123,8 +123,8 @@ func ValidateSocketBufferConfig(config SocketBufferConfig) error { return nil } - minBuffer := GetConfig().SocketMinBuffer - maxBuffer := GetConfig().SocketMaxBuffer + minBuffer := Config.SocketMinBuffer + maxBuffer := Config.SocketMaxBuffer if config.SendBufferSize < minBuffer { return fmt.Errorf("send buffer size validation failed: got %d bytes, minimum required %d bytes (configured range: %d-%d)", diff --git a/internal/audio/util_buffer_pool.go b/internal/audio/util_buffer_pool.go index b9232bbb..86d9d40b 100644 --- a/internal/audio/util_buffer_pool.go +++ b/internal/audio/util_buffer_pool.go @@ -366,17 +366,17 @@ func NewAudioBufferPool(bufferSize int) *AudioBufferPool { // Validate buffer size parameter if err := ValidateBufferSize(bufferSize); err != nil { // Use default value on validation error - bufferSize = GetConfig().AudioFramePoolSize + bufferSize = Config.AudioFramePoolSize } // Enhanced preallocation strategy based on buffer size and system capacity var preallocSize int - if bufferSize <= GetConfig().AudioFramePoolSize { + if bufferSize <= Config.AudioFramePoolSize { // For smaller pools, use enhanced preallocation (40% instead of 20%) - preallocSize = GetConfig().PreallocPercentage * 2 + preallocSize = Config.PreallocPercentage * 2 } else { // For larger pools, use standard enhanced preallocation (30% instead of 10%) - preallocSize = (GetConfig().PreallocPercentage * 3) / 2 + preallocSize = (Config.PreallocPercentage * 3) / 2 } // Ensure minimum preallocation for better performance @@ -594,9 +594,9 @@ func (p *AudioBufferPool) Put(buf []byte) { // Enhanced global buffer pools for different audio frame types with improved sizing var ( // Main audio frame pool with enhanced capacity - audioFramePool = NewAudioBufferPool(GetConfig().AudioFramePoolSize) + audioFramePool = NewAudioBufferPool(Config.AudioFramePoolSize) // Control message pool with enhanced capacity for better throughput - audioControlPool = NewAudioBufferPool(512) // Increased from GetConfig().OutputHeaderSize to 512 for better control message handling + audioControlPool = NewAudioBufferPool(512) // Increased from Config.OutputHeaderSize to 512 for better control message handling ) func GetAudioFrameBuffer() []byte { @@ -628,7 +628,7 @@ func (p *AudioBufferPool) GetPoolStats() AudioBufferPoolDetailedStats { var hitRate float64 if totalRequests > 0 { - hitRate = float64(hitCount) / float64(totalRequests) * GetConfig().PercentageMultiplier + hitRate = float64(hitCount) / float64(totalRequests) * Config.PercentageMultiplier } return AudioBufferPoolDetailedStats{ diff --git a/internal/audio/util_env.go b/internal/audio/util_env.go index 8c01d4f1..70b9c12c 100644 --- a/internal/audio/util_env.go +++ b/internal/audio/util_env.go @@ -21,12 +21,12 @@ func getEnvInt(key string, defaultValue int) int { // with fallback to default config values func parseOpusConfig() (bitrate, complexity, vbr, signalType, bandwidth, dtx int) { // Read configuration from environment variables with config defaults - bitrate = getEnvInt("JETKVM_OPUS_BITRATE", GetConfig().CGOOpusBitrate) - complexity = getEnvInt("JETKVM_OPUS_COMPLEXITY", GetConfig().CGOOpusComplexity) - vbr = getEnvInt("JETKVM_OPUS_VBR", GetConfig().CGOOpusVBR) - signalType = getEnvInt("JETKVM_OPUS_SIGNAL_TYPE", GetConfig().CGOOpusSignalType) - bandwidth = getEnvInt("JETKVM_OPUS_BANDWIDTH", GetConfig().CGOOpusBandwidth) - dtx = getEnvInt("JETKVM_OPUS_DTX", GetConfig().CGOOpusDTX) + bitrate = getEnvInt("JETKVM_OPUS_BITRATE", Config.CGOOpusBitrate) + complexity = getEnvInt("JETKVM_OPUS_COMPLEXITY", Config.CGOOpusComplexity) + vbr = getEnvInt("JETKVM_OPUS_VBR", Config.CGOOpusVBR) + signalType = getEnvInt("JETKVM_OPUS_SIGNAL_TYPE", Config.CGOOpusSignalType) + bandwidth = getEnvInt("JETKVM_OPUS_BANDWIDTH", Config.CGOOpusBandwidth) + dtx = getEnvInt("JETKVM_OPUS_DTX", Config.CGOOpusDTX) return bitrate, complexity, vbr, signalType, bandwidth, dtx } @@ -34,7 +34,7 @@ func parseOpusConfig() (bitrate, complexity, vbr, signalType, bandwidth, dtx int // applyOpusConfig applies OPUS configuration to the global config // with optional logging for the specified component func applyOpusConfig(bitrate, complexity, vbr, signalType, bandwidth, dtx int, component string, enableLogging bool) { - config := GetConfig() + config := Config config.CGOOpusBitrate = bitrate config.CGOOpusComplexity = complexity config.CGOOpusVBR = vbr diff --git a/internal/audio/webrtc_relay.go b/internal/audio/webrtc_relay.go index 43ccbfec..6a338564 100644 --- a/internal/audio/webrtc_relay.go +++ b/internal/audio/webrtc_relay.go @@ -134,7 +134,7 @@ func (r *AudioRelay) relayLoop() { defer r.wg.Done() r.logger.Debug().Msg("Audio relay loop started") - var maxConsecutiveErrors = GetConfig().MaxConsecutiveErrors + var maxConsecutiveErrors = Config.MaxConsecutiveErrors consecutiveErrors := 0 for { @@ -153,7 +153,7 @@ func (r *AudioRelay) relayLoop() { r.logger.Error().Int("consecutive_errors", consecutiveErrors).Int("max_errors", maxConsecutiveErrors).Msg("too many consecutive read errors, stopping audio relay") return } - time.Sleep(GetConfig().ShortSleepDuration) + time.Sleep(Config.ShortSleepDuration) continue } diff --git a/internal/audio/websocket_events.go b/internal/audio/websocket_events.go index 6edf24f6..d2e2146c 100644 --- a/internal/audio/websocket_events.go +++ b/internal/audio/websocket_events.go @@ -224,7 +224,7 @@ func (aeb *AudioEventBroadcaster) sendToSubscriber(subscriber *AudioEventSubscri return false } - ctx, cancel := context.WithTimeout(subscriber.ctx, time.Duration(GetConfig().EventTimeoutSeconds)*time.Second) + ctx, cancel := context.WithTimeout(subscriber.ctx, time.Duration(Config.EventTimeoutSeconds)*time.Second) defer cancel() err := wsjson.Write(ctx, subscriber.conn, event) diff --git a/internal/audio/zero_copy.go b/internal/audio/zero_copy.go index e74122cb..0c7edda2 100644 --- a/internal/audio/zero_copy.go +++ b/internal/audio/zero_copy.go @@ -98,8 +98,8 @@ type ZeroCopyFramePool struct { // NewZeroCopyFramePool creates a new zero-copy frame pool func NewZeroCopyFramePool(maxFrameSize int) *ZeroCopyFramePool { // Pre-allocate frames for immediate availability - preallocSizeBytes := GetConfig().PreallocSize - maxPoolSize := GetConfig().MaxPoolSize // Limit total pool size + preallocSizeBytes := Config.PreallocSize + maxPoolSize := Config.MaxPoolSize // Limit total pool size // Calculate number of frames based on memory budget, not frame count preallocFrameCount := preallocSizeBytes / maxFrameSize @@ -327,7 +327,7 @@ func (p *ZeroCopyFramePool) GetZeroCopyPoolStats() ZeroCopyFramePoolStats { var hitRate float64 if totalRequests > 0 { - hitRate = float64(hitCount) / float64(totalRequests) * GetConfig().PercentageMultiplier + hitRate = float64(hitCount) / float64(totalRequests) * Config.PercentageMultiplier } return ZeroCopyFramePoolStats{ diff --git a/main.go b/main.go index 9d62db04..1de6ac4c 100644 --- a/main.go +++ b/main.go @@ -58,7 +58,7 @@ func startAudioSubprocess() error { audio.SetAudioInputSupervisor(audioInputSupervisor) // Set default OPUS configuration for audio input supervisor (low quality for single-core RV1106) - config := audio.GetConfig() + config := audio.Config audioInputSupervisor.SetOpusConfig( config.AudioQualityLowInputBitrate*1000, // Convert kbps to bps config.AudioQualityLowOpusComplexity, @@ -81,7 +81,8 @@ func startAudioSubprocess() error { // This prevents "no client connected" errors during quality changes go func() { // Give the audio output server time to initialize and start listening - time.Sleep(500 * time.Millisecond) + // Increased delay to reduce frame drops during connection establishment + time.Sleep(1 * time.Second) // Start audio relay system for main process // If there's an active WebRTC session, use its audio track