diff --git a/internal/audio/config_constants.go b/internal/audio/config_constants.go index 1c81c770..9156d641 100644 --- a/internal/audio/config_constants.go +++ b/internal/audio/config_constants.go @@ -1543,6 +1543,40 @@ type AudioConfigConstants struct { LatencyBucket500ms time.Duration // 500ms latency bucket LatencyBucket1s time.Duration // 1s latency bucket LatencyBucket2s time.Duration // 2s latency bucket + + // Goroutine Pool Configuration + // Used in: goroutine_pool.go for managing reusable goroutines + // Impact: Reduces goroutine creation overhead and improves performance + + // MaxAudioProcessorWorkers defines maximum number of workers in the audio processor pool. + // Used in: goroutine_pool.go for limiting concurrent audio processing goroutines + // Impact: Controls resource usage while ensuring sufficient processing capacity. + // Default 8 provides good parallelism without excessive resource consumption. + MaxAudioProcessorWorkers int + + // MaxAudioReaderWorkers defines maximum number of workers in the audio reader pool. + // Used in: goroutine_pool.go for limiting concurrent audio reading goroutines + // Impact: Controls resource usage while ensuring sufficient reading capacity. + // Default 4 provides good parallelism for I/O operations. + MaxAudioReaderWorkers int + + // AudioProcessorQueueSize defines the task queue size for the audio processor pool. + // Used in: goroutine_pool.go for buffering audio processing tasks + // Impact: Larger queue allows more tasks to be buffered during load spikes. + // Default 32 provides good buffering without excessive memory usage. + AudioProcessorQueueSize int + + // AudioReaderQueueSize defines the task queue size for the audio reader pool. + // Used in: goroutine_pool.go for buffering audio reading tasks + // Impact: Larger queue allows more tasks to be buffered during load spikes. + // Default 16 provides good buffering for I/O operations. + AudioReaderQueueSize int + + // WorkerMaxIdleTime defines how long a worker goroutine can remain idle before termination. + // Used in: goroutine_pool.go for efficient worker lifecycle management + // Impact: Shorter times reduce resource usage, longer times improve responsiveness. + // Default 30s balances resource usage with startup latency. + WorkerMaxIdleTime time.Duration } // DefaultAudioConfig returns the default configuration constants @@ -2400,6 +2434,13 @@ func DefaultAudioConfig() *AudioConfigConstants { EventTimeFormatString: "2006-01-02T15:04:05.000Z", // "2006-01-02T15:04:05.000Z" time format EventSubscriptionDelayMS: 100, // 100ms subscription delay + // Goroutine Pool Configuration + MaxAudioProcessorWorkers: 8, // 8 workers for audio processing tasks + MaxAudioReaderWorkers: 4, // 4 workers for audio reading tasks + AudioProcessorQueueSize: 32, // 32 tasks queue size for processor pool + AudioReaderQueueSize: 16, // 16 tasks queue size for reader pool + WorkerMaxIdleTime: 30 * time.Second, // 30s maximum idle time before worker termination + // Input Processing Constants InputProcessingTimeoutMS: 10, // 10ms processing timeout threshold diff --git a/internal/audio/goroutine_pool.go b/internal/audio/goroutine_pool.go new file mode 100644 index 00000000..5c061f4c --- /dev/null +++ b/internal/audio/goroutine_pool.go @@ -0,0 +1,272 @@ +package audio + +import ( + "sync" + "sync/atomic" + "time" + + "github.com/jetkvm/kvm/internal/logging" + "github.com/rs/zerolog" +) + +// Task represents a function to be executed by a worker in the pool +type Task func() + +// GoroutinePool manages a pool of reusable goroutines to reduce the overhead +// of goroutine creation and destruction +type GoroutinePool struct { + // Atomic fields must be first for proper alignment on 32-bit systems + taskCount int64 // Number of tasks processed + workerCount int64 // Current number of workers + maxIdleTime time.Duration + maxWorkers int + taskQueue chan Task + workerSem chan struct{} // Semaphore to limit concurrent workers + shutdown chan struct{} + shutdownOnce sync.Once + wg sync.WaitGroup + logger *zerolog.Logger + name string +} + +// NewGoroutinePool creates a new goroutine pool with the specified parameters +func NewGoroutinePool(name string, maxWorkers int, queueSize int, maxIdleTime time.Duration) *GoroutinePool { + logger := logging.GetDefaultLogger().With().Str("component", "goroutine-pool").Str("pool", name).Logger() + + pool := &GoroutinePool{ + maxWorkers: maxWorkers, + maxIdleTime: maxIdleTime, + taskQueue: make(chan Task, queueSize), + workerSem: make(chan struct{}, maxWorkers), + shutdown: make(chan struct{}), + logger: &logger, + name: name, + } + + // Start a supervisor goroutine to monitor pool health + go pool.supervisor() + + return pool +} + +// Submit adds a task to the pool for execution +// Returns true if the task was accepted, false if the queue is full +func (p *GoroutinePool) Submit(task Task) bool { + select { + case <-p.shutdown: + return false // Pool is shutting down + case p.taskQueue <- task: + // Task accepted, ensure we have a worker to process it + p.ensureWorkerAvailable() + return true + default: + // Queue is full + return false + } +} + +// ensureWorkerAvailable makes sure at least one worker is available to process tasks +func (p *GoroutinePool) ensureWorkerAvailable() { + // Try to acquire a semaphore slot without blocking + select { + case p.workerSem <- struct{}{}: + // We got a slot, start a new worker + p.startWorker() + default: + // All worker slots are taken, which means we have enough workers + } +} + +// startWorker launches a new worker goroutine +func (p *GoroutinePool) startWorker() { + p.wg.Add(1) + atomic.AddInt64(&p.workerCount, 1) + + go func() { + defer func() { + atomic.AddInt64(&p.workerCount, -1) + <-p.workerSem // Release the semaphore slot + p.wg.Done() + + // Recover from panics in worker tasks + if r := recover(); r != nil { + p.logger.Error().Interface("panic", r).Msg("Worker recovered from panic") + } + }() + + idleTimer := time.NewTimer(p.maxIdleTime) + defer idleTimer.Stop() + + for { + select { + case <-p.shutdown: + return + case task, ok := <-p.taskQueue: + if !ok { + return // Channel closed + } + + // Reset idle timer + if !idleTimer.Stop() { + <-idleTimer.C + } + idleTimer.Reset(p.maxIdleTime) + + // Execute the task with panic recovery + func() { + defer func() { + if r := recover(); r != nil { + p.logger.Error().Interface("panic", r).Msg("Task execution panic recovered") + } + }() + task() + }() + + atomic.AddInt64(&p.taskCount, 1) + case <-idleTimer.C: + // Worker has been idle for too long, exit if we have more than minimum workers + if atomic.LoadInt64(&p.workerCount) > 1 { + return + } + // Reset timer for the minimum worker + idleTimer.Reset(p.maxIdleTime) + } + } + }() +} + +// supervisor monitors the pool and logs statistics periodically +func (p *GoroutinePool) supervisor() { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + for { + select { + case <-p.shutdown: + return + case <-ticker.C: + workers := atomic.LoadInt64(&p.workerCount) + tasks := atomic.LoadInt64(&p.taskCount) + queueLen := len(p.taskQueue) + + p.logger.Info(). + Int64("workers", workers). + Int64("tasks_processed", tasks). + Int("queue_length", queueLen). + Msg("Pool statistics") + } + } +} + +// Shutdown gracefully shuts down the pool +// If wait is true, it will wait for all tasks to complete +// If wait is false, it will terminate immediately, potentially leaving tasks unprocessed +func (p *GoroutinePool) Shutdown(wait bool) { + p.shutdownOnce.Do(func() { + close(p.shutdown) + + if wait { + // Wait for all tasks to be processed + if len(p.taskQueue) > 0 { + p.logger.Info().Int("remaining_tasks", len(p.taskQueue)).Msg("Waiting for tasks to complete") + } + + // Close the task queue to signal no more tasks + close(p.taskQueue) + + // Wait for all workers to finish + p.wg.Wait() + } + }) +} + +// GetStats returns statistics about the pool +func (p *GoroutinePool) GetStats() map[string]interface{} { + return map[string]interface{}{ + "name": p.name, + "worker_count": atomic.LoadInt64(&p.workerCount), + "max_workers": p.maxWorkers, + "tasks_processed": atomic.LoadInt64(&p.taskCount), + "queue_length": len(p.taskQueue), + "queue_capacity": cap(p.taskQueue), + } +} + +// Global pools for different audio processing tasks +var ( + globalAudioProcessorPool atomic.Pointer[GoroutinePool] + globalAudioReaderPool atomic.Pointer[GoroutinePool] + globalAudioProcessorInitOnce sync.Once + globalAudioReaderInitOnce sync.Once +) + +// GetAudioProcessorPool returns the global audio processor pool +func GetAudioProcessorPool() *GoroutinePool { + pool := globalAudioProcessorPool.Load() + if pool != nil { + return pool + } + + globalAudioProcessorInitOnce.Do(func() { + config := GetConfig() + newPool := NewGoroutinePool( + "audio-processor", + config.MaxAudioProcessorWorkers, + config.AudioProcessorQueueSize, + config.WorkerMaxIdleTime, + ) + globalAudioProcessorPool.Store(newPool) + pool = newPool + }) + + return globalAudioProcessorPool.Load() +} + +// GetAudioReaderPool returns the global audio reader pool +func GetAudioReaderPool() *GoroutinePool { + pool := globalAudioReaderPool.Load() + if pool != nil { + return pool + } + + globalAudioReaderInitOnce.Do(func() { + config := GetConfig() + newPool := NewGoroutinePool( + "audio-reader", + config.MaxAudioReaderWorkers, + config.AudioReaderQueueSize, + config.WorkerMaxIdleTime, + ) + globalAudioReaderPool.Store(newPool) + pool = newPool + }) + + return globalAudioReaderPool.Load() +} + +// SubmitAudioProcessorTask submits a task to the audio processor pool +func SubmitAudioProcessorTask(task Task) bool { + return GetAudioProcessorPool().Submit(task) +} + +// SubmitAudioReaderTask submits a task to the audio reader pool +func SubmitAudioReaderTask(task Task) bool { + return GetAudioReaderPool().Submit(task) +} + +// ShutdownAudioPools shuts down all audio goroutine pools +func ShutdownAudioPools(wait bool) { + logger := logging.GetDefaultLogger().With().Str("component", "audio-pools").Logger() + + processorPool := globalAudioProcessorPool.Load() + if processorPool != nil { + logger.Info().Msg("Shutting down audio processor pool") + processorPool.Shutdown(wait) + } + + readerPool := globalAudioReaderPool.Load() + if readerPool != nil { + logger.Info().Msg("Shutting down audio reader pool") + readerPool.Shutdown(wait) + } +} diff --git a/internal/audio/input_ipc.go b/internal/audio/input_ipc.go index 2142f206..b4279185 100644 --- a/internal/audio/input_ipc.go +++ b/internal/audio/input_ipc.go @@ -289,8 +289,14 @@ func (ais *AudioInputServer) Start() error { ais.startProcessorGoroutine() ais.startMonitorGoroutine() - // Accept connections in a goroutine - go ais.acceptConnections() + // Submit the connection acceptor to the audio reader pool + if !SubmitAudioReaderTask(ais.acceptConnections) { + // If the pool is full or shutting down, fall back to direct goroutine creation + logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger() + logger.Warn().Msg("Audio reader pool full or shutting down, falling back to direct goroutine creation") + + go ais.acceptConnections() + } return nil } @@ -360,8 +366,14 @@ func (ais *AudioInputServer) acceptConnections() { ais.conn = conn ais.mtx.Unlock() - // Handle this connection - go ais.handleConnection(conn) + // Handle this connection using the goroutine pool + if !SubmitAudioReaderTask(func() { ais.handleConnection(conn) }) { + // If the pool is full or shutting down, fall back to direct goroutine creation + logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger() + logger.Warn().Msg("Audio reader pool full or shutting down, falling back to direct goroutine creation") + + go ais.handleConnection(conn) + } } } @@ -878,10 +890,12 @@ func (aic *AudioInputClient) ResetStats() { ResetFrameStats(&aic.totalFrames, &aic.droppedFrames) } -// startReaderGoroutine starts the message reader goroutine +// startReaderGoroutine starts the message reader using the goroutine pool func (ais *AudioInputServer) startReaderGoroutine() { ais.wg.Add(1) - go func() { + + // Create a reader task that will run in the goroutine pool + readerTask := func() { defer ais.wg.Done() // Enhanced error tracking and recovery @@ -966,13 +980,24 @@ func (ais *AudioInputServer) startReaderGoroutine() { } } } - }() + } + + // Submit the reader task to the audio reader pool + logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger() + if !SubmitAudioReaderTask(readerTask) { + // If the pool is full or shutting down, fall back to direct goroutine creation + logger.Warn().Msg("Audio reader pool full or shutting down, falling back to direct goroutine creation") + + go readerTask() + } } -// startProcessorGoroutine starts the message processor goroutine +// startProcessorGoroutine starts the message processor using the goroutine pool func (ais *AudioInputServer) startProcessorGoroutine() { ais.wg.Add(1) - go func() { + + // Create a processor task that will run in the goroutine pool + processorTask := func() { runtime.LockOSThread() defer runtime.UnlockOSThread() @@ -1049,7 +1074,16 @@ func (ais *AudioInputServer) startProcessorGoroutine() { atomic.StoreInt64(&ais.processingTime, processingTime.Nanoseconds()) } } - }() + } + + // Submit the processor task to the audio processor pool + logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger() + if !SubmitAudioProcessorTask(processorTask) { + // If the pool is full or shutting down, fall back to direct goroutine creation + logger.Warn().Msg("Audio processor pool full or shutting down, falling back to direct goroutine creation") + + go processorTask() + } } // processMessageWithRecovery processes a message with enhanced error recovery @@ -1086,10 +1120,12 @@ func (ais *AudioInputServer) processMessageWithRecovery(msg *InputIPCMessage, lo } } -// startMonitorGoroutine starts the performance monitoring goroutine +// startMonitorGoroutine starts the performance monitoring using the goroutine pool func (ais *AudioInputServer) startMonitorGoroutine() { ais.wg.Add(1) - go func() { + + // Create a monitor task that will run in the goroutine pool + monitorTask := func() { runtime.LockOSThread() defer runtime.UnlockOSThread() @@ -1166,7 +1202,16 @@ func (ais *AudioInputServer) startMonitorGoroutine() { } } } - }() + } + + // Submit the monitor task to the audio processor pool + logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger() + if !SubmitAudioProcessorTask(monitorTask) { + // If the pool is full or shutting down, fall back to direct goroutine creation + logger.Warn().Msg("Audio processor pool full or shutting down, falling back to direct goroutine creation") + + go monitorTask() + } } // GetServerStats returns server performance statistics diff --git a/internal/audio/ipc.go b/internal/audio/ipc.go index 208f7aa4..d172144e 100644 --- a/internal/audio/ipc.go +++ b/internal/audio/ipc.go @@ -159,8 +159,14 @@ func (s *AudioOutputServer) Start() error { // Start message processor goroutine s.startProcessorGoroutine() - // Accept connections in a goroutine - go s.acceptConnections() + // Submit the connection acceptor to the audio reader pool + if !SubmitAudioReaderTask(s.acceptConnections) { + // If the pool is full or shutting down, fall back to direct goroutine creation + logger := logging.GetDefaultLogger().With().Str("component", AudioOutputServerComponent).Logger() + logger.Warn().Msg("Audio reader pool full or shutting down, falling back to direct goroutine creation") + + go s.acceptConnections() + } return nil } @@ -199,10 +205,12 @@ func (s *AudioOutputServer) acceptConnections() { } } -// startProcessorGoroutine starts the message processor +// startProcessorGoroutine starts the message processor using the goroutine pool func (s *AudioOutputServer) startProcessorGoroutine() { s.wg.Add(1) - go func() { + + // Create a processor task that will run in the goroutine pool + processorTask := func() { defer s.wg.Done() for { select { @@ -218,7 +226,16 @@ func (s *AudioOutputServer) startProcessorGoroutine() { return } } - }() + } + + // Submit the processor task to the audio processor pool + if !SubmitAudioProcessorTask(processorTask) { + // If the pool is full or shutting down, fall back to direct goroutine creation + logger := logging.GetDefaultLogger().With().Str("component", AudioOutputServerComponent).Logger() + logger.Warn().Msg("Audio processor pool full or shutting down, falling back to direct goroutine creation") + + go processorTask() + } } func (s *AudioOutputServer) Stop() {