feat(audio): add comprehensive input validation and base components

refactor(audio): restructure metrics and supervisor components into base implementations
feat(audio): add validation for all audio configurations and frames
fix(audio): fix atomic alignment in metrics structures
refactor(audio): consolidate common functionality into base manager and supervisor
feat(audio): add output IPC manager and config validation
This commit is contained in:
Alex P 2025-08-27 17:47:39 +00:00
parent 8fb0b9f9c6
commit dc2db8ed2d
24 changed files with 1729 additions and 337 deletions

View File

@ -105,13 +105,20 @@ type AdaptiveBufferManager struct {
// NewAdaptiveBufferManager creates a new adaptive buffer manager
func NewAdaptiveBufferManager(config AdaptiveBufferConfig) *AdaptiveBufferManager {
logger := logging.GetDefaultLogger().With().Str("component", "adaptive-buffer").Logger()
if err := ValidateAdaptiveBufferConfig(config.MinBufferSize, config.MaxBufferSize, config.DefaultBufferSize); err != nil {
logger.Error().Err(err).Msg("Invalid adaptive buffer config, using defaults")
config = DefaultAdaptiveBufferConfig()
}
ctx, cancel := context.WithCancel(context.Background())
return &AdaptiveBufferManager{
currentInputBufferSize: int64(config.DefaultBufferSize),
currentOutputBufferSize: int64(config.DefaultBufferSize),
config: config,
logger: logging.GetDefaultLogger().With().Str("component", "adaptive-buffer").Logger(),
logger: logger,
processMonitor: GetProcessMonitor(),
ctx: ctx,
cancel: cancel,

View File

@ -100,6 +100,8 @@ import (
"errors"
"sync/atomic"
"time"
"github.com/jetkvm/kvm/internal/logging"
)
var (
@ -190,13 +192,14 @@ var qualityPresets = map[AudioQuality]struct {
func GetAudioQualityPresets() map[AudioQuality]AudioConfig {
result := make(map[AudioQuality]AudioConfig)
for quality, preset := range qualityPresets {
result[quality] = AudioConfig{
config := AudioConfig{
Quality: quality,
Bitrate: preset.outputBitrate,
SampleRate: preset.sampleRate,
Channels: preset.channels,
FrameSize: preset.frameSize,
}
result[quality] = config
}
return result
}
@ -205,7 +208,7 @@ func GetAudioQualityPresets() map[AudioQuality]AudioConfig {
func GetMicrophoneQualityPresets() map[AudioQuality]AudioConfig {
result := make(map[AudioQuality]AudioConfig)
for quality, preset := range qualityPresets {
result[quality] = AudioConfig{
config := AudioConfig{
Quality: quality,
Bitrate: preset.inputBitrate,
SampleRate: func() int {
@ -217,12 +220,21 @@ func GetMicrophoneQualityPresets() map[AudioQuality]AudioConfig {
Channels: 1, // Microphone is always mono
FrameSize: preset.frameSize,
}
result[quality] = config
}
return result
}
// SetAudioQuality updates the current audio quality configuration
func SetAudioQuality(quality AudioQuality) {
// Validate audio quality parameter
if err := ValidateAudioQuality(quality); err != nil {
// Log validation error but don't fail - maintain backward compatibility
logger := logging.GetDefaultLogger().With().Str("component", "AudioConfig").Logger()
logger.Error().Err(err).Int("quality", int(quality)).Msg("Invalid audio quality provided, ignoring")
return
}
presets := GetAudioQualityPresets()
if config, exists := presets[quality]; exists {
currentConfig = config
@ -236,6 +248,14 @@ func GetAudioConfig() AudioConfig {
// SetMicrophoneQuality updates the current microphone quality configuration
func SetMicrophoneQuality(quality AudioQuality) {
// Validate audio quality parameter
if err := ValidateAudioQuality(quality); err != nil {
// Log validation error but don't fail - maintain backward compatibility
logger := logging.GetDefaultLogger().With().Str("component", "MicrophoneConfig").Logger()
logger.Error().Err(err).Int("quality", int(quality)).Msg("Invalid microphone quality provided, ignoring")
return
}
presets := GetMicrophoneQualityPresets()
if config, exists := presets[quality]; exists {
currentMicrophoneConfig = config

View File

@ -0,0 +1,317 @@
//go:build cgo
// +build cgo
package audio
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestAudioQualityEdgeCases tests edge cases for audio quality functions
// These tests ensure the recent validation removal doesn't introduce regressions
func TestAudioQualityEdgeCases(t *testing.T) {
tests := []struct {
name string
testFunc func(t *testing.T)
}{
{"AudioQualityBoundaryValues", testAudioQualityBoundaryValues},
{"MicrophoneQualityBoundaryValues", testMicrophoneQualityBoundaryValues},
{"AudioQualityPresetsConsistency", testAudioQualityPresetsConsistency},
{"MicrophoneQualityPresetsConsistency", testMicrophoneQualityPresetsConsistency},
{"QualitySettingsThreadSafety", testQualitySettingsThreadSafety},
{"QualityPresetsImmutability", testQualityPresetsImmutability},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.testFunc(t)
})
}
}
// testAudioQualityBoundaryValues tests boundary values for audio quality
func testAudioQualityBoundaryValues(t *testing.T) {
// Test minimum valid quality (0)
originalConfig := GetAudioConfig()
SetAudioQuality(AudioQualityLow)
assert.Equal(t, AudioQualityLow, GetAudioConfig().Quality, "Should accept minimum quality value")
// Test maximum valid quality (3)
SetAudioQuality(AudioQualityUltra)
assert.Equal(t, AudioQualityUltra, GetAudioConfig().Quality, "Should accept maximum quality value")
// Test that quality settings work correctly
SetAudioQuality(AudioQualityMedium)
currentConfig := GetAudioConfig()
assert.Equal(t, AudioQualityMedium, currentConfig.Quality, "Should set medium quality")
t.Logf("Medium quality config: %+v", currentConfig)
SetAudioQuality(AudioQualityHigh)
currentConfig = GetAudioConfig()
assert.Equal(t, AudioQualityHigh, currentConfig.Quality, "Should set high quality")
t.Logf("High quality config: %+v", currentConfig)
// Restore original quality
SetAudioQuality(originalConfig.Quality)
}
// testMicrophoneQualityBoundaryValues tests boundary values for microphone quality
func testMicrophoneQualityBoundaryValues(t *testing.T) {
// Test minimum valid quality
originalConfig := GetMicrophoneConfig()
SetMicrophoneQuality(AudioQualityLow)
assert.Equal(t, AudioQualityLow, GetMicrophoneConfig().Quality, "Should accept minimum microphone quality value")
// Test maximum valid quality
SetMicrophoneQuality(AudioQualityUltra)
assert.Equal(t, AudioQualityUltra, GetMicrophoneConfig().Quality, "Should accept maximum microphone quality value")
// Test that quality settings work correctly
SetMicrophoneQuality(AudioQualityMedium)
currentConfig := GetMicrophoneConfig()
assert.Equal(t, AudioQualityMedium, currentConfig.Quality, "Should set medium microphone quality")
t.Logf("Medium microphone quality config: %+v", currentConfig)
SetMicrophoneQuality(AudioQualityHigh)
currentConfig = GetMicrophoneConfig()
assert.Equal(t, AudioQualityHigh, currentConfig.Quality, "Should set high microphone quality")
t.Logf("High microphone quality config: %+v", currentConfig)
// Restore original quality
SetMicrophoneQuality(originalConfig.Quality)
}
// testAudioQualityPresetsConsistency tests consistency of audio quality presets
func testAudioQualityPresetsConsistency(t *testing.T) {
presets := GetAudioQualityPresets()
require.NotNil(t, presets, "Audio quality presets should not be nil")
require.NotEmpty(t, presets, "Audio quality presets should not be empty")
// Verify presets have expected structure
for i, preset := range presets {
t.Logf("Audio preset %d: %+v", i, preset)
// Each preset should have reasonable values
assert.GreaterOrEqual(t, preset.Bitrate, 0, "Bitrate should be non-negative")
assert.Greater(t, preset.SampleRate, 0, "Sample rate should be positive")
assert.Greater(t, preset.Channels, 0, "Channels should be positive")
}
// Test that presets are accessible by valid quality levels
qualityLevels := []AudioQuality{AudioQualityLow, AudioQualityMedium, AudioQualityHigh, AudioQualityUltra}
for _, quality := range qualityLevels {
preset, exists := presets[quality]
assert.True(t, exists, "Preset should exist for quality %v", quality)
assert.Greater(t, preset.Bitrate, 0, "Preset bitrate should be positive for quality %v", quality)
}
}
// testMicrophoneQualityPresetsConsistency tests consistency of microphone quality presets
func testMicrophoneQualityPresetsConsistency(t *testing.T) {
presets := GetMicrophoneQualityPresets()
require.NotNil(t, presets, "Microphone quality presets should not be nil")
require.NotEmpty(t, presets, "Microphone quality presets should not be empty")
// Verify presets have expected structure
for i, preset := range presets {
t.Logf("Microphone preset %d: %+v", i, preset)
// Each preset should have reasonable values
assert.GreaterOrEqual(t, preset.Bitrate, 0, "Bitrate should be non-negative")
assert.Greater(t, preset.SampleRate, 0, "Sample rate should be positive")
assert.Greater(t, preset.Channels, 0, "Channels should be positive")
}
// Test that presets are accessible by valid quality levels
qualityLevels := []AudioQuality{AudioQualityLow, AudioQualityMedium, AudioQualityHigh, AudioQualityUltra}
for _, quality := range qualityLevels {
preset, exists := presets[quality]
assert.True(t, exists, "Microphone preset should exist for quality %v", quality)
assert.Greater(t, preset.Bitrate, 0, "Microphone preset bitrate should be positive for quality %v", quality)
}
}
// testQualitySettingsThreadSafety tests thread safety of quality settings
func testQualitySettingsThreadSafety(t *testing.T) {
if testing.Short() {
t.Skip("Skipping thread safety test in short mode")
}
originalAudioConfig := GetAudioConfig()
originalMicConfig := GetMicrophoneConfig()
// Test concurrent access to quality settings
const numGoroutines = 50
const numOperations = 100
done := make(chan bool, numGoroutines*2)
// Audio quality goroutines
for i := 0; i < numGoroutines; i++ {
go func(id int) {
for j := 0; j < numOperations; j++ {
// Cycle through valid quality values
qualityIndex := j % 4
var quality AudioQuality
switch qualityIndex {
case 0:
quality = AudioQualityLow
case 1:
quality = AudioQualityMedium
case 2:
quality = AudioQualityHigh
case 3:
quality = AudioQualityUltra
}
SetAudioQuality(quality)
_ = GetAudioConfig()
}
done <- true
}(i)
}
// Microphone quality goroutines
for i := 0; i < numGoroutines; i++ {
go func(id int) {
for j := 0; j < numOperations; j++ {
// Cycle through valid quality values
qualityIndex := j % 4
var quality AudioQuality
switch qualityIndex {
case 0:
quality = AudioQualityLow
case 1:
quality = AudioQualityMedium
case 2:
quality = AudioQualityHigh
case 3:
quality = AudioQualityUltra
}
SetMicrophoneQuality(quality)
_ = GetMicrophoneConfig()
}
done <- true
}(i)
}
// Wait for all goroutines to complete
for i := 0; i < numGoroutines*2; i++ {
<-done
}
// Verify system is still functional
SetAudioQuality(AudioQualityHigh)
assert.Equal(t, AudioQualityHigh, GetAudioConfig().Quality, "Audio quality should be settable after concurrent access")
SetMicrophoneQuality(AudioQualityMedium)
assert.Equal(t, AudioQualityMedium, GetMicrophoneConfig().Quality, "Microphone quality should be settable after concurrent access")
// Restore original values
SetAudioQuality(originalAudioConfig.Quality)
SetMicrophoneQuality(originalMicConfig.Quality)
}
// testQualityPresetsImmutability tests that quality presets are not accidentally modified
func testQualityPresetsImmutability(t *testing.T) {
// Get presets multiple times and verify they're consistent
presets1 := GetAudioQualityPresets()
presets2 := GetAudioQualityPresets()
require.Equal(t, len(presets1), len(presets2), "Preset count should be consistent")
// Verify each preset is identical
for quality := range presets1 {
assert.Equal(t, presets1[quality].Bitrate, presets2[quality].Bitrate,
"Preset %v bitrate should be consistent", quality)
assert.Equal(t, presets1[quality].SampleRate, presets2[quality].SampleRate,
"Preset %v sample rate should be consistent", quality)
assert.Equal(t, presets1[quality].Channels, presets2[quality].Channels,
"Preset %v channels should be consistent", quality)
}
// Test microphone presets as well
micPresets1 := GetMicrophoneQualityPresets()
micPresets2 := GetMicrophoneQualityPresets()
require.Equal(t, len(micPresets1), len(micPresets2), "Microphone preset count should be consistent")
for quality := range micPresets1 {
assert.Equal(t, micPresets1[quality].Bitrate, micPresets2[quality].Bitrate,
"Microphone preset %v bitrate should be consistent", quality)
assert.Equal(t, micPresets1[quality].SampleRate, micPresets2[quality].SampleRate,
"Microphone preset %v sample rate should be consistent", quality)
assert.Equal(t, micPresets1[quality].Channels, micPresets2[quality].Channels,
"Microphone preset %v channels should be consistent", quality)
}
}
// TestQualityValidationRemovalRegression tests that validation removal doesn't cause regressions
func TestQualityValidationRemovalRegression(t *testing.T) {
// This test ensures that removing validation from GET endpoints doesn't break functionality
// Test that presets are still accessible
audioPresets := GetAudioQualityPresets()
assert.NotNil(t, audioPresets, "Audio presets should be accessible after validation removal")
assert.NotEmpty(t, audioPresets, "Audio presets should not be empty")
micPresets := GetMicrophoneQualityPresets()
assert.NotNil(t, micPresets, "Microphone presets should be accessible after validation removal")
assert.NotEmpty(t, micPresets, "Microphone presets should not be empty")
// Test that quality getters still work
audioConfig := GetAudioConfig()
assert.GreaterOrEqual(t, int(audioConfig.Quality), 0, "Audio quality should be non-negative")
micConfig := GetMicrophoneConfig()
assert.GreaterOrEqual(t, int(micConfig.Quality), 0, "Microphone quality should be non-negative")
// Test that setters still work (for valid values)
originalAudio := GetAudioConfig()
originalMic := GetMicrophoneConfig()
SetAudioQuality(AudioQualityMedium)
assert.Equal(t, AudioQualityMedium, GetAudioConfig().Quality, "Audio quality setter should work")
SetMicrophoneQuality(AudioQualityHigh)
assert.Equal(t, AudioQualityHigh, GetMicrophoneConfig().Quality, "Microphone quality setter should work")
// Restore original values
SetAudioQuality(originalAudio.Quality)
SetMicrophoneQuality(originalMic.Quality)
}
// TestPerformanceAfterValidationRemoval tests that performance improved after validation removal
func TestPerformanceAfterValidationRemoval(t *testing.T) {
if testing.Short() {
t.Skip("Skipping performance test in short mode")
}
// Benchmark preset access (should be faster without validation)
const iterations = 10000
// Time audio preset access
start := time.Now()
for i := 0; i < iterations; i++ {
_ = GetAudioQualityPresets()
}
audioDuration := time.Since(start)
// Time microphone preset access
start = time.Now()
for i := 0; i < iterations; i++ {
_ = GetMicrophoneQualityPresets()
}
micDuration := time.Since(start)
t.Logf("Audio presets access time for %d iterations: %v", iterations, audioDuration)
t.Logf("Microphone presets access time for %d iterations: %v", iterations, micDuration)
// Verify reasonable performance (should complete quickly without validation overhead)
maxExpectedDuration := time.Second // Very generous limit
assert.Less(t, audioDuration, maxExpectedDuration, "Audio preset access should be fast")
assert.Less(t, micDuration, maxExpectedDuration, "Microphone preset access should be fast")
}

View File

@ -0,0 +1,120 @@
package audio
import (
"sync/atomic"
"time"
"github.com/rs/zerolog"
)
// BaseAudioMetrics provides common metrics fields for both input and output
// Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment)
type BaseAudioMetrics struct {
// Atomic int64 fields first for proper ARM32 alignment
FramesProcessed int64 `json:"frames_processed"`
FramesDropped int64 `json:"frames_dropped"`
BytesProcessed int64 `json:"bytes_processed"`
ConnectionDrops int64 `json:"connection_drops"`
// Non-atomic fields after atomic fields
LastFrameTime time.Time `json:"last_frame_time"`
AverageLatency time.Duration `json:"average_latency"`
}
// BaseAudioManager provides common functionality for audio managers
type BaseAudioManager struct {
metrics BaseAudioMetrics
logger zerolog.Logger
running int32
}
// NewBaseAudioManager creates a new base audio manager
func NewBaseAudioManager(logger zerolog.Logger) *BaseAudioManager {
return &BaseAudioManager{
logger: logger,
}
}
// IsRunning returns whether the manager is running
func (bam *BaseAudioManager) IsRunning() bool {
return atomic.LoadInt32(&bam.running) == 1
}
// setRunning atomically sets the running state
func (bam *BaseAudioManager) setRunning(running bool) bool {
if running {
return atomic.CompareAndSwapInt32(&bam.running, 0, 1)
}
return atomic.CompareAndSwapInt32(&bam.running, 1, 0)
}
// resetMetrics resets all metrics to zero
func (bam *BaseAudioManager) resetMetrics() {
atomic.StoreInt64(&bam.metrics.FramesProcessed, 0)
atomic.StoreInt64(&bam.metrics.FramesDropped, 0)
atomic.StoreInt64(&bam.metrics.BytesProcessed, 0)
atomic.StoreInt64(&bam.metrics.ConnectionDrops, 0)
bam.metrics.LastFrameTime = time.Time{}
bam.metrics.AverageLatency = 0
}
// getBaseMetrics returns a copy of the base metrics
func (bam *BaseAudioManager) getBaseMetrics() BaseAudioMetrics {
return BaseAudioMetrics{
FramesProcessed: atomic.LoadInt64(&bam.metrics.FramesProcessed),
FramesDropped: atomic.LoadInt64(&bam.metrics.FramesDropped),
BytesProcessed: atomic.LoadInt64(&bam.metrics.BytesProcessed),
ConnectionDrops: atomic.LoadInt64(&bam.metrics.ConnectionDrops),
LastFrameTime: bam.metrics.LastFrameTime,
AverageLatency: bam.metrics.AverageLatency,
}
}
// recordFrameProcessed records a processed frame
func (bam *BaseAudioManager) recordFrameProcessed(bytes int) {
atomic.AddInt64(&bam.metrics.FramesProcessed, 1)
atomic.AddInt64(&bam.metrics.BytesProcessed, int64(bytes))
bam.metrics.LastFrameTime = time.Now()
}
// recordFrameDropped records a dropped frame
func (bam *BaseAudioManager) recordFrameDropped() {
atomic.AddInt64(&bam.metrics.FramesDropped, 1)
}
// updateLatency updates the average latency
func (bam *BaseAudioManager) updateLatency(latency time.Duration) {
// Simple moving average - could be enhanced with more sophisticated algorithms
currentAvg := bam.metrics.AverageLatency
if currentAvg == 0 {
bam.metrics.AverageLatency = latency
} else {
// Weighted average: 90% old + 10% new
bam.metrics.AverageLatency = time.Duration(float64(currentAvg)*0.9 + float64(latency)*0.1)
}
}
// logComponentStart logs component start with consistent format
func (bam *BaseAudioManager) logComponentStart(component string) {
bam.logger.Info().Str("component", component).Msg("starting component")
}
// logComponentStarted logs component started with consistent format
func (bam *BaseAudioManager) logComponentStarted(component string) {
bam.logger.Info().Str("component", component).Msg("component started successfully")
}
// logComponentStop logs component stop with consistent format
func (bam *BaseAudioManager) logComponentStop(component string) {
bam.logger.Info().Str("component", component).Msg("stopping component")
}
// logComponentStopped logs component stopped with consistent format
func (bam *BaseAudioManager) logComponentStopped(component string) {
bam.logger.Info().Str("component", component).Msg("component stopped")
}
// logComponentError logs component error with consistent format
func (bam *BaseAudioManager) logComponentError(component string, err error, msg string) {
bam.logger.Error().Err(err).Str("component", component).Msg(msg)
}

View File

@ -0,0 +1,133 @@
//go:build cgo
// +build cgo
package audio
import (
"context"
"os/exec"
"sync"
"sync/atomic"
"time"
"github.com/jetkvm/kvm/internal/logging"
"github.com/rs/zerolog"
)
// BaseSupervisor provides common functionality for audio supervisors
type BaseSupervisor struct {
ctx context.Context
cancel context.CancelFunc
logger *zerolog.Logger
mutex sync.RWMutex
running int32
// Process management
cmd *exec.Cmd
processPID int
// Process monitoring
processMonitor *ProcessMonitor
// Exit tracking
lastExitCode int
lastExitTime time.Time
}
// NewBaseSupervisor creates a new base supervisor
func NewBaseSupervisor(componentName string) *BaseSupervisor {
logger := logging.GetDefaultLogger().With().Str("component", componentName).Logger()
return &BaseSupervisor{
logger: &logger,
processMonitor: GetProcessMonitor(),
}
}
// IsRunning returns whether the supervisor is currently running
func (bs *BaseSupervisor) IsRunning() bool {
return atomic.LoadInt32(&bs.running) == 1
}
// setRunning atomically sets the running state
func (bs *BaseSupervisor) setRunning(running bool) {
if running {
atomic.StoreInt32(&bs.running, 1)
} else {
atomic.StoreInt32(&bs.running, 0)
}
}
// GetProcessPID returns the current process PID
func (bs *BaseSupervisor) GetProcessPID() int {
bs.mutex.RLock()
defer bs.mutex.RUnlock()
return bs.processPID
}
// GetLastExitInfo returns the last exit code and time
func (bs *BaseSupervisor) GetLastExitInfo() (exitCode int, exitTime time.Time) {
bs.mutex.RLock()
defer bs.mutex.RUnlock()
return bs.lastExitCode, bs.lastExitTime
}
// GetProcessMetrics returns process metrics if available
func (bs *BaseSupervisor) GetProcessMetrics() *ProcessMetrics {
bs.mutex.RLock()
defer bs.mutex.RUnlock()
if bs.cmd == nil || bs.cmd.Process == nil {
return &ProcessMetrics{
PID: 0,
CPUPercent: 0.0,
MemoryRSS: 0,
MemoryVMS: 0,
MemoryPercent: 0.0,
Timestamp: time.Now(),
ProcessName: "audio-server",
}
}
pid := bs.cmd.Process.Pid
if bs.processMonitor != nil {
metrics := bs.processMonitor.GetCurrentMetrics()
for _, metric := range metrics {
if metric.PID == pid {
return &metric
}
}
}
// Return default metrics if process not found in monitor
return &ProcessMetrics{
PID: pid,
CPUPercent: 0.0,
MemoryRSS: 0,
MemoryVMS: 0,
MemoryPercent: 0.0,
Timestamp: time.Now(),
ProcessName: "audio-server",
}
}
// logSupervisorStart logs supervisor start event
func (bs *BaseSupervisor) logSupervisorStart() {
bs.logger.Info().Msg("Supervisor starting")
}
// logSupervisorStop logs supervisor stop event
func (bs *BaseSupervisor) logSupervisorStop() {
bs.logger.Info().Msg("Supervisor stopping")
}
// createContext creates a new context for the supervisor
func (bs *BaseSupervisor) createContext() {
bs.ctx, bs.cancel = context.WithCancel(context.Background())
}
// cancelContext cancels the supervisor context
func (bs *BaseSupervisor) cancelContext() {
if bs.cancel != nil {
bs.cancel()
}
}

View File

@ -60,6 +60,18 @@ type batchReadResult struct {
// NewBatchAudioProcessor creates a new batch audio processor
func NewBatchAudioProcessor(batchSize int, batchDuration time.Duration) *BatchAudioProcessor {
// Validate input parameters
if err := ValidateBufferSize(batchSize); err != nil {
logger := logging.GetDefaultLogger().With().Str("component", "batch-audio").Logger()
logger.Error().Err(err).Int("batchSize", batchSize).Msg("Invalid batch size provided, using default")
batchSize = GetConfig().BatchProcessorFramesPerBatch
}
if batchDuration <= 0 {
logger := logging.GetDefaultLogger().With().Str("component", "batch-audio").Logger()
logger.Error().Dur("batchDuration", batchDuration).Msg("Invalid batch duration provided, using default")
batchDuration = GetConfig().BatchProcessingDelay
}
ctx, cancel := context.WithCancel(context.Background())
logger := logging.GetDefaultLogger().With().Str("component", "batch-audio").Logger()
@ -117,6 +129,12 @@ func (bap *BatchAudioProcessor) Stop() {
// BatchReadEncode performs batched audio read and encode operations
func (bap *BatchAudioProcessor) BatchReadEncode(buffer []byte) (int, error) {
// Validate buffer before processing
if err := ValidateBufferSize(len(buffer)); err != nil {
bap.logger.Debug().Err(err).Msg("Invalid buffer for batch processing")
return 0, err
}
if atomic.LoadInt32(&bap.running) == 0 {
// Fallback to single operation if batch processor is not running
atomic.AddInt64(&bap.stats.SingleReads, 1)

View File

@ -4,6 +4,8 @@ import (
"sync"
"sync/atomic"
"time"
"github.com/jetkvm/kvm/internal/logging"
)
type AudioBufferPool struct {
@ -23,6 +25,14 @@ type AudioBufferPool struct {
}
func NewAudioBufferPool(bufferSize int) *AudioBufferPool {
// Validate buffer size parameter
if err := ValidateBufferSize(bufferSize); err != nil {
// Log validation error and use default value
logger := logging.GetDefaultLogger().With().Str("component", "AudioBufferPool").Logger()
logger.Error().Err(err).Int("bufferSize", bufferSize).Msg("Invalid buffer size provided, using default")
bufferSize = GetConfig().AudioFramePoolSize
}
// Pre-allocate 20% of max pool size for immediate availability
preallocSize := GetConfig().PreallocPercentage
preallocated := make([]*[]byte, 0, preallocSize)

View File

@ -36,11 +36,13 @@ static int channels = 2; // Will be set from GetConfig().CGOChann
static int frame_size = 960; // Will be set from GetConfig().CGOFrameSize
static int max_packet_size = 1500; // Will be set from GetConfig().CGOMaxPacketSize
static int sleep_microseconds = 1000; // Will be set from GetConfig().CGOUsleepMicroseconds
static int max_attempts_global = 5; // Will be set from GetConfig().CGOMaxAttempts
static int max_backoff_us_global = 500000; // Will be set from GetConfig().CGOMaxBackoffMicroseconds
// Function to update constants from Go configuration
void update_audio_constants(int bitrate, int complexity, int vbr, int vbr_constraint,
int signal_type, int bandwidth, int dtx, int sr, int ch,
int fs, int max_pkt, int sleep_us) {
int fs, int max_pkt, int sleep_us, int max_attempts, int max_backoff) {
opus_bitrate = bitrate;
opus_complexity = complexity;
opus_vbr = vbr;
@ -53,6 +55,8 @@ void update_audio_constants(int bitrate, int complexity, int vbr, int vbr_constr
frame_size = fs;
max_packet_size = max_pkt;
sleep_microseconds = sleep_us;
max_attempts_global = max_attempts;
max_backoff_us_global = max_backoff;
}
// State tracking to prevent race conditions during rapid start/stop
@ -63,13 +67,11 @@ static volatile int playback_initialized = 0;
// Enhanced ALSA device opening with exponential backoff retry logic
static int safe_alsa_open(snd_pcm_t **handle, const char *device, snd_pcm_stream_t stream) {
int max_attempts = 5; // Increased from 3 to 5
int attempt = 0;
int err;
int backoff_us = sleep_microseconds; // Start with base sleep time
const int max_backoff_us = 500000; // Max 500ms backoff
while (attempt < max_attempts) {
while (attempt < max_attempts_global) {
err = snd_pcm_open(handle, device, stream, SND_PCM_NONBLOCK);
if (err >= 0) {
// Switch to blocking mode after successful open
@ -78,24 +80,24 @@ static int safe_alsa_open(snd_pcm_t **handle, const char *device, snd_pcm_stream
}
attempt++;
if (attempt >= max_attempts) break;
if (attempt >= max_attempts_global) break;
// Enhanced error handling with specific retry strategies
if (err == -EBUSY || err == -EAGAIN) {
// Device busy or temporarily unavailable - retry with backoff
usleep(backoff_us);
backoff_us = (backoff_us * 2 < max_backoff_us) ? backoff_us * 2 : max_backoff_us;
backoff_us = (backoff_us * 2 < max_backoff_us_global) ? backoff_us * 2 : max_backoff_us_global;
} else if (err == -ENODEV || err == -ENOENT) {
// Device not found - longer wait as device might be initializing
usleep(backoff_us * 2);
backoff_us = (backoff_us * 2 < max_backoff_us) ? backoff_us * 2 : max_backoff_us;
backoff_us = (backoff_us * 2 < max_backoff_us_global) ? backoff_us * 2 : max_backoff_us_global;
} else if (err == -EPERM || err == -EACCES) {
// Permission denied - shorter wait, likely persistent issue
usleep(backoff_us / 2);
} else {
// Other errors - standard backoff
usleep(backoff_us);
backoff_us = (backoff_us * 2 < max_backoff_us) ? backoff_us * 2 : max_backoff_us;
backoff_us = (backoff_us * 2 < max_backoff_us_global) ? backoff_us * 2 : max_backoff_us_global;
}
}
return err;
@ -649,6 +651,8 @@ func cgoAudioInit() error {
C.int(config.CGOFrameSize),
C.int(config.CGOMaxPacketSize),
C.int(config.CGOUsleepMicroseconds),
C.int(config.CGOMaxAttempts),
C.int(config.CGOMaxBackoffMicroseconds),
)
result := C.jetkvm_audio_init()

View File

@ -1,6 +1,10 @@
package audio
import "time"
import (
"time"
"github.com/jetkvm/kvm/internal/logging"
)
// AudioConfigConstants centralizes all hardcoded values used across audio components.
// This configuration system allows runtime tuning of audio performance, quality, and resource usage.
@ -1541,6 +1545,82 @@ type AudioConfigConstants struct {
// Default 8 channels provides reasonable upper bound for multi-channel audio.
MaxChannels int
// CGO Constants
// Used in: cgo_audio.go for CGO operation limits and retry logic
// Impact: Controls CGO retry behavior and backoff timing
// CGOMaxBackoffMicroseconds defines maximum backoff time in microseconds for CGO operations.
// Used in: safe_alsa_open for exponential backoff retry logic
// Impact: Prevents excessive wait times while allowing device recovery.
// Default 500000 microseconds (500ms) provides reasonable maximum wait time.
CGOMaxBackoffMicroseconds int
// CGOMaxAttempts defines maximum retry attempts for CGO operations.
// Used in: safe_alsa_open for retry limit enforcement
// Impact: Prevents infinite retry loops while allowing transient error recovery.
// Default 5 attempts provides good balance between reliability and performance.
CGOMaxAttempts int
// Validation Frame Size Limits
// Used in: validation_enhanced.go for frame duration validation
// Impact: Ensures frame sizes are within acceptable bounds for real-time audio
// MinFrameDuration defines minimum acceptable frame duration.
// Used in: ValidateAudioConfiguration for frame size validation
// Impact: Prevents excessively small frames that could impact performance.
// Default 10ms provides minimum viable frame duration for real-time audio.
MinFrameDuration time.Duration
// MaxFrameDuration defines maximum acceptable frame duration.
// Used in: ValidateAudioConfiguration for frame size validation
// Impact: Prevents excessively large frames that could impact latency.
// Default 100ms provides reasonable maximum frame duration.
MaxFrameDuration time.Duration
// Valid Sample Rates
// Used in: validation_enhanced.go for sample rate validation
// Impact: Defines the set of supported sample rates for audio processing
// ValidSampleRates defines the list of supported sample rates.
// Used in: ValidateAudioConfiguration for sample rate validation
// Impact: Ensures only supported sample rates are used in audio processing.
// Default rates support common audio standards from voice (8kHz) to professional (48kHz).
ValidSampleRates []int
// Opus Bitrate Validation Constants
// Used in: validation_enhanced.go for bitrate range validation
// Impact: Ensures bitrate values are within Opus codec specifications
// MinOpusBitrate defines the minimum valid Opus bitrate in bits per second.
// Used in: ValidateAudioConfiguration for bitrate validation
// Impact: Prevents bitrates below Opus codec minimum specification.
// Default 6000 bps is the minimum supported by Opus codec.
MinOpusBitrate int
// MaxOpusBitrate defines the maximum valid Opus bitrate in bits per second.
// Used in: ValidateAudioConfiguration for bitrate validation
// Impact: Prevents bitrates above Opus codec maximum specification.
// Default 510000 bps is the maximum supported by Opus codec.
MaxOpusBitrate int
// MaxValidationTime defines the maximum time allowed for validation operations.
// Used in: GetValidationConfig for timeout control
// Impact: Prevents validation operations from blocking indefinitely.
// Default 5s provides reasonable timeout for validation operations.
MaxValidationTime time.Duration
// MinFrameSize defines the minimum reasonable audio frame size in bytes.
// Used in: ValidateAudioFrameComprehensive for frame size validation
// Impact: Prevents processing of unreasonably small audio frames.
// Default 64 bytes ensures minimum viable audio data.
MinFrameSize int
// FrameSizeTolerance defines the tolerance for frame size validation in bytes.
// Used in: ValidateAudioFrameComprehensive for frame size matching
// Impact: Allows reasonable variation in frame sizes due to encoding.
// Default 512 bytes accommodates typical encoding variations.
FrameSizeTolerance int
// Device Health Monitoring Configuration
// Used in: device_health.go for proactive device monitoring and recovery
// Impact: Controls health check frequency and recovery thresholds
@ -2607,6 +2687,26 @@ func DefaultAudioConfig() *AudioConfigConstants {
MaxSampleRate: 48000, // 48kHz maximum sample rate
MaxChannels: 8, // 8 maximum audio channels
// CGO Constants
CGOMaxBackoffMicroseconds: 500000, // 500ms maximum backoff in microseconds
CGOMaxAttempts: 5, // 5 maximum retry attempts
// Validation Frame Size Limits
MinFrameDuration: 10 * time.Millisecond, // 10ms minimum frame duration
MaxFrameDuration: 100 * time.Millisecond, // 100ms maximum frame duration
// Valid Sample Rates
ValidSampleRates: []int{8000, 12000, 16000, 22050, 24000, 44100, 48000}, // Supported sample rates
// Opus Bitrate Validation Constants
MinOpusBitrate: 6000, // 6000 bps minimum Opus bitrate
MaxOpusBitrate: 510000, // 510000 bps maximum Opus bitrate
// Validation Configuration
MaxValidationTime: 5 * time.Second, // 5s maximum validation timeout
MinFrameSize: 64, // 64 bytes minimum frame size
FrameSizeTolerance: 512, // 512 bytes frame size tolerance
// Device Health Monitoring Configuration
HealthCheckIntervalMS: 5000, // 5000ms (5s) health check interval
HealthRecoveryThreshold: 3, // 3 consecutive successes for recovery
@ -2630,7 +2730,17 @@ var audioConfigInstance = DefaultAudioConfig()
// UpdateConfig allows runtime configuration updates
func UpdateConfig(newConfig *AudioConfigConstants) {
// Validate the new configuration before applying it
if err := ValidateAudioConfigConstants(newConfig); err != nil {
// Log validation error and keep current configuration
logger := logging.GetDefaultLogger().With().Str("component", "AudioConfig").Logger()
logger.Error().Err(err).Msg("Configuration validation failed, keeping current configuration")
return
}
audioConfigInstance = newConfig
logger := logging.GetDefaultLogger().With().Str("component", "AudioConfig").Logger()
logger.Info().Msg("Audio configuration updated successfully")
}
// GetConfig returns the current configuration

View File

@ -6,79 +6,75 @@ import (
"time"
"github.com/jetkvm/kvm/internal/logging"
"github.com/rs/zerolog"
)
// AudioInputMetrics holds metrics for microphone input
// Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment)
type AudioInputMetrics struct {
FramesSent int64 // Total frames sent
FramesDropped int64 // Total frames dropped
BytesProcessed int64 // Total bytes processed
ConnectionDrops int64 // Connection drops
AverageLatency time.Duration // time.Duration is int64
LastFrameTime time.Time
// Atomic int64 field first for proper ARM32 alignment
FramesSent int64 `json:"frames_sent"` // Total frames sent (input-specific)
// Embedded struct with atomic fields properly aligned
BaseAudioMetrics
}
// AudioInputManager manages microphone input stream using IPC mode only
type AudioInputManager struct {
metrics AudioInputMetrics
*BaseAudioManager
ipcManager *AudioInputIPCManager
logger zerolog.Logger
running int32
framesSent int64 // Input-specific metric
}
// NewAudioInputManager creates a new audio input manager (IPC mode only)
// NewAudioInputManager creates a new audio input manager
func NewAudioInputManager() *AudioInputManager {
logger := logging.GetDefaultLogger().With().Str("component", AudioInputManagerComponent).Logger()
return &AudioInputManager{
ipcManager: NewAudioInputIPCManager(),
logger: logging.GetDefaultLogger().With().Str("component", AudioInputManagerComponent).Logger(),
BaseAudioManager: NewBaseAudioManager(logger),
ipcManager: NewAudioInputIPCManager(),
}
}
// Start begins processing microphone input
func (aim *AudioInputManager) Start() error {
if !atomic.CompareAndSwapInt32(&aim.running, 0, 1) {
if !aim.setRunning(true) {
return fmt.Errorf("audio input manager is already running")
}
aim.logger.Info().Str("component", AudioInputManagerComponent).Msg("starting component")
aim.logComponentStart(AudioInputManagerComponent)
// Start the IPC-based audio input
err := aim.ipcManager.Start()
if err != nil {
aim.logger.Error().Err(err).Str("component", AudioInputManagerComponent).Msg("failed to start component")
aim.logComponentError(AudioInputManagerComponent, err, "failed to start component")
// Ensure proper cleanup on error
atomic.StoreInt32(&aim.running, 0)
aim.setRunning(false)
// Reset metrics on failed start
aim.resetMetrics()
return err
}
aim.logger.Info().Str("component", AudioInputManagerComponent).Msg("component started successfully")
aim.logComponentStarted(AudioInputManagerComponent)
return nil
}
// Stop stops processing microphone input
func (aim *AudioInputManager) Stop() {
if !atomic.CompareAndSwapInt32(&aim.running, 1, 0) {
if !aim.setRunning(false) {
return // Already stopped
}
aim.logger.Info().Str("component", AudioInputManagerComponent).Msg("stopping component")
aim.logComponentStop(AudioInputManagerComponent)
// Stop the IPC-based audio input
aim.ipcManager.Stop()
aim.logger.Info().Str("component", AudioInputManagerComponent).Msg("component stopped")
aim.logComponentStopped(AudioInputManagerComponent)
}
// resetMetrics resets all metrics to zero
func (aim *AudioInputManager) resetMetrics() {
atomic.StoreInt64(&aim.metrics.FramesSent, 0)
atomic.StoreInt64(&aim.metrics.FramesDropped, 0)
atomic.StoreInt64(&aim.metrics.BytesProcessed, 0)
atomic.StoreInt64(&aim.metrics.ConnectionDrops, 0)
aim.BaseAudioManager.resetMetrics()
atomic.StoreInt64(&aim.framesSent, 0)
}
// WriteOpusFrame writes an Opus frame to the audio input system with latency tracking
@ -87,6 +83,12 @@ func (aim *AudioInputManager) WriteOpusFrame(frame []byte) error {
return nil // Not running, silently drop
}
// Validate frame before processing
if err := ValidateFrameData(frame); err != nil {
aim.logComponentError(AudioInputManagerComponent, err, "Frame validation failed")
return fmt.Errorf("input frame validation failed: %w", err)
}
// Track end-to-end latency from WebRTC to IPC
startTime := time.Now()
err := aim.ipcManager.WriteOpusFrame(frame)
@ -105,10 +107,9 @@ func (aim *AudioInputManager) WriteOpusFrame(frame []byte) error {
}
// Update metrics
atomic.AddInt64(&aim.metrics.FramesSent, 1)
atomic.AddInt64(&aim.metrics.BytesProcessed, int64(len(frame)))
aim.metrics.LastFrameTime = time.Now()
aim.metrics.AverageLatency = processingTime
atomic.AddInt64(&aim.framesSent, 1)
aim.recordFrameProcessed(len(frame))
aim.updateLatency(processingTime)
return nil
}
@ -141,21 +142,17 @@ func (aim *AudioInputManager) WriteOpusFrameZeroCopy(frame *ZeroCopyAudioFrame)
}
// Update metrics
atomic.AddInt64(&aim.metrics.FramesSent, 1)
atomic.AddInt64(&aim.metrics.BytesProcessed, int64(frame.Length()))
aim.metrics.LastFrameTime = time.Now()
aim.metrics.AverageLatency = processingTime
atomic.AddInt64(&aim.framesSent, 1)
aim.recordFrameProcessed(frame.Length())
aim.updateLatency(processingTime)
return nil
}
// GetMetrics returns current audio input metrics
// GetMetrics returns current metrics
func (aim *AudioInputManager) GetMetrics() AudioInputMetrics {
return AudioInputMetrics{
FramesSent: atomic.LoadInt64(&aim.metrics.FramesSent),
FramesDropped: atomic.LoadInt64(&aim.metrics.FramesDropped),
BytesProcessed: atomic.LoadInt64(&aim.metrics.BytesProcessed),
AverageLatency: aim.metrics.AverageLatency,
LastFrameTime: aim.metrics.LastFrameTime,
FramesSent: atomic.LoadInt64(&aim.framesSent),
BaseAudioMetrics: aim.getBaseMetrics(),
}
}
@ -209,10 +206,7 @@ func (aim *AudioInputManager) LogPerformanceStats() {
Msg("Audio input performance metrics")
}
// IsRunning returns whether the audio input manager is running
func (aim *AudioInputManager) IsRunning() bool {
return atomic.LoadInt32(&aim.running) == 1
}
// Note: IsRunning() is inherited from BaseAudioManager
// IsReady returns whether the audio input manager is ready to receive frames
// This checks both that it's running and that the IPC connection is established

View File

@ -301,7 +301,7 @@ func (ais *AudioInputServer) acceptConnections() {
if err != nil {
if ais.running {
// Log error and continue accepting
logger := logging.GetDefaultLogger().With().Str("component", "audio-input-server").Logger()
logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger()
logger.Warn().Err(err).Msg("Failed to accept connection, retrying")
continue
}
@ -311,7 +311,7 @@ func (ais *AudioInputServer) acceptConnections() {
// Configure socket buffers for optimal performance
if err := ConfigureSocketBuffers(conn, ais.socketBufferConfig); err != nil {
// Log warning but don't fail - socket buffer optimization is not critical
logger := logging.GetDefaultLogger().With().Str("component", "audio-input-server").Logger()
logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger()
logger.Warn().Err(err).Msg("Failed to configure socket buffers, continuing with defaults")
} else {
// Record socket buffer metrics for monitoring
@ -458,6 +458,13 @@ func (ais *AudioInputServer) processOpusFrame(data []byte) error {
return nil // Empty frame, ignore
}
// Validate frame data before processing
if err := ValidateFrameData(data); err != nil {
logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger()
logger.Error().Err(err).Msg("Frame validation failed")
return fmt.Errorf("input frame validation failed: %w", err)
}
// Process the Opus frame using CGO
_, err := CGOAudioDecodeWrite(data)
return err
@ -465,6 +472,18 @@ func (ais *AudioInputServer) processOpusFrame(data []byte) error {
// processConfig processes a configuration update
func (ais *AudioInputServer) processConfig(data []byte) error {
// Validate configuration data
if len(data) == 0 {
return fmt.Errorf("empty configuration data")
}
// Basic validation for configuration size
if err := ValidateBufferSize(len(data)); err != nil {
logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger()
logger.Error().Err(err).Msg("Configuration buffer validation failed")
return fmt.Errorf("configuration validation failed: %w", err)
}
// Acknowledge configuration receipt
return ais.sendAck()
}
@ -596,6 +615,13 @@ func (aic *AudioInputClient) SendFrame(frame []byte) error {
return nil // Empty frame, ignore
}
// Validate frame data before sending
if err := ValidateFrameData(frame); err != nil {
logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
logger.Error().Err(err).Msg("Frame validation failed")
return fmt.Errorf("input frame validation failed: %w", err)
}
if len(frame) > maxFrameSize {
return fmt.Errorf("frame too large: got %d bytes, maximum allowed %d bytes", len(frame), maxFrameSize)
}
@ -624,6 +650,13 @@ func (aic *AudioInputClient) SendFrameZeroCopy(frame *ZeroCopyAudioFrame) error
return nil // Empty frame, ignore
}
// Validate zero-copy frame before sending
if err := ValidateZeroCopyFrame(frame); err != nil {
logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
logger.Error().Err(err).Msg("Zero-copy frame validation failed")
return fmt.Errorf("input frame validation failed: %w", err)
}
if frame.Length() > maxFrameSize {
return fmt.Errorf("frame too large: got %d bytes, maximum allowed %d bytes", frame.Length(), maxFrameSize)
}
@ -649,6 +682,13 @@ func (aic *AudioInputClient) SendConfig(config InputIPCConfig) error {
return fmt.Errorf("not connected to audio input server")
}
// Validate configuration parameters
if err := ValidateInputIPCConfig(config.SampleRate, config.Channels, config.FrameSize); err != nil {
logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
logger.Error().Err(err).Msg("Configuration validation failed")
return fmt.Errorf("input configuration validation failed: %w", err)
}
// Serialize config (simple binary format)
data := make([]byte, 12) // 3 * int32
binary.LittleEndian.PutUint32(data[0:4], uint32(config.SampleRate))
@ -735,7 +775,7 @@ func (ais *AudioInputServer) startReaderGoroutine() {
baseBackoffDelay := GetConfig().RetryDelay
maxBackoffDelay := GetConfig().MaxRetryDelay
logger := logging.GetDefaultLogger().With().Str("component", "audio-input-reader").Logger()
logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
for {
select {
@ -820,7 +860,7 @@ func (ais *AudioInputServer) startProcessorGoroutine() {
defer runtime.UnlockOSThread()
// Set high priority for audio processing
logger := logging.GetDefaultLogger().With().Str("component", "audio-input-processor").Logger()
logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
if err := SetAudioThreadPriority(); err != nil {
logger.Warn().Err(err).Msg("Failed to set audio processing priority")
}
@ -937,7 +977,7 @@ func (ais *AudioInputServer) startMonitorGoroutine() {
defer runtime.UnlockOSThread()
// Set I/O priority for monitoring
logger := logging.GetDefaultLogger().With().Str("component", "audio-input-monitor").Logger()
logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
if err := SetAudioIOThreadPriority(); err != nil {
logger.Warn().Err(err).Msg("Failed to set audio I/O priority")
}

View File

@ -49,6 +49,17 @@ func (aim *AudioInputIPCManager) Start() error {
FrameSize: GetConfig().InputIPCFrameSize,
}
// Validate configuration before using it
if err := ValidateInputIPCConfig(config.SampleRate, config.Channels, config.FrameSize); err != nil {
aim.logger.Error().Err(err).Msg("Invalid input IPC config from constants, using defaults")
// Use safe defaults if config validation fails
config = InputIPCConfig{
SampleRate: 48000,
Channels: 2,
FrameSize: 960,
}
}
// Wait for subprocess readiness
time.Sleep(GetConfig().LongSleepDuration)
@ -91,6 +102,13 @@ func (aim *AudioInputIPCManager) WriteOpusFrame(frame []byte) error {
return nil // Empty frame, ignore
}
// Validate frame data
if err := ValidateFrameData(frame); err != nil {
atomic.AddInt64(&aim.metrics.FramesDropped, 1)
aim.logger.Debug().Err(err).Msg("Invalid frame data")
return err
}
// Start latency measurement
startTime := time.Now()
@ -125,6 +143,13 @@ func (aim *AudioInputIPCManager) WriteOpusFrameZeroCopy(frame *ZeroCopyAudioFram
return nil // Empty frame, ignore
}
// Validate zero-copy frame
if err := ValidateZeroCopyFrame(frame); err != nil {
atomic.AddInt64(&aim.metrics.FramesDropped, 1)
aim.logger.Debug().Err(err).Msg("Invalid zero-copy frame")
return err
}
// Start latency measurement
startTime := time.Now()
@ -166,12 +191,15 @@ func (aim *AudioInputIPCManager) IsReady() bool {
// GetMetrics returns current metrics
func (aim *AudioInputIPCManager) GetMetrics() AudioInputMetrics {
return AudioInputMetrics{
FramesSent: atomic.LoadInt64(&aim.metrics.FramesSent),
FramesDropped: atomic.LoadInt64(&aim.metrics.FramesDropped),
BytesProcessed: atomic.LoadInt64(&aim.metrics.BytesProcessed),
ConnectionDrops: atomic.LoadInt64(&aim.metrics.ConnectionDrops),
AverageLatency: aim.metrics.AverageLatency,
LastFrameTime: aim.metrics.LastFrameTime,
FramesSent: atomic.LoadInt64(&aim.metrics.FramesSent),
BaseAudioMetrics: BaseAudioMetrics{
FramesProcessed: atomic.LoadInt64(&aim.metrics.FramesProcessed),
FramesDropped: atomic.LoadInt64(&aim.metrics.FramesDropped),
BytesProcessed: atomic.LoadInt64(&aim.metrics.BytesProcessed),
ConnectionDrops: atomic.LoadInt64(&aim.metrics.ConnectionDrops),
AverageLatency: aim.metrics.AverageLatency,
LastFrameTime: aim.metrics.LastFrameTime,
},
}
}

View File

@ -1,50 +1,38 @@
package audio
import (
"context"
"fmt"
"os"
"os/exec"
"sync"
"syscall"
"time"
"github.com/jetkvm/kvm/internal/logging"
"github.com/rs/zerolog"
)
// AudioInputSupervisor manages the audio input server subprocess
type AudioInputSupervisor struct {
cmd *exec.Cmd
cancel context.CancelFunc
mtx sync.Mutex
running bool
logger zerolog.Logger
client *AudioInputClient
processMonitor *ProcessMonitor
*BaseSupervisor
client *AudioInputClient
}
// NewAudioInputSupervisor creates a new audio input supervisor
func NewAudioInputSupervisor() *AudioInputSupervisor {
return &AudioInputSupervisor{
logger: logging.GetDefaultLogger().With().Str("component", "audio-input-supervisor").Logger(),
BaseSupervisor: NewBaseSupervisor("audio-input-supervisor"),
client: NewAudioInputClient(),
processMonitor: GetProcessMonitor(),
}
}
// Start starts the audio input server subprocess
func (ais *AudioInputSupervisor) Start() error {
ais.mtx.Lock()
defer ais.mtx.Unlock()
ais.mutex.Lock()
defer ais.mutex.Unlock()
if ais.running {
if ais.IsRunning() {
return fmt.Errorf("audio input supervisor already running with PID %d", ais.cmd.Process.Pid)
}
// Create context for subprocess management
ctx, cancel := context.WithCancel(context.Background())
ais.cancel = cancel
ais.createContext()
// Get current executable path
execPath, err := os.Executable()
@ -53,7 +41,7 @@ func (ais *AudioInputSupervisor) Start() error {
}
// Create command for audio input server subprocess
cmd := exec.CommandContext(ctx, execPath, "--audio-input-server")
cmd := exec.CommandContext(ais.ctx, execPath, "--audio-input-server")
cmd.Env = append(os.Environ(),
"JETKVM_AUDIO_INPUT_IPC=true", // Enable IPC mode
)
@ -64,13 +52,13 @@ func (ais *AudioInputSupervisor) Start() error {
}
ais.cmd = cmd
ais.running = true
ais.setRunning(true)
// Start the subprocess
err = cmd.Start()
if err != nil {
ais.running = false
cancel()
ais.setRunning(false)
ais.cancelContext()
return fmt.Errorf("failed to start audio input server process: %w", err)
}
@ -90,14 +78,14 @@ func (ais *AudioInputSupervisor) Start() error {
// Stop stops the audio input server subprocess
func (ais *AudioInputSupervisor) Stop() {
ais.mtx.Lock()
defer ais.mtx.Unlock()
ais.mutex.Lock()
defer ais.mutex.Unlock()
if !ais.running {
if !ais.IsRunning() {
return
}
ais.running = false
ais.logSupervisorStop()
// Disconnect client first
if ais.client != nil {
@ -105,9 +93,7 @@ func (ais *AudioInputSupervisor) Stop() {
}
// Cancel context to signal subprocess to stop
if ais.cancel != nil {
ais.cancel()
}
ais.cancelContext()
// Try graceful termination first
if ais.cmd != nil && ais.cmd.Process != nil {
@ -138,19 +124,14 @@ func (ais *AudioInputSupervisor) Stop() {
}
}
ais.setRunning(false)
ais.cmd = nil
ais.cancel = nil
}
// IsRunning returns whether the supervisor is running
func (ais *AudioInputSupervisor) IsRunning() bool {
ais.mtx.Lock()
defer ais.mtx.Unlock()
return ais.running
}
// IsConnected returns whether the client is connected to the audio input server
func (ais *AudioInputSupervisor) IsConnected() bool {
ais.mutex.Lock()
defer ais.mutex.Unlock()
if !ais.IsRunning() {
return false
}
@ -162,41 +143,11 @@ func (ais *AudioInputSupervisor) GetClient() *AudioInputClient {
return ais.client
}
// GetProcessMetrics returns current process metrics if the process is running
// GetProcessMetrics returns current process metrics with audio-input-server name
func (ais *AudioInputSupervisor) GetProcessMetrics() *ProcessMetrics {
ais.mtx.Lock()
defer ais.mtx.Unlock()
if ais.cmd == nil || ais.cmd.Process == nil {
// Return default metrics when no process is running
return &ProcessMetrics{
PID: 0,
CPUPercent: 0.0,
MemoryRSS: 0,
MemoryVMS: 0,
MemoryPercent: 0.0,
Timestamp: time.Now(),
ProcessName: "audio-input-server",
}
}
pid := ais.cmd.Process.Pid
metrics := ais.processMonitor.GetCurrentMetrics()
for _, metric := range metrics {
if metric.PID == pid {
return &metric
}
}
// Return default metrics if process not found in monitoring
return &ProcessMetrics{
PID: pid,
CPUPercent: 0.0,
MemoryRSS: 0,
MemoryVMS: 0,
MemoryPercent: 0.0,
Timestamp: time.Now(),
ProcessName: "audio-input-server",
}
metrics := ais.BaseSupervisor.GetProcessMetrics()
metrics.ProcessName = "audio-input-server"
return metrics
}
// monitorSubprocess monitors the subprocess and handles unexpected exits
@ -211,10 +162,10 @@ func (ais *AudioInputSupervisor) monitorSubprocess() {
// Remove process from monitoring
ais.processMonitor.RemoveProcess(pid)
ais.mtx.Lock()
defer ais.mtx.Unlock()
ais.mutex.Lock()
defer ais.mutex.Unlock()
if ais.running {
if ais.IsRunning() {
// Unexpected exit
if err != nil {
ais.logger.Error().Err(err).Int("pid", pid).Msg("Audio input server subprocess exited unexpectedly")
@ -228,7 +179,7 @@ func (ais *AudioInputSupervisor) monitorSubprocess() {
}
// Mark as not running
ais.running = false
ais.setRunning(false)
ais.cmd = nil
ais.logger.Info().Int("pid", pid).Msg("Audio input server subprocess monitoring stopped")

View File

@ -181,12 +181,15 @@ func TestAudioInputManagerMultipleStartStop(t *testing.T) {
func TestAudioInputMetrics(t *testing.T) {
metrics := &AudioInputMetrics{
FramesSent: 100,
FramesDropped: 5,
BytesProcessed: 1024,
ConnectionDrops: 2,
AverageLatency: time.Millisecond * 10,
LastFrameTime: time.Now(),
BaseAudioMetrics: BaseAudioMetrics{
FramesProcessed: 100,
FramesDropped: 5,
BytesProcessed: 1024,
ConnectionDrops: 2,
AverageLatency: time.Millisecond * 10,
LastFrameTime: time.Now(),
},
FramesSent: 100,
}
assert.Equal(t, int64(100), metrics.FramesSent)

View File

@ -23,6 +23,13 @@ var (
// Output IPC constants are now centralized in config_constants.go
// outputMaxFrameSize, outputWriteTimeout, outputMaxDroppedFrames, outputHeaderSize, outputMessagePoolSize
// OutputIPCConfig represents configuration for audio output
type OutputIPCConfig struct {
SampleRate int
Channels int
FrameSize int
}
// OutputMessageType represents the type of IPC message
type OutputMessageType uint8
@ -106,7 +113,7 @@ func NewAudioOutputServer() (*AudioOutputServer, error) {
// Initialize latency monitoring
latencyConfig := DefaultLatencyConfig()
logger := zerolog.New(os.Stderr).With().Timestamp().Str("component", "audio-server").Logger()
logger := zerolog.New(os.Stderr).With().Timestamp().Str("component", AudioOutputServerComponent).Logger()
latencyMonitor := NewLatencyMonitor(latencyConfig, logger)
// Initialize adaptive buffer manager with default config
@ -160,7 +167,7 @@ func (s *AudioOutputServer) Start() error {
// acceptConnections accepts incoming connections
func (s *AudioOutputServer) acceptConnections() {
logger := logging.GetDefaultLogger().With().Str("component", "audio-server").Logger()
logger := logging.GetDefaultLogger().With().Str("component", AudioOutputServerComponent).Logger()
for s.running {
conn, err := s.listener.Accept()
if err != nil {
@ -253,6 +260,14 @@ func (s *AudioOutputServer) Close() error {
}
func (s *AudioOutputServer) SendFrame(frame []byte) error {
// Comprehensive frame validation
if err := ValidateFrameData(frame); err != nil {
logger := logging.GetDefaultLogger().With().Str("component", AudioOutputServerComponent).Logger()
logger.Error().Err(err).Msg("Frame validation failed")
return fmt.Errorf("output frame validation failed: %w", err)
}
// Additional output-specific size check
maxFrameSize := GetConfig().OutputMaxFrameSize
if len(frame) > maxFrameSize {
return fmt.Errorf("output frame size validation failed: got %d bytes, maximum allowed %d bytes", len(frame), maxFrameSize)

View File

@ -94,6 +94,13 @@ func DefaultLatencyConfig() LatencyConfig {
// NewLatencyMonitor creates a new latency monitoring system
func NewLatencyMonitor(config LatencyConfig, logger zerolog.Logger) *LatencyMonitor {
// Validate latency configuration
if err := ValidateLatencyConfig(config); err != nil {
// Log validation error and use default configuration
logger.Error().Err(err).Msg("Invalid latency configuration provided, using defaults")
config = DefaultLatencyConfig()
}
ctx, cancel := context.WithCancel(context.Background())
return &LatencyMonitor{

View File

@ -0,0 +1,211 @@
package audio
import (
"fmt"
"sync/atomic"
"time"
"github.com/jetkvm/kvm/internal/logging"
)
// AudioOutputIPCManager manages audio output using IPC when enabled
type AudioOutputIPCManager struct {
*BaseAudioManager
server *AudioOutputServer
}
// NewAudioOutputIPCManager creates a new IPC-based audio output manager
func NewAudioOutputIPCManager() *AudioOutputIPCManager {
return &AudioOutputIPCManager{
BaseAudioManager: NewBaseAudioManager(logging.GetDefaultLogger().With().Str("component", AudioOutputIPCComponent).Logger()),
}
}
// Start initializes and starts the audio output IPC manager
func (aom *AudioOutputIPCManager) Start() error {
aom.logComponentStart(AudioOutputIPCComponent)
// Create and start the IPC server
server, err := NewAudioOutputServer()
if err != nil {
aom.logComponentError(AudioOutputIPCComponent, err, "failed to create IPC server")
return err
}
if err := server.Start(); err != nil {
aom.logComponentError(AudioOutputIPCComponent, err, "failed to start IPC server")
return err
}
aom.server = server
aom.setRunning(true)
aom.logComponentStarted(AudioOutputIPCComponent)
// Send initial configuration
config := OutputIPCConfig{
SampleRate: GetConfig().SampleRate,
Channels: GetConfig().Channels,
FrameSize: int(GetConfig().AudioQualityMediumFrameSize.Milliseconds()),
}
if err := aom.SendConfig(config); err != nil {
aom.logger.Warn().Err(err).Msg("Failed to send initial configuration")
}
return nil
}
// Stop gracefully shuts down the audio output IPC manager
func (aom *AudioOutputIPCManager) Stop() {
aom.logComponentStop(AudioOutputIPCComponent)
if aom.server != nil {
aom.server.Stop()
aom.server = nil
}
aom.setRunning(false)
aom.resetMetrics()
aom.logComponentStopped(AudioOutputIPCComponent)
}
// resetMetrics resets all metrics to zero
func (aom *AudioOutputIPCManager) resetMetrics() {
aom.BaseAudioManager.resetMetrics()
}
// WriteOpusFrame sends an Opus frame to the output server
func (aom *AudioOutputIPCManager) WriteOpusFrame(frame *ZeroCopyAudioFrame) error {
if !aom.IsRunning() {
return fmt.Errorf("audio output IPC manager not running")
}
if aom.server == nil {
return fmt.Errorf("audio output server not initialized")
}
// Validate frame before processing
if err := ValidateZeroCopyFrame(frame); err != nil {
aom.logComponentError(AudioOutputIPCComponent, err, "Frame validation failed")
return fmt.Errorf("output frame validation failed: %w", err)
}
start := time.Now()
// Send frame to IPC server
if err := aom.server.SendFrame(frame.Data()); err != nil {
aom.recordFrameDropped()
return err
}
// Update metrics
processingTime := time.Since(start)
aom.recordFrameProcessed(frame.Length())
aom.updateLatency(processingTime)
return nil
}
// WriteOpusFrameZeroCopy writes an Opus audio frame using zero-copy optimization
func (aom *AudioOutputIPCManager) WriteOpusFrameZeroCopy(frame *ZeroCopyAudioFrame) error {
if !aom.IsRunning() {
return fmt.Errorf("audio output IPC manager not running")
}
if aom.server == nil {
return fmt.Errorf("audio output server not initialized")
}
start := time.Now()
// Extract frame data
frameData := frame.Data()
// Send frame to IPC server (zero-copy not available, use regular send)
if err := aom.server.SendFrame(frameData); err != nil {
aom.recordFrameDropped()
return err
}
// Update metrics
processingTime := time.Since(start)
aom.recordFrameProcessed(len(frameData))
aom.updateLatency(processingTime)
return nil
}
// IsReady returns true if the IPC manager is ready to process frames
func (aom *AudioOutputIPCManager) IsReady() bool {
return aom.IsRunning() && aom.server != nil
}
// GetMetrics returns current audio output metrics
func (aom *AudioOutputIPCManager) GetMetrics() AudioOutputMetrics {
baseMetrics := aom.getBaseMetrics()
return AudioOutputMetrics{
FramesReceived: atomic.LoadInt64(&baseMetrics.FramesProcessed), // For output, processed = received
BaseAudioMetrics: baseMetrics,
}
}
// GetDetailedMetrics returns detailed metrics including server statistics
func (aom *AudioOutputIPCManager) GetDetailedMetrics() (AudioOutputMetrics, map[string]interface{}) {
metrics := aom.GetMetrics()
detailed := make(map[string]interface{})
if aom.server != nil {
total, dropped, bufferSize := aom.server.GetServerStats()
detailed["server_total_frames"] = total
detailed["server_dropped_frames"] = dropped
detailed["server_buffer_size"] = bufferSize
detailed["server_frame_rate"] = aom.calculateFrameRate()
}
return metrics, detailed
}
// calculateFrameRate calculates the current frame processing rate
func (aom *AudioOutputIPCManager) calculateFrameRate() float64 {
baseMetrics := aom.getBaseMetrics()
framesProcessed := atomic.LoadInt64(&baseMetrics.FramesProcessed)
if framesProcessed == 0 {
return 0.0
}
// Calculate rate based on last frame time
baseMetrics = aom.getBaseMetrics()
if baseMetrics.LastFrameTime.IsZero() {
return 0.0
}
elapsed := time.Since(baseMetrics.LastFrameTime)
if elapsed.Seconds() == 0 {
return 0.0
}
return float64(framesProcessed) / elapsed.Seconds()
}
// SendConfig sends configuration to the IPC server
func (aom *AudioOutputIPCManager) SendConfig(config OutputIPCConfig) error {
if aom.server == nil {
return fmt.Errorf("audio output server not initialized")
}
// Validate configuration parameters
if err := ValidateOutputIPCConfig(config.SampleRate, config.Channels, config.FrameSize); err != nil {
aom.logger.Error().Err(err).Msg("Configuration validation failed")
return fmt.Errorf("output configuration validation failed: %w", err)
}
// Note: AudioOutputServer doesn't have SendConfig method yet
// This is a placeholder for future implementation
aom.logger.Info().Interface("config", config).Msg("configuration received")
return nil
}
// GetServer returns the underlying IPC server (for testing)
func (aom *AudioOutputIPCManager) GetServer() *AudioOutputServer {
return aom.server
}

View File

@ -2,60 +2,56 @@ package audio
import (
"sync/atomic"
"time"
"github.com/jetkvm/kvm/internal/logging"
"github.com/rs/zerolog"
)
// AudioOutputManager manages audio output stream using IPC mode
type AudioOutputManager struct {
metrics AudioOutputMetrics
streamer *AudioOutputStreamer
logger zerolog.Logger
running int32
*BaseAudioManager
streamer *AudioOutputStreamer
framesReceived int64 // Output-specific metric
}
// AudioOutputMetrics tracks output-specific metrics
// Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment)
type AudioOutputMetrics struct {
FramesReceived int64
FramesDropped int64
BytesProcessed int64
ConnectionDrops int64
LastFrameTime time.Time
AverageLatency time.Duration
// Atomic int64 field first for proper ARM32 alignment
FramesReceived int64 `json:"frames_received"` // Total frames received (output-specific)
// Embedded struct with atomic fields properly aligned
BaseAudioMetrics
}
// NewAudioOutputManager creates a new audio output manager
func NewAudioOutputManager() *AudioOutputManager {
logger := logging.GetDefaultLogger().With().Str("component", AudioOutputManagerComponent).Logger()
streamer, err := NewAudioOutputStreamer()
if err != nil {
// Log error but continue with nil streamer - will be handled gracefully
logger := logging.GetDefaultLogger().With().Str("component", AudioOutputManagerComponent).Logger()
logger.Error().Err(err).Msg("Failed to create audio output streamer")
}
return &AudioOutputManager{
streamer: streamer,
logger: logging.GetDefaultLogger().With().Str("component", AudioOutputManagerComponent).Logger(),
BaseAudioManager: NewBaseAudioManager(logger),
streamer: streamer,
}
}
// Start starts the audio output manager
func (aom *AudioOutputManager) Start() error {
if !atomic.CompareAndSwapInt32(&aom.running, 0, 1) {
if !aom.setRunning(true) {
return nil // Already running
}
aom.logger.Info().Str("component", AudioOutputManagerComponent).Msg("starting component")
aom.logComponentStart(AudioOutputManagerComponent)
if aom.streamer == nil {
// Try to recreate streamer if it was nil
streamer, err := NewAudioOutputStreamer()
if err != nil {
atomic.StoreInt32(&aom.running, 0)
aom.logger.Error().Err(err).Str("component", AudioOutputManagerComponent).Msg("failed to create audio output streamer")
aom.setRunning(false)
aom.logComponentError(AudioOutputManagerComponent, err, "failed to create audio output streamer")
return err
}
aom.streamer = streamer
@ -63,44 +59,39 @@ func (aom *AudioOutputManager) Start() error {
err := aom.streamer.Start()
if err != nil {
atomic.StoreInt32(&aom.running, 0)
aom.setRunning(false)
// Reset metrics on failed start
aom.resetMetrics()
aom.logger.Error().Err(err).Str("component", AudioOutputManagerComponent).Msg("failed to start component")
aom.logComponentError(AudioOutputManagerComponent, err, "failed to start component")
return err
}
aom.logger.Info().Str("component", AudioOutputManagerComponent).Msg("component started successfully")
aom.logComponentStarted(AudioOutputManagerComponent)
return nil
}
// Stop stops the audio output manager
func (aom *AudioOutputManager) Stop() {
if !atomic.CompareAndSwapInt32(&aom.running, 1, 0) {
if !aom.setRunning(false) {
return // Already stopped
}
aom.logger.Info().Str("component", AudioOutputManagerComponent).Msg("stopping component")
aom.logComponentStop(AudioOutputManagerComponent)
if aom.streamer != nil {
aom.streamer.Stop()
}
aom.logger.Info().Str("component", AudioOutputManagerComponent).Msg("component stopped")
aom.logComponentStopped(AudioOutputManagerComponent)
}
// resetMetrics resets all metrics to zero
func (aom *AudioOutputManager) resetMetrics() {
atomic.StoreInt64(&aom.metrics.FramesReceived, 0)
atomic.StoreInt64(&aom.metrics.FramesDropped, 0)
atomic.StoreInt64(&aom.metrics.BytesProcessed, 0)
atomic.StoreInt64(&aom.metrics.ConnectionDrops, 0)
aom.BaseAudioManager.resetMetrics()
atomic.StoreInt64(&aom.framesReceived, 0)
}
// IsRunning returns whether the audio output manager is running
func (aom *AudioOutputManager) IsRunning() bool {
return atomic.LoadInt32(&aom.running) == 1
}
// Note: IsRunning() is inherited from BaseAudioManager
// IsReady returns whether the audio output manager is ready to receive frames
func (aom *AudioOutputManager) IsReady() bool {
@ -115,12 +106,8 @@ func (aom *AudioOutputManager) IsReady() bool {
// GetMetrics returns current metrics
func (aom *AudioOutputManager) GetMetrics() AudioOutputMetrics {
return AudioOutputMetrics{
FramesReceived: atomic.LoadInt64(&aom.metrics.FramesReceived),
FramesDropped: atomic.LoadInt64(&aom.metrics.FramesDropped),
BytesProcessed: atomic.LoadInt64(&aom.metrics.BytesProcessed),
ConnectionDrops: atomic.LoadInt64(&aom.metrics.ConnectionDrops),
AverageLatency: aom.metrics.AverageLatency,
LastFrameTime: aom.metrics.LastFrameTime,
FramesReceived: atomic.LoadInt64(&aom.framesReceived),
BaseAudioMetrics: aom.getBaseMetrics(),
}
}
@ -131,6 +118,7 @@ func (aom *AudioOutputManager) GetComprehensiveMetrics() map[string]interface{}
comprehensiveMetrics := map[string]interface{}{
"manager": map[string]interface{}{
"frames_received": baseMetrics.FramesReceived,
"frames_processed": baseMetrics.FramesProcessed,
"frames_dropped": baseMetrics.FramesDropped,
"bytes_processed": baseMetrics.BytesProcessed,
"connection_drops": baseMetrics.ConnectionDrops,

View File

@ -391,6 +391,14 @@ func StartAudioOutputStreaming(send func([]byte)) error {
frame := GetAudioFrameBuffer()
frame = frame[:n] // Resize to actual frame size
copy(frame, buffer[:n])
// Validate frame before sending
if err := ValidateAudioFrameFast(frame); err != nil {
getOutputStreamingLogger().Warn().Err(err).Msg("Frame validation failed, dropping frame")
PutAudioFrameBuffer(frame)
continue
}
send(frame)
// Return buffer to pool after sending
PutAudioFrameBuffer(frame)

View File

@ -0,0 +1,362 @@
//go:build cgo
// +build cgo
package audio
import (
"fmt"
"net"
"os"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestRegressionScenarios tests critical edge cases and error conditions
// that could cause system instability in production
func TestRegressionScenarios(t *testing.T) {
tests := []struct {
name string
testFunc func(t *testing.T)
description string
}{
{
name: "IPCConnectionFailure",
testFunc: testIPCConnectionFailureRecovery,
description: "Test IPC connection failure and recovery scenarios",
},
{
name: "BufferOverflow",
testFunc: testBufferOverflowHandling,
description: "Test buffer overflow protection and recovery",
},
{
name: "SupervisorRapidRestart",
testFunc: testSupervisorRapidRestartScenario,
description: "Test supervisor behavior under rapid restart conditions",
},
{
name: "ConcurrentStartStop",
testFunc: testConcurrentStartStopOperations,
description: "Test concurrent start/stop operations for race conditions",
},
{
name: "MemoryLeakPrevention",
testFunc: testMemoryLeakPrevention,
description: "Test memory leak prevention in long-running scenarios",
},
{
name: "ConfigValidationEdgeCases",
testFunc: testConfigValidationEdgeCases,
description: "Test configuration validation with edge case values",
},
{
name: "AtomicOperationConsistency",
testFunc: testAtomicOperationConsistency,
description: "Test atomic operations consistency under high concurrency",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Logf("Running regression test: %s - %s", tt.name, tt.description)
tt.testFunc(t)
})
}
}
// testIPCConnectionFailureRecovery tests IPC connection failure scenarios
func testIPCConnectionFailureRecovery(t *testing.T) {
manager := NewAudioInputIPCManager()
require.NotNil(t, manager)
// Test start with no IPC server available (should handle gracefully)
err := manager.Start()
// Should not panic or crash, may return error depending on implementation
if err != nil {
t.Logf("Expected error when no IPC server available: %v", err)
}
// Test that manager can recover after IPC becomes available
if manager.IsRunning() {
manager.Stop()
}
// Verify clean state after failure
assert.False(t, manager.IsRunning())
assert.False(t, manager.IsReady())
}
// testBufferOverflowHandling tests buffer overflow protection
func testBufferOverflowHandling(t *testing.T) {
// Test with extremely large buffer sizes
extremelyLargeSize := 1024 * 1024 * 100 // 100MB
err := ValidateBufferSize(extremelyLargeSize)
assert.Error(t, err, "Should reject extremely large buffer sizes")
// Test with negative buffer sizes
err = ValidateBufferSize(-1)
assert.Error(t, err, "Should reject negative buffer sizes")
// Test with zero buffer size
err = ValidateBufferSize(0)
assert.Error(t, err, "Should reject zero buffer size")
// Test with maximum valid buffer size
maxValidSize := GetConfig().SocketMaxBuffer
err = ValidateBufferSize(int(maxValidSize))
assert.NoError(t, err, "Should accept maximum valid buffer size")
}
// testSupervisorRapidRestartScenario tests supervisor under rapid restart conditions
func testSupervisorRapidRestartScenario(t *testing.T) {
if testing.Short() {
t.Skip("Skipping rapid restart test in short mode")
}
supervisor := NewAudioOutputSupervisor()
require.NotNil(t, supervisor)
// Perform rapid start/stop cycles to test for race conditions
for i := 0; i < 10; i++ {
err := supervisor.Start()
if err != nil {
t.Logf("Start attempt %d failed (expected in test environment): %v", i, err)
}
// Very short delay to stress test
time.Sleep(10 * time.Millisecond)
supervisor.Stop()
time.Sleep(10 * time.Millisecond)
}
// Verify supervisor is in clean state after rapid cycling
assert.False(t, supervisor.IsRunning())
}
// testConcurrentStartStopOperations tests concurrent operations for race conditions
func testConcurrentStartStopOperations(t *testing.T) {
manager := NewAudioInputIPCManager()
require.NotNil(t, manager)
var wg sync.WaitGroup
const numGoroutines = 10
// Launch multiple goroutines trying to start/stop concurrently
for i := 0; i < numGoroutines; i++ {
wg.Add(2)
// Start goroutine
go func(id int) {
defer wg.Done()
err := manager.Start()
if err != nil {
t.Logf("Concurrent start %d: %v", id, err)
}
}(i)
// Stop goroutine
go func(id int) {
defer wg.Done()
time.Sleep(5 * time.Millisecond) // Small delay
manager.Stop()
}(i)
}
wg.Wait()
// Ensure final state is consistent
manager.Stop() // Final cleanup
assert.False(t, manager.IsRunning())
}
// testMemoryLeakPrevention tests for memory leaks in long-running scenarios
func testMemoryLeakPrevention(t *testing.T) {
if testing.Short() {
t.Skip("Skipping memory leak test in short mode")
}
manager := NewAudioInputIPCManager()
require.NotNil(t, manager)
// Simulate long-running operation with periodic restarts
for cycle := 0; cycle < 5; cycle++ {
err := manager.Start()
if err != nil {
t.Logf("Start cycle %d failed (expected): %v", cycle, err)
}
// Simulate some activity
time.Sleep(100 * time.Millisecond)
// Get metrics to ensure they're not accumulating indefinitely
metrics := manager.GetMetrics()
assert.NotNil(t, metrics, "Metrics should be available")
manager.Stop()
time.Sleep(50 * time.Millisecond)
}
// Final verification
assert.False(t, manager.IsRunning())
}
// testConfigValidationEdgeCases tests configuration validation with edge cases
func testConfigValidationEdgeCases(t *testing.T) {
// Test sample rate edge cases
testCases := []struct {
sampleRate int
channels int
frameSize int
shouldPass bool
description string
}{
{0, 2, 960, false, "zero sample rate"},
{-1, 2, 960, false, "negative sample rate"},
{1, 2, 960, false, "extremely low sample rate"},
{999999, 2, 960, false, "extremely high sample rate"},
{48000, 0, 960, false, "zero channels"},
{48000, -1, 960, false, "negative channels"},
{48000, 100, 960, false, "too many channels"},
{48000, 2, 0, false, "zero frame size"},
{48000, 2, -1, false, "negative frame size"},
{48000, 2, 999999, true, "extremely large frame size"},
{48000, 2, 960, true, "valid configuration"},
{44100, 1, 441, true, "valid mono configuration"},
}
for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
err := ValidateInputIPCConfig(tc.sampleRate, tc.channels, tc.frameSize)
if tc.shouldPass {
assert.NoError(t, err, "Should accept valid config: %s", tc.description)
} else {
assert.Error(t, err, "Should reject invalid config: %s", tc.description)
}
})
}
}
// testAtomicOperationConsistency tests atomic operations under high concurrency
func testAtomicOperationConsistency(t *testing.T) {
var counter int64
var wg sync.WaitGroup
const numGoroutines = 100
const incrementsPerGoroutine = 1000
// Launch multiple goroutines performing atomic operations
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := 0; j < incrementsPerGoroutine; j++ {
atomic.AddInt64(&counter, 1)
}
}()
}
wg.Wait()
// Verify final count is correct
expected := int64(numGoroutines * incrementsPerGoroutine)
actual := atomic.LoadInt64(&counter)
assert.Equal(t, expected, actual, "Atomic operations should be consistent")
}
// TestErrorRecoveryScenarios tests various error recovery scenarios
func TestErrorRecoveryScenarios(t *testing.T) {
tests := []struct {
name string
testFunc func(t *testing.T)
}{
{"NetworkConnectionLoss", testNetworkConnectionLossRecovery},
{"ProcessCrashRecovery", testProcessCrashRecovery},
{"ResourceExhaustionRecovery", testResourceExhaustionRecovery},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.testFunc(t)
})
}
}
// testNetworkConnectionLossRecovery tests recovery from network connection loss
func testNetworkConnectionLossRecovery(t *testing.T) {
// Create a temporary socket that we can close to simulate connection loss
tempDir := t.TempDir()
socketPath := fmt.Sprintf("%s/test_recovery.sock", tempDir)
// Create and immediately close a socket to test connection failure
listener, err := net.Listen("unix", socketPath)
if err != nil {
t.Skipf("Cannot create test socket: %v", err)
}
listener.Close() // Close immediately to simulate connection loss
// Remove socket file to ensure connection will fail
os.Remove(socketPath)
// Test that components handle connection loss gracefully
manager := NewAudioInputIPCManager()
require.NotNil(t, manager)
// This should handle the connection failure gracefully
err = manager.Start()
if err != nil {
t.Logf("Expected connection failure handled: %v", err)
}
// Cleanup
manager.Stop()
}
// testProcessCrashRecovery tests recovery from process crashes
func testProcessCrashRecovery(t *testing.T) {
if testing.Short() {
t.Skip("Skipping process crash test in short mode")
}
supervisor := NewAudioOutputSupervisor()
require.NotNil(t, supervisor)
// Start supervisor (will likely fail in test environment, but should handle gracefully)
err := supervisor.Start()
if err != nil {
t.Logf("Supervisor start failed as expected in test environment: %v", err)
}
// Verify supervisor can be stopped cleanly even after start failure
supervisor.Stop()
assert.False(t, supervisor.IsRunning())
}
// testResourceExhaustionRecovery tests recovery from resource exhaustion
func testResourceExhaustionRecovery(t *testing.T) {
// Test with resource constraints
manager := NewAudioInputIPCManager()
require.NotNil(t, manager)
// Simulate resource exhaustion by rapid start/stop cycles
for i := 0; i < 20; i++ {
err := manager.Start()
if err != nil {
t.Logf("Resource exhaustion cycle %d: %v", i, err)
}
manager.Stop()
// No delay to stress test resource management
}
// Verify system can still function after resource stress
err := manager.Start()
if err != nil {
t.Logf("Final start after resource stress: %v", err)
}
manager.Stop()
assert.False(t, manager.IsRunning())
}

View File

@ -170,6 +170,13 @@ func (r *AudioRelay) relayLoop() {
// forwardToWebRTC forwards a frame to the WebRTC audio track
func (r *AudioRelay) forwardToWebRTC(frame []byte) error {
// Validate frame data before processing
if err := ValidateFrameData(frame); err != nil {
r.incrementDropped()
r.logger.Debug().Err(err).Msg("Invalid frame data in relay")
return err
}
r.mutex.RLock()
defer r.mutex.RUnlock()

View File

@ -4,17 +4,12 @@
package audio
import (
"context"
"fmt"
"os"
"os/exec"
"sync"
"sync/atomic"
"syscall"
"time"
"github.com/jetkvm/kvm/internal/logging"
"github.com/rs/zerolog"
)
// Restart configuration is now retrieved from centralized config
@ -36,30 +31,17 @@ func getMaxRestartDelay() time.Duration {
// AudioOutputSupervisor manages the audio output server subprocess lifecycle
type AudioOutputSupervisor struct {
ctx context.Context
cancel context.CancelFunc
logger *zerolog.Logger
mutex sync.RWMutex
running int32
// Process management
cmd *exec.Cmd
processPID int
*BaseSupervisor
// Restart management
restartAttempts []time.Time
lastExitCode int
lastExitTime time.Time
// Channels for coordination
processDone chan struct{}
// Channel management
stopChan chan struct{}
processDone chan struct{}
stopChanClosed bool // Track if stopChan is closed
processDoneClosed bool // Track if processDone is closed
// Process monitoring
processMonitor *ProcessMonitor
// Callbacks
onProcessStart func(pid int)
onProcessExit func(pid int, exitCode int, crashed bool)
@ -68,16 +50,11 @@ type AudioOutputSupervisor struct {
// NewAudioOutputSupervisor creates a new audio output server supervisor
func NewAudioOutputSupervisor() *AudioOutputSupervisor {
ctx, cancel := context.WithCancel(context.Background())
logger := logging.GetDefaultLogger().With().Str("component", AudioOutputSupervisorComponent).Logger()
return &AudioOutputSupervisor{
ctx: ctx,
cancel: cancel,
logger: &logger,
processDone: make(chan struct{}),
stopChan: make(chan struct{}),
processMonitor: GetProcessMonitor(),
BaseSupervisor: NewBaseSupervisor("audio-output-supervisor"),
restartAttempts: make([]time.Time, 0),
stopChan: make(chan struct{}),
processDone: make(chan struct{}),
}
}
@ -101,7 +78,8 @@ func (s *AudioOutputSupervisor) Start() error {
return fmt.Errorf("audio output supervisor is already running")
}
s.logger.Info().Str("component", AudioOutputSupervisorComponent).Msg("starting component")
s.logSupervisorStart()
s.createContext()
// Recreate channels in case they were closed by a previous Stop() call
s.mutex.Lock()
@ -109,12 +87,8 @@ func (s *AudioOutputSupervisor) Start() error {
s.stopChan = make(chan struct{})
s.stopChanClosed = false // Reset channel closed flag
s.processDoneClosed = false // Reset channel closed flag
// Recreate context as well since it might have been cancelled
s.ctx, s.cancel = context.WithCancel(context.Background())
// Reset restart tracking on start
s.restartAttempts = s.restartAttempts[:0]
s.lastExitCode = 0
s.lastExitTime = time.Time{}
s.mutex.Unlock()
// Start the supervision loop
@ -130,7 +104,7 @@ func (s *AudioOutputSupervisor) Stop() {
return // Already stopped
}
s.logger.Info().Str("component", AudioOutputSupervisorComponent).Msg("stopping component")
s.logSupervisorStop()
// Signal stop and wait for cleanup
s.mutex.Lock()
@ -139,7 +113,7 @@ func (s *AudioOutputSupervisor) Stop() {
s.stopChanClosed = true
}
s.mutex.Unlock()
s.cancel()
s.cancelContext()
// Wait for process to exit
select {
@ -153,61 +127,11 @@ func (s *AudioOutputSupervisor) Stop() {
s.logger.Info().Str("component", AudioOutputSupervisorComponent).Msg("component stopped")
}
// IsRunning returns true if the supervisor is running
func (s *AudioOutputSupervisor) IsRunning() bool {
return atomic.LoadInt32(&s.running) == 1
}
// GetProcessPID returns the current process PID (0 if not running)
func (s *AudioOutputSupervisor) GetProcessPID() int {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.processPID
}
// GetLastExitInfo returns information about the last process exit
func (s *AudioOutputSupervisor) GetLastExitInfo() (exitCode int, exitTime time.Time) {
s.mutex.RLock()
defer s.mutex.RUnlock()
return s.lastExitCode, s.lastExitTime
}
// GetProcessMetrics returns current process metrics if the process is running
// GetProcessMetrics returns current process metrics with audio-output-server name
func (s *AudioOutputSupervisor) GetProcessMetrics() *ProcessMetrics {
s.mutex.RLock()
pid := s.processPID
s.mutex.RUnlock()
if pid == 0 {
// Return default metrics when no process is running
return &ProcessMetrics{
PID: 0,
CPUPercent: 0.0,
MemoryRSS: 0,
MemoryVMS: 0,
MemoryPercent: 0.0,
Timestamp: time.Now(),
ProcessName: "audio-output-server",
}
}
metrics := s.processMonitor.GetCurrentMetrics()
for _, metric := range metrics {
if metric.PID == pid {
return &metric
}
}
// Return default metrics if process not found in monitor
return &ProcessMetrics{
PID: pid,
CPUPercent: 0.0,
MemoryRSS: 0,
MemoryVMS: 0,
MemoryPercent: 0.0,
Timestamp: time.Now(),
ProcessName: "audio-output-server",
}
metrics := s.BaseSupervisor.GetProcessMetrics()
metrics.ProcessName = "audio-output-server"
return metrics
}
// supervisionLoop is the main supervision loop

View File

@ -35,11 +35,8 @@ func ValidateFrameData(data []byte) error {
if len(data) == 0 {
return ErrInvalidFrameData
}
// Use a reasonable default if config is not available
maxFrameSize := 4096
if config := GetConfig(); config != nil {
maxFrameSize = config.MaxAudioFrameSize
}
// Use config value or fallback to default
maxFrameSize := GetConfig().MaxAudioFrameSize
if len(data) > maxFrameSize {
return ErrInvalidFrameSize
}
@ -55,11 +52,8 @@ func ValidateZeroCopyFrame(frame *ZeroCopyAudioFrame) error {
if len(data) == 0 {
return ErrInvalidFrameData
}
// Use a reasonable default if config is not available
maxFrameSize := 4096
if config := GetConfig(); config != nil {
maxFrameSize = config.MaxAudioFrameSize
}
// Use config value
maxFrameSize := GetConfig().MaxAudioFrameSize
if len(data) > maxFrameSize {
return ErrInvalidFrameSize
}
@ -71,11 +65,8 @@ func ValidateBufferSize(size int) error {
if size <= 0 {
return ErrInvalidBufferSize
}
// Use a reasonable default if config is not available
maxBuffer := 262144 // 256KB default
if config := GetConfig(); config != nil {
maxBuffer = config.SocketMaxBuffer
}
// Use config value
maxBuffer := GetConfig().SocketMaxBuffer
if size > maxBuffer {
return ErrInvalidBufferSize
}
@ -102,11 +93,8 @@ func ValidateLatency(latency time.Duration) error {
if latency < 0 {
return ErrInvalidLatency
}
// Use a reasonable default if config is not available
maxLatency := 500 * time.Millisecond
if config := GetConfig(); config != nil {
maxLatency = config.MaxLatency
}
// Use config value
maxLatency := GetConfig().MaxLatency
if latency > maxLatency {
return ErrInvalidLatency
}
@ -115,13 +103,10 @@ func ValidateLatency(latency time.Duration) error {
// ValidateMetricsInterval validates metrics update interval
func ValidateMetricsInterval(interval time.Duration) error {
// Use reasonable defaults if config is not available
minInterval := 100 * time.Millisecond
maxInterval := 10 * time.Second
if config := GetConfig(); config != nil {
minInterval = config.MinMetricsUpdateInterval
maxInterval = config.MaxMetricsUpdateInterval
}
// Use config values
config := GetConfig()
minInterval := config.MinMetricsUpdateInterval
maxInterval := config.MaxMetricsUpdateInterval
if interval < minInterval {
return ErrInvalidMetricsInterval
}
@ -143,10 +128,7 @@ func ValidateAdaptiveBufferConfig(minSize, maxSize, defaultSize int) error {
return ErrInvalidBufferSize
}
// Validate against global limits
maxBuffer := 262144 // 256KB default
if config := GetConfig(); config != nil {
maxBuffer = config.SocketMaxBuffer
}
maxBuffer := GetConfig().SocketMaxBuffer
if maxSize > maxBuffer {
return ErrInvalidBufferSize
}
@ -155,15 +137,11 @@ func ValidateAdaptiveBufferConfig(minSize, maxSize, defaultSize int) error {
// ValidateInputIPCConfig validates input IPC configuration
func ValidateInputIPCConfig(sampleRate, channels, frameSize int) error {
// Use reasonable defaults if config is not available
minSampleRate := 8000
maxSampleRate := 48000
maxChannels := 8
if config := GetConfig(); config != nil {
minSampleRate = config.MinSampleRate
maxSampleRate = config.MaxSampleRate
maxChannels = config.MaxChannels
}
// Use config values
config := GetConfig()
minSampleRate := config.MinSampleRate
maxSampleRate := config.MaxSampleRate
maxChannels := config.MaxChannels
if sampleRate < minSampleRate || sampleRate > maxSampleRate {
return ErrInvalidSampleRate
}
@ -175,3 +153,48 @@ func ValidateInputIPCConfig(sampleRate, channels, frameSize int) error {
}
return nil
}
// ValidateOutputIPCConfig validates output IPC configuration
func ValidateOutputIPCConfig(sampleRate, channels, frameSize int) error {
// Use config values
config := GetConfig()
minSampleRate := config.MinSampleRate
maxSampleRate := config.MaxSampleRate
maxChannels := config.MaxChannels
if sampleRate < minSampleRate || sampleRate > maxSampleRate {
return ErrInvalidSampleRate
}
if channels < 1 || channels > maxChannels {
return ErrInvalidChannels
}
if frameSize <= 0 {
return ErrInvalidFrameSize
}
return nil
}
// ValidateLatencyConfig validates latency monitor configuration
func ValidateLatencyConfig(config LatencyConfig) error {
if err := ValidateLatency(config.TargetLatency); err != nil {
return err
}
if err := ValidateLatency(config.MaxLatency); err != nil {
return err
}
if config.TargetLatency >= config.MaxLatency {
return ErrInvalidLatency
}
if err := ValidateMetricsInterval(config.OptimizationInterval); err != nil {
return err
}
if config.HistorySize <= 0 {
return ErrInvalidBufferSize
}
if config.JitterThreshold < 0 {
return ErrInvalidLatency
}
if config.AdaptiveThreshold < 0 || config.AdaptiveThreshold > 1.0 {
return ErrInvalidConfiguration
}
return nil
}

View File

@ -43,12 +43,13 @@ type ValidationConfig struct {
// GetValidationConfig returns the current validation configuration
func GetValidationConfig() ValidationConfig {
configConstants := GetConfig()
return ValidationConfig{
Level: ValidationStandard,
EnableRangeChecks: true,
EnableAlignmentCheck: true,
EnableDataIntegrity: false, // Disabled by default for performance
MaxValidationTime: 5 * time.Second, // Default validation timeout
EnableDataIntegrity: false, // Disabled by default for performance
MaxValidationTime: configConstants.MaxValidationTime, // Configurable validation timeout
}
}
@ -88,16 +89,14 @@ func ValidateAudioFrameComprehensive(data []byte, expectedSampleRate int, expect
// Range validation
if validationConfig.EnableRangeChecks {
config := GetConfig()
minFrameSize := 64 // Minimum reasonable frame size
if len(data) < minFrameSize {
return fmt.Errorf("%w: frame size %d below minimum %d", ErrInvalidFrameSize, len(data), minFrameSize)
if len(data) < config.MinFrameSize {
return fmt.Errorf("%w: frame size %d below minimum %d", ErrInvalidFrameSize, len(data), config.MinFrameSize)
}
// Validate frame length matches expected sample format
expectedFrameSize := (expectedSampleRate * expectedChannels * 2) / 1000 * int(config.AudioQualityMediumFrameSize/time.Millisecond)
tolerance := 512 // Frame size tolerance in bytes
if abs(len(data)-expectedFrameSize) > tolerance {
return fmt.Errorf("%w: frame size %d doesn't match expected %d (±%d)", ErrInvalidFrameLength, len(data), expectedFrameSize, tolerance)
if abs(len(data)-expectedFrameSize) > config.FrameSizeTolerance {
return fmt.Errorf("%w: frame size %d doesn't match expected %d (±%d)", ErrInvalidFrameLength, len(data), expectedFrameSize, config.FrameSizeTolerance)
}
}
@ -184,14 +183,12 @@ func ValidateAudioConfiguration(config AudioConfig) error {
configConstants := GetConfig()
// Validate bitrate ranges
minBitrate := 6000 // Minimum Opus bitrate
maxBitrate := 510000 // Maximum Opus bitrate
if config.Bitrate < minBitrate || config.Bitrate > maxBitrate {
return fmt.Errorf("%w: bitrate %d outside valid range [%d, %d]", ErrInvalidConfiguration, config.Bitrate, minBitrate, maxBitrate)
if config.Bitrate < configConstants.MinOpusBitrate || config.Bitrate > configConstants.MaxOpusBitrate {
return fmt.Errorf("%w: bitrate %d outside valid range [%d, %d]", ErrInvalidConfiguration, config.Bitrate, configConstants.MinOpusBitrate, configConstants.MaxOpusBitrate)
}
// Validate sample rate
validSampleRates := []int{8000, 12000, 16000, 24000, 48000}
validSampleRates := configConstants.ValidSampleRates
validSampleRate := false
for _, rate := range validSampleRates {
if config.SampleRate == rate {
@ -209,8 +206,8 @@ func ValidateAudioConfiguration(config AudioConfig) error {
}
// Validate frame size
minFrameSize := 10 * time.Millisecond // Minimum frame duration
maxFrameSize := 100 * time.Millisecond // Maximum frame duration
minFrameSize := GetConfig().MinFrameDuration
maxFrameSize := GetConfig().MaxFrameDuration
if config.FrameSize < minFrameSize || config.FrameSize > maxFrameSize {
return fmt.Errorf("%w: frame size %v outside valid range [%v, %v]", ErrInvalidConfiguration, config.FrameSize, minFrameSize, maxFrameSize)
}
@ -218,6 +215,101 @@ func ValidateAudioConfiguration(config AudioConfig) error {
return nil
}
// ValidateAudioConfigConstants performs comprehensive validation of AudioConfigConstants
func ValidateAudioConfigConstants(config *AudioConfigConstants) error {
if config == nil {
return fmt.Errorf("%w: configuration is nil", ErrInvalidConfiguration)
}
// Validate basic audio parameters
if config.MaxAudioFrameSize <= 0 {
return fmt.Errorf("%w: MaxAudioFrameSize must be positive", ErrInvalidConfiguration)
}
if config.SampleRate <= 0 {
return fmt.Errorf("%w: SampleRate must be positive", ErrInvalidSampleRate)
}
if config.Channels <= 0 || config.Channels > 8 {
return fmt.Errorf("%w: Channels must be between 1 and 8", ErrInvalidChannels)
}
// Validate Opus parameters
if config.OpusBitrate < 6000 || config.OpusBitrate > 510000 {
return fmt.Errorf("%w: OpusBitrate must be between 6000 and 510000", ErrInvalidConfiguration)
}
if config.OpusComplexity < 0 || config.OpusComplexity > 10 {
return fmt.Errorf("%w: OpusComplexity must be between 0 and 10", ErrInvalidConfiguration)
}
// Validate bitrate ranges
if config.MinOpusBitrate <= 0 || config.MaxOpusBitrate <= 0 {
return fmt.Errorf("%w: MinOpusBitrate and MaxOpusBitrate must be positive", ErrInvalidConfiguration)
}
if config.MinOpusBitrate >= config.MaxOpusBitrate {
return fmt.Errorf("%w: MinOpusBitrate must be less than MaxOpusBitrate", ErrInvalidConfiguration)
}
// Validate sample rate ranges
if config.MinSampleRate <= 0 || config.MaxSampleRate <= 0 {
return fmt.Errorf("%w: MinSampleRate and MaxSampleRate must be positive", ErrInvalidSampleRate)
}
if config.MinSampleRate >= config.MaxSampleRate {
return fmt.Errorf("%w: MinSampleRate must be less than MaxSampleRate", ErrInvalidSampleRate)
}
// Validate frame duration ranges
if config.MinFrameDuration <= 0 || config.MaxFrameDuration <= 0 {
return fmt.Errorf("%w: MinFrameDuration and MaxFrameDuration must be positive", ErrInvalidConfiguration)
}
if config.MinFrameDuration >= config.MaxFrameDuration {
return fmt.Errorf("%w: MinFrameDuration must be less than MaxFrameDuration", ErrInvalidConfiguration)
}
// Validate buffer sizes
if config.SocketMinBuffer <= 0 || config.SocketMaxBuffer <= 0 {
return fmt.Errorf("%w: SocketMinBuffer and SocketMaxBuffer must be positive", ErrInvalidBufferSize)
}
if config.SocketMinBuffer >= config.SocketMaxBuffer {
return fmt.Errorf("%w: SocketMinBuffer must be less than SocketMaxBuffer", ErrInvalidBufferSize)
}
// Validate priority ranges
if config.MinNiceValue < -20 || config.MinNiceValue > 19 {
return fmt.Errorf("%w: MinNiceValue must be between -20 and 19", ErrInvalidPriority)
}
if config.MaxNiceValue < -20 || config.MaxNiceValue > 19 {
return fmt.Errorf("%w: MaxNiceValue must be between -20 and 19", ErrInvalidPriority)
}
if config.MinNiceValue >= config.MaxNiceValue {
return fmt.Errorf("%w: MinNiceValue must be less than MaxNiceValue", ErrInvalidPriority)
}
// Validate timeout values
if config.MaxValidationTime <= 0 {
return fmt.Errorf("%w: MaxValidationTime must be positive", ErrInvalidConfiguration)
}
if config.RestartDelay <= 0 || config.MaxRestartDelay <= 0 {
return fmt.Errorf("%w: RestartDelay and MaxRestartDelay must be positive", ErrInvalidConfiguration)
}
if config.RestartDelay >= config.MaxRestartDelay {
return fmt.Errorf("%w: RestartDelay must be less than MaxRestartDelay", ErrInvalidConfiguration)
}
// Validate valid sample rates array
if len(config.ValidSampleRates) == 0 {
return fmt.Errorf("%w: ValidSampleRates cannot be empty", ErrInvalidSampleRate)
}
for _, rate := range config.ValidSampleRates {
if rate <= 0 {
return fmt.Errorf("%w: all ValidSampleRates must be positive", ErrInvalidSampleRate)
}
if rate < config.MinSampleRate || rate > config.MaxSampleRate {
return fmt.Errorf("%w: ValidSampleRate %d outside range [%d, %d]", ErrInvalidSampleRate, rate, config.MinSampleRate, config.MaxSampleRate)
}
}
return nil
}
// ValidateResourceLimits checks if system resources are within acceptable limits
func ValidateResourceLimits() error {
config := GetConfig()