Audio Input resiliency. Make sure the IPC client always recovers

This commit is contained in:
Alex P 2025-09-04 10:51:07 +00:00
parent cdf6731639
commit c40459664f
5 changed files with 151 additions and 39 deletions

View File

@ -1900,8 +1900,8 @@ func DefaultAudioConfig() *AudioConfigConstants {
// WriteTimeout defines maximum wait time for IPC write operations.
// Used in: ipc_manager.go for preventing indefinite blocking on writes
// Impact: Balances responsiveness with reliability for IPC operations
// Default 5 seconds provides reasonable timeout for most system conditions
WriteTimeout: 5 * time.Second,
// Optimized to 50ms for real-time audio processing to reduce latency
WriteTimeout: 50 * time.Millisecond,
// MaxDroppedFrames defines threshold for dropped frame error handling.
// Used in: ipc_manager.go for quality degradation detection and recovery

View File

@ -243,12 +243,33 @@ type AudioInputServer struct {
// NewAudioInputServer creates a new audio input server
func NewAudioInputServer() (*AudioInputServer, error) {
socketPath := getInputSocketPath()
// Remove existing socket if any
os.Remove(socketPath)
listener, err := net.Listen("unix", socketPath)
// Retry socket creation with cleanup to handle race conditions
var listener net.Listener
var err error
for i := 0; i < 3; i++ {
// Remove existing socket if any
os.Remove(socketPath)
// Small delay to ensure cleanup completes
if i > 0 {
time.Sleep(10 * time.Millisecond)
}
listener, err = net.Listen("unix", socketPath)
if err == nil {
break
}
// Log retry attempt
if i < 2 {
logger := logging.GetDefaultLogger().With().Str("component", "audio-input").Logger()
logger.Warn().Err(err).Int("attempt", i+1).Msg("Failed to create unix socket, retrying")
}
}
if err != nil {
return nil, fmt.Errorf("failed to create unix socket: %w", err)
return nil, fmt.Errorf("failed to create unix socket after 3 attempts: %w", err)
}
// Get initial buffer size from adaptive buffer manager
@ -320,7 +341,11 @@ func (ais *AudioInputServer) Stop() {
if ais.listener != nil {
ais.listener.Close()
ais.listener = nil
}
// Remove socket file to prevent restart issues
os.Remove(getInputSocketPath())
}
// Close closes the server and cleans up resources

View File

@ -1,6 +1,7 @@
package audio
import (
"fmt"
"sync/atomic"
"time"
@ -15,6 +16,12 @@ type AudioInputIPCManager struct {
supervisor *AudioInputSupervisor
logger zerolog.Logger
running int32
// Connection monitoring and recovery
monitoringEnabled bool
lastConnectionCheck time.Time
connectionFailures int32
recoveryInProgress int32
}
// NewAudioInputIPCManager creates a new IPC-based audio input manager
@ -33,10 +40,17 @@ func (aim *AudioInputIPCManager) Start() error {
aim.logger.Debug().Str("component", AudioInputIPCComponent).Msg("starting component")
// Initialize connection monitoring
aim.monitoringEnabled = true
aim.lastConnectionCheck = time.Now()
atomic.StoreInt32(&aim.connectionFailures, 0)
atomic.StoreInt32(&aim.recoveryInProgress, 0)
err := aim.supervisor.Start()
if err != nil {
// Ensure proper cleanup on supervisor start failure
atomic.StoreInt32(&aim.running, 0)
aim.monitoringEnabled = false
// Reset metrics on failed start
aim.resetMetrics()
aim.logger.Error().Err(err).Str("component", AudioInputIPCComponent).Msg("failed to start audio input supervisor")
@ -80,6 +94,10 @@ func (aim *AudioInputIPCManager) Stop() {
}
aim.logger.Debug().Str("component", AudioInputIPCComponent).Msg("stopping component")
// Disable connection monitoring
aim.monitoringEnabled = false
aim.supervisor.Stop()
aim.logger.Debug().Str("component", AudioInputIPCComponent).Msg("component stopped")
}
@ -102,6 +120,11 @@ func (aim *AudioInputIPCManager) WriteOpusFrame(frame []byte) error {
return nil // Empty frame, ignore
}
// Check connection health periodically
if aim.monitoringEnabled {
aim.checkConnectionHealth()
}
// Validate frame data
if err := ValidateAudioFrame(frame); err != nil {
atomic.AddInt64(&aim.metrics.FramesDropped, 1)
@ -122,10 +145,21 @@ func (aim *AudioInputIPCManager) WriteOpusFrame(frame []byte) error {
if err != nil {
// Count as dropped frame
atomic.AddInt64(&aim.metrics.FramesDropped, 1)
// Handle connection failure
if aim.monitoringEnabled {
aim.handleConnectionFailure(err)
}
aim.logger.Debug().Err(err).Msg("failed to send frame via IPC")
return err
}
// Reset connection failure counter on successful send
if aim.monitoringEnabled {
atomic.StoreInt32(&aim.connectionFailures, 0)
}
// Calculate and update latency (end-to-end IPC transmission time)
latency := time.Since(startTime)
aim.updateLatencyMetrics(latency)
@ -215,6 +249,67 @@ func (aim *AudioInputIPCManager) updateLatencyMetrics(latency time.Duration) {
}
}
// checkConnectionHealth monitors the IPC connection health
func (aim *AudioInputIPCManager) checkConnectionHealth() {
now := time.Now()
// Check connection every 5 seconds
if now.Sub(aim.lastConnectionCheck) < 5*time.Second {
return
}
aim.lastConnectionCheck = now
// Check if supervisor and client are connected
if !aim.supervisor.IsConnected() {
aim.logger.Warn().Str("component", AudioInputIPCComponent).Msg("IPC connection lost, attempting recovery")
aim.handleConnectionFailure(fmt.Errorf("connection health check failed"))
}
}
// handleConnectionFailure manages connection failure recovery
func (aim *AudioInputIPCManager) handleConnectionFailure(err error) {
// Increment failure counter
failures := atomic.AddInt32(&aim.connectionFailures, 1)
// Prevent multiple concurrent recovery attempts
if !atomic.CompareAndSwapInt32(&aim.recoveryInProgress, 0, 1) {
return // Recovery already in progress
}
// Start recovery in a separate goroutine to avoid blocking audio processing
go func() {
defer atomic.StoreInt32(&aim.recoveryInProgress, 0)
aim.logger.Info().
Int32("failures", failures).
Err(err).
Str("component", AudioInputIPCComponent).
Msg("attempting IPC connection recovery")
// Stop and restart the supervisor to recover the connection
aim.supervisor.Stop()
// Brief delay before restart
time.Sleep(100 * time.Millisecond)
// Attempt to restart
if restartErr := aim.supervisor.Start(); restartErr != nil {
aim.logger.Error().
Err(restartErr).
Str("component", AudioInputIPCComponent).
Msg("failed to recover IPC connection")
} else {
aim.logger.Info().
Str("component", AudioInputIPCComponent).
Msg("IPC connection recovered successfully")
// Reset failure counter on successful recovery
atomic.StoreInt32(&aim.connectionFailures, 0)
}
}()
}
// GetDetailedMetrics returns comprehensive performance metrics
func (aim *AudioInputIPCManager) GetDetailedMetrics() (AudioInputMetrics, map[string]interface{}) {
metrics := aim.GetMetrics()

View File

@ -357,9 +357,12 @@ func (ais *AudioInputSupervisor) monitorSubprocess() {
// connectClient attempts to connect the client to the server
func (ais *AudioInputSupervisor) connectClient() {
// Wait briefly for the server to start (reduced from 500ms)
// Wait briefly for the server to start and create socket
time.Sleep(GetConfig().DefaultSleepDuration)
// Additional small delay to ensure socket is ready after restart
time.Sleep(20 * time.Millisecond)
err := ais.client.Connect()
if err != nil {
ais.logger.Error().Err(err).Msg("Failed to connect to audio input server")

View File

@ -1,7 +1,6 @@
package audio
import (
"context"
"encoding/binary"
"fmt"
"net"
@ -149,48 +148,38 @@ func WriteIPCMessage(conn net.Conn, msg IPCMessage, pool *GenericMessagePool, dr
binary.LittleEndian.PutUint32(optMsg.header[5:9], msg.GetLength())
binary.LittleEndian.PutUint64(optMsg.header[9:17], uint64(msg.GetTimestamp()))
// Use non-blocking write with timeout
ctx, cancel := context.WithTimeout(context.Background(), GetConfig().WriteTimeout)
defer cancel()
// Create a channel to signal write completion
done := make(chan error, 1)
go func() {
// Write header using pre-allocated buffer
_, err := conn.Write(optMsg.header[:])
if err != nil {
done <- err
return
// Set write deadline for timeout handling (more efficient than goroutines)
if deadline := time.Now().Add(GetConfig().WriteTimeout); deadline.After(time.Now()) {
if err := conn.SetWriteDeadline(deadline); err != nil {
// If we can't set deadline, proceed without it
// This maintains compatibility with connections that don't support deadlines
_ = err // Explicitly ignore error for linter
}
}
// Write data if present
if msg.GetLength() > 0 && msg.GetData() != nil {
_, err = conn.Write(msg.GetData())
if err != nil {
done <- err
return
}
// Write header using pre-allocated buffer (synchronous for better performance)
_, err := conn.Write(optMsg.header[:])
if err != nil {
if droppedFramesCounter != nil {
atomic.AddInt64(droppedFramesCounter, 1)
}
done <- nil
}()
return err
}
// Wait for completion or timeout
select {
case err := <-done:
// Write data if present
if msg.GetLength() > 0 && msg.GetData() != nil {
_, err = conn.Write(msg.GetData())
if err != nil {
if droppedFramesCounter != nil {
atomic.AddInt64(droppedFramesCounter, 1)
}
return err
}
return nil
case <-ctx.Done():
// Timeout occurred - drop frame to prevent blocking
if droppedFramesCounter != nil {
atomic.AddInt64(droppedFramesCounter, 1)
}
return fmt.Errorf("write timeout - frame dropped")
}
// Clear write deadline after successful write
_ = conn.SetWriteDeadline(time.Time{}) // Ignore error as this is cleanup
return nil
}
// Common connection acceptance with retry logic