Compare commits

..

3 Commits

Author SHA1 Message Date
Alex P c40459664f Audio Input resiliency. Make sure the IPC client always recovers 2025-09-04 10:51:07 +00:00
Alex P cdf6731639 [WIP] Performance Updates:
Add LSB depth parameter for improved bit allocation and disable MMAP access for compatibility.
Adjust buffer sizing logic to better handle constrained environments while maintaining stability.
2025-09-04 08:47:40 +00:00
Alex P 6f15fdf965 perf(build): update ARM optimization flags for better compatibility
Replace -mcpu with -mtune in compiler flags to improve binary compatibility
while maintaining performance for Cortex-A7 targets
2025-09-04 08:25:16 +00:00
8 changed files with 191 additions and 50 deletions

View File

@ -29,7 +29,7 @@ ALSA_VERSION ?= 1.2.14
OPUS_VERSION ?= 1.5.2 OPUS_VERSION ?= 1.5.2
# Optimization flags for ARM Cortex-A7 with NEON # Optimization flags for ARM Cortex-A7 with NEON
OPTIM_CFLAGS := -O3 -mcpu=cortex-a7 -mfpu=neon -mfloat-abi=hard -ftree-vectorize -ffast-math -funroll-loops OPTIM_CFLAGS := -O3 -mfpu=neon -mtune=cortex-a7 -mfloat-abi=hard -ftree-vectorize -ffast-math -funroll-loops
PROMETHEUS_TAG := github.com/prometheus/common/version PROMETHEUS_TAG := github.com/prometheus/common/version
KVM_PKG_NAME := github.com/jetkvm/kvm KVM_PKG_NAME := github.com/jetkvm/kvm

View File

