From dc2db8ed2da82b5b53a0b934598b480464d5ddeb Mon Sep 17 00:00:00 2001 From: Alex P Date: Wed, 27 Aug 2025 17:47:39 +0000 Subject: [PATCH] feat(audio): add comprehensive input validation and base components refactor(audio): restructure metrics and supervisor components into base implementations feat(audio): add validation for all audio configurations and frames fix(audio): fix atomic alignment in metrics structures refactor(audio): consolidate common functionality into base manager and supervisor feat(audio): add output IPC manager and config validation --- internal/audio/adaptive_buffer.go | 9 +- internal/audio/audio.go | 24 +- .../audio/audio_quality_edge_cases_test.go | 317 +++++++++++++++ internal/audio/base_manager.go | 120 ++++++ internal/audio/base_supervisor.go | 133 +++++++ internal/audio/batch_audio.go | 18 + internal/audio/buffer_pool.go | 10 + internal/audio/cgo_audio.go | 20 +- internal/audio/config_constants.go | 112 +++++- internal/audio/input.go | 82 ++-- internal/audio/input_ipc.go | 50 ++- internal/audio/input_ipc_manager.go | 40 +- internal/audio/input_supervisor.go | 103 ++--- internal/audio/input_test.go | 15 +- internal/audio/ipc.go | 19 +- internal/audio/latency_monitor.go | 7 + internal/audio/output_ipc_manager.go | 211 ++++++++++ internal/audio/output_manager.go | 68 ++-- internal/audio/output_streaming.go | 8 + internal/audio/regression_test.go | 362 ++++++++++++++++++ internal/audio/relay.go | 7 + internal/audio/supervisor.go | 106 +---- internal/audio/validation.go | 103 +++-- internal/audio/validation_enhanced.go | 122 +++++- 24 files changed, 1729 insertions(+), 337 deletions(-) create mode 100644 internal/audio/audio_quality_edge_cases_test.go create mode 100644 internal/audio/base_manager.go create mode 100644 internal/audio/base_supervisor.go create mode 100644 internal/audio/output_ipc_manager.go create mode 100644 internal/audio/regression_test.go diff --git a/internal/audio/adaptive_buffer.go b/internal/audio/adaptive_buffer.go index 29ee828..176c3d8 100644 --- a/internal/audio/adaptive_buffer.go +++ b/internal/audio/adaptive_buffer.go @@ -105,13 +105,20 @@ type AdaptiveBufferManager struct { // NewAdaptiveBufferManager creates a new adaptive buffer manager func NewAdaptiveBufferManager(config AdaptiveBufferConfig) *AdaptiveBufferManager { + logger := logging.GetDefaultLogger().With().Str("component", "adaptive-buffer").Logger() + + if err := ValidateAdaptiveBufferConfig(config.MinBufferSize, config.MaxBufferSize, config.DefaultBufferSize); err != nil { + logger.Error().Err(err).Msg("Invalid adaptive buffer config, using defaults") + config = DefaultAdaptiveBufferConfig() + } + ctx, cancel := context.WithCancel(context.Background()) return &AdaptiveBufferManager{ currentInputBufferSize: int64(config.DefaultBufferSize), currentOutputBufferSize: int64(config.DefaultBufferSize), config: config, - logger: logging.GetDefaultLogger().With().Str("component", "adaptive-buffer").Logger(), + logger: logger, processMonitor: GetProcessMonitor(), ctx: ctx, cancel: cancel, diff --git a/internal/audio/audio.go b/internal/audio/audio.go index 8706d3c..cffde69 100644 --- a/internal/audio/audio.go +++ b/internal/audio/audio.go @@ -100,6 +100,8 @@ import ( "errors" "sync/atomic" "time" + + "github.com/jetkvm/kvm/internal/logging" ) var ( @@ -190,13 +192,14 @@ var qualityPresets = map[AudioQuality]struct { func GetAudioQualityPresets() map[AudioQuality]AudioConfig { result := make(map[AudioQuality]AudioConfig) for quality, preset := range qualityPresets { - result[quality] = AudioConfig{ + config := AudioConfig{ Quality: quality, Bitrate: preset.outputBitrate, SampleRate: preset.sampleRate, Channels: preset.channels, FrameSize: preset.frameSize, } + result[quality] = config } return result } @@ -205,7 +208,7 @@ func GetAudioQualityPresets() map[AudioQuality]AudioConfig { func GetMicrophoneQualityPresets() map[AudioQuality]AudioConfig { result := make(map[AudioQuality]AudioConfig) for quality, preset := range qualityPresets { - result[quality] = AudioConfig{ + config := AudioConfig{ Quality: quality, Bitrate: preset.inputBitrate, SampleRate: func() int { @@ -217,12 +220,21 @@ func GetMicrophoneQualityPresets() map[AudioQuality]AudioConfig { Channels: 1, // Microphone is always mono FrameSize: preset.frameSize, } + result[quality] = config } return result } // SetAudioQuality updates the current audio quality configuration func SetAudioQuality(quality AudioQuality) { + // Validate audio quality parameter + if err := ValidateAudioQuality(quality); err != nil { + // Log validation error but don't fail - maintain backward compatibility + logger := logging.GetDefaultLogger().With().Str("component", "AudioConfig").Logger() + logger.Error().Err(err).Int("quality", int(quality)).Msg("Invalid audio quality provided, ignoring") + return + } + presets := GetAudioQualityPresets() if config, exists := presets[quality]; exists { currentConfig = config @@ -236,6 +248,14 @@ func GetAudioConfig() AudioConfig { // SetMicrophoneQuality updates the current microphone quality configuration func SetMicrophoneQuality(quality AudioQuality) { + // Validate audio quality parameter + if err := ValidateAudioQuality(quality); err != nil { + // Log validation error but don't fail - maintain backward compatibility + logger := logging.GetDefaultLogger().With().Str("component", "MicrophoneConfig").Logger() + logger.Error().Err(err).Int("quality", int(quality)).Msg("Invalid microphone quality provided, ignoring") + return + } + presets := GetMicrophoneQualityPresets() if config, exists := presets[quality]; exists { currentMicrophoneConfig = config diff --git a/internal/audio/audio_quality_edge_cases_test.go b/internal/audio/audio_quality_edge_cases_test.go new file mode 100644 index 0000000..af0adfa --- /dev/null +++ b/internal/audio/audio_quality_edge_cases_test.go @@ -0,0 +1,317 @@ +//go:build cgo +// +build cgo + +package audio + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestAudioQualityEdgeCases tests edge cases for audio quality functions +// These tests ensure the recent validation removal doesn't introduce regressions +func TestAudioQualityEdgeCases(t *testing.T) { + tests := []struct { + name string + testFunc func(t *testing.T) + }{ + {"AudioQualityBoundaryValues", testAudioQualityBoundaryValues}, + {"MicrophoneQualityBoundaryValues", testMicrophoneQualityBoundaryValues}, + {"AudioQualityPresetsConsistency", testAudioQualityPresetsConsistency}, + {"MicrophoneQualityPresetsConsistency", testMicrophoneQualityPresetsConsistency}, + {"QualitySettingsThreadSafety", testQualitySettingsThreadSafety}, + {"QualityPresetsImmutability", testQualityPresetsImmutability}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.testFunc(t) + }) + } +} + +// testAudioQualityBoundaryValues tests boundary values for audio quality +func testAudioQualityBoundaryValues(t *testing.T) { + // Test minimum valid quality (0) + originalConfig := GetAudioConfig() + SetAudioQuality(AudioQualityLow) + assert.Equal(t, AudioQualityLow, GetAudioConfig().Quality, "Should accept minimum quality value") + + // Test maximum valid quality (3) + SetAudioQuality(AudioQualityUltra) + assert.Equal(t, AudioQualityUltra, GetAudioConfig().Quality, "Should accept maximum quality value") + + // Test that quality settings work correctly + SetAudioQuality(AudioQualityMedium) + currentConfig := GetAudioConfig() + assert.Equal(t, AudioQualityMedium, currentConfig.Quality, "Should set medium quality") + t.Logf("Medium quality config: %+v", currentConfig) + + SetAudioQuality(AudioQualityHigh) + currentConfig = GetAudioConfig() + assert.Equal(t, AudioQualityHigh, currentConfig.Quality, "Should set high quality") + t.Logf("High quality config: %+v", currentConfig) + + // Restore original quality + SetAudioQuality(originalConfig.Quality) +} + +// testMicrophoneQualityBoundaryValues tests boundary values for microphone quality +func testMicrophoneQualityBoundaryValues(t *testing.T) { + // Test minimum valid quality + originalConfig := GetMicrophoneConfig() + SetMicrophoneQuality(AudioQualityLow) + assert.Equal(t, AudioQualityLow, GetMicrophoneConfig().Quality, "Should accept minimum microphone quality value") + + // Test maximum valid quality + SetMicrophoneQuality(AudioQualityUltra) + assert.Equal(t, AudioQualityUltra, GetMicrophoneConfig().Quality, "Should accept maximum microphone quality value") + + // Test that quality settings work correctly + SetMicrophoneQuality(AudioQualityMedium) + currentConfig := GetMicrophoneConfig() + assert.Equal(t, AudioQualityMedium, currentConfig.Quality, "Should set medium microphone quality") + t.Logf("Medium microphone quality config: %+v", currentConfig) + + SetMicrophoneQuality(AudioQualityHigh) + currentConfig = GetMicrophoneConfig() + assert.Equal(t, AudioQualityHigh, currentConfig.Quality, "Should set high microphone quality") + t.Logf("High microphone quality config: %+v", currentConfig) + + // Restore original quality + SetMicrophoneQuality(originalConfig.Quality) +} + +// testAudioQualityPresetsConsistency tests consistency of audio quality presets +func testAudioQualityPresetsConsistency(t *testing.T) { + presets := GetAudioQualityPresets() + require.NotNil(t, presets, "Audio quality presets should not be nil") + require.NotEmpty(t, presets, "Audio quality presets should not be empty") + + // Verify presets have expected structure + for i, preset := range presets { + t.Logf("Audio preset %d: %+v", i, preset) + + // Each preset should have reasonable values + assert.GreaterOrEqual(t, preset.Bitrate, 0, "Bitrate should be non-negative") + assert.Greater(t, preset.SampleRate, 0, "Sample rate should be positive") + assert.Greater(t, preset.Channels, 0, "Channels should be positive") + } + + // Test that presets are accessible by valid quality levels + qualityLevels := []AudioQuality{AudioQualityLow, AudioQualityMedium, AudioQualityHigh, AudioQualityUltra} + for _, quality := range qualityLevels { + preset, exists := presets[quality] + assert.True(t, exists, "Preset should exist for quality %v", quality) + assert.Greater(t, preset.Bitrate, 0, "Preset bitrate should be positive for quality %v", quality) + } +} + +// testMicrophoneQualityPresetsConsistency tests consistency of microphone quality presets +func testMicrophoneQualityPresetsConsistency(t *testing.T) { + presets := GetMicrophoneQualityPresets() + require.NotNil(t, presets, "Microphone quality presets should not be nil") + require.NotEmpty(t, presets, "Microphone quality presets should not be empty") + + // Verify presets have expected structure + for i, preset := range presets { + t.Logf("Microphone preset %d: %+v", i, preset) + + // Each preset should have reasonable values + assert.GreaterOrEqual(t, preset.Bitrate, 0, "Bitrate should be non-negative") + assert.Greater(t, preset.SampleRate, 0, "Sample rate should be positive") + assert.Greater(t, preset.Channels, 0, "Channels should be positive") + } + + // Test that presets are accessible by valid quality levels + qualityLevels := []AudioQuality{AudioQualityLow, AudioQualityMedium, AudioQualityHigh, AudioQualityUltra} + for _, quality := range qualityLevels { + preset, exists := presets[quality] + assert.True(t, exists, "Microphone preset should exist for quality %v", quality) + assert.Greater(t, preset.Bitrate, 0, "Microphone preset bitrate should be positive for quality %v", quality) + } +} + +// testQualitySettingsThreadSafety tests thread safety of quality settings +func testQualitySettingsThreadSafety(t *testing.T) { + if testing.Short() { + t.Skip("Skipping thread safety test in short mode") + } + + originalAudioConfig := GetAudioConfig() + originalMicConfig := GetMicrophoneConfig() + + // Test concurrent access to quality settings + const numGoroutines = 50 + const numOperations = 100 + + done := make(chan bool, numGoroutines*2) + + // Audio quality goroutines + for i := 0; i < numGoroutines; i++ { + go func(id int) { + for j := 0; j < numOperations; j++ { + // Cycle through valid quality values + qualityIndex := j % 4 + var quality AudioQuality + switch qualityIndex { + case 0: + quality = AudioQualityLow + case 1: + quality = AudioQualityMedium + case 2: + quality = AudioQualityHigh + case 3: + quality = AudioQualityUltra + } + SetAudioQuality(quality) + _ = GetAudioConfig() + } + done <- true + }(i) + } + + // Microphone quality goroutines + for i := 0; i < numGoroutines; i++ { + go func(id int) { + for j := 0; j < numOperations; j++ { + // Cycle through valid quality values + qualityIndex := j % 4 + var quality AudioQuality + switch qualityIndex { + case 0: + quality = AudioQualityLow + case 1: + quality = AudioQualityMedium + case 2: + quality = AudioQualityHigh + case 3: + quality = AudioQualityUltra + } + SetMicrophoneQuality(quality) + _ = GetMicrophoneConfig() + } + done <- true + }(i) + } + + // Wait for all goroutines to complete + for i := 0; i < numGoroutines*2; i++ { + <-done + } + + // Verify system is still functional + SetAudioQuality(AudioQualityHigh) + assert.Equal(t, AudioQualityHigh, GetAudioConfig().Quality, "Audio quality should be settable after concurrent access") + + SetMicrophoneQuality(AudioQualityMedium) + assert.Equal(t, AudioQualityMedium, GetMicrophoneConfig().Quality, "Microphone quality should be settable after concurrent access") + + // Restore original values + SetAudioQuality(originalAudioConfig.Quality) + SetMicrophoneQuality(originalMicConfig.Quality) +} + +// testQualityPresetsImmutability tests that quality presets are not accidentally modified +func testQualityPresetsImmutability(t *testing.T) { + // Get presets multiple times and verify they're consistent + presets1 := GetAudioQualityPresets() + presets2 := GetAudioQualityPresets() + + require.Equal(t, len(presets1), len(presets2), "Preset count should be consistent") + + // Verify each preset is identical + for quality := range presets1 { + assert.Equal(t, presets1[quality].Bitrate, presets2[quality].Bitrate, + "Preset %v bitrate should be consistent", quality) + assert.Equal(t, presets1[quality].SampleRate, presets2[quality].SampleRate, + "Preset %v sample rate should be consistent", quality) + assert.Equal(t, presets1[quality].Channels, presets2[quality].Channels, + "Preset %v channels should be consistent", quality) + } + + // Test microphone presets as well + micPresets1 := GetMicrophoneQualityPresets() + micPresets2 := GetMicrophoneQualityPresets() + + require.Equal(t, len(micPresets1), len(micPresets2), "Microphone preset count should be consistent") + + for quality := range micPresets1 { + assert.Equal(t, micPresets1[quality].Bitrate, micPresets2[quality].Bitrate, + "Microphone preset %v bitrate should be consistent", quality) + assert.Equal(t, micPresets1[quality].SampleRate, micPresets2[quality].SampleRate, + "Microphone preset %v sample rate should be consistent", quality) + assert.Equal(t, micPresets1[quality].Channels, micPresets2[quality].Channels, + "Microphone preset %v channels should be consistent", quality) + } +} + +// TestQualityValidationRemovalRegression tests that validation removal doesn't cause regressions +func TestQualityValidationRemovalRegression(t *testing.T) { + // This test ensures that removing validation from GET endpoints doesn't break functionality + + // Test that presets are still accessible + audioPresets := GetAudioQualityPresets() + assert.NotNil(t, audioPresets, "Audio presets should be accessible after validation removal") + assert.NotEmpty(t, audioPresets, "Audio presets should not be empty") + + micPresets := GetMicrophoneQualityPresets() + assert.NotNil(t, micPresets, "Microphone presets should be accessible after validation removal") + assert.NotEmpty(t, micPresets, "Microphone presets should not be empty") + + // Test that quality getters still work + audioConfig := GetAudioConfig() + assert.GreaterOrEqual(t, int(audioConfig.Quality), 0, "Audio quality should be non-negative") + + micConfig := GetMicrophoneConfig() + assert.GreaterOrEqual(t, int(micConfig.Quality), 0, "Microphone quality should be non-negative") + + // Test that setters still work (for valid values) + originalAudio := GetAudioConfig() + originalMic := GetMicrophoneConfig() + + SetAudioQuality(AudioQualityMedium) + assert.Equal(t, AudioQualityMedium, GetAudioConfig().Quality, "Audio quality setter should work") + + SetMicrophoneQuality(AudioQualityHigh) + assert.Equal(t, AudioQualityHigh, GetMicrophoneConfig().Quality, "Microphone quality setter should work") + + // Restore original values + SetAudioQuality(originalAudio.Quality) + SetMicrophoneQuality(originalMic.Quality) +} + +// TestPerformanceAfterValidationRemoval tests that performance improved after validation removal +func TestPerformanceAfterValidationRemoval(t *testing.T) { + if testing.Short() { + t.Skip("Skipping performance test in short mode") + } + + // Benchmark preset access (should be faster without validation) + const iterations = 10000 + + // Time audio preset access + start := time.Now() + for i := 0; i < iterations; i++ { + _ = GetAudioQualityPresets() + } + audioDuration := time.Since(start) + + // Time microphone preset access + start = time.Now() + for i := 0; i < iterations; i++ { + _ = GetMicrophoneQualityPresets() + } + micDuration := time.Since(start) + + t.Logf("Audio presets access time for %d iterations: %v", iterations, audioDuration) + t.Logf("Microphone presets access time for %d iterations: %v", iterations, micDuration) + + // Verify reasonable performance (should complete quickly without validation overhead) + maxExpectedDuration := time.Second // Very generous limit + assert.Less(t, audioDuration, maxExpectedDuration, "Audio preset access should be fast") + assert.Less(t, micDuration, maxExpectedDuration, "Microphone preset access should be fast") +} \ No newline at end of file diff --git a/internal/audio/base_manager.go b/internal/audio/base_manager.go new file mode 100644 index 0000000..4420c1e --- /dev/null +++ b/internal/audio/base_manager.go @@ -0,0 +1,120 @@ +package audio + +import ( + "sync/atomic" + "time" + + "github.com/rs/zerolog" +) + +// BaseAudioMetrics provides common metrics fields for both input and output +// Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment) +type BaseAudioMetrics struct { + // Atomic int64 fields first for proper ARM32 alignment + FramesProcessed int64 `json:"frames_processed"` + FramesDropped int64 `json:"frames_dropped"` + BytesProcessed int64 `json:"bytes_processed"` + ConnectionDrops int64 `json:"connection_drops"` + + // Non-atomic fields after atomic fields + LastFrameTime time.Time `json:"last_frame_time"` + AverageLatency time.Duration `json:"average_latency"` +} + +// BaseAudioManager provides common functionality for audio managers +type BaseAudioManager struct { + metrics BaseAudioMetrics + logger zerolog.Logger + running int32 +} + +// NewBaseAudioManager creates a new base audio manager +func NewBaseAudioManager(logger zerolog.Logger) *BaseAudioManager { + return &BaseAudioManager{ + logger: logger, + } +} + +// IsRunning returns whether the manager is running +func (bam *BaseAudioManager) IsRunning() bool { + return atomic.LoadInt32(&bam.running) == 1 +} + +// setRunning atomically sets the running state +func (bam *BaseAudioManager) setRunning(running bool) bool { + if running { + return atomic.CompareAndSwapInt32(&bam.running, 0, 1) + } + return atomic.CompareAndSwapInt32(&bam.running, 1, 0) +} + +// resetMetrics resets all metrics to zero +func (bam *BaseAudioManager) resetMetrics() { + atomic.StoreInt64(&bam.metrics.FramesProcessed, 0) + atomic.StoreInt64(&bam.metrics.FramesDropped, 0) + atomic.StoreInt64(&bam.metrics.BytesProcessed, 0) + atomic.StoreInt64(&bam.metrics.ConnectionDrops, 0) + bam.metrics.LastFrameTime = time.Time{} + bam.metrics.AverageLatency = 0 +} + +// getBaseMetrics returns a copy of the base metrics +func (bam *BaseAudioManager) getBaseMetrics() BaseAudioMetrics { + return BaseAudioMetrics{ + FramesProcessed: atomic.LoadInt64(&bam.metrics.FramesProcessed), + FramesDropped: atomic.LoadInt64(&bam.metrics.FramesDropped), + BytesProcessed: atomic.LoadInt64(&bam.metrics.BytesProcessed), + ConnectionDrops: atomic.LoadInt64(&bam.metrics.ConnectionDrops), + LastFrameTime: bam.metrics.LastFrameTime, + AverageLatency: bam.metrics.AverageLatency, + } +} + +// recordFrameProcessed records a processed frame +func (bam *BaseAudioManager) recordFrameProcessed(bytes int) { + atomic.AddInt64(&bam.metrics.FramesProcessed, 1) + atomic.AddInt64(&bam.metrics.BytesProcessed, int64(bytes)) + bam.metrics.LastFrameTime = time.Now() +} + +// recordFrameDropped records a dropped frame +func (bam *BaseAudioManager) recordFrameDropped() { + atomic.AddInt64(&bam.metrics.FramesDropped, 1) +} + +// updateLatency updates the average latency +func (bam *BaseAudioManager) updateLatency(latency time.Duration) { + // Simple moving average - could be enhanced with more sophisticated algorithms + currentAvg := bam.metrics.AverageLatency + if currentAvg == 0 { + bam.metrics.AverageLatency = latency + } else { + // Weighted average: 90% old + 10% new + bam.metrics.AverageLatency = time.Duration(float64(currentAvg)*0.9 + float64(latency)*0.1) + } +} + +// logComponentStart logs component start with consistent format +func (bam *BaseAudioManager) logComponentStart(component string) { + bam.logger.Info().Str("component", component).Msg("starting component") +} + +// logComponentStarted logs component started with consistent format +func (bam *BaseAudioManager) logComponentStarted(component string) { + bam.logger.Info().Str("component", component).Msg("component started successfully") +} + +// logComponentStop logs component stop with consistent format +func (bam *BaseAudioManager) logComponentStop(component string) { + bam.logger.Info().Str("component", component).Msg("stopping component") +} + +// logComponentStopped logs component stopped with consistent format +func (bam *BaseAudioManager) logComponentStopped(component string) { + bam.logger.Info().Str("component", component).Msg("component stopped") +} + +// logComponentError logs component error with consistent format +func (bam *BaseAudioManager) logComponentError(component string, err error, msg string) { + bam.logger.Error().Err(err).Str("component", component).Msg(msg) +} diff --git a/internal/audio/base_supervisor.go b/internal/audio/base_supervisor.go new file mode 100644 index 0000000..00dc46a --- /dev/null +++ b/internal/audio/base_supervisor.go @@ -0,0 +1,133 @@ +//go:build cgo +// +build cgo + +package audio + +import ( + "context" + "os/exec" + "sync" + "sync/atomic" + "time" + + "github.com/jetkvm/kvm/internal/logging" + "github.com/rs/zerolog" +) + +// BaseSupervisor provides common functionality for audio supervisors +type BaseSupervisor struct { + ctx context.Context + cancel context.CancelFunc + logger *zerolog.Logger + mutex sync.RWMutex + running int32 + + // Process management + cmd *exec.Cmd + processPID int + + // Process monitoring + processMonitor *ProcessMonitor + + // Exit tracking + lastExitCode int + lastExitTime time.Time +} + +// NewBaseSupervisor creates a new base supervisor +func NewBaseSupervisor(componentName string) *BaseSupervisor { + logger := logging.GetDefaultLogger().With().Str("component", componentName).Logger() + return &BaseSupervisor{ + logger: &logger, + processMonitor: GetProcessMonitor(), + } +} + +// IsRunning returns whether the supervisor is currently running +func (bs *BaseSupervisor) IsRunning() bool { + return atomic.LoadInt32(&bs.running) == 1 +} + +// setRunning atomically sets the running state +func (bs *BaseSupervisor) setRunning(running bool) { + if running { + atomic.StoreInt32(&bs.running, 1) + } else { + atomic.StoreInt32(&bs.running, 0) + } +} + +// GetProcessPID returns the current process PID +func (bs *BaseSupervisor) GetProcessPID() int { + bs.mutex.RLock() + defer bs.mutex.RUnlock() + return bs.processPID +} + +// GetLastExitInfo returns the last exit code and time +func (bs *BaseSupervisor) GetLastExitInfo() (exitCode int, exitTime time.Time) { + bs.mutex.RLock() + defer bs.mutex.RUnlock() + return bs.lastExitCode, bs.lastExitTime +} + +// GetProcessMetrics returns process metrics if available +func (bs *BaseSupervisor) GetProcessMetrics() *ProcessMetrics { + bs.mutex.RLock() + defer bs.mutex.RUnlock() + + if bs.cmd == nil || bs.cmd.Process == nil { + return &ProcessMetrics{ + PID: 0, + CPUPercent: 0.0, + MemoryRSS: 0, + MemoryVMS: 0, + MemoryPercent: 0.0, + Timestamp: time.Now(), + ProcessName: "audio-server", + } + } + + pid := bs.cmd.Process.Pid + if bs.processMonitor != nil { + metrics := bs.processMonitor.GetCurrentMetrics() + for _, metric := range metrics { + if metric.PID == pid { + return &metric + } + } + } + + // Return default metrics if process not found in monitor + return &ProcessMetrics{ + PID: pid, + CPUPercent: 0.0, + MemoryRSS: 0, + MemoryVMS: 0, + MemoryPercent: 0.0, + Timestamp: time.Now(), + ProcessName: "audio-server", + } +} + +// logSupervisorStart logs supervisor start event +func (bs *BaseSupervisor) logSupervisorStart() { + bs.logger.Info().Msg("Supervisor starting") +} + +// logSupervisorStop logs supervisor stop event +func (bs *BaseSupervisor) logSupervisorStop() { + bs.logger.Info().Msg("Supervisor stopping") +} + +// createContext creates a new context for the supervisor +func (bs *BaseSupervisor) createContext() { + bs.ctx, bs.cancel = context.WithCancel(context.Background()) +} + +// cancelContext cancels the supervisor context +func (bs *BaseSupervisor) cancelContext() { + if bs.cancel != nil { + bs.cancel() + } +} diff --git a/internal/audio/batch_audio.go b/internal/audio/batch_audio.go index 83d27ef..b59a79c 100644 --- a/internal/audio/batch_audio.go +++ b/internal/audio/batch_audio.go @@ -60,6 +60,18 @@ type batchReadResult struct { // NewBatchAudioProcessor creates a new batch audio processor func NewBatchAudioProcessor(batchSize int, batchDuration time.Duration) *BatchAudioProcessor { + // Validate input parameters + if err := ValidateBufferSize(batchSize); err != nil { + logger := logging.GetDefaultLogger().With().Str("component", "batch-audio").Logger() + logger.Error().Err(err).Int("batchSize", batchSize).Msg("Invalid batch size provided, using default") + batchSize = GetConfig().BatchProcessorFramesPerBatch + } + if batchDuration <= 0 { + logger := logging.GetDefaultLogger().With().Str("component", "batch-audio").Logger() + logger.Error().Dur("batchDuration", batchDuration).Msg("Invalid batch duration provided, using default") + batchDuration = GetConfig().BatchProcessingDelay + } + ctx, cancel := context.WithCancel(context.Background()) logger := logging.GetDefaultLogger().With().Str("component", "batch-audio").Logger() @@ -117,6 +129,12 @@ func (bap *BatchAudioProcessor) Stop() { // BatchReadEncode performs batched audio read and encode operations func (bap *BatchAudioProcessor) BatchReadEncode(buffer []byte) (int, error) { + // Validate buffer before processing + if err := ValidateBufferSize(len(buffer)); err != nil { + bap.logger.Debug().Err(err).Msg("Invalid buffer for batch processing") + return 0, err + } + if atomic.LoadInt32(&bap.running) == 0 { // Fallback to single operation if batch processor is not running atomic.AddInt64(&bap.stats.SingleReads, 1) diff --git a/internal/audio/buffer_pool.go b/internal/audio/buffer_pool.go index 709659f..7460f48 100644 --- a/internal/audio/buffer_pool.go +++ b/internal/audio/buffer_pool.go @@ -4,6 +4,8 @@ import ( "sync" "sync/atomic" "time" + + "github.com/jetkvm/kvm/internal/logging" ) type AudioBufferPool struct { @@ -23,6 +25,14 @@ type AudioBufferPool struct { } func NewAudioBufferPool(bufferSize int) *AudioBufferPool { + // Validate buffer size parameter + if err := ValidateBufferSize(bufferSize); err != nil { + // Log validation error and use default value + logger := logging.GetDefaultLogger().With().Str("component", "AudioBufferPool").Logger() + logger.Error().Err(err).Int("bufferSize", bufferSize).Msg("Invalid buffer size provided, using default") + bufferSize = GetConfig().AudioFramePoolSize + } + // Pre-allocate 20% of max pool size for immediate availability preallocSize := GetConfig().PreallocPercentage preallocated := make([]*[]byte, 0, preallocSize) diff --git a/internal/audio/cgo_audio.go b/internal/audio/cgo_audio.go index a5d4ebf..040da71 100644 --- a/internal/audio/cgo_audio.go +++ b/internal/audio/cgo_audio.go @@ -36,11 +36,13 @@ static int channels = 2; // Will be set from GetConfig().CGOChann static int frame_size = 960; // Will be set from GetConfig().CGOFrameSize static int max_packet_size = 1500; // Will be set from GetConfig().CGOMaxPacketSize static int sleep_microseconds = 1000; // Will be set from GetConfig().CGOUsleepMicroseconds +static int max_attempts_global = 5; // Will be set from GetConfig().CGOMaxAttempts +static int max_backoff_us_global = 500000; // Will be set from GetConfig().CGOMaxBackoffMicroseconds // Function to update constants from Go configuration void update_audio_constants(int bitrate, int complexity, int vbr, int vbr_constraint, int signal_type, int bandwidth, int dtx, int sr, int ch, - int fs, int max_pkt, int sleep_us) { + int fs, int max_pkt, int sleep_us, int max_attempts, int max_backoff) { opus_bitrate = bitrate; opus_complexity = complexity; opus_vbr = vbr; @@ -53,6 +55,8 @@ void update_audio_constants(int bitrate, int complexity, int vbr, int vbr_constr frame_size = fs; max_packet_size = max_pkt; sleep_microseconds = sleep_us; + max_attempts_global = max_attempts; + max_backoff_us_global = max_backoff; } // State tracking to prevent race conditions during rapid start/stop @@ -63,13 +67,11 @@ static volatile int playback_initialized = 0; // Enhanced ALSA device opening with exponential backoff retry logic static int safe_alsa_open(snd_pcm_t **handle, const char *device, snd_pcm_stream_t stream) { - int max_attempts = 5; // Increased from 3 to 5 int attempt = 0; int err; int backoff_us = sleep_microseconds; // Start with base sleep time - const int max_backoff_us = 500000; // Max 500ms backoff - while (attempt < max_attempts) { + while (attempt < max_attempts_global) { err = snd_pcm_open(handle, device, stream, SND_PCM_NONBLOCK); if (err >= 0) { // Switch to blocking mode after successful open @@ -78,24 +80,24 @@ static int safe_alsa_open(snd_pcm_t **handle, const char *device, snd_pcm_stream } attempt++; - if (attempt >= max_attempts) break; + if (attempt >= max_attempts_global) break; // Enhanced error handling with specific retry strategies if (err == -EBUSY || err == -EAGAIN) { // Device busy or temporarily unavailable - retry with backoff usleep(backoff_us); - backoff_us = (backoff_us * 2 < max_backoff_us) ? backoff_us * 2 : max_backoff_us; + backoff_us = (backoff_us * 2 < max_backoff_us_global) ? backoff_us * 2 : max_backoff_us_global; } else if (err == -ENODEV || err == -ENOENT) { // Device not found - longer wait as device might be initializing usleep(backoff_us * 2); - backoff_us = (backoff_us * 2 < max_backoff_us) ? backoff_us * 2 : max_backoff_us; + backoff_us = (backoff_us * 2 < max_backoff_us_global) ? backoff_us * 2 : max_backoff_us_global; } else if (err == -EPERM || err == -EACCES) { // Permission denied - shorter wait, likely persistent issue usleep(backoff_us / 2); } else { // Other errors - standard backoff usleep(backoff_us); - backoff_us = (backoff_us * 2 < max_backoff_us) ? backoff_us * 2 : max_backoff_us; + backoff_us = (backoff_us * 2 < max_backoff_us_global) ? backoff_us * 2 : max_backoff_us_global; } } return err; @@ -649,6 +651,8 @@ func cgoAudioInit() error { C.int(config.CGOFrameSize), C.int(config.CGOMaxPacketSize), C.int(config.CGOUsleepMicroseconds), + C.int(config.CGOMaxAttempts), + C.int(config.CGOMaxBackoffMicroseconds), ) result := C.jetkvm_audio_init() diff --git a/internal/audio/config_constants.go b/internal/audio/config_constants.go index 1369541..77a6206 100644 --- a/internal/audio/config_constants.go +++ b/internal/audio/config_constants.go @@ -1,6 +1,10 @@ package audio -import "time" +import ( + "time" + + "github.com/jetkvm/kvm/internal/logging" +) // AudioConfigConstants centralizes all hardcoded values used across audio components. // This configuration system allows runtime tuning of audio performance, quality, and resource usage. @@ -1541,6 +1545,82 @@ type AudioConfigConstants struct { // Default 8 channels provides reasonable upper bound for multi-channel audio. MaxChannels int + // CGO Constants + // Used in: cgo_audio.go for CGO operation limits and retry logic + // Impact: Controls CGO retry behavior and backoff timing + + // CGOMaxBackoffMicroseconds defines maximum backoff time in microseconds for CGO operations. + // Used in: safe_alsa_open for exponential backoff retry logic + // Impact: Prevents excessive wait times while allowing device recovery. + // Default 500000 microseconds (500ms) provides reasonable maximum wait time. + CGOMaxBackoffMicroseconds int + + // CGOMaxAttempts defines maximum retry attempts for CGO operations. + // Used in: safe_alsa_open for retry limit enforcement + // Impact: Prevents infinite retry loops while allowing transient error recovery. + // Default 5 attempts provides good balance between reliability and performance. + CGOMaxAttempts int + + // Validation Frame Size Limits + // Used in: validation_enhanced.go for frame duration validation + // Impact: Ensures frame sizes are within acceptable bounds for real-time audio + + // MinFrameDuration defines minimum acceptable frame duration. + // Used in: ValidateAudioConfiguration for frame size validation + // Impact: Prevents excessively small frames that could impact performance. + // Default 10ms provides minimum viable frame duration for real-time audio. + MinFrameDuration time.Duration + + // MaxFrameDuration defines maximum acceptable frame duration. + // Used in: ValidateAudioConfiguration for frame size validation + // Impact: Prevents excessively large frames that could impact latency. + // Default 100ms provides reasonable maximum frame duration. + MaxFrameDuration time.Duration + + // Valid Sample Rates + // Used in: validation_enhanced.go for sample rate validation + // Impact: Defines the set of supported sample rates for audio processing + + // ValidSampleRates defines the list of supported sample rates. + // Used in: ValidateAudioConfiguration for sample rate validation + // Impact: Ensures only supported sample rates are used in audio processing. + // Default rates support common audio standards from voice (8kHz) to professional (48kHz). + ValidSampleRates []int + + // Opus Bitrate Validation Constants + // Used in: validation_enhanced.go for bitrate range validation + // Impact: Ensures bitrate values are within Opus codec specifications + + // MinOpusBitrate defines the minimum valid Opus bitrate in bits per second. + // Used in: ValidateAudioConfiguration for bitrate validation + // Impact: Prevents bitrates below Opus codec minimum specification. + // Default 6000 bps is the minimum supported by Opus codec. + MinOpusBitrate int + + // MaxOpusBitrate defines the maximum valid Opus bitrate in bits per second. + // Used in: ValidateAudioConfiguration for bitrate validation + // Impact: Prevents bitrates above Opus codec maximum specification. + // Default 510000 bps is the maximum supported by Opus codec. + MaxOpusBitrate int + + // MaxValidationTime defines the maximum time allowed for validation operations. + // Used in: GetValidationConfig for timeout control + // Impact: Prevents validation operations from blocking indefinitely. + // Default 5s provides reasonable timeout for validation operations. + MaxValidationTime time.Duration + + // MinFrameSize defines the minimum reasonable audio frame size in bytes. + // Used in: ValidateAudioFrameComprehensive for frame size validation + // Impact: Prevents processing of unreasonably small audio frames. + // Default 64 bytes ensures minimum viable audio data. + MinFrameSize int + + // FrameSizeTolerance defines the tolerance for frame size validation in bytes. + // Used in: ValidateAudioFrameComprehensive for frame size matching + // Impact: Allows reasonable variation in frame sizes due to encoding. + // Default 512 bytes accommodates typical encoding variations. + FrameSizeTolerance int + // Device Health Monitoring Configuration // Used in: device_health.go for proactive device monitoring and recovery // Impact: Controls health check frequency and recovery thresholds @@ -2607,6 +2687,26 @@ func DefaultAudioConfig() *AudioConfigConstants { MaxSampleRate: 48000, // 48kHz maximum sample rate MaxChannels: 8, // 8 maximum audio channels + // CGO Constants + CGOMaxBackoffMicroseconds: 500000, // 500ms maximum backoff in microseconds + CGOMaxAttempts: 5, // 5 maximum retry attempts + + // Validation Frame Size Limits + MinFrameDuration: 10 * time.Millisecond, // 10ms minimum frame duration + MaxFrameDuration: 100 * time.Millisecond, // 100ms maximum frame duration + + // Valid Sample Rates + ValidSampleRates: []int{8000, 12000, 16000, 22050, 24000, 44100, 48000}, // Supported sample rates + + // Opus Bitrate Validation Constants + MinOpusBitrate: 6000, // 6000 bps minimum Opus bitrate + MaxOpusBitrate: 510000, // 510000 bps maximum Opus bitrate + + // Validation Configuration + MaxValidationTime: 5 * time.Second, // 5s maximum validation timeout + MinFrameSize: 64, // 64 bytes minimum frame size + FrameSizeTolerance: 512, // 512 bytes frame size tolerance + // Device Health Monitoring Configuration HealthCheckIntervalMS: 5000, // 5000ms (5s) health check interval HealthRecoveryThreshold: 3, // 3 consecutive successes for recovery @@ -2630,7 +2730,17 @@ var audioConfigInstance = DefaultAudioConfig() // UpdateConfig allows runtime configuration updates func UpdateConfig(newConfig *AudioConfigConstants) { + // Validate the new configuration before applying it + if err := ValidateAudioConfigConstants(newConfig); err != nil { + // Log validation error and keep current configuration + logger := logging.GetDefaultLogger().With().Str("component", "AudioConfig").Logger() + logger.Error().Err(err).Msg("Configuration validation failed, keeping current configuration") + return + } + audioConfigInstance = newConfig + logger := logging.GetDefaultLogger().With().Str("component", "AudioConfig").Logger() + logger.Info().Msg("Audio configuration updated successfully") } // GetConfig returns the current configuration diff --git a/internal/audio/input.go b/internal/audio/input.go index 39f90cd..f9cca8a 100644 --- a/internal/audio/input.go +++ b/internal/audio/input.go @@ -6,79 +6,75 @@ import ( "time" "github.com/jetkvm/kvm/internal/logging" - "github.com/rs/zerolog" ) // AudioInputMetrics holds metrics for microphone input +// Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment) type AudioInputMetrics struct { - FramesSent int64 // Total frames sent - FramesDropped int64 // Total frames dropped - BytesProcessed int64 // Total bytes processed - ConnectionDrops int64 // Connection drops - AverageLatency time.Duration // time.Duration is int64 - LastFrameTime time.Time + // Atomic int64 field first for proper ARM32 alignment + FramesSent int64 `json:"frames_sent"` // Total frames sent (input-specific) + + // Embedded struct with atomic fields properly aligned + BaseAudioMetrics } // AudioInputManager manages microphone input stream using IPC mode only type AudioInputManager struct { - metrics AudioInputMetrics - + *BaseAudioManager ipcManager *AudioInputIPCManager - logger zerolog.Logger - running int32 + framesSent int64 // Input-specific metric } -// NewAudioInputManager creates a new audio input manager (IPC mode only) +// NewAudioInputManager creates a new audio input manager func NewAudioInputManager() *AudioInputManager { + logger := logging.GetDefaultLogger().With().Str("component", AudioInputManagerComponent).Logger() return &AudioInputManager{ - ipcManager: NewAudioInputIPCManager(), - logger: logging.GetDefaultLogger().With().Str("component", AudioInputManagerComponent).Logger(), + BaseAudioManager: NewBaseAudioManager(logger), + ipcManager: NewAudioInputIPCManager(), } } // Start begins processing microphone input func (aim *AudioInputManager) Start() error { - if !atomic.CompareAndSwapInt32(&aim.running, 0, 1) { + if !aim.setRunning(true) { return fmt.Errorf("audio input manager is already running") } - aim.logger.Info().Str("component", AudioInputManagerComponent).Msg("starting component") + aim.logComponentStart(AudioInputManagerComponent) // Start the IPC-based audio input err := aim.ipcManager.Start() if err != nil { - aim.logger.Error().Err(err).Str("component", AudioInputManagerComponent).Msg("failed to start component") + aim.logComponentError(AudioInputManagerComponent, err, "failed to start component") // Ensure proper cleanup on error - atomic.StoreInt32(&aim.running, 0) + aim.setRunning(false) // Reset metrics on failed start aim.resetMetrics() return err } - aim.logger.Info().Str("component", AudioInputManagerComponent).Msg("component started successfully") + aim.logComponentStarted(AudioInputManagerComponent) return nil } // Stop stops processing microphone input func (aim *AudioInputManager) Stop() { - if !atomic.CompareAndSwapInt32(&aim.running, 1, 0) { + if !aim.setRunning(false) { return // Already stopped } - aim.logger.Info().Str("component", AudioInputManagerComponent).Msg("stopping component") + aim.logComponentStop(AudioInputManagerComponent) // Stop the IPC-based audio input aim.ipcManager.Stop() - aim.logger.Info().Str("component", AudioInputManagerComponent).Msg("component stopped") + aim.logComponentStopped(AudioInputManagerComponent) } // resetMetrics resets all metrics to zero func (aim *AudioInputManager) resetMetrics() { - atomic.StoreInt64(&aim.metrics.FramesSent, 0) - atomic.StoreInt64(&aim.metrics.FramesDropped, 0) - atomic.StoreInt64(&aim.metrics.BytesProcessed, 0) - atomic.StoreInt64(&aim.metrics.ConnectionDrops, 0) + aim.BaseAudioManager.resetMetrics() + atomic.StoreInt64(&aim.framesSent, 0) } // WriteOpusFrame writes an Opus frame to the audio input system with latency tracking @@ -87,6 +83,12 @@ func (aim *AudioInputManager) WriteOpusFrame(frame []byte) error { return nil // Not running, silently drop } + // Validate frame before processing + if err := ValidateFrameData(frame); err != nil { + aim.logComponentError(AudioInputManagerComponent, err, "Frame validation failed") + return fmt.Errorf("input frame validation failed: %w", err) + } + // Track end-to-end latency from WebRTC to IPC startTime := time.Now() err := aim.ipcManager.WriteOpusFrame(frame) @@ -105,10 +107,9 @@ func (aim *AudioInputManager) WriteOpusFrame(frame []byte) error { } // Update metrics - atomic.AddInt64(&aim.metrics.FramesSent, 1) - atomic.AddInt64(&aim.metrics.BytesProcessed, int64(len(frame))) - aim.metrics.LastFrameTime = time.Now() - aim.metrics.AverageLatency = processingTime + atomic.AddInt64(&aim.framesSent, 1) + aim.recordFrameProcessed(len(frame)) + aim.updateLatency(processingTime) return nil } @@ -141,21 +142,17 @@ func (aim *AudioInputManager) WriteOpusFrameZeroCopy(frame *ZeroCopyAudioFrame) } // Update metrics - atomic.AddInt64(&aim.metrics.FramesSent, 1) - atomic.AddInt64(&aim.metrics.BytesProcessed, int64(frame.Length())) - aim.metrics.LastFrameTime = time.Now() - aim.metrics.AverageLatency = processingTime + atomic.AddInt64(&aim.framesSent, 1) + aim.recordFrameProcessed(frame.Length()) + aim.updateLatency(processingTime) return nil } -// GetMetrics returns current audio input metrics +// GetMetrics returns current metrics func (aim *AudioInputManager) GetMetrics() AudioInputMetrics { return AudioInputMetrics{ - FramesSent: atomic.LoadInt64(&aim.metrics.FramesSent), - FramesDropped: atomic.LoadInt64(&aim.metrics.FramesDropped), - BytesProcessed: atomic.LoadInt64(&aim.metrics.BytesProcessed), - AverageLatency: aim.metrics.AverageLatency, - LastFrameTime: aim.metrics.LastFrameTime, + FramesSent: atomic.LoadInt64(&aim.framesSent), + BaseAudioMetrics: aim.getBaseMetrics(), } } @@ -209,10 +206,7 @@ func (aim *AudioInputManager) LogPerformanceStats() { Msg("Audio input performance metrics") } -// IsRunning returns whether the audio input manager is running -func (aim *AudioInputManager) IsRunning() bool { - return atomic.LoadInt32(&aim.running) == 1 -} +// Note: IsRunning() is inherited from BaseAudioManager // IsReady returns whether the audio input manager is ready to receive frames // This checks both that it's running and that the IPC connection is established diff --git a/internal/audio/input_ipc.go b/internal/audio/input_ipc.go index 6dbd20d..f61090f 100644 --- a/internal/audio/input_ipc.go +++ b/internal/audio/input_ipc.go @@ -301,7 +301,7 @@ func (ais *AudioInputServer) acceptConnections() { if err != nil { if ais.running { // Log error and continue accepting - logger := logging.GetDefaultLogger().With().Str("component", "audio-input-server").Logger() + logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger() logger.Warn().Err(err).Msg("Failed to accept connection, retrying") continue } @@ -311,7 +311,7 @@ func (ais *AudioInputServer) acceptConnections() { // Configure socket buffers for optimal performance if err := ConfigureSocketBuffers(conn, ais.socketBufferConfig); err != nil { // Log warning but don't fail - socket buffer optimization is not critical - logger := logging.GetDefaultLogger().With().Str("component", "audio-input-server").Logger() + logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger() logger.Warn().Err(err).Msg("Failed to configure socket buffers, continuing with defaults") } else { // Record socket buffer metrics for monitoring @@ -458,6 +458,13 @@ func (ais *AudioInputServer) processOpusFrame(data []byte) error { return nil // Empty frame, ignore } + // Validate frame data before processing + if err := ValidateFrameData(data); err != nil { + logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger() + logger.Error().Err(err).Msg("Frame validation failed") + return fmt.Errorf("input frame validation failed: %w", err) + } + // Process the Opus frame using CGO _, err := CGOAudioDecodeWrite(data) return err @@ -465,6 +472,18 @@ func (ais *AudioInputServer) processOpusFrame(data []byte) error { // processConfig processes a configuration update func (ais *AudioInputServer) processConfig(data []byte) error { + // Validate configuration data + if len(data) == 0 { + return fmt.Errorf("empty configuration data") + } + + // Basic validation for configuration size + if err := ValidateBufferSize(len(data)); err != nil { + logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger() + logger.Error().Err(err).Msg("Configuration buffer validation failed") + return fmt.Errorf("configuration validation failed: %w", err) + } + // Acknowledge configuration receipt return ais.sendAck() } @@ -596,6 +615,13 @@ func (aic *AudioInputClient) SendFrame(frame []byte) error { return nil // Empty frame, ignore } + // Validate frame data before sending + if err := ValidateFrameData(frame); err != nil { + logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger() + logger.Error().Err(err).Msg("Frame validation failed") + return fmt.Errorf("input frame validation failed: %w", err) + } + if len(frame) > maxFrameSize { return fmt.Errorf("frame too large: got %d bytes, maximum allowed %d bytes", len(frame), maxFrameSize) } @@ -624,6 +650,13 @@ func (aic *AudioInputClient) SendFrameZeroCopy(frame *ZeroCopyAudioFrame) error return nil // Empty frame, ignore } + // Validate zero-copy frame before sending + if err := ValidateZeroCopyFrame(frame); err != nil { + logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger() + logger.Error().Err(err).Msg("Zero-copy frame validation failed") + return fmt.Errorf("input frame validation failed: %w", err) + } + if frame.Length() > maxFrameSize { return fmt.Errorf("frame too large: got %d bytes, maximum allowed %d bytes", frame.Length(), maxFrameSize) } @@ -649,6 +682,13 @@ func (aic *AudioInputClient) SendConfig(config InputIPCConfig) error { return fmt.Errorf("not connected to audio input server") } + // Validate configuration parameters + if err := ValidateInputIPCConfig(config.SampleRate, config.Channels, config.FrameSize); err != nil { + logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger() + logger.Error().Err(err).Msg("Configuration validation failed") + return fmt.Errorf("input configuration validation failed: %w", err) + } + // Serialize config (simple binary format) data := make([]byte, 12) // 3 * int32 binary.LittleEndian.PutUint32(data[0:4], uint32(config.SampleRate)) @@ -735,7 +775,7 @@ func (ais *AudioInputServer) startReaderGoroutine() { baseBackoffDelay := GetConfig().RetryDelay maxBackoffDelay := GetConfig().MaxRetryDelay - logger := logging.GetDefaultLogger().With().Str("component", "audio-input-reader").Logger() + logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger() for { select { @@ -820,7 +860,7 @@ func (ais *AudioInputServer) startProcessorGoroutine() { defer runtime.UnlockOSThread() // Set high priority for audio processing - logger := logging.GetDefaultLogger().With().Str("component", "audio-input-processor").Logger() + logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger() if err := SetAudioThreadPriority(); err != nil { logger.Warn().Err(err).Msg("Failed to set audio processing priority") } @@ -937,7 +977,7 @@ func (ais *AudioInputServer) startMonitorGoroutine() { defer runtime.UnlockOSThread() // Set I/O priority for monitoring - logger := logging.GetDefaultLogger().With().Str("component", "audio-input-monitor").Logger() + logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger() if err := SetAudioIOThreadPriority(); err != nil { logger.Warn().Err(err).Msg("Failed to set audio I/O priority") } diff --git a/internal/audio/input_ipc_manager.go b/internal/audio/input_ipc_manager.go index c99b46b..d024dca 100644 --- a/internal/audio/input_ipc_manager.go +++ b/internal/audio/input_ipc_manager.go @@ -49,6 +49,17 @@ func (aim *AudioInputIPCManager) Start() error { FrameSize: GetConfig().InputIPCFrameSize, } + // Validate configuration before using it + if err := ValidateInputIPCConfig(config.SampleRate, config.Channels, config.FrameSize); err != nil { + aim.logger.Error().Err(err).Msg("Invalid input IPC config from constants, using defaults") + // Use safe defaults if config validation fails + config = InputIPCConfig{ + SampleRate: 48000, + Channels: 2, + FrameSize: 960, + } + } + // Wait for subprocess readiness time.Sleep(GetConfig().LongSleepDuration) @@ -91,6 +102,13 @@ func (aim *AudioInputIPCManager) WriteOpusFrame(frame []byte) error { return nil // Empty frame, ignore } + // Validate frame data + if err := ValidateFrameData(frame); err != nil { + atomic.AddInt64(&aim.metrics.FramesDropped, 1) + aim.logger.Debug().Err(err).Msg("Invalid frame data") + return err + } + // Start latency measurement startTime := time.Now() @@ -125,6 +143,13 @@ func (aim *AudioInputIPCManager) WriteOpusFrameZeroCopy(frame *ZeroCopyAudioFram return nil // Empty frame, ignore } + // Validate zero-copy frame + if err := ValidateZeroCopyFrame(frame); err != nil { + atomic.AddInt64(&aim.metrics.FramesDropped, 1) + aim.logger.Debug().Err(err).Msg("Invalid zero-copy frame") + return err + } + // Start latency measurement startTime := time.Now() @@ -166,12 +191,15 @@ func (aim *AudioInputIPCManager) IsReady() bool { // GetMetrics returns current metrics func (aim *AudioInputIPCManager) GetMetrics() AudioInputMetrics { return AudioInputMetrics{ - FramesSent: atomic.LoadInt64(&aim.metrics.FramesSent), - FramesDropped: atomic.LoadInt64(&aim.metrics.FramesDropped), - BytesProcessed: atomic.LoadInt64(&aim.metrics.BytesProcessed), - ConnectionDrops: atomic.LoadInt64(&aim.metrics.ConnectionDrops), - AverageLatency: aim.metrics.AverageLatency, - LastFrameTime: aim.metrics.LastFrameTime, + FramesSent: atomic.LoadInt64(&aim.metrics.FramesSent), + BaseAudioMetrics: BaseAudioMetrics{ + FramesProcessed: atomic.LoadInt64(&aim.metrics.FramesProcessed), + FramesDropped: atomic.LoadInt64(&aim.metrics.FramesDropped), + BytesProcessed: atomic.LoadInt64(&aim.metrics.BytesProcessed), + ConnectionDrops: atomic.LoadInt64(&aim.metrics.ConnectionDrops), + AverageLatency: aim.metrics.AverageLatency, + LastFrameTime: aim.metrics.LastFrameTime, + }, } } diff --git a/internal/audio/input_supervisor.go b/internal/audio/input_supervisor.go index a8686cb..50e3eb6 100644 --- a/internal/audio/input_supervisor.go +++ b/internal/audio/input_supervisor.go @@ -1,50 +1,38 @@ package audio import ( - "context" "fmt" "os" "os/exec" - "sync" "syscall" "time" - - "github.com/jetkvm/kvm/internal/logging" - "github.com/rs/zerolog" ) // AudioInputSupervisor manages the audio input server subprocess type AudioInputSupervisor struct { - cmd *exec.Cmd - cancel context.CancelFunc - mtx sync.Mutex - running bool - logger zerolog.Logger - client *AudioInputClient - processMonitor *ProcessMonitor + *BaseSupervisor + client *AudioInputClient } // NewAudioInputSupervisor creates a new audio input supervisor func NewAudioInputSupervisor() *AudioInputSupervisor { return &AudioInputSupervisor{ - logger: logging.GetDefaultLogger().With().Str("component", "audio-input-supervisor").Logger(), + BaseSupervisor: NewBaseSupervisor("audio-input-supervisor"), client: NewAudioInputClient(), - processMonitor: GetProcessMonitor(), } } // Start starts the audio input server subprocess func (ais *AudioInputSupervisor) Start() error { - ais.mtx.Lock() - defer ais.mtx.Unlock() + ais.mutex.Lock() + defer ais.mutex.Unlock() - if ais.running { + if ais.IsRunning() { return fmt.Errorf("audio input supervisor already running with PID %d", ais.cmd.Process.Pid) } // Create context for subprocess management - ctx, cancel := context.WithCancel(context.Background()) - ais.cancel = cancel + ais.createContext() // Get current executable path execPath, err := os.Executable() @@ -53,7 +41,7 @@ func (ais *AudioInputSupervisor) Start() error { } // Create command for audio input server subprocess - cmd := exec.CommandContext(ctx, execPath, "--audio-input-server") + cmd := exec.CommandContext(ais.ctx, execPath, "--audio-input-server") cmd.Env = append(os.Environ(), "JETKVM_AUDIO_INPUT_IPC=true", // Enable IPC mode ) @@ -64,13 +52,13 @@ func (ais *AudioInputSupervisor) Start() error { } ais.cmd = cmd - ais.running = true + ais.setRunning(true) // Start the subprocess err = cmd.Start() if err != nil { - ais.running = false - cancel() + ais.setRunning(false) + ais.cancelContext() return fmt.Errorf("failed to start audio input server process: %w", err) } @@ -90,14 +78,14 @@ func (ais *AudioInputSupervisor) Start() error { // Stop stops the audio input server subprocess func (ais *AudioInputSupervisor) Stop() { - ais.mtx.Lock() - defer ais.mtx.Unlock() + ais.mutex.Lock() + defer ais.mutex.Unlock() - if !ais.running { + if !ais.IsRunning() { return } - ais.running = false + ais.logSupervisorStop() // Disconnect client first if ais.client != nil { @@ -105,9 +93,7 @@ func (ais *AudioInputSupervisor) Stop() { } // Cancel context to signal subprocess to stop - if ais.cancel != nil { - ais.cancel() - } + ais.cancelContext() // Try graceful termination first if ais.cmd != nil && ais.cmd.Process != nil { @@ -138,19 +124,14 @@ func (ais *AudioInputSupervisor) Stop() { } } + ais.setRunning(false) ais.cmd = nil - ais.cancel = nil -} - -// IsRunning returns whether the supervisor is running -func (ais *AudioInputSupervisor) IsRunning() bool { - ais.mtx.Lock() - defer ais.mtx.Unlock() - return ais.running } // IsConnected returns whether the client is connected to the audio input server func (ais *AudioInputSupervisor) IsConnected() bool { + ais.mutex.Lock() + defer ais.mutex.Unlock() if !ais.IsRunning() { return false } @@ -162,41 +143,11 @@ func (ais *AudioInputSupervisor) GetClient() *AudioInputClient { return ais.client } -// GetProcessMetrics returns current process metrics if the process is running +// GetProcessMetrics returns current process metrics with audio-input-server name func (ais *AudioInputSupervisor) GetProcessMetrics() *ProcessMetrics { - ais.mtx.Lock() - defer ais.mtx.Unlock() - - if ais.cmd == nil || ais.cmd.Process == nil { - // Return default metrics when no process is running - return &ProcessMetrics{ - PID: 0, - CPUPercent: 0.0, - MemoryRSS: 0, - MemoryVMS: 0, - MemoryPercent: 0.0, - Timestamp: time.Now(), - ProcessName: "audio-input-server", - } - } - - pid := ais.cmd.Process.Pid - metrics := ais.processMonitor.GetCurrentMetrics() - for _, metric := range metrics { - if metric.PID == pid { - return &metric - } - } - // Return default metrics if process not found in monitoring - return &ProcessMetrics{ - PID: pid, - CPUPercent: 0.0, - MemoryRSS: 0, - MemoryVMS: 0, - MemoryPercent: 0.0, - Timestamp: time.Now(), - ProcessName: "audio-input-server", - } + metrics := ais.BaseSupervisor.GetProcessMetrics() + metrics.ProcessName = "audio-input-server" + return metrics } // monitorSubprocess monitors the subprocess and handles unexpected exits @@ -211,10 +162,10 @@ func (ais *AudioInputSupervisor) monitorSubprocess() { // Remove process from monitoring ais.processMonitor.RemoveProcess(pid) - ais.mtx.Lock() - defer ais.mtx.Unlock() + ais.mutex.Lock() + defer ais.mutex.Unlock() - if ais.running { + if ais.IsRunning() { // Unexpected exit if err != nil { ais.logger.Error().Err(err).Int("pid", pid).Msg("Audio input server subprocess exited unexpectedly") @@ -228,7 +179,7 @@ func (ais *AudioInputSupervisor) monitorSubprocess() { } // Mark as not running - ais.running = false + ais.setRunning(false) ais.cmd = nil ais.logger.Info().Int("pid", pid).Msg("Audio input server subprocess monitoring stopped") diff --git a/internal/audio/input_test.go b/internal/audio/input_test.go index ec5e605..3f70689 100644 --- a/internal/audio/input_test.go +++ b/internal/audio/input_test.go @@ -181,12 +181,15 @@ func TestAudioInputManagerMultipleStartStop(t *testing.T) { func TestAudioInputMetrics(t *testing.T) { metrics := &AudioInputMetrics{ - FramesSent: 100, - FramesDropped: 5, - BytesProcessed: 1024, - ConnectionDrops: 2, - AverageLatency: time.Millisecond * 10, - LastFrameTime: time.Now(), + BaseAudioMetrics: BaseAudioMetrics{ + FramesProcessed: 100, + FramesDropped: 5, + BytesProcessed: 1024, + ConnectionDrops: 2, + AverageLatency: time.Millisecond * 10, + LastFrameTime: time.Now(), + }, + FramesSent: 100, } assert.Equal(t, int64(100), metrics.FramesSent) diff --git a/internal/audio/ipc.go b/internal/audio/ipc.go index bee7999..fd92164 100644 --- a/internal/audio/ipc.go +++ b/internal/audio/ipc.go @@ -23,6 +23,13 @@ var ( // Output IPC constants are now centralized in config_constants.go // outputMaxFrameSize, outputWriteTimeout, outputMaxDroppedFrames, outputHeaderSize, outputMessagePoolSize +// OutputIPCConfig represents configuration for audio output +type OutputIPCConfig struct { + SampleRate int + Channels int + FrameSize int +} + // OutputMessageType represents the type of IPC message type OutputMessageType uint8 @@ -106,7 +113,7 @@ func NewAudioOutputServer() (*AudioOutputServer, error) { // Initialize latency monitoring latencyConfig := DefaultLatencyConfig() - logger := zerolog.New(os.Stderr).With().Timestamp().Str("component", "audio-server").Logger() + logger := zerolog.New(os.Stderr).With().Timestamp().Str("component", AudioOutputServerComponent).Logger() latencyMonitor := NewLatencyMonitor(latencyConfig, logger) // Initialize adaptive buffer manager with default config @@ -160,7 +167,7 @@ func (s *AudioOutputServer) Start() error { // acceptConnections accepts incoming connections func (s *AudioOutputServer) acceptConnections() { - logger := logging.GetDefaultLogger().With().Str("component", "audio-server").Logger() + logger := logging.GetDefaultLogger().With().Str("component", AudioOutputServerComponent).Logger() for s.running { conn, err := s.listener.Accept() if err != nil { @@ -253,6 +260,14 @@ func (s *AudioOutputServer) Close() error { } func (s *AudioOutputServer) SendFrame(frame []byte) error { + // Comprehensive frame validation + if err := ValidateFrameData(frame); err != nil { + logger := logging.GetDefaultLogger().With().Str("component", AudioOutputServerComponent).Logger() + logger.Error().Err(err).Msg("Frame validation failed") + return fmt.Errorf("output frame validation failed: %w", err) + } + + // Additional output-specific size check maxFrameSize := GetConfig().OutputMaxFrameSize if len(frame) > maxFrameSize { return fmt.Errorf("output frame size validation failed: got %d bytes, maximum allowed %d bytes", len(frame), maxFrameSize) diff --git a/internal/audio/latency_monitor.go b/internal/audio/latency_monitor.go index f344488..a1c0355 100644 --- a/internal/audio/latency_monitor.go +++ b/internal/audio/latency_monitor.go @@ -94,6 +94,13 @@ func DefaultLatencyConfig() LatencyConfig { // NewLatencyMonitor creates a new latency monitoring system func NewLatencyMonitor(config LatencyConfig, logger zerolog.Logger) *LatencyMonitor { + // Validate latency configuration + if err := ValidateLatencyConfig(config); err != nil { + // Log validation error and use default configuration + logger.Error().Err(err).Msg("Invalid latency configuration provided, using defaults") + config = DefaultLatencyConfig() + } + ctx, cancel := context.WithCancel(context.Background()) return &LatencyMonitor{ diff --git a/internal/audio/output_ipc_manager.go b/internal/audio/output_ipc_manager.go new file mode 100644 index 0000000..2c64c91 --- /dev/null +++ b/internal/audio/output_ipc_manager.go @@ -0,0 +1,211 @@ +package audio + +import ( + "fmt" + "sync/atomic" + "time" + + "github.com/jetkvm/kvm/internal/logging" +) + +// AudioOutputIPCManager manages audio output using IPC when enabled +type AudioOutputIPCManager struct { + *BaseAudioManager + server *AudioOutputServer +} + +// NewAudioOutputIPCManager creates a new IPC-based audio output manager +func NewAudioOutputIPCManager() *AudioOutputIPCManager { + return &AudioOutputIPCManager{ + BaseAudioManager: NewBaseAudioManager(logging.GetDefaultLogger().With().Str("component", AudioOutputIPCComponent).Logger()), + } +} + +// Start initializes and starts the audio output IPC manager +func (aom *AudioOutputIPCManager) Start() error { + aom.logComponentStart(AudioOutputIPCComponent) + + // Create and start the IPC server + server, err := NewAudioOutputServer() + if err != nil { + aom.logComponentError(AudioOutputIPCComponent, err, "failed to create IPC server") + return err + } + + if err := server.Start(); err != nil { + aom.logComponentError(AudioOutputIPCComponent, err, "failed to start IPC server") + return err + } + + aom.server = server + aom.setRunning(true) + aom.logComponentStarted(AudioOutputIPCComponent) + + // Send initial configuration + config := OutputIPCConfig{ + SampleRate: GetConfig().SampleRate, + Channels: GetConfig().Channels, + FrameSize: int(GetConfig().AudioQualityMediumFrameSize.Milliseconds()), + } + + if err := aom.SendConfig(config); err != nil { + aom.logger.Warn().Err(err).Msg("Failed to send initial configuration") + } + + return nil +} + +// Stop gracefully shuts down the audio output IPC manager +func (aom *AudioOutputIPCManager) Stop() { + aom.logComponentStop(AudioOutputIPCComponent) + + if aom.server != nil { + aom.server.Stop() + aom.server = nil + } + + aom.setRunning(false) + aom.resetMetrics() + aom.logComponentStopped(AudioOutputIPCComponent) +} + +// resetMetrics resets all metrics to zero +func (aom *AudioOutputIPCManager) resetMetrics() { + aom.BaseAudioManager.resetMetrics() +} + +// WriteOpusFrame sends an Opus frame to the output server +func (aom *AudioOutputIPCManager) WriteOpusFrame(frame *ZeroCopyAudioFrame) error { + if !aom.IsRunning() { + return fmt.Errorf("audio output IPC manager not running") + } + + if aom.server == nil { + return fmt.Errorf("audio output server not initialized") + } + + // Validate frame before processing + if err := ValidateZeroCopyFrame(frame); err != nil { + aom.logComponentError(AudioOutputIPCComponent, err, "Frame validation failed") + return fmt.Errorf("output frame validation failed: %w", err) + } + + start := time.Now() + + // Send frame to IPC server + if err := aom.server.SendFrame(frame.Data()); err != nil { + aom.recordFrameDropped() + return err + } + + // Update metrics + processingTime := time.Since(start) + aom.recordFrameProcessed(frame.Length()) + aom.updateLatency(processingTime) + + return nil +} + +// WriteOpusFrameZeroCopy writes an Opus audio frame using zero-copy optimization +func (aom *AudioOutputIPCManager) WriteOpusFrameZeroCopy(frame *ZeroCopyAudioFrame) error { + if !aom.IsRunning() { + return fmt.Errorf("audio output IPC manager not running") + } + + if aom.server == nil { + return fmt.Errorf("audio output server not initialized") + } + + start := time.Now() + + // Extract frame data + frameData := frame.Data() + + // Send frame to IPC server (zero-copy not available, use regular send) + if err := aom.server.SendFrame(frameData); err != nil { + aom.recordFrameDropped() + return err + } + + // Update metrics + processingTime := time.Since(start) + aom.recordFrameProcessed(len(frameData)) + aom.updateLatency(processingTime) + + return nil +} + +// IsReady returns true if the IPC manager is ready to process frames +func (aom *AudioOutputIPCManager) IsReady() bool { + return aom.IsRunning() && aom.server != nil +} + +// GetMetrics returns current audio output metrics +func (aom *AudioOutputIPCManager) GetMetrics() AudioOutputMetrics { + baseMetrics := aom.getBaseMetrics() + return AudioOutputMetrics{ + FramesReceived: atomic.LoadInt64(&baseMetrics.FramesProcessed), // For output, processed = received + BaseAudioMetrics: baseMetrics, + } +} + +// GetDetailedMetrics returns detailed metrics including server statistics +func (aom *AudioOutputIPCManager) GetDetailedMetrics() (AudioOutputMetrics, map[string]interface{}) { + metrics := aom.GetMetrics() + detailed := make(map[string]interface{}) + + if aom.server != nil { + total, dropped, bufferSize := aom.server.GetServerStats() + detailed["server_total_frames"] = total + detailed["server_dropped_frames"] = dropped + detailed["server_buffer_size"] = bufferSize + detailed["server_frame_rate"] = aom.calculateFrameRate() + } + + return metrics, detailed +} + +// calculateFrameRate calculates the current frame processing rate +func (aom *AudioOutputIPCManager) calculateFrameRate() float64 { + baseMetrics := aom.getBaseMetrics() + framesProcessed := atomic.LoadInt64(&baseMetrics.FramesProcessed) + if framesProcessed == 0 { + return 0.0 + } + + // Calculate rate based on last frame time + baseMetrics = aom.getBaseMetrics() + if baseMetrics.LastFrameTime.IsZero() { + return 0.0 + } + + elapsed := time.Since(baseMetrics.LastFrameTime) + if elapsed.Seconds() == 0 { + return 0.0 + } + + return float64(framesProcessed) / elapsed.Seconds() +} + +// SendConfig sends configuration to the IPC server +func (aom *AudioOutputIPCManager) SendConfig(config OutputIPCConfig) error { + if aom.server == nil { + return fmt.Errorf("audio output server not initialized") + } + + // Validate configuration parameters + if err := ValidateOutputIPCConfig(config.SampleRate, config.Channels, config.FrameSize); err != nil { + aom.logger.Error().Err(err).Msg("Configuration validation failed") + return fmt.Errorf("output configuration validation failed: %w", err) + } + + // Note: AudioOutputServer doesn't have SendConfig method yet + // This is a placeholder for future implementation + aom.logger.Info().Interface("config", config).Msg("configuration received") + return nil +} + +// GetServer returns the underlying IPC server (for testing) +func (aom *AudioOutputIPCManager) GetServer() *AudioOutputServer { + return aom.server +} diff --git a/internal/audio/output_manager.go b/internal/audio/output_manager.go index 66140b6..fbf302d 100644 --- a/internal/audio/output_manager.go +++ b/internal/audio/output_manager.go @@ -2,60 +2,56 @@ package audio import ( "sync/atomic" - "time" "github.com/jetkvm/kvm/internal/logging" - "github.com/rs/zerolog" ) // AudioOutputManager manages audio output stream using IPC mode type AudioOutputManager struct { - metrics AudioOutputMetrics - - streamer *AudioOutputStreamer - logger zerolog.Logger - running int32 + *BaseAudioManager + streamer *AudioOutputStreamer + framesReceived int64 // Output-specific metric } // AudioOutputMetrics tracks output-specific metrics +// Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment) type AudioOutputMetrics struct { - FramesReceived int64 - FramesDropped int64 - BytesProcessed int64 - ConnectionDrops int64 - LastFrameTime time.Time - AverageLatency time.Duration + // Atomic int64 field first for proper ARM32 alignment + FramesReceived int64 `json:"frames_received"` // Total frames received (output-specific) + + // Embedded struct with atomic fields properly aligned + BaseAudioMetrics } // NewAudioOutputManager creates a new audio output manager func NewAudioOutputManager() *AudioOutputManager { + logger := logging.GetDefaultLogger().With().Str("component", AudioOutputManagerComponent).Logger() streamer, err := NewAudioOutputStreamer() if err != nil { // Log error but continue with nil streamer - will be handled gracefully - logger := logging.GetDefaultLogger().With().Str("component", AudioOutputManagerComponent).Logger() logger.Error().Err(err).Msg("Failed to create audio output streamer") } return &AudioOutputManager{ - streamer: streamer, - logger: logging.GetDefaultLogger().With().Str("component", AudioOutputManagerComponent).Logger(), + BaseAudioManager: NewBaseAudioManager(logger), + streamer: streamer, } } // Start starts the audio output manager func (aom *AudioOutputManager) Start() error { - if !atomic.CompareAndSwapInt32(&aom.running, 0, 1) { + if !aom.setRunning(true) { return nil // Already running } - aom.logger.Info().Str("component", AudioOutputManagerComponent).Msg("starting component") + aom.logComponentStart(AudioOutputManagerComponent) if aom.streamer == nil { // Try to recreate streamer if it was nil streamer, err := NewAudioOutputStreamer() if err != nil { - atomic.StoreInt32(&aom.running, 0) - aom.logger.Error().Err(err).Str("component", AudioOutputManagerComponent).Msg("failed to create audio output streamer") + aom.setRunning(false) + aom.logComponentError(AudioOutputManagerComponent, err, "failed to create audio output streamer") return err } aom.streamer = streamer @@ -63,44 +59,39 @@ func (aom *AudioOutputManager) Start() error { err := aom.streamer.Start() if err != nil { - atomic.StoreInt32(&aom.running, 0) + aom.setRunning(false) // Reset metrics on failed start aom.resetMetrics() - aom.logger.Error().Err(err).Str("component", AudioOutputManagerComponent).Msg("failed to start component") + aom.logComponentError(AudioOutputManagerComponent, err, "failed to start component") return err } - aom.logger.Info().Str("component", AudioOutputManagerComponent).Msg("component started successfully") + aom.logComponentStarted(AudioOutputManagerComponent) return nil } // Stop stops the audio output manager func (aom *AudioOutputManager) Stop() { - if !atomic.CompareAndSwapInt32(&aom.running, 1, 0) { + if !aom.setRunning(false) { return // Already stopped } - aom.logger.Info().Str("component", AudioOutputManagerComponent).Msg("stopping component") + aom.logComponentStop(AudioOutputManagerComponent) if aom.streamer != nil { aom.streamer.Stop() } - aom.logger.Info().Str("component", AudioOutputManagerComponent).Msg("component stopped") + aom.logComponentStopped(AudioOutputManagerComponent) } // resetMetrics resets all metrics to zero func (aom *AudioOutputManager) resetMetrics() { - atomic.StoreInt64(&aom.metrics.FramesReceived, 0) - atomic.StoreInt64(&aom.metrics.FramesDropped, 0) - atomic.StoreInt64(&aom.metrics.BytesProcessed, 0) - atomic.StoreInt64(&aom.metrics.ConnectionDrops, 0) + aom.BaseAudioManager.resetMetrics() + atomic.StoreInt64(&aom.framesReceived, 0) } -// IsRunning returns whether the audio output manager is running -func (aom *AudioOutputManager) IsRunning() bool { - return atomic.LoadInt32(&aom.running) == 1 -} +// Note: IsRunning() is inherited from BaseAudioManager // IsReady returns whether the audio output manager is ready to receive frames func (aom *AudioOutputManager) IsReady() bool { @@ -115,12 +106,8 @@ func (aom *AudioOutputManager) IsReady() bool { // GetMetrics returns current metrics func (aom *AudioOutputManager) GetMetrics() AudioOutputMetrics { return AudioOutputMetrics{ - FramesReceived: atomic.LoadInt64(&aom.metrics.FramesReceived), - FramesDropped: atomic.LoadInt64(&aom.metrics.FramesDropped), - BytesProcessed: atomic.LoadInt64(&aom.metrics.BytesProcessed), - ConnectionDrops: atomic.LoadInt64(&aom.metrics.ConnectionDrops), - AverageLatency: aom.metrics.AverageLatency, - LastFrameTime: aom.metrics.LastFrameTime, + FramesReceived: atomic.LoadInt64(&aom.framesReceived), + BaseAudioMetrics: aom.getBaseMetrics(), } } @@ -131,6 +118,7 @@ func (aom *AudioOutputManager) GetComprehensiveMetrics() map[string]interface{} comprehensiveMetrics := map[string]interface{}{ "manager": map[string]interface{}{ "frames_received": baseMetrics.FramesReceived, + "frames_processed": baseMetrics.FramesProcessed, "frames_dropped": baseMetrics.FramesDropped, "bytes_processed": baseMetrics.BytesProcessed, "connection_drops": baseMetrics.ConnectionDrops, diff --git a/internal/audio/output_streaming.go b/internal/audio/output_streaming.go index 190fdee..4057c53 100644 --- a/internal/audio/output_streaming.go +++ b/internal/audio/output_streaming.go @@ -391,6 +391,14 @@ func StartAudioOutputStreaming(send func([]byte)) error { frame := GetAudioFrameBuffer() frame = frame[:n] // Resize to actual frame size copy(frame, buffer[:n]) + + // Validate frame before sending + if err := ValidateAudioFrameFast(frame); err != nil { + getOutputStreamingLogger().Warn().Err(err).Msg("Frame validation failed, dropping frame") + PutAudioFrameBuffer(frame) + continue + } + send(frame) // Return buffer to pool after sending PutAudioFrameBuffer(frame) diff --git a/internal/audio/regression_test.go b/internal/audio/regression_test.go new file mode 100644 index 0000000..6d5380c --- /dev/null +++ b/internal/audio/regression_test.go @@ -0,0 +1,362 @@ +//go:build cgo +// +build cgo + +package audio + +import ( + "fmt" + "net" + "os" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// TestRegressionScenarios tests critical edge cases and error conditions +// that could cause system instability in production +func TestRegressionScenarios(t *testing.T) { + tests := []struct { + name string + testFunc func(t *testing.T) + description string + }{ + { + name: "IPCConnectionFailure", + testFunc: testIPCConnectionFailureRecovery, + description: "Test IPC connection failure and recovery scenarios", + }, + { + name: "BufferOverflow", + testFunc: testBufferOverflowHandling, + description: "Test buffer overflow protection and recovery", + }, + { + name: "SupervisorRapidRestart", + testFunc: testSupervisorRapidRestartScenario, + description: "Test supervisor behavior under rapid restart conditions", + }, + { + name: "ConcurrentStartStop", + testFunc: testConcurrentStartStopOperations, + description: "Test concurrent start/stop operations for race conditions", + }, + { + name: "MemoryLeakPrevention", + testFunc: testMemoryLeakPrevention, + description: "Test memory leak prevention in long-running scenarios", + }, + { + name: "ConfigValidationEdgeCases", + testFunc: testConfigValidationEdgeCases, + description: "Test configuration validation with edge case values", + }, + { + name: "AtomicOperationConsistency", + testFunc: testAtomicOperationConsistency, + description: "Test atomic operations consistency under high concurrency", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Logf("Running regression test: %s - %s", tt.name, tt.description) + tt.testFunc(t) + }) + } +} + +// testIPCConnectionFailureRecovery tests IPC connection failure scenarios +func testIPCConnectionFailureRecovery(t *testing.T) { + manager := NewAudioInputIPCManager() + require.NotNil(t, manager) + + // Test start with no IPC server available (should handle gracefully) + err := manager.Start() + // Should not panic or crash, may return error depending on implementation + if err != nil { + t.Logf("Expected error when no IPC server available: %v", err) + } + + // Test that manager can recover after IPC becomes available + if manager.IsRunning() { + manager.Stop() + } + + // Verify clean state after failure + assert.False(t, manager.IsRunning()) + assert.False(t, manager.IsReady()) +} + +// testBufferOverflowHandling tests buffer overflow protection +func testBufferOverflowHandling(t *testing.T) { + // Test with extremely large buffer sizes + extremelyLargeSize := 1024 * 1024 * 100 // 100MB + err := ValidateBufferSize(extremelyLargeSize) + assert.Error(t, err, "Should reject extremely large buffer sizes") + + // Test with negative buffer sizes + err = ValidateBufferSize(-1) + assert.Error(t, err, "Should reject negative buffer sizes") + + // Test with zero buffer size + err = ValidateBufferSize(0) + assert.Error(t, err, "Should reject zero buffer size") + + // Test with maximum valid buffer size + maxValidSize := GetConfig().SocketMaxBuffer + err = ValidateBufferSize(int(maxValidSize)) + assert.NoError(t, err, "Should accept maximum valid buffer size") +} + +// testSupervisorRapidRestartScenario tests supervisor under rapid restart conditions +func testSupervisorRapidRestartScenario(t *testing.T) { + if testing.Short() { + t.Skip("Skipping rapid restart test in short mode") + } + + supervisor := NewAudioOutputSupervisor() + require.NotNil(t, supervisor) + + // Perform rapid start/stop cycles to test for race conditions + for i := 0; i < 10; i++ { + err := supervisor.Start() + if err != nil { + t.Logf("Start attempt %d failed (expected in test environment): %v", i, err) + } + + // Very short delay to stress test + time.Sleep(10 * time.Millisecond) + + supervisor.Stop() + time.Sleep(10 * time.Millisecond) + } + + // Verify supervisor is in clean state after rapid cycling + assert.False(t, supervisor.IsRunning()) +} + +// testConcurrentStartStopOperations tests concurrent operations for race conditions +func testConcurrentStartStopOperations(t *testing.T) { + manager := NewAudioInputIPCManager() + require.NotNil(t, manager) + + var wg sync.WaitGroup + const numGoroutines = 10 + + // Launch multiple goroutines trying to start/stop concurrently + for i := 0; i < numGoroutines; i++ { + wg.Add(2) + + // Start goroutine + go func(id int) { + defer wg.Done() + err := manager.Start() + if err != nil { + t.Logf("Concurrent start %d: %v", id, err) + } + }(i) + + // Stop goroutine + go func(id int) { + defer wg.Done() + time.Sleep(5 * time.Millisecond) // Small delay + manager.Stop() + }(i) + } + + wg.Wait() + + // Ensure final state is consistent + manager.Stop() // Final cleanup + assert.False(t, manager.IsRunning()) +} + +// testMemoryLeakPrevention tests for memory leaks in long-running scenarios +func testMemoryLeakPrevention(t *testing.T) { + if testing.Short() { + t.Skip("Skipping memory leak test in short mode") + } + + manager := NewAudioInputIPCManager() + require.NotNil(t, manager) + + // Simulate long-running operation with periodic restarts + for cycle := 0; cycle < 5; cycle++ { + err := manager.Start() + if err != nil { + t.Logf("Start cycle %d failed (expected): %v", cycle, err) + } + + // Simulate some activity + time.Sleep(100 * time.Millisecond) + + // Get metrics to ensure they're not accumulating indefinitely + metrics := manager.GetMetrics() + assert.NotNil(t, metrics, "Metrics should be available") + + manager.Stop() + time.Sleep(50 * time.Millisecond) + } + + // Final verification + assert.False(t, manager.IsRunning()) +} + +// testConfigValidationEdgeCases tests configuration validation with edge cases +func testConfigValidationEdgeCases(t *testing.T) { + // Test sample rate edge cases + testCases := []struct { + sampleRate int + channels int + frameSize int + shouldPass bool + description string + }{ + {0, 2, 960, false, "zero sample rate"}, + {-1, 2, 960, false, "negative sample rate"}, + {1, 2, 960, false, "extremely low sample rate"}, + {999999, 2, 960, false, "extremely high sample rate"}, + {48000, 0, 960, false, "zero channels"}, + {48000, -1, 960, false, "negative channels"}, + {48000, 100, 960, false, "too many channels"}, + {48000, 2, 0, false, "zero frame size"}, + {48000, 2, -1, false, "negative frame size"}, + {48000, 2, 999999, true, "extremely large frame size"}, + {48000, 2, 960, true, "valid configuration"}, + {44100, 1, 441, true, "valid mono configuration"}, + } + + for _, tc := range testCases { + t.Run(tc.description, func(t *testing.T) { + err := ValidateInputIPCConfig(tc.sampleRate, tc.channels, tc.frameSize) + if tc.shouldPass { + assert.NoError(t, err, "Should accept valid config: %s", tc.description) + } else { + assert.Error(t, err, "Should reject invalid config: %s", tc.description) + } + }) + } +} + +// testAtomicOperationConsistency tests atomic operations under high concurrency +func testAtomicOperationConsistency(t *testing.T) { + var counter int64 + var wg sync.WaitGroup + const numGoroutines = 100 + const incrementsPerGoroutine = 1000 + + // Launch multiple goroutines performing atomic operations + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < incrementsPerGoroutine; j++ { + atomic.AddInt64(&counter, 1) + } + }() + } + + wg.Wait() + + // Verify final count is correct + expected := int64(numGoroutines * incrementsPerGoroutine) + actual := atomic.LoadInt64(&counter) + assert.Equal(t, expected, actual, "Atomic operations should be consistent") +} + +// TestErrorRecoveryScenarios tests various error recovery scenarios +func TestErrorRecoveryScenarios(t *testing.T) { + tests := []struct { + name string + testFunc func(t *testing.T) + }{ + {"NetworkConnectionLoss", testNetworkConnectionLossRecovery}, + {"ProcessCrashRecovery", testProcessCrashRecovery}, + {"ResourceExhaustionRecovery", testResourceExhaustionRecovery}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.testFunc(t) + }) + } +} + +// testNetworkConnectionLossRecovery tests recovery from network connection loss +func testNetworkConnectionLossRecovery(t *testing.T) { + // Create a temporary socket that we can close to simulate connection loss + tempDir := t.TempDir() + socketPath := fmt.Sprintf("%s/test_recovery.sock", tempDir) + + // Create and immediately close a socket to test connection failure + listener, err := net.Listen("unix", socketPath) + if err != nil { + t.Skipf("Cannot create test socket: %v", err) + } + listener.Close() // Close immediately to simulate connection loss + + // Remove socket file to ensure connection will fail + os.Remove(socketPath) + + // Test that components handle connection loss gracefully + manager := NewAudioInputIPCManager() + require.NotNil(t, manager) + + // This should handle the connection failure gracefully + err = manager.Start() + if err != nil { + t.Logf("Expected connection failure handled: %v", err) + } + + // Cleanup + manager.Stop() +} + +// testProcessCrashRecovery tests recovery from process crashes +func testProcessCrashRecovery(t *testing.T) { + if testing.Short() { + t.Skip("Skipping process crash test in short mode") + } + + supervisor := NewAudioOutputSupervisor() + require.NotNil(t, supervisor) + + // Start supervisor (will likely fail in test environment, but should handle gracefully) + err := supervisor.Start() + if err != nil { + t.Logf("Supervisor start failed as expected in test environment: %v", err) + } + + // Verify supervisor can be stopped cleanly even after start failure + supervisor.Stop() + assert.False(t, supervisor.IsRunning()) +} + +// testResourceExhaustionRecovery tests recovery from resource exhaustion +func testResourceExhaustionRecovery(t *testing.T) { + // Test with resource constraints + manager := NewAudioInputIPCManager() + require.NotNil(t, manager) + + // Simulate resource exhaustion by rapid start/stop cycles + for i := 0; i < 20; i++ { + err := manager.Start() + if err != nil { + t.Logf("Resource exhaustion cycle %d: %v", i, err) + } + manager.Stop() + // No delay to stress test resource management + } + + // Verify system can still function after resource stress + err := manager.Start() + if err != nil { + t.Logf("Final start after resource stress: %v", err) + } + manager.Stop() + assert.False(t, manager.IsRunning()) +} \ No newline at end of file diff --git a/internal/audio/relay.go b/internal/audio/relay.go index 077a8ca..cd6ef6e 100644 --- a/internal/audio/relay.go +++ b/internal/audio/relay.go @@ -170,6 +170,13 @@ func (r *AudioRelay) relayLoop() { // forwardToWebRTC forwards a frame to the WebRTC audio track func (r *AudioRelay) forwardToWebRTC(frame []byte) error { + // Validate frame data before processing + if err := ValidateFrameData(frame); err != nil { + r.incrementDropped() + r.logger.Debug().Err(err).Msg("Invalid frame data in relay") + return err + } + r.mutex.RLock() defer r.mutex.RUnlock() diff --git a/internal/audio/supervisor.go b/internal/audio/supervisor.go index 1a9fc36..40bb9e5 100644 --- a/internal/audio/supervisor.go +++ b/internal/audio/supervisor.go @@ -4,17 +4,12 @@ package audio import ( - "context" "fmt" "os" "os/exec" - "sync" "sync/atomic" "syscall" "time" - - "github.com/jetkvm/kvm/internal/logging" - "github.com/rs/zerolog" ) // Restart configuration is now retrieved from centralized config @@ -36,30 +31,17 @@ func getMaxRestartDelay() time.Duration { // AudioOutputSupervisor manages the audio output server subprocess lifecycle type AudioOutputSupervisor struct { - ctx context.Context - cancel context.CancelFunc - logger *zerolog.Logger - mutex sync.RWMutex - running int32 - - // Process management - cmd *exec.Cmd - processPID int + *BaseSupervisor // Restart management restartAttempts []time.Time - lastExitCode int - lastExitTime time.Time - // Channels for coordination - processDone chan struct{} + // Channel management stopChan chan struct{} + processDone chan struct{} stopChanClosed bool // Track if stopChan is closed processDoneClosed bool // Track if processDone is closed - // Process monitoring - processMonitor *ProcessMonitor - // Callbacks onProcessStart func(pid int) onProcessExit func(pid int, exitCode int, crashed bool) @@ -68,16 +50,11 @@ type AudioOutputSupervisor struct { // NewAudioOutputSupervisor creates a new audio output server supervisor func NewAudioOutputSupervisor() *AudioOutputSupervisor { - ctx, cancel := context.WithCancel(context.Background()) - logger := logging.GetDefaultLogger().With().Str("component", AudioOutputSupervisorComponent).Logger() - return &AudioOutputSupervisor{ - ctx: ctx, - cancel: cancel, - logger: &logger, - processDone: make(chan struct{}), - stopChan: make(chan struct{}), - processMonitor: GetProcessMonitor(), + BaseSupervisor: NewBaseSupervisor("audio-output-supervisor"), + restartAttempts: make([]time.Time, 0), + stopChan: make(chan struct{}), + processDone: make(chan struct{}), } } @@ -101,7 +78,8 @@ func (s *AudioOutputSupervisor) Start() error { return fmt.Errorf("audio output supervisor is already running") } - s.logger.Info().Str("component", AudioOutputSupervisorComponent).Msg("starting component") + s.logSupervisorStart() + s.createContext() // Recreate channels in case they were closed by a previous Stop() call s.mutex.Lock() @@ -109,12 +87,8 @@ func (s *AudioOutputSupervisor) Start() error { s.stopChan = make(chan struct{}) s.stopChanClosed = false // Reset channel closed flag s.processDoneClosed = false // Reset channel closed flag - // Recreate context as well since it might have been cancelled - s.ctx, s.cancel = context.WithCancel(context.Background()) // Reset restart tracking on start s.restartAttempts = s.restartAttempts[:0] - s.lastExitCode = 0 - s.lastExitTime = time.Time{} s.mutex.Unlock() // Start the supervision loop @@ -130,7 +104,7 @@ func (s *AudioOutputSupervisor) Stop() { return // Already stopped } - s.logger.Info().Str("component", AudioOutputSupervisorComponent).Msg("stopping component") + s.logSupervisorStop() // Signal stop and wait for cleanup s.mutex.Lock() @@ -139,7 +113,7 @@ func (s *AudioOutputSupervisor) Stop() { s.stopChanClosed = true } s.mutex.Unlock() - s.cancel() + s.cancelContext() // Wait for process to exit select { @@ -153,61 +127,11 @@ func (s *AudioOutputSupervisor) Stop() { s.logger.Info().Str("component", AudioOutputSupervisorComponent).Msg("component stopped") } -// IsRunning returns true if the supervisor is running -func (s *AudioOutputSupervisor) IsRunning() bool { - return atomic.LoadInt32(&s.running) == 1 -} - -// GetProcessPID returns the current process PID (0 if not running) -func (s *AudioOutputSupervisor) GetProcessPID() int { - s.mutex.RLock() - defer s.mutex.RUnlock() - return s.processPID -} - -// GetLastExitInfo returns information about the last process exit -func (s *AudioOutputSupervisor) GetLastExitInfo() (exitCode int, exitTime time.Time) { - s.mutex.RLock() - defer s.mutex.RUnlock() - return s.lastExitCode, s.lastExitTime -} - -// GetProcessMetrics returns current process metrics if the process is running +// GetProcessMetrics returns current process metrics with audio-output-server name func (s *AudioOutputSupervisor) GetProcessMetrics() *ProcessMetrics { - s.mutex.RLock() - pid := s.processPID - s.mutex.RUnlock() - - if pid == 0 { - // Return default metrics when no process is running - return &ProcessMetrics{ - PID: 0, - CPUPercent: 0.0, - MemoryRSS: 0, - MemoryVMS: 0, - MemoryPercent: 0.0, - Timestamp: time.Now(), - ProcessName: "audio-output-server", - } - } - - metrics := s.processMonitor.GetCurrentMetrics() - for _, metric := range metrics { - if metric.PID == pid { - return &metric - } - } - - // Return default metrics if process not found in monitor - return &ProcessMetrics{ - PID: pid, - CPUPercent: 0.0, - MemoryRSS: 0, - MemoryVMS: 0, - MemoryPercent: 0.0, - Timestamp: time.Now(), - ProcessName: "audio-output-server", - } + metrics := s.BaseSupervisor.GetProcessMetrics() + metrics.ProcessName = "audio-output-server" + return metrics } // supervisionLoop is the main supervision loop diff --git a/internal/audio/validation.go b/internal/audio/validation.go index 0b439c9..1301ab8 100644 --- a/internal/audio/validation.go +++ b/internal/audio/validation.go @@ -35,11 +35,8 @@ func ValidateFrameData(data []byte) error { if len(data) == 0 { return ErrInvalidFrameData } - // Use a reasonable default if config is not available - maxFrameSize := 4096 - if config := GetConfig(); config != nil { - maxFrameSize = config.MaxAudioFrameSize - } + // Use config value or fallback to default + maxFrameSize := GetConfig().MaxAudioFrameSize if len(data) > maxFrameSize { return ErrInvalidFrameSize } @@ -55,11 +52,8 @@ func ValidateZeroCopyFrame(frame *ZeroCopyAudioFrame) error { if len(data) == 0 { return ErrInvalidFrameData } - // Use a reasonable default if config is not available - maxFrameSize := 4096 - if config := GetConfig(); config != nil { - maxFrameSize = config.MaxAudioFrameSize - } + // Use config value + maxFrameSize := GetConfig().MaxAudioFrameSize if len(data) > maxFrameSize { return ErrInvalidFrameSize } @@ -71,11 +65,8 @@ func ValidateBufferSize(size int) error { if size <= 0 { return ErrInvalidBufferSize } - // Use a reasonable default if config is not available - maxBuffer := 262144 // 256KB default - if config := GetConfig(); config != nil { - maxBuffer = config.SocketMaxBuffer - } + // Use config value + maxBuffer := GetConfig().SocketMaxBuffer if size > maxBuffer { return ErrInvalidBufferSize } @@ -102,11 +93,8 @@ func ValidateLatency(latency time.Duration) error { if latency < 0 { return ErrInvalidLatency } - // Use a reasonable default if config is not available - maxLatency := 500 * time.Millisecond - if config := GetConfig(); config != nil { - maxLatency = config.MaxLatency - } + // Use config value + maxLatency := GetConfig().MaxLatency if latency > maxLatency { return ErrInvalidLatency } @@ -115,13 +103,10 @@ func ValidateLatency(latency time.Duration) error { // ValidateMetricsInterval validates metrics update interval func ValidateMetricsInterval(interval time.Duration) error { - // Use reasonable defaults if config is not available - minInterval := 100 * time.Millisecond - maxInterval := 10 * time.Second - if config := GetConfig(); config != nil { - minInterval = config.MinMetricsUpdateInterval - maxInterval = config.MaxMetricsUpdateInterval - } + // Use config values + config := GetConfig() + minInterval := config.MinMetricsUpdateInterval + maxInterval := config.MaxMetricsUpdateInterval if interval < minInterval { return ErrInvalidMetricsInterval } @@ -143,10 +128,7 @@ func ValidateAdaptiveBufferConfig(minSize, maxSize, defaultSize int) error { return ErrInvalidBufferSize } // Validate against global limits - maxBuffer := 262144 // 256KB default - if config := GetConfig(); config != nil { - maxBuffer = config.SocketMaxBuffer - } + maxBuffer := GetConfig().SocketMaxBuffer if maxSize > maxBuffer { return ErrInvalidBufferSize } @@ -155,15 +137,11 @@ func ValidateAdaptiveBufferConfig(minSize, maxSize, defaultSize int) error { // ValidateInputIPCConfig validates input IPC configuration func ValidateInputIPCConfig(sampleRate, channels, frameSize int) error { - // Use reasonable defaults if config is not available - minSampleRate := 8000 - maxSampleRate := 48000 - maxChannels := 8 - if config := GetConfig(); config != nil { - minSampleRate = config.MinSampleRate - maxSampleRate = config.MaxSampleRate - maxChannels = config.MaxChannels - } + // Use config values + config := GetConfig() + minSampleRate := config.MinSampleRate + maxSampleRate := config.MaxSampleRate + maxChannels := config.MaxChannels if sampleRate < minSampleRate || sampleRate > maxSampleRate { return ErrInvalidSampleRate } @@ -175,3 +153,48 @@ func ValidateInputIPCConfig(sampleRate, channels, frameSize int) error { } return nil } + +// ValidateOutputIPCConfig validates output IPC configuration +func ValidateOutputIPCConfig(sampleRate, channels, frameSize int) error { + // Use config values + config := GetConfig() + minSampleRate := config.MinSampleRate + maxSampleRate := config.MaxSampleRate + maxChannels := config.MaxChannels + if sampleRate < minSampleRate || sampleRate > maxSampleRate { + return ErrInvalidSampleRate + } + if channels < 1 || channels > maxChannels { + return ErrInvalidChannels + } + if frameSize <= 0 { + return ErrInvalidFrameSize + } + return nil +} + +// ValidateLatencyConfig validates latency monitor configuration +func ValidateLatencyConfig(config LatencyConfig) error { + if err := ValidateLatency(config.TargetLatency); err != nil { + return err + } + if err := ValidateLatency(config.MaxLatency); err != nil { + return err + } + if config.TargetLatency >= config.MaxLatency { + return ErrInvalidLatency + } + if err := ValidateMetricsInterval(config.OptimizationInterval); err != nil { + return err + } + if config.HistorySize <= 0 { + return ErrInvalidBufferSize + } + if config.JitterThreshold < 0 { + return ErrInvalidLatency + } + if config.AdaptiveThreshold < 0 || config.AdaptiveThreshold > 1.0 { + return ErrInvalidConfiguration + } + return nil +} diff --git a/internal/audio/validation_enhanced.go b/internal/audio/validation_enhanced.go index 22861a0..df3b3a4 100644 --- a/internal/audio/validation_enhanced.go +++ b/internal/audio/validation_enhanced.go @@ -43,12 +43,13 @@ type ValidationConfig struct { // GetValidationConfig returns the current validation configuration func GetValidationConfig() ValidationConfig { + configConstants := GetConfig() return ValidationConfig{ Level: ValidationStandard, EnableRangeChecks: true, EnableAlignmentCheck: true, - EnableDataIntegrity: false, // Disabled by default for performance - MaxValidationTime: 5 * time.Second, // Default validation timeout + EnableDataIntegrity: false, // Disabled by default for performance + MaxValidationTime: configConstants.MaxValidationTime, // Configurable validation timeout } } @@ -88,16 +89,14 @@ func ValidateAudioFrameComprehensive(data []byte, expectedSampleRate int, expect // Range validation if validationConfig.EnableRangeChecks { config := GetConfig() - minFrameSize := 64 // Minimum reasonable frame size - if len(data) < minFrameSize { - return fmt.Errorf("%w: frame size %d below minimum %d", ErrInvalidFrameSize, len(data), minFrameSize) + if len(data) < config.MinFrameSize { + return fmt.Errorf("%w: frame size %d below minimum %d", ErrInvalidFrameSize, len(data), config.MinFrameSize) } // Validate frame length matches expected sample format expectedFrameSize := (expectedSampleRate * expectedChannels * 2) / 1000 * int(config.AudioQualityMediumFrameSize/time.Millisecond) - tolerance := 512 // Frame size tolerance in bytes - if abs(len(data)-expectedFrameSize) > tolerance { - return fmt.Errorf("%w: frame size %d doesn't match expected %d (±%d)", ErrInvalidFrameLength, len(data), expectedFrameSize, tolerance) + if abs(len(data)-expectedFrameSize) > config.FrameSizeTolerance { + return fmt.Errorf("%w: frame size %d doesn't match expected %d (±%d)", ErrInvalidFrameLength, len(data), expectedFrameSize, config.FrameSizeTolerance) } } @@ -184,14 +183,12 @@ func ValidateAudioConfiguration(config AudioConfig) error { configConstants := GetConfig() // Validate bitrate ranges - minBitrate := 6000 // Minimum Opus bitrate - maxBitrate := 510000 // Maximum Opus bitrate - if config.Bitrate < minBitrate || config.Bitrate > maxBitrate { - return fmt.Errorf("%w: bitrate %d outside valid range [%d, %d]", ErrInvalidConfiguration, config.Bitrate, minBitrate, maxBitrate) + if config.Bitrate < configConstants.MinOpusBitrate || config.Bitrate > configConstants.MaxOpusBitrate { + return fmt.Errorf("%w: bitrate %d outside valid range [%d, %d]", ErrInvalidConfiguration, config.Bitrate, configConstants.MinOpusBitrate, configConstants.MaxOpusBitrate) } // Validate sample rate - validSampleRates := []int{8000, 12000, 16000, 24000, 48000} + validSampleRates := configConstants.ValidSampleRates validSampleRate := false for _, rate := range validSampleRates { if config.SampleRate == rate { @@ -209,8 +206,8 @@ func ValidateAudioConfiguration(config AudioConfig) error { } // Validate frame size - minFrameSize := 10 * time.Millisecond // Minimum frame duration - maxFrameSize := 100 * time.Millisecond // Maximum frame duration + minFrameSize := GetConfig().MinFrameDuration + maxFrameSize := GetConfig().MaxFrameDuration if config.FrameSize < minFrameSize || config.FrameSize > maxFrameSize { return fmt.Errorf("%w: frame size %v outside valid range [%v, %v]", ErrInvalidConfiguration, config.FrameSize, minFrameSize, maxFrameSize) } @@ -218,6 +215,101 @@ func ValidateAudioConfiguration(config AudioConfig) error { return nil } +// ValidateAudioConfigConstants performs comprehensive validation of AudioConfigConstants +func ValidateAudioConfigConstants(config *AudioConfigConstants) error { + if config == nil { + return fmt.Errorf("%w: configuration is nil", ErrInvalidConfiguration) + } + + // Validate basic audio parameters + if config.MaxAudioFrameSize <= 0 { + return fmt.Errorf("%w: MaxAudioFrameSize must be positive", ErrInvalidConfiguration) + } + if config.SampleRate <= 0 { + return fmt.Errorf("%w: SampleRate must be positive", ErrInvalidSampleRate) + } + if config.Channels <= 0 || config.Channels > 8 { + return fmt.Errorf("%w: Channels must be between 1 and 8", ErrInvalidChannels) + } + + // Validate Opus parameters + if config.OpusBitrate < 6000 || config.OpusBitrate > 510000 { + return fmt.Errorf("%w: OpusBitrate must be between 6000 and 510000", ErrInvalidConfiguration) + } + if config.OpusComplexity < 0 || config.OpusComplexity > 10 { + return fmt.Errorf("%w: OpusComplexity must be between 0 and 10", ErrInvalidConfiguration) + } + + // Validate bitrate ranges + if config.MinOpusBitrate <= 0 || config.MaxOpusBitrate <= 0 { + return fmt.Errorf("%w: MinOpusBitrate and MaxOpusBitrate must be positive", ErrInvalidConfiguration) + } + if config.MinOpusBitrate >= config.MaxOpusBitrate { + return fmt.Errorf("%w: MinOpusBitrate must be less than MaxOpusBitrate", ErrInvalidConfiguration) + } + + // Validate sample rate ranges + if config.MinSampleRate <= 0 || config.MaxSampleRate <= 0 { + return fmt.Errorf("%w: MinSampleRate and MaxSampleRate must be positive", ErrInvalidSampleRate) + } + if config.MinSampleRate >= config.MaxSampleRate { + return fmt.Errorf("%w: MinSampleRate must be less than MaxSampleRate", ErrInvalidSampleRate) + } + + // Validate frame duration ranges + if config.MinFrameDuration <= 0 || config.MaxFrameDuration <= 0 { + return fmt.Errorf("%w: MinFrameDuration and MaxFrameDuration must be positive", ErrInvalidConfiguration) + } + if config.MinFrameDuration >= config.MaxFrameDuration { + return fmt.Errorf("%w: MinFrameDuration must be less than MaxFrameDuration", ErrInvalidConfiguration) + } + + // Validate buffer sizes + if config.SocketMinBuffer <= 0 || config.SocketMaxBuffer <= 0 { + return fmt.Errorf("%w: SocketMinBuffer and SocketMaxBuffer must be positive", ErrInvalidBufferSize) + } + if config.SocketMinBuffer >= config.SocketMaxBuffer { + return fmt.Errorf("%w: SocketMinBuffer must be less than SocketMaxBuffer", ErrInvalidBufferSize) + } + + // Validate priority ranges + if config.MinNiceValue < -20 || config.MinNiceValue > 19 { + return fmt.Errorf("%w: MinNiceValue must be between -20 and 19", ErrInvalidPriority) + } + if config.MaxNiceValue < -20 || config.MaxNiceValue > 19 { + return fmt.Errorf("%w: MaxNiceValue must be between -20 and 19", ErrInvalidPriority) + } + if config.MinNiceValue >= config.MaxNiceValue { + return fmt.Errorf("%w: MinNiceValue must be less than MaxNiceValue", ErrInvalidPriority) + } + + // Validate timeout values + if config.MaxValidationTime <= 0 { + return fmt.Errorf("%w: MaxValidationTime must be positive", ErrInvalidConfiguration) + } + if config.RestartDelay <= 0 || config.MaxRestartDelay <= 0 { + return fmt.Errorf("%w: RestartDelay and MaxRestartDelay must be positive", ErrInvalidConfiguration) + } + if config.RestartDelay >= config.MaxRestartDelay { + return fmt.Errorf("%w: RestartDelay must be less than MaxRestartDelay", ErrInvalidConfiguration) + } + + // Validate valid sample rates array + if len(config.ValidSampleRates) == 0 { + return fmt.Errorf("%w: ValidSampleRates cannot be empty", ErrInvalidSampleRate) + } + for _, rate := range config.ValidSampleRates { + if rate <= 0 { + return fmt.Errorf("%w: all ValidSampleRates must be positive", ErrInvalidSampleRate) + } + if rate < config.MinSampleRate || rate > config.MaxSampleRate { + return fmt.Errorf("%w: ValidSampleRate %d outside range [%d, %d]", ErrInvalidSampleRate, rate, config.MinSampleRate, config.MaxSampleRate) + } + } + + return nil +} + // ValidateResourceLimits checks if system resources are within acceptable limits func ValidateResourceLimits() error { config := GetConfig()