mirror of https://github.com/jetkvm/kvm.git
Compare commits: 2082b1a671 ... 88679cda2f (7 commits)

Commits in this range:
- 88679cda2f
- 76174f4486
- 27a999c58a
- ddc2f90016
- 692f7ddb2d
- 38ad145863
- 879ea5e472
@@ -32,7 +32,6 @@ type AudioConfig struct {
}

// AudioMetrics tracks audio performance metrics
type AudioMetrics struct {
    FramesReceived int64
    FramesDropped  int64

@@ -2,8 +2,6 @@ package audio

import (
    "sync"

    "github.com/jetkvm/kvm/internal/logging"
)

var audioMuteState struct {

@@ -13,9 +11,7 @@ var audioMuteState struct {

func SetAudioMuted(muted bool) {
    audioMuteState.mu.Lock()
    prev := audioMuteState.muted
    audioMuteState.muted = muted
    logging.GetDefaultLogger().Info().Str("component", "audio").Msgf("SetAudioMuted: prev=%v, new=%v", prev, muted)
    audioMuteState.mu.Unlock()
}
@@ -4,65 +4,53 @@ import (
    "sync"
)

// AudioBufferPool manages reusable audio buffers to reduce allocations
type AudioBufferPool struct {
    pool sync.Pool
    pool       sync.Pool
    bufferSize int
}

// NewAudioBufferPool creates a new buffer pool for audio frames
func NewAudioBufferPool(bufferSize int) *AudioBufferPool {
    return &AudioBufferPool{
        bufferSize: bufferSize,
        pool: sync.Pool{
            New: func() interface{} {
                // Pre-allocate buffer with specified size
                return make([]byte, bufferSize)
                return make([]byte, 0, bufferSize)
            },
        },
    }
}

// Get retrieves a buffer from the pool
func (p *AudioBufferPool) Get() []byte {
    if buf := p.pool.Get(); buf != nil {
        return *buf.(*[]byte)
    }
    return make([]byte, 0, 1500) // fallback if pool is empty
    return make([]byte, 0, p.bufferSize)
}

// Put returns a buffer to the pool
func (p *AudioBufferPool) Put(buf []byte) {
    // Reset length but keep capacity for reuse
    if cap(buf) >= 1500 { // Only pool buffers of reasonable size
    if cap(buf) >= p.bufferSize {
        resetBuf := buf[:0]
        p.pool.Put(&resetBuf)
    }
}

// Global buffer pools for different audio operations
var (
    // Pool for 1500-byte audio frame buffers (Opus max frame size)
    audioFramePool = NewAudioBufferPool(1500)

    // Pool for smaller control buffers
    audioFramePool   = NewAudioBufferPool(1500)
    audioControlPool = NewAudioBufferPool(64)
)

// GetAudioFrameBuffer gets a reusable buffer for audio frames
func GetAudioFrameBuffer() []byte {
    return audioFramePool.Get()
}

// PutAudioFrameBuffer returns a buffer to the frame pool
func PutAudioFrameBuffer(buf []byte) {
    audioFramePool.Put(buf)
}

// GetAudioControlBuffer gets a reusable buffer for control data
func GetAudioControlBuffer() []byte {
    return audioControlPool.Get()
}

// PutAudioControlBuffer returns a buffer to the control pool
func PutAudioControlBuffer(buf []byte) {
    audioControlPool.Put(buf)
}
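A minimal usage sketch for the pooled frame buffers above. GetAudioFrameBuffer and PutAudioFrameBuffer are the exported functions from this diff; the handleFrame caller and its forwarding step are assumptions for illustration only.

    // Sketch: pair every Get with a Put so buffers are actually reused.
    func handleFrame(frame []byte) {
        buf := GetAudioFrameBuffer() // zero-length slice with pooled capacity (1500 bytes)
        buf = append(buf, frame...)  // stays within capacity for frames <= 1500 bytes
        // ... hand buf to the encoder or IPC writer here ...
        PutAudioFrameBuffer(buf) // return the buffer; Put resets it to length 0 before pooling
    }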
@@ -0,0 +1,29 @@
package audio

import "time"

// MonitoringConfig contains configuration constants for audio monitoring
type MonitoringConfig struct {
    // MetricsUpdateInterval defines how often metrics are collected and broadcast
    MetricsUpdateInterval time.Duration
}

// DefaultMonitoringConfig returns the default monitoring configuration
func DefaultMonitoringConfig() MonitoringConfig {
    return MonitoringConfig{
        MetricsUpdateInterval: 1000 * time.Millisecond, // 1 second interval
    }
}

// Global monitoring configuration instance
var monitoringConfig = DefaultMonitoringConfig()

// GetMetricsUpdateInterval returns the current metrics update interval
func GetMetricsUpdateInterval() time.Duration {
    return monitoringConfig.MetricsUpdateInterval
}

// SetMetricsUpdateInterval sets the metrics update interval
func SetMetricsUpdateInterval(interval time.Duration) {
    monitoringConfig.MetricsUpdateInterval = interval
}
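For illustration, a caller that wants slower, synchronized updates could override the default before any monitors start. This is a sketch only: the init() placement and the 2-second value are arbitrary examples; SetMetricsUpdateInterval is the setter from the new file above.

    func init() {
        // Example only: broadcast metrics every 2 seconds instead of the 1-second default.
        SetMetricsUpdateInterval(2 * time.Second)
    }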
@@ -92,32 +92,26 @@ var (
    audioEventOnce        sync.Once
)

// initializeBroadcaster creates and initializes the audio event broadcaster
func initializeBroadcaster() {
    l := logging.GetDefaultLogger().With().Str("component", "audio-events").Logger()
    audioEventBroadcaster = &AudioEventBroadcaster{
        subscribers: make(map[string]*AudioEventSubscriber),
        logger:      &l,
    }

    // Start metrics broadcasting goroutine
    go audioEventBroadcaster.startMetricsBroadcasting()
}

// InitializeAudioEventBroadcaster initializes the global audio event broadcaster
func InitializeAudioEventBroadcaster() {
    audioEventOnce.Do(func() {
        l := logging.GetDefaultLogger().With().Str("component", "audio-events").Logger()
        audioEventBroadcaster = &AudioEventBroadcaster{
            subscribers: make(map[string]*AudioEventSubscriber),
            logger:      &l,
        }

        // Start metrics broadcasting goroutine
        go audioEventBroadcaster.startMetricsBroadcasting()
    })
    audioEventOnce.Do(initializeBroadcaster)
}

// GetAudioEventBroadcaster returns the singleton audio event broadcaster
func GetAudioEventBroadcaster() *AudioEventBroadcaster {
    audioEventOnce.Do(func() {
        l := logging.GetDefaultLogger().With().Str("component", "audio-events").Logger()
        audioEventBroadcaster = &AudioEventBroadcaster{
            subscribers: make(map[string]*AudioEventSubscriber),
            logger:      &l,
        }

        // Start metrics broadcasting goroutine
        go audioEventBroadcaster.startMetricsBroadcasting()
    })
    audioEventOnce.Do(initializeBroadcaster)
    return audioEventBroadcaster
}

@@ -157,22 +151,16 @@ func (aeb *AudioEventBroadcaster) Unsubscribe(connectionID string) {

// BroadcastAudioMuteChanged broadcasts audio mute state changes
func (aeb *AudioEventBroadcaster) BroadcastAudioMuteChanged(muted bool) {
    event := AudioEvent{
        Type: AudioEventMuteChanged,
        Data: AudioMuteData{Muted: muted},
    }
    event := createAudioEvent(AudioEventMuteChanged, AudioMuteData{Muted: muted})
    aeb.broadcast(event)
}

// BroadcastMicrophoneStateChanged broadcasts microphone state changes
func (aeb *AudioEventBroadcaster) BroadcastMicrophoneStateChanged(running, sessionActive bool) {
    event := AudioEvent{
        Type: AudioEventMicrophoneState,
        Data: MicrophoneStateData{
            Running:       running,
            SessionActive: sessionActive,
        },
    }
    event := createAudioEvent(AudioEventMicrophoneState, MicrophoneStateData{
        Running:       running,
        SessionActive: sessionActive,
    })
    aeb.broadcast(event)
}
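A short sketch of how a caller might combine the pieces above. The onMuteToggled wrapper is hypothetical; SetAudioMuted, GetAudioEventBroadcaster, and BroadcastAudioMuteChanged all appear in these diffs.

    func onMuteToggled(muted bool) {
        SetAudioMuted(muted)                                        // update the shared mute state
        GetAudioEventBroadcaster().BroadcastAudioMuteChanged(muted) // notify subscribed clients
    }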
@@ -216,38 +204,151 @@ func (aeb *AudioEventBroadcaster) sendInitialState(connectionID string) {
    aeb.sendCurrentMetrics(subscriber)
}

// convertAudioMetricsToEventData converts internal audio metrics to AudioMetricsData for events
func convertAudioMetricsToEventData(metrics AudioMetrics) AudioMetricsData {
    return AudioMetricsData{
        FramesReceived:  metrics.FramesReceived,
        FramesDropped:   metrics.FramesDropped,
        BytesProcessed:  metrics.BytesProcessed,
        LastFrameTime:   metrics.LastFrameTime.Format("2006-01-02T15:04:05.000Z"),
        ConnectionDrops: metrics.ConnectionDrops,
        AverageLatency:  metrics.AverageLatency.String(),
    }
}

// convertAudioMetricsToEventDataWithLatencyMs converts internal audio metrics to AudioMetricsData with millisecond latency formatting
func convertAudioMetricsToEventDataWithLatencyMs(metrics AudioMetrics) AudioMetricsData {
    return AudioMetricsData{
        FramesReceived:  metrics.FramesReceived,
        FramesDropped:   metrics.FramesDropped,
        BytesProcessed:  metrics.BytesProcessed,
        LastFrameTime:   metrics.LastFrameTime.Format("2006-01-02T15:04:05.000Z"),
        ConnectionDrops: metrics.ConnectionDrops,
        AverageLatency:  fmt.Sprintf("%.1fms", float64(metrics.AverageLatency.Nanoseconds())/1e6),
    }
}

// convertAudioInputMetricsToEventData converts internal audio input metrics to MicrophoneMetricsData for events
func convertAudioInputMetricsToEventData(metrics AudioInputMetrics) MicrophoneMetricsData {
    return MicrophoneMetricsData{
        FramesSent:      metrics.FramesSent,
        FramesDropped:   metrics.FramesDropped,
        BytesProcessed:  metrics.BytesProcessed,
        LastFrameTime:   metrics.LastFrameTime.Format("2006-01-02T15:04:05.000Z"),
        ConnectionDrops: metrics.ConnectionDrops,
        AverageLatency:  metrics.AverageLatency.String(),
    }
}

// convertAudioInputMetricsToEventDataWithLatencyMs converts internal audio input metrics to MicrophoneMetricsData with millisecond latency formatting
func convertAudioInputMetricsToEventDataWithLatencyMs(metrics AudioInputMetrics) MicrophoneMetricsData {
    return MicrophoneMetricsData{
        FramesSent:      metrics.FramesSent,
        FramesDropped:   metrics.FramesDropped,
        BytesProcessed:  metrics.BytesProcessed,
        LastFrameTime:   metrics.LastFrameTime.Format("2006-01-02T15:04:05.000Z"),
        ConnectionDrops: metrics.ConnectionDrops,
        AverageLatency:  fmt.Sprintf("%.1fms", float64(metrics.AverageLatency.Nanoseconds())/1e6),
    }
}

// convertProcessMetricsToEventData converts internal process metrics to ProcessMetricsData for events
func convertProcessMetricsToEventData(metrics ProcessMetrics, running bool) ProcessMetricsData {
    return ProcessMetricsData{
        PID:           metrics.PID,
        CPUPercent:    metrics.CPUPercent,
        MemoryRSS:     metrics.MemoryRSS,
        MemoryVMS:     metrics.MemoryVMS,
        MemoryPercent: metrics.MemoryPercent,
        Running:       running,
        ProcessName:   metrics.ProcessName,
    }
}

// createProcessMetricsData creates ProcessMetricsData from ProcessMetrics with running status
func createProcessMetricsData(metrics *ProcessMetrics, running bool, processName string) ProcessMetricsData {
    if metrics == nil {
        return ProcessMetricsData{
            PID:           0,
            CPUPercent:    0.0,
            MemoryRSS:     0,
            MemoryVMS:     0,
            MemoryPercent: 0.0,
            Running:       false,
            ProcessName:   processName,
        }
    }
    return ProcessMetricsData{
        PID:           metrics.PID,
        CPUPercent:    metrics.CPUPercent,
        MemoryRSS:     metrics.MemoryRSS,
        MemoryVMS:     metrics.MemoryVMS,
        MemoryPercent: metrics.MemoryPercent,
        Running:       running,
        ProcessName:   metrics.ProcessName,
    }
}

// getInactiveProcessMetrics returns ProcessMetricsData for an inactive audio input process
func getInactiveProcessMetrics() ProcessMetricsData {
    return createProcessMetricsData(nil, false, "audio-input-server")
}

// getActiveAudioInputSupervisor safely retrieves the audio input supervisor if session is active
func getActiveAudioInputSupervisor() *AudioInputSupervisor {
    sessionProvider := GetSessionProvider()
    if !sessionProvider.IsSessionActive() {
        return nil
    }

    inputManager := sessionProvider.GetAudioInputManager()
    if inputManager == nil {
        return nil
    }

    return inputManager.GetSupervisor()
}

// createAudioEvent creates an AudioEvent
func createAudioEvent(eventType AudioEventType, data interface{}) AudioEvent {
    return AudioEvent{
        Type: eventType,
        Data: data,
    }
}

func (aeb *AudioEventBroadcaster) getMicrophoneProcessMetrics() ProcessMetricsData {
    inputSupervisor := getActiveAudioInputSupervisor()
    if inputSupervisor == nil {
        return getInactiveProcessMetrics()
    }

    processMetrics := inputSupervisor.GetProcessMetrics()
    if processMetrics == nil {
        return getInactiveProcessMetrics()
    }

    // If process is running but CPU is 0%, it means we're waiting for the second sample
    // to calculate CPU percentage. Return metrics with correct running status.
    if inputSupervisor.IsRunning() && processMetrics.CPUPercent == 0.0 {
        return createProcessMetricsData(processMetrics, true, processMetrics.ProcessName)
    }

    // Subprocess is running, return actual metrics
    return createProcessMetricsData(processMetrics, inputSupervisor.IsRunning(), processMetrics.ProcessName)
}

// sendCurrentMetrics sends current audio and microphone metrics to a subscriber
func (aeb *AudioEventBroadcaster) sendCurrentMetrics(subscriber *AudioEventSubscriber) {
    // Send audio metrics
    audioMetrics := GetAudioMetrics()
    audioMetricsEvent := AudioEvent{
        Type: AudioEventMetricsUpdate,
        Data: AudioMetricsData{
            FramesReceived:  audioMetrics.FramesReceived,
            FramesDropped:   audioMetrics.FramesDropped,
            BytesProcessed:  audioMetrics.BytesProcessed,
            LastFrameTime:   audioMetrics.LastFrameTime.Format("2006-01-02T15:04:05.000Z"),
            ConnectionDrops: audioMetrics.ConnectionDrops,
            AverageLatency:  audioMetrics.AverageLatency.String(),
        },
    }
    audioMetricsEvent := createAudioEvent(AudioEventMetricsUpdate, convertAudioMetricsToEventData(audioMetrics))
    aeb.sendToSubscriber(subscriber, audioMetricsEvent)

    // Send audio process metrics
    if outputSupervisor := GetAudioOutputSupervisor(); outputSupervisor != nil {
        if processMetrics := outputSupervisor.GetProcessMetrics(); processMetrics != nil {
            audioProcessEvent := AudioEvent{
                Type: AudioEventProcessMetrics,
                Data: ProcessMetricsData{
                    PID:           processMetrics.PID,
                    CPUPercent:    processMetrics.CPUPercent,
                    MemoryRSS:     processMetrics.MemoryRSS,
                    MemoryVMS:     processMetrics.MemoryVMS,
                    MemoryPercent: processMetrics.MemoryPercent,
                    Running:       outputSupervisor.IsRunning(),
                    ProcessName:   processMetrics.ProcessName,
                },
            }
            audioProcessEvent := createAudioEvent(AudioEventProcessMetrics, convertProcessMetricsToEventData(*processMetrics, outputSupervisor.IsRunning()))
            aeb.sendToSubscriber(subscriber, audioProcessEvent)
        }
    }

@@ -257,45 +358,20 @@ func (aeb *AudioEventBroadcaster) sendCurrentMetrics(subscriber *AudioEventSubscriber) {
    if sessionProvider.IsSessionActive() {
        if inputManager := sessionProvider.GetAudioInputManager(); inputManager != nil {
            micMetrics := inputManager.GetMetrics()
            micMetricsEvent := AudioEvent{
                Type: AudioEventMicrophoneMetrics,
                Data: MicrophoneMetricsData{
                    FramesSent:      micMetrics.FramesSent,
                    FramesDropped:   micMetrics.FramesDropped,
                    BytesProcessed:  micMetrics.BytesProcessed,
                    LastFrameTime:   micMetrics.LastFrameTime.Format("2006-01-02T15:04:05.000Z"),
                    ConnectionDrops: micMetrics.ConnectionDrops,
                    AverageLatency:  micMetrics.AverageLatency.String(),
                },
            }
            micMetricsEvent := createAudioEvent(AudioEventMicrophoneMetrics, convertAudioInputMetricsToEventData(micMetrics))
            aeb.sendToSubscriber(subscriber, micMetricsEvent)
        }
    }

    // Send microphone process metrics
    if inputSupervisor := GetAudioInputIPCSupervisor(); inputSupervisor != nil {
        if processMetrics := inputSupervisor.GetProcessMetrics(); processMetrics != nil {
            micProcessEvent := AudioEvent{
                Type: AudioEventMicProcessMetrics,
                Data: ProcessMetricsData{
                    PID:           processMetrics.PID,
                    CPUPercent:    processMetrics.CPUPercent,
                    MemoryRSS:     processMetrics.MemoryRSS,
                    MemoryVMS:     processMetrics.MemoryVMS,
                    MemoryPercent: processMetrics.MemoryPercent,
                    Running:       inputSupervisor.IsRunning(),
                    ProcessName:   processMetrics.ProcessName,
                },
            }
            aeb.sendToSubscriber(subscriber, micProcessEvent)
        }
    }
    // Send microphone process metrics (always send, even when subprocess is not running)
    micProcessEvent := createAudioEvent(AudioEventMicProcessMetrics, aeb.getMicrophoneProcessMetrics())
    aeb.sendToSubscriber(subscriber, micProcessEvent)
}

// startMetricsBroadcasting starts a goroutine that periodically broadcasts metrics
func (aeb *AudioEventBroadcaster) startMetricsBroadcasting() {
    // Use 1-second interval to match Connection Stats sidebar frequency for smooth histogram progression
    ticker := time.NewTicker(1 * time.Second)
    // Use centralized interval to match process monitor frequency for synchronized metrics
    ticker := time.NewTicker(GetMetricsUpdateInterval())
    defer ticker.Stop()

    for range ticker.C {

@@ -330,17 +406,7 @@ func (aeb *AudioEventBroadcaster) startMetricsBroadcasting() {

        // Broadcast audio metrics
        audioMetrics := GetAudioMetrics()
        audioMetricsEvent := AudioEvent{
            Type: AudioEventMetricsUpdate,
            Data: AudioMetricsData{
                FramesReceived:  audioMetrics.FramesReceived,
                FramesDropped:   audioMetrics.FramesDropped,
                BytesProcessed:  audioMetrics.BytesProcessed,
                LastFrameTime:   audioMetrics.LastFrameTime.Format("2006-01-02T15:04:05.000Z"),
                ConnectionDrops: audioMetrics.ConnectionDrops,
                AverageLatency:  fmt.Sprintf("%.1fms", float64(audioMetrics.AverageLatency.Nanoseconds())/1e6),
            },
        }
        audioMetricsEvent := createAudioEvent(AudioEventMetricsUpdate, convertAudioMetricsToEventDataWithLatencyMs(audioMetrics))
        aeb.broadcast(audioMetricsEvent)

        // Broadcast microphone metrics if available using session provider

@@ -348,17 +414,7 @@ func (aeb *AudioEventBroadcaster) startMetricsBroadcasting() {
        if sessionProvider.IsSessionActive() {
            if inputManager := sessionProvider.GetAudioInputManager(); inputManager != nil {
                micMetrics := inputManager.GetMetrics()
                micMetricsEvent := AudioEvent{
                    Type: AudioEventMicrophoneMetrics,
                    Data: MicrophoneMetricsData{
                        FramesSent:      micMetrics.FramesSent,
                        FramesDropped:   micMetrics.FramesDropped,
                        BytesProcessed:  micMetrics.BytesProcessed,
                        LastFrameTime:   micMetrics.LastFrameTime.Format("2006-01-02T15:04:05.000Z"),
                        ConnectionDrops: micMetrics.ConnectionDrops,
                        AverageLatency:  fmt.Sprintf("%.1fms", float64(micMetrics.AverageLatency.Nanoseconds())/1e6),
                    },
                }
                micMetricsEvent := createAudioEvent(AudioEventMicrophoneMetrics, convertAudioInputMetricsToEventDataWithLatencyMs(micMetrics))
                aeb.broadcast(micMetricsEvent)
            }
        }

@@ -366,40 +422,14 @@ func (aeb *AudioEventBroadcaster) startMetricsBroadcasting() {
        // Broadcast audio process metrics
        if outputSupervisor := GetAudioOutputSupervisor(); outputSupervisor != nil {
            if processMetrics := outputSupervisor.GetProcessMetrics(); processMetrics != nil {
                audioProcessEvent := AudioEvent{
                    Type: AudioEventProcessMetrics,
                    Data: ProcessMetricsData{
                        PID:           processMetrics.PID,
                        CPUPercent:    processMetrics.CPUPercent,
                        MemoryRSS:     processMetrics.MemoryRSS,
                        MemoryVMS:     processMetrics.MemoryVMS,
                        MemoryPercent: processMetrics.MemoryPercent,
                        Running:       outputSupervisor.IsRunning(),
                        ProcessName:   processMetrics.ProcessName,
                    },
                }
                audioProcessEvent := createAudioEvent(AudioEventProcessMetrics, convertProcessMetricsToEventData(*processMetrics, outputSupervisor.IsRunning()))
                aeb.broadcast(audioProcessEvent)
            }
        }

        // Broadcast microphone process metrics
        if inputSupervisor := GetAudioInputIPCSupervisor(); inputSupervisor != nil {
            if processMetrics := inputSupervisor.GetProcessMetrics(); processMetrics != nil {
                micProcessEvent := AudioEvent{
                    Type: AudioEventMicProcessMetrics,
                    Data: ProcessMetricsData{
                        PID:           processMetrics.PID,
                        CPUPercent:    processMetrics.CPUPercent,
                        MemoryRSS:     processMetrics.MemoryRSS,
                        MemoryVMS:     processMetrics.MemoryVMS,
                        MemoryPercent: processMetrics.MemoryPercent,
                        Running:       inputSupervisor.IsRunning(),
                        ProcessName:   processMetrics.ProcessName,
                    },
                }
                aeb.broadcast(micProcessEvent)
            }
        }
        // Broadcast microphone process metrics (always broadcast, even when subprocess is not running)
        micProcessEvent := createAudioEvent(AudioEventMicProcessMetrics, aeb.getMicrophoneProcessMetrics())
        aeb.broadcast(micProcessEvent)
    }
}
@@ -1,6 +1,7 @@
package audio

import (
    "context"
    "sync/atomic"
    "time"

@@ -10,51 +11,59 @@ import (

// AudioInputIPCManager manages microphone input using IPC when enabled
type AudioInputIPCManager struct {
    // metrics MUST be first for ARM32 alignment (contains int64 fields)
    metrics AudioInputMetrics

    supervisor *AudioInputSupervisor
    logger     zerolog.Logger
    running    int32
    ctx        context.Context
    cancel     context.CancelFunc
}

// NewAudioInputIPCManager creates a new IPC-based audio input manager
func NewAudioInputIPCManager() *AudioInputIPCManager {
    ctx, cancel := context.WithCancel(context.Background())
    return &AudioInputIPCManager{
        supervisor: NewAudioInputSupervisor(),
        logger:     logging.GetDefaultLogger().With().Str("component", "audio-input-ipc").Logger(),
        ctx:        ctx,
        cancel:     cancel,
    }
}

// Start starts the IPC-based audio input system
func (aim *AudioInputIPCManager) Start() error {
    if !atomic.CompareAndSwapInt32(&aim.running, 0, 1) {
        return nil // Already running
        return nil
    }

    aim.logger.Info().Msg("Starting IPC-based audio input system")

    // Start the supervisor which will launch the subprocess
    err := aim.supervisor.Start()
    if err != nil {
        atomic.StoreInt32(&aim.running, 0)
        aim.logger.Error().Err(err).Msg("Failed to start audio input supervisor")
        return err
    }

    // Send initial configuration
    config := InputIPCConfig{
        SampleRate: 48000,
        Channels:   2,
        FrameSize:  960, // 20ms at 48kHz
        FrameSize:  960,
    }

    // Wait briefly for the subprocess to be ready (reduced from 1 second)
    time.Sleep(200 * time.Millisecond)
    // Wait with timeout for subprocess readiness
    select {
    case <-time.After(200 * time.Millisecond):
    case <-aim.ctx.Done():
        aim.supervisor.Stop()
        atomic.StoreInt32(&aim.running, 0)
        return aim.ctx.Err()
    }

    err = aim.supervisor.SendConfig(config)
    if err != nil {
        aim.logger.Warn().Err(err).Msg("Failed to send initial config to audio input server")
        // Don't fail startup for config errors
        aim.logger.Warn().Err(err).Msg("Failed to send initial config, will retry later")
    }

    aim.logger.Info().Msg("IPC-based audio input system started")

@@ -64,14 +73,12 @@ func (aim *AudioInputIPCManager) Start() error {

// Stop stops the IPC-based audio input system
func (aim *AudioInputIPCManager) Stop() {
    if !atomic.CompareAndSwapInt32(&aim.running, 1, 0) {
        return // Already stopped
        return
    }

    aim.logger.Info().Msg("Stopping IPC-based audio input system")

    // Stop the supervisor
    aim.cancel()
    aim.supervisor.Stop()

    aim.logger.Info().Msg("IPC-based audio input system stopped")
}
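An illustrative lifecycle for the IPC input manager above (a sketch; the surrounding session code that would own the manager is assumed, while NewAudioInputIPCManager, Start, and Stop come from this diff):

    func exampleInputLifecycle() error {
        mgr := NewAudioInputIPCManager()
        if err := mgr.Start(); err != nil { // launches the subprocess via the supervisor
            return err
        }
        defer mgr.Stop() // cancels the context and stops the supervisor
        // ... forward microphone frames through mgr while the session is active ...
        return nil
    }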
@@ -2,6 +2,7 @@ package audio

import (
    "sync"
    "sync/atomic"
    "time"

    "github.com/prometheus/client_golang/prometheus"

@@ -226,7 +227,7 @@ var (

    // Metrics update tracking
    metricsUpdateMutex sync.RWMutex
    lastMetricsUpdate  time.Time
    lastMetricsUpdate  int64

    // Counter value tracking (since prometheus counters don't have Get() method)
    audioFramesReceivedValue int64

@@ -241,28 +242,24 @@ var (

// UpdateAudioMetrics updates Prometheus metrics with current audio data
func UpdateAudioMetrics(metrics AudioMetrics) {
    metricsUpdateMutex.Lock()
    defer metricsUpdateMutex.Unlock()

    // Update counters with delta values
    if metrics.FramesReceived > audioFramesReceivedValue {
        audioFramesReceivedTotal.Add(float64(metrics.FramesReceived - audioFramesReceivedValue))
        audioFramesReceivedValue = metrics.FramesReceived
    oldReceived := atomic.SwapInt64(&audioFramesReceivedValue, metrics.FramesReceived)
    if metrics.FramesReceived > oldReceived {
        audioFramesReceivedTotal.Add(float64(metrics.FramesReceived - oldReceived))
    }

    if metrics.FramesDropped > audioFramesDroppedValue {
        audioFramesDroppedTotal.Add(float64(metrics.FramesDropped - audioFramesDroppedValue))
        audioFramesDroppedValue = metrics.FramesDropped
    oldDropped := atomic.SwapInt64(&audioFramesDroppedValue, metrics.FramesDropped)
    if metrics.FramesDropped > oldDropped {
        audioFramesDroppedTotal.Add(float64(metrics.FramesDropped - oldDropped))
    }

    if metrics.BytesProcessed > audioBytesProcessedValue {
        audioBytesProcessedTotal.Add(float64(metrics.BytesProcessed - audioBytesProcessedValue))
        audioBytesProcessedValue = metrics.BytesProcessed
    oldBytes := atomic.SwapInt64(&audioBytesProcessedValue, metrics.BytesProcessed)
    if metrics.BytesProcessed > oldBytes {
        audioBytesProcessedTotal.Add(float64(metrics.BytesProcessed - oldBytes))
    }

    if metrics.ConnectionDrops > audioConnectionDropsValue {
        audioConnectionDropsTotal.Add(float64(metrics.ConnectionDrops - audioConnectionDropsValue))
        audioConnectionDropsValue = metrics.ConnectionDrops
    oldDrops := atomic.SwapInt64(&audioConnectionDropsValue, metrics.ConnectionDrops)
    if metrics.ConnectionDrops > oldDrops {
        audioConnectionDropsTotal.Add(float64(metrics.ConnectionDrops - oldDrops))
    }

    // Update gauges

@@ -271,33 +268,29 @@ func UpdateAudioMetrics(metrics AudioMetrics) {
        audioLastFrameTimestamp.Set(float64(metrics.LastFrameTime.Unix()))
    }

    lastMetricsUpdate = time.Now()
    atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix())
}

// UpdateMicrophoneMetrics updates Prometheus metrics with current microphone data
func UpdateMicrophoneMetrics(metrics AudioInputMetrics) {
    metricsUpdateMutex.Lock()
    defer metricsUpdateMutex.Unlock()

    // Update counters with delta values
    if metrics.FramesSent > micFramesSentValue {
        microphoneFramesSentTotal.Add(float64(metrics.FramesSent - micFramesSentValue))
        micFramesSentValue = metrics.FramesSent
    oldSent := atomic.SwapInt64(&micFramesSentValue, metrics.FramesSent)
    if metrics.FramesSent > oldSent {
        microphoneFramesSentTotal.Add(float64(metrics.FramesSent - oldSent))
    }

    if metrics.FramesDropped > micFramesDroppedValue {
        microphoneFramesDroppedTotal.Add(float64(metrics.FramesDropped - micFramesDroppedValue))
        micFramesDroppedValue = metrics.FramesDropped
    oldDropped := atomic.SwapInt64(&micFramesDroppedValue, metrics.FramesDropped)
    if metrics.FramesDropped > oldDropped {
        microphoneFramesDroppedTotal.Add(float64(metrics.FramesDropped - oldDropped))
    }

    if metrics.BytesProcessed > micBytesProcessedValue {
        microphoneBytesProcessedTotal.Add(float64(metrics.BytesProcessed - micBytesProcessedValue))
        micBytesProcessedValue = metrics.BytesProcessed
    oldBytes := atomic.SwapInt64(&micBytesProcessedValue, metrics.BytesProcessed)
    if metrics.BytesProcessed > oldBytes {
        microphoneBytesProcessedTotal.Add(float64(metrics.BytesProcessed - oldBytes))
    }

    if metrics.ConnectionDrops > micConnectionDropsValue {
        microphoneConnectionDropsTotal.Add(float64(metrics.ConnectionDrops - micConnectionDropsValue))
        micConnectionDropsValue = metrics.ConnectionDrops
    oldDrops := atomic.SwapInt64(&micConnectionDropsValue, metrics.ConnectionDrops)
    if metrics.ConnectionDrops > oldDrops {
        microphoneConnectionDropsTotal.Add(float64(metrics.ConnectionDrops - oldDrops))
    }

    // Update gauges

@@ -306,7 +299,7 @@ func UpdateMicrophoneMetrics(metrics AudioInputMetrics) {
        microphoneLastFrameTimestamp.Set(float64(metrics.LastFrameTime.Unix()))
    }

    lastMetricsUpdate = time.Now()
    atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix())
}

// UpdateAudioProcessMetrics updates Prometheus metrics with audio subprocess data

@@ -324,7 +317,7 @@ func UpdateAudioProcessMetrics(metrics ProcessMetrics, isRunning bool) {
        audioProcessRunning.Set(0)
    }

    lastMetricsUpdate = time.Now()
    atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix())
}

// UpdateMicrophoneProcessMetrics updates Prometheus metrics with microphone subprocess data

@@ -342,7 +335,7 @@ func UpdateMicrophoneProcessMetrics(metrics ProcessMetrics, isRunning bool) {
        microphoneProcessRunning.Set(0)
    }

    lastMetricsUpdate = time.Now()
    atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix())
}

// UpdateAudioConfigMetrics updates Prometheus metrics with audio configuration

@@ -355,7 +348,7 @@ func UpdateAudioConfigMetrics(config AudioConfig) {
    audioConfigSampleRate.Set(float64(config.SampleRate))
    audioConfigChannels.Set(float64(config.Channels))

    lastMetricsUpdate = time.Now()
    atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix())
}

// UpdateMicrophoneConfigMetrics updates Prometheus metrics with microphone configuration

@@ -368,14 +361,13 @@ func UpdateMicrophoneConfigMetrics(config AudioConfig) {
    microphoneConfigSampleRate.Set(float64(config.SampleRate))
    microphoneConfigChannels.Set(float64(config.Channels))

    lastMetricsUpdate = time.Now()
    atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix())
}

// GetLastMetricsUpdate returns the timestamp of the last metrics update
func GetLastMetricsUpdate() time.Time {
    metricsUpdateMutex.RLock()
    defer metricsUpdateMutex.RUnlock()
    return lastMetricsUpdate
    timestamp := atomic.LoadInt64(&lastMetricsUpdate)
    return time.Unix(timestamp, 0)
}

// StartMetricsUpdater starts a goroutine that periodically updates Prometheus metrics
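The counter updates above all share the same swap-and-delta shape; a generic sketch of the idea (the names here are placeholders rather than repository symbols, and it relies on the sync/atomic import already present in this file):

    var lastSeen int64 // last absolute counter value already exported

    func exportCounterDelta(current int64, add func(float64)) {
        old := atomic.SwapInt64(&lastSeen, current) // publish the new absolute value atomically
        if current > old {
            add(float64(current - old)) // only the positive delta feeds the Prometheus counter
        }
    }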
@@ -6,43 +6,33 @@ import (
    "unsafe"
)

// MicrophoneContentionManager provides optimized microphone operation locking
// with reduced contention using atomic operations and conditional locking
// MicrophoneContentionManager manages microphone access with cooldown periods
type MicrophoneContentionManager struct {
    // Atomic fields (must be 64-bit aligned on 32-bit systems)
    lastOpNano    int64 // Unix nanoseconds of last operation
    cooldownNanos int64 // Cooldown duration in nanoseconds
    operationID   int64 // Incremental operation ID for tracking

    // Lock-free state flags (using atomic.Pointer for lock-free updates)
    lockPtr unsafe.Pointer // *sync.Mutex - conditionally allocated
    lastOpNano    int64
    cooldownNanos int64
    operationID   int64
    lockPtr       unsafe.Pointer
}

// NewMicrophoneContentionManager creates a new microphone contention manager
func NewMicrophoneContentionManager(cooldown time.Duration) *MicrophoneContentionManager {
    return &MicrophoneContentionManager{
        cooldownNanos: int64(cooldown),
    }
}

// OperationResult represents the result of attempting a microphone operation
type OperationResult struct {
    Allowed           bool
    RemainingCooldown time.Duration
    OperationID       int64
}

// TryOperation attempts to perform a microphone operation with optimized contention handling
func (mcm *MicrophoneContentionManager) TryOperation() OperationResult {
    now := time.Now().UnixNano()
    cooldown := atomic.LoadInt64(&mcm.cooldownNanos)

    // Fast path: check if we're clearly outside cooldown period using atomic read
    lastOp := atomic.LoadInt64(&mcm.lastOpNano)
    elapsed := now - lastOp

    if elapsed >= cooldown {
        // Attempt atomic update without locking
        if atomic.CompareAndSwapInt64(&mcm.lastOpNano, lastOp, now) {
            opID := atomic.AddInt64(&mcm.operationID, 1)
            return OperationResult{

@@ -51,16 +41,10 @@ func (mcm *MicrophoneContentionManager) TryOperation() OperationResult {
                OperationID: opID,
            }
        }
    }

    // Slow path: potential contention, check remaining cooldown
    currentLastOp := atomic.LoadInt64(&mcm.lastOpNano)
    currentElapsed := now - currentLastOp

    if currentElapsed >= cooldown {
        // Race condition: another operation might have updated lastOpNano
        // Try once more with CAS
        if atomic.CompareAndSwapInt64(&mcm.lastOpNano, currentLastOp, now) {
    // Retry once if CAS failed
    lastOp = atomic.LoadInt64(&mcm.lastOpNano)
    elapsed = now - lastOp
    if elapsed >= cooldown && atomic.CompareAndSwapInt64(&mcm.lastOpNano, lastOp, now) {
        opID := atomic.AddInt64(&mcm.operationID, 1)
        return OperationResult{
            Allowed: true,

@@ -68,12 +52,9 @@ func (mcm *MicrophoneContentionManager) TryOperation() OperationResult {
            OperationID: opID,
        }
    }
    // If CAS failed, fall through to cooldown calculation
    currentLastOp = atomic.LoadInt64(&mcm.lastOpNano)
    currentElapsed = now - currentLastOp
    }

    remaining := time.Duration(cooldown - currentElapsed)
    remaining := time.Duration(cooldown - elapsed)
    if remaining < 0 {
        remaining = 0
    }

@@ -85,17 +66,14 @@ func (mcm *MicrophoneContentionManager) TryOperation() OperationResult {
    }
}

// SetCooldown updates the cooldown duration atomically
func (mcm *MicrophoneContentionManager) SetCooldown(cooldown time.Duration) {
    atomic.StoreInt64(&mcm.cooldownNanos, int64(cooldown))
}

// GetCooldown returns the current cooldown duration
func (mcm *MicrophoneContentionManager) GetCooldown() time.Duration {
    return time.Duration(atomic.LoadInt64(&mcm.cooldownNanos))
}

// GetLastOperationTime returns the time of the last operation
func (mcm *MicrophoneContentionManager) GetLastOperationTime() time.Time {
    nanos := atomic.LoadInt64(&mcm.lastOpNano)
    if nanos == 0 {

@@ -104,55 +82,44 @@ func (mcm *MicrophoneContentionManager) GetLastOperationTime() time.Time {
    return time.Unix(0, nanos)
}

// GetOperationCount returns the total number of successful operations
func (mcm *MicrophoneContentionManager) GetOperationCount() int64 {
    return atomic.LoadInt64(&mcm.operationID)
}

// Reset resets the contention manager state
func (mcm *MicrophoneContentionManager) Reset() {
    atomic.StoreInt64(&mcm.lastOpNano, 0)
    atomic.StoreInt64(&mcm.operationID, 0)
}

// Global instance for microphone contention management
var (
    globalMicContentionManager unsafe.Pointer // *MicrophoneContentionManager
    globalMicContentionManager unsafe.Pointer
    micContentionInitialized   int32
)

// GetMicrophoneContentionManager returns the global microphone contention manager
func GetMicrophoneContentionManager() *MicrophoneContentionManager {
    ptr := atomic.LoadPointer(&globalMicContentionManager)
    if ptr != nil {
        return (*MicrophoneContentionManager)(ptr)
    }

    // Initialize on first use
    if atomic.CompareAndSwapInt32(&micContentionInitialized, 0, 1) {
        manager := NewMicrophoneContentionManager(200 * time.Millisecond)
        atomic.StorePointer(&globalMicContentionManager, unsafe.Pointer(manager))
        return manager
    }

    // Another goroutine initialized it, try again
    ptr = atomic.LoadPointer(&globalMicContentionManager)
    if ptr != nil {
        return (*MicrophoneContentionManager)(ptr)
    }

    // Fallback: create a new manager (should rarely happen)
    return NewMicrophoneContentionManager(200 * time.Millisecond)
}

// TryMicrophoneOperation provides a convenient global function for microphone operations
func TryMicrophoneOperation() OperationResult {
    manager := GetMicrophoneContentionManager()
    return manager.TryOperation()
    return GetMicrophoneContentionManager().TryOperation()
}

// SetMicrophoneCooldown updates the global microphone cooldown
func SetMicrophoneCooldown(cooldown time.Duration) {
    manager := GetMicrophoneContentionManager()
    manager.SetCooldown(cooldown)
    GetMicrophoneContentionManager().SetCooldown(cooldown)
}
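A sketch of how a handler might consult the global cooldown gate before touching the microphone. The tryStartMicrophone wrapper and its fmt import are assumptions; TryMicrophoneOperation and OperationResult come from the code above.

    func tryStartMicrophone() error {
        result := TryMicrophoneOperation()
        if !result.Allowed {
            // Still inside the cooldown window; tell the caller how long to wait.
            return fmt.Errorf("microphone busy, retry in %v", result.RemainingCooldown)
        }
        // ... perform the actual start operation here ...
        return nil
    }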
@@ -0,0 +1,71 @@
package audio

import (
    "context"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/jetkvm/kvm/internal/logging"
)

// RunAudioOutputServer runs the audio output server subprocess
// This should be called from main() when the subprocess is detected
func RunAudioOutputServer() error {
    logger := logging.GetDefaultLogger().With().Str("component", "audio-output-server").Logger()
    logger.Info().Msg("Starting audio output server subprocess")

    // Create audio server
    server, err := NewAudioServer()
    if err != nil {
        logger.Error().Err(err).Msg("failed to create audio server")
        return err
    }
    defer server.Close()

    // Start accepting connections
    if err := server.Start(); err != nil {
        logger.Error().Err(err).Msg("failed to start audio server")
        return err
    }

    // Initialize audio processing
    err = StartNonBlockingAudioStreaming(func(frame []byte) {
        if err := server.SendFrame(frame); err != nil {
            logger.Warn().Err(err).Msg("failed to send audio frame")
            RecordFrameDropped()
        }
    })
    if err != nil {
        logger.Error().Err(err).Msg("failed to start audio processing")
        return err
    }

    logger.Info().Msg("Audio output server started, waiting for connections")

    // Set up signal handling for graceful shutdown
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    sigChan := make(chan os.Signal, 1)
    signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

    // Wait for shutdown signal
    select {
    case sig := <-sigChan:
        logger.Info().Str("signal", sig.String()).Msg("Received shutdown signal")
    case <-ctx.Done():
        logger.Info().Msg("Context cancelled")
    }

    // Graceful shutdown
    logger.Info().Msg("Shutting down audio output server")
    StopNonBlockingAudioStreaming()

    // Give some time for cleanup
    time.Sleep(100 * time.Millisecond)

    logger.Info().Msg("Audio output server subprocess stopped")
    return nil
}
@@ -13,6 +13,28 @@ import (
    "github.com/rs/zerolog"
)

// Constants for process monitoring
const (
    // System constants
    pageSize          = 4096
    maxCPUPercent     = 100.0
    minCPUPercent     = 0.01
    defaultClockTicks = 250.0 // Common for embedded ARM systems
    defaultMemoryGB   = 8

    // Monitoring thresholds
    maxWarmupSamples    = 3
    warmupCPUSamples    = 2
    logThrottleInterval = 10

    // Channel buffer size
    metricsChannelBuffer = 100

    // Clock tick detection ranges
    minValidClockTicks = 50
    maxValidClockTicks = 1000
)

// ProcessMetrics represents CPU and memory usage metrics for a process
type ProcessMetrics struct {
    PID int `json:"pid"`

@@ -24,7 +46,6 @@ type ProcessMetrics struct {
    ProcessName string `json:"process_name"`
}

// ProcessMonitor monitors CPU and memory usage of processes
type ProcessMonitor struct {
    logger zerolog.Logger
    mutex  sync.RWMutex

@@ -33,15 +54,20 @@ type ProcessMonitor struct {
    stopChan       chan struct{}
    metricsChan    chan ProcessMetrics
    updateInterval time.Duration
    totalMemory    int64
    memoryOnce     sync.Once
    clockTicks     float64
    clockTicksOnce sync.Once
}

// processState tracks the state needed for CPU calculation
type processState struct {
    name         string
    lastCPUTime  int64
    lastSysTime  int64
    lastUserTime int64
    lastSample   time.Time
    name          string
    lastCPUTime   int64
    lastSysTime   int64
    lastUserTime  int64
    lastSample    time.Time
    warmupSamples int
}

// NewProcessMonitor creates a new process monitor

@@ -50,8 +76,8 @@ func NewProcessMonitor() *ProcessMonitor {
        logger:         logging.GetDefaultLogger().With().Str("component", "process-monitor").Logger(),
        monitoredPIDs:  make(map[int]*processState),
        stopChan:       make(chan struct{}),
        metricsChan:    make(chan ProcessMetrics, 100),
        updateInterval: 2 * time.Second, // Update every 2 seconds
        metricsChan:    make(chan ProcessMetrics, metricsChannelBuffer),
        updateInterval: GetMetricsUpdateInterval(),
    }
}

@@ -138,30 +164,33 @@ func (pm *ProcessMonitor) monitorLoop() {
    }
}

// collectAllMetrics collects metrics for all monitored processes
func (pm *ProcessMonitor) collectAllMetrics() {
    pm.mutex.RLock()
    pids := make(map[int]*processState)
    pidsToCheck := make([]int, 0, len(pm.monitoredPIDs))
    states := make([]*processState, 0, len(pm.monitoredPIDs))
    for pid, state := range pm.monitoredPIDs {
        pids[pid] = state
        pidsToCheck = append(pidsToCheck, pid)
        states = append(states, state)
    }
    pm.mutex.RUnlock()

    for pid, state := range pids {
        if metric, err := pm.collectMetrics(pid, state); err == nil {
    deadPIDs := make([]int, 0)
    for i, pid := range pidsToCheck {
        if metric, err := pm.collectMetrics(pid, states[i]); err == nil {
            select {
            case pm.metricsChan <- metric:
            default:
                // Channel full, skip this metric
            }
        } else {
            // Process might have died, remove it
            pm.RemoveProcess(pid)
            deadPIDs = append(deadPIDs, pid)
        }
    }

    for _, pid := range deadPIDs {
        pm.RemoveProcess(pid)
    }
}

// collectMetrics collects metrics for a specific process
func (pm *ProcessMonitor) collectMetrics(pid int, state *processState) (ProcessMetrics, error) {
    now := time.Now()
    metric := ProcessMetrics{

@@ -170,49 +199,33 @@ func (pm *ProcessMonitor) collectMetrics(pid int, state *processState) (ProcessMetrics, error) {
        ProcessName: state.name,
    }

    // Read /proc/[pid]/stat for CPU and memory info
    statPath := fmt.Sprintf("/proc/%d/stat", pid)
    statData, err := os.ReadFile(statPath)
    if err != nil {
        return metric, fmt.Errorf("failed to read stat file: %w", err)
        return metric, err
    }

    // Parse stat file
    fields := strings.Fields(string(statData))
    if len(fields) < 24 {
        return metric, fmt.Errorf("invalid stat file format")
        return metric, fmt.Errorf("invalid stat format")
    }

    // Extract CPU times (fields 13, 14 are utime, stime in clock ticks)
    utime, _ := strconv.ParseInt(fields[13], 10, 64)
    stime, _ := strconv.ParseInt(fields[14], 10, 64)
    totalCPUTime := utime + stime

    // Extract memory info (field 22 is vsize, field 23 is rss in pages)
    vsize, _ := strconv.ParseInt(fields[22], 10, 64)
    rss, _ := strconv.ParseInt(fields[23], 10, 64)

    // Convert RSS from pages to bytes (assuming 4KB pages)
    pageSize := int64(4096)
    metric.MemoryRSS = rss * pageSize
    metric.MemoryVMS = vsize

    // Calculate CPU percentage
    if !state.lastSample.IsZero() {
        timeDelta := now.Sub(state.lastSample).Seconds()
        cpuDelta := float64(totalCPUTime - state.lastCPUTime)
    metric.CPUPercent = pm.calculateCPUPercent(totalCPUTime, state, now)

        // Convert from clock ticks to seconds (assuming 100 Hz)
        clockTicks := 100.0
        cpuSeconds := cpuDelta / clockTicks

        if timeDelta > 0 {
            metric.CPUPercent = (cpuSeconds / timeDelta) * 100.0
            // Cap CPU percentage at 100% to handle multi-core usage
            if metric.CPUPercent > 100.0 {
                metric.CPUPercent = 100.0
            }
        }
    // Increment warmup counter
    if state.warmupSamples < maxWarmupSamples {
        state.warmupSamples++
    }

    // Calculate memory percentage (RSS / total system memory)

@@ -229,28 +242,127 @@ func (pm *ProcessMonitor) collectMetrics(pid int, state *processState) (ProcessMetrics, error) {
    return metric, nil
}

// getTotalMemory returns total system memory in bytes
func (pm *ProcessMonitor) getTotalMemory() int64 {
    file, err := os.Open("/proc/meminfo")
    if err != nil {
        return 0
// calculateCPUPercent calculates CPU percentage for a process
func (pm *ProcessMonitor) calculateCPUPercent(totalCPUTime int64, state *processState, now time.Time) float64 {
    if state.lastSample.IsZero() {
        // First sample - initialize baseline
        state.warmupSamples = 0
        return 0.0
    }
    defer file.Close()

    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        line := scanner.Text()
        if strings.HasPrefix(line, "MemTotal:") {
            fields := strings.Fields(line)
            if len(fields) >= 2 {
                if kb, err := strconv.ParseInt(fields[1], 10, 64); err == nil {
                    return kb * 1024 // Convert KB to bytes
    timeDelta := now.Sub(state.lastSample).Seconds()
    cpuDelta := float64(totalCPUTime - state.lastCPUTime)

    if timeDelta <= 0 {
        return 0.0
    }

    if cpuDelta > 0 {
        // Convert from clock ticks to seconds using actual system clock ticks
        clockTicks := pm.getClockTicks()
        cpuSeconds := cpuDelta / clockTicks
        cpuPercent := (cpuSeconds / timeDelta) * 100.0

        // Apply bounds
        if cpuPercent > maxCPUPercent {
            cpuPercent = maxCPUPercent
        }
        if cpuPercent < minCPUPercent {
            cpuPercent = minCPUPercent
        }

        return cpuPercent
    }

    // No CPU delta - process was idle
    if state.warmupSamples < warmupCPUSamples {
        // During warmup, provide a small non-zero value to indicate process is alive
        return minCPUPercent
    }

    return 0.0
}

func (pm *ProcessMonitor) getClockTicks() float64 {
    pm.clockTicksOnce.Do(func() {
        // Try to detect actual clock ticks from kernel boot parameters or /proc/stat
        if data, err := os.ReadFile("/proc/cmdline"); err == nil {
            // Look for HZ parameter in kernel command line
            cmdline := string(data)
            if strings.Contains(cmdline, "HZ=") {
                fields := strings.Fields(cmdline)
                for _, field := range fields {
                    if strings.HasPrefix(field, "HZ=") {
                        if hz, err := strconv.ParseFloat(field[3:], 64); err == nil && hz > 0 {
                            pm.clockTicks = hz
                            return
                        }
                    }
                }
            }
        }

        // Try reading from /proc/timer_list for more accurate detection
        if data, err := os.ReadFile("/proc/timer_list"); err == nil {
            timer := string(data)
            // Look for tick device frequency
            lines := strings.Split(timer, "\n")
            for _, line := range lines {
                if strings.Contains(line, "tick_period:") {
                    fields := strings.Fields(line)
                    if len(fields) >= 2 {
                        if period, err := strconv.ParseInt(fields[1], 10, 64); err == nil && period > 0 {
                            // Convert nanoseconds to Hz
                            hz := 1000000000.0 / float64(period)
                            if hz >= minValidClockTicks && hz <= maxValidClockTicks {
                                pm.clockTicks = hz
                                return
                            }
                        }
                    }
                }
            }
        }

        // Fallback: Most embedded ARM systems (like jetKVM) use 250 Hz or 1000 Hz
        // rather than the traditional 100 Hz
        pm.clockTicks = defaultClockTicks
        pm.logger.Warn().Float64("clock_ticks", pm.clockTicks).Msg("Using fallback clock ticks value")

        // Log successful detection for non-fallback values
        if pm.clockTicks != defaultClockTicks {
            pm.logger.Info().Float64("clock_ticks", pm.clockTicks).Msg("Detected system clock ticks")
        }
    })
    return pm.clockTicks
}

func (pm *ProcessMonitor) getTotalMemory() int64 {
    pm.memoryOnce.Do(func() {
        file, err := os.Open("/proc/meminfo")
        if err != nil {
            pm.totalMemory = defaultMemoryGB * 1024 * 1024 * 1024
            return
        }
        defer file.Close()

        scanner := bufio.NewScanner(file)
        for scanner.Scan() {
            line := scanner.Text()
            if strings.HasPrefix(line, "MemTotal:") {
                fields := strings.Fields(line)
                if len(fields) >= 2 {
                    if kb, err := strconv.ParseInt(fields[1], 10, 64); err == nil {
                        pm.totalMemory = kb * 1024
                        return
                    }
                }
                break
            }
        }
        pm.totalMemory = defaultMemoryGB * 1024 * 1024 * 1024 // Fallback
    })
    return pm.totalMemory
}

// GetTotalMemory returns total system memory in bytes (public method)
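A worked instance of the calculation in calculateCPUPercent above, using the 250 Hz fallback clock and a 1-second sampling window (the numbers are illustrative only):

    cpuDelta := 50.0                               // utime+stime delta, in clock ticks
    clockTicks := 250.0                            // defaultClockTicks fallback
    timeDelta := 1.0                               // seconds between samples
    cpuSeconds := cpuDelta / clockTicks            // 0.2 CPU-seconds consumed
    cpuPercent := (cpuSeconds / timeDelta) * 100.0 // 20% CPU for that interval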
@@ -3,6 +3,7 @@ package audio

import (
    "context"
    "sync"
    "time"

    "github.com/jetkvm/kvm/internal/logging"
    "github.com/pion/webrtc/v4/pkg/media"

@@ -123,26 +124,34 @@ func (r *AudioRelay) UpdateTrack(audioTrack AudioTrackWriter) {
    r.audioTrack = audioTrack
}

// relayLoop is the main relay loop that forwards frames from subprocess to WebRTC
func (r *AudioRelay) relayLoop() {
    defer r.wg.Done()
    r.logger.Debug().Msg("Audio relay loop started")

    const maxConsecutiveErrors = 10
    consecutiveErrors := 0

    for {
        select {
        case <-r.ctx.Done():
            r.logger.Debug().Msg("Audio relay loop stopping")
            return
        default:
            // Receive frame from audio server subprocess
            frame, err := r.client.ReceiveFrame()
            if err != nil {
                r.logger.Error().Err(err).Msg("Failed to receive audio frame")
                consecutiveErrors++
                r.logger.Error().Err(err).Int("consecutive_errors", consecutiveErrors).Msg("Failed to receive audio frame")
                r.incrementDropped()

                if consecutiveErrors >= maxConsecutiveErrors {
                    r.logger.Error().Msg("Too many consecutive errors, stopping relay")
                    return
                }
                time.Sleep(10 * time.Millisecond)
                continue
            }

            // Forward frame to WebRTC
            consecutiveErrors = 0
            if err := r.forwardToWebRTC(frame); err != nil {
                r.logger.Warn().Err(err).Msg("Failed to forward frame to WebRTC")
                r.incrementDropped()
main.go (49 changed lines)
@@ -20,43 +20,14 @@ var (
    audioSupervisor *audio.AudioServerSupervisor
)

// runAudioServer is now handled by audio.RunAudioOutputServer
// This function is kept for backward compatibility but delegates to the audio package
func runAudioServer() {
    logger.Info().Msg("Starting audio server subprocess")

    // Create audio server
    server, err := audio.NewAudioServer()
    err := audio.RunAudioOutputServer()
    if err != nil {
        logger.Error().Err(err).Msg("failed to create audio server")
        logger.Error().Err(err).Msg("audio output server failed")
        os.Exit(1)
    }
    defer server.Close()

    // Start accepting connections
    if err := server.Start(); err != nil {
        logger.Error().Err(err).Msg("failed to start audio server")
        os.Exit(1)
    }

    // Initialize audio processing
    err = audio.StartNonBlockingAudioStreaming(func(frame []byte) {
        if err := server.SendFrame(frame); err != nil {
            logger.Warn().Err(err).Msg("failed to send audio frame")
            audio.RecordFrameDropped()
        }
    })
    if err != nil {
        logger.Error().Err(err).Msg("failed to start audio processing")
        os.Exit(1)
    }

    // Wait for termination signal
    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
    <-sigs

    // Cleanup
    audio.StopNonBlockingAudioStreaming()
    logger.Info().Msg("Audio server subprocess stopped")
}

func startAudioSubprocess() error {

@@ -212,14 +183,6 @@ func Main(audioServer bool, audioInputServer bool) {
    audio.InitializeAudioEventBroadcaster()
    logger.Info().Msg("audio event broadcaster initialized")

    // Start audio input system for microphone processing
    err = audio.StartAudioInput()
    if err != nil {
        logger.Warn().Err(err).Msg("failed to start audio input system")
    } else {
        logger.Info().Msg("audio input system started")
    }

    if err := setInitialVirtualMediaState(); err != nil {
        logger.Warn().Err(err).Msg("failed to set initial virtual media state")
    }

@@ -272,10 +235,6 @@ func Main(audioServer bool, audioInputServer bool) {

    // Stop audio subprocess and wait for cleanup
    if !isAudioServer {
        // Stop audio input system
        logger.Info().Msg("stopping audio input system")
        audio.StopAudioInput()

        if audioSupervisor != nil {
            logger.Info().Msg("stopping audio supervisor")
            if err := audioSupervisor.Stop(); err != nil {
@@ -41,10 +41,6 @@ interface AudioConfig {
  FrameSize: string;
}

const qualityLabels = {
  0: "Low (32kbps)",
  1: "Medium (64kbps)",

@@ -211,7 +207,6 @@ export default function AudioControlPopover({ microphone, open }: AudioControlPo

    // Prevent rapid clicking - if any operation is in progress or within cooldown, ignore the click
    if (isStarting || isStopping || isToggling || (now - lastClickTime < CLICK_COOLDOWN)) {
      console.log("Microphone operation already in progress or within cooldown, ignoring click");
      return;
    }

@@ -233,7 +228,6 @@ export default function AudioControlPopover({ microphone, open }: AudioControlPo

    // Prevent rapid clicking - if any operation is in progress or within cooldown, ignore the click
    if (isStarting || isStopping || isToggling || (now - lastClickTime < CLICK_COOLDOWN)) {
      console.log("Microphone operation already in progress or within cooldown, ignoring mute toggle");
      return;
    }

@@ -279,7 +273,6 @@ export default function AudioControlPopover({ microphone, open }: AudioControlPo
    if (videoElement && 'setSinkId' in videoElement) {
      try {
        await (videoElement as HTMLVideoElement & { setSinkId: (deviceId: string) => Promise<void> }).setSinkId(deviceId);
        console.log('Audio output device changed to:', deviceId);
      } catch (error: unknown) {
        console.error('Failed to change audio output device:', error);
      }