From 25686601497a036f3376e105bba0abe5dae2c358 Mon Sep 17 00:00:00 2001 From: Alex P Date: Wed, 3 Sep 2025 20:18:07 +0000 Subject: [PATCH] feat(metrics): add configurable metrics collection and performance optimizations - Add config flags to enable/disable metrics collection, goroutine monitoring, and latency profiling - Optimize batch processor with configurable queue sizes and thread pinning thresholds - Skip metrics operations when disabled to reduce overhead - Update default config with performance-related settings --- internal/audio/adaptive_buffer.go | 12 ++- internal/audio/audio.go | 18 +++++ internal/audio/base_manager.go | 18 +++++ internal/audio/batch_audio.go | 23 ++++-- internal/audio/cgo_audio.go | 36 +++++++-- internal/audio/config_constants.go | 114 +++++++++++++++++++++++++++- internal/audio/events.go | 6 ++ internal/audio/goroutine_monitor.go | 5 ++ internal/audio/input.go | 10 ++- internal/audio/latency_profiler.go | 17 ++++- internal/audio/output_streaming.go | 18 +++++ internal/audio/zero_copy.go | 28 +++++-- 12 files changed, 280 insertions(+), 25 deletions(-) diff --git a/internal/audio/adaptive_buffer.go b/internal/audio/adaptive_buffer.go index 584aa505..3ffb61e6 100644 --- a/internal/audio/adaptive_buffer.go +++ b/internal/audio/adaptive_buffer.go @@ -152,6 +152,11 @@ func (abm *AdaptiveBufferManager) GetOutputBufferSize() int { // UpdateLatency updates the current latency measurement func (abm *AdaptiveBufferManager) UpdateLatency(latency time.Duration) { + cachedConfig := GetCachedConfig() + if !cachedConfig.GetEnableMetricsCollection() { + return + } + // Use exponential moving average for latency currentAvg := atomic.LoadInt64(&abm.averageLatency) newLatency := latency.Nanoseconds() @@ -235,8 +240,11 @@ func (abm *AdaptiveBufferManager) adaptBufferSizes() { systemCPU := totalCPU // Total CPU across all monitored processes systemMemory := totalMemory / float64(processCount) // Average memory usage - atomic.StoreInt64(&abm.systemCPUPercent, int64(systemCPU*100)) - atomic.StoreInt64(&abm.systemMemoryPercent, int64(systemMemory*100)) + cachedConfig := GetCachedConfig() + if cachedConfig.GetEnableMetricsCollection() { + atomic.StoreInt64(&abm.systemCPUPercent, int64(systemCPU*100)) + atomic.StoreInt64(&abm.systemMemoryPercent, int64(systemMemory*100)) + } // Get current latency currentLatencyNs := atomic.LoadInt64(&abm.averageLatency) diff --git a/internal/audio/audio.go b/internal/audio/audio.go index 444ab312..2b8936a5 100644 --- a/internal/audio/audio.go +++ b/internal/audio/audio.go @@ -374,6 +374,12 @@ const ( // RecordFrameReceived increments the frames received counter with batched updates func RecordFrameReceived(bytes int) { + // Check if metrics collection is enabled + cachedConfig := GetCachedConfig() + if !cachedConfig.GetEnableMetricsCollection() { + return + } + // Use local batching to reduce atomic operations frequency atomic.AddInt64(&batchedFramesReceived, 1) atomic.AddInt64(&batchedBytesProcessed, int64(bytes)) @@ -396,6 +402,12 @@ func RecordFrameReceived(bytes int) { // RecordFrameDropped increments the frames dropped counter with batched updates func RecordFrameDropped() { + // Check if metrics collection is enabled + cachedConfig := GetCachedConfig() + if !cachedConfig.GetEnableMetricsCollection() { + return + } + // Use local batching to reduce atomic operations frequency atomic.AddInt64(&batchedFramesDropped, 1) @@ -407,6 +419,12 @@ func RecordFrameDropped() { // RecordConnectionDrop increments the connection drops counter with batched updates func RecordConnectionDrop() { + // Check if metrics collection is enabled + cachedConfig := GetCachedConfig() + if !cachedConfig.GetEnableMetricsCollection() { + return + } + // Use local batching to reduce atomic operations frequency atomic.AddInt64(&batchedConnectionDrops, 1) diff --git a/internal/audio/base_manager.go b/internal/audio/base_manager.go index f5838087..e0a6033c 100644 --- a/internal/audio/base_manager.go +++ b/internal/audio/base_manager.go @@ -79,6 +79,12 @@ func (bam *BaseAudioManager) getBaseMetrics() BaseAudioMetrics { // recordFrameProcessed records a processed frame with simplified tracking func (bam *BaseAudioManager) recordFrameProcessed(bytes int) { + // Check if metrics collection is enabled + cachedConfig := GetCachedConfig() + if !cachedConfig.GetEnableMetricsCollection() { + return + } + // Direct atomic updates to avoid sampling complexity in critical path atomic.AddInt64(&bam.metrics.FramesProcessed, 1) atomic.AddInt64(&bam.metrics.BytesProcessed, int64(bytes)) @@ -89,12 +95,24 @@ func (bam *BaseAudioManager) recordFrameProcessed(bytes int) { // recordFrameDropped records a dropped frame with simplified tracking func (bam *BaseAudioManager) recordFrameDropped() { + // Check if metrics collection is enabled + cachedConfig := GetCachedConfig() + if !cachedConfig.GetEnableMetricsCollection() { + return + } + // Direct atomic update to avoid sampling complexity in critical path atomic.AddInt64(&bam.metrics.FramesDropped, 1) } // updateLatency updates the average latency func (bam *BaseAudioManager) updateLatency(latency time.Duration) { + // Check if metrics collection is enabled + cachedConfig := GetCachedConfig() + if !cachedConfig.GetEnableMetricsCollection() { + return + } + // Simple moving average - could be enhanced with more sophisticated algorithms currentAvg := bam.metrics.AverageLatency if currentAvg == 0 { diff --git a/internal/audio/batch_audio.go b/internal/audio/batch_audio.go index b74c45b5..6b063ea4 100644 --- a/internal/audio/batch_audio.go +++ b/internal/audio/batch_audio.go @@ -94,6 +94,12 @@ func NewBatchAudioProcessor(batchSize int, batchDuration time.Duration) *BatchAu batchDuration = cache.BatchProcessingDelay } + // Use optimized queue sizes from configuration + queueSize := cache.BatchProcessorMaxQueueSize + if queueSize <= 0 { + queueSize = batchSize * 2 // Fallback to double batch size + } + ctx, cancel := context.WithCancel(context.Background()) // Pre-allocate logger to avoid repeated allocations logger := logging.GetDefaultLogger().With().Str("component", "batch-audio").Logger() @@ -110,8 +116,8 @@ func NewBatchAudioProcessor(batchSize int, batchDuration time.Duration) *BatchAu logger: &logger, batchSize: batchSize, batchDuration: batchDuration, - readQueue: make(chan batchReadRequest, batchSize*2), - writeQueue: make(chan batchWriteRequest, batchSize*2), + readQueue: make(chan batchReadRequest, queueSize), + writeQueue: make(chan batchWriteRequest, queueSize), readBufPool: &sync.Pool{ New: func() interface{} { // Use pre-calculated frame size to avoid GetConfig() calls @@ -422,12 +428,15 @@ func (bap *BatchAudioProcessor) processBatchRead(batch []batchReadRequest) { // Get cached config once - avoid repeated calls cache := GetCachedConfig() - minBatchSize := cache.MinBatchSizeForThreadPinning + threadPinningThreshold := cache.BatchProcessorThreadPinningThreshold + if threadPinningThreshold == 0 { + threadPinningThreshold = cache.MinBatchSizeForThreadPinning // Fallback + } // Only pin to OS thread for large batches to reduce thread contention var start time.Time threadWasPinned := false - if batchSize >= minBatchSize && atomic.CompareAndSwapInt32(&bap.threadPinned, 0, 1) { + if batchSize >= threadPinningThreshold && atomic.CompareAndSwapInt32(&bap.threadPinned, 0, 1) { start = time.Now() threadWasPinned = true runtime.LockOSThread() @@ -473,10 +482,14 @@ func (bap *BatchAudioProcessor) processBatchWrite(batch []batchWriteRequest) { // Get cached config to avoid GetConfig() calls in hot path cache := GetCachedConfig() + threadPinningThreshold := cache.BatchProcessorThreadPinningThreshold + if threadPinningThreshold == 0 { + threadPinningThreshold = cache.MinBatchSizeForThreadPinning // Fallback + } // Only pin to OS thread for large batches to reduce thread contention start := time.Now() - shouldPinThread := len(batch) >= cache.MinBatchSizeForThreadPinning + shouldPinThread := len(batch) >= threadPinningThreshold // Track if we pinned the thread in this call threadWasPinned := false diff --git a/internal/audio/cgo_audio.go b/internal/audio/cgo_audio.go index ad89d634..77d4396d 100644 --- a/internal/audio/cgo_audio.go +++ b/internal/audio/cgo_audio.go @@ -750,12 +750,19 @@ type AudioConfigCache struct { inputProcessingTimeoutMS atomic.Int32 maxRestartAttempts atomic.Int32 + // Performance flags for hot path optimization + enableMetricsCollection atomic.Bool + enableGoroutineMonitoring atomic.Bool + // Batch processing related values - BatchProcessingTimeout time.Duration - BatchProcessorFramesPerBatch int - BatchProcessorTimeout time.Duration - BatchProcessingDelay time.Duration - MinBatchSizeForThreadPinning int + BatchProcessingTimeout time.Duration + BatchProcessorFramesPerBatch int + BatchProcessorTimeout time.Duration + BatchProcessingDelay time.Duration + MinBatchSizeForThreadPinning int + BatchProcessorMaxQueueSize int + BatchProcessorAdaptiveThreshold float64 + BatchProcessorThreadPinningThreshold int // Mutex for updating the cache mutex sync.RWMutex @@ -822,12 +829,19 @@ func (c *AudioConfigCache) Update() { c.minOpusBitrate.Store(int32(config.MinOpusBitrate)) c.maxOpusBitrate.Store(int32(config.MaxOpusBitrate)) + // Update performance flags for hot path optimization + c.enableMetricsCollection.Store(config.EnableMetricsCollection) + c.enableGoroutineMonitoring.Store(config.EnableGoroutineMonitoring) + // Update batch processing related values c.BatchProcessingTimeout = 100 * time.Millisecond // Fixed timeout for batch processing c.BatchProcessorFramesPerBatch = config.BatchProcessorFramesPerBatch c.BatchProcessorTimeout = config.BatchProcessorTimeout c.BatchProcessingDelay = config.BatchProcessingDelay c.MinBatchSizeForThreadPinning = config.MinBatchSizeForThreadPinning + c.BatchProcessorMaxQueueSize = config.BatchProcessorMaxQueueSize + c.BatchProcessorAdaptiveThreshold = config.BatchProcessorAdaptiveThreshold + c.BatchProcessorThreadPinningThreshold = config.BatchProcessorThreadPinningThreshold // Pre-allocate common errors c.bufferTooSmallReadEncode = newBufferTooSmallError(0, config.MinReadEncodeBuffer) @@ -873,6 +887,18 @@ func (c *AudioConfigCache) GetBufferTooLargeError() error { return c.bufferTooLargeDecodeWrite } +// GetEnableMetricsCollection returns the cached EnableMetricsCollection flag for hot path optimization +func (c *AudioConfigCache) GetEnableMetricsCollection() bool { + c.Update() // Ensure cache is current + return c.enableMetricsCollection.Load() +} + +// GetEnableGoroutineMonitoring returns the cached EnableGoroutineMonitoring flag for hot path optimization +func (c *AudioConfigCache) GetEnableGoroutineMonitoring() bool { + c.Update() // Ensure cache is current + return c.enableGoroutineMonitoring.Load() +} + // Removed duplicate config caching system - using AudioConfigCache instead func cgoAudioReadEncode(buf []byte) (int, error) { diff --git a/internal/audio/config_constants.go b/internal/audio/config_constants.go index d44f801f..7ad56f4a 100644 --- a/internal/audio/config_constants.go +++ b/internal/audio/config_constants.go @@ -908,6 +908,23 @@ type AudioConfigConstants struct { // Default true enables pre-warming for optimal user experience EnableSubprocessPrewarming bool // Enable subprocess pre-warming (default: true) + // Performance Mode Configuration + // These flags control overhead-inducing features for production optimization + EnableMetricsCollection bool // Enable detailed metrics collection (default: true) + EnableLatencyProfiling bool // Enable latency profiling and detailed tracing (default: false) + EnableGoroutineMonitoring bool // Enable goroutine monitoring (default: false) + EnableBatchTimeTracking bool // Enable batch processing time tracking (default: false) + EnableDetailedLogging bool // Enable detailed debug logging (default: false) + + // Metrics Collection Optimization + MetricsFlushInterval int // Batched metrics flush interval (default: 10) + MetricsForceFlushNanos int64 // Force flush after nanoseconds (default: 100ms) + MetricsSamplingRate float64 // Sampling rate for metrics (0.0-1.0, default: 1.0) + + // Latency Profiling Optimization + LatencyProfilingSamplingRate float64 // Latency profiling sampling rate (default: 0.01 = 1%) + LatencyProfilingInterval time.Duration // Latency profiling report interval (default: 60s) + // Priority Scheduler Configuration - Settings for process priority management // Used in: priority_scheduler.go for system priority control // Impact: Controls valid range for process priority adjustments @@ -1212,6 +1229,24 @@ type AudioConfigConstants struct { // Default 5ms provides quick batch processing with reasonable timeout. BatchProcessorTimeout time.Duration + // BatchProcessorMaxQueueSize defines maximum queue size for batch operations. + // Used in: batch_audio.go for queue size control + // Impact: Larger queues reduce blocking but increase memory usage. + // Default 16 provides good balance between memory and performance. + BatchProcessorMaxQueueSize int + + // BatchProcessorAdaptiveThreshold defines threshold for adaptive batch sizing. + // Used in: batch_audio.go for dynamic batch size adjustment + // Impact: Lower thresholds enable more aggressive batching. + // Default 0.8 enables batching when 80% of queue is full. + BatchProcessorAdaptiveThreshold float64 + + // BatchProcessorThreadPinningThreshold defines minimum batch size for thread pinning. + // Used in: batch_audio.go for OS thread pinning optimization + // Impact: Higher thresholds reduce thread pinning overhead. + // Default 8 frames enables pinning for larger batches only. + BatchProcessorThreadPinningThreshold int + // Output Streaming Constants - Configuration for audio output streaming // Used in: output_streaming.go for output stream timing control // Impact: Controls output streaming frame rate and timing @@ -2431,8 +2466,11 @@ func DefaultAudioConfig() *AudioConfigConstants { ProcessMonitorTraditionalHz: 100.0, // 100.0 Hz traditional clock // Batch Processing Constants - BatchProcessorFramesPerBatch: 4, // 4 frames per batch - BatchProcessorTimeout: 5 * time.Millisecond, // 5ms timeout + BatchProcessorFramesPerBatch: 4, // 4 frames per batch + BatchProcessorTimeout: 5 * time.Millisecond, // 5ms timeout + BatchProcessorMaxQueueSize: 16, // 16 max queue size for balanced memory/performance + BatchProcessorAdaptiveThreshold: 0.8, // 0.8 threshold for adaptive batching (80% queue full) + BatchProcessorThreadPinningThreshold: 8, // 8 frames minimum for thread pinning optimization // Output Streaming Constants OutputStreamingFrameIntervalMS: 20, // 20ms frame interval (50 FPS) @@ -2534,6 +2572,78 @@ func DefaultAudioConfig() *AudioConfigConstants { // Goroutine Monitoring Configuration GoroutineMonitorInterval: 30 * time.Second, // 30s monitoring interval + + // Performance Configuration Flags - Production optimizations + // Used in: Production environments to reduce overhead and improve performance + // Impact: Controls which performance monitoring features are enabled + + // EnableMetricsCollection controls detailed metrics collection. + // Used in: metrics.go, granular_metrics.go for performance tracking + // Impact: When disabled, reduces atomic operations and memory overhead. + // Default true for development, should be false in production for optimal performance. + EnableMetricsCollection: true, // Enable detailed metrics collection (default: true) + + // EnableLatencyProfiling controls latency profiling and detailed tracing. + // Used in: latency_profiler.go for performance analysis + // Impact: When disabled, eliminates profiling overhead and reduces CPU usage. + // Default false to minimize overhead in production environments. + EnableLatencyProfiling: false, // Enable latency profiling and detailed tracing (default: false) + + // EnableGoroutineMonitoring controls goroutine monitoring. + // Used in: goroutine_monitor.go for tracking goroutine health + // Impact: When disabled, reduces monitoring overhead and CPU usage. + // Default false to minimize overhead in production environments. + EnableGoroutineMonitoring: false, // Enable goroutine monitoring (default: false) + + // EnableBatchTimeTracking controls batch processing time tracking. + // Used in: batch_audio.go for performance analysis + // Impact: When disabled, eliminates time tracking overhead. + // Default false to minimize overhead in production environments. + EnableBatchTimeTracking: false, // Enable batch processing time tracking (default: false) + + // EnableDetailedLogging controls detailed debug logging. + // Used in: Throughout audio system for debugging + // Impact: When disabled, reduces logging overhead and improves performance. + // Default false to minimize overhead in production environments. + EnableDetailedLogging: false, // Enable detailed debug logging (default: false) + + // Metrics Configuration - Batching and sampling for performance + // Used in: metrics.go for optimizing metrics collection overhead + // Impact: Controls how frequently metrics are updated and flushed + + // MetricsFlushInterval defines batched metrics flush interval. + // Used in: metrics.go for batching metrics updates + // Impact: Higher values reduce update frequency but increase memory usage. + // Default 10 provides good balance between performance and memory. + MetricsFlushInterval: 10, // Batched metrics flush interval (default: 10) + + // MetricsForceFlushNanos defines force flush after nanoseconds. + // Used in: metrics.go for ensuring metrics are not delayed too long + // Impact: Prevents metrics from being delayed indefinitely. + // Default 100ms ensures reasonable freshness while allowing batching. + MetricsForceFlushNanos: 100000000, // Force flush after nanoseconds (default: 100ms) + + // MetricsSamplingRate defines sampling rate for metrics. + // Used in: metrics.go for reducing metrics collection overhead + // Impact: Values < 1.0 reduce overhead by sampling only a fraction of events. + // Default 1.0 collects all metrics, set to 0.1 in production for 90% reduction. + MetricsSamplingRate: 1.0, // Sampling rate for metrics (0.0-1.0, default: 1.0) + + // Latency Profiling Configuration - Sampling for performance + // Used in: latency_profiler.go for optimizing profiling overhead + // Impact: Controls how frequently latency measurements are taken + + // LatencyProfilingSamplingRate defines latency profiling sampling rate. + // Used in: latency_profiler.go for reducing profiling overhead + // Impact: Values < 1.0 significantly reduce profiling overhead. + // Default 0.01 (1%) provides useful data with minimal overhead. + LatencyProfilingSamplingRate: 0.01, // Latency profiling sampling rate (default: 0.01 = 1%) + + // LatencyProfilingInterval defines latency profiling report interval. + // Used in: latency_profiler.go for controlling report frequency + // Impact: Longer intervals reduce reporting overhead. + // Default 60s provides reasonable reporting frequency. + LatencyProfilingInterval: 60 * time.Second, // Latency profiling report interval (default: 60s) } } diff --git a/internal/audio/events.go b/internal/audio/events.go index 1cf88623..76ba7e5a 100644 --- a/internal/audio/events.go +++ b/internal/audio/events.go @@ -370,6 +370,12 @@ func (aeb *AudioEventBroadcaster) startMetricsBroadcasting() { defer ticker.Stop() for range ticker.C { + // Skip metrics broadcasting if metrics collection is disabled + cachedConfig := GetCachedConfig() + if !cachedConfig.GetEnableMetricsCollection() { + continue + } + aeb.mutex.RLock() subscriberCount := len(aeb.subscribers) diff --git a/internal/audio/goroutine_monitor.go b/internal/audio/goroutine_monitor.go index 2c1d6d4d..0ed432ba 100644 --- a/internal/audio/goroutine_monitor.go +++ b/internal/audio/goroutine_monitor.go @@ -133,6 +133,11 @@ func GetGoroutineMonitor() *GoroutineMonitor { // StartGoroutineMonitoring starts the global goroutine monitor func StartGoroutineMonitoring() { + cachedConfig := GetCachedConfig() + if !cachedConfig.GetEnableGoroutineMonitoring() { + return + } + monitor := GetGoroutineMonitor() monitor.Start() } diff --git a/internal/audio/input.go b/internal/audio/input.go index 90575f82..d076a335 100644 --- a/internal/audio/input.go +++ b/internal/audio/input.go @@ -109,12 +109,18 @@ func (aim *AudioInputManager) WriteOpusFrame(frame []byte) error { } if err != nil { - atomic.AddInt64(&aim.metrics.FramesDropped, 1) + cachedConfig := GetCachedConfig() + if cachedConfig.GetEnableMetricsCollection() { + atomic.AddInt64(&aim.metrics.FramesDropped, 1) + } return err } // Update metrics - atomic.AddInt64(&aim.framesSent, 1) + cachedConfig := GetCachedConfig() + if cachedConfig.GetEnableMetricsCollection() { + atomic.AddInt64(&aim.framesSent, 1) + } aim.recordFrameProcessed(len(frame)) aim.updateLatency(processingTime) diff --git a/internal/audio/latency_profiler.go b/internal/audio/latency_profiler.go index f7235942..02b3c72a 100644 --- a/internal/audio/latency_profiler.go +++ b/internal/audio/latency_profiler.go @@ -127,14 +127,15 @@ var ( // DefaultLatencyProfilerConfig returns default profiler configuration func DefaultLatencyProfilerConfig() LatencyProfilerConfig { + config := GetConfig() return LatencyProfilerConfig{ MaxMeasurements: 10000, - SamplingRate: 0.1, // Profile 10% of frames to minimize overhead + SamplingRate: config.LatencyProfilingSamplingRate, // Use configurable sampling rate ReportingInterval: 30 * time.Second, ThresholdWarning: 50 * time.Millisecond, ThresholdCritical: 100 * time.Millisecond, - EnableDetailedTrace: false, // Disabled by default for performance - EnableHistogram: true, + EnableDetailedTrace: false, // Disabled by default for performance + EnableHistogram: config.EnableLatencyProfiling, // Only enable if profiling is enabled } } @@ -508,6 +509,10 @@ func GetGlobalLatencyProfiler() *LatencyProfiler { // EnableLatencyProfiling enables the global latency profiler func EnableLatencyProfiling() error { + config := GetConfig() + if !config.EnableLatencyProfiling { + return fmt.Errorf("latency profiling is disabled in configuration") + } profiler := GetGlobalLatencyProfiler() return profiler.Start() } @@ -523,6 +528,12 @@ func DisableLatencyProfiling() { // ProfileFrameLatency is a convenience function to profile a single frame's latency func ProfileFrameLatency(frameID uint64, frameSize int, source string, fn func(*FrameLatencyTracker)) { + config := GetConfig() + if !config.EnableLatencyProfiling { + fn(nil) + return + } + profiler := GetGlobalLatencyProfiler() if !profiler.IsEnabled() { fn(nil) diff --git a/internal/audio/output_streaming.go b/internal/audio/output_streaming.go index df2b4d3a..860f7891 100644 --- a/internal/audio/output_streaming.go +++ b/internal/audio/output_streaming.go @@ -295,6 +295,12 @@ func (s *AudioOutputStreamer) reportStatistics() { // recordFrameProcessed records a processed frame with sampling optimization func (s *AudioOutputStreamer) recordFrameProcessed() { + // Check if metrics collection is enabled + cachedConfig := GetCachedConfig() + if !cachedConfig.GetEnableMetricsCollection() { + return + } + // Increment local counters frameCount := atomic.AddInt64(&s.frameCounter, 1) atomic.AddInt64(&s.localProcessed, 1) @@ -309,6 +315,12 @@ func (s *AudioOutputStreamer) recordFrameProcessed() { // recordFrameDropped records a dropped frame with sampling optimization func (s *AudioOutputStreamer) recordFrameDropped() { + // Check if metrics collection is enabled + cachedConfig := GetCachedConfig() + if !cachedConfig.GetEnableMetricsCollection() { + return + } + // Increment local counter localDropped := atomic.AddInt64(&s.localDropped, 1) @@ -321,6 +333,12 @@ func (s *AudioOutputStreamer) recordFrameDropped() { // flushPendingMetrics flushes any pending sampled metrics to atomic counters func (s *AudioOutputStreamer) flushPendingMetrics() { + // Check if metrics collection is enabled + cachedConfig := GetCachedConfig() + if !cachedConfig.GetEnableMetricsCollection() { + return + } + // Flush remaining processed and dropped frames localProcessed := atomic.SwapInt64(&s.localProcessed, 0) localDropped := atomic.SwapInt64(&s.localDropped, 0) diff --git a/internal/audio/zero_copy.go b/internal/audio/zero_copy.go index 67c1d02a..9ff235a4 100644 --- a/internal/audio/zero_copy.go +++ b/internal/audio/zero_copy.go @@ -142,10 +142,14 @@ func NewZeroCopyFramePool(maxFrameSize int) *ZeroCopyFramePool { // Get retrieves a zero-copy frame from the pool func (p *ZeroCopyFramePool) Get() *ZeroCopyAudioFrame { + // Get cached config once for all metrics operations + cachedConfig := GetCachedConfig() + enableMetrics := cachedConfig.GetEnableMetricsCollection() + // Remove metrics overhead in critical path - use sampling instead var wasHit bool var startTime time.Time - trackMetrics := atomic.LoadInt64(&p.counter)%100 == 0 // Sample 1% of operations + trackMetrics := enableMetrics && atomic.LoadInt64(&p.counter)%100 == 0 // Sample 1% of operations if enabled if trackMetrics { startTime = time.Now() } @@ -154,7 +158,9 @@ func (p *ZeroCopyFramePool) Get() *ZeroCopyAudioFrame { allocationCount := atomic.LoadInt64(&p.allocationCount) if allocationCount > int64(p.maxPoolSize*2) { // If we've allocated too many frames, force pool reuse - atomic.AddInt64(&p.missCount, 1) + if enableMetrics { + atomic.AddInt64(&p.missCount, 1) + } wasHit = true // Pool reuse counts as hit frame := p.pool.Get().(*ZeroCopyAudioFrame) frame.mutex.Lock() @@ -185,7 +191,9 @@ func (p *ZeroCopyFramePool) Get() *ZeroCopyAudioFrame { frame.data = frame.data[:0] frame.mutex.Unlock() - atomic.AddInt64(&p.hitCount, 1) + if enableMetrics { + atomic.AddInt64(&p.hitCount, 1) + } // Record metrics only for sampled operations if trackMetrics { @@ -197,7 +205,9 @@ func (p *ZeroCopyFramePool) Get() *ZeroCopyAudioFrame { p.mutex.Unlock() // Try sync.Pool next and track allocation - atomic.AddInt64(&p.allocationCount, 1) + if enableMetrics { + atomic.AddInt64(&p.allocationCount, 1) + } frame := p.pool.Get().(*ZeroCopyAudioFrame) frame.mutex.Lock() frame.refCount = 1 @@ -218,9 +228,13 @@ func (p *ZeroCopyFramePool) Get() *ZeroCopyAudioFrame { // Put returns a zero-copy frame to the pool func (p *ZeroCopyFramePool) Put(frame *ZeroCopyAudioFrame) { + // Get cached config once for all metrics operations + cachedConfig := GetCachedConfig() + enableMetrics := cachedConfig.GetEnableMetricsCollection() + // Remove metrics overhead in critical path - use sampling instead var startTime time.Time - trackMetrics := atomic.LoadInt64(&p.counter)%100 == 0 // Sample 1% of operations + trackMetrics := enableMetrics && atomic.LoadInt64(&p.counter)%100 == 0 // Sample 1% of operations if enabled if trackMetrics { startTime = time.Now() } @@ -257,7 +271,9 @@ func (p *ZeroCopyFramePool) Put(frame *ZeroCopyAudioFrame) { // Return to sync.Pool p.pool.Put(frame) - atomic.AddInt64(&p.counter, 1) + if enableMetrics { + atomic.AddInt64(&p.counter, 1) + } } else { frame.mutex.Unlock() }