Compare commits

...

7 Commits

Author SHA1 Message Date
Alex P 88679cda2f refactor(audio): improve process monitoring with dynamic clock ticks
- Extract monitoring constants and configuration into centralized locations
- Implement dynamic clock ticks detection for more accurate CPU metrics
- Add warmup samples and bounds checking for CPU percentage calculation
- Replace hardcoded values with constants for better maintainability
2025-08-23 23:35:38 +00:00
Alex P 76174f4486 refactor(audio): improve performance and simplify code structure
- Move audio server logic to dedicated package and simplify main.go
- Optimize buffer pool implementation and remove redundant logging
- Improve process monitoring with synchronized metrics updates
- Enhance microphone contention manager with simplified logic
- Replace mutex with atomic operations for metrics tracking
2025-08-23 22:54:01 +00:00
Alex P 27a999c58a [WIP] Updates: audio output & input subprocesses memory & cpu usage 2025-08-23 21:51:24 +00:00
Alex P ddc2f90016 [WIP] Updates: audio output & input subprocesses memory & cpu usage 2025-08-23 21:36:57 +00:00
Alex P 692f7ddb2d [WIP] Updates: audio output & input subprocesses memory & cpu usage 2025-08-23 21:19:28 +00:00
Alex P 38ad145863 [WIP] Updates: audio output & input subprocesses memory & cpu usage 2025-08-23 21:06:02 +00:00
Alex P 879ea5e472 Fix: fix audio input by reverting change 2025-08-23 16:41:45 +00:00
13 changed files with 526 additions and 374 deletions

View File

