From df58e04846351833e5cc67e040e96ec80781d153 Mon Sep 17 00:00:00 2001 From: Alex P Date: Mon, 8 Sep 2025 09:08:07 +0000 Subject: [PATCH] feat(audio): implement zero-copy batch processing with reference counting Add batch reference counting and zero-copy frame management for optimized audio processing. Includes: - BatchReferenceManager for efficient reference counting - ZeroCopyFrameSlice utilities for frame management - BatchZeroCopyProcessor for high-performance batch operations - Adaptive optimization interval based on stability metrics - Improved memory management with zero-copy frames --- internal/audio/batch_reference.go | 331 +++++++++++++++ internal/audio/batch_zero_copy.go | 415 +++++++++++++++++++ internal/audio/cgo_audio.go | 82 +++- internal/audio/goroutine_pool.go | 4 +- internal/audio/ipc_input.go | 22 +- internal/audio/monitor_adaptive_optimizer.go | 157 ++++++- internal/audio/util_buffer_pool.go | 16 +- 7 files changed, 976 insertions(+), 51 deletions(-) create mode 100644 internal/audio/batch_reference.go create mode 100644 internal/audio/batch_zero_copy.go diff --git a/internal/audio/batch_reference.go b/internal/audio/batch_reference.go new file mode 100644 index 00000000..ecfa8d3a --- /dev/null +++ b/internal/audio/batch_reference.go @@ -0,0 +1,331 @@ +//go:build cgo + +package audio + +import ( + "errors" + "sync" + "sync/atomic" + "unsafe" +) + +// BatchReferenceManager handles batch reference counting operations +// to reduce atomic operation overhead for high-frequency frame operations +type BatchReferenceManager struct { + // Batch operations queue + batchQueue chan batchRefOperation + workerPool chan struct{} // Worker pool semaphore + running int32 + wg sync.WaitGroup + + // Statistics + batchedOps int64 + singleOps int64 + batchSavings int64 // Number of atomic operations saved +} + +type batchRefOperation struct { + frames []*ZeroCopyAudioFrame + operation refOperationType + resultCh chan batchRefResult +} + +type refOperationType int + +const ( + refOpAddRef refOperationType = iota + refOpRelease + refOpMixed // For operations with mixed AddRef/Release +) + +// Errors +var ( + ErrUnsupportedOperation = errors.New("unsupported batch reference operation") +) + +type batchRefResult struct { + finalReleases []bool // For Release operations, indicates which frames had final release + err error +} + +// Global batch reference manager +var ( + globalBatchRefManager *BatchReferenceManager + batchRefOnce sync.Once +) + +// GetBatchReferenceManager returns the global batch reference manager +func GetBatchReferenceManager() *BatchReferenceManager { + batchRefOnce.Do(func() { + globalBatchRefManager = NewBatchReferenceManager() + globalBatchRefManager.Start() + }) + return globalBatchRefManager +} + +// NewBatchReferenceManager creates a new batch reference manager +func NewBatchReferenceManager() *BatchReferenceManager { + return &BatchReferenceManager{ + batchQueue: make(chan batchRefOperation, 256), // Buffered for high throughput + workerPool: make(chan struct{}, 4), // 4 workers for parallel processing + } +} + +// Start starts the batch reference manager workers +func (brm *BatchReferenceManager) Start() { + if !atomic.CompareAndSwapInt32(&brm.running, 0, 1) { + return // Already running + } + + // Start worker goroutines + for i := 0; i < cap(brm.workerPool); i++ { + brm.wg.Add(1) + go brm.worker() + } +} + +// Stop stops the batch reference manager +func (brm *BatchReferenceManager) Stop() { + if !atomic.CompareAndSwapInt32(&brm.running, 1, 0) { + return // 
Already stopped + } + + close(brm.batchQueue) + brm.wg.Wait() +} + +// worker processes batch reference operations +func (brm *BatchReferenceManager) worker() { + defer brm.wg.Done() + + for op := range brm.batchQueue { + brm.processBatchOperation(op) + } +} + +// processBatchOperation processes a batch of reference operations +func (brm *BatchReferenceManager) processBatchOperation(op batchRefOperation) { + result := batchRefResult{} + + switch op.operation { + case refOpAddRef: + // Batch AddRef operations + for _, frame := range op.frames { + if frame != nil { + atomic.AddInt32(&frame.refCount, 1) + } + } + atomic.AddInt64(&brm.batchedOps, int64(len(op.frames))) + atomic.AddInt64(&brm.batchSavings, int64(len(op.frames)-1)) // Saved ops vs individual calls + + case refOpRelease: + // Batch Release operations + result.finalReleases = make([]bool, len(op.frames)) + for i, frame := range op.frames { + if frame != nil { + newCount := atomic.AddInt32(&frame.refCount, -1) + if newCount == 0 { + result.finalReleases[i] = true + // Return to pool if pooled + if frame.pooled { + globalZeroCopyPool.Put(frame) + } + } + } + } + atomic.AddInt64(&brm.batchedOps, int64(len(op.frames))) + atomic.AddInt64(&brm.batchSavings, int64(len(op.frames)-1)) + + case refOpMixed: + // Handle mixed operations (not implemented in this version) + result.err = ErrUnsupportedOperation + } + + // Send result back + if op.resultCh != nil { + op.resultCh <- result + close(op.resultCh) + } +} + +// BatchAddRef performs AddRef on multiple frames in a single batch +func (brm *BatchReferenceManager) BatchAddRef(frames []*ZeroCopyAudioFrame) error { + if len(frames) == 0 { + return nil + } + + // For small batches, use direct operations to avoid overhead + if len(frames) <= 2 { + for _, frame := range frames { + if frame != nil { + frame.AddRef() + } + } + atomic.AddInt64(&brm.singleOps, int64(len(frames))) + return nil + } + + // Use batch processing for larger sets + if atomic.LoadInt32(&brm.running) == 0 { + // Fallback to individual operations if batch manager not running + for _, frame := range frames { + if frame != nil { + frame.AddRef() + } + } + atomic.AddInt64(&brm.singleOps, int64(len(frames))) + return nil + } + + resultCh := make(chan batchRefResult, 1) + op := batchRefOperation{ + frames: frames, + operation: refOpAddRef, + resultCh: resultCh, + } + + select { + case brm.batchQueue <- op: + // Wait for completion + <-resultCh + return nil + default: + // Queue full, fallback to individual operations + for _, frame := range frames { + if frame != nil { + frame.AddRef() + } + } + atomic.AddInt64(&brm.singleOps, int64(len(frames))) + return nil + } +} + +// BatchRelease performs Release on multiple frames in a single batch +// Returns a slice indicating which frames had their final reference released +func (brm *BatchReferenceManager) BatchRelease(frames []*ZeroCopyAudioFrame) ([]bool, error) { + if len(frames) == 0 { + return nil, nil + } + + // For small batches, use direct operations + if len(frames) <= 2 { + finalReleases := make([]bool, len(frames)) + for i, frame := range frames { + if frame != nil { + finalReleases[i] = frame.Release() + } + } + atomic.AddInt64(&brm.singleOps, int64(len(frames))) + return finalReleases, nil + } + + // Use batch processing for larger sets + if atomic.LoadInt32(&brm.running) == 0 { + // Fallback to individual operations + finalReleases := make([]bool, len(frames)) + for i, frame := range frames { + if frame != nil { + finalReleases[i] = frame.Release() + } + } + 
atomic.AddInt64(&brm.singleOps, int64(len(frames))) + return finalReleases, nil + } + + resultCh := make(chan batchRefResult, 1) + op := batchRefOperation{ + frames: frames, + operation: refOpRelease, + resultCh: resultCh, + } + + select { + case brm.batchQueue <- op: + // Wait for completion + result := <-resultCh + return result.finalReleases, result.err + default: + // Queue full, fallback to individual operations + finalReleases := make([]bool, len(frames)) + for i, frame := range frames { + if frame != nil { + finalReleases[i] = frame.Release() + } + } + atomic.AddInt64(&brm.singleOps, int64(len(frames))) + return finalReleases, nil + } +} + +// GetStats returns batch reference counting statistics +func (brm *BatchReferenceManager) GetStats() (batchedOps, singleOps, savings int64) { + return atomic.LoadInt64(&brm.batchedOps), + atomic.LoadInt64(&brm.singleOps), + atomic.LoadInt64(&brm.batchSavings) +} + +// Convenience functions for global batch reference manager + +// BatchAddRefFrames performs batch AddRef on multiple frames +func BatchAddRefFrames(frames []*ZeroCopyAudioFrame) error { + return GetBatchReferenceManager().BatchAddRef(frames) +} + +// BatchReleaseFrames performs batch Release on multiple frames +func BatchReleaseFrames(frames []*ZeroCopyAudioFrame) ([]bool, error) { + return GetBatchReferenceManager().BatchRelease(frames) +} + +// GetBatchReferenceStats returns global batch reference statistics +func GetBatchReferenceStats() (batchedOps, singleOps, savings int64) { + return GetBatchReferenceManager().GetStats() +} + +// ZeroCopyFrameSlice provides utilities for working with slices of zero-copy frames +type ZeroCopyFrameSlice []*ZeroCopyAudioFrame + +// AddRefAll performs batch AddRef on all frames in the slice +func (zfs ZeroCopyFrameSlice) AddRefAll() error { + return BatchAddRefFrames(zfs) +} + +// ReleaseAll performs batch Release on all frames in the slice +func (zfs ZeroCopyFrameSlice) ReleaseAll() ([]bool, error) { + return BatchReleaseFrames(zfs) +} + +// FilterNonNil returns a new slice with only non-nil frames +func (zfs ZeroCopyFrameSlice) FilterNonNil() ZeroCopyFrameSlice { + filtered := make(ZeroCopyFrameSlice, 0, len(zfs)) + for _, frame := range zfs { + if frame != nil { + filtered = append(filtered, frame) + } + } + return filtered +} + +// Len returns the number of frames in the slice +func (zfs ZeroCopyFrameSlice) Len() int { + return len(zfs) +} + +// Get returns the frame at the specified index +func (zfs ZeroCopyFrameSlice) Get(index int) *ZeroCopyAudioFrame { + if index < 0 || index >= len(zfs) { + return nil + } + return zfs[index] +} + +// UnsafePointers returns unsafe pointers for all frames (for CGO batch operations) +func (zfs ZeroCopyFrameSlice) UnsafePointers() []unsafe.Pointer { + pointers := make([]unsafe.Pointer, len(zfs)) + for i, frame := range zfs { + if frame != nil { + pointers[i] = frame.UnsafePointer() + } + } + return pointers +} diff --git a/internal/audio/batch_zero_copy.go b/internal/audio/batch_zero_copy.go new file mode 100644 index 00000000..4ba9959a --- /dev/null +++ b/internal/audio/batch_zero_copy.go @@ -0,0 +1,415 @@ +//go:build cgo + +package audio + +import ( + "sync" + "sync/atomic" + "time" +) + +// BatchZeroCopyProcessor handles batch operations on zero-copy audio frames +// with optimized reference counting and memory management +type BatchZeroCopyProcessor struct { + // Configuration + maxBatchSize int + batchTimeout time.Duration + processingDelay time.Duration + adaptiveThreshold float64 + + // Processing 
queues + readEncodeQueue chan *batchZeroCopyRequest + decodeWriteQueue chan *batchZeroCopyRequest + + // Worker management + workerPool chan struct{} + running int32 + wg sync.WaitGroup + + // Statistics + batchedFrames int64 + singleFrames int64 + batchSavings int64 + processingTimeUs int64 + adaptiveHits int64 + adaptiveMisses int64 +} + +type batchZeroCopyRequest struct { + frames []*ZeroCopyAudioFrame + operation batchZeroCopyOperation + resultCh chan batchZeroCopyResult + timestamp time.Time +} + +type batchZeroCopyOperation int + +const ( + batchOpReadEncode batchZeroCopyOperation = iota + batchOpDecodeWrite + batchOpMixed +) + +type batchZeroCopyResult struct { + encodedData [][]byte // For read-encode operations + processedCount int // Number of successfully processed frames + err error +} + +// Global batch zero-copy processor +var ( + globalBatchZeroCopyProcessor *BatchZeroCopyProcessor + batchZeroCopyOnce sync.Once +) + +// GetBatchZeroCopyProcessor returns the global batch zero-copy processor +func GetBatchZeroCopyProcessor() *BatchZeroCopyProcessor { + batchZeroCopyOnce.Do(func() { + globalBatchZeroCopyProcessor = NewBatchZeroCopyProcessor() + globalBatchZeroCopyProcessor.Start() + }) + return globalBatchZeroCopyProcessor +} + +// NewBatchZeroCopyProcessor creates a new batch zero-copy processor +func NewBatchZeroCopyProcessor() *BatchZeroCopyProcessor { + cache := GetCachedConfig() + return &BatchZeroCopyProcessor{ + maxBatchSize: cache.BatchProcessorFramesPerBatch, + batchTimeout: cache.BatchProcessorTimeout, + processingDelay: cache.BatchProcessingDelay, + adaptiveThreshold: cache.BatchProcessorAdaptiveThreshold, + readEncodeQueue: make(chan *batchZeroCopyRequest, cache.BatchProcessorMaxQueueSize), + decodeWriteQueue: make(chan *batchZeroCopyRequest, cache.BatchProcessorMaxQueueSize), + workerPool: make(chan struct{}, 4), // 4 workers for parallel processing + } +} + +// Start starts the batch zero-copy processor workers +func (bzcp *BatchZeroCopyProcessor) Start() { + if !atomic.CompareAndSwapInt32(&bzcp.running, 0, 1) { + return // Already running + } + + // Start worker goroutines for read-encode operations + for i := 0; i < cap(bzcp.workerPool)/2; i++ { + bzcp.wg.Add(1) + go bzcp.readEncodeWorker() + } + + // Start worker goroutines for decode-write operations + for i := 0; i < cap(bzcp.workerPool)/2; i++ { + bzcp.wg.Add(1) + go bzcp.decodeWriteWorker() + } +} + +// Stop stops the batch zero-copy processor +func (bzcp *BatchZeroCopyProcessor) Stop() { + if !atomic.CompareAndSwapInt32(&bzcp.running, 1, 0) { + return // Already stopped + } + + close(bzcp.readEncodeQueue) + close(bzcp.decodeWriteQueue) + bzcp.wg.Wait() +} + +// readEncodeWorker processes batch read-encode operations +func (bzcp *BatchZeroCopyProcessor) readEncodeWorker() { + defer bzcp.wg.Done() + + for req := range bzcp.readEncodeQueue { + bzcp.processBatchReadEncode(req) + } +} + +// decodeWriteWorker processes batch decode-write operations +func (bzcp *BatchZeroCopyProcessor) decodeWriteWorker() { + defer bzcp.wg.Done() + + for req := range bzcp.decodeWriteQueue { + bzcp.processBatchDecodeWrite(req) + } +} + +// processBatchReadEncode processes a batch of read-encode operations +func (bzcp *BatchZeroCopyProcessor) processBatchReadEncode(req *batchZeroCopyRequest) { + startTime := time.Now() + result := batchZeroCopyResult{} + + // Batch AddRef all frames first + err := BatchAddRefFrames(req.frames) + if err != nil { + result.err = err + if req.resultCh != nil { + req.resultCh <- result + 
close(req.resultCh) + } + return + } + + // Process frames using existing batch read-encode logic + encodedData, err := BatchReadEncode(len(req.frames)) + if err != nil { + // Batch release frames on error + if _, releaseErr := BatchReleaseFrames(req.frames); releaseErr != nil { + // Log release error but preserve original error + _ = releaseErr + } + result.err = err + } else { + result.encodedData = encodedData + result.processedCount = len(encodedData) + // Batch release frames after successful processing + if _, releaseErr := BatchReleaseFrames(req.frames); releaseErr != nil { + // Log release error but don't fail the operation + _ = releaseErr + } + } + + // Update statistics + atomic.AddInt64(&bzcp.batchedFrames, int64(len(req.frames))) + atomic.AddInt64(&bzcp.batchSavings, int64(len(req.frames)-1)) + atomic.AddInt64(&bzcp.processingTimeUs, time.Since(startTime).Microseconds()) + + // Send result back + if req.resultCh != nil { + req.resultCh <- result + close(req.resultCh) + } +} + +// processBatchDecodeWrite processes a batch of decode-write operations +func (bzcp *BatchZeroCopyProcessor) processBatchDecodeWrite(req *batchZeroCopyRequest) { + startTime := time.Now() + result := batchZeroCopyResult{} + + // Batch AddRef all frames first + err := BatchAddRefFrames(req.frames) + if err != nil { + result.err = err + if req.resultCh != nil { + req.resultCh <- result + close(req.resultCh) + } + return + } + + // Extract data from zero-copy frames for batch processing + frameData := make([][]byte, len(req.frames)) + for i, frame := range req.frames { + if frame != nil { + // Get data from zero-copy frame + frameData[i] = frame.Data()[:frame.Length()] + } + } + + // Process frames using existing batch decode-write logic + err = BatchDecodeWrite(frameData) + if err != nil { + result.err = err + } else { + result.processedCount = len(req.frames) + } + + // Batch release frames + if _, releaseErr := BatchReleaseFrames(req.frames); releaseErr != nil { + // Log release error but don't override processing error + _ = releaseErr + } + + // Update statistics + atomic.AddInt64(&bzcp.batchedFrames, int64(len(req.frames))) + atomic.AddInt64(&bzcp.batchSavings, int64(len(req.frames)-1)) + atomic.AddInt64(&bzcp.processingTimeUs, time.Since(startTime).Microseconds()) + + // Send result back + if req.resultCh != nil { + req.resultCh <- result + close(req.resultCh) + } +} + +// BatchReadEncodeZeroCopy performs batch read-encode on zero-copy frames +func (bzcp *BatchZeroCopyProcessor) BatchReadEncodeZeroCopy(frames []*ZeroCopyAudioFrame) ([][]byte, error) { + if len(frames) == 0 { + return nil, nil + } + + // For small batches, use direct operations to avoid overhead + if len(frames) <= 2 { + atomic.AddInt64(&bzcp.singleFrames, int64(len(frames))) + return bzcp.processSingleReadEncode(frames) + } + + // Use adaptive threshold to determine batch vs single processing + batchedFrames := atomic.LoadInt64(&bzcp.batchedFrames) + singleFrames := atomic.LoadInt64(&bzcp.singleFrames) + totalFrames := batchedFrames + singleFrames + + if totalFrames > 100 { // Only apply adaptive logic after some samples + batchRatio := float64(batchedFrames) / float64(totalFrames) + if batchRatio < bzcp.adaptiveThreshold { + // Batch processing not effective, use single processing + atomic.AddInt64(&bzcp.adaptiveMisses, 1) + atomic.AddInt64(&bzcp.singleFrames, int64(len(frames))) + return bzcp.processSingleReadEncode(frames) + } + atomic.AddInt64(&bzcp.adaptiveHits, 1) + } + + // Use batch processing + if 
atomic.LoadInt32(&bzcp.running) == 0 { + // Fallback to single processing if batch processor not running + atomic.AddInt64(&bzcp.singleFrames, int64(len(frames))) + return bzcp.processSingleReadEncode(frames) + } + + resultCh := make(chan batchZeroCopyResult, 1) + req := &batchZeroCopyRequest{ + frames: frames, + operation: batchOpReadEncode, + resultCh: resultCh, + timestamp: time.Now(), + } + + select { + case bzcp.readEncodeQueue <- req: + // Wait for completion + result := <-resultCh + return result.encodedData, result.err + default: + // Queue full, fallback to single processing + atomic.AddInt64(&bzcp.singleFrames, int64(len(frames))) + return bzcp.processSingleReadEncode(frames) + } +} + +// BatchDecodeWriteZeroCopy performs batch decode-write on zero-copy frames +func (bzcp *BatchZeroCopyProcessor) BatchDecodeWriteZeroCopy(frames []*ZeroCopyAudioFrame) error { + if len(frames) == 0 { + return nil + } + + // For small batches, use direct operations + if len(frames) <= 2 { + atomic.AddInt64(&bzcp.singleFrames, int64(len(frames))) + return bzcp.processSingleDecodeWrite(frames) + } + + // Use adaptive threshold + batchedFrames := atomic.LoadInt64(&bzcp.batchedFrames) + singleFrames := atomic.LoadInt64(&bzcp.singleFrames) + totalFrames := batchedFrames + singleFrames + + if totalFrames > 100 { + batchRatio := float64(batchedFrames) / float64(totalFrames) + if batchRatio < bzcp.adaptiveThreshold { + atomic.AddInt64(&bzcp.adaptiveMisses, 1) + atomic.AddInt64(&bzcp.singleFrames, int64(len(frames))) + return bzcp.processSingleDecodeWrite(frames) + } + atomic.AddInt64(&bzcp.adaptiveHits, 1) + } + + // Use batch processing + if atomic.LoadInt32(&bzcp.running) == 0 { + atomic.AddInt64(&bzcp.singleFrames, int64(len(frames))) + return bzcp.processSingleDecodeWrite(frames) + } + + resultCh := make(chan batchZeroCopyResult, 1) + req := &batchZeroCopyRequest{ + frames: frames, + operation: batchOpDecodeWrite, + resultCh: resultCh, + timestamp: time.Now(), + } + + select { + case bzcp.decodeWriteQueue <- req: + // Wait for completion + result := <-resultCh + return result.err + default: + // Queue full, fallback to single processing + atomic.AddInt64(&bzcp.singleFrames, int64(len(frames))) + return bzcp.processSingleDecodeWrite(frames) + } +} + +// processSingleReadEncode processes frames individually for read-encode +func (bzcp *BatchZeroCopyProcessor) processSingleReadEncode(frames []*ZeroCopyAudioFrame) ([][]byte, error) { + // Extract data and use existing batch processing + frameData := make([][]byte, 0, len(frames)) + for _, frame := range frames { + if frame != nil { + frame.AddRef() + frameData = append(frameData, frame.Data()[:frame.Length()]) + } + } + + // Use existing batch read-encode + result, err := BatchReadEncode(len(frameData)) + + // Release frames + for _, frame := range frames { + if frame != nil { + frame.Release() + } + } + + return result, err +} + +// processSingleDecodeWrite processes frames individually for decode-write +func (bzcp *BatchZeroCopyProcessor) processSingleDecodeWrite(frames []*ZeroCopyAudioFrame) error { + // Extract data and use existing batch processing + frameData := make([][]byte, 0, len(frames)) + for _, frame := range frames { + if frame != nil { + frame.AddRef() + frameData = append(frameData, frame.Data()[:frame.Length()]) + } + } + + // Use existing batch decode-write + err := BatchDecodeWrite(frameData) + + // Release frames + for _, frame := range frames { + if frame != nil { + frame.Release() + } + } + + return err +} + +// GetBatchZeroCopyStats 
returns batch zero-copy processing statistics +func (bzcp *BatchZeroCopyProcessor) GetBatchZeroCopyStats() (batchedFrames, singleFrames, savings, processingTimeUs, adaptiveHits, adaptiveMisses int64) { + return atomic.LoadInt64(&bzcp.batchedFrames), + atomic.LoadInt64(&bzcp.singleFrames), + atomic.LoadInt64(&bzcp.batchSavings), + atomic.LoadInt64(&bzcp.processingTimeUs), + atomic.LoadInt64(&bzcp.adaptiveHits), + atomic.LoadInt64(&bzcp.adaptiveMisses) +} + +// Convenience functions for global batch zero-copy processor + +// BatchReadEncodeZeroCopyFrames performs batch read-encode on zero-copy frames +func BatchReadEncodeZeroCopyFrames(frames []*ZeroCopyAudioFrame) ([][]byte, error) { + return GetBatchZeroCopyProcessor().BatchReadEncodeZeroCopy(frames) +} + +// BatchDecodeWriteZeroCopyFrames performs batch decode-write on zero-copy frames +func BatchDecodeWriteZeroCopyFrames(frames []*ZeroCopyAudioFrame) error { + return GetBatchZeroCopyProcessor().BatchDecodeWriteZeroCopy(frames) +} + +// GetGlobalBatchZeroCopyStats returns global batch zero-copy processing statistics +func GetGlobalBatchZeroCopyStats() (batchedFrames, singleFrames, savings, processingTimeUs, adaptiveHits, adaptiveMisses int64) { + return GetBatchZeroCopyProcessor().GetBatchZeroCopyStats() +} diff --git a/internal/audio/cgo_audio.go b/internal/audio/cgo_audio.go index 887c854c..e9a205ee 100644 --- a/internal/audio/cgo_audio.go +++ b/internal/audio/cgo_audio.go @@ -14,12 +14,15 @@ import ( /* #cgo CFLAGS: -I$HOME/.jetkvm/audio-libs/alsa-lib-$ALSA_VERSION/include -I$HOME/.jetkvm/audio-libs/opus-$OPUS_VERSION/include -I$HOME/.jetkvm/audio-libs/opus-$OPUS_VERSION/celt #cgo LDFLAGS: -L$HOME/.jetkvm/audio-libs/alsa-lib-$ALSA_VERSION/src/.libs -lasound -L$HOME/.jetkvm/audio-libs/opus-$OPUS_VERSION/.libs -lopus -lm -ldl -static + #include #include +#include #include #include -#include #include +#include +#include // C state for ALSA/Opus with safety flags static snd_pcm_t *pcm_handle = NULL; @@ -46,6 +49,14 @@ static int max_backoff_us_global = 500000; // Will be set from GetConfig().CGOMa static int use_mmap_access = 0; // Disable MMAP for compatibility (was 1) static int optimized_buffer_size = 0; // Disable optimized buffer sizing for stability (was 1) +// C function declarations (implementations are below) +int jetkvm_audio_init(); +void jetkvm_audio_close(); +int jetkvm_audio_read_encode(void *opus_buf); +int jetkvm_audio_decode_write(void *opus_buf, int opus_size); +int jetkvm_audio_playback_init(); +void jetkvm_audio_playback_close(); + // Function to update constants from Go configuration void update_audio_constants(int bitrate, int complexity, int vbr, int vbr_constraint, int signal_type, int bandwidth, int dtx, int lsb_depth, int sr, int ch, @@ -1099,6 +1110,7 @@ func DecodeWriteWithPooledBuffer(data []byte) (int, error) { } // BatchReadEncode reads and encodes multiple audio frames in a single batch +// with optimized zero-copy frame management and batch reference counting func BatchReadEncode(batchSize int) ([][]byte, error) { cache := GetCachedConfig() updateCacheIfNeeded(cache) @@ -1111,18 +1123,26 @@ func BatchReadEncode(batchSize int) ([][]byte, error) { batchBuffer := GetBufferFromPool(totalSize) defer ReturnBufferToPool(batchBuffer) - // Pre-allocate frame result buffers from pool to avoid allocations in loop - frameBuffers := make([][]byte, 0, batchSize) + // Pre-allocate zero-copy frames for batch processing + zeroCopyFrames := make([]*ZeroCopyAudioFrame, 0, batchSize) for i := 0; i < batchSize; i++ { - 
frameBuffers = append(frameBuffers, GetBufferFromPool(frameSize)) + frame := GetZeroCopyFrame() + zeroCopyFrames = append(zeroCopyFrames, frame) } + // Use batch reference counting for efficient cleanup defer func() { - // Return all frame buffers to pool - for _, buf := range frameBuffers { - ReturnBufferToPool(buf) + if _, err := BatchReleaseFrames(zeroCopyFrames); err != nil { + // Log release error but don't fail the operation + _ = err } }() + // Batch AddRef all frames at once to reduce atomic operation overhead + err := BatchAddRefFrames(zeroCopyFrames) + if err != nil { + return nil, err + } + // Track batch processing statistics - only if enabled var startTime time.Time // Batch time tracking removed @@ -1132,7 +1152,7 @@ func BatchReadEncode(batchSize int) ([][]byte, error) { } batchProcessingCount.Add(1) - // Process frames in batch + // Process frames in batch using zero-copy frames frames := make([][]byte, 0, batchSize) for i := 0; i < batchSize; i++ { // Calculate offset for this frame in the batch buffer @@ -1153,10 +1173,10 @@ func BatchReadEncode(batchSize int) ([][]byte, error) { return nil, err } - // Reuse pre-allocated buffer instead of make([]byte, n) - frameCopy := frameBuffers[i][:n] // Slice to actual size - copy(frameCopy, frameBuf[:n]) - frames = append(frames, frameCopy) + // Use zero-copy frame for efficient memory management + frame := zeroCopyFrames[i] + frame.SetDataDirect(frameBuf[:n]) // Direct assignment without copy + frames = append(frames, frame.Data()) } // Update statistics @@ -1170,12 +1190,39 @@ func BatchReadEncode(batchSize int) ([][]byte, error) { // BatchDecodeWrite decodes and writes multiple audio frames in a single batch // This reduces CGO call overhead by processing multiple frames at once +// with optimized zero-copy frame management and batch reference counting func BatchDecodeWrite(frames [][]byte) error { // Validate input if len(frames) == 0 { return nil } + // Convert to zero-copy frames for optimized processing + zeroCopyFrames := make([]*ZeroCopyAudioFrame, 0, len(frames)) + for _, frameData := range frames { + if len(frameData) > 0 { + frame := GetZeroCopyFrame() + frame.SetDataDirect(frameData) // Direct assignment without copy + zeroCopyFrames = append(zeroCopyFrames, frame) + } + } + + // Use batch reference counting for efficient management + if len(zeroCopyFrames) > 0 { + // Batch AddRef all frames at once + err := BatchAddRefFrames(zeroCopyFrames) + if err != nil { + return err + } + // Ensure cleanup with batch release + defer func() { + if _, err := BatchReleaseFrames(zeroCopyFrames); err != nil { + // Log release error but don't fail the operation + _ = err + } + }() + } + // Get cached config cache := GetCachedConfig() // Only update cache if expired - avoid unnecessary overhead @@ -1204,16 +1251,17 @@ func BatchDecodeWrite(frames [][]byte) error { pcmBuffer := GetBufferFromPool(cache.GetMaxPCMBufferSize()) defer ReturnBufferToPool(pcmBuffer) - // Process each frame + // Process each zero-copy frame with optimized batch processing frameCount := 0 - for _, frame := range frames { - // Skip empty frames - if len(frame) == 0 { + for _, zcFrame := range zeroCopyFrames { + // Get frame data from zero-copy frame + frameData := zcFrame.Data()[:zcFrame.Length()] + if len(frameData) == 0 { continue } // Process this frame using optimized implementation - _, err := CGOAudioDecodeWrite(frame, pcmBuffer) + _, err := CGOAudioDecodeWrite(frameData, pcmBuffer) if err != nil { // Update statistics before returning error 
batchFrameCount.Add(int64(frameCount)) diff --git a/internal/audio/goroutine_pool.go b/internal/audio/goroutine_pool.go index aca2069c..cfc844e0 100644 --- a/internal/audio/goroutine_pool.go +++ b/internal/audio/goroutine_pool.go @@ -81,13 +81,13 @@ func (p *GoroutinePool) SubmitWithBackpressure(task Task) bool { queueLen := len(p.taskQueue) queueCap := cap(p.taskQueue) workerCount := atomic.LoadInt64(&p.workerCount) - + // If queue is >90% full and we're at max workers, drop the task if queueLen > int(float64(queueCap)*0.9) && workerCount >= int64(p.maxWorkers) { p.logger.Warn().Int("queue_len", queueLen).Int("queue_cap", queueCap).Msg("Dropping task due to backpressure") return false } - + // Try one more time with a short timeout select { case p.taskQueue <- task: diff --git a/internal/audio/ipc_input.go b/internal/audio/ipc_input.go index 12b5c016..b2202905 100644 --- a/internal/audio/ipc_input.go +++ b/internal/audio/ipc_input.go @@ -192,8 +192,8 @@ type AudioInputServer struct { wg sync.WaitGroup // Wait group for goroutine coordination // Channel resizing support - channelMutex sync.RWMutex // Protects channel recreation - lastBufferSize int64 // Last known buffer size for change detection + channelMutex sync.RWMutex // Protects channel recreation + lastBufferSize int64 // Last known buffer size for change detection // Socket buffer configuration socketBufferConfig SocketBufferConfig @@ -234,7 +234,7 @@ func NewAudioInputServer() (*AudioInputServer, error) { // Get initial buffer size from adaptive buffer manager adaptiveManager := GetAdaptiveBufferManager() initialBufferSize := int64(adaptiveManager.GetInputBufferSize()) - + // Ensure minimum buffer size to prevent immediate overflow // Use at least 50 frames to handle burst traffic minBufferSize := int64(50) @@ -966,7 +966,7 @@ func (ais *AudioInputServer) startReaderGoroutine() { ais.channelMutex.RLock() messageChan := ais.messageChan ais.channelMutex.RUnlock() - + select { case messageChan <- msg: atomic.AddInt64(&ais.totalFrames, 1) @@ -1111,7 +1111,7 @@ func (ais *AudioInputServer) processMessageWithRecovery(msg *InputIPCMessage, lo ais.channelMutex.RLock() processChan := ais.processChan ais.channelMutex.RUnlock() - + select { case processChan <- msg: return nil @@ -1234,7 +1234,7 @@ func (ais *AudioInputServer) UpdateBufferSize() { adaptiveManager := GetAdaptiveBufferManager() newSize := int64(adaptiveManager.GetInputBufferSize()) oldSize := atomic.LoadInt64(&ais.bufferSize) - + // Only recreate channels if size changed significantly (>25% difference) if oldSize > 0 { diff := float64(newSize-oldSize) / float64(oldSize) @@ -1242,9 +1242,9 @@ func (ais *AudioInputServer) UpdateBufferSize() { return // Size change not significant enough } } - + atomic.StoreInt64(&ais.bufferSize, newSize) - + // Recreate channels with new buffer size if server is running if ais.running { ais.recreateChannels(int(newSize)) @@ -1255,15 +1255,15 @@ func (ais *AudioInputServer) UpdateBufferSize() { func (ais *AudioInputServer) recreateChannels(newSize int) { ais.channelMutex.Lock() defer ais.channelMutex.Unlock() - + // Create new channels with updated buffer size newMessageChan := make(chan *InputIPCMessage, newSize) newProcessChan := make(chan *InputIPCMessage, newSize) - + // Drain old channels and transfer messages to new channels ais.drainAndTransferChannel(ais.messageChan, newMessageChan) ais.drainAndTransferChannel(ais.processChan, newProcessChan) - + // Replace channels atomically ais.messageChan = newMessageChan ais.processChan = 
newProcessChan diff --git a/internal/audio/monitor_adaptive_optimizer.go b/internal/audio/monitor_adaptive_optimizer.go index ef8cc384..05c4ae5e 100644 --- a/internal/audio/monitor_adaptive_optimizer.go +++ b/internal/audio/monitor_adaptive_optimizer.go @@ -12,9 +12,11 @@ import ( // AdaptiveOptimizer automatically adjusts audio parameters based on latency metrics type AdaptiveOptimizer struct { // Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment) - optimizationCount int64 // Number of optimizations performed (atomic) - lastOptimization int64 // Timestamp of last optimization (atomic) - optimizationLevel int64 // Current optimization level (0-10) (atomic) + optimizationCount int64 // Number of optimizations performed (atomic) + lastOptimization int64 // Timestamp of last optimization (atomic) + optimizationLevel int64 // Current optimization level (0-10) (atomic) + stabilityScore int64 // Current stability score (0-100) (atomic) + optimizationInterval int64 // Current optimization interval in nanoseconds (atomic) latencyMonitor *LatencyMonitor bufferManager *AdaptiveBufferManager @@ -27,6 +29,20 @@ type AdaptiveOptimizer struct { // Configuration config OptimizerConfig + + // Stability tracking + stabilityHistory []StabilityMetric + stabilityMutex sync.RWMutex +} + +// StabilityMetric tracks system stability over time +type StabilityMetric struct { + Timestamp time.Time + LatencyStdev float64 + CPUVariance float64 + MemoryStable bool + ErrorRate float64 + StabilityScore int } // OptimizerConfig holds configuration for the adaptive optimizer @@ -36,6 +52,12 @@ type OptimizerConfig struct { Aggressiveness float64 // How aggressively to optimize (0.0-1.0) RollbackThreshold time.Duration // Latency threshold to rollback optimizations StabilityPeriod time.Duration // Time to wait for stability after optimization + + // Adaptive interval configuration + MinOptimizationInterval time.Duration // Minimum optimization interval (high stability) + MaxOptimizationInterval time.Duration // Maximum optimization interval (low stability) + StabilityThreshold int // Stability score threshold for interval adjustment + StabilityHistorySize int // Number of stability metrics to track } // DefaultOptimizerConfig returns a sensible default configuration @@ -46,6 +68,12 @@ func DefaultOptimizerConfig() OptimizerConfig { Aggressiveness: GetConfig().OptimizerAggressiveness, RollbackThreshold: GetConfig().RollbackThreshold, StabilityPeriod: GetConfig().AdaptiveOptimizerStability, + + // Adaptive interval defaults + MinOptimizationInterval: 100 * time.Millisecond, // High stability: check every 100ms + MaxOptimizationInterval: 2 * time.Second, // Low stability: check every 2s + StabilityThreshold: 70, // Stability score threshold + StabilityHistorySize: 20, // Track last 20 stability metrics } } @@ -54,14 +82,19 @@ func NewAdaptiveOptimizer(latencyMonitor *LatencyMonitor, bufferManager *Adaptiv ctx, cancel := context.WithCancel(context.Background()) optimizer := &AdaptiveOptimizer{ - latencyMonitor: latencyMonitor, - bufferManager: bufferManager, - config: config, - logger: logger.With().Str("component", "adaptive-optimizer").Logger(), - ctx: ctx, - cancel: cancel, + latencyMonitor: latencyMonitor, + bufferManager: bufferManager, + config: config, + logger: logger.With().Str("component", "adaptive-optimizer").Logger(), + ctx: ctx, + cancel: cancel, + stabilityHistory: make([]StabilityMetric, 0, config.StabilityHistorySize), } + // Initialize stability score and optimization interval 
+ atomic.StoreInt64(&optimizer.stabilityScore, 50) // Start with medium stability + atomic.StoreInt64(&optimizer.optimizationInterval, int64(config.MaxOptimizationInterval)) + // Register as latency monitor callback latencyMonitor.AddOptimizationCallback(optimizer.handleLatencyOptimization) @@ -157,7 +190,9 @@ func (ao *AdaptiveOptimizer) decreaseOptimization(targetLevel int) error { func (ao *AdaptiveOptimizer) optimizationLoop() { defer ao.wg.Done() - ticker := time.NewTicker(ao.config.StabilityPeriod) + // Start with initial interval + currentInterval := time.Duration(atomic.LoadInt64(&ao.optimizationInterval)) + ticker := time.NewTicker(currentInterval) defer ticker.Stop() for { @@ -165,7 +200,17 @@ func (ao *AdaptiveOptimizer) optimizationLoop() { case <-ao.ctx.Done(): return case <-ticker.C: + // Update stability metrics and check for optimization needs + ao.updateStabilityMetrics() ao.checkStability() + + // Adjust optimization interval based on current stability + newInterval := ao.calculateOptimizationInterval() + if newInterval != currentInterval { + currentInterval = newInterval + ticker.Reset(currentInterval) + ao.logger.Debug().Dur("new_interval", currentInterval).Int64("stability_score", atomic.LoadInt64(&ao.stabilityScore)).Msg("adjusted optimization interval") + } } } } @@ -186,12 +231,98 @@ func (ao *AdaptiveOptimizer) checkStability() { } } +// updateStabilityMetrics calculates and stores current system stability metrics +func (ao *AdaptiveOptimizer) updateStabilityMetrics() { + metrics := ao.latencyMonitor.GetMetrics() + + // Calculate stability score based on multiple factors + stabilityScore := ao.calculateStabilityScore(metrics) + atomic.StoreInt64(&ao.stabilityScore, int64(stabilityScore)) + + // Store stability metric in history + stabilityMetric := StabilityMetric{ + Timestamp: time.Now(), + LatencyStdev: float64(metrics.Jitter), // Use Jitter as variance indicator + CPUVariance: 0.0, // TODO: Get from system metrics + MemoryStable: true, // TODO: Get from system metrics + ErrorRate: 0.0, // TODO: Get from error tracking + StabilityScore: stabilityScore, + } + + ao.stabilityMutex.Lock() + ao.stabilityHistory = append(ao.stabilityHistory, stabilityMetric) + if len(ao.stabilityHistory) > ao.config.StabilityHistorySize { + ao.stabilityHistory = ao.stabilityHistory[1:] + } + ao.stabilityMutex.Unlock() +} + +// calculateStabilityScore computes a stability score (0-100) based on system metrics +func (ao *AdaptiveOptimizer) calculateStabilityScore(metrics LatencyMetrics) int { + // Base score starts at 100 (perfect stability) + score := 100.0 + + // Penalize high jitter (latency variance) + if metrics.Jitter > 0 && metrics.Average > 0 { + jitterRatio := float64(metrics.Jitter) / float64(metrics.Average) + variancePenalty := jitterRatio * 50 // Scale jitter impact + score -= variancePenalty + } + + // Penalize latency trend volatility + switch metrics.Trend { + case LatencyTrendVolatile: + score -= 20 + case LatencyTrendIncreasing: + score -= 10 + case LatencyTrendDecreasing: + score += 5 // Slight bonus for improving latency + } + + // Ensure score is within bounds + if score < 0 { + score = 0 + } + if score > 100 { + score = 100 + } + + return int(score) +} + +// calculateOptimizationInterval determines the optimization interval based on stability +func (ao *AdaptiveOptimizer) calculateOptimizationInterval() time.Duration { + stabilityScore := atomic.LoadInt64(&ao.stabilityScore) + + // High stability = shorter intervals (more frequent optimization) + // Low stability = 
longer intervals (less frequent optimization) + if stabilityScore >= int64(ao.config.StabilityThreshold) { + // High stability: use minimum interval + interval := ao.config.MinOptimizationInterval + atomic.StoreInt64(&ao.optimizationInterval, int64(interval)) + return interval + } else { + // Low stability: scale interval based on stability score + // Lower stability = longer intervals + stabilityRatio := float64(stabilityScore) / float64(ao.config.StabilityThreshold) + minInterval := float64(ao.config.MinOptimizationInterval) + maxInterval := float64(ao.config.MaxOptimizationInterval) + + // Linear interpolation between min and max intervals + interval := time.Duration(minInterval + (maxInterval-minInterval)*(1.0-stabilityRatio)) + atomic.StoreInt64(&ao.optimizationInterval, int64(interval)) + return interval + } +} + // GetOptimizationStats returns current optimization statistics func (ao *AdaptiveOptimizer) GetOptimizationStats() map[string]interface{} { return map[string]interface{}{ - "optimization_level": atomic.LoadInt64(&ao.optimizationLevel), - "optimization_count": atomic.LoadInt64(&ao.optimizationCount), - "last_optimization": time.Unix(0, atomic.LoadInt64(&ao.lastOptimization)), + "optimization_level": atomic.LoadInt64(&ao.optimizationLevel), + "optimization_count": atomic.LoadInt64(&ao.optimizationCount), + "last_optimization": time.Unix(0, atomic.LoadInt64(&ao.lastOptimization)), + "stability_score": atomic.LoadInt64(&ao.stabilityScore), + "optimization_interval": time.Duration(atomic.LoadInt64(&ao.optimizationInterval)), } } diff --git a/internal/audio/util_buffer_pool.go b/internal/audio/util_buffer_pool.go index f056c088..b9232bbb 100644 --- a/internal/audio/util_buffer_pool.go +++ b/internal/audio/util_buffer_pool.go @@ -354,12 +354,12 @@ type AudioBufferPool struct { // Memory optimization fields preallocated []*[]byte // Pre-allocated buffers for immediate use preallocSize int // Number of pre-allocated buffers - + // Chunk-based allocation optimization - chunkSize int // Size of each memory chunk - chunks [][]byte // Pre-allocated memory chunks - chunkOffsets []int // Current offset in each chunk - chunkMutex sync.Mutex // Protects chunk allocation + chunkSize int // Size of each memory chunk + chunks [][]byte // Pre-allocated memory chunks + chunkOffsets []int // Current offset in each chunk + chunkMutex sync.Mutex // Protects chunk allocation } func NewAudioBufferPool(bufferSize int) *AudioBufferPool { @@ -432,7 +432,7 @@ func NewAudioBufferPool(bufferSize int) *AudioBufferPool { func (p *AudioBufferPool) allocateFromChunk() []byte { p.chunkMutex.Lock() defer p.chunkMutex.Unlock() - + // Try to allocate from existing chunks for i := 0; i < len(p.chunks); i++ { if p.chunkOffsets[i]+p.bufferSize <= len(p.chunks[i]) { @@ -444,12 +444,12 @@ func (p *AudioBufferPool) allocateFromChunk() []byte { return buf[:0] // Return with zero length but correct capacity } } - + // Need to allocate a new chunk newChunk := make([]byte, p.chunkSize) p.chunks = append(p.chunks, newChunk) p.chunkOffsets = append(p.chunkOffsets, p.bufferSize) - + // Return buffer from the new chunk buf := newChunk[0:p.bufferSize:p.bufferSize] return buf[:0] // Return with zero length but correct capacity
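
Usage note (not part of this patch): a minimal sketch of how a caller inside internal/audio might drive the new batch zero-copy APIs introduced above. It only uses identifiers that appear in the diff (GetZeroCopyFrame, SetDataDirect, ZeroCopyFrameSlice, AddRefAll/ReleaseAll, BatchDecodeWriteZeroCopyFrames); the explicit AddRefAll/ReleaseAll pair just holds one extra reference across the batched call, and error handling is abbreviated.

//go:build cgo

package audio

// exampleBatchZeroCopyUsage is an illustrative sketch, not part of the patch.
func exampleBatchZeroCopyUsage(input [][]byte) error {
	// Wrap raw encoded packets in pooled zero-copy frames.
	frames := make(ZeroCopyFrameSlice, 0, len(input))
	for _, pkt := range input {
		if len(pkt) == 0 {
			continue
		}
		f := GetZeroCopyFrame()
		f.SetDataDirect(pkt) // direct assignment, no copy (as in BatchDecodeWrite above)
		frames = append(frames, f)
	}

	// One batched AddRef instead of len(frames) individual atomic operations;
	// this keeps the frames alive across the batched call below.
	if err := frames.AddRefAll(); err != nil {
		return err
	}

	// Decode and write the whole batch through the zero-copy processor.
	err := BatchDecodeWriteZeroCopyFrames(frames)

	// One batched Release; final releases return pooled frames to the pool.
	if _, relErr := frames.ReleaseAll(); relErr != nil && err == nil {
		err = relErr
	}
	return err
}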
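
Interval note (not part of this patch): with the defaults introduced above (MinOptimizationInterval = 100ms, MaxOptimizationInterval = 2s, StabilityThreshold = 70), calculateOptimizationInterval interpolates linearly below the threshold. For example, a stability score of 35 gives stabilityRatio = 35/70 = 0.5, so the interval becomes 100ms + (2000ms - 100ms) * (1 - 0.5) = 1050ms; a score of 70 or above snaps to the 100ms minimum, and a score of 0 yields the full 2s maximum.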