feat(audio): add subprocess pre-warming to reduce activation latency

Implement batched metrics updates so the main metrics counters are updated less often.
Lock goroutines to OS threads and adjust thread priority only in high-throughput configurations.
This commit is contained in:
Alex P 2025-09-03 19:51:18 +00:00
parent 5c55da0787
commit ca365f1acd
8 changed files with 280 additions and 50 deletions

View File

@ -84,3 +84,21 @@ func GetAudioInputSupervisor() *AudioInputSupervisor {
} }
return (*AudioInputSupervisor)(ptr) return (*AudioInputSupervisor)(ptr)
} }
// PrewarmAudioInputSubprocess asks the supervisor returned by
// GetAudioInputSupervisor to start its subprocess ahead of time, so that a
// later activation has less work to do.
//
// It returns nil when no supervisor is available (prewarming is skipped);
// otherwise it returns the result of the supervisor's PrewarmSubprocess.
func PrewarmAudioInputSubprocess() error {
	if sup := GetAudioInputSupervisor(); sup != nil {
		return sup.PrewarmSubprocess()
	}
	// Without a supervisor there is nothing to prewarm; not treated as an error.
	return nil
}
// IsAudioInputSubprocessPrewarmed reports the prewarmed state of the
// supervisor returned by GetAudioInputSupervisor. It reports false when no
// supervisor is available.
func IsAudioInputSubprocessPrewarmed() bool {
	sup := GetAudioInputSupervisor()
	return sup != nil && sup.IsPrewarmed()
}

View File

@ -355,30 +355,94 @@ func GetAudioMetrics() AudioMetrics {
} }
} }
// RecordFrameReceived increments the frames received counter with simplified tracking // Batched metrics to reduce atomic operations frequency
// Pending metric deltas accumulated by the Record* functions. They are read
// and written through sync/atomic; flushBatchedMetrics swaps each batched
// delta to zero and adds it to the corresponding field of metrics.
var (
batchedFramesReceived int64
batchedBytesProcessed int64
batchedFramesDropped int64
batchedConnectionDrops int64
// batchCounter counts recorded events; a flush is triggered whenever it
// reaches a multiple of metricsFlushInterval.
batchCounter int64
lastFlushTime int64 // Unix timestamp in nanoseconds
)
const (
// metricsFlushInterval is the number of recorded events between count-based
// flushes: a flush runs each time batchCounter reaches a multiple of it.
metricsFlushInterval = 10
// metricsForceFlushNanos is the maximum time, in nanoseconds, that may pass
// since the last flush before the time-based check forces another one.
metricsForceFlushNanos = 100 * 1000 * 1000 // 100ms in nanoseconds
)
// RecordFrameReceived increments the frames received counter with batched updates
func RecordFrameReceived(bytes int) { func RecordFrameReceived(bytes int) {
// Direct atomic updates to avoid sampling complexity in critical path // Use local batching to reduce atomic operations frequency
atomic.AddInt64(&metrics.FramesReceived, 1) atomic.AddInt64(&batchedFramesReceived, 1)
atomic.AddInt64(&metrics.BytesProcessed, int64(bytes)) atomic.AddInt64(&batchedBytesProcessed, int64(bytes))
// Always update timestamp for accurate last frame tracking // Update timestamp immediately for accurate tracking
metrics.LastFrameTime = time.Now() metrics.LastFrameTime = time.Now()
// Check if we should flush batched metrics
if atomic.AddInt64(&batchCounter, 1)%metricsFlushInterval == 0 {
flushBatchedMetrics()
} else {
// Force flush if too much time has passed
now := time.Now().UnixNano()
lastFlush := atomic.LoadInt64(&lastFlushTime)
if now-lastFlush > metricsForceFlushNanos {
flushBatchedMetrics()
}
}
} }
// RecordFrameDropped increments the frames dropped counter with simplified tracking // RecordFrameDropped increments the frames dropped counter with batched updates
func RecordFrameDropped() { func RecordFrameDropped() {
// Direct atomic update to avoid sampling complexity in critical path // Use local batching to reduce atomic operations frequency
atomic.AddInt64(&metrics.FramesDropped, 1) atomic.AddInt64(&batchedFramesDropped, 1)
// Check if we should flush batched metrics
if atomic.AddInt64(&batchCounter, 1)%metricsFlushInterval == 0 {
flushBatchedMetrics()
}
} }
// RecordConnectionDrop increments the connection drops counter with simplified tracking // RecordConnectionDrop increments the connection drops counter with batched updates
func RecordConnectionDrop() { func RecordConnectionDrop() {
// Direct atomic update to avoid sampling complexity in critical path // Use local batching to reduce atomic operations frequency
atomic.AddInt64(&metrics.ConnectionDrops, 1) atomic.AddInt64(&batchedConnectionDrops, 1)
// Check if we should flush batched metrics
if atomic.AddInt64(&batchCounter, 1)%metricsFlushInterval == 0 {
flushBatchedMetrics()
}
} }
// FlushPendingMetrics is now a no-op since we use direct atomic updates // flushBatchedMetrics flushes accumulated metrics to the main counters
func FlushPendingMetrics() { func flushBatchedMetrics() {
// No-op: metrics are now updated directly without local buffering // Atomically move batched metrics to main metrics
// This function is kept for API compatibility framesReceived := atomic.SwapInt64(&batchedFramesReceived, 0)
bytesProcessed := atomic.SwapInt64(&batchedBytesProcessed, 0)
framesDropped := atomic.SwapInt64(&batchedFramesDropped, 0)
connectionDrops := atomic.SwapInt64(&batchedConnectionDrops, 0)
// Update main metrics if we have any batched data
if framesReceived > 0 {
atomic.AddInt64(&metrics.FramesReceived, framesReceived)
}
if bytesProcessed > 0 {
atomic.AddInt64(&metrics.BytesProcessed, bytesProcessed)
}
if framesDropped > 0 {
atomic.AddInt64(&metrics.FramesDropped, framesDropped)
}
if connectionDrops > 0 {
atomic.AddInt64(&metrics.ConnectionDrops, connectionDrops)
}
// Update last flush time
atomic.StoreInt64(&lastFlushTime, time.Now().UnixNano())
}
// FlushPendingMetrics forces a flush of all batched metrics
func FlushPendingMetrics() {
flushBatchedMetrics()
} }

