diff --git a/internal/audio/audio.go b/internal/audio/audio.go index 93460f0..702390f 100644 --- a/internal/audio/audio.go +++ b/internal/audio/audio.go @@ -32,7 +32,6 @@ type AudioConfig struct { } // AudioMetrics tracks audio performance metrics - type AudioMetrics struct { FramesReceived int64 FramesDropped int64 diff --git a/internal/audio/audio_mute.go b/internal/audio/audio_mute.go index 61d1811..bd52fa5 100644 --- a/internal/audio/audio_mute.go +++ b/internal/audio/audio_mute.go @@ -2,8 +2,6 @@ package audio import ( "sync" - - "github.com/jetkvm/kvm/internal/logging" ) var audioMuteState struct { @@ -13,9 +11,7 @@ var audioMuteState struct { func SetAudioMuted(muted bool) { audioMuteState.mu.Lock() - prev := audioMuteState.muted audioMuteState.muted = muted - logging.GetDefaultLogger().Info().Str("component", "audio").Msgf("SetAudioMuted: prev=%v, new=%v", prev, muted) audioMuteState.mu.Unlock() } diff --git a/internal/audio/buffer_pool.go b/internal/audio/buffer_pool.go index 65e1d5a..4888aef 100644 --- a/internal/audio/buffer_pool.go +++ b/internal/audio/buffer_pool.go @@ -4,65 +4,53 @@ import ( "sync" ) -// AudioBufferPool manages reusable audio buffers to reduce allocations type AudioBufferPool struct { - pool sync.Pool + pool sync.Pool + bufferSize int } -// NewAudioBufferPool creates a new buffer pool for audio frames func NewAudioBufferPool(bufferSize int) *AudioBufferPool { return &AudioBufferPool{ + bufferSize: bufferSize, pool: sync.Pool{ New: func() interface{} { - // Pre-allocate buffer with specified size - return make([]byte, bufferSize) + return make([]byte, 0, bufferSize) }, }, } } -// Get retrieves a buffer from the pool func (p *AudioBufferPool) Get() []byte { if buf := p.pool.Get(); buf != nil { - return *buf.(*[]byte) + return buf.([]byte) } - return make([]byte, 0, 1500) // fallback if pool is empty + return make([]byte, 0, p.bufferSize) } -// Put returns a buffer to the pool func (p *AudioBufferPool) Put(buf []byte) { - // Reset length but keep capacity for reuse - if cap(buf) >= 1500 { // Only pool buffers of reasonable size + if cap(buf) >= p.bufferSize { resetBuf := buf[:0] - p.pool.Put(&resetBuf) + p.pool.Put(resetBuf) } } -// Global buffer pools for different audio operations var ( - // Pool for 1500-byte audio frame buffers (Opus max frame size) - audioFramePool = NewAudioBufferPool(1500) - - // Pool for smaller control buffers + audioFramePool = NewAudioBufferPool(1500) audioControlPool = NewAudioBufferPool(64) ) -// GetAudioFrameBuffer gets a reusable buffer for audio frames func GetAudioFrameBuffer() []byte { return audioFramePool.Get() } -// PutAudioFrameBuffer returns a buffer to the frame pool func PutAudioFrameBuffer(buf []byte) { audioFramePool.Put(buf) } -// GetAudioControlBuffer gets a reusable buffer for control data func GetAudioControlBuffer() []byte { return audioControlPool.Get() } -// PutAudioControlBuffer returns a buffer to the control pool func PutAudioControlBuffer(buf []byte) { audioControlPool.Put(buf) } diff --git a/internal/audio/events.go b/internal/audio/events.go index 4b99885..6539c6a 100644 --- a/internal/audio/events.go +++ b/internal/audio/events.go @@ -204,7 +204,6 @@ func (aeb *AudioEventBroadcaster) sendInitialState(connectionID string) { aeb.sendCurrentMetrics(subscriber) } -// getMicrophoneProcessMetrics returns microphone process metrics data, always providing a valid response // convertAudioMetricsToEventData converts internal audio metrics to AudioMetricsData for events func convertAudioMetricsToEventData(metrics AudioMetrics) AudioMetricsData { return AudioMetricsData{ @@ -371,8 +370,8 @@ func (aeb *AudioEventBroadcaster) sendCurrentMetrics(subscriber *AudioEventSubsc // startMetricsBroadcasting starts a goroutine that periodically broadcasts metrics func (aeb *AudioEventBroadcaster) startMetricsBroadcasting() { - // Use 500ms interval to match Connection Stats sidebar frequency for smooth histogram progression - ticker := time.NewTicker(500 * time.Millisecond) + // Use 1000ms interval to match process monitor frequency for synchronized metrics + ticker := time.NewTicker(1000 * time.Millisecond) defer ticker.Stop() for range ticker.C { diff --git a/internal/audio/input_ipc_manager.go b/internal/audio/input_ipc_manager.go index 4a673d9..06c5a30 100644 --- a/internal/audio/input_ipc_manager.go +++ b/internal/audio/input_ipc_manager.go @@ -1,6 +1,7 @@ package audio import ( + "context" "sync/atomic" "time" @@ -10,51 +11,59 @@ import ( // AudioInputIPCManager manages microphone input using IPC when enabled type AudioInputIPCManager struct { - // metrics MUST be first for ARM32 alignment (contains int64 fields) metrics AudioInputMetrics supervisor *AudioInputSupervisor logger zerolog.Logger running int32 + ctx context.Context + cancel context.CancelFunc } // NewAudioInputIPCManager creates a new IPC-based audio input manager func NewAudioInputIPCManager() *AudioInputIPCManager { + ctx, cancel := context.WithCancel(context.Background()) return &AudioInputIPCManager{ supervisor: NewAudioInputSupervisor(), logger: logging.GetDefaultLogger().With().Str("component", "audio-input-ipc").Logger(), + ctx: ctx, + cancel: cancel, } } // Start starts the IPC-based audio input system func (aim *AudioInputIPCManager) Start() error { if !atomic.CompareAndSwapInt32(&aim.running, 0, 1) { - return nil // Already running + return nil } aim.logger.Info().Msg("Starting IPC-based audio input system") - // Start the supervisor which will launch the subprocess err := aim.supervisor.Start() if err != nil { atomic.StoreInt32(&aim.running, 0) + aim.logger.Error().Err(err).Msg("Failed to start audio input supervisor") return err } - // Send initial configuration config := InputIPCConfig{ SampleRate: 48000, Channels: 2, - FrameSize: 960, // 20ms at 48kHz + FrameSize: 960, } - // Wait briefly for the subprocess to be ready (reduced from 1 second) - time.Sleep(200 * time.Millisecond) + // Wait with timeout for subprocess readiness + select { + case <-time.After(200 * time.Millisecond): + case <-aim.ctx.Done(): + aim.supervisor.Stop() + atomic.StoreInt32(&aim.running, 0) + return aim.ctx.Err() + } err = aim.supervisor.SendConfig(config) if err != nil { - aim.logger.Warn().Err(err).Msg("Failed to send initial config to audio input server") - // Don't fail startup for config errors + aim.logger.Warn().Err(err).Msg("Failed to send initial config, will retry later") } aim.logger.Info().Msg("IPC-based audio input system started") @@ -64,14 +73,12 @@ func (aim *AudioInputIPCManager) Start() error { // Stop stops the IPC-based audio input system func (aim *AudioInputIPCManager) Stop() { if !atomic.CompareAndSwapInt32(&aim.running, 1, 0) { - return // Already stopped + return } aim.logger.Info().Msg("Stopping IPC-based audio input system") - - // Stop the supervisor + aim.cancel() aim.supervisor.Stop() - aim.logger.Info().Msg("IPC-based audio input system stopped") } diff --git a/internal/audio/metrics.go b/internal/audio/metrics.go index 1282e14..4cfe189 100644 --- a/internal/audio/metrics.go +++ b/internal/audio/metrics.go @@ -2,6 +2,7 @@ package audio import ( "sync" + "sync/atomic" "time" "github.com/prometheus/client_golang/prometheus" @@ -226,7 +227,7 @@ var ( // Metrics update tracking metricsUpdateMutex sync.RWMutex - lastMetricsUpdate time.Time + lastMetricsUpdate int64 // Counter value tracking (since prometheus counters don't have Get() method) audioFramesReceivedValue int64 @@ -241,28 +242,24 @@ var ( // UpdateAudioMetrics updates Prometheus metrics with current audio data func UpdateAudioMetrics(metrics AudioMetrics) { - metricsUpdateMutex.Lock() - defer metricsUpdateMutex.Unlock() - - // Update counters with delta values - if metrics.FramesReceived > audioFramesReceivedValue { - audioFramesReceivedTotal.Add(float64(metrics.FramesReceived - audioFramesReceivedValue)) - audioFramesReceivedValue = metrics.FramesReceived + oldReceived := atomic.SwapInt64(&audioFramesReceivedValue, metrics.FramesReceived) + if metrics.FramesReceived > oldReceived { + audioFramesReceivedTotal.Add(float64(metrics.FramesReceived - oldReceived)) } - if metrics.FramesDropped > audioFramesDroppedValue { - audioFramesDroppedTotal.Add(float64(metrics.FramesDropped - audioFramesDroppedValue)) - audioFramesDroppedValue = metrics.FramesDropped + oldDropped := atomic.SwapInt64(&audioFramesDroppedValue, metrics.FramesDropped) + if metrics.FramesDropped > oldDropped { + audioFramesDroppedTotal.Add(float64(metrics.FramesDropped - oldDropped)) } - if metrics.BytesProcessed > audioBytesProcessedValue { - audioBytesProcessedTotal.Add(float64(metrics.BytesProcessed - audioBytesProcessedValue)) - audioBytesProcessedValue = metrics.BytesProcessed + oldBytes := atomic.SwapInt64(&audioBytesProcessedValue, metrics.BytesProcessed) + if metrics.BytesProcessed > oldBytes { + audioBytesProcessedTotal.Add(float64(metrics.BytesProcessed - oldBytes)) } - if metrics.ConnectionDrops > audioConnectionDropsValue { - audioConnectionDropsTotal.Add(float64(metrics.ConnectionDrops - audioConnectionDropsValue)) - audioConnectionDropsValue = metrics.ConnectionDrops + oldDrops := atomic.SwapInt64(&audioConnectionDropsValue, metrics.ConnectionDrops) + if metrics.ConnectionDrops > oldDrops { + audioConnectionDropsTotal.Add(float64(metrics.ConnectionDrops - oldDrops)) } // Update gauges @@ -271,33 +268,29 @@ func UpdateAudioMetrics(metrics AudioMetrics) { audioLastFrameTimestamp.Set(float64(metrics.LastFrameTime.Unix())) } - lastMetricsUpdate = time.Now() + atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix()) } // UpdateMicrophoneMetrics updates Prometheus metrics with current microphone data func UpdateMicrophoneMetrics(metrics AudioInputMetrics) { - metricsUpdateMutex.Lock() - defer metricsUpdateMutex.Unlock() - - // Update counters with delta values - if metrics.FramesSent > micFramesSentValue { - microphoneFramesSentTotal.Add(float64(metrics.FramesSent - micFramesSentValue)) - micFramesSentValue = metrics.FramesSent + oldSent := atomic.SwapInt64(&micFramesSentValue, metrics.FramesSent) + if metrics.FramesSent > oldSent { + microphoneFramesSentTotal.Add(float64(metrics.FramesSent - oldSent)) } - if metrics.FramesDropped > micFramesDroppedValue { - microphoneFramesDroppedTotal.Add(float64(metrics.FramesDropped - micFramesDroppedValue)) - micFramesDroppedValue = metrics.FramesDropped + oldDropped := atomic.SwapInt64(&micFramesDroppedValue, metrics.FramesDropped) + if metrics.FramesDropped > oldDropped { + microphoneFramesDroppedTotal.Add(float64(metrics.FramesDropped - oldDropped)) } - if metrics.BytesProcessed > micBytesProcessedValue { - microphoneBytesProcessedTotal.Add(float64(metrics.BytesProcessed - micBytesProcessedValue)) - micBytesProcessedValue = metrics.BytesProcessed + oldBytes := atomic.SwapInt64(&micBytesProcessedValue, metrics.BytesProcessed) + if metrics.BytesProcessed > oldBytes { + microphoneBytesProcessedTotal.Add(float64(metrics.BytesProcessed - oldBytes)) } - if metrics.ConnectionDrops > micConnectionDropsValue { - microphoneConnectionDropsTotal.Add(float64(metrics.ConnectionDrops - micConnectionDropsValue)) - micConnectionDropsValue = metrics.ConnectionDrops + oldDrops := atomic.SwapInt64(&micConnectionDropsValue, metrics.ConnectionDrops) + if metrics.ConnectionDrops > oldDrops { + microphoneConnectionDropsTotal.Add(float64(metrics.ConnectionDrops - oldDrops)) } // Update gauges @@ -306,7 +299,7 @@ func UpdateMicrophoneMetrics(metrics AudioInputMetrics) { microphoneLastFrameTimestamp.Set(float64(metrics.LastFrameTime.Unix())) } - lastMetricsUpdate = time.Now() + atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix()) } // UpdateAudioProcessMetrics updates Prometheus metrics with audio subprocess data @@ -324,7 +317,7 @@ func UpdateAudioProcessMetrics(metrics ProcessMetrics, isRunning bool) { audioProcessRunning.Set(0) } - lastMetricsUpdate = time.Now() + atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix()) } // UpdateMicrophoneProcessMetrics updates Prometheus metrics with microphone subprocess data @@ -342,7 +335,7 @@ func UpdateMicrophoneProcessMetrics(metrics ProcessMetrics, isRunning bool) { microphoneProcessRunning.Set(0) } - lastMetricsUpdate = time.Now() + atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix()) } // UpdateAudioConfigMetrics updates Prometheus metrics with audio configuration @@ -355,7 +348,7 @@ func UpdateAudioConfigMetrics(config AudioConfig) { audioConfigSampleRate.Set(float64(config.SampleRate)) audioConfigChannels.Set(float64(config.Channels)) - lastMetricsUpdate = time.Now() + atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix()) } // UpdateMicrophoneConfigMetrics updates Prometheus metrics with microphone configuration @@ -368,14 +361,13 @@ func UpdateMicrophoneConfigMetrics(config AudioConfig) { microphoneConfigSampleRate.Set(float64(config.SampleRate)) microphoneConfigChannels.Set(float64(config.Channels)) - lastMetricsUpdate = time.Now() + atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix()) } // GetLastMetricsUpdate returns the timestamp of the last metrics update func GetLastMetricsUpdate() time.Time { - metricsUpdateMutex.RLock() - defer metricsUpdateMutex.RUnlock() - return lastMetricsUpdate + timestamp := atomic.LoadInt64(&lastMetricsUpdate) + return time.Unix(timestamp, 0) } // StartMetricsUpdater starts a goroutine that periodically updates Prometheus metrics diff --git a/internal/audio/mic_contention.go b/internal/audio/mic_contention.go index 9df63e2..ef4a25f 100644 --- a/internal/audio/mic_contention.go +++ b/internal/audio/mic_contention.go @@ -6,43 +6,33 @@ import ( "unsafe" ) -// MicrophoneContentionManager provides optimized microphone operation locking -// with reduced contention using atomic operations and conditional locking +// MicrophoneContentionManager manages microphone access with cooldown periods type MicrophoneContentionManager struct { - // Atomic fields (must be 64-bit aligned on 32-bit systems) - lastOpNano int64 // Unix nanoseconds of last operation - cooldownNanos int64 // Cooldown duration in nanoseconds - operationID int64 // Incremental operation ID for tracking - - // Lock-free state flags (using atomic.Pointer for lock-free updates) - lockPtr unsafe.Pointer // *sync.Mutex - conditionally allocated + lastOpNano int64 + cooldownNanos int64 + operationID int64 + lockPtr unsafe.Pointer } -// NewMicrophoneContentionManager creates a new microphone contention manager func NewMicrophoneContentionManager(cooldown time.Duration) *MicrophoneContentionManager { return &MicrophoneContentionManager{ cooldownNanos: int64(cooldown), } } -// OperationResult represents the result of attempting a microphone operation type OperationResult struct { Allowed bool RemainingCooldown time.Duration OperationID int64 } -// TryOperation attempts to perform a microphone operation with optimized contention handling func (mcm *MicrophoneContentionManager) TryOperation() OperationResult { now := time.Now().UnixNano() cooldown := atomic.LoadInt64(&mcm.cooldownNanos) - - // Fast path: check if we're clearly outside cooldown period using atomic read lastOp := atomic.LoadInt64(&mcm.lastOpNano) elapsed := now - lastOp if elapsed >= cooldown { - // Attempt atomic update without locking if atomic.CompareAndSwapInt64(&mcm.lastOpNano, lastOp, now) { opID := atomic.AddInt64(&mcm.operationID, 1) return OperationResult{ @@ -51,16 +41,10 @@ func (mcm *MicrophoneContentionManager) TryOperation() OperationResult { OperationID: opID, } } - } - - // Slow path: potential contention, check remaining cooldown - currentLastOp := atomic.LoadInt64(&mcm.lastOpNano) - currentElapsed := now - currentLastOp - - if currentElapsed >= cooldown { - // Race condition: another operation might have updated lastOpNano - // Try once more with CAS - if atomic.CompareAndSwapInt64(&mcm.lastOpNano, currentLastOp, now) { + // Retry once if CAS failed + lastOp = atomic.LoadInt64(&mcm.lastOpNano) + elapsed = now - lastOp + if elapsed >= cooldown && atomic.CompareAndSwapInt64(&mcm.lastOpNano, lastOp, now) { opID := atomic.AddInt64(&mcm.operationID, 1) return OperationResult{ Allowed: true, @@ -68,12 +52,9 @@ func (mcm *MicrophoneContentionManager) TryOperation() OperationResult { OperationID: opID, } } - // If CAS failed, fall through to cooldown calculation - currentLastOp = atomic.LoadInt64(&mcm.lastOpNano) - currentElapsed = now - currentLastOp } - remaining := time.Duration(cooldown - currentElapsed) + remaining := time.Duration(cooldown - elapsed) if remaining < 0 { remaining = 0 } @@ -85,17 +66,14 @@ func (mcm *MicrophoneContentionManager) TryOperation() OperationResult { } } -// SetCooldown updates the cooldown duration atomically func (mcm *MicrophoneContentionManager) SetCooldown(cooldown time.Duration) { atomic.StoreInt64(&mcm.cooldownNanos, int64(cooldown)) } -// GetCooldown returns the current cooldown duration func (mcm *MicrophoneContentionManager) GetCooldown() time.Duration { return time.Duration(atomic.LoadInt64(&mcm.cooldownNanos)) } -// GetLastOperationTime returns the time of the last operation func (mcm *MicrophoneContentionManager) GetLastOperationTime() time.Time { nanos := atomic.LoadInt64(&mcm.lastOpNano) if nanos == 0 { @@ -104,55 +82,44 @@ func (mcm *MicrophoneContentionManager) GetLastOperationTime() time.Time { return time.Unix(0, nanos) } -// GetOperationCount returns the total number of successful operations func (mcm *MicrophoneContentionManager) GetOperationCount() int64 { return atomic.LoadInt64(&mcm.operationID) } -// Reset resets the contention manager state func (mcm *MicrophoneContentionManager) Reset() { atomic.StoreInt64(&mcm.lastOpNano, 0) atomic.StoreInt64(&mcm.operationID, 0) } -// Global instance for microphone contention management var ( - globalMicContentionManager unsafe.Pointer // *MicrophoneContentionManager + globalMicContentionManager unsafe.Pointer micContentionInitialized int32 ) -// GetMicrophoneContentionManager returns the global microphone contention manager func GetMicrophoneContentionManager() *MicrophoneContentionManager { ptr := atomic.LoadPointer(&globalMicContentionManager) if ptr != nil { return (*MicrophoneContentionManager)(ptr) } - // Initialize on first use if atomic.CompareAndSwapInt32(&micContentionInitialized, 0, 1) { manager := NewMicrophoneContentionManager(200 * time.Millisecond) atomic.StorePointer(&globalMicContentionManager, unsafe.Pointer(manager)) return manager } - // Another goroutine initialized it, try again ptr = atomic.LoadPointer(&globalMicContentionManager) if ptr != nil { return (*MicrophoneContentionManager)(ptr) } - // Fallback: create a new manager (should rarely happen) return NewMicrophoneContentionManager(200 * time.Millisecond) } -// TryMicrophoneOperation provides a convenient global function for microphone operations func TryMicrophoneOperation() OperationResult { - manager := GetMicrophoneContentionManager() - return manager.TryOperation() + return GetMicrophoneContentionManager().TryOperation() } -// SetMicrophoneCooldown updates the global microphone cooldown func SetMicrophoneCooldown(cooldown time.Duration) { - manager := GetMicrophoneContentionManager() - manager.SetCooldown(cooldown) + GetMicrophoneContentionManager().SetCooldown(cooldown) } diff --git a/internal/audio/output_server_main.go b/internal/audio/output_server_main.go new file mode 100644 index 0000000..7f2e17b --- /dev/null +++ b/internal/audio/output_server_main.go @@ -0,0 +1,71 @@ +package audio + +import ( + "context" + "os" + "os/signal" + "syscall" + "time" + + "github.com/jetkvm/kvm/internal/logging" +) + +// RunAudioOutputServer runs the audio output server subprocess +// This should be called from main() when the subprocess is detected +func RunAudioOutputServer() error { + logger := logging.GetDefaultLogger().With().Str("component", "audio-output-server").Logger() + logger.Info().Msg("Starting audio output server subprocess") + + // Create audio server + server, err := NewAudioServer() + if err != nil { + logger.Error().Err(err).Msg("failed to create audio server") + return err + } + defer server.Close() + + // Start accepting connections + if err := server.Start(); err != nil { + logger.Error().Err(err).Msg("failed to start audio server") + return err + } + + // Initialize audio processing + err = StartNonBlockingAudioStreaming(func(frame []byte) { + if err := server.SendFrame(frame); err != nil { + logger.Warn().Err(err).Msg("failed to send audio frame") + RecordFrameDropped() + } + }) + if err != nil { + logger.Error().Err(err).Msg("failed to start audio processing") + return err + } + + logger.Info().Msg("Audio output server started, waiting for connections") + + // Set up signal handling for graceful shutdown + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + // Wait for shutdown signal + select { + case sig := <-sigChan: + logger.Info().Str("signal", sig.String()).Msg("Received shutdown signal") + case <-ctx.Done(): + logger.Info().Msg("Context cancelled") + } + + // Graceful shutdown + logger.Info().Msg("Shutting down audio output server") + StopNonBlockingAudioStreaming() + + // Give some time for cleanup + time.Sleep(100 * time.Millisecond) + + logger.Info().Msg("Audio output server subprocess stopped") + return nil +} diff --git a/internal/audio/process_monitor.go b/internal/audio/process_monitor.go index 6d90e06..b9d796f 100644 --- a/internal/audio/process_monitor.go +++ b/internal/audio/process_monitor.go @@ -24,7 +24,6 @@ type ProcessMetrics struct { ProcessName string `json:"process_name"` } -// ProcessMonitor monitors CPU and memory usage of processes type ProcessMonitor struct { logger zerolog.Logger mutex sync.RWMutex @@ -33,6 +32,8 @@ type ProcessMonitor struct { stopChan chan struct{} metricsChan chan ProcessMetrics updateInterval time.Duration + totalMemory int64 + memoryOnce sync.Once } // processState tracks the state needed for CPU calculation @@ -51,7 +52,7 @@ func NewProcessMonitor() *ProcessMonitor { monitoredPIDs: make(map[int]*processState), stopChan: make(chan struct{}), metricsChan: make(chan ProcessMetrics, 100), - updateInterval: 2 * time.Second, // Update every 2 seconds + updateInterval: 1000 * time.Millisecond, // Update every 1000ms to sync with websocket broadcasts } } @@ -138,30 +139,33 @@ func (pm *ProcessMonitor) monitorLoop() { } } -// collectAllMetrics collects metrics for all monitored processes func (pm *ProcessMonitor) collectAllMetrics() { pm.mutex.RLock() - pids := make(map[int]*processState) + pidsToCheck := make([]int, 0, len(pm.monitoredPIDs)) + states := make([]*processState, 0, len(pm.monitoredPIDs)) for pid, state := range pm.monitoredPIDs { - pids[pid] = state + pidsToCheck = append(pidsToCheck, pid) + states = append(states, state) } pm.mutex.RUnlock() - for pid, state := range pids { - if metric, err := pm.collectMetrics(pid, state); err == nil { + deadPIDs := make([]int, 0) + for i, pid := range pidsToCheck { + if metric, err := pm.collectMetrics(pid, states[i]); err == nil { select { case pm.metricsChan <- metric: default: - // Channel full, skip this metric } } else { - // Process might have died, remove it - pm.RemoveProcess(pid) + deadPIDs = append(deadPIDs, pid) } } + + for _, pid := range deadPIDs { + pm.RemoveProcess(pid) + } } -// collectMetrics collects metrics for a specific process func (pm *ProcessMonitor) collectMetrics(pid int, state *processState) (ProcessMetrics, error) { now := time.Now() metric := ProcessMetrics{ @@ -170,30 +174,25 @@ func (pm *ProcessMonitor) collectMetrics(pid int, state *processState) (ProcessM ProcessName: state.name, } - // Read /proc/[pid]/stat for CPU and memory info statPath := fmt.Sprintf("/proc/%d/stat", pid) statData, err := os.ReadFile(statPath) if err != nil { - return metric, fmt.Errorf("failed to read stat file: %w", err) + return metric, err } - // Parse stat file fields := strings.Fields(string(statData)) if len(fields) < 24 { - return metric, fmt.Errorf("invalid stat file format") + return metric, fmt.Errorf("invalid stat format") } - // Extract CPU times (fields 13, 14 are utime, stime in clock ticks) utime, _ := strconv.ParseInt(fields[13], 10, 64) stime, _ := strconv.ParseInt(fields[14], 10, 64) totalCPUTime := utime + stime - // Extract memory info (field 22 is vsize, field 23 is rss in pages) vsize, _ := strconv.ParseInt(fields[22], 10, 64) rss, _ := strconv.ParseInt(fields[23], 10, 64) - // Convert RSS from pages to bytes (assuming 4KB pages) - pageSize := int64(4096) + const pageSize = 4096 metric.MemoryRSS = rss * pageSize metric.MemoryVMS = vsize @@ -229,28 +228,32 @@ func (pm *ProcessMonitor) collectMetrics(pid int, state *processState) (ProcessM return metric, nil } -// getTotalMemory returns total system memory in bytes func (pm *ProcessMonitor) getTotalMemory() int64 { - file, err := os.Open("/proc/meminfo") - if err != nil { - return 0 - } - defer file.Close() - - scanner := bufio.NewScanner(file) - for scanner.Scan() { - line := scanner.Text() - if strings.HasPrefix(line, "MemTotal:") { - fields := strings.Fields(line) - if len(fields) >= 2 { - if kb, err := strconv.ParseInt(fields[1], 10, 64); err == nil { - return kb * 1024 // Convert KB to bytes - } - } - break + pm.memoryOnce.Do(func() { + file, err := os.Open("/proc/meminfo") + if err != nil { + pm.totalMemory = 8 * 1024 * 1024 * 1024 // Default 8GB + return } - } - return 0 + defer file.Close() + + scanner := bufio.NewScanner(file) + for scanner.Scan() { + line := scanner.Text() + if strings.HasPrefix(line, "MemTotal:") { + fields := strings.Fields(line) + if len(fields) >= 2 { + if kb, err := strconv.ParseInt(fields[1], 10, 64); err == nil { + pm.totalMemory = kb * 1024 + return + } + } + break + } + } + pm.totalMemory = 8 * 1024 * 1024 * 1024 // Fallback + }) + return pm.totalMemory } // GetTotalMemory returns total system memory in bytes (public method) diff --git a/internal/audio/relay.go b/internal/audio/relay.go index 17d94c2..ca13ded 100644 --- a/internal/audio/relay.go +++ b/internal/audio/relay.go @@ -3,6 +3,7 @@ package audio import ( "context" "sync" + "time" "github.com/jetkvm/kvm/internal/logging" "github.com/pion/webrtc/v4/pkg/media" @@ -123,26 +124,34 @@ func (r *AudioRelay) UpdateTrack(audioTrack AudioTrackWriter) { r.audioTrack = audioTrack } -// relayLoop is the main relay loop that forwards frames from subprocess to WebRTC func (r *AudioRelay) relayLoop() { defer r.wg.Done() r.logger.Debug().Msg("Audio relay loop started") + const maxConsecutiveErrors = 10 + consecutiveErrors := 0 + for { select { case <-r.ctx.Done(): r.logger.Debug().Msg("Audio relay loop stopping") return default: - // Receive frame from audio server subprocess frame, err := r.client.ReceiveFrame() if err != nil { - r.logger.Error().Err(err).Msg("Failed to receive audio frame") + consecutiveErrors++ + r.logger.Error().Err(err).Int("consecutive_errors", consecutiveErrors).Msg("Failed to receive audio frame") r.incrementDropped() + + if consecutiveErrors >= maxConsecutiveErrors { + r.logger.Error().Msg("Too many consecutive errors, stopping relay") + return + } + time.Sleep(10 * time.Millisecond) continue } - // Forward frame to WebRTC + consecutiveErrors = 0 if err := r.forwardToWebRTC(frame); err != nil { r.logger.Warn().Err(err).Msg("Failed to forward frame to WebRTC") r.incrementDropped() diff --git a/main.go b/main.go index 749809a..2011cc4 100644 --- a/main.go +++ b/main.go @@ -20,43 +20,14 @@ var ( audioSupervisor *audio.AudioServerSupervisor ) +// runAudioServer is now handled by audio.RunAudioOutputServer +// This function is kept for backward compatibility but delegates to the audio package func runAudioServer() { - logger.Info().Msg("Starting audio server subprocess") - - // Create audio server - server, err := audio.NewAudioServer() + err := audio.RunAudioOutputServer() if err != nil { - logger.Error().Err(err).Msg("failed to create audio server") + logger.Error().Err(err).Msg("audio output server failed") os.Exit(1) } - defer server.Close() - - // Start accepting connections - if err := server.Start(); err != nil { - logger.Error().Err(err).Msg("failed to start audio server") - os.Exit(1) - } - - // Initialize audio processing - err = audio.StartNonBlockingAudioStreaming(func(frame []byte) { - if err := server.SendFrame(frame); err != nil { - logger.Warn().Err(err).Msg("failed to send audio frame") - audio.RecordFrameDropped() - } - }) - if err != nil { - logger.Error().Err(err).Msg("failed to start audio processing") - os.Exit(1) - } - - // Wait for termination signal - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - <-sigs - - // Cleanup - audio.StopNonBlockingAudioStreaming() - logger.Info().Msg("Audio server subprocess stopped") } func startAudioSubprocess() error { diff --git a/ui/src/components/popovers/AudioControlPopover.tsx b/ui/src/components/popovers/AudioControlPopover.tsx index 200d5a1..9dd0568 100644 --- a/ui/src/components/popovers/AudioControlPopover.tsx +++ b/ui/src/components/popovers/AudioControlPopover.tsx @@ -41,10 +41,6 @@ interface AudioConfig { FrameSize: string; } - - - - const qualityLabels = { 0: "Low (32kbps)", 1: "Medium (64kbps)", @@ -211,7 +207,6 @@ export default function AudioControlPopover({ microphone, open }: AudioControlPo // Prevent rapid clicking - if any operation is in progress or within cooldown, ignore the click if (isStarting || isStopping || isToggling || (now - lastClickTime < CLICK_COOLDOWN)) { - console.log("Microphone operation already in progress or within cooldown, ignoring click"); return; } @@ -233,7 +228,6 @@ export default function AudioControlPopover({ microphone, open }: AudioControlPo // Prevent rapid clicking - if any operation is in progress or within cooldown, ignore the click if (isStarting || isStopping || isToggling || (now - lastClickTime < CLICK_COOLDOWN)) { - console.log("Microphone operation already in progress or within cooldown, ignoring mute toggle"); return; } @@ -279,7 +273,6 @@ export default function AudioControlPopover({ microphone, open }: AudioControlPo if (videoElement && 'setSinkId' in videoElement) { try { await (videoElement as HTMLVideoElement & { setSinkId: (deviceId: string) => Promise }).setSinkId(deviceId); - console.log('Audio output device changed to:', deviceId); } catch (error: unknown) { console.error('Failed to change audio output device:', error); }