Compare commits

...

5 Commits

Author SHA1 Message Date
Alex P a2a87b46b8 refactor(audio): move channel and process management to base supervisor
Consolidate duplicate channel and process management code from input/output supervisors into BaseSupervisor
Add new methods for channel initialization and cleanup
Standardize process termination and monitoring behavior
2025-09-07 20:14:33 +00:00
Alex P 96a6a0f8f9 refactor(audio): improve process management and error handling
- Remove unused setRunning method from BaseSupervisor
- Refactor IPC input reader to use running flag and mutex
- Add atomic state management to InputSupervisor
- Implement proper channel cleanup and process termination
- Improve error handling and logging throughout
2025-09-07 19:35:14 +00:00
Alex P bfdbbdc557 [WIP] Simplification 2025-09-07 19:13:35 +00:00
Alex P e3b4bb2002 refactor(audio): replace mute functionality with start/stop for microphone
- Replace MuteMicrophone calls with StartMicrophone/StopMicrophone for clearer behavior
- Update microphone state broadcasting to reflect actual subprocess status
- Modify UI to use enable/disable terminology instead of mute/unmute
- Ensure microphone device changes properly restart the active microphone
2025-09-07 18:32:42 +00:00
Alex P 7d39a2741e [WIP] Improvements: improve Audio Input Activation / Deactivation process so it is faster 2025-09-07 16:17:06 +00:00
10 changed files with 408 additions and 445 deletions

View File

