From 3c6184d0e8920e2306ecb89d5f3e55531cefe477 Mon Sep 17 00:00:00 2001
From: Alex P
Date: Tue, 9 Sep 2025 07:44:37 +0000
Subject: [PATCH] [Milestone] Improvement: In-flight audio output quality update

---
 internal/audio/cgo_audio.go         |   8 +-
 internal/audio/ipc_output.go        | 194 ++++++++++++++++++++++++++--
 internal/audio/output_supervisor.go |  46 +++++++
 internal/audio/quality_presets.go   |  61 +++++----
 4 files changed, 272 insertions(+), 37 deletions(-)

diff --git a/internal/audio/cgo_audio.go b/internal/audio/cgo_audio.go
index 2e6fd45d..1590184a 100644
--- a/internal/audio/cgo_audio.go
+++ b/internal/audio/cgo_audio.go
@@ -87,10 +87,10 @@ static volatile int playback_initialized = 0;
 
 // Function to dynamically update Opus encoder parameters
 int update_opus_encoder_params(int bitrate, int complexity, int vbr, int vbr_constraint, int signal_type, int bandwidth, int dtx) {
-    // This function is specifically for audio OUTPUT encoder parameters
-    // Only require playback initialization for audio output quality changes
-    if (!encoder || !playback_initialized) {
-        return -1; // Audio output encoder not initialized
+    // This function works for both audio input and output encoder parameters
+    // Require either capture (output) or playback (input) initialization
+    if (!encoder || (!capture_initialized && !playback_initialized)) {
+        return -1; // Audio encoder not initialized
     }
 
     // Update the static variables
diff --git a/internal/audio/ipc_output.go b/internal/audio/ipc_output.go
index 02ed33e3..081332d4 100644
--- a/internal/audio/ipc_output.go
+++ b/internal/audio/ipc_output.go
@@ -15,16 +15,18 @@ import (
 
 // Legacy aliases for backward compatibility
 type OutputIPCConfig = UnifiedIPCConfig
+type OutputIPCOpusConfig = UnifiedIPCOpusConfig
 type OutputMessageType = UnifiedMessageType
 type OutputIPCMessage = UnifiedIPCMessage
 
 // Legacy constants for backward compatibility
 const (
-	OutputMessageTypeOpusFrame = MessageTypeOpusFrame
-	OutputMessageTypeConfig    = MessageTypeConfig
-	OutputMessageTypeStop      = MessageTypeStop
-	OutputMessageTypeHeartbeat = MessageTypeHeartbeat
-	OutputMessageTypeAck       = MessageTypeAck
+	OutputMessageTypeOpusFrame  = MessageTypeOpusFrame
+	OutputMessageTypeConfig     = MessageTypeConfig
+	OutputMessageTypeOpusConfig = MessageTypeOpusConfig
+	OutputMessageTypeStop       = MessageTypeStop
+	OutputMessageTypeHeartbeat  = MessageTypeHeartbeat
+	OutputMessageTypeAck        = MessageTypeAck
 )
 
 // Methods are now inherited from UnifiedIPCMessage
@@ -142,14 +144,141 @@ func (s *AudioOutputServer) acceptConnections() {
 		s.mtx.Unlock()
 
 		s.logger.Info().Msg("Client connected to audio output server")
-		// Only handle one connection at a time for simplicity
-		for s.running && s.conn != nil {
-			// Keep connection alive until stopped or disconnected
-			time.Sleep(100 * time.Millisecond)
+		// Hand the connection to its own reader goroutine (tracked by s.wg) and keep accepting
+		s.wg.Add(1)
+		go s.handleConnection(conn)
+	}
+}
+
+// handleConnection reads and dispatches client messages until the server stops or a read fails; it closes conn on exit.
+func (s *AudioOutputServer) handleConnection(conn net.Conn) {
+	defer s.wg.Done()
+	defer conn.Close()
+
+	for s.running {
+		msg, err := s.readMessage(conn)
+		if err != nil {
+			if s.running {
+				s.logger.Error().Err(err).Msg("Failed to read message from client")
+			}
+			return
+		}
+
+		if err := s.processMessage(msg); err != nil {
+			s.logger.Error().Err(err).Msg("Failed to process message")
 		}
 	}
 }
 
+// readMessage reads one framed message: a 17-byte little-endian header (magic, type, length, timestamp) followed by length payload bytes.
+func (s *AudioOutputServer) readMessage(conn net.Conn) (*OutputIPCMessage, error) {
+	header := make([]byte, 17)
+	if _, err := io.ReadFull(conn, header); err != nil {
+		return nil, fmt.Errorf("failed to read header: %w", err)
+	}
+
+	magic := binary.LittleEndian.Uint32(header[0:4])
+	if magic != s.magicNumber {
+		return nil, fmt.Errorf("invalid magic number: expected %d, got %d", s.magicNumber, magic)
+	}
+
+	msgType := OutputMessageType(header[4])
+	length := binary.LittleEndian.Uint32(header[5:9])
+	timestamp := int64(binary.LittleEndian.Uint64(header[9:17]))
+
+	// Bound the payload before allocating: length comes straight off the wire,
+	// so a corrupt header could otherwise request an allocation of up to 4 GiB.
+	const maxPayload = 64 * 1024
+	if length > maxPayload {
+		return nil, fmt.Errorf("message payload too large: %d bytes (max %d)", length, maxPayload)
+	}
+
+	var data []byte
+	if length > 0 {
+		data = make([]byte, length)
+		if _, err := io.ReadFull(conn, data); err != nil {
+			return nil, fmt.Errorf("failed to read data: %w", err)
+		}
+	}
+
+	return &OutputIPCMessage{
+		Magic:     magic,
+		Type:      msgType,
+		Length:    length,
+		Timestamp: timestamp,
+		Data:      data,
+	}, nil
+}
+
+// processMessage dispatches a received message by type; only Opus config updates act, stop/heartbeat/unknown types are just logged.
+func (s *AudioOutputServer) processMessage(msg *OutputIPCMessage) error {
+	switch msg.Type {
+	case OutputMessageTypeOpusConfig:
+		return s.processOpusConfig(msg.Data)
+	case OutputMessageTypeStop:
+		s.logger.Info().Msg("Received stop message")
+		return nil
+	case OutputMessageTypeHeartbeat:
+		s.logger.Debug().Msg("Received heartbeat")
+		return nil
+	default:
+		s.logger.Warn().Int("type", int(msg.Type)).Msg("Unknown message type")
+		return nil
+	}
+}
+
+// processOpusConfig decodes a 36-byte Opus config payload and applies it to the encoder; it returns nil without applying outside the audio server subprocess or while output streaming is stopped.
+func (s *AudioOutputServer) processOpusConfig(data []byte) error {
+	// Validate configuration data size (9 * int32 = 36 bytes)
+	if len(data) != 36 {
+		return fmt.Errorf("invalid Opus configuration data size: expected 36 bytes, got %d", len(data))
+	}
+
+	// Decode each field as a signed int32 so negative values survive on platforms where int is 64-bit
+	config := OutputIPCOpusConfig{
+		SampleRate: int(int32(binary.LittleEndian.Uint32(data[0:4]))),
+		Channels:   int(int32(binary.LittleEndian.Uint32(data[4:8]))),
+		FrameSize:  int(int32(binary.LittleEndian.Uint32(data[8:12]))),
+		Bitrate:    int(int32(binary.LittleEndian.Uint32(data[12:16]))),
+		Complexity: int(int32(binary.LittleEndian.Uint32(data[16:20]))),
+		VBR:        int(int32(binary.LittleEndian.Uint32(data[20:24]))),
+		SignalType: int(int32(binary.LittleEndian.Uint32(data[24:28]))),
+		Bandwidth:  int(int32(binary.LittleEndian.Uint32(data[28:32]))),
+		DTX:        int(int32(binary.LittleEndian.Uint32(data[32:36]))),
+	}
+
+	s.logger.Info().Interface("config", config).Msg("Received Opus configuration update")
+
+	// Ensure we're running in the audio server subprocess
+	if !isAudioServerProcess() {
+		s.logger.Warn().Msg("Opus configuration update ignored - not running in audio server subprocess")
+		return nil
+	}
+
+	// Check if audio output streaming is currently active
+	if atomic.LoadInt32(&outputStreamingRunning) == 0 {
+		s.logger.Info().Msg("Audio output streaming not active, configuration will be applied when streaming starts")
+		return nil
+	}
+
+	// Ensure the audio pipeline is initialized before updating encoder parameters
+	// update_opus_encoder_params needs the encoder plus capture or playback initialized
+	if err := cgoAudioInit(); err != nil {
+		s.logger.Debug().Err(err).Msg("Audio capture already initialized or initialization failed")
+		// Continue anyway - capture may already be initialized
+	}
+
+	// Apply configuration using CGO function (only if audio system is running)
+	vbrConstraint := Config.CGOOpusVBRConstraint
+	if err := updateOpusEncoderParams(config.Bitrate, config.Complexity, config.VBR, vbrConstraint, config.SignalType, config.Bandwidth, config.DTX); err != nil {
+		s.logger.Error().Err(err).Msg("Failed to update Opus encoder parameters - encoder may not be initialized")
+		return err
+	}
+
+	s.logger.Info().Msg("Opus encoder parameters updated successfully")
+	return nil
+}
+
 // SendFrame sends an audio frame to the client
 func (s *AudioOutputServer) SendFrame(frame []byte) error {
 	s.mtx.Lock()
@@ -320,6 +449,53 @@ func (c *AudioOutputClient) ReceiveFrame() ([]byte, error) {
 	return frame, nil
 }
 
+// SendOpusConfig validates config and sends it to the audio output server as an OpusConfig message; it fails when the client is not connected.
+func (c *AudioOutputClient) SendOpusConfig(config OutputIPCOpusConfig) error {
+	c.mtx.Lock()
+	defer c.mtx.Unlock()
+
+	if !c.running || c.conn == nil {
+		return fmt.Errorf("not connected to audio output server")
+	}
+
+	// Validate configuration parameters
+	if config.SampleRate <= 0 || config.Channels <= 0 || config.FrameSize <= 0 || config.Bitrate <= 0 {
+		return fmt.Errorf("invalid Opus configuration: SampleRate=%d, Channels=%d, FrameSize=%d, Bitrate=%d",
+			config.SampleRate, config.Channels, config.FrameSize, config.Bitrate)
+	}
+
+	// Serialize Opus configuration using common function
+	data := EncodeOpusConfig(config.SampleRate, config.Channels, config.FrameSize, config.Bitrate, config.Complexity, config.VBR, config.SignalType, config.Bandwidth, config.DTX)
+
+	msg := &OutputIPCMessage{
+		Magic:     c.magicNumber,
+		Type:      OutputMessageTypeOpusConfig,
+		Length:    uint32(len(data)),
+		Timestamp: time.Now().UnixNano(),
+		Data:      data,
+	}
+
+	return c.writeMessage(msg)
+}
+
+// writeMessage writes msg's header and payload to c.conn; it takes no lock itself (SendOpusConfig calls it with c.mtx held). NOTE(review): it also bumps totalFrames, so control messages count in frame stats - confirm intended.
+func (c *AudioOutputClient) writeMessage(msg *OutputIPCMessage) error {
+	header := EncodeMessageHeader(msg.Magic, uint8(msg.Type), msg.Length, msg.Timestamp)
+
+	if _, err := c.conn.Write(header); err != nil {
+		return fmt.Errorf("failed to write header: %w", err)
+	}
+
+	if msg.Length > 0 && msg.Data != nil {
+		if _, err := c.conn.Write(msg.Data); err != nil {
+			return fmt.Errorf("failed to write data: %w", err)
+		}
+	}
+
+	atomic.AddInt64(&c.totalFrames, 1)
+	return nil
+}
+
 // GetClientStats returns client performance statistics
 func (c *AudioOutputClient) GetClientStats() (total, dropped int64) {
 	stats := GetFrameStats(&c.totalFrames, &c.droppedFrames)
diff --git a/internal/audio/output_supervisor.go b/internal/audio/output_supervisor.go
index b506cab7..74611da4 100644
--- a/internal/audio/output_supervisor.go
+++ b/internal/audio/output_supervisor.go
@@ -125,6 +125,12 @@ func (s *AudioOutputSupervisor) Start() error {
 	// Start the supervision loop
 	go s.supervisionLoop()
 
+	// Establish IPC connection to subprocess after a brief delay (single attempt; a failure is only logged by connectClient)
+	go func() {
+		time.Sleep(500 * time.Millisecond) // Wait for subprocess to start
+		s.connectClient()
+	}()
+
 	s.logger.Info().Str("component", AudioOutputSupervisorComponent).Msg("component started successfully")
 	return nil
 }
@@ -274,3 +280,43 @@ func (s *AudioOutputSupervisor) calculateRestartDelay() time.Duration {
 
 	return delay
 }
+
+// outputClient is the package-level IPC client used to reach the subprocess. NOTE(review): it is read and written without synchronization - confirm callers serialize access.
+var outputClient *AudioOutputClient
+
+// IsConnected returns whether the supervisor has an active connection to the subprocess
+func (s *AudioOutputSupervisor) IsConnected() bool {
+	return outputClient != nil && outputClient.IsConnected()
+}
+
+// GetClient returns the IPC client for the subprocess (nil until connectClient has run)
+func (s *AudioOutputSupervisor) GetClient() *AudioOutputClient {
+	return outputClient
+}
+
+// connectClient lazily creates the shared IPC client and makes one connection attempt, logging the outcome
+func (s *AudioOutputSupervisor) connectClient() {
+	if outputClient == nil {
+		outputClient = NewAudioOutputClient()
+	}
+
+	// Try to connect to the subprocess
+	if err := outputClient.Connect(); err != nil {
+		s.logger.Warn().Err(err).Msg("Failed to connect to audio output subprocess")
+	} else {
+		s.logger.Info().Msg("Connected to audio output subprocess")
+	}
+}
+
+// SendOpusConfig sends Opus configuration to the audio output subprocess; it errors if the client is missing or disconnected
+func (s *AudioOutputSupervisor) SendOpusConfig(config OutputIPCOpusConfig) error {
+	if outputClient == nil {
+		return fmt.Errorf("client not initialized")
+	}
+
+	if !outputClient.IsConnected() {
+		return fmt.Errorf("client not connected")
+	}
+
+	return outputClient.SendOpusConfig(config)
+}
diff --git a/internal/audio/quality_presets.go b/internal/audio/quality_presets.go
index bedfa104..b41bad56 100644
--- a/internal/audio/quality_presets.go
+++ b/internal/audio/quality_presets.go
@@ -211,33 +211,46 @@ func SetAudioQuality(quality AudioQuality) {
 		// Set new OPUS configuration for future restarts
 		if supervisor := GetAudioOutputSupervisor(); supervisor != nil {
 			supervisor.SetOpusConfig(config.Bitrate*1000, complexity, vbr, signalType, bandwidth, dtx)
-		}
 
-		// Send dynamic configuration update to running audio output
-		vbrConstraint := Config.CGOOpusVBRConstraint
-		if err := updateOpusEncoderParams(config.Bitrate*1000, complexity, vbr, vbrConstraint, signalType, bandwidth, dtx); err != nil {
-			logger.Warn().Err(err).Msg("failed to update OPUS encoder parameters dynamically")
-			// Fallback to subprocess restart if dynamic update fails
-			if supervisor := GetAudioOutputSupervisor(); supervisor != nil {
-				logger.Info().Msg("falling back to subprocess restart")
-				supervisor.Stop()
-				if err := supervisor.Start(); err != nil {
-					logger.Error().Err(err).Msg("failed to restart audio output subprocess after dynamic update failure")
+			// Send dynamic configuration update to running subprocess via IPC
+			if supervisor.IsConnected() {
+				// Convert AudioConfig to OutputIPCOpusConfig with complete Opus parameters
+				opusConfig := OutputIPCOpusConfig{
+					SampleRate: config.SampleRate,
+					Channels:   config.Channels,
+					FrameSize:  int(config.FrameSize.Microseconds() * int64(config.SampleRate) / 1000000), // Convert duration to samples; microsecond precision keeps 2.5 ms frames exact
+					Bitrate:    config.Bitrate * 1000, // Convert kbps to bps
+					Complexity: complexity,
+					VBR:        vbr,
+					SignalType: signalType,
+					Bandwidth:  bandwidth,
+					DTX:        dtx,
 				}
-			}
-		} else {
-			logger.Info().Msg("audio output quality updated dynamically")
 
-			// Reset audio output stats after config update
-			// Allow adaptive buffer manager to naturally adjust buffer sizes
-			go func() {
-				time.Sleep(Config.QualityChangeSettleDelay) // Wait for quality change to settle
-				// Reset audio input server stats to clear persistent warnings
-				ResetGlobalAudioInputServerStats()
-				// Attempt recovery if there are still issues
-				time.Sleep(1 * time.Second)
-				RecoverGlobalAudioInputServer()
-			}()
+				logger.Info().Interface("opusConfig", opusConfig).Msg("sending Opus configuration to audio output subprocess")
+				if err := supervisor.SendOpusConfig(opusConfig); err != nil {
+					logger.Warn().Err(err).Msg("failed to send dynamic Opus config update via IPC, falling back to subprocess restart")
+					// Fallback to subprocess restart if IPC update fails
+					supervisor.Stop()
+					if err := supervisor.Start(); err != nil {
+						logger.Error().Err(err).Msg("failed to restart audio output subprocess after IPC update failure")
+					}
+				} else {
+					logger.Info().Msg("audio output quality updated dynamically via IPC")
+
+					// Reset audio output stats after config update
+					go func() {
+						time.Sleep(Config.QualityChangeSettleDelay) // Wait for quality change to settle
+						// Reset audio input server stats to clear persistent warnings
+						ResetGlobalAudioInputServerStats()
+						// Attempt recovery if there are still issues
+						time.Sleep(1 * time.Second)
+						RecoverGlobalAudioInputServer()
+					}()
+				}
+			} else {
+				logger.Info().Bool("supervisor_running", supervisor.IsRunning()).Msg("audio output subprocess not connected, configuration will apply on next start")
+			}
 		}
 	}
 }