Compare commits

..

1 Commits

Author SHA1 Message Date
Alex da65440c6e
Merge a2a87b46b8 into bcc307b147 2025-09-07 20:14:49 +00:00
6 changed files with 1929 additions and 476 deletions

View File

@ -455,8 +455,32 @@ int jetkvm_audio_playback_init() {
return 0; return 0;
} }
// jetkvm_audio_decode_write decodes Opus data and writes PCM to ALSA playback device // jetkvm_audio_decode_write decodes Opus data and writes PCM to ALSA playback device.
// with error recovery and packet loss concealment //
// This function implements a robust audio playback pipeline with the following features:
// - Opus decoding with packet loss concealment
// - ALSA PCM playback with automatic device recovery
// - Progressive error recovery with exponential backoff
// - Buffer underrun and device suspension handling
//
// Error Recovery Strategy:
// 1. EPIPE (buffer underrun): Prepare device, optionally drop+prepare, retry with delays
// 2. ESTRPIPE (device suspended): Resume with timeout, fallback to prepare if needed
// 3. Opus decode errors: Attempt packet loss concealment before failing
//
// Performance Optimizations:
// - Stack-allocated PCM buffer to minimize heap allocations
// - Bounds checking to prevent buffer overruns
// - Direct ALSA device access for minimal latency
//
// Parameters:
// opus_buf: Input buffer containing Opus-encoded audio data
// opus_size: Size of the Opus data in bytes (must be > 0 and <= max_packet_size)
//
// Returns:
// 0: Success - audio frame decoded and written to playback device
// -1: Invalid parameters, initialization error, or bounds check failure
// -2: Unrecoverable ALSA or Opus error after all retry attempts
int jetkvm_audio_decode_write(void *opus_buf, int opus_size) { int jetkvm_audio_decode_write(void *opus_buf, int opus_size) {
short pcm_buffer[1920]; // max 2ch*960 short pcm_buffer[1920]; // max 2ch*960
unsigned char *in = (unsigned char*)opus_buf; unsigned char *in = (unsigned char*)opus_buf;

File diff suppressed because it is too large Load Diff

View File

@ -488,7 +488,13 @@ func InitValidationCache() {
GetCachedConfig().Update() GetCachedConfig().Update()
} }
// ValidateAudioFrame validates audio frame data with cached max size for performance // ValidateAudioFrame provides optimized validation for audio frame data
// This is the primary validation function used in all audio processing paths
//
// Performance optimizations:
// - Uses cached max frame size to eliminate config lookups
// - Single branch condition for optimal CPU pipeline efficiency
// - Minimal error allocation overhead
// //
//go:inline //go:inline
func ValidateAudioFrame(data []byte) error { func ValidateAudioFrame(data []byte) error {

View File

@ -70,32 +70,33 @@ func (ais *AudioInputSupervisor) Start() error {
// supervisionLoop is the main supervision loop // supervisionLoop is the main supervision loop
func (ais *AudioInputSupervisor) supervisionLoop() { func (ais *AudioInputSupervisor) supervisionLoop() {
// Configure supervision parameters (no restart for input supervisor) defer func() {
config := SupervisionConfig{ ais.closeProcessDone()
ProcessType: "audio input server", ais.logger.Info().Msg("audio input server supervision ended")
Timeout: GetConfig().InputSupervisorTimeout, }()
EnableRestart: false, // Input supervisor doesn't restart
MaxRestartAttempts: 0, for atomic.LoadInt32(&ais.running) == 1 {
RestartWindow: 0, select {
RestartDelay: 0, case <-ais.stopChan:
MaxRestartDelay: 0, ais.logger.Info().Msg("received stop signal")
ais.terminateProcess(GetConfig().InputSupervisorTimeout, "audio input server")
return
case <-ais.ctx.Done():
ais.logger.Info().Msg("context cancelled")
ais.terminateProcess(GetConfig().InputSupervisorTimeout, "audio input server")
return
default:
// Start the process
if err := ais.startProcess(); err != nil {
ais.logger.Error().Err(err).Msg("failed to start audio input server process")
return
} }
// Configure callbacks (input supervisor doesn't have callbacks currently) // Wait for process to exit
callbacks := ProcessCallbacks{ ais.waitForProcessExit("audio input server")
OnProcessStart: nil, return // Single run, no restart logic for now
OnProcessExit: nil, }
OnRestart: nil,
} }
// Use the base supervision loop template
ais.SupervisionLoop(
config,
callbacks,
ais.startProcess,
func() bool { return false }, // Never restart
func() time.Duration { return 0 }, // No restart delay needed
)
} }
// startProcess starts the audio input server process // startProcess starts the audio input server process

View File

@ -219,126 +219,3 @@ func (bs *BaseSupervisor) waitForProcessExit(processType string) {
bs.logger.Info().Int("pid", pid).Msgf("%s process exited gracefully", processType) bs.logger.Info().Int("pid", pid).Msgf("%s process exited gracefully", processType)
} }
} }
// SupervisionConfig holds configuration for the supervision loop
type SupervisionConfig struct {
ProcessType string
Timeout time.Duration
EnableRestart bool
MaxRestartAttempts int
RestartWindow time.Duration
RestartDelay time.Duration
MaxRestartDelay time.Duration
}
// ProcessCallbacks holds callback functions for process lifecycle events
type ProcessCallbacks struct {
OnProcessStart func(pid int)
OnProcessExit func(pid int, exitCode int, crashed bool)
OnRestart func(attempt int, delay time.Duration)
}
// SupervisionLoop provides a template for supervision loops that can be extended by specific supervisors
func (bs *BaseSupervisor) SupervisionLoop(
config SupervisionConfig,
callbacks ProcessCallbacks,
startProcessFunc func() error,
shouldRestartFunc func() bool,
calculateDelayFunc func() time.Duration,
) {
defer func() {
bs.closeProcessDone()
bs.logger.Info().Msgf("%s supervision ended", config.ProcessType)
}()
for atomic.LoadInt32(&bs.running) == 1 {
select {
case <-bs.stopChan:
bs.logger.Info().Msg("received stop signal")
bs.terminateProcess(config.Timeout, config.ProcessType)
return
case <-bs.ctx.Done():
bs.logger.Info().Msg("context cancelled")
bs.terminateProcess(config.Timeout, config.ProcessType)
return
default:
// Start or restart the process
if err := startProcessFunc(); err != nil {
bs.logger.Error().Err(err).Msgf("failed to start %s process", config.ProcessType)
// Check if we should attempt restart (only if restart is enabled)
if !config.EnableRestart || !shouldRestartFunc() {
bs.logger.Error().Msgf("maximum restart attempts exceeded or restart disabled, stopping %s supervisor", config.ProcessType)
return
}
delay := calculateDelayFunc()
bs.logger.Warn().Dur("delay", delay).Msgf("retrying %s process start after delay", config.ProcessType)
if callbacks.OnRestart != nil {
callbacks.OnRestart(0, delay) // 0 indicates start failure, not exit restart
}
select {
case <-time.After(delay):
case <-bs.stopChan:
return
case <-bs.ctx.Done():
return
}
continue
}
// Wait for process to exit
bs.waitForProcessExitWithCallback(config.ProcessType, callbacks)
// Check if we should restart (only if restart is enabled)
if !config.EnableRestart {
bs.logger.Info().Msgf("%s process completed, restart disabled", config.ProcessType)
return
}
if !shouldRestartFunc() {
bs.logger.Error().Msgf("maximum restart attempts exceeded, stopping %s supervisor", config.ProcessType)
return
}
// Calculate restart delay
delay := calculateDelayFunc()
bs.logger.Info().Dur("delay", delay).Msgf("restarting %s process after delay", config.ProcessType)
if callbacks.OnRestart != nil {
callbacks.OnRestart(1, delay) // 1 indicates restart after exit
}
// Wait for restart delay
select {
case <-time.After(delay):
case <-bs.stopChan:
return
case <-bs.ctx.Done():
return
}
}
}
}
// waitForProcessExitWithCallback extends waitForProcessExit with callback support
func (bs *BaseSupervisor) waitForProcessExitWithCallback(processType string, callbacks ProcessCallbacks) {
bs.mutex.RLock()
pid := bs.processPID
bs.mutex.RUnlock()
// Use the base waitForProcessExit logic
bs.waitForProcessExit(processType)
// Handle callbacks if provided
if callbacks.OnProcessExit != nil {
bs.mutex.RLock()
exitCode := bs.lastExitCode
bs.mutex.RUnlock()
crashed := exitCode != 0
callbacks.OnProcessExit(pid, exitCode, crashed)
}
}

