diff --git a/internal/audio/audio.go b/internal/audio/audio.go index 2912049a..3ed023dc 100644 --- a/internal/audio/audio.go +++ b/internal/audio/audio.go @@ -355,19 +355,30 @@ func GetAudioMetrics() AudioMetrics { } } -// RecordFrameReceived increments the frames received counter +// RecordFrameReceived increments the frames received counter with simplified tracking func RecordFrameReceived(bytes int) { + // Direct atomic updates to avoid sampling complexity in critical path atomic.AddInt64(&metrics.FramesReceived, 1) atomic.AddInt64(&metrics.BytesProcessed, int64(bytes)) + + // Always update timestamp for accurate last frame tracking metrics.LastFrameTime = time.Now() } -// RecordFrameDropped increments the frames dropped counter +// RecordFrameDropped increments the frames dropped counter with simplified tracking func RecordFrameDropped() { + // Direct atomic update to avoid sampling complexity in critical path atomic.AddInt64(&metrics.FramesDropped, 1) } -// RecordConnectionDrop increments the connection drops counter +// RecordConnectionDrop increments the connection drops counter with simplified tracking func RecordConnectionDrop() { + // Direct atomic update to avoid sampling complexity in critical path atomic.AddInt64(&metrics.ConnectionDrops, 1) } + +// FlushPendingMetrics is now a no-op since we use direct atomic updates +func FlushPendingMetrics() { + // No-op: metrics are now updated directly without local buffering + // This function is kept for API compatibility +} diff --git a/internal/audio/base_manager.go b/internal/audio/base_manager.go index 909c31d4..f5838087 100644 --- a/internal/audio/base_manager.go +++ b/internal/audio/base_manager.go @@ -23,6 +23,7 @@ type BaseAudioMetrics struct { // BaseAudioManager provides common functionality for audio managers type BaseAudioManager struct { + // Core metrics and state metrics BaseAudioMetrics logger zerolog.Logger running int32 @@ -58,6 +59,12 @@ func (bam *BaseAudioManager) resetMetrics() { bam.metrics.AverageLatency = 0 } +// flushPendingMetrics is now a no-op since we use direct atomic updates +func (bam *BaseAudioManager) flushPendingMetrics() { + // No-op: metrics are now updated directly without local buffering + // This function is kept for API compatibility +} + // getBaseMetrics returns a copy of the base metrics func (bam *BaseAudioManager) getBaseMetrics() BaseAudioMetrics { return BaseAudioMetrics{ @@ -70,15 +77,19 @@ func (bam *BaseAudioManager) getBaseMetrics() BaseAudioMetrics { } } -// recordFrameProcessed records a processed frame +// recordFrameProcessed records a processed frame with simplified tracking func (bam *BaseAudioManager) recordFrameProcessed(bytes int) { + // Direct atomic updates to avoid sampling complexity in critical path atomic.AddInt64(&bam.metrics.FramesProcessed, 1) atomic.AddInt64(&bam.metrics.BytesProcessed, int64(bytes)) + + // Always update timestamp for accurate last frame tracking bam.metrics.LastFrameTime = time.Now() } -// recordFrameDropped records a dropped frame +// recordFrameDropped records a dropped frame with simplified tracking func (bam *BaseAudioManager) recordFrameDropped() { + // Direct atomic update to avoid sampling complexity in critical path atomic.AddInt64(&bam.metrics.FramesDropped, 1) } diff --git a/internal/audio/batch_audio.go b/internal/audio/batch_audio.go index 50dbde1f..d6f5a860 100644 --- a/internal/audio/batch_audio.go +++ b/internal/audio/batch_audio.go @@ -173,14 +173,20 @@ func (bap *BatchAudioProcessor) BatchReadEncode(buffer []byte) (int, error) { // Validate buffer before processing if err := ValidateBufferSize(len(buffer)); err != nil { - bap.logger.Debug().Err(err).Msg("invalid buffer for batch processing") + // Only log validation errors in debug mode to reduce overhead + if bap.logger.GetLevel() <= zerolog.DebugLevel { + bap.logger.Debug().Err(err).Msg("invalid buffer for batch processing") + } return 0, err } if !bap.IsRunning() { // Fallback to single operation if batch processor is not running - atomic.AddInt64(&bap.stats.SingleReads, 1) - atomic.AddInt64(&bap.stats.SingleFrames, 1) + // Use sampling to reduce atomic operations overhead + if atomic.LoadInt64(&bap.stats.SingleReads)%10 == 0 { + atomic.AddInt64(&bap.stats.SingleReads, 10) + atomic.AddInt64(&bap.stats.SingleFrames, 10) + } return CGOAudioReadEncode(buffer) } @@ -197,8 +203,11 @@ func (bap *BatchAudioProcessor) BatchReadEncode(buffer []byte) (int, error) { // Successfully queued default: // Queue is full, fallback to single operation - atomic.AddInt64(&bap.stats.SingleReads, 1) - atomic.AddInt64(&bap.stats.SingleFrames, 1) + // Use sampling to reduce atomic operations overhead + if atomic.LoadInt64(&bap.stats.SingleReads)%10 == 0 { + atomic.AddInt64(&bap.stats.SingleReads, 10) + atomic.AddInt64(&bap.stats.SingleFrames, 10) + } return CGOAudioReadEncode(buffer) } @@ -208,8 +217,11 @@ func (bap *BatchAudioProcessor) BatchReadEncode(buffer []byte) (int, error) { return result.length, result.err case <-time.After(cache.BatchProcessingTimeout): // Timeout, fallback to single operation - atomic.AddInt64(&bap.stats.SingleReads, 1) - atomic.AddInt64(&bap.stats.SingleFrames, 1) + // Use sampling to reduce atomic operations overhead + if atomic.LoadInt64(&bap.stats.SingleReads)%10 == 0 { + atomic.AddInt64(&bap.stats.SingleReads, 10) + atomic.AddInt64(&bap.stats.SingleFrames, 10) + } return CGOAudioReadEncode(buffer) } } @@ -223,14 +235,20 @@ func (bap *BatchAudioProcessor) BatchDecodeWrite(buffer []byte) (int, error) { // Validate buffer before processing if err := ValidateBufferSize(len(buffer)); err != nil { - bap.logger.Debug().Err(err).Msg("invalid buffer for batch processing") + // Only log validation errors in debug mode to reduce overhead + if bap.logger.GetLevel() <= zerolog.DebugLevel { + bap.logger.Debug().Err(err).Msg("invalid buffer for batch processing") + } return 0, err } if !bap.IsRunning() { // Fallback to single operation if batch processor is not running - atomic.AddInt64(&bap.stats.SingleWrites, 1) - atomic.AddInt64(&bap.stats.WriteFrames, 1) + // Use sampling to reduce atomic operations overhead + if atomic.LoadInt64(&bap.stats.SingleWrites)%10 == 0 { + atomic.AddInt64(&bap.stats.SingleWrites, 10) + atomic.AddInt64(&bap.stats.WriteFrames, 10) + } return CGOAudioDecodeWriteLegacy(buffer) } @@ -247,8 +265,11 @@ func (bap *BatchAudioProcessor) BatchDecodeWrite(buffer []byte) (int, error) { // Successfully queued default: // Queue is full, fall back to single operation - atomic.AddInt64(&bap.stats.SingleWrites, 1) - atomic.AddInt64(&bap.stats.WriteFrames, 1) + // Use sampling to reduce atomic operations overhead + if atomic.LoadInt64(&bap.stats.SingleWrites)%10 == 0 { + atomic.AddInt64(&bap.stats.SingleWrites, 10) + atomic.AddInt64(&bap.stats.WriteFrames, 10) + } return CGOAudioDecodeWriteLegacy(buffer) } @@ -257,8 +278,11 @@ func (bap *BatchAudioProcessor) BatchDecodeWrite(buffer []byte) (int, error) { case result := <-resultChan: return result.length, result.err case <-time.After(cache.BatchProcessingTimeout): - atomic.AddInt64(&bap.stats.SingleWrites, 1) - atomic.AddInt64(&bap.stats.WriteFrames, 1) + // Use sampling to reduce atomic operations overhead + if atomic.LoadInt64(&bap.stats.SingleWrites)%10 == 0 { + atomic.AddInt64(&bap.stats.SingleWrites, 10) + atomic.AddInt64(&bap.stats.WriteFrames, 10) + } return CGOAudioDecodeWriteLegacy(buffer) } } diff --git a/internal/audio/buffer_pool.go b/internal/audio/buffer_pool.go index 8573a399..e04e876b 100644 --- a/internal/audio/buffer_pool.go +++ b/internal/audio/buffer_pool.go @@ -367,6 +367,7 @@ func (p *AudioBufferPool) Get() []byte { bufPtr := (*unsafe.Pointer)(unsafe.Pointer(&cache.buffers[i])) buf := (*[]byte)(atomic.LoadPointer(bufPtr)) if buf != nil && atomic.CompareAndSwapPointer(bufPtr, unsafe.Pointer(buf), nil) { + // Direct hit count update to avoid sampling complexity in critical path atomic.AddInt64(&p.hitCount, 1) *buf = (*buf)[:0] return *buf @@ -383,6 +384,7 @@ func (p *AudioBufferPool) Get() []byte { buf := p.preallocated[lastIdx] p.preallocated = p.preallocated[:lastIdx] p.mutex.Unlock() + // Direct hit count update to avoid sampling complexity in critical path atomic.AddInt64(&p.hitCount, 1) *buf = (*buf)[:0] return *buf @@ -392,6 +394,7 @@ func (p *AudioBufferPool) Get() []byte { // Try sync.Pool next if poolBuf := p.pool.Get(); poolBuf != nil { buf := poolBuf.(*[]byte) + // Direct hit count update to avoid sampling complexity in critical path atomic.AddInt64(&p.hitCount, 1) atomic.AddInt64(&p.currentSize, -1) // Fast capacity check - most buffers should be correct size @@ -403,6 +406,7 @@ func (p *AudioBufferPool) Get() []byte { } // Pool miss - allocate new buffer with exact capacity + // Direct miss count update to avoid sampling complexity in critical path atomic.AddInt64(&p.missCount, 1) return make([]byte, 0, p.bufferSize) } diff --git a/internal/audio/cgo_audio.go b/internal/audio/cgo_audio.go index 04bcdde8..ad89d634 100644 --- a/internal/audio/cgo_audio.go +++ b/internal/audio/cgo_audio.go @@ -712,7 +712,17 @@ func cgoAudioClose() { // AudioConfigCache provides a comprehensive caching system for audio configuration // to minimize GetConfig() calls in the hot path type AudioConfigCache struct { - // Atomic fields for lock-free access to frequently used values + // Atomic int64 fields MUST be first for ARM32 alignment (8-byte alignment required) + minFrameDuration atomic.Int64 // Store as nanoseconds + maxFrameDuration atomic.Int64 // Store as nanoseconds + maxLatency atomic.Int64 // Store as nanoseconds + minMetricsUpdateInterval atomic.Int64 // Store as nanoseconds + maxMetricsUpdateInterval atomic.Int64 // Store as nanoseconds + restartWindow atomic.Int64 // Store as nanoseconds + restartDelay atomic.Int64 // Store as nanoseconds + maxRestartDelay atomic.Int64 // Store as nanoseconds + + // Atomic int32 fields for lock-free access to frequently used values minReadEncodeBuffer atomic.Int32 maxDecodeWriteBuffer atomic.Int32 maxPacketSize atomic.Int32 @@ -731,11 +741,15 @@ type AudioConfigCache struct { // Additional cached values for validation functions maxAudioFrameSize atomic.Int32 maxChannels atomic.Int32 - minFrameDuration atomic.Int64 // Store as nanoseconds - maxFrameDuration atomic.Int64 // Store as nanoseconds minOpusBitrate atomic.Int32 maxOpusBitrate atomic.Int32 + // Socket and buffer configuration values + socketMaxBuffer atomic.Int32 + socketMinBuffer atomic.Int32 + inputProcessingTimeoutMS atomic.Int32 + maxRestartAttempts atomic.Int32 + // Batch processing related values BatchProcessingTimeout time.Duration BatchProcessorFramesPerBatch int diff --git a/internal/audio/input.go b/internal/audio/input.go index 6ef31d9f..90575f82 100644 --- a/internal/audio/input.go +++ b/internal/audio/input.go @@ -65,6 +65,9 @@ func (aim *AudioInputManager) Stop() { aim.logComponentStop(AudioInputManagerComponent) + // Flush any pending sampled metrics before stopping + aim.flushPendingMetrics() + // Stop the IPC-based audio input aim.ipcManager.Stop() diff --git a/internal/audio/input_ipc.go b/internal/audio/input_ipc.go index e04fe573..8b1a84a0 100644 --- a/internal/audio/input_ipc.go +++ b/internal/audio/input_ipc.go @@ -942,9 +942,12 @@ func (ais *AudioInputServer) startReaderGoroutine() { // If too many consecutive errors, close connection to force reconnect if consecutiveErrors >= maxConsecutiveErrors { - logger.Error(). - Int("consecutive_errors", consecutiveErrors). - Msg("Too many consecutive read errors, closing connection") + // Only log critical errors to reduce hotpath overhead + if logger.GetLevel() <= zerolog.ErrorLevel { + logger.Error(). + Int("consecutive_errors", consecutiveErrors). + Msg("Too many consecutive read errors, closing connection") + } ais.mtx.Lock() if ais.conn != nil { @@ -961,7 +964,10 @@ func (ais *AudioInputServer) startReaderGoroutine() { // Reset error counter on successful read if consecutiveErrors > 0 { consecutiveErrors = 0 - logger.Info().Msg("Input connection recovered") + // Only log recovery info if debug level enabled to reduce overhead + if logger.GetLevel() <= zerolog.InfoLevel { + logger.Info().Msg("Input connection recovered") + } } // Send to message channel with non-blocking write @@ -971,7 +977,11 @@ func (ais *AudioInputServer) startReaderGoroutine() { default: // Channel full, drop message atomic.AddInt64(&ais.droppedFrames, 1) - logger.Warn().Msg("Message channel full, dropping frame") + // Avoid sampling logic in critical path - only log if warn level enabled + if logger.GetLevel() <= zerolog.WarnLevel { + droppedCount := atomic.LoadInt64(&ais.droppedFrames) + logger.Warn().Int64("total_dropped", droppedCount).Msg("Message channel full, dropping frame") + } } } else { // No connection, wait briefly before checking again @@ -985,7 +995,10 @@ func (ais *AudioInputServer) startReaderGoroutine() { logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger() if !SubmitAudioReaderTask(readerTask) { // If the pool is full or shutting down, fall back to direct goroutine creation - logger.Warn().Msg("Audio reader pool full or shutting down, falling back to direct goroutine creation") + // Only log if warn level enabled - avoid sampling logic in critical path + if logger.GetLevel() <= zerolog.WarnLevel { + logger.Warn().Msg("Audio reader pool full or shutting down, falling back to direct goroutine creation") + } go readerTask() } diff --git a/internal/audio/output_streaming.go b/internal/audio/output_streaming.go index 88a57e36..32880875 100644 --- a/internal/audio/output_streaming.go +++ b/internal/audio/output_streaming.go @@ -18,11 +18,17 @@ import ( // AudioOutputStreamer manages high-performance audio output streaming type AudioOutputStreamer struct { - // Performance metrics (atomic operations for thread safety) + // Atomic int64 fields MUST be first for ARM32 alignment (8-byte alignment required) processedFrames int64 // Total processed frames counter (atomic) droppedFrames int64 // Dropped frames counter (atomic) processingTime int64 // Average processing time in nanoseconds (atomic) lastStatsTime int64 // Last statistics update time (atomic) + frameCounter int64 // Local counter for sampling + localProcessed int64 // Local processed frame accumulator + localDropped int64 // Local dropped frame accumulator + + // Other fields after atomic int64 fields + sampleRate int32 // Sample every N frames (default: 10) client *AudioOutputClient bufferPool *AudioBufferPool @@ -70,6 +76,7 @@ func NewAudioOutputStreamer() (*AudioOutputStreamer, error) { processingChan: make(chan []byte, GetConfig().ChannelBufferSize), // Large buffer for smooth processing statsInterval: GetConfig().StatsUpdateInterval, // Statistics interval from config lastStatsTime: time.Now().UnixNano(), + sampleRate: 10, // Update metrics every 10 frames to reduce atomic ops }, nil } @@ -108,6 +115,9 @@ func (s *AudioOutputStreamer) Stop() { s.running = false s.cancel() + // Flush any pending sampled metrics before stopping + s.flushPendingMetrics() + // Close processing channel to signal goroutines (only if not already closed) if !s.chanClosed { close(s.processingChan) @@ -194,11 +204,17 @@ func (s *AudioOutputStreamer) processingLoop() { // Set high priority for audio output processing if err := SetAudioThreadPriority(); err != nil { - getOutputStreamingLogger().Warn().Err(err).Msg("Failed to set audio output processing priority") + // Only log priority warnings if warn level enabled to reduce overhead + if getOutputStreamingLogger().GetLevel() <= zerolog.WarnLevel { + getOutputStreamingLogger().Warn().Err(err).Msg("Failed to set audio output processing priority") + } } defer func() { if err := ResetThreadPriority(); err != nil { - getOutputStreamingLogger().Warn().Err(err).Msg("Failed to reset thread priority") + // Only log priority warnings if warn level enabled to reduce overhead + if getOutputStreamingLogger().GetLevel() <= zerolog.WarnLevel { + getOutputStreamingLogger().Warn().Err(err).Msg("Failed to reset thread priority") + } } }() @@ -209,15 +225,23 @@ func (s *AudioOutputStreamer) processingLoop() { if _, err := s.client.ReceiveFrame(); err != nil { if s.client.IsConnected() { - getOutputStreamingLogger().Warn().Err(err).Msg("Error reading audio frame from output server") - atomic.AddInt64(&s.droppedFrames, 1) + // Sample logging to reduce overhead - log every 50th error + if atomic.LoadInt64(&s.droppedFrames)%50 == 0 && getOutputStreamingLogger().GetLevel() <= zerolog.WarnLevel { + getOutputStreamingLogger().Warn().Err(err).Msg("Error reading audio frame from output server") + } + s.recordFrameDropped() } // Try to reconnect if disconnected if !s.client.IsConnected() { if err := s.client.Connect(); err != nil { - getOutputStreamingLogger().Warn().Err(err).Msg("Failed to reconnect") + // Only log reconnection failures if warn level enabled + if getOutputStreamingLogger().GetLevel() <= zerolog.WarnLevel { + getOutputStreamingLogger().Warn().Err(err).Msg("Failed to reconnect") + } } } + } else { + s.recordFrameProcessed() } }() } @@ -258,8 +282,51 @@ func (s *AudioOutputStreamer) reportStatistics() { } } -// GetStats returns streaming statistics +// recordFrameProcessed records a processed frame with sampling optimization +func (s *AudioOutputStreamer) recordFrameProcessed() { + // Increment local counters + frameCount := atomic.AddInt64(&s.frameCounter, 1) + atomic.AddInt64(&s.localProcessed, 1) + + // Update metrics only every N frames to reduce atomic operation overhead + if frameCount%int64(atomic.LoadInt32(&s.sampleRate)) == 0 { + // Batch update atomic metrics + localProcessed := atomic.SwapInt64(&s.localProcessed, 0) + atomic.AddInt64(&s.processedFrames, localProcessed) + } +} + +// recordFrameDropped records a dropped frame with sampling optimization +func (s *AudioOutputStreamer) recordFrameDropped() { + // Increment local counter + localDropped := atomic.AddInt64(&s.localDropped, 1) + + // Update atomic metrics every N dropped frames + if localDropped%int64(atomic.LoadInt32(&s.sampleRate)) == 0 { + atomic.AddInt64(&s.droppedFrames, int64(atomic.LoadInt32(&s.sampleRate))) + atomic.StoreInt64(&s.localDropped, 0) + } +} + +// flushPendingMetrics flushes any pending sampled metrics to atomic counters +func (s *AudioOutputStreamer) flushPendingMetrics() { + // Flush remaining processed and dropped frames + localProcessed := atomic.SwapInt64(&s.localProcessed, 0) + localDropped := atomic.SwapInt64(&s.localDropped, 0) + + if localProcessed > 0 { + atomic.AddInt64(&s.processedFrames, localProcessed) + } + if localDropped > 0 { + atomic.AddInt64(&s.droppedFrames, localDropped) + } +} + +// GetStats returns streaming statistics with pending metrics flushed func (s *AudioOutputStreamer) GetStats() (processed, dropped int64, avgProcessingTime time.Duration) { + // Flush pending metrics for accurate reading + s.flushPendingMetrics() + processed = atomic.LoadInt64(&s.processedFrames) dropped = atomic.LoadInt64(&s.droppedFrames) processingTimeNs := atomic.LoadInt64(&s.processingTime) @@ -269,6 +336,9 @@ func (s *AudioOutputStreamer) GetStats() (processed, dropped int64, avgProcessin // GetDetailedStats returns comprehensive streaming statistics func (s *AudioOutputStreamer) GetDetailedStats() map[string]interface{} { + // Flush pending metrics for accurate reading + s.flushPendingMetrics() + processed := atomic.LoadInt64(&s.processedFrames) dropped := atomic.LoadInt64(&s.droppedFrames) processingTime := atomic.LoadInt64(&s.processingTime) diff --git a/internal/audio/supervisor.go b/internal/audio/supervisor.go index 2d96a7c0..775d4586 100644 --- a/internal/audio/supervisor.go +++ b/internal/audio/supervisor.go @@ -11,6 +11,8 @@ import ( "sync/atomic" "syscall" "time" + + "github.com/rs/zerolog" ) // Restart configuration is now retrieved from centralized config @@ -180,16 +182,25 @@ func (s *AudioOutputSupervisor) supervisionLoop() { default: // Start or restart the process if err := s.startProcess(); err != nil { - s.logger.Error().Err(err).Msg("failed to start audio server process") + // Only log start errors if error level enabled to reduce overhead + if s.logger.GetLevel() <= zerolog.ErrorLevel { + s.logger.Error().Err(err).Msg("failed to start audio server process") + } // Check if we should attempt restart if !s.shouldRestart() { - s.logger.Error().Msg("maximum restart attempts exceeded, stopping supervisor") + // Only log critical errors to reduce overhead + if s.logger.GetLevel() <= zerolog.ErrorLevel { + s.logger.Error().Msg("maximum restart attempts exceeded, stopping supervisor") + } return } delay := s.calculateRestartDelay() - s.logger.Warn().Dur("delay", delay).Msg("retrying process start after delay") + // Sample logging to reduce overhead - log every 5th restart attempt + if len(s.restartAttempts)%5 == 0 && s.logger.GetLevel() <= zerolog.WarnLevel { + s.logger.Warn().Dur("delay", delay).Int("attempt", len(s.restartAttempts)).Msg("retrying process start after delay") + } if s.onRestart != nil { s.onRestart(len(s.restartAttempts), delay) diff --git a/internal/audio/validation.go b/internal/audio/validation.go index 1bfbfbcf..278352f1 100644 --- a/internal/audio/validation.go +++ b/internal/audio/validation.go @@ -41,6 +41,7 @@ func ValidateAudioQuality(quality AudioQuality) error { } // ValidateZeroCopyFrame validates zero-copy audio frame +// Optimized to use cached max frame size func ValidateZeroCopyFrame(frame *ZeroCopyAudioFrame) error { if frame == nil { return ErrInvalidFrameData @@ -49,8 +50,22 @@ func ValidateZeroCopyFrame(frame *ZeroCopyAudioFrame) error { if len(data) == 0 { return ErrInvalidFrameData } - // Use config value - maxFrameSize := GetConfig().MaxAudioFrameSize + + // Fast path: use cached max frame size + maxFrameSize := cachedMaxFrameSize + if maxFrameSize == 0 { + // Fallback: get from cache + cache := GetCachedConfig() + maxFrameSize = int(cache.maxAudioFrameSize.Load()) + if maxFrameSize == 0 { + // Last resort: update cache + cache.Update() + maxFrameSize = int(cache.maxAudioFrameSize.Load()) + } + // Cache globally for next calls + cachedMaxFrameSize = maxFrameSize + } + if len(data) > maxFrameSize { return ErrInvalidFrameSize } @@ -95,10 +110,31 @@ func ValidateThreadPriority(priority int) error { } // ValidateLatency validates latency duration values with reasonable bounds +// Optimized to use AudioConfigCache for frequently accessed values func ValidateLatency(latency time.Duration) error { if latency < 0 { return fmt.Errorf("%w: latency %v cannot be negative", ErrInvalidLatency, latency) } + + // Fast path: check against cached max latency + cache := GetCachedConfig() + maxLatency := time.Duration(cache.maxLatency.Load()) + + // If we have a valid cached value, use it + if maxLatency > 0 { + minLatency := time.Millisecond // Minimum reasonable latency + if latency > 0 && latency < minLatency { + return fmt.Errorf("%w: latency %v below minimum %v", + ErrInvalidLatency, latency, minLatency) + } + if latency > maxLatency { + return fmt.Errorf("%w: latency %v exceeds maximum %v", + ErrInvalidLatency, latency, maxLatency) + } + return nil + } + + // Slower path: full validation with GetConfig() config := GetConfig() minLatency := time.Millisecond // Minimum reasonable latency if latency > 0 && latency < minLatency { @@ -113,11 +149,30 @@ func ValidateLatency(latency time.Duration) error { } // ValidateMetricsInterval validates metrics update interval +// Optimized to use AudioConfigCache for frequently accessed values func ValidateMetricsInterval(interval time.Duration) error { - // Use config values + // Fast path: check against cached values + cache := GetCachedConfig() + minInterval := time.Duration(cache.minMetricsUpdateInterval.Load()) + maxInterval := time.Duration(cache.maxMetricsUpdateInterval.Load()) + + // If we have valid cached values, use them + if minInterval > 0 && maxInterval > 0 { + if interval < minInterval { + return fmt.Errorf("%w: interval %v below minimum %v", + ErrInvalidMetricsInterval, interval, minInterval) + } + if interval > maxInterval { + return fmt.Errorf("%w: interval %v exceeds maximum %v", + ErrInvalidMetricsInterval, interval, maxInterval) + } + return nil + } + + // Slower path: full validation with GetConfig() config := GetConfig() - minInterval := config.MinMetricsUpdateInterval - maxInterval := config.MaxMetricsUpdateInterval + minInterval = config.MinMetricsUpdateInterval + maxInterval = config.MaxMetricsUpdateInterval if interval < minInterval { return ErrInvalidMetricsInterval } @@ -254,12 +309,18 @@ func ValidateChannelCount(channels int) error { return nil } - // Check against max channels - still using cache to avoid GetConfig() - // Note: We don't have maxChannels in the cache yet, so we'll use GetConfig() for now - config := GetConfig() - if channels > config.MaxChannels { + // Fast path: Check against cached max channels + cachedMaxChannels := int(cache.maxChannels.Load()) + if cachedMaxChannels > 0 && channels <= cachedMaxChannels { + return nil + } + + // Slow path: Update cache and validate + cache.Update() + updatedMaxChannels := int(cache.maxChannels.Load()) + if channels > updatedMaxChannels { return fmt.Errorf("%w: channel count %d exceeds maximum %d", - ErrInvalidChannels, channels, config.MaxChannels) + ErrInvalidChannels, channels, updatedMaxChannels) } return nil } @@ -331,15 +392,34 @@ func ValidateFrameDuration(duration time.Duration) error { } } - // Slower path: full validation against min/max - config := GetConfig() - if duration < config.MinFrameDuration { - return fmt.Errorf("%w: frame duration %v below minimum %v", - ErrInvalidFrameDuration, duration, config.MinFrameDuration) + // Fast path: Check against cached min/max frame duration + cachedMinDuration := time.Duration(cache.minFrameDuration.Load()) + cachedMaxDuration := time.Duration(cache.maxFrameDuration.Load()) + + if cachedMinDuration > 0 && cachedMaxDuration > 0 { + if duration < cachedMinDuration { + return fmt.Errorf("%w: frame duration %v below minimum %v", + ErrInvalidFrameDuration, duration, cachedMinDuration) + } + if duration > cachedMaxDuration { + return fmt.Errorf("%w: frame duration %v exceeds maximum %v", + ErrInvalidFrameDuration, duration, cachedMaxDuration) + } + return nil } - if duration > config.MaxFrameDuration { + + // Slow path: Update cache and validate + cache.Update() + updatedMinDuration := time.Duration(cache.minFrameDuration.Load()) + updatedMaxDuration := time.Duration(cache.maxFrameDuration.Load()) + + if duration < updatedMinDuration { + return fmt.Errorf("%w: frame duration %v below minimum %v", + ErrInvalidFrameDuration, duration, updatedMinDuration) + } + if duration > updatedMaxDuration { return fmt.Errorf("%w: frame duration %v exceeds maximum %v", - ErrInvalidFrameDuration, duration, config.MaxFrameDuration) + ErrInvalidFrameDuration, duration, updatedMaxDuration) } return nil } diff --git a/internal/audio/zero_copy.go b/internal/audio/zero_copy.go index a5e2012f..67c1d02a 100644 --- a/internal/audio/zero_copy.go +++ b/internal/audio/zero_copy.go @@ -142,12 +142,13 @@ func NewZeroCopyFramePool(maxFrameSize int) *ZeroCopyFramePool { // Get retrieves a zero-copy frame from the pool func (p *ZeroCopyFramePool) Get() *ZeroCopyAudioFrame { - start := time.Now() + // Remove metrics overhead in critical path - use sampling instead var wasHit bool - defer func() { - latency := time.Since(start) - GetGranularMetricsCollector().RecordZeroCopyGet(latency, wasHit) - }() + var startTime time.Time + trackMetrics := atomic.LoadInt64(&p.counter)%100 == 0 // Sample 1% of operations + if trackMetrics { + startTime = time.Now() + } // Memory guard: Track allocation count to prevent excessive memory usage allocationCount := atomic.LoadInt64(&p.allocationCount) @@ -161,6 +162,12 @@ func (p *ZeroCopyFramePool) Get() *ZeroCopyAudioFrame { frame.length = 0 frame.data = frame.data[:0] frame.mutex.Unlock() + + // Record metrics only for sampled operations + if trackMetrics { + latency := time.Since(startTime) + GetGranularMetricsCollector().RecordZeroCopyGet(latency, wasHit) + } return frame } @@ -179,6 +186,12 @@ func (p *ZeroCopyFramePool) Get() *ZeroCopyAudioFrame { frame.mutex.Unlock() atomic.AddInt64(&p.hitCount, 1) + + // Record metrics only for sampled operations + if trackMetrics { + latency := time.Since(startTime) + GetGranularMetricsCollector().RecordZeroCopyGet(latency, wasHit) + } return frame } p.mutex.Unlock() @@ -194,16 +207,24 @@ func (p *ZeroCopyFramePool) Get() *ZeroCopyAudioFrame { wasHit = true // Pool hit atomic.AddInt64(&p.hitCount, 1) + + // Record metrics only for sampled operations + if trackMetrics { + latency := time.Since(startTime) + GetGranularMetricsCollector().RecordZeroCopyGet(latency, wasHit) + } return frame } // Put returns a zero-copy frame to the pool func (p *ZeroCopyFramePool) Put(frame *ZeroCopyAudioFrame) { - start := time.Now() - defer func() { - latency := time.Since(start) - GetGranularMetricsCollector().RecordZeroCopyPut(latency, frame.capacity) - }() + // Remove metrics overhead in critical path - use sampling instead + var startTime time.Time + trackMetrics := atomic.LoadInt64(&p.counter)%100 == 0 // Sample 1% of operations + if trackMetrics { + startTime = time.Now() + } + if frame == nil || !frame.pooled { return } @@ -240,6 +261,12 @@ func (p *ZeroCopyFramePool) Put(frame *ZeroCopyAudioFrame) { } else { frame.mutex.Unlock() } + + // Record metrics only for sampled operations + if trackMetrics { + latency := time.Since(startTime) + GetGranularMetricsCollector().RecordZeroCopyPut(latency, frame.capacity) + } } // Data returns the frame data as a slice (zero-copy view)