@ -42,13 +42,13 @@ func StartAudioOutputAndAddTracks() error {
// StopMicrophoneAndRemoveTracks is a global helper to stop microphone subprocess and remove WebRTC tracks // StopMicrophoneAndRemoveTracks is a global helper to stop microphone subprocess and remove WebRTC tracks
func StopMicrophoneAndRemoveTracks() error { func StopMicrophoneAndRemoveTracks() error {
initAudioControlService() initAudioControlService()
return audioControlService.MuteMicrophone(true) return audioControlService.StopMicrophone()
} }
// StartMicrophoneAndAddTracks is a global helper to start microphone subprocess and add WebRTC tracks // StartMicrophoneAndAddTracks is a global helper to start microphone subprocess and add WebRTC tracks
func StartMicrophoneAndAddTracks() error { func StartMicrophoneAndAddTracks() error {
initAudioControlService() initAudioControlService()
return audioControlService.MuteMicrophone(false) return audioControlService.StartMicrophone()
} }
// IsAudioOutputActive is a global helper to check if audio output subprocess is running // IsAudioOutputActive is a global helper to check if audio output subprocess is running

View File

@ -9,6 +9,11 @@ var audioMuteState struct {
mu sync.RWMutex mu sync.RWMutex
} }
// microphoneMuteState is the package-level microphone mute flag.
// It is written by SetMicrophoneMuted and read by IsMicrophoneMuted.
var microphoneMuteState struct {
// muted is true while the microphone is muted.
muted bool
// mu guards muted: writers take Lock, readers take RLock.
mu sync.RWMutex
}
func SetAudioMuted(muted bool) { func SetAudioMuted(muted bool) {
audioMuteState.mu.Lock() audioMuteState.mu.Lock()
audioMuteState.muted = muted audioMuteState.muted = muted
@ -20,3 +25,15 @@ func IsAudioMuted() bool {
defer audioMuteState.mu.RUnlock() defer audioMuteState.mu.RUnlock()
return audioMuteState.muted return audioMuteState.muted
} }
// SetMicrophoneMuted stores the requested mute flag in the package-level
// microphoneMuteState while holding its write lock.
func SetMicrophoneMuted(muted bool) {
	state := &microphoneMuteState

	state.mu.Lock()
	defer state.mu.Unlock()

	state.muted = muted
}
// IsMicrophoneMuted reports the current microphone mute flag, reading it from
// the package-level microphoneMuteState under its read lock.
func IsMicrophoneMuted() bool {
	state := &microphoneMuteState

	state.mu.RLock()
	isMuted := state.muted
	state.mu.RUnlock()

	return isMuted
}

View File

@ -671,17 +671,6 @@ type AudioConfigConstants struct {
// Default 500ms provides adequate buffer monitoring without excessive overhead. // Default 500ms provides adequate buffer monitoring without excessive overhead.
BufferUpdateInterval time.Duration // 500ms BufferUpdateInterval time.Duration // 500ms
// StatsUpdateInterval defines frequency of statistics collection and reporting.
// Used in: core_metrics.go for performance statistics updates
// Impact: More frequent updates provide better monitoring but increase overhead.
// Default 5s provides comprehensive statistics without performance impact.
// SupervisorTimeout defines timeout for supervisor process operations.
// Used in: output_supervisor.go for process monitoring and control
// Impact: Shorter timeouts improve responsiveness but may cause false timeouts.
// Default 10s provides adequate time for supervisor operations.
SupervisorTimeout time.Duration // 10s
// InputSupervisorTimeout defines timeout for input supervisor operations. // InputSupervisorTimeout defines timeout for input supervisor operations.
// Used in: input_supervisor.go for input process monitoring // Used in: input_supervisor.go for input process monitoring
// Impact: Shorter timeouts improve input responsiveness but may cause false timeouts. // Impact: Shorter timeouts improve input responsiveness but may cause false timeouts.
@ -694,16 +683,6 @@ type AudioConfigConstants struct {
// Default 5s provides responsive output monitoring. // Default 5s provides responsive output monitoring.
OutputSupervisorTimeout time.Duration // 5s OutputSupervisorTimeout time.Duration // 5s
// ShortTimeout defines brief timeout for time-critical operations.
// Used in: Real-time audio processing for minimal timeout scenarios
// Impact: Very short timeouts ensure responsiveness but may cause premature failures.
// Default 5ms provides minimal timeout for critical operations.
// MediumTimeout defines moderate timeout for standard operations.
// Used in: Standard audio processing operations
// Impact: Balances responsiveness with operation completion time.
// Default 50ms provides good balance for most audio operations.
// BatchProcessingDelay defines delay between batch processing operations. // BatchProcessingDelay defines delay between batch processing operations.
// Used in: batch_audio.go for controlling batch processing timing // Used in: batch_audio.go for controlling batch processing timing
// Impact: Shorter delays improve throughput but increase CPU usage. // Impact: Shorter delays improve throughput but increase CPU usage.
@ -816,25 +795,10 @@ type AudioConfigConstants struct {
// Default 200ms provides reasonable wait time for microphone access. // Default 200ms provides reasonable wait time for microphone access.
MicContentionTimeout time.Duration // 200ms contention timeout MicContentionTimeout time.Duration // 200ms contention timeout
// Subprocess Pre-warming Configuration
// Used in: input_supervisor.go for reducing microphone activation latency
// Impact: Pre-warms audio input subprocess during startup to eliminate cold start delay
// Default true enables pre-warming for optimal user experience
// Priority Scheduler Configuration - Settings for process priority management // Priority Scheduler Configuration - Settings for process priority management
// Used in: priority_scheduler.go for system priority control // Used in: priority_scheduler.go for system priority control
// Impact: Controls valid range for process priority adjustments // Impact: Controls valid range for process priority adjustments
// MinNiceValue defines minimum (highest priority) nice value.
// Used in: priority_scheduler.go for priority validation
// Impact: Lower values allow higher priority but may affect system stability.
// Default -20 provides maximum priority elevation capability.
// MaxNiceValue defines maximum (lowest priority) nice value.
// Used in: priority_scheduler.go for priority validation
// Impact: Higher values allow lower priority for background tasks.
// Default 19 provides maximum priority reduction capability.
// Buffer Pool Configuration - Settings for memory pool preallocation // Buffer Pool Configuration - Settings for memory pool preallocation
// Used in: util_buffer_pool.go for memory pool management // Used in: util_buffer_pool.go for memory pool management
// Impact: Controls memory preallocation strategy and efficiency // Impact: Controls memory preallocation strategy and efficiency
@ -845,34 +809,14 @@ type AudioConfigConstants struct {
// Default 20% provides good balance between performance and memory efficiency. // Default 20% provides good balance between performance and memory efficiency.
PreallocPercentage int // 20% preallocation percentage PreallocPercentage int // 20% preallocation percentage
// InputPreallocPercentage defines percentage of input buffers to preallocate.
// Used in: util_buffer_pool.go for input-specific memory pool sizing
// Impact: Higher values improve input performance but increase memory usage.
// Default 30% provides enhanced input performance with reasonable memory usage.
// Exponential Moving Average Configuration - Settings for statistical smoothing // Exponential Moving Average Configuration - Settings for statistical smoothing
// Used in: core_metrics.go and various monitoring components // Used in: core_metrics.go and various monitoring components
// Impact: Controls smoothing behavior for performance metrics // Impact: Controls smoothing behavior for performance metrics
// HistoricalWeight defines weight given to historical data in EMA calculations. // Backoff Configuration - Settings for timing and retry behavior
// Used in: core_metrics.go for exponential moving average calculations
// Impact: Higher values provide more stable metrics but slower response to changes.
// Default 70% provides good stability while maintaining responsiveness.
// CurrentWeight defines weight given to current data in EMA calculations.
// Used in: metrics.go for exponential moving average calculations
// Impact: Higher values provide faster response but less stability.
// Default 30% complements historical weight for balanced EMA calculation.
// Sleep and Backoff Configuration - Settings for timing and retry behavior
// Used in: Various components for timing control and retry logic // Used in: Various components for timing control and retry logic
// Impact: Controls system timing behavior and retry strategies // Impact: Controls system timing behavior and retry strategies
// CGOSleepMicroseconds defines sleep duration for CGO operations.
// Used in: cgo_audio.go for CGO operation timing
// Impact: Longer sleeps reduce CPU usage but may increase latency.
// Default 50000 microseconds (50ms) provides good balance for CGO operations.
// BackoffStart defines initial backoff duration for retry operations. // BackoffStart defines initial backoff duration for retry operations.
// Used in: retry_manager.go for exponential backoff initialization // Used in: retry_manager.go for exponential backoff initialization
// Impact: Longer initial backoff reduces immediate retry pressure. // Impact: Longer initial backoff reduces immediate retry pressure.
@ -1974,12 +1918,6 @@ func DefaultAudioConfig() *AudioConfigConstants {
// Default 500ms allows buffer conditions to stabilize before adjustments // Default 500ms allows buffer conditions to stabilize before adjustments
BufferUpdateInterval: 500 * time.Millisecond, BufferUpdateInterval: 500 * time.Millisecond,
// SupervisorTimeout defines timeout for supervisor operations (10s).
// Used in: Process supervision, health monitoring, restart logic
// Impact: Controls how long to wait before considering operations failed
// Default 10s allows for slow operations while preventing indefinite hangs
SupervisorTimeout: 10 * time.Second,
// InputSupervisorTimeout defines timeout for input supervision (5s). // InputSupervisorTimeout defines timeout for input supervision (5s).
// Used in: Input process monitoring, microphone supervision // Used in: Input process monitoring, microphone supervision
// Impact: Controls responsiveness of input failure detection // Impact: Controls responsiveness of input failure detection

View File

@ -95,6 +95,12 @@ func (s *AudioControlService) StartMicrophone() error {
} }
s.logger.Info().Msg("microphone started successfully") s.logger.Info().Msg("microphone started successfully")
// Broadcast microphone state change via WebSocket
broadcaster := GetAudioEventBroadcaster()
sessionActive := s.sessionProvider.IsSessionActive()
broadcaster.BroadcastMicrophoneStateChanged(true, sessionActive)
return nil return nil
} }
@ -116,32 +122,60 @@ func (s *AudioControlService) StopMicrophone() error {
audioInputManager.Stop() audioInputManager.Stop()
s.logger.Info().Msg("microphone stopped successfully") s.logger.Info().Msg("microphone stopped successfully")
// Broadcast microphone state change via WebSocket
broadcaster := GetAudioEventBroadcaster()
sessionActive := s.sessionProvider.IsSessionActive()
broadcaster.BroadcastMicrophoneStateChanged(false, sessionActive)
return nil return nil
} }
// MuteMicrophone sets the microphone mute state by controlling the microphone process // MuteMicrophone sets the microphone mute state by controlling data flow (like audio output)
func (s *AudioControlService) MuteMicrophone(muted bool) error { func (s *AudioControlService) MuteMicrophone(muted bool) error {
if muted { if muted {
// Mute: Stop microphone process // Mute: Control data flow, don't stop subprocess (like audio output)
err := s.StopMicrophone() SetMicrophoneMuted(true)
if err != nil { s.logger.Info().Msg("microphone muted (data flow disabled)")
s.logger.Error().Err(err).Msg("failed to stop microphone during mute")
return err
}
s.logger.Info().Msg("microphone muted (process stopped)")
} else { } else {
// Unmute: Start microphone process // Unmute: Ensure subprocess is running, then enable data flow
err := s.StartMicrophone() if !s.sessionProvider.IsSessionActive() {
if err != nil { return errors.New("no active session for microphone unmute")
s.logger.Error().Err(err).Msg("failed to start microphone during unmute")
return err
}
s.logger.Info().Msg("microphone unmuted (process started)")
} }
// Broadcast microphone mute state change via WebSocket audioInputManager := s.sessionProvider.GetAudioInputManager()
if audioInputManager == nil {
return errors.New("audio input manager not available")
}
// Start subprocess if not already running (async, non-blocking)
if !audioInputManager.IsRunning() {
go func() {
if err := audioInputManager.Start(); err != nil {
s.logger.Error().Err(err).Msg("failed to start microphone during unmute")
}
}()
}
// Enable data flow immediately
SetMicrophoneMuted(false)
s.logger.Info().Msg("microphone unmuted (data flow enabled)")
}
// Broadcast microphone state change via WebSocket
broadcaster := GetAudioEventBroadcaster() broadcaster := GetAudioEventBroadcaster()
broadcaster.BroadcastAudioDeviceChanged(!muted, "microphone_mute_changed") sessionActive := s.sessionProvider.IsSessionActive()
// Get actual subprocess running status (not mute status)
var subprocessRunning bool
if sessionActive {
audioInputManager := s.sessionProvider.GetAudioInputManager()
if audioInputManager != nil {
subprocessRunning = audioInputManager.IsRunning()
}
}
broadcaster.BroadcastMicrophoneStateChanged(subprocessRunning, sessionActive)
return nil return nil
} }
@ -265,5 +299,6 @@ func (s *AudioControlService) IsMicrophoneActive() bool {
return false return false
} }
// For Enable/Disable buttons, we check subprocess status
return audioInputManager.IsRunning() return audioInputManager.IsRunning()
} }

