diff --git a/internal/audio/adaptive_buffer.go b/internal/audio/adaptive_buffer.go index 30e4726..29ee828 100644 --- a/internal/audio/adaptive_buffer.go +++ b/internal/audio/adaptive_buffer.go @@ -18,17 +18,17 @@ import ( // uses multiple factors to make decisions: // // 1. System Load Monitoring: -// - CPU usage: High CPU load increases buffer sizes to prevent underruns -// - Memory usage: High memory pressure reduces buffer sizes to conserve RAM +// - CPU usage: High CPU load increases buffer sizes to prevent underruns +// - Memory usage: High memory pressure reduces buffer sizes to conserve RAM // // 2. Latency Tracking: -// - Target latency: Optimal latency for the current quality setting -// - Max latency: Hard limit beyond which buffers are aggressively reduced +// - Target latency: Optimal latency for the current quality setting +// - Max latency: Hard limit beyond which buffers are aggressively reduced // // 3. Adaptation Strategy: -// - Exponential smoothing: Prevents oscillation and provides stable adjustments -// - Discrete steps: Buffer sizes change in fixed increments to avoid instability -// - Hysteresis: Different thresholds for increasing vs decreasing buffer sizes +// - Exponential smoothing: Prevents oscillation and provides stable adjustments +// - Discrete steps: Buffer sizes change in fixed increments to avoid instability +// - Hysteresis: Different thresholds for increasing vs decreasing buffer sizes // // The algorithm is specifically tuned for embedded ARM systems with limited resources, // prioritizing stability over absolute minimum latency. @@ -182,20 +182,23 @@ func (abm *AdaptiveBufferManager) adaptationLoop() { // // Mathematical Model: // 1. Factor Calculation: -// - CPU Factor: Sigmoid function that increases buffer size under high CPU load -// - Memory Factor: Inverse relationship that decreases buffer size under memory pressure -// - Latency Factor: Exponential decay that aggressively reduces buffers when latency exceeds targets // -// 2. Combined Factor: -// Combined = (CPU_factor * Memory_factor * Latency_factor) -// This multiplicative approach ensures any single critical factor can override others +// - CPU Factor: Sigmoid function that increases buffer size under high CPU load // -// 3. Exponential Smoothing: -// New_size = Current_size + smoothing_factor * (Target_size - Current_size) -// This prevents rapid oscillations and provides stable convergence +// - Memory Factor: Inverse relationship that decreases buffer size under memory pressure // -// 4. Discrete Quantization: -// Final sizes are rounded to frame boundaries and clamped to configured limits +// - Latency Factor: Exponential decay that aggressively reduces buffers when latency exceeds targets +// +// 2. Combined Factor: +// Combined = (CPU_factor * Memory_factor * Latency_factor) +// This multiplicative approach ensures any single critical factor can override others +// +// 3. Exponential Smoothing: +// New_size = Current_size + smoothing_factor * (Target_size - Current_size) +// This prevents rapid oscillations and provides stable convergence +// +// 4. Discrete Quantization: +// Final sizes are rounded to frame boundaries and clamped to configured limits // // The algorithm runs periodically and only applies changes when the adaptation interval // has elapsed, preventing excessive adjustments that could destabilize the audio pipeline. diff --git a/internal/audio/buffer_pool.go b/internal/audio/buffer_pool.go index 2306d33..709659f 100644 --- a/internal/audio/buffer_pool.go +++ b/internal/audio/buffer_pool.go @@ -40,7 +40,8 @@ func NewAudioBufferPool(bufferSize int) *AudioBufferPool { preallocSize: preallocSize, pool: sync.Pool{ New: func() interface{} { - return make([]byte, 0, bufferSize) + buf := make([]byte, 0, bufferSize) + return &buf }, }, } diff --git a/internal/audio/input_ipc_manager_test.go b/internal/audio/input_ipc_manager_test.go index 980ca7d..f31d440 100644 --- a/internal/audio/input_ipc_manager_test.go +++ b/internal/audio/input_ipc_manager_test.go @@ -274,4 +274,4 @@ func BenchmarkAudioInputIPCManager(b *testing.B) { manager.GetMetrics() } }) -} \ No newline at end of file +} diff --git a/internal/audio/input_test.go b/internal/audio/input_test.go index eff29e4..ec5e605 100644 --- a/internal/audio/input_test.go +++ b/internal/audio/input_test.go @@ -238,4 +238,4 @@ func BenchmarkAudioInputManager(b *testing.B) { _ = manager.IsReady() } }) -} \ No newline at end of file +} diff --git a/internal/audio/ipc.go b/internal/audio/ipc.go index b70fb2e..bee7999 100644 --- a/internal/audio/ipc.go +++ b/internal/audio/ipc.go @@ -452,7 +452,7 @@ func (c *AudioOutputClient) ReceiveFrame() ([]byte, error) { return nil, fmt.Errorf("failed to read frame data: %w", err) } } - + // Note: Caller is responsible for returning frame to pool via PutAudioFrameBuffer() atomic.AddInt64(&c.totalFrames, 1) diff --git a/internal/audio/naming_standards.go b/internal/audio/naming_standards.go index dee2da9..21e2c95 100644 --- a/internal/audio/naming_standards.go +++ b/internal/audio/naming_standards.go @@ -78,8 +78,8 @@ const ( AudioOutputIPCComponent = "audio-output-ipc" // Common component names - AudioRelayComponent = "audio-relay" - AudioEventsComponent = "audio-events" + AudioRelayComponent = "audio-relay" + AudioEventsComponent = "audio-events" AudioMetricsComponent = "audio-metrics" ) @@ -117,4 +117,4 @@ type AudioStreamerInterface interface { Start() error Stop() GetStats() (processed, dropped int64, avgProcessingTime time.Duration) -} \ No newline at end of file +} diff --git a/internal/audio/output_manager.go b/internal/audio/output_manager.go index d61a217..66140b6 100644 --- a/internal/audio/output_manager.go +++ b/internal/audio/output_manager.go @@ -174,4 +174,4 @@ func (aom *AudioOutputManager) LogPerformanceStats() { // GetStreamer returns the streamer for advanced operations func (aom *AudioOutputManager) GetStreamer() *AudioOutputStreamer { return aom.streamer -} \ No newline at end of file +} diff --git a/internal/audio/output_manager_test.go b/internal/audio/output_manager_test.go index fe80ea1..f680573 100644 --- a/internal/audio/output_manager_test.go +++ b/internal/audio/output_manager_test.go @@ -274,4 +274,4 @@ func BenchmarkAudioOutputManager(b *testing.B) { manager.GetMetrics() } }) -} \ No newline at end of file +} diff --git a/internal/audio/output_streaming.go b/internal/audio/output_streaming.go index ae883a0..190fdee 100644 --- a/internal/audio/output_streaming.go +++ b/internal/audio/output_streaming.go @@ -202,7 +202,7 @@ func (s *AudioOutputStreamer) processingLoop() { // Process frame and return buffer to pool after processing func() { defer s.bufferPool.Put(frameData) - + if _, err := s.client.ReceiveFrame(); err != nil { if s.client.IsConnected() { getOutputStreamingLogger().Warn().Err(err).Msg("Error reading audio frame from output server") diff --git a/internal/audio/output_streaming_test.go b/internal/audio/output_streaming_test.go index dc0664d..23228be 100644 --- a/internal/audio/output_streaming_test.go +++ b/internal/audio/output_streaming_test.go @@ -338,4 +338,4 @@ func BenchmarkAudioOutputStreamer(b *testing.B) { streamer.ReportLatency(10 * time.Millisecond) } }) -} \ No newline at end of file +} diff --git a/internal/audio/supervisor.go b/internal/audio/supervisor.go index 42c1cb1..1a9fc36 100644 --- a/internal/audio/supervisor.go +++ b/internal/audio/supervisor.go @@ -52,9 +52,9 @@ type AudioOutputSupervisor struct { lastExitTime time.Time // Channels for coordination - processDone chan struct{} - stopChan chan struct{} - stopChanClosed bool // Track if stopChan is closed + processDone chan struct{} + stopChan chan struct{} + stopChanClosed bool // Track if stopChan is closed processDoneClosed bool // Track if processDone is closed // Process monitoring @@ -107,7 +107,7 @@ func (s *AudioOutputSupervisor) Start() error { s.mutex.Lock() s.processDone = make(chan struct{}) s.stopChan = make(chan struct{}) - s.stopChanClosed = false // Reset channel closed flag + s.stopChanClosed = false // Reset channel closed flag s.processDoneClosed = false // Reset channel closed flag // Recreate context as well since it might have been cancelled s.ctx, s.cancel = context.WithCancel(context.Background()) @@ -197,7 +197,7 @@ func (s *AudioOutputSupervisor) GetProcessMetrics() *ProcessMetrics { return &metric } } - + // Return default metrics if process not found in monitor return &ProcessMetrics{ PID: pid, diff --git a/internal/audio/supervisor_unit_test.go b/internal/audio/supervisor_unit_test.go index 045d8e7..0cab46f 100644 --- a/internal/audio/supervisor_unit_test.go +++ b/internal/audio/supervisor_unit_test.go @@ -109,9 +109,9 @@ func TestAudioOutputSupervisorConcurrentOperations(t *testing.T) { for i := 0; i < 5; i++ { wg.Add(1) go func() { - defer wg.Done() - _ = supervisor.GetProcessMetrics() - }() + defer wg.Done() + _ = supervisor.GetProcessMetrics() + }() } // Test concurrent status checks @@ -214,4 +214,4 @@ func BenchmarkAudioOutputSupervisor(b *testing.B) { _ = supervisor.IsRunning() } }) -} \ No newline at end of file +} diff --git a/internal/audio/validation_enhanced.go b/internal/audio/validation_enhanced.go index e69acfa..22861a0 100644 --- a/internal/audio/validation_enhanced.go +++ b/internal/audio/validation_enhanced.go @@ -11,25 +11,25 @@ import ( // Enhanced validation errors with more specific context var ( - ErrInvalidFrameLength = errors.New("invalid frame length") - ErrFrameDataCorrupted = errors.New("frame data appears corrupted") - ErrBufferAlignment = errors.New("buffer alignment invalid") - ErrInvalidSampleFormat = errors.New("invalid sample format") - ErrInvalidTimestamp = errors.New("invalid timestamp") - ErrConfigurationMismatch = errors.New("configuration mismatch") - ErrResourceExhaustion = errors.New("resource exhaustion detected") - ErrInvalidPointer = errors.New("invalid pointer") - ErrBufferOverflow = errors.New("buffer overflow detected") - ErrInvalidState = errors.New("invalid state") + ErrInvalidFrameLength = errors.New("invalid frame length") + ErrFrameDataCorrupted = errors.New("frame data appears corrupted") + ErrBufferAlignment = errors.New("buffer alignment invalid") + ErrInvalidSampleFormat = errors.New("invalid sample format") + ErrInvalidTimestamp = errors.New("invalid timestamp") + ErrConfigurationMismatch = errors.New("configuration mismatch") + ErrResourceExhaustion = errors.New("resource exhaustion detected") + ErrInvalidPointer = errors.New("invalid pointer") + ErrBufferOverflow = errors.New("buffer overflow detected") + ErrInvalidState = errors.New("invalid state") ) // ValidationLevel defines the level of validation to perform type ValidationLevel int const ( - ValidationMinimal ValidationLevel = iota // Only critical safety checks - ValidationStandard // Standard validation for production - ValidationStrict // Comprehensive validation for debugging + ValidationMinimal ValidationLevel = iota // Only critical safety checks + ValidationStandard // Standard validation for production + ValidationStrict // Comprehensive validation for debugging ) // ValidationConfig controls validation behavior @@ -47,7 +47,7 @@ func GetValidationConfig() ValidationConfig { Level: ValidationStandard, EnableRangeChecks: true, EnableAlignmentCheck: true, - EnableDataIntegrity: false, // Disabled by default for performance + EnableDataIntegrity: false, // Disabled by default for performance MaxValidationTime: 5 * time.Second, // Default validation timeout } } @@ -57,13 +57,13 @@ func ValidateAudioFrameFast(data []byte) error { if len(data) == 0 { return ErrInvalidFrameData } - + // Quick bounds check using config constants maxSize := GetConfig().MaxAudioFrameSize if len(data) > maxSize { return fmt.Errorf("%w: frame size %d exceeds maximum %d", ErrInvalidFrameSize, len(data), maxSize) } - + return nil } @@ -71,7 +71,7 @@ func ValidateAudioFrameFast(data []byte) error { func ValidateAudioFrameComprehensive(data []byte, expectedSampleRate int, expectedChannels int) error { validationConfig := GetValidationConfig() start := time.Now() - + // Timeout protection for validation defer func() { if time.Since(start) > validationConfig.MaxValidationTime { @@ -79,12 +79,12 @@ func ValidateAudioFrameComprehensive(data []byte, expectedSampleRate int, expect getValidationLogger().Warn().Dur("duration", time.Since(start)).Msg("validation timeout exceeded") } }() - + // Basic validation first if err := ValidateAudioFrameFast(data); err != nil { return err } - + // Range validation if validationConfig.EnableRangeChecks { config := GetConfig() @@ -92,7 +92,7 @@ func ValidateAudioFrameComprehensive(data []byte, expectedSampleRate int, expect if len(data) < minFrameSize { return fmt.Errorf("%w: frame size %d below minimum %d", ErrInvalidFrameSize, len(data), minFrameSize) } - + // Validate frame length matches expected sample format expectedFrameSize := (expectedSampleRate * expectedChannels * 2) / 1000 * int(config.AudioQualityMediumFrameSize/time.Millisecond) tolerance := 512 // Frame size tolerance in bytes @@ -100,21 +100,21 @@ func ValidateAudioFrameComprehensive(data []byte, expectedSampleRate int, expect return fmt.Errorf("%w: frame size %d doesn't match expected %d (±%d)", ErrInvalidFrameLength, len(data), expectedFrameSize, tolerance) } } - + // Alignment validation for ARM32 compatibility if validationConfig.EnableAlignmentCheck { if uintptr(unsafe.Pointer(&data[0]))%4 != 0 { return fmt.Errorf("%w: buffer not 4-byte aligned for ARM32", ErrBufferAlignment) } } - + // Data integrity checks (expensive, only for debugging) if validationConfig.EnableDataIntegrity && validationConfig.Level == ValidationStrict { if err := validateAudioDataIntegrity(data, expectedChannels); err != nil { return err } } - + return nil } @@ -123,26 +123,26 @@ func ValidateZeroCopyFrameEnhanced(frame *ZeroCopyAudioFrame) error { if frame == nil { return fmt.Errorf("%w: frame is nil", ErrInvalidPointer) } - + // Check reference count validity frame.mutex.RLock() refCount := frame.refCount length := frame.length capacity := frame.capacity frame.mutex.RUnlock() - + if refCount <= 0 { return fmt.Errorf("%w: invalid reference count %d", ErrInvalidState, refCount) } - + if length < 0 || capacity < 0 { return fmt.Errorf("%w: negative length (%d) or capacity (%d)", ErrInvalidState, length, capacity) } - + if length > capacity { return fmt.Errorf("%w: length %d exceeds capacity %d", ErrBufferOverflow, length, capacity) } - + // Validate the underlying data data := frame.Data() return ValidateAudioFrameFast(data) @@ -153,25 +153,25 @@ func ValidateBufferBounds(buffer []byte, offset, length int) error { if buffer == nil { return fmt.Errorf("%w: buffer is nil", ErrInvalidPointer) } - + if offset < 0 { return fmt.Errorf("%w: negative offset %d", ErrInvalidState, offset) } - + if length < 0 { return fmt.Errorf("%w: negative length %d", ErrInvalidState, length) } - + // Check for integer overflow if offset > len(buffer) { return fmt.Errorf("%w: offset %d exceeds buffer length %d", ErrBufferOverflow, offset, len(buffer)) } - + // Safe addition check for overflow if offset+length < offset || offset+length > len(buffer) { return fmt.Errorf("%w: range [%d:%d] exceeds buffer length %d", ErrBufferOverflow, offset, offset+length, len(buffer)) } - + return nil } @@ -180,16 +180,16 @@ func ValidateAudioConfiguration(config AudioConfig) error { if err := ValidateAudioQuality(config.Quality); err != nil { return fmt.Errorf("quality validation failed: %w", err) } - + configConstants := GetConfig() - + // Validate bitrate ranges minBitrate := 6000 // Minimum Opus bitrate maxBitrate := 510000 // Maximum Opus bitrate if config.Bitrate < minBitrate || config.Bitrate > maxBitrate { return fmt.Errorf("%w: bitrate %d outside valid range [%d, %d]", ErrInvalidConfiguration, config.Bitrate, minBitrate, maxBitrate) } - + // Validate sample rate validSampleRates := []int{8000, 12000, 16000, 24000, 48000} validSampleRate := false @@ -202,38 +202,38 @@ func ValidateAudioConfiguration(config AudioConfig) error { if !validSampleRate { return fmt.Errorf("%w: sample rate %d not in supported rates %v", ErrInvalidSampleRate, config.SampleRate, validSampleRates) } - + // Validate channels if config.Channels < 1 || config.Channels > configConstants.MaxChannels { return fmt.Errorf("%w: channels %d outside valid range [1, %d]", ErrInvalidChannels, config.Channels, configConstants.MaxChannels) } - + // Validate frame size minFrameSize := 10 * time.Millisecond // Minimum frame duration maxFrameSize := 100 * time.Millisecond // Maximum frame duration if config.FrameSize < minFrameSize || config.FrameSize > maxFrameSize { return fmt.Errorf("%w: frame size %v outside valid range [%v, %v]", ErrInvalidConfiguration, config.FrameSize, minFrameSize, maxFrameSize) } - + return nil } // ValidateResourceLimits checks if system resources are within acceptable limits func ValidateResourceLimits() error { config := GetConfig() - + // Check buffer pool sizes framePoolStats := GetAudioBufferPoolStats() if framePoolStats.FramePoolSize > int64(config.MaxPoolSize*2) { return fmt.Errorf("%w: frame pool size %d exceeds safe limit %d", ErrResourceExhaustion, framePoolStats.FramePoolSize, config.MaxPoolSize*2) } - + // Check zero-copy pool allocation count zeroCopyStats := GetGlobalZeroCopyPoolStats() if zeroCopyStats.AllocationCount > int64(config.MaxPoolSize*3) { return fmt.Errorf("%w: zero-copy allocations %d exceed safe limit %d", ErrResourceExhaustion, zeroCopyStats.AllocationCount, config.MaxPoolSize*3) } - + return nil } @@ -242,34 +242,35 @@ func validateAudioDataIntegrity(data []byte, channels int) error { if len(data)%2 != 0 { return fmt.Errorf("%w: odd number of bytes for 16-bit samples", ErrInvalidSampleFormat) } - + if len(data)%(channels*2) != 0 { return fmt.Errorf("%w: data length %d not aligned to channel count %d", ErrInvalidSampleFormat, len(data), channels) } - + // Check for obvious corruption patterns (all zeros, all max values) sampleCount := len(data) / 2 zeroCount := 0 maxCount := 0 - + for i := 0; i < len(data); i += 2 { sample := int16(data[i]) | int16(data[i+1])<<8 - if sample == 0 { + switch sample { + case 0: zeroCount++ - } else if sample == 32767 || sample == -32768 { + case 32767, -32768: maxCount++ } } - + // Flag suspicious patterns if zeroCount > sampleCount*9/10 { return fmt.Errorf("%w: %d%% zero samples suggests silence or corruption", ErrFrameDataCorrupted, (zeroCount*100)/sampleCount) } - + if maxCount > sampleCount/10 { return fmt.Errorf("%w: %d%% max-value samples suggests clipping or corruption", ErrFrameDataCorrupted, (maxCount*100)/sampleCount) } - + return nil } @@ -286,4 +287,4 @@ func getValidationLogger() *zerolog.Logger { // Return a basic logger for validation logger := zerolog.New(nil).With().Timestamp().Logger() return &logger -} \ No newline at end of file +} diff --git a/internal/audio/zero_copy.go b/internal/audio/zero_copy.go index 3d4e229..c099dc6 100644 --- a/internal/audio/zero_copy.go +++ b/internal/audio/zero_copy.go @@ -13,25 +13,27 @@ import ( // allocations and memory copying in the audio pipeline: // // Key Features: -// 1. Reference Counting: Multiple components can safely share the same frame data -// without copying. The frame is automatically returned to the pool when the last -// reference is released. // -// 2. Thread Safety: All operations are protected by RWMutex, allowing concurrent -// reads while ensuring exclusive access for modifications. +// 1. Reference Counting: Multiple components can safely share the same frame data +// without copying. The frame is automatically returned to the pool when the last +// reference is released. // -// 3. Pool Integration: Frames are automatically managed by ZeroCopyFramePool, -// enabling efficient reuse and preventing memory fragmentation. +// 2. Thread Safety: All operations are protected by RWMutex, allowing concurrent +// reads while ensuring exclusive access for modifications. // -// 4. Unsafe Pointer Access: For performance-critical CGO operations, direct -// memory access is provided while maintaining safety through reference counting. +// 3. Pool Integration: Frames are automatically managed by ZeroCopyFramePool, +// enabling efficient reuse and preventing memory fragmentation. +// +// 4. Unsafe Pointer Access: For performance-critical CGO operations, direct +// memory access is provided while maintaining safety through reference counting. // // Usage Pattern: -// frame := pool.Get() // Acquire frame (refCount = 1) -// frame.AddRef() // Share with another component (refCount = 2) -// data := frame.Data() // Access data safely -// frame.Release() // Release reference (refCount = 1) -// frame.Release() // Final release, returns to pool (refCount = 0) +// +// frame := pool.Get() // Acquire frame (refCount = 1) +// frame.AddRef() // Share with another component (refCount = 2) +// data := frame.Data() // Access data safely +// frame.Release() // Release reference (refCount = 1) +// frame.Release() // Final release, returns to pool (refCount = 0) // // Memory Safety: // - Frames cannot be modified while shared (refCount > 1) @@ -52,23 +54,26 @@ type ZeroCopyAudioFrame struct { // real-time audio processing with minimal allocation overhead: // // Tier 1 - Pre-allocated Frames: -// A small number of frames are pre-allocated at startup and kept ready -// for immediate use. This provides the fastest possible allocation for -// the most common case and eliminates allocation latency spikes. +// +// A small number of frames are pre-allocated at startup and kept ready +// for immediate use. This provides the fastest possible allocation for +// the most common case and eliminates allocation latency spikes. // // Tier 2 - sync.Pool Cache: -// The standard Go sync.Pool provides efficient reuse of frames with -// automatic garbage collection integration. Frames are automatically -// returned here when memory pressure is low. +// +// The standard Go sync.Pool provides efficient reuse of frames with +// automatic garbage collection integration. Frames are automatically +// returned here when memory pressure is low. // // Tier 3 - Memory Guard: -// A configurable limit prevents excessive memory usage by limiting -// the total number of allocated frames. When the limit is reached, -// allocation requests are denied to prevent OOM conditions. +// +// A configurable limit prevents excessive memory usage by limiting +// the total number of allocated frames. When the limit is reached, +// allocation requests are denied to prevent OOM conditions. // // Performance Characteristics: // - Pre-allocated tier: ~10ns allocation time -// - sync.Pool tier: ~50ns allocation time +// - sync.Pool tier: ~50ns allocation time // - Memory guard: Prevents unbounded growth // - Metrics tracking: Hit/miss rates for optimization //