diff --git a/audio_events.go b/audio_events.go new file mode 100644 index 0000000..7c01ae7 --- /dev/null +++ b/audio_events.go @@ -0,0 +1,307 @@ +package kvm + +import ( + "context" + "sync" + "time" + + "github.com/coder/websocket" + "github.com/coder/websocket/wsjson" + "github.com/jetkvm/kvm/internal/audio" + "github.com/rs/zerolog" +) + +// AudioEventType represents different types of audio events +type AudioEventType string + +const ( + AudioEventMuteChanged AudioEventType = "audio-mute-changed" + AudioEventMetricsUpdate AudioEventType = "audio-metrics-update" + AudioEventMicrophoneState AudioEventType = "microphone-state-changed" + AudioEventMicrophoneMetrics AudioEventType = "microphone-metrics-update" +) + +// AudioEvent represents a WebSocket audio event +type AudioEvent struct { + Type AudioEventType `json:"type"` + Data interface{} `json:"data"` +} + +// AudioMuteData represents audio mute state change data +type AudioMuteData struct { + Muted bool `json:"muted"` +} + +// AudioMetricsData represents audio metrics data +type AudioMetricsData struct { + FramesReceived int64 `json:"frames_received"` + FramesDropped int64 `json:"frames_dropped"` + BytesProcessed int64 `json:"bytes_processed"` + LastFrameTime string `json:"last_frame_time"` + ConnectionDrops int64 `json:"connection_drops"` + AverageLatency string `json:"average_latency"` +} + +// MicrophoneStateData represents microphone state data +type MicrophoneStateData struct { + Running bool `json:"running"` + SessionActive bool `json:"session_active"` +} + +// MicrophoneMetricsData represents microphone metrics data +type MicrophoneMetricsData struct { + FramesSent int64 `json:"frames_sent"` + FramesDropped int64 `json:"frames_dropped"` + BytesProcessed int64 `json:"bytes_processed"` + LastFrameTime string `json:"last_frame_time"` + ConnectionDrops int64 `json:"connection_drops"` + AverageLatency string `json:"average_latency"` +} + +// AudioEventSubscriber represents a WebSocket connection subscribed to audio events +type AudioEventSubscriber struct { + conn *websocket.Conn + ctx context.Context + logger *zerolog.Logger +} + +// AudioEventBroadcaster manages audio event subscriptions and broadcasting +type AudioEventBroadcaster struct { + subscribers map[string]*AudioEventSubscriber + mutex sync.RWMutex + logger *zerolog.Logger +} + +var ( + audioEventBroadcaster *AudioEventBroadcaster + audioEventOnce sync.Once +) + +// InitializeAudioEventBroadcaster initializes the global audio event broadcaster +func InitializeAudioEventBroadcaster() { + audioEventOnce.Do(func() { + l := logger.With().Str("component", "audio-events").Logger() + audioEventBroadcaster = &AudioEventBroadcaster{ + subscribers: make(map[string]*AudioEventSubscriber), + logger: &l, + } + + // Start metrics broadcasting goroutine + go audioEventBroadcaster.startMetricsBroadcasting() + }) +} + +// GetAudioEventBroadcaster returns the singleton audio event broadcaster +func GetAudioEventBroadcaster() *AudioEventBroadcaster { + audioEventOnce.Do(func() { + l := logger.With().Str("component", "audio-events").Logger() + audioEventBroadcaster = &AudioEventBroadcaster{ + subscribers: make(map[string]*AudioEventSubscriber), + logger: &l, + } + + // Start metrics broadcasting goroutine + go audioEventBroadcaster.startMetricsBroadcasting() + }) + return audioEventBroadcaster +} + +// Subscribe adds a WebSocket connection to receive audio events +func (aeb *AudioEventBroadcaster) Subscribe(connectionID string, conn *websocket.Conn, ctx context.Context, logger *zerolog.Logger) { + aeb.mutex.Lock() + defer aeb.mutex.Unlock() + + aeb.subscribers[connectionID] = &AudioEventSubscriber{ + conn: conn, + ctx: ctx, + logger: logger, + } + + aeb.logger.Info().Str("connectionID", connectionID).Msg("audio events subscription added") + + // Send initial state to new subscriber + go aeb.sendInitialState(connectionID) +} + +// Unsubscribe removes a WebSocket connection from audio events +func (aeb *AudioEventBroadcaster) Unsubscribe(connectionID string) { + aeb.mutex.Lock() + defer aeb.mutex.Unlock() + + delete(aeb.subscribers, connectionID) + aeb.logger.Info().Str("connectionID", connectionID).Msg("audio events subscription removed") +} + +// BroadcastAudioMuteChanged broadcasts audio mute state changes +func (aeb *AudioEventBroadcaster) BroadcastAudioMuteChanged(muted bool) { + event := AudioEvent{ + Type: AudioEventMuteChanged, + Data: AudioMuteData{Muted: muted}, + } + aeb.broadcast(event) +} + +// BroadcastMicrophoneStateChanged broadcasts microphone state changes +func (aeb *AudioEventBroadcaster) BroadcastMicrophoneStateChanged(running, sessionActive bool) { + event := AudioEvent{ + Type: AudioEventMicrophoneState, + Data: MicrophoneStateData{ + Running: running, + SessionActive: sessionActive, + }, + } + aeb.broadcast(event) +} + +// sendInitialState sends current audio state to a new subscriber +func (aeb *AudioEventBroadcaster) sendInitialState(connectionID string) { + aeb.mutex.RLock() + subscriber, exists := aeb.subscribers[connectionID] + aeb.mutex.RUnlock() + + if !exists { + return + } + + // Send current audio mute state + muteEvent := AudioEvent{ + Type: AudioEventMuteChanged, + Data: AudioMuteData{Muted: audio.IsAudioMuted()}, + } + aeb.sendToSubscriber(subscriber, muteEvent) + + // Send current microphone state + sessionActive := currentSession != nil + var running bool + if sessionActive && currentSession.AudioInputManager != nil { + running = currentSession.AudioInputManager.IsRunning() + } + + micStateEvent := AudioEvent{ + Type: AudioEventMicrophoneState, + Data: MicrophoneStateData{ + Running: running, + SessionActive: sessionActive, + }, + } + aeb.sendToSubscriber(subscriber, micStateEvent) + + // Send current metrics + aeb.sendCurrentMetrics(subscriber) +} + +// sendCurrentMetrics sends current audio and microphone metrics to a subscriber +func (aeb *AudioEventBroadcaster) sendCurrentMetrics(subscriber *AudioEventSubscriber) { + // Send audio metrics + audioMetrics := audio.GetAudioMetrics() + audioMetricsEvent := AudioEvent{ + Type: AudioEventMetricsUpdate, + Data: AudioMetricsData{ + FramesReceived: audioMetrics.FramesReceived, + FramesDropped: audioMetrics.FramesDropped, + BytesProcessed: audioMetrics.BytesProcessed, + LastFrameTime: audioMetrics.LastFrameTime.Format("2006-01-02T15:04:05.000Z"), + ConnectionDrops: audioMetrics.ConnectionDrops, + AverageLatency: audioMetrics.AverageLatency.String(), + }, + } + aeb.sendToSubscriber(subscriber, audioMetricsEvent) + + // Send microphone metrics + if currentSession != nil && currentSession.AudioInputManager != nil { + micMetrics := currentSession.AudioInputManager.GetMetrics() + micMetricsEvent := AudioEvent{ + Type: AudioEventMicrophoneMetrics, + Data: MicrophoneMetricsData{ + FramesSent: micMetrics.FramesSent, + FramesDropped: micMetrics.FramesDropped, + BytesProcessed: micMetrics.BytesProcessed, + LastFrameTime: micMetrics.LastFrameTime.Format("2006-01-02T15:04:05.000Z"), + ConnectionDrops: micMetrics.ConnectionDrops, + AverageLatency: micMetrics.AverageLatency.String(), + }, + } + aeb.sendToSubscriber(subscriber, micMetricsEvent) + } +} + +// startMetricsBroadcasting starts a goroutine that periodically broadcasts metrics +func (aeb *AudioEventBroadcaster) startMetricsBroadcasting() { + ticker := time.NewTicker(2 * time.Second) // Same interval as current polling + defer ticker.Stop() + + for range ticker.C { + aeb.mutex.RLock() + subscriberCount := len(aeb.subscribers) + aeb.mutex.RUnlock() + + // Only broadcast if there are subscribers + if subscriberCount == 0 { + continue + } + + // Broadcast audio metrics + audioMetrics := audio.GetAudioMetrics() + audioMetricsEvent := AudioEvent{ + Type: AudioEventMetricsUpdate, + Data: AudioMetricsData{ + FramesReceived: audioMetrics.FramesReceived, + FramesDropped: audioMetrics.FramesDropped, + BytesProcessed: audioMetrics.BytesProcessed, + LastFrameTime: audioMetrics.LastFrameTime.Format("2006-01-02T15:04:05.000Z"), + ConnectionDrops: audioMetrics.ConnectionDrops, + AverageLatency: audioMetrics.AverageLatency.String(), + }, + } + aeb.broadcast(audioMetricsEvent) + + // Broadcast microphone metrics if available + if currentSession != nil && currentSession.AudioInputManager != nil { + micMetrics := currentSession.AudioInputManager.GetMetrics() + micMetricsEvent := AudioEvent{ + Type: AudioEventMicrophoneMetrics, + Data: MicrophoneMetricsData{ + FramesSent: micMetrics.FramesSent, + FramesDropped: micMetrics.FramesDropped, + BytesProcessed: micMetrics.BytesProcessed, + LastFrameTime: micMetrics.LastFrameTime.Format("2006-01-02T15:04:05.000Z"), + ConnectionDrops: micMetrics.ConnectionDrops, + AverageLatency: micMetrics.AverageLatency.String(), + }, + } + aeb.broadcast(micMetricsEvent) + } + } +} + +// broadcast sends an event to all subscribers +func (aeb *AudioEventBroadcaster) broadcast(event AudioEvent) { + aeb.mutex.RLock() + defer aeb.mutex.RUnlock() + + for connectionID, subscriber := range aeb.subscribers { + go func(id string, sub *AudioEventSubscriber) { + if !aeb.sendToSubscriber(sub, event) { + // Remove failed subscriber + aeb.mutex.Lock() + delete(aeb.subscribers, id) + aeb.mutex.Unlock() + aeb.logger.Warn().Str("connectionID", id).Msg("removed failed audio events subscriber") + } + }(connectionID, subscriber) + } +} + +// sendToSubscriber sends an event to a specific subscriber +func (aeb *AudioEventBroadcaster) sendToSubscriber(subscriber *AudioEventSubscriber, event AudioEvent) bool { + ctx, cancel := context.WithTimeout(subscriber.ctx, 5*time.Second) + defer cancel() + + err := wsjson.Write(ctx, subscriber.conn, event) + if err != nil { + subscriber.logger.Warn().Err(err).Msg("failed to send audio event to subscriber") + return false + } + + return true +} \ No newline at end of file diff --git a/internal/audio/nonblocking_audio.go b/internal/audio/nonblocking_audio.go index d450b93..aeadaf8 100644 --- a/internal/audio/nonblocking_audio.go +++ b/internal/audio/nonblocking_audio.go @@ -134,7 +134,7 @@ func (nam *NonBlockingAudioManager) outputWorkerThread() { // Lock to OS thread to isolate blocking CGO operations runtime.LockOSThread() defer runtime.UnlockOSThread() - + defer nam.wg.Done() defer atomic.StoreInt32(&nam.outputWorkerRunning, 0) @@ -271,7 +271,7 @@ func (nam *NonBlockingAudioManager) inputWorkerThread() { // Lock to OS thread to isolate blocking CGO operations runtime.LockOSThread() defer runtime.UnlockOSThread() - + defer nam.wg.Done() defer atomic.StoreInt32(&nam.inputWorkerRunning, 0) diff --git a/main.go b/main.go index b610757..8c96037 100644 --- a/main.go +++ b/main.go @@ -106,6 +106,10 @@ func Main() { logger.Warn().Err(err).Msg("failed to start non-blocking audio streaming") } + // Initialize audio event broadcaster for WebSocket-based real-time updates + InitializeAudioEventBroadcaster() + logger.Info().Msg("audio event broadcaster initialized") + if err := setInitialVirtualMediaState(); err != nil { logger.Warn().Err(err).Msg("failed to set initial virtual media state") } diff --git a/ui/src/components/ActionBar.tsx b/ui/src/components/ActionBar.tsx index a3edc5e..4cc1f9e 100644 --- a/ui/src/components/ActionBar.tsx +++ b/ui/src/components/ActionBar.tsx @@ -20,6 +20,7 @@ import MountPopopover from "@/components/popovers/MountPopover"; import ExtensionPopover from "@/components/popovers/ExtensionPopover"; import AudioControlPopover from "@/components/popovers/AudioControlPopover"; import { useDeviceUiNavigation } from "@/hooks/useAppNavigation"; +import { useAudioEvents } from "@/hooks/useAudioEvents"; import api from "@/api"; // Type for microphone error @@ -81,27 +82,36 @@ export default function Actionbar({ [setDisableFocusTrap], ); - // Mute/unmute state for button display - const [isMuted, setIsMuted] = useState(false); + // Use WebSocket-based audio events for real-time updates + const { audioMuted, isConnected } = useAudioEvents(); + + // Fallback to polling if WebSocket is not connected + const [fallbackMuted, setFallbackMuted] = useState(false); useEffect(() => { - api.GET("/audio/mute").then(async resp => { - if (resp.ok) { - const data = await resp.json(); - setIsMuted(!!data.muted); - } - }); - - // Refresh mute state periodically for button display - const interval = setInterval(async () => { - const resp = await api.GET("/audio/mute"); - if (resp.ok) { - const data = await resp.json(); - setIsMuted(!!data.muted); - } - }, 1000); - - return () => clearInterval(interval); - }, []); + if (!isConnected) { + // Load initial state + api.GET("/audio/mute").then(async resp => { + if (resp.ok) { + const data = await resp.json(); + setFallbackMuted(!!data.muted); + } + }); + + // Fallback polling when WebSocket is not available + const interval = setInterval(async () => { + const resp = await api.GET("/audio/mute"); + if (resp.ok) { + const data = await resp.json(); + setFallbackMuted(!!data.muted); + } + }, 1000); + + return () => clearInterval(interval); + } + }, [isConnected]); + + // Use WebSocket data when available, fallback to polling data otherwise + const isMuted = isConnected && audioMuted !== null ? audioMuted : fallbackMuted; return ( diff --git a/ui/src/components/AudioMetricsDashboard.tsx b/ui/src/components/AudioMetricsDashboard.tsx index 08d77ea..435612d 100644 --- a/ui/src/components/AudioMetricsDashboard.tsx +++ b/ui/src/components/AudioMetricsDashboard.tsx @@ -6,6 +6,7 @@ import { AudioLevelMeter } from "@components/AudioLevelMeter"; import { cx } from "@/cva.config"; import { useMicrophone } from "@/hooks/useMicrophone"; import { useAudioLevel } from "@/hooks/useAudioLevel"; +import { useAudioEvents } from "@/hooks/useAudioEvents"; import api from "@/api"; interface AudioMetrics { @@ -42,51 +43,46 @@ const qualityLabels = { }; export default function AudioMetricsDashboard() { - const [metrics, setMetrics] = useState(null); - const [microphoneMetrics, setMicrophoneMetrics] = useState(null); + // Use WebSocket-based audio events for real-time updates + const { + audioMetrics, + microphoneMetrics: wsMicrophoneMetrics, + isConnected: wsConnected + } = useAudioEvents(); + + // Fallback state for when WebSocket is not connected + const [fallbackMetrics, setFallbackMetrics] = useState(null); + const [fallbackMicrophoneMetrics, setFallbackMicrophoneMetrics] = useState(null); + const [fallbackConnected, setFallbackConnected] = useState(false); + + // Configuration state (these don't change frequently, so we can load them once) const [config, setConfig] = useState(null); const [microphoneConfig, setMicrophoneConfig] = useState(null); - const [isConnected, setIsConnected] = useState(false); const [lastUpdate, setLastUpdate] = useState(new Date()); + // Use WebSocket data when available, fallback to polling data otherwise + const metrics = wsConnected && audioMetrics !== null ? audioMetrics : fallbackMetrics; + const microphoneMetrics = wsConnected && wsMicrophoneMetrics !== null ? wsMicrophoneMetrics : fallbackMicrophoneMetrics; + const isConnected = wsConnected ? wsConnected : fallbackConnected; + // Microphone state for audio level monitoring const { isMicrophoneActive, isMicrophoneMuted, microphoneStream } = useMicrophone(); const { audioLevel, isAnalyzing } = useAudioLevel(microphoneStream); useEffect(() => { - loadAudioData(); + // Load initial configuration (only once) + loadAudioConfig(); - // Refresh every 1 second for real-time metrics - const interval = setInterval(loadAudioData, 1000); - return () => clearInterval(interval); - }, []); + // Set up fallback polling only when WebSocket is not connected + if (!wsConnected) { + loadAudioData(); + const interval = setInterval(loadAudioData, 1000); + return () => clearInterval(interval); + } + }, [wsConnected]); - const loadAudioData = async () => { + const loadAudioConfig = async () => { try { - // Load metrics - const metricsResp = await api.GET("/audio/metrics"); - if (metricsResp.ok) { - const metricsData = await metricsResp.json(); - setMetrics(metricsData); - // Consider connected if API call succeeds, regardless of frame count - setIsConnected(true); - setLastUpdate(new Date()); - } else { - setIsConnected(false); - } - - // Load microphone metrics - try { - const micResp = await api.GET("/microphone/metrics"); - if (micResp.ok) { - const micData = await micResp.json(); - setMicrophoneMetrics(micData); - } - } catch (micError) { - // Microphone metrics might not be available, that's okay - console.debug("Microphone metrics not available:", micError); - } - // Load config const configResp = await api.GET("/audio/quality"); if (configResp.ok) { @@ -104,9 +100,39 @@ export default function AudioMetricsDashboard() { } catch (micConfigError) { console.debug("Microphone config not available:", micConfigError); } + } catch (error) { + console.error("Failed to load audio config:", error); + } + }; + + const loadAudioData = async () => { + try { + // Load metrics + const metricsResp = await api.GET("/audio/metrics"); + if (metricsResp.ok) { + const metricsData = await metricsResp.json(); + setFallbackMetrics(metricsData); + // Consider connected if API call succeeds, regardless of frame count + setFallbackConnected(true); + setLastUpdate(new Date()); + } else { + setFallbackConnected(false); + } + + // Load microphone metrics + try { + const micResp = await api.GET("/microphone/metrics"); + if (micResp.ok) { + const micData = await micResp.json(); + setFallbackMicrophoneMetrics(micData); + } + } catch (micError) { + // Microphone metrics might not be available, that's okay + console.debug("Microphone metrics not available:", micError); + } } catch (error) { console.error("Failed to load audio data:", error); - setIsConnected(false); + setFallbackConnected(false); } }; diff --git a/ui/src/components/popovers/AudioControlPopover.tsx b/ui/src/components/popovers/AudioControlPopover.tsx index a55b57c..15f90ad 100644 --- a/ui/src/components/popovers/AudioControlPopover.tsx +++ b/ui/src/components/popovers/AudioControlPopover.tsx @@ -8,6 +8,7 @@ import { cx } from "@/cva.config"; import { useUiStore } from "@/hooks/stores"; import { useAudioDevices } from "@/hooks/useAudioDevices"; import { useAudioLevel } from "@/hooks/useAudioLevel"; +import { useAudioEvents } from "@/hooks/useAudioEvents"; import api from "@/api"; import notifications from "@/notifications"; @@ -74,16 +75,27 @@ interface AudioControlPopoverProps { export default function AudioControlPopover({ microphone }: AudioControlPopoverProps) { const [currentConfig, setCurrentConfig] = useState(null); const [currentMicrophoneConfig, setCurrentMicrophoneConfig] = useState(null); - const [isMuted, setIsMuted] = useState(false); - const [metrics, setMetrics] = useState(null); const [showAdvanced, setShowAdvanced] = useState(false); const [isLoading, setIsLoading] = useState(false); - const [isConnected, setIsConnected] = useState(false); // Add cooldown to prevent rapid clicking const [lastClickTime, setLastClickTime] = useState(0); const CLICK_COOLDOWN = 500; // 500ms cooldown between clicks + // Use WebSocket-based audio events for real-time updates + const { + audioMuted, + audioMetrics, + microphoneMetrics, + isConnected: wsConnected + } = useAudioEvents(); + + // Fallback state for when WebSocket is not connected + const [fallbackMuted, setFallbackMuted] = useState(false); + const [fallbackMetrics, setFallbackMetrics] = useState(null); + const [fallbackMicMetrics, setFallbackMicMetrics] = useState(null); + const [fallbackConnected, setFallbackConnected] = useState(false); + // Microphone state from props const { isMicrophoneActive, @@ -98,7 +110,12 @@ export default function AudioControlPopover({ microphone }: AudioControlPopoverP isStopping, isToggling, } = microphone; - const [microphoneMetrics, setMicrophoneMetrics] = useState(null); + + // Use WebSocket data when available, fallback to polling data otherwise + const isMuted = wsConnected && audioMuted !== null ? audioMuted : fallbackMuted; + const metrics = wsConnected && audioMetrics !== null ? audioMetrics : fallbackMetrics; + const micMetrics = wsConnected && microphoneMetrics !== null ? microphoneMetrics : fallbackMicMetrics; + const isConnected = wsConnected ? wsConnected : fallbackConnected; // Audio level monitoring const { audioLevel, isAnalyzing } = useAudioLevel(microphoneStream); @@ -118,30 +135,33 @@ export default function AudioControlPopover({ microphone }: AudioControlPopoverP const { toggleSidebarView } = useUiStore(); - // Load initial audio state + // Load initial configurations once (these don't change frequently) useEffect(() => { - loadAudioState(); - loadAudioMetrics(); - loadMicrophoneMetrics(); - syncMicrophoneState(); - - // Set up metrics refresh interval - const metricsInterval = setInterval(() => { + loadAudioConfigurations(); + }, []); + + // Load initial audio state and set up fallback polling when WebSocket is not connected + useEffect(() => { + if (!wsConnected) { + loadAudioState(); + // Only load metrics as fallback when WebSocket is disconnected loadAudioMetrics(); loadMicrophoneMetrics(); - }, 2000); - return () => clearInterval(metricsInterval); - }, [syncMicrophoneState]); + + // Set up metrics refresh interval for fallback only + const metricsInterval = setInterval(() => { + loadAudioMetrics(); + loadMicrophoneMetrics(); + }, 2000); + return () => clearInterval(metricsInterval); + } + + // Always sync microphone state + syncMicrophoneState(); + }, [wsConnected, syncMicrophoneState]); - const loadAudioState = async () => { + const loadAudioConfigurations = async () => { try { - // Load mute state - const muteResp = await api.GET("/audio/mute"); - if (muteResp.ok) { - const muteData = await muteResp.json(); - setIsMuted(!!muteData.muted); - } - // Load quality config const qualityResp = await api.GET("/audio/quality"); if (qualityResp.ok) { @@ -155,6 +175,19 @@ export default function AudioControlPopover({ microphone }: AudioControlPopoverP const micQualityData = await micQualityResp.json(); setCurrentMicrophoneConfig(micQualityData.current); } + } catch (error) { + console.error("Failed to load audio configurations:", error); + } + }; + + const loadAudioState = async () => { + try { + // Load mute state only (configurations are loaded separately) + const muteResp = await api.GET("/audio/mute"); + if (muteResp.ok) { + const muteData = await muteResp.json(); + setFallbackMuted(!!muteData.muted); + } } catch (error) { console.error("Failed to load audio state:", error); } @@ -165,15 +198,15 @@ export default function AudioControlPopover({ microphone }: AudioControlPopoverP const resp = await api.GET("/audio/metrics"); if (resp.ok) { const data = await resp.json(); - setMetrics(data); + setFallbackMetrics(data); // Consider connected if API call succeeds, regardless of frame count - setIsConnected(true); + setFallbackConnected(true); } else { - setIsConnected(false); + setFallbackConnected(false); } } catch (error) { console.error("Failed to load audio metrics:", error); - setIsConnected(false); + setFallbackConnected(false); } }; @@ -184,7 +217,7 @@ export default function AudioControlPopover({ microphone }: AudioControlPopoverP const resp = await api.GET("/microphone/metrics"); if (resp.ok) { const data = await resp.json(); - setMicrophoneMetrics(data); + setFallbackMicMetrics(data); } } catch (error) { console.error("Failed to load microphone metrics:", error); @@ -196,7 +229,10 @@ export default function AudioControlPopover({ microphone }: AudioControlPopoverP try { const resp = await api.POST("/audio/mute", { muted: !isMuted }); if (resp.ok) { - setIsMuted(!isMuted); + // WebSocket will handle the state update, but update fallback for immediate feedback + if (!wsConnected) { + setFallbackMuted(!isMuted); + } } } catch (error) { console.error("Failed to toggle mute:", error); @@ -687,14 +723,14 @@ export default function AudioControlPopover({ microphone }: AudioControlPopoverP - {microphoneMetrics && ( + {micMetrics && (

