Fix critical error handling and race conditions in audio system

Address 5 critical issues found in a comprehensive code review:

1. Opus Encoder Configuration Failures (CRITICAL)
   - Split encoder settings into critical vs non-critical
   - Critical settings (bitrate, VBR mode, VBR constraint, FEC) now fail
     initialization on error
   - Non-critical settings (e.g. complexity, DTX) log warnings but continue
   - Prevents silent audio quality degradation from misconfigured encoder

2. V4L2 Sample Rate Detection Error Reporting (CRITICAL)
   - Add specific error messages for different failure modes
   - Distinguish permission errors, device not found, and no signal
   - Warn when the detected sample rate falls outside the plausible range
     (8-192 kHz); the detected rate is still used
   - Improves debuggability when HDMI audio detection fails

3. Mutex Handling in ALSA Error Recovery (CRITICAL)
   - Refactor handle_alsa_error() to NEVER unlock mutex internally
   - Caller now always responsible for unlocking after checking return
   - Eliminates complex mutex ownership semantics that caused deadlocks
   - Consistent lock/unlock patterns prevent double-unlock bugs

4. Async Audio Start Error Propagation (CRITICAL)
   - Make SetAudioOutputEnabled/SetAudioInputEnabled synchronous, along with
     SetAudioOutputSource and RestartAudioOutput
   - Add 5-second timeout for audio initialization
   - Return errors to caller instead of only logging
   - Revert the enabled flag on failure or timeout to maintain consistency
   - Users now get immediate feedback if audio fails to start

5. CgoSource Race Condition (CRITICAL)
   - Hold c.mu mutex during C function calls in ReadMessage/WriteMessage
   - Prevents use-after-free when Disconnect() called concurrently
   - Lock order (c.mu -> capture_mutex / playback_mutex) is consistent, no deadlock risk
   - Fixes potential crash from accessing freed ALSA/codec resources

These changes eliminate silent failures, improve error visibility, and
prevent race conditions that could cause crashes or audio degradation.
This commit is contained in:
Alex P 2025-11-24 20:23:14 +02:00
parent 81ff87fb66
commit 0f8b368427
3 changed files with 175 additions and 78 deletions

View File