View File

@ -434,7 +434,7 @@ func (bap *BatchAudioProcessor) processBatchRead(batch []batchReadRequest) {
// Skip priority setting for better performance - audio threads already have good priority // Skip priority setting for better performance - audio threads already have good priority
} }
// Update stats efficiently // Batch stats updates to reduce atomic operations (update once per batch instead of per frame)
atomic.AddInt64(&bap.stats.BatchedReads, 1) atomic.AddInt64(&bap.stats.BatchedReads, 1)
atomic.AddInt64(&bap.stats.BatchedFrames, int64(batchSize)) atomic.AddInt64(&bap.stats.BatchedFrames, int64(batchSize))
if batchSize > 1 { if batchSize > 1 {
@ -461,6 +461,7 @@ func (bap *BatchAudioProcessor) processBatchRead(batch []batchReadRequest) {
bap.stats.OSThreadPinTime += time.Since(start) bap.stats.OSThreadPinTime += time.Since(start)
} }
// Update timestamp only once per batch instead of per frame
bap.stats.LastBatchTime = time.Now() bap.stats.LastBatchTime = time.Now()
} }

View File

@ -902,6 +902,12 @@ type AudioConfigConstants struct {
// Default 200ms provides reasonable wait time for microphone access. // Default 200ms provides reasonable wait time for microphone access.
MicContentionTimeout time.Duration // 200ms contention timeout MicContentionTimeout time.Duration // 200ms contention timeout
// Subprocess Pre-warming Configuration
// Used in: input_supervisor.go for reducing microphone activation latency
// Impact: Pre-warms audio input subprocess during startup to eliminate cold start delay
// Default true enables pre-warming for optimal user experience
EnableSubprocessPrewarming bool // Enable subprocess pre-warming (default: true)
// Priority Scheduler Configuration - Settings for process priority management // Priority Scheduler Configuration - Settings for process priority management
// Used in: priority_scheduler.go for system priority control // Used in: priority_scheduler.go for system priority control
// Impact: Controls valid range for process priority adjustments // Impact: Controls valid range for process priority adjustments
@ -2343,6 +2349,9 @@ func DefaultAudioConfig() *AudioConfigConstants {
// Microphone Contention Configuration // Microphone Contention Configuration
MicContentionTimeout: 200 * time.Millisecond, MicContentionTimeout: 200 * time.Millisecond,
// Subprocess Pre-warming Configuration
EnableSubprocessPrewarming: true,
// Priority Scheduler Configuration // Priority Scheduler Configuration
MinNiceValue: -20, MinNiceValue: -20,
MaxNiceValue: 19, MaxNiceValue: 19,

View File

@ -1010,12 +1010,19 @@ func (ais *AudioInputServer) startProcessorGoroutine() {
// Create a processor task that will run in the goroutine pool // Create a processor task that will run in the goroutine pool
processorTask := func() { processorTask := func() {
// Only lock OS thread and set priority for high-load scenarios
// This reduces interference with input processing threads
config := GetConfig()
useThreadOptimizations := config.MaxAudioProcessorWorkers > 8
if useThreadOptimizations {
runtime.LockOSThread() runtime.LockOSThread()
defer runtime.UnlockOSThread() defer runtime.UnlockOSThread()
// Set high priority for audio processing - skip logging in hotpath // Set priority only when necessary to reduce scheduler interference
_ = SetAudioThreadPriority() _ = SetAudioThreadPriority()
defer func() { _ = ResetThreadPriority() }() defer func() { _ = ResetThreadPriority() }()
}
// Create logger for this goroutine // Create logger for this goroutine
logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger() logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger()
@ -1023,8 +1030,8 @@ func (ais *AudioInputServer) startProcessorGoroutine() {
// Enhanced error tracking for processing // Enhanced error tracking for processing
var processingErrors int var processingErrors int
var lastProcessingError time.Time var lastProcessingError time.Time
maxProcessingErrors := GetConfig().MaxConsecutiveErrors maxProcessingErrors := config.MaxConsecutiveErrors
errorResetWindow := GetConfig().RestartWindow errorResetWindow := config.RestartWindow
defer ais.wg.Done() defer ais.wg.Done()
for { for {
@ -1127,11 +1134,18 @@ func (ais *AudioInputServer) startMonitorGoroutine() {
// Create a monitor task that will run in the goroutine pool // Create a monitor task that will run in the goroutine pool
monitorTask := func() { monitorTask := func() {
// Monitor goroutine doesn't need thread locking for most scenarios
// Only use thread optimizations for high-throughput scenarios
config := GetConfig()
useThreadOptimizations := config.MaxAudioProcessorWorkers > 8
logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
if useThreadOptimizations {
runtime.LockOSThread() runtime.LockOSThread()
defer runtime.UnlockOSThread() defer runtime.UnlockOSThread()
// Set I/O priority for monitoring // Set I/O priority for monitoring only when needed
logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
if err := SetAudioIOThreadPriority(); err != nil { if err := SetAudioIOThreadPriority(); err != nil {
logger.Warn().Err(err).Msg("Failed to set audio I/O priority") logger.Warn().Err(err).Msg("Failed to set audio I/O priority")
} }
@ -1140,6 +1154,7 @@ func (ais *AudioInputServer) startMonitorGoroutine() {
logger.Warn().Err(err).Msg("Failed to reset thread priority") logger.Warn().Err(err).Msg("Failed to reset thread priority")
} }
}() }()
}
defer ais.wg.Done() defer ais.wg.Done()
ticker := time.NewTicker(GetConfig().DefaultTickerInterval) ticker := time.NewTicker(GetConfig().DefaultTickerInterval)

View File

@ -21,6 +21,10 @@ type AudioInputSupervisor struct {
// Environment variables for OPUS configuration // Environment variables for OPUS configuration
opusEnv []string opusEnv []string
// Pre-warming state
prewarmed bool
prewarmTime time.Time
} }
// NewAudioInputSupervisor creates a new audio input supervisor // NewAudioInputSupervisor creates a new audio input supervisor
@ -48,6 +52,73 @@ func (ais *AudioInputSupervisor) SetOpusConfig(bitrate, complexity, vbr, signalT
} }
} }
// PrewarmSubprocess starts the audio input server subprocess in advance so a
// later Start can reuse it instead of launching a new process.
//
// It is a no-op returning nil when the supervisor is already running or already
// prewarmed. When findExistingAudioInputProcess reports an existing process, the
// supervisor is only marked as prewarmed and no new process is started.
// Otherwise the current executable is re-launched with --audio-input-server,
// registered with the process monitor, and watched by monitorSubprocess.
//
// It returns an error when the executable path cannot be resolved or the
// subprocess fails to start; in both cases the supervisor is left un-prewarmed.
func (ais *AudioInputSupervisor) PrewarmSubprocess() error {
	ais.mutex.Lock()
	defer ais.mutex.Unlock()

	// Don't prewarm if already running or prewarmed.
	if ais.IsRunning() || ais.prewarmed {
		return nil
	}

	// An already-running server process counts as prewarmed.
	if existingPID, err := ais.findExistingAudioInputProcess(); err == nil {
		ais.logger.Info().Int("existing_pid", existingPID).Msg("Found existing audio input server process for prewarming")
		ais.prewarmed = true
		ais.prewarmTime = time.Now()
		return nil
	}

	// Resolve the executable before creating the subprocess context, so that an
	// error here does not leave a freshly created context uncancelled.
	execPath, err := os.Executable()
	if err != nil {
		return fmt.Errorf("failed to get executable path: %w", err)
	}

	// Create context for subprocess management.
	ais.createContext()

	// Only the subprocess flag is passed; configuration travels via environment.
	args := []string{"--audio-input-server"}
	cmd := exec.CommandContext(ais.ctx, execPath, args...)

	env := append(os.Environ(), "JETKVM_AUDIO_INPUT_IPC=true") // Enable IPC mode
	env = append(env, ais.opusEnv...)                          // Add OPUS configuration
	cmd.Env = env

	// Put the child in its own process group to allow clean termination.
	cmd.SysProcAttr = &syscall.SysProcAttr{
		Setpgid: true,
	}

	ais.cmd = cmd
	if err := cmd.Start(); err != nil {
		ais.cancelContext()
		// Do not keep a reference to a command that never started.
		ais.cmd = nil
		return fmt.Errorf("failed to prewarm audio input server process: %w", err)
	}

	ais.logger.Info().Int("pid", cmd.Process.Pid).Strs("args", args).Strs("opus_env", ais.opusEnv).Msg("Audio input server subprocess prewarmed")

	// Add process to monitoring.
	ais.processMonitor.AddProcess(cmd.Process.Pid, "audio-input-server")

	// Monitor the subprocess in a goroutine.
	go ais.monitorSubprocess()

	ais.prewarmed = true
	ais.prewarmTime = time.Now()

	return nil
}
// Start starts the audio input server subprocess // Start starts the audio input server subprocess
func (ais *AudioInputSupervisor) Start() error { func (ais *AudioInputSupervisor) Start() error {
ais.mutex.Lock() ais.mutex.Lock()
@ -60,6 +131,16 @@ func (ais *AudioInputSupervisor) Start() error {
return fmt.Errorf("audio input supervisor already running") return fmt.Errorf("audio input supervisor already running")
} }
// Use prewarmed subprocess if available
if ais.prewarmed && ais.cmd != nil && ais.cmd.Process != nil {
ais.logger.Info().Int("pid", ais.cmd.Process.Pid).Dur("prewarm_age", time.Since(ais.prewarmTime)).Msg("Using prewarmed audio input server subprocess")
ais.setRunning(true)
ais.prewarmed = false // Reset prewarmed state
// Connect client to the server
go ais.connectClient()
return nil
}
// Check for existing audio input server process // Check for existing audio input server process
if existingPID, err := ais.findExistingAudioInputProcess(); err == nil { if existingPID, err := ais.findExistingAudioInputProcess(); err == nil {
ais.logger.Info().Int("existing_pid", existingPID).Msg("Found existing audio input server process, connecting to it") ais.logger.Info().Int("existing_pid", existingPID).Msg("Found existing audio input server process, connecting to it")
@ -120,11 +201,31 @@ func (ais *AudioInputSupervisor) Start() error {
return nil return nil
} }
// IsPrewarmed reports the supervisor's prewarmed flag, read under the
// supervisor's read lock.
func (ais *AudioInputSupervisor) IsPrewarmed() bool {
	ais.mutex.RLock()
	warm := ais.prewarmed
	ais.mutex.RUnlock()
	return warm
}
// GetPrewarmAge returns the time elapsed since the supervisor was marked as
// prewarmed, or 0 when it is not currently prewarmed.
func (ais *AudioInputSupervisor) GetPrewarmAge() time.Duration {
	ais.mutex.RLock()
	defer ais.mutex.RUnlock()

	var age time.Duration
	if ais.prewarmed {
		age = time.Since(ais.prewarmTime)
	}
	return age
}
// Stop stops the audio input server subprocess // Stop stops the audio input server subprocess
func (ais *AudioInputSupervisor) Stop() { func (ais *AudioInputSupervisor) Stop() {
ais.mutex.Lock() ais.mutex.Lock()
defer ais.mutex.Unlock() defer ais.mutex.Unlock()
// Reset prewarmed state
ais.prewarmed = false
if !ais.IsRunning() { if !ais.IsRunning() {
return return
} }

View File

@ -135,9 +135,14 @@ func (s *AudioOutputStreamer) Stop() {
func (s *AudioOutputStreamer) streamLoop() { func (s *AudioOutputStreamer) streamLoop() {
defer s.wg.Done() defer s.wg.Done()
// Pin goroutine to OS thread for consistent performance // Only pin to OS thread for high-throughput scenarios to reduce scheduler interference
config := GetConfig()
useThreadOptimizations := config.MaxAudioProcessorWorkers > 8
if useThreadOptimizations {
runtime.LockOSThread() runtime.LockOSThread()
defer runtime.UnlockOSThread() defer runtime.UnlockOSThread()
}
// Adaptive timing for frame reading // Adaptive timing for frame reading
frameInterval := time.Duration(GetConfig().OutputStreamingFrameIntervalMS) * time.Millisecond // 50 FPS base rate frameInterval := time.Duration(GetConfig().OutputStreamingFrameIntervalMS) * time.Millisecond // 50 FPS base rate
@ -198,6 +203,11 @@ func (s *AudioOutputStreamer) streamLoop() {
func (s *AudioOutputStreamer) processingLoop() { func (s *AudioOutputStreamer) processingLoop() {
defer s.wg.Done() defer s.wg.Done()
// Only use thread optimizations for high-throughput scenarios
config := GetConfig()
useThreadOptimizations := config.MaxAudioProcessorWorkers > 8
if useThreadOptimizations {
// Pin goroutine to OS thread for consistent performance // Pin goroutine to OS thread for consistent performance
runtime.LockOSThread() runtime.LockOSThread()
defer runtime.UnlockOSThread() defer runtime.UnlockOSThread()
@ -217,6 +227,7 @@ func (s *AudioOutputStreamer) processingLoop() {
} }
} }
}() }()
}
for frameData := range s.processingChan { for frameData := range s.processingChan {
// Process frame and return buffer to pool after processing // Process frame and return buffer to pool after processing

11
main.go
View File

@ -68,6 +68,17 @@ func startAudioSubprocess() error {
config.AudioQualityLowOpusDTX, config.AudioQualityLowOpusDTX,
) )
// Pre-warm audio input subprocess to reduce activation latency (if enabled)
if config.EnableSubprocessPrewarming {
if err := audio.PrewarmAudioInputSubprocess(); err != nil {
logger.Warn().Err(err).Msg("failed to pre-warm audio input subprocess")
} else {
logger.Info().Msg("audio input subprocess pre-warmed successfully")
}
} else {
logger.Info().Msg("audio input subprocess pre-warming disabled by configuration")
}
// Note: Audio input supervisor is NOT started here - it will be started on-demand // Note: Audio input supervisor is NOT started here - it will be started on-demand
// when the user activates microphone input through the UI // when the user activates microphone input through the UI