Improvements and fixes: reduce mouse lag when audio is enabled

This commit is contained in:
Alex P 2025-08-04 23:25:24 +03:00
parent 3dc196bab5
commit 3444607021
19 changed files with 421 additions and 123 deletions

View File

@ -1,4 +1,7 @@
version: "2"
run:
build-tags:
- nolint
linters:
enable:
- forbidigo

View File

@ -454,7 +454,7 @@ func handleSessionRequest(
// Check if we have an existing session and handle renegotiation
if currentSession != nil {
scopedLogger.Info().Msg("handling renegotiation for existing session")
// Handle renegotiation with existing session
sd, err = currentSession.ExchangeOffer(req.Sd)
if err != nil {

View File

@ -1,3 +1,5 @@
//go:build !nolint
package audio
import (
@ -54,7 +56,7 @@ int jetkvm_audio_read_encode(void *opus_buf) {
short pcm_buffer[1920]; // max 2ch*960
unsigned char *out = (unsigned char*)opus_buf;
int pcm_rc = snd_pcm_readi(pcm_handle, pcm_buffer, frame_size);
// Handle ALSA errors with recovery
if (pcm_rc < 0) {
if (pcm_rc == -EPIPE) {
@ -70,12 +72,12 @@ int jetkvm_audio_read_encode(void *opus_buf) {
return -1;
}
}
// If we got fewer frames than expected, pad with silence
if (pcm_rc < frame_size) {
memset(&pcm_buffer[pcm_rc * channels], 0, (frame_size - pcm_rc) * channels * sizeof(short));
}
int nb_bytes = opus_encode(encoder, pcm_buffer, frame_size, out, max_packet_size);
return nb_bytes;
}
@ -85,7 +87,7 @@ int jetkvm_audio_playback_init() {
int err;
snd_pcm_hw_params_t *params;
if (pcm_playback_handle) return 0;
// Try to open the USB gadget audio device for playback
// This should correspond to the capture endpoint of the USB gadget
if (snd_pcm_open(&pcm_playback_handle, "hw:1,0", SND_PCM_STREAM_PLAYBACK, 0) < 0) {
@ -93,7 +95,7 @@ int jetkvm_audio_playback_init() {
if (snd_pcm_open(&pcm_playback_handle, "default", SND_PCM_STREAM_PLAYBACK, 0) < 0)
return -1;
}
snd_pcm_hw_params_malloc(&params);
snd_pcm_hw_params_any(pcm_playback_handle, params);
snd_pcm_hw_params_set_access(pcm_playback_handle, params, SND_PCM_ACCESS_RW_INTERLEAVED);
@ -104,11 +106,11 @@ int jetkvm_audio_playback_init() {
snd_pcm_hw_params(pcm_playback_handle, params);
snd_pcm_hw_params_free(params);
snd_pcm_prepare(pcm_playback_handle);
// Initialize Opus decoder
decoder = opus_decoder_create(sample_rate, channels, &err);
if (!decoder) return -2;
return 0;
}
@ -116,11 +118,11 @@ int jetkvm_audio_playback_init() {
int jetkvm_audio_decode_write(void *opus_buf, int opus_size) {
short pcm_buffer[1920]; // max 2ch*960
unsigned char *in = (unsigned char*)opus_buf;
// Decode Opus to PCM
int pcm_frames = opus_decode(decoder, in, opus_size, pcm_buffer, frame_size, 0);
if (pcm_frames < 0) return -1;
// Write PCM to playback device
int pcm_rc = snd_pcm_writei(pcm_playback_handle, pcm_buffer, pcm_frames);
if (pcm_rc < 0) {
@ -131,7 +133,7 @@ int jetkvm_audio_decode_write(void *opus_buf, int opus_size) {
}
if (pcm_rc < 0) return -2;
}
return pcm_frames;
}
@ -148,8 +150,6 @@ void jetkvm_audio_close() {
*/
import "C"
// Go wrappers for initializing, starting, stopping, and controlling audio
func cgoAudioInit() error {
ret := C.jetkvm_audio_init()
@ -179,8 +179,6 @@ func cgoAudioReadEncode(buf []byte) (int, error) {
return int(n), nil
}
// Go wrappers for audio playback (microphone input)
func cgoAudioPlaybackInit() error {
ret := C.jetkvm_audio_playback_init()
@ -206,8 +204,6 @@ func cgoAudioDecodeWrite(buf []byte) (int, error) {
return int(n), nil
}
// Wrapper functions for non-blocking audio manager
func CGOAudioInit() error {
return cgoAudioInit()

View File

@ -28,4 +28,30 @@ func cgoAudioPlaybackClose() {
func cgoAudioDecodeWrite(buf []byte) (int, error) {
return 0, errors.New("audio not available in lint mode")
}
}
// Uppercase wrapper functions (called by nonblocking_audio.go)
// CGOAudioInit initializes the audio capture pipeline by delegating to the
// build-specific cgoAudioInit implementation.
func CGOAudioInit() error {
	return cgoAudioInit()
}
// CGOAudioClose shuts down the audio capture pipeline by delegating to the
// build-specific cgoAudioClose implementation.
func CGOAudioClose() {
	cgoAudioClose()
}
// CGOAudioReadEncode reads and encodes one audio frame into buf, delegating
// to the build-specific cgoAudioReadEncode implementation. It returns the
// number of bytes written to buf.
func CGOAudioReadEncode(buf []byte) (int, error) {
	return cgoAudioReadEncode(buf)
}
// CGOAudioPlaybackInit initializes the audio playback pipeline by delegating
// to the build-specific cgoAudioPlaybackInit implementation.
func CGOAudioPlaybackInit() error {
	return cgoAudioPlaybackInit()
}
// CGOAudioPlaybackClose shuts down the audio playback pipeline by delegating
// to the build-specific cgoAudioPlaybackClose implementation.
func CGOAudioPlaybackClose() {
	cgoAudioPlaybackClose()
}
// CGOAudioDecodeWrite decodes the audio data in buf and writes it to the
// playback device, delegating to the build-specific cgoAudioDecodeWrite
// implementation. It returns the number of frames written.
func CGOAudioDecodeWrite(buf []byte) (int, error) {
	return cgoAudioDecodeWrite(buf)
}

View File

@ -11,7 +11,7 @@ import (
// AudioInputMetrics holds metrics for microphone input
// Note: int64 fields must be 64-bit aligned for atomic operations on ARM
type AudioInputMetrics struct {
FramesSent int64 // Must be first for alignment
FramesSent int64 // Must be first for alignment
FramesDropped int64
BytesProcessed int64
ConnectionDrops int64
@ -22,8 +22,8 @@ type AudioInputMetrics struct {
// AudioInputManager manages microphone input stream from WebRTC to USB gadget
type AudioInputManager struct {
// metrics MUST be first for ARM32 alignment (contains int64 fields)
metrics AudioInputMetrics
metrics AudioInputMetrics
inputBuffer chan []byte
logger zerolog.Logger
running int32
@ -44,7 +44,7 @@ func (aim *AudioInputManager) Start() error {
}
aim.logger.Info().Msg("Starting audio input manager")
// Start the non-blocking audio input stream
err := StartNonBlockingAudioInput(aim.inputBuffer)
if err != nil {
@ -62,11 +62,11 @@ func (aim *AudioInputManager) Stop() {
}
aim.logger.Info().Msg("Stopping audio input manager")
// Stop the non-blocking audio input stream
// Note: This is handled by the global non-blocking audio manager
// Individual input streams are managed centrally
// Drain the input buffer
go func() {
for {
@ -115,4 +115,4 @@ func (aim *AudioInputManager) GetMetrics() AudioInputMetrics {
// IsRunning returns whether the audio input manager is running
// IsRunning reports whether the audio input manager has been started.
func (aim *AudioInputManager) IsRunning() bool {
	state := atomic.LoadInt32(&aim.running)
	return state == 1
}
}

View File

@ -14,11 +14,14 @@ func StartNonBlockingAudioStreaming(send func([]byte)) error {
managerMutex.Lock()
defer managerMutex.Unlock()
if globalNonBlockingManager != nil && globalNonBlockingManager.IsRunning() {
return ErrAudioAlreadyRunning
if globalNonBlockingManager != nil && globalNonBlockingManager.IsOutputRunning() {
return nil // Already running, this is not an error
}
if globalNonBlockingManager == nil {
globalNonBlockingManager = NewNonBlockingAudioManager()
}
globalNonBlockingManager = NewNonBlockingAudioManager()
return globalNonBlockingManager.StartAudioOutput(send)
}
@ -31,6 +34,11 @@ func StartNonBlockingAudioInput(receiveChan <-chan []byte) error {
globalNonBlockingManager = NewNonBlockingAudioManager()
}
// Check if input is already running to avoid unnecessary operations
if globalNonBlockingManager.IsInputRunning() {
return nil // Already running, this is not an error
}
return globalNonBlockingManager.StartAudioInput(receiveChan)
}
@ -45,6 +53,16 @@ func StopNonBlockingAudioStreaming() {
}
}
// StopNonBlockingAudioInput stops only the audio input without affecting output
// StopNonBlockingAudioInput stops only the audio input path, leaving any
// running audio output untouched. It is a no-op when no manager exists or
// input is not running.
func StopNonBlockingAudioInput() {
	managerMutex.Lock()
	defer managerMutex.Unlock()

	mgr := globalNonBlockingManager
	if mgr == nil {
		return
	}
	if mgr.IsInputRunning() {
		mgr.StopAudioInput()
	}
}
// GetNonBlockingAudioStats returns statistics from the non-blocking audio system
func GetNonBlockingAudioStats() NonBlockingAudioStats {
managerMutex.Lock()
@ -62,4 +80,12 @@ func IsNonBlockingAudioRunning() bool {
defer managerMutex.Unlock()
return globalNonBlockingManager != nil && globalNonBlockingManager.IsRunning()
}
}
// IsNonBlockingAudioInputRunning returns true if the non-blocking audio input is running
// IsNonBlockingAudioInputRunning reports whether the global non-blocking
// audio manager exists and its input path is currently active.
func IsNonBlockingAudioInputRunning() bool {
	managerMutex.Lock()
	defer managerMutex.Unlock()

	mgr := globalNonBlockingManager
	return mgr != nil && mgr.IsInputRunning()
}

View File

@ -23,14 +23,14 @@ type NonBlockingAudioManager struct {
logger *zerolog.Logger
// Audio output (capture from device, send to WebRTC)
outputSendFunc func([]byte)
outputWorkChan chan audioWorkItem
outputSendFunc func([]byte)
outputWorkChan chan audioWorkItem
outputResultChan chan audioResult
// Audio input (receive from WebRTC, playback to device)
// Audio input (receive from WebRTC, playback to device)
inputReceiveChan <-chan []byte
inputWorkChan chan audioWorkItem
inputResultChan chan audioResult
inputWorkChan chan audioWorkItem
inputResultChan chan audioResult
// Worker threads and flags - int32 fields grouped together
outputRunning int32
@ -69,7 +69,7 @@ type NonBlockingAudioStats struct {
InputFramesDropped int64
WorkerErrors int64
// time.Time is int64 internally, so it's also aligned
LastProcessTime time.Time
LastProcessTime time.Time
}
// NewNonBlockingAudioManager creates a new non-blocking audio manager
@ -81,8 +81,8 @@ func NewNonBlockingAudioManager() *NonBlockingAudioManager {
ctx: ctx,
cancel: cancel,
logger: &logger,
outputWorkChan: make(chan audioWorkItem, 10), // Buffer for work items
outputResultChan: make(chan audioResult, 10), // Buffer for results
outputWorkChan: make(chan audioWorkItem, 10), // Buffer for work items
outputResultChan: make(chan audioResult, 10), // Buffer for results
inputWorkChan: make(chan audioWorkItem, 10),
inputResultChan: make(chan audioResult, 10),
}
@ -327,7 +327,7 @@ func (nam *NonBlockingAudioManager) inputCoordinatorThread() {
return
case frame := <-nam.inputReceiveChan:
if frame == nil || len(frame) == 0 {
if len(frame) == 0 {
continue
}
@ -397,6 +397,16 @@ func (nam *NonBlockingAudioManager) Stop() {
nam.logger.Info().Msg("non-blocking audio manager stopped")
}
// StopAudioInput stops only the audio input operations
// StopAudioInput stops only the audio input operations, leaving the output
// side untouched.
func (nam *NonBlockingAudioManager) StopAudioInput() {
	nam.logger.Info().Msg("stopping audio input")

	// Clearing the flag signals the input coordinator to stop; output keeps
	// running because outputRunning is left untouched.
	atomic.StoreInt32(&nam.inputRunning, 0)

	nam.logger.Info().Msg("audio input stopped")
}
// GetStats returns current statistics
func (nam *NonBlockingAudioManager) GetStats() NonBlockingAudioStats {
return NonBlockingAudioStats{
@ -412,4 +422,14 @@ func (nam *NonBlockingAudioManager) GetStats() NonBlockingAudioStats {
// IsRunning returns true if any audio operations are running
// IsRunning reports whether any audio operations (output or input) are active.
func (nam *NonBlockingAudioManager) IsRunning() bool {
	outputActive := atomic.LoadInt32(&nam.outputRunning) == 1
	inputActive := atomic.LoadInt32(&nam.inputRunning) == 1
	return outputActive || inputActive
}
}
// IsInputRunning returns true if audio input is running
// IsInputRunning reports whether the audio input path is currently active.
func (nam *NonBlockingAudioManager) IsInputRunning() bool {
	state := atomic.LoadInt32(&nam.inputRunning)
	return state == 1
}
// IsOutputRunning returns true if audio output is running
// IsOutputRunning reports whether the audio output path is currently active.
func (nam *NonBlockingAudioManager) IsOutputRunning() bool {
	state := atomic.LoadInt32(&nam.outputRunning)
	return state == 1
}

View File

@ -21,8 +21,8 @@ import (
// Mouse event processing with single worker
var (
mouseEventChan = make(chan mouseEventData, 100) // Buffered channel for mouse events
mouseWorkerOnce sync.Once
mouseEventChan = make(chan mouseEventData, 100) // Buffered channel for mouse events
mouseWorkerOnce sync.Once
)
type mouseEventData struct {
@ -35,15 +35,15 @@ func startMouseWorker() {
go func() {
ticker := time.NewTicker(16 * time.Millisecond) // ~60 FPS
defer ticker.Stop()
var latestMouseEvent *mouseEventData
for {
select {
case event := <-mouseEventChan:
// Always keep the latest mouse event
latestMouseEvent = &event
case <-ticker.C:
// Process the latest mouse event at regular intervals
if latestMouseEvent != nil {
@ -68,7 +68,7 @@ func onRPCMessageThrottled(message webrtc.DataChannelMessage, session *Session)
if isMouseEvent(request.Method) {
// Start the mouse worker if not already started
mouseWorkerOnce.Do(startMouseWorker)
// Send to mouse worker (non-blocking)
select {
case mouseEventChan <- mouseEventData{message: message, session: session}:

View File

@ -155,7 +155,7 @@ func Main() {
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
<-sigs
logger.Info().Msg("JetKVM Shutting Down")
// Stop non-blocking audio manager
audio.StopNonBlockingAudioStreaming()
//if fuseServer != nil {

View File

@ -13,4 +13,4 @@ func startNativeBinary(binaryPath string) (*exec.Cmd, error) {
// ExtractAndRunNativeBin is the non-Linux stub: the native binary can only be
// extracted and executed on Linux, so this always returns an error.
func ExtractAndRunNativeBin() error {
	const msg = "ExtractAndRunNativeBin is only supported on Linux"
	return fmt.Errorf(msg)
}
}

View File

@ -8,6 +8,7 @@ import (
"io"
"net"
"os"
"runtime"
"strings"
"sync"
"time"
@ -165,6 +166,10 @@ func StartNativeVideoSocketServer() {
}
func handleCtrlClient(conn net.Conn) {
// Lock to OS thread to isolate blocking socket I/O
runtime.LockOSThread()
defer runtime.UnlockOSThread()
defer conn.Close()
scopedLogger := nativeLogger.With().
@ -172,7 +177,7 @@ func handleCtrlClient(conn net.Conn) {
Str("type", "ctrl").
Logger()
scopedLogger.Info().Msg("native ctrl socket client connected")
scopedLogger.Info().Msg("native ctrl socket client connected (OS thread locked)")
if ctrlSocketConn != nil {
scopedLogger.Debug().Msg("closing existing native socket connection")
ctrlSocketConn.Close()
@ -216,6 +221,10 @@ func handleCtrlClient(conn net.Conn) {
}
func handleVideoClient(conn net.Conn) {
// Lock to OS thread to isolate blocking video I/O
runtime.LockOSThread()
defer runtime.UnlockOSThread()
defer conn.Close()
scopedLogger := nativeLogger.With().
@ -223,7 +232,7 @@ func handleVideoClient(conn net.Conn) {
Str("type", "video").
Logger()
scopedLogger.Info().Msg("native video socket client connected")
scopedLogger.Info().Msg("native video socket client connected (OS thread locked)")
inboundPacket := make([]byte, maxVideoFrameSize)
lastFrame := time.Now()
@ -277,6 +286,10 @@ func GetNativeVersion() (string, error) {
}
func ensureBinaryUpdated(destPath string) error {
// Lock to OS thread for file I/O operations
runtime.LockOSThread()
defer runtime.UnlockOSThread()
srcFile, err := resource.ResourceFS.Open("jetkvm_native")
if err != nil {
return err

View File

@ -3,6 +3,7 @@ package kvm
import (
"bufio"
"io"
"runtime"
"strconv"
"strings"
"time"
@ -141,6 +142,10 @@ func unmountDCControl() error {
var dcState DCPowerState
func runDCControl() {
// Lock to OS thread to isolate DC control serial I/O
runtime.LockOSThread()
defer runtime.UnlockOSThread()
scopedLogger := serialLogger.With().Str("service", "dc_control").Logger()
reader := bufio.NewReader(port)
hasRestoreFeature := false
@ -290,6 +295,10 @@ func handleSerialChannel(d *webrtc.DataChannel) {
d.OnOpen(func() {
go func() {
// Lock to OS thread to isolate serial I/O
runtime.LockOSThread()
defer runtime.UnlockOSThread()
buf := make([]byte, 1024)
for {
n, err := port.Read(buf)

View File

@ -6,6 +6,7 @@ import (
"io"
"os"
"os/exec"
"runtime"
"github.com/creack/pty"
"github.com/pion/webrtc/v4"
@ -33,6 +34,10 @@ func handleTerminalChannel(d *webrtc.DataChannel) {
}
go func() {
// Lock to OS thread to isolate PTY I/O
runtime.LockOSThread()
defer runtime.UnlockOSThread()
buf := make([]byte, 1024)
for {
n, err := ptmx.Read(buf)

View File

@ -37,6 +37,10 @@ interface MicrophoneHookReturn {
stopMicrophone: () => Promise<{ success: boolean; error?: MicrophoneError }>;
toggleMicrophoneMute: () => Promise<{ success: boolean; error?: MicrophoneError }>;
syncMicrophoneState: () => Promise<void>;
// Loading states
isStarting: boolean;
isStopping: boolean;
isToggling: boolean;
}
export default function Actionbar({

View File

@ -40,6 +40,10 @@ interface MicrophoneHookReturn {
stopMicrophone: () => Promise<{ success: boolean; error?: MicrophoneError }>;
toggleMicrophoneMute: () => Promise<{ success: boolean; error?: MicrophoneError }>;
syncMicrophoneState: () => Promise<void>;
// Loading states
isStarting: boolean;
isStopping: boolean;
isToggling: boolean;
}
interface WebRTCVideoProps {

View File

@ -26,6 +26,10 @@ interface MicrophoneHookReturn {
stopMicrophone: () => Promise<{ success: boolean; error?: MicrophoneError }>;
toggleMicrophoneMute: () => Promise<{ success: boolean; error?: MicrophoneError }>;
syncMicrophoneState: () => Promise<void>;
// Loading states
isStarting: boolean;
isStopping: boolean;
isToggling: boolean;
}
interface AudioConfig {
@ -76,6 +80,10 @@ export default function AudioControlPopover({ microphone }: AudioControlPopoverP
const [isLoading, setIsLoading] = useState(false);
const [isConnected, setIsConnected] = useState(false);
// Add cooldown to prevent rapid clicking
const [lastClickTime, setLastClickTime] = useState(0);
const CLICK_COOLDOWN = 500; // 500ms cooldown between clicks
// Microphone state from props
const {
isMicrophoneActive,
@ -85,9 +93,12 @@ export default function AudioControlPopover({ microphone }: AudioControlPopoverP
stopMicrophone,
toggleMicrophoneMute,
syncMicrophoneState,
// Loading states
isStarting,
isStopping,
isToggling,
} = microphone;
const [microphoneMetrics, setMicrophoneMetrics] = useState<MicrophoneMetrics | null>(null);
const [isMicrophoneLoading, setIsMicrophoneLoading] = useState(false);
// Audio level monitoring
const { audioLevel, isAnalyzing } = useAudioLevel(microphoneStream);
@ -210,7 +221,6 @@ export default function AudioControlPopover({ microphone }: AudioControlPopoverP
};
const handleMicrophoneQualityChange = async (quality: number) => {
setIsMicrophoneLoading(true);
try {
const resp = await api.POST("/microphone/quality", { quality });
if (resp.ok) {
@ -219,13 +229,20 @@ export default function AudioControlPopover({ microphone }: AudioControlPopoverP
}
} catch (error) {
console.error("Failed to change microphone quality:", error);
} finally {
setIsMicrophoneLoading(false);
}
};
const handleToggleMicrophone = async () => {
setIsMicrophoneLoading(true);
const now = Date.now();
// Prevent rapid clicking - if any operation is in progress or within cooldown, ignore the click
if (isStarting || isStopping || isToggling || (now - lastClickTime < CLICK_COOLDOWN)) {
console.log("Microphone operation already in progress or within cooldown, ignoring click");
return;
}
setLastClickTime(now);
try {
const result = isMicrophoneActive ? await stopMicrophone() : await startMicrophone(selectedInputDevice);
if (!result.success && result.error) {
@ -234,13 +251,20 @@ export default function AudioControlPopover({ microphone }: AudioControlPopoverP
} catch (error) {
console.error("Failed to toggle microphone:", error);
notifications.error("An unexpected error occurred");
} finally {
setIsMicrophoneLoading(false);
}
};
const handleToggleMicrophoneMute = async () => {
setIsMicrophoneLoading(true);
const now = Date.now();
// Prevent rapid clicking - if any operation is in progress or within cooldown, ignore the click
if (isStarting || isStopping || isToggling || (now - lastClickTime < CLICK_COOLDOWN)) {
console.log("Microphone operation already in progress or within cooldown, ignoring mute toggle");
return;
}
setLastClickTime(now);
try {
const result = await toggleMicrophoneMute();
if (!result.success && result.error) {
@ -249,8 +273,6 @@ export default function AudioControlPopover({ microphone }: AudioControlPopoverP
} catch (error) {
console.error("Failed to toggle microphone mute:", error);
notifications.error("Failed to toggle microphone mute");
} finally {
setIsMicrophoneLoading(false);
}
};
@ -260,7 +282,6 @@ export default function AudioControlPopover({ microphone }: AudioControlPopoverP
// If microphone is currently active, restart it with the new device
if (isMicrophoneActive) {
setIsMicrophoneLoading(true);
try {
// Stop current microphone
await stopMicrophone();
@ -269,8 +290,9 @@ export default function AudioControlPopover({ microphone }: AudioControlPopoverP
if (!result.success && result.error) {
notifications.error(result.error.message);
}
} finally {
setIsMicrophoneLoading(false);
} catch (error) {
console.error("Failed to change microphone device:", error);
notifications.error("Failed to change microphone device");
}
}
};
@ -377,17 +399,26 @@ export default function AudioControlPopover({ microphone }: AudioControlPopoverP
<Button
size="SM"
theme={isMicrophoneActive ? "danger" : "primary"}
text={isMicrophoneActive ? "Stop" : "Start"}
text={
isStarting ? "Starting..." :
isStopping ? "Stopping..." :
isMicrophoneActive ? "Stop" : "Start"
}
onClick={handleToggleMicrophone}
disabled={isMicrophoneLoading}
disabled={isStarting || isStopping || isToggling}
loading={isStarting || isStopping}
/>
{isMicrophoneActive && (
<Button
size="SM"
theme={isMicrophoneMuted ? "danger" : "light"}
text={isMicrophoneMuted ? "Unmute" : "Mute"}
text={
isToggling ? (isMicrophoneMuted ? "Unmuting..." : "Muting...") :
isMicrophoneMuted ? "Unmute" : "Mute"
}
onClick={handleToggleMicrophoneMute}
disabled={isMicrophoneLoading}
disabled={isStarting || isStopping || isToggling}
loading={isToggling}
/>
)}
</div>
@ -517,13 +548,13 @@ export default function AudioControlPopover({ microphone }: AudioControlPopoverP
<button
key={`mic-${quality}`}
onClick={() => handleMicrophoneQualityChange(parseInt(quality))}
disabled={isMicrophoneLoading}
disabled={isStarting || isStopping || isToggling}
className={cx(
"rounded-md border px-3 py-2 text-sm font-medium transition-colors",
currentMicrophoneConfig?.Quality === parseInt(quality)
? "border-green-500 bg-green-50 text-green-700 dark:bg-green-900/20 dark:text-green-300"
: "border-slate-200 bg-white text-slate-700 hover:bg-slate-50 dark:border-slate-600 dark:bg-slate-700 dark:text-slate-300 dark:hover:bg-slate-600",
isMicrophoneLoading && "opacity-50 cursor-not-allowed"
(isStarting || isStopping || isToggling) && "opacity-50 cursor-not-allowed"
)}
>
{label}

View File

@ -1,4 +1,4 @@
import { useCallback, useEffect, useRef } from "react";
import { useCallback, useEffect, useRef, useState } from "react";
import { useRTCStore } from "@/hooks/stores";
import api from "@/api";
@ -22,6 +22,11 @@ export function useMicrophone() {
} = useRTCStore();
const microphoneStreamRef = useRef<MediaStream | null>(null);
// Loading states
const [isStarting, setIsStarting] = useState(false);
const [isStopping, setIsStopping] = useState(false);
const [isToggling, setIsToggling] = useState(false);
// Cleanup function to stop microphone stream
const stopMicrophoneStream = useCallback(async () => {
@ -110,7 +115,7 @@ export function useMicrophone() {
const syncMicrophoneState = useCallback(async () => {
// Debounce sync calls to prevent race conditions
const now = Date.now();
if (now - lastSyncRef.current < 500) {
if (now - lastSyncRef.current < 1000) { // Increased debounce time
console.log("Skipping sync - too frequent");
return;
}
@ -128,16 +133,24 @@ export function useMicrophone() {
const data = await response.json();
const backendRunning = data.running;
// If backend state differs from frontend state, sync them
// Only sync if there's a significant state difference and we're not in a transition
if (backendRunning !== isMicrophoneActive) {
console.info(`Syncing microphone state: backend=${backendRunning}, frontend=${isMicrophoneActive}`);
setMicrophoneActive(backendRunning);
// Only clean up stream if backend is definitely not running AND we have a stream
// Use ref to get current stream state, not stale closure value
if (!backendRunning && microphoneStreamRef.current) {
console.log("Backend not running, cleaning up stream");
await stopMicrophoneStream();
// If backend is running but frontend thinks it's not, just update frontend state
if (backendRunning && !isMicrophoneActive) {
console.log("Backend running, updating frontend state to active");
setMicrophoneActive(true);
}
// If backend is not running but frontend thinks it is, clean up and update state
else if (!backendRunning && isMicrophoneActive) {
console.log("Backend not running, cleaning up frontend state");
setMicrophoneActive(false);
// Only clean up stream if we actually have one
if (microphoneStreamRef.current) {
console.log("Cleaning up orphaned stream");
await stopMicrophoneStream();
}
}
}
}
@ -148,6 +161,13 @@ export function useMicrophone() {
// Start microphone stream
const startMicrophone = useCallback(async (deviceId?: string): Promise<{ success: boolean; error?: MicrophoneError }> => {
// Prevent multiple simultaneous start operations
if (isStarting || isStopping || isToggling) {
console.log("Microphone operation already in progress, skipping start");
return { success: false, error: { type: 'unknown', message: 'Operation already in progress' } };
}
setIsStarting(true);
try {
// Set flag to prevent sync during startup
isStartingRef.current = true;
@ -300,41 +320,73 @@ export function useMicrophone() {
// Notify backend that microphone is started
console.log("Notifying backend about microphone start...");
try {
const backendResp = await api.POST("/microphone/start", {});
console.log("Backend response status:", backendResp.status, "ok:", backendResp.ok);
if (!backendResp.ok) {
console.error("Backend microphone start failed with status:", backendResp.status);
// If backend fails, cleanup the stream
await stopMicrophoneStream();
isStartingRef.current = false;
return {
success: false,
error: {
type: 'network',
message: 'Failed to start microphone on backend'
// Retry logic for backend failures
let backendSuccess = false;
let lastError: any = null;
for (let attempt = 1; attempt <= 3; attempt++) {
try {
// If this is a retry, first try to stop the backend microphone to reset state
if (attempt > 1) {
console.log(`Backend start attempt ${attempt}, first trying to reset backend state...`);
try {
await api.POST("/microphone/stop", {});
// Wait a bit for the backend to reset
await new Promise(resolve => setTimeout(resolve, 200));
} catch (resetError) {
console.warn("Failed to reset backend state:", resetError);
}
}
const backendResp = await api.POST("/microphone/start", {});
console.log(`Backend response status (attempt ${attempt}):`, backendResp.status, "ok:", backendResp.ok);
if (!backendResp.ok) {
lastError = `Backend returned status ${backendResp.status}`;
console.error(`Backend microphone start failed with status: ${backendResp.status} (attempt ${attempt})`);
// For 500 errors, try again after a short delay
if (backendResp.status === 500 && attempt < 3) {
console.log(`Retrying backend start in 500ms (attempt ${attempt + 1}/3)...`);
await new Promise(resolve => setTimeout(resolve, 500));
continue;
}
} else {
// Success!
const responseData = await backendResp.json();
console.log("Backend response data:", responseData);
if (responseData.status === "already running") {
console.info("Backend microphone was already running");
}
console.log("Backend microphone start successful");
backendSuccess = true;
break;
}
} catch (error) {
lastError = error;
console.error(`Backend microphone start threw error (attempt ${attempt}):`, error);
// For network errors, try again after a short delay
if (attempt < 3) {
console.log(`Retrying backend start in 500ms (attempt ${attempt + 1}/3)...`);
await new Promise(resolve => setTimeout(resolve, 500));
continue;
}
};
}
// Check the response to see if it was already running
const responseData = await backendResp.json();
console.log("Backend response data:", responseData);
if (responseData.status === "already running") {
console.info("Backend microphone was already running");
}
console.log("Backend microphone start successful");
} catch (error) {
console.error("Backend microphone start threw error:", error);
// If backend fails, cleanup the stream
}
// If all backend attempts failed, cleanup and return error
if (!backendSuccess) {
console.error("All backend start attempts failed, cleaning up stream");
await stopMicrophoneStream();
isStartingRef.current = false;
setIsStarting(false);
return {
success: false,
error: {
type: 'network',
message: 'Failed to communicate with backend'
message: `Failed to start microphone on backend after 3 attempts. Last error: ${lastError}`
}
};
}
@ -364,6 +416,7 @@ export function useMicrophone() {
// Clear the starting flag
isStartingRef.current = false;
setIsStarting(false);
return { success: true };
} catch (error) {
console.error("Failed to start microphone:", error);
@ -395,28 +448,58 @@ export function useMicrophone() {
// Clear the starting flag on error
isStartingRef.current = false;
setIsStarting(false);
return { success: false, error: micError };
}
}, [peerConnection, setMicrophoneStream, setMicrophoneSender, setMicrophoneActive, setMicrophoneMuted, stopMicrophoneStream, isMicrophoneActive, isMicrophoneMuted, microphoneStream]);
}, [peerConnection, setMicrophoneStream, setMicrophoneSender, setMicrophoneActive, setMicrophoneMuted, stopMicrophoneStream, isMicrophoneActive, isMicrophoneMuted, microphoneStream, isStarting, isStopping, isToggling]);
// Reset backend microphone state
const resetBackendMicrophoneState = useCallback(async (): Promise<boolean> => {
try {
console.log("Resetting backend microphone state...");
await api.POST("/microphone/stop", {});
// Wait for backend to process the stop
await new Promise(resolve => setTimeout(resolve, 300));
return true;
} catch (error) {
console.warn("Failed to reset backend microphone state:", error);
return false;
}
}, []);
// Stop microphone
const stopMicrophone = useCallback(async (): Promise<{ success: boolean; error?: MicrophoneError }> => {
// Prevent multiple simultaneous stop operations
if (isStarting || isStopping || isToggling) {
console.log("Microphone operation already in progress, skipping stop");
return { success: false, error: { type: 'unknown', message: 'Operation already in progress' } };
}
setIsStopping(true);
try {
// First stop the stream
await stopMicrophoneStream();
// Notify backend that microphone is stopped
// Then notify backend that microphone is stopped
try {
await api.POST("/microphone/stop", {});
console.log("Backend notified about microphone stop");
} catch (error) {
console.warn("Failed to notify backend about microphone stop:", error);
}
// Sync state after stopping to ensure consistency
setTimeout(() => syncMicrophoneState(), 100);
// Update frontend state immediately
setMicrophoneActive(false);
setMicrophoneMuted(false);
// Sync state after stopping to ensure consistency (with longer delay)
setTimeout(() => syncMicrophoneState(), 500);
setIsStopping(false);
return { success: true };
} catch (error) {
console.error("Failed to stop microphone:", error);
setIsStopping(false);
return {
success: false,
error: {
@ -425,10 +508,17 @@ export function useMicrophone() {
}
};
}
}, [stopMicrophoneStream, syncMicrophoneState]);
}, [stopMicrophoneStream, syncMicrophoneState, setMicrophoneActive, setMicrophoneMuted, isStarting, isStopping, isToggling]);
// Toggle microphone mute
const toggleMicrophoneMute = useCallback(async (): Promise<{ success: boolean; error?: MicrophoneError }> => {
// Prevent multiple simultaneous toggle operations
if (isStarting || isStopping || isToggling) {
console.log("Microphone operation already in progress, skipping toggle");
return { success: false, error: { type: 'unknown', message: 'Operation already in progress' } };
}
setIsToggling(true);
try {
// Use the ref instead of store value to avoid race conditions
const currentStream = microphoneStreamRef.current || microphoneStream;
@ -461,6 +551,7 @@ export function useMicrophone() {
errorMessage = 'Microphone is not marked as active. Please restart the microphone.';
}
setIsToggling(false);
return {
success: false,
error: {
@ -472,6 +563,7 @@ export function useMicrophone() {
const audioTracks = currentStream.getAudioTracks();
if (audioTracks.length === 0) {
setIsToggling(false);
return {
success: false,
error: {
@ -498,9 +590,11 @@ export function useMicrophone() {
console.warn("Failed to notify backend about microphone mute:", error);
}
setIsToggling(false);
return { success: true };
} catch (error) {
console.error("Failed to toggle microphone mute:", error);
setIsToggling(false);
return {
success: false,
error: {
@ -509,7 +603,7 @@ export function useMicrophone() {
}
};
}
}, [microphoneStream, isMicrophoneActive, isMicrophoneMuted, setMicrophoneMuted]);
}, [microphoneStream, isMicrophoneActive, isMicrophoneMuted, setMicrophoneMuted, isStarting, isStopping, isToggling]);
// Function to check WebRTC audio transmission stats
const checkAudioTransmissionStats = useCallback(async () => {
@ -685,35 +779,53 @@ export function useMicrophone() {
debugMicrophone?: () => unknown;
checkAudioStats?: () => unknown;
testMicrophoneAudio?: () => unknown;
resetBackendMicrophone?: () => unknown;
}).debugMicrophone = debugMicrophoneState;
(window as Window & {
debugMicrophone?: () => unknown;
checkAudioStats?: () => unknown;
testMicrophoneAudio?: () => unknown;
resetBackendMicrophone?: () => unknown;
}).checkAudioStats = checkAudioTransmissionStats;
(window as Window & {
debugMicrophone?: () => unknown;
checkAudioStats?: () => unknown;
testMicrophoneAudio?: () => unknown;
resetBackendMicrophone?: () => unknown;
}).testMicrophoneAudio = testMicrophoneAudio;
(window as Window & {
debugMicrophone?: () => unknown;
checkAudioStats?: () => unknown;
testMicrophoneAudio?: () => unknown;
resetBackendMicrophone?: () => unknown;
}).resetBackendMicrophone = resetBackendMicrophoneState;
return () => {
delete (window as Window & {
debugMicrophone?: () => unknown;
checkAudioStats?: () => unknown;
testMicrophoneAudio?: () => unknown;
resetBackendMicrophone?: () => unknown;
}).debugMicrophone;
delete (window as Window & {
debugMicrophone?: () => unknown;
checkAudioStats?: () => unknown;
testMicrophoneAudio?: () => unknown;
resetBackendMicrophone?: () => unknown;
}).checkAudioStats;
delete (window as Window & {
debugMicrophone?: () => unknown;
checkAudioStats?: () => unknown;
testMicrophoneAudio?: () => unknown;
resetBackendMicrophone?: () => unknown;
}).testMicrophoneAudio;
delete (window as Window & {
debugMicrophone?: () => unknown;
checkAudioStats?: () => unknown;
testMicrophoneAudio?: () => unknown;
resetBackendMicrophone?: () => unknown;
}).resetBackendMicrophone;
};
}, [debugMicrophoneState, checkAudioTransmissionStats, testMicrophoneAudio]);
}, [debugMicrophoneState, checkAudioTransmissionStats, testMicrophoneAudio, resetBackendMicrophoneState]);
// Sync state on mount
useEffect(() => {
@ -745,5 +857,10 @@ export function useMicrophone() {
toggleMicrophoneMute,
syncMicrophoneState,
debugMicrophoneState,
resetBackendMicrophoneState,
// Loading states
isStarting,
isStopping,
isToggling,
};
}

53
web.go
View File

@ -256,13 +256,13 @@ func setupRouter() *gin.Engine {
protected.GET("/microphone/status", func(c *gin.Context) {
sessionActive := currentSession != nil
var running bool
if sessionActive && currentSession.AudioInputManager != nil {
running = currentSession.AudioInputManager.IsRunning()
}
c.JSON(200, gin.H{
"running": running,
"running": running,
"session_active": sessionActive,
})
})
@ -278,14 +278,36 @@ func setupRouter() *gin.Engine {
return
}
// Check if already running before attempting to start
if currentSession.AudioInputManager.IsRunning() || audio.IsNonBlockingAudioInputRunning() {
c.JSON(200, gin.H{
"status": "already running",
"running": true,
})
return
}
err := currentSession.AudioInputManager.Start()
if err != nil {
c.JSON(500, gin.H{"error": err.Error()})
// Log the error for debugging but don't expose internal details
logger.Warn().Err(err).Msg("failed to start microphone")
// Check if it's already running after the failed start attempt
// This handles race conditions where another request started it
if currentSession.AudioInputManager.IsRunning() || audio.IsNonBlockingAudioInputRunning() {
c.JSON(200, gin.H{
"status": "started by concurrent request",
"running": true,
})
return
}
c.JSON(500, gin.H{"error": "failed to start microphone"})
return
}
c.JSON(200, gin.H{
"status": "started",
"status": "started",
"running": currentSession.AudioInputManager.IsRunning(),
})
})
@ -301,9 +323,22 @@ func setupRouter() *gin.Engine {
return
}
// Check if already stopped before attempting to stop
if !currentSession.AudioInputManager.IsRunning() && !audio.IsNonBlockingAudioInputRunning() {
c.JSON(200, gin.H{
"status": "already stopped",
"running": false,
})
return
}
currentSession.AudioInputManager.Stop()
// Also stop the non-blocking audio input specifically
audio.StopNonBlockingAudioInput()
c.JSON(200, gin.H{
"status": "stopped",
"status": "stopped",
"running": currentSession.AudioInputManager.IsRunning(),
})
})
@ -312,12 +347,12 @@ func setupRouter() *gin.Engine {
var req struct {
Muted bool `json:"muted"`
}
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(400, gin.H{"error": "invalid request body"})
return
}
// Note: Microphone muting is typically handled at the frontend level
// This endpoint is provided for consistency but doesn't affect backend processing
c.JSON(200, gin.H{
@ -380,7 +415,7 @@ func handleWebRTCSession(c *gin.Context) {
// Check if we have an existing session and handle renegotiation
if currentSession != nil {
logger.Info().Msg("handling renegotiation for existing session")
// Handle renegotiation with existing session
sd, err = currentSession.ExchangeOffer(req.Sd)
if err != nil {

View File

@ -5,6 +5,7 @@ import (
"encoding/base64"
"encoding/json"
"net"
"runtime"
"strings"
"github.com/coder/websocket"
@ -164,18 +165,22 @@ func newSession(config SessionConfig) (*Session, error) {
// Handle incoming audio track (microphone from browser)
peerConnection.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
scopedLogger.Info().Str("codec", track.Codec().MimeType).Str("id", track.ID()).Msg("Got remote track")
if track.Kind() == webrtc.RTPCodecTypeAudio && track.Codec().MimeType == webrtc.MimeTypeOpus {
scopedLogger.Info().Msg("Processing incoming audio track for microphone input")
go func() {
// Lock to OS thread to isolate RTP processing
runtime.LockOSThread()
defer runtime.UnlockOSThread()
for {
rtpPacket, _, err := track.ReadRTP()
if err != nil {
scopedLogger.Debug().Err(err).Msg("Error reading RTP packet from audio track")
return
}
// Extract Opus payload from RTP packet
opusPayload := rtpPacket.Payload
if len(opusPayload) > 0 && session.AudioInputManager != nil {
@ -251,6 +256,10 @@ func newSession(config SessionConfig) (*Session, error) {
}
func drainRtpSender(rtpSender *webrtc.RTPSender) {
// Lock to OS thread to isolate RTCP processing
runtime.LockOSThread()
defer runtime.UnlockOSThread()
rtcpBuf := make([]byte, 1500)
for {
if _, _, err := rtpSender.Read(rtcpBuf); err != nil {