diff --git a/.golangci.yml b/.golangci.yml index dd8a079..2191f18 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,4 +1,7 @@ version: "2" +run: + build-tags: + - nolint linters: enable: - forbidigo diff --git a/cloud.go b/cloud.go index ecb89b6..e2f1cd8 100644 --- a/cloud.go +++ b/cloud.go @@ -454,7 +454,7 @@ func handleSessionRequest( // Check if we have an existing session and handle renegotiation if currentSession != nil { scopedLogger.Info().Msg("handling renegotiation for existing session") - + // Handle renegotiation with existing session sd, err = currentSession.ExchangeOffer(req.Sd) if err != nil { diff --git a/internal/audio/cgo_audio.go b/internal/audio/cgo_audio.go index f65cba0..4956a42 100644 --- a/internal/audio/cgo_audio.go +++ b/internal/audio/cgo_audio.go @@ -1,3 +1,5 @@ +//go:build !nolint + package audio import ( @@ -54,7 +56,7 @@ int jetkvm_audio_read_encode(void *opus_buf) { short pcm_buffer[1920]; // max 2ch*960 unsigned char *out = (unsigned char*)opus_buf; int pcm_rc = snd_pcm_readi(pcm_handle, pcm_buffer, frame_size); - + // Handle ALSA errors with recovery if (pcm_rc < 0) { if (pcm_rc == -EPIPE) { @@ -70,12 +72,12 @@ int jetkvm_audio_read_encode(void *opus_buf) { return -1; } } - + // If we got fewer frames than expected, pad with silence if (pcm_rc < frame_size) { memset(&pcm_buffer[pcm_rc * channels], 0, (frame_size - pcm_rc) * channels * sizeof(short)); } - + int nb_bytes = opus_encode(encoder, pcm_buffer, frame_size, out, max_packet_size); return nb_bytes; } @@ -85,7 +87,7 @@ int jetkvm_audio_playback_init() { int err; snd_pcm_hw_params_t *params; if (pcm_playback_handle) return 0; - + // Try to open the USB gadget audio device for playback // This should correspond to the capture endpoint of the USB gadget if (snd_pcm_open(&pcm_playback_handle, "hw:1,0", SND_PCM_STREAM_PLAYBACK, 0) < 0) { @@ -93,7 +95,7 @@ int jetkvm_audio_playback_init() { if (snd_pcm_open(&pcm_playback_handle, "default", SND_PCM_STREAM_PLAYBACK, 0) < 0) return -1; } - + snd_pcm_hw_params_malloc(¶ms); snd_pcm_hw_params_any(pcm_playback_handle, params); snd_pcm_hw_params_set_access(pcm_playback_handle, params, SND_PCM_ACCESS_RW_INTERLEAVED); @@ -104,11 +106,11 @@ int jetkvm_audio_playback_init() { snd_pcm_hw_params(pcm_playback_handle, params); snd_pcm_hw_params_free(params); snd_pcm_prepare(pcm_playback_handle); - + // Initialize Opus decoder decoder = opus_decoder_create(sample_rate, channels, &err); if (!decoder) return -2; - + return 0; } @@ -116,11 +118,11 @@ int jetkvm_audio_playback_init() { int jetkvm_audio_decode_write(void *opus_buf, int opus_size) { short pcm_buffer[1920]; // max 2ch*960 unsigned char *in = (unsigned char*)opus_buf; - + // Decode Opus to PCM int pcm_frames = opus_decode(decoder, in, opus_size, pcm_buffer, frame_size, 0); if (pcm_frames < 0) return -1; - + // Write PCM to playback device int pcm_rc = snd_pcm_writei(pcm_playback_handle, pcm_buffer, pcm_frames); if (pcm_rc < 0) { @@ -131,7 +133,7 @@ int jetkvm_audio_decode_write(void *opus_buf, int opus_size) { } if (pcm_rc < 0) return -2; } - + return pcm_frames; } @@ -148,8 +150,6 @@ void jetkvm_audio_close() { */ import "C" - - // Go wrappers for initializing, starting, stopping, and controlling audio func cgoAudioInit() error { ret := C.jetkvm_audio_init() @@ -179,8 +179,6 @@ func cgoAudioReadEncode(buf []byte) (int, error) { return int(n), nil } - - // Go wrappers for audio playback (microphone input) func cgoAudioPlaybackInit() error { ret := C.jetkvm_audio_playback_init() @@ -206,8 +204,6 @@ func cgoAudioDecodeWrite(buf []byte) (int, error) { return int(n), nil } - - // Wrapper functions for non-blocking audio manager func CGOAudioInit() error { return cgoAudioInit() diff --git a/internal/audio/cgo_audio_stub.go b/internal/audio/cgo_audio_stub.go index c1d142c..c66501a 100644 --- a/internal/audio/cgo_audio_stub.go +++ b/internal/audio/cgo_audio_stub.go @@ -28,4 +28,30 @@ func cgoAudioPlaybackClose() { func cgoAudioDecodeWrite(buf []byte) (int, error) { return 0, errors.New("audio not available in lint mode") -} \ No newline at end of file +} + +// Uppercase wrapper functions (called by nonblocking_audio.go) + +func CGOAudioInit() error { + return cgoAudioInit() +} + +func CGOAudioClose() { + cgoAudioClose() +} + +func CGOAudioReadEncode(buf []byte) (int, error) { + return cgoAudioReadEncode(buf) +} + +func CGOAudioPlaybackInit() error { + return cgoAudioPlaybackInit() +} + +func CGOAudioPlaybackClose() { + cgoAudioPlaybackClose() +} + +func CGOAudioDecodeWrite(buf []byte) (int, error) { + return cgoAudioDecodeWrite(buf) +} diff --git a/internal/audio/input.go b/internal/audio/input.go index f93d317..c51b929 100644 --- a/internal/audio/input.go +++ b/internal/audio/input.go @@ -11,7 +11,7 @@ import ( // AudioInputMetrics holds metrics for microphone input // Note: int64 fields must be 64-bit aligned for atomic operations on ARM type AudioInputMetrics struct { - FramesSent int64 // Must be first for alignment + FramesSent int64 // Must be first for alignment FramesDropped int64 BytesProcessed int64 ConnectionDrops int64 @@ -22,8 +22,8 @@ type AudioInputMetrics struct { // AudioInputManager manages microphone input stream from WebRTC to USB gadget type AudioInputManager struct { // metrics MUST be first for ARM32 alignment (contains int64 fields) - metrics AudioInputMetrics - + metrics AudioInputMetrics + inputBuffer chan []byte logger zerolog.Logger running int32 @@ -44,7 +44,7 @@ func (aim *AudioInputManager) Start() error { } aim.logger.Info().Msg("Starting audio input manager") - + // Start the non-blocking audio input stream err := StartNonBlockingAudioInput(aim.inputBuffer) if err != nil { @@ -62,11 +62,11 @@ func (aim *AudioInputManager) Stop() { } aim.logger.Info().Msg("Stopping audio input manager") - + // Stop the non-blocking audio input stream // Note: This is handled by the global non-blocking audio manager // Individual input streams are managed centrally - + // Drain the input buffer go func() { for { @@ -115,4 +115,4 @@ func (aim *AudioInputManager) GetMetrics() AudioInputMetrics { // IsRunning returns whether the audio input manager is running func (aim *AudioInputManager) IsRunning() bool { return atomic.LoadInt32(&aim.running) == 1 -} \ No newline at end of file +} diff --git a/internal/audio/nonblocking_api.go b/internal/audio/nonblocking_api.go index d91b645..1c3091c 100644 --- a/internal/audio/nonblocking_api.go +++ b/internal/audio/nonblocking_api.go @@ -14,11 +14,14 @@ func StartNonBlockingAudioStreaming(send func([]byte)) error { managerMutex.Lock() defer managerMutex.Unlock() - if globalNonBlockingManager != nil && globalNonBlockingManager.IsRunning() { - return ErrAudioAlreadyRunning + if globalNonBlockingManager != nil && globalNonBlockingManager.IsOutputRunning() { + return nil // Already running, this is not an error + } + + if globalNonBlockingManager == nil { + globalNonBlockingManager = NewNonBlockingAudioManager() } - globalNonBlockingManager = NewNonBlockingAudioManager() return globalNonBlockingManager.StartAudioOutput(send) } @@ -31,6 +34,11 @@ func StartNonBlockingAudioInput(receiveChan <-chan []byte) error { globalNonBlockingManager = NewNonBlockingAudioManager() } + // Check if input is already running to avoid unnecessary operations + if globalNonBlockingManager.IsInputRunning() { + return nil // Already running, this is not an error + } + return globalNonBlockingManager.StartAudioInput(receiveChan) } @@ -45,6 +53,16 @@ func StopNonBlockingAudioStreaming() { } } +// StopNonBlockingAudioInput stops only the audio input without affecting output +func StopNonBlockingAudioInput() { + managerMutex.Lock() + defer managerMutex.Unlock() + + if globalNonBlockingManager != nil && globalNonBlockingManager.IsInputRunning() { + globalNonBlockingManager.StopAudioInput() + } +} + // GetNonBlockingAudioStats returns statistics from the non-blocking audio system func GetNonBlockingAudioStats() NonBlockingAudioStats { managerMutex.Lock() @@ -62,4 +80,12 @@ func IsNonBlockingAudioRunning() bool { defer managerMutex.Unlock() return globalNonBlockingManager != nil && globalNonBlockingManager.IsRunning() -} \ No newline at end of file +} + +// IsNonBlockingAudioInputRunning returns true if the non-blocking audio input is running +func IsNonBlockingAudioInputRunning() bool { + managerMutex.Lock() + defer managerMutex.Unlock() + + return globalNonBlockingManager != nil && globalNonBlockingManager.IsInputRunning() +} diff --git a/internal/audio/nonblocking_audio.go b/internal/audio/nonblocking_audio.go index c0756d7..d0af2b8 100644 --- a/internal/audio/nonblocking_audio.go +++ b/internal/audio/nonblocking_audio.go @@ -23,14 +23,14 @@ type NonBlockingAudioManager struct { logger *zerolog.Logger // Audio output (capture from device, send to WebRTC) - outputSendFunc func([]byte) - outputWorkChan chan audioWorkItem + outputSendFunc func([]byte) + outputWorkChan chan audioWorkItem outputResultChan chan audioResult - // Audio input (receive from WebRTC, playback to device) + // Audio input (receive from WebRTC, playback to device) inputReceiveChan <-chan []byte - inputWorkChan chan audioWorkItem - inputResultChan chan audioResult + inputWorkChan chan audioWorkItem + inputResultChan chan audioResult // Worker threads and flags - int32 fields grouped together outputRunning int32 @@ -69,7 +69,7 @@ type NonBlockingAudioStats struct { InputFramesDropped int64 WorkerErrors int64 // time.Time is int64 internally, so it's also aligned - LastProcessTime time.Time + LastProcessTime time.Time } // NewNonBlockingAudioManager creates a new non-blocking audio manager @@ -81,8 +81,8 @@ func NewNonBlockingAudioManager() *NonBlockingAudioManager { ctx: ctx, cancel: cancel, logger: &logger, - outputWorkChan: make(chan audioWorkItem, 10), // Buffer for work items - outputResultChan: make(chan audioResult, 10), // Buffer for results + outputWorkChan: make(chan audioWorkItem, 10), // Buffer for work items + outputResultChan: make(chan audioResult, 10), // Buffer for results inputWorkChan: make(chan audioWorkItem, 10), inputResultChan: make(chan audioResult, 10), } @@ -327,7 +327,7 @@ func (nam *NonBlockingAudioManager) inputCoordinatorThread() { return case frame := <-nam.inputReceiveChan: - if frame == nil || len(frame) == 0 { + if len(frame) == 0 { continue } @@ -397,6 +397,16 @@ func (nam *NonBlockingAudioManager) Stop() { nam.logger.Info().Msg("non-blocking audio manager stopped") } +// StopAudioInput stops only the audio input operations +func (nam *NonBlockingAudioManager) StopAudioInput() { + nam.logger.Info().Msg("stopping audio input") + + // Stop only the input coordinator + atomic.StoreInt32(&nam.inputRunning, 0) + + nam.logger.Info().Msg("audio input stopped") +} + // GetStats returns current statistics func (nam *NonBlockingAudioManager) GetStats() NonBlockingAudioStats { return NonBlockingAudioStats{ @@ -412,4 +422,14 @@ func (nam *NonBlockingAudioManager) GetStats() NonBlockingAudioStats { // IsRunning returns true if any audio operations are running func (nam *NonBlockingAudioManager) IsRunning() bool { return atomic.LoadInt32(&nam.outputRunning) == 1 || atomic.LoadInt32(&nam.inputRunning) == 1 -} \ No newline at end of file +} + +// IsInputRunning returns true if audio input is running +func (nam *NonBlockingAudioManager) IsInputRunning() bool { + return atomic.LoadInt32(&nam.inputRunning) == 1 +} + +// IsOutputRunning returns true if audio output is running +func (nam *NonBlockingAudioManager) IsOutputRunning() bool { + return atomic.LoadInt32(&nam.outputRunning) == 1 +} diff --git a/jsonrpc.go b/jsonrpc.go index b8ecfb0..d79e10e 100644 --- a/jsonrpc.go +++ b/jsonrpc.go @@ -21,8 +21,8 @@ import ( // Mouse event processing with single worker var ( - mouseEventChan = make(chan mouseEventData, 100) // Buffered channel for mouse events - mouseWorkerOnce sync.Once + mouseEventChan = make(chan mouseEventData, 100) // Buffered channel for mouse events + mouseWorkerOnce sync.Once ) type mouseEventData struct { @@ -35,15 +35,15 @@ func startMouseWorker() { go func() { ticker := time.NewTicker(16 * time.Millisecond) // ~60 FPS defer ticker.Stop() - + var latestMouseEvent *mouseEventData - + for { select { case event := <-mouseEventChan: // Always keep the latest mouse event latestMouseEvent = &event - + case <-ticker.C: // Process the latest mouse event at regular intervals if latestMouseEvent != nil { @@ -68,7 +68,7 @@ func onRPCMessageThrottled(message webrtc.DataChannelMessage, session *Session) if isMouseEvent(request.Method) { // Start the mouse worker if not already started mouseWorkerOnce.Do(startMouseWorker) - + // Send to mouse worker (non-blocking) select { case mouseEventChan <- mouseEventData{message: message, session: session}: diff --git a/main.go b/main.go index f2d327a..b610757 100644 --- a/main.go +++ b/main.go @@ -155,7 +155,7 @@ func Main() { signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) <-sigs logger.Info().Msg("JetKVM Shutting Down") - + // Stop non-blocking audio manager audio.StopNonBlockingAudioStreaming() //if fuseServer != nil { diff --git a/native_notlinux.go b/native_notlinux.go index baadf34..b8dbd11 100644 --- a/native_notlinux.go +++ b/native_notlinux.go @@ -13,4 +13,4 @@ func startNativeBinary(binaryPath string) (*exec.Cmd, error) { func ExtractAndRunNativeBin() error { return fmt.Errorf("ExtractAndRunNativeBin is only supported on Linux") -} \ No newline at end of file +} diff --git a/native_shared.go b/native_shared.go index f7784f0..202348b 100644 --- a/native_shared.go +++ b/native_shared.go @@ -8,6 +8,7 @@ import ( "io" "net" "os" + "runtime" "strings" "sync" "time" @@ -165,6 +166,10 @@ func StartNativeVideoSocketServer() { } func handleCtrlClient(conn net.Conn) { + // Lock to OS thread to isolate blocking socket I/O + runtime.LockOSThread() + defer runtime.UnlockOSThread() + defer conn.Close() scopedLogger := nativeLogger.With(). @@ -172,7 +177,7 @@ func handleCtrlClient(conn net.Conn) { Str("type", "ctrl"). Logger() - scopedLogger.Info().Msg("native ctrl socket client connected") + scopedLogger.Info().Msg("native ctrl socket client connected (OS thread locked)") if ctrlSocketConn != nil { scopedLogger.Debug().Msg("closing existing native socket connection") ctrlSocketConn.Close() @@ -216,6 +221,10 @@ func handleCtrlClient(conn net.Conn) { } func handleVideoClient(conn net.Conn) { + // Lock to OS thread to isolate blocking video I/O + runtime.LockOSThread() + defer runtime.UnlockOSThread() + defer conn.Close() scopedLogger := nativeLogger.With(). @@ -223,7 +232,7 @@ func handleVideoClient(conn net.Conn) { Str("type", "video"). Logger() - scopedLogger.Info().Msg("native video socket client connected") + scopedLogger.Info().Msg("native video socket client connected (OS thread locked)") inboundPacket := make([]byte, maxVideoFrameSize) lastFrame := time.Now() @@ -277,6 +286,10 @@ func GetNativeVersion() (string, error) { } func ensureBinaryUpdated(destPath string) error { + // Lock to OS thread for file I/O operations + runtime.LockOSThread() + defer runtime.UnlockOSThread() + srcFile, err := resource.ResourceFS.Open("jetkvm_native") if err != nil { return err diff --git a/serial.go b/serial.go index 5439d13..91e1369 100644 --- a/serial.go +++ b/serial.go @@ -3,6 +3,7 @@ package kvm import ( "bufio" "io" + "runtime" "strconv" "strings" "time" @@ -141,6 +142,10 @@ func unmountDCControl() error { var dcState DCPowerState func runDCControl() { + // Lock to OS thread to isolate DC control serial I/O + runtime.LockOSThread() + defer runtime.UnlockOSThread() + scopedLogger := serialLogger.With().Str("service", "dc_control").Logger() reader := bufio.NewReader(port) hasRestoreFeature := false @@ -290,6 +295,10 @@ func handleSerialChannel(d *webrtc.DataChannel) { d.OnOpen(func() { go func() { + // Lock to OS thread to isolate serial I/O + runtime.LockOSThread() + defer runtime.UnlockOSThread() + buf := make([]byte, 1024) for { n, err := port.Read(buf) diff --git a/terminal.go b/terminal.go index e06e5cd..24622df 100644 --- a/terminal.go +++ b/terminal.go @@ -6,6 +6,7 @@ import ( "io" "os" "os/exec" + "runtime" "github.com/creack/pty" "github.com/pion/webrtc/v4" @@ -33,6 +34,10 @@ func handleTerminalChannel(d *webrtc.DataChannel) { } go func() { + // Lock to OS thread to isolate PTY I/O + runtime.LockOSThread() + defer runtime.UnlockOSThread() + buf := make([]byte, 1024) for { n, err := ptmx.Read(buf) diff --git a/ui/src/components/ActionBar.tsx b/ui/src/components/ActionBar.tsx index d2fd1ea..a3edc5e 100644 --- a/ui/src/components/ActionBar.tsx +++ b/ui/src/components/ActionBar.tsx @@ -37,6 +37,10 @@ interface MicrophoneHookReturn { stopMicrophone: () => Promise<{ success: boolean; error?: MicrophoneError }>; toggleMicrophoneMute: () => Promise<{ success: boolean; error?: MicrophoneError }>; syncMicrophoneState: () => Promise; + // Loading states + isStarting: boolean; + isStopping: boolean; + isToggling: boolean; } export default function Actionbar({ diff --git a/ui/src/components/WebRTCVideo.tsx b/ui/src/components/WebRTCVideo.tsx index 0c83065..0c7b237 100644 --- a/ui/src/components/WebRTCVideo.tsx +++ b/ui/src/components/WebRTCVideo.tsx @@ -40,6 +40,10 @@ interface MicrophoneHookReturn { stopMicrophone: () => Promise<{ success: boolean; error?: MicrophoneError }>; toggleMicrophoneMute: () => Promise<{ success: boolean; error?: MicrophoneError }>; syncMicrophoneState: () => Promise; + // Loading states + isStarting: boolean; + isStopping: boolean; + isToggling: boolean; } interface WebRTCVideoProps { diff --git a/ui/src/components/popovers/AudioControlPopover.tsx b/ui/src/components/popovers/AudioControlPopover.tsx index b8bcdca..a55b57c 100644 --- a/ui/src/components/popovers/AudioControlPopover.tsx +++ b/ui/src/components/popovers/AudioControlPopover.tsx @@ -26,6 +26,10 @@ interface MicrophoneHookReturn { stopMicrophone: () => Promise<{ success: boolean; error?: MicrophoneError }>; toggleMicrophoneMute: () => Promise<{ success: boolean; error?: MicrophoneError }>; syncMicrophoneState: () => Promise; + // Loading states + isStarting: boolean; + isStopping: boolean; + isToggling: boolean; } interface AudioConfig { @@ -76,6 +80,10 @@ export default function AudioControlPopover({ microphone }: AudioControlPopoverP const [isLoading, setIsLoading] = useState(false); const [isConnected, setIsConnected] = useState(false); + // Add cooldown to prevent rapid clicking + const [lastClickTime, setLastClickTime] = useState(0); + const CLICK_COOLDOWN = 500; // 500ms cooldown between clicks + // Microphone state from props const { isMicrophoneActive, @@ -85,9 +93,12 @@ export default function AudioControlPopover({ microphone }: AudioControlPopoverP stopMicrophone, toggleMicrophoneMute, syncMicrophoneState, + // Loading states + isStarting, + isStopping, + isToggling, } = microphone; const [microphoneMetrics, setMicrophoneMetrics] = useState(null); - const [isMicrophoneLoading, setIsMicrophoneLoading] = useState(false); // Audio level monitoring const { audioLevel, isAnalyzing } = useAudioLevel(microphoneStream); @@ -210,7 +221,6 @@ export default function AudioControlPopover({ microphone }: AudioControlPopoverP }; const handleMicrophoneQualityChange = async (quality: number) => { - setIsMicrophoneLoading(true); try { const resp = await api.POST("/microphone/quality", { quality }); if (resp.ok) { @@ -219,13 +229,20 @@ export default function AudioControlPopover({ microphone }: AudioControlPopoverP } } catch (error) { console.error("Failed to change microphone quality:", error); - } finally { - setIsMicrophoneLoading(false); } }; const handleToggleMicrophone = async () => { - setIsMicrophoneLoading(true); + const now = Date.now(); + + // Prevent rapid clicking - if any operation is in progress or within cooldown, ignore the click + if (isStarting || isStopping || isToggling || (now - lastClickTime < CLICK_COOLDOWN)) { + console.log("Microphone operation already in progress or within cooldown, ignoring click"); + return; + } + + setLastClickTime(now); + try { const result = isMicrophoneActive ? await stopMicrophone() : await startMicrophone(selectedInputDevice); if (!result.success && result.error) { @@ -234,13 +251,20 @@ export default function AudioControlPopover({ microphone }: AudioControlPopoverP } catch (error) { console.error("Failed to toggle microphone:", error); notifications.error("An unexpected error occurred"); - } finally { - setIsMicrophoneLoading(false); } }; const handleToggleMicrophoneMute = async () => { - setIsMicrophoneLoading(true); + const now = Date.now(); + + // Prevent rapid clicking - if any operation is in progress or within cooldown, ignore the click + if (isStarting || isStopping || isToggling || (now - lastClickTime < CLICK_COOLDOWN)) { + console.log("Microphone operation already in progress or within cooldown, ignoring mute toggle"); + return; + } + + setLastClickTime(now); + try { const result = await toggleMicrophoneMute(); if (!result.success && result.error) { @@ -249,8 +273,6 @@ export default function AudioControlPopover({ microphone }: AudioControlPopoverP } catch (error) { console.error("Failed to toggle microphone mute:", error); notifications.error("Failed to toggle microphone mute"); - } finally { - setIsMicrophoneLoading(false); } }; @@ -260,7 +282,6 @@ export default function AudioControlPopover({ microphone }: AudioControlPopoverP // If microphone is currently active, restart it with the new device if (isMicrophoneActive) { - setIsMicrophoneLoading(true); try { // Stop current microphone await stopMicrophone(); @@ -269,8 +290,9 @@ export default function AudioControlPopover({ microphone }: AudioControlPopoverP if (!result.success && result.error) { notifications.error(result.error.message); } - } finally { - setIsMicrophoneLoading(false); + } catch (error) { + console.error("Failed to change microphone device:", error); + notifications.error("Failed to change microphone device"); } } }; @@ -377,17 +399,26 @@ export default function AudioControlPopover({ microphone }: AudioControlPopoverP