@ -32,7 +32,6 @@ type AudioConfig struct {
}
// AudioMetrics tracks audio performance metrics
type AudioMetrics struct {
FramesReceived int64
FramesDropped int64

View File

@ -2,8 +2,6 @@ package audio
import (
"sync"
"github.com/jetkvm/kvm/internal/logging"
)
var audioMuteState struct {
@ -13,9 +11,7 @@ var audioMuteState struct {
func SetAudioMuted(muted bool) {
audioMuteState.mu.Lock()
prev := audioMuteState.muted
audioMuteState.muted = muted
logging.GetDefaultLogger().Info().Str("component", "audio").Msgf("SetAudioMuted: prev=%v, new=%v", prev, muted)
audioMuteState.mu.Unlock()
}

View File

@ -4,65 +4,53 @@ import (
"sync"
)
// AudioBufferPool manages reusable audio buffers to reduce allocations
type AudioBufferPool struct {
pool sync.Pool
pool sync.Pool
bufferSize int
}
// NewAudioBufferPool creates a new buffer pool for audio frames
func NewAudioBufferPool(bufferSize int) *AudioBufferPool {
return &AudioBufferPool{
bufferSize: bufferSize,
pool: sync.Pool{
New: func() interface{} {
// Pre-allocate buffer with specified size
return make([]byte, bufferSize)
return make([]byte, 0, bufferSize)
},
},
}
}
// Get retrieves a buffer from the pool
func (p *AudioBufferPool) Get() []byte {
if buf := p.pool.Get(); buf != nil {
return *buf.(*[]byte)
}
return make([]byte, 0, 1500) // fallback if pool is empty
return make([]byte, 0, p.bufferSize)
}
// Put returns a buffer to the pool
func (p *AudioBufferPool) Put(buf []byte) {
// Reset length but keep capacity for reuse
if cap(buf) >= 1500 { // Only pool buffers of reasonable size
if cap(buf) >= p.bufferSize {
resetBuf := buf[:0]
p.pool.Put(&resetBuf)
}
}
// Global buffer pools for different audio operations
var (
// Pool for 1500-byte audio frame buffers (Opus max frame size)
audioFramePool = NewAudioBufferPool(1500)
// Pool for smaller control buffers
audioFramePool = NewAudioBufferPool(1500)
audioControlPool = NewAudioBufferPool(64)
)
// GetAudioFrameBuffer gets a reusable buffer for audio frames
func GetAudioFrameBuffer() []byte {
return audioFramePool.Get()
}
// PutAudioFrameBuffer returns a buffer to the frame pool
func PutAudioFrameBuffer(buf []byte) {
audioFramePool.Put(buf)
}
// GetAudioControlBuffer gets a reusable buffer for control data
func GetAudioControlBuffer() []byte {
return audioControlPool.Get()
}
// PutAudioControlBuffer returns a buffer to the control pool
func PutAudioControlBuffer(buf []byte) {
audioControlPool.Put(buf)
}

29
internal/audio/config.go Normal file
View File

@ -0,0 +1,29 @@
package audio
import "time"
// MonitoringConfig contains configuration constants for audio monitoring
type MonitoringConfig struct {
// MetricsUpdateInterval defines how often metrics are collected and broadcast
MetricsUpdateInterval time.Duration
}
// DefaultMonitoringConfig returns the default monitoring configuration
func DefaultMonitoringConfig() MonitoringConfig {
return MonitoringConfig{
MetricsUpdateInterval: 1000 * time.Millisecond, // 1 second interval
}
}
// Global monitoring configuration instance
var monitoringConfig = DefaultMonitoringConfig()
// GetMetricsUpdateInterval returns the current metrics update interval
func GetMetricsUpdateInterval() time.Duration {
return monitoringConfig.MetricsUpdateInterval
}
// SetMetricsUpdateInterval sets the metrics update interval
func SetMetricsUpdateInterval(interval time.Duration) {
monitoringConfig.MetricsUpdateInterval = interval
}

View File

@ -92,32 +92,26 @@ var (
audioEventOnce sync.Once
)
// initializeBroadcaster creates and initializes the audio event broadcaster
func initializeBroadcaster() {
l := logging.GetDefaultLogger().With().Str("component", "audio-events").Logger()
audioEventBroadcaster = &AudioEventBroadcaster{
subscribers: make(map[string]*AudioEventSubscriber),
logger: &l,
}
// Start metrics broadcasting goroutine
go audioEventBroadcaster.startMetricsBroadcasting()
}
// InitializeAudioEventBroadcaster initializes the global audio event broadcaster
func InitializeAudioEventBroadcaster() {
audioEventOnce.Do(func() {
l := logging.GetDefaultLogger().With().Str("component", "audio-events").Logger()
audioEventBroadcaster = &AudioEventBroadcaster{
subscribers: make(map[string]*AudioEventSubscriber),
logger: &l,
}
// Start metrics broadcasting goroutine
go audioEventBroadcaster.startMetricsBroadcasting()
})
audioEventOnce.Do(initializeBroadcaster)
}
// GetAudioEventBroadcaster returns the singleton audio event broadcaster
func GetAudioEventBroadcaster() *AudioEventBroadcaster {
audioEventOnce.Do(func() {
l := logging.GetDefaultLogger().With().Str("component", "audio-events").Logger()
audioEventBroadcaster = &AudioEventBroadcaster{
subscribers: make(map[string]*AudioEventSubscriber),
logger: &l,
}
// Start metrics broadcasting goroutine
go audioEventBroadcaster.startMetricsBroadcasting()
})
audioEventOnce.Do(initializeBroadcaster)
return audioEventBroadcaster
}
@ -157,22 +151,16 @@ func (aeb *AudioEventBroadcaster) Unsubscribe(connectionID string) {
// BroadcastAudioMuteChanged broadcasts audio mute state changes
func (aeb *AudioEventBroadcaster) BroadcastAudioMuteChanged(muted bool) {
event := AudioEvent{
Type: AudioEventMuteChanged,
Data: AudioMuteData{Muted: muted},
}
event := createAudioEvent(AudioEventMuteChanged, AudioMuteData{Muted: muted})
aeb.broadcast(event)
}
// BroadcastMicrophoneStateChanged broadcasts microphone state changes
func (aeb *AudioEventBroadcaster) BroadcastMicrophoneStateChanged(running, sessionActive bool) {
event := AudioEvent{
Type: AudioEventMicrophoneState,
Data: MicrophoneStateData{
Running: running,
SessionActive: sessionActive,
},
}
event := createAudioEvent(AudioEventMicrophoneState, MicrophoneStateData{
Running: running,
SessionActive: sessionActive,
})
aeb.broadcast(event)
}
@ -216,38 +204,151 @@ func (aeb *AudioEventBroadcaster) sendInitialState(connectionID string) {
aeb.sendCurrentMetrics(subscriber)
}
// convertAudioMetricsToEventData converts internal audio metrics to AudioMetricsData for events
func convertAudioMetricsToEventData(metrics AudioMetrics) AudioMetricsData {
return AudioMetricsData{
FramesReceived: metrics.FramesReceived,
FramesDropped: metrics.FramesDropped,
BytesProcessed: metrics.BytesProcessed,
LastFrameTime: metrics.LastFrameTime.Format("2006-01-02T15:04:05.000Z"),
ConnectionDrops: metrics.ConnectionDrops,
AverageLatency: metrics.AverageLatency.String(),
}
}
// convertAudioMetricsToEventDataWithLatencyMs converts internal audio metrics to AudioMetricsData with millisecond latency formatting
func convertAudioMetricsToEventDataWithLatencyMs(metrics AudioMetrics) AudioMetricsData {
return AudioMetricsData{
FramesReceived: metrics.FramesReceived,
FramesDropped: metrics.FramesDropped,
BytesProcessed: metrics.BytesProcessed,
LastFrameTime: metrics.LastFrameTime.Format("2006-01-02T15:04:05.000Z"),
ConnectionDrops: metrics.ConnectionDrops,
AverageLatency: fmt.Sprintf("%.1fms", float64(metrics.AverageLatency.Nanoseconds())/1e6),
}
}
// convertAudioInputMetricsToEventData converts internal audio input metrics to MicrophoneMetricsData for events
func convertAudioInputMetricsToEventData(metrics AudioInputMetrics) MicrophoneMetricsData {
return MicrophoneMetricsData{
FramesSent: metrics.FramesSent,
FramesDropped: metrics.FramesDropped,
BytesProcessed: metrics.BytesProcessed,
LastFrameTime: metrics.LastFrameTime.Format("2006-01-02T15:04:05.000Z"),
ConnectionDrops: metrics.ConnectionDrops,
AverageLatency: metrics.AverageLatency.String(),
}
}
// convertAudioInputMetricsToEventDataWithLatencyMs converts internal audio input metrics to MicrophoneMetricsData with millisecond latency formatting
func convertAudioInputMetricsToEventDataWithLatencyMs(metrics AudioInputMetrics) MicrophoneMetricsData {
return MicrophoneMetricsData{
FramesSent: metrics.FramesSent,
FramesDropped: metrics.FramesDropped,
BytesProcessed: metrics.BytesProcessed,
LastFrameTime: metrics.LastFrameTime.Format("2006-01-02T15:04:05.000Z"),
ConnectionDrops: metrics.ConnectionDrops,
AverageLatency: fmt.Sprintf("%.1fms", float64(metrics.AverageLatency.Nanoseconds())/1e6),
}
}
// convertProcessMetricsToEventData converts internal process metrics to ProcessMetricsData for events
func convertProcessMetricsToEventData(metrics ProcessMetrics, running bool) ProcessMetricsData {
return ProcessMetricsData{
PID: metrics.PID,
CPUPercent: metrics.CPUPercent,
MemoryRSS: metrics.MemoryRSS,
MemoryVMS: metrics.MemoryVMS,
MemoryPercent: metrics.MemoryPercent,
Running: running,
ProcessName: metrics.ProcessName,
}
}
// createProcessMetricsData creates ProcessMetricsData from ProcessMetrics with running status
func createProcessMetricsData(metrics *ProcessMetrics, running bool, processName string) ProcessMetricsData {
if metrics == nil {
return ProcessMetricsData{
PID: 0,
CPUPercent: 0.0,
MemoryRSS: 0,
MemoryVMS: 0,
MemoryPercent: 0.0,
Running: false,
ProcessName: processName,
}
}
return ProcessMetricsData{
PID: metrics.PID,
CPUPercent: metrics.CPUPercent,
MemoryRSS: metrics.MemoryRSS,
MemoryVMS: metrics.MemoryVMS,
MemoryPercent: metrics.MemoryPercent,
Running: running,
ProcessName: metrics.ProcessName,
}
}
// getInactiveProcessMetrics returns ProcessMetricsData for an inactive audio input process
func getInactiveProcessMetrics() ProcessMetricsData {
return createProcessMetricsData(nil, false, "audio-input-server")
}
// getActiveAudioInputSupervisor safely retrieves the audio input supervisor if session is active
func getActiveAudioInputSupervisor() *AudioInputSupervisor {
sessionProvider := GetSessionProvider()
if !sessionProvider.IsSessionActive() {
return nil
}
inputManager := sessionProvider.GetAudioInputManager()
if inputManager == nil {
return nil
}
return inputManager.GetSupervisor()
}
// createAudioEvent creates an AudioEvent
func createAudioEvent(eventType AudioEventType, data interface{}) AudioEvent {
return AudioEvent{
Type: eventType,
Data: data,
}
}
func (aeb *AudioEventBroadcaster) getMicrophoneProcessMetrics() ProcessMetricsData {
inputSupervisor := getActiveAudioInputSupervisor()
if inputSupervisor == nil {
return getInactiveProcessMetrics()
}
processMetrics := inputSupervisor.GetProcessMetrics()
if processMetrics == nil {
return getInactiveProcessMetrics()
}
// If process is running but CPU is 0%, it means we're waiting for the second sample
// to calculate CPU percentage. Return metrics with correct running status.
if inputSupervisor.IsRunning() && processMetrics.CPUPercent == 0.0 {
return createProcessMetricsData(processMetrics, true, processMetrics.ProcessName)
}
// Subprocess is running, return actual metrics
return createProcessMetricsData(processMetrics, inputSupervisor.IsRunning(), processMetrics.ProcessName)
}
// sendCurrentMetrics sends current audio and microphone metrics to a subscriber
func (aeb *AudioEventBroadcaster) sendCurrentMetrics(subscriber *AudioEventSubscriber) {
// Send audio metrics
audioMetrics := GetAudioMetrics()
audioMetricsEvent := AudioEvent{
Type: AudioEventMetricsUpdate,
Data: AudioMetricsData{
FramesReceived: audioMetrics.FramesReceived,
FramesDropped: audioMetrics.FramesDropped,
BytesProcessed: audioMetrics.BytesProcessed,
LastFrameTime: audioMetrics.LastFrameTime.Format("2006-01-02T15:04:05.000Z"),
ConnectionDrops: audioMetrics.ConnectionDrops,
AverageLatency: audioMetrics.AverageLatency.String(),
},
}
audioMetricsEvent := createAudioEvent(AudioEventMetricsUpdate, convertAudioMetricsToEventData(audioMetrics))
aeb.sendToSubscriber(subscriber, audioMetricsEvent)
// Send audio process metrics
if outputSupervisor := GetAudioOutputSupervisor(); outputSupervisor != nil {
if processMetrics := outputSupervisor.GetProcessMetrics(); processMetrics != nil {
audioProcessEvent := AudioEvent{
Type: AudioEventProcessMetrics,
Data: ProcessMetricsData{
PID: processMetrics.PID,
CPUPercent: processMetrics.CPUPercent,
MemoryRSS: processMetrics.MemoryRSS,
MemoryVMS: processMetrics.MemoryVMS,
MemoryPercent: processMetrics.MemoryPercent,
Running: outputSupervisor.IsRunning(),
ProcessName: processMetrics.ProcessName,
},
}
audioProcessEvent := createAudioEvent(AudioEventProcessMetrics, convertProcessMetricsToEventData(*processMetrics, outputSupervisor.IsRunning()))
aeb.sendToSubscriber(subscriber, audioProcessEvent)
}
}
@ -257,45 +358,20 @@ func (aeb *AudioEventBroadcaster) sendCurrentMetrics(subscriber *AudioEventSubsc
if sessionProvider.IsSessionActive() {
if inputManager := sessionProvider.GetAudioInputManager(); inputManager != nil {
micMetrics := inputManager.GetMetrics()
micMetricsEvent := AudioEvent{
Type: AudioEventMicrophoneMetrics,
Data: MicrophoneMetricsData{
FramesSent: micMetrics.FramesSent,
FramesDropped: micMetrics.FramesDropped,
BytesProcessed: micMetrics.BytesProcessed,
LastFrameTime: micMetrics.LastFrameTime.Format("2006-01-02T15:04:05.000Z"),
ConnectionDrops: micMetrics.ConnectionDrops,
AverageLatency: micMetrics.AverageLatency.String(),
},
}
micMetricsEvent := createAudioEvent(AudioEventMicrophoneMetrics, convertAudioInputMetricsToEventData(micMetrics))
aeb.sendToSubscriber(subscriber, micMetricsEvent)
}
}
// Send microphone process metrics
if inputSupervisor := GetAudioInputIPCSupervisor(); inputSupervisor != nil {
if processMetrics := inputSupervisor.GetProcessMetrics(); processMetrics != nil {
micProcessEvent := AudioEvent{
Type: AudioEventMicProcessMetrics,
Data: ProcessMetricsData{
PID: processMetrics.PID,
CPUPercent: processMetrics.CPUPercent,
MemoryRSS: processMetrics.MemoryRSS,
MemoryVMS: processMetrics.MemoryVMS,
MemoryPercent: processMetrics.MemoryPercent,
Running: inputSupervisor.IsRunning(),
ProcessName: processMetrics.ProcessName,
},
}
aeb.sendToSubscriber(subscriber, micProcessEvent)
}
}
// Send microphone process metrics (always send, even when subprocess is not running)
micProcessEvent := createAudioEvent(AudioEventMicProcessMetrics, aeb.getMicrophoneProcessMetrics())
aeb.sendToSubscriber(subscriber, micProcessEvent)
}
// startMetricsBroadcasting starts a goroutine that periodically broadcasts metrics
func (aeb *AudioEventBroadcaster) startMetricsBroadcasting() {
// Use 1-second interval to match Connection Stats sidebar frequency for smooth histogram progression
ticker := time.NewTicker(1 * time.Second)
// Use centralized interval to match process monitor frequency for synchronized metrics
ticker := time.NewTicker(GetMetricsUpdateInterval())
defer ticker.Stop()
for range ticker.C {
@ -330,17 +406,7 @@ func (aeb *AudioEventBroadcaster) startMetricsBroadcasting() {
// Broadcast audio metrics
audioMetrics := GetAudioMetrics()
audioMetricsEvent := AudioEvent{
Type: AudioEventMetricsUpdate,
Data: AudioMetricsData{
FramesReceived: audioMetrics.FramesReceived,
FramesDropped: audioMetrics.FramesDropped,
BytesProcessed: audioMetrics.BytesProcessed,
LastFrameTime: audioMetrics.LastFrameTime.Format("2006-01-02T15:04:05.000Z"),
ConnectionDrops: audioMetrics.ConnectionDrops,
AverageLatency: fmt.Sprintf("%.1fms", float64(audioMetrics.AverageLatency.Nanoseconds())/1e6),
},
}
audioMetricsEvent := createAudioEvent(AudioEventMetricsUpdate, convertAudioMetricsToEventDataWithLatencyMs(audioMetrics))
aeb.broadcast(audioMetricsEvent)
// Broadcast microphone metrics if available using session provider
@ -348,17 +414,7 @@ func (aeb *AudioEventBroadcaster) startMetricsBroadcasting() {
if sessionProvider.IsSessionActive() {
if inputManager := sessionProvider.GetAudioInputManager(); inputManager != nil {
micMetrics := inputManager.GetMetrics()
micMetricsEvent := AudioEvent{
Type: AudioEventMicrophoneMetrics,
Data: MicrophoneMetricsData{
FramesSent: micMetrics.FramesSent,
FramesDropped: micMetrics.FramesDropped,
BytesProcessed: micMetrics.BytesProcessed,
LastFrameTime: micMetrics.LastFrameTime.Format("2006-01-02T15:04:05.000Z"),
ConnectionDrops: micMetrics.ConnectionDrops,
AverageLatency: fmt.Sprintf("%.1fms", float64(micMetrics.AverageLatency.Nanoseconds())/1e6),
},
}
micMetricsEvent := createAudioEvent(AudioEventMicrophoneMetrics, convertAudioInputMetricsToEventDataWithLatencyMs(micMetrics))
aeb.broadcast(micMetricsEvent)
}
}
@ -366,40 +422,14 @@ func (aeb *AudioEventBroadcaster) startMetricsBroadcasting() {
// Broadcast audio process metrics
if outputSupervisor := GetAudioOutputSupervisor(); outputSupervisor != nil {
if processMetrics := outputSupervisor.GetProcessMetrics(); processMetrics != nil {
audioProcessEvent := AudioEvent{
Type: AudioEventProcessMetrics,
Data: ProcessMetricsData{
PID: processMetrics.PID,
CPUPercent: processMetrics.CPUPercent,
MemoryRSS: processMetrics.MemoryRSS,
MemoryVMS: processMetrics.MemoryVMS,
MemoryPercent: processMetrics.MemoryPercent,
Running: outputSupervisor.IsRunning(),
ProcessName: processMetrics.ProcessName,
},
}
audioProcessEvent := createAudioEvent(AudioEventProcessMetrics, convertProcessMetricsToEventData(*processMetrics, outputSupervisor.IsRunning()))
aeb.broadcast(audioProcessEvent)
}
}
// Broadcast microphone process metrics
if inputSupervisor := GetAudioInputIPCSupervisor(); inputSupervisor != nil {
if processMetrics := inputSupervisor.GetProcessMetrics(); processMetrics != nil {
micProcessEvent := AudioEvent{
Type: AudioEventMicProcessMetrics,
Data: ProcessMetricsData{
PID: processMetrics.PID,
CPUPercent: processMetrics.CPUPercent,
MemoryRSS: processMetrics.MemoryRSS,
MemoryVMS: processMetrics.MemoryVMS,
MemoryPercent: processMetrics.MemoryPercent,
Running: inputSupervisor.IsRunning(),
ProcessName: processMetrics.ProcessName,
},
}
aeb.broadcast(micProcessEvent)
}
}
// Broadcast microphone process metrics (always broadcast, even when subprocess is not running)
micProcessEvent := createAudioEvent(AudioEventMicProcessMetrics, aeb.getMicrophoneProcessMetrics())
aeb.broadcast(micProcessEvent)
}
}

View File

@ -1,6 +1,7 @@
package audio
import (
"context"
"sync/atomic"
"time"
@ -10,51 +11,59 @@ import (
// AudioInputIPCManager manages microphone input using IPC when enabled
type AudioInputIPCManager struct {
// metrics MUST be first for ARM32 alignment (contains int64 fields)
metrics AudioInputMetrics
supervisor *AudioInputSupervisor
logger zerolog.Logger
running int32
ctx context.Context
cancel context.CancelFunc
}
// NewAudioInputIPCManager creates a new IPC-based audio input manager
func NewAudioInputIPCManager() *AudioInputIPCManager {
ctx, cancel := context.WithCancel(context.Background())
return &AudioInputIPCManager{
supervisor: NewAudioInputSupervisor(),
logger: logging.GetDefaultLogger().With().Str("component", "audio-input-ipc").Logger(),
ctx: ctx,
cancel: cancel,
}
}
// Start starts the IPC-based audio input system
func (aim *AudioInputIPCManager) Start() error {
if !atomic.CompareAndSwapInt32(&aim.running, 0, 1) {
return nil // Already running
return nil
}
aim.logger.Info().Msg("Starting IPC-based audio input system")
// Start the supervisor which will launch the subprocess
err := aim.supervisor.Start()
if err != nil {
atomic.StoreInt32(&aim.running, 0)
aim.logger.Error().Err(err).Msg("Failed to start audio input supervisor")
return err
}
// Send initial configuration
config := InputIPCConfig{
SampleRate: 48000,
Channels: 2,
FrameSize: 960, // 20ms at 48kHz
FrameSize: 960,
}
// Wait briefly for the subprocess to be ready (reduced from 1 second)
time.Sleep(200 * time.Millisecond)
// Wait with timeout for subprocess readiness
select {
case <-time.After(200 * time.Millisecond):
case <-aim.ctx.Done():
aim.supervisor.Stop()
atomic.StoreInt32(&aim.running, 0)
return aim.ctx.Err()
}
err = aim.supervisor.SendConfig(config)
if err != nil {
aim.logger.Warn().Err(err).Msg("Failed to send initial config to audio input server")
// Don't fail startup for config errors
aim.logger.Warn().Err(err).Msg("Failed to send initial config, will retry later")
}
aim.logger.Info().Msg("IPC-based audio input system started")
@ -64,14 +73,12 @@ func (aim *AudioInputIPCManager) Start() error {
// Stop stops the IPC-based audio input system
func (aim *AudioInputIPCManager) Stop() {
if !atomic.CompareAndSwapInt32(&aim.running, 1, 0) {
return // Already stopped
return
}
aim.logger.Info().Msg("Stopping IPC-based audio input system")
// Stop the supervisor
aim.cancel()
aim.supervisor.Stop()
aim.logger.Info().Msg("IPC-based audio input system stopped")
}

View File

@ -2,6 +2,7 @@ package audio
import (
"sync"
"sync/atomic"
"time"
"github.com/prometheus/client_golang/prometheus"
@ -226,7 +227,7 @@ var (
// Metrics update tracking
metricsUpdateMutex sync.RWMutex
lastMetricsUpdate time.Time
lastMetricsUpdate int64
// Counter value tracking (since prometheus counters don't have Get() method)
audioFramesReceivedValue int64
@ -241,28 +242,24 @@ var (
// UpdateAudioMetrics updates Prometheus metrics with current audio data
func UpdateAudioMetrics(metrics AudioMetrics) {
metricsUpdateMutex.Lock()
defer metricsUpdateMutex.Unlock()
// Update counters with delta values
if metrics.FramesReceived > audioFramesReceivedValue {
audioFramesReceivedTotal.Add(float64(metrics.FramesReceived - audioFramesReceivedValue))
audioFramesReceivedValue = metrics.FramesReceived
oldReceived := atomic.SwapInt64(&audioFramesReceivedValue, metrics.FramesReceived)
if metrics.FramesReceived > oldReceived {
audioFramesReceivedTotal.Add(float64(metrics.FramesReceived - oldReceived))
}
if metrics.FramesDropped > audioFramesDroppedValue {
audioFramesDroppedTotal.Add(float64(metrics.FramesDropped - audioFramesDroppedValue))
audioFramesDroppedValue = metrics.FramesDropped
oldDropped := atomic.SwapInt64(&audioFramesDroppedValue, metrics.FramesDropped)
if metrics.FramesDropped > oldDropped {
audioFramesDroppedTotal.Add(float64(metrics.FramesDropped - oldDropped))
}
if metrics.BytesProcessed > audioBytesProcessedValue {
audioBytesProcessedTotal.Add(float64(metrics.BytesProcessed - audioBytesProcessedValue))
audioBytesProcessedValue = metrics.BytesProcessed
oldBytes := atomic.SwapInt64(&audioBytesProcessedValue, metrics.BytesProcessed)
if metrics.BytesProcessed > oldBytes {
audioBytesProcessedTotal.Add(float64(metrics.BytesProcessed - oldBytes))
}
if metrics.ConnectionDrops > audioConnectionDropsValue {
audioConnectionDropsTotal.Add(float64(metrics.ConnectionDrops - audioConnectionDropsValue))
audioConnectionDropsValue = metrics.ConnectionDrops
oldDrops := atomic.SwapInt64(&audioConnectionDropsValue, metrics.ConnectionDrops)
if metrics.ConnectionDrops > oldDrops {
audioConnectionDropsTotal.Add(float64(metrics.ConnectionDrops - oldDrops))
}
// Update gauges
@ -271,33 +268,29 @@ func UpdateAudioMetrics(metrics AudioMetrics) {
audioLastFrameTimestamp.Set(float64(metrics.LastFrameTime.Unix()))
}
lastMetricsUpdate = time.Now()
atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix())
}
// UpdateMicrophoneMetrics updates Prometheus metrics with current microphone data
func UpdateMicrophoneMetrics(metrics AudioInputMetrics) {
metricsUpdateMutex.Lock()
defer metricsUpdateMutex.Unlock()
// Update counters with delta values
if metrics.FramesSent > micFramesSentValue {
microphoneFramesSentTotal.Add(float64(metrics.FramesSent - micFramesSentValue))
micFramesSentValue = metrics.FramesSent
oldSent := atomic.SwapInt64(&micFramesSentValue, metrics.FramesSent)
if metrics.FramesSent > oldSent {
microphoneFramesSentTotal.Add(float64(metrics.FramesSent - oldSent))
}
if metrics.FramesDropped > micFramesDroppedValue {
microphoneFramesDroppedTotal.Add(float64(metrics.FramesDropped - micFramesDroppedValue))
micFramesDroppedValue = metrics.FramesDropped
oldDropped := atomic.SwapInt64(&micFramesDroppedValue, metrics.FramesDropped)
if metrics.FramesDropped > oldDropped {
microphoneFramesDroppedTotal.Add(float64(metrics.FramesDropped - oldDropped))
}
if metrics.BytesProcessed > micBytesProcessedValue {
microphoneBytesProcessedTotal.Add(float64(metrics.BytesProcessed - micBytesProcessedValue))
micBytesProcessedValue = metrics.BytesProcessed
oldBytes := atomic.SwapInt64(&micBytesProcessedValue, metrics.BytesProcessed)
if metrics.BytesProcessed > oldBytes {
microphoneBytesProcessedTotal.Add(float64(metrics.BytesProcessed - oldBytes))
}
if metrics.ConnectionDrops > micConnectionDropsValue {
microphoneConnectionDropsTotal.Add(float64(metrics.ConnectionDrops - micConnectionDropsValue))
micConnectionDropsValue = metrics.ConnectionDrops
oldDrops := atomic.SwapInt64(&micConnectionDropsValue, metrics.ConnectionDrops)
if metrics.ConnectionDrops > oldDrops {
microphoneConnectionDropsTotal.Add(float64(metrics.ConnectionDrops - oldDrops))
}
// Update gauges
@ -306,7 +299,7 @@ func UpdateMicrophoneMetrics(metrics AudioInputMetrics) {
microphoneLastFrameTimestamp.Set(float64(metrics.LastFrameTime.Unix()))
}
lastMetricsUpdate = time.Now()
atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix())
}
// UpdateAudioProcessMetrics updates Prometheus metrics with audio subprocess data
@ -324,7 +317,7 @@ func UpdateAudioProcessMetrics(metrics ProcessMetrics, isRunning bool) {
audioProcessRunning.Set(0)
}
lastMetricsUpdate = time.Now()
atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix())
}
// UpdateMicrophoneProcessMetrics updates Prometheus metrics with microphone subprocess data
@ -342,7 +335,7 @@ func UpdateMicrophoneProcessMetrics(metrics ProcessMetrics, isRunning bool) {
microphoneProcessRunning.Set(0)
}
lastMetricsUpdate = time.Now()
atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix())
}
// UpdateAudioConfigMetrics updates Prometheus metrics with audio configuration
@ -355,7 +348,7 @@ func UpdateAudioConfigMetrics(config AudioConfig) {
audioConfigSampleRate.Set(float64(config.SampleRate))
audioConfigChannels.Set(float64(config.Channels))
lastMetricsUpdate = time.Now()
atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix())
}
// UpdateMicrophoneConfigMetrics updates Prometheus metrics with microphone configuration
@ -368,14 +361,13 @@ func UpdateMicrophoneConfigMetrics(config AudioConfig) {
microphoneConfigSampleRate.Set(float64(config.SampleRate))
microphoneConfigChannels.Set(float64(config.Channels))
lastMetricsUpdate = time.Now()
atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix())
}
// GetLastMetricsUpdate returns the timestamp of the last metrics update
func GetLastMetricsUpdate() time.Time {
metricsUpdateMutex.RLock()
defer metricsUpdateMutex.RUnlock()
return lastMetricsUpdate
timestamp := atomic.LoadInt64(&lastMetricsUpdate)
return time.Unix(timestamp, 0)
}
// StartMetricsUpdater starts a goroutine that periodically updates Prometheus metrics

View File

@ -6,43 +6,33 @@ import (
"unsafe"
)
// MicrophoneContentionManager provides optimized microphone operation locking
// with reduced contention using atomic operations and conditional locking
// MicrophoneContentionManager manages microphone access with cooldown periods
type MicrophoneContentionManager struct {
// Atomic fields (must be 64-bit aligned on 32-bit systems)
lastOpNano int64 // Unix nanoseconds of last operation
cooldownNanos int64 // Cooldown duration in nanoseconds
operationID int64 // Incremental operation ID for tracking
// Lock-free state flags (using atomic.Pointer for lock-free updates)
lockPtr unsafe.Pointer // *sync.Mutex - conditionally allocated
lastOpNano int64
cooldownNanos int64
operationID int64
lockPtr unsafe.Pointer
}
// NewMicrophoneContentionManager creates a new microphone contention manager
func NewMicrophoneContentionManager(cooldown time.Duration) *MicrophoneContentionManager {
return &MicrophoneContentionManager{
cooldownNanos: int64(cooldown),
}
}
// OperationResult represents the result of attempting a microphone operation
type OperationResult struct {
Allowed bool
RemainingCooldown time.Duration
OperationID int64
}
// TryOperation attempts to perform a microphone operation with optimized contention handling
func (mcm *MicrophoneContentionManager) TryOperation() OperationResult {
now := time.Now().UnixNano()
cooldown := atomic.LoadInt64(&mcm.cooldownNanos)
// Fast path: check if we're clearly outside cooldown period using atomic read
lastOp := atomic.LoadInt64(&mcm.lastOpNano)
elapsed := now - lastOp
if elapsed >= cooldown {
// Attempt atomic update without locking
if atomic.CompareAndSwapInt64(&mcm.lastOpNano, lastOp, now) {
opID := atomic.AddInt64(&mcm.operationID, 1)
return OperationResult{
@ -51,16 +41,10 @@ func (mcm *MicrophoneContentionManager) TryOperation() OperationResult {
OperationID: opID,
}
}
}
// Slow path: potential contention, check remaining cooldown
currentLastOp := atomic.LoadInt64(&mcm.lastOpNano)
currentElapsed := now - currentLastOp
if currentElapsed >= cooldown {
// Race condition: another operation might have updated lastOpNano
// Try once more with CAS
if atomic.CompareAndSwapInt64(&mcm.lastOpNano, currentLastOp, now) {
// Retry once if CAS failed
lastOp = atomic.LoadInt64(&mcm.lastOpNano)
elapsed = now - lastOp
if elapsed >= cooldown && atomic.CompareAndSwapInt64(&mcm.lastOpNano, lastOp, now) {
opID := atomic.AddInt64(&mcm.operationID, 1)
return OperationResult{
Allowed: true,
@ -68,12 +52,9 @@ func (mcm *MicrophoneContentionManager) TryOperation() OperationResult {
OperationID: opID,
}
}
// If CAS failed, fall through to cooldown calculation
currentLastOp = atomic.LoadInt64(&mcm.lastOpNano)
currentElapsed = now - currentLastOp
}
remaining := time.Duration(cooldown - currentElapsed)
remaining := time.Duration(cooldown - elapsed)
if remaining < 0 {
remaining = 0
}
@ -85,17 +66,14 @@ func (mcm *MicrophoneContentionManager) TryOperation() OperationResult {
}
}
// SetCooldown updates the cooldown duration atomically
func (mcm *MicrophoneContentionManager) SetCooldown(cooldown time.Duration) {
atomic.StoreInt64(&mcm.cooldownNanos, int64(cooldown))
}
// GetCooldown returns the current cooldown duration
func (mcm *MicrophoneContentionManager) GetCooldown() time.Duration {
return time.Duration(atomic.LoadInt64(&mcm.cooldownNanos))
}
// GetLastOperationTime returns the time of the last operation
func (mcm *MicrophoneContentionManager) GetLastOperationTime() time.Time {
nanos := atomic.LoadInt64(&mcm.lastOpNano)
if nanos == 0 {
@ -104,55 +82,44 @@ func (mcm *MicrophoneContentionManager) GetLastOperationTime() time.Time {
return time.Unix(0, nanos)
}
// GetOperationCount returns the total number of successful operations
func (mcm *MicrophoneContentionManager) GetOperationCount() int64 {
return atomic.LoadInt64(&mcm.operationID)
}
// Reset resets the contention manager state
func (mcm *MicrophoneContentionManager) Reset() {
atomic.StoreInt64(&mcm.lastOpNano, 0)
atomic.StoreInt64(&mcm.operationID, 0)
}
// Global instance for microphone contention management
var (
globalMicContentionManager unsafe.Pointer // *MicrophoneContentionManager
globalMicContentionManager unsafe.Pointer
micContentionInitialized int32
)
// GetMicrophoneContentionManager returns the global microphone contention manager
func GetMicrophoneContentionManager() *MicrophoneContentionManager {
ptr := atomic.LoadPointer(&globalMicContentionManager)
if ptr != nil {
return (*MicrophoneContentionManager)(ptr)
}
// Initialize on first use
if atomic.CompareAndSwapInt32(&micContentionInitialized, 0, 1) {
manager := NewMicrophoneContentionManager(200 * time.Millisecond)
atomic.StorePointer(&globalMicContentionManager, unsafe.Pointer(manager))
return manager
}
// Another goroutine initialized it, try again
ptr = atomic.LoadPointer(&globalMicContentionManager)
if ptr != nil {
return (*MicrophoneContentionManager)(ptr)
}
// Fallback: create a new manager (should rarely happen)
return NewMicrophoneContentionManager(200 * time.Millisecond)
}
// TryMicrophoneOperation provides a convenient global function for microphone operations
func TryMicrophoneOperation() OperationResult {
manager := GetMicrophoneContentionManager()
return manager.TryOperation()
return GetMicrophoneContentionManager().TryOperation()
}
// SetMicrophoneCooldown updates the global microphone cooldown
func SetMicrophoneCooldown(cooldown time.Duration) {
manager := GetMicrophoneContentionManager()
manager.SetCooldown(cooldown)
GetMicrophoneContentionManager().SetCooldown(cooldown)
}

View File

@ -0,0 +1,71 @@
package audio
import (
"context"
"os"
"os/signal"
"syscall"
"time"
"github.com/jetkvm/kvm/internal/logging"
)
// RunAudioOutputServer runs the audio output server subprocess
// This should be called from main() when the subprocess is detected
func RunAudioOutputServer() error {
logger := logging.GetDefaultLogger().With().Str("component", "audio-output-server").Logger()
logger.Info().Msg("Starting audio output server subprocess")
// Create audio server
server, err := NewAudioServer()
if err != nil {
logger.Error().Err(err).Msg("failed to create audio server")
return err
}
defer server.Close()
// Start accepting connections
if err := server.Start(); err != nil {
logger.Error().Err(err).Msg("failed to start audio server")
return err
}
// Initialize audio processing
err = StartNonBlockingAudioStreaming(func(frame []byte) {
if err := server.SendFrame(frame); err != nil {
logger.Warn().Err(err).Msg("failed to send audio frame")
RecordFrameDropped()
}
})
if err != nil {
logger.Error().Err(err).Msg("failed to start audio processing")
return err
}
logger.Info().Msg("Audio output server started, waiting for connections")
// Set up signal handling for graceful shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
// Wait for shutdown signal
select {
case sig := <-sigChan:
logger.Info().Str("signal", sig.String()).Msg("Received shutdown signal")
case <-ctx.Done():
logger.Info().Msg("Context cancelled")
}
// Graceful shutdown
logger.Info().Msg("Shutting down audio output server")
StopNonBlockingAudioStreaming()
// Give some time for cleanup
time.Sleep(100 * time.Millisecond)
logger.Info().Msg("Audio output server subprocess stopped")
return nil
}

View File

@ -13,6 +13,28 @@ import (
"github.com/rs/zerolog"
)
// Constants for process monitoring
const (
// System constants
pageSize = 4096
maxCPUPercent = 100.0
minCPUPercent = 0.01
defaultClockTicks = 250.0 // Common for embedded ARM systems
defaultMemoryGB = 8
// Monitoring thresholds
maxWarmupSamples = 3
warmupCPUSamples = 2
logThrottleInterval = 10
// Channel buffer size
metricsChannelBuffer = 100
// Clock tick detection ranges
minValidClockTicks = 50
maxValidClockTicks = 1000
)
// ProcessMetrics represents CPU and memory usage metrics for a process
type ProcessMetrics struct {
PID int `json:"pid"`
@ -24,7 +46,6 @@ type ProcessMetrics struct {
ProcessName string `json:"process_name"`
}
// ProcessMonitor monitors CPU and memory usage of processes
type ProcessMonitor struct {
logger zerolog.Logger
mutex sync.RWMutex
@ -33,15 +54,20 @@ type ProcessMonitor struct {
stopChan chan struct{}
metricsChan chan ProcessMetrics
updateInterval time.Duration
totalMemory int64
memoryOnce sync.Once
clockTicks float64
clockTicksOnce sync.Once
}
// processState tracks the state needed for CPU calculation
type processState struct {
name string
lastCPUTime int64
lastSysTime int64
lastUserTime int64
lastSample time.Time
name string
lastCPUTime int64
lastSysTime int64
lastUserTime int64
lastSample time.Time
warmupSamples int
}
// NewProcessMonitor creates a new process monitor
@ -50,8 +76,8 @@ func NewProcessMonitor() *ProcessMonitor {
logger: logging.GetDefaultLogger().With().Str("component", "process-monitor").Logger(),
monitoredPIDs: make(map[int]*processState),
stopChan: make(chan struct{}),
metricsChan: make(chan ProcessMetrics, 100),
updateInterval: 2 * time.Second, // Update every 2 seconds
metricsChan: make(chan ProcessMetrics, metricsChannelBuffer),
updateInterval: GetMetricsUpdateInterval(),
}
}
@ -138,30 +164,33 @@ func (pm *ProcessMonitor) monitorLoop() {
}
}
// collectAllMetrics collects metrics for all monitored processes
func (pm *ProcessMonitor) collectAllMetrics() {
pm.mutex.RLock()
pids := make(map[int]*processState)
pidsToCheck := make([]int, 0, len(pm.monitoredPIDs))
states := make([]*processState, 0, len(pm.monitoredPIDs))
for pid, state := range pm.monitoredPIDs {
pids[pid] = state
pidsToCheck = append(pidsToCheck, pid)
states = append(states, state)
}
pm.mutex.RUnlock()
for pid, state := range pids {
if metric, err := pm.collectMetrics(pid, state); err == nil {
deadPIDs := make([]int, 0)
for i, pid := range pidsToCheck {
if metric, err := pm.collectMetrics(pid, states[i]); err == nil {
select {
case pm.metricsChan <- metric:
default:
// Channel full, skip this metric
}
} else {
// Process might have died, remove it
pm.RemoveProcess(pid)
deadPIDs = append(deadPIDs, pid)
}
}
for _, pid := range deadPIDs {
pm.RemoveProcess(pid)
}
}
// collectMetrics collects metrics for a specific process
func (pm *ProcessMonitor) collectMetrics(pid int, state *processState) (ProcessMetrics, error) {
now := time.Now()
metric := ProcessMetrics{
@ -170,49 +199,33 @@ func (pm *ProcessMonitor) collectMetrics(pid int, state *processState) (ProcessM
ProcessName: state.name,
}
// Read /proc/[pid]/stat for CPU and memory info
statPath := fmt.Sprintf("/proc/%d/stat", pid)
statData, err := os.ReadFile(statPath)
if err != nil {
return metric, fmt.Errorf("failed to read stat file: %w", err)
return metric, err
}
// Parse stat file
fields := strings.Fields(string(statData))
if len(fields) < 24 {
return metric, fmt.Errorf("invalid stat file format")
return metric, fmt.Errorf("invalid stat format")
}
// Extract CPU times (fields 13, 14 are utime, stime in clock ticks)
utime, _ := strconv.ParseInt(fields[13], 10, 64)
stime, _ := strconv.ParseInt(fields[14], 10, 64)
totalCPUTime := utime + stime
// Extract memory info (field 22 is vsize, field 23 is rss in pages)
vsize, _ := strconv.ParseInt(fields[22], 10, 64)
rss, _ := strconv.ParseInt(fields[23], 10, 64)
// Convert RSS from pages to bytes (assuming 4KB pages)
pageSize := int64(4096)
metric.MemoryRSS = rss * pageSize
metric.MemoryVMS = vsize
// Calculate CPU percentage
if !state.lastSample.IsZero() {
timeDelta := now.Sub(state.lastSample).Seconds()
cpuDelta := float64(totalCPUTime - state.lastCPUTime)
metric.CPUPercent = pm.calculateCPUPercent(totalCPUTime, state, now)
// Convert from clock ticks to seconds (assuming 100 Hz)
clockTicks := 100.0
cpuSeconds := cpuDelta / clockTicks
if timeDelta > 0 {
metric.CPUPercent = (cpuSeconds / timeDelta) * 100.0
// Cap CPU percentage at 100% to handle multi-core usage
if metric.CPUPercent > 100.0 {
metric.CPUPercent = 100.0
}
}
// Increment warmup counter
if state.warmupSamples < maxWarmupSamples {
state.warmupSamples++
}
// Calculate memory percentage (RSS / total system memory)
@ -229,28 +242,127 @@ func (pm *ProcessMonitor) collectMetrics(pid int, state *processState) (ProcessM
return metric, nil
}
// getTotalMemory returns total system memory in bytes
func (pm *ProcessMonitor) getTotalMemory() int64 {
file, err := os.Open("/proc/meminfo")
if err != nil {
return 0
// calculateCPUPercent calculates CPU percentage for a process
func (pm *ProcessMonitor) calculateCPUPercent(totalCPUTime int64, state *processState, now time.Time) float64 {
if state.lastSample.IsZero() {
// First sample - initialize baseline
state.warmupSamples = 0
return 0.0
}
defer file.Close()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := scanner.Text()
if strings.HasPrefix(line, "MemTotal:") {
fields := strings.Fields(line)
if len(fields) >= 2 {
if kb, err := strconv.ParseInt(fields[1], 10, 64); err == nil {
return kb * 1024 // Convert KB to bytes
timeDelta := now.Sub(state.lastSample).Seconds()
cpuDelta := float64(totalCPUTime - state.lastCPUTime)
if timeDelta <= 0 {
return 0.0
}
if cpuDelta > 0 {
// Convert from clock ticks to seconds using actual system clock ticks
clockTicks := pm.getClockTicks()
cpuSeconds := cpuDelta / clockTicks
cpuPercent := (cpuSeconds / timeDelta) * 100.0
// Apply bounds
if cpuPercent > maxCPUPercent {
cpuPercent = maxCPUPercent
}
if cpuPercent < minCPUPercent {
cpuPercent = minCPUPercent
}
return cpuPercent
}
// No CPU delta - process was idle
if state.warmupSamples < warmupCPUSamples {
// During warmup, provide a small non-zero value to indicate process is alive
return minCPUPercent
}
return 0.0
}
func (pm *ProcessMonitor) getClockTicks() float64 {
pm.clockTicksOnce.Do(func() {
// Try to detect actual clock ticks from kernel boot parameters or /proc/stat
if data, err := os.ReadFile("/proc/cmdline"); err == nil {
// Look for HZ parameter in kernel command line
cmdline := string(data)
if strings.Contains(cmdline, "HZ=") {
fields := strings.Fields(cmdline)
for _, field := range fields {
if strings.HasPrefix(field, "HZ=") {
if hz, err := strconv.ParseFloat(field[3:], 64); err == nil && hz > 0 {
pm.clockTicks = hz
return
}
}
}
}
break
}
}
return 0
// Try reading from /proc/timer_list for more accurate detection
if data, err := os.ReadFile("/proc/timer_list"); err == nil {
timer := string(data)
// Look for tick device frequency
lines := strings.Split(timer, "\n")
for _, line := range lines {
if strings.Contains(line, "tick_period:") {
fields := strings.Fields(line)
if len(fields) >= 2 {
if period, err := strconv.ParseInt(fields[1], 10, 64); err == nil && period > 0 {
// Convert nanoseconds to Hz
hz := 1000000000.0 / float64(period)
if hz >= minValidClockTicks && hz <= maxValidClockTicks {
pm.clockTicks = hz
return
}
}
}
}
}
}
// Fallback: Most embedded ARM systems (like jetKVM) use 250 Hz or 1000 Hz
// rather than the traditional 100 Hz
pm.clockTicks = defaultClockTicks
pm.logger.Warn().Float64("clock_ticks", pm.clockTicks).Msg("Using fallback clock ticks value")
// Log successful detection for non-fallback values
if pm.clockTicks != defaultClockTicks {
pm.logger.Info().Float64("clock_ticks", pm.clockTicks).Msg("Detected system clock ticks")
}
})
return pm.clockTicks
}
func (pm *ProcessMonitor) getTotalMemory() int64 {
pm.memoryOnce.Do(func() {
file, err := os.Open("/proc/meminfo")
if err != nil {
pm.totalMemory = defaultMemoryGB * 1024 * 1024 * 1024
return
}
defer file.Close()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := scanner.Text()
if strings.HasPrefix(line, "MemTotal:") {
fields := strings.Fields(line)
if len(fields) >= 2 {
if kb, err := strconv.ParseInt(fields[1], 10, 64); err == nil {
pm.totalMemory = kb * 1024
return
}
}
break
}
}
pm.totalMemory = defaultMemoryGB * 1024 * 1024 * 1024 // Fallback
})
return pm.totalMemory
}
// GetTotalMemory returns total system memory in bytes (public method)

View File

@ -3,6 +3,7 @@ package audio
import (
"context"
"sync"
"time"
"github.com/jetkvm/kvm/internal/logging"
"github.com/pion/webrtc/v4/pkg/media"
@ -123,26 +124,34 @@ func (r *AudioRelay) UpdateTrack(audioTrack AudioTrackWriter) {
r.audioTrack = audioTrack
}
// relayLoop is the main relay loop that forwards frames from subprocess to WebRTC
func (r *AudioRelay) relayLoop() {
defer r.wg.Done()
r.logger.Debug().Msg("Audio relay loop started")
const maxConsecutiveErrors = 10
consecutiveErrors := 0
for {
select {
case <-r.ctx.Done():
r.logger.Debug().Msg("Audio relay loop stopping")
return
default:
// Receive frame from audio server subprocess
frame, err := r.client.ReceiveFrame()
if err != nil {
r.logger.Error().Err(err).Msg("Failed to receive audio frame")
consecutiveErrors++
r.logger.Error().Err(err).Int("consecutive_errors", consecutiveErrors).Msg("Failed to receive audio frame")
r.incrementDropped()
if consecutiveErrors >= maxConsecutiveErrors {
r.logger.Error().Msg("Too many consecutive errors, stopping relay")
return
}
time.Sleep(10 * time.Millisecond)
continue
}
// Forward frame to WebRTC
consecutiveErrors = 0
if err := r.forwardToWebRTC(frame); err != nil {
r.logger.Warn().Err(err).Msg("Failed to forward frame to WebRTC")
r.incrementDropped()

49
main.go
View File

@ -20,43 +20,14 @@ var (
audioSupervisor *audio.AudioServerSupervisor
)
// runAudioServer is now handled by audio.RunAudioOutputServer
// This function is kept for backward compatibility but delegates to the audio package
func runAudioServer() {
logger.Info().Msg("Starting audio server subprocess")
// Create audio server
server, err := audio.NewAudioServer()
err := audio.RunAudioOutputServer()
if err != nil {
logger.Error().Err(err).Msg("failed to create audio server")
logger.Error().Err(err).Msg("audio output server failed")
os.Exit(1)
}
defer server.Close()
// Start accepting connections
if err := server.Start(); err != nil {
logger.Error().Err(err).Msg("failed to start audio server")
os.Exit(1)
}
// Initialize audio processing
err = audio.StartNonBlockingAudioStreaming(func(frame []byte) {
if err := server.SendFrame(frame); err != nil {
logger.Warn().Err(err).Msg("failed to send audio frame")
audio.RecordFrameDropped()
}
})
if err != nil {
logger.Error().Err(err).Msg("failed to start audio processing")
os.Exit(1)
}
// Wait for termination signal
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
<-sigs
// Cleanup
audio.StopNonBlockingAudioStreaming()
logger.Info().Msg("Audio server subprocess stopped")
}
func startAudioSubprocess() error {
@ -212,14 +183,6 @@ func Main(audioServer bool, audioInputServer bool) {
audio.InitializeAudioEventBroadcaster()
logger.Info().Msg("audio event broadcaster initialized")
// Start audio input system for microphone processing
err = audio.StartAudioInput()
if err != nil {
logger.Warn().Err(err).Msg("failed to start audio input system")
} else {
logger.Info().Msg("audio input system started")
}
if err := setInitialVirtualMediaState(); err != nil {
logger.Warn().Err(err).Msg("failed to set initial virtual media state")
}
@ -272,10 +235,6 @@ func Main(audioServer bool, audioInputServer bool) {
// Stop audio subprocess and wait for cleanup
if !isAudioServer {
// Stop audio input system
logger.Info().Msg("stopping audio input system")
audio.StopAudioInput()
if audioSupervisor != nil {
logger.Info().Msg("stopping audio supervisor")
if err := audioSupervisor.Stop(); err != nil {

View File

@ -41,10 +41,6 @@ interface AudioConfig {
FrameSize: string;
}
const qualityLabels = {
0: "Low (32kbps)",
1: "Medium (64kbps)",
@ -211,7 +207,6 @@ export default function AudioControlPopover({ microphone, open }: AudioControlPo
// Prevent rapid clicking - if any operation is in progress or within cooldown, ignore the click
if (isStarting || isStopping || isToggling || (now - lastClickTime < CLICK_COOLDOWN)) {
console.log("Microphone operation already in progress or within cooldown, ignoring click");
return;
}
@ -233,7 +228,6 @@ export default function AudioControlPopover({ microphone, open }: AudioControlPo
// Prevent rapid clicking - if any operation is in progress or within cooldown, ignore the click
if (isStarting || isStopping || isToggling || (now - lastClickTime < CLICK_COOLDOWN)) {
console.log("Microphone operation already in progress or within cooldown, ignoring mute toggle");
return;
}
@ -279,7 +273,6 @@ export default function AudioControlPopover({ microphone, open }: AudioControlPo
if (videoElement && 'setSinkId' in videoElement) {
try {
await (videoElement as HTMLVideoElement & { setSinkId: (deviceId: string) => Promise<void> }).setSinkId(deviceId);
console.log('Audio output device changed to:', deviceId);
} catch (error: unknown) {
console.error('Failed to change audio output device:', error);
}