View File

@ -91,6 +91,11 @@ func (aim *AudioInputManager) WriteOpusFrame(frame []byte) error {
return nil // Not running, silently drop return nil // Not running, silently drop
} }
// Check mute state - drop frames if microphone is muted (like audio output)
if IsMicrophoneMuted() {
return nil // Muted, silently drop
}
// Use ultra-fast validation for critical audio path // Use ultra-fast validation for critical audio path
if err := ValidateAudioFrame(frame); err != nil { if err := ValidateAudioFrame(frame); err != nil {
aim.logComponentError(AudioInputManagerComponent, err, "Frame validation failed") aim.logComponentError(AudioInputManagerComponent, err, "Frame validation failed")
@ -128,6 +133,11 @@ func (aim *AudioInputManager) WriteOpusFrameZeroCopy(frame *ZeroCopyAudioFrame)
return nil // Not running, silently drop return nil // Not running, silently drop
} }
// Check mute state - drop frames if microphone is muted (like audio output)
if IsMicrophoneMuted() {
return nil // Muted, silently drop
}
if frame == nil { if frame == nil {
atomic.AddInt64(&aim.metrics.FramesDropped, 1) atomic.AddInt64(&aim.metrics.FramesDropped, 1)
return nil return nil

View File

@ -10,6 +10,7 @@ import (
"path/filepath" "path/filepath"
"strconv" "strconv"
"strings" "strings"
"sync/atomic"
"syscall" "syscall"
"time" "time"
) )
@ -48,71 +49,94 @@ func (ais *AudioInputSupervisor) SetOpusConfig(bitrate, complexity, vbr, signalT
} }
} }
// Start starts the audio input server subprocess // Start begins supervising the audio input server process
func (ais *AudioInputSupervisor) Start() error { func (ais *AudioInputSupervisor) Start() error {
ais.mutex.Lock() if !atomic.CompareAndSwapInt32(&ais.running, 0, 1) {
defer ais.mutex.Unlock() return fmt.Errorf("audio input supervisor is already running")
if ais.IsRunning() {
if ais.cmd != nil && ais.cmd.Process != nil {
return fmt.Errorf("audio input supervisor already running with PID %d", ais.cmd.Process.Pid)
}
return fmt.Errorf("audio input supervisor already running")
} }
// Check for existing audio input server process ais.logSupervisorStart()
if existingPID, err := ais.findExistingAudioInputProcess(); err == nil { ais.createContext()
ais.logger.Info().Int("existing_pid", existingPID).Msg("Found existing audio input server process, connecting to it")
// Try to connect to the existing process // Recreate channels in case they were closed by a previous Stop() call
ais.setRunning(true) ais.initializeChannels()
go ais.connectClient()
// Start the supervision loop
go ais.supervisionLoop()
ais.logger.Info().Str("component", "audio-input-supervisor").Msg("component started successfully")
return nil return nil
} }
// Create context for subprocess management // supervisionLoop is the main supervision loop
ais.createContext() func (ais *AudioInputSupervisor) supervisionLoop() {
defer func() {
ais.closeProcessDone()
ais.logger.Info().Msg("audio input server supervision ended")
}()
// Get current executable path for atomic.LoadInt32(&ais.running) == 1 {
select {
case <-ais.stopChan:
ais.logger.Info().Msg("received stop signal")
ais.terminateProcess(GetConfig().InputSupervisorTimeout, "audio input server")
return
case <-ais.ctx.Done():
ais.logger.Info().Msg("context cancelled")
ais.terminateProcess(GetConfig().InputSupervisorTimeout, "audio input server")
return
default:
// Start the process
if err := ais.startProcess(); err != nil {
ais.logger.Error().Err(err).Msg("failed to start audio input server process")
return
}
// Wait for process to exit
ais.waitForProcessExit("audio input server")
return // Single run, no restart logic for now
}
}
}
// startProcess starts the audio input server process
func (ais *AudioInputSupervisor) startProcess() error {
execPath, err := os.Executable() execPath, err := os.Executable()
if err != nil { if err != nil {
return fmt.Errorf("failed to get executable path: %w", err) return fmt.Errorf("failed to get executable path: %w", err)
} }
ais.mutex.Lock()
defer ais.mutex.Unlock()
// Build command arguments (only subprocess flag) // Build command arguments (only subprocess flag)
args := []string{"--audio-input-server"} args := []string{"--audio-input-server"}
// Create command for audio input server subprocess // Create new command
cmd := exec.CommandContext(ais.ctx, execPath, args...) ais.cmd = exec.CommandContext(ais.ctx, execPath, args...)
ais.cmd.Stdout = os.Stdout
ais.cmd.Stderr = os.Stderr
// Set environment variables for IPC and OPUS configuration // Set environment variables for IPC and OPUS configuration
env := append(os.Environ(), "JETKVM_AUDIO_INPUT_IPC=true") // Enable IPC mode env := append(os.Environ(), "JETKVM_AUDIO_INPUT_IPC=true") // Enable IPC mode
env = append(env, ais.opusEnv...) // Add OPUS configuration env = append(env, ais.opusEnv...) // Add OPUS configuration
cmd.Env = env ais.cmd.Env = env
// Set process group to allow clean termination // Set process group to allow clean termination
cmd.SysProcAttr = &syscall.SysProcAttr{ ais.cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true, Setpgid: true,
} }
ais.cmd = cmd // Start the process
ais.setRunning(true) if err := ais.cmd.Start(); err != nil {
// Start the subprocess
err = cmd.Start()
if err != nil {
ais.setRunning(false)
ais.cancelContext()
return fmt.Errorf("failed to start audio input server process: %w", err) return fmt.Errorf("failed to start audio input server process: %w", err)
} }
ais.logger.Info().Int("pid", cmd.Process.Pid).Strs("args", args).Strs("opus_env", ais.opusEnv).Msg("Audio input server subprocess started") ais.processPID = ais.cmd.Process.Pid
ais.logger.Info().Int("pid", ais.processPID).Strs("args", args).Strs("opus_env", ais.opusEnv).Msg("audio input server process started")
// Add process to monitoring // Add process to monitoring
ais.processMonitor.AddProcess(cmd.Process.Pid, "audio-input-server") ais.processMonitor.AddProcess(ais.processPID, "audio-input-server")
// Monitor the subprocess in a goroutine
go ais.monitorSubprocess()
// Connect client to the server // Connect client to the server
go ais.connectClient() go ais.connectClient()
@ -120,13 +144,10 @@ func (ais *AudioInputSupervisor) Start() error {
return nil return nil
} }
// Stop stops the audio input server subprocess // Stop gracefully stops the audio input server and supervisor
func (ais *AudioInputSupervisor) Stop() { func (ais *AudioInputSupervisor) Stop() {
ais.mutex.Lock() if !atomic.CompareAndSwapInt32(&ais.running, 1, 0) {
defer ais.mutex.Unlock() return // Already stopped
if !ais.IsRunning() {
return
} }
ais.logSupervisorStop() ais.logSupervisorStop()
@ -136,71 +157,20 @@ func (ais *AudioInputSupervisor) Stop() {
ais.client.Disconnect() ais.client.Disconnect()
} }
// Cancel context to signal subprocess to stop // Signal stop and wait for cleanup
ais.closeStopChan()
ais.cancelContext() ais.cancelContext()
// Try graceful termination first // Wait for process to exit
if ais.cmd != nil && ais.cmd.Process != nil {
pid := ais.cmd.Process.Pid
ais.logger.Info().Int("pid", pid).Msg("Stopping audio input server subprocess")
// Send SIGTERM
err := ais.cmd.Process.Signal(syscall.SIGTERM)
if err != nil {
ais.logger.Warn().Err(err).Msg("Failed to send SIGTERM to audio input server")
}
// Wait for graceful shutdown with timeout
done := make(chan error, 1)
var waitErr error
go func() {
waitErr = ais.cmd.Wait()
done <- waitErr
}()
select { select {
case <-done: case <-ais.processDone:
if waitErr != nil { ais.logger.Info().Str("component", "audio-input-supervisor").Msg("component stopped gracefully")
ais.logger.Info().Err(waitErr).Msg("Audio input server subprocess stopped with error")
} else {
ais.logger.Info().Msg("Audio input server subprocess stopped gracefully")
}
case <-time.After(GetConfig().InputSupervisorTimeout): case <-time.After(GetConfig().InputSupervisorTimeout):
// Force kill if graceful shutdown failed ais.logger.Warn().Str("component", "audio-input-supervisor").Msg("component did not stop gracefully, forcing termination")
ais.logger.Warn().Msg("Audio input server subprocess did not stop gracefully, force killing") ais.forceKillProcess("audio input server")
// Use a more robust approach to check if process is still alive
if ais.cmd != nil && ais.cmd.Process != nil {
// Try to send signal 0 to check if process exists
if err := ais.cmd.Process.Signal(syscall.Signal(0)); err == nil {
// Process is still alive, force kill it
if killErr := ais.cmd.Process.Kill(); killErr != nil {
// Only log error if it's not "process already finished"
if !strings.Contains(killErr.Error(), "process already finished") {
ais.logger.Error().Err(killErr).Msg("Failed to kill audio input server subprocess")
} else {
ais.logger.Debug().Msg("Audio input server subprocess already finished during kill attempt")
}
} else {
ais.logger.Info().Msg("Audio input server subprocess force killed successfully")
}
} else {
ais.logger.Debug().Msg("Audio input server subprocess already finished")
}
// Wait a bit for the kill to take effect and collect the exit status
go func() {
select {
case <-done:
// Process finished
case <-time.After(1 * time.Second):
// Give up waiting
}
}()
}
}
} }
ais.setRunning(false) ais.logger.Info().Str("component", "audio-input-supervisor").Msg("component stopped")
ais.cmd = nil
} }
// IsConnected returns whether the client is connected to the audio input server // IsConnected returns whether the client is connected to the audio input server
@ -218,42 +188,6 @@ func (ais *AudioInputSupervisor) GetClient() *AudioInputClient {
return ais.client return ais.client
} }
// monitorSubprocess monitors the subprocess and handles unexpected exits
func (ais *AudioInputSupervisor) monitorSubprocess() {
if ais.cmd == nil || ais.cmd.Process == nil {
return
}
pid := ais.cmd.Process.Pid
err := ais.cmd.Wait()
// Remove process from monitoring
ais.processMonitor.RemoveProcess(pid)
ais.mutex.Lock()
defer ais.mutex.Unlock()
if ais.IsRunning() {
// Unexpected exit
if err != nil {
ais.logger.Error().Err(err).Int("pid", pid).Msg("Audio input server subprocess exited unexpectedly")
} else {
ais.logger.Warn().Int("pid", pid).Msg("Audio input server subprocess exited unexpectedly")
}
// Disconnect client
if ais.client != nil {
ais.client.Disconnect()
}
// Mark as not running first to prevent race conditions
ais.setRunning(false)
ais.cmd = nil
ais.logger.Info().Int("pid", pid).Msg("Audio input server subprocess monitoring stopped")
}
}
// connectClient attempts to connect the client to the server // connectClient attempts to connect the client to the server
func (ais *AudioInputSupervisor) connectClient() { func (ais *AudioInputSupervisor) connectClient() {
// Wait briefly for the server to start and create socket // Wait briefly for the server to start and create socket

View File

@ -884,14 +884,19 @@ func (ais *AudioInputServer) startReaderGoroutine() {
logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger() logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
for { for ais.running {
select { ais.mtx.Lock()
case <-ais.stopChan: conn := ais.conn
return ais.mtx.Unlock()
default:
if ais.conn != nil { if conn == nil {
msg, err := ais.readMessage(ais.conn) time.Sleep(10 * time.Millisecond)
continue
}
msg, err := ais.readMessage(conn)
if err != nil { if err != nil {
if ais.running {
// Enhanced error handling with progressive backoff // Enhanced error handling with progressive backoff
now := time.Now() now := time.Now()
@ -932,6 +937,7 @@ func (ais *AudioInputServer) startReaderGoroutine() {
consecutiveErrors = 0 // Reset for next connection consecutiveErrors = 0 // Reset for next connection
} }
}
continue continue
} }
@ -957,11 +963,6 @@ func (ais *AudioInputServer) startReaderGoroutine() {
logger.Warn().Int64("total_dropped", droppedCount).Msg("Message channel full, dropping frame") logger.Warn().Int64("total_dropped", droppedCount).Msg("Message channel full, dropping frame")
} }
} }
} else {
// No connection, wait briefly before checking again
time.Sleep(GetConfig().DefaultSleepDuration)
}
}
} }
} }

