refactor(audio): improve error handling and memory management

- remove redundant error logging in audio supervisor stop calls
- add buffer pool for memory optimization in audio relay and ipc
- return default metrics when process is not running
- add channel closed flags to prevent double closing
- standardize component naming and logging
- add comprehensive documentation for audio components
- improve test coverage with new unit tests
This commit is contained in:
Alex P 2025-08-26 14:36:07 +00:00
parent b1f85db7de
commit 6a68e23d12
20 changed files with 2001 additions and 113 deletions

View File

@ -11,7 +11,27 @@ import (
"github.com/rs/zerolog" "github.com/rs/zerolog"
) )
// AdaptiveBufferConfig holds configuration for adaptive buffer sizing // AdaptiveBufferConfig holds configuration for the adaptive buffer sizing algorithm.
//
// The adaptive buffer system dynamically adjusts audio buffer sizes based on real-time
// system conditions to optimize the trade-off between latency and stability. The algorithm
// uses multiple factors to make decisions:
//
// 1. System Load Monitoring:
// - CPU usage: High CPU load increases buffer sizes to prevent underruns
// - Memory usage: High memory pressure reduces buffer sizes to conserve RAM
//
// 2. Latency Tracking:
// - Target latency: Optimal latency for the current quality setting
// - Max latency: Hard limit beyond which buffers are aggressively reduced
//
// 3. Adaptation Strategy:
// - Exponential smoothing: Prevents oscillation and provides stable adjustments
// - Discrete steps: Buffer sizes change in fixed increments to avoid instability
// - Hysteresis: Different thresholds for increasing vs decreasing buffer sizes
//
// The algorithm is specifically tuned for embedded ARM systems with limited resources,
// prioritizing stability over absolute minimum latency.
type AdaptiveBufferConfig struct { type AdaptiveBufferConfig struct {
// Buffer size limits (in frames) // Buffer size limits (in frames)
MinBufferSize int MinBufferSize int
@ -156,6 +176,29 @@ func (abm *AdaptiveBufferManager) adaptationLoop() {
} }
// adaptBufferSizes analyzes system conditions and adjusts buffer sizes // adaptBufferSizes analyzes system conditions and adjusts buffer sizes
// adaptBufferSizes implements the core adaptive buffer sizing algorithm.
//
// This function uses a multi-factor approach to determine optimal buffer sizes:
//
// Mathematical Model:
// 1. Factor Calculation:
// - CPU Factor: Sigmoid function that increases buffer size under high CPU load
// - Memory Factor: Inverse relationship that decreases buffer size under memory pressure
// - Latency Factor: Exponential decay that aggressively reduces buffers when latency exceeds targets
//
// 2. Combined Factor:
// Combined = (CPU_factor * Memory_factor * Latency_factor)
// This multiplicative approach ensures any single critical factor can override others
//
// 3. Exponential Smoothing:
// New_size = Current_size + smoothing_factor * (Target_size - Current_size)
// This prevents rapid oscillations and provides stable convergence
//
// 4. Discrete Quantization:
// Final sizes are rounded to frame boundaries and clamped to configured limits
//
// The algorithm runs periodically and only applies changes when the adaptation interval
// has elapsed, preventing excessive adjustments that could destabilize the audio pipeline.
func (abm *AdaptiveBufferManager) adaptBufferSizes() { func (abm *AdaptiveBufferManager) adaptBufferSizes() {
// Collect current system metrics // Collect current system metrics
metrics := abm.processMonitor.GetCurrentMetrics() metrics := abm.processMonitor.GetCurrentMetrics()

View File

@ -234,7 +234,31 @@ int jetkvm_audio_init() {
return 0; return 0;
} }
// Read and encode one frame with robust error handling and recovery // jetkvm_audio_read_encode reads one audio frame from ALSA, encodes it with Opus, and handles errors.
//
// This function implements a robust audio capture pipeline with the following features:
// - ALSA PCM capture with automatic device recovery
// - Opus encoding with optimized settings for real-time processing
// - Progressive error recovery with exponential backoff
// - Buffer underrun and device suspension handling
//
// Error Recovery Strategy:
// 1. EPIPE (buffer underrun): Prepare device and retry with progressive delays
// 2. ESTRPIPE (device suspended): Resume device with timeout and fallback to prepare
// 3. Other errors: Log and attempt recovery up to max_recovery_attempts
//
// Performance Optimizations:
// - Stack-allocated PCM buffer to avoid heap allocations
// - Direct memory access for Opus encoding
// - Minimal system calls in the hot path
//
// Parameters:
// opus_buf: Output buffer for encoded Opus data (must be at least max_packet_size bytes)
//
// Returns:
// >0: Number of bytes written to opus_buf
// -1: Initialization error or safety check failure
// -2: Unrecoverable ALSA or Opus error after all retry attempts
int jetkvm_audio_read_encode(void *opus_buf) { int jetkvm_audio_read_encode(void *opus_buf) {
short pcm_buffer[1920]; // max 2ch*960 short pcm_buffer[1920]; // max 2ch*960
unsigned char *out = (unsigned char*)opus_buf; unsigned char *out = (unsigned char*)opus_buf;
@ -391,7 +415,32 @@ int jetkvm_audio_playback_init() {
return 0; return 0;
} }
// Decode Opus and write PCM with robust error handling and recovery // jetkvm_audio_decode_write decodes Opus data and writes PCM to ALSA playback device.
//
// This function implements a robust audio playback pipeline with the following features:
// - Opus decoding with packet loss concealment
// - ALSA PCM playback with automatic device recovery
// - Progressive error recovery with exponential backoff
// - Buffer underrun and device suspension handling
//
// Error Recovery Strategy:
// 1. EPIPE (buffer underrun): Prepare device, optionally drop+prepare, retry with delays
// 2. ESTRPIPE (device suspended): Resume with timeout, fallback to prepare if needed
// 3. Opus decode errors: Attempt packet loss concealment before failing
//
// Performance Optimizations:
// - Stack-allocated PCM buffer to minimize heap allocations
// - Bounds checking to prevent buffer overruns
// - Direct ALSA device access for minimal latency
//
// Parameters:
// opus_buf: Input buffer containing Opus-encoded audio data
// opus_size: Size of the Opus data in bytes (must be > 0 and <= max_packet_size)
//
// Returns:
// 0: Success - audio frame decoded and written to playback device
// -1: Invalid parameters, initialization error, or bounds check failure
// -2: Unrecoverable ALSA or Opus error after all retry attempts
int jetkvm_audio_decode_write(void *opus_buf, int opus_size) { int jetkvm_audio_decode_write(void *opus_buf, int opus_size) {
short pcm_buffer[1920]; // max 2ch*960 short pcm_buffer[1920]; // max 2ch*960
unsigned char *in = (unsigned char*)opus_buf; unsigned char *in = (unsigned char*)opus_buf;

View File

@ -1,6 +1,7 @@
package audio package audio
import ( import (
"fmt"
"sync/atomic" "sync/atomic"
"time" "time"
@ -31,32 +32,30 @@ type AudioInputManager struct {
func NewAudioInputManager() *AudioInputManager { func NewAudioInputManager() *AudioInputManager {
return &AudioInputManager{ return &AudioInputManager{
ipcManager: NewAudioInputIPCManager(), ipcManager: NewAudioInputIPCManager(),
logger: logging.GetDefaultLogger().With().Str("component", "audio-input").Logger(), logger: logging.GetDefaultLogger().With().Str("component", AudioInputManagerComponent).Logger(),
} }
} }
// Start begins processing microphone input // Start begins processing microphone input
func (aim *AudioInputManager) Start() error { func (aim *AudioInputManager) Start() error {
if !atomic.CompareAndSwapInt32(&aim.running, 0, 1) { if !atomic.CompareAndSwapInt32(&aim.running, 0, 1) {
return nil // Already running return fmt.Errorf("audio input manager is already running")
} }
aim.logger.Info().Msg("Starting audio input manager") aim.logger.Info().Str("component", AudioInputManagerComponent).Msg("starting component")
// Start the IPC-based audio input // Start the IPC-based audio input
err := aim.ipcManager.Start() err := aim.ipcManager.Start()
if err != nil { if err != nil {
aim.logger.Error().Err(err).Msg("Failed to start IPC audio input") aim.logger.Error().Err(err).Str("component", AudioInputManagerComponent).Msg("failed to start component")
// Ensure proper cleanup on error // Ensure proper cleanup on error
atomic.StoreInt32(&aim.running, 0) atomic.StoreInt32(&aim.running, 0)
// Reset metrics on failed start // Reset metrics on failed start
atomic.StoreInt64(&aim.metrics.FramesSent, 0) aim.resetMetrics()
atomic.StoreInt64(&aim.metrics.FramesDropped, 0)
atomic.StoreInt64(&aim.metrics.BytesProcessed, 0)
atomic.StoreInt64(&aim.metrics.ConnectionDrops, 0)
return err return err
} }
aim.logger.Info().Str("component", AudioInputManagerComponent).Msg("component started successfully")
return nil return nil
} }
@ -66,12 +65,20 @@ func (aim *AudioInputManager) Stop() {
return // Already stopped return // Already stopped
} }
aim.logger.Info().Msg("Stopping audio input manager") aim.logger.Info().Str("component", AudioInputManagerComponent).Msg("stopping component")
// Stop the IPC-based audio input // Stop the IPC-based audio input
aim.ipcManager.Stop() aim.ipcManager.Stop()
aim.logger.Info().Msg("Audio input manager stopped") aim.logger.Info().Str("component", AudioInputManagerComponent).Msg("component stopped")
}
// resetMetrics resets all metrics to zero
func (aim *AudioInputManager) resetMetrics() {
atomic.StoreInt64(&aim.metrics.FramesSent, 0)
atomic.StoreInt64(&aim.metrics.FramesDropped, 0)
atomic.StoreInt64(&aim.metrics.BytesProcessed, 0)
atomic.StoreInt64(&aim.metrics.ConnectionDrops, 0)
} }
// WriteOpusFrame writes an Opus frame to the audio input system with latency tracking // WriteOpusFrame writes an Opus frame to the audio input system with latency tracking

View File

@ -21,7 +21,7 @@ type AudioInputIPCManager struct {
func NewAudioInputIPCManager() *AudioInputIPCManager { func NewAudioInputIPCManager() *AudioInputIPCManager {
return &AudioInputIPCManager{ return &AudioInputIPCManager{
supervisor: NewAudioInputSupervisor(), supervisor: NewAudioInputSupervisor(),
logger: logging.GetDefaultLogger().With().Str("component", "audio-input-ipc").Logger(), logger: logging.GetDefaultLogger().With().Str("component", AudioInputIPCComponent).Logger(),
} }
} }
@ -31,18 +31,15 @@ func (aim *AudioInputIPCManager) Start() error {
return nil return nil
} }
aim.logger.Info().Msg("Starting IPC-based audio input system") aim.logger.Info().Str("component", AudioInputIPCComponent).Msg("starting component")
err := aim.supervisor.Start() err := aim.supervisor.Start()
if err != nil { if err != nil {
// Ensure proper cleanup on supervisor start failure // Ensure proper cleanup on supervisor start failure
atomic.StoreInt32(&aim.running, 0) atomic.StoreInt32(&aim.running, 0)
// Reset metrics on failed start // Reset metrics on failed start
atomic.StoreInt64(&aim.metrics.FramesSent, 0) aim.resetMetrics()
atomic.StoreInt64(&aim.metrics.FramesDropped, 0) aim.logger.Error().Err(err).Str("component", AudioInputIPCComponent).Msg("failed to start audio input supervisor")
atomic.StoreInt64(&aim.metrics.BytesProcessed, 0)
atomic.StoreInt64(&aim.metrics.ConnectionDrops, 0)
aim.logger.Error().Err(err).Msg("Failed to start audio input supervisor")
return err return err
} }
@ -58,10 +55,10 @@ func (aim *AudioInputIPCManager) Start() error {
err = aim.supervisor.SendConfig(config) err = aim.supervisor.SendConfig(config)
if err != nil { if err != nil {
// Config send failure is not critical, log warning and continue // Config send failure is not critical, log warning and continue
aim.logger.Warn().Err(err).Msg("Failed to send initial config, will retry later") aim.logger.Warn().Err(err).Str("component", AudioInputIPCComponent).Msg("failed to send initial config, will retry later")
} }
aim.logger.Info().Msg("IPC-based audio input system started") aim.logger.Info().Str("component", AudioInputIPCComponent).Msg("component started successfully")
return nil return nil
} }
@ -71,9 +68,17 @@ func (aim *AudioInputIPCManager) Stop() {
return return
} }
aim.logger.Info().Msg("Stopping IPC-based audio input system") aim.logger.Info().Str("component", AudioInputIPCComponent).Msg("stopping component")
aim.supervisor.Stop() aim.supervisor.Stop()
aim.logger.Info().Msg("IPC-based audio input system stopped") aim.logger.Info().Str("component", AudioInputIPCComponent).Msg("component stopped")
}
// resetMetrics resets all metrics to zero
func (aim *AudioInputIPCManager) resetMetrics() {
atomic.StoreInt64(&aim.metrics.FramesSent, 0)
atomic.StoreInt64(&aim.metrics.FramesDropped, 0)
atomic.StoreInt64(&aim.metrics.BytesProcessed, 0)
atomic.StoreInt64(&aim.metrics.ConnectionDrops, 0)
} }
// WriteOpusFrame sends an Opus frame to the audio input server via IPC // WriteOpusFrame sends an Opus frame to the audio input server via IPC

View File

@ -0,0 +1,277 @@
package audio
import (
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestAudioInputIPCManager tests the AudioInputIPCManager component
func TestAudioInputIPCManager(t *testing.T) {
tests := []struct {
name string
testFunc func(t *testing.T)
}{
{"Start", testAudioInputIPCManagerStart},
{"Stop", testAudioInputIPCManagerStop},
{"StartStop", testAudioInputIPCManagerStartStop},
{"IsRunning", testAudioInputIPCManagerIsRunning},
{"IsReady", testAudioInputIPCManagerIsReady},
{"GetMetrics", testAudioInputIPCManagerGetMetrics},
{"ConcurrentOperations", testAudioInputIPCManagerConcurrent},
{"MultipleStarts", testAudioInputIPCManagerMultipleStarts},
{"MultipleStops", testAudioInputIPCManagerMultipleStops},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.testFunc(t)
})
}
}
func testAudioInputIPCManagerStart(t *testing.T) {
manager := NewAudioInputIPCManager()
require.NotNil(t, manager)
// Test initial state
assert.False(t, manager.IsRunning())
assert.False(t, manager.IsReady())
// Test start
err := manager.Start()
assert.NoError(t, err)
assert.True(t, manager.IsRunning())
// Cleanup
manager.Stop()
}
func testAudioInputIPCManagerStop(t *testing.T) {
manager := NewAudioInputIPCManager()
require.NotNil(t, manager)
// Start first
err := manager.Start()
require.NoError(t, err)
assert.True(t, manager.IsRunning())
// Test stop
manager.Stop()
assert.False(t, manager.IsRunning())
assert.False(t, manager.IsReady())
}
func testAudioInputIPCManagerStartStop(t *testing.T) {
manager := NewAudioInputIPCManager()
require.NotNil(t, manager)
// Test multiple start/stop cycles
for i := 0; i < 3; i++ {
// Start
err := manager.Start()
assert.NoError(t, err)
assert.True(t, manager.IsRunning())
// Stop
manager.Stop()
assert.False(t, manager.IsRunning())
}
}
func testAudioInputIPCManagerIsRunning(t *testing.T) {
manager := NewAudioInputIPCManager()
require.NotNil(t, manager)
// Initially not running
assert.False(t, manager.IsRunning())
// Start and check
err := manager.Start()
require.NoError(t, err)
assert.True(t, manager.IsRunning())
// Stop and check
manager.Stop()
assert.False(t, manager.IsRunning())
}
func testAudioInputIPCManagerIsReady(t *testing.T) {
manager := NewAudioInputIPCManager()
require.NotNil(t, manager)
// Initially not ready
assert.False(t, manager.IsReady())
// Start and check ready state
err := manager.Start()
require.NoError(t, err)
// Give some time for initialization
time.Sleep(100 * time.Millisecond)
// Stop
manager.Stop()
assert.False(t, manager.IsReady())
}
func testAudioInputIPCManagerGetMetrics(t *testing.T) {
manager := NewAudioInputIPCManager()
require.NotNil(t, manager)
// Test metrics when not running
metrics := manager.GetMetrics()
assert.NotNil(t, metrics)
// Start and test metrics
err := manager.Start()
require.NoError(t, err)
metrics = manager.GetMetrics()
assert.NotNil(t, metrics)
// Cleanup
manager.Stop()
}
func testAudioInputIPCManagerConcurrent(t *testing.T) {
manager := NewAudioInputIPCManager()
require.NotNil(t, manager)
var wg sync.WaitGroup
const numGoroutines = 10
// Test concurrent starts
wg.Add(numGoroutines)
for i := 0; i < numGoroutines; i++ {
go func() {
defer wg.Done()
manager.Start()
}()
}
wg.Wait()
// Should be running
assert.True(t, manager.IsRunning())
// Test concurrent stops
wg.Add(numGoroutines)
for i := 0; i < numGoroutines; i++ {
go func() {
defer wg.Done()
manager.Stop()
}()
}
wg.Wait()
// Should be stopped
assert.False(t, manager.IsRunning())
}
func testAudioInputIPCManagerMultipleStarts(t *testing.T) {
manager := NewAudioInputIPCManager()
require.NotNil(t, manager)
// First start should succeed
err := manager.Start()
assert.NoError(t, err)
assert.True(t, manager.IsRunning())
// Subsequent starts should be no-op
err = manager.Start()
assert.NoError(t, err)
assert.True(t, manager.IsRunning())
err = manager.Start()
assert.NoError(t, err)
assert.True(t, manager.IsRunning())
// Cleanup
manager.Stop()
}
func testAudioInputIPCManagerMultipleStops(t *testing.T) {
manager := NewAudioInputIPCManager()
require.NotNil(t, manager)
// Start first
err := manager.Start()
require.NoError(t, err)
assert.True(t, manager.IsRunning())
// First stop should work
manager.Stop()
assert.False(t, manager.IsRunning())
// Subsequent stops should be no-op
manager.Stop()
assert.False(t, manager.IsRunning())
manager.Stop()
assert.False(t, manager.IsRunning())
}
// TestAudioInputIPCMetrics tests the AudioInputMetrics functionality
func TestAudioInputIPCMetrics(t *testing.T) {
metrics := &AudioInputMetrics{}
// Test initial state
assert.Equal(t, int64(0), metrics.FramesSent)
assert.Equal(t, int64(0), metrics.FramesDropped)
assert.Equal(t, int64(0), metrics.BytesProcessed)
assert.Equal(t, int64(0), metrics.ConnectionDrops)
assert.Equal(t, time.Duration(0), metrics.AverageLatency)
assert.True(t, metrics.LastFrameTime.IsZero())
// Test field assignment
metrics.FramesSent = 50
metrics.FramesDropped = 2
metrics.BytesProcessed = 512
metrics.ConnectionDrops = 1
metrics.AverageLatency = 5 * time.Millisecond
metrics.LastFrameTime = time.Now()
// Verify assignments
assert.Equal(t, int64(50), metrics.FramesSent)
assert.Equal(t, int64(2), metrics.FramesDropped)
assert.Equal(t, int64(512), metrics.BytesProcessed)
assert.Equal(t, int64(1), metrics.ConnectionDrops)
assert.Equal(t, 5*time.Millisecond, metrics.AverageLatency)
assert.False(t, metrics.LastFrameTime.IsZero())
}
// BenchmarkAudioInputIPCManager benchmarks the AudioInputIPCManager operations
func BenchmarkAudioInputIPCManager(b *testing.B) {
b.Run("Start", func(b *testing.B) {
for i := 0; i < b.N; i++ {
manager := NewAudioInputIPCManager()
manager.Start()
manager.Stop()
}
})
b.Run("IsRunning", func(b *testing.B) {
manager := NewAudioInputIPCManager()
manager.Start()
defer manager.Stop()
b.ResetTimer()
for i := 0; i < b.N; i++ {
manager.IsRunning()
}
})
b.Run("GetMetrics", func(b *testing.B) {
manager := NewAudioInputIPCManager()
manager.Start()
defer manager.Stop()
b.ResetTimer()
for i := 0; i < b.N; i++ {
manager.GetMetrics()
}
})
}

