refactor(audio): move channel and process management to base supervisor

Consolidate duplicate channel and process management code from input/output supervisors into BaseSupervisor
Add new methods for channel initialization and cleanup
Standardize process termination and monitoring behavior
This commit is contained in:
Alex P 2025-09-07 20:14:33 +00:00
parent 96a6a0f8f9
commit a2a87b46b8
3 changed files with 159 additions and 226 deletions

View File

@ -20,12 +20,6 @@ type AudioInputSupervisor struct {
*BaseSupervisor
client *AudioInputClient
// Channel management
stopChan chan struct{}
processDone chan struct{}
stopChanClosed bool // Track if stopChan is closed
processDoneClosed bool // Track if processDone is closed
// Environment variables for OPUS configuration
opusEnv []string
}
@ -35,8 +29,6 @@ func NewAudioInputSupervisor() *AudioInputSupervisor {
return &AudioInputSupervisor{
BaseSupervisor: NewBaseSupervisor("audio-input-supervisor"),
client: NewAudioInputClient(),
stopChan: make(chan struct{}),
processDone: make(chan struct{}),
}
}
@ -67,12 +59,7 @@ func (ais *AudioInputSupervisor) Start() error {
ais.createContext()
// Recreate channels in case they were closed by a previous Stop() call
ais.mutex.Lock()
ais.processDone = make(chan struct{})
ais.stopChan = make(chan struct{})
ais.stopChanClosed = false // Reset channel closed flag
ais.processDoneClosed = false // Reset channel closed flag
ais.mutex.Unlock()
ais.initializeChannels()
// Start the supervision loop
go ais.supervisionLoop()
@ -84,12 +71,7 @@ func (ais *AudioInputSupervisor) Start() error {
// supervisionLoop is the main supervision loop
func (ais *AudioInputSupervisor) supervisionLoop() {
defer func() {
ais.mutex.Lock()
if !ais.processDoneClosed {
close(ais.processDone)
ais.processDoneClosed = true
}
ais.mutex.Unlock()
ais.closeProcessDone()
ais.logger.Info().Msg("audio input server supervision ended")
}()
@ -97,11 +79,11 @@ func (ais *AudioInputSupervisor) supervisionLoop() {
select {
case <-ais.stopChan:
ais.logger.Info().Msg("received stop signal")
ais.terminateProcess()
ais.terminateProcess(GetConfig().InputSupervisorTimeout, "audio input server")
return
case <-ais.ctx.Done():
ais.logger.Info().Msg("context cancelled")
ais.terminateProcess()
ais.terminateProcess(GetConfig().InputSupervisorTimeout, "audio input server")
return
default:
// Start the process
@ -111,7 +93,7 @@ func (ais *AudioInputSupervisor) supervisionLoop() {
}
// Wait for process to exit
ais.waitForProcessExit()
ais.waitForProcessExit("audio input server")
return // Single run, no restart logic for now
}
}
@ -162,85 +144,6 @@ func (ais *AudioInputSupervisor) startProcess() error {
return nil
}
// waitForProcessExit waits for the current process to exit and logs the result
func (ais *AudioInputSupervisor) waitForProcessExit() {
ais.mutex.RLock()
cmd := ais.cmd
pid := ais.processPID
ais.mutex.RUnlock()
if cmd == nil {
return
}
// Wait for process to exit
err := cmd.Wait()
ais.mutex.Lock()
ais.processPID = 0
ais.mutex.Unlock()
// Remove process from monitoring
ais.processMonitor.RemoveProcess(pid)
if err != nil {
ais.logger.Error().Int("pid", pid).Err(err).Msg("audio input server process exited with error")
} else {
ais.logger.Info().Int("pid", pid).Msg("audio input server process exited gracefully")
}
}
// terminateProcess gracefully terminates the current process
func (ais *AudioInputSupervisor) terminateProcess() {
ais.mutex.RLock()
cmd := ais.cmd
pid := ais.processPID
ais.mutex.RUnlock()
if cmd == nil || cmd.Process == nil {
return
}
ais.logger.Info().Int("pid", pid).Msg("terminating audio input server process")
// Send SIGTERM first
if err := cmd.Process.Signal(syscall.SIGTERM); err != nil {
ais.logger.Warn().Err(err).Int("pid", pid).Msg("failed to send SIGTERM to audio input server process")
}
// Wait for graceful shutdown
done := make(chan struct{})
go func() {
_ = cmd.Wait()
close(done)
}()
select {
case <-done:
ais.logger.Info().Int("pid", pid).Msg("audio input server process terminated gracefully")
case <-time.After(GetConfig().InputSupervisorTimeout):
ais.logger.Warn().Int("pid", pid).Msg("process did not terminate gracefully, sending SIGKILL")
ais.forceKillProcess()
}
}
// forceKillProcess forcefully kills the current process
func (ais *AudioInputSupervisor) forceKillProcess() {
ais.mutex.RLock()
cmd := ais.cmd
pid := ais.processPID
ais.mutex.RUnlock()
if cmd == nil || cmd.Process == nil {
return
}
ais.logger.Warn().Int("pid", pid).Msg("force killing audio input server process")
if err := cmd.Process.Kill(); err != nil {
ais.logger.Error().Err(err).Int("pid", pid).Msg("failed to kill process")
}
}
// Stop gracefully stops the audio input server and supervisor
func (ais *AudioInputSupervisor) Stop() {
if !atomic.CompareAndSwapInt32(&ais.running, 1, 0) {
@ -255,12 +158,7 @@ func (ais *AudioInputSupervisor) Stop() {
}
// Signal stop and wait for cleanup
ais.mutex.Lock()
if !ais.stopChanClosed {
close(ais.stopChan)
ais.stopChanClosed = true
}
ais.mutex.Unlock()
ais.closeStopChan()
ais.cancelContext()
// Wait for process to exit
@ -269,7 +167,7 @@ func (ais *AudioInputSupervisor) Stop() {
ais.logger.Info().Str("component", "audio-input-supervisor").Msg("component stopped gracefully")
case <-time.After(GetConfig().InputSupervisorTimeout):
ais.logger.Warn().Str("component", "audio-input-supervisor").Msg("component did not stop gracefully, forcing termination")
ais.forceKillProcess()
ais.forceKillProcess("audio input server")
}
ais.logger.Info().Str("component", "audio-input-supervisor").Msg("component stopped")

View File

@ -8,6 +8,7 @@ import (
"os/exec"
"sync"
"sync/atomic"
"syscall"
"time"
"github.com/jetkvm/kvm/internal/logging"
@ -32,6 +33,12 @@ type BaseSupervisor struct {
// Exit tracking
lastExitCode int
lastExitTime time.Time
// Channel management
stopChan chan struct{}
processDone chan struct{}
stopChanClosed bool
processDoneClosed bool
}
// NewBaseSupervisor creates a new base supervisor
@ -40,6 +47,8 @@ func NewBaseSupervisor(componentName string) *BaseSupervisor {
return &BaseSupervisor{
logger: &logger,
processMonitor: GetProcessMonitor(),
stopChan: make(chan struct{}),
processDone: make(chan struct{}),
}
}
@ -83,3 +92,130 @@ func (bs *BaseSupervisor) cancelContext() {
bs.cancel()
}
}
// initializeChannels recreates channels for a new supervision cycle
func (bs *BaseSupervisor) initializeChannels() {
bs.mutex.Lock()
defer bs.mutex.Unlock()
bs.stopChan = make(chan struct{})
bs.processDone = make(chan struct{})
bs.stopChanClosed = false
bs.processDoneClosed = false
}
// closeStopChan safely closes the stop channel
func (bs *BaseSupervisor) closeStopChan() {
bs.mutex.Lock()
defer bs.mutex.Unlock()
if !bs.stopChanClosed {
close(bs.stopChan)
bs.stopChanClosed = true
}
}
// closeProcessDone safely closes the process done channel
func (bs *BaseSupervisor) closeProcessDone() {
bs.mutex.Lock()
defer bs.mutex.Unlock()
if !bs.processDoneClosed {
close(bs.processDone)
bs.processDoneClosed = true
}
}
// terminateProcess gracefully terminates the current process with configurable timeout
func (bs *BaseSupervisor) terminateProcess(timeout time.Duration, processType string) {
bs.mutex.RLock()
cmd := bs.cmd
pid := bs.processPID
bs.mutex.RUnlock()
if cmd == nil || cmd.Process == nil {
return
}
bs.logger.Info().Int("pid", pid).Msgf("terminating %s process", processType)
// Send SIGTERM first
if err := cmd.Process.Signal(syscall.SIGTERM); err != nil {
bs.logger.Warn().Err(err).Int("pid", pid).Msgf("failed to send SIGTERM to %s process", processType)
}
// Wait for graceful shutdown
done := make(chan struct{})
go func() {
_ = cmd.Wait()
close(done)
}()
select {
case <-done:
bs.logger.Info().Int("pid", pid).Msgf("%s process terminated gracefully", processType)
case <-time.After(timeout):
bs.logger.Warn().Int("pid", pid).Msg("process did not terminate gracefully, sending SIGKILL")
bs.forceKillProcess(processType)
}
}
// forceKillProcess forcefully kills the current process
func (bs *BaseSupervisor) forceKillProcess(processType string) {
bs.mutex.RLock()
cmd := bs.cmd
pid := bs.processPID
bs.mutex.RUnlock()
if cmd == nil || cmd.Process == nil {
return
}
bs.logger.Warn().Int("pid", pid).Msgf("force killing %s process", processType)
if err := cmd.Process.Kill(); err != nil {
bs.logger.Error().Err(err).Int("pid", pid).Msg("failed to kill process")
}
}
// waitForProcessExit waits for the current process to exit and logs the result
func (bs *BaseSupervisor) waitForProcessExit(processType string) {
bs.mutex.RLock()
cmd := bs.cmd
pid := bs.processPID
bs.mutex.RUnlock()
if cmd == nil {
return
}
// Wait for process to exit
err := cmd.Wait()
bs.mutex.Lock()
bs.lastExitTime = time.Now()
bs.processPID = 0
var exitCode int
if err != nil {
if exitError, ok := err.(*exec.ExitError); ok {
exitCode = exitError.ExitCode()
} else {
// Process was killed or other error
exitCode = -1
}
} else {
exitCode = 0
}
bs.lastExitCode = exitCode
bs.mutex.Unlock()
// Remove process from monitoring
bs.processMonitor.RemoveProcess(pid)
if exitCode != 0 {
bs.logger.Error().Int("pid", pid).Int("exit_code", exitCode).Msgf("%s process exited with error", processType)
} else {
bs.logger.Info().Int("pid", pid).Msgf("%s process exited gracefully", processType)
}
}

View File

@ -9,7 +9,6 @@ import (
"os/exec"
"strconv"
"sync/atomic"
"syscall"
"time"
"github.com/rs/zerolog"
@ -44,12 +43,6 @@ type AudioOutputSupervisor struct {
// Restart management
restartAttempts []time.Time
// Channel management
stopChan chan struct{}
processDone chan struct{}
stopChanClosed bool // Track if stopChan is closed
processDoneClosed bool // Track if processDone is closed
// Environment variables for OPUS configuration
opusEnv []string
@ -64,8 +57,6 @@ func NewAudioOutputSupervisor() *AudioOutputSupervisor {
return &AudioOutputSupervisor{
BaseSupervisor: NewBaseSupervisor("audio-output-supervisor"),
restartAttempts: make([]time.Time, 0),
stopChan: make(chan struct{}),
processDone: make(chan struct{}),
}
}
@ -110,12 +101,10 @@ func (s *AudioOutputSupervisor) Start() error {
s.createContext()
// Recreate channels in case they were closed by a previous Stop() call
s.mutex.Lock()
s.processDone = make(chan struct{})
s.stopChan = make(chan struct{})
s.stopChanClosed = false // Reset channel closed flag
s.processDoneClosed = false // Reset channel closed flag
s.initializeChannels()
// Reset restart tracking on start
s.mutex.Lock()
s.restartAttempts = s.restartAttempts[:0]
s.mutex.Unlock()
@ -135,12 +124,7 @@ func (s *AudioOutputSupervisor) Stop() {
s.logSupervisorStop()
// Signal stop and wait for cleanup
s.mutex.Lock()
if !s.stopChanClosed {
close(s.stopChan)
s.stopChanClosed = true
}
s.mutex.Unlock()
s.closeStopChan()
s.cancelContext()
// Wait for process to exit
@ -149,7 +133,7 @@ func (s *AudioOutputSupervisor) Stop() {
s.logger.Info().Str("component", AudioOutputSupervisorComponent).Msg("component stopped gracefully")
case <-time.After(GetConfig().OutputSupervisorTimeout):
s.logger.Warn().Str("component", AudioOutputSupervisorComponent).Msg("component did not stop gracefully, forcing termination")
s.forceKillProcess()
s.forceKillProcess("audio output server")
}
s.logger.Info().Str("component", AudioOutputSupervisorComponent).Msg("component stopped")
@ -158,12 +142,7 @@ func (s *AudioOutputSupervisor) Stop() {
// supervisionLoop is the main supervision loop
func (s *AudioOutputSupervisor) supervisionLoop() {
defer func() {
s.mutex.Lock()
if !s.processDoneClosed {
close(s.processDone)
s.processDoneClosed = true
}
s.mutex.Unlock()
s.closeProcessDone()
s.logger.Info().Msg("audio server supervision ended")
}()
@ -171,11 +150,11 @@ func (s *AudioOutputSupervisor) supervisionLoop() {
select {
case <-s.stopChan:
s.logger.Info().Msg("received stop signal")
s.terminateProcess()
s.terminateProcess(GetConfig().OutputSupervisorTimeout, "audio output server")
return
case <-s.ctx.Done():
s.logger.Info().Msg("context cancelled")
s.terminateProcess()
s.terminateProcess(GetConfig().OutputSupervisorTimeout, "audio output server")
return
default:
// Start or restart the process
@ -282,52 +261,23 @@ func (s *AudioOutputSupervisor) startProcess() error {
return nil
}
// waitForProcessExit waits for the current process to exit and logs the result
// waitForProcessExit waits for the current process to exit and handles restart logic
func (s *AudioOutputSupervisor) waitForProcessExit() {
s.mutex.RLock()
cmd := s.cmd
pid := s.processPID
s.mutex.RUnlock()
if cmd == nil {
return
}
// Use base supervisor's waitForProcessExit
s.BaseSupervisor.waitForProcessExit("audio output server")
// Wait for process to exit
err := cmd.Wait()
s.mutex.Lock()
s.lastExitTime = time.Now()
s.processPID = 0
var exitCode int
var crashed bool
if err != nil {
if exitError, ok := err.(*exec.ExitError); ok {
exitCode = exitError.ExitCode()
crashed = exitCode != 0
} else {
// Process was killed or other error
exitCode = -1
crashed = true
}
} else {
exitCode = 0
crashed = false
}
s.lastExitCode = exitCode
s.mutex.Unlock()
// Remove process from monitoring
s.processMonitor.RemoveProcess(pid)
// Handle output-specific logic (restart tracking and callbacks)
s.mutex.RLock()
exitCode := s.lastExitCode
s.mutex.RUnlock()
crashed := exitCode != 0
if crashed {
s.logger.Error().Int("pid", pid).Int("exit_code", exitCode).Msg("audio output server process crashed")
s.recordRestartAttempt()
} else {
s.logger.Info().Int("pid", pid).Msg("audio output server process exited gracefully")
}
if s.onProcessExit != nil {
@ -335,57 +285,6 @@ func (s *AudioOutputSupervisor) waitForProcessExit() {
}
}
// terminateProcess gracefully terminates the current process
func (s *AudioOutputSupervisor) terminateProcess() {
s.mutex.RLock()
cmd := s.cmd
pid := s.processPID
s.mutex.RUnlock()
if cmd == nil || cmd.Process == nil {
return
}
s.logger.Info().Int("pid", pid).Msg("terminating audio output server process")
// Send SIGTERM first
if err := cmd.Process.Signal(syscall.SIGTERM); err != nil {
s.logger.Warn().Err(err).Int("pid", pid).Msg("failed to send SIGTERM to audio output server process")
}
// Wait for graceful shutdown
done := make(chan struct{})
go func() {
_ = cmd.Wait()
close(done)
}()
select {
case <-done:
s.logger.Info().Int("pid", pid).Msg("audio server process terminated gracefully")
case <-time.After(GetConfig().OutputSupervisorTimeout):
s.logger.Warn().Int("pid", pid).Msg("process did not terminate gracefully, sending SIGKILL")
s.forceKillProcess()
}
}
// forceKillProcess forcefully kills the current process
func (s *AudioOutputSupervisor) forceKillProcess() {
s.mutex.RLock()
cmd := s.cmd
pid := s.processPID
s.mutex.RUnlock()
if cmd == nil || cmd.Process == nil {
return
}
s.logger.Warn().Int("pid", pid).Msg("force killing audio server process")
if err := cmd.Process.Kill(); err != nil {
s.logger.Error().Err(err).Int("pid", pid).Msg("failed to kill process")
}
}
// shouldRestart determines if the process should be restarted
func (s *AudioOutputSupervisor) shouldRestart() bool {
if atomic.LoadInt32(&s.running) == 0 {