@ -32,8 +32,9 @@ static int opus_complexity = 3; // Will be set from GetConfig().CGOOpusC
static int opus_vbr = 1; // Will be set from GetConfig().CGOOpusVBR static int opus_vbr = 1; // Will be set from GetConfig().CGOOpusVBR
static int opus_vbr_constraint = 1; // Will be set from GetConfig().CGOOpusVBRConstraint static int opus_vbr_constraint = 1; // Will be set from GetConfig().CGOOpusVBRConstraint
static int opus_signal_type = 3; // Will be set from GetConfig().CGOOpusSignalType static int opus_signal_type = 3; // Will be set from GetConfig().CGOOpusSignalType
static int opus_bandwidth = 1105; // Will be set from GetConfig().CGOOpusBandwidth static int opus_bandwidth = 1105; // OPUS_BANDWIDTH_WIDEBAND for compatibility (was 1101)
static int opus_dtx = 0; // Will be set from GetConfig().CGOOpusDTX static int opus_dtx = 0; // Will be set from GetConfig().CGOOpusDTX
static int opus_lsb_depth = 16; // LSB depth for improved bit allocation on constrained hardware
static int sample_rate = 48000; // Will be set from GetConfig().CGOSampleRate static int sample_rate = 48000; // Will be set from GetConfig().CGOSampleRate
static int channels = 2; // Will be set from GetConfig().CGOChannels static int channels = 2; // Will be set from GetConfig().CGOChannels
static int frame_size = 960; // Will be set from GetConfig().CGOFrameSize static int frame_size = 960; // Will be set from GetConfig().CGOFrameSize
@ -41,10 +42,13 @@ static int max_packet_size = 1500; // Will be set from GetConfig().CGOMaxPa
static int sleep_microseconds = 1000; // Will be set from GetConfig().CGOUsleepMicroseconds static int sleep_microseconds = 1000; // Will be set from GetConfig().CGOUsleepMicroseconds
static int max_attempts_global = 5; // Will be set from GetConfig().CGOMaxAttempts static int max_attempts_global = 5; // Will be set from GetConfig().CGOMaxAttempts
static int max_backoff_us_global = 500000; // Will be set from GetConfig().CGOMaxBackoffMicroseconds static int max_backoff_us_global = 500000; // Will be set from GetConfig().CGOMaxBackoffMicroseconds
// Hardware optimization flags for constrained environments
static int use_mmap_access = 0; // Disable MMAP for compatibility (was 1)
static int optimized_buffer_size = 0; // Disable optimized buffer sizing for stability (was 1)
// Function to update constants from Go configuration // Function to update constants from Go configuration
void update_audio_constants(int bitrate, int complexity, int vbr, int vbr_constraint, void update_audio_constants(int bitrate, int complexity, int vbr, int vbr_constraint,
int signal_type, int bandwidth, int dtx, int sr, int ch, int signal_type, int bandwidth, int dtx, int lsb_depth, int sr, int ch,
int fs, int max_pkt, int sleep_us, int max_attempts, int max_backoff) { int fs, int max_pkt, int sleep_us, int max_attempts, int max_backoff) {
opus_bitrate = bitrate; opus_bitrate = bitrate;
opus_complexity = complexity; opus_complexity = complexity;
@ -53,6 +57,7 @@ void update_audio_constants(int bitrate, int complexity, int vbr, int vbr_constr
opus_signal_type = signal_type; opus_signal_type = signal_type;
opus_bandwidth = bandwidth; opus_bandwidth = bandwidth;
opus_dtx = dtx; opus_dtx = dtx;
opus_lsb_depth = lsb_depth;
sample_rate = sr; sample_rate = sr;
channels = ch; channels = ch;
frame_size = fs; frame_size = fs;
@ -151,7 +156,16 @@ static int configure_alsa_device(snd_pcm_t *handle, const char *device_name) {
err = snd_pcm_hw_params_any(handle, params); err = snd_pcm_hw_params_any(handle, params);
if (err < 0) return err; if (err < 0) return err;
err = snd_pcm_hw_params_set_access(handle, params, SND_PCM_ACCESS_RW_INTERLEAVED); // Use MMAP access for direct hardware memory access if enabled
if (use_mmap_access) {
err = snd_pcm_hw_params_set_access(handle, params, SND_PCM_ACCESS_MMAP_INTERLEAVED);
if (err < 0) {
// Fallback to RW access if MMAP fails
err = snd_pcm_hw_params_set_access(handle, params, SND_PCM_ACCESS_RW_INTERLEAVED);
}
} else {
err = snd_pcm_hw_params_set_access(handle, params, SND_PCM_ACCESS_RW_INTERLEAVED);
}
if (err < 0) return err; if (err < 0) return err;
err = snd_pcm_hw_params_set_format(handle, params, SND_PCM_FORMAT_S16_LE); err = snd_pcm_hw_params_set_format(handle, params, SND_PCM_FORMAT_S16_LE);
@ -169,13 +183,25 @@ static int configure_alsa_device(snd_pcm_t *handle, const char *device_name) {
if (err < 0) return err; if (err < 0) return err;
} }
// Optimize buffer sizes for low latency // Optimize buffer sizes for constrained hardware
snd_pcm_uframes_t period_size = frame_size; snd_pcm_uframes_t period_size = frame_size;
if (optimized_buffer_size) {
// Use smaller periods for lower latency on constrained hardware
period_size = frame_size / 2;
if (period_size < 64) period_size = 64; // Minimum safe period size
}
err = snd_pcm_hw_params_set_period_size_near(handle, params, &period_size, 0); err = snd_pcm_hw_params_set_period_size_near(handle, params, &period_size, 0);
if (err < 0) return err; if (err < 0) return err;
// Set buffer size to 4 periods for good latency/stability balance // Optimize buffer size based on hardware constraints
snd_pcm_uframes_t buffer_size = period_size * 4; snd_pcm_uframes_t buffer_size;
if (optimized_buffer_size) {
// Use 2 periods for ultra-low latency on constrained hardware
buffer_size = period_size * 2;
} else {
// Standard 4 periods for good latency/stability balance
buffer_size = period_size * 4;
}
err = snd_pcm_hw_params_set_buffer_size_near(handle, params, &buffer_size); err = snd_pcm_hw_params_set_buffer_size_near(handle, params, &buffer_size);
if (err < 0) return err; if (err < 0) return err;
@ -250,14 +276,16 @@ int jetkvm_audio_init() {
return -2; return -2;
} }
// Apply optimized Opus encoder settings // Apply optimized Opus encoder settings for constrained hardware
opus_encoder_ctl(encoder, OPUS_SET_BITRATE(opus_bitrate)); opus_encoder_ctl(encoder, OPUS_SET_BITRATE(opus_bitrate));
opus_encoder_ctl(encoder, OPUS_SET_COMPLEXITY(opus_complexity)); opus_encoder_ctl(encoder, OPUS_SET_COMPLEXITY(opus_complexity));
opus_encoder_ctl(encoder, OPUS_SET_VBR(opus_vbr)); opus_encoder_ctl(encoder, OPUS_SET_VBR(opus_vbr));
opus_encoder_ctl(encoder, OPUS_SET_VBR_CONSTRAINT(opus_vbr_constraint)); opus_encoder_ctl(encoder, OPUS_SET_VBR_CONSTRAINT(opus_vbr_constraint));
opus_encoder_ctl(encoder, OPUS_SET_SIGNAL(opus_signal_type)); opus_encoder_ctl(encoder, OPUS_SET_SIGNAL(opus_signal_type));
opus_encoder_ctl(encoder, OPUS_SET_BANDWIDTH(opus_bandwidth)); opus_encoder_ctl(encoder, OPUS_SET_BANDWIDTH(opus_bandwidth)); // WIDEBAND for compatibility
opus_encoder_ctl(encoder, OPUS_SET_DTX(opus_dtx)); opus_encoder_ctl(encoder, OPUS_SET_DTX(opus_dtx));
// Set LSB depth for improved bit allocation on constrained hardware (disabled for compatibility)
// opus_encoder_ctl(encoder, OPUS_SET_LSB_DEPTH(opus_lsb_depth));
// Enable packet loss concealment for better resilience // Enable packet loss concealment for better resilience
opus_encoder_ctl(encoder, OPUS_SET_PACKET_LOSS_PERC(5)); opus_encoder_ctl(encoder, OPUS_SET_PACKET_LOSS_PERC(5));
// Set prediction disabled for lower latency // Set prediction disabled for lower latency
@ -689,6 +717,7 @@ func cgoAudioInit() error {
C.int(cache.opusSignalType.Load()), C.int(cache.opusSignalType.Load()),
C.int(cache.opusBandwidth.Load()), C.int(cache.opusBandwidth.Load()),
C.int(cache.opusDTX.Load()), C.int(cache.opusDTX.Load()),
C.int(16), // LSB depth for improved bit allocation
C.int(cache.sampleRate.Load()), C.int(cache.sampleRate.Load()),
C.int(cache.channels.Load()), C.int(cache.channels.Load()),
C.int(cache.frameSize.Load()), C.int(cache.frameSize.Load()),

View File

@ -1900,8 +1900,8 @@ func DefaultAudioConfig() *AudioConfigConstants {
// WriteTimeout defines maximum wait time for IPC write operations. // WriteTimeout defines maximum wait time for IPC write operations.
// Used in: ipc_manager.go for preventing indefinite blocking on writes // Used in: ipc_manager.go for preventing indefinite blocking on writes
// Impact: Balances responsiveness with reliability for IPC operations // Impact: Balances responsiveness with reliability for IPC operations
// Default 5 seconds provides reasonable timeout for most system conditions // Optimized to 50ms for real-time audio processing to reduce latency
WriteTimeout: 5 * time.Second, WriteTimeout: 50 * time.Millisecond,
// MaxDroppedFrames defines threshold for dropped frame error handling. // MaxDroppedFrames defines threshold for dropped frame error handling.
// Used in: ipc_manager.go for quality degradation detection and recovery // Used in: ipc_manager.go for quality degradation detection and recovery

View File

@ -243,12 +243,33 @@ type AudioInputServer struct {
// NewAudioInputServer creates a new audio input server // NewAudioInputServer creates a new audio input server
func NewAudioInputServer() (*AudioInputServer, error) { func NewAudioInputServer() (*AudioInputServer, error) {
socketPath := getInputSocketPath() socketPath := getInputSocketPath()
// Remove existing socket if any
os.Remove(socketPath)
listener, err := net.Listen("unix", socketPath) // Retry socket creation with cleanup to handle race conditions
var listener net.Listener
var err error
for i := 0; i < 3; i++ {
// Remove existing socket if any
os.Remove(socketPath)
// Small delay to ensure cleanup completes
if i > 0 {
time.Sleep(10 * time.Millisecond)
}
listener, err = net.Listen("unix", socketPath)
if err == nil {
break
}
// Log retry attempt
if i < 2 {
logger := logging.GetDefaultLogger().With().Str("component", "audio-input").Logger()
logger.Warn().Err(err).Int("attempt", i+1).Msg("Failed to create unix socket, retrying")
}
}
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create unix socket: %w", err) return nil, fmt.Errorf("failed to create unix socket after 3 attempts: %w", err)
} }
// Get initial buffer size from adaptive buffer manager // Get initial buffer size from adaptive buffer manager
@ -320,7 +341,11 @@ func (ais *AudioInputServer) Stop() {
if ais.listener != nil { if ais.listener != nil {
ais.listener.Close() ais.listener.Close()
ais.listener = nil
} }
// Remove socket file to prevent restart issues
os.Remove(getInputSocketPath())
} }
// Close closes the server and cleans up resources // Close closes the server and cleans up resources

View File

@ -1,6 +1,7 @@
package audio package audio
import ( import (
"fmt"
"sync/atomic" "sync/atomic"
"time" "time"
@ -15,6 +16,12 @@ type AudioInputIPCManager struct {
supervisor *AudioInputSupervisor supervisor *AudioInputSupervisor
logger zerolog.Logger logger zerolog.Logger
running int32 running int32
// Connection monitoring and recovery
monitoringEnabled bool
lastConnectionCheck time.Time
connectionFailures int32
recoveryInProgress int32
} }
// NewAudioInputIPCManager creates a new IPC-based audio input manager // NewAudioInputIPCManager creates a new IPC-based audio input manager
@ -33,10 +40,17 @@ func (aim *AudioInputIPCManager) Start() error {
aim.logger.Debug().Str("component", AudioInputIPCComponent).Msg("starting component") aim.logger.Debug().Str("component", AudioInputIPCComponent).Msg("starting component")
// Initialize connection monitoring
aim.monitoringEnabled = true
aim.lastConnectionCheck = time.Now()
atomic.StoreInt32(&aim.connectionFailures, 0)
atomic.StoreInt32(&aim.recoveryInProgress, 0)
err := aim.supervisor.Start() err := aim.supervisor.Start()
if err != nil { if err != nil {
// Ensure proper cleanup on supervisor start failure // Ensure proper cleanup on supervisor start failure
atomic.StoreInt32(&aim.running, 0) atomic.StoreInt32(&aim.running, 0)
aim.monitoringEnabled = false
// Reset metrics on failed start // Reset metrics on failed start
aim.resetMetrics() aim.resetMetrics()
aim.logger.Error().Err(err).Str("component", AudioInputIPCComponent).Msg("failed to start audio input supervisor") aim.logger.Error().Err(err).Str("component", AudioInputIPCComponent).Msg("failed to start audio input supervisor")
@ -80,6 +94,10 @@ func (aim *AudioInputIPCManager) Stop() {
} }
aim.logger.Debug().Str("component", AudioInputIPCComponent).Msg("stopping component") aim.logger.Debug().Str("component", AudioInputIPCComponent).Msg("stopping component")
// Disable connection monitoring
aim.monitoringEnabled = false
aim.supervisor.Stop() aim.supervisor.Stop()
aim.logger.Debug().Str("component", AudioInputIPCComponent).Msg("component stopped") aim.logger.Debug().Str("component", AudioInputIPCComponent).Msg("component stopped")
} }
@ -102,6 +120,11 @@ func (aim *AudioInputIPCManager) WriteOpusFrame(frame []byte) error {
return nil // Empty frame, ignore return nil // Empty frame, ignore
} }
// Check connection health periodically
if aim.monitoringEnabled {
aim.checkConnectionHealth()
}
// Validate frame data // Validate frame data
if err := ValidateAudioFrame(frame); err != nil { if err := ValidateAudioFrame(frame); err != nil {
atomic.AddInt64(&aim.metrics.FramesDropped, 1) atomic.AddInt64(&aim.metrics.FramesDropped, 1)
@ -122,10 +145,21 @@ func (aim *AudioInputIPCManager) WriteOpusFrame(frame []byte) error {
if err != nil { if err != nil {
// Count as dropped frame // Count as dropped frame
atomic.AddInt64(&aim.metrics.FramesDropped, 1) atomic.AddInt64(&aim.metrics.FramesDropped, 1)
// Handle connection failure
if aim.monitoringEnabled {
aim.handleConnectionFailure(err)
}
aim.logger.Debug().Err(err).Msg("failed to send frame via IPC") aim.logger.Debug().Err(err).Msg("failed to send frame via IPC")
return err return err
} }
// Reset connection failure counter on successful send
if aim.monitoringEnabled {
atomic.StoreInt32(&aim.connectionFailures, 0)
}
// Calculate and update latency (end-to-end IPC transmission time) // Calculate and update latency (end-to-end IPC transmission time)
latency := time.Since(startTime) latency := time.Since(startTime)
aim.updateLatencyMetrics(latency) aim.updateLatencyMetrics(latency)
@ -215,6 +249,67 @@ func (aim *AudioInputIPCManager) updateLatencyMetrics(latency time.Duration) {
} }
} }
// checkConnectionHealth monitors the IPC connection health
func (aim *AudioInputIPCManager) checkConnectionHealth() {
now := time.Now()
// Check connection every 5 seconds
if now.Sub(aim.lastConnectionCheck) < 5*time.Second {
return
}
aim.lastConnectionCheck = now
// Check if supervisor and client are connected
if !aim.supervisor.IsConnected() {
aim.logger.Warn().Str("component", AudioInputIPCComponent).Msg("IPC connection lost, attempting recovery")
aim.handleConnectionFailure(fmt.Errorf("connection health check failed"))
}
}
// handleConnectionFailure manages connection failure recovery
func (aim *AudioInputIPCManager) handleConnectionFailure(err error) {
// Increment failure counter
failures := atomic.AddInt32(&aim.connectionFailures, 1)
// Prevent multiple concurrent recovery attempts
if !atomic.CompareAndSwapInt32(&aim.recoveryInProgress, 0, 1) {
return // Recovery already in progress
}
// Start recovery in a separate goroutine to avoid blocking audio processing
go func() {
defer atomic.StoreInt32(&aim.recoveryInProgress, 0)
aim.logger.Info().
Int32("failures", failures).
Err(err).
Str("component", AudioInputIPCComponent).
Msg("attempting IPC connection recovery")
// Stop and restart the supervisor to recover the connection
aim.supervisor.Stop()
// Brief delay before restart
time.Sleep(100 * time.Millisecond)
// Attempt to restart
if restartErr := aim.supervisor.Start(); restartErr != nil {
aim.logger.Error().
Err(restartErr).
Str("component", AudioInputIPCComponent).
Msg("failed to recover IPC connection")
} else {
aim.logger.Info().
Str("component", AudioInputIPCComponent).
Msg("IPC connection recovered successfully")
// Reset failure counter on successful recovery
atomic.StoreInt32(&aim.connectionFailures, 0)
}
}()
}
// GetDetailedMetrics returns comprehensive performance metrics // GetDetailedMetrics returns comprehensive performance metrics
func (aim *AudioInputIPCManager) GetDetailedMetrics() (AudioInputMetrics, map[string]interface{}) { func (aim *AudioInputIPCManager) GetDetailedMetrics() (AudioInputMetrics, map[string]interface{}) {
metrics := aim.GetMetrics() metrics := aim.GetMetrics()

View File

@ -357,9 +357,12 @@ func (ais *AudioInputSupervisor) monitorSubprocess() {
// connectClient attempts to connect the client to the server // connectClient attempts to connect the client to the server
func (ais *AudioInputSupervisor) connectClient() { func (ais *AudioInputSupervisor) connectClient() {
// Wait briefly for the server to start (reduced from 500ms) // Wait briefly for the server to start and create socket
time.Sleep(GetConfig().DefaultSleepDuration) time.Sleep(GetConfig().DefaultSleepDuration)
// Additional small delay to ensure socket is ready after restart
time.Sleep(20 * time.Millisecond)
err := ais.client.Connect() err := ais.client.Connect()
if err != nil { if err != nil {
ais.logger.Error().Err(err).Msg("Failed to connect to audio input server") ais.logger.Error().Err(err).Msg("Failed to connect to audio input server")

View File

@ -1,7 +1,6 @@
package audio package audio
import ( import (
"context"
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"net" "net"
@ -149,48 +148,38 @@ func WriteIPCMessage(conn net.Conn, msg IPCMessage, pool *GenericMessagePool, dr
binary.LittleEndian.PutUint32(optMsg.header[5:9], msg.GetLength()) binary.LittleEndian.PutUint32(optMsg.header[5:9], msg.GetLength())
binary.LittleEndian.PutUint64(optMsg.header[9:17], uint64(msg.GetTimestamp())) binary.LittleEndian.PutUint64(optMsg.header[9:17], uint64(msg.GetTimestamp()))
// Use non-blocking write with timeout // Set write deadline for timeout handling (more efficient than goroutines)
ctx, cancel := context.WithTimeout(context.Background(), GetConfig().WriteTimeout) if deadline := time.Now().Add(GetConfig().WriteTimeout); deadline.After(time.Now()) {
defer cancel() if err := conn.SetWriteDeadline(deadline); err != nil {
// If we can't set deadline, proceed without it
// Create a channel to signal write completion // This maintains compatibility with connections that don't support deadlines
done := make(chan error, 1) _ = err // Explicitly ignore error for linter
go func() {
// Write header using pre-allocated buffer
_, err := conn.Write(optMsg.header[:])
if err != nil {
done <- err
return
} }
}
// Write data if present // Write header using pre-allocated buffer (synchronous for better performance)
if msg.GetLength() > 0 && msg.GetData() != nil { _, err := conn.Write(optMsg.header[:])
_, err = conn.Write(msg.GetData()) if err != nil {
if err != nil { if droppedFramesCounter != nil {
done <- err atomic.AddInt64(droppedFramesCounter, 1)
return
}
} }
done <- nil return err
}() }
// Wait for completion or timeout // Write data if present
select { if msg.GetLength() > 0 && msg.GetData() != nil {
case err := <-done: _, err = conn.Write(msg.GetData())
if err != nil { if err != nil {
if droppedFramesCounter != nil { if droppedFramesCounter != nil {
atomic.AddInt64(droppedFramesCounter, 1) atomic.AddInt64(droppedFramesCounter, 1)
} }
return err return err
} }
return nil
case <-ctx.Done():
// Timeout occurred - drop frame to prevent blocking
if droppedFramesCounter != nil {
atomic.AddInt64(droppedFramesCounter, 1)
}
return fmt.Errorf("write timeout - frame dropped")
} }
// Clear write deadline after successful write
_ = conn.SetWriteDeadline(time.Time{}) // Ignore error as this is cleanup
return nil
} }
// Common connection acceptance with retry logic // Common connection acceptance with retry logic

View File

@ -24,7 +24,7 @@ cd "$AUDIO_LIBS_DIR"
[ -d opus-${OPUS_VERSION} ] || tar xf opus-${OPUS_VERSION}.tar.gz [ -d opus-${OPUS_VERSION} ] || tar xf opus-${OPUS_VERSION}.tar.gz
# Optimization flags for ARM Cortex-A7 with NEON # Optimization flags for ARM Cortex-A7 with NEON
OPTIM_CFLAGS="-O3 -mcpu=cortex-a7 -mfpu=neon -mfloat-abi=hard -ftree-vectorize -ffast-math -funroll-loops" OPTIM_CFLAGS="-O3 -mfpu=neon -mtune=cortex-a7 -mfloat-abi=hard -ftree-vectorize -ffast-math -funroll-loops"
export CC="${CROSS_PREFIX}-gcc" export CC="${CROSS_PREFIX}-gcc"
export CFLAGS="$OPTIM_CFLAGS" export CFLAGS="$OPTIM_CFLAGS"