diff --git a/internal/audio/api.go b/internal/audio/api.go index 5d9fe5fa..e813fd85 100644 --- a/internal/audio/api.go +++ b/internal/audio/api.go @@ -84,3 +84,21 @@ func GetAudioInputSupervisor() *AudioInputSupervisor { } return (*AudioInputSupervisor)(ptr) } + +// PrewarmAudioInputSubprocess starts an audio input subprocess in advance to reduce activation latency +func PrewarmAudioInputSubprocess() error { + supervisor := GetAudioInputSupervisor() + if supervisor == nil { + return nil // No supervisor available, skip prewarming + } + return supervisor.PrewarmSubprocess() +} + +// IsAudioInputSubprocessPrewarmed returns whether an audio input subprocess is prewarmed and ready +func IsAudioInputSubprocessPrewarmed() bool { + supervisor := GetAudioInputSupervisor() + if supervisor == nil { + return false + } + return supervisor.IsPrewarmed() +} diff --git a/internal/audio/audio.go b/internal/audio/audio.go index 3ed023dc..444ab312 100644 --- a/internal/audio/audio.go +++ b/internal/audio/audio.go @@ -355,30 +355,94 @@ func GetAudioMetrics() AudioMetrics { } } -// RecordFrameReceived increments the frames received counter with simplified tracking +// Batched metrics to reduce atomic operations frequency +var ( + batchedFramesReceived int64 + batchedBytesProcessed int64 + batchedFramesDropped int64 + batchedConnectionDrops int64 + batchCounter int64 + lastFlushTime int64 // Unix timestamp in nanoseconds +) + +const ( + // Batch size for metrics updates (reduce atomic ops by 10x) + metricsFlushInterval = 10 + // Force flush every 100ms to ensure metrics freshness + metricsForceFlushNanos = 100 * 1000 * 1000 // 100ms in nanoseconds +) + +// RecordFrameReceived increments the frames received counter with batched updates func RecordFrameReceived(bytes int) { - // Direct atomic updates to avoid sampling complexity in critical path - atomic.AddInt64(&metrics.FramesReceived, 1) - atomic.AddInt64(&metrics.BytesProcessed, int64(bytes)) + // Use local batching to reduce atomic operations frequency + atomic.AddInt64(&batchedFramesReceived, 1) + atomic.AddInt64(&batchedBytesProcessed, int64(bytes)) - // Always update timestamp for accurate last frame tracking + // Update timestamp immediately for accurate tracking metrics.LastFrameTime = time.Now() + + // Check if we should flush batched metrics + if atomic.AddInt64(&batchCounter, 1)%metricsFlushInterval == 0 { + flushBatchedMetrics() + } else { + // Force flush if too much time has passed + now := time.Now().UnixNano() + lastFlush := atomic.LoadInt64(&lastFlushTime) + if now-lastFlush > metricsForceFlushNanos { + flushBatchedMetrics() + } + } } -// RecordFrameDropped increments the frames dropped counter with simplified tracking +// RecordFrameDropped increments the frames dropped counter with batched updates func RecordFrameDropped() { - // Direct atomic update to avoid sampling complexity in critical path - atomic.AddInt64(&metrics.FramesDropped, 1) + // Use local batching to reduce atomic operations frequency + atomic.AddInt64(&batchedFramesDropped, 1) + + // Check if we should flush batched metrics + if atomic.AddInt64(&batchCounter, 1)%metricsFlushInterval == 0 { + flushBatchedMetrics() + } } -// RecordConnectionDrop increments the connection drops counter with simplified tracking +// RecordConnectionDrop increments the connection drops counter with batched updates func RecordConnectionDrop() { - // Direct atomic update to avoid sampling complexity in critical path - atomic.AddInt64(&metrics.ConnectionDrops, 1) + // Use local batching to reduce atomic operations frequency + atomic.AddInt64(&batchedConnectionDrops, 1) + + // Check if we should flush batched metrics + if atomic.AddInt64(&batchCounter, 1)%metricsFlushInterval == 0 { + flushBatchedMetrics() + } } -// FlushPendingMetrics is now a no-op since we use direct atomic updates -func FlushPendingMetrics() { - // No-op: metrics are now updated directly without local buffering - // This function is kept for API compatibility +// flushBatchedMetrics flushes accumulated metrics to the main counters +func flushBatchedMetrics() { + // Atomically move batched metrics to main metrics + framesReceived := atomic.SwapInt64(&batchedFramesReceived, 0) + bytesProcessed := atomic.SwapInt64(&batchedBytesProcessed, 0) + framesDropped := atomic.SwapInt64(&batchedFramesDropped, 0) + connectionDrops := atomic.SwapInt64(&batchedConnectionDrops, 0) + + // Update main metrics if we have any batched data + if framesReceived > 0 { + atomic.AddInt64(&metrics.FramesReceived, framesReceived) + } + if bytesProcessed > 0 { + atomic.AddInt64(&metrics.BytesProcessed, bytesProcessed) + } + if framesDropped > 0 { + atomic.AddInt64(&metrics.FramesDropped, framesDropped) + } + if connectionDrops > 0 { + atomic.AddInt64(&metrics.ConnectionDrops, connectionDrops) + } + + // Update last flush time + atomic.StoreInt64(&lastFlushTime, time.Now().UnixNano()) +} + +// FlushPendingMetrics forces a flush of all batched metrics +func FlushPendingMetrics() { + flushBatchedMetrics() } diff --git a/internal/audio/batch_audio.go b/internal/audio/batch_audio.go index d6f5a860..b74c45b5 100644 --- a/internal/audio/batch_audio.go +++ b/internal/audio/batch_audio.go @@ -434,7 +434,7 @@ func (bap *BatchAudioProcessor) processBatchRead(batch []batchReadRequest) { // Skip priority setting for better performance - audio threads already have good priority } - // Update stats efficiently + // Batch stats updates to reduce atomic operations (update once per batch instead of per frame) atomic.AddInt64(&bap.stats.BatchedReads, 1) atomic.AddInt64(&bap.stats.BatchedFrames, int64(batchSize)) if batchSize > 1 { @@ -461,6 +461,7 @@ func (bap *BatchAudioProcessor) processBatchRead(batch []batchReadRequest) { bap.stats.OSThreadPinTime += time.Since(start) } + // Update timestamp only once per batch instead of per frame bap.stats.LastBatchTime = time.Now() } diff --git a/internal/audio/config_constants.go b/internal/audio/config_constants.go index ec837dfa..d44f801f 100644 --- a/internal/audio/config_constants.go +++ b/internal/audio/config_constants.go @@ -902,6 +902,12 @@ type AudioConfigConstants struct { // Default 200ms provides reasonable wait time for microphone access. MicContentionTimeout time.Duration // 200ms contention timeout + // Subprocess Pre-warming Configuration + // Used in: input_supervisor.go for reducing microphone activation latency + // Impact: Pre-warms audio input subprocess during startup to eliminate cold start delay + // Default true enables pre-warming for optimal user experience + EnableSubprocessPrewarming bool // Enable subprocess pre-warming (default: true) + // Priority Scheduler Configuration - Settings for process priority management // Used in: priority_scheduler.go for system priority control // Impact: Controls valid range for process priority adjustments @@ -2343,6 +2349,9 @@ func DefaultAudioConfig() *AudioConfigConstants { // Microphone Contention Configuration MicContentionTimeout: 200 * time.Millisecond, + // Subprocess Pre-warming Configuration + EnableSubprocessPrewarming: true, + // Priority Scheduler Configuration MinNiceValue: -20, MaxNiceValue: 19, diff --git a/internal/audio/input_ipc.go b/internal/audio/input_ipc.go index 8b1a84a0..e2f3eed8 100644 --- a/internal/audio/input_ipc.go +++ b/internal/audio/input_ipc.go @@ -1010,12 +1010,19 @@ func (ais *AudioInputServer) startProcessorGoroutine() { // Create a processor task that will run in the goroutine pool processorTask := func() { - runtime.LockOSThread() - defer runtime.UnlockOSThread() + // Only lock OS thread and set priority for high-load scenarios + // This reduces interference with input processing threads + config := GetConfig() + useThreadOptimizations := config.MaxAudioProcessorWorkers > 8 - // Set high priority for audio processing - skip logging in hotpath - _ = SetAudioThreadPriority() - defer func() { _ = ResetThreadPriority() }() + if useThreadOptimizations { + runtime.LockOSThread() + defer runtime.UnlockOSThread() + + // Set priority only when necessary to reduce scheduler interference + _ = SetAudioThreadPriority() + defer func() { _ = ResetThreadPriority() }() + } // Create logger for this goroutine logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger() @@ -1023,8 +1030,8 @@ func (ais *AudioInputServer) startProcessorGoroutine() { // Enhanced error tracking for processing var processingErrors int var lastProcessingError time.Time - maxProcessingErrors := GetConfig().MaxConsecutiveErrors - errorResetWindow := GetConfig().RestartWindow + maxProcessingErrors := config.MaxConsecutiveErrors + errorResetWindow := config.RestartWindow defer ais.wg.Done() for { @@ -1127,19 +1134,27 @@ func (ais *AudioInputServer) startMonitorGoroutine() { // Create a monitor task that will run in the goroutine pool monitorTask := func() { - runtime.LockOSThread() - defer runtime.UnlockOSThread() + // Monitor goroutine doesn't need thread locking for most scenarios + // Only use thread optimizations for high-throughput scenarios + config := GetConfig() + useThreadOptimizations := config.MaxAudioProcessorWorkers > 8 - // Set I/O priority for monitoring logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger() - if err := SetAudioIOThreadPriority(); err != nil { - logger.Warn().Err(err).Msg("Failed to set audio I/O priority") - } - defer func() { - if err := ResetThreadPriority(); err != nil { - logger.Warn().Err(err).Msg("Failed to reset thread priority") + + if useThreadOptimizations { + runtime.LockOSThread() + defer runtime.UnlockOSThread() + + // Set I/O priority for monitoring only when needed + if err := SetAudioIOThreadPriority(); err != nil { + logger.Warn().Err(err).Msg("Failed to set audio I/O priority") } - }() + defer func() { + if err := ResetThreadPriority(); err != nil { + logger.Warn().Err(err).Msg("Failed to reset thread priority") + } + }() + } defer ais.wg.Done() ticker := time.NewTicker(GetConfig().DefaultTickerInterval) diff --git a/internal/audio/input_supervisor.go b/internal/audio/input_supervisor.go index dfd0fe5e..3aa95840 100644 --- a/internal/audio/input_supervisor.go +++ b/internal/audio/input_supervisor.go @@ -21,6 +21,10 @@ type AudioInputSupervisor struct { // Environment variables for OPUS configuration opusEnv []string + + // Pre-warming state + prewarmed bool + prewarmTime time.Time } // NewAudioInputSupervisor creates a new audio input supervisor @@ -48,6 +52,73 @@ func (ais *AudioInputSupervisor) SetOpusConfig(bitrate, complexity, vbr, signalT } } +// PrewarmSubprocess starts a subprocess in advance to reduce activation latency +func (ais *AudioInputSupervisor) PrewarmSubprocess() error { + ais.mutex.Lock() + defer ais.mutex.Unlock() + + // Don't prewarm if already running or prewarmed + if ais.IsRunning() || ais.prewarmed { + return nil + } + + // Check for existing audio input server process first + if existingPID, err := ais.findExistingAudioInputProcess(); err == nil { + ais.logger.Info().Int("existing_pid", existingPID).Msg("Found existing audio input server process for prewarming") + ais.prewarmed = true + ais.prewarmTime = time.Now() + return nil + } + + // Create context for subprocess management + ais.createContext() + + // Get current executable path + execPath, err := os.Executable() + if err != nil { + return fmt.Errorf("failed to get executable path: %w", err) + } + + // Build command arguments (only subprocess flag) + args := []string{"--audio-input-server"} + + // Create command for audio input server subprocess + cmd := exec.CommandContext(ais.ctx, execPath, args...) + + // Set environment variables for IPC and OPUS configuration + env := append(os.Environ(), "JETKVM_AUDIO_INPUT_IPC=true") // Enable IPC mode + env = append(env, ais.opusEnv...) // Add OPUS configuration + cmd.Env = env + + // Set process group to allow clean termination + cmd.SysProcAttr = &syscall.SysProcAttr{ + Setpgid: true, + } + + ais.cmd = cmd + + // Start the subprocess + err = cmd.Start() + if err != nil { + ais.cancelContext() + return fmt.Errorf("failed to prewarm audio input server process: %w", err) + } + + ais.logger.Info().Int("pid", cmd.Process.Pid).Strs("args", args).Strs("opus_env", ais.opusEnv).Msg("Audio input server subprocess prewarmed") + + // Add process to monitoring + ais.processMonitor.AddProcess(cmd.Process.Pid, "audio-input-server") + + // Monitor the subprocess in a goroutine + go ais.monitorSubprocess() + + // Mark as prewarmed + ais.prewarmed = true + ais.prewarmTime = time.Now() + + return nil +} + // Start starts the audio input server subprocess func (ais *AudioInputSupervisor) Start() error { ais.mutex.Lock() @@ -60,6 +131,16 @@ func (ais *AudioInputSupervisor) Start() error { return fmt.Errorf("audio input supervisor already running") } + // Use prewarmed subprocess if available + if ais.prewarmed && ais.cmd != nil && ais.cmd.Process != nil { + ais.logger.Info().Int("pid", ais.cmd.Process.Pid).Dur("prewarm_age", time.Since(ais.prewarmTime)).Msg("Using prewarmed audio input server subprocess") + ais.setRunning(true) + ais.prewarmed = false // Reset prewarmed state + // Connect client to the server + go ais.connectClient() + return nil + } + // Check for existing audio input server process if existingPID, err := ais.findExistingAudioInputProcess(); err == nil { ais.logger.Info().Int("existing_pid", existingPID).Msg("Found existing audio input server process, connecting to it") @@ -120,11 +201,31 @@ func (ais *AudioInputSupervisor) Start() error { return nil } +// IsPrewarmed returns whether a subprocess is prewarmed and ready +func (ais *AudioInputSupervisor) IsPrewarmed() bool { + ais.mutex.RLock() + defer ais.mutex.RUnlock() + return ais.prewarmed +} + +// GetPrewarmAge returns how long ago the subprocess was prewarmed +func (ais *AudioInputSupervisor) GetPrewarmAge() time.Duration { + ais.mutex.RLock() + defer ais.mutex.RUnlock() + if !ais.prewarmed { + return 0 + } + return time.Since(ais.prewarmTime) +} + // Stop stops the audio input server subprocess func (ais *AudioInputSupervisor) Stop() { ais.mutex.Lock() defer ais.mutex.Unlock() + // Reset prewarmed state + ais.prewarmed = false + if !ais.IsRunning() { return } diff --git a/internal/audio/output_streaming.go b/internal/audio/output_streaming.go index 32880875..df2b4d3a 100644 --- a/internal/audio/output_streaming.go +++ b/internal/audio/output_streaming.go @@ -135,9 +135,14 @@ func (s *AudioOutputStreamer) Stop() { func (s *AudioOutputStreamer) streamLoop() { defer s.wg.Done() - // Pin goroutine to OS thread for consistent performance - runtime.LockOSThread() - defer runtime.UnlockOSThread() + // Only pin to OS thread for high-throughput scenarios to reduce scheduler interference + config := GetConfig() + useThreadOptimizations := config.MaxAudioProcessorWorkers > 8 + + if useThreadOptimizations { + runtime.LockOSThread() + defer runtime.UnlockOSThread() + } // Adaptive timing for frame reading frameInterval := time.Duration(GetConfig().OutputStreamingFrameIntervalMS) * time.Millisecond // 50 FPS base rate @@ -198,25 +203,31 @@ func (s *AudioOutputStreamer) streamLoop() { func (s *AudioOutputStreamer) processingLoop() { defer s.wg.Done() - // Pin goroutine to OS thread for consistent performance - runtime.LockOSThread() - defer runtime.UnlockOSThread() + // Only use thread optimizations for high-throughput scenarios + config := GetConfig() + useThreadOptimizations := config.MaxAudioProcessorWorkers > 8 - // Set high priority for audio output processing - if err := SetAudioThreadPriority(); err != nil { - // Only log priority warnings if warn level enabled to reduce overhead - if getOutputStreamingLogger().GetLevel() <= zerolog.WarnLevel { - getOutputStreamingLogger().Warn().Err(err).Msg("Failed to set audio output processing priority") - } - } - defer func() { - if err := ResetThreadPriority(); err != nil { + if useThreadOptimizations { + // Pin goroutine to OS thread for consistent performance + runtime.LockOSThread() + defer runtime.UnlockOSThread() + + // Set high priority for audio output processing + if err := SetAudioThreadPriority(); err != nil { // Only log priority warnings if warn level enabled to reduce overhead if getOutputStreamingLogger().GetLevel() <= zerolog.WarnLevel { - getOutputStreamingLogger().Warn().Err(err).Msg("Failed to reset thread priority") + getOutputStreamingLogger().Warn().Err(err).Msg("Failed to set audio output processing priority") } } - }() + defer func() { + if err := ResetThreadPriority(); err != nil { + // Only log priority warnings if warn level enabled to reduce overhead + if getOutputStreamingLogger().GetLevel() <= zerolog.WarnLevel { + getOutputStreamingLogger().Warn().Err(err).Msg("Failed to reset thread priority") + } + } + }() + } for frameData := range s.processingChan { // Process frame and return buffer to pool after processing diff --git a/main.go b/main.go index 06a1cc2f..bfd228c9 100644 --- a/main.go +++ b/main.go @@ -68,6 +68,17 @@ func startAudioSubprocess() error { config.AudioQualityLowOpusDTX, ) + // Pre-warm audio input subprocess to reduce activation latency (if enabled) + if config.EnableSubprocessPrewarming { + if err := audio.PrewarmAudioInputSubprocess(); err != nil { + logger.Warn().Err(err).Msg("failed to pre-warm audio input subprocess") + } else { + logger.Info().Msg("audio input subprocess pre-warmed successfully") + } + } else { + logger.Info().Msg("audio input subprocess pre-warming disabled by configuration") + } + // Note: Audio input supervisor is NOT started here - it will be started on-demand // when the user activates microphone input through the UI