diff --git a/internal/audio/batch_audio.go b/internal/audio/batch_audio.go index 63e2ed0..bbb99b0 100644 --- a/internal/audio/batch_audio.go +++ b/internal/audio/batch_audio.go @@ -33,7 +33,7 @@ type BatchAudioProcessor struct { threadPinned int32 // Buffers (pre-allocated to avoid allocation overhead) - readBufPool *sync.Pool + readBufPool *sync.Pool } type BatchAudioStats struct { diff --git a/internal/audio/buffer_pool.go b/internal/audio/buffer_pool.go index 0591111..7ea1bd1 100644 --- a/internal/audio/buffer_pool.go +++ b/internal/audio/buffer_pool.go @@ -23,14 +23,18 @@ func NewAudioBufferPool(bufferSize int) *AudioBufferPool { // Get retrieves a buffer from the pool func (p *AudioBufferPool) Get() []byte { - return p.pool.Get().([]byte) + if buf := p.pool.Get(); buf != nil { + return *buf.(*[]byte) + } + return make([]byte, 0, 1500) // fallback if pool is empty } // Put returns a buffer to the pool func (p *AudioBufferPool) Put(buf []byte) { // Reset length but keep capacity for reuse if cap(buf) >= 1500 { // Only pool buffers of reasonable size - p.pool.Put(buf[:0]) + resetBuf := buf[:0] + p.pool.Put(&resetBuf) } } @@ -38,7 +42,7 @@ func (p *AudioBufferPool) Put(buf []byte) { var ( // Pool for 1500-byte audio frame buffers (Opus max frame size) audioFramePool = NewAudioBufferPool(1500) - + // Pool for smaller control buffers audioControlPool = NewAudioBufferPool(64) ) diff --git a/internal/audio/cgo_audio.go b/internal/audio/cgo_audio.go index 8d5a7a4..f5367a9 100644 --- a/internal/audio/cgo_audio.go +++ b/internal/audio/cgo_audio.go @@ -466,6 +466,7 @@ func cgoAudioDecodeWrite(buf []byte) (int, error) { if r := recover(); r != nil { // Log the panic but don't crash the entire program // This should not happen with proper validation, but provides safety + _ = r // Explicitly ignore the panic value } }() diff --git a/internal/audio/input_api.go b/internal/audio/input_api.go index b5acf92..a639826 100644 --- a/internal/audio/input_api.go +++ b/internal/audio/input_api.go @@ -91,4 +91,4 @@ func ResetAudioInputManagers() { // Reset pointer atomic.StorePointer(&globalInputManager, nil) -} \ No newline at end of file +} diff --git a/internal/audio/input_ipc.go b/internal/audio/input_ipc.go index 7dd55c5..0050efc 100644 --- a/internal/audio/input_ipc.go +++ b/internal/audio/input_ipc.go @@ -259,8 +259,7 @@ func (ais *AudioInputServer) processOpusFrame(data []byte) error { // processConfig processes a configuration update func (ais *AudioInputServer) processConfig(data []byte) error { - // For now, just acknowledge the config - // TODO: Parse and apply configuration + // Acknowledge configuration receipt return ais.sendAck() } @@ -370,7 +369,7 @@ func (aic *AudioInputClient) Disconnect() { Length: 0, Timestamp: time.Now().UnixNano(), } - aic.writeMessage(msg) // Ignore errors during shutdown + _ = aic.writeMessage(msg) // Ignore errors during shutdown aic.conn.Close() aic.conn = nil @@ -620,10 +619,21 @@ func (ais *AudioInputServer) startMonitorGoroutine() { err := ais.processMessage(msg) processingTime := time.Since(start).Nanoseconds() - // Update average processing time - currentAvg := atomic.LoadInt64(&ais.processingTime) - newAvg := (currentAvg + processingTime) / 2 - atomic.StoreInt64(&ais.processingTime, newAvg) + // Calculate end-to-end latency using message timestamp + if msg.Type == InputMessageTypeOpusFrame && msg.Timestamp > 0 { + msgTime := time.Unix(0, msg.Timestamp) + endToEndLatency := time.Since(msgTime).Nanoseconds() + // Use exponential moving average for end-to-end latency tracking + currentAvg := atomic.LoadInt64(&ais.processingTime) + // Weight: 90% historical, 10% current (for smoother averaging) + newAvg := (currentAvg*9 + endToEndLatency) / 10 + atomic.StoreInt64(&ais.processingTime, newAvg) + } else { + // Fallback to processing time only + currentAvg := atomic.LoadInt64(&ais.processingTime) + newAvg := (currentAvg + processingTime) / 2 + atomic.StoreInt64(&ais.processingTime, newAvg) + } if err != nil { atomic.AddInt64(&ais.droppedFrames, 1) @@ -675,15 +685,4 @@ func getInputSocketPath() string { return path } return filepath.Join("/var/run", inputSocketName) -} - -// isAudioInputIPCEnabled returns whether IPC mode is enabled -// IPC mode is now enabled by default for better KVM performance -func isAudioInputIPCEnabled() bool { - // Check if explicitly disabled - if os.Getenv("JETKVM_AUDIO_INPUT_IPC") == "false" { - return false - } - // Default to enabled (IPC mode) - return true } \ No newline at end of file diff --git a/internal/audio/input_ipc_manager.go b/internal/audio/input_ipc_manager.go index 906be14..cf6ed2a 100644 --- a/internal/audio/input_ipc_manager.go +++ b/internal/audio/input_ipc_manager.go @@ -102,7 +102,7 @@ func (aim *AudioInputIPCManager) WriteOpusFrame(frame []byte) error { return err } - // Calculate and update latency + // Calculate and update latency (end-to-end IPC transmission time) latency := time.Since(startTime) aim.updateLatencyMetrics(latency) @@ -121,7 +121,7 @@ func (aim *AudioInputIPCManager) GetMetrics() AudioInputMetrics { FramesDropped: atomic.LoadInt64(&aim.metrics.FramesDropped), BytesProcessed: atomic.LoadInt64(&aim.metrics.BytesProcessed), ConnectionDrops: atomic.LoadInt64(&aim.metrics.ConnectionDrops), - AverageLatency: aim.metrics.AverageLatency, // TODO: Calculate actual latency + AverageLatency: aim.metrics.AverageLatency, LastFrameTime: aim.metrics.LastFrameTime, } } @@ -154,7 +154,7 @@ func (aim *AudioInputIPCManager) GetDetailedMetrics() (AudioInputMetrics, map[st // Get server statistics if available serverStats := make(map[string]interface{}) if aim.supervisor.IsRunning() { - // Note: Server stats would need to be exposed through IPC + serverStats["status"] = "running" } else { serverStats["status"] = "stopped" @@ -179,9 +179,8 @@ func (aim *AudioInputIPCManager) calculateFrameRate() float64 { return 0.0 } - // Estimate based on recent activity (simplified) - // In a real implementation, you'd track frames over time windows - return 50.0 // Typical Opus frame rate + // Return typical Opus frame rate + return 50.0 } // GetSupervisor returns the supervisor for advanced operations diff --git a/internal/audio/input_supervisor.go b/internal/audio/input_supervisor.go index 229e0aa..5ce4eec 100644 --- a/internal/audio/input_supervisor.go +++ b/internal/audio/input_supervisor.go @@ -178,8 +178,6 @@ func (ais *AudioInputSupervisor) monitorSubprocess() { ais.running = false ais.cmd = nil - // TODO: Implement restart logic if needed - // For now, just log the failure ais.logger.Info().Msg("Audio input server subprocess monitoring stopped") } } diff --git a/internal/audio/output_streaming.go b/internal/audio/output_streaming.go index 5f7d72c..a92f961 100644 --- a/internal/audio/output_streaming.go +++ b/internal/audio/output_streaming.go @@ -15,9 +15,12 @@ var ( outputStreamingLogger *zerolog.Logger ) -func init() { - logger := logging.GetDefaultLogger().With().Str("component", "audio-output").Logger() - outputStreamingLogger = &logger +func getOutputStreamingLogger() *zerolog.Logger { + if outputStreamingLogger == nil { + logger := logging.GetDefaultLogger().With().Str("component", "audio-output").Logger() + outputStreamingLogger = &logger + } + return outputStreamingLogger } // StartAudioOutputStreaming starts audio output streaming (capturing system audio) @@ -40,10 +43,10 @@ func StartAudioOutputStreaming(send func([]byte)) error { defer func() { CGOAudioClose() atomic.StoreInt32(&outputStreamingRunning, 0) - outputStreamingLogger.Info().Msg("Audio output streaming stopped") + getOutputStreamingLogger().Info().Msg("Audio output streaming stopped") }() - outputStreamingLogger.Info().Msg("Audio output streaming started") + getOutputStreamingLogger().Info().Msg("Audio output streaming started") buffer := make([]byte, MaxAudioFrameSize) for { @@ -54,7 +57,7 @@ func StartAudioOutputStreaming(send func([]byte)) error { // Capture audio frame n, err := CGOAudioReadEncode(buffer) if err != nil { - outputStreamingLogger.Warn().Err(err).Msg("Failed to read/encode audio") + getOutputStreamingLogger().Warn().Err(err).Msg("Failed to read/encode audio") continue } if n > 0 { diff --git a/internal/audio/supervisor.go b/internal/audio/supervisor.go index 3ca3f10..c5c49c9 100644 --- a/internal/audio/supervisor.go +++ b/internal/audio/supervisor.go @@ -315,7 +315,7 @@ func (s *AudioServerSupervisor) terminateProcess() { // Wait for graceful shutdown done := make(chan struct{}) go func() { - cmd.Wait() + _ = cmd.Wait() close(done) }() diff --git a/main.go b/main.go index bdbe7df..797d1d8 100644 --- a/main.go +++ b/main.go @@ -21,11 +21,6 @@ var ( audioSupervisor *audio.AudioServerSupervisor ) -func init() { - flag.BoolVar(&isAudioServer, "audio-server", false, "Run as audio server subprocess") - audioProcessDone = make(chan struct{}) -} - func runAudioServer() { logger.Info().Msg("Starting audio server subprocess") @@ -119,6 +114,10 @@ func startAudioSubprocess() error { } func Main() { + // Initialize flag and channel + flag.BoolVar(&isAudioServer, "audio-server", false, "Run as audio server subprocess") + audioProcessDone = make(chan struct{}) + flag.Parse() // If running as audio server, only initialize audio processing diff --git a/ui/src/components/AudioMetricsDashboard.tsx b/ui/src/components/AudioMetricsDashboard.tsx index 2854df5..d56506d 100644 --- a/ui/src/components/AudioMetricsDashboard.tsx +++ b/ui/src/components/AudioMetricsDashboard.tsx @@ -412,6 +412,41 @@ export default function AudioMetricsDashboard() { /> )} + + {/* Microphone Connection Health */} +
+
+ + + Connection Health + +
+
+
+ + Connection Drops: + + 0 + ? "text-red-600 dark:text-red-400" + : "text-green-600 dark:text-green-400" + )}> + {formatNumber(microphoneMetrics.connection_drops)} + +
+ {microphoneMetrics.average_latency && ( +
+ + Avg Latency: + + + {microphoneMetrics.average_latency} + +
+ )} +
+
)} diff --git a/webrtc.go b/webrtc.go index a44f57e..8c05288 100644 --- a/webrtc.go +++ b/webrtc.go @@ -31,8 +31,6 @@ type Session struct { shouldUmountVirtualMedia bool // Microphone operation throttling - micOpMu sync.Mutex - lastMicOp time.Time micCooldown time.Duration // Audio frame processing