Compare commits

..

22 Commits

Author SHA1 Message Date
Alex cbf0b1aaa4
Merge 140a803ccf into cf679978be 2025-09-16 18:18:34 +00:00
Alex P 140a803ccf perf(audio): add ARM NEON SIMD optimizations for audio processing
Implement SIMD-optimized audio operations using ARM NEON for Cortex-A7 targets
Update Makefile and CI configuration to support NEON compilation flags
Add SIMD implementations for common audio operations including:
- Sample clearing and interleaving
- Volume scaling and format conversion
- Channel manipulation and balance adjustment
- Endianness swapping and prefetching
2025-09-16 18:18:19 +00:00
Marc Brooks cf679978be
fix(timesync): ensure that auto-update waits for time sync (#609)
- Added a check that skips auto-update while time sync is required but not yet successful (waits 30 seconds before rechecking).
- Added a time resync when DHCP or link state changes while online
- Added conditional fallback from the configured NTP servers to the IP-named NTP servers, and then to the DNS-named ones if that fails
- Added conditional fallback from the configured HTTP servers to the default DNS-named ones.
- Uses the configuration option for how many queries to run in parallel
- Added known static IPs for time servers (in case DNS resolution isn't up yet)
- Added time.cloudflare.com to fall-back NTP servers
- Added fallback to NTP via hostnames
- Logs the resultant time (and mode)
2025-09-16 15:37:02 +02:00
Alex P eca3c52513 PR Review Optimization: As recommended, use ternary operators instead of if/else for better readability 2025-09-16 16:17:56 +03:00
Alex P 55bcfb5a22 Consistency: keep if block multi-line 2025-09-16 16:08:16 +03:00
Alex P 0027001390 Cleanup: removed redundant code 2025-09-16 16:03:20 +03:00
Alex P caa0a60ebb Cleanup: removed redundant code 2025-09-16 16:00:55 +03:00
Alex P a5fb3bf30c Fix: remove misplaced const 2025-09-16 15:52:53 +03:00
Alex P 26e71806cb Cleanup, Optimizations: Small aaudio optimizations 2025-09-16 15:46:55 +03:00
Alex P 2f7bf55f22 Cleanup, Optimizations: Small aaudio optimizations 2025-09-16 15:45:03 +03:00
Alex P 8a3f1b6c32 Cleanup, Optimizations: Small aaudio optimizations 2025-09-16 15:37:23 +03:00
Alex P 7ffb9e1d59 Cleanup: removed redundant code, comments, etc. 2025-09-16 15:31:10 +03:00
Alex P 647eca4292 Cleanup: removed redundant code, comments, etc. 2025-09-16 15:23:16 +03:00
Alex P a8b58b5d34 [WIP] Cleanup: removed redundant code 2025-09-16 15:17:49 +03:00
Alex P b23cc50d6c [WIP] Cleanup: removed redundant code 2025-09-16 15:14:00 +03:00
Alex P 1f88dab95f [WIP] Maintainability Improvement: Add debug logging throughout the audio system for easy debugging and troubleshooting 2025-09-16 15:05:08 +03:00
Alex P 0944c886e5 [WIP] Maintainability Improvement: Add debug logging throughout the audio system for easy debugging and troubleshooting 2025-09-16 11:27:18 +03:00
Alex P 5e257b3144 [WIP] Add debug logging throughout the audio system 2025-09-16 11:26:48 +03:00
Alex P fb98c4edcb [WIP] Maintainability: Add debug / trace logs to make it easy to debug audio input issues 2025-09-16 11:11:18 +03:00
Alex P e894470ca8 [WIP] Cleanup: function naming 2025-09-16 07:33:34 +00:00
Alex P 996016b0da [WIP] Cleanup: remove unnecessary complexity 2025-09-15 23:00:03 +00:00
Alex P 7ab4a0e41d [WIP] Simplification: PR Simplification 2025-09-16 00:44:26 +03:00
35 changed files with 1228 additions and 3291 deletions
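The NEON helpers described in commit 140a803ccf are not reproduced in the diffs below, so the following is only a minimal sketch of what one such SIMD-optimized operation (in-place volume scaling of S16 samples with a Q15 gain) could look like under the same Cortex-A7/NEON flags. The file placement, function names, and Q15 gain representation are assumptions, not code from this PR, and the `#cgo CFLAGS` line assumes the ARM cross toolchain used elsewhere in the build.

```go
package audio

/*
#cgo CFLAGS: -O3 -mfpu=neon -mfloat-abi=hard -mtune=cortex-a7 -marm
#include <stdint.h>
#include <stddef.h>
#ifdef __ARM_NEON
#include <arm_neon.h>
// Scale interleaved S16LE samples in place by a Q15 gain, 8 samples per iteration.
static void scale_s16_q15(int16_t *samples, size_t n, int16_t gain_q15) {
    size_t i = 0;
    int16x8_t g = vdupq_n_s16(gain_q15);
    for (; i + 8 <= n; i += 8) {
        int16x8_t v = vld1q_s16(samples + i);
        // vqdmulhq_s16 computes saturate((2*a*b) >> 16), i.e. a Q15 multiply.
        vst1q_s16(samples + i, vqdmulhq_s16(v, g));
    }
    for (; i < n; i++) { // scalar tail for the remaining samples
        samples[i] = (int16_t)(((int32_t)samples[i] * gain_q15) >> 15);
    }
}
#else
// Portable fallback when NEON is not available at compile time.
static void scale_s16_q15(int16_t *samples, size_t n, int16_t gain_q15) {
    for (size_t i = 0; i < n; i++) {
        samples[i] = (int16_t)(((int32_t)samples[i] * gain_q15) >> 15);
    }
}
#endif
*/
import "C"

import "unsafe"

// ScaleVolumeQ15 applies a Q15 fixed-point gain in place to interleaved
// int16 PCM samples, using the NEON path when the build enables it.
func ScaleVolumeQ15(samples []int16, gainQ15 int16) {
	if len(samples) == 0 {
		return
	}
	C.scale_s16_q15((*C.int16_t)(unsafe.Pointer(&samples[0])),
		C.size_t(len(samples)), C.int16_t(gainQ15))
}
```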
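Commit cf679978be (#609) likewise describes the time-sync gating only in prose; a minimal sketch of "wait for time sync before auto-update, recheck every 30 seconds" might look like the following. All names and the wiring are hypothetical stand-ins, not taken from the jetkvm codebase.

```go
package kvm // hypothetical placement

import (
	"context"
	"time"
)

// waitForTimeSyncThenUpdate blocks the auto-update until the clock has been
// synchronized, rechecking every 30 seconds, then runs the update once.
func waitForTimeSyncThenUpdate(ctx context.Context, timeSynced func() bool, runAutoUpdate func(context.Context) error) error {
	const recheck = 30 * time.Second
	for !timeSynced() {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(recheck):
			// TLS validation during the update needs a sane clock, so keep waiting.
		}
	}
	return runAutoUpdate(ctx)
}
```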

View File

@ -84,7 +84,10 @@ jobs:
version: v2.0.2
env:
CGO_ENABLED: 1
ALSA_VERSION: ${{ env.ALSA_VERSION }}
OPUS_VERSION: ${{ env.OPUS_VERSION }}
CGO_CFLAGS: "-I${{ steps.build-env.outputs.cache_path }}/alsa-lib-${{ steps.build-env.outputs.alsa_version }}/include -I${{ steps.build-env.outputs.cache_path }}/opus-${{ steps.build-env.outputs.opus_version }}/include -I${{ steps.build-env.outputs.cache_path }}/opus-${{ steps.build-env.outputs.opus_version }}/celt"
GOOS: linux
GOARCH: arm
GOARM: 7
CC: ${{ steps.build-env.outputs.cache_path }}/../rv1106-system/tools/linux/toolchain/arm-rockchip830-linux-uclibcgnueabihf/bin/arm-rockchip830-linux-uclibcgnueabihf-gcc
PKG_CONFIG_PATH: ${{ steps.build-env.outputs.cache_path }}/alsa-lib-${{ steps.build-env.outputs.alsa_version }}/utils:${{ steps.build-env.outputs.cache_path }}/opus-${{ steps.build-env.outputs.opus_version }}
CGO_CFLAGS: "-O3 -mfpu=neon -mtune=cortex-a7 -mfloat-abi=hard -ftree-vectorize -ffast-math -funroll-loops -mvectorize-with-neon-quad -marm -D__ARM_NEON -I${{ steps.build-env.outputs.cache_path }}/alsa-lib-${{ steps.build-env.outputs.alsa_version }}/include -I${{ steps.build-env.outputs.cache_path }}/opus-${{ steps.build-env.outputs.opus_version }}/include -I${{ steps.build-env.outputs.cache_path }}/opus-${{ steps.build-env.outputs.opus_version }}/celt"
CGO_LDFLAGS: "-L${{ steps.build-env.outputs.cache_path }}/alsa-lib-${{ steps.build-env.outputs.alsa_version }}/src/.libs -lasound -L${{ steps.build-env.outputs.cache_path }}/opus-${{ steps.build-env.outputs.opus_version }}/.libs -lopus -lm -ldl -static"

1
.gitignore vendored
View File

@ -11,3 +11,4 @@ tmp/
device-tests.tar.gz
CLAUDE.md
.claude/

View File

@ -409,7 +409,7 @@ npm install
```bash
# Enable debug logging
export LOG_TRACE_SCOPES="jetkvm,cloud,websocket,native,jsonrpc"
export LOG_TRACE_SCOPES="jetkvm,cloud,websocket,native,jsonrpc,audio"
# Frontend development
export JETKVM_PROXY_URL="ws://<IP>"
@ -461,7 +461,7 @@ curl http://api:$JETKVM_PASSWORD@YOUR_DEVICE_IP/developer/pprof/
```bash
# Enable trace logging (useful for debugging)
export LOG_TRACE_SCOPES="jetkvm,cloud,websocket,native,jsonrpc"
export LOG_TRACE_SCOPES="jetkvm,cloud,websocket,native,jsonrpc,audio"
# For frontend development
export JETKVM_PROXY_URL="ws://<JETKVM_IP>"

View File

@ -36,8 +36,8 @@ export PKG_CONFIG_PATH := $(AUDIO_LIBS_DIR)/alsa-lib-$(ALSA_VERSION)/utils:$(AUD
# Common command to clean Go cache with verbose output for all Go builds
CLEAN_GO_CACHE := @echo "Cleaning Go cache..."; go clean -cache -v
# Optimization flags for ARM Cortex-A7 with NEON
OPTIM_CFLAGS := -O3 -mfpu=neon -mtune=cortex-a7 -mfloat-abi=hard -ftree-vectorize -ffast-math -funroll-loops
# Optimization flags for ARM Cortex-A7 with NEON SIMD
OPTIM_CFLAGS := -O3 -mfpu=neon -mtune=cortex-a7 -mfloat-abi=hard -ftree-vectorize -ffast-math -funroll-loops -mvectorize-with-neon-quad -marm -D__ARM_NEON
# Cross-compilation environment for ARM - exported globally
export GOOS := linux
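A side note on the OPTIM_CFLAGS above: compiling with -mfpu=neon -D__ARM_NEON assumes the target CPU actually has NEON. If a defensive runtime confirmation were ever wanted, golang.org/x/sys/cpu exposes the feature bit; this is only an optional sketch, not something this PR adds.

```go
package audio

import "golang.org/x/sys/cpu"

// hasNEON reports whether the running ARM CPU advertises NEON. A binary built
// with -mfpu=neon already assumes it, so this is purely a belt-and-braces check.
func hasNEON() bool {
	return cpu.ARM.HasNEON
}
```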

View File

@ -41,7 +41,7 @@ REMOTE_USER="root"
REMOTE_PATH="/userdata/jetkvm/bin"
SKIP_UI_BUILD=false
RESET_USB_HID_DEVICE=false
LOG_TRACE_SCOPES="${LOG_TRACE_SCOPES:-jetkvm,cloud,websocket,native,jsonrpc}"
LOG_TRACE_SCOPES="${LOG_TRACE_SCOPES:-jetkvm,cloud,websocket,native,jsonrpc,audio}"
RUN_GO_TESTS=false
RUN_GO_TESTS_ONLY=false
INSTALL_APP=false

View File

@ -1,447 +0,0 @@
package audio
import (
"context"
"math"
"sync"
"sync/atomic"
"time"
"github.com/jetkvm/kvm/internal/logging"
"github.com/rs/zerolog"
)
// AdaptiveBufferConfig holds configuration for the adaptive buffer sizing algorithm.
//
// The adaptive buffer system dynamically adjusts audio buffer sizes based on real-time
// system conditions to optimize the trade-off between latency and stability. The algorithm
// uses multiple factors to make decisions:
//
// 1. System Load Monitoring:
// - CPU usage: High CPU load decreases buffer sizes to leave CPU headroom for the KVM process
// - Memory usage: High memory pressure reduces buffer sizes to conserve RAM
//
// 2. Latency Tracking:
// - Target latency: Optimal latency for the current quality setting
// - Max latency: Hard limit beyond which buffers are aggressively reduced
//
// 3. Adaptation Strategy:
// - Exponential smoothing: Prevents oscillation and provides stable adjustments
// - Discrete steps: Buffer sizes change in fixed increments to avoid instability
// - Hysteresis: Different thresholds for increasing vs decreasing buffer sizes
//
// The algorithm is specifically tuned for embedded ARM systems with limited resources,
// prioritizing stability over absolute minimum latency.
type AdaptiveBufferConfig struct {
// Buffer size limits (in frames)
MinBufferSize int
MaxBufferSize int
DefaultBufferSize int
// System load thresholds
LowCPUThreshold float64 // Below this, increase buffer size
HighCPUThreshold float64 // Above this, decrease buffer size
LowMemoryThreshold float64 // Below this, increase buffer size
HighMemoryThreshold float64 // Above this, decrease buffer size
// Latency thresholds (in milliseconds)
TargetLatency time.Duration
MaxLatency time.Duration
// Adaptation parameters
AdaptationInterval time.Duration
SmoothingFactor float64 // 0.0-1.0, higher = more responsive
}
// DefaultAdaptiveBufferConfig returns optimized config for JetKVM hardware
func DefaultAdaptiveBufferConfig() AdaptiveBufferConfig {
return AdaptiveBufferConfig{
// Conservative buffer sizes for 256MB RAM constraint
MinBufferSize: Config.AdaptiveMinBufferSize,
MaxBufferSize: Config.AdaptiveMaxBufferSize,
DefaultBufferSize: Config.AdaptiveDefaultBufferSize,
// CPU thresholds optimized for single-core ARM Cortex A7 under load
LowCPUThreshold: Config.LowCPUThreshold * 100, // Below 20% CPU
HighCPUThreshold: Config.HighCPUThreshold * 100, // Above 60% CPU (lowered to be more responsive)
// Memory thresholds for 256MB total RAM
LowMemoryThreshold: Config.LowMemoryThreshold * 100, // Below 35% memory usage
HighMemoryThreshold: Config.HighMemoryThreshold * 100, // Above 75% memory usage (lowered for earlier response)
// Latency targets
TargetLatency: Config.AdaptiveBufferTargetLatency, // Target 20ms latency
MaxLatency: Config.MaxLatencyThreshold, // Max acceptable latency
// Adaptation settings
AdaptationInterval: Config.BufferUpdateInterval, // Check every 500ms
SmoothingFactor: Config.SmoothingFactor, // Moderate responsiveness
}
}
// AdaptiveBufferManager manages dynamic buffer sizing based on system conditions
type AdaptiveBufferManager struct {
// Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment)
currentInputBufferSize int64 // Current input buffer size (atomic)
currentOutputBufferSize int64 // Current output buffer size (atomic)
averageLatency int64 // Average latency in nanoseconds (atomic)
systemCPUPercent int64 // System CPU percentage * 100 (atomic)
systemMemoryPercent int64 // System memory percentage * 100 (atomic)
adaptationCount int64 // Metrics tracking (atomic)
config AdaptiveBufferConfig
logger zerolog.Logger
// Control channels
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
// Metrics tracking
lastAdaptation time.Time
mutex sync.RWMutex
}
// NewAdaptiveBufferManager creates a new adaptive buffer manager
func NewAdaptiveBufferManager(config AdaptiveBufferConfig) *AdaptiveBufferManager {
logger := logging.GetDefaultLogger().With().Str("component", "adaptive-buffer").Logger()
if err := ValidateAdaptiveBufferConfig(config.MinBufferSize, config.MaxBufferSize, config.DefaultBufferSize); err != nil {
logger.Warn().Err(err).Msg("invalid adaptive buffer config, using defaults")
config = DefaultAdaptiveBufferConfig()
}
ctx, cancel := context.WithCancel(context.Background())
return &AdaptiveBufferManager{
currentInputBufferSize: int64(config.DefaultBufferSize),
currentOutputBufferSize: int64(config.DefaultBufferSize),
config: config,
logger: logger,
ctx: ctx,
cancel: cancel,
lastAdaptation: time.Now(),
}
}
// Start begins the adaptive buffer management
func (abm *AdaptiveBufferManager) Start() {
abm.wg.Add(1)
go abm.adaptationLoop()
abm.logger.Info().Msg("adaptive buffer manager started")
}
// Stop stops the adaptive buffer management
func (abm *AdaptiveBufferManager) Stop() {
abm.cancel()
abm.wg.Wait()
abm.logger.Info().Msg("adaptive buffer manager stopped")
}
// GetInputBufferSize returns the current recommended input buffer size
func (abm *AdaptiveBufferManager) GetInputBufferSize() int {
return int(atomic.LoadInt64(&abm.currentInputBufferSize))
}
// GetOutputBufferSize returns the current recommended output buffer size
func (abm *AdaptiveBufferManager) GetOutputBufferSize() int {
return int(atomic.LoadInt64(&abm.currentOutputBufferSize))
}
// UpdateLatency updates the current latency measurement
func (abm *AdaptiveBufferManager) UpdateLatency(latency time.Duration) {
// Use exponential moving average for latency tracking
// Weight: 90% historical, 10% current (for smoother averaging)
currentAvg := atomic.LoadInt64(&abm.averageLatency)
newLatencyNs := latency.Nanoseconds()
if currentAvg == 0 {
// First measurement
atomic.StoreInt64(&abm.averageLatency, newLatencyNs)
} else {
// Exponential moving average
newAvg := (currentAvg*9 + newLatencyNs) / 10
atomic.StoreInt64(&abm.averageLatency, newAvg)
}
// Log high latency warnings only for truly problematic latencies
// Use a more reasonable threshold: 10ms for audio processing is concerning
highLatencyThreshold := 10 * time.Millisecond
if latency > highLatencyThreshold {
abm.logger.Debug().
Dur("latency_ms", latency/time.Millisecond).
Dur("threshold_ms", highLatencyThreshold/time.Millisecond).
Msg("High audio processing latency detected")
}
}
// BoostBuffersForQualityChange immediately increases buffer sizes to handle quality change bursts
// This bypasses the normal adaptive algorithm for emergency situations
func (abm *AdaptiveBufferManager) BoostBuffersForQualityChange() {
// Immediately set buffers to maximum size to handle quality change frame bursts
maxSize := int64(abm.config.MaxBufferSize)
atomic.StoreInt64(&abm.currentInputBufferSize, maxSize)
atomic.StoreInt64(&abm.currentOutputBufferSize, maxSize)
abm.logger.Info().
Int("buffer_size", int(maxSize)).
Msg("Boosted buffers to maximum size for quality change")
}
// adaptationLoop is the main loop that adjusts buffer sizes
func (abm *AdaptiveBufferManager) adaptationLoop() {
defer abm.wg.Done()
ticker := time.NewTicker(abm.config.AdaptationInterval)
defer ticker.Stop()
for {
select {
case <-abm.ctx.Done():
return
case <-ticker.C:
abm.adaptBufferSizes()
}
}
}
// adaptBufferSizes analyzes system conditions and adjusts buffer sizes
// adaptBufferSizes implements the core adaptive buffer sizing algorithm.
//
// This function uses a multi-factor approach to determine optimal buffer sizes:
//
// Mathematical Model:
// 1. Factor Calculation:
//
// - CPU Factor: piecewise-linear factor that shrinks buffers under high CPU load and grows them when CPU is idle
//
// - Memory Factor: piecewise-linear factor that shrinks buffers under memory pressure and grows them when memory is plentiful
//
// - Latency Factor: piecewise-linear factor that shrinks buffers as latency climbs from the target toward the configured maximum
//
// 2. Combined Factor:
// Combined = CPUMemoryWeight*CPU_factor + MemoryWeight*Memory_factor + LatencyWeight*Latency_factor
// This weighted sum gives the CPU term the highest priority so that audio buffering never starves the KVM process
//
// 3. Exponential Smoothing:
// New_size = Current_size + smoothing_factor * (Target_size - Current_size)
// This prevents rapid oscillations and provides stable convergence
//
// 4. Discrete Quantization:
// Final sizes are rounded to frame boundaries and clamped to configured limits
//
// The algorithm runs periodically and only applies changes when the adaptation interval
// has elapsed, preventing excessive adjustments that could destabilize the audio pipeline.
func (abm *AdaptiveBufferManager) adaptBufferSizes() {
// Use fixed system metrics for stability
systemCPU := 50.0 // Assume moderate CPU usage
systemMemory := 60.0 // Assume moderate memory usage
atomic.StoreInt64(&abm.systemCPUPercent, int64(systemCPU*100))
atomic.StoreInt64(&abm.systemMemoryPercent, int64(systemMemory*100))
// Get current latency
currentLatencyNs := atomic.LoadInt64(&abm.averageLatency)
currentLatency := time.Duration(currentLatencyNs)
// Calculate adaptation factors
cpuFactor := abm.calculateCPUFactor(systemCPU)
memoryFactor := abm.calculateMemoryFactor(systemMemory)
latencyFactor := abm.calculateLatencyFactor(currentLatency)
// Combine factors with weights (CPU has highest priority for KVM coexistence)
combinedFactor := Config.CPUMemoryWeight*cpuFactor + Config.MemoryWeight*memoryFactor + Config.LatencyWeight*latencyFactor
// Apply adaptation with smoothing
currentInput := float64(atomic.LoadInt64(&abm.currentInputBufferSize))
currentOutput := float64(atomic.LoadInt64(&abm.currentOutputBufferSize))
// Calculate new buffer sizes
newInputSize := abm.applyAdaptation(currentInput, combinedFactor)
newOutputSize := abm.applyAdaptation(currentOutput, combinedFactor)
// Update buffer sizes if they changed significantly
adjustmentMade := false
if math.Abs(newInputSize-currentInput) >= 0.5 || math.Abs(newOutputSize-currentOutput) >= 0.5 {
atomic.StoreInt64(&abm.currentInputBufferSize, int64(math.Round(newInputSize)))
atomic.StoreInt64(&abm.currentOutputBufferSize, int64(math.Round(newOutputSize)))
atomic.AddInt64(&abm.adaptationCount, 1)
abm.mutex.Lock()
abm.lastAdaptation = time.Now()
abm.mutex.Unlock()
adjustmentMade = true
abm.logger.Debug().
Float64("cpu_percent", systemCPU).
Float64("memory_percent", systemMemory).
Dur("latency", currentLatency).
Float64("combined_factor", combinedFactor).
Int("new_input_size", int(newInputSize)).
Int("new_output_size", int(newOutputSize)).
Msg("Adapted buffer sizes")
}
// Update metrics with current state
currentInputSize := int(atomic.LoadInt64(&abm.currentInputBufferSize))
currentOutputSize := int(atomic.LoadInt64(&abm.currentOutputBufferSize))
UpdateAdaptiveBufferMetrics(currentInputSize, currentOutputSize, systemCPU, systemMemory, adjustmentMade)
}
// calculateCPUFactor returns adaptation factor based on CPU usage with threshold validation.
//
// Validation Rules:
// - CPU percentage must be within valid range [0.0, 100.0]
// - Uses LowCPUThreshold and HighCPUThreshold from config for decision boundaries
// - Default thresholds: Low=20.0%, High=80.0%
//
// Adaptation Logic:
// - CPU > HighCPUThreshold: Return -1.0 (decrease buffers to reduce CPU load)
// - CPU < LowCPUThreshold: Return +1.0 (increase buffers for better quality)
// - Between thresholds: Linear interpolation based on distance from midpoint
//
// Returns: Adaptation factor in range [-1.0, +1.0]
// - Negative values: Decrease buffer sizes to reduce CPU usage
// - Positive values: Increase buffer sizes for better audio quality
// - Zero: No adaptation needed
//
// The function ensures CPU-aware buffer management to balance audio quality
// with system performance, preventing CPU starvation of the KVM process.
func (abm *AdaptiveBufferManager) calculateCPUFactor(cpuPercent float64) float64 {
if cpuPercent > abm.config.HighCPUThreshold {
// High CPU: decrease buffers to reduce latency and give CPU to KVM
return -1.0
} else if cpuPercent < abm.config.LowCPUThreshold {
// Low CPU: increase buffers for better quality
return 1.0
}
// Medium CPU: linear interpolation
midpoint := (abm.config.HighCPUThreshold + abm.config.LowCPUThreshold) / 2
return (midpoint - cpuPercent) / (midpoint - abm.config.LowCPUThreshold)
}
// calculateMemoryFactor returns adaptation factor based on memory usage with threshold validation.
//
// Validation Rules:
// - Memory percentage must be within valid range [0.0, 100.0]
// - Uses LowMemoryThreshold and HighMemoryThreshold from config for decision boundaries
// - Default thresholds: Low=30.0%, High=85.0%
//
// Adaptation Logic:
// - Memory > HighMemoryThreshold: Return -1.0 (decrease buffers to free memory)
// - Memory < LowMemoryThreshold: Return +1.0 (increase buffers for performance)
// - Between thresholds: Linear interpolation based on distance from midpoint
//
// Returns: Adaptation factor in range [-1.0, +1.0]
// - Negative values: Decrease buffer sizes to reduce memory usage
// - Positive values: Increase buffer sizes for better performance
// - Zero: No adaptation needed
//
// The function prevents memory exhaustion while optimizing buffer sizes
// for audio processing performance and system stability.
func (abm *AdaptiveBufferManager) calculateMemoryFactor(memoryPercent float64) float64 {
if memoryPercent > abm.config.HighMemoryThreshold {
// High memory: decrease buffers to free memory
return -1.0
} else if memoryPercent < abm.config.LowMemoryThreshold {
// Low memory: increase buffers for better performance
return 1.0
}
// Medium memory: linear interpolation
midpoint := (abm.config.HighMemoryThreshold + abm.config.LowMemoryThreshold) / 2
return (midpoint - memoryPercent) / (midpoint - abm.config.LowMemoryThreshold)
}
// calculateLatencyFactor returns adaptation factor based on latency with threshold validation.
//
// Validation Rules:
// - Latency must be non-negative duration
// - Uses TargetLatency and MaxLatency from config for decision boundaries
// - Default thresholds: Target=50ms, Max=200ms
//
// Adaptation Logic:
// - Latency > MaxLatency: Return -1.0 (decrease buffers to reduce latency)
// - Latency < TargetLatency: Return +1.0 (increase buffers for quality)
// - Between thresholds: Linear interpolation based on distance from midpoint
//
// Returns: Adaptation factor in range [-1.0, +1.0]
// - Negative values: Decrease buffer sizes to reduce audio latency
// - Positive values: Increase buffer sizes for better audio quality
// - Zero: Latency is at optimal level
//
// The function balances audio latency with quality, ensuring real-time
// performance while maintaining acceptable audio processing quality.
func (abm *AdaptiveBufferManager) calculateLatencyFactor(latency time.Duration) float64 {
if latency > abm.config.MaxLatency {
// High latency: decrease buffers
return -1.0
} else if latency < abm.config.TargetLatency {
// Low latency: can increase buffers
return 1.0
}
// Medium latency: linear interpolation
midLatency := (abm.config.MaxLatency + abm.config.TargetLatency) / 2
return float64(midLatency-latency) / float64(midLatency-abm.config.TargetLatency)
}
// applyAdaptation applies the adaptation factor to current buffer size
func (abm *AdaptiveBufferManager) applyAdaptation(currentSize, factor float64) float64 {
// Calculate target size based on factor
var targetSize float64
if factor > 0 {
// Increase towards max
targetSize = currentSize + factor*(float64(abm.config.MaxBufferSize)-currentSize)
} else {
// Decrease towards min
targetSize = currentSize + factor*(currentSize-float64(abm.config.MinBufferSize))
}
// Apply smoothing
newSize := currentSize + abm.config.SmoothingFactor*(targetSize-currentSize)
// Clamp to valid range
return math.Max(float64(abm.config.MinBufferSize),
math.Min(float64(abm.config.MaxBufferSize), newSize))
}
// GetStats returns current adaptation statistics
func (abm *AdaptiveBufferManager) GetStats() map[string]interface{} {
abm.mutex.RLock()
lastAdaptation := abm.lastAdaptation
abm.mutex.RUnlock()
return map[string]interface{}{
"input_buffer_size": abm.GetInputBufferSize(),
"output_buffer_size": abm.GetOutputBufferSize(),
"average_latency_ms": float64(atomic.LoadInt64(&abm.averageLatency)) / 1e6,
"system_cpu_percent": float64(atomic.LoadInt64(&abm.systemCPUPercent)) / Config.PercentageMultiplier,
"system_memory_percent": float64(atomic.LoadInt64(&abm.systemMemoryPercent)) / Config.PercentageMultiplier,
"adaptation_count": atomic.LoadInt64(&abm.adaptationCount),
"last_adaptation": lastAdaptation,
}
}
// Global adaptive buffer manager instance
var globalAdaptiveBufferManager *AdaptiveBufferManager
var adaptiveBufferOnce sync.Once
// GetAdaptiveBufferManager returns the global adaptive buffer manager instance
func GetAdaptiveBufferManager() *AdaptiveBufferManager {
adaptiveBufferOnce.Do(func() {
globalAdaptiveBufferManager = NewAdaptiveBufferManager(DefaultAdaptiveBufferConfig())
})
return globalAdaptiveBufferManager
}
// StartAdaptiveBuffering starts the global adaptive buffer manager
func StartAdaptiveBuffering() {
GetAdaptiveBufferManager().Start()
}
// StopAdaptiveBuffering stops the global adaptive buffer manager
func StopAdaptiveBuffering() {
if globalAdaptiveBufferManager != nil {
globalAdaptiveBufferManager.Stop()
}
}
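For reference, the public surface of the deleted adaptive buffer manager was meant to be driven roughly as in the sketch below; the call sites are illustrative, not recovered from the removed callers.

```go
package audio

import "time"

// exampleAdaptiveBuffering shows roughly how the removed manager was used.
func exampleAdaptiveBuffering() {
	abm := NewAdaptiveBufferManager(DefaultAdaptiveBufferConfig())
	abm.Start()
	defer abm.Stop()

	// The audio path feeds measured latencies into the manager...
	abm.UpdateLatency(18 * time.Millisecond)

	// ...and sizes its next reads and writes from the current recommendations.
	inputFrames := abm.GetInputBufferSize()
	outputFrames := abm.GetOutputBufferSize()
	_ = inputFrames
	_ = outputFrames
}
```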

View File

@ -1,626 +0,0 @@
//go:build cgo
package audio
import (
"context"
"fmt"
"runtime"
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/jetkvm/kvm/internal/logging"
"github.com/rs/zerolog"
)
// BatchAudioProcessor manages batched CGO operations to reduce syscall overhead
type BatchAudioProcessor struct {
// Statistics - MUST be first for ARM32 alignment (int64 fields need 8-byte alignment)
stats BatchAudioStats
// Control
ctx context.Context
cancel context.CancelFunc
logger *zerolog.Logger
batchSize int
batchDuration time.Duration
// Batch queues and state (atomic for lock-free access)
readQueue chan batchReadRequest
writeQueue chan batchWriteRequest
initialized int32
running int32
threadPinned int32
writePinned int32
// Buffers (pre-allocated to avoid allocation overhead)
readBufPool *sync.Pool
writeBufPool *sync.Pool
}
type BatchAudioStats struct {
// int64 fields MUST be first for ARM32 alignment
BatchedReads int64
SingleReads int64
BatchedWrites int64
SingleWrites int64
BatchedFrames int64
SingleFrames int64
WriteFrames int64
CGOCallsReduced int64
OSThreadPinTime time.Duration // time.Duration is int64 internally
WriteThreadTime time.Duration // time.Duration is int64 internally
LastBatchTime time.Time
LastWriteTime time.Time
}
type batchReadRequest struct {
buffer []byte
resultChan chan batchReadResult
timestamp time.Time
}
type batchReadResult struct {
length int
err error
}
type batchWriteRequest struct {
buffer []byte // Buffer for backward compatibility
opusData []byte // Opus encoded data for decode-write operations
pcmBuffer []byte // PCM buffer for decode-write operations
resultChan chan batchWriteResult
timestamp time.Time
}
type batchWriteResult struct {
length int
err error
}
// NewBatchAudioProcessor creates a new batch audio processor
func NewBatchAudioProcessor(batchSize int, batchDuration time.Duration) *BatchAudioProcessor {
// Validate input parameters with minimal overhead
if batchSize <= 0 || batchSize > 1000 {
batchSize = Config.BatchProcessorFramesPerBatch
}
if batchDuration <= 0 {
batchDuration = Config.BatchProcessingDelay
}
// Use optimized queue sizes from configuration
queueSize := Config.BatchProcessorMaxQueueSize
if queueSize <= 0 {
queueSize = batchSize * 2 // Fallback to double batch size
}
ctx, cancel := context.WithCancel(context.Background())
// Pre-allocate logger to avoid repeated allocations
logger := logging.GetDefaultLogger().With().Str("component", "batch-audio").Logger()
frameSize := Config.MinReadEncodeBuffer
if frameSize == 0 {
frameSize = 1500 // Safe fallback
}
processor := &BatchAudioProcessor{
ctx: ctx,
cancel: cancel,
logger: &logger,
batchSize: batchSize,
batchDuration: batchDuration,
readQueue: make(chan batchReadRequest, queueSize),
writeQueue: make(chan batchWriteRequest, queueSize),
readBufPool: &sync.Pool{
New: func() interface{} {
return make([]byte, 0, frameSize)
},
},
writeBufPool: &sync.Pool{
New: func() interface{} {
return make([]byte, 0, frameSize)
},
},
}
return processor
}
// Start initializes and starts the batch processor
func (bap *BatchAudioProcessor) Start() error {
if !atomic.CompareAndSwapInt32(&bap.running, 0, 1) {
return nil // Already running
}
// Initialize CGO resources once per processor lifecycle
if !atomic.CompareAndSwapInt32(&bap.initialized, 0, 1) {
return nil // Already initialized
}
// Start batch processing goroutines
go bap.batchReadProcessor()
go bap.batchWriteProcessor()
bap.logger.Info().Int("batch_size", bap.batchSize).
Dur("batch_duration", bap.batchDuration).
Msg("batch audio processor started")
return nil
}
// Stop cleanly shuts down the batch processor
func (bap *BatchAudioProcessor) Stop() {
if !atomic.CompareAndSwapInt32(&bap.running, 1, 0) {
return // Already stopped
}
bap.cancel()
// Wait for processing to complete
time.Sleep(bap.batchDuration + Config.BatchProcessingDelay)
bap.logger.Info().Msg("batch audio processor stopped")
}
// BatchReadEncode performs batched audio read and encode operations
func (bap *BatchAudioProcessor) BatchReadEncode(buffer []byte) (int, error) {
// Validate buffer before processing
if err := ValidateBufferSize(len(buffer)); err != nil {
// Only log validation errors in debug mode to reduce overhead
if bap.logger.GetLevel() <= zerolog.DebugLevel {
bap.logger.Debug().Err(err).Msg("invalid buffer for batch processing")
}
return 0, err
}
if !bap.IsRunning() {
// Fallback to single operation if batch processor is not running
// Use sampling to reduce atomic operations overhead
if atomic.LoadInt64(&bap.stats.SingleReads)%10 == 0 {
atomic.AddInt64(&bap.stats.SingleReads, 10)
atomic.AddInt64(&bap.stats.SingleFrames, 10)
}
return CGOAudioReadEncode(buffer)
}
resultChan := make(chan batchReadResult, 1)
request := batchReadRequest{
buffer: buffer,
resultChan: resultChan,
timestamp: time.Now(),
}
// Try to queue the request with non-blocking send
select {
case bap.readQueue <- request:
// Successfully queued
default:
// Queue is full, fallback to single operation
// Use sampling to reduce atomic operations overhead
if atomic.LoadInt64(&bap.stats.SingleReads)%10 == 0 {
atomic.AddInt64(&bap.stats.SingleReads, 10)
atomic.AddInt64(&bap.stats.SingleFrames, 10)
}
return CGOAudioReadEncode(buffer)
}
// Wait for result with timeout
select {
case result := <-resultChan:
return result.length, result.err
case <-time.After(Config.BatchProcessorTimeout):
// Timeout, fallback to single operation
// Use sampling to reduce atomic operations overhead
if atomic.LoadInt64(&bap.stats.SingleReads)%10 == 0 {
atomic.AddInt64(&bap.stats.SingleReads, 10)
atomic.AddInt64(&bap.stats.SingleFrames, 10)
}
return CGOAudioReadEncode(buffer)
}
}
// BatchDecodeWrite performs batched audio decode and write operations
// This is the legacy version that uses a single buffer
func (bap *BatchAudioProcessor) BatchDecodeWrite(buffer []byte) (int, error) {
// Validate buffer before processing
if err := ValidateBufferSize(len(buffer)); err != nil {
// Only log validation errors in debug mode to reduce overhead
if bap.logger.GetLevel() <= zerolog.DebugLevel {
bap.logger.Debug().Err(err).Msg("invalid buffer for batch processing")
}
return 0, err
}
if !bap.IsRunning() {
// Fallback to single operation if batch processor is not running
// Use sampling to reduce atomic operations overhead
if atomic.LoadInt64(&bap.stats.SingleWrites)%10 == 0 {
atomic.AddInt64(&bap.stats.SingleWrites, 10)
atomic.AddInt64(&bap.stats.WriteFrames, 10)
}
return CGOAudioDecodeWriteLegacy(buffer)
}
resultChan := make(chan batchWriteResult, 1)
request := batchWriteRequest{
buffer: buffer,
resultChan: resultChan,
timestamp: time.Now(),
}
// Try to queue the request with non-blocking send
select {
case bap.writeQueue <- request:
// Successfully queued
default:
// Queue is full, fall back to single operation
// Use sampling to reduce atomic operations overhead
if atomic.LoadInt64(&bap.stats.SingleWrites)%10 == 0 {
atomic.AddInt64(&bap.stats.SingleWrites, 10)
atomic.AddInt64(&bap.stats.WriteFrames, 10)
}
return CGOAudioDecodeWriteLegacy(buffer)
}
// Wait for result with timeout
select {
case result := <-resultChan:
return result.length, result.err
case <-time.After(Config.BatchProcessorTimeout):
// Use sampling to reduce atomic operations overhead
if atomic.LoadInt64(&bap.stats.SingleWrites)%10 == 0 {
atomic.AddInt64(&bap.stats.SingleWrites, 10)
atomic.AddInt64(&bap.stats.WriteFrames, 10)
}
return CGOAudioDecodeWriteLegacy(buffer)
}
}
// BatchDecodeWriteWithBuffers performs batched audio decode and write operations with separate opus and PCM buffers
func (bap *BatchAudioProcessor) BatchDecodeWriteWithBuffers(opusData []byte, pcmBuffer []byte) (int, error) {
// Validate buffers before processing
if len(opusData) == 0 {
return 0, fmt.Errorf("empty opus data buffer")
}
if len(pcmBuffer) == 0 {
return 0, fmt.Errorf("empty PCM buffer")
}
if !bap.IsRunning() {
// Fallback to single operation if batch processor is not running
atomic.AddInt64(&bap.stats.SingleWrites, 1)
atomic.AddInt64(&bap.stats.WriteFrames, 1)
// Use the optimized function with separate buffers
return CGOAudioDecodeWrite(opusData, pcmBuffer)
}
resultChan := make(chan batchWriteResult, 1)
request := batchWriteRequest{
opusData: opusData,
pcmBuffer: pcmBuffer,
resultChan: resultChan,
timestamp: time.Now(),
}
// Try to queue the request with non-blocking send
select {
case bap.writeQueue <- request:
// Successfully queued
default:
// Queue is full, fall back to single operation
atomic.AddInt64(&bap.stats.SingleWrites, 1)
atomic.AddInt64(&bap.stats.WriteFrames, 1)
// Use the optimized function with separate buffers
return CGOAudioDecodeWrite(opusData, pcmBuffer)
}
// Wait for result with timeout
select {
case result := <-resultChan:
return result.length, result.err
case <-time.After(Config.BatchProcessorTimeout):
atomic.AddInt64(&bap.stats.SingleWrites, 1)
atomic.AddInt64(&bap.stats.WriteFrames, 1)
// Use the optimized function with separate buffers
return CGOAudioDecodeWrite(opusData, pcmBuffer)
}
}
// batchReadProcessor processes batched read operations
func (bap *BatchAudioProcessor) batchReadProcessor() {
defer bap.logger.Debug().Msg("batch read processor stopped")
ticker := time.NewTicker(bap.batchDuration)
defer ticker.Stop()
var batch []batchReadRequest
batch = make([]batchReadRequest, 0, bap.batchSize)
for atomic.LoadInt32(&bap.running) == 1 {
select {
case <-bap.ctx.Done():
return
case req := <-bap.readQueue:
batch = append(batch, req)
if len(batch) >= bap.batchSize {
bap.processBatchRead(batch)
batch = batch[:0] // Clear slice but keep capacity
}
case <-ticker.C:
if len(batch) > 0 {
bap.processBatchRead(batch)
batch = batch[:0] // Clear slice but keep capacity
}
}
}
// Process any remaining requests
if len(batch) > 0 {
bap.processBatchRead(batch)
}
}
// batchWriteProcessor processes batched write operations
func (bap *BatchAudioProcessor) batchWriteProcessor() {
defer bap.logger.Debug().Msg("batch write processor stopped")
ticker := time.NewTicker(bap.batchDuration)
defer ticker.Stop()
var batch []batchWriteRequest
batch = make([]batchWriteRequest, 0, bap.batchSize)
for atomic.LoadInt32(&bap.running) == 1 {
select {
case <-bap.ctx.Done():
return
case req := <-bap.writeQueue:
batch = append(batch, req)
if len(batch) >= bap.batchSize {
bap.processBatchWrite(batch)
batch = batch[:0] // Clear slice but keep capacity
}
case <-ticker.C:
if len(batch) > 0 {
bap.processBatchWrite(batch)
batch = batch[:0] // Clear slice but keep capacity
}
}
}
// Process any remaining requests
if len(batch) > 0 {
bap.processBatchWrite(batch)
}
}
// processBatchRead processes a batch of read requests efficiently
func (bap *BatchAudioProcessor) processBatchRead(batch []batchReadRequest) {
batchSize := len(batch)
if batchSize == 0 {
return
}
threadPinningThreshold := Config.BatchProcessorThreadPinningThreshold
if threadPinningThreshold == 0 {
threadPinningThreshold = Config.MinBatchSizeForThreadPinning // Fallback
}
// Only pin to OS thread for large batches to reduce thread contention
var start time.Time
threadWasPinned := false
if batchSize >= threadPinningThreshold && atomic.CompareAndSwapInt32(&bap.threadPinned, 0, 1) {
start = time.Now()
threadWasPinned = true
runtime.LockOSThread()
}
// Batch stats updates to reduce atomic operations (update once per batch instead of per frame)
atomic.AddInt64(&bap.stats.BatchedReads, 1)
atomic.AddInt64(&bap.stats.BatchedFrames, int64(batchSize))
if batchSize > 1 {
atomic.AddInt64(&bap.stats.CGOCallsReduced, int64(batchSize-1))
}
// Process each request in the batch with minimal overhead
for i := range batch {
req := &batch[i]
length, err := CGOAudioReadEncode(req.buffer)
// Send result back (non-blocking) - reuse result struct
select {
case req.resultChan <- batchReadResult{length: length, err: err}:
default:
// Requestor timed out, drop result
}
}
// Release thread lock if we pinned it
if threadWasPinned {
runtime.UnlockOSThread()
atomic.StoreInt32(&bap.threadPinned, 0)
bap.stats.OSThreadPinTime += time.Since(start)
}
// Update timestamp only once per batch instead of per frame
bap.stats.LastBatchTime = time.Now()
}
// processBatchWrite processes a batch of write requests efficiently
func (bap *BatchAudioProcessor) processBatchWrite(batch []batchWriteRequest) {
if len(batch) == 0 {
return
}
threadPinningThreshold := Config.BatchProcessorThreadPinningThreshold
if threadPinningThreshold == 0 {
threadPinningThreshold = Config.MinBatchSizeForThreadPinning // Fallback
}
// Only pin to OS thread for large batches to reduce thread contention
start := time.Now()
shouldPinThread := len(batch) >= threadPinningThreshold
// Track if we pinned the thread in this call
threadWasPinned := false
if shouldPinThread && atomic.CompareAndSwapInt32(&bap.writePinned, 0, 1) {
threadWasPinned = true
runtime.LockOSThread()
// Priority scheduler not implemented - using default thread priority
}
batchSize := len(batch)
atomic.AddInt64(&bap.stats.BatchedWrites, 1)
atomic.AddInt64(&bap.stats.WriteFrames, int64(batchSize))
if batchSize > 1 {
atomic.AddInt64(&bap.stats.CGOCallsReduced, int64(batchSize-1))
}
// Add deferred function to release thread lock if we pinned it
if threadWasPinned {
defer func() {
// Priority scheduler not implemented - using default thread priority
runtime.UnlockOSThread()
atomic.StoreInt32(&bap.writePinned, 0)
bap.stats.WriteThreadTime += time.Since(start)
}()
}
// Process each request in the batch
for _, req := range batch {
var length int
var err error
// Handle both legacy and new decode-write operations
if req.opusData != nil && req.pcmBuffer != nil {
// New style with separate opus data and PCM buffer
length, err = CGOAudioDecodeWrite(req.opusData, req.pcmBuffer)
} else {
// Legacy style with single buffer
length, err = CGOAudioDecodeWriteLegacy(req.buffer)
}
result := batchWriteResult{
length: length,
err: err,
}
// Send result back (non-blocking)
select {
case req.resultChan <- result:
default:
// Requestor timed out, drop result
}
}
bap.stats.LastWriteTime = time.Now()
}
// GetStats returns current batch processor statistics
func (bap *BatchAudioProcessor) GetStats() BatchAudioStats {
return BatchAudioStats{
BatchedReads: atomic.LoadInt64(&bap.stats.BatchedReads),
SingleReads: atomic.LoadInt64(&bap.stats.SingleReads),
BatchedWrites: atomic.LoadInt64(&bap.stats.BatchedWrites),
SingleWrites: atomic.LoadInt64(&bap.stats.SingleWrites),
BatchedFrames: atomic.LoadInt64(&bap.stats.BatchedFrames),
SingleFrames: atomic.LoadInt64(&bap.stats.SingleFrames),
WriteFrames: atomic.LoadInt64(&bap.stats.WriteFrames),
CGOCallsReduced: atomic.LoadInt64(&bap.stats.CGOCallsReduced),
OSThreadPinTime: bap.stats.OSThreadPinTime,
WriteThreadTime: bap.stats.WriteThreadTime,
LastBatchTime: bap.stats.LastBatchTime,
LastWriteTime: bap.stats.LastWriteTime,
}
}
// IsRunning returns whether the batch processor is running
func (bap *BatchAudioProcessor) IsRunning() bool {
return atomic.LoadInt32(&bap.running) == 1
}
// Global batch processor instance
var (
globalBatchProcessor unsafe.Pointer // *BatchAudioProcessor
batchProcessorInitialized int32
)
// GetBatchAudioProcessor returns the global batch processor instance
func GetBatchAudioProcessor() *BatchAudioProcessor {
ptr := atomic.LoadPointer(&globalBatchProcessor)
if ptr != nil {
return (*BatchAudioProcessor)(ptr)
}
// Initialize on first use
if atomic.CompareAndSwapInt32(&batchProcessorInitialized, 0, 1) {
processor := NewBatchAudioProcessor(Config.BatchProcessorFramesPerBatch, Config.BatchProcessorTimeout)
atomic.StorePointer(&globalBatchProcessor, unsafe.Pointer(processor))
return processor
}
// Another goroutine initialized it, try again
ptr = atomic.LoadPointer(&globalBatchProcessor)
if ptr != nil {
return (*BatchAudioProcessor)(ptr)
}
// Fallback: create a new processor (should rarely happen)
return NewBatchAudioProcessor(Config.BatchProcessorFramesPerBatch, Config.BatchProcessorTimeout)
}
// EnableBatchAudioProcessing enables the global batch processor
func EnableBatchAudioProcessing() error {
processor := GetBatchAudioProcessor()
return processor.Start()
}
// DisableBatchAudioProcessing disables the global batch processor
func DisableBatchAudioProcessing() {
ptr := atomic.LoadPointer(&globalBatchProcessor)
if ptr != nil {
processor := (*BatchAudioProcessor)(ptr)
processor.Stop()
}
}
// BatchCGOAudioReadEncode is a batched version of CGOAudioReadEncode
func BatchCGOAudioReadEncode(buffer []byte) (int, error) {
processor := GetBatchAudioProcessor()
if processor == nil || !processor.IsRunning() {
// Fall back to non-batched version if processor is not running
return CGOAudioReadEncode(buffer)
}
return processor.BatchReadEncode(buffer)
}
// BatchCGOAudioDecodeWrite is a batched version of CGOAudioDecodeWrite
func BatchCGOAudioDecodeWrite(buffer []byte) (int, error) {
processor := GetBatchAudioProcessor()
if processor == nil || !processor.IsRunning() {
// Fall back to non-batched version if processor is not running
return CGOAudioDecodeWriteLegacy(buffer)
}
return processor.BatchDecodeWrite(buffer)
}
// BatchCGOAudioDecodeWriteWithBuffers is a batched version of CGOAudioDecodeWrite that uses separate opus and PCM buffers
func BatchCGOAudioDecodeWriteWithBuffers(opusData []byte, pcmBuffer []byte) (int, error) {
processor := GetBatchAudioProcessor()
if processor == nil || !processor.IsRunning() {
// Fall back to non-batched version if processor is not running
return CGOAudioDecodeWrite(opusData, pcmBuffer)
}
return processor.BatchDecodeWriteWithBuffers(opusData, pcmBuffer)
}
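The deleted batch processor above was driven through its package-level helpers; a minimal usage sketch follows (the buffer size and error handling are illustrative).

```go
//go:build cgo

package audio

// exampleBatchReadEncode routes a read through the removed batch processor,
// which falls back to CGOAudioReadEncode when it is not running.
func exampleBatchReadEncode() ([]byte, error) {
	if err := EnableBatchAudioProcessing(); err != nil {
		return nil, err
	}
	defer DisableBatchAudioProcessing()

	buf := make([]byte, 1500) // one encoded frame; 1500 mirrors the fallback frame size above
	n, err := BatchCGOAudioReadEncode(buf)
	if err != nil {
		return nil, err
	}
	return buf[:n], nil
}
```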

View File

@ -1,331 +0,0 @@
//go:build cgo
package audio
import (
"errors"
"sync"
"sync/atomic"
"unsafe"
)
// BatchReferenceManager handles batch reference counting operations
// to reduce atomic operation overhead for high-frequency frame operations
type BatchReferenceManager struct {
// Batch operations queue
batchQueue chan batchRefOperation
workerPool chan struct{} // Worker pool semaphore
running int32
wg sync.WaitGroup
// Statistics
batchedOps int64
singleOps int64
batchSavings int64 // Number of atomic operations saved
}
type batchRefOperation struct {
frames []*ZeroCopyAudioFrame
operation refOperationType
resultCh chan batchRefResult
}
type refOperationType int
const (
refOpAddRef refOperationType = iota
refOpRelease
refOpMixed // For operations with mixed AddRef/Release
)
// Errors
var (
ErrUnsupportedOperation = errors.New("unsupported batch reference operation")
)
type batchRefResult struct {
finalReleases []bool // For Release operations, indicates which frames had final release
err error
}
// Global batch reference manager
var (
globalBatchRefManager *BatchReferenceManager
batchRefOnce sync.Once
)
// GetBatchReferenceManager returns the global batch reference manager
func GetBatchReferenceManager() *BatchReferenceManager {
batchRefOnce.Do(func() {
globalBatchRefManager = NewBatchReferenceManager()
globalBatchRefManager.Start()
})
return globalBatchRefManager
}
// NewBatchReferenceManager creates a new batch reference manager
func NewBatchReferenceManager() *BatchReferenceManager {
return &BatchReferenceManager{
batchQueue: make(chan batchRefOperation, 256), // Buffered for high throughput
workerPool: make(chan struct{}, 4), // 4 workers for parallel processing
}
}
// Start starts the batch reference manager workers
func (brm *BatchReferenceManager) Start() {
if !atomic.CompareAndSwapInt32(&brm.running, 0, 1) {
return // Already running
}
// Start worker goroutines
for i := 0; i < cap(brm.workerPool); i++ {
brm.wg.Add(1)
go brm.worker()
}
}
// Stop stops the batch reference manager
func (brm *BatchReferenceManager) Stop() {
if !atomic.CompareAndSwapInt32(&brm.running, 1, 0) {
return // Already stopped
}
close(brm.batchQueue)
brm.wg.Wait()
}
// worker processes batch reference operations
func (brm *BatchReferenceManager) worker() {
defer brm.wg.Done()
for op := range brm.batchQueue {
brm.processBatchOperation(op)
}
}
// processBatchOperation processes a batch of reference operations
func (brm *BatchReferenceManager) processBatchOperation(op batchRefOperation) {
result := batchRefResult{}
switch op.operation {
case refOpAddRef:
// Batch AddRef operations
for _, frame := range op.frames {
if frame != nil {
atomic.AddInt32(&frame.refCount, 1)
}
}
atomic.AddInt64(&brm.batchedOps, int64(len(op.frames)))
atomic.AddInt64(&brm.batchSavings, int64(len(op.frames)-1)) // Saved ops vs individual calls
case refOpRelease:
// Batch Release operations
result.finalReleases = make([]bool, len(op.frames))
for i, frame := range op.frames {
if frame != nil {
newCount := atomic.AddInt32(&frame.refCount, -1)
if newCount == 0 {
result.finalReleases[i] = true
// Return to pool if pooled
if frame.pooled {
globalZeroCopyPool.Put(frame)
}
}
}
}
atomic.AddInt64(&brm.batchedOps, int64(len(op.frames)))
atomic.AddInt64(&brm.batchSavings, int64(len(op.frames)-1))
case refOpMixed:
// Handle mixed operations (not implemented in this version)
result.err = ErrUnsupportedOperation
}
// Send result back
if op.resultCh != nil {
op.resultCh <- result
close(op.resultCh)
}
}
// BatchAddRef performs AddRef on multiple frames in a single batch
func (brm *BatchReferenceManager) BatchAddRef(frames []*ZeroCopyAudioFrame) error {
if len(frames) == 0 {
return nil
}
// For small batches, use direct operations to avoid overhead
if len(frames) <= 2 {
for _, frame := range frames {
if frame != nil {
frame.AddRef()
}
}
atomic.AddInt64(&brm.singleOps, int64(len(frames)))
return nil
}
// Use batch processing for larger sets
if atomic.LoadInt32(&brm.running) == 0 {
// Fallback to individual operations if batch manager not running
for _, frame := range frames {
if frame != nil {
frame.AddRef()
}
}
atomic.AddInt64(&brm.singleOps, int64(len(frames)))
return nil
}
resultCh := make(chan batchRefResult, 1)
op := batchRefOperation{
frames: frames,
operation: refOpAddRef,
resultCh: resultCh,
}
select {
case brm.batchQueue <- op:
// Wait for completion
<-resultCh
return nil
default:
// Queue full, fallback to individual operations
for _, frame := range frames {
if frame != nil {
frame.AddRef()
}
}
atomic.AddInt64(&brm.singleOps, int64(len(frames)))
return nil
}
}
// BatchRelease performs Release on multiple frames in a single batch
// Returns a slice indicating which frames had their final reference released
func (brm *BatchReferenceManager) BatchRelease(frames []*ZeroCopyAudioFrame) ([]bool, error) {
if len(frames) == 0 {
return nil, nil
}
// For small batches, use direct operations
if len(frames) <= 2 {
finalReleases := make([]bool, len(frames))
for i, frame := range frames {
if frame != nil {
finalReleases[i] = frame.Release()
}
}
atomic.AddInt64(&brm.singleOps, int64(len(frames)))
return finalReleases, nil
}
// Use batch processing for larger sets
if atomic.LoadInt32(&brm.running) == 0 {
// Fallback to individual operations
finalReleases := make([]bool, len(frames))
for i, frame := range frames {
if frame != nil {
finalReleases[i] = frame.Release()
}
}
atomic.AddInt64(&brm.singleOps, int64(len(frames)))
return finalReleases, nil
}
resultCh := make(chan batchRefResult, 1)
op := batchRefOperation{
frames: frames,
operation: refOpRelease,
resultCh: resultCh,
}
select {
case brm.batchQueue <- op:
// Wait for completion
result := <-resultCh
return result.finalReleases, result.err
default:
// Queue full, fallback to individual operations
finalReleases := make([]bool, len(frames))
for i, frame := range frames {
if frame != nil {
finalReleases[i] = frame.Release()
}
}
atomic.AddInt64(&brm.singleOps, int64(len(frames)))
return finalReleases, nil
}
}
// GetStats returns batch reference counting statistics
func (brm *BatchReferenceManager) GetStats() (batchedOps, singleOps, savings int64) {
return atomic.LoadInt64(&brm.batchedOps),
atomic.LoadInt64(&brm.singleOps),
atomic.LoadInt64(&brm.batchSavings)
}
// Convenience functions for global batch reference manager
// BatchAddRefFrames performs batch AddRef on multiple frames
func BatchAddRefFrames(frames []*ZeroCopyAudioFrame) error {
return GetBatchReferenceManager().BatchAddRef(frames)
}
// BatchReleaseFrames performs batch Release on multiple frames
func BatchReleaseFrames(frames []*ZeroCopyAudioFrame) ([]bool, error) {
return GetBatchReferenceManager().BatchRelease(frames)
}
// GetBatchReferenceStats returns global batch reference statistics
func GetBatchReferenceStats() (batchedOps, singleOps, savings int64) {
return GetBatchReferenceManager().GetStats()
}
// ZeroCopyFrameSlice provides utilities for working with slices of zero-copy frames
type ZeroCopyFrameSlice []*ZeroCopyAudioFrame
// AddRefAll performs batch AddRef on all frames in the slice
func (zfs ZeroCopyFrameSlice) AddRefAll() error {
return BatchAddRefFrames(zfs)
}
// ReleaseAll performs batch Release on all frames in the slice
func (zfs ZeroCopyFrameSlice) ReleaseAll() ([]bool, error) {
return BatchReleaseFrames(zfs)
}
// FilterNonNil returns a new slice with only non-nil frames
func (zfs ZeroCopyFrameSlice) FilterNonNil() ZeroCopyFrameSlice {
filtered := make(ZeroCopyFrameSlice, 0, len(zfs))
for _, frame := range zfs {
if frame != nil {
filtered = append(filtered, frame)
}
}
return filtered
}
// Len returns the number of frames in the slice
func (zfs ZeroCopyFrameSlice) Len() int {
return len(zfs)
}
// Get returns the frame at the specified index
func (zfs ZeroCopyFrameSlice) Get(index int) *ZeroCopyAudioFrame {
if index < 0 || index >= len(zfs) {
return nil
}
return zfs[index]
}
// UnsafePointers returns unsafe pointers for all frames (for CGO batch operations)
func (zfs ZeroCopyFrameSlice) UnsafePointers() []unsafe.Pointer {
pointers := make([]unsafe.Pointer, len(zfs))
for i, frame := range zfs {
if frame != nil {
pointers[i] = frame.UnsafePointer()
}
}
return pointers
}
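The batch reference helpers in the deleted file above compose as in this sketch; frame acquisition and the consumer are hypothetical.

```go
//go:build cgo

package audio

// exampleBatchRefCounting holds batch references across a hand-off and then
// releases them in one batched operation.
func exampleBatchRefCounting(frames []*ZeroCopyAudioFrame) error {
	zfs := ZeroCopyFrameSlice(frames).FilterNonNil()
	if err := zfs.AddRefAll(); err != nil {
		return err
	}
	// ... hand the frames to a consumer while the extra references are held ...
	finalReleases, err := zfs.ReleaseAll()
	if err != nil {
		return err
	}
	_ = finalReleases // true entries mean the frame was returned to the pool
	return nil
}
```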

View File

@ -1,415 +0,0 @@
//go:build cgo
package audio
import (
"sync"
"sync/atomic"
"time"
)
// BatchZeroCopyProcessor handles batch operations on zero-copy audio frames
// with optimized reference counting and memory management
type BatchZeroCopyProcessor struct {
// Configuration
maxBatchSize int
batchTimeout time.Duration
processingDelay time.Duration
adaptiveThreshold float64
// Processing queues
readEncodeQueue chan *batchZeroCopyRequest
decodeWriteQueue chan *batchZeroCopyRequest
// Worker management
workerPool chan struct{}
running int32
wg sync.WaitGroup
// Statistics
batchedFrames int64
singleFrames int64
batchSavings int64
processingTimeUs int64
adaptiveHits int64
adaptiveMisses int64
}
type batchZeroCopyRequest struct {
frames []*ZeroCopyAudioFrame
operation batchZeroCopyOperation
resultCh chan batchZeroCopyResult
timestamp time.Time
}
type batchZeroCopyOperation int
const (
batchOpReadEncode batchZeroCopyOperation = iota
batchOpDecodeWrite
batchOpMixed
)
type batchZeroCopyResult struct {
encodedData [][]byte // For read-encode operations
processedCount int // Number of successfully processed frames
err error
}
// Global batch zero-copy processor
var (
globalBatchZeroCopyProcessor *BatchZeroCopyProcessor
batchZeroCopyOnce sync.Once
)
// GetBatchZeroCopyProcessor returns the global batch zero-copy processor
func GetBatchZeroCopyProcessor() *BatchZeroCopyProcessor {
batchZeroCopyOnce.Do(func() {
globalBatchZeroCopyProcessor = NewBatchZeroCopyProcessor()
globalBatchZeroCopyProcessor.Start()
})
return globalBatchZeroCopyProcessor
}
// NewBatchZeroCopyProcessor creates a new batch zero-copy processor
func NewBatchZeroCopyProcessor() *BatchZeroCopyProcessor {
cache := Config
return &BatchZeroCopyProcessor{
maxBatchSize: cache.BatchProcessorFramesPerBatch,
batchTimeout: cache.BatchProcessorTimeout,
processingDelay: cache.BatchProcessingDelay,
adaptiveThreshold: cache.BatchProcessorAdaptiveThreshold,
readEncodeQueue: make(chan *batchZeroCopyRequest, cache.BatchProcessorMaxQueueSize),
decodeWriteQueue: make(chan *batchZeroCopyRequest, cache.BatchProcessorMaxQueueSize),
workerPool: make(chan struct{}, 4), // 4 workers for parallel processing
}
}
// Start starts the batch zero-copy processor workers
func (bzcp *BatchZeroCopyProcessor) Start() {
if !atomic.CompareAndSwapInt32(&bzcp.running, 0, 1) {
return // Already running
}
// Start worker goroutines for read-encode operations
for i := 0; i < cap(bzcp.workerPool)/2; i++ {
bzcp.wg.Add(1)
go bzcp.readEncodeWorker()
}
// Start worker goroutines for decode-write operations
for i := 0; i < cap(bzcp.workerPool)/2; i++ {
bzcp.wg.Add(1)
go bzcp.decodeWriteWorker()
}
}
// Stop stops the batch zero-copy processor
func (bzcp *BatchZeroCopyProcessor) Stop() {
if !atomic.CompareAndSwapInt32(&bzcp.running, 1, 0) {
return // Already stopped
}
close(bzcp.readEncodeQueue)
close(bzcp.decodeWriteQueue)
bzcp.wg.Wait()
}
// readEncodeWorker processes batch read-encode operations
func (bzcp *BatchZeroCopyProcessor) readEncodeWorker() {
defer bzcp.wg.Done()
for req := range bzcp.readEncodeQueue {
bzcp.processBatchReadEncode(req)
}
}
// decodeWriteWorker processes batch decode-write operations
func (bzcp *BatchZeroCopyProcessor) decodeWriteWorker() {
defer bzcp.wg.Done()
for req := range bzcp.decodeWriteQueue {
bzcp.processBatchDecodeWrite(req)
}
}
// processBatchReadEncode processes a batch of read-encode operations
func (bzcp *BatchZeroCopyProcessor) processBatchReadEncode(req *batchZeroCopyRequest) {
startTime := time.Now()
result := batchZeroCopyResult{}
// Batch AddRef all frames first
err := BatchAddRefFrames(req.frames)
if err != nil {
result.err = err
if req.resultCh != nil {
req.resultCh <- result
close(req.resultCh)
}
return
}
// Process frames using existing batch read-encode logic
encodedData, err := BatchReadEncode(len(req.frames))
if err != nil {
// Batch release frames on error
if _, releaseErr := BatchReleaseFrames(req.frames); releaseErr != nil {
// Log release error but preserve original error
_ = releaseErr
}
result.err = err
} else {
result.encodedData = encodedData
result.processedCount = len(encodedData)
// Batch release frames after successful processing
if _, releaseErr := BatchReleaseFrames(req.frames); releaseErr != nil {
// Log release error but don't fail the operation
_ = releaseErr
}
}
// Update statistics
atomic.AddInt64(&bzcp.batchedFrames, int64(len(req.frames)))
atomic.AddInt64(&bzcp.batchSavings, int64(len(req.frames)-1))
atomic.AddInt64(&bzcp.processingTimeUs, time.Since(startTime).Microseconds())
// Send result back
if req.resultCh != nil {
req.resultCh <- result
close(req.resultCh)
}
}
// processBatchDecodeWrite processes a batch of decode-write operations
func (bzcp *BatchZeroCopyProcessor) processBatchDecodeWrite(req *batchZeroCopyRequest) {
startTime := time.Now()
result := batchZeroCopyResult{}
// Batch AddRef all frames first
err := BatchAddRefFrames(req.frames)
if err != nil {
result.err = err
if req.resultCh != nil {
req.resultCh <- result
close(req.resultCh)
}
return
}
// Extract data from zero-copy frames for batch processing
frameData := make([][]byte, len(req.frames))
for i, frame := range req.frames {
if frame != nil {
// Get data from zero-copy frame
frameData[i] = frame.Data()[:frame.Length()]
}
}
// Process frames using existing batch decode-write logic
err = BatchDecodeWrite(frameData)
if err != nil {
result.err = err
} else {
result.processedCount = len(req.frames)
}
// Batch release frames
if _, releaseErr := BatchReleaseFrames(req.frames); releaseErr != nil {
// Log release error but don't override processing error
_ = releaseErr
}
// Update statistics
atomic.AddInt64(&bzcp.batchedFrames, int64(len(req.frames)))
atomic.AddInt64(&bzcp.batchSavings, int64(len(req.frames)-1))
atomic.AddInt64(&bzcp.processingTimeUs, time.Since(startTime).Microseconds())
// Send result back
if req.resultCh != nil {
req.resultCh <- result
close(req.resultCh)
}
}
// BatchReadEncodeZeroCopy performs batch read-encode on zero-copy frames
func (bzcp *BatchZeroCopyProcessor) BatchReadEncodeZeroCopy(frames []*ZeroCopyAudioFrame) ([][]byte, error) {
if len(frames) == 0 {
return nil, nil
}
// For small batches, use direct operations to avoid overhead
if len(frames) <= 2 {
atomic.AddInt64(&bzcp.singleFrames, int64(len(frames)))
return bzcp.processSingleReadEncode(frames)
}
// Use adaptive threshold to determine batch vs single processing
batchedFrames := atomic.LoadInt64(&bzcp.batchedFrames)
singleFrames := atomic.LoadInt64(&bzcp.singleFrames)
totalFrames := batchedFrames + singleFrames
if totalFrames > 100 { // Only apply adaptive logic after some samples
batchRatio := float64(batchedFrames) / float64(totalFrames)
if batchRatio < bzcp.adaptiveThreshold {
// Batch processing not effective, use single processing
atomic.AddInt64(&bzcp.adaptiveMisses, 1)
atomic.AddInt64(&bzcp.singleFrames, int64(len(frames)))
return bzcp.processSingleReadEncode(frames)
}
atomic.AddInt64(&bzcp.adaptiveHits, 1)
}
// Use batch processing
if atomic.LoadInt32(&bzcp.running) == 0 {
// Fallback to single processing if batch processor not running
atomic.AddInt64(&bzcp.singleFrames, int64(len(frames)))
return bzcp.processSingleReadEncode(frames)
}
resultCh := make(chan batchZeroCopyResult, 1)
req := &batchZeroCopyRequest{
frames: frames,
operation: batchOpReadEncode,
resultCh: resultCh,
timestamp: time.Now(),
}
select {
case bzcp.readEncodeQueue <- req:
// Wait for completion
result := <-resultCh
return result.encodedData, result.err
default:
// Queue full, fallback to single processing
atomic.AddInt64(&bzcp.singleFrames, int64(len(frames)))
return bzcp.processSingleReadEncode(frames)
}
}
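// Illustrative sketch, not part of this change: the adaptive choice above reduces to a
// single ratio test. The helper name below is hypothetical and only restates that logic.
func shouldUseBatchPath(batched, single int64, threshold float64) bool {
	total := batched + single
	if total <= 100 {
		return true // too few samples yet; default to the batch path
	}
	return float64(batched)/float64(total) >= threshold
}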
// BatchDecodeWriteZeroCopy performs batch decode-write on zero-copy frames
func (bzcp *BatchZeroCopyProcessor) BatchDecodeWriteZeroCopy(frames []*ZeroCopyAudioFrame) error {
if len(frames) == 0 {
return nil
}
// For small batches, use direct operations
if len(frames) <= 2 {
atomic.AddInt64(&bzcp.singleFrames, int64(len(frames)))
return bzcp.processSingleDecodeWrite(frames)
}
// Use adaptive threshold
batchedFrames := atomic.LoadInt64(&bzcp.batchedFrames)
singleFrames := atomic.LoadInt64(&bzcp.singleFrames)
totalFrames := batchedFrames + singleFrames
if totalFrames > 100 {
batchRatio := float64(batchedFrames) / float64(totalFrames)
if batchRatio < bzcp.adaptiveThreshold {
atomic.AddInt64(&bzcp.adaptiveMisses, 1)
atomic.AddInt64(&bzcp.singleFrames, int64(len(frames)))
return bzcp.processSingleDecodeWrite(frames)
}
atomic.AddInt64(&bzcp.adaptiveHits, 1)
}
// Use batch processing
if atomic.LoadInt32(&bzcp.running) == 0 {
atomic.AddInt64(&bzcp.singleFrames, int64(len(frames)))
return bzcp.processSingleDecodeWrite(frames)
}
resultCh := make(chan batchZeroCopyResult, 1)
req := &batchZeroCopyRequest{
frames: frames,
operation: batchOpDecodeWrite,
resultCh: resultCh,
timestamp: time.Now(),
}
select {
case bzcp.decodeWriteQueue <- req:
// Wait for completion
result := <-resultCh
return result.err
default:
// Queue full, fallback to single processing
atomic.AddInt64(&bzcp.singleFrames, int64(len(frames)))
return bzcp.processSingleDecodeWrite(frames)
}
}
// processSingleReadEncode processes frames individually for read-encode
func (bzcp *BatchZeroCopyProcessor) processSingleReadEncode(frames []*ZeroCopyAudioFrame) ([][]byte, error) {
// Extract data and use existing batch processing
frameData := make([][]byte, 0, len(frames))
for _, frame := range frames {
if frame != nil {
frame.AddRef()
frameData = append(frameData, frame.Data()[:frame.Length()])
}
}
// Use existing batch read-encode
result, err := BatchReadEncode(len(frameData))
// Release frames
for _, frame := range frames {
if frame != nil {
frame.Release()
}
}
return result, err
}
// processSingleDecodeWrite processes frames individually for decode-write
func (bzcp *BatchZeroCopyProcessor) processSingleDecodeWrite(frames []*ZeroCopyAudioFrame) error {
// Extract data and use existing batch processing
frameData := make([][]byte, 0, len(frames))
for _, frame := range frames {
if frame != nil {
frame.AddRef()
frameData = append(frameData, frame.Data()[:frame.Length()])
}
}
// Use existing batch decode-write
err := BatchDecodeWrite(frameData)
// Release frames
for _, frame := range frames {
if frame != nil {
frame.Release()
}
}
return err
}
// GetBatchZeroCopyStats returns batch zero-copy processing statistics
func (bzcp *BatchZeroCopyProcessor) GetBatchZeroCopyStats() (batchedFrames, singleFrames, savings, processingTimeUs, adaptiveHits, adaptiveMisses int64) {
return atomic.LoadInt64(&bzcp.batchedFrames),
atomic.LoadInt64(&bzcp.singleFrames),
atomic.LoadInt64(&bzcp.batchSavings),
atomic.LoadInt64(&bzcp.processingTimeUs),
atomic.LoadInt64(&bzcp.adaptiveHits),
atomic.LoadInt64(&bzcp.adaptiveMisses)
}
// Convenience functions for global batch zero-copy processor
// BatchReadEncodeZeroCopyFrames performs batch read-encode on zero-copy frames
func BatchReadEncodeZeroCopyFrames(frames []*ZeroCopyAudioFrame) ([][]byte, error) {
return GetBatchZeroCopyProcessor().BatchReadEncodeZeroCopy(frames)
}
// BatchDecodeWriteZeroCopyFrames performs batch decode-write on zero-copy frames
func BatchDecodeWriteZeroCopyFrames(frames []*ZeroCopyAudioFrame) error {
return GetBatchZeroCopyProcessor().BatchDecodeWriteZeroCopy(frames)
}
// GetGlobalBatchZeroCopyStats returns global batch zero-copy processing statistics
func GetGlobalBatchZeroCopyStats() (batchedFrames, singleFrames, savings, processingTimeUs, adaptiveHits, adaptiveMisses int64) {
return GetBatchZeroCopyProcessor().GetBatchZeroCopyStats()
}
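// Usage sketch, not part of this change: how a caller might drive the global batch
// zero-copy helpers and inspect their statistics. Frame acquisition is assumed to go
// through GetZeroCopyFrame as referenced elsewhere in this file.
func exampleBatchZeroCopyUsage(frames []*ZeroCopyAudioFrame) {
	if encoded, err := BatchReadEncodeZeroCopyFrames(frames); err == nil {
		_ = encoded // hand encoded packets to the transport layer
	}
	batched, single, savings, timeUs, hits, misses := GetGlobalBatchZeroCopyStats()
	_ = []int64{batched, single, savings, timeUs, hits, misses} // e.g. export as metrics
}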

File diff suppressed because it is too large

View File

@ -5,10 +5,15 @@ package audio
import (
"errors"
"fmt"
"os"
"strings"
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/jetkvm/kvm/internal/logging"
"github.com/rs/zerolog"
)
/*
@ -19,9 +24,7 @@ import (
*/
import "C"
// Optimized Go wrappers with reduced overhead
var (
// Base error types for wrapping with context
errAudioInitFailed = errors.New("failed to init ALSA/Opus")
errAudioReadEncode = errors.New("audio read/encode error")
errAudioDecodeWrite = errors.New("audio decode/write error")
@ -91,6 +94,30 @@ func cgoAudioInit() error {
cache := GetCachedConfig()
cache.Update()
// Enable C trace logging if Go audio scope trace level is active
audioLogger := logging.GetSubsystemLogger("audio")
loggerTraceEnabled := audioLogger.GetLevel() <= zerolog.TraceLevel
// Manual check for audio scope in PION_LOG_TRACE (workaround for logging system bug)
manualTraceEnabled := false
pionTrace := os.Getenv("PION_LOG_TRACE")
if pionTrace != "" {
scopes := strings.Split(strings.ToLower(pionTrace), ",")
for _, scope := range scopes {
if strings.TrimSpace(scope) == "audio" {
manualTraceEnabled = true
break
}
}
}
// Use manual check as fallback if logging system fails
traceEnabled := loggerTraceEnabled || manualTraceEnabled
CGOSetTraceLogging(traceEnabled)
// Update C constants from cached config (atomic access, no locks)
C.update_audio_constants(
C.int(cache.opusBitrate.Load()),
@ -110,7 +137,7 @@ func cgoAudioInit() error {
C.int(Config.CGOMaxBackoffMicroseconds),
)
result := C.jetkvm_audio_init()
result := C.jetkvm_audio_capture_init()
if result != 0 {
return newAudioInitError(int(result))
}
@ -161,16 +188,6 @@ type AudioConfigCache struct {
inputProcessingTimeoutMS atomic.Int32
maxRestartAttempts atomic.Int32
// Batch processing related values
BatchProcessingTimeout time.Duration
BatchProcessorFramesPerBatch int
BatchProcessorTimeout time.Duration
BatchProcessingDelay time.Duration
MinBatchSizeForThreadPinning int
BatchProcessorMaxQueueSize int
BatchProcessorAdaptiveThreshold float64
BatchProcessorThreadPinningThreshold int
// Mutex for updating the cache
mutex sync.RWMutex
lastUpdate time.Time
@ -184,7 +201,7 @@ type AudioConfigCache struct {
// Global audio config cache instance
var globalAudioConfigCache = &AudioConfigCache{
cacheExpiry: 30 * time.Second, // Increased from 10s to 30s to further reduce cache updates
cacheExpiry: 30 * time.Second,
}
// GetCachedConfig returns the global audio config cache instance
@ -234,16 +251,6 @@ func (c *AudioConfigCache) Update() {
c.minOpusBitrate.Store(int32(Config.MinOpusBitrate))
c.maxOpusBitrate.Store(int32(Config.MaxOpusBitrate))
// Update batch processing related values
c.BatchProcessingTimeout = 100 * time.Millisecond // Fixed timeout for batch processing
c.BatchProcessorFramesPerBatch = Config.BatchProcessorFramesPerBatch
c.BatchProcessorTimeout = Config.BatchProcessorTimeout
c.BatchProcessingDelay = Config.BatchProcessingDelay
c.MinBatchSizeForThreadPinning = Config.MinBatchSizeForThreadPinning
c.BatchProcessorMaxQueueSize = Config.BatchProcessorMaxQueueSize
c.BatchProcessorAdaptiveThreshold = Config.BatchProcessorAdaptiveThreshold
c.BatchProcessorThreadPinningThreshold = Config.BatchProcessorThreadPinningThreshold
// Pre-allocate common errors
c.bufferTooSmallReadEncode = newBufferTooSmallError(0, Config.MinReadEncodeBuffer)
c.bufferTooLargeDecodeWrite = newBufferTooLargeError(Config.MaxDecodeWriteBuffer+1, Config.MaxDecodeWriteBuffer)
@ -251,6 +258,9 @@ func (c *AudioConfigCache) Update() {
c.lastUpdate = time.Now()
c.initialized.Store(true)
// Update the global validation cache as well
if cachedMaxFrameSize != 0 {
cachedMaxFrameSize = Config.MaxAudioFrameSize
@ -288,8 +298,6 @@ func (c *AudioConfigCache) GetBufferTooLargeError() error {
return c.bufferTooLargeDecodeWrite
}
// Removed duplicate config caching system - using AudioConfigCache instead
// updateCacheIfNeeded updates cache only if expired to avoid overhead
func updateCacheIfNeeded(cache *AudioConfigCache) {
if cache.initialized.Load() {
@ -335,6 +343,10 @@ func cgoAudioPlaybackInit() error {
cache := GetCachedConfig()
cache.Update()
// Enable C trace logging if Go audio scope trace level is active
audioLogger := logging.GetSubsystemLogger("audio")
CGOSetTraceLogging(audioLogger.GetLevel() <= zerolog.TraceLevel)
// No need to update C constants here as they're already set in cgoAudioInit
ret := C.jetkvm_audio_playback_init()
@ -348,9 +360,44 @@ func cgoAudioPlaybackClose() {
C.jetkvm_audio_playback_close()
}
// Audio decode/write metrics for monitoring USB Gadget audio success
var (
audioDecodeWriteTotal atomic.Int64
audioDecodeWriteSuccess atomic.Int64
audioDecodeWriteFailures atomic.Int64
audioDecodeWriteRecovery atomic.Int64
audioDecodeWriteLastError atomic.Value
audioDecodeWriteLastTime atomic.Int64
)
// GetAudioDecodeWriteStats returns current audio decode/write statistics
func GetAudioDecodeWriteStats() (total, success, failures, recovery int64, lastError string, lastTime time.Time) {
total = audioDecodeWriteTotal.Load()
success = audioDecodeWriteSuccess.Load()
failures = audioDecodeWriteFailures.Load()
recovery = audioDecodeWriteRecovery.Load()
	if v := audioDecodeWriteLastError.Load(); v != nil {
		lastError = v.(string)
	}
lastTimeNano := audioDecodeWriteLastTime.Load()
if lastTimeNano > 0 {
lastTime = time.Unix(0, lastTimeNano)
}
return
}
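// Usage sketch, not part of this change: computing a success rate from the new
// decode/write counters while guarding the zero-total case.
func exampleDecodeWriteSuccessRate() float64 {
	total, success, _, _, _, _ := GetAudioDecodeWriteStats()
	if total == 0 {
		return 0
	}
	return float64(success) / float64(total) * 100
}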
func cgoAudioDecodeWrite(buf []byte) (int, error) {
start := time.Now()
audioDecodeWriteTotal.Add(1)
audioDecodeWriteLastTime.Store(start.UnixNano())
// Minimal validation - assume caller provides correct size
if len(buf) == 0 {
audioDecodeWriteFailures.Add(1)
audioDecodeWriteLastError.Store("empty buffer")
return 0, errEmptyBuffer
}
@ -359,14 +406,31 @@ func cgoAudioDecodeWrite(buf []byte) (int, error) {
// Fast path for success
if n >= 0 {
audioDecodeWriteSuccess.Add(1)
return n, nil
}
// Error handling with static errors
if n == -1 {
return 0, errAudioInitFailed
audioDecodeWriteFailures.Add(1)
var errMsg string
var err error
switch n {
case -1:
errMsg = "audio system not initialized"
err = errAudioInitFailed
case -2:
errMsg = "audio device error or recovery failed"
err = errAudioDecodeWrite
audioDecodeWriteRecovery.Add(1)
default:
errMsg = fmt.Sprintf("unknown error code %d", n)
err = errAudioDecodeWrite
}
return 0, errAudioDecodeWrite
audioDecodeWriteLastError.Store(errMsg)
return 0, err
}
// updateOpusEncoderParams dynamically updates OPUS encoder parameters
@ -388,7 +452,9 @@ func updateOpusEncoderParams(bitrate, complexity, vbr, vbrConstraint, signalType
// Buffer pool for reusing buffers in CGO functions
var (
// Using SizedBufferPool for better memory management
// Simple buffer pool for PCM data
pcmBufferPool = NewAudioBufferPool(Config.MaxPCMBufferSize)
// Track buffer pool usage
cgoBufferPoolGets atomic.Int64
cgoBufferPoolPuts atomic.Int64
@ -396,19 +462,19 @@ var (
batchProcessingCount atomic.Int64
batchFrameCount atomic.Int64
batchProcessingTime atomic.Int64
// Batch time tracking removed
)
// GetBufferFromPool gets a buffer from the pool with at least the specified capacity
func GetBufferFromPool(minCapacity int) []byte {
cgoBufferPoolGets.Add(1)
return GetOptimalBuffer(minCapacity)
// Use simple fixed-size buffer for PCM data
return pcmBufferPool.Get()
}
// ReturnBufferToPool returns a buffer to the pool
func ReturnBufferToPool(buf []byte) {
cgoBufferPoolPuts.Add(1)
ReturnOptimalBuffer(buf)
pcmBufferPool.Put(buf)
}
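// Usage sketch, not part of this change: the intended pairing of the pool helpers above,
// always returning the buffer once the caller is done with it.
func examplePooledBuffer() {
	buf := GetBufferFromPool(Config.MaxPCMBufferSize)
	defer ReturnBufferToPool(buf)
	_ = buf // decode into buf here
}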
// ReadEncodeWithPooledBuffer reads audio data and encodes it using a buffer from the pool
@ -451,125 +517,6 @@ func DecodeWriteWithPooledBuffer(data []byte) (int, error) {
return CGOAudioDecodeWrite(data, pcmBuffer)
}
// BatchReadEncode reads and encodes multiple audio frames in a single batch
// using a simple fixed-size per-frame read loop
func BatchReadEncode(batchSize int) ([][]byte, error) {
	// Simple batch processing with minimal overhead
frames := make([][]byte, 0, batchSize)
frameSize := 4096 // Fixed frame size for performance
for i := 0; i < batchSize; i++ {
buf := make([]byte, frameSize)
n, err := cgoAudioReadEncode(buf)
if err != nil {
if i > 0 {
return frames, nil // Return partial batch
}
return nil, err
}
if n > 0 {
frames = append(frames, buf[:n])
}
}
return frames, nil
}
// BatchDecodeWrite decodes and writes multiple audio frames in a single batch
// This reduces CGO call overhead by processing multiple frames at once
// with optimized zero-copy frame management and batch reference counting
func BatchDecodeWrite(frames [][]byte) error {
// Validate input
if len(frames) == 0 {
return nil
}
// Convert to zero-copy frames for optimized processing
zeroCopyFrames := make([]*ZeroCopyAudioFrame, 0, len(frames))
for _, frameData := range frames {
if len(frameData) > 0 {
frame := GetZeroCopyFrame()
frame.SetDataDirect(frameData) // Direct assignment without copy
zeroCopyFrames = append(zeroCopyFrames, frame)
}
}
// Use batch reference counting for efficient management
if len(zeroCopyFrames) > 0 {
// Batch AddRef all frames at once
err := BatchAddRefFrames(zeroCopyFrames)
if err != nil {
return err
}
// Ensure cleanup with batch release
defer func() {
if _, err := BatchReleaseFrames(zeroCopyFrames); err != nil {
// Ignore release error; don't fail the operation
_ = err
}
}()
}
// Get cached config
cache := GetCachedConfig()
// Only update cache if expired - avoid unnecessary overhead
// Use proper locking to avoid race condition
if cache.initialized.Load() {
cache.mutex.RLock()
cacheExpired := time.Since(cache.lastUpdate) > cache.cacheExpiry
cache.mutex.RUnlock()
if cacheExpired {
cache.Update()
}
} else {
cache.Update()
}
	// Batch time tracking removed; trackTime stays false so the timing branches below are skipped
	var startTime time.Time
trackTime := false
if trackTime {
startTime = time.Now()
}
batchProcessingCount.Add(1)
// Get a PCM buffer from the pool for optimized decode-write
pcmBuffer := GetBufferFromPool(cache.GetMaxPCMBufferSize())
defer ReturnBufferToPool(pcmBuffer)
// Process each zero-copy frame with optimized batch processing
frameCount := 0
for _, zcFrame := range zeroCopyFrames {
// Get frame data from zero-copy frame
frameData := zcFrame.Data()[:zcFrame.Length()]
if len(frameData) == 0 {
continue
}
// Process this frame using optimized implementation
_, err := CGOAudioDecodeWrite(frameData, pcmBuffer)
if err != nil {
// Update statistics before returning error
batchFrameCount.Add(int64(frameCount))
if trackTime {
batchProcessingTime.Add(time.Since(startTime).Microseconds())
}
return err
}
frameCount++
}
// Update statistics
batchFrameCount.Add(int64(frameCount))
if trackTime {
batchProcessingTime.Add(time.Since(startTime).Microseconds())
}
return nil
}
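// Usage sketch, not part of this change: how the two batch helpers above pair together
// (BatchReadEncode produces Opus frames that BatchDecodeWrite can play back).
func exampleBatchRoundTrip() error {
	frames, err := BatchReadEncode(4) // read and encode up to 4 frames
	if err != nil {
		return err
	}
	return BatchDecodeWrite(frames)
}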
// GetBatchProcessingStats returns statistics about batch processing
func GetBatchProcessingStats() (count, frames, avgTimeUs int64) {
count = batchProcessingCount.Load()
@ -587,11 +534,19 @@ func GetBatchProcessingStats() (count, frames, avgTimeUs int64) {
// cgoAudioDecodeWriteWithBuffers decodes opus data and writes to PCM buffer
// This implementation uses separate buffers for opus data and PCM output
func cgoAudioDecodeWriteWithBuffers(opusData []byte, pcmBuffer []byte) (int, error) {
start := time.Now()
audioDecodeWriteTotal.Add(1)
audioDecodeWriteLastTime.Store(start.UnixNano())
// Validate input
if len(opusData) == 0 {
audioDecodeWriteFailures.Add(1)
audioDecodeWriteLastError.Store("empty opus data")
return 0, errEmptyBuffer
}
if len(pcmBuffer) == 0 {
if cap(pcmBuffer) == 0 {
audioDecodeWriteFailures.Add(1)
audioDecodeWriteLastError.Store("empty pcm buffer capacity")
return 0, errEmptyBuffer
}
@ -613,26 +568,44 @@ func cgoAudioDecodeWriteWithBuffers(opusData []byte, pcmBuffer []byte) (int, err
// Ensure data doesn't exceed max packet size
maxPacketSize := cache.GetMaxPacketSize()
if len(opusData) > maxPacketSize {
audioDecodeWriteFailures.Add(1)
errMsg := fmt.Sprintf("opus packet too large: %d > %d", len(opusData), maxPacketSize)
audioDecodeWriteLastError.Store(errMsg)
return 0, newBufferTooLargeError(len(opusData), maxPacketSize)
}
// Metrics tracking only - detailed logging handled at application level
// Direct CGO call with minimal overhead - unsafe.Pointer(&slice[0]) is never nil for non-empty slices
n := int(C.jetkvm_audio_decode_write(unsafe.Pointer(&opusData[0]), C.int(len(opusData))))
// Fast path for success case
if n >= 0 {
audioDecodeWriteSuccess.Add(1)
return n, nil
}
// Handle error cases with static error codes to reduce allocations
audioDecodeWriteFailures.Add(1)
var errMsg string
var err error
switch n {
case -1:
return 0, errAudioInitFailed
errMsg = "audio system not initialized"
err = errAudioInitFailed
case -2:
return 0, errAudioDecodeWrite
errMsg = "audio device error or recovery failed"
err = errAudioDecodeWrite
audioDecodeWriteRecovery.Add(1)
default:
return 0, newAudioDecodeWriteError(n)
errMsg = fmt.Sprintf("unknown error code %d", n)
err = newAudioDecodeWriteError(n)
}
audioDecodeWriteLastError.Store(errMsg)
return 0, err
}
// Optimized CGO function aliases - use direct function calls to reduce overhead
@ -642,10 +615,20 @@ func CGOAudioClose() { cgoAudioClose() }
func CGOAudioReadEncode(buf []byte) (int, error) { return cgoAudioReadEncode(buf) }
func CGOAudioPlaybackInit() error { return cgoAudioPlaybackInit() }
func CGOAudioPlaybackClose() { cgoAudioPlaybackClose() }
func CGOAudioDecodeWriteLegacy(buf []byte) (int, error) { return cgoAudioDecodeWrite(buf) }
func CGOAudioDecodeWrite(opusData []byte, pcmBuffer []byte) (int, error) {
return cgoAudioDecodeWriteWithBuffers(opusData, pcmBuffer)
}
func CGOUpdateOpusEncoderParams(bitrate, complexity, vbr, vbrConstraint, signalType, bandwidth, dtx int) error {
return updateOpusEncoderParams(bitrate, complexity, vbr, vbrConstraint, signalType, bandwidth, dtx)
}
func CGOSetTraceLogging(enabled bool) {
var cEnabled C.int
if enabled {
cEnabled = 1
} else {
cEnabled = 0
}
C.set_trace_logging(cEnabled)
}
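// Illustrative sketch, not part of this change: the trace toggle above is driven either by the
// audio subsystem logger level or by a PION_LOG_TRACE entry containing "audio". This uses a
// simplified containment check rather than the exact per-scope matching done in cgoAudioInit,
// and the function name is hypothetical.
func exampleEnableCGOTrace() {
	traceEnabled := logging.GetSubsystemLogger("audio").GetLevel() <= zerolog.TraceLevel ||
		strings.Contains(strings.ToLower(os.Getenv("PION_LOG_TRACE")), "audio")
	CGOSetTraceLogging(traceEnabled)
}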

View File

@ -1,15 +0,0 @@
package audio
import "time"
// GetMetricsUpdateInterval returns the current metrics update interval from centralized config
func GetMetricsUpdateInterval() time.Duration {
return Config.MetricsUpdateInterval
}
// SetMetricsUpdateInterval sets the metrics update interval in centralized config
func SetMetricsUpdateInterval(interval time.Duration) {
config := Config
config.MetricsUpdateInterval = interval
UpdateConfig(config)
}

View File

@ -152,11 +152,6 @@ type AudioConfigConstants struct {
MemoryFactor float64
LatencyFactor float64
// Adaptive Buffer Configuration
AdaptiveMinBufferSize int // Minimum buffer size in frames for adaptive buffering
AdaptiveMaxBufferSize int // Maximum buffer size in frames for adaptive buffering
AdaptiveDefaultBufferSize int // Default buffer size in frames for adaptive buffering
// Timing Configuration
RetryDelay time.Duration // Retry delay
MaxRetryDelay time.Duration // Maximum retry delay
@ -171,22 +166,17 @@ type AudioConfigConstants struct {
OutputSupervisorTimeout time.Duration // 5s
BatchProcessingDelay time.Duration // 10ms
// Adaptive Buffer Configuration
// LowCPUThreshold defines CPU usage threshold for buffer size reduction.
LowCPUThreshold float64 // 20% CPU threshold for buffer optimization
// HighCPUThreshold defines CPU usage threshold for buffer size increase.
HighCPUThreshold float64 // 60% CPU threshold
// System threshold configuration for buffer management
LowCPUThreshold float64 // CPU usage threshold for performance optimization
HighCPUThreshold float64 // CPU usage threshold for performance limits
LowMemoryThreshold float64 // 50% memory threshold
HighMemoryThreshold float64 // 75% memory threshold
AdaptiveBufferTargetLatency time.Duration // 20ms target latency
CooldownPeriod time.Duration // 30s cooldown period
RollbackThreshold time.Duration // 300ms rollback threshold
MaxLatencyThreshold time.Duration // 200ms max latency
JitterThreshold time.Duration // 20ms jitter threshold
LatencyOptimizationInterval time.Duration // 5s optimization interval
LatencyAdaptiveThreshold float64 // 0.8 adaptive threshold
MicContentionTimeout time.Duration // 200ms contention timeout
PreallocPercentage int // 20% preallocation percentage
BackoffStart time.Duration // 50ms initial backoff
@ -199,7 +189,6 @@ type AudioConfigConstants struct {
PercentageMultiplier float64 // Multiplier for percentage calculations (100.0)
AveragingWeight float64 // Weight for weighted averaging (0.7)
ScalingFactor float64 // General scaling factor (1.5)
SmoothingFactor float64 // Smoothing factor for adaptive buffers (0.3)
CPUMemoryWeight float64 // Weight for CPU factor in calculations (0.5)
MemoryWeight float64 // Weight for memory factor (0.3)
LatencyWeight float64 // Weight for latency factor (0.2)
@ -213,13 +202,6 @@ type AudioConfigConstants struct {
CGOPCMBufferSize int // PCM buffer size for CGO audio processing
CGONanosecondsPerSecond float64 // Nanoseconds per second conversion
// Batch Processing Constants
BatchProcessorFramesPerBatch int // Frames processed per batch (4)
BatchProcessorTimeout time.Duration // Batch processing timeout (5ms)
BatchProcessorMaxQueueSize int // Maximum batch queue size (16)
BatchProcessorAdaptiveThreshold float64 // Adaptive batch sizing threshold (0.8)
BatchProcessorThreadPinningThreshold int // Thread pinning threshold (8 frames)
// Output Streaming Constants
OutputStreamingFrameIntervalMS int // Output frame interval (20ms for 50 FPS)
@ -230,8 +212,6 @@ type AudioConfigConstants struct {
EventTimeFormatString string
EventSubscriptionDelayMS int
InputProcessingTimeoutMS int
AdaptiveBufferCPUMultiplier int
AdaptiveBufferMemoryMultiplier int
InputSocketName string
OutputSocketName string
AudioInputComponentName string
@ -493,17 +473,11 @@ func DefaultAudioConfig() *AudioConfigConstants {
OutputSupervisorTimeout: 5 * time.Second, // Output monitoring timeout
BatchProcessingDelay: 5 * time.Millisecond, // Reduced batch processing delay
// Adaptive Buffer Configuration - Optimized for single-core RV1106G3
// System Load Configuration - Optimized for single-core RV1106G3
LowCPUThreshold: 0.40, // Adjusted for single-core ARM system
HighCPUThreshold: 0.75, // Adjusted for single-core RV1106G3 (current load ~64%)
LowMemoryThreshold: 0.60,
HighMemoryThreshold: 0.85, // Adjusted for 200MB total memory system
AdaptiveBufferTargetLatency: 10 * time.Millisecond, // Aggressive target latency for responsiveness
// Adaptive Buffer Size Configuration - Optimized for quality change bursts
AdaptiveMinBufferSize: 256, // Further increased minimum to prevent emergency mode
AdaptiveMaxBufferSize: 1024, // Much higher maximum for quality changes
AdaptiveDefaultBufferSize: 512, // Higher default for stability during bursts
CooldownPeriod: 15 * time.Second, // Reduced cooldown period
RollbackThreshold: 200 * time.Millisecond, // Lower rollback threshold
@ -511,7 +485,6 @@ func DefaultAudioConfig() *AudioConfigConstants {
MaxLatencyThreshold: 150 * time.Millisecond, // Lower max latency threshold
JitterThreshold: 15 * time.Millisecond, // Reduced jitter threshold
LatencyOptimizationInterval: 3 * time.Second, // More frequent optimization
LatencyAdaptiveThreshold: 0.7, // More aggressive adaptive threshold
// Microphone Contention Configuration
MicContentionTimeout: 200 * time.Millisecond,
@ -531,7 +504,6 @@ func DefaultAudioConfig() *AudioConfigConstants {
AveragingWeight: 0.7, // Weight for smoothing values (70% recent, 30% historical)
ScalingFactor: 1.5, // General scaling factor for adaptive adjustments
SmoothingFactor: 0.3, // For adaptive buffer smoothing
CPUMemoryWeight: 0.5, // CPU factor weight in combined calculations
MemoryWeight: 0.3, // Memory factor weight in combined calculations
LatencyWeight: 0.2, // Latency factor weight in combined calculations
@ -544,13 +516,6 @@ func DefaultAudioConfig() *AudioConfigConstants {
CGOPCMBufferSize: 1920, // 1920 samples for PCM buffer (max 2ch*960)
CGONanosecondsPerSecond: 1000000000.0, // 1000000000.0 for nanosecond conversions
// Batch Processing Constants - Optimized for quality change bursts
BatchProcessorFramesPerBatch: 16, // Larger batches for quality changes
BatchProcessorTimeout: 20 * time.Millisecond, // Longer timeout for bursts
BatchProcessorMaxQueueSize: 64, // Larger queue for quality changes
BatchProcessorAdaptiveThreshold: 0.6, // Lower threshold for faster adaptation
BatchProcessorThreadPinningThreshold: 8, // Lower threshold for better performance
// Output Streaming Constants - Balanced for stability
OutputStreamingFrameIntervalMS: 20, // 20ms frame interval (50 FPS) for stability
@ -572,10 +537,6 @@ func DefaultAudioConfig() *AudioConfigConstants {
// Input Processing Constants - Balanced for stability
InputProcessingTimeoutMS: 10, // 10ms processing timeout threshold
// Adaptive Buffer Constants
AdaptiveBufferCPUMultiplier: 100, // 100 multiplier for CPU percentage
AdaptiveBufferMemoryMultiplier: 100, // 100 multiplier for memory percentage
// Socket Names
InputSocketName: "audio_input.sock", // Socket name for audio input IPC
OutputSocketName: "audio_output.sock", // Socket name for audio output IPC

View File

@ -241,11 +241,6 @@ func (s *AudioControlService) SetAudioQuality(quality AudioQuality) {
SetAudioQuality(quality)
}
// SetMicrophoneQuality sets the microphone input quality
func (s *AudioControlService) SetMicrophoneQuality(quality AudioQuality) {
SetMicrophoneQuality(quality)
}
// GetAudioQualityPresets returns available audio quality presets
func (s *AudioControlService) GetAudioQualityPresets() map[AudioQuality]AudioConfig {
return GetAudioQualityPresets()

View File

@ -2,7 +2,6 @@ package audio
import (
"runtime"
"sync"
"sync/atomic"
"time"
@ -11,67 +10,6 @@ import (
)
var (
// Adaptive buffer metrics
adaptiveInputBufferSize = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "jetkvm_adaptive_input_buffer_size_bytes",
Help: "Current adaptive input buffer size in bytes",
},
)
adaptiveOutputBufferSize = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "jetkvm_adaptive_output_buffer_size_bytes",
Help: "Current adaptive output buffer size in bytes",
},
)
adaptiveBufferAdjustmentsTotal = promauto.NewCounter(
prometheus.CounterOpts{
Name: "jetkvm_adaptive_buffer_adjustments_total",
Help: "Total number of adaptive buffer size adjustments",
},
)
adaptiveSystemCpuPercent = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "jetkvm_adaptive_system_cpu_percent",
Help: "System CPU usage percentage used by adaptive buffer manager",
},
)
adaptiveSystemMemoryPercent = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "jetkvm_adaptive_system_memory_percent",
Help: "System memory usage percentage used by adaptive buffer manager",
},
)
// Socket buffer metrics
socketBufferSizeGauge = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "jetkvm_audio_socket_buffer_size_bytes",
Help: "Current socket buffer size in bytes",
},
[]string{"component", "buffer_type"}, // buffer_type: send, receive
)
socketBufferUtilizationGauge = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "jetkvm_audio_socket_buffer_utilization_percent",
Help: "Socket buffer utilization percentage",
},
[]string{"component", "buffer_type"}, // buffer_type: send, receive
)
socketBufferOverflowCounter = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "jetkvm_audio_socket_buffer_overflow_total",
Help: "Total number of socket buffer overflows",
},
[]string{"component", "buffer_type"}, // buffer_type: send, receive
)
// Audio output metrics
audioFramesReceivedTotal = promauto.NewCounter(
prometheus.CounterOpts{
@ -158,10 +96,7 @@ var (
},
)
// Device health metrics
// Removed device health metrics - functionality not used
// Memory metrics
// Memory metrics (basic monitoring)
memoryHeapAllocBytes = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "jetkvm_audio_memory_heap_alloc_bytes",
@ -169,20 +104,6 @@ var (
},
)
memoryHeapSysBytes = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "jetkvm_audio_memory_heap_sys_bytes",
Help: "Total heap system memory in bytes",
},
)
memoryHeapObjects = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "jetkvm_audio_memory_heap_objects",
Help: "Number of heap objects",
},
)
memoryGCCount = promauto.NewCounter(
prometheus.CounterOpts{
Name: "jetkvm_audio_memory_gc_total",
@ -190,73 +111,7 @@ var (
},
)
memoryGCCPUFraction = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "jetkvm_audio_memory_gc_cpu_fraction",
Help: "Fraction of CPU time spent in garbage collection",
},
)
// Buffer pool efficiency metrics
bufferPoolHitRate = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "jetkvm_audio_buffer_pool_hit_rate_percent",
Help: "Buffer pool hit rate percentage",
},
[]string{"pool_name"}, // pool_name: frame_pool, control_pool, zero_copy_pool
)
bufferPoolMissRate = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "jetkvm_audio_buffer_pool_miss_rate_percent",
Help: "Buffer pool miss rate percentage",
},
[]string{"pool_name"}, // pool_name: frame_pool, control_pool, zero_copy_pool
)
bufferPoolUtilization = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "jetkvm_audio_buffer_pool_utilization_percent",
Help: "Buffer pool utilization percentage",
},
[]string{"pool_name"}, // pool_name: frame_pool, control_pool, zero_copy_pool
)
bufferPoolThroughput = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "jetkvm_audio_buffer_pool_throughput_ops_per_sec",
Help: "Buffer pool throughput in operations per second",
},
[]string{"pool_name"}, // pool_name: frame_pool, control_pool, zero_copy_pool
)
bufferPoolGetLatency = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "jetkvm_audio_buffer_pool_get_latency_seconds",
Help: "Average buffer pool get operation latency in seconds",
},
[]string{"pool_name"}, // pool_name: frame_pool, control_pool, zero_copy_pool
)
bufferPoolPutLatency = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "jetkvm_audio_buffer_pool_put_latency_seconds",
Help: "Average buffer pool put operation latency in seconds",
},
[]string{"pool_name"}, // pool_name: frame_pool, control_pool, zero_copy_pool
)
// Latency percentile metrics
latencyPercentile = promauto.NewGaugeVec(
prometheus.GaugeOpts{
Name: "jetkvm_audio_latency_percentile_milliseconds",
Help: "Audio latency percentiles in milliseconds",
},
[]string{"source", "percentile"}, // source: input, output, processing; percentile: p50, p95, p99, min, max, avg
)
// Metrics update tracking
metricsUpdateMutex sync.RWMutex
lastMetricsUpdate int64
// Counter value tracking (since prometheus counters don't have Get() method)
@ -269,8 +124,6 @@ var (
micBytesProcessedValue uint64
micConnectionDropsValue uint64
// Atomic counters for device health metrics - functionality removed, no longer used
// Atomic counter for memory GC
memoryGCCountValue uint32
)
@ -374,49 +227,12 @@ func UpdateMicrophoneMetrics(metrics UnifiedAudioMetrics) {
atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix())
}
// UpdateAdaptiveBufferMetrics updates Prometheus metrics with adaptive buffer information
func UpdateAdaptiveBufferMetrics(inputBufferSize, outputBufferSize int, cpuPercent, memoryPercent float64, adjustmentMade bool) {
metricsUpdateMutex.Lock()
defer metricsUpdateMutex.Unlock()
adaptiveInputBufferSize.Set(float64(inputBufferSize))
adaptiveOutputBufferSize.Set(float64(outputBufferSize))
adaptiveSystemCpuPercent.Set(cpuPercent)
adaptiveSystemMemoryPercent.Set(memoryPercent)
if adjustmentMade {
adaptiveBufferAdjustmentsTotal.Inc()
}
atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix())
}
// UpdateSocketBufferMetrics updates socket buffer metrics
func UpdateSocketBufferMetrics(component, bufferType string, size, utilization float64, overflowOccurred bool) {
metricsUpdateMutex.Lock()
defer metricsUpdateMutex.Unlock()
socketBufferSizeGauge.WithLabelValues(component, bufferType).Set(size)
socketBufferUtilizationGauge.WithLabelValues(component, bufferType).Set(utilization)
if overflowOccurred {
socketBufferOverflowCounter.WithLabelValues(component, bufferType).Inc()
}
atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix())
}
// UpdateDeviceHealthMetrics - Placeholder for future device health metrics
// UpdateMemoryMetrics updates memory metrics
// UpdateMemoryMetrics updates basic memory metrics
func UpdateMemoryMetrics() {
var m runtime.MemStats
runtime.ReadMemStats(&m)
memoryHeapAllocBytes.Set(float64(m.HeapAlloc))
memoryHeapSysBytes.Set(float64(m.HeapSys))
memoryHeapObjects.Set(float64(m.HeapObjects))
memoryGCCPUFraction.Set(m.GCCPUFraction)
// Update GC count with delta calculation
currentGCCount := uint32(m.NumGC)
@ -428,31 +244,6 @@ func UpdateMemoryMetrics() {
atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix())
}
// UpdateBufferPoolMetrics updates buffer pool efficiency metrics
func UpdateBufferPoolMetrics(poolName string, hitRate, missRate, utilization, throughput, getLatency, putLatency float64) {
metricsUpdateMutex.Lock()
defer metricsUpdateMutex.Unlock()
bufferPoolHitRate.WithLabelValues(poolName).Set(hitRate * 100)
bufferPoolMissRate.WithLabelValues(poolName).Set(missRate * 100)
bufferPoolUtilization.WithLabelValues(poolName).Set(utilization * 100)
bufferPoolThroughput.WithLabelValues(poolName).Set(throughput)
bufferPoolGetLatency.WithLabelValues(poolName).Set(getLatency)
bufferPoolPutLatency.WithLabelValues(poolName).Set(putLatency)
atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix())
}
// UpdateLatencyMetrics updates latency percentile metrics
func UpdateLatencyMetrics(source, percentile string, latencyMilliseconds float64) {
metricsUpdateMutex.Lock()
defer metricsUpdateMutex.Unlock()
latencyPercentile.WithLabelValues(source, percentile).Set(latencyMilliseconds)
atomic.StoreInt64(&lastMetricsUpdate, time.Now().Unix())
}
// GetLastMetricsUpdate returns the timestamp of the last metrics update
func GetLastMetricsUpdate() time.Time {
timestamp := atomic.LoadInt64(&lastMetricsUpdate)

View File

@ -154,25 +154,6 @@ func ValidateMetricsInterval(interval time.Duration) error {
return nil
}
// ValidateAdaptiveBufferConfig validates adaptive buffer configuration
func ValidateAdaptiveBufferConfig(minSize, maxSize, defaultSize int) error {
if minSize <= 0 || maxSize <= 0 || defaultSize <= 0 {
return ErrInvalidBufferSize
}
if minSize >= maxSize {
return ErrInvalidBufferSize
}
if defaultSize < minSize || defaultSize > maxSize {
return ErrInvalidBufferSize
}
// Validate against global limits
maxBuffer := Config.SocketMaxBuffer
if maxSize > maxBuffer {
return ErrInvalidBufferSize
}
return nil
}
// ValidateInputIPCConfig validates input IPC configuration
func ValidateInputIPCConfig(sampleRate, channels, frameSize int) error {
minSampleRate := Config.MinSampleRate

View File

@ -1,329 +0,0 @@
package audio
import (
"sync"
"sync/atomic"
"time"
"github.com/jetkvm/kvm/internal/logging"
"github.com/rs/zerolog"
)
// Task represents a function to be executed by a worker in the pool
type Task func()
// GoroutinePool manages a pool of reusable goroutines to reduce the overhead
// of goroutine creation and destruction
type GoroutinePool struct {
// Atomic fields must be first for proper alignment on 32-bit systems
taskCount int64 // Number of tasks processed
workerCount int64 // Current number of workers
maxIdleTime time.Duration
maxWorkers int
taskQueue chan Task
workerSem chan struct{} // Semaphore to limit concurrent workers
shutdown chan struct{}
shutdownOnce sync.Once
wg sync.WaitGroup
logger *zerolog.Logger
name string
}
// NewGoroutinePool creates a new goroutine pool with the specified parameters
func NewGoroutinePool(name string, maxWorkers int, queueSize int, maxIdleTime time.Duration) *GoroutinePool {
logger := logging.GetDefaultLogger().With().Str("component", "goroutine-pool").Str("pool", name).Logger()
pool := &GoroutinePool{
maxWorkers: maxWorkers,
maxIdleTime: maxIdleTime,
taskQueue: make(chan Task, queueSize),
workerSem: make(chan struct{}, maxWorkers),
shutdown: make(chan struct{}),
logger: &logger,
name: name,
}
// Start a supervisor goroutine to monitor pool health
go pool.supervisor()
return pool
}
// Submit adds a task to the pool for execution
// Returns true if the task was accepted, false if the queue is full
func (p *GoroutinePool) Submit(task Task) bool {
select {
case <-p.shutdown:
return false // Pool is shutting down
case p.taskQueue <- task:
// Task accepted, ensure we have a worker to process it
p.ensureWorkerAvailable()
return true
default:
// Queue is full
return false
}
}
// SubmitWithBackpressure adds a task to the pool with backpressure handling
// Returns true if task was accepted, false if dropped due to backpressure
func (p *GoroutinePool) SubmitWithBackpressure(task Task) bool {
select {
case <-p.shutdown:
return false // Pool is shutting down
case p.taskQueue <- task:
// Task accepted, ensure we have a worker to process it
p.ensureWorkerAvailable()
return true
default:
// Queue is full - apply backpressure
// Check if we're in a high-load situation
queueLen := len(p.taskQueue)
queueCap := cap(p.taskQueue)
workerCount := atomic.LoadInt64(&p.workerCount)
// If queue is >90% full and we're at max workers, drop the task
if queueLen > int(float64(queueCap)*0.9) && workerCount >= int64(p.maxWorkers) {
p.logger.Warn().Int("queue_len", queueLen).Int("queue_cap", queueCap).Msg("Dropping task due to backpressure")
return false
}
// Try one more time with a short timeout
select {
case p.taskQueue <- task:
p.ensureWorkerAvailable()
return true
case <-time.After(1 * time.Millisecond):
// Still can't submit after timeout - drop task
p.logger.Debug().Msg("Task dropped after backpressure timeout")
return false
}
}
}
// ensureWorkerAvailable makes sure at least one worker is available to process tasks
func (p *GoroutinePool) ensureWorkerAvailable() {
// Check if we already have enough workers
currentWorkers := atomic.LoadInt64(&p.workerCount)
// Only start new workers if:
// 1. We have no workers at all, or
// 2. The queue is growing and we're below max workers
queueLen := len(p.taskQueue)
if currentWorkers == 0 || (queueLen > int(currentWorkers) && currentWorkers < int64(p.maxWorkers)) {
// Try to acquire a semaphore slot without blocking
select {
case p.workerSem <- struct{}{}:
// We got a slot, start a new worker
p.startWorker()
default:
// All worker slots are taken, which means we have enough workers
}
}
}
// startWorker launches a new worker goroutine
func (p *GoroutinePool) startWorker() {
p.wg.Add(1)
atomic.AddInt64(&p.workerCount, 1)
go func() {
defer func() {
atomic.AddInt64(&p.workerCount, -1)
<-p.workerSem // Release the semaphore slot
p.wg.Done()
// Recover from panics in worker tasks
if r := recover(); r != nil {
p.logger.Error().Interface("panic", r).Msg("Worker recovered from panic")
}
}()
idleTimer := time.NewTimer(p.maxIdleTime)
defer idleTimer.Stop()
for {
select {
case <-p.shutdown:
return
case task, ok := <-p.taskQueue:
if !ok {
return // Channel closed
}
// Reset idle timer
if !idleTimer.Stop() {
<-idleTimer.C
}
idleTimer.Reset(p.maxIdleTime)
// Execute the task with panic recovery
func() {
defer func() {
if r := recover(); r != nil {
p.logger.Error().Interface("panic", r).Msg("Task execution panic recovered")
}
}()
task()
}()
atomic.AddInt64(&p.taskCount, 1)
case <-idleTimer.C:
// Worker has been idle for too long
// Keep at least 2 workers alive to handle incoming tasks without creating new goroutines
if atomic.LoadInt64(&p.workerCount) > 2 {
return
}
// For persistent workers (the minimum 2), use a longer idle timeout
// This prevents excessive worker creation/destruction cycles
idleTimer.Reset(p.maxIdleTime * 3) // Triple the idle time for persistent workers
}
}
}()
}
// supervisor monitors the pool and logs statistics periodically
func (p *GoroutinePool) supervisor() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-p.shutdown:
return
case <-ticker.C:
workers := atomic.LoadInt64(&p.workerCount)
tasks := atomic.LoadInt64(&p.taskCount)
queueLen := len(p.taskQueue)
p.logger.Debug().
Int64("workers", workers).
Int64("tasks_processed", tasks).
Int("queue_length", queueLen).
Msg("Pool statistics")
}
}
}
// Shutdown gracefully shuts down the pool
// If wait is true, it will wait for all tasks to complete
// If wait is false, it will terminate immediately, potentially leaving tasks unprocessed
func (p *GoroutinePool) Shutdown(wait bool) {
p.shutdownOnce.Do(func() {
close(p.shutdown)
if wait {
// Wait for all tasks to be processed
if len(p.taskQueue) > 0 {
p.logger.Debug().Int("remaining_tasks", len(p.taskQueue)).Msg("Waiting for tasks to complete")
}
// Close the task queue to signal no more tasks
close(p.taskQueue)
// Wait for all workers to finish
p.wg.Wait()
}
})
}
// GetStats returns statistics about the pool
func (p *GoroutinePool) GetStats() map[string]interface{} {
return map[string]interface{}{
"name": p.name,
"worker_count": atomic.LoadInt64(&p.workerCount),
"max_workers": p.maxWorkers,
"tasks_processed": atomic.LoadInt64(&p.taskCount),
"queue_length": len(p.taskQueue),
"queue_capacity": cap(p.taskQueue),
}
}
// Global pools for different audio processing tasks
var (
globalAudioProcessorPool atomic.Pointer[GoroutinePool]
globalAudioReaderPool atomic.Pointer[GoroutinePool]
globalAudioProcessorInitOnce sync.Once
globalAudioReaderInitOnce sync.Once
)
// GetAudioProcessorPool returns the global audio processor pool
func GetAudioProcessorPool() *GoroutinePool {
pool := globalAudioProcessorPool.Load()
if pool != nil {
return pool
}
globalAudioProcessorInitOnce.Do(func() {
config := Config
newPool := NewGoroutinePool(
"audio-processor",
config.MaxAudioProcessorWorkers,
config.AudioProcessorQueueSize,
config.WorkerMaxIdleTime,
)
globalAudioProcessorPool.Store(newPool)
pool = newPool
})
return globalAudioProcessorPool.Load()
}
// GetAudioReaderPool returns the global audio reader pool
func GetAudioReaderPool() *GoroutinePool {
pool := globalAudioReaderPool.Load()
if pool != nil {
return pool
}
globalAudioReaderInitOnce.Do(func() {
config := Config
newPool := NewGoroutinePool(
"audio-reader",
config.MaxAudioReaderWorkers,
config.AudioReaderQueueSize,
config.WorkerMaxIdleTime,
)
globalAudioReaderPool.Store(newPool)
pool = newPool
})
return globalAudioReaderPool.Load()
}
// SubmitAudioProcessorTask submits a task to the audio processor pool
func SubmitAudioProcessorTask(task Task) bool {
return GetAudioProcessorPool().Submit(task)
}
// SubmitAudioReaderTask submits a task to the audio reader pool
func SubmitAudioReaderTask(task Task) bool {
return GetAudioReaderPool().Submit(task)
}
// SubmitAudioProcessorTaskWithBackpressure submits a task with backpressure handling
func SubmitAudioProcessorTaskWithBackpressure(task Task) bool {
return GetAudioProcessorPool().SubmitWithBackpressure(task)
}
// SubmitAudioReaderTaskWithBackpressure submits a task with backpressure handling
func SubmitAudioReaderTaskWithBackpressure(task Task) bool {
return GetAudioReaderPool().SubmitWithBackpressure(task)
}
// ShutdownAudioPools shuts down all audio goroutine pools
func ShutdownAudioPools(wait bool) {
logger := logging.GetDefaultLogger().With().Str("component", "audio-pools").Logger()
processorPool := globalAudioProcessorPool.Load()
if processorPool != nil {
logger.Info().Msg("Shutting down audio processor pool")
processorPool.Shutdown(wait)
}
readerPool := globalAudioReaderPool.Load()
if readerPool != nil {
logger.Info().Msg("Shutting down audio reader pool")
readerPool.Shutdown(wait)
}
}
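// Usage sketch, not part of this change: how the pool API removed by this change might have
// been driven by a caller (pool parameters here are arbitrary illustration values).
func exampleGoroutinePoolUsage() {
	pool := NewGoroutinePool("example", 4, 64, 30*time.Second)
	accepted := pool.Submit(func() {
		// do some audio-related work
	})
	_ = accepted // false means the queue was full
	stats := pool.GetStats()
	_ = stats["tasks_processed"]
	pool.Shutdown(true) // wait for queued tasks to finish
}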

View File

@ -70,9 +70,6 @@ func (aim *AudioInputManager) Stop() {
aim.logComponentStop(AudioInputManagerComponent)
// Flush any pending sampled metrics before stopping
aim.flushPendingMetrics()
// Stop the IPC-based audio input
aim.ipcManager.Stop()
@ -120,8 +117,6 @@ func (aim *AudioInputManager) WriteOpusFrame(frame []byte) error {
if err != nil {
return err
}
aim.recordFrameProcessed(len(frame))
aim.updateLatency(processingTime)
return nil
}
@ -164,8 +159,6 @@ func (aim *AudioInputManager) WriteOpusFrameZeroCopy(frame *ZeroCopyAudioFrame)
// Update metrics
atomic.AddInt64(&aim.framesSent, 1)
aim.recordFrameProcessed(frame.Length())
aim.updateLatency(processingTime)
return nil
}

View File

@ -17,6 +17,7 @@ import (
"time"
"github.com/jetkvm/kvm/internal/logging"
"github.com/rs/zerolog"
)
// Global audio input server instance
@ -46,7 +47,7 @@ func RecoverGlobalAudioInputServer() {
// RunAudioInputServer runs the audio input server subprocess
// This should be called from main() when the subprocess is detected
func RunAudioInputServer() error {
logger := logging.GetDefaultLogger().With().Str("component", "audio-input-server").Logger()
logger := logging.GetSubsystemLogger("audio").With().Str("component", "audio-input-server").Logger()
// Parse OPUS configuration from environment variables
bitrate, complexity, vbr, signalType, bandwidth, dtx := parseOpusConfig()
@ -85,6 +86,10 @@ func RunAudioInputServer() error {
logger.Info().Msg("audio input server started, waiting for connections")
// Update C trace logging based on current audio scope log level (after environment variables are processed)
traceEnabled := logger.GetLevel() <= zerolog.TraceLevel
CGOSetTraceLogging(traceEnabled)
// Set up signal handling for graceful shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

View File

@ -119,6 +119,11 @@ func (ais *AudioInputSupervisor) startProcess() error {
// Set environment variables for IPC and OPUS configuration
env := append(os.Environ(), "JETKVM_AUDIO_INPUT_IPC=true") // Enable IPC mode
env = append(env, ais.opusEnv...) // Add OPUS configuration
// Pass logging environment variables directly to subprocess
// The subprocess will inherit all PION_LOG_* variables from os.Environ()
// This ensures the audio scope gets the correct trace level
ais.cmd.Env = env
// Set process group to allow clean termination

View File

@ -29,24 +29,22 @@ var (
// OptimizedIPCMessage represents an optimized message with pre-allocated buffers
type OptimizedIPCMessage struct {
header [17]byte // Pre-allocated header buffer (headerSize = 17)
data []byte // Reusable data buffer
msg UnifiedIPCMessage // Embedded message
header [17]byte
data []byte
msg UnifiedIPCMessage
}
// MessagePool manages a pool of reusable messages to reduce allocations
type MessagePool struct {
// Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment)
hitCount int64 // Pool hit counter (atomic)
missCount int64 // Pool miss counter (atomic)
hitCount int64
missCount int64
// Other fields
pool chan *OptimizedIPCMessage
// Memory optimization fields
preallocated []*OptimizedIPCMessage // Pre-allocated messages for immediate use
preallocSize int // Number of pre-allocated messages
maxPoolSize int // Maximum pool size to prevent memory bloat
mutex sync.RWMutex // Protects preallocated slice
preallocated []*OptimizedIPCMessage
preallocSize int
maxPoolSize int
mutex sync.RWMutex
}
// Global message pool instance
@ -152,30 +150,25 @@ func (mp *MessagePool) Put(msg *OptimizedIPCMessage) {
}
}
// AudioInputServer handles IPC communication for audio input processing
type AudioInputServer struct {
// Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment)
bufferSize int64 // Current buffer size (atomic)
processingTime int64 // Average processing time in nanoseconds (atomic)
droppedFrames int64 // Dropped frames counter (atomic)
totalFrames int64 // Total frames counter (atomic)
bufferSize int64
processingTime int64
droppedFrames int64
totalFrames int64
listener net.Listener
conn net.Conn
mtx sync.Mutex
running bool
// Triple-goroutine architecture
messageChan chan *UnifiedIPCMessage // Buffered channel for incoming messages
processChan chan *UnifiedIPCMessage // Buffered channel for processing queue
stopChan chan struct{} // Stop signal for all goroutines
wg sync.WaitGroup // Wait group for goroutine coordination
messageChan chan *UnifiedIPCMessage
processChan chan *UnifiedIPCMessage
stopChan chan struct{}
wg sync.WaitGroup
// Channel resizing support
channelMutex sync.RWMutex // Protects channel recreation
lastBufferSize int64 // Last known buffer size for change detection
channelMutex sync.RWMutex
lastBufferSize int64
// Socket buffer configuration
socketBufferConfig SocketBufferConfig
}
@ -211,8 +204,8 @@ func NewAudioInputServer() (*AudioInputServer, error) {
return nil, fmt.Errorf("failed to create unix socket after 3 attempts: %w", err)
}
// Get initial buffer size from config
initialBufferSize := int64(Config.AdaptiveDefaultBufferSize)
// Get initial buffer size (512 frames for stability)
initialBufferSize := int64(512)
// Ensure minimum buffer size to prevent immediate overflow
// Use at least 50 frames to handle burst traffic
@ -256,11 +249,8 @@ func (ais *AudioInputServer) Start() error {
ais.startProcessorGoroutine()
ais.startMonitorGoroutine()
// Submit the connection acceptor to the audio reader pool
if !SubmitAudioReaderTask(ais.acceptConnections) {
// If the pool is full or shutting down, fall back to direct goroutine creation
// Submit the connection acceptor directly
go ais.acceptConnections()
}
return nil
}
@ -335,12 +325,10 @@ func (ais *AudioInputServer) acceptConnections() {
ais.mtx.Unlock()
// Handle this connection using the goroutine pool
if !SubmitAudioReaderTask(func() { ais.handleConnection(conn) }) {
// If the pool is full or shutting down, fall back to direct goroutine creation
// Handle the connection directly
go ais.handleConnection(conn)
}
}
}
// handleConnection handles a single client connection
func (ais *AudioInputServer) handleConnection(conn net.Conn) {
@ -466,13 +454,9 @@ func (ais *AudioInputServer) processMessage(msg *UnifiedIPCMessage) error {
// processOpusFrame processes an Opus audio frame
func (ais *AudioInputServer) processOpusFrame(data []byte) error {
	dataLen := len(data)
	if dataLen == 0 {
		return nil
	}
	// Inline validation for critical audio path - avoid function call overhead
cachedMaxFrameSize := maxFrameSize
if dataLen > cachedMaxFrameSize {
return ErrFrameDataTooLarge
}
@ -485,8 +469,85 @@ func (ais *AudioInputServer) processOpusFrame(data []byte) error {
pcmBuffer := GetBufferFromPool(cache.MaxPCMBufferSize)
defer ReturnBufferToPool(pcmBuffer)
// Log audio processing details periodically for monitoring
totalFrames := atomic.AddInt64(&ais.totalFrames, 1)
// Zero-cost debug logging for buffer allocation (first few operations)
// Only perform computations if trace logging is actually enabled
if totalFrames <= 5 {
logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger()
if logger.GetLevel() <= zerolog.TraceLevel {
logger.Trace().
Int("requested_buffer_size", cache.MaxPCMBufferSize).
Int("pcm_buffer_length", len(pcmBuffer)).
Int("pcm_buffer_capacity", cap(pcmBuffer)).
Msg("PCM buffer allocated from pool")
}
}
if totalFrames <= 5 || totalFrames%500 == 1 {
logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger()
if logger.GetLevel() <= zerolog.TraceLevel {
logger.Trace().
Int("opus_frame_size", dataLen).
Int("pcm_buffer_size", len(pcmBuffer)).
Int64("total_frames_processed", totalFrames).
Msg("Processing audio frame for USB Gadget output")
}
}
// Direct CGO call - avoid wrapper function overhead
_, err := CGOAudioDecodeWrite(data, pcmBuffer)
start := time.Now()
framesWritten, err := CGOAudioDecodeWrite(data, pcmBuffer)
duration := time.Since(start)
// Log the result with detailed context
logger := logging.GetDefaultLogger().With().Str("component", AudioInputServerComponent).Logger()
if err != nil {
// Log error with detailed context for debugging
atomic.AddInt64(&ais.droppedFrames, 1)
// Get current statistics for context
total, success, failures, recovery, lastError, _ := GetAudioDecodeWriteStats()
successRate := float64(success) / float64(total) * 100
logger.Error().
Err(err).
Int("opus_frame_size", dataLen).
Dur("processing_duration", duration).
Int64("frames_written", int64(framesWritten)).
Int64("total_operations", total).
Int64("successful_operations", success).
Int64("failed_operations", failures).
Int64("recovery_attempts", recovery).
Float64("success_rate_percent", successRate).
Str("last_error", lastError).
Int64("total_frames_processed", totalFrames).
Int64("dropped_frames", atomic.LoadInt64(&ais.droppedFrames)).
Msg("Failed to decode/write audio frame to USB Gadget")
return err
}
// Log successful operations periodically to monitor health (zero-cost when trace disabled)
if (totalFrames <= 5 || totalFrames%1000 == 1) && logger.GetLevel() <= zerolog.TraceLevel {
// Get current statistics for context (only when trace is enabled)
total, success, failures, recovery, _, _ := GetAudioDecodeWriteStats()
successRate := float64(success) / float64(total) * 100
logger.Trace().
Int("opus_frame_size", dataLen).
Int64("frames_written", int64(framesWritten)).
Int64("total_operations", total).
Int64("successful_operations", success).
Int64("failed_operations", failures).
Int64("recovery_attempts", recovery).
Float64("success_rate_percent", successRate).
Int64("total_frames_processed", totalFrames).
Int64("dropped_frames", atomic.LoadInt64(&ais.droppedFrames)).
Msg("Successfully decoded/wrote audio frame to USB Gadget")
}
return err
}
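// Illustrative sketch, not part of this change: the "zero-cost" logging pattern used above,
// sampling by frame count and checking the level before any log fields are built. The helper
// name is hypothetical.
func traceSampled(logger zerolog.Logger, frameCount int64, msg string) {
	if logger.GetLevel() > zerolog.TraceLevel {
		return // trace disabled: no field construction, no allocation
	}
	if frameCount <= 5 || frameCount%1000 == 1 {
		logger.Trace().Int64("frame_count", frameCount).Msg(msg)
	}
}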
@ -981,17 +1042,8 @@ func (ais *AudioInputServer) startReaderGoroutine() {
}
}
// Submit the reader task to the audio reader pool with backpressure
logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
if !SubmitAudioReaderTaskWithBackpressure(readerTask) {
// Task was dropped due to backpressure - this is expected under high load
// Log at debug level to avoid spam, but track the drop
logger.Debug().Msg("Audio reader task dropped due to backpressure")
// Don't fall back to unlimited goroutine creation
// Instead, let the system recover naturally
ais.wg.Done() // Decrement the wait group since we're not starting the task
}
// Handle the reader task directly
go readerTask()
}
// startProcessorGoroutine starts the message processor using the goroutine pool
@ -1073,17 +1125,8 @@ func (ais *AudioInputServer) startProcessorGoroutine() {
}
}
// Submit the processor task to the audio processor pool with backpressure
logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
if !SubmitAudioProcessorTaskWithBackpressure(processorTask) {
// Task was dropped due to backpressure - this is expected under high load
// Log at debug level to avoid spam, but track the drop
logger.Debug().Msg("Audio processor task dropped due to backpressure")
// Don't fall back to unlimited goroutine creation
// Instead, let the system recover naturally
ais.wg.Done() // Decrement the wait group since we're not starting the task
}
// Submit the processor task directly
go processorTask()
}
// processMessageWithRecovery processes a message with enhanced error recovery
@ -1182,9 +1225,6 @@ func (ais *AudioInputServer) startMonitorGoroutine() {
atomic.StoreInt64(&ais.processingTime, newAvg)
}
// Report latency to adaptive buffer manager
ais.ReportLatency(latency)
if err != nil {
atomic.AddInt64(&ais.droppedFrames, 1)
}
@ -1206,17 +1246,8 @@ func (ais *AudioInputServer) startMonitorGoroutine() {
}
}
// Submit the monitor task to the audio processor pool with backpressure
logger := logging.GetDefaultLogger().With().Str("component", AudioInputClientComponent).Logger()
if !SubmitAudioProcessorTaskWithBackpressure(monitorTask) {
// Task was dropped due to backpressure - this is expected under high load
// Log at debug level to avoid spam, but track the drop
logger.Debug().Msg("Audio monitor task dropped due to backpressure")
// Don't fall back to unlimited goroutine creation
// Instead, let the system recover naturally
ais.wg.Done() // Decrement the wait group since we're not starting the task
}
// Submit the monitor task directly
go monitorTask()
}
// GetServerStats returns server performance statistics
@ -1227,18 +1258,13 @@ func (ais *AudioInputServer) GetServerStats() (total, dropped int64, avgProcessi
atomic.LoadInt64(&ais.bufferSize)
}
// UpdateBufferSize updates the buffer size (now using fixed config values)
// UpdateBufferSize updates the buffer size (now using fixed values)
func (ais *AudioInputServer) UpdateBufferSize() {
// Buffer size is now fixed from config
newSize := int64(Config.AdaptiveDefaultBufferSize)
// Buffer size is now fixed at 512 frames for stability
newSize := int64(512)
atomic.StoreInt64(&ais.bufferSize, newSize)
}
// ReportLatency reports processing latency (now a no-op with fixed buffers)
func (ais *AudioInputServer) ReportLatency(latency time.Duration) {
// Latency reporting is now a no-op with fixed buffer sizes
}
// GetMessagePoolStats returns detailed statistics about the message pool
func (mp *MessagePool) GetMessagePoolStats() MessagePoolStats {
mp.mutex.RLock()

View File

@ -18,10 +18,9 @@ var globalOutputClientMessagePool = NewGenericMessagePool(Config.OutputMessagePo
// AudioOutputServer provides audio output IPC functionality
type AudioOutputServer struct {
// Atomic counters
bufferSize int64 // Current buffer size (atomic)
droppedFrames int64 // Dropped frames counter (atomic)
totalFrames int64 // Total frames counter (atomic)
bufferSize int64
droppedFrames int64
totalFrames int64
listener net.Listener
conn net.Conn
@ -29,12 +28,10 @@ type AudioOutputServer struct {
running bool
logger zerolog.Logger
// Message channels
messageChan chan *UnifiedIPCMessage // Buffered channel for incoming messages
processChan chan *UnifiedIPCMessage // Buffered channel for processing queue
wg sync.WaitGroup // Wait group for goroutine coordination
messageChan chan *UnifiedIPCMessage
processChan chan *UnifiedIPCMessage
wg sync.WaitGroup
// Configuration
socketPath string
magicNumber uint32
}
@ -265,6 +262,17 @@ func (s *AudioOutputServer) SendFrame(frame []byte) error {
return fmt.Errorf("no client connected")
}
// Zero-cost trace logging for frame transmission
if s.logger.GetLevel() <= zerolog.TraceLevel {
totalFrames := atomic.LoadInt64(&s.totalFrames)
if totalFrames <= 5 || totalFrames%1000 == 1 {
s.logger.Trace().
Int("frame_size", len(frame)).
Int64("total_frames_sent", totalFrames).
Msg("Sending audio frame to output client")
}
}
msg := &UnifiedIPCMessage{
Magic: s.magicNumber,
Type: MessageTypeOpusFrame,
@ -301,9 +309,8 @@ func (s *AudioOutputServer) GetServerStats() (total, dropped int64, bufferSize i
// AudioOutputClient provides audio output IPC client functionality
type AudioOutputClient struct {
// Atomic counters
droppedFrames int64 // Atomic counter for dropped frames
totalFrames int64 // Atomic counter for total frames
droppedFrames int64
totalFrames int64
conn net.Conn
mtx sync.Mutex
@ -311,10 +318,9 @@ type AudioOutputClient struct {
logger zerolog.Logger
socketPath string
magicNumber uint32
bufferPool *AudioBufferPool // Buffer pool for memory optimization
bufferPool *AudioBufferPool
// Health monitoring
autoReconnect bool // Enable automatic reconnection
autoReconnect bool
}
func NewAudioOutputClient() *AudioOutputClient {
@ -405,6 +411,7 @@ func (c *AudioOutputClient) ReceiveFrame() ([]byte, error) {
}
size := binary.LittleEndian.Uint32(optMsg.header[5:9])
timestamp := int64(binary.LittleEndian.Uint64(optMsg.header[9:17]))
maxFrameSize := Config.OutputMaxFrameSize
if int(size) > maxFrameSize {
return nil, fmt.Errorf("received frame size validation failed: got %d bytes, maximum allowed %d bytes", size, maxFrameSize)
@ -423,6 +430,19 @@ func (c *AudioOutputClient) ReceiveFrame() ([]byte, error) {
// Note: Caller is responsible for returning frame to pool via PutAudioFrameBuffer()
atomic.AddInt64(&c.totalFrames, 1)
// Zero-cost trace logging for frame reception
if c.logger.GetLevel() <= zerolog.TraceLevel {
totalFrames := atomic.LoadInt64(&c.totalFrames)
if totalFrames <= 5 || totalFrames%1000 == 1 {
c.logger.Trace().
Int("frame_size", int(size)).
Int64("timestamp", timestamp).
Int64("total_frames_received", totalFrames).
Msg("Received audio frame from output server")
}
}
return frame, nil
}
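Both trace blocks in this file use the same gate: do nothing unless the logger is at trace level, and even then only log the first five frames and every 1000th one. A sketch of that rule factored into a helper, assuming zerolog as above (traceFrame is an illustrative name, not part of the diff):

package audio

import "github.com/rs/zerolog"

// traceFrame logs a sampled trace event for hot-path frame handling.
func traceFrame(logger zerolog.Logger, frameSize int, totalFrames int64, msg string) {
	if logger.GetLevel() > zerolog.TraceLevel {
		return // trace disabled: no event is built, so the hot path stays cheap
	}
	if totalFrames > 5 && totalFrames%1000 != 1 {
		return // sampled out: only the first frames and every 1000th are logged
	}
	logger.Trace().
		Int("frame_size", frameSize).
		Int64("total_frames", totalFrames).
		Msg(msg)
}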

View File

@ -59,12 +59,6 @@ func (bam *BaseAudioManager) resetMetrics() {
bam.metrics.AverageLatency = 0
}
// flushPendingMetrics is now a no-op since we use direct atomic updates
func (bam *BaseAudioManager) flushPendingMetrics() {
// No-op: metrics are now updated directly without local buffering
// This function is kept for API compatibility
}
// getBaseMetrics returns a copy of the base metrics
func (bam *BaseAudioManager) getBaseMetrics() BaseAudioMetrics {
return BaseAudioMetrics{
@ -77,18 +71,6 @@ func (bam *BaseAudioManager) getBaseMetrics() BaseAudioMetrics {
}
}
// recordFrameProcessed records a processed frame with simplified tracking
func (bam *BaseAudioManager) recordFrameProcessed(bytes int) {
}
// recordFrameDropped records a dropped frame with simplified tracking
func (bam *BaseAudioManager) recordFrameDropped() {
}
// updateLatency updates the average latency
func (bam *BaseAudioManager) updateLatency(latency time.Duration) {
}
// logComponentStart logs component start with consistent format
func (bam *BaseAudioManager) logComponentStart(component string) {
bam.logger.Debug().Str("component", component).Msg("starting component")

View File

@ -104,19 +104,11 @@ func (aom *AudioOutputIPCManager) WriteOpusFrame(frame *ZeroCopyAudioFrame) erro
return fmt.Errorf("output frame validation failed: %w", err)
}
start := time.Now()
// Send frame to IPC server
if err := aom.server.SendFrame(frame.Data()); err != nil {
aom.recordFrameDropped()
return err
}
// Update metrics
processingTime := time.Since(start)
aom.recordFrameProcessed(frame.Length())
aom.updateLatency(processingTime)
return nil
}
@ -130,22 +122,14 @@ func (aom *AudioOutputIPCManager) WriteOpusFrameZeroCopy(frame *ZeroCopyAudioFra
return fmt.Errorf("audio output server not initialized")
}
start := time.Now()
// Extract frame data
frameData := frame.Data()
// Send frame to IPC server (zero-copy not available, use regular send)
if err := aom.server.SendFrame(frameData); err != nil {
aom.recordFrameDropped()
return err
}
// Update metrics
processingTime := time.Since(start)
aom.recordFrameProcessed(len(frameData))
aom.updateLatency(processingTime)
return nil
}

View File

@ -4,10 +4,12 @@ import (
"context"
"os"
"os/signal"
"strings"
"syscall"
"time"
"github.com/jetkvm/kvm/internal/logging"
"github.com/rs/zerolog"
)
// getEnvInt reads an integer from an environment variable with a default value

@ -15,7 +17,7 @@ import (
// RunAudioOutputServer runs the audio output server subprocess
// This should be called from main() when the subprocess is detected
func RunAudioOutputServer() error {
logger := logging.GetDefaultLogger().With().Str("component", "audio-output-server").Logger()
logger := logging.GetSubsystemLogger("audio").With().Str("component", "audio-output-server").Logger()
// Parse OPUS configuration from environment variables
bitrate, complexity, vbr, signalType, bandwidth, dtx := parseOpusConfig()
@ -52,6 +54,27 @@ func RunAudioOutputServer() error {
logger.Info().Msg("audio output server started, waiting for connections")
// Update C trace logging based on current audio scope log level (after environment variables are processed)
loggerTraceEnabled := logger.GetLevel() <= zerolog.TraceLevel
// Manual check for audio scope in PION_LOG_TRACE (workaround for logging system bug)
manualTraceEnabled := false
pionTrace := os.Getenv("PION_LOG_TRACE")
if pionTrace != "" {
scopes := strings.Split(strings.ToLower(pionTrace), ",")
for _, scope := range scopes {
if strings.TrimSpace(scope) == "audio" {
manualTraceEnabled = true
break
}
}
}
// Use manual check as fallback if logging system fails
traceEnabled := loggerTraceEnabled || manualTraceEnabled
CGOSetTraceLogging(traceEnabled)
// Set up signal handling for graceful shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
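The manual PION_LOG_TRACE scan above can be read as: treat the variable as a comma-separated scope list and check whether it contains "audio". A sketch of the same check as a helper (scopeInEnv is an illustrative name, not part of the diff):

package audio

import (
	"os"
	"strings"
)

// scopeInEnv reports whether the comma-separated scope list held in the
// given environment variable (e.g. PION_LOG_TRACE="webrtc,audio") contains
// the requested scope, case-insensitively.
func scopeInEnv(envVar, scope string) bool {
	val := os.Getenv(envVar)
	if val == "" {
		return false
	}
	for _, s := range strings.Split(strings.ToLower(val), ",") {
		if strings.TrimSpace(s) == scope {
			return true
		}
	}
	return false
}

With that helper, the trace decision above becomes traceEnabled := loggerTraceEnabled || scopeInEnv("PION_LOG_TRACE", "audio").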

View File

@ -14,8 +14,6 @@ import (
"github.com/rs/zerolog"
)
// Removed unused AudioOutputStreamer struct - actual streaming uses direct functions
var (
outputStreamingRunning int32
outputStreamingCancel context.CancelFunc
@ -30,22 +28,6 @@ func getOutputStreamingLogger() *zerolog.Logger {
return outputStreamingLogger
}
// Removed unused NewAudioOutputStreamer function
// Removed unused AudioOutputStreamer.Start method
// Removed unused AudioOutputStreamer.Stop method
// Removed unused AudioOutputStreamer.streamLoop method
// Removed unused AudioOutputStreamer.processingLoop method
// Removed unused AudioOutputStreamer.statisticsLoop method
// Removed unused AudioOutputStreamer.reportStatistics method
// Removed all unused AudioOutputStreamer methods
// StartAudioOutputStreaming starts audio output streaming (capturing system audio)
func StartAudioOutputStreaming(send func([]byte)) error {
if !atomic.CompareAndSwapInt32(&outputStreamingRunning, 0, 1) {
@ -84,6 +66,7 @@ func StartAudioOutputStreaming(send func([]byte)) error {
maxConsecutiveErrors := Config.MaxConsecutiveErrors
errorBackoffDelay := Config.RetryDelay
maxErrorBackoff := Config.MaxRetryDelay
var frameCount int64
for {
select {
@ -143,11 +126,25 @@ func StartAudioOutputStreaming(send func([]byte)) error {
}
if n > 0 {
frameCount++
// Get frame buffer from pool to reduce allocations
frame := GetAudioFrameBuffer()
frame = frame[:n] // Resize to actual frame size
copy(frame, buffer[:n])
// Zero-cost trace logging for output frame processing
logger := getOutputStreamingLogger()
if logger.GetLevel() <= zerolog.TraceLevel {
if frameCount <= 5 || frameCount%1000 == 1 {
logger.Trace().
Int("frame_size", n).
Int("buffer_capacity", cap(frame)).
Int64("total_frames_sent", frameCount).
Msg("Audio output frame captured and buffered")
}
}
// Validate frame before sending
if err := ValidateAudioFrame(frame); err != nil {
getOutputStreamingLogger().Warn().Err(err).Msg("Frame validation failed, dropping frame")
@ -159,6 +156,16 @@ func StartAudioOutputStreaming(send func([]byte)) error {
// Return buffer to pool after sending
PutAudioFrameBuffer(frame)
RecordFrameReceived(n)
// Zero-cost trace logging for successful frame transmission
if logger.GetLevel() <= zerolog.TraceLevel {
if frameCount <= 5 || frameCount%1000 == 1 {
logger.Trace().
Int("frame_size", n).
Int64("total_frames_sent", frameCount).
Msg("Audio output frame sent successfully")
}
}
}
// Small delay to prevent busy waiting
time.Sleep(Config.ShortSleepDuration)
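The loop above gets a frame buffer from a pool, truncates it to the captured size, and returns it after sending. The real GetAudioFrameBuffer/PutAudioFrameBuffer are not shown in this diff; below is a minimal sync.Pool-based sketch of that get/resize/put cycle (the helper names and the 4096-byte capacity are assumptions):

package audio

import "sync"

const assumedMaxFrameBytes = 4096 // illustrative capacity, not taken from Config

var framePool = sync.Pool{
	New: func() interface{} {
		b := make([]byte, 0, assumedMaxFrameBytes)
		return &b // store a pointer so Put does not allocate a new slice header
	},
}

// getFrameBuffer returns an empty slice backed by pooled storage.
func getFrameBuffer() []byte {
	return (*framePool.Get().(*[]byte))[:0]
}

// putFrameBuffer hands the backing storage back to the pool for reuse.
func putFrameBuffer(b []byte) {
	b = b[:0]
	framePool.Put(&b)
}

Call sites then mirror the loop: frame := getFrameBuffer(); frame = frame[:n]; copy(frame, buffer[:n]); send; putFrameBuffer(frame), assuming n never exceeds the pooled capacity.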

View File

@ -17,23 +17,6 @@ const (
AudioOutputSupervisorComponent = "audio-output-supervisor"
)
// Restart configuration is now retrieved from centralized config
func getMaxRestartAttempts() int {
return Config.MaxRestartAttempts
}
func getRestartWindow() time.Duration {
return Config.RestartWindow
}
func getRestartDelay() time.Duration {
return Config.RestartDelay
}
func getMaxRestartDelay() time.Duration {
return Config.MaxRestartDelay
}
// AudioOutputSupervisor manages the audio output server subprocess lifecycle
type AudioOutputSupervisor struct {
*BaseSupervisor
@ -175,10 +158,10 @@ func (s *AudioOutputSupervisor) supervisionLoop() {
ProcessType: "audio output server",
Timeout: Config.OutputSupervisorTimeout,
EnableRestart: true,
MaxRestartAttempts: getMaxRestartAttempts(),
RestartWindow: getRestartWindow(),
RestartDelay: getRestartDelay(),
MaxRestartDelay: getMaxRestartDelay(),
MaxRestartAttempts: Config.MaxRestartAttempts,
RestartWindow: Config.RestartWindow,
RestartDelay: Config.RestartDelay,
MaxRestartDelay: Config.MaxRestartDelay,
}
// Configure callbacks
@ -217,7 +200,13 @@ func (s *AudioOutputSupervisor) startProcess() error {
s.cmd.Stderr = os.Stderr
// Set environment variables for OPUS configuration
s.cmd.Env = append(os.Environ(), s.opusEnv...)
env := append(os.Environ(), s.opusEnv...)
// Pass logging environment variables directly to subprocess
// The subprocess will inherit all PION_LOG_* variables from os.Environ()
// This ensures the audio scope gets the correct trace level
s.cmd.Env = env
// Start the process
if err := s.cmd.Start(); err != nil {
@ -249,13 +238,13 @@ func (s *AudioOutputSupervisor) shouldRestart() bool {
now := time.Now()
var recentAttempts []time.Time
for _, attempt := range s.restartAttempts {
if now.Sub(attempt) < getRestartWindow() {
if now.Sub(attempt) < Config.RestartWindow {
recentAttempts = append(recentAttempts, attempt)
}
}
s.restartAttempts = recentAttempts
return len(s.restartAttempts) < getMaxRestartAttempts()
return len(s.restartAttempts) < Config.MaxRestartAttempts
}
// recordRestartAttempt records a restart attempt
@ -274,17 +263,17 @@ func (s *AudioOutputSupervisor) calculateRestartDelay() time.Duration {
// Exponential backoff based on recent restart attempts
attempts := len(s.restartAttempts)
if attempts == 0 {
return getRestartDelay()
return Config.RestartDelay
}
// Calculate exponential backoff: 2^attempts * base delay
delay := getRestartDelay()
for i := 0; i < attempts && delay < getMaxRestartDelay(); i++ {
delay := Config.RestartDelay
for i := 0; i < attempts && delay < Config.MaxRestartDelay; i++ {
delay *= 2
}
if delay > getMaxRestartDelay() {
delay = getMaxRestartDelay()
if delay > Config.MaxRestartDelay {
delay = Config.MaxRestartDelay
}
return delay
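The backoff above doubles the base delay once per recent restart attempt and clamps at the maximum. As a standalone sketch with example numbers (the 1s base and 30s cap are illustrative; the real values come from Config.RestartDelay and Config.MaxRestartDelay):

package audio

import "time"

// restartBackoff mirrors calculateRestartDelay: base * 2^attempts, clamped at max.
func restartBackoff(attempts int, base, max time.Duration) time.Duration {
	delay := base
	for i := 0; i < attempts && delay < max; i++ {
		delay *= 2
	}
	if delay > max {
		delay = max
	}
	return delay
}

With base=1s and max=30s, attempts 0 through 6 yield 1s, 2s, 4s, 8s, 16s, 30s (32s clamped), 30s; and since only attempts inside Config.RestartWindow are counted, the delay resets once the window slides past older failures.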

View File

@ -3,7 +3,7 @@
// Package audio provides real-time audio processing for JetKVM with low-latency streaming.
//
// Key components: output/input pipelines with Opus codec, adaptive buffer management,
// Key components: output/input pipelines with Opus codec, buffer management,
// zero-copy frame pools, IPC communication, and process supervision.
//
// Supports four quality presets (Low/Medium/High/Ultra) with configurable bitrates.
@ -260,97 +260,6 @@ func GetAudioConfig() AudioConfig {
return currentConfig
}
// Simplified OPUS parameter lookup table
var opusParams = map[AudioQuality]struct {
complexity, vbr, signalType, bandwidth, dtx int
}{
AudioQualityLow: {Config.AudioQualityLowOpusComplexity, Config.AudioQualityLowOpusVBR, Config.AudioQualityLowOpusSignalType, Config.AudioQualityLowOpusBandwidth, Config.AudioQualityLowOpusDTX},
AudioQualityMedium: {Config.AudioQualityMediumOpusComplexity, Config.AudioQualityMediumOpusVBR, Config.AudioQualityMediumOpusSignalType, Config.AudioQualityMediumOpusBandwidth, Config.AudioQualityMediumOpusDTX},
AudioQualityHigh: {Config.AudioQualityHighOpusComplexity, Config.AudioQualityHighOpusVBR, Config.AudioQualityHighOpusSignalType, Config.AudioQualityHighOpusBandwidth, Config.AudioQualityHighOpusDTX},
AudioQualityUltra: {Config.AudioQualityUltraOpusComplexity, Config.AudioQualityUltraOpusVBR, Config.AudioQualityUltraOpusSignalType, Config.AudioQualityUltraOpusBandwidth, Config.AudioQualityUltraOpusDTX},
}
// SetMicrophoneQuality updates the current microphone quality configuration
func SetMicrophoneQuality(quality AudioQuality) {
// Validate audio quality parameter
if err := ValidateAudioQuality(quality); err != nil {
// Log validation error but don't fail - maintain backward compatibility
logger := logging.GetDefaultLogger().With().Str("component", "audio").Logger()
logger.Warn().Err(err).Int("quality", int(quality)).Msg("invalid microphone quality, using current config")
return
}
presets := GetMicrophoneQualityPresets()
if config, exists := presets[quality]; exists {
currentMicrophoneConfig = config
// Get OPUS parameters using lookup table
params, exists := opusParams[quality]
if !exists {
// Fallback to medium quality
params = opusParams[AudioQualityMedium]
}
// Update audio input subprocess configuration dynamically without restart
logger := logging.GetDefaultLogger().With().Str("component", "audio").Logger()
// Set new OPUS configuration for future restarts
if supervisor := GetAudioInputSupervisor(); supervisor != nil {
supervisor.SetOpusConfig(config.Bitrate*1000, params.complexity, params.vbr, params.signalType, params.bandwidth, params.dtx)
// Check if microphone is active but IPC control is broken
inputManager := getAudioInputManager()
if inputManager.IsRunning() && !supervisor.IsConnected() {
// Reconnect the IPC control channel
supervisor.Stop()
time.Sleep(50 * time.Millisecond)
if err := supervisor.Start(); err != nil {
logger.Debug().Err(err).Msg("failed to reconnect IPC control channel")
}
}
// Send dynamic configuration update to running subprocess via IPC
if supervisor.IsConnected() {
// Convert AudioConfig to UnifiedIPCOpusConfig with complete Opus parameters
opusConfig := UnifiedIPCOpusConfig{
SampleRate: config.SampleRate,
Channels: config.Channels,
FrameSize: int(config.FrameSize.Milliseconds() * int64(config.SampleRate) / 1000), // Convert ms to samples
Bitrate: config.Bitrate * 1000, // Convert kbps to bps
Complexity: params.complexity,
VBR: params.vbr,
SignalType: params.signalType,
Bandwidth: params.bandwidth,
DTX: params.dtx,
}
if err := supervisor.SendOpusConfig(opusConfig); err != nil {
logger.Debug().Err(err).Msg("failed to send dynamic Opus config update via IPC")
// Fallback to subprocess restart if IPC update fails
supervisor.Stop()
if err := supervisor.Start(); err != nil {
logger.Error().Err(err).Msg("failed to restart audio input subprocess after IPC update failure")
}
} else {
logger.Info().Msg("audio input quality updated dynamically via IPC")
// Reset audio input stats after config update
go func() {
time.Sleep(Config.QualityChangeSettleDelay) // Wait for quality change to settle
// Reset audio input server stats to clear persistent warnings
ResetGlobalAudioInputServerStats()
// Attempt recovery if microphone is still having issues
time.Sleep(1 * time.Second)
RecoverGlobalAudioInputServer()
}()
}
} else {
logger.Info().Bool("supervisor_running", supervisor.IsRunning()).Msg("audio input subprocess not connected, configuration will apply on next start")
}
}
}
}
// GetMicrophoneConfig returns the current microphone configuration
func GetMicrophoneConfig() AudioConfig {
return currentMicrophoneConfig
@ -382,10 +291,12 @@ func RecordFrameReceived(bytes int) {
// RecordFrameDropped increments the frames dropped counter with batched updates
func RecordFrameDropped() {
atomic.AddUint64(&batchedFramesDropped, 1)
}
// RecordConnectionDrop increments the connection drops counter with batched updates
func RecordConnectionDrop() {
atomic.AddUint64(&batchedConnectionDrops, 1)
}
// flushBatchedMetrics flushes accumulated metrics to the main counters
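RecordFrameDropped and RecordConnectionDrop above only perform an atomic add into a batch counter; the flushBatchedMetrics mentioned here then folds those batches into the main counters. A hedged sketch of that pattern (the variable and function names below are illustrative):

package audio

import "sync/atomic"

var (
	batchedDrops uint64 // hot-path accumulator, touched with a single atomic add
	totalDrops   uint64 // aggregated counter read by metrics consumers
)

// recordDrop is what a hot path does: one atomic add, no locks, no flush.
func recordDrop() {
	atomic.AddUint64(&batchedDrops, 1)
}

// flushDrops is what a periodic flusher does: swap the batch back to zero
// and fold the accumulated count into the aggregate in one step.
func flushDrops() {
	if n := atomic.SwapUint64(&batchedDrops, 0); n > 0 {
		atomic.AddUint64(&totalDrops, n)
	}
}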

View File

@ -1,217 +0,0 @@
package audio
import (
"sync"
"sync/atomic"
)
// SizedBufferPool manages a pool of buffers with size tracking
type SizedBufferPool struct {
// The underlying sync.Pool
pool sync.Pool
// Statistics for monitoring
totalBuffers atomic.Int64
totalBytes atomic.Int64
gets atomic.Int64
puts atomic.Int64
misses atomic.Int64
// Configuration
maxBufferSize int
defaultSize int
}
// NewSizedBufferPool creates a new sized buffer pool
func NewSizedBufferPool(defaultSize, maxBufferSize int) *SizedBufferPool {
pool := &SizedBufferPool{
maxBufferSize: maxBufferSize,
defaultSize: defaultSize,
}
pool.pool = sync.Pool{
New: func() interface{} {
// Track pool misses
pool.misses.Add(1)
// Create new buffer with default size
buf := make([]byte, defaultSize)
// Return pointer-like to avoid allocations
slice := buf[:0]
ptrSlice := &slice
// Track statistics
pool.totalBuffers.Add(1)
pool.totalBytes.Add(int64(cap(buf)))
return ptrSlice
},
}
return pool
}
// Get returns a buffer from the pool with at least the specified capacity
func (p *SizedBufferPool) Get(minCapacity int) []byte {
// Track gets
p.gets.Add(1)
// Get buffer from pool - handle pointer-like storage
var buf []byte
poolObj := p.pool.Get()
switch v := poolObj.(type) {
case *[]byte:
// Handle pointer-like storage from Put method
if v != nil {
buf = (*v)[:0] // Get the underlying slice
} else {
buf = make([]byte, 0, p.defaultSize)
}
case []byte:
// Handle direct slice for backward compatibility
buf = v
default:
// Fallback for unexpected types
buf = make([]byte, 0, p.defaultSize)
p.misses.Add(1)
}
// Check if buffer has sufficient capacity
if cap(buf) < minCapacity {
// Track statistics for the old buffer
p.totalBytes.Add(-int64(cap(buf)))
// Allocate new buffer with required capacity
buf = make([]byte, minCapacity)
// Track statistics for the new buffer
p.totalBytes.Add(int64(cap(buf)))
} else {
// Resize existing buffer
buf = buf[:minCapacity]
}
return buf
}
// Put returns a buffer to the pool
func (p *SizedBufferPool) Put(buf []byte) {
// Track statistics
p.puts.Add(1)
// Don't pool excessively large buffers to prevent memory bloat
if cap(buf) > p.maxBufferSize {
// Track statistics
p.totalBuffers.Add(-1)
p.totalBytes.Add(-int64(cap(buf)))
return
}
// Clear buffer contents for security
for i := range buf {
buf[i] = 0
}
// Return to pool - use pointer-like approach to avoid allocations
slice := buf[:0]
p.pool.Put(&slice)
}
// GetStats returns statistics about the buffer pool
func (p *SizedBufferPool) GetStats() (buffers, bytes, gets, puts, misses int64) {
buffers = p.totalBuffers.Load()
bytes = p.totalBytes.Load()
gets = p.gets.Load()
puts = p.puts.Load()
misses = p.misses.Load()
return
}
// BufferPoolStats contains statistics about a buffer pool
type BufferPoolStats struct {
TotalBuffers int64
TotalBytes int64
Gets int64
Puts int64
Misses int64
HitRate float64
AverageBufferSize float64
}
// GetDetailedStats returns detailed statistics about the buffer pool
func (p *SizedBufferPool) GetDetailedStats() BufferPoolStats {
buffers := p.totalBuffers.Load()
bytes := p.totalBytes.Load()
gets := p.gets.Load()
puts := p.puts.Load()
misses := p.misses.Load()
// Calculate hit rate
hitRate := 0.0
if gets > 0 {
hitRate = float64(gets-misses) / float64(gets) * 100.0
}
// Calculate average buffer size
avgSize := 0.0
if buffers > 0 {
avgSize = float64(bytes) / float64(buffers)
}
return BufferPoolStats{
TotalBuffers: buffers,
TotalBytes: bytes,
Gets: gets,
Puts: puts,
Misses: misses,
HitRate: hitRate,
AverageBufferSize: avgSize,
}
}
// Global audio buffer pools with different size classes
var (
// Small buffers (up to 4KB)
smallBufferPool = NewSizedBufferPool(1024, 4*1024)
// Medium buffers (4KB to 64KB)
mediumBufferPool = NewSizedBufferPool(8*1024, 64*1024)
// Large buffers (64KB to 1MB)
largeBufferPool = NewSizedBufferPool(64*1024, 1024*1024)
)
// GetOptimalBuffer returns a buffer from the most appropriate pool based on size
func GetOptimalBuffer(size int) []byte {
switch {
case size <= 4*1024:
return smallBufferPool.Get(size)
case size <= 64*1024:
return mediumBufferPool.Get(size)
default:
return largeBufferPool.Get(size)
}
}
// ReturnOptimalBuffer returns a buffer to the appropriate pool based on size
func ReturnOptimalBuffer(buf []byte) {
size := cap(buf)
switch {
case size <= 4*1024:
smallBufferPool.Put(buf)
case size <= 64*1024:
mediumBufferPool.Put(buf)
default:
largeBufferPool.Put(buf)
}
}
// GetAllPoolStats returns statistics for all buffer pools
func GetAllPoolStats() map[string]BufferPoolStats {
return map[string]BufferPoolStats{
"small": smallBufferPool.GetDetailedStats(),
"medium": mediumBufferPool.GetDetailedStats(),
"large": largeBufferPool.GetDetailedStats(),
}
}

View File

@ -156,23 +156,11 @@ func RecordSocketBufferMetrics(conn net.Conn, component string) {
}
// Get current socket buffer sizes
sendSize, recvSize, err := GetSocketBufferSizes(conn)
_, _, err := GetSocketBufferSizes(conn)
if err != nil {
// Log error but don't fail
return
}
// Record buffer sizes
socketBufferSizeGauge.WithLabelValues(component, "send").Set(float64(sendSize))
socketBufferSizeGauge.WithLabelValues(component, "receive").Set(float64(recvSize))
}
// RecordSocketBufferOverflow records a socket buffer overflow event
func RecordSocketBufferOverflow(component, bufferType string) {
socketBufferOverflowCounter.WithLabelValues(component, bufferType).Inc()
}
// UpdateSocketBufferUtilization updates socket buffer utilization metrics
func UpdateSocketBufferUtilization(component, bufferType string, utilizationPercent float64) {
socketBufferUtilizationGauge.WithLabelValues(component, bufferType).Set(utilizationPercent)
// Socket buffer sizes are read above only to confirm they are accessible; the gauge recording was removed
}

View File

@ -48,7 +48,7 @@ type NetworkInterfaceOptions struct {
DefaultHostname string
OnStateChange func(state *NetworkInterfaceState)
OnInitialCheck func(state *NetworkInterfaceState)
OnDhcpLeaseChange func(lease *udhcpc.Lease)
OnDhcpLeaseChange func(lease *udhcpc.Lease, state *NetworkInterfaceState)
OnConfigChange func(config *NetworkConfig)
NetworkConfig *NetworkConfig
}
@ -94,7 +94,7 @@ func NewNetworkInterfaceState(opts *NetworkInterfaceOptions) (*NetworkInterfaceS
_ = s.updateNtpServersFromLease(lease)
_ = s.setHostnameIfNotSame()
opts.OnDhcpLeaseChange(lease)
opts.OnDhcpLeaseChange(lease, s)
},
})

View File

@ -9,17 +9,32 @@ import (
"github.com/beevik/ntp"
)
var defaultNTPServers = []string{
var defaultNTPServerIPs = []string{
// These servers are known by static IP and as such don't need DNS lookups
// These are from Google and Cloudflare since if they're down, the internet
// is broken anyway
"162.159.200.1", // time.cloudflare.com IPv4
"162.159.200.123", // time.cloudflare.com IPv4
"2606:4700:f1::1", // time.cloudflare.com IPv6
"2606:4700:f1::123", // time.cloudflare.com IPv6
"216.239.35.0", // time.google.com IPv4
"216.239.35.4", // time.google.com IPv4
"216.239.35.8", // time.google.com IPv4
"216.239.35.12", // time.google.com IPv4
"2001:4860:4806::", // time.google.com IPv6
"2001:4860:4806:4::", // time.google.com IPv6
"2001:4860:4806:8::", // time.google.com IPv6
"2001:4860:4806:c::", // time.google.com IPv6
}
var defaultNTPServerHostnames = []string{
// should use something from https://github.com/jauderho/public-ntp-servers
"time.apple.com",
"time.aws.com",
"time.windows.com",
"time.google.com",
"162.159.200.123", // time.cloudflare.com IPv4
"2606:4700:f1::123", // time.cloudflare.com IPv6
"0.pool.ntp.org",
"1.pool.ntp.org",
"2.pool.ntp.org",
"3.pool.ntp.org",
"time.cloudflare.com",
"pool.ntp.org",
}
func (t *TimeSync) queryNetworkTime(ntpServers []string) (now *time.Time, offset *time.Duration) {

View File

@ -158,6 +158,7 @@ func (t *TimeSync) Sync() error {
var (
now *time.Time
offset *time.Duration
log zerolog.Logger
)
metricTimeSyncCount.Inc()
@ -166,54 +167,54 @@ func (t *TimeSync) Sync() error {
Orders:
for _, mode := range syncMode.Ordering {
log = t.l.With().Str("mode", mode).Logger()
switch mode {
case "ntp_user_provided":
if syncMode.Ntp {
t.l.Info().Msg("using NTP custom servers")
log.Info().Msg("using NTP custom servers")
now, offset = t.queryNetworkTime(t.networkConfig.TimeSyncNTPServers)
if now != nil {
t.l.Info().Str("source", "NTP").Time("now", *now).Msg("time obtained")
break Orders
}
}
case "ntp_dhcp":
if syncMode.Ntp {
t.l.Info().Msg("using NTP servers from DHCP")
log.Info().Msg("using NTP servers from DHCP")
now, offset = t.queryNetworkTime(t.dhcpNtpAddresses)
if now != nil {
t.l.Info().Str("source", "NTP DHCP").Time("now", *now).Msg("time obtained")
break Orders
}
}
case "ntp":
if syncMode.Ntp && syncMode.NtpUseFallback {
t.l.Info().Msg("using NTP fallback")
now, offset = t.queryNetworkTime(defaultNTPServers)
log.Info().Msg("using NTP fallback IPs")
now, offset = t.queryNetworkTime(defaultNTPServerIPs)
if now == nil {
log.Info().Msg("using NTP fallback hostnames")
now, offset = t.queryNetworkTime(defaultNTPServerHostnames)
}
if now != nil {
t.l.Info().Str("source", "NTP fallback").Time("now", *now).Msg("time obtained")
break Orders
}
}
case "http_user_provided":
if syncMode.Http {
t.l.Info().Msg("using HTTP custom URLs")
log.Info().Msg("using HTTP custom URLs")
now = t.queryAllHttpTime(t.networkConfig.TimeSyncHTTPUrls)
if now != nil {
t.l.Info().Str("source", "HTTP").Time("now", *now).Msg("time obtained")
break Orders
}
}
case "http":
if syncMode.Http && syncMode.HttpUseFallback {
t.l.Info().Msg("using HTTP fallback")
log.Info().Msg("using HTTP fallback")
now = t.queryAllHttpTime(defaultHTTPUrls)
if now != nil {
t.l.Info().Str("source", "HTTP fallback").Time("now", *now).Msg("time obtained")
break Orders
}
}
default:
t.l.Warn().Str("mode", mode).Msg("unknown time sync mode, skipping")
log.Warn().Msg("unknown time sync mode, skipping")
}
}
@ -226,6 +227,8 @@ Orders:
now = &newNow
}
log.Info().Time("now", *now).Msg("time obtained")
err := t.setSystemTime(*now)
if err != nil {
return fmt.Errorf("failed to set system time: %w", err)
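The new ordering above tries the IP-literal fallback list first (usable before DNS is up) and only then the hostname list. The same first-answer-wins chain can be written generically; a sketch, where the package name and helper are assumptions while queryNetworkTime, defaultNTPServerIPs and defaultNTPServerHostnames come from the diff:

package timesync // package name assumed for this sketch

import "time"

// firstNetworkTime tries each server list in order and returns the first
// successful answer, mirroring the IPs-then-hostnames fallback above.
func firstNetworkTime(query func([]string) (*time.Time, *time.Duration), lists ...[]string) (*time.Time, *time.Duration) {
	for _, servers := range lists {
		if now, offset := query(servers); now != nil {
			return now, offset
		}
	}
	return nil, nil
}

Inside the "ntp" case this would read now, offset = firstNetworkTime(t.queryNetworkTime, defaultNTPServerIPs, defaultNTPServerHostnames).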

main.go (17 changed lines)
View File

@ -35,11 +35,6 @@ func startAudioSubprocess() error {
// Initialize validation cache for optimal performance
audio.InitValidationCache()
// Enable batch audio processing to reduce CGO call overhead
if err := audio.EnableBatchAudioProcessing(); err != nil {
logger.Warn().Err(err).Msg("failed to enable batch audio processing")
}
// Create audio server supervisor
audioSupervisor = audio.NewAudioOutputSupervisor()
@ -108,9 +103,6 @@ func startAudioSubprocess() error {
// Stop audio relay when process exits
audio.StopAudioRelay()
// Disable batch audio processing
audio.DisableBatchAudioProcessing()
},
// onRestart
func(attempt int, delay time.Duration) {
@ -254,16 +246,25 @@ func Main(audioServer bool, audioInputServer bool) {
if !config.AutoUpdateEnabled {
return
}
if isTimeSyncNeeded() || !timeSync.IsSyncSuccess() {
logger.Debug().Msg("system time is not synced, will retry in 30 seconds")
time.Sleep(30 * time.Second)
continue
}
if currentSession != nil {
logger.Debug().Msg("skipping update since a session is active")
time.Sleep(1 * time.Minute)
continue
}
includePreRelease := config.IncludePreRelease
err = TryUpdate(context.Background(), GetDeviceID(), includePreRelease)
if err != nil {
logger.Warn().Err(err).Msg("failed to auto update")
}
time.Sleep(1 * time.Hour)
}
}()

View File

@ -15,7 +15,7 @@ var (
networkState *network.NetworkInterfaceState
)
func networkStateChanged() {
func networkStateChanged(isOnline bool) {
// do not block the main thread
go waitCtrlAndRequestDisplayUpdate(true)
@ -37,6 +37,13 @@ func networkStateChanged() {
networkState.GetFQDN(),
}, true)
}
// if the network is now online, trigger an NTP sync if still needed
if isOnline && timeSync != nil && (isTimeSyncNeeded() || !timeSync.IsSyncSuccess()) {
if err := timeSync.Sync(); err != nil {
logger.Warn().Str("error", err.Error()).Msg("unable to sync time on network state change")
}
}
}
func initNetwork() error {
@ -48,13 +55,13 @@ func initNetwork() error {
NetworkConfig: config.NetworkConfig,
Logger: networkLogger,
OnStateChange: func(state *network.NetworkInterfaceState) {
networkStateChanged()
networkStateChanged(state.IsOnline())
},
OnInitialCheck: func(state *network.NetworkInterfaceState) {
networkStateChanged()
networkStateChanged(state.IsOnline())
},
OnDhcpLeaseChange: func(lease *udhcpc.Lease) {
networkStateChanged()
OnDhcpLeaseChange: func(lease *udhcpc.Lease, state *network.NetworkInterfaceState) {
networkStateChanged(state.IsOnline())
if currentSession == nil {
return
@ -64,7 +71,15 @@ func initNetwork() error {
},
OnConfigChange: func(networkConfig *network.NetworkConfig) {
config.NetworkConfig = networkConfig
networkStateChanged()
networkStateChanged(false)
if mDNS != nil {
_ = mDNS.SetListenOptions(networkConfig.GetMDNSMode())
_ = mDNS.SetLocalNames([]string{
networkState.GetHostname(),
networkState.GetFQDN(),
}, true)
}
},
})