diff --git a/internal/audio/batch_audio.go b/internal/audio/batch_audio.go deleted file mode 100644 index 3887e591..00000000 --- a/internal/audio/batch_audio.go +++ /dev/null @@ -1,626 +0,0 @@ -//go:build cgo - -package audio - -import ( - "context" - "fmt" - "runtime" - "sync" - "sync/atomic" - "time" - "unsafe" - - "github.com/jetkvm/kvm/internal/logging" - "github.com/rs/zerolog" -) - -// BatchAudioProcessor manages batched CGO operations to reduce syscall overhead -type BatchAudioProcessor struct { - // Statistics - MUST be first for ARM32 alignment (int64 fields need 8-byte alignment) - stats BatchAudioStats - - // Control - ctx context.Context - cancel context.CancelFunc - logger *zerolog.Logger - batchSize int - batchDuration time.Duration - - // Batch queues and state (atomic for lock-free access) - readQueue chan batchReadRequest - writeQueue chan batchWriteRequest - initialized int32 - running int32 - threadPinned int32 - writePinned int32 - - // Buffers (pre-allocated to avoid allocation overhead) - readBufPool *sync.Pool - writeBufPool *sync.Pool -} - -type BatchAudioStats struct { - // int64 fields MUST be first for ARM32 alignment - BatchedReads int64 - SingleReads int64 - BatchedWrites int64 - SingleWrites int64 - BatchedFrames int64 - SingleFrames int64 - WriteFrames int64 - CGOCallsReduced int64 - OSThreadPinTime time.Duration // time.Duration is int64 internally - WriteThreadTime time.Duration // time.Duration is int64 internally - LastBatchTime time.Time - LastWriteTime time.Time -} - -type batchReadRequest struct { - buffer []byte - resultChan chan batchReadResult - timestamp time.Time -} - -type batchReadResult struct { - length int - err error -} - -type batchWriteRequest struct { - buffer []byte // Buffer for backward compatibility - opusData []byte // Opus encoded data for decode-write operations - pcmBuffer []byte // PCM buffer for decode-write operations - resultChan chan batchWriteResult - timestamp time.Time -} - -type batchWriteResult struct { - length int - err error -} - -// NewBatchAudioProcessor creates a new batch audio processor -func NewBatchAudioProcessor(batchSize int, batchDuration time.Duration) *BatchAudioProcessor { - // Validate input parameters with minimal overhead - if batchSize <= 0 || batchSize > 1000 { - batchSize = Config.BatchProcessorFramesPerBatch - } - if batchDuration <= 0 { - batchDuration = Config.BatchProcessingDelay - } - - // Use optimized queue sizes from configuration - queueSize := Config.BatchProcessorMaxQueueSize - if queueSize <= 0 { - queueSize = batchSize * 2 // Fallback to double batch size - } - - ctx, cancel := context.WithCancel(context.Background()) - // Pre-allocate logger to avoid repeated allocations - logger := logging.GetDefaultLogger().With().Str("component", "batch-audio").Logger() - - frameSize := Config.MinReadEncodeBuffer - if frameSize == 0 { - frameSize = 1500 // Safe fallback - } - - processor := &BatchAudioProcessor{ - ctx: ctx, - cancel: cancel, - logger: &logger, - batchSize: batchSize, - batchDuration: batchDuration, - readQueue: make(chan batchReadRequest, queueSize), - writeQueue: make(chan batchWriteRequest, queueSize), - readBufPool: &sync.Pool{ - New: func() interface{} { - return make([]byte, 0, frameSize) - }, - }, - writeBufPool: &sync.Pool{ - New: func() interface{} { - return make([]byte, 0, frameSize) - }, - }, - } - - return processor -} - -// Start initializes and starts the batch processor -func (bap *BatchAudioProcessor) Start() error { - if !atomic.CompareAndSwapInt32(&bap.running, 0, 1) { - return nil // Already running - } - - // Initialize CGO resources once per processor lifecycle - if !atomic.CompareAndSwapInt32(&bap.initialized, 0, 1) { - return nil // Already initialized - } - - // Start batch processing goroutines - go bap.batchReadProcessor() - go bap.batchWriteProcessor() - - bap.logger.Info().Int("batch_size", bap.batchSize). - Dur("batch_duration", bap.batchDuration). - Msg("batch audio processor started") - - return nil -} - -// Stop cleanly shuts down the batch processor -func (bap *BatchAudioProcessor) Stop() { - if !atomic.CompareAndSwapInt32(&bap.running, 1, 0) { - return // Already stopped - } - - bap.cancel() - - // Wait for processing to complete - time.Sleep(bap.batchDuration + Config.BatchProcessingDelay) - - bap.logger.Info().Msg("batch audio processor stopped") -} - -// BatchReadEncode performs batched audio read and encode operations -func (bap *BatchAudioProcessor) BatchReadEncode(buffer []byte) (int, error) { - // Validate buffer before processing - if err := ValidateBufferSize(len(buffer)); err != nil { - // Only log validation errors in debug mode to reduce overhead - if bap.logger.GetLevel() <= zerolog.DebugLevel { - bap.logger.Debug().Err(err).Msg("invalid buffer for batch processing") - } - return 0, err - } - - if !bap.IsRunning() { - // Fallback to single operation if batch processor is not running - // Use sampling to reduce atomic operations overhead - if atomic.LoadInt64(&bap.stats.SingleReads)%10 == 0 { - atomic.AddInt64(&bap.stats.SingleReads, 10) - atomic.AddInt64(&bap.stats.SingleFrames, 10) - } - return CGOAudioReadEncode(buffer) - } - - resultChan := make(chan batchReadResult, 1) - request := batchReadRequest{ - buffer: buffer, - resultChan: resultChan, - timestamp: time.Now(), - } - - // Try to queue the request with non-blocking send - select { - case bap.readQueue <- request: - // Successfully queued - default: - // Queue is full, fallback to single operation - // Use sampling to reduce atomic operations overhead - if atomic.LoadInt64(&bap.stats.SingleReads)%10 == 0 { - atomic.AddInt64(&bap.stats.SingleReads, 10) - atomic.AddInt64(&bap.stats.SingleFrames, 10) - } - return CGOAudioReadEncode(buffer) - } - - // Wait for result with timeout - select { - case result := <-resultChan: - return result.length, result.err - case <-time.After(Config.BatchProcessorTimeout): - // Timeout, fallback to single operation - // Use sampling to reduce atomic operations overhead - if atomic.LoadInt64(&bap.stats.SingleReads)%10 == 0 { - atomic.AddInt64(&bap.stats.SingleReads, 10) - atomic.AddInt64(&bap.stats.SingleFrames, 10) - } - return CGOAudioReadEncode(buffer) - } -} - -// BatchDecodeWrite performs batched audio decode and write operations -// This is the legacy version that uses a single buffer -func (bap *BatchAudioProcessor) BatchDecodeWrite(buffer []byte) (int, error) { - // Validate buffer before processing - if err := ValidateBufferSize(len(buffer)); err != nil { - // Only log validation errors in debug mode to reduce overhead - if bap.logger.GetLevel() <= zerolog.DebugLevel { - bap.logger.Debug().Err(err).Msg("invalid buffer for batch processing") - } - return 0, err - } - - if !bap.IsRunning() { - // Fallback to single operation if batch processor is not running - // Use sampling to reduce atomic operations overhead - if atomic.LoadInt64(&bap.stats.SingleWrites)%10 == 0 { - atomic.AddInt64(&bap.stats.SingleWrites, 10) - atomic.AddInt64(&bap.stats.WriteFrames, 10) - } - return CGOAudioDecodeWriteLegacy(buffer) - } - - resultChan := make(chan batchWriteResult, 1) - request := batchWriteRequest{ - buffer: buffer, - resultChan: resultChan, - timestamp: time.Now(), - } - - // Try to queue the request with non-blocking send - select { - case bap.writeQueue <- request: - // Successfully queued - default: - // Queue is full, fall back to single operation - // Use sampling to reduce atomic operations overhead - if atomic.LoadInt64(&bap.stats.SingleWrites)%10 == 0 { - atomic.AddInt64(&bap.stats.SingleWrites, 10) - atomic.AddInt64(&bap.stats.WriteFrames, 10) - } - return CGOAudioDecodeWriteLegacy(buffer) - } - - // Wait for result with timeout - select { - case result := <-resultChan: - return result.length, result.err - case <-time.After(Config.BatchProcessorTimeout): - // Use sampling to reduce atomic operations overhead - if atomic.LoadInt64(&bap.stats.SingleWrites)%10 == 0 { - atomic.AddInt64(&bap.stats.SingleWrites, 10) - atomic.AddInt64(&bap.stats.WriteFrames, 10) - } - return CGOAudioDecodeWriteLegacy(buffer) - } -} - -// BatchDecodeWriteWithBuffers performs batched audio decode and write operations with separate opus and PCM buffers -func (bap *BatchAudioProcessor) BatchDecodeWriteWithBuffers(opusData []byte, pcmBuffer []byte) (int, error) { - // Validate buffers before processing - if len(opusData) == 0 { - return 0, fmt.Errorf("empty opus data buffer") - } - if len(pcmBuffer) == 0 { - return 0, fmt.Errorf("empty PCM buffer") - } - - if !bap.IsRunning() { - // Fallback to single operation if batch processor is not running - atomic.AddInt64(&bap.stats.SingleWrites, 1) - atomic.AddInt64(&bap.stats.WriteFrames, 1) - // Use the optimized function with separate buffers - return CGOAudioDecodeWrite(opusData, pcmBuffer) - } - - resultChan := make(chan batchWriteResult, 1) - request := batchWriteRequest{ - opusData: opusData, - pcmBuffer: pcmBuffer, - resultChan: resultChan, - timestamp: time.Now(), - } - - // Try to queue the request with non-blocking send - select { - case bap.writeQueue <- request: - // Successfully queued - default: - // Queue is full, fall back to single operation - atomic.AddInt64(&bap.stats.SingleWrites, 1) - atomic.AddInt64(&bap.stats.WriteFrames, 1) - // Use the optimized function with separate buffers - return CGOAudioDecodeWrite(opusData, pcmBuffer) - } - - // Wait for result with timeout - select { - case result := <-resultChan: - return result.length, result.err - case <-time.After(Config.BatchProcessorTimeout): - atomic.AddInt64(&bap.stats.SingleWrites, 1) - atomic.AddInt64(&bap.stats.WriteFrames, 1) - // Use the optimized function with separate buffers - return CGOAudioDecodeWrite(opusData, pcmBuffer) - } -} - -// batchReadProcessor processes batched read operations -func (bap *BatchAudioProcessor) batchReadProcessor() { - defer bap.logger.Debug().Msg("batch read processor stopped") - - ticker := time.NewTicker(bap.batchDuration) - defer ticker.Stop() - - var batch []batchReadRequest - batch = make([]batchReadRequest, 0, bap.batchSize) - - for atomic.LoadInt32(&bap.running) == 1 { - select { - case <-bap.ctx.Done(): - return - - case req := <-bap.readQueue: - batch = append(batch, req) - if len(batch) >= bap.batchSize { - bap.processBatchRead(batch) - batch = batch[:0] // Clear slice but keep capacity - } - - case <-ticker.C: - if len(batch) > 0 { - bap.processBatchRead(batch) - batch = batch[:0] // Clear slice but keep capacity - } - } - } - - // Process any remaining requests - if len(batch) > 0 { - bap.processBatchRead(batch) - } -} - -// batchWriteProcessor processes batched write operations -func (bap *BatchAudioProcessor) batchWriteProcessor() { - defer bap.logger.Debug().Msg("batch write processor stopped") - - ticker := time.NewTicker(bap.batchDuration) - defer ticker.Stop() - - var batch []batchWriteRequest - batch = make([]batchWriteRequest, 0, bap.batchSize) - - for atomic.LoadInt32(&bap.running) == 1 { - select { - case <-bap.ctx.Done(): - return - - case req := <-bap.writeQueue: - batch = append(batch, req) - if len(batch) >= bap.batchSize { - bap.processBatchWrite(batch) - batch = batch[:0] // Clear slice but keep capacity - } - - case <-ticker.C: - if len(batch) > 0 { - bap.processBatchWrite(batch) - batch = batch[:0] // Clear slice but keep capacity - } - } - } - - // Process any remaining requests - if len(batch) > 0 { - bap.processBatchWrite(batch) - } -} - -// processBatchRead processes a batch of read requests efficiently -func (bap *BatchAudioProcessor) processBatchRead(batch []batchReadRequest) { - batchSize := len(batch) - if batchSize == 0 { - return - } - - threadPinningThreshold := Config.BatchProcessorThreadPinningThreshold - if threadPinningThreshold == 0 { - threadPinningThreshold = Config.MinBatchSizeForThreadPinning // Fallback - } - - // Only pin to OS thread for large batches to reduce thread contention - var start time.Time - threadWasPinned := false - if batchSize >= threadPinningThreshold && atomic.CompareAndSwapInt32(&bap.threadPinned, 0, 1) { - start = time.Now() - threadWasPinned = true - runtime.LockOSThread() - } - - // Batch stats updates to reduce atomic operations (update once per batch instead of per frame) - atomic.AddInt64(&bap.stats.BatchedReads, 1) - atomic.AddInt64(&bap.stats.BatchedFrames, int64(batchSize)) - if batchSize > 1 { - atomic.AddInt64(&bap.stats.CGOCallsReduced, int64(batchSize-1)) - } - - // Process each request in the batch with minimal overhead - for i := range batch { - req := &batch[i] - length, err := CGOAudioReadEncode(req.buffer) - - // Send result back (non-blocking) - reuse result struct - select { - case req.resultChan <- batchReadResult{length: length, err: err}: - default: - // Requestor timed out, drop result - } - } - - // Release thread lock if we pinned it - if threadWasPinned { - runtime.UnlockOSThread() - atomic.StoreInt32(&bap.threadPinned, 0) - bap.stats.OSThreadPinTime += time.Since(start) - } - - // Update timestamp only once per batch instead of per frame - bap.stats.LastBatchTime = time.Now() -} - -// processBatchWrite processes a batch of write requests efficiently -func (bap *BatchAudioProcessor) processBatchWrite(batch []batchWriteRequest) { - if len(batch) == 0 { - return - } - - threadPinningThreshold := Config.BatchProcessorThreadPinningThreshold - if threadPinningThreshold == 0 { - threadPinningThreshold = Config.MinBatchSizeForThreadPinning // Fallback - } - - // Only pin to OS thread for large batches to reduce thread contention - start := time.Now() - shouldPinThread := len(batch) >= threadPinningThreshold - - // Track if we pinned the thread in this call - threadWasPinned := false - - if shouldPinThread && atomic.CompareAndSwapInt32(&bap.writePinned, 0, 1) { - threadWasPinned = true - runtime.LockOSThread() - - // Priority scheduler not implemented - using default thread priority - } - - batchSize := len(batch) - atomic.AddInt64(&bap.stats.BatchedWrites, 1) - atomic.AddInt64(&bap.stats.WriteFrames, int64(batchSize)) - if batchSize > 1 { - atomic.AddInt64(&bap.stats.CGOCallsReduced, int64(batchSize-1)) - } - - // Add deferred function to release thread lock if we pinned it - if threadWasPinned { - defer func() { - // Priority scheduler not implemented - using default thread priority - runtime.UnlockOSThread() - atomic.StoreInt32(&bap.writePinned, 0) - bap.stats.WriteThreadTime += time.Since(start) - }() - } - - // Process each request in the batch - for _, req := range batch { - var length int - var err error - - // Handle both legacy and new decode-write operations - if req.opusData != nil && req.pcmBuffer != nil { - // New style with separate opus data and PCM buffer - length, err = CGOAudioDecodeWrite(req.opusData, req.pcmBuffer) - } else { - // Legacy style with single buffer - length, err = CGOAudioDecodeWriteLegacy(req.buffer) - } - - result := batchWriteResult{ - length: length, - err: err, - } - - // Send result back (non-blocking) - select { - case req.resultChan <- result: - default: - // Requestor timed out, drop result - } - } - - bap.stats.LastWriteTime = time.Now() -} - -// GetStats returns current batch processor statistics -func (bap *BatchAudioProcessor) GetStats() BatchAudioStats { - return BatchAudioStats{ - BatchedReads: atomic.LoadInt64(&bap.stats.BatchedReads), - SingleReads: atomic.LoadInt64(&bap.stats.SingleReads), - BatchedWrites: atomic.LoadInt64(&bap.stats.BatchedWrites), - SingleWrites: atomic.LoadInt64(&bap.stats.SingleWrites), - BatchedFrames: atomic.LoadInt64(&bap.stats.BatchedFrames), - SingleFrames: atomic.LoadInt64(&bap.stats.SingleFrames), - WriteFrames: atomic.LoadInt64(&bap.stats.WriteFrames), - CGOCallsReduced: atomic.LoadInt64(&bap.stats.CGOCallsReduced), - OSThreadPinTime: bap.stats.OSThreadPinTime, - WriteThreadTime: bap.stats.WriteThreadTime, - LastBatchTime: bap.stats.LastBatchTime, - LastWriteTime: bap.stats.LastWriteTime, - } -} - -// IsRunning returns whether the batch processor is running -func (bap *BatchAudioProcessor) IsRunning() bool { - return atomic.LoadInt32(&bap.running) == 1 -} - -// Global batch processor instance -var ( - globalBatchProcessor unsafe.Pointer // *BatchAudioProcessor - batchProcessorInitialized int32 -) - -// GetBatchAudioProcessor returns the global batch processor instance -func GetBatchAudioProcessor() *BatchAudioProcessor { - ptr := atomic.LoadPointer(&globalBatchProcessor) - if ptr != nil { - return (*BatchAudioProcessor)(ptr) - } - - // Initialize on first use - if atomic.CompareAndSwapInt32(&batchProcessorInitialized, 0, 1) { - processor := NewBatchAudioProcessor(Config.BatchProcessorFramesPerBatch, Config.BatchProcessorTimeout) - atomic.StorePointer(&globalBatchProcessor, unsafe.Pointer(processor)) - return processor - } - - // Another goroutine initialized it, try again - ptr = atomic.LoadPointer(&globalBatchProcessor) - if ptr != nil { - return (*BatchAudioProcessor)(ptr) - } - - // Fallback: create a new processor (should rarely happen) - return NewBatchAudioProcessor(Config.BatchProcessorFramesPerBatch, Config.BatchProcessorTimeout) -} - -// EnableBatchAudioProcessing enables the global batch processor -func EnableBatchAudioProcessing() error { - processor := GetBatchAudioProcessor() - return processor.Start() -} - -// DisableBatchAudioProcessing disables the global batch processor -func DisableBatchAudioProcessing() { - ptr := atomic.LoadPointer(&globalBatchProcessor) - if ptr != nil { - processor := (*BatchAudioProcessor)(ptr) - processor.Stop() - } -} - -// BatchCGOAudioReadEncode is a batched version of CGOAudioReadEncode -func BatchCGOAudioReadEncode(buffer []byte) (int, error) { - processor := GetBatchAudioProcessor() - if processor == nil || !processor.IsRunning() { - // Fall back to non-batched version if processor is not running - return CGOAudioReadEncode(buffer) - } - - return processor.BatchReadEncode(buffer) -} - -// BatchCGOAudioDecodeWrite is a batched version of CGOAudioDecodeWrite -func BatchCGOAudioDecodeWrite(buffer []byte) (int, error) { - processor := GetBatchAudioProcessor() - if processor == nil || !processor.IsRunning() { - // Fall back to non-batched version if processor is not running - return CGOAudioDecodeWriteLegacy(buffer) - } - - return processor.BatchDecodeWrite(buffer) -} - -// BatchCGOAudioDecodeWriteWithBuffers is a batched version of CGOAudioDecodeWrite that uses separate opus and PCM buffers -func BatchCGOAudioDecodeWriteWithBuffers(opusData []byte, pcmBuffer []byte) (int, error) { - processor := GetBatchAudioProcessor() - if processor == nil || !processor.IsRunning() { - // Fall back to non-batched version if processor is not running - return CGOAudioDecodeWrite(opusData, pcmBuffer) - } - - return processor.BatchDecodeWriteWithBuffers(opusData, pcmBuffer) -} diff --git a/internal/audio/batch_reference.go b/internal/audio/batch_reference.go deleted file mode 100644 index ecfa8d3a..00000000 --- a/internal/audio/batch_reference.go +++ /dev/null @@ -1,331 +0,0 @@ -//go:build cgo - -package audio - -import ( - "errors" - "sync" - "sync/atomic" - "unsafe" -) - -// BatchReferenceManager handles batch reference counting operations -// to reduce atomic operation overhead for high-frequency frame operations -type BatchReferenceManager struct { - // Batch operations queue - batchQueue chan batchRefOperation - workerPool chan struct{} // Worker pool semaphore - running int32 - wg sync.WaitGroup - - // Statistics - batchedOps int64 - singleOps int64 - batchSavings int64 // Number of atomic operations saved -} - -type batchRefOperation struct { - frames []*ZeroCopyAudioFrame - operation refOperationType - resultCh chan batchRefResult -} - -type refOperationType int - -const ( - refOpAddRef refOperationType = iota - refOpRelease - refOpMixed // For operations with mixed AddRef/Release -) - -// Errors -var ( - ErrUnsupportedOperation = errors.New("unsupported batch reference operation") -) - -type batchRefResult struct { - finalReleases []bool // For Release operations, indicates which frames had final release - err error -} - -// Global batch reference manager -var ( - globalBatchRefManager *BatchReferenceManager - batchRefOnce sync.Once -) - -// GetBatchReferenceManager returns the global batch reference manager -func GetBatchReferenceManager() *BatchReferenceManager { - batchRefOnce.Do(func() { - globalBatchRefManager = NewBatchReferenceManager() - globalBatchRefManager.Start() - }) - return globalBatchRefManager -} - -// NewBatchReferenceManager creates a new batch reference manager -func NewBatchReferenceManager() *BatchReferenceManager { - return &BatchReferenceManager{ - batchQueue: make(chan batchRefOperation, 256), // Buffered for high throughput - workerPool: make(chan struct{}, 4), // 4 workers for parallel processing - } -} - -// Start starts the batch reference manager workers -func (brm *BatchReferenceManager) Start() { - if !atomic.CompareAndSwapInt32(&brm.running, 0, 1) { - return // Already running - } - - // Start worker goroutines - for i := 0; i < cap(brm.workerPool); i++ { - brm.wg.Add(1) - go brm.worker() - } -} - -// Stop stops the batch reference manager -func (brm *BatchReferenceManager) Stop() { - if !atomic.CompareAndSwapInt32(&brm.running, 1, 0) { - return // Already stopped - } - - close(brm.batchQueue) - brm.wg.Wait() -} - -// worker processes batch reference operations -func (brm *BatchReferenceManager) worker() { - defer brm.wg.Done() - - for op := range brm.batchQueue { - brm.processBatchOperation(op) - } -} - -// processBatchOperation processes a batch of reference operations -func (brm *BatchReferenceManager) processBatchOperation(op batchRefOperation) { - result := batchRefResult{} - - switch op.operation { - case refOpAddRef: - // Batch AddRef operations - for _, frame := range op.frames { - if frame != nil { - atomic.AddInt32(&frame.refCount, 1) - } - } - atomic.AddInt64(&brm.batchedOps, int64(len(op.frames))) - atomic.AddInt64(&brm.batchSavings, int64(len(op.frames)-1)) // Saved ops vs individual calls - - case refOpRelease: - // Batch Release operations - result.finalReleases = make([]bool, len(op.frames)) - for i, frame := range op.frames { - if frame != nil { - newCount := atomic.AddInt32(&frame.refCount, -1) - if newCount == 0 { - result.finalReleases[i] = true - // Return to pool if pooled - if frame.pooled { - globalZeroCopyPool.Put(frame) - } - } - } - } - atomic.AddInt64(&brm.batchedOps, int64(len(op.frames))) - atomic.AddInt64(&brm.batchSavings, int64(len(op.frames)-1)) - - case refOpMixed: - // Handle mixed operations (not implemented in this version) - result.err = ErrUnsupportedOperation - } - - // Send result back - if op.resultCh != nil { - op.resultCh <- result - close(op.resultCh) - } -} - -// BatchAddRef performs AddRef on multiple frames in a single batch -func (brm *BatchReferenceManager) BatchAddRef(frames []*ZeroCopyAudioFrame) error { - if len(frames) == 0 { - return nil - } - - // For small batches, use direct operations to avoid overhead - if len(frames) <= 2 { - for _, frame := range frames { - if frame != nil { - frame.AddRef() - } - } - atomic.AddInt64(&brm.singleOps, int64(len(frames))) - return nil - } - - // Use batch processing for larger sets - if atomic.LoadInt32(&brm.running) == 0 { - // Fallback to individual operations if batch manager not running - for _, frame := range frames { - if frame != nil { - frame.AddRef() - } - } - atomic.AddInt64(&brm.singleOps, int64(len(frames))) - return nil - } - - resultCh := make(chan batchRefResult, 1) - op := batchRefOperation{ - frames: frames, - operation: refOpAddRef, - resultCh: resultCh, - } - - select { - case brm.batchQueue <- op: - // Wait for completion - <-resultCh - return nil - default: - // Queue full, fallback to individual operations - for _, frame := range frames { - if frame != nil { - frame.AddRef() - } - } - atomic.AddInt64(&brm.singleOps, int64(len(frames))) - return nil - } -} - -// BatchRelease performs Release on multiple frames in a single batch -// Returns a slice indicating which frames had their final reference released -func (brm *BatchReferenceManager) BatchRelease(frames []*ZeroCopyAudioFrame) ([]bool, error) { - if len(frames) == 0 { - return nil, nil - } - - // For small batches, use direct operations - if len(frames) <= 2 { - finalReleases := make([]bool, len(frames)) - for i, frame := range frames { - if frame != nil { - finalReleases[i] = frame.Release() - } - } - atomic.AddInt64(&brm.singleOps, int64(len(frames))) - return finalReleases, nil - } - - // Use batch processing for larger sets - if atomic.LoadInt32(&brm.running) == 0 { - // Fallback to individual operations - finalReleases := make([]bool, len(frames)) - for i, frame := range frames { - if frame != nil { - finalReleases[i] = frame.Release() - } - } - atomic.AddInt64(&brm.singleOps, int64(len(frames))) - return finalReleases, nil - } - - resultCh := make(chan batchRefResult, 1) - op := batchRefOperation{ - frames: frames, - operation: refOpRelease, - resultCh: resultCh, - } - - select { - case brm.batchQueue <- op: - // Wait for completion - result := <-resultCh - return result.finalReleases, result.err - default: - // Queue full, fallback to individual operations - finalReleases := make([]bool, len(frames)) - for i, frame := range frames { - if frame != nil { - finalReleases[i] = frame.Release() - } - } - atomic.AddInt64(&brm.singleOps, int64(len(frames))) - return finalReleases, nil - } -} - -// GetStats returns batch reference counting statistics -func (brm *BatchReferenceManager) GetStats() (batchedOps, singleOps, savings int64) { - return atomic.LoadInt64(&brm.batchedOps), - atomic.LoadInt64(&brm.singleOps), - atomic.LoadInt64(&brm.batchSavings) -} - -// Convenience functions for global batch reference manager - -// BatchAddRefFrames performs batch AddRef on multiple frames -func BatchAddRefFrames(frames []*ZeroCopyAudioFrame) error { - return GetBatchReferenceManager().BatchAddRef(frames) -} - -// BatchReleaseFrames performs batch Release on multiple frames -func BatchReleaseFrames(frames []*ZeroCopyAudioFrame) ([]bool, error) { - return GetBatchReferenceManager().BatchRelease(frames) -} - -// GetBatchReferenceStats returns global batch reference statistics -func GetBatchReferenceStats() (batchedOps, singleOps, savings int64) { - return GetBatchReferenceManager().GetStats() -} - -// ZeroCopyFrameSlice provides utilities for working with slices of zero-copy frames -type ZeroCopyFrameSlice []*ZeroCopyAudioFrame - -// AddRefAll performs batch AddRef on all frames in the slice -func (zfs ZeroCopyFrameSlice) AddRefAll() error { - return BatchAddRefFrames(zfs) -} - -// ReleaseAll performs batch Release on all frames in the slice -func (zfs ZeroCopyFrameSlice) ReleaseAll() ([]bool, error) { - return BatchReleaseFrames(zfs) -} - -// FilterNonNil returns a new slice with only non-nil frames -func (zfs ZeroCopyFrameSlice) FilterNonNil() ZeroCopyFrameSlice { - filtered := make(ZeroCopyFrameSlice, 0, len(zfs)) - for _, frame := range zfs { - if frame != nil { - filtered = append(filtered, frame) - } - } - return filtered -} - -// Len returns the number of frames in the slice -func (zfs ZeroCopyFrameSlice) Len() int { - return len(zfs) -} - -// Get returns the frame at the specified index -func (zfs ZeroCopyFrameSlice) Get(index int) *ZeroCopyAudioFrame { - if index < 0 || index >= len(zfs) { - return nil - } - return zfs[index] -} - -// UnsafePointers returns unsafe pointers for all frames (for CGO batch operations) -func (zfs ZeroCopyFrameSlice) UnsafePointers() []unsafe.Pointer { - pointers := make([]unsafe.Pointer, len(zfs)) - for i, frame := range zfs { - if frame != nil { - pointers[i] = frame.UnsafePointer() - } - } - return pointers -} diff --git a/internal/audio/batch_zero_copy.go b/internal/audio/batch_zero_copy.go deleted file mode 100644 index 8d066521..00000000 --- a/internal/audio/batch_zero_copy.go +++ /dev/null @@ -1,415 +0,0 @@ -//go:build cgo - -package audio - -import ( - "sync" - "sync/atomic" - "time" -) - -// BatchZeroCopyProcessor handles batch operations on zero-copy audio frames -// with optimized reference counting and memory management -type BatchZeroCopyProcessor struct { - // Configuration - maxBatchSize int - batchTimeout time.Duration - processingDelay time.Duration - adaptiveThreshold float64 - - // Processing queues - readEncodeQueue chan *batchZeroCopyRequest - decodeWriteQueue chan *batchZeroCopyRequest - - // Worker management - workerPool chan struct{} - running int32 - wg sync.WaitGroup - - // Statistics - batchedFrames int64 - singleFrames int64 - batchSavings int64 - processingTimeUs int64 - adaptiveHits int64 - adaptiveMisses int64 -} - -type batchZeroCopyRequest struct { - frames []*ZeroCopyAudioFrame - operation batchZeroCopyOperation - resultCh chan batchZeroCopyResult - timestamp time.Time -} - -type batchZeroCopyOperation int - -const ( - batchOpReadEncode batchZeroCopyOperation = iota - batchOpDecodeWrite - batchOpMixed -) - -type batchZeroCopyResult struct { - encodedData [][]byte // For read-encode operations - processedCount int // Number of successfully processed frames - err error -} - -// Global batch zero-copy processor -var ( - globalBatchZeroCopyProcessor *BatchZeroCopyProcessor - batchZeroCopyOnce sync.Once -) - -// GetBatchZeroCopyProcessor returns the global batch zero-copy processor -func GetBatchZeroCopyProcessor() *BatchZeroCopyProcessor { - batchZeroCopyOnce.Do(func() { - globalBatchZeroCopyProcessor = NewBatchZeroCopyProcessor() - globalBatchZeroCopyProcessor.Start() - }) - return globalBatchZeroCopyProcessor -} - -// NewBatchZeroCopyProcessor creates a new batch zero-copy processor -func NewBatchZeroCopyProcessor() *BatchZeroCopyProcessor { - cache := Config - return &BatchZeroCopyProcessor{ - maxBatchSize: cache.BatchProcessorFramesPerBatch, - batchTimeout: cache.BatchProcessorTimeout, - processingDelay: cache.BatchProcessingDelay, - adaptiveThreshold: cache.BatchProcessorAdaptiveThreshold, - readEncodeQueue: make(chan *batchZeroCopyRequest, cache.BatchProcessorMaxQueueSize), - decodeWriteQueue: make(chan *batchZeroCopyRequest, cache.BatchProcessorMaxQueueSize), - workerPool: make(chan struct{}, 4), // 4 workers for parallel processing - } -} - -// Start starts the batch zero-copy processor workers -func (bzcp *BatchZeroCopyProcessor) Start() { - if !atomic.CompareAndSwapInt32(&bzcp.running, 0, 1) { - return // Already running - } - - // Start worker goroutines for read-encode operations - for i := 0; i < cap(bzcp.workerPool)/2; i++ { - bzcp.wg.Add(1) - go bzcp.readEncodeWorker() - } - - // Start worker goroutines for decode-write operations - for i := 0; i < cap(bzcp.workerPool)/2; i++ { - bzcp.wg.Add(1) - go bzcp.decodeWriteWorker() - } -} - -// Stop stops the batch zero-copy processor -func (bzcp *BatchZeroCopyProcessor) Stop() { - if !atomic.CompareAndSwapInt32(&bzcp.running, 1, 0) { - return // Already stopped - } - - close(bzcp.readEncodeQueue) - close(bzcp.decodeWriteQueue) - bzcp.wg.Wait() -} - -// readEncodeWorker processes batch read-encode operations -func (bzcp *BatchZeroCopyProcessor) readEncodeWorker() { - defer bzcp.wg.Done() - - for req := range bzcp.readEncodeQueue { - bzcp.processBatchReadEncode(req) - } -} - -// decodeWriteWorker processes batch decode-write operations -func (bzcp *BatchZeroCopyProcessor) decodeWriteWorker() { - defer bzcp.wg.Done() - - for req := range bzcp.decodeWriteQueue { - bzcp.processBatchDecodeWrite(req) - } -} - -// processBatchReadEncode processes a batch of read-encode operations -func (bzcp *BatchZeroCopyProcessor) processBatchReadEncode(req *batchZeroCopyRequest) { - startTime := time.Now() - result := batchZeroCopyResult{} - - // Batch AddRef all frames first - err := BatchAddRefFrames(req.frames) - if err != nil { - result.err = err - if req.resultCh != nil { - req.resultCh <- result - close(req.resultCh) - } - return - } - - // Process frames using existing batch read-encode logic - encodedData, err := BatchReadEncode(len(req.frames)) - if err != nil { - // Batch release frames on error - if _, releaseErr := BatchReleaseFrames(req.frames); releaseErr != nil { - // Log release error but preserve original error - _ = releaseErr - } - result.err = err - } else { - result.encodedData = encodedData - result.processedCount = len(encodedData) - // Batch release frames after successful processing - if _, releaseErr := BatchReleaseFrames(req.frames); releaseErr != nil { - // Log release error but don't fail the operation - _ = releaseErr - } - } - - // Update statistics - atomic.AddInt64(&bzcp.batchedFrames, int64(len(req.frames))) - atomic.AddInt64(&bzcp.batchSavings, int64(len(req.frames)-1)) - atomic.AddInt64(&bzcp.processingTimeUs, time.Since(startTime).Microseconds()) - - // Send result back - if req.resultCh != nil { - req.resultCh <- result - close(req.resultCh) - } -} - -// processBatchDecodeWrite processes a batch of decode-write operations -func (bzcp *BatchZeroCopyProcessor) processBatchDecodeWrite(req *batchZeroCopyRequest) { - startTime := time.Now() - result := batchZeroCopyResult{} - - // Batch AddRef all frames first - err := BatchAddRefFrames(req.frames) - if err != nil { - result.err = err - if req.resultCh != nil { - req.resultCh <- result - close(req.resultCh) - } - return - } - - // Extract data from zero-copy frames for batch processing - frameData := make([][]byte, len(req.frames)) - for i, frame := range req.frames { - if frame != nil { - // Get data from zero-copy frame - frameData[i] = frame.Data()[:frame.Length()] - } - } - - // Process frames using existing batch decode-write logic - err = BatchDecodeWrite(frameData) - if err != nil { - result.err = err - } else { - result.processedCount = len(req.frames) - } - - // Batch release frames - if _, releaseErr := BatchReleaseFrames(req.frames); releaseErr != nil { - // Log release error but don't override processing error - _ = releaseErr - } - - // Update statistics - atomic.AddInt64(&bzcp.batchedFrames, int64(len(req.frames))) - atomic.AddInt64(&bzcp.batchSavings, int64(len(req.frames)-1)) - atomic.AddInt64(&bzcp.processingTimeUs, time.Since(startTime).Microseconds()) - - // Send result back - if req.resultCh != nil { - req.resultCh <- result - close(req.resultCh) - } -} - -// BatchReadEncodeZeroCopy performs batch read-encode on zero-copy frames -func (bzcp *BatchZeroCopyProcessor) BatchReadEncodeZeroCopy(frames []*ZeroCopyAudioFrame) ([][]byte, error) { - if len(frames) == 0 { - return nil, nil - } - - // For small batches, use direct operations to avoid overhead - if len(frames) <= 2 { - atomic.AddInt64(&bzcp.singleFrames, int64(len(frames))) - return bzcp.processSingleReadEncode(frames) - } - - // Use adaptive threshold to determine batch vs single processing - batchedFrames := atomic.LoadInt64(&bzcp.batchedFrames) - singleFrames := atomic.LoadInt64(&bzcp.singleFrames) - totalFrames := batchedFrames + singleFrames - - if totalFrames > 100 { // Only apply adaptive logic after some samples - batchRatio := float64(batchedFrames) / float64(totalFrames) - if batchRatio < bzcp.adaptiveThreshold { - // Batch processing not effective, use single processing - atomic.AddInt64(&bzcp.adaptiveMisses, 1) - atomic.AddInt64(&bzcp.singleFrames, int64(len(frames))) - return bzcp.processSingleReadEncode(frames) - } - atomic.AddInt64(&bzcp.adaptiveHits, 1) - } - - // Use batch processing - if atomic.LoadInt32(&bzcp.running) == 0 { - // Fallback to single processing if batch processor not running - atomic.AddInt64(&bzcp.singleFrames, int64(len(frames))) - return bzcp.processSingleReadEncode(frames) - } - - resultCh := make(chan batchZeroCopyResult, 1) - req := &batchZeroCopyRequest{ - frames: frames, - operation: batchOpReadEncode, - resultCh: resultCh, - timestamp: time.Now(), - } - - select { - case bzcp.readEncodeQueue <- req: - // Wait for completion - result := <-resultCh - return result.encodedData, result.err - default: - // Queue full, fallback to single processing - atomic.AddInt64(&bzcp.singleFrames, int64(len(frames))) - return bzcp.processSingleReadEncode(frames) - } -} - -// BatchDecodeWriteZeroCopy performs batch decode-write on zero-copy frames -func (bzcp *BatchZeroCopyProcessor) BatchDecodeWriteZeroCopy(frames []*ZeroCopyAudioFrame) error { - if len(frames) == 0 { - return nil - } - - // For small batches, use direct operations - if len(frames) <= 2 { - atomic.AddInt64(&bzcp.singleFrames, int64(len(frames))) - return bzcp.processSingleDecodeWrite(frames) - } - - // Use adaptive threshold - batchedFrames := atomic.LoadInt64(&bzcp.batchedFrames) - singleFrames := atomic.LoadInt64(&bzcp.singleFrames) - totalFrames := batchedFrames + singleFrames - - if totalFrames > 100 { - batchRatio := float64(batchedFrames) / float64(totalFrames) - if batchRatio < bzcp.adaptiveThreshold { - atomic.AddInt64(&bzcp.adaptiveMisses, 1) - atomic.AddInt64(&bzcp.singleFrames, int64(len(frames))) - return bzcp.processSingleDecodeWrite(frames) - } - atomic.AddInt64(&bzcp.adaptiveHits, 1) - } - - // Use batch processing - if atomic.LoadInt32(&bzcp.running) == 0 { - atomic.AddInt64(&bzcp.singleFrames, int64(len(frames))) - return bzcp.processSingleDecodeWrite(frames) - } - - resultCh := make(chan batchZeroCopyResult, 1) - req := &batchZeroCopyRequest{ - frames: frames, - operation: batchOpDecodeWrite, - resultCh: resultCh, - timestamp: time.Now(), - } - - select { - case bzcp.decodeWriteQueue <- req: - // Wait for completion - result := <-resultCh - return result.err - default: - // Queue full, fallback to single processing - atomic.AddInt64(&bzcp.singleFrames, int64(len(frames))) - return bzcp.processSingleDecodeWrite(frames) - } -} - -// processSingleReadEncode processes frames individually for read-encode -func (bzcp *BatchZeroCopyProcessor) processSingleReadEncode(frames []*ZeroCopyAudioFrame) ([][]byte, error) { - // Extract data and use existing batch processing - frameData := make([][]byte, 0, len(frames)) - for _, frame := range frames { - if frame != nil { - frame.AddRef() - frameData = append(frameData, frame.Data()[:frame.Length()]) - } - } - - // Use existing batch read-encode - result, err := BatchReadEncode(len(frameData)) - - // Release frames - for _, frame := range frames { - if frame != nil { - frame.Release() - } - } - - return result, err -} - -// processSingleDecodeWrite processes frames individually for decode-write -func (bzcp *BatchZeroCopyProcessor) processSingleDecodeWrite(frames []*ZeroCopyAudioFrame) error { - // Extract data and use existing batch processing - frameData := make([][]byte, 0, len(frames)) - for _, frame := range frames { - if frame != nil { - frame.AddRef() - frameData = append(frameData, frame.Data()[:frame.Length()]) - } - } - - // Use existing batch decode-write - err := BatchDecodeWrite(frameData) - - // Release frames - for _, frame := range frames { - if frame != nil { - frame.Release() - } - } - - return err -} - -// GetBatchZeroCopyStats returns batch zero-copy processing statistics -func (bzcp *BatchZeroCopyProcessor) GetBatchZeroCopyStats() (batchedFrames, singleFrames, savings, processingTimeUs, adaptiveHits, adaptiveMisses int64) { - return atomic.LoadInt64(&bzcp.batchedFrames), - atomic.LoadInt64(&bzcp.singleFrames), - atomic.LoadInt64(&bzcp.batchSavings), - atomic.LoadInt64(&bzcp.processingTimeUs), - atomic.LoadInt64(&bzcp.adaptiveHits), - atomic.LoadInt64(&bzcp.adaptiveMisses) -} - -// Convenience functions for global batch zero-copy processor - -// BatchReadEncodeZeroCopyFrames performs batch read-encode on zero-copy frames -func BatchReadEncodeZeroCopyFrames(frames []*ZeroCopyAudioFrame) ([][]byte, error) { - return GetBatchZeroCopyProcessor().BatchReadEncodeZeroCopy(frames) -} - -// BatchDecodeWriteZeroCopyFrames performs batch decode-write on zero-copy frames -func BatchDecodeWriteZeroCopyFrames(frames []*ZeroCopyAudioFrame) error { - return GetBatchZeroCopyProcessor().BatchDecodeWriteZeroCopy(frames) -} - -// GetGlobalBatchZeroCopyStats returns global batch zero-copy processing statistics -func GetGlobalBatchZeroCopyStats() (batchedFrames, singleFrames, savings, processingTimeUs, adaptiveHits, adaptiveMisses int64) { - return GetBatchZeroCopyProcessor().GetBatchZeroCopyStats() -} diff --git a/internal/audio/c/audio.c b/internal/audio/c/audio.c index 43709028..203e41d5 100644 --- a/internal/audio/c/audio.c +++ b/internal/audio/c/audio.c @@ -554,7 +554,6 @@ retry_write: return pcm_frames; } - // Safe playback cleanup with double-close protection void jetkvm_audio_playback_close() { // Wait for any ongoing operations to complete diff --git a/internal/audio/cgo_audio.go b/internal/audio/cgo_audio.go index f726b684..857f7c22 100644 --- a/internal/audio/cgo_audio.go +++ b/internal/audio/cgo_audio.go @@ -161,16 +161,6 @@ type AudioConfigCache struct { inputProcessingTimeoutMS atomic.Int32 maxRestartAttempts atomic.Int32 - // Batch processing related values - BatchProcessingTimeout time.Duration - BatchProcessorFramesPerBatch int - BatchProcessorTimeout time.Duration - BatchProcessingDelay time.Duration - MinBatchSizeForThreadPinning int - BatchProcessorMaxQueueSize int - BatchProcessorAdaptiveThreshold float64 - BatchProcessorThreadPinningThreshold int - // Mutex for updating the cache mutex sync.RWMutex lastUpdate time.Time @@ -234,16 +224,6 @@ func (c *AudioConfigCache) Update() { c.minOpusBitrate.Store(int32(Config.MinOpusBitrate)) c.maxOpusBitrate.Store(int32(Config.MaxOpusBitrate)) - // Update batch processing related values - c.BatchProcessingTimeout = 100 * time.Millisecond // Fixed timeout for batch processing - c.BatchProcessorFramesPerBatch = Config.BatchProcessorFramesPerBatch - c.BatchProcessorTimeout = Config.BatchProcessorTimeout - c.BatchProcessingDelay = Config.BatchProcessingDelay - c.MinBatchSizeForThreadPinning = Config.MinBatchSizeForThreadPinning - c.BatchProcessorMaxQueueSize = Config.BatchProcessorMaxQueueSize - c.BatchProcessorAdaptiveThreshold = Config.BatchProcessorAdaptiveThreshold - c.BatchProcessorThreadPinningThreshold = Config.BatchProcessorThreadPinningThreshold - // Pre-allocate common errors c.bufferTooSmallReadEncode = newBufferTooSmallError(0, Config.MinReadEncodeBuffer) c.bufferTooLargeDecodeWrite = newBufferTooLargeError(Config.MaxDecodeWriteBuffer+1, Config.MaxDecodeWriteBuffer) @@ -251,6 +231,9 @@ func (c *AudioConfigCache) Update() { c.lastUpdate = time.Now() c.initialized.Store(true) + c.lastUpdate = time.Now() + c.initialized.Store(true) + // Update the global validation cache as well if cachedMaxFrameSize != 0 { cachedMaxFrameSize = Config.MaxAudioFrameSize @@ -388,7 +371,9 @@ func updateOpusEncoderParams(bitrate, complexity, vbr, vbrConstraint, signalType // Buffer pool for reusing buffers in CGO functions var ( - // Using SizedBufferPool for better memory management + // Simple buffer pool for PCM data + pcmBufferPool = NewAudioBufferPool(Config.MaxPCMBufferSize) + // Track buffer pool usage cgoBufferPoolGets atomic.Int64 cgoBufferPoolPuts atomic.Int64 @@ -402,13 +387,14 @@ var ( // GetBufferFromPool gets a buffer from the pool with at least the specified capacity func GetBufferFromPool(minCapacity int) []byte { cgoBufferPoolGets.Add(1) - return GetOptimalBuffer(minCapacity) + // Use simple fixed-size buffer for PCM data + return pcmBufferPool.Get() } // ReturnBufferToPool returns a buffer to the pool func ReturnBufferToPool(buf []byte) { cgoBufferPoolPuts.Add(1) - ReturnOptimalBuffer(buf) + pcmBufferPool.Put(buf) } // ReadEncodeWithPooledBuffer reads audio data and encodes it using a buffer from the pool @@ -451,125 +437,6 @@ func DecodeWriteWithPooledBuffer(data []byte) (int, error) { return CGOAudioDecodeWrite(data, pcmBuffer) } -// BatchReadEncode reads and encodes multiple audio frames in a single batch -// with optimized zero-copy frame management and batch reference counting -func BatchReadEncode(batchSize int) ([][]byte, error) { - // Simple batch processing without complex overhead - frames := make([][]byte, 0, batchSize) - frameSize := 4096 // Fixed frame size for performance - - for i := 0; i < batchSize; i++ { - buf := make([]byte, frameSize) - n, err := cgoAudioReadEncode(buf) - if err != nil { - if i > 0 { - return frames, nil // Return partial batch - } - return nil, err - } - if n > 0 { - frames = append(frames, buf[:n]) - } - } - - return frames, nil -} - -// BatchDecodeWrite decodes and writes multiple audio frames in a single batch -// This reduces CGO call overhead by processing multiple frames at once -// with optimized zero-copy frame management and batch reference counting -func BatchDecodeWrite(frames [][]byte) error { - // Validate input - if len(frames) == 0 { - return nil - } - - // Convert to zero-copy frames for optimized processing - zeroCopyFrames := make([]*ZeroCopyAudioFrame, 0, len(frames)) - for _, frameData := range frames { - if len(frameData) > 0 { - frame := GetZeroCopyFrame() - frame.SetDataDirect(frameData) // Direct assignment without copy - zeroCopyFrames = append(zeroCopyFrames, frame) - } - } - - // Use batch reference counting for efficient management - if len(zeroCopyFrames) > 0 { - // Batch AddRef all frames at once - err := BatchAddRefFrames(zeroCopyFrames) - if err != nil { - return err - } - // Ensure cleanup with batch release - defer func() { - if _, err := BatchReleaseFrames(zeroCopyFrames); err != nil { - // Log release error but don't fail the operation - _ = err - } - }() - } - - // Get cached config - cache := GetCachedConfig() - // Only update cache if expired - avoid unnecessary overhead - // Use proper locking to avoid race condition - if cache.initialized.Load() { - cache.mutex.RLock() - cacheExpired := time.Since(cache.lastUpdate) > cache.cacheExpiry - cache.mutex.RUnlock() - if cacheExpired { - cache.Update() - } - } else { - cache.Update() - } - - // Track batch processing statistics - only if enabled - var startTime time.Time - // Batch time tracking removed - trackTime := false - if trackTime { - startTime = time.Now() - } - batchProcessingCount.Add(1) - - // Get a PCM buffer from the pool for optimized decode-write - pcmBuffer := GetBufferFromPool(cache.GetMaxPCMBufferSize()) - defer ReturnBufferToPool(pcmBuffer) - - // Process each zero-copy frame with optimized batch processing - frameCount := 0 - for _, zcFrame := range zeroCopyFrames { - // Get frame data from zero-copy frame - frameData := zcFrame.Data()[:zcFrame.Length()] - if len(frameData) == 0 { - continue - } - - // Process this frame using optimized implementation - _, err := CGOAudioDecodeWrite(frameData, pcmBuffer) - if err != nil { - // Update statistics before returning error - batchFrameCount.Add(int64(frameCount)) - if trackTime { - batchProcessingTime.Add(time.Since(startTime).Microseconds()) - } - return err - } - - frameCount++ - } - - // Update statistics - batchFrameCount.Add(int64(frameCount)) - if trackTime { - batchProcessingTime.Add(time.Since(startTime).Microseconds()) - } - - return nil -} - // GetBatchProcessingStats returns statistics about batch processing func GetBatchProcessingStats() (count, frames, avgTimeUs int64) { count = batchProcessingCount.Load() diff --git a/internal/audio/core_config_constants.go b/internal/audio/core_config_constants.go index 8996a1d1..a388a33a 100644 --- a/internal/audio/core_config_constants.go +++ b/internal/audio/core_config_constants.go @@ -202,13 +202,6 @@ type AudioConfigConstants struct { CGOPCMBufferSize int // PCM buffer size for CGO audio processing CGONanosecondsPerSecond float64 // Nanoseconds per second conversion - // Batch Processing Constants - BatchProcessorFramesPerBatch int // Frames processed per batch (4) - BatchProcessorTimeout time.Duration // Batch processing timeout (5ms) - BatchProcessorMaxQueueSize int // Maximum batch queue size (16) - BatchProcessorAdaptiveThreshold float64 // Adaptive batch sizing threshold (0.8) - BatchProcessorThreadPinningThreshold int // Thread pinning threshold (8 frames) - // Output Streaming Constants OutputStreamingFrameIntervalMS int // Output frame interval (20ms for 50 FPS) @@ -523,12 +516,6 @@ func DefaultAudioConfig() *AudioConfigConstants { CGOPCMBufferSize: 1920, // 1920 samples for PCM buffer (max 2ch*960) CGONanosecondsPerSecond: 1000000000.0, // 1000000000.0 for nanosecond conversions - // Batch Processing Constants - Optimized for quality change bursts - BatchProcessorFramesPerBatch: 16, // Larger batches for quality changes - BatchProcessorTimeout: 20 * time.Millisecond, // Longer timeout for bursts - BatchProcessorMaxQueueSize: 64, // Larger queue for quality changes - BatchProcessorThreadPinningThreshold: 8, // Lower threshold for better performance - // Output Streaming Constants - Balanced for stability OutputStreamingFrameIntervalMS: 20, // 20ms frame interval (50 FPS) for stability diff --git a/internal/audio/core_metrics.go b/internal/audio/core_metrics.go index 02aa924d..ab71ab88 100644 --- a/internal/audio/core_metrics.go +++ b/internal/audio/core_metrics.go @@ -2,7 +2,6 @@ package audio import ( "runtime" - "sync" "sync/atomic" "time" @@ -11,31 +10,6 @@ import ( ) var ( - // Socket buffer metrics - socketBufferSizeGauge = promauto.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "jetkvm_audio_socket_buffer_size_bytes", - Help: "Current socket buffer size in bytes", - }, - []string{"component", "buffer_type"}, // buffer_type: send, receive - ) - - socketBufferUtilizationGauge = promauto.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "jetkvm_audio_socket_buffer_utilization_percent", - Help: "Socket buffer utilization percentage", - }, - []string{"component", "buffer_type"}, // buffer_type: send, receive - ) - - socketBufferOverflowCounter = promauto.NewCounterVec( - prometheus.CounterOpts{ - Name: "jetkvm_audio_socket_buffer_overflow_total", - Help: "Total number of socket buffer overflows", - }, - []string{"component", "buffer_type"}, // buffer_type: send, receive - ) - // Audio output metrics audioFramesReceivedTotal = promauto.NewCounter( prometheus.CounterOpts{ @@ -122,10 +96,7 @@ var ( }, ) - // Device health metrics - // Removed device health metrics - functionality not used - - // Memory metrics + // Memory metrics (basic monitoring) memoryHeapAllocBytes = promauto.NewGauge( prometheus.GaugeOpts{ Name: "jetkvm_audio_memory_heap_alloc_bytes", @@ -133,20 +104,6 @@ var ( }, ) - memoryHeapSysBytes = promauto.NewGauge( - prometheus.GaugeOpts{ - Name: "jetkvm_audio_memory_heap_sys_bytes", - Help: "Total heap system memory in bytes", - }, - ) - - memoryHeapObjects = promauto.NewGauge( - prometheus.GaugeOpts{ - Name: "jetkvm_audio_memory_heap_objects", - Help: "Number of heap objects", - }, - ) - memoryGCCount = promauto.NewCounter( prometheus.CounterOpts{ Name: "jetkvm_audio_memory_gc_total", @@ -154,74 +111,8 @@ var ( }, ) - memoryGCCPUFraction = promauto.NewGauge( - prometheus.GaugeOpts{ - Name: "jetkvm_audio_memory_gc_cpu_fraction", - Help: "Fraction of CPU time spent in garbage collection", - }, - ) - - // Buffer pool efficiency metrics - bufferPoolHitRate = promauto.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "jetkvm_audio_buffer_pool_hit_rate_percent", - Help: "Buffer pool hit rate percentage", - }, - []string{"pool_name"}, // pool_name: frame_pool, control_pool, zero_copy_pool - ) - - bufferPoolMissRate = promauto.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "jetkvm_audio_buffer_pool_miss_rate_percent", - Help: "Buffer pool miss rate percentage", - }, - []string{"pool_name"}, // pool_name: frame_pool, control_pool, zero_copy_pool - ) - - bufferPoolUtilization = promauto.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "jetkvm_audio_buffer_pool_utilization_percent", - Help: "Buffer pool utilization percentage", - }, - []string{"pool_name"}, // pool_name: frame_pool, control_pool, zero_copy_pool - ) - - bufferPoolThroughput = promauto.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "jetkvm_audio_buffer_pool_throughput_ops_per_sec", - Help: "Buffer pool throughput in operations per second", - }, - []string{"pool_name"}, // pool_name: frame_pool, control_pool, zero_copy_pool - ) - - bufferPoolGetLatency = promauto.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "jetkvm_audio_buffer_pool_get_latency_seconds", - Help: "Average buffer pool get operation latency in seconds", - }, - []string{"pool_name"}, // pool_name: frame_pool, control_pool, zero_copy_pool - ) - - bufferPoolPutLatency = promauto.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "jetkvm_audio_buffer_pool_put_latency_seconds", - Help: "Average buffer pool put operation latency in seconds", - }, - []string{"pool_name"}, // pool_name: frame_pool, control_pool, zero_copy_pool - ) - - // Latency percentile metrics - latencyPercentile = promauto.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "jetkvm_audio_latency_percentile_milliseconds", - Help: "Audio latency percentiles in milliseconds", - }, - []string{"source", "percentile"}, // source: input, output, processing; percentile: p50, p95, p99, min, max, avg - ) - // Metrics update tracking - metricsUpdateMutex sync.RWMutex - lastMetricsUpdate int64 + lastMetricsUpdate int64 // Counter value tracking (since prometheus counters don't have Get() method) audioFramesReceivedValue uint64 @@ -233,8 +124,6 @@ var ( micBytesProcessedValue uint64 micConnectionDropsValue uint64 - // Atomic counters for device health metrics - functionality removed, no longer used - // Atomic counter for memory GC memoryGCCountValue uint32 ) @@ -338,32 +227,12 @@ func UpdateMicrophoneMetrics(metrics UnifiedAudioMetrics) { atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix()) } -// UpdateSocketBufferMetrics updates socket buffer metrics -func UpdateSocketBufferMetrics(component, bufferType string, size, utilization float64, overflowOccurred bool) { - metricsUpdateMutex.Lock() - defer metricsUpdateMutex.Unlock() - - socketBufferSizeGauge.WithLabelValues(component, bufferType).Set(size) - socketBufferUtilizationGauge.WithLabelValues(component, bufferType).Set(utilization) - - if overflowOccurred { - socketBufferOverflowCounter.WithLabelValues(component, bufferType).Inc() - } - - atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix()) -} - -// UpdateDeviceHealthMetrics - Placeholder for future device health metrics - -// UpdateMemoryMetrics updates memory metrics +// UpdateMemoryMetrics updates basic memory metrics func UpdateMemoryMetrics() { var m runtime.MemStats runtime.ReadMemStats(&m) memoryHeapAllocBytes.Set(float64(m.HeapAlloc)) - memoryHeapSysBytes.Set(float64(m.HeapSys)) - memoryHeapObjects.Set(float64(m.HeapObjects)) - memoryGCCPUFraction.Set(m.GCCPUFraction) // Update GC count with delta calculation currentGCCount := uint32(m.NumGC) @@ -375,31 +244,6 @@ func UpdateMemoryMetrics() { atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix()) } -// UpdateBufferPoolMetrics updates buffer pool efficiency metrics -func UpdateBufferPoolMetrics(poolName string, hitRate, missRate, utilization, throughput, getLatency, putLatency float64) { - metricsUpdateMutex.Lock() - defer metricsUpdateMutex.Unlock() - - bufferPoolHitRate.WithLabelValues(poolName).Set(hitRate * 100) - bufferPoolMissRate.WithLabelValues(poolName).Set(missRate * 100) - bufferPoolUtilization.WithLabelValues(poolName).Set(utilization * 100) - bufferPoolThroughput.WithLabelValues(poolName).Set(throughput) - bufferPoolGetLatency.WithLabelValues(poolName).Set(getLatency) - bufferPoolPutLatency.WithLabelValues(poolName).Set(putLatency) - - atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix()) -} - -// UpdateLatencyMetrics updates latency percentile metrics -func UpdateLatencyMetrics(source, percentile string, latencyMilliseconds float64) { - metricsUpdateMutex.Lock() - defer metricsUpdateMutex.Unlock() - - latencyPercentile.WithLabelValues(source, percentile).Set(latencyMilliseconds) - - atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix()) -} - // GetLastMetricsUpdate returns the timestamp of the last metrics update func GetLastMetricsUpdate() time.Time { timestamp := atomic.LoadInt64(&lastMetricsUpdate) diff --git a/internal/audio/goroutine_pool.go b/internal/audio/goroutine_pool.go deleted file mode 100644 index 4f954d19..00000000 --- a/internal/audio/goroutine_pool.go +++ /dev/null @@ -1,329 +0,0 @@ -package audio - -import ( - "sync" - "sync/atomic" - "time" - - "github.com/jetkvm/kvm/internal/logging" - "github.com/rs/zerolog" -) - -// Task represents a function to be executed by a worker in the pool -type Task func() - -// GoroutinePool manages a pool of reusable goroutines to reduce the overhead -// of goroutine creation and destruction -type GoroutinePool struct { - // Atomic fields must be first for proper alignment on 32-bit systems - taskCount int64 // Number of tasks processed - workerCount int64 // Current number of workers - maxIdleTime time.Duration - maxWorkers int - taskQueue chan Task - workerSem chan struct{} // Semaphore to limit concurrent workers - shutdown chan struct{} - shutdownOnce sync.Once - wg sync.WaitGroup - logger *zerolog.Logger - name string -} - -// NewGoroutinePool creates a new goroutine pool with the specified parameters -func NewGoroutinePool(name string, maxWorkers int, queueSize int, maxIdleTime time.Duration) *GoroutinePool { - logger := logging.GetDefaultLogger().With().Str("component", "goroutine-pool").Str("pool", name).Logger() - - pool := &GoroutinePool{ - maxWorkers: maxWorkers, - maxIdleTime: maxIdleTime, - taskQueue: make(chan Task, queueSize), - workerSem: make(chan struct{}, maxWorkers), - shutdown: make(chan struct{}), - logger: &logger, - name: name, - } - - // Start a supervisor goroutine to monitor pool health - go pool.supervisor() - - return pool -} - -// Submit adds a task to the pool for execution -// Returns true if the task was accepted, false if the queue is full -func (p *GoroutinePool) Submit(task Task) bool { - select { - case <-p.shutdown: - return false // Pool is shutting down - case p.taskQueue <- task: - // Task accepted, ensure we have a worker to process it - p.ensureWorkerAvailable() - return true - default: - // Queue is full - return false - } -} - -// SubmitWithBackpressure adds a task to the pool with backpressure handling -// Returns true if task was accepted, false if dropped due to backpressure -func (p *GoroutinePool) SubmitWithBackpressure(task Task) bool { - select { - case <-p.shutdown: - return false // Pool is shutting down - case p.taskQueue <- task: - // Task accepted, ensure we have a worker to process it - p.ensureWorkerAvailable() - return true - default: - // Queue is full - apply backpressure - // Check if we're in a high-load situation - queueLen := len(p.taskQueue) - queueCap := cap(p.taskQueue) - workerCount := atomic.LoadInt64(&p.workerCount) - - // If queue is >90% full and we're at max workers, drop the task - if queueLen > int(float64(queueCap)*0.9) && workerCount >= int64(p.maxWorkers) { - p.logger.Warn().Int("queue_len", queueLen).Int("queue_cap", queueCap).Msg("Dropping task due to backpressure") - return false - } - - // Try one more time with a short timeout - select { - case p.taskQueue <- task: - p.ensureWorkerAvailable() - return true - case <-time.After(1 * time.Millisecond): - // Still can't submit after timeout - drop task - p.logger.Debug().Msg("Task dropped after backpressure timeout") - return false - } - } -} - -// ensureWorkerAvailable makes sure at least one worker is available to process tasks -func (p *GoroutinePool) ensureWorkerAvailable() { - // Check if we already have enough workers - currentWorkers := atomic.LoadInt64(&p.workerCount) - - // Only start new workers if: - // 1. We have no workers at all, or - // 2. The queue is growing and we're below max workers - queueLen := len(p.taskQueue) - if currentWorkers == 0 || (queueLen > int(currentWorkers) && currentWorkers < int64(p.maxWorkers)) { - // Try to acquire a semaphore slot without blocking - select { - case p.workerSem <- struct{}{}: - // We got a slot, start a new worker - p.startWorker() - default: - // All worker slots are taken, which means we have enough workers - } - } -} - -// startWorker launches a new worker goroutine -func (p *GoroutinePool) startWorker() { - p.wg.Add(1) - atomic.AddInt64(&p.workerCount, 1) - - go func() { - defer func() { - atomic.AddInt64(&p.workerCount, -1) - <-p.workerSem // Release the semaphore slot - p.wg.Done() - - // Recover from panics in worker tasks - if r := recover(); r != nil { - p.logger.Error().Interface("panic", r).Msg("Worker recovered from panic") - } - }() - - idleTimer := time.NewTimer(p.maxIdleTime) - defer idleTimer.Stop() - - for { - select { - case <-p.shutdown: - return - case task, ok := <-p.taskQueue: - if !ok { - return // Channel closed - } - - // Reset idle timer - if !idleTimer.Stop() { - <-idleTimer.C - } - idleTimer.Reset(p.maxIdleTime) - - // Execute the task with panic recovery - func() { - defer func() { - if r := recover(); r != nil { - p.logger.Error().Interface("panic", r).Msg("Task execution panic recovered") - } - }() - task() - }() - - atomic.AddInt64(&p.taskCount, 1) - case <-idleTimer.C: - // Worker has been idle for too long - // Keep at least 2 workers alive to handle incoming tasks without creating new goroutines - if atomic.LoadInt64(&p.workerCount) > 2 { - return - } - // For persistent workers (the minimum 2), use a longer idle timeout - // This prevents excessive worker creation/destruction cycles - idleTimer.Reset(p.maxIdleTime * 3) // Triple the idle time for persistent workers - } - } - }() -} - -// supervisor monitors the pool and logs statistics periodically -func (p *GoroutinePool) supervisor() { - ticker := time.NewTicker(30 * time.Second) - defer ticker.Stop() - - for { - select { - case <-p.shutdown: - return - case <-ticker.C: - workers := atomic.LoadInt64(&p.workerCount) - tasks := atomic.LoadInt64(&p.taskCount) - queueLen := len(p.taskQueue) - - p.logger.Debug(). - Int64("workers", workers). - Int64("tasks_processed", tasks). - Int("queue_length", queueLen). - Msg("Pool statistics") - } - } -} - -// Shutdown gracefully shuts down the pool -// If wait is true, it will wait for all tasks to complete -// If wait is false, it will terminate immediately, potentially leaving tasks unprocessed -func (p *GoroutinePool) Shutdown(wait bool) { - p.shutdownOnce.Do(func() { - close(p.shutdown) - - if wait { - // Wait for all tasks to be processed - if len(p.taskQueue) > 0 { - p.logger.Debug().Int("remaining_tasks", len(p.taskQueue)).Msg("Waiting for tasks to complete") - } - - // Close the task queue to signal no more tasks - close(p.taskQueue) - - // Wait for all workers to finish - p.wg.Wait() - } - }) -} - -// GetStats returns statistics about the pool -func (p *GoroutinePool) GetStats() map[string]interface{} { - return map[string]interface{}{ - "name": p.name, - "worker_count": atomic.LoadInt64(&p.workerCount), - "max_workers": p.maxWorkers, - "tasks_processed": atomic.LoadInt64(&p.taskCount), - "queue_length": len(p.taskQueue), - "queue_capacity": cap(p.taskQueue), - } -} - -// Global pools for different audio processing tasks -var ( - globalAudioProcessorPool atomic.Pointer[GoroutinePool] - globalAudioReaderPool atomic.Pointer[GoroutinePool] - globalAudioProcessorInitOnce sync.Once - globalAudioReaderInitOnce sync.Once -) - -// GetAudioProcessorPool returns the global audio processor pool -func GetAudioProcessorPool() *GoroutinePool { - pool := globalAudioProcessorPool.Load() - if pool != nil { - return pool - } - - globalAudioProcessorInitOnce.Do(func() { - config := Config - newPool := NewGoroutinePool( - "audio-processor", - config.MaxAudioProcessorWorkers, - config.AudioProcessorQueueSize, - config.WorkerMaxIdleTime, - ) - globalAudioProcessorPool.Store(newPool) - pool = newPool - }) - - return globalAudioProcessorPool.Load() -} - -// GetAudioReaderPool returns the global audio reader pool -func GetAudioReaderPool() *GoroutinePool { - pool := globalAudioReaderPool.Load() - if pool != nil { - return pool - } - - globalAudioReaderInitOnce.Do(func() { - config := Config - newPool := NewGoroutinePool( - "audio-reader", - config.MaxAudioReaderWorkers, - config.AudioReaderQueueSize, - config.WorkerMaxIdleTime, - ) - globalAudioReaderPool.Store(newPool) - pool = newPool - }) - - return globalAudioReaderPool.Load() -} - -// SubmitAudioProcessorTask submits a task to the audio processor pool -func SubmitAudioProcessorTask(task Task) bool { - return GetAudioProcessorPool().Submit(task) -} - -// SubmitAudioReaderTask submits a task to the audio reader pool -func SubmitAudioReaderTask(task Task) bool { - return GetAudioReaderPool().Submit(task) -} - -// SubmitAudioProcessorTaskWithBackpressure submits a task with backpressure handling -func SubmitAudioProcessorTaskWithBackpressure(task Task) bool { - return GetAudioProcessorPool().SubmitWithBackpressure(task) -} - -// SubmitAudioReaderTaskWithBackpressure submits a task with backpressure handling -func SubmitAudioReaderTaskWithBackpressure(task Task) bool { - return GetAudioReaderPool().SubmitWithBackpressure(task) -} - -// ShutdownAudioPools shuts down all audio goroutine pools -func ShutdownAudioPools(wait bool) { - logger := logging.GetDefaultLogger().With().Str("component", "audio-pools").Logger() - - processorPool := globalAudioProcessorPool.Load() - if processorPool != nil { - logger.Info().Msg("Shutting down audio processor pool") - processorPool.Shutdown(wait) - } - - readerPool := globalAudioReaderPool.Load() - if readerPool != nil { - logger.Info().Msg("Shutting down audio reader pool") - readerPool.Shutdown(wait) - } -} diff --git a/internal/audio/ipc_input.go b/internal/audio/ipc_input.go index 0a27940c..ec69d21c 100644 --- a/internal/audio/ipc_input.go +++ b/internal/audio/ipc_input.go @@ -256,11 +256,8 @@ func (ais *AudioInputServer) Start() error { ais.startProcessorGoroutine() ais.startMonitorGoroutine() - // Submit the connection acceptor to the audio reader pool - if !SubmitAudioReaderTask(ais.acceptConnections) { - // If the pool is full or shutting down, fall back to direct goroutine creation - go ais.acceptConnections() - } + // Submit the connection acceptor directly + go ais.acceptConnections() return nil } @@ -335,10 +332,8 @@ func (ais *AudioInputServer) acceptConnections() { ais.mtx.Unlock() // Handle this connection using the goroutine pool - if !SubmitAudioReaderTask(func() { ais.handleConnection(conn) }) { - // If the pool is full or shutting down, fall back to direct goroutine creation - go ais.handleConnection(conn) - } + // Handle the connection directly + go ais.handleConnection(conn) } } @@ -981,17 +976,8 @@ func (ais *AudioInputServer) startReaderGoroutine() { } } - // Submit the reader task to the audio reader pool with backpressure - logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger() - if !SubmitAudioReaderTaskWithBackpressure(readerTask) { - // Task was dropped due to backpressure - this is expected under high load - // Log at debug level to avoid spam, but track the drop - logger.Debug().Msg("Audio reader task dropped due to backpressure") - - // Don't fall back to unlimited goroutine creation - // Instead, let the system recover naturally - ais.wg.Done() // Decrement the wait group since we're not starting the task - } + // Handle the reader task directly + go readerTask() } // startProcessorGoroutine starts the message processor using the goroutine pool @@ -1073,17 +1059,8 @@ func (ais *AudioInputServer) startProcessorGoroutine() { } } - // Submit the processor task to the audio processor pool with backpressure - logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger() - if !SubmitAudioProcessorTaskWithBackpressure(processorTask) { - // Task was dropped due to backpressure - this is expected under high load - // Log at debug level to avoid spam, but track the drop - logger.Debug().Msg("Audio processor task dropped due to backpressure") - - // Don't fall back to unlimited goroutine creation - // Instead, let the system recover naturally - ais.wg.Done() // Decrement the wait group since we're not starting the task - } + // Submit the processor task directly + go processorTask() } // processMessageWithRecovery processes a message with enhanced error recovery @@ -1206,17 +1183,8 @@ func (ais *AudioInputServer) startMonitorGoroutine() { } } - // Submit the monitor task to the audio processor pool with backpressure - logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger() - if !SubmitAudioProcessorTaskWithBackpressure(monitorTask) { - // Task was dropped due to backpressure - this is expected under high load - // Log at debug level to avoid spam, but track the drop - logger.Debug().Msg("Audio monitor task dropped due to backpressure") - - // Don't fall back to unlimited goroutine creation - // Instead, let the system recover naturally - ais.wg.Done() // Decrement the wait group since we're not starting the task - } + // Submit the monitor task directly + go monitorTask() } // GetServerStats returns server performance statistics diff --git a/internal/audio/sized_buffer_pool.go b/internal/audio/sized_buffer_pool.go index 2abdca7c..62f00179 100644 --- a/internal/audio/sized_buffer_pool.go +++ b/internal/audio/sized_buffer_pool.go @@ -2,216 +2,52 @@ package audio import ( "sync" - "sync/atomic" ) -// SizedBufferPool manages a pool of buffers with size tracking -type SizedBufferPool struct { - // The underlying sync.Pool +// SimpleBufferPool manages a pool of fixed-size buffers +// Analysis shows 99% of requests are for maxPCMBufferSize, so we simplify to fixed-size +type SimpleBufferPool struct { pool sync.Pool - - // Statistics for monitoring - totalBuffers atomic.Int64 - totalBytes atomic.Int64 - gets atomic.Int64 - puts atomic.Int64 - misses atomic.Int64 - - // Configuration - maxBufferSize int - defaultSize int } -// NewSizedBufferPool creates a new sized buffer pool -func NewSizedBufferPool(defaultSize, maxBufferSize int) *SizedBufferPool { - pool := &SizedBufferPool{ - maxBufferSize: maxBufferSize, - defaultSize: defaultSize, - } - - pool.pool = sync.Pool{ - New: func() interface{} { - // Track pool misses - pool.misses.Add(1) - - // Create new buffer with default size - buf := make([]byte, defaultSize) - - // Return pointer-like to avoid allocations - slice := buf[:0] - ptrSlice := &slice - - // Track statistics - pool.totalBuffers.Add(1) - pool.totalBytes.Add(int64(cap(buf))) - - return ptrSlice +// NewSimpleBufferPool creates a new simple buffer pool for fixed-size buffers +func NewSimpleBufferPool(bufferSize int) *SimpleBufferPool { + return &SimpleBufferPool{ + pool: sync.Pool{ + New: func() interface{} { + buf := make([]byte, 0, bufferSize) + return &buf + }, }, } - - return pool } -// Get returns a buffer from the pool with at least the specified capacity -func (p *SizedBufferPool) Get(minCapacity int) []byte { - // Track gets - p.gets.Add(1) - - // Get buffer from pool - handle pointer-like storage - var buf []byte +// Get returns a buffer from the pool +func (p *SimpleBufferPool) Get() []byte { poolObj := p.pool.Get() switch v := poolObj.(type) { case *[]byte: - // Handle pointer-like storage from Put method if v != nil { - buf = (*v)[:0] // Get the underlying slice - } else { - buf = make([]byte, 0, p.defaultSize) + buf := *v + return buf[:0] // Reset length but keep capacity } case []byte: - // Handle direct slice for backward compatibility - buf = v - default: - // Fallback for unexpected types - buf = make([]byte, 0, p.defaultSize) - p.misses.Add(1) + return v[:0] // Handle direct slice for backward compatibility } - - // Check if buffer has sufficient capacity - if cap(buf) < minCapacity { - // Track statistics for the old buffer - p.totalBytes.Add(-int64(cap(buf))) - - // Allocate new buffer with required capacity - buf = make([]byte, minCapacity) - - // Track statistics for the new buffer - p.totalBytes.Add(int64(cap(buf))) - } else { - // Resize existing buffer - buf = buf[:minCapacity] - } - - return buf + // Fallback for unexpected types or nil + return make([]byte, 0) // Will be resized by caller if needed } // Put returns a buffer to the pool -func (p *SizedBufferPool) Put(buf []byte) { - // Track statistics - p.puts.Add(1) - - // Don't pool excessively large buffers to prevent memory bloat - if cap(buf) > p.maxBufferSize { - // Track statistics - p.totalBuffers.Add(-1) - p.totalBytes.Add(-int64(cap(buf))) +func (p *SimpleBufferPool) Put(buf []byte) { + if buf == nil { return } - - // Clear buffer contents for security - for i := range buf { - buf[i] = 0 - } - - // Return to pool - use pointer-like approach to avoid allocations - slice := buf[:0] - p.pool.Put(&slice) + // Clear and reset the buffer + buf = buf[:0] + // Use pointer to avoid allocations as recommended by staticcheck + p.pool.Put(&buf) } -// GetStats returns statistics about the buffer pool -func (p *SizedBufferPool) GetStats() (buffers, bytes, gets, puts, misses int64) { - buffers = p.totalBuffers.Load() - bytes = p.totalBytes.Load() - gets = p.gets.Load() - puts = p.puts.Load() - misses = p.misses.Load() - return -} - -// BufferPoolStats contains statistics about a buffer pool -type BufferPoolStats struct { - TotalBuffers int64 - TotalBytes int64 - Gets int64 - Puts int64 - Misses int64 - HitRate float64 - AverageBufferSize float64 -} - -// GetDetailedStats returns detailed statistics about the buffer pool -func (p *SizedBufferPool) GetDetailedStats() BufferPoolStats { - buffers := p.totalBuffers.Load() - bytes := p.totalBytes.Load() - gets := p.gets.Load() - puts := p.puts.Load() - misses := p.misses.Load() - - // Calculate hit rate - hitRate := 0.0 - if gets > 0 { - hitRate = float64(gets-misses) / float64(gets) * 100.0 - } - - // Calculate average buffer size - avgSize := 0.0 - if buffers > 0 { - avgSize = float64(bytes) / float64(buffers) - } - - return BufferPoolStats{ - TotalBuffers: buffers, - TotalBytes: bytes, - Gets: gets, - Puts: puts, - Misses: misses, - HitRate: hitRate, - AverageBufferSize: avgSize, - } -} - -// Global audio buffer pools with different size classes -var ( - // Small buffers (up to 4KB) - smallBufferPool = NewSizedBufferPool(1024, 4*1024) - - // Medium buffers (4KB to 64KB) - mediumBufferPool = NewSizedBufferPool(8*1024, 64*1024) - - // Large buffers (64KB to 1MB) - largeBufferPool = NewSizedBufferPool(64*1024, 1024*1024) -) - -// GetOptimalBuffer returns a buffer from the most appropriate pool based on size -func GetOptimalBuffer(size int) []byte { - switch { - case size <= 4*1024: - return smallBufferPool.Get(size) - case size <= 64*1024: - return mediumBufferPool.Get(size) - default: - return largeBufferPool.Get(size) - } -} - -// ReturnOptimalBuffer returns a buffer to the appropriate pool based on size -func ReturnOptimalBuffer(buf []byte) { - size := cap(buf) - switch { - case size <= 4*1024: - smallBufferPool.Put(buf) - case size <= 64*1024: - mediumBufferPool.Put(buf) - default: - largeBufferPool.Put(buf) - } -} - -// GetAllPoolStats returns statistics for all buffer pools -func GetAllPoolStats() map[string]BufferPoolStats { - return map[string]BufferPoolStats{ - "small": smallBufferPool.GetDetailedStats(), - "medium": mediumBufferPool.GetDetailedStats(), - "large": largeBufferPool.GetDetailedStats(), - } -} +// Global simple buffer pool - sized for maxPCMBufferSize since that's 99% of usage +var GlobalBufferPool *SimpleBufferPool diff --git a/internal/audio/socket_buffer.go b/internal/audio/socket_buffer.go index a6f7e48d..15c861f7 100644 --- a/internal/audio/socket_buffer.go +++ b/internal/audio/socket_buffer.go @@ -156,23 +156,12 @@ func RecordSocketBufferMetrics(conn net.Conn, component string) { } // Get current socket buffer sizes - sendSize, recvSize, err := GetSocketBufferSizes(conn) + _, _, err := GetSocketBufferSizes(conn) if err != nil { // Log error but don't fail return } - // Record buffer sizes - socketBufferSizeGauge.WithLabelValues(component, "send").Set(float64(sendSize)) - socketBufferSizeGauge.WithLabelValues(component, "receive").Set(float64(recvSize)) -} - -// RecordSocketBufferOverflow records a socket buffer overflow event -func RecordSocketBufferOverflow(component, bufferType string) { - socketBufferOverflowCounter.WithLabelValues(component, bufferType).Inc() -} - -// UpdateSocketBufferUtilization updates socket buffer utilization metrics -func UpdateSocketBufferUtilization(component, bufferType string, utilizationPercent float64) { - socketBufferUtilizationGauge.WithLabelValues(component, bufferType).Set(utilizationPercent) + // Socket buffer sizes recorded for debugging if needed + // Removed detailed metrics as they weren't being used } diff --git a/main.go b/main.go index 0a7516ec..3e380e5a 100644 --- a/main.go +++ b/main.go @@ -35,11 +35,6 @@ func startAudioSubprocess() error { // Initialize validation cache for optimal performance audio.InitValidationCache() - // Enable batch audio processing to reduce CGO call overhead - if err := audio.EnableBatchAudioProcessing(); err != nil { - logger.Warn().Err(err).Msg("failed to enable batch audio processing") - } - // Create audio server supervisor audioSupervisor = audio.NewAudioOutputSupervisor() @@ -108,9 +103,6 @@ func startAudioSubprocess() error { // Stop audio relay when process exits audio.StopAudioRelay() - - // Disable batch audio processing - audio.DisableBatchAudioProcessing() }, // onRestart func(attempt int, delay time.Duration) {