refactor(audio): improve performance and simplify code structure

- Move audio server logic to dedicated package and simplify main.go
- Optimize buffer pool implementation and remove redundant logging
- Improve process monitoring with synchronized metrics updates
- Enhance microphone contention manager with simplified logic
- Replace mutex with atomic operations for metrics tracking
This commit is contained in:
Alex P 2025-08-23 22:54:01 +00:00
parent 27a999c58a
commit 76174f4486
12 changed files with 208 additions and 213 deletions

View File

@ -32,7 +32,6 @@ type AudioConfig struct {
} }
// AudioMetrics tracks audio performance metrics // AudioMetrics tracks audio performance metrics
type AudioMetrics struct { type AudioMetrics struct {
FramesReceived int64 FramesReceived int64
FramesDropped int64 FramesDropped int64

View File

@ -2,8 +2,6 @@ package audio
import ( import (
"sync" "sync"
"github.com/jetkvm/kvm/internal/logging"
) )
var audioMuteState struct { var audioMuteState struct {
@ -13,9 +11,7 @@ var audioMuteState struct {
func SetAudioMuted(muted bool) { func SetAudioMuted(muted bool) {
audioMuteState.mu.Lock() audioMuteState.mu.Lock()
prev := audioMuteState.muted
audioMuteState.muted = muted audioMuteState.muted = muted
logging.GetDefaultLogger().Info().Str("component", "audio").Msgf("SetAudioMuted: prev=%v, new=%v", prev, muted)
audioMuteState.mu.Unlock() audioMuteState.mu.Unlock()
} }

View File

@ -4,65 +4,53 @@ import (
"sync" "sync"
) )
// AudioBufferPool manages reusable audio buffers to reduce allocations
type AudioBufferPool struct { type AudioBufferPool struct {
pool sync.Pool pool sync.Pool
bufferSize int
} }
// NewAudioBufferPool creates a new buffer pool for audio frames
func NewAudioBufferPool(bufferSize int) *AudioBufferPool { func NewAudioBufferPool(bufferSize int) *AudioBufferPool {
return &AudioBufferPool{ return &AudioBufferPool{
bufferSize: bufferSize,
pool: sync.Pool{ pool: sync.Pool{
New: func() interface{} { New: func() interface{} {
// Pre-allocate buffer with specified size return make([]byte, 0, bufferSize)
return make([]byte, bufferSize)
}, },
}, },
} }
} }
// Get retrieves a buffer from the pool
func (p *AudioBufferPool) Get() []byte { func (p *AudioBufferPool) Get() []byte {
if buf := p.pool.Get(); buf != nil { if buf := p.pool.Get(); buf != nil {
return *buf.(*[]byte) return buf.([]byte)
} }
return make([]byte, 0, 1500) // fallback if pool is empty return make([]byte, 0, p.bufferSize)
} }
// Put returns a buffer to the pool
func (p *AudioBufferPool) Put(buf []byte) { func (p *AudioBufferPool) Put(buf []byte) {
// Reset length but keep capacity for reuse if cap(buf) >= p.bufferSize {
if cap(buf) >= 1500 { // Only pool buffers of reasonable size
resetBuf := buf[:0] resetBuf := buf[:0]
p.pool.Put(&resetBuf) p.pool.Put(resetBuf)
} }
} }
// Global buffer pools for different audio operations
var ( var (
// Pool for 1500-byte audio frame buffers (Opus max frame size) audioFramePool = NewAudioBufferPool(1500)
audioFramePool = NewAudioBufferPool(1500)
// Pool for smaller control buffers
audioControlPool = NewAudioBufferPool(64) audioControlPool = NewAudioBufferPool(64)
) )
// GetAudioFrameBuffer gets a reusable buffer for audio frames
func GetAudioFrameBuffer() []byte { func GetAudioFrameBuffer() []byte {
return audioFramePool.Get() return audioFramePool.Get()
} }
// PutAudioFrameBuffer returns a buffer to the frame pool
func PutAudioFrameBuffer(buf []byte) { func PutAudioFrameBuffer(buf []byte) {
audioFramePool.Put(buf) audioFramePool.Put(buf)
} }
// GetAudioControlBuffer gets a reusable buffer for control data
func GetAudioControlBuffer() []byte { func GetAudioControlBuffer() []byte {
return audioControlPool.Get() return audioControlPool.Get()
} }
// PutAudioControlBuffer returns a buffer to the control pool
func PutAudioControlBuffer(buf []byte) { func PutAudioControlBuffer(buf []byte) {
audioControlPool.Put(buf) audioControlPool.Put(buf)
} }

View File

@ -204,7 +204,6 @@ func (aeb *AudioEventBroadcaster) sendInitialState(connectionID string) {
aeb.sendCurrentMetrics(subscriber) aeb.sendCurrentMetrics(subscriber)
} }
// getMicrophoneProcessMetrics returns microphone process metrics data, always providing a valid response
// convertAudioMetricsToEventData converts internal audio metrics to AudioMetricsData for events // convertAudioMetricsToEventData converts internal audio metrics to AudioMetricsData for events
func convertAudioMetricsToEventData(metrics AudioMetrics) AudioMetricsData { func convertAudioMetricsToEventData(metrics AudioMetrics) AudioMetricsData {
return AudioMetricsData{ return AudioMetricsData{
@ -371,8 +370,8 @@ func (aeb *AudioEventBroadcaster) sendCurrentMetrics(subscriber *AudioEventSubsc
// startMetricsBroadcasting starts a goroutine that periodically broadcasts metrics // startMetricsBroadcasting starts a goroutine that periodically broadcasts metrics
func (aeb *AudioEventBroadcaster) startMetricsBroadcasting() { func (aeb *AudioEventBroadcaster) startMetricsBroadcasting() {
// Use 500ms interval to match Connection Stats sidebar frequency for smooth histogram progression // Use 1000ms interval to match process monitor frequency for synchronized metrics
ticker := time.NewTicker(500 * time.Millisecond) ticker := time.NewTicker(1000 * time.Millisecond)
defer ticker.Stop() defer ticker.Stop()
for range ticker.C { for range ticker.C {

View File

@ -1,6 +1,7 @@
package audio package audio
import ( import (
"context"
"sync/atomic" "sync/atomic"
"time" "time"
@ -10,51 +11,59 @@ import (
// AudioInputIPCManager manages microphone input using IPC when enabled // AudioInputIPCManager manages microphone input using IPC when enabled
type AudioInputIPCManager struct { type AudioInputIPCManager struct {
// metrics MUST be first for ARM32 alignment (contains int64 fields)
metrics AudioInputMetrics metrics AudioInputMetrics
supervisor *AudioInputSupervisor supervisor *AudioInputSupervisor
logger zerolog.Logger logger zerolog.Logger
running int32 running int32
ctx context.Context
cancel context.CancelFunc
} }
// NewAudioInputIPCManager creates a new IPC-based audio input manager // NewAudioInputIPCManager creates a new IPC-based audio input manager
func NewAudioInputIPCManager() *AudioInputIPCManager { func NewAudioInputIPCManager() *AudioInputIPCManager {
ctx, cancel := context.WithCancel(context.Background())
return &AudioInputIPCManager{ return &AudioInputIPCManager{
supervisor: NewAudioInputSupervisor(), supervisor: NewAudioInputSupervisor(),
logger: logging.GetDefaultLogger().With().Str("component", "audio-input-ipc").Logger(), logger: logging.GetDefaultLogger().With().Str("component", "audio-input-ipc").Logger(),
ctx: ctx,
cancel: cancel,
} }
} }
// Start starts the IPC-based audio input system // Start starts the IPC-based audio input system
func (aim *AudioInputIPCManager) Start() error { func (aim *AudioInputIPCManager) Start() error {
if !atomic.CompareAndSwapInt32(&aim.running, 0, 1) { if !atomic.CompareAndSwapInt32(&aim.running, 0, 1) {
return nil // Already running return nil
} }
aim.logger.Info().Msg("Starting IPC-based audio input system") aim.logger.Info().Msg("Starting IPC-based audio input system")
// Start the supervisor which will launch the subprocess
err := aim.supervisor.Start() err := aim.supervisor.Start()
if err != nil { if err != nil {
atomic.StoreInt32(&aim.running, 0) atomic.StoreInt32(&aim.running, 0)
aim.logger.Error().Err(err).Msg("Failed to start audio input supervisor")
return err return err
} }
// Send initial configuration
config := InputIPCConfig{ config := InputIPCConfig{
SampleRate: 48000, SampleRate: 48000,
Channels: 2, Channels: 2,
FrameSize: 960, // 20ms at 48kHz FrameSize: 960,
} }
// Wait briefly for the subprocess to be ready (reduced from 1 second) // Wait with timeout for subprocess readiness
time.Sleep(200 * time.Millisecond) select {
case <-time.After(200 * time.Millisecond):
case <-aim.ctx.Done():
aim.supervisor.Stop()
atomic.StoreInt32(&aim.running, 0)
return aim.ctx.Err()
}
err = aim.supervisor.SendConfig(config) err = aim.supervisor.SendConfig(config)
if err != nil { if err != nil {
aim.logger.Warn().Err(err).Msg("Failed to send initial config to audio input server") aim.logger.Warn().Err(err).Msg("Failed to send initial config, will retry later")
// Don't fail startup for config errors
} }
aim.logger.Info().Msg("IPC-based audio input system started") aim.logger.Info().Msg("IPC-based audio input system started")
@ -64,14 +73,12 @@ func (aim *AudioInputIPCManager) Start() error {
// Stop stops the IPC-based audio input system // Stop stops the IPC-based audio input system
func (aim *AudioInputIPCManager) Stop() { func (aim *AudioInputIPCManager) Stop() {
if !atomic.CompareAndSwapInt32(&aim.running, 1, 0) { if !atomic.CompareAndSwapInt32(&aim.running, 1, 0) {
return // Already stopped return
} }
aim.logger.Info().Msg("Stopping IPC-based audio input system") aim.logger.Info().Msg("Stopping IPC-based audio input system")
aim.cancel()
// Stop the supervisor
aim.supervisor.Stop() aim.supervisor.Stop()
aim.logger.Info().Msg("IPC-based audio input system stopped") aim.logger.Info().Msg("IPC-based audio input system stopped")
} }

View File

@ -2,6 +2,7 @@ package audio
import ( import (
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
@ -226,7 +227,7 @@ var (
// Metrics update tracking // Metrics update tracking
metricsUpdateMutex sync.RWMutex metricsUpdateMutex sync.RWMutex
lastMetricsUpdate time.Time lastMetricsUpdate int64
// Counter value tracking (since prometheus counters don't have Get() method) // Counter value tracking (since prometheus counters don't have Get() method)
audioFramesReceivedValue int64 audioFramesReceivedValue int64
@ -241,28 +242,24 @@ var (
// UpdateAudioMetrics updates Prometheus metrics with current audio data // UpdateAudioMetrics updates Prometheus metrics with current audio data
func UpdateAudioMetrics(metrics AudioMetrics) { func UpdateAudioMetrics(metrics AudioMetrics) {
metricsUpdateMutex.Lock() oldReceived := atomic.SwapInt64(&audioFramesReceivedValue, metrics.FramesReceived)
defer metricsUpdateMutex.Unlock() if metrics.FramesReceived > oldReceived {
audioFramesReceivedTotal.Add(float64(metrics.FramesReceived - oldReceived))
// Update counters with delta values
if metrics.FramesReceived > audioFramesReceivedValue {
audioFramesReceivedTotal.Add(float64(metrics.FramesReceived - audioFramesReceivedValue))
audioFramesReceivedValue = metrics.FramesReceived
} }
if metrics.FramesDropped > audioFramesDroppedValue { oldDropped := atomic.SwapInt64(&audioFramesDroppedValue, metrics.FramesDropped)
audioFramesDroppedTotal.Add(float64(metrics.FramesDropped - audioFramesDroppedValue)) if metrics.FramesDropped > oldDropped {
audioFramesDroppedValue = metrics.FramesDropped audioFramesDroppedTotal.Add(float64(metrics.FramesDropped - oldDropped))
} }
if metrics.BytesProcessed > audioBytesProcessedValue { oldBytes := atomic.SwapInt64(&audioBytesProcessedValue, metrics.BytesProcessed)
audioBytesProcessedTotal.Add(float64(metrics.BytesProcessed - audioBytesProcessedValue)) if metrics.BytesProcessed > oldBytes {
audioBytesProcessedValue = metrics.BytesProcessed audioBytesProcessedTotal.Add(float64(metrics.BytesProcessed - oldBytes))
} }
if metrics.ConnectionDrops > audioConnectionDropsValue { oldDrops := atomic.SwapInt64(&audioConnectionDropsValue, metrics.ConnectionDrops)
audioConnectionDropsTotal.Add(float64(metrics.ConnectionDrops - audioConnectionDropsValue)) if metrics.ConnectionDrops > oldDrops {
audioConnectionDropsValue = metrics.ConnectionDrops audioConnectionDropsTotal.Add(float64(metrics.ConnectionDrops - oldDrops))
} }
// Update gauges // Update gauges
@ -271,33 +268,29 @@ func UpdateAudioMetrics(metrics AudioMetrics) {
audioLastFrameTimestamp.Set(float64(metrics.LastFrameTime.Unix())) audioLastFrameTimestamp.Set(float64(metrics.LastFrameTime.Unix()))
} }
lastMetricsUpdate = time.Now() atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix())
} }
// UpdateMicrophoneMetrics updates Prometheus metrics with current microphone data // UpdateMicrophoneMetrics updates Prometheus metrics with current microphone data
func UpdateMicrophoneMetrics(metrics AudioInputMetrics) { func UpdateMicrophoneMetrics(metrics AudioInputMetrics) {
metricsUpdateMutex.Lock() oldSent := atomic.SwapInt64(&micFramesSentValue, metrics.FramesSent)
defer metricsUpdateMutex.Unlock() if metrics.FramesSent > oldSent {
microphoneFramesSentTotal.Add(float64(metrics.FramesSent - oldSent))
// Update counters with delta values
if metrics.FramesSent > micFramesSentValue {
microphoneFramesSentTotal.Add(float64(metrics.FramesSent - micFramesSentValue))
micFramesSentValue = metrics.FramesSent
} }
if metrics.FramesDropped > micFramesDroppedValue { oldDropped := atomic.SwapInt64(&micFramesDroppedValue, metrics.FramesDropped)
microphoneFramesDroppedTotal.Add(float64(metrics.FramesDropped - micFramesDroppedValue)) if metrics.FramesDropped > oldDropped {
micFramesDroppedValue = metrics.FramesDropped microphoneFramesDroppedTotal.Add(float64(metrics.FramesDropped - oldDropped))
} }
if metrics.BytesProcessed > micBytesProcessedValue { oldBytes := atomic.SwapInt64(&micBytesProcessedValue, metrics.BytesProcessed)
microphoneBytesProcessedTotal.Add(float64(metrics.BytesProcessed - micBytesProcessedValue)) if metrics.BytesProcessed > oldBytes {
micBytesProcessedValue = metrics.BytesProcessed microphoneBytesProcessedTotal.Add(float64(metrics.BytesProcessed - oldBytes))
} }
if metrics.ConnectionDrops > micConnectionDropsValue { oldDrops := atomic.SwapInt64(&micConnectionDropsValue, metrics.ConnectionDrops)
microphoneConnectionDropsTotal.Add(float64(metrics.ConnectionDrops - micConnectionDropsValue)) if metrics.ConnectionDrops > oldDrops {
micConnectionDropsValue = metrics.ConnectionDrops microphoneConnectionDropsTotal.Add(float64(metrics.ConnectionDrops - oldDrops))
} }
// Update gauges // Update gauges
@ -306,7 +299,7 @@ func UpdateMicrophoneMetrics(metrics AudioInputMetrics) {
microphoneLastFrameTimestamp.Set(float64(metrics.LastFrameTime.Unix())) microphoneLastFrameTimestamp.Set(float64(metrics.LastFrameTime.Unix()))
} }
lastMetricsUpdate = time.Now() atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix())
} }
// UpdateAudioProcessMetrics updates Prometheus metrics with audio subprocess data // UpdateAudioProcessMetrics updates Prometheus metrics with audio subprocess data
@ -324,7 +317,7 @@ func UpdateAudioProcessMetrics(metrics ProcessMetrics, isRunning bool) {
audioProcessRunning.Set(0) audioProcessRunning.Set(0)
} }
lastMetricsUpdate = time.Now() atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix())
} }
// UpdateMicrophoneProcessMetrics updates Prometheus metrics with microphone subprocess data // UpdateMicrophoneProcessMetrics updates Prometheus metrics with microphone subprocess data
@ -342,7 +335,7 @@ func UpdateMicrophoneProcessMetrics(metrics ProcessMetrics, isRunning bool) {
microphoneProcessRunning.Set(0) microphoneProcessRunning.Set(0)
} }
lastMetricsUpdate = time.Now() atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix())
} }
// UpdateAudioConfigMetrics updates Prometheus metrics with audio configuration // UpdateAudioConfigMetrics updates Prometheus metrics with audio configuration
@ -355,7 +348,7 @@ func UpdateAudioConfigMetrics(config AudioConfig) {
audioConfigSampleRate.Set(float64(config.SampleRate)) audioConfigSampleRate.Set(float64(config.SampleRate))
audioConfigChannels.Set(float64(config.Channels)) audioConfigChannels.Set(float64(config.Channels))
lastMetricsUpdate = time.Now() atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix())
} }
// UpdateMicrophoneConfigMetrics updates Prometheus metrics with microphone configuration // UpdateMicrophoneConfigMetrics updates Prometheus metrics with microphone configuration
@ -368,14 +361,13 @@ func UpdateMicrophoneConfigMetrics(config AudioConfig) {
microphoneConfigSampleRate.Set(float64(config.SampleRate)) microphoneConfigSampleRate.Set(float64(config.SampleRate))
microphoneConfigChannels.Set(float64(config.Channels)) microphoneConfigChannels.Set(float64(config.Channels))
lastMetricsUpdate = time.Now() atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix())
} }
// GetLastMetricsUpdate returns the timestamp of the last metrics update // GetLastMetricsUpdate returns the timestamp of the last metrics update
func GetLastMetricsUpdate() time.Time { func GetLastMetricsUpdate() time.Time {
metricsUpdateMutex.RLock() timestamp := atomic.LoadInt64(&lastMetricsUpdate)
defer metricsUpdateMutex.RUnlock() return time.Unix(timestamp, 0)
return lastMetricsUpdate
} }
// StartMetricsUpdater starts a goroutine that periodically updates Prometheus metrics // StartMetricsUpdater starts a goroutine that periodically updates Prometheus metrics

View File

@ -6,43 +6,33 @@ import (
"unsafe" "unsafe"
) )
// MicrophoneContentionManager provides optimized microphone operation locking // MicrophoneContentionManager manages microphone access with cooldown periods
// with reduced contention using atomic operations and conditional locking
type MicrophoneContentionManager struct { type MicrophoneContentionManager struct {
// Atomic fields (must be 64-bit aligned on 32-bit systems) lastOpNano int64
lastOpNano int64 // Unix nanoseconds of last operation cooldownNanos int64
cooldownNanos int64 // Cooldown duration in nanoseconds operationID int64
operationID int64 // Incremental operation ID for tracking lockPtr unsafe.Pointer
// Lock-free state flags (using atomic.Pointer for lock-free updates)
lockPtr unsafe.Pointer // *sync.Mutex - conditionally allocated
} }
// NewMicrophoneContentionManager creates a new microphone contention manager
func NewMicrophoneContentionManager(cooldown time.Duration) *MicrophoneContentionManager { func NewMicrophoneContentionManager(cooldown time.Duration) *MicrophoneContentionManager {
return &MicrophoneContentionManager{ return &MicrophoneContentionManager{
cooldownNanos: int64(cooldown), cooldownNanos: int64(cooldown),
} }
} }
// OperationResult represents the result of attempting a microphone operation
type OperationResult struct { type OperationResult struct {
Allowed bool Allowed bool
RemainingCooldown time.Duration RemainingCooldown time.Duration
OperationID int64 OperationID int64
} }
// TryOperation attempts to perform a microphone operation with optimized contention handling
func (mcm *MicrophoneContentionManager) TryOperation() OperationResult { func (mcm *MicrophoneContentionManager) TryOperation() OperationResult {
now := time.Now().UnixNano() now := time.Now().UnixNano()
cooldown := atomic.LoadInt64(&mcm.cooldownNanos) cooldown := atomic.LoadInt64(&mcm.cooldownNanos)
// Fast path: check if we're clearly outside cooldown period using atomic read
lastOp := atomic.LoadInt64(&mcm.lastOpNano) lastOp := atomic.LoadInt64(&mcm.lastOpNano)
elapsed := now - lastOp elapsed := now - lastOp
if elapsed >= cooldown { if elapsed >= cooldown {
// Attempt atomic update without locking
if atomic.CompareAndSwapInt64(&mcm.lastOpNano, lastOp, now) { if atomic.CompareAndSwapInt64(&mcm.lastOpNano, lastOp, now) {
opID := atomic.AddInt64(&mcm.operationID, 1) opID := atomic.AddInt64(&mcm.operationID, 1)
return OperationResult{ return OperationResult{
@ -51,16 +41,10 @@ func (mcm *MicrophoneContentionManager) TryOperation() OperationResult {
OperationID: opID, OperationID: opID,
} }
} }
} // Retry once if CAS failed
lastOp = atomic.LoadInt64(&mcm.lastOpNano)
// Slow path: potential contention, check remaining cooldown elapsed = now - lastOp
currentLastOp := atomic.LoadInt64(&mcm.lastOpNano) if elapsed >= cooldown && atomic.CompareAndSwapInt64(&mcm.lastOpNano, lastOp, now) {
currentElapsed := now - currentLastOp
if currentElapsed >= cooldown {
// Race condition: another operation might have updated lastOpNano
// Try once more with CAS
if atomic.CompareAndSwapInt64(&mcm.lastOpNano, currentLastOp, now) {
opID := atomic.AddInt64(&mcm.operationID, 1) opID := atomic.AddInt64(&mcm.operationID, 1)
return OperationResult{ return OperationResult{
Allowed: true, Allowed: true,
@ -68,12 +52,9 @@ func (mcm *MicrophoneContentionManager) TryOperation() OperationResult {
OperationID: opID, OperationID: opID,
} }
} }
// If CAS failed, fall through to cooldown calculation
currentLastOp = atomic.LoadInt64(&mcm.lastOpNano)
currentElapsed = now - currentLastOp
} }
remaining := time.Duration(cooldown - currentElapsed) remaining := time.Duration(cooldown - elapsed)
if remaining < 0 { if remaining < 0 {
remaining = 0 remaining = 0
} }
@ -85,17 +66,14 @@ func (mcm *MicrophoneContentionManager) TryOperation() OperationResult {
} }
} }
// SetCooldown updates the cooldown duration atomically
func (mcm *MicrophoneContentionManager) SetCooldown(cooldown time.Duration) { func (mcm *MicrophoneContentionManager) SetCooldown(cooldown time.Duration) {
atomic.StoreInt64(&mcm.cooldownNanos, int64(cooldown)) atomic.StoreInt64(&mcm.cooldownNanos, int64(cooldown))
} }
// GetCooldown returns the current cooldown duration
func (mcm *MicrophoneContentionManager) GetCooldown() time.Duration { func (mcm *MicrophoneContentionManager) GetCooldown() time.Duration {
return time.Duration(atomic.LoadInt64(&mcm.cooldownNanos)) return time.Duration(atomic.LoadInt64(&mcm.cooldownNanos))
} }
// GetLastOperationTime returns the time of the last operation
func (mcm *MicrophoneContentionManager) GetLastOperationTime() time.Time { func (mcm *MicrophoneContentionManager) GetLastOperationTime() time.Time {
nanos := atomic.LoadInt64(&mcm.lastOpNano) nanos := atomic.LoadInt64(&mcm.lastOpNano)
if nanos == 0 { if nanos == 0 {
@ -104,55 +82,44 @@ func (mcm *MicrophoneContentionManager) GetLastOperationTime() time.Time {
return time.Unix(0, nanos) return time.Unix(0, nanos)
} }
// GetOperationCount returns the total number of successful operations
func (mcm *MicrophoneContentionManager) GetOperationCount() int64 { func (mcm *MicrophoneContentionManager) GetOperationCount() int64 {
return atomic.LoadInt64(&mcm.operationID) return atomic.LoadInt64(&mcm.operationID)
} }
// Reset resets the contention manager state
func (mcm *MicrophoneContentionManager) Reset() { func (mcm *MicrophoneContentionManager) Reset() {
atomic.StoreInt64(&mcm.lastOpNano, 0) atomic.StoreInt64(&mcm.lastOpNano, 0)
atomic.StoreInt64(&mcm.operationID, 0) atomic.StoreInt64(&mcm.operationID, 0)
} }
// Global instance for microphone contention management
var ( var (
globalMicContentionManager unsafe.Pointer // *MicrophoneContentionManager globalMicContentionManager unsafe.Pointer
micContentionInitialized int32 micContentionInitialized int32
) )
// GetMicrophoneContentionManager returns the global microphone contention manager
func GetMicrophoneContentionManager() *MicrophoneContentionManager { func GetMicrophoneContentionManager() *MicrophoneContentionManager {
ptr := atomic.LoadPointer(&globalMicContentionManager) ptr := atomic.LoadPointer(&globalMicContentionManager)
if ptr != nil { if ptr != nil {
return (*MicrophoneContentionManager)(ptr) return (*MicrophoneContentionManager)(ptr)
} }
// Initialize on first use
if atomic.CompareAndSwapInt32(&micContentionInitialized, 0, 1) { if atomic.CompareAndSwapInt32(&micContentionInitialized, 0, 1) {
manager := NewMicrophoneContentionManager(200 * time.Millisecond) manager := NewMicrophoneContentionManager(200 * time.Millisecond)
atomic.StorePointer(&globalMicContentionManager, unsafe.Pointer(manager)) atomic.StorePointer(&globalMicContentionManager, unsafe.Pointer(manager))
return manager return manager
} }
// Another goroutine initialized it, try again
ptr = atomic.LoadPointer(&globalMicContentionManager) ptr = atomic.LoadPointer(&globalMicContentionManager)
if ptr != nil { if ptr != nil {
return (*MicrophoneContentionManager)(ptr) return (*MicrophoneContentionManager)(ptr)
} }
// Fallback: create a new manager (should rarely happen)
return NewMicrophoneContentionManager(200 * time.Millisecond) return NewMicrophoneContentionManager(200 * time.Millisecond)
} }
// TryMicrophoneOperation provides a convenient global function for microphone operations
func TryMicrophoneOperation() OperationResult { func TryMicrophoneOperation() OperationResult {
manager := GetMicrophoneContentionManager() return GetMicrophoneContentionManager().TryOperation()
return manager.TryOperation()
} }
// SetMicrophoneCooldown updates the global microphone cooldown
func SetMicrophoneCooldown(cooldown time.Duration) { func SetMicrophoneCooldown(cooldown time.Duration) {
manager := GetMicrophoneContentionManager() GetMicrophoneContentionManager().SetCooldown(cooldown)
manager.SetCooldown(cooldown)
} }

View File

@ -0,0 +1,71 @@
package audio
import (
"context"
"os"
"os/signal"
"syscall"
"time"
"github.com/jetkvm/kvm/internal/logging"
)
// RunAudioOutputServer runs the audio output server subprocess
// This should be called from main() when the subprocess is detected
func RunAudioOutputServer() error {
logger := logging.GetDefaultLogger().With().Str("component", "audio-output-server").Logger()
logger.Info().Msg("Starting audio output server subprocess")
// Create audio server
server, err := NewAudioServer()
if err != nil {
logger.Error().Err(err).Msg("failed to create audio server")
return err
}
defer server.Close()
// Start accepting connections
if err := server.Start(); err != nil {
logger.Error().Err(err).Msg("failed to start audio server")
return err
}
// Initialize audio processing
err = StartNonBlockingAudioStreaming(func(frame []byte) {
if err := server.SendFrame(frame); err != nil {
logger.Warn().Err(err).Msg("failed to send audio frame")
RecordFrameDropped()
}
})
if err != nil {
logger.Error().Err(err).Msg("failed to start audio processing")
return err
}
logger.Info().Msg("Audio output server started, waiting for connections")
// Set up signal handling for graceful shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
// Wait for shutdown signal
select {
case sig := <-sigChan:
logger.Info().Str("signal", sig.String()).Msg("Received shutdown signal")
case <-ctx.Done():
logger.Info().Msg("Context cancelled")
}
// Graceful shutdown
logger.Info().Msg("Shutting down audio output server")
StopNonBlockingAudioStreaming()
// Give some time for cleanup
time.Sleep(100 * time.Millisecond)
logger.Info().Msg("Audio output server subprocess stopped")
return nil
}

View File

@ -24,7 +24,6 @@ type ProcessMetrics struct {
ProcessName string `json:"process_name"` ProcessName string `json:"process_name"`
} }
// ProcessMonitor monitors CPU and memory usage of processes
type ProcessMonitor struct { type ProcessMonitor struct {
logger zerolog.Logger logger zerolog.Logger
mutex sync.RWMutex mutex sync.RWMutex
@ -33,6 +32,8 @@ type ProcessMonitor struct {
stopChan chan struct{} stopChan chan struct{}
metricsChan chan ProcessMetrics metricsChan chan ProcessMetrics
updateInterval time.Duration updateInterval time.Duration
totalMemory int64
memoryOnce sync.Once
} }
// processState tracks the state needed for CPU calculation // processState tracks the state needed for CPU calculation
@ -51,7 +52,7 @@ func NewProcessMonitor() *ProcessMonitor {
monitoredPIDs: make(map[int]*processState), monitoredPIDs: make(map[int]*processState),
stopChan: make(chan struct{}), stopChan: make(chan struct{}),
metricsChan: make(chan ProcessMetrics, 100), metricsChan: make(chan ProcessMetrics, 100),
updateInterval: 2 * time.Second, // Update every 2 seconds updateInterval: 1000 * time.Millisecond, // Update every 1000ms to sync with websocket broadcasts
} }
} }
@ -138,30 +139,33 @@ func (pm *ProcessMonitor) monitorLoop() {
} }
} }
// collectAllMetrics collects metrics for all monitored processes
func (pm *ProcessMonitor) collectAllMetrics() { func (pm *ProcessMonitor) collectAllMetrics() {
pm.mutex.RLock() pm.mutex.RLock()
pids := make(map[int]*processState) pidsToCheck := make([]int, 0, len(pm.monitoredPIDs))
states := make([]*processState, 0, len(pm.monitoredPIDs))
for pid, state := range pm.monitoredPIDs { for pid, state := range pm.monitoredPIDs {
pids[pid] = state pidsToCheck = append(pidsToCheck, pid)
states = append(states, state)
} }
pm.mutex.RUnlock() pm.mutex.RUnlock()
for pid, state := range pids { deadPIDs := make([]int, 0)
if metric, err := pm.collectMetrics(pid, state); err == nil { for i, pid := range pidsToCheck {
if metric, err := pm.collectMetrics(pid, states[i]); err == nil {
select { select {
case pm.metricsChan <- metric: case pm.metricsChan <- metric:
default: default:
// Channel full, skip this metric
} }
} else { } else {
// Process might have died, remove it deadPIDs = append(deadPIDs, pid)
pm.RemoveProcess(pid)
} }
} }
for _, pid := range deadPIDs {
pm.RemoveProcess(pid)
}
} }
// collectMetrics collects metrics for a specific process
func (pm *ProcessMonitor) collectMetrics(pid int, state *processState) (ProcessMetrics, error) { func (pm *ProcessMonitor) collectMetrics(pid int, state *processState) (ProcessMetrics, error) {
now := time.Now() now := time.Now()
metric := ProcessMetrics{ metric := ProcessMetrics{
@ -170,30 +174,25 @@ func (pm *ProcessMonitor) collectMetrics(pid int, state *processState) (ProcessM
ProcessName: state.name, ProcessName: state.name,
} }
// Read /proc/[pid]/stat for CPU and memory info
statPath := fmt.Sprintf("/proc/%d/stat", pid) statPath := fmt.Sprintf("/proc/%d/stat", pid)
statData, err := os.ReadFile(statPath) statData, err := os.ReadFile(statPath)
if err != nil { if err != nil {
return metric, fmt.Errorf("failed to read stat file: %w", err) return metric, err
} }
// Parse stat file
fields := strings.Fields(string(statData)) fields := strings.Fields(string(statData))
if len(fields) < 24 { if len(fields) < 24 {
return metric, fmt.Errorf("invalid stat file format") return metric, fmt.Errorf("invalid stat format")
} }
// Extract CPU times (fields 13, 14 are utime, stime in clock ticks)
utime, _ := strconv.ParseInt(fields[13], 10, 64) utime, _ := strconv.ParseInt(fields[13], 10, 64)
stime, _ := strconv.ParseInt(fields[14], 10, 64) stime, _ := strconv.ParseInt(fields[14], 10, 64)
totalCPUTime := utime + stime totalCPUTime := utime + stime
// Extract memory info (field 22 is vsize, field 23 is rss in pages)
vsize, _ := strconv.ParseInt(fields[22], 10, 64) vsize, _ := strconv.ParseInt(fields[22], 10, 64)
rss, _ := strconv.ParseInt(fields[23], 10, 64) rss, _ := strconv.ParseInt(fields[23], 10, 64)
// Convert RSS from pages to bytes (assuming 4KB pages) const pageSize = 4096
pageSize := int64(4096)
metric.MemoryRSS = rss * pageSize metric.MemoryRSS = rss * pageSize
metric.MemoryVMS = vsize metric.MemoryVMS = vsize
@ -229,28 +228,32 @@ func (pm *ProcessMonitor) collectMetrics(pid int, state *processState) (ProcessM
return metric, nil return metric, nil
} }
// getTotalMemory returns total system memory in bytes
func (pm *ProcessMonitor) getTotalMemory() int64 { func (pm *ProcessMonitor) getTotalMemory() int64 {
file, err := os.Open("/proc/meminfo") pm.memoryOnce.Do(func() {
if err != nil { file, err := os.Open("/proc/meminfo")
return 0 if err != nil {
} pm.totalMemory = 8 * 1024 * 1024 * 1024 // Default 8GB
defer file.Close() return
scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := scanner.Text()
if strings.HasPrefix(line, "MemTotal:") {
fields := strings.Fields(line)
if len(fields) >= 2 {
if kb, err := strconv.ParseInt(fields[1], 10, 64); err == nil {
return kb * 1024 // Convert KB to bytes
}
}
break
} }
} defer file.Close()
return 0
scanner := bufio.NewScanner(file)
for scanner.Scan() {
line := scanner.Text()
if strings.HasPrefix(line, "MemTotal:") {
fields := strings.Fields(line)
if len(fields) >= 2 {
if kb, err := strconv.ParseInt(fields[1], 10, 64); err == nil {
pm.totalMemory = kb * 1024
return
}
}
break
}
}
pm.totalMemory = 8 * 1024 * 1024 * 1024 // Fallback
})
return pm.totalMemory
} }
// GetTotalMemory returns total system memory in bytes (public method) // GetTotalMemory returns total system memory in bytes (public method)

View File

@ -3,6 +3,7 @@ package audio
import ( import (
"context" "context"
"sync" "sync"
"time"
"github.com/jetkvm/kvm/internal/logging" "github.com/jetkvm/kvm/internal/logging"
"github.com/pion/webrtc/v4/pkg/media" "github.com/pion/webrtc/v4/pkg/media"
@ -123,26 +124,34 @@ func (r *AudioRelay) UpdateTrack(audioTrack AudioTrackWriter) {
r.audioTrack = audioTrack r.audioTrack = audioTrack
} }
// relayLoop is the main relay loop that forwards frames from subprocess to WebRTC
func (r *AudioRelay) relayLoop() { func (r *AudioRelay) relayLoop() {
defer r.wg.Done() defer r.wg.Done()
r.logger.Debug().Msg("Audio relay loop started") r.logger.Debug().Msg("Audio relay loop started")
const maxConsecutiveErrors = 10
consecutiveErrors := 0
for { for {
select { select {
case <-r.ctx.Done(): case <-r.ctx.Done():
r.logger.Debug().Msg("Audio relay loop stopping") r.logger.Debug().Msg("Audio relay loop stopping")
return return
default: default:
// Receive frame from audio server subprocess
frame, err := r.client.ReceiveFrame() frame, err := r.client.ReceiveFrame()
if err != nil { if err != nil {
r.logger.Error().Err(err).Msg("Failed to receive audio frame") consecutiveErrors++
r.logger.Error().Err(err).Int("consecutive_errors", consecutiveErrors).Msg("Failed to receive audio frame")
r.incrementDropped() r.incrementDropped()
if consecutiveErrors >= maxConsecutiveErrors {
r.logger.Error().Msg("Too many consecutive errors, stopping relay")
return
}
time.Sleep(10 * time.Millisecond)
continue continue
} }
// Forward frame to WebRTC consecutiveErrors = 0
if err := r.forwardToWebRTC(frame); err != nil { if err := r.forwardToWebRTC(frame); err != nil {
r.logger.Warn().Err(err).Msg("Failed to forward frame to WebRTC") r.logger.Warn().Err(err).Msg("Failed to forward frame to WebRTC")
r.incrementDropped() r.incrementDropped()

37
main.go
View File

@ -20,43 +20,14 @@ var (
audioSupervisor *audio.AudioServerSupervisor audioSupervisor *audio.AudioServerSupervisor
) )
// runAudioServer is now handled by audio.RunAudioOutputServer
// This function is kept for backward compatibility but delegates to the audio package
func runAudioServer() { func runAudioServer() {
logger.Info().Msg("Starting audio server subprocess") err := audio.RunAudioOutputServer()
// Create audio server
server, err := audio.NewAudioServer()
if err != nil { if err != nil {
logger.Error().Err(err).Msg("failed to create audio server") logger.Error().Err(err).Msg("audio output server failed")
os.Exit(1) os.Exit(1)
} }
defer server.Close()
// Start accepting connections
if err := server.Start(); err != nil {
logger.Error().Err(err).Msg("failed to start audio server")
os.Exit(1)
}
// Initialize audio processing
err = audio.StartNonBlockingAudioStreaming(func(frame []byte) {
if err := server.SendFrame(frame); err != nil {
logger.Warn().Err(err).Msg("failed to send audio frame")
audio.RecordFrameDropped()
}
})
if err != nil {
logger.Error().Err(err).Msg("failed to start audio processing")
os.Exit(1)
}
// Wait for termination signal
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
<-sigs
// Cleanup
audio.StopNonBlockingAudioStreaming()
logger.Info().Msg("Audio server subprocess stopped")
} }
func startAudioSubprocess() error { func startAudioSubprocess() error {

View File

@ -41,10 +41,6 @@ interface AudioConfig {
FrameSize: string; FrameSize: string;
} }
const qualityLabels = { const qualityLabels = {
0: "Low (32kbps)", 0: "Low (32kbps)",
1: "Medium (64kbps)", 1: "Medium (64kbps)",
@ -211,7 +207,6 @@ export default function AudioControlPopover({ microphone, open }: AudioControlPo
// Prevent rapid clicking - if any operation is in progress or within cooldown, ignore the click // Prevent rapid clicking - if any operation is in progress or within cooldown, ignore the click
if (isStarting || isStopping || isToggling || (now - lastClickTime < CLICK_COOLDOWN)) { if (isStarting || isStopping || isToggling || (now - lastClickTime < CLICK_COOLDOWN)) {
console.log("Microphone operation already in progress or within cooldown, ignoring click");
return; return;
} }
@ -233,7 +228,6 @@ export default function AudioControlPopover({ microphone, open }: AudioControlPo
// Prevent rapid clicking - if any operation is in progress or within cooldown, ignore the click // Prevent rapid clicking - if any operation is in progress or within cooldown, ignore the click
if (isStarting || isStopping || isToggling || (now - lastClickTime < CLICK_COOLDOWN)) { if (isStarting || isStopping || isToggling || (now - lastClickTime < CLICK_COOLDOWN)) {
console.log("Microphone operation already in progress or within cooldown, ignoring mute toggle");
return; return;
} }
@ -279,7 +273,6 @@ export default function AudioControlPopover({ microphone, open }: AudioControlPo
if (videoElement && 'setSinkId' in videoElement) { if (videoElement && 'setSinkId' in videoElement) {
try { try {
await (videoElement as HTMLVideoElement & { setSinkId: (deviceId: string) => Promise<void> }).setSinkId(deviceId); await (videoElement as HTMLVideoElement & { setSinkId: (deviceId: string) => Promise<void> }).setSinkId(deviceId);
console.log('Audio output device changed to:', deviceId);
} catch (error: unknown) { } catch (error: unknown) {
console.error('Failed to change audio output device:', error); console.error('Failed to change audio output device:', error);
} }