@ -250,40 +250,66 @@ func setPendingInputTrack(track *webrtc.TrackRemote) {
} }
// SetAudioOutputEnabled enables or disables audio output capture. // SetAudioOutputEnabled enables or disables audio output capture.
// Returns immediately; when enabling, audio starts asynchronously to prevent UI blocking. // When enabling, blocks up to 5 seconds waiting for audio to start.
// Check logs for async operation status. // Returns error if audio fails to start within timeout.
func SetAudioOutputEnabled(enabled bool) error { func SetAudioOutputEnabled(enabled bool) error {
if audioOutputEnabled.Swap(enabled) == enabled { if audioOutputEnabled.Swap(enabled) == enabled {
return nil return nil
} }
if enabled && activeConnections.Load() > 0 { if enabled && activeConnections.Load() > 0 {
// Start audio synchronously with timeout to provide immediate feedback
done := make(chan error, 1)
go func() { go func() {
if err := startAudio(); err != nil { done <- startAudio()
audioLogger.Error().Err(err).Msg("Failed to start output audio after enable")
}
}() }()
select {
case err := <-done:
if err != nil {
audioLogger.Error().Err(err).Msg("Failed to start output audio after enable")
audioOutputEnabled.Store(false) // Revert state on failure
return fmt.Errorf("failed to start audio output: %w", err)
}
return nil return nil
case <-time.After(5 * time.Second):
audioLogger.Error().Msg("Audio output start timed out after 5 seconds")
audioOutputEnabled.Store(false) // Revert state on timeout
return fmt.Errorf("audio output start timed out after 5 seconds")
}
} }
stopOutputAudio() stopOutputAudio()
return nil return nil
} }
// SetAudioInputEnabled enables or disables audio input playback. // SetAudioInputEnabled enables or disables audio input playback.
// Returns immediately; when enabling, audio starts asynchronously to prevent UI blocking. // When enabling, blocks up to 5 seconds waiting for audio to start.
// Check logs for async operation status. // Returns error if audio fails to start within timeout.
func SetAudioInputEnabled(enabled bool) error { func SetAudioInputEnabled(enabled bool) error {
if audioInputEnabled.Swap(enabled) == enabled { if audioInputEnabled.Swap(enabled) == enabled {
return nil return nil
} }
if enabled && activeConnections.Load() > 0 { if enabled && activeConnections.Load() > 0 {
// Start audio synchronously with timeout to provide immediate feedback
done := make(chan error, 1)
go func() { go func() {
if err := startAudio(); err != nil { done <- startAudio()
audioLogger.Error().Err(err).Msg("Failed to start input audio after enable")
}
}() }()
select {
case err := <-done:
if err != nil {
audioLogger.Error().Err(err).Msg("Failed to start input audio after enable")
audioInputEnabled.Store(false) // Revert state on failure
return fmt.Errorf("failed to start audio input: %w", err)
}
return nil return nil
case <-time.After(5 * time.Second):
audioLogger.Error().Msg("Audio input start timed out after 5 seconds")
audioInputEnabled.Store(false) // Revert state on timeout
return fmt.Errorf("audio input start timed out after 5 seconds")
}
} }
stopInputAudio() stopInputAudio()
return nil return nil
@ -309,20 +335,31 @@ func SetAudioOutputSource(source string) error {
return err return err
} }
// Handle audio restart asynchronously // Stop audio immediately (synchronous to release hardware)
go func() {
stopOutputAudio() stopOutputAudio()
if err := startAudio(); err != nil {
audioLogger.Error().Err(err).Str("source", source).Msg("Failed to start audio output after source change") // Restart audio with timeout
} done := make(chan error, 1)
go func() {
done <- startAudio()
}() }()
select {
case err := <-done:
if err != nil {
audioLogger.Error().Err(err).Str("source", source).Msg("Failed to start audio after source change")
return fmt.Errorf("failed to start audio after source change: %w", err)
}
return nil return nil
case <-time.After(5 * time.Second):
audioLogger.Error().Str("source", source).Msg("Audio restart timed out after source change")
return fmt.Errorf("audio restart timed out after 5 seconds")
}
} }
// RestartAudioOutput stops and restarts the audio output capture. // RestartAudioOutput stops and restarts the audio output capture.
// Returns immediately; restart happens asynchronously to prevent UI blocking. // Blocks up to 5 seconds waiting for audio to restart.
// Check logs for async operation status. // Returns error if restart fails or times out.
func RestartAudioOutput() error { func RestartAudioOutput() error {
audioMutex.Lock() audioMutex.Lock()
hasActiveOutput := audioOutputEnabled.Load() && currentAudioTrack != nil && outputSource.Load() != nil hasActiveOutput := audioOutputEnabled.Load() && currentAudioTrack != nil && outputSource.Load() != nil
@ -334,12 +371,24 @@ func RestartAudioOutput() error {
audioLogger.Info().Msg("Restarting audio output") audioLogger.Info().Msg("Restarting audio output")
stopOutputAudio() stopOutputAudio()
// Restart with timeout
done := make(chan error, 1)
go func() { go func() {
if err := startAudio(); err != nil { done <- startAudio()
audioLogger.Error().Err(err).Msg("Failed to restart audio output")
}
}() }()
select {
case err := <-done:
if err != nil {
audioLogger.Error().Err(err).Msg("Failed to restart audio output")
return fmt.Errorf("failed to restart audio output: %w", err)
}
return nil return nil
case <-time.After(5 * time.Second):
audioLogger.Error().Msg("Audio output restart timed out")
return fmt.Errorf("audio output restart timed out after 5 seconds")
}
} }
func handleInputTrackForSession(track *webrtc.TrackRemote) { func handleInputTrackForSession(track *webrtc.TrackRemote) {

View File

@ -227,8 +227,18 @@ static unsigned int get_hdmi_audio_sample_rate(void) {
// TC358743 is a V4L2 subdevice at /dev/v4l-subdev2 // TC358743 is a V4L2 subdevice at /dev/v4l-subdev2
int fd = open("/dev/v4l-subdev2", O_RDWR); int fd = open("/dev/v4l-subdev2", O_RDWR);
if (fd < 0) { if (fd < 0) {
fprintf(stderr, "WARNING: Could not open /dev/v4l-subdev2 to query HDMI audio sample rate: %s\n", strerror(errno)); // Distinguish between different failure modes for better diagnostics
if (errno == ENOENT) {
fprintf(stdout, "INFO: TC358743 device not found (USB audio mode or device not present)\n");
} else if (errno == EACCES || errno == EPERM) {
fprintf(stderr, "ERROR: Permission denied accessing TC358743 (/dev/v4l-subdev2)\n");
fprintf(stderr, " Check device permissions or run with appropriate privileges\n");
} else {
fprintf(stderr, "WARNING: Could not open /dev/v4l-subdev2: %s (errno=%d)\n", strerror(errno), errno);
fprintf(stderr, " HDMI audio sample rate detection unavailable, will use 48kHz default\n");
}
fflush(stderr); fflush(stderr);
fflush(stdout);
return 0; return 0;
} }
@ -242,7 +252,14 @@ static unsigned int get_hdmi_audio_sample_rate(void) {
ext_ctrls.controls = &ext_ctrl; ext_ctrls.controls = &ext_ctrl;
if (ioctl(fd, VIDIOC_G_EXT_CTRLS, &ext_ctrls) == -1) { if (ioctl(fd, VIDIOC_G_EXT_CTRLS, &ext_ctrls) == -1) {
fprintf(stderr, "WARNING: Could not query TC358743 audio sample rate control: %s (errno=%d)\n", strerror(errno), errno); // Provide specific error messages based on errno
if (errno == EINVAL) {
fprintf(stderr, "ERROR: TC358743 sample rate control not supported (driver version mismatch?)\n");
fprintf(stderr, " Ensure kernel driver supports audio_sampling_rate control\n");
} else {
fprintf(stderr, "WARNING: TC358743 ioctl failed: %s (errno=%d)\n", strerror(errno), errno);
fprintf(stderr, " Will use 48kHz default sample rate\n");
}
fflush(stderr); fflush(stderr);
close(fd); close(fd);
return 0; return 0;
@ -251,13 +268,19 @@ static unsigned int get_hdmi_audio_sample_rate(void) {
close(fd); close(fd);
unsigned int detected_rate = (unsigned int)ext_ctrl.value; unsigned int detected_rate = (unsigned int)ext_ctrl.value;
fprintf(stdout, "DEBUG: TC358743 control read returned: %u Hz (error_idx=%u)\n", detected_rate, ext_ctrls.error_idx);
fflush(stdout);
if (detected_rate == 0) { if (detected_rate == 0) {
fprintf(stdout, "INFO: TC358743 reports 0 Hz (no signal or rate not detected yet)\n"); fprintf(stdout, "INFO: TC358743 reports 0 Hz (no HDMI signal or audio not detected yet)\n");
fprintf(stdout, " Will use 48kHz default and resample if needed when signal detected\n");
fflush(stdout); fflush(stdout);
return 0; // No signal or rate not detected return 0; // No signal or rate not detected - this is expected during hotplug
}
// Validate detected rate is reasonable
if (detected_rate < 8000 || detected_rate > 192000) {
fprintf(stderr, "WARNING: TC358743 reported unusual sample rate: %u Hz (expected 32k-192k)\n", detected_rate);
fprintf(stderr, " Using detected rate anyway, but audio may not work correctly\n");
fflush(stderr);
} }
fprintf(stdout, "INFO: TC358743 detected HDMI audio sample rate: %u Hz\n", detected_rate); fprintf(stdout, "INFO: TC358743 detected HDMI audio sample rate: %u Hz\n", detected_rate);
@ -335,62 +358,59 @@ static inline void swap_stereo_channels(int16_t *buffer, uint16_t num_frames) {
* @param handle Pointer to PCM handle to use for recovery operations * @param handle Pointer to PCM handle to use for recovery operations
* @param valid_handle Pointer to the valid handle to check against (for race detection) * @param valid_handle Pointer to the valid handle to check against (for race detection)
* @param stop_flag Pointer to atomic stop flag * @param stop_flag Pointer to atomic stop flag
* @param mutex Mutex to unlock on error
* @param pcm_rc Error code from ALSA I/O operation * @param pcm_rc Error code from ALSA I/O operation
* @param recovery_attempts Pointer to uint8_t recovery attempt counter * @param recovery_attempts Pointer to uint8_t recovery attempt counter
* @param sleep_ms Milliseconds to sleep during recovery * @param sleep_ms Milliseconds to sleep during recovery
* @param max_attempts Maximum recovery attempts allowed * @param max_attempts Maximum recovery attempts allowed
* @return Three possible outcomes: * @return Return codes:
* 1 = Retry operation (error was recovered, mutex still held by caller) * 1 = Retry operation (error was recovered)
* 0 = Skip this frame and continue (mutex ALREADY UNLOCKED by this function) * 0 = Skip this frame and continue
* -1 = Fatal error, abort operation (mutex ALREADY UNLOCKED by this function) * -1 = Fatal error, abort operation
* *
* CRITICAL: On return values 0 and -1, the mutex has already been unlocked. * IMPORTANT: This function NEVER unlocks the mutex. The caller is always
* Only return value 1 requires the caller to maintain mutex ownership. * responsible for unlocking after checking the return value. This ensures
* consistent mutex ownership semantics.
*/ */
static int handle_alsa_error(snd_pcm_t *handle, snd_pcm_t **valid_handle, static int handle_alsa_error(snd_pcm_t *handle, snd_pcm_t **valid_handle,
atomic_int *stop_flag, pthread_mutex_t *mutex, atomic_int *stop_flag,
int pcm_rc, uint8_t *recovery_attempts, int pcm_rc, uint8_t *recovery_attempts,
uint32_t sleep_ms, uint8_t max_attempts) { uint32_t sleep_ms, uint8_t max_attempts) {
int err; int err;
if (pcm_rc == -EPIPE) { if (pcm_rc == -EPIPE) {
// Buffer underrun/overrun
(*recovery_attempts)++; (*recovery_attempts)++;
if (*recovery_attempts > max_attempts || handle != *valid_handle) { if (*recovery_attempts > max_attempts || handle != *valid_handle) {
pthread_mutex_unlock(mutex);
return -1; return -1;
} }
err = snd_pcm_prepare(handle); err = snd_pcm_prepare(handle);
if (err < 0) { if (err < 0) {
if (handle != *valid_handle) { if (handle != *valid_handle) {
pthread_mutex_unlock(mutex);
return -1; return -1;
} }
snd_pcm_drop(handle); snd_pcm_drop(handle);
err = snd_pcm_prepare(handle); err = snd_pcm_prepare(handle);
if (err < 0 || handle != *valid_handle) { if (err < 0 || handle != *valid_handle) {
pthread_mutex_unlock(mutex);
return -1; return -1;
} }
} }
return 1; return 1; // Retry
} else if (pcm_rc == -EAGAIN) { } else if (pcm_rc == -EAGAIN) {
// Resource temporarily unavailable
if (handle != *valid_handle) { if (handle != *valid_handle) {
pthread_mutex_unlock(mutex);
return -1; return -1;
} }
snd_pcm_wait(handle, sleep_ms); snd_pcm_wait(handle, sleep_ms);
return 1; return 1; // Retry
} else if (pcm_rc == -ESTRPIPE) { } else if (pcm_rc == -ESTRPIPE) {
// Suspended, need to resume
(*recovery_attempts)++; (*recovery_attempts)++;
if (*recovery_attempts > max_attempts || handle != *valid_handle) { if (*recovery_attempts > max_attempts || handle != *valid_handle) {
pthread_mutex_unlock(mutex);
return -1; return -1;
} }
uint8_t resume_attempts = 0; uint8_t resume_attempts = 0;
while ((err = snd_pcm_resume(handle)) == -EAGAIN && resume_attempts < 10) { while ((err = snd_pcm_resume(handle)) == -EAGAIN && resume_attempts < 10) {
if (*stop_flag || handle != *valid_handle) { if (*stop_flag || handle != *valid_handle) {
pthread_mutex_unlock(mutex);
return -1; return -1;
} }
snd_pcm_wait(handle, sleep_ms); snd_pcm_wait(handle, sleep_ms);
@ -398,44 +418,40 @@ static int handle_alsa_error(snd_pcm_t *handle, snd_pcm_t **valid_handle,
} }
if (err < 0) { if (err < 0) {
if (handle != *valid_handle) { if (handle != *valid_handle) {
pthread_mutex_unlock(mutex);
return -1; return -1;
} }
err = snd_pcm_prepare(handle); err = snd_pcm_prepare(handle);
if (err < 0 || handle != *valid_handle) { if (err < 0 || handle != *valid_handle) {
pthread_mutex_unlock(mutex);
return -1; return -1;
} }
} }
pthread_mutex_unlock(mutex); return 0; // Skip frame after suspend recovery
return 0;
} else if (pcm_rc == -ENODEV) { } else if (pcm_rc == -ENODEV) {
pthread_mutex_unlock(mutex); // Device was removed
return -1; return -1;
} else if (pcm_rc == -EIO) { } else if (pcm_rc == -EIO) {
// I/O error
(*recovery_attempts)++; (*recovery_attempts)++;
if (*recovery_attempts <= max_attempts && handle == *valid_handle) { if (*recovery_attempts <= max_attempts && handle == *valid_handle) {
snd_pcm_drop(handle); snd_pcm_drop(handle);
if (handle != *valid_handle) { if (handle != *valid_handle) {
pthread_mutex_unlock(mutex);
return -1; return -1;
} }
err = snd_pcm_prepare(handle); err = snd_pcm_prepare(handle);
if (err >= 0 && handle == *valid_handle) { if (err >= 0 && handle == *valid_handle) {
return 1; return 1; // Retry
} }
} }
pthread_mutex_unlock(mutex);
return -1; return -1;
} else { } else {
// Other errors
(*recovery_attempts)++; (*recovery_attempts)++;
if (*recovery_attempts <= 1 && pcm_rc == -EINTR) { if (*recovery_attempts <= 1 && pcm_rc == -EINTR) {
return 1; return 1; // Retry on first interrupt
} else if (*recovery_attempts <= 1 && pcm_rc == -EBUSY && handle == *valid_handle) { } else if (*recovery_attempts <= 1 && pcm_rc == -EBUSY && handle == *valid_handle) {
snd_pcm_wait(handle, 1); snd_pcm_wait(handle, 1);
return 1; return 1; // Retry on first busy
} }
pthread_mutex_unlock(mutex);
return -1; return -1;
} }
} }
@ -714,25 +730,53 @@ int jetkvm_audio_capture_init() {
return ERR_CODEC_INIT_FAILED; return ERR_CODEC_INIT_FAILED;
} }
// Critical settings that must succeed for WebRTC compliance
#define OPUS_CTL_CRITICAL(call, desc) do { \
int _err = call; \
if (_err != OPUS_OK) { \
fprintf(stderr, "ERROR: capture: Failed to set " desc ": %s\n", opus_strerror(_err)); \
fflush(stderr); \
opus_encoder_destroy(encoder); \
encoder = NULL; \
if (capture_resampler) { \
speex_resampler_destroy(capture_resampler); \
capture_resampler = NULL; \
} \
snd_pcm_t *handle = pcm_capture_handle; \
pcm_capture_handle = NULL; \
if (handle) { \
snd_pcm_close(handle); \
} \
atomic_store(&capture_stop_requested, 0); \
capture_initializing = 0; \
return ERR_CODEC_INIT_FAILED; \
} \
} while(0)
// Non-critical settings that can fail without breaking functionality
#define OPUS_CTL_WARN(call, desc) do { \ #define OPUS_CTL_WARN(call, desc) do { \
int _err = call; \ int _err = call; \
if (_err != OPUS_OK) { \ if (_err != OPUS_OK) { \
fprintf(stderr, "WARN: capture: Failed to set " desc ": %s\n", opus_strerror(_err)); \ fprintf(stderr, "WARN: capture: Failed to set " desc ": %s (non-critical, continuing)\n", opus_strerror(_err)); \
fflush(stderr); \ fflush(stderr); \
} \ } \
} while(0) } while(0)
OPUS_CTL_WARN(opus_encoder_ctl(encoder, OPUS_SET_BITRATE(opus_bitrate)), "bitrate"); // Critical: Bitrate, VBR mode, FEC are required for proper WebRTC operation
OPUS_CTL_CRITICAL(opus_encoder_ctl(encoder, OPUS_SET_BITRATE(opus_bitrate)), "bitrate");
OPUS_CTL_CRITICAL(opus_encoder_ctl(encoder, OPUS_SET_VBR(OPUS_VBR)), "VBR mode");
OPUS_CTL_CRITICAL(opus_encoder_ctl(encoder, OPUS_SET_VBR_CONSTRAINT(OPUS_VBR_CONSTRAINT)), "VBR constraint");
OPUS_CTL_CRITICAL(opus_encoder_ctl(encoder, OPUS_SET_INBAND_FEC(opus_fec_enabled)), "FEC");
// Non-critical: These optimize quality/performance but aren't required
OPUS_CTL_WARN(opus_encoder_ctl(encoder, OPUS_SET_COMPLEXITY(opus_complexity)), "complexity"); OPUS_CTL_WARN(opus_encoder_ctl(encoder, OPUS_SET_COMPLEXITY(opus_complexity)), "complexity");
OPUS_CTL_WARN(opus_encoder_ctl(encoder, OPUS_SET_VBR(OPUS_VBR)), "VBR mode");
OPUS_CTL_WARN(opus_encoder_ctl(encoder, OPUS_SET_VBR_CONSTRAINT(OPUS_VBR_CONSTRAINT)), "VBR constraint");
OPUS_CTL_WARN(opus_encoder_ctl(encoder, OPUS_SET_SIGNAL(OPUS_SIGNAL_TYPE)), "signal type"); OPUS_CTL_WARN(opus_encoder_ctl(encoder, OPUS_SET_SIGNAL(OPUS_SIGNAL_TYPE)), "signal type");
OPUS_CTL_WARN(opus_encoder_ctl(encoder, OPUS_SET_BANDWIDTH(OPUS_BANDWIDTH)), "bandwidth"); OPUS_CTL_WARN(opus_encoder_ctl(encoder, OPUS_SET_BANDWIDTH(OPUS_BANDWIDTH)), "bandwidth");
OPUS_CTL_WARN(opus_encoder_ctl(encoder, OPUS_SET_DTX(opus_dtx_enabled)), "DTX"); OPUS_CTL_WARN(opus_encoder_ctl(encoder, OPUS_SET_DTX(opus_dtx_enabled)), "DTX");
OPUS_CTL_WARN(opus_encoder_ctl(encoder, OPUS_SET_LSB_DEPTH(OPUS_LSB_DEPTH)), "LSB depth"); OPUS_CTL_WARN(opus_encoder_ctl(encoder, OPUS_SET_LSB_DEPTH(OPUS_LSB_DEPTH)), "LSB depth");
OPUS_CTL_WARN(opus_encoder_ctl(encoder, OPUS_SET_INBAND_FEC(opus_fec_enabled)), "FEC");
OPUS_CTL_WARN(opus_encoder_ctl(encoder, OPUS_SET_PACKET_LOSS_PERC(opus_packet_loss_perc)), "packet loss percentage"); OPUS_CTL_WARN(opus_encoder_ctl(encoder, OPUS_SET_PACKET_LOSS_PERC(opus_packet_loss_perc)), "packet loss percentage");
#undef OPUS_CTL_CRITICAL
#undef OPUS_CTL_WARN #undef OPUS_CTL_WARN
capture_initialized = 1; capture_initialized = 1;
@ -790,14 +834,15 @@ retry_read:
if (__builtin_expect(pcm_rc < 0, 0)) { if (__builtin_expect(pcm_rc < 0, 0)) {
int err_result = handle_alsa_error(handle, &pcm_capture_handle, &capture_stop_requested, int err_result = handle_alsa_error(handle, &pcm_capture_handle, &capture_stop_requested,
&capture_mutex, pcm_rc, &recovery_attempts, pcm_rc, &recovery_attempts,
sleep_milliseconds, max_recovery_attempts); sleep_milliseconds, max_recovery_attempts);
if (err_result == 1) { if (err_result == 1) {
// Recovery successful, retry (mutex still held)
goto retry_read; goto retry_read;
} else if (err_result == 0) {
return 0;
} else { } else {
return -1; // Fatal error or skip frame (err_result == -1 or 0)
pthread_mutex_unlock(&capture_mutex);
return (err_result == 0) ? 0 : -1;
} }
} }
@ -1008,14 +1053,15 @@ retry_write:
} }
if (__builtin_expect(pcm_rc < 0, 0)) { if (__builtin_expect(pcm_rc < 0, 0)) {
int err_result = handle_alsa_error(handle, &pcm_playback_handle, &playback_stop_requested, int err_result = handle_alsa_error(handle, &pcm_playback_handle, &playback_stop_requested,
&playback_mutex, pcm_rc, &recovery_attempts, pcm_rc, &recovery_attempts,
sleep_milliseconds, max_recovery_attempts); sleep_milliseconds, max_recovery_attempts);
if (err_result == 1) { if (err_result == 1) {
// Recovery successful, retry (mutex still held)
goto retry_write; goto retry_write;
} else if (err_result == 0) {
return 0;
} else { } else {
return -2; // Fatal error or skip frame (err_result == -1 or 0)
pthread_mutex_unlock(&playback_mutex);
return (err_result == 0) ? 0 : -2;
} }
} }
pthread_mutex_unlock(&playback_mutex); pthread_mutex_unlock(&playback_mutex);

View File

@ -187,18 +187,20 @@ func (c *CgoSource) IsConnected() bool {
func (c *CgoSource) ReadMessage() (uint8, []byte, error) { func (c *CgoSource) ReadMessage() (uint8, []byte, error) {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock()
if !c.connected { if !c.connected {
c.mu.Unlock()
return 0, nil, fmt.Errorf("not connected") return 0, nil, fmt.Errorf("not connected")
} }
if !c.outputDevice { if !c.outputDevice {
c.mu.Unlock()
return 0, nil, fmt.Errorf("ReadMessage only supported for output direction") return 0, nil, fmt.Errorf("ReadMessage only supported for output direction")
} }
c.mu.Unlock()
// Call C function without holding mutex to avoid deadlock - C layer has its own locking // Hold mutex during C call to prevent race condition with Disconnect().
// Lock order is consistent (c.mu -> capture_mutex) in all code paths,
// so this cannot deadlock. The C layer's capture_mutex protects ALSA/codec
// state, while c.mu protects the connection lifecycle.
opusSize := C.jetkvm_audio_read_encode(unsafe.Pointer(&c.opusBuf[0])) opusSize := C.jetkvm_audio_read_encode(unsafe.Pointer(&c.opusBuf[0]))
if opusSize < 0 { if opusSize < 0 {
return 0, nil, fmt.Errorf("jetkvm_audio_read_encode failed: %d", opusSize) return 0, nil, fmt.Errorf("jetkvm_audio_read_encode failed: %d", opusSize)
@ -217,16 +219,15 @@ func (c *CgoSource) ReadMessage() (uint8, []byte, error) {
func (c *CgoSource) WriteMessage(msgType uint8, payload []byte) error { func (c *CgoSource) WriteMessage(msgType uint8, payload []byte) error {
c.mu.Lock() c.mu.Lock()
defer c.mu.Unlock()
if !c.connected { if !c.connected {
c.mu.Unlock()
return fmt.Errorf("not connected") return fmt.Errorf("not connected")
} }
if c.outputDevice { if c.outputDevice {
c.mu.Unlock()
return fmt.Errorf("WriteMessage only supported for input direction") return fmt.Errorf("WriteMessage only supported for input direction")
} }
c.mu.Unlock()
if msgType != ipcMsgTypeOpus { if msgType != ipcMsgTypeOpus {
return nil return nil
@ -240,7 +241,8 @@ func (c *CgoSource) WriteMessage(msgType uint8, payload []byte) error {
return fmt.Errorf("opus packet too large: %d bytes (max 1500)", len(payload)) return fmt.Errorf("opus packet too large: %d bytes (max 1500)", len(payload))
} }
// Call C function without holding mutex to avoid deadlock - C layer has its own locking // Hold mutex during C call to prevent race condition with Disconnect().
// Lock order is consistent (c.mu -> playback_mutex) in all code paths.
rc := C.jetkvm_audio_decode_write(unsafe.Pointer(&payload[0]), C.int(len(payload))) rc := C.jetkvm_audio_decode_write(unsafe.Pointer(&payload[0]), C.int(len(payload)))
if rc < 0 { if rc < 0 {
return fmt.Errorf("jetkvm_audio_decode_write failed: %d", rc) return fmt.Errorf("jetkvm_audio_decode_write failed: %d", rc)