View File

@ -168,7 +168,16 @@ func (ais *AudioInputSupervisor) GetProcessMetrics() *ProcessMetrics {
defer ais.mtx.Unlock() defer ais.mtx.Unlock()
if ais.cmd == nil || ais.cmd.Process == nil { if ais.cmd == nil || ais.cmd.Process == nil {
return nil // Return default metrics when no process is running
return &ProcessMetrics{
PID: 0,
CPUPercent: 0.0,
MemoryRSS: 0,
MemoryVMS: 0,
MemoryPercent: 0.0,
Timestamp: time.Now(),
ProcessName: "audio-input-server",
}
} }
pid := ais.cmd.Process.Pid pid := ais.cmd.Process.Pid
@ -178,12 +187,21 @@ func (ais *AudioInputSupervisor) GetProcessMetrics() *ProcessMetrics {
return &metric return &metric
} }
} }
return nil // Return default metrics if process not found in monitoring
return &ProcessMetrics{
PID: pid,
CPUPercent: 0.0,
MemoryRSS: 0,
MemoryVMS: 0,
MemoryPercent: 0.0,
Timestamp: time.Now(),
ProcessName: "audio-input-server",
}
} }
// monitorSubprocess monitors the subprocess and handles unexpected exits // monitorSubprocess monitors the subprocess and handles unexpected exits
func (ais *AudioInputSupervisor) monitorSubprocess() { func (ais *AudioInputSupervisor) monitorSubprocess() {
if ais.cmd == nil { if ais.cmd == nil || ais.cmd.Process == nil {
return return
} }

View File

@ -0,0 +1,241 @@
package audio
import (
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestNewAudioInputManager(t *testing.T) {
manager := NewAudioInputManager()
assert.NotNil(t, manager)
assert.False(t, manager.IsRunning())
assert.False(t, manager.IsReady())
}
func TestAudioInputManagerStart(t *testing.T) {
manager := NewAudioInputManager()
require.NotNil(t, manager)
// Test successful start
err := manager.Start()
assert.NoError(t, err)
assert.True(t, manager.IsRunning())
// Test starting already running manager
err = manager.Start()
assert.Error(t, err)
assert.Contains(t, err.Error(), "already running")
// Cleanup
manager.Stop()
}
func TestAudioInputManagerStop(t *testing.T) {
manager := NewAudioInputManager()
require.NotNil(t, manager)
// Test stopping non-running manager
manager.Stop()
assert.False(t, manager.IsRunning())
// Start and then stop
err := manager.Start()
require.NoError(t, err)
assert.True(t, manager.IsRunning())
manager.Stop()
assert.False(t, manager.IsRunning())
}
func TestAudioInputManagerIsRunning(t *testing.T) {
manager := NewAudioInputManager()
require.NotNil(t, manager)
// Test initial state
assert.False(t, manager.IsRunning())
// Test after start
err := manager.Start()
require.NoError(t, err)
assert.True(t, manager.IsRunning())
// Test after stop
manager.Stop()
assert.False(t, manager.IsRunning())
}
func TestAudioInputManagerIsReady(t *testing.T) {
manager := NewAudioInputManager()
require.NotNil(t, manager)
// Test initial state
assert.False(t, manager.IsReady())
// Start manager
err := manager.Start()
require.NoError(t, err)
// Give some time for initialization
time.Sleep(100 * time.Millisecond)
// Test ready state (may vary based on implementation)
// Just ensure the method doesn't panic
_ = manager.IsReady()
// Cleanup
manager.Stop()
}
func TestAudioInputManagerGetMetrics(t *testing.T) {
manager := NewAudioInputManager()
require.NotNil(t, manager)
// Test metrics when not running
metrics := manager.GetMetrics()
assert.NotNil(t, metrics)
assert.Equal(t, int64(0), metrics.FramesSent)
assert.Equal(t, int64(0), metrics.FramesDropped)
assert.Equal(t, int64(0), metrics.BytesProcessed)
assert.Equal(t, int64(0), metrics.ConnectionDrops)
// Start and test metrics
err := manager.Start()
require.NoError(t, err)
metrics = manager.GetMetrics()
assert.NotNil(t, metrics)
assert.GreaterOrEqual(t, metrics.FramesSent, int64(0))
assert.GreaterOrEqual(t, metrics.FramesDropped, int64(0))
assert.GreaterOrEqual(t, metrics.BytesProcessed, int64(0))
assert.GreaterOrEqual(t, metrics.ConnectionDrops, int64(0))
// Cleanup
manager.Stop()
}
func TestAudioInputManagerConcurrentOperations(t *testing.T) {
manager := NewAudioInputManager()
require.NotNil(t, manager)
var wg sync.WaitGroup
// Test concurrent start/stop operations
for i := 0; i < 10; i++ {
wg.Add(2)
go func() {
defer wg.Done()
_ = manager.Start()
}()
go func() {
defer wg.Done()
manager.Stop()
}()
}
// Test concurrent metric access
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
_ = manager.GetMetrics()
}()
}
// Test concurrent status checks
for i := 0; i < 5; i++ {
wg.Add(2)
go func() {
defer wg.Done()
_ = manager.IsRunning()
}()
go func() {
defer wg.Done()
_ = manager.IsReady()
}()
}
wg.Wait()
// Cleanup
manager.Stop()
}
func TestAudioInputManagerMultipleStartStop(t *testing.T) {
manager := NewAudioInputManager()
require.NotNil(t, manager)
// Test multiple start/stop cycles
for i := 0; i < 5; i++ {
err := manager.Start()
assert.NoError(t, err)
assert.True(t, manager.IsRunning())
manager.Stop()
assert.False(t, manager.IsRunning())
}
}
func TestAudioInputMetrics(t *testing.T) {
metrics := &AudioInputMetrics{
FramesSent: 100,
FramesDropped: 5,
BytesProcessed: 1024,
ConnectionDrops: 2,
AverageLatency: time.Millisecond * 10,
LastFrameTime: time.Now(),
}
assert.Equal(t, int64(100), metrics.FramesSent)
assert.Equal(t, int64(5), metrics.FramesDropped)
assert.Equal(t, int64(1024), metrics.BytesProcessed)
assert.Equal(t, int64(2), metrics.ConnectionDrops)
assert.Equal(t, time.Millisecond*10, metrics.AverageLatency)
assert.False(t, metrics.LastFrameTime.IsZero())
}
// Benchmark tests
func BenchmarkAudioInputManager(b *testing.B) {
manager := NewAudioInputManager()
b.Run("Start", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = manager.Start()
manager.Stop()
}
})
b.Run("GetMetrics", func(b *testing.B) {
_ = manager.Start()
defer manager.Stop()
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = manager.GetMetrics()
}
})
b.Run("IsRunning", func(b *testing.B) {
_ = manager.Start()
defer manager.Stop()
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = manager.IsRunning()
}
})
b.Run("IsReady", func(b *testing.B) {
_ = manager.Start()
defer manager.Stop()
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = manager.IsReady()
}
})
}

