From 0893eb88acfabc98dad583a1b6f6e5d2c58677c6 Mon Sep 17 00:00:00 2001 From: Alex P Date: Mon, 8 Sep 2025 21:47:39 +0000 Subject: [PATCH] feat(audio): improve reliability with graceful degradation and async updates - Implement graceful degradation for congestion handling with configurable thresholds - Refactor audio relay track updates to be async to prevent deadlocks - Add timeout-based supervisor stop during quality changes - Optimize buffer pool configuration and cleanup strategies --- internal/audio/adaptive_buffer.go | 150 ++++++++++++++++++++++++ internal/audio/core_config_constants.go | 78 +++++++++++- internal/audio/quality_presets.go | 27 +++-- internal/audio/relay_api.go | 67 ++++++++--- internal/audio/util_buffer_pool.go | 90 ++++++-------- webrtc.go | 7 +- 6 files changed, 329 insertions(+), 90 deletions(-) diff --git a/internal/audio/adaptive_buffer.go b/internal/audio/adaptive_buffer.go index dc9f6f6a..d9030ca6 100644 --- a/internal/audio/adaptive_buffer.go +++ b/internal/audio/adaptive_buffer.go @@ -88,6 +88,10 @@ type AdaptiveBufferManager struct { systemCPUPercent int64 // System CPU percentage * 100 (atomic) systemMemoryPercent int64 // System memory percentage * 100 (atomic) adaptationCount int64 // Metrics tracking (atomic) + // Graceful degradation fields + congestionLevel int64 // Current congestion level (0-3, atomic) + degradationActive int64 // Whether degradation is active (0/1, atomic) + lastCongestionTime int64 // Last congestion detection time (unix nano, atomic) config AdaptiveBufferConfig logger zerolog.Logger @@ -190,6 +194,139 @@ func (abm *AdaptiveBufferManager) BoostBuffersForQualityChange() { Msg("Boosted buffers to maximum size for quality change") } +// DetectCongestion analyzes system state to detect audio channel congestion +// Returns congestion level: 0=none, 1=mild, 2=moderate, 3=severe +func (abm *AdaptiveBufferManager) DetectCongestion() int { + cpuPercent := float64(atomic.LoadInt64(&abm.systemCPUPercent)) / 100.0 + 
memoryPercent := float64(atomic.LoadInt64(&abm.systemMemoryPercent)) / 100.0 + latencyNs := atomic.LoadInt64(&abm.averageLatency) + latency := time.Duration(latencyNs) + + // Calculate congestion score based on multiple factors + congestionScore := 0.0 + + // CPU factor (weight: 0.4) + if cpuPercent > abm.config.HighCPUThreshold { + congestionScore += 0.4 * (cpuPercent - abm.config.HighCPUThreshold) / (100.0 - abm.config.HighCPUThreshold) + } + + // Memory factor (weight: 0.3) + if memoryPercent > abm.config.HighMemoryThreshold { + congestionScore += 0.3 * (memoryPercent - abm.config.HighMemoryThreshold) / (100.0 - abm.config.HighMemoryThreshold) + } + + // Latency factor (weight: 0.3) + latencyMs := float64(latency.Milliseconds()) + latencyThreshold := float64(abm.config.TargetLatency.Milliseconds()) + if latencyMs > latencyThreshold { + congestionScore += 0.3 * (latencyMs - latencyThreshold) / latencyThreshold + } + + // Determine congestion level using configured threshold multiplier + if congestionScore > Config.CongestionThresholdMultiplier { + return 3 // Severe congestion + } else if congestionScore > Config.CongestionThresholdMultiplier*0.625 { // 0.8 * 0.625 = 0.5 + return 2 // Moderate congestion + } else if congestionScore > Config.CongestionThresholdMultiplier*0.25 { // 0.8 * 0.25 = 0.2 + return 1 // Mild congestion + } + return 0 // No congestion +} + +// ActivateGracefulDegradation implements emergency measures when congestion is detected +func (abm *AdaptiveBufferManager) ActivateGracefulDegradation(level int) { + atomic.StoreInt64(&abm.congestionLevel, int64(level)) + atomic.StoreInt64(&abm.degradationActive, 1) + atomic.StoreInt64(&abm.lastCongestionTime, time.Now().UnixNano()) + + switch level { + case 1: // Mild congestion + // Reduce buffers by configured factor + currentInput := atomic.LoadInt64(&abm.currentInputBufferSize) + currentOutput := atomic.LoadInt64(&abm.currentOutputBufferSize) + newInput := int64(float64(currentInput) * 
Config.CongestionMildReductionFactor) + newOutput := int64(float64(currentOutput) * Config.CongestionMildReductionFactor) + + // Ensure minimum buffer size + if newInput < int64(abm.config.MinBufferSize) { + newInput = int64(abm.config.MinBufferSize) + } + if newOutput < int64(abm.config.MinBufferSize) { + newOutput = int64(abm.config.MinBufferSize) + } + + atomic.StoreInt64(&abm.currentInputBufferSize, newInput) + atomic.StoreInt64(&abm.currentOutputBufferSize, newOutput) + + abm.logger.Warn(). + Int("level", level). + Int64("input_buffer", newInput). + Int64("output_buffer", newOutput). + Msg("Activated mild graceful degradation") + + case 2: // Moderate congestion + // Reduce buffers by the configured moderate factor; audio quality is not changed here + currentInput := atomic.LoadInt64(&abm.currentInputBufferSize) + currentOutput := atomic.LoadInt64(&abm.currentOutputBufferSize) + newInput := int64(float64(currentInput) * Config.CongestionModerateReductionFactor) + newOutput := int64(float64(currentOutput) * Config.CongestionModerateReductionFactor) + + // Ensure minimum buffer size + if newInput < int64(abm.config.MinBufferSize) { + newInput = int64(abm.config.MinBufferSize) + } + if newOutput < int64(abm.config.MinBufferSize) { + newOutput = int64(abm.config.MinBufferSize) + } + + atomic.StoreInt64(&abm.currentInputBufferSize, newInput) + atomic.StoreInt64(&abm.currentOutputBufferSize, newOutput) + + abm.logger.Warn(). + Int("level", level). + Int64("input_buffer", newInput). + Int64("output_buffer", newOutput). + Msg("Activated moderate graceful degradation") + + case 3: // Severe congestion + // Emergency: set both buffers to the configured minimum size; audio quality is not changed here + minSize := int64(abm.config.MinBufferSize) + atomic.StoreInt64(&abm.currentInputBufferSize, minSize) + atomic.StoreInt64(&abm.currentOutputBufferSize, minSize) + + abm.logger.Error(). + Int("level", level). + Int64("buffer_size", minSize).
+ Msg("Activated severe graceful degradation - emergency mode") + } +} + +// CheckRecoveryConditions determines if degradation can be deactivated +func (abm *AdaptiveBufferManager) CheckRecoveryConditions() bool { + if atomic.LoadInt64(&abm.degradationActive) == 0 { + return false // Not in degradation mode + } + + // Check if congestion has been resolved for the configured timeout + lastCongestion := time.Unix(0, atomic.LoadInt64(&abm.lastCongestionTime)) + if time.Since(lastCongestion) < Config.CongestionRecoveryTimeout { + return false + } + + // Check current system state + currentCongestion := abm.DetectCongestion() + if currentCongestion == 0 { + // Deactivate degradation + atomic.StoreInt64(&abm.degradationActive, 0) + atomic.StoreInt64(&abm.congestionLevel, 0) + + abm.logger.Info().Msg("Deactivated graceful degradation - system recovered") + return true + } + + return false +} + // adaptationLoop is the main loop that adjusts buffer sizes func (abm *AdaptiveBufferManager) adaptationLoop() { defer abm.wg.Done() @@ -235,6 +372,16 @@ func (abm *AdaptiveBufferManager) adaptationLoop() { // The algorithm runs periodically and only applies changes when the adaptation interval // has elapsed, preventing excessive adjustments that could destabilize the audio pipeline. 
func (abm *AdaptiveBufferManager) adaptBufferSizes() { + // Check for congestion and activate graceful degradation if needed + congestionLevel := abm.DetectCongestion() + if congestionLevel > 0 { + abm.ActivateGracefulDegradation(congestionLevel) + return // Skip normal adaptation during degradation + } + + // Check if we can recover from degradation + abm.CheckRecoveryConditions() + // Collect current system metrics metrics := abm.processMonitor.GetCurrentMetrics() if len(metrics) == 0 { @@ -441,6 +588,9 @@ func (abm *AdaptiveBufferManager) GetStats() map[string]interface{} { "system_memory_percent": float64(atomic.LoadInt64(&abm.systemMemoryPercent)) / Config.PercentageMultiplier, "adaptation_count": atomic.LoadInt64(&abm.adaptationCount), "last_adaptation": lastAdaptation, + "congestion_level": atomic.LoadInt64(&abm.congestionLevel), + "degradation_active": atomic.LoadInt64(&abm.degradationActive) == 1, + "last_congestion_time": time.Unix(0, atomic.LoadInt64(&abm.lastCongestionTime)), } } diff --git a/internal/audio/core_config_constants.go b/internal/audio/core_config_constants.go index 4357e7f5..c16ad829 100644 --- a/internal/audio/core_config_constants.go +++ b/internal/audio/core_config_constants.go @@ -322,6 +322,39 @@ type AudioConfigConstants struct { ConnectionTimeoutDelay time.Duration // Connection timeout for each attempt ReconnectionInterval time.Duration // Interval for automatic reconnection attempts HealthCheckInterval time.Duration // Health check interval for connections + + // Quality Change Timeout Configuration + QualityChangeSupervisorTimeout time.Duration // Timeout for supervisor stop during quality changes + QualityChangeTickerInterval time.Duration // Ticker interval for supervisor stop polling + QualityChangeSettleDelay time.Duration // Delay for quality change to settle + QualityChangeRecoveryDelay time.Duration // Delay before attempting recovery + + // Graceful Degradation Configuration + CongestionMildReductionFactor float64 // 
Buffer reduction factor for mild congestion (0.75) + CongestionModerateReductionFactor float64 // Buffer reduction factor for moderate congestion (0.5) + CongestionThresholdMultiplier float64 // Multiplier for congestion threshold calculations (0.8) + CongestionRecoveryTimeout time.Duration // Timeout for congestion recovery (5 seconds) + + // Buffer Pool Cache Configuration + BufferPoolCacheSize int // Buffers per goroutine cache (4) + BufferPoolCacheTTL time.Duration // Cache TTL for aggressive cleanup (5s) + BufferPoolMaxCacheEntries int // Maximum cache entries to prevent memory bloat (128) + BufferPoolCacheCleanupInterval time.Duration // Cleanup interval for frequent cleanup (15s) + BufferPoolCacheWarmupThreshold int // Warmup threshold for faster startup (25) + BufferPoolCacheHitRateTarget float64 // Target hit rate for balanced performance (0.80) + BufferPoolMaxCacheSize int // Maximum goroutine caches (256) + BufferPoolCleanupInterval int64 // Cleanup interval in seconds (15) + BufferPoolBufferTTL int64 // Buffer TTL in seconds (30) + BufferPoolControlSize int // Control pool buffer size (512) + BufferPoolMinPreallocBuffers int // Minimum preallocation buffers + BufferPoolMaxPoolSize int // Maximum pool size + BufferPoolChunkBufferCount int // Buffers per chunk + BufferPoolMinChunkSize int // Minimum chunk size (8KB) + BufferPoolInitialChunkCapacity int // Initial chunk capacity + BufferPoolAdaptiveResizeThreshold int // Threshold for adaptive resize + BufferPoolHighHitRateThreshold float64 // High hit rate threshold + BufferPoolOptimizeCacheThreshold int // Threshold for cache optimization + BufferPoolCounterResetThreshold int // Counter reset threshold } // DefaultAudioConfig returns the default configuration constants @@ -446,10 +479,10 @@ func DefaultAudioConfig() *AudioConfigConstants { MaxDecodeWriteBuffer: 4096, // Maximum CGO decode/write buffer // IPC Configuration - Balanced for stability - MagicNumber: 0xDEADBEEF, // IPC message validation
header - MaxFrameSize: 4096, // Maximum audio frame size (4KB) + MagicNumber: 0xDEADBEEF, // IPC message validation header + MaxFrameSize: 4096, // Maximum audio frame size (4KB) WriteTimeout: 1000 * time.Millisecond, // Further increased timeout to handle quality change bursts - HeaderSize: 8, // IPC message header size + HeaderSize: 8, // IPC message header size // Monitoring and Metrics - Balanced for stability MetricsUpdateInterval: 1000 * time.Millisecond, // Stable metrics collection frequency @@ -488,6 +521,39 @@ func DefaultAudioConfig() *AudioConfigConstants { ReconnectionInterval: 30 * time.Second, // Interval for automatic reconnection attempts HealthCheckInterval: 10 * time.Second, // Health check interval for connections + // Quality Change Timeout Configuration + QualityChangeSupervisorTimeout: 5 * time.Second, // Timeout for supervisor stop during quality changes + QualityChangeTickerInterval: 100 * time.Millisecond, // Ticker interval for supervisor stop polling + QualityChangeSettleDelay: 2 * time.Second, // Delay for quality change to settle + QualityChangeRecoveryDelay: 1 * time.Second, // Delay before attempting recovery + + // Graceful Degradation Configuration + CongestionMildReductionFactor: 0.75, // Buffer reduction factor for mild congestion (0.75) + CongestionModerateReductionFactor: 0.5, // Buffer reduction factor for moderate congestion (0.5) + CongestionThresholdMultiplier: 0.8, // Multiplier for congestion threshold calculations (0.8) + CongestionRecoveryTimeout: 5 * time.Second, // Timeout for congestion recovery (5 seconds) + + // Buffer Pool Cache Configuration + BufferPoolCacheSize: 4, // Buffers per goroutine cache + BufferPoolCacheTTL: 5 * time.Second, // Cache TTL for aggressive cleanup + BufferPoolMaxCacheEntries: 128, // Maximum cache entries to prevent memory bloat + BufferPoolCacheCleanupInterval: 15 * time.Second, // Cleanup interval for frequent cleanup + BufferPoolCacheWarmupThreshold: 25, // Warmup threshold for faster 
startup + BufferPoolCacheHitRateTarget: 0.80, // Target hit rate for balanced performance + BufferPoolMaxCacheSize: 256, // Maximum goroutine caches + BufferPoolCleanupInterval: 15, // Cleanup interval in seconds + BufferPoolBufferTTL: 30, // Buffer TTL in seconds + BufferPoolControlSize: 512, // Control pool buffer size + BufferPoolMinPreallocBuffers: 16, // Minimum preallocation buffers (reduced from 50) + BufferPoolMaxPoolSize: 128, // Maximum pool size (reduced from 256) + BufferPoolChunkBufferCount: 8, // Buffers per chunk (reduced from 64 to prevent large allocations) + BufferPoolMinChunkSize: 8192, // Minimum chunk size (8KB, reduced from 64KB) + BufferPoolInitialChunkCapacity: 4, // Initial chunk capacity + BufferPoolAdaptiveResizeThreshold: 100, // Threshold for adaptive resize + BufferPoolHighHitRateThreshold: 0.95, // High hit rate threshold + BufferPoolOptimizeCacheThreshold: 100, // Threshold for cache optimization + BufferPoolCounterResetThreshold: 10000, // Counter reset threshold + // Timing Constants - Optimized for quality change stability DefaultSleepDuration: 100 * time.Millisecond, // Balanced polling interval ShortSleepDuration: 10 * time.Millisecond, // Balanced high-frequency polling @@ -509,9 +575,9 @@ func DefaultAudioConfig() *AudioConfigConstants { AdaptiveBufferTargetLatency: 10 * time.Millisecond, // Aggressive target latency for responsiveness // Adaptive Buffer Size Configuration - Optimized for quality change bursts - AdaptiveMinBufferSize: 128, // Significantly increased minimum to handle bursts - AdaptiveMaxBufferSize: 512, // Much higher maximum for quality changes - AdaptiveDefaultBufferSize: 256, // Higher default for stability + AdaptiveMinBufferSize: 256, // Further increased minimum to prevent emergency mode + AdaptiveMaxBufferSize: 1024, // Much higher maximum for quality changes + AdaptiveDefaultBufferSize: 512, // Higher default for stability during bursts // Adaptive Optimizer Configuration - Faster response 
CooldownPeriod: 15 * time.Second, // Reduced cooldown period diff --git a/internal/audio/quality_presets.go b/internal/audio/quality_presets.go index 8117aa1f..fc4512b2 100644 --- a/internal/audio/quality_presets.go +++ b/internal/audio/quality_presets.go @@ -224,18 +224,25 @@ func SetAudioQuality(quality AudioQuality) { // Stop current subprocess supervisor.Stop() - // Wait for supervisor to fully stop before starting again + // Wait for supervisor to fully stop before starting again with timeout // This prevents race conditions and audio breakage - for i := 0; i < 50; i++ { // Wait up to 5 seconds - if !supervisor.IsRunning() { - break + stopTimeout := time.After(Config.QualityChangeSupervisorTimeout) + ticker := time.NewTicker(Config.QualityChangeTickerInterval) + defer ticker.Stop() + + for { + select { + case <-stopTimeout: + logger.Warn().Msg("supervisor did not stop within 5s timeout, proceeding anyway") + goto startSupervisor + case <-ticker.C: + if !supervisor.IsRunning() { + goto startSupervisor + } } - time.Sleep(100 * time.Millisecond) } - if supervisor.IsRunning() { - logger.Warn().Msg("supervisor did not stop within timeout, proceeding anyway") - } + startSupervisor: // Start subprocess with new configuration if err := supervisor.Start(); err != nil { @@ -246,7 +253,7 @@ func SetAudioQuality(quality AudioQuality) { // Reset audio input server stats after quality change // Allow adaptive buffer manager to naturally adjust buffer sizes go func() { - time.Sleep(2 * time.Second) // Wait for quality change to settle + time.Sleep(Config.QualityChangeSettleDelay) // Wait for quality change to settle // Reset audio input server stats to clear persistent warnings ResetGlobalAudioInputServerStats() // Attempt recovery if microphone is still having issues @@ -365,7 +372,7 @@ func SetMicrophoneQuality(quality AudioQuality) { // Reset audio input server stats after config update // Allow adaptive buffer manager to naturally adjust buffer sizes go func() { - 
time.Sleep(2 * time.Second) // Wait for quality change to settle + time.Sleep(Config.QualityChangeSettleDelay) // Wait for quality change to settle // Reset audio input server stats to clear persistent warnings ResetGlobalAudioInputServerStats() // Attempt recovery if microphone is still having issues diff --git a/internal/audio/relay_api.go b/internal/audio/relay_api.go index 8a7741c9..7f2f885d 100644 --- a/internal/audio/relay_api.go +++ b/internal/audio/relay_api.go @@ -2,7 +2,9 @@ package audio import ( "errors" + "fmt" "sync" + "time" ) // Global relay instance for the main process @@ -89,41 +91,57 @@ func IsAudioRelayRunning() bool { } // UpdateAudioRelayTrack updates the WebRTC audio track for the relay +// This function is refactored to prevent mutex deadlocks during quality changes func UpdateAudioRelayTrack(audioTrack AudioTrackWriter) error { - relayMutex.Lock() - defer relayMutex.Unlock() + var needsCallback bool + var callbackFunc TrackReplacementCallback + // Critical section: minimize time holding the mutex + relayMutex.Lock() if globalRelay == nil { // No relay running, start one with the provided track relay := NewAudioRelay() config := GetAudioConfig() if err := relay.Start(audioTrack, config); err != nil { + relayMutex.Unlock() return err } globalRelay = relay + } else { + // Update the track in the existing relay + globalRelay.UpdateTrack(audioTrack) + } - // Replace the track in the WebRTC session if callback is available - if trackReplacementCallback != nil { - if err := trackReplacementCallback(audioTrack); err != nil { - // Log error but don't fail the relay start + // Capture callback state while holding mutex + needsCallback = trackReplacementCallback != nil + if needsCallback { + callbackFunc = trackReplacementCallback + } + relayMutex.Unlock() + + // Execute callback outside of mutex to prevent deadlock + if needsCallback && callbackFunc != nil { + // Use goroutine with timeout to prevent blocking + done := make(chan error, 1) + go 
func() { + done <- callbackFunc(audioTrack) + }() + + // Wait for callback with timeout + select { + case err := <-done: + if err != nil { + // Log error but don't fail the relay operation // The relay can still work even if WebRTC track replacement fails _ = err // Suppress linter warning } - } - return nil - } - - // Update the track in the existing relay - globalRelay.UpdateTrack(audioTrack) - - // Replace the track in the WebRTC session if callback is available - if trackReplacementCallback != nil { - if err := trackReplacementCallback(audioTrack); err != nil { - // Log error but don't fail the track update - // The relay can still work even if WebRTC track replacement fails - _ = err // Suppress linter warning + case <-time.After(5 * time.Second): + // Timeout: log warning but continue + // This prevents indefinite blocking during quality changes + _ = fmt.Errorf("track replacement callback timed out") } } + return nil } @@ -149,6 +167,17 @@ func SetTrackReplacementCallback(callback TrackReplacementCallback) { trackReplacementCallback = callback } +// UpdateAudioRelayTrackAsync performs async track update to prevent blocking +// This is used during WebRTC session creation to avoid deadlocks +func UpdateAudioRelayTrackAsync(audioTrack AudioTrackWriter) { + go func() { + if err := UpdateAudioRelayTrack(audioTrack); err != nil { + // Log error but don't block session creation + _ = err // Suppress linter warning + } + }() +} + // connectRelayToCurrentSession connects the audio relay to the current WebRTC session's audio track // This is used when restarting the relay during unmute operations func connectRelayToCurrentSession() error { diff --git a/internal/audio/util_buffer_pool.go b/internal/audio/util_buffer_pool.go index 86d9d40b..5f452942 100644 --- a/internal/audio/util_buffer_pool.go +++ b/internal/audio/util_buffer_pool.go @@ -99,16 +99,8 @@ type lockFreeBufferCache struct { buffers [8]*[]byte // Increased from 4 to 8 buffers per goroutine cache for better 
hit rates } -const ( - // Enhanced cache configuration for per-goroutine optimization - cacheSize = 8 // Increased from 4 to 8 buffers per goroutine cache for better hit rates - cacheTTL = 10 * time.Second // Increased from 5s to 10s for better cache retention - // Additional cache constants for enhanced performance - maxCacheEntries = 256 // Maximum number of goroutine cache entries to prevent memory bloat - cacheCleanupInterval = 30 * time.Second // How often to clean up stale cache entries - cacheWarmupThreshold = 50 // Number of requests before enabling cache warmup - cacheHitRateTarget = 0.85 // Target cache hit rate for optimization -) +// Buffer pool constants are now configured via Config +// See core_config_constants.go for default values // TTL tracking for goroutine cache entries type cacheEntry struct { @@ -120,10 +112,8 @@ type cacheEntry struct { // Per-goroutine buffer cache using goroutine-local storage var goroutineBufferCache = make(map[int64]*lockFreeBufferCache) var goroutineCacheMutex sync.RWMutex -var lastCleanupTime int64 // Unix timestamp of last cleanup -const maxCacheSize = 500 // Maximum number of goroutine caches (reduced from 1000) -const cleanupInterval int64 = 30 // Cleanup interval in seconds (30 seconds, reduced from 60) -const bufferTTL int64 = 60 // Time-to-live for cached buffers in seconds (1 minute, reduced from 2) +var goroutineCacheWithTTL = make(map[int64]*cacheEntry) +var lastCleanupTime int64 // Unix timestamp of last cleanup // getGoroutineID extracts goroutine ID from runtime stack for cache key func getGoroutineID() int64 { @@ -144,8 +134,7 @@ func getGoroutineID() int64 { return 0 } -// Map of goroutine ID to cache entry with TTL tracking -var goroutineCacheWithTTL = make(map[int64]*cacheEntry) +// Map of goroutine ID to cache entry with TTL tracking (declared above) // cleanupChannel is used for asynchronous cleanup requests var cleanupChannel = make(chan struct{}, 1) @@ -199,9 +188,9 @@ func performCleanup(forced 
bool) { } // Only cleanup if enough time has passed (less time if high latency) or if forced - interval := cleanupInterval + interval := Config.BufferPoolCleanupInterval if isHighLatency { - interval = cleanupInterval / 2 // More frequent cleanup under high latency + interval = Config.BufferPoolCleanupInterval / 2 // More frequent cleanup under high latency } if !forced && now-lastCleanup < interval { @@ -255,10 +244,10 @@ func doCleanupGoroutineCache() { // Enhanced cleanup with size limits and better TTL management entriesToRemove := make([]int64, 0) - ttl := bufferTTL + ttl := Config.BufferPoolBufferTTL if isHighLatency { // Under high latency, use a much shorter TTL - ttl = bufferTTL / 4 + ttl = Config.BufferPoolBufferTTL / 4 } // Remove entries older than enhanced TTL @@ -270,7 +259,7 @@ func doCleanupGoroutineCache() { } // If we have too many cache entries, remove the oldest ones - if len(goroutineCacheWithTTL) > maxCacheEntries { + if len(goroutineCacheWithTTL) > Config.BufferPoolMaxCacheEntries { // Sort by last access time and remove oldest entries type cacheEntryWithGID struct { gid int64 @@ -285,7 +274,7 @@ func doCleanupGoroutineCache() { return entries[i].lastAccess < entries[j].lastAccess }) // Mark oldest entries for removal - excessCount := len(goroutineCacheWithTTL) - maxCacheEntries + excessCount := len(goroutineCacheWithTTL) - Config.BufferPoolMaxCacheEntries for i := 0; i < excessCount && i < len(entries); i++ { entriesToRemove = append(entriesToRemove, entries[i].gid) } @@ -293,13 +282,13 @@ func doCleanupGoroutineCache() { // If cache is still too large after TTL cleanup, remove oldest entries // Under high latency, use a more aggressive target size - targetSize := maxCacheSize - targetReduction := maxCacheSize / 2 + targetSize := Config.BufferPoolMaxCacheSize + targetReduction := Config.BufferPoolMaxCacheSize / 2 if isHighLatency { // Under high latency, target a much smaller cache size - targetSize = maxCacheSize / 4 - targetReduction = 
maxCacheSize / 8 + targetSize = Config.BufferPoolMaxCacheSize / 4 + targetReduction = Config.BufferPoolMaxCacheSize / 8 } if len(goroutineCacheWithTTL) > targetSize { @@ -372,33 +361,32 @@ func NewAudioBufferPool(bufferSize int) *AudioBufferPool { // Enhanced preallocation strategy based on buffer size and system capacity var preallocSize int if bufferSize <= Config.AudioFramePoolSize { - // For smaller pools, use enhanced preallocation (40% instead of 20%) + // For smaller pools, use enhanced preallocation preallocSize = Config.PreallocPercentage * 2 } else { - // For larger pools, use standard enhanced preallocation (30% instead of 10%) + // For larger pools, use standard enhanced preallocation preallocSize = (Config.PreallocPercentage * 3) / 2 } // Ensure minimum preallocation for better performance - minPrealloc := 50 // Minimum 50 buffers for startup performance - if preallocSize < minPrealloc { - preallocSize = minPrealloc + if preallocSize < Config.BufferPoolMinPreallocBuffers { + preallocSize = Config.BufferPoolMinPreallocBuffers } // Calculate max pool size based on buffer size to prevent memory bloat - maxPoolSize := 256 // Default + maxPoolSize := Config.BufferPoolMaxPoolSize // Default if bufferSize > 8192 { - maxPoolSize = 64 // Much smaller for very large buffers + maxPoolSize = Config.BufferPoolMaxPoolSize / 4 // Much smaller for very large buffers } else if bufferSize > 4096 { - maxPoolSize = 128 // Smaller for large buffers + maxPoolSize = Config.BufferPoolMaxPoolSize / 2 // Smaller for large buffers } else if bufferSize > 1024 { - maxPoolSize = 192 // Medium for medium buffers + maxPoolSize = (Config.BufferPoolMaxPoolSize * 3) / 4 // Medium for medium buffers } // Calculate chunk size - allocate larger chunks to reduce allocation frequency - chunkSize := bufferSize * 64 // Each chunk holds 64 buffers worth of memory - if chunkSize < 64*1024 { - chunkSize = 64 * 1024 // Minimum 64KB chunks + chunkSize := bufferSize * 
Config.BufferPoolChunkBufferCount // Each chunk holds multiple buffers worth of memory + if chunkSize < Config.BufferPoolMinChunkSize { + chunkSize = Config.BufferPoolMinChunkSize // Minimum chunk size } p := &AudioBufferPool{ @@ -407,8 +395,8 @@ func NewAudioBufferPool(bufferSize int) *AudioBufferPool { preallocated: make([]*[]byte, 0, preallocSize), preallocSize: preallocSize, chunkSize: chunkSize, - chunks: make([][]byte, 0, 4), // Start with capacity for 4 chunks - chunkOffsets: make([]int, 0, 4), + chunks: make([][]byte, 0, Config.BufferPoolInitialChunkCapacity), // Start with capacity for initial chunks + chunkOffsets: make([]int, 0, Config.BufferPoolInitialChunkCapacity), } // Configure sync.Pool with optimized allocation @@ -596,7 +584,7 @@ var ( // Main audio frame pool with enhanced capacity audioFramePool = NewAudioBufferPool(Config.AudioFramePoolSize) // Control message pool with enhanced capacity for better throughput - audioControlPool = NewAudioBufferPool(512) // Increased from Config.OutputHeaderSize to 512 for better control message handling + audioControlPool = NewAudioBufferPool(Config.BufferPoolControlSize) // Control message buffer size ) func GetAudioFrameBuffer() []byte { @@ -703,15 +691,15 @@ func (p *AudioBufferPool) AdaptiveResize() { missCount := atomic.LoadInt64(&p.missCount) totalRequests := hitCount + missCount - if totalRequests < 100 { + if totalRequests < int64(Config.BufferPoolAdaptiveResizeThreshold) { return // Not enough data for meaningful adaptation } hitRate := float64(hitCount) / float64(totalRequests) currentSize := atomic.LoadInt64(&p.currentSize) - // If hit rate is low (< 80%), consider increasing pool size - if hitRate < 0.8 && currentSize < int64(p.maxPoolSize) { + // If hit rate is low, consider increasing pool size + if hitRate < Config.BufferPoolCacheHitRateTarget && currentSize < int64(p.maxPoolSize) { // Increase preallocation by 25% up to max pool size newPreallocSize := int(float64(len(p.preallocated)) * 1.25) 
if newPreallocSize > p.maxPoolSize { @@ -725,8 +713,8 @@ func (p *AudioBufferPool) AdaptiveResize() { } } - // If hit rate is very high (> 95%) and pool is large, consider shrinking - if hitRate > 0.95 && len(p.preallocated) > p.preallocSize { + // If hit rate is very high and pool is large, consider shrinking + if hitRate > Config.BufferPoolHighHitRateThreshold && len(p.preallocated) > p.preallocSize { // Reduce preallocation by 10% but not below original size newSize := int(float64(len(p.preallocated)) * 0.9) if newSize < p.preallocSize { @@ -747,7 +735,7 @@ func (p *AudioBufferPool) WarmupCache() { missCount := atomic.LoadInt64(&p.missCount) totalRequests := hitCount + missCount - if totalRequests < int64(cacheWarmupThreshold) { + if totalRequests < int64(Config.BufferPoolCacheWarmupThreshold) { return } @@ -776,7 +764,7 @@ func (p *AudioBufferPool) WarmupCache() { if cache != nil { // Fill cache to optimal level based on hit rate hitRate := float64(hitCount) / float64(totalRequests) - optimalCacheSize := int(float64(cacheSize) * hitRate) + optimalCacheSize := int(float64(Config.BufferPoolCacheSize) * hitRate) if optimalCacheSize < 2 { optimalCacheSize = 2 } @@ -800,19 +788,19 @@ func (p *AudioBufferPool) OptimizeCache() { missCount := atomic.LoadInt64(&p.missCount) totalRequests := hitCount + missCount - if totalRequests < 100 { + if totalRequests < int64(Config.BufferPoolOptimizeCacheThreshold) { return } hitRate := float64(hitCount) / float64(totalRequests) // If hit rate is below target, trigger cache warmup - if hitRate < cacheHitRateTarget { + if hitRate < Config.BufferPoolCacheHitRateTarget { p.WarmupCache() } // Reset counters periodically to avoid overflow and get fresh metrics - if totalRequests > 10000 { + if totalRequests > int64(Config.BufferPoolCounterResetThreshold) { atomic.StoreInt64(&p.hitCount, hitCount/2) atomic.StoreInt64(&p.missCount, missCount/2) } diff --git a/webrtc.go b/webrtc.go index e67dce9c..19ee9f9e 100644 --- a/webrtc.go +++ 
b/webrtc.go @@ -245,10 +245,9 @@ func newSession(config SessionConfig) (*Session, error) { return nil, err } - // Update the audio relay with the new WebRTC audio track - if err := audio.UpdateAudioRelayTrack(session.AudioTrack); err != nil { - scopedLogger.Warn().Err(err).Msg("Failed to update audio relay track") - } + // Update the audio relay with the new WebRTC audio track asynchronously + // This prevents blocking during session creation and avoids mutex deadlocks + audio.UpdateAudioRelayTrackAsync(session.AudioTrack) videoRtpSender, err := peerConnection.AddTrack(session.VideoTrack) if err != nil {