From f71d18039b4c8b91877d9f214121e0fc5179c64e Mon Sep 17 00:00:00 2001 From: Alex P Date: Tue, 9 Sep 2025 06:59:55 +0000 Subject: [PATCH] [WIP] Cleanup: reduce PR complexity --- internal/audio/ipc_output.go | 261 ++++++++++++++++++++++++++++++---- internal/audio/ipc_unified.go | 1 - 2 files changed, 234 insertions(+), 28 deletions(-) diff --git a/internal/audio/ipc_output.go b/internal/audio/ipc_output.go index 473b7f70..d97f2dad 100644 --- a/internal/audio/ipc_output.go +++ b/internal/audio/ipc_output.go @@ -4,7 +4,13 @@ import ( "encoding/binary" "fmt" "io" + "net" + "sync" "sync/atomic" + "time" + + "github.com/jetkvm/kvm/internal/logging" + "github.com/rs/zerolog" ) // Legacy aliases for backward compatibility @@ -26,45 +32,247 @@ const ( // Global shared message pool for output IPC client header reading var globalOutputClientMessagePool = NewGenericMessagePool(Config.OutputMessagePoolSize) -// AudioOutputServer is now an alias for UnifiedAudioServer -type AudioOutputServer = UnifiedAudioServer +// AudioOutputServer provides audio output IPC functionality +type AudioOutputServer struct { + // Atomic counters + bufferSize int64 // Current buffer size (atomic) + droppedFrames int64 // Dropped frames counter (atomic) + totalFrames int64 // Total frames counter (atomic) + + listener net.Listener + conn net.Conn + mtx sync.Mutex + running bool + logger zerolog.Logger + + // Message channels + messageChan chan *OutputIPCMessage // Buffered channel for incoming messages + processChan chan *OutputIPCMessage // Buffered channel for processing queue + wg sync.WaitGroup // Wait group for goroutine coordination + + // Configuration + socketPath string + magicNumber uint32 +} func NewAudioOutputServer() (*AudioOutputServer, error) { - return NewUnifiedAudioServer(false) // false = output server + socketPath := getOutputSocketPath() + logger := logging.GetDefaultLogger().With().Str("component", "audio-output-server").Logger() + + server := &AudioOutputServer{ + socketPath: socketPath, + magicNumber: Config.OutputMagicNumber, + logger: logger, + messageChan: make(chan *OutputIPCMessage, Config.ChannelBufferSize), + processChan: make(chan *OutputIPCMessage, Config.ChannelBufferSize), + } + + return server, nil } -// Start method is now inherited from UnifiedAudioServer - -// acceptConnections method is now inherited from UnifiedAudioServer - -// startProcessorGoroutine method is now inherited from UnifiedAudioServer - -// Stop method is now inherited from UnifiedAudioServer - -// Close method is now inherited from UnifiedAudioServer - -// SendFrame method is now inherited from UnifiedAudioServer - // GetServerStats returns server performance statistics -func (s *AudioOutputServer) GetServerStats() (total, dropped int64, bufferSize int64) { - stats := GetFrameStats(&s.totalFrames, &s.droppedFrames) - return stats.Total, stats.Dropped, atomic.LoadInt64(&s.bufferSize) +// Start starts the audio output server +func (s *AudioOutputServer) Start() error { + s.mtx.Lock() + defer s.mtx.Unlock() + + if s.running { + return fmt.Errorf("audio output server is already running") + } + + // Create Unix socket + listener, err := net.Listen("unix", s.socketPath) + if err != nil { + return fmt.Errorf("failed to create unix socket: %w", err) + } + + s.listener = listener + s.running = true + + // Start goroutines + s.wg.Add(1) + go s.acceptConnections() + + s.logger.Info().Str("socket_path", s.socketPath).Msg("Audio output server started") + return nil } -// AudioOutputClient is now an alias for UnifiedAudioClient -type AudioOutputClient = UnifiedAudioClient +// Stop stops the audio output server +func (s *AudioOutputServer) Stop() { + s.mtx.Lock() + defer s.mtx.Unlock() + + if !s.running { + return + } + + s.running = false + + if s.listener != nil { + s.listener.Close() + } + + if s.conn != nil { + s.conn.Close() + } + + // Close channels + close(s.messageChan) + close(s.processChan) + + s.wg.Wait() + s.logger.Info().Msg("Audio output server stopped") +} + +// acceptConnections handles incoming connections +func (s *AudioOutputServer) acceptConnections() { + defer s.wg.Done() + + for s.running { + conn, err := s.listener.Accept() + if err != nil { + if s.running { + s.logger.Error().Err(err).Msg("Failed to accept connection") + } + return + } + + s.mtx.Lock() + s.conn = conn + s.mtx.Unlock() + + s.logger.Info().Msg("Client connected to audio output server") + // Only handle one connection at a time for simplicity + for s.running && s.conn != nil { + // Keep connection alive until stopped or disconnected + time.Sleep(100 * time.Millisecond) + } + } +} + +// SendFrame sends an audio frame to the client +func (s *AudioOutputServer) SendFrame(frame []byte) error { + s.mtx.Lock() + conn := s.conn + s.mtx.Unlock() + + if conn == nil { + return fmt.Errorf("no client connected") + } + + msg := &OutputIPCMessage{ + Magic: s.magicNumber, + Type: OutputMessageTypeOpusFrame, + Length: uint32(len(frame)), + Timestamp: time.Now().UnixNano(), + Data: frame, + } + + return s.writeMessage(conn, msg) +} + +// writeMessage writes a message to the connection +func (s *AudioOutputServer) writeMessage(conn net.Conn, msg *OutputIPCMessage) error { + header := make([]byte, 17) + binary.LittleEndian.PutUint32(header[0:4], msg.Magic) + header[4] = uint8(msg.Type) + binary.LittleEndian.PutUint32(header[5:9], msg.Length) + binary.LittleEndian.PutUint64(header[9:17], uint64(msg.Timestamp)) + + if _, err := conn.Write(header); err != nil { + return fmt.Errorf("failed to write header: %w", err) + } + + if msg.Length > 0 && msg.Data != nil { + if _, err := conn.Write(msg.Data); err != nil { + return fmt.Errorf("failed to write data: %w", err) + } + } + + atomic.AddInt64(&s.totalFrames, 1) + return nil +} + +func (s *AudioOutputServer) GetServerStats() (total, dropped int64, bufferSize int64) { + return atomic.LoadInt64(&s.totalFrames), atomic.LoadInt64(&s.droppedFrames), atomic.LoadInt64(&s.bufferSize) +} + +// AudioOutputClient provides audio output IPC client functionality +type AudioOutputClient struct { + // Atomic counters + droppedFrames int64 // Atomic counter for dropped frames + totalFrames int64 // Atomic counter for total frames + + conn net.Conn + mtx sync.Mutex + running bool + logger zerolog.Logger + socketPath string + magicNumber uint32 + bufferPool *AudioBufferPool // Buffer pool for memory optimization + + // Health monitoring + autoReconnect bool // Enable automatic reconnection +} func NewAudioOutputClient() *AudioOutputClient { - return NewUnifiedAudioClient(false) // false = output client + socketPath := getOutputSocketPath() + logger := logging.GetDefaultLogger().With().Str("component", "audio-output-client").Logger() + + return &AudioOutputClient{ + socketPath: socketPath, + magicNumber: Config.OutputMagicNumber, + logger: logger, + bufferPool: NewAudioBufferPool(Config.MaxFrameSize), + autoReconnect: true, + } } -// Connect method is now inherited from UnifiedAudioClient +// Connect connects to the audio output server +func (c *AudioOutputClient) Connect() error { + c.mtx.Lock() + defer c.mtx.Unlock() -// Disconnect method is now inherited from UnifiedAudioClient + if c.running { + return fmt.Errorf("audio output client is already connected") + } -// IsConnected method is now inherited from UnifiedAudioClient + conn, err := net.Dial("unix", c.socketPath) + if err != nil { + return fmt.Errorf("failed to connect to audio output server: %w", err) + } -// Close method is now inherited from UnifiedAudioClient + c.conn = conn + c.running = true + c.logger.Info().Str("socket_path", c.socketPath).Msg("Connected to audio output server") + return nil +} + +// Disconnect disconnects from the audio output server +func (c *AudioOutputClient) Disconnect() { + c.mtx.Lock() + defer c.mtx.Unlock() + + if !c.running { + return + } + + c.running = false + + if c.conn != nil { + c.conn.Close() + c.conn = nil + } + + c.logger.Info().Msg("Disconnected from audio output server") +} + +// IsConnected returns whether the client is connected +func (c *AudioOutputClient) IsConnected() bool { + c.mtx.Lock() + defer c.mtx.Unlock() + return c.running && c.conn != nil +} func (c *AudioOutputClient) ReceiveFrame() ([]byte, error) { c.mtx.Lock() @@ -123,5 +331,4 @@ func (c *AudioOutputClient) GetClientStats() (total, dropped int64) { } // Helper functions - -// getOutputSocketPath is now defined in unified_ipc.go +// getOutputSocketPath is defined in ipc_unified.go diff --git a/internal/audio/ipc_unified.go b/internal/audio/ipc_unified.go index 4ff6eea9..14ad0b19 100644 --- a/internal/audio/ipc_unified.go +++ b/internal/audio/ipc_unified.go @@ -98,7 +98,6 @@ type UnifiedIPCOpusConfig struct { // UnifiedAudioServer provides common functionality for both input and output servers type UnifiedAudioServer struct { // Atomic counters for performance monitoring - bufferSize int64 // Current buffer size (atomic) droppedFrames int64 // Dropped frames counter (atomic) totalFrames int64 // Total frames counter (atomic)