feat(audio): implement goroutine pool for task processing

Add goroutine pool implementation to manage reusable workers for audio processing tasks
Add configuration constants for pool sizing and behavior
Modify audio server components to use pool for goroutine management
Add fallback to direct goroutine creation when pools are full
This commit is contained in:
Alex P 2025-09-03 12:41:27 +00:00
parent 8110be6cc6
commit d7b67e5012
4 changed files with 393 additions and 18 deletions

View File

@ -1543,6 +1543,40 @@ type AudioConfigConstants struct {
LatencyBucket500ms time.Duration // 500ms latency bucket
LatencyBucket1s time.Duration // 1s latency bucket
LatencyBucket2s time.Duration // 2s latency bucket
// Goroutine Pool Configuration
// Used in: goroutine_pool.go for managing reusable goroutines
// Impact: Reduces goroutine creation overhead and improves performance
// MaxAudioProcessorWorkers defines maximum number of workers in the audio processor pool.
// Used in: goroutine_pool.go for limiting concurrent audio processing goroutines
// Impact: Controls resource usage while ensuring sufficient processing capacity.
// Default 8 provides good parallelism without excessive resource consumption.
MaxAudioProcessorWorkers int
// MaxAudioReaderWorkers defines maximum number of workers in the audio reader pool.
// Used in: goroutine_pool.go for limiting concurrent audio reading goroutines
// Impact: Controls resource usage while ensuring sufficient reading capacity.
// Default 4 provides good parallelism for I/O operations.
MaxAudioReaderWorkers int
// AudioProcessorQueueSize defines the task queue size for the audio processor pool.
// Used in: goroutine_pool.go for buffering audio processing tasks
// Impact: Larger queue allows more tasks to be buffered during load spikes.
// Default 32 provides good buffering without excessive memory usage.
AudioProcessorQueueSize int
// AudioReaderQueueSize defines the task queue size for the audio reader pool.
// Used in: goroutine_pool.go for buffering audio reading tasks
// Impact: Larger queue allows more tasks to be buffered during load spikes.
// Default 16 provides good buffering for I/O operations.
AudioReaderQueueSize int
// WorkerMaxIdleTime defines how long a worker goroutine can remain idle before termination.
// Used in: goroutine_pool.go for efficient worker lifecycle management
// Impact: Shorter times reduce resource usage, longer times improve responsiveness.
// Default 30s balances resource usage with startup latency.
WorkerMaxIdleTime time.Duration
}
// DefaultAudioConfig returns the default configuration constants
@ -2400,6 +2434,13 @@ func DefaultAudioConfig() *AudioConfigConstants {
EventTimeFormatString: "2006-01-02T15:04:05.000Z", // "2006-01-02T15:04:05.000Z" time format
EventSubscriptionDelayMS: 100, // 100ms subscription delay
// Goroutine Pool Configuration
MaxAudioProcessorWorkers: 8, // 8 workers for audio processing tasks
MaxAudioReaderWorkers: 4, // 4 workers for audio reading tasks
AudioProcessorQueueSize: 32, // 32 tasks queue size for processor pool
AudioReaderQueueSize: 16, // 16 tasks queue size for reader pool
WorkerMaxIdleTime: 30 * time.Second, // 30s maximum idle time before worker termination
// Input Processing Constants
InputProcessingTimeoutMS: 10, // 10ms processing timeout threshold

View File

@ -0,0 +1,272 @@
package audio
import (
"sync"
"sync/atomic"
"time"
"github.com/jetkvm/kvm/internal/logging"
"github.com/rs/zerolog"
)
// Task represents a function to be executed by a worker in the pool
type Task func()
// GoroutinePool manages a pool of reusable goroutines to reduce the overhead
// of goroutine creation and destruction
type GoroutinePool struct {
// Atomic fields must be first for proper alignment on 32-bit systems
taskCount int64 // Number of tasks processed
workerCount int64 // Current number of workers
maxIdleTime time.Duration
maxWorkers int
taskQueue chan Task
workerSem chan struct{} // Semaphore to limit concurrent workers
shutdown chan struct{}
shutdownOnce sync.Once
wg sync.WaitGroup
logger *zerolog.Logger
name string
}
// NewGoroutinePool creates a new goroutine pool with the specified parameters
func NewGoroutinePool(name string, maxWorkers int, queueSize int, maxIdleTime time.Duration) *GoroutinePool {
logger := logging.GetDefaultLogger().With().Str("component", "goroutine-pool").Str("pool", name).Logger()
pool := &GoroutinePool{
maxWorkers: maxWorkers,
maxIdleTime: maxIdleTime,
taskQueue: make(chan Task, queueSize),
workerSem: make(chan struct{}, maxWorkers),
shutdown: make(chan struct{}),
logger: &logger,
name: name,
}
// Start a supervisor goroutine to monitor pool health
go pool.supervisor()
return pool
}
// Submit adds a task to the pool for execution
// Returns true if the task was accepted, false if the queue is full
func (p *GoroutinePool) Submit(task Task) bool {
select {
case <-p.shutdown:
return false // Pool is shutting down
case p.taskQueue <- task:
// Task accepted, ensure we have a worker to process it
p.ensureWorkerAvailable()
return true
default:
// Queue is full
return false
}
}
// ensureWorkerAvailable makes sure at least one worker is available to process tasks
func (p *GoroutinePool) ensureWorkerAvailable() {
// Try to acquire a semaphore slot without blocking
select {
case p.workerSem <- struct{}{}:
// We got a slot, start a new worker
p.startWorker()
default:
// All worker slots are taken, which means we have enough workers
}
}
// startWorker launches a new worker goroutine
func (p *GoroutinePool) startWorker() {
p.wg.Add(1)
atomic.AddInt64(&p.workerCount, 1)
go func() {
defer func() {
atomic.AddInt64(&p.workerCount, -1)
<-p.workerSem // Release the semaphore slot
p.wg.Done()
// Recover from panics in worker tasks
if r := recover(); r != nil {
p.logger.Error().Interface("panic", r).Msg("Worker recovered from panic")
}
}()
idleTimer := time.NewTimer(p.maxIdleTime)
defer idleTimer.Stop()
for {
select {
case <-p.shutdown:
return
case task, ok := <-p.taskQueue:
if !ok {
return // Channel closed
}
// Reset idle timer
if !idleTimer.Stop() {
<-idleTimer.C
}
idleTimer.Reset(p.maxIdleTime)
// Execute the task with panic recovery
func() {
defer func() {
if r := recover(); r != nil {
p.logger.Error().Interface("panic", r).Msg("Task execution panic recovered")
}
}()
task()
}()
atomic.AddInt64(&p.taskCount, 1)
case <-idleTimer.C:
// Worker has been idle for too long, exit if we have more than minimum workers
if atomic.LoadInt64(&p.workerCount) > 1 {
return
}
// Reset timer for the minimum worker
idleTimer.Reset(p.maxIdleTime)
}
}
}()
}
// supervisor monitors the pool and logs statistics periodically
func (p *GoroutinePool) supervisor() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-p.shutdown:
return
case <-ticker.C:
workers := atomic.LoadInt64(&p.workerCount)
tasks := atomic.LoadInt64(&p.taskCount)
queueLen := len(p.taskQueue)
p.logger.Info().
Int64("workers", workers).
Int64("tasks_processed", tasks).
Int("queue_length", queueLen).
Msg("Pool statistics")
}
}
}
// Shutdown gracefully shuts down the pool
// If wait is true, it will wait for all tasks to complete
// If wait is false, it will terminate immediately, potentially leaving tasks unprocessed
func (p *GoroutinePool) Shutdown(wait bool) {
p.shutdownOnce.Do(func() {
close(p.shutdown)
if wait {
// Wait for all tasks to be processed
if len(p.taskQueue) > 0 {
p.logger.Info().Int("remaining_tasks", len(p.taskQueue)).Msg("Waiting for tasks to complete")
}
// Close the task queue to signal no more tasks
close(p.taskQueue)
// Wait for all workers to finish
p.wg.Wait()
}
})
}
// GetStats returns statistics about the pool
func (p *GoroutinePool) GetStats() map[string]interface{} {
return map[string]interface{}{
"name": p.name,
"worker_count": atomic.LoadInt64(&p.workerCount),
"max_workers": p.maxWorkers,
"tasks_processed": atomic.LoadInt64(&p.taskCount),
"queue_length": len(p.taskQueue),
"queue_capacity": cap(p.taskQueue),
}
}
// Global pools for different audio processing tasks
var (
globalAudioProcessorPool atomic.Pointer[GoroutinePool]
globalAudioReaderPool atomic.Pointer[GoroutinePool]
globalAudioProcessorInitOnce sync.Once
globalAudioReaderInitOnce sync.Once
)
// GetAudioProcessorPool returns the global audio processor pool
func GetAudioProcessorPool() *GoroutinePool {
pool := globalAudioProcessorPool.Load()
if pool != nil {
return pool
}
globalAudioProcessorInitOnce.Do(func() {
config := GetConfig()
newPool := NewGoroutinePool(
"audio-processor",
config.MaxAudioProcessorWorkers,
config.AudioProcessorQueueSize,
config.WorkerMaxIdleTime,
)
globalAudioProcessorPool.Store(newPool)
pool = newPool
})
return globalAudioProcessorPool.Load()
}
// GetAudioReaderPool returns the global audio reader pool
func GetAudioReaderPool() *GoroutinePool {
pool := globalAudioReaderPool.Load()
if pool != nil {
return pool
}
globalAudioReaderInitOnce.Do(func() {
config := GetConfig()
newPool := NewGoroutinePool(
"audio-reader",
config.MaxAudioReaderWorkers,
config.AudioReaderQueueSize,
config.WorkerMaxIdleTime,
)
globalAudioReaderPool.Store(newPool)
pool = newPool
})
return globalAudioReaderPool.Load()
}
// SubmitAudioProcessorTask submits a task to the audio processor pool
func SubmitAudioProcessorTask(task Task) bool {
return GetAudioProcessorPool().Submit(task)
}
// SubmitAudioReaderTask submits a task to the audio reader pool
func SubmitAudioReaderTask(task Task) bool {
return GetAudioReaderPool().Submit(task)
}
// ShutdownAudioPools shuts down all audio goroutine pools
func ShutdownAudioPools(wait bool) {
logger := logging.GetDefaultLogger().With().Str("component", "audio-pools").Logger()
processorPool := globalAudioProcessorPool.Load()
if processorPool != nil {
logger.Info().Msg("Shutting down audio processor pool")
processorPool.Shutdown(wait)
}
readerPool := globalAudioReaderPool.Load()
if readerPool != nil {
logger.Info().Msg("Shutting down audio reader pool")
readerPool.Shutdown(wait)
}
}

