From c40459664f3509d16cac01f045042e640954bafa Mon Sep 17 00:00:00 2001 From: Alex P Date: Thu, 4 Sep 2025 10:51:07 +0000 Subject: [PATCH] Audio Input resiliency. Make sure the IPC client always recovers --- internal/audio/config_constants.go | 4 +- internal/audio/input_ipc.go | 33 ++++++++-- internal/audio/input_ipc_manager.go | 95 +++++++++++++++++++++++++++++ internal/audio/input_supervisor.go | 5 +- internal/audio/ipc_common.go | 53 +++++++--------- 5 files changed, 151 insertions(+), 39 deletions(-) diff --git a/internal/audio/config_constants.go b/internal/audio/config_constants.go index e9ad5605..df680716 100644 --- a/internal/audio/config_constants.go +++ b/internal/audio/config_constants.go @@ -1900,8 +1900,8 @@ func DefaultAudioConfig() *AudioConfigConstants { // WriteTimeout defines maximum wait time for IPC write operations. // Used in: ipc_manager.go for preventing indefinite blocking on writes // Impact: Balances responsiveness with reliability for IPC operations - // Default 5 seconds provides reasonable timeout for most system conditions - WriteTimeout: 5 * time.Second, + // Optimized to 50ms for real-time audio processing to reduce latency + WriteTimeout: 50 * time.Millisecond, // MaxDroppedFrames defines threshold for dropped frame error handling. // Used in: ipc_manager.go for quality degradation detection and recovery diff --git a/internal/audio/input_ipc.go b/internal/audio/input_ipc.go index ad9b5e3e..d07ff7c2 100644 --- a/internal/audio/input_ipc.go +++ b/internal/audio/input_ipc.go @@ -243,12 +243,33 @@ type AudioInputServer struct { // NewAudioInputServer creates a new audio input server func NewAudioInputServer() (*AudioInputServer, error) { socketPath := getInputSocketPath() - // Remove existing socket if any - os.Remove(socketPath) - listener, err := net.Listen("unix", socketPath) + // Retry socket creation with cleanup to handle race conditions + var listener net.Listener + var err error + for i := 0; i < 3; i++ { + // Remove existing socket if any + os.Remove(socketPath) + + // Small delay to ensure cleanup completes + if i > 0 { + time.Sleep(10 * time.Millisecond) + } + + listener, err = net.Listen("unix", socketPath) + if err == nil { + break + } + + // Log retry attempt + if i < 2 { + logger := logging.GetDefaultLogger().With().Str("component", "audio-input").Logger() + logger.Warn().Err(err).Int("attempt", i+1).Msg("Failed to create unix socket, retrying") + } + } + if err != nil { - return nil, fmt.Errorf("failed to create unix socket: %w", err) + return nil, fmt.Errorf("failed to create unix socket after 3 attempts: %w", err) } // Get initial buffer size from adaptive buffer manager @@ -320,7 +341,11 @@ func (ais *AudioInputServer) Stop() { if ais.listener != nil { ais.listener.Close() + ais.listener = nil } + + // Remove socket file to prevent restart issues + os.Remove(getInputSocketPath()) } // Close closes the server and cleans up resources diff --git a/internal/audio/input_ipc_manager.go b/internal/audio/input_ipc_manager.go index ec571811..74c78688 100644 --- a/internal/audio/input_ipc_manager.go +++ b/internal/audio/input_ipc_manager.go @@ -1,6 +1,7 @@ package audio import ( + "fmt" "sync/atomic" "time" @@ -15,6 +16,12 @@ type AudioInputIPCManager struct { supervisor *AudioInputSupervisor logger zerolog.Logger running int32 + + // Connection monitoring and recovery + monitoringEnabled bool + lastConnectionCheck time.Time + connectionFailures int32 + recoveryInProgress int32 } // NewAudioInputIPCManager creates a new IPC-based audio input manager @@ -33,10 +40,17 @@ func (aim *AudioInputIPCManager) Start() error { aim.logger.Debug().Str("component", AudioInputIPCComponent).Msg("starting component") + // Initialize connection monitoring + aim.monitoringEnabled = true + aim.lastConnectionCheck = time.Now() + atomic.StoreInt32(&aim.connectionFailures, 0) + atomic.StoreInt32(&aim.recoveryInProgress, 0) + err := aim.supervisor.Start() if err != nil { // Ensure proper cleanup on supervisor start failure atomic.StoreInt32(&aim.running, 0) + aim.monitoringEnabled = false // Reset metrics on failed start aim.resetMetrics() aim.logger.Error().Err(err).Str("component", AudioInputIPCComponent).Msg("failed to start audio input supervisor") @@ -80,6 +94,10 @@ func (aim *AudioInputIPCManager) Stop() { } aim.logger.Debug().Str("component", AudioInputIPCComponent).Msg("stopping component") + + // Disable connection monitoring + aim.monitoringEnabled = false + aim.supervisor.Stop() aim.logger.Debug().Str("component", AudioInputIPCComponent).Msg("component stopped") } @@ -102,6 +120,11 @@ func (aim *AudioInputIPCManager) WriteOpusFrame(frame []byte) error { return nil // Empty frame, ignore } + // Check connection health periodically + if aim.monitoringEnabled { + aim.checkConnectionHealth() + } + // Validate frame data if err := ValidateAudioFrame(frame); err != nil { atomic.AddInt64(&aim.metrics.FramesDropped, 1) @@ -122,10 +145,21 @@ func (aim *AudioInputIPCManager) WriteOpusFrame(frame []byte) error { if err != nil { // Count as dropped frame atomic.AddInt64(&aim.metrics.FramesDropped, 1) + + // Handle connection failure + if aim.monitoringEnabled { + aim.handleConnectionFailure(err) + } + aim.logger.Debug().Err(err).Msg("failed to send frame via IPC") return err } + // Reset connection failure counter on successful send + if aim.monitoringEnabled { + atomic.StoreInt32(&aim.connectionFailures, 0) + } + // Calculate and update latency (end-to-end IPC transmission time) latency := time.Since(startTime) aim.updateLatencyMetrics(latency) @@ -215,6 +249,67 @@ func (aim *AudioInputIPCManager) updateLatencyMetrics(latency time.Duration) { } } +// checkConnectionHealth monitors the IPC connection health +func (aim *AudioInputIPCManager) checkConnectionHealth() { + now := time.Now() + + // Check connection every 5 seconds + if now.Sub(aim.lastConnectionCheck) < 5*time.Second { + return + } + + aim.lastConnectionCheck = now + + // Check if supervisor and client are connected + if !aim.supervisor.IsConnected() { + aim.logger.Warn().Str("component", AudioInputIPCComponent).Msg("IPC connection lost, attempting recovery") + aim.handleConnectionFailure(fmt.Errorf("connection health check failed")) + } +} + +// handleConnectionFailure manages connection failure recovery +func (aim *AudioInputIPCManager) handleConnectionFailure(err error) { + // Increment failure counter + failures := atomic.AddInt32(&aim.connectionFailures, 1) + + // Prevent multiple concurrent recovery attempts + if !atomic.CompareAndSwapInt32(&aim.recoveryInProgress, 0, 1) { + return // Recovery already in progress + } + + // Start recovery in a separate goroutine to avoid blocking audio processing + go func() { + defer atomic.StoreInt32(&aim.recoveryInProgress, 0) + + aim.logger.Info(). + Int32("failures", failures). + Err(err). + Str("component", AudioInputIPCComponent). + Msg("attempting IPC connection recovery") + + // Stop and restart the supervisor to recover the connection + aim.supervisor.Stop() + + // Brief delay before restart + time.Sleep(100 * time.Millisecond) + + // Attempt to restart + if restartErr := aim.supervisor.Start(); restartErr != nil { + aim.logger.Error(). + Err(restartErr). + Str("component", AudioInputIPCComponent). + Msg("failed to recover IPC connection") + } else { + aim.logger.Info(). + Str("component", AudioInputIPCComponent). + Msg("IPC connection recovered successfully") + + // Reset failure counter on successful recovery + atomic.StoreInt32(&aim.connectionFailures, 0) + } + }() +} + // GetDetailedMetrics returns comprehensive performance metrics func (aim *AudioInputIPCManager) GetDetailedMetrics() (AudioInputMetrics, map[string]interface{}) { metrics := aim.GetMetrics() diff --git a/internal/audio/input_supervisor.go b/internal/audio/input_supervisor.go index fcb71b21..0d709f63 100644 --- a/internal/audio/input_supervisor.go +++ b/internal/audio/input_supervisor.go @@ -357,9 +357,12 @@ func (ais *AudioInputSupervisor) monitorSubprocess() { // connectClient attempts to connect the client to the server func (ais *AudioInputSupervisor) connectClient() { - // Wait briefly for the server to start (reduced from 500ms) + // Wait briefly for the server to start and create socket time.Sleep(GetConfig().DefaultSleepDuration) + // Additional small delay to ensure socket is ready after restart + time.Sleep(20 * time.Millisecond) + err := ais.client.Connect() if err != nil { ais.logger.Error().Err(err).Msg("Failed to connect to audio input server") diff --git a/internal/audio/ipc_common.go b/internal/audio/ipc_common.go index 605fd992..38b595ec 100644 --- a/internal/audio/ipc_common.go +++ b/internal/audio/ipc_common.go @@ -1,7 +1,6 @@ package audio import ( - "context" "encoding/binary" "fmt" "net" @@ -149,48 +148,38 @@ func WriteIPCMessage(conn net.Conn, msg IPCMessage, pool *GenericMessagePool, dr binary.LittleEndian.PutUint32(optMsg.header[5:9], msg.GetLength()) binary.LittleEndian.PutUint64(optMsg.header[9:17], uint64(msg.GetTimestamp())) - // Use non-blocking write with timeout - ctx, cancel := context.WithTimeout(context.Background(), GetConfig().WriteTimeout) - defer cancel() - - // Create a channel to signal write completion - done := make(chan error, 1) - go func() { - // Write header using pre-allocated buffer - _, err := conn.Write(optMsg.header[:]) - if err != nil { - done <- err - return + // Set write deadline for timeout handling (more efficient than goroutines) + if deadline := time.Now().Add(GetConfig().WriteTimeout); deadline.After(time.Now()) { + if err := conn.SetWriteDeadline(deadline); err != nil { + // If we can't set deadline, proceed without it + // This maintains compatibility with connections that don't support deadlines + _ = err // Explicitly ignore error for linter } + } - // Write data if present - if msg.GetLength() > 0 && msg.GetData() != nil { - _, err = conn.Write(msg.GetData()) - if err != nil { - done <- err - return - } + // Write header using pre-allocated buffer (synchronous for better performance) + _, err := conn.Write(optMsg.header[:]) + if err != nil { + if droppedFramesCounter != nil { + atomic.AddInt64(droppedFramesCounter, 1) } - done <- nil - }() + return err + } - // Wait for completion or timeout - select { - case err := <-done: + // Write data if present + if msg.GetLength() > 0 && msg.GetData() != nil { + _, err = conn.Write(msg.GetData()) if err != nil { if droppedFramesCounter != nil { atomic.AddInt64(droppedFramesCounter, 1) } return err } - return nil - case <-ctx.Done(): - // Timeout occurred - drop frame to prevent blocking - if droppedFramesCounter != nil { - atomic.AddInt64(droppedFramesCounter, 1) - } - return fmt.Errorf("write timeout - frame dropped") } + + // Clear write deadline after successful write + _ = conn.SetWriteDeadline(time.Time{}) // Ignore error as this is cleanup + return nil } // Common connection acceptance with retry logic