mirror of https://github.com/jetkvm/kvm.git
Compare commits
No commits in common. "c40459664f3509d16cac01f045042e640954bafa" and "b63404c26b8da38b68975e8ec12a8f9f1abf5ec6" have entirely different histories.
c40459664f
...
b63404c26b
2
Makefile
2
Makefile
|
|
@ -29,7 +29,7 @@ ALSA_VERSION ?= 1.2.14
|
|||
OPUS_VERSION ?= 1.5.2
|
||||
|
||||
# Optimization flags for ARM Cortex-A7 with NEON
|
||||
OPTIM_CFLAGS := -O3 -mfpu=neon -mtune=cortex-a7 -mfloat-abi=hard -ftree-vectorize -ffast-math -funroll-loops
|
||||
OPTIM_CFLAGS := -O3 -mcpu=cortex-a7 -mfpu=neon -mfloat-abi=hard -ftree-vectorize -ffast-math -funroll-loops
|
||||
|
||||
PROMETHEUS_TAG := github.com/prometheus/common/version
|
||||
KVM_PKG_NAME := github.com/jetkvm/kvm
|
||||
|
|
|
|||
|
|
@ -32,9 +32,8 @@ static int opus_complexity = 3; // Will be set from GetConfig().CGOOpusC
|
|||
static int opus_vbr = 1; // Will be set from GetConfig().CGOOpusVBR
|
||||
static int opus_vbr_constraint = 1; // Will be set from GetConfig().CGOOpusVBRConstraint
|
||||
static int opus_signal_type = 3; // Will be set from GetConfig().CGOOpusSignalType
|
||||
static int opus_bandwidth = 1105; // OPUS_BANDWIDTH_WIDEBAND for compatibility (was 1101)
|
||||
static int opus_bandwidth = 1105; // Will be set from GetConfig().CGOOpusBandwidth
|
||||
static int opus_dtx = 0; // Will be set from GetConfig().CGOOpusDTX
|
||||
static int opus_lsb_depth = 16; // LSB depth for improved bit allocation on constrained hardware
|
||||
static int sample_rate = 48000; // Will be set from GetConfig().CGOSampleRate
|
||||
static int channels = 2; // Will be set from GetConfig().CGOChannels
|
||||
static int frame_size = 960; // Will be set from GetConfig().CGOFrameSize
|
||||
|
|
@ -42,13 +41,10 @@ static int max_packet_size = 1500; // Will be set from GetConfig().CGOMaxPa
|
|||
static int sleep_microseconds = 1000; // Will be set from GetConfig().CGOUsleepMicroseconds
|
||||
static int max_attempts_global = 5; // Will be set from GetConfig().CGOMaxAttempts
|
||||
static int max_backoff_us_global = 500000; // Will be set from GetConfig().CGOMaxBackoffMicroseconds
|
||||
// Hardware optimization flags for constrained environments
|
||||
static int use_mmap_access = 0; // Disable MMAP for compatibility (was 1)
|
||||
static int optimized_buffer_size = 0; // Disable optimized buffer sizing for stability (was 1)
|
||||
|
||||
// Function to update constants from Go configuration
|
||||
void update_audio_constants(int bitrate, int complexity, int vbr, int vbr_constraint,
|
||||
int signal_type, int bandwidth, int dtx, int lsb_depth, int sr, int ch,
|
||||
int signal_type, int bandwidth, int dtx, int sr, int ch,
|
||||
int fs, int max_pkt, int sleep_us, int max_attempts, int max_backoff) {
|
||||
opus_bitrate = bitrate;
|
||||
opus_complexity = complexity;
|
||||
|
|
@ -57,7 +53,6 @@ void update_audio_constants(int bitrate, int complexity, int vbr, int vbr_constr
|
|||
opus_signal_type = signal_type;
|
||||
opus_bandwidth = bandwidth;
|
||||
opus_dtx = dtx;
|
||||
opus_lsb_depth = lsb_depth;
|
||||
sample_rate = sr;
|
||||
channels = ch;
|
||||
frame_size = fs;
|
||||
|
|
@ -156,16 +151,7 @@ static int configure_alsa_device(snd_pcm_t *handle, const char *device_name) {
|
|||
err = snd_pcm_hw_params_any(handle, params);
|
||||
if (err < 0) return err;
|
||||
|
||||
// Use MMAP access for direct hardware memory access if enabled
|
||||
if (use_mmap_access) {
|
||||
err = snd_pcm_hw_params_set_access(handle, params, SND_PCM_ACCESS_MMAP_INTERLEAVED);
|
||||
if (err < 0) {
|
||||
// Fallback to RW access if MMAP fails
|
||||
err = snd_pcm_hw_params_set_access(handle, params, SND_PCM_ACCESS_RW_INTERLEAVED);
|
||||
}
|
||||
} else {
|
||||
err = snd_pcm_hw_params_set_access(handle, params, SND_PCM_ACCESS_RW_INTERLEAVED);
|
||||
}
|
||||
err = snd_pcm_hw_params_set_access(handle, params, SND_PCM_ACCESS_RW_INTERLEAVED);
|
||||
if (err < 0) return err;
|
||||
|
||||
err = snd_pcm_hw_params_set_format(handle, params, SND_PCM_FORMAT_S16_LE);
|
||||
|
|
@ -183,25 +169,13 @@ static int configure_alsa_device(snd_pcm_t *handle, const char *device_name) {
|
|||
if (err < 0) return err;
|
||||
}
|
||||
|
||||
// Optimize buffer sizes for constrained hardware
|
||||
// Optimize buffer sizes for low latency
|
||||
snd_pcm_uframes_t period_size = frame_size;
|
||||
if (optimized_buffer_size) {
|
||||
// Use smaller periods for lower latency on constrained hardware
|
||||
period_size = frame_size / 2;
|
||||
if (period_size < 64) period_size = 64; // Minimum safe period size
|
||||
}
|
||||
err = snd_pcm_hw_params_set_period_size_near(handle, params, &period_size, 0);
|
||||
if (err < 0) return err;
|
||||
|
||||
// Optimize buffer size based on hardware constraints
|
||||
snd_pcm_uframes_t buffer_size;
|
||||
if (optimized_buffer_size) {
|
||||
// Use 2 periods for ultra-low latency on constrained hardware
|
||||
buffer_size = period_size * 2;
|
||||
} else {
|
||||
// Standard 4 periods for good latency/stability balance
|
||||
buffer_size = period_size * 4;
|
||||
}
|
||||
// Set buffer size to 4 periods for good latency/stability balance
|
||||
snd_pcm_uframes_t buffer_size = period_size * 4;
|
||||
err = snd_pcm_hw_params_set_buffer_size_near(handle, params, &buffer_size);
|
||||
if (err < 0) return err;
|
||||
|
||||
|
|
@ -276,16 +250,14 @@ int jetkvm_audio_init() {
|
|||
return -2;
|
||||
}
|
||||
|
||||
// Apply optimized Opus encoder settings for constrained hardware
|
||||
// Apply optimized Opus encoder settings
|
||||
opus_encoder_ctl(encoder, OPUS_SET_BITRATE(opus_bitrate));
|
||||
opus_encoder_ctl(encoder, OPUS_SET_COMPLEXITY(opus_complexity));
|
||||
opus_encoder_ctl(encoder, OPUS_SET_VBR(opus_vbr));
|
||||
opus_encoder_ctl(encoder, OPUS_SET_VBR_CONSTRAINT(opus_vbr_constraint));
|
||||
opus_encoder_ctl(encoder, OPUS_SET_SIGNAL(opus_signal_type));
|
||||
opus_encoder_ctl(encoder, OPUS_SET_BANDWIDTH(opus_bandwidth)); // WIDEBAND for compatibility
|
||||
opus_encoder_ctl(encoder, OPUS_SET_DTX(opus_dtx));
|
||||
// Set LSB depth for improved bit allocation on constrained hardware (disabled for compatibility)
|
||||
// opus_encoder_ctl(encoder, OPUS_SET_LSB_DEPTH(opus_lsb_depth));
|
||||
opus_encoder_ctl(encoder, OPUS_SET_BANDWIDTH(opus_bandwidth));
|
||||
opus_encoder_ctl(encoder, OPUS_SET_DTX(opus_dtx));
|
||||
// Enable packet loss concealment for better resilience
|
||||
opus_encoder_ctl(encoder, OPUS_SET_PACKET_LOSS_PERC(5));
|
||||
// Set prediction disabled for lower latency
|
||||
|
|
@ -717,7 +689,6 @@ func cgoAudioInit() error {
|
|||
C.int(cache.opusSignalType.Load()),
|
||||
C.int(cache.opusBandwidth.Load()),
|
||||
C.int(cache.opusDTX.Load()),
|
||||
C.int(16), // LSB depth for improved bit allocation
|
||||
C.int(cache.sampleRate.Load()),
|
||||
C.int(cache.channels.Load()),
|
||||
C.int(cache.frameSize.Load()),
|
||||
|
|
|
|||
|
|
@ -1900,8 +1900,8 @@ func DefaultAudioConfig() *AudioConfigConstants {
|
|||
// WriteTimeout defines maximum wait time for IPC write operations.
|
||||
// Used in: ipc_manager.go for preventing indefinite blocking on writes
|
||||
// Impact: Balances responsiveness with reliability for IPC operations
|
||||
// Optimized to 50ms for real-time audio processing to reduce latency
|
||||
WriteTimeout: 50 * time.Millisecond,
|
||||
// Default 5 seconds provides reasonable timeout for most system conditions
|
||||
WriteTimeout: 5 * time.Second,
|
||||
|
||||
// MaxDroppedFrames defines threshold for dropped frame error handling.
|
||||
// Used in: ipc_manager.go for quality degradation detection and recovery
|
||||
|
|
|
|||
|
|
@ -243,33 +243,12 @@ type AudioInputServer struct {
|
|||
// NewAudioInputServer creates a new audio input server
|
||||
func NewAudioInputServer() (*AudioInputServer, error) {
|
||||
socketPath := getInputSocketPath()
|
||||
// Remove existing socket if any
|
||||
os.Remove(socketPath)
|
||||
|
||||
// Retry socket creation with cleanup to handle race conditions
|
||||
var listener net.Listener
|
||||
var err error
|
||||
for i := 0; i < 3; i++ {
|
||||
// Remove existing socket if any
|
||||
os.Remove(socketPath)
|
||||
|
||||
// Small delay to ensure cleanup completes
|
||||
if i > 0 {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
|
||||
listener, err = net.Listen("unix", socketPath)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
|
||||
// Log retry attempt
|
||||
if i < 2 {
|
||||
logger := logging.GetDefaultLogger().With().Str("component", "audio-input").Logger()
|
||||
logger.Warn().Err(err).Int("attempt", i+1).Msg("Failed to create unix socket, retrying")
|
||||
}
|
||||
}
|
||||
|
||||
listener, err := net.Listen("unix", socketPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create unix socket after 3 attempts: %w", err)
|
||||
return nil, fmt.Errorf("failed to create unix socket: %w", err)
|
||||
}
|
||||
|
||||
// Get initial buffer size from adaptive buffer manager
|
||||
|
|
@ -341,11 +320,7 @@ func (ais *AudioInputServer) Stop() {
|
|||
|
||||
if ais.listener != nil {
|
||||
ais.listener.Close()
|
||||
ais.listener = nil
|
||||
}
|
||||
|
||||
// Remove socket file to prevent restart issues
|
||||
os.Remove(getInputSocketPath())
|
||||
}
|
||||
|
||||
// Close closes the server and cleans up resources
|
||||
|
|
|
|||
|
|
@ -1,7 +1,6 @@
|
|||
package audio
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
|
|
@ -16,12 +15,6 @@ type AudioInputIPCManager struct {
|
|||
supervisor *AudioInputSupervisor
|
||||
logger zerolog.Logger
|
||||
running int32
|
||||
|
||||
// Connection monitoring and recovery
|
||||
monitoringEnabled bool
|
||||
lastConnectionCheck time.Time
|
||||
connectionFailures int32
|
||||
recoveryInProgress int32
|
||||
}
|
||||
|
||||
// NewAudioInputIPCManager creates a new IPC-based audio input manager
|
||||
|
|
@ -40,17 +33,10 @@ func (aim *AudioInputIPCManager) Start() error {
|
|||
|
||||
aim.logger.Debug().Str("component", AudioInputIPCComponent).Msg("starting component")
|
||||
|
||||
// Initialize connection monitoring
|
||||
aim.monitoringEnabled = true
|
||||
aim.lastConnectionCheck = time.Now()
|
||||
atomic.StoreInt32(&aim.connectionFailures, 0)
|
||||
atomic.StoreInt32(&aim.recoveryInProgress, 0)
|
||||
|
||||
err := aim.supervisor.Start()
|
||||
if err != nil {
|
||||
// Ensure proper cleanup on supervisor start failure
|
||||
atomic.StoreInt32(&aim.running, 0)
|
||||
aim.monitoringEnabled = false
|
||||
// Reset metrics on failed start
|
||||
aim.resetMetrics()
|
||||
aim.logger.Error().Err(err).Str("component", AudioInputIPCComponent).Msg("failed to start audio input supervisor")
|
||||
|
|
@ -94,10 +80,6 @@ func (aim *AudioInputIPCManager) Stop() {
|
|||
}
|
||||
|
||||
aim.logger.Debug().Str("component", AudioInputIPCComponent).Msg("stopping component")
|
||||
|
||||
// Disable connection monitoring
|
||||
aim.monitoringEnabled = false
|
||||
|
||||
aim.supervisor.Stop()
|
||||
aim.logger.Debug().Str("component", AudioInputIPCComponent).Msg("component stopped")
|
||||
}
|
||||
|
|
@ -120,11 +102,6 @@ func (aim *AudioInputIPCManager) WriteOpusFrame(frame []byte) error {
|
|||
return nil // Empty frame, ignore
|
||||
}
|
||||
|
||||
// Check connection health periodically
|
||||
if aim.monitoringEnabled {
|
||||
aim.checkConnectionHealth()
|
||||
}
|
||||
|
||||
// Validate frame data
|
||||
if err := ValidateAudioFrame(frame); err != nil {
|
||||
atomic.AddInt64(&aim.metrics.FramesDropped, 1)
|
||||
|
|
@ -145,21 +122,10 @@ func (aim *AudioInputIPCManager) WriteOpusFrame(frame []byte) error {
|
|||
if err != nil {
|
||||
// Count as dropped frame
|
||||
atomic.AddInt64(&aim.metrics.FramesDropped, 1)
|
||||
|
||||
// Handle connection failure
|
||||
if aim.monitoringEnabled {
|
||||
aim.handleConnectionFailure(err)
|
||||
}
|
||||
|
||||
aim.logger.Debug().Err(err).Msg("failed to send frame via IPC")
|
||||
return err
|
||||
}
|
||||
|
||||
// Reset connection failure counter on successful send
|
||||
if aim.monitoringEnabled {
|
||||
atomic.StoreInt32(&aim.connectionFailures, 0)
|
||||
}
|
||||
|
||||
// Calculate and update latency (end-to-end IPC transmission time)
|
||||
latency := time.Since(startTime)
|
||||
aim.updateLatencyMetrics(latency)
|
||||
|
|
@ -249,67 +215,6 @@ func (aim *AudioInputIPCManager) updateLatencyMetrics(latency time.Duration) {
|
|||
}
|
||||
}
|
||||
|
||||
// checkConnectionHealth monitors the IPC connection health
|
||||
func (aim *AudioInputIPCManager) checkConnectionHealth() {
|
||||
now := time.Now()
|
||||
|
||||
// Check connection every 5 seconds
|
||||
if now.Sub(aim.lastConnectionCheck) < 5*time.Second {
|
||||
return
|
||||
}
|
||||
|
||||
aim.lastConnectionCheck = now
|
||||
|
||||
// Check if supervisor and client are connected
|
||||
if !aim.supervisor.IsConnected() {
|
||||
aim.logger.Warn().Str("component", AudioInputIPCComponent).Msg("IPC connection lost, attempting recovery")
|
||||
aim.handleConnectionFailure(fmt.Errorf("connection health check failed"))
|
||||
}
|
||||
}
|
||||
|
||||
// handleConnectionFailure manages connection failure recovery
|
||||
func (aim *AudioInputIPCManager) handleConnectionFailure(err error) {
|
||||
// Increment failure counter
|
||||
failures := atomic.AddInt32(&aim.connectionFailures, 1)
|
||||
|
||||
// Prevent multiple concurrent recovery attempts
|
||||
if !atomic.CompareAndSwapInt32(&aim.recoveryInProgress, 0, 1) {
|
||||
return // Recovery already in progress
|
||||
}
|
||||
|
||||
// Start recovery in a separate goroutine to avoid blocking audio processing
|
||||
go func() {
|
||||
defer atomic.StoreInt32(&aim.recoveryInProgress, 0)
|
||||
|
||||
aim.logger.Info().
|
||||
Int32("failures", failures).
|
||||
Err(err).
|
||||
Str("component", AudioInputIPCComponent).
|
||||
Msg("attempting IPC connection recovery")
|
||||
|
||||
// Stop and restart the supervisor to recover the connection
|
||||
aim.supervisor.Stop()
|
||||
|
||||
// Brief delay before restart
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Attempt to restart
|
||||
if restartErr := aim.supervisor.Start(); restartErr != nil {
|
||||
aim.logger.Error().
|
||||
Err(restartErr).
|
||||
Str("component", AudioInputIPCComponent).
|
||||
Msg("failed to recover IPC connection")
|
||||
} else {
|
||||
aim.logger.Info().
|
||||
Str("component", AudioInputIPCComponent).
|
||||
Msg("IPC connection recovered successfully")
|
||||
|
||||
// Reset failure counter on successful recovery
|
||||
atomic.StoreInt32(&aim.connectionFailures, 0)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// GetDetailedMetrics returns comprehensive performance metrics
|
||||
func (aim *AudioInputIPCManager) GetDetailedMetrics() (AudioInputMetrics, map[string]interface{}) {
|
||||
metrics := aim.GetMetrics()
|
||||
|
|
|
|||
|
|
@ -357,12 +357,9 @@ func (ais *AudioInputSupervisor) monitorSubprocess() {
|
|||
|
||||
// connectClient attempts to connect the client to the server
|
||||
func (ais *AudioInputSupervisor) connectClient() {
|
||||
// Wait briefly for the server to start and create socket
|
||||
// Wait briefly for the server to start (reduced from 500ms)
|
||||
time.Sleep(GetConfig().DefaultSleepDuration)
|
||||
|
||||
// Additional small delay to ensure socket is ready after restart
|
||||
time.Sleep(20 * time.Millisecond)
|
||||
|
||||
err := ais.client.Connect()
|
||||
if err != nil {
|
||||
ais.logger.Error().Err(err).Msg("Failed to connect to audio input server")
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package audio
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"fmt"
|
||||
"net"
|
||||
|
|
@ -148,38 +149,48 @@ func WriteIPCMessage(conn net.Conn, msg IPCMessage, pool *GenericMessagePool, dr
|
|||
binary.LittleEndian.PutUint32(optMsg.header[5:9], msg.GetLength())
|
||||
binary.LittleEndian.PutUint64(optMsg.header[9:17], uint64(msg.GetTimestamp()))
|
||||
|
||||
// Set write deadline for timeout handling (more efficient than goroutines)
|
||||
if deadline := time.Now().Add(GetConfig().WriteTimeout); deadline.After(time.Now()) {
|
||||
if err := conn.SetWriteDeadline(deadline); err != nil {
|
||||
// If we can't set deadline, proceed without it
|
||||
// This maintains compatibility with connections that don't support deadlines
|
||||
_ = err // Explicitly ignore error for linter
|
||||
}
|
||||
}
|
||||
// Use non-blocking write with timeout
|
||||
ctx, cancel := context.WithTimeout(context.Background(), GetConfig().WriteTimeout)
|
||||
defer cancel()
|
||||
|
||||
// Write header using pre-allocated buffer (synchronous for better performance)
|
||||
_, err := conn.Write(optMsg.header[:])
|
||||
if err != nil {
|
||||
if droppedFramesCounter != nil {
|
||||
atomic.AddInt64(droppedFramesCounter, 1)
|
||||
// Create a channel to signal write completion
|
||||
done := make(chan error, 1)
|
||||
go func() {
|
||||
// Write header using pre-allocated buffer
|
||||
_, err := conn.Write(optMsg.header[:])
|
||||
if err != nil {
|
||||
done <- err
|
||||
return
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Write data if present
|
||||
if msg.GetLength() > 0 && msg.GetData() != nil {
|
||||
_, err = conn.Write(msg.GetData())
|
||||
// Write data if present
|
||||
if msg.GetLength() > 0 && msg.GetData() != nil {
|
||||
_, err = conn.Write(msg.GetData())
|
||||
if err != nil {
|
||||
done <- err
|
||||
return
|
||||
}
|
||||
}
|
||||
done <- nil
|
||||
}()
|
||||
|
||||
// Wait for completion or timeout
|
||||
select {
|
||||
case err := <-done:
|
||||
if err != nil {
|
||||
if droppedFramesCounter != nil {
|
||||
atomic.AddInt64(droppedFramesCounter, 1)
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
// Timeout occurred - drop frame to prevent blocking
|
||||
if droppedFramesCounter != nil {
|
||||
atomic.AddInt64(droppedFramesCounter, 1)
|
||||
}
|
||||
return fmt.Errorf("write timeout - frame dropped")
|
||||
}
|
||||
|
||||
// Clear write deadline after successful write
|
||||
_ = conn.SetWriteDeadline(time.Time{}) // Ignore error as this is cleanup
|
||||
return nil
|
||||
}
|
||||
|
||||
// Common connection acceptance with retry logic
|
||||
|
|
|
|||
|
|
@ -24,7 +24,7 @@ cd "$AUDIO_LIBS_DIR"
|
|||
[ -d opus-${OPUS_VERSION} ] || tar xf opus-${OPUS_VERSION}.tar.gz
|
||||
|
||||
# Optimization flags for ARM Cortex-A7 with NEON
|
||||
OPTIM_CFLAGS="-O3 -mfpu=neon -mtune=cortex-a7 -mfloat-abi=hard -ftree-vectorize -ffast-math -funroll-loops"
|
||||
OPTIM_CFLAGS="-O3 -mcpu=cortex-a7 -mfpu=neon -mfloat-abi=hard -ftree-vectorize -ffast-math -funroll-loops"
|
||||
|
||||
export CC="${CROSS_PREFIX}-gcc"
|
||||
export CFLAGS="$OPTIM_CFLAGS"
|
||||
|
|
|
|||
Loading…
Reference in New Issue