diff --git a/audio.go b/audio.go index b3bb0f2f..1f6c4163 100644 --- a/audio.go +++ b/audio.go @@ -5,6 +5,7 @@ import ( "io" "sync" "sync/atomic" + "time" "github.com/jetkvm/kvm/internal/audio" "github.com/jetkvm/kvm/internal/logging" @@ -249,8 +250,7 @@ func setPendingInputTrack(track *webrtc.TrackRemote) { go handleInputTrackForSession(track) } -// SetAudioOutputEnabled enables or disables audio output capture. -// When enabling, blocks up to 5 seconds waiting for audio to start. +// SetAudioOutputEnabled blocks up to 5 seconds when enabling. // Returns error if audio fails to start within timeout. func SetAudioOutputEnabled(enabled bool) error { if audioOutputEnabled.Swap(enabled) == enabled { @@ -282,8 +282,7 @@ func SetAudioOutputEnabled(enabled bool) error { return nil } -// SetAudioInputEnabled enables or disables audio input playback. -// When enabling, blocks up to 5 seconds waiting for audio to start. +// SetAudioInputEnabled blocks up to 5 seconds when enabling. // Returns error if audio fails to start within timeout. func SetAudioInputEnabled(enabled bool) error { if audioInputEnabled.Swap(enabled) == enabled { diff --git a/internal/audio/c/audio.c b/internal/audio/c/audio.c index f736054b..33ef4fb8 100644 --- a/internal/audio/c/audio.c +++ b/internal/audio/c/audio.c @@ -54,7 +54,7 @@ static snd_pcm_t *pcm_playback_handle = NULL; // INPUT: Client microphone → de static const char *alsa_capture_device = NULL; static const char *alsa_playback_device = NULL; -static bool capture_channels_swapped = false; // True if hardware reports R,L instead of L,R +static bool capture_channels_swapped = false; static OpusEncoder *encoder = NULL; static OpusDecoder *decoder = NULL; @@ -104,11 +104,9 @@ static uint32_t max_backoff_us_global = 500000; static atomic_int capture_stop_requested = 0; static atomic_int playback_stop_requested = 0; -// Mutexes to protect concurrent access to ALSA handles and codecs throughout their lifecycle -// These prevent race conditions when jetkvm_audio_*_close() is called while -// jetkvm_audio_read_encode() or jetkvm_audio_decode_write() are executing. -// The mutexes protect initialization, cleanup, ALSA I/O, codec operations, and handle validation -// to ensure handles remain valid from acquisition through release. +// Mutexes protect handle lifecycle and codec operations, NOT the ALSA I/O itself. +// The mutex is temporarily released during snd_pcm_readi/writei to prevent blocking. +// Race conditions are detected via handle pointer comparison after reacquiring the lock. static pthread_mutex_t capture_mutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t playback_mutex = PTHREAD_MUTEX_INITIALIZER; @@ -187,9 +185,6 @@ static void init_alsa_devices_from_env(void) { // SIMD-OPTIMIZED BUFFER OPERATIONS (ARM NEON) -/** - * Clear audio buffer using NEON (16 samples/iteration with 2x unrolling) - */ static inline void simd_clear_samples_s16(short * __restrict__ buffer, uint32_t samples) { const int16x8_t zero = vdupq_n_s16(0); uint32_t i = 0; @@ -293,11 +288,6 @@ static unsigned int get_hdmi_audio_sample_rate(void) { return detected_rate; } -/** - * Open ALSA device with exponential backoff retry - * @return 0 on success, negative error code on failure - */ -// High-precision sleep using nanosleep static inline void precise_sleep_us(uint32_t microseconds) { struct timespec ts = { .tv_sec = microseconds / 1000000, @@ -806,11 +796,6 @@ int jetkvm_audio_capture_init() { return 0; } -/** - * Read HDMI audio, resample with SpeexDSP, encode to Opus (OUTPUT path hot function) - * @param opus_buf Output buffer for encoded Opus packet - * @return >0 = Opus packet size in bytes, -1 = error - */ __attribute__((hot)) int jetkvm_audio_read_encode(void * __restrict__ opus_buf) { // Two buffers: hardware buffer + resampled buffer (at 48kHz) static short CACHE_ALIGN pcm_hw_buffer[MAX_HARDWARE_FRAME_SIZE * 2]; // Max hardware rate * stereo @@ -1007,13 +992,6 @@ int jetkvm_audio_playback_init() { return 0; } -/** - * Decode Opus, write to device speakers (INPUT path hot function) - * Processing pipeline: Opus decode (with FEC) → ALSA playback with error recovery - * @param opus_buf Encoded Opus packet from client - * @param opus_size Size of Opus packet in bytes - * @return >0 = PCM frames written, 0 = frame skipped, -1/-2 = error - */ __attribute__((hot)) int jetkvm_audio_decode_write(void * __restrict__ opus_buf, int32_t opus_size) { static short CACHE_ALIGN pcm_buffer[960 * 2]; // Cache-aligned unsigned char * __restrict__ in = (unsigned char*)opus_buf; diff --git a/internal/audio/cgo_source.go b/internal/audio/cgo_source.go index 60a7ac7f..0fb130c3 100644 --- a/internal/audio/cgo_source.go +++ b/internal/audio/cgo_source.go @@ -83,8 +83,6 @@ func (c *CgoSource) Connect() error { func (c *CgoSource) connectOutput() error { os.Setenv("ALSA_CAPTURE_DEVICE", c.alsaDevice) - // Opus uses fixed 48kHz sample rate (RFC 7587) - // SpeexDSP handles any hardware rate conversion const sampleRate = 48000 const frameSize = uint16(sampleRate * 20 / 1000) // 20ms frames diff --git a/internal/audio/relay.go b/internal/audio/relay.go index 92555441..8b42de62 100644 --- a/internal/audio/relay.go +++ b/internal/audio/relay.go @@ -77,7 +77,6 @@ func (r *OutputRelay) relayLoop() { consecutiveWriteFailures := 0 for r.running.Load() { - // Connect if not connected if !(*r.source).IsConnected() { if err := (*r.source).Connect(); err != nil { if consecutiveFailures++; consecutiveFailures >= maxRetries { @@ -93,7 +92,6 @@ func (r *OutputRelay) relayLoop() { retryDelay = 1 * time.Second } - // Read message from source msgType, payload, err := (*r.source).ReadMessage() if err != nil { if !r.running.Load() { @@ -110,11 +108,9 @@ func (r *OutputRelay) relayLoop() { continue } - // Reset retry state on successful read consecutiveFailures = 0 retryDelay = 1 * time.Second - // Write audio sample to WebRTC if msgType == ipcMsgTypeOpus && len(payload) > 0 { r.sample.Data = payload if err := r.audioTrack.WriteSample(r.sample); err != nil { @@ -129,7 +125,6 @@ func (r *OutputRelay) relayLoop() { Msg("Failed to write sample to WebRTC") } - // If too many consecutive write failures, reconnect source if consecutiveWriteFailures >= maxConsecutiveWriteFailures { r.logger.Error(). Int("failures", consecutiveWriteFailures). @@ -140,7 +135,7 @@ func (r *OutputRelay) relayLoop() { } } else { r.framesRelayed.Add(1) - consecutiveWriteFailures = 0 // Reset on successful write + consecutiveWriteFailures = 0 } } }