View File

@ -8,6 +8,7 @@ import (
"os/exec" "os/exec"
"sync" "sync"
"sync/atomic" "sync/atomic"
"syscall"
"time" "time"
"github.com/jetkvm/kvm/internal/logging" "github.com/jetkvm/kvm/internal/logging"
@ -32,6 +33,12 @@ type BaseSupervisor struct {
// Exit tracking // Exit tracking
lastExitCode int lastExitCode int
lastExitTime time.Time lastExitTime time.Time
// Channel management
stopChan chan struct{}
processDone chan struct{}
stopChanClosed bool
processDoneClosed bool
} }
// NewBaseSupervisor creates a new base supervisor // NewBaseSupervisor creates a new base supervisor
@ -40,6 +47,8 @@ func NewBaseSupervisor(componentName string) *BaseSupervisor {
return &BaseSupervisor{ return &BaseSupervisor{
logger: &logger, logger: &logger,
processMonitor: GetProcessMonitor(), processMonitor: GetProcessMonitor(),
stopChan: make(chan struct{}),
processDone: make(chan struct{}),
} }
} }
@ -48,15 +57,6 @@ func (bs *BaseSupervisor) IsRunning() bool {
return atomic.LoadInt32(&bs.running) == 1 return atomic.LoadInt32(&bs.running) == 1
} }
// setRunning atomically sets the running state
func (bs *BaseSupervisor) setRunning(running bool) {
if running {
atomic.StoreInt32(&bs.running, 1)
} else {
atomic.StoreInt32(&bs.running, 0)
}
}
// GetProcessPID returns the current process PID // GetProcessPID returns the current process PID
func (bs *BaseSupervisor) GetProcessPID() int { func (bs *BaseSupervisor) GetProcessPID() int {
bs.mutex.RLock() bs.mutex.RLock()
@ -92,3 +92,130 @@ func (bs *BaseSupervisor) cancelContext() {
bs.cancel() bs.cancel()
} }
} }
// initializeChannels installs a fresh stop channel and a fresh process-done
// channel and clears both "closed" flags, all under the supervisor's write
// lock, so that closeStopChan and closeProcessDone will close the new channels.
func (bs *BaseSupervisor) initializeChannels() {
	bs.mutex.Lock()
	defer bs.mutex.Unlock()

	// Each channel is paired with the flag that tracks whether it was closed.
	bs.stopChan, bs.stopChanClosed = make(chan struct{}), false
	bs.processDone, bs.processDoneClosed = make(chan struct{}), false
}
// closeStopChan closes the stop channel at most once per channel instance.
// The stopChanClosed flag, checked and set under the write lock, turns
// repeated calls into no-ops instead of a double-close panic.
func (bs *BaseSupervisor) closeStopChan() {
	bs.mutex.Lock()
	defer bs.mutex.Unlock()

	if bs.stopChanClosed {
		return // already closed
	}

	close(bs.stopChan)
	bs.stopChanClosed = true
}
// closeProcessDone closes the process-done channel at most once per channel
// instance. The processDoneClosed flag, checked and set under the write lock,
// turns repeated calls into no-ops instead of a double-close panic.
func (bs *BaseSupervisor) closeProcessDone() {
	bs.mutex.Lock()
	defer bs.mutex.Unlock()

	if bs.processDoneClosed {
		return // already closed
	}

	close(bs.processDone)
	bs.processDoneClosed = true
}
// terminateProcess asks the current process to exit and escalates if it does not.
//
// It snapshots cmd and processPID under the read lock, sends SIGTERM, then
// waits up to timeout for cmd.Wait to return. If the timeout elapses first it
// calls forceKillProcess and returns without waiting for the kill to take
// effect; the background goroutine keeps running until cmd.Wait returns.
//
// Parameters:
//   - timeout: how long to wait for a graceful exit before force killing.
//   - processType: human-readable process name used in log messages.
//
// It is a no-op when no command or no started process is recorded.
//
// NOTE(review): waitForProcessExit also calls cmd.Wait; confirm both are never
// run against the same cmd, since exec.Cmd.Wait may only be called once.
func (bs *BaseSupervisor) terminateProcess(timeout time.Duration, processType string) {
	bs.mutex.RLock()
	cmd := bs.cmd
	pid := bs.processPID
	bs.mutex.RUnlock()

	if cmd == nil || cmd.Process == nil {
		return
	}

	bs.logger.Info().Int("pid", pid).Msgf("terminating %s process", processType)

	// Send SIGTERM first; a failure is logged but the wait below still runs.
	if err := cmd.Process.Signal(syscall.SIGTERM); err != nil {
		bs.logger.Warn().Err(err).Int("pid", pid).Msgf("failed to send SIGTERM to %s process", processType)
	}

	// Wait for the process in the background so the wait can be bounded.
	done := make(chan struct{})
	go func() {
		_ = cmd.Wait()
		close(done)
	}()

	// Use an explicit timer instead of time.After so it is released as soon as
	// the process exits, rather than staying pending for the full timeout.
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-done:
		bs.logger.Info().Int("pid", pid).Msgf("%s process terminated gracefully", processType)
	case <-timer.C:
		bs.logger.Warn().Int("pid", pid).Msg("process did not terminate gracefully, sending SIGKILL")
		bs.forceKillProcess(processType)
	}
}
// forceKillProcess kills the currently recorded process via Process.Kill.
//
// cmd and processPID are snapshotted under the read lock. The call is a no-op
// when no command or no started process is recorded. A Kill failure is logged
// and not returned.
//
// Parameters:
//   - processType: human-readable process name used in the warning log.
func (bs *BaseSupervisor) forceKillProcess(processType string) {
	bs.mutex.RLock()
	target, targetPID := bs.cmd, bs.processPID
	bs.mutex.RUnlock()

	if target == nil || target.Process == nil {
		return
	}

	bs.logger.Warn().Int("pid", targetPID).Msgf("force killing %s process", processType)

	killErr := target.Process.Kill()
	if killErr == nil {
		return
	}
	bs.logger.Error().Err(killErr).Int("pid", targetPID).Msg("failed to kill process")
}
// waitForProcessExit blocks until the currently recorded command exits, then
// records and logs the outcome.
//
// After cmd.Wait returns it stores the exit time and exit code and resets
// processPID to 0 under the write lock, then removes the PID from the process
// monitor. The exit code is 0 on success, the ExitError's code when Wait
// returns an *exec.ExitError, and -1 for any other error.
//
// Parameters:
//   - processType: human-readable process name used in log messages.
//
// It returns immediately when no command is recorded.
func (bs *BaseSupervisor) waitForProcessExit(processType string) {
	bs.mutex.RLock()
	proc, procPID := bs.cmd, bs.processPID
	bs.mutex.RUnlock()

	if proc == nil {
		return
	}

	// Block until the process exits.
	waitErr := proc.Wait()

	// Translate the Wait result into an exit code.
	var code int
	switch e := waitErr.(type) {
	case nil:
		code = 0
	case *exec.ExitError:
		code = e.ExitCode()
	default:
		// Process was killed or other error
		code = -1
	}

	bs.mutex.Lock()
	bs.lastExitTime = time.Now()
	bs.processPID = 0
	bs.lastExitCode = code
	bs.mutex.Unlock()

	// Stop monitoring the PID captured before the wait.
	bs.processMonitor.RemoveProcess(procPID)

	if code == 0 {
		bs.logger.Info().Int("pid", procPID).Msgf("%s process exited gracefully", processType)
		return
	}
	bs.logger.Error().Int("pid", procPID).Int("exit_code", code).Msgf("%s process exited with error", processType)
}

