diff --git a/internal/audio/audio.go b/internal/audio/audio.go index 2912049a..470b9667 100644 --- a/internal/audio/audio.go +++ b/internal/audio/audio.go @@ -1,5 +1,5 @@ -//go:build cgo -// +build cgo +//go:build cgo && !arm +// +build cgo,!arm // Package audio provides real-time audio processing for JetKVM with low-latency streaming. // diff --git a/internal/audio/batch_audio.go b/internal/audio/batch_audio.go index a7b7537f..9e97dfe9 100644 --- a/internal/audio/batch_audio.go +++ b/internal/audio/batch_audio.go @@ -1,4 +1,4 @@ -//go:build cgo +//go:build cgo && !arm package audio @@ -28,23 +28,31 @@ type BatchAudioProcessor struct { // Batch queues and state (atomic for lock-free access) readQueue chan batchReadRequest + writeQueue chan batchWriteRequest initialized int32 running int32 threadPinned int32 + writePinned int32 // Buffers (pre-allocated to avoid allocation overhead) - readBufPool *sync.Pool + readBufPool *sync.Pool + writeBufPool *sync.Pool } type BatchAudioStats struct { // int64 fields MUST be first for ARM32 alignment BatchedReads int64 SingleReads int64 + BatchedWrites int64 + SingleWrites int64 BatchedFrames int64 SingleFrames int64 + WriteFrames int64 CGOCallsReduced int64 OSThreadPinTime time.Duration // time.Duration is int64 internally + WriteThreadTime time.Duration // time.Duration is int64 internally LastBatchTime time.Time + LastWriteTime time.Time } type batchReadRequest struct { @@ -58,18 +66,33 @@ type batchReadResult struct { err error } +type batchWriteRequest struct { + buffer []byte + resultChan chan batchWriteResult + timestamp time.Time +} + +type batchWriteResult struct { + length int + err error +} + // NewBatchAudioProcessor creates a new batch audio processor func NewBatchAudioProcessor(batchSize int, batchDuration time.Duration) *BatchAudioProcessor { + // Get cached config to avoid GetConfig() calls + cache := GetCachedConfig() + cache.Update() + // Validate input parameters if err := ValidateBufferSize(batchSize); err != nil { logger := logging.GetDefaultLogger().With().Str("component", "batch-audio").Logger() logger.Warn().Err(err).Int("batchSize", batchSize).Msg("invalid batch size, using default") - batchSize = GetConfig().BatchProcessorFramesPerBatch + batchSize = cache.BatchProcessorFramesPerBatch } if batchDuration <= 0 { logger := logging.GetDefaultLogger().With().Str("component", "batch-audio").Logger() logger.Warn().Dur("batchDuration", batchDuration).Msg("invalid batch duration, using default") - batchDuration = GetConfig().BatchProcessingDelay + batchDuration = cache.BatchProcessingDelay } ctx, cancel := context.WithCancel(context.Background()) @@ -82,11 +105,17 @@ func NewBatchAudioProcessor(batchSize int, batchDuration time.Duration) *BatchAu batchSize: batchSize, batchDuration: batchDuration, readQueue: make(chan batchReadRequest, batchSize*2), + writeQueue: make(chan batchWriteRequest, batchSize*2), readBufPool: &sync.Pool{ New: func() interface{} { return make([]byte, GetConfig().AudioFramePoolSize) // Max audio frame size }, }, + writeBufPool: &sync.Pool{ + New: func() interface{} { + return make([]byte, GetConfig().AudioFramePoolSize) // Max audio frame size + }, + }, } return processor @@ -105,6 +134,7 @@ func (bap *BatchAudioProcessor) Start() error { // Start batch processing goroutines go bap.batchReadProcessor() + go bap.batchWriteProcessor() bap.logger.Info().Int("batch_size", bap.batchSize). Dur("batch_duration", bap.batchDuration). @@ -129,13 +159,17 @@ func (bap *BatchAudioProcessor) Stop() { // BatchReadEncode performs batched audio read and encode operations func (bap *BatchAudioProcessor) BatchReadEncode(buffer []byte) (int, error) { + // Get cached config to avoid GetConfig() calls in hot path + cache := GetCachedConfig() + cache.Update() + // Validate buffer before processing if err := ValidateBufferSize(len(buffer)); err != nil { bap.logger.Debug().Err(err).Msg("invalid buffer for batch processing") return 0, err } - if atomic.LoadInt32(&bap.running) == 0 { + if !bap.IsRunning() { // Fallback to single operation if batch processor is not running atomic.AddInt64(&bap.stats.SingleReads, 1) atomic.AddInt64(&bap.stats.SingleFrames, 1) @@ -149,21 +183,22 @@ func (bap *BatchAudioProcessor) BatchReadEncode(buffer []byte) (int, error) { timestamp: time.Now(), } + // Try to queue the request with non-blocking send select { case bap.readQueue <- request: // Successfully queued - case <-time.After(GetConfig().ShortTimeout): - // Queue is full or blocked, fallback to single operation + default: + // Queue is full, fallback to single operation atomic.AddInt64(&bap.stats.SingleReads, 1) atomic.AddInt64(&bap.stats.SingleFrames, 1) return CGOAudioReadEncode(buffer) } - // Wait for result + // Wait for result with timeout select { case result := <-resultChan: return result.length, result.err - case <-time.After(GetConfig().MediumTimeout): + case <-time.After(cache.BatchProcessingTimeout): // Timeout, fallback to single operation atomic.AddInt64(&bap.stats.SingleReads, 1) atomic.AddInt64(&bap.stats.SingleFrames, 1) @@ -171,6 +206,54 @@ func (bap *BatchAudioProcessor) BatchReadEncode(buffer []byte) (int, error) { } } +// BatchDecodeWrite performs batched audio decode and write operations +func (bap *BatchAudioProcessor) BatchDecodeWrite(buffer []byte) (int, error) { + // Get cached config to avoid GetConfig() calls in hot path + cache := GetCachedConfig() + cache.Update() + + // Validate buffer before processing + if err := ValidateBufferSize(len(buffer)); err != nil { + bap.logger.Debug().Err(err).Msg("invalid buffer for batch processing") + return 0, err + } + + if !bap.IsRunning() { + // Fallback to single operation if batch processor is not running + atomic.AddInt64(&bap.stats.SingleWrites, 1) + atomic.AddInt64(&bap.stats.WriteFrames, 1) + return CGOAudioDecodeWrite(buffer) + } + + resultChan := make(chan batchWriteResult, 1) + request := batchWriteRequest{ + buffer: buffer, + resultChan: resultChan, + timestamp: time.Now(), + } + + // Try to queue the request with non-blocking send + select { + case bap.writeQueue <- request: + // Successfully queued + default: + // Queue is full, fall back to single operation + atomic.AddInt64(&bap.stats.SingleWrites, 1) + atomic.AddInt64(&bap.stats.WriteFrames, 1) + return CGOAudioDecodeWrite(buffer) + } + + // Wait for result with timeout + select { + case result := <-resultChan: + return result.length, result.err + case <-time.After(cache.BatchProcessingTimeout): + atomic.AddInt64(&bap.stats.SingleWrites, 1) + atomic.AddInt64(&bap.stats.WriteFrames, 1) + return CGOAudioDecodeWrite(buffer) + } +} + // batchReadProcessor processes batched read operations func (bap *BatchAudioProcessor) batchReadProcessor() { defer bap.logger.Debug().Msg("batch read processor stopped") @@ -207,15 +290,54 @@ func (bap *BatchAudioProcessor) batchReadProcessor() { } } +// batchWriteProcessor processes batched write operations +func (bap *BatchAudioProcessor) batchWriteProcessor() { + defer bap.logger.Debug().Msg("batch write processor stopped") + + ticker := time.NewTicker(bap.batchDuration) + defer ticker.Stop() + + var batch []batchWriteRequest + batch = make([]batchWriteRequest, 0, bap.batchSize) + + for atomic.LoadInt32(&bap.running) == 1 { + select { + case <-bap.ctx.Done(): + return + + case req := <-bap.writeQueue: + batch = append(batch, req) + if len(batch) >= bap.batchSize { + bap.processBatchWrite(batch) + batch = batch[:0] // Clear slice but keep capacity + } + + case <-ticker.C: + if len(batch) > 0 { + bap.processBatchWrite(batch) + batch = batch[:0] // Clear slice but keep capacity + } + } + } + + // Process any remaining requests + if len(batch) > 0 { + bap.processBatchWrite(batch) + } +} + // processBatchRead processes a batch of read requests efficiently func (bap *BatchAudioProcessor) processBatchRead(batch []batchReadRequest) { if len(batch) == 0 { return } + // Get cached config to avoid GetConfig() calls in hot path + cache := GetCachedConfig() + // Only pin to OS thread for large batches to reduce thread contention start := time.Now() - shouldPinThread := len(batch) >= GetConfig().MinBatchSizeForThreadPinning + shouldPinThread := len(batch) >= cache.MinBatchSizeForThreadPinning // Track if we pinned the thread in this call threadWasPinned := false @@ -268,16 +390,85 @@ func (bap *BatchAudioProcessor) processBatchRead(batch []batchReadRequest) { bap.stats.LastBatchTime = time.Now() } +// processBatchWrite processes a batch of write requests efficiently +func (bap *BatchAudioProcessor) processBatchWrite(batch []batchWriteRequest) { + if len(batch) == 0 { + return + } + + // Get cached config to avoid GetConfig() calls in hot path + cache := GetCachedConfig() + + // Only pin to OS thread for large batches to reduce thread contention + start := time.Now() + shouldPinThread := len(batch) >= cache.MinBatchSizeForThreadPinning + + // Track if we pinned the thread in this call + threadWasPinned := false + + if shouldPinThread && atomic.CompareAndSwapInt32(&bap.writePinned, 0, 1) { + threadWasPinned = true + runtime.LockOSThread() + + // Set high priority for batch audio processing + if err := SetAudioThreadPriority(); err != nil { + bap.logger.Warn().Err(err).Msg("failed to set batch audio processing priority") + } + } + + batchSize := len(batch) + atomic.AddInt64(&bap.stats.BatchedWrites, 1) + atomic.AddInt64(&bap.stats.WriteFrames, int64(batchSize)) + if batchSize > 1 { + atomic.AddInt64(&bap.stats.CGOCallsReduced, int64(batchSize-1)) + } + + // Add deferred function to release thread lock if we pinned it + if threadWasPinned { + defer func() { + if err := ResetThreadPriority(); err != nil { + bap.logger.Warn().Err(err).Msg("failed to reset thread priority") + } + runtime.UnlockOSThread() + atomic.StoreInt32(&bap.writePinned, 0) + bap.stats.WriteThreadTime += time.Since(start) + }() + } + + // Process each request in the batch + for _, req := range batch { + length, err := CGOAudioDecodeWrite(req.buffer) + result := batchWriteResult{ + length: length, + err: err, + } + + // Send result back (non-blocking) + select { + case req.resultChan <- result: + default: + // Requestor timed out, drop result + } + } + + bap.stats.LastWriteTime = time.Now() +} + // GetStats returns current batch processor statistics func (bap *BatchAudioProcessor) GetStats() BatchAudioStats { return BatchAudioStats{ BatchedReads: atomic.LoadInt64(&bap.stats.BatchedReads), SingleReads: atomic.LoadInt64(&bap.stats.SingleReads), + BatchedWrites: atomic.LoadInt64(&bap.stats.BatchedWrites), + SingleWrites: atomic.LoadInt64(&bap.stats.SingleWrites), BatchedFrames: atomic.LoadInt64(&bap.stats.BatchedFrames), SingleFrames: atomic.LoadInt64(&bap.stats.SingleFrames), + WriteFrames: atomic.LoadInt64(&bap.stats.WriteFrames), CGOCallsReduced: atomic.LoadInt64(&bap.stats.CGOCallsReduced), OSThreadPinTime: bap.stats.OSThreadPinTime, + WriteThreadTime: bap.stats.WriteThreadTime, LastBatchTime: bap.stats.LastBatchTime, + LastWriteTime: bap.stats.LastWriteTime, } } @@ -301,8 +492,11 @@ func GetBatchAudioProcessor() *BatchAudioProcessor { // Initialize on first use if atomic.CompareAndSwapInt32(&batchProcessorInitialized, 0, 1) { - config := GetConfig() - processor := NewBatchAudioProcessor(config.BatchProcessorFramesPerBatch, config.BatchProcessorTimeout) + // Get cached config to avoid GetConfig() calls + cache := GetCachedConfig() + cache.Update() + + processor := NewBatchAudioProcessor(cache.BatchProcessorFramesPerBatch, cache.BatchProcessorTimeout) atomic.StorePointer(&globalBatchProcessor, unsafe.Pointer(processor)) return processor } @@ -336,8 +530,21 @@ func DisableBatchAudioProcessing() { // BatchCGOAudioReadEncode is a batched version of CGOAudioReadEncode func BatchCGOAudioReadEncode(buffer []byte) (int, error) { processor := GetBatchAudioProcessor() - if processor != nil && processor.IsRunning() { - return processor.BatchReadEncode(buffer) + if processor == nil || !processor.IsRunning() { + // Fall back to non-batched version if processor is not running + return CGOAudioReadEncode(buffer) } - return CGOAudioReadEncode(buffer) + + return processor.BatchReadEncode(buffer) +} + +// BatchCGOAudioDecodeWrite is a batched version of CGOAudioDecodeWrite +func BatchCGOAudioDecodeWrite(buffer []byte) (int, error) { + processor := GetBatchAudioProcessor() + if processor == nil || !processor.IsRunning() { + // Fall back to non-batched version if processor is not running + return CGOAudioDecodeWrite(buffer) + } + + return processor.BatchDecodeWrite(buffer) } diff --git a/internal/audio/buffer_pool.go b/internal/audio/buffer_pool.go index acf74fc7..ad0430d6 100644 --- a/internal/audio/buffer_pool.go +++ b/internal/audio/buffer_pool.go @@ -1,5 +1,5 @@ -//go:build cgo -// +build cgo +//go:build cgo && !arm +// +build cgo,!arm package audio diff --git a/internal/audio/cgo_audio.go b/internal/audio/cgo_audio.go index 0f33c7de..82f73055 100644 --- a/internal/audio/cgo_audio.go +++ b/internal/audio/cgo_audio.go @@ -1,4 +1,4 @@ -//go:build cgo +//go:build cgo && !arm package audio @@ -735,6 +735,13 @@ type AudioConfigCache struct { minOpusBitrate atomic.Int32 maxOpusBitrate atomic.Int32 + // Batch processing related values + BatchProcessingTimeout time.Duration + BatchProcessorFramesPerBatch int + BatchProcessorTimeout time.Duration + BatchProcessingDelay time.Duration + MinBatchSizeForThreadPinning int + // Mutex for updating the cache mutex sync.RWMutex lastUpdate time.Time @@ -799,6 +806,13 @@ func (c *AudioConfigCache) Update() { c.minOpusBitrate.Store(int32(config.MinOpusBitrate)) c.maxOpusBitrate.Store(int32(config.MaxOpusBitrate)) + // Update batch processing related values + c.BatchProcessingTimeout = 100 * time.Millisecond // Fixed timeout for batch processing + c.BatchProcessorFramesPerBatch = config.BatchProcessorFramesPerBatch + c.BatchProcessorTimeout = config.BatchProcessorTimeout + c.BatchProcessingDelay = config.BatchProcessingDelay + c.MinBatchSizeForThreadPinning = config.MinBatchSizeForThreadPinning + // Pre-allocate common errors c.bufferTooSmallReadEncode = newBufferTooSmallError(0, config.MinReadEncodeBuffer) c.bufferTooLargeDecodeWrite = newBufferTooLargeError(config.MaxDecodeWriteBuffer+1, config.MaxDecodeWriteBuffer) diff --git a/internal/audio/output_streaming.go b/internal/audio/output_streaming.go index 88a57e36..ac309802 100644 --- a/internal/audio/output_streaming.go +++ b/internal/audio/output_streaming.go @@ -1,5 +1,5 @@ -//go:build cgo -// +build cgo +//go:build cgo && !arm +// +build cgo,!arm package audio diff --git a/internal/audio/supervisor.go b/internal/audio/supervisor.go index 2d96a7c0..5b422207 100644 --- a/internal/audio/supervisor.go +++ b/internal/audio/supervisor.go @@ -1,5 +1,5 @@ -//go:build cgo -// +build cgo +//go:build cgo && !arm +// +build cgo,!arm package audio