refactor(audio): improve memory management with atomic operations and chunk allocation

- Replace mutex-protected refCount with atomic operations in ZeroCopyFramePool
- Implement chunk-based allocation in AudioBufferPool to reduce allocations
- Add proper reference counting with atomic operations in ZeroCopyAudioFrame
- Optimize buffer pool sizing based on buffer size
This commit is contained in:
Alex P 2025-09-08 08:25:42 +00:00
parent a6913bf33b
commit 323d2587b7
4 changed files with 127 additions and 74 deletions

View File

@ -81,13 +81,13 @@ func (p *GoroutinePool) SubmitWithBackpressure(task Task) bool {
queueLen := len(p.taskQueue) queueLen := len(p.taskQueue)
queueCap := cap(p.taskQueue) queueCap := cap(p.taskQueue)
workerCount := atomic.LoadInt64(&p.workerCount) workerCount := atomic.LoadInt64(&p.workerCount)
// If queue is >90% full and we're at max workers, drop the task // If queue is >90% full and we're at max workers, drop the task
if queueLen > int(float64(queueCap)*0.9) && workerCount >= int64(p.maxWorkers) { if queueLen > int(float64(queueCap)*0.9) && workerCount >= int64(p.maxWorkers) {
p.logger.Warn().Int("queue_len", queueLen).Int("queue_cap", queueCap).Msg("Dropping task due to backpressure") p.logger.Warn().Int("queue_len", queueLen).Int("queue_cap", queueCap).Msg("Dropping task due to backpressure")
return false return false
} }
// Try one more time with a short timeout // Try one more time with a short timeout
select { select {
case p.taskQueue <- task: case p.taskQueue <- task:

View File

@ -192,8 +192,8 @@ type AudioInputServer struct {
wg sync.WaitGroup // Wait group for goroutine coordination wg sync.WaitGroup // Wait group for goroutine coordination
// Channel resizing support // Channel resizing support
channelMutex sync.RWMutex // Protects channel recreation channelMutex sync.RWMutex // Protects channel recreation
lastBufferSize int64 // Last known buffer size for change detection lastBufferSize int64 // Last known buffer size for change detection
// Socket buffer configuration // Socket buffer configuration
socketBufferConfig SocketBufferConfig socketBufferConfig SocketBufferConfig
@ -234,7 +234,7 @@ func NewAudioInputServer() (*AudioInputServer, error) {
// Get initial buffer size from adaptive buffer manager // Get initial buffer size from adaptive buffer manager
adaptiveManager := GetAdaptiveBufferManager() adaptiveManager := GetAdaptiveBufferManager()
initialBufferSize := int64(adaptiveManager.GetInputBufferSize()) initialBufferSize := int64(adaptiveManager.GetInputBufferSize())
// Ensure minimum buffer size to prevent immediate overflow // Ensure minimum buffer size to prevent immediate overflow
// Use at least 50 frames to handle burst traffic // Use at least 50 frames to handle burst traffic
minBufferSize := int64(50) minBufferSize := int64(50)
@ -966,7 +966,7 @@ func (ais *AudioInputServer) startReaderGoroutine() {
ais.channelMutex.RLock() ais.channelMutex.RLock()
messageChan := ais.messageChan messageChan := ais.messageChan
ais.channelMutex.RUnlock() ais.channelMutex.RUnlock()
select { select {
case messageChan <- msg: case messageChan <- msg:
atomic.AddInt64(&ais.totalFrames, 1) atomic.AddInt64(&ais.totalFrames, 1)
@ -1111,7 +1111,7 @@ func (ais *AudioInputServer) processMessageWithRecovery(msg *InputIPCMessage, lo
ais.channelMutex.RLock() ais.channelMutex.RLock()
processChan := ais.processChan processChan := ais.processChan
ais.channelMutex.RUnlock() ais.channelMutex.RUnlock()
select { select {
case processChan <- msg: case processChan <- msg:
return nil return nil
@ -1234,7 +1234,7 @@ func (ais *AudioInputServer) UpdateBufferSize() {
adaptiveManager := GetAdaptiveBufferManager() adaptiveManager := GetAdaptiveBufferManager()
newSize := int64(adaptiveManager.GetInputBufferSize()) newSize := int64(adaptiveManager.GetInputBufferSize())
oldSize := atomic.LoadInt64(&ais.bufferSize) oldSize := atomic.LoadInt64(&ais.bufferSize)
// Only recreate channels if size changed significantly (>25% difference) // Only recreate channels if size changed significantly (>25% difference)
if oldSize > 0 { if oldSize > 0 {
diff := float64(newSize-oldSize) / float64(oldSize) diff := float64(newSize-oldSize) / float64(oldSize)
@ -1242,9 +1242,9 @@ func (ais *AudioInputServer) UpdateBufferSize() {
return // Size change not significant enough return // Size change not significant enough
} }
} }
atomic.StoreInt64(&ais.bufferSize, newSize) atomic.StoreInt64(&ais.bufferSize, newSize)
// Recreate channels with new buffer size if server is running // Recreate channels with new buffer size if server is running
if ais.running { if ais.running {
ais.recreateChannels(int(newSize)) ais.recreateChannels(int(newSize))
@ -1255,15 +1255,15 @@ func (ais *AudioInputServer) UpdateBufferSize() {
func (ais *AudioInputServer) recreateChannels(newSize int) { func (ais *AudioInputServer) recreateChannels(newSize int) {
ais.channelMutex.Lock() ais.channelMutex.Lock()
defer ais.channelMutex.Unlock() defer ais.channelMutex.Unlock()
// Create new channels with updated buffer size // Create new channels with updated buffer size
newMessageChan := make(chan *InputIPCMessage, newSize) newMessageChan := make(chan *InputIPCMessage, newSize)
newProcessChan := make(chan *InputIPCMessage, newSize) newProcessChan := make(chan *InputIPCMessage, newSize)
// Drain old channels and transfer messages to new channels // Drain old channels and transfer messages to new channels
ais.drainAndTransferChannel(ais.messageChan, newMessageChan) ais.drainAndTransferChannel(ais.messageChan, newMessageChan)
ais.drainAndTransferChannel(ais.processChan, newProcessChan) ais.drainAndTransferChannel(ais.processChan, newProcessChan)
// Replace channels atomically // Replace channels atomically
ais.messageChan = newMessageChan ais.messageChan = newMessageChan
ais.processChan = newProcessChan ais.processChan = newProcessChan

View File

@ -354,6 +354,12 @@ type AudioBufferPool struct {
// Memory optimization fields // Memory optimization fields
preallocated []*[]byte // Pre-allocated buffers for immediate use preallocated []*[]byte // Pre-allocated buffers for immediate use
preallocSize int // Number of pre-allocated buffers preallocSize int // Number of pre-allocated buffers
// Chunk-based allocation optimization
chunkSize int // Size of each memory chunk
chunks [][]byte // Pre-allocated memory chunks
chunkOffsets []int // Current offset in each chunk
chunkMutex sync.Mutex // Protects chunk allocation
} }
func NewAudioBufferPool(bufferSize int) *AudioBufferPool { func NewAudioBufferPool(bufferSize int) *AudioBufferPool {
@ -379,29 +385,74 @@ func NewAudioBufferPool(bufferSize int) *AudioBufferPool {
preallocSize = minPrealloc preallocSize = minPrealloc
} }
// Pre-allocate with exact capacity to avoid slice growth // Calculate max pool size based on buffer size to prevent memory bloat
preallocated := make([]*[]byte, 0, preallocSize) maxPoolSize := 256 // Default
if bufferSize > 8192 {
maxPoolSize = 64 // Much smaller for very large buffers
} else if bufferSize > 4096 {
maxPoolSize = 128 // Smaller for large buffers
} else if bufferSize > 1024 {
maxPoolSize = 192 // Medium for medium buffers
}
// Calculate chunk size - allocate larger chunks to reduce allocation frequency
chunkSize := bufferSize * 64 // Each chunk holds 64 buffers worth of memory
if chunkSize < 64*1024 {
chunkSize = 64 * 1024 // Minimum 64KB chunks
}
p := &AudioBufferPool{
bufferSize: bufferSize,
maxPoolSize: maxPoolSize,
preallocated: make([]*[]byte, 0, preallocSize),
preallocSize: preallocSize,
chunkSize: chunkSize,
chunks: make([][]byte, 0, 4), // Start with capacity for 4 chunks
chunkOffsets: make([]int, 0, 4),
}
// Configure sync.Pool with optimized allocation
p.pool.New = func() interface{} {
// Use chunk-based allocation instead of individual make()
buf := p.allocateFromChunk()
return &buf
}
// Pre-allocate buffers with optimized capacity // Pre-allocate buffers with optimized capacity
for i := 0; i < preallocSize; i++ { for i := 0; i < preallocSize; i++ {
// Use exact buffer size to prevent over-allocation // Use chunk-based allocation to prevent over-allocation
buf := make([]byte, 0, bufferSize) buf := p.allocateFromChunk()
preallocated = append(preallocated, &buf) p.preallocated = append(p.preallocated, &buf)
} }
return &AudioBufferPool{ return p
bufferSize: bufferSize, }
maxPoolSize: GetConfig().MaxPoolSize * 2, // Double the max pool size for better buffering
preallocated: preallocated, // allocateFromChunk allocates a buffer from pre-allocated memory chunks
preallocSize: preallocSize, func (p *AudioBufferPool) allocateFromChunk() []byte {
pool: sync.Pool{ p.chunkMutex.Lock()
New: func() interface{} { defer p.chunkMutex.Unlock()
// Allocate exact size to minimize memory waste
buf := make([]byte, 0, bufferSize) // Try to allocate from existing chunks
return &buf for i := 0; i < len(p.chunks); i++ {
}, if p.chunkOffsets[i]+p.bufferSize <= len(p.chunks[i]) {
}, // Slice from the chunk
start := p.chunkOffsets[i]
end := start + p.bufferSize
buf := p.chunks[i][start:end:end] // Use 3-index slice to set capacity
p.chunkOffsets[i] = end
return buf[:0] // Return with zero length but correct capacity
}
} }
// Need to allocate a new chunk
newChunk := make([]byte, p.chunkSize)
p.chunks = append(p.chunks, newChunk)
p.chunkOffsets = append(p.chunkOffsets, p.bufferSize)
// Return buffer from the new chunk
buf := newChunk[0:p.bufferSize:p.bufferSize]
return buf[:0] // Return with zero length but correct capacity
} }
func (p *AudioBufferPool) Get() []byte { func (p *AudioBufferPool) Get() []byte {
@ -459,10 +510,10 @@ func (p *AudioBufferPool) Get() []byte {
// Buffer too small, fall through to allocation // Buffer too small, fall through to allocation
} }
// Pool miss - allocate new buffer with exact capacity // Pool miss - allocate new buffer from chunk
// Direct miss count update to avoid sampling complexity in critical path // Direct miss count update to avoid sampling complexity in critical path
atomic.AddInt64(&p.missCount, 1) atomic.AddInt64(&p.missCount, 1)
return make([]byte, 0, p.bufferSize) return p.allocateFromChunk()
} }
func (p *AudioBufferPool) Put(buf []byte) { func (p *AudioBufferPool) Put(buf []byte) {

View File

@ -147,7 +147,7 @@ func (p *ZeroCopyFramePool) Get() *ZeroCopyAudioFrame {
// If we've allocated too many frames, force pool reuse // If we've allocated too many frames, force pool reuse
frame := p.pool.Get().(*ZeroCopyAudioFrame) frame := p.pool.Get().(*ZeroCopyAudioFrame)
frame.mutex.Lock() frame.mutex.Lock()
frame.refCount = 1 atomic.StoreInt32(&frame.refCount, 1)
frame.length = 0 frame.length = 0
frame.data = frame.data[:0] frame.data = frame.data[:0]
frame.mutex.Unlock() frame.mutex.Unlock()
@ -163,11 +163,12 @@ func (p *ZeroCopyFramePool) Get() *ZeroCopyAudioFrame {
p.mutex.Unlock() p.mutex.Unlock()
frame.mutex.Lock() frame.mutex.Lock()
frame.refCount = 1 atomic.StoreInt32(&frame.refCount, 1)
frame.length = 0 frame.length = 0
frame.data = frame.data[:0] frame.data = frame.data[:0]
frame.mutex.Unlock() frame.mutex.Unlock()
atomic.AddInt64(&p.hitCount, 1)
return frame return frame
} }
p.mutex.Unlock() p.mutex.Unlock()
@ -175,7 +176,7 @@ func (p *ZeroCopyFramePool) Get() *ZeroCopyAudioFrame {
// Try sync.Pool next and track allocation // Try sync.Pool next and track allocation
frame := p.pool.Get().(*ZeroCopyAudioFrame) frame := p.pool.Get().(*ZeroCopyAudioFrame)
frame.mutex.Lock() frame.mutex.Lock()
frame.refCount = 1 atomic.StoreInt32(&frame.refCount, 1)
frame.length = 0 frame.length = 0
frame.data = frame.data[:0] frame.data = frame.data[:0]
frame.mutex.Unlock() frame.mutex.Unlock()
@ -191,43 +192,34 @@ func (p *ZeroCopyFramePool) Put(frame *ZeroCopyAudioFrame) {
return return
} }
// Reset frame state for reuse
frame.mutex.Lock() frame.mutex.Lock()
frame.refCount-- atomic.StoreInt32(&frame.refCount, 0)
if frame.refCount <= 0 { frame.length = 0
frame.refCount = 0 frame.data = frame.data[:0]
frame.length = 0 frame.mutex.Unlock()
frame.data = frame.data[:0]
frame.mutex.Unlock()
// First try to return to pre-allocated pool for fastest reuse // First try to return to pre-allocated pool for fastest reuse
p.mutex.Lock() p.mutex.Lock()
if len(p.preallocated) < p.preallocSize { if len(p.preallocated) < p.preallocSize {
p.preallocated = append(p.preallocated, frame) p.preallocated = append(p.preallocated, frame)
p.mutex.Unlock()
return
}
p.mutex.Unlock() p.mutex.Unlock()
return
}
p.mutex.Unlock()
// Check pool size limit to prevent excessive memory usage // Check pool size limit to prevent excessive memory usage
p.mutex.RLock() p.mutex.RLock()
currentCount := atomic.LoadInt64(&p.counter) currentCount := atomic.LoadInt64(&p.counter)
p.mutex.RUnlock() p.mutex.RUnlock()
if currentCount >= int64(p.maxPoolSize) { if currentCount >= int64(p.maxPoolSize) {
return // Pool is full, let GC handle this frame return // Pool is full, let GC handle this frame
}
// Return to sync.Pool
p.pool.Put(frame)
// Metrics collection removed
if false {
atomic.AddInt64(&p.counter, 1)
}
} else {
frame.mutex.Unlock()
} }
// Metrics recording removed - granular metrics collector was unused // Return to sync.Pool
p.pool.Put(frame)
atomic.AddInt64(&p.counter, 1)
} }
// Data returns the frame data as a slice (zero-copy view) // Data returns the frame data as a slice (zero-copy view)
@ -271,18 +263,28 @@ func (f *ZeroCopyAudioFrame) SetDataDirect(data []byte) {
f.pooled = false // Direct assignment means we can't pool this frame f.pooled = false // Direct assignment means we can't pool this frame
} }
// AddRef increments the reference count for shared access // AddRef increments the reference count atomically
func (f *ZeroCopyAudioFrame) AddRef() { func (f *ZeroCopyAudioFrame) AddRef() {
f.mutex.Lock() atomic.AddInt32(&f.refCount, 1)
f.refCount++
f.mutex.Unlock()
} }
// Release decrements the reference count // Release decrements the reference count atomically
func (f *ZeroCopyAudioFrame) Release() { // Returns true if this was the final reference
f.mutex.Lock() func (f *ZeroCopyAudioFrame) Release() bool {
f.refCount-- newCount := atomic.AddInt32(&f.refCount, -1)
f.mutex.Unlock() if newCount == 0 {
// Final reference released, return to pool if pooled
if f.pooled {
globalZeroCopyPool.Put(f)
}
return true
}
return false
}
// RefCount returns the current reference count atomically
func (f *ZeroCopyAudioFrame) RefCount() int32 {
return atomic.LoadInt32(&f.refCount)
} }
// Length returns the current data length // Length returns the current data length