View File

@ -289,8 +289,14 @@ func (ais *AudioInputServer) Start() error {
ais.startProcessorGoroutine()
ais.startMonitorGoroutine()
// Accept connections in a goroutine
go ais.acceptConnections()
// Submit the connection acceptor to the audio reader pool
if !SubmitAudioReaderTask(ais.acceptConnections) {
// If the pool is full or shutting down, fall back to direct goroutine creation
logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
logger.Warn().Msg("Audio reader pool full or shutting down, falling back to direct goroutine creation")
go ais.acceptConnections()
}
return nil
}
@ -360,8 +366,14 @@ func (ais *AudioInputServer) acceptConnections() {
ais.conn = conn
ais.mtx.Unlock()
// Handle this connection
go ais.handleConnection(conn)
// Handle this connection using the goroutine pool
if !SubmitAudioReaderTask(func() { ais.handleConnection(conn) }) {
// If the pool is full or shutting down, fall back to direct goroutine creation
logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
logger.Warn().Msg("Audio reader pool full or shutting down, falling back to direct goroutine creation")
go ais.handleConnection(conn)
}
}
}
@ -878,10 +890,12 @@ func (aic *AudioInputClient) ResetStats() {
ResetFrameStats(&aic.totalFrames, &aic.droppedFrames)
}
// startReaderGoroutine starts the message reader goroutine
// startReaderGoroutine starts the message reader using the goroutine pool
func (ais *AudioInputServer) startReaderGoroutine() {
ais.wg.Add(1)
go func() {
// Create a reader task that will run in the goroutine pool
readerTask := func() {
defer ais.wg.Done()
// Enhanced error tracking and recovery
@ -966,13 +980,24 @@ func (ais *AudioInputServer) startReaderGoroutine() {
}
}
}
}()
}
// Submit the reader task to the audio reader pool
logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
if !SubmitAudioReaderTask(readerTask) {
// If the pool is full or shutting down, fall back to direct goroutine creation
logger.Warn().Msg("Audio reader pool full or shutting down, falling back to direct goroutine creation")
go readerTask()
}
}
// startProcessorGoroutine starts the message processor goroutine
// startProcessorGoroutine starts the message processor using the goroutine pool
func (ais *AudioInputServer) startProcessorGoroutine() {
ais.wg.Add(1)
go func() {
// Create a processor task that will run in the goroutine pool
processorTask := func() {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
@ -1049,7 +1074,16 @@ func (ais *AudioInputServer) startProcessorGoroutine() {
atomic.StoreInt64(&ais.processingTime, processingTime.Nanoseconds())
}
}
}()
}
// Submit the processor task to the audio processor pool
logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
if !SubmitAudioProcessorTask(processorTask) {
// If the pool is full or shutting down, fall back to direct goroutine creation
logger.Warn().Msg("Audio processor pool full or shutting down, falling back to direct goroutine creation")
go processorTask()
}
}
// processMessageWithRecovery processes a message with enhanced error recovery
@ -1086,10 +1120,12 @@ func (ais *AudioInputServer) processMessageWithRecovery(msg *InputIPCMessage, lo
}
}
// startMonitorGoroutine starts the performance monitoring goroutine
// startMonitorGoroutine starts the performance monitoring using the goroutine pool
func (ais *AudioInputServer) startMonitorGoroutine() {
ais.wg.Add(1)
go func() {
// Create a monitor task that will run in the goroutine pool
monitorTask := func() {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
@ -1166,7 +1202,16 @@ func (ais *AudioInputServer) startMonitorGoroutine() {
}
}
}
}()
}
// Submit the monitor task to the audio processor pool
logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
if !SubmitAudioProcessorTask(monitorTask) {
// If the pool is full or shutting down, fall back to direct goroutine creation
logger.Warn().Msg("Audio processor pool full or shutting down, falling back to direct goroutine creation")
go monitorTask()
}
}
// GetServerStats returns server performance statistics

