diff --git a/cloud.go b/cloud.go index cec749e..ecb89b6 100644 --- a/cloud.go +++ b/cloud.go @@ -447,35 +447,76 @@ func handleSessionRequest( } } - session, err := newSession(SessionConfig{ - ws: c, - IsCloud: isCloudConnection, - LocalIP: req.IP, - ICEServers: req.ICEServers, - Logger: scopedLogger, - }) - if err != nil { - _ = wsjson.Write(context.Background(), c, gin.H{"error": err}) - return err - } + var session *Session + var err error + var sd string - sd, err := session.ExchangeOffer(req.Sd) - if err != nil { - _ = wsjson.Write(context.Background(), c, gin.H{"error": err}) - return err - } + // Check if we have an existing session and handle renegotiation if currentSession != nil { - writeJSONRPCEvent("otherSessionConnected", nil, currentSession) - peerConn := currentSession.peerConnection - go func() { - time.Sleep(1 * time.Second) - _ = peerConn.Close() - }() + scopedLogger.Info().Msg("handling renegotiation for existing session") + + // Handle renegotiation with existing session + sd, err = currentSession.ExchangeOffer(req.Sd) + if err != nil { + scopedLogger.Warn().Err(err).Msg("renegotiation failed, creating new session") + // If renegotiation fails, fall back to creating a new session + session, err = newSession(SessionConfig{ + ws: c, + IsCloud: isCloudConnection, + LocalIP: req.IP, + ICEServers: req.ICEServers, + Logger: scopedLogger, + }) + if err != nil { + _ = wsjson.Write(context.Background(), c, gin.H{"error": err}) + return err + } + + sd, err = session.ExchangeOffer(req.Sd) + if err != nil { + _ = wsjson.Write(context.Background(), c, gin.H{"error": err}) + return err + } + + // Close the old session + writeJSONRPCEvent("otherSessionConnected", nil, currentSession) + peerConn := currentSession.peerConnection + go func() { + time.Sleep(1 * time.Second) + _ = peerConn.Close() + }() + + currentSession = session + cloudLogger.Info().Interface("session", session).Msg("new session created after renegotiation failure") + } else { + scopedLogger.Info().Msg("renegotiation successful") + } + } else { + // No existing session, create a new one + scopedLogger.Info().Msg("creating new session") + session, err = newSession(SessionConfig{ + ws: c, + IsCloud: isCloudConnection, + LocalIP: req.IP, + ICEServers: req.ICEServers, + Logger: scopedLogger, + }) + if err != nil { + _ = wsjson.Write(context.Background(), c, gin.H{"error": err}) + return err + } + + sd, err = session.ExchangeOffer(req.Sd) + if err != nil { + _ = wsjson.Write(context.Background(), c, gin.H{"error": err}) + return err + } + + currentSession = session + cloudLogger.Info().Interface("session", session).Msg("new session accepted") + cloudLogger.Trace().Interface("session", session).Msg("new session accepted") } - cloudLogger.Info().Interface("session", session).Msg("new session accepted") - cloudLogger.Trace().Interface("session", session).Msg("new session accepted") - currentSession = session _ = wsjson.Write(context.Background(), c, gin.H{"type": "answer", "data": sd}) return nil } diff --git a/internal/audio/api.go b/internal/audio/api.go index 2cb60b8..cbdb925 100644 --- a/internal/audio/api.go +++ b/internal/audio/api.go @@ -1,11 +1,13 @@ package audio // StartAudioStreaming launches the in-process audio stream and delivers Opus frames to the provided callback. +// This is now a wrapper around the non-blocking audio implementation for backward compatibility. 
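The renegotiation path above is easier to see stripped of logging and transport plumbing. A condensed sketch of the control flow (the helper name `negotiate` and the `fresh` factory are ours, standing in for the `newSession(SessionConfig{...})` call in the diff):

```go
// Sketch: reuse the existing session when the peer renegotiates,
// otherwise (or on renegotiation failure) build a fresh one and swap it in.
func negotiate(current *Session, offer string, fresh func() (*Session, error)) (*Session, string, error) {
	if current != nil {
		if sd, err := current.ExchangeOffer(offer); err == nil {
			return current, sd, nil // renegotiation succeeded in place
		}
		// Renegotiation failed: fall through and replace the session.
	}
	next, err := fresh()
	if err != nil {
		return nil, "", err
	}
	sd, err := next.ExchangeOffer(offer)
	if err != nil {
		return nil, "", err
	}
	return next, sd, nil
}
```

The caller still owns the side effects the diff performs: notifying the old session via `otherSessionConnected`, closing its peer connection after a one-second grace period, and assigning `currentSession`.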
func StartAudioStreaming(send func([]byte)) error { - return StartCGOAudioStream(send) + return StartNonBlockingAudioStreaming(send) } // StopAudioStreaming stops the in-process audio stream. +// This is now a wrapper around the non-blocking audio implementation for backward compatibility. func StopAudioStreaming() { - StopCGOAudioStream() + StopNonBlockingAudioStreaming() } diff --git a/internal/audio/audio.go b/internal/audio/audio.go index 555e31f..220cdad 100644 --- a/internal/audio/audio.go +++ b/internal/audio/audio.go @@ -1,11 +1,16 @@ package audio import ( + "errors" "sync/atomic" "time" // Explicit import for CGO audio stream glue ) +var ( + ErrAudioAlreadyRunning = errors.New("audio already running") +) + const MaxAudioFrameSize = 1500 // AudioQuality represents different audio quality presets @@ -46,6 +51,13 @@ var ( Channels: 2, FrameSize: 20 * time.Millisecond, } + currentMicrophoneConfig = AudioConfig{ + Quality: AudioQualityMedium, + Bitrate: 32, + SampleRate: 48000, + Channels: 1, + FrameSize: 20 * time.Millisecond, + } metrics AudioMetrics ) @@ -55,14 +67,14 @@ func GetAudioQualityPresets() map[AudioQuality]AudioConfig { AudioQualityLow: { Quality: AudioQualityLow, Bitrate: 32, - SampleRate: 48000, - Channels: 2, - FrameSize: 20 * time.Millisecond, + SampleRate: 22050, + Channels: 1, + FrameSize: 40 * time.Millisecond, }, AudioQualityMedium: { Quality: AudioQualityMedium, Bitrate: 64, - SampleRate: 48000, + SampleRate: 44100, Channels: 2, FrameSize: 20 * time.Millisecond, }, @@ -75,7 +87,7 @@ func GetAudioQualityPresets() map[AudioQuality]AudioConfig { }, AudioQualityUltra: { Quality: AudioQualityUltra, - Bitrate: 256, + Bitrate: 192, SampleRate: 48000, Channels: 2, FrameSize: 10 * time.Millisecond, @@ -83,6 +95,40 @@ func GetAudioQualityPresets() map[AudioQuality]AudioConfig { } } +// GetMicrophoneQualityPresets returns predefined quality configurations for microphone input +func GetMicrophoneQualityPresets() map[AudioQuality]AudioConfig { + return map[AudioQuality]AudioConfig{ + AudioQualityLow: { + Quality: AudioQualityLow, + Bitrate: 16, + SampleRate: 16000, + Channels: 1, + FrameSize: 40 * time.Millisecond, + }, + AudioQualityMedium: { + Quality: AudioQualityMedium, + Bitrate: 32, + SampleRate: 22050, + Channels: 1, + FrameSize: 20 * time.Millisecond, + }, + AudioQualityHigh: { + Quality: AudioQualityHigh, + Bitrate: 64, + SampleRate: 44100, + Channels: 1, + FrameSize: 20 * time.Millisecond, + }, + AudioQualityUltra: { + Quality: AudioQualityUltra, + Bitrate: 96, + SampleRate: 48000, + Channels: 1, + FrameSize: 10 * time.Millisecond, + }, + } +} + // SetAudioQuality updates the current audio quality configuration func SetAudioQuality(quality AudioQuality) { presets := GetAudioQualityPresets() @@ -96,6 +142,19 @@ func GetAudioConfig() AudioConfig { return currentConfig } +// SetMicrophoneQuality updates the current microphone quality configuration +func SetMicrophoneQuality(quality AudioQuality) { + presets := GetMicrophoneQualityPresets() + if config, exists := presets[quality]; exists { + currentMicrophoneConfig = config + } +} + +// GetMicrophoneConfig returns the current microphone configuration +func GetMicrophoneConfig() AudioConfig { + return currentMicrophoneConfig +} + // GetAudioMetrics returns current audio metrics func GetAudioMetrics() AudioMetrics { return AudioMetrics{ diff --git a/internal/audio/cgo_audio.go b/internal/audio/cgo_audio.go index ab5825e..f65cba0 100644 --- a/internal/audio/cgo_audio.go +++ b/internal/audio/cgo_audio.go @@ -1,15 +1,8 @@ 
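Before the CGO layer below, it is worth pinning down what these presets imply per frame: encoded payload is roughly bitrate × frame duration, so the 64 kbps / 20 ms medium preset budgets about 160 bytes of Opus per frame, and the 16 kbps / 40 ms microphone low preset about 80 bytes. A small self-contained check (the helper name is ours):

```go
package main

import (
	"fmt"
	"time"
)

// bytesPerFrame converts a preset's bitrate (kbps) and frame duration into
// the approximate encoded payload per frame: kbit/s * ms gives bits, /8 bytes.
func bytesPerFrame(bitrateKbps int, frame time.Duration) int {
	return bitrateKbps * int(frame/time.Millisecond) / 8
}

func main() {
	fmt.Println(bytesPerFrame(64, 20*time.Millisecond)) // 160 (output medium)
	fmt.Println(bytesPerFrame(16, 40*time.Millisecond)) // 80  (microphone low)
}
```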
-//go:build linux && arm -// +build linux,arm - package audio import ( "errors" - "sync/atomic" - "time" "unsafe" - - "github.com/jetkvm/kvm/internal/logging" ) /* #include <stdlib.h> #include <string.h> #include <alsa/asoundlib.h> #include <opus.h> +#include <errno.h> // C state for ALSA/Opus static snd_pcm_t *pcm_handle = NULL; +static snd_pcm_t *pcm_playback_handle = NULL; static OpusEncoder *encoder = NULL; +static OpusDecoder *decoder = NULL; static int opus_bitrate = 64000; static int opus_complexity = 5; static int sample_rate = 48000; @@ -58,21 +54,101 @@ int jetkvm_audio_read_encode(void *opus_buf) { short pcm_buffer[1920]; // max 2ch*960 unsigned char *out = (unsigned char*)opus_buf; int pcm_rc = snd_pcm_readi(pcm_handle, pcm_buffer, frame_size); - if (pcm_rc < 0) return -1; + + // Handle ALSA errors with recovery + if (pcm_rc < 0) { + if (pcm_rc == -EPIPE) { + // Buffer underrun - try to recover + snd_pcm_prepare(pcm_handle); + pcm_rc = snd_pcm_readi(pcm_handle, pcm_buffer, frame_size); + if (pcm_rc < 0) return -1; + } else if (pcm_rc == -EAGAIN) { + // No data available - return 0 to indicate no frame + return 0; + } else { + // Other error - return error code + return -1; + } + } + + // If we got fewer frames than expected, pad with silence + if (pcm_rc < frame_size) { + memset(&pcm_buffer[pcm_rc * channels], 0, (frame_size - pcm_rc) * channels * sizeof(short)); + } + int nb_bytes = opus_encode(encoder, pcm_buffer, frame_size, out, max_packet_size); return nb_bytes; } +// Initialize ALSA playback for microphone input (browser -> USB gadget) +int jetkvm_audio_playback_init() { + int err; + snd_pcm_hw_params_t *params; + if (pcm_playback_handle) return 0; + + // Try to open the USB gadget audio device for playback + // This should correspond to the capture endpoint of the USB gadget + if (snd_pcm_open(&pcm_playback_handle, "hw:1,0", SND_PCM_STREAM_PLAYBACK, 0) < 0) { + // Fallback to default device if hw:1,0 doesn't work for playback + if (snd_pcm_open(&pcm_playback_handle, "default", SND_PCM_STREAM_PLAYBACK, 0) < 0) + return -1; + } + + snd_pcm_hw_params_malloc(&params); + snd_pcm_hw_params_any(pcm_playback_handle, params); + snd_pcm_hw_params_set_access(pcm_playback_handle, params, SND_PCM_ACCESS_RW_INTERLEAVED); + snd_pcm_hw_params_set_format(pcm_playback_handle, params, SND_PCM_FORMAT_S16_LE); + snd_pcm_hw_params_set_channels(pcm_playback_handle, params, channels); + snd_pcm_hw_params_set_rate(pcm_playback_handle, params, sample_rate, 0); + snd_pcm_hw_params_set_period_size(pcm_playback_handle, params, frame_size, 0); + snd_pcm_hw_params(pcm_playback_handle, params); + snd_pcm_hw_params_free(params); + snd_pcm_prepare(pcm_playback_handle); + + // Initialize Opus decoder + decoder = opus_decoder_create(sample_rate, channels, &err); + if (!decoder) return -2; + + return 0; +} + +// Decode Opus and write PCM to playback device +int jetkvm_audio_decode_write(void *opus_buf, int opus_size) { + short pcm_buffer[1920]; // max 2ch*960 + unsigned char *in = (unsigned char*)opus_buf; + + // Decode Opus to PCM + int pcm_frames = opus_decode(decoder, in, opus_size, pcm_buffer, frame_size, 0); + if (pcm_frames < 0) return -1; + + // Write PCM to playback device + int pcm_rc = snd_pcm_writei(pcm_playback_handle, pcm_buffer, pcm_frames); + if (pcm_rc < 0) { + // Try to recover from underrun + if (pcm_rc == -EPIPE) { + snd_pcm_prepare(pcm_playback_handle); + pcm_rc = snd_pcm_writei(pcm_playback_handle, pcm_buffer, pcm_frames); + } + if (pcm_rc < 0) return -2; + } + + return pcm_frames; +} + +void jetkvm_audio_playback_close() { + if
(decoder) { opus_decoder_destroy(decoder); decoder = NULL; } + if (pcm_playback_handle) { snd_pcm_close(pcm_playback_handle); pcm_playback_handle = NULL; } +} + void jetkvm_audio_close() { if (encoder) { opus_encoder_destroy(encoder); encoder = NULL; } if (pcm_handle) { snd_pcm_close(pcm_handle); pcm_handle = NULL; } + jetkvm_audio_playback_close(); } */ import "C" -var ( - audioStreamRunning int32 -) + // Go wrappers for initializing, starting, stopping, and controlling audio func cgoAudioInit() error { @@ -96,62 +172,63 @@ func cgoAudioReadEncode(buf []byte) (int, error) { if n < 0 { return 0, errors.New("audio read/encode error") } + if n == 0 { + // No data available - this is not an error, just no audio frame + return 0, nil + } return int(n), nil } -func StartCGOAudioStream(send func([]byte)) error { - if !atomic.CompareAndSwapInt32(&audioStreamRunning, 0, 1) { - return errors.New("audio stream already running") + + +// Go wrappers for audio playback (microphone input) +func cgoAudioPlaybackInit() error { + ret := C.jetkvm_audio_playback_init() + if ret != 0 { + return errors.New("failed to init ALSA playback/Opus decoder") } - go func() { - defer atomic.StoreInt32(&audioStreamRunning, 0) - logger := logging.GetDefaultLogger().With().Str("component", "audio").Logger() - err := cgoAudioInit() - if err != nil { - logger.Error().Err(err).Msg("cgoAudioInit failed") - return - } - defer cgoAudioClose() - buf := make([]byte, 1500) - errorCount := 0 - for atomic.LoadInt32(&audioStreamRunning) == 1 { - m := IsAudioMuted() - // (debug) logger.Debug().Msgf("audio loop: IsAudioMuted=%v", m) - if m { - time.Sleep(20 * time.Millisecond) - continue - } - n, err := cgoAudioReadEncode(buf) - if err != nil { - logger.Warn().Err(err).Msg("cgoAudioReadEncode error") - RecordFrameDropped() - errorCount++ - if errorCount >= 10 { - logger.Warn().Msg("Too many audio read errors, reinitializing ALSA/Opus state") - cgoAudioClose() - time.Sleep(100 * time.Millisecond) - if err := cgoAudioInit(); err != nil { - logger.Error().Err(err).Msg("cgoAudioInit failed during recovery") - time.Sleep(500 * time.Millisecond) - continue - } - errorCount = 0 - } else { - time.Sleep(5 * time.Millisecond) - } - continue - } - errorCount = 0 - // (debug) logger.Debug().Msgf("frame encoded: %d bytes", n) - RecordFrameReceived(n) - send(buf[:n]) - } - logger.Info().Msg("audio loop exited") - }() return nil } -// StopCGOAudioStream signals the audio stream goroutine to stop -func StopCGOAudioStream() { - atomic.StoreInt32(&audioStreamRunning, 0) +func cgoAudioPlaybackClose() { + C.jetkvm_audio_playback_close() +} + +// Decodes Opus frame and writes to playback device +func cgoAudioDecodeWrite(buf []byte) (int, error) { + if len(buf) == 0 { + return 0, errors.New("empty buffer") + } + n := C.jetkvm_audio_decode_write(unsafe.Pointer(&buf[0]), C.int(len(buf))) + if n < 0 { + return 0, errors.New("audio decode/write error") + } + return int(n), nil +} + + + +// Wrapper functions for non-blocking audio manager +func CGOAudioInit() error { + return cgoAudioInit() +} + +func CGOAudioClose() { + cgoAudioClose() +} + +func CGOAudioReadEncode(buf []byte) (int, error) { + return cgoAudioReadEncode(buf) +} + +func CGOAudioPlaybackInit() error { + return cgoAudioPlaybackInit() +} + +func CGOAudioPlaybackClose() { + cgoAudioPlaybackClose() +} + +func CGOAudioDecodeWrite(buf []byte) (int, error) { + return cgoAudioDecodeWrite(buf) } diff --git a/internal/audio/cgo_audio_notlinux.go b/internal/audio/cgo_audio_notlinux.go deleted file mode 100644 
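On the Go side, `cgoAudioReadEncode` now has three outcomes instead of two: a positive byte count, a hard error, or `(0, nil)` for the `-EAGAIN` "no frame yet" case. Any consumer has to treat the zero case as a skip, not a failure. A minimal sketch of such a loop (the `send` callback mirrors the one handed to `StartNonBlockingAudioStreaming`; the real scheduling lives in the manager shown later in this diff):

```go
// consumeAudio drains encoded Opus frames until the process exits.
func consumeAudio(send func([]byte)) {
	buf := make([]byte, MaxAudioFrameSize)
	for {
		n, err := CGOAudioReadEncode(buf)
		if err != nil {
			RecordFrameDropped() // hard ALSA/Opus failure
			time.Sleep(5 * time.Millisecond)
			continue
		}
		if n == 0 {
			continue // -EAGAIN in C: no capture data yet, not an error
		}
		frame := make([]byte, n)
		copy(frame, buf[:n]) // copy out; buf is reused next iteration
		send(frame)
	}
}
```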
index 209b7aa..0000000 --- a/internal/audio/cgo_audio_notlinux.go +++ /dev/null @@ -1,11 +0,0 @@ -//go:build !linux || !arm -// +build !linux !arm - -package audio - -// Dummy implementations for non-linux/arm builds -func StartCGOAudioStream(send func([]byte)) error { - return nil -} - -func StopCGOAudioStream() {} diff --git a/internal/audio/cgo_audio_stub.go b/internal/audio/cgo_audio_stub.go new file mode 100644 index 0000000..c1d142c --- /dev/null +++ b/internal/audio/cgo_audio_stub.go @@ -0,0 +1,31 @@ +//go:build nolint + +package audio + +import "errors" + +// Stub implementations for linting (no CGO dependencies) + +func cgoAudioInit() error { + return errors.New("audio not available in lint mode") +} + +func cgoAudioClose() { + // No-op +} + +func cgoAudioReadEncode(buf []byte) (int, error) { + return 0, errors.New("audio not available in lint mode") +} + +func cgoAudioPlaybackInit() error { + return errors.New("audio not available in lint mode") +} + +func cgoAudioPlaybackClose() { + // No-op +} + +func cgoAudioDecodeWrite(buf []byte) (int, error) { + return 0, errors.New("audio not available in lint mode") +} \ No newline at end of file diff --git a/internal/audio/input.go b/internal/audio/input.go new file mode 100644 index 0000000..f93d317 --- /dev/null +++ b/internal/audio/input.go @@ -0,0 +1,118 @@ +package audio + +import ( + "sync/atomic" + "time" + + "github.com/jetkvm/kvm/internal/logging" + "github.com/rs/zerolog" +) + +// AudioInputMetrics holds metrics for microphone input +// Note: int64 fields must be 64-bit aligned for atomic operations on ARM +type AudioInputMetrics struct { + FramesSent int64 // Must be first for alignment + FramesDropped int64 + BytesProcessed int64 + ConnectionDrops int64 + AverageLatency time.Duration // time.Duration is int64 + LastFrameTime time.Time +} + +// AudioInputManager manages microphone input stream from WebRTC to USB gadget +type AudioInputManager struct { + // metrics MUST be first for ARM32 alignment (contains int64 fields) + metrics AudioInputMetrics + + inputBuffer chan []byte + logger zerolog.Logger + running int32 +} + +// NewAudioInputManager creates a new audio input manager +func NewAudioInputManager() *AudioInputManager { + return &AudioInputManager{ + inputBuffer: make(chan []byte, 100), // Buffer up to 100 frames + logger: logging.GetDefaultLogger().With().Str("component", "audio-input").Logger(), + } +} + +// Start begins processing microphone input +func (aim *AudioInputManager) Start() error { + if !atomic.CompareAndSwapInt32(&aim.running, 0, 1) { + return nil // Already running + } + + aim.logger.Info().Msg("Starting audio input manager") + + // Start the non-blocking audio input stream + err := StartNonBlockingAudioInput(aim.inputBuffer) + if err != nil { + atomic.StoreInt32(&aim.running, 0) + return err + } + + return nil +} + +// Stop stops processing microphone input +func (aim *AudioInputManager) Stop() { + if !atomic.CompareAndSwapInt32(&aim.running, 1, 0) { + return // Already stopped + } + + aim.logger.Info().Msg("Stopping audio input manager") + + // Stop the non-blocking audio input stream + // Note: This is handled by the global non-blocking audio manager + // Individual input streams are managed centrally + + // Drain the input buffer + go func() { + for { + select { + case <-aim.inputBuffer: + // Drain + case <-time.After(100 * time.Millisecond): + return + } + } + }() +} + +// WriteOpusFrame writes an Opus frame to the input buffer +func (aim *AudioInputManager) WriteOpusFrame(frame []byte) error { + 
if atomic.LoadInt32(&aim.running) == 0 { + return nil // Not running, ignore + } + + select { + case aim.inputBuffer <- frame: + atomic.AddInt64(&aim.metrics.FramesSent, 1) + atomic.AddInt64(&aim.metrics.BytesProcessed, int64(len(frame))) + aim.metrics.LastFrameTime = time.Now() + return nil + default: + // Buffer full, drop frame + atomic.AddInt64(&aim.metrics.FramesDropped, 1) + aim.logger.Warn().Msg("Audio input buffer full, dropping frame") + return nil + } +} + +// GetMetrics returns current microphone input metrics +func (aim *AudioInputManager) GetMetrics() AudioInputMetrics { + return AudioInputMetrics{ + FramesSent: atomic.LoadInt64(&aim.metrics.FramesSent), + FramesDropped: atomic.LoadInt64(&aim.metrics.FramesDropped), + BytesProcessed: atomic.LoadInt64(&aim.metrics.BytesProcessed), + LastFrameTime: aim.metrics.LastFrameTime, + ConnectionDrops: atomic.LoadInt64(&aim.metrics.ConnectionDrops), + AverageLatency: aim.metrics.AverageLatency, + } +} + +// IsRunning returns whether the audio input manager is running +func (aim *AudioInputManager) IsRunning() bool { + return atomic.LoadInt32(&aim.running) == 1 +} \ No newline at end of file diff --git a/internal/audio/nonblocking_api.go b/internal/audio/nonblocking_api.go new file mode 100644 index 0000000..d91b645 --- /dev/null +++ b/internal/audio/nonblocking_api.go @@ -0,0 +1,65 @@ +package audio + +import ( + "sync" +) + +var ( + globalNonBlockingManager *NonBlockingAudioManager + managerMutex sync.Mutex +) + +// StartNonBlockingAudioStreaming starts the non-blocking audio streaming system +func StartNonBlockingAudioStreaming(send func([]byte)) error { + managerMutex.Lock() + defer managerMutex.Unlock() + + if globalNonBlockingManager != nil && globalNonBlockingManager.IsRunning() { + return ErrAudioAlreadyRunning + } + + globalNonBlockingManager = NewNonBlockingAudioManager() + return globalNonBlockingManager.StartAudioOutput(send) +} + +// StartNonBlockingAudioInput starts the non-blocking audio input system +func StartNonBlockingAudioInput(receiveChan <-chan []byte) error { + managerMutex.Lock() + defer managerMutex.Unlock() + + if globalNonBlockingManager == nil { + globalNonBlockingManager = NewNonBlockingAudioManager() + } + + return globalNonBlockingManager.StartAudioInput(receiveChan) +} + +// StopNonBlockingAudioStreaming stops the non-blocking audio streaming system +func StopNonBlockingAudioStreaming() { + managerMutex.Lock() + defer managerMutex.Unlock() + + if globalNonBlockingManager != nil { + globalNonBlockingManager.Stop() + globalNonBlockingManager = nil + } +} + +// GetNonBlockingAudioStats returns statistics from the non-blocking audio system +func GetNonBlockingAudioStats() NonBlockingAudioStats { + managerMutex.Lock() + defer managerMutex.Unlock() + + if globalNonBlockingManager != nil { + return globalNonBlockingManager.GetStats() + } + return NonBlockingAudioStats{} +} + +// IsNonBlockingAudioRunning returns true if the non-blocking audio system is running +func IsNonBlockingAudioRunning() bool { + managerMutex.Lock() + defer managerMutex.Unlock() + + return globalNonBlockingManager != nil && globalNonBlockingManager.IsRunning() +} \ No newline at end of file diff --git a/internal/audio/nonblocking_audio.go b/internal/audio/nonblocking_audio.go new file mode 100644 index 0000000..c0756d7 --- /dev/null +++ b/internal/audio/nonblocking_audio.go @@ -0,0 +1,415 @@ +package audio + +import ( + "context" + "sync" + "sync/atomic" + "time" + + "github.com/jetkvm/kvm/internal/logging" + "github.com/rs/zerolog" +) + +// 
NonBlockingAudioManager manages audio operations in separate worker threads +// to prevent blocking of mouse/keyboard operations +type NonBlockingAudioManager struct { + // Statistics - MUST be first for ARM32 alignment (int64 fields need 8-byte alignment) + stats NonBlockingAudioStats + + // Control + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + logger *zerolog.Logger + + // Audio output (capture from device, send to WebRTC) + outputSendFunc func([]byte) + outputWorkChan chan audioWorkItem + outputResultChan chan audioResult + + // Audio input (receive from WebRTC, playback to device) + inputReceiveChan <-chan []byte + inputWorkChan chan audioWorkItem + inputResultChan chan audioResult + + // Worker threads and flags - int32 fields grouped together + outputRunning int32 + inputRunning int32 + outputWorkerRunning int32 + inputWorkerRunning int32 +} + +type audioWorkItem struct { + workType audioWorkType + data []byte + resultChan chan audioResult +} + +type audioWorkType int + +const ( + audioWorkInit audioWorkType = iota + audioWorkReadEncode + audioWorkDecodeWrite + audioWorkClose +) + +type audioResult struct { + success bool + data []byte + length int + err error +} + +type NonBlockingAudioStats struct { + // int64 fields MUST be first for ARM32 alignment + OutputFramesProcessed int64 + OutputFramesDropped int64 + InputFramesProcessed int64 + InputFramesDropped int64 + WorkerErrors int64 + // time.Time is int64 internally, so it's also aligned + LastProcessTime time.Time +} + +// NewNonBlockingAudioManager creates a new non-blocking audio manager +func NewNonBlockingAudioManager() *NonBlockingAudioManager { + ctx, cancel := context.WithCancel(context.Background()) + logger := logging.GetDefaultLogger().With().Str("component", "nonblocking-audio").Logger() + + return &NonBlockingAudioManager{ + ctx: ctx, + cancel: cancel, + logger: &logger, + outputWorkChan: make(chan audioWorkItem, 10), // Buffer for work items + outputResultChan: make(chan audioResult, 10), // Buffer for results + inputWorkChan: make(chan audioWorkItem, 10), + inputResultChan: make(chan audioResult, 10), + } +} + +// StartAudioOutput starts non-blocking audio output (capture and encode) +func (nam *NonBlockingAudioManager) StartAudioOutput(sendFunc func([]byte)) error { + if !atomic.CompareAndSwapInt32(&nam.outputRunning, 0, 1) { + return ErrAudioAlreadyRunning + } + + nam.outputSendFunc = sendFunc + + // Start the blocking worker thread + nam.wg.Add(1) + go nam.outputWorkerThread() + + // Start the non-blocking coordinator + nam.wg.Add(1) + go nam.outputCoordinatorThread() + + nam.logger.Info().Msg("non-blocking audio output started") + return nil +} + +// StartAudioInput starts non-blocking audio input (receive and decode) +func (nam *NonBlockingAudioManager) StartAudioInput(receiveChan <-chan []byte) error { + if !atomic.CompareAndSwapInt32(&nam.inputRunning, 0, 1) { + return ErrAudioAlreadyRunning + } + + nam.inputReceiveChan = receiveChan + + // Start the blocking worker thread + nam.wg.Add(1) + go nam.inputWorkerThread() + + // Start the non-blocking coordinator + nam.wg.Add(1) + go nam.inputCoordinatorThread() + + nam.logger.Info().Msg("non-blocking audio input started") + return nil +} + +// outputWorkerThread handles all blocking audio output operations +func (nam *NonBlockingAudioManager) outputWorkerThread() { + defer nam.wg.Done() + defer atomic.StoreInt32(&nam.outputWorkerRunning, 0) + + atomic.StoreInt32(&nam.outputWorkerRunning, 1) + nam.logger.Debug().Msg("output worker thread 
started") + + // Initialize audio in worker thread + if err := CGOAudioInit(); err != nil { + nam.logger.Error().Err(err).Msg("failed to initialize audio in worker thread") + return + } + defer CGOAudioClose() + + buf := make([]byte, 1500) + + for { + select { + case <-nam.ctx.Done(): + nam.logger.Debug().Msg("output worker thread stopping") + return + + case workItem := <-nam.outputWorkChan: + switch workItem.workType { + case audioWorkReadEncode: + // Perform blocking audio read/encode operation + n, err := CGOAudioReadEncode(buf) + result := audioResult{ + success: err == nil, + length: n, + err: err, + } + if err == nil && n > 0 { + // Copy data to avoid race conditions + result.data = make([]byte, n) + copy(result.data, buf[:n]) + } + + // Send result back (non-blocking) + select { + case workItem.resultChan <- result: + case <-nam.ctx.Done(): + return + default: + // Drop result if coordinator is not ready + atomic.AddInt64(&nam.stats.OutputFramesDropped, 1) + } + + case audioWorkClose: + nam.logger.Debug().Msg("output worker received close signal") + return + } + } + } +} + +// outputCoordinatorThread coordinates audio output without blocking +func (nam *NonBlockingAudioManager) outputCoordinatorThread() { + defer nam.wg.Done() + defer atomic.StoreInt32(&nam.outputRunning, 0) + + nam.logger.Debug().Msg("output coordinator thread started") + + ticker := time.NewTicker(20 * time.Millisecond) // Match frame timing + defer ticker.Stop() + + pendingWork := false + resultChan := make(chan audioResult, 1) + + for atomic.LoadInt32(&nam.outputRunning) == 1 { + select { + case <-nam.ctx.Done(): + nam.logger.Debug().Msg("output coordinator stopping") + return + + case <-ticker.C: + // Only submit work if worker is ready and no pending work + if !pendingWork && atomic.LoadInt32(&nam.outputWorkerRunning) == 1 { + if IsAudioMuted() { + continue // Skip when muted + } + + workItem := audioWorkItem{ + workType: audioWorkReadEncode, + resultChan: resultChan, + } + + // Submit work (non-blocking) + select { + case nam.outputWorkChan <- workItem: + pendingWork = true + default: + // Worker is busy, drop this frame + atomic.AddInt64(&nam.stats.OutputFramesDropped, 1) + } + } + + case result := <-resultChan: + pendingWork = false + nam.stats.LastProcessTime = time.Now() + + if result.success && result.data != nil && result.length > 0 { + // Send to WebRTC (non-blocking) + if nam.outputSendFunc != nil { + nam.outputSendFunc(result.data) + atomic.AddInt64(&nam.stats.OutputFramesProcessed, 1) + RecordFrameReceived(result.length) + } + } else if result.success && result.length == 0 { + // No data available - this is normal, not an error + // Just continue without logging or counting as error + } else { + atomic.AddInt64(&nam.stats.OutputFramesDropped, 1) + atomic.AddInt64(&nam.stats.WorkerErrors, 1) + if result.err != nil { + nam.logger.Warn().Err(result.err).Msg("audio output worker error") + } + RecordFrameDropped() + } + } + } + + // Signal worker to close + select { + case nam.outputWorkChan <- audioWorkItem{workType: audioWorkClose}: + case <-time.After(100 * time.Millisecond): + nam.logger.Warn().Msg("timeout signaling output worker to close") + } + + nam.logger.Info().Msg("output coordinator thread stopped") +} + +// inputWorkerThread handles all blocking audio input operations +func (nam *NonBlockingAudioManager) inputWorkerThread() { + defer nam.wg.Done() + defer atomic.StoreInt32(&nam.inputWorkerRunning, 0) + + atomic.StoreInt32(&nam.inputWorkerRunning, 1) + nam.logger.Debug().Msg("input worker 
thread started") + + // Initialize audio playback in worker thread + if err := CGOAudioPlaybackInit(); err != nil { + nam.logger.Error().Err(err).Msg("failed to initialize audio playback in worker thread") + return + } + defer CGOAudioPlaybackClose() + + for { + select { + case <-nam.ctx.Done(): + nam.logger.Debug().Msg("input worker thread stopping") + return + + case workItem := <-nam.inputWorkChan: + switch workItem.workType { + case audioWorkDecodeWrite: + // Perform blocking audio decode/write operation + n, err := CGOAudioDecodeWrite(workItem.data) + result := audioResult{ + success: err == nil, + length: n, + err: err, + } + + // Send result back (non-blocking) + select { + case workItem.resultChan <- result: + case <-nam.ctx.Done(): + return + default: + // Drop result if coordinator is not ready + atomic.AddInt64(&nam.stats.InputFramesDropped, 1) + } + + case audioWorkClose: + nam.logger.Debug().Msg("input worker received close signal") + return + } + } + } +} + +// inputCoordinatorThread coordinates audio input without blocking +func (nam *NonBlockingAudioManager) inputCoordinatorThread() { + defer nam.wg.Done() + defer atomic.StoreInt32(&nam.inputRunning, 0) + + nam.logger.Debug().Msg("input coordinator thread started") + + resultChan := make(chan audioResult, 1) + + for atomic.LoadInt32(&nam.inputRunning) == 1 { + select { + case <-nam.ctx.Done(): + nam.logger.Debug().Msg("input coordinator stopping") + return + + case frame := <-nam.inputReceiveChan: + if frame == nil || len(frame) == 0 { + continue + } + + // Submit work to worker (non-blocking) + if atomic.LoadInt32(&nam.inputWorkerRunning) == 1 { + workItem := audioWorkItem{ + workType: audioWorkDecodeWrite, + data: frame, + resultChan: resultChan, + } + + select { + case nam.inputWorkChan <- workItem: + // Wait for result with timeout + select { + case result := <-resultChan: + if result.success { + atomic.AddInt64(&nam.stats.InputFramesProcessed, 1) + } else { + atomic.AddInt64(&nam.stats.InputFramesDropped, 1) + atomic.AddInt64(&nam.stats.WorkerErrors, 1) + if result.err != nil { + nam.logger.Warn().Err(result.err).Msg("audio input worker error") + } + } + case <-time.After(50 * time.Millisecond): + // Timeout waiting for result + atomic.AddInt64(&nam.stats.InputFramesDropped, 1) + nam.logger.Warn().Msg("timeout waiting for input worker result") + } + default: + // Worker is busy, drop this frame + atomic.AddInt64(&nam.stats.InputFramesDropped, 1) + } + } + + case <-time.After(250 * time.Millisecond): + // Periodic timeout to prevent blocking + continue + } + } + + // Signal worker to close + select { + case nam.inputWorkChan <- audioWorkItem{workType: audioWorkClose}: + case <-time.After(100 * time.Millisecond): + nam.logger.Warn().Msg("timeout signaling input worker to close") + } + + nam.logger.Info().Msg("input coordinator thread stopped") +} + +// Stop stops all audio operations +func (nam *NonBlockingAudioManager) Stop() { + nam.logger.Info().Msg("stopping non-blocking audio manager") + + // Signal all threads to stop + nam.cancel() + + // Stop coordinators + atomic.StoreInt32(&nam.outputRunning, 0) + atomic.StoreInt32(&nam.inputRunning, 0) + + // Wait for all goroutines to finish + nam.wg.Wait() + + nam.logger.Info().Msg("non-blocking audio manager stopped") +} + +// GetStats returns current statistics +func (nam *NonBlockingAudioManager) GetStats() NonBlockingAudioStats { + return NonBlockingAudioStats{ + OutputFramesProcessed: atomic.LoadInt64(&nam.stats.OutputFramesProcessed), + OutputFramesDropped: 
atomic.LoadInt64(&nam.stats.OutputFramesDropped), + InputFramesProcessed: atomic.LoadInt64(&nam.stats.InputFramesProcessed), + InputFramesDropped: atomic.LoadInt64(&nam.stats.InputFramesDropped), + WorkerErrors: atomic.LoadInt64(&nam.stats.WorkerErrors), + LastProcessTime: nam.stats.LastProcessTime, + } +} + +// IsRunning returns true if any audio operations are running +func (nam *NonBlockingAudioManager) IsRunning() bool { + return atomic.LoadInt32(&nam.outputRunning) == 1 || atomic.LoadInt32(&nam.inputRunning) == 1 +} \ No newline at end of file diff --git a/jsonrpc.go b/jsonrpc.go index e930f49..b8ecfb0 100644 --- a/jsonrpc.go +++ b/jsonrpc.go @@ -10,6 +10,7 @@ import ( "path/filepath" "reflect" "strconv" + "sync" "time" "github.com/pion/webrtc/v4" @@ -18,6 +19,74 @@ import ( "github.com/jetkvm/kvm/internal/usbgadget" ) +// Mouse event processing with single worker +var ( + mouseEventChan = make(chan mouseEventData, 100) // Buffered channel for mouse events + mouseWorkerOnce sync.Once +) + +type mouseEventData struct { + message webrtc.DataChannelMessage + session *Session +} + +// startMouseWorker starts a single worker goroutine for processing mouse events +func startMouseWorker() { + go func() { + ticker := time.NewTicker(16 * time.Millisecond) // ~60 FPS + defer ticker.Stop() + + var latestMouseEvent *mouseEventData + + for { + select { + case event := <-mouseEventChan: + // Always keep the latest mouse event + latestMouseEvent = &event + + case <-ticker.C: + // Process the latest mouse event at regular intervals + if latestMouseEvent != nil { + onRPCMessage(latestMouseEvent.message, latestMouseEvent.session) + latestMouseEvent = nil + } + } + } + }() +} + +// onRPCMessageThrottled handles RPC messages with special throttling for mouse events +func onRPCMessageThrottled(message webrtc.DataChannelMessage, session *Session) { + var request JSONRPCRequest + err := json.Unmarshal(message.Data, &request) + if err != nil { + onRPCMessage(message, session) + return + } + + // Check if this is a mouse event that should be throttled + if isMouseEvent(request.Method) { + // Start the mouse worker if not already started + mouseWorkerOnce.Do(startMouseWorker) + + // Send to mouse worker (non-blocking) + select { + case mouseEventChan <- mouseEventData{message: message, session: session}: + // Event queued successfully + default: + // Channel is full, drop the event (this prevents blocking) + } + } else { + // Non-mouse events are processed immediately + go onRPCMessage(message, session) + } +} + +// isMouseEvent checks if the RPC method is a mouse-related event +func isMouseEvent(method string) bool { + return method == "absMouseReport" || method == "relMouseReport" +} + type JSONRPCRequest struct { JSONRPC string `json:"jsonrpc"` Method string `json:"method"` diff --git a/main.go b/main.go index cccd5e6..f2d327a 100644 --- a/main.go +++ b/main.go @@ -80,33 +80,31 @@ func Main() { // initialize usb gadget initUsbGadget() - // Start in-process audio streaming and deliver Opus frames to WebRTC - go func() { - err := audio.StartAudioStreaming(func(frame []byte) { - // Deliver Opus frame to WebRTC audio track if session is active - if currentSession != nil { - config := audio.GetAudioConfig() - var sampleData []byte - if audio.IsAudioMuted() { - sampleData = make([]byte, len(frame)) // silence - } else { - sampleData = frame - } - if err := currentSession.AudioTrack.WriteSample(media.Sample{ - Data: sampleData, - Duration: config.FrameSize, - }); err != nil { - logger.Warn().Err(err).Msg("error 
writing audio sample") - audio.RecordFrameDropped() - } + // Start non-blocking audio streaming and deliver Opus frames to WebRTC + err = audio.StartNonBlockingAudioStreaming(func(frame []byte) { + // Deliver Opus frame to WebRTC audio track if session is active + if currentSession != nil { + config := audio.GetAudioConfig() + var sampleData []byte + if audio.IsAudioMuted() { + sampleData = make([]byte, len(frame)) // silence } else { + sampleData = frame + } + if err := currentSession.AudioTrack.WriteSample(media.Sample{ + Data: sampleData, + Duration: config.FrameSize, + }); err != nil { + logger.Warn().Err(err).Msg("error writing audio sample") audio.RecordFrameDropped() } - }) - if err != nil { - logger.Warn().Err(err).Msg("failed to start in-process audio streaming") + } else { + audio.RecordFrameDropped() } - }() + }) + if err != nil { + logger.Warn().Err(err).Msg("failed to start non-blocking audio streaming") + } if err := setInitialVirtualMediaState(); err != nil { logger.Warn().Err(err).Msg("failed to set initial virtual media state") @@ -157,6 +155,9 @@ func Main() { signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) <-sigs logger.Info().Msg("JetKVM Shutting Down") + + // Stop non-blocking audio manager + audio.StopNonBlockingAudioStreaming() //if fuseServer != nil { // err := setMassStorageImage(" ") // if err != nil { diff --git a/ui/src/components/ActionBar.tsx b/ui/src/components/ActionBar.tsx index 409387e..62df18a 100644 --- a/ui/src/components/ActionBar.tsx +++ b/ui/src/components/ActionBar.tsx @@ -22,10 +22,23 @@ import AudioControlPopover from "@/components/popovers/AudioControlPopover"; import { useDeviceUiNavigation } from "@/hooks/useAppNavigation"; import api from "@/api"; +// Type for microphone hook return value +interface MicrophoneHookReturn { + isMicrophoneActive: boolean; + isMicrophoneMuted: boolean; + microphoneStream: MediaStream | null; + startMicrophone: (deviceId?: string) => Promise<{ success: boolean; error?: any }>; + stopMicrophone: () => Promise<{ success: boolean; error?: any }>; + toggleMicrophoneMute: () => Promise<{ success: boolean; error?: any }>; + syncMicrophoneState: () => Promise; +} + export default function Actionbar({ requestFullscreen, + microphone, }: { requestFullscreen: () => Promise; + microphone: MicrophoneHookReturn; }) { const { navigateTo } = useDeviceUiNavigation(); const virtualKeyboard = useHidStore(state => state.isVirtualKeyboardEnabled); @@ -340,7 +353,7 @@ export default function Actionbar({ checkIfStateChanged(open); return (
- <AudioControlPopover /> + <AudioControlPopover microphone={microphone} />
); }} diff --git a/ui/src/components/AudioLevelMeter.tsx b/ui/src/components/AudioLevelMeter.tsx new file mode 100644 index 0000000..dc293d2 --- /dev/null +++ b/ui/src/components/AudioLevelMeter.tsx @@ -0,0 +1,77 @@ +import React from 'react'; +import clsx from 'clsx'; + +interface AudioLevelMeterProps { + level: number; // 0-100 percentage + isActive: boolean; + className?: string; + size?: 'sm' | 'md' | 'lg'; + showLabel?: boolean; +} + +export const AudioLevelMeter: React.FC<AudioLevelMeterProps> = ({ + level, + isActive, + className, + size = 'md', + showLabel = true +}) => { + const sizeClasses = { + sm: 'h-1', + md: 'h-2', + lg: 'h-3' + }; + + const getLevelColor = (level: number) => { + if (level < 20) return 'bg-green-500'; + if (level < 60) return 'bg-yellow-500'; + return 'bg-red-500'; + }; + + const getTextColor = (level: number) => { + if (level < 20) return 'text-green-600 dark:text-green-400'; + if (level < 60) return 'text-yellow-600 dark:text-yellow-400'; + return 'text-red-600 dark:text-red-400'; + }; + + return ( +
+ {showLabel && ( +
+ + Microphone Level + + + {isActive ? `${Math.round(level)}%` : 'No Signal'} + +
+ )} + +
+
+
+ + {/* Peak indicators */} +
+ 0% + 50% + 100% +
+
+ ); +}; \ No newline at end of file diff --git a/ui/src/components/AudioMetricsDashboard.tsx b/ui/src/components/AudioMetricsDashboard.tsx index 48e6fe7..08d77ea 100644 --- a/ui/src/components/AudioMetricsDashboard.tsx +++ b/ui/src/components/AudioMetricsDashboard.tsx @@ -1,8 +1,11 @@ import { useEffect, useState } from "react"; -import { MdGraphicEq, MdSignalWifi4Bar, MdError } from "react-icons/md"; +import { MdGraphicEq, MdSignalWifi4Bar, MdError, MdMic } from "react-icons/md"; import { LuActivity, LuClock, LuHardDrive, LuSettings } from "react-icons/lu"; +import { AudioLevelMeter } from "@components/AudioLevelMeter"; import { cx } from "@/cva.config"; +import { useMicrophone } from "@/hooks/useMicrophone"; +import { useAudioLevel } from "@/hooks/useAudioLevel"; import api from "@/api"; interface AudioMetrics { @@ -14,6 +17,15 @@ interface AudioMetrics { average_latency: string; } +interface MicrophoneMetrics { + frames_sent: number; + frames_dropped: number; + bytes_processed: number; + last_frame_time: string; + connection_drops: number; + average_latency: string; +} + interface AudioConfig { Quality: number; Bitrate: number; @@ -31,9 +43,15 @@ const qualityLabels = { export default function AudioMetricsDashboard() { const [metrics, setMetrics] = useState(null); + const [microphoneMetrics, setMicrophoneMetrics] = useState(null); const [config, setConfig] = useState(null); + const [microphoneConfig, setMicrophoneConfig] = useState(null); const [isConnected, setIsConnected] = useState(false); const [lastUpdate, setLastUpdate] = useState(new Date()); + + // Microphone state for audio level monitoring + const { isMicrophoneActive, isMicrophoneMuted, microphoneStream } = useMicrophone(); + const { audioLevel, isAnalyzing } = useAudioLevel(microphoneStream); useEffect(() => { loadAudioData(); @@ -57,12 +75,35 @@ export default function AudioMetricsDashboard() { setIsConnected(false); } + // Load microphone metrics + try { + const micResp = await api.GET("/microphone/metrics"); + if (micResp.ok) { + const micData = await micResp.json(); + setMicrophoneMetrics(micData); + } + } catch (micError) { + // Microphone metrics might not be available, that's okay + console.debug("Microphone metrics not available:", micError); + } + // Load config const configResp = await api.GET("/audio/quality"); if (configResp.ok) { const configData = await configResp.json(); setConfig(configData.current); } + + // Load microphone config + try { + const micConfigResp = await api.GET("/microphone/quality"); + if (micConfigResp.ok) { + const micConfigData = await micConfigResp.json(); + setMicrophoneConfig(micConfigData.current); + } + } catch (micConfigError) { + console.debug("Microphone config not available:", micConfigError); + } } catch (error) { console.error("Failed to load audio data:", error); setIsConnected(false); @@ -118,52 +159,91 @@ export default function AudioMetricsDashboard() {
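The dashboard polls `/audio/metrics`, `/microphone/metrics`, and the two quality endpoints every few seconds. The route wiring is not part of this section, so the following is only a plausible shape for the microphone endpoint, assuming the gin router the project already uses and a handle to the `AudioInputManager`; the snake_case tags match the `MicrophoneMetrics` interface above:

```go
// Hypothetical adapter from audio.AudioInputMetrics to the JSON the UI polls.
type micMetricsJSON struct {
	FramesSent      int64  `json:"frames_sent"`
	FramesDropped   int64  `json:"frames_dropped"`
	BytesProcessed  int64  `json:"bytes_processed"`
	LastFrameTime   string `json:"last_frame_time"`
	ConnectionDrops int64  `json:"connection_drops"`
	AverageLatency  string `json:"average_latency"`
}

func registerMicMetrics(r *gin.Engine, aim *audio.AudioInputManager) {
	r.GET("/microphone/metrics", func(c *gin.Context) {
		m := aim.GetMetrics()
		c.JSON(200, micMetricsJSON{
			FramesSent:      m.FramesSent,
			FramesDropped:   m.FramesDropped,
			BytesProcessed:  m.BytesProcessed,
			LastFrameTime:   m.LastFrameTime.Format(time.RFC3339),
			ConnectionDrops: m.ConnectionDrops,
			AverageLatency:  m.AverageLatency.String(),
		})
	})
}
```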
{/* Current Configuration */} - {config && ( -
-
- - - Current Configuration - -
-
-
- Quality: - - {qualityLabels[config.Quality as keyof typeof qualityLabels]} +
+ {config && ( +
+
+ + + Audio Output Config
-
- Bitrate: - - {config.Bitrate}kbps - -
-
- Sample Rate: - - {config.SampleRate}Hz - -
-
- Channels: - - {config.Channels} - +
+
+ Quality: + + {qualityLabels[config.Quality as keyof typeof qualityLabels]} + +
+
+ Bitrate: + + {config.Bitrate}kbps + +
+
+ Sample Rate: + + {config.SampleRate}Hz + +
+
+ Channels: + + {config.Channels} + +
-
- )} + )} + + {microphoneConfig && ( +
+
+ + + Microphone Input Config + +
+
+
+ Quality: + + {qualityLabels[microphoneConfig.Quality as keyof typeof qualityLabels]} + +
+
+ Bitrate: + + {microphoneConfig.Bitrate}kbps + +
+
+ Sample Rate: + + {microphoneConfig.SampleRate}Hz + +
+
+ Channels: + + {microphoneConfig.Channels} + +
+
+
+ )} +
{/* Performance Metrics */} {metrics && (
- {/* Frames */} + {/* Audio Output Frames */}
- Frame Statistics + Audio Output
@@ -223,6 +303,87 @@ export default function AudioMetricsDashboard() {
+ {/* Microphone Input Metrics */} + {microphoneMetrics && ( +
+
+ + + Microphone Input + +
+
+
+
+ {formatNumber(microphoneMetrics.frames_sent)} +
+
+ Frames Sent +
+
+
+
0 + ? "text-red-600 dark:text-red-400" + : "text-green-600 dark:text-green-400" + )}> + {formatNumber(microphoneMetrics.frames_dropped)} +
+
+ Frames Dropped +
+
+
+ + {/* Microphone Drop Rate */} +
+
+ + Drop Rate + + 0 ? (microphoneMetrics.frames_dropped / microphoneMetrics.frames_sent) * 100 : 0) > 5 + ? "text-red-600 dark:text-red-400" + : (microphoneMetrics.frames_sent > 0 ? (microphoneMetrics.frames_dropped / microphoneMetrics.frames_sent) * 100 : 0) > 1 + ? "text-yellow-600 dark:text-yellow-400" + : "text-green-600 dark:text-green-400" + )}> + {microphoneMetrics.frames_sent > 0 ? ((microphoneMetrics.frames_dropped / microphoneMetrics.frames_sent) * 100).toFixed(2) : "0.00"}% + +
+
+
0 ? (microphoneMetrics.frames_dropped / microphoneMetrics.frames_sent) * 100 : 0) > 5 + ? "bg-red-500" + : (microphoneMetrics.frames_sent > 0 ? (microphoneMetrics.frames_dropped / microphoneMetrics.frames_sent) * 100 : 0) > 1 + ? "bg-yellow-500" + : "bg-green-500" + )} + style={{ + width: `${Math.min(microphoneMetrics.frames_sent > 0 ? (microphoneMetrics.frames_dropped / microphoneMetrics.frames_sent) * 100 : 0, 100)}%` + }} + /> +
+
+ + {/* Microphone Audio Level */} + {isMicrophoneActive && ( +
+ +
+ )} +
+ )} + {/* Data Transfer */}
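Both metric cards above guard the same ratio: dropped over sent, forced to zero when nothing has been sent yet, with 1% and 5% as the warning and error thresholds. If the backend ever wants to report the figure directly instead of letting the UI derive it, the equivalent helper is one function (our addition, not part of the diff):

```go
// dropRatePercent mirrors the dashboard computation, including the
// divide-by-zero guard for a stream that has not sent anything yet.
func dropRatePercent(sent, dropped int64) float64 {
	if sent <= 0 {
		return 0
	}
	return float64(dropped) / float64(sent) * 100
}
```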
diff --git a/ui/src/components/WebRTCVideo.tsx b/ui/src/components/WebRTCVideo.tsx index 096068a..9364f05 100644 --- a/ui/src/components/WebRTCVideo.tsx +++ b/ui/src/components/WebRTCVideo.tsx @@ -25,7 +25,22 @@ import { PointerLockBar, } from "./VideoOverlay"; -export default function WebRTCVideo() { +// Interface for microphone hook return type +interface MicrophoneHookReturn { + isMicrophoneActive: boolean; + isMicrophoneMuted: boolean; + microphoneStream: MediaStream | null; + startMicrophone: (deviceId?: string) => Promise<{ success: boolean; error?: any }>; + stopMicrophone: () => Promise<{ success: boolean; error?: any }>; + toggleMicrophoneMute: () => Promise<{ success: boolean; error?: any }>; + syncMicrophoneState: () => Promise<void>; +} + +interface WebRTCVideoProps { + microphone: MicrophoneHookReturn; +} + +export default function WebRTCVideo({ microphone }: WebRTCVideoProps) { // Video and stream related refs and states const videoElm = useRef<HTMLVideoElement>(null); const mediaStream = useRTCStore(state => state.mediaStream); @@ -675,7 +690,7 @@ export default function WebRTCVideo() { disabled={peerConnection?.connectionState !== "connected"} className="contents" > - <Actionbar requestFullscreen={requestFullscreen} /> + <Actionbar requestFullscreen={requestFullscreen} microphone={microphone} />
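WebRTCVideo now threads the microphone hook down to the action bar so the browser can attach its audio track; on the device side the inbound Opus has to end up in `AudioInputManager.WriteOpusFrame`. The session plumbing for that is not in this section, so the following is a hedged sketch of what the receive side can look like with the pion v4 API the project already depends on (`pc` and `audioInputManager` are assumed to exist in the session setup):

```go
pc.OnTrack(func(track *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
	if track.Codec().MimeType != webrtc.MimeTypeOpus {
		return // only the microphone audio track is handled here
	}
	for {
		pkt, _, err := track.ReadRTP()
		if err != nil {
			return // track closed or connection torn down
		}
		// Each RTP payload carries one Opus frame; WriteOpusFrame is
		// non-blocking and drops the frame if the input buffer is full.
		_ = audioInputManager.WriteOpusFrame(pkt.Payload)
	}
})
```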
diff --git a/ui/src/components/popovers/AudioControlPopover.tsx b/ui/src/components/popovers/AudioControlPopover.tsx index 5d2f61e..fed714e 100644 --- a/ui/src/components/popovers/AudioControlPopover.tsx +++ b/ui/src/components/popovers/AudioControlPopover.tsx @@ -1,11 +1,26 @@ import { useEffect, useState } from "react"; -import { MdVolumeOff, MdVolumeUp, MdGraphicEq } from "react-icons/md"; +import { MdVolumeOff, MdVolumeUp, MdGraphicEq, MdMic, MdMicOff, MdRefresh } from "react-icons/md"; import { LuActivity, LuSettings, LuSignal } from "react-icons/lu"; import { Button } from "@components/Button"; +import { AudioLevelMeter } from "@components/AudioLevelMeter"; import { cx } from "@/cva.config"; import { useUiStore } from "@/hooks/stores"; +import { useAudioDevices } from "@/hooks/useAudioDevices"; +import { useAudioLevel } from "@/hooks/useAudioLevel"; import api from "@/api"; +import notifications from "@/notifications"; + +// Type for microphone hook return value +interface MicrophoneHookReturn { + isMicrophoneActive: boolean; + isMicrophoneMuted: boolean; + microphoneStream: MediaStream | null; + startMicrophone: (deviceId?: string) => Promise<{ success: boolean; error?: any }>; + stopMicrophone: () => Promise<{ success: boolean; error?: any }>; + toggleMicrophoneMute: () => Promise<{ success: boolean; error?: any }>; + syncMicrophoneState: () => Promise<void>; +} interface AudioConfig { Quality: number; @@ -24,6 +39,15 @@ interface AudioMetrics { average_latency: string; } +interface MicrophoneMetrics { + frames_sent: number; + frames_dropped: number; + bytes_processed: number; + last_frame_time: string; + connection_drops: number; + average_latency: string; +} + const qualityLabels = { @@ -33,25 +57,64 @@ const qualityLabels = { 3: "Ultra (256kbps)" }; -export default function AudioControlPopover() { - const [isMuted, setIsMuted] = useState(false); - const [currentConfig, setCurrentConfig] = useState<AudioConfig | null>(null); +interface AudioControlPopoverProps { + microphone: MicrophoneHookReturn; +} +export default function AudioControlPopover({ microphone }: AudioControlPopoverProps) { + const [currentConfig, setCurrentConfig] = useState<AudioConfig | null>(null); + const [currentMicrophoneConfig, setCurrentMicrophoneConfig] = useState<AudioConfig | null>(null); + const [isMuted, setIsMuted] = useState(false); const [metrics, setMetrics] = useState<AudioMetrics | null>(null); const [showAdvanced, setShowAdvanced] = useState(false); const [isLoading, setIsLoading] = useState(false); const [isConnected, setIsConnected] = useState(false); + + // Microphone state from props + const { + isMicrophoneActive, + isMicrophoneMuted, + microphoneStream, + startMicrophone, + stopMicrophone, + toggleMicrophoneMute, + syncMicrophoneState, + } = microphone; + const [microphoneMetrics, setMicrophoneMetrics] = useState<MicrophoneMetrics | null>(null); + const [isMicrophoneLoading, setIsMicrophoneLoading] = useState(false); + + // Audio level monitoring + const { audioLevel, isAnalyzing } = useAudioLevel(microphoneStream); + + // Audio devices + const { + audioInputDevices, + audioOutputDevices, + selectedInputDevice, + selectedOutputDevice, + setSelectedInputDevice, + setSelectedOutputDevice, + isLoading: devicesLoading, + error: devicesError, + refreshDevices + } = useAudioDevices(); + const { toggleSidebarView } = useUiStore(); // Load initial audio state useEffect(() => { loadAudioState(); loadAudioMetrics(); + loadMicrophoneMetrics(); + syncMicrophoneState(); // Set up metrics refresh interval - const metricsInterval = setInterval(loadAudioMetrics, 2000); + const metricsInterval = setInterval(() => { +
loadAudioMetrics(); + loadMicrophoneMetrics(); + }, 2000); return () => clearInterval(metricsInterval); - }, []); + }, [syncMicrophoneState]); const loadAudioState = async () => { try { @@ -68,6 +131,13 @@ export default function AudioControlPopover() { const qualityData = await qualityResp.json(); setCurrentConfig(qualityData.current); } + + // Load microphone quality config + const micQualityResp = await api.GET("/microphone/quality"); + if (micQualityResp.ok) { + const micQualityData = await micQualityResp.json(); + setCurrentMicrophoneConfig(micQualityData.current); + } } catch (error) { console.error("Failed to load audio state:", error); } @@ -90,6 +160,20 @@ export default function AudioControlPopover() { } }; + + + const loadMicrophoneMetrics = async () => { + try { + const resp = await api.GET("/microphone/metrics"); + if (resp.ok) { + const data = await resp.json(); + setMicrophoneMetrics(data); + } + } catch (error) { + console.error("Failed to load microphone metrics:", error); + } + }; + const handleToggleMute = async () => { setIsLoading(true); try { @@ -119,6 +203,89 @@ export default function AudioControlPopover() { } }; + const handleMicrophoneQualityChange = async (quality: number) => { + setIsMicrophoneLoading(true); + try { + const resp = await api.POST("/microphone/quality", { quality }); + if (resp.ok) { + const data = await resp.json(); + setCurrentMicrophoneConfig(data.config); + } + } catch (error) { + console.error("Failed to change microphone quality:", error); + } finally { + setIsMicrophoneLoading(false); + } + }; + + const handleToggleMicrophone = async () => { + setIsMicrophoneLoading(true); + try { + const result = isMicrophoneActive ? await stopMicrophone() : await startMicrophone(selectedInputDevice); + if (!result.success && result.error) { + notifications.error(result.error.message); + } + } catch (error) { + console.error("Failed to toggle microphone:", error); + notifications.error("An unexpected error occurred"); + } finally { + setIsMicrophoneLoading(false); + } + }; + + const handleToggleMicrophoneMute = async () => { + setIsMicrophoneLoading(true); + try { + const result = await toggleMicrophoneMute(); + if (!result.success && result.error) { + notifications.error(result.error.message); + } + } catch (error) { + console.error("Failed to toggle microphone mute:", error); + notifications.error("Failed to toggle microphone mute"); + } finally { + setIsMicrophoneLoading(false); + } + }; + + // Handle microphone device change + const handleMicrophoneDeviceChange = async (deviceId: string) => { + setSelectedInputDevice(deviceId); + + // If microphone is currently active, restart it with the new device + if (isMicrophoneActive) { + setIsMicrophoneLoading(true); + try { + // Stop current microphone + await stopMicrophone(); + // Start with new device + const result = await startMicrophone(deviceId); + if (!result.success && result.error) { + notifications.error(result.error.message); + } + } finally { + setIsMicrophoneLoading(false); + } + } + }; + + const handleAudioOutputDeviceChange = async (deviceId: string) => { + setSelectedOutputDevice(deviceId); + + // Find the video element and set the audio output device + const videoElement = document.querySelector('video'); + if (videoElement && 'setSinkId' in videoElement) { + try { + await (videoElement as any).setSinkId(deviceId); + console.log('Audio output device changed to:', deviceId); + } catch (error) { + console.error('Failed to change audio output device:', error); + } + } else { + 
console.warn('setSinkId not supported or video element not found'); + } + }; + const formatBytes = (bytes: number) => { if (bytes === 0) return "0 B"; const k = 1024; @@ -171,12 +338,212 @@ export default function AudioControlPopover() { />
+ {/* Microphone Control */} +
+
+ + + Microphone Input + +
+ +
+
+ {isMicrophoneActive ? ( + isMicrophoneMuted ? ( + + ) : ( + + ) + ) : ( + + )} + + {!isMicrophoneActive + ? "Inactive" + : isMicrophoneMuted + ? "Muted" + : "Active" + } + +
+
+
+
+ + {/* Audio Level Meter */} + {isMicrophoneActive && ( +
+ + {/* Debug information */} +
+
+ Stream: {microphoneStream ? '✓' : '✗'} + Analyzing: {isAnalyzing ? '✓' : '✗'} + Active: {isMicrophoneActive ? '✓' : '✗'} + Muted: {isMicrophoneMuted ? '✓' : '✗'} +
+ {microphoneStream && ( +
+ Tracks: {microphoneStream.getAudioTracks().length} + {microphoneStream.getAudioTracks().length > 0 && ( + + (Enabled: {microphoneStream.getAudioTracks().filter((t: MediaStreamTrack) => t.enabled).length}) + + )} +
+ )} + +
+
+ )} +
+ + {/* Device Selection */} +
+
+ + + Audio Devices + + {devicesLoading && ( +
+ )} +
+ + {devicesError && ( +
+ {devicesError} +
+ )} + + {/* Microphone Selection */} +
+ + + {isMicrophoneActive && ( +

+ Changing device will restart the microphone +

+ )} +
+ + {/* Speaker Selection */} +
+ + +
+ + +
+ + {/* Microphone Quality Settings */} + {isMicrophoneActive && ( +
+
+ + + Microphone Quality + +
+ +
+ {Object.entries(qualityLabels).map(([quality, label]) => ( + + ))} +
+ + {currentMicrophoneConfig && ( +
+
+ Sample Rate: {currentMicrophoneConfig.SampleRate}Hz + Channels: {currentMicrophoneConfig.Channels} + Bitrate: {currentMicrophoneConfig.Bitrate}kbps + Frame: {currentMicrophoneConfig.FrameSize} +
+
+ )} +
+ )} + {/* Quality Settings */}
- Audio Quality + Audio Output Quality
@@ -240,46 +607,94 @@ export default function AudioControlPopover() { {metrics ? ( <> -
-
-
Frames Received
-
- {formatNumber(metrics.frames_received)} +
+

Audio Output

+
+
+
Frames Received
+
+ {formatNumber(metrics.frames_received)} +
-
- -
-
Frames Dropped
-
0 - ? "text-red-600 dark:text-red-400" - : "text-green-600 dark:text-green-400" - )}> - {formatNumber(metrics.frames_dropped)} + +
+
Frames Dropped
+
0 + ? "text-red-600 dark:text-red-400" + : "text-green-600 dark:text-green-400" + )}> + {formatNumber(metrics.frames_dropped)} +
-
- -
-
Data Processed
-
- {formatBytes(metrics.bytes_processed)} + +
+
Data Processed
+
+ {formatBytes(metrics.bytes_processed)} +
-
- -
-
Connection Drops
-
0 - ? "text-red-600 dark:text-red-400" - : "text-green-600 dark:text-green-400" - )}> - {formatNumber(metrics.connection_drops)} + +
+
Connection Drops
+
0 + ? "text-red-600 dark:text-red-400" + : "text-green-600 dark:text-green-400" + )}> + {formatNumber(metrics.connection_drops)} +
+ {microphoneMetrics && ( +
+

Microphone Input

+
+
+
Frames Sent
+
+ {formatNumber(microphoneMetrics.frames_sent)} +
+
+ +
+
Frames Dropped
+
0 + ? "text-red-600 dark:text-red-400" + : "text-green-600 dark:text-green-400" + )}> + {formatNumber(microphoneMetrics.frames_dropped)} +
+
+ +
+
Data Processed
+
+ {formatBytes(microphoneMetrics.bytes_processed)} +
+
+ +
+
Connection Drops
+
0 + ? "text-red-600 dark:text-red-400" + : "text-green-600 dark:text-green-400" + )}> + {formatNumber(microphoneMetrics.connection_drops)} +
+
+
+
+ )} + {metrics.frames_received > 0 && (
Drop Rate
diff --git a/ui/src/hooks/stores.ts b/ui/src/hooks/stores.ts index 1a1f6b6..db31df5 100644 --- a/ui/src/hooks/stores.ts +++ b/ui/src/hooks/stores.ts @@ -117,6 +117,16 @@ interface RTCState { mediaStream: MediaStream | null; setMediaStream: (stream: MediaStream) => void; + // Microphone stream management + microphoneStream: MediaStream | null; + setMicrophoneStream: (stream: MediaStream | null) => void; + microphoneSender: RTCRtpSender | null; + setMicrophoneSender: (sender: RTCRtpSender | null) => void; + isMicrophoneActive: boolean; + setMicrophoneActive: (active: boolean) => void; + isMicrophoneMuted: boolean; + setMicrophoneMuted: (muted: boolean) => void; + videoStreamStats: RTCInboundRtpStreamStats | null; appendVideoStreamStats: (state: RTCInboundRtpStreamStats) => void; videoStreamStatsHistory: Map; @@ -166,6 +176,16 @@ export const useRTCStore = create(set => ({ mediaStream: null, setMediaStream: stream => set({ mediaStream: stream }), + // Microphone stream management + microphoneStream: null, + setMicrophoneStream: stream => set({ microphoneStream: stream }), + microphoneSender: null, + setMicrophoneSender: sender => set({ microphoneSender: sender }), + isMicrophoneActive: false, + setMicrophoneActive: active => set({ isMicrophoneActive: active }), + isMicrophoneMuted: false, + setMicrophoneMuted: muted => set({ isMicrophoneMuted: muted }), + videoStreamStats: null, appendVideoStreamStats: stats => set({ videoStreamStats: stats }), videoStreamStatsHistory: new Map(), diff --git a/ui/src/hooks/useAudioDevices.ts b/ui/src/hooks/useAudioDevices.ts new file mode 100644 index 0000000..c0b20f3 --- /dev/null +++ b/ui/src/hooks/useAudioDevices.ts @@ -0,0 +1,107 @@ +import { useState, useEffect, useCallback } from 'react'; + +export interface AudioDevice { + deviceId: string; + label: string; + kind: 'audioinput' | 'audiooutput'; +} + +export interface UseAudioDevicesReturn { + audioInputDevices: AudioDevice[]; + audioOutputDevices: AudioDevice[]; + selectedInputDevice: string; + selectedOutputDevice: string; + isLoading: boolean; + error: string | null; + refreshDevices: () => Promise; + setSelectedInputDevice: (deviceId: string) => void; + setSelectedOutputDevice: (deviceId: string) => void; +} + +export function useAudioDevices(): UseAudioDevicesReturn { + const [audioInputDevices, setAudioInputDevices] = useState([]); + const [audioOutputDevices, setAudioOutputDevices] = useState([]); + const [selectedInputDevice, setSelectedInputDevice] = useState('default'); + const [selectedOutputDevice, setSelectedOutputDevice] = useState('default'); + const [isLoading, setIsLoading] = useState(false); + const [error, setError] = useState(null); + + const refreshDevices = useCallback(async () => { + setIsLoading(true); + setError(null); + + try { + // Request permissions first to get device labels + await navigator.mediaDevices.getUserMedia({ audio: true }); + + const devices = await navigator.mediaDevices.enumerateDevices(); + + const inputDevices: AudioDevice[] = [ + { deviceId: 'default', label: 'Default Microphone', kind: 'audioinput' } + ]; + + const outputDevices: AudioDevice[] = [ + { deviceId: 'default', label: 'Default Speaker', kind: 'audiooutput' } + ]; + + devices.forEach(device => { + if (device.kind === 'audioinput' && device.deviceId !== 'default') { + inputDevices.push({ + deviceId: device.deviceId, + label: device.label || `Microphone ${device.deviceId.slice(0, 8)}`, + kind: 'audioinput' + }); + } else if (device.kind === 'audiooutput' && device.deviceId !== 'default') { + 
outputDevices.push({ + deviceId: device.deviceId, + label: device.label || `Speaker ${device.deviceId.slice(0, 8)}`, + kind: 'audiooutput' + }); + } + }); + + setAudioInputDevices(inputDevices); + setAudioOutputDevices(outputDevices); + + console.log('Audio devices enumerated:', { + inputs: inputDevices.length, + outputs: outputDevices.length + }); + + } catch (err) { + console.error('Failed to enumerate audio devices:', err); + setError(err instanceof Error ? err.message : 'Failed to access audio devices'); + } finally { + setIsLoading(false); + } + }, []); + + // Listen for device changes + useEffect(() => { + const handleDeviceChange = () => { + console.log('Audio devices changed, refreshing...'); + refreshDevices(); + }; + + navigator.mediaDevices.addEventListener('devicechange', handleDeviceChange); + + // Initial load + refreshDevices(); + + return () => { + navigator.mediaDevices.removeEventListener('devicechange', handleDeviceChange); + }; + }, [refreshDevices]); + + return { + audioInputDevices, + audioOutputDevices, + selectedInputDevice, + selectedOutputDevice, + isLoading, + error, + refreshDevices, + setSelectedInputDevice, + setSelectedOutputDevice, + }; +} \ No newline at end of file diff --git a/ui/src/hooks/useAudioLevel.ts b/ui/src/hooks/useAudioLevel.ts new file mode 100644 index 0000000..0e2038e --- /dev/null +++ b/ui/src/hooks/useAudioLevel.ts @@ -0,0 +1,113 @@ +import { useEffect, useRef, useState } from 'react'; + +interface AudioLevelHookResult { + audioLevel: number; // 0-100 percentage + isAnalyzing: boolean; +} + +export const useAudioLevel = (stream: MediaStream | null): AudioLevelHookResult => { + const [audioLevel, setAudioLevel] = useState(0); + const [isAnalyzing, setIsAnalyzing] = useState(false); + const audioContextRef = useRef<AudioContext | null>(null); + const analyserRef = useRef<AnalyserNode | null>(null); + const sourceRef = useRef<MediaStreamAudioSourceNode | null>(null); + const animationFrameRef = useRef<number | null>(null); + + useEffect(() => { + if (!stream) { + // Clean up when stream is null + if (animationFrameRef.current) { + cancelAnimationFrame(animationFrameRef.current); + animationFrameRef.current = null; + } + if (sourceRef.current) { + sourceRef.current.disconnect(); + sourceRef.current = null; + } + if (audioContextRef.current) { + audioContextRef.current.close(); + audioContextRef.current = null; + } + analyserRef.current = null; + setIsAnalyzing(false); + setAudioLevel(0); + return; + } + + const audioTracks = stream.getAudioTracks(); + if (audioTracks.length === 0) { + setIsAnalyzing(false); + setAudioLevel(0); + return; + } + + try { + // Create audio context and analyser + const audioContext = new (window.AudioContext || (window as any).webkitAudioContext)(); + const analyser = audioContext.createAnalyser(); + const source = audioContext.createMediaStreamSource(stream); + + // Configure analyser + analyser.fftSize = 256; + analyser.smoothingTimeConstant = 0.8; + + // Connect nodes + source.connect(analyser); + + // Store references + audioContextRef.current = audioContext; + analyserRef.current = analyser; + sourceRef.current = source; + + const dataArray = new Uint8Array(analyser.frequencyBinCount); + + const updateLevel = () => { + if (!analyserRef.current) return; + + analyserRef.current.getByteFrequencyData(dataArray); + + // Calculate RMS (Root Mean Square) for more accurate level representation + let sum = 0; + for (let i = 0; i < dataArray.length; i++) { + sum += dataArray[i] * dataArray[i]; + } + const rms = Math.sqrt(sum / dataArray.length); + + // Convert to percentage (0-100) + const level = Math.min(100, 
(rms / 255) * 100); + setAudioLevel(level); + + animationFrameRef.current = requestAnimationFrame(updateLevel); + }; + + setIsAnalyzing(true); + updateLevel(); + + } catch (error) { + console.error('Failed to create audio level analyzer:', error); + setIsAnalyzing(false); + setAudioLevel(0); + } + + // Cleanup function + return () => { + if (animationFrameRef.current) { + cancelAnimationFrame(animationFrameRef.current); + animationFrameRef.current = null; + } + if (sourceRef.current) { + sourceRef.current.disconnect(); + sourceRef.current = null; + } + if (audioContextRef.current) { + audioContextRef.current.close(); + audioContextRef.current = null; + } + analyserRef.current = null; + setIsAnalyzing(false); + setAudioLevel(0); + }; + }, [stream]); + + return { audioLevel, isAnalyzing }; +}; \ No newline at end of file diff --git a/ui/src/hooks/useMicrophone.ts b/ui/src/hooks/useMicrophone.ts new file mode 100644 index 0000000..9472b6e --- /dev/null +++ b/ui/src/hooks/useMicrophone.ts @@ -0,0 +1,716 @@ +import { useCallback, useEffect, useRef } from "react"; +import { useRTCStore } from "@/hooks/stores"; +import api from "@/api"; + +export interface MicrophoneError { + type: 'permission' | 'device' | 'network' | 'unknown'; + message: string; +} + +export function useMicrophone() { + const { + peerConnection, + microphoneStream, + setMicrophoneStream, + microphoneSender, + setMicrophoneSender, + isMicrophoneActive, + setMicrophoneActive, + isMicrophoneMuted, + setMicrophoneMuted, + } = useRTCStore(); + + const microphoneStreamRef = useRef<MediaStream | null>(null); + + // Cleanup function to stop microphone stream + const stopMicrophoneStream = useCallback(async () => { + console.log("stopMicrophoneStream called - cleaning up stream"); + console.trace("stopMicrophoneStream call stack"); + + if (microphoneStreamRef.current) { + console.log("Stopping microphone stream:", microphoneStreamRef.current.id); + microphoneStreamRef.current.getTracks().forEach(track => { + track.stop(); + }); + microphoneStreamRef.current = null; + setMicrophoneStream(null); + console.log("Microphone stream cleared from ref and store"); + } else { + console.log("No microphone stream to stop"); + } + + if (microphoneSender && peerConnection) { + // Instead of removing the track, replace it with null to keep the transceiver + try { + await microphoneSender.replaceTrack(null); + } catch (error) { + console.warn("Failed to replace track with null:", error); + // Fallback to removing the track + peerConnection.removeTrack(microphoneSender); + } + setMicrophoneSender(null); + } + + setMicrophoneActive(false); + setMicrophoneMuted(false); + }, [microphoneSender, peerConnection, setMicrophoneStream, setMicrophoneSender, setMicrophoneActive, setMicrophoneMuted]); + + // Debug function to check current state (can be called from browser console) + const debugMicrophoneState = useCallback(() => { + const refStream = microphoneStreamRef.current; + const state = { + isMicrophoneActive, + isMicrophoneMuted, + streamInRef: !!refStream, + streamInStore: !!microphoneStream, + senderInStore: !!microphoneSender, + streamId: refStream?.id, + storeStreamId: microphoneStream?.id, + audioTracks: refStream?.getAudioTracks().length || 0, + storeAudioTracks: microphoneStream?.getAudioTracks().length || 0, + audioTrackDetails: refStream?.getAudioTracks().map(track => ({ + id: track.id, + label: track.label, + enabled: track.enabled, + readyState: track.readyState, + muted: track.muted + })) || [], + peerConnectionState: peerConnection ? 
{ + connectionState: peerConnection.connectionState, + iceConnectionState: peerConnection.iceConnectionState, + signalingState: peerConnection.signalingState + } : "No peer connection", + streamMatch: refStream === microphoneStream + }; + console.log("Microphone Debug State:", state); + + // Also check if streams are active + if (refStream) { + console.log("Ref stream active tracks:", refStream.getAudioTracks().filter(t => t.readyState === 'live').length); + } + if (microphoneStream && microphoneStream !== refStream) { + console.log("Store stream active tracks:", microphoneStream.getAudioTracks().filter(t => t.readyState === 'live').length); + } + + return state; + }, [isMicrophoneActive, isMicrophoneMuted, microphoneStream, microphoneSender, peerConnection]); + + // Make debug function available globally for console access + useEffect(() => { + (window as any).debugMicrophoneState = debugMicrophoneState; + return () => { + delete (window as any).debugMicrophoneState; + }; + }, [debugMicrophoneState]); + + const lastSyncRef = useRef(0); + const isStartingRef = useRef(false); // Track if we're in the middle of starting + + const syncMicrophoneState = useCallback(async () => { + // Debounce sync calls to prevent race conditions + const now = Date.now(); + if (now - lastSyncRef.current < 500) { + console.log("Skipping sync - too frequent"); + return; + } + lastSyncRef.current = now; + + // Don't sync if we're in the middle of starting the microphone + if (isStartingRef.current) { + console.log("Skipping sync - microphone is starting"); + return; + } + + try { + const response = await api.GET("/microphone/status", {}); + if (response.ok) { + const data = await response.json(); + const backendRunning = data.running; + + // If backend state differs from frontend state, sync them + if (backendRunning !== isMicrophoneActive) { + console.info(`Syncing microphone state: backend=${backendRunning}, frontend=${isMicrophoneActive}`); + setMicrophoneActive(backendRunning); + + // Only clean up stream if backend is definitely not running AND we have a stream + // Use ref to get current stream state, not stale closure value + if (!backendRunning && microphoneStreamRef.current) { + console.log("Backend not running, cleaning up stream"); + await stopMicrophoneStream(); + } + } + } + } catch (error) { + console.warn("Failed to sync microphone state:", error); + } + }, [isMicrophoneActive, setMicrophoneActive, stopMicrophoneStream]); + + // Start microphone stream + const startMicrophone = useCallback(async (deviceId?: string): Promise<{ success: boolean; error?: MicrophoneError }> => { + try { + // Set flag to prevent sync during startup + isStartingRef.current = true; + // Request microphone permission and get stream + const audioConstraints: MediaTrackConstraints = { + echoCancellation: true, + noiseSuppression: true, + autoGainControl: true, + sampleRate: 48000, + channelCount: 1, + }; + + // Add device ID if specified + if (deviceId && deviceId !== 'default') { + audioConstraints.deviceId = { exact: deviceId }; + } + + console.log("Requesting microphone with constraints:", audioConstraints); + const stream = await navigator.mediaDevices.getUserMedia({ + audio: audioConstraints + }); + + console.log("Microphone stream created successfully:", { + streamId: stream.id, + audioTracks: stream.getAudioTracks().length, + videoTracks: stream.getVideoTracks().length, + audioTrackDetails: stream.getAudioTracks().map(track => ({ + id: track.id, + label: track.label, + enabled: track.enabled, + readyState: 
track.readyState + })) + }); + + // Store the stream in both ref and store + microphoneStreamRef.current = stream; + setMicrophoneStream(stream); + + // Verify the stream was stored correctly + console.log("Stream storage verification:", { + refSet: !!microphoneStreamRef.current, + refId: microphoneStreamRef.current?.id, + storeWillBeSet: true // Store update is async + }); + + // Add audio track to peer connection if available + console.log("Peer connection state:", peerConnection ? { + connectionState: peerConnection.connectionState, + iceConnectionState: peerConnection.iceConnectionState, + signalingState: peerConnection.signalingState + } : "No peer connection"); + + if (peerConnection && stream.getAudioTracks().length > 0) { + const audioTrack = stream.getAudioTracks()[0]; + console.log("Starting microphone with audio track:", audioTrack.id, "kind:", audioTrack.kind); + + // Find the audio transceiver (should already exist with sendrecv direction) + const transceivers = peerConnection.getTransceivers(); + console.log("Available transceivers:", transceivers.map(t => ({ + direction: t.direction, + mid: t.mid, + senderTrack: t.sender.track?.kind, + receiverTrack: t.receiver.track?.kind + }))); + + // Look for an audio transceiver that can send (has sendrecv or sendonly direction) + const audioTransceiver = transceivers.find(transceiver => { + // Check if this transceiver is for audio and can send + const canSend = transceiver.direction === 'sendrecv' || transceiver.direction === 'sendonly'; + + // For newly created transceivers, we need to check if they're for audio + // We can do this by checking if the sender doesn't have a track yet and direction allows sending + if (canSend && !transceiver.sender.track) { + return true; + } + + // For existing transceivers, check if they already have an audio track + if (transceiver.sender.track?.kind === 'audio' || transceiver.receiver.track?.kind === 'audio') { + return canSend; + } + + return false; + }); + + console.log("Found audio transceiver:", audioTransceiver ? { + direction: audioTransceiver.direction, + mid: audioTransceiver.mid, + senderTrack: audioTransceiver.sender.track?.kind, + receiverTrack: audioTransceiver.receiver.track?.kind + } : null); + + let sender: RTCRtpSender; + if (audioTransceiver && audioTransceiver.sender) { + // Use the existing audio transceiver's sender + await audioTransceiver.sender.replaceTrack(audioTrack); + sender = audioTransceiver.sender; + console.log("Replaced audio track on existing transceiver"); + + // Verify the track was set correctly + console.log("Transceiver after track replacement:", { + direction: audioTransceiver.direction, + senderTrack: audioTransceiver.sender.track?.id, + senderTrackKind: audioTransceiver.sender.track?.kind, + senderTrackEnabled: audioTransceiver.sender.track?.enabled, + senderTrackReadyState: audioTransceiver.sender.track?.readyState + }); + } else { + // Fallback: add new track if no transceiver found + sender = peerConnection.addTrack(audioTrack, stream); + console.log("Added new audio track to peer connection"); + + // Find the transceiver that was created for this track + const newTransceiver = peerConnection.getTransceivers().find(t => t.sender === sender); + console.log("New transceiver created:", newTransceiver ? 
{ + direction: newTransceiver.direction, + senderTrack: newTransceiver.sender.track?.id, + senderTrackKind: newTransceiver.sender.track?.kind + } : "Not found"); + } + + setMicrophoneSender(sender); + console.log("Microphone sender set:", { + senderId: sender, + track: sender.track?.id, + trackKind: sender.track?.kind, + trackEnabled: sender.track?.enabled, + trackReadyState: sender.track?.readyState + }); + + // Check sender stats to verify audio is being transmitted + setTimeout(async () => { + try { + const stats = await sender.getStats(); + console.log("Sender stats after 2 seconds:"); + stats.forEach((report, id) => { + if (report.type === 'outbound-rtp' && report.kind === 'audio') { + console.log("Outbound audio RTP stats:", { + id, + packetsSent: report.packetsSent, + bytesSent: report.bytesSent, + timestamp: report.timestamp + }); + } + }); + } catch (error) { + console.error("Failed to get sender stats:", error); + } + }, 2000); + } + + // Notify backend that microphone is started + console.log("Notifying backend about microphone start..."); + try { + const backendResp = await api.POST("/microphone/start", {}); + console.log("Backend response status:", backendResp.status, "ok:", backendResp.ok); + + if (!backendResp.ok) { + console.error("Backend microphone start failed with status:", backendResp.status); + // If backend fails, cleanup the stream + await stopMicrophoneStream(); + isStartingRef.current = false; + return { + success: false, + error: { + type: 'network', + message: 'Failed to start microphone on backend' + } + }; + } + + // Check the response to see if it was already running + const responseData = await backendResp.json(); + console.log("Backend response data:", responseData); + if (responseData.status === "already running") { + console.info("Backend microphone was already running"); + } + console.log("Backend microphone start successful"); + } catch (error) { + console.error("Backend microphone start threw error:", error); + // If backend fails, cleanup the stream + await stopMicrophoneStream(); + isStartingRef.current = false; + return { + success: false, + error: { + type: 'network', + message: 'Failed to communicate with backend' + } + }; + } + + // Only set active state after backend confirms success + setMicrophoneActive(true); + setMicrophoneMuted(false); + + console.log("Microphone state set to active. Verifying state:", { + streamInRef: !!microphoneStreamRef.current, + streamInStore: !!microphoneStream, + isActive: true, + isMuted: false + }); + + // Don't sync immediately after starting - it causes race conditions + // The sync will happen naturally through other triggers + setTimeout(() => { + // Just verify state after a delay for debugging + console.log("State check after delay:", { + streamInRef: !!microphoneStreamRef.current, + streamInStore: !!microphoneStream, + isActive: isMicrophoneActive, + isMuted: isMicrophoneMuted + }); + }, 100); + + // Clear the starting flag + isStartingRef.current = false; + return { success: true }; + } catch (error) { + console.error("Failed to start microphone:", error); + + let micError: MicrophoneError; + if (error instanceof Error) { + if (error.name === 'NotAllowedError' || error.name === 'PermissionDeniedError') { + micError = { + type: 'permission', + message: 'Microphone permission denied. Please allow microphone access and try again.' + }; + } else if (error.name === 'NotFoundError' || error.name === 'DevicesNotFoundError') { + micError = { + type: 'device', + message: 'No microphone device found. 
Please check your microphone connection.' + }; + } else { + micError = { + type: 'unknown', + message: error.message || 'Failed to access microphone' + }; + } + } else { + micError = { + type: 'unknown', + message: 'Unknown error occurred while accessing microphone' + }; + } + + // Clear the starting flag on error + isStartingRef.current = false; + return { success: false, error: micError }; + } + }, [peerConnection, setMicrophoneStream, setMicrophoneSender, setMicrophoneActive, setMicrophoneMuted, syncMicrophoneState, stopMicrophoneStream]); + + // Stop microphone + const stopMicrophone = useCallback(async (): Promise<{ success: boolean; error?: MicrophoneError }> => { + try { + await stopMicrophoneStream(); + + // Notify backend that microphone is stopped + try { + await api.POST("/microphone/stop", {}); + } catch (error) { + console.warn("Failed to notify backend about microphone stop:", error); + } + + // Sync state after stopping to ensure consistency + setTimeout(() => syncMicrophoneState(), 100); + + return { success: true }; + } catch (error) { + console.error("Failed to stop microphone:", error); + return { + success: false, + error: { + type: 'unknown', + message: error instanceof Error ? error.message : 'Failed to stop microphone' + } + }; + } + }, [stopMicrophoneStream, syncMicrophoneState]); + + // Toggle microphone mute + const toggleMicrophoneMute = useCallback(async (): Promise<{ success: boolean; error?: MicrophoneError }> => { + try { + // Use the ref instead of store value to avoid race conditions + const currentStream = microphoneStreamRef.current || microphoneStream; + + console.log("Toggle microphone mute - current state:", { + hasRefStream: !!microphoneStreamRef.current, + hasStoreStream: !!microphoneStream, + isActive: isMicrophoneActive, + isMuted: isMicrophoneMuted, + streamId: currentStream?.id, + audioTracks: currentStream?.getAudioTracks().length || 0 + }); + + if (!currentStream || !isMicrophoneActive) { + const errorDetails = { + hasStream: !!currentStream, + isActive: isMicrophoneActive, + storeStream: !!microphoneStream, + refStream: !!microphoneStreamRef.current, + streamId: currentStream?.id, + audioTracks: currentStream?.getAudioTracks().length || 0 + }; + console.warn("Microphone mute failed: stream or active state missing", errorDetails); + + // Provide more specific error message + let errorMessage = 'Microphone is not active'; + if (!currentStream) { + errorMessage = 'No microphone stream found. Please restart the microphone.'; + } else if (!isMicrophoneActive) { + errorMessage = 'Microphone is not marked as active. 
Please restart the microphone.'; + } + + return { + success: false, + error: { + type: 'device', + message: errorMessage + } + }; + } + + const audioTracks = currentStream.getAudioTracks(); + if (audioTracks.length === 0) { + return { + success: false, + error: { + type: 'device', + message: 'No audio tracks found in microphone stream' + } + }; + } + + const newMutedState = !isMicrophoneMuted; + + // Mute/unmute the audio track + audioTracks.forEach(track => { + track.enabled = !newMutedState; + console.log(`Audio track ${track.id} enabled: ${track.enabled}`); + }); + + setMicrophoneMuted(newMutedState); + + // Notify backend about mute state + try { + await api.POST("/microphone/mute", { muted: newMutedState }); + } catch (error) { + console.warn("Failed to notify backend about microphone mute:", error); + } + + return { success: true }; + } catch (error) { + console.error("Failed to toggle microphone mute:", error); + return { + success: false, + error: { + type: 'unknown', + message: error instanceof Error ? error.message : 'Failed to toggle microphone mute' + } + }; + } + }, [microphoneStream, isMicrophoneActive, isMicrophoneMuted, setMicrophoneMuted]); + + // Function to check WebRTC audio transmission stats + const checkAudioTransmissionStats = useCallback(async () => { + if (!microphoneSender) { + console.log("No microphone sender available"); + return null; + } + + try { + const stats = await microphoneSender.getStats(); + const audioStats: any[] = []; + + stats.forEach((report, id) => { + if (report.type === 'outbound-rtp' && report.kind === 'audio') { + audioStats.push({ + id, + type: report.type, + kind: report.kind, + packetsSent: report.packetsSent, + bytesSent: report.bytesSent, + timestamp: report.timestamp, + ssrc: report.ssrc + }); + } + }); + + console.log("Audio transmission stats:", audioStats); + return audioStats; + } catch (error) { + console.error("Failed to get audio transmission stats:", error); + return null; + } + }, [microphoneSender]); + + // Comprehensive test function to diagnose microphone issues + const testMicrophoneAudio = useCallback(async () => { + console.log("=== MICROPHONE AUDIO TEST ==="); + + // 1. Check if we have a stream + const stream = microphoneStreamRef.current; + if (!stream) { + console.log("❌ No microphone stream available"); + return; + } + + console.log("✅ Microphone stream exists:", stream.id); + + // 2. Check audio tracks + const audioTracks = stream.getAudioTracks(); + console.log("Audio tracks:", audioTracks.length); + + if (audioTracks.length === 0) { + console.log("❌ No audio tracks in stream"); + return; + } + + const track = audioTracks[0]; + console.log("✅ Audio track details:", { + id: track.id, + label: track.label, + enabled: track.enabled, + readyState: track.readyState, + muted: track.muted + }); + + // 3. 
Test audio level detection manually + try { + const audioContext = new (window.AudioContext || (window as any).webkitAudioContext)(); + const analyser = audioContext.createAnalyser(); + const source = audioContext.createMediaStreamSource(stream); + + analyser.fftSize = 256; + source.connect(analyser); + + const dataArray = new Uint8Array(analyser.frequencyBinCount); + + console.log("🎤 Testing audio level detection for 5 seconds..."); + console.log("Please speak into your microphone now!"); + + let maxLevel = 0; + let sampleCount = 0; + + const testInterval = setInterval(() => { + analyser.getByteFrequencyData(dataArray); + + let sum = 0; + for (let i = 0; i < dataArray.length; i++) { + sum += dataArray[i] * dataArray[i]; + } + const rms = Math.sqrt(sum / dataArray.length); + const level = Math.min(100, (rms / 255) * 100); + + maxLevel = Math.max(maxLevel, level); + sampleCount++; + + if (sampleCount % 10 === 0) { // Log every 10th sample + console.log(`Audio level: ${level.toFixed(1)}% (max so far: ${maxLevel.toFixed(1)}%)`); + } + }, 100); + + setTimeout(() => { + clearInterval(testInterval); + source.disconnect(); + audioContext.close(); + + console.log("🎤 Audio test completed!"); + console.log(`Maximum audio level detected: ${maxLevel.toFixed(1)}%`); + + if (maxLevel > 5) { + console.log("✅ Microphone is detecting audio!"); + } else { + console.log("❌ No significant audio detected. Check microphone permissions and hardware."); + } + }, 5000); + + } catch (error) { + console.error("❌ Failed to test audio level:", error); + } + + // 4. Check WebRTC sender + if (microphoneSender) { + console.log("✅ WebRTC sender exists"); + console.log("Sender track:", { + id: microphoneSender.track?.id, + kind: microphoneSender.track?.kind, + enabled: microphoneSender.track?.enabled, + readyState: microphoneSender.track?.readyState + }); + + // Check if sender track matches stream track + if (microphoneSender.track === track) { + console.log("✅ Sender track matches stream track"); + } else { + console.log("❌ Sender track does NOT match stream track"); + } + } else { + console.log("❌ No WebRTC sender available"); + } + + // 5. 
Check peer connection + if (peerConnection) { + console.log("✅ Peer connection exists"); + console.log("Connection state:", peerConnection.connectionState); + console.log("ICE connection state:", peerConnection.iceConnectionState); + + const transceivers = peerConnection.getTransceivers(); + const audioTransceivers = transceivers.filter(t => + t.sender.track?.kind === 'audio' || t.receiver.track?.kind === 'audio' + ); + + console.log("Audio transceivers:", audioTransceivers.map(t => ({ + direction: t.direction, + senderTrack: t.sender.track?.id, + receiverTrack: t.receiver.track?.id + }))); + } else { + console.log("❌ No peer connection available"); + } + + }, [microphoneSender, peerConnection]); + + // Make debug functions available globally for console access + useEffect(() => { + (window as any).debugMicrophone = debugMicrophoneState; + (window as any).checkAudioStats = checkAudioTransmissionStats; + (window as any).testMicrophoneAudio = testMicrophoneAudio; + return () => { + delete (window as any).debugMicrophone; + delete (window as any).checkAudioStats; + delete (window as any).testMicrophoneAudio; + }; + }, [debugMicrophoneState, checkAudioTransmissionStats, testMicrophoneAudio]); + + // Sync state on mount + useEffect(() => { + syncMicrophoneState(); + }, [syncMicrophoneState]); + + // Cleanup on unmount - use ref to avoid dependency on stopMicrophoneStream + useEffect(() => { + return () => { + // Clean up stream directly without depending on the callback + const stream = microphoneStreamRef.current; + if (stream) { + console.log("Cleanup: stopping microphone stream on unmount"); + stream.getAudioTracks().forEach(track => { + track.stop(); + console.log(`Cleanup: stopped audio track ${track.id}`); + }); + microphoneStreamRef.current = null; + } + }; + }, []); // No dependencies to prevent re-running + + return { + isMicrophoneActive, + isMicrophoneMuted, + microphoneStream, + startMicrophone, + stopMicrophone, + toggleMicrophoneMute, + syncMicrophoneState, + debugMicrophoneState, + }; +} \ No newline at end of file diff --git a/ui/src/routes/devices.$id.tsx b/ui/src/routes/devices.$id.tsx index 3b90090..d652f87 100644 --- a/ui/src/routes/devices.$id.tsx +++ b/ui/src/routes/devices.$id.tsx @@ -33,6 +33,7 @@ import { useVideoStore, VideoState, } from "@/hooks/stores"; +import { useMicrophone } from "@/hooks/useMicrophone"; import WebRTCVideo from "@components/WebRTCVideo"; import { checkAuth, isInCloud, isOnDevice } from "@/main"; import DashboardNavbar from "@components/Header"; @@ -142,6 +143,9 @@ export default function KvmIdRoute() { const setTransceiver = useRTCStore(state => state.setTransceiver); const location = useLocation(); + // Microphone hook - moved here to prevent unmounting when popover closes + const microphoneHook = useMicrophone(); + const isLegacySignalingEnabled = useRef(false); const [connectionFailed, setConnectionFailed] = useState(false); @@ -480,8 +484,8 @@ export default function KvmIdRoute() { }; setTransceiver(pc.addTransceiver("video", { direction: "recvonly" })); - // Add audio transceiver to receive audio from the server - pc.addTransceiver("audio", { direction: "recvonly" }); + // Add audio transceiver to receive audio from the server and send microphone audio + pc.addTransceiver("audio", { direction: "sendrecv" }); const rpcDataChannel = pc.createDataChannel("rpc"); rpcDataChannel.onopen = () => { @@ -831,7 +835,7 @@ export default function KvmIdRoute() { />
- + diff --git a/web.go b/web.go --- a/web.go +++ b/web.go + protected.POST("/microphone/quality", func(c *gin.Context) { + var req struct { + Quality int `json:"quality"` + } + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(400, gin.H{"error": "invalid request body"}) + return + } + if req.Quality < 0 || req.Quality >
3 { + c.JSON(400, gin.H{"error": "invalid quality level (0-3)"}) + return + } + + audio.SetMicrophoneQuality(audio.AudioQuality(req.Quality)) + c.JSON(200, gin.H{ + "quality": req.Quality, + "config": audio.GetMicrophoneConfig(), + }) + }) + + // Microphone API endpoints + protected.GET("/microphone/status", func(c *gin.Context) { + sessionActive := currentSession != nil + var running bool + + if sessionActive && currentSession.AudioInputManager != nil { + running = currentSession.AudioInputManager.IsRunning() + } + + c.JSON(200, gin.H{ + "running": running, + "session_active": sessionActive, + }) + }) + + protected.POST("/microphone/start", func(c *gin.Context) { + if currentSession == nil { + c.JSON(400, gin.H{"error": "no active session"}) + return + } + + if currentSession.AudioInputManager == nil { + c.JSON(500, gin.H{"error": "audio input manager not available"}) + return + } + + err := currentSession.AudioInputManager.Start() + if err != nil { + c.JSON(500, gin.H{"error": err.Error()}) + return + } + + c.JSON(200, gin.H{ + "status": "started", + "running": currentSession.AudioInputManager.IsRunning(), + }) + }) + + protected.POST("/microphone/stop", func(c *gin.Context) { + if currentSession == nil { + c.JSON(400, gin.H{"error": "no active session"}) + return + } + + if currentSession.AudioInputManager == nil { + c.JSON(500, gin.H{"error": "audio input manager not available"}) + return + } + + currentSession.AudioInputManager.Stop() + c.JSON(200, gin.H{ + "status": "stopped", + "running": currentSession.AudioInputManager.IsRunning(), + }) + }) + + protected.POST("/microphone/mute", func(c *gin.Context) { + var req struct { + Muted bool `json:"muted"` + } + + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(400, gin.H{"error": "invalid request body"}) + return + } + + // Note: Microphone muting is typically handled at the frontend level + // This endpoint is provided for consistency but doesn't affect backend processing + c.JSON(200, gin.H{ + "status": "mute state updated", + "muted": req.Muted, + }) + }) + + protected.GET("/microphone/metrics", func(c *gin.Context) { + if currentSession == nil || currentSession.AudioInputManager == nil { + c.JSON(200, gin.H{ + "frames_sent": 0, + "frames_dropped": 0, + "bytes_processed": 0, + "last_frame_time": "", + "connection_drops": 0, + "average_latency": "0s", + }) + return + } + + metrics := currentSession.AudioInputManager.GetMetrics() + c.JSON(200, gin.H{ + "frames_sent": metrics.FramesSent, + "frames_dropped": metrics.FramesDropped, + "bytes_processed": metrics.BytesProcessed, + "last_frame_time": metrics.LastFrameTime.Format("2006-01-02T15:04:05.000Z"), + "connection_drops": metrics.ConnectionDrops, + "average_latency": metrics.AverageLatency.String(), + }) + }) + // Catch-all route for SPA r.NoRoute(func(c *gin.Context) { if c.Request.Method == "GET" && c.NegotiateFormat(gin.MIMEHTML) == gin.MIMEHTML { @@ -243,26 +373,63 @@ func handleWebRTCSession(c *gin.Context) { return } - session, err := newSession(SessionConfig{}) - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": err}) - return + var session *Session + var err error + var sd string + + // Check if we have an existing session and handle renegotiation + if currentSession != nil { + logger.Info().Msg("handling renegotiation for existing session") + + // Handle renegotiation with existing session + sd, err = currentSession.ExchangeOffer(req.Sd) + if err != nil { + logger.Warn().Err(err).Msg("renegotiation failed, creating new session") + // If 
renegotiation fails, fall back to creating a new session + session, err = newSession(SessionConfig{}) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err}) + return + } + + sd, err = session.ExchangeOffer(req.Sd) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err}) + return + } + + // Close the old session + writeJSONRPCEvent("otherSessionConnected", nil, currentSession) + peerConn := currentSession.peerConnection + go func() { + time.Sleep(1 * time.Second) + _ = peerConn.Close() + }() + + currentSession = session + logger.Info().Interface("session", session).Msg("new session created after renegotiation failure") + } else { + logger.Info().Msg("renegotiation successful") + } + } else { + // No existing session, create a new one + logger.Info().Msg("creating new session") + session, err = newSession(SessionConfig{}) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err}) + return + } + + sd, err = session.ExchangeOffer(req.Sd) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": err}) + return + } + + currentSession = session + logger.Info().Interface("session", session).Msg("new session accepted") } - sd, err := session.ExchangeOffer(req.Sd) - if err != nil { - c.JSON(http.StatusInternalServerError, gin.H{"error": err}) - return - } - if currentSession != nil { - writeJSONRPCEvent("otherSessionConnected", nil, currentSession) - peerConn := currentSession.peerConnection - go func() { - time.Sleep(1 * time.Second) - _ = peerConn.Close() - }() - } - currentSession = session c.JSON(http.StatusOK, gin.H{"sd": sd}) } diff --git a/webrtc.go b/webrtc.go index f14b72a..cb136b2 100644 --- a/webrtc.go +++ b/webrtc.go @@ -10,6 +10,7 @@ import ( "github.com/coder/websocket" "github.com/coder/websocket/wsjson" "github.com/gin-gonic/gin" + "github.com/jetkvm/kvm/internal/audio" "github.com/jetkvm/kvm/internal/logging" "github.com/pion/webrtc/v4" "github.com/rs/zerolog" @@ -23,6 +24,7 @@ type Session struct { RPCChannel *webrtc.DataChannel HidChannel *webrtc.DataChannel DiskChannel *webrtc.DataChannel + AudioInputManager *audio.AudioInputManager shouldUmountVirtualMedia bool } @@ -105,7 +107,10 @@ func newSession(config SessionConfig) (*Session, error) { if err != nil { return nil, err } - session := &Session{peerConnection: peerConnection} + session := &Session{ + peerConnection: peerConnection, + AudioInputManager: audio.NewAudioInputManager(), + } peerConnection.OnDataChannel(func(d *webrtc.DataChannel) { scopedLogger.Info().Str("label", d.Label()).Uint16("id", *d.ID()).Msg("New DataChannel") @@ -113,7 +118,7 @@ func newSession(config SessionConfig) (*Session, error) { case "rpc": session.RPCChannel = d d.OnMessage(func(msg webrtc.DataChannelMessage) { - go onRPCMessage(msg, session) + go onRPCMessageThrottled(msg, session) }) triggerOTAStateUpdate() triggerVideoStateUpdate() @@ -147,10 +152,42 @@ func newSession(config SessionConfig) (*Session, error) { return nil, err } - audioRtpSender, err := peerConnection.AddTrack(session.AudioTrack) + // Add bidirectional audio transceiver for microphone input + audioTransceiver, err := peerConnection.AddTransceiverFromTrack(session.AudioTrack, webrtc.RTPTransceiverInit{ + Direction: webrtc.RTPTransceiverDirectionSendrecv, + }) if err != nil { return nil, err } + audioRtpSender := audioTransceiver.Sender() + + // Handle incoming audio track (microphone from browser) + peerConnection.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { + 
scopedLogger.Info().Str("codec", track.Codec().MimeType).Str("id", track.ID()).Msg("Got remote track") + + if track.Kind() == webrtc.RTPCodecTypeAudio && track.Codec().MimeType == webrtc.MimeTypeOpus { + scopedLogger.Info().Msg("Processing incoming audio track for microphone input") + + go func() { + for { + rtpPacket, _, err := track.ReadRTP() + if err != nil { + scopedLogger.Debug().Err(err).Msg("Error reading RTP packet from audio track") + return + } + + // Extract Opus payload from RTP packet + opusPayload := rtpPacket.Payload + if len(opusPayload) > 0 && session.AudioInputManager != nil { + err := session.AudioInputManager.WriteOpusFrame(opusPayload) + if err != nil { + scopedLogger.Warn().Err(err).Msg("Failed to write Opus frame to audio input manager") + } + } + } + }() + } + }) // Read incoming RTCP packets // Before these packets are returned they are processed by interceptors. For things @@ -196,6 +233,10 @@ func newSession(config SessionConfig) (*Session, error) { err := rpcUnmountImage() scopedLogger.Warn().Err(err).Msg("unmount image failed on connection close") } + // Stop audio input manager + if session.AudioInputManager != nil { + session.AudioInputManager.Stop() + } if isConnected { isConnected = false actionSessions--
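Taken together, the new store fields in stores.ts and the useMicrophone hook give components a small surface for driving the microphone. A minimal sketch of a toolbar button built on that surface; the component name and markup are illustrative, not part of this diff:

import { useMicrophone } from "@/hooks/useMicrophone";

export function MicrophoneToggle() {
  const {
    isMicrophoneActive,
    isMicrophoneMuted,
    startMicrophone,
    stopMicrophone,
    toggleMicrophoneMute,
  } = useMicrophone();

  const onToggle = async () => {
    if (!isMicrophoneActive) {
      // startMicrophone resolves with { success, error? } instead of throwing
      const result = await startMicrophone();
      if (!result.success) console.warn(result.error?.message);
    } else {
      await toggleMicrophoneMute();
    }
  };

  return (
    <button onClick={onToggle} onDoubleClick={() => stopMicrophone()}>
      {!isMicrophoneActive ? "Enable mic" : isMicrophoneMuted ? "Unmute" : "Mute"}
    </button>
  );
}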
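useAudioDevices deliberately prepends synthetic 'default' entries so the UI always has a selectable option before real labels are available. A sketch of a device picker consuming it (markup illustrative); the selected deviceId is exactly what startMicrophone(deviceId) expects:

import { useAudioDevices } from "@/hooks/useAudioDevices";

export function MicrophonePicker() {
  const {
    audioInputDevices,
    selectedInputDevice,
    setSelectedInputDevice,
    isLoading,
    error,
  } = useAudioDevices();

  if (error) return <p>{error}</p>;

  return (
    <select
      disabled={isLoading}
      value={selectedInputDevice}
      onChange={e => setSelectedInputDevice(e.target.value)}
    >
      {audioInputDevices.map(d => (
        <option key={d.deviceId} value={d.deviceId}>
          {d.label}
        </option>
      ))}
    </select>
  );
}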
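The RMS-over-frequency-bins computation in useAudioLevel produces a 0-100 level each animation frame, which renders naturally as a bar. A sketch wired to the store's microphoneStream (styling illustrative):

import { useRTCStore } from "@/hooks/stores";
import { useAudioLevel } from "@/hooks/useAudioLevel";

export function MicLevelMeter() {
  const microphoneStream = useRTCStore(state => state.microphoneStream);
  const { audioLevel, isAnalyzing } = useAudioLevel(microphoneStream);

  if (!isAnalyzing) return null;

  return (
    <div style={{ width: 100, height: 6, background: "#333" }}>
      {/* bar width tracks the 0-100 RMS level */}
      <div style={{ width: `${audioLevel}%`, height: "100%", background: "#4ade80" }} />
    </div>
  );
}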
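The devices.$id.tsx switch from recvonly to sendrecv mirrors the AddTransceiverFromTrack(..., sendrecv) change in webrtc.go: a single audio m-line now carries device-to-browser audio out and browser-to-device microphone audio back. Reduced to its essentials on the client side (standalone sketch; micTrack is assumed to come from getUserMedia):

// One bidirectional audio transceiver instead of a receive-only one.
const pc = new RTCPeerConnection();
pc.addTransceiver("video", { direction: "recvonly" });
const audioTransceiver = pc.addTransceiver("audio", { direction: "sendrecv" });

// When the user enables the microphone, attach the track to the existing
// sender rather than calling addTrack, so no extra m-line is negotiated.
// This is the same replaceTrack strategy useMicrophone uses.
async function attachMic(micTrack: MediaStreamTrack): Promise<void> {
  await audioTransceiver.sender.replaceTrack(micTrack);
}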
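The new web.go endpoints are plain JSON over HTTP; the api.GET/api.POST helper used in useMicrophone wraps calls equivalent to these. A sketch with plain fetch; note the /microphone/quality path follows the reconstruction above and should be checked against the actual route:

async function getMicStatus(): Promise<{ running: boolean; session_active: boolean }> {
  const res = await fetch("/microphone/status");
  return res.json();
}

async function setMicQuality(quality: 0 | 1 | 2 | 3): Promise<void> {
  // 0=low, 1=medium, 2=high, 3=ultra, matching the AudioQuality presets
  const res = await fetch("/microphone/quality", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ quality }),
  });
  if (!res.ok) throw new Error(`quality update failed: ${res.status}`);
}

The /microphone/start, /microphone/stop, /microphone/mute, and /microphone/metrics routes follow the same shape.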
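handleWebRTCSession now answers an offer against the existing session instead of tearing it down, so the client can renegotiate (for example after the sendrecv transceiver or a replaceTrack change) by re-posting an offer to the same endpoint. A sketch of that client counterpart; the /webrtc/session path and the base64-wrapped SDP encoding are assumptions inferred from the req.Sd exchange, not shown in this diff:

async function renegotiate(pc: RTCPeerConnection): Promise<void> {
  const offer = await pc.createOffer();
  await pc.setLocalDescription(offer);

  // Re-post the offer; the server answers from the existing session,
  // falling back to a fresh session only if renegotiation fails.
  const res = await fetch("/webrtc/session", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ sd: btoa(JSON.stringify(pc.localDescription)) }),
  });
  const { sd } = await res.json();
  await pc.setRemoteDescription(JSON.parse(atob(sd)) as RTCSessionDescriptionInit);
}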