[WIP] Performance Enhancements: move audio processing into a separate process

This commit is contained in:
Alex P 2025-08-21 22:16:48 +00:00
parent 7e83015932
commit 423d5775e3
23 changed files with 2565 additions and 966 deletions

View File

@ -159,8 +159,8 @@ else
msg_info "▶ Building development binary" msg_info "▶ Building development binary"
make build_dev make build_dev
# Kill any existing instances of the application # Kill any existing instances of the application (specific cleanup)
ssh "${REMOTE_USER}@${REMOTE_HOST}" "killall jetkvm_app_debug || true" ssh "${REMOTE_USER}@${REMOTE_HOST}" "killall jetkvm_app || true; killall jetkvm_native || true; killall jetkvm_app_debug || true; sleep 2"
# Copy the binary to the remote host # Copy the binary to the remote host
ssh "${REMOTE_USER}@${REMOTE_HOST}" "cat > ${REMOTE_PATH}/jetkvm_app_debug" < bin/jetkvm_app ssh "${REMOTE_USER}@${REMOTE_HOST}" "cat > ${REMOTE_PATH}/jetkvm_app_debug" < bin/jetkvm_app
@ -180,18 +180,18 @@ set -e
# Set the library path to include the directory where librockit.so is located # Set the library path to include the directory where librockit.so is located
export LD_LIBRARY_PATH=/oem/usr/lib:\$LD_LIBRARY_PATH export LD_LIBRARY_PATH=/oem/usr/lib:\$LD_LIBRARY_PATH
# Check if production jetkvm_app is running and save its state # Kill any existing instances of the application (specific cleanup)
PROD_APP_RUNNING=false killall jetkvm_app || true
if pgrep -f "/userdata/jetkvm/bin/jetkvm_app" > /dev/null; then killall jetkvm_native || true
PROD_APP_RUNNING=true
echo "Production jetkvm_app is running, will restore after development session"
else
echo "No production jetkvm_app detected"
fi
# Kill any existing instances of the application
pkill -f "/userdata/jetkvm/bin/jetkvm_app" || true
killall jetkvm_app_debug || true killall jetkvm_app_debug || true
sleep 2
# Verify no processes are using port 80
if netstat -tlnp | grep :80 > /dev/null 2>&1; then
echo "Warning: Port 80 still in use, attempting to free it..."
fuser -k 80/tcp || true
sleep 1
fi
# Navigate to the directory where the binary will be stored # Navigate to the directory where the binary will be stored
cd "${REMOTE_PATH}" cd "${REMOTE_PATH}"
@ -199,29 +199,7 @@ cd "${REMOTE_PATH}"
# Make the new binary executable # Make the new binary executable
chmod +x jetkvm_app_debug chmod +x jetkvm_app_debug
# Create a cleanup script that will restore the production app # Run the application in the background
cat > /tmp/restore_jetkvm.sh << RESTORE_EOF
#!/bin/ash
set -e
export LD_LIBRARY_PATH=/oem/usr/lib:\$LD_LIBRARY_PATH
cd ${REMOTE_PATH}
if [ "$PROD_APP_RUNNING" = "true" ]; then
echo "Restoring production jetkvm_app..."
killall jetkvm_app_debug || true
nohup /userdata/jetkvm/bin/jetkvm_app > /tmp/jetkvm_app.log 2>&1 &
echo "Production jetkvm_app restored"
else
echo "No production app was running before, not restoring"
fi
RESTORE_EOF
chmod +x /tmp/restore_jetkvm.sh
# Set up signal handler to restore production app on exit
trap '/tmp/restore_jetkvm.sh' EXIT INT TERM
# Run the application in the foreground
echo "Starting development jetkvm_app_debug..."
PION_LOG_TRACE=${LOG_TRACE_SCOPES} ./jetkvm_app_debug | tee -a /tmp/jetkvm_app_debug.log PION_LOG_TRACE=${LOG_TRACE_SCOPES} ./jetkvm_app_debug | tee -a /tmp/jetkvm_app_debug.log
EOF EOF
fi fi

View File

@ -1,13 +1,51 @@
package audio package audio
// StartAudioStreaming launches the in-process audio stream and delivers Opus frames to the provided callback. import (
// This is now a wrapper around the non-blocking audio implementation for backward compatibility. "os"
func StartAudioStreaming(send func([]byte)) error { "strings"
return StartNonBlockingAudioStreaming(send) )
// isAudioServerProcess detects if we're running as the audio server subprocess
func isAudioServerProcess() bool {
for _, arg := range os.Args {
if strings.Contains(arg, "--audio-server") {
return true
}
}
return false
} }
// StopAudioStreaming stops the in-process audio stream. // StartAudioStreaming launches the audio stream.
// This is now a wrapper around the non-blocking audio implementation for backward compatibility. // In audio server subprocess: uses CGO-based audio streaming
func StopAudioStreaming() { // In main process: this should not be called (use StartAudioRelay instead)
StopNonBlockingAudioStreaming() func StartAudioStreaming(send func([]byte)) error {
if isAudioServerProcess() {
// Audio server subprocess: use CGO audio processing
return StartAudioOutputStreaming(send)
} else {
// Main process: should use relay system instead
// This is kept for backward compatibility but not recommended
return StartAudioOutputStreaming(send)
}
}
// StopAudioStreaming stops the audio stream for the current process role.
// The main process tears down the relay; the audio server subprocess stops
// the CGO-based output stream.
func StopAudioStreaming() {
	if !isAudioServerProcess() {
		// Main process: stop relay if running
		StopAudioRelay()
		return
	}

	// Audio server subprocess: stop CGO audio processing
	StopAudioOutputStreaming()
}
// StartNonBlockingAudioStreaming is an alias kept for backward compatibility.
// It forwards send unchanged to StartAudioOutputStreaming and returns that
// function's error.
func StartNonBlockingAudioStreaming(send func([]byte)) error {
	return StartAudioOutputStreaming(send)
}
// StopNonBlockingAudioStreaming is an alias for backward compatibility
func StopNonBlockingAudioStreaming() {
StopAudioOutputStreaming()
} }

View File

