refactor(audio): improve process management and error handling

- Remove unused setRunning method from BaseSupervisor
- Refactor IPC input reader to use running flag and mutex
- Add atomic state management to InputSupervisor
- Implement proper channel cleanup and process termination
- Improve error handling and logging throughout
This commit is contained in:
Alex P 2025-09-07 19:35:14 +00:00
parent bfdbbdc557
commit 96a6a0f8f9
3 changed files with 245 additions and 217 deletions

View File

@ -10,6 +10,7 @@ import (
"path/filepath"
"strconv"
"strings"
"sync/atomic"
"syscall"
"time"
)
@ -19,6 +20,12 @@ type AudioInputSupervisor struct {
*BaseSupervisor
client *AudioInputClient
// Channel management
stopChan chan struct{}
processDone chan struct{}
stopChanClosed bool // Track if stopChan is closed
processDoneClosed bool // Track if processDone is closed
// Environment variables for OPUS configuration
opusEnv []string
}
@ -28,6 +35,8 @@ func NewAudioInputSupervisor() *AudioInputSupervisor {
return &AudioInputSupervisor{
BaseSupervisor: NewBaseSupervisor("audio-input-supervisor"),
client: NewAudioInputClient(),
stopChan: make(chan struct{}),
processDone: make(chan struct{}),
}
}
@ -48,71 +57,104 @@ func (ais *AudioInputSupervisor) SetOpusConfig(bitrate, complexity, vbr, signalT
}
}
// Start starts the audio input server subprocess
// Start begins supervising the audio input server process
func (ais *AudioInputSupervisor) Start() error {
ais.mutex.Lock()
defer ais.mutex.Unlock()
if ais.IsRunning() {
if ais.cmd != nil && ais.cmd.Process != nil {
return fmt.Errorf("audio input supervisor already running with PID %d", ais.cmd.Process.Pid)
}
return fmt.Errorf("audio input supervisor already running")
if !atomic.CompareAndSwapInt32(&ais.running, 0, 1) {
return fmt.Errorf("audio input supervisor is already running")
}
// Check for existing audio input server process
if existingPID, err := ais.findExistingAudioInputProcess(); err == nil {
ais.logger.Info().Int("existing_pid", existingPID).Msg("Found existing audio input server process, connecting to it")
// Try to connect to the existing process
ais.setRunning(true)
go ais.connectClient()
return nil
}
// Create context for subprocess management
ais.logSupervisorStart()
ais.createContext()
// Get current executable path
// Recreate channels in case they were closed by a previous Stop() call
ais.mutex.Lock()
ais.processDone = make(chan struct{})
ais.stopChan = make(chan struct{})
ais.stopChanClosed = false // Reset channel closed flag
ais.processDoneClosed = false // Reset channel closed flag
ais.mutex.Unlock()
// Start the supervision loop
go ais.supervisionLoop()
ais.logger.Info().Str("component", "audio-input-supervisor").Msg("component started successfully")
return nil
}
// supervisionLoop is the main supervision loop
func (ais *AudioInputSupervisor) supervisionLoop() {
defer func() {
ais.mutex.Lock()
if !ais.processDoneClosed {
close(ais.processDone)
ais.processDoneClosed = true
}
ais.mutex.Unlock()
ais.logger.Info().Msg("audio input server supervision ended")
}()
for atomic.LoadInt32(&ais.running) == 1 {
select {
case <-ais.stopChan:
ais.logger.Info().Msg("received stop signal")
ais.terminateProcess()
return
case <-ais.ctx.Done():
ais.logger.Info().Msg("context cancelled")
ais.terminateProcess()
return
default:
// Start the process
if err := ais.startProcess(); err != nil {
ais.logger.Error().Err(err).Msg("failed to start audio input server process")
return
}
// Wait for process to exit
ais.waitForProcessExit()
return // Single run, no restart logic for now
}
}
}
// startProcess starts the audio input server process
func (ais *AudioInputSupervisor) startProcess() error {
execPath, err := os.Executable()
if err != nil {
return fmt.Errorf("failed to get executable path: %w", err)
}
ais.mutex.Lock()
defer ais.mutex.Unlock()
// Build command arguments (only subprocess flag)
args := []string{"--audio-input-server"}
// Create command for audio input server subprocess
cmd := exec.CommandContext(ais.ctx, execPath, args...)
// Create new command
ais.cmd = exec.CommandContext(ais.ctx, execPath, args...)
ais.cmd.Stdout = os.Stdout
ais.cmd.Stderr = os.Stderr
// Set environment variables for IPC and OPUS configuration
env := append(os.Environ(), "JETKVM_AUDIO_INPUT_IPC=true") // Enable IPC mode
env = append(env, ais.opusEnv...) // Add OPUS configuration
cmd.Env = env
ais.cmd.Env = env
// Set process group to allow clean termination
cmd.SysProcAttr = &syscall.SysProcAttr{
ais.cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true,
}
ais.cmd = cmd
ais.setRunning(true)
// Start the subprocess
err = cmd.Start()
if err != nil {
ais.setRunning(false)
ais.cancelContext()
// Start the process
if err := ais.cmd.Start(); err != nil {
return fmt.Errorf("failed to start audio input server process: %w", err)
}
ais.logger.Info().Int("pid", cmd.Process.Pid).Strs("args", args).Strs("opus_env", ais.opusEnv).Msg("Audio input server subprocess started")
ais.processPID = ais.cmd.Process.Pid
ais.logger.Info().Int("pid", ais.processPID).Strs("args", args).Strs("opus_env", ais.opusEnv).Msg("audio input server process started")
// Add process to monitoring
ais.processMonitor.AddProcess(cmd.Process.Pid, "audio-input-server")
// Monitor the subprocess in a goroutine
go ais.monitorSubprocess()
ais.processMonitor.AddProcess(ais.processPID, "audio-input-server")
// Connect client to the server
go ais.connectClient()
@ -120,15 +162,91 @@ func (ais *AudioInputSupervisor) Start() error {
return nil
}
// Stop stops the audio input server subprocess
func (ais *AudioInputSupervisor) Stop() {
ais.mutex.Lock()
defer ais.mutex.Unlock()
// waitForProcessExit waits for the current process to exit and logs the result
func (ais *AudioInputSupervisor) waitForProcessExit() {
ais.mutex.RLock()
cmd := ais.cmd
pid := ais.processPID
ais.mutex.RUnlock()
if !ais.IsRunning() {
if cmd == nil {
return
}
// Wait for process to exit
err := cmd.Wait()
ais.mutex.Lock()
ais.processPID = 0
ais.mutex.Unlock()
// Remove process from monitoring
ais.processMonitor.RemoveProcess(pid)
if err != nil {
ais.logger.Error().Int("pid", pid).Err(err).Msg("audio input server process exited with error")
} else {
ais.logger.Info().Int("pid", pid).Msg("audio input server process exited gracefully")
}
}
// terminateProcess gracefully terminates the current process
func (ais *AudioInputSupervisor) terminateProcess() {
ais.mutex.RLock()
cmd := ais.cmd
pid := ais.processPID
ais.mutex.RUnlock()
if cmd == nil || cmd.Process == nil {
return
}
ais.logger.Info().Int("pid", pid).Msg("terminating audio input server process")
// Send SIGTERM first
if err := cmd.Process.Signal(syscall.SIGTERM); err != nil {
ais.logger.Warn().Err(err).Int("pid", pid).Msg("failed to send SIGTERM to audio input server process")
}
// Wait for graceful shutdown
done := make(chan struct{})
go func() {
_ = cmd.Wait()
close(done)
}()
select {
case <-done:
ais.logger.Info().Int("pid", pid).Msg("audio input server process terminated gracefully")
case <-time.After(GetConfig().InputSupervisorTimeout):
ais.logger.Warn().Int("pid", pid).Msg("process did not terminate gracefully, sending SIGKILL")
ais.forceKillProcess()
}
}
// forceKillProcess forcefully kills the current process
func (ais *AudioInputSupervisor) forceKillProcess() {
ais.mutex.RLock()
cmd := ais.cmd
pid := ais.processPID
ais.mutex.RUnlock()
if cmd == nil || cmd.Process == nil {
return
}
ais.logger.Warn().Int("pid", pid).Msg("force killing audio input server process")
if err := cmd.Process.Kill(); err != nil {
ais.logger.Error().Err(err).Int("pid", pid).Msg("failed to kill process")
}
}
// Stop gracefully stops the audio input server and supervisor
func (ais *AudioInputSupervisor) Stop() {
if !atomic.CompareAndSwapInt32(&ais.running, 1, 0) {
return // Already stopped
}
ais.logSupervisorStop()
// Disconnect client first
@ -136,71 +254,25 @@ func (ais *AudioInputSupervisor) Stop() {
ais.client.Disconnect()
}
// Cancel context to signal subprocess to stop
// Signal stop and wait for cleanup
ais.mutex.Lock()
if !ais.stopChanClosed {
close(ais.stopChan)
ais.stopChanClosed = true
}
ais.mutex.Unlock()
ais.cancelContext()
// Try graceful termination first
if ais.cmd != nil && ais.cmd.Process != nil {
pid := ais.cmd.Process.Pid
ais.logger.Info().Int("pid", pid).Msg("Stopping audio input server subprocess")
// Send SIGTERM
err := ais.cmd.Process.Signal(syscall.SIGTERM)
if err != nil {
ais.logger.Warn().Err(err).Msg("Failed to send SIGTERM to audio input server")
}
// Wait for graceful shutdown with timeout
done := make(chan error, 1)
var waitErr error
go func() {
waitErr = ais.cmd.Wait()
done <- waitErr
}()
select {
case <-done:
if waitErr != nil {
ais.logger.Info().Err(waitErr).Msg("Audio input server subprocess stopped with error")
} else {
ais.logger.Info().Msg("Audio input server subprocess stopped gracefully")
}
case <-time.After(GetConfig().InputSupervisorTimeout):
// Force kill if graceful shutdown failed
ais.logger.Warn().Msg("Audio input server subprocess did not stop gracefully, force killing")
// Use a more robust approach to check if process is still alive
if ais.cmd != nil && ais.cmd.Process != nil {
// Try to send signal 0 to check if process exists
if err := ais.cmd.Process.Signal(syscall.Signal(0)); err == nil {
// Process is still alive, force kill it
if killErr := ais.cmd.Process.Kill(); killErr != nil {
// Only log error if it's not "process already finished"
if !strings.Contains(killErr.Error(), "process already finished") {
ais.logger.Error().Err(killErr).Msg("Failed to kill audio input server subprocess")
} else {
ais.logger.Debug().Msg("Audio input server subprocess already finished during kill attempt")
}
} else {
ais.logger.Info().Msg("Audio input server subprocess force killed successfully")
}
} else {
ais.logger.Debug().Msg("Audio input server subprocess already finished")
}
// Wait a bit for the kill to take effect and collect the exit status
go func() {
select {
case <-done:
// Process finished
case <-time.After(1 * time.Second):
// Give up waiting
}
}()
}
}
// Wait for process to exit
select {
case <-ais.processDone:
ais.logger.Info().Str("component", "audio-input-supervisor").Msg("component stopped gracefully")
case <-time.After(GetConfig().InputSupervisorTimeout):
ais.logger.Warn().Str("component", "audio-input-supervisor").Msg("component did not stop gracefully, forcing termination")
ais.forceKillProcess()
}
ais.setRunning(false)
ais.cmd = nil
ais.logger.Info().Str("component", "audio-input-supervisor").Msg("component stopped")
}
// IsConnected returns whether the client is connected to the audio input server
@ -218,42 +290,6 @@ func (ais *AudioInputSupervisor) GetClient() *AudioInputClient {
return ais.client
}
// monitorSubprocess monitors the subprocess and handles unexpected exits
func (ais *AudioInputSupervisor) monitorSubprocess() {
if ais.cmd == nil || ais.cmd.Process == nil {
return
}
pid := ais.cmd.Process.Pid
err := ais.cmd.Wait()
// Remove process from monitoring
ais.processMonitor.RemoveProcess(pid)
ais.mutex.Lock()
defer ais.mutex.Unlock()
if ais.IsRunning() {
// Unexpected exit
if err != nil {
ais.logger.Error().Err(err).Int("pid", pid).Msg("Audio input server subprocess exited unexpectedly")
} else {
ais.logger.Warn().Int("pid", pid).Msg("Audio input server subprocess exited unexpectedly")
}
// Disconnect client
if ais.client != nil {
ais.client.Disconnect()
}
// Mark as not running first to prevent race conditions
ais.setRunning(false)
ais.cmd = nil
ais.logger.Info().Int("pid", pid).Msg("Audio input server subprocess monitoring stopped")
}
}
// connectClient attempts to connect the client to the server
func (ais *AudioInputSupervisor) connectClient() {
// Wait briefly for the server to start and create socket

View File

@ -884,82 +884,83 @@ func (ais *AudioInputServer) startReaderGoroutine() {
logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
for {
select {
case <-ais.stopChan:
return
default:
if ais.conn != nil {
msg, err := ais.readMessage(ais.conn)
if err != nil {
// Enhanced error handling with progressive backoff
now := time.Now()
for ais.running {
ais.mtx.Lock()
conn := ais.conn
ais.mtx.Unlock()
// Reset error counter if enough time has passed
if now.Sub(lastErrorTime) > errorResetWindow {
consecutiveErrors = 0
}
if conn == nil {
time.Sleep(10 * time.Millisecond)
continue
}
consecutiveErrors++
lastErrorTime = now
msg, err := ais.readMessage(conn)
if err != nil {
if ais.running {
// Enhanced error handling with progressive backoff
now := time.Now()
// Skip logging in hotpath for performance - only log critical errors
// Progressive backoff based on error count
if consecutiveErrors > 1 {
backoffDelay := time.Duration(consecutiveErrors-1) * baseBackoffDelay
if backoffDelay > maxBackoffDelay {
backoffDelay = maxBackoffDelay
}
time.Sleep(backoffDelay)
}
// If too many consecutive errors, close connection to force reconnect
if consecutiveErrors >= maxConsecutiveErrors {
// Only log critical errors to reduce hotpath overhead
if logger.GetLevel() <= zerolog.ErrorLevel {
logger.Error().
Int("consecutive_errors", consecutiveErrors).
Msg("Too many consecutive read errors, closing connection")
}
ais.mtx.Lock()
if ais.conn != nil {
ais.conn.Close()
ais.conn = nil
}
ais.mtx.Unlock()
consecutiveErrors = 0 // Reset for next connection
}
continue
}
// Reset error counter on successful read
if consecutiveErrors > 0 {
// Reset error counter if enough time has passed
if now.Sub(lastErrorTime) > errorResetWindow {
consecutiveErrors = 0
// Only log recovery info if debug level enabled to reduce overhead
if logger.GetLevel() <= zerolog.InfoLevel {
logger.Info().Msg("Input connection recovered")
}
}
// Send to message channel with non-blocking write
select {
case ais.messageChan <- msg:
atomic.AddInt64(&ais.totalFrames, 1)
default:
// Channel full, drop message
atomic.AddInt64(&ais.droppedFrames, 1)
// Avoid sampling logic in critical path - only log if warn level enabled
if logger.GetLevel() <= zerolog.WarnLevel {
droppedCount := atomic.LoadInt64(&ais.droppedFrames)
logger.Warn().Int64("total_dropped", droppedCount).Msg("Message channel full, dropping frame")
consecutiveErrors++
lastErrorTime = now
// Skip logging in hotpath for performance - only log critical errors
// Progressive backoff based on error count
if consecutiveErrors > 1 {
backoffDelay := time.Duration(consecutiveErrors-1) * baseBackoffDelay
if backoffDelay > maxBackoffDelay {
backoffDelay = maxBackoffDelay
}
time.Sleep(backoffDelay)
}
} else {
// No connection, wait briefly before checking again
time.Sleep(GetConfig().DefaultSleepDuration)
// If too many consecutive errors, close connection to force reconnect
if consecutiveErrors >= maxConsecutiveErrors {
// Only log critical errors to reduce hotpath overhead
if logger.GetLevel() <= zerolog.ErrorLevel {
logger.Error().
Int("consecutive_errors", consecutiveErrors).
Msg("Too many consecutive read errors, closing connection")
}
ais.mtx.Lock()
if ais.conn != nil {
ais.conn.Close()
ais.conn = nil
}
ais.mtx.Unlock()
consecutiveErrors = 0 // Reset for next connection
}
}
continue
}
// Reset error counter on successful read
if consecutiveErrors > 0 {
consecutiveErrors = 0
// Only log recovery info if debug level enabled to reduce overhead
if logger.GetLevel() <= zerolog.InfoLevel {
logger.Info().Msg("Input connection recovered")
}
}
// Send to message channel with non-blocking write
select {
case ais.messageChan <- msg:
atomic.AddInt64(&ais.totalFrames, 1)
default:
// Channel full, drop message
atomic.AddInt64(&ais.droppedFrames, 1)
// Avoid sampling logic in critical path - only log if warn level enabled
if logger.GetLevel() <= zerolog.WarnLevel {
droppedCount := atomic.LoadInt64(&ais.droppedFrames)
logger.Warn().Int64("total_dropped", droppedCount).Msg("Message channel full, dropping frame")
}
}
}

View File

@ -48,15 +48,6 @@ func (bs *BaseSupervisor) IsRunning() bool {
return atomic.LoadInt32(&bs.running) == 1
}
// setRunning atomically sets the running state
func (bs *BaseSupervisor) setRunning(running bool) {
if running {
atomic.StoreInt32(&bs.running, 1)
} else {
atomic.StoreInt32(&bs.running, 0)
}
}
// GetProcessPID returns the current process PID
func (bs *BaseSupervisor) GetProcessPID() int {
bs.mutex.RLock()