diff --git a/internal/audio/input.go b/internal/audio/input.go index 5121687..300eb61 100644 --- a/internal/audio/input.go +++ b/internal/audio/input.go @@ -166,3 +166,12 @@ func (aim *AudioInputManager) LogPerformanceStats() { func (aim *AudioInputManager) IsRunning() bool { return atomic.LoadInt32(&aim.running) == 1 } + +// IsReady returns whether the audio input manager is ready to receive frames +// This checks both that it's running and that the IPC connection is established +func (aim *AudioInputManager) IsReady() bool { + if !aim.IsRunning() { + return false + } + return aim.ipcManager.IsReady() +} diff --git a/internal/audio/input_ipc.go b/internal/audio/input_ipc.go index 6a33458..45bb7ed 100644 --- a/internal/audio/input_ipc.go +++ b/internal/audio/input_ipc.go @@ -337,14 +337,20 @@ func (aic *AudioInputClient) Connect() error { socketPath := getInputSocketPath() // Try connecting multiple times as the server might not be ready - for i := 0; i < 5; i++ { + // Reduced retry count and delay for faster startup + for i := 0; i < 10; i++ { conn, err := net.Dial("unix", socketPath) if err == nil { aic.conn = conn aic.running = true return nil } - time.Sleep(time.Second) + // Exponential backoff starting at 50ms + delay := time.Duration(50*(1< 500*time.Millisecond { + delay = 500 * time.Millisecond + } + time.Sleep(delay) } return fmt.Errorf("failed to connect to audio input server") diff --git a/internal/audio/input_ipc_manager.go b/internal/audio/input_ipc_manager.go index d28edc2..4a673d9 100644 --- a/internal/audio/input_ipc_manager.go +++ b/internal/audio/input_ipc_manager.go @@ -48,8 +48,8 @@ func (aim *AudioInputIPCManager) Start() error { FrameSize: 960, // 20ms at 48kHz } - // Wait a bit for the subprocess to be ready - time.Sleep(time.Second) + // Wait briefly for the subprocess to be ready (reduced from 1 second) + time.Sleep(200 * time.Millisecond) err = aim.supervisor.SendConfig(config) if err != nil { @@ -109,11 +109,20 @@ func (aim *AudioInputIPCManager) WriteOpusFrame(frame []byte) error { return nil } -// IsRunning returns whether the IPC audio input system is running +// IsRunning returns whether the IPC manager is running func (aim *AudioInputIPCManager) IsRunning() bool { return atomic.LoadInt32(&aim.running) == 1 } +// IsReady returns whether the IPC manager is ready to receive frames +// This checks that the supervisor is connected to the audio input server +func (aim *AudioInputIPCManager) IsReady() bool { + if !aim.IsRunning() { + return false + } + return aim.supervisor.IsConnected() +} + // GetMetrics returns current metrics func (aim *AudioInputIPCManager) GetMetrics() AudioInputMetrics { return AudioInputMetrics{ diff --git a/internal/audio/input_supervisor.go b/internal/audio/input_supervisor.go index 5ce4eec..ae2b941 100644 --- a/internal/audio/input_supervisor.go +++ b/internal/audio/input_supervisor.go @@ -15,19 +15,21 @@ import ( // AudioInputSupervisor manages the audio input server subprocess type AudioInputSupervisor struct { - cmd *exec.Cmd - cancel context.CancelFunc - mtx sync.Mutex - running bool - logger zerolog.Logger - client *AudioInputClient + cmd *exec.Cmd + cancel context.CancelFunc + mtx sync.Mutex + running bool + logger zerolog.Logger + client *AudioInputClient + processMonitor *ProcessMonitor } // NewAudioInputSupervisor creates a new audio input supervisor func NewAudioInputSupervisor() *AudioInputSupervisor { return &AudioInputSupervisor{ - logger: logging.GetDefaultLogger().With().Str("component", "audio-input-supervisor").Logger(), - client: NewAudioInputClient(), + logger: logging.GetDefaultLogger().With().Str("component", "audio-input-supervisor").Logger(), + client: NewAudioInputClient(), + processMonitor: GetProcessMonitor(), } } @@ -75,6 +77,9 @@ func (ais *AudioInputSupervisor) Start() error { ais.logger.Info().Int("pid", cmd.Process.Pid).Msg("Audio input server subprocess started") + // Add process to monitoring + ais.processMonitor.AddProcess(cmd.Process.Pid, "audio-input-server") + // Monitor the subprocess in a goroutine go ais.monitorSubprocess() @@ -145,19 +150,50 @@ func (ais *AudioInputSupervisor) IsRunning() bool { return ais.running } +// IsConnected returns whether the client is connected to the audio input server +func (ais *AudioInputSupervisor) IsConnected() bool { + if !ais.IsRunning() { + return false + } + return ais.client.IsConnected() +} + // GetClient returns the IPC client for sending audio frames func (ais *AudioInputSupervisor) GetClient() *AudioInputClient { return ais.client } +// GetProcessMetrics returns current process metrics if the process is running +func (ais *AudioInputSupervisor) GetProcessMetrics() *ProcessMetrics { + ais.mtx.Lock() + defer ais.mtx.Unlock() + + if ais.cmd == nil || ais.cmd.Process == nil { + return nil + } + + pid := ais.cmd.Process.Pid + metrics := ais.processMonitor.GetCurrentMetrics() + for _, metric := range metrics { + if metric.PID == pid { + return &metric + } + } + return nil +} + // monitorSubprocess monitors the subprocess and handles unexpected exits func (ais *AudioInputSupervisor) monitorSubprocess() { if ais.cmd == nil { return } + pid := ais.cmd.Process.Pid err := ais.cmd.Wait() + // Remove process from monitoring + ais.processMonitor.RemoveProcess(pid) + ais.mtx.Lock() defer ais.mtx.Unlock() @@ -184,8 +220,8 @@ func (ais *AudioInputSupervisor) monitorSubprocess() { // connectClient attempts to connect the client to the server func (ais *AudioInputSupervisor) connectClient() { - // Wait a bit for the server to start - time.Sleep(500 * time.Millisecond) + // Wait briefly for the server to start (reduced from 500ms) + time.Sleep(100 * time.Millisecond) err := ais.client.Connect() if err != nil { diff --git a/internal/audio/metrics.go b/internal/audio/metrics.go new file mode 100644 index 0000000..7a09ed9 --- /dev/null +++ b/internal/audio/metrics.go @@ -0,0 +1,410 @@ +package audio + +import ( + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + // Audio output metrics + audioFramesReceivedTotal = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "jetkvm_audio_frames_received_total", + Help: "Total number of audio frames received", + }, + ) + + audioFramesDroppedTotal = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "jetkvm_audio_frames_dropped_total", + Help: "Total number of audio frames dropped", + }, + ) + + audioBytesProcessedTotal = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "jetkvm_audio_bytes_processed_total", + Help: "Total number of audio bytes processed", + }, + ) + + audioConnectionDropsTotal = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "jetkvm_audio_connection_drops_total", + Help: "Total number of audio connection drops", + }, + ) + + audioAverageLatencySeconds = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "jetkvm_audio_average_latency_seconds", + Help: "Average audio latency in seconds", + }, + ) + + audioLastFrameTimestamp = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "jetkvm_audio_last_frame_timestamp_seconds", + Help: "Timestamp of the last audio frame received", + }, + ) + + // Microphone input metrics + microphoneFramesSentTotal = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "jetkvm_microphone_frames_sent_total", + Help: "Total number of microphone frames sent", + }, + ) + + microphoneFramesDroppedTotal = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "jetkvm_microphone_frames_dropped_total", + Help: "Total number of microphone frames dropped", + }, + ) + + microphoneBytesProcessedTotal = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "jetkvm_microphone_bytes_processed_total", + Help: "Total number of microphone bytes processed", + }, + ) + + microphoneConnectionDropsTotal = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "jetkvm_microphone_connection_drops_total", + Help: "Total number of microphone connection drops", + }, + ) + + microphoneAverageLatencySeconds = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "jetkvm_microphone_average_latency_seconds", + Help: "Average microphone latency in seconds", + }, + ) + + microphoneLastFrameTimestamp = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "jetkvm_microphone_last_frame_timestamp_seconds", + Help: "Timestamp of the last microphone frame sent", + }, + ) + + // Audio subprocess process metrics + audioProcessCpuPercent = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "jetkvm_audio_process_cpu_percent", + Help: "CPU usage percentage of audio output subprocess", + }, + ) + + audioProcessMemoryPercent = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "jetkvm_audio_process_memory_percent", + Help: "Memory usage percentage of audio output subprocess", + }, + ) + + audioProcessMemoryRssBytes = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "jetkvm_audio_process_memory_rss_bytes", + Help: "RSS memory usage in bytes of audio output subprocess", + }, + ) + + audioProcessMemoryVmsBytes = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "jetkvm_audio_process_memory_vms_bytes", + Help: "VMS memory usage in bytes of audio output subprocess", + }, + ) + + audioProcessRunning = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "jetkvm_audio_process_running", + Help: "Whether audio output subprocess is running (1=running, 0=stopped)", + }, + ) + + // Microphone subprocess process metrics + microphoneProcessCpuPercent = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "jetkvm_microphone_process_cpu_percent", + Help: "CPU usage percentage of microphone input subprocess", + }, + ) + + microphoneProcessMemoryPercent = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "jetkvm_microphone_process_memory_percent", + Help: "Memory usage percentage of microphone input subprocess", + }, + ) + + microphoneProcessMemoryRssBytes = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "jetkvm_microphone_process_memory_rss_bytes", + Help: "RSS memory usage in bytes of microphone input subprocess", + }, + ) + + microphoneProcessMemoryVmsBytes = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "jetkvm_microphone_process_memory_vms_bytes", + Help: "VMS memory usage in bytes of microphone input subprocess", + }, + ) + + microphoneProcessRunning = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "jetkvm_microphone_process_running", + Help: "Whether microphone input subprocess is running (1=running, 0=stopped)", + }, + ) + + // Audio configuration metrics + audioConfigQuality = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "jetkvm_audio_config_quality", + Help: "Current audio quality setting (0=Low, 1=Medium, 2=High, 3=Ultra)", + }, + ) + + audioConfigBitrate = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "jetkvm_audio_config_bitrate_kbps", + Help: "Current audio bitrate in kbps", + }, + ) + + audioConfigSampleRate = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "jetkvm_audio_config_sample_rate_hz", + Help: "Current audio sample rate in Hz", + }, + ) + + audioConfigChannels = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "jetkvm_audio_config_channels", + Help: "Current audio channel count", + }, + ) + + microphoneConfigQuality = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "jetkvm_microphone_config_quality", + Help: "Current microphone quality setting (0=Low, 1=Medium, 2=High, 3=Ultra)", + }, + ) + + microphoneConfigBitrate = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "jetkvm_microphone_config_bitrate_kbps", + Help: "Current microphone bitrate in kbps", + }, + ) + + microphoneConfigSampleRate = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "jetkvm_microphone_config_sample_rate_hz", + Help: "Current microphone sample rate in Hz", + }, + ) + + microphoneConfigChannels = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "jetkvm_microphone_config_channels", + Help: "Current microphone channel count", + }, + ) + + // Metrics update tracking + metricsUpdateMutex sync.RWMutex + lastMetricsUpdate time.Time + + // Counter value tracking (since prometheus counters don't have Get() method) + audioFramesReceivedValue int64 + audioFramesDroppedValue int64 + audioBytesProcessedValue int64 + audioConnectionDropsValue int64 + micFramesSentValue int64 + micFramesDroppedValue int64 + micBytesProcessedValue int64 + micConnectionDropsValue int64 +) + +// UpdateAudioMetrics updates Prometheus metrics with current audio data +func UpdateAudioMetrics(metrics AudioMetrics) { + metricsUpdateMutex.Lock() + defer metricsUpdateMutex.Unlock() + + // Update counters with delta values + if metrics.FramesReceived > audioFramesReceivedValue { + audioFramesReceivedTotal.Add(float64(metrics.FramesReceived - audioFramesReceivedValue)) + audioFramesReceivedValue = metrics.FramesReceived + } + + if metrics.FramesDropped > audioFramesDroppedValue { + audioFramesDroppedTotal.Add(float64(metrics.FramesDropped - audioFramesDroppedValue)) + audioFramesDroppedValue = metrics.FramesDropped + } + + if metrics.BytesProcessed > audioBytesProcessedValue { + audioBytesProcessedTotal.Add(float64(metrics.BytesProcessed - audioBytesProcessedValue)) + audioBytesProcessedValue = metrics.BytesProcessed + } + + if metrics.ConnectionDrops > audioConnectionDropsValue { + audioConnectionDropsTotal.Add(float64(metrics.ConnectionDrops - audioConnectionDropsValue)) + audioConnectionDropsValue = metrics.ConnectionDrops + } + + // Update gauges + audioAverageLatencySeconds.Set(float64(metrics.AverageLatency.Nanoseconds()) / 1e9) + if !metrics.LastFrameTime.IsZero() { + audioLastFrameTimestamp.Set(float64(metrics.LastFrameTime.Unix())) + } + + lastMetricsUpdate = time.Now() +} + +// UpdateMicrophoneMetrics updates Prometheus metrics with current microphone data +func UpdateMicrophoneMetrics(metrics AudioInputMetrics) { + metricsUpdateMutex.Lock() + defer metricsUpdateMutex.Unlock() + + // Update counters with delta values + if metrics.FramesSent > micFramesSentValue { + microphoneFramesSentTotal.Add(float64(metrics.FramesSent - micFramesSentValue)) + micFramesSentValue = metrics.FramesSent + } + + if metrics.FramesDropped > micFramesDroppedValue { + microphoneFramesDroppedTotal.Add(float64(metrics.FramesDropped - micFramesDroppedValue)) + micFramesDroppedValue = metrics.FramesDropped + } + + if metrics.BytesProcessed > micBytesProcessedValue { + microphoneBytesProcessedTotal.Add(float64(metrics.BytesProcessed - micBytesProcessedValue)) + micBytesProcessedValue = metrics.BytesProcessed + } + + if metrics.ConnectionDrops > micConnectionDropsValue { + microphoneConnectionDropsTotal.Add(float64(metrics.ConnectionDrops - micConnectionDropsValue)) + micConnectionDropsValue = metrics.ConnectionDrops + } + + // Update gauges + microphoneAverageLatencySeconds.Set(float64(metrics.AverageLatency.Nanoseconds()) / 1e9) + if !metrics.LastFrameTime.IsZero() { + microphoneLastFrameTimestamp.Set(float64(metrics.LastFrameTime.Unix())) + } + + lastMetricsUpdate = time.Now() +} + +// UpdateAudioProcessMetrics updates Prometheus metrics with audio subprocess data +func UpdateAudioProcessMetrics(metrics ProcessMetrics, isRunning bool) { + metricsUpdateMutex.Lock() + defer metricsUpdateMutex.Unlock() + + audioProcessCpuPercent.Set(metrics.CPUPercent) + audioProcessMemoryPercent.Set(metrics.MemoryPercent) + audioProcessMemoryRssBytes.Set(float64(metrics.MemoryRSS)) + audioProcessMemoryVmsBytes.Set(float64(metrics.MemoryVMS)) + if isRunning { + audioProcessRunning.Set(1) + } else { + audioProcessRunning.Set(0) + } + + lastMetricsUpdate = time.Now() +} + +// UpdateMicrophoneProcessMetrics updates Prometheus metrics with microphone subprocess data +func UpdateMicrophoneProcessMetrics(metrics ProcessMetrics, isRunning bool) { + metricsUpdateMutex.Lock() + defer metricsUpdateMutex.Unlock() + + microphoneProcessCpuPercent.Set(metrics.CPUPercent) + microphoneProcessMemoryPercent.Set(metrics.MemoryPercent) + microphoneProcessMemoryRssBytes.Set(float64(metrics.MemoryRSS)) + microphoneProcessMemoryVmsBytes.Set(float64(metrics.MemoryVMS)) + if isRunning { + microphoneProcessRunning.Set(1) + } else { + microphoneProcessRunning.Set(0) + } + + lastMetricsUpdate = time.Now() +} + +// UpdateAudioConfigMetrics updates Prometheus metrics with audio configuration +func UpdateAudioConfigMetrics(config AudioConfig) { + metricsUpdateMutex.Lock() + defer metricsUpdateMutex.Unlock() + + audioConfigQuality.Set(float64(config.Quality)) + audioConfigBitrate.Set(float64(config.Bitrate)) + audioConfigSampleRate.Set(float64(config.SampleRate)) + audioConfigChannels.Set(float64(config.Channels)) + + lastMetricsUpdate = time.Now() +} + +// UpdateMicrophoneConfigMetrics updates Prometheus metrics with microphone configuration +func UpdateMicrophoneConfigMetrics(config AudioConfig) { + metricsUpdateMutex.Lock() + defer metricsUpdateMutex.Unlock() + + microphoneConfigQuality.Set(float64(config.Quality)) + microphoneConfigBitrate.Set(float64(config.Bitrate)) + microphoneConfigSampleRate.Set(float64(config.SampleRate)) + microphoneConfigChannels.Set(float64(config.Channels)) + + lastMetricsUpdate = time.Now() +} + +// GetLastMetricsUpdate returns the timestamp of the last metrics update +func GetLastMetricsUpdate() time.Time { + metricsUpdateMutex.RLock() + defer metricsUpdateMutex.RUnlock() + return lastMetricsUpdate +} + +// StartMetricsUpdater starts a goroutine that periodically updates Prometheus metrics +func StartMetricsUpdater() { + go func() { + ticker := time.NewTicker(5 * time.Second) // Update every 5 seconds + defer ticker.Stop() + + for range ticker.C { + // Update audio output metrics + audioMetrics := GetAudioMetrics() + UpdateAudioMetrics(audioMetrics) + + // Update microphone input metrics + micMetrics := GetAudioInputMetrics() + UpdateMicrophoneMetrics(micMetrics) + + // Update microphone subprocess process metrics + if inputSupervisor := GetAudioInputIPCSupervisor(); inputSupervisor != nil { + if processMetrics := inputSupervisor.GetProcessMetrics(); processMetrics != nil { + UpdateMicrophoneProcessMetrics(*processMetrics, inputSupervisor.IsRunning()) + } + } + + // Update audio configuration metrics + audioConfig := GetAudioConfig() + UpdateAudioConfigMetrics(audioConfig) + micConfig := GetMicrophoneConfig() + UpdateMicrophoneConfigMetrics(micConfig) + } + }() +} \ No newline at end of file diff --git a/internal/audio/process_monitor.go b/internal/audio/process_monitor.go new file mode 100644 index 0000000..1893f87 --- /dev/null +++ b/internal/audio/process_monitor.go @@ -0,0 +1,263 @@ +package audio + +import ( + "bufio" + "fmt" + "os" + "strconv" + "strings" + "sync" + "time" + + "github.com/jetkvm/kvm/internal/logging" + "github.com/rs/zerolog" +) + +// ProcessMetrics represents CPU and memory usage metrics for a process +type ProcessMetrics struct { + PID int `json:"pid"` + CPUPercent float64 `json:"cpu_percent"` + MemoryRSS int64 `json:"memory_rss_bytes"` + MemoryVMS int64 `json:"memory_vms_bytes"` + MemoryPercent float64 `json:"memory_percent"` + Timestamp time.Time `json:"timestamp"` + ProcessName string `json:"process_name"` +} + +// ProcessMonitor monitors CPU and memory usage of processes +type ProcessMonitor struct { + logger zerolog.Logger + mutex sync.RWMutex + monitoredPIDs map[int]*processState + running bool + stopChan chan struct{} + metricsChan chan ProcessMetrics + updateInterval time.Duration +} + +// processState tracks the state needed for CPU calculation +type processState struct { + name string + lastCPUTime int64 + lastSysTime int64 + lastUserTime int64 + lastSample time.Time +} + +// NewProcessMonitor creates a new process monitor +func NewProcessMonitor() *ProcessMonitor { + return &ProcessMonitor{ + logger: logging.GetDefaultLogger().With().Str("component", "process-monitor").Logger(), + monitoredPIDs: make(map[int]*processState), + stopChan: make(chan struct{}), + metricsChan: make(chan ProcessMetrics, 100), + updateInterval: 2 * time.Second, // Update every 2 seconds + } +} + +// Start begins monitoring processes +func (pm *ProcessMonitor) Start() { + pm.mutex.Lock() + defer pm.mutex.Unlock() + + if pm.running { + return + } + + pm.running = true + go pm.monitorLoop() + pm.logger.Info().Msg("Process monitor started") +} + +// Stop stops monitoring processes +func (pm *ProcessMonitor) Stop() { + pm.mutex.Lock() + defer pm.mutex.Unlock() + + if !pm.running { + return + } + + pm.running = false + close(pm.stopChan) + pm.logger.Info().Msg("Process monitor stopped") +} + +// AddProcess adds a process to monitor +func (pm *ProcessMonitor) AddProcess(pid int, name string) { + pm.mutex.Lock() + defer pm.mutex.Unlock() + + pm.monitoredPIDs[pid] = &processState{ + name: name, + lastSample: time.Now(), + } + pm.logger.Info().Int("pid", pid).Str("name", name).Msg("Added process to monitor") +} + +// RemoveProcess removes a process from monitoring +func (pm *ProcessMonitor) RemoveProcess(pid int) { + pm.mutex.Lock() + defer pm.mutex.Unlock() + + delete(pm.monitoredPIDs, pid) + pm.logger.Info().Int("pid", pid).Msg("Removed process from monitor") +} + +// GetMetricsChan returns the channel for receiving metrics +func (pm *ProcessMonitor) GetMetricsChan() <-chan ProcessMetrics { + return pm.metricsChan +} + +// GetCurrentMetrics returns current metrics for all monitored processes +func (pm *ProcessMonitor) GetCurrentMetrics() []ProcessMetrics { + pm.mutex.RLock() + defer pm.mutex.RUnlock() + + var metrics []ProcessMetrics + for pid, state := range pm.monitoredPIDs { + if metric, err := pm.collectMetrics(pid, state); err == nil { + metrics = append(metrics, metric) + } + } + return metrics +} + +// monitorLoop is the main monitoring loop +func (pm *ProcessMonitor) monitorLoop() { + ticker := time.NewTicker(pm.updateInterval) + defer ticker.Stop() + + for { + select { + case <-pm.stopChan: + return + case <-ticker.C: + pm.collectAllMetrics() + } + } +} + +// collectAllMetrics collects metrics for all monitored processes +func (pm *ProcessMonitor) collectAllMetrics() { + pm.mutex.RLock() + pids := make(map[int]*processState) + for pid, state := range pm.monitoredPIDs { + pids[pid] = state + } + pm.mutex.RUnlock() + + for pid, state := range pids { + if metric, err := pm.collectMetrics(pid, state); err == nil { + select { + case pm.metricsChan <- metric: + default: + // Channel full, skip this metric + } + } else { + // Process might have died, remove it + pm.RemoveProcess(pid) + } + } +} + +// collectMetrics collects metrics for a specific process +func (pm *ProcessMonitor) collectMetrics(pid int, state *processState) (ProcessMetrics, error) { + now := time.Now() + metric := ProcessMetrics{ + PID: pid, + Timestamp: now, + ProcessName: state.name, + } + + // Read /proc/[pid]/stat for CPU and memory info + statPath := fmt.Sprintf("/proc/%d/stat", pid) + statData, err := os.ReadFile(statPath) + if err != nil { + return metric, fmt.Errorf("failed to read stat file: %w", err) + } + + // Parse stat file + fields := strings.Fields(string(statData)) + if len(fields) < 24 { + return metric, fmt.Errorf("invalid stat file format") + } + + // Extract CPU times (fields 13, 14 are utime, stime in clock ticks) + utime, _ := strconv.ParseInt(fields[13], 10, 64) + stime, _ := strconv.ParseInt(fields[14], 10, 64) + totalCPUTime := utime + stime + + // Extract memory info (field 22 is vsize, field 23 is rss in pages) + vsize, _ := strconv.ParseInt(fields[22], 10, 64) + rss, _ := strconv.ParseInt(fields[23], 10, 64) + + // Convert RSS from pages to bytes (assuming 4KB pages) + pageSize := int64(4096) + metric.MemoryRSS = rss * pageSize + metric.MemoryVMS = vsize + + // Calculate CPU percentage + if !state.lastSample.IsZero() { + timeDelta := now.Sub(state.lastSample).Seconds() + cpuDelta := float64(totalCPUTime - state.lastCPUTime) + + // Convert from clock ticks to seconds (assuming 100 Hz) + clockTicks := 100.0 + cpuSeconds := cpuDelta / clockTicks + + if timeDelta > 0 { + metric.CPUPercent = (cpuSeconds / timeDelta) * 100.0 + } + } + + // Calculate memory percentage (RSS / total system memory) + if totalMem := pm.getTotalMemory(); totalMem > 0 { + metric.MemoryPercent = float64(metric.MemoryRSS) / float64(totalMem) * 100.0 + } + + // Update state for next calculation + state.lastCPUTime = totalCPUTime + state.lastUserTime = utime + state.lastSysTime = stime + state.lastSample = now + + return metric, nil +} + +// getTotalMemory returns total system memory in bytes +func (pm *ProcessMonitor) getTotalMemory() int64 { + file, err := os.Open("/proc/meminfo") + if err != nil { + return 0 + } + defer file.Close() + + scanner := bufio.NewScanner(file) + for scanner.Scan() { + line := scanner.Text() + if strings.HasPrefix(line, "MemTotal:") { + fields := strings.Fields(line) + if len(fields) >= 2 { + if kb, err := strconv.ParseInt(fields[1], 10, 64); err == nil { + return kb * 1024 // Convert KB to bytes + } + } + break + } + } + return 0 +} + +// Global process monitor instance +var globalProcessMonitor *ProcessMonitor +var processMonitorOnce sync.Once + +// GetProcessMonitor returns the global process monitor instance +func GetProcessMonitor() *ProcessMonitor { + processMonitorOnce.Do(func() { + globalProcessMonitor = NewProcessMonitor() + globalProcessMonitor.Start() + }) + return globalProcessMonitor +} \ No newline at end of file diff --git a/internal/audio/supervisor.go b/internal/audio/supervisor.go index c5c49c9..3c4f478 100644 --- a/internal/audio/supervisor.go +++ b/internal/audio/supervisor.go @@ -49,6 +49,9 @@ type AudioServerSupervisor struct { processDone chan struct{} stopChan chan struct{} + // Process monitoring + processMonitor *ProcessMonitor + // Callbacks onProcessStart func(pid int) onProcessExit func(pid int, exitCode int, crashed bool) @@ -61,11 +64,12 @@ func NewAudioServerSupervisor() *AudioServerSupervisor { logger := logging.GetDefaultLogger().With().Str("component", "audio-supervisor").Logger() return &AudioServerSupervisor{ - ctx: ctx, - cancel: cancel, - logger: &logger, - processDone: make(chan struct{}), - stopChan: make(chan struct{}), + ctx: ctx, + cancel: cancel, + logger: &logger, + processDone: make(chan struct{}), + stopChan: make(chan struct{}), + processMonitor: GetProcessMonitor(), } } @@ -140,6 +144,25 @@ func (s *AudioServerSupervisor) GetLastExitInfo() (exitCode int, exitTime time.T return s.lastExitCode, s.lastExitTime } +// GetProcessMetrics returns current process metrics if the process is running +func (s *AudioServerSupervisor) GetProcessMetrics() *ProcessMetrics { + s.mutex.RLock() + pid := s.processPID + s.mutex.RUnlock() + + if pid == 0 { + return nil + } + + metrics := s.processMonitor.GetCurrentMetrics() + for _, metric := range metrics { + if metric.PID == pid { + return &metric + } + } + return nil +} + // supervisionLoop is the main supervision loop func (s *AudioServerSupervisor) supervisionLoop() { defer func() { @@ -237,6 +260,9 @@ func (s *AudioServerSupervisor) startProcess() error { s.processPID = s.cmd.Process.Pid s.logger.Info().Int("pid", s.processPID).Msg("audio server process started") + // Add process to monitoring + s.processMonitor.AddProcess(s.processPID, "audio-server") + if s.onProcessStart != nil { s.onProcessStart(s.processPID) } @@ -282,6 +308,9 @@ func (s *AudioServerSupervisor) waitForProcessExit() { s.lastExitCode = exitCode s.mutex.Unlock() + // Remove process from monitoring + s.processMonitor.RemoveProcess(pid) + if crashed { s.logger.Error().Int("pid", pid).Int("exit_code", exitCode).Msg("audio server process crashed") s.recordRestartAttempt() diff --git a/prometheus.go b/prometheus.go index 5d4c5e7..48a3fa3 100644 --- a/prometheus.go +++ b/prometheus.go @@ -1,6 +1,7 @@ package kvm import ( + "github.com/jetkvm/kvm/internal/audio" "github.com/prometheus/client_golang/prometheus" versioncollector "github.com/prometheus/client_golang/prometheus/collectors/version" "github.com/prometheus/common/version" @@ -10,4 +11,7 @@ func initPrometheus() { // A Prometheus metrics endpoint. version.Version = builtAppVersion prometheus.MustRegister(versioncollector.NewCollector("jetkvm")) + + // Start audio metrics collection + audio.StartMetricsUpdater() } diff --git a/ui/src/components/AudioMetricsDashboard.tsx b/ui/src/components/AudioMetricsDashboard.tsx index d56506d..e32ce1e 100644 --- a/ui/src/components/AudioMetricsDashboard.tsx +++ b/ui/src/components/AudioMetricsDashboard.tsx @@ -1,6 +1,6 @@ import { useEffect, useState } from "react"; import { MdGraphicEq, MdSignalWifi4Bar, MdError, MdMic } from "react-icons/md"; -import { LuActivity, LuClock, LuHardDrive, LuSettings } from "react-icons/lu"; +import { LuActivity, LuClock, LuHardDrive, LuSettings, LuCpu, LuMemoryStick } from "react-icons/lu"; import { AudioLevelMeter } from "@components/AudioLevelMeter"; import { cx } from "@/cva.config"; @@ -27,6 +27,14 @@ interface MicrophoneMetrics { average_latency: string; } +interface ProcessMetrics { + cpu_percent: number; + memory_percent: number; + memory_rss: number; + memory_vms: number; + running: boolean; +} + interface AudioConfig { Quality: number; Bitrate: number; @@ -55,6 +63,16 @@ export default function AudioMetricsDashboard() { const [fallbackMicrophoneMetrics, setFallbackMicrophoneMetrics] = useState(null); const [fallbackConnected, setFallbackConnected] = useState(false); + // Process metrics state + const [audioProcessMetrics, setAudioProcessMetrics] = useState(null); + const [microphoneProcessMetrics, setMicrophoneProcessMetrics] = useState(null); + + // Historical data for histograms (last 60 data points, ~1 minute at 1s intervals) + const [audioCpuHistory, setAudioCpuHistory] = useState([]); + const [audioMemoryHistory, setAudioMemoryHistory] = useState([]); + const [micCpuHistory, setMicCpuHistory] = useState([]); + const [micMemoryHistory, setMicMemoryHistory] = useState([]); + // Configuration state (these don't change frequently, so we can load them once) const [config, setConfig] = useState(null); const [microphoneConfig, setMicrophoneConfig] = useState(null); @@ -124,6 +142,29 @@ export default function AudioMetricsDashboard() { setFallbackConnected(false); } + // Load audio process metrics + try { + const audioProcessResp = await api.GET("/audio/process-metrics"); + if (audioProcessResp.ok) { + const audioProcessData = await audioProcessResp.json(); + setAudioProcessMetrics(audioProcessData); + + // Update historical data for histograms (keep last 60 points) + if (audioProcessData.running) { + setAudioCpuHistory(prev => { + const newHistory = [...prev, audioProcessData.cpu_percent]; + return newHistory.slice(-60); // Keep last 60 data points + }); + setAudioMemoryHistory(prev => { + const newHistory = [...prev, audioProcessData.memory_percent]; + return newHistory.slice(-60); + }); + } + } + } catch (audioProcessError) { + console.debug("Audio process metrics not available:", audioProcessError); + } + // Load microphone metrics try { const micResp = await api.GET("/microphone/metrics"); @@ -135,6 +176,29 @@ export default function AudioMetricsDashboard() { // Microphone metrics might not be available, that's okay console.debug("Microphone metrics not available:", micError); } + + // Load microphone process metrics + try { + const micProcessResp = await api.GET("/microphone/process-metrics"); + if (micProcessResp.ok) { + const micProcessData = await micProcessResp.json(); + setMicrophoneProcessMetrics(micProcessData); + + // Update historical data for histograms (keep last 60 points) + if (micProcessData.running) { + setMicCpuHistory(prev => { + const newHistory = [...prev, micProcessData.cpu_percent]; + return newHistory.slice(-60); // Keep last 60 data points + }); + setMicMemoryHistory(prev => { + const newHistory = [...prev, micProcessData.memory_percent]; + return newHistory.slice(-60); + }); + } + } + } catch (micProcessError) { + console.debug("Microphone process metrics not available:", micProcessError); + } } catch (error) { console.error("Failed to load audio data:", error); setFallbackConnected(false); @@ -158,6 +222,18 @@ export default function AudioMetricsDashboard() { return ((metrics.frames_dropped / metrics.frames_received) * 100); }; + const formatMemory = (bytes: number) => { + if (bytes === 0) return "0 MB"; + const mb = bytes / (1024 * 1024); + if (mb < 1024) { + return `${mb.toFixed(1)} MB`; + } + const gb = mb / 1024; + return `${gb.toFixed(2)} GB`; + }; + + + const getQualityColor = (quality: number) => { switch (quality) { case 0: return "text-yellow-600 dark:text-yellow-400"; @@ -168,6 +244,53 @@ export default function AudioMetricsDashboard() { } }; + // Histogram component for displaying historical data + const Histogram = ({ data, title, unit, color }: { + data: number[], + title: string, + unit: string, + color: string + }) => { + if (data.length === 0) return null; + + const maxValue = Math.max(...data, 1); // Avoid division by zero + const minValue = Math.min(...data); + const range = maxValue - minValue; + + return ( +
+
+ + {title} + + + {data.length > 0 ? `${data[data.length - 1].toFixed(1)}${unit}` : `0${unit}`} + +
+
+ {data.slice(-30).map((value, index) => { // Show last 30 points + const height = range > 0 ? ((value - minValue) / range) * 100 : 0; + return ( +
+ ); + })} +
+
+ {minValue.toFixed(1)}{unit} + {maxValue.toFixed(1)}{unit} +
+
+ ); + }; + return (
{/* Header */} @@ -266,6 +389,97 @@ export default function AudioMetricsDashboard() { )}
+ {/* Subprocess Resource Usage - Histogram View */} +
+ {/* Audio Output Subprocess */} + {audioProcessMetrics && ( +
+
+ + + Audio Output Process + +
+
+
+ + +
+
+
+ {formatMemory(audioProcessMetrics.memory_rss)} +
+
RSS
+
+
+
+ {formatMemory(audioProcessMetrics.memory_vms)} +
+
VMS
+
+
+
+
+ )} + + {/* Microphone Input Subprocess */} + {microphoneProcessMetrics && ( +
+
+ + + Microphone Input Process + +
+
+
+ + +
+
+
+ {formatMemory(microphoneProcessMetrics.memory_rss)} +
+
RSS
+
+
+
+ {formatMemory(microphoneProcessMetrics.memory_vms)} +
+
VMS
+
+
+
+
+ )} +
+ {/* Performance Metrics */} {metrics && (
diff --git a/ui/src/hooks/useAudioEvents.ts b/ui/src/hooks/useAudioEvents.ts index 898d63a..6579448 100644 --- a/ui/src/hooks/useAudioEvents.ts +++ b/ui/src/hooks/useAudioEvents.ts @@ -62,7 +62,7 @@ export interface UseAudioEventsReturn { } // Global subscription management to prevent multiple subscriptions per WebSocket connection -let globalSubscriptionState = { +const globalSubscriptionState = { isSubscribed: false, subscriberCount: 0, connectionId: null as string | null diff --git a/ui/src/hooks/useMicrophone.ts b/ui/src/hooks/useMicrophone.ts index 164ecda..5cd5cb1 100644 --- a/ui/src/hooks/useMicrophone.ts +++ b/ui/src/hooks/useMicrophone.ts @@ -858,11 +858,15 @@ export function useMicrophone() { }, [microphoneSender, peerConnection]); const startMicrophoneDebounced = useCallback((deviceId?: string) => { - debouncedOperation(() => startMicrophone(deviceId).then(() => {}), "start"); + debouncedOperation(async () => { + await startMicrophone(deviceId).catch(console.error); + }, "start"); }, [startMicrophone, debouncedOperation]); const stopMicrophoneDebounced = useCallback(() => { - debouncedOperation(() => stopMicrophone().then(() => {}), "stop"); + debouncedOperation(async () => { + await stopMicrophone().catch(console.error); + }, "stop"); }, [stopMicrophone, debouncedOperation]); // Make debug functions available globally for console access diff --git a/web.go b/web.go index 20e0f04..66ed27a 100644 --- a/web.go +++ b/web.go @@ -422,6 +422,87 @@ func setupRouter() *gin.Engine { }) }) + // Audio subprocess process metrics endpoints + protected.GET("/audio/process-metrics", func(c *gin.Context) { + // Access the global audio supervisor from main.go + if audioSupervisor == nil { + c.JSON(200, gin.H{ + "cpu_percent": 0.0, + "memory_percent": 0.0, + "memory_rss": 0, + "memory_vms": 0, + "running": false, + }) + return + } + + metrics := audioSupervisor.GetProcessMetrics() + if metrics == nil { + c.JSON(200, gin.H{ + "cpu_percent": 0.0, + "memory_percent": 0.0, + "memory_rss": 0, + "memory_vms": 0, + "running": false, + }) + return + } + + c.JSON(200, gin.H{ + "cpu_percent": metrics.CPUPercent, + "memory_percent": metrics.MemoryPercent, + "memory_rss": metrics.MemoryRSS, + "memory_vms": metrics.MemoryVMS, + "running": true, + }) + }) + + protected.GET("/microphone/process-metrics", func(c *gin.Context) { + if currentSession == nil || currentSession.AudioInputManager == nil { + c.JSON(200, gin.H{ + "cpu_percent": 0.0, + "memory_percent": 0.0, + "memory_rss": 0, + "memory_vms": 0, + "running": false, + }) + return + } + + // Get the supervisor from the audio input manager + supervisor := currentSession.AudioInputManager.GetSupervisor() + if supervisor == nil { + c.JSON(200, gin.H{ + "cpu_percent": 0.0, + "memory_percent": 0.0, + "memory_rss": 0, + "memory_vms": 0, + "running": false, + }) + return + } + + metrics := supervisor.GetProcessMetrics() + if metrics == nil { + c.JSON(200, gin.H{ + "cpu_percent": 0.0, + "memory_percent": 0.0, + "memory_rss": 0, + "memory_vms": 0, + "running": false, + }) + return + } + + c.JSON(200, gin.H{ + "cpu_percent": metrics.CPUPercent, + "memory_percent": metrics.MemoryPercent, + "memory_rss": metrics.MemoryRSS, + "memory_vms": metrics.MemoryVMS, + "running": true, + }) + }) + protected.POST("/microphone/reset", func(c *gin.Context) { if currentSession == nil { c.JSON(400, gin.H{"error": "no active session"}) diff --git a/webrtc.go b/webrtc.go index 8c05288..7d0c52c 100644 --- a/webrtc.go +++ b/webrtc.go @@ -292,9 +292,16 @@ func (s *Session) startAudioProcessor(logger zerolog.Logger) { select { case frame := <-s.audioFrameChan: if s.AudioInputManager != nil { - err := s.AudioInputManager.WriteOpusFrame(frame) - if err != nil { - logger.Warn().Err(err).Msg("Failed to write Opus frame to audio input manager") + // Check if audio input manager is ready before processing frames + if s.AudioInputManager.IsReady() { + err := s.AudioInputManager.WriteOpusFrame(frame) + if err != nil { + logger.Warn().Err(err).Msg("Failed to write Opus frame to audio input manager") + } + } else { + // Audio input manager not ready, drop frame silently + // This prevents the "client not connected" errors during startup + logger.Debug().Msg("Audio input manager not ready, dropping frame") } } case <-s.audioStopChan: