diff --git a/internal/audio/events.go b/internal/audio/events.go index 6ef65a6..4b99885 100644 --- a/internal/audio/events.go +++ b/internal/audio/events.go @@ -92,32 +92,26 @@ var ( audioEventOnce sync.Once ) +// initializeBroadcaster creates and initializes the audio event broadcaster +func initializeBroadcaster() { + l := logging.GetDefaultLogger().With().Str("component", "audio-events").Logger() + audioEventBroadcaster = &AudioEventBroadcaster{ + subscribers: make(map[string]*AudioEventSubscriber), + logger: &l, + } + + // Start metrics broadcasting goroutine + go audioEventBroadcaster.startMetricsBroadcasting() +} + // InitializeAudioEventBroadcaster initializes the global audio event broadcaster func InitializeAudioEventBroadcaster() { - audioEventOnce.Do(func() { - l := logging.GetDefaultLogger().With().Str("component", "audio-events").Logger() - audioEventBroadcaster = &AudioEventBroadcaster{ - subscribers: make(map[string]*AudioEventSubscriber), - logger: &l, - } - - // Start metrics broadcasting goroutine - go audioEventBroadcaster.startMetricsBroadcasting() - }) + audioEventOnce.Do(initializeBroadcaster) } // GetAudioEventBroadcaster returns the singleton audio event broadcaster func GetAudioEventBroadcaster() *AudioEventBroadcaster { - audioEventOnce.Do(func() { - l := logging.GetDefaultLogger().With().Str("component", "audio-events").Logger() - audioEventBroadcaster = &AudioEventBroadcaster{ - subscribers: make(map[string]*AudioEventSubscriber), - logger: &l, - } - - // Start metrics broadcasting goroutine - go audioEventBroadcaster.startMetricsBroadcasting() - }) + audioEventOnce.Do(initializeBroadcaster) return audioEventBroadcaster } @@ -157,22 +151,16 @@ func (aeb *AudioEventBroadcaster) Unsubscribe(connectionID string) { // BroadcastAudioMuteChanged broadcasts audio mute state changes func (aeb *AudioEventBroadcaster) BroadcastAudioMuteChanged(muted bool) { - event := AudioEvent{ - Type: AudioEventMuteChanged, - Data: AudioMuteData{Muted: muted}, - } + event := createAudioEvent(AudioEventMuteChanged, AudioMuteData{Muted: muted}) aeb.broadcast(event) } // BroadcastMicrophoneStateChanged broadcasts microphone state changes func (aeb *AudioEventBroadcaster) BroadcastMicrophoneStateChanged(running, sessionActive bool) { - event := AudioEvent{ - Type: AudioEventMicrophoneState, - Data: MicrophoneStateData{ - Running: running, - SessionActive: sessionActive, - }, - } + event := createAudioEvent(AudioEventMicrophoneState, MicrophoneStateData{ + Running: running, + SessionActive: sessionActive, + }) aeb.broadcast(event) } @@ -217,31 +205,121 @@ func (aeb *AudioEventBroadcaster) sendInitialState(connectionID string) { } // getMicrophoneProcessMetrics returns microphone process metrics data, always providing a valid response -// getInactiveProcessMetrics returns ProcessMetricsData for an inactive audio input process -func getInactiveProcessMetrics() ProcessMetricsData { - return ProcessMetricsData{ - PID: 0, - CPUPercent: 0.0, - MemoryRSS: 0, - MemoryVMS: 0, - MemoryPercent: 0.0, - Running: false, - ProcessName: "audio-input-server", +// convertAudioMetricsToEventData converts internal audio metrics to AudioMetricsData for events +func convertAudioMetricsToEventData(metrics AudioMetrics) AudioMetricsData { + return AudioMetricsData{ + FramesReceived: metrics.FramesReceived, + FramesDropped: metrics.FramesDropped, + BytesProcessed: metrics.BytesProcessed, + LastFrameTime: metrics.LastFrameTime.Format("2006-01-02T15:04:05.000Z"), + ConnectionDrops: metrics.ConnectionDrops, + AverageLatency: metrics.AverageLatency.String(), } } -func (aeb *AudioEventBroadcaster) getMicrophoneProcessMetrics() ProcessMetricsData { +// convertAudioMetricsToEventDataWithLatencyMs converts internal audio metrics to AudioMetricsData with millisecond latency formatting +func convertAudioMetricsToEventDataWithLatencyMs(metrics AudioMetrics) AudioMetricsData { + return AudioMetricsData{ + FramesReceived: metrics.FramesReceived, + FramesDropped: metrics.FramesDropped, + BytesProcessed: metrics.BytesProcessed, + LastFrameTime: metrics.LastFrameTime.Format("2006-01-02T15:04:05.000Z"), + ConnectionDrops: metrics.ConnectionDrops, + AverageLatency: fmt.Sprintf("%.1fms", float64(metrics.AverageLatency.Nanoseconds())/1e6), + } +} + +// convertAudioInputMetricsToEventData converts internal audio input metrics to MicrophoneMetricsData for events +func convertAudioInputMetricsToEventData(metrics AudioInputMetrics) MicrophoneMetricsData { + return MicrophoneMetricsData{ + FramesSent: metrics.FramesSent, + FramesDropped: metrics.FramesDropped, + BytesProcessed: metrics.BytesProcessed, + LastFrameTime: metrics.LastFrameTime.Format("2006-01-02T15:04:05.000Z"), + ConnectionDrops: metrics.ConnectionDrops, + AverageLatency: metrics.AverageLatency.String(), + } +} + +// convertAudioInputMetricsToEventDataWithLatencyMs converts internal audio input metrics to MicrophoneMetricsData with millisecond latency formatting +func convertAudioInputMetricsToEventDataWithLatencyMs(metrics AudioInputMetrics) MicrophoneMetricsData { + return MicrophoneMetricsData{ + FramesSent: metrics.FramesSent, + FramesDropped: metrics.FramesDropped, + BytesProcessed: metrics.BytesProcessed, + LastFrameTime: metrics.LastFrameTime.Format("2006-01-02T15:04:05.000Z"), + ConnectionDrops: metrics.ConnectionDrops, + AverageLatency: fmt.Sprintf("%.1fms", float64(metrics.AverageLatency.Nanoseconds())/1e6), + } +} + +// convertProcessMetricsToEventData converts internal process metrics to ProcessMetricsData for events +func convertProcessMetricsToEventData(metrics ProcessMetrics, running bool) ProcessMetricsData { + return ProcessMetricsData{ + PID: metrics.PID, + CPUPercent: metrics.CPUPercent, + MemoryRSS: metrics.MemoryRSS, + MemoryVMS: metrics.MemoryVMS, + MemoryPercent: metrics.MemoryPercent, + Running: running, + ProcessName: metrics.ProcessName, + } +} + +// createProcessMetricsData creates ProcessMetricsData from ProcessMetrics with running status +func createProcessMetricsData(metrics *ProcessMetrics, running bool, processName string) ProcessMetricsData { + if metrics == nil { + return ProcessMetricsData{ + PID: 0, + CPUPercent: 0.0, + MemoryRSS: 0, + MemoryVMS: 0, + MemoryPercent: 0.0, + Running: false, + ProcessName: processName, + } + } + return ProcessMetricsData{ + PID: metrics.PID, + CPUPercent: metrics.CPUPercent, + MemoryRSS: metrics.MemoryRSS, + MemoryVMS: metrics.MemoryVMS, + MemoryPercent: metrics.MemoryPercent, + Running: running, + ProcessName: metrics.ProcessName, + } +} + +// getInactiveProcessMetrics returns ProcessMetricsData for an inactive audio input process +func getInactiveProcessMetrics() ProcessMetricsData { + return createProcessMetricsData(nil, false, "audio-input-server") +} + +// getActiveAudioInputSupervisor safely retrieves the audio input supervisor if session is active +func getActiveAudioInputSupervisor() *AudioInputSupervisor { sessionProvider := GetSessionProvider() if !sessionProvider.IsSessionActive() { - return getInactiveProcessMetrics() + return nil } inputManager := sessionProvider.GetAudioInputManager() if inputManager == nil { - return getInactiveProcessMetrics() + return nil } - inputSupervisor := inputManager.GetSupervisor() + return inputManager.GetSupervisor() +} + +// createAudioEvent creates an AudioEvent +func createAudioEvent(eventType AudioEventType, data interface{}) AudioEvent { + return AudioEvent{ + Type: eventType, + Data: data, + } +} + +func (aeb *AudioEventBroadcaster) getMicrophoneProcessMetrics() ProcessMetricsData { + inputSupervisor := getActiveAudioInputSupervisor() if inputSupervisor == nil { return getInactiveProcessMetrics() } @@ -252,63 +330,26 @@ func (aeb *AudioEventBroadcaster) getMicrophoneProcessMetrics() ProcessMetricsDa } // If process is running but CPU is 0%, it means we're waiting for the second sample - // to calculate CPU percentage. Return metrics with correct running status but skip CPU data. + // to calculate CPU percentage. Return metrics with correct running status. if inputSupervisor.IsRunning() && processMetrics.CPUPercent == 0.0 { - return ProcessMetricsData{ - PID: processMetrics.PID, - CPUPercent: 0.0, // Keep 0% but with correct running status - MemoryRSS: processMetrics.MemoryRSS, - MemoryVMS: processMetrics.MemoryVMS, - MemoryPercent: processMetrics.MemoryPercent, - Running: true, // Correctly show as running - ProcessName: processMetrics.ProcessName, - } + return createProcessMetricsData(processMetrics, true, processMetrics.ProcessName) } // Subprocess is running, return actual metrics - return ProcessMetricsData{ - PID: processMetrics.PID, - CPUPercent: processMetrics.CPUPercent, - MemoryRSS: processMetrics.MemoryRSS, - MemoryVMS: processMetrics.MemoryVMS, - MemoryPercent: processMetrics.MemoryPercent, - Running: inputSupervisor.IsRunning(), - ProcessName: processMetrics.ProcessName, - } + return createProcessMetricsData(processMetrics, inputSupervisor.IsRunning(), processMetrics.ProcessName) } // sendCurrentMetrics sends current audio and microphone metrics to a subscriber func (aeb *AudioEventBroadcaster) sendCurrentMetrics(subscriber *AudioEventSubscriber) { // Send audio metrics audioMetrics := GetAudioMetrics() - audioMetricsEvent := AudioEvent{ - Type: AudioEventMetricsUpdate, - Data: AudioMetricsData{ - FramesReceived: audioMetrics.FramesReceived, - FramesDropped: audioMetrics.FramesDropped, - BytesProcessed: audioMetrics.BytesProcessed, - LastFrameTime: audioMetrics.LastFrameTime.Format("2006-01-02T15:04:05.000Z"), - ConnectionDrops: audioMetrics.ConnectionDrops, - AverageLatency: audioMetrics.AverageLatency.String(), - }, - } + audioMetricsEvent := createAudioEvent(AudioEventMetricsUpdate, convertAudioMetricsToEventData(audioMetrics)) aeb.sendToSubscriber(subscriber, audioMetricsEvent) // Send audio process metrics if outputSupervisor := GetAudioOutputSupervisor(); outputSupervisor != nil { if processMetrics := outputSupervisor.GetProcessMetrics(); processMetrics != nil { - audioProcessEvent := AudioEvent{ - Type: AudioEventProcessMetrics, - Data: ProcessMetricsData{ - PID: processMetrics.PID, - CPUPercent: processMetrics.CPUPercent, - MemoryRSS: processMetrics.MemoryRSS, - MemoryVMS: processMetrics.MemoryVMS, - MemoryPercent: processMetrics.MemoryPercent, - Running: outputSupervisor.IsRunning(), - ProcessName: processMetrics.ProcessName, - }, - } + audioProcessEvent := createAudioEvent(AudioEventProcessMetrics, convertProcessMetricsToEventData(*processMetrics, outputSupervisor.IsRunning())) aeb.sendToSubscriber(subscriber, audioProcessEvent) } } @@ -318,26 +359,13 @@ func (aeb *AudioEventBroadcaster) sendCurrentMetrics(subscriber *AudioEventSubsc if sessionProvider.IsSessionActive() { if inputManager := sessionProvider.GetAudioInputManager(); inputManager != nil { micMetrics := inputManager.GetMetrics() - micMetricsEvent := AudioEvent{ - Type: AudioEventMicrophoneMetrics, - Data: MicrophoneMetricsData{ - FramesSent: micMetrics.FramesSent, - FramesDropped: micMetrics.FramesDropped, - BytesProcessed: micMetrics.BytesProcessed, - LastFrameTime: micMetrics.LastFrameTime.Format("2006-01-02T15:04:05.000Z"), - ConnectionDrops: micMetrics.ConnectionDrops, - AverageLatency: micMetrics.AverageLatency.String(), - }, - } + micMetricsEvent := createAudioEvent(AudioEventMicrophoneMetrics, convertAudioInputMetricsToEventData(micMetrics)) aeb.sendToSubscriber(subscriber, micMetricsEvent) } } // Send microphone process metrics (always send, even when subprocess is not running) - micProcessEvent := AudioEvent{ - Type: AudioEventMicProcessMetrics, - Data: aeb.getMicrophoneProcessMetrics(), - } + micProcessEvent := createAudioEvent(AudioEventMicProcessMetrics, aeb.getMicrophoneProcessMetrics()) aeb.sendToSubscriber(subscriber, micProcessEvent) } @@ -379,17 +407,7 @@ func (aeb *AudioEventBroadcaster) startMetricsBroadcasting() { // Broadcast audio metrics audioMetrics := GetAudioMetrics() - audioMetricsEvent := AudioEvent{ - Type: AudioEventMetricsUpdate, - Data: AudioMetricsData{ - FramesReceived: audioMetrics.FramesReceived, - FramesDropped: audioMetrics.FramesDropped, - BytesProcessed: audioMetrics.BytesProcessed, - LastFrameTime: audioMetrics.LastFrameTime.Format("2006-01-02T15:04:05.000Z"), - ConnectionDrops: audioMetrics.ConnectionDrops, - AverageLatency: fmt.Sprintf("%.1fms", float64(audioMetrics.AverageLatency.Nanoseconds())/1e6), - }, - } + audioMetricsEvent := createAudioEvent(AudioEventMetricsUpdate, convertAudioMetricsToEventDataWithLatencyMs(audioMetrics)) aeb.broadcast(audioMetricsEvent) // Broadcast microphone metrics if available using session provider @@ -397,17 +415,7 @@ func (aeb *AudioEventBroadcaster) startMetricsBroadcasting() { if sessionProvider.IsSessionActive() { if inputManager := sessionProvider.GetAudioInputManager(); inputManager != nil { micMetrics := inputManager.GetMetrics() - micMetricsEvent := AudioEvent{ - Type: AudioEventMicrophoneMetrics, - Data: MicrophoneMetricsData{ - FramesSent: micMetrics.FramesSent, - FramesDropped: micMetrics.FramesDropped, - BytesProcessed: micMetrics.BytesProcessed, - LastFrameTime: micMetrics.LastFrameTime.Format("2006-01-02T15:04:05.000Z"), - ConnectionDrops: micMetrics.ConnectionDrops, - AverageLatency: fmt.Sprintf("%.1fms", float64(micMetrics.AverageLatency.Nanoseconds())/1e6), - }, - } + micMetricsEvent := createAudioEvent(AudioEventMicrophoneMetrics, convertAudioInputMetricsToEventDataWithLatencyMs(micMetrics)) aeb.broadcast(micMetricsEvent) } } @@ -415,27 +423,13 @@ func (aeb *AudioEventBroadcaster) startMetricsBroadcasting() { // Broadcast audio process metrics if outputSupervisor := GetAudioOutputSupervisor(); outputSupervisor != nil { if processMetrics := outputSupervisor.GetProcessMetrics(); processMetrics != nil { - audioProcessEvent := AudioEvent{ - Type: AudioEventProcessMetrics, - Data: ProcessMetricsData{ - PID: processMetrics.PID, - CPUPercent: processMetrics.CPUPercent, - MemoryRSS: processMetrics.MemoryRSS, - MemoryVMS: processMetrics.MemoryVMS, - MemoryPercent: processMetrics.MemoryPercent, - Running: outputSupervisor.IsRunning(), - ProcessName: processMetrics.ProcessName, - }, - } + audioProcessEvent := createAudioEvent(AudioEventProcessMetrics, convertProcessMetricsToEventData(*processMetrics, outputSupervisor.IsRunning())) aeb.broadcast(audioProcessEvent) } } // Broadcast microphone process metrics (always broadcast, even when subprocess is not running) - micProcessEvent := AudioEvent{ - Type: AudioEventMicProcessMetrics, - Data: aeb.getMicrophoneProcessMetrics(), - } + micProcessEvent := createAudioEvent(AudioEventMicProcessMetrics, aeb.getMicrophoneProcessMetrics()) aeb.broadcast(micProcessEvent) } }