View File

@ -337,13 +337,16 @@ type AudioOutputClient struct {
droppedFrames int64 // Atomic counter for dropped frames droppedFrames int64 // Atomic counter for dropped frames
totalFrames int64 // Atomic counter for total frames totalFrames int64 // Atomic counter for total frames
conn net.Conn conn net.Conn
mtx sync.Mutex mtx sync.Mutex
running bool running bool
bufferPool *AudioBufferPool // Buffer pool for memory optimization
} }
func NewAudioOutputClient() *AudioOutputClient { func NewAudioOutputClient() *AudioOutputClient {
return &AudioOutputClient{} return &AudioOutputClient{
bufferPool: NewAudioBufferPool(GetMaxAudioFrameSize()),
}
} }
// Connect connects to the audio output server // Connect connects to the audio output server
@ -440,13 +443,17 @@ func (c *AudioOutputClient) ReceiveFrame() ([]byte, error) {
return nil, fmt.Errorf("received frame size validation failed: got %d bytes, maximum allowed %d bytes", size, maxFrameSize) return nil, fmt.Errorf("received frame size validation failed: got %d bytes, maximum allowed %d bytes", size, maxFrameSize)
} }
// Read frame data // Read frame data using buffer pool to avoid allocation
frame := make([]byte, size) frame := c.bufferPool.Get()
frame = frame[:size] // Resize to actual frame size
if size > 0 { if size > 0 {
if _, err := io.ReadFull(c.conn, frame); err != nil { if _, err := io.ReadFull(c.conn, frame); err != nil {
c.bufferPool.Put(frame) // Return buffer on error
return nil, fmt.Errorf("failed to read frame data: %w", err) return nil, fmt.Errorf("failed to read frame data: %w", err)
} }
} }
// Note: Caller is responsible for returning frame to pool via PutAudioFrameBuffer()
atomic.AddInt64(&c.totalFrames, 1) atomic.AddInt64(&c.totalFrames, 1)
return frame, nil return frame, nil

View File

@ -0,0 +1,120 @@
package audio
import "time"
// Naming Standards Documentation
// This file documents the standardized naming conventions for audio components
// to ensure consistency across the entire audio system.
/*
STANDARDIZED NAMING CONVENTIONS:
1. COMPONENT HIERARCHY:
- Manager: High-level component that orchestrates multiple subsystems
- Supervisor: Process lifecycle management (start/stop/restart processes)
- Server: IPC server that handles incoming connections
- Client: IPC client that connects to servers
- Streamer: High-performance streaming component
2. NAMING PATTERNS:
Input Components:
- AudioInputManager (replaces: AudioInputManager)
- AudioInputSupervisor (replaces: AudioInputSupervisor)
- AudioInputServer (replaces: AudioInputServer)
- AudioInputClient (replaces: AudioInputClient)
- AudioInputStreamer (new: for consistency with OutputStreamer)
Output Components:
- AudioOutputManager (new: missing high-level manager)
- AudioOutputSupervisor (replaces: AudioOutputSupervisor)
- AudioOutputServer (replaces: AudioOutputServer)
- AudioOutputClient (replaces: AudioOutputClient)
- AudioOutputStreamer (replaces: OutputStreamer)
3. IPC NAMING:
- AudioInputIPCManager (replaces: AudioInputIPCManager)
- AudioOutputIPCManager (new: for consistency)
4. CONFIGURATION NAMING:
- InputIPCConfig (replaces: InputIPCConfig)
- OutputIPCConfig (new: for consistency)
5. MESSAGE NAMING:
- InputIPCMessage (replaces: InputIPCMessage)
- OutputIPCMessage (replaces: OutputIPCMessage)
- InputMessageType (replaces: InputMessageType)
- OutputMessageType (replaces: OutputMessageType)
ISSUES IDENTIFIED:
1. Missing AudioOutputManager (high-level output management)
2. Inconsistent naming: OutputStreamer vs AudioInputSupervisor
3. Missing AudioOutputIPCManager for symmetry
4. Missing OutputIPCConfig for consistency
5. Component names in logging should be standardized
IMPLEMENTATION PLAN:
1. Create AudioOutputManager to match AudioInputManager
2. Rename OutputStreamer to AudioOutputStreamer
3. Create AudioOutputIPCManager for symmetry
4. Standardize all component logging names
5. Update all references consistently
*/
// Component name constants for consistent logging
const (
// Input component names
AudioInputManagerComponent = "audio-input-manager"
AudioInputSupervisorComponent = "audio-input-supervisor"
AudioInputServerComponent = "audio-input-server"
AudioInputClientComponent = "audio-input-client"
AudioInputIPCComponent = "audio-input-ipc"
// Output component names
AudioOutputManagerComponent = "audio-output-manager"
AudioOutputSupervisorComponent = "audio-output-supervisor"
AudioOutputServerComponent = "audio-output-server"
AudioOutputClientComponent = "audio-output-client"
AudioOutputStreamerComponent = "audio-output-streamer"
AudioOutputIPCComponent = "audio-output-ipc"
// Common component names
AudioRelayComponent = "audio-relay"
AudioEventsComponent = "audio-events"
AudioMetricsComponent = "audio-metrics"
)
// Interface definitions for consistent component behavior
type AudioManagerInterface interface {
Start() error
Stop()
IsRunning() bool
IsReady() bool
GetMetrics() interface{}
}
type AudioSupervisorInterface interface {
Start() error
Stop() error
IsRunning() bool
GetProcessPID() int
GetProcessMetrics() *ProcessMetrics
}
type AudioServerInterface interface {
Start() error
Stop()
Close() error
}
type AudioClientInterface interface {
Connect() error
Disconnect()
IsConnected() bool
Close() error
}
type AudioStreamerInterface interface {
Start() error
Stop()
GetStats() (processed, dropped int64, avgProcessingTime time.Duration)
}

