diff --git a/audio.go b/audio.go index ce88f0d4..b3bb0f2f 100644 --- a/audio.go +++ b/audio.go @@ -250,40 +250,66 @@ func setPendingInputTrack(track *webrtc.TrackRemote) { } // SetAudioOutputEnabled enables or disables audio output capture. -// Returns immediately; when enabling, audio starts asynchronously to prevent UI blocking. -// Check logs for async operation status. +// When enabling, blocks up to 5 seconds waiting for audio to start. +// Returns error if audio fails to start within timeout. func SetAudioOutputEnabled(enabled bool) error { if audioOutputEnabled.Swap(enabled) == enabled { return nil } if enabled && activeConnections.Load() > 0 { + // Start audio synchronously with timeout to provide immediate feedback + done := make(chan error, 1) go func() { - if err := startAudio(); err != nil { - audioLogger.Error().Err(err).Msg("Failed to start output audio after enable") - } + done <- startAudio() }() - return nil + + select { + case err := <-done: + if err != nil { + audioLogger.Error().Err(err).Msg("Failed to start output audio after enable") + audioOutputEnabled.Store(false) // Revert state on failure + return fmt.Errorf("failed to start audio output: %w", err) + } + return nil + case <-time.After(5 * time.Second): + audioLogger.Error().Msg("Audio output start timed out after 5 seconds") + audioOutputEnabled.Store(false) // Revert state on timeout + return fmt.Errorf("audio output start timed out after 5 seconds") + } } stopOutputAudio() return nil } // SetAudioInputEnabled enables or disables audio input playback. -// Returns immediately; when enabling, audio starts asynchronously to prevent UI blocking. -// Check logs for async operation status. +// When enabling, blocks up to 5 seconds waiting for audio to start. +// Returns error if audio fails to start within timeout. func SetAudioInputEnabled(enabled bool) error { if audioInputEnabled.Swap(enabled) == enabled { return nil } if enabled && activeConnections.Load() > 0 { + // Start audio synchronously with timeout to provide immediate feedback + done := make(chan error, 1) go func() { - if err := startAudio(); err != nil { - audioLogger.Error().Err(err).Msg("Failed to start input audio after enable") - } + done <- startAudio() }() - return nil + + select { + case err := <-done: + if err != nil { + audioLogger.Error().Err(err).Msg("Failed to start input audio after enable") + audioInputEnabled.Store(false) // Revert state on failure + return fmt.Errorf("failed to start audio input: %w", err) + } + return nil + case <-time.After(5 * time.Second): + audioLogger.Error().Msg("Audio input start timed out after 5 seconds") + audioInputEnabled.Store(false) // Revert state on timeout + return fmt.Errorf("audio input start timed out after 5 seconds") + } } stopInputAudio() return nil @@ -309,20 +335,31 @@ func SetAudioOutputSource(source string) error { return err } - // Handle audio restart asynchronously + // Stop audio immediately (synchronous to release hardware) + stopOutputAudio() + + // Restart audio with timeout + done := make(chan error, 1) go func() { - stopOutputAudio() - if err := startAudio(); err != nil { - audioLogger.Error().Err(err).Str("source", source).Msg("Failed to start audio output after source change") - } + done <- startAudio() }() - return nil + select { + case err := <-done: + if err != nil { + audioLogger.Error().Err(err).Str("source", source).Msg("Failed to start audio after source change") + return fmt.Errorf("failed to start audio after source change: %w", err) + } + return nil + case <-time.After(5 * time.Second): + audioLogger.Error().Str("source", source).Msg("Audio restart timed out after source change") + return fmt.Errorf("audio restart timed out after 5 seconds") + } } // RestartAudioOutput stops and restarts the audio output capture. -// Returns immediately; restart happens asynchronously to prevent UI blocking. -// Check logs for async operation status. +// Blocks up to 5 seconds waiting for audio to restart. +// Returns error if restart fails or times out. func RestartAudioOutput() error { audioMutex.Lock() hasActiveOutput := audioOutputEnabled.Load() && currentAudioTrack != nil && outputSource.Load() != nil @@ -334,12 +371,24 @@ func RestartAudioOutput() error { audioLogger.Info().Msg("Restarting audio output") stopOutputAudio() + + // Restart with timeout + done := make(chan error, 1) go func() { - if err := startAudio(); err != nil { - audioLogger.Error().Err(err).Msg("Failed to restart audio output") - } + done <- startAudio() }() - return nil + + select { + case err := <-done: + if err != nil { + audioLogger.Error().Err(err).Msg("Failed to restart audio output") + return fmt.Errorf("failed to restart audio output: %w", err) + } + return nil + case <-time.After(5 * time.Second): + audioLogger.Error().Msg("Audio output restart timed out") + return fmt.Errorf("audio output restart timed out after 5 seconds") + } } func handleInputTrackForSession(track *webrtc.TrackRemote) { diff --git a/internal/audio/c/audio.c b/internal/audio/c/audio.c index ca7b069b..1607c7a0 100644 --- a/internal/audio/c/audio.c +++ b/internal/audio/c/audio.c @@ -227,8 +227,18 @@ static unsigned int get_hdmi_audio_sample_rate(void) { // TC358743 is a V4L2 subdevice at /dev/v4l-subdev2 int fd = open("/dev/v4l-subdev2", O_RDWR); if (fd < 0) { - fprintf(stderr, "WARNING: Could not open /dev/v4l-subdev2 to query HDMI audio sample rate: %s\n", strerror(errno)); + // Distinguish between different failure modes for better diagnostics + if (errno == ENOENT) { + fprintf(stdout, "INFO: TC358743 device not found (USB audio mode or device not present)\n"); + } else if (errno == EACCES || errno == EPERM) { + fprintf(stderr, "ERROR: Permission denied accessing TC358743 (/dev/v4l-subdev2)\n"); + fprintf(stderr, " Check device permissions or run with appropriate privileges\n"); + } else { + fprintf(stderr, "WARNING: Could not open /dev/v4l-subdev2: %s (errno=%d)\n", strerror(errno), errno); + fprintf(stderr, " HDMI audio sample rate detection unavailable, will use 48kHz default\n"); + } fflush(stderr); + fflush(stdout); return 0; } @@ -242,7 +252,14 @@ static unsigned int get_hdmi_audio_sample_rate(void) { ext_ctrls.controls = &ext_ctrl; if (ioctl(fd, VIDIOC_G_EXT_CTRLS, &ext_ctrls) == -1) { - fprintf(stderr, "WARNING: Could not query TC358743 audio sample rate control: %s (errno=%d)\n", strerror(errno), errno); + // Provide specific error messages based on errno + if (errno == EINVAL) { + fprintf(stderr, "ERROR: TC358743 sample rate control not supported (driver version mismatch?)\n"); + fprintf(stderr, " Ensure kernel driver supports audio_sampling_rate control\n"); + } else { + fprintf(stderr, "WARNING: TC358743 ioctl failed: %s (errno=%d)\n", strerror(errno), errno); + fprintf(stderr, " Will use 48kHz default sample rate\n"); + } fflush(stderr); close(fd); return 0; @@ -251,13 +268,19 @@ static unsigned int get_hdmi_audio_sample_rate(void) { close(fd); unsigned int detected_rate = (unsigned int)ext_ctrl.value; - fprintf(stdout, "DEBUG: TC358743 control read returned: %u Hz (error_idx=%u)\n", detected_rate, ext_ctrls.error_idx); - fflush(stdout); if (detected_rate == 0) { - fprintf(stdout, "INFO: TC358743 reports 0 Hz (no signal or rate not detected yet)\n"); + fprintf(stdout, "INFO: TC358743 reports 0 Hz (no HDMI signal or audio not detected yet)\n"); + fprintf(stdout, " Will use 48kHz default and resample if needed when signal detected\n"); fflush(stdout); - return 0; // No signal or rate not detected + return 0; // No signal or rate not detected - this is expected during hotplug + } + + // Validate detected rate is reasonable + if (detected_rate < 8000 || detected_rate > 192000) { + fprintf(stderr, "WARNING: TC358743 reported unusual sample rate: %u Hz (expected 32k-192k)\n", detected_rate); + fprintf(stderr, " Using detected rate anyway, but audio may not work correctly\n"); + fflush(stderr); } fprintf(stdout, "INFO: TC358743 detected HDMI audio sample rate: %u Hz\n", detected_rate); @@ -335,62 +358,59 @@ static inline void swap_stereo_channels(int16_t *buffer, uint16_t num_frames) { * @param handle Pointer to PCM handle to use for recovery operations * @param valid_handle Pointer to the valid handle to check against (for race detection) * @param stop_flag Pointer to atomic stop flag - * @param mutex Mutex to unlock on error * @param pcm_rc Error code from ALSA I/O operation * @param recovery_attempts Pointer to uint8_t recovery attempt counter * @param sleep_ms Milliseconds to sleep during recovery * @param max_attempts Maximum recovery attempts allowed - * @return Three possible outcomes: - * 1 = Retry operation (error was recovered, mutex still held by caller) - * 0 = Skip this frame and continue (mutex ALREADY UNLOCKED by this function) - * -1 = Fatal error, abort operation (mutex ALREADY UNLOCKED by this function) + * @return Return codes: + * 1 = Retry operation (error was recovered) + * 0 = Skip this frame and continue + * -1 = Fatal error, abort operation * - * CRITICAL: On return values 0 and -1, the mutex has already been unlocked. - * Only return value 1 requires the caller to maintain mutex ownership. + * IMPORTANT: This function NEVER unlocks the mutex. The caller is always + * responsible for unlocking after checking the return value. This ensures + * consistent mutex ownership semantics. */ static int handle_alsa_error(snd_pcm_t *handle, snd_pcm_t **valid_handle, - atomic_int *stop_flag, pthread_mutex_t *mutex, + atomic_int *stop_flag, int pcm_rc, uint8_t *recovery_attempts, uint32_t sleep_ms, uint8_t max_attempts) { int err; if (pcm_rc == -EPIPE) { + // Buffer underrun/overrun (*recovery_attempts)++; if (*recovery_attempts > max_attempts || handle != *valid_handle) { - pthread_mutex_unlock(mutex); return -1; } err = snd_pcm_prepare(handle); if (err < 0) { if (handle != *valid_handle) { - pthread_mutex_unlock(mutex); return -1; } snd_pcm_drop(handle); err = snd_pcm_prepare(handle); if (err < 0 || handle != *valid_handle) { - pthread_mutex_unlock(mutex); return -1; } } - return 1; + return 1; // Retry } else if (pcm_rc == -EAGAIN) { + // Resource temporarily unavailable if (handle != *valid_handle) { - pthread_mutex_unlock(mutex); return -1; } snd_pcm_wait(handle, sleep_ms); - return 1; + return 1; // Retry } else if (pcm_rc == -ESTRPIPE) { + // Suspended, need to resume (*recovery_attempts)++; if (*recovery_attempts > max_attempts || handle != *valid_handle) { - pthread_mutex_unlock(mutex); return -1; } uint8_t resume_attempts = 0; while ((err = snd_pcm_resume(handle)) == -EAGAIN && resume_attempts < 10) { if (*stop_flag || handle != *valid_handle) { - pthread_mutex_unlock(mutex); return -1; } snd_pcm_wait(handle, sleep_ms); @@ -398,44 +418,40 @@ static int handle_alsa_error(snd_pcm_t *handle, snd_pcm_t **valid_handle, } if (err < 0) { if (handle != *valid_handle) { - pthread_mutex_unlock(mutex); return -1; } err = snd_pcm_prepare(handle); if (err < 0 || handle != *valid_handle) { - pthread_mutex_unlock(mutex); return -1; } } - pthread_mutex_unlock(mutex); - return 0; + return 0; // Skip frame after suspend recovery } else if (pcm_rc == -ENODEV) { - pthread_mutex_unlock(mutex); + // Device was removed return -1; } else if (pcm_rc == -EIO) { + // I/O error (*recovery_attempts)++; if (*recovery_attempts <= max_attempts && handle == *valid_handle) { snd_pcm_drop(handle); if (handle != *valid_handle) { - pthread_mutex_unlock(mutex); return -1; } err = snd_pcm_prepare(handle); if (err >= 0 && handle == *valid_handle) { - return 1; + return 1; // Retry } } - pthread_mutex_unlock(mutex); return -1; } else { + // Other errors (*recovery_attempts)++; if (*recovery_attempts <= 1 && pcm_rc == -EINTR) { - return 1; + return 1; // Retry on first interrupt } else if (*recovery_attempts <= 1 && pcm_rc == -EBUSY && handle == *valid_handle) { snd_pcm_wait(handle, 1); - return 1; + return 1; // Retry on first busy } - pthread_mutex_unlock(mutex); return -1; } } @@ -714,25 +730,53 @@ int jetkvm_audio_capture_init() { return ERR_CODEC_INIT_FAILED; } + // Critical settings that must succeed for WebRTC compliance + #define OPUS_CTL_CRITICAL(call, desc) do { \ + int _err = call; \ + if (_err != OPUS_OK) { \ + fprintf(stderr, "ERROR: capture: Failed to set " desc ": %s\n", opus_strerror(_err)); \ + fflush(stderr); \ + opus_encoder_destroy(encoder); \ + encoder = NULL; \ + if (capture_resampler) { \ + speex_resampler_destroy(capture_resampler); \ + capture_resampler = NULL; \ + } \ + snd_pcm_t *handle = pcm_capture_handle; \ + pcm_capture_handle = NULL; \ + if (handle) { \ + snd_pcm_close(handle); \ + } \ + atomic_store(&capture_stop_requested, 0); \ + capture_initializing = 0; \ + return ERR_CODEC_INIT_FAILED; \ + } \ + } while(0) + + // Non-critical settings that can fail without breaking functionality #define OPUS_CTL_WARN(call, desc) do { \ int _err = call; \ if (_err != OPUS_OK) { \ - fprintf(stderr, "WARN: capture: Failed to set " desc ": %s\n", opus_strerror(_err)); \ + fprintf(stderr, "WARN: capture: Failed to set " desc ": %s (non-critical, continuing)\n", opus_strerror(_err)); \ fflush(stderr); \ } \ } while(0) - OPUS_CTL_WARN(opus_encoder_ctl(encoder, OPUS_SET_BITRATE(opus_bitrate)), "bitrate"); + // Critical: Bitrate, VBR mode, FEC are required for proper WebRTC operation + OPUS_CTL_CRITICAL(opus_encoder_ctl(encoder, OPUS_SET_BITRATE(opus_bitrate)), "bitrate"); + OPUS_CTL_CRITICAL(opus_encoder_ctl(encoder, OPUS_SET_VBR(OPUS_VBR)), "VBR mode"); + OPUS_CTL_CRITICAL(opus_encoder_ctl(encoder, OPUS_SET_VBR_CONSTRAINT(OPUS_VBR_CONSTRAINT)), "VBR constraint"); + OPUS_CTL_CRITICAL(opus_encoder_ctl(encoder, OPUS_SET_INBAND_FEC(opus_fec_enabled)), "FEC"); + + // Non-critical: These optimize quality/performance but aren't required OPUS_CTL_WARN(opus_encoder_ctl(encoder, OPUS_SET_COMPLEXITY(opus_complexity)), "complexity"); - OPUS_CTL_WARN(opus_encoder_ctl(encoder, OPUS_SET_VBR(OPUS_VBR)), "VBR mode"); - OPUS_CTL_WARN(opus_encoder_ctl(encoder, OPUS_SET_VBR_CONSTRAINT(OPUS_VBR_CONSTRAINT)), "VBR constraint"); OPUS_CTL_WARN(opus_encoder_ctl(encoder, OPUS_SET_SIGNAL(OPUS_SIGNAL_TYPE)), "signal type"); OPUS_CTL_WARN(opus_encoder_ctl(encoder, OPUS_SET_BANDWIDTH(OPUS_BANDWIDTH)), "bandwidth"); OPUS_CTL_WARN(opus_encoder_ctl(encoder, OPUS_SET_DTX(opus_dtx_enabled)), "DTX"); OPUS_CTL_WARN(opus_encoder_ctl(encoder, OPUS_SET_LSB_DEPTH(OPUS_LSB_DEPTH)), "LSB depth"); - OPUS_CTL_WARN(opus_encoder_ctl(encoder, OPUS_SET_INBAND_FEC(opus_fec_enabled)), "FEC"); OPUS_CTL_WARN(opus_encoder_ctl(encoder, OPUS_SET_PACKET_LOSS_PERC(opus_packet_loss_perc)), "packet loss percentage"); + #undef OPUS_CTL_CRITICAL #undef OPUS_CTL_WARN capture_initialized = 1; @@ -790,14 +834,15 @@ retry_read: if (__builtin_expect(pcm_rc < 0, 0)) { int err_result = handle_alsa_error(handle, &pcm_capture_handle, &capture_stop_requested, - &capture_mutex, pcm_rc, &recovery_attempts, + pcm_rc, &recovery_attempts, sleep_milliseconds, max_recovery_attempts); if (err_result == 1) { + // Recovery successful, retry (mutex still held) goto retry_read; - } else if (err_result == 0) { - return 0; } else { - return -1; + // Fatal error or skip frame (err_result == -1 or 0) + pthread_mutex_unlock(&capture_mutex); + return (err_result == 0) ? 0 : -1; } } @@ -1008,14 +1053,15 @@ retry_write: } if (__builtin_expect(pcm_rc < 0, 0)) { int err_result = handle_alsa_error(handle, &pcm_playback_handle, &playback_stop_requested, - &playback_mutex, pcm_rc, &recovery_attempts, + pcm_rc, &recovery_attempts, sleep_milliseconds, max_recovery_attempts); if (err_result == 1) { + // Recovery successful, retry (mutex still held) goto retry_write; - } else if (err_result == 0) { - return 0; } else { - return -2; + // Fatal error or skip frame (err_result == -1 or 0) + pthread_mutex_unlock(&playback_mutex); + return (err_result == 0) ? 0 : -2; } } pthread_mutex_unlock(&playback_mutex); diff --git a/internal/audio/cgo_source.go b/internal/audio/cgo_source.go index 8cf0500c..60a7ac7f 100644 --- a/internal/audio/cgo_source.go +++ b/internal/audio/cgo_source.go @@ -187,18 +187,20 @@ func (c *CgoSource) IsConnected() bool { func (c *CgoSource) ReadMessage() (uint8, []byte, error) { c.mu.Lock() + defer c.mu.Unlock() + if !c.connected { - c.mu.Unlock() return 0, nil, fmt.Errorf("not connected") } if !c.outputDevice { - c.mu.Unlock() return 0, nil, fmt.Errorf("ReadMessage only supported for output direction") } - c.mu.Unlock() - // Call C function without holding mutex to avoid deadlock - C layer has its own locking + // Hold mutex during C call to prevent race condition with Disconnect(). + // Lock order is consistent (c.mu -> capture_mutex) in all code paths, + // so this cannot deadlock. The C layer's capture_mutex protects ALSA/codec + // state, while c.mu protects the connection lifecycle. opusSize := C.jetkvm_audio_read_encode(unsafe.Pointer(&c.opusBuf[0])) if opusSize < 0 { return 0, nil, fmt.Errorf("jetkvm_audio_read_encode failed: %d", opusSize) @@ -217,16 +219,15 @@ func (c *CgoSource) ReadMessage() (uint8, []byte, error) { func (c *CgoSource) WriteMessage(msgType uint8, payload []byte) error { c.mu.Lock() + defer c.mu.Unlock() + if !c.connected { - c.mu.Unlock() return fmt.Errorf("not connected") } if c.outputDevice { - c.mu.Unlock() return fmt.Errorf("WriteMessage only supported for input direction") } - c.mu.Unlock() if msgType != ipcMsgTypeOpus { return nil @@ -240,7 +241,8 @@ func (c *CgoSource) WriteMessage(msgType uint8, payload []byte) error { return fmt.Errorf("opus packet too large: %d bytes (max 1500)", len(payload)) } - // Call C function without holding mutex to avoid deadlock - C layer has its own locking + // Hold mutex during C call to prevent race condition with Disconnect(). + // Lock order is consistent (c.mu -> playback_mutex) in all code paths. rc := C.jetkvm_audio_decode_write(unsafe.Pointer(&payload[0]), C.int(len(payload))) if rc < 0 { return fmt.Errorf("jetkvm_audio_decode_write failed: %d", rc)