View File

@ -9,7 +9,6 @@ import (
"os/exec" "os/exec"
"strconv" "strconv"
"sync/atomic" "sync/atomic"
"syscall"
"time" "time"
"github.com/rs/zerolog" "github.com/rs/zerolog"
@ -44,12 +43,6 @@ type AudioOutputSupervisor struct {
// Restart management // Restart management
restartAttempts []time.Time restartAttempts []time.Time
// Channel management
stopChan chan struct{}
processDone chan struct{}
stopChanClosed bool // Track if stopChan is closed
processDoneClosed bool // Track if processDone is closed
// Environment variables for OPUS configuration // Environment variables for OPUS configuration
opusEnv []string opusEnv []string
@ -64,8 +57,6 @@ func NewAudioOutputSupervisor() *AudioOutputSupervisor {
return &AudioOutputSupervisor{ return &AudioOutputSupervisor{
BaseSupervisor: NewBaseSupervisor("audio-output-supervisor"), BaseSupervisor: NewBaseSupervisor("audio-output-supervisor"),
restartAttempts: make([]time.Time, 0), restartAttempts: make([]time.Time, 0),
stopChan: make(chan struct{}),
processDone: make(chan struct{}),
} }
} }
@ -110,12 +101,10 @@ func (s *AudioOutputSupervisor) Start() error {
s.createContext() s.createContext()
// Recreate channels in case they were closed by a previous Stop() call // Recreate channels in case they were closed by a previous Stop() call
s.mutex.Lock() s.initializeChannels()
s.processDone = make(chan struct{})
s.stopChan = make(chan struct{})
s.stopChanClosed = false // Reset channel closed flag
s.processDoneClosed = false // Reset channel closed flag
// Reset restart tracking on start // Reset restart tracking on start
s.mutex.Lock()
s.restartAttempts = s.restartAttempts[:0] s.restartAttempts = s.restartAttempts[:0]
s.mutex.Unlock() s.mutex.Unlock()
@ -135,21 +124,16 @@ func (s *AudioOutputSupervisor) Stop() {
s.logSupervisorStop() s.logSupervisorStop()
// Signal stop and wait for cleanup // Signal stop and wait for cleanup
s.mutex.Lock() s.closeStopChan()
if !s.stopChanClosed {
close(s.stopChan)
s.stopChanClosed = true
}
s.mutex.Unlock()
s.cancelContext() s.cancelContext()
// Wait for process to exit // Wait for process to exit
select { select {
case <-s.processDone: case <-s.processDone:
s.logger.Info().Str("component", AudioOutputSupervisorComponent).Msg("component stopped gracefully") s.logger.Info().Str("component", AudioOutputSupervisorComponent).Msg("component stopped gracefully")
case <-time.After(GetConfig().SupervisorTimeout): case <-time.After(GetConfig().OutputSupervisorTimeout):
s.logger.Warn().Str("component", AudioOutputSupervisorComponent).Msg("component did not stop gracefully, forcing termination") s.logger.Warn().Str("component", AudioOutputSupervisorComponent).Msg("component did not stop gracefully, forcing termination")
s.forceKillProcess() s.forceKillProcess("audio output server")
} }
s.logger.Info().Str("component", AudioOutputSupervisorComponent).Msg("component stopped") s.logger.Info().Str("component", AudioOutputSupervisorComponent).Msg("component stopped")
@ -158,12 +142,7 @@ func (s *AudioOutputSupervisor) Stop() {
// supervisionLoop is the main supervision loop // supervisionLoop is the main supervision loop
func (s *AudioOutputSupervisor) supervisionLoop() { func (s *AudioOutputSupervisor) supervisionLoop() {
defer func() { defer func() {
s.mutex.Lock() s.closeProcessDone()
if !s.processDoneClosed {
close(s.processDone)
s.processDoneClosed = true
}
s.mutex.Unlock()
s.logger.Info().Msg("audio server supervision ended") s.logger.Info().Msg("audio server supervision ended")
}() }()
@ -171,11 +150,11 @@ func (s *AudioOutputSupervisor) supervisionLoop() {
select { select {
case <-s.stopChan: case <-s.stopChan:
s.logger.Info().Msg("received stop signal") s.logger.Info().Msg("received stop signal")
s.terminateProcess() s.terminateProcess(GetConfig().OutputSupervisorTimeout, "audio output server")
return return
case <-s.ctx.Done(): case <-s.ctx.Done():
s.logger.Info().Msg("context cancelled") s.logger.Info().Msg("context cancelled")
s.terminateProcess() s.terminateProcess(GetConfig().OutputSupervisorTimeout, "audio output server")
return return
default: default:
// Start or restart the process // Start or restart the process
@ -282,52 +261,23 @@ func (s *AudioOutputSupervisor) startProcess() error {
return nil return nil
} }
// waitForProcessExit waits for the current process to exit and logs the result // waitForProcessExit waits for the current process to exit and handles restart logic
func (s *AudioOutputSupervisor) waitForProcessExit() { func (s *AudioOutputSupervisor) waitForProcessExit() {
s.mutex.RLock() s.mutex.RLock()
cmd := s.cmd
pid := s.processPID pid := s.processPID
s.mutex.RUnlock() s.mutex.RUnlock()
if cmd == nil { // Use base supervisor's waitForProcessExit
return s.BaseSupervisor.waitForProcessExit("audio output server")
}
// Wait for process to exit // Handle output-specific logic (restart tracking and callbacks)
err := cmd.Wait() s.mutex.RLock()
exitCode := s.lastExitCode
s.mutex.Lock() s.mutex.RUnlock()
s.lastExitTime = time.Now()
s.processPID = 0
var exitCode int
var crashed bool
if err != nil {
if exitError, ok := err.(*exec.ExitError); ok {
exitCode = exitError.ExitCode()
crashed = exitCode != 0
} else {
// Process was killed or other error
exitCode = -1
crashed = true
}
} else {
exitCode = 0
crashed = false
}
s.lastExitCode = exitCode
s.mutex.Unlock()
// Remove process from monitoring
s.processMonitor.RemoveProcess(pid)
crashed := exitCode != 0
if crashed { if crashed {
s.logger.Error().Int("pid", pid).Int("exit_code", exitCode).Msg("audio output server process crashed")
s.recordRestartAttempt() s.recordRestartAttempt()
} else {
s.logger.Info().Int("pid", pid).Msg("audio output server process exited gracefully")
} }
if s.onProcessExit != nil { if s.onProcessExit != nil {
@ -335,57 +285,6 @@ func (s *AudioOutputSupervisor) waitForProcessExit() {
} }
} }
// terminateProcess gracefully terminates the current process
func (s *AudioOutputSupervisor) terminateProcess() {
s.mutex.RLock()
cmd := s.cmd
pid := s.processPID
s.mutex.RUnlock()
if cmd == nil || cmd.Process == nil {
return
}
s.logger.Info().Int("pid", pid).Msg("terminating audio output server process")
// Send SIGTERM first
if err := cmd.Process.Signal(syscall.SIGTERM); err != nil {
s.logger.Warn().Err(err).Int("pid", pid).Msg("failed to send SIGTERM to audio output server process")
}
// Wait for graceful shutdown
done := make(chan struct{})
go func() {
_ = cmd.Wait()
close(done)
}()
select {
case <-done:
s.logger.Info().Int("pid", pid).Msg("audio server process terminated gracefully")
case <-time.After(GetConfig().OutputSupervisorTimeout):
s.logger.Warn().Int("pid", pid).Msg("process did not terminate gracefully, sending SIGKILL")
s.forceKillProcess()
}
}
// forceKillProcess forcefully kills the current process
func (s *AudioOutputSupervisor) forceKillProcess() {
s.mutex.RLock()
cmd := s.cmd
pid := s.processPID
s.mutex.RUnlock()
if cmd == nil || cmd.Process == nil {
return
}
s.logger.Warn().Int("pid", pid).Msg("force killing audio server process")
if err := cmd.Process.Kill(); err != nil {
s.logger.Error().Err(err).Int("pid", pid).Msg("failed to kill process")
}
}
// shouldRestart determines if the process should be restarted // shouldRestart determines if the process should be restarted
func (s *AudioOutputSupervisor) shouldRestart() bool { func (s *AudioOutputSupervisor) shouldRestart() bool {
if atomic.LoadInt32(&s.running) == 0 { if atomic.LoadInt32(&s.running) == 0 {

View File

@ -61,14 +61,15 @@ export default function AudioControlPopover({ microphone }: AudioControlPopoverP
// Use WebSocket-based audio events for real-time updates // Use WebSocket-based audio events for real-time updates
const { const {
audioMuted, audioMuted,
// microphoneState - now using hook state instead
isConnected: wsConnected isConnected: wsConnected
} = useAudioEvents(); } = useAudioEvents();
// WebSocket-only implementation - no fallback polling // WebSocket-only implementation - no fallback polling
// Microphone state from props // Microphone state from props (keeping hook for legacy device operations)
const { const {
isMicrophoneActive, isMicrophoneActive: isMicrophoneActiveFromHook,
startMicrophone, startMicrophone,
stopMicrophone, stopMicrophone,
syncMicrophoneState, syncMicrophoneState,
@ -82,6 +83,9 @@ export default function AudioControlPopover({ microphone }: AudioControlPopoverP
const isMuted = audioMuted ?? false; const isMuted = audioMuted ?? false;
const isConnected = wsConnected; const isConnected = wsConnected;
// Note: We now use hook state instead of WebSocket state for microphone Enable/Disable
// const isMicrophoneActiveFromWS = microphoneState?.running ?? false;
// Audio devices // Audio devices
@ -197,7 +201,7 @@ export default function AudioControlPopover({ microphone }: AudioControlPopoverP
} }
}; };
const handleToggleMicrophoneMute = async () => { const handleToggleMicrophoneEnable = async () => {
const now = Date.now(); const now = Date.now();
// Prevent rapid clicking - if any operation is in progress or within cooldown, ignore the click // Prevent rapid clicking - if any operation is in progress or within cooldown, ignore the click
@ -206,24 +210,27 @@ export default function AudioControlPopover({ microphone }: AudioControlPopoverP
} }
setLastClickTime(now); setLastClickTime(now);
setIsLoading(true);
try { try {
if (isMicrophoneActive) { if (isMicrophoneActiveFromHook) {
// Microphone is active: stop the microphone process and WebRTC tracks // Disable: Stop microphone subprocess AND remove WebRTC tracks
const result = await stopMicrophone(); const result = await stopMicrophone();
if (!result.success && result.error) { if (!result.success) {
notifications.error(result.error.message); throw new Error(result.error?.message || "Failed to stop microphone");
} }
} else { } else {
// Microphone is inactive: start the microphone process and WebRTC tracks // Enable: Start microphone subprocess AND add WebRTC tracks
const result = await startMicrophone(selectedInputDevice); const result = await startMicrophone();
if (!result.success && result.error) { if (!result.success) {
notifications.error(result.error.message); throw new Error(result.error?.message || "Failed to start microphone");
} }
} }
} catch (error) { } catch (error) {
const errorMessage = error instanceof Error ? error.message : "Failed to toggle microphone"; const errorMessage = error instanceof Error ? error.message : "Failed to toggle microphone";
notifications.error(errorMessage); notifications.error(errorMessage);
} finally {
setIsLoading(false);
} }
}; };
@ -231,8 +238,8 @@ export default function AudioControlPopover({ microphone }: AudioControlPopoverP
const handleMicrophoneDeviceChange = async (deviceId: string) => { const handleMicrophoneDeviceChange = async (deviceId: string) => {
setSelectedInputDevice(deviceId); setSelectedInputDevice(deviceId);
// If microphone is currently active (unmuted), restart it with the new device // If microphone is currently active, restart it with the new device
if (isMicrophoneActive) { if (isMicrophoneActiveFromHook) {
try { try {
// Stop current microphone // Stop current microphone
await stopMicrophone(); await stopMicrophone();
@ -317,26 +324,21 @@ export default function AudioControlPopover({ microphone }: AudioControlPopoverP
<div className="flex items-center justify-between rounded-lg bg-slate-50 p-3 dark:bg-slate-700"> <div className="flex items-center justify-between rounded-lg bg-slate-50 p-3 dark:bg-slate-700">
<div className="flex items-center gap-3"> <div className="flex items-center gap-3">
{isMicrophoneActive ? ( {isMicrophoneActiveFromHook ? (
<MdMic className="h-5 w-5 text-green-500" /> <MdMic className="h-5 w-5 text-green-500" />
) : ( ) : (
<MdMicOff className="h-5 w-5 text-red-500" /> <MdMicOff className="h-5 w-5 text-red-500" />
)} )}
<span className="font-medium text-slate-900 dark:text-slate-100"> <span className="font-medium text-slate-900 dark:text-slate-100">
{isMicrophoneActive ? "Unmuted" : "Muted"} {isMicrophoneActiveFromHook ? "Enabled" : "Disabled"}
</span> </span>
</div> </div>
<Button <Button
size="SM" size="SM"
theme={isMicrophoneActive ? "danger" : "primary"} theme={isMicrophoneActiveFromHook ? "danger" : "primary"}
text={ text={isMicrophoneActiveFromHook ? "Disable" : "Enable"}
isStarting ? "Enabling..." : onClick={handleToggleMicrophoneEnable}
isStopping ? "Disabling..." : disabled={isLoading}
isMicrophoneActive ? "Disable" : "Enable"
}
onClick={handleToggleMicrophoneMute}
disabled={isStarting || isStopping || isToggling}
loading={isStarting || isStopping}
/> />
</div> </div>
@ -378,7 +380,7 @@ export default function AudioControlPopover({ microphone }: AudioControlPopoverP
</option> </option>
))} ))}
</select> </select>
{isMicrophoneActive && ( {isMicrophoneActiveFromHook && (
<p className="text-xs text-slate-500 dark:text-slate-400"> <p className="text-xs text-slate-500 dark:text-slate-400">
Changing device will restart the microphone Changing device will restart the microphone
</p> </p>
@ -415,7 +417,7 @@ export default function AudioControlPopover({ microphone }: AudioControlPopoverP
</div> </div>
{/* Microphone Quality Settings */} {/* Microphone Quality Settings */}
{isMicrophoneActive && ( {isMicrophoneActiveFromHook && (
<div className="space-y-3"> <div className="space-y-3">
<div className="flex items-center gap-2"> <div className="flex items-center gap-2">
<MdMic className="h-4 w-4 text-slate-600 dark:text-slate-400" /> <MdMic className="h-4 w-4 text-slate-600 dark:text-slate-400" />
@ -429,13 +431,13 @@ export default function AudioControlPopover({ microphone }: AudioControlPopoverP
<button <button
key={`mic-${quality}`} key={`mic-${quality}`}
onClick={() => handleMicrophoneQualityChange(parseInt(quality))} onClick={() => handleMicrophoneQualityChange(parseInt(quality))}
disabled={isStarting || isStopping || isToggling} disabled={isLoading}
className={cx( className={cx(
"rounded-md border px-3 py-2 text-sm font-medium transition-colors", "rounded-md border px-3 py-2 text-sm font-medium transition-colors",
currentMicrophoneConfig?.Quality === parseInt(quality) currentMicrophoneConfig?.Quality === parseInt(quality)
? "border-green-500 bg-green-50 text-green-700 dark:bg-green-900/20 dark:text-green-300" ? "border-green-500 bg-green-50 text-green-700 dark:bg-green-900/20 dark:text-green-300"
: "border-slate-200 bg-white text-slate-700 hover:bg-slate-50 dark:border-slate-600 dark:bg-slate-700 dark:text-slate-300 dark:hover:bg-slate-600", : "border-slate-200 bg-white text-slate-700 hover:bg-slate-50 dark:border-slate-600 dark:bg-slate-700 dark:text-slate-300 dark:hover:bg-slate-600",
(isStarting || isStopping || isToggling) && "opacity-50 cursor-not-allowed" isLoading && "opacity-50 cursor-not-allowed"
)} )}
> >
{label} {label}