diff --git a/internal/audio/api.go b/internal/audio/api.go index d3a73f9..0444ad9 100644 --- a/internal/audio/api.go +++ b/internal/audio/api.go @@ -9,7 +9,7 @@ import ( var ( // Global audio output supervisor instance - globalOutputSupervisor unsafe.Pointer // *AudioServerSupervisor + globalOutputSupervisor unsafe.Pointer // *AudioOutputSupervisor ) // isAudioServerProcess detects if we're running as the audio server subprocess @@ -58,15 +58,15 @@ func StopNonBlockingAudioStreaming() { } // SetAudioOutputSupervisor sets the global audio output supervisor -func SetAudioOutputSupervisor(supervisor *AudioServerSupervisor) { +func SetAudioOutputSupervisor(supervisor *AudioOutputSupervisor) { atomic.StorePointer(&globalOutputSupervisor, unsafe.Pointer(supervisor)) } // GetAudioOutputSupervisor returns the global audio output supervisor -func GetAudioOutputSupervisor() *AudioServerSupervisor { +func GetAudioOutputSupervisor() *AudioOutputSupervisor { ptr := atomic.LoadPointer(&globalOutputSupervisor) if ptr == nil { return nil } - return (*AudioServerSupervisor)(ptr) + return (*AudioOutputSupervisor)(ptr) } diff --git a/internal/audio/config_constants.go b/internal/audio/config_constants.go index c834a15..e5d9b4c 100644 --- a/internal/audio/config_constants.go +++ b/internal/audio/config_constants.go @@ -881,6 +881,12 @@ type AudioConfigConstants struct { // Default 5s provides responsive input monitoring. InputSupervisorTimeout time.Duration // 5s + // OutputSupervisorTimeout defines timeout for output supervisor operations. + // Used in: supervisor.go for output process monitoring + // Impact: Shorter timeouts improve output responsiveness but may cause false timeouts. + // Default 5s provides responsive output monitoring. + OutputSupervisorTimeout time.Duration // 5s + // ShortTimeout defines brief timeout for time-critical operations. // Used in: Real-time audio processing for minimal timeout scenarios // Impact: Very short timeouts ensure responsiveness but may cause premature failures. @@ -1382,6 +1388,158 @@ type AudioConfigConstants struct { // Impact: Controls scaling factor for memory influence on buffer sizing. // Default 100 provides standard percentage scaling for memory calculations. AdaptiveBufferMemoryMultiplier int + + // Socket Names - Configuration for IPC socket file names + // Used in: IPC communication for audio input/output + // Impact: Controls socket file naming and IPC connection endpoints + + // InputSocketName defines the socket file name for audio input IPC. + // Used in: input_ipc.go for microphone input communication + // Impact: Must be unique to prevent conflicts with other audio sockets. + // Default "audio_input.sock" provides clear identification for input socket. + InputSocketName string + + // OutputSocketName defines the socket file name for audio output IPC. + // Used in: ipc.go for audio output communication + // Impact: Must be unique to prevent conflicts with other audio sockets. + // Default "audio_output.sock" provides clear identification for output socket. + OutputSocketName string + + // Component Names - Standardized component identifiers for logging + // Used in: Logging and monitoring throughout audio system + // Impact: Provides consistent component identification across logs + + // AudioInputComponentName defines component name for audio input logging. + // Used in: input_ipc.go and related input processing components + // Impact: Ensures consistent logging identification for input components. + // Default "audio-input" provides clear component identification. + AudioInputComponentName string + + // AudioOutputComponentName defines component name for audio output logging. + // Used in: ipc.go and related output processing components + // Impact: Ensures consistent logging identification for output components. + // Default "audio-output" provides clear component identification. + AudioOutputComponentName string + + // AudioServerComponentName defines component name for audio server logging. + // Used in: supervisor.go and server management components + // Impact: Ensures consistent logging identification for server components. + // Default "audio-server" provides clear component identification. + AudioServerComponentName string + + // AudioRelayComponentName defines component name for audio relay logging. + // Used in: relay.go for audio relay operations + // Impact: Ensures consistent logging identification for relay components. + // Default "audio-relay" provides clear component identification. + AudioRelayComponentName string + + // AudioEventsComponentName defines component name for audio events logging. + // Used in: events.go for event broadcasting operations + // Impact: Ensures consistent logging identification for event components. + // Default "audio-events" provides clear component identification. + AudioEventsComponentName string + + // Test Configuration - Constants for testing scenarios + // Used in: Test files for consistent test configuration + // Impact: Provides standardized test parameters and timeouts + + // TestSocketTimeout defines timeout for test socket operations. + // Used in: integration_test.go for test socket communication + // Impact: Prevents test hangs while allowing sufficient time for operations. + // Default 100ms provides quick test execution with adequate timeout. + TestSocketTimeout time.Duration + + // TestBufferSize defines buffer size for test operations. + // Used in: test_utils.go for test buffer allocation + // Impact: Provides adequate buffer space for test scenarios. + // Default 4096 bytes matches production buffer sizes for realistic testing. + TestBufferSize int + + // TestRetryDelay defines delay between test retry attempts. + // Used in: Test files for retry logic in test scenarios + // Impact: Provides reasonable delay for test retry operations. + // Default 200ms allows sufficient time for test state changes. + TestRetryDelay time.Duration + + // Latency Histogram Configuration - Constants for latency tracking + // Used in: granular_metrics.go for latency distribution analysis + // Impact: Controls granularity and accuracy of latency measurements + + // LatencyHistogramMaxSamples defines maximum samples for latency tracking. + // Used in: granular_metrics.go for latency histogram management + // Impact: Controls memory usage and accuracy of latency statistics. + // Default 1000 samples provides good statistical accuracy with reasonable memory usage. + LatencyHistogramMaxSamples int + + // LatencyPercentile50 defines 50th percentile calculation factor. + // Used in: granular_metrics.go for median latency calculation + // Impact: Must be 50 for accurate median calculation. + // Default 50 provides standard median percentile calculation. + LatencyPercentile50 int + + // LatencyPercentile95 defines 95th percentile calculation factor. + // Used in: granular_metrics.go for high-percentile latency calculation + // Impact: Must be 95 for accurate 95th percentile calculation. + // Default 95 provides standard high-percentile calculation. + LatencyPercentile95 int + + // LatencyPercentile99 defines 99th percentile calculation factor. + // Used in: granular_metrics.go for extreme latency calculation + // Impact: Must be 99 for accurate 99th percentile calculation. + // Default 99 provides standard extreme percentile calculation. + LatencyPercentile99 int + + // BufferPoolMaxOperations defines maximum operations to track for efficiency. + // Used in: granular_metrics.go for buffer pool efficiency tracking + // Impact: Controls memory usage and accuracy of efficiency statistics. + // Default 1000 operations provides good balance of accuracy and memory usage. + BufferPoolMaxOperations int + + // HitRateCalculationBase defines base value for hit rate percentage calculation. + // Used in: granular_metrics.go for hit rate percentage calculation + // Impact: Must be 100 for accurate percentage calculation. + // Default 100 provides standard percentage calculation base. + HitRateCalculationBase float64 + + // Validation Constants - Configuration for input validation + // Used in: validation.go for parameter validation + // Impact: Controls validation thresholds and limits + + // MaxLatency defines maximum allowed latency for audio processing. + // Used in: validation.go for latency validation + // Impact: Controls maximum acceptable latency before optimization triggers. + // Default 200ms provides reasonable upper bound for real-time audio. + MaxLatency time.Duration + + // MinMetricsUpdateInterval defines minimum allowed metrics update interval. + // Used in: validation.go for metrics interval validation + // Impact: Prevents excessive metrics updates that could impact performance. + // Default 100ms provides reasonable minimum update frequency. + MinMetricsUpdateInterval time.Duration + + // MaxMetricsUpdateInterval defines maximum allowed metrics update interval. + // Used in: validation.go for metrics interval validation + // Impact: Ensures metrics are updated frequently enough for monitoring. + // Default 30s provides reasonable maximum update interval. + MaxMetricsUpdateInterval time.Duration + + // MinSampleRate defines minimum allowed audio sample rate. + // Used in: validation.go for sample rate validation + // Impact: Ensures sample rate is sufficient for audio quality. + // Default 8000Hz provides minimum for voice communication. + MinSampleRate int + + // MaxSampleRate defines maximum allowed audio sample rate. + // Used in: validation.go for sample rate validation + // Impact: Prevents excessive sample rates that could impact performance. + // Default 192000Hz provides upper bound for high-quality audio. + MaxSampleRate int + + // MaxChannels defines maximum allowed audio channels. + // Used in: validation.go for channel count validation + // Impact: Prevents excessive channel counts that could impact performance. + // Default 8 channels provides reasonable upper bound for multi-channel audio. + MaxChannels int } // DefaultAudioConfig returns the default configuration constants @@ -2204,6 +2362,12 @@ func DefaultAudioConfig() *AudioConfigConstants { // Default 5s (shorter than general supervisor) for faster input recovery InputSupervisorTimeout: 5 * time.Second, + // OutputSupervisorTimeout defines timeout for output supervisor operations. + // Used in: Output process monitoring, speaker supervision + // Impact: Controls responsiveness of output failure detection + // Default 5s (shorter than general supervisor) for faster output recovery + OutputSupervisorTimeout: 5 * time.Second, + // ShortTimeout defines brief timeout for quick operations (5ms). // Used in: Lock acquisition, quick IPC operations, immediate responses // Impact: Critical for maintaining real-time performance @@ -2365,6 +2529,40 @@ func DefaultAudioConfig() *AudioConfigConstants { // Adaptive Buffer Constants AdaptiveBufferCPUMultiplier: 100, // 100 multiplier for CPU percentage AdaptiveBufferMemoryMultiplier: 100, // 100 multiplier for memory percentage + + // Socket Names + InputSocketName: "audio_input.sock", // Socket name for audio input IPC + OutputSocketName: "audio_output.sock", // Socket name for audio output IPC + + // Component Names + AudioInputComponentName: "audio-input", // Component name for input logging + AudioOutputComponentName: "audio-output", // Component name for output logging + AudioServerComponentName: "audio-server", // Component name for server logging + AudioRelayComponentName: "audio-relay", // Component name for relay logging + AudioEventsComponentName: "audio-events", // Component name for events logging + + // Test Configuration + TestSocketTimeout: 100 * time.Millisecond, // 100ms timeout for test socket operations + TestBufferSize: 4096, // 4096 bytes buffer size for test operations + TestRetryDelay: 200 * time.Millisecond, // 200ms delay between test retry attempts + + // Latency Histogram Configuration + LatencyHistogramMaxSamples: 1000, // 1000 samples for latency tracking + LatencyPercentile50: 50, // 50th percentile calculation factor + LatencyPercentile95: 95, // 95th percentile calculation factor + LatencyPercentile99: 99, // 99th percentile calculation factor + + // Buffer Pool Efficiency Constants + BufferPoolMaxOperations: 1000, // 1000 operations for efficiency tracking + HitRateCalculationBase: 100.0, // 100.0 base for hit rate percentage calculation + + // Validation Constants + MaxLatency: 500 * time.Millisecond, // 500ms maximum allowed latency + MinMetricsUpdateInterval: 100 * time.Millisecond, // 100ms minimum metrics update interval + MaxMetricsUpdateInterval: 10 * time.Second, // 10s maximum metrics update interval + MinSampleRate: 8000, // 8kHz minimum sample rate + MaxSampleRate: 48000, // 48kHz maximum sample rate + MaxChannels: 8, // 8 maximum audio channels } } diff --git a/internal/audio/events.go b/internal/audio/events.go index 9b12562..d7ec3f7 100644 --- a/internal/audio/events.go +++ b/internal/audio/events.go @@ -111,7 +111,7 @@ func initializeBroadcaster() { go audioEventBroadcaster.startMetricsBroadcasting() // Start granular metrics logging with same interval as metrics broadcasting - StartGranularMetricsLogging(GetMetricsUpdateInterval()) + // StartGranularMetricsLogging(GetMetricsUpdateInterval()) // Disabled to reduce log pollution } // InitializeAudioEventBroadcaster initializes the global audio event broadcaster diff --git a/internal/audio/input.go b/internal/audio/input.go index f1beb60..1b2c875 100644 --- a/internal/audio/input.go +++ b/internal/audio/input.go @@ -47,7 +47,13 @@ func (aim *AudioInputManager) Start() error { err := aim.ipcManager.Start() if err != nil { aim.logger.Error().Err(err).Msg("Failed to start IPC audio input") + // Ensure proper cleanup on error atomic.StoreInt32(&aim.running, 0) + // Reset metrics on failed start + atomic.StoreInt64(&aim.metrics.FramesSent, 0) + atomic.StoreInt64(&aim.metrics.FramesDropped, 0) + atomic.StoreInt64(&aim.metrics.BytesProcessed, 0) + atomic.StoreInt64(&aim.metrics.ConnectionDrops, 0) return err } diff --git a/internal/audio/input_ipc.go b/internal/audio/input_ipc.go index 4523c03..489cc16 100644 --- a/internal/audio/input_ipc.go +++ b/internal/audio/input_ipc.go @@ -1,7 +1,6 @@ package audio import ( - "context" "encoding/binary" "fmt" "io" @@ -19,7 +18,6 @@ import ( var ( inputMagicNumber uint32 = GetConfig().InputMagicNumber // "JKMI" (JetKVM Microphone Input) inputSocketName = "audio_input.sock" - writeTimeout = GetConfig().WriteTimeout // Non-blocking write timeout ) const ( @@ -51,6 +49,27 @@ type InputIPCMessage struct { Data []byte } +// Implement IPCMessage interface +func (msg *InputIPCMessage) GetMagic() uint32 { + return msg.Magic +} + +func (msg *InputIPCMessage) GetType() uint8 { + return uint8(msg.Type) +} + +func (msg *InputIPCMessage) GetLength() uint32 { + return msg.Length +} + +func (msg *InputIPCMessage) GetTimestamp() int64 { + return msg.Timestamp +} + +func (msg *InputIPCMessage) GetData() []byte { + return msg.Data +} + // OptimizedIPCMessage represents an optimized message with pre-allocated buffers type OptimizedIPCMessage struct { header [headerSize]byte // Pre-allocated header buffer @@ -167,7 +186,7 @@ type InputIPCConfig struct { // AudioInputServer handles IPC communication for audio input processing type AudioInputServer struct { - // Atomic fields must be first for proper alignment on ARM + // Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment) bufferSize int64 // Current buffer size (atomic) processingTime int64 // Average processing time in nanoseconds (atomic) droppedFrames int64 // Dropped frames counter (atomic) @@ -227,6 +246,11 @@ func (ais *AudioInputServer) Start() error { ais.running = true + // Reset counters on start + atomic.StoreInt64(&ais.totalFrames, 0) + atomic.StoreInt64(&ais.droppedFrames, 0) + atomic.StoreInt64(&ais.processingTime, 0) + // Start triple-goroutine architecture ais.startReaderGoroutine() ais.startProcessorGoroutine() @@ -276,7 +300,9 @@ func (ais *AudioInputServer) acceptConnections() { conn, err := ais.listener.Accept() if err != nil { if ais.running { - // Only log error if we're still supposed to be running + // Log error and continue accepting + logger := logging.GetDefaultLogger().With().Str("component", "audio-input-server").Logger() + logger.Warn().Err(err).Msg("Failed to accept connection, retrying") continue } return @@ -293,9 +319,10 @@ func (ais *AudioInputServer) acceptConnections() { } ais.mtx.Lock() - // Close existing connection if any + // Close existing connection if any to prevent resource leaks if ais.conn != nil { ais.conn.Close() + ais.conn = nil } ais.conn = conn ais.mtx.Unlock() @@ -515,6 +542,12 @@ func (aic *AudioInputClient) Connect() error { return nil // Already connected } + // Ensure clean state before connecting + if aic.conn != nil { + aic.conn.Close() + aic.conn = nil + } + socketPath := getInputSocketPath() // Try connecting multiple times as the server might not be ready // Reduced retry count and delay for faster startup @@ -523,6 +556,9 @@ func (aic *AudioInputClient) Connect() error { if err == nil { aic.conn = conn aic.running = true + // Reset frame counters on successful connection + atomic.StoreInt64(&aic.totalFrames, 0) + atomic.StoreInt64(&aic.droppedFrames, 0) return nil } // Exponential backoff starting from config @@ -535,7 +571,10 @@ func (aic *AudioInputClient) Connect() error { time.Sleep(delay) } - return fmt.Errorf("failed to connect to audio input server") + // Ensure clean state on connection failure + aic.conn = nil + aic.running = false + return fmt.Errorf("failed to connect to audio input server after 10 attempts") } // Disconnect disconnects from the audio input server @@ -671,54 +710,17 @@ func (aic *AudioInputClient) writeMessage(msg *InputIPCMessage) error { // Increment total frames counter atomic.AddInt64(&aic.totalFrames, 1) - // Get optimized message from pool for header preparation - optMsg := globalMessagePool.Get() - defer globalMessagePool.Put(optMsg) - - // Prepare header in pre-allocated buffer - binary.LittleEndian.PutUint32(optMsg.header[0:4], msg.Magic) - optMsg.header[4] = byte(msg.Type) - binary.LittleEndian.PutUint32(optMsg.header[5:9], msg.Length) - binary.LittleEndian.PutUint64(optMsg.header[9:17], uint64(msg.Timestamp)) - - // Use non-blocking write with timeout - ctx, cancel := context.WithTimeout(context.Background(), writeTimeout) - defer cancel() - - // Create a channel to signal write completion - done := make(chan error, 1) - go func() { - // Write header using pre-allocated buffer - _, err := aic.conn.Write(optMsg.header[:]) - if err != nil { - done <- err - return - } - - // Write data if present - if msg.Length > 0 && msg.Data != nil { - _, err = aic.conn.Write(msg.Data) - if err != nil { - done <- err - return - } - } - done <- nil - }() - - // Wait for completion or timeout - select { - case err := <-done: - if err != nil { - atomic.AddInt64(&aic.droppedFrames, 1) - return err - } - return nil - case <-ctx.Done(): - // Timeout occurred - drop frame to prevent blocking - atomic.AddInt64(&aic.droppedFrames, 1) - return fmt.Errorf("write timeout - frame dropped") + // Use common write function with shared message pool + sharedPool := &GenericMessagePool{ + pool: make(chan *OptimizedMessage, messagePoolSize), + hitCount: globalMessagePool.hitCount, + missCount: globalMessagePool.missCount, + preallocated: make([]*OptimizedMessage, 0), + preallocSize: messagePoolSize / 4, + maxPoolSize: messagePoolSize, } + + return WriteIPCMessage(aic.conn, msg, sharedPool, &aic.droppedFrames) } // IsConnected returns whether the client is connected @@ -730,23 +732,19 @@ func (aic *AudioInputClient) IsConnected() bool { // GetFrameStats returns frame statistics func (aic *AudioInputClient) GetFrameStats() (total, dropped int64) { - return atomic.LoadInt64(&aic.totalFrames), atomic.LoadInt64(&aic.droppedFrames) + stats := GetFrameStats(&aic.totalFrames, &aic.droppedFrames) + return stats.Total, stats.Dropped } // GetDropRate returns the current frame drop rate as a percentage func (aic *AudioInputClient) GetDropRate() float64 { - total := atomic.LoadInt64(&aic.totalFrames) - dropped := atomic.LoadInt64(&aic.droppedFrames) - if total == 0 { - return 0.0 - } - return float64(dropped) / float64(total) * GetConfig().PercentageMultiplier + stats := GetFrameStats(&aic.totalFrames, &aic.droppedFrames) + return CalculateDropRate(stats) } // ResetStats resets frame statistics func (aic *AudioInputClient) ResetStats() { - atomic.StoreInt64(&aic.totalFrames, 0) - atomic.StoreInt64(&aic.droppedFrames, 0) + ResetFrameStats(&aic.totalFrames, &aic.droppedFrames) } // startReaderGoroutine starts the message reader goroutine diff --git a/internal/audio/input_ipc_manager.go b/internal/audio/input_ipc_manager.go index 9092d17..8bb5612 100644 --- a/internal/audio/input_ipc_manager.go +++ b/internal/audio/input_ipc_manager.go @@ -35,7 +35,13 @@ func (aim *AudioInputIPCManager) Start() error { err := aim.supervisor.Start() if err != nil { + // Ensure proper cleanup on supervisor start failure atomic.StoreInt32(&aim.running, 0) + // Reset metrics on failed start + atomic.StoreInt64(&aim.metrics.FramesSent, 0) + atomic.StoreInt64(&aim.metrics.FramesDropped, 0) + atomic.StoreInt64(&aim.metrics.BytesProcessed, 0) + atomic.StoreInt64(&aim.metrics.ConnectionDrops, 0) aim.logger.Error().Err(err).Msg("Failed to start audio input supervisor") return err } @@ -51,6 +57,7 @@ func (aim *AudioInputIPCManager) Start() error { err = aim.supervisor.SendConfig(config) if err != nil { + // Config send failure is not critical, log warning and continue aim.logger.Warn().Err(err).Msg("Failed to send initial config, will retry later") } diff --git a/internal/audio/ipc.go b/internal/audio/ipc.go index 2f3d915..f3fe476 100644 --- a/internal/audio/ipc.go +++ b/internal/audio/ipc.go @@ -35,7 +35,7 @@ const ( OutputMessageTypeAck ) -// OutputIPCMessage represents an IPC message for audio output +// OutputIPCMessage represents a message sent over IPC type OutputIPCMessage struct { Magic uint32 Type OutputMessageType @@ -44,6 +44,27 @@ type OutputIPCMessage struct { Data []byte } +// Implement IPCMessage interface +func (msg *OutputIPCMessage) GetMagic() uint32 { + return msg.Magic +} + +func (msg *OutputIPCMessage) GetType() uint8 { + return uint8(msg.Type) +} + +func (msg *OutputIPCMessage) GetLength() uint32 { + return msg.Length +} + +func (msg *OutputIPCMessage) GetTimestamp() int64 { + return msg.Timestamp +} + +func (msg *OutputIPCMessage) GetData() []byte { + return msg.Data +} + // OutputOptimizedMessage represents a pre-allocated message for zero-allocation operations type OutputOptimizedMessage struct { header [17]byte // Pre-allocated header buffer (using constant value since array size must be compile-time constant) @@ -98,8 +119,8 @@ func (p *OutputMessagePool) Put(msg *OutputOptimizedMessage) { // Global message pool for output IPC var globalOutputMessagePool = NewOutputMessagePool(GetConfig().OutputMessagePoolSize) -type AudioServer struct { - // Atomic fields must be first for proper alignment on ARM +type AudioOutputServer struct { + // Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment) bufferSize int64 // Current buffer size (atomic) droppedFrames int64 // Dropped frames counter (atomic) totalFrames int64 // Total frames counter (atomic) @@ -122,7 +143,7 @@ type AudioServer struct { socketBufferConfig SocketBufferConfig } -func NewAudioServer() (*AudioServer, error) { +func NewAudioOutputServer() (*AudioOutputServer, error) { socketPath := getOutputSocketPath() // Remove existing socket if any os.Remove(socketPath) @@ -151,7 +172,7 @@ func NewAudioServer() (*AudioServer, error) { // Initialize socket buffer configuration socketBufferConfig := DefaultSocketBufferConfig() - return &AudioServer{ + return &AudioOutputServer{ listener: listener, messageChan: make(chan *OutputIPCMessage, initialBufferSize), stopChan: make(chan struct{}), @@ -162,7 +183,7 @@ func NewAudioServer() (*AudioServer, error) { }, nil } -func (s *AudioServer) Start() error { +func (s *AudioOutputServer) Start() error { s.mtx.Lock() defer s.mtx.Unlock() @@ -190,12 +211,14 @@ func (s *AudioServer) Start() error { } // acceptConnections accepts incoming connections -func (s *AudioServer) acceptConnections() { +func (s *AudioOutputServer) acceptConnections() { + logger := logging.GetDefaultLogger().With().Str("component", "audio-server").Logger() for s.running { conn, err := s.listener.Accept() if err != nil { if s.running { - // Only log error if we're still supposed to be running + // Log warning and retry on accept failure + logger.Warn().Err(err).Msg("Failed to accept connection, retrying") continue } return @@ -204,7 +227,6 @@ func (s *AudioServer) acceptConnections() { // Configure socket buffers for optimal performance if err := ConfigureSocketBuffers(conn, s.socketBufferConfig); err != nil { // Log warning but don't fail - socket buffer optimization is not critical - logger := logging.GetDefaultLogger().With().Str("component", "audio-server").Logger() logger.Warn().Err(err).Msg("Failed to configure socket buffers, continuing with defaults") } else { // Record socket buffer metrics for monitoring @@ -215,6 +237,7 @@ func (s *AudioServer) acceptConnections() { // Close existing connection if any if s.conn != nil { s.conn.Close() + s.conn = nil } s.conn = conn s.mtx.Unlock() @@ -222,7 +245,7 @@ func (s *AudioServer) acceptConnections() { } // startProcessorGoroutine starts the message processor -func (s *AudioServer) startProcessorGoroutine() { +func (s *AudioOutputServer) startProcessorGoroutine() { s.wg.Add(1) go func() { defer s.wg.Done() @@ -243,7 +266,7 @@ func (s *AudioServer) startProcessorGoroutine() { }() } -func (s *AudioServer) Stop() { +func (s *AudioOutputServer) Stop() { s.mtx.Lock() defer s.mtx.Unlock() @@ -271,7 +294,7 @@ func (s *AudioServer) Stop() { } } -func (s *AudioServer) Close() error { +func (s *AudioOutputServer) Close() error { s.Stop() if s.listener != nil { s.listener.Close() @@ -281,7 +304,7 @@ func (s *AudioServer) Close() error { return nil } -func (s *AudioServer) SendFrame(frame []byte) error { +func (s *AudioOutputServer) SendFrame(frame []byte) error { maxFrameSize := GetConfig().OutputMaxFrameSize if len(frame) > maxFrameSize { return fmt.Errorf("output frame size validation failed: got %d bytes, maximum allowed %d bytes", len(frame), maxFrameSize) @@ -318,7 +341,7 @@ func (s *AudioServer) SendFrame(frame []byte) error { } // sendFrameToClient sends frame data directly to the connected client -func (s *AudioServer) sendFrameToClient(frame []byte) error { +func (s *AudioOutputServer) sendFrameToClient(frame []byte) error { s.mtx.Lock() defer s.mtx.Unlock() @@ -384,14 +407,13 @@ func (s *AudioServer) sendFrameToClient(frame []byte) error { } // GetServerStats returns server performance statistics -func (s *AudioServer) GetServerStats() (total, dropped int64, bufferSize int64) { - return atomic.LoadInt64(&s.totalFrames), - atomic.LoadInt64(&s.droppedFrames), - atomic.LoadInt64(&s.bufferSize) +func (s *AudioOutputServer) GetServerStats() (total, dropped int64, bufferSize int64) { + stats := GetFrameStats(&s.totalFrames, &s.droppedFrames) + return stats.Total, stats.Dropped, atomic.LoadInt64(&s.bufferSize) } -type AudioClient struct { - // Atomic fields must be first for proper alignment on ARM +type AudioOutputClient struct { + // Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment) droppedFrames int64 // Atomic counter for dropped frames totalFrames int64 // Atomic counter for total frames @@ -400,12 +422,12 @@ type AudioClient struct { running bool } -func NewAudioClient() *AudioClient { - return &AudioClient{} +func NewAudioOutputClient() *AudioOutputClient { + return &AudioOutputClient{} } // Connect connects to the audio output server -func (c *AudioClient) Connect() error { +func (c *AudioOutputClient) Connect() error { c.mtx.Lock() defer c.mtx.Unlock() @@ -437,7 +459,7 @@ func (c *AudioClient) Connect() error { } // Disconnect disconnects from the audio output server -func (c *AudioClient) Disconnect() { +func (c *AudioOutputClient) Disconnect() { c.mtx.Lock() defer c.mtx.Unlock() @@ -453,18 +475,18 @@ func (c *AudioClient) Disconnect() { } // IsConnected returns whether the client is connected -func (c *AudioClient) IsConnected() bool { +func (c *AudioOutputClient) IsConnected() bool { c.mtx.Lock() defer c.mtx.Unlock() return c.running && c.conn != nil } -func (c *AudioClient) Close() error { +func (c *AudioOutputClient) Close() error { c.Disconnect() return nil } -func (c *AudioClient) ReceiveFrame() ([]byte, error) { +func (c *AudioOutputClient) ReceiveFrame() ([]byte, error) { c.mtx.Lock() defer c.mtx.Unlock() @@ -511,9 +533,9 @@ func (c *AudioClient) ReceiveFrame() ([]byte, error) { } // GetClientStats returns client performance statistics -func (c *AudioClient) GetClientStats() (total, dropped int64) { - return atomic.LoadInt64(&c.totalFrames), - atomic.LoadInt64(&c.droppedFrames) +func (c *AudioOutputClient) GetClientStats() (total, dropped int64) { + stats := GetFrameStats(&c.totalFrames, &c.droppedFrames) + return stats.Total, stats.Dropped } // Helper functions diff --git a/internal/audio/ipc_common.go b/internal/audio/ipc_common.go new file mode 100644 index 0000000..605fd99 --- /dev/null +++ b/internal/audio/ipc_common.go @@ -0,0 +1,238 @@ +package audio + +import ( + "context" + "encoding/binary" + "fmt" + "net" + "sync" + "sync/atomic" + "time" +) + +// Common IPC message interface +type IPCMessage interface { + GetMagic() uint32 + GetType() uint8 + GetLength() uint32 + GetTimestamp() int64 + GetData() []byte +} + +// Common optimized message structure +type OptimizedMessage struct { + header [17]byte // Pre-allocated header buffer + data []byte // Reusable data buffer +} + +// Generic message pool for both input and output +type GenericMessagePool struct { + // 64-bit fields must be first for proper alignment on ARM + hitCount int64 // Pool hit counter (atomic) + missCount int64 // Pool miss counter (atomic) + + pool chan *OptimizedMessage + preallocated []*OptimizedMessage // Pre-allocated messages + preallocSize int + maxPoolSize int + mutex sync.RWMutex +} + +// NewGenericMessagePool creates a new generic message pool +func NewGenericMessagePool(size int) *GenericMessagePool { + pool := &GenericMessagePool{ + pool: make(chan *OptimizedMessage, size), + preallocSize: size / 4, // 25% pre-allocated for immediate use + maxPoolSize: size, + } + + // Pre-allocate some messages for immediate use + pool.preallocated = make([]*OptimizedMessage, pool.preallocSize) + for i := 0; i < pool.preallocSize; i++ { + pool.preallocated[i] = &OptimizedMessage{ + data: make([]byte, 0, GetConfig().MaxFrameSize), + } + } + + // Fill the channel pool + for i := 0; i < size-pool.preallocSize; i++ { + select { + case pool.pool <- &OptimizedMessage{ + data: make([]byte, 0, GetConfig().MaxFrameSize), + }: + default: + break + } + } + + return pool +} + +// Get retrieves an optimized message from the pool +func (mp *GenericMessagePool) Get() *OptimizedMessage { + // Try pre-allocated first (fastest path) + mp.mutex.Lock() + if len(mp.preallocated) > 0 { + msg := mp.preallocated[len(mp.preallocated)-1] + mp.preallocated = mp.preallocated[:len(mp.preallocated)-1] + mp.mutex.Unlock() + atomic.AddInt64(&mp.hitCount, 1) + return msg + } + mp.mutex.Unlock() + + // Try channel pool + select { + case msg := <-mp.pool: + atomic.AddInt64(&mp.hitCount, 1) + return msg + default: + // Pool empty, create new message + atomic.AddInt64(&mp.missCount, 1) + return &OptimizedMessage{ + data: make([]byte, 0, GetConfig().MaxFrameSize), + } + } +} + +// Put returns an optimized message to the pool +func (mp *GenericMessagePool) Put(msg *OptimizedMessage) { + if msg == nil { + return + } + + // Reset the message for reuse + msg.data = msg.data[:0] + + // Try to return to pre-allocated slice first + mp.mutex.Lock() + if len(mp.preallocated) < mp.preallocSize { + mp.preallocated = append(mp.preallocated, msg) + mp.mutex.Unlock() + return + } + mp.mutex.Unlock() + + // Try to return to channel pool + select { + case mp.pool <- msg: + // Successfully returned to pool + default: + // Pool full, let GC handle it + } +} + +// GetStats returns pool statistics +func (mp *GenericMessagePool) GetStats() (hitCount, missCount int64, hitRate float64) { + hits := atomic.LoadInt64(&mp.hitCount) + misses := atomic.LoadInt64(&mp.missCount) + total := hits + misses + if total > 0 { + hitRate = float64(hits) / float64(total) * 100 + } + return hits, misses, hitRate +} + +// Common write message function +func WriteIPCMessage(conn net.Conn, msg IPCMessage, pool *GenericMessagePool, droppedFramesCounter *int64) error { + if conn == nil { + return fmt.Errorf("connection is nil") + } + + // Get optimized message from pool for header preparation + optMsg := pool.Get() + defer pool.Put(optMsg) + + // Prepare header in pre-allocated buffer + binary.LittleEndian.PutUint32(optMsg.header[0:4], msg.GetMagic()) + optMsg.header[4] = msg.GetType() + binary.LittleEndian.PutUint32(optMsg.header[5:9], msg.GetLength()) + binary.LittleEndian.PutUint64(optMsg.header[9:17], uint64(msg.GetTimestamp())) + + // Use non-blocking write with timeout + ctx, cancel := context.WithTimeout(context.Background(), GetConfig().WriteTimeout) + defer cancel() + + // Create a channel to signal write completion + done := make(chan error, 1) + go func() { + // Write header using pre-allocated buffer + _, err := conn.Write(optMsg.header[:]) + if err != nil { + done <- err + return + } + + // Write data if present + if msg.GetLength() > 0 && msg.GetData() != nil { + _, err = conn.Write(msg.GetData()) + if err != nil { + done <- err + return + } + } + done <- nil + }() + + // Wait for completion or timeout + select { + case err := <-done: + if err != nil { + if droppedFramesCounter != nil { + atomic.AddInt64(droppedFramesCounter, 1) + } + return err + } + return nil + case <-ctx.Done(): + // Timeout occurred - drop frame to prevent blocking + if droppedFramesCounter != nil { + atomic.AddInt64(droppedFramesCounter, 1) + } + return fmt.Errorf("write timeout - frame dropped") + } +} + +// Common connection acceptance with retry logic +func AcceptConnectionWithRetry(listener net.Listener, maxRetries int, retryDelay time.Duration) (net.Conn, error) { + var lastErr error + for i := 0; i < maxRetries; i++ { + conn, err := listener.Accept() + if err == nil { + return conn, nil + } + lastErr = err + if i < maxRetries-1 { + time.Sleep(retryDelay) + } + } + return nil, fmt.Errorf("failed to accept connection after %d retries: %w", maxRetries, lastErr) +} + +// Common frame statistics structure +type FrameStats struct { + Total int64 + Dropped int64 +} + +// GetFrameStats safely retrieves frame statistics +func GetFrameStats(totalCounter, droppedCounter *int64) FrameStats { + return FrameStats{ + Total: atomic.LoadInt64(totalCounter), + Dropped: atomic.LoadInt64(droppedCounter), + } +} + +// CalculateDropRate calculates the drop rate percentage +func CalculateDropRate(stats FrameStats) float64 { + if stats.Total == 0 { + return 0.0 + } + return float64(stats.Dropped) / float64(stats.Total) * 100.0 +} + +// ResetFrameStats resets frame counters +func ResetFrameStats(totalCounter, droppedCounter *int64) { + atomic.StoreInt64(totalCounter, 0) + atomic.StoreInt64(droppedCounter, 0) +} diff --git a/internal/audio/output_server_main.go b/internal/audio/output_server_main.go index 489cb94..eac483e 100644 --- a/internal/audio/output_server_main.go +++ b/internal/audio/output_server_main.go @@ -17,7 +17,7 @@ func RunAudioOutputServer() error { logger.Info().Msg("Starting audio output server subprocess") // Create audio server - server, err := NewAudioServer() + server, err := NewAudioOutputServer() if err != nil { logger.Error().Err(err).Msg("failed to create audio server") return err diff --git a/internal/audio/output_streaming.go b/internal/audio/output_streaming.go index 26c8654..094f7aa 100644 --- a/internal/audio/output_streaming.go +++ b/internal/audio/output_streaming.go @@ -14,13 +14,13 @@ import ( // OutputStreamer manages high-performance audio output streaming type OutputStreamer struct { - // Atomic fields must be first for proper alignment on ARM + // Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment) processedFrames int64 // Total processed frames counter (atomic) droppedFrames int64 // Dropped frames counter (atomic) processingTime int64 // Average processing time in nanoseconds (atomic) lastStatsTime int64 // Last statistics update time (atomic) - client *AudioClient + client *AudioOutputClient bufferPool *AudioBufferPool ctx context.Context cancel context.CancelFunc @@ -49,7 +49,7 @@ func getOutputStreamingLogger() *zerolog.Logger { } func NewOutputStreamer() (*OutputStreamer, error) { - client := NewAudioClient() + client := NewAudioOutputClient() // Get initial batch size from adaptive buffer manager adaptiveManager := GetAdaptiveBufferManager() diff --git a/internal/audio/relay.go b/internal/audio/relay.go index 65a70f5..5b27d4f 100644 --- a/internal/audio/relay.go +++ b/internal/audio/relay.go @@ -19,7 +19,7 @@ type AudioRelay struct { framesRelayed int64 framesDropped int64 - client *AudioClient + client *AudioOutputClient ctx context.Context cancel context.CancelFunc wg sync.WaitGroup @@ -60,7 +60,7 @@ func (r *AudioRelay) Start(audioTrack AudioTrackWriter, config AudioConfig) erro } // Create audio client to connect to subprocess - client := NewAudioClient() + client := NewAudioOutputClient() r.client = client r.audioTrack = audioTrack r.config = config diff --git a/internal/audio/supervisor.go b/internal/audio/supervisor.go index 56a0bf1..e5fbac3 100644 --- a/internal/audio/supervisor.go +++ b/internal/audio/supervisor.go @@ -34,8 +34,8 @@ func getMaxRestartDelay() time.Duration { return GetConfig().MaxRestartDelay } -// AudioServerSupervisor manages the audio server subprocess lifecycle -type AudioServerSupervisor struct { +// AudioOutputSupervisor manages the audio output server subprocess lifecycle +type AudioOutputSupervisor struct { ctx context.Context cancel context.CancelFunc logger *zerolog.Logger @@ -64,12 +64,12 @@ type AudioServerSupervisor struct { onRestart func(attempt int, delay time.Duration) } -// NewAudioServerSupervisor creates a new audio server supervisor -func NewAudioServerSupervisor() *AudioServerSupervisor { +// NewAudioOutputSupervisor creates a new audio output server supervisor +func NewAudioOutputSupervisor() *AudioOutputSupervisor { ctx, cancel := context.WithCancel(context.Background()) logger := logging.GetDefaultLogger().With().Str("component", "audio-supervisor").Logger() - return &AudioServerSupervisor{ + return &AudioOutputSupervisor{ ctx: ctx, cancel: cancel, logger: &logger, @@ -80,7 +80,7 @@ func NewAudioServerSupervisor() *AudioServerSupervisor { } // SetCallbacks sets optional callbacks for process lifecycle events -func (s *AudioServerSupervisor) SetCallbacks( +func (s *AudioOutputSupervisor) SetCallbacks( onStart func(pid int), onExit func(pid int, exitCode int, crashed bool), onRestart func(attempt int, delay time.Duration), @@ -93,8 +93,8 @@ func (s *AudioServerSupervisor) SetCallbacks( s.onRestart = onRestart } -// Start begins supervising the audio server process -func (s *AudioServerSupervisor) Start() error { +// Start begins supervising the audio output server process +func (s *AudioOutputSupervisor) Start() error { if !atomic.CompareAndSwapInt32(&s.running, 0, 1) { return fmt.Errorf("supervisor already running") } @@ -107,6 +107,10 @@ func (s *AudioServerSupervisor) Start() error { s.stopChan = make(chan struct{}) // Recreate context as well since it might have been cancelled s.ctx, s.cancel = context.WithCancel(context.Background()) + // Reset restart tracking on start + s.restartAttempts = s.restartAttempts[:0] + s.lastExitCode = 0 + s.lastExitTime = time.Time{} s.mutex.Unlock() // Start the supervision loop @@ -116,7 +120,7 @@ func (s *AudioServerSupervisor) Start() error { } // Stop gracefully stops the audio server and supervisor -func (s *AudioServerSupervisor) Stop() error { +func (s *AudioOutputSupervisor) Stop() error { if !atomic.CompareAndSwapInt32(&s.running, 1, 0) { return nil // Already stopped } @@ -140,26 +144,26 @@ func (s *AudioServerSupervisor) Stop() error { } // IsRunning returns true if the supervisor is running -func (s *AudioServerSupervisor) IsRunning() bool { +func (s *AudioOutputSupervisor) IsRunning() bool { return atomic.LoadInt32(&s.running) == 1 } // GetProcessPID returns the current process PID (0 if not running) -func (s *AudioServerSupervisor) GetProcessPID() int { +func (s *AudioOutputSupervisor) GetProcessPID() int { s.mutex.RLock() defer s.mutex.RUnlock() return s.processPID } // GetLastExitInfo returns information about the last process exit -func (s *AudioServerSupervisor) GetLastExitInfo() (exitCode int, exitTime time.Time) { +func (s *AudioOutputSupervisor) GetLastExitInfo() (exitCode int, exitTime time.Time) { s.mutex.RLock() defer s.mutex.RUnlock() return s.lastExitCode, s.lastExitTime } // GetProcessMetrics returns current process metrics if the process is running -func (s *AudioServerSupervisor) GetProcessMetrics() *ProcessMetrics { +func (s *AudioOutputSupervisor) GetProcessMetrics() *ProcessMetrics { s.mutex.RLock() pid := s.processPID s.mutex.RUnlock() @@ -178,7 +182,7 @@ func (s *AudioServerSupervisor) GetProcessMetrics() *ProcessMetrics { } // supervisionLoop is the main supervision loop -func (s *AudioServerSupervisor) supervisionLoop() { +func (s *AudioOutputSupervisor) supervisionLoop() { defer func() { close(s.processDone) s.logger.Info().Msg("audio server supervision ended") @@ -252,7 +256,7 @@ func (s *AudioServerSupervisor) supervisionLoop() { } // startProcess starts the audio server process -func (s *AudioServerSupervisor) startProcess() error { +func (s *AudioOutputSupervisor) startProcess() error { execPath, err := os.Executable() if err != nil { return fmt.Errorf("failed to get executable path: %w", err) @@ -285,7 +289,7 @@ func (s *AudioServerSupervisor) startProcess() error { } // waitForProcessExit waits for the current process to exit and logs the result -func (s *AudioServerSupervisor) waitForProcessExit() { +func (s *AudioOutputSupervisor) waitForProcessExit() { s.mutex.RLock() cmd := s.cmd pid := s.processPID @@ -338,7 +342,7 @@ func (s *AudioServerSupervisor) waitForProcessExit() { } // terminateProcess gracefully terminates the current process -func (s *AudioServerSupervisor) terminateProcess() { +func (s *AudioOutputSupervisor) terminateProcess() { s.mutex.RLock() cmd := s.cmd pid := s.processPID @@ -365,14 +369,14 @@ func (s *AudioServerSupervisor) terminateProcess() { select { case <-done: s.logger.Info().Int("pid", pid).Msg("audio server process terminated gracefully") - case <-time.After(GetConfig().InputSupervisorTimeout): + case <-time.After(GetConfig().OutputSupervisorTimeout): s.logger.Warn().Int("pid", pid).Msg("process did not terminate gracefully, sending SIGKILL") s.forceKillProcess() } } // forceKillProcess forcefully kills the current process -func (s *AudioServerSupervisor) forceKillProcess() { +func (s *AudioOutputSupervisor) forceKillProcess() { s.mutex.RLock() cmd := s.cmd pid := s.processPID @@ -389,7 +393,7 @@ func (s *AudioServerSupervisor) forceKillProcess() { } // shouldRestart determines if the process should be restarted -func (s *AudioServerSupervisor) shouldRestart() bool { +func (s *AudioOutputSupervisor) shouldRestart() bool { if atomic.LoadInt32(&s.running) == 0 { return false // Supervisor is stopping } @@ -411,7 +415,7 @@ func (s *AudioServerSupervisor) shouldRestart() bool { } // recordRestartAttempt records a restart attempt -func (s *AudioServerSupervisor) recordRestartAttempt() { +func (s *AudioOutputSupervisor) recordRestartAttempt() { s.mutex.Lock() defer s.mutex.Unlock() @@ -419,7 +423,7 @@ func (s *AudioServerSupervisor) recordRestartAttempt() { } // calculateRestartDelay calculates the delay before next restart attempt -func (s *AudioServerSupervisor) calculateRestartDelay() time.Duration { +func (s *AudioOutputSupervisor) calculateRestartDelay() time.Duration { s.mutex.RLock() defer s.mutex.RUnlock() diff --git a/internal/audio/validation.go b/internal/audio/validation.go new file mode 100644 index 0000000..0b439c9 --- /dev/null +++ b/internal/audio/validation.go @@ -0,0 +1,177 @@ +package audio + +import ( + "errors" + "time" +) + +// Validation errors +var ( + ErrInvalidAudioQuality = errors.New("invalid audio quality level") + ErrInvalidFrameSize = errors.New("invalid frame size") + ErrInvalidFrameData = errors.New("invalid frame data") + ErrInvalidBufferSize = errors.New("invalid buffer size") + ErrInvalidPriority = errors.New("invalid priority value") + ErrInvalidLatency = errors.New("invalid latency value") + ErrInvalidConfiguration = errors.New("invalid configuration") + ErrInvalidSocketConfig = errors.New("invalid socket configuration") + ErrInvalidMetricsInterval = errors.New("invalid metrics interval") + ErrInvalidSampleRate = errors.New("invalid sample rate") + ErrInvalidChannels = errors.New("invalid channels") +) + +// ValidateAudioQuality validates audio quality enum values +func ValidateAudioQuality(quality AudioQuality) error { + switch quality { + case AudioQualityLow, AudioQualityMedium, AudioQualityHigh, AudioQualityUltra: + return nil + default: + return ErrInvalidAudioQuality + } +} + +// ValidateFrameData validates audio frame data +func ValidateFrameData(data []byte) error { + if len(data) == 0 { + return ErrInvalidFrameData + } + // Use a reasonable default if config is not available + maxFrameSize := 4096 + if config := GetConfig(); config != nil { + maxFrameSize = config.MaxAudioFrameSize + } + if len(data) > maxFrameSize { + return ErrInvalidFrameSize + } + return nil +} + +// ValidateZeroCopyFrame validates zero-copy audio frame +func ValidateZeroCopyFrame(frame *ZeroCopyAudioFrame) error { + if frame == nil { + return ErrInvalidFrameData + } + data := frame.Data() + if len(data) == 0 { + return ErrInvalidFrameData + } + // Use a reasonable default if config is not available + maxFrameSize := 4096 + if config := GetConfig(); config != nil { + maxFrameSize = config.MaxAudioFrameSize + } + if len(data) > maxFrameSize { + return ErrInvalidFrameSize + } + return nil +} + +// ValidateBufferSize validates buffer size parameters +func ValidateBufferSize(size int) error { + if size <= 0 { + return ErrInvalidBufferSize + } + // Use a reasonable default if config is not available + maxBuffer := 262144 // 256KB default + if config := GetConfig(); config != nil { + maxBuffer = config.SocketMaxBuffer + } + if size > maxBuffer { + return ErrInvalidBufferSize + } + return nil +} + +// ValidateThreadPriority validates thread priority values +func ValidateThreadPriority(priority int) error { + // Use reasonable defaults if config is not available + minPriority := -20 + maxPriority := 99 + if config := GetConfig(); config != nil { + minPriority = config.MinNiceValue + maxPriority = config.RTAudioHighPriority + } + if priority < minPriority || priority > maxPriority { + return ErrInvalidPriority + } + return nil +} + +// ValidateLatency validates latency values +func ValidateLatency(latency time.Duration) error { + if latency < 0 { + return ErrInvalidLatency + } + // Use a reasonable default if config is not available + maxLatency := 500 * time.Millisecond + if config := GetConfig(); config != nil { + maxLatency = config.MaxLatency + } + if latency > maxLatency { + return ErrInvalidLatency + } + return nil +} + +// ValidateMetricsInterval validates metrics update interval +func ValidateMetricsInterval(interval time.Duration) error { + // Use reasonable defaults if config is not available + minInterval := 100 * time.Millisecond + maxInterval := 10 * time.Second + if config := GetConfig(); config != nil { + minInterval = config.MinMetricsUpdateInterval + maxInterval = config.MaxMetricsUpdateInterval + } + if interval < minInterval { + return ErrInvalidMetricsInterval + } + if interval > maxInterval { + return ErrInvalidMetricsInterval + } + return nil +} + +// ValidateAdaptiveBufferConfig validates adaptive buffer configuration +func ValidateAdaptiveBufferConfig(minSize, maxSize, defaultSize int) error { + if minSize <= 0 || maxSize <= 0 || defaultSize <= 0 { + return ErrInvalidBufferSize + } + if minSize >= maxSize { + return ErrInvalidBufferSize + } + if defaultSize < minSize || defaultSize > maxSize { + return ErrInvalidBufferSize + } + // Validate against global limits + maxBuffer := 262144 // 256KB default + if config := GetConfig(); config != nil { + maxBuffer = config.SocketMaxBuffer + } + if maxSize > maxBuffer { + return ErrInvalidBufferSize + } + return nil +} + +// ValidateInputIPCConfig validates input IPC configuration +func ValidateInputIPCConfig(sampleRate, channels, frameSize int) error { + // Use reasonable defaults if config is not available + minSampleRate := 8000 + maxSampleRate := 48000 + maxChannels := 8 + if config := GetConfig(); config != nil { + minSampleRate = config.MinSampleRate + maxSampleRate = config.MaxSampleRate + maxChannels = config.MaxChannels + } + if sampleRate < minSampleRate || sampleRate > maxSampleRate { + return ErrInvalidSampleRate + } + if channels < 1 || channels > maxChannels { + return ErrInvalidChannels + } + if frameSize <= 0 { + return ErrInvalidFrameSize + } + return nil +} diff --git a/main.go b/main.go index 961f2aa..f9de656 100644 --- a/main.go +++ b/main.go @@ -18,7 +18,7 @@ var ( appCtx context.Context isAudioServer bool audioProcessDone chan struct{} - audioSupervisor *audio.AudioServerSupervisor + audioSupervisor *audio.AudioOutputSupervisor ) // runAudioServer is now handled by audio.RunAudioOutputServer @@ -36,7 +36,7 @@ func startAudioSubprocess() error { audio.StartAdaptiveBuffering() // Create audio server supervisor - audioSupervisor = audio.NewAudioServerSupervisor() + audioSupervisor = audio.NewAudioOutputSupervisor() // Set the global supervisor for access from audio package audio.SetAudioOutputSupervisor(audioSupervisor)