Fix critical audio race conditions and improve reliability

- Replace volatile with C11 atomics for proper ARM memory barriers
- Fix race condition in audio source swapping (swap to nil before cleanup)
- Prevent double-close of ALSA handles via atomic ownership claim
- Add exponential backoff with 10-retry circuit breaker to prevent infinite loops
- Improve error propagation to report dual failures
- Add defensive null checks for concurrent access safety
- Simplify UI error handling with helper functions
- Fix TypeScript compilation error in packet loss dropdown
This commit is contained in:
Alex P 2025-11-19 16:01:40 +02:00
parent 8d69780061
commit 1d570a8cbf
7 changed files with 278 additions and 266 deletions

173
audio.go
View File

@ -1,6 +1,7 @@
package kvm package kvm
import ( import (
"fmt"
"io" "io"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -39,7 +40,7 @@ func initAudio() {
ensureConfigLoaded() ensureConfigLoaded()
audioOutputEnabled.Store(config.AudioOutputEnabled) audioOutputEnabled.Store(config.AudioOutputEnabled)
audioInputEnabled.Store(true) audioInputEnabled.Store(config.AudioInputAutoEnable)
audioLogger.Debug().Msg("Audio subsystem initialized") audioLogger.Debug().Msg("Audio subsystem initialized")
audioInitialized = true audioInitialized = true
@ -48,18 +49,25 @@ func initAudio() {
func getAudioConfig() audio.AudioConfig { func getAudioConfig() audio.AudioConfig {
cfg := audio.DefaultAudioConfig() cfg := audio.DefaultAudioConfig()
// Helper to validate and apply numeric ranges
validateAndApply := func(value int, min int, max int, paramName string) (int, bool) {
if value >= min && value <= max {
return value, true
}
if value != 0 {
audioLogger.Warn().Int(paramName, value).Msgf("Invalid %s, using default", paramName)
}
return 0, false
}
// Validate and apply bitrate // Validate and apply bitrate
if config.AudioBitrate >= 64 && config.AudioBitrate <= 256 { if bitrate, valid := validateAndApply(config.AudioBitrate, 64, 256, "audio bitrate"); valid {
cfg.Bitrate = uint16(config.AudioBitrate) cfg.Bitrate = uint16(bitrate)
} else if config.AudioBitrate != 0 {
audioLogger.Warn().Int("bitrate", config.AudioBitrate).Uint16("default", cfg.Bitrate).Msg("Invalid audio bitrate, using default")
} }
// Validate and apply complexity // Validate and apply complexity
if config.AudioComplexity >= 0 && config.AudioComplexity <= 10 { if complexity, valid := validateAndApply(config.AudioComplexity, 0, 10, "audio complexity"); valid {
cfg.Complexity = uint8(config.AudioComplexity) cfg.Complexity = uint8(complexity)
} else {
audioLogger.Warn().Int("complexity", config.AudioComplexity).Uint8("default", cfg.Complexity).Msg("Invalid audio complexity, using default")
} }
// Apply boolean flags directly // Apply boolean flags directly
@ -67,10 +75,8 @@ func getAudioConfig() audio.AudioConfig {
cfg.FECEnabled = config.AudioFECEnabled cfg.FECEnabled = config.AudioFECEnabled
// Validate and apply buffer periods // Validate and apply buffer periods
if config.AudioBufferPeriods >= 2 && config.AudioBufferPeriods <= 24 { if periods, valid := validateAndApply(config.AudioBufferPeriods, 2, 24, "buffer periods"); valid {
cfg.BufferPeriods = uint8(config.AudioBufferPeriods) cfg.BufferPeriods = uint8(periods)
} else if config.AudioBufferPeriods != 0 {
audioLogger.Warn().Int("buffer_periods", config.AudioBufferPeriods).Uint8("default", cfg.BufferPeriods).Msg("Invalid buffer periods, using default")
} }
// Validate and apply sample rate using a map for valid rates // Validate and apply sample rate using a map for valid rates
@ -82,10 +88,8 @@ func getAudioConfig() audio.AudioConfig {
} }
// Validate and apply packet loss percentage // Validate and apply packet loss percentage
if config.AudioPacketLossPerc >= 0 && config.AudioPacketLossPerc <= 100 { if pktLoss, valid := validateAndApply(config.AudioPacketLossPerc, 0, 100, "packet loss percentage"); valid {
cfg.PacketLossPerc = uint8(config.AudioPacketLossPerc) cfg.PacketLossPerc = uint8(pktLoss)
} else {
audioLogger.Warn().Int("packet_loss_perc", config.AudioPacketLossPerc).Uint8("default", cfg.PacketLossPerc).Msg("Invalid packet loss percentage, using default")
} }
return cfg return cfg
@ -107,91 +111,98 @@ func startAudio() error {
ensureConfigLoaded() ensureConfigLoaded()
var outputErr, inputErr error
if audioOutputEnabled.Load() && currentAudioTrack != nil { if audioOutputEnabled.Load() && currentAudioTrack != nil {
startOutputAudioUnderMutex(getAlsaDevice(config.AudioOutputSource)) outputErr = startOutputAudioUnderMutex(getAlsaDevice(config.AudioOutputSource))
} }
if audioInputEnabled.Load() && config.UsbDevices != nil && config.UsbDevices.Audio { if audioInputEnabled.Load() && config.UsbDevices != nil && config.UsbDevices.Audio {
startInputAudioUnderMutex(getAlsaDevice("usb")) inputErr = startInputAudioUnderMutex(getAlsaDevice("usb"))
} }
return nil if outputErr != nil && inputErr != nil {
return fmt.Errorf("audio start failed - output: %w, input: %v", outputErr, inputErr)
}
if outputErr != nil {
return outputErr
}
return inputErr
} }
func startOutputAudioUnderMutex(alsaOutputDevice string) { func startOutputAudioUnderMutex(alsaOutputDevice string) error {
newSource := audio.NewCgoOutputSource(alsaOutputDevice, getAudioConfig()) oldRelay := outputRelay.Swap(nil)
oldSource := outputSource.Swap(&newSource) oldSource := outputSource.Swap(nil)
newRelay := audio.NewOutputRelay(&newSource, currentAudioTrack)
oldRelay := outputRelay.Swap(newRelay)
if oldRelay != nil { if oldRelay != nil {
oldRelay.Stop() oldRelay.Stop()
} }
if oldSource != nil { if oldSource != nil {
(*oldSource).Disconnect() (*oldSource).Disconnect()
} }
newSource := audio.NewCgoOutputSource(alsaOutputDevice, getAudioConfig())
newRelay := audio.NewOutputRelay(&newSource, currentAudioTrack)
if err := newRelay.Start(); err != nil { if err := newRelay.Start(); err != nil {
audioLogger.Error().Err(err).Str("alsaOutputDevice", alsaOutputDevice).Msg("Failed to start audio output relay") audioLogger.Error().Err(err).Str("alsaOutputDevice", alsaOutputDevice).Msg("Failed to start audio output relay")
return err
} }
outputSource.Swap(&newSource)
outputRelay.Swap(newRelay)
return nil
} }
func startInputAudioUnderMutex(alsaPlaybackDevice string) { func startInputAudioUnderMutex(alsaPlaybackDevice string) error {
newSource := audio.NewCgoInputSource(alsaPlaybackDevice, getAudioConfig()) oldRelay := inputRelay.Swap(nil)
oldSource := inputSource.Swap(&newSource) oldSource := inputSource.Swap(nil)
newRelay := audio.NewInputRelay(&newSource)
oldRelay := inputRelay.Swap(newRelay)
if oldRelay != nil { if oldRelay != nil {
oldRelay.Stop() oldRelay.Stop()
} }
if oldSource != nil { if oldSource != nil {
(*oldSource).Disconnect() (*oldSource).Disconnect()
} }
newSource := audio.NewCgoInputSource(alsaPlaybackDevice, getAudioConfig())
newRelay := audio.NewInputRelay(&newSource)
if err := newRelay.Start(); err != nil { if err := newRelay.Start(); err != nil {
audioLogger.Error().Err(err).Str("alsaPlaybackDevice", alsaPlaybackDevice).Msg("Failed to start input relay") audioLogger.Error().Err(err).Str("alsaPlaybackDevice", alsaPlaybackDevice).Msg("Failed to start input relay")
return err
} }
}
// stopAudioComponents safely stops and cleans up audio components inputSource.Swap(&newSource)
func stopAudioComponents(relay *atomic.Pointer[audio.OutputRelay], source *atomic.Pointer[audio.AudioSource]) { inputRelay.Swap(newRelay)
audioMutex.Lock() return nil
oldRelay := relay.Swap(nil)
oldSource := source.Swap(nil)
audioMutex.Unlock()
if oldRelay != nil {
oldRelay.Stop()
}
if oldSource != nil {
(*oldSource).Disconnect()
}
}
// stopAudioComponentsInput safely stops and cleans up input audio components
func stopAudioComponentsInput(relay *atomic.Pointer[audio.InputRelay], source *atomic.Pointer[audio.AudioSource]) {
audioMutex.Lock()
oldRelay := relay.Swap(nil)
oldSource := source.Swap(nil)
audioMutex.Unlock()
if oldRelay != nil {
oldRelay.Stop()
}
if oldSource != nil {
(*oldSource).Disconnect()
}
} }
func stopOutputAudio() { func stopOutputAudio() {
stopAudioComponents(&outputRelay, &outputSource) audioMutex.Lock()
oldRelay := outputRelay.Swap(nil)
oldSource := outputSource.Swap(nil)
audioMutex.Unlock()
if oldRelay != nil {
oldRelay.Stop()
}
if oldSource != nil {
(*oldSource).Disconnect()
}
} }
func stopInputAudio() { func stopInputAudio() {
stopAudioComponentsInput(&inputRelay, &inputSource) audioMutex.Lock()
oldRelay := inputRelay.Swap(nil)
oldSource := inputSource.Swap(nil)
audioMutex.Unlock()
if oldRelay != nil {
oldRelay.Stop()
}
if oldSource != nil {
(*oldSource).Disconnect()
}
} }
func stopAudio() { func stopAudio() {
@ -234,13 +245,16 @@ func setAudioTrack(audioTrack *webrtc.TrackLocalStaticSample) {
// Start audio without taking mutex again (already holding audioMutex) // Start audio without taking mutex again (already holding audioMutex)
if audioInitialized && activeConnections.Load() > 0 && audioOutputEnabled.Load() && currentAudioTrack != nil { if audioInitialized && activeConnections.Load() > 0 && audioOutputEnabled.Load() && currentAudioTrack != nil {
startOutputAudioUnderMutex(getAlsaDevice(config.AudioOutputSource)) if err := startOutputAudioUnderMutex(getAlsaDevice(config.AudioOutputSource)); err != nil {
audioLogger.Error().Err(err).Msg("Failed to start output audio after track change")
}
} }
} }
func setPendingInputTrack(track *webrtc.TrackRemote) { func setPendingInputTrack(track *webrtc.TrackRemote) {
trackID := track.ID() trackID := new(string)
currentInputTrack.Store(&trackID) *trackID = track.ID()
currentInputTrack.Store(trackID)
go handleInputTrackForSession(track) go handleInputTrackForSession(track)
} }
@ -268,6 +282,15 @@ func SetAudioInputEnabled(enabled bool) error {
return nil return nil
} }
// SetAudioOutputSource switches between HDMI (hw:0,0) and USB (hw:1,0) audio capture.
//
// The function returns immediately after updating and persisting the config change,
// while the actual audio device switch happens asynchronously in the background:
// - Config save is synchronous to ensure the change persists even if the process crashes
// - Audio restart is async to avoid blocking the RPC caller during ALSA reconfiguration
//
// Note: The HDMI audio device (hw:0,0) can take 30-60 seconds to initialize due to
// TC358743 hardware characteristics. Callers receive success before audio actually switches.
func SetAudioOutputSource(source string) error { func SetAudioOutputSource(source string) error {
if source != "hdmi" && source != "usb" { if source != "hdmi" && source != "usb" {
return nil return nil
@ -280,6 +303,13 @@ func SetAudioOutputSource(source string) error {
config.AudioOutputSource = source config.AudioOutputSource = source
// Save config synchronously before starting async audio operations
if err := SaveConfig(); err != nil {
audioLogger.Error().Err(err).Msg("Failed to save config after audio source change")
return err
}
// Handle audio restart asynchronously
go func() { go func() {
stopOutputAudio() stopOutputAudio()
if err := startAudio(); err != nil { if err := startAudio(); err != nil {
@ -287,12 +317,6 @@ func SetAudioOutputSource(source string) error {
} }
}() }()
go func() {
if err := SaveConfig(); err != nil {
audioLogger.Error().Err(err).Msg("Failed to save config after audio source change")
}
}()
return nil return nil
} }
@ -374,6 +398,11 @@ func processInputPacket(opusData []byte) error {
return nil return nil
} }
// Defensive null check - ensure dereferenced pointer is valid
if *source == nil {
return nil
}
// Ensure source is connected // Ensure source is connected
if !(*source).IsConnected() { if !(*source).IsConnected() {
if err := (*source).Connect(); err != nil { if err := (*source).Connect(); err != nil {

View File

@ -90,7 +90,7 @@ type Config struct {
IncludePreRelease bool `json:"include_pre_release"` IncludePreRelease bool `json:"include_pre_release"`
HashedPassword string `json:"hashed_password"` HashedPassword string `json:"hashed_password"`
LocalAuthToken string `json:"local_auth_token"` LocalAuthToken string `json:"local_auth_token"`
LocalAuthMode string `json:"localAuthMode"` //TODO: fix it with migration LocalAuthMode string `json:"localAuthMode"` // Uses camelCase for backwards compatibility with existing configs
LocalLoopbackOnly bool `json:"local_loopback_only"` LocalLoopbackOnly bool `json:"local_loopback_only"`
WakeOnLanDevices []WakeOnLanDevice `json:"wake_on_lan_devices"` WakeOnLanDevices []WakeOnLanDevice `json:"wake_on_lan_devices"`
KeyboardMacros []KeyboardMacro `json:"keyboard_macros"` KeyboardMacros []KeyboardMacro `json:"keyboard_macros"`

View File

@ -25,6 +25,7 @@
#include <time.h> #include <time.h>
#include <signal.h> #include <signal.h>
#include <pthread.h> #include <pthread.h>
#include <stdatomic.h>
// ARM NEON SIMD support (required - JetKVM hardware provides ARM Cortex-A7 with NEON) // ARM NEON SIMD support (required - JetKVM hardware provides ARM Cortex-A7 with NEON)
#include <arm_neon.h> #include <arm_neon.h>
@ -46,19 +47,20 @@ static OpusDecoder *decoder = NULL;
// Audio format (S16_LE @ 48kHz) // Audio format (S16_LE @ 48kHz)
static uint32_t sample_rate = 48000; static uint32_t sample_rate = 48000;
static uint8_t capture_channels = 2; // OUTPUT: HDMI stereo → client static uint8_t capture_channels = 2; // OUTPUT: HDMI/USB stereo → client (configurable via update_audio_constants)
static uint8_t playback_channels = 1; // INPUT: Client mono mic → device static uint8_t playback_channels = 1; // INPUT: Client mono mic → device (always mono for USB audio gadget)
static uint16_t frame_size = 960; // 20ms frames at 48kHz static uint16_t frame_size = 960; // 20ms frames at 48kHz
static uint32_t opus_bitrate = 192000; static uint32_t opus_bitrate = 192000;
static uint8_t opus_complexity = 8; static uint8_t opus_complexity = 8;
static uint16_t max_packet_size = 1500; static uint16_t max_packet_size = 1500;
#define OPUS_VBR 1 // Opus encoder configuration constants (see opus_defines.h for full enum values)
#define OPUS_VBR_CONSTRAINT 1 #define OPUS_VBR 1 // Variable bitrate mode enabled
#define OPUS_SIGNAL_TYPE 3002 #define OPUS_VBR_CONSTRAINT 1 // Constrained VBR maintains bitrate ceiling
#define OPUS_BANDWIDTH 1104 #define OPUS_SIGNAL_TYPE 3002 // OPUS_SIGNAL_MUSIC (optimized for music/audio content)
#define OPUS_LSB_DEPTH 16 #define OPUS_BANDWIDTH 1104 // OPUS_BANDWIDTH_FULLBAND (0-20kHz frequency range)
#define OPUS_LSB_DEPTH 16 // 16-bit PCM sample depth (S16_LE format)
static uint8_t opus_dtx_enabled = 1; static uint8_t opus_dtx_enabled = 1;
static uint8_t opus_fec_enabled = 1; static uint8_t opus_fec_enabled = 1;
@ -70,10 +72,15 @@ static uint32_t sleep_milliseconds = 1;
static uint8_t max_attempts_global = 5; static uint8_t max_attempts_global = 5;
static uint32_t max_backoff_us_global = 500000; static uint32_t max_backoff_us_global = 500000;
static volatile int capture_stop_requested = 0; static atomic_int capture_stop_requested = 0;
static volatile int playback_stop_requested = 0; static atomic_int playback_stop_requested = 0;
// Mutexes to protect concurrent access to ALSA handles during close // Mutexes to protect concurrent access to ALSA handles during close
// These prevent race conditions when jetkvm_audio_*_close() is called while
// jetkvm_audio_read_encode() or jetkvm_audio_decode_write() are executing.
// The hot path functions acquire these mutexes briefly to validate handle
// pointers, then release before slow ALSA/Opus operations to avoid holding
// locks during I/O. Handle comparison checks detect races after operations.
static pthread_mutex_t capture_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t capture_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t playback_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t playback_mutex = PTHREAD_MUTEX_INITIALIZER;
@ -131,17 +138,21 @@ void update_audio_decoder_constants(uint32_t sr, uint8_t ch, uint16_t fs, uint16
/** /**
* Initialize ALSA device names from environment variables * Initialize ALSA device names from environment variables
* Must be called before jetkvm_audio_capture_init or jetkvm_audio_playback_init * Must be called before jetkvm_audio_capture_init or jetkvm_audio_playback_init
*
* Device mapping (set via ALSA_CAPTURE_DEVICE/ALSA_PLAYBACK_DEVICE):
* hw:0,0 = TC358743 HDMI audio input (for OUTPUT path capture)
* hw:1,0 = USB Audio Gadget (for OUTPUT path capture or INPUT path playback)
*/ */
static void init_alsa_devices_from_env(void) { static void init_alsa_devices_from_env(void) {
// Always read from environment to support device switching // Always read from environment to support device switching
alsa_capture_device = getenv("ALSA_CAPTURE_DEVICE"); alsa_capture_device = getenv("ALSA_CAPTURE_DEVICE");
if (alsa_capture_device == NULL || alsa_capture_device[0] == '\0') { if (alsa_capture_device == NULL || alsa_capture_device[0] == '\0') {
alsa_capture_device = "hw:1,0"; // Default to USB gadget alsa_capture_device = "hw:1,0"; // Default: USB gadget audio for capture
} }
alsa_playback_device = getenv("ALSA_PLAYBACK_DEVICE"); alsa_playback_device = getenv("ALSA_PLAYBACK_DEVICE");
if (alsa_playback_device == NULL || alsa_playback_device[0] == '\0') { if (alsa_playback_device == NULL || alsa_playback_device[0] == '\0') {
alsa_playback_device = "hw:1,0"; // Default to USB gadget alsa_playback_device = "hw:1,0"; // Default: USB gadget audio for playback
} }
} }
@ -227,16 +238,22 @@ static int safe_alsa_open(snd_pcm_t **handle, const char *device, snd_pcm_stream
* Handle ALSA I/O errors with recovery attempts * Handle ALSA I/O errors with recovery attempts
* @param handle Pointer to PCM handle to use for recovery operations * @param handle Pointer to PCM handle to use for recovery operations
* @param valid_handle Pointer to the valid handle to check against (for race detection) * @param valid_handle Pointer to the valid handle to check against (for race detection)
* @param stop_flag Pointer to volatile stop flag * @param stop_flag Pointer to atomic stop flag
* @param mutex Mutex to unlock on error * @param mutex Mutex to unlock on error
* @param pcm_rc Error code from ALSA I/O operation * @param pcm_rc Error code from ALSA I/O operation
* @param recovery_attempts Pointer to uint8_t recovery attempt counter * @param recovery_attempts Pointer to uint8_t recovery attempt counter
* @param sleep_ms Milliseconds to sleep during recovery * @param sleep_ms Milliseconds to sleep during recovery
* @param max_attempts Maximum recovery attempts allowed * @param max_attempts Maximum recovery attempts allowed
* @return 1=retry, 0=skip frame, -1=error (mutex already unlocked) * @return Three possible outcomes:
* 1 = Retry operation (error was recovered, mutex still held by caller)
* 0 = Skip this frame and continue (mutex still held, caller must unlock)
* -1 = Fatal error, abort operation (mutex ALREADY UNLOCKED by this function)
*
* CRITICAL: On return value -1, the mutex has already been unlocked. The caller
* must NOT unlock again or proceed with further I/O operations.
*/ */
static int handle_alsa_error(snd_pcm_t *handle, snd_pcm_t **valid_handle, static int handle_alsa_error(snd_pcm_t *handle, snd_pcm_t **valid_handle,
volatile int *stop_flag, pthread_mutex_t *mutex, atomic_int *stop_flag, pthread_mutex_t *mutex,
int pcm_rc, uint8_t *recovery_attempts, int pcm_rc, uint8_t *recovery_attempts,
uint32_t sleep_ms, uint8_t max_attempts) { uint32_t sleep_ms, uint8_t max_attempts) {
int err; int err;
@ -448,8 +465,7 @@ int jetkvm_audio_capture_init() {
if (encoder != NULL || pcm_capture_handle != NULL) { if (encoder != NULL || pcm_capture_handle != NULL) {
capture_initialized = 0; capture_initialized = 0;
capture_stop_requested = 1; atomic_store(&capture_stop_requested, 1);
__sync_synchronize();
if (pcm_capture_handle) { if (pcm_capture_handle) {
snd_pcm_drop(pcm_capture_handle); snd_pcm_drop(pcm_capture_handle);
@ -474,7 +490,7 @@ int jetkvm_audio_capture_init() {
fprintf(stderr, "Failed to open ALSA capture device %s: %s\n", fprintf(stderr, "Failed to open ALSA capture device %s: %s\n",
alsa_capture_device, snd_strerror(err)); alsa_capture_device, snd_strerror(err));
fflush(stderr); fflush(stderr);
capture_stop_requested = 0; atomic_store(&capture_stop_requested, 0);
capture_initializing = 0; capture_initializing = 0;
return -1; return -1;
} }
@ -486,7 +502,7 @@ int jetkvm_audio_capture_init() {
snd_pcm_t *handle = pcm_capture_handle; snd_pcm_t *handle = pcm_capture_handle;
pcm_capture_handle = NULL; pcm_capture_handle = NULL;
snd_pcm_close(handle); snd_pcm_close(handle);
capture_stop_requested = 0; atomic_store(&capture_stop_requested, 0);
capture_initializing = 0; capture_initializing = 0;
return -2; return -2;
} }
@ -503,7 +519,7 @@ int jetkvm_audio_capture_init() {
pcm_capture_handle = NULL; pcm_capture_handle = NULL;
snd_pcm_close(handle); snd_pcm_close(handle);
} }
capture_stop_requested = 0; atomic_store(&capture_stop_requested, 0);
capture_initializing = 0; capture_initializing = 0;
return -3; return -3;
} }
@ -521,7 +537,7 @@ int jetkvm_audio_capture_init() {
opus_encoder_ctl(encoder, OPUS_SET_PACKET_LOSS_PERC(opus_packet_loss_perc)); opus_encoder_ctl(encoder, OPUS_SET_PACKET_LOSS_PERC(opus_packet_loss_perc));
capture_initialized = 1; capture_initialized = 1;
capture_stop_requested = 0; atomic_store(&capture_stop_requested, 0);
capture_initializing = 0; capture_initializing = 0;
return 0; return 0;
} }
@ -539,7 +555,7 @@ __attribute__((hot)) int jetkvm_audio_read_encode(void * __restrict__ opus_buf)
uint8_t recovery_attempts = 0; uint8_t recovery_attempts = 0;
const uint8_t max_recovery_attempts = 3; const uint8_t max_recovery_attempts = 3;
if (__builtin_expect(capture_stop_requested, 0)) { if (__builtin_expect(atomic_load(&capture_stop_requested), 0)) {
return -1; return -1;
} }
@ -556,16 +572,12 @@ __attribute__((hot)) int jetkvm_audio_read_encode(void * __restrict__ opus_buf)
} }
retry_read: retry_read:
if (__builtin_expect(capture_stop_requested, 0)) { if (__builtin_expect(atomic_load(&capture_stop_requested), 0)) {
pthread_mutex_unlock(&capture_mutex); pthread_mutex_unlock(&capture_mutex);
return -1; return -1;
} }
snd_pcm_t *handle = pcm_capture_handle; snd_pcm_t *handle = pcm_capture_handle;
if (!handle) {
pthread_mutex_unlock(&capture_mutex);
return -1;
}
pcm_rc = snd_pcm_readi(handle, pcm_buffer, frame_size); pcm_rc = snd_pcm_readi(handle, pcm_buffer, frame_size);
@ -601,11 +613,6 @@ retry_read:
nb_bytes = opus_encode(enc, pcm_buffer, frame_size, out, max_packet_size); nb_bytes = opus_encode(enc, pcm_buffer, frame_size, out, max_packet_size);
if (enc != encoder) {
pthread_mutex_unlock(&capture_mutex);
return -1;
}
pthread_mutex_unlock(&capture_mutex); pthread_mutex_unlock(&capture_mutex);
return nb_bytes; return nb_bytes;
} }
@ -634,7 +641,7 @@ int jetkvm_audio_playback_init() {
if (decoder != NULL || pcm_playback_handle != NULL) { if (decoder != NULL || pcm_playback_handle != NULL) {
playback_initialized = 0; playback_initialized = 0;
playback_stop_requested = 1; atomic_store(&playback_stop_requested, 1);
__sync_synchronize(); __sync_synchronize();
if (pcm_playback_handle) { if (pcm_playback_handle) {
@ -662,7 +669,7 @@ int jetkvm_audio_playback_init() {
fflush(stderr); fflush(stderr);
err = safe_alsa_open(&pcm_playback_handle, "default", SND_PCM_STREAM_PLAYBACK); err = safe_alsa_open(&pcm_playback_handle, "default", SND_PCM_STREAM_PLAYBACK);
if (err < 0) { if (err < 0) {
playback_stop_requested = 0; atomic_store(&playback_stop_requested, 0);
playback_initializing = 0; playback_initializing = 0;
return -1; return -1;
} }
@ -675,7 +682,7 @@ int jetkvm_audio_playback_init() {
snd_pcm_t *handle = pcm_playback_handle; snd_pcm_t *handle = pcm_playback_handle;
pcm_playback_handle = NULL; pcm_playback_handle = NULL;
snd_pcm_close(handle); snd_pcm_close(handle);
playback_stop_requested = 0; atomic_store(&playback_stop_requested, 0);
playback_initializing = 0; playback_initializing = 0;
return -1; return -1;
} }
@ -690,13 +697,13 @@ int jetkvm_audio_playback_init() {
snd_pcm_t *handle = pcm_playback_handle; snd_pcm_t *handle = pcm_playback_handle;
pcm_playback_handle = NULL; pcm_playback_handle = NULL;
snd_pcm_close(handle); snd_pcm_close(handle);
playback_stop_requested = 0; atomic_store(&playback_stop_requested, 0);
playback_initializing = 0; playback_initializing = 0;
return -2; return -2;
} }
playback_initialized = 1; playback_initialized = 1;
playback_stop_requested = 0; atomic_store(&playback_stop_requested, 0);
playback_initializing = 0; playback_initializing = 0;
return 0; return 0;
} }
@ -715,7 +722,7 @@ __attribute__((hot)) int jetkvm_audio_decode_write(void * __restrict__ opus_buf,
uint8_t recovery_attempts = 0; uint8_t recovery_attempts = 0;
const uint8_t max_recovery_attempts = 3; const uint8_t max_recovery_attempts = 3;
if (__builtin_expect(playback_stop_requested, 0)) { if (__builtin_expect(atomic_load(&playback_stop_requested), 0)) {
return -1; return -1;
} }
@ -744,24 +751,9 @@ __attribute__((hot)) int jetkvm_audio_decode_write(void * __restrict__ opus_buf,
// decode_fec=0 means normal decode (FEC data is used automatically when present) // decode_fec=0 means normal decode (FEC data is used automatically when present)
pcm_frames = opus_decode(dec, in, opus_size, pcm_buffer, frame_size, 0); pcm_frames = opus_decode(dec, in, opus_size, pcm_buffer, frame_size, 0);
if (dec != decoder) {
pthread_mutex_unlock(&playback_mutex);
return -1;
}
if (__builtin_expect(pcm_frames < 0, 0)) { if (__builtin_expect(pcm_frames < 0, 0)) {
if (!dec || dec != decoder) {
pthread_mutex_unlock(&playback_mutex);
return -1;
}
pcm_frames = opus_decode(dec, NULL, 0, pcm_buffer, frame_size, 1); pcm_frames = opus_decode(dec, NULL, 0, pcm_buffer, frame_size, 1);
if (dec != decoder) {
pthread_mutex_unlock(&playback_mutex);
return -1;
}
if (pcm_frames < 0) { if (pcm_frames < 0) {
pthread_mutex_unlock(&playback_mutex); pthread_mutex_unlock(&playback_mutex);
return -1; return -1;
@ -769,17 +761,12 @@ __attribute__((hot)) int jetkvm_audio_decode_write(void * __restrict__ opus_buf,
} }
retry_write: retry_write:
if (__builtin_expect(playback_stop_requested, 0)) { if (__builtin_expect(atomic_load(&playback_stop_requested), 0)) {
pthread_mutex_unlock(&playback_mutex); pthread_mutex_unlock(&playback_mutex);
return -1; return -1;
} }
snd_pcm_t *handle = pcm_playback_handle; snd_pcm_t *handle = pcm_playback_handle;
if (!handle) {
pthread_mutex_unlock(&playback_mutex);
return -1;
}
pcm_rc = snd_pcm_writei(handle, pcm_buffer, pcm_frames); pcm_rc = snd_pcm_writei(handle, pcm_buffer, pcm_frames);
if (handle != pcm_playback_handle) { if (handle != pcm_playback_handle) {
@ -816,19 +803,18 @@ retry_write:
*/ */
typedef void (*codec_destroy_fn)(void*); typedef void (*codec_destroy_fn)(void*);
static void close_audio_stream(volatile int *stop_requested, volatile int *initializing, static void close_audio_stream(atomic_int *stop_requested, volatile int *initializing,
volatile int *initialized, pthread_mutex_t *mutex, volatile int *initialized, pthread_mutex_t *mutex,
snd_pcm_t **pcm_handle, void **codec, snd_pcm_t **pcm_handle, void **codec,
codec_destroy_fn destroy_codec) { codec_destroy_fn destroy_codec) {
*stop_requested = 1; atomic_store(stop_requested, 1);
__sync_synchronize();
while (*initializing) { while (*initializing) {
sched_yield(); sched_yield();
} }
if (__sync_bool_compare_and_swap(initialized, 1, 0) == 0) { if (__sync_bool_compare_and_swap(initialized, 1, 0) == 0) {
*stop_requested = 0; atomic_store(stop_requested, 0);
return; return;
} }
@ -837,23 +823,23 @@ static void close_audio_stream(volatile int *stop_requested, volatile int *initi
pthread_mutex_lock(mutex); pthread_mutex_lock(mutex);
if (*pcm_handle) { snd_pcm_t *handle_to_close = *pcm_handle;
snd_pcm_drop(*pcm_handle); void *codec_to_destroy = *codec;
} *pcm_handle = NULL;
*codec = NULL;
if (*codec) {
destroy_codec(*codec);
*codec = NULL;
}
if (*pcm_handle) {
snd_pcm_close(*pcm_handle);
*pcm_handle = NULL;
}
pthread_mutex_unlock(mutex); pthread_mutex_unlock(mutex);
*stop_requested = 0; if (handle_to_close) {
snd_pcm_drop(handle_to_close);
snd_pcm_close(handle_to_close);
}
if (codec_to_destroy) {
destroy_codec(codec_to_destroy);
}
atomic_store(stop_requested, 0);
} }
void jetkvm_audio_playback_close() { void jetkvm_audio_playback_close() {

View File

@ -70,27 +70,49 @@ func (r *OutputRelay) Stop() {
func (r *OutputRelay) relayLoop() { func (r *OutputRelay) relayLoop() {
defer close(r.stopped) defer close(r.stopped)
const reconnectDelay = 1 * time.Second const initialDelay = 1 * time.Second
const maxDelay = 30 * time.Second
const maxRetries = 10
retryDelay := initialDelay
consecutiveFailures := 0
for r.running.Load() { for r.running.Load() {
if !(*r.source).IsConnected() { if !(*r.source).IsConnected() {
if err := (*r.source).Connect(); err != nil { if err := (*r.source).Connect(); err != nil {
r.logger.Debug().Err(err).Msg("failed to connect, will retry") consecutiveFailures++
time.Sleep(reconnectDelay) if consecutiveFailures >= maxRetries {
r.logger.Error().Int("failures", consecutiveFailures).Msg("Max connection retries exceeded, stopping relay")
return
}
r.logger.Debug().Err(err).Int("failures", consecutiveFailures).Dur("retry_delay", retryDelay).Msg("failed to connect, will retry")
time.Sleep(retryDelay)
retryDelay = min(retryDelay*2, maxDelay)
continue continue
} }
consecutiveFailures = 0
retryDelay = initialDelay
} }
msgType, payload, err := (*r.source).ReadMessage() msgType, payload, err := (*r.source).ReadMessage()
if err != nil { if err != nil {
if r.running.Load() { if r.running.Load() {
r.logger.Warn().Err(err).Msg("read error, reconnecting") consecutiveFailures++
if consecutiveFailures >= maxRetries {
r.logger.Error().Int("failures", consecutiveFailures).Msg("Max read retries exceeded, stopping relay")
return
}
r.logger.Warn().Err(err).Int("failures", consecutiveFailures).Msg("read error, reconnecting")
(*r.source).Disconnect() (*r.source).Disconnect()
time.Sleep(reconnectDelay) time.Sleep(retryDelay)
retryDelay = min(retryDelay*2, maxDelay)
} }
continue continue
} }
consecutiveFailures = 0
retryDelay = initialDelay
if msgType == ipcMsgTypeOpus && len(payload) > 0 { if msgType == ipcMsgTypeOpus && len(payload) > 0 {
r.sample.Data = payload r.sample.Data = payload
if err := r.audioTrack.WriteSample(r.sample); err != nil { if err := r.audioTrack.WriteSample(r.sample); err != nil {

View File

@ -229,7 +229,10 @@ func rpcSetEDID(edid string) error {
} }
config.EdidString = edid config.EdidString = edid
_ = SaveConfig() if err := SaveConfig(); err != nil {
logger.Error().Err(err).Msg("Failed to save config after EDID change")
return err
}
return nil return nil
} }

View File

@ -19,6 +19,15 @@ export default function AudioPopover() {
const [micLoading, setMicLoading] = useState(false); const [micLoading, setMicLoading] = useState(false);
const isHttps = isSecureContext(); const isHttps = isSecureContext();
// Helper function to handle RPC errors consistently
const handleRpcError = (resp: JsonRpcResponse, errorMsg?: string): boolean => {
if ("error" in resp) {
notifications.error(errorMsg || String(resp.error.data || m.unknown_error()));
return true;
}
return false;
};
useEffect(() => { useEffect(() => {
send("getAudioOutputEnabled", {}, (resp: JsonRpcResponse) => { send("getAudioOutputEnabled", {}, (resp: JsonRpcResponse) => {
if ("error" in resp) { if ("error" in resp) {
@ -46,12 +55,13 @@ export default function AudioPopover() {
const errorMsg = enabled const errorMsg = enabled
? m.audio_output_failed_enable({ error: String(resp.error.data || m.unknown_error()) }) ? m.audio_output_failed_enable({ error: String(resp.error.data || m.unknown_error()) })
: m.audio_output_failed_disable({ error: String(resp.error.data || m.unknown_error()) }); : m.audio_output_failed_disable({ error: String(resp.error.data || m.unknown_error()) });
notifications.error(errorMsg); handleRpcError(resp, errorMsg);
} else { return;
setAudioOutputEnabled(enabled);
const successMsg = enabled ? m.audio_output_enabled() : m.audio_output_disabled();
notifications.success(successMsg);
} }
setAudioOutputEnabled(enabled);
const successMsg = enabled ? m.audio_output_enabled() : m.audio_output_disabled();
notifications.success(successMsg);
}); });
}, [send]); }, [send]);
@ -63,10 +73,11 @@ export default function AudioPopover() {
const errorMsg = enabled const errorMsg = enabled
? m.audio_input_failed_enable({ error: String(resp.error.data || m.unknown_error()) }) ? m.audio_input_failed_enable({ error: String(resp.error.data || m.unknown_error()) })
: m.audio_input_failed_disable({ error: String(resp.error.data || m.unknown_error()) }); : m.audio_input_failed_disable({ error: String(resp.error.data || m.unknown_error()) });
notifications.error(errorMsg); handleRpcError(resp, errorMsg);
} else { return;
setMicrophoneEnabled(enabled);
} }
setMicrophoneEnabled(enabled);
}); });
}, [send, setMicrophoneEnabled]); }, [send, setMicrophoneEnabled]);

View File

@ -20,15 +20,26 @@ interface AudioConfigResult {
packet_loss_perc: number; packet_loss_perc: number;
} }
// Backend default values - single source of truth // UI display defaults - used to mark default options in dropdown menus
// Note: These should match backend defaults in config.go, but are fetched dynamically from API
const AUDIO_DEFAULTS = { const AUDIO_DEFAULTS = {
bitrate: 192, bitrate: 192,
complexity: 8, complexity: 8,
packetLossPerc: 0, packetLossPerc: 20, // Backend default is 20, not 0
} as const; } as const;
export default function SettingsAudioRoute() { export default function SettingsAudioRoute() {
const { send } = useJsonRpc(); const { send } = useJsonRpc();
// Helper function to handle RPC errors consistently
const handleRpcError = (resp: JsonRpcResponse, defaultMsg?: string) => {
if ("error" in resp) {
notifications.error(String(resp.error.data || defaultMsg || m.unknown_error()));
return true;
}
return false;
};
const { const {
setAudioOutputEnabled, setAudioOutputEnabled,
setAudioInputAutoEnable, setAudioInputAutoEnable,
@ -92,6 +103,7 @@ export default function SettingsAudioRoute() {
notifications.error(errorMsg); notifications.error(errorMsg);
return; return;
} }
setAudioOutputEnabled(enabled); setAudioOutputEnabled(enabled);
const successMsg = enabled ? m.audio_output_enabled() : m.audio_output_disabled(); const successMsg = enabled ? m.audio_output_enabled() : m.audio_output_disabled();
notifications.success(successMsg); notifications.success(successMsg);
@ -101,9 +113,13 @@ export default function SettingsAudioRoute() {
const handleAudioOutputSourceChange = (source: string) => { const handleAudioOutputSourceChange = (source: string) => {
send("setAudioOutputSource", { source }, (resp: JsonRpcResponse) => { send("setAudioOutputSource", { source }, (resp: JsonRpcResponse) => {
if ("error" in resp) { if ("error" in resp) {
notifications.error(m.audio_settings_output_source_failed({ error: String(resp.error.data || m.unknown_error()) })); const errorMsg = m.audio_settings_output_source_failed({
error: String(resp.error.data || m.unknown_error())
});
notifications.error(errorMsg);
return; return;
} }
setAudioOutputSource(source); setAudioOutputSource(source);
notifications.success(m.audio_settings_output_source_success()); notifications.success(m.audio_settings_output_source_success());
}); });
@ -111,10 +127,8 @@ export default function SettingsAudioRoute() {
const handleAudioInputAutoEnableChange = (enabled: boolean) => { const handleAudioInputAutoEnableChange = (enabled: boolean) => {
send("setAudioInputAutoEnable", { enabled }, (resp: JsonRpcResponse) => { send("setAudioInputAutoEnable", { enabled }, (resp: JsonRpcResponse) => {
if ("error" in resp) { if (handleRpcError(resp)) return;
notifications.error(String(resp.error.data || m.unknown_error()));
return;
}
setAudioInputAutoEnable(enabled); setAudioInputAutoEnable(enabled);
const successMsg = enabled const successMsg = enabled
? m.audio_input_auto_enable_enabled() ? m.audio_input_auto_enable_enabled()
@ -123,39 +137,36 @@ export default function SettingsAudioRoute() {
}); });
}; };
const handleAudioConfigChange = ( // Create a configuration object from current state
bitrate: number, const getCurrentConfig = () => ({
complexity: number, bitrate: audioBitrate,
dtxEnabled: boolean, complexity: audioComplexity,
fecEnabled: boolean, dtxEnabled: audioDTXEnabled,
bufferPeriods: number, fecEnabled: audioFECEnabled,
packetLossPerc: number bufferPeriods: audioBufferPeriods,
) => { packetLossPerc: audioPacketLossPerc,
send( });
"setAudioConfig",
{ bitrate, complexity, dtxEnabled, fecEnabled, bufferPeriods, packetLossPerc }, const handleAudioConfigChange = (updates: Partial<typeof getCurrentConfig>) => {
(resp: JsonRpcResponse) => { const config = { ...getCurrentConfig(), ...updates };
if ("error" in resp) {
notifications.error(String(resp.error.data || m.unknown_error())); send("setAudioConfig", config, (resp: JsonRpcResponse) => {
return; if (handleRpcError(resp)) return;
}
setAudioBitrate(bitrate); // Update all state values from the config
setAudioComplexity(complexity); setAudioBitrate(config.bitrate);
setAudioDTXEnabled(dtxEnabled); setAudioComplexity(config.complexity);
setAudioFECEnabled(fecEnabled); setAudioDTXEnabled(config.dtxEnabled);
setAudioBufferPeriods(bufferPeriods); setAudioFECEnabled(config.fecEnabled);
setAudioPacketLossPerc(packetLossPerc); setAudioBufferPeriods(config.bufferPeriods);
notifications.success(m.audio_settings_config_updated()); setAudioPacketLossPerc(config.packetLossPerc);
} notifications.success(m.audio_settings_config_updated());
); });
}; };
const handleRestartAudio = () => { const handleRestartAudio = () => {
send("restartAudioOutput", {}, (resp: JsonRpcResponse) => { send("restartAudioOutput", {}, (resp: JsonRpcResponse) => {
if ("error" in resp) { if (handleRpcError(resp)) return;
notifications.error(String(resp.error.data || m.unknown_error()));
return;
}
notifications.success(m.audio_settings_applied()); notifications.success(m.audio_settings_applied());
}); });
}; };
@ -217,16 +228,7 @@ export default function SettingsAudioRoute() {
{ value: "192", label: `192 kbps${192 === AUDIO_DEFAULTS.bitrate ? m.audio_settings_default_suffix() : ''}` }, { value: "192", label: `192 kbps${192 === AUDIO_DEFAULTS.bitrate ? m.audio_settings_default_suffix() : ''}` },
{ value: "256", label: "256 kbps" }, { value: "256", label: "256 kbps" },
]} ]}
onChange={(e) => onChange={(e) => handleAudioConfigChange({ bitrate: parseInt(e.target.value) })}
handleAudioConfigChange(
parseInt(e.target.value),
audioComplexity,
audioDTXEnabled,
audioFECEnabled,
audioBufferPeriods,
audioPacketLossPerc
)
}
/> />
</SettingsItem> </SettingsItem>
@ -244,16 +246,7 @@ export default function SettingsAudioRoute() {
{ value: "8", label: `8${8 === AUDIO_DEFAULTS.complexity ? m.audio_settings_default_suffix() : ''}` }, { value: "8", label: `8${8 === AUDIO_DEFAULTS.complexity ? m.audio_settings_default_suffix() : ''}` },
{ value: "10", label: "10 (best quality)" }, { value: "10", label: "10 (best quality)" },
]} ]}
onChange={(e) => onChange={(e) => handleAudioConfigChange({ complexity: parseInt(e.target.value) })}
handleAudioConfigChange(
audioBitrate,
parseInt(e.target.value),
audioDTXEnabled,
audioFECEnabled,
audioBufferPeriods,
audioPacketLossPerc
)
}
/> />
</SettingsItem> </SettingsItem>
@ -263,16 +256,7 @@ export default function SettingsAudioRoute() {
> >
<Checkbox <Checkbox
checked={audioDTXEnabled} checked={audioDTXEnabled}
onChange={(e) => onChange={(e) => handleAudioConfigChange({ dtxEnabled: e.target.checked })}
handleAudioConfigChange(
audioBitrate,
audioComplexity,
e.target.checked,
audioFECEnabled,
audioBufferPeriods,
audioPacketLossPerc
)
}
/> />
</SettingsItem> </SettingsItem>
@ -282,16 +266,7 @@ export default function SettingsAudioRoute() {
> >
<Checkbox <Checkbox
checked={audioFECEnabled} checked={audioFECEnabled}
onChange={(e) => onChange={(e) => handleAudioConfigChange({ fecEnabled: e.target.checked })}
handleAudioConfigChange(
audioBitrate,
audioComplexity,
audioDTXEnabled,
e.target.checked,
audioBufferPeriods,
audioPacketLossPerc
)
}
/> />
</SettingsItem> </SettingsItem>
@ -309,16 +284,7 @@ export default function SettingsAudioRoute() {
{ value: "16", label: "16 (320ms)" }, { value: "16", label: "16 (320ms)" },
{ value: "24", label: "24 (480ms)" }, { value: "24", label: "24 (480ms)" },
]} ]}
onChange={(e) => onChange={(e) => handleAudioConfigChange({ bufferPeriods: parseInt(e.target.value) })}
handleAudioConfigChange(
audioBitrate,
audioComplexity,
audioDTXEnabled,
audioFECEnabled,
parseInt(e.target.value),
audioPacketLossPerc
)
}
/> />
</SettingsItem> </SettingsItem>
@ -327,11 +293,15 @@ export default function SettingsAudioRoute() {
description={m.audio_settings_sample_rate_description()} description={m.audio_settings_sample_rate_description()}
> >
<div className="text-sm text-gray-700 dark:text-gray-300 font-medium"> <div className="text-sm text-gray-700 dark:text-gray-300 font-medium">
{audioSampleRate === 32000 && "32 kHz"} {(() => {
{audioSampleRate === 44100 && "44.1 kHz"} const rateMap: Record<number, string> = {
{audioSampleRate === 48000 && "48 kHz"} 32000: "32 kHz",
{audioSampleRate === 96000 && "96 kHz"} 44100: "44.1 kHz",
{![32000, 44100, 48000, 96000].includes(audioSampleRate) && `${audioSampleRate} Hz`} 48000: "48 kHz",
96000: "96 kHz"
};
return rateMap[audioSampleRate] || `${audioSampleRate} Hz`;
})()}
<span className="text-xs text-gray-500 dark:text-gray-400 ml-2"> <span className="text-xs text-gray-500 dark:text-gray-400 ml-2">
(auto-detected from source) (auto-detected from source)
</span> </span>
@ -346,24 +316,15 @@ export default function SettingsAudioRoute() {
size="SM" size="SM"
value={String(audioPacketLossPerc)} value={String(audioPacketLossPerc)}
options={[ options={[
{ value: "0", label: `0%${0 === AUDIO_DEFAULTS.packetLossPerc ? m.audio_settings_default_lan_suffix() : m.audio_settings_no_compensation_suffix()}` }, { value: "0", label: `0%${m.audio_settings_no_compensation_suffix()}` },
{ value: "5", label: "5%" }, { value: "5", label: "5%" },
{ value: "10", label: "10%" }, { value: "10", label: "10%" },
{ value: "15", label: "15%" }, { value: "15", label: "15%" },
{ value: "20", label: "20%" }, { value: "20", label: `20%${20 === AUDIO_DEFAULTS.packetLossPerc ? m.audio_settings_default_suffix() : ''}` },
{ value: "25", label: "25%" }, { value: "25", label: "25%" },
{ value: "30", label: "30%" }, { value: "30", label: "30%" },
]} ]}
onChange={(e) => onChange={(e) => handleAudioConfigChange({ packetLossPerc: parseInt(e.target.value) })}
handleAudioConfigChange(
audioBitrate,
audioComplexity,
audioDTXEnabled,
audioFECEnabled,
audioBufferPeriods,
parseInt(e.target.value)
)
}
/> />
</SettingsItem> </SettingsItem>