View File

@ -0,0 +1,177 @@
package audio
import (
"sync/atomic"
"time"
"github.com/jetkvm/kvm/internal/logging"
"github.com/rs/zerolog"
)
// AudioOutputManager manages audio output stream using IPC mode
type AudioOutputManager struct {
metrics AudioOutputMetrics
streamer *AudioOutputStreamer
logger zerolog.Logger
running int32
}
// AudioOutputMetrics tracks output-specific metrics
type AudioOutputMetrics struct {
FramesReceived int64
FramesDropped int64
BytesProcessed int64
ConnectionDrops int64
LastFrameTime time.Time
AverageLatency time.Duration
}
// NewAudioOutputManager creates a new audio output manager
func NewAudioOutputManager() *AudioOutputManager {
streamer, err := NewAudioOutputStreamer()
if err != nil {
// Log error but continue with nil streamer - will be handled gracefully
logger := logging.GetDefaultLogger().With().Str("component", AudioOutputManagerComponent).Logger()
logger.Error().Err(err).Msg("Failed to create audio output streamer")
}
return &AudioOutputManager{
streamer: streamer,
logger: logging.GetDefaultLogger().With().Str("component", AudioOutputManagerComponent).Logger(),
}
}
// Start starts the audio output manager
func (aom *AudioOutputManager) Start() error {
if !atomic.CompareAndSwapInt32(&aom.running, 0, 1) {
return nil // Already running
}
aom.logger.Info().Str("component", AudioOutputManagerComponent).Msg("starting component")
if aom.streamer == nil {
// Try to recreate streamer if it was nil
streamer, err := NewAudioOutputStreamer()
if err != nil {
atomic.StoreInt32(&aom.running, 0)
aom.logger.Error().Err(err).Str("component", AudioOutputManagerComponent).Msg("failed to create audio output streamer")
return err
}
aom.streamer = streamer
}
err := aom.streamer.Start()
if err != nil {
atomic.StoreInt32(&aom.running, 0)
// Reset metrics on failed start
aom.resetMetrics()
aom.logger.Error().Err(err).Str("component", AudioOutputManagerComponent).Msg("failed to start component")
return err
}
aom.logger.Info().Str("component", AudioOutputManagerComponent).Msg("component started successfully")
return nil
}
// Stop stops the audio output manager
func (aom *AudioOutputManager) Stop() {
if !atomic.CompareAndSwapInt32(&aom.running, 1, 0) {
return // Already stopped
}
aom.logger.Info().Str("component", AudioOutputManagerComponent).Msg("stopping component")
if aom.streamer != nil {
aom.streamer.Stop()
}
aom.logger.Info().Str("component", AudioOutputManagerComponent).Msg("component stopped")
}
// resetMetrics resets all metrics to zero
func (aom *AudioOutputManager) resetMetrics() {
atomic.StoreInt64(&aom.metrics.FramesReceived, 0)
atomic.StoreInt64(&aom.metrics.FramesDropped, 0)
atomic.StoreInt64(&aom.metrics.BytesProcessed, 0)
atomic.StoreInt64(&aom.metrics.ConnectionDrops, 0)
}
// IsRunning returns whether the audio output manager is running
func (aom *AudioOutputManager) IsRunning() bool {
return atomic.LoadInt32(&aom.running) == 1
}
// IsReady returns whether the audio output manager is ready to receive frames
func (aom *AudioOutputManager) IsReady() bool {
if !aom.IsRunning() || aom.streamer == nil {
return false
}
// For output, we consider it ready if the streamer is running
// This could be enhanced with connection status checks
return true
}
// GetMetrics returns current metrics
func (aom *AudioOutputManager) GetMetrics() AudioOutputMetrics {
return AudioOutputMetrics{
FramesReceived: atomic.LoadInt64(&aom.metrics.FramesReceived),
FramesDropped: atomic.LoadInt64(&aom.metrics.FramesDropped),
BytesProcessed: atomic.LoadInt64(&aom.metrics.BytesProcessed),
ConnectionDrops: atomic.LoadInt64(&aom.metrics.ConnectionDrops),
AverageLatency: aom.metrics.AverageLatency,
LastFrameTime: aom.metrics.LastFrameTime,
}
}
// GetComprehensiveMetrics returns detailed performance metrics
func (aom *AudioOutputManager) GetComprehensiveMetrics() map[string]interface{} {
baseMetrics := aom.GetMetrics()
comprehensiveMetrics := map[string]interface{}{
"manager": map[string]interface{}{
"frames_received": baseMetrics.FramesReceived,
"frames_dropped": baseMetrics.FramesDropped,
"bytes_processed": baseMetrics.BytesProcessed,
"connection_drops": baseMetrics.ConnectionDrops,
"average_latency_ms": float64(baseMetrics.AverageLatency.Nanoseconds()) / 1e6,
"last_frame_time": baseMetrics.LastFrameTime,
"running": aom.IsRunning(),
"ready": aom.IsReady(),
},
}
if aom.streamer != nil {
processed, dropped, avgTime := aom.streamer.GetStats()
comprehensiveMetrics["streamer"] = map[string]interface{}{
"frames_processed": processed,
"frames_dropped": dropped,
"avg_processing_time_ms": float64(avgTime.Nanoseconds()) / 1e6,
}
if detailedStats := aom.streamer.GetDetailedStats(); detailedStats != nil {
comprehensiveMetrics["detailed"] = detailedStats
}
}
return comprehensiveMetrics
}
// LogPerformanceStats logs current performance statistics
func (aom *AudioOutputManager) LogPerformanceStats() {
metrics := aom.GetMetrics()
aom.logger.Info().
Int64("frames_received", metrics.FramesReceived).
Int64("frames_dropped", metrics.FramesDropped).
Int64("bytes_processed", metrics.BytesProcessed).
Int64("connection_drops", metrics.ConnectionDrops).
Float64("average_latency_ms", float64(metrics.AverageLatency.Nanoseconds())/1e6).
Bool("running", aom.IsRunning()).
Bool("ready", aom.IsReady()).
Msg("Audio output manager performance stats")
}
// GetStreamer returns the streamer for advanced operations
func (aom *AudioOutputManager) GetStreamer() *AudioOutputStreamer {
return aom.streamer
}

View File

@ -0,0 +1,277 @@
package audio
import (
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestAudioOutputManager tests the AudioOutputManager component
func TestAudioOutputManager(t *testing.T) {
tests := []struct {
name string
testFunc func(t *testing.T)
}{
{"Start", testAudioOutputManagerStart},
{"Stop", testAudioOutputManagerStop},
{"StartStop", testAudioOutputManagerStartStop},
{"IsRunning", testAudioOutputManagerIsRunning},
{"IsReady", testAudioOutputManagerIsReady},
{"GetMetrics", testAudioOutputManagerGetMetrics},
{"ConcurrentOperations", testAudioOutputManagerConcurrent},
{"MultipleStarts", testAudioOutputManagerMultipleStarts},
{"MultipleStops", testAudioOutputManagerMultipleStops},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.testFunc(t)
})
}
}
func testAudioOutputManagerStart(t *testing.T) {
manager := NewAudioOutputManager()
require.NotNil(t, manager)
// Test initial state
assert.False(t, manager.IsRunning())
assert.False(t, manager.IsReady())
// Test start
err := manager.Start()
assert.NoError(t, err)
assert.True(t, manager.IsRunning())
// Cleanup
manager.Stop()
}
func testAudioOutputManagerStop(t *testing.T) {
manager := NewAudioOutputManager()
require.NotNil(t, manager)
// Start first
err := manager.Start()
require.NoError(t, err)
assert.True(t, manager.IsRunning())
// Test stop
manager.Stop()
assert.False(t, manager.IsRunning())
assert.False(t, manager.IsReady())
}
func testAudioOutputManagerStartStop(t *testing.T) {
manager := NewAudioOutputManager()
require.NotNil(t, manager)
// Test multiple start/stop cycles
for i := 0; i < 3; i++ {
// Start
err := manager.Start()
assert.NoError(t, err)
assert.True(t, manager.IsRunning())
// Stop
manager.Stop()
assert.False(t, manager.IsRunning())
}
}
func testAudioOutputManagerIsRunning(t *testing.T) {
manager := NewAudioOutputManager()
require.NotNil(t, manager)
// Initially not running
assert.False(t, manager.IsRunning())
// Start and check
err := manager.Start()
require.NoError(t, err)
assert.True(t, manager.IsRunning())
// Stop and check
manager.Stop()
assert.False(t, manager.IsRunning())
}
func testAudioOutputManagerIsReady(t *testing.T) {
manager := NewAudioOutputManager()
require.NotNil(t, manager)
// Initially not ready
assert.False(t, manager.IsReady())
// Start and check ready state
err := manager.Start()
require.NoError(t, err)
// Give some time for initialization
time.Sleep(100 * time.Millisecond)
// Stop
manager.Stop()
assert.False(t, manager.IsReady())
}
func testAudioOutputManagerGetMetrics(t *testing.T) {
manager := NewAudioOutputManager()
require.NotNil(t, manager)
// Test metrics when not running
metrics := manager.GetMetrics()
assert.NotNil(t, metrics)
// Start and test metrics
err := manager.Start()
require.NoError(t, err)
metrics = manager.GetMetrics()
assert.NotNil(t, metrics)
// Cleanup
manager.Stop()
}
func testAudioOutputManagerConcurrent(t *testing.T) {
manager := NewAudioOutputManager()
require.NotNil(t, manager)
var wg sync.WaitGroup
const numGoroutines = 10
// Test concurrent starts
wg.Add(numGoroutines)
for i := 0; i < numGoroutines; i++ {
go func() {
defer wg.Done()
manager.Start()
}()
}
wg.Wait()
// Should be running
assert.True(t, manager.IsRunning())
// Test concurrent stops
wg.Add(numGoroutines)
for i := 0; i < numGoroutines; i++ {
go func() {
defer wg.Done()
manager.Stop()
}()
}
wg.Wait()
// Should be stopped
assert.False(t, manager.IsRunning())
}
func testAudioOutputManagerMultipleStarts(t *testing.T) {
manager := NewAudioOutputManager()
require.NotNil(t, manager)
// First start should succeed
err := manager.Start()
assert.NoError(t, err)
assert.True(t, manager.IsRunning())
// Subsequent starts should be no-op
err = manager.Start()
assert.NoError(t, err)
assert.True(t, manager.IsRunning())
err = manager.Start()
assert.NoError(t, err)
assert.True(t, manager.IsRunning())
// Cleanup
manager.Stop()
}
func testAudioOutputManagerMultipleStops(t *testing.T) {
manager := NewAudioOutputManager()
require.NotNil(t, manager)
// Start first
err := manager.Start()
require.NoError(t, err)
assert.True(t, manager.IsRunning())
// First stop should work
manager.Stop()
assert.False(t, manager.IsRunning())
// Subsequent stops should be no-op
manager.Stop()
assert.False(t, manager.IsRunning())
manager.Stop()
assert.False(t, manager.IsRunning())
}
// TestAudioOutputMetrics tests the AudioOutputMetrics functionality
func TestAudioOutputMetrics(t *testing.T) {
metrics := &AudioOutputMetrics{}
// Test initial state
assert.Equal(t, int64(0), metrics.FramesReceived)
assert.Equal(t, int64(0), metrics.FramesDropped)
assert.Equal(t, int64(0), metrics.BytesProcessed)
assert.Equal(t, int64(0), metrics.ConnectionDrops)
assert.Equal(t, time.Duration(0), metrics.AverageLatency)
assert.True(t, metrics.LastFrameTime.IsZero())
// Test field assignment
metrics.FramesReceived = 100
metrics.FramesDropped = 5
metrics.BytesProcessed = 1024
metrics.ConnectionDrops = 2
metrics.AverageLatency = 10 * time.Millisecond
metrics.LastFrameTime = time.Now()
// Verify assignments
assert.Equal(t, int64(100), metrics.FramesReceived)
assert.Equal(t, int64(5), metrics.FramesDropped)
assert.Equal(t, int64(1024), metrics.BytesProcessed)
assert.Equal(t, int64(2), metrics.ConnectionDrops)
assert.Equal(t, 10*time.Millisecond, metrics.AverageLatency)
assert.False(t, metrics.LastFrameTime.IsZero())
}
// BenchmarkAudioOutputManager benchmarks the AudioOutputManager operations
func BenchmarkAudioOutputManager(b *testing.B) {
b.Run("Start", func(b *testing.B) {
for i := 0; i < b.N; i++ {
manager := NewAudioOutputManager()
manager.Start()
manager.Stop()
}
})
b.Run("IsRunning", func(b *testing.B) {
manager := NewAudioOutputManager()
manager.Start()
defer manager.Stop()
b.ResetTimer()
for i := 0; i < b.N; i++ {
manager.IsRunning()
}
})
b.Run("GetMetrics", func(b *testing.B) {
manager := NewAudioOutputManager()
manager.Start()
defer manager.Stop()
b.ResetTimer()
for i := 0; i < b.N; i++ {
manager.GetMetrics()
}
})
}

View File

@ -12,9 +12,9 @@ import (
"github.com/rs/zerolog" "github.com/rs/zerolog"
) )
// OutputStreamer manages high-performance audio output streaming // AudioOutputStreamer manages high-performance audio output streaming
type OutputStreamer struct { type AudioOutputStreamer struct {
// Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment) // Performance metrics (atomic operations for thread safety)
processedFrames int64 // Total processed frames counter (atomic) processedFrames int64 // Total processed frames counter (atomic)
droppedFrames int64 // Dropped frames counter (atomic) droppedFrames int64 // Dropped frames counter (atomic)
processingTime int64 // Average processing time in nanoseconds (atomic) processingTime int64 // Average processing time in nanoseconds (atomic)
@ -27,8 +27,9 @@ type OutputStreamer struct {
wg sync.WaitGroup wg sync.WaitGroup
running bool running bool
mtx sync.Mutex mtx sync.Mutex
chanClosed bool // Track if processing channel is closed
// Performance optimization fields // Adaptive processing configuration
batchSize int // Adaptive batch size for frame processing batchSize int // Adaptive batch size for frame processing
processingChan chan []byte // Buffered channel for frame processing processingChan chan []byte // Buffered channel for frame processing
statsInterval time.Duration // Statistics reporting interval statsInterval time.Duration // Statistics reporting interval
@ -42,13 +43,13 @@ var (
func getOutputStreamingLogger() *zerolog.Logger { func getOutputStreamingLogger() *zerolog.Logger {
if outputStreamingLogger == nil { if outputStreamingLogger == nil {
logger := logging.GetDefaultLogger().With().Str("component", "audio-output").Logger() logger := logging.GetDefaultLogger().With().Str("component", AudioOutputStreamerComponent).Logger()
outputStreamingLogger = &logger outputStreamingLogger = &logger
} }
return outputStreamingLogger return outputStreamingLogger
} }
func NewOutputStreamer() (*OutputStreamer, error) { func NewAudioOutputStreamer() (*AudioOutputStreamer, error) {
client := NewAudioOutputClient() client := NewAudioOutputClient()
// Get initial batch size from adaptive buffer manager // Get initial batch size from adaptive buffer manager
@ -56,7 +57,7 @@ func NewOutputStreamer() (*OutputStreamer, error) {
initialBatchSize := adaptiveManager.GetOutputBufferSize() initialBatchSize := adaptiveManager.GetOutputBufferSize()
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
return &OutputStreamer{ return &AudioOutputStreamer{
client: client, client: client,
bufferPool: NewAudioBufferPool(GetMaxAudioFrameSize()), // Use existing buffer pool bufferPool: NewAudioBufferPool(GetMaxAudioFrameSize()), // Use existing buffer pool
ctx: ctx, ctx: ctx,
@ -68,7 +69,7 @@ func NewOutputStreamer() (*OutputStreamer, error) {
}, nil }, nil
} }
func (s *OutputStreamer) Start() error { func (s *AudioOutputStreamer) Start() error {
s.mtx.Lock() s.mtx.Lock()
defer s.mtx.Unlock() defer s.mtx.Unlock()
@ -92,7 +93,7 @@ func (s *OutputStreamer) Start() error {
return nil return nil
} }
func (s *OutputStreamer) Stop() { func (s *AudioOutputStreamer) Stop() {
s.mtx.Lock() s.mtx.Lock()
defer s.mtx.Unlock() defer s.mtx.Unlock()
@ -103,8 +104,11 @@ func (s *OutputStreamer) Stop() {
s.running = false s.running = false
s.cancel() s.cancel()
// Close processing channel to signal goroutines // Close processing channel to signal goroutines (only if not already closed)
close(s.processingChan) if !s.chanClosed {
close(s.processingChan)
s.chanClosed = true
}
// Wait for all goroutines to finish // Wait for all goroutines to finish
s.wg.Wait() s.wg.Wait()
@ -114,7 +118,7 @@ func (s *OutputStreamer) Stop() {
} }
} }
func (s *OutputStreamer) streamLoop() { func (s *AudioOutputStreamer) streamLoop() {
defer s.wg.Done() defer s.wg.Done()
// Pin goroutine to OS thread for consistent performance // Pin goroutine to OS thread for consistent performance
@ -153,7 +157,9 @@ func (s *OutputStreamer) streamLoop() {
if n > 0 { if n > 0 {
// Send frame for processing (non-blocking) // Send frame for processing (non-blocking)
frameData := make([]byte, n) // Use buffer pool to avoid allocation
frameData := s.bufferPool.Get()
frameData = frameData[:n]
copy(frameData, frameBuf[:n]) copy(frameData, frameBuf[:n])
select { select {
@ -175,7 +181,7 @@ func (s *OutputStreamer) streamLoop() {
} }
// processingLoop handles frame processing in a separate goroutine // processingLoop handles frame processing in a separate goroutine
func (s *OutputStreamer) processingLoop() { func (s *AudioOutputStreamer) processingLoop() {
defer s.wg.Done() defer s.wg.Done()
// Pin goroutine to OS thread for consistent performance // Pin goroutine to OS thread for consistent performance
@ -192,25 +198,29 @@ func (s *OutputStreamer) processingLoop() {
} }
}() }()
for range s.processingChan { for frameData := range s.processingChan {
// Process frame (currently just receiving, but can be extended) // Process frame and return buffer to pool after processing
if _, err := s.client.ReceiveFrame(); err != nil { func() {
if s.client.IsConnected() { defer s.bufferPool.Put(frameData)
getOutputStreamingLogger().Warn().Err(err).Msg("Error reading audio frame from output server")
atomic.AddInt64(&s.droppedFrames, 1) if _, err := s.client.ReceiveFrame(); err != nil {
} if s.client.IsConnected() {
// Try to reconnect if disconnected getOutputStreamingLogger().Warn().Err(err).Msg("Error reading audio frame from output server")
if !s.client.IsConnected() { atomic.AddInt64(&s.droppedFrames, 1)
if err := s.client.Connect(); err != nil { }
getOutputStreamingLogger().Warn().Err(err).Msg("Failed to reconnect") // Try to reconnect if disconnected
if !s.client.IsConnected() {
if err := s.client.Connect(); err != nil {
getOutputStreamingLogger().Warn().Err(err).Msg("Failed to reconnect")
}
} }
} }
} }()
} }
} }
// statisticsLoop monitors and reports performance statistics // statisticsLoop monitors and reports performance statistics
func (s *OutputStreamer) statisticsLoop() { func (s *AudioOutputStreamer) statisticsLoop() {
defer s.wg.Done() defer s.wg.Done()
ticker := time.NewTicker(s.statsInterval) ticker := time.NewTicker(s.statsInterval)
@ -227,7 +237,7 @@ func (s *OutputStreamer) statisticsLoop() {
} }
// reportStatistics logs current performance statistics // reportStatistics logs current performance statistics
func (s *OutputStreamer) reportStatistics() { func (s *AudioOutputStreamer) reportStatistics() {
processed := atomic.LoadInt64(&s.processedFrames) processed := atomic.LoadInt64(&s.processedFrames)
dropped := atomic.LoadInt64(&s.droppedFrames) dropped := atomic.LoadInt64(&s.droppedFrames)
processingTime := atomic.LoadInt64(&s.processingTime) processingTime := atomic.LoadInt64(&s.processingTime)
@ -245,7 +255,7 @@ func (s *OutputStreamer) reportStatistics() {
} }
// GetStats returns streaming statistics // GetStats returns streaming statistics
func (s *OutputStreamer) GetStats() (processed, dropped int64, avgProcessingTime time.Duration) { func (s *AudioOutputStreamer) GetStats() (processed, dropped int64, avgProcessingTime time.Duration) {
processed = atomic.LoadInt64(&s.processedFrames) processed = atomic.LoadInt64(&s.processedFrames)
dropped = atomic.LoadInt64(&s.droppedFrames) dropped = atomic.LoadInt64(&s.droppedFrames)
processingTimeNs := atomic.LoadInt64(&s.processingTime) processingTimeNs := atomic.LoadInt64(&s.processingTime)
@ -254,7 +264,7 @@ func (s *OutputStreamer) GetStats() (processed, dropped int64, avgProcessingTime
} }
// GetDetailedStats returns comprehensive streaming statistics // GetDetailedStats returns comprehensive streaming statistics
func (s *OutputStreamer) GetDetailedStats() map[string]interface{} { func (s *AudioOutputStreamer) GetDetailedStats() map[string]interface{} {
processed := atomic.LoadInt64(&s.processedFrames) processed := atomic.LoadInt64(&s.processedFrames)
dropped := atomic.LoadInt64(&s.droppedFrames) dropped := atomic.LoadInt64(&s.droppedFrames)
processingTime := atomic.LoadInt64(&s.processingTime) processingTime := atomic.LoadInt64(&s.processingTime)
@ -282,7 +292,7 @@ func (s *OutputStreamer) GetDetailedStats() map[string]interface{} {
} }
// UpdateBatchSize updates the batch size from adaptive buffer manager // UpdateBatchSize updates the batch size from adaptive buffer manager
func (s *OutputStreamer) UpdateBatchSize() { func (s *AudioOutputStreamer) UpdateBatchSize() {
s.mtx.Lock() s.mtx.Lock()
adaptiveManager := GetAdaptiveBufferManager() adaptiveManager := GetAdaptiveBufferManager()
s.batchSize = adaptiveManager.GetOutputBufferSize() s.batchSize = adaptiveManager.GetOutputBufferSize()
@ -290,7 +300,7 @@ func (s *OutputStreamer) UpdateBatchSize() {
} }
// ReportLatency reports processing latency to adaptive buffer manager // ReportLatency reports processing latency to adaptive buffer manager
func (s *OutputStreamer) ReportLatency(latency time.Duration) { func (s *AudioOutputStreamer) ReportLatency(latency time.Duration) {
adaptiveManager := GetAdaptiveBufferManager() adaptiveManager := GetAdaptiveBufferManager()
adaptiveManager.UpdateLatency(latency) adaptiveManager.UpdateLatency(latency)
} }

View File

@ -0,0 +1,341 @@
package audio
import (
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
// TestAudioOutputStreamer tests the AudioOutputStreamer component
func TestAudioOutputStreamer(t *testing.T) {
tests := []struct {
name string
testFunc func(t *testing.T)
}{
{"NewAudioOutputStreamer", testNewAudioOutputStreamer},
{"Start", testAudioOutputStreamerStart},
{"Stop", testAudioOutputStreamerStop},
{"StartStop", testAudioOutputStreamerStartStop},
{"GetStats", testAudioOutputStreamerGetStats},
{"GetDetailedStats", testAudioOutputStreamerGetDetailedStats},
{"UpdateBatchSize", testAudioOutputStreamerUpdateBatchSize},
{"ReportLatency", testAudioOutputStreamerReportLatency},
{"ConcurrentOperations", testAudioOutputStreamerConcurrent},
{"MultipleStarts", testAudioOutputStreamerMultipleStarts},
{"MultipleStops", testAudioOutputStreamerMultipleStops},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.testFunc(t)
})
}
}
func testNewAudioOutputStreamer(t *testing.T) {
streamer, err := NewAudioOutputStreamer()
if err != nil {
// If creation fails due to missing dependencies, skip the test
t.Skipf("Skipping test due to missing dependencies: %v", err)
return
}
require.NotNil(t, streamer)
// Test initial state
processed, dropped, avgTime := streamer.GetStats()
assert.GreaterOrEqual(t, processed, int64(0))
assert.GreaterOrEqual(t, dropped, int64(0))
assert.GreaterOrEqual(t, avgTime, time.Duration(0))
// Cleanup
streamer.Stop()
}
func testAudioOutputStreamerStart(t *testing.T) {
streamer, err := NewAudioOutputStreamer()
if err != nil {
t.Skipf("Skipping test due to missing dependencies: %v", err)
return
}
require.NotNil(t, streamer)
// Test start
err = streamer.Start()
assert.NoError(t, err)
// Cleanup
streamer.Stop()
}
func testAudioOutputStreamerStop(t *testing.T) {
streamer, err := NewAudioOutputStreamer()
if err != nil {
t.Skipf("Skipping test due to missing dependencies: %v", err)
return
}
require.NotNil(t, streamer)
// Start first
err = streamer.Start()
require.NoError(t, err)
// Test stop
streamer.Stop()
// Multiple stops should be safe
streamer.Stop()
streamer.Stop()
}
func testAudioOutputStreamerStartStop(t *testing.T) {
streamer, err := NewAudioOutputStreamer()
if err != nil {
t.Skipf("Skipping test due to missing dependencies: %v", err)
return
}
require.NotNil(t, streamer)
// Test multiple start/stop cycles
for i := 0; i < 3; i++ {
// Start
err = streamer.Start()
assert.NoError(t, err)
// Stop
streamer.Stop()
}
}
func testAudioOutputStreamerGetStats(t *testing.T) {
streamer, err := NewAudioOutputStreamer()
if err != nil {
t.Skipf("Skipping test due to missing dependencies: %v", err)
return
}
require.NotNil(t, streamer)
// Test stats when not running
processed, dropped, avgTime := streamer.GetStats()
assert.Equal(t, int64(0), processed)
assert.Equal(t, int64(0), dropped)
assert.GreaterOrEqual(t, avgTime, time.Duration(0))
// Start and test stats
err = streamer.Start()
require.NoError(t, err)
processed, dropped, avgTime = streamer.GetStats()
assert.GreaterOrEqual(t, processed, int64(0))
assert.GreaterOrEqual(t, dropped, int64(0))
assert.GreaterOrEqual(t, avgTime, time.Duration(0))
// Cleanup
streamer.Stop()
}
func testAudioOutputStreamerGetDetailedStats(t *testing.T) {
streamer, err := NewAudioOutputStreamer()
if err != nil {
t.Skipf("Skipping test due to missing dependencies: %v", err)
return
}
require.NotNil(t, streamer)
// Test detailed stats
stats := streamer.GetDetailedStats()
assert.NotNil(t, stats)
assert.Contains(t, stats, "processed_frames")
assert.Contains(t, stats, "dropped_frames")
assert.Contains(t, stats, "batch_size")
assert.Contains(t, stats, "connected")
assert.Equal(t, int64(0), stats["processed_frames"])
assert.Equal(t, int64(0), stats["dropped_frames"])
// Start and test detailed stats
err = streamer.Start()
require.NoError(t, err)
stats = streamer.GetDetailedStats()
assert.NotNil(t, stats)
assert.Contains(t, stats, "processed_frames")
assert.Contains(t, stats, "dropped_frames")
// Cleanup
streamer.Stop()
}
func testAudioOutputStreamerUpdateBatchSize(t *testing.T) {
streamer, err := NewAudioOutputStreamer()
if err != nil {
t.Skipf("Skipping test due to missing dependencies: %v", err)
return
}
require.NotNil(t, streamer)
// Test updating batch size (no parameters, uses adaptive manager)
streamer.UpdateBatchSize()
streamer.UpdateBatchSize()
streamer.UpdateBatchSize()
// Cleanup
streamer.Stop()
}
func testAudioOutputStreamerReportLatency(t *testing.T) {
streamer, err := NewAudioOutputStreamer()
if err != nil {
t.Skipf("Skipping test due to missing dependencies: %v", err)
return
}
require.NotNil(t, streamer)
// Test reporting latency
streamer.ReportLatency(10 * time.Millisecond)
streamer.ReportLatency(5 * time.Millisecond)
streamer.ReportLatency(15 * time.Millisecond)
// Cleanup
streamer.Stop()
}
func testAudioOutputStreamerConcurrent(t *testing.T) {
streamer, err := NewAudioOutputStreamer()
if err != nil {
t.Skipf("Skipping test due to missing dependencies: %v", err)
return
}
require.NotNil(t, streamer)
var wg sync.WaitGroup
const numGoroutines = 10
// Test concurrent starts
wg.Add(numGoroutines)
for i := 0; i < numGoroutines; i++ {
go func() {
defer wg.Done()
streamer.Start()
}()
}
wg.Wait()
// Test concurrent operations
wg.Add(numGoroutines * 3)
for i := 0; i < numGoroutines; i++ {
go func() {
defer wg.Done()
streamer.GetStats()
}()
go func() {
defer wg.Done()
streamer.UpdateBatchSize()
}()
go func() {
defer wg.Done()
streamer.ReportLatency(10 * time.Millisecond)
}()
}
wg.Wait()
// Test concurrent stops
wg.Add(numGoroutines)
for i := 0; i < numGoroutines; i++ {
go func() {
defer wg.Done()
streamer.Stop()
}()
}
wg.Wait()
}
func testAudioOutputStreamerMultipleStarts(t *testing.T) {
streamer, err := NewAudioOutputStreamer()
if err != nil {
t.Skipf("Skipping test due to missing dependencies: %v", err)
return
}
require.NotNil(t, streamer)
// First start should succeed
err = streamer.Start()
assert.NoError(t, err)
// Subsequent starts should return error
err = streamer.Start()
assert.Error(t, err)
assert.Contains(t, err.Error(), "already running")
err = streamer.Start()
assert.Error(t, err)
assert.Contains(t, err.Error(), "already running")
// Cleanup
streamer.Stop()
}
func testAudioOutputStreamerMultipleStops(t *testing.T) {
streamer, err := NewAudioOutputStreamer()
if err != nil {
t.Skipf("Skipping test due to missing dependencies: %v", err)
return
}
require.NotNil(t, streamer)
// Start first
err = streamer.Start()
require.NoError(t, err)
// Multiple stops should be safe
streamer.Stop()
streamer.Stop()
streamer.Stop()
}
// BenchmarkAudioOutputStreamer benchmarks the AudioOutputStreamer operations
func BenchmarkAudioOutputStreamer(b *testing.B) {
b.Run("GetStats", func(b *testing.B) {
streamer, err := NewAudioOutputStreamer()
if err != nil {
b.Skipf("Skipping benchmark due to missing dependencies: %v", err)
return
}
defer streamer.Stop()
streamer.Start()
b.ResetTimer()
for i := 0; i < b.N; i++ {
streamer.GetStats()
}
})
b.Run("UpdateBatchSize", func(b *testing.B) {
streamer, err := NewAudioOutputStreamer()
if err != nil {
b.Skipf("Skipping benchmark due to missing dependencies: %v", err)
return
}
defer streamer.Stop()
b.ResetTimer()
for i := 0; i < b.N; i++ {
streamer.UpdateBatchSize()
}
})
b.Run("ReportLatency", func(b *testing.B) {
streamer, err := NewAudioOutputStreamer()
if err != nil {
b.Skipf("Skipping benchmark due to missing dependencies: %v", err)
return
}
defer streamer.Stop()
b.ResetTimer()
for i := 0; i < b.N; i++ {
streamer.ReportLatency(10 * time.Millisecond)
}
})
}

View File

@ -19,13 +19,14 @@ type AudioRelay struct {
framesRelayed int64 framesRelayed int64
framesDropped int64 framesDropped int64
client *AudioOutputClient client *AudioOutputClient
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
wg sync.WaitGroup wg sync.WaitGroup
logger *zerolog.Logger logger *zerolog.Logger
running bool running bool
mutex sync.RWMutex mutex sync.RWMutex
bufferPool *AudioBufferPool // Buffer pool for memory optimization
// WebRTC integration // WebRTC integration
audioTrack AudioTrackWriter audioTrack AudioTrackWriter
@ -44,9 +45,10 @@ func NewAudioRelay() *AudioRelay {
logger := logging.GetDefaultLogger().With().Str("component", "audio-relay").Logger() logger := logging.GetDefaultLogger().With().Str("component", "audio-relay").Logger()
return &AudioRelay{ return &AudioRelay{
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
logger: &logger, logger: &logger,
bufferPool: NewAudioBufferPool(GetMaxAudioFrameSize()),
} }
} }
@ -188,8 +190,14 @@ func (r *AudioRelay) forwardToWebRTC(frame []byte) error {
// Prepare sample data // Prepare sample data
var sampleData []byte var sampleData []byte
if muted { if muted {
// Send silence when muted // Send silence when muted - use buffer pool to avoid allocation
sampleData = make([]byte, len(frame)) sampleData = r.bufferPool.Get()
sampleData = sampleData[:len(frame)] // Resize to frame length
// Clear the buffer to create silence
for i := range sampleData {
sampleData[i] = 0
}
defer r.bufferPool.Put(sampleData) // Return to pool after use
} else { } else {
sampleData = frame sampleData = frame
} }

View File

@ -54,6 +54,8 @@ type AudioOutputSupervisor struct {
// Channels for coordination // Channels for coordination
processDone chan struct{} processDone chan struct{}
stopChan chan struct{} stopChan chan struct{}
stopChanClosed bool // Track if stopChan is closed
processDoneClosed bool // Track if processDone is closed
// Process monitoring // Process monitoring
processMonitor *ProcessMonitor processMonitor *ProcessMonitor
@ -67,7 +69,7 @@ type AudioOutputSupervisor struct {
// NewAudioOutputSupervisor creates a new audio output server supervisor // NewAudioOutputSupervisor creates a new audio output server supervisor
func NewAudioOutputSupervisor() *AudioOutputSupervisor { func NewAudioOutputSupervisor() *AudioOutputSupervisor {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
logger := logging.GetDefaultLogger().With().Str("component", "audio-supervisor").Logger() logger := logging.GetDefaultLogger().With().Str("component", AudioOutputSupervisorComponent).Logger()
return &AudioOutputSupervisor{ return &AudioOutputSupervisor{
ctx: ctx, ctx: ctx,
@ -96,15 +98,17 @@ func (s *AudioOutputSupervisor) SetCallbacks(
// Start begins supervising the audio output server process // Start begins supervising the audio output server process
func (s *AudioOutputSupervisor) Start() error { func (s *AudioOutputSupervisor) Start() error {
if !atomic.CompareAndSwapInt32(&s.running, 0, 1) { if !atomic.CompareAndSwapInt32(&s.running, 0, 1) {
return fmt.Errorf("supervisor already running") return fmt.Errorf("audio output supervisor is already running")
} }
s.logger.Info().Msg("starting audio server supervisor") s.logger.Info().Str("component", AudioOutputSupervisorComponent).Msg("starting component")
// Recreate channels in case they were closed by a previous Stop() call // Recreate channels in case they were closed by a previous Stop() call
s.mutex.Lock() s.mutex.Lock()
s.processDone = make(chan struct{}) s.processDone = make(chan struct{})
s.stopChan = make(chan struct{}) s.stopChan = make(chan struct{})
s.stopChanClosed = false // Reset channel closed flag
s.processDoneClosed = false // Reset channel closed flag
// Recreate context as well since it might have been cancelled // Recreate context as well since it might have been cancelled
s.ctx, s.cancel = context.WithCancel(context.Background()) s.ctx, s.cancel = context.WithCancel(context.Background())
// Reset restart tracking on start // Reset restart tracking on start
@ -116,31 +120,37 @@ func (s *AudioOutputSupervisor) Start() error {
// Start the supervision loop // Start the supervision loop
go s.supervisionLoop() go s.supervisionLoop()
s.logger.Info().Str("component", AudioOutputSupervisorComponent).Msg("component started successfully")
return nil return nil
} }
// Stop gracefully stops the audio server and supervisor // Stop gracefully stops the audio server and supervisor
func (s *AudioOutputSupervisor) Stop() error { func (s *AudioOutputSupervisor) Stop() {
if !atomic.CompareAndSwapInt32(&s.running, 1, 0) { if !atomic.CompareAndSwapInt32(&s.running, 1, 0) {
return nil // Already stopped return // Already stopped
} }
s.logger.Info().Msg("stopping audio server supervisor") s.logger.Info().Str("component", AudioOutputSupervisorComponent).Msg("stopping component")
// Signal stop and wait for cleanup // Signal stop and wait for cleanup
close(s.stopChan) s.mutex.Lock()
if !s.stopChanClosed {
close(s.stopChan)
s.stopChanClosed = true
}
s.mutex.Unlock()
s.cancel() s.cancel()
// Wait for process to exit // Wait for process to exit
select { select {
case <-s.processDone: case <-s.processDone:
s.logger.Info().Msg("audio server process stopped gracefully") s.logger.Info().Str("component", AudioOutputSupervisorComponent).Msg("component stopped gracefully")
case <-time.After(GetConfig().SupervisorTimeout): case <-time.After(GetConfig().SupervisorTimeout):
s.logger.Warn().Msg("audio server process did not stop gracefully, forcing termination") s.logger.Warn().Str("component", AudioOutputSupervisorComponent).Msg("component did not stop gracefully, forcing termination")
s.forceKillProcess() s.forceKillProcess()
} }
return nil s.logger.Info().Str("component", AudioOutputSupervisorComponent).Msg("component stopped")
} }
// IsRunning returns true if the supervisor is running // IsRunning returns true if the supervisor is running
@ -169,7 +179,16 @@ func (s *AudioOutputSupervisor) GetProcessMetrics() *ProcessMetrics {
s.mutex.RUnlock() s.mutex.RUnlock()
if pid == 0 { if pid == 0 {
return nil // Return default metrics when no process is running
return &ProcessMetrics{
PID: 0,
CPUPercent: 0.0,
MemoryRSS: 0,
MemoryVMS: 0,
MemoryPercent: 0.0,
Timestamp: time.Now(),
ProcessName: "audio-output-server",
}
} }
metrics := s.processMonitor.GetCurrentMetrics() metrics := s.processMonitor.GetCurrentMetrics()
@ -178,13 +197,28 @@ func (s *AudioOutputSupervisor) GetProcessMetrics() *ProcessMetrics {
return &metric return &metric
} }
} }
return nil
// Return default metrics if process not found in monitor
return &ProcessMetrics{
PID: pid,
CPUPercent: 0.0,
MemoryRSS: 0,
MemoryVMS: 0,
MemoryPercent: 0.0,
Timestamp: time.Now(),
ProcessName: "audio-output-server",
}
} }
// supervisionLoop is the main supervision loop // supervisionLoop is the main supervision loop
func (s *AudioOutputSupervisor) supervisionLoop() { func (s *AudioOutputSupervisor) supervisionLoop() {
defer func() { defer func() {
close(s.processDone) s.mutex.Lock()
if !s.processDoneClosed {
close(s.processDone)
s.processDoneClosed = true
}
s.mutex.Unlock()
s.logger.Info().Msg("audio server supervision ended") s.logger.Info().Msg("audio server supervision ended")
}() }()

View File

@ -0,0 +1,217 @@
package audio
import (
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestNewAudioOutputSupervisor(t *testing.T) {
supervisor := NewAudioOutputSupervisor()
assert.NotNil(t, supervisor)
assert.False(t, supervisor.IsRunning())
}
func TestAudioOutputSupervisorStart(t *testing.T) {
supervisor := NewAudioOutputSupervisor()
require.NotNil(t, supervisor)
// Test successful start
err := supervisor.Start()
assert.NoError(t, err)
assert.True(t, supervisor.IsRunning())
// Test starting already running supervisor
err = supervisor.Start()
assert.Error(t, err)
assert.Contains(t, err.Error(), "already running")
// Cleanup
supervisor.Stop()
}
func TestAudioOutputSupervisorStop(t *testing.T) {
supervisor := NewAudioOutputSupervisor()
require.NotNil(t, supervisor)
// Test stopping non-running supervisor
supervisor.Stop()
assert.False(t, supervisor.IsRunning())
// Start and then stop
err := supervisor.Start()
require.NoError(t, err)
assert.True(t, supervisor.IsRunning())
supervisor.Stop()
assert.False(t, supervisor.IsRunning())
}
func TestAudioOutputSupervisorIsRunning(t *testing.T) {
supervisor := NewAudioOutputSupervisor()
require.NotNil(t, supervisor)
// Test initial state
assert.False(t, supervisor.IsRunning())
// Test after start
err := supervisor.Start()
require.NoError(t, err)
assert.True(t, supervisor.IsRunning())
// Test after stop
supervisor.Stop()
assert.False(t, supervisor.IsRunning())
}
func TestAudioOutputSupervisorGetProcessMetrics(t *testing.T) {
supervisor := NewAudioOutputSupervisor()
require.NotNil(t, supervisor)
// Test metrics when not running
metrics := supervisor.GetProcessMetrics()
assert.NotNil(t, metrics)
// Start and test metrics
err := supervisor.Start()
require.NoError(t, err)
metrics = supervisor.GetProcessMetrics()
assert.NotNil(t, metrics)
// Cleanup
supervisor.Stop()
}
func TestAudioOutputSupervisorConcurrentOperations(t *testing.T) {
supervisor := NewAudioOutputSupervisor()
require.NotNil(t, supervisor)
var wg sync.WaitGroup
// Test concurrent start/stop operations
for i := 0; i < 10; i++ {
wg.Add(2)
go func() {
defer wg.Done()
_ = supervisor.Start()
}()
go func() {
defer wg.Done()
supervisor.Stop()
}()
}
// Test concurrent metric access
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
_ = supervisor.GetProcessMetrics()
}()
}
// Test concurrent status checks
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
_ = supervisor.IsRunning()
}()
}
wg.Wait()
// Cleanup
supervisor.Stop()
}
func TestAudioOutputSupervisorMultipleStartStop(t *testing.T) {
supervisor := NewAudioOutputSupervisor()
require.NotNil(t, supervisor)
// Test multiple start/stop cycles
for i := 0; i < 5; i++ {
err := supervisor.Start()
assert.NoError(t, err)
assert.True(t, supervisor.IsRunning())
supervisor.Stop()
assert.False(t, supervisor.IsRunning())
}
}
func TestAudioOutputSupervisorHealthCheck(t *testing.T) {
supervisor := NewAudioOutputSupervisor()
require.NotNil(t, supervisor)
// Start supervisor
err := supervisor.Start()
require.NoError(t, err)
// Give some time for health monitoring to initialize
time.Sleep(100 * time.Millisecond)
// Test that supervisor is still running
assert.True(t, supervisor.IsRunning())
// Cleanup
supervisor.Stop()
}
func TestAudioOutputSupervisorProcessManagement(t *testing.T) {
supervisor := NewAudioOutputSupervisor()
require.NotNil(t, supervisor)
// Start supervisor
err := supervisor.Start()
require.NoError(t, err)
// Give some time for process management to initialize
time.Sleep(200 * time.Millisecond)
// Test that supervisor is managing processes
assert.True(t, supervisor.IsRunning())
// Cleanup
supervisor.Stop()
// Ensure supervisor stopped cleanly
assert.False(t, supervisor.IsRunning())
}
// Benchmark tests
func BenchmarkAudioOutputSupervisor(b *testing.B) {
supervisor := NewAudioOutputSupervisor()
b.Run("Start", func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = supervisor.Start()
supervisor.Stop()
}
})
b.Run("GetProcessMetrics", func(b *testing.B) {
_ = supervisor.Start()
defer supervisor.Stop()
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = supervisor.GetProcessMetrics()
}
})
b.Run("IsRunning", func(b *testing.B) {
_ = supervisor.Start()
defer supervisor.Stop()
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = supervisor.IsRunning()
}
})
}

View File

@ -5,6 +5,8 @@ import (
"fmt" "fmt"
"time" "time"
"unsafe" "unsafe"
"github.com/rs/zerolog"
) )
// Enhanced validation errors with more specific context // Enhanced validation errors with more specific context
@ -41,13 +43,12 @@ type ValidationConfig struct {
// GetValidationConfig returns the current validation configuration // GetValidationConfig returns the current validation configuration
func GetValidationConfig() ValidationConfig { func GetValidationConfig() ValidationConfig {
config := GetConfig()
return ValidationConfig{ return ValidationConfig{
Level: ValidationStandard, Level: ValidationStandard,
EnableRangeChecks: true, EnableRangeChecks: true,
EnableAlignmentCheck: true, EnableAlignmentCheck: true,
EnableDataIntegrity: false, // Disabled by default for performance EnableDataIntegrity: false, // Disabled by default for performance
MaxValidationTime: time.Duration(config.ValidationTimeoutMS) * time.Millisecond, MaxValidationTime: 5 * time.Second, // Default validation timeout
} }
} }
@ -87,14 +88,16 @@ func ValidateAudioFrameComprehensive(data []byte, expectedSampleRate int, expect
// Range validation // Range validation
if validationConfig.EnableRangeChecks { if validationConfig.EnableRangeChecks {
config := GetConfig() config := GetConfig()
if len(data) < config.MinAudioFrameSize { minFrameSize := 64 // Minimum reasonable frame size
return fmt.Errorf("%w: frame size %d below minimum %d", ErrInvalidFrameSize, len(data), config.MinAudioFrameSize) if len(data) < minFrameSize {
return fmt.Errorf("%w: frame size %d below minimum %d", ErrInvalidFrameSize, len(data), minFrameSize)
} }
// Validate frame length matches expected sample format // Validate frame length matches expected sample format
expectedFrameSize := (expectedSampleRate * expectedChannels * 2) / 1000 * int(config.AudioQualityMediumFrameSize/time.Millisecond) expectedFrameSize := (expectedSampleRate * expectedChannels * 2) / 1000 * int(config.AudioQualityMediumFrameSize/time.Millisecond)
if abs(len(data)-expectedFrameSize) > config.FrameSizeTolerance { tolerance := 512 // Frame size tolerance in bytes
return fmt.Errorf("%w: frame size %d doesn't match expected %d (±%d)", ErrInvalidFrameLength, len(data), expectedFrameSize, config.FrameSizeTolerance) if abs(len(data)-expectedFrameSize) > tolerance {
return fmt.Errorf("%w: frame size %d doesn't match expected %d (±%d)", ErrInvalidFrameLength, len(data), expectedFrameSize, tolerance)
} }
} }
@ -181,8 +184,10 @@ func ValidateAudioConfiguration(config AudioConfig) error {
configConstants := GetConfig() configConstants := GetConfig()
// Validate bitrate ranges // Validate bitrate ranges
if config.Bitrate < configConstants.MinBitrate || config.Bitrate > configConstants.MaxBitrate { minBitrate := 6000 // Minimum Opus bitrate
return fmt.Errorf("%w: bitrate %d outside valid range [%d, %d]", ErrInvalidConfiguration, config.Bitrate, configConstants.MinBitrate, configConstants.MaxBitrate) maxBitrate := 510000 // Maximum Opus bitrate
if config.Bitrate < minBitrate || config.Bitrate > maxBitrate {
return fmt.Errorf("%w: bitrate %d outside valid range [%d, %d]", ErrInvalidConfiguration, config.Bitrate, minBitrate, maxBitrate)
} }
// Validate sample rate // Validate sample rate
@ -204,8 +209,10 @@ func ValidateAudioConfiguration(config AudioConfig) error {
} }
// Validate frame size // Validate frame size
if config.FrameSize < configConstants.MinFrameSize || config.FrameSize > configConstants.MaxFrameSize { minFrameSize := 10 * time.Millisecond // Minimum frame duration
return fmt.Errorf("%w: frame size %v outside valid range [%v, %v]", ErrInvalidConfiguration, config.FrameSize, configConstants.MinFrameSize, configConstants.MaxFrameSize) maxFrameSize := 100 * time.Millisecond // Maximum frame duration
if config.FrameSize < minFrameSize || config.FrameSize > maxFrameSize {
return fmt.Errorf("%w: frame size %v outside valid range [%v, %v]", ErrInvalidConfiguration, config.FrameSize, minFrameSize, maxFrameSize)
} }
return nil return nil
@ -276,6 +283,7 @@ func abs(x int) int {
// getValidationLogger returns a logger for validation operations // getValidationLogger returns a logger for validation operations
func getValidationLogger() *zerolog.Logger { func getValidationLogger() *zerolog.Logger {
logger := logging.GetDefaultLogger().With().Str("component", "audio-validation").Logger() // Return a basic logger for validation
logger := zerolog.New(nil).With().Timestamp().Logger()
return &logger return &logger
} }

View File

@ -7,8 +7,36 @@ import (
"unsafe" "unsafe"
) )
// ZeroCopyAudioFrame represents an audio frame that can be passed between // ZeroCopyAudioFrame represents a reference-counted audio frame for zero-copy operations.
// components without copying the underlying data //
// This structure implements a sophisticated memory management system designed to minimize
// allocations and memory copying in the audio pipeline:
//
// Key Features:
// 1. Reference Counting: Multiple components can safely share the same frame data
// without copying. The frame is automatically returned to the pool when the last
// reference is released.
//
// 2. Thread Safety: All operations are protected by RWMutex, allowing concurrent
// reads while ensuring exclusive access for modifications.
//
// 3. Pool Integration: Frames are automatically managed by ZeroCopyFramePool,
// enabling efficient reuse and preventing memory fragmentation.
//
// 4. Unsafe Pointer Access: For performance-critical CGO operations, direct
// memory access is provided while maintaining safety through reference counting.
//
// Usage Pattern:
// frame := pool.Get() // Acquire frame (refCount = 1)
// frame.AddRef() // Share with another component (refCount = 2)
// data := frame.Data() // Access data safely
// frame.Release() // Release reference (refCount = 1)
// frame.Release() // Final release, returns to pool (refCount = 0)
//
// Memory Safety:
// - Frames cannot be modified while shared (refCount > 1)
// - Data access is bounds-checked to prevent buffer overruns
// - Pool management prevents use-after-free scenarios
type ZeroCopyAudioFrame struct { type ZeroCopyAudioFrame struct {
data []byte data []byte
length int length int
@ -18,7 +46,34 @@ type ZeroCopyAudioFrame struct {
pooled bool pooled bool
} }
// ZeroCopyFramePool manages reusable zero-copy audio frames // ZeroCopyFramePool manages a pool of reusable zero-copy audio frames.
//
// This pool implements a three-tier memory management strategy optimized for
// real-time audio processing with minimal allocation overhead:
//
// Tier 1 - Pre-allocated Frames:
// A small number of frames are pre-allocated at startup and kept ready
// for immediate use. This provides the fastest possible allocation for
// the most common case and eliminates allocation latency spikes.
//
// Tier 2 - sync.Pool Cache:
// The standard Go sync.Pool provides efficient reuse of frames with
// automatic garbage collection integration. Frames are automatically
// returned here when memory pressure is low.
//
// Tier 3 - Memory Guard:
// A configurable limit prevents excessive memory usage by limiting
// the total number of allocated frames. When the limit is reached,
// allocation requests are denied to prevent OOM conditions.
//
// Performance Characteristics:
// - Pre-allocated tier: ~10ns allocation time
// - sync.Pool tier: ~50ns allocation time
// - Memory guard: Prevents unbounded growth
// - Metrics tracking: Hit/miss rates for optimization
//
// The pool is designed for embedded systems with limited memory (256MB)
// where predictable memory usage is more important than absolute performance.
type ZeroCopyFramePool struct { type ZeroCopyFramePool struct {
// Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment) // Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment)
counter int64 // Frame counter (atomic) counter int64 // Frame counter (atomic)

View File

@ -967,9 +967,7 @@ func rpcSetUsbDevices(usbDevices usbgadget.Devices) error {
// Stop audio output supervisor // Stop audio output supervisor
if audioSupervisor != nil && audioSupervisor.IsRunning() { if audioSupervisor != nil && audioSupervisor.IsRunning() {
logger.Info().Msg("stopping audio output supervisor") logger.Info().Msg("stopping audio output supervisor")
if err := audioSupervisor.Stop(); err != nil { audioSupervisor.Stop()
logger.Error().Err(err).Msg("failed to stop audio supervisor")
}
// Wait for audio processes to fully stop before proceeding // Wait for audio processes to fully stop before proceeding
for i := 0; i < 50; i++ { // Wait up to 5 seconds for i := 0; i < 50; i++ { // Wait up to 5 seconds
if !audioSupervisor.IsRunning() { if !audioSupervisor.IsRunning() {
@ -1063,9 +1061,7 @@ func rpcSetUsbDeviceState(device string, enabled bool) error {
// Stop audio output supervisor // Stop audio output supervisor
if audioSupervisor != nil && audioSupervisor.IsRunning() { if audioSupervisor != nil && audioSupervisor.IsRunning() {
logger.Info().Msg("stopping audio output supervisor") logger.Info().Msg("stopping audio output supervisor")
if err := audioSupervisor.Stop(); err != nil { audioSupervisor.Stop()
logger.Error().Err(err).Msg("failed to stop audio supervisor")
}
// Wait for audio processes to fully stop // Wait for audio processes to fully stop
for i := 0; i < 50; i++ { // Wait up to 5 seconds for i := 0; i < 50; i++ { // Wait up to 5 seconds
if !audioSupervisor.IsRunning() { if !audioSupervisor.IsRunning() {

View File

@ -251,9 +251,7 @@ func Main(audioServer bool, audioInputServer bool) {
if !isAudioServer { if !isAudioServer {
if audioSupervisor != nil { if audioSupervisor != nil {
logger.Info().Msg("stopping audio supervisor") logger.Info().Msg("stopping audio supervisor")
if err := audioSupervisor.Stop(); err != nil { audioSupervisor.Stop()
logger.Error().Err(err).Msg("failed to stop audio supervisor")
}
} }
<-audioProcessDone <-audioProcessDone
} else { } else {