From 58fad71112c3ba4abb8b1452810578a13a7ef394 Mon Sep 17 00:00:00 2001 From: Alex P Date: Tue, 7 Oct 2025 08:49:49 +0300 Subject: [PATCH] [WIP] Updates: support in-process mode --- audio.go | 193 ++++++++++++++------ config.go | 2 + internal/audio/cgo_source.go | 216 +++++++++++++++++++++++ internal/audio/{ipc.go => ipc_source.go} | 20 +-- internal/audio/relay.go | 26 +-- internal/audio/source.go | 21 +++ jsonrpc.go | 14 ++ 7 files changed, 416 insertions(+), 76 deletions(-) create mode 100644 internal/audio/cgo_source.go rename internal/audio/{ipc.go => ipc_source.go} (90%) create mode 100644 internal/audio/source.go diff --git a/audio.go b/audio.go index b16aa29e..b8ae629a 100644 --- a/audio.go +++ b/audio.go @@ -1,6 +1,7 @@ package kvm import ( + "fmt" "io" "sync" "sync/atomic" @@ -20,8 +21,8 @@ var ( audioMutex sync.Mutex outputSupervisor *audio.Supervisor inputSupervisor *audio.Supervisor - outputClient *audio.IPCClient - inputClient *audio.IPCClient + outputSource audio.AudioSource + inputSource audio.AudioSource outputRelay *audio.OutputRelay inputRelay *audio.InputRelay audioInitialized bool @@ -66,61 +67,103 @@ func startAudioSubprocesses() error { return nil } - // Start output subprocess if not running and enabled - if outputSupervisor == nil && audioOutputEnabled.Load() { + // Start output audio if not running and enabled + if outputSource == nil && audioOutputEnabled.Load() { alsaDevice := "hw:0,0" // HDMI if useUSBForAudioOutput { alsaDevice = "hw:1,0" // USB } - outputSupervisor = audio.NewSupervisor( - "audio-output", - audio.GetAudioOutputBinaryPath(), - socketPathOutput, - []string{ - "ALSA_CAPTURE_DEVICE=" + alsaDevice, - "OPUS_BITRATE=128000", - "OPUS_COMPLEXITY=5", - }, - ) - - if err := outputSupervisor.Start(); err != nil { - audioLogger.Error().Err(err).Msg("Failed to start audio output supervisor") - outputSupervisor = nil - return err + ensureConfigLoaded() + audioMode := config.AudioMode + if audioMode == "" { + audioMode = "subprocess" // Default to subprocess } - outputClient = audio.NewIPCClient("audio-output", socketPathOutput, 0x4A4B4F55) + if audioMode == "in-process" { + // In-process CGO mode + outputSource = audio.NewCgoOutputSource(alsaDevice) + audioLogger.Debug(). + Str("mode", "in-process"). + Str("device", alsaDevice). + Msg("Audio output configured for in-process mode") + } else { + // Subprocess mode (default) + outputSupervisor = audio.NewSupervisor( + "audio-output", + audio.GetAudioOutputBinaryPath(), + socketPathOutput, + []string{ + "ALSA_CAPTURE_DEVICE=" + alsaDevice, + "OPUS_BITRATE=128000", + "OPUS_COMPLEXITY=5", + }, + ) + + if err := outputSupervisor.Start(); err != nil { + audioLogger.Error().Err(err).Msg("Failed to start audio output supervisor") + outputSupervisor = nil + return err + } + + outputSource = audio.NewIPCSource("audio-output", socketPathOutput, 0x4A4B4F55) + audioLogger.Debug(). + Str("mode", "subprocess"). + Str("device", alsaDevice). + Msg("Audio output configured for subprocess mode") + } if currentAudioTrack != nil { - outputRelay = audio.NewOutputRelay(outputClient, currentAudioTrack) + outputRelay = audio.NewOutputRelay(outputSource, currentAudioTrack) if err := outputRelay.Start(); err != nil { audioLogger.Error().Err(err).Msg("Failed to start audio output relay") } } } - // Start input subprocess if not running, USB audio enabled, and input enabled + // Start input audio if not running, USB audio enabled, and input enabled ensureConfigLoaded() - if inputSupervisor == nil && audioInputEnabled.Load() && config.UsbDevices != nil && config.UsbDevices.Audio { - inputSupervisor = audio.NewSupervisor( - "audio-input", - audio.GetAudioInputBinaryPath(), - socketPathInput, - []string{ - "ALSA_PLAYBACK_DEVICE=hw:1,0", - "OPUS_BITRATE=128000", - }, - ) + if inputSource == nil && audioInputEnabled.Load() && config.UsbDevices != nil && config.UsbDevices.Audio { + alsaPlaybackDevice := "hw:1,0" // USB speakers - if err := inputSupervisor.Start(); err != nil { - audioLogger.Error().Err(err).Msg("Failed to start input supervisor") - inputSupervisor = nil - return err + audioMode := config.AudioMode + if audioMode == "" { + audioMode = "subprocess" // Default to subprocess } - inputClient = audio.NewIPCClient("audio-input", socketPathInput, 0x4A4B4D49) - inputRelay = audio.NewInputRelay(inputClient) + if audioMode == "in-process" { + // In-process CGO mode + inputSource = audio.NewCgoInputSource(alsaPlaybackDevice) + audioLogger.Debug(). + Str("mode", "in-process"). + Str("device", alsaPlaybackDevice). + Msg("Audio input configured for in-process mode") + } else { + // Subprocess mode (default) + inputSupervisor = audio.NewSupervisor( + "audio-input", + audio.GetAudioInputBinaryPath(), + socketPathInput, + []string{ + "ALSA_PLAYBACK_DEVICE=hw:1,0", + "OPUS_BITRATE=128000", + }, + ) + + if err := inputSupervisor.Start(); err != nil { + audioLogger.Error().Err(err).Msg("Failed to start input supervisor") + inputSupervisor = nil + return err + } + + inputSource = audio.NewIPCSource("audio-input", socketPathInput, 0x4A4B4D49) + audioLogger.Debug(). + Str("mode", "subprocess"). + Str("device", alsaPlaybackDevice). + Msg("Audio input configured for subprocess mode") + } + + inputRelay = audio.NewInputRelay(inputSource) if err := inputRelay.Start(); err != nil { audioLogger.Error().Err(err).Msg("Failed to start input relay") } @@ -135,9 +178,9 @@ func stopOutputSubprocessLocked() { outputRelay.Stop() outputRelay = nil } - if outputClient != nil { - outputClient.Disconnect() - outputClient = nil + if outputSource != nil { + outputSource.Disconnect() + outputSource = nil } if outputSupervisor != nil { outputSupervisor.Stop() @@ -151,9 +194,9 @@ func stopInputSubprocessLocked() { inputRelay.Stop() inputRelay = nil } - if inputClient != nil { - inputClient.Disconnect() - inputClient = nil + if inputSource != nil { + inputSource.Disconnect() + inputSource = nil } if inputSupervisor != nil { inputSupervisor.Stop() @@ -202,8 +245,8 @@ func setAudioTrack(audioTrack *webrtc.TrackLocalStaticSample) { outputRelay = nil } - if outputClient != nil { - outputRelay = audio.NewOutputRelay(outputClient, audioTrack) + if outputSource != nil { + outputRelay = audio.NewOutputRelay(outputSource, audioTrack) if err := outputRelay.Start(); err != nil { audioLogger.Error().Err(err).Msg("Failed to start output relay") } @@ -248,6 +291,50 @@ func SetAudioOutputSource(useUSB bool) error { return nil } +// SetAudioMode switches between subprocess and in-process audio modes +func SetAudioMode(mode string) error { + if mode != "subprocess" && mode != "in-process" { + return fmt.Errorf("invalid audio mode: %s (must be 'subprocess' or 'in-process')", mode) + } + + audioMutex.Lock() + defer audioMutex.Unlock() + + ensureConfigLoaded() + if config.AudioMode == mode { + return nil // Already in desired mode + } + + audioLogger.Info(). + Str("old_mode", config.AudioMode). + Str("new_mode", mode). + Msg("Switching audio mode") + + // Save new mode to config + config.AudioMode = mode + if err := SaveConfig(); err != nil { + audioLogger.Error().Err(err).Msg("Failed to save config") + return err + } + + // Stop all audio (both output and input) + stopAudioSubprocessesLocked() + + // Restart if there are active connections + if activeConnections.Load() > 0 { + audioMutex.Unlock() + err := startAudioSubprocesses() + audioMutex.Lock() + if err != nil { + audioLogger.Error().Err(err).Msg("Failed to restart audio with new mode") + return err + } + } + + audioLogger.Info().Str("mode", mode).Msg("Audio mode switch completed") + return nil +} + func setPendingInputTrack(track *webrtc.TrackRemote) { audioMutex.Lock() defer audioMutex.Unlock() @@ -329,23 +416,23 @@ func handleInputTrackForSession(track *webrtc.TrackRemote) { continue // Drop frame but keep reading } - // Get client in single mutex operation (hot path optimization) + // Get source in single mutex operation (hot path optimization) audioMutex.Lock() - client := inputClient + source := inputSource audioMutex.Unlock() - if client == nil { + if source == nil { continue // No relay, drop frame but keep reading } - if !client.IsConnected() { - if err := client.Connect(); err != nil { + if !source.IsConnected() { + if err := source.Connect(); err != nil { continue } } - if err := client.WriteMessage(0, opusData); err != nil { - client.Disconnect() + if err := source.WriteMessage(0, opusData); err != nil { + source.Disconnect() } } } diff --git a/config.go b/config.go index b6273836..1f1d42f4 100644 --- a/config.go +++ b/config.go @@ -105,6 +105,7 @@ type Config struct { NetworkConfig *network.NetworkConfig `json:"network_config"` DefaultLogLevel string `json:"default_log_level"` AudioOutputSource string `json:"audio_output_source"` // "hdmi" or "usb" + AudioMode string `json:"audio_mode"` // "subprocess" or "in-process" } func (c *Config) GetDisplayRotation() uint16 { @@ -165,6 +166,7 @@ var defaultConfig = &Config{ NetworkConfig: &network.NetworkConfig{}, DefaultLogLevel: "INFO", AudioOutputSource: "usb", + AudioMode: "subprocess", // Default to subprocess mode for stability } var ( diff --git a/internal/audio/cgo_source.go b/internal/audio/cgo_source.go new file mode 100644 index 00000000..72ebd8ea --- /dev/null +++ b/internal/audio/cgo_source.go @@ -0,0 +1,216 @@ +//go:build linux && (arm || arm64) + +package audio + +/* +#cgo CFLAGS: -O3 -ffast-math +#cgo LDFLAGS: -lasound -lopus -lm + +#include "c/audio.c" +*/ +import "C" +import ( + "fmt" + "sync" + "unsafe" + + "github.com/jetkvm/kvm/internal/logging" + "github.com/rs/zerolog" +) + +// CgoSource implements AudioSource via direct CGO calls to C audio functions (in-process) +type CgoSource struct { + direction string // "output" or "input" + alsaDevice string + initialized bool + connected bool + mu sync.Mutex + logger zerolog.Logger + opusBuf []byte // Reusable buffer for Opus packets +} + +// NewCgoOutputSource creates a new CGO audio source for output (HDMI/USB → browser) +func NewCgoOutputSource(alsaDevice string) *CgoSource { + logger := logging.GetDefaultLogger().With().Str("component", "audio-output-cgo").Logger() + + return &CgoSource{ + direction: "output", + alsaDevice: alsaDevice, + logger: logger, + opusBuf: make([]byte, ipcMaxFrameSize), + } +} + +// NewCgoInputSource creates a new CGO audio source for input (browser → USB speakers) +func NewCgoInputSource(alsaDevice string) *CgoSource { + logger := logging.GetDefaultLogger().With().Str("component", "audio-input-cgo").Logger() + + return &CgoSource{ + direction: "input", + alsaDevice: alsaDevice, + logger: logger, + opusBuf: make([]byte, ipcMaxFrameSize), + } +} + +// Connect initializes the C audio subsystem +func (c *CgoSource) Connect() error { + c.mu.Lock() + defer c.mu.Unlock() + + if c.connected { + return nil + } + + // Set ALSA device via environment for C code + if c.direction == "output" { + // Set capture device for output path + cDevice := C.CString(c.alsaDevice) + defer C.free(unsafe.Pointer(cDevice)) + C.alsa_capture_device = cDevice + + // Initialize constants + C.update_audio_constants( + C.uint(128000), // bitrate + C.uchar(5), // complexity + C.uint(48000), // sample_rate + C.uchar(2), // channels + C.ushort(960), // frame_size + C.ushort(1500), // max_packet_size + C.uint(1000), // sleep_us + C.uchar(5), // max_attempts + C.uint(500000), // max_backoff_us + ) + + // Initialize capture (HDMI/USB → browser) + rc := C.jetkvm_audio_capture_init() + if rc != 0 { + c.logger.Error().Int("rc", int(rc)).Msg("Failed to initialize audio capture") + return fmt.Errorf("jetkvm_audio_capture_init failed: %d", rc) + } + + c.logger.Debug().Str("device", c.alsaDevice).Msg("Audio capture initialized") + } else { + // Set playback device for input path + cDevice := C.CString(c.alsaDevice) + defer C.free(unsafe.Pointer(cDevice)) + C.alsa_playback_device = cDevice + + // Initialize decoder constants + C.update_audio_decoder_constants( + C.uint(48000), // sample_rate + C.uchar(2), // channels + C.ushort(960), // frame_size + C.ushort(1500), // max_packet_size + C.uint(1000), // sleep_us + C.uchar(5), // max_attempts + C.uint(500000), // max_backoff_us + ) + + // Initialize playback (browser → USB speakers) + rc := C.jetkvm_audio_playback_init() + if rc != 0 { + c.logger.Error().Int("rc", int(rc)).Msg("Failed to initialize audio playback") + return fmt.Errorf("jetkvm_audio_playback_init failed: %d", rc) + } + + c.logger.Debug().Str("device", c.alsaDevice).Msg("Audio playback initialized") + } + + c.connected = true + c.initialized = true + return nil +} + +// Disconnect closes the C audio subsystem +func (c *CgoSource) Disconnect() { + c.mu.Lock() + defer c.mu.Unlock() + + if !c.connected { + return + } + + if c.direction == "output" { + C.jetkvm_audio_capture_close() + c.logger.Debug().Msg("Audio capture closed") + } else { + C.jetkvm_audio_playback_close() + c.logger.Debug().Msg("Audio playback closed") + } + + c.connected = false +} + +// IsConnected returns true if currently connected +func (c *CgoSource) IsConnected() bool { + c.mu.Lock() + defer c.mu.Unlock() + return c.connected +} + +// ReadMessage reads the next audio frame from C audio subsystem +// For output path: reads HDMI/USB audio and encodes to Opus +// For input path: not used (input uses WriteMessage instead) +// Returns message type (0 = Opus), payload data, and error +func (c *CgoSource) ReadMessage() (uint8, []byte, error) { + c.mu.Lock() + defer c.mu.Unlock() + + if !c.connected { + return 0, nil, fmt.Errorf("not connected") + } + + if c.direction != "output" { + return 0, nil, fmt.Errorf("ReadMessage only supported for output direction") + } + + // Call C function to read HDMI/USB audio and encode to Opus + // Returns Opus packet size (>0) or error (<0) + opusSize := C.jetkvm_audio_read_encode(unsafe.Pointer(&c.opusBuf[0])) + + if opusSize < 0 { + return 0, nil, fmt.Errorf("jetkvm_audio_read_encode failed: %d", opusSize) + } + + if opusSize == 0 { + // No data available (silence/DTX) + return 0, nil, nil + } + + // Return slice of opusBuf - caller must use immediately + return ipcMsgTypeOpus, c.opusBuf[:opusSize], nil +} + +// WriteMessage writes an Opus packet to the C audio subsystem for playback +// Only used for input path (browser → USB speakers) +func (c *CgoSource) WriteMessage(msgType uint8, payload []byte) error { + c.mu.Lock() + defer c.mu.Unlock() + + if !c.connected { + return fmt.Errorf("not connected") + } + + if c.direction != "input" { + return fmt.Errorf("WriteMessage only supported for input direction") + } + + if msgType != ipcMsgTypeOpus { + // Ignore non-Opus messages + return nil + } + + if len(payload) == 0 { + return nil + } + + // Call C function to decode Opus and write to USB speakers + rc := C.jetkvm_audio_decode_write(unsafe.Pointer(&payload[0]), C.int(len(payload))) + + if rc < 0 { + return fmt.Errorf("jetkvm_audio_decode_write failed: %d", rc) + } + + return nil +} diff --git a/internal/audio/ipc.go b/internal/audio/ipc_source.go similarity index 90% rename from internal/audio/ipc.go rename to internal/audio/ipc_source.go index e9b50bb0..7ae82c39 100644 --- a/internal/audio/ipc.go +++ b/internal/audio/ipc_source.go @@ -33,8 +33,8 @@ const ( readTimeout = 2 * time.Second ) -// IPCClient manages Unix socket communication with audio subprocess -type IPCClient struct { +// IPCSource implements AudioSource via Unix socket communication with audio subprocess +type IPCSource struct { socketPath string magicNumber uint32 conn net.Conn @@ -43,13 +43,13 @@ type IPCClient struct { readBuf []byte // Reusable buffer for reads (single reader per client) } -// NewIPCClient creates a new IPC client +// NewIPCSource creates a new IPC audio source // For output: socketPath="/var/run/audio_output.sock", magic=ipcMagicOutput // For input: socketPath="/var/run/audio_input.sock", magic=ipcMagicInput -func NewIPCClient(name, socketPath string, magicNumber uint32) *IPCClient { +func NewIPCSource(name, socketPath string, magicNumber uint32) *IPCSource { logger := logging.GetDefaultLogger().With().Str("component", name+"-ipc").Logger() - return &IPCClient{ + return &IPCSource{ socketPath: socketPath, magicNumber: magicNumber, logger: logger, @@ -58,7 +58,7 @@ func NewIPCClient(name, socketPath string, magicNumber uint32) *IPCClient { } // Connect establishes connection to the subprocess -func (c *IPCClient) Connect() error { +func (c *IPCSource) Connect() error { c.mu.Lock() defer c.mu.Unlock() @@ -78,7 +78,7 @@ func (c *IPCClient) Connect() error { } // Disconnect closes the connection -func (c *IPCClient) Disconnect() { +func (c *IPCSource) Disconnect() { c.mu.Lock() defer c.mu.Unlock() @@ -90,7 +90,7 @@ func (c *IPCClient) Disconnect() { } // IsConnected returns true if currently connected -func (c *IPCClient) IsConnected() bool { +func (c *IPCSource) IsConnected() bool { c.mu.Lock() defer c.mu.Unlock() return c.conn != nil @@ -100,7 +100,7 @@ func (c *IPCClient) IsConnected() bool { // Returns message type, payload data, and error // IMPORTANT: The returned payload slice is only valid until the next ReadMessage call. // Callers must use the data immediately or copy if retention is needed. -func (c *IPCClient) ReadMessage() (uint8, []byte, error) { +func (c *IPCSource) ReadMessage() (uint8, []byte, error) { c.mu.Lock() defer c.mu.Unlock() @@ -150,7 +150,7 @@ func (c *IPCClient) ReadMessage() (uint8, []byte, error) { } // WriteMessage writes a complete IPC message -func (c *IPCClient) WriteMessage(msgType uint8, payload []byte) error { +func (c *IPCSource) WriteMessage(msgType uint8, payload []byte) error { c.mu.Lock() defer c.mu.Unlock() diff --git a/internal/audio/relay.go b/internal/audio/relay.go index bda0db18..c8e3274d 100644 --- a/internal/audio/relay.go +++ b/internal/audio/relay.go @@ -12,9 +12,9 @@ import ( "github.com/rs/zerolog" ) -// OutputRelay forwards audio from subprocess (HDMI) to WebRTC (browser) +// OutputRelay forwards audio from any AudioSource (CGO or IPC) to WebRTC (browser) type OutputRelay struct { - client *IPCClient + source AudioSource audioTrack *webrtc.TrackLocalStaticSample ctx context.Context cancel context.CancelFunc @@ -28,12 +28,12 @@ type OutputRelay struct { } // NewOutputRelay creates a relay for output audio (device → browser) -func NewOutputRelay(client *IPCClient, audioTrack *webrtc.TrackLocalStaticSample) *OutputRelay { +func NewOutputRelay(source AudioSource, audioTrack *webrtc.TrackLocalStaticSample) *OutputRelay { ctx, cancel := context.WithCancel(context.Background()) logger := logging.GetDefaultLogger().With().Str("component", "audio-output-relay").Logger() return &OutputRelay{ - client: client, + source: source, audioTrack: audioTrack, ctx: ctx, cancel: cancel, @@ -68,27 +68,27 @@ func (r *OutputRelay) Stop() { Msg("output relay stopped") } -// relayLoop continuously reads from IPC and writes to WebRTC +// relayLoop continuously reads from audio source and writes to WebRTC func (r *OutputRelay) relayLoop() { const reconnectDelay = 1 * time.Second for r.running.Load() { // Ensure connected - if !r.client.IsConnected() { - if err := r.client.Connect(); err != nil { + if !r.source.IsConnected() { + if err := r.source.Connect(); err != nil { r.logger.Debug().Err(err).Msg("failed to connect, will retry") time.Sleep(reconnectDelay) continue } } - // Read message from subprocess - msgType, payload, err := r.client.ReadMessage() + // Read message from audio source + msgType, payload, err := r.source.ReadMessage() if err != nil { // Connection error - reconnect if r.running.Load() { r.logger.Warn().Err(err).Msg("read error, reconnecting") - r.client.Disconnect() + r.source.Disconnect() time.Sleep(reconnectDelay) } continue @@ -111,7 +111,7 @@ func (r *OutputRelay) relayLoop() { // InputRelay forwards audio from WebRTC (browser microphone) to subprocess (USB audio) type InputRelay struct { - client *IPCClient + source AudioSource ctx context.Context cancel context.CancelFunc logger zerolog.Logger @@ -119,12 +119,12 @@ type InputRelay struct { } // NewInputRelay creates a relay for input audio (browser → device) -func NewInputRelay(client *IPCClient) *InputRelay { +func NewInputRelay(source AudioSource) *InputRelay { ctx, cancel := context.WithCancel(context.Background()) logger := logging.GetDefaultLogger().With().Str("component", "audio-input-relay").Logger() return &InputRelay{ - client: client, + source: source, ctx: ctx, cancel: cancel, logger: logger, diff --git a/internal/audio/source.go b/internal/audio/source.go new file mode 100644 index 00000000..8212314e --- /dev/null +++ b/internal/audio/source.go @@ -0,0 +1,21 @@ +package audio + +// AudioSource provides audio frames from either CGO (in-process) or IPC (subprocess) +// This interface allows the relay goroutine to work with both modes transparently +type AudioSource interface { + // ReadMessage reads the next audio message + // Returns message type, payload data, and error + // Blocks until data is available or error occurs + ReadMessage() (msgType uint8, payload []byte, err error) + + // IsConnected returns true if the source is connected and ready + IsConnected() bool + + // Connect establishes connection to the audio source + // For CGO: initializes C audio subsystem + // For IPC: connects to Unix socket + Connect() error + + // Disconnect closes the connection and releases resources + Disconnect() +} diff --git a/jsonrpc.go b/jsonrpc.go index 7c80cbb7..6d129105 100644 --- a/jsonrpc.go +++ b/jsonrpc.go @@ -1021,6 +1021,18 @@ func rpcSetAudioInputEnabled(enabled bool) error { return SetAudioInputEnabled(enabled) } +func rpcGetAudioMode() (string, error) { + ensureConfigLoaded() + if config.AudioMode == "" { + return "subprocess", nil // Default + } + return config.AudioMode, nil +} + +func rpcSetAudioMode(mode string) error { + return SetAudioMode(mode) +} + func rpcSetCloudUrl(apiUrl string, appUrl string) error { currentCloudURL := config.CloudURL config.CloudURL = apiUrl @@ -1343,6 +1355,8 @@ var rpcHandlers = map[string]RPCHandler{ "setAudioOutputEnabled": {Func: rpcSetAudioOutputEnabled, Params: []string{"enabled"}}, "getAudioInputEnabled": {Func: rpcGetAudioInputEnabled}, "setAudioInputEnabled": {Func: rpcSetAudioInputEnabled, Params: []string{"enabled"}}, + "getAudioMode": {Func: rpcGetAudioMode}, + "setAudioMode": {Func: rpcSetAudioMode, Params: []string{"mode"}}, "setCloudUrl": {Func: rpcSetCloudUrl, Params: []string{"apiUrl", "appUrl"}}, "getKeyboardLayout": {Func: rpcGetKeyboardLayout}, "setKeyboardLayout": {Func: rpcSetKeyboardLayout, Params: []string{"layout"}},