mirror of https://github.com/jetkvm/kvm.git
feat(metrics): add configurable metrics collection and performance optimizations
- Add config flags to enable/disable metrics collection, goroutine monitoring, and latency profiling - Optimize batch processor with configurable queue sizes and thread pinning thresholds - Skip metrics operations when disabled to reduce overhead - Update default config with performance-related settings
This commit is contained in:
parent
ca365f1acd
commit
2568660149
|
@ -152,6 +152,11 @@ func (abm *AdaptiveBufferManager) GetOutputBufferSize() int {
|
||||||
|
|
||||||
// UpdateLatency updates the current latency measurement
|
// UpdateLatency updates the current latency measurement
|
||||||
func (abm *AdaptiveBufferManager) UpdateLatency(latency time.Duration) {
|
func (abm *AdaptiveBufferManager) UpdateLatency(latency time.Duration) {
|
||||||
|
cachedConfig := GetCachedConfig()
|
||||||
|
if !cachedConfig.GetEnableMetricsCollection() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Use exponential moving average for latency
|
// Use exponential moving average for latency
|
||||||
currentAvg := atomic.LoadInt64(&abm.averageLatency)
|
currentAvg := atomic.LoadInt64(&abm.averageLatency)
|
||||||
newLatency := latency.Nanoseconds()
|
newLatency := latency.Nanoseconds()
|
||||||
|
@ -235,8 +240,11 @@ func (abm *AdaptiveBufferManager) adaptBufferSizes() {
|
||||||
systemCPU := totalCPU // Total CPU across all monitored processes
|
systemCPU := totalCPU // Total CPU across all monitored processes
|
||||||
systemMemory := totalMemory / float64(processCount) // Average memory usage
|
systemMemory := totalMemory / float64(processCount) // Average memory usage
|
||||||
|
|
||||||
atomic.StoreInt64(&abm.systemCPUPercent, int64(systemCPU*100))
|
cachedConfig := GetCachedConfig()
|
||||||
atomic.StoreInt64(&abm.systemMemoryPercent, int64(systemMemory*100))
|
if cachedConfig.GetEnableMetricsCollection() {
|
||||||
|
atomic.StoreInt64(&abm.systemCPUPercent, int64(systemCPU*100))
|
||||||
|
atomic.StoreInt64(&abm.systemMemoryPercent, int64(systemMemory*100))
|
||||||
|
}
|
||||||
|
|
||||||
// Get current latency
|
// Get current latency
|
||||||
currentLatencyNs := atomic.LoadInt64(&abm.averageLatency)
|
currentLatencyNs := atomic.LoadInt64(&abm.averageLatency)
|
||||||
|
|
|
@ -374,6 +374,12 @@ const (
|
||||||
|
|
||||||
// RecordFrameReceived increments the frames received counter with batched updates
|
// RecordFrameReceived increments the frames received counter with batched updates
|
||||||
func RecordFrameReceived(bytes int) {
|
func RecordFrameReceived(bytes int) {
|
||||||
|
// Check if metrics collection is enabled
|
||||||
|
cachedConfig := GetCachedConfig()
|
||||||
|
if !cachedConfig.GetEnableMetricsCollection() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Use local batching to reduce atomic operations frequency
|
// Use local batching to reduce atomic operations frequency
|
||||||
atomic.AddInt64(&batchedFramesReceived, 1)
|
atomic.AddInt64(&batchedFramesReceived, 1)
|
||||||
atomic.AddInt64(&batchedBytesProcessed, int64(bytes))
|
atomic.AddInt64(&batchedBytesProcessed, int64(bytes))
|
||||||
|
@ -396,6 +402,12 @@ func RecordFrameReceived(bytes int) {
|
||||||
|
|
||||||
// RecordFrameDropped increments the frames dropped counter with batched updates
|
// RecordFrameDropped increments the frames dropped counter with batched updates
|
||||||
func RecordFrameDropped() {
|
func RecordFrameDropped() {
|
||||||
|
// Check if metrics collection is enabled
|
||||||
|
cachedConfig := GetCachedConfig()
|
||||||
|
if !cachedConfig.GetEnableMetricsCollection() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Use local batching to reduce atomic operations frequency
|
// Use local batching to reduce atomic operations frequency
|
||||||
atomic.AddInt64(&batchedFramesDropped, 1)
|
atomic.AddInt64(&batchedFramesDropped, 1)
|
||||||
|
|
||||||
|
@ -407,6 +419,12 @@ func RecordFrameDropped() {
|
||||||
|
|
||||||
// RecordConnectionDrop increments the connection drops counter with batched updates
|
// RecordConnectionDrop increments the connection drops counter with batched updates
|
||||||
func RecordConnectionDrop() {
|
func RecordConnectionDrop() {
|
||||||
|
// Check if metrics collection is enabled
|
||||||
|
cachedConfig := GetCachedConfig()
|
||||||
|
if !cachedConfig.GetEnableMetricsCollection() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Use local batching to reduce atomic operations frequency
|
// Use local batching to reduce atomic operations frequency
|
||||||
atomic.AddInt64(&batchedConnectionDrops, 1)
|
atomic.AddInt64(&batchedConnectionDrops, 1)
|
||||||
|
|
||||||
|
|
|
@ -79,6 +79,12 @@ func (bam *BaseAudioManager) getBaseMetrics() BaseAudioMetrics {
|
||||||
|
|
||||||
// recordFrameProcessed records a processed frame with simplified tracking
|
// recordFrameProcessed records a processed frame with simplified tracking
|
||||||
func (bam *BaseAudioManager) recordFrameProcessed(bytes int) {
|
func (bam *BaseAudioManager) recordFrameProcessed(bytes int) {
|
||||||
|
// Check if metrics collection is enabled
|
||||||
|
cachedConfig := GetCachedConfig()
|
||||||
|
if !cachedConfig.GetEnableMetricsCollection() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Direct atomic updates to avoid sampling complexity in critical path
|
// Direct atomic updates to avoid sampling complexity in critical path
|
||||||
atomic.AddInt64(&bam.metrics.FramesProcessed, 1)
|
atomic.AddInt64(&bam.metrics.FramesProcessed, 1)
|
||||||
atomic.AddInt64(&bam.metrics.BytesProcessed, int64(bytes))
|
atomic.AddInt64(&bam.metrics.BytesProcessed, int64(bytes))
|
||||||
|
@ -89,12 +95,24 @@ func (bam *BaseAudioManager) recordFrameProcessed(bytes int) {
|
||||||
|
|
||||||
// recordFrameDropped records a dropped frame with simplified tracking
|
// recordFrameDropped records a dropped frame with simplified tracking
|
||||||
func (bam *BaseAudioManager) recordFrameDropped() {
|
func (bam *BaseAudioManager) recordFrameDropped() {
|
||||||
|
// Check if metrics collection is enabled
|
||||||
|
cachedConfig := GetCachedConfig()
|
||||||
|
if !cachedConfig.GetEnableMetricsCollection() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Direct atomic update to avoid sampling complexity in critical path
|
// Direct atomic update to avoid sampling complexity in critical path
|
||||||
atomic.AddInt64(&bam.metrics.FramesDropped, 1)
|
atomic.AddInt64(&bam.metrics.FramesDropped, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
// updateLatency updates the average latency
|
// updateLatency updates the average latency
|
||||||
func (bam *BaseAudioManager) updateLatency(latency time.Duration) {
|
func (bam *BaseAudioManager) updateLatency(latency time.Duration) {
|
||||||
|
// Check if metrics collection is enabled
|
||||||
|
cachedConfig := GetCachedConfig()
|
||||||
|
if !cachedConfig.GetEnableMetricsCollection() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Simple moving average - could be enhanced with more sophisticated algorithms
|
// Simple moving average - could be enhanced with more sophisticated algorithms
|
||||||
currentAvg := bam.metrics.AverageLatency
|
currentAvg := bam.metrics.AverageLatency
|
||||||
if currentAvg == 0 {
|
if currentAvg == 0 {
|
||||||
|
|
|
@ -94,6 +94,12 @@ func NewBatchAudioProcessor(batchSize int, batchDuration time.Duration) *BatchAu
|
||||||
batchDuration = cache.BatchProcessingDelay
|
batchDuration = cache.BatchProcessingDelay
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Use optimized queue sizes from configuration
|
||||||
|
queueSize := cache.BatchProcessorMaxQueueSize
|
||||||
|
if queueSize <= 0 {
|
||||||
|
queueSize = batchSize * 2 // Fallback to double batch size
|
||||||
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
// Pre-allocate logger to avoid repeated allocations
|
// Pre-allocate logger to avoid repeated allocations
|
||||||
logger := logging.GetDefaultLogger().With().Str("component", "batch-audio").Logger()
|
logger := logging.GetDefaultLogger().With().Str("component", "batch-audio").Logger()
|
||||||
|
@ -110,8 +116,8 @@ func NewBatchAudioProcessor(batchSize int, batchDuration time.Duration) *BatchAu
|
||||||
logger: &logger,
|
logger: &logger,
|
||||||
batchSize: batchSize,
|
batchSize: batchSize,
|
||||||
batchDuration: batchDuration,
|
batchDuration: batchDuration,
|
||||||
readQueue: make(chan batchReadRequest, batchSize*2),
|
readQueue: make(chan batchReadRequest, queueSize),
|
||||||
writeQueue: make(chan batchWriteRequest, batchSize*2),
|
writeQueue: make(chan batchWriteRequest, queueSize),
|
||||||
readBufPool: &sync.Pool{
|
readBufPool: &sync.Pool{
|
||||||
New: func() interface{} {
|
New: func() interface{} {
|
||||||
// Use pre-calculated frame size to avoid GetConfig() calls
|
// Use pre-calculated frame size to avoid GetConfig() calls
|
||||||
|
@ -422,12 +428,15 @@ func (bap *BatchAudioProcessor) processBatchRead(batch []batchReadRequest) {
|
||||||
|
|
||||||
// Get cached config once - avoid repeated calls
|
// Get cached config once - avoid repeated calls
|
||||||
cache := GetCachedConfig()
|
cache := GetCachedConfig()
|
||||||
minBatchSize := cache.MinBatchSizeForThreadPinning
|
threadPinningThreshold := cache.BatchProcessorThreadPinningThreshold
|
||||||
|
if threadPinningThreshold == 0 {
|
||||||
|
threadPinningThreshold = cache.MinBatchSizeForThreadPinning // Fallback
|
||||||
|
}
|
||||||
|
|
||||||
// Only pin to OS thread for large batches to reduce thread contention
|
// Only pin to OS thread for large batches to reduce thread contention
|
||||||
var start time.Time
|
var start time.Time
|
||||||
threadWasPinned := false
|
threadWasPinned := false
|
||||||
if batchSize >= minBatchSize && atomic.CompareAndSwapInt32(&bap.threadPinned, 0, 1) {
|
if batchSize >= threadPinningThreshold && atomic.CompareAndSwapInt32(&bap.threadPinned, 0, 1) {
|
||||||
start = time.Now()
|
start = time.Now()
|
||||||
threadWasPinned = true
|
threadWasPinned = true
|
||||||
runtime.LockOSThread()
|
runtime.LockOSThread()
|
||||||
|
@ -473,10 +482,14 @@ func (bap *BatchAudioProcessor) processBatchWrite(batch []batchWriteRequest) {
|
||||||
|
|
||||||
// Get cached config to avoid GetConfig() calls in hot path
|
// Get cached config to avoid GetConfig() calls in hot path
|
||||||
cache := GetCachedConfig()
|
cache := GetCachedConfig()
|
||||||
|
threadPinningThreshold := cache.BatchProcessorThreadPinningThreshold
|
||||||
|
if threadPinningThreshold == 0 {
|
||||||
|
threadPinningThreshold = cache.MinBatchSizeForThreadPinning // Fallback
|
||||||
|
}
|
||||||
|
|
||||||
// Only pin to OS thread for large batches to reduce thread contention
|
// Only pin to OS thread for large batches to reduce thread contention
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
shouldPinThread := len(batch) >= cache.MinBatchSizeForThreadPinning
|
shouldPinThread := len(batch) >= threadPinningThreshold
|
||||||
|
|
||||||
// Track if we pinned the thread in this call
|
// Track if we pinned the thread in this call
|
||||||
threadWasPinned := false
|
threadWasPinned := false
|
||||||
|
|
|
@ -750,12 +750,19 @@ type AudioConfigCache struct {
|
||||||
inputProcessingTimeoutMS atomic.Int32
|
inputProcessingTimeoutMS atomic.Int32
|
||||||
maxRestartAttempts atomic.Int32
|
maxRestartAttempts atomic.Int32
|
||||||
|
|
||||||
|
// Performance flags for hot path optimization
|
||||||
|
enableMetricsCollection atomic.Bool
|
||||||
|
enableGoroutineMonitoring atomic.Bool
|
||||||
|
|
||||||
// Batch processing related values
|
// Batch processing related values
|
||||||
BatchProcessingTimeout time.Duration
|
BatchProcessingTimeout time.Duration
|
||||||
BatchProcessorFramesPerBatch int
|
BatchProcessorFramesPerBatch int
|
||||||
BatchProcessorTimeout time.Duration
|
BatchProcessorTimeout time.Duration
|
||||||
BatchProcessingDelay time.Duration
|
BatchProcessingDelay time.Duration
|
||||||
MinBatchSizeForThreadPinning int
|
MinBatchSizeForThreadPinning int
|
||||||
|
BatchProcessorMaxQueueSize int
|
||||||
|
BatchProcessorAdaptiveThreshold float64
|
||||||
|
BatchProcessorThreadPinningThreshold int
|
||||||
|
|
||||||
// Mutex for updating the cache
|
// Mutex for updating the cache
|
||||||
mutex sync.RWMutex
|
mutex sync.RWMutex
|
||||||
|
@ -822,12 +829,19 @@ func (c *AudioConfigCache) Update() {
|
||||||
c.minOpusBitrate.Store(int32(config.MinOpusBitrate))
|
c.minOpusBitrate.Store(int32(config.MinOpusBitrate))
|
||||||
c.maxOpusBitrate.Store(int32(config.MaxOpusBitrate))
|
c.maxOpusBitrate.Store(int32(config.MaxOpusBitrate))
|
||||||
|
|
||||||
|
// Update performance flags for hot path optimization
|
||||||
|
c.enableMetricsCollection.Store(config.EnableMetricsCollection)
|
||||||
|
c.enableGoroutineMonitoring.Store(config.EnableGoroutineMonitoring)
|
||||||
|
|
||||||
// Update batch processing related values
|
// Update batch processing related values
|
||||||
c.BatchProcessingTimeout = 100 * time.Millisecond // Fixed timeout for batch processing
|
c.BatchProcessingTimeout = 100 * time.Millisecond // Fixed timeout for batch processing
|
||||||
c.BatchProcessorFramesPerBatch = config.BatchProcessorFramesPerBatch
|
c.BatchProcessorFramesPerBatch = config.BatchProcessorFramesPerBatch
|
||||||
c.BatchProcessorTimeout = config.BatchProcessorTimeout
|
c.BatchProcessorTimeout = config.BatchProcessorTimeout
|
||||||
c.BatchProcessingDelay = config.BatchProcessingDelay
|
c.BatchProcessingDelay = config.BatchProcessingDelay
|
||||||
c.MinBatchSizeForThreadPinning = config.MinBatchSizeForThreadPinning
|
c.MinBatchSizeForThreadPinning = config.MinBatchSizeForThreadPinning
|
||||||
|
c.BatchProcessorMaxQueueSize = config.BatchProcessorMaxQueueSize
|
||||||
|
c.BatchProcessorAdaptiveThreshold = config.BatchProcessorAdaptiveThreshold
|
||||||
|
c.BatchProcessorThreadPinningThreshold = config.BatchProcessorThreadPinningThreshold
|
||||||
|
|
||||||
// Pre-allocate common errors
|
// Pre-allocate common errors
|
||||||
c.bufferTooSmallReadEncode = newBufferTooSmallError(0, config.MinReadEncodeBuffer)
|
c.bufferTooSmallReadEncode = newBufferTooSmallError(0, config.MinReadEncodeBuffer)
|
||||||
|
@ -873,6 +887,18 @@ func (c *AudioConfigCache) GetBufferTooLargeError() error {
|
||||||
return c.bufferTooLargeDecodeWrite
|
return c.bufferTooLargeDecodeWrite
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetEnableMetricsCollection returns the cached EnableMetricsCollection flag for hot path optimization
|
||||||
|
func (c *AudioConfigCache) GetEnableMetricsCollection() bool {
|
||||||
|
c.Update() // Ensure cache is current
|
||||||
|
return c.enableMetricsCollection.Load()
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetEnableGoroutineMonitoring returns the cached EnableGoroutineMonitoring flag for hot path optimization
|
||||||
|
func (c *AudioConfigCache) GetEnableGoroutineMonitoring() bool {
|
||||||
|
c.Update() // Ensure cache is current
|
||||||
|
return c.enableGoroutineMonitoring.Load()
|
||||||
|
}
|
||||||
|
|
||||||
// Removed duplicate config caching system - using AudioConfigCache instead
|
// Removed duplicate config caching system - using AudioConfigCache instead
|
||||||
|
|
||||||
func cgoAudioReadEncode(buf []byte) (int, error) {
|
func cgoAudioReadEncode(buf []byte) (int, error) {
|
||||||
|
|
|
@ -908,6 +908,23 @@ type AudioConfigConstants struct {
|
||||||
// Default true enables pre-warming for optimal user experience
|
// Default true enables pre-warming for optimal user experience
|
||||||
EnableSubprocessPrewarming bool // Enable subprocess pre-warming (default: true)
|
EnableSubprocessPrewarming bool // Enable subprocess pre-warming (default: true)
|
||||||
|
|
||||||
|
// Performance Mode Configuration
|
||||||
|
// These flags control overhead-inducing features for production optimization
|
||||||
|
EnableMetricsCollection bool // Enable detailed metrics collection (default: true)
|
||||||
|
EnableLatencyProfiling bool // Enable latency profiling and detailed tracing (default: false)
|
||||||
|
EnableGoroutineMonitoring bool // Enable goroutine monitoring (default: false)
|
||||||
|
EnableBatchTimeTracking bool // Enable batch processing time tracking (default: false)
|
||||||
|
EnableDetailedLogging bool // Enable detailed debug logging (default: false)
|
||||||
|
|
||||||
|
// Metrics Collection Optimization
|
||||||
|
MetricsFlushInterval int // Batched metrics flush interval (default: 10)
|
||||||
|
MetricsForceFlushNanos int64 // Force flush after nanoseconds (default: 100ms)
|
||||||
|
MetricsSamplingRate float64 // Sampling rate for metrics (0.0-1.0, default: 1.0)
|
||||||
|
|
||||||
|
// Latency Profiling Optimization
|
||||||
|
LatencyProfilingSamplingRate float64 // Latency profiling sampling rate (default: 0.01 = 1%)
|
||||||
|
LatencyProfilingInterval time.Duration // Latency profiling report interval (default: 60s)
|
||||||
|
|
||||||
// Priority Scheduler Configuration - Settings for process priority management
|
// Priority Scheduler Configuration - Settings for process priority management
|
||||||
// Used in: priority_scheduler.go for system priority control
|
// Used in: priority_scheduler.go for system priority control
|
||||||
// Impact: Controls valid range for process priority adjustments
|
// Impact: Controls valid range for process priority adjustments
|
||||||
|
@ -1212,6 +1229,24 @@ type AudioConfigConstants struct {
|
||||||
// Default 5ms provides quick batch processing with reasonable timeout.
|
// Default 5ms provides quick batch processing with reasonable timeout.
|
||||||
BatchProcessorTimeout time.Duration
|
BatchProcessorTimeout time.Duration
|
||||||
|
|
||||||
|
// BatchProcessorMaxQueueSize defines maximum queue size for batch operations.
|
||||||
|
// Used in: batch_audio.go for queue size control
|
||||||
|
// Impact: Larger queues reduce blocking but increase memory usage.
|
||||||
|
// Default 16 provides good balance between memory and performance.
|
||||||
|
BatchProcessorMaxQueueSize int
|
||||||
|
|
||||||
|
// BatchProcessorAdaptiveThreshold defines threshold for adaptive batch sizing.
|
||||||
|
// Used in: batch_audio.go for dynamic batch size adjustment
|
||||||
|
// Impact: Lower thresholds enable more aggressive batching.
|
||||||
|
// Default 0.8 enables batching when 80% of queue is full.
|
||||||
|
BatchProcessorAdaptiveThreshold float64
|
||||||
|
|
||||||
|
// BatchProcessorThreadPinningThreshold defines minimum batch size for thread pinning.
|
||||||
|
// Used in: batch_audio.go for OS thread pinning optimization
|
||||||
|
// Impact: Higher thresholds reduce thread pinning overhead.
|
||||||
|
// Default 8 frames enables pinning for larger batches only.
|
||||||
|
BatchProcessorThreadPinningThreshold int
|
||||||
|
|
||||||
// Output Streaming Constants - Configuration for audio output streaming
|
// Output Streaming Constants - Configuration for audio output streaming
|
||||||
// Used in: output_streaming.go for output stream timing control
|
// Used in: output_streaming.go for output stream timing control
|
||||||
// Impact: Controls output streaming frame rate and timing
|
// Impact: Controls output streaming frame rate and timing
|
||||||
|
@ -2431,8 +2466,11 @@ func DefaultAudioConfig() *AudioConfigConstants {
|
||||||
ProcessMonitorTraditionalHz: 100.0, // 100.0 Hz traditional clock
|
ProcessMonitorTraditionalHz: 100.0, // 100.0 Hz traditional clock
|
||||||
|
|
||||||
// Batch Processing Constants
|
// Batch Processing Constants
|
||||||
BatchProcessorFramesPerBatch: 4, // 4 frames per batch
|
BatchProcessorFramesPerBatch: 4, // 4 frames per batch
|
||||||
BatchProcessorTimeout: 5 * time.Millisecond, // 5ms timeout
|
BatchProcessorTimeout: 5 * time.Millisecond, // 5ms timeout
|
||||||
|
BatchProcessorMaxQueueSize: 16, // 16 max queue size for balanced memory/performance
|
||||||
|
BatchProcessorAdaptiveThreshold: 0.8, // 0.8 threshold for adaptive batching (80% queue full)
|
||||||
|
BatchProcessorThreadPinningThreshold: 8, // 8 frames minimum for thread pinning optimization
|
||||||
|
|
||||||
// Output Streaming Constants
|
// Output Streaming Constants
|
||||||
OutputStreamingFrameIntervalMS: 20, // 20ms frame interval (50 FPS)
|
OutputStreamingFrameIntervalMS: 20, // 20ms frame interval (50 FPS)
|
||||||
|
@ -2534,6 +2572,78 @@ func DefaultAudioConfig() *AudioConfigConstants {
|
||||||
|
|
||||||
// Goroutine Monitoring Configuration
|
// Goroutine Monitoring Configuration
|
||||||
GoroutineMonitorInterval: 30 * time.Second, // 30s monitoring interval
|
GoroutineMonitorInterval: 30 * time.Second, // 30s monitoring interval
|
||||||
|
|
||||||
|
// Performance Configuration Flags - Production optimizations
|
||||||
|
// Used in: Production environments to reduce overhead and improve performance
|
||||||
|
// Impact: Controls which performance monitoring features are enabled
|
||||||
|
|
||||||
|
// EnableMetricsCollection controls detailed metrics collection.
|
||||||
|
// Used in: metrics.go, granular_metrics.go for performance tracking
|
||||||
|
// Impact: When disabled, reduces atomic operations and memory overhead.
|
||||||
|
// Default true for development, should be false in production for optimal performance.
|
||||||
|
EnableMetricsCollection: true, // Enable detailed metrics collection (default: true)
|
||||||
|
|
||||||
|
// EnableLatencyProfiling controls latency profiling and detailed tracing.
|
||||||
|
// Used in: latency_profiler.go for performance analysis
|
||||||
|
// Impact: When disabled, eliminates profiling overhead and reduces CPU usage.
|
||||||
|
// Default false to minimize overhead in production environments.
|
||||||
|
EnableLatencyProfiling: false, // Enable latency profiling and detailed tracing (default: false)
|
||||||
|
|
||||||
|
// EnableGoroutineMonitoring controls goroutine monitoring.
|
||||||
|
// Used in: goroutine_monitor.go for tracking goroutine health
|
||||||
|
// Impact: When disabled, reduces monitoring overhead and CPU usage.
|
||||||
|
// Default false to minimize overhead in production environments.
|
||||||
|
EnableGoroutineMonitoring: false, // Enable goroutine monitoring (default: false)
|
||||||
|
|
||||||
|
// EnableBatchTimeTracking controls batch processing time tracking.
|
||||||
|
// Used in: batch_audio.go for performance analysis
|
||||||
|
// Impact: When disabled, eliminates time tracking overhead.
|
||||||
|
// Default false to minimize overhead in production environments.
|
||||||
|
EnableBatchTimeTracking: false, // Enable batch processing time tracking (default: false)
|
||||||
|
|
||||||
|
// EnableDetailedLogging controls detailed debug logging.
|
||||||
|
// Used in: Throughout audio system for debugging
|
||||||
|
// Impact: When disabled, reduces logging overhead and improves performance.
|
||||||
|
// Default false to minimize overhead in production environments.
|
||||||
|
EnableDetailedLogging: false, // Enable detailed debug logging (default: false)
|
||||||
|
|
||||||
|
// Metrics Configuration - Batching and sampling for performance
|
||||||
|
// Used in: metrics.go for optimizing metrics collection overhead
|
||||||
|
// Impact: Controls how frequently metrics are updated and flushed
|
||||||
|
|
||||||
|
// MetricsFlushInterval defines batched metrics flush interval.
|
||||||
|
// Used in: metrics.go for batching metrics updates
|
||||||
|
// Impact: Higher values reduce update frequency but increase memory usage.
|
||||||
|
// Default 10 provides good balance between performance and memory.
|
||||||
|
MetricsFlushInterval: 10, // Batched metrics flush interval (default: 10)
|
||||||
|
|
||||||
|
// MetricsForceFlushNanos defines force flush after nanoseconds.
|
||||||
|
// Used in: metrics.go for ensuring metrics are not delayed too long
|
||||||
|
// Impact: Prevents metrics from being delayed indefinitely.
|
||||||
|
// Default 100ms ensures reasonable freshness while allowing batching.
|
||||||
|
MetricsForceFlushNanos: 100000000, // Force flush after nanoseconds (default: 100ms)
|
||||||
|
|
||||||
|
// MetricsSamplingRate defines sampling rate for metrics.
|
||||||
|
// Used in: metrics.go for reducing metrics collection overhead
|
||||||
|
// Impact: Values < 1.0 reduce overhead by sampling only a fraction of events.
|
||||||
|
// Default 1.0 collects all metrics, set to 0.1 in production for 90% reduction.
|
||||||
|
MetricsSamplingRate: 1.0, // Sampling rate for metrics (0.0-1.0, default: 1.0)
|
||||||
|
|
||||||
|
// Latency Profiling Configuration - Sampling for performance
|
||||||
|
// Used in: latency_profiler.go for optimizing profiling overhead
|
||||||
|
// Impact: Controls how frequently latency measurements are taken
|
||||||
|
|
||||||
|
// LatencyProfilingSamplingRate defines latency profiling sampling rate.
|
||||||
|
// Used in: latency_profiler.go for reducing profiling overhead
|
||||||
|
// Impact: Values < 1.0 significantly reduce profiling overhead.
|
||||||
|
// Default 0.01 (1%) provides useful data with minimal overhead.
|
||||||
|
LatencyProfilingSamplingRate: 0.01, // Latency profiling sampling rate (default: 0.01 = 1%)
|
||||||
|
|
||||||
|
// LatencyProfilingInterval defines latency profiling report interval.
|
||||||
|
// Used in: latency_profiler.go for controlling report frequency
|
||||||
|
// Impact: Longer intervals reduce reporting overhead.
|
||||||
|
// Default 60s provides reasonable reporting frequency.
|
||||||
|
LatencyProfilingInterval: 60 * time.Second, // Latency profiling report interval (default: 60s)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -370,6 +370,12 @@ func (aeb *AudioEventBroadcaster) startMetricsBroadcasting() {
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
for range ticker.C {
|
for range ticker.C {
|
||||||
|
// Skip metrics broadcasting if metrics collection is disabled
|
||||||
|
cachedConfig := GetCachedConfig()
|
||||||
|
if !cachedConfig.GetEnableMetricsCollection() {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
aeb.mutex.RLock()
|
aeb.mutex.RLock()
|
||||||
subscriberCount := len(aeb.subscribers)
|
subscriberCount := len(aeb.subscribers)
|
||||||
|
|
||||||
|
|
|
@ -133,6 +133,11 @@ func GetGoroutineMonitor() *GoroutineMonitor {
|
||||||
|
|
||||||
// StartGoroutineMonitoring starts the global goroutine monitor
|
// StartGoroutineMonitoring starts the global goroutine monitor
|
||||||
func StartGoroutineMonitoring() {
|
func StartGoroutineMonitoring() {
|
||||||
|
cachedConfig := GetCachedConfig()
|
||||||
|
if !cachedConfig.GetEnableGoroutineMonitoring() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
monitor := GetGoroutineMonitor()
|
monitor := GetGoroutineMonitor()
|
||||||
monitor.Start()
|
monitor.Start()
|
||||||
}
|
}
|
||||||
|
|
|
@ -109,12 +109,18 @@ func (aim *AudioInputManager) WriteOpusFrame(frame []byte) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
atomic.AddInt64(&aim.metrics.FramesDropped, 1)
|
cachedConfig := GetCachedConfig()
|
||||||
|
if cachedConfig.GetEnableMetricsCollection() {
|
||||||
|
atomic.AddInt64(&aim.metrics.FramesDropped, 1)
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update metrics
|
// Update metrics
|
||||||
atomic.AddInt64(&aim.framesSent, 1)
|
cachedConfig := GetCachedConfig()
|
||||||
|
if cachedConfig.GetEnableMetricsCollection() {
|
||||||
|
atomic.AddInt64(&aim.framesSent, 1)
|
||||||
|
}
|
||||||
aim.recordFrameProcessed(len(frame))
|
aim.recordFrameProcessed(len(frame))
|
||||||
aim.updateLatency(processingTime)
|
aim.updateLatency(processingTime)
|
||||||
|
|
||||||
|
|
|
@ -127,14 +127,15 @@ var (
|
||||||
|
|
||||||
// DefaultLatencyProfilerConfig returns default profiler configuration
|
// DefaultLatencyProfilerConfig returns default profiler configuration
|
||||||
func DefaultLatencyProfilerConfig() LatencyProfilerConfig {
|
func DefaultLatencyProfilerConfig() LatencyProfilerConfig {
|
||||||
|
config := GetConfig()
|
||||||
return LatencyProfilerConfig{
|
return LatencyProfilerConfig{
|
||||||
MaxMeasurements: 10000,
|
MaxMeasurements: 10000,
|
||||||
SamplingRate: 0.1, // Profile 10% of frames to minimize overhead
|
SamplingRate: config.LatencyProfilingSamplingRate, // Use configurable sampling rate
|
||||||
ReportingInterval: 30 * time.Second,
|
ReportingInterval: 30 * time.Second,
|
||||||
ThresholdWarning: 50 * time.Millisecond,
|
ThresholdWarning: 50 * time.Millisecond,
|
||||||
ThresholdCritical: 100 * time.Millisecond,
|
ThresholdCritical: 100 * time.Millisecond,
|
||||||
EnableDetailedTrace: false, // Disabled by default for performance
|
EnableDetailedTrace: false, // Disabled by default for performance
|
||||||
EnableHistogram: true,
|
EnableHistogram: config.EnableLatencyProfiling, // Only enable if profiling is enabled
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -508,6 +509,10 @@ func GetGlobalLatencyProfiler() *LatencyProfiler {
|
||||||
|
|
||||||
// EnableLatencyProfiling enables the global latency profiler
|
// EnableLatencyProfiling enables the global latency profiler
|
||||||
func EnableLatencyProfiling() error {
|
func EnableLatencyProfiling() error {
|
||||||
|
config := GetConfig()
|
||||||
|
if !config.EnableLatencyProfiling {
|
||||||
|
return fmt.Errorf("latency profiling is disabled in configuration")
|
||||||
|
}
|
||||||
profiler := GetGlobalLatencyProfiler()
|
profiler := GetGlobalLatencyProfiler()
|
||||||
return profiler.Start()
|
return profiler.Start()
|
||||||
}
|
}
|
||||||
|
@ -523,6 +528,12 @@ func DisableLatencyProfiling() {
|
||||||
|
|
||||||
// ProfileFrameLatency is a convenience function to profile a single frame's latency
|
// ProfileFrameLatency is a convenience function to profile a single frame's latency
|
||||||
func ProfileFrameLatency(frameID uint64, frameSize int, source string, fn func(*FrameLatencyTracker)) {
|
func ProfileFrameLatency(frameID uint64, frameSize int, source string, fn func(*FrameLatencyTracker)) {
|
||||||
|
config := GetConfig()
|
||||||
|
if !config.EnableLatencyProfiling {
|
||||||
|
fn(nil)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
profiler := GetGlobalLatencyProfiler()
|
profiler := GetGlobalLatencyProfiler()
|
||||||
if !profiler.IsEnabled() {
|
if !profiler.IsEnabled() {
|
||||||
fn(nil)
|
fn(nil)
|
||||||
|
|
|
@ -295,6 +295,12 @@ func (s *AudioOutputStreamer) reportStatistics() {
|
||||||
|
|
||||||
// recordFrameProcessed records a processed frame with sampling optimization
|
// recordFrameProcessed records a processed frame with sampling optimization
|
||||||
func (s *AudioOutputStreamer) recordFrameProcessed() {
|
func (s *AudioOutputStreamer) recordFrameProcessed() {
|
||||||
|
// Check if metrics collection is enabled
|
||||||
|
cachedConfig := GetCachedConfig()
|
||||||
|
if !cachedConfig.GetEnableMetricsCollection() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Increment local counters
|
// Increment local counters
|
||||||
frameCount := atomic.AddInt64(&s.frameCounter, 1)
|
frameCount := atomic.AddInt64(&s.frameCounter, 1)
|
||||||
atomic.AddInt64(&s.localProcessed, 1)
|
atomic.AddInt64(&s.localProcessed, 1)
|
||||||
|
@ -309,6 +315,12 @@ func (s *AudioOutputStreamer) recordFrameProcessed() {
|
||||||
|
|
||||||
// recordFrameDropped records a dropped frame with sampling optimization
|
// recordFrameDropped records a dropped frame with sampling optimization
|
||||||
func (s *AudioOutputStreamer) recordFrameDropped() {
|
func (s *AudioOutputStreamer) recordFrameDropped() {
|
||||||
|
// Check if metrics collection is enabled
|
||||||
|
cachedConfig := GetCachedConfig()
|
||||||
|
if !cachedConfig.GetEnableMetricsCollection() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Increment local counter
|
// Increment local counter
|
||||||
localDropped := atomic.AddInt64(&s.localDropped, 1)
|
localDropped := atomic.AddInt64(&s.localDropped, 1)
|
||||||
|
|
||||||
|
@ -321,6 +333,12 @@ func (s *AudioOutputStreamer) recordFrameDropped() {
|
||||||
|
|
||||||
// flushPendingMetrics flushes any pending sampled metrics to atomic counters
|
// flushPendingMetrics flushes any pending sampled metrics to atomic counters
|
||||||
func (s *AudioOutputStreamer) flushPendingMetrics() {
|
func (s *AudioOutputStreamer) flushPendingMetrics() {
|
||||||
|
// Check if metrics collection is enabled
|
||||||
|
cachedConfig := GetCachedConfig()
|
||||||
|
if !cachedConfig.GetEnableMetricsCollection() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Flush remaining processed and dropped frames
|
// Flush remaining processed and dropped frames
|
||||||
localProcessed := atomic.SwapInt64(&s.localProcessed, 0)
|
localProcessed := atomic.SwapInt64(&s.localProcessed, 0)
|
||||||
localDropped := atomic.SwapInt64(&s.localDropped, 0)
|
localDropped := atomic.SwapInt64(&s.localDropped, 0)
|
||||||
|
|
|
@ -142,10 +142,14 @@ func NewZeroCopyFramePool(maxFrameSize int) *ZeroCopyFramePool {
|
||||||
|
|
||||||
// Get retrieves a zero-copy frame from the pool
|
// Get retrieves a zero-copy frame from the pool
|
||||||
func (p *ZeroCopyFramePool) Get() *ZeroCopyAudioFrame {
|
func (p *ZeroCopyFramePool) Get() *ZeroCopyAudioFrame {
|
||||||
|
// Get cached config once for all metrics operations
|
||||||
|
cachedConfig := GetCachedConfig()
|
||||||
|
enableMetrics := cachedConfig.GetEnableMetricsCollection()
|
||||||
|
|
||||||
// Remove metrics overhead in critical path - use sampling instead
|
// Remove metrics overhead in critical path - use sampling instead
|
||||||
var wasHit bool
|
var wasHit bool
|
||||||
var startTime time.Time
|
var startTime time.Time
|
||||||
trackMetrics := atomic.LoadInt64(&p.counter)%100 == 0 // Sample 1% of operations
|
trackMetrics := enableMetrics && atomic.LoadInt64(&p.counter)%100 == 0 // Sample 1% of operations if enabled
|
||||||
if trackMetrics {
|
if trackMetrics {
|
||||||
startTime = time.Now()
|
startTime = time.Now()
|
||||||
}
|
}
|
||||||
|
@ -154,7 +158,9 @@ func (p *ZeroCopyFramePool) Get() *ZeroCopyAudioFrame {
|
||||||
allocationCount := atomic.LoadInt64(&p.allocationCount)
|
allocationCount := atomic.LoadInt64(&p.allocationCount)
|
||||||
if allocationCount > int64(p.maxPoolSize*2) {
|
if allocationCount > int64(p.maxPoolSize*2) {
|
||||||
// If we've allocated too many frames, force pool reuse
|
// If we've allocated too many frames, force pool reuse
|
||||||
atomic.AddInt64(&p.missCount, 1)
|
if enableMetrics {
|
||||||
|
atomic.AddInt64(&p.missCount, 1)
|
||||||
|
}
|
||||||
wasHit = true // Pool reuse counts as hit
|
wasHit = true // Pool reuse counts as hit
|
||||||
frame := p.pool.Get().(*ZeroCopyAudioFrame)
|
frame := p.pool.Get().(*ZeroCopyAudioFrame)
|
||||||
frame.mutex.Lock()
|
frame.mutex.Lock()
|
||||||
|
@ -185,7 +191,9 @@ func (p *ZeroCopyFramePool) Get() *ZeroCopyAudioFrame {
|
||||||
frame.data = frame.data[:0]
|
frame.data = frame.data[:0]
|
||||||
frame.mutex.Unlock()
|
frame.mutex.Unlock()
|
||||||
|
|
||||||
atomic.AddInt64(&p.hitCount, 1)
|
if enableMetrics {
|
||||||
|
atomic.AddInt64(&p.hitCount, 1)
|
||||||
|
}
|
||||||
|
|
||||||
// Record metrics only for sampled operations
|
// Record metrics only for sampled operations
|
||||||
if trackMetrics {
|
if trackMetrics {
|
||||||
|
@ -197,7 +205,9 @@ func (p *ZeroCopyFramePool) Get() *ZeroCopyAudioFrame {
|
||||||
p.mutex.Unlock()
|
p.mutex.Unlock()
|
||||||
|
|
||||||
// Try sync.Pool next and track allocation
|
// Try sync.Pool next and track allocation
|
||||||
atomic.AddInt64(&p.allocationCount, 1)
|
if enableMetrics {
|
||||||
|
atomic.AddInt64(&p.allocationCount, 1)
|
||||||
|
}
|
||||||
frame := p.pool.Get().(*ZeroCopyAudioFrame)
|
frame := p.pool.Get().(*ZeroCopyAudioFrame)
|
||||||
frame.mutex.Lock()
|
frame.mutex.Lock()
|
||||||
frame.refCount = 1
|
frame.refCount = 1
|
||||||
|
@ -218,9 +228,13 @@ func (p *ZeroCopyFramePool) Get() *ZeroCopyAudioFrame {
|
||||||
|
|
||||||
// Put returns a zero-copy frame to the pool
|
// Put returns a zero-copy frame to the pool
|
||||||
func (p *ZeroCopyFramePool) Put(frame *ZeroCopyAudioFrame) {
|
func (p *ZeroCopyFramePool) Put(frame *ZeroCopyAudioFrame) {
|
||||||
|
// Get cached config once for all metrics operations
|
||||||
|
cachedConfig := GetCachedConfig()
|
||||||
|
enableMetrics := cachedConfig.GetEnableMetricsCollection()
|
||||||
|
|
||||||
// Remove metrics overhead in critical path - use sampling instead
|
// Remove metrics overhead in critical path - use sampling instead
|
||||||
var startTime time.Time
|
var startTime time.Time
|
||||||
trackMetrics := atomic.LoadInt64(&p.counter)%100 == 0 // Sample 1% of operations
|
trackMetrics := enableMetrics && atomic.LoadInt64(&p.counter)%100 == 0 // Sample 1% of operations if enabled
|
||||||
if trackMetrics {
|
if trackMetrics {
|
||||||
startTime = time.Now()
|
startTime = time.Now()
|
||||||
}
|
}
|
||||||
|
@ -257,7 +271,9 @@ func (p *ZeroCopyFramePool) Put(frame *ZeroCopyAudioFrame) {
|
||||||
|
|
||||||
// Return to sync.Pool
|
// Return to sync.Pool
|
||||||
p.pool.Put(frame)
|
p.pool.Put(frame)
|
||||||
atomic.AddInt64(&p.counter, 1)
|
if enableMetrics {
|
||||||
|
atomic.AddInt64(&p.counter, 1)
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
frame.mutex.Unlock()
|
frame.mutex.Unlock()
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue