mirror of https://github.com/jetkvm/kvm.git
feat(audio): improve reliability with graceful degradation and async updates
- Implement graceful degradation for congestion handling with configurable thresholds
- Refactor audio relay track updates to be async to prevent deadlocks
- Add timeout-based supervisor stop during quality changes
- Optimize buffer pool configuration and cleanup strategies

(A minimal sketch of the degradation lifecycle these changes introduce follows the commit metadata below.)
This commit is contained in:
parent
8cf0b639af
commit
0893eb88ac
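The four bullets above share one lifecycle: detect congestion, degrade buffers, then recover once the system settles. Here is a minimal, runnable Go sketch of that loop; toyManager is a hypothetical stand-in, while the real methods (DetectCongestion, ActivateGracefulDegradation, CheckRecoveryConditions on AdaptiveBufferManager) appear in the first diff below.

package main

import (
	"fmt"
	"time"
)

// toyManager stands in for the commit's AdaptiveBufferManager.
type toyManager struct{ degraded bool }

func (m *toyManager) DetectCongestion() int { return 0 } // pretend the system is healthy
func (m *toyManager) ActivateGracefulDegradation(l int) {
	m.degraded = true
	fmt.Println("degrading, level", l)
}
func (m *toyManager) CheckRecoveryConditions() { m.degraded = false }
func (m *toyManager) adaptBufferSizes()        { fmt.Println("normal adaptation") }

func main() {
	m := &toyManager{}
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for i := 0; i < 3; i++ {
		<-ticker.C
		// Each tick: degrade under congestion, otherwise try to recover and adapt.
		if level := m.DetectCongestion(); level > 0 {
			m.ActivateGracefulDegradation(level)
			continue // skip normal adaptation while degraded
		}
		m.CheckRecoveryConditions()
		m.adaptBufferSizes()
	}
}

In the commit itself this sequence lives inside adaptBufferSizes, so the existing adaptation loop gets the same behavior without a separate dispatcher.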
@@ -88,6 +88,10 @@ type AdaptiveBufferManager struct {
 	systemCPUPercent    int64 // System CPU percentage * 100 (atomic)
 	systemMemoryPercent int64 // System memory percentage * 100 (atomic)
 	adaptationCount     int64 // Metrics tracking (atomic)
+	// Graceful degradation fields
+	congestionLevel    int64 // Current congestion level (0-3, atomic)
+	degradationActive  int64 // Whether degradation is active (0/1, atomic)
+	lastCongestionTime int64 // Last congestion detection time (unix nano, atomic)
 
 	config AdaptiveBufferConfig
 	logger zerolog.Logger
@@ -190,6 +194,139 @@ func (abm *AdaptiveBufferManager) BoostBuffersForQualityChange() {
 		Msg("Boosted buffers to maximum size for quality change")
 }
 
+// DetectCongestion analyzes system state to detect audio channel congestion
+// Returns congestion level: 0=none, 1=mild, 2=moderate, 3=severe
+func (abm *AdaptiveBufferManager) DetectCongestion() int {
+	cpuPercent := float64(atomic.LoadInt64(&abm.systemCPUPercent)) / 100.0
+	memoryPercent := float64(atomic.LoadInt64(&abm.systemMemoryPercent)) / 100.0
+	latencyNs := atomic.LoadInt64(&abm.averageLatency)
+	latency := time.Duration(latencyNs)
+
+	// Calculate congestion score based on multiple factors
+	congestionScore := 0.0
+
+	// CPU factor (weight: 0.4)
+	if cpuPercent > abm.config.HighCPUThreshold {
+		congestionScore += 0.4 * (cpuPercent - abm.config.HighCPUThreshold) / (100.0 - abm.config.HighCPUThreshold)
+	}
+
+	// Memory factor (weight: 0.3)
+	if memoryPercent > abm.config.HighMemoryThreshold {
+		congestionScore += 0.3 * (memoryPercent - abm.config.HighMemoryThreshold) / (100.0 - abm.config.HighMemoryThreshold)
+	}
+
+	// Latency factor (weight: 0.3)
+	latencyMs := float64(latency.Milliseconds())
+	latencyThreshold := float64(abm.config.TargetLatency.Milliseconds())
+	if latencyMs > latencyThreshold {
+		congestionScore += 0.3 * (latencyMs - latencyThreshold) / latencyThreshold
+	}
+
+	// Determine congestion level using configured threshold multiplier
+	if congestionScore > Config.CongestionThresholdMultiplier {
+		return 3 // Severe congestion
+	} else if congestionScore > Config.CongestionThresholdMultiplier*0.625 { // 0.8 * 0.625 = 0.5
+		return 2 // Moderate congestion
+	} else if congestionScore > Config.CongestionThresholdMultiplier*0.25 { // 0.8 * 0.25 = 0.2
+		return 1 // Mild congestion
+	}
+	return 0 // No congestion
+}
+
+// ActivateGracefulDegradation implements emergency measures when congestion is detected
+func (abm *AdaptiveBufferManager) ActivateGracefulDegradation(level int) {
+	atomic.StoreInt64(&abm.congestionLevel, int64(level))
+	atomic.StoreInt64(&abm.degradationActive, 1)
+	atomic.StoreInt64(&abm.lastCongestionTime, time.Now().UnixNano())
+
+	switch level {
+	case 1: // Mild congestion
+		// Reduce buffers by configured factor
+		currentInput := atomic.LoadInt64(&abm.currentInputBufferSize)
+		currentOutput := atomic.LoadInt64(&abm.currentOutputBufferSize)
+		newInput := int64(float64(currentInput) * Config.CongestionMildReductionFactor)
+		newOutput := int64(float64(currentOutput) * Config.CongestionMildReductionFactor)
+
+		// Ensure minimum buffer size
+		if newInput < int64(abm.config.MinBufferSize) {
+			newInput = int64(abm.config.MinBufferSize)
+		}
+		if newOutput < int64(abm.config.MinBufferSize) {
+			newOutput = int64(abm.config.MinBufferSize)
+		}
+
+		atomic.StoreInt64(&abm.currentInputBufferSize, newInput)
+		atomic.StoreInt64(&abm.currentOutputBufferSize, newOutput)
+
+		abm.logger.Warn().
+			Int("level", level).
+			Int64("input_buffer", newInput).
+			Int64("output_buffer", newOutput).
+			Msg("Activated mild graceful degradation")
+
+	case 2: // Moderate congestion
+		// Reduce buffers by configured factor and trigger quality reduction
+		currentInput := atomic.LoadInt64(&abm.currentInputBufferSize)
+		currentOutput := atomic.LoadInt64(&abm.currentOutputBufferSize)
+		newInput := int64(float64(currentInput) * Config.CongestionModerateReductionFactor)
+		newOutput := int64(float64(currentOutput) * Config.CongestionModerateReductionFactor)
+
+		// Ensure minimum buffer size
+		if newInput < int64(abm.config.MinBufferSize) {
+			newInput = int64(abm.config.MinBufferSize)
+		}
+		if newOutput < int64(abm.config.MinBufferSize) {
+			newOutput = int64(abm.config.MinBufferSize)
+		}
+
+		atomic.StoreInt64(&abm.currentInputBufferSize, newInput)
+		atomic.StoreInt64(&abm.currentOutputBufferSize, newOutput)
+
+		abm.logger.Warn().
+			Int("level", level).
+			Int64("input_buffer", newInput).
+			Int64("output_buffer", newOutput).
+			Msg("Activated moderate graceful degradation")
+
+	case 3: // Severe congestion
+		// Emergency: Set buffers to minimum and force lowest quality
+		minSize := int64(abm.config.MinBufferSize)
+		atomic.StoreInt64(&abm.currentInputBufferSize, minSize)
+		atomic.StoreInt64(&abm.currentOutputBufferSize, minSize)
+
+		abm.logger.Error().
+			Int("level", level).
+			Int64("buffer_size", minSize).
+			Msg("Activated severe graceful degradation - emergency mode")
+	}
+}
+
+// CheckRecoveryConditions determines if degradation can be deactivated
+func (abm *AdaptiveBufferManager) CheckRecoveryConditions() bool {
+	if atomic.LoadInt64(&abm.degradationActive) == 0 {
+		return false // Not in degradation mode
+	}
+
+	// Check if congestion has been resolved for the configured timeout
+	lastCongestion := time.Unix(0, atomic.LoadInt64(&abm.lastCongestionTime))
+	if time.Since(lastCongestion) < Config.CongestionRecoveryTimeout {
+		return false
+	}
+
+	// Check current system state
+	currentCongestion := abm.DetectCongestion()
+	if currentCongestion == 0 {
+		// Deactivate degradation
+		atomic.StoreInt64(&abm.degradationActive, 0)
+		atomic.StoreInt64(&abm.congestionLevel, 0)
+
+		abm.logger.Info().Msg("Deactivated graceful degradation - system recovered")
+		return true
+	}
+
+	return false
+}
+
 // adaptationLoop is the main loop that adjusts buffer sizes
 func (abm *AdaptiveBufferManager) adaptationLoop() {
 	defer abm.wg.Done()
@@ -235,6 +372,16 @@ func (abm *AdaptiveBufferManager) adaptationLoop() {
 // The algorithm runs periodically and only applies changes when the adaptation interval
 // has elapsed, preventing excessive adjustments that could destabilize the audio pipeline.
 func (abm *AdaptiveBufferManager) adaptBufferSizes() {
+	// Check for congestion and activate graceful degradation if needed
+	congestionLevel := abm.DetectCongestion()
+	if congestionLevel > 0 {
+		abm.ActivateGracefulDegradation(congestionLevel)
+		return // Skip normal adaptation during degradation
+	}
+
+	// Check if we can recover from degradation
+	abm.CheckRecoveryConditions()
+
 	// Collect current system metrics
 	metrics := abm.processMonitor.GetCurrentMetrics()
 	if len(metrics) == 0 {
@@ -441,6 +588,9 @@ func (abm *AdaptiveBufferManager) GetStats() map[string]interface{} {
 		"system_memory_percent": float64(atomic.LoadInt64(&abm.systemMemoryPercent)) / Config.PercentageMultiplier,
 		"adaptation_count":      atomic.LoadInt64(&abm.adaptationCount),
 		"last_adaptation":       lastAdaptation,
+		"congestion_level":      atomic.LoadInt64(&abm.congestionLevel),
+		"degradation_active":    atomic.LoadInt64(&abm.degradationActive) == 1,
+		"last_congestion_time":  time.Unix(0, atomic.LoadInt64(&abm.lastCongestionTime)),
 	}
 }
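A worked example of the score DetectCongestion computes above, with hypothetical inputs (CPU at 90% against HighCPUThreshold 80, memory below its threshold, 15 ms latency against a 10 ms target):

package main

import "fmt"

func main() {
	cpuFactor := 0.4 * (90.0 - 80.0) / (100.0 - 80.0) // 0.20
	memFactor := 0.0                                  // below threshold, contributes nothing
	latencyFactor := 0.3 * (15.0 - 10.0) / 10.0       // 0.15
	score := cpuFactor + memFactor + latencyFactor    // 0.35

	// With CongestionThresholdMultiplier = 0.8 the cutoffs are
	// 0.8 (severe), 0.8*0.625 = 0.5 (moderate), 0.8*0.25 = 0.2 (mild).
	fmt.Printf("score=%.2f -> mild congestion (level 1)\n", score)
}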
@@ -322,6 +322,39 @@ type AudioConfigConstants struct {
 	ConnectionTimeoutDelay time.Duration // Connection timeout for each attempt
 	ReconnectionInterval   time.Duration // Interval for automatic reconnection attempts
 	HealthCheckInterval    time.Duration // Health check interval for connections
+
+	// Quality Change Timeout Configuration
+	QualityChangeSupervisorTimeout time.Duration // Timeout for supervisor stop during quality changes
+	QualityChangeTickerInterval    time.Duration // Ticker interval for supervisor stop polling
+	QualityChangeSettleDelay       time.Duration // Delay for quality change to settle
+	QualityChangeRecoveryDelay     time.Duration // Delay before attempting recovery
+
+	// Graceful Degradation Configuration
+	CongestionMildReductionFactor     float64       // Buffer reduction factor for mild congestion (0.75)
+	CongestionModerateReductionFactor float64       // Buffer reduction factor for moderate congestion (0.5)
+	CongestionThresholdMultiplier     float64       // Multiplier for congestion threshold calculations (0.8)
+	CongestionRecoveryTimeout         time.Duration // Timeout for congestion recovery (5 seconds)
+
+	// Buffer Pool Cache Configuration
+	BufferPoolCacheSize               int           // Buffers per goroutine cache (4)
+	BufferPoolCacheTTL                time.Duration // Cache TTL for aggressive cleanup (5s)
+	BufferPoolMaxCacheEntries         int           // Maximum cache entries to prevent memory bloat (128)
+	BufferPoolCacheCleanupInterval    time.Duration // Cleanup interval for frequent cleanup (15s)
+	BufferPoolCacheWarmupThreshold    int           // Warmup threshold for faster startup (25)
+	BufferPoolCacheHitRateTarget      float64       // Target hit rate for balanced performance (0.80)
+	BufferPoolMaxCacheSize            int           // Maximum goroutine caches (256)
+	BufferPoolCleanupInterval         int64         // Cleanup interval in seconds (15)
+	BufferPoolBufferTTL               int64         // Buffer TTL in seconds (30)
+	BufferPoolControlSize             int           // Control pool buffer size (512)
+	BufferPoolMinPreallocBuffers      int           // Minimum preallocation buffers
+	BufferPoolMaxPoolSize             int           // Maximum pool size
+	BufferPoolChunkBufferCount        int           // Buffers per chunk
+	BufferPoolMinChunkSize            int           // Minimum chunk size (8KB)
+	BufferPoolInitialChunkCapacity    int           // Initial chunk capacity
+	BufferPoolAdaptiveResizeThreshold int           // Threshold for adaptive resize
+	BufferPoolHighHitRateThreshold    float64       // High hit rate threshold
+	BufferPoolOptimizeCacheThreshold  int           // Threshold for cache optimization
+	BufferPoolCounterResetThreshold   int           // Counter reset threshold
 }
 
 // DefaultAudioConfig returns the default configuration constants
@@ -446,10 +479,10 @@ func DefaultAudioConfig() *AudioConfigConstants {
 		MaxDecodeWriteBuffer: 4096, // Maximum CGO decode/write buffer
 
 		// IPC Configuration - Balanced for stability
-		MagicNumber:  0xDEADBEEF, // IPC message validation header
-		MaxFrameSize: 4096,       // Maximum audio frame size (4KB)
-		HeaderSize:   8,          // IPC message header size
+		MagicNumber:  0xDEADBEEF,              // IPC message validation header
+		MaxFrameSize: 4096,                    // Maximum audio frame size (4KB)
+		WriteTimeout: 1000 * time.Millisecond, // Further increased timeout to handle quality change bursts
+		HeaderSize:   8,                       // IPC message header size
 
 		// Monitoring and Metrics - Balanced for stability
 		MetricsUpdateInterval: 1000 * time.Millisecond, // Stable metrics collection frequency
@@ -488,6 +521,39 @@ func DefaultAudioConfig() *AudioConfigConstants {
 		ReconnectionInterval: 30 * time.Second, // Interval for automatic reconnection attempts
 		HealthCheckInterval:  10 * time.Second, // Health check interval for connections
+
+		// Quality Change Timeout Configuration
+		QualityChangeSupervisorTimeout: 5 * time.Second,        // Timeout for supervisor stop during quality changes
+		QualityChangeTickerInterval:    100 * time.Millisecond, // Ticker interval for supervisor stop polling
+		QualityChangeSettleDelay:       2 * time.Second,        // Delay for quality change to settle
+		QualityChangeRecoveryDelay:     1 * time.Second,        // Delay before attempting recovery
+
+		// Graceful Degradation Configuration
+		CongestionMildReductionFactor:     0.75,            // Buffer reduction factor for mild congestion
+		CongestionModerateReductionFactor: 0.5,             // Buffer reduction factor for moderate congestion
+		CongestionThresholdMultiplier:     0.8,             // Multiplier for congestion threshold calculations
+		CongestionRecoveryTimeout:         5 * time.Second, // Timeout for congestion recovery
+
+		// Buffer Pool Cache Configuration
+		BufferPoolCacheSize:               4,                // Buffers per goroutine cache
+		BufferPoolCacheTTL:                5 * time.Second,  // Cache TTL for aggressive cleanup
+		BufferPoolMaxCacheEntries:         128,              // Maximum cache entries to prevent memory bloat
+		BufferPoolCacheCleanupInterval:    15 * time.Second, // Cleanup interval for frequent cleanup
+		BufferPoolCacheWarmupThreshold:    25,               // Warmup threshold for faster startup
+		BufferPoolCacheHitRateTarget:      0.80,             // Target hit rate for balanced performance
+		BufferPoolMaxCacheSize:            256,              // Maximum goroutine caches
+		BufferPoolCleanupInterval:         15,               // Cleanup interval in seconds
+		BufferPoolBufferTTL:               30,               // Buffer TTL in seconds
+		BufferPoolControlSize:             512,              // Control pool buffer size
+		BufferPoolMinPreallocBuffers:      16,               // Minimum preallocation buffers (reduced from 50)
+		BufferPoolMaxPoolSize:             128,              // Maximum pool size (reduced from 256)
+		BufferPoolChunkBufferCount:        8,                // Buffers per chunk (reduced from 64 to prevent large allocations)
+		BufferPoolMinChunkSize:            8192,             // Minimum chunk size (8KB, reduced from 64KB)
+		BufferPoolInitialChunkCapacity:    4,                // Initial chunk capacity
+		BufferPoolAdaptiveResizeThreshold: 100,              // Threshold for adaptive resize
+		BufferPoolHighHitRateThreshold:    0.95,             // High hit rate threshold
+		BufferPoolOptimizeCacheThreshold:  100,              // Threshold for cache optimization
+		BufferPoolCounterResetThreshold:   10000,            // Counter reset threshold
 
 		// Timing Constants - Optimized for quality change stability
 		DefaultSleepDuration: 100 * time.Millisecond, // Balanced polling interval
 		ShortSleepDuration:   10 * time.Millisecond,  // Balanced high-frequency polling
@@ -509,9 +575,9 @@ func DefaultAudioConfig() *AudioConfigConstants {
 		AdaptiveBufferTargetLatency: 10 * time.Millisecond, // Aggressive target latency for responsiveness
 
 		// Adaptive Buffer Size Configuration - Optimized for quality change bursts
-		AdaptiveMinBufferSize:     128, // Significantly increased minimum to handle bursts
-		AdaptiveMaxBufferSize:     512, // Much higher maximum for quality changes
-		AdaptiveDefaultBufferSize: 256, // Higher default for stability
+		AdaptiveMinBufferSize:     256,  // Further increased minimum to prevent emergency mode
+		AdaptiveMaxBufferSize:     1024, // Much higher maximum for quality changes
+		AdaptiveDefaultBufferSize: 512,  // Higher default for stability during bursts
 
 		// Adaptive Optimizer Configuration - Faster response
 		CooldownPeriod: 15 * time.Second, // Reduced cooldown period
@@ -224,18 +224,25 @@ func SetAudioQuality(quality AudioQuality) {
 	// Stop current subprocess
 	supervisor.Stop()
 
-	// Wait for supervisor to fully stop before starting again
+	// Wait for supervisor to fully stop before starting again, with a timeout
 	// This prevents race conditions and audio breakage
-	for i := 0; i < 50; i++ { // Wait up to 5 seconds
-		if !supervisor.IsRunning() {
-			break
+	stopTimeout := time.After(Config.QualityChangeSupervisorTimeout)
+	ticker := time.NewTicker(Config.QualityChangeTickerInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-stopTimeout:
+			logger.Warn().Msg("supervisor did not stop within 5s timeout, proceeding anyway")
+			goto startSupervisor
+		case <-ticker.C:
+			if !supervisor.IsRunning() {
+				goto startSupervisor
+			}
 		}
-		time.Sleep(100 * time.Millisecond)
 	}
 
-	if supervisor.IsRunning() {
-		logger.Warn().Msg("supervisor did not stop within timeout, proceeding anyway")
-	}
+startSupervisor:
 
 	// Start subprocess with new configuration
 	if err := supervisor.Start(); err != nil {
@@ -246,7 +253,7 @@ func SetAudioQuality(quality AudioQuality) {
 	// Reset audio input server stats after quality change
 	// Allow adaptive buffer manager to naturally adjust buffer sizes
 	go func() {
-		time.Sleep(2 * time.Second) // Wait for quality change to settle
+		time.Sleep(Config.QualityChangeSettleDelay) // Wait for quality change to settle
 		// Reset audio input server stats to clear persistent warnings
 		ResetGlobalAudioInputServerStats()
 		// Attempt recovery if microphone is still having issues
@@ -365,7 +372,7 @@ func SetMicrophoneQuality(quality AudioQuality) {
 	// Reset audio input server stats after config update
 	// Allow adaptive buffer manager to naturally adjust buffer sizes
 	go func() {
-		time.Sleep(2 * time.Second) // Wait for quality change to settle
+		time.Sleep(Config.QualityChangeSettleDelay) // Wait for quality change to settle
 		// Reset audio input server stats to clear persistent warnings
 		ResetGlobalAudioInputServerStats()
 		// Attempt recovery if microphone is still having issues
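The supervisor-stop change above is a polling-with-deadline pattern; it can be factored into a reusable helper. A sketch (waitUntil and its parameters are hypothetical, not identifiers from this repository):

package main

import (
	"fmt"
	"time"
)

// waitUntil polls cond every interval until it returns true or timeout elapses.
// It reports whether cond was satisfied before the deadline.
func waitUntil(cond func() bool, interval, timeout time.Duration) bool {
	deadline := time.After(timeout)
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-deadline:
			return false
		case <-ticker.C:
			if cond() {
				return true
			}
		}
	}
}

func main() {
	stopped := waitUntil(func() bool { return true }, 100*time.Millisecond, 5*time.Second)
	fmt.Println("supervisor stopped:", stopped)
}

Returning a bool avoids the goto label used in the commit, at the cost of one closure allocation per call.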
@@ -2,7 +2,9 @@ package audio
 
 import (
 	"errors"
+	"fmt"
 	"sync"
+	"time"
 )
 
 // Global relay instance for the main process
@@ -89,41 +91,57 @@ func IsAudioRelayRunning() bool {
 }
 
 // UpdateAudioRelayTrack updates the WebRTC audio track for the relay
+// This function is refactored to prevent mutex deadlocks during quality changes
 func UpdateAudioRelayTrack(audioTrack AudioTrackWriter) error {
-	relayMutex.Lock()
-	defer relayMutex.Unlock()
+	var needsCallback bool
+	var callbackFunc TrackReplacementCallback
+
+	// Critical section: minimize time holding the mutex
+	relayMutex.Lock()
 	if globalRelay == nil {
 		// No relay running, start one with the provided track
 		relay := NewAudioRelay()
 		config := GetAudioConfig()
 		if err := relay.Start(audioTrack, config); err != nil {
+			relayMutex.Unlock()
 			return err
 		}
 		globalRelay = relay
-
-		// Replace the track in the WebRTC session if callback is available
-		if trackReplacementCallback != nil {
-			if err := trackReplacementCallback(audioTrack); err != nil {
-				// Log error but don't fail the relay start
-				// The relay can still work even if WebRTC track replacement fails
-				_ = err // Suppress linter warning
-			}
-		}
-		return nil
+	} else {
+		// Update the track in the existing relay
+		globalRelay.UpdateTrack(audioTrack)
 	}
 
-	// Update the track in the existing relay
-	globalRelay.UpdateTrack(audioTrack)
+	// Capture callback state while holding mutex
+	needsCallback = trackReplacementCallback != nil
+	if needsCallback {
+		callbackFunc = trackReplacementCallback
+	}
+	relayMutex.Unlock()
 
-	// Replace the track in the WebRTC session if callback is available
-	if trackReplacementCallback != nil {
-		if err := trackReplacementCallback(audioTrack); err != nil {
-			// Log error but don't fail the track update
-			// The relay can still work even if WebRTC track replacement fails
-			_ = err // Suppress linter warning
+	// Execute callback outside of mutex to prevent deadlock
+	if needsCallback && callbackFunc != nil {
+		// Use goroutine with timeout to prevent blocking
+		done := make(chan error, 1)
+		go func() {
+			done <- callbackFunc(audioTrack)
+		}()
+
+		// Wait for callback with timeout
+		select {
+		case err := <-done:
+			if err != nil {
+				// Log error but don't fail the relay operation
+				// The relay can still work even if WebRTC track replacement fails
+				_ = err // Suppress linter warning
+			}
+		case <-time.After(5 * time.Second):
+			// Timeout: log warning but continue
+			// This prevents indefinite blocking during quality changes
+			_ = fmt.Errorf("track replacement callback timed out")
 		}
 	}
 
 	return nil
 }
@@ -149,6 +167,17 @@ func SetTrackReplacementCallback(callback TrackReplacementCallback) {
 	trackReplacementCallback = callback
 }
 
+// UpdateAudioRelayTrackAsync performs an async track update to prevent blocking
+// This is used during WebRTC session creation to avoid deadlocks
+func UpdateAudioRelayTrackAsync(audioTrack AudioTrackWriter) {
+	go func() {
+		if err := UpdateAudioRelayTrack(audioTrack); err != nil {
+			// Log error but don't block session creation
+			_ = err // Suppress linter warning
+		}
+	}()
+}
+
 // connectRelayToCurrentSession connects the audio relay to the current WebRTC session's audio track
 // This is used when restarting the relay during unmute operations
 func connectRelayToCurrentSession() error {
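The relay refactor above applies a general deadlock-avoidance pattern: capture the callback while holding the lock, release the lock, then invoke the callback with a timeout. A minimal sketch with hypothetical names (mu, callback, runCallbackSafely are illustrative, not the repository's identifiers):

package main

import (
	"fmt"
	"sync"
	"time"
)

var (
	mu       sync.Mutex
	callback func() error // guarded by mu
)

// runCallbackSafely invokes the registered callback without holding mu,
// so a callback that itself takes mu (or simply blocks) cannot deadlock the caller.
func runCallbackSafely() {
	mu.Lock()
	cb := callback // capture under the lock
	mu.Unlock()

	if cb == nil {
		return
	}
	done := make(chan error, 1) // buffered: the goroutine never leaks on timeout
	go func() { done <- cb() }()

	select {
	case err := <-done:
		if err != nil {
			fmt.Println("callback failed:", err)
		}
	case <-time.After(5 * time.Second):
		fmt.Println("callback timed out, continuing")
	}
}

func main() {
	callback = func() error { return nil }
	runCallbackSafely()
}

The buffered channel is the detail that matters: if the callback finishes after the timeout, its send still succeeds and the goroutine exits instead of leaking.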
@@ -99,16 +99,8 @@ type lockFreeBufferCache struct {
 	buffers [8]*[]byte // Increased from 4 to 8 buffers per goroutine cache for better hit rates
 }
 
-const (
-	// Enhanced cache configuration for per-goroutine optimization
-	cacheSize = 8                // Increased from 4 to 8 buffers per goroutine cache for better hit rates
-	cacheTTL  = 10 * time.Second // Increased from 5s to 10s for better cache retention
-	// Additional cache constants for enhanced performance
-	maxCacheEntries      = 256              // Maximum number of goroutine cache entries to prevent memory bloat
-	cacheCleanupInterval = 30 * time.Second // How often to clean up stale cache entries
-	cacheWarmupThreshold = 50               // Number of requests before enabling cache warmup
-	cacheHitRateTarget   = 0.85             // Target cache hit rate for optimization
-)
+// Buffer pool constants are now configured via Config
+// See core_config_constants.go for default values
 
 // TTL tracking for goroutine cache entries
 type cacheEntry struct {
@@ -120,10 +112,8 @@ type cacheEntry struct {
 // Per-goroutine buffer cache using goroutine-local storage
 var goroutineBufferCache = make(map[int64]*lockFreeBufferCache)
 var goroutineCacheMutex sync.RWMutex
-var lastCleanupTime int64 // Unix timestamp of last cleanup
-const maxCacheSize = 500         // Maximum number of goroutine caches (reduced from 1000)
-const cleanupInterval int64 = 30 // Cleanup interval in seconds (30 seconds, reduced from 60)
-const bufferTTL int64 = 60       // Time-to-live for cached buffers in seconds (1 minute, reduced from 2)
+var goroutineCacheWithTTL = make(map[int64]*cacheEntry)
+var lastCleanupTime int64 // Unix timestamp of last cleanup
 
 // getGoroutineID extracts goroutine ID from runtime stack for cache key
 func getGoroutineID() int64 {
@@ -144,8 +134,7 @@ func getGoroutineID() int64 {
 	return 0
 }
 
-// Map of goroutine ID to cache entry with TTL tracking
-var goroutineCacheWithTTL = make(map[int64]*cacheEntry)
+// Map of goroutine ID to cache entry with TTL tracking (declared above)
 
 // cleanupChannel is used for asynchronous cleanup requests
 var cleanupChannel = make(chan struct{}, 1)
@@ -199,9 +188,9 @@ func performCleanup(forced bool) {
 	}
 
 	// Only cleanup if enough time has passed (less time if high latency) or if forced
-	interval := cleanupInterval
+	interval := Config.BufferPoolCleanupInterval
 	if isHighLatency {
-		interval = cleanupInterval / 2 // More frequent cleanup under high latency
+		interval = Config.BufferPoolCleanupInterval / 2 // More frequent cleanup under high latency
 	}
 
 	if !forced && now-lastCleanup < interval {
@@ -255,10 +244,10 @@ func doCleanupGoroutineCache() {
 
 	// Enhanced cleanup with size limits and better TTL management
 	entriesToRemove := make([]int64, 0)
-	ttl := bufferTTL
+	ttl := Config.BufferPoolBufferTTL
 	if isHighLatency {
 		// Under high latency, use a much shorter TTL
-		ttl = bufferTTL / 4
+		ttl = Config.BufferPoolBufferTTL / 4
 	}
 
 	// Remove entries older than enhanced TTL
@@ -270,7 +259,7 @@ func doCleanupGoroutineCache() {
 	}
 
 	// If we have too many cache entries, remove the oldest ones
-	if len(goroutineCacheWithTTL) > maxCacheEntries {
+	if len(goroutineCacheWithTTL) > Config.BufferPoolMaxCacheEntries {
 		// Sort by last access time and remove oldest entries
 		type cacheEntryWithGID struct {
 			gid int64
@@ -285,7 +274,7 @@ func doCleanupGoroutineCache() {
 			return entries[i].lastAccess < entries[j].lastAccess
 		})
 		// Mark oldest entries for removal
-		excessCount := len(goroutineCacheWithTTL) - maxCacheEntries
+		excessCount := len(goroutineCacheWithTTL) - Config.BufferPoolMaxCacheEntries
 		for i := 0; i < excessCount && i < len(entries); i++ {
 			entriesToRemove = append(entriesToRemove, entries[i].gid)
 		}
@@ -293,13 +282,13 @@ func doCleanupGoroutineCache() {
 
 	// If cache is still too large after TTL cleanup, remove oldest entries
 	// Under high latency, use a more aggressive target size
-	targetSize := maxCacheSize
-	targetReduction := maxCacheSize / 2
+	targetSize := Config.BufferPoolMaxCacheSize
+	targetReduction := Config.BufferPoolMaxCacheSize / 2
 
 	if isHighLatency {
 		// Under high latency, target a much smaller cache size
-		targetSize = maxCacheSize / 4
-		targetReduction = maxCacheSize / 8
+		targetSize = Config.BufferPoolMaxCacheSize / 4
+		targetReduction = Config.BufferPoolMaxCacheSize / 8
 	}
 
 	if len(goroutineCacheWithTTL) > targetSize {
@@ -372,33 +361,32 @@ func NewAudioBufferPool(bufferSize int) *AudioBufferPool {
 	// Enhanced preallocation strategy based on buffer size and system capacity
 	var preallocSize int
 	if bufferSize <= Config.AudioFramePoolSize {
-		// For smaller pools, use enhanced preallocation (40% instead of 20%)
+		// For smaller pools, use enhanced preallocation
 		preallocSize = Config.PreallocPercentage * 2
 	} else {
-		// For larger pools, use standard enhanced preallocation (30% instead of 10%)
+		// For larger pools, use standard enhanced preallocation
 		preallocSize = (Config.PreallocPercentage * 3) / 2
 	}
 
 	// Ensure minimum preallocation for better performance
-	minPrealloc := 50 // Minimum 50 buffers for startup performance
-	if preallocSize < minPrealloc {
-		preallocSize = minPrealloc
+	if preallocSize < Config.BufferPoolMinPreallocBuffers {
+		preallocSize = Config.BufferPoolMinPreallocBuffers
 	}
 
 	// Calculate max pool size based on buffer size to prevent memory bloat
-	maxPoolSize := 256 // Default
+	maxPoolSize := Config.BufferPoolMaxPoolSize // Default
 	if bufferSize > 8192 {
-		maxPoolSize = 64 // Much smaller for very large buffers
+		maxPoolSize = Config.BufferPoolMaxPoolSize / 4 // Much smaller for very large buffers
 	} else if bufferSize > 4096 {
-		maxPoolSize = 128 // Smaller for large buffers
+		maxPoolSize = Config.BufferPoolMaxPoolSize / 2 // Smaller for large buffers
 	} else if bufferSize > 1024 {
-		maxPoolSize = 192 // Medium for medium buffers
+		maxPoolSize = (Config.BufferPoolMaxPoolSize * 3) / 4 // Medium for medium buffers
 	}
 
 	// Calculate chunk size - allocate larger chunks to reduce allocation frequency
-	chunkSize := bufferSize * 64 // Each chunk holds 64 buffers worth of memory
-	if chunkSize < 64*1024 {
-		chunkSize = 64 * 1024 // Minimum 64KB chunks
+	chunkSize := bufferSize * Config.BufferPoolChunkBufferCount // Each chunk holds multiple buffers' worth of memory
+	if chunkSize < Config.BufferPoolMinChunkSize {
+		chunkSize = Config.BufferPoolMinChunkSize // Minimum chunk size
 	}
 
 	p := &AudioBufferPool{
@@ -407,8 +395,8 @@ func NewAudioBufferPool(bufferSize int) *AudioBufferPool {
 		preallocated: make([]*[]byte, 0, preallocSize),
 		preallocSize: preallocSize,
 		chunkSize:    chunkSize,
-		chunks:       make([][]byte, 0, 4), // Start with capacity for 4 chunks
-		chunkOffsets: make([]int, 0, 4),
+		chunks:       make([][]byte, 0, Config.BufferPoolInitialChunkCapacity), // Start with capacity for initial chunks
+		chunkOffsets: make([]int, 0, Config.BufferPoolInitialChunkCapacity),
 	}
 
 	// Configure sync.Pool with optimized allocation
@@ -596,7 +584,7 @@ var (
 	// Main audio frame pool with enhanced capacity
 	audioFramePool = NewAudioBufferPool(Config.AudioFramePoolSize)
 	// Control message pool with enhanced capacity for better throughput
-	audioControlPool = NewAudioBufferPool(512) // Increased from Config.OutputHeaderSize to 512 for better control message handling
+	audioControlPool = NewAudioBufferPool(Config.BufferPoolControlSize) // Control message buffer size
 )
 
 func GetAudioFrameBuffer() []byte {
@@ -703,15 +691,15 @@ func (p *AudioBufferPool) AdaptiveResize() {
 	missCount := atomic.LoadInt64(&p.missCount)
 	totalRequests := hitCount + missCount
 
-	if totalRequests < 100 {
+	if totalRequests < int64(Config.BufferPoolAdaptiveResizeThreshold) {
 		return // Not enough data for meaningful adaptation
 	}
 
 	hitRate := float64(hitCount) / float64(totalRequests)
 	currentSize := atomic.LoadInt64(&p.currentSize)
 
-	// If hit rate is low (< 80%), consider increasing pool size
-	if hitRate < 0.8 && currentSize < int64(p.maxPoolSize) {
+	// If hit rate is low, consider increasing pool size
+	if hitRate < Config.BufferPoolCacheHitRateTarget && currentSize < int64(p.maxPoolSize) {
 		// Increase preallocation by 25% up to max pool size
 		newPreallocSize := int(float64(len(p.preallocated)) * 1.25)
 		if newPreallocSize > p.maxPoolSize {
@@ -725,8 +713,8 @@ func (p *AudioBufferPool) AdaptiveResize() {
 		}
 	}
 
-	// If hit rate is very high (> 95%) and pool is large, consider shrinking
-	if hitRate > 0.95 && len(p.preallocated) > p.preallocSize {
+	// If hit rate is very high and pool is large, consider shrinking
+	if hitRate > Config.BufferPoolHighHitRateThreshold && len(p.preallocated) > p.preallocSize {
 		// Reduce preallocation by 10% but not below original size
 		newSize := int(float64(len(p.preallocated)) * 0.9)
 		if newSize < p.preallocSize {
@@ -747,7 +735,7 @@ func (p *AudioBufferPool) WarmupCache() {
 	missCount := atomic.LoadInt64(&p.missCount)
 	totalRequests := hitCount + missCount
 
-	if totalRequests < int64(cacheWarmupThreshold) {
+	if totalRequests < int64(Config.BufferPoolCacheWarmupThreshold) {
 		return
 	}
 
@@ -776,7 +764,7 @@ func (p *AudioBufferPool) WarmupCache() {
 	if cache != nil {
 		// Fill cache to optimal level based on hit rate
 		hitRate := float64(hitCount) / float64(totalRequests)
-		optimalCacheSize := int(float64(cacheSize) * hitRate)
+		optimalCacheSize := int(float64(Config.BufferPoolCacheSize) * hitRate)
 		if optimalCacheSize < 2 {
 			optimalCacheSize = 2
 		}
@@ -800,19 +788,19 @@ func (p *AudioBufferPool) OptimizeCache() {
 	missCount := atomic.LoadInt64(&p.missCount)
 	totalRequests := hitCount + missCount
 
-	if totalRequests < 100 {
+	if totalRequests < int64(Config.BufferPoolOptimizeCacheThreshold) {
 		return
 	}
 
 	hitRate := float64(hitCount) / float64(totalRequests)
 
 	// If hit rate is below target, trigger cache warmup
-	if hitRate < cacheHitRateTarget {
+	if hitRate < Config.BufferPoolCacheHitRateTarget {
		p.WarmupCache()
 	}
 
 	// Reset counters periodically to avoid overflow and get fresh metrics
-	if totalRequests > 10000 {
+	if totalRequests > int64(Config.BufferPoolCounterResetThreshold) {
 		atomic.StoreInt64(&p.hitCount, hitCount/2)
 		atomic.StoreInt64(&p.missCount, missCount/2)
 	}
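The pool tuning above is driven by a cache hit rate; a short worked example with invented counters shows how AdaptiveResize reacts:

package main

import "fmt"

func main() {
	hits, misses := int64(700), int64(300) // hypothetical counters
	total := hits + misses
	hitRate := float64(hits) / float64(total) // 0.70

	// 0.70 is below the 0.80 target (BufferPoolCacheHitRateTarget),
	// so AdaptiveResize grows preallocation by 25%, capped at maxPoolSize.
	prealloc := 16
	grown := int(float64(prealloc) * 1.25) // 20
	fmt.Printf("hitRate=%.2f -> grow prealloc %d -> %d\n", hitRate, prealloc, grown)
}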
@@ -245,10 +245,9 @@ func newSession(config SessionConfig) (*Session, error) {
 		return nil, err
 	}
 
-	// Update the audio relay with the new WebRTC audio track
-	if err := audio.UpdateAudioRelayTrack(session.AudioTrack); err != nil {
-		scopedLogger.Warn().Err(err).Msg("Failed to update audio relay track")
-	}
+	// Update the audio relay with the new WebRTC audio track asynchronously
+	// This prevents blocking during session creation and avoids mutex deadlocks
+	audio.UpdateAudioRelayTrackAsync(session.AudioTrack)
 
 	videoRtpSender, err := peerConnection.AddTrack(session.VideoTrack)
 	if err != nil {