View File

@ -10,6 +10,8 @@ import (
"strconv" "strconv"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/rs/zerolog"
) )
// Component name constants for logging // Component name constants for logging
@ -68,23 +70,7 @@ func (s *AudioOutputSupervisor) SetCallbacks(
defer s.mutex.Unlock() defer s.mutex.Unlock()
s.onProcessStart = onStart s.onProcessStart = onStart
s.onProcessExit = onExit
// Wrap the exit callback to include restart tracking
if onExit != nil {
s.onProcessExit = func(pid int, exitCode int, crashed bool) {
if crashed {
s.recordRestartAttempt()
}
onExit(pid, exitCode, crashed)
}
} else {
s.onProcessExit = func(pid int, exitCode int, crashed bool) {
if crashed {
s.recordRestartAttempt()
}
}
}
s.onRestart = onRestart s.onRestart = onRestart
} }
@ -153,34 +139,87 @@ func (s *AudioOutputSupervisor) Stop() {
s.logger.Info().Str("component", AudioOutputSupervisorComponent).Msg("component stopped") s.logger.Info().Str("component", AudioOutputSupervisorComponent).Msg("component stopped")
} }
// supervisionLoop is the main loop that manages the audio output process // supervisionLoop is the main supervision loop
func (s *AudioOutputSupervisor) supervisionLoop() { func (s *AudioOutputSupervisor) supervisionLoop() {
// Configure supervision parameters defer func() {
config := SupervisionConfig{ s.closeProcessDone()
ProcessType: "audio output server", s.logger.Info().Msg("audio server supervision ended")
Timeout: GetConfig().OutputSupervisorTimeout, }()
EnableRestart: true,
MaxRestartAttempts: getMaxRestartAttempts(), for atomic.LoadInt32(&s.running) == 1 {
RestartWindow: getRestartWindow(), select {
RestartDelay: getRestartDelay(), case <-s.stopChan:
MaxRestartDelay: getMaxRestartDelay(), s.logger.Info().Msg("received stop signal")
s.terminateProcess(GetConfig().OutputSupervisorTimeout, "audio output server")
return
case <-s.ctx.Done():
s.logger.Info().Msg("context cancelled")
s.terminateProcess(GetConfig().OutputSupervisorTimeout, "audio output server")
return
default:
// Start or restart the process
if err := s.startProcess(); err != nil {
// Only log start errors if error level enabled to reduce overhead
if s.logger.GetLevel() <= zerolog.ErrorLevel {
s.logger.Error().Err(err).Msg("failed to start audio server process")
} }
// Configure callbacks // Check if we should attempt restart
callbacks := ProcessCallbacks{ if !s.shouldRestart() {
OnProcessStart: s.onProcessStart, // Only log critical errors to reduce overhead
OnProcessExit: s.onProcessExit, if s.logger.GetLevel() <= zerolog.ErrorLevel {
OnRestart: s.onRestart, s.logger.Error().Msg("maximum restart attempts exceeded, stopping supervisor")
}
return
} }
// Use the base supervision loop template delay := s.calculateRestartDelay()
s.SupervisionLoop( // Sample logging to reduce overhead - log every 5th restart attempt
config, if len(s.restartAttempts)%5 == 0 && s.logger.GetLevel() <= zerolog.WarnLevel {
callbacks, s.logger.Warn().Dur("delay", delay).Int("attempt", len(s.restartAttempts)).Msg("retrying process start after delay")
s.startProcess, }
s.shouldRestart,
s.calculateRestartDelay, if s.onRestart != nil {
) s.onRestart(len(s.restartAttempts), delay)
}
select {
case <-time.After(delay):
case <-s.stopChan:
return
case <-s.ctx.Done():
return
}
continue
}
// Wait for process to exit
s.waitForProcessExit()
// Check if we should restart
if !s.shouldRestart() {
s.logger.Error().Msg("maximum restart attempts exceeded, stopping supervisor")
return
}
// Calculate restart delay
delay := s.calculateRestartDelay()
s.logger.Info().Dur("delay", delay).Msg("restarting audio server process after delay")
if s.onRestart != nil {
s.onRestart(len(s.restartAttempts), delay)
}
// Wait for restart delay
select {
case <-time.After(delay):
case <-s.stopChan:
return
case <-s.ctx.Done():
return
}
}
}
} }
// startProcess starts the audio server process // startProcess starts the audio server process
@ -222,6 +261,30 @@ func (s *AudioOutputSupervisor) startProcess() error {
return nil return nil
} }
// waitForProcessExit waits for the current process to exit and handles restart logic
func (s *AudioOutputSupervisor) waitForProcessExit() {
s.mutex.RLock()
pid := s.processPID
s.mutex.RUnlock()
// Use base supervisor's waitForProcessExit
s.BaseSupervisor.waitForProcessExit("audio output server")
// Handle output-specific logic (restart tracking and callbacks)
s.mutex.RLock()
exitCode := s.lastExitCode
s.mutex.RUnlock()
crashed := exitCode != 0
if crashed {
s.recordRestartAttempt()
}
if s.onProcessExit != nil {
s.onProcessExit(pid, exitCode, crashed)
}
}
// shouldRestart determines if the process should be restarted // shouldRestart determines if the process should be restarted
func (s *AudioOutputSupervisor) shouldRestart() bool { func (s *AudioOutputSupervisor) shouldRestart() bool {
if atomic.LoadInt32(&s.running) == 0 { if atomic.LoadInt32(&s.running) == 0 {