From f9781f170c8264bef688b3181d89fdc4d1627b1d Mon Sep 17 00:00:00 2001
From: Alex P
Date: Wed, 3 Sep 2025 12:54:07 +0000
Subject: [PATCH] perf(audio): increase worker pool sizes and optimize worker management

Double worker counts and queue sizes to handle higher load scenarios

Modify worker management to maintain minimum 2 persistent workers with longer idle timeout
---
 internal/audio/config_constants.go | 10 ++++-----
 internal/audio/goroutine_pool.go   | 33 ++++++++++++++++++++----------
 2 files changed, 27 insertions(+), 16 deletions(-)

diff --git a/internal/audio/config_constants.go b/internal/audio/config_constants.go
index 9156d641..1249e016 100644
--- a/internal/audio/config_constants.go
+++ b/internal/audio/config_constants.go
@@ -2435,11 +2435,11 @@ func DefaultAudioConfig() *AudioConfigConstants {
 		EventSubscriptionDelayMS: 100, // 100ms subscription delay
 
 		// Goroutine Pool Configuration
-		MaxAudioProcessorWorkers: 8,                // 8 workers for audio processing tasks
-		MaxAudioReaderWorkers:    4,                // 4 workers for audio reading tasks
-		AudioProcessorQueueSize:  32,               // 32 tasks queue size for processor pool
-		AudioReaderQueueSize:     16,               // 16 tasks queue size for reader pool
-		WorkerMaxIdleTime:        30 * time.Second, // 30s maximum idle time before worker termination
+		MaxAudioProcessorWorkers: 16,               // 16 workers for audio processing tasks
+		MaxAudioReaderWorkers:    8,                // 8 workers for audio reading tasks
+		AudioProcessorQueueSize:  64,               // 64 tasks queue size for processor pool
+		AudioReaderQueueSize:     32,               // 32 tasks queue size for reader pool
+		WorkerMaxIdleTime:        60 * time.Second, // 60s maximum idle time before worker termination
 
 		// Input Processing Constants
 		InputProcessingTimeoutMS: 10, // 10ms processing timeout threshold
diff --git a/internal/audio/goroutine_pool.go b/internal/audio/goroutine_pool.go
index 5c061f4c..6af0e00c 100644
--- a/internal/audio/goroutine_pool.go
+++ b/internal/audio/goroutine_pool.go
@@ -67,13 +67,22 @@ func (p *GoroutinePool) Submit(task Task) bool {
 
 // ensureWorkerAvailable makes sure at least one worker is available to process tasks
 func (p *GoroutinePool) ensureWorkerAvailable() {
-	// Try to acquire a semaphore slot without blocking
-	select {
-	case p.workerSem <- struct{}{}:
-		// We got a slot, start a new worker
-		p.startWorker()
-	default:
-		// All worker slots are taken, which means we have enough workers
+	// Check if we already have enough workers
+	currentWorkers := atomic.LoadInt64(&p.workerCount)
+
+	// Only start new workers if:
+	// 1. We have no workers at all, or
+	// 2. The queue is growing and we're below max workers
+	queueLen := len(p.taskQueue)
+	if currentWorkers == 0 || (queueLen > int(currentWorkers) && currentWorkers < int64(p.maxWorkers)) {
+		// Try to acquire a semaphore slot without blocking
+		select {
+		case p.workerSem <- struct{}{}:
+			// We got a slot, start a new worker
+			p.startWorker()
+		default:
+			// All worker slots are taken, which means we have enough workers
+		}
 	}
 }
 
@@ -124,12 +133,14 @@ func (p *GoroutinePool) startWorker() {
 				atomic.AddInt64(&p.taskCount, 1)
 
 			case <-idleTimer.C:
-				// Worker has been idle for too long, exit if we have more than minimum workers
-				if atomic.LoadInt64(&p.workerCount) > 1 {
+				// Worker has been idle for too long
+				// Keep at least 2 workers alive to handle incoming tasks without creating new goroutines
+				if atomic.LoadInt64(&p.workerCount) > 2 {
 					return
 				}
-				// Reset timer for the minimum worker
-				idleTimer.Reset(p.maxIdleTime)
+				// For persistent workers (the minimum 2), use a longer idle timeout
+				// This prevents excessive worker creation/destruction cycles
+				idleTimer.Reset(p.maxIdleTime * 3) // Triple the idle time for persistent workers
 			}
 		}
 	}()
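
For reference, the sketch below is a minimal, self-contained model of the worker-management pattern this patch moves to: workers are spawned on demand only when the backlog outgrows the current worker count, idle workers above a floor of two exit, and the two persistent workers wait with a tripled idle timeout before re-arming. It is an illustration only, not the patched GoroutinePool; the names pool, newPool, and submit are stand-ins, and the fields and helpers from the diff (workerSem, taskQueue, startWorker) are assumed to behave as the hunks suggest. The trade-off shown is two always-idle goroutines in exchange for avoiding goroutine creation/destruction churn under bursty load.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// pool is a minimal stand-in for GoroutinePool: a buffered task queue,
// a semaphore bounding worker goroutines, and an atomic worker counter.
type pool struct {
	workerCount int64         // updated with atomic operations
	tasks       chan func()   // buffered task queue
	workerSem   chan struct{} // capacity == max workers
	maxIdle     time.Duration // base idle timeout for extra workers
}

func newPool(maxWorkers, queueSize int, maxIdle time.Duration) *pool {
	return &pool{
		tasks:     make(chan func(), queueSize),
		workerSem: make(chan struct{}, maxWorkers),
		maxIdle:   maxIdle,
	}
}

// submit enqueues a task, then spawns a worker only when none exist or the
// queue backlog exceeds the current worker count (the patched condition).
func (p *pool) submit(task func()) bool {
	select {
	case p.tasks <- task:
	default:
		return false // queue full, caller decides what to do
	}
	workers := atomic.LoadInt64(&p.workerCount)
	if workers == 0 || (len(p.tasks) > int(workers) && workers < int64(cap(p.workerSem))) {
		select {
		case p.workerSem <- struct{}{}: // free slot: start another worker
			p.startWorker()
		default: // all slots taken: enough workers already
		}
	}
	return true
}

func (p *pool) startWorker() {
	atomic.AddInt64(&p.workerCount, 1)
	go func() {
		defer func() {
			atomic.AddInt64(&p.workerCount, -1)
			<-p.workerSem // release the worker slot
		}()
		idle := time.NewTimer(p.maxIdle)
		defer idle.Stop()
		for {
			select {
			case task := <-p.tasks:
				task()
				// Drain the timer before re-arming it for the next idle period.
				if !idle.Stop() {
					<-idle.C
				}
				idle.Reset(p.maxIdle)
			case <-idle.C:
				// Extra workers exit when idle; the two persistent workers
				// stay and wait three times longer, mirroring the patch.
				if atomic.LoadInt64(&p.workerCount) > 2 {
					return
				}
				idle.Reset(p.maxIdle * 3)
			}
		}
	}()
}

func main() {
	p := newPool(16, 64, 60*time.Second) // the new processor-pool defaults
	for i := 0; i < 8; i++ {
		i := i
		p.submit(func() { fmt.Println("task", i) })
	}
	time.Sleep(100 * time.Millisecond) // give workers a moment to drain the queue
}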