From b497444d6d2b21d13fcdbfeed03da9ba4c4d9737 Mon Sep 17 00:00:00 2001 From: Alex P Date: Wed, 3 Sep 2025 23:13:36 +0000 Subject: [PATCH] [WIP] Cleanup: reduce PR complexity --- internal/audio/config_constants.go | 37 +- internal/audio/device_health.go | 514 --------------------------- internal/audio/granular_metrics.go | 263 -------------- internal/audio/latency_profiler.go | 545 ----------------------------- internal/audio/memory_metrics.go | 201 ----------- internal/audio/metrics.go | 76 +--- internal/audio/zero_copy.go | 14 +- 7 files changed, 7 insertions(+), 1643 deletions(-) delete mode 100644 internal/audio/device_health.go delete mode 100644 internal/audio/granular_metrics.go delete mode 100644 internal/audio/latency_profiler.go delete mode 100644 internal/audio/memory_metrics.go diff --git a/internal/audio/config_constants.go b/internal/audio/config_constants.go index 6223b427..e9ad5605 100644 --- a/internal/audio/config_constants.go +++ b/internal/audio/config_constants.go @@ -1483,36 +1483,7 @@ type AudioConfigConstants struct { // Default 512 bytes accommodates typical encoding variations. FrameSizeTolerance int - // Device Health Monitoring Configuration - // Used in: device_health.go for proactive device monitoring and recovery - // Impact: Controls health check frequency and recovery thresholds - - // HealthCheckIntervalMS defines interval between device health checks in milliseconds. - // Used in: DeviceHealthMonitor for periodic health assessment - // Impact: Lower values provide faster detection but increase CPU usage. - // Default 5000ms (5s) provides good balance between responsiveness and overhead. - HealthCheckIntervalMS int - - // HealthRecoveryThreshold defines number of consecutive successful operations - // required to mark a device as healthy after being unhealthy. - // Used in: DeviceHealthMonitor for recovery state management - // Impact: Higher values prevent premature recovery declarations. - // Default 3 consecutive successes ensures stable recovery. - HealthRecoveryThreshold int - - // HealthLatencyThresholdMS defines maximum acceptable latency in milliseconds - // before considering a device unhealthy. - // Used in: DeviceHealthMonitor for latency-based health assessment - // Impact: Lower values trigger recovery sooner but may cause false positives. - // Default 100ms provides reasonable threshold for real-time audio. - HealthLatencyThresholdMS int - - // HealthErrorRateLimit defines maximum error rate (0.0-1.0) before - // considering a device unhealthy. - // Used in: DeviceHealthMonitor for error rate assessment - // Impact: Lower values trigger recovery sooner for error-prone devices. - // Default 0.1 (10%) allows some transient errors while detecting problems. - HealthErrorRateLimit float64 + // Removed device health monitoring configuration - functionality not used // Latency Histogram Bucket Configuration // Used in: LatencyHistogram for granular latency measurement buckets @@ -2450,11 +2421,7 @@ func DefaultAudioConfig() *AudioConfigConstants { MinFrameSize: 1, // 1 byte minimum frame size (allow small frames) FrameSizeTolerance: 512, // 512 bytes frame size tolerance - // Device Health Monitoring Configuration - HealthCheckIntervalMS: 5000, // 5000ms (5s) health check interval - HealthRecoveryThreshold: 3, // 3 consecutive successes for recovery - HealthLatencyThresholdMS: 100, // 100ms latency threshold for health - HealthErrorRateLimit: 0.1, // 10% error rate limit for health + // Removed device health monitoring configuration - functionality not used // Latency Histogram Bucket Configuration LatencyBucket10ms: 10 * time.Millisecond, // 10ms latency bucket diff --git a/internal/audio/device_health.go b/internal/audio/device_health.go deleted file mode 100644 index 473cb1cc..00000000 --- a/internal/audio/device_health.go +++ /dev/null @@ -1,514 +0,0 @@ -package audio - -import ( - "context" - "fmt" - "sync" - "sync/atomic" - "time" - - "github.com/jetkvm/kvm/internal/logging" - "github.com/rs/zerolog" -) - -// DeviceHealthStatus represents the health status of an audio device -type DeviceHealthStatus int - -const ( - DeviceHealthUnknown DeviceHealthStatus = iota - DeviceHealthHealthy - DeviceHealthDegraded - DeviceHealthFailing - DeviceHealthCritical -) - -func (s DeviceHealthStatus) String() string { - switch s { - case DeviceHealthHealthy: - return "healthy" - case DeviceHealthDegraded: - return "degraded" - case DeviceHealthFailing: - return "failing" - case DeviceHealthCritical: - return "critical" - default: - return "unknown" - } -} - -// DeviceHealthMetrics tracks health-related metrics for audio devices -type DeviceHealthMetrics struct { - // Error tracking - ConsecutiveErrors int64 `json:"consecutive_errors"` - TotalErrors int64 `json:"total_errors"` - LastErrorTime time.Time `json:"last_error_time"` - ErrorRate float64 `json:"error_rate"` // errors per minute - - // Performance metrics - AverageLatency time.Duration `json:"average_latency"` - MaxLatency time.Duration `json:"max_latency"` - LatencySpikes int64 `json:"latency_spikes"` - Underruns int64 `json:"underruns"` - Overruns int64 `json:"overruns"` - - // Device availability - LastSuccessfulOp time.Time `json:"last_successful_op"` - DeviceDisconnects int64 `json:"device_disconnects"` - RecoveryAttempts int64 `json:"recovery_attempts"` - SuccessfulRecoveries int64 `json:"successful_recoveries"` - - // Health assessment - CurrentStatus DeviceHealthStatus `json:"current_status"` - StatusLastChanged time.Time `json:"status_last_changed"` - HealthScore float64 `json:"health_score"` // 0.0 to 1.0 -} - -// DeviceHealthMonitor monitors the health of audio devices and triggers recovery -type DeviceHealthMonitor struct { - // Atomic fields first for ARM32 alignment - running int32 - monitoringEnabled int32 - - // Configuration - checkInterval time.Duration - recoveryThreshold int - latencyThreshold time.Duration - errorRateLimit float64 // max errors per minute - - // State tracking - captureMetrics *DeviceHealthMetrics - playbackMetrics *DeviceHealthMetrics - mutex sync.RWMutex - - // Control channels - ctx context.Context - cancel context.CancelFunc - stopChan chan struct{} - doneChan chan struct{} - - // Recovery callbacks - recoveryCallbacks map[string]func() error - callbackMutex sync.RWMutex - - // Logging - logger zerolog.Logger - config *AudioConfigConstants -} - -// NewDeviceHealthMonitor creates a new device health monitor -func NewDeviceHealthMonitor() *DeviceHealthMonitor { - ctx, cancel := context.WithCancel(context.Background()) - config := GetConfig() - - return &DeviceHealthMonitor{ - checkInterval: time.Duration(config.HealthCheckIntervalMS) * time.Millisecond, - recoveryThreshold: config.HealthRecoveryThreshold, - latencyThreshold: time.Duration(config.HealthLatencyThresholdMS) * time.Millisecond, - errorRateLimit: config.HealthErrorRateLimit, - captureMetrics: &DeviceHealthMetrics{ - CurrentStatus: DeviceHealthUnknown, - HealthScore: 1.0, - }, - playbackMetrics: &DeviceHealthMetrics{ - CurrentStatus: DeviceHealthUnknown, - HealthScore: 1.0, - }, - ctx: ctx, - cancel: cancel, - stopChan: make(chan struct{}), - doneChan: make(chan struct{}), - recoveryCallbacks: make(map[string]func() error), - logger: logging.GetDefaultLogger().With().Str("component", "device-health-monitor").Logger(), - config: config, - } -} - -// Start begins health monitoring -func (dhm *DeviceHealthMonitor) Start() error { - if !atomic.CompareAndSwapInt32(&dhm.running, 0, 1) { - return fmt.Errorf("device health monitor already running") - } - - dhm.logger.Debug().Msg("device health monitor starting") - atomic.StoreInt32(&dhm.monitoringEnabled, 1) - - go dhm.monitoringLoop() - return nil -} - -// Stop stops health monitoring -func (dhm *DeviceHealthMonitor) Stop() { - if !atomic.CompareAndSwapInt32(&dhm.running, 1, 0) { - return - } - - dhm.logger.Debug().Msg("device health monitor stopping") - atomic.StoreInt32(&dhm.monitoringEnabled, 0) - - close(dhm.stopChan) - dhm.cancel() - - // Wait for monitoring loop to finish - select { - case <-dhm.doneChan: - dhm.logger.Debug().Msg("device health monitor stopped") - case <-time.After(time.Duration(dhm.config.SupervisorTimeout)): - dhm.logger.Warn().Msg("device health monitor stop timeout") - } -} - -// RegisterRecoveryCallback registers a recovery function for a specific component -func (dhm *DeviceHealthMonitor) RegisterRecoveryCallback(component string, callback func() error) { - dhm.callbackMutex.Lock() - defer dhm.callbackMutex.Unlock() - dhm.recoveryCallbacks[component] = callback - dhm.logger.Debug().Str("component", component).Msg("registered recovery callback") -} - -// RecordError records an error for health tracking -func (dhm *DeviceHealthMonitor) RecordError(deviceType string, err error) { - if atomic.LoadInt32(&dhm.monitoringEnabled) == 0 { - return - } - - dhm.mutex.Lock() - defer dhm.mutex.Unlock() - - var metrics *DeviceHealthMetrics - switch deviceType { - case "capture": - metrics = dhm.captureMetrics - case "playback": - metrics = dhm.playbackMetrics - default: - dhm.logger.Warn().Str("device_type", deviceType).Msg("unknown device type for error recording") - return - } - - atomic.AddInt64(&metrics.ConsecutiveErrors, 1) - atomic.AddInt64(&metrics.TotalErrors, 1) - metrics.LastErrorTime = time.Now() - - // Update error rate (errors per minute) - if !metrics.LastErrorTime.IsZero() { - timeSinceFirst := time.Since(metrics.LastErrorTime) - if timeSinceFirst > 0 { - metrics.ErrorRate = float64(metrics.TotalErrors) / timeSinceFirst.Minutes() - } - } - - dhm.logger.Debug(). - Str("device_type", deviceType). - Err(err). - Int64("consecutive_errors", metrics.ConsecutiveErrors). - Float64("error_rate", metrics.ErrorRate). - Msg("recorded device error") - - // Trigger immediate health assessment - dhm.assessDeviceHealth(deviceType, metrics) -} - -// RecordSuccess records a successful operation -func (dhm *DeviceHealthMonitor) RecordSuccess(deviceType string) { - if atomic.LoadInt32(&dhm.monitoringEnabled) == 0 { - return - } - - dhm.mutex.Lock() - defer dhm.mutex.Unlock() - - var metrics *DeviceHealthMetrics - switch deviceType { - case "capture": - metrics = dhm.captureMetrics - case "playback": - metrics = dhm.playbackMetrics - default: - return - } - - // Reset consecutive errors on success - atomic.StoreInt64(&metrics.ConsecutiveErrors, 0) - metrics.LastSuccessfulOp = time.Now() - - // Improve health score gradually - if metrics.HealthScore < 1.0 { - metrics.HealthScore = min(1.0, metrics.HealthScore+0.1) - } -} - -// RecordLatency records operation latency for health assessment -func (dhm *DeviceHealthMonitor) RecordLatency(deviceType string, latency time.Duration) { - if atomic.LoadInt32(&dhm.monitoringEnabled) == 0 { - return - } - - dhm.mutex.Lock() - defer dhm.mutex.Unlock() - - var metrics *DeviceHealthMetrics - switch deviceType { - case "capture": - metrics = dhm.captureMetrics - case "playback": - metrics = dhm.playbackMetrics - default: - return - } - - // Update latency metrics - if metrics.AverageLatency == 0 { - metrics.AverageLatency = latency - } else { - // Exponential moving average - metrics.AverageLatency = time.Duration(float64(metrics.AverageLatency)*0.9 + float64(latency)*0.1) - } - - if latency > metrics.MaxLatency { - metrics.MaxLatency = latency - } - - // Track latency spikes - if latency > dhm.latencyThreshold { - atomic.AddInt64(&metrics.LatencySpikes, 1) - } -} - -// RecordUnderrun records an audio underrun event -func (dhm *DeviceHealthMonitor) RecordUnderrun(deviceType string) { - if atomic.LoadInt32(&dhm.monitoringEnabled) == 0 { - return - } - - dhm.mutex.Lock() - defer dhm.mutex.Unlock() - - var metrics *DeviceHealthMetrics - switch deviceType { - case "capture": - metrics = dhm.captureMetrics - case "playback": - metrics = dhm.playbackMetrics - default: - return - } - - atomic.AddInt64(&metrics.Underruns, 1) - dhm.logger.Debug().Str("device_type", deviceType).Msg("recorded audio underrun") -} - -// RecordOverrun records an audio overrun event -func (dhm *DeviceHealthMonitor) RecordOverrun(deviceType string) { - if atomic.LoadInt32(&dhm.monitoringEnabled) == 0 { - return - } - - dhm.mutex.Lock() - defer dhm.mutex.Unlock() - - var metrics *DeviceHealthMetrics - switch deviceType { - case "capture": - metrics = dhm.captureMetrics - case "playback": - metrics = dhm.playbackMetrics - default: - return - } - - atomic.AddInt64(&metrics.Overruns, 1) - dhm.logger.Debug().Str("device_type", deviceType).Msg("recorded audio overrun") -} - -// GetHealthMetrics returns current health metrics -func (dhm *DeviceHealthMonitor) GetHealthMetrics() (capture, playback DeviceHealthMetrics) { - dhm.mutex.RLock() - defer dhm.mutex.RUnlock() - return *dhm.captureMetrics, *dhm.playbackMetrics -} - -// monitoringLoop runs the main health monitoring loop -func (dhm *DeviceHealthMonitor) monitoringLoop() { - defer close(dhm.doneChan) - - ticker := time.NewTicker(dhm.checkInterval) - defer ticker.Stop() - - for { - select { - case <-dhm.stopChan: - return - case <-dhm.ctx.Done(): - return - case <-ticker.C: - dhm.performHealthCheck() - } - } -} - -// performHealthCheck performs a comprehensive health check -func (dhm *DeviceHealthMonitor) performHealthCheck() { - dhm.mutex.Lock() - defer dhm.mutex.Unlock() - - // Assess health for both devices - dhm.assessDeviceHealth("capture", dhm.captureMetrics) - dhm.assessDeviceHealth("playback", dhm.playbackMetrics) - - // Check if recovery is needed - dhm.checkRecoveryNeeded("capture", dhm.captureMetrics) - dhm.checkRecoveryNeeded("playback", dhm.playbackMetrics) -} - -// assessDeviceHealth assesses the health status of a device -func (dhm *DeviceHealthMonitor) assessDeviceHealth(deviceType string, metrics *DeviceHealthMetrics) { - previousStatus := metrics.CurrentStatus - newStatus := dhm.calculateHealthStatus(metrics) - - if newStatus != previousStatus { - metrics.CurrentStatus = newStatus - metrics.StatusLastChanged = time.Now() - dhm.logger.Info(). - Str("device_type", deviceType). - Str("previous_status", previousStatus.String()). - Str("new_status", newStatus.String()). - Float64("health_score", metrics.HealthScore). - Msg("device health status changed") - } - - // Update health score - metrics.HealthScore = dhm.calculateHealthScore(metrics) -} - -// calculateHealthStatus determines health status based on metrics -func (dhm *DeviceHealthMonitor) calculateHealthStatus(metrics *DeviceHealthMetrics) DeviceHealthStatus { - consecutiveErrors := atomic.LoadInt64(&metrics.ConsecutiveErrors) - totalErrors := atomic.LoadInt64(&metrics.TotalErrors) - - // Critical: Too many consecutive errors or device disconnected recently - if consecutiveErrors >= int64(dhm.recoveryThreshold) { - return DeviceHealthCritical - } - - // Critical: No successful operations in a long time - if !metrics.LastSuccessfulOp.IsZero() && time.Since(metrics.LastSuccessfulOp) > time.Duration(dhm.config.SupervisorTimeout) { - return DeviceHealthCritical - } - - // Failing: High error rate or frequent latency spikes - if metrics.ErrorRate > dhm.errorRateLimit || atomic.LoadInt64(&metrics.LatencySpikes) > int64(dhm.config.MaxDroppedFrames) { - return DeviceHealthFailing - } - - // Degraded: Some errors or performance issues - if consecutiveErrors > 0 || totalErrors > int64(dhm.config.MaxDroppedFrames/2) || metrics.AverageLatency > dhm.latencyThreshold { - return DeviceHealthDegraded - } - - // Healthy: No significant issues - return DeviceHealthHealthy -} - -// calculateHealthScore calculates a numeric health score (0.0 to 1.0) -func (dhm *DeviceHealthMonitor) calculateHealthScore(metrics *DeviceHealthMetrics) float64 { - score := 1.0 - - // Penalize consecutive errors - consecutiveErrors := atomic.LoadInt64(&metrics.ConsecutiveErrors) - if consecutiveErrors > 0 { - score -= float64(consecutiveErrors) * 0.1 - } - - // Penalize high error rate - if metrics.ErrorRate > 0 { - score -= min(0.5, metrics.ErrorRate/dhm.errorRateLimit*0.5) - } - - // Penalize high latency - if metrics.AverageLatency > dhm.latencyThreshold { - excess := float64(metrics.AverageLatency-dhm.latencyThreshold) / float64(dhm.latencyThreshold) - score -= min(0.3, excess*0.3) - } - - // Penalize underruns/overruns - underruns := atomic.LoadInt64(&metrics.Underruns) - overruns := atomic.LoadInt64(&metrics.Overruns) - if underruns+overruns > 0 { - score -= min(0.2, float64(underruns+overruns)*0.01) - } - - return max(0.0, score) -} - -// checkRecoveryNeeded checks if recovery is needed and triggers it -func (dhm *DeviceHealthMonitor) checkRecoveryNeeded(deviceType string, metrics *DeviceHealthMetrics) { - if metrics.CurrentStatus == DeviceHealthCritical { - dhm.triggerRecovery(deviceType, metrics) - } -} - -// triggerRecovery triggers recovery for a device -func (dhm *DeviceHealthMonitor) triggerRecovery(deviceType string, metrics *DeviceHealthMetrics) { - atomic.AddInt64(&metrics.RecoveryAttempts, 1) - - dhm.logger.Warn(). - Str("device_type", deviceType). - Str("status", metrics.CurrentStatus.String()). - Int64("consecutive_errors", atomic.LoadInt64(&metrics.ConsecutiveErrors)). - Float64("error_rate", metrics.ErrorRate). - Msg("triggering device recovery") - - // Try registered recovery callbacks - dhm.callbackMutex.RLock() - defer dhm.callbackMutex.RUnlock() - - for component, callback := range dhm.recoveryCallbacks { - if callback != nil { - go func(comp string, cb func() error) { - if err := cb(); err != nil { - dhm.logger.Error(). - Str("component", comp). - Str("device_type", deviceType). - Err(err). - Msg("recovery callback failed") - } else { - atomic.AddInt64(&metrics.SuccessfulRecoveries, 1) - dhm.logger.Info(). - Str("component", comp). - Str("device_type", deviceType). - Msg("recovery callback succeeded") - } - }(component, callback) - } - } -} - -// Global device health monitor instance -var ( - globalDeviceHealthMonitor *DeviceHealthMonitor - deviceHealthOnce sync.Once -) - -// GetDeviceHealthMonitor returns the global device health monitor -func GetDeviceHealthMonitor() *DeviceHealthMonitor { - deviceHealthOnce.Do(func() { - globalDeviceHealthMonitor = NewDeviceHealthMonitor() - }) - return globalDeviceHealthMonitor -} - -// Helper functions for min/max -func min(a, b float64) float64 { - if a < b { - return a - } - return b -} - -func max(a, b float64) float64 { - if a > b { - return a - } - return b -} diff --git a/internal/audio/granular_metrics.go b/internal/audio/granular_metrics.go deleted file mode 100644 index 735c62f0..00000000 --- a/internal/audio/granular_metrics.go +++ /dev/null @@ -1,263 +0,0 @@ -package audio - -import ( - "sync" - "sync/atomic" - "time" - - "github.com/jetkvm/kvm/internal/logging" - "github.com/rs/zerolog" -) - -// LatencyPercentiles holds calculated percentile values -type LatencyPercentiles struct { - P50 time.Duration `json:"p50"` - P95 time.Duration `json:"p95"` - P99 time.Duration `json:"p99"` - Min time.Duration `json:"min"` - Max time.Duration `json:"max"` - Avg time.Duration `json:"avg"` -} - -// BufferPoolEfficiencyMetrics tracks detailed buffer pool performance -type BufferPoolEfficiencyMetrics struct { - // Pool utilization metrics - HitRate float64 `json:"hit_rate"` - MissRate float64 `json:"miss_rate"` - UtilizationRate float64 `json:"utilization_rate"` - FragmentationRate float64 `json:"fragmentation_rate"` - - // Memory efficiency metrics - MemoryEfficiency float64 `json:"memory_efficiency"` - AllocationOverhead float64 `json:"allocation_overhead"` - ReuseEffectiveness float64 `json:"reuse_effectiveness"` - - // Performance metrics - AverageGetLatency time.Duration `json:"average_get_latency"` - AveragePutLatency time.Duration `json:"average_put_latency"` - Throughput float64 `json:"throughput"` // Operations per second -} - -// GranularMetricsCollector aggregates all granular metrics -type GranularMetricsCollector struct { - // Buffer pool efficiency tracking - framePoolMetrics *BufferPoolEfficiencyTracker - controlPoolMetrics *BufferPoolEfficiencyTracker - zeroCopyMetrics *BufferPoolEfficiencyTracker - - mutex sync.RWMutex - logger zerolog.Logger -} - -// BufferPoolEfficiencyTracker tracks detailed efficiency metrics for a buffer pool -type BufferPoolEfficiencyTracker struct { - // Atomic counters - getOperations int64 // Total get operations (atomic) - putOperations int64 // Total put operations (atomic) - getLatencySum int64 // Sum of get latencies in nanoseconds (atomic) - putLatencySum int64 // Sum of put latencies in nanoseconds (atomic) - allocationBytes int64 // Total bytes allocated (atomic) - reuseCount int64 // Number of successful reuses (atomic) - - // Recent operation times for throughput calculation - recentOps []time.Time - opsMutex sync.RWMutex - - poolName string - logger zerolog.Logger -} - -// NewBufferPoolEfficiencyTracker creates a new efficiency tracker -func NewBufferPoolEfficiencyTracker(poolName string, logger zerolog.Logger) *BufferPoolEfficiencyTracker { - return &BufferPoolEfficiencyTracker{ - recentOps: make([]time.Time, 0, 1000), // Track last 1000 operations - poolName: poolName, - logger: logger, - } -} - -// RecordGetOperation records a buffer get operation with its latency -func (bpet *BufferPoolEfficiencyTracker) RecordGetOperation(latency time.Duration, wasHit bool) { - atomic.AddInt64(&bpet.getOperations, 1) - atomic.AddInt64(&bpet.getLatencySum, latency.Nanoseconds()) - - if wasHit { - atomic.AddInt64(&bpet.reuseCount, 1) - } - - // Record operation time for throughput calculation - bpet.opsMutex.Lock() - now := time.Now() - if len(bpet.recentOps) >= 1000 { - bpet.recentOps = bpet.recentOps[1:] - } - bpet.recentOps = append(bpet.recentOps, now) - bpet.opsMutex.Unlock() -} - -// RecordPutOperation records a buffer put operation with its latency -func (bpet *BufferPoolEfficiencyTracker) RecordPutOperation(latency time.Duration, bufferSize int) { - atomic.AddInt64(&bpet.putOperations, 1) - atomic.AddInt64(&bpet.putLatencySum, latency.Nanoseconds()) - atomic.AddInt64(&bpet.allocationBytes, int64(bufferSize)) -} - -// GetEfficiencyMetrics calculates current efficiency metrics -func (bpet *BufferPoolEfficiencyTracker) GetEfficiencyMetrics() BufferPoolEfficiencyMetrics { - getOps := atomic.LoadInt64(&bpet.getOperations) - putOps := atomic.LoadInt64(&bpet.putOperations) - reuseCount := atomic.LoadInt64(&bpet.reuseCount) - getLatencySum := atomic.LoadInt64(&bpet.getLatencySum) - putLatencySum := atomic.LoadInt64(&bpet.putLatencySum) - allocationBytes := atomic.LoadInt64(&bpet.allocationBytes) - - var hitRate, missRate, avgGetLatency, avgPutLatency float64 - var throughput float64 - - if getOps > 0 { - hitRate = float64(reuseCount) / float64(getOps) * 100 - missRate = 100 - hitRate - avgGetLatency = float64(getLatencySum) / float64(getOps) - } - - if putOps > 0 { - avgPutLatency = float64(putLatencySum) / float64(putOps) - } - - // Calculate throughput from recent operations - bpet.opsMutex.RLock() - if len(bpet.recentOps) > 1 { - timeSpan := bpet.recentOps[len(bpet.recentOps)-1].Sub(bpet.recentOps[0]) - if timeSpan > 0 { - throughput = float64(len(bpet.recentOps)) / timeSpan.Seconds() - } - } - bpet.opsMutex.RUnlock() - - // Calculate efficiency metrics - utilizationRate := hitRate // Simplified: hit rate as utilization - memoryEfficiency := hitRate // Simplified: reuse rate as memory efficiency - reuseEffectiveness := hitRate - - // Calculate fragmentation (simplified as inverse of hit rate) - fragmentationRate := missRate - - // Calculate allocation overhead (simplified) - allocationOverhead := float64(0) - if getOps > 0 && allocationBytes > 0 { - allocationOverhead = float64(allocationBytes) / float64(getOps) - } - - return BufferPoolEfficiencyMetrics{ - HitRate: hitRate, - MissRate: missRate, - UtilizationRate: utilizationRate, - FragmentationRate: fragmentationRate, - MemoryEfficiency: memoryEfficiency, - AllocationOverhead: allocationOverhead, - ReuseEffectiveness: reuseEffectiveness, - AverageGetLatency: time.Duration(avgGetLatency), - AveragePutLatency: time.Duration(avgPutLatency), - Throughput: throughput, - } -} - -// NewGranularMetricsCollector creates a new granular metrics collector -func NewGranularMetricsCollector(logger zerolog.Logger) *GranularMetricsCollector { - return &GranularMetricsCollector{ - framePoolMetrics: NewBufferPoolEfficiencyTracker("frame_pool", logger.With().Str("pool", "frame").Logger()), - controlPoolMetrics: NewBufferPoolEfficiencyTracker("control_pool", logger.With().Str("pool", "control").Logger()), - zeroCopyMetrics: NewBufferPoolEfficiencyTracker("zero_copy_pool", logger.With().Str("pool", "zero_copy").Logger()), - logger: logger, - } -} - -// RecordFramePoolOperation records frame pool operations -func (gmc *GranularMetricsCollector) RecordFramePoolGet(latency time.Duration, wasHit bool) { - gmc.framePoolMetrics.RecordGetOperation(latency, wasHit) -} - -func (gmc *GranularMetricsCollector) RecordFramePoolPut(latency time.Duration, bufferSize int) { - gmc.framePoolMetrics.RecordPutOperation(latency, bufferSize) -} - -// RecordControlPoolOperation records control pool operations -func (gmc *GranularMetricsCollector) RecordControlPoolGet(latency time.Duration, wasHit bool) { - gmc.controlPoolMetrics.RecordGetOperation(latency, wasHit) -} - -func (gmc *GranularMetricsCollector) RecordControlPoolPut(latency time.Duration, bufferSize int) { - gmc.controlPoolMetrics.RecordPutOperation(latency, bufferSize) -} - -// RecordZeroCopyOperation records zero-copy pool operations -func (gmc *GranularMetricsCollector) RecordZeroCopyGet(latency time.Duration, wasHit bool) { - gmc.zeroCopyMetrics.RecordGetOperation(latency, wasHit) -} - -func (gmc *GranularMetricsCollector) RecordZeroCopyPut(latency time.Duration, bufferSize int) { - gmc.zeroCopyMetrics.RecordPutOperation(latency, bufferSize) -} - -// GetBufferPoolEfficiency returns efficiency metrics for all buffer pools -func (gmc *GranularMetricsCollector) GetBufferPoolEfficiency() map[string]BufferPoolEfficiencyMetrics { - gmc.mutex.RLock() - defer gmc.mutex.RUnlock() - - return map[string]BufferPoolEfficiencyMetrics{ - "frame_pool": gmc.framePoolMetrics.GetEfficiencyMetrics(), - "control_pool": gmc.controlPoolMetrics.GetEfficiencyMetrics(), - "zero_copy_pool": gmc.zeroCopyMetrics.GetEfficiencyMetrics(), - } -} - -// LogGranularMetrics logs comprehensive granular metrics -func (gmc *GranularMetricsCollector) LogGranularMetrics() { - bufferEfficiency := gmc.GetBufferPoolEfficiency() - - // Log buffer pool efficiency - for poolName, efficiency := range bufferEfficiency { - gmc.logger.Info(). - Str("pool", poolName). - Float64("hit_rate", efficiency.HitRate). - Float64("miss_rate", efficiency.MissRate). - Float64("utilization_rate", efficiency.UtilizationRate). - Float64("memory_efficiency", efficiency.MemoryEfficiency). - Dur("avg_get_latency", efficiency.AverageGetLatency). - Dur("avg_put_latency", efficiency.AveragePutLatency). - Float64("throughput", efficiency.Throughput). - Msg("Buffer pool efficiency metrics") - } -} - -// Global granular metrics collector instance -var ( - granularMetricsCollector *GranularMetricsCollector - granularMetricsOnce sync.Once -) - -// GetGranularMetricsCollector returns the global granular metrics collector -func GetGranularMetricsCollector() *GranularMetricsCollector { - granularMetricsOnce.Do(func() { - logger := logging.GetDefaultLogger().With().Str("component", "granular-metrics").Logger() - granularMetricsCollector = NewGranularMetricsCollector(logger) - }) - return granularMetricsCollector -} - -// StartGranularMetricsLogging starts periodic granular metrics logging -func StartGranularMetricsLogging(interval time.Duration) { - collector := GetGranularMetricsCollector() - logger := collector.logger - - logger.Info().Dur("interval", interval).Msg("Starting granular metrics logging") - - go func() { - ticker := time.NewTicker(interval) - defer ticker.Stop() - - for range ticker.C { - collector.LogGranularMetrics() - } - }() -} diff --git a/internal/audio/latency_profiler.go b/internal/audio/latency_profiler.go deleted file mode 100644 index 21d48ac1..00000000 --- a/internal/audio/latency_profiler.go +++ /dev/null @@ -1,545 +0,0 @@ -package audio - -import ( - "context" - "fmt" - "runtime" - "sync" - "sync/atomic" - "time" - "unsafe" - - "github.com/jetkvm/kvm/internal/logging" - "github.com/rs/zerolog" -) - -// LatencyProfiler provides comprehensive end-to-end audio latency profiling -// with nanosecond precision across the entire WebRTC->IPC->CGO->ALSA pipeline -type LatencyProfiler struct { - // Atomic counters for thread-safe access (MUST be first for ARM32 alignment) - totalMeasurements int64 // Total number of measurements taken - webrtcLatencySum int64 // Sum of WebRTC processing latencies (nanoseconds) - ipcLatencySum int64 // Sum of IPC communication latencies (nanoseconds) - cgoLatencySum int64 // Sum of CGO call latencies (nanoseconds) - alsaLatencySum int64 // Sum of ALSA device latencies (nanoseconds) - endToEndLatencySum int64 // Sum of complete end-to-end latencies (nanoseconds) - validationLatencySum int64 // Sum of validation overhead (nanoseconds) - serializationLatencySum int64 // Sum of serialization overhead (nanoseconds) - - // Peak latency tracking - maxWebrtcLatency int64 // Maximum WebRTC latency observed (nanoseconds) - maxIpcLatency int64 // Maximum IPC latency observed (nanoseconds) - maxCgoLatency int64 // Maximum CGO latency observed (nanoseconds) - maxAlsaLatency int64 // Maximum ALSA latency observed (nanoseconds) - maxEndToEndLatency int64 // Maximum end-to-end latency observed (nanoseconds) - - // Configuration and control - config LatencyProfilerConfig - logger zerolog.Logger - ctx context.Context - cancel context.CancelFunc - running int32 // Atomic flag for profiler state - enabled int32 // Atomic flag for measurement collection - - // Detailed measurement storage - measurements []DetailedLatencyMeasurement - measurementMutex sync.RWMutex - measurementIndex int - - // High-resolution timing - timeSource func() int64 // Nanosecond precision time source -} - -// LatencyProfilerConfig defines profiler configuration -type LatencyProfilerConfig struct { - MaxMeasurements int // Maximum measurements to store in memory - SamplingRate float64 // Sampling rate (0.0-1.0, 1.0 = profile every frame) - ReportingInterval time.Duration // How often to log profiling reports - ThresholdWarning time.Duration // Latency threshold for warnings - ThresholdCritical time.Duration // Latency threshold for critical alerts - EnableDetailedTrace bool // Enable detailed per-component tracing - EnableHistogram bool // Enable latency histogram collection -} - -// DetailedLatencyMeasurement captures comprehensive latency breakdown -type DetailedLatencyMeasurement struct { - Timestamp time.Time // When the measurement was taken - FrameID uint64 // Unique frame identifier for tracing - WebRTCLatency time.Duration // WebRTC processing time - IPCLatency time.Duration // IPC communication time - CGOLatency time.Duration // CGO call overhead - ALSALatency time.Duration // ALSA device processing time - ValidationLatency time.Duration // Frame validation overhead - SerializationLatency time.Duration // Data serialization overhead - EndToEndLatency time.Duration // Complete pipeline latency - Source string // Source component (input/output) - FrameSize int // Size of the audio frame in bytes - CPUUsage float64 // CPU usage at time of measurement - MemoryUsage uint64 // Memory usage at time of measurement -} - -// LatencyProfileReport contains aggregated profiling results -type LatencyProfileReport struct { - TotalMeasurements int64 // Total measurements taken - TimeRange time.Duration // Time span of measurements - - // Average latencies - AvgWebRTCLatency time.Duration - AvgIPCLatency time.Duration - AvgCGOLatency time.Duration - AvgALSALatency time.Duration - AvgEndToEndLatency time.Duration - AvgValidationLatency time.Duration - AvgSerializationLatency time.Duration - - // Peak latencies - MaxWebRTCLatency time.Duration - MaxIPCLatency time.Duration - MaxCGOLatency time.Duration - MaxALSALatency time.Duration - MaxEndToEndLatency time.Duration - - // Performance analysis - BottleneckComponent string // Component with highest average latency - LatencyDistribution map[string]int // Histogram of latency ranges - Throughput float64 // Frames per second processed -} - -// FrameLatencyTracker tracks latency for a single audio frame through the pipeline -type FrameLatencyTracker struct { - frameID uint64 - startTime int64 // Nanosecond timestamp - webrtcStartTime int64 - ipcStartTime int64 - cgoStartTime int64 - alsaStartTime int64 - validationStartTime int64 - serializationStartTime int64 - frameSize int - source string -} - -// Global profiler instance -var ( - globalLatencyProfiler unsafe.Pointer // *LatencyProfiler - profilerInitialized int32 -) - -// DefaultLatencyProfilerConfig returns default profiler configuration -func DefaultLatencyProfilerConfig() LatencyProfilerConfig { - return LatencyProfilerConfig{ - MaxMeasurements: 10000, - SamplingRate: 0.01, // Fixed sampling rate (1%) - ReportingInterval: 30 * time.Second, - ThresholdWarning: 50 * time.Millisecond, - ThresholdCritical: 100 * time.Millisecond, - EnableDetailedTrace: false, // Disabled by default for performance - EnableHistogram: false, // Latency profiling disabled - } -} - -// NewLatencyProfiler creates a new latency profiler -func NewLatencyProfiler(config LatencyProfilerConfig) *LatencyProfiler { - ctx, cancel := context.WithCancel(context.Background()) - logger := logging.GetDefaultLogger().With().Str("component", "latency-profiler").Logger() - - // Validate configuration - if config.MaxMeasurements <= 0 { - config.MaxMeasurements = 10000 - } - if config.SamplingRate < 0.0 || config.SamplingRate > 1.0 { - config.SamplingRate = 0.1 - } - if config.ReportingInterval <= 0 { - config.ReportingInterval = 30 * time.Second - } - - profiler := &LatencyProfiler{ - config: config, - logger: logger, - ctx: ctx, - cancel: cancel, - measurements: make([]DetailedLatencyMeasurement, config.MaxMeasurements), - timeSource: func() int64 { return time.Now().UnixNano() }, - } - - // Initialize peak latencies to zero - atomic.StoreInt64(&profiler.maxWebrtcLatency, 0) - atomic.StoreInt64(&profiler.maxIpcLatency, 0) - atomic.StoreInt64(&profiler.maxCgoLatency, 0) - atomic.StoreInt64(&profiler.maxAlsaLatency, 0) - atomic.StoreInt64(&profiler.maxEndToEndLatency, 0) - - return profiler -} - -// Start begins latency profiling -func (lp *LatencyProfiler) Start() error { - if !atomic.CompareAndSwapInt32(&lp.running, 0, 1) { - return fmt.Errorf("latency profiler already running") - } - - // Enable measurement collection - atomic.StoreInt32(&lp.enabled, 1) - - // Start reporting goroutine - go lp.reportingLoop() - - lp.logger.Info().Float64("sampling_rate", lp.config.SamplingRate).Msg("latency profiler started") - return nil -} - -// Stop stops latency profiling -func (lp *LatencyProfiler) Stop() { - if !atomic.CompareAndSwapInt32(&lp.running, 1, 0) { - return - } - - // Disable measurement collection - atomic.StoreInt32(&lp.enabled, 0) - - // Cancel context to stop reporting - lp.cancel() - - lp.logger.Info().Msg("latency profiler stopped") -} - -// IsEnabled returns whether profiling is currently enabled -func (lp *LatencyProfiler) IsEnabled() bool { - return atomic.LoadInt32(&lp.enabled) == 1 -} - -// StartFrameTracking begins tracking latency for a new audio frame -func (lp *LatencyProfiler) StartFrameTracking(frameID uint64, frameSize int, source string) *FrameLatencyTracker { - if !lp.IsEnabled() { - return nil - } - - // Apply sampling rate to reduce profiling overhead - if lp.config.SamplingRate < 1.0 { - // Simple sampling based on frame ID - if float64(frameID%100)/100.0 > lp.config.SamplingRate { - return nil - } - } - - now := lp.timeSource() - return &FrameLatencyTracker{ - frameID: frameID, - startTime: now, - frameSize: frameSize, - source: source, - } -} - -// TrackWebRTCStart marks the start of WebRTC processing -func (tracker *FrameLatencyTracker) TrackWebRTCStart() { - if tracker != nil { - tracker.webrtcStartTime = time.Now().UnixNano() - } -} - -// TrackIPCStart marks the start of IPC communication -func (tracker *FrameLatencyTracker) TrackIPCStart() { - if tracker != nil { - tracker.ipcStartTime = time.Now().UnixNano() - } -} - -// TrackCGOStart marks the start of CGO processing -func (tracker *FrameLatencyTracker) TrackCGOStart() { - if tracker != nil { - tracker.cgoStartTime = time.Now().UnixNano() - } -} - -// TrackALSAStart marks the start of ALSA device processing -func (tracker *FrameLatencyTracker) TrackALSAStart() { - if tracker != nil { - tracker.alsaStartTime = time.Now().UnixNano() - } -} - -// TrackValidationStart marks the start of frame validation -func (tracker *FrameLatencyTracker) TrackValidationStart() { - if tracker != nil { - tracker.validationStartTime = time.Now().UnixNano() - } -} - -// TrackSerializationStart marks the start of data serialization -func (tracker *FrameLatencyTracker) TrackSerializationStart() { - if tracker != nil { - tracker.serializationStartTime = time.Now().UnixNano() - } -} - -// FinishTracking completes frame tracking and records the measurement -func (lp *LatencyProfiler) FinishTracking(tracker *FrameLatencyTracker) { - if tracker == nil || !lp.IsEnabled() { - return - } - - endTime := lp.timeSource() - - // Calculate component latencies - var webrtcLatency, ipcLatency, cgoLatency, alsaLatency, validationLatency, serializationLatency time.Duration - - if tracker.webrtcStartTime > 0 { - webrtcLatency = time.Duration(tracker.ipcStartTime - tracker.webrtcStartTime) - } - if tracker.ipcStartTime > 0 { - ipcLatency = time.Duration(tracker.cgoStartTime - tracker.ipcStartTime) - } - if tracker.cgoStartTime > 0 { - cgoLatency = time.Duration(tracker.alsaStartTime - tracker.cgoStartTime) - } - if tracker.alsaStartTime > 0 { - alsaLatency = time.Duration(endTime - tracker.alsaStartTime) - } - if tracker.validationStartTime > 0 { - validationLatency = time.Duration(tracker.ipcStartTime - tracker.validationStartTime) - } - if tracker.serializationStartTime > 0 { - serializationLatency = time.Duration(tracker.cgoStartTime - tracker.serializationStartTime) - } - - endToEndLatency := time.Duration(endTime - tracker.startTime) - - // Update atomic counters - atomic.AddInt64(&lp.totalMeasurements, 1) - atomic.AddInt64(&lp.webrtcLatencySum, webrtcLatency.Nanoseconds()) - atomic.AddInt64(&lp.ipcLatencySum, ipcLatency.Nanoseconds()) - atomic.AddInt64(&lp.cgoLatencySum, cgoLatency.Nanoseconds()) - atomic.AddInt64(&lp.alsaLatencySum, alsaLatency.Nanoseconds()) - atomic.AddInt64(&lp.endToEndLatencySum, endToEndLatency.Nanoseconds()) - atomic.AddInt64(&lp.validationLatencySum, validationLatency.Nanoseconds()) - atomic.AddInt64(&lp.serializationLatencySum, serializationLatency.Nanoseconds()) - - // Update peak latencies - lp.updatePeakLatency(&lp.maxWebrtcLatency, webrtcLatency.Nanoseconds()) - lp.updatePeakLatency(&lp.maxIpcLatency, ipcLatency.Nanoseconds()) - lp.updatePeakLatency(&lp.maxCgoLatency, cgoLatency.Nanoseconds()) - lp.updatePeakLatency(&lp.maxAlsaLatency, alsaLatency.Nanoseconds()) - lp.updatePeakLatency(&lp.maxEndToEndLatency, endToEndLatency.Nanoseconds()) - - // Store detailed measurement if enabled - if lp.config.EnableDetailedTrace { - lp.storeMeasurement(DetailedLatencyMeasurement{ - Timestamp: time.Now(), - FrameID: tracker.frameID, - WebRTCLatency: webrtcLatency, - IPCLatency: ipcLatency, - CGOLatency: cgoLatency, - ALSALatency: alsaLatency, - ValidationLatency: validationLatency, - SerializationLatency: serializationLatency, - EndToEndLatency: endToEndLatency, - Source: tracker.source, - FrameSize: tracker.frameSize, - CPUUsage: lp.getCurrentCPUUsage(), - MemoryUsage: lp.getCurrentMemoryUsage(), - }) - } - - // Check for threshold violations - if endToEndLatency > lp.config.ThresholdCritical { - lp.logger.Error().Dur("latency", endToEndLatency).Uint64("frame_id", tracker.frameID). - Str("source", tracker.source).Msg("critical latency threshold exceeded") - } else if endToEndLatency > lp.config.ThresholdWarning { - lp.logger.Warn().Dur("latency", endToEndLatency).Uint64("frame_id", tracker.frameID). - Str("source", tracker.source).Msg("warning latency threshold exceeded") - } -} - -// updatePeakLatency atomically updates peak latency if new value is higher -func (lp *LatencyProfiler) updatePeakLatency(peakPtr *int64, newLatency int64) { - for { - current := atomic.LoadInt64(peakPtr) - if newLatency <= current || atomic.CompareAndSwapInt64(peakPtr, current, newLatency) { - break - } - } -} - -// storeMeasurement stores a detailed measurement in the circular buffer -func (lp *LatencyProfiler) storeMeasurement(measurement DetailedLatencyMeasurement) { - lp.measurementMutex.Lock() - defer lp.measurementMutex.Unlock() - - lp.measurements[lp.measurementIndex] = measurement - lp.measurementIndex = (lp.measurementIndex + 1) % len(lp.measurements) -} - -// GetReport generates a comprehensive latency profiling report -func (lp *LatencyProfiler) GetReport() LatencyProfileReport { - totalMeasurements := atomic.LoadInt64(&lp.totalMeasurements) - if totalMeasurements == 0 { - return LatencyProfileReport{} - } - - // Calculate averages - avgWebRTC := time.Duration(atomic.LoadInt64(&lp.webrtcLatencySum) / totalMeasurements) - avgIPC := time.Duration(atomic.LoadInt64(&lp.ipcLatencySum) / totalMeasurements) - avgCGO := time.Duration(atomic.LoadInt64(&lp.cgoLatencySum) / totalMeasurements) - avgALSA := time.Duration(atomic.LoadInt64(&lp.alsaLatencySum) / totalMeasurements) - avgEndToEnd := time.Duration(atomic.LoadInt64(&lp.endToEndLatencySum) / totalMeasurements) - avgValidation := time.Duration(atomic.LoadInt64(&lp.validationLatencySum) / totalMeasurements) - avgSerialization := time.Duration(atomic.LoadInt64(&lp.serializationLatencySum) / totalMeasurements) - - // Get peak latencies - maxWebRTC := time.Duration(atomic.LoadInt64(&lp.maxWebrtcLatency)) - maxIPC := time.Duration(atomic.LoadInt64(&lp.maxIpcLatency)) - maxCGO := time.Duration(atomic.LoadInt64(&lp.maxCgoLatency)) - maxALSA := time.Duration(atomic.LoadInt64(&lp.maxAlsaLatency)) - maxEndToEnd := time.Duration(atomic.LoadInt64(&lp.maxEndToEndLatency)) - - // Determine bottleneck component - bottleneck := "WebRTC" - maxAvg := avgWebRTC - if avgIPC > maxAvg { - bottleneck = "IPC" - maxAvg = avgIPC - } - if avgCGO > maxAvg { - bottleneck = "CGO" - maxAvg = avgCGO - } - if avgALSA > maxAvg { - bottleneck = "ALSA" - } - - return LatencyProfileReport{ - TotalMeasurements: totalMeasurements, - AvgWebRTCLatency: avgWebRTC, - AvgIPCLatency: avgIPC, - AvgCGOLatency: avgCGO, - AvgALSALatency: avgALSA, - AvgEndToEndLatency: avgEndToEnd, - AvgValidationLatency: avgValidation, - AvgSerializationLatency: avgSerialization, - MaxWebRTCLatency: maxWebRTC, - MaxIPCLatency: maxIPC, - MaxCGOLatency: maxCGO, - MaxALSALatency: maxALSA, - MaxEndToEndLatency: maxEndToEnd, - BottleneckComponent: bottleneck, - } -} - -// reportingLoop periodically logs profiling reports -func (lp *LatencyProfiler) reportingLoop() { - ticker := time.NewTicker(lp.config.ReportingInterval) - defer ticker.Stop() - - for { - select { - case <-lp.ctx.Done(): - return - case <-ticker.C: - report := lp.GetReport() - if report.TotalMeasurements > 0 { - lp.logReport(report) - } - } - } -} - -// logReport logs a comprehensive profiling report -func (lp *LatencyProfiler) logReport(report LatencyProfileReport) { - lp.logger.Info(). - Int64("total_measurements", report.TotalMeasurements). - Dur("avg_webrtc_latency", report.AvgWebRTCLatency). - Dur("avg_ipc_latency", report.AvgIPCLatency). - Dur("avg_cgo_latency", report.AvgCGOLatency). - Dur("avg_alsa_latency", report.AvgALSALatency). - Dur("avg_end_to_end_latency", report.AvgEndToEndLatency). - Dur("avg_validation_latency", report.AvgValidationLatency). - Dur("avg_serialization_latency", report.AvgSerializationLatency). - Dur("max_webrtc_latency", report.MaxWebRTCLatency). - Dur("max_ipc_latency", report.MaxIPCLatency). - Dur("max_cgo_latency", report.MaxCGOLatency). - Dur("max_alsa_latency", report.MaxALSALatency). - Dur("max_end_to_end_latency", report.MaxEndToEndLatency). - Str("bottleneck_component", report.BottleneckComponent). - Msg("latency profiling report") -} - -// getCurrentCPUUsage returns current CPU usage percentage -func (lp *LatencyProfiler) getCurrentCPUUsage() float64 { - // Simplified CPU usage - in production, this would use more sophisticated monitoring - var m runtime.MemStats - runtime.ReadMemStats(&m) - return float64(runtime.NumGoroutine()) / 100.0 // Rough approximation -} - -// getCurrentMemoryUsage returns current memory usage in bytes -func (lp *LatencyProfiler) getCurrentMemoryUsage() uint64 { - var m runtime.MemStats - runtime.ReadMemStats(&m) - return m.Alloc -} - -// GetGlobalLatencyProfiler returns the global latency profiler instance -func GetGlobalLatencyProfiler() *LatencyProfiler { - ptr := atomic.LoadPointer(&globalLatencyProfiler) - if ptr != nil { - return (*LatencyProfiler)(ptr) - } - - // Initialize on first use - if atomic.CompareAndSwapInt32(&profilerInitialized, 0, 1) { - config := DefaultLatencyProfilerConfig() - profiler := NewLatencyProfiler(config) - atomic.StorePointer(&globalLatencyProfiler, unsafe.Pointer(profiler)) - return profiler - } - - // Another goroutine initialized it, try again - ptr = atomic.LoadPointer(&globalLatencyProfiler) - if ptr != nil { - return (*LatencyProfiler)(ptr) - } - - // Fallback: create a new profiler - config := DefaultLatencyProfilerConfig() - return NewLatencyProfiler(config) -} - -// EnableLatencyProfiling enables the global latency profiler -func EnableLatencyProfiling() error { - // Latency profiling disabled - if true { - return fmt.Errorf("latency profiling is disabled in configuration") - } - profiler := GetGlobalLatencyProfiler() - return profiler.Start() -} - -// DisableLatencyProfiling disables the global latency profiler -func DisableLatencyProfiling() { - ptr := atomic.LoadPointer(&globalLatencyProfiler) - if ptr != nil { - profiler := (*LatencyProfiler)(ptr) - profiler.Stop() - } -} - -// ProfileFrameLatency is a convenience function to profile a single frame's latency -func ProfileFrameLatency(frameID uint64, frameSize int, source string, fn func(*FrameLatencyTracker)) { - // Latency profiling disabled - if true { - fn(nil) - return - } - - profiler := GetGlobalLatencyProfiler() - if !profiler.IsEnabled() { - fn(nil) - return - } - - tracker := profiler.StartFrameTracking(frameID, frameSize, source) - defer profiler.FinishTracking(tracker) - fn(tracker) -} diff --git a/internal/audio/memory_metrics.go b/internal/audio/memory_metrics.go deleted file mode 100644 index 88feb38e..00000000 --- a/internal/audio/memory_metrics.go +++ /dev/null @@ -1,201 +0,0 @@ -package audio - -import ( - "encoding/json" - "net/http" - "runtime" - "time" - - "github.com/jetkvm/kvm/internal/logging" - "github.com/rs/zerolog" -) - -// MemoryMetrics provides comprehensive memory allocation statistics -type MemoryMetrics struct { - // Runtime memory statistics - RuntimeStats RuntimeMemoryStats `json:"runtime_stats"` - // Audio buffer pool statistics - BufferPools AudioBufferPoolStats `json:"buffer_pools"` - // Zero-copy frame pool statistics - ZeroCopyPool ZeroCopyFramePoolStats `json:"zero_copy_pool"` - // Message pool statistics - MessagePool MessagePoolStats `json:"message_pool"` - // Batch processor statistics - BatchProcessor BatchProcessorMemoryStats `json:"batch_processor,omitempty"` - // Collection timestamp - Timestamp time.Time `json:"timestamp"` -} - -// RuntimeMemoryStats provides Go runtime memory statistics -type RuntimeMemoryStats struct { - Alloc uint64 `json:"alloc"` // Bytes allocated and not yet freed - TotalAlloc uint64 `json:"total_alloc"` // Total bytes allocated (cumulative) - Sys uint64 `json:"sys"` // Total bytes obtained from OS - Lookups uint64 `json:"lookups"` // Number of pointer lookups - Mallocs uint64 `json:"mallocs"` // Number of mallocs - Frees uint64 `json:"frees"` // Number of frees - HeapAlloc uint64 `json:"heap_alloc"` // Bytes allocated and not yet freed (heap) - HeapSys uint64 `json:"heap_sys"` // Bytes obtained from OS for heap - HeapIdle uint64 `json:"heap_idle"` // Bytes in idle spans - HeapInuse uint64 `json:"heap_inuse"` // Bytes in non-idle spans - HeapReleased uint64 `json:"heap_released"` // Bytes released to OS - HeapObjects uint64 `json:"heap_objects"` // Total number of allocated objects - StackInuse uint64 `json:"stack_inuse"` // Bytes used by stack spans - StackSys uint64 `json:"stack_sys"` // Bytes obtained from OS for stack - MSpanInuse uint64 `json:"mspan_inuse"` // Bytes used by mspan structures - MSpanSys uint64 `json:"mspan_sys"` // Bytes obtained from OS for mspan - MCacheInuse uint64 `json:"mcache_inuse"` // Bytes used by mcache structures - MCacheSys uint64 `json:"mcache_sys"` // Bytes obtained from OS for mcache - BuckHashSys uint64 `json:"buck_hash_sys"` // Bytes used by profiling bucket hash table - GCSys uint64 `json:"gc_sys"` // Bytes used for garbage collection metadata - OtherSys uint64 `json:"other_sys"` // Bytes used for other system allocations - NextGC uint64 `json:"next_gc"` // Target heap size for next GC - LastGC uint64 `json:"last_gc"` // Time of last GC (nanoseconds since epoch) - PauseTotalNs uint64 `json:"pause_total_ns"` // Total GC pause time - NumGC uint32 `json:"num_gc"` // Number of completed GC cycles - NumForcedGC uint32 `json:"num_forced_gc"` // Number of forced GC cycles - GCCPUFraction float64 `json:"gc_cpu_fraction"` // Fraction of CPU time used by GC -} - -// BatchProcessorMemoryStats provides batch processor memory statistics -type BatchProcessorMemoryStats struct { - Initialized bool `json:"initialized"` - Running bool `json:"running"` - Stats BatchAudioStats `json:"stats"` - BufferPool AudioBufferPoolDetailedStats `json:"buffer_pool,omitempty"` -} - -// GetBatchAudioProcessor is defined in batch_audio.go -// BatchAudioStats is defined in batch_audio.go - -var memoryMetricsLogger *zerolog.Logger - -func getMemoryMetricsLogger() *zerolog.Logger { - if memoryMetricsLogger == nil { - logger := logging.GetDefaultLogger().With().Str("component", "memory-metrics").Logger() - memoryMetricsLogger = &logger - } - return memoryMetricsLogger -} - -// CollectMemoryMetrics gathers comprehensive memory allocation statistics -func CollectMemoryMetrics() MemoryMetrics { - // Collect runtime memory statistics - var m runtime.MemStats - runtime.ReadMemStats(&m) - - runtimeStats := RuntimeMemoryStats{ - Alloc: m.Alloc, - TotalAlloc: m.TotalAlloc, - Sys: m.Sys, - Lookups: m.Lookups, - Mallocs: m.Mallocs, - Frees: m.Frees, - HeapAlloc: m.HeapAlloc, - HeapSys: m.HeapSys, - HeapIdle: m.HeapIdle, - HeapInuse: m.HeapInuse, - HeapReleased: m.HeapReleased, - HeapObjects: m.HeapObjects, - StackInuse: m.StackInuse, - StackSys: m.StackSys, - MSpanInuse: m.MSpanInuse, - MSpanSys: m.MSpanSys, - MCacheInuse: m.MCacheInuse, - MCacheSys: m.MCacheSys, - BuckHashSys: m.BuckHashSys, - GCSys: m.GCSys, - OtherSys: m.OtherSys, - NextGC: m.NextGC, - LastGC: m.LastGC, - PauseTotalNs: m.PauseTotalNs, - NumGC: m.NumGC, - NumForcedGC: m.NumForcedGC, - GCCPUFraction: m.GCCPUFraction, - } - - // Collect audio buffer pool statistics - bufferPoolStats := GetAudioBufferPoolStats() - - // Collect zero-copy frame pool statistics - zeroCopyStats := GetGlobalZeroCopyPoolStats() - - // Collect message pool statistics - messagePoolStats := GetGlobalMessagePoolStats() - - // Collect batch processor statistics if available - var batchStats BatchProcessorMemoryStats - if processor := GetBatchAudioProcessor(); processor != nil { - batchStats.Initialized = true - batchStats.Running = processor.IsRunning() - batchStats.Stats = processor.GetStats() - // Note: BatchAudioProcessor uses sync.Pool, detailed stats not available - } - - return MemoryMetrics{ - RuntimeStats: runtimeStats, - BufferPools: bufferPoolStats, - ZeroCopyPool: zeroCopyStats, - MessagePool: messagePoolStats, - BatchProcessor: batchStats, - Timestamp: time.Now(), - } -} - -// HandleMemoryMetrics provides an HTTP handler for memory metrics -func HandleMemoryMetrics(w http.ResponseWriter, r *http.Request) { - logger := getMemoryMetricsLogger() - - if r.Method != http.MethodGet { - http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) - return - } - - metrics := CollectMemoryMetrics() - - w.Header().Set("Content-Type", "application/json") - w.Header().Set("Cache-Control", "no-cache") - - encoder := json.NewEncoder(w) - encoder.SetIndent("", " ") - - if err := encoder.Encode(metrics); err != nil { - logger.Error().Err(err).Msg("failed to encode memory metrics") - http.Error(w, "Internal server error", http.StatusInternalServerError) - return - } - - logger.Debug().Msg("memory metrics served") -} - -// LogMemoryMetrics logs current memory metrics for debugging -func LogMemoryMetrics() { - logger := getMemoryMetricsLogger() - metrics := CollectMemoryMetrics() - - logger.Info(). - Uint64("heap_alloc_mb", metrics.RuntimeStats.HeapAlloc/uint64(GetConfig().BytesToMBDivisor)). - Uint64("heap_sys_mb", metrics.RuntimeStats.HeapSys/uint64(GetConfig().BytesToMBDivisor)). - Uint64("heap_objects", metrics.RuntimeStats.HeapObjects). - Uint32("num_gc", metrics.RuntimeStats.NumGC). - Float64("gc_cpu_fraction", metrics.RuntimeStats.GCCPUFraction). - Float64("buffer_pool_hit_rate", metrics.BufferPools.FramePoolHitRate). - Float64("zero_copy_hit_rate", metrics.ZeroCopyPool.HitRate). - Float64("message_pool_hit_rate", metrics.MessagePool.HitRate). - Msg("memory metrics snapshot") -} - -// StartMemoryMetricsLogging starts periodic memory metrics logging -func StartMemoryMetricsLogging(interval time.Duration) { - logger := getMemoryMetricsLogger() - logger.Debug().Dur("interval", interval).Msg("memory metrics logging started") - - go func() { - ticker := time.NewTicker(interval) - defer ticker.Stop() - - for range ticker.C { - LogMemoryMetrics() - } - }() -} diff --git a/internal/audio/metrics.go b/internal/audio/metrics.go index d3597f45..4247b5cd 100644 --- a/internal/audio/metrics.go +++ b/internal/audio/metrics.go @@ -288,45 +288,7 @@ var ( ) // Device health metrics - deviceHealthStatus = promauto.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "jetkvm_audio_device_health_status", - Help: "Current device health status (0=Healthy, 1=Degraded, 2=Failing, 3=Critical)", - }, - []string{"device_type"}, // device_type: capture, playback - ) - - deviceHealthScore = promauto.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "jetkvm_audio_device_health_score", - Help: "Device health score (0.0-1.0, higher is better)", - }, - []string{"device_type"}, // device_type: capture, playback - ) - - deviceConsecutiveErrors = promauto.NewGaugeVec( - prometheus.GaugeOpts{ - Name: "jetkvm_audio_device_consecutive_errors", - Help: "Number of consecutive errors for device", - }, - []string{"device_type"}, // device_type: capture, playback - ) - - deviceTotalErrors = promauto.NewCounterVec( - prometheus.CounterOpts{ - Name: "jetkvm_audio_device_total_errors", - Help: "Total number of errors for device", - }, - []string{"device_type"}, // device_type: capture, playback - ) - - deviceLatencySpikes = promauto.NewCounterVec( - prometheus.CounterOpts{ - Name: "jetkvm_audio_device_latency_spikes_total", - Help: "Total number of latency spikes for device", - }, - []string{"device_type"}, // device_type: capture, playback - ) + // Removed device health metrics - functionality not used // Memory metrics memoryHeapAllocBytes = promauto.NewGauge( @@ -436,11 +398,7 @@ var ( micBytesProcessedValue int64 micConnectionDropsValue int64 - // Atomic counters for device health metrics - deviceCaptureErrorsValue int64 - devicePlaybackErrorsValue int64 - deviceCaptureSpikesValue int64 - devicePlaybackSpikesValue int64 + // Atomic counters for device health metrics - functionality removed, no longer used // Atomic counter for memory GC memoryGCCountValue uint32 @@ -639,34 +597,8 @@ func UpdateSocketBufferMetrics(component, bufferType string, size, utilization f atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix()) } -// UpdateDeviceHealthMetrics updates device health metrics -func UpdateDeviceHealthMetrics(deviceType string, status int, healthScore float64, consecutiveErrors, totalErrors, latencySpikes int64) { - metricsUpdateMutex.Lock() - defer metricsUpdateMutex.Unlock() - - deviceHealthStatus.WithLabelValues(deviceType).Set(float64(status)) - deviceHealthScore.WithLabelValues(deviceType).Set(healthScore) - deviceConsecutiveErrors.WithLabelValues(deviceType).Set(float64(consecutiveErrors)) - - // Update error counters with delta calculation - var prevErrors, prevSpikes int64 - if deviceType == "capture" { - prevErrors = atomic.SwapInt64(&deviceCaptureErrorsValue, totalErrors) - prevSpikes = atomic.SwapInt64(&deviceCaptureSpikesValue, latencySpikes) - } else { - prevErrors = atomic.SwapInt64(&devicePlaybackErrorsValue, totalErrors) - prevSpikes = atomic.SwapInt64(&devicePlaybackSpikesValue, latencySpikes) - } - - if prevErrors > 0 && totalErrors > prevErrors { - deviceTotalErrors.WithLabelValues(deviceType).Add(float64(totalErrors - prevErrors)) - } - if prevSpikes > 0 && latencySpikes > prevSpikes { - deviceLatencySpikes.WithLabelValues(deviceType).Add(float64(latencySpikes - prevSpikes)) - } - - atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix()) -} +// UpdateDeviceHealthMetrics - Device health monitoring functionality has been removed +// This function is no longer used as device health monitoring is not implemented // UpdateMemoryMetrics updates memory metrics func UpdateMemoryMetrics() { diff --git a/internal/audio/zero_copy.go b/internal/audio/zero_copy.go index ce066a17..924d895f 100644 --- a/internal/audio/zero_copy.go +++ b/internal/audio/zero_copy.go @@ -3,7 +3,6 @@ package audio import ( "sync" "sync/atomic" - "time" "unsafe" ) @@ -188,13 +187,6 @@ func (p *ZeroCopyFramePool) Get() *ZeroCopyAudioFrame { // Put returns a zero-copy frame to the pool func (p *ZeroCopyFramePool) Put(frame *ZeroCopyAudioFrame) { - // Metrics collection removed - var startTime time.Time - trackMetrics := false // Metrics disabled - if false { - startTime = time.Now() - } - if frame == nil || !frame.pooled { return } @@ -235,11 +227,7 @@ func (p *ZeroCopyFramePool) Put(frame *ZeroCopyAudioFrame) { frame.mutex.Unlock() } - // Record metrics only for sampled operations - if trackMetrics { - latency := time.Since(startTime) - GetGranularMetricsCollector().RecordZeroCopyPut(latency, frame.capacity) - } + // Metrics recording removed - granular metrics collector was unused } // Data returns the frame data as a slice (zero-copy view)