diff --git a/internal/audio/adaptive_optimizer.go b/internal/audio/adaptive_optimizer.go index 89fdf70..eb03b75 100644 --- a/internal/audio/adaptive_optimizer.go +++ b/internal/audio/adaptive_optimizer.go @@ -45,7 +45,7 @@ func DefaultOptimizerConfig() OptimizerConfig { CooldownPeriod: GetConfig().CooldownPeriod, Aggressiveness: GetConfig().OptimizerAggressiveness, RollbackThreshold: GetConfig().RollbackThreshold, - StabilityPeriod: 10 * time.Second, + StabilityPeriod: GetConfig().AdaptiveOptimizerStability, } } diff --git a/internal/audio/atomic_utils.go b/internal/audio/atomic_utils.go new file mode 100644 index 0000000..0a89819 --- /dev/null +++ b/internal/audio/atomic_utils.go @@ -0,0 +1,204 @@ +package audio + +import ( + "sync/atomic" + "time" +) + +// AtomicCounter provides thread-safe counter operations +type AtomicCounter struct { + value int64 +} + +// NewAtomicCounter creates a new atomic counter +func NewAtomicCounter() *AtomicCounter { + return &AtomicCounter{} +} + +// Add atomically adds delta to the counter and returns the new value +func (c *AtomicCounter) Add(delta int64) int64 { + return atomic.AddInt64(&c.value, delta) +} + +// Increment atomically increments the counter by 1 +func (c *AtomicCounter) Increment() int64 { + return atomic.AddInt64(&c.value, 1) +} + +// Load atomically loads the counter value +func (c *AtomicCounter) Load() int64 { + return atomic.LoadInt64(&c.value) +} + +// Store atomically stores a new value +func (c *AtomicCounter) Store(value int64) { + atomic.StoreInt64(&c.value, value) +} + +// Reset atomically resets the counter to zero +func (c *AtomicCounter) Reset() { + atomic.StoreInt64(&c.value, 0) +} + +// Swap atomically swaps the value and returns the old value +func (c *AtomicCounter) Swap(new int64) int64 { + return atomic.SwapInt64(&c.value, new) +} + +// FrameMetrics provides common frame tracking metrics +type FrameMetrics struct { + Total *AtomicCounter + Dropped *AtomicCounter + Bytes *AtomicCounter +} + +// 
NewFrameMetrics creates a new frame metrics tracker +func NewFrameMetrics() *FrameMetrics { + return &FrameMetrics{ + Total: NewAtomicCounter(), + Dropped: NewAtomicCounter(), + Bytes: NewAtomicCounter(), + } +} + +// RecordFrame atomically records a successful frame with its size +func (fm *FrameMetrics) RecordFrame(size int64) { + fm.Total.Increment() + fm.Bytes.Add(size) +} + +// RecordDrop atomically records a dropped frame +func (fm *FrameMetrics) RecordDrop() { + fm.Dropped.Increment() +} + +// GetStats returns current metrics values +func (fm *FrameMetrics) GetStats() (total, dropped, bytes int64) { + return fm.Total.Load(), fm.Dropped.Load(), fm.Bytes.Load() +} + +// Reset resets all metrics to zero +func (fm *FrameMetrics) Reset() { + fm.Total.Reset() + fm.Dropped.Reset() + fm.Bytes.Reset() +} + +// GetDropRate calculates the drop rate as a percentage +func (fm *FrameMetrics) GetDropRate() float64 { + total := fm.Total.Load() + if total == 0 { + return 0.0 + } + dropped := fm.Dropped.Load() + return float64(dropped) / float64(total) * 100.0 +} + +// LatencyTracker provides atomic latency tracking +type LatencyTracker struct { + current *AtomicCounter + min *AtomicCounter + max *AtomicCounter + average *AtomicCounter + samples *AtomicCounter +} + +// NewLatencyTracker creates a new latency tracker +func NewLatencyTracker() *LatencyTracker { + lt := &LatencyTracker{ + current: NewAtomicCounter(), + min: NewAtomicCounter(), + max: NewAtomicCounter(), + average: NewAtomicCounter(), + samples: NewAtomicCounter(), + } + // Initialize min to max value so first measurement sets it properly + lt.min.Store(int64(^uint64(0) >> 1)) // Max int64 + return lt +} + +// RecordLatency atomically records a new latency measurement +func (lt *LatencyTracker) RecordLatency(latency time.Duration) { + latencyNanos := latency.Nanoseconds() + lt.current.Store(latencyNanos) + lt.samples.Increment() + + // Update min + for { + oldMin := lt.min.Load() + if latencyNanos >= oldMin { + 
break + } + if atomic.CompareAndSwapInt64(&lt.min.value, oldMin, latencyNanos) { + break + } + } + + // Update max + for { + oldMax := lt.max.Load() + if latencyNanos <= oldMax { + break + } + if atomic.CompareAndSwapInt64(&lt.max.value, oldMax, latencyNanos) { + break + } + } + + // Update average using exponential moving average + oldAvg := lt.average.Load() + newAvg := (oldAvg*7 + latencyNanos) / 8 // 87.5% weight to old average + lt.average.Store(newAvg) +} + +// GetLatencyStats returns current latency statistics +func (lt *LatencyTracker) GetLatencyStats() (current, min, max, average time.Duration, samples int64) { + return time.Duration(lt.current.Load()), + time.Duration(lt.min.Load()), + time.Duration(lt.max.Load()), + time.Duration(lt.average.Load()), + lt.samples.Load() +} + +// PoolMetrics provides common pool performance metrics +type PoolMetrics struct { + Hits *AtomicCounter + Misses *AtomicCounter +} + +// NewPoolMetrics creates a new pool metrics tracker +func NewPoolMetrics() *PoolMetrics { + return &PoolMetrics{ + Hits: NewAtomicCounter(), + Misses: NewAtomicCounter(), + } +} + +// RecordHit atomically records a pool hit +func (pm *PoolMetrics) RecordHit() { + pm.Hits.Increment() +} + +// RecordMiss atomically records a pool miss +func (pm *PoolMetrics) RecordMiss() { + pm.Misses.Increment() +} + +// GetHitRate calculates the hit rate as a percentage +func (pm *PoolMetrics) GetHitRate() float64 { + hits := pm.Hits.Load() + misses := pm.Misses.Load() + total := hits + misses + if total == 0 { + return 0.0 + } + return float64(hits) / float64(total) * 100.0 +} + +// GetStats returns hit and miss counts +func (pm *PoolMetrics) GetStats() (hits, misses int64, hitRate float64) { + hits = pm.Hits.Load() + misses = pm.Misses.Load() + hitRate = pm.GetHitRate() + return +} diff --git a/internal/audio/cgo_audio.go b/internal/audio/cgo_audio.go index 5877d77..3b222ba 100644 --- a/internal/audio/cgo_audio.go +++ b/internal/audio/cgo_audio.go @@ -61,12 +61,15 @@ 
static volatile int capture_initialized = 0; static volatile int playback_initializing = 0; static volatile int playback_initialized = 0; -// Safe ALSA device opening with retry logic +// Enhanced ALSA device opening with exponential backoff retry logic static int safe_alsa_open(snd_pcm_t **handle, const char *device, snd_pcm_stream_t stream) { - int attempts = 3; + int max_attempts = 5; // Increased from 3 to 5 + int attempt = 0; int err; + int backoff_us = sleep_microseconds; // Start with base sleep time + const int max_backoff_us = 500000; // Max 500ms backoff - while (attempts-- > 0) { + while (attempt < max_attempts) { err = snd_pcm_open(handle, device, stream, SND_PCM_NONBLOCK); if (err >= 0) { // Switch to blocking mode after successful open @@ -74,12 +77,26 @@ static int safe_alsa_open(snd_pcm_t **handle, const char *device, snd_pcm_stream return 0; } - if (err == -EBUSY && attempts > 0) { - // Device busy, wait and retry - usleep(sleep_microseconds); // 50ms - continue; + attempt++; + if (attempt >= max_attempts) break; + + // Enhanced error handling with specific retry strategies + if (err == -EBUSY || err == -EAGAIN) { + // Device busy or temporarily unavailable - retry with backoff + usleep(backoff_us); + backoff_us = (backoff_us * 2 < max_backoff_us) ? backoff_us * 2 : max_backoff_us; + } else if (err == -ENODEV || err == -ENOENT) { + // Device not found - longer wait as device might be initializing + usleep(backoff_us * 2); + backoff_us = (backoff_us * 2 < max_backoff_us) ? backoff_us * 2 : max_backoff_us; + } else if (err == -EPERM || err == -EACCES) { + // Permission denied - shorter wait, likely persistent issue + usleep(backoff_us / 2); + } else { + // Other errors - standard backoff + usleep(backoff_us); + backoff_us = (backoff_us * 2 < max_backoff_us) ? 
backoff_us * 2 : max_backoff_us; } - break; } return err; } @@ -217,43 +234,90 @@ int jetkvm_audio_init() { return 0; } -// Read and encode one frame with enhanced error handling +// Read and encode one frame with robust error handling and recovery int jetkvm_audio_read_encode(void *opus_buf) { short pcm_buffer[1920]; // max 2ch*960 unsigned char *out = (unsigned char*)opus_buf; int err = 0; + int recovery_attempts = 0; + const int max_recovery_attempts = 3; // Safety checks if (!capture_initialized || !pcm_handle || !encoder || !opus_buf) { return -1; } +retry_read: + ; int pcm_rc = snd_pcm_readi(pcm_handle, pcm_buffer, frame_size); - // Handle ALSA errors with enhanced recovery + // Handle ALSA errors with robust recovery strategies if (pcm_rc < 0) { if (pcm_rc == -EPIPE) { - // Buffer underrun - try to recover - err = snd_pcm_prepare(pcm_handle); - if (err < 0) return -1; + // Buffer underrun - implement progressive recovery + recovery_attempts++; + if (recovery_attempts > max_recovery_attempts) { + return -1; // Give up after max attempts + } - pcm_rc = snd_pcm_readi(pcm_handle, pcm_buffer, frame_size); - if (pcm_rc < 0) return -1; + // Try to recover with prepare + err = snd_pcm_prepare(pcm_handle); + if (err < 0) { + // If prepare fails, try drop and prepare + snd_pcm_drop(pcm_handle); + err = snd_pcm_prepare(pcm_handle); + if (err < 0) return -1; + } + + // Wait before retry to allow device to stabilize + usleep(sleep_microseconds * recovery_attempts); + goto retry_read; } else if (pcm_rc == -EAGAIN) { // No data available - return 0 to indicate no frame return 0; } else if (pcm_rc == -ESTRPIPE) { - // Device suspended, try to resume - while ((err = snd_pcm_resume(pcm_handle)) == -EAGAIN) { - usleep(sleep_microseconds); // Use centralized constant + // Device suspended, implement robust resume logic + recovery_attempts++; + if (recovery_attempts > max_recovery_attempts) { + return -1; + } + + // Try to resume with timeout + int resume_attempts = 0; + while 
((err = snd_pcm_resume(pcm_handle)) == -EAGAIN && resume_attempts < 10) { + usleep(sleep_microseconds); + resume_attempts++; } if (err < 0) { + // Resume failed, try prepare as fallback err = snd_pcm_prepare(pcm_handle); if (err < 0) return -1; } - return 0; // Skip this frame + // Wait before retry to allow device to stabilize + usleep(sleep_microseconds * recovery_attempts); + return 0; // Skip this frame but don't fail + } else if (pcm_rc == -ENODEV) { + // Device disconnected - critical error + return -1; + } else if (pcm_rc == -EIO) { + // I/O error - try recovery once + recovery_attempts++; + if (recovery_attempts <= max_recovery_attempts) { + snd_pcm_drop(pcm_handle); + err = snd_pcm_prepare(pcm_handle); + if (err >= 0) { + usleep(sleep_microseconds); + goto retry_read; + } + } + return -1; } else { - // Other error - return error code + // Other errors - limited retry for transient issues + recovery_attempts++; + if (recovery_attempts <= 1 && (pcm_rc == -EINTR || pcm_rc == -EBUSY)) { + usleep(sleep_microseconds / 2); + goto retry_read; + } return -1; } } @@ -327,11 +391,13 @@ int jetkvm_audio_playback_init() { return 0; } -// Decode Opus and write PCM with enhanced error handling +// Decode Opus and write PCM with robust error handling and recovery int jetkvm_audio_decode_write(void *opus_buf, int opus_size) { short pcm_buffer[1920]; // max 2ch*960 unsigned char *in = (unsigned char*)opus_buf; int err = 0; + int recovery_attempts = 0; + const int max_recovery_attempts = 3; // Safety checks if (!playback_initialized || !pcm_playback_handle || !decoder || !opus_buf || opus_size <= 0) { @@ -343,31 +409,91 @@ int jetkvm_audio_decode_write(void *opus_buf, int opus_size) { return -1; } - // Decode Opus to PCM + // Decode Opus to PCM with error handling int pcm_frames = opus_decode(decoder, in, opus_size, pcm_buffer, frame_size, 0); - if (pcm_frames < 0) return -1; + if (pcm_frames < 0) { + // Try packet loss concealment on decode error + pcm_frames = 
opus_decode(decoder, NULL, 0, pcm_buffer, frame_size, 0); + if (pcm_frames < 0) return -1; + } - // Write PCM to playback device with enhanced recovery +retry_write: + ; + // Write PCM to playback device with robust recovery int pcm_rc = snd_pcm_writei(pcm_playback_handle, pcm_buffer, pcm_frames); if (pcm_rc < 0) { if (pcm_rc == -EPIPE) { - // Buffer underrun - try to recover - err = snd_pcm_prepare(pcm_playback_handle); - if (err < 0) return -2; - - pcm_rc = snd_pcm_writei(pcm_playback_handle, pcm_buffer, pcm_frames); - } else if (pcm_rc == -ESTRPIPE) { - // Device suspended, try to resume - while ((err = snd_pcm_resume(pcm_playback_handle)) == -EAGAIN) { - usleep(sleep_microseconds); // Use centralized constant + // Buffer underrun - implement progressive recovery + recovery_attempts++; + if (recovery_attempts > max_recovery_attempts) { + return -2; } + + // Try to recover with prepare + err = snd_pcm_prepare(pcm_playback_handle); if (err < 0) { + // If prepare fails, try drop and prepare + snd_pcm_drop(pcm_playback_handle); err = snd_pcm_prepare(pcm_playback_handle); if (err < 0) return -2; } - return 0; // Skip this frame + + // Wait before retry to allow device to stabilize + usleep(sleep_microseconds * recovery_attempts); + goto retry_write; + } else if (pcm_rc == -ESTRPIPE) { + // Device suspended, implement robust resume logic + recovery_attempts++; + if (recovery_attempts > max_recovery_attempts) { + return -2; + } + + // Try to resume with timeout + int resume_attempts = 0; + while ((err = snd_pcm_resume(pcm_playback_handle)) == -EAGAIN && resume_attempts < 10) { + usleep(sleep_microseconds); + resume_attempts++; + } + if (err < 0) { + // Resume failed, try prepare as fallback + err = snd_pcm_prepare(pcm_playback_handle); + if (err < 0) return -2; + } + // Wait before retry to allow device to stabilize + usleep(sleep_microseconds * recovery_attempts); + return 0; // Skip this frame but don't fail + } else if (pcm_rc == -ENODEV) { + // Device disconnected 
- critical error + return -2; + } else if (pcm_rc == -EIO) { + // I/O error - try recovery once + recovery_attempts++; + if (recovery_attempts <= max_recovery_attempts) { + snd_pcm_drop(pcm_playback_handle); + err = snd_pcm_prepare(pcm_playback_handle); + if (err >= 0) { + usleep(sleep_microseconds); + goto retry_write; + } + } + return -2; + } else if (pcm_rc == -EAGAIN) { + // Device not ready - brief wait and retry + recovery_attempts++; + if (recovery_attempts <= max_recovery_attempts) { + usleep(sleep_microseconds / 4); + goto retry_write; + } + return -2; + } else { + // Other errors - limited retry for transient issues + recovery_attempts++; + if (recovery_attempts <= 1 && (pcm_rc == -EINTR || pcm_rc == -EBUSY)) { + usleep(sleep_microseconds / 2); + goto retry_write; + } + return -2; } - if (pcm_rc < 0) return -2; } return pcm_frames; diff --git a/internal/audio/config_constants.go b/internal/audio/config_constants.go index e5d9b4c..1369541 100644 --- a/internal/audio/config_constants.go +++ b/internal/audio/config_constants.go @@ -1540,6 +1540,49 @@ type AudioConfigConstants struct { // Impact: Prevents excessive channel counts that could impact performance. // Default 8 channels provides reasonable upper bound for multi-channel audio. MaxChannels int + + // Device Health Monitoring Configuration + // Used in: device_health.go for proactive device monitoring and recovery + // Impact: Controls health check frequency and recovery thresholds + + // HealthCheckIntervalMS defines interval between device health checks in milliseconds. + // Used in: DeviceHealthMonitor for periodic health assessment + // Impact: Lower values provide faster detection but increase CPU usage. + // Default 5000ms (5s) provides good balance between responsiveness and overhead. + HealthCheckIntervalMS int + + // HealthRecoveryThreshold defines number of consecutive successful operations + // required to mark a device as healthy after being unhealthy. 
+ // Used in: DeviceHealthMonitor for recovery state management + // Impact: Higher values prevent premature recovery declarations. + // Default 3 consecutive successes ensures stable recovery. + HealthRecoveryThreshold int + + // HealthLatencyThresholdMS defines maximum acceptable latency in milliseconds + // before considering a device unhealthy. + // Used in: DeviceHealthMonitor for latency-based health assessment + // Impact: Lower values trigger recovery sooner but may cause false positives. + // Default 100ms provides reasonable threshold for real-time audio. + HealthLatencyThresholdMS int + + // HealthErrorRateLimit defines maximum error rate (0.0-1.0) before + // considering a device unhealthy. + // Used in: DeviceHealthMonitor for error rate assessment + // Impact: Lower values trigger recovery sooner for error-prone devices. + // Default 0.1 (10%) allows some transient errors while detecting problems. + HealthErrorRateLimit float64 + + // Latency Histogram Bucket Configuration + // Used in: LatencyHistogram for granular latency measurement buckets + // Impact: Defines the boundaries for latency distribution analysis + LatencyBucket10ms time.Duration // 10ms latency bucket + LatencyBucket25ms time.Duration // 25ms latency bucket + LatencyBucket50ms time.Duration // 50ms latency bucket + LatencyBucket100ms time.Duration // 100ms latency bucket + LatencyBucket250ms time.Duration // 250ms latency bucket + LatencyBucket500ms time.Duration // 500ms latency bucket + LatencyBucket1s time.Duration // 1s latency bucket + LatencyBucket2s time.Duration // 2s latency bucket } // DefaultAudioConfig returns the default configuration constants @@ -2563,6 +2606,22 @@ func DefaultAudioConfig() *AudioConfigConstants { MinSampleRate: 8000, // 8kHz minimum sample rate MaxSampleRate: 48000, // 48kHz maximum sample rate MaxChannels: 8, // 8 maximum audio channels + + // Device Health Monitoring Configuration + HealthCheckIntervalMS: 5000, // 5000ms (5s) health check interval + 
HealthRecoveryThreshold: 3, // 3 consecutive successes for recovery + HealthLatencyThresholdMS: 100, // 100ms latency threshold for health + HealthErrorRateLimit: 0.1, // 10% error rate limit for health + + // Latency Histogram Bucket Configuration + LatencyBucket10ms: 10 * time.Millisecond, // 10ms latency bucket + LatencyBucket25ms: 25 * time.Millisecond, // 25ms latency bucket + LatencyBucket50ms: 50 * time.Millisecond, // 50ms latency bucket + LatencyBucket100ms: 100 * time.Millisecond, // 100ms latency bucket + LatencyBucket250ms: 250 * time.Millisecond, // 250ms latency bucket + LatencyBucket500ms: 500 * time.Millisecond, // 500ms latency bucket + LatencyBucket1s: 1 * time.Second, // 1s latency bucket + LatencyBucket2s: 2 * time.Second, // 2s latency bucket } } diff --git a/internal/audio/device_health.go b/internal/audio/device_health.go new file mode 100644 index 0000000..8ed1c78 --- /dev/null +++ b/internal/audio/device_health.go @@ -0,0 +1,514 @@ +package audio + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/jetkvm/kvm/internal/logging" + "github.com/rs/zerolog" +) + +// DeviceHealthStatus represents the health status of an audio device +type DeviceHealthStatus int + +const ( + DeviceHealthUnknown DeviceHealthStatus = iota + DeviceHealthHealthy + DeviceHealthDegraded + DeviceHealthFailing + DeviceHealthCritical +) + +func (s DeviceHealthStatus) String() string { + switch s { + case DeviceHealthHealthy: + return "healthy" + case DeviceHealthDegraded: + return "degraded" + case DeviceHealthFailing: + return "failing" + case DeviceHealthCritical: + return "critical" + default: + return "unknown" + } +} + +// DeviceHealthMetrics tracks health-related metrics for audio devices +type DeviceHealthMetrics struct { + // Error tracking + ConsecutiveErrors int64 `json:"consecutive_errors"` + TotalErrors int64 `json:"total_errors"` + LastErrorTime time.Time `json:"last_error_time"` + ErrorRate float64 `json:"error_rate"` // errors 
per minute + + // Performance metrics + AverageLatency time.Duration `json:"average_latency"` + MaxLatency time.Duration `json:"max_latency"` + LatencySpikes int64 `json:"latency_spikes"` + Underruns int64 `json:"underruns"` + Overruns int64 `json:"overruns"` + + // Device availability + LastSuccessfulOp time.Time `json:"last_successful_op"` + DeviceDisconnects int64 `json:"device_disconnects"` + RecoveryAttempts int64 `json:"recovery_attempts"` + SuccessfulRecoveries int64 `json:"successful_recoveries"` + + // Health assessment + CurrentStatus DeviceHealthStatus `json:"current_status"` + StatusLastChanged time.Time `json:"status_last_changed"` + HealthScore float64 `json:"health_score"` // 0.0 to 1.0 +} + +// DeviceHealthMonitor monitors the health of audio devices and triggers recovery +type DeviceHealthMonitor struct { + // Atomic fields first for ARM32 alignment + running int32 + monitoringEnabled int32 + + // Configuration + checkInterval time.Duration + recoveryThreshold int + latencyThreshold time.Duration + errorRateLimit float64 // max errors per minute + + // State tracking + captureMetrics *DeviceHealthMetrics + playbackMetrics *DeviceHealthMetrics + mutex sync.RWMutex + + // Control channels + ctx context.Context + cancel context.CancelFunc + stopChan chan struct{} + doneChan chan struct{} + + // Recovery callbacks + recoveryCallbacks map[string]func() error + callbackMutex sync.RWMutex + + // Logging + logger zerolog.Logger + config *AudioConfigConstants +} + +// NewDeviceHealthMonitor creates a new device health monitor +func NewDeviceHealthMonitor() *DeviceHealthMonitor { + ctx, cancel := context.WithCancel(context.Background()) + config := GetConfig() + + return &DeviceHealthMonitor{ + checkInterval: time.Duration(config.HealthCheckIntervalMS) * time.Millisecond, + recoveryThreshold: config.HealthRecoveryThreshold, + latencyThreshold: time.Duration(config.HealthLatencyThresholdMS) * time.Millisecond, + errorRateLimit: config.HealthErrorRateLimit, + 
captureMetrics: &DeviceHealthMetrics{ + CurrentStatus: DeviceHealthUnknown, + HealthScore: 1.0, + }, + playbackMetrics: &DeviceHealthMetrics{ + CurrentStatus: DeviceHealthUnknown, + HealthScore: 1.0, + }, + ctx: ctx, + cancel: cancel, + stopChan: make(chan struct{}), + doneChan: make(chan struct{}), + recoveryCallbacks: make(map[string]func() error), + logger: logging.GetDefaultLogger().With().Str("component", "device-health-monitor").Logger(), + config: config, + } +} + +// Start begins health monitoring +func (dhm *DeviceHealthMonitor) Start() error { + if !atomic.CompareAndSwapInt32(&dhm.running, 0, 1) { + return fmt.Errorf("device health monitor already running") + } + + dhm.logger.Info().Msg("starting device health monitor") + atomic.StoreInt32(&dhm.monitoringEnabled, 1) + + go dhm.monitoringLoop() + return nil +} + +// Stop stops health monitoring +func (dhm *DeviceHealthMonitor) Stop() { + if !atomic.CompareAndSwapInt32(&dhm.running, 1, 0) { + return + } + + dhm.logger.Info().Msg("stopping device health monitor") + atomic.StoreInt32(&dhm.monitoringEnabled, 0) + + close(dhm.stopChan) + dhm.cancel() + + // Wait for monitoring loop to finish + select { + case <-dhm.doneChan: + dhm.logger.Info().Msg("device health monitor stopped") + case <-time.After(time.Duration(dhm.config.SupervisorTimeout)): + dhm.logger.Warn().Msg("device health monitor stop timeout") + } +} + +// RegisterRecoveryCallback registers a recovery function for a specific component +func (dhm *DeviceHealthMonitor) RegisterRecoveryCallback(component string, callback func() error) { + dhm.callbackMutex.Lock() + defer dhm.callbackMutex.Unlock() + dhm.recoveryCallbacks[component] = callback + dhm.logger.Info().Str("component", component).Msg("registered recovery callback") +} + +// RecordError records an error for health tracking +func (dhm *DeviceHealthMonitor) RecordError(deviceType string, err error) { + if atomic.LoadInt32(&dhm.monitoringEnabled) == 0 { + return + } + + dhm.mutex.Lock() + defer 
dhm.mutex.Unlock() + + var metrics *DeviceHealthMetrics + switch deviceType { + case "capture": + metrics = dhm.captureMetrics + case "playback": + metrics = dhm.playbackMetrics + default: + dhm.logger.Warn().Str("device_type", deviceType).Msg("unknown device type for error recording") + return + } + + atomic.AddInt64(&metrics.ConsecutiveErrors, 1) + atomic.AddInt64(&metrics.TotalErrors, 1) + metrics.LastErrorTime = time.Now() + + // Update error rate (errors per minute) + if !metrics.LastErrorTime.IsZero() { + timeSinceFirst := time.Since(metrics.LastErrorTime) + if timeSinceFirst > 0 { + metrics.ErrorRate = float64(metrics.TotalErrors) / timeSinceFirst.Minutes() + } + } + + dhm.logger.Debug(). + Str("device_type", deviceType). + Err(err). + Int64("consecutive_errors", metrics.ConsecutiveErrors). + Float64("error_rate", metrics.ErrorRate). + Msg("recorded device error") + + // Trigger immediate health assessment + dhm.assessDeviceHealth(deviceType, metrics) +} + +// RecordSuccess records a successful operation +func (dhm *DeviceHealthMonitor) RecordSuccess(deviceType string) { + if atomic.LoadInt32(&dhm.monitoringEnabled) == 0 { + return + } + + dhm.mutex.Lock() + defer dhm.mutex.Unlock() + + var metrics *DeviceHealthMetrics + switch deviceType { + case "capture": + metrics = dhm.captureMetrics + case "playback": + metrics = dhm.playbackMetrics + default: + return + } + + // Reset consecutive errors on success + atomic.StoreInt64(&metrics.ConsecutiveErrors, 0) + metrics.LastSuccessfulOp = time.Now() + + // Improve health score gradually + if metrics.HealthScore < 1.0 { + metrics.HealthScore = min(1.0, metrics.HealthScore+0.1) + } +} + +// RecordLatency records operation latency for health assessment +func (dhm *DeviceHealthMonitor) RecordLatency(deviceType string, latency time.Duration) { + if atomic.LoadInt32(&dhm.monitoringEnabled) == 0 { + return + } + + dhm.mutex.Lock() + defer dhm.mutex.Unlock() + + var metrics *DeviceHealthMetrics + switch deviceType { + 
case "capture": + metrics = dhm.captureMetrics + case "playback": + metrics = dhm.playbackMetrics + default: + return + } + + // Update latency metrics + if metrics.AverageLatency == 0 { + metrics.AverageLatency = latency + } else { + // Exponential moving average + metrics.AverageLatency = time.Duration(float64(metrics.AverageLatency)*0.9 + float64(latency)*0.1) + } + + if latency > metrics.MaxLatency { + metrics.MaxLatency = latency + } + + // Track latency spikes + if latency > dhm.latencyThreshold { + atomic.AddInt64(&metrics.LatencySpikes, 1) + } +} + +// RecordUnderrun records an audio underrun event +func (dhm *DeviceHealthMonitor) RecordUnderrun(deviceType string) { + if atomic.LoadInt32(&dhm.monitoringEnabled) == 0 { + return + } + + dhm.mutex.Lock() + defer dhm.mutex.Unlock() + + var metrics *DeviceHealthMetrics + switch deviceType { + case "capture": + metrics = dhm.captureMetrics + case "playback": + metrics = dhm.playbackMetrics + default: + return + } + + atomic.AddInt64(&metrics.Underruns, 1) + dhm.logger.Debug().Str("device_type", deviceType).Msg("recorded audio underrun") +} + +// RecordOverrun records an audio overrun event +func (dhm *DeviceHealthMonitor) RecordOverrun(deviceType string) { + if atomic.LoadInt32(&dhm.monitoringEnabled) == 0 { + return + } + + dhm.mutex.Lock() + defer dhm.mutex.Unlock() + + var metrics *DeviceHealthMetrics + switch deviceType { + case "capture": + metrics = dhm.captureMetrics + case "playback": + metrics = dhm.playbackMetrics + default: + return + } + + atomic.AddInt64(&metrics.Overruns, 1) + dhm.logger.Debug().Str("device_type", deviceType).Msg("recorded audio overrun") +} + +// GetHealthMetrics returns current health metrics +func (dhm *DeviceHealthMonitor) GetHealthMetrics() (capture, playback DeviceHealthMetrics) { + dhm.mutex.RLock() + defer dhm.mutex.RUnlock() + return *dhm.captureMetrics, *dhm.playbackMetrics +} + +// monitoringLoop runs the main health monitoring loop +func (dhm *DeviceHealthMonitor) 
monitoringLoop() { + defer close(dhm.doneChan) + + ticker := time.NewTicker(dhm.checkInterval) + defer ticker.Stop() + + for { + select { + case <-dhm.stopChan: + return + case <-dhm.ctx.Done(): + return + case <-ticker.C: + dhm.performHealthCheck() + } + } +} + +// performHealthCheck performs a comprehensive health check +func (dhm *DeviceHealthMonitor) performHealthCheck() { + dhm.mutex.Lock() + defer dhm.mutex.Unlock() + + // Assess health for both devices + dhm.assessDeviceHealth("capture", dhm.captureMetrics) + dhm.assessDeviceHealth("playback", dhm.playbackMetrics) + + // Check if recovery is needed + dhm.checkRecoveryNeeded("capture", dhm.captureMetrics) + dhm.checkRecoveryNeeded("playback", dhm.playbackMetrics) +} + +// assessDeviceHealth assesses the health status of a device +func (dhm *DeviceHealthMonitor) assessDeviceHealth(deviceType string, metrics *DeviceHealthMetrics) { + previousStatus := metrics.CurrentStatus + newStatus := dhm.calculateHealthStatus(metrics) + + if newStatus != previousStatus { + metrics.CurrentStatus = newStatus + metrics.StatusLastChanged = time.Now() + dhm.logger.Info(). + Str("device_type", deviceType). + Str("previous_status", previousStatus.String()). + Str("new_status", newStatus.String()). + Float64("health_score", metrics.HealthScore). 
+ Msg("device health status changed") + } + + // Update health score + metrics.HealthScore = dhm.calculateHealthScore(metrics) +} + +// calculateHealthStatus determines health status based on metrics +func (dhm *DeviceHealthMonitor) calculateHealthStatus(metrics *DeviceHealthMetrics) DeviceHealthStatus { + consecutiveErrors := atomic.LoadInt64(&metrics.ConsecutiveErrors) + totalErrors := atomic.LoadInt64(&metrics.TotalErrors) + + // Critical: Too many consecutive errors or device disconnected recently + if consecutiveErrors >= int64(dhm.recoveryThreshold) { + return DeviceHealthCritical + } + + // Critical: No successful operations in a long time + if !metrics.LastSuccessfulOp.IsZero() && time.Since(metrics.LastSuccessfulOp) > time.Duration(dhm.config.SupervisorTimeout) { + return DeviceHealthCritical + } + + // Failing: High error rate or frequent latency spikes + if metrics.ErrorRate > dhm.errorRateLimit || atomic.LoadInt64(&metrics.LatencySpikes) > int64(dhm.config.MaxDroppedFrames) { + return DeviceHealthFailing + } + + // Degraded: Some errors or performance issues + if consecutiveErrors > 0 || totalErrors > int64(dhm.config.MaxDroppedFrames/2) || metrics.AverageLatency > dhm.latencyThreshold { + return DeviceHealthDegraded + } + + // Healthy: No significant issues + return DeviceHealthHealthy +} + +// calculateHealthScore calculates a numeric health score (0.0 to 1.0) +func (dhm *DeviceHealthMonitor) calculateHealthScore(metrics *DeviceHealthMetrics) float64 { + score := 1.0 + + // Penalize consecutive errors + consecutiveErrors := atomic.LoadInt64(&metrics.ConsecutiveErrors) + if consecutiveErrors > 0 { + score -= float64(consecutiveErrors) * 0.1 + } + + // Penalize high error rate + if metrics.ErrorRate > 0 { + score -= min(0.5, metrics.ErrorRate/dhm.errorRateLimit*0.5) + } + + // Penalize high latency + if metrics.AverageLatency > dhm.latencyThreshold { + excess := float64(metrics.AverageLatency-dhm.latencyThreshold) / float64(dhm.latencyThreshold) + score 
-= min(0.3, excess*0.3) + } + + // Penalize underruns/overruns + underruns := atomic.LoadInt64(&metrics.Underruns) + overruns := atomic.LoadInt64(&metrics.Overruns) + if underruns+overruns > 0 { + score -= min(0.2, float64(underruns+overruns)*0.01) + } + + return max(0.0, score) +} + +// checkRecoveryNeeded checks if recovery is needed and triggers it +func (dhm *DeviceHealthMonitor) checkRecoveryNeeded(deviceType string, metrics *DeviceHealthMetrics) { + if metrics.CurrentStatus == DeviceHealthCritical { + dhm.triggerRecovery(deviceType, metrics) + } +} + +// triggerRecovery triggers recovery for a device +func (dhm *DeviceHealthMonitor) triggerRecovery(deviceType string, metrics *DeviceHealthMetrics) { + atomic.AddInt64(&metrics.RecoveryAttempts, 1) + + dhm.logger.Warn(). + Str("device_type", deviceType). + Str("status", metrics.CurrentStatus.String()). + Int64("consecutive_errors", atomic.LoadInt64(&metrics.ConsecutiveErrors)). + Float64("error_rate", metrics.ErrorRate). + Msg("triggering device recovery") + + // Try registered recovery callbacks + dhm.callbackMutex.RLock() + defer dhm.callbackMutex.RUnlock() + + for component, callback := range dhm.recoveryCallbacks { + if callback != nil { + go func(comp string, cb func() error) { + if err := cb(); err != nil { + dhm.logger.Error(). + Str("component", comp). + Str("device_type", deviceType). + Err(err). + Msg("recovery callback failed") + } else { + atomic.AddInt64(&metrics.SuccessfulRecoveries, 1) + dhm.logger.Info(). + Str("component", comp). + Str("device_type", deviceType). 
+ Msg("recovery callback succeeded") + } + }(component, callback) + } + } +} + +// Global device health monitor instance +var ( + globalDeviceHealthMonitor *DeviceHealthMonitor + deviceHealthOnce sync.Once +) + +// GetDeviceHealthMonitor returns the global device health monitor +func GetDeviceHealthMonitor() *DeviceHealthMonitor { + deviceHealthOnce.Do(func() { + globalDeviceHealthMonitor = NewDeviceHealthMonitor() + }) + return globalDeviceHealthMonitor +} + +// Helper functions for min/max +func min(a, b float64) float64 { + if a < b { + return a + } + return b +} + +func max(a, b float64) float64 { + if a > b { + return a + } + return b +} diff --git a/internal/audio/granular_metrics.go b/internal/audio/granular_metrics.go index f9eeecc..50fbb7d 100644 --- a/internal/audio/granular_metrics.go +++ b/internal/audio/granular_metrics.go @@ -93,18 +93,18 @@ type BufferPoolEfficiencyTracker struct { // NewLatencyHistogram creates a new latency histogram with predefined buckets func NewLatencyHistogram(maxSamples int, logger zerolog.Logger) *LatencyHistogram { - // Define latency buckets: 1ms, 5ms, 10ms, 25ms, 50ms, 100ms, 250ms, 500ms, 1s, 2s+ + // Define latency buckets using configuration constants buckets := []int64{ int64(1 * time.Millisecond), int64(5 * time.Millisecond), - int64(10 * time.Millisecond), - int64(25 * time.Millisecond), - int64(50 * time.Millisecond), - int64(100 * time.Millisecond), - int64(250 * time.Millisecond), - int64(500 * time.Millisecond), - int64(1 * time.Second), - int64(2 * time.Second), + int64(GetConfig().LatencyBucket10ms), + int64(GetConfig().LatencyBucket25ms), + int64(GetConfig().LatencyBucket50ms), + int64(GetConfig().LatencyBucket100ms), + int64(GetConfig().LatencyBucket250ms), + int64(GetConfig().LatencyBucket500ms), + int64(GetConfig().LatencyBucket1s), + int64(GetConfig().LatencyBucket2s), } return &LatencyHistogram{ diff --git a/internal/audio/input.go b/internal/audio/input.go index 1b2c875..52849a9 100644 --- 
a/internal/audio/input.go +++ b/internal/audio/input.go @@ -10,10 +10,10 @@ import ( // AudioInputMetrics holds metrics for microphone input type AudioInputMetrics struct { - FramesSent int64 - FramesDropped int64 - BytesProcessed int64 - ConnectionDrops int64 + FramesSent int64 // Total frames sent + FramesDropped int64 // Total frames dropped + BytesProcessed int64 // Total bytes processed + ConnectionDrops int64 // Connection drops AverageLatency time.Duration // time.Duration is int64 LastFrameTime time.Time } diff --git a/internal/audio/input_ipc.go b/internal/audio/input_ipc.go index 489cc16..6dbd20d 100644 --- a/internal/audio/input_ipc.go +++ b/internal/audio/input_ipc.go @@ -13,6 +13,7 @@ import ( "time" "github.com/jetkvm/kvm/internal/logging" + "github.com/rs/zerolog" ) var ( @@ -99,16 +100,15 @@ var globalMessagePool = &MessagePool{ var messagePoolInitOnce sync.Once -// initializeMessagePool initializes the message pool with pre-allocated messages +// initializeMessagePool initializes the global message pool with pre-allocated messages func initializeMessagePool() { messagePoolInitOnce.Do(func() { - // Pre-allocate 30% of pool size for immediate availability - preallocSize := messagePoolSize * GetConfig().InputPreallocPercentage / 100 + preallocSize := messagePoolSize / 4 // 25% pre-allocated for immediate use globalMessagePool.preallocSize = preallocSize globalMessagePool.maxPoolSize = messagePoolSize * GetConfig().PoolGrowthMultiplier // Allow growth up to 2x globalMessagePool.preallocated = make([]*OptimizedIPCMessage, 0, preallocSize) - // Pre-allocate messages to reduce initial allocation overhead + // Pre-allocate messages for immediate use for i := 0; i < preallocSize; i++ { msg := &OptimizedIPCMessage{ data: make([]byte, 0, maxFrameSize), @@ -116,7 +116,7 @@ func initializeMessagePool() { globalMessagePool.preallocated = append(globalMessagePool.preallocated, msg) } - // Fill the channel pool with remaining messages + // Fill the channel with 
remaining messages for i := preallocSize; i < messagePoolSize; i++ { globalMessagePool.pool <- &OptimizedIPCMessage{ data: make([]byte, 0, maxFrameSize), @@ -488,33 +488,13 @@ func (ais *AudioInputServer) sendAck() error { return ais.writeMessage(ais.conn, msg) } -// writeMessage writes a message to the connection using optimized buffers +// Global shared message pool for input IPC server +var globalInputServerMessagePool = NewGenericMessagePool(messagePoolSize) + +// writeMessage writes a message to the connection using shared common utilities func (ais *AudioInputServer) writeMessage(conn net.Conn, msg *InputIPCMessage) error { - // Get optimized message from pool for header preparation - optMsg := globalMessagePool.Get() - defer globalMessagePool.Put(optMsg) - - // Prepare header in pre-allocated buffer - binary.LittleEndian.PutUint32(optMsg.header[0:4], msg.Magic) - optMsg.header[4] = byte(msg.Type) - binary.LittleEndian.PutUint32(optMsg.header[5:9], msg.Length) - binary.LittleEndian.PutUint64(optMsg.header[9:17], uint64(msg.Timestamp)) - - // Write header - _, err := conn.Write(optMsg.header[:]) - if err != nil { - return err - } - - // Write data if present - if msg.Length > 0 && msg.Data != nil { - _, err = conn.Write(msg.Data) - if err != nil { - return err - } - } - - return nil + // Use shared WriteIPCMessage function with global message pool + return WriteIPCMessage(conn, msg, globalInputServerMessagePool, &ais.droppedFrames) } // AudioInputClient handles IPC communication from the main process @@ -706,21 +686,15 @@ func (aic *AudioInputClient) SendHeartbeat() error { } // writeMessage writes a message to the server +// Global shared message pool for input IPC clients +var globalInputMessagePool = NewGenericMessagePool(messagePoolSize) + func (aic *AudioInputClient) writeMessage(msg *InputIPCMessage) error { // Increment total frames counter atomic.AddInt64(&aic.totalFrames, 1) - // Use common write function with shared message pool - sharedPool := 
&GenericMessagePool{ - pool: make(chan *OptimizedMessage, messagePoolSize), - hitCount: globalMessagePool.hitCount, - missCount: globalMessagePool.missCount, - preallocated: make([]*OptimizedMessage, 0), - preallocSize: messagePoolSize / 4, - maxPoolSize: messagePoolSize, - } - - return WriteIPCMessage(aic.conn, msg, sharedPool, &aic.droppedFrames) + // Use shared WriteIPCMessage function with global message pool + return WriteIPCMessage(aic.conn, msg, globalInputMessagePool, &aic.droppedFrames) } // IsConnected returns whether the client is connected @@ -752,6 +726,17 @@ func (ais *AudioInputServer) startReaderGoroutine() { ais.wg.Add(1) go func() { defer ais.wg.Done() + + // Enhanced error tracking and recovery + var consecutiveErrors int + var lastErrorTime time.Time + maxConsecutiveErrors := GetConfig().MaxConsecutiveErrors + errorResetWindow := GetConfig().RestartWindow // Use existing restart window + baseBackoffDelay := GetConfig().RetryDelay + maxBackoffDelay := GetConfig().MaxRetryDelay + + logger := logging.GetDefaultLogger().With().Str("component", "audio-input-reader").Logger() + for { select { case <-ais.stopChan: @@ -760,8 +745,55 @@ func (ais *AudioInputServer) startReaderGoroutine() { if ais.conn != nil { msg, err := ais.readMessage(ais.conn) if err != nil { - continue // Connection error, retry + // Enhanced error handling with progressive backoff + now := time.Now() + + // Reset error counter if enough time has passed + if now.Sub(lastErrorTime) > errorResetWindow { + consecutiveErrors = 0 + } + + consecutiveErrors++ + lastErrorTime = now + + // Log error with context + logger.Warn().Err(err). + Int("consecutive_errors", consecutiveErrors). 
+ Msg("Failed to read message from input connection") + + // Progressive backoff based on error count + if consecutiveErrors > 1 { + backoffDelay := time.Duration(consecutiveErrors-1) * baseBackoffDelay + if backoffDelay > maxBackoffDelay { + backoffDelay = maxBackoffDelay + } + time.Sleep(backoffDelay) + } + + // If too many consecutive errors, close connection to force reconnect + if consecutiveErrors >= maxConsecutiveErrors { + logger.Error(). + Int("consecutive_errors", consecutiveErrors). + Msg("Too many consecutive read errors, closing connection") + + ais.mtx.Lock() + if ais.conn != nil { + ais.conn.Close() + ais.conn = nil + } + ais.mtx.Unlock() + + consecutiveErrors = 0 // Reset for next connection + } + continue } + + // Reset error counter on successful read + if consecutiveErrors > 0 { + consecutiveErrors = 0 + logger.Info().Msg("Input connection recovered") + } + // Send to message channel with non-blocking write select { case ais.messageChan <- msg: @@ -769,7 +801,11 @@ func (ais *AudioInputServer) startReaderGoroutine() { default: // Channel full, drop message atomic.AddInt64(&ais.droppedFrames, 1) + logger.Warn().Msg("Message channel full, dropping frame") } + } else { + // No connection, wait briefly before checking again + time.Sleep(GetConfig().DefaultSleepDuration) } } } @@ -794,40 +830,105 @@ func (ais *AudioInputServer) startProcessorGoroutine() { } }() + // Enhanced error tracking for processing + var processingErrors int + var lastProcessingError time.Time + maxProcessingErrors := GetConfig().MaxConsecutiveErrors + errorResetWindow := GetConfig().RestartWindow + defer ais.wg.Done() for { select { case <-ais.stopChan: return case msg := <-ais.messageChan: - // Intelligent frame dropping: prioritize recent frames - if msg.Type == InputMessageTypeOpusFrame { - // Check if processing queue is getting full - queueLen := len(ais.processChan) - bufferSize := int(atomic.LoadInt64(&ais.bufferSize)) + // Process message with error handling + start := 
time.Now() + err := ais.processMessageWithRecovery(msg, logger) + processingTime := time.Since(start) - if queueLen > bufferSize*3/4 { - // Drop oldest frames, keep newest - select { - case <-ais.processChan: // Remove oldest - atomic.AddInt64(&ais.droppedFrames, 1) - default: - } + if err != nil { + // Track processing errors + now := time.Now() + if now.Sub(lastProcessingError) > errorResetWindow { + processingErrors = 0 } + + processingErrors++ + lastProcessingError = now + + logger.Warn().Err(err). + Int("processing_errors", processingErrors). + Dur("processing_time", processingTime). + Msg("Failed to process input message") + + // If too many processing errors, drop frames more aggressively + if processingErrors >= maxProcessingErrors { + logger.Error(). + Int("processing_errors", processingErrors). + Msg("Too many processing errors, entering aggressive drop mode") + + // Clear processing queue to recover + for len(ais.processChan) > 0 { + select { + case <-ais.processChan: + atomic.AddInt64(&ais.droppedFrames, 1) + default: + break + } + } + processingErrors = 0 // Reset after clearing queue + } + continue } - // Send to processing queue - select { - case ais.processChan <- msg: - default: - // Processing queue full, drop frame - atomic.AddInt64(&ais.droppedFrames, 1) + // Reset error counter on successful processing + if processingErrors > 0 { + processingErrors = 0 + logger.Info().Msg("Input processing recovered") } + + // Update processing time metrics + atomic.StoreInt64(&ais.processingTime, processingTime.Nanoseconds()) } } }() } +// processMessageWithRecovery processes a message with enhanced error recovery +func (ais *AudioInputServer) processMessageWithRecovery(msg *InputIPCMessage, logger zerolog.Logger) error { + // Intelligent frame dropping: prioritize recent frames + if msg.Type == InputMessageTypeOpusFrame { + // Check if processing queue is getting full + queueLen := len(ais.processChan) + bufferSize := int(atomic.LoadInt64(&ais.bufferSize)) + 
+ if queueLen > bufferSize*3/4 { + // Drop oldest frames, keep newest + select { + case <-ais.processChan: // Remove oldest + atomic.AddInt64(&ais.droppedFrames, 1) + logger.Debug().Msg("Dropped oldest frame to make room") + default: + } + } + } + + // Send to processing queue with timeout + select { + case ais.processChan <- msg: + return nil + case <-time.After(GetConfig().WriteTimeout): + // Processing queue full and timeout reached, drop frame + atomic.AddInt64(&ais.droppedFrames, 1) + return fmt.Errorf("processing queue timeout") + default: + // Processing queue full, drop frame immediately + atomic.AddInt64(&ais.droppedFrames, 1) + return fmt.Errorf("processing queue full") + } +} + // startMonitorGoroutine starts the performance monitoring goroutine func (ais *AudioInputServer) startMonitorGoroutine() { ais.wg.Add(1) diff --git a/internal/audio/ipc.go b/internal/audio/ipc.go index f3fe476..7c28b8f 100644 --- a/internal/audio/ipc.go +++ b/internal/audio/ipc.go @@ -1,7 +1,6 @@ package audio import ( - "context" "encoding/binary" "fmt" "io" @@ -65,59 +64,8 @@ func (msg *OutputIPCMessage) GetData() []byte { return msg.Data } -// OutputOptimizedMessage represents a pre-allocated message for zero-allocation operations -type OutputOptimizedMessage struct { - header [17]byte // Pre-allocated header buffer (using constant value since array size must be compile-time constant) - data []byte // Reusable data buffer -} - -// OutputMessagePool manages pre-allocated messages for zero-allocation IPC -type OutputMessagePool struct { - pool chan *OutputOptimizedMessage -} - -// NewOutputMessagePool creates a new message pool -func NewOutputMessagePool(size int) *OutputMessagePool { - pool := &OutputMessagePool{ - pool: make(chan *OutputOptimizedMessage, size), - } - - // Pre-allocate messages - for i := 0; i < size; i++ { - msg := &OutputOptimizedMessage{ - data: make([]byte, GetConfig().OutputMaxFrameSize), - } - pool.pool <- msg - } - - return pool -} - -// Get retrieves a 
message from the pool -func (p *OutputMessagePool) Get() *OutputOptimizedMessage { - select { - case msg := <-p.pool: - return msg - default: - // Pool exhausted, create new message - return &OutputOptimizedMessage{ - data: make([]byte, GetConfig().OutputMaxFrameSize), - } - } -} - -// Put returns a message to the pool -func (p *OutputMessagePool) Put(msg *OutputOptimizedMessage) { - select { - case p.pool <- msg: - // Successfully returned to pool - default: - // Pool full, let GC handle it - } -} - -// Global message pool for output IPC -var globalOutputMessagePool = NewOutputMessagePool(GetConfig().OutputMessagePoolSize) +// Global shared message pool for output IPC client header reading +var globalOutputClientMessagePool = NewGenericMessagePool(GetConfig().OutputMessagePoolSize) type AudioOutputServer struct { // Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment) @@ -341,6 +289,9 @@ func (s *AudioOutputServer) SendFrame(frame []byte) error { } // sendFrameToClient sends frame data directly to the connected client +// Global shared message pool for output IPC server +var globalOutputServerMessagePool = NewGenericMessagePool(GetConfig().OutputMessagePoolSize) + func (s *AudioOutputServer) sendFrameToClient(frame []byte) error { s.mtx.Lock() defer s.mtx.Unlock() @@ -351,59 +302,28 @@ func (s *AudioOutputServer) sendFrameToClient(frame []byte) error { start := time.Now() - // Get optimized message from pool - optMsg := globalOutputMessagePool.Get() - defer globalOutputMessagePool.Put(optMsg) - - // Prepare header in pre-allocated buffer - binary.LittleEndian.PutUint32(optMsg.header[0:4], outputMagicNumber) - optMsg.header[4] = byte(OutputMessageTypeOpusFrame) - binary.LittleEndian.PutUint32(optMsg.header[5:9], uint32(len(frame))) - binary.LittleEndian.PutUint64(optMsg.header[9:17], uint64(start.UnixNano())) - - // Use non-blocking write with timeout - ctx, cancel := context.WithTimeout(context.Background(), 
GetConfig().OutputWriteTimeout) - defer cancel() - - // Create a channel to signal write completion - done := make(chan error, 1) - go func() { - // Write header using pre-allocated buffer - _, err := s.conn.Write(optMsg.header[:]) - if err != nil { - done <- err - return - } - - // Write frame data - if len(frame) > 0 { - _, err = s.conn.Write(frame) - if err != nil { - done <- err - return - } - } - done <- nil - }() - - // Wait for completion or timeout - select { - case err := <-done: - if err != nil { - atomic.AddInt64(&s.droppedFrames, 1) - return err - } - // Record latency for monitoring - if s.latencyMonitor != nil { - writeLatency := time.Since(start) - s.latencyMonitor.RecordLatency(writeLatency, "ipc_write") - } - return nil - case <-ctx.Done(): - // Timeout occurred - drop frame to prevent blocking - atomic.AddInt64(&s.droppedFrames, 1) - return fmt.Errorf("write timeout after %v - frame dropped to prevent blocking", GetConfig().OutputWriteTimeout) + // Create output IPC message + msg := &OutputIPCMessage{ + Magic: outputMagicNumber, + Type: OutputMessageTypeOpusFrame, + Length: uint32(len(frame)), + Timestamp: start.UnixNano(), + Data: frame, } + + // Use shared WriteIPCMessage function + err := WriteIPCMessage(s.conn, msg, globalOutputServerMessagePool, &s.droppedFrames) + if err != nil { + return err + } + + // Record latency for monitoring + if s.latencyMonitor != nil { + writeLatency := time.Since(start) + s.latencyMonitor.RecordLatency(writeLatency, "ipc_write") + } + + return nil } // GetServerStats returns server performance statistics @@ -495,8 +415,8 @@ func (c *AudioOutputClient) ReceiveFrame() ([]byte, error) { } // Get optimized message from pool for header reading - optMsg := globalOutputMessagePool.Get() - defer globalOutputMessagePool.Put(optMsg) + optMsg := globalOutputClientMessagePool.Get() + defer globalOutputClientMessagePool.Put(optMsg) // Read header if _, err := io.ReadFull(c.conn, optMsg.header[:]); err != nil { diff --git 
a/internal/audio/output_streaming.go b/internal/audio/output_streaming.go index 094f7aa..8625213 100644 --- a/internal/audio/output_streaming.go +++ b/internal/audio/output_streaming.go @@ -321,17 +321,61 @@ func StartAudioOutputStreaming(send func([]byte)) error { getOutputStreamingLogger().Info().Str("socket_path", getOutputSocketPath()).Msg("Audio output streaming started, connected to output server") buffer := make([]byte, GetMaxAudioFrameSize()) + consecutiveErrors := 0 + maxConsecutiveErrors := GetConfig().MaxConsecutiveErrors + errorBackoffDelay := GetConfig().RetryDelay + maxErrorBackoff := GetConfig().MaxRetryDelay + for { select { case <-ctx.Done(): return default: - // Capture audio frame + // Capture audio frame with enhanced error handling n, err := CGOAudioReadEncode(buffer) if err != nil { - getOutputStreamingLogger().Warn().Err(err).Msg("Failed to read/encode audio") + consecutiveErrors++ + getOutputStreamingLogger().Warn(). + Err(err). + Int("consecutive_errors", consecutiveErrors). + Msg("Failed to read/encode audio") + + // Implement progressive backoff for consecutive errors + if consecutiveErrors >= maxConsecutiveErrors { + getOutputStreamingLogger().Error(). + Int("consecutive_errors", consecutiveErrors). + Msg("Too many consecutive audio errors, attempting recovery") + + // Try to reinitialize audio system + CGOAudioClose() + time.Sleep(errorBackoffDelay) + if initErr := CGOAudioInit(); initErr != nil { + getOutputStreamingLogger().Error(). + Err(initErr). 
+ Msg("Failed to reinitialize audio system") + // Exponential backoff for reinitialization failures + errorBackoffDelay = time.Duration(float64(errorBackoffDelay) * GetConfig().BackoffMultiplier) + if errorBackoffDelay > maxErrorBackoff { + errorBackoffDelay = maxErrorBackoff + } + } else { + getOutputStreamingLogger().Info().Msg("Audio system reinitialized successfully") + consecutiveErrors = 0 + errorBackoffDelay = GetConfig().RetryDelay // Reset backoff + } + } else { + // Brief delay for transient errors + time.Sleep(GetConfig().ShortSleepDuration) + } continue } + + // Success - reset error counters + if consecutiveErrors > 0 { + consecutiveErrors = 0 + errorBackoffDelay = GetConfig().RetryDelay + } + if n > 0 { // Get frame buffer from pool to reduce allocations frame := GetAudioFrameBuffer() diff --git a/internal/audio/validation_enhanced.go b/internal/audio/validation_enhanced.go new file mode 100644 index 0000000..1a04632 --- /dev/null +++ b/internal/audio/validation_enhanced.go @@ -0,0 +1,281 @@ +package audio + +import ( + "errors" + "fmt" + "time" + "unsafe" +) + +// Enhanced validation errors with more specific context +var ( + ErrInvalidFrameLength = errors.New("invalid frame length") + ErrFrameDataCorrupted = errors.New("frame data appears corrupted") + ErrBufferAlignment = errors.New("buffer alignment invalid") + ErrInvalidSampleFormat = errors.New("invalid sample format") + ErrInvalidTimestamp = errors.New("invalid timestamp") + ErrConfigurationMismatch = errors.New("configuration mismatch") + ErrResourceExhaustion = errors.New("resource exhaustion detected") + ErrInvalidPointer = errors.New("invalid pointer") + ErrBufferOverflow = errors.New("buffer overflow detected") + ErrInvalidState = errors.New("invalid state") +) + +// ValidationLevel defines the level of validation to perform +type ValidationLevel int + +const ( + ValidationMinimal ValidationLevel = iota // Only critical safety checks + ValidationStandard // Standard validation for 
production + ValidationStrict // Comprehensive validation for debugging +) + +// ValidationConfig controls validation behavior +type ValidationConfig struct { + Level ValidationLevel + EnableRangeChecks bool + EnableAlignmentCheck bool + EnableDataIntegrity bool + MaxValidationTime time.Duration +} + +// GetValidationConfig returns the current validation configuration +func GetValidationConfig() ValidationConfig { + config := GetConfig() + return ValidationConfig{ + Level: ValidationStandard, + EnableRangeChecks: true, + EnableAlignmentCheck: true, + EnableDataIntegrity: false, // Disabled by default for performance + MaxValidationTime: time.Duration(config.ValidationTimeoutMS) * time.Millisecond, + } +} + +// ValidateAudioFrameFast performs minimal validation for performance-critical paths +func ValidateAudioFrameFast(data []byte) error { + if len(data) == 0 { + return ErrInvalidFrameData + } + + // Quick bounds check using config constants + maxSize := GetConfig().MaxAudioFrameSize + if len(data) > maxSize { + return fmt.Errorf("%w: frame size %d exceeds maximum %d", ErrInvalidFrameSize, len(data), maxSize) + } + + return nil +} + +// ValidateAudioFrameComprehensive performs thorough validation +func ValidateAudioFrameComprehensive(data []byte, expectedSampleRate int, expectedChannels int) error { + validationConfig := GetValidationConfig() + start := time.Now() + + // Timeout protection for validation + defer func() { + if time.Since(start) > validationConfig.MaxValidationTime { + // Log validation timeout but don't fail + getValidationLogger().Warn().Dur("duration", time.Since(start)).Msg("validation timeout exceeded") + } + }() + + // Basic validation first + if err := ValidateAudioFrameFast(data); err != nil { + return err + } + + // Range validation + if validationConfig.EnableRangeChecks { + config := GetConfig() + if len(data) < config.MinAudioFrameSize { + return fmt.Errorf("%w: frame size %d below minimum %d", ErrInvalidFrameSize, len(data), 
config.MinAudioFrameSize) + } + + // Validate frame length matches expected sample format + expectedFrameSize := (expectedSampleRate * expectedChannels * 2) / 1000 * int(config.AudioQualityMediumFrameSize/time.Millisecond) + if abs(len(data)-expectedFrameSize) > config.FrameSizeTolerance { + return fmt.Errorf("%w: frame size %d doesn't match expected %d (±%d)", ErrInvalidFrameLength, len(data), expectedFrameSize, config.FrameSizeTolerance) + } + } + + // Alignment validation for ARM32 compatibility + if validationConfig.EnableAlignmentCheck { + if uintptr(unsafe.Pointer(&data[0]))%4 != 0 { + return fmt.Errorf("%w: buffer not 4-byte aligned for ARM32", ErrBufferAlignment) + } + } + + // Data integrity checks (expensive, only for debugging) + if validationConfig.EnableDataIntegrity && validationConfig.Level == ValidationStrict { + if err := validateAudioDataIntegrity(data, expectedChannels); err != nil { + return err + } + } + + return nil +} + +// ValidateZeroCopyFrameEnhanced performs enhanced zero-copy frame validation +func ValidateZeroCopyFrameEnhanced(frame *ZeroCopyAudioFrame) error { + if frame == nil { + return fmt.Errorf("%w: frame is nil", ErrInvalidPointer) + } + + // Check reference count validity + frame.mutex.RLock() + refCount := frame.refCount + length := frame.length + capacity := frame.capacity + frame.mutex.RUnlock() + + if refCount <= 0 { + return fmt.Errorf("%w: invalid reference count %d", ErrInvalidState, refCount) + } + + if length < 0 || capacity < 0 { + return fmt.Errorf("%w: negative length (%d) or capacity (%d)", ErrInvalidState, length, capacity) + } + + if length > capacity { + return fmt.Errorf("%w: length %d exceeds capacity %d", ErrBufferOverflow, length, capacity) + } + + // Validate the underlying data + data := frame.Data() + return ValidateAudioFrameFast(data) +} + +// ValidateBufferBounds performs bounds checking with overflow protection +func ValidateBufferBounds(buffer []byte, offset, length int) error { + if buffer == nil { + 
return fmt.Errorf("%w: buffer is nil", ErrInvalidPointer) + } + + if offset < 0 { + return fmt.Errorf("%w: negative offset %d", ErrInvalidState, offset) + } + + if length < 0 { + return fmt.Errorf("%w: negative length %d", ErrInvalidState, length) + } + + // Check for integer overflow + if offset > len(buffer) { + return fmt.Errorf("%w: offset %d exceeds buffer length %d", ErrBufferOverflow, offset, len(buffer)) + } + + // Safe addition check for overflow + if offset+length < offset || offset+length > len(buffer) { + return fmt.Errorf("%w: range [%d:%d] exceeds buffer length %d", ErrBufferOverflow, offset, offset+length, len(buffer)) + } + + return nil +} + +// ValidateAudioConfiguration performs comprehensive configuration validation +func ValidateAudioConfiguration(config AudioConfig) error { + if err := ValidateAudioQuality(config.Quality); err != nil { + return fmt.Errorf("quality validation failed: %w", err) + } + + configConstants := GetConfig() + + // Validate bitrate ranges + if config.Bitrate < configConstants.MinBitrate || config.Bitrate > configConstants.MaxBitrate { + return fmt.Errorf("%w: bitrate %d outside valid range [%d, %d]", ErrInvalidConfiguration, config.Bitrate, configConstants.MinBitrate, configConstants.MaxBitrate) + } + + // Validate sample rate + validSampleRates := []int{8000, 12000, 16000, 24000, 48000} + validSampleRate := false + for _, rate := range validSampleRates { + if config.SampleRate == rate { + validSampleRate = true + break + } + } + if !validSampleRate { + return fmt.Errorf("%w: sample rate %d not in supported rates %v", ErrInvalidSampleRate, config.SampleRate, validSampleRates) + } + + // Validate channels + if config.Channels < 1 || config.Channels > configConstants.MaxChannels { + return fmt.Errorf("%w: channels %d outside valid range [1, %d]", ErrInvalidChannels, config.Channels, configConstants.MaxChannels) + } + + // Validate frame size + if config.FrameSize < configConstants.MinFrameSize || config.FrameSize > 
configConstants.MaxFrameSize { + return fmt.Errorf("%w: frame size %v outside valid range [%v, %v]", ErrInvalidConfiguration, config.FrameSize, configConstants.MinFrameSize, configConstants.MaxFrameSize) + } + + return nil +} + +// ValidateResourceLimits checks if system resources are within acceptable limits +func ValidateResourceLimits() error { + config := GetConfig() + + // Check buffer pool sizes + framePoolStats := GetAudioBufferPoolStats() + if framePoolStats.FramePoolSize > int64(config.MaxPoolSize*2) { + return fmt.Errorf("%w: frame pool size %d exceeds safe limit %d", ErrResourceExhaustion, framePoolStats.FramePoolSize, config.MaxPoolSize*2) + } + + // Check zero-copy pool allocation count + zeroCopyStats := GetGlobalZeroCopyPoolStats() + if zeroCopyStats.AllocationCount > int64(config.MaxPoolSize*3) { + return fmt.Errorf("%w: zero-copy allocations %d exceed safe limit %d", ErrResourceExhaustion, zeroCopyStats.AllocationCount, config.MaxPoolSize*3) + } + + return nil +} + +// validateAudioDataIntegrity performs expensive data integrity checks +func validateAudioDataIntegrity(data []byte, channels int) error { + if len(data)%2 != 0 { + return fmt.Errorf("%w: odd number of bytes for 16-bit samples", ErrInvalidSampleFormat) + } + + if len(data)%(channels*2) != 0 { + return fmt.Errorf("%w: data length %d not aligned to channel count %d", ErrInvalidSampleFormat, len(data), channels) + } + + // Check for obvious corruption patterns (all zeros, all max values) + sampleCount := len(data) / 2 + zeroCount := 0 + maxCount := 0 + + for i := 0; i < len(data); i += 2 { + sample := int16(data[i]) | int16(data[i+1])<<8 + if sample == 0 { + zeroCount++ + } else if sample == 32767 || sample == -32768 { + maxCount++ + } + } + + // Flag suspicious patterns + if zeroCount > sampleCount*9/10 { + return fmt.Errorf("%w: %d%% zero samples suggests silence or corruption", ErrFrameDataCorrupted, (zeroCount*100)/sampleCount) + } + + if maxCount > sampleCount/10 { + return 
fmt.Errorf("%w: %d%% max-value samples suggests clipping or corruption", ErrFrameDataCorrupted, (maxCount*100)/sampleCount) + } + + return nil +} + +// Helper function for absolute value +func abs(x int) int { + if x < 0 { + return -x + } + return x +} + +// getValidationLogger returns a logger for validation operations +func getValidationLogger() *zerolog.Logger { + logger := logging.GetDefaultLogger().With().Str("component", "audio-validation").Logger() + return &logger +} \ No newline at end of file