From 423d5775e3e289353187cb6436a65305d8b3db6d Mon Sep 17 00:00:00 2001 From: Alex P Date: Thu, 21 Aug 2025 22:16:48 +0000 Subject: [PATCH] [WIP] Performance Enhancements: move audion processing into a separate process --- dev_deploy.sh | 50 +- internal/audio/api.go | 54 ++- internal/audio/batch_audio.go | 144 ------ internal/audio/events.go | 12 +- internal/audio/input.go | 139 ++++-- internal/audio/input_api.go | 94 ++++ internal/audio/input_ipc.go | 689 ++++++++++++++++++++++++++++ internal/audio/input_ipc_manager.go | 190 ++++++++ internal/audio/input_server_main.go | 72 +++ internal/audio/input_supervisor.go | 225 +++++++++ internal/audio/ipc.go | 128 ++++++ internal/audio/nonblocking_api.go | 115 ----- internal/audio/nonblocking_audio.go | 564 ----------------------- internal/audio/output_streaming.go | 91 ++++ internal/audio/relay.go | 198 ++++++++ internal/audio/relay_api.go | 109 +++++ internal/audio/supervisor.go | 400 ++++++++++++++++ main.go | 164 +++++-- resource/dev_test.sh | 2 +- tools/build_audio_deps.sh | 0 tools/setup_rv1106_toolchain.sh | 0 web.go | 22 +- webrtc.go | 69 ++- 23 files changed, 2565 insertions(+), 966 deletions(-) create mode 100644 internal/audio/input_api.go create mode 100644 internal/audio/input_ipc.go create mode 100644 internal/audio/input_ipc_manager.go create mode 100644 internal/audio/input_server_main.go create mode 100644 internal/audio/input_supervisor.go create mode 100644 internal/audio/ipc.go delete mode 100644 internal/audio/nonblocking_api.go delete mode 100644 internal/audio/nonblocking_audio.go create mode 100644 internal/audio/output_streaming.go create mode 100644 internal/audio/relay.go create mode 100644 internal/audio/relay_api.go create mode 100644 internal/audio/supervisor.go mode change 100644 => 100755 resource/dev_test.sh mode change 100644 => 100755 tools/build_audio_deps.sh mode change 100644 => 100755 tools/setup_rv1106_toolchain.sh diff --git a/dev_deploy.sh b/dev_deploy.sh index 7a79e97..eb3560a 100755 --- a/dev_deploy.sh +++ b/dev_deploy.sh @@ -159,8 +159,8 @@ else msg_info "▶ Building development binary" make build_dev - # Kill any existing instances of the application - ssh "${REMOTE_USER}@${REMOTE_HOST}" "killall jetkvm_app_debug || true" + # Kill any existing instances of the application (specific cleanup) + ssh "${REMOTE_USER}@${REMOTE_HOST}" "killall jetkvm_app || true; killall jetkvm_native || true; killall jetkvm_app_debug || true; sleep 2" # Copy the binary to the remote host ssh "${REMOTE_USER}@${REMOTE_HOST}" "cat > ${REMOTE_PATH}/jetkvm_app_debug" < bin/jetkvm_app @@ -180,18 +180,18 @@ set -e # Set the library path to include the directory where librockit.so is located export LD_LIBRARY_PATH=/oem/usr/lib:\$LD_LIBRARY_PATH -# Check if production jetkvm_app is running and save its state -PROD_APP_RUNNING=false -if pgrep -f "/userdata/jetkvm/bin/jetkvm_app" > /dev/null; then - PROD_APP_RUNNING=true - echo "Production jetkvm_app is running, will restore after development session" -else - echo "No production jetkvm_app detected" -fi - -# Kill any existing instances of the application -pkill -f "/userdata/jetkvm/bin/jetkvm_app" || true +# Kill any existing instances of the application (specific cleanup) +killall jetkvm_app || true +killall jetkvm_native || true killall jetkvm_app_debug || true +sleep 2 + +# Verify no processes are using port 80 +if netstat -tlnp | grep :80 > /dev/null 2>&1; then + echo "Warning: Port 80 still in use, attempting to free it..." 
+ fuser -k 80/tcp || true + sleep 1 +fi # Navigate to the directory where the binary will be stored cd "${REMOTE_PATH}" @@ -199,29 +199,7 @@ cd "${REMOTE_PATH}" # Make the new binary executable chmod +x jetkvm_app_debug -# Create a cleanup script that will restore the production app -cat > /tmp/restore_jetkvm.sh << RESTORE_EOF -#!/bin/ash -set -e -export LD_LIBRARY_PATH=/oem/usr/lib:\$LD_LIBRARY_PATH -cd ${REMOTE_PATH} -if [ "$PROD_APP_RUNNING" = "true" ]; then - echo "Restoring production jetkvm_app..." - killall jetkvm_app_debug || true - nohup /userdata/jetkvm/bin/jetkvm_app > /tmp/jetkvm_app.log 2>&1 & - echo "Production jetkvm_app restored" -else - echo "No production app was running before, not restoring" -fi -RESTORE_EOF - -chmod +x /tmp/restore_jetkvm.sh - -# Set up signal handler to restore production app on exit -trap '/tmp/restore_jetkvm.sh' EXIT INT TERM - -# Run the application in the foreground -echo "Starting development jetkvm_app_debug..." +# Run the application in the background PION_LOG_TRACE=${LOG_TRACE_SCOPES} ./jetkvm_app_debug | tee -a /tmp/jetkvm_app_debug.log EOF fi diff --git a/internal/audio/api.go b/internal/audio/api.go index cbdb925..dcc3ae6 100644 --- a/internal/audio/api.go +++ b/internal/audio/api.go @@ -1,13 +1,51 @@ package audio -// StartAudioStreaming launches the in-process audio stream and delivers Opus frames to the provided callback. -// This is now a wrapper around the non-blocking audio implementation for backward compatibility. -func StartAudioStreaming(send func([]byte)) error { - return StartNonBlockingAudioStreaming(send) +import ( + "os" + "strings" +) + +// isAudioServerProcess detects if we're running as the audio server subprocess +func isAudioServerProcess() bool { + for _, arg := range os.Args { + if strings.Contains(arg, "--audio-server") { + return true + } + } + return false } -// StopAudioStreaming stops the in-process audio stream. -// This is now a wrapper around the non-blocking audio implementation for backward compatibility. -func StopAudioStreaming() { - StopNonBlockingAudioStreaming() +// StartAudioStreaming launches the audio stream. +// In audio server subprocess: uses CGO-based audio streaming +// In main process: this should not be called (use StartAudioRelay instead) +func StartAudioStreaming(send func([]byte)) error { + if isAudioServerProcess() { + // Audio server subprocess: use CGO audio processing + return StartAudioOutputStreaming(send) + } else { + // Main process: should use relay system instead + // This is kept for backward compatibility but not recommended + return StartAudioOutputStreaming(send) + } +} + +// StopAudioStreaming stops the audio stream. 
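The routing above means the same binary serves two roles: inside the audio-server subprocess (detected via the --audio-server argument) StartAudioStreaming drives CGO directly, while the main process is expected to consume frames through the relay. A minimal sketch of how an entry point could branch on the subprocess markers introduced by this patch — the actual main.go wiring is part of this change but not shown in this excerpt, and runMainProcess is a hypothetical stand-in name:

package main

import (
	"os"

	"github.com/jetkvm/kvm/internal/audio"
)

func main() {
	// Audio input server subprocess: launched by AudioInputSupervisor with
	// JETKVM_AUDIO_INPUT_SERVER=true in its environment.
	if audio.IsAudioInputServerProcess() {
		if err := audio.RunAudioInputServer(); err != nil {
			os.Exit(1)
		}
		return
	}

	// Normal KVM process: audio output frames are consumed through the
	// relay rather than through in-process CGO calls.
	runMainProcess() // hypothetical stand-in for the existing startup path
}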
+func StopAudioStreaming() { + if isAudioServerProcess() { + // Audio server subprocess: stop CGO audio processing + StopAudioOutputStreaming() + } else { + // Main process: stop relay if running + StopAudioRelay() + } +} + +// StartNonBlockingAudioStreaming is an alias for backward compatibility +func StartNonBlockingAudioStreaming(send func([]byte)) error { + return StartAudioOutputStreaming(send) +} + +// StopNonBlockingAudioStreaming is an alias for backward compatibility +func StopNonBlockingAudioStreaming() { + StopAudioOutputStreaming() } diff --git a/internal/audio/batch_audio.go b/internal/audio/batch_audio.go index 61d8dcc..63e2ed0 100644 --- a/internal/audio/batch_audio.go +++ b/internal/audio/batch_audio.go @@ -28,22 +28,18 @@ type BatchAudioProcessor struct { // Batch queues and state (atomic for lock-free access) readQueue chan batchReadRequest - writeQueue chan batchWriteRequest initialized int32 running int32 threadPinned int32 // Buffers (pre-allocated to avoid allocation overhead) readBufPool *sync.Pool - writeBufPool *sync.Pool } type BatchAudioStats struct { // int64 fields MUST be first for ARM32 alignment BatchedReads int64 - BatchedWrites int64 SingleReads int64 - SingleWrites int64 BatchedFrames int64 SingleFrames int64 CGOCallsReduced int64 @@ -57,22 +53,11 @@ type batchReadRequest struct { timestamp time.Time } -type batchWriteRequest struct { - buffer []byte - resultChan chan batchWriteResult - timestamp time.Time -} - type batchReadResult struct { length int err error } -type batchWriteResult struct { - written int - err error -} - // NewBatchAudioProcessor creates a new batch audio processor func NewBatchAudioProcessor(batchSize int, batchDuration time.Duration) *BatchAudioProcessor { ctx, cancel := context.WithCancel(context.Background()) @@ -85,17 +70,11 @@ func NewBatchAudioProcessor(batchSize int, batchDuration time.Duration) *BatchAu batchSize: batchSize, batchDuration: batchDuration, readQueue: make(chan batchReadRequest, batchSize*2), - writeQueue: make(chan batchWriteRequest, batchSize*2), readBufPool: &sync.Pool{ New: func() interface{} { return make([]byte, 1500) // Max audio frame size }, }, - writeBufPool: &sync.Pool{ - New: func() interface{} { - return make([]byte, 4096) // Max write buffer size - }, - }, } return processor @@ -114,7 +93,6 @@ func (bap *BatchAudioProcessor) Start() error { // Start batch processing goroutines go bap.batchReadProcessor() - go bap.batchWriteProcessor() bap.logger.Info().Int("batch_size", bap.batchSize). Dur("batch_duration", bap.batchDuration). 
@@ -175,43 +153,7 @@ func (bap *BatchAudioProcessor) BatchReadEncode(buffer []byte) (int, error) { } } -// BatchDecodeWrite performs batched audio decode and write operations -func (bap *BatchAudioProcessor) BatchDecodeWrite(buffer []byte) (int, error) { - if atomic.LoadInt32(&bap.running) == 0 { - // Fallback to single operation if batch processor is not running - atomic.AddInt64(&bap.stats.SingleWrites, 1) - atomic.AddInt64(&bap.stats.SingleFrames, 1) - return CGOAudioDecodeWrite(buffer) - } - resultChan := make(chan batchWriteResult, 1) - request := batchWriteRequest{ - buffer: buffer, - resultChan: resultChan, - timestamp: time.Now(), - } - - select { - case bap.writeQueue <- request: - // Successfully queued - case <-time.After(5 * time.Millisecond): - // Queue is full or blocked, fallback to single operation - atomic.AddInt64(&bap.stats.SingleWrites, 1) - atomic.AddInt64(&bap.stats.SingleFrames, 1) - return CGOAudioDecodeWrite(buffer) - } - - // Wait for result - select { - case result := <-resultChan: - return result.written, result.err - case <-time.After(50 * time.Millisecond): - // Timeout, fallback to single operation - atomic.AddInt64(&bap.stats.SingleWrites, 1) - atomic.AddInt64(&bap.stats.SingleFrames, 1) - return CGOAudioDecodeWrite(buffer) - } -} // batchReadProcessor processes batched read operations func (bap *BatchAudioProcessor) batchReadProcessor() { @@ -249,41 +191,7 @@ func (bap *BatchAudioProcessor) batchReadProcessor() { } } -// batchWriteProcessor processes batched write operations -func (bap *BatchAudioProcessor) batchWriteProcessor() { - defer bap.logger.Debug().Msg("batch write processor stopped") - ticker := time.NewTicker(bap.batchDuration) - defer ticker.Stop() - - var batch []batchWriteRequest - batch = make([]batchWriteRequest, 0, bap.batchSize) - - for atomic.LoadInt32(&bap.running) == 1 { - select { - case <-bap.ctx.Done(): - return - - case req := <-bap.writeQueue: - batch = append(batch, req) - if len(batch) >= bap.batchSize { - bap.processBatchWrite(batch) - batch = batch[:0] // Clear slice but keep capacity - } - - case <-ticker.C: - if len(batch) > 0 { - bap.processBatchWrite(batch) - batch = batch[:0] // Clear slice but keep capacity - } - } - } - - // Process any remaining requests - if len(batch) > 0 { - bap.processBatchWrite(batch) - } -} // processBatchRead processes a batch of read requests efficiently func (bap *BatchAudioProcessor) processBatchRead(batch []batchReadRequest) { @@ -328,56 +236,13 @@ func (bap *BatchAudioProcessor) processBatchRead(batch []batchReadRequest) { bap.stats.LastBatchTime = time.Now() } -// processBatchWrite processes a batch of write requests efficiently -func (bap *BatchAudioProcessor) processBatchWrite(batch []batchWriteRequest) { - if len(batch) == 0 { - return - } - // Pin to OS thread for the entire batch to minimize thread switching overhead - start := time.Now() - if atomic.CompareAndSwapInt32(&bap.threadPinned, 0, 1) { - runtime.LockOSThread() - defer func() { - runtime.UnlockOSThread() - atomic.StoreInt32(&bap.threadPinned, 0) - bap.stats.OSThreadPinTime += time.Since(start) - }() - } - - batchSize := len(batch) - atomic.AddInt64(&bap.stats.BatchedWrites, 1) - atomic.AddInt64(&bap.stats.BatchedFrames, int64(batchSize)) - if batchSize > 1 { - atomic.AddInt64(&bap.stats.CGOCallsReduced, int64(batchSize-1)) - } - - // Process each request in the batch - for _, req := range batch { - written, err := CGOAudioDecodeWrite(req.buffer) - result := batchWriteResult{ - written: written, - err: err, - } - - // Send 
result back (non-blocking) - select { - case req.resultChan <- result: - default: - // Requestor timed out, drop result - } - } - - bap.stats.LastBatchTime = time.Now() -} // GetStats returns current batch processor statistics func (bap *BatchAudioProcessor) GetStats() BatchAudioStats { return BatchAudioStats{ BatchedReads: atomic.LoadInt64(&bap.stats.BatchedReads), - BatchedWrites: atomic.LoadInt64(&bap.stats.BatchedWrites), SingleReads: atomic.LoadInt64(&bap.stats.SingleReads), - SingleWrites: atomic.LoadInt64(&bap.stats.SingleWrites), BatchedFrames: atomic.LoadInt64(&bap.stats.BatchedFrames), SingleFrames: atomic.LoadInt64(&bap.stats.SingleFrames), CGOCallsReduced: atomic.LoadInt64(&bap.stats.CGOCallsReduced), @@ -443,13 +308,4 @@ func BatchCGOAudioReadEncode(buffer []byte) (int, error) { return processor.BatchReadEncode(buffer) } return CGOAudioReadEncode(buffer) -} - -// BatchCGOAudioDecodeWrite is a batched version of CGOAudioDecodeWrite -func BatchCGOAudioDecodeWrite(buffer []byte) (int, error) { - processor := GetBatchAudioProcessor() - if processor != nil && processor.IsRunning() { - return processor.BatchDecodeWrite(buffer) - } - return CGOAudioDecodeWrite(buffer) } \ No newline at end of file diff --git a/internal/audio/events.go b/internal/audio/events.go index dff912b..c677c54 100644 --- a/internal/audio/events.go +++ b/internal/audio/events.go @@ -249,13 +249,13 @@ func (aeb *AudioEventBroadcaster) startMetricsBroadcasting() { for range ticker.C { aeb.mutex.RLock() subscriberCount := len(aeb.subscribers) - + // Early exit if no subscribers to save CPU if subscriberCount == 0 { aeb.mutex.RUnlock() continue } - + // Create a copy for safe iteration subscribersCopy := make([]*AudioEventSubscriber, 0, subscriberCount) for _, sub := range aeb.subscribers { @@ -270,7 +270,7 @@ func (aeb *AudioEventBroadcaster) startMetricsBroadcasting() { activeSubscribers++ } } - + // Skip metrics gathering if no active subscribers if activeSubscribers == 0 { continue @@ -357,9 +357,9 @@ func (aeb *AudioEventBroadcaster) sendToSubscriber(subscriber *AudioEventSubscri err := wsjson.Write(ctx, subscriber.conn, event) if err != nil { // Don't log network errors for closed connections as warnings, they're expected - if strings.Contains(err.Error(), "use of closed network connection") || - strings.Contains(err.Error(), "connection reset by peer") || - strings.Contains(err.Error(), "context canceled") { + if strings.Contains(err.Error(), "use of closed network connection") || + strings.Contains(err.Error(), "connection reset by peer") || + strings.Contains(err.Error(), "context canceled") { subscriber.logger.Debug().Err(err).Msg("websocket connection closed during audio event send") } else { subscriber.logger.Warn().Err(err).Msg("failed to send audio event to subscriber") diff --git a/internal/audio/input.go b/internal/audio/input.go index 1fdcfc8..5121687 100644 --- a/internal/audio/input.go +++ b/internal/audio/input.go @@ -19,21 +19,21 @@ type AudioInputMetrics struct { LastFrameTime time.Time } -// AudioInputManager manages microphone input stream from WebRTC to USB gadget +// AudioInputManager manages microphone input stream using IPC mode only type AudioInputManager struct { // metrics MUST be first for ARM32 alignment (contains int64 fields) metrics AudioInputMetrics - inputBuffer chan []byte - logger zerolog.Logger - running int32 + ipcManager *AudioInputIPCManager + logger zerolog.Logger + running int32 } -// NewAudioInputManager creates a new audio input manager +// NewAudioInputManager 
creates a new audio input manager (IPC mode only) func NewAudioInputManager() *AudioInputManager { return &AudioInputManager{ - inputBuffer: make(chan []byte, 100), // Buffer up to 100 frames - logger: logging.GetDefaultLogger().With().Str("component", "audio-input").Logger(), + ipcManager: NewAudioInputIPCManager(), + logger: logging.GetDefaultLogger().With().Str("component", "audio-input").Logger(), } } @@ -45,9 +45,10 @@ func (aim *AudioInputManager) Start() error { aim.logger.Info().Msg("Starting audio input manager") - // Start the non-blocking audio input stream - err := StartNonBlockingAudioInput(aim.inputBuffer) + // Start the IPC-based audio input + err := aim.ipcManager.Start() if err != nil { + aim.logger.Error().Err(err).Msg("Failed to start IPC audio input") atomic.StoreInt32(&aim.running, 0) return err } @@ -63,54 +64,102 @@ func (aim *AudioInputManager) Stop() { aim.logger.Info().Msg("Stopping audio input manager") - // Stop the non-blocking audio input stream - StopNonBlockingAudioInput() - - // Drain the input buffer - go func() { - for { - select { - case <-aim.inputBuffer: - // Drain - case <-time.After(100 * time.Millisecond): - return - } - } - }() + // Stop the IPC-based audio input + aim.ipcManager.Stop() aim.logger.Info().Msg("Audio input manager stopped") } -// WriteOpusFrame writes an Opus frame to the input buffer +// WriteOpusFrame writes an Opus frame to the audio input system with latency tracking func (aim *AudioInputManager) WriteOpusFrame(frame []byte) error { - if atomic.LoadInt32(&aim.running) == 0 { - return nil // Not running, ignore + if !aim.IsRunning() { + return nil // Not running, silently drop } - select { - case aim.inputBuffer <- frame: - atomic.AddInt64(&aim.metrics.FramesSent, 1) - atomic.AddInt64(&aim.metrics.BytesProcessed, int64(len(frame))) - aim.metrics.LastFrameTime = time.Now() - return nil - default: - // Buffer full, drop frame + // Track end-to-end latency from WebRTC to IPC + startTime := time.Now() + err := aim.ipcManager.WriteOpusFrame(frame) + processingTime := time.Since(startTime) + + // Log high latency warnings + if processingTime > 10*time.Millisecond { + aim.logger.Warn(). + Dur("latency_ms", processingTime). 
+ Msg("High audio processing latency detected") + } + + if err != nil { atomic.AddInt64(&aim.metrics.FramesDropped, 1) - aim.logger.Warn().Msg("Audio input buffer full, dropping frame") - return nil + return err + } + + // Update metrics + atomic.AddInt64(&aim.metrics.FramesSent, 1) + atomic.AddInt64(&aim.metrics.BytesProcessed, int64(len(frame))) + aim.metrics.LastFrameTime = time.Now() + aim.metrics.AverageLatency = processingTime + return nil +} + +// GetMetrics returns current audio input metrics +func (aim *AudioInputManager) GetMetrics() AudioInputMetrics { + return AudioInputMetrics{ + FramesSent: atomic.LoadInt64(&aim.metrics.FramesSent), + FramesDropped: atomic.LoadInt64(&aim.metrics.FramesDropped), + BytesProcessed: atomic.LoadInt64(&aim.metrics.BytesProcessed), + AverageLatency: aim.metrics.AverageLatency, + LastFrameTime: aim.metrics.LastFrameTime, } } -// GetMetrics returns current microphone input metrics -func (aim *AudioInputManager) GetMetrics() AudioInputMetrics { - return AudioInputMetrics{ - FramesSent: atomic.LoadInt64(&aim.metrics.FramesSent), - FramesDropped: atomic.LoadInt64(&aim.metrics.FramesDropped), - BytesProcessed: atomic.LoadInt64(&aim.metrics.BytesProcessed), - LastFrameTime: aim.metrics.LastFrameTime, - ConnectionDrops: atomic.LoadInt64(&aim.metrics.ConnectionDrops), - AverageLatency: aim.metrics.AverageLatency, +// GetComprehensiveMetrics returns detailed performance metrics across all components +func (aim *AudioInputManager) GetComprehensiveMetrics() map[string]interface{} { + // Get base metrics + baseMetrics := aim.GetMetrics() + + // Get detailed IPC metrics + ipcMetrics, detailedStats := aim.ipcManager.GetDetailedMetrics() + + comprehensiveMetrics := map[string]interface{}{ + "manager": map[string]interface{}{ + "frames_sent": baseMetrics.FramesSent, + "frames_dropped": baseMetrics.FramesDropped, + "bytes_processed": baseMetrics.BytesProcessed, + "average_latency_ms": float64(baseMetrics.AverageLatency.Nanoseconds()) / 1e6, + "last_frame_time": baseMetrics.LastFrameTime, + "running": aim.IsRunning(), + }, + "ipc": map[string]interface{}{ + "frames_sent": ipcMetrics.FramesSent, + "frames_dropped": ipcMetrics.FramesDropped, + "bytes_processed": ipcMetrics.BytesProcessed, + "average_latency_ms": float64(ipcMetrics.AverageLatency.Nanoseconds()) / 1e6, + "last_frame_time": ipcMetrics.LastFrameTime, + }, + "detailed": detailedStats, } + + return comprehensiveMetrics +} + +// LogPerformanceStats logs current performance statistics +func (aim *AudioInputManager) LogPerformanceStats() { + metrics := aim.GetComprehensiveMetrics() + + managerStats := metrics["manager"].(map[string]interface{}) + ipcStats := metrics["ipc"].(map[string]interface{}) + detailedStats := metrics["detailed"].(map[string]interface{}) + + aim.logger.Info(). + Int64("manager_frames_sent", managerStats["frames_sent"].(int64)). + Int64("manager_frames_dropped", managerStats["frames_dropped"].(int64)). + Float64("manager_latency_ms", managerStats["average_latency_ms"].(float64)). + Int64("ipc_frames_sent", ipcStats["frames_sent"].(int64)). + Int64("ipc_frames_dropped", ipcStats["frames_dropped"].(int64)). + Float64("ipc_latency_ms", ipcStats["average_latency_ms"].(float64)). + Float64("client_drop_rate", detailedStats["client_drop_rate"].(float64)). + Float64("frames_per_second", detailedStats["frames_per_second"].(float64)). 
+ Msg("Audio input performance metrics") } // IsRunning returns whether the audio input manager is running diff --git a/internal/audio/input_api.go b/internal/audio/input_api.go new file mode 100644 index 0000000..b5acf92 --- /dev/null +++ b/internal/audio/input_api.go @@ -0,0 +1,94 @@ +package audio + +import ( + "sync/atomic" + "unsafe" +) + +var ( + // Global audio input manager instance + globalInputManager unsafe.Pointer // *AudioInputManager +) + +// AudioInputInterface defines the common interface for audio input managers +type AudioInputInterface interface { + Start() error + Stop() + WriteOpusFrame(frame []byte) error + IsRunning() bool + GetMetrics() AudioInputMetrics +} + +// GetSupervisor returns the audio input supervisor for advanced management +func (m *AudioInputManager) GetSupervisor() *AudioInputSupervisor { + return m.ipcManager.GetSupervisor() +} + +// getAudioInputManager returns the audio input manager +func getAudioInputManager() AudioInputInterface { + ptr := atomic.LoadPointer(&globalInputManager) + if ptr == nil { + // Create new manager + newManager := NewAudioInputManager() + if atomic.CompareAndSwapPointer(&globalInputManager, nil, unsafe.Pointer(newManager)) { + return newManager + } + // Another goroutine created it, use that one + ptr = atomic.LoadPointer(&globalInputManager) + } + return (*AudioInputManager)(ptr) +} + +// StartAudioInput starts the audio input system using the appropriate manager +func StartAudioInput() error { + manager := getAudioInputManager() + return manager.Start() +} + +// StopAudioInput stops the audio input system +func StopAudioInput() { + manager := getAudioInputManager() + manager.Stop() +} + +// WriteAudioInputFrame writes an Opus frame to the audio input system +func WriteAudioInputFrame(frame []byte) error { + manager := getAudioInputManager() + return manager.WriteOpusFrame(frame) +} + +// IsAudioInputRunning returns whether the audio input system is running +func IsAudioInputRunning() bool { + manager := getAudioInputManager() + return manager.IsRunning() +} + +// GetAudioInputMetrics returns current audio input metrics +func GetAudioInputMetrics() AudioInputMetrics { + manager := getAudioInputManager() + return manager.GetMetrics() +} + +// GetAudioInputIPCSupervisor returns the IPC supervisor +func GetAudioInputIPCSupervisor() *AudioInputSupervisor { + ptr := atomic.LoadPointer(&globalInputManager) + if ptr == nil { + return nil + } + + manager := (*AudioInputManager)(ptr) + return manager.GetSupervisor() +} + +// Helper functions + +// ResetAudioInputManagers resets the global manager (for testing) +func ResetAudioInputManagers() { + // Stop existing manager first + if ptr := atomic.LoadPointer(&globalInputManager); ptr != nil { + (*AudioInputManager)(ptr).Stop() + } + + // Reset pointer + atomic.StorePointer(&globalInputManager, nil) +} \ No newline at end of file diff --git a/internal/audio/input_ipc.go b/internal/audio/input_ipc.go new file mode 100644 index 0000000..7dd55c5 --- /dev/null +++ b/internal/audio/input_ipc.go @@ -0,0 +1,689 @@ +package audio + +import ( + "context" + "encoding/binary" + "fmt" + "io" + "net" + "os" + "path/filepath" + "sync" + "sync/atomic" + "time" +) + +const ( + inputMagicNumber uint32 = 0x4A4B4D49 // "JKMI" (JetKVM Microphone Input) + inputSocketName = "audio_input.sock" + maxFrameSize = 4096 // Maximum Opus frame size + writeTimeout = 5 * time.Millisecond // Non-blocking write timeout + maxDroppedFrames = 100 // Maximum consecutive dropped frames before reconnect +) + +// 
InputMessageType represents the type of IPC message +type InputMessageType uint8 + +const ( + InputMessageTypeOpusFrame InputMessageType = iota + InputMessageTypeConfig + InputMessageTypeStop + InputMessageTypeHeartbeat + InputMessageTypeAck +) + +// InputIPCMessage represents a message sent over IPC +type InputIPCMessage struct { + Magic uint32 + Type InputMessageType + Length uint32 + Timestamp int64 + Data []byte +} + +// InputIPCConfig represents configuration for audio input +type InputIPCConfig struct { + SampleRate int + Channels int + FrameSize int +} + +// AudioInputServer handles IPC communication for audio input processing +type AudioInputServer struct { + // Atomic fields must be first for proper alignment on ARM + bufferSize int64 // Current buffer size (atomic) + processingTime int64 // Average processing time in nanoseconds (atomic) + droppedFrames int64 // Dropped frames counter (atomic) + totalFrames int64 // Total frames counter (atomic) + + listener net.Listener + conn net.Conn + mtx sync.Mutex + running bool + + // Triple-goroutine architecture + messageChan chan *InputIPCMessage // Buffered channel for incoming messages + processChan chan *InputIPCMessage // Buffered channel for processing queue + stopChan chan struct{} // Stop signal for all goroutines + wg sync.WaitGroup // Wait group for goroutine coordination +} + +// NewAudioInputServer creates a new audio input server +func NewAudioInputServer() (*AudioInputServer, error) { + socketPath := getInputSocketPath() + // Remove existing socket if any + os.Remove(socketPath) + + listener, err := net.Listen("unix", socketPath) + if err != nil { + return nil, fmt.Errorf("failed to create unix socket: %w", err) + } + + // Initialize with adaptive buffer size (start with 1000 frames) + initialBufferSize := int64(1000) + + return &AudioInputServer{ + listener: listener, + messageChan: make(chan *InputIPCMessage, initialBufferSize), + processChan: make(chan *InputIPCMessage, initialBufferSize), + stopChan: make(chan struct{}), + bufferSize: initialBufferSize, + }, nil +} + +// Start starts the audio input server +func (ais *AudioInputServer) Start() error { + ais.mtx.Lock() + defer ais.mtx.Unlock() + + if ais.running { + return fmt.Errorf("server already running") + } + + ais.running = true + + // Start triple-goroutine architecture + ais.startReaderGoroutine() + ais.startProcessorGoroutine() + ais.startMonitorGoroutine() + + // Accept connections in a goroutine + go ais.acceptConnections() + + return nil +} + +// Stop stops the audio input server +func (ais *AudioInputServer) Stop() { + ais.mtx.Lock() + defer ais.mtx.Unlock() + + if !ais.running { + return + } + + ais.running = false + + // Signal all goroutines to stop + close(ais.stopChan) + ais.wg.Wait() + + if ais.conn != nil { + ais.conn.Close() + ais.conn = nil + } + + if ais.listener != nil { + ais.listener.Close() + } +} + +// Close closes the server and cleans up resources +func (ais *AudioInputServer) Close() { + ais.Stop() + // Remove socket file + os.Remove(getInputSocketPath()) +} + +// acceptConnections accepts incoming connections +func (ais *AudioInputServer) acceptConnections() { + for ais.running { + conn, err := ais.listener.Accept() + if err != nil { + if ais.running { + // Only log error if we're still supposed to be running + continue + } + return + } + + ais.mtx.Lock() + // Close existing connection if any + if ais.conn != nil { + ais.conn.Close() + } + ais.conn = conn + ais.mtx.Unlock() + + // Handle this connection + go ais.handleConnection(conn) + } 
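Both ends of this socket agree on a fixed 17-byte header — magic (uint32), type (uint8), length (uint32), timestamp (int64), all little-endian — followed by an optional payload of at most maxFrameSize bytes. A minimal, self-contained sketch of that framing, matching the readMessage/writeMessage code below (the payload bytes here are placeholders, not a real Opus frame):

package main

import (
	"encoding/binary"
	"fmt"
	"time"
)

// encodeInputHeader builds the 17-byte header used by the audio input IPC:
// 4 (magic) + 1 (type) + 4 (length) + 8 (timestamp), little-endian.
func encodeInputHeader(msgType uint8, payloadLen uint32) []byte {
	const inputMagicNumber uint32 = 0x4A4B4D49 // "JKMI"
	header := make([]byte, 17)
	binary.LittleEndian.PutUint32(header[0:4], inputMagicNumber)
	header[4] = msgType
	binary.LittleEndian.PutUint32(header[5:9], payloadLen)
	binary.LittleEndian.PutUint64(header[9:17], uint64(time.Now().UnixNano()))
	return header
}

func main() {
	payload := []byte{0xFC, 0xFF, 0xFE} // placeholder bytes, not a real Opus frame
	hdr := encodeInputHeader(0 /* InputMessageTypeOpusFrame */, uint32(len(payload)))
	fmt.Printf("% x\n", append(hdr, payload...))
}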
+} + +// handleConnection handles a single client connection +func (ais *AudioInputServer) handleConnection(conn net.Conn) { + defer conn.Close() + + // Connection is now handled by the reader goroutine + // Just wait for connection to close or stop signal + for { + select { + case <-ais.stopChan: + return + default: + // Check if connection is still alive + if ais.conn == nil { + return + } + time.Sleep(100 * time.Millisecond) + } + } +} + +// readMessage reads a complete message from the connection +func (ais *AudioInputServer) readMessage(conn net.Conn) (*InputIPCMessage, error) { + // Read header (magic + type + length + timestamp) + headerSize := 4 + 1 + 4 + 8 // uint32 + uint8 + uint32 + int64 + header := make([]byte, headerSize) + + _, err := io.ReadFull(conn, header) + if err != nil { + return nil, err + } + + // Parse header + msg := &InputIPCMessage{} + msg.Magic = binary.LittleEndian.Uint32(header[0:4]) + msg.Type = InputMessageType(header[4]) + msg.Length = binary.LittleEndian.Uint32(header[5:9]) + msg.Timestamp = int64(binary.LittleEndian.Uint64(header[9:17])) + + // Validate magic number + if msg.Magic != inputMagicNumber { + return nil, fmt.Errorf("invalid magic number: %x", msg.Magic) + } + + // Validate message length + if msg.Length > maxFrameSize { + return nil, fmt.Errorf("message too large: %d bytes", msg.Length) + } + + // Read data if present + if msg.Length > 0 { + msg.Data = make([]byte, msg.Length) + _, err = io.ReadFull(conn, msg.Data) + if err != nil { + return nil, err + } + } + + return msg, nil +} + +// processMessage processes a received message +func (ais *AudioInputServer) processMessage(msg *InputIPCMessage) error { + switch msg.Type { + case InputMessageTypeOpusFrame: + return ais.processOpusFrame(msg.Data) + case InputMessageTypeConfig: + return ais.processConfig(msg.Data) + case InputMessageTypeStop: + return fmt.Errorf("stop message received") + case InputMessageTypeHeartbeat: + return ais.sendAck() + default: + return fmt.Errorf("unknown message type: %d", msg.Type) + } +} + +// processOpusFrame processes an Opus audio frame +func (ais *AudioInputServer) processOpusFrame(data []byte) error { + if len(data) == 0 { + return nil // Empty frame, ignore + } + + // Process the Opus frame using CGO + _, err := CGOAudioDecodeWrite(data) + return err +} + +// processConfig processes a configuration update +func (ais *AudioInputServer) processConfig(data []byte) error { + // For now, just acknowledge the config + // TODO: Parse and apply configuration + return ais.sendAck() +} + +// sendAck sends an acknowledgment message +func (ais *AudioInputServer) sendAck() error { + ais.mtx.Lock() + defer ais.mtx.Unlock() + + if ais.conn == nil { + return fmt.Errorf("no connection") + } + + msg := &InputIPCMessage{ + Magic: inputMagicNumber, + Type: InputMessageTypeAck, + Length: 0, + Timestamp: time.Now().UnixNano(), + } + + return ais.writeMessage(ais.conn, msg) +} + +// writeMessage writes a message to the connection +func (ais *AudioInputServer) writeMessage(conn net.Conn, msg *InputIPCMessage) error { + // Prepare header + headerSize := 4 + 1 + 4 + 8 + header := make([]byte, headerSize) + + binary.LittleEndian.PutUint32(header[0:4], msg.Magic) + header[4] = byte(msg.Type) + binary.LittleEndian.PutUint32(header[5:9], msg.Length) + binary.LittleEndian.PutUint64(header[9:17], uint64(msg.Timestamp)) + + // Write header + _, err := conn.Write(header) + if err != nil { + return err + } + + // Write data if present + if msg.Length > 0 && msg.Data != nil { + _, err = 
conn.Write(msg.Data) + if err != nil { + return err + } + } + + return nil +} + +// AudioInputClient handles IPC communication from the main process +type AudioInputClient struct { + // Atomic fields must be first for proper alignment on ARM + droppedFrames int64 // Atomic counter for dropped frames + totalFrames int64 // Atomic counter for total frames + + conn net.Conn + mtx sync.Mutex + running bool +} + +// NewAudioInputClient creates a new audio input client +func NewAudioInputClient() *AudioInputClient { + return &AudioInputClient{} +} + +// Connect connects to the audio input server +func (aic *AudioInputClient) Connect() error { + aic.mtx.Lock() + defer aic.mtx.Unlock() + + if aic.running { + return nil // Already connected + } + + socketPath := getInputSocketPath() + // Try connecting multiple times as the server might not be ready + for i := 0; i < 5; i++ { + conn, err := net.Dial("unix", socketPath) + if err == nil { + aic.conn = conn + aic.running = true + return nil + } + time.Sleep(time.Second) + } + + return fmt.Errorf("failed to connect to audio input server") +} + +// Disconnect disconnects from the audio input server +func (aic *AudioInputClient) Disconnect() { + aic.mtx.Lock() + defer aic.mtx.Unlock() + + if !aic.running { + return + } + + aic.running = false + + if aic.conn != nil { + // Send stop message + msg := &InputIPCMessage{ + Magic: inputMagicNumber, + Type: InputMessageTypeStop, + Length: 0, + Timestamp: time.Now().UnixNano(), + } + aic.writeMessage(msg) // Ignore errors during shutdown + + aic.conn.Close() + aic.conn = nil + } +} + +// SendFrame sends an Opus frame to the audio input server +func (aic *AudioInputClient) SendFrame(frame []byte) error { + aic.mtx.Lock() + defer aic.mtx.Unlock() + + if !aic.running || aic.conn == nil { + return fmt.Errorf("not connected") + } + + if len(frame) == 0 { + return nil // Empty frame, ignore + } + + if len(frame) > maxFrameSize { + return fmt.Errorf("frame too large: %d bytes", len(frame)) + } + + msg := &InputIPCMessage{ + Magic: inputMagicNumber, + Type: InputMessageTypeOpusFrame, + Length: uint32(len(frame)), + Timestamp: time.Now().UnixNano(), + Data: frame, + } + + return aic.writeMessage(msg) +} + +// SendConfig sends a configuration update to the audio input server +func (aic *AudioInputClient) SendConfig(config InputIPCConfig) error { + aic.mtx.Lock() + defer aic.mtx.Unlock() + + if !aic.running || aic.conn == nil { + return fmt.Errorf("not connected") + } + + // Serialize config (simple binary format) + data := make([]byte, 12) // 3 * int32 + binary.LittleEndian.PutUint32(data[0:4], uint32(config.SampleRate)) + binary.LittleEndian.PutUint32(data[4:8], uint32(config.Channels)) + binary.LittleEndian.PutUint32(data[8:12], uint32(config.FrameSize)) + + msg := &InputIPCMessage{ + Magic: inputMagicNumber, + Type: InputMessageTypeConfig, + Length: uint32(len(data)), + Timestamp: time.Now().UnixNano(), + Data: data, + } + + return aic.writeMessage(msg) +} + +// SendHeartbeat sends a heartbeat message +func (aic *AudioInputClient) SendHeartbeat() error { + aic.mtx.Lock() + defer aic.mtx.Unlock() + + if !aic.running || aic.conn == nil { + return fmt.Errorf("not connected") + } + + msg := &InputIPCMessage{ + Magic: inputMagicNumber, + Type: InputMessageTypeHeartbeat, + Length: 0, + Timestamp: time.Now().UnixNano(), + } + + return aic.writeMessage(msg) +} + +// writeMessage writes a message to the server +func (aic *AudioInputClient) writeMessage(msg *InputIPCMessage) error { + // Increment total frames counter + 
atomic.AddInt64(&aic.totalFrames, 1) + + // Prepare header + headerSize := 4 + 1 + 4 + 8 + header := make([]byte, headerSize) + + binary.LittleEndian.PutUint32(header[0:4], msg.Magic) + header[4] = byte(msg.Type) + binary.LittleEndian.PutUint32(header[5:9], msg.Length) + binary.LittleEndian.PutUint64(header[9:17], uint64(msg.Timestamp)) + + // Use non-blocking write with timeout + ctx, cancel := context.WithTimeout(context.Background(), writeTimeout) + defer cancel() + + // Create a channel to signal write completion + done := make(chan error, 1) + go func() { + // Write header + _, err := aic.conn.Write(header) + if err != nil { + done <- err + return + } + + // Write data if present + if msg.Length > 0 && msg.Data != nil { + _, err = aic.conn.Write(msg.Data) + if err != nil { + done <- err + return + } + } + done <- nil + }() + + // Wait for completion or timeout + select { + case err := <-done: + if err != nil { + atomic.AddInt64(&aic.droppedFrames, 1) + return err + } + return nil + case <-ctx.Done(): + // Timeout occurred - drop frame to prevent blocking + atomic.AddInt64(&aic.droppedFrames, 1) + return fmt.Errorf("write timeout - frame dropped") + } +} + +// IsConnected returns whether the client is connected +func (aic *AudioInputClient) IsConnected() bool { + aic.mtx.Lock() + defer aic.mtx.Unlock() + return aic.running && aic.conn != nil +} + +// GetFrameStats returns frame statistics +func (aic *AudioInputClient) GetFrameStats() (total, dropped int64) { + return atomic.LoadInt64(&aic.totalFrames), atomic.LoadInt64(&aic.droppedFrames) +} + +// GetDropRate returns the current frame drop rate as a percentage +func (aic *AudioInputClient) GetDropRate() float64 { + total := atomic.LoadInt64(&aic.totalFrames) + dropped := atomic.LoadInt64(&aic.droppedFrames) + if total == 0 { + return 0.0 + } + return float64(dropped) / float64(total) * 100.0 +} + +// ResetStats resets frame statistics +func (aic *AudioInputClient) ResetStats() { + atomic.StoreInt64(&aic.totalFrames, 0) + atomic.StoreInt64(&aic.droppedFrames, 0) +} + +// startReaderGoroutine starts the message reader goroutine +func (ais *AudioInputServer) startReaderGoroutine() { + ais.wg.Add(1) + go func() { + defer ais.wg.Done() + for { + select { + case <-ais.stopChan: + return + default: + if ais.conn != nil { + msg, err := ais.readMessage(ais.conn) + if err != nil { + continue // Connection error, retry + } + // Send to message channel with non-blocking write + select { + case ais.messageChan <- msg: + atomic.AddInt64(&ais.totalFrames, 1) + default: + // Channel full, drop message + atomic.AddInt64(&ais.droppedFrames, 1) + } + } + } + } + }() +} + +// startProcessorGoroutine starts the message processor goroutine +func (ais *AudioInputServer) startProcessorGoroutine() { + ais.wg.Add(1) + go func() { + defer ais.wg.Done() + for { + select { + case <-ais.stopChan: + return + case msg := <-ais.messageChan: + // Intelligent frame dropping: prioritize recent frames + if msg.Type == InputMessageTypeOpusFrame { + // Check if processing queue is getting full + queueLen := len(ais.processChan) + bufferSize := int(atomic.LoadInt64(&ais.bufferSize)) + + if queueLen > bufferSize*3/4 { + // Drop oldest frames, keep newest + select { + case <-ais.processChan: // Remove oldest + atomic.AddInt64(&ais.droppedFrames, 1) + default: + } + } + } + + // Send to processing queue + select { + case ais.processChan <- msg: + default: + // Processing queue full, drop frame + atomic.AddInt64(&ais.droppedFrames, 1) + } + } + } + }() +} + +// 
startMonitorGoroutine starts the performance monitoring goroutine +func (ais *AudioInputServer) startMonitorGoroutine() { + ais.wg.Add(1) + go func() { + defer ais.wg.Done() + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-ais.stopChan: + return + case <-ticker.C: + // Process frames from processing queue + for { + select { + case msg := <-ais.processChan: + start := time.Now() + err := ais.processMessage(msg) + processingTime := time.Since(start).Nanoseconds() + + // Update average processing time + currentAvg := atomic.LoadInt64(&ais.processingTime) + newAvg := (currentAvg + processingTime) / 2 + atomic.StoreInt64(&ais.processingTime, newAvg) + + if err != nil { + atomic.AddInt64(&ais.droppedFrames, 1) + } + default: + // No more messages to process + goto adaptiveBuffering + } + } + + adaptiveBuffering: + // Adaptive buffer sizing based on processing time + avgTime := atomic.LoadInt64(&ais.processingTime) + currentSize := atomic.LoadInt64(&ais.bufferSize) + + if avgTime > 10*1000*1000 { // > 10ms processing time + // Increase buffer size + newSize := currentSize * 2 + if newSize > 1000 { + newSize = 1000 + } + atomic.StoreInt64(&ais.bufferSize, newSize) + } else if avgTime < 1*1000*1000 { // < 1ms processing time + // Decrease buffer size + newSize := currentSize / 2 + if newSize < 50 { + newSize = 50 + } + atomic.StoreInt64(&ais.bufferSize, newSize) + } + } + } + }() +} + +// GetServerStats returns server performance statistics +func (ais *AudioInputServer) GetServerStats() (total, dropped int64, avgProcessingTime time.Duration, bufferSize int64) { + return atomic.LoadInt64(&ais.totalFrames), + atomic.LoadInt64(&ais.droppedFrames), + time.Duration(atomic.LoadInt64(&ais.processingTime)), + atomic.LoadInt64(&ais.bufferSize) +} + +// Helper functions + +// getInputSocketPath returns the path to the input socket +func getInputSocketPath() string { + if path := os.Getenv("JETKVM_AUDIO_INPUT_SOCKET"); path != "" { + return path + } + return filepath.Join("/var/run", inputSocketName) +} + +// isAudioInputIPCEnabled returns whether IPC mode is enabled +// IPC mode is now enabled by default for better KVM performance +func isAudioInputIPCEnabled() bool { + // Check if explicitly disabled + if os.Getenv("JETKVM_AUDIO_INPUT_IPC") == "false" { + return false + } + // Default to enabled (IPC mode) + return true +} \ No newline at end of file diff --git a/internal/audio/input_ipc_manager.go b/internal/audio/input_ipc_manager.go new file mode 100644 index 0000000..906be14 --- /dev/null +++ b/internal/audio/input_ipc_manager.go @@ -0,0 +1,190 @@ +package audio + +import ( + "sync/atomic" + "time" + + "github.com/jetkvm/kvm/internal/logging" + "github.com/rs/zerolog" +) + +// AudioInputIPCManager manages microphone input using IPC when enabled +type AudioInputIPCManager struct { + // metrics MUST be first for ARM32 alignment (contains int64 fields) + metrics AudioInputMetrics + + supervisor *AudioInputSupervisor + logger zerolog.Logger + running int32 +} + +// NewAudioInputIPCManager creates a new IPC-based audio input manager +func NewAudioInputIPCManager() *AudioInputIPCManager { + return &AudioInputIPCManager{ + supervisor: NewAudioInputSupervisor(), + logger: logging.GetDefaultLogger().With().Str("component", "audio-input-ipc").Logger(), + } +} + +// Start starts the IPC-based audio input system +func (aim *AudioInputIPCManager) Start() error { + if !atomic.CompareAndSwapInt32(&aim.running, 0, 1) { + return nil // Already running + } + + 
aim.logger.Info().Msg("Starting IPC-based audio input system") + + // Start the supervisor which will launch the subprocess + err := aim.supervisor.Start() + if err != nil { + atomic.StoreInt32(&aim.running, 0) + return err + } + + // Send initial configuration + config := InputIPCConfig{ + SampleRate: 48000, + Channels: 2, + FrameSize: 960, // 20ms at 48kHz + } + + // Wait a bit for the subprocess to be ready + time.Sleep(time.Second) + + err = aim.supervisor.SendConfig(config) + if err != nil { + aim.logger.Warn().Err(err).Msg("Failed to send initial config to audio input server") + // Don't fail startup for config errors + } + + aim.logger.Info().Msg("IPC-based audio input system started") + return nil +} + +// Stop stops the IPC-based audio input system +func (aim *AudioInputIPCManager) Stop() { + if !atomic.CompareAndSwapInt32(&aim.running, 1, 0) { + return // Already stopped + } + + aim.logger.Info().Msg("Stopping IPC-based audio input system") + + // Stop the supervisor + aim.supervisor.Stop() + + aim.logger.Info().Msg("IPC-based audio input system stopped") +} + +// WriteOpusFrame sends an Opus frame to the audio input server via IPC +func (aim *AudioInputIPCManager) WriteOpusFrame(frame []byte) error { + if atomic.LoadInt32(&aim.running) == 0 { + return nil // Not running, silently ignore + } + + if len(frame) == 0 { + return nil // Empty frame, ignore + } + + // Start latency measurement + startTime := time.Now() + + // Update metrics + atomic.AddInt64(&aim.metrics.FramesSent, 1) + atomic.AddInt64(&aim.metrics.BytesProcessed, int64(len(frame))) + aim.metrics.LastFrameTime = startTime + + // Send frame via IPC + err := aim.supervisor.SendFrame(frame) + if err != nil { + // Count as dropped frame + atomic.AddInt64(&aim.metrics.FramesDropped, 1) + aim.logger.Debug().Err(err).Msg("Failed to send frame via IPC") + return err + } + + // Calculate and update latency + latency := time.Since(startTime) + aim.updateLatencyMetrics(latency) + + return nil +} + +// IsRunning returns whether the IPC audio input system is running +func (aim *AudioInputIPCManager) IsRunning() bool { + return atomic.LoadInt32(&aim.running) == 1 +} + +// GetMetrics returns current metrics +func (aim *AudioInputIPCManager) GetMetrics() AudioInputMetrics { + return AudioInputMetrics{ + FramesSent: atomic.LoadInt64(&aim.metrics.FramesSent), + FramesDropped: atomic.LoadInt64(&aim.metrics.FramesDropped), + BytesProcessed: atomic.LoadInt64(&aim.metrics.BytesProcessed), + ConnectionDrops: atomic.LoadInt64(&aim.metrics.ConnectionDrops), + AverageLatency: aim.metrics.AverageLatency, // TODO: Calculate actual latency + LastFrameTime: aim.metrics.LastFrameTime, + } +} + +// updateLatencyMetrics updates the latency metrics with exponential moving average +func (aim *AudioInputIPCManager) updateLatencyMetrics(latency time.Duration) { + // Use exponential moving average for smooth latency calculation + currentAvg := aim.metrics.AverageLatency + if currentAvg == 0 { + aim.metrics.AverageLatency = latency + } else { + // EMA with alpha = 0.1 for smooth averaging + aim.metrics.AverageLatency = time.Duration(float64(currentAvg)*0.9 + float64(latency)*0.1) + } +} + +// GetDetailedMetrics returns comprehensive performance metrics +func (aim *AudioInputIPCManager) GetDetailedMetrics() (AudioInputMetrics, map[string]interface{}) { + metrics := aim.GetMetrics() + + // Get client frame statistics + client := aim.supervisor.GetClient() + totalFrames, droppedFrames := int64(0), int64(0) + dropRate := 0.0 + if client != nil { + totalFrames, 
droppedFrames = client.GetFrameStats() + dropRate = client.GetDropRate() + } + + // Get server statistics if available + serverStats := make(map[string]interface{}) + if aim.supervisor.IsRunning() { + // Note: Server stats would need to be exposed through IPC + serverStats["status"] = "running" + } else { + serverStats["status"] = "stopped" + } + + detailedStats := map[string]interface{}{ + "client_total_frames": totalFrames, + "client_dropped_frames": droppedFrames, + "client_drop_rate": dropRate, + "server_stats": serverStats, + "ipc_latency_ms": float64(metrics.AverageLatency.Nanoseconds()) / 1e6, + "frames_per_second": aim.calculateFrameRate(), + } + + return metrics, detailedStats +} + +// calculateFrameRate calculates the current frame rate +func (aim *AudioInputIPCManager) calculateFrameRate() float64 { + framesSent := atomic.LoadInt64(&aim.metrics.FramesSent) + if framesSent == 0 { + return 0.0 + } + + // Estimate based on recent activity (simplified) + // In a real implementation, you'd track frames over time windows + return 50.0 // Typical Opus frame rate +} + +// GetSupervisor returns the supervisor for advanced operations +func (aim *AudioInputIPCManager) GetSupervisor() *AudioInputSupervisor { + return aim.supervisor +} diff --git a/internal/audio/input_server_main.go b/internal/audio/input_server_main.go new file mode 100644 index 0000000..6ce66f1 --- /dev/null +++ b/internal/audio/input_server_main.go @@ -0,0 +1,72 @@ +package audio + +import ( + "context" + "os" + "os/signal" + "syscall" + "time" + + "github.com/jetkvm/kvm/internal/logging" +) + +// IsAudioInputServerProcess detects if we're running as the audio input server subprocess +func IsAudioInputServerProcess() bool { + return os.Getenv("JETKVM_AUDIO_INPUT_SERVER") == "true" +} + +// RunAudioInputServer runs the audio input server subprocess +// This should be called from main() when the subprocess is detected +func RunAudioInputServer() error { + logger := logging.GetDefaultLogger().With().Str("component", "audio-input-server").Logger() + logger.Info().Msg("Starting audio input server subprocess") + + // Initialize CGO audio system + err := CGOAudioPlaybackInit() + if err != nil { + logger.Error().Err(err).Msg("Failed to initialize CGO audio playback") + return err + } + defer CGOAudioPlaybackClose() + + // Create and start the IPC server + server, err := NewAudioInputServer() + if err != nil { + logger.Error().Err(err).Msg("Failed to create audio input server") + return err + } + defer server.Close() + + err = server.Start() + if err != nil { + logger.Error().Err(err).Msg("Failed to start audio input server") + return err + } + + logger.Info().Msg("Audio input server started, waiting for connections") + + // Set up signal handling for graceful shutdown + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + // Wait for shutdown signal + select { + case sig := <-sigChan: + logger.Info().Str("signal", sig.String()).Msg("Received shutdown signal") + case <-ctx.Done(): + logger.Info().Msg("Context cancelled") + } + + // Graceful shutdown + logger.Info().Msg("Shutting down audio input server") + server.Stop() + + // Give some time for cleanup + time.Sleep(100 * time.Millisecond) + + logger.Info().Msg("Audio input server subprocess stopped") + return nil +} diff --git a/internal/audio/input_supervisor.go b/internal/audio/input_supervisor.go new file mode 100644 index 0000000..229e0aa --- /dev/null +++ 
b/internal/audio/input_supervisor.go @@ -0,0 +1,225 @@ +package audio + +import ( + "context" + "fmt" + "os" + "os/exec" + "sync" + "syscall" + "time" + + "github.com/jetkvm/kvm/internal/logging" + "github.com/rs/zerolog" +) + +// AudioInputSupervisor manages the audio input server subprocess +type AudioInputSupervisor struct { + cmd *exec.Cmd + cancel context.CancelFunc + mtx sync.Mutex + running bool + logger zerolog.Logger + client *AudioInputClient +} + +// NewAudioInputSupervisor creates a new audio input supervisor +func NewAudioInputSupervisor() *AudioInputSupervisor { + return &AudioInputSupervisor{ + logger: logging.GetDefaultLogger().With().Str("component", "audio-input-supervisor").Logger(), + client: NewAudioInputClient(), + } +} + +// Start starts the audio input server subprocess +func (ais *AudioInputSupervisor) Start() error { + ais.mtx.Lock() + defer ais.mtx.Unlock() + + if ais.running { + return fmt.Errorf("audio input supervisor already running") + } + + // Create context for subprocess management + ctx, cancel := context.WithCancel(context.Background()) + ais.cancel = cancel + + // Get current executable path + execPath, err := os.Executable() + if err != nil { + return fmt.Errorf("failed to get executable path: %w", err) + } + + // Create command for audio input server subprocess + cmd := exec.CommandContext(ctx, execPath) + cmd.Env = append(os.Environ(), + "JETKVM_AUDIO_INPUT_SERVER=true", // Flag to indicate this is the input server process + "JETKVM_AUDIO_INPUT_IPC=true", // Enable IPC mode + ) + + // Set process group to allow clean termination + cmd.SysProcAttr = &syscall.SysProcAttr{ + Setpgid: true, + } + + ais.cmd = cmd + ais.running = true + + // Start the subprocess + err = cmd.Start() + if err != nil { + ais.running = false + cancel() + return fmt.Errorf("failed to start audio input server: %w", err) + } + + ais.logger.Info().Int("pid", cmd.Process.Pid).Msg("Audio input server subprocess started") + + // Monitor the subprocess in a goroutine + go ais.monitorSubprocess() + + // Connect client to the server + go ais.connectClient() + + return nil +} + +// Stop stops the audio input server subprocess +func (ais *AudioInputSupervisor) Stop() { + ais.mtx.Lock() + defer ais.mtx.Unlock() + + if !ais.running { + return + } + + ais.running = false + + // Disconnect client first + if ais.client != nil { + ais.client.Disconnect() + } + + // Cancel context to signal subprocess to stop + if ais.cancel != nil { + ais.cancel() + } + + // Try graceful termination first + if ais.cmd != nil && ais.cmd.Process != nil { + ais.logger.Info().Int("pid", ais.cmd.Process.Pid).Msg("Stopping audio input server subprocess") + + // Send SIGTERM + err := ais.cmd.Process.Signal(syscall.SIGTERM) + if err != nil { + ais.logger.Warn().Err(err).Msg("Failed to send SIGTERM to audio input server") + } + + // Wait for graceful shutdown with timeout + done := make(chan error, 1) + go func() { + done <- ais.cmd.Wait() + }() + + select { + case <-done: + ais.logger.Info().Msg("Audio input server subprocess stopped gracefully") + case <-time.After(5 * time.Second): + // Force kill if graceful shutdown failed + ais.logger.Warn().Msg("Audio input server subprocess did not stop gracefully, force killing") + err := ais.cmd.Process.Kill() + if err != nil { + ais.logger.Error().Err(err).Msg("Failed to kill audio input server subprocess") + } + } + } + + ais.cmd = nil + ais.cancel = nil +} + +// IsRunning returns whether the supervisor is running +func (ais *AudioInputSupervisor) IsRunning() bool { + 
ais.mtx.Lock() + defer ais.mtx.Unlock() + return ais.running +} + +// GetClient returns the IPC client for sending audio frames +func (ais *AudioInputSupervisor) GetClient() *AudioInputClient { + return ais.client +} + +// monitorSubprocess monitors the subprocess and handles unexpected exits +func (ais *AudioInputSupervisor) monitorSubprocess() { + if ais.cmd == nil { + return + } + + err := ais.cmd.Wait() + + ais.mtx.Lock() + defer ais.mtx.Unlock() + + if ais.running { + // Unexpected exit + if err != nil { + ais.logger.Error().Err(err).Msg("Audio input server subprocess exited unexpectedly") + } else { + ais.logger.Warn().Msg("Audio input server subprocess exited unexpectedly") + } + + // Disconnect client + if ais.client != nil { + ais.client.Disconnect() + } + + // Mark as not running + ais.running = false + ais.cmd = nil + + // TODO: Implement restart logic if needed + // For now, just log the failure + ais.logger.Info().Msg("Audio input server subprocess monitoring stopped") + } +} + +// connectClient attempts to connect the client to the server +func (ais *AudioInputSupervisor) connectClient() { + // Wait a bit for the server to start + time.Sleep(500 * time.Millisecond) + + err := ais.client.Connect() + if err != nil { + ais.logger.Error().Err(err).Msg("Failed to connect to audio input server") + return + } + + ais.logger.Info().Msg("Connected to audio input server") +} + +// SendFrame sends an audio frame to the subprocess (convenience method) +func (ais *AudioInputSupervisor) SendFrame(frame []byte) error { + if ais.client == nil { + return fmt.Errorf("client not initialized") + } + + if !ais.client.IsConnected() { + return fmt.Errorf("client not connected") + } + + return ais.client.SendFrame(frame) +} + +// SendConfig sends a configuration update to the subprocess (convenience method) +func (ais *AudioInputSupervisor) SendConfig(config InputIPCConfig) error { + if ais.client == nil { + return fmt.Errorf("client not initialized") + } + + if !ais.client.IsConnected() { + return fmt.Errorf("client not connected") + } + + return ais.client.SendConfig(config) +} diff --git a/internal/audio/ipc.go b/internal/audio/ipc.go new file mode 100644 index 0000000..a8e5984 --- /dev/null +++ b/internal/audio/ipc.go @@ -0,0 +1,128 @@ +package audio + +import ( + "encoding/binary" + "fmt" + "io" + "net" + "os" + "path/filepath" + "sync" + "time" +) + +const ( + magicNumber uint32 = 0x4A4B564D // "JKVM" + socketName = "audio_output.sock" +) + +type AudioServer struct { + listener net.Listener + conn net.Conn + mtx sync.Mutex +} + +func NewAudioServer() (*AudioServer, error) { + socketPath := filepath.Join("/var/run", socketName) + // Remove existing socket if any + os.Remove(socketPath) + + listener, err := net.Listen("unix", socketPath) + if err != nil { + return nil, fmt.Errorf("failed to create unix socket: %w", err) + } + + return &AudioServer{listener: listener}, nil +} + +func (s *AudioServer) Start() error { + conn, err := s.listener.Accept() + if err != nil { + return fmt.Errorf("failed to accept connection: %w", err) + } + s.conn = conn + return nil +} + +func (s *AudioServer) Close() error { + if s.conn != nil { + s.conn.Close() + } + return s.listener.Close() +} + +func (s *AudioServer) SendFrame(frame []byte) error { + s.mtx.Lock() + defer s.mtx.Unlock() + + if s.conn == nil { + return fmt.Errorf("no client connected") + } + + // Write magic number + if err := binary.Write(s.conn, binary.BigEndian, magicNumber); err != nil { + return fmt.Errorf("failed to write magic number: %w", 
err) + } + + // Write frame size + if err := binary.Write(s.conn, binary.BigEndian, uint32(len(frame))); err != nil { + return fmt.Errorf("failed to write frame size: %w", err) + } + + // Write frame data + if _, err := s.conn.Write(frame); err != nil { + return fmt.Errorf("failed to write frame data: %w", err) + } + + return nil +} + +type AudioClient struct { + conn net.Conn + mtx sync.Mutex +} + +func NewAudioClient() (*AudioClient, error) { + socketPath := filepath.Join("/var/run", socketName) + // Try connecting multiple times as the server might not be ready + for i := 0; i < 5; i++ { + conn, err := net.Dial("unix", socketPath) + if err == nil { + return &AudioClient{conn: conn}, nil + } + time.Sleep(time.Second) + } + return nil, fmt.Errorf("failed to connect to audio server") +} + +func (c *AudioClient) Close() error { + return c.conn.Close() +} + +func (c *AudioClient) ReceiveFrame() ([]byte, error) { + c.mtx.Lock() + defer c.mtx.Unlock() + + // Read magic number + var magic uint32 + if err := binary.Read(c.conn, binary.BigEndian, &magic); err != nil { + return nil, fmt.Errorf("failed to read magic number: %w", err) + } + if magic != magicNumber { + return nil, fmt.Errorf("invalid magic number: %x", magic) + } + + // Read frame size + var size uint32 + if err := binary.Read(c.conn, binary.BigEndian, &size); err != nil { + return nil, fmt.Errorf("failed to read frame size: %w", err) + } + + // Read frame data + frame := make([]byte, size) + if _, err := io.ReadFull(c.conn, frame); err != nil { + return nil, fmt.Errorf("failed to read frame data: %w", err) + } + + return frame, nil +} diff --git a/internal/audio/nonblocking_api.go b/internal/audio/nonblocking_api.go deleted file mode 100644 index 4e67df3..0000000 --- a/internal/audio/nonblocking_api.go +++ /dev/null @@ -1,115 +0,0 @@ -package audio - -import ( - "sync/atomic" - "unsafe" -) - -var ( - // Use unsafe.Pointer for atomic operations instead of mutex - globalNonBlockingManager unsafe.Pointer // *NonBlockingAudioManager -) - -// loadManager atomically loads the global manager -func loadManager() *NonBlockingAudioManager { - ptr := atomic.LoadPointer(&globalNonBlockingManager) - if ptr == nil { - return nil - } - return (*NonBlockingAudioManager)(ptr) -} - -// storeManager atomically stores the global manager -func storeManager(manager *NonBlockingAudioManager) { - atomic.StorePointer(&globalNonBlockingManager, unsafe.Pointer(manager)) -} - -// compareAndSwapManager atomically compares and swaps the global manager -func compareAndSwapManager(old, new *NonBlockingAudioManager) bool { - return atomic.CompareAndSwapPointer(&globalNonBlockingManager, - unsafe.Pointer(old), unsafe.Pointer(new)) -} - -// StartNonBlockingAudioStreaming starts the non-blocking audio streaming system -func StartNonBlockingAudioStreaming(send func([]byte)) error { - manager := loadManager() - if manager != nil && manager.IsOutputRunning() { - return nil // Already running, this is not an error - } - - if manager == nil { - newManager := NewNonBlockingAudioManager() - if !compareAndSwapManager(nil, newManager) { - // Another goroutine created manager, use it - manager = loadManager() - } else { - manager = newManager - } - } - - return manager.StartAudioOutput(send) -} - -// StartNonBlockingAudioInput starts the non-blocking audio input system -func StartNonBlockingAudioInput(receiveChan <-chan []byte) error { - manager := loadManager() - if manager == nil { - newManager := NewNonBlockingAudioManager() - if !compareAndSwapManager(nil, newManager) { - 
// Another goroutine created manager, use it - manager = loadManager() - } else { - manager = newManager - } - } - - // Check if input is already running to avoid unnecessary operations - if manager.IsInputRunning() { - return nil // Already running, this is not an error - } - - return manager.StartAudioInput(receiveChan) -} - -// StopNonBlockingAudioStreaming stops the non-blocking audio streaming system -func StopNonBlockingAudioStreaming() { - manager := loadManager() - if manager != nil { - manager.Stop() - storeManager(nil) - } -} - -// StopNonBlockingAudioInput stops only the audio input without affecting output -func StopNonBlockingAudioInput() { - manager := loadManager() - if manager != nil && manager.IsInputRunning() { - manager.StopAudioInput() - - // If both input and output are stopped, recreate manager to ensure clean state - if !manager.IsRunning() { - storeManager(nil) - } - } -} - -// GetNonBlockingAudioStats returns statistics from the non-blocking audio system -func GetNonBlockingAudioStats() NonBlockingAudioStats { - manager := loadManager() - if manager != nil { - return manager.GetStats() - } - return NonBlockingAudioStats{} -} - -// IsNonBlockingAudioRunning returns true if the non-blocking audio system is running -func IsNonBlockingAudioRunning() bool { - manager := loadManager() - return manager != nil && manager.IsRunning() -} - -// IsNonBlockingAudioInputRunning returns true if the non-blocking audio input is running -func IsNonBlockingAudioInputRunning() bool { - manager := loadManager() - return manager != nil && manager.IsInputRunning() -} diff --git a/internal/audio/nonblocking_audio.go b/internal/audio/nonblocking_audio.go deleted file mode 100644 index 5787a8a..0000000 --- a/internal/audio/nonblocking_audio.go +++ /dev/null @@ -1,564 +0,0 @@ -package audio - -import ( - "context" - "errors" - // "runtime" // removed: no longer directly pinning OS thread here; batching handles it - "sync" - "sync/atomic" - "time" - - "github.com/jetkvm/kvm/internal/logging" - "github.com/rs/zerolog" -) - -// NonBlockingAudioManager manages audio operations in separate worker threads -// to prevent blocking of mouse/keyboard operations -type NonBlockingAudioManager struct { - // Statistics - MUST be first for ARM32 alignment (int64 fields need 8-byte alignment) - stats NonBlockingAudioStats - - // Control - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup - logger *zerolog.Logger - - // Audio output (capture from device, send to WebRTC) - outputSendFunc func([]byte) - outputWorkChan chan audioWorkItem - outputResultChan chan audioResult - - // Audio input (receive from WebRTC, playback to device) - inputReceiveChan <-chan []byte - inputWorkChan chan audioWorkItem - inputResultChan chan audioResult - - // Worker threads and flags - int32 fields grouped together - outputRunning int32 - inputRunning int32 - outputWorkerRunning int32 - inputWorkerRunning int32 -} - -type audioWorkItem struct { - workType audioWorkType - data []byte - resultChan chan audioResult -} - -type audioWorkType int - -const ( - audioWorkInit audioWorkType = iota - audioWorkReadEncode - audioWorkDecodeWrite - audioWorkClose -) - -type audioResult struct { - success bool - data []byte - length int - err error -} - -type NonBlockingAudioStats struct { - // int64 fields MUST be first for ARM32 alignment - OutputFramesProcessed int64 - OutputFramesDropped int64 - InputFramesProcessed int64 - InputFramesDropped int64 - WorkerErrors int64 - // time.Time is int64 internally, so it's also 
aligned - LastProcessTime time.Time -} - -// NewNonBlockingAudioManager creates a new non-blocking audio manager -func NewNonBlockingAudioManager() *NonBlockingAudioManager { - ctx, cancel := context.WithCancel(context.Background()) - logger := logging.GetDefaultLogger().With().Str("component", "nonblocking-audio").Logger() - - return &NonBlockingAudioManager{ - ctx: ctx, - cancel: cancel, - logger: &logger, - outputWorkChan: make(chan audioWorkItem, 10), // Buffer for work items - outputResultChan: make(chan audioResult, 10), // Buffer for results - inputWorkChan: make(chan audioWorkItem, 10), - inputResultChan: make(chan audioResult, 10), - } -} - -// StartAudioOutput starts non-blocking audio output (capture and encode) -func (nam *NonBlockingAudioManager) StartAudioOutput(sendFunc func([]byte)) error { - if !atomic.CompareAndSwapInt32(&nam.outputRunning, 0, 1) { - return ErrAudioAlreadyRunning - } - - nam.outputSendFunc = sendFunc - - // Enable batch audio processing for performance - EnableBatchAudioProcessing() - - // Start the blocking worker thread - nam.wg.Add(1) - go nam.outputWorkerThread() - - // Start the non-blocking coordinator - nam.wg.Add(1) - go nam.outputCoordinatorThread() - - nam.logger.Info().Msg("non-blocking audio output started with batch processing") - return nil -} - -// StartAudioInput starts non-blocking audio input (receive and decode) -func (nam *NonBlockingAudioManager) StartAudioInput(receiveChan <-chan []byte) error { - if !atomic.CompareAndSwapInt32(&nam.inputRunning, 0, 1) { - return ErrAudioAlreadyRunning - } - - nam.inputReceiveChan = receiveChan - - // Enable batch audio processing for performance - EnableBatchAudioProcessing() - - // Start the blocking worker thread - nam.wg.Add(1) - go nam.inputWorkerThread() - - // Start the non-blocking coordinator - nam.wg.Add(1) - go nam.inputCoordinatorThread() - - nam.logger.Info().Msg("non-blocking audio input started with batch processing") - return nil -} - -// outputWorkerThread handles all blocking audio output operations -func (nam *NonBlockingAudioManager) outputWorkerThread() { - defer nam.wg.Done() - defer atomic.StoreInt32(&nam.outputWorkerRunning, 0) - - atomic.StoreInt32(&nam.outputWorkerRunning, 1) - nam.logger.Debug().Msg("output worker thread started") - - // Initialize audio in worker thread - if err := CGOAudioInit(); err != nil { - nam.logger.Error().Err(err).Msg("failed to initialize audio in worker thread") - return - } - defer CGOAudioClose() - - // Use buffer pool to avoid allocations - buf := GetAudioFrameBuffer() - defer PutAudioFrameBuffer(buf) - - for { - select { - case <-nam.ctx.Done(): - nam.logger.Debug().Msg("output worker thread stopping") - return - - case workItem := <-nam.outputWorkChan: - switch workItem.workType { - case audioWorkReadEncode: - n, err := BatchCGOAudioReadEncode(buf) - - result := audioResult{ - success: err == nil, - length: n, - err: err, - } - if err == nil && n > 0 { - // Get buffer from pool and copy data - resultBuf := GetAudioFrameBuffer() - copy(resultBuf[:n], buf[:n]) - result.data = resultBuf[:n] - } - - // Send result back (non-blocking) - select { - case workItem.resultChan <- result: - case <-nam.ctx.Done(): - return - default: - // Drop result if coordinator is not ready - if result.data != nil { - PutAudioFrameBuffer(result.data) - } - atomic.AddInt64(&nam.stats.OutputFramesDropped, 1) - } - - case audioWorkClose: - nam.logger.Debug().Msg("output worker received close signal") - return - } - } - } -} - -// outputCoordinatorThread coordinates 
audio output without blocking -func (nam *NonBlockingAudioManager) outputCoordinatorThread() { - defer nam.wg.Done() - defer atomic.StoreInt32(&nam.outputRunning, 0) - - nam.logger.Debug().Msg("output coordinator thread started") - - ticker := time.NewTicker(20 * time.Millisecond) // Match frame timing - defer ticker.Stop() - - pendingWork := false - resultChan := make(chan audioResult, 1) - - for atomic.LoadInt32(&nam.outputRunning) == 1 { - select { - case <-nam.ctx.Done(): - nam.logger.Debug().Msg("output coordinator stopping") - return - - case <-ticker.C: - // Only submit work if worker is ready and no pending work - if !pendingWork && atomic.LoadInt32(&nam.outputWorkerRunning) == 1 { - if IsAudioMuted() { - continue // Skip when muted - } - - workItem := audioWorkItem{ - workType: audioWorkReadEncode, - resultChan: resultChan, - } - - // Submit work (non-blocking) - select { - case nam.outputWorkChan <- workItem: - pendingWork = true - default: - // Worker is busy, drop this frame - atomic.AddInt64(&nam.stats.OutputFramesDropped, 1) - } - } - - case result := <-resultChan: - pendingWork = false - nam.stats.LastProcessTime = time.Now() - - if result.success && result.data != nil && result.length > 0 { - // Send to WebRTC (non-blocking) - if nam.outputSendFunc != nil { - nam.outputSendFunc(result.data) - atomic.AddInt64(&nam.stats.OutputFramesProcessed, 1) - RecordFrameReceived(result.length) - } - // Return buffer to pool after use - PutAudioFrameBuffer(result.data) - } else if result.success && result.length == 0 { - // No data available - this is normal, not an error - // Just continue without logging or counting as error - } else { - atomic.AddInt64(&nam.stats.OutputFramesDropped, 1) - atomic.AddInt64(&nam.stats.WorkerErrors, 1) - if result.err != nil { - nam.logger.Warn().Err(result.err).Msg("audio output worker error") - } - // Clean up buffer if present - if result.data != nil { - PutAudioFrameBuffer(result.data) - } - RecordFrameDropped() - } - } - } - - // Signal worker to close - select { - case nam.outputWorkChan <- audioWorkItem{workType: audioWorkClose}: - case <-time.After(100 * time.Millisecond): - nam.logger.Warn().Msg("timeout signaling output worker to close") - } - - nam.logger.Info().Msg("output coordinator thread stopped") -} - -// inputWorkerThread handles all blocking audio input operations -func (nam *NonBlockingAudioManager) inputWorkerThread() { - defer nam.wg.Done() - // Cleanup CGO resources properly to avoid double-close scenarios - // The outputWorkerThread's CGOAudioClose() will handle all cleanup - atomic.StoreInt32(&nam.inputWorkerRunning, 0) - - atomic.StoreInt32(&nam.inputWorkerRunning, 1) - nam.logger.Debug().Msg("input worker thread started") - - // Initialize audio playback in worker thread - if err := CGOAudioPlaybackInit(); err != nil { - nam.logger.Error().Err(err).Msg("failed to initialize audio playback in worker thread") - return - } - - // Ensure CGO cleanup happens even if we exit unexpectedly - cgoInitialized := true - defer func() { - if cgoInitialized { - nam.logger.Debug().Msg("cleaning up CGO audio playback") - // Add extra safety: ensure no more CGO calls can happen - atomic.StoreInt32(&nam.inputWorkerRunning, 0) - // Note: Don't call CGOAudioPlaybackClose() here to avoid double-close - // The outputWorkerThread's CGOAudioClose() will handle all cleanup - } - }() - - for { - // If coordinator has stopped, exit worker loop - if atomic.LoadInt32(&nam.inputRunning) == 0 { - return - } - select { - case <-nam.ctx.Done(): - 
nam.logger.Debug().Msg("input worker thread stopping due to context cancellation") - return - - case workItem := <-nam.inputWorkChan: - switch workItem.workType { - case audioWorkDecodeWrite: - // Check if we're still supposed to be running before processing - if atomic.LoadInt32(&nam.inputWorkerRunning) == 0 || atomic.LoadInt32(&nam.inputRunning) == 0 { - nam.logger.Debug().Msg("input worker stopping, ignoring decode work") - // Do not send to resultChan; coordinator may have exited - return - } - - // Validate input data before CGO call - if workItem.data == nil || len(workItem.data) == 0 { - result := audioResult{ - success: false, - err: errors.New("invalid audio data"), - } - - // Check if coordinator is still running before sending result - if atomic.LoadInt32(&nam.inputRunning) == 1 { - select { - case workItem.resultChan <- result: - case <-nam.ctx.Done(): - return - case <-time.After(10 * time.Millisecond): - // Timeout - coordinator may have stopped, drop result - atomic.AddInt64(&nam.stats.InputFramesDropped, 1) - } - } else { - // Coordinator has stopped, drop result - atomic.AddInt64(&nam.stats.InputFramesDropped, 1) - } - continue - } - - // Perform blocking CGO operation with panic recovery - var result audioResult - func() { - defer func() { - if r := recover(); r != nil { - nam.logger.Error().Interface("panic", r).Msg("CGO decode write panic recovered") - result = audioResult{ - success: false, - err: errors.New("CGO decode write panic"), - } - } - }() - - // Double-check we're still running before CGO call - if atomic.LoadInt32(&nam.inputWorkerRunning) == 0 { - result = audioResult{success: false, err: errors.New("worker shutting down")} - return - } - - n, err := BatchCGOAudioDecodeWrite(workItem.data) - - result = audioResult{ - success: err == nil, - length: n, - err: err, - } - }() - - // Send result back (non-blocking) - check if coordinator is still running - if atomic.LoadInt32(&nam.inputRunning) == 1 { - select { - case workItem.resultChan <- result: - case <-nam.ctx.Done(): - return - case <-time.After(10 * time.Millisecond): - // Timeout - coordinator may have stopped, drop result - atomic.AddInt64(&nam.stats.InputFramesDropped, 1) - } - } else { - // Coordinator has stopped, drop result - atomic.AddInt64(&nam.stats.InputFramesDropped, 1) - } - - case audioWorkClose: - nam.logger.Debug().Msg("input worker received close signal") - return - } - } - } -} - -// inputCoordinatorThread coordinates audio input without blocking -func (nam *NonBlockingAudioManager) inputCoordinatorThread() { - defer nam.wg.Done() - defer atomic.StoreInt32(&nam.inputRunning, 0) - - nam.logger.Debug().Msg("input coordinator thread started") - - resultChan := make(chan audioResult, 1) - // Do not close resultChan to avoid races with worker sends during shutdown - - for atomic.LoadInt32(&nam.inputRunning) == 1 { - select { - case <-nam.ctx.Done(): - nam.logger.Debug().Msg("input coordinator stopping") - return - - case frame := <-nam.inputReceiveChan: - if len(frame) == 0 { - continue - } - - // Submit work to worker (non-blocking) - if atomic.LoadInt32(&nam.inputWorkerRunning) == 1 { - workItem := audioWorkItem{ - workType: audioWorkDecodeWrite, - data: frame, - resultChan: resultChan, - } - - select { - case nam.inputWorkChan <- workItem: - // Wait for result with timeout and context cancellation - select { - case result := <-resultChan: - if result.success { - atomic.AddInt64(&nam.stats.InputFramesProcessed, 1) - } else { - atomic.AddInt64(&nam.stats.InputFramesDropped, 1) - 
atomic.AddInt64(&nam.stats.WorkerErrors, 1) - if result.err != nil { - nam.logger.Warn().Err(result.err).Msg("audio input worker error") - } - } - case <-nam.ctx.Done(): - nam.logger.Debug().Msg("input coordinator stopping during result wait") - return - case <-time.After(50 * time.Millisecond): - // Timeout waiting for result - atomic.AddInt64(&nam.stats.InputFramesDropped, 1) - nam.logger.Warn().Msg("timeout waiting for input worker result") - // Drain any pending result to prevent worker blocking - select { - case <-resultChan: - default: - } - } - default: - // Worker is busy, drop this frame - atomic.AddInt64(&nam.stats.InputFramesDropped, 1) - } - } - - case <-time.After(250 * time.Millisecond): - // Periodic timeout to prevent blocking - continue - } - } - - // Avoid sending close signals or touching channels here; inputRunning=0 will stop worker via checks - nam.logger.Info().Msg("input coordinator thread stopped") -} - -// Stop stops all audio operations -func (nam *NonBlockingAudioManager) Stop() { - nam.logger.Info().Msg("stopping non-blocking audio manager") - - // Signal all threads to stop - nam.cancel() - - // Stop coordinators - atomic.StoreInt32(&nam.outputRunning, 0) - atomic.StoreInt32(&nam.inputRunning, 0) - - // Wait for all goroutines to finish - nam.wg.Wait() - - // Disable batch processing to free resources - DisableBatchAudioProcessing() - - nam.logger.Info().Msg("non-blocking audio manager stopped") -} - -// StopAudioInput stops only the audio input operations -func (nam *NonBlockingAudioManager) StopAudioInput() { - nam.logger.Info().Msg("stopping audio input") - - // Stop only the input coordinator - atomic.StoreInt32(&nam.inputRunning, 0) - - // Drain the receive channel to prevent blocking senders - go func() { - for { - select { - case <-nam.inputReceiveChan: - // Drain any remaining frames - case <-time.After(100 * time.Millisecond): - return - } - } - }() - - // Wait for the worker to actually stop to prevent race conditions - timeout := time.After(2 * time.Second) - ticker := time.NewTicker(10 * time.Millisecond) - defer ticker.Stop() - - for { - select { - case <-timeout: - nam.logger.Warn().Msg("timeout waiting for input worker to stop") - return - case <-ticker.C: - if atomic.LoadInt32(&nam.inputWorkerRunning) == 0 { - nam.logger.Info().Msg("audio input stopped successfully") - // Close ALSA playback resources now that input worker has stopped - CGOAudioPlaybackClose() - return - } - } - } -} - -// GetStats returns current statistics -func (nam *NonBlockingAudioManager) GetStats() NonBlockingAudioStats { - return NonBlockingAudioStats{ - OutputFramesProcessed: atomic.LoadInt64(&nam.stats.OutputFramesProcessed), - OutputFramesDropped: atomic.LoadInt64(&nam.stats.OutputFramesDropped), - InputFramesProcessed: atomic.LoadInt64(&nam.stats.InputFramesProcessed), - InputFramesDropped: atomic.LoadInt64(&nam.stats.InputFramesDropped), - WorkerErrors: atomic.LoadInt64(&nam.stats.WorkerErrors), - LastProcessTime: nam.stats.LastProcessTime, - } -} - -// IsRunning returns true if any audio operations are running -func (nam *NonBlockingAudioManager) IsRunning() bool { - return atomic.LoadInt32(&nam.outputRunning) == 1 || atomic.LoadInt32(&nam.inputRunning) == 1 -} - -// IsInputRunning returns true if audio input is running -func (nam *NonBlockingAudioManager) IsInputRunning() bool { - return atomic.LoadInt32(&nam.inputRunning) == 1 -} - -// IsOutputRunning returns true if audio output is running -func (nam *NonBlockingAudioManager) IsOutputRunning() bool { - return 
atomic.LoadInt32(&nam.outputRunning) == 1 -} diff --git a/internal/audio/output_streaming.go b/internal/audio/output_streaming.go new file mode 100644 index 0000000..5f7d72c --- /dev/null +++ b/internal/audio/output_streaming.go @@ -0,0 +1,91 @@ +package audio + +import ( + "context" + "sync/atomic" + "time" + + "github.com/jetkvm/kvm/internal/logging" + "github.com/rs/zerolog" +) + +var ( + outputStreamingRunning int32 + outputStreamingCancel context.CancelFunc + outputStreamingLogger *zerolog.Logger +) + +func init() { + logger := logging.GetDefaultLogger().With().Str("component", "audio-output").Logger() + outputStreamingLogger = &logger +} + +// StartAudioOutputStreaming starts audio output streaming (capturing system audio) +func StartAudioOutputStreaming(send func([]byte)) error { + if !atomic.CompareAndSwapInt32(&outputStreamingRunning, 0, 1) { + return ErrAudioAlreadyRunning + } + + // Initialize CGO audio capture + if err := CGOAudioInit(); err != nil { + atomic.StoreInt32(&outputStreamingRunning, 0) + return err + } + + ctx, cancel := context.WithCancel(context.Background()) + outputStreamingCancel = cancel + + // Start audio capture loop + go func() { + defer func() { + CGOAudioClose() + atomic.StoreInt32(&outputStreamingRunning, 0) + outputStreamingLogger.Info().Msg("Audio output streaming stopped") + }() + + outputStreamingLogger.Info().Msg("Audio output streaming started") + buffer := make([]byte, MaxAudioFrameSize) + + for { + select { + case <-ctx.Done(): + return + default: + // Capture audio frame + n, err := CGOAudioReadEncode(buffer) + if err != nil { + outputStreamingLogger.Warn().Err(err).Msg("Failed to read/encode audio") + continue + } + if n > 0 { + // Send frame to callback + frame := make([]byte, n) + copy(frame, buffer[:n]) + send(frame) + RecordFrameReceived(n) + } + // Small delay to prevent busy waiting + time.Sleep(10 * time.Millisecond) + } + } + }() + + return nil +} + +// StopAudioOutputStreaming stops audio output streaming +func StopAudioOutputStreaming() { + if atomic.LoadInt32(&outputStreamingRunning) == 0 { + return + } + + if outputStreamingCancel != nil { + outputStreamingCancel() + outputStreamingCancel = nil + } + + // Wait for streaming to stop + for atomic.LoadInt32(&outputStreamingRunning) == 1 { + time.Sleep(10 * time.Millisecond) + } +} \ No newline at end of file diff --git a/internal/audio/relay.go b/internal/audio/relay.go new file mode 100644 index 0000000..4082747 --- /dev/null +++ b/internal/audio/relay.go @@ -0,0 +1,198 @@ +package audio + +import ( + "context" + "sync" + + "github.com/jetkvm/kvm/internal/logging" + "github.com/pion/webrtc/v4/pkg/media" + "github.com/rs/zerolog" +) + +// AudioRelay handles forwarding audio frames from the audio server subprocess +// to WebRTC without any CGO audio processing. This runs in the main process. 
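+// The relay dials the audio server's Unix socket (AudioClient), reads the
+// length-prefixed Opus frames it emits, and writes them to the WebRTC track
+// as media samples.
+//
+// Minimal usage sketch (assuming an existing *webrtc.TrackLocalStaticSample,
+// which satisfies AudioTrackWriter through its WriteSample method):
+//
+//	relay := NewAudioRelay()
+//	if err := relay.Start(track, GetAudioConfig()); err != nil {
+//		// handle startup error (e.g. the audio server socket is not ready yet)
+//	}
+//	defer relay.Stop()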
+type AudioRelay struct { + client *AudioClient + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + logger *zerolog.Logger + running bool + mutex sync.RWMutex + + // WebRTC integration + audioTrack AudioTrackWriter + config AudioConfig + muted bool + + // Statistics + framesRelayed int64 + framesDropped int64 +} + +// AudioTrackWriter interface for WebRTC audio track +type AudioTrackWriter interface { + WriteSample(sample media.Sample) error +} + + + +// NewAudioRelay creates a new audio relay for the main process +func NewAudioRelay() *AudioRelay { + ctx, cancel := context.WithCancel(context.Background()) + logger := logging.GetDefaultLogger().With().Str("component", "audio-relay").Logger() + + return &AudioRelay{ + ctx: ctx, + cancel: cancel, + logger: &logger, + } +} + +// Start begins the audio relay process +func (r *AudioRelay) Start(audioTrack AudioTrackWriter, config AudioConfig) error { + r.mutex.Lock() + defer r.mutex.Unlock() + + if r.running { + return nil // Already running + } + + // Create audio client to connect to subprocess + client, err := NewAudioClient() + if err != nil { + return err + } + r.client = client + r.audioTrack = audioTrack + r.config = config + + // Start relay goroutine + r.wg.Add(1) + go r.relayLoop() + + r.running = true + r.logger.Info().Msg("Audio relay started") + return nil +} + +// Stop stops the audio relay +func (r *AudioRelay) Stop() { + r.mutex.Lock() + defer r.mutex.Unlock() + + if !r.running { + return + } + + r.cancel() + r.wg.Wait() + + if r.client != nil { + r.client.Close() + r.client = nil + } + + r.running = false + r.logger.Info().Msg("Audio relay stopped") +} + +// SetMuted sets the mute state +func (r *AudioRelay) SetMuted(muted bool) { + r.mutex.Lock() + defer r.mutex.Unlock() + r.muted = muted +} + +// IsMuted returns the current mute state (checks both relay and global mute) +func (r *AudioRelay) IsMuted() bool { + r.mutex.RLock() + defer r.mutex.RUnlock() + return r.muted || IsAudioMuted() +} + +// GetStats returns relay statistics +func (r *AudioRelay) GetStats() (framesRelayed, framesDropped int64) { + r.mutex.RLock() + defer r.mutex.RUnlock() + return r.framesRelayed, r.framesDropped +} + +// UpdateTrack updates the WebRTC audio track for the relay +func (r *AudioRelay) UpdateTrack(audioTrack AudioTrackWriter) { + r.mutex.Lock() + defer r.mutex.Unlock() + r.audioTrack = audioTrack +} + +// relayLoop is the main relay loop that forwards frames from subprocess to WebRTC +func (r *AudioRelay) relayLoop() { + defer r.wg.Done() + r.logger.Debug().Msg("Audio relay loop started") + + for { + select { + case <-r.ctx.Done(): + r.logger.Debug().Msg("Audio relay loop stopping") + return + default: + // Receive frame from audio server subprocess + frame, err := r.client.ReceiveFrame() + if err != nil { + r.logger.Error().Err(err).Msg("Failed to receive audio frame") + r.incrementDropped() + continue + } + + // Forward frame to WebRTC + if err := r.forwardToWebRTC(frame); err != nil { + r.logger.Warn().Err(err).Msg("Failed to forward frame to WebRTC") + r.incrementDropped() + } else { + r.incrementRelayed() + } + } + } +} + +// forwardToWebRTC forwards a frame to the WebRTC audio track +func (r *AudioRelay) forwardToWebRTC(frame []byte) error { + r.mutex.RLock() + audioTrack := r.audioTrack + config := r.config + muted := r.muted + r.mutex.RUnlock() + + if audioTrack == nil { + return nil // No audio track available + } + + // Prepare sample data + var sampleData []byte + if muted { + // Send silence when muted + 
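// A zeroed buffer of the original length keeps the sample cadence while muted.
+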
sampleData = make([]byte, len(frame)) + } else { + sampleData = frame + } + + // Write sample to WebRTC track + return audioTrack.WriteSample(media.Sample{ + Data: sampleData, + Duration: config.FrameSize, + }) +} + +// incrementRelayed atomically increments the relayed frames counter +func (r *AudioRelay) incrementRelayed() { + r.mutex.Lock() + r.framesRelayed++ + r.mutex.Unlock() +} + +// incrementDropped atomically increments the dropped frames counter +func (r *AudioRelay) incrementDropped() { + r.mutex.Lock() + r.framesDropped++ + r.mutex.Unlock() +} \ No newline at end of file diff --git a/internal/audio/relay_api.go b/internal/audio/relay_api.go new file mode 100644 index 0000000..7e25708 --- /dev/null +++ b/internal/audio/relay_api.go @@ -0,0 +1,109 @@ +package audio + +import ( + "sync" +) + +// Global relay instance for the main process +var ( + globalRelay *AudioRelay + relayMutex sync.RWMutex +) + +// StartAudioRelay starts the audio relay system for the main process +// This replaces the CGO-based audio system when running in main process mode +// audioTrack can be nil initially and updated later via UpdateAudioRelayTrack +func StartAudioRelay(audioTrack AudioTrackWriter) error { + relayMutex.Lock() + defer relayMutex.Unlock() + + if globalRelay != nil { + return nil // Already running + } + + // Create new relay + relay := NewAudioRelay() + + // Get current audio config + config := GetAudioConfig() + + // Start the relay (audioTrack can be nil initially) + if err := relay.Start(audioTrack, config); err != nil { + return err + } + + globalRelay = relay + return nil +} + +// StopAudioRelay stops the audio relay system +func StopAudioRelay() { + relayMutex.Lock() + defer relayMutex.Unlock() + + if globalRelay != nil { + globalRelay.Stop() + globalRelay = nil + } +} + +// SetAudioRelayMuted sets the mute state for the audio relay +func SetAudioRelayMuted(muted bool) { + relayMutex.RLock() + defer relayMutex.RUnlock() + + if globalRelay != nil { + globalRelay.SetMuted(muted) + } +} + +// IsAudioRelayMuted returns the current mute state of the audio relay +func IsAudioRelayMuted() bool { + relayMutex.RLock() + defer relayMutex.RUnlock() + + if globalRelay != nil { + return globalRelay.IsMuted() + } + return false +} + +// GetAudioRelayStats returns statistics from the audio relay +func GetAudioRelayStats() (framesRelayed, framesDropped int64) { + relayMutex.RLock() + defer relayMutex.RUnlock() + + if globalRelay != nil { + return globalRelay.GetStats() + } + return 0, 0 +} + +// IsAudioRelayRunning returns whether the audio relay is currently running +func IsAudioRelayRunning() bool { + relayMutex.RLock() + defer relayMutex.RUnlock() + + return globalRelay != nil +} + +// UpdateAudioRelayTrack updates the WebRTC audio track for the relay +func UpdateAudioRelayTrack(audioTrack AudioTrackWriter) error { + relayMutex.Lock() + defer relayMutex.Unlock() + + if globalRelay == nil { + // No relay running, start one with the provided track + relay := NewAudioRelay() + config := GetAudioConfig() + if err := relay.Start(audioTrack, config); err != nil { + return err + } + globalRelay = relay + return nil + } + + // Update the track in the existing relay + globalRelay.UpdateTrack(audioTrack) + return nil +} \ No newline at end of file diff --git a/internal/audio/supervisor.go b/internal/audio/supervisor.go new file mode 100644 index 0000000..3ca3f10 --- /dev/null +++ b/internal/audio/supervisor.go @@ -0,0 +1,400 @@ +//go:build cgo +// +build cgo + +package audio + +import ( + "context" + 
"fmt" + "os" + "os/exec" + "sync" + "sync/atomic" + "syscall" + "time" + + "github.com/jetkvm/kvm/internal/logging" + "github.com/rs/zerolog" +) + +const ( + // Maximum number of restart attempts within the restart window + maxRestartAttempts = 5 + // Time window for counting restart attempts + restartWindow = 5 * time.Minute + // Delay between restart attempts + restartDelay = 2 * time.Second + // Maximum restart delay (exponential backoff) + maxRestartDelay = 30 * time.Second +) + +// AudioServerSupervisor manages the audio server subprocess lifecycle +type AudioServerSupervisor struct { + ctx context.Context + cancel context.CancelFunc + logger *zerolog.Logger + mutex sync.RWMutex + running int32 + + // Process management + cmd *exec.Cmd + processPID int + + // Restart management + restartAttempts []time.Time + lastExitCode int + lastExitTime time.Time + + // Channels for coordination + processDone chan struct{} + stopChan chan struct{} + + // Callbacks + onProcessStart func(pid int) + onProcessExit func(pid int, exitCode int, crashed bool) + onRestart func(attempt int, delay time.Duration) +} + +// NewAudioServerSupervisor creates a new audio server supervisor +func NewAudioServerSupervisor() *AudioServerSupervisor { + ctx, cancel := context.WithCancel(context.Background()) + logger := logging.GetDefaultLogger().With().Str("component", "audio-supervisor").Logger() + + return &AudioServerSupervisor{ + ctx: ctx, + cancel: cancel, + logger: &logger, + processDone: make(chan struct{}), + stopChan: make(chan struct{}), + } +} + +// SetCallbacks sets optional callbacks for process lifecycle events +func (s *AudioServerSupervisor) SetCallbacks( + onStart func(pid int), + onExit func(pid int, exitCode int, crashed bool), + onRestart func(attempt int, delay time.Duration), +) { + s.mutex.Lock() + defer s.mutex.Unlock() + + s.onProcessStart = onStart + s.onProcessExit = onExit + s.onRestart = onRestart +} + +// Start begins supervising the audio server process +func (s *AudioServerSupervisor) Start() error { + if !atomic.CompareAndSwapInt32(&s.running, 0, 1) { + return fmt.Errorf("supervisor already running") + } + + s.logger.Info().Msg("starting audio server supervisor") + + // Start the supervision loop + go s.supervisionLoop() + + return nil +} + +// Stop gracefully stops the audio server and supervisor +func (s *AudioServerSupervisor) Stop() error { + if !atomic.CompareAndSwapInt32(&s.running, 1, 0) { + return nil // Already stopped + } + + s.logger.Info().Msg("stopping audio server supervisor") + + // Signal stop and wait for cleanup + close(s.stopChan) + s.cancel() + + // Wait for process to exit + select { + case <-s.processDone: + s.logger.Info().Msg("audio server process stopped gracefully") + case <-time.After(10 * time.Second): + s.logger.Warn().Msg("audio server process did not stop gracefully, forcing termination") + s.forceKillProcess() + } + + return nil +} + +// IsRunning returns true if the supervisor is running +func (s *AudioServerSupervisor) IsRunning() bool { + return atomic.LoadInt32(&s.running) == 1 +} + +// GetProcessPID returns the current process PID (0 if not running) +func (s *AudioServerSupervisor) GetProcessPID() int { + s.mutex.RLock() + defer s.mutex.RUnlock() + return s.processPID +} + +// GetLastExitInfo returns information about the last process exit +func (s *AudioServerSupervisor) GetLastExitInfo() (exitCode int, exitTime time.Time) { + s.mutex.RLock() + defer s.mutex.RUnlock() + return s.lastExitCode, s.lastExitTime +} + +// supervisionLoop is the main 
supervision loop +func (s *AudioServerSupervisor) supervisionLoop() { + defer func() { + close(s.processDone) + s.logger.Info().Msg("audio server supervision ended") + }() + + for atomic.LoadInt32(&s.running) == 1 { + select { + case <-s.stopChan: + s.logger.Info().Msg("received stop signal") + s.terminateProcess() + return + case <-s.ctx.Done(): + s.logger.Info().Msg("context cancelled") + s.terminateProcess() + return + default: + // Start or restart the process + if err := s.startProcess(); err != nil { + s.logger.Error().Err(err).Msg("failed to start audio server process") + + // Check if we should attempt restart + if !s.shouldRestart() { + s.logger.Error().Msg("maximum restart attempts exceeded, stopping supervisor") + return + } + + delay := s.calculateRestartDelay() + s.logger.Warn().Dur("delay", delay).Msg("retrying process start after delay") + + if s.onRestart != nil { + s.onRestart(len(s.restartAttempts), delay) + } + + select { + case <-time.After(delay): + case <-s.stopChan: + return + case <-s.ctx.Done(): + return + } + continue + } + + // Wait for process to exit + s.waitForProcessExit() + + // Check if we should restart + if !s.shouldRestart() { + s.logger.Error().Msg("maximum restart attempts exceeded, stopping supervisor") + return + } + + // Calculate restart delay + delay := s.calculateRestartDelay() + s.logger.Info().Dur("delay", delay).Msg("restarting audio server process after delay") + + if s.onRestart != nil { + s.onRestart(len(s.restartAttempts), delay) + } + + // Wait for restart delay + select { + case <-time.After(delay): + case <-s.stopChan: + return + case <-s.ctx.Done(): + return + } + } + } +} + +// startProcess starts the audio server process +func (s *AudioServerSupervisor) startProcess() error { + execPath, err := os.Executable() + if err != nil { + return fmt.Errorf("failed to get executable path: %w", err) + } + + s.mutex.Lock() + defer s.mutex.Unlock() + + // Create new command + s.cmd = exec.CommandContext(s.ctx, execPath, "--audio-server") + s.cmd.Stdout = os.Stdout + s.cmd.Stderr = os.Stderr + + // Start the process + if err := s.cmd.Start(); err != nil { + return fmt.Errorf("failed to start process: %w", err) + } + + s.processPID = s.cmd.Process.Pid + s.logger.Info().Int("pid", s.processPID).Msg("audio server process started") + + if s.onProcessStart != nil { + s.onProcessStart(s.processPID) + } + + return nil +} + +// waitForProcessExit waits for the current process to exit and logs the result +func (s *AudioServerSupervisor) waitForProcessExit() { + s.mutex.RLock() + cmd := s.cmd + pid := s.processPID + s.mutex.RUnlock() + + if cmd == nil { + return + } + + // Wait for process to exit + err := cmd.Wait() + + s.mutex.Lock() + s.lastExitTime = time.Now() + s.processPID = 0 + + var exitCode int + var crashed bool + + if err != nil { + if exitError, ok := err.(*exec.ExitError); ok { + exitCode = exitError.ExitCode() + crashed = exitCode != 0 + } else { + // Process was killed or other error + exitCode = -1 + crashed = true + } + } else { + exitCode = 0 + crashed = false + } + + s.lastExitCode = exitCode + s.mutex.Unlock() + + if crashed { + s.logger.Error().Int("pid", pid).Int("exit_code", exitCode).Msg("audio server process crashed") + s.recordRestartAttempt() + } else { + s.logger.Info().Int("pid", pid).Msg("audio server process exited gracefully") + } + + if s.onProcessExit != nil { + s.onProcessExit(pid, exitCode, crashed) + } +} + +// terminateProcess gracefully terminates the current process +func (s *AudioServerSupervisor) terminateProcess() 
{ + s.mutex.RLock() + cmd := s.cmd + pid := s.processPID + s.mutex.RUnlock() + + if cmd == nil || cmd.Process == nil { + return + } + + s.logger.Info().Int("pid", pid).Msg("terminating audio server process") + + // Send SIGTERM first + if err := cmd.Process.Signal(syscall.SIGTERM); err != nil { + s.logger.Warn().Err(err).Int("pid", pid).Msg("failed to send SIGTERM") + } + + // Wait for graceful shutdown + done := make(chan struct{}) + go func() { + cmd.Wait() + close(done) + }() + + select { + case <-done: + s.logger.Info().Int("pid", pid).Msg("audio server process terminated gracefully") + case <-time.After(5 * time.Second): + s.logger.Warn().Int("pid", pid).Msg("process did not terminate gracefully, sending SIGKILL") + s.forceKillProcess() + } +} + +// forceKillProcess forcefully kills the current process +func (s *AudioServerSupervisor) forceKillProcess() { + s.mutex.RLock() + cmd := s.cmd + pid := s.processPID + s.mutex.RUnlock() + + if cmd == nil || cmd.Process == nil { + return + } + + s.logger.Warn().Int("pid", pid).Msg("force killing audio server process") + if err := cmd.Process.Kill(); err != nil { + s.logger.Error().Err(err).Int("pid", pid).Msg("failed to kill process") + } +} + +// shouldRestart determines if the process should be restarted +func (s *AudioServerSupervisor) shouldRestart() bool { + if atomic.LoadInt32(&s.running) == 0 { + return false // Supervisor is stopping + } + + s.mutex.RLock() + defer s.mutex.RUnlock() + + // Clean up old restart attempts outside the window + now := time.Now() + var recentAttempts []time.Time + for _, attempt := range s.restartAttempts { + if now.Sub(attempt) < restartWindow { + recentAttempts = append(recentAttempts, attempt) + } + } + s.restartAttempts = recentAttempts + + return len(s.restartAttempts) < maxRestartAttempts +} + +// recordRestartAttempt records a restart attempt +func (s *AudioServerSupervisor) recordRestartAttempt() { + s.mutex.Lock() + defer s.mutex.Unlock() + + s.restartAttempts = append(s.restartAttempts, time.Now()) +} + +// calculateRestartDelay calculates the delay before next restart attempt +func (s *AudioServerSupervisor) calculateRestartDelay() time.Duration { + s.mutex.RLock() + defer s.mutex.RUnlock() + + // Exponential backoff based on recent restart attempts + attempts := len(s.restartAttempts) + if attempts == 0 { + return restartDelay + } + + // Calculate exponential backoff: 2^attempts * base delay + delay := restartDelay + for i := 0; i < attempts && delay < maxRestartDelay; i++ { + delay *= 2 + } + + if delay > maxRestartDelay { + delay = maxRestartDelay + } + + return delay +} diff --git a/main.go b/main.go index 4853712..bdbe7df 100644 --- a/main.go +++ b/main.go @@ -2,6 +2,8 @@ package kvm import ( "context" + "flag" + "fmt" "net/http" "os" "os/signal" @@ -10,12 +12,130 @@ import ( "github.com/gwatts/rootcerts" "github.com/jetkvm/kvm/internal/audio" - "github.com/pion/webrtc/v4/pkg/media" ) -var appCtx context.Context +var ( + appCtx context.Context + isAudioServer bool + audioProcessDone chan struct{} + audioSupervisor *audio.AudioServerSupervisor +) + +func init() { + flag.BoolVar(&isAudioServer, "audio-server", false, "Run as audio server subprocess") + audioProcessDone = make(chan struct{}) +} + +func runAudioServer() { + logger.Info().Msg("Starting audio server subprocess") + + // Create audio server + server, err := audio.NewAudioServer() + if err != nil { + logger.Error().Err(err).Msg("failed to create audio server") + os.Exit(1) + } + defer server.Close() + + // Start accepting connections 
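+ // Start blocks in Accept until the main process's AudioClient dials the socket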
+ if err := server.Start(); err != nil { + logger.Error().Err(err).Msg("failed to start audio server") + os.Exit(1) + } + + // Initialize audio processing + err = audio.StartNonBlockingAudioStreaming(func(frame []byte) { + if err := server.SendFrame(frame); err != nil { + logger.Warn().Err(err).Msg("failed to send audio frame") + audio.RecordFrameDropped() + } + }) + if err != nil { + logger.Error().Err(err).Msg("failed to start audio processing") + os.Exit(1) + } + + // Wait for termination signal + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + <-sigs + + // Cleanup + audio.StopNonBlockingAudioStreaming() + logger.Info().Msg("Audio server subprocess stopped") +} + +func startAudioSubprocess() error { + // Create audio server supervisor + audioSupervisor = audio.NewAudioServerSupervisor() + + // Set up callbacks for process lifecycle events + audioSupervisor.SetCallbacks( + // onProcessStart + func(pid int) { + logger.Info().Int("pid", pid).Msg("audio server process started") + + // Start audio relay system for main process without a track initially + // The track will be updated when a WebRTC session is created + if err := audio.StartAudioRelay(nil); err != nil { + logger.Error().Err(err).Msg("failed to start audio relay") + } + }, + // onProcessExit + func(pid int, exitCode int, crashed bool) { + if crashed { + logger.Error().Int("pid", pid).Int("exit_code", exitCode).Msg("audio server process crashed") + } else { + logger.Info().Int("pid", pid).Msg("audio server process exited gracefully") + } + + // Stop audio relay when process exits + audio.StopAudioRelay() + }, + // onRestart + func(attempt int, delay time.Duration) { + logger.Warn().Int("attempt", attempt).Dur("delay", delay).Msg("restarting audio server process") + }, + ) + + // Start the supervisor + if err := audioSupervisor.Start(); err != nil { + return fmt.Errorf("failed to start audio supervisor: %w", err) + } + + // Monitor supervisor and handle cleanup + go func() { + defer close(audioProcessDone) + + // Wait for supervisor to stop + for audioSupervisor.IsRunning() { + time.Sleep(100 * time.Millisecond) + } + + logger.Info().Msg("audio supervisor stopped") + }() + + return nil +} func Main() { + flag.Parse() + + // If running as audio server, only initialize audio processing + if isAudioServer { + runAudioServer() + return + } + + // If running as audio input server, only initialize audio input processing + if audio.IsAudioInputServerProcess() { + err := audio.RunAudioInputServer() + if err != nil { + logger.Error().Err(err).Msg("audio input server failed") + os.Exit(1) + } + return + } LoadConfig() var cancel context.CancelFunc @@ -80,30 +200,10 @@ func Main() { // initialize usb gadget initUsbGadget() - // Start non-blocking audio streaming and deliver Opus frames to WebRTC - err = audio.StartNonBlockingAudioStreaming(func(frame []byte) { - // Deliver Opus frame to WebRTC audio track if session is active - if currentSession != nil { - config := audio.GetAudioConfig() - var sampleData []byte - if audio.IsAudioMuted() { - sampleData = make([]byte, len(frame)) // silence - } else { - sampleData = frame - } - if err := currentSession.AudioTrack.WriteSample(media.Sample{ - Data: sampleData, - Duration: config.FrameSize, - }); err != nil { - logger.Warn().Err(err).Msg("error writing audio sample") - audio.RecordFrameDropped() - } - } else { - audio.RecordFrameDropped() - } - }) + // Start audio subprocess + err = startAudioSubprocess() if err != nil { - 
logger.Warn().Err(err).Msg("failed to start non-blocking audio streaming") + logger.Warn().Err(err).Msg("failed to start audio subprocess") } // Initialize session provider for audio events @@ -163,8 +263,18 @@ func Main() { <-sigs logger.Info().Msg("JetKVM Shutting Down") - // Stop non-blocking audio manager - audio.StopNonBlockingAudioStreaming() + // Stop audio subprocess and wait for cleanup + if !isAudioServer { + if audioSupervisor != nil { + logger.Info().Msg("stopping audio supervisor") + if err := audioSupervisor.Stop(); err != nil { + logger.Error().Err(err).Msg("failed to stop audio supervisor") + } + } + <-audioProcessDone + } else { + audio.StopNonBlockingAudioStreaming() + } //if fuseServer != nil { // err := setMassStorageImage(" ") // if err != nil { diff --git a/resource/dev_test.sh b/resource/dev_test.sh old mode 100644 new mode 100755 index 0497801..7451b50 --- a/resource/dev_test.sh +++ b/resource/dev_test.sh @@ -1,4 +1,4 @@ -#!/bin/sh +#!/bin/bash JSON_OUTPUT=false GET_COMMANDS=false if [ "$1" = "-json" ]; then diff --git a/tools/build_audio_deps.sh b/tools/build_audio_deps.sh old mode 100644 new mode 100755 diff --git a/tools/setup_rv1106_toolchain.sh b/tools/setup_rv1106_toolchain.sh old mode 100644 new mode 100755 diff --git a/web.go b/web.go index 4bed6b5..b419472 100644 --- a/web.go +++ b/web.go @@ -173,6 +173,8 @@ func setupRouter() *gin.Engine { return } audio.SetAudioMuted(req.Muted) + // Also set relay mute state if in main process + audio.SetAudioRelayMuted(req.Muted) // Broadcast audio mute state change via WebSocket broadcaster := audio.GetAudioEventBroadcaster() @@ -286,7 +288,7 @@ func setupRouter() *gin.Engine { // Optimized server-side cooldown using atomic operations opResult := audio.TryMicrophoneOperation() if !opResult.Allowed { - running := currentSession.AudioInputManager.IsRunning() || audio.IsNonBlockingAudioInputRunning() + running := currentSession.AudioInputManager.IsRunning() c.JSON(200, gin.H{ "status": "cooldown", "running": running, @@ -297,7 +299,7 @@ func setupRouter() *gin.Engine { } // Check if already running before attempting to start - if currentSession.AudioInputManager.IsRunning() || audio.IsNonBlockingAudioInputRunning() { + if currentSession.AudioInputManager.IsRunning() { c.JSON(200, gin.H{ "status": "already running", "running": true, @@ -312,7 +314,7 @@ func setupRouter() *gin.Engine { // Check if it's already running after the failed start attempt // This handles race conditions where another request started it - if currentSession.AudioInputManager.IsRunning() || audio.IsNonBlockingAudioInputRunning() { + if currentSession.AudioInputManager.IsRunning() { c.JSON(200, gin.H{ "status": "started by concurrent request", "running": true, @@ -348,7 +350,7 @@ func setupRouter() *gin.Engine { // Optimized server-side cooldown using atomic operations opResult := audio.TryMicrophoneOperation() if !opResult.Allowed { - running := currentSession.AudioInputManager.IsRunning() || audio.IsNonBlockingAudioInputRunning() + running := currentSession.AudioInputManager.IsRunning() c.JSON(200, gin.H{ "status": "cooldown", "running": running, @@ -359,7 +361,7 @@ func setupRouter() *gin.Engine { } // Check if already stopped before attempting to stop - if !currentSession.AudioInputManager.IsRunning() && !audio.IsNonBlockingAudioInputRunning() { + if !currentSession.AudioInputManager.IsRunning() { c.JSON(200, gin.H{ "status": "already stopped", "running": false, @@ -369,7 +371,7 @@ func setupRouter() *gin.Engine { 
currentSession.AudioInputManager.Stop() - // AudioInputManager.Stop() already coordinates a clean stop via StopNonBlockingAudioInput() + // AudioInputManager.Stop() already coordinates a clean stop via IPC audio input system // so we don't need to call it again here // Broadcast microphone state change via WebSocket @@ -437,9 +439,8 @@ func setupRouter() *gin.Engine { logger.Info().Msg("forcing microphone state reset") - // Force stop both the AudioInputManager and NonBlockingAudioManager + // Force stop the AudioInputManager currentSession.AudioInputManager.Stop() - audio.StopNonBlockingAudioInput() // Wait a bit to ensure everything is stopped time.Sleep(100 * time.Millisecond) @@ -449,9 +450,8 @@ func setupRouter() *gin.Engine { broadcaster.BroadcastMicrophoneStateChanged(false, true) c.JSON(200, gin.H{ - "status": "reset", - "audio_input_running": currentSession.AudioInputManager.IsRunning(), - "nonblocking_input_running": audio.IsNonBlockingAudioInputRunning(), + "status": "reset", + "audio_input_running": currentSession.AudioInputManager.IsRunning(), }) }) diff --git a/webrtc.go b/webrtc.go index a8c9360..a44f57e 100644 --- a/webrtc.go +++ b/webrtc.go @@ -30,10 +30,15 @@ type Session struct { AudioInputManager *audio.AudioInputManager shouldUmountVirtualMedia bool - // Microphone operation cooldown to mitigate rapid start/stop races - micOpMu sync.Mutex - lastMicOp time.Time - micCooldown time.Duration + // Microphone operation throttling + micOpMu sync.Mutex + lastMicOp time.Time + micCooldown time.Duration + + // Audio frame processing + audioFrameChan chan []byte + audioStopChan chan struct{} + audioWg sync.WaitGroup } type SessionConfig struct { @@ -118,8 +123,14 @@ func newSession(config SessionConfig) (*Session, error) { session := &Session{ peerConnection: peerConnection, AudioInputManager: audio.NewAudioInputManager(), + micCooldown: 100 * time.Millisecond, + audioFrameChan: make(chan []byte, 1000), + audioStopChan: make(chan struct{}), } + // Start audio processing goroutine + session.startAudioProcessor(*logger) + peerConnection.OnDataChannel(func(d *webrtc.DataChannel) { scopedLogger.Info().Str("label", d.Label()).Uint16("id", *d.ID()).Msg("New DataChannel") switch d.Label() { @@ -155,6 +166,11 @@ func newSession(config SessionConfig) (*Session, error) { return nil, err } + // Update the audio relay with the new WebRTC audio track + if err := audio.UpdateAudioRelayTrack(session.AudioTrack); err != nil { + scopedLogger.Warn().Err(err).Msg("Failed to update audio relay track") + } + videoRtpSender, err := peerConnection.AddTrack(session.VideoTrack) if err != nil { return nil, err @@ -190,10 +206,14 @@ func newSession(config SessionConfig) (*Session, error) { // Extract Opus payload from RTP packet opusPayload := rtpPacket.Payload - if len(opusPayload) > 0 && session.AudioInputManager != nil { - err := session.AudioInputManager.WriteOpusFrame(opusPayload) - if err != nil { - scopedLogger.Warn().Err(err).Msg("Failed to write Opus frame to audio input manager") + if len(opusPayload) > 0 { + // Send to buffered channel for processing + select { + case session.audioFrameChan <- opusPayload: + // Frame sent successfully + default: + // Channel is full, drop the frame + scopedLogger.Warn().Msg("Audio frame channel full, dropping frame") } } } @@ -245,7 +265,8 @@ func newSession(config SessionConfig) (*Session, error) { err := rpcUnmountImage() scopedLogger.Warn().Err(err).Msg("unmount image failed on connection close") } - // Stop audio input manager + // Stop audio processing and 
input manager + session.stopAudioProcessor() if session.AudioInputManager != nil { session.AudioInputManager.Stop() } @@ -262,6 +283,36 @@ func newSession(config SessionConfig) (*Session, error) { return session, nil } +// startAudioProcessor starts the dedicated audio processing goroutine +func (s *Session) startAudioProcessor(logger zerolog.Logger) { + s.audioWg.Add(1) + go func() { + defer s.audioWg.Done() + logger.Debug().Msg("Audio processor goroutine started") + + for { + select { + case frame := <-s.audioFrameChan: + if s.AudioInputManager != nil { + err := s.AudioInputManager.WriteOpusFrame(frame) + if err != nil { + logger.Warn().Err(err).Msg("Failed to write Opus frame to audio input manager") + } + } + case <-s.audioStopChan: + logger.Debug().Msg("Audio processor goroutine stopping") + return + } + } + }() +} + +// stopAudioProcessor stops the audio processing goroutine +func (s *Session) stopAudioProcessor() { + close(s.audioStopChan) + s.audioWg.Wait() +} + func drainRtpSender(rtpSender *webrtc.RTPSender) { // Lock to OS thread to isolate RTCP processing runtime.LockOSThread()