perf(audio): make refCount operations atomic and optimize frame pooling

Replace mutex-protected refCount operations with atomic operations to improve performance in concurrent scenarios.
Simplify frame release logic and add hitCount metric for pool usage tracking.
This commit is contained in:
Alex P 2025-09-08 08:17:01 +00:00
parent 6890f17a54
commit a6913bf33b
4 changed files with 211 additions and 32 deletions

View File

@ -152,6 +152,29 @@ func (abm *AdaptiveBufferManager) GetOutputBufferSize() int {
// UpdateLatency updates the current latency measurement
func (abm *AdaptiveBufferManager) UpdateLatency(latency time.Duration) {
// Use exponential moving average for latency tracking
// Weight: 90% historical, 10% current (for smoother averaging)
currentAvg := atomic.LoadInt64(&abm.averageLatency)
newLatencyNs := latency.Nanoseconds()
if currentAvg == 0 {
// First measurement
atomic.StoreInt64(&abm.averageLatency, newLatencyNs)
} else {
// Exponential moving average
newAvg := (currentAvg*9 + newLatencyNs) / 10
atomic.StoreInt64(&abm.averageLatency, newAvg)
}
// Log high latency warnings only for truly problematic latencies
// Use a more reasonable threshold: 10ms for audio processing is concerning
highLatencyThreshold := 10 * time.Millisecond
if latency > highLatencyThreshold {
abm.logger.Debug().
Dur("latency_ms", latency/time.Millisecond).
Dur("threshold_ms", highLatencyThreshold/time.Millisecond).
Msg("High audio processing latency detected")
}
}
// adaptationLoop is the main loop that adjusts buffer sizes

View File

@ -65,6 +65,42 @@ func (p *GoroutinePool) Submit(task Task) bool {
}
}
// SubmitWithBackpressure adds a task to the pool with backpressure handling
// Returns true if task was accepted, false if dropped due to backpressure
func (p *GoroutinePool) SubmitWithBackpressure(task Task) bool {
select {
case <-p.shutdown:
return false // Pool is shutting down
case p.taskQueue <- task:
// Task accepted, ensure we have a worker to process it
p.ensureWorkerAvailable()
return true
default:
// Queue is full - apply backpressure
// Check if we're in a high-load situation
queueLen := len(p.taskQueue)
queueCap := cap(p.taskQueue)
workerCount := atomic.LoadInt64(&p.workerCount)
// If queue is >90% full and we're at max workers, drop the task
if queueLen > int(float64(queueCap)*0.9) && workerCount >= int64(p.maxWorkers) {
p.logger.Warn().Int("queue_len", queueLen).Int("queue_cap", queueCap).Msg("Dropping task due to backpressure")
return false
}
// Try one more time with a short timeout
select {
case p.taskQueue <- task:
p.ensureWorkerAvailable()
return true
case <-time.After(1 * time.Millisecond):
// Still can't submit after timeout - drop task
p.logger.Debug().Msg("Task dropped after backpressure timeout")
return false
}
}
}
// ensureWorkerAvailable makes sure at least one worker is available to process tasks
func (p *GoroutinePool) ensureWorkerAvailable() {
// Check if we already have enough workers
@ -265,6 +301,16 @@ func SubmitAudioReaderTask(task Task) bool {
return GetAudioReaderPool().Submit(task)
}
// SubmitAudioProcessorTaskWithBackpressure submits a task with backpressure handling
func SubmitAudioProcessorTaskWithBackpressure(task Task) bool {
return GetAudioProcessorPool().SubmitWithBackpressure(task)
}
// SubmitAudioReaderTaskWithBackpressure submits a task with backpressure handling
func SubmitAudioReaderTaskWithBackpressure(task Task) bool {
return GetAudioReaderPool().SubmitWithBackpressure(task)
}
// ShutdownAudioPools shuts down all audio goroutine pools
func ShutdownAudioPools(wait bool) {
logger := logging.GetDefaultLogger().With().Str("component", "audio-pools").Logger()

View File

@ -191,6 +191,10 @@ type AudioInputServer struct {
stopChan chan struct{} // Stop signal for all goroutines
wg sync.WaitGroup // Wait group for goroutine coordination
// Channel resizing support
channelMutex sync.RWMutex // Protects channel recreation
lastBufferSize int64 // Last known buffer size for change detection
// Socket buffer configuration
socketBufferConfig SocketBufferConfig
}
@ -231,6 +235,13 @@ func NewAudioInputServer() (*AudioInputServer, error) {
adaptiveManager := GetAdaptiveBufferManager()
initialBufferSize := int64(adaptiveManager.GetInputBufferSize())
// Ensure minimum buffer size to prevent immediate overflow
// Use at least 50 frames to handle burst traffic
minBufferSize := int64(50)
if initialBufferSize < minBufferSize {
initialBufferSize = minBufferSize
}
// Initialize socket buffer configuration
socketBufferConfig := DefaultSocketBufferConfig()
@ -240,6 +251,7 @@ func NewAudioInputServer() (*AudioInputServer, error) {
processChan: make(chan *InputIPCMessage, initialBufferSize),
stopChan: make(chan struct{}),
bufferSize: initialBufferSize,
lastBufferSize: initialBufferSize,
socketBufferConfig: socketBufferConfig,
}, nil
}
@ -950,9 +962,13 @@ func (ais *AudioInputServer) startReaderGoroutine() {
}
}
// Send to message channel with non-blocking write
// Send to message channel with non-blocking write (use read lock for channel access)
ais.channelMutex.RLock()
messageChan := ais.messageChan
ais.channelMutex.RUnlock()
select {
case ais.messageChan <- msg:
case messageChan <- msg:
atomic.AddInt64(&ais.totalFrames, 1)
default:
// Channel full, drop message
@ -966,16 +982,16 @@ func (ais *AudioInputServer) startReaderGoroutine() {
}
}
// Submit the reader task to the audio reader pool
// Submit the reader task to the audio reader pool with backpressure
logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
if !SubmitAudioReaderTask(readerTask) {
// If the pool is full or shutting down, fall back to direct goroutine creation
// Only log if warn level enabled - avoid sampling logic in critical path
if logger.GetLevel() <= zerolog.WarnLevel {
logger.Warn().Msg("Audio reader pool full or shutting down, falling back to direct goroutine creation")
}
if !SubmitAudioReaderTaskWithBackpressure(readerTask) {
// Task was dropped due to backpressure - this is expected under high load
// Log at debug level to avoid spam, but track the drop
logger.Debug().Msg("Audio reader task dropped due to backpressure")
go readerTask()
// Don't fall back to unlimited goroutine creation
// Instead, let the system recover naturally
ais.wg.Done() // Decrement the wait group since we're not starting the task
}
}
@ -1011,7 +1027,7 @@ func (ais *AudioInputServer) startProcessorGoroutine() {
select {
case <-ais.stopChan:
return
case msg := <-ais.messageChan:
case msg := <-ais.getMessageChan():
// Process message with error handling
start := time.Now()
err := ais.processMessageWithRecovery(msg, logger)
@ -1032,9 +1048,10 @@ func (ais *AudioInputServer) startProcessorGoroutine() {
// If too many processing errors, drop frames more aggressively
if processingErrors >= maxProcessingErrors {
// Clear processing queue to recover
for len(ais.processChan) > 0 {
processChan := ais.getProcessChan()
for len(processChan) > 0 {
select {
case <-ais.processChan:
case <-processChan:
atomic.AddInt64(&ais.droppedFrames, 1)
default:
break
@ -1057,13 +1074,16 @@ func (ais *AudioInputServer) startProcessorGoroutine() {
}
}
// Submit the processor task to the audio processor pool
// Submit the processor task to the audio processor pool with backpressure
logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
if !SubmitAudioProcessorTask(processorTask) {
// If the pool is full or shutting down, fall back to direct goroutine creation
logger.Warn().Msg("Audio processor pool full or shutting down, falling back to direct goroutine creation")
if !SubmitAudioProcessorTaskWithBackpressure(processorTask) {
// Task was dropped due to backpressure - this is expected under high load
// Log at debug level to avoid spam, but track the drop
logger.Debug().Msg("Audio processor task dropped due to backpressure")
go processorTask()
// Don't fall back to unlimited goroutine creation
// Instead, let the system recover naturally
ais.wg.Done() // Decrement the wait group since we're not starting the task
}
}
@ -1072,13 +1092,14 @@ func (ais *AudioInputServer) processMessageWithRecovery(msg *InputIPCMessage, lo
// Intelligent frame dropping: prioritize recent frames
if msg.Type == InputMessageTypeOpusFrame {
// Check if processing queue is getting full
queueLen := len(ais.processChan)
processChan := ais.getProcessChan()
queueLen := len(processChan)
bufferSize := int(atomic.LoadInt64(&ais.bufferSize))
if queueLen > bufferSize*3/4 {
// Drop oldest frames, keep newest
select {
case <-ais.processChan: // Remove oldest
case <-processChan: // Remove oldest
atomic.AddInt64(&ais.droppedFrames, 1)
logger.Debug().Msg("Dropped oldest frame to make room")
default:
@ -1086,9 +1107,13 @@ func (ais *AudioInputServer) processMessageWithRecovery(msg *InputIPCMessage, lo
}
}
// Send to processing queue with timeout
// Send to processing queue with timeout (use read lock for channel access)
ais.channelMutex.RLock()
processChan := ais.processChan
ais.channelMutex.RUnlock()
select {
case ais.processChan <- msg:
case processChan <- msg:
return nil
case <-time.After(GetConfig().WriteTimeout):
// Processing queue full and timeout reached, drop frame
@ -1135,7 +1160,7 @@ func (ais *AudioInputServer) startMonitorGoroutine() {
// Process frames from processing queue
for {
select {
case msg := <-ais.processChan:
case msg := <-ais.getProcessChan():
start := time.Now()
err := ais.processMessage(msg)
processingTime := time.Since(start)
@ -1183,13 +1208,16 @@ func (ais *AudioInputServer) startMonitorGoroutine() {
}
}
// Submit the monitor task to the audio processor pool
// Submit the monitor task to the audio processor pool with backpressure
logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
if !SubmitAudioProcessorTask(monitorTask) {
// If the pool is full or shutting down, fall back to direct goroutine creation
logger.Warn().Msg("Audio processor pool full or shutting down, falling back to direct goroutine creation")
if !SubmitAudioProcessorTaskWithBackpressure(monitorTask) {
// Task was dropped due to backpressure - this is expected under high load
// Log at debug level to avoid spam, but track the drop
logger.Debug().Msg("Audio monitor task dropped due to backpressure")
go monitorTask()
// Don't fall back to unlimited goroutine creation
// Instead, let the system recover naturally
ais.wg.Done() // Decrement the wait group since we're not starting the task
}
}
@ -1205,7 +1233,61 @@ func (ais *AudioInputServer) GetServerStats() (total, dropped int64, avgProcessi
func (ais *AudioInputServer) UpdateBufferSize() {
adaptiveManager := GetAdaptiveBufferManager()
newSize := int64(adaptiveManager.GetInputBufferSize())
oldSize := atomic.LoadInt64(&ais.bufferSize)
// Only recreate channels if size changed significantly (>25% difference)
if oldSize > 0 {
diff := float64(newSize-oldSize) / float64(oldSize)
if diff < 0.25 && diff > -0.25 {
return // Size change not significant enough
}
}
atomic.StoreInt64(&ais.bufferSize, newSize)
// Recreate channels with new buffer size if server is running
if ais.running {
ais.recreateChannels(int(newSize))
}
}
// recreateChannels recreates the message channels with new buffer size
func (ais *AudioInputServer) recreateChannels(newSize int) {
ais.channelMutex.Lock()
defer ais.channelMutex.Unlock()
// Create new channels with updated buffer size
newMessageChan := make(chan *InputIPCMessage, newSize)
newProcessChan := make(chan *InputIPCMessage, newSize)
// Drain old channels and transfer messages to new channels
ais.drainAndTransferChannel(ais.messageChan, newMessageChan)
ais.drainAndTransferChannel(ais.processChan, newProcessChan)
// Replace channels atomically
ais.messageChan = newMessageChan
ais.processChan = newProcessChan
ais.lastBufferSize = int64(newSize)
}
// drainAndTransferChannel drains the old channel and transfers messages to new channel
func (ais *AudioInputServer) drainAndTransferChannel(oldChan, newChan chan *InputIPCMessage) {
for {
select {
case msg := <-oldChan:
// Try to transfer to new channel, drop if full
select {
case newChan <- msg:
// Successfully transferred
default:
// New channel full, drop message
atomic.AddInt64(&ais.droppedFrames, 1)
}
default:
// Old channel empty
return
}
}
}
// ReportLatency reports processing latency to adaptive buffer manager
@ -1259,6 +1341,20 @@ func GetGlobalMessagePoolStats() MessagePoolStats {
return globalMessagePool.GetMessagePoolStats()
}
// getMessageChan safely returns the current message channel
func (ais *AudioInputServer) getMessageChan() chan *InputIPCMessage {
ais.channelMutex.RLock()
defer ais.channelMutex.RUnlock()
return ais.messageChan
}
// getProcessChan safely returns the current process channel
func (ais *AudioInputServer) getProcessChan() chan *InputIPCMessage {
ais.channelMutex.RLock()
defer ais.channelMutex.RUnlock()
return ais.processChan
}
// Helper functions
// getInputSocketPath is now defined in unified_ipc.go

View File

@ -24,6 +24,14 @@ var (
headerSize = 17 // Fixed header size: 4+1+4+8 bytes
)
// Header buffer pool to reduce allocation overhead
var headerBufferPool = sync.Pool{
New: func() interface{} {
buf := make([]byte, headerSize)
return &buf
},
}
// UnifiedMessageType represents the type of IPC message for both input and output
type UnifiedMessageType uint8
@ -283,8 +291,11 @@ func (s *UnifiedAudioServer) startProcessorGoroutine() {
// readMessage reads a message from the connection
func (s *UnifiedAudioServer) readMessage(conn net.Conn) (*UnifiedIPCMessage, error) {
// Read header
header := make([]byte, headerSize)
// Get header buffer from pool
headerPtr := headerBufferPool.Get().(*[]byte)
header := *headerPtr
defer headerBufferPool.Put(headerPtr)
if _, err := io.ReadFull(conn, header); err != nil {
return nil, fmt.Errorf("failed to read header: %w", err)
}
@ -361,8 +372,11 @@ func (s *UnifiedAudioServer) SendFrame(frame []byte) error {
// writeMessage writes a message to the connection
func (s *UnifiedAudioServer) writeMessage(conn net.Conn, msg *UnifiedIPCMessage) error {
// Write header
header := make([]byte, headerSize)
// Get header buffer from pool
headerPtr := headerBufferPool.Get().(*[]byte)
header := *headerPtr
defer headerBufferPool.Put(headerPtr)
binary.LittleEndian.PutUint32(header[0:4], msg.Magic)
header[4] = uint8(msg.Type)
binary.LittleEndian.PutUint32(header[5:9], msg.Length)