[Milestone] Improvement: In-flight audio output quality update

This commit is contained in:
Alex P 2025-09-09 07:44:37 +00:00
parent 2bc7e50391
commit 3c6184d0e8
4 changed files with 265 additions and 37 deletions

View File

@ -87,10 +87,10 @@ static volatile int playback_initialized = 0;
// Function to dynamically update Opus encoder parameters
int update_opus_encoder_params(int bitrate, int complexity, int vbr, int vbr_constraint,
int signal_type, int bandwidth, int dtx) {
// This function is specifically for audio OUTPUT encoder parameters
// Only require playback initialization for audio output quality changes
if (!encoder || !playback_initialized) {
return -1; // Audio output encoder not initialized
// This function works for both audio input and output encoder parameters
// Require either capture (output) or playback (input) initialization
if (!encoder || (!capture_initialized && !playback_initialized)) {
return -1; // Audio encoder not initialized
}
// Update the static variables

View File

@ -15,16 +15,18 @@ import (
// Legacy aliases for backward compatibility
type OutputIPCConfig = UnifiedIPCConfig
type OutputIPCOpusConfig = UnifiedIPCOpusConfig
type OutputMessageType = UnifiedMessageType
type OutputIPCMessage = UnifiedIPCMessage
// Legacy constants for backward compatibility
const (
OutputMessageTypeOpusFrame = MessageTypeOpusFrame
OutputMessageTypeConfig = MessageTypeConfig
OutputMessageTypeStop = MessageTypeStop
OutputMessageTypeHeartbeat = MessageTypeHeartbeat
OutputMessageTypeAck = MessageTypeAck
OutputMessageTypeOpusFrame = MessageTypeOpusFrame
OutputMessageTypeConfig = MessageTypeConfig
OutputMessageTypeOpusConfig = MessageTypeOpusConfig
OutputMessageTypeStop = MessageTypeStop
OutputMessageTypeHeartbeat = MessageTypeHeartbeat
OutputMessageTypeAck = MessageTypeAck
)
// Methods are now inherited from UnifiedIPCMessage
@ -142,14 +144,134 @@ func (s *AudioOutputServer) acceptConnections() {
s.mtx.Unlock()
s.logger.Info().Msg("Client connected to audio output server")
// Only handle one connection at a time for simplicity
for s.running && s.conn != nil {
// Keep connection alive until stopped or disconnected
time.Sleep(100 * time.Millisecond)
// Start message processing for this connection
s.wg.Add(1)
go s.handleConnection(conn)
}
}
// handleConnection processes messages from a client connection
func (s *AudioOutputServer) handleConnection(conn net.Conn) {
defer s.wg.Done()
defer conn.Close()
for s.running {
msg, err := s.readMessage(conn)
if err != nil {
if s.running {
s.logger.Error().Err(err).Msg("Failed to read message from client")
}
return
}
if err := s.processMessage(msg); err != nil {
s.logger.Error().Err(err).Msg("Failed to process message")
}
}
}
// readMessage reads a message from the connection
func (s *AudioOutputServer) readMessage(conn net.Conn) (*OutputIPCMessage, error) {
header := make([]byte, 17)
if _, err := io.ReadFull(conn, header); err != nil {
return nil, fmt.Errorf("failed to read header: %w", err)
}
magic := binary.LittleEndian.Uint32(header[0:4])
if magic != s.magicNumber {
return nil, fmt.Errorf("invalid magic number: expected %d, got %d", s.magicNumber, magic)
}
msgType := OutputMessageType(header[4])
length := binary.LittleEndian.Uint32(header[5:9])
timestamp := int64(binary.LittleEndian.Uint64(header[9:17]))
var data []byte
if length > 0 {
data = make([]byte, length)
if _, err := io.ReadFull(conn, data); err != nil {
return nil, fmt.Errorf("failed to read data: %w", err)
}
}
return &OutputIPCMessage{
Magic: magic,
Type: msgType,
Length: length,
Timestamp: timestamp,
Data: data,
}, nil
}
// processMessage processes a received message
func (s *AudioOutputServer) processMessage(msg *OutputIPCMessage) error {
switch msg.Type {
case OutputMessageTypeOpusConfig:
return s.processOpusConfig(msg.Data)
case OutputMessageTypeStop:
s.logger.Info().Msg("Received stop message")
return nil
case OutputMessageTypeHeartbeat:
s.logger.Debug().Msg("Received heartbeat")
return nil
default:
s.logger.Warn().Int("type", int(msg.Type)).Msg("Unknown message type")
return nil
}
}
// processOpusConfig processes Opus configuration updates
func (s *AudioOutputServer) processOpusConfig(data []byte) error {
// Validate configuration data size (9 * int32 = 36 bytes)
if len(data) != 36 {
return fmt.Errorf("invalid Opus configuration data size: expected 36 bytes, got %d", len(data))
}
// Decode Opus configuration
config := OutputIPCOpusConfig{
SampleRate: int(binary.LittleEndian.Uint32(data[0:4])),
Channels: int(binary.LittleEndian.Uint32(data[4:8])),
FrameSize: int(binary.LittleEndian.Uint32(data[8:12])),
Bitrate: int(binary.LittleEndian.Uint32(data[12:16])),
Complexity: int(binary.LittleEndian.Uint32(data[16:20])),
VBR: int(binary.LittleEndian.Uint32(data[20:24])),
SignalType: int(binary.LittleEndian.Uint32(data[24:28])),
Bandwidth: int(binary.LittleEndian.Uint32(data[28:32])),
DTX: int(binary.LittleEndian.Uint32(data[32:36])),
}
s.logger.Info().Interface("config", config).Msg("Received Opus configuration update")
// Ensure we're running in the audio server subprocess
if !isAudioServerProcess() {
s.logger.Warn().Msg("Opus configuration update ignored - not running in audio server subprocess")
return nil
}
// Check if audio output streaming is currently active
if atomic.LoadInt32(&outputStreamingRunning) == 0 {
s.logger.Info().Msg("Audio output streaming not active, configuration will be applied when streaming starts")
return nil
}
// Ensure capture is initialized before updating encoder parameters
// The C function requires both encoder and capture_initialized to be true
if err := cgoAudioInit(); err != nil {
s.logger.Debug().Err(err).Msg("Audio capture already initialized or initialization failed")
// Continue anyway - capture may already be initialized
}
// Apply configuration using CGO function (only if audio system is running)
vbrConstraint := Config.CGOOpusVBRConstraint
if err := updateOpusEncoderParams(config.Bitrate, config.Complexity, config.VBR, vbrConstraint, config.SignalType, config.Bandwidth, config.DTX); err != nil {
s.logger.Error().Err(err).Msg("Failed to update Opus encoder parameters - encoder may not be initialized")
return err
}
s.logger.Info().Msg("Opus encoder parameters updated successfully")
return nil
}
// SendFrame sends an audio frame to the client
func (s *AudioOutputServer) SendFrame(frame []byte) error {
s.mtx.Lock()
@ -320,6 +442,53 @@ func (c *AudioOutputClient) ReceiveFrame() ([]byte, error) {
return frame, nil
}
// SendOpusConfig sends Opus configuration to the audio output server
func (c *AudioOutputClient) SendOpusConfig(config OutputIPCOpusConfig) error {
c.mtx.Lock()
defer c.mtx.Unlock()
if !c.running || c.conn == nil {
return fmt.Errorf("not connected to audio output server")
}
// Validate configuration parameters
if config.SampleRate <= 0 || config.Channels <= 0 || config.FrameSize <= 0 || config.Bitrate <= 0 {
return fmt.Errorf("invalid Opus configuration: SampleRate=%d, Channels=%d, FrameSize=%d, Bitrate=%d",
config.SampleRate, config.Channels, config.FrameSize, config.Bitrate)
}
// Serialize Opus configuration using common function
data := EncodeOpusConfig(config.SampleRate, config.Channels, config.FrameSize, config.Bitrate, config.Complexity, config.VBR, config.SignalType, config.Bandwidth, config.DTX)
msg := &OutputIPCMessage{
Magic: c.magicNumber,
Type: OutputMessageTypeOpusConfig,
Length: uint32(len(data)),
Timestamp: time.Now().UnixNano(),
Data: data,
}
return c.writeMessage(msg)
}
// writeMessage writes a message to the connection
func (c *AudioOutputClient) writeMessage(msg *OutputIPCMessage) error {
header := EncodeMessageHeader(msg.Magic, uint8(msg.Type), msg.Length, msg.Timestamp)
if _, err := c.conn.Write(header); err != nil {
return fmt.Errorf("failed to write header: %w", err)
}
if msg.Length > 0 && msg.Data != nil {
if _, err := c.conn.Write(msg.Data); err != nil {
return fmt.Errorf("failed to write data: %w", err)
}
}
atomic.AddInt64(&c.totalFrames, 1)
return nil
}
// GetClientStats returns client performance statistics
func (c *AudioOutputClient) GetClientStats() (total, dropped int64) {
stats := GetFrameStats(&c.totalFrames, &c.droppedFrames)

View File

@ -125,6 +125,12 @@ func (s *AudioOutputSupervisor) Start() error {
// Start the supervision loop
go s.supervisionLoop()
// Establish IPC connection to subprocess after a brief delay
go func() {
time.Sleep(500 * time.Millisecond) // Wait for subprocess to start
s.connectClient()
}()
s.logger.Info().Str("component", AudioOutputSupervisorComponent).Msg("component started successfully")
return nil
}
@ -274,3 +280,43 @@ func (s *AudioOutputSupervisor) calculateRestartDelay() time.Duration {
return delay
}
// client holds the IPC client for communicating with the subprocess
var outputClient *AudioOutputClient
// IsConnected returns whether the supervisor has an active connection to the subprocess
func (s *AudioOutputSupervisor) IsConnected() bool {
return outputClient != nil && outputClient.IsConnected()
}
// GetClient returns the IPC client for the subprocess
func (s *AudioOutputSupervisor) GetClient() *AudioOutputClient {
return outputClient
}
// connectClient establishes connection to the audio output subprocess
func (s *AudioOutputSupervisor) connectClient() {
if outputClient == nil {
outputClient = NewAudioOutputClient()
}
// Try to connect to the subprocess
if err := outputClient.Connect(); err != nil {
s.logger.Warn().Err(err).Msg("Failed to connect to audio output subprocess")
} else {
s.logger.Info().Msg("Connected to audio output subprocess")
}
}
// SendOpusConfig sends Opus configuration to the audio output subprocess
func (s *AudioOutputSupervisor) SendOpusConfig(config OutputIPCOpusConfig) error {
if outputClient == nil {
return fmt.Errorf("client not initialized")
}
if !outputClient.IsConnected() {
return fmt.Errorf("client not connected")
}
return outputClient.SendOpusConfig(config)
}

View File

@ -211,33 +211,46 @@ func SetAudioQuality(quality AudioQuality) {
// Set new OPUS configuration for future restarts
if supervisor := GetAudioOutputSupervisor(); supervisor != nil {
supervisor.SetOpusConfig(config.Bitrate*1000, complexity, vbr, signalType, bandwidth, dtx)
}
// Send dynamic configuration update to running audio output
vbrConstraint := Config.CGOOpusVBRConstraint
if err := updateOpusEncoderParams(config.Bitrate*1000, complexity, vbr, vbrConstraint, signalType, bandwidth, dtx); err != nil {
logger.Warn().Err(err).Msg("failed to update OPUS encoder parameters dynamically")
// Fallback to subprocess restart if dynamic update fails
if supervisor := GetAudioOutputSupervisor(); supervisor != nil {
logger.Info().Msg("falling back to subprocess restart")
supervisor.Stop()
if err := supervisor.Start(); err != nil {
logger.Error().Err(err).Msg("failed to restart audio output subprocess after dynamic update failure")
// Send dynamic configuration update to running subprocess via IPC
if supervisor.IsConnected() {
// Convert AudioConfig to OutputIPCOpusConfig with complete Opus parameters
opusConfig := OutputIPCOpusConfig{
SampleRate: config.SampleRate,
Channels: config.Channels,
FrameSize: int(config.FrameSize.Milliseconds() * int64(config.SampleRate) / 1000), // Convert ms to samples
Bitrate: config.Bitrate * 1000, // Convert kbps to bps
Complexity: complexity,
VBR: vbr,
SignalType: signalType,
Bandwidth: bandwidth,
DTX: dtx,
}
}
} else {
logger.Info().Msg("audio output quality updated dynamically")
// Reset audio output stats after config update
// Allow adaptive buffer manager to naturally adjust buffer sizes
go func() {
time.Sleep(Config.QualityChangeSettleDelay) // Wait for quality change to settle
// Reset audio input server stats to clear persistent warnings
ResetGlobalAudioInputServerStats()
// Attempt recovery if there are still issues
time.Sleep(1 * time.Second)
RecoverGlobalAudioInputServer()
}()
logger.Info().Interface("opusConfig", opusConfig).Msg("sending Opus configuration to audio output subprocess")
if err := supervisor.SendOpusConfig(opusConfig); err != nil {
logger.Warn().Err(err).Msg("failed to send dynamic Opus config update via IPC, falling back to subprocess restart")
// Fallback to subprocess restart if IPC update fails
supervisor.Stop()
if err := supervisor.Start(); err != nil {
logger.Error().Err(err).Msg("failed to restart audio output subprocess after IPC update failure")
}
} else {
logger.Info().Msg("audio output quality updated dynamically via IPC")
// Reset audio output stats after config update
go func() {
time.Sleep(Config.QualityChangeSettleDelay) // Wait for quality change to settle
// Reset audio input server stats to clear persistent warnings
ResetGlobalAudioInputServerStats()
// Attempt recovery if there are still issues
time.Sleep(1 * time.Second)
RecoverGlobalAudioInputServer()
}()
}
} else {
logger.Info().Bool("supervisor_running", supervisor.IsRunning()).Msg("audio output subprocess not connected, configuration will apply on next start")
}
}
}
}