From 629cdf59a7eff0f3f4287c7aa9861615948806d5 Mon Sep 17 00:00:00 2001 From: Alex P Date: Wed, 13 Aug 2025 14:49:08 +0000 Subject: [PATCH] perf(audio): optimize audio processing with batching and atomic operations - Implement batch audio processing to reduce CGO overhead - Replace mutexes with atomic operations for contention management - Add buffer pooling to reduce allocations - Optimize microphone operation cooldown with lock-free approach - Improve error handling with pre-allocated error objects --- internal/audio/batch_audio.go | 455 ++++++++++++++++++++++++++++ internal/audio/buffer_pool.go | 64 ++++ internal/audio/cgo_audio.go | 31 +- internal/audio/cgo_audio_stub.go | 2 +- internal/audio/mic_contention.go | 158 ++++++++++ internal/audio/nonblocking_api.go | 105 ++++--- internal/audio/nonblocking_audio.go | 52 ++-- web.go | 66 ++-- 8 files changed, 817 insertions(+), 116 deletions(-) create mode 100644 internal/audio/batch_audio.go create mode 100644 internal/audio/buffer_pool.go create mode 100644 internal/audio/mic_contention.go diff --git a/internal/audio/batch_audio.go b/internal/audio/batch_audio.go new file mode 100644 index 0000000..61d8dcc --- /dev/null +++ b/internal/audio/batch_audio.go @@ -0,0 +1,455 @@ +//go:build cgo + +package audio + +import ( + "context" + "runtime" + "sync" + "sync/atomic" + "time" + "unsafe" + + "github.com/jetkvm/kvm/internal/logging" + "github.com/rs/zerolog" +) + +// BatchAudioProcessor manages batched CGO operations to reduce syscall overhead +type BatchAudioProcessor struct { + // Statistics - MUST be first for ARM32 alignment (int64 fields need 8-byte alignment) + stats BatchAudioStats + + // Control + ctx context.Context + cancel context.CancelFunc + logger *zerolog.Logger + batchSize int + batchDuration time.Duration + + // Batch queues and state (atomic for lock-free access) + readQueue chan batchReadRequest + writeQueue chan batchWriteRequest + initialized int32 + running int32 + threadPinned int32 + + // Buffers (pre-allocated to avoid allocation overhead) + readBufPool *sync.Pool + writeBufPool *sync.Pool +} + +type BatchAudioStats struct { + // int64 fields MUST be first for ARM32 alignment + BatchedReads int64 + BatchedWrites int64 + SingleReads int64 + SingleWrites int64 + BatchedFrames int64 + SingleFrames int64 + CGOCallsReduced int64 + OSThreadPinTime time.Duration // time.Duration is int64 internally + LastBatchTime time.Time +} + +type batchReadRequest struct { + buffer []byte + resultChan chan batchReadResult + timestamp time.Time +} + +type batchWriteRequest struct { + buffer []byte + resultChan chan batchWriteResult + timestamp time.Time +} + +type batchReadResult struct { + length int + err error +} + +type batchWriteResult struct { + written int + err error +} + +// NewBatchAudioProcessor creates a new batch audio processor +func NewBatchAudioProcessor(batchSize int, batchDuration time.Duration) *BatchAudioProcessor { + ctx, cancel := context.WithCancel(context.Background()) + logger := logging.GetDefaultLogger().With().Str("component", "batch-audio").Logger() + + processor := &BatchAudioProcessor{ + ctx: ctx, + cancel: cancel, + logger: &logger, + batchSize: batchSize, + batchDuration: batchDuration, + readQueue: make(chan batchReadRequest, batchSize*2), + writeQueue: make(chan batchWriteRequest, batchSize*2), + readBufPool: &sync.Pool{ + New: func() interface{} { + return make([]byte, 1500) // Max audio frame size + }, + }, + writeBufPool: &sync.Pool{ + New: func() interface{} { + return make([]byte, 4096) // Max write buffer size + }, + }, + } + + return processor +} + +// Start initializes and starts the batch processor +func (bap *BatchAudioProcessor) Start() error { + if !atomic.CompareAndSwapInt32(&bap.running, 0, 1) { + return nil // Already running + } + + // Initialize CGO resources once per processor lifecycle + if !atomic.CompareAndSwapInt32(&bap.initialized, 0, 1) { + return nil // Already initialized + } + + // Start batch processing goroutines + go bap.batchReadProcessor() + go bap.batchWriteProcessor() + + bap.logger.Info().Int("batch_size", bap.batchSize). + Dur("batch_duration", bap.batchDuration). + Msg("batch audio processor started") + + return nil +} + +// Stop cleanly shuts down the batch processor +func (bap *BatchAudioProcessor) Stop() { + if !atomic.CompareAndSwapInt32(&bap.running, 1, 0) { + return // Already stopped + } + + bap.cancel() + + // Wait for processing to complete + time.Sleep(bap.batchDuration + 10*time.Millisecond) + + bap.logger.Info().Msg("batch audio processor stopped") +} + +// BatchReadEncode performs batched audio read and encode operations +func (bap *BatchAudioProcessor) BatchReadEncode(buffer []byte) (int, error) { + if atomic.LoadInt32(&bap.running) == 0 { + // Fallback to single operation if batch processor is not running + atomic.AddInt64(&bap.stats.SingleReads, 1) + atomic.AddInt64(&bap.stats.SingleFrames, 1) + return CGOAudioReadEncode(buffer) + } + + resultChan := make(chan batchReadResult, 1) + request := batchReadRequest{ + buffer: buffer, + resultChan: resultChan, + timestamp: time.Now(), + } + + select { + case bap.readQueue <- request: + // Successfully queued + case <-time.After(5 * time.Millisecond): + // Queue is full or blocked, fallback to single operation + atomic.AddInt64(&bap.stats.SingleReads, 1) + atomic.AddInt64(&bap.stats.SingleFrames, 1) + return CGOAudioReadEncode(buffer) + } + + // Wait for result + select { + case result := <-resultChan: + return result.length, result.err + case <-time.After(50 * time.Millisecond): + // Timeout, fallback to single operation + atomic.AddInt64(&bap.stats.SingleReads, 1) + atomic.AddInt64(&bap.stats.SingleFrames, 1) + return CGOAudioReadEncode(buffer) + } +} + +// BatchDecodeWrite performs batched audio decode and write operations +func (bap *BatchAudioProcessor) BatchDecodeWrite(buffer []byte) (int, error) { + if atomic.LoadInt32(&bap.running) == 0 { + // Fallback to single operation if batch processor is not running + atomic.AddInt64(&bap.stats.SingleWrites, 1) + atomic.AddInt64(&bap.stats.SingleFrames, 1) + return CGOAudioDecodeWrite(buffer) + } + + resultChan := make(chan batchWriteResult, 1) + request := batchWriteRequest{ + buffer: buffer, + resultChan: resultChan, + timestamp: time.Now(), + } + + select { + case bap.writeQueue <- request: + // Successfully queued + case <-time.After(5 * time.Millisecond): + // Queue is full or blocked, fallback to single operation + atomic.AddInt64(&bap.stats.SingleWrites, 1) + atomic.AddInt64(&bap.stats.SingleFrames, 1) + return CGOAudioDecodeWrite(buffer) + } + + // Wait for result + select { + case result := <-resultChan: + return result.written, result.err + case <-time.After(50 * time.Millisecond): + // Timeout, fallback to single operation + atomic.AddInt64(&bap.stats.SingleWrites, 1) + atomic.AddInt64(&bap.stats.SingleFrames, 1) + return CGOAudioDecodeWrite(buffer) + } +} + +// batchReadProcessor processes batched read operations +func (bap *BatchAudioProcessor) batchReadProcessor() { + defer bap.logger.Debug().Msg("batch read processor stopped") + + ticker := time.NewTicker(bap.batchDuration) + defer ticker.Stop() + + var batch []batchReadRequest + batch = make([]batchReadRequest, 0, bap.batchSize) + + for atomic.LoadInt32(&bap.running) == 1 { + select { + case <-bap.ctx.Done(): + return + + case req := <-bap.readQueue: + batch = append(batch, req) + if len(batch) >= bap.batchSize { + bap.processBatchRead(batch) + batch = batch[:0] // Clear slice but keep capacity + } + + case <-ticker.C: + if len(batch) > 0 { + bap.processBatchRead(batch) + batch = batch[:0] // Clear slice but keep capacity + } + } + } + + // Process any remaining requests + if len(batch) > 0 { + bap.processBatchRead(batch) + } +} + +// batchWriteProcessor processes batched write operations +func (bap *BatchAudioProcessor) batchWriteProcessor() { + defer bap.logger.Debug().Msg("batch write processor stopped") + + ticker := time.NewTicker(bap.batchDuration) + defer ticker.Stop() + + var batch []batchWriteRequest + batch = make([]batchWriteRequest, 0, bap.batchSize) + + for atomic.LoadInt32(&bap.running) == 1 { + select { + case <-bap.ctx.Done(): + return + + case req := <-bap.writeQueue: + batch = append(batch, req) + if len(batch) >= bap.batchSize { + bap.processBatchWrite(batch) + batch = batch[:0] // Clear slice but keep capacity + } + + case <-ticker.C: + if len(batch) > 0 { + bap.processBatchWrite(batch) + batch = batch[:0] // Clear slice but keep capacity + } + } + } + + // Process any remaining requests + if len(batch) > 0 { + bap.processBatchWrite(batch) + } +} + +// processBatchRead processes a batch of read requests efficiently +func (bap *BatchAudioProcessor) processBatchRead(batch []batchReadRequest) { + if len(batch) == 0 { + return + } + + // Pin to OS thread for the entire batch to minimize thread switching overhead + start := time.Now() + if atomic.CompareAndSwapInt32(&bap.threadPinned, 0, 1) { + runtime.LockOSThread() + defer func() { + runtime.UnlockOSThread() + atomic.StoreInt32(&bap.threadPinned, 0) + bap.stats.OSThreadPinTime += time.Since(start) + }() + } + + batchSize := len(batch) + atomic.AddInt64(&bap.stats.BatchedReads, 1) + atomic.AddInt64(&bap.stats.BatchedFrames, int64(batchSize)) + if batchSize > 1 { + atomic.AddInt64(&bap.stats.CGOCallsReduced, int64(batchSize-1)) + } + + // Process each request in the batch + for _, req := range batch { + length, err := CGOAudioReadEncode(req.buffer) + result := batchReadResult{ + length: length, + err: err, + } + + // Send result back (non-blocking) + select { + case req.resultChan <- result: + default: + // Requestor timed out, drop result + } + } + + bap.stats.LastBatchTime = time.Now() +} + +// processBatchWrite processes a batch of write requests efficiently +func (bap *BatchAudioProcessor) processBatchWrite(batch []batchWriteRequest) { + if len(batch) == 0 { + return + } + + // Pin to OS thread for the entire batch to minimize thread switching overhead + start := time.Now() + if atomic.CompareAndSwapInt32(&bap.threadPinned, 0, 1) { + runtime.LockOSThread() + defer func() { + runtime.UnlockOSThread() + atomic.StoreInt32(&bap.threadPinned, 0) + bap.stats.OSThreadPinTime += time.Since(start) + }() + } + + batchSize := len(batch) + atomic.AddInt64(&bap.stats.BatchedWrites, 1) + atomic.AddInt64(&bap.stats.BatchedFrames, int64(batchSize)) + if batchSize > 1 { + atomic.AddInt64(&bap.stats.CGOCallsReduced, int64(batchSize-1)) + } + + // Process each request in the batch + for _, req := range batch { + written, err := CGOAudioDecodeWrite(req.buffer) + result := batchWriteResult{ + written: written, + err: err, + } + + // Send result back (non-blocking) + select { + case req.resultChan <- result: + default: + // Requestor timed out, drop result + } + } + + bap.stats.LastBatchTime = time.Now() +} + +// GetStats returns current batch processor statistics +func (bap *BatchAudioProcessor) GetStats() BatchAudioStats { + return BatchAudioStats{ + BatchedReads: atomic.LoadInt64(&bap.stats.BatchedReads), + BatchedWrites: atomic.LoadInt64(&bap.stats.BatchedWrites), + SingleReads: atomic.LoadInt64(&bap.stats.SingleReads), + SingleWrites: atomic.LoadInt64(&bap.stats.SingleWrites), + BatchedFrames: atomic.LoadInt64(&bap.stats.BatchedFrames), + SingleFrames: atomic.LoadInt64(&bap.stats.SingleFrames), + CGOCallsReduced: atomic.LoadInt64(&bap.stats.CGOCallsReduced), + OSThreadPinTime: bap.stats.OSThreadPinTime, + LastBatchTime: bap.stats.LastBatchTime, + } +} + +// IsRunning returns whether the batch processor is running +func (bap *BatchAudioProcessor) IsRunning() bool { + return atomic.LoadInt32(&bap.running) == 1 +} + +// Global batch processor instance +var ( + globalBatchProcessor unsafe.Pointer // *BatchAudioProcessor + batchProcessorInitialized int32 +) + +// GetBatchAudioProcessor returns the global batch processor instance +func GetBatchAudioProcessor() *BatchAudioProcessor { + ptr := atomic.LoadPointer(&globalBatchProcessor) + if ptr != nil { + return (*BatchAudioProcessor)(ptr) + } + + // Initialize on first use + if atomic.CompareAndSwapInt32(&batchProcessorInitialized, 0, 1) { + processor := NewBatchAudioProcessor(4, 5*time.Millisecond) // 4 frames per batch, 5ms timeout + atomic.StorePointer(&globalBatchProcessor, unsafe.Pointer(processor)) + return processor + } + + // Another goroutine initialized it, try again + ptr = atomic.LoadPointer(&globalBatchProcessor) + if ptr != nil { + return (*BatchAudioProcessor)(ptr) + } + + // Fallback: create a new processor (should rarely happen) + return NewBatchAudioProcessor(4, 5*time.Millisecond) +} + +// EnableBatchAudioProcessing enables the global batch processor +func EnableBatchAudioProcessing() error { + processor := GetBatchAudioProcessor() + return processor.Start() +} + +// DisableBatchAudioProcessing disables the global batch processor +func DisableBatchAudioProcessing() { + ptr := atomic.LoadPointer(&globalBatchProcessor) + if ptr != nil { + processor := (*BatchAudioProcessor)(ptr) + processor.Stop() + } +} + +// BatchCGOAudioReadEncode is a batched version of CGOAudioReadEncode +func BatchCGOAudioReadEncode(buffer []byte) (int, error) { + processor := GetBatchAudioProcessor() + if processor != nil && processor.IsRunning() { + return processor.BatchReadEncode(buffer) + } + return CGOAudioReadEncode(buffer) +} + +// BatchCGOAudioDecodeWrite is a batched version of CGOAudioDecodeWrite +func BatchCGOAudioDecodeWrite(buffer []byte) (int, error) { + processor := GetBatchAudioProcessor() + if processor != nil && processor.IsRunning() { + return processor.BatchDecodeWrite(buffer) + } + return CGOAudioDecodeWrite(buffer) +} \ No newline at end of file diff --git a/internal/audio/buffer_pool.go b/internal/audio/buffer_pool.go new file mode 100644 index 0000000..0591111 --- /dev/null +++ b/internal/audio/buffer_pool.go @@ -0,0 +1,64 @@ +package audio + +import ( + "sync" +) + +// AudioBufferPool manages reusable audio buffers to reduce allocations +type AudioBufferPool struct { + pool sync.Pool +} + +// NewAudioBufferPool creates a new buffer pool for audio frames +func NewAudioBufferPool(bufferSize int) *AudioBufferPool { + return &AudioBufferPool{ + pool: sync.Pool{ + New: func() interface{} { + // Pre-allocate buffer with specified size + return make([]byte, bufferSize) + }, + }, + } +} + +// Get retrieves a buffer from the pool +func (p *AudioBufferPool) Get() []byte { + return p.pool.Get().([]byte) +} + +// Put returns a buffer to the pool +func (p *AudioBufferPool) Put(buf []byte) { + // Reset length but keep capacity for reuse + if cap(buf) >= 1500 { // Only pool buffers of reasonable size + p.pool.Put(buf[:0]) + } +} + +// Global buffer pools for different audio operations +var ( + // Pool for 1500-byte audio frame buffers (Opus max frame size) + audioFramePool = NewAudioBufferPool(1500) + + // Pool for smaller control buffers + audioControlPool = NewAudioBufferPool(64) +) + +// GetAudioFrameBuffer gets a reusable buffer for audio frames +func GetAudioFrameBuffer() []byte { + return audioFramePool.Get() +} + +// PutAudioFrameBuffer returns a buffer to the frame pool +func PutAudioFrameBuffer(buf []byte) { + audioFramePool.Put(buf) +} + +// GetAudioControlBuffer gets a reusable buffer for control data +func GetAudioControlBuffer() []byte { + return audioControlPool.Get() +} + +// PutAudioControlBuffer returns a buffer to the control pool +func PutAudioControlBuffer(buf []byte) { + audioControlPool.Put(buf) +} \ No newline at end of file diff --git a/internal/audio/cgo_audio.go b/internal/audio/cgo_audio.go index 5c0866e..013ad56 100644 --- a/internal/audio/cgo_audio.go +++ b/internal/audio/cgo_audio.go @@ -1,4 +1,4 @@ -//go:build !nolint +//go:build cgo package audio @@ -385,11 +385,23 @@ void jetkvm_audio_close() { */ import "C" -// Go wrappers for initializing, starting, stopping, and controlling audio +// Optimized Go wrappers with reduced overhead +var ( + errAudioInitFailed = errors.New("failed to init ALSA/Opus") + errBufferTooSmall = errors.New("buffer too small") + errAudioReadEncode = errors.New("audio read/encode error") + errAudioDecodeWrite = errors.New("audio decode/write error") + errAudioPlaybackInit = errors.New("failed to init ALSA playback/Opus decoder") + errEmptyBuffer = errors.New("empty buffer") + errNilBuffer = errors.New("nil buffer") + errBufferTooLarge = errors.New("buffer too large") + errInvalidBufferPtr = errors.New("invalid buffer pointer") +) + func cgoAudioInit() error { ret := C.jetkvm_audio_init() if ret != 0 { - return errors.New("failed to init ALSA/Opus") + return errAudioInitFailed } return nil } @@ -398,18 +410,19 @@ func cgoAudioClose() { C.jetkvm_audio_close() } -// Reads and encodes one frame, returns encoded bytes or error +// Optimized read and encode with pre-allocated error objects and reduced checks func cgoAudioReadEncode(buf []byte) (int, error) { - if len(buf) < 1500 { - return 0, errors.New("buffer too small") + // Fast path: check minimum buffer size (reduced from 1500 to 1276 for 10ms frames) + if len(buf) < 1276 { + return 0, errBufferTooSmall } + n := C.jetkvm_audio_read_encode(unsafe.Pointer(&buf[0])) if n < 0 { - return 0, errors.New("audio read/encode error") + return 0, errAudioReadEncode } if n == 0 { - // No data available - this is not an error, just no audio frame - return 0, nil + return 0, nil // No data available } return int(n), nil } diff --git a/internal/audio/cgo_audio_stub.go b/internal/audio/cgo_audio_stub.go index 193ed57..4ddb24d 100644 --- a/internal/audio/cgo_audio_stub.go +++ b/internal/audio/cgo_audio_stub.go @@ -1,4 +1,4 @@ -//go:build nolint +//go:build !cgo package audio diff --git a/internal/audio/mic_contention.go b/internal/audio/mic_contention.go new file mode 100644 index 0000000..6c35393 --- /dev/null +++ b/internal/audio/mic_contention.go @@ -0,0 +1,158 @@ +package audio + +import ( + "sync/atomic" + "time" + "unsafe" +) + +// MicrophoneContentionManager provides optimized microphone operation locking +// with reduced contention using atomic operations and conditional locking +type MicrophoneContentionManager struct { + // Atomic fields (must be 64-bit aligned on 32-bit systems) + lastOpNano int64 // Unix nanoseconds of last operation + cooldownNanos int64 // Cooldown duration in nanoseconds + operationID int64 // Incremental operation ID for tracking + + // Lock-free state flags (using atomic.Pointer for lock-free updates) + lockPtr unsafe.Pointer // *sync.Mutex - conditionally allocated +} + +// NewMicrophoneContentionManager creates a new microphone contention manager +func NewMicrophoneContentionManager(cooldown time.Duration) *MicrophoneContentionManager { + return &MicrophoneContentionManager{ + cooldownNanos: int64(cooldown), + } +} + +// OperationResult represents the result of attempting a microphone operation +type OperationResult struct { + Allowed bool + RemainingCooldown time.Duration + OperationID int64 +} + +// TryOperation attempts to perform a microphone operation with optimized contention handling +func (mcm *MicrophoneContentionManager) TryOperation() OperationResult { + now := time.Now().UnixNano() + cooldown := atomic.LoadInt64(&mcm.cooldownNanos) + + // Fast path: check if we're clearly outside cooldown period using atomic read + lastOp := atomic.LoadInt64(&mcm.lastOpNano) + elapsed := now - lastOp + + if elapsed >= cooldown { + // Attempt atomic update without locking + if atomic.CompareAndSwapInt64(&mcm.lastOpNano, lastOp, now) { + opID := atomic.AddInt64(&mcm.operationID, 1) + return OperationResult{ + Allowed: true, + RemainingCooldown: 0, + OperationID: opID, + } + } + } + + // Slow path: potential contention, check remaining cooldown + currentLastOp := atomic.LoadInt64(&mcm.lastOpNano) + currentElapsed := now - currentLastOp + + if currentElapsed >= cooldown { + // Race condition: another operation might have updated lastOpNano + // Try once more with CAS + if atomic.CompareAndSwapInt64(&mcm.lastOpNano, currentLastOp, now) { + opID := atomic.AddInt64(&mcm.operationID, 1) + return OperationResult{ + Allowed: true, + RemainingCooldown: 0, + OperationID: opID, + } + } + // If CAS failed, fall through to cooldown calculation + currentLastOp = atomic.LoadInt64(&mcm.lastOpNano) + currentElapsed = now - currentLastOp + } + + remaining := time.Duration(cooldown - currentElapsed) + if remaining < 0 { + remaining = 0 + } + + return OperationResult{ + Allowed: false, + RemainingCooldown: remaining, + OperationID: atomic.LoadInt64(&mcm.operationID), + } +} + +// SetCooldown updates the cooldown duration atomically +func (mcm *MicrophoneContentionManager) SetCooldown(cooldown time.Duration) { + atomic.StoreInt64(&mcm.cooldownNanos, int64(cooldown)) +} + +// GetCooldown returns the current cooldown duration +func (mcm *MicrophoneContentionManager) GetCooldown() time.Duration { + return time.Duration(atomic.LoadInt64(&mcm.cooldownNanos)) +} + +// GetLastOperationTime returns the time of the last operation +func (mcm *MicrophoneContentionManager) GetLastOperationTime() time.Time { + nanos := atomic.LoadInt64(&mcm.lastOpNano) + if nanos == 0 { + return time.Time{} + } + return time.Unix(0, nanos) +} + +// GetOperationCount returns the total number of successful operations +func (mcm *MicrophoneContentionManager) GetOperationCount() int64 { + return atomic.LoadInt64(&mcm.operationID) +} + +// Reset resets the contention manager state +func (mcm *MicrophoneContentionManager) Reset() { + atomic.StoreInt64(&mcm.lastOpNano, 0) + atomic.StoreInt64(&mcm.operationID, 0) +} + +// Global instance for microphone contention management +var ( + globalMicContentionManager unsafe.Pointer // *MicrophoneContentionManager + micContentionInitialized int32 +) + +// GetMicrophoneContentionManager returns the global microphone contention manager +func GetMicrophoneContentionManager() *MicrophoneContentionManager { + ptr := atomic.LoadPointer(&globalMicContentionManager) + if ptr != nil { + return (*MicrophoneContentionManager)(ptr) + } + + // Initialize on first use + if atomic.CompareAndSwapInt32(&micContentionInitialized, 0, 1) { + manager := NewMicrophoneContentionManager(200 * time.Millisecond) + atomic.StorePointer(&globalMicContentionManager, unsafe.Pointer(manager)) + return manager + } + + // Another goroutine initialized it, try again + ptr = atomic.LoadPointer(&globalMicContentionManager) + if ptr != nil { + return (*MicrophoneContentionManager)(ptr) + } + + // Fallback: create a new manager (should rarely happen) + return NewMicrophoneContentionManager(200 * time.Millisecond) +} + +// TryMicrophoneOperation provides a convenient global function for microphone operations +func TryMicrophoneOperation() OperationResult { + manager := GetMicrophoneContentionManager() + return manager.TryOperation() +} + +// SetMicrophoneCooldown updates the global microphone cooldown +func SetMicrophoneCooldown(cooldown time.Duration) { + manager := GetMicrophoneContentionManager() + manager.SetCooldown(cooldown) +} \ No newline at end of file diff --git a/internal/audio/nonblocking_api.go b/internal/audio/nonblocking_api.go index 33ae260..4e67df3 100644 --- a/internal/audio/nonblocking_api.go +++ b/internal/audio/nonblocking_api.go @@ -1,96 +1,115 @@ package audio import ( - "sync" + "sync/atomic" + "unsafe" ) var ( - globalNonBlockingManager *NonBlockingAudioManager - managerMutex sync.Mutex + // Use unsafe.Pointer for atomic operations instead of mutex + globalNonBlockingManager unsafe.Pointer // *NonBlockingAudioManager ) +// loadManager atomically loads the global manager +func loadManager() *NonBlockingAudioManager { + ptr := atomic.LoadPointer(&globalNonBlockingManager) + if ptr == nil { + return nil + } + return (*NonBlockingAudioManager)(ptr) +} + +// storeManager atomically stores the global manager +func storeManager(manager *NonBlockingAudioManager) { + atomic.StorePointer(&globalNonBlockingManager, unsafe.Pointer(manager)) +} + +// compareAndSwapManager atomically compares and swaps the global manager +func compareAndSwapManager(old, new *NonBlockingAudioManager) bool { + return atomic.CompareAndSwapPointer(&globalNonBlockingManager, + unsafe.Pointer(old), unsafe.Pointer(new)) +} + // StartNonBlockingAudioStreaming starts the non-blocking audio streaming system func StartNonBlockingAudioStreaming(send func([]byte)) error { - managerMutex.Lock() - defer managerMutex.Unlock() - - if globalNonBlockingManager != nil && globalNonBlockingManager.IsOutputRunning() { + manager := loadManager() + if manager != nil && manager.IsOutputRunning() { return nil // Already running, this is not an error } - if globalNonBlockingManager == nil { - globalNonBlockingManager = NewNonBlockingAudioManager() + if manager == nil { + newManager := NewNonBlockingAudioManager() + if !compareAndSwapManager(nil, newManager) { + // Another goroutine created manager, use it + manager = loadManager() + } else { + manager = newManager + } } - return globalNonBlockingManager.StartAudioOutput(send) + return manager.StartAudioOutput(send) } // StartNonBlockingAudioInput starts the non-blocking audio input system func StartNonBlockingAudioInput(receiveChan <-chan []byte) error { - managerMutex.Lock() - defer managerMutex.Unlock() - - if globalNonBlockingManager == nil { - globalNonBlockingManager = NewNonBlockingAudioManager() + manager := loadManager() + if manager == nil { + newManager := NewNonBlockingAudioManager() + if !compareAndSwapManager(nil, newManager) { + // Another goroutine created manager, use it + manager = loadManager() + } else { + manager = newManager + } } // Check if input is already running to avoid unnecessary operations - if globalNonBlockingManager.IsInputRunning() { + if manager.IsInputRunning() { return nil // Already running, this is not an error } - return globalNonBlockingManager.StartAudioInput(receiveChan) + return manager.StartAudioInput(receiveChan) } // StopNonBlockingAudioStreaming stops the non-blocking audio streaming system func StopNonBlockingAudioStreaming() { - managerMutex.Lock() - defer managerMutex.Unlock() - - if globalNonBlockingManager != nil { - globalNonBlockingManager.Stop() - globalNonBlockingManager = nil + manager := loadManager() + if manager != nil { + manager.Stop() + storeManager(nil) } } // StopNonBlockingAudioInput stops only the audio input without affecting output func StopNonBlockingAudioInput() { - managerMutex.Lock() - defer managerMutex.Unlock() - - if globalNonBlockingManager != nil && globalNonBlockingManager.IsInputRunning() { - globalNonBlockingManager.StopAudioInput() + manager := loadManager() + if manager != nil && manager.IsInputRunning() { + manager.StopAudioInput() // If both input and output are stopped, recreate manager to ensure clean state - if !globalNonBlockingManager.IsRunning() { - globalNonBlockingManager = nil + if !manager.IsRunning() { + storeManager(nil) } } } // GetNonBlockingAudioStats returns statistics from the non-blocking audio system func GetNonBlockingAudioStats() NonBlockingAudioStats { - managerMutex.Lock() - defer managerMutex.Unlock() - - if globalNonBlockingManager != nil { - return globalNonBlockingManager.GetStats() + manager := loadManager() + if manager != nil { + return manager.GetStats() } return NonBlockingAudioStats{} } // IsNonBlockingAudioRunning returns true if the non-blocking audio system is running func IsNonBlockingAudioRunning() bool { - managerMutex.Lock() - defer managerMutex.Unlock() - - return globalNonBlockingManager != nil && globalNonBlockingManager.IsRunning() + manager := loadManager() + return manager != nil && manager.IsRunning() } // IsNonBlockingAudioInputRunning returns true if the non-blocking audio input is running func IsNonBlockingAudioInputRunning() bool { - managerMutex.Lock() - defer managerMutex.Unlock() - - return globalNonBlockingManager != nil && globalNonBlockingManager.IsInputRunning() + manager := loadManager() + return manager != nil && manager.IsInputRunning() } diff --git a/internal/audio/nonblocking_audio.go b/internal/audio/nonblocking_audio.go index 34d25fb..5787a8a 100644 --- a/internal/audio/nonblocking_audio.go +++ b/internal/audio/nonblocking_audio.go @@ -3,7 +3,7 @@ package audio import ( "context" "errors" - "runtime" + // "runtime" // removed: no longer directly pinning OS thread here; batching handles it "sync" "sync/atomic" "time" @@ -98,6 +98,9 @@ func (nam *NonBlockingAudioManager) StartAudioOutput(sendFunc func([]byte)) erro nam.outputSendFunc = sendFunc + // Enable batch audio processing for performance + EnableBatchAudioProcessing() + // Start the blocking worker thread nam.wg.Add(1) go nam.outputWorkerThread() @@ -106,7 +109,7 @@ func (nam *NonBlockingAudioManager) StartAudioOutput(sendFunc func([]byte)) erro nam.wg.Add(1) go nam.outputCoordinatorThread() - nam.logger.Info().Msg("non-blocking audio output started") + nam.logger.Info().Msg("non-blocking audio output started with batch processing") return nil } @@ -118,6 +121,9 @@ func (nam *NonBlockingAudioManager) StartAudioInput(receiveChan <-chan []byte) e nam.inputReceiveChan = receiveChan + // Enable batch audio processing for performance + EnableBatchAudioProcessing() + // Start the blocking worker thread nam.wg.Add(1) go nam.inputWorkerThread() @@ -126,16 +132,12 @@ func (nam *NonBlockingAudioManager) StartAudioInput(receiveChan <-chan []byte) e nam.wg.Add(1) go nam.inputCoordinatorThread() - nam.logger.Info().Msg("non-blocking audio input started") + nam.logger.Info().Msg("non-blocking audio input started with batch processing") return nil } // outputWorkerThread handles all blocking audio output operations func (nam *NonBlockingAudioManager) outputWorkerThread() { - // Lock to OS thread to isolate blocking CGO operations - runtime.LockOSThread() - defer runtime.UnlockOSThread() - defer nam.wg.Done() defer atomic.StoreInt32(&nam.outputWorkerRunning, 0) @@ -149,7 +151,9 @@ func (nam *NonBlockingAudioManager) outputWorkerThread() { } defer CGOAudioClose() - buf := make([]byte, 1500) + // Use buffer pool to avoid allocations + buf := GetAudioFrameBuffer() + defer PutAudioFrameBuffer(buf) for { select { @@ -160,17 +164,18 @@ func (nam *NonBlockingAudioManager) outputWorkerThread() { case workItem := <-nam.outputWorkChan: switch workItem.workType { case audioWorkReadEncode: - // Perform blocking audio read/encode operation - n, err := CGOAudioReadEncode(buf) - result := audioResult{ + n, err := BatchCGOAudioReadEncode(buf) + + result := audioResult{ success: err == nil, length: n, err: err, } if err == nil && n > 0 { - // Copy data to avoid race conditions - result.data = make([]byte, n) - copy(result.data, buf[:n]) + // Get buffer from pool and copy data + resultBuf := GetAudioFrameBuffer() + copy(resultBuf[:n], buf[:n]) + result.data = resultBuf[:n] } // Send result back (non-blocking) @@ -180,6 +185,9 @@ func (nam *NonBlockingAudioManager) outputWorkerThread() { return default: // Drop result if coordinator is not ready + if result.data != nil { + PutAudioFrameBuffer(result.data) + } atomic.AddInt64(&nam.stats.OutputFramesDropped, 1) } @@ -243,6 +251,8 @@ func (nam *NonBlockingAudioManager) outputCoordinatorThread() { atomic.AddInt64(&nam.stats.OutputFramesProcessed, 1) RecordFrameReceived(result.length) } + // Return buffer to pool after use + PutAudioFrameBuffer(result.data) } else if result.success && result.length == 0 { // No data available - this is normal, not an error // Just continue without logging or counting as error @@ -252,6 +262,10 @@ func (nam *NonBlockingAudioManager) outputCoordinatorThread() { if result.err != nil { nam.logger.Warn().Err(result.err).Msg("audio output worker error") } + // Clean up buffer if present + if result.data != nil { + PutAudioFrameBuffer(result.data) + } RecordFrameDropped() } } @@ -269,10 +283,6 @@ func (nam *NonBlockingAudioManager) outputCoordinatorThread() { // inputWorkerThread handles all blocking audio input operations func (nam *NonBlockingAudioManager) inputWorkerThread() { - // Lock to OS thread to isolate blocking CGO operations - runtime.LockOSThread() - defer runtime.UnlockOSThread() - defer nam.wg.Done() // Cleanup CGO resources properly to avoid double-close scenarios // The outputWorkerThread's CGOAudioClose() will handle all cleanup @@ -362,7 +372,8 @@ func (nam *NonBlockingAudioManager) inputWorkerThread() { return } - n, err := CGOAudioDecodeWrite(workItem.data) + n, err := BatchCGOAudioDecodeWrite(workItem.data) + result = audioResult{ success: err == nil, length: n, @@ -479,6 +490,9 @@ func (nam *NonBlockingAudioManager) Stop() { // Wait for all goroutines to finish nam.wg.Wait() + // Disable batch processing to free resources + DisableBatchAudioProcessing() + nam.logger.Info().Msg("non-blocking audio manager stopped") } diff --git a/web.go b/web.go index eb1eab5..4bed6b5 100644 --- a/web.go +++ b/web.go @@ -283,28 +283,17 @@ func setupRouter() *gin.Engine { return } - // Server-side cooldown to prevent rapid start/stop thrashing - { - cs := currentSession - cs.micOpMu.Lock() - now := time.Now() - if cs.micCooldown == 0 { - cs.micCooldown = 200 * time.Millisecond - } - since := now.Sub(cs.lastMicOp) - if since < cs.micCooldown { - remaining := cs.micCooldown - since - running := cs.AudioInputManager.IsRunning() || audio.IsNonBlockingAudioInputRunning() - cs.micOpMu.Unlock() - c.JSON(200, gin.H{ - "status": "cooldown", - "running": running, - "cooldown_ms_remaining": remaining.Milliseconds(), - }) - return - } - cs.lastMicOp = now - cs.micOpMu.Unlock() + // Optimized server-side cooldown using atomic operations + opResult := audio.TryMicrophoneOperation() + if !opResult.Allowed { + running := currentSession.AudioInputManager.IsRunning() || audio.IsNonBlockingAudioInputRunning() + c.JSON(200, gin.H{ + "status": "cooldown", + "running": running, + "cooldown_ms_remaining": opResult.RemainingCooldown.Milliseconds(), + "operation_id": opResult.OperationID, + }) + return } // Check if already running before attempting to start @@ -356,28 +345,17 @@ func setupRouter() *gin.Engine { return } - // Server-side cooldown to prevent rapid start/stop thrashing - { - cs := currentSession - cs.micOpMu.Lock() - now := time.Now() - if cs.micCooldown == 0 { - cs.micCooldown = 200 * time.Millisecond - } - since := now.Sub(cs.lastMicOp) - if since < cs.micCooldown { - remaining := cs.micCooldown - since - running := cs.AudioInputManager.IsRunning() || audio.IsNonBlockingAudioInputRunning() - cs.micOpMu.Unlock() - c.JSON(200, gin.H{ - "status": "cooldown", - "running": running, - "cooldown_ms_remaining": remaining.Milliseconds(), - }) - return - } - cs.lastMicOp = now - cs.micOpMu.Unlock() + // Optimized server-side cooldown using atomic operations + opResult := audio.TryMicrophoneOperation() + if !opResult.Allowed { + running := currentSession.AudioInputManager.IsRunning() || audio.IsNonBlockingAudioInputRunning() + c.JSON(200, gin.H{ + "status": "cooldown", + "running": running, + "cooldown_ms_remaining": opResult.RemainingCooldown.Milliseconds(), + "operation_id": opResult.OperationID, + }) + return } // Check if already stopped before attempting to stop