diff --git a/internal/audio/input_supervisor.go b/internal/audio/input_supervisor.go index 94d1e370..59cddbf0 100644 --- a/internal/audio/input_supervisor.go +++ b/internal/audio/input_supervisor.go @@ -230,7 +230,7 @@ func (ais *AudioInputSupervisor) SendFrameZeroCopy(frame *ZeroCopyAudioFrame) er } // SendConfig sends a configuration update to the subprocess (convenience method) -func (ais *AudioInputSupervisor) SendConfig(config InputIPCConfig) error { +func (ais *AudioInputSupervisor) SendConfig(config UnifiedIPCConfig) error { if ais.client == nil { return fmt.Errorf("client not initialized") } @@ -243,7 +243,7 @@ func (ais *AudioInputSupervisor) SendConfig(config InputIPCConfig) error { } // SendOpusConfig sends a complete Opus encoder configuration to the audio input server -func (ais *AudioInputSupervisor) SendOpusConfig(config InputIPCOpusConfig) error { +func (ais *AudioInputSupervisor) SendOpusConfig(config UnifiedIPCOpusConfig) error { if ais.client == nil { return fmt.Errorf("client not initialized") } diff --git a/internal/audio/ipc_common.go b/internal/audio/ipc_common.go index 6e35a1d8..d828129c 100644 --- a/internal/audio/ipc_common.go +++ b/internal/audio/ipc_common.go @@ -134,14 +134,12 @@ func (mp *GenericMessagePool) GetStats() (hitCount, missCount int64, hitRate flo // Helper functions -// EncodeMessageHeader encodes a message header into a byte slice -func EncodeMessageHeader(magic uint32, msgType uint8, length uint32, timestamp int64) []byte { - header := make([]byte, 17) +// EncodeMessageHeader encodes a message header into a provided byte slice +func EncodeMessageHeader(header []byte, magic uint32, msgType uint8, length uint32, timestamp int64) { binary.LittleEndian.PutUint32(header[0:4], magic) header[4] = msgType binary.LittleEndian.PutUint32(header[5:9], length) binary.LittleEndian.PutUint64(header[9:17], uint64(timestamp)) - return header } // EncodeAudioConfig encodes basic audio configuration to binary format @@ -179,14 +177,12 @@ func WriteIPCMessage(conn net.Conn, msg IPCMessage, pool *GenericMessagePool, dr defer pool.Put(optMsg) // Prepare header in pre-allocated buffer - header := EncodeMessageHeader(msg.GetMagic(), msg.GetType(), msg.GetLength(), msg.GetTimestamp()) - copy(optMsg.header[:], header) + EncodeMessageHeader(optMsg.header[:], msg.GetMagic(), msg.GetType(), msg.GetLength(), msg.GetTimestamp()) // Set write deadline for timeout handling (more efficient than goroutines) if deadline := time.Now().Add(Config.WriteTimeout); deadline.After(time.Now()) { if err := conn.SetWriteDeadline(deadline); err != nil { // If we can't set deadline, proceed without it - // This maintains compatibility with connections that don't support deadlines _ = err // Explicitly ignore error for linter } } diff --git a/internal/audio/ipc_input.go b/internal/audio/ipc_input.go index 2893051e..bbee28df 100644 --- a/internal/audio/ipc_input.go +++ b/internal/audio/ipc_input.go @@ -27,27 +27,11 @@ var ( messagePoolSize = Config.MessagePoolSize // Pre-allocated message pool size ) -// Legacy aliases for backward compatibility -type InputMessageType = UnifiedMessageType -type InputIPCMessage = UnifiedIPCMessage - -// Legacy constants for backward compatibility -const ( - InputMessageTypeOpusFrame = MessageTypeOpusFrame - InputMessageTypeConfig = MessageTypeConfig - InputMessageTypeOpusConfig = MessageTypeOpusConfig - InputMessageTypeStop = MessageTypeStop - InputMessageTypeHeartbeat = MessageTypeHeartbeat - InputMessageTypeAck = MessageTypeAck -) - -// Methods are now inherited from UnifiedIPCMessage - // OptimizedIPCMessage represents an optimized message with pre-allocated buffers type OptimizedIPCMessage struct { - header [17]byte // Pre-allocated header buffer (headerSize = 17) - data []byte // Reusable data buffer - msg InputIPCMessage // Embedded message + header [17]byte // Pre-allocated header buffer (headerSize = 17) + data []byte // Reusable data buffer + msg UnifiedIPCMessage // Embedded message } // MessagePool manages a pool of reusable messages to reduce allocations @@ -109,7 +93,7 @@ func (mp *MessagePool) Get() *OptimizedIPCMessage { atomic.AddInt64(&mp.hitCount, 1) // Reset message for reuse msg.data = msg.data[:0] - msg.msg = InputIPCMessage{} + msg.msg = UnifiedIPCMessage{} return msg } mp.mutex.Unlock() @@ -120,7 +104,7 @@ func (mp *MessagePool) Get() *OptimizedIPCMessage { atomic.AddInt64(&mp.hitCount, 1) // Reset message for reuse and ensure proper capacity msg.data = msg.data[:0] - msg.msg = InputIPCMessage{} + msg.msg = UnifiedIPCMessage{} // Ensure data buffer has sufficient capacity if cap(msg.data) < maxFrameSize { msg.data = make([]byte, 0, maxFrameSize) @@ -148,7 +132,7 @@ func (mp *MessagePool) Put(msg *OptimizedIPCMessage) { // Reset the message for reuse msg.data = msg.data[:0] - msg.msg = InputIPCMessage{} + msg.msg = UnifiedIPCMessage{} // First try to return to pre-allocated pool for fastest reuse mp.mutex.Lock() @@ -168,10 +152,6 @@ func (mp *MessagePool) Put(msg *OptimizedIPCMessage) { } } -// Legacy aliases for backward compatibility -type InputIPCConfig = UnifiedIPCConfig -type InputIPCOpusConfig = UnifiedIPCOpusConfig - // AudioInputServer handles IPC communication for audio input processing type AudioInputServer struct { // Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment) @@ -186,10 +166,10 @@ type AudioInputServer struct { running bool // Triple-goroutine architecture - messageChan chan *InputIPCMessage // Buffered channel for incoming messages - processChan chan *InputIPCMessage // Buffered channel for processing queue - stopChan chan struct{} // Stop signal for all goroutines - wg sync.WaitGroup // Wait group for goroutine coordination + messageChan chan *UnifiedIPCMessage // Buffered channel for incoming messages + processChan chan *UnifiedIPCMessage // Buffered channel for processing queue + stopChan chan struct{} // Stop signal for all goroutines + wg sync.WaitGroup // Wait group for goroutine coordination // Channel resizing support channelMutex sync.RWMutex // Protects channel recreation @@ -246,8 +226,8 @@ func NewAudioInputServer() (*AudioInputServer, error) { return &AudioInputServer{ listener: listener, - messageChan: make(chan *InputIPCMessage, initialBufferSize), - processChan: make(chan *InputIPCMessage, initialBufferSize), + messageChan: make(chan *UnifiedIPCMessage, initialBufferSize), + processChan: make(chan *UnifiedIPCMessage, initialBufferSize), stopChan: make(chan struct{}), bufferSize: initialBufferSize, lastBufferSize: initialBufferSize, @@ -405,7 +385,7 @@ func (ais *AudioInputServer) handleConnection(conn net.Conn) { // // The function uses pooled buffers for efficient memory management and // ensures all messages conform to the JetKVM audio protocol specification. -func (ais *AudioInputServer) readMessage(conn net.Conn) (*InputIPCMessage, error) { +func (ais *AudioInputServer) readMessage(conn net.Conn) (*UnifiedIPCMessage, error) { // Get optimized message from pool optMsg := globalMessagePool.Get() defer globalMessagePool.Put(optMsg) @@ -419,7 +399,7 @@ func (ais *AudioInputServer) readMessage(conn net.Conn) (*InputIPCMessage, error // Parse header using optimized access msg := &optMsg.msg msg.Magic = binary.LittleEndian.Uint32(optMsg.header[0:4]) - msg.Type = InputMessageType(optMsg.header[4]) + msg.Type = UnifiedMessageType(optMsg.header[4]) msg.Length = binary.LittleEndian.Uint32(optMsg.header[5:9]) msg.Timestamp = int64(binary.LittleEndian.Uint64(optMsg.header[9:17])) @@ -450,7 +430,7 @@ func (ais *AudioInputServer) readMessage(conn net.Conn) (*InputIPCMessage, error } // Return a copy of the message (data will be copied by caller if needed) - result := &InputIPCMessage{ + result := &UnifiedIPCMessage{ Magic: msg.Magic, Type: msg.Type, Length: msg.Length, @@ -467,17 +447,17 @@ func (ais *AudioInputServer) readMessage(conn net.Conn) (*InputIPCMessage, error } // processMessage processes a received message -func (ais *AudioInputServer) processMessage(msg *InputIPCMessage) error { +func (ais *AudioInputServer) processMessage(msg *UnifiedIPCMessage) error { switch msg.Type { - case InputMessageTypeOpusFrame: + case MessageTypeOpusFrame: return ais.processOpusFrame(msg.Data) - case InputMessageTypeConfig: + case MessageTypeConfig: return ais.processConfig(msg.Data) - case InputMessageTypeOpusConfig: + case MessageTypeOpusConfig: return ais.processOpusConfig(msg.Data) - case InputMessageTypeStop: + case MessageTypeStop: return fmt.Errorf("stop message received") - case InputMessageTypeHeartbeat: + case MessageTypeHeartbeat: return ais.sendAck() default: return fmt.Errorf("unknown message type: %d", msg.Type) @@ -538,7 +518,7 @@ func (ais *AudioInputServer) processOpusConfig(data []byte) error { } // Deserialize Opus configuration - config := InputIPCOpusConfig{ + config := UnifiedIPCOpusConfig{ SampleRate: int(binary.LittleEndian.Uint32(data[0:4])), Channels: int(binary.LittleEndian.Uint32(data[4:8])), FrameSize: int(binary.LittleEndian.Uint32(data[8:12])), @@ -581,9 +561,9 @@ func (ais *AudioInputServer) sendAck() error { return fmt.Errorf("no connection") } - msg := &InputIPCMessage{ + msg := &UnifiedIPCMessage{ Magic: inputMagicNumber, - Type: InputMessageTypeAck, + Type: MessageTypeAck, Length: 0, Timestamp: time.Now().UnixNano(), } @@ -595,7 +575,7 @@ func (ais *AudioInputServer) sendAck() error { var globalInputServerMessagePool = NewGenericMessagePool(messagePoolSize) // writeMessage writes a message to the connection using shared common utilities -func (ais *AudioInputServer) writeMessage(conn net.Conn, msg *InputIPCMessage) error { +func (ais *AudioInputServer) writeMessage(conn net.Conn, msg *UnifiedIPCMessage) error { // Use shared WriteIPCMessage function with global message pool return WriteIPCMessage(conn, msg, globalInputServerMessagePool, &ais.droppedFrames) } @@ -673,9 +653,9 @@ func (aic *AudioInputClient) Disconnect() { if aic.conn != nil { // Send stop message - msg := &InputIPCMessage{ + msg := &UnifiedIPCMessage{ Magic: inputMagicNumber, - Type: InputMessageTypeStop, + Type: MessageTypeStop, Length: 0, Timestamp: time.Now().UnixNano(), } @@ -700,9 +680,9 @@ func (aic *AudioInputClient) SendFrame(frame []byte) error { } // Direct message creation without timestamp overhead - msg := &InputIPCMessage{ + msg := &UnifiedIPCMessage{ Magic: inputMagicNumber, - Type: InputMessageTypeOpusFrame, + Type: MessageTypeOpusFrame, Length: uint32(len(frame)), Data: frame, } @@ -736,9 +716,9 @@ func (aic *AudioInputClient) SendFrameZeroCopy(frame *ZeroCopyAudioFrame) error } // Use zero-copy data directly - msg := &InputIPCMessage{ + msg := &UnifiedIPCMessage{ Magic: inputMagicNumber, - Type: InputMessageTypeOpusFrame, + Type: MessageTypeOpusFrame, Length: uint32(frameLen), Timestamp: time.Now().UnixNano(), Data: frame.Data(), // Zero-copy data access @@ -748,7 +728,7 @@ func (aic *AudioInputClient) SendFrameZeroCopy(frame *ZeroCopyAudioFrame) error } // SendConfig sends a configuration update to the audio input server -func (aic *AudioInputClient) SendConfig(config InputIPCConfig) error { +func (aic *AudioInputClient) SendConfig(config UnifiedIPCConfig) error { aic.mtx.Lock() defer aic.mtx.Unlock() @@ -766,9 +746,9 @@ func (aic *AudioInputClient) SendConfig(config InputIPCConfig) error { // Serialize config using common function data := EncodeAudioConfig(config.SampleRate, config.Channels, config.FrameSize) - msg := &InputIPCMessage{ + msg := &UnifiedIPCMessage{ Magic: inputMagicNumber, - Type: InputMessageTypeConfig, + Type: MessageTypeConfig, Length: uint32(len(data)), Timestamp: time.Now().UnixNano(), Data: data, @@ -778,7 +758,7 @@ func (aic *AudioInputClient) SendConfig(config InputIPCConfig) error { } // SendOpusConfig sends a complete Opus encoder configuration update to the audio input server -func (aic *AudioInputClient) SendOpusConfig(config InputIPCOpusConfig) error { +func (aic *AudioInputClient) SendOpusConfig(config UnifiedIPCOpusConfig) error { aic.mtx.Lock() defer aic.mtx.Unlock() @@ -795,9 +775,9 @@ func (aic *AudioInputClient) SendOpusConfig(config InputIPCOpusConfig) error { // Serialize Opus configuration using common function data := EncodeOpusConfig(config.SampleRate, config.Channels, config.FrameSize, config.Bitrate, config.Complexity, config.VBR, config.SignalType, config.Bandwidth, config.DTX) - msg := &InputIPCMessage{ + msg := &UnifiedIPCMessage{ Magic: inputMagicNumber, - Type: InputMessageTypeOpusConfig, + Type: MessageTypeOpusConfig, Length: uint32(len(data)), Timestamp: time.Now().UnixNano(), Data: data, @@ -815,9 +795,9 @@ func (aic *AudioInputClient) SendHeartbeat() error { return fmt.Errorf("not connected to audio input server") } - msg := &InputIPCMessage{ + msg := &UnifiedIPCMessage{ Magic: inputMagicNumber, - Type: InputMessageTypeHeartbeat, + Type: MessageTypeHeartbeat, Length: 0, Timestamp: time.Now().UnixNano(), } @@ -829,7 +809,7 @@ func (aic *AudioInputClient) SendHeartbeat() error { // Global shared message pool for input IPC clients var globalInputMessagePool = NewGenericMessagePool(messagePoolSize) -func (aic *AudioInputClient) writeMessage(msg *InputIPCMessage) error { +func (aic *AudioInputClient) writeMessage(msg *UnifiedIPCMessage) error { // Increment total frames counter atomic.AddInt64(&aic.totalFrames, 1) @@ -1093,9 +1073,9 @@ func (ais *AudioInputServer) startProcessorGoroutine() { } // processMessageWithRecovery processes a message with enhanced error recovery -func (ais *AudioInputServer) processMessageWithRecovery(msg *InputIPCMessage, logger zerolog.Logger) error { +func (ais *AudioInputServer) processMessageWithRecovery(msg *UnifiedIPCMessage, logger zerolog.Logger) error { // Intelligent frame dropping: prioritize recent frames - if msg.Type == InputMessageTypeOpusFrame { + if msg.Type == MessageTypeOpusFrame { // Check if processing queue is getting full processChan := ais.getProcessChan() queueLen := len(processChan) @@ -1172,7 +1152,7 @@ func (ais *AudioInputServer) startMonitorGoroutine() { // Calculate end-to-end latency using message timestamp var latency time.Duration - if msg.Type == InputMessageTypeOpusFrame && msg.Timestamp > 0 { + if msg.Type == MessageTypeOpusFrame && msg.Timestamp > 0 { msgTime := time.Unix(0, msg.Timestamp) latency = time.Since(msgTime) // Use exponential moving average for end-to-end latency tracking @@ -1291,14 +1271,14 @@ func GetGlobalMessagePoolStats() MessagePoolStats { } // getMessageChan safely returns the current message channel -func (ais *AudioInputServer) getMessageChan() chan *InputIPCMessage { +func (ais *AudioInputServer) getMessageChan() chan *UnifiedIPCMessage { ais.channelMutex.RLock() defer ais.channelMutex.RUnlock() return ais.messageChan } // getProcessChan safely returns the current process channel -func (ais *AudioInputServer) getProcessChan() chan *InputIPCMessage { +func (ais *AudioInputServer) getProcessChan() chan *UnifiedIPCMessage { ais.channelMutex.RLock() defer ais.channelMutex.RUnlock() return ais.processChan diff --git a/internal/audio/ipc_output.go b/internal/audio/ipc_output.go index 081332d4..fbe2bcb5 100644 --- a/internal/audio/ipc_output.go +++ b/internal/audio/ipc_output.go @@ -13,24 +13,6 @@ import ( "github.com/rs/zerolog" ) -// Legacy aliases for backward compatibility -type OutputIPCConfig = UnifiedIPCConfig -type OutputIPCOpusConfig = UnifiedIPCOpusConfig -type OutputMessageType = UnifiedMessageType -type OutputIPCMessage = UnifiedIPCMessage - -// Legacy constants for backward compatibility -const ( - OutputMessageTypeOpusFrame = MessageTypeOpusFrame - OutputMessageTypeConfig = MessageTypeConfig - OutputMessageTypeOpusConfig = MessageTypeOpusConfig - OutputMessageTypeStop = MessageTypeStop - OutputMessageTypeHeartbeat = MessageTypeHeartbeat - OutputMessageTypeAck = MessageTypeAck -) - -// Methods are now inherited from UnifiedIPCMessage - // Global shared message pool for output IPC client header reading var globalOutputClientMessagePool = NewGenericMessagePool(Config.OutputMessagePoolSize) @@ -48,9 +30,9 @@ type AudioOutputServer struct { logger zerolog.Logger // Message channels - messageChan chan *OutputIPCMessage // Buffered channel for incoming messages - processChan chan *OutputIPCMessage // Buffered channel for processing queue - wg sync.WaitGroup // Wait group for goroutine coordination + messageChan chan *UnifiedIPCMessage // Buffered channel for incoming messages + processChan chan *UnifiedIPCMessage // Buffered channel for processing queue + wg sync.WaitGroup // Wait group for goroutine coordination // Configuration socketPath string @@ -65,8 +47,8 @@ func NewAudioOutputServer() (*AudioOutputServer, error) { socketPath: socketPath, magicNumber: Config.OutputMagicNumber, logger: logger, - messageChan: make(chan *OutputIPCMessage, Config.ChannelBufferSize), - processChan: make(chan *OutputIPCMessage, Config.ChannelBufferSize), + messageChan: make(chan *UnifiedIPCMessage, Config.ChannelBufferSize), + processChan: make(chan *UnifiedIPCMessage, Config.ChannelBufferSize), } return server, nil @@ -112,6 +94,7 @@ func (s *AudioOutputServer) Stop() { if s.listener != nil { s.listener.Close() + s.listener = nil } if s.conn != nil { @@ -171,7 +154,7 @@ func (s *AudioOutputServer) handleConnection(conn net.Conn) { } // readMessage reads a message from the connection -func (s *AudioOutputServer) readMessage(conn net.Conn) (*OutputIPCMessage, error) { +func (s *AudioOutputServer) readMessage(conn net.Conn) (*UnifiedIPCMessage, error) { header := make([]byte, 17) if _, err := io.ReadFull(conn, header); err != nil { return nil, fmt.Errorf("failed to read header: %w", err) @@ -182,7 +165,7 @@ func (s *AudioOutputServer) readMessage(conn net.Conn) (*OutputIPCMessage, error return nil, fmt.Errorf("invalid magic number: expected %d, got %d", s.magicNumber, magic) } - msgType := OutputMessageType(header[4]) + msgType := UnifiedMessageType(header[4]) length := binary.LittleEndian.Uint32(header[5:9]) timestamp := int64(binary.LittleEndian.Uint64(header[9:17])) @@ -194,7 +177,7 @@ func (s *AudioOutputServer) readMessage(conn net.Conn) (*OutputIPCMessage, error } } - return &OutputIPCMessage{ + return &UnifiedIPCMessage{ Magic: magic, Type: msgType, Length: length, @@ -204,14 +187,14 @@ func (s *AudioOutputServer) readMessage(conn net.Conn) (*OutputIPCMessage, error } // processMessage processes a received message -func (s *AudioOutputServer) processMessage(msg *OutputIPCMessage) error { +func (s *AudioOutputServer) processMessage(msg *UnifiedIPCMessage) error { switch msg.Type { - case OutputMessageTypeOpusConfig: + case MessageTypeOpusConfig: return s.processOpusConfig(msg.Data) - case OutputMessageTypeStop: + case MessageTypeStop: s.logger.Info().Msg("Received stop message") return nil - case OutputMessageTypeHeartbeat: + case MessageTypeHeartbeat: s.logger.Debug().Msg("Received heartbeat") return nil default: @@ -228,7 +211,7 @@ func (s *AudioOutputServer) processOpusConfig(data []byte) error { } // Decode Opus configuration - config := OutputIPCOpusConfig{ + config := UnifiedIPCOpusConfig{ SampleRate: int(binary.LittleEndian.Uint32(data[0:4])), Channels: int(binary.LittleEndian.Uint32(data[4:8])), FrameSize: int(binary.LittleEndian.Uint32(data[8:12])), @@ -282,9 +265,9 @@ func (s *AudioOutputServer) SendFrame(frame []byte) error { return fmt.Errorf("no client connected") } - msg := &OutputIPCMessage{ + msg := &UnifiedIPCMessage{ Magic: s.magicNumber, - Type: OutputMessageTypeOpusFrame, + Type: MessageTypeOpusFrame, Length: uint32(len(frame)), Timestamp: time.Now().UnixNano(), Data: frame, @@ -294,8 +277,9 @@ func (s *AudioOutputServer) SendFrame(frame []byte) error { } // writeMessage writes a message to the connection -func (s *AudioOutputServer) writeMessage(conn net.Conn, msg *OutputIPCMessage) error { - header := EncodeMessageHeader(msg.Magic, uint8(msg.Type), msg.Length, msg.Timestamp) +func (s *AudioOutputServer) writeMessage(conn net.Conn, msg *UnifiedIPCMessage) error { + header := make([]byte, 17) + EncodeMessageHeader(header, msg.Magic, uint8(msg.Type), msg.Length, msg.Timestamp) if _, err := conn.Write(header); err != nil { return fmt.Errorf("failed to write header: %w", err) @@ -415,8 +399,8 @@ func (c *AudioOutputClient) ReceiveFrame() ([]byte, error) { return nil, fmt.Errorf("invalid magic number in IPC message: got 0x%x, expected 0x%x", magic, outputMagicNumber) } - msgType := OutputMessageType(optMsg.header[4]) - if msgType != OutputMessageTypeOpusFrame { + msgType := UnifiedMessageType(optMsg.header[4]) + if msgType != MessageTypeOpusFrame { return nil, fmt.Errorf("unexpected message type: %d", msgType) } @@ -443,7 +427,7 @@ func (c *AudioOutputClient) ReceiveFrame() ([]byte, error) { } // SendOpusConfig sends Opus configuration to the audio output server -func (c *AudioOutputClient) SendOpusConfig(config OutputIPCOpusConfig) error { +func (c *AudioOutputClient) SendOpusConfig(config UnifiedIPCOpusConfig) error { c.mtx.Lock() defer c.mtx.Unlock() @@ -460,9 +444,9 @@ func (c *AudioOutputClient) SendOpusConfig(config OutputIPCOpusConfig) error { // Serialize Opus configuration using common function data := EncodeOpusConfig(config.SampleRate, config.Channels, config.FrameSize, config.Bitrate, config.Complexity, config.VBR, config.SignalType, config.Bandwidth, config.DTX) - msg := &OutputIPCMessage{ + msg := &UnifiedIPCMessage{ Magic: c.magicNumber, - Type: OutputMessageTypeOpusConfig, + Type: MessageTypeOpusConfig, Length: uint32(len(data)), Timestamp: time.Now().UnixNano(), Data: data, @@ -472,8 +456,9 @@ func (c *AudioOutputClient) SendOpusConfig(config OutputIPCOpusConfig) error { } // writeMessage writes a message to the connection -func (c *AudioOutputClient) writeMessage(msg *OutputIPCMessage) error { - header := EncodeMessageHeader(msg.Magic, uint8(msg.Type), msg.Length, msg.Timestamp) +func (c *AudioOutputClient) writeMessage(msg *UnifiedIPCMessage) error { + header := make([]byte, 17) + EncodeMessageHeader(header, msg.Magic, uint8(msg.Type), msg.Length, msg.Timestamp) if _, err := c.conn.Write(header); err != nil { return fmt.Errorf("failed to write header: %w", err) diff --git a/internal/audio/ipc_unified.go b/internal/audio/ipc_unified.go index 79315560..9024863b 100644 --- a/internal/audio/ipc_unified.go +++ b/internal/audio/ipc_unified.go @@ -389,7 +389,8 @@ func (s *UnifiedAudioServer) SendFrame(frame []byte) error { // writeMessage writes a message to the connection func (s *UnifiedAudioServer) writeMessage(conn net.Conn, msg *UnifiedIPCMessage) error { - header := EncodeMessageHeader(msg.Magic, uint8(msg.Type), msg.Length, msg.Timestamp) + header := make([]byte, 17) + EncodeMessageHeader(header, msg.Magic, uint8(msg.Type), msg.Length, msg.Timestamp) // Optimize: Use single write for header+data to reduce system calls if msg.Length > 0 && msg.Data != nil { diff --git a/internal/audio/mgmt_input_ipc_manager.go b/internal/audio/mgmt_input_ipc_manager.go index b81e42e2..acfdd89c 100644 --- a/internal/audio/mgmt_input_ipc_manager.go +++ b/internal/audio/mgmt_input_ipc_manager.go @@ -62,7 +62,7 @@ func (aim *AudioInputIPCManager) Start() error { return err } - config := InputIPCConfig{ + config := UnifiedIPCConfig{ SampleRate: Config.InputIPCSampleRate, Channels: Config.InputIPCChannels, FrameSize: Config.InputIPCFrameSize, @@ -72,7 +72,7 @@ func (aim *AudioInputIPCManager) Start() error { if err := ValidateInputIPCConfig(config.SampleRate, config.Channels, config.FrameSize); err != nil { aim.logger.Warn().Err(err).Msg("invalid input IPC config from constants, using defaults") // Use safe defaults if config validation fails - config = InputIPCConfig{ + config = UnifiedIPCConfig{ SampleRate: 48000, Channels: 2, FrameSize: 960, diff --git a/internal/audio/mgmt_output_ipc_manager.go b/internal/audio/mgmt_output_ipc_manager.go index 145c1c20..285c10df 100644 --- a/internal/audio/mgmt_output_ipc_manager.go +++ b/internal/audio/mgmt_output_ipc_manager.go @@ -56,7 +56,7 @@ func (aom *AudioOutputIPCManager) Start() error { aom.logComponentStarted(AudioOutputIPCComponent) // Send initial configuration - config := OutputIPCConfig{ + config := UnifiedIPCConfig{ SampleRate: Config.SampleRate, Channels: Config.Channels, FrameSize: int(Config.AudioQualityMediumFrameSize.Milliseconds()), @@ -202,7 +202,7 @@ func (aom *AudioOutputIPCManager) calculateFrameRate() float64 { } // SendConfig sends configuration to the IPC server -func (aom *AudioOutputIPCManager) SendConfig(config OutputIPCConfig) error { +func (aom *AudioOutputIPCManager) SendConfig(config UnifiedIPCConfig) error { if aom.server == nil { return fmt.Errorf("audio output server not initialized") } diff --git a/internal/audio/output_supervisor.go b/internal/audio/output_supervisor.go index 46267620..9da939e5 100644 --- a/internal/audio/output_supervisor.go +++ b/internal/audio/output_supervisor.go @@ -318,7 +318,7 @@ func (s *AudioOutputSupervisor) connectClient() { } // SendOpusConfig sends Opus configuration to the audio output subprocess -func (s *AudioOutputSupervisor) SendOpusConfig(config OutputIPCOpusConfig) error { +func (aos *AudioOutputSupervisor) SendOpusConfig(config UnifiedIPCOpusConfig) error { if outputClient == nil { return fmt.Errorf("client not initialized") } diff --git a/internal/audio/quality_presets.go b/internal/audio/quality_presets.go index 0b495413..60d8a994 100644 --- a/internal/audio/quality_presets.go +++ b/internal/audio/quality_presets.go @@ -214,8 +214,8 @@ func SetAudioQuality(quality AudioQuality) { // Send dynamic configuration update to running subprocess via IPC if supervisor.IsConnected() { - // Convert AudioConfig to OutputIPCOpusConfig with complete Opus parameters - opusConfig := OutputIPCOpusConfig{ + // Convert AudioConfig to UnifiedIPCOpusConfig with complete Opus parameters + opusConfig := UnifiedIPCOpusConfig{ SampleRate: config.SampleRate, Channels: config.Channels, FrameSize: int(config.FrameSize.Milliseconds() * int64(config.SampleRate) / 1000), // Convert ms to samples @@ -311,8 +311,8 @@ func SetMicrophoneQuality(quality AudioQuality) { // Send dynamic configuration update to running subprocess via IPC if supervisor.IsConnected() { - // Convert AudioConfig to InputIPCOpusConfig with complete Opus parameters - opusConfig := InputIPCOpusConfig{ + // Convert AudioConfig to UnifiedIPCOpusConfig with complete Opus parameters + opusConfig := UnifiedIPCOpusConfig{ SampleRate: config.SampleRate, Channels: config.Channels, FrameSize: int(config.FrameSize.Milliseconds() * int64(config.SampleRate) / 1000), // Convert ms to samples