mirror of https://github.com/jetkvm/kvm.git
[WIP] Updates: Reduce PR complexity
This commit is contained in:
parent
8a189ba1b9
commit
947b4f9528
|
@ -230,63 +230,6 @@ var (
|
|||
},
|
||||
)
|
||||
|
||||
// Audio configuration metrics
|
||||
audioConfigQuality = promauto.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "jetkvm_audio_config_quality",
|
||||
Help: "Current audio quality setting (0=Low, 1=Medium, 2=High, 3=Ultra)",
|
||||
},
|
||||
)
|
||||
|
||||
audioConfigBitrate = promauto.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "jetkvm_audio_config_bitrate_kbps",
|
||||
Help: "Current audio bitrate in kbps",
|
||||
},
|
||||
)
|
||||
|
||||
audioConfigSampleRate = promauto.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "jetkvm_audio_config_sample_rate_hz",
|
||||
Help: "Current audio sample rate in Hz",
|
||||
},
|
||||
)
|
||||
|
||||
audioConfigChannels = promauto.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "jetkvm_audio_config_channels",
|
||||
Help: "Current audio channel count",
|
||||
},
|
||||
)
|
||||
|
||||
microphoneConfigQuality = promauto.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "jetkvm_microphone_config_quality",
|
||||
Help: "Current microphone quality setting (0=Low, 1=Medium, 2=High, 3=Ultra)",
|
||||
},
|
||||
)
|
||||
|
||||
microphoneConfigBitrate = promauto.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "jetkvm_microphone_config_bitrate_kbps",
|
||||
Help: "Current microphone bitrate in kbps",
|
||||
},
|
||||
)
|
||||
|
||||
microphoneConfigSampleRate = promauto.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "jetkvm_microphone_config_sample_rate_hz",
|
||||
Help: "Current microphone sample rate in Hz",
|
||||
},
|
||||
)
|
||||
|
||||
microphoneConfigChannels = promauto.NewGauge(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "jetkvm_microphone_config_channels",
|
||||
Help: "Current microphone channel count",
|
||||
},
|
||||
)
|
||||
|
||||
// Device health metrics
|
||||
// Removed device health metrics - functionality not used
|
||||
|
||||
|
@ -539,32 +482,6 @@ func UpdateMicrophoneProcessMetrics(metrics ProcessMetrics, isRunning bool) {
|
|||
atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix())
|
||||
}
|
||||
|
||||
// UpdateAudioConfigMetrics updates Prometheus metrics with audio configuration
|
||||
func UpdateAudioConfigMetrics(config AudioConfig) {
|
||||
metricsUpdateMutex.Lock()
|
||||
defer metricsUpdateMutex.Unlock()
|
||||
|
||||
audioConfigQuality.Set(float64(config.Quality))
|
||||
audioConfigBitrate.Set(float64(config.Bitrate))
|
||||
audioConfigSampleRate.Set(float64(config.SampleRate))
|
||||
audioConfigChannels.Set(float64(config.Channels))
|
||||
|
||||
atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix())
|
||||
}
|
||||
|
||||
// UpdateMicrophoneConfigMetrics updates Prometheus metrics with microphone configuration
|
||||
func UpdateMicrophoneConfigMetrics(config AudioConfig) {
|
||||
metricsUpdateMutex.Lock()
|
||||
defer metricsUpdateMutex.Unlock()
|
||||
|
||||
microphoneConfigQuality.Set(float64(config.Quality))
|
||||
microphoneConfigBitrate.Set(float64(config.Bitrate))
|
||||
microphoneConfigSampleRate.Set(float64(config.SampleRate))
|
||||
microphoneConfigChannels.Set(float64(config.Channels))
|
||||
|
||||
atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix())
|
||||
}
|
||||
|
||||
// UpdateAdaptiveBufferMetrics updates Prometheus metrics with adaptive buffer information
|
||||
func UpdateAdaptiveBufferMetrics(inputBufferSize, outputBufferSize int, cpuPercent, memoryPercent float64, adjustmentMade bool) {
|
||||
metricsUpdateMutex.Lock()
|
||||
|
|
|
@ -14,8 +14,6 @@ type MetricsRegistry struct {
|
|||
mu sync.RWMutex
|
||||
audioMetrics AudioMetrics
|
||||
audioInputMetrics AudioInputMetrics
|
||||
audioConfig AudioConfig
|
||||
microphoneConfig AudioConfig
|
||||
lastUpdate int64 // Unix timestamp
|
||||
}
|
||||
|
||||
|
@ -56,28 +54,6 @@ func (mr *MetricsRegistry) UpdateAudioInputMetrics(metrics AudioInputMetrics) {
|
|||
UpdateMicrophoneMetrics(convertAudioInputMetricsToUnified(metrics))
|
||||
}
|
||||
|
||||
// UpdateAudioConfig updates the centralized audio configuration
|
||||
func (mr *MetricsRegistry) UpdateAudioConfig(config AudioConfig) {
|
||||
mr.mu.Lock()
|
||||
mr.audioConfig = config
|
||||
mr.lastUpdate = time.Now().Unix()
|
||||
mr.mu.Unlock()
|
||||
|
||||
// Update Prometheus metrics directly
|
||||
UpdateAudioConfigMetrics(config)
|
||||
}
|
||||
|
||||
// UpdateMicrophoneConfig updates the centralized microphone configuration
|
||||
func (mr *MetricsRegistry) UpdateMicrophoneConfig(config AudioConfig) {
|
||||
mr.mu.Lock()
|
||||
mr.microphoneConfig = config
|
||||
mr.lastUpdate = time.Now().Unix()
|
||||
mr.mu.Unlock()
|
||||
|
||||
// Update Prometheus metrics directly
|
||||
UpdateMicrophoneConfigMetrics(config)
|
||||
}
|
||||
|
||||
// GetAudioMetrics returns the current audio output metrics
|
||||
func (mr *MetricsRegistry) GetAudioMetrics() AudioMetrics {
|
||||
mr.mu.RLock()
|
||||
|
@ -92,20 +68,6 @@ func (mr *MetricsRegistry) GetAudioInputMetrics() AudioInputMetrics {
|
|||
return mr.audioInputMetrics
|
||||
}
|
||||
|
||||
// GetAudioConfig returns the current audio configuration
|
||||
func (mr *MetricsRegistry) GetAudioConfig() AudioConfig {
|
||||
mr.mu.RLock()
|
||||
defer mr.mu.RUnlock()
|
||||
return mr.audioConfig
|
||||
}
|
||||
|
||||
// GetMicrophoneConfig returns the current microphone configuration
|
||||
func (mr *MetricsRegistry) GetMicrophoneConfig() AudioConfig {
|
||||
mr.mu.RLock()
|
||||
defer mr.mu.RUnlock()
|
||||
return mr.microphoneConfig
|
||||
}
|
||||
|
||||
// GetLastUpdate returns the timestamp of the last metrics update
|
||||
func (mr *MetricsRegistry) GetLastUpdate() time.Time {
|
||||
timestamp := atomic.LoadInt64(&mr.lastUpdate)
|
||||
|
@ -132,20 +94,11 @@ func (mr *MetricsRegistry) StartMetricsCollector() {
|
|||
mr.UpdateAudioInputMetrics(metrics)
|
||||
}
|
||||
|
||||
// Collect audio output metrics directly from global metrics variable to avoid circular dependency
|
||||
audioMetrics := AudioMetrics{
|
||||
FramesReceived: atomic.LoadInt64(&metrics.FramesReceived),
|
||||
FramesDropped: atomic.LoadInt64(&metrics.FramesDropped),
|
||||
BytesProcessed: atomic.LoadInt64(&metrics.BytesProcessed),
|
||||
ConnectionDrops: atomic.LoadInt64(&metrics.ConnectionDrops),
|
||||
LastFrameTime: metrics.LastFrameTime,
|
||||
AverageLatency: metrics.AverageLatency,
|
||||
}
|
||||
mr.UpdateAudioMetrics(audioMetrics)
|
||||
|
||||
// Collect configuration directly from global variables to avoid circular dependency
|
||||
mr.UpdateAudioConfig(currentConfig)
|
||||
mr.UpdateMicrophoneConfig(currentMicrophoneConfig)
|
||||
// Collect audio output metrics from global audio output manager
|
||||
// Note: We need to get metrics from the actual audio output system
|
||||
// For now, we'll use the global metrics variable from quality_presets.go
|
||||
globalAudioMetrics := GetGlobalAudioMetrics()
|
||||
mr.UpdateAudioMetrics(globalAudioMetrics)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
|
|
@ -1,333 +0,0 @@
|
|||
package audio
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
)
|
||||
|
||||
// LatencyMonitor tracks and optimizes audio latency in real-time
|
||||
type LatencyMonitor struct {
|
||||
// Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment)
|
||||
currentLatency int64 // Current latency in nanoseconds (atomic)
|
||||
averageLatency int64 // Rolling average latency in nanoseconds (atomic)
|
||||
minLatency int64 // Minimum observed latency in nanoseconds (atomic)
|
||||
maxLatency int64 // Maximum observed latency in nanoseconds (atomic)
|
||||
latencySamples int64 // Number of latency samples collected (atomic)
|
||||
jitterAccumulator int64 // Accumulated jitter for variance calculation (atomic)
|
||||
lastOptimization int64 // Timestamp of last optimization in nanoseconds (atomic)
|
||||
|
||||
config LatencyConfig
|
||||
logger zerolog.Logger
|
||||
|
||||
// Control channels
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
wg sync.WaitGroup
|
||||
|
||||
// Optimization callbacks
|
||||
optimizationCallbacks []OptimizationCallback
|
||||
mutex sync.RWMutex
|
||||
|
||||
// Performance tracking
|
||||
latencyHistory []LatencyMeasurement
|
||||
historyMutex sync.RWMutex
|
||||
}
|
||||
|
||||
// LatencyConfig holds configuration for latency monitoring
|
||||
type LatencyConfig struct {
|
||||
TargetLatency time.Duration // Target latency to maintain
|
||||
MaxLatency time.Duration // Maximum acceptable latency
|
||||
OptimizationInterval time.Duration // How often to run optimization
|
||||
HistorySize int // Number of latency measurements to keep
|
||||
JitterThreshold time.Duration // Jitter threshold for optimization
|
||||
AdaptiveThreshold float64 // Threshold for adaptive adjustments (0.0-1.0)
|
||||
}
|
||||
|
||||
// LatencyMeasurement represents a single latency measurement
|
||||
type LatencyMeasurement struct {
|
||||
Timestamp time.Time
|
||||
Latency time.Duration
|
||||
Jitter time.Duration
|
||||
Source string // Source of the measurement (e.g., "input", "output", "processing")
|
||||
}
|
||||
|
||||
// OptimizationCallback is called when latency optimization is triggered
|
||||
type OptimizationCallback func(metrics LatencyMetrics) error
|
||||
|
||||
// LatencyMetrics provides comprehensive latency statistics
|
||||
type LatencyMetrics struct {
|
||||
Current time.Duration
|
||||
Average time.Duration
|
||||
Min time.Duration
|
||||
Max time.Duration
|
||||
Jitter time.Duration
|
||||
SampleCount int64
|
||||
Trend LatencyTrend
|
||||
}
|
||||
|
||||
// LatencyTrend indicates the direction of latency changes
|
||||
type LatencyTrend int
|
||||
|
||||
const (
|
||||
LatencyTrendStable LatencyTrend = iota
|
||||
LatencyTrendIncreasing
|
||||
LatencyTrendDecreasing
|
||||
LatencyTrendVolatile
|
||||
)
|
||||
|
||||
// DefaultLatencyConfig returns a sensible default configuration
|
||||
func DefaultLatencyConfig() LatencyConfig {
|
||||
config := GetConfig()
|
||||
return LatencyConfig{
|
||||
TargetLatency: config.LatencyMonitorTarget,
|
||||
MaxLatency: config.MaxLatencyThreshold,
|
||||
OptimizationInterval: config.LatencyOptimizationInterval,
|
||||
HistorySize: config.LatencyHistorySize,
|
||||
JitterThreshold: config.JitterThreshold,
|
||||
AdaptiveThreshold: config.LatencyAdaptiveThreshold,
|
||||
}
|
||||
}
|
||||
|
||||
// NewLatencyMonitor creates a new latency monitoring system
|
||||
func NewLatencyMonitor(config LatencyConfig, logger zerolog.Logger) *LatencyMonitor {
|
||||
// Validate latency configuration
|
||||
if err := ValidateLatencyConfig(config); err != nil {
|
||||
// Log validation error and use default configuration
|
||||
logger.Error().Err(err).Msg("Invalid latency configuration provided, using defaults")
|
||||
config = DefaultLatencyConfig()
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
return &LatencyMonitor{
|
||||
config: config,
|
||||
logger: logger.With().Str("component", "latency-monitor").Logger(),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
latencyHistory: make([]LatencyMeasurement, 0, config.HistorySize),
|
||||
minLatency: int64(time.Hour), // Initialize to high value
|
||||
}
|
||||
}
|
||||
|
||||
// Start begins latency monitoring and optimization
|
||||
func (lm *LatencyMonitor) Start() {
|
||||
lm.wg.Add(1)
|
||||
go lm.monitoringLoop()
|
||||
}
|
||||
|
||||
// Stop stops the latency monitor
|
||||
func (lm *LatencyMonitor) Stop() {
|
||||
lm.cancel()
|
||||
lm.wg.Wait()
|
||||
}
|
||||
|
||||
// RecordLatency records a new latency measurement
|
||||
func (lm *LatencyMonitor) RecordLatency(latency time.Duration, source string) {
|
||||
now := time.Now()
|
||||
latencyNanos := latency.Nanoseconds()
|
||||
|
||||
// Update atomic counters
|
||||
atomic.StoreInt64(&lm.currentLatency, latencyNanos)
|
||||
atomic.AddInt64(&lm.latencySamples, 1)
|
||||
|
||||
// Update min/max
|
||||
for {
|
||||
oldMin := atomic.LoadInt64(&lm.minLatency)
|
||||
if latencyNanos >= oldMin || atomic.CompareAndSwapInt64(&lm.minLatency, oldMin, latencyNanos) {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
for {
|
||||
oldMax := atomic.LoadInt64(&lm.maxLatency)
|
||||
if latencyNanos <= oldMax || atomic.CompareAndSwapInt64(&lm.maxLatency, oldMax, latencyNanos) {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Update rolling average using exponential moving average
|
||||
oldAvg := atomic.LoadInt64(&lm.averageLatency)
|
||||
newAvg := oldAvg + (latencyNanos-oldAvg)/10 // Alpha = 0.1
|
||||
atomic.StoreInt64(&lm.averageLatency, newAvg)
|
||||
|
||||
// Calculate jitter (difference from average)
|
||||
jitter := latencyNanos - newAvg
|
||||
if jitter < 0 {
|
||||
jitter = -jitter
|
||||
}
|
||||
atomic.AddInt64(&lm.jitterAccumulator, jitter)
|
||||
|
||||
// Store in history
|
||||
lm.historyMutex.Lock()
|
||||
measurement := LatencyMeasurement{
|
||||
Timestamp: now,
|
||||
Latency: latency,
|
||||
Jitter: time.Duration(jitter),
|
||||
Source: source,
|
||||
}
|
||||
|
||||
if len(lm.latencyHistory) >= lm.config.HistorySize {
|
||||
// Remove oldest measurement
|
||||
copy(lm.latencyHistory, lm.latencyHistory[1:])
|
||||
lm.latencyHistory[len(lm.latencyHistory)-1] = measurement
|
||||
} else {
|
||||
lm.latencyHistory = append(lm.latencyHistory, measurement)
|
||||
}
|
||||
lm.historyMutex.Unlock()
|
||||
}
|
||||
|
||||
// GetMetrics returns current latency metrics
|
||||
func (lm *LatencyMonitor) GetMetrics() LatencyMetrics {
|
||||
current := atomic.LoadInt64(&lm.currentLatency)
|
||||
average := atomic.LoadInt64(&lm.averageLatency)
|
||||
min := atomic.LoadInt64(&lm.minLatency)
|
||||
max := atomic.LoadInt64(&lm.maxLatency)
|
||||
samples := atomic.LoadInt64(&lm.latencySamples)
|
||||
jitterSum := atomic.LoadInt64(&lm.jitterAccumulator)
|
||||
|
||||
var jitter time.Duration
|
||||
if samples > 0 {
|
||||
jitter = time.Duration(jitterSum / samples)
|
||||
}
|
||||
|
||||
return LatencyMetrics{
|
||||
Current: time.Duration(current),
|
||||
Average: time.Duration(average),
|
||||
Min: time.Duration(min),
|
||||
Max: time.Duration(max),
|
||||
Jitter: jitter,
|
||||
SampleCount: samples,
|
||||
Trend: lm.calculateTrend(),
|
||||
}
|
||||
}
|
||||
|
||||
// AddOptimizationCallback adds a callback for latency optimization
|
||||
func (lm *LatencyMonitor) AddOptimizationCallback(callback OptimizationCallback) {
|
||||
lm.mutex.Lock()
|
||||
lm.optimizationCallbacks = append(lm.optimizationCallbacks, callback)
|
||||
lm.mutex.Unlock()
|
||||
}
|
||||
|
||||
// monitoringLoop runs the main monitoring and optimization loop
|
||||
func (lm *LatencyMonitor) monitoringLoop() {
|
||||
defer lm.wg.Done()
|
||||
|
||||
ticker := time.NewTicker(lm.config.OptimizationInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-lm.ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
lm.runOptimization()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// runOptimization checks if optimization is needed and triggers callbacks with threshold validation.
|
||||
//
|
||||
// Validation Rules:
|
||||
// - Current latency must not exceed MaxLatency (default: 200ms)
|
||||
// - Average latency checked against adaptive threshold: TargetLatency * (1 + AdaptiveThreshold)
|
||||
// - Jitter must not exceed JitterThreshold (default: 20ms)
|
||||
// - All latency values must be non-negative durations
|
||||
//
|
||||
// Optimization Triggers:
|
||||
// - Current latency > MaxLatency: Immediate optimization needed
|
||||
// - Average latency > adaptive threshold: Gradual optimization needed
|
||||
// - Jitter > JitterThreshold: Stability optimization needed
|
||||
//
|
||||
// Threshold Calculations:
|
||||
// - Adaptive threshold = TargetLatency * (1.0 + AdaptiveThreshold)
|
||||
// - Default: 50ms * (1.0 + 0.8) = 90ms adaptive threshold
|
||||
// - Provides buffer above target before triggering optimization
|
||||
//
|
||||
// The function ensures real-time audio performance by monitoring multiple
|
||||
// latency metrics and triggering optimization callbacks when thresholds are exceeded.
|
||||
func (lm *LatencyMonitor) runOptimization() {
|
||||
metrics := lm.GetMetrics()
|
||||
|
||||
// Check if optimization is needed
|
||||
needsOptimization := false
|
||||
|
||||
// Check if current latency exceeds threshold
|
||||
if metrics.Current > lm.config.MaxLatency {
|
||||
needsOptimization = true
|
||||
lm.logger.Warn().Dur("current_latency", metrics.Current).Dur("max_latency", lm.config.MaxLatency).Msg("latency exceeds maximum threshold")
|
||||
}
|
||||
|
||||
// Check if average latency is above adaptive threshold
|
||||
adaptiveThreshold := time.Duration(float64(lm.config.TargetLatency.Nanoseconds()) * (1.0 + lm.config.AdaptiveThreshold))
|
||||
if metrics.Average > adaptiveThreshold {
|
||||
needsOptimization = true
|
||||
}
|
||||
|
||||
// Check if jitter is too high
|
||||
if metrics.Jitter > lm.config.JitterThreshold {
|
||||
needsOptimization = true
|
||||
}
|
||||
|
||||
if needsOptimization {
|
||||
atomic.StoreInt64(&lm.lastOptimization, time.Now().UnixNano())
|
||||
|
||||
// Run optimization callbacks
|
||||
lm.mutex.RLock()
|
||||
callbacks := make([]OptimizationCallback, len(lm.optimizationCallbacks))
|
||||
copy(callbacks, lm.optimizationCallbacks)
|
||||
lm.mutex.RUnlock()
|
||||
|
||||
for _, callback := range callbacks {
|
||||
if err := callback(metrics); err != nil {
|
||||
lm.logger.Error().Err(err).Msg("optimization callback failed")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// calculateTrend analyzes recent latency measurements to determine trend
|
||||
func (lm *LatencyMonitor) calculateTrend() LatencyTrend {
|
||||
lm.historyMutex.RLock()
|
||||
defer lm.historyMutex.RUnlock()
|
||||
|
||||
if len(lm.latencyHistory) < 10 {
|
||||
return LatencyTrendStable
|
||||
}
|
||||
|
||||
// Analyze last 10 measurements
|
||||
recentMeasurements := lm.latencyHistory[len(lm.latencyHistory)-10:]
|
||||
|
||||
var increasing, decreasing int
|
||||
for i := 1; i < len(recentMeasurements); i++ {
|
||||
if recentMeasurements[i].Latency > recentMeasurements[i-1].Latency {
|
||||
increasing++
|
||||
} else if recentMeasurements[i].Latency < recentMeasurements[i-1].Latency {
|
||||
decreasing++
|
||||
}
|
||||
}
|
||||
|
||||
// Determine trend based on direction changes
|
||||
if increasing > 6 {
|
||||
return LatencyTrendIncreasing
|
||||
} else if decreasing > 6 {
|
||||
return LatencyTrendDecreasing
|
||||
} else if increasing+decreasing > 7 {
|
||||
return LatencyTrendVolatile
|
||||
}
|
||||
|
||||
return LatencyTrendStable
|
||||
}
|
||||
|
||||
// GetLatencyHistory returns a copy of recent latency measurements
|
||||
func (lm *LatencyMonitor) GetLatencyHistory() []LatencyMeasurement {
|
||||
lm.historyMutex.RLock()
|
||||
defer lm.historyMutex.RUnlock()
|
||||
|
||||
history := make([]LatencyMeasurement, len(lm.latencyHistory))
|
||||
copy(history, lm.latencyHistory)
|
||||
return history
|
||||
}
|
|
@ -330,6 +330,11 @@ func GetMicrophoneConfig() AudioConfig {
|
|||
return currentMicrophoneConfig
|
||||
}
|
||||
|
||||
// GetGlobalAudioMetrics returns the current global audio metrics
|
||||
func GetGlobalAudioMetrics() AudioMetrics {
|
||||
return metrics
|
||||
}
|
||||
|
||||
// Batched metrics to reduce atomic operations frequency
|
||||
var (
|
||||
batchedFramesReceived int64
|
||||
|
|
Loading…
Reference in New Issue