View File

@ -159,8 +159,14 @@ func (s *AudioOutputServer) Start() error {
// Start message processor goroutine
s.startProcessorGoroutine()
// Accept connections in a goroutine
go s.acceptConnections()
// Submit the connection acceptor to the audio reader pool
if !SubmitAudioReaderTask(s.acceptConnections) {
// If the pool is full or shutting down, fall back to direct goroutine creation
logger := logging.GetDefaultLogger().With().Str("component", AudioOutputServerComponent).Logger()
logger.Warn().Msg("Audio reader pool full or shutting down, falling back to direct goroutine creation")
go s.acceptConnections()
}
return nil
}
@ -199,10 +205,12 @@ func (s *AudioOutputServer) acceptConnections() {
}
}
// startProcessorGoroutine starts the message processor
// startProcessorGoroutine starts the message processor using the goroutine pool
func (s *AudioOutputServer) startProcessorGoroutine() {
s.wg.Add(1)
go func() {
// Create a processor task that will run in the goroutine pool
processorTask := func() {
defer s.wg.Done()
for {
select {
@ -218,7 +226,16 @@ func (s *AudioOutputServer) startProcessorGoroutine() {
return
}
}
}()
}
// Submit the processor task to the audio processor pool
if !SubmitAudioProcessorTask(processorTask) {
// If the pool is full or shutting down, fall back to direct goroutine creation
logger := logging.GetDefaultLogger().With().Str("component", AudioOutputServerComponent).Logger()
logger.Warn().Msg("Audio processor pool full or shutting down, falling back to direct goroutine creation")
go processorTask()
}
}
func (s *AudioOutputServer) Stop() {