feat(audio): improve socket handling and validation performance

- Add retry logic for socket file removal and listener creation
- Optimize message writing by combining header and data writes
- Move socket paths from temp dir to /var/run
- Refactor OPUS parameter lookup to use map for better readability
- Simplify validation functions for better performance in hotpaths
This commit is contained in:
Alex P 2025-09-09 10:16:53 +00:00
parent 5d4f4d8e10
commit 0f2aa9abe4
3 changed files with 77 additions and 128 deletions

View File

@ -72,23 +72,12 @@ func ValidateZeroCopyFrame(frame *ZeroCopyAudioFrame) error {
}
// ValidateBufferSize validates buffer size parameters with enhanced boundary checks
// Optimized to use AudioConfigCache for frequently accessed values
// Optimized for minimal overhead in hotpath
func ValidateBufferSize(size int) error {
if size <= 0 {
return fmt.Errorf("%w: buffer size %d must be positive", ErrInvalidBufferSize, size)
}
// Fast path: Check against cached max frame size
cache := Config
maxFrameSize := cache.MaxAudioFrameSize
// Most common case: validating a buffer that's sized for audio frames
if maxFrameSize > 0 && size <= maxFrameSize {
return nil
}
// Use SocketMaxBuffer as the upper limit for general buffer validation
// This allows for socket buffers while still preventing extremely large allocations
// Single boundary check using pre-cached value
if size > Config.SocketMaxBuffer {
return fmt.Errorf("%w: buffer size %d exceeds maximum %d",
ErrInvalidBufferSize, size, Config.SocketMaxBuffer)
@ -219,91 +208,42 @@ func ValidateOutputIPCConfig(sampleRate, channels, frameSize int) error {
}
// ValidateSampleRate validates audio sample rate values
// Optimized to use AudioConfigCache for frequently accessed values
// Optimized for minimal overhead in hotpath
func ValidateSampleRate(sampleRate int) error {
if sampleRate <= 0 {
return fmt.Errorf("%w: sample rate %d must be positive", ErrInvalidSampleRate, sampleRate)
}
// Fast path: Check against cached sample rate first
cache := Config
cachedRate := cache.SampleRate
// Most common case: validating against the current sample rate
if sampleRate == cachedRate {
return nil
}
// Slower path: check against all valid rates
validRates := Config.ValidSampleRates
for _, rate := range validRates {
// Direct validation against valid rates
for _, rate := range Config.ValidSampleRates {
if sampleRate == rate {
return nil
}
}
return fmt.Errorf("%w: sample rate %d not in supported rates %v",
ErrInvalidSampleRate, sampleRate, validRates)
return fmt.Errorf("%w: sample rate %d not in valid rates %v",
ErrInvalidSampleRate, sampleRate, Config.ValidSampleRates)
}
// ValidateChannelCount validates audio channel count
// Optimized to use AudioConfigCache for frequently accessed values
// Optimized for minimal overhead in hotpath
func ValidateChannelCount(channels int) error {
if channels <= 0 {
return fmt.Errorf("%w: channel count %d must be positive", ErrInvalidChannels, channels)
}
// Fast path: Check against cached channels first
cache := Config
cachedChannels := cache.Channels
// Most common case: validating against the current channel count
if channels == cachedChannels {
return nil
}
// Fast path: Check against cached max channels
cachedMaxChannels := cache.MaxChannels
if cachedMaxChannels > 0 && channels <= cachedMaxChannels {
return nil
}
// Slow path: Use current config values
updatedMaxChannels := cache.MaxChannels
if channels > updatedMaxChannels {
// Direct boundary check
if channels > Config.MaxChannels {
return fmt.Errorf("%w: channel count %d exceeds maximum %d",
ErrInvalidChannels, channels, updatedMaxChannels)
ErrInvalidChannels, channels, Config.MaxChannels)
}
return nil
}
// ValidateBitrate validates audio bitrate values (expects kbps)
// Optimized to use AudioConfigCache for frequently accessed values
// Optimized for minimal overhead in hotpath
func ValidateBitrate(bitrate int) error {
if bitrate <= 0 {
return fmt.Errorf("%w: bitrate %d must be positive", ErrInvalidBitrate, bitrate)
}
// Fast path: Check against cached bitrate values
cache := Config
minBitrate := cache.MinOpusBitrate
maxBitrate := cache.MaxOpusBitrate
// If we have valid cached values, use them
if minBitrate > 0 && maxBitrate > 0 {
// Convert kbps to bps for comparison with config limits
bitrateInBps := bitrate * 1000
if bitrateInBps < minBitrate {
return fmt.Errorf("%w: bitrate %d kbps (%d bps) below minimum %d bps",
ErrInvalidBitrate, bitrate, bitrateInBps, minBitrate)
}
if bitrateInBps > maxBitrate {
return fmt.Errorf("%w: bitrate %d kbps (%d bps) exceeds maximum %d bps",
ErrInvalidBitrate, bitrate, bitrateInBps, maxBitrate)
}
return nil
}
// Convert kbps to bps for comparison with config limits
// Direct boundary check with single conversion
bitrateInBps := bitrate * 1000
if bitrateInBps < Config.MinOpusBitrate {
return fmt.Errorf("%w: bitrate %d kbps (%d bps) below minimum %d bps",

View File

@ -8,6 +8,7 @@ import (
"net"
"os"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"
@ -157,15 +158,38 @@ func (s *UnifiedAudioServer) Start() error {
return fmt.Errorf("server already running")
}
// Remove existing socket file
// Remove existing socket file with retry logic
for i := 0; i < 3; i++ {
if err := os.Remove(s.socketPath); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed to remove existing socket: %w", err)
s.logger.Warn().Err(err).Int("attempt", i+1).Msg("failed to remove existing socket file, retrying")
time.Sleep(100 * time.Millisecond)
continue
}
break
}
// Create listener with retry on address already in use
var listener net.Listener
var err error
for i := 0; i < 3; i++ {
listener, err = net.Listen("unix", s.socketPath)
if err == nil {
break
}
// If address is still in use, try to remove socket file again
if strings.Contains(err.Error(), "address already in use") {
s.logger.Warn().Err(err).Int("attempt", i+1).Msg("socket address in use, attempting cleanup and retry")
os.Remove(s.socketPath)
time.Sleep(200 * time.Millisecond)
continue
}
return fmt.Errorf("failed to create unix socket: %w", err)
}
// Create listener
listener, err := net.Listen("unix", s.socketPath)
if err != nil {
return fmt.Errorf("failed to create listener: %w", err)
return fmt.Errorf("failed to create unix socket after retries: %w", err)
}
s.listener = listener
@ -367,15 +391,19 @@ func (s *UnifiedAudioServer) SendFrame(frame []byte) error {
func (s *UnifiedAudioServer) writeMessage(conn net.Conn, msg *UnifiedIPCMessage) error {
header := EncodeMessageHeader(msg.Magic, uint8(msg.Type), msg.Length, msg.Timestamp)
// Optimize: Use single write for header+data to reduce system calls
if msg.Length > 0 && msg.Data != nil {
// Pre-allocate combined buffer to avoid copying
combined := make([]byte, len(header)+len(msg.Data))
copy(combined, header)
copy(combined[len(header):], msg.Data)
if _, err := conn.Write(combined); err != nil {
return fmt.Errorf("failed to write message: %w", err)
}
} else {
if _, err := conn.Write(header); err != nil {
return fmt.Errorf("failed to write header: %w", err)
}
// Write data if present
if msg.Length > 0 && msg.Data != nil {
if _, err := conn.Write(msg.Data); err != nil {
return fmt.Errorf("failed to write data: %w", err)
}
}
return nil
@ -642,9 +670,9 @@ func (c *UnifiedAudioClient) calculateAdaptiveDelay(attempt int, initialDelay, m
// Helper functions for socket paths
func getInputSocketPath() string {
return filepath.Join(os.TempDir(), inputSocketName)
return filepath.Join("/var/run", inputSocketName)
}
func getOutputSocketPath() string {
return filepath.Join(os.TempDir(), outputSocketName)
return filepath.Join("/var/run", outputSocketName)
}

View File

@ -260,6 +260,16 @@ func GetAudioConfig() AudioConfig {
return currentConfig
}
// Simplified OPUS parameter lookup table
var opusParams = map[AudioQuality]struct {
complexity, vbr, signalType, bandwidth, dtx int
}{
AudioQualityLow: {Config.AudioQualityLowOpusComplexity, Config.AudioQualityLowOpusVBR, Config.AudioQualityLowOpusSignalType, Config.AudioQualityLowOpusBandwidth, Config.AudioQualityLowOpusDTX},
AudioQualityMedium: {Config.AudioQualityMediumOpusComplexity, Config.AudioQualityMediumOpusVBR, Config.AudioQualityMediumOpusSignalType, Config.AudioQualityMediumOpusBandwidth, Config.AudioQualityMediumOpusDTX},
AudioQualityHigh: {Config.AudioQualityHighOpusComplexity, Config.AudioQualityHighOpusVBR, Config.AudioQualityHighOpusSignalType, Config.AudioQualityHighOpusBandwidth, Config.AudioQualityHighOpusDTX},
AudioQualityUltra: {Config.AudioQualityUltraOpusComplexity, Config.AudioQualityUltraOpusVBR, Config.AudioQualityUltraOpusSignalType, Config.AudioQualityUltraOpusBandwidth, Config.AudioQualityUltraOpusDTX},
}
// SetMicrophoneQuality updates the current microphone quality configuration
func SetMicrophoneQuality(quality AudioQuality) {
// Validate audio quality parameter
@ -274,40 +284,11 @@ func SetMicrophoneQuality(quality AudioQuality) {
if config, exists := presets[quality]; exists {
currentMicrophoneConfig = config
// Get OPUS parameters for the selected quality
var complexity, vbr, signalType, bandwidth, dtx int
switch quality {
case AudioQualityLow:
complexity = Config.AudioQualityLowOpusComplexity
vbr = Config.AudioQualityLowOpusVBR
signalType = Config.AudioQualityLowOpusSignalType
bandwidth = Config.AudioQualityLowOpusBandwidth
dtx = Config.AudioQualityLowOpusDTX
case AudioQualityMedium:
complexity = Config.AudioQualityMediumOpusComplexity
vbr = Config.AudioQualityMediumOpusVBR
signalType = Config.AudioQualityMediumOpusSignalType
bandwidth = Config.AudioQualityMediumOpusBandwidth
dtx = Config.AudioQualityMediumOpusDTX
case AudioQualityHigh:
complexity = Config.AudioQualityHighOpusComplexity
vbr = Config.AudioQualityHighOpusVBR
signalType = Config.AudioQualityHighOpusSignalType
bandwidth = Config.AudioQualityHighOpusBandwidth
dtx = Config.AudioQualityHighOpusDTX
case AudioQualityUltra:
complexity = Config.AudioQualityUltraOpusComplexity
vbr = Config.AudioQualityUltraOpusVBR
signalType = Config.AudioQualityUltraOpusSignalType
bandwidth = Config.AudioQualityUltraOpusBandwidth
dtx = Config.AudioQualityUltraOpusDTX
default:
// Use medium quality as fallback
complexity = Config.AudioQualityMediumOpusComplexity
vbr = Config.AudioQualityMediumOpusVBR
signalType = Config.AudioQualityMediumOpusSignalType
bandwidth = Config.AudioQualityMediumOpusBandwidth
dtx = Config.AudioQualityMediumOpusDTX
// Get OPUS parameters using lookup table
params, exists := opusParams[quality]
if !exists {
// Fallback to medium quality
params = opusParams[AudioQualityMedium]
}
// Update audio input subprocess configuration dynamically without restart
@ -315,7 +296,7 @@ func SetMicrophoneQuality(quality AudioQuality) {
// Set new OPUS configuration for future restarts
if supervisor := GetAudioInputSupervisor(); supervisor != nil {
supervisor.SetOpusConfig(config.Bitrate*1000, complexity, vbr, signalType, bandwidth, dtx)
supervisor.SetOpusConfig(config.Bitrate*1000, params.complexity, params.vbr, params.signalType, params.bandwidth, params.dtx)
// Check if microphone is active but IPC control is broken
inputManager := getAudioInputManager()
@ -336,11 +317,11 @@ func SetMicrophoneQuality(quality AudioQuality) {
Channels: config.Channels,
FrameSize: int(config.FrameSize.Milliseconds() * int64(config.SampleRate) / 1000), // Convert ms to samples
Bitrate: config.Bitrate * 1000, // Convert kbps to bps
Complexity: complexity,
VBR: vbr,
SignalType: signalType,
Bandwidth: bandwidth,
DTX: dtx,
Complexity: params.complexity,
VBR: params.vbr,
SignalType: params.signalType,
Bandwidth: params.bandwidth,
DTX: params.dtx,
}
if err := supervisor.SendOpusConfig(opusConfig); err != nil {