[WIP] Cleanup: PR SImplification

This commit is contained in:
Alex P 2025-09-09 05:41:20 +00:00
parent 845eadec18
commit 0ebfc762f7
6 changed files with 9 additions and 254 deletions

View File

@ -88,10 +88,6 @@ type AdaptiveBufferManager struct {
systemCPUPercent int64 // System CPU percentage * 100 (atomic) systemCPUPercent int64 // System CPU percentage * 100 (atomic)
systemMemoryPercent int64 // System memory percentage * 100 (atomic) systemMemoryPercent int64 // System memory percentage * 100 (atomic)
adaptationCount int64 // Metrics tracking (atomic) adaptationCount int64 // Metrics tracking (atomic)
// Graceful degradation fields
congestionLevel int64 // Current congestion level (0-3, atomic)
degradationActive int64 // Whether degradation is active (0/1, atomic)
lastCongestionTime int64 // Last congestion detection time (unix nano, atomic)
config AdaptiveBufferConfig config AdaptiveBufferConfig
logger zerolog.Logger logger zerolog.Logger
@ -194,139 +190,6 @@ func (abm *AdaptiveBufferManager) BoostBuffersForQualityChange() {
Msg("Boosted buffers to maximum size for quality change") Msg("Boosted buffers to maximum size for quality change")
} }
// DetectCongestion analyzes system state to detect audio channel congestion
// Returns congestion level: 0=none, 1=mild, 2=moderate, 3=severe
func (abm *AdaptiveBufferManager) DetectCongestion() int {
cpuPercent := float64(atomic.LoadInt64(&abm.systemCPUPercent)) / 100.0
memoryPercent := float64(atomic.LoadInt64(&abm.systemMemoryPercent)) / 100.0
latencyNs := atomic.LoadInt64(&abm.averageLatency)
latency := time.Duration(latencyNs)
// Calculate congestion score based on multiple factors
congestionScore := 0.0
// CPU factor (weight: 0.4)
if cpuPercent > abm.config.HighCPUThreshold {
congestionScore += 0.4 * (cpuPercent - abm.config.HighCPUThreshold) / (100.0 - abm.config.HighCPUThreshold)
}
// Memory factor (weight: 0.3)
if memoryPercent > abm.config.HighMemoryThreshold {
congestionScore += 0.3 * (memoryPercent - abm.config.HighMemoryThreshold) / (100.0 - abm.config.HighMemoryThreshold)
}
// Latency factor (weight: 0.3)
latencyMs := float64(latency.Milliseconds())
latencyThreshold := float64(abm.config.TargetLatency.Milliseconds())
if latencyMs > latencyThreshold {
congestionScore += 0.3 * (latencyMs - latencyThreshold) / latencyThreshold
}
// Determine congestion level using configured threshold multiplier
if congestionScore > Config.CongestionThresholdMultiplier {
return 3 // Severe congestion
} else if congestionScore > Config.CongestionThresholdMultiplier*0.625 { // 0.8 * 0.625 = 0.5
return 2 // Moderate congestion
} else if congestionScore > Config.CongestionThresholdMultiplier*0.25 { // 0.8 * 0.25 = 0.2
return 1 // Mild congestion
}
return 0 // No congestion
}
// ActivateGracefulDegradation implements emergency measures when congestion is detected
func (abm *AdaptiveBufferManager) ActivateGracefulDegradation(level int) {
atomic.StoreInt64(&abm.congestionLevel, int64(level))
atomic.StoreInt64(&abm.degradationActive, 1)
atomic.StoreInt64(&abm.lastCongestionTime, time.Now().UnixNano())
switch level {
case 1: // Mild congestion
// Reduce buffers by configured factor
currentInput := atomic.LoadInt64(&abm.currentInputBufferSize)
currentOutput := atomic.LoadInt64(&abm.currentOutputBufferSize)
newInput := int64(float64(currentInput) * Config.CongestionMildReductionFactor)
newOutput := int64(float64(currentOutput) * Config.CongestionMildReductionFactor)
// Ensure minimum buffer size
if newInput < int64(abm.config.MinBufferSize) {
newInput = int64(abm.config.MinBufferSize)
}
if newOutput < int64(abm.config.MinBufferSize) {
newOutput = int64(abm.config.MinBufferSize)
}
atomic.StoreInt64(&abm.currentInputBufferSize, newInput)
atomic.StoreInt64(&abm.currentOutputBufferSize, newOutput)
abm.logger.Warn().
Int("level", level).
Int64("input_buffer", newInput).
Int64("output_buffer", newOutput).
Msg("Activated mild graceful degradation")
case 2: // Moderate congestion
// Reduce buffers by configured factor and trigger quality reduction
currentInput := atomic.LoadInt64(&abm.currentInputBufferSize)
currentOutput := atomic.LoadInt64(&abm.currentOutputBufferSize)
newInput := int64(float64(currentInput) * Config.CongestionModerateReductionFactor)
newOutput := int64(float64(currentOutput) * Config.CongestionModerateReductionFactor)
// Ensure minimum buffer size
if newInput < int64(abm.config.MinBufferSize) {
newInput = int64(abm.config.MinBufferSize)
}
if newOutput < int64(abm.config.MinBufferSize) {
newOutput = int64(abm.config.MinBufferSize)
}
atomic.StoreInt64(&abm.currentInputBufferSize, newInput)
atomic.StoreInt64(&abm.currentOutputBufferSize, newOutput)
abm.logger.Warn().
Int("level", level).
Int64("input_buffer", newInput).
Int64("output_buffer", newOutput).
Msg("Activated moderate graceful degradation")
case 3: // Severe congestion
// Emergency: Set buffers to minimum and force lowest quality
minSize := int64(abm.config.MinBufferSize)
atomic.StoreInt64(&abm.currentInputBufferSize, minSize)
atomic.StoreInt64(&abm.currentOutputBufferSize, minSize)
abm.logger.Warn().
Int("level", level).
Int64("buffer_size", minSize).
Msg("Activated severe graceful degradation - emergency mode")
}
}
// CheckRecoveryConditions determines if degradation can be deactivated
func (abm *AdaptiveBufferManager) CheckRecoveryConditions() bool {
if atomic.LoadInt64(&abm.degradationActive) == 0 {
return false // Not in degradation mode
}
// Check if congestion has been resolved for the configured timeout
lastCongestion := time.Unix(0, atomic.LoadInt64(&abm.lastCongestionTime))
if time.Since(lastCongestion) < Config.CongestionRecoveryTimeout {
return false
}
// Check current system state
currentCongestion := abm.DetectCongestion()
if currentCongestion == 0 {
// Deactivate degradation
atomic.StoreInt64(&abm.degradationActive, 0)
atomic.StoreInt64(&abm.congestionLevel, 0)
abm.logger.Info().Msg("Deactivated graceful degradation - system recovered")
return true
}
return false
}
// adaptationLoop is the main loop that adjusts buffer sizes // adaptationLoop is the main loop that adjusts buffer sizes
func (abm *AdaptiveBufferManager) adaptationLoop() { func (abm *AdaptiveBufferManager) adaptationLoop() {
defer abm.wg.Done() defer abm.wg.Done()
@ -372,16 +235,6 @@ func (abm *AdaptiveBufferManager) adaptationLoop() {
// The algorithm runs periodically and only applies changes when the adaptation interval // The algorithm runs periodically and only applies changes when the adaptation interval
// has elapsed, preventing excessive adjustments that could destabilize the audio pipeline. // has elapsed, preventing excessive adjustments that could destabilize the audio pipeline.
func (abm *AdaptiveBufferManager) adaptBufferSizes() { func (abm *AdaptiveBufferManager) adaptBufferSizes() {
// Check for congestion and activate graceful degradation if needed
congestionLevel := abm.DetectCongestion()
if congestionLevel > 0 {
abm.ActivateGracefulDegradation(congestionLevel)
return // Skip normal adaptation during degradation
}
// Check if we can recover from degradation
abm.CheckRecoveryConditions()
// Collect current system metrics // Collect current system metrics
metrics := abm.processMonitor.GetCurrentMetrics() metrics := abm.processMonitor.GetCurrentMetrics()
if len(metrics) == 0 { if len(metrics) == 0 {
@ -588,9 +441,6 @@ func (abm *AdaptiveBufferManager) GetStats() map[string]interface{} {
"system_memory_percent": float64(atomic.LoadInt64(&abm.systemMemoryPercent)) / Config.PercentageMultiplier, "system_memory_percent": float64(atomic.LoadInt64(&abm.systemMemoryPercent)) / Config.PercentageMultiplier,
"adaptation_count": atomic.LoadInt64(&abm.adaptationCount), "adaptation_count": atomic.LoadInt64(&abm.adaptationCount),
"last_adaptation": lastAdaptation, "last_adaptation": lastAdaptation,
"congestion_level": atomic.LoadInt64(&abm.congestionLevel),
"degradation_active": atomic.LoadInt64(&abm.degradationActive) == 1,
"last_congestion_time": time.Unix(0, atomic.LoadInt64(&abm.lastCongestionTime)),
} }
} }

View File

@ -329,12 +329,6 @@ type AudioConfigConstants struct {
QualityChangeSettleDelay time.Duration // Delay for quality change to settle QualityChangeSettleDelay time.Duration // Delay for quality change to settle
QualityChangeRecoveryDelay time.Duration // Delay before attempting recovery QualityChangeRecoveryDelay time.Duration // Delay before attempting recovery
// Graceful Degradation Configuration
CongestionMildReductionFactor float64 // Buffer reduction factor for mild congestion (0.75)
CongestionModerateReductionFactor float64 // Buffer reduction factor for moderate congestion (0.5)
CongestionThresholdMultiplier float64 // Multiplier for congestion threshold calculations (0.8)
CongestionRecoveryTimeout time.Duration // Timeout for congestion recovery (5 seconds)
// Buffer Pool Cache Configuration // Buffer Pool Cache Configuration
BufferPoolCacheSize int // Buffers per goroutine cache (4) BufferPoolCacheSize int // Buffers per goroutine cache (4)
BufferPoolCacheTTL time.Duration // Cache TTL for aggressive cleanup (5s) BufferPoolCacheTTL time.Duration // Cache TTL for aggressive cleanup (5s)
@ -527,12 +521,6 @@ func DefaultAudioConfig() *AudioConfigConstants {
QualityChangeSettleDelay: 2 * time.Second, // Delay for quality change to settle QualityChangeSettleDelay: 2 * time.Second, // Delay for quality change to settle
QualityChangeRecoveryDelay: 1 * time.Second, // Delay before attempting recovery QualityChangeRecoveryDelay: 1 * time.Second, // Delay before attempting recovery
// Graceful Degradation Configuration
CongestionMildReductionFactor: 0.75, // Buffer reduction factor for mild congestion (0.75)
CongestionModerateReductionFactor: 0.5, // Buffer reduction factor for moderate congestion (0.5)
CongestionThresholdMultiplier: 36.0, // Multiplier for congestion threshold calculations (increased to reduce false emergency mode triggers)
CongestionRecoveryTimeout: 5 * time.Second, // Timeout for congestion recovery (5 seconds)
// Buffer Pool Cache Configuration // Buffer Pool Cache Configuration
BufferPoolCacheSize: 4, // Buffers per goroutine cache BufferPoolCacheSize: 4, // Buffers per goroutine cache
BufferPoolCacheTTL: 5 * time.Second, // Cache TTL for aggressive cleanup BufferPoolCacheTTL: 5 * time.Second, // Cache TTL for aggressive cleanup

View File

@ -55,10 +55,6 @@ func RunAudioInputServer() error {
// Initialize validation cache for optimal performance // Initialize validation cache for optimal performance
InitValidationCache() InitValidationCache()
// Start adaptive buffer management for optimal performance
StartAdaptiveBuffering()
defer StopAdaptiveBuffering()
// Initialize CGO audio playback (optional for input server) // Initialize CGO audio playback (optional for input server)
// This is used for audio loopback/monitoring features // This is used for audio loopback/monitoring features
err := CGOAudioPlaybackInit() err := CGOAudioPlaybackInit()

View File

@ -231,9 +231,8 @@ func NewAudioInputServer() (*AudioInputServer, error) {
return nil, fmt.Errorf("failed to create unix socket after 3 attempts: %w", err) return nil, fmt.Errorf("failed to create unix socket after 3 attempts: %w", err)
} }
// Get initial buffer size from adaptive buffer manager // Get initial buffer size from config
adaptiveManager := GetAdaptiveBufferManager() initialBufferSize := int64(Config.AdaptiveDefaultBufferSize)
initialBufferSize := int64(adaptiveManager.GetInputBufferSize())
// Ensure minimum buffer size to prevent immediate overflow // Ensure minimum buffer size to prevent immediate overflow
// Use at least 50 frames to handle burst traffic // Use at least 50 frames to handle burst traffic
@ -1221,8 +1220,7 @@ func (ais *AudioInputServer) startMonitorGoroutine() {
// Check if we need to update buffer size // Check if we need to update buffer size
select { select {
case <-bufferUpdateTicker.C: case <-bufferUpdateTicker.C:
// Update buffer size from adaptive buffer manager // Buffer size is now fixed from config
ais.UpdateBufferSize()
default: default:
// No buffer update needed // No buffer update needed
} }
@ -1251,71 +1249,16 @@ func (ais *AudioInputServer) GetServerStats() (total, dropped int64, avgProcessi
atomic.LoadInt64(&ais.bufferSize) atomic.LoadInt64(&ais.bufferSize)
} }
// UpdateBufferSize updates the buffer size from adaptive buffer manager // UpdateBufferSize updates the buffer size (now using fixed config values)
func (ais *AudioInputServer) UpdateBufferSize() { func (ais *AudioInputServer) UpdateBufferSize() {
adaptiveManager := GetAdaptiveBufferManager() // Buffer size is now fixed from config
newSize := int64(adaptiveManager.GetInputBufferSize()) newSize := int64(Config.AdaptiveDefaultBufferSize)
oldSize := atomic.LoadInt64(&ais.bufferSize)
// Only recreate channels if size changed significantly (>25% difference)
if oldSize > 0 {
diff := float64(newSize-oldSize) / float64(oldSize)
if diff < 0.25 && diff > -0.25 {
return // Size change not significant enough
}
}
atomic.StoreInt64(&ais.bufferSize, newSize) atomic.StoreInt64(&ais.bufferSize, newSize)
// Recreate channels with new buffer size if server is running
if ais.running {
ais.recreateChannels(int(newSize))
}
} }
// recreateChannels recreates the message channels with new buffer size // ReportLatency reports processing latency (now a no-op with fixed buffers)
func (ais *AudioInputServer) recreateChannels(newSize int) {
ais.channelMutex.Lock()
defer ais.channelMutex.Unlock()
// Create new channels with updated buffer size
newMessageChan := make(chan *InputIPCMessage, newSize)
newProcessChan := make(chan *InputIPCMessage, newSize)
// Drain old channels and transfer messages to new channels
ais.drainAndTransferChannel(ais.messageChan, newMessageChan)
ais.drainAndTransferChannel(ais.processChan, newProcessChan)
// Replace channels atomically
ais.messageChan = newMessageChan
ais.processChan = newProcessChan
ais.lastBufferSize = int64(newSize)
}
// drainAndTransferChannel drains the old channel and transfers messages to new channel
func (ais *AudioInputServer) drainAndTransferChannel(oldChan, newChan chan *InputIPCMessage) {
for {
select {
case msg := <-oldChan:
// Try to transfer to new channel, drop if full
select {
case newChan <- msg:
// Successfully transferred
default:
// New channel full, drop message
atomic.AddInt64(&ais.droppedFrames, 1)
}
default:
// Old channel empty
return
}
}
}
// ReportLatency reports processing latency to adaptive buffer manager
func (ais *AudioInputServer) ReportLatency(latency time.Duration) { func (ais *AudioInputServer) ReportLatency(latency time.Duration) {
adaptiveManager := GetAdaptiveBufferManager() // Latency reporting is now a no-op with fixed buffer sizes
adaptiveManager.UpdateLatency(latency)
} }
// GetMessagePoolStats returns detailed statistics about the message pool // GetMessagePoolStats returns detailed statistics about the message pool

View File

@ -208,15 +208,6 @@ func SetAudioQuality(quality AudioQuality) {
logger := logging.GetDefaultLogger().With().Str("component", "audio").Logger() logger := logging.GetDefaultLogger().With().Str("component", "audio").Logger()
logger.Info().Int("quality", int(quality)).Msg("updating audio output quality settings dynamically") logger.Info().Int("quality", int(quality)).Msg("updating audio output quality settings dynamically")
// Immediately boost adaptive buffer sizes to handle quality change frame burst
// This prevents "Message channel full, dropping frame" warnings during transitions
adaptiveManager := GetAdaptiveBufferManager()
if adaptiveManager != nil {
// Immediately set buffers to maximum size for quality change
adaptiveManager.BoostBuffersForQualityChange()
logger.Debug().Msg("boosted adaptive buffers for quality change")
}
// Set new OPUS configuration for future restarts // Set new OPUS configuration for future restarts
if supervisor := GetAudioOutputSupervisor(); supervisor != nil { if supervisor := GetAudioOutputSupervisor(); supervisor != nil {
supervisor.SetOpusConfig(config.Bitrate*1000, complexity, vbr, signalType, bandwidth, dtx) supervisor.SetOpusConfig(config.Bitrate*1000, complexity, vbr, signalType, bandwidth, dtx)
@ -311,15 +302,6 @@ func SetMicrophoneQuality(quality AudioQuality) {
logger := logging.GetDefaultLogger().With().Str("component", "audio").Logger() logger := logging.GetDefaultLogger().With().Str("component", "audio").Logger()
logger.Info().Int("quality", int(quality)).Msg("updating audio input subprocess quality settings dynamically") logger.Info().Int("quality", int(quality)).Msg("updating audio input subprocess quality settings dynamically")
// Immediately boost adaptive buffer sizes to handle quality change frame burst
// This prevents "Message channel full, dropping frame" warnings during transitions
adaptiveManager := GetAdaptiveBufferManager()
if adaptiveManager != nil {
// Immediately set buffers to maximum size for quality change
adaptiveManager.BoostBuffersForQualityChange()
logger.Debug().Msg("boosted adaptive buffers for quality change")
}
// Set new OPUS configuration for future restarts // Set new OPUS configuration for future restarts
supervisor.SetOpusConfig(config.Bitrate*1000, complexity, vbr, signalType, bandwidth, dtx) supervisor.SetOpusConfig(config.Bitrate*1000, complexity, vbr, signalType, bandwidth, dtx)

View File

@ -35,9 +35,6 @@ func startAudioSubprocess() error {
// Initialize validation cache for optimal performance // Initialize validation cache for optimal performance
audio.InitValidationCache() audio.InitValidationCache()
// Start adaptive buffer management for optimal performance
audio.StartAdaptiveBuffering()
// Start goroutine monitoring to detect and prevent leaks // Start goroutine monitoring to detect and prevent leaks
audio.StartGoroutineMonitoring() audio.StartGoroutineMonitoring()
@ -114,8 +111,7 @@ func startAudioSubprocess() error {
// Stop audio relay when process exits // Stop audio relay when process exits
audio.StopAudioRelay() audio.StopAudioRelay()
// Stop adaptive buffering
audio.StopAdaptiveBuffering()
// Stop goroutine monitoring // Stop goroutine monitoring
audio.StopGoroutineMonitoring() audio.StopGoroutineMonitoring()
// Disable batch audio processing // Disable batch audio processing