// kvm/internal/audio/ipc_input.go

package audio
import (
"encoding/binary"
"fmt"
"io"
"net"
"os"
"runtime"
"sync"
"sync/atomic"
"time"
"github.com/jetkvm/kvm/internal/logging"
"github.com/rs/zerolog"
)
// Component name constants for logging
const (
AudioInputServerComponent = "audio-input-server"
AudioInputClientComponent = "audio-input-client"
)
// Constants are now defined in unified_ipc.go
var (
maxFrameSize = Config.MaxFrameSize // Maximum Opus frame size
messagePoolSize = Config.MessagePoolSize // Pre-allocated message pool size
)
// OptimizedIPCMessage represents an optimized message with pre-allocated buffers
type OptimizedIPCMessage struct {
header [17]byte
data []byte
msg UnifiedIPCMessage
}
// MessagePool manages a pool of reusable messages to reduce allocations
type MessagePool struct {
hitCount int64
missCount int64
pool chan *OptimizedIPCMessage
preallocated []*OptimizedIPCMessage
preallocSize int
maxPoolSize int
mutex sync.RWMutex
}
// Global message pool instance
var globalMessagePool = &MessagePool{
pool: make(chan *OptimizedIPCMessage, messagePoolSize),
}
var messagePoolInitOnce sync.Once
// initializeMessagePool initializes the global message pool with pre-allocated messages
func initializeMessagePool() {
messagePoolInitOnce.Do(func() {
preallocSize := messagePoolSize / 4 // 25% pre-allocated for immediate use
globalMessagePool.preallocSize = preallocSize
globalMessagePool.maxPoolSize = messagePoolSize * Config.PoolGrowthMultiplier // Allow growth up to 2x
globalMessagePool.preallocated = make([]*OptimizedIPCMessage, 0, preallocSize)
// Pre-allocate messages for immediate use
for i := 0; i < preallocSize; i++ {
msg := &OptimizedIPCMessage{
data: make([]byte, 0, maxFrameSize),
}
globalMessagePool.preallocated = append(globalMessagePool.preallocated, msg)
}
// Fill the channel with remaining messages
for i := preallocSize; i < messagePoolSize; i++ {
globalMessagePool.pool <- &OptimizedIPCMessage{
data: make([]byte, 0, maxFrameSize),
}
}
})
}
// Get retrieves a message from the pool
func (mp *MessagePool) Get() *OptimizedIPCMessage {
initializeMessagePool()
// First try pre-allocated messages for fastest access
mp.mutex.Lock()
if len(mp.preallocated) > 0 {
msg := mp.preallocated[len(mp.preallocated)-1]
mp.preallocated = mp.preallocated[:len(mp.preallocated)-1]
mp.mutex.Unlock()
atomic.AddInt64(&mp.hitCount, 1)
// Reset message for reuse
msg.data = msg.data[:0]
msg.msg = UnifiedIPCMessage{}
return msg
}
mp.mutex.Unlock()
// Try channel pool next
select {
case msg := <-mp.pool:
atomic.AddInt64(&mp.hitCount, 1)
// Reset message for reuse and ensure proper capacity
msg.data = msg.data[:0]
msg.msg = UnifiedIPCMessage{}
// Ensure data buffer has sufficient capacity
if cap(msg.data) < maxFrameSize {
msg.data = make([]byte, 0, maxFrameSize)
}
return msg
default:
// Pool exhausted, create new message with exact capacity
atomic.AddInt64(&mp.missCount, 1)
return &OptimizedIPCMessage{
data: make([]byte, 0, maxFrameSize),
}
}
}
// Put returns a message to the pool
func (mp *MessagePool) Put(msg *OptimizedIPCMessage) {
if msg == nil {
return
}
// Validate buffer capacity - reject if too small or too large
if cap(msg.data) < maxFrameSize/2 || cap(msg.data) > maxFrameSize*2 {
return // Let GC handle oversized or undersized buffers
}
// Reset the message for reuse
msg.data = msg.data[:0]
msg.msg = UnifiedIPCMessage{}
// First try to return to pre-allocated pool for fastest reuse
mp.mutex.Lock()
if len(mp.preallocated) < mp.preallocSize {
mp.preallocated = append(mp.preallocated, msg)
mp.mutex.Unlock()
return
}
mp.mutex.Unlock()
// Try channel pool next
select {
case mp.pool <- msg:
// Successfully returned to pool
default:
// Pool full, let GC handle it
}
}
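// Usage sketch (illustrative only): callers pair Get with a deferred Put so a
// pooled message is returned even on early error paths, exactly as readMessage
// does below:
//
//	optMsg := globalMessagePool.Get()
//	defer globalMessagePool.Put(optMsg)
//	if _, err := io.ReadFull(conn, optMsg.header[:]); err != nil {
//		return nil, err
//	}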
type AudioInputServer struct {
bufferSize int64
processingTime int64
droppedFrames int64
totalFrames int64
listener net.Listener
conn net.Conn
mtx sync.Mutex
running bool
messageChan chan *UnifiedIPCMessage
processChan chan *UnifiedIPCMessage
stopChan chan struct{}
wg sync.WaitGroup
channelMutex sync.RWMutex
lastBufferSize int64
socketBufferConfig SocketBufferConfig
}
// NewAudioInputServer creates a new audio input server
func NewAudioInputServer() (*AudioInputServer, error) {
socketPath := getInputSocketPath()
// Retry socket creation with cleanup to handle race conditions
var listener net.Listener
var err error
for i := 0; i < 3; i++ {
// Remove existing socket if any
os.Remove(socketPath)
// Small delay to ensure cleanup completes
if i > 0 {
time.Sleep(10 * time.Millisecond)
}
listener, err = net.Listen("unix", socketPath)
if err == nil {
break
}
// Log retry attempt
if i < 2 {
logger := logging.GetDefaultLogger().With().Str("component", "audio-input").Logger()
logger.Warn().Err(err).Int("attempt", i+1).Msg("Failed to create unix socket, retrying")
}
}
if err != nil {
return nil, fmt.Errorf("failed to create unix socket after 3 attempts: %w", err)
}
// Fixed initial buffer size of 512 frames for stability; this is comfortably above
// the 50-frame minimum needed to absorb burst traffic without immediate overflow
initialBufferSize := int64(512)
// Initialize socket buffer configuration
socketBufferConfig := DefaultSocketBufferConfig()
return &AudioInputServer{
listener: listener,
messageChan: make(chan *UnifiedIPCMessage, initialBufferSize),
processChan: make(chan *UnifiedIPCMessage, initialBufferSize),
stopChan: make(chan struct{}),
bufferSize: initialBufferSize,
lastBufferSize: initialBufferSize,
socketBufferConfig: socketBufferConfig,
}, nil
}
// Start starts the audio input server
func (ais *AudioInputServer) Start() error {
ais.mtx.Lock()
defer ais.mtx.Unlock()
if ais.running {
return fmt.Errorf("server already running")
}
ais.running = true
// Reset counters on start
atomic.StoreInt64(&ais.totalFrames, 0)
atomic.StoreInt64(&ais.droppedFrames, 0)
atomic.StoreInt64(&ais.processingTime, 0)
// Start triple-goroutine architecture
ais.startReaderGoroutine()
ais.startProcessorGoroutine()
ais.startMonitorGoroutine()
// Start the connection acceptor in its own goroutine
go ais.acceptConnections()
return nil
}
// Stop stops the audio input server
func (ais *AudioInputServer) Stop() {
ais.mtx.Lock()
defer ais.mtx.Unlock()
if !ais.running {
return
}
ais.running = false
// Signal all goroutines to stop
close(ais.stopChan)
ais.wg.Wait()
if ais.conn != nil {
ais.conn.Close()
ais.conn = nil
}
if ais.listener != nil {
ais.listener.Close()
ais.listener = nil
}
// Remove socket file to prevent restart issues
os.Remove(getInputSocketPath())
}
// Close closes the server and cleans up resources
func (ais *AudioInputServer) Close() {
ais.Stop()
// Remove socket file
os.Remove(getInputSocketPath())
}
// acceptConnections accepts incoming connections
func (ais *AudioInputServer) acceptConnections() {
for ais.running {
conn, err := ais.listener.Accept()
if err != nil {
if ais.running {
// Log error and continue accepting
logger := logging.GetDefaultLogger().With().Str("component", "audio-input").Logger()
logger.Warn().Err(err).Msg("failed to accept connection, retrying")
continue
}
return
}
// Configure socket buffers for optimal performance
if err := ConfigureSocketBuffers(conn, ais.socketBufferConfig); err != nil {
// Log warning but don't fail - socket buffer optimization is not critical
logger := logging.GetDefaultLogger().With().Str("component", "audio-input").Logger()
logger.Warn().Err(err).Msg("failed to configure socket buffers, using defaults")
} else {
// Record socket buffer metrics for monitoring
RecordSocketBufferMetrics(conn, "audio-input")
}
ais.mtx.Lock()
// Close existing connection if any to prevent resource leaks
if ais.conn != nil {
ais.conn.Close()
ais.conn = nil
}
ais.conn = conn
ais.mtx.Unlock()
// Handle the connection in its own goroutine
go ais.handleConnection(conn)
}
}
// handleConnection handles a single client connection
func (ais *AudioInputServer) handleConnection(conn net.Conn) {
defer conn.Close()
// Connection is now handled by the reader goroutine
// Just wait for connection to close or stop signal
for {
select {
case <-ais.stopChan:
return
default:
// Check if connection is still alive
if ais.conn == nil {
return
}
time.Sleep(Config.DefaultSleepDuration)
}
}
}
// readMessage reads a message from the connection using optimized pooled buffers with validation.
//
// Validation Rules:
// - Magic number must match inputMagicNumber ("JKMI" - JetKVM Microphone Input)
// - Message length must not exceed Config.MaxFrameSize (default: 4096 bytes)
// - Header size is fixed at 17 bytes (4+1+4+8: Magic+Type+Length+Timestamp)
// - Data length validation prevents buffer overflow attacks
//
// Message Format:
// - Magic (4 bytes): Identifies valid JetKVM audio messages
// - Type (1 byte): InputMessageType (OpusFrame, Config, Stop, Heartbeat, Ack)
// - Length (4 bytes): Data payload size in bytes
// - Timestamp (8 bytes): Message timestamp for latency tracking
// - Data (variable): Message payload up to MaxFrameSize
//
// Error Conditions:
// - Invalid magic number: Rejects non-JetKVM messages
// - Message too large: Prevents memory exhaustion
// - Connection errors: Network/socket failures
// - Incomplete reads: Partial message reception
//
// The function uses pooled buffers for efficient memory management and
// ensures all messages conform to the JetKVM audio protocol specification.
func (ais *AudioInputServer) readMessage(conn net.Conn) (*UnifiedIPCMessage, error) {
// Get optimized message from pool
optMsg := globalMessagePool.Get()
defer globalMessagePool.Put(optMsg)
// Read header directly into pre-allocated buffer
_, err := io.ReadFull(conn, optMsg.header[:])
if err != nil {
return nil, err
}
// Parse header using optimized access
msg := &optMsg.msg
msg.Magic = binary.LittleEndian.Uint32(optMsg.header[0:4])
msg.Type = UnifiedMessageType(optMsg.header[4])
msg.Length = binary.LittleEndian.Uint32(optMsg.header[5:9])
msg.Timestamp = int64(binary.LittleEndian.Uint64(optMsg.header[9:17]))
// Validate magic number
if msg.Magic != inputMagicNumber {
return nil, fmt.Errorf("invalid magic number: got 0x%x, expected 0x%x", msg.Magic, inputMagicNumber)
}
// Validate message length
if msg.Length > uint32(maxFrameSize) {
return nil, fmt.Errorf("message too large: got %d bytes, maximum allowed %d bytes", msg.Length, maxFrameSize)
}
// Read data if present using pooled buffer
if msg.Length > 0 {
// Ensure buffer capacity
if cap(optMsg.data) < int(msg.Length) {
optMsg.data = make([]byte, msg.Length)
} else {
optMsg.data = optMsg.data[:msg.Length]
}
_, err = io.ReadFull(conn, optMsg.data)
if err != nil {
return nil, err
}
msg.Data = optMsg.data
}
// Return a standalone copy of the message so the pooled buffers can be safely reused
result := &UnifiedIPCMessage{
Magic: msg.Magic,
Type: msg.Type,
Length: msg.Length,
Timestamp: msg.Timestamp,
}
if msg.Length > 0 {
// Copy data to ensure it's not affected by buffer reuse
result.Data = make([]byte, msg.Length)
copy(result.Data, msg.Data)
}
return result, nil
}
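// For reference, a sender builds the 17-byte header with the same little-endian
// layout that readMessage parses above. A minimal sketch (not the production
// writer, which is WriteIPCMessage; payload is a placeholder):
//
//	var header [17]byte
//	binary.LittleEndian.PutUint32(header[0:4], inputMagicNumber)
//	header[4] = byte(MessageTypeOpusFrame)
//	binary.LittleEndian.PutUint32(header[5:9], uint32(len(payload)))
//	binary.LittleEndian.PutUint64(header[9:17], uint64(time.Now().UnixNano()))
//	// the header is followed on the wire by len(payload) bytes of data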
// processMessage processes a received message
func (ais *AudioInputServer) processMessage(msg *UnifiedIPCMessage) error {
switch msg.Type {
case MessageTypeOpusFrame:
return ais.processOpusFrame(msg.Data)
case MessageTypeConfig:
return ais.processConfig(msg.Data)
case MessageTypeOpusConfig:
return ais.processOpusConfig(msg.Data)
case MessageTypeStop:
return fmt.Errorf("stop message received")
case MessageTypeHeartbeat:
return ais.sendAck()
default:
return fmt.Errorf("unknown message type: %d", msg.Type)
}
}
// processOpusFrame processes an Opus audio frame
func (ais *AudioInputServer) processOpusFrame(data []byte) error {
// Inline validation for critical audio path - avoid function call overhead
dataLen := len(data)
cachedMaxFrameSize := maxFrameSize
if dataLen > cachedMaxFrameSize {
return ErrFrameDataTooLarge
}
// Get cached config once - avoid repeated calls and locking
cache := Config
// Skip cache expiry check in hotpath - background updates handle this
// Get a PCM buffer from the pool for optimized decode-write
pcmBuffer := GetBufferFromPool(cache.MaxPCMBufferSize)
defer ReturnBufferToPool(pcmBuffer)
// Log audio processing details periodically for monitoring
totalFrames := atomic.AddInt64(&ais.totalFrames, 1)
// Zero-cost debug logging for buffer allocation (first few operations)
// Only perform computations if trace logging is actually enabled
if totalFrames <= 5 {
logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger()
if logger.GetLevel() <= zerolog.TraceLevel {
logger.Trace().
Int("requested_buffer_size", cache.MaxPCMBufferSize).
Int("pcm_buffer_length", len(pcmBuffer)).
Int("pcm_buffer_capacity", cap(pcmBuffer)).
Msg("PCM buffer allocated from pool")
}
}
if totalFrames <= 5 || totalFrames%500 == 1 {
logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger()
if logger.GetLevel() <= zerolog.TraceLevel {
logger.Trace().
Int("opus_frame_size", dataLen).
Int("pcm_buffer_size", len(pcmBuffer)).
Int64("total_frames_processed", totalFrames).
Msg("Processing audio frame for USB Gadget output")
}
}
// Direct CGO call - avoid wrapper function overhead
start := time.Now()
framesWritten, err := CGOAudioDecodeWrite(data, pcmBuffer)
duration := time.Since(start)
// Log the result with detailed context
logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger()
if err != nil {
// Log error with detailed context for debugging
atomic.AddInt64(&ais.droppedFrames, 1)
// Get current statistics for context
total, success, failures, recovery, lastError, _ := GetAudioDecodeWriteStats()
successRate := float64(success) / float64(total) * 100
logger.Error().
Err(err).
Int("opus_frame_size", dataLen).
Dur("processing_duration", duration).
Int64("frames_written", int64(framesWritten)).
Int64("total_operations", total).
Int64("successful_operations", success).
Int64("failed_operations", failures).
Int64("recovery_attempts", recovery).
Float64("success_rate_percent", successRate).
Str("last_error", lastError).
Int64("total_frames_processed", totalFrames).
Int64("dropped_frames", atomic.LoadInt64(&ais.droppedFrames)).
Msg("Failed to decode/write audio frame to USB Gadget")
return err
}
// Log successful operations periodically to monitor health (zero-cost when trace disabled)
if (totalFrames <= 5 || totalFrames%1000 == 1) && logger.GetLevel() <= zerolog.TraceLevel {
// Get current statistics for context (only when trace is enabled)
total, success, failures, recovery, _, _ := GetAudioDecodeWriteStats()
successRate := float64(success) / float64(total) * 100
logger.Trace().
Int("opus_frame_size", dataLen).
Int64("frames_written", int64(framesWritten)).
Int64("total_operations", total).
Int64("successful_operations", success).
Int64("failed_operations", failures).
Int64("recovery_attempts", recovery).
Float64("success_rate_percent", successRate).
Int64("total_frames_processed", totalFrames).
Int64("dropped_frames", atomic.LoadInt64(&ais.droppedFrames)).
Msg("Successfully decoded/wrote audio frame to USB Gadget")
}
return err
}
// processConfig processes a configuration update
func (ais *AudioInputServer) processConfig(data []byte) error {
// Validate configuration data
if len(data) == 0 {
return fmt.Errorf("empty configuration data")
}
// Basic validation for configuration size
if err := ValidateBufferSize(len(data)); err != nil {
logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger()
logger.Error().Err(err).Msg("Configuration buffer validation failed")
return fmt.Errorf("configuration validation failed: %w", err)
}
// Acknowledge configuration receipt
return ais.sendAck()
}
// processOpusConfig processes a complete Opus encoder configuration update
func (ais *AudioInputServer) processOpusConfig(data []byte) error {
logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger()
// Validate configuration data size (9 * int32 = 36 bytes)
if len(data) != 36 {
return fmt.Errorf("invalid Opus configuration data size: expected 36 bytes, got %d", len(data))
}
// Deserialize Opus configuration
config := UnifiedIPCOpusConfig{
SampleRate: int(binary.LittleEndian.Uint32(data[0:4])),
Channels: int(binary.LittleEndian.Uint32(data[4:8])),
FrameSize: int(binary.LittleEndian.Uint32(data[8:12])),
Bitrate: int(binary.LittleEndian.Uint32(data[12:16])),
Complexity: int(binary.LittleEndian.Uint32(data[16:20])),
VBR: int(binary.LittleEndian.Uint32(data[20:24])),
SignalType: int(binary.LittleEndian.Uint32(data[24:28])),
Bandwidth: int(binary.LittleEndian.Uint32(data[28:32])),
DTX: int(binary.LittleEndian.Uint32(data[32:36])),
}
logger.Info().Interface("config", config).Msg("applying dynamic Opus encoder configuration")
// Note: We don't call CGOAudioInit() here as it would destroy and recreate the encoder,
// causing temporary unavailability. The encoder should already be initialized when
// the audio input server starts.
// Apply the Opus encoder configuration dynamically with retry logic
var err error
for attempt := 0; attempt < 3; attempt++ {
err = CGOUpdateOpusEncoderParams(
config.Bitrate,
config.Complexity,
config.VBR,
0, // VBR constraint - using default
config.SignalType,
config.Bandwidth,
config.DTX,
)
if err == nil {
break
}
logger.Warn().Err(err).Int("attempt", attempt+1).Msg("Failed to update Opus encoder parameters, retrying")
if attempt < 2 {
time.Sleep(time.Duration(attempt+1) * 50 * time.Millisecond)
}
}
if err != nil {
logger.Error().Err(err).Msg("failed to apply Opus encoder configuration after retries")
return fmt.Errorf("failed to apply Opus configuration: %w", err)
}
logger.Info().Msg("Opus encoder configuration applied successfully")
return ais.sendAck()
}
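// Wire layout of the Opus configuration payload decoded above (byte offsets,
// all fields little-endian uint32); the client-side EncodeOpusConfig is expected
// to produce exactly this 36-byte layout:
//
//	[0:4]   SampleRate
//	[4:8]   Channels
//	[8:12]  FrameSize
//	[12:16] Bitrate
//	[16:20] Complexity
//	[20:24] VBR
//	[24:28] SignalType
//	[28:32] Bandwidth
//	[32:36] DTX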
// sendAck sends an acknowledgment message
func (ais *AudioInputServer) sendAck() error {
ais.mtx.Lock()
defer ais.mtx.Unlock()
if ais.conn == nil {
return fmt.Errorf("no connection")
}
msg := &UnifiedIPCMessage{
Magic: inputMagicNumber,
Type: MessageTypeAck,
Length: 0,
Timestamp: time.Now().UnixNano(),
}
return ais.writeMessage(ais.conn, msg)
}
// Global shared message pool for input IPC server
var globalInputServerMessagePool = NewGenericMessagePool(messagePoolSize)
// writeMessage writes a message to the connection using shared common utilities
func (ais *AudioInputServer) writeMessage(conn net.Conn, msg *UnifiedIPCMessage) error {
// Use shared WriteIPCMessage function with global message pool
return WriteIPCMessage(conn, msg, globalInputServerMessagePool, &ais.droppedFrames)
}
// AudioInputClient handles IPC communication from the main process
type AudioInputClient struct {
// Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment)
droppedFrames int64 // Atomic counter for dropped frames
totalFrames int64 // Atomic counter for total frames
conn net.Conn
mtx sync.Mutex
running bool
}
// NewAudioInputClient creates a new audio input client
func NewAudioInputClient() *AudioInputClient {
return &AudioInputClient{}
}
// Connect connects to the audio input server
func (aic *AudioInputClient) Connect() error {
aic.mtx.Lock()
defer aic.mtx.Unlock()
if aic.running {
return nil // Already connected
}
// Ensure clean state before connecting
if aic.conn != nil {
aic.conn.Close()
aic.conn = nil
}
socketPath := getInputSocketPath()
// Try connecting multiple times, as the server might not be ready yet;
// the retry count and delays are kept small for faster startup
for i := 0; i < 10; i++ {
conn, err := net.Dial("unix", socketPath)
if err == nil {
aic.conn = conn
aic.running = true
// Reset frame counters on successful connection
atomic.StoreInt64(&aic.totalFrames, 0)
atomic.StoreInt64(&aic.droppedFrames, 0)
return nil
}
// Exponential backoff: the delay starts at Config.BackoffStart and doubles every three attempts
backoffStart := Config.BackoffStart
delay := time.Duration(backoffStart.Nanoseconds()*(1<<uint(i/3))) * time.Nanosecond
maxDelay := Config.MaxRetryDelay
if delay > maxDelay {
delay = maxDelay
}
time.Sleep(delay)
}
// Ensure clean state on connection failure
aic.conn = nil
aic.running = false
return fmt.Errorf("failed to connect to audio input server after 10 attempts")
}
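// The connect delay grows as Config.BackoffStart << (attempt/3), capped at
// Config.MaxRetryDelay. For illustration only, assuming BackoffStart = 50ms and
// MaxRetryDelay = 250ms (the real values come from Config):
//
//	attempts 1-3: 50ms each
//	attempts 4-6: 100ms each
//	attempts 7-9: 200ms each
//	attempt 10:   400ms, capped to 250ms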
// Disconnect disconnects from the audio input server
func (aic *AudioInputClient) Disconnect() {
aic.mtx.Lock()
defer aic.mtx.Unlock()
if !aic.running {
return
}
aic.running = false
if aic.conn != nil {
// Send stop message
msg := &UnifiedIPCMessage{
Magic: inputMagicNumber,
Type: MessageTypeStop,
Length: 0,
Timestamp: time.Now().UnixNano(),
}
_ = aic.writeMessage(msg) // Ignore errors during shutdown
aic.conn.Close()
aic.conn = nil
}
}
// SendFrame sends an Opus frame to the audio input server
func (aic *AudioInputClient) SendFrame(frame []byte) error {
// Fast path validation
if len(frame) == 0 {
return nil
}
aic.mtx.Lock()
if !aic.running || aic.conn == nil {
aic.mtx.Unlock()
return fmt.Errorf("not connected")
}
// Direct message creation without timestamp overhead
msg := &UnifiedIPCMessage{
Magic: inputMagicNumber,
Type: MessageTypeOpusFrame,
Length: uint32(len(frame)),
Data: frame,
}
err := aic.writeMessage(msg)
aic.mtx.Unlock()
return err
}
// SendFrameZeroCopy sends a zero-copy Opus frame to the audio input server
func (aic *AudioInputClient) SendFrameZeroCopy(frame *ZeroCopyAudioFrame) error {
aic.mtx.Lock()
defer aic.mtx.Unlock()
if !aic.running || aic.conn == nil {
return fmt.Errorf("not connected to audio input server")
}
if frame == nil {
return nil // Nil frame, ignore
}
frameLen := frame.Length()
if frameLen == 0 {
return nil // Empty frame, ignore
}
// Inline frame validation to reduce function call overhead
if frameLen > maxFrameSize {
return ErrFrameDataTooLarge
}
// Use zero-copy data directly
msg := &UnifiedIPCMessage{
Magic: inputMagicNumber,
Type: MessageTypeOpusFrame,
Length: uint32(frameLen),
Timestamp: time.Now().UnixNano(),
Data: frame.Data(), // Zero-copy data access
}
return aic.writeMessage(msg)
}
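// Typical client lifecycle (a sketch with error handling elided; opusFrame is a
// placeholder for an encoded frame):
//
//	client := NewAudioInputClient()
//	if err := client.Connect(); err != nil {
//		return err
//	}
//	defer client.Disconnect()
//	if err := client.SendFrame(opusFrame); err != nil {
//		// frame was dropped or the connection was lost
//	}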
// SendConfig sends a configuration update to the audio input server
func (aic *AudioInputClient) SendConfig(config UnifiedIPCConfig) error {
aic.mtx.Lock()
defer aic.mtx.Unlock()
if !aic.running || aic.conn == nil {
return fmt.Errorf("not connected to audio input server")
}
// Validate configuration parameters
if err := ValidateInputIPCConfig(config.SampleRate, config.Channels, config.FrameSize); err != nil {
logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
logger.Error().Err(err).Msg("Configuration validation failed")
return fmt.Errorf("input configuration validation failed: %w", err)
}
// Serialize config using common function
data := EncodeAudioConfig(config.SampleRate, config.Channels, config.FrameSize)
msg := &UnifiedIPCMessage{
Magic: inputMagicNumber,
Type: MessageTypeConfig,
Length: uint32(len(data)),
Timestamp: time.Now().UnixNano(),
Data: data,
}
return aic.writeMessage(msg)
}
// SendOpusConfig sends a complete Opus encoder configuration update to the audio input server
func (aic *AudioInputClient) SendOpusConfig(config UnifiedIPCOpusConfig) error {
aic.mtx.Lock()
defer aic.mtx.Unlock()
if !aic.running || aic.conn == nil {
return fmt.Errorf("not connected to audio input server")
}
// Validate configuration parameters
if config.SampleRate <= 0 || config.Channels <= 0 || config.FrameSize <= 0 || config.Bitrate <= 0 {
return fmt.Errorf("invalid Opus configuration: SampleRate=%d, Channels=%d, FrameSize=%d, Bitrate=%d",
config.SampleRate, config.Channels, config.FrameSize, config.Bitrate)
}
// Serialize Opus configuration using common function
data := EncodeOpusConfig(config.SampleRate, config.Channels, config.FrameSize, config.Bitrate, config.Complexity, config.VBR, config.SignalType, config.Bandwidth, config.DTX)
msg := &UnifiedIPCMessage{
Magic: inputMagicNumber,
Type: MessageTypeOpusConfig,
Length: uint32(len(data)),
Timestamp: time.Now().UnixNano(),
Data: data,
}
return aic.writeMessage(msg)
}
// SendHeartbeat sends a heartbeat message
func (aic *AudioInputClient) SendHeartbeat() error {
aic.mtx.Lock()
defer aic.mtx.Unlock()
if !aic.running || aic.conn == nil {
return fmt.Errorf("not connected to audio input server")
}
msg := &UnifiedIPCMessage{
Magic: inputMagicNumber,
Type: MessageTypeHeartbeat,
Length: 0,
Timestamp: time.Now().UnixNano(),
}
return aic.writeMessage(msg)
}
// writeMessage writes a message to the server
// Global shared message pool for input IPC clients
var globalInputMessagePool = NewGenericMessagePool(messagePoolSize)
func (aic *AudioInputClient) writeMessage(msg *UnifiedIPCMessage) error {
// Increment total frames counter
atomic.AddInt64(&aic.totalFrames, 1)
// Use shared WriteIPCMessage function with global message pool
return WriteIPCMessage(aic.conn, msg, globalInputMessagePool, &aic.droppedFrames)
}
// IsConnected returns whether the client is connected
func (aic *AudioInputClient) IsConnected() bool {
aic.mtx.Lock()
defer aic.mtx.Unlock()
return aic.running && aic.conn != nil
}
// GetFrameStats returns frame statistics
func (aic *AudioInputClient) GetFrameStats() (total, dropped int64) {
stats := GetFrameStats(&aic.totalFrames, &aic.droppedFrames)
return stats.Total, stats.Dropped
}
// GetDropRate returns the current frame drop rate as a percentage
func (aic *AudioInputClient) GetDropRate() float64 {
stats := GetFrameStats(&aic.totalFrames, &aic.droppedFrames)
return CalculateDropRate(stats)
}
// ResetStats resets frame statistics
func (aic *AudioInputClient) ResetStats() {
ResetFrameStats(&aic.totalFrames, &aic.droppedFrames)
}
// ResetServerStats resets server frame statistics
func (ais *AudioInputServer) ResetServerStats() {
atomic.StoreInt64(&ais.totalFrames, 0)
atomic.StoreInt64(&ais.droppedFrames, 0)
}
// RecoverFromDroppedFrames attempts to recover when too many frames are dropped
func (ais *AudioInputServer) RecoverFromDroppedFrames() {
total := atomic.LoadInt64(&ais.totalFrames)
dropped := atomic.LoadInt64(&ais.droppedFrames)
// If more than 50% of frames are dropped, attempt recovery
if total > 100 && dropped > total/2 {
logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger()
logger.Warn().Int64("total", total).Int64("dropped", dropped).Msg("high drop rate detected, attempting recovery")
// Reset stats and update buffer size from adaptive manager
ais.ResetServerStats()
ais.UpdateBufferSize()
}
}
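// Example: with total = 200 and dropped = 120 the drop rate is 60%, so both
// conditions above (total > 100 and dropped > total/2) hold and recovery runs;
// with total = 80 the check is skipped regardless of the drop rate.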
// startReaderGoroutine starts the message reader goroutine
func (ais *AudioInputServer) startReaderGoroutine() {
ais.wg.Add(1)
// Reader task; runs in its own goroutine
readerTask := func() {
defer ais.wg.Done()
// Enhanced error tracking and recovery
var consecutiveErrors int
var lastErrorTime time.Time
maxConsecutiveErrors := Config.MaxConsecutiveErrors
errorResetWindow := Config.RestartWindow // Use existing restart window
baseBackoffDelay := Config.RetryDelay
maxBackoffDelay := Config.MaxRetryDelay
logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger()
for ais.running {
ais.mtx.Lock()
conn := ais.conn
ais.mtx.Unlock()
if conn == nil {
time.Sleep(10 * time.Millisecond)
continue
}
msg, err := ais.readMessage(conn)
if err != nil {
if ais.running {
// Enhanced error handling with progressive backoff
now := time.Now()
// Reset error counter if enough time has passed
if now.Sub(lastErrorTime) > errorResetWindow {
consecutiveErrors = 0
}
consecutiveErrors++
lastErrorTime = now
// Skip logging in hotpath for performance - only log critical errors
// Progressive backoff based on error count
if consecutiveErrors > 1 {
backoffDelay := time.Duration(consecutiveErrors-1) * baseBackoffDelay
if backoffDelay > maxBackoffDelay {
backoffDelay = maxBackoffDelay
}
time.Sleep(backoffDelay)
}
// If too many consecutive errors, close connection to force reconnect
if consecutiveErrors >= maxConsecutiveErrors {
// Only log critical errors to reduce hotpath overhead
if logger.GetLevel() <= zerolog.ErrorLevel {
logger.Error().
Int("consecutive_errors", consecutiveErrors).
Msg("Too many consecutive read errors, closing connection")
}
ais.mtx.Lock()
if ais.conn != nil {
ais.conn.Close()
ais.conn = nil
}
ais.mtx.Unlock()
consecutiveErrors = 0 // Reset for next connection
}
}
continue
}
// Reset error counter on successful read
if consecutiveErrors > 0 {
consecutiveErrors = 0
// Only log recovery info if info level is enabled to reduce overhead
if logger.GetLevel() <= zerolog.InfoLevel {
logger.Info().Msg("Input connection recovered")
}
}
// Send to message channel with non-blocking write (use read lock for channel access)
ais.channelMutex.RLock()
messageChan := ais.messageChan
ais.channelMutex.RUnlock()
select {
case messageChan <- msg:
atomic.AddInt64(&ais.totalFrames, 1)
default:
// Channel full, drop message
atomic.AddInt64(&ais.droppedFrames, 1)
// Avoid sampling logic in critical path - only log if warn level enabled
if logger.GetLevel() <= zerolog.WarnLevel {
droppedCount := atomic.LoadInt64(&ais.droppedFrames)
logger.Warn().Int64("total_dropped", droppedCount).Msg("Message channel full, dropping frame")
}
}
}
}
// Run the reader task in a dedicated goroutine
go readerTask()
}
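// The read-error backoff above is linear: delay = (consecutiveErrors-1) * Config.RetryDelay,
// capped at Config.MaxRetryDelay. For illustration only, assuming RetryDelay = 10ms and
// MaxRetryDelay = 100ms, the 2nd through 11th consecutive errors sleep 10ms, 20ms, ... 100ms,
// then stay at the cap until the connection is closed after Config.MaxConsecutiveErrors errors.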
// startProcessorGoroutine starts the message processor goroutine
func (ais *AudioInputServer) startProcessorGoroutine() {
ais.wg.Add(1)
// Processor task; runs in its own goroutine
processorTask := func() {
// Only lock OS thread and set priority for high-load scenarios
// This reduces interference with input processing threads
config := Config
useThreadOptimizations := config.MaxAudioProcessorWorkers > 8
if useThreadOptimizations {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
// Priority scheduler not implemented - using default thread priority
}
// Create logger for this goroutine
logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger()
// Enhanced error tracking for processing
var processingErrors int
var lastProcessingError time.Time
maxProcessingErrors := config.MaxConsecutiveErrors
errorResetWindow := config.RestartWindow
defer ais.wg.Done()
for {
select {
case <-ais.stopChan:
return
case msg := <-ais.getMessageChan():
// Process message with error handling
start := time.Now()
err := ais.processMessageWithRecovery(msg, logger)
processingTime := time.Since(start)
if err != nil {
// Track processing errors
now := time.Now()
if now.Sub(lastProcessingError) > errorResetWindow {
processingErrors = 0
}
processingErrors++
lastProcessingError = now
// Skip logging in hotpath for performance
// If too many processing errors, drop frames more aggressively
if processingErrors >= maxProcessingErrors {
// Clear processing queue to recover (the labeled break exits the drain loop, not just the select)
processChan := ais.getProcessChan()
drainQueue:
for len(processChan) > 0 {
select {
case <-processChan:
atomic.AddInt64(&ais.droppedFrames, 1)
default:
break drainQueue
}
}
processingErrors = 0 // Reset after clearing queue
}
continue
}
// Reset error counter on successful processing
if processingErrors > 0 {
processingErrors = 0
// Skip logging in hotpath for performance
}
// Update processing time metrics
atomic.StoreInt64(&ais.processingTime, processingTime.Nanoseconds())
}
}
}
// Run the processor task in a dedicated goroutine
go processorTask()
}
// processMessageWithRecovery processes a message with enhanced error recovery
func (ais *AudioInputServer) processMessageWithRecovery(msg *UnifiedIPCMessage, logger zerolog.Logger) error {
// Intelligent frame dropping: prioritize recent frames
if msg.Type == MessageTypeOpusFrame {
// Check if processing queue is getting full
processChan := ais.getProcessChan()
queueLen := len(processChan)
bufferSize := int(atomic.LoadInt64(&ais.bufferSize))
if queueLen > bufferSize*3/4 {
// Drop oldest frames, keep newest
select {
case <-processChan: // Remove oldest
atomic.AddInt64(&ais.droppedFrames, 1)
logger.Debug().Msg("Dropped oldest frame to make room")
default:
}
}
}
// Send to processing queue without blocking (use read lock for channel access)
ais.channelMutex.RLock()
processChan := ais.processChan
ais.channelMutex.RUnlock()
select {
case processChan <- msg:
return nil
default:
// Processing queue full, drop frame immediately
atomic.AddInt64(&ais.droppedFrames, 1)
return fmt.Errorf("processing queue full")
}
}
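// Example: with the fixed buffer size of 512 frames the early-drop threshold above
// is 512*3/4 = 384 queued frames; beyond that the oldest Opus frame is discarded
// before the new one is enqueued.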
// startMonitorGoroutine starts the performance monitoring goroutine
func (ais *AudioInputServer) startMonitorGoroutine() {
ais.wg.Add(1)
// Monitor task; runs in its own goroutine
monitorTask := func() {
// Monitor goroutine doesn't need thread locking for most scenarios
// Only use thread optimizations for high-throughput scenarios
config := Config
useThreadOptimizations := config.MaxAudioProcessorWorkers > 8
if useThreadOptimizations {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
// Priority scheduler not implemented - using default thread priority
}
defer ais.wg.Done()
ticker := time.NewTicker(Config.DefaultTickerInterval)
defer ticker.Stop()
// Buffer size update ticker (less frequent)
bufferUpdateTicker := time.NewTicker(Config.BufferUpdateInterval)
defer bufferUpdateTicker.Stop()
for {
select {
case <-ais.stopChan:
return
case <-ticker.C:
// Process frames from processing queue
for {
select {
case msg := <-ais.getProcessChan():
start := time.Now()
err := ais.processMessage(msg)
processingTime := time.Since(start)
// Calculate end-to-end latency using message timestamp
var latency time.Duration
if msg.Type == MessageTypeOpusFrame && msg.Timestamp > 0 {
msgTime := time.Unix(0, msg.Timestamp)
latency = time.Since(msgTime)
// Use exponential moving average for end-to-end latency tracking
currentAvg := atomic.LoadInt64(&ais.processingTime)
// Weight: 90% historical, 10% current (for smoother averaging)
newAvg := (currentAvg*9 + latency.Nanoseconds()) / 10
atomic.StoreInt64(&ais.processingTime, newAvg)
} else {
// Fallback to processing time only
latency = processingTime
currentAvg := atomic.LoadInt64(&ais.processingTime)
newAvg := (currentAvg + processingTime.Nanoseconds()) / 2
atomic.StoreInt64(&ais.processingTime, newAvg)
}
if err != nil {
atomic.AddInt64(&ais.droppedFrames, 1)
}
default:
// No more messages to process
goto checkBufferUpdate
}
}
checkBufferUpdate:
// Check if we need to update buffer size
select {
case <-bufferUpdateTicker.C:
// Buffer size is now fixed from config
default:
// No buffer update needed
}
}
}
}
// Run the monitor task in a dedicated goroutine
go monitorTask()
}
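// The latency tracking above is an exponential moving average weighted 90% historical,
// 10% current. Example: with a current average of 2ms and a new end-to-end latency of
// 5ms, newAvg = (2ms*9 + 5ms) / 10 = 2.3ms, so a single outlier shifts the metric only slightly.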
// GetServerStats returns server performance statistics
func (ais *AudioInputServer) GetServerStats() (total, dropped int64, avgProcessingTime time.Duration, bufferSize int64) {
return atomic.LoadInt64(&ais.totalFrames),
atomic.LoadInt64(&ais.droppedFrames),
time.Duration(atomic.LoadInt64(&ais.processingTime)),
atomic.LoadInt64(&ais.bufferSize)
}
// UpdateBufferSize updates the buffer size (now using fixed values)
func (ais *AudioInputServer) UpdateBufferSize() {
// Buffer size is now fixed at 512 frames for stability
newSize := int64(512)
atomic.StoreInt64(&ais.bufferSize, newSize)
}
// GetMessagePoolStats returns detailed statistics about the message pool
func (mp *MessagePool) GetMessagePoolStats() MessagePoolStats {
mp.mutex.RLock()
preallocatedCount := len(mp.preallocated)
mp.mutex.RUnlock()
hitCount := atomic.LoadInt64(&mp.hitCount)
missCount := atomic.LoadInt64(&mp.missCount)
totalRequests := hitCount + missCount
var hitRate float64
if totalRequests > 0 {
hitRate = float64(hitCount) / float64(totalRequests) * Config.PercentageMultiplier
}
// Calculate channel pool size
channelPoolSize := len(mp.pool)
return MessagePoolStats{
MaxPoolSize: mp.maxPoolSize,
ChannelPoolSize: channelPoolSize,
PreallocatedCount: int64(preallocatedCount),
PreallocatedMax: int64(mp.preallocSize),
HitCount: hitCount,
MissCount: missCount,
HitRate: hitRate,
}
}
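// Example: hitCount = 950 and missCount = 50 give a hit rate of 950/1000 = 95%,
// meaning only 5% of Get calls had to allocate a fresh message outside the pool.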
// MessagePoolStats provides detailed message pool statistics
type MessagePoolStats struct {
MaxPoolSize int
ChannelPoolSize int
PreallocatedCount int64
PreallocatedMax int64
HitCount int64
MissCount int64
HitRate float64 // Percentage
}
// GetGlobalMessagePoolStats returns statistics for the global message pool
func GetGlobalMessagePoolStats() MessagePoolStats {
return globalMessagePool.GetMessagePoolStats()
}
// getMessageChan safely returns the current message channel
func (ais *AudioInputServer) getMessageChan() chan *UnifiedIPCMessage {
ais.channelMutex.RLock()
defer ais.channelMutex.RUnlock()
return ais.messageChan
}
// getProcessChan safely returns the current process channel
func (ais *AudioInputServer) getProcessChan() chan *UnifiedIPCMessage {
ais.channelMutex.RLock()
defer ais.channelMutex.RUnlock()
return ais.processChan
}
// Helper functions such as getInputSocketPath are defined in unified_ipc.go