diff --git a/input_rpc.go b/input_rpc.go index 23d60fe..1981a08 100644 --- a/input_rpc.go +++ b/input_rpc.go @@ -14,7 +14,7 @@ const ( // Input RPC Direct Handlers // This module provides optimized direct handlers for high-frequency input events, // bypassing the reflection-based RPC system for improved performance. -// +// // Performance benefits: // - Eliminates reflection overhead (~2-3ms per call) // - Reduces memory allocations @@ -214,4 +214,4 @@ func isInputMethod(method string) bool { default: return false } -} \ No newline at end of file +} diff --git a/internal/audio/cgo_audio.go b/internal/audio/cgo_audio.go index f5367a9..c77739a 100644 --- a/internal/audio/cgo_audio.go +++ b/internal/audio/cgo_audio.go @@ -39,7 +39,7 @@ static volatile int playback_initialized = 0; static int safe_alsa_open(snd_pcm_t **handle, const char *device, snd_pcm_stream_t stream) { int attempts = 3; int err; - + while (attempts-- > 0) { err = snd_pcm_open(handle, device, stream, SND_PCM_NONBLOCK); if (err >= 0) { @@ -47,7 +47,7 @@ static int safe_alsa_open(snd_pcm_t **handle, const char *device, snd_pcm_stream snd_pcm_nonblock(*handle, 0); return 0; } - + if (err == -EBUSY && attempts > 0) { // Device busy, wait and retry usleep(50000); // 50ms @@ -63,26 +63,26 @@ static int configure_alsa_device(snd_pcm_t *handle, const char *device_name) { snd_pcm_hw_params_t *params; snd_pcm_sw_params_t *sw_params; int err; - + if (!handle) return -1; - + // Use stack allocation for better performance snd_pcm_hw_params_alloca(¶ms); snd_pcm_sw_params_alloca(&sw_params); - + // Hardware parameters err = snd_pcm_hw_params_any(handle, params); if (err < 0) return err; - + err = snd_pcm_hw_params_set_access(handle, params, SND_PCM_ACCESS_RW_INTERLEAVED); if (err < 0) return err; - + err = snd_pcm_hw_params_set_format(handle, params, SND_PCM_FORMAT_S16_LE); if (err < 0) return err; - + err = snd_pcm_hw_params_set_channels(handle, params, channels); if (err < 0) return err; - + // Set exact rate for better performance err = snd_pcm_hw_params_set_rate(handle, params, sample_rate, 0); if (err < 0) { @@ -91,70 +91,70 @@ static int configure_alsa_device(snd_pcm_t *handle, const char *device_name) { err = snd_pcm_hw_params_set_rate_near(handle, params, &rate, 0); if (err < 0) return err; } - + // Optimize buffer sizes for low latency snd_pcm_uframes_t period_size = frame_size; err = snd_pcm_hw_params_set_period_size_near(handle, params, &period_size, 0); if (err < 0) return err; - + // Set buffer size to 4 periods for good latency/stability balance snd_pcm_uframes_t buffer_size = period_size * 4; err = snd_pcm_hw_params_set_buffer_size_near(handle, params, &buffer_size); if (err < 0) return err; - + err = snd_pcm_hw_params(handle, params); if (err < 0) return err; - + // Software parameters for optimal performance err = snd_pcm_sw_params_current(handle, sw_params); if (err < 0) return err; - + // Start playback/capture when buffer is period_size frames err = snd_pcm_sw_params_set_start_threshold(handle, sw_params, period_size); if (err < 0) return err; - + // Allow transfers when at least period_size frames are available err = snd_pcm_sw_params_set_avail_min(handle, sw_params, period_size); if (err < 0) return err; - + err = snd_pcm_sw_params(handle, sw_params); if (err < 0) return err; - + return snd_pcm_prepare(handle); } // Initialize ALSA and Opus encoder with improved safety int jetkvm_audio_init() { int err; - + // Prevent concurrent initialization if (__sync_bool_compare_and_swap(&capture_initializing, 0, 1) == 0) { return -EBUSY; // Already initializing } - + // Check if already initialized if (capture_initialized) { capture_initializing = 0; return 0; } - + // Clean up any existing resources first - if (encoder) { - opus_encoder_destroy(encoder); - encoder = NULL; + if (encoder) { + opus_encoder_destroy(encoder); + encoder = NULL; } - if (pcm_handle) { - snd_pcm_close(pcm_handle); - pcm_handle = NULL; + if (pcm_handle) { + snd_pcm_close(pcm_handle); + pcm_handle = NULL; } - + // Try to open ALSA capture device err = safe_alsa_open(&pcm_handle, "hw:1,0", SND_PCM_STREAM_CAPTURE); if (err < 0) { capture_initializing = 0; return -1; } - + // Configure the device err = configure_alsa_device(pcm_handle, "capture"); if (err < 0) { @@ -163,7 +163,7 @@ int jetkvm_audio_init() { capture_initializing = 0; return -1; } - + // Initialize Opus encoder int opus_err = 0; encoder = opus_encoder_create(sample_rate, channels, OPUS_APPLICATION_AUDIO, &opus_err); @@ -172,10 +172,10 @@ int jetkvm_audio_init() { capture_initializing = 0; return -2; } - + opus_encoder_ctl(encoder, OPUS_SET_BITRATE(opus_bitrate)); opus_encoder_ctl(encoder, OPUS_SET_COMPLEXITY(opus_complexity)); - + capture_initialized = 1; capture_initializing = 0; return 0; @@ -186,21 +186,21 @@ int jetkvm_audio_read_encode(void *opus_buf) { short pcm_buffer[1920]; // max 2ch*960 unsigned char *out = (unsigned char*)opus_buf; int err = 0; - + // Safety checks if (!capture_initialized || !pcm_handle || !encoder || !opus_buf) { return -1; } - + int pcm_rc = snd_pcm_readi(pcm_handle, pcm_buffer, frame_size); - + // Handle ALSA errors with enhanced recovery if (pcm_rc < 0) { if (pcm_rc == -EPIPE) { // Buffer underrun - try to recover err = snd_pcm_prepare(pcm_handle); if (err < 0) return -1; - + pcm_rc = snd_pcm_readi(pcm_handle, pcm_buffer, frame_size); if (pcm_rc < 0) return -1; } else if (pcm_rc == -EAGAIN) { @@ -221,12 +221,12 @@ int jetkvm_audio_read_encode(void *opus_buf) { return -1; } } - + // If we got fewer frames than expected, pad with silence if (pcm_rc < frame_size) { memset(&pcm_buffer[pcm_rc * channels], 0, (frame_size - pcm_rc) * channels * sizeof(short)); } - + int nb_bytes = opus_encode(encoder, pcm_buffer, frame_size, out, max_packet_size); return nb_bytes; } @@ -234,28 +234,28 @@ int jetkvm_audio_read_encode(void *opus_buf) { // Initialize ALSA playback with improved safety int jetkvm_audio_playback_init() { int err; - + // Prevent concurrent initialization if (__sync_bool_compare_and_swap(&playback_initializing, 0, 1) == 0) { return -EBUSY; // Already initializing } - + // Check if already initialized if (playback_initialized) { playback_initializing = 0; return 0; } - + // Clean up any existing resources first - if (decoder) { - opus_decoder_destroy(decoder); - decoder = NULL; + if (decoder) { + opus_decoder_destroy(decoder); + decoder = NULL; } - if (pcm_playback_handle) { - snd_pcm_close(pcm_playback_handle); - pcm_playback_handle = NULL; + if (pcm_playback_handle) { + snd_pcm_close(pcm_playback_handle); + pcm_playback_handle = NULL; } - + // Try to open the USB gadget audio device for playback err = safe_alsa_open(&pcm_playback_handle, "hw:1,0", SND_PCM_STREAM_PLAYBACK); if (err < 0) { @@ -266,7 +266,7 @@ int jetkvm_audio_playback_init() { return -1; } } - + // Configure the device err = configure_alsa_device(pcm_playback_handle, "playback"); if (err < 0) { @@ -275,7 +275,7 @@ int jetkvm_audio_playback_init() { playback_initializing = 0; return -1; } - + // Initialize Opus decoder int opus_err = 0; decoder = opus_decoder_create(sample_rate, channels, &opus_err); @@ -285,7 +285,7 @@ int jetkvm_audio_playback_init() { playback_initializing = 0; return -2; } - + playback_initialized = 1; playback_initializing = 0; return 0; @@ -296,21 +296,21 @@ int jetkvm_audio_decode_write(void *opus_buf, int opus_size) { short pcm_buffer[1920]; // max 2ch*960 unsigned char *in = (unsigned char*)opus_buf; int err = 0; - + // Safety checks if (!playback_initialized || !pcm_playback_handle || !decoder || !opus_buf || opus_size <= 0) { return -1; } - + // Additional bounds checking if (opus_size > max_packet_size) { return -1; } - + // Decode Opus to PCM int pcm_frames = opus_decode(decoder, in, opus_size, pcm_buffer, frame_size, 0); if (pcm_frames < 0) return -1; - + // Write PCM to playback device with enhanced recovery int pcm_rc = snd_pcm_writei(pcm_playback_handle, pcm_buffer, pcm_frames); if (pcm_rc < 0) { @@ -318,7 +318,7 @@ int jetkvm_audio_decode_write(void *opus_buf, int opus_size) { // Buffer underrun - try to recover err = snd_pcm_prepare(pcm_playback_handle); if (err < 0) return -2; - + pcm_rc = snd_pcm_writei(pcm_playback_handle, pcm_buffer, pcm_frames); } else if (pcm_rc == -ESTRPIPE) { // Device suspended, try to resume @@ -333,7 +333,7 @@ int jetkvm_audio_decode_write(void *opus_buf, int opus_size) { } if (pcm_rc < 0) return -2; } - + return pcm_frames; } @@ -343,20 +343,20 @@ void jetkvm_audio_playback_close() { while (playback_initializing) { usleep(1000); // 1ms } - + // Atomic check and set to prevent double cleanup if (__sync_bool_compare_and_swap(&playback_initialized, 1, 0) == 0) { return; // Already cleaned up } - - if (decoder) { - opus_decoder_destroy(decoder); - decoder = NULL; + + if (decoder) { + opus_decoder_destroy(decoder); + decoder = NULL; } - if (pcm_playback_handle) { + if (pcm_playback_handle) { snd_pcm_drain(pcm_playback_handle); - snd_pcm_close(pcm_playback_handle); - pcm_playback_handle = NULL; + snd_pcm_close(pcm_playback_handle); + pcm_playback_handle = NULL; } } @@ -366,19 +366,19 @@ void jetkvm_audio_close() { while (capture_initializing) { usleep(1000); // 1ms } - + capture_initialized = 0; - - if (encoder) { - opus_encoder_destroy(encoder); - encoder = NULL; + + if (encoder) { + opus_encoder_destroy(encoder); + encoder = NULL; } - if (pcm_handle) { + if (pcm_handle) { snd_pcm_drop(pcm_handle); // Drop pending samples - snd_pcm_close(pcm_handle); - pcm_handle = NULL; + snd_pcm_close(pcm_handle); + pcm_handle = NULL; } - + // Also clean up playback jetkvm_audio_playback_close(); } @@ -387,15 +387,15 @@ import "C" // Optimized Go wrappers with reduced overhead var ( - errAudioInitFailed = errors.New("failed to init ALSA/Opus") - errBufferTooSmall = errors.New("buffer too small") - errAudioReadEncode = errors.New("audio read/encode error") - errAudioDecodeWrite = errors.New("audio decode/write error") - errAudioPlaybackInit = errors.New("failed to init ALSA playback/Opus decoder") - errEmptyBuffer = errors.New("empty buffer") - errNilBuffer = errors.New("nil buffer") - errBufferTooLarge = errors.New("buffer too large") - errInvalidBufferPtr = errors.New("invalid buffer pointer") + errAudioInitFailed = errors.New("failed to init ALSA/Opus") + errBufferTooSmall = errors.New("buffer too small") + errAudioReadEncode = errors.New("audio read/encode error") + errAudioDecodeWrite = errors.New("audio decode/write error") + errAudioPlaybackInit = errors.New("failed to init ALSA playback/Opus decoder") + errEmptyBuffer = errors.New("empty buffer") + errNilBuffer = errors.New("nil buffer") + errBufferTooLarge = errors.New("buffer too large") + errInvalidBufferPtr = errors.New("invalid buffer pointer") ) func cgoAudioInit() error { @@ -416,7 +416,7 @@ func cgoAudioReadEncode(buf []byte) (int, error) { if len(buf) < 1276 { return 0, errBufferTooSmall } - + n := C.jetkvm_audio_read_encode(unsafe.Pointer(&buf[0])) if n < 0 { return 0, errAudioReadEncode @@ -449,18 +449,18 @@ func cgoAudioDecodeWrite(buf []byte) (int, error) { if buf == nil { return 0, errors.New("nil buffer") } - + // Validate buffer size to prevent potential overruns if len(buf) > 4096 { // Maximum reasonable Opus frame size return 0, errors.New("buffer too large") } - + // Ensure buffer is not deallocated by keeping a reference bufPtr := unsafe.Pointer(&buf[0]) if bufPtr == nil { return 0, errors.New("invalid buffer pointer") } - + // Add recovery mechanism for C function crashes defer func() { if r := recover(); r != nil { @@ -469,7 +469,7 @@ func cgoAudioDecodeWrite(buf []byte) (int, error) { _ = r // Explicitly ignore the panic value } }() - + n := C.jetkvm_audio_decode_write(bufPtr, C.int(len(buf))) if n < 0 { return 0, errors.New("audio decode/write error") @@ -479,10 +479,10 @@ func cgoAudioDecodeWrite(buf []byte) (int, error) { // Wrapper functions for non-blocking audio manager var ( - CGOAudioInit = cgoAudioInit - CGOAudioClose = cgoAudioClose - CGOAudioReadEncode = cgoAudioReadEncode - CGOAudioPlaybackInit = cgoAudioPlaybackInit - CGOAudioPlaybackClose = cgoAudioPlaybackClose - CGOAudioDecodeWrite = cgoAudioDecodeWrite + CGOAudioInit = cgoAudioInit + CGOAudioClose = cgoAudioClose + CGOAudioReadEncode = cgoAudioReadEncode + CGOAudioPlaybackInit = cgoAudioPlaybackInit + CGOAudioPlaybackClose = cgoAudioPlaybackClose + CGOAudioDecodeWrite = cgoAudioDecodeWrite ) diff --git a/main.go b/main.go index 4d7ba69..7dbd080 100644 --- a/main.go +++ b/main.go @@ -14,10 +14,10 @@ import ( ) var ( - appCtx context.Context - isAudioServer bool + appCtx context.Context + isAudioServer bool audioProcessDone chan struct{} - audioSupervisor *audio.AudioServerSupervisor + audioSupervisor *audio.AudioServerSupervisor ) func runAudioServer() { @@ -68,7 +68,7 @@ func startAudioSubprocess() error { // onProcessStart func(pid int) { logger.Info().Int("pid", pid).Msg("audio server process started") - + // Start audio relay system for main process without a track initially // The track will be updated when a WebRTC session is created if err := audio.StartAudioRelay(nil); err != nil { @@ -82,7 +82,7 @@ func startAudioSubprocess() error { } else { logger.Info().Int("pid", pid).Msg("audio server process exited gracefully") } - + // Stop audio relay when process exits audio.StopAudioRelay() }, @@ -100,12 +100,12 @@ func startAudioSubprocess() error { // Monitor supervisor and handle cleanup go func() { defer close(audioProcessDone) - + // Wait for supervisor to stop for audioSupervisor.IsRunning() { time.Sleep(100 * time.Millisecond) } - + logger.Info().Msg("audio supervisor stopped") }()