From 0ebfc762f7c1613ed07ce36fca314092082bb416 Mon Sep 17 00:00:00 2001 From: Alex P Date: Tue, 9 Sep 2025 05:41:20 +0000 Subject: [PATCH] [WIP] Cleanup: PR SImplification --- internal/audio/adaptive_buffer.go | 150 ------------------------ internal/audio/core_config_constants.go | 12 -- internal/audio/input_server_main.go | 4 - internal/audio/ipc_input.go | 73 ++---------- internal/audio/quality_presets.go | 18 --- main.go | 6 +- 6 files changed, 9 insertions(+), 254 deletions(-) diff --git a/internal/audio/adaptive_buffer.go b/internal/audio/adaptive_buffer.go index e357cce3..dc9f6f6a 100644 --- a/internal/audio/adaptive_buffer.go +++ b/internal/audio/adaptive_buffer.go @@ -88,10 +88,6 @@ type AdaptiveBufferManager struct { systemCPUPercent int64 // System CPU percentage * 100 (atomic) systemMemoryPercent int64 // System memory percentage * 100 (atomic) adaptationCount int64 // Metrics tracking (atomic) - // Graceful degradation fields - congestionLevel int64 // Current congestion level (0-3, atomic) - degradationActive int64 // Whether degradation is active (0/1, atomic) - lastCongestionTime int64 // Last congestion detection time (unix nano, atomic) config AdaptiveBufferConfig logger zerolog.Logger @@ -194,139 +190,6 @@ func (abm *AdaptiveBufferManager) BoostBuffersForQualityChange() { Msg("Boosted buffers to maximum size for quality change") } -// DetectCongestion analyzes system state to detect audio channel congestion -// Returns congestion level: 0=none, 1=mild, 2=moderate, 3=severe -func (abm *AdaptiveBufferManager) DetectCongestion() int { - cpuPercent := float64(atomic.LoadInt64(&abm.systemCPUPercent)) / 100.0 - memoryPercent := float64(atomic.LoadInt64(&abm.systemMemoryPercent)) / 100.0 - latencyNs := atomic.LoadInt64(&abm.averageLatency) - latency := time.Duration(latencyNs) - - // Calculate congestion score based on multiple factors - congestionScore := 0.0 - - // CPU factor (weight: 0.4) - if cpuPercent > abm.config.HighCPUThreshold { - congestionScore += 0.4 * (cpuPercent - abm.config.HighCPUThreshold) / (100.0 - abm.config.HighCPUThreshold) - } - - // Memory factor (weight: 0.3) - if memoryPercent > abm.config.HighMemoryThreshold { - congestionScore += 0.3 * (memoryPercent - abm.config.HighMemoryThreshold) / (100.0 - abm.config.HighMemoryThreshold) - } - - // Latency factor (weight: 0.3) - latencyMs := float64(latency.Milliseconds()) - latencyThreshold := float64(abm.config.TargetLatency.Milliseconds()) - if latencyMs > latencyThreshold { - congestionScore += 0.3 * (latencyMs - latencyThreshold) / latencyThreshold - } - - // Determine congestion level using configured threshold multiplier - if congestionScore > Config.CongestionThresholdMultiplier { - return 3 // Severe congestion - } else if congestionScore > Config.CongestionThresholdMultiplier*0.625 { // 0.8 * 0.625 = 0.5 - return 2 // Moderate congestion - } else if congestionScore > Config.CongestionThresholdMultiplier*0.25 { // 0.8 * 0.25 = 0.2 - return 1 // Mild congestion - } - return 0 // No congestion -} - -// ActivateGracefulDegradation implements emergency measures when congestion is detected -func (abm *AdaptiveBufferManager) ActivateGracefulDegradation(level int) { - atomic.StoreInt64(&abm.congestionLevel, int64(level)) - atomic.StoreInt64(&abm.degradationActive, 1) - atomic.StoreInt64(&abm.lastCongestionTime, time.Now().UnixNano()) - - switch level { - case 1: // Mild congestion - // Reduce buffers by configured factor - currentInput := atomic.LoadInt64(&abm.currentInputBufferSize) - currentOutput := atomic.LoadInt64(&abm.currentOutputBufferSize) - newInput := int64(float64(currentInput) * Config.CongestionMildReductionFactor) - newOutput := int64(float64(currentOutput) * Config.CongestionMildReductionFactor) - - // Ensure minimum buffer size - if newInput < int64(abm.config.MinBufferSize) { - newInput = int64(abm.config.MinBufferSize) - } - if newOutput < int64(abm.config.MinBufferSize) { - newOutput = int64(abm.config.MinBufferSize) - } - - atomic.StoreInt64(&abm.currentInputBufferSize, newInput) - atomic.StoreInt64(&abm.currentOutputBufferSize, newOutput) - - abm.logger.Warn(). - Int("level", level). - Int64("input_buffer", newInput). - Int64("output_buffer", newOutput). - Msg("Activated mild graceful degradation") - - case 2: // Moderate congestion - // Reduce buffers by configured factor and trigger quality reduction - currentInput := atomic.LoadInt64(&abm.currentInputBufferSize) - currentOutput := atomic.LoadInt64(&abm.currentOutputBufferSize) - newInput := int64(float64(currentInput) * Config.CongestionModerateReductionFactor) - newOutput := int64(float64(currentOutput) * Config.CongestionModerateReductionFactor) - - // Ensure minimum buffer size - if newInput < int64(abm.config.MinBufferSize) { - newInput = int64(abm.config.MinBufferSize) - } - if newOutput < int64(abm.config.MinBufferSize) { - newOutput = int64(abm.config.MinBufferSize) - } - - atomic.StoreInt64(&abm.currentInputBufferSize, newInput) - atomic.StoreInt64(&abm.currentOutputBufferSize, newOutput) - - abm.logger.Warn(). - Int("level", level). - Int64("input_buffer", newInput). - Int64("output_buffer", newOutput). - Msg("Activated moderate graceful degradation") - - case 3: // Severe congestion - // Emergency: Set buffers to minimum and force lowest quality - minSize := int64(abm.config.MinBufferSize) - atomic.StoreInt64(&abm.currentInputBufferSize, minSize) - atomic.StoreInt64(&abm.currentOutputBufferSize, minSize) - - abm.logger.Warn(). - Int("level", level). - Int64("buffer_size", minSize). - Msg("Activated severe graceful degradation - emergency mode") - } -} - -// CheckRecoveryConditions determines if degradation can be deactivated -func (abm *AdaptiveBufferManager) CheckRecoveryConditions() bool { - if atomic.LoadInt64(&abm.degradationActive) == 0 { - return false // Not in degradation mode - } - - // Check if congestion has been resolved for the configured timeout - lastCongestion := time.Unix(0, atomic.LoadInt64(&abm.lastCongestionTime)) - if time.Since(lastCongestion) < Config.CongestionRecoveryTimeout { - return false - } - - // Check current system state - currentCongestion := abm.DetectCongestion() - if currentCongestion == 0 { - // Deactivate degradation - atomic.StoreInt64(&abm.degradationActive, 0) - atomic.StoreInt64(&abm.congestionLevel, 0) - - abm.logger.Info().Msg("Deactivated graceful degradation - system recovered") - return true - } - - return false -} - // adaptationLoop is the main loop that adjusts buffer sizes func (abm *AdaptiveBufferManager) adaptationLoop() { defer abm.wg.Done() @@ -372,16 +235,6 @@ func (abm *AdaptiveBufferManager) adaptationLoop() { // The algorithm runs periodically and only applies changes when the adaptation interval // has elapsed, preventing excessive adjustments that could destabilize the audio pipeline. func (abm *AdaptiveBufferManager) adaptBufferSizes() { - // Check for congestion and activate graceful degradation if needed - congestionLevel := abm.DetectCongestion() - if congestionLevel > 0 { - abm.ActivateGracefulDegradation(congestionLevel) - return // Skip normal adaptation during degradation - } - - // Check if we can recover from degradation - abm.CheckRecoveryConditions() - // Collect current system metrics metrics := abm.processMonitor.GetCurrentMetrics() if len(metrics) == 0 { @@ -588,9 +441,6 @@ func (abm *AdaptiveBufferManager) GetStats() map[string]interface{} { "system_memory_percent": float64(atomic.LoadInt64(&abm.systemMemoryPercent)) / Config.PercentageMultiplier, "adaptation_count": atomic.LoadInt64(&abm.adaptationCount), "last_adaptation": lastAdaptation, - "congestion_level": atomic.LoadInt64(&abm.congestionLevel), - "degradation_active": atomic.LoadInt64(&abm.degradationActive) == 1, - "last_congestion_time": time.Unix(0, atomic.LoadInt64(&abm.lastCongestionTime)), } } diff --git a/internal/audio/core_config_constants.go b/internal/audio/core_config_constants.go index 7a3a1227..5bdaefe7 100644 --- a/internal/audio/core_config_constants.go +++ b/internal/audio/core_config_constants.go @@ -329,12 +329,6 @@ type AudioConfigConstants struct { QualityChangeSettleDelay time.Duration // Delay for quality change to settle QualityChangeRecoveryDelay time.Duration // Delay before attempting recovery - // Graceful Degradation Configuration - CongestionMildReductionFactor float64 // Buffer reduction factor for mild congestion (0.75) - CongestionModerateReductionFactor float64 // Buffer reduction factor for moderate congestion (0.5) - CongestionThresholdMultiplier float64 // Multiplier for congestion threshold calculations (0.8) - CongestionRecoveryTimeout time.Duration // Timeout for congestion recovery (5 seconds) - // Buffer Pool Cache Configuration BufferPoolCacheSize int // Buffers per goroutine cache (4) BufferPoolCacheTTL time.Duration // Cache TTL for aggressive cleanup (5s) @@ -527,12 +521,6 @@ func DefaultAudioConfig() *AudioConfigConstants { QualityChangeSettleDelay: 2 * time.Second, // Delay for quality change to settle QualityChangeRecoveryDelay: 1 * time.Second, // Delay before attempting recovery - // Graceful Degradation Configuration - CongestionMildReductionFactor: 0.75, // Buffer reduction factor for mild congestion (0.75) - CongestionModerateReductionFactor: 0.5, // Buffer reduction factor for moderate congestion (0.5) - CongestionThresholdMultiplier: 36.0, // Multiplier for congestion threshold calculations (increased to reduce false emergency mode triggers) - CongestionRecoveryTimeout: 5 * time.Second, // Timeout for congestion recovery (5 seconds) - // Buffer Pool Cache Configuration BufferPoolCacheSize: 4, // Buffers per goroutine cache BufferPoolCacheTTL: 5 * time.Second, // Cache TTL for aggressive cleanup diff --git a/internal/audio/input_server_main.go b/internal/audio/input_server_main.go index dc8b77e3..46defce9 100644 --- a/internal/audio/input_server_main.go +++ b/internal/audio/input_server_main.go @@ -55,10 +55,6 @@ func RunAudioInputServer() error { // Initialize validation cache for optimal performance InitValidationCache() - // Start adaptive buffer management for optimal performance - StartAdaptiveBuffering() - defer StopAdaptiveBuffering() - // Initialize CGO audio playback (optional for input server) // This is used for audio loopback/monitoring features err := CGOAudioPlaybackInit() diff --git a/internal/audio/ipc_input.go b/internal/audio/ipc_input.go index 730d2478..07147af5 100644 --- a/internal/audio/ipc_input.go +++ b/internal/audio/ipc_input.go @@ -231,9 +231,8 @@ func NewAudioInputServer() (*AudioInputServer, error) { return nil, fmt.Errorf("failed to create unix socket after 3 attempts: %w", err) } - // Get initial buffer size from adaptive buffer manager - adaptiveManager := GetAdaptiveBufferManager() - initialBufferSize := int64(adaptiveManager.GetInputBufferSize()) + // Get initial buffer size from config + initialBufferSize := int64(Config.AdaptiveDefaultBufferSize) // Ensure minimum buffer size to prevent immediate overflow // Use at least 50 frames to handle burst traffic @@ -1221,8 +1220,7 @@ func (ais *AudioInputServer) startMonitorGoroutine() { // Check if we need to update buffer size select { case <-bufferUpdateTicker.C: - // Update buffer size from adaptive buffer manager - ais.UpdateBufferSize() + // Buffer size is now fixed from config default: // No buffer update needed } @@ -1251,71 +1249,16 @@ func (ais *AudioInputServer) GetServerStats() (total, dropped int64, avgProcessi atomic.LoadInt64(&ais.bufferSize) } -// UpdateBufferSize updates the buffer size from adaptive buffer manager +// UpdateBufferSize updates the buffer size (now using fixed config values) func (ais *AudioInputServer) UpdateBufferSize() { - adaptiveManager := GetAdaptiveBufferManager() - newSize := int64(adaptiveManager.GetInputBufferSize()) - oldSize := atomic.LoadInt64(&ais.bufferSize) - - // Only recreate channels if size changed significantly (>25% difference) - if oldSize > 0 { - diff := float64(newSize-oldSize) / float64(oldSize) - if diff < 0.25 && diff > -0.25 { - return // Size change not significant enough - } - } - + // Buffer size is now fixed from config + newSize := int64(Config.AdaptiveDefaultBufferSize) atomic.StoreInt64(&ais.bufferSize, newSize) - - // Recreate channels with new buffer size if server is running - if ais.running { - ais.recreateChannels(int(newSize)) - } } -// recreateChannels recreates the message channels with new buffer size -func (ais *AudioInputServer) recreateChannels(newSize int) { - ais.channelMutex.Lock() - defer ais.channelMutex.Unlock() - - // Create new channels with updated buffer size - newMessageChan := make(chan *InputIPCMessage, newSize) - newProcessChan := make(chan *InputIPCMessage, newSize) - - // Drain old channels and transfer messages to new channels - ais.drainAndTransferChannel(ais.messageChan, newMessageChan) - ais.drainAndTransferChannel(ais.processChan, newProcessChan) - - // Replace channels atomically - ais.messageChan = newMessageChan - ais.processChan = newProcessChan - ais.lastBufferSize = int64(newSize) -} - -// drainAndTransferChannel drains the old channel and transfers messages to new channel -func (ais *AudioInputServer) drainAndTransferChannel(oldChan, newChan chan *InputIPCMessage) { - for { - select { - case msg := <-oldChan: - // Try to transfer to new channel, drop if full - select { - case newChan <- msg: - // Successfully transferred - default: - // New channel full, drop message - atomic.AddInt64(&ais.droppedFrames, 1) - } - default: - // Old channel empty - return - } - } -} - -// ReportLatency reports processing latency to adaptive buffer manager +// ReportLatency reports processing latency (now a no-op with fixed buffers) func (ais *AudioInputServer) ReportLatency(latency time.Duration) { - adaptiveManager := GetAdaptiveBufferManager() - adaptiveManager.UpdateLatency(latency) + // Latency reporting is now a no-op with fixed buffer sizes } // GetMessagePoolStats returns detailed statistics about the message pool diff --git a/internal/audio/quality_presets.go b/internal/audio/quality_presets.go index 1888f872..bedfa104 100644 --- a/internal/audio/quality_presets.go +++ b/internal/audio/quality_presets.go @@ -208,15 +208,6 @@ func SetAudioQuality(quality AudioQuality) { logger := logging.GetDefaultLogger().With().Str("component", "audio").Logger() logger.Info().Int("quality", int(quality)).Msg("updating audio output quality settings dynamically") - // Immediately boost adaptive buffer sizes to handle quality change frame burst - // This prevents "Message channel full, dropping frame" warnings during transitions - adaptiveManager := GetAdaptiveBufferManager() - if adaptiveManager != nil { - // Immediately set buffers to maximum size for quality change - adaptiveManager.BoostBuffersForQualityChange() - logger.Debug().Msg("boosted adaptive buffers for quality change") - } - // Set new OPUS configuration for future restarts if supervisor := GetAudioOutputSupervisor(); supervisor != nil { supervisor.SetOpusConfig(config.Bitrate*1000, complexity, vbr, signalType, bandwidth, dtx) @@ -311,15 +302,6 @@ func SetMicrophoneQuality(quality AudioQuality) { logger := logging.GetDefaultLogger().With().Str("component", "audio").Logger() logger.Info().Int("quality", int(quality)).Msg("updating audio input subprocess quality settings dynamically") - // Immediately boost adaptive buffer sizes to handle quality change frame burst - // This prevents "Message channel full, dropping frame" warnings during transitions - adaptiveManager := GetAdaptiveBufferManager() - if adaptiveManager != nil { - // Immediately set buffers to maximum size for quality change - adaptiveManager.BoostBuffersForQualityChange() - logger.Debug().Msg("boosted adaptive buffers for quality change") - } - // Set new OPUS configuration for future restarts supervisor.SetOpusConfig(config.Bitrate*1000, complexity, vbr, signalType, bandwidth, dtx) diff --git a/main.go b/main.go index 1de6ac4c..1bc7b686 100644 --- a/main.go +++ b/main.go @@ -35,9 +35,6 @@ func startAudioSubprocess() error { // Initialize validation cache for optimal performance audio.InitValidationCache() - // Start adaptive buffer management for optimal performance - audio.StartAdaptiveBuffering() - // Start goroutine monitoring to detect and prevent leaks audio.StartGoroutineMonitoring() @@ -114,8 +111,7 @@ func startAudioSubprocess() error { // Stop audio relay when process exits audio.StopAudioRelay() - // Stop adaptive buffering - audio.StopAdaptiveBuffering() + // Stop goroutine monitoring audio.StopGoroutineMonitoring() // Disable batch audio processing