Microphone Input

Frames Sent
- {formatNumber(microphoneMetrics.frames_sent)} + {formatNumber(micMetrics.frames_sent)}
@@ -702,18 +738,18 @@ export default function AudioControlPopover({ microphone }: AudioControlPopoverP
Frames Dropped
0 + micMetrics.frames_dropped > 0 ? "text-red-600 dark:text-red-400" : "text-green-600 dark:text-green-400" )}> - {formatNumber(microphoneMetrics.frames_dropped)} + {formatNumber(micMetrics.frames_dropped)}
Data Processed
- {formatBytes(microphoneMetrics.bytes_processed)} + {formatBytes(micMetrics.bytes_processed)}
@@ -721,11 +757,11 @@ export default function AudioControlPopover({ microphone }: AudioControlPopoverP
Connection Drops
0 + micMetrics.connection_drops > 0 ? "text-red-600 dark:text-red-400" : "text-green-600 dark:text-green-400" )}> - {formatNumber(microphoneMetrics.connection_drops)} + {formatNumber(micMetrics.connection_drops)}
diff --git a/ui/src/hooks/useAudioEvents.ts b/ui/src/hooks/useAudioEvents.ts new file mode 100644 index 0000000..90d73cb --- /dev/null +++ b/ui/src/hooks/useAudioEvents.ts @@ -0,0 +1,202 @@ +import { useCallback, useEffect, useRef, useState } from 'react'; +import useWebSocket, { ReadyState } from 'react-use-websocket'; + +// Audio event types matching the backend +export type AudioEventType = + | 'audio-mute-changed' + | 'audio-metrics-update' + | 'microphone-state-changed' + | 'microphone-metrics-update'; + +// Audio event data interfaces +export interface AudioMuteData { + muted: boolean; +} + +export interface AudioMetricsData { + frames_received: number; + frames_dropped: number; + bytes_processed: number; + last_frame_time: string; + connection_drops: number; + average_latency: string; +} + +export interface MicrophoneStateData { + running: boolean; + session_active: boolean; +} + +export interface MicrophoneMetricsData { + frames_sent: number; + frames_dropped: number; + bytes_processed: number; + last_frame_time: string; + connection_drops: number; + average_latency: string; +} + +// Audio event structure +export interface AudioEvent { + type: AudioEventType; + data: AudioMuteData | AudioMetricsData | MicrophoneStateData | MicrophoneMetricsData; +} + +// Hook return type +export interface UseAudioEventsReturn { + // Connection state + connectionState: ReadyState; + isConnected: boolean; + + // Audio state + audioMuted: boolean | null; + audioMetrics: AudioMetricsData | null; + + // Microphone state + microphoneState: MicrophoneStateData | null; + microphoneMetrics: MicrophoneMetricsData | null; + + // Manual subscription control + subscribe: () => void; + unsubscribe: () => void; +} + +export function useAudioEvents(): UseAudioEventsReturn { + // State for audio data + const [audioMuted, setAudioMuted] = useState(null); + const [audioMetrics, setAudioMetrics] = useState(null); + const [microphoneState, setMicrophoneState] = useState(null); + const [microphoneMetrics, setMicrophoneMetrics] = useState(null); + + // Subscription state + const [isSubscribed, setIsSubscribed] = useState(false); + const subscriptionSent = useRef(false); + + // Get WebSocket URL + const getWebSocketUrl = () => { + const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; + const host = window.location.host; + return `${protocol}//${host}/webrtc/signaling/client`; + }; + + // WebSocket connection + const { + sendMessage, + lastMessage, + readyState, + } = useWebSocket(getWebSocketUrl(), { + shouldReconnect: () => true, + reconnectAttempts: 10, + reconnectInterval: 3000, + onOpen: () => { + console.log('[AudioEvents] WebSocket connected'); + subscriptionSent.current = false; + }, + onClose: () => { + console.log('[AudioEvents] WebSocket disconnected'); + subscriptionSent.current = false; + setIsSubscribed(false); + }, + onError: (event) => { + console.error('[AudioEvents] WebSocket error:', event); + }, + }); + + // Subscribe to audio events + const subscribe = useCallback(() => { + if (readyState === ReadyState.OPEN && !subscriptionSent.current) { + const subscribeMessage = { + type: 'subscribe-audio-events', + data: {} + }; + + sendMessage(JSON.stringify(subscribeMessage)); + subscriptionSent.current = true; + setIsSubscribed(true); + console.log('[AudioEvents] Subscribed to audio events'); + } + }, [readyState, sendMessage]); + + // Handle incoming messages + useEffect(() => { + if (lastMessage !== null) { + try { + const message = JSON.parse(lastMessage.data); + + // Handle audio events + if (message.type && message.data) { + const audioEvent = message as AudioEvent; + + switch (audioEvent.type) { + case 'audio-mute-changed': { + const muteData = audioEvent.data as AudioMuteData; + setAudioMuted(muteData.muted); + console.log('[AudioEvents] Audio mute changed:', muteData.muted); + break; + } + + case 'audio-metrics-update': { + const audioMetricsData = audioEvent.data as AudioMetricsData; + setAudioMetrics(audioMetricsData); + break; + } + + case 'microphone-state-changed': { + const micStateData = audioEvent.data as MicrophoneStateData; + setMicrophoneState(micStateData); + console.log('[AudioEvents] Microphone state changed:', micStateData); + break; + } + + case 'microphone-metrics-update': { + const micMetricsData = audioEvent.data as MicrophoneMetricsData; + setMicrophoneMetrics(micMetricsData); + break; + } + + default: + // Ignore other message types (WebRTC signaling, etc.) + break; + } + } + } catch (error) { + // Ignore parsing errors for non-JSON messages (like "pong") + if (lastMessage.data !== 'pong') { + console.warn('[AudioEvents] Failed to parse WebSocket message:', error); + } + } + } + }, [lastMessage]); + + // Auto-subscribe when connected + useEffect(() => { + if (readyState === ReadyState.OPEN && !subscriptionSent.current) { + subscribe(); + } + }, [readyState, subscribe]); + + // Unsubscribe from audio events (connection will be cleaned up automatically) + const unsubscribe = useCallback(() => { + setIsSubscribed(false); + subscriptionSent.current = false; + console.log('[AudioEvents] Unsubscribed from audio events'); + }, []); + + return { + // Connection state + connectionState: readyState, + isConnected: readyState === ReadyState.OPEN && isSubscribed, + + // Audio state + audioMuted, + audioMetrics, + + // Microphone state + microphoneState, + microphoneMetrics, + + // Manual subscription control + subscribe, + unsubscribe, + }; +} \ No newline at end of file diff --git a/web.go b/web.go index b2914a0..b01ccc9 100644 --- a/web.go +++ b/web.go @@ -173,6 +173,11 @@ func setupRouter() *gin.Engine { return } audio.SetAudioMuted(req.Muted) + + // Broadcast audio mute state change via WebSocket + broadcaster := GetAudioEventBroadcaster() + broadcaster.BroadcastAudioMuteChanged(req.Muted) + c.JSON(200, gin.H{"muted": req.Muted}) }) @@ -306,6 +311,10 @@ func setupRouter() *gin.Engine { return } + // Broadcast microphone state change via WebSocket + broadcaster := GetAudioEventBroadcaster() + broadcaster.BroadcastMicrophoneStateChanged(true, true) + c.JSON(200, gin.H{ "status": "started", "running": currentSession.AudioInputManager.IsRunning(), @@ -337,6 +346,10 @@ func setupRouter() *gin.Engine { // Also stop the non-blocking audio input specifically audio.StopNonBlockingAudioInput() + // Broadcast microphone state change via WebSocket + broadcaster := GetAudioEventBroadcaster() + broadcaster.BroadcastMicrophoneStateChanged(false, true) + c.JSON(200, gin.H{ "status": "stopped", "running": currentSession.AudioInputManager.IsRunning(), @@ -533,6 +546,9 @@ func handleWebRTCSignalWsMessages( if isCloudConnection { setCloudConnectionState(CloudConnectionStateDisconnected) } + // Clean up audio event subscription + broadcaster := GetAudioEventBroadcaster() + broadcaster.Unsubscribe(connectionID) cancelRun() }() @@ -690,6 +706,10 @@ func handleWebRTCSignalWsMessages( if err = currentSession.peerConnection.AddICECandidate(candidate); err != nil { l.Warn().Str("error", err.Error()).Msg("failed to add incoming ICE candidate to our peer connection") } + } else if message.Type == "subscribe-audio-events" { + l.Info().Msg("client subscribing to audio events") + broadcaster := GetAudioEventBroadcaster() + broadcaster.Subscribe(connectionID, wsCon, runCtx, &l) } } }