perf(audio): optimize audio processing paths and reduce overhead

- Replace CGO function variable aliases with direct function calls to eliminate indirection
- Simplify audio frame validation by using cached max size and removing error formatting
- Optimize buffer pool operations by removing metrics collection and streamlining cache access
- Improve batch audio processor by pre-calculating values and reducing config lookups
- Streamline IPC message processing with inline validation and reduced error logging
This commit is contained in:
Alex P 2025-09-03 17:51:08 +00:00
parent a741f05829
commit 260f62efc3
5 changed files with 119 additions and 213 deletions

View File

@ -86,21 +86,24 @@ func NewBatchAudioProcessor(batchSize int, batchDuration time.Duration) *BatchAu
cache := GetCachedConfig()
cache.Update()
// Validate input parameters
if err := ValidateBufferSize(batchSize); err != nil {
logger := logging.GetDefaultLogger().With().Str("component", "batch-audio").Logger()
logger.Warn().Err(err).Int("batchSize", batchSize).Msg("invalid batch size, using default")
// Validate input parameters with minimal overhead
if batchSize <= 0 || batchSize > 1000 {
batchSize = cache.BatchProcessorFramesPerBatch
}
if batchDuration <= 0 {
logger := logging.GetDefaultLogger().With().Str("component", "batch-audio").Logger()
logger.Warn().Dur("batchDuration", batchDuration).Msg("invalid batch duration, using default")
batchDuration = cache.BatchProcessingDelay
}
ctx, cancel := context.WithCancel(context.Background())
// Pre-allocate logger to avoid repeated allocations
logger := logging.GetDefaultLogger().With().Str("component", "batch-audio").Logger()
// Pre-calculate frame size to avoid repeated GetConfig() calls
frameSize := cache.GetMinReadEncodeBuffer()
if frameSize == 0 {
frameSize = 1500 // Safe fallback
}
processor := &BatchAudioProcessor{
ctx: ctx,
cancel: cancel,
@ -111,12 +114,14 @@ func NewBatchAudioProcessor(batchSize int, batchDuration time.Duration) *BatchAu
writeQueue: make(chan batchWriteRequest, batchSize*2),
readBufPool: &sync.Pool{
New: func() interface{} {
return make([]byte, GetConfig().AudioFramePoolSize) // Max audio frame size
// Use pre-calculated frame size to avoid GetConfig() calls
return make([]byte, 0, frameSize)
},
},
writeBufPool: &sync.Pool{
New: func() interface{} {
return make([]byte, GetConfig().AudioFramePoolSize) // Max audio frame size
// Use pre-calculated frame size to avoid GetConfig() calls
return make([]byte, 0, frameSize)
},
},
}
@ -386,65 +391,52 @@ func (bap *BatchAudioProcessor) batchWriteProcessor() {
// processBatchRead processes a batch of read requests efficiently
func (bap *BatchAudioProcessor) processBatchRead(batch []batchReadRequest) {
if len(batch) == 0 {
batchSize := len(batch)
if batchSize == 0 {
return
}
// Get cached config to avoid GetConfig() calls in hot path
// Get cached config once - avoid repeated calls
cache := GetCachedConfig()
minBatchSize := cache.MinBatchSizeForThreadPinning
// Only pin to OS thread for large batches to reduce thread contention
start := time.Now()
shouldPinThread := len(batch) >= cache.MinBatchSizeForThreadPinning
// Track if we pinned the thread in this call
var start time.Time
threadWasPinned := false
if shouldPinThread && atomic.CompareAndSwapInt32(&bap.threadPinned, 0, 1) {
if batchSize >= minBatchSize && atomic.CompareAndSwapInt32(&bap.threadPinned, 0, 1) {
start = time.Now()
threadWasPinned = true
runtime.LockOSThread()
// Set high priority for batch audio processing
if err := SetAudioThreadPriority(); err != nil {
bap.logger.Warn().Err(err).Msg("failed to set batch audio processing priority")
}
// Skip priority setting for better performance - audio threads already have good priority
}
batchSize := len(batch)
// Update stats efficiently
atomic.AddInt64(&bap.stats.BatchedReads, 1)
atomic.AddInt64(&bap.stats.BatchedFrames, int64(batchSize))
if batchSize > 1 {
atomic.AddInt64(&bap.stats.CGOCallsReduced, int64(batchSize-1))
}
// Add deferred function to release thread lock if we pinned it
if threadWasPinned {
defer func() {
if err := ResetThreadPriority(); err != nil {
bap.logger.Warn().Err(err).Msg("failed to reset thread priority")
}
runtime.UnlockOSThread()
atomic.StoreInt32(&bap.threadPinned, 0)
bap.stats.OSThreadPinTime += time.Since(start)
}()
}
// Process each request in the batch
for _, req := range batch {
// Process each request in the batch with minimal overhead
for i := range batch {
req := &batch[i]
length, err := CGOAudioReadEncode(req.buffer)
result := batchReadResult{
length: length,
err: err,
}
// Send result back (non-blocking)
// Send result back (non-blocking) - reuse result struct
select {
case req.resultChan <- result:
case req.resultChan <- batchReadResult{length: length, err: err}:
default:
// Requestor timed out, drop result
}
}
// Release thread lock if we pinned it
if threadWasPinned {
runtime.UnlockOSThread()
atomic.StoreInt32(&bap.threadPinned, 0)
bap.stats.OSThreadPinTime += time.Since(start)
}
bap.stats.LastBatchTime = time.Now()
}
@ -468,10 +460,8 @@ func (bap *BatchAudioProcessor) processBatchWrite(batch []batchWriteRequest) {
threadWasPinned = true
runtime.LockOSThread()
// Set high priority for batch audio processing
if err := SetAudioThreadPriority(); err != nil {
bap.logger.Warn().Err(err).Msg("failed to set batch audio processing priority")
}
// Set high priority for batch audio processing - skip logging in hotpath
_ = SetAudioThreadPriority()
}
batchSize := len(batch)
@ -484,9 +474,8 @@ func (bap *BatchAudioProcessor) processBatchWrite(batch []batchWriteRequest) {
// Add deferred function to release thread lock if we pinned it
if threadWasPinned {
defer func() {
if err := ResetThreadPriority(); err != nil {
bap.logger.Warn().Err(err).Msg("failed to reset thread priority")
}
// Skip logging in hotpath for performance
_ = ResetThreadPriority()
runtime.UnlockOSThread()
atomic.StoreInt32(&bap.writePinned, 0)
bap.stats.WriteThreadTime += time.Since(start)

View File

@ -351,50 +351,29 @@ func NewAudioBufferPool(bufferSize int) *AudioBufferPool {
}
func (p *AudioBufferPool) Get() []byte {
// Trigger periodic cleanup of goroutine cache
cleanupGoroutineCache()
start := time.Now()
wasHit := false
defer func() {
latency := time.Since(start)
// Record metrics for frame pool (assuming this is the main usage)
if p.bufferSize >= GetConfig().AudioFramePoolSize {
GetGranularMetricsCollector().RecordFramePoolGet(latency, wasHit)
} else {
GetGranularMetricsCollector().RecordControlPoolGet(latency, wasHit)
}
}()
// Skip cleanup trigger in hotpath - cleanup runs in background
// cleanupGoroutineCache() - moved to background goroutine
// Fast path: Try lock-free per-goroutine cache first
gid := getGoroutineID()
goroutineCacheMutex.RLock()
// Try new TTL-based cache first
cacheEntry, exists := goroutineCacheWithTTL[gid]
var cache *lockFreeBufferCache
if exists && cacheEntry != nil {
cache = cacheEntry.cache
// Update last access time
cacheEntry.lastAccess = time.Now().Unix()
} else {
// Fall back to legacy cache if needed
cache, exists = goroutineBufferCache[gid]
}
goroutineCacheMutex.RUnlock()
if exists && cache != nil {
if exists && cacheEntry != nil && cacheEntry.cache != nil {
// Try to get buffer from lock-free cache
cache := cacheEntry.cache
for i := 0; i < len(cache.buffers); i++ {
bufPtr := (*unsafe.Pointer)(unsafe.Pointer(&cache.buffers[i]))
buf := (*[]byte)(atomic.LoadPointer(bufPtr))
if buf != nil && atomic.CompareAndSwapPointer(bufPtr, unsafe.Pointer(buf), nil) {
atomic.AddInt64(&p.hitCount, 1)
wasHit = true
*buf = (*buf)[:0]
return *buf
}
}
// Update access time only after cache miss to reduce overhead
cacheEntry.lastAccess = time.Now().Unix()
}
// Fallback: Try pre-allocated pool with mutex
@ -404,11 +383,7 @@ func (p *AudioBufferPool) Get() []byte {
buf := p.preallocated[lastIdx]
p.preallocated = p.preallocated[:lastIdx]
p.mutex.Unlock()
// Update hit counter
atomic.AddInt64(&p.hitCount, 1)
wasHit = true
// Ensure buffer is properly reset
*buf = (*buf)[:0]
return *buf
}
@ -417,20 +392,14 @@ func (p *AudioBufferPool) Get() []byte {
// Try sync.Pool next
if poolBuf := p.pool.Get(); poolBuf != nil {
buf := poolBuf.(*[]byte)
// Update hit counter
atomic.AddInt64(&p.hitCount, 1)
// Decrement pool size counter atomically
atomic.AddInt64(&p.currentSize, -1)
// Ensure buffer is properly reset and check capacity
// Fast capacity check - most buffers should be correct size
if cap(*buf) >= p.bufferSize {
wasHit = true
*buf = (*buf)[:0]
return *buf
} else {
// Buffer too small, allocate new one
atomic.AddInt64(&p.missCount, 1)
return make([]byte, 0, p.bufferSize)
}
// Buffer too small, fall through to allocation
}
// Pool miss - allocate new buffer with exact capacity
@ -439,18 +408,7 @@ func (p *AudioBufferPool) Get() []byte {
}
func (p *AudioBufferPool) Put(buf []byte) {
start := time.Now()
defer func() {
latency := time.Since(start)
// Record metrics for frame pool (assuming this is the main usage)
if p.bufferSize >= GetConfig().AudioFramePoolSize {
GetGranularMetricsCollector().RecordFramePoolPut(latency, cap(buf))
} else {
GetGranularMetricsCollector().RecordControlPoolPut(latency, cap(buf))
}
}()
// Validate buffer capacity - reject buffers that are too small or too large
// Fast validation - reject buffers that are too small or too large
bufCap := cap(buf)
if bufCap < p.bufferSize || bufCap > p.bufferSize*2 {
return // Buffer size mismatch, don't pool it to prevent memory bloat
@ -461,27 +419,19 @@ func (p *AudioBufferPool) Put(buf []byte) {
// Fast path: Try to put in lock-free per-goroutine cache
gid := getGoroutineID()
now := time.Now().Unix()
// Check if we have a TTL-based cache entry for this goroutine
goroutineCacheMutex.RLock()
entryWithTTL, exists := goroutineCacheWithTTL[gid]
goroutineCacheMutex.RUnlock()
var cache *lockFreeBufferCache
if exists && entryWithTTL != nil {
cache = entryWithTTL.cache
// Update last access time
entryWithTTL.lastAccess = now
// Update access time only when we successfully use the cache
} else {
// Fall back to legacy cache if needed
cache, exists = goroutineBufferCache[gid]
}
goroutineCacheMutex.RUnlock()
if !exists {
// Create new cache for this goroutine
cache = &lockFreeBufferCache{}
now := time.Now().Unix()
goroutineCacheMutex.Lock()
// Store in TTL-based cache
goroutineCacheWithTTL[gid] = &cacheEntry{
cache: cache,
lastAccess: now,
@ -495,6 +445,10 @@ func (p *AudioBufferPool) Put(buf []byte) {
for i := 0; i < len(cache.buffers); i++ {
bufPtr := (*unsafe.Pointer)(unsafe.Pointer(&cache.buffers[i]))
if atomic.CompareAndSwapPointer(bufPtr, nil, unsafe.Pointer(&buf)) {
// Update access time only on successful cache
if exists && entryWithTTL != nil {
entryWithTTL.lastAccess = time.Now().Unix()
}
return // Successfully cached
}
}
@ -510,14 +464,12 @@ func (p *AudioBufferPool) Put(buf []byte) {
p.mutex.Unlock()
// Check sync.Pool size limit to prevent excessive memory usage
currentSize := atomic.LoadInt64(&p.currentSize)
if currentSize >= int64(p.maxPoolSize) {
if atomic.LoadInt64(&p.currentSize) >= int64(p.maxPoolSize) {
return // Pool is full, let GC handle this buffer
}
// Return to sync.Pool and update counter atomically
p.pool.Put(&resetBuf)
// Update pool size counter atomically
atomic.AddInt64(&p.currentSize, 1)
}

View File

@ -1397,14 +1397,17 @@ func cgoAudioDecodeWriteWithBuffers(opusData []byte, pcmBuffer []byte) (int, err
}
}
// CGO function aliases
var (
CGOAudioInit = cgoAudioInit
CGOAudioClose = cgoAudioClose
CGOAudioReadEncode = cgoAudioReadEncode
CGOAudioPlaybackInit = cgoAudioPlaybackInit
CGOAudioPlaybackClose = cgoAudioPlaybackClose
CGOAudioDecodeWriteLegacy = cgoAudioDecodeWrite
CGOAudioDecodeWrite = cgoAudioDecodeWriteWithBuffers
CGOUpdateOpusEncoderParams = updateOpusEncoderParams
)
// Optimized CGO function aliases - use direct function calls to reduce overhead
// These are now direct function aliases instead of variable assignments
func CGOAudioInit() error { return cgoAudioInit() }
func CGOAudioClose() { cgoAudioClose() }
func CGOAudioReadEncode(buf []byte) (int, error) { return cgoAudioReadEncode(buf) }
func CGOAudioPlaybackInit() error { return cgoAudioPlaybackInit() }
func CGOAudioPlaybackClose() { cgoAudioPlaybackClose() }
func CGOAudioDecodeWriteLegacy(buf []byte) (int, error) { return cgoAudioDecodeWrite(buf) }
func CGOAudioDecodeWrite(opusData []byte, pcmBuffer []byte) (int, error) {
return cgoAudioDecodeWriteWithBuffers(opusData, pcmBuffer)
}
func CGOUpdateOpusEncoderParams(bitrate, complexity, vbr, vbrConstraint, signalType, bandwidth, dtx int) error {
return updateOpusEncoderParams(bitrate, complexity, vbr, vbrConstraint, signalType, bandwidth, dtx)
}

View File

@ -501,36 +501,26 @@ func (ais *AudioInputServer) processMessage(msg *InputIPCMessage) error {
// processOpusFrame processes an Opus audio frame
func (ais *AudioInputServer) processOpusFrame(data []byte) error {
if len(data) == 0 {
return nil // Empty frame, ignore
// Fast path: skip empty frame check - caller should handle this
dataLen := len(data)
if dataLen == 0 {
return nil
}
// Use ultra-fast validation for critical audio path
if err := ValidateAudioFrame(data); err != nil {
// Skip logging in hotpath to avoid overhead - validation errors are rare
return fmt.Errorf("input frame validation failed: %w", err)
// Inline validation for critical audio path - avoid function call overhead
if dataLen > cachedMaxFrameSize {
return ErrFrameDataTooLarge
}
// Get cached config for optimal performance
// Get cached config once - avoid repeated calls and locking
cache := GetCachedConfig()
// Only update cache if expired - avoid unnecessary overhead
// Use proper locking to avoid race condition
if cache.initialized.Load() {
cache.mutex.RLock()
cacheExpired := time.Since(cache.lastUpdate) > cache.cacheExpiry
cache.mutex.RUnlock()
if cacheExpired {
cache.Update()
}
} else {
cache.Update()
}
// Skip cache expiry check in hotpath - background updates handle this
// Get a PCM buffer from the pool for optimized decode-write
pcmBuffer := GetBufferFromPool(cache.GetMaxPCMBufferSize())
defer ReturnBufferToPool(pcmBuffer)
// Process the Opus frame using optimized CGO implementation with separate buffers
// Direct CGO call - avoid wrapper function overhead
_, err := CGOAudioDecodeWrite(data, pcmBuffer)
return err
}
@ -720,25 +710,20 @@ func (aic *AudioInputClient) SendFrame(frame []byte) error {
return fmt.Errorf("not connected to audio input server")
}
if len(frame) == 0 {
frameLen := len(frame)
if frameLen == 0 {
return nil // Empty frame, ignore
}
// Validate frame data before sending
if err := ValidateAudioFrame(frame); err != nil {
logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
logger.Error().Err(err).Msg("Frame validation failed")
return fmt.Errorf("input frame validation failed: %w", err)
}
if len(frame) > maxFrameSize {
return fmt.Errorf("frame too large: got %d bytes, maximum allowed %d bytes", len(frame), maxFrameSize)
// Inline frame validation to reduce function call overhead
if frameLen > maxFrameSize {
return ErrFrameDataTooLarge
}
msg := &InputIPCMessage{
Magic: inputMagicNumber,
Type: InputMessageTypeOpusFrame,
Length: uint32(len(frame)),
Length: uint32(frameLen),
Timestamp: time.Now().UnixNano(),
Data: frame,
}
@ -755,26 +740,25 @@ func (aic *AudioInputClient) SendFrameZeroCopy(frame *ZeroCopyAudioFrame) error
return fmt.Errorf("not connected to audio input server")
}
if frame == nil || frame.Length() == 0 {
if frame == nil {
return nil // Nil frame, ignore
}
frameLen := frame.Length()
if frameLen == 0 {
return nil // Empty frame, ignore
}
// Validate zero-copy frame before sending
if err := ValidateZeroCopyFrame(frame); err != nil {
logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
logger.Error().Err(err).Msg("Zero-copy frame validation failed")
return fmt.Errorf("input frame validation failed: %w", err)
}
if frame.Length() > maxFrameSize {
return fmt.Errorf("frame too large: got %d bytes, maximum allowed %d bytes", frame.Length(), maxFrameSize)
// Inline frame validation to reduce function call overhead
if frameLen > maxFrameSize {
return ErrFrameDataTooLarge
}
// Use zero-copy data directly
msg := &InputIPCMessage{
Magic: inputMagicNumber,
Type: InputMessageTypeOpusFrame,
Length: uint32(frame.Length()),
Length: uint32(frameLen),
Timestamp: time.Now().UnixNano(),
Data: frame.Data(), // Zero-copy data access
}
@ -945,10 +929,7 @@ func (ais *AudioInputServer) startReaderGoroutine() {
consecutiveErrors++
lastErrorTime = now
// Log error with context
logger.Warn().Err(err).
Int("consecutive_errors", consecutiveErrors).
Msg("Failed to read message from input connection")
// Skip logging in hotpath for performance - only log critical errors
// Progressive backoff based on error count
if consecutiveErrors > 1 {
@ -1019,16 +1000,12 @@ func (ais *AudioInputServer) startProcessorGoroutine() {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
// Set high priority for audio processing
logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
if err := SetAudioThreadPriority(); err != nil {
logger.Warn().Err(err).Msg("Failed to set audio processing priority")
}
defer func() {
if err := ResetThreadPriority(); err != nil {
logger.Warn().Err(err).Msg("Failed to reset thread priority")
}
}()
// Set high priority for audio processing - skip logging in hotpath
_ = SetAudioThreadPriority()
defer func() { _ = ResetThreadPriority() }()
// Create logger for this goroutine
logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger()
// Enhanced error tracking for processing
var processingErrors int
@ -1057,17 +1034,10 @@ func (ais *AudioInputServer) startProcessorGoroutine() {
processingErrors++
lastProcessingError = now
logger.Warn().Err(err).
Int("processing_errors", processingErrors).
Dur("processing_time", processingTime).
Msg("Failed to process input message")
// Skip logging in hotpath for performance
// If too many processing errors, drop frames more aggressively
if processingErrors >= maxProcessingErrors {
logger.Error().
Int("processing_errors", processingErrors).
Msg("Too many processing errors, entering aggressive drop mode")
// Clear processing queue to recover
for len(ais.processChan) > 0 {
select {
@ -1085,7 +1055,7 @@ func (ais *AudioInputServer) startProcessorGoroutine() {
// Reset error counter on successful processing
if processingErrors > 0 {
processingErrors = 0
logger.Info().Msg("Input processing recovered")
// Skip logging in hotpath for performance
}
// Update processing time metrics

View File

@ -426,44 +426,36 @@ func InitValidationCache() {
// This is the primary validation function used in all audio processing paths
//
// Performance optimizations:
// - Uses AudioConfigCache to eliminate GetConfig() call overhead
// - Uses cached max frame size to eliminate config lookups
// - Single branch condition for optimal CPU pipeline efficiency
// - Inlined length checks for minimal overhead
// - Pre-allocated error messages for minimal allocations
// - Minimal error allocation overhead
//
//go:inline
func ValidateAudioFrame(data []byte) error {
// Fast path: empty check first to avoid unnecessary cache access
// Fast path: check length against cached max size in single operation
dataLen := len(data)
if dataLen == 0 {
return ErrFrameDataEmpty
}
// Get cached config - this is a pointer access, not a function call
cache := GetCachedConfig()
// Use atomic access to maxAudioFrameSize for lock-free validation
maxSize := int(cache.maxAudioFrameSize.Load())
// If cache not initialized or value is zero, use global cached value or update
// Use global cached value for fastest access - updated during initialization
maxSize := cachedMaxFrameSize
if maxSize == 0 {
if cachedMaxFrameSize > 0 {
maxSize = cachedMaxFrameSize
} else {
// Fallback: get from cache only if global cache not initialized
cache := GetCachedConfig()
maxSize = int(cache.maxAudioFrameSize.Load())
if maxSize == 0 {
// Last resort: update cache and get fresh value
cache.Update()
maxSize = int(cache.maxAudioFrameSize.Load())
if maxSize == 0 {
// Fallback to global config if cache still not initialized
maxSize = GetConfig().MaxAudioFrameSize
}
}
// Cache the value globally for next calls
cachedMaxFrameSize = maxSize
}
// Optimized validation with error message
// Single comparison for validation
if dataLen > maxSize {
// Use formatted error since we can't guarantee pre-allocated error is available
return fmt.Errorf("%w: frame size %d exceeds maximum %d bytes",
ErrFrameDataTooLarge, dataLen, maxSize)
return ErrFrameDataTooLarge
}
return nil
}