diff --git a/internal/audio/input_supervisor.go b/internal/audio/input_supervisor.go index b582eab6..6fcd70c5 100644 --- a/internal/audio/input_supervisor.go +++ b/internal/audio/input_supervisor.go @@ -20,12 +20,6 @@ type AudioInputSupervisor struct { *BaseSupervisor client *AudioInputClient - // Channel management - stopChan chan struct{} - processDone chan struct{} - stopChanClosed bool // Track if stopChan is closed - processDoneClosed bool // Track if processDone is closed - // Environment variables for OPUS configuration opusEnv []string } @@ -35,8 +29,6 @@ func NewAudioInputSupervisor() *AudioInputSupervisor { return &AudioInputSupervisor{ BaseSupervisor: NewBaseSupervisor("audio-input-supervisor"), client: NewAudioInputClient(), - stopChan: make(chan struct{}), - processDone: make(chan struct{}), } } @@ -67,12 +59,7 @@ func (ais *AudioInputSupervisor) Start() error { ais.createContext() // Recreate channels in case they were closed by a previous Stop() call - ais.mutex.Lock() - ais.processDone = make(chan struct{}) - ais.stopChan = make(chan struct{}) - ais.stopChanClosed = false // Reset channel closed flag - ais.processDoneClosed = false // Reset channel closed flag - ais.mutex.Unlock() + ais.initializeChannels() // Start the supervision loop go ais.supervisionLoop() @@ -84,12 +71,7 @@ func (ais *AudioInputSupervisor) Start() error { // supervisionLoop is the main supervision loop func (ais *AudioInputSupervisor) supervisionLoop() { defer func() { - ais.mutex.Lock() - if !ais.processDoneClosed { - close(ais.processDone) - ais.processDoneClosed = true - } - ais.mutex.Unlock() + ais.closeProcessDone() ais.logger.Info().Msg("audio input server supervision ended") }() @@ -97,11 +79,11 @@ func (ais *AudioInputSupervisor) supervisionLoop() { select { case <-ais.stopChan: ais.logger.Info().Msg("received stop signal") - ais.terminateProcess() + ais.terminateProcess(GetConfig().InputSupervisorTimeout, "audio input server") return case <-ais.ctx.Done(): ais.logger.Info().Msg("context cancelled") - ais.terminateProcess() + ais.terminateProcess(GetConfig().InputSupervisorTimeout, "audio input server") return default: // Start the process @@ -111,7 +93,7 @@ func (ais *AudioInputSupervisor) supervisionLoop() { } // Wait for process to exit - ais.waitForProcessExit() + ais.waitForProcessExit("audio input server") return // Single run, no restart logic for now } } @@ -162,85 +144,6 @@ func (ais *AudioInputSupervisor) startProcess() error { return nil } -// waitForProcessExit waits for the current process to exit and logs the result -func (ais *AudioInputSupervisor) waitForProcessExit() { - ais.mutex.RLock() - cmd := ais.cmd - pid := ais.processPID - ais.mutex.RUnlock() - - if cmd == nil { - return - } - - // Wait for process to exit - err := cmd.Wait() - - ais.mutex.Lock() - ais.processPID = 0 - ais.mutex.Unlock() - - // Remove process from monitoring - ais.processMonitor.RemoveProcess(pid) - - if err != nil { - ais.logger.Error().Int("pid", pid).Err(err).Msg("audio input server process exited with error") - } else { - ais.logger.Info().Int("pid", pid).Msg("audio input server process exited gracefully") - } -} - -// terminateProcess gracefully terminates the current process -func (ais *AudioInputSupervisor) terminateProcess() { - ais.mutex.RLock() - cmd := ais.cmd - pid := ais.processPID - ais.mutex.RUnlock() - - if cmd == nil || cmd.Process == nil { - return - } - - ais.logger.Info().Int("pid", pid).Msg("terminating audio input server process") - - // Send SIGTERM first - if err := cmd.Process.Signal(syscall.SIGTERM); err != nil { - ais.logger.Warn().Err(err).Int("pid", pid).Msg("failed to send SIGTERM to audio input server process") - } - - // Wait for graceful shutdown - done := make(chan struct{}) - go func() { - _ = cmd.Wait() - close(done) - }() - - select { - case <-done: - ais.logger.Info().Int("pid", pid).Msg("audio input server process terminated gracefully") - case <-time.After(GetConfig().InputSupervisorTimeout): - ais.logger.Warn().Int("pid", pid).Msg("process did not terminate gracefully, sending SIGKILL") - ais.forceKillProcess() - } -} - -// forceKillProcess forcefully kills the current process -func (ais *AudioInputSupervisor) forceKillProcess() { - ais.mutex.RLock() - cmd := ais.cmd - pid := ais.processPID - ais.mutex.RUnlock() - - if cmd == nil || cmd.Process == nil { - return - } - - ais.logger.Warn().Int("pid", pid).Msg("force killing audio input server process") - if err := cmd.Process.Kill(); err != nil { - ais.logger.Error().Err(err).Int("pid", pid).Msg("failed to kill process") - } -} - // Stop gracefully stops the audio input server and supervisor func (ais *AudioInputSupervisor) Stop() { if !atomic.CompareAndSwapInt32(&ais.running, 1, 0) { @@ -255,12 +158,7 @@ func (ais *AudioInputSupervisor) Stop() { } // Signal stop and wait for cleanup - ais.mutex.Lock() - if !ais.stopChanClosed { - close(ais.stopChan) - ais.stopChanClosed = true - } - ais.mutex.Unlock() + ais.closeStopChan() ais.cancelContext() // Wait for process to exit @@ -269,7 +167,7 @@ func (ais *AudioInputSupervisor) Stop() { ais.logger.Info().Str("component", "audio-input-supervisor").Msg("component stopped gracefully") case <-time.After(GetConfig().InputSupervisorTimeout): ais.logger.Warn().Str("component", "audio-input-supervisor").Msg("component did not stop gracefully, forcing termination") - ais.forceKillProcess() + ais.forceKillProcess("audio input server") } ais.logger.Info().Str("component", "audio-input-supervisor").Msg("component stopped") diff --git a/internal/audio/mgmt_base_supervisor.go b/internal/audio/mgmt_base_supervisor.go index 8594055e..f163eb13 100644 --- a/internal/audio/mgmt_base_supervisor.go +++ b/internal/audio/mgmt_base_supervisor.go @@ -8,6 +8,7 @@ import ( "os/exec" "sync" "sync/atomic" + "syscall" "time" "github.com/jetkvm/kvm/internal/logging" @@ -32,6 +33,12 @@ type BaseSupervisor struct { // Exit tracking lastExitCode int lastExitTime time.Time + + // Channel management + stopChan chan struct{} + processDone chan struct{} + stopChanClosed bool + processDoneClosed bool } // NewBaseSupervisor creates a new base supervisor @@ -40,6 +47,8 @@ func NewBaseSupervisor(componentName string) *BaseSupervisor { return &BaseSupervisor{ logger: &logger, processMonitor: GetProcessMonitor(), + stopChan: make(chan struct{}), + processDone: make(chan struct{}), } } @@ -83,3 +92,130 @@ func (bs *BaseSupervisor) cancelContext() { bs.cancel() } } + +// initializeChannels recreates channels for a new supervision cycle +func (bs *BaseSupervisor) initializeChannels() { + bs.mutex.Lock() + defer bs.mutex.Unlock() + + bs.stopChan = make(chan struct{}) + bs.processDone = make(chan struct{}) + bs.stopChanClosed = false + bs.processDoneClosed = false +} + +// closeStopChan safely closes the stop channel +func (bs *BaseSupervisor) closeStopChan() { + bs.mutex.Lock() + defer bs.mutex.Unlock() + + if !bs.stopChanClosed { + close(bs.stopChan) + bs.stopChanClosed = true + } +} + +// closeProcessDone safely closes the process done channel +func (bs *BaseSupervisor) closeProcessDone() { + bs.mutex.Lock() + defer bs.mutex.Unlock() + + if !bs.processDoneClosed { + close(bs.processDone) + bs.processDoneClosed = true + } +} + +// terminateProcess gracefully terminates the current process with configurable timeout +func (bs *BaseSupervisor) terminateProcess(timeout time.Duration, processType string) { + bs.mutex.RLock() + cmd := bs.cmd + pid := bs.processPID + bs.mutex.RUnlock() + + if cmd == nil || cmd.Process == nil { + return + } + + bs.logger.Info().Int("pid", pid).Msgf("terminating %s process", processType) + + // Send SIGTERM first + if err := cmd.Process.Signal(syscall.SIGTERM); err != nil { + bs.logger.Warn().Err(err).Int("pid", pid).Msgf("failed to send SIGTERM to %s process", processType) + } + + // Wait for graceful shutdown + done := make(chan struct{}) + go func() { + _ = cmd.Wait() + close(done) + }() + + select { + case <-done: + bs.logger.Info().Int("pid", pid).Msgf("%s process terminated gracefully", processType) + case <-time.After(timeout): + bs.logger.Warn().Int("pid", pid).Msg("process did not terminate gracefully, sending SIGKILL") + bs.forceKillProcess(processType) + } +} + +// forceKillProcess forcefully kills the current process +func (bs *BaseSupervisor) forceKillProcess(processType string) { + bs.mutex.RLock() + cmd := bs.cmd + pid := bs.processPID + bs.mutex.RUnlock() + + if cmd == nil || cmd.Process == nil { + return + } + + bs.logger.Warn().Int("pid", pid).Msgf("force killing %s process", processType) + if err := cmd.Process.Kill(); err != nil { + bs.logger.Error().Err(err).Int("pid", pid).Msg("failed to kill process") + } +} + +// waitForProcessExit waits for the current process to exit and logs the result +func (bs *BaseSupervisor) waitForProcessExit(processType string) { + bs.mutex.RLock() + cmd := bs.cmd + pid := bs.processPID + bs.mutex.RUnlock() + + if cmd == nil { + return + } + + // Wait for process to exit + err := cmd.Wait() + + bs.mutex.Lock() + bs.lastExitTime = time.Now() + bs.processPID = 0 + + var exitCode int + if err != nil { + if exitError, ok := err.(*exec.ExitError); ok { + exitCode = exitError.ExitCode() + } else { + // Process was killed or other error + exitCode = -1 + } + } else { + exitCode = 0 + } + + bs.lastExitCode = exitCode + bs.mutex.Unlock() + + // Remove process from monitoring + bs.processMonitor.RemoveProcess(pid) + + if exitCode != 0 { + bs.logger.Error().Int("pid", pid).Int("exit_code", exitCode).Msgf("%s process exited with error", processType) + } else { + bs.logger.Info().Int("pid", pid).Msgf("%s process exited gracefully", processType) + } +} diff --git a/internal/audio/output_supervisor.go b/internal/audio/output_supervisor.go index 9079a98f..31cdac10 100644 --- a/internal/audio/output_supervisor.go +++ b/internal/audio/output_supervisor.go @@ -9,7 +9,6 @@ import ( "os/exec" "strconv" "sync/atomic" - "syscall" "time" "github.com/rs/zerolog" @@ -44,12 +43,6 @@ type AudioOutputSupervisor struct { // Restart management restartAttempts []time.Time - // Channel management - stopChan chan struct{} - processDone chan struct{} - stopChanClosed bool // Track if stopChan is closed - processDoneClosed bool // Track if processDone is closed - // Environment variables for OPUS configuration opusEnv []string @@ -64,8 +57,6 @@ func NewAudioOutputSupervisor() *AudioOutputSupervisor { return &AudioOutputSupervisor{ BaseSupervisor: NewBaseSupervisor("audio-output-supervisor"), restartAttempts: make([]time.Time, 0), - stopChan: make(chan struct{}), - processDone: make(chan struct{}), } } @@ -110,12 +101,10 @@ func (s *AudioOutputSupervisor) Start() error { s.createContext() // Recreate channels in case they were closed by a previous Stop() call - s.mutex.Lock() - s.processDone = make(chan struct{}) - s.stopChan = make(chan struct{}) - s.stopChanClosed = false // Reset channel closed flag - s.processDoneClosed = false // Reset channel closed flag + s.initializeChannels() + // Reset restart tracking on start + s.mutex.Lock() s.restartAttempts = s.restartAttempts[:0] s.mutex.Unlock() @@ -135,12 +124,7 @@ func (s *AudioOutputSupervisor) Stop() { s.logSupervisorStop() // Signal stop and wait for cleanup - s.mutex.Lock() - if !s.stopChanClosed { - close(s.stopChan) - s.stopChanClosed = true - } - s.mutex.Unlock() + s.closeStopChan() s.cancelContext() // Wait for process to exit @@ -149,7 +133,7 @@ func (s *AudioOutputSupervisor) Stop() { s.logger.Info().Str("component", AudioOutputSupervisorComponent).Msg("component stopped gracefully") case <-time.After(GetConfig().OutputSupervisorTimeout): s.logger.Warn().Str("component", AudioOutputSupervisorComponent).Msg("component did not stop gracefully, forcing termination") - s.forceKillProcess() + s.forceKillProcess("audio output server") } s.logger.Info().Str("component", AudioOutputSupervisorComponent).Msg("component stopped") @@ -158,12 +142,7 @@ func (s *AudioOutputSupervisor) Stop() { // supervisionLoop is the main supervision loop func (s *AudioOutputSupervisor) supervisionLoop() { defer func() { - s.mutex.Lock() - if !s.processDoneClosed { - close(s.processDone) - s.processDoneClosed = true - } - s.mutex.Unlock() + s.closeProcessDone() s.logger.Info().Msg("audio server supervision ended") }() @@ -171,11 +150,11 @@ func (s *AudioOutputSupervisor) supervisionLoop() { select { case <-s.stopChan: s.logger.Info().Msg("received stop signal") - s.terminateProcess() + s.terminateProcess(GetConfig().OutputSupervisorTimeout, "audio output server") return case <-s.ctx.Done(): s.logger.Info().Msg("context cancelled") - s.terminateProcess() + s.terminateProcess(GetConfig().OutputSupervisorTimeout, "audio output server") return default: // Start or restart the process @@ -282,52 +261,23 @@ func (s *AudioOutputSupervisor) startProcess() error { return nil } -// waitForProcessExit waits for the current process to exit and logs the result +// waitForProcessExit waits for the current process to exit and handles restart logic func (s *AudioOutputSupervisor) waitForProcessExit() { s.mutex.RLock() - cmd := s.cmd pid := s.processPID s.mutex.RUnlock() - if cmd == nil { - return - } + // Use base supervisor's waitForProcessExit + s.BaseSupervisor.waitForProcessExit("audio output server") - // Wait for process to exit - err := cmd.Wait() - - s.mutex.Lock() - s.lastExitTime = time.Now() - s.processPID = 0 - - var exitCode int - var crashed bool - - if err != nil { - if exitError, ok := err.(*exec.ExitError); ok { - exitCode = exitError.ExitCode() - crashed = exitCode != 0 - } else { - // Process was killed or other error - exitCode = -1 - crashed = true - } - } else { - exitCode = 0 - crashed = false - } - - s.lastExitCode = exitCode - s.mutex.Unlock() - - // Remove process from monitoring - s.processMonitor.RemoveProcess(pid) + // Handle output-specific logic (restart tracking and callbacks) + s.mutex.RLock() + exitCode := s.lastExitCode + s.mutex.RUnlock() + crashed := exitCode != 0 if crashed { - s.logger.Error().Int("pid", pid).Int("exit_code", exitCode).Msg("audio output server process crashed") s.recordRestartAttempt() - } else { - s.logger.Info().Int("pid", pid).Msg("audio output server process exited gracefully") } if s.onProcessExit != nil { @@ -335,57 +285,6 @@ func (s *AudioOutputSupervisor) waitForProcessExit() { } } -// terminateProcess gracefully terminates the current process -func (s *AudioOutputSupervisor) terminateProcess() { - s.mutex.RLock() - cmd := s.cmd - pid := s.processPID - s.mutex.RUnlock() - - if cmd == nil || cmd.Process == nil { - return - } - - s.logger.Info().Int("pid", pid).Msg("terminating audio output server process") - - // Send SIGTERM first - if err := cmd.Process.Signal(syscall.SIGTERM); err != nil { - s.logger.Warn().Err(err).Int("pid", pid).Msg("failed to send SIGTERM to audio output server process") - } - - // Wait for graceful shutdown - done := make(chan struct{}) - go func() { - _ = cmd.Wait() - close(done) - }() - - select { - case <-done: - s.logger.Info().Int("pid", pid).Msg("audio server process terminated gracefully") - case <-time.After(GetConfig().OutputSupervisorTimeout): - s.logger.Warn().Int("pid", pid).Msg("process did not terminate gracefully, sending SIGKILL") - s.forceKillProcess() - } -} - -// forceKillProcess forcefully kills the current process -func (s *AudioOutputSupervisor) forceKillProcess() { - s.mutex.RLock() - cmd := s.cmd - pid := s.processPID - s.mutex.RUnlock() - - if cmd == nil || cmd.Process == nil { - return - } - - s.logger.Warn().Int("pid", pid).Msg("force killing audio server process") - if err := cmd.Process.Kill(); err != nil { - s.logger.Error().Err(err).Int("pid", pid).Msg("failed to kill process") - } -} - // shouldRestart determines if the process should be restarted func (s *AudioOutputSupervisor) shouldRestart() bool { if atomic.LoadInt32(&s.running) == 0 {