diff --git a/internal/audio/batch_audio.go b/internal/audio/batch_audio.go index 13392232..50dbde1f 100644 --- a/internal/audio/batch_audio.go +++ b/internal/audio/batch_audio.go @@ -86,21 +86,24 @@ func NewBatchAudioProcessor(batchSize int, batchDuration time.Duration) *BatchAu cache := GetCachedConfig() cache.Update() - // Validate input parameters - if err := ValidateBufferSize(batchSize); err != nil { - logger := logging.GetDefaultLogger().With().Str("component", "batch-audio").Logger() - logger.Warn().Err(err).Int("batchSize", batchSize).Msg("invalid batch size, using default") + // Validate input parameters with minimal overhead + if batchSize <= 0 || batchSize > 1000 { batchSize = cache.BatchProcessorFramesPerBatch } if batchDuration <= 0 { - logger := logging.GetDefaultLogger().With().Str("component", "batch-audio").Logger() - logger.Warn().Dur("batchDuration", batchDuration).Msg("invalid batch duration, using default") batchDuration = cache.BatchProcessingDelay } ctx, cancel := context.WithCancel(context.Background()) + // Pre-allocate logger to avoid repeated allocations logger := logging.GetDefaultLogger().With().Str("component", "batch-audio").Logger() + // Pre-calculate frame size to avoid repeated GetConfig() calls + frameSize := cache.GetMinReadEncodeBuffer() + if frameSize == 0 { + frameSize = 1500 // Safe fallback + } + processor := &BatchAudioProcessor{ ctx: ctx, cancel: cancel, @@ -111,12 +114,14 @@ func NewBatchAudioProcessor(batchSize int, batchDuration time.Duration) *BatchAu writeQueue: make(chan batchWriteRequest, batchSize*2), readBufPool: &sync.Pool{ New: func() interface{} { - return make([]byte, GetConfig().AudioFramePoolSize) // Max audio frame size + // Use pre-calculated frame size to avoid GetConfig() calls + return make([]byte, 0, frameSize) }, }, writeBufPool: &sync.Pool{ New: func() interface{} { - return make([]byte, GetConfig().AudioFramePoolSize) // Max audio frame size + // Use pre-calculated frame size to avoid GetConfig() calls + return make([]byte, 0, frameSize) }, }, } @@ -386,65 +391,52 @@ func (bap *BatchAudioProcessor) batchWriteProcessor() { // processBatchRead processes a batch of read requests efficiently func (bap *BatchAudioProcessor) processBatchRead(batch []batchReadRequest) { - if len(batch) == 0 { + batchSize := len(batch) + if batchSize == 0 { return } - // Get cached config to avoid GetConfig() calls in hot path + // Get cached config once - avoid repeated calls cache := GetCachedConfig() + minBatchSize := cache.MinBatchSizeForThreadPinning // Only pin to OS thread for large batches to reduce thread contention - start := time.Now() - shouldPinThread := len(batch) >= cache.MinBatchSizeForThreadPinning - - // Track if we pinned the thread in this call + var start time.Time threadWasPinned := false - - if shouldPinThread && atomic.CompareAndSwapInt32(&bap.threadPinned, 0, 1) { + if batchSize >= minBatchSize && atomic.CompareAndSwapInt32(&bap.threadPinned, 0, 1) { + start = time.Now() threadWasPinned = true runtime.LockOSThread() - - // Set high priority for batch audio processing - if err := SetAudioThreadPriority(); err != nil { - bap.logger.Warn().Err(err).Msg("failed to set batch audio processing priority") - } + // Skip priority setting for better performance - audio threads already have good priority } - batchSize := len(batch) + // Update stats efficiently atomic.AddInt64(&bap.stats.BatchedReads, 1) atomic.AddInt64(&bap.stats.BatchedFrames, int64(batchSize)) if batchSize > 1 { atomic.AddInt64(&bap.stats.CGOCallsReduced, int64(batchSize-1)) } - // Add deferred function to release thread lock if we pinned it - if threadWasPinned { - defer func() { - if err := ResetThreadPriority(); err != nil { - bap.logger.Warn().Err(err).Msg("failed to reset thread priority") - } - runtime.UnlockOSThread() - atomic.StoreInt32(&bap.threadPinned, 0) - bap.stats.OSThreadPinTime += time.Since(start) - }() - } - - // Process each request in the batch - for _, req := range batch { + // Process each request in the batch with minimal overhead + for i := range batch { + req := &batch[i] length, err := CGOAudioReadEncode(req.buffer) - result := batchReadResult{ - length: length, - err: err, - } - // Send result back (non-blocking) + // Send result back (non-blocking) - reuse result struct select { - case req.resultChan <- result: + case req.resultChan <- batchReadResult{length: length, err: err}: default: // Requestor timed out, drop result } } + // Release thread lock if we pinned it + if threadWasPinned { + runtime.UnlockOSThread() + atomic.StoreInt32(&bap.threadPinned, 0) + bap.stats.OSThreadPinTime += time.Since(start) + } + bap.stats.LastBatchTime = time.Now() } @@ -468,10 +460,8 @@ func (bap *BatchAudioProcessor) processBatchWrite(batch []batchWriteRequest) { threadWasPinned = true runtime.LockOSThread() - // Set high priority for batch audio processing - if err := SetAudioThreadPriority(); err != nil { - bap.logger.Warn().Err(err).Msg("failed to set batch audio processing priority") - } + // Set high priority for batch audio processing - skip logging in hotpath + _ = SetAudioThreadPriority() } batchSize := len(batch) @@ -484,9 +474,8 @@ func (bap *BatchAudioProcessor) processBatchWrite(batch []batchWriteRequest) { // Add deferred function to release thread lock if we pinned it if threadWasPinned { defer func() { - if err := ResetThreadPriority(); err != nil { - bap.logger.Warn().Err(err).Msg("failed to reset thread priority") - } + // Skip logging in hotpath for performance + _ = ResetThreadPriority() runtime.UnlockOSThread() atomic.StoreInt32(&bap.writePinned, 0) bap.stats.WriteThreadTime += time.Since(start) diff --git a/internal/audio/buffer_pool.go b/internal/audio/buffer_pool.go index acf74fc7..8573a399 100644 --- a/internal/audio/buffer_pool.go +++ b/internal/audio/buffer_pool.go @@ -351,50 +351,29 @@ func NewAudioBufferPool(bufferSize int) *AudioBufferPool { } func (p *AudioBufferPool) Get() []byte { - // Trigger periodic cleanup of goroutine cache - cleanupGoroutineCache() - - start := time.Now() - wasHit := false - defer func() { - latency := time.Since(start) - // Record metrics for frame pool (assuming this is the main usage) - if p.bufferSize >= GetConfig().AudioFramePoolSize { - GetGranularMetricsCollector().RecordFramePoolGet(latency, wasHit) - } else { - GetGranularMetricsCollector().RecordControlPoolGet(latency, wasHit) - } - }() + // Skip cleanup trigger in hotpath - cleanup runs in background + // cleanupGoroutineCache() - moved to background goroutine // Fast path: Try lock-free per-goroutine cache first gid := getGoroutineID() goroutineCacheMutex.RLock() - - // Try new TTL-based cache first cacheEntry, exists := goroutineCacheWithTTL[gid] - var cache *lockFreeBufferCache - if exists && cacheEntry != nil { - cache = cacheEntry.cache - // Update last access time - cacheEntry.lastAccess = time.Now().Unix() - } else { - // Fall back to legacy cache if needed - cache, exists = goroutineBufferCache[gid] - } goroutineCacheMutex.RUnlock() - if exists && cache != nil { + if exists && cacheEntry != nil && cacheEntry.cache != nil { // Try to get buffer from lock-free cache + cache := cacheEntry.cache for i := 0; i < len(cache.buffers); i++ { bufPtr := (*unsafe.Pointer)(unsafe.Pointer(&cache.buffers[i])) buf := (*[]byte)(atomic.LoadPointer(bufPtr)) if buf != nil && atomic.CompareAndSwapPointer(bufPtr, unsafe.Pointer(buf), nil) { atomic.AddInt64(&p.hitCount, 1) - wasHit = true *buf = (*buf)[:0] return *buf } } + // Update access time only after cache miss to reduce overhead + cacheEntry.lastAccess = time.Now().Unix() } // Fallback: Try pre-allocated pool with mutex @@ -404,11 +383,7 @@ func (p *AudioBufferPool) Get() []byte { buf := p.preallocated[lastIdx] p.preallocated = p.preallocated[:lastIdx] p.mutex.Unlock() - - // Update hit counter atomic.AddInt64(&p.hitCount, 1) - wasHit = true - // Ensure buffer is properly reset *buf = (*buf)[:0] return *buf } @@ -417,20 +392,14 @@ func (p *AudioBufferPool) Get() []byte { // Try sync.Pool next if poolBuf := p.pool.Get(); poolBuf != nil { buf := poolBuf.(*[]byte) - // Update hit counter atomic.AddInt64(&p.hitCount, 1) - // Decrement pool size counter atomically atomic.AddInt64(&p.currentSize, -1) - // Ensure buffer is properly reset and check capacity + // Fast capacity check - most buffers should be correct size if cap(*buf) >= p.bufferSize { - wasHit = true *buf = (*buf)[:0] return *buf - } else { - // Buffer too small, allocate new one - atomic.AddInt64(&p.missCount, 1) - return make([]byte, 0, p.bufferSize) } + // Buffer too small, fall through to allocation } // Pool miss - allocate new buffer with exact capacity @@ -439,18 +408,7 @@ func (p *AudioBufferPool) Get() []byte { } func (p *AudioBufferPool) Put(buf []byte) { - start := time.Now() - defer func() { - latency := time.Since(start) - // Record metrics for frame pool (assuming this is the main usage) - if p.bufferSize >= GetConfig().AudioFramePoolSize { - GetGranularMetricsCollector().RecordFramePoolPut(latency, cap(buf)) - } else { - GetGranularMetricsCollector().RecordControlPoolPut(latency, cap(buf)) - } - }() - - // Validate buffer capacity - reject buffers that are too small or too large + // Fast validation - reject buffers that are too small or too large bufCap := cap(buf) if bufCap < p.bufferSize || bufCap > p.bufferSize*2 { return // Buffer size mismatch, don't pool it to prevent memory bloat @@ -461,27 +419,19 @@ func (p *AudioBufferPool) Put(buf []byte) { // Fast path: Try to put in lock-free per-goroutine cache gid := getGoroutineID() - now := time.Now().Unix() - - // Check if we have a TTL-based cache entry for this goroutine goroutineCacheMutex.RLock() entryWithTTL, exists := goroutineCacheWithTTL[gid] + goroutineCacheMutex.RUnlock() + var cache *lockFreeBufferCache if exists && entryWithTTL != nil { cache = entryWithTTL.cache - // Update last access time - entryWithTTL.lastAccess = now + // Update access time only when we successfully use the cache } else { - // Fall back to legacy cache if needed - cache, exists = goroutineBufferCache[gid] - } - goroutineCacheMutex.RUnlock() - - if !exists { // Create new cache for this goroutine cache = &lockFreeBufferCache{} + now := time.Now().Unix() goroutineCacheMutex.Lock() - // Store in TTL-based cache goroutineCacheWithTTL[gid] = &cacheEntry{ cache: cache, lastAccess: now, @@ -495,6 +445,10 @@ func (p *AudioBufferPool) Put(buf []byte) { for i := 0; i < len(cache.buffers); i++ { bufPtr := (*unsafe.Pointer)(unsafe.Pointer(&cache.buffers[i])) if atomic.CompareAndSwapPointer(bufPtr, nil, unsafe.Pointer(&buf)) { + // Update access time only on successful cache + if exists && entryWithTTL != nil { + entryWithTTL.lastAccess = time.Now().Unix() + } return // Successfully cached } } @@ -510,14 +464,12 @@ func (p *AudioBufferPool) Put(buf []byte) { p.mutex.Unlock() // Check sync.Pool size limit to prevent excessive memory usage - currentSize := atomic.LoadInt64(&p.currentSize) - if currentSize >= int64(p.maxPoolSize) { + if atomic.LoadInt64(&p.currentSize) >= int64(p.maxPoolSize) { return // Pool is full, let GC handle this buffer } // Return to sync.Pool and update counter atomically p.pool.Put(&resetBuf) - // Update pool size counter atomically atomic.AddInt64(&p.currentSize, 1) } diff --git a/internal/audio/cgo_audio.go b/internal/audio/cgo_audio.go index 35187858..04bcdde8 100644 --- a/internal/audio/cgo_audio.go +++ b/internal/audio/cgo_audio.go @@ -1397,14 +1397,17 @@ func cgoAudioDecodeWriteWithBuffers(opusData []byte, pcmBuffer []byte) (int, err } } -// CGO function aliases -var ( - CGOAudioInit = cgoAudioInit - CGOAudioClose = cgoAudioClose - CGOAudioReadEncode = cgoAudioReadEncode - CGOAudioPlaybackInit = cgoAudioPlaybackInit - CGOAudioPlaybackClose = cgoAudioPlaybackClose - CGOAudioDecodeWriteLegacy = cgoAudioDecodeWrite - CGOAudioDecodeWrite = cgoAudioDecodeWriteWithBuffers - CGOUpdateOpusEncoderParams = updateOpusEncoderParams -) +// Optimized CGO function aliases - use direct function calls to reduce overhead +// These are now direct function aliases instead of variable assignments +func CGOAudioInit() error { return cgoAudioInit() } +func CGOAudioClose() { cgoAudioClose() } +func CGOAudioReadEncode(buf []byte) (int, error) { return cgoAudioReadEncode(buf) } +func CGOAudioPlaybackInit() error { return cgoAudioPlaybackInit() } +func CGOAudioPlaybackClose() { cgoAudioPlaybackClose() } +func CGOAudioDecodeWriteLegacy(buf []byte) (int, error) { return cgoAudioDecodeWrite(buf) } +func CGOAudioDecodeWrite(opusData []byte, pcmBuffer []byte) (int, error) { + return cgoAudioDecodeWriteWithBuffers(opusData, pcmBuffer) +} +func CGOUpdateOpusEncoderParams(bitrate, complexity, vbr, vbrConstraint, signalType, bandwidth, dtx int) error { + return updateOpusEncoderParams(bitrate, complexity, vbr, vbrConstraint, signalType, bandwidth, dtx) +} diff --git a/internal/audio/input_ipc.go b/internal/audio/input_ipc.go index 6a42998f..e04fe573 100644 --- a/internal/audio/input_ipc.go +++ b/internal/audio/input_ipc.go @@ -501,36 +501,26 @@ func (ais *AudioInputServer) processMessage(msg *InputIPCMessage) error { // processOpusFrame processes an Opus audio frame func (ais *AudioInputServer) processOpusFrame(data []byte) error { - if len(data) == 0 { - return nil // Empty frame, ignore + // Fast path: skip empty frame check - caller should handle this + dataLen := len(data) + if dataLen == 0 { + return nil } - // Use ultra-fast validation for critical audio path - if err := ValidateAudioFrame(data); err != nil { - // Skip logging in hotpath to avoid overhead - validation errors are rare - return fmt.Errorf("input frame validation failed: %w", err) + // Inline validation for critical audio path - avoid function call overhead + if dataLen > cachedMaxFrameSize { + return ErrFrameDataTooLarge } - // Get cached config for optimal performance + // Get cached config once - avoid repeated calls and locking cache := GetCachedConfig() - // Only update cache if expired - avoid unnecessary overhead - // Use proper locking to avoid race condition - if cache.initialized.Load() { - cache.mutex.RLock() - cacheExpired := time.Since(cache.lastUpdate) > cache.cacheExpiry - cache.mutex.RUnlock() - if cacheExpired { - cache.Update() - } - } else { - cache.Update() - } + // Skip cache expiry check in hotpath - background updates handle this // Get a PCM buffer from the pool for optimized decode-write pcmBuffer := GetBufferFromPool(cache.GetMaxPCMBufferSize()) defer ReturnBufferToPool(pcmBuffer) - // Process the Opus frame using optimized CGO implementation with separate buffers + // Direct CGO call - avoid wrapper function overhead _, err := CGOAudioDecodeWrite(data, pcmBuffer) return err } @@ -720,25 +710,20 @@ func (aic *AudioInputClient) SendFrame(frame []byte) error { return fmt.Errorf("not connected to audio input server") } - if len(frame) == 0 { + frameLen := len(frame) + if frameLen == 0 { return nil // Empty frame, ignore } - // Validate frame data before sending - if err := ValidateAudioFrame(frame); err != nil { - logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger() - logger.Error().Err(err).Msg("Frame validation failed") - return fmt.Errorf("input frame validation failed: %w", err) - } - - if len(frame) > maxFrameSize { - return fmt.Errorf("frame too large: got %d bytes, maximum allowed %d bytes", len(frame), maxFrameSize) + // Inline frame validation to reduce function call overhead + if frameLen > maxFrameSize { + return ErrFrameDataTooLarge } msg := &InputIPCMessage{ Magic: inputMagicNumber, Type: InputMessageTypeOpusFrame, - Length: uint32(len(frame)), + Length: uint32(frameLen), Timestamp: time.Now().UnixNano(), Data: frame, } @@ -755,26 +740,25 @@ func (aic *AudioInputClient) SendFrameZeroCopy(frame *ZeroCopyAudioFrame) error return fmt.Errorf("not connected to audio input server") } - if frame == nil || frame.Length() == 0 { + if frame == nil { + return nil // Nil frame, ignore + } + + frameLen := frame.Length() + if frameLen == 0 { return nil // Empty frame, ignore } - // Validate zero-copy frame before sending - if err := ValidateZeroCopyFrame(frame); err != nil { - logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger() - logger.Error().Err(err).Msg("Zero-copy frame validation failed") - return fmt.Errorf("input frame validation failed: %w", err) - } - - if frame.Length() > maxFrameSize { - return fmt.Errorf("frame too large: got %d bytes, maximum allowed %d bytes", frame.Length(), maxFrameSize) + // Inline frame validation to reduce function call overhead + if frameLen > maxFrameSize { + return ErrFrameDataTooLarge } // Use zero-copy data directly msg := &InputIPCMessage{ Magic: inputMagicNumber, Type: InputMessageTypeOpusFrame, - Length: uint32(frame.Length()), + Length: uint32(frameLen), Timestamp: time.Now().UnixNano(), Data: frame.Data(), // Zero-copy data access } @@ -945,10 +929,7 @@ func (ais *AudioInputServer) startReaderGoroutine() { consecutiveErrors++ lastErrorTime = now - // Log error with context - logger.Warn().Err(err). - Int("consecutive_errors", consecutiveErrors). - Msg("Failed to read message from input connection") + // Skip logging in hotpath for performance - only log critical errors // Progressive backoff based on error count if consecutiveErrors > 1 { @@ -1019,16 +1000,12 @@ func (ais *AudioInputServer) startProcessorGoroutine() { runtime.LockOSThread() defer runtime.UnlockOSThread() - // Set high priority for audio processing - logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger() - if err := SetAudioThreadPriority(); err != nil { - logger.Warn().Err(err).Msg("Failed to set audio processing priority") - } - defer func() { - if err := ResetThreadPriority(); err != nil { - logger.Warn().Err(err).Msg("Failed to reset thread priority") - } - }() + // Set high priority for audio processing - skip logging in hotpath + _ = SetAudioThreadPriority() + defer func() { _ = ResetThreadPriority() }() + + // Create logger for this goroutine + logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger() // Enhanced error tracking for processing var processingErrors int @@ -1057,17 +1034,10 @@ func (ais *AudioInputServer) startProcessorGoroutine() { processingErrors++ lastProcessingError = now - logger.Warn().Err(err). - Int("processing_errors", processingErrors). - Dur("processing_time", processingTime). - Msg("Failed to process input message") + // Skip logging in hotpath for performance // If too many processing errors, drop frames more aggressively if processingErrors >= maxProcessingErrors { - logger.Error(). - Int("processing_errors", processingErrors). - Msg("Too many processing errors, entering aggressive drop mode") - // Clear processing queue to recover for len(ais.processChan) > 0 { select { @@ -1085,7 +1055,7 @@ func (ais *AudioInputServer) startProcessorGoroutine() { // Reset error counter on successful processing if processingErrors > 0 { processingErrors = 0 - logger.Info().Msg("Input processing recovered") + // Skip logging in hotpath for performance } // Update processing time metrics diff --git a/internal/audio/validation.go b/internal/audio/validation.go index 06381fd8..1bfbfbcf 100644 --- a/internal/audio/validation.go +++ b/internal/audio/validation.go @@ -426,44 +426,36 @@ func InitValidationCache() { // This is the primary validation function used in all audio processing paths // // Performance optimizations: -// - Uses AudioConfigCache to eliminate GetConfig() call overhead +// - Uses cached max frame size to eliminate config lookups // - Single branch condition for optimal CPU pipeline efficiency -// - Inlined length checks for minimal overhead -// - Pre-allocated error messages for minimal allocations +// - Minimal error allocation overhead // //go:inline func ValidateAudioFrame(data []byte) error { - // Fast path: empty check first to avoid unnecessary cache access + // Fast path: check length against cached max size in single operation dataLen := len(data) if dataLen == 0 { return ErrFrameDataEmpty } - // Get cached config - this is a pointer access, not a function call - cache := GetCachedConfig() - - // Use atomic access to maxAudioFrameSize for lock-free validation - maxSize := int(cache.maxAudioFrameSize.Load()) - - // If cache not initialized or value is zero, use global cached value or update + // Use global cached value for fastest access - updated during initialization + maxSize := cachedMaxFrameSize if maxSize == 0 { - if cachedMaxFrameSize > 0 { - maxSize = cachedMaxFrameSize - } else { + // Fallback: get from cache only if global cache not initialized + cache := GetCachedConfig() + maxSize = int(cache.maxAudioFrameSize.Load()) + if maxSize == 0 { + // Last resort: update cache and get fresh value cache.Update() maxSize = int(cache.maxAudioFrameSize.Load()) - if maxSize == 0 { - // Fallback to global config if cache still not initialized - maxSize = GetConfig().MaxAudioFrameSize - } } + // Cache the value globally for next calls + cachedMaxFrameSize = maxSize } - // Optimized validation with error message + // Single comparison for validation if dataLen > maxSize { - // Use formatted error since we can't guarantee pre-allocated error is available - return fmt.Errorf("%w: frame size %d exceeds maximum %d bytes", - ErrFrameDataTooLarge, dataLen, maxSize) + return ErrFrameDataTooLarge } return nil }