@ -28,22 +28,18 @@ type BatchAudioProcessor struct {
// Batch queues and state (atomic for lock-free access) // Batch queues and state (atomic for lock-free access)
readQueue chan batchReadRequest readQueue chan batchReadRequest
writeQueue chan batchWriteRequest
initialized int32 initialized int32
running int32 running int32
threadPinned int32 threadPinned int32
// Buffers (pre-allocated to avoid allocation overhead) // Buffers (pre-allocated to avoid allocation overhead)
readBufPool *sync.Pool readBufPool *sync.Pool
writeBufPool *sync.Pool
} }
type BatchAudioStats struct { type BatchAudioStats struct {
// int64 fields MUST be first for ARM32 alignment // int64 fields MUST be first for ARM32 alignment
BatchedReads int64 BatchedReads int64
BatchedWrites int64
SingleReads int64 SingleReads int64
SingleWrites int64
BatchedFrames int64 BatchedFrames int64
SingleFrames int64 SingleFrames int64
CGOCallsReduced int64 CGOCallsReduced int64
@ -57,22 +53,11 @@ type batchReadRequest struct {
timestamp time.Time timestamp time.Time
} }
type batchWriteRequest struct {
buffer []byte
resultChan chan batchWriteResult
timestamp time.Time
}
type batchReadResult struct { type batchReadResult struct {
length int length int
err error err error
} }
type batchWriteResult struct {
written int
err error
}
// NewBatchAudioProcessor creates a new batch audio processor // NewBatchAudioProcessor creates a new batch audio processor
func NewBatchAudioProcessor(batchSize int, batchDuration time.Duration) *BatchAudioProcessor { func NewBatchAudioProcessor(batchSize int, batchDuration time.Duration) *BatchAudioProcessor {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
@ -85,17 +70,11 @@ func NewBatchAudioProcessor(batchSize int, batchDuration time.Duration) *BatchAu
batchSize: batchSize, batchSize: batchSize,
batchDuration: batchDuration, batchDuration: batchDuration,
readQueue: make(chan batchReadRequest, batchSize*2), readQueue: make(chan batchReadRequest, batchSize*2),
writeQueue: make(chan batchWriteRequest, batchSize*2),
readBufPool: &sync.Pool{ readBufPool: &sync.Pool{
New: func() interface{} { New: func() interface{} {
return make([]byte, 1500) // Max audio frame size return make([]byte, 1500) // Max audio frame size
}, },
}, },
writeBufPool: &sync.Pool{
New: func() interface{} {
return make([]byte, 4096) // Max write buffer size
},
},
} }
return processor return processor
@ -114,7 +93,6 @@ func (bap *BatchAudioProcessor) Start() error {
// Start batch processing goroutines // Start batch processing goroutines
go bap.batchReadProcessor() go bap.batchReadProcessor()
go bap.batchWriteProcessor()
bap.logger.Info().Int("batch_size", bap.batchSize). bap.logger.Info().Int("batch_size", bap.batchSize).
Dur("batch_duration", bap.batchDuration). Dur("batch_duration", bap.batchDuration).
@ -175,43 +153,7 @@ func (bap *BatchAudioProcessor) BatchReadEncode(buffer []byte) (int, error) {
} }
} }
// BatchDecodeWrite performs batched audio decode and write operations
func (bap *BatchAudioProcessor) BatchDecodeWrite(buffer []byte) (int, error) {
if atomic.LoadInt32(&bap.running) == 0 {
// Fallback to single operation if batch processor is not running
atomic.AddInt64(&bap.stats.SingleWrites, 1)
atomic.AddInt64(&bap.stats.SingleFrames, 1)
return CGOAudioDecodeWrite(buffer)
}
resultChan := make(chan batchWriteResult, 1)
request := batchWriteRequest{
buffer: buffer,
resultChan: resultChan,
timestamp: time.Now(),
}
select {
case bap.writeQueue <- request:
// Successfully queued
case <-time.After(5 * time.Millisecond):
// Queue is full or blocked, fallback to single operation
atomic.AddInt64(&bap.stats.SingleWrites, 1)
atomic.AddInt64(&bap.stats.SingleFrames, 1)
return CGOAudioDecodeWrite(buffer)
}
// Wait for result
select {
case result := <-resultChan:
return result.written, result.err
case <-time.After(50 * time.Millisecond):
// Timeout, fallback to single operation
atomic.AddInt64(&bap.stats.SingleWrites, 1)
atomic.AddInt64(&bap.stats.SingleFrames, 1)
return CGOAudioDecodeWrite(buffer)
}
}
// batchReadProcessor processes batched read operations // batchReadProcessor processes batched read operations
func (bap *BatchAudioProcessor) batchReadProcessor() { func (bap *BatchAudioProcessor) batchReadProcessor() {
@ -249,41 +191,7 @@ func (bap *BatchAudioProcessor) batchReadProcessor() {
} }
} }
// batchWriteProcessor processes batched write operations
func (bap *BatchAudioProcessor) batchWriteProcessor() {
defer bap.logger.Debug().Msg("batch write processor stopped")
ticker := time.NewTicker(bap.batchDuration)
defer ticker.Stop()
var batch []batchWriteRequest
batch = make([]batchWriteRequest, 0, bap.batchSize)
for atomic.LoadInt32(&bap.running) == 1 {
select {
case <-bap.ctx.Done():
return
case req := <-bap.writeQueue:
batch = append(batch, req)
if len(batch) >= bap.batchSize {
bap.processBatchWrite(batch)
batch = batch[:0] // Clear slice but keep capacity
}
case <-ticker.C:
if len(batch) > 0 {
bap.processBatchWrite(batch)
batch = batch[:0] // Clear slice but keep capacity
}
}
}
// Process any remaining requests
if len(batch) > 0 {
bap.processBatchWrite(batch)
}
}
// processBatchRead processes a batch of read requests efficiently // processBatchRead processes a batch of read requests efficiently
func (bap *BatchAudioProcessor) processBatchRead(batch []batchReadRequest) { func (bap *BatchAudioProcessor) processBatchRead(batch []batchReadRequest) {
@ -328,56 +236,13 @@ func (bap *BatchAudioProcessor) processBatchRead(batch []batchReadRequest) {
bap.stats.LastBatchTime = time.Now() bap.stats.LastBatchTime = time.Now()
} }
// processBatchWrite processes a batch of write requests efficiently
func (bap *BatchAudioProcessor) processBatchWrite(batch []batchWriteRequest) {
if len(batch) == 0 {
return
}
// Pin to OS thread for the entire batch to minimize thread switching overhead
start := time.Now()
if atomic.CompareAndSwapInt32(&bap.threadPinned, 0, 1) {
runtime.LockOSThread()
defer func() {
runtime.UnlockOSThread()
atomic.StoreInt32(&bap.threadPinned, 0)
bap.stats.OSThreadPinTime += time.Since(start)
}()
}
batchSize := len(batch)
atomic.AddInt64(&bap.stats.BatchedWrites, 1)
atomic.AddInt64(&bap.stats.BatchedFrames, int64(batchSize))
if batchSize > 1 {
atomic.AddInt64(&bap.stats.CGOCallsReduced, int64(batchSize-1))
}
// Process each request in the batch
for _, req := range batch {
written, err := CGOAudioDecodeWrite(req.buffer)
result := batchWriteResult{
written: written,
err: err,
}
// Send result back (non-blocking)
select {
case req.resultChan <- result:
default:
// Requestor timed out, drop result
}
}
bap.stats.LastBatchTime = time.Now()
}
// GetStats returns current batch processor statistics // GetStats returns current batch processor statistics
func (bap *BatchAudioProcessor) GetStats() BatchAudioStats { func (bap *BatchAudioProcessor) GetStats() BatchAudioStats {
return BatchAudioStats{ return BatchAudioStats{
BatchedReads: atomic.LoadInt64(&bap.stats.BatchedReads), BatchedReads: atomic.LoadInt64(&bap.stats.BatchedReads),
BatchedWrites: atomic.LoadInt64(&bap.stats.BatchedWrites),
SingleReads: atomic.LoadInt64(&bap.stats.SingleReads), SingleReads: atomic.LoadInt64(&bap.stats.SingleReads),
SingleWrites: atomic.LoadInt64(&bap.stats.SingleWrites),
BatchedFrames: atomic.LoadInt64(&bap.stats.BatchedFrames), BatchedFrames: atomic.LoadInt64(&bap.stats.BatchedFrames),
SingleFrames: atomic.LoadInt64(&bap.stats.SingleFrames), SingleFrames: atomic.LoadInt64(&bap.stats.SingleFrames),
CGOCallsReduced: atomic.LoadInt64(&bap.stats.CGOCallsReduced), CGOCallsReduced: atomic.LoadInt64(&bap.stats.CGOCallsReduced),
@ -444,12 +309,3 @@ func BatchCGOAudioReadEncode(buffer []byte) (int, error) {
} }
return CGOAudioReadEncode(buffer) return CGOAudioReadEncode(buffer)
} }
// BatchCGOAudioDecodeWrite is a batched version of CGOAudioDecodeWrite
func BatchCGOAudioDecodeWrite(buffer []byte) (int, error) {
processor := GetBatchAudioProcessor()
if processor != nil && processor.IsRunning() {
return processor.BatchDecodeWrite(buffer)
}
return CGOAudioDecodeWrite(buffer)
}

View File

@ -358,8 +358,8 @@ func (aeb *AudioEventBroadcaster) sendToSubscriber(subscriber *AudioEventSubscri
if err != nil { if err != nil {
// Don't log network errors for closed connections as warnings, they're expected // Don't log network errors for closed connections as warnings, they're expected
if strings.Contains(err.Error(), "use of closed network connection") || if strings.Contains(err.Error(), "use of closed network connection") ||
strings.Contains(err.Error(), "connection reset by peer") || strings.Contains(err.Error(), "connection reset by peer") ||
strings.Contains(err.Error(), "context canceled") { strings.Contains(err.Error(), "context canceled") {
subscriber.logger.Debug().Err(err).Msg("websocket connection closed during audio event send") subscriber.logger.Debug().Err(err).Msg("websocket connection closed during audio event send")
} else { } else {
subscriber.logger.Warn().Err(err).Msg("failed to send audio event to subscriber") subscriber.logger.Warn().Err(err).Msg("failed to send audio event to subscriber")

View File

@ -19,21 +19,21 @@ type AudioInputMetrics struct {
LastFrameTime time.Time LastFrameTime time.Time
} }
// AudioInputManager manages microphone input stream from WebRTC to USB gadget // AudioInputManager manages microphone input stream using IPC mode only
type AudioInputManager struct { type AudioInputManager struct {
// metrics MUST be first for ARM32 alignment (contains int64 fields) // metrics MUST be first for ARM32 alignment (contains int64 fields)
metrics AudioInputMetrics metrics AudioInputMetrics
inputBuffer chan []byte ipcManager *AudioInputIPCManager
logger zerolog.Logger logger zerolog.Logger
running int32 running int32
} }
// NewAudioInputManager creates a new audio input manager // NewAudioInputManager creates a new audio input manager (IPC mode only)
func NewAudioInputManager() *AudioInputManager { func NewAudioInputManager() *AudioInputManager {
return &AudioInputManager{ return &AudioInputManager{
inputBuffer: make(chan []byte, 100), // Buffer up to 100 frames ipcManager: NewAudioInputIPCManager(),
logger: logging.GetDefaultLogger().With().Str("component", "audio-input").Logger(), logger: logging.GetDefaultLogger().With().Str("component", "audio-input").Logger(),
} }
} }
@ -45,9 +45,10 @@ func (aim *AudioInputManager) Start() error {
aim.logger.Info().Msg("Starting audio input manager") aim.logger.Info().Msg("Starting audio input manager")
// Start the non-blocking audio input stream // Start the IPC-based audio input
err := StartNonBlockingAudioInput(aim.inputBuffer) err := aim.ipcManager.Start()
if err != nil { if err != nil {
aim.logger.Error().Err(err).Msg("Failed to start IPC audio input")
atomic.StoreInt32(&aim.running, 0) atomic.StoreInt32(&aim.running, 0)
return err return err
} }
@ -63,54 +64,102 @@ func (aim *AudioInputManager) Stop() {
aim.logger.Info().Msg("Stopping audio input manager") aim.logger.Info().Msg("Stopping audio input manager")
// Stop the non-blocking audio input stream // Stop the IPC-based audio input
StopNonBlockingAudioInput() aim.ipcManager.Stop()
// Drain the input buffer
go func() {
for {
select {
case <-aim.inputBuffer:
// Drain
case <-time.After(100 * time.Millisecond):
return
}
}
}()
aim.logger.Info().Msg("Audio input manager stopped") aim.logger.Info().Msg("Audio input manager stopped")
} }
// WriteOpusFrame writes an Opus frame to the input buffer // WriteOpusFrame writes an Opus frame to the audio input system with latency tracking
func (aim *AudioInputManager) WriteOpusFrame(frame []byte) error { func (aim *AudioInputManager) WriteOpusFrame(frame []byte) error {
if atomic.LoadInt32(&aim.running) == 0 { if !aim.IsRunning() {
return nil // Not running, ignore return nil // Not running, silently drop
} }
select { // Track end-to-end latency from WebRTC to IPC
case aim.inputBuffer <- frame: startTime := time.Now()
atomic.AddInt64(&aim.metrics.FramesSent, 1) err := aim.ipcManager.WriteOpusFrame(frame)
atomic.AddInt64(&aim.metrics.BytesProcessed, int64(len(frame))) processingTime := time.Since(startTime)
aim.metrics.LastFrameTime = time.Now()
return nil // Log high latency warnings
default: if processingTime > 10*time.Millisecond {
// Buffer full, drop frame aim.logger.Warn().
Dur("latency_ms", processingTime).
Msg("High audio processing latency detected")
}
if err != nil {
atomic.AddInt64(&aim.metrics.FramesDropped, 1) atomic.AddInt64(&aim.metrics.FramesDropped, 1)
aim.logger.Warn().Msg("Audio input buffer full, dropping frame") return err
return nil }
// Update metrics
atomic.AddInt64(&aim.metrics.FramesSent, 1)
atomic.AddInt64(&aim.metrics.BytesProcessed, int64(len(frame)))
aim.metrics.LastFrameTime = time.Now()
aim.metrics.AverageLatency = processingTime
return nil
}
// GetMetrics returns current audio input metrics
func (aim *AudioInputManager) GetMetrics() AudioInputMetrics {
return AudioInputMetrics{
FramesSent: atomic.LoadInt64(&aim.metrics.FramesSent),
FramesDropped: atomic.LoadInt64(&aim.metrics.FramesDropped),
BytesProcessed: atomic.LoadInt64(&aim.metrics.BytesProcessed),
AverageLatency: aim.metrics.AverageLatency,
LastFrameTime: aim.metrics.LastFrameTime,
} }
} }
// GetMetrics returns current microphone input metrics // GetComprehensiveMetrics returns detailed performance metrics across all components
func (aim *AudioInputManager) GetMetrics() AudioInputMetrics { func (aim *AudioInputManager) GetComprehensiveMetrics() map[string]interface{} {
return AudioInputMetrics{ // Get base metrics
FramesSent: atomic.LoadInt64(&aim.metrics.FramesSent), baseMetrics := aim.GetMetrics()
FramesDropped: atomic.LoadInt64(&aim.metrics.FramesDropped),
BytesProcessed: atomic.LoadInt64(&aim.metrics.BytesProcessed), // Get detailed IPC metrics
LastFrameTime: aim.metrics.LastFrameTime, ipcMetrics, detailedStats := aim.ipcManager.GetDetailedMetrics()
ConnectionDrops: atomic.LoadInt64(&aim.metrics.ConnectionDrops),
AverageLatency: aim.metrics.AverageLatency, comprehensiveMetrics := map[string]interface{}{
"manager": map[string]interface{}{
"frames_sent": baseMetrics.FramesSent,
"frames_dropped": baseMetrics.FramesDropped,
"bytes_processed": baseMetrics.BytesProcessed,
"average_latency_ms": float64(baseMetrics.AverageLatency.Nanoseconds()) / 1e6,
"last_frame_time": baseMetrics.LastFrameTime,
"running": aim.IsRunning(),
},
"ipc": map[string]interface{}{
"frames_sent": ipcMetrics.FramesSent,
"frames_dropped": ipcMetrics.FramesDropped,
"bytes_processed": ipcMetrics.BytesProcessed,
"average_latency_ms": float64(ipcMetrics.AverageLatency.Nanoseconds()) / 1e6,
"last_frame_time": ipcMetrics.LastFrameTime,
},
"detailed": detailedStats,
} }
return comprehensiveMetrics
}
// LogPerformanceStats logs current performance statistics
func (aim *AudioInputManager) LogPerformanceStats() {
metrics := aim.GetComprehensiveMetrics()
managerStats := metrics["manager"].(map[string]interface{})
ipcStats := metrics["ipc"].(map[string]interface{})
detailedStats := metrics["detailed"].(map[string]interface{})
aim.logger.Info().
Int64("manager_frames_sent", managerStats["frames_sent"].(int64)).
Int64("manager_frames_dropped", managerStats["frames_dropped"].(int64)).
Float64("manager_latency_ms", managerStats["average_latency_ms"].(float64)).
Int64("ipc_frames_sent", ipcStats["frames_sent"].(int64)).
Int64("ipc_frames_dropped", ipcStats["frames_dropped"].(int64)).
Float64("ipc_latency_ms", ipcStats["average_latency_ms"].(float64)).
Float64("client_drop_rate", detailedStats["client_drop_rate"].(float64)).
Float64("frames_per_second", detailedStats["frames_per_second"].(float64)).
Msg("Audio input performance metrics")
} }
// IsRunning returns whether the audio input manager is running // IsRunning returns whether the audio input manager is running

View File

@ -0,0 +1,94 @@
package audio
import (
"sync/atomic"
"unsafe"
)
var (
	// globalInputManager is the process-wide audio input manager, stored as an
	// unsafe.Pointer so it can be loaded and replaced with sync/atomic.
	// It is nil until getAudioInputManager creates the manager, and is set
	// back to nil by ResetAudioInputManagers.
	globalInputManager unsafe.Pointer // *AudioInputManager
)
// AudioInputInterface defines the common interface for audio input managers.
// It is the type returned by getAudioInputManager and used by the
// package-level StartAudioInput / StopAudioInput / WriteAudioInputFrame /
// IsAudioInputRunning / GetAudioInputMetrics helpers.
type AudioInputInterface interface {
	// Start starts the audio input system and reports any startup error.
	Start() error
	// Stop stops the audio input system.
	Stop()
	// WriteOpusFrame hands one Opus frame to the audio input system.
	WriteOpusFrame(frame []byte) error
	// IsRunning reports whether the manager is currently running.
	IsRunning() bool
	// GetMetrics returns a snapshot of the manager's metrics.
	GetMetrics() AudioInputMetrics
}
// GetSupervisor returns the audio input supervisor for advanced management.
// It delegates to the manager's IPC manager; m.ipcManager must be non-nil
// (NewAudioInputManager always sets it).
func (m *AudioInputManager) GetSupervisor() *AudioInputSupervisor {
	return m.ipcManager.GetSupervisor()
}
// getAudioInputManager returns the global audio input manager, creating it on
// first use. Creation is raced with compare-and-swap: whichever goroutine
// installs its manager first wins, and every other caller returns the
// installed one.
func getAudioInputManager() AudioInputInterface {
	if existing := atomic.LoadPointer(&globalInputManager); existing != nil {
		return (*AudioInputManager)(existing)
	}

	candidate := NewAudioInputManager()
	if atomic.CompareAndSwapPointer(&globalInputManager, nil, unsafe.Pointer(candidate)) {
		return candidate
	}

	// Lost the race: another goroutine installed a manager first, use theirs.
	return (*AudioInputManager)(atomic.LoadPointer(&globalInputManager))
}
// StartAudioInput starts the global audio input manager (creating it if
// needed) and returns the manager's Start error.
func StartAudioInput() error {
	return getAudioInputManager().Start()
}
// StopAudioInput stops the global audio input manager.
func StopAudioInput() {
	getAudioInputManager().Stop()
}
// WriteAudioInputFrame forwards one Opus frame to the global audio input
// manager and returns the manager's WriteOpusFrame error.
func WriteAudioInputFrame(frame []byte) error {
	return getAudioInputManager().WriteOpusFrame(frame)
}
// IsAudioInputRunning reports whether the global audio input manager is
// running.
func IsAudioInputRunning() bool {
	return getAudioInputManager().IsRunning()
}
// GetAudioInputMetrics returns the current metrics of the global audio input
// manager.
func GetAudioInputMetrics() AudioInputMetrics {
	return getAudioInputManager().GetMetrics()
}
// GetAudioInputIPCSupervisor returns the IPC supervisor of the global audio
// input manager, or nil when no manager has been created yet. Unlike the
// other package-level helpers it never creates the manager.
func GetAudioInputIPCSupervisor() *AudioInputSupervisor {
	if ptr := atomic.LoadPointer(&globalInputManager); ptr != nil {
		return (*AudioInputManager)(ptr).GetSupervisor()
	}
	return nil
}
// Helper functions
// ResetAudioInputManagers resets the global manager (for testing).
//
// The pointer is detached with a single atomic swap, so the manager that gets
// stopped is exactly the one that was removed. A separate load followed by a
// store could clear a manager installed in between by another goroutine
// without ever stopping it.
func ResetAudioInputManagers() {
	previous := atomic.SwapPointer(&globalInputManager, nil)
	if previous == nil {
		return // nothing was installed
	}

	// Stop the detached manager; later callers will lazily create a new one.
	(*AudioInputManager)(previous).Stop()
}

689
internal/audio/input_ipc.go Normal file
View File

@ -0,0 +1,689 @@
package audio
import (
"context"
"encoding/binary"
"fmt"
"io"
"net"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"
)
// IPC protocol constants for the audio input socket.
const (
	// inputMagicNumber opens every message header; readMessage rejects any
	// message whose magic differs.
	inputMagicNumber uint32 = 0x4A4B4D49 // "JKMI" (JetKVM Microphone Input)
	// inputSocketName is the socket file name (presumably joined onto a
	// directory by getInputSocketPath — not visible here).
	inputSocketName = "audio_input.sock"
	// maxFrameSize bounds the payload length in bytes; enforced by readMessage
	// on receive and by AudioInputClient.SendFrame on send.
	maxFrameSize     = 4096                 // Maximum Opus frame size
	writeTimeout     = 5 * time.Millisecond // Non-blocking write timeout
	maxDroppedFrames = 100                  // Maximum consecutive dropped frames before reconnect
)
// InputMessageType represents the type of IPC message. It is carried as a
// single byte in the message header.
type InputMessageType uint8

// Message types. The numeric values are part of the wire format (they are
// written as header byte 4), so the order of this list must not change.
const (
	// InputMessageTypeOpusFrame carries one Opus frame as payload.
	InputMessageTypeOpusFrame InputMessageType = iota
	// InputMessageTypeConfig carries a serialized InputIPCConfig.
	InputMessageTypeConfig
	// InputMessageTypeStop is sent by the client when it disconnects.
	InputMessageTypeStop
	// InputMessageTypeHeartbeat is answered by the server with an Ack.
	InputMessageTypeHeartbeat
	// InputMessageTypeAck is the server's acknowledgment message.
	InputMessageTypeAck
)
// InputIPCMessage represents a message sent over IPC.
//
// On the wire a message is a 17-byte little-endian header (Magic: 4 bytes,
// Type: 1 byte, Length: 4 bytes, Timestamp: 8 bytes) followed by Length bytes
// of payload.
type InputIPCMessage struct {
	Magic     uint32           // must equal inputMagicNumber
	Type      InputMessageType // selects how the payload is processed
	Length    uint32           // payload size in bytes; at most maxFrameSize
	Timestamp int64            // senders in this file use time.Now().UnixNano()
	Data      []byte           // payload; nil when Length is 0
}
// InputIPCConfig represents configuration for audio input.
// AudioInputClient.SendConfig serializes the three fields, in this order, as
// little-endian uint32 values (12 bytes total).
type InputIPCConfig struct {
	SampleRate int // presumably in Hz — units are not established in this file
	Channels   int
	FrameSize  int
}
// AudioInputServer handles IPC communication for audio input processing.
// It listens on a unix socket, keeps at most one active client connection,
// and runs background goroutines that are stopped via stopChan.
type AudioInputServer struct {
	// Atomic fields must be first for proper alignment on ARM
	// (do not reorder: 64-bit atomic access requires 8-byte alignment).
	bufferSize     int64 // Current buffer size (atomic)
	processingTime int64 // Average processing time in nanoseconds (atomic)
	droppedFrames  int64 // Dropped frames counter (atomic)
	totalFrames    int64 // Total frames counter (atomic)

	listener net.Listener // unix socket listener created by NewAudioInputServer
	conn     net.Conn     // most recently accepted client connection, nil if none
	mtx      sync.Mutex   // held by Start, Stop, sendAck and when replacing conn
	running  bool         // set by Start, cleared by Stop

	// Triple-goroutine architecture
	messageChan chan *InputIPCMessage // Buffered channel for incoming messages
	processChan chan *InputIPCMessage // Buffered channel for processing queue
	stopChan    chan struct{}         // Stop signal for all goroutines
	wg          sync.WaitGroup        // Wait group for goroutine coordination
}
// NewAudioInputServer creates an audio input server listening on the unix
// socket returned by getInputSocketPath. Any socket file left at that path is
// removed first. The server is returned stopped; call Start to run it.
func NewAudioInputServer() (*AudioInputServer, error) {
	// Initial capacity (in messages) of both queues; the starting point of the
	// adaptive buffer size.
	const initialBufferSize int64 = 1000

	path := getInputSocketPath()

	// Best effort: a leftover socket file would make Listen fail.
	_ = os.Remove(path)

	ln, err := net.Listen("unix", path)
	if err != nil {
		return nil, fmt.Errorf("failed to create unix socket: %w", err)
	}

	server := &AudioInputServer{
		bufferSize:  initialBufferSize,
		listener:    ln,
		messageChan: make(chan *InputIPCMessage, initialBufferSize),
		processChan: make(chan *InputIPCMessage, initialBufferSize),
		stopChan:    make(chan struct{}),
	}
	return server, nil
}
// Start marks the server as running, launches its reader, processor and
// monitor goroutines, and begins accepting client connections in the
// background. It returns an error if the server is already running.
func (ais *AudioInputServer) Start() error {
	ais.mtx.Lock()
	defer ais.mtx.Unlock()

	if ais.running {
		return fmt.Errorf("server already running")
	}
	ais.running = true

	// Triple-goroutine architecture, launched in a fixed order.
	launchers := []func(){
		ais.startReaderGoroutine,
		ais.startProcessorGoroutine,
		ais.startMonitorGoroutine,
	}
	for _, launch := range launchers {
		launch()
	}

	// Connections are accepted on their own goroutine.
	go ais.acceptConnections()

	return nil
}
// Stop stops the audio input server: it signals every goroutine through
// stopChan, closes the client connection and the listener, and then waits for
// the goroutines tracked by the wait group. Calling Stop on a server that is
// not running is a no-op.
//
// The mutex is released before waiting, and the connection and listener are
// closed before waiting, for two reasons:
//   - sendAck (reached from processMessage) takes the same mutex, so waiting
//     on the wait group while holding it can deadlock;
//   - a goroutine blocked in a read or Accept is only woken by closing the
//     underlying connection or listener, so closing after the wait can hang.
func (ais *AudioInputServer) Stop() {
	ais.mtx.Lock()
	if !ais.running {
		ais.mtx.Unlock()
		return
	}
	ais.running = false

	// Signal all goroutines to stop
	close(ais.stopChan)

	// Detach the resources under the lock; close them outside it.
	conn, listener := ais.conn, ais.listener
	ais.conn = nil
	ais.mtx.Unlock()

	if conn != nil {
		conn.Close()
	}
	if listener != nil {
		listener.Close()
	}

	ais.wg.Wait()
}
// Close closes the server and cleans up resources: it stops the server and
// then removes the socket file at getInputSocketPath. The error from
// os.Remove is ignored.
func (ais *AudioInputServer) Close() {
	ais.Stop()
	// Remove socket file
	os.Remove(getInputSocketPath())
}
// acceptConnections accepts incoming client connections until the server is
// stopped. Each accepted connection replaces (and closes) the previous one and
// gets its own handleConnection goroutine.
//
// The running flag is always read under the mutex, because Start and Stop
// write it under the mutex. After a failed Accept the loop backs off briefly
// instead of retrying immediately, so a persistent listener error cannot spin
// a CPU core.
func (ais *AudioInputServer) acceptConnections() {
	// Pause between retries after a failed Accept.
	const acceptRetryDelay = 50 * time.Millisecond

	isRunning := func() bool {
		ais.mtx.Lock()
		defer ais.mtx.Unlock()
		return ais.running
	}

	for isRunning() {
		conn, err := ais.listener.Accept()
		if err != nil {
			if !isRunning() {
				return // listener closed by Stop
			}
			select {
			case <-ais.stopChan:
				return
			case <-time.After(acceptRetryDelay):
			}
			continue
		}

		ais.mtx.Lock()
		if !ais.running {
			// Stopped while Accept was returning: do not install the connection.
			ais.mtx.Unlock()
			conn.Close()
			return
		}
		// Close existing connection if any
		if ais.conn != nil {
			ais.conn.Close()
		}
		ais.conn = conn
		ais.mtx.Unlock()

		// Handle this connection
		go ais.handleConnection(conn)
	}
}
// handleConnection watches a single client connection and closes it when it
// is no longer the server's active connection or when the server stops.
// Reading from the connection is done by the reader goroutine, not here.
//
// The active connection is read under the mutex (it is written under the
// mutex elsewhere), and the handler exits as soon as ais.conn is anything
// other than conn. Checking only for nil would leave the handler of a
// replaced connection polling until the whole server stops.
func (ais *AudioInputServer) handleConnection(conn net.Conn) {
	defer conn.Close()

	poll := time.NewTicker(100 * time.Millisecond)
	defer poll.Stop()

	for {
		select {
		case <-ais.stopChan:
			return
		case <-poll.C:
			ais.mtx.Lock()
			active := ais.conn
			ais.mtx.Unlock()

			// Connections from net.Listener are pointer-backed, so interface
			// comparison is an identity check.
			if active != conn {
				return
			}
		}
	}
}
// readMessage reads one complete message from conn: a 17-byte little-endian
// header (magic, type, length, timestamp) followed by length bytes of payload.
// It returns an error if the read fails, the magic number is wrong, or the
// declared length exceeds maxFrameSize.
func (ais *AudioInputServer) readMessage(conn net.Conn) (*InputIPCMessage, error) {
	const headerSize = 4 + 1 + 4 + 8 // uint32 + uint8 + uint32 + int64

	header := make([]byte, headerSize)
	if _, err := io.ReadFull(conn, header); err != nil {
		return nil, err
	}

	magic := binary.LittleEndian.Uint32(header[0:4])
	if magic != inputMagicNumber {
		return nil, fmt.Errorf("invalid magic number: %x", magic)
	}

	length := binary.LittleEndian.Uint32(header[5:9])
	if length > maxFrameSize {
		return nil, fmt.Errorf("message too large: %d bytes", length)
	}

	msg := &InputIPCMessage{
		Magic:     magic,
		Type:      InputMessageType(header[4]),
		Length:    length,
		Timestamp: int64(binary.LittleEndian.Uint64(header[9:17])),
	}
	if length == 0 {
		return msg, nil // header-only message
	}

	msg.Data = make([]byte, length)
	if _, err := io.ReadFull(conn, msg.Data); err != nil {
		return nil, err
	}
	return msg, nil
}
// processMessage dispatches a received message by type. Opus frames and
// config updates go to their handlers, a heartbeat is answered with an ack,
// and a stop message or an unknown type is reported as an error.
func (ais *AudioInputServer) processMessage(msg *InputIPCMessage) error {
	switch kind := msg.Type; kind {
	case InputMessageTypeHeartbeat:
		return ais.sendAck()
	case InputMessageTypeOpusFrame:
		return ais.processOpusFrame(msg.Data)
	case InputMessageTypeConfig:
		return ais.processConfig(msg.Data)
	case InputMessageTypeStop:
		return fmt.Errorf("stop message received")
	default:
		return fmt.Errorf("unknown message type: %d", kind)
	}
}
// processOpusFrame hands a non-empty Opus frame to CGOAudioDecodeWrite and
// returns its error. Empty frames are ignored.
func (ais *AudioInputServer) processOpusFrame(data []byte) error {
	if len(data) > 0 {
		// Process the Opus frame using CGO
		_, err := CGOAudioDecodeWrite(data)
		return err
	}
	return nil
}
// processConfig processes a configuration update.
// The payload in data is currently not parsed or applied; the server only
// replies with an ack and returns sendAck's error.
func (ais *AudioInputServer) processConfig(data []byte) error {
	// For now, just acknowledge the config
	// TODO: Parse and apply configuration
	return ais.sendAck()
}
// sendAck writes a header-only acknowledgment message to the current client
// connection. It holds the server mutex for the duration of the write and
// returns an error when there is no connection.
func (ais *AudioInputServer) sendAck() error {
	ais.mtx.Lock()
	defer ais.mtx.Unlock()

	conn := ais.conn
	if conn == nil {
		return fmt.Errorf("no connection")
	}

	return ais.writeMessage(conn, &InputIPCMessage{
		Magic:     inputMagicNumber,
		Type:      InputMessageTypeAck,
		Timestamp: time.Now().UnixNano(),
	})
}
// writeMessage serializes msg and writes it to conn.
//
// The wire format is a 17-byte little-endian header (magic, type, length,
// timestamp) followed by the payload. Header and payload are assembled into
// one buffer and sent with a single Write call, which saves a syscall and
// avoids emitting a header whose payload is never written.
//
// It returns an error when msg.Length does not match len(msg.Data), because
// the receiver reads exactly Length payload bytes and a mismatch would
// desynchronize the stream. It also returns any error from conn.Write.
func (ais *AudioInputServer) writeMessage(conn net.Conn, msg *InputIPCMessage) error {
	const headerSize = 4 + 1 + 4 + 8

	if uint64(len(msg.Data)) != uint64(msg.Length) {
		return fmt.Errorf("message length mismatch: header declares %d bytes, payload has %d",
			msg.Length, len(msg.Data))
	}

	frame := make([]byte, headerSize, headerSize+len(msg.Data))
	binary.LittleEndian.PutUint32(frame[0:4], msg.Magic)
	frame[4] = byte(msg.Type)
	binary.LittleEndian.PutUint32(frame[5:9], msg.Length)
	binary.LittleEndian.PutUint64(frame[9:17], uint64(msg.Timestamp))
	frame = append(frame, msg.Data...)

	_, err := conn.Write(frame)
	return err
}
// AudioInputClient handles IPC communication from the main process.
// It holds a single connection to the audio input server; the methods in this
// file take mtx before touching conn or running.
type AudioInputClient struct {
	// Atomic fields must be first for proper alignment on ARM
	// (do not reorder: 64-bit atomic access requires 8-byte alignment).
	droppedFrames int64 // Atomic counter for dropped frames
	totalFrames   int64 // Atomic counter for total frames

	conn    net.Conn   // connection to the server; nil when disconnected
	mtx     sync.Mutex // guards conn and running
	running bool       // true between a successful Connect and Disconnect
}
// NewAudioInputClient creates a new audio input client.
// The client starts disconnected; call Connect before sending.
func NewAudioInputClient() *AudioInputClient {
	return &AudioInputClient{}
}
// Connect connects to the audio input server over its unix socket.
//
// The server may not be listening yet, so the dial is attempted up to five
// times, one second apart. Connect returns nil if the client is already
// connected or a dial succeeds. Otherwise it returns an error that wraps the
// last dial error, so callers can see why the connection failed.
//
// The client mutex is held for the whole call, including the waits.
func (aic *AudioInputClient) Connect() error {
	const (
		maxAttempts = 5
		retryDelay  = time.Second
	)

	aic.mtx.Lock()
	defer aic.mtx.Unlock()

	if aic.running {
		return nil // Already connected
	}

	socketPath := getInputSocketPath()

	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		conn, err := net.Dial("unix", socketPath)
		if err == nil {
			aic.conn = conn
			aic.running = true
			return nil
		}
		lastErr = err

		// Only wait when another attempt follows; sleeping after the final
		// failure would just delay the error.
		if attempt < maxAttempts {
			time.Sleep(retryDelay)
		}
	}

	return fmt.Errorf("failed to connect to audio input server: %w", lastErr)
}
// Disconnect marks the client as stopped, sends a best-effort stop message to
// the server, and closes the connection. It is a no-op when the client is not
// running.
func (aic *AudioInputClient) Disconnect() {
	aic.mtx.Lock()
	defer aic.mtx.Unlock()

	if !aic.running {
		return
	}
	aic.running = false

	if aic.conn == nil {
		return
	}

	stop := &InputIPCMessage{
		Magic:     inputMagicNumber,
		Type:      InputMessageTypeStop,
		Length:    0,
		Timestamp: time.Now().UnixNano(),
	}
	// The connection is closed right after, so a failed write is ignored.
	_ = aic.writeMessage(stop)

	aic.conn.Close()
	aic.conn = nil
}
// SendFrame wraps frame in an Opus-frame message and writes it to the server.
// It returns an error when the client is not connected or the frame exceeds
// maxFrameSize; an empty frame is accepted and silently skipped.
func (aic *AudioInputClient) SendFrame(frame []byte) error {
	aic.mtx.Lock()
	defer aic.mtx.Unlock()

	connected := aic.running && aic.conn != nil
	switch size := len(frame); {
	case !connected:
		return fmt.Errorf("not connected")
	case size == 0:
		return nil // Nothing to send
	case size > maxFrameSize:
		return fmt.Errorf("frame too large: %d bytes", size)
	}

	return aic.writeMessage(&InputIPCMessage{
		Magic:     inputMagicNumber,
		Type:      InputMessageTypeOpusFrame,
		Length:    uint32(len(frame)),
		Timestamp: time.Now().UnixNano(),
		Data:      frame,
	})
}
// SendConfig encodes config as three little-endian 32-bit values
// (sample rate, channels, frame size) and sends it as a config message.
// It fails with "not connected" when there is no live connection.
func (aic *AudioInputClient) SendConfig(config InputIPCConfig) error {
	aic.mtx.Lock()
	defer aic.mtx.Unlock()

	if connected := aic.running && aic.conn != nil; !connected {
		return fmt.Errorf("not connected")
	}

	// 12-byte payload: SampleRate | Channels | FrameSize.
	var payload [12]byte
	binary.LittleEndian.PutUint32(payload[0:4], uint32(config.SampleRate))
	binary.LittleEndian.PutUint32(payload[4:8], uint32(config.Channels))
	binary.LittleEndian.PutUint32(payload[8:12], uint32(config.FrameSize))

	return aic.writeMessage(&InputIPCMessage{
		Magic:     inputMagicNumber,
		Type:      InputMessageTypeConfig,
		Length:    uint32(len(payload)),
		Timestamp: time.Now().UnixNano(),
		Data:      payload[:],
	})
}
// SendHeartbeat writes a payload-less heartbeat message to the server.
// It fails with "not connected" when there is no live connection.
func (aic *AudioInputClient) SendHeartbeat() error {
	aic.mtx.Lock()
	defer aic.mtx.Unlock()

	if connected := aic.running && aic.conn != nil; !connected {
		return fmt.Errorf("not connected")
	}

	heartbeat := InputIPCMessage{
		Magic:     inputMagicNumber,
		Type:      InputMessageTypeHeartbeat,
		Length:    0,
		Timestamp: time.Now().UnixNano(),
	}
	return aic.writeMessage(&heartbeat)
}
// writeMessage serializes msg (17-byte little-endian header followed by the
// optional payload) and writes it to the server connection, giving up after
// writeTimeout.
//
// Every call increments totalFrames; every failed or timed-out write
// increments droppedFrames. A timeout is reported as
// "write timeout - frame dropped"; any other write error is returned as-is.
//
// The write runs synchronously under a connection write deadline. No
// goroutine outlives the call, so nothing can keep writing to the socket, or
// reading msg.Data, after the caller has released its lock.
//
// Callers in this file invoke it with aic.mtx held and aic.conn non-nil.
// NOTE(review): if the deadline fires after part of the message was written,
// the stream is left truncated — callers should probably reconnect after a
// timeout; confirm the desired policy.
func (aic *AudioInputClient) writeMessage(msg *InputIPCMessage) error {
	// Increment total frames counter
	atomic.AddInt64(&aic.totalFrames, 1)

	// Header layout: magic(4) | type(1) | length(4) | timestamp(8).
	const headerSize = 4 + 1 + 4 + 8
	payloadLen := 0
	if msg.Length > 0 && msg.Data != nil {
		payloadLen = len(msg.Data)
	}

	// Header and payload go out in a single Write so another writer can
	// never slip bytes in between them.
	buf := make([]byte, headerSize, headerSize+payloadLen)
	binary.LittleEndian.PutUint32(buf[0:4], msg.Magic)
	buf[4] = byte(msg.Type)
	binary.LittleEndian.PutUint32(buf[5:9], msg.Length)
	binary.LittleEndian.PutUint64(buf[9:17], uint64(msg.Timestamp))
	if payloadLen > 0 {
		buf = append(buf, msg.Data...)
	}

	// The time budget is still expressed as a context timeout; its deadline
	// is handed to the connection so the kernel write itself is bounded.
	ctx, cancel := context.WithTimeout(context.Background(), writeTimeout)
	defer cancel()
	deadline, _ := ctx.Deadline()

	conn := aic.conn
	if err := conn.SetWriteDeadline(deadline); err != nil {
		atomic.AddInt64(&aic.droppedFrames, 1)
		return err
	}
	// Clear the deadline so it does not leak into later writes on this conn.
	defer func() { _ = conn.SetWriteDeadline(time.Time{}) }()

	if _, err := conn.Write(buf); err != nil {
		atomic.AddInt64(&aic.droppedFrames, 1)
		if ne, ok := err.(net.Error); ok && ne.Timeout() {
			// Timeout occurred - drop frame to prevent blocking
			return fmt.Errorf("write timeout - frame dropped")
		}
		return err
	}
	return nil
}
// IsConnected reports whether the client is marked running and currently
// holds a connection. The check is made under the client mutex.
func (aic *AudioInputClient) IsConnected() bool {
	aic.mtx.Lock()
	connected := aic.running && aic.conn != nil
	aic.mtx.Unlock()
	return connected
}
// GetFrameStats returns the total and dropped counters. Each counter is
// loaded atomically, but the pair is not read as one snapshot.
func (aic *AudioInputClient) GetFrameStats() (total, dropped int64) {
	total = atomic.LoadInt64(&aic.totalFrames)
	dropped = atomic.LoadInt64(&aic.droppedFrames)
	return total, dropped
}
// GetDropRate returns dropped/total as a percentage (0–100 scale), or 0 when
// no message has been counted yet.
func (aic *AudioInputClient) GetDropRate() float64 {
	sent := atomic.LoadInt64(&aic.totalFrames)
	lost := atomic.LoadInt64(&aic.droppedFrames)
	if sent != 0 {
		return float64(lost) / float64(sent) * 100.0
	}
	return 0.0
}
// ResetStats zeroes the total and dropped frame counters. Each store is
// atomic, but the two are not reset as a single unit, so a concurrent reader
// may briefly observe one counter reset and the other not.
func (aic *AudioInputClient) ResetStats() {
	atomic.StoreInt64(&aic.totalFrames, 0)
	atomic.StoreInt64(&aic.droppedFrames, 0)
}
// startReaderGoroutine launches the goroutine that reads messages from the
// current connection and forwards them to messageChan.
//
// Forwarding never blocks: when messageChan is full the message is discarded
// and droppedFrames is incremented; otherwise totalFrames is incremented.
// The goroutine exits once stopChan becomes ready and is tracked by ais.wg.
//
// While there is no connection, or after a read error, the loop pauses
// briefly before trying again instead of spinning a CPU core.
func (ais *AudioInputServer) startReaderGoroutine() {
	ais.wg.Add(1)
	go func() {
		defer ais.wg.Done()

		const retryDelay = 10 * time.Millisecond

		// pause waits for retryDelay and reports false if a stop was
		// requested in the meantime.
		pause := func() bool {
			timer := time.NewTimer(retryDelay)
			defer timer.Stop()
			select {
			case <-ais.stopChan:
				return false
			case <-timer.C:
				return true
			}
		}

		for {
			// Check for a stop request before every read.
			select {
			case <-ais.stopChan:
				return
			default:
			}

			// Use one snapshot so the nil check and the read see the same value.
			// NOTE(review): ais.conn is read without synchronization here, as
			// in the original code — confirm how writers publish it.
			conn := ais.conn
			if conn == nil {
				if !pause() {
					return
				}
				continue
			}

			msg, err := ais.readMessage(conn)
			if err != nil {
				// Connection error: back off, then retry.
				if !pause() {
					return
				}
				continue
			}

			// Send to message channel with non-blocking write
			select {
			case ais.messageChan <- msg:
				atomic.AddInt64(&ais.totalFrames, 1)
			default:
				// Channel full, drop message
				atomic.AddInt64(&ais.droppedFrames, 1)
			}
		}
	}()
}
// startProcessorGoroutine launches the goroutine that moves messages from
// messageChan to processChan. For Opus frames it first evicts the oldest
// queued entry when processChan is more than three quarters of bufferSize
// full. A message that still does not fit is dropped. Every eviction and
// drop increments droppedFrames. The goroutine ends when stopChan is ready.
func (ais *AudioInputServer) startProcessorGoroutine() {
	ais.wg.Add(1)
	go func() {
		defer ais.wg.Done()
		for {
			select {
			case <-ais.stopChan:
				return

			case msg := <-ais.messageChan:
				// Prefer fresh audio: make room by discarding the oldest entry
				// once the queue is over 75% of the current buffer size.
				if msg.Type == InputMessageTypeOpusFrame &&
					len(ais.processChan) > int(atomic.LoadInt64(&ais.bufferSize))*3/4 {
					select {
					case <-ais.processChan:
						atomic.AddInt64(&ais.droppedFrames, 1)
					default:
					}
				}

				// Hand the message over without blocking.
				select {
				case ais.processChan <- msg:
				default:
					// Queue is full: count the message as dropped.
					atomic.AddInt64(&ais.droppedFrames, 1)
				}
			}
		}
	}()
}
// startMonitorGoroutine launches the goroutine that wakes every 100ms,
// processes everything currently queued on processChan, and then re-tunes
// bufferSize from the running average processing time: the size doubles
// (capped at 1000) when the average exceeds 10ms and halves (floored at 50)
// when it is under 1ms. The goroutine ends when stopChan is ready.
func (ais *AudioInputServer) startMonitorGoroutine() {
	ais.wg.Add(1)
	go func() {
		defer ais.wg.Done()

		tick := time.NewTicker(100 * time.Millisecond)
		defer tick.Stop()

		for {
			select {
			case <-ais.stopChan:
				return
			case <-tick.C:
			}

			// Drain the processing queue until it is momentarily empty.
		drain:
			for {
				select {
				case msg := <-ais.processChan:
					began := time.Now()
					procErr := ais.processMessage(msg)
					elapsed := time.Since(began).Nanoseconds()

					// Running average: midpoint of the previous average and this sample.
					previous := atomic.LoadInt64(&ais.processingTime)
					atomic.StoreInt64(&ais.processingTime, (previous+elapsed)/2)

					if procErr != nil {
						atomic.AddInt64(&ais.droppedFrames, 1)
					}
				default:
					break drain
				}
			}

			// Adaptive buffer sizing based on the average processing time (ns).
			avg := atomic.LoadInt64(&ais.processingTime)
			size := atomic.LoadInt64(&ais.bufferSize)
			switch {
			case avg > 10*1000*1000: // slower than 10ms: grow
				grown := size * 2
				if grown > 1000 {
					grown = 1000
				}
				atomic.StoreInt64(&ais.bufferSize, grown)
			case avg < 1*1000*1000: // faster than 1ms: shrink
				shrunk := size / 2
				if shrunk < 50 {
					shrunk = 50
				}
				atomic.StoreInt64(&ais.bufferSize, shrunk)
			}
		}
	}()
}
// GetServerStats returns the server counters: total and dropped frames, the
// running average processing time, and the current adaptive buffer size.
// Each value is loaded atomically on its own; the set is not one snapshot.
func (ais *AudioInputServer) GetServerStats() (total, dropped int64, avgProcessingTime time.Duration, bufferSize int64) {
	total = atomic.LoadInt64(&ais.totalFrames)
	dropped = atomic.LoadInt64(&ais.droppedFrames)
	avgProcessingTime = time.Duration(atomic.LoadInt64(&ais.processingTime))
	bufferSize = atomic.LoadInt64(&ais.bufferSize)
	return total, dropped, avgProcessingTime, bufferSize
}
// Helper functions

// getInputSocketPath returns the unix socket path for audio input IPC: the
// value of JETKVM_AUDIO_INPUT_SOCKET when it is non-empty, otherwise
// inputSocketName under /var/run.
func getInputSocketPath() string {
	override := os.Getenv("JETKVM_AUDIO_INPUT_SOCKET")
	if override == "" {
		return filepath.Join("/var/run", inputSocketName)
	}
	return override
}
// isAudioInputIPCEnabled reports whether IPC mode is enabled. IPC is on by
// default; it is off only when JETKVM_AUDIO_INPUT_IPC is exactly "false".
func isAudioInputIPCEnabled() bool {
	return os.Getenv("JETKVM_AUDIO_INPUT_IPC") != "false"
}

View File

@ -0,0 +1,190 @@
package audio
import (
"sync/atomic"
"time"
"github.com/jetkvm/kvm/internal/logging"
"github.com/rs/zerolog"
)
// AudioInputIPCManager manages microphone input using IPC when enabled.
// It forwards Opus frames to the audio input subprocess through an
// AudioInputSupervisor and keeps send-side metrics.
type AudioInputIPCManager struct {
	// metrics MUST be first for ARM32 alignment (contains int64 fields)
	metrics AudioInputMetrics

	supervisor *AudioInputSupervisor // owns the subprocess and its IPC client
	logger     zerolog.Logger        // tagged with component "audio-input-ipc"
	running    int32                 // 1 while started; accessed with sync/atomic
}
// NewAudioInputIPCManager builds a stopped manager with a fresh supervisor
// and a logger tagged with the "audio-input-ipc" component.
func NewAudioInputIPCManager() *AudioInputIPCManager {
	manager := new(AudioInputIPCManager)
	manager.supervisor = NewAudioInputSupervisor()
	manager.logger = logging.GetDefaultLogger().With().Str("component", "audio-input-ipc").Logger()
	return manager
}
// Start brings up the IPC audio input path: it starts the supervisor, waits
// one second for the subprocess, then sends the initial configuration
// (48kHz, 2 channels, 960-sample frames). Calling Start while already
// running is a no-op. A supervisor start failure is returned and leaves the
// manager stopped; a failure to send the configuration is only logged.
func (aim *AudioInputIPCManager) Start() error {
	if !atomic.CompareAndSwapInt32(&aim.running, 0, 1) {
		return nil // Already running
	}
	aim.logger.Info().Msg("Starting IPC-based audio input system")

	// The supervisor launches the subprocess.
	if err := aim.supervisor.Start(); err != nil {
		atomic.StoreInt32(&aim.running, 0)
		return err
	}

	initial := InputIPCConfig{
		SampleRate: 48000,
		Channels:   2,
		FrameSize:  960, // 20ms at 48kHz
	}

	// Give the subprocess a moment to come up before configuring it.
	time.Sleep(time.Second)

	if err := aim.supervisor.SendConfig(initial); err != nil {
		// Config errors do not fail startup.
		aim.logger.Warn().Err(err).Msg("Failed to send initial config to audio input server")
	}

	aim.logger.Info().Msg("IPC-based audio input system started")
	return nil
}
// Stop flips the manager to stopped and shuts the supervisor down. It does
// nothing when the manager is not running.
func (aim *AudioInputIPCManager) Stop() {
	wasRunning := atomic.CompareAndSwapInt32(&aim.running, 1, 0)
	if !wasRunning {
		return // Already stopped
	}

	aim.logger.Info().Msg("Stopping IPC-based audio input system")
	aim.supervisor.Stop()
	aim.logger.Info().Msg("IPC-based audio input system stopped")
}
// WriteOpusFrame forwards one Opus frame to the audio input server and
// updates the send metrics. Frames are silently ignored (nil error) while
// the manager is stopped or when the frame is empty. A send failure counts
// as a dropped frame and is returned; a success feeds the time spent in the
// send into the average latency.
func (aim *AudioInputIPCManager) WriteOpusFrame(frame []byte) error {
	if atomic.LoadInt32(&aim.running) == 0 || len(frame) == 0 {
		return nil
	}

	sentAt := time.Now()

	atomic.AddInt64(&aim.metrics.FramesSent, 1)
	atomic.AddInt64(&aim.metrics.BytesProcessed, int64(len(frame)))
	aim.metrics.LastFrameTime = sentAt

	if err := aim.supervisor.SendFrame(frame); err != nil {
		atomic.AddInt64(&aim.metrics.FramesDropped, 1)
		aim.logger.Debug().Err(err).Msg("Failed to send frame via IPC")
		return err
	}

	aim.updateLatencyMetrics(time.Since(sentAt))
	return nil
}
// IsRunning reports whether the IPC audio input system is running, i.e. the
// atomic running flag is 1 (set by Start, cleared by Stop or a failed Start).
func (aim *AudioInputIPCManager) IsRunning() bool {
	return atomic.LoadInt32(&aim.running) == 1
}
// GetMetrics returns a copy of the current metrics. The counters are loaded
// atomically; AverageLatency and LastFrameTime are plain reads of the fields
// maintained by WriteOpusFrame.
func (aim *AudioInputIPCManager) GetMetrics() AudioInputMetrics {
	var snapshot AudioInputMetrics
	snapshot.FramesSent = atomic.LoadInt64(&aim.metrics.FramesSent)
	snapshot.FramesDropped = atomic.LoadInt64(&aim.metrics.FramesDropped)
	snapshot.BytesProcessed = atomic.LoadInt64(&aim.metrics.BytesProcessed)
	snapshot.ConnectionDrops = atomic.LoadInt64(&aim.metrics.ConnectionDrops)
	snapshot.AverageLatency = aim.metrics.AverageLatency
	snapshot.LastFrameTime = aim.metrics.LastFrameTime
	return snapshot
}
// updateLatencyMetrics folds latency into AverageLatency using an
// exponential moving average with alpha = 0.1. While the stored average is
// zero, the sample is taken as-is.
func (aim *AudioInputIPCManager) updateLatencyMetrics(latency time.Duration) {
	previous := aim.metrics.AverageLatency
	updated := latency
	if previous != 0 {
		updated = time.Duration(float64(previous)*0.9 + float64(latency)*0.1)
	}
	aim.metrics.AverageLatency = updated
}
// GetDetailedMetrics returns the basic metrics together with a map of extra
// figures: the IPC client's frame counters and drop rate, a server status
// entry ("running" or "stopped", taken from the supervisor), the average IPC
// latency in milliseconds, and the value of calculateFrameRate.
func (aim *AudioInputIPCManager) GetDetailedMetrics() (AudioInputMetrics, map[string]interface{}) {
	metrics := aim.GetMetrics()

	// Client-side counters stay at zero when there is no client.
	var (
		clientTotal, clientDropped int64
		clientDropRate             float64
	)
	if client := aim.supervisor.GetClient(); client != nil {
		clientTotal, clientDropped = client.GetFrameStats()
		clientDropRate = client.GetDropRate()
	}

	// Note: real server stats would need to be exposed through IPC; only
	// the supervisor's view of the subprocess is reported here.
	status := "stopped"
	if aim.supervisor.IsRunning() {
		status = "running"
	}
	serverStats := map[string]interface{}{"status": status}

	detailed := make(map[string]interface{}, 6)
	detailed["client_total_frames"] = clientTotal
	detailed["client_dropped_frames"] = clientDropped
	detailed["client_drop_rate"] = clientDropRate
	detailed["server_stats"] = serverStats
	detailed["ipc_latency_ms"] = float64(metrics.AverageLatency.Nanoseconds()) / 1e6
	detailed["frames_per_second"] = aim.calculateFrameRate()
	return metrics, detailed
}
// calculateFrameRate returns a nominal frame rate: 0 until a frame has been
// sent, and a fixed 50 afterwards. It is a placeholder estimate, not a
// measurement; a real one would track frames over time windows.
func (aim *AudioInputIPCManager) calculateFrameRate() float64 {
	if atomic.LoadInt64(&aim.metrics.FramesSent) == 0 {
		return 0.0
	}
	return 50.0 // Typical Opus frame rate
}
// GetSupervisor returns the supervisor for advanced operations. The returned
// pointer is the manager's own supervisor, not a copy.
func (aim *AudioInputIPCManager) GetSupervisor() *AudioInputSupervisor {
	return aim.supervisor
}

View File

@ -0,0 +1,72 @@
package audio
import (
"context"
"os"
"os/signal"
"syscall"
"time"
"github.com/jetkvm/kvm/internal/logging"
)
// IsAudioInputServerProcess reports whether this process was launched as the
// audio input server subprocess, i.e. JETKVM_AUDIO_INPUT_SERVER is exactly
// "true".
func IsAudioInputServerProcess() bool {
	flag, present := os.LookupEnv("JETKVM_AUDIO_INPUT_SERVER")
	return present && flag == "true"
}
// RunAudioInputServer is the body of the audio input server subprocess and
// is meant to be called from main() when that mode is detected.
//
// It initializes CGO audio playback, creates and starts the IPC server, then
// blocks until SIGINT or SIGTERM arrives. On shutdown it stops the server,
// waits 100ms for cleanup and returns nil. Any initialization failure is
// logged and returned.
func RunAudioInputServer() error {
	logger := logging.GetDefaultLogger().With().Str("component", "audio-input-server").Logger()
	logger.Info().Msg("Starting audio input server subprocess")

	// Initialize CGO audio system
	if err := CGOAudioPlaybackInit(); err != nil {
		logger.Error().Err(err).Msg("Failed to initialize CGO audio playback")
		return err
	}
	defer CGOAudioPlaybackClose()

	// Create and start the IPC server
	server, err := NewAudioInputServer()
	if err != nil {
		logger.Error().Err(err).Msg("Failed to create audio input server")
		return err
	}
	defer server.Close()

	if err = server.Start(); err != nil {
		logger.Error().Err(err).Msg("Failed to start audio input server")
		return err
	}
	logger.Info().Msg("Audio input server started, waiting for connections")

	// Signal handling for graceful shutdown. Nothing in this function
	// cancels ctx before return, so in practice only a signal ends the wait.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	signals := make(chan os.Signal, 1)
	signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

	select {
	case received := <-signals:
		logger.Info().Str("signal", received.String()).Msg("Received shutdown signal")
	case <-ctx.Done():
		logger.Info().Msg("Context cancelled")
	}

	// Graceful shutdown
	logger.Info().Msg("Shutting down audio input server")
	server.Stop()

	// Give some time for cleanup
	time.Sleep(100 * time.Millisecond)

	logger.Info().Msg("Audio input server subprocess stopped")
	return nil
}

View File

@ -0,0 +1,225 @@
package audio
import (
"context"
"fmt"
"os"
"os/exec"
"sync"
"syscall"
"time"
"github.com/jetkvm/kvm/internal/logging"
"github.com/rs/zerolog"
)
// AudioInputSupervisor manages the audio input server subprocess: it
// launches it, watches for its exit, stops it, and owns the IPC client used
// to talk to it.
type AudioInputSupervisor struct {
	cmd     *exec.Cmd          // the subprocess; nil when none is tracked
	cancel  context.CancelFunc // cancels the context the subprocess was started with
	mtx     sync.Mutex         // held by Start, Stop, IsRunning and the exit handler
	running bool               // true between Start and Stop / unexpected exit
	logger  zerolog.Logger     // tagged with component "audio-input-supervisor"
	client  *AudioInputClient  // IPC client, created once in the constructor
}
// NewAudioInputSupervisor builds a stopped supervisor with its own IPC client
// and a logger tagged with the "audio-input-supervisor" component.
func NewAudioInputSupervisor() *AudioInputSupervisor {
	supervisor := new(AudioInputSupervisor)
	supervisor.logger = logging.GetDefaultLogger().With().Str("component", "audio-input-supervisor").Logger()
	supervisor.client = NewAudioInputClient()
	return supervisor
}
// Start launches the audio input server as a subprocess of the current
// executable, flagged through JETKVM_AUDIO_INPUT_SERVER=true, in its own
// process group. It then starts two goroutines: one monitoring the
// subprocess and one connecting the IPC client.
//
// It returns an error if the supervisor is already running, if the
// executable path cannot be determined, or if the subprocess fails to
// start. On any failure the supervisor is left exactly as it was: no
// context is leaked and no half-initialized command is recorded.
func (ais *AudioInputSupervisor) Start() error {
	ais.mtx.Lock()
	defer ais.mtx.Unlock()

	if ais.running {
		return fmt.Errorf("audio input supervisor already running")
	}

	// Resolve the executable before creating the context, so that this
	// early failure has nothing to clean up.
	execPath, err := os.Executable()
	if err != nil {
		return fmt.Errorf("failed to get executable path: %w", err)
	}

	// Create context for subprocess management
	ctx, cancel := context.WithCancel(context.Background())

	// Create command for audio input server subprocess
	cmd := exec.CommandContext(ctx, execPath)
	cmd.Env = append(os.Environ(),
		"JETKVM_AUDIO_INPUT_SERVER=true", // Flag to indicate this is the input server process
		"JETKVM_AUDIO_INPUT_IPC=true",    // Enable IPC mode
	)

	// Set process group to allow clean termination
	cmd.SysProcAttr = &syscall.SysProcAttr{
		Setpgid: true,
	}

	// Start the subprocess
	if err := cmd.Start(); err != nil {
		cancel()
		return fmt.Errorf("failed to start audio input server: %w", err)
	}

	// Publish state only once the subprocess really exists.
	ais.cmd = cmd
	ais.cancel = cancel
	ais.running = true

	ais.logger.Info().Int("pid", cmd.Process.Pid).Msg("Audio input server subprocess started")

	// Monitor the subprocess in a goroutine
	go ais.monitorSubprocess()

	// Connect client to the server
	go ais.connectClient()

	return nil
}
// Stop shuts the audio input server subprocess down. It does nothing when
// the supervisor is not running.
//
// Order of operations: disconnect the IPC client, send SIGTERM, wait up to
// five seconds for the process to exit, force-kill it on timeout, and only
// then cancel the command context. The context must not be cancelled
// earlier: the command was created with exec.CommandContext, which kills
// the process as soon as its context is done, so an early cancel would
// turn every stop into a hard kill.
//
// The supervisor mutex is held for the whole call, including the wait.
func (ais *AudioInputSupervisor) Stop() {
	ais.mtx.Lock()
	defer ais.mtx.Unlock()

	if !ais.running {
		return
	}
	ais.running = false

	// Disconnect client first
	if ais.client != nil {
		ais.client.Disconnect()
	}

	// Try graceful termination first
	if cmd := ais.cmd; cmd != nil && cmd.Process != nil {
		ais.logger.Info().Int("pid", cmd.Process.Pid).Msg("Stopping audio input server subprocess")

		// Send SIGTERM
		if err := cmd.Process.Signal(syscall.SIGTERM); err != nil {
			ais.logger.Warn().Err(err).Msg("Failed to send SIGTERM to audio input server")
		}

		// Wait for graceful shutdown with timeout. cmd is passed by value so
		// the goroutine never dereferences ais.cmd after it is cleared below.
		// NOTE(review): the monitor goroutine also calls Wait on this
		// command; a second Wait returns an error once the first completes,
		// which still signals exit here — consider sharing one waiter.
		done := make(chan error, 1)
		go func(c *exec.Cmd) {
			done <- c.Wait()
		}(cmd)

		timeout := time.NewTimer(5 * time.Second)
		select {
		case <-done:
			ais.logger.Info().Msg("Audio input server subprocess stopped gracefully")
		case <-timeout.C:
			// Force kill if graceful shutdown failed
			ais.logger.Warn().Msg("Audio input server subprocess did not stop gracefully, force killing")
			if err := cmd.Process.Kill(); err != nil {
				ais.logger.Error().Err(err).Msg("Failed to kill audio input server subprocess")
			}
		}
		timeout.Stop()
	}

	// Release the context now that the process has exited or been killed.
	if ais.cancel != nil {
		ais.cancel()
	}

	ais.cmd = nil
	ais.cancel = nil
}
// IsRunning reports the supervisor's running flag, read under its mutex.
func (ais *AudioInputSupervisor) IsRunning() bool {
	ais.mtx.Lock()
	running := ais.running
	ais.mtx.Unlock()
	return running
}
// GetClient returns the IPC client for sending audio frames. The pointer is
// returned without taking the supervisor mutex; the field is assigned in the
// constructor and not reassigned by any method visible in this file.
func (ais *AudioInputSupervisor) GetClient() *AudioInputClient {
	return ais.client
}
// monitorSubprocess waits for the subprocess to exit and cleans up after an
// unexpected exit: it logs the event, disconnects the IPC client, releases
// the command context and marks the supervisor as stopped.
//
// The command is snapshotted under the mutex, and cleanup happens only if
// the supervisor is still running that same command. An exit caused by Stop,
// or one belonging to a process that has since been replaced by a new Start,
// is therefore ignored.
func (ais *AudioInputSupervisor) monitorSubprocess() {
	ais.mtx.Lock()
	cmd := ais.cmd
	ais.mtx.Unlock()

	if cmd == nil {
		return
	}

	err := cmd.Wait()

	ais.mtx.Lock()
	defer ais.mtx.Unlock()

	// Not running: Stop handled this exit. Different cmd: a newer subprocess
	// is in place and must not be torn down on behalf of the old one.
	if !ais.running || ais.cmd != cmd {
		return
	}

	// Unexpected exit
	if err != nil {
		ais.logger.Error().Err(err).Msg("Audio input server subprocess exited unexpectedly")
	} else {
		ais.logger.Warn().Msg("Audio input server subprocess exited unexpectedly")
	}

	// Disconnect client
	if ais.client != nil {
		ais.client.Disconnect()
	}

	// Release the context created for this subprocess.
	if ais.cancel != nil {
		ais.cancel()
		ais.cancel = nil
	}

	// Mark as not running
	ais.running = false
	ais.cmd = nil

	// TODO: Implement restart logic if needed
	// For now, just log the failure
	ais.logger.Info().Msg("Audio input server subprocess monitoring stopped")
}
// connectClient waits 500ms for the server to come up and then connects the
// IPC client, logging the outcome. The error is not propagated.
func (ais *AudioInputSupervisor) connectClient() {
	// Wait a bit for the server to start
	time.Sleep(500 * time.Millisecond)

	if err := ais.client.Connect(); err != nil {
		ais.logger.Error().Err(err).Msg("Failed to connect to audio input server")
		return
	}
	ais.logger.Info().Msg("Connected to audio input server")
}
// SendFrame forwards an audio frame to the subprocess through the IPC
// client. It fails when there is no client or the client is not connected.
func (ais *AudioInputSupervisor) SendFrame(frame []byte) error {
	switch {
	case ais.client == nil:
		return fmt.Errorf("client not initialized")
	case !ais.client.IsConnected():
		return fmt.Errorf("client not connected")
	default:
		return ais.client.SendFrame(frame)
	}
}
// SendConfig forwards a configuration update to the subprocess through the
// IPC client. It fails when there is no client or the client is not
// connected.
func (ais *AudioInputSupervisor) SendConfig(config InputIPCConfig) error {
	switch {
	case ais.client == nil:
		return fmt.Errorf("client not initialized")
	case !ais.client.IsConnected():
		return fmt.Errorf("client not connected")
	default:
		return ais.client.SendConfig(config)
	}
}

128
internal/audio/ipc.go Normal file
View File

@ -0,0 +1,128 @@
package audio
import (
"encoding/binary"
"fmt"
"io"
"net"
"os"
"path/filepath"
"sync"
"time"
)
const (
	// magicNumber opens every audio output frame on the wire and is checked
	// by the receiver before the frame size is read.
	magicNumber uint32 = 0x4A4B564D // "JKVM"
	// socketName is the file name of the audio output unix socket; it is
	// joined with /var/run by both the server and the client constructor.
	socketName = "audio_output.sock"
)
// AudioServer is the sending side of the audio output socket. It listens on
// a unix socket and writes framed audio to one accepted connection.
type AudioServer struct {
	listener net.Listener // unix socket listener created by NewAudioServer
	conn     net.Conn     // connection accepted by Start; nil until then
	mtx      sync.Mutex   // held by SendFrame for the duration of a frame write
}
// NewAudioServer creates the audio output unix socket under /var/run and
// returns a server listening on it. A socket file left at that path is
// removed first; the result of that removal is ignored.
func NewAudioServer() (*AudioServer, error) {
	socketPath := filepath.Join("/var/run", socketName)

	// Remove existing socket if any
	_ = os.Remove(socketPath)

	server := &AudioServer{}
	var err error
	if server.listener, err = net.Listen("unix", socketPath); err != nil {
		return nil, fmt.Errorf("failed to create unix socket: %w", err)
	}
	return server, nil
}
// Start blocks until one client connects and records that connection for
// SendFrame. It returns a wrapped error if Accept fails.
//
// The connection is published under s.mtx, the same mutex SendFrame holds
// while reading s.conn, so the two may safely run on different goroutines.
func (s *AudioServer) Start() error {
	conn, err := s.listener.Accept()
	if err != nil {
		return fmt.Errorf("failed to accept connection: %w", err)
	}

	s.mtx.Lock()
	s.conn = conn
	s.mtx.Unlock()
	return nil
}
// Close closes the client connection, if there is one, and then the
// listener. Only the listener's close error is returned.
func (s *AudioServer) Close() error {
	if conn := s.conn; conn != nil {
		_ = conn.Close()
	}
	return s.listener.Close()
}
// SendFrame writes one frame to the connected client as
// magic(4) | size(4) | payload, with both header fields big-endian.
//
// It returns an error when no client is connected or when a write fails.
// The header is assembled in a fixed buffer and sent with a single Write,
// which avoids two reflection-based binary.Write calls per frame; the bytes
// on the wire are unchanged.
func (s *AudioServer) SendFrame(frame []byte) error {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	if s.conn == nil {
		return fmt.Errorf("no client connected")
	}

	// Magic number followed by the frame size.
	var header [8]byte
	binary.BigEndian.PutUint32(header[0:4], magicNumber)
	binary.BigEndian.PutUint32(header[4:8], uint32(len(frame)))
	if _, err := s.conn.Write(header[:]); err != nil {
		return fmt.Errorf("failed to write frame header: %w", err)
	}

	// Write frame data
	if _, err := s.conn.Write(frame); err != nil {
		return fmt.Errorf("failed to write frame data: %w", err)
	}
	return nil
}
// AudioClient is the receiving side of the audio output socket.
type AudioClient struct {
	conn net.Conn   // connection to the audio server, set by NewAudioClient
	mtx  sync.Mutex // held by ReceiveFrame while one frame is read
}
// NewAudioClient connects to the audio output unix socket under /var/run.
//
// Because the server may not be listening yet, the dial is attempted up to
// five times with a one-second pause between attempts. If every attempt
// fails, the returned error wraps the error from the last dial.
func NewAudioClient() (*AudioClient, error) {
	const maxAttempts = 5
	socketPath := filepath.Join("/var/run", socketName)

	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		conn, err := net.Dial("unix", socketPath)
		if err == nil {
			return &AudioClient{conn: conn}, nil
		}
		lastErr = err

		// Pause only between attempts, not after the final failure.
		if attempt < maxAttempts {
			time.Sleep(time.Second)
		}
	}
	return nil, fmt.Errorf("failed to connect to audio server: %w", lastErr)
}
// Close closes the connection to the audio server and returns the close
// error. It does not take c.mtx.
func (c *AudioClient) Close() error {
	return c.conn.Close()
}
// ReceiveFrame reads one frame from the server: a big-endian magic number, a
// big-endian 32-bit size, then that many payload bytes.
//
// It returns an error when a read fails, when the magic number does not
// match, or when the announced size exceeds the local limit. The limit
// exists because the size comes off the wire: without it a corrupt or
// desynchronized header could request an allocation of up to 4 GiB.
func (c *AudioClient) ReceiveFrame() ([]byte, error) {
	// Upper bound on an accepted frame.
	// NOTE(review): 1 MiB is a deliberately generous guess for encoded audio
	// frames — confirm against the largest frame the server can emit.
	const maxReceiveFrameSize = 1 << 20

	c.mtx.Lock()
	defer c.mtx.Unlock()

	// Read magic number
	var magic uint32
	if err := binary.Read(c.conn, binary.BigEndian, &magic); err != nil {
		return nil, fmt.Errorf("failed to read magic number: %w", err)
	}
	if magic != magicNumber {
		return nil, fmt.Errorf("invalid magic number: %x", magic)
	}

	// Read frame size
	var size uint32
	if err := binary.Read(c.conn, binary.BigEndian, &size); err != nil {
		return nil, fmt.Errorf("failed to read frame size: %w", err)
	}
	if size > maxReceiveFrameSize {
		return nil, fmt.Errorf("frame too large: %d bytes", size)
	}

	// Read frame data
	frame := make([]byte, size)
	if _, err := io.ReadFull(c.conn, frame); err != nil {
		return nil, fmt.Errorf("failed to read frame data: %w", err)
	}
	return frame, nil
}

View File

@ -1,115 +0,0 @@
package audio
import (
"sync/atomic"
"unsafe"
)
var (
	// Use unsafe.Pointer for atomic operations instead of mutex.
	// The value is nil or a *NonBlockingAudioManager and is only accessed
	// through loadManager, storeManager and compareAndSwapManager.
	globalNonBlockingManager unsafe.Pointer // *NonBlockingAudioManager
)
// loadManager atomically reads the global manager pointer. It returns nil
// when no manager is installed.
func loadManager() *NonBlockingAudioManager {
	// Converting a nil unsafe.Pointer yields a nil *NonBlockingAudioManager,
	// so no separate nil branch is needed.
	return (*NonBlockingAudioManager)(atomic.LoadPointer(&globalNonBlockingManager))
}
// storeManager atomically replaces the global manager with manager,
// unconditionally. Passing nil clears it.
func storeManager(manager *NonBlockingAudioManager) {
	atomic.StorePointer(&globalNonBlockingManager, unsafe.Pointer(manager))
}
// compareAndSwapManager installs new as the global manager only if the
// current one is old, and reports whether the swap took place.
func compareAndSwapManager(old, new *NonBlockingAudioManager) bool {
	expected, replacement := unsafe.Pointer(old), unsafe.Pointer(new)
	return atomic.CompareAndSwapPointer(&globalNonBlockingManager, expected, replacement)
}
// StartNonBlockingAudioStreaming starts audio output on the global manager,
// creating the manager first when none is installed. It returns nil without
// doing anything when output is already running; otherwise it returns the
// result of StartAudioOutput.
func StartNonBlockingAudioStreaming(send func([]byte)) error {
	manager := loadManager()
	switch {
	case manager == nil:
		candidate := NewNonBlockingAudioManager()
		if compareAndSwapManager(nil, candidate) {
			manager = candidate
		} else {
			// Another goroutine installed a manager first; use that one.
			manager = loadManager()
		}
	case manager.IsOutputRunning():
		return nil // Already running, this is not an error
	}
	return manager.StartAudioOutput(send)
}
// StartNonBlockingAudioInput starts audio input on the global manager,
// creating the manager first when none is installed. It returns nil without
// doing anything when input is already running; otherwise it returns the
// result of StartAudioInput.
func StartNonBlockingAudioInput(receiveChan <-chan []byte) error {
	manager := loadManager()
	if manager == nil {
		candidate := NewNonBlockingAudioManager()
		if compareAndSwapManager(nil, candidate) {
			manager = candidate
		} else {
			// Another goroutine installed a manager first; use that one.
			manager = loadManager()
		}
	}

	if !manager.IsInputRunning() {
		return manager.StartAudioInput(receiveChan)
	}
	return nil // Already running, this is not an error
}
// StopNonBlockingAudioStreaming stops the global manager, if one is
// installed, and uninstalls it.
//
// The manager is removed with a compare-and-swap against the instance that
// was just stopped. If another goroutine has installed a different manager
// in the meantime, that one is left in place rather than being discarded
// while possibly still running.
func StopNonBlockingAudioStreaming() {
	manager := loadManager()
	if manager == nil {
		return
	}
	manager.Stop()
	compareAndSwapManager(manager, nil)
}
// StopNonBlockingAudioInput stops only the audio input of the global
// manager, leaving output untouched. It does nothing when no manager is
// installed or input is not running.
//
// If the manager is fully stopped afterwards it is uninstalled, so the next
// start begins from a clean instance. Removal uses a compare-and-swap
// against the instance that was stopped, so a manager installed concurrently
// by another goroutine is never discarded.
func StopNonBlockingAudioInput() {
	manager := loadManager()
	if manager == nil || !manager.IsInputRunning() {
		return
	}
	manager.StopAudioInput()

	// If both input and output are stopped, drop the manager to ensure clean state
	if !manager.IsRunning() {
		compareAndSwapManager(manager, nil)
	}
}
// GetNonBlockingAudioStats returns the global manager's statistics, or a
// zero-valued NonBlockingAudioStats when no manager is installed.
func GetNonBlockingAudioStats() NonBlockingAudioStats {
	manager := loadManager()
	if manager == nil {
		return NonBlockingAudioStats{}
	}
	return manager.GetStats()
}
// IsNonBlockingAudioRunning reports whether a global manager is installed
// and its IsRunning method returns true.
func IsNonBlockingAudioRunning() bool {
	if manager := loadManager(); manager != nil {
		return manager.IsRunning()
	}
	return false
}
// IsNonBlockingAudioInputRunning reports whether a global manager is
// installed and its IsInputRunning method returns true.
func IsNonBlockingAudioInputRunning() bool {
	if manager := loadManager(); manager != nil {
		return manager.IsInputRunning()
	}
	return false
}

View File

@ -1,564 +0,0 @@
package audio
import (
"context"
"errors"
// "runtime" // removed: no longer directly pinning OS thread here; batching handles it
"sync"
"sync/atomic"
"time"
"github.com/jetkvm/kvm/internal/logging"
"github.com/rs/zerolog"
)
// NonBlockingAudioManager manages audio operations in separate worker threads
// to prevent blocking of mouse/keyboard operations.
//
// Each direction uses two goroutines: a worker that makes the audio calls
// and a coordinator that feeds it work items and consumes results.
type NonBlockingAudioManager struct {
	// Statistics - MUST be first for ARM32 alignment (int64 fields need 8-byte alignment)
	stats NonBlockingAudioStats

	// Control
	ctx    context.Context    // cancellation signal observed by the worker and coordinator loops
	cancel context.CancelFunc // cancels ctx
	wg     sync.WaitGroup     // counts the goroutines launched by the Start* methods
	logger *zerolog.Logger

	// Audio output (capture from device, send to WebRTC)
	outputSendFunc   func([]byte)       // callback that receives each encoded frame
	outputWorkChan   chan audioWorkItem // coordinator -> worker requests
	outputResultChan chan audioResult

	// Audio input (receive from WebRTC, playback to device)
	inputReceiveChan <-chan []byte // frames supplied by the caller of StartAudioInput
	inputWorkChan    chan audioWorkItem
	inputResultChan  chan audioResult

	// Worker threads and flags - int32 fields grouped together
	// (accessed with sync/atomic; 1 means running)
	outputRunning       int32
	inputRunning        int32
	outputWorkerRunning int32
	inputWorkerRunning  int32
}
// audioWorkItem is one request handed from a coordinator to a worker.
type audioWorkItem struct {
	workType   audioWorkType    // which operation the worker should perform
	data       []byte           // payload for the operation, when it takes one
	resultChan chan audioResult // channel on which the worker reports the outcome
}
// audioWorkType identifies the operation requested by an audioWorkItem.
type audioWorkType int

const (
	audioWorkInit        audioWorkType = iota // presumably: initialize the audio device — not handled by the output worker
	audioWorkReadEncode                       // read and encode one frame (output path)
	audioWorkDecodeWrite                      // presumably: decode and play one frame (input path) — TODO confirm
	audioWorkClose                            // tell the worker to exit its loop
)
// audioResult is a worker's answer to an audioWorkItem.
type audioResult struct {
	success bool   // true when the operation returned no error
	data    []byte // frame bytes, when the operation produced any
	length  int    // byte count reported by the operation
	err     error  // the operation's error; nil on success
}
// NonBlockingAudioStats holds the manager's counters. The int64 fields are
// updated with sync/atomic.
type NonBlockingAudioStats struct {
	// int64 fields MUST be first for ARM32 alignment
	OutputFramesProcessed int64
	OutputFramesDropped   int64
	InputFramesProcessed  int64
	InputFramesDropped    int64
	WorkerErrors          int64

	// LastProcessTime follows the int64 counters. time.Time is a
	// multi-word struct, not a single int64, and it is assigned with a
	// plain (non-atomic) store by the output coordinator.
	LastProcessTime time.Time
}
// NewNonBlockingAudioManager builds an idle manager with a cancellable
// background context, a logger tagged "nonblocking-audio", and work/result
// channels buffered to 10 entries for each direction.
func NewNonBlockingAudioManager() *NonBlockingAudioManager {
	const queueDepth = 10 // buffer for work items and results

	ctx, cancel := context.WithCancel(context.Background())
	logger := logging.GetDefaultLogger().With().Str("component", "nonblocking-audio").Logger()

	manager := new(NonBlockingAudioManager)
	manager.ctx = ctx
	manager.cancel = cancel
	manager.logger = &logger
	manager.outputWorkChan = make(chan audioWorkItem, queueDepth)
	manager.outputResultChan = make(chan audioResult, queueDepth)
	manager.inputWorkChan = make(chan audioWorkItem, queueDepth)
	manager.inputResultChan = make(chan audioResult, queueDepth)
	return manager
}
// StartAudioOutput starts the output path (capture and encode): it records
// sendFunc, enables batch audio processing and launches the output worker
// and coordinator goroutines. It returns ErrAudioAlreadyRunning when output
// is already running.
func (nam *NonBlockingAudioManager) StartAudioOutput(sendFunc func([]byte)) error {
	if started := atomic.CompareAndSwapInt32(&nam.outputRunning, 0, 1); !started {
		return ErrAudioAlreadyRunning
	}

	nam.outputSendFunc = sendFunc

	// Enable batch audio processing for performance
	EnableBatchAudioProcessing()

	// First the blocking worker, then the non-blocking coordinator.
	for _, run := range []func(){nam.outputWorkerThread, nam.outputCoordinatorThread} {
		nam.wg.Add(1)
		go run()
	}

	nam.logger.Info().Msg("non-blocking audio output started with batch processing")
	return nil
}
// StartAudioInput starts the input path (receive and decode): it records
// receiveChan, enables batch audio processing and launches the input worker
// and coordinator goroutines. It returns ErrAudioAlreadyRunning when input
// is already running.
func (nam *NonBlockingAudioManager) StartAudioInput(receiveChan <-chan []byte) error {
	if started := atomic.CompareAndSwapInt32(&nam.inputRunning, 0, 1); !started {
		return ErrAudioAlreadyRunning
	}

	nam.inputReceiveChan = receiveChan

	// Enable batch audio processing for performance
	EnableBatchAudioProcessing()

	// First the blocking worker, then the non-blocking coordinator.
	for _, run := range []func(){nam.inputWorkerThread, nam.inputCoordinatorThread} {
		nam.wg.Add(1)
		go run()
	}

	nam.logger.Info().Msg("non-blocking audio input started with batch processing")
	return nil
}
// outputWorkerThread handles all blocking audio output operations.
//
// It initializes the audio system, then serves work items from
// outputWorkChan until the context is cancelled or a close item arrives.
// For each read/encode request it performs a batch read into a reusable
// buffer, copies any produced bytes into a second pooled buffer and offers
// the result to the requester without blocking. outputWorkerRunning is set
// to 1 for the lifetime of the goroutine and reset to 0 on exit.
func (nam *NonBlockingAudioManager) outputWorkerThread() {
	defer nam.wg.Done()
	defer atomic.StoreInt32(&nam.outputWorkerRunning, 0)

	atomic.StoreInt32(&nam.outputWorkerRunning, 1)
	nam.logger.Debug().Msg("output worker thread started")

	// Initialize audio in worker thread
	if err := CGOAudioInit(); err != nil {
		nam.logger.Error().Err(err).Msg("failed to initialize audio in worker thread")
		return
	}
	defer CGOAudioClose()

	// Use buffer pool to avoid allocations
	buf := GetAudioFrameBuffer()
	defer PutAudioFrameBuffer(buf)

	for {
		select {
		case <-nam.ctx.Done():
			nam.logger.Debug().Msg("output worker thread stopping")
			return

		case workItem := <-nam.outputWorkChan:
			switch workItem.workType {
			case audioWorkReadEncode:
				n, err := BatchCGOAudioReadEncode(buf)

				result := audioResult{
					success: err == nil,
					length:  n,
					err:     err,
				}
				if err == nil && n > 0 {
					// Get buffer from pool and copy data, so buf can be
					// reused for the next read while the result is in flight
					resultBuf := GetAudioFrameBuffer()
					copy(resultBuf[:n], buf[:n])
					result.data = resultBuf[:n]
				}

				// Send result back (non-blocking): because of the default
				// case, the result is dropped at once if it cannot be
				// delivered immediately
				select {
				case workItem.resultChan <- result:
				case <-nam.ctx.Done():
					return
				default:
					// Drop result if coordinator is not ready
					if result.data != nil {
						PutAudioFrameBuffer(result.data)
					}
					atomic.AddInt64(&nam.stats.OutputFramesDropped, 1)
				}

			case audioWorkClose:
				nam.logger.Debug().Msg("output worker received close signal")
				return
			}
		}
	}
}
// outputCoordinatorThread coordinates audio output without blocking.
//
// Every 20ms it submits a read/encode request to the worker, unless one is
// still pending, the worker is not running, or audio is muted. Results are
// passed to outputSendFunc and their buffers returned to the pool. Failures
// are counted as dropped frames and worker errors. outputRunning is reset to
// 0 when the goroutine exits.
//
// Note that the context-cancelled path returns directly; the close item
// after the loop is sent only when the loop ends because outputRunning is
// no longer 1.
func (nam *NonBlockingAudioManager) outputCoordinatorThread() {
	defer nam.wg.Done()
	defer atomic.StoreInt32(&nam.outputRunning, 0)

	nam.logger.Debug().Msg("output coordinator thread started")

	ticker := time.NewTicker(20 * time.Millisecond) // Match frame timing
	defer ticker.Stop()

	// At most one request is in flight; resultChan has room for its answer.
	pendingWork := false
	resultChan := make(chan audioResult, 1)

	for atomic.LoadInt32(&nam.outputRunning) == 1 {
		select {
		case <-nam.ctx.Done():
			nam.logger.Debug().Msg("output coordinator stopping")
			return

		case <-ticker.C:
			// Only submit work if worker is ready and no pending work
			if !pendingWork && atomic.LoadInt32(&nam.outputWorkerRunning) == 1 {
				if IsAudioMuted() {
					continue // Skip when muted
				}

				workItem := audioWorkItem{
					workType:   audioWorkReadEncode,
					resultChan: resultChan,
				}

				// Submit work (non-blocking)
				select {
				case nam.outputWorkChan <- workItem:
					pendingWork = true
				default:
					// Worker is busy, drop this frame
					atomic.AddInt64(&nam.stats.OutputFramesDropped, 1)
				}
			}

		case result := <-resultChan:
			pendingWork = false
			// Plain (non-atomic) store of the last-result timestamp
			nam.stats.LastProcessTime = time.Now()

			if result.success && result.data != nil && result.length > 0 {
				// Send to WebRTC (non-blocking)
				if nam.outputSendFunc != nil {
					nam.outputSendFunc(result.data)
					atomic.AddInt64(&nam.stats.OutputFramesProcessed, 1)
					RecordFrameReceived(result.length)
				}
				// Return buffer to pool after use
				PutAudioFrameBuffer(result.data)
			} else if result.success && result.length == 0 {
				// No data available - this is normal, not an error
				// Just continue without logging or counting as error
			} else {
				atomic.AddInt64(&nam.stats.OutputFramesDropped, 1)
				atomic.AddInt64(&nam.stats.WorkerErrors, 1)
				if result.err != nil {
					nam.logger.Warn().Err(result.err).Msg("audio output worker error")
				}
				// Clean up buffer if present
				if result.data != nil {
					PutAudioFrameBuffer(result.data)
				}
				RecordFrameDropped()
			}
		}
	}

	// Signal worker to close; give up after 100ms if the work queue is full
	select {
	case nam.outputWorkChan <- audioWorkItem{workType: audioWorkClose}:
	case <-time.After(100 * time.Millisecond):
		nam.logger.Warn().Msg("timeout signaling output worker to close")
	}

	nam.logger.Info().Msg("output coordinator thread stopped")
}
// inputWorkerThread handles all blocking audio input operations.
//
// It marks the worker as running, initializes audio playback, and then
// serves items from nam.inputWorkChan until nam.ctx is cancelled,
// nam.inputRunning drops to 0, or an audioWorkClose item arrives. For each
// audioWorkDecodeWrite item it reports an audioResult on the item's
// resultChan, giving up after 10 ms if nobody receives it.
//
// nam.inputWorkerRunning is cleared on every exit path, including a failed
// playback initialization, so StopAudioInput and the coordinator never
// observe a worker that has already returned as still running.
func (nam *NonBlockingAudioManager) inputWorkerThread() {
	defer nam.wg.Done()

	atomic.StoreInt32(&nam.inputWorkerRunning, 1)
	// Registered before initialization so an init failure also clears the flag.
	defer atomic.StoreInt32(&nam.inputWorkerRunning, 0)
	nam.logger.Debug().Msg("input worker thread started")

	// Initialize audio playback in worker thread
	if err := CGOAudioPlaybackInit(); err != nil {
		nam.logger.Error().Err(err).Msg("failed to initialize audio playback in worker thread")
		return
	}
	// Note: CGOAudioPlaybackClose() is deliberately not called here to avoid
	// a double close; playback resources are released elsewhere once this
	// worker has stopped.
	defer nam.logger.Debug().Msg("cleaning up CGO audio playback")

	for {
		// If coordinator has stopped, exit worker loop
		if atomic.LoadInt32(&nam.inputRunning) == 0 {
			return
		}
		select {
		case <-nam.ctx.Done():
			nam.logger.Debug().Msg("input worker thread stopping due to context cancellation")
			return
		case workItem := <-nam.inputWorkChan:
			switch workItem.workType {
			case audioWorkDecodeWrite:
				// Check if we're still supposed to be running before processing
				if atomic.LoadInt32(&nam.inputWorkerRunning) == 0 || atomic.LoadInt32(&nam.inputRunning) == 0 {
					nam.logger.Debug().Msg("input worker stopping, ignoring decode work")
					// Do not send to resultChan; coordinator may have exited
					return
				}

				var result audioResult
				if len(workItem.data) == 0 {
					// Reject empty input before it reaches the CGO call.
					result = audioResult{
						success: false,
						err:     errors.New("invalid audio data"),
					}
				} else {
					// Perform blocking CGO operation with panic recovery
					func() {
						defer func() {
							if r := recover(); r != nil {
								nam.logger.Error().Interface("panic", r).Msg("CGO decode write panic recovered")
								result = audioResult{
									success: false,
									err:     errors.New("CGO decode write panic"),
								}
							}
						}()
						// Double-check we're still running before CGO call
						if atomic.LoadInt32(&nam.inputWorkerRunning) == 0 {
							result = audioResult{success: false, err: errors.New("worker shutting down")}
							return
						}
						n, err := BatchCGOAudioDecodeWrite(workItem.data)
						result = audioResult{
							success: err == nil,
							length:  n,
							err:     err,
						}
					}()
				}

				// Report the result only while the coordinator is still running.
				if atomic.LoadInt32(&nam.inputRunning) != 1 {
					// Coordinator has stopped, drop result
					atomic.AddInt64(&nam.stats.InputFramesDropped, 1)
					continue
				}
				select {
				case workItem.resultChan <- result:
				case <-nam.ctx.Done():
					return
				case <-time.After(10 * time.Millisecond):
					// Timeout - coordinator may have stopped, drop result
					atomic.AddInt64(&nam.stats.InputFramesDropped, 1)
				}
			case audioWorkClose:
				nam.logger.Debug().Msg("input worker received close signal")
				return
			}
		}
	}
}
// inputCoordinatorThread coordinates audio input. It takes frames from
// nam.inputReceiveChan, submits each non-empty frame to the input worker via
// nam.inputWorkChan, and then waits up to 50 ms for that frame's result
// before moving on.
//
// The loop runs while nam.inputRunning is 1 and returns immediately when
// nam.ctx is cancelled. On return it clears nam.inputRunning and signals
// nam.wg. Frames are silently discarded (without touching the counters) when
// nam.inputWorkerRunning is not 1.
func (nam *NonBlockingAudioManager) inputCoordinatorThread() {
	defer nam.wg.Done()
	defer atomic.StoreInt32(&nam.inputRunning, 0)
	nam.logger.Debug().Msg("input coordinator thread started")
	// One result channel is shared by every work item submitted from here.
	resultChan := make(chan audioResult, 1)
	// Do not close resultChan to avoid races with worker sends during shutdown
	for atomic.LoadInt32(&nam.inputRunning) == 1 {
		select {
		case <-nam.ctx.Done():
			nam.logger.Debug().Msg("input coordinator stopping")
			return
		case frame := <-nam.inputReceiveChan:
			if len(frame) == 0 {
				continue
			}
			// Submit work to worker (non-blocking)
			if atomic.LoadInt32(&nam.inputWorkerRunning) == 1 {
				workItem := audioWorkItem{
					workType:   audioWorkDecodeWrite,
					data:       frame,
					resultChan: resultChan,
				}
				select {
				case nam.inputWorkChan <- workItem:
					// Wait for result with timeout and context cancellation
					select {
					case result := <-resultChan:
						if result.success {
							atomic.AddInt64(&nam.stats.InputFramesProcessed, 1)
						} else {
							atomic.AddInt64(&nam.stats.InputFramesDropped, 1)
							atomic.AddInt64(&nam.stats.WorkerErrors, 1)
							if result.err != nil {
								nam.logger.Warn().Err(result.err).Msg("audio input worker error")
							}
						}
					case <-nam.ctx.Done():
						nam.logger.Debug().Msg("input coordinator stopping during result wait")
						return
					case <-time.After(50 * time.Millisecond):
						// Timeout waiting for result
						atomic.AddInt64(&nam.stats.InputFramesDropped, 1)
						nam.logger.Warn().Msg("timeout waiting for input worker result")
						// Drain any pending result to prevent worker blocking
						// NOTE(review): this drain is non-blocking, so a result
						// that arrives after it stays buffered in resultChan and
						// would be read as the outcome of the next frame.
						select {
						case <-resultChan:
						default:
						}
					}
				default:
					// Worker is busy, drop this frame
					atomic.AddInt64(&nam.stats.InputFramesDropped, 1)
				}
			}
		case <-time.After(250 * time.Millisecond):
			// Periodic timeout so the loop condition (inputRunning) is
			// re-checked even when no frames arrive.
			continue
		}
	}
	// Avoid sending close signals or touching channels here; inputRunning=0 will stop worker via checks
	nam.logger.Info().Msg("input coordinator thread stopped")
}
// Stop shuts down every audio goroutine owned by the manager: it cancels the
// shared context, clears both coordinator run flags, waits for all
// goroutines to return, and finally disables batch audio processing.
func (nam *NonBlockingAudioManager) Stop() {
	nam.logger.Info().Msg("stopping non-blocking audio manager")

	// Cancel first so goroutines blocked in a select wake up.
	nam.cancel()

	// Clear the coordinator flags, output before input.
	for _, runFlag := range []*int32{&nam.outputRunning, &nam.inputRunning} {
		atomic.StoreInt32(runFlag, 0)
	}

	// Block until every goroutine has finished.
	nam.wg.Wait()

	// Release batch-processing resources.
	DisableBatchAudioProcessing()
	nam.logger.Info().Msg("non-blocking audio manager stopped")
}
// StopAudioInput stops only the audio input path. It clears the input
// coordinator's run flag, drains nam.inputReceiveChan in the background, and
// polls every 10 ms (for at most 2 s) until the input worker reports that it
// has stopped, at which point the playback resources are closed.
func (nam *NonBlockingAudioManager) StopAudioInput() {
	nam.logger.Info().Msg("stopping audio input")

	// Ask the input coordinator to stop.
	atomic.StoreInt32(&nam.inputRunning, 0)

	// Discard queued frames so senders are not left blocked; the drainer
	// exits once the channel has been quiet for 100 ms.
	go func() {
		for {
			select {
			case <-nam.inputReceiveChan:
			case <-time.After(100 * time.Millisecond):
				return
			}
		}
	}()

	// Wait for the worker to actually stop to prevent race conditions.
	deadline := time.NewTimer(2 * time.Second)
	defer deadline.Stop()
	poll := time.NewTicker(10 * time.Millisecond)
	defer poll.Stop()

	for {
		select {
		case <-deadline.C:
			nam.logger.Warn().Msg("timeout waiting for input worker to stop")
			return
		case <-poll.C:
			if atomic.LoadInt32(&nam.inputWorkerRunning) != 0 {
				continue
			}
			nam.logger.Info().Msg("audio input stopped successfully")
			// The worker is gone, so playback resources can be closed now.
			CGOAudioPlaybackClose()
			return
		}
	}
}
// GetStats returns a snapshot of the manager's counters. The integer
// counters are loaded atomically; LastProcessTime is copied with a plain
// read.
func (nam *NonBlockingAudioManager) GetStats() NonBlockingAudioStats {
	var snapshot NonBlockingAudioStats
	snapshot.OutputFramesProcessed = atomic.LoadInt64(&nam.stats.OutputFramesProcessed)
	snapshot.OutputFramesDropped = atomic.LoadInt64(&nam.stats.OutputFramesDropped)
	snapshot.InputFramesProcessed = atomic.LoadInt64(&nam.stats.InputFramesProcessed)
	snapshot.InputFramesDropped = atomic.LoadInt64(&nam.stats.InputFramesDropped)
	snapshot.WorkerErrors = atomic.LoadInt64(&nam.stats.WorkerErrors)
	snapshot.LastProcessTime = nam.stats.LastProcessTime
	return snapshot
}
// IsRunning reports whether the output coordinator or the input coordinator
// is currently flagged as running.
func (nam *NonBlockingAudioManager) IsRunning() bool {
	if atomic.LoadInt32(&nam.outputRunning) == 1 {
		return true
	}
	return atomic.LoadInt32(&nam.inputRunning) == 1
}
// IsInputRunning reports whether the input coordinator is flagged as running.
func (nam *NonBlockingAudioManager) IsInputRunning() bool {
	inputState := atomic.LoadInt32(&nam.inputRunning)
	return inputState == 1
}
// IsOutputRunning reports whether the output coordinator is flagged as running.
func (nam *NonBlockingAudioManager) IsOutputRunning() bool {
	outputState := atomic.LoadInt32(&nam.outputRunning)
	return outputState == 1
}

View File

@ -0,0 +1,91 @@
package audio
import (
"context"
"sync/atomic"
"time"
"github.com/jetkvm/kvm/internal/logging"
"github.com/rs/zerolog"
)
// Package-level state for the audio output streaming loop.
var (
	// outputStreamingRunning is 1 while streaming is active and 0 otherwise;
	// it is accessed with sync/atomic.
	outputStreamingRunning int32
	// outputStreamingCancel cancels the context of the running capture loop.
	// It is assigned by StartAudioOutputStreaming and reset to nil by
	// StopAudioOutputStreaming, both without additional locking.
	outputStreamingCancel context.CancelFunc
	// outputStreamingLogger is the "audio-output" component logger set in init.
	outputStreamingLogger *zerolog.Logger
)
// init installs the component-tagged logger used by the output streaming code.
func init() {
	componentLogger := logging.GetDefaultLogger().With().Str("component", "audio-output").Logger()
	outputStreamingLogger = &componentLogger
}
// StartAudioOutputStreaming starts audio output streaming (capturing system
// audio). It initializes the CGO capture side and launches a goroutine that
// repeatedly reads and encodes a frame and passes a private copy of it to
// send.
//
// It returns ErrAudioAlreadyRunning when streaming is already active, or the
// error from CGOAudioInit when initialization fails (in which case the
// running flag is reset). The goroutine runs until StopAudioOutputStreaming
// cancels its context; on exit it closes the CGO capture side and clears the
// running flag.
func StartAudioOutputStreaming(send func([]byte)) error {
	if !atomic.CompareAndSwapInt32(&outputStreamingRunning, 0, 1) {
		return ErrAudioAlreadyRunning
	}
	// Initialize CGO audio capture
	if err := CGOAudioInit(); err != nil {
		atomic.StoreInt32(&outputStreamingRunning, 0)
		return err
	}
	ctx, cancel := context.WithCancel(context.Background())
	outputStreamingCancel = cancel

	// Start audio capture loop
	go func() {
		defer func() {
			CGOAudioClose()
			atomic.StoreInt32(&outputStreamingRunning, 0)
			outputStreamingLogger.Info().Msg("Audio output streaming stopped")
		}()
		outputStreamingLogger.Info().Msg("Audio output streaming started")

		buffer := make([]byte, MaxAudioFrameSize)
		for ctx.Err() == nil {
			// Capture audio frame
			n, err := CGOAudioReadEncode(buffer)
			if err != nil {
				outputStreamingLogger.Warn().Err(err).Msg("Failed to read/encode audio")
			} else if n > 0 {
				// The capture buffer is reused, so the callback gets its own copy.
				frame := make([]byte, n)
				copy(frame, buffer[:n])
				send(frame)
				RecordFrameReceived(n)
			}

			// Pace the loop after every attempt, failed ones included, so a
			// persistent capture error cannot turn into a busy loop. The wait
			// ends early when streaming is cancelled.
			select {
			case <-ctx.Done():
				return
			case <-time.After(10 * time.Millisecond):
			}
		}
	}()
	return nil
}
// StopAudioOutputStreaming stops audio output streaming. It is a no-op when
// streaming is not active; otherwise it cancels the capture loop and polls
// every 10 ms until the loop has cleared the running flag.
func StopAudioOutputStreaming() {
	if atomic.LoadInt32(&outputStreamingRunning) == 0 {
		return
	}

	if outputStreamingCancel != nil {
		stop := outputStreamingCancel
		stop()
		outputStreamingCancel = nil
	}

	// Wait for the capture goroutine to finish its cleanup.
	for {
		if atomic.LoadInt32(&outputStreamingRunning) != 1 {
			return
		}
		time.Sleep(10 * time.Millisecond)
	}
}

198
internal/audio/relay.go Normal file
View File

@ -0,0 +1,198 @@
package audio
import (
	"context"
	"sync"
	"time"

	"github.com/jetkvm/kvm/internal/logging"
	"github.com/pion/webrtc/v4/pkg/media"
	"github.com/rs/zerolog"
)
// AudioRelay handles forwarding audio frames from the audio server subprocess
// to WebRTC without any CGO audio processing. This runs in the main process.
//
// Start launches a single relayLoop goroutine that receives frames from
// client and writes them to audioTrack; Stop cancels ctx and waits for that
// goroutine via wg.
type AudioRelay struct {
	// client is created by Start and closed and cleared by Stop.
	client *AudioClient
	// ctx and cancel control the lifetime of relayLoop.
	ctx    context.Context
	cancel context.CancelFunc
	// wg tracks the relayLoop goroutine.
	wg     sync.WaitGroup
	logger *zerolog.Logger
	// running is true between a successful Start and the matching Stop.
	running bool
	// mutex guards running, audioTrack, config, muted and the two counters.
	mutex sync.RWMutex
	// WebRTC integration
	// audioTrack may be nil, in which case frames are not written anywhere.
	audioTrack AudioTrackWriter
	config     AudioConfig
	// muted makes the relay send zeroed buffers instead of the received frame.
	muted bool
	// Statistics
	framesRelayed int64
	framesDropped int64
}
// AudioTrackWriter is the subset of a WebRTC audio track that the relay
// needs: something that accepts media samples.
type AudioTrackWriter interface {
	// WriteSample writes one media sample to the track.
	WriteSample(sample media.Sample) error
}
// NewAudioRelay builds an idle relay for the main process, with its own
// cancellable context and an "audio-relay" component logger. Call Start to
// begin relaying.
func NewAudioRelay() *AudioRelay {
	relayLogger := logging.GetDefaultLogger().With().Str("component", "audio-relay").Logger()

	relay := &AudioRelay{logger: &relayLogger}
	relay.ctx, relay.cancel = context.WithCancel(context.Background())
	return relay
}
// Start begins the audio relay process. It connects an AudioClient to the
// audio server subprocess, stores audioTrack and config, and launches the
// relay goroutine.
//
// Calling Start on a relay that is already running is a no-op that returns
// nil. An error from NewAudioClient is returned unchanged and leaves the
// relay stopped. Start may be called again after Stop: a context cancelled
// by an earlier Stop is replaced so the new relay goroutine does not exit
// immediately.
func (r *AudioRelay) Start(audioTrack AudioTrackWriter, config AudioConfig) error {
	r.mutex.Lock()
	defer r.mutex.Unlock()
	if r.running {
		return nil // Already running
	}

	// Create audio client to connect to subprocess
	client, err := NewAudioClient()
	if err != nil {
		return err
	}

	// Stop cancels r.ctx; without a fresh context a restarted relay would be
	// marked running while its loop returns on the first iteration.
	if r.ctx == nil || r.ctx.Err() != nil {
		r.ctx, r.cancel = context.WithCancel(context.Background())
	}

	r.client = client
	r.audioTrack = audioTrack
	r.config = config

	// Start relay goroutine
	r.wg.Add(1)
	go r.relayLoop()
	r.running = true
	r.logger.Info().Msg("Audio relay started")
	return nil
}
// Stop stops the audio relay: it cancels the relay context, waits for the
// relay goroutine to return, closes the client, and marks the relay as
// stopped. Calling Stop on a relay that is not running is a no-op.
//
// The mutex is released while waiting for the goroutine. The relay loop
// takes the same mutex when forwarding frames and updating counters, so
// waiting with the lock held would deadlock.
func (r *AudioRelay) Stop() {
	r.mutex.Lock()
	if !r.running {
		r.mutex.Unlock()
		return
	}
	r.cancel()
	r.mutex.Unlock()

	// Must not hold r.mutex here; see the function comment.
	r.wg.Wait()

	r.mutex.Lock()
	defer r.mutex.Unlock()
	if !r.running {
		return // a concurrent Stop already completed the teardown
	}
	if r.client != nil {
		r.client.Close()
		r.client = nil
	}
	r.running = false
	r.logger.Info().Msg("Audio relay stopped")
}
// SetMuted records the relay's own mute flag under the write lock.
func (r *AudioRelay) SetMuted(muted bool) {
	r.mutex.Lock()
	r.muted = muted
	r.mutex.Unlock()
}
// IsMuted reports whether audio is muted, either through the relay's own
// flag or through the global mute state returned by IsAudioMuted.
func (r *AudioRelay) IsMuted() bool {
	r.mutex.RLock()
	defer r.mutex.RUnlock()

	if r.muted {
		return true
	}
	return IsAudioMuted()
}
// GetStats returns the number of frames relayed and dropped so far, read
// together under the read lock.
func (r *AudioRelay) GetStats() (framesRelayed, framesDropped int64) {
	r.mutex.RLock()
	relayed, dropped := r.framesRelayed, r.framesDropped
	r.mutex.RUnlock()
	return relayed, dropped
}
// UpdateTrack swaps the WebRTC audio track that subsequent frames are
// written to.
func (r *AudioRelay) UpdateTrack(audioTrack AudioTrackWriter) {
	r.mutex.Lock()
	r.audioTrack = audioTrack
	r.mutex.Unlock()
}
// relayLoop is the main relay loop that forwards frames from the audio
// server subprocess to WebRTC. It runs until r.ctx is cancelled and signals
// r.wg on return.
//
// Every failed receive or forward increments the dropped counter and every
// forwarded frame increments the relayed counter. After a receive error the
// loop pauses briefly before retrying, so a broken connection cannot turn
// into a busy loop; a receive error observed after cancellation ends the
// loop without being counted.
func (r *AudioRelay) relayLoop() {
	defer r.wg.Done()

	// Pause between attempts after a failed receive.
	const receiveRetryDelay = 10 * time.Millisecond

	r.logger.Debug().Msg("Audio relay loop started")
	for {
		select {
		case <-r.ctx.Done():
			r.logger.Debug().Msg("Audio relay loop stopping")
			return
		default:
		}

		// Receive frame from audio server subprocess
		frame, err := r.client.ReceiveFrame()
		if err != nil {
			if r.ctx.Err() != nil {
				// Shutdown is in progress; nothing to report.
				r.logger.Debug().Msg("Audio relay loop stopping")
				return
			}
			r.logger.Error().Err(err).Msg("Failed to receive audio frame")
			r.incrementDropped()

			// Back off before retrying, but wake up at once on cancellation.
			select {
			case <-r.ctx.Done():
				r.logger.Debug().Msg("Audio relay loop stopping")
				return
			case <-time.After(receiveRetryDelay):
			}
			continue
		}

		// Forward frame to WebRTC
		if err := r.forwardToWebRTC(frame); err != nil {
			r.logger.Warn().Err(err).Msg("Failed to forward frame to WebRTC")
			r.incrementDropped()
			continue
		}
		r.incrementRelayed()
	}
}
// forwardToWebRTC forwards a frame to the WebRTC audio track.
//
// It returns nil without writing anything when no track is set. When audio
// is muted, through the relay's own flag or the global mute state reported
// by IsAudioMuted, a zero-filled buffer of the same length is written in
// place of the frame. The sample duration is config.FrameSize. The error
// from WriteSample is returned unchanged.
func (r *AudioRelay) forwardToWebRTC(frame []byte) error {
	r.mutex.RLock()
	audioTrack := r.audioTrack
	config := r.config
	// Same rule as IsMuted: honour the global mute as well, so a relay that
	// was created after the user muted audio does not forward live audio.
	muted := r.muted || IsAudioMuted()
	r.mutex.RUnlock()

	if audioTrack == nil {
		return nil // No audio track available
	}

	// Prepare sample data
	sampleData := frame
	if muted {
		// Send silence when muted
		sampleData = make([]byte, len(frame))
	}

	// Write sample to WebRTC track
	return audioTrack.WriteSample(media.Sample{
		Data:     sampleData,
		Duration: config.FrameSize,
	})
}
// incrementRelayed adds one to the relayed-frames counter while holding the
// relay mutex.
func (r *AudioRelay) incrementRelayed() {
	r.mutex.Lock()
	defer r.mutex.Unlock()
	r.framesRelayed++
}
// incrementDropped adds one to the dropped-frames counter while holding the
// relay mutex.
func (r *AudioRelay) incrementDropped() {
	r.mutex.Lock()
	defer r.mutex.Unlock()
	r.framesDropped++
}

109
internal/audio/relay_api.go Normal file
View File

@ -0,0 +1,109 @@
package audio
import (
"sync"
)
// Global relay instance for the main process.
var (
	// globalRelay is nil whenever no relay is running.
	globalRelay *AudioRelay
	// relayMutex guards globalRelay.
	relayMutex sync.RWMutex
)
// StartAudioRelay starts the process-wide audio relay used by the main
// process in place of the CGO-based audio system. It is a no-op when a relay
// already exists. audioTrack may be nil and supplied later through
// UpdateAudioRelayTrack. An error from starting the relay is returned and
// leaves no global relay installed.
func StartAudioRelay(audioTrack AudioTrackWriter) error {
	relayMutex.Lock()
	defer relayMutex.Unlock()

	if globalRelay == nil {
		// Build the relay and start it with the current audio config.
		candidate := NewAudioRelay()
		if err := candidate.Start(audioTrack, GetAudioConfig()); err != nil {
			return err
		}
		globalRelay = candidate
	}
	return nil
}
// StopAudioRelay stops and discards the process-wide audio relay, if any.
func StopAudioRelay() {
	relayMutex.Lock()
	defer relayMutex.Unlock()

	if globalRelay == nil {
		return
	}
	globalRelay.Stop()
	globalRelay = nil
}
// SetAudioRelayMuted forwards the mute state to the process-wide relay. It
// does nothing when no relay is running.
func SetAudioRelayMuted(muted bool) {
	relayMutex.RLock()
	defer relayMutex.RUnlock()

	if globalRelay == nil {
		return
	}
	globalRelay.SetMuted(muted)
}
// IsAudioRelayMuted reports the mute state of the process-wide relay, or
// false when no relay is running.
func IsAudioRelayMuted() bool {
	relayMutex.RLock()
	defer relayMutex.RUnlock()

	return globalRelay != nil && globalRelay.IsMuted()
}
// GetAudioRelayStats returns the relayed and dropped frame counts of the
// process-wide relay, or zeros when no relay is running.
func GetAudioRelayStats() (framesRelayed, framesDropped int64) {
	relayMutex.RLock()
	defer relayMutex.RUnlock()

	if globalRelay == nil {
		return 0, 0
	}
	return globalRelay.GetStats()
}
// IsAudioRelayRunning reports whether a process-wide relay is installed.
func IsAudioRelayRunning() bool {
	relayMutex.RLock()
	active := globalRelay != nil
	relayMutex.RUnlock()
	return active
}
// UpdateAudioRelayTrack points the process-wide relay at audioTrack. When no
// relay is running yet, one is created and started with that track and the
// current audio config; an error from starting it is returned and leaves no
// global relay installed.
func UpdateAudioRelayTrack(audioTrack AudioTrackWriter) error {
	relayMutex.Lock()
	defer relayMutex.Unlock()

	if globalRelay != nil {
		// A relay already exists: just swap its track.
		globalRelay.UpdateTrack(audioTrack)
		return nil
	}

	// No relay running, start one with the provided track.
	fresh := NewAudioRelay()
	if err := fresh.Start(audioTrack, GetAudioConfig()); err != nil {
		return err
	}
	globalRelay = fresh
	return nil
}

View File

@ -0,0 +1,400 @@
//go:build cgo
// +build cgo
package audio
import (
"context"
"fmt"
"os"
"os/exec"
"sync"
"sync/atomic"
"syscall"
"time"
"github.com/jetkvm/kvm/internal/logging"
"github.com/rs/zerolog"
)
// Restart policy for the audio server subprocess.
const (
	// maxRestartAttempts is the number of recorded restart attempts within
	// restartWindow at which the supervisor stops restarting.
	maxRestartAttempts = 5
	// restartWindow is the time window for counting restart attempts;
	// attempts older than this are discarded.
	restartWindow = 5 * time.Minute
	// restartDelay is the base delay between restart attempts. It is doubled
	// once per recorded attempt.
	restartDelay = 2 * time.Second
	// maxRestartDelay is the upper bound of the exponential backoff.
	maxRestartDelay = 30 * time.Second
)
// AudioServerSupervisor manages the audio server subprocess lifecycle: it
// starts the process, waits for it to exit, and restarts it with a backoff
// until it is stopped or the restart limit is reached.
type AudioServerSupervisor struct {
	// ctx is passed to exec.CommandContext for the subprocess and is
	// cancelled by Stop.
	ctx    context.Context
	cancel context.CancelFunc
	logger *zerolog.Logger
	// mutex guards cmd, processPID, restartAttempts, lastExitCode,
	// lastExitTime and the callbacks set through SetCallbacks.
	mutex sync.RWMutex
	// running is 1 between Start and Stop; accessed with sync/atomic.
	running int32
	// Process management
	cmd *exec.Cmd
	// processPID is the PID of the current subprocess, 0 when none is running.
	processPID int
	// Restart management
	// restartAttempts holds the timestamps of recorded restart attempts.
	restartAttempts []time.Time
	lastExitCode    int
	lastExitTime    time.Time
	// Channels for coordination
	// processDone is closed when the supervision loop returns.
	processDone chan struct{}
	// stopChan is closed by Stop to ask the supervision loop to end.
	stopChan chan struct{}
	// Callbacks
	onProcessStart func(pid int)
	onProcessExit  func(pid int, exitCode int, crashed bool)
	onRestart      func(attempt int, delay time.Duration)
}
// NewAudioServerSupervisor builds an idle supervisor with its own
// cancellable context, coordination channels, and an "audio-supervisor"
// component logger. Call Start to begin supervising.
func NewAudioServerSupervisor() *AudioServerSupervisor {
	supervisorLogger := logging.GetDefaultLogger().With().Str("component", "audio-supervisor").Logger()

	sup := &AudioServerSupervisor{
		logger:      &supervisorLogger,
		processDone: make(chan struct{}),
		stopChan:    make(chan struct{}),
	}
	sup.ctx, sup.cancel = context.WithCancel(context.Background())
	return sup
}
// SetCallbacks installs the optional process lifecycle hooks. Any of them
// may be nil, in which case that event is not reported.
func (s *AudioServerSupervisor) SetCallbacks(
	onStart func(pid int),
	onExit func(pid int, exitCode int, crashed bool),
	onRestart func(attempt int, delay time.Duration),
) {
	s.mutex.Lock()
	s.onProcessStart, s.onProcessExit, s.onRestart = onStart, onExit, onRestart
	s.mutex.Unlock()
}
// Start launches the supervision loop in its own goroutine. It returns an
// error when the supervisor has already been started.
func (s *AudioServerSupervisor) Start() error {
	claimed := atomic.CompareAndSwapInt32(&s.running, 0, 1)
	if !claimed {
		return fmt.Errorf("supervisor already running")
	}

	s.logger.Info().Msg("starting audio server supervisor")
	go s.supervisionLoop()
	return nil
}
// Stop shuts down the supervisor and its subprocess. It clears the running
// flag, closes stopChan, cancels the context, and then waits up to ten
// seconds for the supervision loop to finish before force-killing the
// process. It always returns nil; stopping an already-stopped supervisor is
// a no-op.
func (s *AudioServerSupervisor) Stop() error {
	wasRunning := atomic.CompareAndSwapInt32(&s.running, 1, 0)
	if !wasRunning {
		return nil // Already stopped
	}
	s.logger.Info().Msg("stopping audio server supervisor")

	// Signal stop and wait for cleanup.
	close(s.stopChan)
	s.cancel()

	grace := time.NewTimer(10 * time.Second)
	defer grace.Stop()

	select {
	case <-s.processDone:
		s.logger.Info().Msg("audio server process stopped gracefully")
	case <-grace.C:
		s.logger.Warn().Msg("audio server process did not stop gracefully, forcing termination")
		s.forceKillProcess()
	}
	return nil
}
// IsRunning reports whether the supervisor's running flag is set.
func (s *AudioServerSupervisor) IsRunning() bool {
	state := atomic.LoadInt32(&s.running)
	return state == 1
}
// GetProcessPID returns the PID of the current subprocess, or 0 when no
// process is running.
func (s *AudioServerSupervisor) GetProcessPID() int {
	s.mutex.RLock()
	pid := s.processPID
	s.mutex.RUnlock()
	return pid
}
// GetLastExitInfo returns the exit code and exit time recorded for the most
// recent subprocess exit.
func (s *AudioServerSupervisor) GetLastExitInfo() (exitCode int, exitTime time.Time) {
	s.mutex.RLock()
	code, when := s.lastExitCode, s.lastExitTime
	s.mutex.RUnlock()
	return code, when
}
// supervisionLoop is the main supervision loop. While the supervisor is
// running it starts the audio server process, waits for it to exit, and
// restarts it after a backoff delay. processDone is closed when the loop
// returns.
//
// The loop ends when stopChan is closed or ctx is cancelled, or when
// shouldRestart reports that no further restart should be attempted. A
// failed process start is recorded as a restart attempt, so repeated start
// failures are subject to the same backoff and attempt limit as crashes.
func (s *AudioServerSupervisor) supervisionLoop() {
	defer func() {
		close(s.processDone)
		s.logger.Info().Msg("audio server supervision ended")
	}()

	for atomic.LoadInt32(&s.running) == 1 {
		select {
		case <-s.stopChan:
			s.logger.Info().Msg("received stop signal")
			s.terminateProcess()
			return
		case <-s.ctx.Done():
			s.logger.Info().Msg("context cancelled")
			s.terminateProcess()
			return
		default:
		}

		// Start or restart the process, then block until it exits.
		startErr := s.startProcess()
		if startErr != nil {
			s.logger.Error().Err(startErr).Msg("failed to start audio server process")
			// Count the failure; otherwise the attempt limit and the backoff
			// would never apply to a process that cannot be started.
			s.recordRestartAttempt()
		} else {
			s.waitForProcessExit()
		}

		// Check if we should restart
		if !s.shouldRestart() {
			if atomic.LoadInt32(&s.running) == 0 {
				// Normal shutdown, not a restart-limit failure.
				s.logger.Info().Msg("supervisor stopping, not restarting audio server process")
			} else {
				s.logger.Error().Msg("maximum restart attempts exceeded, stopping supervisor")
			}
			return
		}

		delay := s.calculateRestartDelay()
		if startErr != nil {
			s.logger.Warn().Dur("delay", delay).Msg("retrying process start after delay")
		} else {
			s.logger.Info().Dur("delay", delay).Msg("restarting audio server process after delay")
		}

		// Snapshot shared state under the lock before using it.
		s.mutex.RLock()
		onRestart := s.onRestart
		attempts := len(s.restartAttempts)
		s.mutex.RUnlock()
		if onRestart != nil {
			onRestart(attempts, delay)
		}

		// Wait for restart delay, unless asked to stop first.
		select {
		case <-time.After(delay):
		case <-s.stopChan:
			return
		case <-s.ctx.Done():
			return
		}
	}
}
// startProcess starts the audio server process: the current executable is
// re-run with the --audio-server flag, bound to the supervisor's context and
// sharing this process's stdout and stderr.
//
// On success the command and its PID are recorded and the onProcessStart
// callback, if set, is invoked with the PID. The callback runs after the
// supervisor mutex has been released, so it may call back into the
// supervisor and does not block readers such as GetProcessPID while it runs.
// An error is returned when the executable path cannot be determined or the
// process cannot be started.
func (s *AudioServerSupervisor) startProcess() error {
	execPath, err := os.Executable()
	if err != nil {
		return fmt.Errorf("failed to get executable path: %w", err)
	}

	// Create new command
	cmd := exec.CommandContext(s.ctx, execPath, "--audio-server")
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr

	s.mutex.Lock()
	s.cmd = cmd
	// Start the process
	if err := cmd.Start(); err != nil {
		s.mutex.Unlock()
		return fmt.Errorf("failed to start process: %w", err)
	}
	pid := cmd.Process.Pid
	s.processPID = pid
	onStart := s.onProcessStart
	s.mutex.Unlock()

	s.logger.Info().Int("pid", pid).Msg("audio server process started")
	if onStart != nil {
		onStart(pid)
	}
	return nil
}
// waitForProcessExit blocks until the current subprocess exits, records the
// exit time and exit code, clears the stored PID, and reports the outcome.
//
// An exit is treated as a crash when Wait returns an *exec.ExitError with a
// non-zero code, or any other error (recorded as exit code -1). Crashes are
// logged as errors and recorded as restart attempts; clean exits are logged
// at info level. The onProcessExit callback, if set, is invoked in both
// cases. The function returns immediately when no command has been stored.
func (s *AudioServerSupervisor) waitForProcessExit() {
	s.mutex.RLock()
	proc, procPID := s.cmd, s.processPID
	s.mutex.RUnlock()
	if proc == nil {
		return
	}

	// Wait for process to exit
	waitErr := proc.Wait()

	// Classify the outcome.
	code, abnormal := 0, false
	if waitErr != nil {
		switch exitErr := waitErr.(type) {
		case *exec.ExitError:
			code = exitErr.ExitCode()
			abnormal = code != 0
		default:
			// Process was killed or other error
			code, abnormal = -1, true
		}
	}

	s.mutex.Lock()
	s.lastExitTime = time.Now()
	s.processPID = 0
	s.lastExitCode = code
	s.mutex.Unlock()

	if abnormal {
		s.logger.Error().Int("pid", procPID).Int("exit_code", code).Msg("audio server process crashed")
		s.recordRestartAttempt()
	} else {
		s.logger.Info().Int("pid", procPID).Msg("audio server process exited gracefully")
	}

	if s.onProcessExit != nil {
		s.onProcessExit(procPID, code, abnormal)
	}
}
// terminateProcess asks the current subprocess to exit by sending SIGTERM,
// waits up to five seconds for cmd.Wait to return, and falls back to
// forceKillProcess when it does not. It does nothing when no command or
// process has been stored.
func (s *AudioServerSupervisor) terminateProcess() {
	// Snapshot the command and PID under the read lock.
	s.mutex.RLock()
	cmd := s.cmd
	pid := s.processPID
	s.mutex.RUnlock()
	if cmd == nil || cmd.Process == nil {
		return
	}
	s.logger.Info().Int("pid", pid).Msg("terminating audio server process")
	// Send SIGTERM first
	if err := cmd.Process.Signal(syscall.SIGTERM); err != nil {
		s.logger.Warn().Err(err).Int("pid", pid).Msg("failed to send SIGTERM")
	}
	// Wait for graceful shutdown
	done := make(chan struct{})
	go func() {
		// NOTE(review): the error from Wait is discarded, and
		// waitForProcessExit also calls Wait on the same command, so this
		// may be a second Wait on an already-reaped process.
		cmd.Wait()
		close(done)
	}()
	select {
	case <-done:
		s.logger.Info().Int("pid", pid).Msg("audio server process terminated gracefully")
	case <-time.After(5 * time.Second):
		s.logger.Warn().Int("pid", pid).Msg("process did not terminate gracefully, sending SIGKILL")
		s.forceKillProcess()
	}
}
// forceKillProcess kills the current subprocess outright. It does nothing
// when no command or process has been stored; a failure to kill is logged.
func (s *AudioServerSupervisor) forceKillProcess() {
	s.mutex.RLock()
	target, targetPID := s.cmd, s.processPID
	s.mutex.RUnlock()

	if target == nil || target.Process == nil {
		return
	}

	s.logger.Warn().Int("pid", targetPID).Msg("force killing audio server process")
	killErr := target.Process.Kill()
	if killErr != nil {
		s.logger.Error().Err(killErr).Int("pid", targetPID).Msg("failed to kill process")
	}
}
// shouldRestart determines if the process should be restarted. It returns
// false when the supervisor is stopping, and otherwise reports whether fewer
// than maxRestartAttempts restart attempts were recorded within the last
// restartWindow.
//
// As a side effect, attempts older than restartWindow are discarded. Because
// that rewrites s.restartAttempts, the write lock is taken rather than the
// read lock.
func (s *AudioServerSupervisor) shouldRestart() bool {
	if atomic.LoadInt32(&s.running) == 0 {
		return false // Supervisor is stopping
	}

	s.mutex.Lock()
	defer s.mutex.Unlock()

	// Keep only the attempts inside the window, reusing the backing array.
	now := time.Now()
	recent := s.restartAttempts[:0]
	for _, attempt := range s.restartAttempts {
		if now.Sub(attempt) < restartWindow {
			recent = append(recent, attempt)
		}
	}
	s.restartAttempts = recent
	return len(recent) < maxRestartAttempts
}
// recordRestartAttempt appends the current time to the list of restart
// attempts.
func (s *AudioServerSupervisor) recordRestartAttempt() {
	stamp := func() time.Time { return time.Now() }

	s.mutex.Lock()
	s.restartAttempts = append(s.restartAttempts, stamp())
	s.mutex.Unlock()
}
// calculateRestartDelay returns how long to wait before the next restart
// attempt: restartDelay doubled once per recorded attempt, capped at
// maxRestartDelay. With no recorded attempts it returns restartDelay.
func (s *AudioServerSupervisor) calculateRestartDelay() time.Duration {
	s.mutex.RLock()
	defer s.mutex.RUnlock()

	recorded := len(s.restartAttempts)
	backoff := restartDelay
	// Doubling stops as soon as the cap has been reached.
	for doubled := 0; doubled < recorded && backoff < maxRestartDelay; doubled++ {
		backoff *= 2
	}
	if backoff > maxRestartDelay {
		return maxRestartDelay
	}
	return backoff
}

164
main.go
View File

@ -2,6 +2,8 @@ package kvm
import ( import (
"context" "context"
"flag"
"fmt"
"net/http" "net/http"
"os" "os"
"os/signal" "os/signal"
@ -10,12 +12,130 @@ import (
"github.com/gwatts/rootcerts" "github.com/gwatts/rootcerts"
"github.com/jetkvm/kvm/internal/audio" "github.com/jetkvm/kvm/internal/audio"
"github.com/pion/webrtc/v4/pkg/media"
) )
var appCtx context.Context var (
appCtx context.Context
isAudioServer bool
audioProcessDone chan struct{}
audioSupervisor *audio.AudioServerSupervisor
)
func init() {
flag.BoolVar(&isAudioServer, "audio-server", false, "Run as audio server subprocess")
audioProcessDone = make(chan struct{})
}
// runAudioServer is the entry point of the audio server subprocess. It
// creates and starts the audio server, starts non-blocking audio streaming
// with a callback that sends every frame through the server, and then blocks
// until SIGINT or SIGTERM arrives, at which point streaming is stopped. Any
// setup failure is logged and exits the process with status 1.
func runAudioServer() {
	logger.Info().Msg("Starting audio server subprocess")

	// Create audio server
	srv, err := audio.NewAudioServer()
	if err != nil {
		logger.Error().Err(err).Msg("failed to create audio server")
		os.Exit(1)
	}
	defer srv.Close()

	// Start accepting connections
	if startErr := srv.Start(); startErr != nil {
		logger.Error().Err(startErr).Msg("failed to start audio server")
		os.Exit(1)
	}

	// forward sends one frame through the server and counts it as dropped
	// when sending fails.
	forward := func(frame []byte) {
		sendErr := srv.SendFrame(frame)
		if sendErr == nil {
			return
		}
		logger.Warn().Err(sendErr).Msg("failed to send audio frame")
		audio.RecordFrameDropped()
	}

	// Initialize audio processing
	if err = audio.StartNonBlockingAudioStreaming(forward); err != nil {
		logger.Error().Err(err).Msg("failed to start audio processing")
		os.Exit(1)
	}

	// Wait for termination signal
	termination := make(chan os.Signal, 1)
	signal.Notify(termination, syscall.SIGINT, syscall.SIGTERM)
	<-termination

	// Cleanup
	audio.StopNonBlockingAudioStreaming()
	logger.Info().Msg("Audio server subprocess stopped")
}
// startAudioSubprocess creates the audio server supervisor, wires its
// lifecycle callbacks to the audio relay (start the relay when the process
// starts, stop it when the process exits), and starts supervising. It also
// launches a goroutine that closes audioProcessDone once the supervisor is
// no longer running. An error is returned when the supervisor cannot be
// started.
func startAudioSubprocess() error {
	// Create audio server supervisor
	audioSupervisor = audio.NewAudioServerSupervisor()
	// Set up callbacks for process lifecycle events
	audioSupervisor.SetCallbacks(
		// onProcessStart
		func(pid int) {
			logger.Info().Int("pid", pid).Msg("audio server process started")
			// Start audio relay system for main process without a track initially
			// The track will be updated when a WebRTC session is created
			if err := audio.StartAudioRelay(nil); err != nil {
				logger.Error().Err(err).Msg("failed to start audio relay")
			}
		},
		// onProcessExit
		func(pid int, exitCode int, crashed bool) {
			if crashed {
				logger.Error().Int("pid", pid).Int("exit_code", exitCode).Msg("audio server process crashed")
			} else {
				logger.Info().Int("pid", pid).Msg("audio server process exited gracefully")
			}
			// Stop audio relay when process exits
			audio.StopAudioRelay()
		},
		// onRestart
		func(attempt int, delay time.Duration) {
			logger.Warn().Int("attempt", attempt).Dur("delay", delay).Msg("restarting audio server process")
		},
	)
	// Start the supervisor
	if err := audioSupervisor.Start(); err != nil {
		return fmt.Errorf("failed to start audio supervisor: %w", err)
	}
	// Monitor supervisor and handle cleanup
	go func() {
		defer close(audioProcessDone)
		// Poll every 100 ms until the supervisor reports it is no longer running.
		for audioSupervisor.IsRunning() {
			time.Sleep(100 * time.Millisecond)
		}
		logger.Info().Msg("audio supervisor stopped")
	}()
	return nil
}
func Main() { func Main() {
flag.Parse()
// If running as audio server, only initialize audio processing
if isAudioServer {
runAudioServer()
return
}
// If running as audio input server, only initialize audio input processing
if audio.IsAudioInputServerProcess() {
err := audio.RunAudioInputServer()
if err != nil {
logger.Error().Err(err).Msg("audio input server failed")
os.Exit(1)
}
return
}
LoadConfig() LoadConfig()
var cancel context.CancelFunc var cancel context.CancelFunc
@ -80,30 +200,10 @@ func Main() {
// initialize usb gadget // initialize usb gadget
initUsbGadget() initUsbGadget()
// Start non-blocking audio streaming and deliver Opus frames to WebRTC // Start audio subprocess
err = audio.StartNonBlockingAudioStreaming(func(frame []byte) { err = startAudioSubprocess()
// Deliver Opus frame to WebRTC audio track if session is active
if currentSession != nil {
config := audio.GetAudioConfig()
var sampleData []byte
if audio.IsAudioMuted() {
sampleData = make([]byte, len(frame)) // silence
} else {
sampleData = frame
}
if err := currentSession.AudioTrack.WriteSample(media.Sample{
Data: sampleData,
Duration: config.FrameSize,
}); err != nil {
logger.Warn().Err(err).Msg("error writing audio sample")
audio.RecordFrameDropped()
}
} else {
audio.RecordFrameDropped()
}
})
if err != nil { if err != nil {
logger.Warn().Err(err).Msg("failed to start non-blocking audio streaming") logger.Warn().Err(err).Msg("failed to start audio subprocess")
} }
// Initialize session provider for audio events // Initialize session provider for audio events
@ -163,8 +263,18 @@ func Main() {
<-sigs <-sigs
logger.Info().Msg("JetKVM Shutting Down") logger.Info().Msg("JetKVM Shutting Down")
// Stop non-blocking audio manager // Stop audio subprocess and wait for cleanup
audio.StopNonBlockingAudioStreaming() if !isAudioServer {
if audioSupervisor != nil {
logger.Info().Msg("stopping audio supervisor")
if err := audioSupervisor.Stop(); err != nil {
logger.Error().Err(err).Msg("failed to stop audio supervisor")
}
}
<-audioProcessDone
} else {
audio.StopNonBlockingAudioStreaming()
}
//if fuseServer != nil { //if fuseServer != nil {
// err := setMassStorageImage(" ") // err := setMassStorageImage(" ")
// if err != nil { // if err != nil {

2
resource/dev_test.sh Normal file → Executable file
View File

@ -1,4 +1,4 @@
#!/bin/sh #!/bin/bash
JSON_OUTPUT=false JSON_OUTPUT=false
GET_COMMANDS=false GET_COMMANDS=false
if [ "$1" = "-json" ]; then if [ "$1" = "-json" ]; then

0
tools/build_audio_deps.sh Normal file → Executable file
View File

0
tools/setup_rv1106_toolchain.sh Normal file → Executable file
View File

22
web.go
View File

@ -173,6 +173,8 @@ func setupRouter() *gin.Engine {
return return
} }
audio.SetAudioMuted(req.Muted) audio.SetAudioMuted(req.Muted)
// Also set relay mute state if in main process
audio.SetAudioRelayMuted(req.Muted)
// Broadcast audio mute state change via WebSocket // Broadcast audio mute state change via WebSocket
broadcaster := audio.GetAudioEventBroadcaster() broadcaster := audio.GetAudioEventBroadcaster()
@ -286,7 +288,7 @@ func setupRouter() *gin.Engine {
// Optimized server-side cooldown using atomic operations // Optimized server-side cooldown using atomic operations
opResult := audio.TryMicrophoneOperation() opResult := audio.TryMicrophoneOperation()
if !opResult.Allowed { if !opResult.Allowed {
running := currentSession.AudioInputManager.IsRunning() || audio.IsNonBlockingAudioInputRunning() running := currentSession.AudioInputManager.IsRunning()
c.JSON(200, gin.H{ c.JSON(200, gin.H{
"status": "cooldown", "status": "cooldown",
"running": running, "running": running,
@ -297,7 +299,7 @@ func setupRouter() *gin.Engine {
} }
// Check if already running before attempting to start // Check if already running before attempting to start
if currentSession.AudioInputManager.IsRunning() || audio.IsNonBlockingAudioInputRunning() { if currentSession.AudioInputManager.IsRunning() {
c.JSON(200, gin.H{ c.JSON(200, gin.H{
"status": "already running", "status": "already running",
"running": true, "running": true,
@ -312,7 +314,7 @@ func setupRouter() *gin.Engine {
// Check if it's already running after the failed start attempt // Check if it's already running after the failed start attempt
// This handles race conditions where another request started it // This handles race conditions where another request started it
if currentSession.AudioInputManager.IsRunning() || audio.IsNonBlockingAudioInputRunning() { if currentSession.AudioInputManager.IsRunning() {
c.JSON(200, gin.H{ c.JSON(200, gin.H{
"status": "started by concurrent request", "status": "started by concurrent request",
"running": true, "running": true,
@ -348,7 +350,7 @@ func setupRouter() *gin.Engine {
// Optimized server-side cooldown using atomic operations // Optimized server-side cooldown using atomic operations
opResult := audio.TryMicrophoneOperation() opResult := audio.TryMicrophoneOperation()
if !opResult.Allowed { if !opResult.Allowed {
running := currentSession.AudioInputManager.IsRunning() || audio.IsNonBlockingAudioInputRunning() running := currentSession.AudioInputManager.IsRunning()
c.JSON(200, gin.H{ c.JSON(200, gin.H{
"status": "cooldown", "status": "cooldown",
"running": running, "running": running,
@ -359,7 +361,7 @@ func setupRouter() *gin.Engine {
} }
// Check if already stopped before attempting to stop // Check if already stopped before attempting to stop
if !currentSession.AudioInputManager.IsRunning() && !audio.IsNonBlockingAudioInputRunning() { if !currentSession.AudioInputManager.IsRunning() {
c.JSON(200, gin.H{ c.JSON(200, gin.H{
"status": "already stopped", "status": "already stopped",
"running": false, "running": false,
@ -369,7 +371,7 @@ func setupRouter() *gin.Engine {
currentSession.AudioInputManager.Stop() currentSession.AudioInputManager.Stop()
// AudioInputManager.Stop() already coordinates a clean stop via StopNonBlockingAudioInput() // AudioInputManager.Stop() already coordinates a clean stop via IPC audio input system
// so we don't need to call it again here // so we don't need to call it again here
// Broadcast microphone state change via WebSocket // Broadcast microphone state change via WebSocket
@ -437,9 +439,8 @@ func setupRouter() *gin.Engine {
logger.Info().Msg("forcing microphone state reset") logger.Info().Msg("forcing microphone state reset")
// Force stop both the AudioInputManager and NonBlockingAudioManager // Force stop the AudioInputManager
currentSession.AudioInputManager.Stop() currentSession.AudioInputManager.Stop()
audio.StopNonBlockingAudioInput()
// Wait a bit to ensure everything is stopped // Wait a bit to ensure everything is stopped
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
@ -449,9 +450,8 @@ func setupRouter() *gin.Engine {
broadcaster.BroadcastMicrophoneStateChanged(false, true) broadcaster.BroadcastMicrophoneStateChanged(false, true)
c.JSON(200, gin.H{ c.JSON(200, gin.H{
"status": "reset", "status": "reset",
"audio_input_running": currentSession.AudioInputManager.IsRunning(), "audio_input_running": currentSession.AudioInputManager.IsRunning(),
"nonblocking_input_running": audio.IsNonBlockingAudioInputRunning(),
}) })
}) })

View File

@ -30,10 +30,15 @@ type Session struct {
AudioInputManager *audio.AudioInputManager AudioInputManager *audio.AudioInputManager
shouldUmountVirtualMedia bool shouldUmountVirtualMedia bool
// Microphone operation cooldown to mitigate rapid start/stop races // Microphone operation throttling
micOpMu sync.Mutex micOpMu sync.Mutex
lastMicOp time.Time lastMicOp time.Time
micCooldown time.Duration micCooldown time.Duration
// Audio frame processing
audioFrameChan chan []byte
audioStopChan chan struct{}
audioWg sync.WaitGroup
} }
type SessionConfig struct { type SessionConfig struct {
@ -118,8 +123,14 @@ func newSession(config SessionConfig) (*Session, error) {
session := &Session{ session := &Session{
peerConnection: peerConnection, peerConnection: peerConnection,
AudioInputManager: audio.NewAudioInputManager(), AudioInputManager: audio.NewAudioInputManager(),
micCooldown: 100 * time.Millisecond,
audioFrameChan: make(chan []byte, 1000),
audioStopChan: make(chan struct{}),
} }
// Start audio processing goroutine
session.startAudioProcessor(*logger)
peerConnection.OnDataChannel(func(d *webrtc.DataChannel) { peerConnection.OnDataChannel(func(d *webrtc.DataChannel) {
scopedLogger.Info().Str("label", d.Label()).Uint16("id", *d.ID()).Msg("New DataChannel") scopedLogger.Info().Str("label", d.Label()).Uint16("id", *d.ID()).Msg("New DataChannel")
switch d.Label() { switch d.Label() {
@ -155,6 +166,11 @@ func newSession(config SessionConfig) (*Session, error) {
return nil, err return nil, err
} }
// Update the audio relay with the new WebRTC audio track
if err := audio.UpdateAudioRelayTrack(session.AudioTrack); err != nil {
scopedLogger.Warn().Err(err).Msg("Failed to update audio relay track")
}
videoRtpSender, err := peerConnection.AddTrack(session.VideoTrack) videoRtpSender, err := peerConnection.AddTrack(session.VideoTrack)
if err != nil { if err != nil {
return nil, err return nil, err
@ -190,10 +206,14 @@ func newSession(config SessionConfig) (*Session, error) {
// Extract Opus payload from RTP packet // Extract Opus payload from RTP packet
opusPayload := rtpPacket.Payload opusPayload := rtpPacket.Payload
if len(opusPayload) > 0 && session.AudioInputManager != nil { if len(opusPayload) > 0 {
err := session.AudioInputManager.WriteOpusFrame(opusPayload) // Send to buffered channel for processing
if err != nil { select {
scopedLogger.Warn().Err(err).Msg("Failed to write Opus frame to audio input manager") case session.audioFrameChan <- opusPayload:
// Frame sent successfully
default:
// Channel is full, drop the frame
scopedLogger.Warn().Msg("Audio frame channel full, dropping frame")
} }
} }
} }
@ -245,7 +265,8 @@ func newSession(config SessionConfig) (*Session, error) {
err := rpcUnmountImage() err := rpcUnmountImage()
scopedLogger.Warn().Err(err).Msg("unmount image failed on connection close") scopedLogger.Warn().Err(err).Msg("unmount image failed on connection close")
} }
// Stop audio input manager // Stop audio processing and input manager
session.stopAudioProcessor()
if session.AudioInputManager != nil { if session.AudioInputManager != nil {
session.AudioInputManager.Stop() session.AudioInputManager.Stop()
} }
@ -262,6 +283,36 @@ func newSession(config SessionConfig) (*Session, error) {
return session, nil return session, nil
} }
// startAudioProcessor launches the session's dedicated audio goroutine.
//
// The goroutine receives frames from s.audioFrameChan and hands each one to
// s.AudioInputManager.WriteOpusFrame; a write failure is logged as a warning
// and is not propagated. It returns once s.audioStopChan becomes readable.
// The goroutine is registered with s.audioWg so a caller can wait for it.
func (s *Session) startAudioProcessor(logger zerolog.Logger) {
	process := func() {
		defer s.audioWg.Done()
		logger.Debug().Msg("Audio processor goroutine started")

		for {
			select {
			case <-s.audioStopChan:
				logger.Debug().Msg("Audio processor goroutine stopping")
				return
			case opusFrame := <-s.audioFrameChan:
				if s.AudioInputManager == nil {
					// No input manager to deliver to; the frame is discarded.
					continue
				}
				if writeErr := s.AudioInputManager.WriteOpusFrame(opusFrame); writeErr != nil {
					logger.Warn().Err(writeErr).Msg("Failed to write Opus frame to audio input manager")
				}
			}
		}
	}

	// Register with the WaitGroup before the goroutine is started.
	s.audioWg.Add(1)
	go process()
}
// stopAudioProcessor signals the audio processing goroutine to stop and
// blocks until everything tracked by s.audioWg has finished.
//
// Closing an already-closed channel panics, so the stop channel is probed
// first; a repeated sequential call then only waits instead of closing again.
// The probe and the close are not atomic, so concurrent callers still need
// external serialization.
func (s *Session) stopAudioProcessor() {
	select {
	case <-s.audioStopChan:
		// Receive succeeded, which is taken to mean an earlier call already
		// closed the channel; there is nothing left to signal.
	default:
		close(s.audioStopChan)
	}
	s.audioWg.Wait()
}
func drainRtpSender(rtpSender *webrtc.RTPSender) { func drainRtpSender(rtpSender *webrtc.RTPSender) {
// Lock to OS thread to isolate RTCP processing // Lock to OS thread to isolate RTCP processing
runtime.LockOSThread() runtime.LockOSThread()