From 767311ec04ff259dcca0733b0cbf9ec4c981e25b Mon Sep 17 00:00:00 2001 From: Alex P Date: Wed, 13 Aug 2025 11:33:21 +0000 Subject: [PATCH] [WIP] Fix: performance issues --- cloud.go | 3 +- internal/audio/cgo_audio.go | 410 ++++++++++++++---- internal/audio/cgo_audio_stub.go | 33 +- internal/audio/events.go | 87 +++- internal/audio/nonblocking_api.go | 5 + internal/audio/nonblocking_audio.go | 158 +++++-- ui/src/components/ActionBar.tsx | 12 +- ui/src/components/AudioMetricsDashboard.tsx | 7 +- .../popovers/AudioControlPopover.tsx | 63 ++- ui/src/hooks/useAudioEvents.ts | 140 ++++-- ui/src/hooks/useAudioLevel.ts | 65 ++- ui/src/hooks/useMicrophone.ts | 43 +- web.go | 56 ++- webrtc.go | 7 + 14 files changed, 853 insertions(+), 236 deletions(-) diff --git a/cloud.go b/cloud.go index cddf055..c1b6187 100644 --- a/cloud.go +++ b/cloud.go @@ -39,7 +39,8 @@ const ( // should be lower than the websocket response timeout set in cloud-api CloudOidcRequestTimeout = 10 * time.Second // WebsocketPingInterval is the interval at which the websocket client sends ping messages to the cloud - WebsocketPingInterval = 15 * time.Second + // Increased to 30 seconds for constrained environments to reduce overhead + WebsocketPingInterval = 30 * time.Second ) var ( diff --git a/internal/audio/cgo_audio.go b/internal/audio/cgo_audio.go index 2ee3e89..5c0866e 100644 --- a/internal/audio/cgo_audio.go +++ b/internal/audio/cgo_audio.go @@ -14,8 +14,10 @@ import ( #include #include #include +#include +#include -// C state for ALSA/Opus +// C state for ALSA/Opus with safety flags static snd_pcm_t *pcm_handle = NULL; static snd_pcm_t *pcm_playback_handle = NULL; static OpusEncoder *encoder = NULL; @@ -27,124 +29,357 @@ static int channels = 2; static int frame_size = 960; // 20ms for 48kHz static int max_packet_size = 1500; -// Initialize ALSA and Opus encoder +// State tracking to prevent race conditions during rapid start/stop +static volatile int capture_initializing = 0; +static volatile int capture_initialized = 0; +static volatile int playback_initializing = 0; +static volatile int playback_initialized = 0; + +// Safe ALSA device opening with retry logic +static int safe_alsa_open(snd_pcm_t **handle, const char *device, snd_pcm_stream_t stream) { + int attempts = 3; + int err; + + while (attempts-- > 0) { + err = snd_pcm_open(handle, device, stream, SND_PCM_NONBLOCK); + if (err >= 0) { + // Switch to blocking mode after successful open + snd_pcm_nonblock(*handle, 0); + return 0; + } + + if (err == -EBUSY && attempts > 0) { + // Device busy, wait and retry + usleep(50000); // 50ms + continue; + } + break; + } + return err; +} + +// Optimized ALSA configuration with stack allocation and performance tuning +static int configure_alsa_device(snd_pcm_t *handle, const char *device_name) { + snd_pcm_hw_params_t *params; + snd_pcm_sw_params_t *sw_params; + int err; + + if (!handle) return -1; + + // Use stack allocation for better performance + snd_pcm_hw_params_alloca(¶ms); + snd_pcm_sw_params_alloca(&sw_params); + + // Hardware parameters + err = snd_pcm_hw_params_any(handle, params); + if (err < 0) return err; + + err = snd_pcm_hw_params_set_access(handle, params, SND_PCM_ACCESS_RW_INTERLEAVED); + if (err < 0) return err; + + err = snd_pcm_hw_params_set_format(handle, params, SND_PCM_FORMAT_S16_LE); + if (err < 0) return err; + + err = snd_pcm_hw_params_set_channels(handle, params, channels); + if (err < 0) return err; + + // Set exact rate for better performance + err = snd_pcm_hw_params_set_rate(handle, params, sample_rate, 0); + if (err < 0) { + // Fallback to near rate if exact fails + unsigned int rate = sample_rate; + err = snd_pcm_hw_params_set_rate_near(handle, params, &rate, 0); + if (err < 0) return err; + } + + // Optimize buffer sizes for low latency + snd_pcm_uframes_t period_size = frame_size; + err = snd_pcm_hw_params_set_period_size_near(handle, params, &period_size, 0); + if (err < 0) return err; + + // Set buffer size to 4 periods for good latency/stability balance + snd_pcm_uframes_t buffer_size = period_size * 4; + err = snd_pcm_hw_params_set_buffer_size_near(handle, params, &buffer_size); + if (err < 0) return err; + + err = snd_pcm_hw_params(handle, params); + if (err < 0) return err; + + // Software parameters for optimal performance + err = snd_pcm_sw_params_current(handle, sw_params); + if (err < 0) return err; + + // Start playback/capture when buffer is period_size frames + err = snd_pcm_sw_params_set_start_threshold(handle, sw_params, period_size); + if (err < 0) return err; + + // Allow transfers when at least period_size frames are available + err = snd_pcm_sw_params_set_avail_min(handle, sw_params, period_size); + if (err < 0) return err; + + err = snd_pcm_sw_params(handle, sw_params); + if (err < 0) return err; + + return snd_pcm_prepare(handle); +} + +// Initialize ALSA and Opus encoder with improved safety int jetkvm_audio_init() { int err; - snd_pcm_hw_params_t *params; - if (pcm_handle) return 0; - if (snd_pcm_open(&pcm_handle, "hw:1,0", SND_PCM_STREAM_CAPTURE, 0) < 0) + + // Prevent concurrent initialization + if (__sync_bool_compare_and_swap(&capture_initializing, 0, 1) == 0) { + return -EBUSY; // Already initializing + } + + // Check if already initialized + if (capture_initialized) { + capture_initializing = 0; + return 0; + } + + // Clean up any existing resources first + if (encoder) { + opus_encoder_destroy(encoder); + encoder = NULL; + } + if (pcm_handle) { + snd_pcm_close(pcm_handle); + pcm_handle = NULL; + } + + // Try to open ALSA capture device + err = safe_alsa_open(&pcm_handle, "hw:1,0", SND_PCM_STREAM_CAPTURE); + if (err < 0) { + capture_initializing = 0; return -1; - snd_pcm_hw_params_malloc(¶ms); - snd_pcm_hw_params_any(pcm_handle, params); - snd_pcm_hw_params_set_access(pcm_handle, params, SND_PCM_ACCESS_RW_INTERLEAVED); - snd_pcm_hw_params_set_format(pcm_handle, params, SND_PCM_FORMAT_S16_LE); - snd_pcm_hw_params_set_channels(pcm_handle, params, channels); - snd_pcm_hw_params_set_rate(pcm_handle, params, sample_rate, 0); - snd_pcm_hw_params_set_period_size(pcm_handle, params, frame_size, 0); - snd_pcm_hw_params(pcm_handle, params); - snd_pcm_hw_params_free(params); - snd_pcm_prepare(pcm_handle); - encoder = opus_encoder_create(sample_rate, channels, OPUS_APPLICATION_AUDIO, &err); - if (!encoder) return -2; + } + + // Configure the device + err = configure_alsa_device(pcm_handle, "capture"); + if (err < 0) { + snd_pcm_close(pcm_handle); + pcm_handle = NULL; + capture_initializing = 0; + return -1; + } + + // Initialize Opus encoder + int opus_err = 0; + encoder = opus_encoder_create(sample_rate, channels, OPUS_APPLICATION_AUDIO, &opus_err); + if (!encoder || opus_err != OPUS_OK) { + if (pcm_handle) { snd_pcm_close(pcm_handle); pcm_handle = NULL; } + capture_initializing = 0; + return -2; + } + opus_encoder_ctl(encoder, OPUS_SET_BITRATE(opus_bitrate)); opus_encoder_ctl(encoder, OPUS_SET_COMPLEXITY(opus_complexity)); + + capture_initialized = 1; + capture_initializing = 0; return 0; } -// Read and encode one frame, returns encoded size or <0 on error +// Read and encode one frame with enhanced error handling int jetkvm_audio_read_encode(void *opus_buf) { short pcm_buffer[1920]; // max 2ch*960 unsigned char *out = (unsigned char*)opus_buf; + int err = 0; + + // Safety checks + if (!capture_initialized || !pcm_handle || !encoder || !opus_buf) { + return -1; + } + int pcm_rc = snd_pcm_readi(pcm_handle, pcm_buffer, frame_size); - - // Handle ALSA errors with recovery + + // Handle ALSA errors with enhanced recovery if (pcm_rc < 0) { if (pcm_rc == -EPIPE) { // Buffer underrun - try to recover - snd_pcm_prepare(pcm_handle); + err = snd_pcm_prepare(pcm_handle); + if (err < 0) return -1; + pcm_rc = snd_pcm_readi(pcm_handle, pcm_buffer, frame_size); if (pcm_rc < 0) return -1; } else if (pcm_rc == -EAGAIN) { // No data available - return 0 to indicate no frame return 0; + } else if (pcm_rc == -ESTRPIPE) { + // Device suspended, try to resume + while ((err = snd_pcm_resume(pcm_handle)) == -EAGAIN) { + usleep(1000); // 1ms + } + if (err < 0) { + err = snd_pcm_prepare(pcm_handle); + if (err < 0) return -1; + } + return 0; // Skip this frame } else { // Other error - return error code return -1; } } - + // If we got fewer frames than expected, pad with silence if (pcm_rc < frame_size) { memset(&pcm_buffer[pcm_rc * channels], 0, (frame_size - pcm_rc) * channels * sizeof(short)); } - + int nb_bytes = opus_encode(encoder, pcm_buffer, frame_size, out, max_packet_size); return nb_bytes; } -// Initialize ALSA playback for microphone input (browser -> USB gadget) +// Initialize ALSA playback with improved safety int jetkvm_audio_playback_init() { int err; - snd_pcm_hw_params_t *params; - if (pcm_playback_handle) return 0; - - // Try to open the USB gadget audio device for playback - // This should correspond to the capture endpoint of the USB gadget - if (snd_pcm_open(&pcm_playback_handle, "hw:1,0", SND_PCM_STREAM_PLAYBACK, 0) < 0) { - // Fallback to default device if hw:1,0 doesn't work for playback - if (snd_pcm_open(&pcm_playback_handle, "default", SND_PCM_STREAM_PLAYBACK, 0) < 0) - return -1; + + // Prevent concurrent initialization + if (__sync_bool_compare_and_swap(&playback_initializing, 0, 1) == 0) { + return -EBUSY; // Already initializing } - - snd_pcm_hw_params_malloc(¶ms); - snd_pcm_hw_params_any(pcm_playback_handle, params); - snd_pcm_hw_params_set_access(pcm_playback_handle, params, SND_PCM_ACCESS_RW_INTERLEAVED); - snd_pcm_hw_params_set_format(pcm_playback_handle, params, SND_PCM_FORMAT_S16_LE); - snd_pcm_hw_params_set_channels(pcm_playback_handle, params, channels); - snd_pcm_hw_params_set_rate(pcm_playback_handle, params, sample_rate, 0); - snd_pcm_hw_params_set_period_size(pcm_playback_handle, params, frame_size, 0); - snd_pcm_hw_params(pcm_playback_handle, params); - snd_pcm_hw_params_free(params); - snd_pcm_prepare(pcm_playback_handle); - + + // Check if already initialized + if (playback_initialized) { + playback_initializing = 0; + return 0; + } + + // Clean up any existing resources first + if (decoder) { + opus_decoder_destroy(decoder); + decoder = NULL; + } + if (pcm_playback_handle) { + snd_pcm_close(pcm_playback_handle); + pcm_playback_handle = NULL; + } + + // Try to open the USB gadget audio device for playback + err = safe_alsa_open(&pcm_playback_handle, "hw:1,0", SND_PCM_STREAM_PLAYBACK); + if (err < 0) { + // Fallback to default device + err = safe_alsa_open(&pcm_playback_handle, "default", SND_PCM_STREAM_PLAYBACK); + if (err < 0) { + playback_initializing = 0; + return -1; + } + } + + // Configure the device + err = configure_alsa_device(pcm_playback_handle, "playback"); + if (err < 0) { + snd_pcm_close(pcm_playback_handle); + pcm_playback_handle = NULL; + playback_initializing = 0; + return -1; + } + // Initialize Opus decoder - decoder = opus_decoder_create(sample_rate, channels, &err); - if (!decoder) return -2; - + int opus_err = 0; + decoder = opus_decoder_create(sample_rate, channels, &opus_err); + if (!decoder || opus_err != OPUS_OK) { + snd_pcm_close(pcm_playback_handle); + pcm_playback_handle = NULL; + playback_initializing = 0; + return -2; + } + + playback_initialized = 1; + playback_initializing = 0; return 0; } -// Decode Opus and write PCM to playback device +// Decode Opus and write PCM with enhanced error handling int jetkvm_audio_decode_write(void *opus_buf, int opus_size) { short pcm_buffer[1920]; // max 2ch*960 unsigned char *in = (unsigned char*)opus_buf; - + int err = 0; + + // Safety checks + if (!playback_initialized || !pcm_playback_handle || !decoder || !opus_buf || opus_size <= 0) { + return -1; + } + + // Additional bounds checking + if (opus_size > max_packet_size) { + return -1; + } + // Decode Opus to PCM int pcm_frames = opus_decode(decoder, in, opus_size, pcm_buffer, frame_size, 0); if (pcm_frames < 0) return -1; - - // Write PCM to playback device + + // Write PCM to playback device with enhanced recovery int pcm_rc = snd_pcm_writei(pcm_playback_handle, pcm_buffer, pcm_frames); if (pcm_rc < 0) { - // Try to recover from underrun if (pcm_rc == -EPIPE) { - snd_pcm_prepare(pcm_playback_handle); + // Buffer underrun - try to recover + err = snd_pcm_prepare(pcm_playback_handle); + if (err < 0) return -2; + pcm_rc = snd_pcm_writei(pcm_playback_handle, pcm_buffer, pcm_frames); + } else if (pcm_rc == -ESTRPIPE) { + // Device suspended, try to resume + while ((err = snd_pcm_resume(pcm_playback_handle)) == -EAGAIN) { + usleep(1000); // 1ms + } + if (err < 0) { + err = snd_pcm_prepare(pcm_playback_handle); + if (err < 0) return -2; + } + return 0; // Skip this frame } if (pcm_rc < 0) return -2; } - + return pcm_frames; } +// Safe playback cleanup with double-close protection void jetkvm_audio_playback_close() { - if (decoder) { opus_decoder_destroy(decoder); decoder = NULL; } - if (pcm_playback_handle) { snd_pcm_close(pcm_playback_handle); pcm_playback_handle = NULL; } + // Wait for any ongoing operations to complete + while (playback_initializing) { + usleep(1000); // 1ms + } + + // Atomic check and set to prevent double cleanup + if (__sync_bool_compare_and_swap(&playback_initialized, 1, 0) == 0) { + return; // Already cleaned up + } + + if (decoder) { + opus_decoder_destroy(decoder); + decoder = NULL; + } + if (pcm_playback_handle) { + snd_pcm_drain(pcm_playback_handle); + snd_pcm_close(pcm_playback_handle); + pcm_playback_handle = NULL; + } } +// Safe capture cleanup void jetkvm_audio_close() { - if (encoder) { opus_encoder_destroy(encoder); encoder = NULL; } - if (pcm_handle) { snd_pcm_close(pcm_handle); pcm_handle = NULL; } + // Wait for any ongoing operations to complete + while (capture_initializing) { + usleep(1000); // 1ms + } + + capture_initialized = 0; + + if (encoder) { + opus_encoder_destroy(encoder); + encoder = NULL; + } + if (pcm_handle) { + snd_pcm_drop(pcm_handle); // Drop pending samples + snd_pcm_close(pcm_handle); + pcm_handle = NULL; + } + + // Also clean up playback jetkvm_audio_playback_close(); } */ @@ -197,7 +432,31 @@ func cgoAudioDecodeWrite(buf []byte) (int, error) { if len(buf) == 0 { return 0, errors.New("empty buffer") } - n := C.jetkvm_audio_decode_write(unsafe.Pointer(&buf[0]), C.int(len(buf))) + // Additional safety check to prevent segfault + if buf == nil { + return 0, errors.New("nil buffer") + } + + // Validate buffer size to prevent potential overruns + if len(buf) > 4096 { // Maximum reasonable Opus frame size + return 0, errors.New("buffer too large") + } + + // Ensure buffer is not deallocated by keeping a reference + bufPtr := unsafe.Pointer(&buf[0]) + if bufPtr == nil { + return 0, errors.New("invalid buffer pointer") + } + + // Add recovery mechanism for C function crashes + defer func() { + if r := recover(); r != nil { + // Log the panic but don't crash the entire program + // This should not happen with proper validation, but provides safety + } + }() + + n := C.jetkvm_audio_decode_write(bufPtr, C.int(len(buf))) if n < 0 { return 0, errors.New("audio decode/write error") } @@ -205,26 +464,11 @@ func cgoAudioDecodeWrite(buf []byte) (int, error) { } // Wrapper functions for non-blocking audio manager -func CGOAudioInit() error { - return cgoAudioInit() -} - -func CGOAudioClose() { - cgoAudioClose() -} - -func CGOAudioReadEncode(buf []byte) (int, error) { - return cgoAudioReadEncode(buf) -} - -func CGOAudioPlaybackInit() error { - return cgoAudioPlaybackInit() -} - -func CGOAudioPlaybackClose() { - cgoAudioPlaybackClose() -} - -func CGOAudioDecodeWrite(buf []byte) (int, error) { - return cgoAudioDecodeWrite(buf) -} +var ( + CGOAudioInit = cgoAudioInit + CGOAudioClose = cgoAudioClose + CGOAudioReadEncode = cgoAudioReadEncode + CGOAudioPlaybackInit = cgoAudioPlaybackInit + CGOAudioPlaybackClose = cgoAudioPlaybackClose + CGOAudioDecodeWrite = cgoAudioDecodeWrite +) diff --git a/internal/audio/cgo_audio_stub.go b/internal/audio/cgo_audio_stub.go index c66501a..193ed57 100644 --- a/internal/audio/cgo_audio_stub.go +++ b/internal/audio/cgo_audio_stub.go @@ -30,28 +30,13 @@ func cgoAudioDecodeWrite(buf []byte) (int, error) { return 0, errors.New("audio not available in lint mode") } -// Uppercase wrapper functions (called by nonblocking_audio.go) +// Uppercase aliases for external API compatibility -func CGOAudioInit() error { - return cgoAudioInit() -} - -func CGOAudioClose() { - cgoAudioClose() -} - -func CGOAudioReadEncode(buf []byte) (int, error) { - return cgoAudioReadEncode(buf) -} - -func CGOAudioPlaybackInit() error { - return cgoAudioPlaybackInit() -} - -func CGOAudioPlaybackClose() { - cgoAudioPlaybackClose() -} - -func CGOAudioDecodeWrite(buf []byte) (int, error) { - return cgoAudioDecodeWrite(buf) -} +var ( + CGOAudioInit = cgoAudioInit + CGOAudioClose = cgoAudioClose + CGOAudioReadEncode = cgoAudioReadEncode + CGOAudioPlaybackInit = cgoAudioPlaybackInit + CGOAudioPlaybackClose = cgoAudioPlaybackClose + CGOAudioDecodeWrite = cgoAudioDecodeWrite +) diff --git a/internal/audio/events.go b/internal/audio/events.go index 614e090..dff912b 100644 --- a/internal/audio/events.go +++ b/internal/audio/events.go @@ -2,6 +2,7 @@ package audio import ( "context" + "strings" "sync" "time" @@ -111,6 +112,14 @@ func (aeb *AudioEventBroadcaster) Subscribe(connectionID string, conn *websocket aeb.mutex.Lock() defer aeb.mutex.Unlock() + // Check if there's already a subscription for this connectionID + if _, exists := aeb.subscribers[connectionID]; exists { + aeb.logger.Debug().Str("connectionID", connectionID).Msg("duplicate audio events subscription detected; replacing existing entry") + // Do NOT close the existing WebSocket connection here because it's shared + // with the signaling channel. Just replace the subscriber map entry. + delete(aeb.subscribers, connectionID) + } + aeb.subscribers[connectionID] = &AudioEventSubscriber{ conn: conn, ctx: ctx, @@ -233,16 +242,37 @@ func (aeb *AudioEventBroadcaster) sendCurrentMetrics(subscriber *AudioEventSubsc // startMetricsBroadcasting starts a goroutine that periodically broadcasts metrics func (aeb *AudioEventBroadcaster) startMetricsBroadcasting() { - ticker := time.NewTicker(2 * time.Second) // Same interval as current polling + // Use 5-second interval instead of 2 seconds for constrained environments + ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() for range ticker.C { aeb.mutex.RLock() subscriberCount := len(aeb.subscribers) + + // Early exit if no subscribers to save CPU + if subscriberCount == 0 { + aeb.mutex.RUnlock() + continue + } + + // Create a copy for safe iteration + subscribersCopy := make([]*AudioEventSubscriber, 0, subscriberCount) + for _, sub := range aeb.subscribers { + subscribersCopy = append(subscribersCopy, sub) + } aeb.mutex.RUnlock() - // Only broadcast if there are subscribers - if subscriberCount == 0 { + // Pre-check for cancelled contexts to avoid unnecessary work + activeSubscribers := 0 + for _, sub := range subscribersCopy { + if sub.ctx.Err() == nil { + activeSubscribers++ + } + } + + // Skip metrics gathering if no active subscribers + if activeSubscribers == 0 { continue } @@ -286,29 +316,54 @@ func (aeb *AudioEventBroadcaster) startMetricsBroadcasting() { // broadcast sends an event to all subscribers func (aeb *AudioEventBroadcaster) broadcast(event AudioEvent) { aeb.mutex.RLock() - defer aeb.mutex.RUnlock() + // Create a copy of subscribers to avoid holding the lock during sending + subscribersCopy := make(map[string]*AudioEventSubscriber) + for id, sub := range aeb.subscribers { + subscribersCopy[id] = sub + } + aeb.mutex.RUnlock() - for connectionID, subscriber := range aeb.subscribers { - go func(id string, sub *AudioEventSubscriber) { - if !aeb.sendToSubscriber(sub, event) { - // Remove failed subscriber - aeb.mutex.Lock() - delete(aeb.subscribers, id) - aeb.mutex.Unlock() - aeb.logger.Warn().Str("connectionID", id).Msg("removed failed audio events subscriber") - } - }(connectionID, subscriber) + // Track failed subscribers to remove them after sending + var failedSubscribers []string + + // Send to all subscribers without holding the lock + for connectionID, subscriber := range subscribersCopy { + if !aeb.sendToSubscriber(subscriber, event) { + failedSubscribers = append(failedSubscribers, connectionID) + } + } + + // Remove failed subscribers if any + if len(failedSubscribers) > 0 { + aeb.mutex.Lock() + for _, connectionID := range failedSubscribers { + delete(aeb.subscribers, connectionID) + aeb.logger.Warn().Str("connectionID", connectionID).Msg("removed failed audio events subscriber") + } + aeb.mutex.Unlock() } } // sendToSubscriber sends an event to a specific subscriber func (aeb *AudioEventBroadcaster) sendToSubscriber(subscriber *AudioEventSubscriber, event AudioEvent) bool { - ctx, cancel := context.WithTimeout(subscriber.ctx, 5*time.Second) + // Check if subscriber context is already cancelled + if subscriber.ctx.Err() != nil { + return false + } + + ctx, cancel := context.WithTimeout(subscriber.ctx, 2*time.Second) defer cancel() err := wsjson.Write(ctx, subscriber.conn, event) if err != nil { - subscriber.logger.Warn().Err(err).Msg("failed to send audio event to subscriber") + // Don't log network errors for closed connections as warnings, they're expected + if strings.Contains(err.Error(), "use of closed network connection") || + strings.Contains(err.Error(), "connection reset by peer") || + strings.Contains(err.Error(), "context canceled") { + subscriber.logger.Debug().Err(err).Msg("websocket connection closed during audio event send") + } else { + subscriber.logger.Warn().Err(err).Msg("failed to send audio event to subscriber") + } return false } diff --git a/internal/audio/nonblocking_api.go b/internal/audio/nonblocking_api.go index 1c3091c..33ae260 100644 --- a/internal/audio/nonblocking_api.go +++ b/internal/audio/nonblocking_api.go @@ -60,6 +60,11 @@ func StopNonBlockingAudioInput() { if globalNonBlockingManager != nil && globalNonBlockingManager.IsInputRunning() { globalNonBlockingManager.StopAudioInput() + + // If both input and output are stopped, recreate manager to ensure clean state + if !globalNonBlockingManager.IsRunning() { + globalNonBlockingManager = nil + } } } diff --git a/internal/audio/nonblocking_audio.go b/internal/audio/nonblocking_audio.go index c055964..34d25fb 100644 --- a/internal/audio/nonblocking_audio.go +++ b/internal/audio/nonblocking_audio.go @@ -2,6 +2,7 @@ package audio import ( "context" + "errors" "runtime" "sync" "sync/atomic" @@ -273,7 +274,9 @@ func (nam *NonBlockingAudioManager) inputWorkerThread() { defer runtime.UnlockOSThread() defer nam.wg.Done() - defer atomic.StoreInt32(&nam.inputWorkerRunning, 0) + // Cleanup CGO resources properly to avoid double-close scenarios + // The outputWorkerThread's CGOAudioClose() will handle all cleanup + atomic.StoreInt32(&nam.inputWorkerRunning, 0) atomic.StoreInt32(&nam.inputWorkerRunning, 1) nam.logger.Debug().Msg("input worker thread started") @@ -283,32 +286,102 @@ func (nam *NonBlockingAudioManager) inputWorkerThread() { nam.logger.Error().Err(err).Msg("failed to initialize audio playback in worker thread") return } - defer CGOAudioPlaybackClose() + + // Ensure CGO cleanup happens even if we exit unexpectedly + cgoInitialized := true + defer func() { + if cgoInitialized { + nam.logger.Debug().Msg("cleaning up CGO audio playback") + // Add extra safety: ensure no more CGO calls can happen + atomic.StoreInt32(&nam.inputWorkerRunning, 0) + // Note: Don't call CGOAudioPlaybackClose() here to avoid double-close + // The outputWorkerThread's CGOAudioClose() will handle all cleanup + } + }() for { + // If coordinator has stopped, exit worker loop + if atomic.LoadInt32(&nam.inputRunning) == 0 { + return + } select { case <-nam.ctx.Done(): - nam.logger.Debug().Msg("input worker thread stopping") + nam.logger.Debug().Msg("input worker thread stopping due to context cancellation") return case workItem := <-nam.inputWorkChan: switch workItem.workType { case audioWorkDecodeWrite: - // Perform blocking audio decode/write operation - n, err := CGOAudioDecodeWrite(workItem.data) - result := audioResult{ - success: err == nil, - length: n, - err: err, + // Check if we're still supposed to be running before processing + if atomic.LoadInt32(&nam.inputWorkerRunning) == 0 || atomic.LoadInt32(&nam.inputRunning) == 0 { + nam.logger.Debug().Msg("input worker stopping, ignoring decode work") + // Do not send to resultChan; coordinator may have exited + return + } + + // Validate input data before CGO call + if workItem.data == nil || len(workItem.data) == 0 { + result := audioResult{ + success: false, + err: errors.New("invalid audio data"), + } + + // Check if coordinator is still running before sending result + if atomic.LoadInt32(&nam.inputRunning) == 1 { + select { + case workItem.resultChan <- result: + case <-nam.ctx.Done(): + return + case <-time.After(10 * time.Millisecond): + // Timeout - coordinator may have stopped, drop result + atomic.AddInt64(&nam.stats.InputFramesDropped, 1) + } + } else { + // Coordinator has stopped, drop result + atomic.AddInt64(&nam.stats.InputFramesDropped, 1) + } + continue } - // Send result back (non-blocking) - select { - case workItem.resultChan <- result: - case <-nam.ctx.Done(): - return - default: - // Drop result if coordinator is not ready + // Perform blocking CGO operation with panic recovery + var result audioResult + func() { + defer func() { + if r := recover(); r != nil { + nam.logger.Error().Interface("panic", r).Msg("CGO decode write panic recovered") + result = audioResult{ + success: false, + err: errors.New("CGO decode write panic"), + } + } + }() + + // Double-check we're still running before CGO call + if atomic.LoadInt32(&nam.inputWorkerRunning) == 0 { + result = audioResult{success: false, err: errors.New("worker shutting down")} + return + } + + n, err := CGOAudioDecodeWrite(workItem.data) + result = audioResult{ + success: err == nil, + length: n, + err: err, + } + }() + + // Send result back (non-blocking) - check if coordinator is still running + if atomic.LoadInt32(&nam.inputRunning) == 1 { + select { + case workItem.resultChan <- result: + case <-nam.ctx.Done(): + return + case <-time.After(10 * time.Millisecond): + // Timeout - coordinator may have stopped, drop result + atomic.AddInt64(&nam.stats.InputFramesDropped, 1) + } + } else { + // Coordinator has stopped, drop result atomic.AddInt64(&nam.stats.InputFramesDropped, 1) } @@ -328,6 +401,7 @@ func (nam *NonBlockingAudioManager) inputCoordinatorThread() { nam.logger.Debug().Msg("input coordinator thread started") resultChan := make(chan audioResult, 1) + // Do not close resultChan to avoid races with worker sends during shutdown for atomic.LoadInt32(&nam.inputRunning) == 1 { select { @@ -350,7 +424,7 @@ func (nam *NonBlockingAudioManager) inputCoordinatorThread() { select { case nam.inputWorkChan <- workItem: - // Wait for result with timeout + // Wait for result with timeout and context cancellation select { case result := <-resultChan: if result.success { @@ -362,10 +436,18 @@ func (nam *NonBlockingAudioManager) inputCoordinatorThread() { nam.logger.Warn().Err(result.err).Msg("audio input worker error") } } + case <-nam.ctx.Done(): + nam.logger.Debug().Msg("input coordinator stopping during result wait") + return case <-time.After(50 * time.Millisecond): // Timeout waiting for result atomic.AddInt64(&nam.stats.InputFramesDropped, 1) nam.logger.Warn().Msg("timeout waiting for input worker result") + // Drain any pending result to prevent worker blocking + select { + case <-resultChan: + default: + } } default: // Worker is busy, drop this frame @@ -379,13 +461,7 @@ func (nam *NonBlockingAudioManager) inputCoordinatorThread() { } } - // Signal worker to close - select { - case nam.inputWorkChan <- audioWorkItem{workType: audioWorkClose}: - case <-time.After(100 * time.Millisecond): - nam.logger.Warn().Msg("timeout signaling input worker to close") - } - + // Avoid sending close signals or touching channels here; inputRunning=0 will stop worker via checks nam.logger.Info().Msg("input coordinator thread stopped") } @@ -413,11 +489,37 @@ func (nam *NonBlockingAudioManager) StopAudioInput() { // Stop only the input coordinator atomic.StoreInt32(&nam.inputRunning, 0) - // Allow coordinator thread to process the stop signal and update state - // This prevents race conditions in state queries immediately after stopping - time.Sleep(50 * time.Millisecond) + // Drain the receive channel to prevent blocking senders + go func() { + for { + select { + case <-nam.inputReceiveChan: + // Drain any remaining frames + case <-time.After(100 * time.Millisecond): + return + } + } + }() - nam.logger.Info().Msg("audio input stopped") + // Wait for the worker to actually stop to prevent race conditions + timeout := time.After(2 * time.Second) + ticker := time.NewTicker(10 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-timeout: + nam.logger.Warn().Msg("timeout waiting for input worker to stop") + return + case <-ticker.C: + if atomic.LoadInt32(&nam.inputWorkerRunning) == 0 { + nam.logger.Info().Msg("audio input stopped successfully") + // Close ALSA playback resources now that input worker has stopped + CGOAudioPlaybackClose() + return + } + } + } } // GetStats returns current statistics diff --git a/ui/src/components/ActionBar.tsx b/ui/src/components/ActionBar.tsx index 4cc1f9e..956d488 100644 --- a/ui/src/components/ActionBar.tsx +++ b/ui/src/components/ActionBar.tsx @@ -150,7 +150,7 @@ export default function Actionbar({ "flex origin-top flex-col transition duration-300 ease-out data-closed:translate-y-8 data-closed:opacity-0", )} > - {({ open }) => { + {({ open }: { open: boolean }) => { checkIfStateChanged(open); return (
@@ -192,7 +192,7 @@ export default function Actionbar({ "flex origin-top flex-col transition duration-300 ease-out data-closed:translate-y-8 data-closed:opacity-0", )} > - {({ open }) => { + {({ open }: { open: boolean }) => { checkIfStateChanged(open); return (
@@ -244,7 +244,7 @@ export default function Actionbar({ "flex origin-top flex-col transition duration-300 ease-out data-closed:translate-y-8 data-closed:opacity-0", )} > - {({ open }) => { + {({ open }: { open: boolean }) => { checkIfStateChanged(open); return (
@@ -287,7 +287,7 @@ export default function Actionbar({ "flex origin-top flex-col transition duration-300 ease-out data-closed:translate-y-8 data-closed:opacity-0", )} > - {({ open }) => { + {({ open }: { open: boolean }) => { checkIfStateChanged(open); return ; }} @@ -369,11 +369,11 @@ export default function Actionbar({ "flex origin-top flex-col transition duration-300 ease-out data-closed:translate-y-8 data-closed:opacity-0", )} > - {({ open }) => { + {({ open }: { open: boolean }) => { checkIfStateChanged(open); return (
- +
); }} diff --git a/ui/src/components/AudioMetricsDashboard.tsx b/ui/src/components/AudioMetricsDashboard.tsx index 435612d..2854df5 100644 --- a/ui/src/components/AudioMetricsDashboard.tsx +++ b/ui/src/components/AudioMetricsDashboard.tsx @@ -67,7 +67,12 @@ export default function AudioMetricsDashboard() { // Microphone state for audio level monitoring const { isMicrophoneActive, isMicrophoneMuted, microphoneStream } = useMicrophone(); - const { audioLevel, isAnalyzing } = useAudioLevel(microphoneStream); + const { audioLevel, isAnalyzing } = useAudioLevel( + isMicrophoneActive ? microphoneStream : null, + { + enabled: isMicrophoneActive, + updateInterval: 120, + }); useEffect(() => { // Load initial configuration (only once) diff --git a/ui/src/components/popovers/AudioControlPopover.tsx b/ui/src/components/popovers/AudioControlPopover.tsx index 15f90ad..e9d29d1 100644 --- a/ui/src/components/popovers/AudioControlPopover.tsx +++ b/ui/src/components/popovers/AudioControlPopover.tsx @@ -70,14 +70,18 @@ const qualityLabels = { interface AudioControlPopoverProps { microphone: MicrophoneHookReturn; + open?: boolean; // whether the popover is open (controls analysis) } -export default function AudioControlPopover({ microphone }: AudioControlPopoverProps) { +export default function AudioControlPopover({ microphone, open }: AudioControlPopoverProps) { const [currentConfig, setCurrentConfig] = useState(null); const [currentMicrophoneConfig, setCurrentMicrophoneConfig] = useState(null); const [showAdvanced, setShowAdvanced] = useState(false); const [isLoading, setIsLoading] = useState(false); + // Add cache flags to prevent unnecessary API calls + const [configsLoaded, setConfigsLoaded] = useState(false); + // Add cooldown to prevent rapid clicking const [lastClickTime, setLastClickTime] = useState(0); const CLICK_COOLDOWN = 500; // 500ms cooldown between clicks @@ -117,8 +121,12 @@ export default function AudioControlPopover({ microphone }: AudioControlPopoverP const micMetrics = wsConnected && microphoneMetrics !== null ? microphoneMetrics : fallbackMicMetrics; const isConnected = wsConnected ? wsConnected : fallbackConnected; - // Audio level monitoring - const { audioLevel, isAnalyzing } = useAudioLevel(microphoneStream); + // Audio level monitoring - enable only when popover is open and microphone is active to save resources + const analysisEnabled = (open ?? true) && isMicrophoneActive; + const { audioLevel, isAnalyzing } = useAudioLevel(analysisEnabled ? microphoneStream : null, { + enabled: analysisEnabled, + updateInterval: 120, // 8-10 fps to reduce CPU without losing UX quality + }); // Audio devices const { @@ -135,46 +143,61 @@ export default function AudioControlPopover({ microphone }: AudioControlPopoverP const { toggleSidebarView } = useUiStore(); - // Load initial configurations once (these don't change frequently) + // Load initial configurations once - cache to prevent repeated calls useEffect(() => { - loadAudioConfigurations(); - }, []); + if (!configsLoaded) { + loadAudioConfigurations(); + } + }, [configsLoaded]); - // Load initial audio state and set up fallback polling when WebSocket is not connected + // Optimize fallback polling - only run when WebSocket is not connected useEffect(() => { - if (!wsConnected) { + if (!wsConnected && !configsLoaded) { + // Load state once if configs aren't loaded yet loadAudioState(); - // Only load metrics as fallback when WebSocket is disconnected + } + + if (!wsConnected) { loadAudioMetrics(); loadMicrophoneMetrics(); - // Set up metrics refresh interval for fallback only + // Reduced frequency for fallback polling (every 3 seconds instead of 2) const metricsInterval = setInterval(() => { - loadAudioMetrics(); - loadMicrophoneMetrics(); - }, 2000); + if (!wsConnected) { // Double-check to prevent unnecessary requests + loadAudioMetrics(); + loadMicrophoneMetrics(); + } + }, 3000); return () => clearInterval(metricsInterval); } - // Always sync microphone state - syncMicrophoneState(); - }, [wsConnected, syncMicrophoneState]); + // Always sync microphone state, but debounce it + const syncTimeout = setTimeout(() => { + syncMicrophoneState(); + }, 500); + + return () => clearTimeout(syncTimeout); + }, [wsConnected, syncMicrophoneState, configsLoaded]); const loadAudioConfigurations = async () => { try { - // Load quality config - const qualityResp = await api.GET("/audio/quality"); + // Parallel loading for better performance + const [qualityResp, micQualityResp] = await Promise.all([ + api.GET("/audio/quality"), + api.GET("/microphone/quality") + ]); + if (qualityResp.ok) { const qualityData = await qualityResp.json(); setCurrentConfig(qualityData.current); } - // Load microphone quality config - const micQualityResp = await api.GET("/microphone/quality"); if (micQualityResp.ok) { const micQualityData = await micQualityResp.json(); setCurrentMicrophoneConfig(micQualityData.current); } + + setConfigsLoaded(true); } catch (error) { console.error("Failed to load audio configurations:", error); } diff --git a/ui/src/hooks/useAudioEvents.ts b/ui/src/hooks/useAudioEvents.ts index 90d73cb..898d63a 100644 --- a/ui/src/hooks/useAudioEvents.ts +++ b/ui/src/hooks/useAudioEvents.ts @@ -61,16 +61,23 @@ export interface UseAudioEventsReturn { unsubscribe: () => void; } +// Global subscription management to prevent multiple subscriptions per WebSocket connection +let globalSubscriptionState = { + isSubscribed: false, + subscriberCount: 0, + connectionId: null as string | null +}; + export function useAudioEvents(): UseAudioEventsReturn { // State for audio data const [audioMuted, setAudioMuted] = useState(null); const [audioMetrics, setAudioMetrics] = useState(null); const [microphoneState, setMicrophoneState] = useState(null); - const [microphoneMetrics, setMicrophoneMetrics] = useState(null); + const [microphoneMetrics, setMicrophoneMetricsData] = useState(null); - // Subscription state - const [isSubscribed, setIsSubscribed] = useState(false); - const subscriptionSent = useRef(false); + // Local subscription state + const [isLocallySubscribed, setIsLocallySubscribed] = useState(false); + const subscriptionTimeoutRef = useRef(null); // Get WebSocket URL const getWebSocketUrl = () => { @@ -79,7 +86,7 @@ export function useAudioEvents(): UseAudioEventsReturn { return `${protocol}//${host}/webrtc/signaling/client`; }; - // WebSocket connection + // Shared WebSocket connection using the `share` option for better resource management const { sendMessage, lastMessage, @@ -88,14 +95,19 @@ export function useAudioEvents(): UseAudioEventsReturn { shouldReconnect: () => true, reconnectAttempts: 10, reconnectInterval: 3000, + share: true, // Share the WebSocket connection across multiple hooks onOpen: () => { console.log('[AudioEvents] WebSocket connected'); - subscriptionSent.current = false; + // Reset global state on new connection + globalSubscriptionState.isSubscribed = false; + globalSubscriptionState.connectionId = Math.random().toString(36); }, onClose: () => { console.log('[AudioEvents] WebSocket disconnected'); - subscriptionSent.current = false; - setIsSubscribed(false); + // Reset global state on disconnect + globalSubscriptionState.isSubscribed = false; + globalSubscriptionState.subscriberCount = 0; + globalSubscriptionState.connectionId = null; }, onError: (event) => { console.error('[AudioEvents] WebSocket error:', event); @@ -104,18 +116,66 @@ export function useAudioEvents(): UseAudioEventsReturn { // Subscribe to audio events const subscribe = useCallback(() => { - if (readyState === ReadyState.OPEN && !subscriptionSent.current) { - const subscribeMessage = { - type: 'subscribe-audio-events', - data: {} - }; - - sendMessage(JSON.stringify(subscribeMessage)); - subscriptionSent.current = true; - setIsSubscribed(true); - console.log('[AudioEvents] Subscribed to audio events'); + if (readyState === ReadyState.OPEN && !globalSubscriptionState.isSubscribed) { + // Clear any pending subscription timeout + if (subscriptionTimeoutRef.current) { + clearTimeout(subscriptionTimeoutRef.current); + subscriptionTimeoutRef.current = null; + } + + // Add a small delay to prevent rapid subscription attempts + subscriptionTimeoutRef.current = setTimeout(() => { + if (readyState === ReadyState.OPEN && !globalSubscriptionState.isSubscribed) { + const subscribeMessage = { + type: 'subscribe-audio-events', + data: {} + }; + + sendMessage(JSON.stringify(subscribeMessage)); + globalSubscriptionState.isSubscribed = true; + console.log('[AudioEvents] Subscribed to audio events'); + } + }, 100); // 100ms delay to debounce subscription attempts } - }, [readyState, sendMessage]); + + // Track local subscription regardless of global state + if (!isLocallySubscribed) { + globalSubscriptionState.subscriberCount++; + setIsLocallySubscribed(true); + } + }, [readyState, sendMessage, isLocallySubscribed]); + + // Unsubscribe from audio events + const unsubscribe = useCallback(() => { + // Clear any pending subscription timeout + if (subscriptionTimeoutRef.current) { + clearTimeout(subscriptionTimeoutRef.current); + subscriptionTimeoutRef.current = null; + } + + if (isLocallySubscribed) { + globalSubscriptionState.subscriberCount--; + setIsLocallySubscribed(false); + + // Only send unsubscribe message if this is the last subscriber and connection is still open + if (globalSubscriptionState.subscriberCount <= 0 && + readyState === ReadyState.OPEN && + globalSubscriptionState.isSubscribed) { + + const unsubscribeMessage = { + type: 'unsubscribe-audio-events', + data: {} + }; + + sendMessage(JSON.stringify(unsubscribeMessage)); + globalSubscriptionState.isSubscribed = false; + globalSubscriptionState.subscriberCount = 0; + console.log('[AudioEvents] Sent unsubscribe message to backend'); + } + } + + console.log('[AudioEvents] Component unsubscribed from audio events'); + }, [readyState, isLocallySubscribed, sendMessage]); // Handle incoming messages useEffect(() => { @@ -150,7 +210,7 @@ export function useAudioEvents(): UseAudioEventsReturn { case 'microphone-metrics-update': { const micMetricsData = audioEvent.data as MicrophoneMetricsData; - setMicrophoneMetrics(micMetricsData); + setMicrophoneMetricsData(micMetricsData); break; } @@ -170,22 +230,42 @@ export function useAudioEvents(): UseAudioEventsReturn { // Auto-subscribe when connected useEffect(() => { - if (readyState === ReadyState.OPEN && !subscriptionSent.current) { + if (readyState === ReadyState.OPEN) { subscribe(); } - }, [readyState, subscribe]); + + // Cleanup subscription on component unmount or connection change + return () => { + if (subscriptionTimeoutRef.current) { + clearTimeout(subscriptionTimeoutRef.current); + subscriptionTimeoutRef.current = null; + } + unsubscribe(); + }; + }, [readyState, subscribe, unsubscribe]); - // Unsubscribe from audio events (connection will be cleaned up automatically) - const unsubscribe = useCallback(() => { - setIsSubscribed(false); - subscriptionSent.current = false; - console.log('[AudioEvents] Unsubscribed from audio events'); - }, []); + // Reset local subscription state on disconnect + useEffect(() => { + if (readyState === ReadyState.CLOSED || readyState === ReadyState.CLOSING) { + setIsLocallySubscribed(false); + if (subscriptionTimeoutRef.current) { + clearTimeout(subscriptionTimeoutRef.current); + subscriptionTimeoutRef.current = null; + } + } + }, [readyState]); + + // Cleanup on component unmount + useEffect(() => { + return () => { + unsubscribe(); + }; + }, [unsubscribe]); return { // Connection state connectionState: readyState, - isConnected: readyState === ReadyState.OPEN && isSubscribed, + isConnected: readyState === ReadyState.OPEN && globalSubscriptionState.isSubscribed, // Audio state audioMuted, @@ -193,7 +273,7 @@ export function useAudioEvents(): UseAudioEventsReturn { // Microphone state microphoneState, - microphoneMetrics, + microphoneMetrics: microphoneMetrics, // Manual subscription control subscribe, diff --git a/ui/src/hooks/useAudioLevel.ts b/ui/src/hooks/useAudioLevel.ts index 5b16623..091f963 100644 --- a/ui/src/hooks/useAudioLevel.ts +++ b/ui/src/hooks/useAudioLevel.ts @@ -5,20 +5,31 @@ interface AudioLevelHookResult { isAnalyzing: boolean; } -export const useAudioLevel = (stream: MediaStream | null): AudioLevelHookResult => { +interface AudioLevelOptions { + enabled?: boolean; // Allow external control of analysis + updateInterval?: number; // Throttle updates (default: 100ms for 10fps instead of 60fps) +} + +export const useAudioLevel = ( + stream: MediaStream | null, + options: AudioLevelOptions = {} +): AudioLevelHookResult => { + const { enabled = true, updateInterval = 100 } = options; + const [audioLevel, setAudioLevel] = useState(0); const [isAnalyzing, setIsAnalyzing] = useState(false); const audioContextRef = useRef(null); const analyserRef = useRef(null); const sourceRef = useRef(null); - const animationFrameRef = useRef(null); + const intervalRef = useRef(null); + const lastUpdateTimeRef = useRef(0); useEffect(() => { - if (!stream) { - // Clean up when stream is null - if (animationFrameRef.current) { - cancelAnimationFrame(animationFrameRef.current); - animationFrameRef.current = null; + if (!stream || !enabled) { + // Clean up when stream is null or disabled + if (intervalRef.current !== null) { + clearInterval(intervalRef.current); + intervalRef.current = null; } if (sourceRef.current) { sourceRef.current.disconnect(); @@ -47,8 +58,8 @@ export const useAudioLevel = (stream: MediaStream | null): AudioLevelHookResult const analyser = audioContext.createAnalyser(); const source = audioContext.createMediaStreamSource(stream); - // Configure analyser - analyser.fftSize = 256; + // Configure analyser - use smaller FFT for better performance + analyser.fftSize = 128; // Reduced from 256 for better performance analyser.smoothingTimeConstant = 0.8; // Connect nodes @@ -64,24 +75,34 @@ export const useAudioLevel = (stream: MediaStream | null): AudioLevelHookResult const updateLevel = () => { if (!analyserRef.current) return; + const now = performance.now(); + + // Throttle updates to reduce CPU usage + if (now - lastUpdateTimeRef.current < updateInterval) { + return; + } + lastUpdateTimeRef.current = now; + analyserRef.current.getByteFrequencyData(dataArray); - // Calculate RMS (Root Mean Square) for more accurate level representation + // Optimized RMS calculation - process only relevant frequency bands let sum = 0; - for (const value of dataArray) { + const relevantBins = Math.min(dataArray.length, 32); // Focus on lower frequencies for voice + for (let i = 0; i < relevantBins; i++) { + const value = dataArray[i]; sum += value * value; } - const rms = Math.sqrt(sum / dataArray.length); + const rms = Math.sqrt(sum / relevantBins); - // Convert to percentage (0-100) - const level = Math.min(100, (rms / 255) * 100); - setAudioLevel(level); - - animationFrameRef.current = requestAnimationFrame(updateLevel); + // Convert to percentage (0-100) with better scaling + const level = Math.min(100, Math.max(0, (rms / 180) * 100)); // Adjusted scaling for better sensitivity + setAudioLevel(Math.round(level)); }; setIsAnalyzing(true); - updateLevel(); + + // Use setInterval instead of requestAnimationFrame for more predictable timing + intervalRef.current = window.setInterval(updateLevel, updateInterval); } catch (error) { console.error('Failed to create audio level analyzer:', error); @@ -91,9 +112,9 @@ export const useAudioLevel = (stream: MediaStream | null): AudioLevelHookResult // Cleanup function return () => { - if (animationFrameRef.current) { - cancelAnimationFrame(animationFrameRef.current); - animationFrameRef.current = null; + if (intervalRef.current !== null) { + clearInterval(intervalRef.current); + intervalRef.current = null; } if (sourceRef.current) { sourceRef.current.disconnect(); @@ -107,7 +128,7 @@ export const useAudioLevel = (stream: MediaStream | null): AudioLevelHookResult setIsAnalyzing(false); setAudioLevel(0); }; - }, [stream]); + }, [stream, enabled, updateInterval]); return { audioLevel, isAnalyzing }; }; \ No newline at end of file diff --git a/ui/src/hooks/useMicrophone.ts b/ui/src/hooks/useMicrophone.ts index 53cb444..164ecda 100644 --- a/ui/src/hooks/useMicrophone.ts +++ b/ui/src/hooks/useMicrophone.ts @@ -28,6 +28,33 @@ export function useMicrophone() { const [isStopping, setIsStopping] = useState(false); const [isToggling, setIsToggling] = useState(false); + // Add debouncing refs to prevent rapid operations + const lastOperationRef = useRef(0); + const operationTimeoutRef = useRef(null); + const OPERATION_DEBOUNCE_MS = 1000; // 1 second debounce + + // Debounced operation wrapper + const debouncedOperation = useCallback((operation: () => Promise, operationType: string) => { + const now = Date.now(); + const timeSinceLastOp = now - lastOperationRef.current; + + if (timeSinceLastOp < OPERATION_DEBOUNCE_MS) { + console.log(`Debouncing ${operationType} operation - too soon (${timeSinceLastOp}ms since last)`); + return; + } + + // Clear any pending operation + if (operationTimeoutRef.current) { + clearTimeout(operationTimeoutRef.current); + operationTimeoutRef.current = null; + } + + lastOperationRef.current = now; + operation().catch(error => { + console.error(`Debounced ${operationType} operation failed:`, error); + }); + }, []); + // Cleanup function to stop microphone stream const stopMicrophoneStream = useCallback(async () => { console.log("stopMicrophoneStream called - cleaning up stream"); @@ -830,6 +857,14 @@ export function useMicrophone() { }, [microphoneSender, peerConnection]); + const startMicrophoneDebounced = useCallback((deviceId?: string) => { + debouncedOperation(() => startMicrophone(deviceId).then(() => {}), "start"); + }, [startMicrophone, debouncedOperation]); + + const stopMicrophoneDebounced = useCallback(() => { + debouncedOperation(() => stopMicrophone().then(() => {}), "stop"); + }, [stopMicrophone, debouncedOperation]); + // Make debug functions available globally for console access useEffect(() => { (window as Window & { @@ -912,10 +947,12 @@ export function useMicrophone() { startMicrophone, stopMicrophone, toggleMicrophoneMute, - syncMicrophoneState, debugMicrophoneState, - resetBackendMicrophoneState, - // Loading states + // Expose debounced variants for UI handlers + startMicrophoneDebounced, + stopMicrophoneDebounced, + // Expose sync and loading flags for consumers that expect them + syncMicrophoneState, isStarting, isStopping, isToggling, diff --git a/web.go b/web.go index c0541aa..eb1eab5 100644 --- a/web.go +++ b/web.go @@ -283,6 +283,30 @@ func setupRouter() *gin.Engine { return } + // Server-side cooldown to prevent rapid start/stop thrashing + { + cs := currentSession + cs.micOpMu.Lock() + now := time.Now() + if cs.micCooldown == 0 { + cs.micCooldown = 200 * time.Millisecond + } + since := now.Sub(cs.lastMicOp) + if since < cs.micCooldown { + remaining := cs.micCooldown - since + running := cs.AudioInputManager.IsRunning() || audio.IsNonBlockingAudioInputRunning() + cs.micOpMu.Unlock() + c.JSON(200, gin.H{ + "status": "cooldown", + "running": running, + "cooldown_ms_remaining": remaining.Milliseconds(), + }) + return + } + cs.lastMicOp = now + cs.micOpMu.Unlock() + } + // Check if already running before attempting to start if currentSession.AudioInputManager.IsRunning() || audio.IsNonBlockingAudioInputRunning() { c.JSON(200, gin.H{ @@ -332,6 +356,30 @@ func setupRouter() *gin.Engine { return } + // Server-side cooldown to prevent rapid start/stop thrashing + { + cs := currentSession + cs.micOpMu.Lock() + now := time.Now() + if cs.micCooldown == 0 { + cs.micCooldown = 200 * time.Millisecond + } + since := now.Sub(cs.lastMicOp) + if since < cs.micCooldown { + remaining := cs.micCooldown - since + running := cs.AudioInputManager.IsRunning() || audio.IsNonBlockingAudioInputRunning() + cs.micOpMu.Unlock() + c.JSON(200, gin.H{ + "status": "cooldown", + "running": running, + "cooldown_ms_remaining": remaining.Milliseconds(), + }) + return + } + cs.lastMicOp = now + cs.micOpMu.Unlock() + } + // Check if already stopped before attempting to stop if !currentSession.AudioInputManager.IsRunning() && !audio.IsNonBlockingAudioInputRunning() { c.JSON(200, gin.H{ @@ -343,8 +391,8 @@ func setupRouter() *gin.Engine { currentSession.AudioInputManager.Stop() - // Also stop the non-blocking audio input specifically - audio.StopNonBlockingAudioInput() + // AudioInputManager.Stop() already coordinates a clean stop via StopNonBlockingAudioInput() + // so we don't need to call it again here // Broadcast microphone state change via WebSocket broadcaster := audio.GetAudioEventBroadcaster() @@ -735,6 +783,10 @@ func handleWebRTCSignalWsMessages( l.Info().Msg("client subscribing to audio events") broadcaster := audio.GetAudioEventBroadcaster() broadcaster.Subscribe(connectionID, wsCon, runCtx, &l) + } else if message.Type == "unsubscribe-audio-events" { + l.Info().Msg("client unsubscribing from audio events") + broadcaster := audio.GetAudioEventBroadcaster() + broadcaster.Unsubscribe(connectionID) } } } diff --git a/webrtc.go b/webrtc.go index a67460a..a8c9360 100644 --- a/webrtc.go +++ b/webrtc.go @@ -7,6 +7,8 @@ import ( "net" "runtime" "strings" + "sync" + "time" "github.com/coder/websocket" "github.com/coder/websocket/wsjson" @@ -27,6 +29,11 @@ type Session struct { DiskChannel *webrtc.DataChannel AudioInputManager *audio.AudioInputManager shouldUmountVirtualMedia bool + + // Microphone operation cooldown to mitigate rapid start/stop races + micOpMu sync.Mutex + lastMicOp time.Time + micCooldown time.Duration } type SessionConfig struct {