diff --git a/internal/audio/input_supervisor.go b/internal/audio/input_supervisor.go index 0d5f6b42..b582eab6 100644 --- a/internal/audio/input_supervisor.go +++ b/internal/audio/input_supervisor.go @@ -10,6 +10,7 @@ import ( "path/filepath" "strconv" "strings" + "sync/atomic" "syscall" "time" ) @@ -19,6 +20,12 @@ type AudioInputSupervisor struct { *BaseSupervisor client *AudioInputClient + // Channel management + stopChan chan struct{} + processDone chan struct{} + stopChanClosed bool // Track if stopChan is closed + processDoneClosed bool // Track if processDone is closed + // Environment variables for OPUS configuration opusEnv []string } @@ -28,6 +35,8 @@ func NewAudioInputSupervisor() *AudioInputSupervisor { return &AudioInputSupervisor{ BaseSupervisor: NewBaseSupervisor("audio-input-supervisor"), client: NewAudioInputClient(), + stopChan: make(chan struct{}), + processDone: make(chan struct{}), } } @@ -48,71 +57,104 @@ func (ais *AudioInputSupervisor) SetOpusConfig(bitrate, complexity, vbr, signalT } } -// Start starts the audio input server subprocess +// Start begins supervising the audio input server process func (ais *AudioInputSupervisor) Start() error { - ais.mutex.Lock() - defer ais.mutex.Unlock() - - if ais.IsRunning() { - if ais.cmd != nil && ais.cmd.Process != nil { - return fmt.Errorf("audio input supervisor already running with PID %d", ais.cmd.Process.Pid) - } - return fmt.Errorf("audio input supervisor already running") + if !atomic.CompareAndSwapInt32(&ais.running, 0, 1) { + return fmt.Errorf("audio input supervisor is already running") } - // Check for existing audio input server process - if existingPID, err := ais.findExistingAudioInputProcess(); err == nil { - ais.logger.Info().Int("existing_pid", existingPID).Msg("Found existing audio input server process, connecting to it") - - // Try to connect to the existing process - ais.setRunning(true) - go ais.connectClient() - return nil - } - - // Create context for subprocess management + ais.logSupervisorStart() ais.createContext() - // Get current executable path + // Recreate channels in case they were closed by a previous Stop() call + ais.mutex.Lock() + ais.processDone = make(chan struct{}) + ais.stopChan = make(chan struct{}) + ais.stopChanClosed = false // Reset channel closed flag + ais.processDoneClosed = false // Reset channel closed flag + ais.mutex.Unlock() + + // Start the supervision loop + go ais.supervisionLoop() + + ais.logger.Info().Str("component", "audio-input-supervisor").Msg("component started successfully") + return nil +} + +// supervisionLoop is the main supervision loop +func (ais *AudioInputSupervisor) supervisionLoop() { + defer func() { + ais.mutex.Lock() + if !ais.processDoneClosed { + close(ais.processDone) + ais.processDoneClosed = true + } + ais.mutex.Unlock() + ais.logger.Info().Msg("audio input server supervision ended") + }() + + for atomic.LoadInt32(&ais.running) == 1 { + select { + case <-ais.stopChan: + ais.logger.Info().Msg("received stop signal") + ais.terminateProcess() + return + case <-ais.ctx.Done(): + ais.logger.Info().Msg("context cancelled") + ais.terminateProcess() + return + default: + // Start the process + if err := ais.startProcess(); err != nil { + ais.logger.Error().Err(err).Msg("failed to start audio input server process") + return + } + + // Wait for process to exit + ais.waitForProcessExit() + return // Single run, no restart logic for now + } + } +} + +// startProcess starts the audio input server process +func (ais *AudioInputSupervisor) startProcess() error { execPath, err := os.Executable() if err != nil { return fmt.Errorf("failed to get executable path: %w", err) } + ais.mutex.Lock() + defer ais.mutex.Unlock() + // Build command arguments (only subprocess flag) args := []string{"--audio-input-server"} - // Create command for audio input server subprocess - cmd := exec.CommandContext(ais.ctx, execPath, args...) + // Create new command + ais.cmd = exec.CommandContext(ais.ctx, execPath, args...) + ais.cmd.Stdout = os.Stdout + ais.cmd.Stderr = os.Stderr // Set environment variables for IPC and OPUS configuration env := append(os.Environ(), "JETKVM_AUDIO_INPUT_IPC=true") // Enable IPC mode env = append(env, ais.opusEnv...) // Add OPUS configuration - cmd.Env = env + ais.cmd.Env = env // Set process group to allow clean termination - cmd.SysProcAttr = &syscall.SysProcAttr{ + ais.cmd.SysProcAttr = &syscall.SysProcAttr{ Setpgid: true, } - ais.cmd = cmd - ais.setRunning(true) - - // Start the subprocess - err = cmd.Start() - if err != nil { - ais.setRunning(false) - ais.cancelContext() + // Start the process + if err := ais.cmd.Start(); err != nil { return fmt.Errorf("failed to start audio input server process: %w", err) } - ais.logger.Info().Int("pid", cmd.Process.Pid).Strs("args", args).Strs("opus_env", ais.opusEnv).Msg("Audio input server subprocess started") + ais.processPID = ais.cmd.Process.Pid + ais.logger.Info().Int("pid", ais.processPID).Strs("args", args).Strs("opus_env", ais.opusEnv).Msg("audio input server process started") // Add process to monitoring - ais.processMonitor.AddProcess(cmd.Process.Pid, "audio-input-server") - - // Monitor the subprocess in a goroutine - go ais.monitorSubprocess() + ais.processMonitor.AddProcess(ais.processPID, "audio-input-server") // Connect client to the server go ais.connectClient() @@ -120,15 +162,91 @@ func (ais *AudioInputSupervisor) Start() error { return nil } -// Stop stops the audio input server subprocess -func (ais *AudioInputSupervisor) Stop() { - ais.mutex.Lock() - defer ais.mutex.Unlock() +// waitForProcessExit waits for the current process to exit and logs the result +func (ais *AudioInputSupervisor) waitForProcessExit() { + ais.mutex.RLock() + cmd := ais.cmd + pid := ais.processPID + ais.mutex.RUnlock() - if !ais.IsRunning() { + if cmd == nil { return } + // Wait for process to exit + err := cmd.Wait() + + ais.mutex.Lock() + ais.processPID = 0 + ais.mutex.Unlock() + + // Remove process from monitoring + ais.processMonitor.RemoveProcess(pid) + + if err != nil { + ais.logger.Error().Int("pid", pid).Err(err).Msg("audio input server process exited with error") + } else { + ais.logger.Info().Int("pid", pid).Msg("audio input server process exited gracefully") + } +} + +// terminateProcess gracefully terminates the current process +func (ais *AudioInputSupervisor) terminateProcess() { + ais.mutex.RLock() + cmd := ais.cmd + pid := ais.processPID + ais.mutex.RUnlock() + + if cmd == nil || cmd.Process == nil { + return + } + + ais.logger.Info().Int("pid", pid).Msg("terminating audio input server process") + + // Send SIGTERM first + if err := cmd.Process.Signal(syscall.SIGTERM); err != nil { + ais.logger.Warn().Err(err).Int("pid", pid).Msg("failed to send SIGTERM to audio input server process") + } + + // Wait for graceful shutdown + done := make(chan struct{}) + go func() { + _ = cmd.Wait() + close(done) + }() + + select { + case <-done: + ais.logger.Info().Int("pid", pid).Msg("audio input server process terminated gracefully") + case <-time.After(GetConfig().InputSupervisorTimeout): + ais.logger.Warn().Int("pid", pid).Msg("process did not terminate gracefully, sending SIGKILL") + ais.forceKillProcess() + } +} + +// forceKillProcess forcefully kills the current process +func (ais *AudioInputSupervisor) forceKillProcess() { + ais.mutex.RLock() + cmd := ais.cmd + pid := ais.processPID + ais.mutex.RUnlock() + + if cmd == nil || cmd.Process == nil { + return + } + + ais.logger.Warn().Int("pid", pid).Msg("force killing audio input server process") + if err := cmd.Process.Kill(); err != nil { + ais.logger.Error().Err(err).Int("pid", pid).Msg("failed to kill process") + } +} + +// Stop gracefully stops the audio input server and supervisor +func (ais *AudioInputSupervisor) Stop() { + if !atomic.CompareAndSwapInt32(&ais.running, 1, 0) { + return // Already stopped + } + ais.logSupervisorStop() // Disconnect client first @@ -136,71 +254,25 @@ func (ais *AudioInputSupervisor) Stop() { ais.client.Disconnect() } - // Cancel context to signal subprocess to stop + // Signal stop and wait for cleanup + ais.mutex.Lock() + if !ais.stopChanClosed { + close(ais.stopChan) + ais.stopChanClosed = true + } + ais.mutex.Unlock() ais.cancelContext() - // Try graceful termination first - if ais.cmd != nil && ais.cmd.Process != nil { - pid := ais.cmd.Process.Pid - ais.logger.Info().Int("pid", pid).Msg("Stopping audio input server subprocess") - - // Send SIGTERM - err := ais.cmd.Process.Signal(syscall.SIGTERM) - if err != nil { - ais.logger.Warn().Err(err).Msg("Failed to send SIGTERM to audio input server") - } - - // Wait for graceful shutdown with timeout - done := make(chan error, 1) - var waitErr error - go func() { - waitErr = ais.cmd.Wait() - done <- waitErr - }() - - select { - case <-done: - if waitErr != nil { - ais.logger.Info().Err(waitErr).Msg("Audio input server subprocess stopped with error") - } else { - ais.logger.Info().Msg("Audio input server subprocess stopped gracefully") - } - case <-time.After(GetConfig().InputSupervisorTimeout): - // Force kill if graceful shutdown failed - ais.logger.Warn().Msg("Audio input server subprocess did not stop gracefully, force killing") - // Use a more robust approach to check if process is still alive - if ais.cmd != nil && ais.cmd.Process != nil { - // Try to send signal 0 to check if process exists - if err := ais.cmd.Process.Signal(syscall.Signal(0)); err == nil { - // Process is still alive, force kill it - if killErr := ais.cmd.Process.Kill(); killErr != nil { - // Only log error if it's not "process already finished" - if !strings.Contains(killErr.Error(), "process already finished") { - ais.logger.Error().Err(killErr).Msg("Failed to kill audio input server subprocess") - } else { - ais.logger.Debug().Msg("Audio input server subprocess already finished during kill attempt") - } - } else { - ais.logger.Info().Msg("Audio input server subprocess force killed successfully") - } - } else { - ais.logger.Debug().Msg("Audio input server subprocess already finished") - } - // Wait a bit for the kill to take effect and collect the exit status - go func() { - select { - case <-done: - // Process finished - case <-time.After(1 * time.Second): - // Give up waiting - } - }() - } - } + // Wait for process to exit + select { + case <-ais.processDone: + ais.logger.Info().Str("component", "audio-input-supervisor").Msg("component stopped gracefully") + case <-time.After(GetConfig().InputSupervisorTimeout): + ais.logger.Warn().Str("component", "audio-input-supervisor").Msg("component did not stop gracefully, forcing termination") + ais.forceKillProcess() } - ais.setRunning(false) - ais.cmd = nil + ais.logger.Info().Str("component", "audio-input-supervisor").Msg("component stopped") } // IsConnected returns whether the client is connected to the audio input server @@ -218,42 +290,6 @@ func (ais *AudioInputSupervisor) GetClient() *AudioInputClient { return ais.client } -// monitorSubprocess monitors the subprocess and handles unexpected exits -func (ais *AudioInputSupervisor) monitorSubprocess() { - if ais.cmd == nil || ais.cmd.Process == nil { - return - } - - pid := ais.cmd.Process.Pid - err := ais.cmd.Wait() - - // Remove process from monitoring - ais.processMonitor.RemoveProcess(pid) - - ais.mutex.Lock() - defer ais.mutex.Unlock() - - if ais.IsRunning() { - // Unexpected exit - if err != nil { - ais.logger.Error().Err(err).Int("pid", pid).Msg("Audio input server subprocess exited unexpectedly") - } else { - ais.logger.Warn().Int("pid", pid).Msg("Audio input server subprocess exited unexpectedly") - } - - // Disconnect client - if ais.client != nil { - ais.client.Disconnect() - } - - // Mark as not running first to prevent race conditions - ais.setRunning(false) - ais.cmd = nil - - ais.logger.Info().Int("pid", pid).Msg("Audio input server subprocess monitoring stopped") - } -} - // connectClient attempts to connect the client to the server func (ais *AudioInputSupervisor) connectClient() { // Wait briefly for the server to start and create socket diff --git a/internal/audio/ipc_input.go b/internal/audio/ipc_input.go index dfc05389..cac1dedf 100644 --- a/internal/audio/ipc_input.go +++ b/internal/audio/ipc_input.go @@ -884,82 +884,83 @@ func (ais *AudioInputServer) startReaderGoroutine() { logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger() - for { - select { - case <-ais.stopChan: - return - default: - if ais.conn != nil { - msg, err := ais.readMessage(ais.conn) - if err != nil { - // Enhanced error handling with progressive backoff - now := time.Now() + for ais.running { + ais.mtx.Lock() + conn := ais.conn + ais.mtx.Unlock() - // Reset error counter if enough time has passed - if now.Sub(lastErrorTime) > errorResetWindow { - consecutiveErrors = 0 - } + if conn == nil { + time.Sleep(10 * time.Millisecond) + continue + } - consecutiveErrors++ - lastErrorTime = now + msg, err := ais.readMessage(conn) + if err != nil { + if ais.running { + // Enhanced error handling with progressive backoff + now := time.Now() - // Skip logging in hotpath for performance - only log critical errors - - // Progressive backoff based on error count - if consecutiveErrors > 1 { - backoffDelay := time.Duration(consecutiveErrors-1) * baseBackoffDelay - if backoffDelay > maxBackoffDelay { - backoffDelay = maxBackoffDelay - } - time.Sleep(backoffDelay) - } - - // If too many consecutive errors, close connection to force reconnect - if consecutiveErrors >= maxConsecutiveErrors { - // Only log critical errors to reduce hotpath overhead - if logger.GetLevel() <= zerolog.ErrorLevel { - logger.Error(). - Int("consecutive_errors", consecutiveErrors). - Msg("Too many consecutive read errors, closing connection") - } - - ais.mtx.Lock() - if ais.conn != nil { - ais.conn.Close() - ais.conn = nil - } - ais.mtx.Unlock() - - consecutiveErrors = 0 // Reset for next connection - } - continue - } - - // Reset error counter on successful read - if consecutiveErrors > 0 { + // Reset error counter if enough time has passed + if now.Sub(lastErrorTime) > errorResetWindow { consecutiveErrors = 0 - // Only log recovery info if debug level enabled to reduce overhead - if logger.GetLevel() <= zerolog.InfoLevel { - logger.Info().Msg("Input connection recovered") - } } - // Send to message channel with non-blocking write - select { - case ais.messageChan <- msg: - atomic.AddInt64(&ais.totalFrames, 1) - default: - // Channel full, drop message - atomic.AddInt64(&ais.droppedFrames, 1) - // Avoid sampling logic in critical path - only log if warn level enabled - if logger.GetLevel() <= zerolog.WarnLevel { - droppedCount := atomic.LoadInt64(&ais.droppedFrames) - logger.Warn().Int64("total_dropped", droppedCount).Msg("Message channel full, dropping frame") + consecutiveErrors++ + lastErrorTime = now + + // Skip logging in hotpath for performance - only log critical errors + + // Progressive backoff based on error count + if consecutiveErrors > 1 { + backoffDelay := time.Duration(consecutiveErrors-1) * baseBackoffDelay + if backoffDelay > maxBackoffDelay { + backoffDelay = maxBackoffDelay } + time.Sleep(backoffDelay) } - } else { - // No connection, wait briefly before checking again - time.Sleep(GetConfig().DefaultSleepDuration) + + // If too many consecutive errors, close connection to force reconnect + if consecutiveErrors >= maxConsecutiveErrors { + // Only log critical errors to reduce hotpath overhead + if logger.GetLevel() <= zerolog.ErrorLevel { + logger.Error(). + Int("consecutive_errors", consecutiveErrors). + Msg("Too many consecutive read errors, closing connection") + } + + ais.mtx.Lock() + if ais.conn != nil { + ais.conn.Close() + ais.conn = nil + } + ais.mtx.Unlock() + + consecutiveErrors = 0 // Reset for next connection + } + } + continue + } + + // Reset error counter on successful read + if consecutiveErrors > 0 { + consecutiveErrors = 0 + // Only log recovery info if debug level enabled to reduce overhead + if logger.GetLevel() <= zerolog.InfoLevel { + logger.Info().Msg("Input connection recovered") + } + } + + // Send to message channel with non-blocking write + select { + case ais.messageChan <- msg: + atomic.AddInt64(&ais.totalFrames, 1) + default: + // Channel full, drop message + atomic.AddInt64(&ais.droppedFrames, 1) + // Avoid sampling logic in critical path - only log if warn level enabled + if logger.GetLevel() <= zerolog.WarnLevel { + droppedCount := atomic.LoadInt64(&ais.droppedFrames) + logger.Warn().Int64("total_dropped", droppedCount).Msg("Message channel full, dropping frame") } } } diff --git a/internal/audio/mgmt_base_supervisor.go b/internal/audio/mgmt_base_supervisor.go index 3a8c499d..8594055e 100644 --- a/internal/audio/mgmt_base_supervisor.go +++ b/internal/audio/mgmt_base_supervisor.go @@ -48,15 +48,6 @@ func (bs *BaseSupervisor) IsRunning() bool { return atomic.LoadInt32(&bs.running) == 1 } -// setRunning atomically sets the running state -func (bs *BaseSupervisor) setRunning(running bool) { - if running { - atomic.StoreInt32(&bs.running, 1) - } else { - atomic.StoreInt32(&bs.running, 0) - } -} - // GetProcessPID returns the current process PID func (bs *BaseSupervisor) GetProcessPID() int { bs.mutex.RLock()