mirror of https://github.com/jetkvm/kvm.git
Compare commits
3 Commits
b63404c26b
...
c40459664f
| Author | SHA1 | Date |
|---|---|---|
|
|
c40459664f | |
|
|
cdf6731639 | |
|
|
6f15fdf965 |
2
Makefile
2
Makefile
|
|
@ -29,7 +29,7 @@ ALSA_VERSION ?= 1.2.14
|
||||||
OPUS_VERSION ?= 1.5.2
|
OPUS_VERSION ?= 1.5.2
|
||||||
|
|
||||||
# Optimization flags for ARM Cortex-A7 with NEON
|
# Optimization flags for ARM Cortex-A7 with NEON
|
||||||
OPTIM_CFLAGS := -O3 -mcpu=cortex-a7 -mfpu=neon -mfloat-abi=hard -ftree-vectorize -ffast-math -funroll-loops
|
OPTIM_CFLAGS := -O3 -mfpu=neon -mtune=cortex-a7 -mfloat-abi=hard -ftree-vectorize -ffast-math -funroll-loops
|
||||||
|
|
||||||
PROMETHEUS_TAG := github.com/prometheus/common/version
|
PROMETHEUS_TAG := github.com/prometheus/common/version
|
||||||
KVM_PKG_NAME := github.com/jetkvm/kvm
|
KVM_PKG_NAME := github.com/jetkvm/kvm
|
||||||
|
|
|
||||||
|
|
@ -32,8 +32,9 @@ static int opus_complexity = 3; // Will be set from GetConfig().CGOOpusC
|
||||||
static int opus_vbr = 1; // Will be set from GetConfig().CGOOpusVBR
|
static int opus_vbr = 1; // Will be set from GetConfig().CGOOpusVBR
|
||||||
static int opus_vbr_constraint = 1; // Will be set from GetConfig().CGOOpusVBRConstraint
|
static int opus_vbr_constraint = 1; // Will be set from GetConfig().CGOOpusVBRConstraint
|
||||||
static int opus_signal_type = 3; // Will be set from GetConfig().CGOOpusSignalType
|
static int opus_signal_type = 3; // Will be set from GetConfig().CGOOpusSignalType
|
||||||
static int opus_bandwidth = 1105; // Will be set from GetConfig().CGOOpusBandwidth
|
static int opus_bandwidth = 1105; // OPUS_BANDWIDTH_WIDEBAND for compatibility (was 1101)
|
||||||
static int opus_dtx = 0; // Will be set from GetConfig().CGOOpusDTX
|
static int opus_dtx = 0; // Will be set from GetConfig().CGOOpusDTX
|
||||||
|
static int opus_lsb_depth = 16; // LSB depth for improved bit allocation on constrained hardware
|
||||||
static int sample_rate = 48000; // Will be set from GetConfig().CGOSampleRate
|
static int sample_rate = 48000; // Will be set from GetConfig().CGOSampleRate
|
||||||
static int channels = 2; // Will be set from GetConfig().CGOChannels
|
static int channels = 2; // Will be set from GetConfig().CGOChannels
|
||||||
static int frame_size = 960; // Will be set from GetConfig().CGOFrameSize
|
static int frame_size = 960; // Will be set from GetConfig().CGOFrameSize
|
||||||
|
|
@ -41,10 +42,13 @@ static int max_packet_size = 1500; // Will be set from GetConfig().CGOMaxPa
|
||||||
static int sleep_microseconds = 1000; // Will be set from GetConfig().CGOUsleepMicroseconds
|
static int sleep_microseconds = 1000; // Will be set from GetConfig().CGOUsleepMicroseconds
|
||||||
static int max_attempts_global = 5; // Will be set from GetConfig().CGOMaxAttempts
|
static int max_attempts_global = 5; // Will be set from GetConfig().CGOMaxAttempts
|
||||||
static int max_backoff_us_global = 500000; // Will be set from GetConfig().CGOMaxBackoffMicroseconds
|
static int max_backoff_us_global = 500000; // Will be set from GetConfig().CGOMaxBackoffMicroseconds
|
||||||
|
// Hardware optimization flags for constrained environments
|
||||||
|
static int use_mmap_access = 0; // Disable MMAP for compatibility (was 1)
|
||||||
|
static int optimized_buffer_size = 0; // Disable optimized buffer sizing for stability (was 1)
|
||||||
|
|
||||||
// Function to update constants from Go configuration
|
// Function to update constants from Go configuration
|
||||||
void update_audio_constants(int bitrate, int complexity, int vbr, int vbr_constraint,
|
void update_audio_constants(int bitrate, int complexity, int vbr, int vbr_constraint,
|
||||||
int signal_type, int bandwidth, int dtx, int sr, int ch,
|
int signal_type, int bandwidth, int dtx, int lsb_depth, int sr, int ch,
|
||||||
int fs, int max_pkt, int sleep_us, int max_attempts, int max_backoff) {
|
int fs, int max_pkt, int sleep_us, int max_attempts, int max_backoff) {
|
||||||
opus_bitrate = bitrate;
|
opus_bitrate = bitrate;
|
||||||
opus_complexity = complexity;
|
opus_complexity = complexity;
|
||||||
|
|
@ -53,6 +57,7 @@ void update_audio_constants(int bitrate, int complexity, int vbr, int vbr_constr
|
||||||
opus_signal_type = signal_type;
|
opus_signal_type = signal_type;
|
||||||
opus_bandwidth = bandwidth;
|
opus_bandwidth = bandwidth;
|
||||||
opus_dtx = dtx;
|
opus_dtx = dtx;
|
||||||
|
opus_lsb_depth = lsb_depth;
|
||||||
sample_rate = sr;
|
sample_rate = sr;
|
||||||
channels = ch;
|
channels = ch;
|
||||||
frame_size = fs;
|
frame_size = fs;
|
||||||
|
|
@ -151,7 +156,16 @@ static int configure_alsa_device(snd_pcm_t *handle, const char *device_name) {
|
||||||
err = snd_pcm_hw_params_any(handle, params);
|
err = snd_pcm_hw_params_any(handle, params);
|
||||||
if (err < 0) return err;
|
if (err < 0) return err;
|
||||||
|
|
||||||
|
// Use MMAP access for direct hardware memory access if enabled
|
||||||
|
if (use_mmap_access) {
|
||||||
|
err = snd_pcm_hw_params_set_access(handle, params, SND_PCM_ACCESS_MMAP_INTERLEAVED);
|
||||||
|
if (err < 0) {
|
||||||
|
// Fallback to RW access if MMAP fails
|
||||||
err = snd_pcm_hw_params_set_access(handle, params, SND_PCM_ACCESS_RW_INTERLEAVED);
|
err = snd_pcm_hw_params_set_access(handle, params, SND_PCM_ACCESS_RW_INTERLEAVED);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
err = snd_pcm_hw_params_set_access(handle, params, SND_PCM_ACCESS_RW_INTERLEAVED);
|
||||||
|
}
|
||||||
if (err < 0) return err;
|
if (err < 0) return err;
|
||||||
|
|
||||||
err = snd_pcm_hw_params_set_format(handle, params, SND_PCM_FORMAT_S16_LE);
|
err = snd_pcm_hw_params_set_format(handle, params, SND_PCM_FORMAT_S16_LE);
|
||||||
|
|
@ -169,13 +183,25 @@ static int configure_alsa_device(snd_pcm_t *handle, const char *device_name) {
|
||||||
if (err < 0) return err;
|
if (err < 0) return err;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Optimize buffer sizes for low latency
|
// Optimize buffer sizes for constrained hardware
|
||||||
snd_pcm_uframes_t period_size = frame_size;
|
snd_pcm_uframes_t period_size = frame_size;
|
||||||
|
if (optimized_buffer_size) {
|
||||||
|
// Use smaller periods for lower latency on constrained hardware
|
||||||
|
period_size = frame_size / 2;
|
||||||
|
if (period_size < 64) period_size = 64; // Minimum safe period size
|
||||||
|
}
|
||||||
err = snd_pcm_hw_params_set_period_size_near(handle, params, &period_size, 0);
|
err = snd_pcm_hw_params_set_period_size_near(handle, params, &period_size, 0);
|
||||||
if (err < 0) return err;
|
if (err < 0) return err;
|
||||||
|
|
||||||
// Set buffer size to 4 periods for good latency/stability balance
|
// Optimize buffer size based on hardware constraints
|
||||||
snd_pcm_uframes_t buffer_size = period_size * 4;
|
snd_pcm_uframes_t buffer_size;
|
||||||
|
if (optimized_buffer_size) {
|
||||||
|
// Use 2 periods for ultra-low latency on constrained hardware
|
||||||
|
buffer_size = period_size * 2;
|
||||||
|
} else {
|
||||||
|
// Standard 4 periods for good latency/stability balance
|
||||||
|
buffer_size = period_size * 4;
|
||||||
|
}
|
||||||
err = snd_pcm_hw_params_set_buffer_size_near(handle, params, &buffer_size);
|
err = snd_pcm_hw_params_set_buffer_size_near(handle, params, &buffer_size);
|
||||||
if (err < 0) return err;
|
if (err < 0) return err;
|
||||||
|
|
||||||
|
|
@ -250,14 +276,16 @@ int jetkvm_audio_init() {
|
||||||
return -2;
|
return -2;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Apply optimized Opus encoder settings
|
// Apply optimized Opus encoder settings for constrained hardware
|
||||||
opus_encoder_ctl(encoder, OPUS_SET_BITRATE(opus_bitrate));
|
opus_encoder_ctl(encoder, OPUS_SET_BITRATE(opus_bitrate));
|
||||||
opus_encoder_ctl(encoder, OPUS_SET_COMPLEXITY(opus_complexity));
|
opus_encoder_ctl(encoder, OPUS_SET_COMPLEXITY(opus_complexity));
|
||||||
opus_encoder_ctl(encoder, OPUS_SET_VBR(opus_vbr));
|
opus_encoder_ctl(encoder, OPUS_SET_VBR(opus_vbr));
|
||||||
opus_encoder_ctl(encoder, OPUS_SET_VBR_CONSTRAINT(opus_vbr_constraint));
|
opus_encoder_ctl(encoder, OPUS_SET_VBR_CONSTRAINT(opus_vbr_constraint));
|
||||||
opus_encoder_ctl(encoder, OPUS_SET_SIGNAL(opus_signal_type));
|
opus_encoder_ctl(encoder, OPUS_SET_SIGNAL(opus_signal_type));
|
||||||
opus_encoder_ctl(encoder, OPUS_SET_BANDWIDTH(opus_bandwidth));
|
opus_encoder_ctl(encoder, OPUS_SET_BANDWIDTH(opus_bandwidth)); // WIDEBAND for compatibility
|
||||||
opus_encoder_ctl(encoder, OPUS_SET_DTX(opus_dtx));
|
opus_encoder_ctl(encoder, OPUS_SET_DTX(opus_dtx));
|
||||||
|
// Set LSB depth for improved bit allocation on constrained hardware (disabled for compatibility)
|
||||||
|
// opus_encoder_ctl(encoder, OPUS_SET_LSB_DEPTH(opus_lsb_depth));
|
||||||
// Enable packet loss concealment for better resilience
|
// Enable packet loss concealment for better resilience
|
||||||
opus_encoder_ctl(encoder, OPUS_SET_PACKET_LOSS_PERC(5));
|
opus_encoder_ctl(encoder, OPUS_SET_PACKET_LOSS_PERC(5));
|
||||||
// Set prediction disabled for lower latency
|
// Set prediction disabled for lower latency
|
||||||
|
|
@ -689,6 +717,7 @@ func cgoAudioInit() error {
|
||||||
C.int(cache.opusSignalType.Load()),
|
C.int(cache.opusSignalType.Load()),
|
||||||
C.int(cache.opusBandwidth.Load()),
|
C.int(cache.opusBandwidth.Load()),
|
||||||
C.int(cache.opusDTX.Load()),
|
C.int(cache.opusDTX.Load()),
|
||||||
|
C.int(16), // LSB depth for improved bit allocation
|
||||||
C.int(cache.sampleRate.Load()),
|
C.int(cache.sampleRate.Load()),
|
||||||
C.int(cache.channels.Load()),
|
C.int(cache.channels.Load()),
|
||||||
C.int(cache.frameSize.Load()),
|
C.int(cache.frameSize.Load()),
|
||||||
|
|
|
||||||
|
|
@ -1900,8 +1900,8 @@ func DefaultAudioConfig() *AudioConfigConstants {
|
||||||
// WriteTimeout defines maximum wait time for IPC write operations.
|
// WriteTimeout defines maximum wait time for IPC write operations.
|
||||||
// Used in: ipc_manager.go for preventing indefinite blocking on writes
|
// Used in: ipc_manager.go for preventing indefinite blocking on writes
|
||||||
// Impact: Balances responsiveness with reliability for IPC operations
|
// Impact: Balances responsiveness with reliability for IPC operations
|
||||||
// Default 5 seconds provides reasonable timeout for most system conditions
|
// Optimized to 50ms for real-time audio processing to reduce latency
|
||||||
WriteTimeout: 5 * time.Second,
|
WriteTimeout: 50 * time.Millisecond,
|
||||||
|
|
||||||
// MaxDroppedFrames defines threshold for dropped frame error handling.
|
// MaxDroppedFrames defines threshold for dropped frame error handling.
|
||||||
// Used in: ipc_manager.go for quality degradation detection and recovery
|
// Used in: ipc_manager.go for quality degradation detection and recovery
|
||||||
|
|
|
||||||
|
|
@ -243,12 +243,33 @@ type AudioInputServer struct {
|
||||||
// NewAudioInputServer creates a new audio input server
|
// NewAudioInputServer creates a new audio input server
|
||||||
func NewAudioInputServer() (*AudioInputServer, error) {
|
func NewAudioInputServer() (*AudioInputServer, error) {
|
||||||
socketPath := getInputSocketPath()
|
socketPath := getInputSocketPath()
|
||||||
|
|
||||||
|
// Retry socket creation with cleanup to handle race conditions
|
||||||
|
var listener net.Listener
|
||||||
|
var err error
|
||||||
|
for i := 0; i < 3; i++ {
|
||||||
// Remove existing socket if any
|
// Remove existing socket if any
|
||||||
os.Remove(socketPath)
|
os.Remove(socketPath)
|
||||||
|
|
||||||
listener, err := net.Listen("unix", socketPath)
|
// Small delay to ensure cleanup completes
|
||||||
|
if i > 0 {
|
||||||
|
time.Sleep(10 * time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
|
listener, err = net.Listen("unix", socketPath)
|
||||||
|
if err == nil {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
// Log retry attempt
|
||||||
|
if i < 2 {
|
||||||
|
logger := logging.GetDefaultLogger().With().Str("component", "audio-input").Logger()
|
||||||
|
logger.Warn().Err(err).Int("attempt", i+1).Msg("Failed to create unix socket, retrying")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to create unix socket: %w", err)
|
return nil, fmt.Errorf("failed to create unix socket after 3 attempts: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get initial buffer size from adaptive buffer manager
|
// Get initial buffer size from adaptive buffer manager
|
||||||
|
|
@ -320,7 +341,11 @@ func (ais *AudioInputServer) Stop() {
|
||||||
|
|
||||||
if ais.listener != nil {
|
if ais.listener != nil {
|
||||||
ais.listener.Close()
|
ais.listener.Close()
|
||||||
|
ais.listener = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Remove socket file to prevent restart issues
|
||||||
|
os.Remove(getInputSocketPath())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close closes the server and cleans up resources
|
// Close closes the server and cleans up resources
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
package audio
|
package audio
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
|
@ -15,6 +16,12 @@ type AudioInputIPCManager struct {
|
||||||
supervisor *AudioInputSupervisor
|
supervisor *AudioInputSupervisor
|
||||||
logger zerolog.Logger
|
logger zerolog.Logger
|
||||||
running int32
|
running int32
|
||||||
|
|
||||||
|
// Connection monitoring and recovery
|
||||||
|
monitoringEnabled bool
|
||||||
|
lastConnectionCheck time.Time
|
||||||
|
connectionFailures int32
|
||||||
|
recoveryInProgress int32
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewAudioInputIPCManager creates a new IPC-based audio input manager
|
// NewAudioInputIPCManager creates a new IPC-based audio input manager
|
||||||
|
|
@ -33,10 +40,17 @@ func (aim *AudioInputIPCManager) Start() error {
|
||||||
|
|
||||||
aim.logger.Debug().Str("component", AudioInputIPCComponent).Msg("starting component")
|
aim.logger.Debug().Str("component", AudioInputIPCComponent).Msg("starting component")
|
||||||
|
|
||||||
|
// Initialize connection monitoring
|
||||||
|
aim.monitoringEnabled = true
|
||||||
|
aim.lastConnectionCheck = time.Now()
|
||||||
|
atomic.StoreInt32(&aim.connectionFailures, 0)
|
||||||
|
atomic.StoreInt32(&aim.recoveryInProgress, 0)
|
||||||
|
|
||||||
err := aim.supervisor.Start()
|
err := aim.supervisor.Start()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Ensure proper cleanup on supervisor start failure
|
// Ensure proper cleanup on supervisor start failure
|
||||||
atomic.StoreInt32(&aim.running, 0)
|
atomic.StoreInt32(&aim.running, 0)
|
||||||
|
aim.monitoringEnabled = false
|
||||||
// Reset metrics on failed start
|
// Reset metrics on failed start
|
||||||
aim.resetMetrics()
|
aim.resetMetrics()
|
||||||
aim.logger.Error().Err(err).Str("component", AudioInputIPCComponent).Msg("failed to start audio input supervisor")
|
aim.logger.Error().Err(err).Str("component", AudioInputIPCComponent).Msg("failed to start audio input supervisor")
|
||||||
|
|
@ -80,6 +94,10 @@ func (aim *AudioInputIPCManager) Stop() {
|
||||||
}
|
}
|
||||||
|
|
||||||
aim.logger.Debug().Str("component", AudioInputIPCComponent).Msg("stopping component")
|
aim.logger.Debug().Str("component", AudioInputIPCComponent).Msg("stopping component")
|
||||||
|
|
||||||
|
// Disable connection monitoring
|
||||||
|
aim.monitoringEnabled = false
|
||||||
|
|
||||||
aim.supervisor.Stop()
|
aim.supervisor.Stop()
|
||||||
aim.logger.Debug().Str("component", AudioInputIPCComponent).Msg("component stopped")
|
aim.logger.Debug().Str("component", AudioInputIPCComponent).Msg("component stopped")
|
||||||
}
|
}
|
||||||
|
|
@ -102,6 +120,11 @@ func (aim *AudioInputIPCManager) WriteOpusFrame(frame []byte) error {
|
||||||
return nil // Empty frame, ignore
|
return nil // Empty frame, ignore
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check connection health periodically
|
||||||
|
if aim.monitoringEnabled {
|
||||||
|
aim.checkConnectionHealth()
|
||||||
|
}
|
||||||
|
|
||||||
// Validate frame data
|
// Validate frame data
|
||||||
if err := ValidateAudioFrame(frame); err != nil {
|
if err := ValidateAudioFrame(frame); err != nil {
|
||||||
atomic.AddInt64(&aim.metrics.FramesDropped, 1)
|
atomic.AddInt64(&aim.metrics.FramesDropped, 1)
|
||||||
|
|
@ -122,10 +145,21 @@ func (aim *AudioInputIPCManager) WriteOpusFrame(frame []byte) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Count as dropped frame
|
// Count as dropped frame
|
||||||
atomic.AddInt64(&aim.metrics.FramesDropped, 1)
|
atomic.AddInt64(&aim.metrics.FramesDropped, 1)
|
||||||
|
|
||||||
|
// Handle connection failure
|
||||||
|
if aim.monitoringEnabled {
|
||||||
|
aim.handleConnectionFailure(err)
|
||||||
|
}
|
||||||
|
|
||||||
aim.logger.Debug().Err(err).Msg("failed to send frame via IPC")
|
aim.logger.Debug().Err(err).Msg("failed to send frame via IPC")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Reset connection failure counter on successful send
|
||||||
|
if aim.monitoringEnabled {
|
||||||
|
atomic.StoreInt32(&aim.connectionFailures, 0)
|
||||||
|
}
|
||||||
|
|
||||||
// Calculate and update latency (end-to-end IPC transmission time)
|
// Calculate and update latency (end-to-end IPC transmission time)
|
||||||
latency := time.Since(startTime)
|
latency := time.Since(startTime)
|
||||||
aim.updateLatencyMetrics(latency)
|
aim.updateLatencyMetrics(latency)
|
||||||
|
|
@ -215,6 +249,67 @@ func (aim *AudioInputIPCManager) updateLatencyMetrics(latency time.Duration) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// checkConnectionHealth monitors the IPC connection health
|
||||||
|
func (aim *AudioInputIPCManager) checkConnectionHealth() {
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
// Check connection every 5 seconds
|
||||||
|
if now.Sub(aim.lastConnectionCheck) < 5*time.Second {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
aim.lastConnectionCheck = now
|
||||||
|
|
||||||
|
// Check if supervisor and client are connected
|
||||||
|
if !aim.supervisor.IsConnected() {
|
||||||
|
aim.logger.Warn().Str("component", AudioInputIPCComponent).Msg("IPC connection lost, attempting recovery")
|
||||||
|
aim.handleConnectionFailure(fmt.Errorf("connection health check failed"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleConnectionFailure manages connection failure recovery
|
||||||
|
func (aim *AudioInputIPCManager) handleConnectionFailure(err error) {
|
||||||
|
// Increment failure counter
|
||||||
|
failures := atomic.AddInt32(&aim.connectionFailures, 1)
|
||||||
|
|
||||||
|
// Prevent multiple concurrent recovery attempts
|
||||||
|
if !atomic.CompareAndSwapInt32(&aim.recoveryInProgress, 0, 1) {
|
||||||
|
return // Recovery already in progress
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start recovery in a separate goroutine to avoid blocking audio processing
|
||||||
|
go func() {
|
||||||
|
defer atomic.StoreInt32(&aim.recoveryInProgress, 0)
|
||||||
|
|
||||||
|
aim.logger.Info().
|
||||||
|
Int32("failures", failures).
|
||||||
|
Err(err).
|
||||||
|
Str("component", AudioInputIPCComponent).
|
||||||
|
Msg("attempting IPC connection recovery")
|
||||||
|
|
||||||
|
// Stop and restart the supervisor to recover the connection
|
||||||
|
aim.supervisor.Stop()
|
||||||
|
|
||||||
|
// Brief delay before restart
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
|
// Attempt to restart
|
||||||
|
if restartErr := aim.supervisor.Start(); restartErr != nil {
|
||||||
|
aim.logger.Error().
|
||||||
|
Err(restartErr).
|
||||||
|
Str("component", AudioInputIPCComponent).
|
||||||
|
Msg("failed to recover IPC connection")
|
||||||
|
} else {
|
||||||
|
aim.logger.Info().
|
||||||
|
Str("component", AudioInputIPCComponent).
|
||||||
|
Msg("IPC connection recovered successfully")
|
||||||
|
|
||||||
|
// Reset failure counter on successful recovery
|
||||||
|
atomic.StoreInt32(&aim.connectionFailures, 0)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
// GetDetailedMetrics returns comprehensive performance metrics
|
// GetDetailedMetrics returns comprehensive performance metrics
|
||||||
func (aim *AudioInputIPCManager) GetDetailedMetrics() (AudioInputMetrics, map[string]interface{}) {
|
func (aim *AudioInputIPCManager) GetDetailedMetrics() (AudioInputMetrics, map[string]interface{}) {
|
||||||
metrics := aim.GetMetrics()
|
metrics := aim.GetMetrics()
|
||||||
|
|
|
||||||
|
|
@ -357,9 +357,12 @@ func (ais *AudioInputSupervisor) monitorSubprocess() {
|
||||||
|
|
||||||
// connectClient attempts to connect the client to the server
|
// connectClient attempts to connect the client to the server
|
||||||
func (ais *AudioInputSupervisor) connectClient() {
|
func (ais *AudioInputSupervisor) connectClient() {
|
||||||
// Wait briefly for the server to start (reduced from 500ms)
|
// Wait briefly for the server to start and create socket
|
||||||
time.Sleep(GetConfig().DefaultSleepDuration)
|
time.Sleep(GetConfig().DefaultSleepDuration)
|
||||||
|
|
||||||
|
// Additional small delay to ensure socket is ready after restart
|
||||||
|
time.Sleep(20 * time.Millisecond)
|
||||||
|
|
||||||
err := ais.client.Connect()
|
err := ais.client.Connect()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ais.logger.Error().Err(err).Msg("Failed to connect to audio input server")
|
ais.logger.Error().Err(err).Msg("Failed to connect to audio input server")
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,6 @@
|
||||||
package audio
|
package audio
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
|
|
@ -149,48 +148,38 @@ func WriteIPCMessage(conn net.Conn, msg IPCMessage, pool *GenericMessagePool, dr
|
||||||
binary.LittleEndian.PutUint32(optMsg.header[5:9], msg.GetLength())
|
binary.LittleEndian.PutUint32(optMsg.header[5:9], msg.GetLength())
|
||||||
binary.LittleEndian.PutUint64(optMsg.header[9:17], uint64(msg.GetTimestamp()))
|
binary.LittleEndian.PutUint64(optMsg.header[9:17], uint64(msg.GetTimestamp()))
|
||||||
|
|
||||||
// Use non-blocking write with timeout
|
// Set write deadline for timeout handling (more efficient than goroutines)
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), GetConfig().WriteTimeout)
|
if deadline := time.Now().Add(GetConfig().WriteTimeout); deadline.After(time.Now()) {
|
||||||
defer cancel()
|
if err := conn.SetWriteDeadline(deadline); err != nil {
|
||||||
|
// If we can't set deadline, proceed without it
|
||||||
|
// This maintains compatibility with connections that don't support deadlines
|
||||||
|
_ = err // Explicitly ignore error for linter
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Create a channel to signal write completion
|
// Write header using pre-allocated buffer (synchronous for better performance)
|
||||||
done := make(chan error, 1)
|
|
||||||
go func() {
|
|
||||||
// Write header using pre-allocated buffer
|
|
||||||
_, err := conn.Write(optMsg.header[:])
|
_, err := conn.Write(optMsg.header[:])
|
||||||
if err != nil {
|
|
||||||
done <- err
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Write data if present
|
|
||||||
if msg.GetLength() > 0 && msg.GetData() != nil {
|
|
||||||
_, err = conn.Write(msg.GetData())
|
|
||||||
if err != nil {
|
|
||||||
done <- err
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
done <- nil
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Wait for completion or timeout
|
|
||||||
select {
|
|
||||||
case err := <-done:
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if droppedFramesCounter != nil {
|
if droppedFramesCounter != nil {
|
||||||
atomic.AddInt64(droppedFramesCounter, 1)
|
atomic.AddInt64(droppedFramesCounter, 1)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
|
||||||
case <-ctx.Done():
|
// Write data if present
|
||||||
// Timeout occurred - drop frame to prevent blocking
|
if msg.GetLength() > 0 && msg.GetData() != nil {
|
||||||
|
_, err = conn.Write(msg.GetData())
|
||||||
|
if err != nil {
|
||||||
if droppedFramesCounter != nil {
|
if droppedFramesCounter != nil {
|
||||||
atomic.AddInt64(droppedFramesCounter, 1)
|
atomic.AddInt64(droppedFramesCounter, 1)
|
||||||
}
|
}
|
||||||
return fmt.Errorf("write timeout - frame dropped")
|
return err
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clear write deadline after successful write
|
||||||
|
_ = conn.SetWriteDeadline(time.Time{}) // Ignore error as this is cleanup
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Common connection acceptance with retry logic
|
// Common connection acceptance with retry logic
|
||||||
|
|
|
||||||
|
|
@ -24,7 +24,7 @@ cd "$AUDIO_LIBS_DIR"
|
||||||
[ -d opus-${OPUS_VERSION} ] || tar xf opus-${OPUS_VERSION}.tar.gz
|
[ -d opus-${OPUS_VERSION} ] || tar xf opus-${OPUS_VERSION}.tar.gz
|
||||||
|
|
||||||
# Optimization flags for ARM Cortex-A7 with NEON
|
# Optimization flags for ARM Cortex-A7 with NEON
|
||||||
OPTIM_CFLAGS="-O3 -mcpu=cortex-a7 -mfpu=neon -mfloat-abi=hard -ftree-vectorize -ffast-math -funroll-loops"
|
OPTIM_CFLAGS="-O3 -mfpu=neon -mtune=cortex-a7 -mfloat-abi=hard -ftree-vectorize -ffast-math -funroll-loops"
|
||||||
|
|
||||||
export CC="${CROSS_PREFIX}-gcc"
|
export CC="${CROSS_PREFIX}-gcc"
|
||||||
export CFLAGS="$OPTIM_CFLAGS"
|
export CFLAGS="$OPTIM_CFLAGS"
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue