From 5e257b3144fd4b36696a8c7fc2857cb58fad6226 Mon Sep 17 00:00:00 2001 From: Alex P Date: Tue, 16 Sep 2025 11:26:48 +0300 Subject: [PATCH] [WIP] Add debug logging throughout the audio system --- internal/audio/c/audio.c | 10 ------ internal/audio/ipc_input.go | 47 ++++++++++++---------------- internal/audio/ipc_output.go | 50 +++++++++++++++++++++--------- internal/audio/output_streaming.go | 41 ++++++++++++++---------- internal/audio/quality_presets.go | 2 ++ 5 files changed, 82 insertions(+), 68 deletions(-) diff --git a/internal/audio/c/audio.c b/internal/audio/c/audio.c index 47bb863b..c8298c2b 100644 --- a/internal/audio/c/audio.c +++ b/internal/audio/c/audio.c @@ -457,32 +457,22 @@ int jetkvm_audio_decode_write(void *opus_buf, int opus_size) { // Safety checks if (!playback_initialized || !pcm_playback_handle || !decoder || !opus_buf || opus_size <= 0) { - printf("[AUDIO] jetkvm_audio_decode_write: Failed safety checks - playback_initialized=%d, pcm_playback_handle=%p, decoder=%p, opus_buf=%p, opus_size=%d\n", - playback_initialized, pcm_playback_handle, decoder, opus_buf, opus_size); return -1; } // Additional bounds checking if (opus_size > max_packet_size) { - printf("[AUDIO] jetkvm_audio_decode_write: Opus packet too large - size=%d, max=%d\n", opus_size, max_packet_size); return -1; } - printf("[AUDIO] jetkvm_audio_decode_write: Processing Opus packet - size=%d bytes\n", opus_size); - // Decode Opus to PCM with error handling int pcm_frames = opus_decode(decoder, in, opus_size, pcm_buffer, frame_size, 0); if (pcm_frames < 0) { - printf("[AUDIO] jetkvm_audio_decode_write: Opus decode failed with error %d, attempting packet loss concealment\n", pcm_frames); // Try packet loss concealment on decode error pcm_frames = opus_decode(decoder, NULL, 0, pcm_buffer, frame_size, 0); if (pcm_frames < 0) { - printf("[AUDIO] jetkvm_audio_decode_write: Packet loss concealment also failed with error %d\n", pcm_frames); return -1; } - printf("[AUDIO] jetkvm_audio_decode_write: Packet loss concealment succeeded, recovered %d frames\n", pcm_frames); - } else { - printf("[AUDIO] jetkvm_audio_decode_write: Opus decode successful - decoded %d PCM frames\n", pcm_frames); } retry_write: diff --git a/internal/audio/ipc_input.go b/internal/audio/ipc_input.go index 750ed299..f3a489a2 100644 --- a/internal/audio/ipc_input.go +++ b/internal/audio/ipc_input.go @@ -29,24 +29,22 @@ var ( // OptimizedIPCMessage represents an optimized message with pre-allocated buffers type OptimizedIPCMessage struct { - header [17]byte // Pre-allocated header buffer (headerSize = 17) - data []byte // Reusable data buffer - msg UnifiedIPCMessage // Embedded message + header [17]byte + data []byte + msg UnifiedIPCMessage } // MessagePool manages a pool of reusable messages to reduce allocations type MessagePool struct { - // Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment) - hitCount int64 // Pool hit counter (atomic) - missCount int64 // Pool miss counter (atomic) + hitCount int64 + missCount int64 - // Other fields pool chan *OptimizedIPCMessage - // Memory optimization fields - preallocated []*OptimizedIPCMessage // Pre-allocated messages for immediate use - preallocSize int // Number of pre-allocated messages - maxPoolSize int // Maximum pool size to prevent memory bloat - mutex sync.RWMutex // Protects preallocated slice + + preallocated []*OptimizedIPCMessage + preallocSize int + maxPoolSize int + mutex sync.RWMutex } // Global message pool instance @@ -152,30 +150,25 @@ func (mp *MessagePool) Put(msg *OptimizedIPCMessage) { } } -// AudioInputServer handles IPC communication for audio input processing type AudioInputServer struct { - // Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment) - bufferSize int64 // Current buffer size (atomic) - processingTime int64 // Average processing time in nanoseconds (atomic) - droppedFrames int64 // Dropped frames counter (atomic) - totalFrames int64 // Total frames counter (atomic) + bufferSize int64 + processingTime int64 + droppedFrames int64 + totalFrames int64 listener net.Listener conn net.Conn mtx sync.Mutex running bool - // Triple-goroutine architecture - messageChan chan *UnifiedIPCMessage // Buffered channel for incoming messages - processChan chan *UnifiedIPCMessage // Buffered channel for processing queue - stopChan chan struct{} // Stop signal for all goroutines - wg sync.WaitGroup // Wait group for goroutine coordination + messageChan chan *UnifiedIPCMessage + processChan chan *UnifiedIPCMessage + stopChan chan struct{} + wg sync.WaitGroup - // Channel resizing support - channelMutex sync.RWMutex // Protects channel recreation - lastBufferSize int64 // Last known buffer size for change detection + channelMutex sync.RWMutex + lastBufferSize int64 - // Socket buffer configuration socketBufferConfig SocketBufferConfig } diff --git a/internal/audio/ipc_output.go b/internal/audio/ipc_output.go index fbe2bcb5..f5588371 100644 --- a/internal/audio/ipc_output.go +++ b/internal/audio/ipc_output.go @@ -18,10 +18,9 @@ var globalOutputClientMessagePool = NewGenericMessagePool(Config.OutputMessagePo // AudioOutputServer provides audio output IPC functionality type AudioOutputServer struct { - // Atomic counters - bufferSize int64 // Current buffer size (atomic) - droppedFrames int64 // Dropped frames counter (atomic) - totalFrames int64 // Total frames counter (atomic) + bufferSize int64 + droppedFrames int64 + totalFrames int64 listener net.Listener conn net.Conn @@ -29,12 +28,10 @@ type AudioOutputServer struct { running bool logger zerolog.Logger - // Message channels - messageChan chan *UnifiedIPCMessage // Buffered channel for incoming messages - processChan chan *UnifiedIPCMessage // Buffered channel for processing queue - wg sync.WaitGroup // Wait group for goroutine coordination + messageChan chan *UnifiedIPCMessage + processChan chan *UnifiedIPCMessage + wg sync.WaitGroup - // Configuration socketPath string magicNumber uint32 } @@ -265,6 +262,17 @@ func (s *AudioOutputServer) SendFrame(frame []byte) error { return fmt.Errorf("no client connected") } + // Zero-cost trace logging for frame transmission + if s.logger.GetLevel() <= zerolog.TraceLevel { + totalFrames := atomic.LoadInt64(&s.totalFrames) + if totalFrames <= 5 || totalFrames%1000 == 1 { + s.logger.Trace(). + Int("frame_size", len(frame)). + Int64("total_frames_sent", totalFrames). + Msg("Sending audio frame to output client") + } + } + msg := &UnifiedIPCMessage{ Magic: s.magicNumber, Type: MessageTypeOpusFrame, @@ -301,9 +309,8 @@ func (s *AudioOutputServer) GetServerStats() (total, dropped int64, bufferSize i // AudioOutputClient provides audio output IPC client functionality type AudioOutputClient struct { - // Atomic counters - droppedFrames int64 // Atomic counter for dropped frames - totalFrames int64 // Atomic counter for total frames + droppedFrames int64 + totalFrames int64 conn net.Conn mtx sync.Mutex @@ -311,10 +318,9 @@ type AudioOutputClient struct { logger zerolog.Logger socketPath string magicNumber uint32 - bufferPool *AudioBufferPool // Buffer pool for memory optimization + bufferPool *AudioBufferPool - // Health monitoring - autoReconnect bool // Enable automatic reconnection + autoReconnect bool } func NewAudioOutputClient() *AudioOutputClient { @@ -405,6 +411,7 @@ func (c *AudioOutputClient) ReceiveFrame() ([]byte, error) { } size := binary.LittleEndian.Uint32(optMsg.header[5:9]) + timestamp := int64(binary.LittleEndian.Uint64(optMsg.header[9:17])) maxFrameSize := Config.OutputMaxFrameSize if int(size) > maxFrameSize { return nil, fmt.Errorf("received frame size validation failed: got %d bytes, maximum allowed %d bytes", size, maxFrameSize) @@ -423,6 +430,19 @@ func (c *AudioOutputClient) ReceiveFrame() ([]byte, error) { // Note: Caller is responsible for returning frame to pool via PutAudioFrameBuffer() atomic.AddInt64(&c.totalFrames, 1) + + // Zero-cost trace logging for frame reception + if c.logger.GetLevel() <= zerolog.TraceLevel { + totalFrames := atomic.LoadInt64(&c.totalFrames) + if totalFrames <= 5 || totalFrames%1000 == 1 { + c.logger.Trace(). + Int("frame_size", int(size)). + Int64("timestamp", timestamp). + Int64("total_frames_received", totalFrames). + Msg("Received audio frame from output server") + } + } + return frame, nil } diff --git a/internal/audio/output_streaming.go b/internal/audio/output_streaming.go index 5f9f60d9..f6cdea9d 100644 --- a/internal/audio/output_streaming.go +++ b/internal/audio/output_streaming.go @@ -30,22 +30,6 @@ func getOutputStreamingLogger() *zerolog.Logger { return outputStreamingLogger } -// Removed unused NewAudioOutputStreamer function - -// Removed unused AudioOutputStreamer.Start method - -// Removed unused AudioOutputStreamer.Stop method - -// Removed unused AudioOutputStreamer.streamLoop method - -// Removed unused AudioOutputStreamer.processingLoop method - -// Removed unused AudioOutputStreamer.statisticsLoop method - -// Removed unused AudioOutputStreamer.reportStatistics method - -// Removed all unused AudioOutputStreamer methods - // StartAudioOutputStreaming starts audio output streaming (capturing system audio) func StartAudioOutputStreaming(send func([]byte)) error { if !atomic.CompareAndSwapInt32(&outputStreamingRunning, 0, 1) { @@ -84,6 +68,7 @@ func StartAudioOutputStreaming(send func([]byte)) error { maxConsecutiveErrors := Config.MaxConsecutiveErrors errorBackoffDelay := Config.RetryDelay maxErrorBackoff := Config.MaxRetryDelay + var frameCount int64 for { select { @@ -143,11 +128,25 @@ func StartAudioOutputStreaming(send func([]byte)) error { } if n > 0 { + frameCount++ + // Get frame buffer from pool to reduce allocations frame := GetAudioFrameBuffer() frame = frame[:n] // Resize to actual frame size copy(frame, buffer[:n]) + // Zero-cost trace logging for output frame processing + logger := getOutputStreamingLogger() + if logger.GetLevel() <= zerolog.TraceLevel { + if frameCount <= 5 || frameCount%1000 == 1 { + logger.Trace(). + Int("frame_size", n). + Int("buffer_capacity", cap(frame)). + Int64("total_frames_sent", frameCount). + Msg("Audio output frame captured and buffered") + } + } + // Validate frame before sending if err := ValidateAudioFrame(frame); err != nil { getOutputStreamingLogger().Warn().Err(err).Msg("Frame validation failed, dropping frame") @@ -159,6 +158,16 @@ func StartAudioOutputStreaming(send func([]byte)) error { // Return buffer to pool after sending PutAudioFrameBuffer(frame) RecordFrameReceived(n) + + // Zero-cost trace logging for successful frame transmission + if logger.GetLevel() <= zerolog.TraceLevel { + if frameCount <= 5 || frameCount%1000 == 1 { + logger.Trace(). + Int("frame_size", n). + Int64("total_frames_sent", frameCount). + Msg("Audio output frame sent successfully") + } + } } // Small delay to prevent busy waiting time.Sleep(Config.ShortSleepDuration) diff --git a/internal/audio/quality_presets.go b/internal/audio/quality_presets.go index 18a314aa..d57a61ff 100644 --- a/internal/audio/quality_presets.go +++ b/internal/audio/quality_presets.go @@ -382,10 +382,12 @@ func RecordFrameReceived(bytes int) { // RecordFrameDropped increments the frames dropped counter with batched updates func RecordFrameDropped() { + atomic.AddUint64(&batchedFramesDropped, 1) } // RecordConnectionDrop increments the connection drops counter with batched updates func RecordConnectionDrop() { + atomic.AddUint64(&batchedConnectionDrops, 1) } // flushBatchedMetrics flushes accumulated metrics to the main counters