[WIP] Add debug logging throughout the audio system

This commit is contained in:
Alex P 2025-09-16 11:26:48 +03:00
parent fb98c4edcb
commit 5e257b3144
5 changed files with 82 additions and 68 deletions

View File

@ -457,32 +457,22 @@ int jetkvm_audio_decode_write(void *opus_buf, int opus_size) {
// Safety checks // Safety checks
if (!playback_initialized || !pcm_playback_handle || !decoder || !opus_buf || opus_size <= 0) { if (!playback_initialized || !pcm_playback_handle || !decoder || !opus_buf || opus_size <= 0) {
printf("[AUDIO] jetkvm_audio_decode_write: Failed safety checks - playback_initialized=%d, pcm_playback_handle=%p, decoder=%p, opus_buf=%p, opus_size=%d\n",
playback_initialized, pcm_playback_handle, decoder, opus_buf, opus_size);
return -1; return -1;
} }
// Additional bounds checking // Additional bounds checking
if (opus_size > max_packet_size) { if (opus_size > max_packet_size) {
printf("[AUDIO] jetkvm_audio_decode_write: Opus packet too large - size=%d, max=%d\n", opus_size, max_packet_size);
return -1; return -1;
} }
printf("[AUDIO] jetkvm_audio_decode_write: Processing Opus packet - size=%d bytes\n", opus_size);
// Decode Opus to PCM with error handling // Decode Opus to PCM with error handling
int pcm_frames = opus_decode(decoder, in, opus_size, pcm_buffer, frame_size, 0); int pcm_frames = opus_decode(decoder, in, opus_size, pcm_buffer, frame_size, 0);
if (pcm_frames < 0) { if (pcm_frames < 0) {
printf("[AUDIO] jetkvm_audio_decode_write: Opus decode failed with error %d, attempting packet loss concealment\n", pcm_frames);
// Try packet loss concealment on decode error // Try packet loss concealment on decode error
pcm_frames = opus_decode(decoder, NULL, 0, pcm_buffer, frame_size, 0); pcm_frames = opus_decode(decoder, NULL, 0, pcm_buffer, frame_size, 0);
if (pcm_frames < 0) { if (pcm_frames < 0) {
printf("[AUDIO] jetkvm_audio_decode_write: Packet loss concealment also failed with error %d\n", pcm_frames);
return -1; return -1;
} }
printf("[AUDIO] jetkvm_audio_decode_write: Packet loss concealment succeeded, recovered %d frames\n", pcm_frames);
} else {
printf("[AUDIO] jetkvm_audio_decode_write: Opus decode successful - decoded %d PCM frames\n", pcm_frames);
} }
retry_write: retry_write:

View File

@ -29,24 +29,22 @@ var (
// OptimizedIPCMessage represents an optimized message with pre-allocated buffers // OptimizedIPCMessage represents an optimized message with pre-allocated buffers
type OptimizedIPCMessage struct { type OptimizedIPCMessage struct {
header [17]byte // Pre-allocated header buffer (headerSize = 17) header [17]byte
data []byte // Reusable data buffer data []byte
msg UnifiedIPCMessage // Embedded message msg UnifiedIPCMessage
} }
// MessagePool manages a pool of reusable messages to reduce allocations // MessagePool manages a pool of reusable messages to reduce allocations
type MessagePool struct { type MessagePool struct {
// Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment) hitCount int64
hitCount int64 // Pool hit counter (atomic) missCount int64
missCount int64 // Pool miss counter (atomic)
// Other fields
pool chan *OptimizedIPCMessage pool chan *OptimizedIPCMessage
// Memory optimization fields
preallocated []*OptimizedIPCMessage // Pre-allocated messages for immediate use preallocated []*OptimizedIPCMessage
preallocSize int // Number of pre-allocated messages preallocSize int
maxPoolSize int // Maximum pool size to prevent memory bloat maxPoolSize int
mutex sync.RWMutex // Protects preallocated slice mutex sync.RWMutex
} }
// Global message pool instance // Global message pool instance
@ -152,30 +150,25 @@ func (mp *MessagePool) Put(msg *OptimizedIPCMessage) {
} }
} }
// AudioInputServer handles IPC communication for audio input processing
type AudioInputServer struct { type AudioInputServer struct {
// Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment) bufferSize int64
bufferSize int64 // Current buffer size (atomic) processingTime int64
processingTime int64 // Average processing time in nanoseconds (atomic) droppedFrames int64
droppedFrames int64 // Dropped frames counter (atomic) totalFrames int64
totalFrames int64 // Total frames counter (atomic)
listener net.Listener listener net.Listener
conn net.Conn conn net.Conn
mtx sync.Mutex mtx sync.Mutex
running bool running bool
// Triple-goroutine architecture messageChan chan *UnifiedIPCMessage
messageChan chan *UnifiedIPCMessage // Buffered channel for incoming messages processChan chan *UnifiedIPCMessage
processChan chan *UnifiedIPCMessage // Buffered channel for processing queue stopChan chan struct{}
stopChan chan struct{} // Stop signal for all goroutines wg sync.WaitGroup
wg sync.WaitGroup // Wait group for goroutine coordination
// Channel resizing support channelMutex sync.RWMutex
channelMutex sync.RWMutex // Protects channel recreation lastBufferSize int64
lastBufferSize int64 // Last known buffer size for change detection
// Socket buffer configuration
socketBufferConfig SocketBufferConfig socketBufferConfig SocketBufferConfig
} }

