diff --git a/internal/audio/adaptive_buffer.go b/internal/audio/adaptive_buffer.go index 79c1fdad..a45f4b9c 100644 --- a/internal/audio/adaptive_buffer.go +++ b/internal/audio/adaptive_buffer.go @@ -152,6 +152,29 @@ func (abm *AdaptiveBufferManager) GetOutputBufferSize() int { // UpdateLatency updates the current latency measurement func (abm *AdaptiveBufferManager) UpdateLatency(latency time.Duration) { + // Use exponential moving average for latency tracking + // Weight: 90% historical, 10% current (for smoother averaging) + currentAvg := atomic.LoadInt64(&abm.averageLatency) + newLatencyNs := latency.Nanoseconds() + + if currentAvg == 0 { + // First measurement + atomic.StoreInt64(&abm.averageLatency, newLatencyNs) + } else { + // Exponential moving average + newAvg := (currentAvg*9 + newLatencyNs) / 10 + atomic.StoreInt64(&abm.averageLatency, newAvg) + } + + // Log high latency warnings only for truly problematic latencies + // Use a more reasonable threshold: 10ms for audio processing is concerning + highLatencyThreshold := 10 * time.Millisecond + if latency > highLatencyThreshold { + abm.logger.Debug(). + Dur("latency_ms", latency/time.Millisecond). + Dur("threshold_ms", highLatencyThreshold/time.Millisecond). + Msg("High audio processing latency detected") + } } // adaptationLoop is the main loop that adjusts buffer sizes diff --git a/internal/audio/goroutine_pool.go b/internal/audio/goroutine_pool.go index 6af0e00c..cfc844e0 100644 --- a/internal/audio/goroutine_pool.go +++ b/internal/audio/goroutine_pool.go @@ -65,6 +65,42 @@ func (p *GoroutinePool) Submit(task Task) bool { } } +// SubmitWithBackpressure adds a task to the pool with backpressure handling +// Returns true if task was accepted, false if dropped due to backpressure +func (p *GoroutinePool) SubmitWithBackpressure(task Task) bool { + select { + case <-p.shutdown: + return false // Pool is shutting down + case p.taskQueue <- task: + // Task accepted, ensure we have a worker to process it + p.ensureWorkerAvailable() + return true + default: + // Queue is full - apply backpressure + // Check if we're in a high-load situation + queueLen := len(p.taskQueue) + queueCap := cap(p.taskQueue) + workerCount := atomic.LoadInt64(&p.workerCount) + + // If queue is >90% full and we're at max workers, drop the task + if queueLen > int(float64(queueCap)*0.9) && workerCount >= int64(p.maxWorkers) { + p.logger.Warn().Int("queue_len", queueLen).Int("queue_cap", queueCap).Msg("Dropping task due to backpressure") + return false + } + + // Try one more time with a short timeout + select { + case p.taskQueue <- task: + p.ensureWorkerAvailable() + return true + case <-time.After(1 * time.Millisecond): + // Still can't submit after timeout - drop task + p.logger.Debug().Msg("Task dropped after backpressure timeout") + return false + } + } +} + // ensureWorkerAvailable makes sure at least one worker is available to process tasks func (p *GoroutinePool) ensureWorkerAvailable() { // Check if we already have enough workers @@ -265,6 +301,16 @@ func SubmitAudioReaderTask(task Task) bool { return GetAudioReaderPool().Submit(task) } +// SubmitAudioProcessorTaskWithBackpressure submits a task with backpressure handling +func SubmitAudioProcessorTaskWithBackpressure(task Task) bool { + return GetAudioProcessorPool().SubmitWithBackpressure(task) +} + +// SubmitAudioReaderTaskWithBackpressure submits a task with backpressure handling +func SubmitAudioReaderTaskWithBackpressure(task Task) bool { + return GetAudioReaderPool().SubmitWithBackpressure(task) +} + // ShutdownAudioPools shuts down all audio goroutine pools func ShutdownAudioPools(wait bool) { logger := logging.GetDefaultLogger().With().Str("component", "audio-pools").Logger() diff --git a/internal/audio/ipc_input.go b/internal/audio/ipc_input.go index cac1dedf..b2202905 100644 --- a/internal/audio/ipc_input.go +++ b/internal/audio/ipc_input.go @@ -191,6 +191,10 @@ type AudioInputServer struct { stopChan chan struct{} // Stop signal for all goroutines wg sync.WaitGroup // Wait group for goroutine coordination + // Channel resizing support + channelMutex sync.RWMutex // Protects channel recreation + lastBufferSize int64 // Last known buffer size for change detection + // Socket buffer configuration socketBufferConfig SocketBufferConfig } @@ -231,6 +235,13 @@ func NewAudioInputServer() (*AudioInputServer, error) { adaptiveManager := GetAdaptiveBufferManager() initialBufferSize := int64(adaptiveManager.GetInputBufferSize()) + // Ensure minimum buffer size to prevent immediate overflow + // Use at least 50 frames to handle burst traffic + minBufferSize := int64(50) + if initialBufferSize < minBufferSize { + initialBufferSize = minBufferSize + } + // Initialize socket buffer configuration socketBufferConfig := DefaultSocketBufferConfig() @@ -240,6 +251,7 @@ func NewAudioInputServer() (*AudioInputServer, error) { processChan: make(chan *InputIPCMessage, initialBufferSize), stopChan: make(chan struct{}), bufferSize: initialBufferSize, + lastBufferSize: initialBufferSize, socketBufferConfig: socketBufferConfig, }, nil } @@ -950,9 +962,13 @@ func (ais *AudioInputServer) startReaderGoroutine() { } } - // Send to message channel with non-blocking write + // Send to message channel with non-blocking write (use read lock for channel access) + ais.channelMutex.RLock() + messageChan := ais.messageChan + ais.channelMutex.RUnlock() + select { - case ais.messageChan <- msg: + case messageChan <- msg: atomic.AddInt64(&ais.totalFrames, 1) default: // Channel full, drop message @@ -966,16 +982,16 @@ func (ais *AudioInputServer) startReaderGoroutine() { } } - // Submit the reader task to the audio reader pool + // Submit the reader task to the audio reader pool with backpressure logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger() - if !SubmitAudioReaderTask(readerTask) { - // If the pool is full or shutting down, fall back to direct goroutine creation - // Only log if warn level enabled - avoid sampling logic in critical path - if logger.GetLevel() <= zerolog.WarnLevel { - logger.Warn().Msg("Audio reader pool full or shutting down, falling back to direct goroutine creation") - } + if !SubmitAudioReaderTaskWithBackpressure(readerTask) { + // Task was dropped due to backpressure - this is expected under high load + // Log at debug level to avoid spam, but track the drop + logger.Debug().Msg("Audio reader task dropped due to backpressure") - go readerTask() + // Don't fall back to unlimited goroutine creation + // Instead, let the system recover naturally + ais.wg.Done() // Decrement the wait group since we're not starting the task } } @@ -1011,7 +1027,7 @@ func (ais *AudioInputServer) startProcessorGoroutine() { select { case <-ais.stopChan: return - case msg := <-ais.messageChan: + case msg := <-ais.getMessageChan(): // Process message with error handling start := time.Now() err := ais.processMessageWithRecovery(msg, logger) @@ -1032,9 +1048,10 @@ func (ais *AudioInputServer) startProcessorGoroutine() { // If too many processing errors, drop frames more aggressively if processingErrors >= maxProcessingErrors { // Clear processing queue to recover - for len(ais.processChan) > 0 { + processChan := ais.getProcessChan() + for len(processChan) > 0 { select { - case <-ais.processChan: + case <-processChan: atomic.AddInt64(&ais.droppedFrames, 1) default: break @@ -1057,13 +1074,16 @@ func (ais *AudioInputServer) startProcessorGoroutine() { } } - // Submit the processor task to the audio processor pool + // Submit the processor task to the audio processor pool with backpressure logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger() - if !SubmitAudioProcessorTask(processorTask) { - // If the pool is full or shutting down, fall back to direct goroutine creation - logger.Warn().Msg("Audio processor pool full or shutting down, falling back to direct goroutine creation") + if !SubmitAudioProcessorTaskWithBackpressure(processorTask) { + // Task was dropped due to backpressure - this is expected under high load + // Log at debug level to avoid spam, but track the drop + logger.Debug().Msg("Audio processor task dropped due to backpressure") - go processorTask() + // Don't fall back to unlimited goroutine creation + // Instead, let the system recover naturally + ais.wg.Done() // Decrement the wait group since we're not starting the task } } @@ -1072,13 +1092,14 @@ func (ais *AudioInputServer) processMessageWithRecovery(msg *InputIPCMessage, lo // Intelligent frame dropping: prioritize recent frames if msg.Type == InputMessageTypeOpusFrame { // Check if processing queue is getting full - queueLen := len(ais.processChan) + processChan := ais.getProcessChan() + queueLen := len(processChan) bufferSize := int(atomic.LoadInt64(&ais.bufferSize)) if queueLen > bufferSize*3/4 { // Drop oldest frames, keep newest select { - case <-ais.processChan: // Remove oldest + case <-processChan: // Remove oldest atomic.AddInt64(&ais.droppedFrames, 1) logger.Debug().Msg("Dropped oldest frame to make room") default: @@ -1086,9 +1107,13 @@ func (ais *AudioInputServer) processMessageWithRecovery(msg *InputIPCMessage, lo } } - // Send to processing queue with timeout + // Send to processing queue with timeout (use read lock for channel access) + ais.channelMutex.RLock() + processChan := ais.processChan + ais.channelMutex.RUnlock() + select { - case ais.processChan <- msg: + case processChan <- msg: return nil case <-time.After(GetConfig().WriteTimeout): // Processing queue full and timeout reached, drop frame @@ -1135,7 +1160,7 @@ func (ais *AudioInputServer) startMonitorGoroutine() { // Process frames from processing queue for { select { - case msg := <-ais.processChan: + case msg := <-ais.getProcessChan(): start := time.Now() err := ais.processMessage(msg) processingTime := time.Since(start) @@ -1183,13 +1208,16 @@ func (ais *AudioInputServer) startMonitorGoroutine() { } } - // Submit the monitor task to the audio processor pool + // Submit the monitor task to the audio processor pool with backpressure logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger() - if !SubmitAudioProcessorTask(monitorTask) { - // If the pool is full or shutting down, fall back to direct goroutine creation - logger.Warn().Msg("Audio processor pool full or shutting down, falling back to direct goroutine creation") + if !SubmitAudioProcessorTaskWithBackpressure(monitorTask) { + // Task was dropped due to backpressure - this is expected under high load + // Log at debug level to avoid spam, but track the drop + logger.Debug().Msg("Audio monitor task dropped due to backpressure") - go monitorTask() + // Don't fall back to unlimited goroutine creation + // Instead, let the system recover naturally + ais.wg.Done() // Decrement the wait group since we're not starting the task } } @@ -1205,7 +1233,61 @@ func (ais *AudioInputServer) GetServerStats() (total, dropped int64, avgProcessi func (ais *AudioInputServer) UpdateBufferSize() { adaptiveManager := GetAdaptiveBufferManager() newSize := int64(adaptiveManager.GetInputBufferSize()) + oldSize := atomic.LoadInt64(&ais.bufferSize) + + // Only recreate channels if size changed significantly (>25% difference) + if oldSize > 0 { + diff := float64(newSize-oldSize) / float64(oldSize) + if diff < 0.25 && diff > -0.25 { + return // Size change not significant enough + } + } + atomic.StoreInt64(&ais.bufferSize, newSize) + + // Recreate channels with new buffer size if server is running + if ais.running { + ais.recreateChannels(int(newSize)) + } +} + +// recreateChannels recreates the message channels with new buffer size +func (ais *AudioInputServer) recreateChannels(newSize int) { + ais.channelMutex.Lock() + defer ais.channelMutex.Unlock() + + // Create new channels with updated buffer size + newMessageChan := make(chan *InputIPCMessage, newSize) + newProcessChan := make(chan *InputIPCMessage, newSize) + + // Drain old channels and transfer messages to new channels + ais.drainAndTransferChannel(ais.messageChan, newMessageChan) + ais.drainAndTransferChannel(ais.processChan, newProcessChan) + + // Replace channels atomically + ais.messageChan = newMessageChan + ais.processChan = newProcessChan + ais.lastBufferSize = int64(newSize) +} + +// drainAndTransferChannel drains the old channel and transfers messages to new channel +func (ais *AudioInputServer) drainAndTransferChannel(oldChan, newChan chan *InputIPCMessage) { + for { + select { + case msg := <-oldChan: + // Try to transfer to new channel, drop if full + select { + case newChan <- msg: + // Successfully transferred + default: + // New channel full, drop message + atomic.AddInt64(&ais.droppedFrames, 1) + } + default: + // Old channel empty + return + } + } } // ReportLatency reports processing latency to adaptive buffer manager @@ -1259,6 +1341,20 @@ func GetGlobalMessagePoolStats() MessagePoolStats { return globalMessagePool.GetMessagePoolStats() } +// getMessageChan safely returns the current message channel +func (ais *AudioInputServer) getMessageChan() chan *InputIPCMessage { + ais.channelMutex.RLock() + defer ais.channelMutex.RUnlock() + return ais.messageChan +} + +// getProcessChan safely returns the current process channel +func (ais *AudioInputServer) getProcessChan() chan *InputIPCMessage { + ais.channelMutex.RLock() + defer ais.channelMutex.RUnlock() + return ais.processChan +} + // Helper functions // getInputSocketPath is now defined in unified_ipc.go diff --git a/internal/audio/ipc_unified.go b/internal/audio/ipc_unified.go index 2b293d5f..ada7faf0 100644 --- a/internal/audio/ipc_unified.go +++ b/internal/audio/ipc_unified.go @@ -24,6 +24,14 @@ var ( headerSize = 17 // Fixed header size: 4+1+4+8 bytes ) +// Header buffer pool to reduce allocation overhead +var headerBufferPool = sync.Pool{ + New: func() interface{} { + buf := make([]byte, headerSize) + return &buf + }, +} + // UnifiedMessageType represents the type of IPC message for both input and output type UnifiedMessageType uint8 @@ -283,8 +291,11 @@ func (s *UnifiedAudioServer) startProcessorGoroutine() { // readMessage reads a message from the connection func (s *UnifiedAudioServer) readMessage(conn net.Conn) (*UnifiedIPCMessage, error) { - // Read header - header := make([]byte, headerSize) + // Get header buffer from pool + headerPtr := headerBufferPool.Get().(*[]byte) + header := *headerPtr + defer headerBufferPool.Put(headerPtr) + if _, err := io.ReadFull(conn, header); err != nil { return nil, fmt.Errorf("failed to read header: %w", err) } @@ -361,8 +372,11 @@ func (s *UnifiedAudioServer) SendFrame(frame []byte) error { // writeMessage writes a message to the connection func (s *UnifiedAudioServer) writeMessage(conn net.Conn, msg *UnifiedIPCMessage) error { - // Write header - header := make([]byte, headerSize) + // Get header buffer from pool + headerPtr := headerBufferPool.Get().(*[]byte) + header := *headerPtr + defer headerBufferPool.Put(headerPtr) + binary.LittleEndian.PutUint32(header[0:4], msg.Magic) header[4] = uint8(msg.Type) binary.LittleEndian.PutUint32(header[5:9], msg.Length)