[WIP] Cleanup: reduce PR complexity

Alex P 2025-09-03 23:13:36 +00:00
parent 476a245598
commit b497444d6d
7 changed files with 7 additions and 1643 deletions

View File

@@ -1483,36 +1483,7 @@ type AudioConfigConstants struct {
// Default 512 bytes accommodates typical encoding variations.
FrameSizeTolerance int
// Device Health Monitoring Configuration
// Used in: device_health.go for proactive device monitoring and recovery
// Impact: Controls health check frequency and recovery thresholds
// HealthCheckIntervalMS defines interval between device health checks in milliseconds.
// Used in: DeviceHealthMonitor for periodic health assessment
// Impact: Lower values provide faster detection but increase CPU usage.
// Default 5000ms (5s) provides good balance between responsiveness and overhead.
HealthCheckIntervalMS int
// HealthRecoveryThreshold defines number of consecutive successful operations
// required to mark a device as healthy after being unhealthy.
// Used in: DeviceHealthMonitor for recovery state management
// Impact: Higher values prevent premature recovery declarations.
// Default 3 consecutive successes ensures stable recovery.
HealthRecoveryThreshold int
// HealthLatencyThresholdMS defines maximum acceptable latency in milliseconds
// before considering a device unhealthy.
// Used in: DeviceHealthMonitor for latency-based health assessment
// Impact: Lower values trigger recovery sooner but may cause false positives.
// Default 100ms provides reasonable threshold for real-time audio.
HealthLatencyThresholdMS int
// HealthErrorRateLimit defines maximum error rate (0.0-1.0) before
// considering a device unhealthy.
// Used in: DeviceHealthMonitor for error rate assessment
// Impact: Lower values trigger recovery sooner for error-prone devices.
// Default 0.1 (10%) allows some transient errors while detecting problems.
HealthErrorRateLimit float64
// Removed device health monitoring configuration - functionality not used
// Latency Histogram Bucket Configuration
// Used in: LatencyHistogram for granular latency measurement buckets
@@ -2450,11 +2421,7 @@ func DefaultAudioConfig() *AudioConfigConstants {
MinFrameSize: 1, // 1 byte minimum frame size (allow small frames)
FrameSizeTolerance: 512, // 512 bytes frame size tolerance
// Device Health Monitoring Configuration
HealthCheckIntervalMS: 5000, // 5000ms (5s) health check interval
HealthRecoveryThreshold: 3, // 3 consecutive successes for recovery
HealthLatencyThresholdMS: 100, // 100ms latency threshold for health
HealthErrorRateLimit: 0.1, // 10% error rate limit for health
// Removed device health monitoring configuration - functionality not used
// Latency Histogram Bucket Configuration
LatencyBucket10ms: 10 * time.Millisecond, // 10ms latency bucket

View File

@@ -1,514 +0,0 @@
package audio
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/jetkvm/kvm/internal/logging"
"github.com/rs/zerolog"
)
// DeviceHealthStatus represents the health status of an audio device
type DeviceHealthStatus int
const (
DeviceHealthUnknown DeviceHealthStatus = iota
DeviceHealthHealthy
DeviceHealthDegraded
DeviceHealthFailing
DeviceHealthCritical
)
func (s DeviceHealthStatus) String() string {
switch s {
case DeviceHealthHealthy:
return "healthy"
case DeviceHealthDegraded:
return "degraded"
case DeviceHealthFailing:
return "failing"
case DeviceHealthCritical:
return "critical"
default:
return "unknown"
}
}
// DeviceHealthMetrics tracks health-related metrics for audio devices
type DeviceHealthMetrics struct {
// Error tracking
ConsecutiveErrors int64 `json:"consecutive_errors"`
TotalErrors int64 `json:"total_errors"`
LastErrorTime time.Time `json:"last_error_time"`
ErrorRate float64 `json:"error_rate"` // errors per minute
// Performance metrics
AverageLatency time.Duration `json:"average_latency"`
MaxLatency time.Duration `json:"max_latency"`
LatencySpikes int64 `json:"latency_spikes"`
Underruns int64 `json:"underruns"`
Overruns int64 `json:"overruns"`
// Device availability
LastSuccessfulOp time.Time `json:"last_successful_op"`
DeviceDisconnects int64 `json:"device_disconnects"`
RecoveryAttempts int64 `json:"recovery_attempts"`
SuccessfulRecoveries int64 `json:"successful_recoveries"`
// Health assessment
CurrentStatus DeviceHealthStatus `json:"current_status"`
StatusLastChanged time.Time `json:"status_last_changed"`
HealthScore float64 `json:"health_score"` // 0.0 to 1.0
}
// DeviceHealthMonitor monitors the health of audio devices and triggers recovery
type DeviceHealthMonitor struct {
// Atomic fields first for ARM32 alignment
running int32
monitoringEnabled int32
// Configuration
checkInterval time.Duration
recoveryThreshold int
latencyThreshold time.Duration
errorRateLimit float64 // max errors per minute
// State tracking
captureMetrics *DeviceHealthMetrics
playbackMetrics *DeviceHealthMetrics
mutex sync.RWMutex
// Control channels
ctx context.Context
cancel context.CancelFunc
stopChan chan struct{}
doneChan chan struct{}
// Recovery callbacks
recoveryCallbacks map[string]func() error
callbackMutex sync.RWMutex
// Logging
logger zerolog.Logger
config *AudioConfigConstants
}
// NewDeviceHealthMonitor creates a new device health monitor
func NewDeviceHealthMonitor() *DeviceHealthMonitor {
ctx, cancel := context.WithCancel(context.Background())
config := GetConfig()
return &DeviceHealthMonitor{
checkInterval: time.Duration(config.HealthCheckIntervalMS) * time.Millisecond,
recoveryThreshold: config.HealthRecoveryThreshold,
latencyThreshold: time.Duration(config.HealthLatencyThresholdMS) * time.Millisecond,
errorRateLimit: config.HealthErrorRateLimit,
captureMetrics: &DeviceHealthMetrics{
CurrentStatus: DeviceHealthUnknown,
HealthScore: 1.0,
},
playbackMetrics: &DeviceHealthMetrics{
CurrentStatus: DeviceHealthUnknown,
HealthScore: 1.0,
},
ctx: ctx,
cancel: cancel,
stopChan: make(chan struct{}),
doneChan: make(chan struct{}),
recoveryCallbacks: make(map[string]func() error),
logger: logging.GetDefaultLogger().With().Str("component", "device-health-monitor").Logger(),
config: config,
}
}
// Start begins health monitoring
func (dhm *DeviceHealthMonitor) Start() error {
if !atomic.CompareAndSwapInt32(&dhm.running, 0, 1) {
return fmt.Errorf("device health monitor already running")
}
dhm.logger.Debug().Msg("device health monitor starting")
atomic.StoreInt32(&dhm.monitoringEnabled, 1)
go dhm.monitoringLoop()
return nil
}
// Stop stops health monitoring
func (dhm *DeviceHealthMonitor) Stop() {
if !atomic.CompareAndSwapInt32(&dhm.running, 1, 0) {
return
}
dhm.logger.Debug().Msg("device health monitor stopping")
atomic.StoreInt32(&dhm.monitoringEnabled, 0)
close(dhm.stopChan)
dhm.cancel()
// Wait for monitoring loop to finish
select {
case <-dhm.doneChan:
dhm.logger.Debug().Msg("device health monitor stopped")
case <-time.After(time.Duration(dhm.config.SupervisorTimeout)):
dhm.logger.Warn().Msg("device health monitor stop timeout")
}
}
// RegisterRecoveryCallback registers a recovery function for a specific component
func (dhm *DeviceHealthMonitor) RegisterRecoveryCallback(component string, callback func() error) {
dhm.callbackMutex.Lock()
defer dhm.callbackMutex.Unlock()
dhm.recoveryCallbacks[component] = callback
dhm.logger.Debug().Str("component", component).Msg("registered recovery callback")
}
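For reference, a minimal sketch of how a caller was expected to wire up this removed API; restartCaptureDevice is a hypothetical restart hook, not a function in this codebase:
monitor := GetDeviceHealthMonitor()
monitor.RegisterRecoveryCallback("alsa-capture", func() error {
	return restartCaptureDevice() // hypothetical component restart hook
})
if err := monitor.Start(); err == nil {
	defer monitor.Stop()
}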
// RecordError records an error for health tracking
func (dhm *DeviceHealthMonitor) RecordError(deviceType string, err error) {
if atomic.LoadInt32(&dhm.monitoringEnabled) == 0 {
return
}
dhm.mutex.Lock()
defer dhm.mutex.Unlock()
var metrics *DeviceHealthMetrics
switch deviceType {
case "capture":
metrics = dhm.captureMetrics
case "playback":
metrics = dhm.playbackMetrics
default:
dhm.logger.Warn().Str("device_type", deviceType).Msg("unknown device type for error recording")
return
}
atomic.AddInt64(&metrics.ConsecutiveErrors, 1)
atomic.AddInt64(&metrics.TotalErrors, 1)
// Update error rate (errors per minute) over the window since the
// previous error. Compute before overwriting LastErrorTime; measuring
// against the timestamp we just wrote yields a ~0 window and a
// nonsensical rate.
if !metrics.LastErrorTime.IsZero() {
window := time.Since(metrics.LastErrorTime)
if window > 0 {
metrics.ErrorRate = float64(metrics.TotalErrors) / window.Minutes()
}
}
metrics.LastErrorTime = time.Now()
dhm.logger.Debug().
Str("device_type", deviceType).
Err(err).
Int64("consecutive_errors", metrics.ConsecutiveErrors).
Float64("error_rate", metrics.ErrorRate).
Msg("recorded device error")
// Trigger immediate health assessment
dhm.assessDeviceHealth(deviceType, metrics)
}
// RecordSuccess records a successful operation
func (dhm *DeviceHealthMonitor) RecordSuccess(deviceType string) {
if atomic.LoadInt32(&dhm.monitoringEnabled) == 0 {
return
}
dhm.mutex.Lock()
defer dhm.mutex.Unlock()
var metrics *DeviceHealthMetrics
switch deviceType {
case "capture":
metrics = dhm.captureMetrics
case "playback":
metrics = dhm.playbackMetrics
default:
return
}
// Reset consecutive errors on success
atomic.StoreInt64(&metrics.ConsecutiveErrors, 0)
metrics.LastSuccessfulOp = time.Now()
// Improve health score gradually
if metrics.HealthScore < 1.0 {
metrics.HealthScore = min(1.0, metrics.HealthScore+0.1)
}
}
// RecordLatency records operation latency for health assessment
func (dhm *DeviceHealthMonitor) RecordLatency(deviceType string, latency time.Duration) {
if atomic.LoadInt32(&dhm.monitoringEnabled) == 0 {
return
}
dhm.mutex.Lock()
defer dhm.mutex.Unlock()
var metrics *DeviceHealthMetrics
switch deviceType {
case "capture":
metrics = dhm.captureMetrics
case "playback":
metrics = dhm.playbackMetrics
default:
return
}
// Update latency metrics
if metrics.AverageLatency == 0 {
metrics.AverageLatency = latency
} else {
// Exponential moving average
metrics.AverageLatency = time.Duration(float64(metrics.AverageLatency)*0.9 + float64(latency)*0.1)
}
if latency > metrics.MaxLatency {
metrics.MaxLatency = latency
}
// Track latency spikes
if latency > dhm.latencyThreshold {
atomic.AddInt64(&metrics.LatencySpikes, 1)
}
}
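The 0.9/0.1 split above is an exponential moving average: each sample moves the running value 10% toward itself, so a step change decays geometrically. A quick sketch of the convergence:
avg := 10 * time.Millisecond
for i := 0; i < 3; i++ {
	sample := 50 * time.Millisecond
	avg = time.Duration(float64(avg)*0.9 + float64(sample)*0.1)
}
// avg steps 10ms -> 14ms -> 17.6ms -> 20.84ms toward 50ms.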
// RecordUnderrun records an audio underrun event
func (dhm *DeviceHealthMonitor) RecordUnderrun(deviceType string) {
if atomic.LoadInt32(&dhm.monitoringEnabled) == 0 {
return
}
dhm.mutex.Lock()
defer dhm.mutex.Unlock()
var metrics *DeviceHealthMetrics
switch deviceType {
case "capture":
metrics = dhm.captureMetrics
case "playback":
metrics = dhm.playbackMetrics
default:
return
}
atomic.AddInt64(&metrics.Underruns, 1)
dhm.logger.Debug().Str("device_type", deviceType).Msg("recorded audio underrun")
}
// RecordOverrun records an audio overrun event
func (dhm *DeviceHealthMonitor) RecordOverrun(deviceType string) {
if atomic.LoadInt32(&dhm.monitoringEnabled) == 0 {
return
}
dhm.mutex.Lock()
defer dhm.mutex.Unlock()
var metrics *DeviceHealthMetrics
switch deviceType {
case "capture":
metrics = dhm.captureMetrics
case "playback":
metrics = dhm.playbackMetrics
default:
return
}
atomic.AddInt64(&metrics.Overruns, 1)
dhm.logger.Debug().Str("device_type", deviceType).Msg("recorded audio overrun")
}
// GetHealthMetrics returns current health metrics
func (dhm *DeviceHealthMonitor) GetHealthMetrics() (capture, playback DeviceHealthMetrics) {
dhm.mutex.RLock()
defer dhm.mutex.RUnlock()
return *dhm.captureMetrics, *dhm.playbackMetrics
}
// monitoringLoop runs the main health monitoring loop
func (dhm *DeviceHealthMonitor) monitoringLoop() {
defer close(dhm.doneChan)
ticker := time.NewTicker(dhm.checkInterval)
defer ticker.Stop()
for {
select {
case <-dhm.stopChan:
return
case <-dhm.ctx.Done():
return
case <-ticker.C:
dhm.performHealthCheck()
}
}
}
// performHealthCheck performs a comprehensive health check
func (dhm *DeviceHealthMonitor) performHealthCheck() {
dhm.mutex.Lock()
defer dhm.mutex.Unlock()
// Assess health for both devices
dhm.assessDeviceHealth("capture", dhm.captureMetrics)
dhm.assessDeviceHealth("playback", dhm.playbackMetrics)
// Check if recovery is needed
dhm.checkRecoveryNeeded("capture", dhm.captureMetrics)
dhm.checkRecoveryNeeded("playback", dhm.playbackMetrics)
}
// assessDeviceHealth assesses the health status of a device
func (dhm *DeviceHealthMonitor) assessDeviceHealth(deviceType string, metrics *DeviceHealthMetrics) {
previousStatus := metrics.CurrentStatus
newStatus := dhm.calculateHealthStatus(metrics)
if newStatus != previousStatus {
metrics.CurrentStatus = newStatus
metrics.StatusLastChanged = time.Now()
dhm.logger.Info().
Str("device_type", deviceType).
Str("previous_status", previousStatus.String()).
Str("new_status", newStatus.String()).
Float64("health_score", metrics.HealthScore).
Msg("device health status changed")
}
// Update health score
metrics.HealthScore = dhm.calculateHealthScore(metrics)
}
// calculateHealthStatus determines health status based on metrics
func (dhm *DeviceHealthMonitor) calculateHealthStatus(metrics *DeviceHealthMetrics) DeviceHealthStatus {
consecutiveErrors := atomic.LoadInt64(&metrics.ConsecutiveErrors)
totalErrors := atomic.LoadInt64(&metrics.TotalErrors)
// Critical: too many consecutive errors
if consecutiveErrors >= int64(dhm.recoveryThreshold) {
return DeviceHealthCritical
}
// Critical: No successful operations in a long time
if !metrics.LastSuccessfulOp.IsZero() && time.Since(metrics.LastSuccessfulOp) > time.Duration(dhm.config.SupervisorTimeout) {
return DeviceHealthCritical
}
// Failing: High error rate or frequent latency spikes
if metrics.ErrorRate > dhm.errorRateLimit || atomic.LoadInt64(&metrics.LatencySpikes) > int64(dhm.config.MaxDroppedFrames) {
return DeviceHealthFailing
}
// Degraded: Some errors or performance issues
if consecutiveErrors > 0 || totalErrors > int64(dhm.config.MaxDroppedFrames/2) || metrics.AverageLatency > dhm.latencyThreshold {
return DeviceHealthDegraded
}
// Healthy: No significant issues
return DeviceHealthHealthy
}
// calculateHealthScore calculates a numeric health score (0.0 to 1.0)
func (dhm *DeviceHealthMonitor) calculateHealthScore(metrics *DeviceHealthMetrics) float64 {
score := 1.0
// Penalize consecutive errors
consecutiveErrors := atomic.LoadInt64(&metrics.ConsecutiveErrors)
if consecutiveErrors > 0 {
score -= float64(consecutiveErrors) * 0.1
}
// Penalize high error rate
if metrics.ErrorRate > 0 {
score -= min(0.5, metrics.ErrorRate/dhm.errorRateLimit*0.5)
}
// Penalize high latency
if metrics.AverageLatency > dhm.latencyThreshold {
excess := float64(metrics.AverageLatency-dhm.latencyThreshold) / float64(dhm.latencyThreshold)
score -= min(0.3, excess*0.3)
}
// Penalize underruns/overruns
underruns := atomic.LoadInt64(&metrics.Underruns)
overruns := atomic.LoadInt64(&metrics.Overruns)
if underruns+overruns > 0 {
score -= min(0.2, float64(underruns+overruns)*0.01)
}
return max(0.0, score)
}
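A worked example of the scoring, with illustrative inputs:
// 2 consecutive errors           -> -0.2  (2 * 0.1)
// error rate at half the limit   -> -0.25 (0.5 * 0.5)
// average latency 1.5x threshold -> -0.15 (0.5 excess * 0.3)
// 5 underruns                    -> -0.05 (5 * 0.01)
// score = 1.0 - 0.2 - 0.25 - 0.15 - 0.05 = 0.35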
// checkRecoveryNeeded checks if recovery is needed and triggers it
func (dhm *DeviceHealthMonitor) checkRecoveryNeeded(deviceType string, metrics *DeviceHealthMetrics) {
if metrics.CurrentStatus == DeviceHealthCritical {
dhm.triggerRecovery(deviceType, metrics)
}
}
// triggerRecovery triggers recovery for a device
func (dhm *DeviceHealthMonitor) triggerRecovery(deviceType string, metrics *DeviceHealthMetrics) {
atomic.AddInt64(&metrics.RecoveryAttempts, 1)
dhm.logger.Warn().
Str("device_type", deviceType).
Str("status", metrics.CurrentStatus.String()).
Int64("consecutive_errors", atomic.LoadInt64(&metrics.ConsecutiveErrors)).
Float64("error_rate", metrics.ErrorRate).
Msg("triggering device recovery")
// Try registered recovery callbacks
dhm.callbackMutex.RLock()
defer dhm.callbackMutex.RUnlock()
for component, callback := range dhm.recoveryCallbacks {
if callback != nil {
go func(comp string, cb func() error) {
if err := cb(); err != nil {
dhm.logger.Error().
Str("component", comp).
Str("device_type", deviceType).
Err(err).
Msg("recovery callback failed")
} else {
atomic.AddInt64(&metrics.SuccessfulRecoveries, 1)
dhm.logger.Info().
Str("component", comp).
Str("device_type", deviceType).
Msg("recovery callback succeeded")
}
}(component, callback)
}
}
}
// Global device health monitor instance
var (
globalDeviceHealthMonitor *DeviceHealthMonitor
deviceHealthOnce sync.Once
)
// GetDeviceHealthMonitor returns the global device health monitor
func GetDeviceHealthMonitor() *DeviceHealthMonitor {
deviceHealthOnce.Do(func() {
globalDeviceHealthMonitor = NewDeviceHealthMonitor()
})
return globalDeviceHealthMonitor
}
// Helper functions for min/max (Go 1.21+ provides equivalent builtins)
func min(a, b float64) float64 {
if a < b {
return a
}
return b
}
func max(a, b float64) float64 {
if a > b {
return a
}
return b
}

View File

@@ -1,263 +0,0 @@
package audio
import (
"sync"
"sync/atomic"
"time"
"github.com/jetkvm/kvm/internal/logging"
"github.com/rs/zerolog"
)
// LatencyPercentiles holds calculated percentile values
type LatencyPercentiles struct {
P50 time.Duration `json:"p50"`
P95 time.Duration `json:"p95"`
P99 time.Duration `json:"p99"`
Min time.Duration `json:"min"`
Max time.Duration `json:"max"`
Avg time.Duration `json:"avg"`
}
// BufferPoolEfficiencyMetrics tracks detailed buffer pool performance
type BufferPoolEfficiencyMetrics struct {
// Pool utilization metrics
HitRate float64 `json:"hit_rate"`
MissRate float64 `json:"miss_rate"`
UtilizationRate float64 `json:"utilization_rate"`
FragmentationRate float64 `json:"fragmentation_rate"`
// Memory efficiency metrics
MemoryEfficiency float64 `json:"memory_efficiency"`
AllocationOverhead float64 `json:"allocation_overhead"`
ReuseEffectiveness float64 `json:"reuse_effectiveness"`
// Performance metrics
AverageGetLatency time.Duration `json:"average_get_latency"`
AveragePutLatency time.Duration `json:"average_put_latency"`
Throughput float64 `json:"throughput"` // Operations per second
}
// GranularMetricsCollector aggregates all granular metrics
type GranularMetricsCollector struct {
// Buffer pool efficiency tracking
framePoolMetrics *BufferPoolEfficiencyTracker
controlPoolMetrics *BufferPoolEfficiencyTracker
zeroCopyMetrics *BufferPoolEfficiencyTracker
mutex sync.RWMutex
logger zerolog.Logger
}
// BufferPoolEfficiencyTracker tracks detailed efficiency metrics for a buffer pool
type BufferPoolEfficiencyTracker struct {
// Atomic counters
getOperations int64 // Total get operations (atomic)
putOperations int64 // Total put operations (atomic)
getLatencySum int64 // Sum of get latencies in nanoseconds (atomic)
putLatencySum int64 // Sum of put latencies in nanoseconds (atomic)
allocationBytes int64 // Total bytes allocated (atomic)
reuseCount int64 // Number of successful reuses (atomic)
// Recent operation times for throughput calculation
recentOps []time.Time
opsMutex sync.RWMutex
poolName string
logger zerolog.Logger
}
// NewBufferPoolEfficiencyTracker creates a new efficiency tracker
func NewBufferPoolEfficiencyTracker(poolName string, logger zerolog.Logger) *BufferPoolEfficiencyTracker {
return &BufferPoolEfficiencyTracker{
recentOps: make([]time.Time, 0, 1000), // Track last 1000 operations
poolName: poolName,
logger: logger,
}
}
// RecordGetOperation records a buffer get operation with its latency
func (bpet *BufferPoolEfficiencyTracker) RecordGetOperation(latency time.Duration, wasHit bool) {
atomic.AddInt64(&bpet.getOperations, 1)
atomic.AddInt64(&bpet.getLatencySum, latency.Nanoseconds())
if wasHit {
atomic.AddInt64(&bpet.reuseCount, 1)
}
// Record operation time for throughput calculation
bpet.opsMutex.Lock()
now := time.Now()
if len(bpet.recentOps) >= 1000 {
bpet.recentOps = bpet.recentOps[1:]
}
bpet.recentOps = append(bpet.recentOps, now)
bpet.opsMutex.Unlock()
}
// RecordPutOperation records a buffer put operation with its latency
func (bpet *BufferPoolEfficiencyTracker) RecordPutOperation(latency time.Duration, bufferSize int) {
atomic.AddInt64(&bpet.putOperations, 1)
atomic.AddInt64(&bpet.putLatencySum, latency.Nanoseconds())
atomic.AddInt64(&bpet.allocationBytes, int64(bufferSize))
}
// GetEfficiencyMetrics calculates current efficiency metrics
func (bpet *BufferPoolEfficiencyTracker) GetEfficiencyMetrics() BufferPoolEfficiencyMetrics {
getOps := atomic.LoadInt64(&bpet.getOperations)
putOps := atomic.LoadInt64(&bpet.putOperations)
reuseCount := atomic.LoadInt64(&bpet.reuseCount)
getLatencySum := atomic.LoadInt64(&bpet.getLatencySum)
putLatencySum := atomic.LoadInt64(&bpet.putLatencySum)
allocationBytes := atomic.LoadInt64(&bpet.allocationBytes)
var hitRate, missRate, avgGetLatency, avgPutLatency float64
var throughput float64
if getOps > 0 {
hitRate = float64(reuseCount) / float64(getOps) * 100
missRate = 100 - hitRate
avgGetLatency = float64(getLatencySum) / float64(getOps)
}
if putOps > 0 {
avgPutLatency = float64(putLatencySum) / float64(putOps)
}
// Calculate throughput from recent operations
bpet.opsMutex.RLock()
if len(bpet.recentOps) > 1 {
timeSpan := bpet.recentOps[len(bpet.recentOps)-1].Sub(bpet.recentOps[0])
if timeSpan > 0 {
throughput = float64(len(bpet.recentOps)) / timeSpan.Seconds()
}
}
bpet.opsMutex.RUnlock()
// Calculate efficiency metrics
utilizationRate := hitRate // Simplified: hit rate as utilization
memoryEfficiency := hitRate // Simplified: reuse rate as memory efficiency
reuseEffectiveness := hitRate
// Calculate fragmentation (simplified as the complement of hit rate)
fragmentationRate := missRate
// Calculate allocation overhead (simplified: average bytes per get operation)
allocationOverhead := float64(0)
if getOps > 0 && allocationBytes > 0 {
allocationOverhead = float64(allocationBytes) / float64(getOps)
}
return BufferPoolEfficiencyMetrics{
HitRate: hitRate,
MissRate: missRate,
UtilizationRate: utilizationRate,
FragmentationRate: fragmentationRate,
MemoryEfficiency: memoryEfficiency,
AllocationOverhead: allocationOverhead,
ReuseEffectiveness: reuseEffectiveness,
AverageGetLatency: time.Duration(avgGetLatency),
AveragePutLatency: time.Duration(avgPutLatency),
Throughput: throughput,
}
}
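For example, 1000 get operations of which 900 reused a pooled buffer give:
// hitRate  = 900/1000*100 = 90.0
// missRate = 100 - 90     = 10.0
// 1000 tracked timestamps spanning 2s -> throughput = 500 ops/s.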
// NewGranularMetricsCollector creates a new granular metrics collector
func NewGranularMetricsCollector(logger zerolog.Logger) *GranularMetricsCollector {
return &GranularMetricsCollector{
framePoolMetrics: NewBufferPoolEfficiencyTracker("frame_pool", logger.With().Str("pool", "frame").Logger()),
controlPoolMetrics: NewBufferPoolEfficiencyTracker("control_pool", logger.With().Str("pool", "control").Logger()),
zeroCopyMetrics: NewBufferPoolEfficiencyTracker("zero_copy_pool", logger.With().Str("pool", "zero_copy").Logger()),
logger: logger,
}
}
// RecordFramePoolGet and RecordFramePoolPut record frame pool operations
func (gmc *GranularMetricsCollector) RecordFramePoolGet(latency time.Duration, wasHit bool) {
gmc.framePoolMetrics.RecordGetOperation(latency, wasHit)
}
func (gmc *GranularMetricsCollector) RecordFramePoolPut(latency time.Duration, bufferSize int) {
gmc.framePoolMetrics.RecordPutOperation(latency, bufferSize)
}
// RecordControlPoolGet and RecordControlPoolPut record control pool operations
func (gmc *GranularMetricsCollector) RecordControlPoolGet(latency time.Duration, wasHit bool) {
gmc.controlPoolMetrics.RecordGetOperation(latency, wasHit)
}
func (gmc *GranularMetricsCollector) RecordControlPoolPut(latency time.Duration, bufferSize int) {
gmc.controlPoolMetrics.RecordPutOperation(latency, bufferSize)
}
// RecordZeroCopyGet and RecordZeroCopyPut record zero-copy pool operations
func (gmc *GranularMetricsCollector) RecordZeroCopyGet(latency time.Duration, wasHit bool) {
gmc.zeroCopyMetrics.RecordGetOperation(latency, wasHit)
}
func (gmc *GranularMetricsCollector) RecordZeroCopyPut(latency time.Duration, bufferSize int) {
gmc.zeroCopyMetrics.RecordPutOperation(latency, bufferSize)
}
// GetBufferPoolEfficiency returns efficiency metrics for all buffer pools
func (gmc *GranularMetricsCollector) GetBufferPoolEfficiency() map[string]BufferPoolEfficiencyMetrics {
gmc.mutex.RLock()
defer gmc.mutex.RUnlock()
return map[string]BufferPoolEfficiencyMetrics{
"frame_pool": gmc.framePoolMetrics.GetEfficiencyMetrics(),
"control_pool": gmc.controlPoolMetrics.GetEfficiencyMetrics(),
"zero_copy_pool": gmc.zeroCopyMetrics.GetEfficiencyMetrics(),
}
}
// LogGranularMetrics logs comprehensive granular metrics
func (gmc *GranularMetricsCollector) LogGranularMetrics() {
bufferEfficiency := gmc.GetBufferPoolEfficiency()
// Log buffer pool efficiency
for poolName, efficiency := range bufferEfficiency {
gmc.logger.Info().
Str("pool", poolName).
Float64("hit_rate", efficiency.HitRate).
Float64("miss_rate", efficiency.MissRate).
Float64("utilization_rate", efficiency.UtilizationRate).
Float64("memory_efficiency", efficiency.MemoryEfficiency).
Dur("avg_get_latency", efficiency.AverageGetLatency).
Dur("avg_put_latency", efficiency.AveragePutLatency).
Float64("throughput", efficiency.Throughput).
Msg("Buffer pool efficiency metrics")
}
}
// Global granular metrics collector instance
var (
granularMetricsCollector *GranularMetricsCollector
granularMetricsOnce sync.Once
)
// GetGranularMetricsCollector returns the global granular metrics collector
func GetGranularMetricsCollector() *GranularMetricsCollector {
granularMetricsOnce.Do(func() {
logger := logging.GetDefaultLogger().With().Str("component", "granular-metrics").Logger()
granularMetricsCollector = NewGranularMetricsCollector(logger)
})
return granularMetricsCollector
}
// StartGranularMetricsLogging starts periodic granular metrics logging
func StartGranularMetricsLogging(interval time.Duration) {
collector := GetGranularMetricsCollector()
logger := collector.logger
logger.Info().Dur("interval", interval).Msg("Starting granular metrics logging")
go func() {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for range ticker.C {
collector.LogGranularMetrics()
}
}()
}
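A sketch of how a buffer pool was expected to feed this collector; framePool and its reuse flag are placeholders, not types from this package:
collector := GetGranularMetricsCollector()
start := time.Now()
buf, reused := framePool.Get() // hypothetical pool API returning a reuse flag
collector.RecordFramePoolGet(time.Since(start), reused)
// ... use buf ...
start = time.Now()
framePool.Put(buf)
collector.RecordFramePoolPut(time.Since(start), len(buf))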

View File

@@ -1,545 +0,0 @@
package audio
import (
"context"
"fmt"
"runtime"
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/jetkvm/kvm/internal/logging"
"github.com/rs/zerolog"
)
// LatencyProfiler provides comprehensive end-to-end audio latency profiling
// with nanosecond precision across the entire WebRTC->IPC->CGO->ALSA pipeline
type LatencyProfiler struct {
// Atomic counters for thread-safe access (MUST be first for ARM32 alignment)
totalMeasurements int64 // Total number of measurements taken
webrtcLatencySum int64 // Sum of WebRTC processing latencies (nanoseconds)
ipcLatencySum int64 // Sum of IPC communication latencies (nanoseconds)
cgoLatencySum int64 // Sum of CGO call latencies (nanoseconds)
alsaLatencySum int64 // Sum of ALSA device latencies (nanoseconds)
endToEndLatencySum int64 // Sum of complete end-to-end latencies (nanoseconds)
validationLatencySum int64 // Sum of validation overhead (nanoseconds)
serializationLatencySum int64 // Sum of serialization overhead (nanoseconds)
// Peak latency tracking
maxWebrtcLatency int64 // Maximum WebRTC latency observed (nanoseconds)
maxIpcLatency int64 // Maximum IPC latency observed (nanoseconds)
maxCgoLatency int64 // Maximum CGO latency observed (nanoseconds)
maxAlsaLatency int64 // Maximum ALSA latency observed (nanoseconds)
maxEndToEndLatency int64 // Maximum end-to-end latency observed (nanoseconds)
// Configuration and control
config LatencyProfilerConfig
logger zerolog.Logger
ctx context.Context
cancel context.CancelFunc
running int32 // Atomic flag for profiler state
enabled int32 // Atomic flag for measurement collection
// Detailed measurement storage
measurements []DetailedLatencyMeasurement
measurementMutex sync.RWMutex
measurementIndex int
// High-resolution timing
timeSource func() int64 // Nanosecond precision time source
}
// LatencyProfilerConfig defines profiler configuration
type LatencyProfilerConfig struct {
MaxMeasurements int // Maximum measurements to store in memory
SamplingRate float64 // Sampling rate (0.0-1.0, 1.0 = profile every frame)
ReportingInterval time.Duration // How often to log profiling reports
ThresholdWarning time.Duration // Latency threshold for warnings
ThresholdCritical time.Duration // Latency threshold for critical alerts
EnableDetailedTrace bool // Enable detailed per-component tracing
EnableHistogram bool // Enable latency histogram collection
}
// DetailedLatencyMeasurement captures comprehensive latency breakdown
type DetailedLatencyMeasurement struct {
Timestamp time.Time // When the measurement was taken
FrameID uint64 // Unique frame identifier for tracing
WebRTCLatency time.Duration // WebRTC processing time
IPCLatency time.Duration // IPC communication time
CGOLatency time.Duration // CGO call overhead
ALSALatency time.Duration // ALSA device processing time
ValidationLatency time.Duration // Frame validation overhead
SerializationLatency time.Duration // Data serialization overhead
EndToEndLatency time.Duration // Complete pipeline latency
Source string // Source component (input/output)
FrameSize int // Size of the audio frame in bytes
CPUUsage float64 // CPU usage at time of measurement
MemoryUsage uint64 // Memory usage at time of measurement
}
// LatencyProfileReport contains aggregated profiling results
type LatencyProfileReport struct {
TotalMeasurements int64 // Total measurements taken
TimeRange time.Duration // Time span of measurements
// Average latencies
AvgWebRTCLatency time.Duration
AvgIPCLatency time.Duration
AvgCGOLatency time.Duration
AvgALSALatency time.Duration
AvgEndToEndLatency time.Duration
AvgValidationLatency time.Duration
AvgSerializationLatency time.Duration
// Peak latencies
MaxWebRTCLatency time.Duration
MaxIPCLatency time.Duration
MaxCGOLatency time.Duration
MaxALSALatency time.Duration
MaxEndToEndLatency time.Duration
// Performance analysis
BottleneckComponent string // Component with highest average latency
LatencyDistribution map[string]int // Histogram of latency ranges
Throughput float64 // Frames per second processed
}
// FrameLatencyTracker tracks latency for a single audio frame through the pipeline
type FrameLatencyTracker struct {
frameID uint64
startTime int64 // Nanosecond timestamp
webrtcStartTime int64
ipcStartTime int64
cgoStartTime int64
alsaStartTime int64
validationStartTime int64
serializationStartTime int64
frameSize int
source string
}
// Global profiler instance
var (
globalLatencyProfiler unsafe.Pointer // *LatencyProfiler
profilerInitialized int32
)
// DefaultLatencyProfilerConfig returns default profiler configuration
func DefaultLatencyProfilerConfig() LatencyProfilerConfig {
return LatencyProfilerConfig{
MaxMeasurements: 10000,
SamplingRate: 0.01, // Sample 1% of frames by default
ReportingInterval: 30 * time.Second,
ThresholdWarning: 50 * time.Millisecond,
ThresholdCritical: 100 * time.Millisecond,
EnableDetailedTrace: false, // Disabled by default for performance
EnableHistogram: false, // Histogram collection disabled by default
}
}
// NewLatencyProfiler creates a new latency profiler
func NewLatencyProfiler(config LatencyProfilerConfig) *LatencyProfiler {
ctx, cancel := context.WithCancel(context.Background())
logger := logging.GetDefaultLogger().With().Str("component", "latency-profiler").Logger()
// Validate configuration
if config.MaxMeasurements <= 0 {
config.MaxMeasurements = 10000
}
if config.SamplingRate < 0.0 || config.SamplingRate > 1.0 {
config.SamplingRate = 0.1
}
if config.ReportingInterval <= 0 {
config.ReportingInterval = 30 * time.Second
}
profiler := &LatencyProfiler{
config: config,
logger: logger,
ctx: ctx,
cancel: cancel,
measurements: make([]DetailedLatencyMeasurement, config.MaxMeasurements),
timeSource: func() int64 { return time.Now().UnixNano() },
}
// Initialize peak latencies to zero
atomic.StoreInt64(&profiler.maxWebrtcLatency, 0)
atomic.StoreInt64(&profiler.maxIpcLatency, 0)
atomic.StoreInt64(&profiler.maxCgoLatency, 0)
atomic.StoreInt64(&profiler.maxAlsaLatency, 0)
atomic.StoreInt64(&profiler.maxEndToEndLatency, 0)
return profiler
}
// Start begins latency profiling
func (lp *LatencyProfiler) Start() error {
if !atomic.CompareAndSwapInt32(&lp.running, 0, 1) {
return fmt.Errorf("latency profiler already running")
}
// Enable measurement collection
atomic.StoreInt32(&lp.enabled, 1)
// Start reporting goroutine
go lp.reportingLoop()
lp.logger.Info().Float64("sampling_rate", lp.config.SamplingRate).Msg("latency profiler started")
return nil
}
// Stop stops latency profiling
func (lp *LatencyProfiler) Stop() {
if !atomic.CompareAndSwapInt32(&lp.running, 1, 0) {
return
}
// Disable measurement collection
atomic.StoreInt32(&lp.enabled, 0)
// Cancel context to stop reporting
lp.cancel()
lp.logger.Info().Msg("latency profiler stopped")
}
// IsEnabled returns whether profiling is currently enabled
func (lp *LatencyProfiler) IsEnabled() bool {
return atomic.LoadInt32(&lp.enabled) == 1
}
// StartFrameTracking begins tracking latency for a new audio frame
func (lp *LatencyProfiler) StartFrameTracking(frameID uint64, frameSize int, source string) *FrameLatencyTracker {
if !lp.IsEnabled() {
return nil
}
// Apply sampling rate to reduce profiling overhead. Deterministic
// modulo sampling quantizes the effective rate to 1% steps: at 0.01,
// only frames with frameID%100 == 0 are tracked.
if lp.config.SamplingRate < 1.0 {
if float64(frameID%100)/100.0 >= lp.config.SamplingRate {
return nil
}
}
now := lp.timeSource()
return &FrameLatencyTracker{
frameID: frameID,
startTime: now,
frameSize: frameSize,
source: source,
}
}
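With the modulo scheme above, the sampled set is deterministic per frame ID:
// SamplingRate 0.01 -> frames 0, 100, 200, ... are tracked (1 in 100).
// SamplingRate 0.25 -> frames with frameID%100 in [0,24] are tracked.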
// TrackWebRTCStart marks the start of WebRTC processing
func (tracker *FrameLatencyTracker) TrackWebRTCStart() {
if tracker != nil {
tracker.webrtcStartTime = time.Now().UnixNano()
}
}
// TrackIPCStart marks the start of IPC communication
func (tracker *FrameLatencyTracker) TrackIPCStart() {
if tracker != nil {
tracker.ipcStartTime = time.Now().UnixNano()
}
}
// TrackCGOStart marks the start of CGO processing
func (tracker *FrameLatencyTracker) TrackCGOStart() {
if tracker != nil {
tracker.cgoStartTime = time.Now().UnixNano()
}
}
// TrackALSAStart marks the start of ALSA device processing
func (tracker *FrameLatencyTracker) TrackALSAStart() {
if tracker != nil {
tracker.alsaStartTime = time.Now().UnixNano()
}
}
// TrackValidationStart marks the start of frame validation
func (tracker *FrameLatencyTracker) TrackValidationStart() {
if tracker != nil {
tracker.validationStartTime = time.Now().UnixNano()
}
}
// TrackSerializationStart marks the start of data serialization
func (tracker *FrameLatencyTracker) TrackSerializationStart() {
if tracker != nil {
tracker.serializationStartTime = time.Now().UnixNano()
}
}
// FinishTracking completes frame tracking and records the measurement
func (lp *LatencyProfiler) FinishTracking(tracker *FrameLatencyTracker) {
if tracker == nil || !lp.IsEnabled() {
return
}
endTime := lp.timeSource()
// Calculate component latencies. Each stage needs both of its boundary
// timestamps; guard against stages that never ran, which would
// otherwise produce large negative durations.
var webrtcLatency, ipcLatency, cgoLatency, alsaLatency, validationLatency, serializationLatency time.Duration
if tracker.webrtcStartTime > 0 && tracker.ipcStartTime > 0 {
webrtcLatency = time.Duration(tracker.ipcStartTime - tracker.webrtcStartTime)
}
if tracker.ipcStartTime > 0 && tracker.cgoStartTime > 0 {
ipcLatency = time.Duration(tracker.cgoStartTime - tracker.ipcStartTime)
}
if tracker.cgoStartTime > 0 && tracker.alsaStartTime > 0 {
cgoLatency = time.Duration(tracker.alsaStartTime - tracker.cgoStartTime)
}
if tracker.alsaStartTime > 0 {
alsaLatency = time.Duration(endTime - tracker.alsaStartTime)
}
if tracker.validationStartTime > 0 && tracker.ipcStartTime > 0 {
validationLatency = time.Duration(tracker.ipcStartTime - tracker.validationStartTime)
}
if tracker.serializationStartTime > 0 && tracker.cgoStartTime > 0 {
serializationLatency = time.Duration(tracker.cgoStartTime - tracker.serializationStartTime)
}
endToEndLatency := time.Duration(endTime - tracker.startTime)
// Update atomic counters
atomic.AddInt64(&lp.totalMeasurements, 1)
atomic.AddInt64(&lp.webrtcLatencySum, webrtcLatency.Nanoseconds())
atomic.AddInt64(&lp.ipcLatencySum, ipcLatency.Nanoseconds())
atomic.AddInt64(&lp.cgoLatencySum, cgoLatency.Nanoseconds())
atomic.AddInt64(&lp.alsaLatencySum, alsaLatency.Nanoseconds())
atomic.AddInt64(&lp.endToEndLatencySum, endToEndLatency.Nanoseconds())
atomic.AddInt64(&lp.validationLatencySum, validationLatency.Nanoseconds())
atomic.AddInt64(&lp.serializationLatencySum, serializationLatency.Nanoseconds())
// Update peak latencies
lp.updatePeakLatency(&lp.maxWebrtcLatency, webrtcLatency.Nanoseconds())
lp.updatePeakLatency(&lp.maxIpcLatency, ipcLatency.Nanoseconds())
lp.updatePeakLatency(&lp.maxCgoLatency, cgoLatency.Nanoseconds())
lp.updatePeakLatency(&lp.maxAlsaLatency, alsaLatency.Nanoseconds())
lp.updatePeakLatency(&lp.maxEndToEndLatency, endToEndLatency.Nanoseconds())
// Store detailed measurement if enabled
if lp.config.EnableDetailedTrace {
lp.storeMeasurement(DetailedLatencyMeasurement{
Timestamp: time.Now(),
FrameID: tracker.frameID,
WebRTCLatency: webrtcLatency,
IPCLatency: ipcLatency,
CGOLatency: cgoLatency,
ALSALatency: alsaLatency,
ValidationLatency: validationLatency,
SerializationLatency: serializationLatency,
EndToEndLatency: endToEndLatency,
Source: tracker.source,
FrameSize: tracker.frameSize,
CPUUsage: lp.getCurrentCPUUsage(),
MemoryUsage: lp.getCurrentMemoryUsage(),
})
}
// Check for threshold violations
if endToEndLatency > lp.config.ThresholdCritical {
lp.logger.Error().Dur("latency", endToEndLatency).Uint64("frame_id", tracker.frameID).
Str("source", tracker.source).Msg("critical latency threshold exceeded")
} else if endToEndLatency > lp.config.ThresholdWarning {
lp.logger.Warn().Dur("latency", endToEndLatency).Uint64("frame_id", tracker.frameID).
Str("source", tracker.source).Msg("warning latency threshold exceeded")
}
}
// updatePeakLatency atomically updates peak latency if new value is higher
func (lp *LatencyProfiler) updatePeakLatency(peakPtr *int64, newLatency int64) {
for {
current := atomic.LoadInt64(peakPtr)
if newLatency <= current || atomic.CompareAndSwapInt64(peakPtr, current, newLatency) {
break
}
}
}
// storeMeasurement stores a detailed measurement in the circular buffer
func (lp *LatencyProfiler) storeMeasurement(measurement DetailedLatencyMeasurement) {
lp.measurementMutex.Lock()
defer lp.measurementMutex.Unlock()
lp.measurements[lp.measurementIndex] = measurement
lp.measurementIndex = (lp.measurementIndex + 1) % len(lp.measurements)
}
// GetReport generates a comprehensive latency profiling report
func (lp *LatencyProfiler) GetReport() LatencyProfileReport {
totalMeasurements := atomic.LoadInt64(&lp.totalMeasurements)
if totalMeasurements == 0 {
return LatencyProfileReport{}
}
// Calculate averages
avgWebRTC := time.Duration(atomic.LoadInt64(&lp.webrtcLatencySum) / totalMeasurements)
avgIPC := time.Duration(atomic.LoadInt64(&lp.ipcLatencySum) / totalMeasurements)
avgCGO := time.Duration(atomic.LoadInt64(&lp.cgoLatencySum) / totalMeasurements)
avgALSA := time.Duration(atomic.LoadInt64(&lp.alsaLatencySum) / totalMeasurements)
avgEndToEnd := time.Duration(atomic.LoadInt64(&lp.endToEndLatencySum) / totalMeasurements)
avgValidation := time.Duration(atomic.LoadInt64(&lp.validationLatencySum) / totalMeasurements)
avgSerialization := time.Duration(atomic.LoadInt64(&lp.serializationLatencySum) / totalMeasurements)
// Get peak latencies
maxWebRTC := time.Duration(atomic.LoadInt64(&lp.maxWebrtcLatency))
maxIPC := time.Duration(atomic.LoadInt64(&lp.maxIpcLatency))
maxCGO := time.Duration(atomic.LoadInt64(&lp.maxCgoLatency))
maxALSA := time.Duration(atomic.LoadInt64(&lp.maxAlsaLatency))
maxEndToEnd := time.Duration(atomic.LoadInt64(&lp.maxEndToEndLatency))
// Determine bottleneck component
bottleneck := "WebRTC"
maxAvg := avgWebRTC
if avgIPC > maxAvg {
bottleneck = "IPC"
maxAvg = avgIPC
}
if avgCGO > maxAvg {
bottleneck = "CGO"
maxAvg = avgCGO
}
if avgALSA > maxAvg {
bottleneck = "ALSA"
}
return LatencyProfileReport{
TotalMeasurements: totalMeasurements,
AvgWebRTCLatency: avgWebRTC,
AvgIPCLatency: avgIPC,
AvgCGOLatency: avgCGO,
AvgALSALatency: avgALSA,
AvgEndToEndLatency: avgEndToEnd,
AvgValidationLatency: avgValidation,
AvgSerializationLatency: avgSerialization,
MaxWebRTCLatency: maxWebRTC,
MaxIPCLatency: maxIPC,
MaxCGOLatency: maxCGO,
MaxALSALatency: maxALSA,
MaxEndToEndLatency: maxEndToEnd,
BottleneckComponent: bottleneck,
}
}
// reportingLoop periodically logs profiling reports
func (lp *LatencyProfiler) reportingLoop() {
ticker := time.NewTicker(lp.config.ReportingInterval)
defer ticker.Stop()
for {
select {
case <-lp.ctx.Done():
return
case <-ticker.C:
report := lp.GetReport()
if report.TotalMeasurements > 0 {
lp.logReport(report)
}
}
}
}
// logReport logs a comprehensive profiling report
func (lp *LatencyProfiler) logReport(report LatencyProfileReport) {
lp.logger.Info().
Int64("total_measurements", report.TotalMeasurements).
Dur("avg_webrtc_latency", report.AvgWebRTCLatency).
Dur("avg_ipc_latency", report.AvgIPCLatency).
Dur("avg_cgo_latency", report.AvgCGOLatency).
Dur("avg_alsa_latency", report.AvgALSALatency).
Dur("avg_end_to_end_latency", report.AvgEndToEndLatency).
Dur("avg_validation_latency", report.AvgValidationLatency).
Dur("avg_serialization_latency", report.AvgSerializationLatency).
Dur("max_webrtc_latency", report.MaxWebRTCLatency).
Dur("max_ipc_latency", report.MaxIPCLatency).
Dur("max_cgo_latency", report.MaxCGOLatency).
Dur("max_alsa_latency", report.MaxALSALatency).
Dur("max_end_to_end_latency", report.MaxEndToEndLatency).
Str("bottleneck_component", report.BottleneckComponent).
Msg("latency profiling report")
}
// getCurrentCPUUsage returns a rough CPU load proxy
func (lp *LatencyProfiler) getCurrentCPUUsage() float64 {
// Simplified proxy: goroutine count scaled to a fraction. Production
// monitoring would use real CPU accounting instead.
return float64(runtime.NumGoroutine()) / 100.0
}
// getCurrentMemoryUsage returns current memory usage in bytes
func (lp *LatencyProfiler) getCurrentMemoryUsage() uint64 {
var m runtime.MemStats
runtime.ReadMemStats(&m)
return m.Alloc
}
// GetGlobalLatencyProfiler returns the global latency profiler instance
func GetGlobalLatencyProfiler() *LatencyProfiler {
ptr := atomic.LoadPointer(&globalLatencyProfiler)
if ptr != nil {
return (*LatencyProfiler)(ptr)
}
// Initialize on first use
if atomic.CompareAndSwapInt32(&profilerInitialized, 0, 1) {
config := DefaultLatencyProfilerConfig()
profiler := NewLatencyProfiler(config)
atomic.StorePointer(&globalLatencyProfiler, unsafe.Pointer(profiler))
return profiler
}
// Another goroutine won the initialization race; spin until it
// publishes the pointer rather than handing back a throwaway instance.
for {
if ptr = atomic.LoadPointer(&globalLatencyProfiler); ptr != nil {
return (*LatencyProfiler)(ptr)
}
runtime.Gosched()
}
}
// EnableLatencyProfiling enables the global latency profiler
func EnableLatencyProfiling() error {
// Latency profiling disabled
if true {
return fmt.Errorf("latency profiling is disabled in configuration")
}
profiler := GetGlobalLatencyProfiler()
return profiler.Start()
}
// DisableLatencyProfiling disables the global latency profiler
func DisableLatencyProfiling() {
ptr := atomic.LoadPointer(&globalLatencyProfiler)
if ptr != nil {
profiler := (*LatencyProfiler)(ptr)
profiler.Stop()
}
}
// ProfileFrameLatency is a convenience function to profile a single frame's latency
func ProfileFrameLatency(frameID uint64, frameSize int, source string, fn func(*FrameLatencyTracker)) {
// Latency profiling disabled
if true {
fn(nil)
return
}
profiler := GetGlobalLatencyProfiler()
if !profiler.IsEnabled() {
fn(nil)
return
}
tracker := profiler.StartFrameTracking(frameID, frameSize, source)
defer profiler.FinishTracking(tracker)
fn(tracker)
}
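When profiling was active, the intended call pattern looked roughly like this; the stage boundaries are illustrative:
ProfileFrameLatency(frameID, len(frame), "output", func(t *FrameLatencyTracker) {
	t.TrackWebRTCStart()
	// ... WebRTC processing ...
	t.TrackIPCStart()
	// ... IPC write ...
	t.TrackCGOStart()
	// ... CGO encode ...
	t.TrackALSAStart()
	// ... ALSA device write ...
})
// The Track* methods are nil-safe, so the closure needs no nil check
// when profiling is disabled and t is nil.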

View File

@@ -1,201 +0,0 @@
package audio
import (
"encoding/json"
"net/http"
"runtime"
"time"
"github.com/jetkvm/kvm/internal/logging"
"github.com/rs/zerolog"
)
// MemoryMetrics provides comprehensive memory allocation statistics
type MemoryMetrics struct {
// Runtime memory statistics
RuntimeStats RuntimeMemoryStats `json:"runtime_stats"`
// Audio buffer pool statistics
BufferPools AudioBufferPoolStats `json:"buffer_pools"`
// Zero-copy frame pool statistics
ZeroCopyPool ZeroCopyFramePoolStats `json:"zero_copy_pool"`
// Message pool statistics
MessagePool MessagePoolStats `json:"message_pool"`
// Batch processor statistics
BatchProcessor BatchProcessorMemoryStats `json:"batch_processor,omitempty"`
// Collection timestamp
Timestamp time.Time `json:"timestamp"`
}
// RuntimeMemoryStats provides Go runtime memory statistics
type RuntimeMemoryStats struct {
Alloc uint64 `json:"alloc"` // Bytes allocated and not yet freed
TotalAlloc uint64 `json:"total_alloc"` // Total bytes allocated (cumulative)
Sys uint64 `json:"sys"` // Total bytes obtained from OS
Lookups uint64 `json:"lookups"` // Number of pointer lookups
Mallocs uint64 `json:"mallocs"` // Number of mallocs
Frees uint64 `json:"frees"` // Number of frees
HeapAlloc uint64 `json:"heap_alloc"` // Bytes allocated and not yet freed (heap)
HeapSys uint64 `json:"heap_sys"` // Bytes obtained from OS for heap
HeapIdle uint64 `json:"heap_idle"` // Bytes in idle spans
HeapInuse uint64 `json:"heap_inuse"` // Bytes in non-idle spans
HeapReleased uint64 `json:"heap_released"` // Bytes released to OS
HeapObjects uint64 `json:"heap_objects"` // Total number of allocated objects
StackInuse uint64 `json:"stack_inuse"` // Bytes used by stack spans
StackSys uint64 `json:"stack_sys"` // Bytes obtained from OS for stack
MSpanInuse uint64 `json:"mspan_inuse"` // Bytes used by mspan structures
MSpanSys uint64 `json:"mspan_sys"` // Bytes obtained from OS for mspan
MCacheInuse uint64 `json:"mcache_inuse"` // Bytes used by mcache structures
MCacheSys uint64 `json:"mcache_sys"` // Bytes obtained from OS for mcache
BuckHashSys uint64 `json:"buck_hash_sys"` // Bytes used by profiling bucket hash table
GCSys uint64 `json:"gc_sys"` // Bytes used for garbage collection metadata
OtherSys uint64 `json:"other_sys"` // Bytes used for other system allocations
NextGC uint64 `json:"next_gc"` // Target heap size for next GC
LastGC uint64 `json:"last_gc"` // Time of last GC (nanoseconds since epoch)
PauseTotalNs uint64 `json:"pause_total_ns"` // Total GC pause time
NumGC uint32 `json:"num_gc"` // Number of completed GC cycles
NumForcedGC uint32 `json:"num_forced_gc"` // Number of forced GC cycles
GCCPUFraction float64 `json:"gc_cpu_fraction"` // Fraction of CPU time used by GC
}
// BatchProcessorMemoryStats provides batch processor memory statistics
type BatchProcessorMemoryStats struct {
Initialized bool `json:"initialized"`
Running bool `json:"running"`
Stats BatchAudioStats `json:"stats"`
BufferPool AudioBufferPoolDetailedStats `json:"buffer_pool,omitempty"`
}
// GetBatchAudioProcessor is defined in batch_audio.go
// BatchAudioStats is defined in batch_audio.go
var memoryMetricsLogger *zerolog.Logger
func getMemoryMetricsLogger() *zerolog.Logger {
if memoryMetricsLogger == nil {
logger := logging.GetDefaultLogger().With().Str("component", "memory-metrics").Logger()
memoryMetricsLogger = &logger
}
return memoryMetricsLogger
}
// CollectMemoryMetrics gathers comprehensive memory allocation statistics
func CollectMemoryMetrics() MemoryMetrics {
// Collect runtime memory statistics
var m runtime.MemStats
runtime.ReadMemStats(&m)
runtimeStats := RuntimeMemoryStats{
Alloc: m.Alloc,
TotalAlloc: m.TotalAlloc,
Sys: m.Sys,
Lookups: m.Lookups,
Mallocs: m.Mallocs,
Frees: m.Frees,
HeapAlloc: m.HeapAlloc,
HeapSys: m.HeapSys,
HeapIdle: m.HeapIdle,
HeapInuse: m.HeapInuse,
HeapReleased: m.HeapReleased,
HeapObjects: m.HeapObjects,
StackInuse: m.StackInuse,
StackSys: m.StackSys,
MSpanInuse: m.MSpanInuse,
MSpanSys: m.MSpanSys,
MCacheInuse: m.MCacheInuse,
MCacheSys: m.MCacheSys,
BuckHashSys: m.BuckHashSys,
GCSys: m.GCSys,
OtherSys: m.OtherSys,
NextGC: m.NextGC,
LastGC: m.LastGC,
PauseTotalNs: m.PauseTotalNs,
NumGC: m.NumGC,
NumForcedGC: m.NumForcedGC,
GCCPUFraction: m.GCCPUFraction,
}
// Collect audio buffer pool statistics
bufferPoolStats := GetAudioBufferPoolStats()
// Collect zero-copy frame pool statistics
zeroCopyStats := GetGlobalZeroCopyPoolStats()
// Collect message pool statistics
messagePoolStats := GetGlobalMessagePoolStats()
// Collect batch processor statistics if available
var batchStats BatchProcessorMemoryStats
if processor := GetBatchAudioProcessor(); processor != nil {
batchStats.Initialized = true
batchStats.Running = processor.IsRunning()
batchStats.Stats = processor.GetStats()
// Note: BatchAudioProcessor uses sync.Pool, detailed stats not available
}
return MemoryMetrics{
RuntimeStats: runtimeStats,
BufferPools: bufferPoolStats,
ZeroCopyPool: zeroCopyStats,
MessagePool: messagePoolStats,
BatchProcessor: batchStats,
Timestamp: time.Now(),
}
}
// HandleMemoryMetrics provides an HTTP handler for memory metrics
func HandleMemoryMetrics(w http.ResponseWriter, r *http.Request) {
logger := getMemoryMetricsLogger()
if r.Method != http.MethodGet {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
metrics := CollectMemoryMetrics()
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Cache-Control", "no-cache")
encoder := json.NewEncoder(w)
encoder.SetIndent("", " ")
if err := encoder.Encode(metrics); err != nil {
logger.Error().Err(err).Msg("failed to encode memory metrics")
http.Error(w, "Internal server error", http.StatusInternalServerError)
return
}
logger.Debug().Msg("memory metrics served")
}
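The handler mounts on any mux; a minimal sketch, where the route path and port are assumptions rather than established endpoints:
mux := http.NewServeMux()
mux.HandleFunc("/debug/audio/memory", HandleMemoryMetrics)
// GET on the route returns the pretty-printed MemoryMetrics JSON.
go func() { _ = http.ListenAndServe(":8080", mux) }()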
// LogMemoryMetrics logs current memory metrics for debugging
func LogMemoryMetrics() {
logger := getMemoryMetricsLogger()
metrics := CollectMemoryMetrics()
logger.Info().
Uint64("heap_alloc_mb", metrics.RuntimeStats.HeapAlloc/uint64(GetConfig().BytesToMBDivisor)).
Uint64("heap_sys_mb", metrics.RuntimeStats.HeapSys/uint64(GetConfig().BytesToMBDivisor)).
Uint64("heap_objects", metrics.RuntimeStats.HeapObjects).
Uint32("num_gc", metrics.RuntimeStats.NumGC).
Float64("gc_cpu_fraction", metrics.RuntimeStats.GCCPUFraction).
Float64("buffer_pool_hit_rate", metrics.BufferPools.FramePoolHitRate).
Float64("zero_copy_hit_rate", metrics.ZeroCopyPool.HitRate).
Float64("message_pool_hit_rate", metrics.MessagePool.HitRate).
Msg("memory metrics snapshot")
}
// StartMemoryMetricsLogging starts periodic memory metrics logging
func StartMemoryMetricsLogging(interval time.Duration) {
logger := getMemoryMetricsLogger()
logger.Debug().Dur("interval", interval).Msg("memory metrics logging started")
go func() {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for range ticker.C {
LogMemoryMetrics()
}
}()
}

View File

@@ -288,45 +288,7 @@ var (
)
// Device health metrics
deviceHealthStatus = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "jetkvm_audio_device_health_status",
Help: "Current device health status (0=Healthy, 1=Degraded, 2=Failing, 3=Critical)",
},
[]string{"device_type"}, // device_type: capture, playback
)
deviceHealthScore = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "jetkvm_audio_device_health_score",
Help: "Device health score (0.0-1.0, higher is better)",
},
[]string{"device_type"}, // device_type: capture, playback
)
deviceConsecutiveErrors = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "jetkvm_audio_device_consecutive_errors",
Help: "Number of consecutive errors for device",
},
[]string{"device_type"}, // device_type: capture, playback
)
deviceTotalErrors = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "jetkvm_audio_device_total_errors",
Help: "Total number of errors for device",
},
[]string{"device_type"}, // device_type: capture, playback
)
deviceLatencySpikes = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "jetkvm_audio_device_latency_spikes_total",
Help: "Total number of latency spikes for device",
},
[]string{"device_type"}, // device_type: capture, playback
)
// Removed device health metrics - functionality not used
// Memory metrics
memoryHeapAllocBytes = promauto.NewGauge(
@@ -436,11 +398,7 @@ var (
micBytesProcessedValue int64
micConnectionDropsValue int64
// Atomic counters for device health metrics
deviceCaptureErrorsValue int64
devicePlaybackErrorsValue int64
deviceCaptureSpikesValue int64
devicePlaybackSpikesValue int64
// Atomic counters for device health metrics - functionality removed, no longer used
// Atomic counter for memory GC
memoryGCCountValue uint32
@@ -639,34 +597,8 @@ func UpdateSocketBufferMetrics(component, bufferType string, size, utilization f
atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix())
}
// UpdateDeviceHealthMetrics updates device health metrics
func UpdateDeviceHealthMetrics(deviceType string, status int, healthScore float64, consecutiveErrors, totalErrors, latencySpikes int64) {
metricsUpdateMutex.Lock()
defer metricsUpdateMutex.Unlock()
deviceHealthStatus.WithLabelValues(deviceType).Set(float64(status))
deviceHealthScore.WithLabelValues(deviceType).Set(healthScore)
deviceConsecutiveErrors.WithLabelValues(deviceType).Set(float64(consecutiveErrors))
// Update error counters with delta calculation
var prevErrors, prevSpikes int64
if deviceType == "capture" {
prevErrors = atomic.SwapInt64(&deviceCaptureErrorsValue, totalErrors)
prevSpikes = atomic.SwapInt64(&deviceCaptureSpikesValue, latencySpikes)
} else {
prevErrors = atomic.SwapInt64(&devicePlaybackErrorsValue, totalErrors)
prevSpikes = atomic.SwapInt64(&devicePlaybackSpikesValue, latencySpikes)
}
if prevErrors > 0 && totalErrors > prevErrors {
deviceTotalErrors.WithLabelValues(deviceType).Add(float64(totalErrors - prevErrors))
}
if prevSpikes > 0 && latencySpikes > prevSpikes {
deviceLatencySpikes.WithLabelValues(deviceType).Add(float64(latencySpikes - prevSpikes))
}
atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix())
}
// UpdateDeviceHealthMetrics - Device health monitoring functionality has been removed
// This function is no longer used as device health monitoring is not implemented
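The removed updater used a swap-and-add pattern worth keeping in mind when bridging cumulative totals into Prometheus counters; a distilled sketch with placeholder names (lastTotal, totalErrorsCounter):
// Counters only accept increments, so atomically swap in the new
// cumulative total and add the positive delta.
prev := atomic.SwapInt64(&lastTotal, newTotal)
if prev > 0 && newTotal > prev {
	totalErrorsCounter.Add(float64(newTotal - prev))
}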
// UpdateMemoryMetrics updates memory metrics
func UpdateMemoryMetrics() {

View File

@@ -3,7 +3,6 @@ package audio
import (
"sync"
"sync/atomic"
"time"
"unsafe"
)
@@ -188,13 +187,6 @@ func (p *ZeroCopyFramePool) Get() *ZeroCopyAudioFrame {
// Put returns a zero-copy frame to the pool
func (p *ZeroCopyFramePool) Put(frame *ZeroCopyAudioFrame) {
// Metrics collection removed
var startTime time.Time
trackMetrics := false // Metrics disabled
if false {
startTime = time.Now()
}
if frame == nil || !frame.pooled {
return
}
@@ -235,11 +227,7 @@ func (p *ZeroCopyFramePool) Put(frame *ZeroCopyAudioFrame) {
frame.mutex.Unlock()
}
// Record metrics only for sampled operations
if trackMetrics {
latency := time.Since(startTime)
GetGranularMetricsCollector().RecordZeroCopyPut(latency, frame.capacity)
}
// Metrics recording removed - granular metrics collector was unused
}
// Data returns the frame data as a slice (zero-copy view)