From 323d2587b7d8e01330c76316e0a7ca60d4f4302f Mon Sep 17 00:00:00 2001
From: Alex P
Date: Mon, 8 Sep 2025 08:25:42 +0000
Subject: [PATCH] refactor(audio): improve memory management with atomic
 operations and chunk allocation

- Replace mutex-protected refCount with atomic operations in ZeroCopyFramePool
- Implement chunk-based allocation in AudioBufferPool to reduce allocations
- Add proper reference counting with atomic operations in ZeroCopyAudioFrame
- Optimize buffer pool sizing based on buffer size
---
 internal/audio/goroutine_pool.go   |  4 +-
 internal/audio/ipc_input.go        | 22 ++++----
 internal/audio/util_buffer_pool.go | 89 +++++++++++++++++++++-------
 internal/audio/zero_copy.go        | 86 +++++++++++++++--------------
 4 files changed, 127 insertions(+), 74 deletions(-)

diff --git a/internal/audio/goroutine_pool.go b/internal/audio/goroutine_pool.go
index cfc844e0..aca2069c 100644
--- a/internal/audio/goroutine_pool.go
+++ b/internal/audio/goroutine_pool.go
@@ -81,13 +81,13 @@ func (p *GoroutinePool) SubmitWithBackpressure(task Task) bool {
 	queueLen := len(p.taskQueue)
 	queueCap := cap(p.taskQueue)
 	workerCount := atomic.LoadInt64(&p.workerCount)
-	
+
 	// If queue is >90% full and we're at max workers, drop the task
 	if queueLen > int(float64(queueCap)*0.9) && workerCount >= int64(p.maxWorkers) {
 		p.logger.Warn().Int("queue_len", queueLen).Int("queue_cap", queueCap).Msg("Dropping task due to backpressure")
 		return false
 	}
-	
+
 	// Try one more time with a short timeout
 	select {
 	case p.taskQueue <- task:
diff --git a/internal/audio/ipc_input.go b/internal/audio/ipc_input.go
index b2202905..12b5c016 100644
--- a/internal/audio/ipc_input.go
+++ b/internal/audio/ipc_input.go
@@ -192,8 +192,8 @@ type AudioInputServer struct {
 	wg sync.WaitGroup // Wait group for goroutine coordination

 	// Channel resizing support
-	channelMutex sync.RWMutex // Protects channel recreation
-	lastBufferSize int64 // Last known buffer size for change detection
+	channelMutex   sync.RWMutex // Protects channel recreation
+	lastBufferSize int64        // Last known buffer size for change detection

 	// Socket buffer configuration
 	socketBufferConfig SocketBufferConfig
@@ -234,7 +234,7 @@ func NewAudioInputServer() (*AudioInputServer, error) {
 	// Get initial buffer size from adaptive buffer manager
 	adaptiveManager := GetAdaptiveBufferManager()
 	initialBufferSize := int64(adaptiveManager.GetInputBufferSize())
-	
+
 	// Ensure minimum buffer size to prevent immediate overflow
 	// Use at least 50 frames to handle burst traffic
 	minBufferSize := int64(50)
@@ -966,7 +966,7 @@ func (ais *AudioInputServer) startReaderGoroutine() {
 				ais.channelMutex.RLock()
 				messageChan := ais.messageChan
 				ais.channelMutex.RUnlock()
-				
+
 				select {
 				case messageChan <- msg:
 					atomic.AddInt64(&ais.totalFrames, 1)
@@ -1111,7 +1111,7 @@ func (ais *AudioInputServer) processMessageWithRecovery(msg *InputIPCMessage, lo
 	ais.channelMutex.RLock()
 	processChan := ais.processChan
 	ais.channelMutex.RUnlock()
-	
+
 	select {
 	case processChan <- msg:
 		return nil
@@ -1234,7 +1234,7 @@ func (ais *AudioInputServer) UpdateBufferSize() {
 	adaptiveManager := GetAdaptiveBufferManager()
 	newSize := int64(adaptiveManager.GetInputBufferSize())
 	oldSize := atomic.LoadInt64(&ais.bufferSize)
-	
+
 	// Only recreate channels if size changed significantly (>25% difference)
 	if oldSize > 0 {
 		diff := float64(newSize-oldSize) / float64(oldSize)
@@ -1242,9 +1242,9 @@
 			return // Size change not significant enough
 		}
 	}
-	
+
 	atomic.StoreInt64(&ais.bufferSize, newSize)
-	
+
 	// Recreate channels with new buffer size if server is running
 	if ais.running {
 		ais.recreateChannels(int(newSize))
@@ -1255,15 +1255,15 @@
 func (ais *AudioInputServer) recreateChannels(newSize int) {
 	ais.channelMutex.Lock()
 	defer ais.channelMutex.Unlock()
-	
+
 	// Create new channels with updated buffer size
 	newMessageChan := make(chan *InputIPCMessage, newSize)
 	newProcessChan := make(chan *InputIPCMessage, newSize)
-	
+
 	// Drain old channels and transfer messages to new channels
 	ais.drainAndTransferChannel(ais.messageChan, newMessageChan)
 	ais.drainAndTransferChannel(ais.processChan, newProcessChan)
-	
+
 	// Replace channels atomically
 	ais.messageChan = newMessageChan
 	ais.processChan = newProcessChan
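Review note: the resize path above leans on `ais.drainAndTransferChannel`, which this patch calls but does not include. A minimal sketch of the drain-and-transfer pattern it implies is shown below; the method body is hypothetical, and it assumes the caller already holds `channelMutex` so no concurrent send can race the drain.

```go
// Hypothetical sketch of drainAndTransferChannel (not part of this patch).
// Moves any frames buffered in src into dst without blocking; frames that
// do not fit in dst are dropped rather than stalling the resize.
func (ais *AudioInputServer) drainAndTransferChannel(src, dst chan *InputIPCMessage) {
	for {
		select {
		case msg := <-src:
			select {
			case dst <- msg:
			default:
				return // dst already full; drop the remaining backlog
			}
		default:
			return // src drained
		}
	}
}
```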
diff --git a/internal/audio/util_buffer_pool.go b/internal/audio/util_buffer_pool.go
index 442b3204..f056c088 100644
--- a/internal/audio/util_buffer_pool.go
+++ b/internal/audio/util_buffer_pool.go
@@ -354,6 +354,12 @@ type AudioBufferPool struct {
 	// Memory optimization fields
 	preallocated []*[]byte // Pre-allocated buffers for immediate use
 	preallocSize int       // Number of pre-allocated buffers
+
+	// Chunk-based allocation optimization
+	chunkSize    int        // Size of each memory chunk
+	chunks       [][]byte   // Pre-allocated memory chunks
+	chunkOffsets []int      // Current offset in each chunk
+	chunkMutex   sync.Mutex // Protects chunk allocation
 }

 func NewAudioBufferPool(bufferSize int) *AudioBufferPool {
@@ -379,29 +385,74 @@ func NewAudioBufferPool(bufferSize int) *AudioBufferPool {
 		preallocSize = minPrealloc
 	}

-	// Pre-allocate with exact capacity to avoid slice growth
-	preallocated := make([]*[]byte, 0, preallocSize)
+	// Calculate max pool size based on buffer size to prevent memory bloat
+	maxPoolSize := 256 // Default
+	if bufferSize > 8192 {
+		maxPoolSize = 64 // Much smaller for very large buffers
+	} else if bufferSize > 4096 {
+		maxPoolSize = 128 // Smaller for large buffers
+	} else if bufferSize > 1024 {
+		maxPoolSize = 192 // Medium for medium buffers
+	}
+
+	// Calculate chunk size - allocate larger chunks to reduce allocation frequency
+	chunkSize := bufferSize * 64 // Each chunk holds 64 buffers worth of memory
+	if chunkSize < 64*1024 {
+		chunkSize = 64 * 1024 // Minimum 64KB chunks
+	}
+
+	p := &AudioBufferPool{
+		bufferSize:   bufferSize,
+		maxPoolSize:  maxPoolSize,
+		preallocated: make([]*[]byte, 0, preallocSize),
+		preallocSize: preallocSize,
+		chunkSize:    chunkSize,
+		chunks:       make([][]byte, 0, 4), // Start with capacity for 4 chunks
+		chunkOffsets: make([]int, 0, 4),
+	}
+
+	// Configure sync.Pool with optimized allocation
+	p.pool.New = func() interface{} {
+		// Use chunk-based allocation instead of individual make()
+		buf := p.allocateFromChunk()
+		return &buf
+	}

 	// Pre-allocate buffers with optimized capacity
 	for i := 0; i < preallocSize; i++ {
-		// Use exact buffer size to prevent over-allocation
-		buf := make([]byte, 0, bufferSize)
-		preallocated = append(preallocated, &buf)
+		// Use chunk-based allocation to prevent over-allocation
+		buf := p.allocateFromChunk()
+		p.preallocated = append(p.preallocated, &buf)
 	}

-	return &AudioBufferPool{
-		bufferSize:   bufferSize,
-		maxPoolSize:  GetConfig().MaxPoolSize * 2, // Double the max pool size for better buffering
-		preallocated: preallocated,
-		preallocSize: preallocSize,
-		pool: sync.Pool{
-			New: func() interface{} {
-				// Allocate exact size to minimize memory waste
-				buf := make([]byte, 0, bufferSize)
-				return &buf
-			},
-		},
+	return p
+}
+
+// allocateFromChunk allocates a buffer from pre-allocated memory chunks
+func (p *AudioBufferPool) allocateFromChunk() []byte {
+	p.chunkMutex.Lock()
+	defer p.chunkMutex.Unlock()
+
+	// Try to allocate from existing chunks
+	for i := 0; i < len(p.chunks); i++ {
+		if p.chunkOffsets[i]+p.bufferSize <= len(p.chunks[i]) {
+			// Slice from the chunk
+			start := p.chunkOffsets[i]
+			end := start + p.bufferSize
+			buf := p.chunks[i][start:end:end] // Use 3-index slice to set capacity
+			p.chunkOffsets[i] = end
+			return buf[:0] // Return with zero length but correct capacity
+		}
 	}
+
+	// Need to allocate a new chunk
+	newChunk := make([]byte, p.chunkSize)
+	p.chunks = append(p.chunks, newChunk)
+	p.chunkOffsets = append(p.chunkOffsets, p.bufferSize)
+
+	// Return buffer from the new chunk
+	buf := newChunk[0:p.bufferSize:p.bufferSize]
+	return buf[:0] // Return with zero length but correct capacity
 }

 func (p *AudioBufferPool) Get() []byte {
@@ -459,10 +510,10 @@
 		// Buffer too small, fall through to allocation
 	}

-	// Pool miss - allocate new buffer with exact capacity
+	// Pool miss - allocate new buffer from chunk
 	// Direct miss count update to avoid sampling complexity in critical path
 	atomic.AddInt64(&p.missCount, 1)
-	return make([]byte, 0, p.bufferSize)
+	return p.allocateFromChunk()
 }

 func (p *AudioBufferPool) Put(buf []byte) {
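Review note: the load-bearing detail in `allocateFromChunk` is the three-index slice `chunk[start:end:end]`. It caps the returned buffer's capacity at `bufferSize`, so an `append` past that limit reallocates instead of writing into the neighbouring buffer carved from the same chunk. A self-contained illustration (example code, not from the patch):

```go
package main

import "fmt"

func main() {
	const bufferSize = 4
	chunk := make([]byte, 16) // one chunk backing four buffers

	// Carve two adjacent buffers with capacity capped by a 3-index slice.
	a := chunk[0:bufferSize:bufferSize][:0]
	b := chunk[bufferSize : 2*bufferSize : 2*bufferSize][:0]

	a = append(a, 1, 2, 3, 4) // fills a's region in place
	a = append(a, 5)          // exceeds cap: Go reallocates a, chunk untouched
	b = append(b, 9)          // b's region is still intact

	fmt.Println(cap(a) > bufferSize) // true: a now has its own backing array
	fmt.Println(chunk[bufferSize])   // 9: b still aliases the chunk
}
```

The trade-off is that chunk memory is only reclaimed when the whole pool becomes unreachable: returning individual buffers to the GC never shrinks `p.chunks`, so the pool trades a higher steady-state footprint for far fewer small allocations.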
diff --git a/internal/audio/zero_copy.go b/internal/audio/zero_copy.go
index 924d895f..e74122cb 100644
--- a/internal/audio/zero_copy.go
+++ b/internal/audio/zero_copy.go
@@ -147,7 +147,7 @@ func (p *ZeroCopyFramePool) Get() *ZeroCopyAudioFrame {
 		// If we've allocated too many frames, force pool reuse
 		frame := p.pool.Get().(*ZeroCopyAudioFrame)
 		frame.mutex.Lock()
-		frame.refCount = 1
+		atomic.StoreInt32(&frame.refCount, 1)
 		frame.length = 0
 		frame.data = frame.data[:0]
 		frame.mutex.Unlock()
@@ -163,11 +163,12 @@ func (p *ZeroCopyFramePool) Get() *ZeroCopyAudioFrame {
 		p.mutex.Unlock()

 		frame.mutex.Lock()
-		frame.refCount = 1
+		atomic.StoreInt32(&frame.refCount, 1)
 		frame.length = 0
 		frame.data = frame.data[:0]
 		frame.mutex.Unlock()

+		atomic.AddInt64(&p.hitCount, 1)
 		return frame
 	}
 	p.mutex.Unlock()
@@ -175,7 +176,7 @@ func (p *ZeroCopyFramePool) Get() *ZeroCopyAudioFrame {
 	// Try sync.Pool next and track allocation
 	frame := p.pool.Get().(*ZeroCopyAudioFrame)
 	frame.mutex.Lock()
-	frame.refCount = 1
+	atomic.StoreInt32(&frame.refCount, 1)
 	frame.length = 0
 	frame.data = frame.data[:0]
 	frame.mutex.Unlock()
@@ -191,43 +192,34 @@ func (p *ZeroCopyFramePool) Put(frame *ZeroCopyAudioFrame) {
 		return
 	}

+	// Reset frame state for reuse
 	frame.mutex.Lock()
-	frame.refCount--
-	if frame.refCount <= 0 {
-		frame.refCount = 0
-		frame.length = 0
-		frame.data = frame.data[:0]
-		frame.mutex.Unlock()
+	atomic.StoreInt32(&frame.refCount, 0)
+	frame.length = 0
+	frame.data = frame.data[:0]
+	frame.mutex.Unlock()

-		// First try to return to pre-allocated pool for fastest reuse
-		p.mutex.Lock()
-		if len(p.preallocated) < p.preallocSize {
-			p.preallocated = append(p.preallocated, frame)
-			p.mutex.Unlock()
-			return
-		}
+	// First try to return to pre-allocated pool for fastest reuse
+	p.mutex.Lock()
+	if len(p.preallocated) < p.preallocSize {
+		p.preallocated = append(p.preallocated, frame)
 		p.mutex.Unlock()
+		return
+	}
+	p.mutex.Unlock()

-		// Check pool size limit to prevent excessive memory usage
-		p.mutex.RLock()
-		currentCount := atomic.LoadInt64(&p.counter)
-		p.mutex.RUnlock()
+	// Check pool size limit to prevent excessive memory usage
+	p.mutex.RLock()
+	currentCount := atomic.LoadInt64(&p.counter)
+	p.mutex.RUnlock()

-		if currentCount >= int64(p.maxPoolSize) {
-			return // Pool is full, let GC handle this frame
-		}
-
-		// Return to sync.Pool
-		p.pool.Put(frame)
-		// Metrics collection removed
-		if false {
-			atomic.AddInt64(&p.counter, 1)
-		}
-	} else {
-		frame.mutex.Unlock()
+	if currentCount >= int64(p.maxPoolSize) {
+		return // Pool is full, let GC handle this frame
 	}
-	// Metrics recording removed - granular metrics collector was unused
+
+	// Return to sync.Pool
+	p.pool.Put(frame)
+	atomic.AddInt64(&p.counter, 1)
 }

 // Data returns the frame data as a slice (zero-copy view)
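Review note: the rewritten Put is easiest to read as a two-tier return path: reset the frame, try the small pre-allocated slice first, then fall back to a size-capped sync.Pool, and otherwise drop the frame for the GC. The same structure in isolation (simplified stand-in types, not the patch's code):

```go
package main

import (
	"sync"
	"sync/atomic"
)

// frame stands in for ZeroCopyAudioFrame.
type frame struct{ data []byte }

type tieredPool struct {
	mu           sync.Mutex
	preallocated []*frame
	preallocSize int
	counter      int64 // frames handed to the sync.Pool
	maxPoolSize  int64
	pool         sync.Pool
}

func (p *tieredPool) put(f *frame) {
	f.data = f.data[:0] // reset state before any reuse

	// Tier 1: bounded pre-allocated slice, the cheapest reuse path.
	p.mu.Lock()
	if len(p.preallocated) < p.preallocSize {
		p.preallocated = append(p.preallocated, f)
		p.mu.Unlock()
		return
	}
	p.mu.Unlock()

	// Tier 2: sync.Pool, capped so bursts cannot pin unbounded memory.
	if atomic.LoadInt64(&p.counter) >= p.maxPoolSize {
		return // pool full; let the GC reclaim the frame
	}
	p.pool.Put(f)
	atomic.AddInt64(&p.counter, 1)
}

func main() {
	p := &tieredPool{preallocSize: 1, maxPoolSize: 8}
	p.put(&frame{data: make([]byte, 0, 1920)}) // lands in the pre-allocated tier
	p.put(&frame{data: make([]byte, 0, 1920)}) // tier 1 full, goes to the sync.Pool
}
```

One detail worth flagging in the patch itself: `currentCount` is read with `atomic.LoadInt64` while also holding `p.mutex.RLock()`; since every access to the counter is atomic, the read lock around that load appears redundant, though harmless.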
@@ -271,18 +263,28 @@ func (f *ZeroCopyAudioFrame) SetDataDirect(data []byte) {
 	f.pooled = false // Direct assignment means we can't pool this frame
 }

-// AddRef increments the reference count for shared access
+// AddRef increments the reference count atomically
 func (f *ZeroCopyAudioFrame) AddRef() {
-	f.mutex.Lock()
-	f.refCount++
-	f.mutex.Unlock()
+	atomic.AddInt32(&f.refCount, 1)
 }

-// Release decrements the reference count
-func (f *ZeroCopyAudioFrame) Release() {
-	f.mutex.Lock()
-	f.refCount--
-	f.mutex.Unlock()
+// Release decrements the reference count atomically
+// Returns true if this was the final reference
+func (f *ZeroCopyAudioFrame) Release() bool {
+	newCount := atomic.AddInt32(&f.refCount, -1)
+	if newCount == 0 {
+		// Final reference released, return to pool if pooled
+		if f.pooled {
+			globalZeroCopyPool.Put(f)
+		}
+		return true
+	}
+	return false
+}
+
+// RefCount returns the current reference count atomically
+func (f *ZeroCopyAudioFrame) RefCount() int32 {
+	return atomic.LoadInt32(&f.refCount)
 }

 // Length returns the current data length
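Review note: with `Release` now reporting whether it dropped the last reference, the intended sharing pattern looks like this from the caller's side (sketch only; the `process` callback is hypothetical, while `AddRef`, `Release`, and `Data` are the methods above):

```go
// broadcast hands one pooled frame to several workers. The frame returns to
// the pool automatically when the last reference is released.
func broadcast(frame *ZeroCopyAudioFrame, workers int, process func([]byte)) {
	for i := 0; i < workers; i++ {
		frame.AddRef() // one reference per worker
		go func() {
			defer frame.Release() // final Release triggers the pool Put
			process(frame.Data())
		}()
	}
	frame.Release() // drop the producer's initial reference from Get()
}
```

Because `ZeroCopyFramePool.Put` now unconditionally resets `refCount` to zero, a frame should re-enter the pool only through its final `Release`, or through an explicit `Put` by a sole owner with no outstanding `AddRef` references.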