[WIP] Cleanup: Reduce PR Complexity

Alex P 2025-09-04 22:06:48 +00:00
parent c1cc8dd832
commit 4075057c2b
21 changed files with 662 additions and 1337 deletions

View File

@ -1,204 +0,0 @@
package audio
import (
"sync/atomic"
"time"
)
// AtomicCounter provides thread-safe counter operations
type AtomicCounter struct {
value int64
}
// NewAtomicCounter creates a new atomic counter
func NewAtomicCounter() *AtomicCounter {
return &AtomicCounter{}
}
// Add atomically adds delta to the counter and returns the new value
func (c *AtomicCounter) Add(delta int64) int64 {
return atomic.AddInt64(&c.value, delta)
}
// Increment atomically increments the counter by 1
func (c *AtomicCounter) Increment() int64 {
return atomic.AddInt64(&c.value, 1)
}
// Load atomically loads the counter value
func (c *AtomicCounter) Load() int64 {
return atomic.LoadInt64(&c.value)
}
// Store atomically stores a new value
func (c *AtomicCounter) Store(value int64) {
atomic.StoreInt64(&c.value, value)
}
// Reset atomically resets the counter to zero
func (c *AtomicCounter) Reset() {
atomic.StoreInt64(&c.value, 0)
}
// Swap atomically swaps the value and returns the old value
func (c *AtomicCounter) Swap(new int64) int64 {
return atomic.SwapInt64(&c.value, new)
}
// FrameMetrics provides common frame tracking metrics
type FrameMetrics struct {
Total *AtomicCounter
Dropped *AtomicCounter
Bytes *AtomicCounter
}
// NewFrameMetrics creates a new frame metrics tracker
func NewFrameMetrics() *FrameMetrics {
return &FrameMetrics{
Total: NewAtomicCounter(),
Dropped: NewAtomicCounter(),
Bytes: NewAtomicCounter(),
}
}
// RecordFrame atomically records a successful frame with its size
func (fm *FrameMetrics) RecordFrame(size int64) {
fm.Total.Increment()
fm.Bytes.Add(size)
}
// RecordDrop atomically records a dropped frame
func (fm *FrameMetrics) RecordDrop() {
fm.Dropped.Increment()
}
// GetStats returns current metrics values
func (fm *FrameMetrics) GetStats() (total, dropped, bytes int64) {
return fm.Total.Load(), fm.Dropped.Load(), fm.Bytes.Load()
}
// Reset resets all metrics to zero
func (fm *FrameMetrics) Reset() {
fm.Total.Reset()
fm.Dropped.Reset()
fm.Bytes.Reset()
}
// GetDropRate calculates the drop rate as a percentage
func (fm *FrameMetrics) GetDropRate() float64 {
total := fm.Total.Load()
if total == 0 {
return 0.0
}
dropped := fm.Dropped.Load()
return float64(dropped) / float64(total) * 100.0
}
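// Illustrative sketch (not part of this commit) of how the FrameMetrics API in
// this helper file could be driven; note that GetDropRate divides by Total,
// which counts successful frames only.
func exampleFrameMetricsUsage() {
	fm := NewFrameMetrics()
	for i := 0; i < 995; i++ {
		fm.RecordFrame(960) // 995 successful frames of 960 bytes each
	}
	for i := 0; i < 5; i++ {
		fm.RecordDrop() // 5 dropped frames
	}
	total, dropped, bytes := fm.GetStats() // 995, 5, 955200
	rate := fm.GetDropRate()               // 5 / 995 * 100 ≈ 0.5
	_, _, _, _ = total, dropped, bytes, rate
}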
// LatencyTracker provides atomic latency tracking
type LatencyTracker struct {
current *AtomicCounter
min *AtomicCounter
max *AtomicCounter
average *AtomicCounter
samples *AtomicCounter
}
// NewLatencyTracker creates a new latency tracker
func NewLatencyTracker() *LatencyTracker {
lt := &LatencyTracker{
current: NewAtomicCounter(),
min: NewAtomicCounter(),
max: NewAtomicCounter(),
average: NewAtomicCounter(),
samples: NewAtomicCounter(),
}
// Initialize min to max value so first measurement sets it properly
lt.min.Store(int64(^uint64(0) >> 1)) // Max int64
return lt
}
// RecordLatency atomically records a new latency measurement
func (lt *LatencyTracker) RecordLatency(latency time.Duration) {
latencyNanos := latency.Nanoseconds()
lt.current.Store(latencyNanos)
lt.samples.Increment()
// Update min
for {
oldMin := lt.min.Load()
if latencyNanos >= oldMin {
break
}
if atomic.CompareAndSwapInt64(&lt.min.value, oldMin, latencyNanos) {
break
}
}
// Update max
for {
oldMax := lt.max.Load()
if latencyNanos <= oldMax {
break
}
if atomic.CompareAndSwapInt64(&lt.max.value, oldMax, latencyNanos) {
break
}
}
// Update average using exponential moving average
oldAvg := lt.average.Load()
newAvg := (oldAvg*7 + latencyNanos) / 8 // 87.5% weight to old average
lt.average.Store(newAvg)
}
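// Worked example of the weighting above: with a previous average of 8 ms and a
// new 16 ms sample, newAvg = (8*7 + 16) / 8 = 9 ms. Each measurement moves the
// average by only one eighth of the difference, so single outliers are damped
// while sustained latency shifts still pull the average within a few samples.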
// GetLatencyStats returns current latency statistics
func (lt *LatencyTracker) GetLatencyStats() (current, min, max, average time.Duration, samples int64) {
return time.Duration(lt.current.Load()),
time.Duration(lt.min.Load()),
time.Duration(lt.max.Load()),
time.Duration(lt.average.Load()),
lt.samples.Load()
}
// PoolMetrics provides common pool performance metrics
type PoolMetrics struct {
Hits *AtomicCounter
Misses *AtomicCounter
}
// NewPoolMetrics creates a new pool metrics tracker
func NewPoolMetrics() *PoolMetrics {
return &PoolMetrics{
Hits: NewAtomicCounter(),
Misses: NewAtomicCounter(),
}
}
// RecordHit atomically records a pool hit
func (pm *PoolMetrics) RecordHit() {
pm.Hits.Increment()
}
// RecordMiss atomically records a pool miss
func (pm *PoolMetrics) RecordMiss() {
pm.Misses.Increment()
}
// GetHitRate calculates the hit rate as a percentage
func (pm *PoolMetrics) GetHitRate() float64 {
hits := pm.Hits.Load()
misses := pm.Misses.Load()
total := hits + misses
if total == 0 {
return 0.0
}
return float64(hits) / float64(total) * 100.0
}
// GetStats returns hit and miss counts
func (pm *PoolMetrics) GetStats() (hits, misses int64, hitRate float64) {
hits = pm.Hits.Load()
misses = pm.Misses.Load()
hitRate = pm.GetHitRate()
return
}

View File

@ -910,11 +910,8 @@ func (c *AudioConfigCache) GetBufferTooLargeError() error {
// Removed duplicate config caching system - using AudioConfigCache instead
func cgoAudioReadEncode(buf []byte) (int, error) {
// Fast path: Use AudioConfigCache to avoid GetConfig() in hot path
cache := GetCachedConfig()
// Only update cache if expired - avoid unnecessary overhead
// Use proper locking to avoid race condition
// updateCacheIfNeeded updates cache only if expired to avoid overhead
func updateCacheIfNeeded(cache *AudioConfigCache) {
if cache.initialized.Load() {
cache.mutex.RLock()
cacheExpired := time.Since(cache.lastUpdate) > cache.cacheExpiry
@ -925,6 +922,11 @@ func cgoAudioReadEncode(buf []byte) (int, error) {
} else {
cache.Update()
}
}
func cgoAudioReadEncode(buf []byte) (int, error) {
cache := GetCachedConfig()
updateCacheIfNeeded(cache)
// Fast validation with cached values - avoid lock with atomic access
minRequired := cache.GetMinReadEncodeBuffer()
@ -1073,14 +1075,12 @@ var (
// GetBufferFromPool gets a buffer from the pool with at least the specified capacity
func GetBufferFromPool(minCapacity int) []byte {
cgoBufferPoolGets.Add(1)
// Use the SizedBufferPool for better memory management
return GetOptimalBuffer(minCapacity)
}
// ReturnBufferToPool returns a buffer to the pool
func ReturnBufferToPool(buf []byte) {
cgoBufferPoolPuts.Add(1)
// Use the SizedBufferPool for better memory management
ReturnOptimalBuffer(buf)
}
@ -1151,104 +1151,49 @@ func (b *AudioFrameBatch) Release() {
*/
// ReadEncodeWithPooledBuffer reads audio data and encodes it using a buffer from the pool
// This reduces memory allocations by reusing buffers
func ReadEncodeWithPooledBuffer() ([]byte, int, error) {
// Get cached config
cache := GetCachedConfig()
// Only update cache if expired - avoid unnecessary overhead
// Use proper locking to avoid race condition
if cache.initialized.Load() {
cache.mutex.RLock()
cacheExpired := time.Since(cache.lastUpdate) > cache.cacheExpiry
cache.mutex.RUnlock()
if cacheExpired {
cache.Update()
}
} else {
cache.Update()
}
updateCacheIfNeeded(cache)
// Get a buffer from the pool with appropriate capacity
bufferSize := cache.GetMinReadEncodeBuffer()
if bufferSize == 0 {
bufferSize = 1500 // Fallback if cache not initialized
bufferSize = 1500
}
// Get buffer from pool
buf := GetBufferFromPool(bufferSize)
// Perform read/encode operation
n, err := cgoAudioReadEncode(buf)
if err != nil {
// Return buffer to pool on error
ReturnBufferToPool(buf)
return nil, 0, err
}
// Resize buffer to actual data size
result := buf[:n]
// Return the buffer with data
return result, n, nil
return buf[:n], n, nil
}
// DecodeWriteWithPooledBuffer decodes and writes audio data using a pooled buffer
// The caller is responsible for returning the input buffer to the pool if needed
func DecodeWriteWithPooledBuffer(data []byte) (int, error) {
// Validate input
if len(data) == 0 {
return 0, errEmptyBuffer
}
// Get cached config
cache := GetCachedConfig()
// Only update cache if expired - avoid unnecessary overhead
// Use proper locking to avoid race condition
if cache.initialized.Load() {
cache.mutex.RLock()
cacheExpired := time.Since(cache.lastUpdate) > cache.cacheExpiry
cache.mutex.RUnlock()
if cacheExpired {
cache.Update()
}
} else {
cache.Update()
}
updateCacheIfNeeded(cache)
// Ensure data doesn't exceed max packet size
maxPacketSize := cache.GetMaxPacketSize()
if len(data) > maxPacketSize {
return 0, newBufferTooLargeError(len(data), maxPacketSize)
}
// Get a PCM buffer from the pool for optimized decode-write
pcmBuffer := GetBufferFromPool(cache.GetMaxPCMBufferSize())
defer ReturnBufferToPool(pcmBuffer)
// Perform decode/write operation using optimized implementation
n, err := CGOAudioDecodeWrite(data, pcmBuffer)
// Return result
return n, err
return CGOAudioDecodeWrite(data, pcmBuffer)
}
// BatchReadEncode reads and encodes multiple audio frames in a single batch
// This reduces CGO call overhead by processing multiple frames at once
func BatchReadEncode(batchSize int) ([][]byte, error) {
// Get cached config
cache := GetCachedConfig()
// Only update cache if expired - avoid unnecessary overhead
// Use proper locking to avoid race condition
if cache.initialized.Load() {
cache.mutex.RLock()
cacheExpired := time.Since(cache.lastUpdate) > cache.cacheExpiry
cache.mutex.RUnlock()
if cacheExpired {
cache.Update()
}
} else {
cache.Update()
}
updateCacheIfNeeded(cache)
// Calculate total buffer size needed for batch
frameSize := cache.GetMinReadEncodeBuffer()

View File

@ -0,0 +1,56 @@
package audio
import (
"os"
"strconv"
"github.com/jetkvm/kvm/internal/logging"
)
// getEnvInt reads an integer value from environment variable with fallback to default
func getEnvInt(key string, defaultValue int) int {
if value := os.Getenv(key); value != "" {
if intValue, err := strconv.Atoi(value); err == nil {
return intValue
}
}
return defaultValue
}
// parseOpusConfig reads OPUS configuration from environment variables
// with fallback to default config values
func parseOpusConfig() (bitrate, complexity, vbr, signalType, bandwidth, dtx int) {
// Read configuration from environment variables with config defaults
bitrate = getEnvInt("JETKVM_OPUS_BITRATE", GetConfig().CGOOpusBitrate)
complexity = getEnvInt("JETKVM_OPUS_COMPLEXITY", GetConfig().CGOOpusComplexity)
vbr = getEnvInt("JETKVM_OPUS_VBR", GetConfig().CGOOpusVBR)
signalType = getEnvInt("JETKVM_OPUS_SIGNAL_TYPE", GetConfig().CGOOpusSignalType)
bandwidth = getEnvInt("JETKVM_OPUS_BANDWIDTH", GetConfig().CGOOpusBandwidth)
dtx = getEnvInt("JETKVM_OPUS_DTX", GetConfig().CGOOpusDTX)
return bitrate, complexity, vbr, signalType, bandwidth, dtx
}
// applyOpusConfig applies OPUS configuration to the global config
// with optional logging for the specified component
func applyOpusConfig(bitrate, complexity, vbr, signalType, bandwidth, dtx int, component string, enableLogging bool) {
config := GetConfig()
config.CGOOpusBitrate = bitrate
config.CGOOpusComplexity = complexity
config.CGOOpusVBR = vbr
config.CGOOpusSignalType = signalType
config.CGOOpusBandwidth = bandwidth
config.CGOOpusDTX = dtx
if enableLogging {
logger := logging.GetDefaultLogger().With().Str("component", component).Logger()
logger.Info().
Int("bitrate", bitrate).
Int("complexity", complexity).
Int("vbr", vbr).
Int("signal_type", signalType).
Int("bandwidth", bandwidth).
Int("dtx", dtx).
Msg("applied OPUS configuration")
}
}
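// Illustrative usage sketch (not part of this commit; the values shown are
// examples, not repository defaults). It mirrors how the subprocess
// entrypoints read the JETKVM_OPUS_* variables and apply them globally.
func exampleApplyOpusFromEnv() {
	// Unset variables fall back to GetConfig() defaults via getEnvInt.
	os.Setenv("JETKVM_OPUS_BITRATE", "96000")
	os.Setenv("JETKVM_OPUS_COMPLEXITY", "5")
	bitrate, complexity, vbr, signalType, bandwidth, dtx := parseOpusConfig()
	// The component name and logging flag are the parameters added in this commit.
	applyOpusConfig(bitrate, complexity, vbr, signalType, bandwidth, dtx, "audio-output-server", true)
}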

View File

@ -6,7 +6,6 @@ import (
"io"
"net"
"os"
"path/filepath"
"runtime"
"sync"
"sync/atomic"
@ -16,65 +15,31 @@ import (
"github.com/rs/zerolog"
)
var (
inputMagicNumber uint32 = GetConfig().InputMagicNumber // "JKMI" (JetKVM Microphone Input)
inputSocketName = "audio_input.sock"
)
const (
headerSize = 17 // Fixed header size: 4+1+4+8 bytes - matches GetConfig().HeaderSize
)
// Constants are now defined in unified_ipc.go
var (
maxFrameSize = GetConfig().MaxFrameSize // Maximum Opus frame size
messagePoolSize = GetConfig().MessagePoolSize // Pre-allocated message pool size
)
// InputMessageType represents the type of IPC message
type InputMessageType uint8
// Legacy aliases for backward compatibility
type InputMessageType = UnifiedMessageType
type InputIPCMessage = UnifiedIPCMessage
// Legacy constants for backward compatibility
const (
InputMessageTypeOpusFrame InputMessageType = iota
InputMessageTypeConfig
InputMessageTypeOpusConfig
InputMessageTypeStop
InputMessageTypeHeartbeat
InputMessageTypeAck
InputMessageTypeOpusFrame = MessageTypeOpusFrame
InputMessageTypeConfig = MessageTypeConfig
InputMessageTypeOpusConfig = MessageTypeOpusConfig
InputMessageTypeStop = MessageTypeStop
InputMessageTypeHeartbeat = MessageTypeHeartbeat
InputMessageTypeAck = MessageTypeAck
)
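// Because these are type aliases and constant re-mappings rather than new
// declarations, existing call sites keep compiling unchanged. A hypothetical
// legacy handler (not repository code) illustrates this:
func handleLegacyInputMessage(msg *InputIPCMessage) bool {
	switch msg.Type {
	case InputMessageTypeOpusFrame: // identical value to MessageTypeOpusFrame
		return true
	case InputMessageTypeStop:
		return false
	default:
		return false
	}
}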
// InputIPCMessage represents a message sent over IPC
type InputIPCMessage struct {
Magic uint32
Type InputMessageType
Length uint32
Timestamp int64
Data []byte
}
// Implement IPCMessage interface
func (msg *InputIPCMessage) GetMagic() uint32 {
return msg.Magic
}
func (msg *InputIPCMessage) GetType() uint8 {
return uint8(msg.Type)
}
func (msg *InputIPCMessage) GetLength() uint32 {
return msg.Length
}
func (msg *InputIPCMessage) GetTimestamp() int64 {
return msg.Timestamp
}
func (msg *InputIPCMessage) GetData() []byte {
return msg.Data
}
// Methods are now inherited from UnifiedIPCMessage
// OptimizedIPCMessage represents an optimized message with pre-allocated buffers
type OptimizedIPCMessage struct {
header [headerSize]byte // Pre-allocated header buffer
header [17]byte // Pre-allocated header buffer (headerSize = 17)
data []byte // Reusable data buffer
msg InputIPCMessage // Embedded message
}
@ -197,25 +162,9 @@ func (mp *MessagePool) Put(msg *OptimizedIPCMessage) {
}
}
// InputIPCConfig represents configuration for audio input
type InputIPCConfig struct {
SampleRate int
Channels int
FrameSize int
}
// InputIPCOpusConfig contains complete Opus encoder configuration
type InputIPCOpusConfig struct {
SampleRate int
Channels int
FrameSize int
Bitrate int
Complexity int
VBR int
SignalType int
Bandwidth int
DTX int
}
// Legacy aliases for backward compatibility
type InputIPCConfig = UnifiedIPCConfig
type InputIPCOpusConfig = UnifiedIPCOpusConfig
// AudioInputServer handles IPC communication for audio input processing
type AudioInputServer struct {
@ -1305,10 +1254,4 @@ func GetGlobalMessagePoolStats() MessagePoolStats {
// Helper functions
// getInputSocketPath returns the path to the input socket
func getInputSocketPath() string {
if path := os.Getenv("JETKVM_AUDIO_INPUT_SOCKET"); path != "" {
return path
}
return filepath.Join("/var/run", inputSocketName)
}
// getInputSocketPath is now defined in unified_ipc.go

View File

@ -13,7 +13,6 @@ import (
"context"
"os"
"os/signal"
"strconv"
"syscall"
"time"
@ -21,39 +20,6 @@ import (
)
// getEnvInt reads an integer from environment variable with a default value
func getEnvIntInput(key string, defaultValue int) int {
if value := os.Getenv(key); value != "" {
if intValue, err := strconv.Atoi(value); err == nil {
return intValue
}
}
return defaultValue
}
// parseOpusConfigInput reads OPUS configuration from environment variables
// with fallback to default config values for input server
func parseOpusConfigInput() (bitrate, complexity, vbr, signalType, bandwidth, dtx int) {
// Read configuration from environment variables with config defaults
bitrate = getEnvIntInput("JETKVM_OPUS_BITRATE", GetConfig().CGOOpusBitrate)
complexity = getEnvIntInput("JETKVM_OPUS_COMPLEXITY", GetConfig().CGOOpusComplexity)
vbr = getEnvIntInput("JETKVM_OPUS_VBR", GetConfig().CGOOpusVBR)
signalType = getEnvIntInput("JETKVM_OPUS_SIGNAL_TYPE", GetConfig().CGOOpusSignalType)
bandwidth = getEnvIntInput("JETKVM_OPUS_BANDWIDTH", GetConfig().CGOOpusBandwidth)
dtx = getEnvIntInput("JETKVM_OPUS_DTX", GetConfig().CGOOpusDTX)
return bitrate, complexity, vbr, signalType, bandwidth, dtx
}
// applyOpusConfigInput applies OPUS configuration to the global config for input server
func applyOpusConfigInput(bitrate, complexity, vbr, signalType, bandwidth, dtx int) {
config := GetConfig()
config.CGOOpusBitrate = bitrate
config.CGOOpusComplexity = complexity
config.CGOOpusVBR = vbr
config.CGOOpusSignalType = signalType
config.CGOOpusBandwidth = bandwidth
config.CGOOpusDTX = dtx
}
// RunAudioInputServer runs the audio input server subprocess
// This should be called from main() when the subprocess is detected
@ -62,8 +28,8 @@ func RunAudioInputServer() error {
logger.Debug().Msg("audio input server subprocess starting")
// Parse OPUS configuration from environment variables
bitrate, complexity, vbr, signalType, bandwidth, dtx := parseOpusConfigInput()
applyOpusConfigInput(bitrate, complexity, vbr, signalType, bandwidth, dtx)
bitrate, complexity, vbr, signalType, bandwidth, dtx := parseOpusConfig()
applyOpusConfig(bitrate, complexity, vbr, signalType, bandwidth, dtx, "audio-input-server", false)
// Initialize validation cache for optimal performance
InitValidationCache()
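// Hedged sketch of how a parent process could hand these settings to the
// subprocess through the environment that parseOpusConfig reads. The exec
// wiring, flag name, and values below are assumptions for illustration, not
// code from this commit; it assumes the standard "os" and "os/exec" imports.
func exampleLaunchAudioInputSubprocess() error {
	cmd := exec.Command("/proc/self/exe", "--audio-input-server") // flag name is hypothetical
	cmd.Env = append(os.Environ(),
		"JETKVM_OPUS_BITRATE=96000",
		"JETKVM_OPUS_COMPLEXITY=5",
	)
	return cmd.Start()
}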

View File

@ -48,8 +48,6 @@ func (ais *AudioInputSupervisor) SetOpusConfig(bitrate, complexity, vbr, signalT
}
}
// Start starts the audio input server subprocess
func (ais *AudioInputSupervisor) Start() error {
ais.mutex.Lock()
@ -122,15 +120,11 @@ func (ais *AudioInputSupervisor) Start() error {
return nil
}
// Stop stops the audio input server subprocess
func (ais *AudioInputSupervisor) Stop() {
ais.mutex.Lock()
defer ais.mutex.Unlock()
if !ais.IsRunning() {
return
}

View File

@ -4,359 +4,46 @@ import (
"encoding/binary"
"fmt"
"io"
"net"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"
"github.com/jetkvm/kvm/internal/logging"
"github.com/rs/zerolog"
)
var (
outputMagicNumber uint32 = GetConfig().OutputMagicNumber // "JKOU" (JetKVM Output)
outputSocketName = "audio_output.sock"
)
// Output IPC constants are now centralized in config_constants.go
// outputMaxFrameSize, outputWriteTimeout, outputMaxDroppedFrames, outputHeaderSize, outputMessagePoolSize
// OutputIPCConfig represents configuration for audio output
type OutputIPCConfig struct {
SampleRate int
Channels int
FrameSize int
}
// OutputMessageType represents the type of IPC message
type OutputMessageType uint8
// Legacy aliases for backward compatibility
type OutputIPCConfig = UnifiedIPCConfig
type OutputMessageType = UnifiedMessageType
type OutputIPCMessage = UnifiedIPCMessage
// Legacy constants for backward compatibility
const (
OutputMessageTypeOpusFrame OutputMessageType = iota
OutputMessageTypeConfig
OutputMessageTypeStop
OutputMessageTypeHeartbeat
OutputMessageTypeAck
OutputMessageTypeOpusFrame = MessageTypeOpusFrame
OutputMessageTypeConfig = MessageTypeConfig
OutputMessageTypeStop = MessageTypeStop
OutputMessageTypeHeartbeat = MessageTypeHeartbeat
OutputMessageTypeAck = MessageTypeAck
)
// OutputIPCMessage represents a message sent over IPC
type OutputIPCMessage struct {
Magic uint32
Type OutputMessageType
Length uint32
Timestamp int64
Data []byte
}
// Implement IPCMessage interface
func (msg *OutputIPCMessage) GetMagic() uint32 {
return msg.Magic
}
func (msg *OutputIPCMessage) GetType() uint8 {
return uint8(msg.Type)
}
func (msg *OutputIPCMessage) GetLength() uint32 {
return msg.Length
}
func (msg *OutputIPCMessage) GetTimestamp() int64 {
return msg.Timestamp
}
func (msg *OutputIPCMessage) GetData() []byte {
return msg.Data
}
// Methods are now inherited from UnifiedIPCMessage
// Global shared message pool for output IPC client header reading
var globalOutputClientMessagePool = NewGenericMessagePool(GetConfig().OutputMessagePoolSize)
type AudioOutputServer struct {
// Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment)
bufferSize int64 // Current buffer size (atomic)
droppedFrames int64 // Dropped frames counter (atomic)
totalFrames int64 // Total frames counter (atomic)
listener net.Listener
conn net.Conn
mtx sync.Mutex
running bool
// Advanced message handling
messageChan chan *OutputIPCMessage // Buffered channel for incoming messages
stopChan chan struct{} // Stop signal
wg sync.WaitGroup // Wait group for goroutine coordination
// Latency monitoring
latencyMonitor *LatencyMonitor
adaptiveOptimizer *AdaptiveOptimizer
// Socket buffer configuration
socketBufferConfig SocketBufferConfig
}
// AudioOutputServer is now an alias for UnifiedAudioServer
type AudioOutputServer = UnifiedAudioServer
func NewAudioOutputServer() (*AudioOutputServer, error) {
socketPath := getOutputSocketPath()
// Remove existing socket if any
os.Remove(socketPath)
listener, err := net.Listen("unix", socketPath)
if err != nil {
return nil, fmt.Errorf("failed to create unix socket: %w", err)
return NewUnifiedAudioServer(false) // false = output server
}
// Initialize with adaptive buffer size (start with 500 frames)
initialBufferSize := int64(GetConfig().InitialBufferFrames)
// Start method is now inherited from UnifiedAudioServer
// Initialize latency monitoring
latencyConfig := DefaultLatencyConfig()
logger := zerolog.New(os.Stderr).With().Timestamp().Str("component", AudioOutputServerComponent).Logger()
latencyMonitor := NewLatencyMonitor(latencyConfig, logger)
// acceptConnections method is now inherited from UnifiedAudioServer
// Initialize adaptive buffer manager with default config
bufferConfig := DefaultAdaptiveBufferConfig()
bufferManager := NewAdaptiveBufferManager(bufferConfig)
// startProcessorGoroutine method is now inherited from UnifiedAudioServer
// Initialize adaptive optimizer
optimizerConfig := DefaultOptimizerConfig()
adaptiveOptimizer := NewAdaptiveOptimizer(latencyMonitor, bufferManager, optimizerConfig, logger)
// Stop method is now inherited from UnifiedAudioServer
// Initialize socket buffer configuration
socketBufferConfig := DefaultSocketBufferConfig()
// Close method is now inherited from UnifiedAudioServer
return &AudioOutputServer{
listener: listener,
messageChan: make(chan *OutputIPCMessage, initialBufferSize),
stopChan: make(chan struct{}),
bufferSize: initialBufferSize,
latencyMonitor: latencyMonitor,
adaptiveOptimizer: adaptiveOptimizer,
socketBufferConfig: socketBufferConfig,
}, nil
}
func (s *AudioOutputServer) Start() error {
s.mtx.Lock()
defer s.mtx.Unlock()
if s.running {
return fmt.Errorf("server already running")
}
s.running = true
// Start latency monitoring and adaptive optimization
if s.latencyMonitor != nil {
s.latencyMonitor.Start()
}
if s.adaptiveOptimizer != nil {
s.adaptiveOptimizer.Start()
}
// Start message processor goroutine
s.startProcessorGoroutine()
// Submit the connection acceptor to the audio reader pool
if !SubmitAudioReaderTask(s.acceptConnections) {
// If the pool is full or shutting down, fall back to direct goroutine creation
logger := logging.GetDefaultLogger().With().Str("component", AudioOutputServerComponent).Logger()
logger.Warn().Msg("Audio reader pool full or shutting down, falling back to direct goroutine creation")
go s.acceptConnections()
}
return nil
}
// acceptConnections accepts incoming connections
func (s *AudioOutputServer) acceptConnections() {
logger := logging.GetDefaultLogger().With().Str("component", AudioOutputServerComponent).Logger()
for s.running {
conn, err := s.listener.Accept()
if err != nil {
if s.running {
// Log warning and retry on accept failure
logger.Warn().Err(err).Msg("Failed to accept connection, retrying")
continue
}
return
}
// Configure socket buffers for optimal performance
if err := ConfigureSocketBuffers(conn, s.socketBufferConfig); err != nil {
// Log warning but don't fail - socket buffer optimization is not critical
logger.Warn().Err(err).Msg("Failed to configure socket buffers, continuing with defaults")
} else {
// Record socket buffer metrics for monitoring
RecordSocketBufferMetrics(conn, "audio-output")
}
s.mtx.Lock()
// Close existing connection if any
if s.conn != nil {
s.conn.Close()
s.conn = nil
}
s.conn = conn
s.mtx.Unlock()
}
}
// startProcessorGoroutine starts the message processor using the goroutine pool
func (s *AudioOutputServer) startProcessorGoroutine() {
s.wg.Add(1)
// Create a processor task that will run in the goroutine pool
processorTask := func() {
defer s.wg.Done()
for {
select {
case msg := <-s.messageChan:
// Process message (currently just frame sending)
if msg.Type == OutputMessageTypeOpusFrame {
if err := s.sendFrameToClient(msg.Data); err != nil {
// Log error but continue processing
atomic.AddInt64(&s.droppedFrames, 1)
}
}
case <-s.stopChan:
return
}
}
}
// Submit the processor task to the audio processor pool
if !SubmitAudioProcessorTask(processorTask) {
// If the pool is full or shutting down, fall back to direct goroutine creation
logger := logging.GetDefaultLogger().With().Str("component", AudioOutputServerComponent).Logger()
logger.Warn().Msg("Audio processor pool full or shutting down, falling back to direct goroutine creation")
go processorTask()
}
}
func (s *AudioOutputServer) Stop() {
s.mtx.Lock()
defer s.mtx.Unlock()
if !s.running {
return
}
s.running = false
// Stop latency monitoring and adaptive optimization
if s.adaptiveOptimizer != nil {
s.adaptiveOptimizer.Stop()
}
if s.latencyMonitor != nil {
s.latencyMonitor.Stop()
}
// Signal processor to stop
close(s.stopChan)
s.wg.Wait()
if s.conn != nil {
s.conn.Close()
s.conn = nil
}
}
func (s *AudioOutputServer) Close() error {
s.Stop()
if s.listener != nil {
s.listener.Close()
}
// Remove socket file
os.Remove(getOutputSocketPath())
return nil
}
func (s *AudioOutputServer) SendFrame(frame []byte) error {
// Use ultra-fast validation for critical audio path
if err := ValidateAudioFrame(frame); err != nil {
logger := logging.GetDefaultLogger().With().Str("component", AudioOutputServerComponent).Logger()
logger.Error().Err(err).Msg("Frame validation failed")
return fmt.Errorf("output frame validation failed: %w", err)
}
// Additional output-specific size check
maxFrameSize := GetConfig().OutputMaxFrameSize
if len(frame) > maxFrameSize {
return fmt.Errorf("output frame size validation failed: got %d bytes, maximum allowed %d bytes", len(frame), maxFrameSize)
}
start := time.Now()
// Create IPC message
msg := &OutputIPCMessage{
Magic: outputMagicNumber,
Type: OutputMessageTypeOpusFrame,
Length: uint32(len(frame)),
Timestamp: start.UnixNano(),
Data: frame,
}
// Try to send via message channel (non-blocking)
select {
case s.messageChan <- msg:
atomic.AddInt64(&s.totalFrames, 1)
// Record latency for monitoring
if s.latencyMonitor != nil {
processingTime := time.Since(start)
s.latencyMonitor.RecordLatency(processingTime, "ipc_send")
}
return nil
default:
// Channel full, drop frame to prevent blocking
atomic.AddInt64(&s.droppedFrames, 1)
return fmt.Errorf("output message channel full (capacity: %d) - frame dropped to prevent blocking", cap(s.messageChan))
}
}
// sendFrameToClient sends frame data directly to the connected client
// Global shared message pool for output IPC server
var globalOutputServerMessagePool = NewGenericMessagePool(GetConfig().OutputMessagePoolSize)
func (s *AudioOutputServer) sendFrameToClient(frame []byte) error {
s.mtx.Lock()
defer s.mtx.Unlock()
if s.conn == nil {
return fmt.Errorf("no audio output client connected to server")
}
start := time.Now()
// Create output IPC message
msg := &OutputIPCMessage{
Magic: outputMagicNumber,
Type: OutputMessageTypeOpusFrame,
Length: uint32(len(frame)),
Timestamp: start.UnixNano(),
Data: frame,
}
// Use shared WriteIPCMessage function
err := WriteIPCMessage(s.conn, msg, globalOutputServerMessagePool, &s.droppedFrames)
if err != nil {
return err
}
// Record latency for monitoring
if s.latencyMonitor != nil {
writeLatency := time.Since(start)
s.latencyMonitor.RecordLatency(writeLatency, "ipc_write")
}
return nil
}
// SendFrame method is now inherited from UnifiedAudioServer
// GetServerStats returns server performance statistics
func (s *AudioOutputServer) GetServerStats() (total, dropped int64, bufferSize int64) {
@ -364,82 +51,20 @@ func (s *AudioOutputServer) GetServerStats() (total, dropped int64, bufferSize i
return stats.Total, stats.Dropped, atomic.LoadInt64(&s.bufferSize)
}
type AudioOutputClient struct {
// Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment)
droppedFrames int64 // Atomic counter for dropped frames
totalFrames int64 // Atomic counter for total frames
conn net.Conn
mtx sync.Mutex
running bool
bufferPool *AudioBufferPool // Buffer pool for memory optimization
}
// AudioOutputClient is now an alias for UnifiedAudioClient
type AudioOutputClient = UnifiedAudioClient
func NewAudioOutputClient() *AudioOutputClient {
return &AudioOutputClient{
bufferPool: NewAudioBufferPool(GetMaxAudioFrameSize()),
}
return NewUnifiedAudioClient(false) // false = output client
}
// Connect connects to the audio output server
func (c *AudioOutputClient) Connect() error {
c.mtx.Lock()
defer c.mtx.Unlock()
// Connect method is now inherited from UnifiedAudioClient
if c.running {
return nil // Already connected
}
// Disconnect method is now inherited from UnifiedAudioClient
socketPath := getOutputSocketPath()
// Try connecting multiple times as the server might not be ready
// Reduced retry count and delay for faster startup
for i := 0; i < 8; i++ {
conn, err := net.Dial("unix", socketPath)
if err == nil {
c.conn = conn
c.running = true
return nil
}
// Exponential backoff starting from config
backoffStart := GetConfig().BackoffStart
delay := time.Duration(backoffStart.Nanoseconds()*(1<<uint(i/3))) * time.Nanosecond
maxDelay := GetConfig().MaxRetryDelay
if delay > maxDelay {
delay = maxDelay
}
time.Sleep(delay)
}
// IsConnected method is now inherited from UnifiedAudioClient
return fmt.Errorf("failed to connect to audio output server at %s after %d retries", socketPath, 8)
}
// Disconnect disconnects from the audio output server
func (c *AudioOutputClient) Disconnect() {
c.mtx.Lock()
defer c.mtx.Unlock()
if !c.running {
return
}
c.running = false
if c.conn != nil {
c.conn.Close()
c.conn = nil
}
}
// IsConnected returns whether the client is connected
func (c *AudioOutputClient) IsConnected() bool {
c.mtx.Lock()
defer c.mtx.Unlock()
return c.running && c.conn != nil
}
func (c *AudioOutputClient) Close() error {
c.Disconnect()
return nil
}
// Close method is now inherited from UnifiedAudioClient
func (c *AudioOutputClient) ReceiveFrame() ([]byte, error) {
c.mtx.Lock()
@ -499,10 +124,4 @@ func (c *AudioOutputClient) GetClientStats() (total, dropped int64) {
// Helper functions
// getOutputSocketPath returns the path to the output socket
func getOutputSocketPath() string {
if path := os.Getenv("JETKVM_AUDIO_OUTPUT_SOCKET"); path != "" {
return path
}
return filepath.Join("/var/run", outputSocketName)
}
// getOutputSocketPath is now defined in unified_ipc.go

View File

@ -25,11 +25,9 @@ STANDARDIZED NAMING CONVENTIONS:
- AudioInputStreamer (new: for consistency with OutputStreamer)
Output Components:
- AudioOutputManager (new: missing high-level manager)
- AudioOutputSupervisor (replaces: AudioOutputSupervisor)
- AudioOutputServer (replaces: AudioOutputServer)
- AudioOutputClient (replaces: AudioOutputClient)
- AudioOutputStreamer (replaces: OutputStreamer)
3. IPC NAMING:
- AudioInputIPCManager (replaces: AudioInputIPCManager)
@ -46,18 +44,14 @@ STANDARDIZED NAMING CONVENTIONS:
- OutputMessageType (replaces: OutputMessageType)
ISSUES IDENTIFIED:
1. Missing AudioOutputManager (high-level output management)
2. Inconsistent naming: OutputStreamer vs AudioInputSupervisor
3. Missing AudioOutputIPCManager for symmetry
4. Missing OutputIPCConfig for consistency
5. Component names in logging should be standardized
1. Missing AudioOutputIPCManager for symmetry
2. Missing OutputIPCConfig for consistency
3. Component names in logging should be standardized
IMPLEMENTATION PLAN:
1. Create AudioOutputManager to match AudioInputManager
2. Rename OutputStreamer to AudioOutputStreamer
3. Create AudioOutputIPCManager for symmetry
4. Standardize all component logging names
5. Update all references consistently
1. Create AudioOutputIPCManager for symmetry
2. Standardize all component logging names
3. Update all references consistently
*/
// Component name constants for consistent logging
@ -70,11 +64,9 @@ const (
AudioInputIPCComponent = "audio-input-ipc"
// Output component names
AudioOutputManagerComponent = "audio-output-manager"
AudioOutputSupervisorComponent = "audio-output-supervisor"
AudioOutputServerComponent = "audio-output-server"
AudioOutputClientComponent = "audio-output-client"
AudioOutputStreamerComponent = "audio-output-streamer"
AudioOutputIPCComponent = "audio-output-ipc"
// Common component names

View File

@ -8,6 +8,15 @@ import (
"github.com/jetkvm/kvm/internal/logging"
)
// AudioOutputMetrics represents metrics for audio output operations
type AudioOutputMetrics struct {
// Atomic int64 field first for proper ARM32 alignment
FramesReceived int64 `json:"frames_received"` // Total frames received (output-specific)
// Embedded struct with atomic fields properly aligned
BaseAudioMetrics
}
// AudioOutputIPCManager manages audio output using IPC when enabled
type AudioOutputIPCManager struct {
*BaseAudioManager

View File

@ -1,151 +0,0 @@
package audio
import (
"sync/atomic"
"github.com/jetkvm/kvm/internal/logging"
)
// AudioOutputManager manages audio output stream using IPC mode
type AudioOutputManager struct {
*BaseAudioManager
streamer *AudioOutputStreamer
framesReceived int64 // Output-specific metric
}
// AudioOutputMetrics tracks output-specific metrics
// Atomic fields MUST be first for ARM32 alignment (int64 fields need 8-byte alignment)
type AudioOutputMetrics struct {
// Atomic int64 field first for proper ARM32 alignment
FramesReceived int64 `json:"frames_received"` // Total frames received (output-specific)
// Embedded struct with atomic fields properly aligned
BaseAudioMetrics
}
// NewAudioOutputManager creates a new audio output manager
func NewAudioOutputManager() *AudioOutputManager {
logger := logging.GetDefaultLogger().With().Str("component", AudioOutputManagerComponent).Logger()
streamer, err := NewAudioOutputStreamer()
if err != nil {
// Log error but continue with nil streamer - will be handled gracefully
logger.Error().Err(err).Msg("Failed to create audio output streamer")
}
return &AudioOutputManager{
BaseAudioManager: NewBaseAudioManager(logger),
streamer: streamer,
}
}
// Start starts the audio output manager
func (aom *AudioOutputManager) Start() error {
if !aom.setRunning(true) {
return nil // Already running
}
aom.logComponentStart(AudioOutputManagerComponent)
if aom.streamer == nil {
// Try to recreate streamer if it was nil
streamer, err := NewAudioOutputStreamer()
if err != nil {
aom.setRunning(false)
aom.logComponentError(AudioOutputManagerComponent, err, "failed to create audio output streamer")
return err
}
aom.streamer = streamer
}
err := aom.streamer.Start()
if err != nil {
aom.setRunning(false)
// Reset metrics on failed start
aom.resetMetrics()
aom.logComponentError(AudioOutputManagerComponent, err, "failed to start component")
return err
}
aom.logComponentStarted(AudioOutputManagerComponent)
return nil
}
// Stop stops the audio output manager
func (aom *AudioOutputManager) Stop() {
if !aom.setRunning(false) {
return // Already stopped
}
aom.logComponentStop(AudioOutputManagerComponent)
if aom.streamer != nil {
aom.streamer.Stop()
}
aom.logComponentStopped(AudioOutputManagerComponent)
}
// resetMetrics resets all metrics to zero
func (aom *AudioOutputManager) resetMetrics() {
aom.BaseAudioManager.resetMetrics()
atomic.StoreInt64(&aom.framesReceived, 0)
}
// Note: IsRunning() is inherited from BaseAudioManager
// IsReady returns whether the audio output manager is ready to receive frames
func (aom *AudioOutputManager) IsReady() bool {
if !aom.IsRunning() || aom.streamer == nil {
return false
}
// For output, we consider it ready if the streamer is running
// This could be enhanced with connection status checks
return true
}
// GetMetrics returns current metrics
func (aom *AudioOutputManager) GetMetrics() AudioOutputMetrics {
return AudioOutputMetrics{
FramesReceived: atomic.LoadInt64(&aom.framesReceived),
BaseAudioMetrics: aom.getBaseMetrics(),
}
}
// GetComprehensiveMetrics returns detailed performance metrics
func (aom *AudioOutputManager) GetComprehensiveMetrics() map[string]interface{} {
baseMetrics := aom.GetMetrics()
comprehensiveMetrics := map[string]interface{}{
"manager": map[string]interface{}{
"frames_received": baseMetrics.FramesReceived,
"frames_processed": baseMetrics.FramesProcessed,
"frames_dropped": baseMetrics.FramesDropped,
"bytes_processed": baseMetrics.BytesProcessed,
"connection_drops": baseMetrics.ConnectionDrops,
"average_latency_ms": float64(baseMetrics.AverageLatency.Nanoseconds()) / 1e6,
"last_frame_time": baseMetrics.LastFrameTime,
"running": aom.IsRunning(),
"ready": aom.IsReady(),
},
}
if aom.streamer != nil {
processed, dropped, avgTime := aom.streamer.GetStats()
comprehensiveMetrics["streamer"] = map[string]interface{}{
"frames_processed": processed,
"frames_dropped": dropped,
"avg_processing_time_ms": float64(avgTime.Nanoseconds()) / 1e6,
}
if detailedStats := aom.streamer.GetDetailedStats(); detailedStats != nil {
comprehensiveMetrics["detailed"] = detailedStats
}
}
return comprehensiveMetrics
}
// GetStreamer returns the streamer for advanced operations
func (aom *AudioOutputManager) GetStreamer() *AudioOutputStreamer {
return aom.streamer
}

View File

@ -4,7 +4,6 @@ import (
"context"
"os"
"os/signal"
"strconv"
"syscall"
"time"
@ -12,50 +11,6 @@ import (
)
// getEnvInt reads an integer from environment variable with a default value
func getEnvInt(key string, defaultValue int) int {
if value := os.Getenv(key); value != "" {
if intValue, err := strconv.Atoi(value); err == nil {
return intValue
}
}
return defaultValue
}
// parseOpusConfig reads OPUS configuration from environment variables
// with fallback to default config values
func parseOpusConfig() (bitrate, complexity, vbr, signalType, bandwidth, dtx int) {
// Read configuration from environment variables with config defaults
bitrate = getEnvInt("JETKVM_OPUS_BITRATE", GetConfig().CGOOpusBitrate)
complexity = getEnvInt("JETKVM_OPUS_COMPLEXITY", GetConfig().CGOOpusComplexity)
vbr = getEnvInt("JETKVM_OPUS_VBR", GetConfig().CGOOpusVBR)
signalType = getEnvInt("JETKVM_OPUS_SIGNAL_TYPE", GetConfig().CGOOpusSignalType)
bandwidth = getEnvInt("JETKVM_OPUS_BANDWIDTH", GetConfig().CGOOpusBandwidth)
dtx = getEnvInt("JETKVM_OPUS_DTX", GetConfig().CGOOpusDTX)
return bitrate, complexity, vbr, signalType, bandwidth, dtx
}
// applyOpusConfig applies OPUS configuration to the global config
func applyOpusConfig(bitrate, complexity, vbr, signalType, bandwidth, dtx int) {
logger := logging.GetDefaultLogger().With().Str("component", "audio-output-server").Logger()
config := GetConfig()
config.CGOOpusBitrate = bitrate
config.CGOOpusComplexity = complexity
config.CGOOpusVBR = vbr
config.CGOOpusSignalType = signalType
config.CGOOpusBandwidth = bandwidth
config.CGOOpusDTX = dtx
logger.Info().
Int("bitrate", bitrate).
Int("complexity", complexity).
Int("vbr", vbr).
Int("signal_type", signalType).
Int("bandwidth", bandwidth).
Int("dtx", dtx).
Msg("applied OPUS configuration")
}
// RunAudioOutputServer runs the audio output server subprocess
// This should be called from main() when the subprocess is detected
@ -65,7 +20,7 @@ func RunAudioOutputServer() error {
// Parse OPUS configuration from environment variables
bitrate, complexity, vbr, signalType, bandwidth, dtx := parseOpusConfig()
applyOpusConfig(bitrate, complexity, vbr, signalType, bandwidth, dtx)
applyOpusConfig(bitrate, complexity, vbr, signalType, bandwidth, dtx, "audio-output-server", true)
// Initialize validation cache for optimal performance
InitValidationCache()
@ -76,7 +31,7 @@ func RunAudioOutputServer() error {
logger.Error().Err(err).Msg("failed to create audio server")
return err
}
defer server.Close()
defer server.Stop()
// Start accepting connections
if err := server.Start(); err != nil {

View File

@ -6,9 +6,7 @@ package audio
import (
"context"
"fmt"
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
@ -16,31 +14,7 @@ import (
"github.com/rs/zerolog"
)
// AudioOutputStreamer manages high-performance audio output streaming
type AudioOutputStreamer struct {
// Atomic int64 fields MUST be first for ARM32 alignment (8-byte alignment required)
processedFrames int64 // Total processed frames counter (atomic)
droppedFrames int64 // Dropped frames counter (atomic)
processingTime int64 // Average processing time in nanoseconds (atomic)
lastStatsTime int64 // Last statistics update time (atomic)
// Other fields after atomic int64 fields
sampleRate int32 // Sample every N frames (default: 10)
client *AudioOutputClient
bufferPool *AudioBufferPool
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
running bool
mtx sync.Mutex
chanClosed bool // Track if processing channel is closed
// Adaptive processing configuration
batchSize int // Adaptive batch size for frame processing
processingChan chan []byte // Buffered channel for frame processing
statsInterval time.Duration // Statistics reporting interval
}
// Removed unused AudioOutputStreamer struct - actual streaming uses direct functions
var (
outputStreamingRunning int32
@ -50,298 +24,27 @@ var (
func getOutputStreamingLogger() *zerolog.Logger {
if outputStreamingLogger == nil {
logger := logging.GetDefaultLogger().With().Str("component", AudioOutputStreamerComponent).Logger()
logger := logging.GetDefaultLogger().With().Str("component", "audio-output-streaming").Logger()
outputStreamingLogger = &logger
}
return outputStreamingLogger
}
func NewAudioOutputStreamer() (*AudioOutputStreamer, error) {
client := NewAudioOutputClient()
// Removed unused NewAudioOutputStreamer function
// Get initial batch size from adaptive buffer manager
adaptiveManager := GetAdaptiveBufferManager()
initialBatchSize := adaptiveManager.GetOutputBufferSize()
// Removed unused AudioOutputStreamer.Start method
ctx, cancel := context.WithCancel(context.Background())
return &AudioOutputStreamer{
client: client,
bufferPool: NewAudioBufferPool(GetMaxAudioFrameSize()), // Use existing buffer pool
ctx: ctx,
cancel: cancel,
batchSize: initialBatchSize, // Use adaptive batch size
processingChan: make(chan []byte, GetConfig().ChannelBufferSize), // Large buffer for smooth processing
statsInterval: GetConfig().StatsUpdateInterval, // Statistics interval from config
lastStatsTime: time.Now().UnixNano(),
sampleRate: 10, // Update metrics every 10 frames to reduce atomic ops
}, nil
}
// Removed unused AudioOutputStreamer.Stop method
func (s *AudioOutputStreamer) Start() error {
s.mtx.Lock()
defer s.mtx.Unlock()
// Removed unused AudioOutputStreamer.streamLoop method
if s.running {
return fmt.Errorf("output streamer already running")
}
// Removed unused AudioOutputStreamer.processingLoop method
// Connect to audio output server
if err := s.client.Connect(); err != nil {
return fmt.Errorf("failed to connect to audio output server at %s: %w", getOutputSocketPath(), err)
}
// Removed unused AudioOutputStreamer.statisticsLoop method
s.running = true
// Removed unused AudioOutputStreamer.reportStatistics method
// Start multiple goroutines for optimal performance
s.wg.Add(3)
go s.streamLoop() // Main streaming loop
go s.processingLoop() // Frame processing loop
go s.statisticsLoop() // Performance monitoring loop
return nil
}
func (s *AudioOutputStreamer) Stop() {
s.mtx.Lock()
defer s.mtx.Unlock()
if !s.running {
return
}
s.running = false
s.cancel()
// Flush any pending sampled metrics before stopping
s.flushPendingMetrics()
// Close processing channel to signal goroutines (only if not already closed)
if !s.chanClosed {
close(s.processingChan)
s.chanClosed = true
}
// Wait for all goroutines to finish
s.wg.Wait()
if s.client != nil {
s.client.Close()
}
}
func (s *AudioOutputStreamer) streamLoop() {
defer s.wg.Done()
// Only pin to OS thread for high-throughput scenarios to reduce scheduler interference
config := GetConfig()
useThreadOptimizations := config.MaxAudioProcessorWorkers > 8
if useThreadOptimizations {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
}
// Adaptive timing for frame reading
frameInterval := time.Duration(GetConfig().OutputStreamingFrameIntervalMS) * time.Millisecond // 50 FPS base rate
ticker := time.NewTicker(frameInterval)
defer ticker.Stop()
// Batch size update ticker
batchUpdateTicker := time.NewTicker(GetConfig().BufferUpdateInterval)
defer batchUpdateTicker.Stop()
for {
select {
case <-s.ctx.Done():
return
case <-batchUpdateTicker.C:
// Update batch size from adaptive buffer manager
s.UpdateBatchSize()
case <-ticker.C:
// Read audio data from CGO with timing measurement
startTime := time.Now()
frameBuf := s.bufferPool.Get()
n, err := CGOAudioReadEncode(frameBuf)
processingDuration := time.Since(startTime)
if err != nil {
getOutputStreamingLogger().Warn().Err(err).Msg("Failed to read audio data")
s.bufferPool.Put(frameBuf)
atomic.AddInt64(&s.droppedFrames, 1)
continue
}
if n > 0 {
// Send frame for processing (non-blocking)
// Use buffer pool to avoid allocation
frameData := s.bufferPool.Get()
frameData = frameData[:n]
copy(frameData, frameBuf[:n])
select {
case s.processingChan <- frameData:
atomic.AddInt64(&s.processedFrames, 1)
// Update processing time statistics
atomic.StoreInt64(&s.processingTime, int64(processingDuration))
// Report latency to adaptive buffer manager
s.ReportLatency(processingDuration)
default:
// Processing channel full, drop frame
atomic.AddInt64(&s.droppedFrames, 1)
}
}
s.bufferPool.Put(frameBuf)
}
}
}
// processingLoop handles frame processing in a separate goroutine
func (s *AudioOutputStreamer) processingLoop() {
defer s.wg.Done()
// Only use thread optimizations for high-throughput scenarios
config := GetConfig()
useThreadOptimizations := config.MaxAudioProcessorWorkers > 8
if useThreadOptimizations {
// Pin goroutine to OS thread for consistent performance
runtime.LockOSThread()
defer runtime.UnlockOSThread()
}
for frameData := range s.processingChan {
// Process frame and return buffer to pool after processing
func() {
defer s.bufferPool.Put(frameData)
if _, err := s.client.ReceiveFrame(); err != nil {
if s.client.IsConnected() {
// Sample logging to reduce overhead - log every 50th error
if atomic.LoadInt64(&s.droppedFrames)%50 == 0 && getOutputStreamingLogger().GetLevel() <= zerolog.WarnLevel {
getOutputStreamingLogger().Warn().Err(err).Msg("Error reading audio frame from output server")
}
s.recordFrameDropped()
}
// Try to reconnect if disconnected
if !s.client.IsConnected() {
if err := s.client.Connect(); err != nil {
// Only log reconnection failures if warn level enabled
if getOutputStreamingLogger().GetLevel() <= zerolog.WarnLevel {
getOutputStreamingLogger().Warn().Err(err).Msg("Failed to reconnect")
}
}
}
} else {
s.recordFrameProcessed()
}
}()
}
}
// statisticsLoop monitors and reports performance statistics
func (s *AudioOutputStreamer) statisticsLoop() {
defer s.wg.Done()
ticker := time.NewTicker(s.statsInterval)
defer ticker.Stop()
for {
select {
case <-s.ctx.Done():
return
case <-ticker.C:
s.reportStatistics()
}
}
}
// reportStatistics logs current performance statistics
func (s *AudioOutputStreamer) reportStatistics() {
processed := atomic.LoadInt64(&s.processedFrames)
dropped := atomic.LoadInt64(&s.droppedFrames)
processingTime := atomic.LoadInt64(&s.processingTime)
if processed > 0 {
dropRate := float64(dropped) / float64(processed+dropped) * GetConfig().PercentageMultiplier
avgProcessingTime := time.Duration(processingTime)
getOutputStreamingLogger().Info().Int64("processed", processed).Int64("dropped", dropped).Float64("drop_rate", dropRate).Dur("avg_processing", avgProcessingTime).Msg("Output Audio Stats")
// Get client statistics
clientTotal, clientDropped := s.client.GetClientStats()
getOutputStreamingLogger().Info().Int64("total", clientTotal).Int64("dropped", clientDropped).Msg("Client Stats")
}
}
// recordFrameProcessed records a processed frame with sampling optimization
func (s *AudioOutputStreamer) recordFrameProcessed() {
}
// recordFrameDropped records a dropped frame with sampling optimization
func (s *AudioOutputStreamer) recordFrameDropped() {
}
// flushPendingMetrics flushes any pending sampled metrics to atomic counters
func (s *AudioOutputStreamer) flushPendingMetrics() {
}
// GetStats returns streaming statistics with pending metrics flushed
func (s *AudioOutputStreamer) GetStats() (processed, dropped int64, avgProcessingTime time.Duration) {
// Flush pending metrics for accurate reading
s.flushPendingMetrics()
processed = atomic.LoadInt64(&s.processedFrames)
dropped = atomic.LoadInt64(&s.droppedFrames)
processingTimeNs := atomic.LoadInt64(&s.processingTime)
avgProcessingTime = time.Duration(processingTimeNs)
return
}
// GetDetailedStats returns comprehensive streaming statistics
func (s *AudioOutputStreamer) GetDetailedStats() map[string]interface{} {
// Flush pending metrics for accurate reading
s.flushPendingMetrics()
processed := atomic.LoadInt64(&s.processedFrames)
dropped := atomic.LoadInt64(&s.droppedFrames)
processingTime := atomic.LoadInt64(&s.processingTime)
stats := map[string]interface{}{
"processed_frames": processed,
"dropped_frames": dropped,
"avg_processing_time_ns": processingTime,
"batch_size": s.batchSize,
"channel_buffer_size": cap(s.processingChan),
"channel_current_size": len(s.processingChan),
"connected": s.client.IsConnected(),
}
if processed+dropped > 0 {
stats["drop_rate_percent"] = float64(dropped) / float64(processed+dropped) * GetConfig().PercentageMultiplier
}
// Add client statistics
clientTotal, clientDropped := s.client.GetClientStats()
stats["client_total_frames"] = clientTotal
stats["client_dropped_frames"] = clientDropped
return stats
}
// UpdateBatchSize updates the batch size from adaptive buffer manager
func (s *AudioOutputStreamer) UpdateBatchSize() {
s.mtx.Lock()
adaptiveManager := GetAdaptiveBufferManager()
s.batchSize = adaptiveManager.GetOutputBufferSize()
s.mtx.Unlock()
}
// ReportLatency reports processing latency to adaptive buffer manager
func (s *AudioOutputStreamer) ReportLatency(latency time.Duration) {
adaptiveManager := GetAdaptiveBufferManager()
adaptiveManager.UpdateLatency(latency)
}
// Removed all unused AudioOutputStreamer methods
// StartAudioOutputStreaming starts audio output streaming (capturing system audio)
func StartAudioOutputStreaming(send func([]byte)) error {

View File

@ -0,0 +1,510 @@
package audio
import (
"encoding/binary"
"fmt"
"io"
"net"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"
"github.com/jetkvm/kvm/internal/logging"
"github.com/rs/zerolog"
)
// Unified IPC constants
var (
outputMagicNumber uint32 = GetConfig().OutputMagicNumber // "JKOU" (JetKVM Output)
inputMagicNumber uint32 = GetConfig().InputMagicNumber // "JKMI" (JetKVM Microphone Input)
outputSocketName = "audio_output.sock"
inputSocketName = "audio_input.sock"
headerSize = 17 // Fixed header size: 4+1+4+8 bytes
)
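// Wire layout of the fixed 17-byte header (little-endian), as written by
// writeMessage and parsed by readMessage below:
//
//   bytes 0..3   uint32  magic      ("JKOU" for output, "JKMI" for input)
//   byte  4      uint8   type       (UnifiedMessageType)
//   bytes 5..8   uint32  length     (payload size, validated against MaxFrameSize)
//   bytes 9..16  int64   timestamp  (UnixNano)
//
// A payload of exactly `length` bytes follows the header on the wire.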
// UnifiedMessageType represents the type of IPC message for both input and output
type UnifiedMessageType uint8
const (
MessageTypeOpusFrame UnifiedMessageType = iota
MessageTypeConfig
MessageTypeOpusConfig
MessageTypeStop
MessageTypeHeartbeat
MessageTypeAck
)
// UnifiedIPCMessage represents a message sent over IPC for both input and output
type UnifiedIPCMessage struct {
Magic uint32
Type UnifiedMessageType
Length uint32
Timestamp int64
Data []byte
}
// Implement IPCMessage interface
func (msg *UnifiedIPCMessage) GetMagic() uint32 {
return msg.Magic
}
func (msg *UnifiedIPCMessage) GetType() uint8 {
return uint8(msg.Type)
}
func (msg *UnifiedIPCMessage) GetLength() uint32 {
return msg.Length
}
func (msg *UnifiedIPCMessage) GetTimestamp() int64 {
return msg.Timestamp
}
func (msg *UnifiedIPCMessage) GetData() []byte {
return msg.Data
}
// UnifiedIPCConfig represents configuration for audio
type UnifiedIPCConfig struct {
SampleRate int
Channels int
FrameSize int
}
// UnifiedIPCOpusConfig represents Opus-specific configuration
type UnifiedIPCOpusConfig struct {
SampleRate int
Channels int
FrameSize int
Bitrate int
Complexity int
VBR int
SignalType int
Bandwidth int
DTX int
}
// UnifiedAudioServer provides common functionality for both input and output servers
type UnifiedAudioServer struct {
// Atomic counters for performance monitoring
bufferSize int64 // Current buffer size (atomic)
droppedFrames int64 // Dropped frames counter (atomic)
totalFrames int64 // Total frames counter (atomic)
listener net.Listener
conn net.Conn
mtx sync.Mutex
running bool
logger zerolog.Logger
// Message channels
messageChan chan *UnifiedIPCMessage // Buffered channel for incoming messages
processChan chan *UnifiedIPCMessage // Buffered channel for processing queue
wg sync.WaitGroup // Wait group for goroutine coordination
// Configuration
socketPath string
magicNumber uint32
socketBufferConfig SocketBufferConfig
// Performance monitoring
latencyMonitor *LatencyMonitor
adaptiveOptimizer *AdaptiveOptimizer
}
// NewUnifiedAudioServer creates a new unified audio server
func NewUnifiedAudioServer(isInput bool) (*UnifiedAudioServer, error) {
var socketPath string
var magicNumber uint32
var componentName string
if isInput {
socketPath = getInputSocketPath()
magicNumber = inputMagicNumber
componentName = "audio-input-server"
} else {
socketPath = getOutputSocketPath()
magicNumber = outputMagicNumber
componentName = "audio-output-server"
}
logger := logging.GetDefaultLogger().With().Str("component", componentName).Logger()
server := &UnifiedAudioServer{
logger: logger,
socketPath: socketPath,
magicNumber: magicNumber,
messageChan: make(chan *UnifiedIPCMessage, GetConfig().ChannelBufferSize),
processChan: make(chan *UnifiedIPCMessage, GetConfig().ChannelBufferSize),
socketBufferConfig: DefaultSocketBufferConfig(),
latencyMonitor: nil,
adaptiveOptimizer: nil,
}
return server, nil
}
// Start starts the unified audio server
func (s *UnifiedAudioServer) Start() error {
s.mtx.Lock()
defer s.mtx.Unlock()
if s.running {
return fmt.Errorf("server already running")
}
// Remove existing socket file
if err := os.Remove(s.socketPath); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed to remove existing socket: %w", err)
}
// Create listener
listener, err := net.Listen("unix", s.socketPath)
if err != nil {
return fmt.Errorf("failed to create listener: %w", err)
}
s.listener = listener
s.running = true
// Start goroutines
s.wg.Add(3)
go s.acceptConnections()
go s.startReaderGoroutine()
go s.startProcessorGoroutine()
s.logger.Info().Str("socket_path", s.socketPath).Msg("Unified audio server started")
return nil
}
// Stop stops the unified audio server
func (s *UnifiedAudioServer) Stop() {
s.mtx.Lock()
defer s.mtx.Unlock()
if !s.running {
return
}
s.running = false
if s.listener != nil {
s.listener.Close()
}
if s.conn != nil {
s.conn.Close()
}
// Close channels
close(s.messageChan)
close(s.processChan)
// Wait for goroutines to finish
s.wg.Wait()
// Remove socket file
os.Remove(s.socketPath)
s.logger.Info().Msg("Unified audio server stopped")
}
// acceptConnections handles incoming connections
func (s *UnifiedAudioServer) acceptConnections() {
defer s.wg.Done()
for s.running {
conn, err := AcceptConnectionWithRetry(s.listener, 3, 100*time.Millisecond)
if err != nil {
if s.running {
s.logger.Error().Err(err).Msg("Failed to accept connection")
}
continue
}
s.mtx.Lock()
if s.conn != nil {
s.conn.Close()
}
s.conn = conn
s.mtx.Unlock()
s.logger.Info().Msg("Client connected")
}
}
// startReaderGoroutine handles reading messages from the connection
func (s *UnifiedAudioServer) startReaderGoroutine() {
defer s.wg.Done()
for s.running {
s.mtx.Lock()
conn := s.conn
s.mtx.Unlock()
if conn == nil {
time.Sleep(10 * time.Millisecond)
continue
}
msg, err := s.readMessage(conn)
if err != nil {
if s.running {
s.logger.Error().Err(err).Msg("Failed to read message")
}
continue
}
select {
case s.messageChan <- msg:
default:
atomic.AddInt64(&s.droppedFrames, 1)
s.logger.Warn().Msg("Message channel full, dropping message")
}
}
}
// startProcessorGoroutine handles processing messages
func (s *UnifiedAudioServer) startProcessorGoroutine() {
defer s.wg.Done()
for msg := range s.messageChan {
select {
case s.processChan <- msg:
atomic.AddInt64(&s.totalFrames, 1)
default:
atomic.AddInt64(&s.droppedFrames, 1)
s.logger.Warn().Msg("Process channel full, dropping message")
}
}
}
// readMessage reads a message from the connection
func (s *UnifiedAudioServer) readMessage(conn net.Conn) (*UnifiedIPCMessage, error) {
// Read header
header := make([]byte, headerSize)
if _, err := io.ReadFull(conn, header); err != nil {
return nil, fmt.Errorf("failed to read header: %w", err)
}
// Parse header
magic := binary.LittleEndian.Uint32(header[0:4])
if magic != s.magicNumber {
return nil, fmt.Errorf("invalid magic number: expected %d, got %d", s.magicNumber, magic)
}
msgType := UnifiedMessageType(header[4])
length := binary.LittleEndian.Uint32(header[5:9])
timestamp := int64(binary.LittleEndian.Uint64(header[9:17]))
// Validate length
if length > uint32(GetConfig().MaxFrameSize) {
return nil, fmt.Errorf("message too large: %d bytes", length)
}
// Read data
var data []byte
if length > 0 {
data = make([]byte, length)
if _, err := io.ReadFull(conn, data); err != nil {
return nil, fmt.Errorf("failed to read data: %w", err)
}
}
return &UnifiedIPCMessage{
Magic: magic,
Type: msgType,
Length: length,
Timestamp: timestamp,
Data: data,
}, nil
}
// SendFrame sends a frame to the connected client
func (s *UnifiedAudioServer) SendFrame(frame []byte) error {
s.mtx.Lock()
defer s.mtx.Unlock()
if !s.running || s.conn == nil {
return fmt.Errorf("no client connected")
}
start := time.Now()
// Create message
msg := &UnifiedIPCMessage{
Magic: s.magicNumber,
Type: MessageTypeOpusFrame,
Length: uint32(len(frame)),
Timestamp: start.UnixNano(),
Data: frame,
}
// Write message to connection
err := s.writeMessage(s.conn, msg)
if err != nil {
atomic.AddInt64(&s.droppedFrames, 1)
return err
}
// Record latency for monitoring
if s.latencyMonitor != nil {
writeLatency := time.Since(start)
s.latencyMonitor.RecordLatency(writeLatency, "ipc_write")
}
atomic.AddInt64(&s.totalFrames, 1)
return nil
}
// writeMessage writes a message to the connection
func (s *UnifiedAudioServer) writeMessage(conn net.Conn, msg *UnifiedIPCMessage) error {
// Write header
header := make([]byte, headerSize)
binary.LittleEndian.PutUint32(header[0:4], msg.Magic)
header[4] = uint8(msg.Type)
binary.LittleEndian.PutUint32(header[5:9], msg.Length)
binary.LittleEndian.PutUint64(header[9:17], uint64(msg.Timestamp))
if _, err := conn.Write(header); err != nil {
return fmt.Errorf("failed to write header: %w", err)
}
// Write data if present
if msg.Length > 0 && msg.Data != nil {
if _, err := conn.Write(msg.Data); err != nil {
return fmt.Errorf("failed to write data: %w", err)
}
}
return nil
}
// UnifiedAudioClient provides common functionality for both input and output clients
type UnifiedAudioClient struct {
// Atomic fields first for ARM32 alignment
droppedFrames int64 // Atomic counter for dropped frames
totalFrames int64 // Atomic counter for total frames
conn net.Conn
mtx sync.Mutex
running bool
logger zerolog.Logger
socketPath string
magicNumber uint32
bufferPool *AudioBufferPool // Buffer pool for memory optimization
}
// NewUnifiedAudioClient creates a new unified audio client
func NewUnifiedAudioClient(isInput bool) *UnifiedAudioClient {
var socketPath string
var magicNumber uint32
var componentName string
if isInput {
socketPath = getInputSocketPath()
magicNumber = inputMagicNumber
componentName = "audio-input-client"
} else {
socketPath = getOutputSocketPath()
magicNumber = outputMagicNumber
componentName = "audio-output-client"
}
logger := logging.GetDefaultLogger().With().Str("component", componentName).Logger()
return &UnifiedAudioClient{
logger: logger,
socketPath: socketPath,
magicNumber: magicNumber,
bufferPool: NewAudioBufferPool(GetConfig().MaxFrameSize),
}
}
// Connect connects the client to the server
func (c *UnifiedAudioClient) Connect() error {
c.mtx.Lock()
defer c.mtx.Unlock()
if c.running {
return nil // Already connected
}
// Ensure clean state before connecting
if c.conn != nil {
c.conn.Close()
c.conn = nil
}
// Try connecting multiple times as the server might not be ready
// Capped exponential backoff keeps the total wait short when the server is slow to come up
for i := 0; i < 10; i++ {
conn, err := net.Dial("unix", c.socketPath)
if err == nil {
c.conn = conn
c.running = true
// Reset frame counters on successful connection
atomic.StoreInt64(&c.totalFrames, 0)
atomic.StoreInt64(&c.droppedFrames, 0)
c.logger.Info().Str("socket_path", c.socketPath).Msg("Connected to server")
return nil
}
// Exponential backoff starting from config
backoffStart := GetConfig().BackoffStart
delay := time.Duration(backoffStart.Nanoseconds()*(1<<uint(i/3))) * time.Nanosecond
maxDelay := GetConfig().MaxRetryDelay
if delay > maxDelay {
delay = maxDelay
}
time.Sleep(delay)
}
// Ensure clean state on connection failure
c.conn = nil
c.running = false
return fmt.Errorf("failed to connect to audio server after 10 attempts")
}
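// Worked example of the pacing above, assuming (for illustration only)
// BackoffStart = 50 ms and MaxRetryDelay = 500 ms; the real values come from
// GetConfig(). The factor 1<<(i/3) doubles the delay every third attempt:
// attempts 1-3 wait 50 ms, 4-6 wait 100 ms, 7-9 wait 200 ms, and attempt 10
// waits 400 ms, for a worst-case total of roughly 1.45 s before Connect gives up.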
// Disconnect disconnects the client from the server
func (c *UnifiedAudioClient) Disconnect() {
c.mtx.Lock()
defer c.mtx.Unlock()
if !c.running {
return
}
c.running = false
if c.conn != nil {
c.conn.Close()
c.conn = nil
}
c.logger.Info().Msg("Disconnected from server")
}
// IsConnected returns whether the client is connected
func (c *UnifiedAudioClient) IsConnected() bool {
c.mtx.Lock()
defer c.mtx.Unlock()
return c.running && c.conn != nil
}
// GetFrameStats returns frame statistics
func (c *UnifiedAudioClient) GetFrameStats() (total, dropped int64) {
total = atomic.LoadInt64(&c.totalFrames)
dropped = atomic.LoadInt64(&c.droppedFrames)
return total, dropped
}
// Helper functions for socket paths
func getInputSocketPath() string {
return filepath.Join(os.TempDir(), inputSocketName)
}
func getOutputSocketPath() string {
return filepath.Join(os.TempDir(), outputSocketName)
}
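// Minimal end-to-end sketch (illustrative only, error handling trimmed) of the
// unified server/client pair introduced in this file, output direction:
func exampleUnifiedOutputIPC() error {
	server, err := NewUnifiedAudioServer(false) // false = output server
	if err != nil {
		return err
	}
	if err := server.Start(); err != nil { // listens on getOutputSocketPath()
		return err
	}
	defer server.Stop()

	client := NewUnifiedAudioClient(false) // false = output client
	if err := client.Connect(); err != nil { // retries with capped exponential backoff
		return err
	}
	defer client.Disconnect()

	// SendFrame succeeds only after the server's accept loop has picked up the
	// connection; callers treat "no client connected" as a transient error.
	frame := make([]byte, 960) // placeholder size, not a repository constant
	return server.SendFrame(frame)
}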

View File

@ -17,7 +17,7 @@ var (
ErrFrameDataEmpty = errors.New("invalid frame data: frame data is empty")
ErrFrameDataTooLarge = errors.New("invalid frame data: exceeds maximum")
ErrInvalidBufferSize = errors.New("invalid buffer size")
ErrInvalidPriority = errors.New("invalid priority value")
ErrInvalidLatency = errors.New("invalid latency value")
ErrInvalidConfiguration = errors.New("invalid configuration")
ErrInvalidSocketConfig = errors.New("invalid socket configuration")
@ -99,16 +99,6 @@ func ValidateBufferSize(size int) error {
return nil
}
// ValidateThreadPriority validates thread priority values with system limits
func ValidateThreadPriority(priority int) error {
const minPriority, maxPriority = -20, 19
if priority < minPriority || priority > maxPriority {
return fmt.Errorf("%w: priority %d outside valid range [%d, %d]",
ErrInvalidPriority, priority, minPriority, maxPriority)
}
return nil
}
// ValidateLatency validates latency duration values with reasonable bounds
// Optimized to use AudioConfigCache for frequently accessed values
func ValidateLatency(latency time.Duration) error {

View File

@ -68,8 +68,6 @@ func startAudioSubprocess() error {
config.AudioQualityLowOpusDTX,
)
// Note: Audio input supervisor is NOT started here - it will be started on-demand
// when the user activates microphone input through the UI