View File

@ -18,10 +18,9 @@ var globalOutputClientMessagePool = NewGenericMessagePool(Config.OutputMessagePo
// AudioOutputServer provides audio output IPC functionality // AudioOutputServer provides audio output IPC functionality
type AudioOutputServer struct { type AudioOutputServer struct {
// Atomic counters bufferSize int64
bufferSize int64 // Current buffer size (atomic) droppedFrames int64
droppedFrames int64 // Dropped frames counter (atomic) totalFrames int64
totalFrames int64 // Total frames counter (atomic)
listener net.Listener listener net.Listener
conn net.Conn conn net.Conn
@ -29,12 +28,10 @@ type AudioOutputServer struct {
running bool running bool
logger zerolog.Logger logger zerolog.Logger
// Message channels messageChan chan *UnifiedIPCMessage
messageChan chan *UnifiedIPCMessage // Buffered channel for incoming messages processChan chan *UnifiedIPCMessage
processChan chan *UnifiedIPCMessage // Buffered channel for processing queue wg sync.WaitGroup
wg sync.WaitGroup // Wait group for goroutine coordination
// Configuration
socketPath string socketPath string
magicNumber uint32 magicNumber uint32
} }
@ -265,6 +262,17 @@ func (s *AudioOutputServer) SendFrame(frame []byte) error {
return fmt.Errorf("no client connected") return fmt.Errorf("no client connected")
} }
// Zero-cost trace logging for frame transmission
if s.logger.GetLevel() <= zerolog.TraceLevel {
totalFrames := atomic.LoadInt64(&s.totalFrames)
if totalFrames <= 5 || totalFrames%1000 == 1 {
s.logger.Trace().
Int("frame_size", len(frame)).
Int64("total_frames_sent", totalFrames).
Msg("Sending audio frame to output client")
}
}
msg := &UnifiedIPCMessage{ msg := &UnifiedIPCMessage{
Magic: s.magicNumber, Magic: s.magicNumber,
Type: MessageTypeOpusFrame, Type: MessageTypeOpusFrame,
@ -301,9 +309,8 @@ func (s *AudioOutputServer) GetServerStats() (total, dropped int64, bufferSize i
// AudioOutputClient provides audio output IPC client functionality // AudioOutputClient provides audio output IPC client functionality
type AudioOutputClient struct { type AudioOutputClient struct {
// Atomic counters droppedFrames int64
droppedFrames int64 // Atomic counter for dropped frames totalFrames int64
totalFrames int64 // Atomic counter for total frames
conn net.Conn conn net.Conn
mtx sync.Mutex mtx sync.Mutex
@ -311,10 +318,9 @@ type AudioOutputClient struct {
logger zerolog.Logger logger zerolog.Logger
socketPath string socketPath string
magicNumber uint32 magicNumber uint32
bufferPool *AudioBufferPool // Buffer pool for memory optimization bufferPool *AudioBufferPool
// Health monitoring autoReconnect bool
autoReconnect bool // Enable automatic reconnection
} }
func NewAudioOutputClient() *AudioOutputClient { func NewAudioOutputClient() *AudioOutputClient {
@ -405,6 +411,7 @@ func (c *AudioOutputClient) ReceiveFrame() ([]byte, error) {
} }
size := binary.LittleEndian.Uint32(optMsg.header[5:9]) size := binary.LittleEndian.Uint32(optMsg.header[5:9])
timestamp := int64(binary.LittleEndian.Uint64(optMsg.header[9:17]))
maxFrameSize := Config.OutputMaxFrameSize maxFrameSize := Config.OutputMaxFrameSize
if int(size) > maxFrameSize { if int(size) > maxFrameSize {
return nil, fmt.Errorf("received frame size validation failed: got %d bytes, maximum allowed %d bytes", size, maxFrameSize) return nil, fmt.Errorf("received frame size validation failed: got %d bytes, maximum allowed %d bytes", size, maxFrameSize)
@ -423,6 +430,19 @@ func (c *AudioOutputClient) ReceiveFrame() ([]byte, error) {
// Note: Caller is responsible for returning frame to pool via PutAudioFrameBuffer() // Note: Caller is responsible for returning frame to pool via PutAudioFrameBuffer()
atomic.AddInt64(&c.totalFrames, 1) atomic.AddInt64(&c.totalFrames, 1)
// Zero-cost trace logging for frame reception
if c.logger.GetLevel() <= zerolog.TraceLevel {
totalFrames := atomic.LoadInt64(&c.totalFrames)
if totalFrames <= 5 || totalFrames%1000 == 1 {
c.logger.Trace().
Int("frame_size", int(size)).
Int64("timestamp", timestamp).
Int64("total_frames_received", totalFrames).
Msg("Received audio frame from output server")
}
}
return frame, nil return frame, nil
} }

View File

@ -30,22 +30,6 @@ func getOutputStreamingLogger() *zerolog.Logger {
return outputStreamingLogger return outputStreamingLogger
} }
// Removed unused NewAudioOutputStreamer function
// Removed unused AudioOutputStreamer.Start method
// Removed unused AudioOutputStreamer.Stop method
// Removed unused AudioOutputStreamer.streamLoop method
// Removed unused AudioOutputStreamer.processingLoop method
// Removed unused AudioOutputStreamer.statisticsLoop method
// Removed unused AudioOutputStreamer.reportStatistics method
// Removed all unused AudioOutputStreamer methods
// StartAudioOutputStreaming starts audio output streaming (capturing system audio) // StartAudioOutputStreaming starts audio output streaming (capturing system audio)
func StartAudioOutputStreaming(send func([]byte)) error { func StartAudioOutputStreaming(send func([]byte)) error {
if !atomic.CompareAndSwapInt32(&outputStreamingRunning, 0, 1) { if !atomic.CompareAndSwapInt32(&outputStreamingRunning, 0, 1) {
@ -84,6 +68,7 @@ func StartAudioOutputStreaming(send func([]byte)) error {
maxConsecutiveErrors := Config.MaxConsecutiveErrors maxConsecutiveErrors := Config.MaxConsecutiveErrors
errorBackoffDelay := Config.RetryDelay errorBackoffDelay := Config.RetryDelay
maxErrorBackoff := Config.MaxRetryDelay maxErrorBackoff := Config.MaxRetryDelay
var frameCount int64
for { for {
select { select {
@ -143,11 +128,25 @@ func StartAudioOutputStreaming(send func([]byte)) error {
} }
if n > 0 { if n > 0 {
frameCount++
// Get frame buffer from pool to reduce allocations // Get frame buffer from pool to reduce allocations
frame := GetAudioFrameBuffer() frame := GetAudioFrameBuffer()
frame = frame[:n] // Resize to actual frame size frame = frame[:n] // Resize to actual frame size
copy(frame, buffer[:n]) copy(frame, buffer[:n])
// Zero-cost trace logging for output frame processing
logger := getOutputStreamingLogger()
if logger.GetLevel() <= zerolog.TraceLevel {
if frameCount <= 5 || frameCount%1000 == 1 {
logger.Trace().
Int("frame_size", n).
Int("buffer_capacity", cap(frame)).
Int64("total_frames_sent", frameCount).
Msg("Audio output frame captured and buffered")
}
}
// Validate frame before sending // Validate frame before sending
if err := ValidateAudioFrame(frame); err != nil { if err := ValidateAudioFrame(frame); err != nil {
getOutputStreamingLogger().Warn().Err(err).Msg("Frame validation failed, dropping frame") getOutputStreamingLogger().Warn().Err(err).Msg("Frame validation failed, dropping frame")
@ -159,6 +158,16 @@ func StartAudioOutputStreaming(send func([]byte)) error {
// Return buffer to pool after sending // Return buffer to pool after sending
PutAudioFrameBuffer(frame) PutAudioFrameBuffer(frame)
RecordFrameReceived(n) RecordFrameReceived(n)
// Zero-cost trace logging for successful frame transmission
if logger.GetLevel() <= zerolog.TraceLevel {
if frameCount <= 5 || frameCount%1000 == 1 {
logger.Trace().
Int("frame_size", n).
Int64("total_frames_sent", frameCount).
Msg("Audio output frame sent successfully")
}
}
} }
// Small delay to prevent busy waiting // Small delay to prevent busy waiting
time.Sleep(Config.ShortSleepDuration) time.Sleep(Config.ShortSleepDuration)

View File

@ -382,10 +382,12 @@ func RecordFrameReceived(bytes int) {
// RecordFrameDropped increments the frames dropped counter with batched updates // RecordFrameDropped increments the frames dropped counter with batched updates
func RecordFrameDropped() { func RecordFrameDropped() {
atomic.AddUint64(&batchedFramesDropped, 1)
} }
// RecordConnectionDrop increments the connection drops counter with batched updates // RecordConnectionDrop increments the connection drops counter with batched updates
func RecordConnectionDrop() { func RecordConnectionDrop() {
atomic.AddUint64(&batchedConnectionDrops, 1)
} }
// flushBatchedMetrics flushes accumulated metrics to the main counters // flushBatchedMetrics flushes accumulated metrics to the main counters