refactor(audio): consolidate supervision logic into base implementation

Move common supervision loop logic to BaseSupervisor with configurable parameters
Simplify input/output supervisor implementations by using base template
Update function comments to be more concise
This commit is contained in:
Alex P 2025-09-08 05:53:06 +00:00
parent a2a87b46b8
commit 6890f17a54
6 changed files with 490 additions and 1943 deletions

View File

@ -455,32 +455,8 @@ int jetkvm_audio_playback_init() {
return 0; return 0;
} }
// jetkvm_audio_decode_write decodes Opus data and writes PCM to ALSA playback device. // jetkvm_audio_decode_write decodes Opus data and writes PCM to ALSA playback device
// // with error recovery and packet loss concealment
// This function implements a robust audio playback pipeline with the following features:
// - Opus decoding with packet loss concealment
// - ALSA PCM playback with automatic device recovery
// - Progressive error recovery with exponential backoff
// - Buffer underrun and device suspension handling
//
// Error Recovery Strategy:
// 1. EPIPE (buffer underrun): Prepare device, optionally drop+prepare, retry with delays
// 2. ESTRPIPE (device suspended): Resume with timeout, fallback to prepare if needed
// 3. Opus decode errors: Attempt packet loss concealment before failing
//
// Performance Optimizations:
// - Stack-allocated PCM buffer to minimize heap allocations
// - Bounds checking to prevent buffer overruns
// - Direct ALSA device access for minimal latency
//
// Parameters:
// opus_buf: Input buffer containing Opus-encoded audio data
// opus_size: Size of the Opus data in bytes (must be > 0 and <= max_packet_size)
//
// Returns:
// 0: Success - audio frame decoded and written to playback device
// -1: Invalid parameters, initialization error, or bounds check failure
// -2: Unrecoverable ALSA or Opus error after all retry attempts
int jetkvm_audio_decode_write(void *opus_buf, int opus_size) { int jetkvm_audio_decode_write(void *opus_buf, int opus_size) {
short pcm_buffer[1920]; // max 2ch*960 short pcm_buffer[1920]; // max 2ch*960
unsigned char *in = (unsigned char*)opus_buf; unsigned char *in = (unsigned char*)opus_buf;

File diff suppressed because it is too large Load Diff

View File

@ -488,13 +488,7 @@ func InitValidationCache() {
GetCachedConfig().Update() GetCachedConfig().Update()
} }
// ValidateAudioFrame provides optimized validation for audio frame data // ValidateAudioFrame validates audio frame data with cached max size for performance
// This is the primary validation function used in all audio processing paths
//
// Performance optimizations:
// - Uses cached max frame size to eliminate config lookups
// - Single branch condition for optimal CPU pipeline efficiency
// - Minimal error allocation overhead
// //
//go:inline //go:inline
func ValidateAudioFrame(data []byte) error { func ValidateAudioFrame(data []byte) error {

View File

@ -70,33 +70,32 @@ func (ais *AudioInputSupervisor) Start() error {
// supervisionLoop is the main supervision loop // supervisionLoop is the main supervision loop
func (ais *AudioInputSupervisor) supervisionLoop() { func (ais *AudioInputSupervisor) supervisionLoop() {
defer func() { // Configure supervision parameters (no restart for input supervisor)
ais.closeProcessDone() config := SupervisionConfig{
ais.logger.Info().Msg("audio input server supervision ended") ProcessType: "audio input server",
}() Timeout: GetConfig().InputSupervisorTimeout,
EnableRestart: false, // Input supervisor doesn't restart
for atomic.LoadInt32(&ais.running) == 1 { MaxRestartAttempts: 0,
select { RestartWindow: 0,
case <-ais.stopChan: RestartDelay: 0,
ais.logger.Info().Msg("received stop signal") MaxRestartDelay: 0,
ais.terminateProcess(GetConfig().InputSupervisorTimeout, "audio input server")
return
case <-ais.ctx.Done():
ais.logger.Info().Msg("context cancelled")
ais.terminateProcess(GetConfig().InputSupervisorTimeout, "audio input server")
return
default:
// Start the process
if err := ais.startProcess(); err != nil {
ais.logger.Error().Err(err).Msg("failed to start audio input server process")
return
}
// Wait for process to exit
ais.waitForProcessExit("audio input server")
return // Single run, no restart logic for now
}
} }
// Configure callbacks (input supervisor doesn't have callbacks currently)
callbacks := ProcessCallbacks{
OnProcessStart: nil,
OnProcessExit: nil,
OnRestart: nil,
}
// Use the base supervision loop template
ais.SupervisionLoop(
config,
callbacks,
ais.startProcess,
func() bool { return false }, // Never restart
func() time.Duration { return 0 }, // No restart delay needed
)
} }
// startProcess starts the audio input server process // startProcess starts the audio input server process

View File

@ -219,3 +219,126 @@ func (bs *BaseSupervisor) waitForProcessExit(processType string) {
bs.logger.Info().Int("pid", pid).Msgf("%s process exited gracefully", processType) bs.logger.Info().Int("pid", pid).Msgf("%s process exited gracefully", processType)
} }
} }
// SupervisionConfig holds configuration for the supervision loop
type SupervisionConfig struct {
ProcessType string
Timeout time.Duration
EnableRestart bool
MaxRestartAttempts int
RestartWindow time.Duration
RestartDelay time.Duration
MaxRestartDelay time.Duration
}
// ProcessCallbacks holds callback functions for process lifecycle events
type ProcessCallbacks struct {
OnProcessStart func(pid int)
OnProcessExit func(pid int, exitCode int, crashed bool)
OnRestart func(attempt int, delay time.Duration)
}
// SupervisionLoop provides a template for supervision loops that can be extended by specific supervisors
func (bs *BaseSupervisor) SupervisionLoop(
config SupervisionConfig,
callbacks ProcessCallbacks,
startProcessFunc func() error,
shouldRestartFunc func() bool,
calculateDelayFunc func() time.Duration,
) {
defer func() {
bs.closeProcessDone()
bs.logger.Info().Msgf("%s supervision ended", config.ProcessType)
}()
for atomic.LoadInt32(&bs.running) == 1 {
select {
case <-bs.stopChan:
bs.logger.Info().Msg("received stop signal")
bs.terminateProcess(config.Timeout, config.ProcessType)
return
case <-bs.ctx.Done():
bs.logger.Info().Msg("context cancelled")
bs.terminateProcess(config.Timeout, config.ProcessType)
return
default:
// Start or restart the process
if err := startProcessFunc(); err != nil {
bs.logger.Error().Err(err).Msgf("failed to start %s process", config.ProcessType)
// Check if we should attempt restart (only if restart is enabled)
if !config.EnableRestart || !shouldRestartFunc() {
bs.logger.Error().Msgf("maximum restart attempts exceeded or restart disabled, stopping %s supervisor", config.ProcessType)
return
}
delay := calculateDelayFunc()
bs.logger.Warn().Dur("delay", delay).Msgf("retrying %s process start after delay", config.ProcessType)
if callbacks.OnRestart != nil {
callbacks.OnRestart(0, delay) // 0 indicates start failure, not exit restart
}
select {
case <-time.After(delay):
case <-bs.stopChan:
return
case <-bs.ctx.Done():
return
}
continue
}
// Wait for process to exit
bs.waitForProcessExitWithCallback(config.ProcessType, callbacks)
// Check if we should restart (only if restart is enabled)
if !config.EnableRestart {
bs.logger.Info().Msgf("%s process completed, restart disabled", config.ProcessType)
return
}
if !shouldRestartFunc() {
bs.logger.Error().Msgf("maximum restart attempts exceeded, stopping %s supervisor", config.ProcessType)
return
}
// Calculate restart delay
delay := calculateDelayFunc()
bs.logger.Info().Dur("delay", delay).Msgf("restarting %s process after delay", config.ProcessType)
if callbacks.OnRestart != nil {
callbacks.OnRestart(1, delay) // 1 indicates restart after exit
}
// Wait for restart delay
select {
case <-time.After(delay):
case <-bs.stopChan:
return
case <-bs.ctx.Done():
return
}
}
}
}
// waitForProcessExitWithCallback extends waitForProcessExit with callback support
func (bs *BaseSupervisor) waitForProcessExitWithCallback(processType string, callbacks ProcessCallbacks) {
bs.mutex.RLock()
pid := bs.processPID
bs.mutex.RUnlock()
// Use the base waitForProcessExit logic
bs.waitForProcessExit(processType)
// Handle callbacks if provided
if callbacks.OnProcessExit != nil {
bs.mutex.RLock()
exitCode := bs.lastExitCode
bs.mutex.RUnlock()
crashed := exitCode != 0
callbacks.OnProcessExit(pid, exitCode, crashed)
}
}

View File

@ -10,8 +10,6 @@ import (
"strconv" "strconv"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/rs/zerolog"
) )
// Component name constants for logging // Component name constants for logging
@ -70,7 +68,23 @@ func (s *AudioOutputSupervisor) SetCallbacks(
defer s.mutex.Unlock() defer s.mutex.Unlock()
s.onProcessStart = onStart s.onProcessStart = onStart
s.onProcessExit = onExit
// Wrap the exit callback to include restart tracking
if onExit != nil {
s.onProcessExit = func(pid int, exitCode int, crashed bool) {
if crashed {
s.recordRestartAttempt()
}
onExit(pid, exitCode, crashed)
}
} else {
s.onProcessExit = func(pid int, exitCode int, crashed bool) {
if crashed {
s.recordRestartAttempt()
}
}
}
s.onRestart = onRestart s.onRestart = onRestart
} }
@ -139,87 +153,34 @@ func (s *AudioOutputSupervisor) Stop() {
s.logger.Info().Str("component", AudioOutputSupervisorComponent).Msg("component stopped") s.logger.Info().Str("component", AudioOutputSupervisorComponent).Msg("component stopped")
} }
// supervisionLoop is the main supervision loop // supervisionLoop is the main loop that manages the audio output process
func (s *AudioOutputSupervisor) supervisionLoop() { func (s *AudioOutputSupervisor) supervisionLoop() {
defer func() { // Configure supervision parameters
s.closeProcessDone() config := SupervisionConfig{
s.logger.Info().Msg("audio server supervision ended") ProcessType: "audio output server",
}() Timeout: GetConfig().OutputSupervisorTimeout,
EnableRestart: true,
for atomic.LoadInt32(&s.running) == 1 { MaxRestartAttempts: getMaxRestartAttempts(),
select { RestartWindow: getRestartWindow(),
case <-s.stopChan: RestartDelay: getRestartDelay(),
s.logger.Info().Msg("received stop signal") MaxRestartDelay: getMaxRestartDelay(),
s.terminateProcess(GetConfig().OutputSupervisorTimeout, "audio output server")
return
case <-s.ctx.Done():
s.logger.Info().Msg("context cancelled")
s.terminateProcess(GetConfig().OutputSupervisorTimeout, "audio output server")
return
default:
// Start or restart the process
if err := s.startProcess(); err != nil {
// Only log start errors if error level enabled to reduce overhead
if s.logger.GetLevel() <= zerolog.ErrorLevel {
s.logger.Error().Err(err).Msg("failed to start audio server process")
}
// Check if we should attempt restart
if !s.shouldRestart() {
// Only log critical errors to reduce overhead
if s.logger.GetLevel() <= zerolog.ErrorLevel {
s.logger.Error().Msg("maximum restart attempts exceeded, stopping supervisor")
}
return
}
delay := s.calculateRestartDelay()
// Sample logging to reduce overhead - log every 5th restart attempt
if len(s.restartAttempts)%5 == 0 && s.logger.GetLevel() <= zerolog.WarnLevel {
s.logger.Warn().Dur("delay", delay).Int("attempt", len(s.restartAttempts)).Msg("retrying process start after delay")
}
if s.onRestart != nil {
s.onRestart(len(s.restartAttempts), delay)
}
select {
case <-time.After(delay):
case <-s.stopChan:
return
case <-s.ctx.Done():
return
}
continue
}
// Wait for process to exit
s.waitForProcessExit()
// Check if we should restart
if !s.shouldRestart() {
s.logger.Error().Msg("maximum restart attempts exceeded, stopping supervisor")
return
}
// Calculate restart delay
delay := s.calculateRestartDelay()
s.logger.Info().Dur("delay", delay).Msg("restarting audio server process after delay")
if s.onRestart != nil {
s.onRestart(len(s.restartAttempts), delay)
}
// Wait for restart delay
select {
case <-time.After(delay):
case <-s.stopChan:
return
case <-s.ctx.Done():
return
}
}
} }
// Configure callbacks
callbacks := ProcessCallbacks{
OnProcessStart: s.onProcessStart,
OnProcessExit: s.onProcessExit,
OnRestart: s.onRestart,
}
// Use the base supervision loop template
s.SupervisionLoop(
config,
callbacks,
s.startProcess,
s.shouldRestart,
s.calculateRestartDelay,
)
} }
// startProcess starts the audio server process // startProcess starts the audio server process
@ -261,30 +222,6 @@ func (s *AudioOutputSupervisor) startProcess() error {
return nil return nil
} }
// waitForProcessExit waits for the current process to exit and handles restart logic
func (s *AudioOutputSupervisor) waitForProcessExit() {
s.mutex.RLock()
pid := s.processPID
s.mutex.RUnlock()
// Use base supervisor's waitForProcessExit
s.BaseSupervisor.waitForProcessExit("audio output server")
// Handle output-specific logic (restart tracking and callbacks)
s.mutex.RLock()
exitCode := s.lastExitCode
s.mutex.RUnlock()
crashed := exitCode != 0
if crashed {
s.recordRestartAttempt()
}
if s.onProcessExit != nil {
s.onProcessExit(pid, exitCode, crashed)
}
}
// shouldRestart determines if the process should be restarted // shouldRestart determines if the process should be restarted
func (s *AudioOutputSupervisor) shouldRestart() bool { func (s *AudioOutputSupervisor) shouldRestart() bool {
if atomic.LoadInt32(&s.running) == 0 { if atomic.LoadInt32(&s.running) == 0 {