diff --git a/.devcontainer/install-deps.sh b/.devcontainer/install-deps.sh index 4435d25b..94106cc9 100755 --- a/.devcontainer/install-deps.sh +++ b/.devcontainer/install-deps.sh @@ -5,7 +5,7 @@ function sudo() { if [ "$UID" -eq 0 ]; then "$@" else - ${SUDO_PATH} "$@" + ${SUDO_PATH} -E "$@" fi } @@ -16,7 +16,7 @@ sudo apt-get update && \ sudo apt-get install -y --no-install-recommends \ build-essential \ device-tree-compiler \ - gperf g++-multilib gcc-multilib \ + gperf \ libnl-3-dev libdbus-1-dev libelf-dev libmpc-dev dwarves \ bc openssl flex bison libssl-dev python3 python-is-python3 texinfo kmod cmake \ wget zstd \ @@ -30,6 +30,34 @@ pushd "${BUILDKIT_TMPDIR}" > /dev/null wget https://github.com/jetkvm/rv1106-system/releases/download/${BUILDKIT_VERSION}/buildkit.tar.zst && \ sudo mkdir -p /opt/jetkvm-native-buildkit && \ - sudo tar --use-compress-program="unzstd --long=31" -xvf buildkit.tar.zst -C /opt/jetkvm-native-buildkit && \ + sudo tar --use-compress-program="zstd -d --long=31" -xvf buildkit.tar.zst -C /opt/jetkvm-native-buildkit && \ rm buildkit.tar.zst -popd \ No newline at end of file +popd + +# Install audio dependencies (ALSA and Opus) for JetKVM +echo "Installing JetKVM audio dependencies..." +SCRIPT_DIR="$(dirname "$(readlink -f "$0")")" +PROJECT_ROOT="$(dirname "${SCRIPT_DIR}")" +AUDIO_DEPS_SCRIPT="${PROJECT_ROOT}/install_audio_deps.sh" + +if [ -f "${AUDIO_DEPS_SCRIPT}" ]; then + echo "Running audio dependencies installation..." + # Pre-create audio libs directory with proper permissions + sudo mkdir -p /opt/jetkvm-audio-libs + sudo chmod 777 /opt/jetkvm-audio-libs + # Run installation script (now it can write without sudo) + bash "${AUDIO_DEPS_SCRIPT}" + echo "Audio dependencies installation completed." + if [ -d "/opt/jetkvm-audio-libs" ]; then + echo "Audio libraries installed in /opt/jetkvm-audio-libs" + # Set recursive permissions for all subdirectories and files + sudo chmod -R 777 /opt/jetkvm-audio-libs + echo "Permissions set to allow all users access to audio libraries" + else + echo "Error: /opt/jetkvm-audio-libs directory not found after installation." + exit 1 + fi +else + echo "Warning: Audio dependencies script not found at ${AUDIO_DEPS_SCRIPT}" + echo "Skipping audio dependencies installation." +fi diff --git a/.devcontainer/install_audio_deps.sh b/.devcontainer/install_audio_deps.sh new file mode 100755 index 00000000..8d369db4 --- /dev/null +++ b/.devcontainer/install_audio_deps.sh @@ -0,0 +1,74 @@ +#!/bin/bash +# .devcontainer/install_audio_deps.sh +# Build ALSA and Opus static libs for ARM in /opt/jetkvm-audio-libs +set -e + +# Sudo wrapper function +SUDO_PATH=$(which sudo 2>/dev/null || echo "") +function use_sudo() { + if [ "$UID" -eq 0 ]; then + "$@" + elif [ -n "$SUDO_PATH" ]; then + ${SUDO_PATH} -E "$@" + else + "$@" + fi +} + +# Accept version parameters or use defaults +ALSA_VERSION="${1:-1.2.14}" +OPUS_VERSION="${2:-1.5.2}" + +AUDIO_LIBS_DIR="/opt/jetkvm-audio-libs" +BUILDKIT_PATH="/opt/jetkvm-native-buildkit" +BUILDKIT_FLAVOR="arm-rockchip830-linux-uclibcgnueabihf" +CROSS_PREFIX="$BUILDKIT_PATH/bin/$BUILDKIT_FLAVOR" + +# Create directory with proper permissions +use_sudo mkdir -p "$AUDIO_LIBS_DIR" +use_sudo chmod 777 "$AUDIO_LIBS_DIR" +cd "$AUDIO_LIBS_DIR" + +# Download sources +[ -f alsa-lib-${ALSA_VERSION}.tar.bz2 ] || wget -N https://www.alsa-project.org/files/pub/lib/alsa-lib-${ALSA_VERSION}.tar.bz2 +[ -f opus-${OPUS_VERSION}.tar.gz ] || wget -N https://downloads.xiph.org/releases/opus/opus-${OPUS_VERSION}.tar.gz + +# Extract +[ -d alsa-lib-${ALSA_VERSION} ] || tar xf alsa-lib-${ALSA_VERSION}.tar.bz2 +[ -d opus-${OPUS_VERSION} ] || tar xf opus-${OPUS_VERSION}.tar.gz + +# Optimization flags for ARM Cortex-A7 with NEON (simplified to avoid FD_SETSIZE issues) +OPTIM_CFLAGS="-O2 -mfpu=neon -mtune=cortex-a7 -mfloat-abi=hard" + +export CC="${CROSS_PREFIX}-gcc" +export CFLAGS="$OPTIM_CFLAGS" +export CXXFLAGS="$OPTIM_CFLAGS" + +# Build ALSA +cd alsa-lib-${ALSA_VERSION} +if [ ! -f .built ]; then + chown -R $(whoami):$(whoami) . + # Use minimal ALSA configuration to avoid FD_SETSIZE issues in devcontainer + CFLAGS="$OPTIM_CFLAGS" ./configure --host $BUILDKIT_FLAVOR \ + --enable-static=yes --enable-shared=no \ + --with-pcm-plugins=rate,linear \ + --disable-seq --disable-rawmidi --disable-ucm \ + --disable-python --disable-old-symbols \ + --disable-topology --disable-hwdep --disable-mixer \ + --disable-alisp --disable-aload --disable-resmgr + make -j$(nproc) + touch .built +fi +cd .. + +# Build Opus +cd opus-${OPUS_VERSION} +if [ ! -f .built ]; then + chown -R $(whoami):$(whoami) . + CFLAGS="$OPTIM_CFLAGS" ./configure --host $BUILDKIT_FLAVOR --enable-static=yes --enable-shared=no --enable-fixed-point + make -j$(nproc) + touch .built +fi +cd .. + +echo "ALSA and Opus built in $AUDIO_LIBS_DIR" diff --git a/.gitignore b/.gitignore index 99b7ce95..3d721389 100644 --- a/.gitignore +++ b/.gitignore @@ -12,4 +12,5 @@ node_modules # generated during the build process #internal/native/include -#internal/native/lib \ No newline at end of file +#internal/native/lib +internal/audio/bin/ diff --git a/Dockerfile.build b/Dockerfile.build index db433b2d..b588da1a 100644 --- a/Dockerfile.build +++ b/Dockerfile.build @@ -6,6 +6,8 @@ ENV GOPATH=/go ENV PATH=$GOPATH/bin:/usr/local/go/bin:$PATH COPY install-deps.sh /install-deps.sh +COPY install_audio_deps.sh /install_audio_deps.sh + RUN /install-deps.sh # Create build directory @@ -21,4 +23,4 @@ RUN go mod download && go mod verify COPY entrypoint.sh /entrypoint.sh RUN chmod +x /entrypoint.sh -ENTRYPOINT [ "/entrypoint.sh" ] \ No newline at end of file +ENTRYPOINT [ "/entrypoint.sh" ] diff --git a/Makefile b/Makefile index c3554879..4f4c9893 100644 --- a/Makefile +++ b/Makefile @@ -1,16 +1,59 @@ -BRANCH := $(shell git rev-parse --abbrev-ref HEAD) -BUILDDATE := $(shell date -u +%FT%T%z) -BUILDTS := $(shell date -u +%s) -REVISION := $(shell git rev-parse HEAD) +# Build ALSA and Opus static libs for ARM in /opt/jetkvm-audio-libs +build_audio_deps: + bash .devcontainer/install_audio_deps.sh $(ALSA_VERSION) $(OPUS_VERSION) + +# Prepare everything needed for local development (toolchain + audio deps + Go tools) +dev_env: build_audio_deps + $(CLEAN_GO_CACHE) + @echo "Installing Go development tools..." + go install golang.org/x/tools/cmd/goimports@latest + @echo "Development environment ready." +JETKVM_HOME ?= $(HOME)/.jetkvm +BUILDKIT_PATH ?= /opt/jetkvm-native-buildkit +BUILDKIT_FLAVOR ?= arm-rockchip830-linux-uclibcgnueabihf +AUDIO_LIBS_DIR ?= /opt/jetkvm-audio-libs + +BRANCH ?= $(shell git rev-parse --abbrev-ref HEAD) +BUILDDATE ?= $(shell date -u +%FT%T%z) +BUILDTS ?= $(shell date -u +%s) +REVISION ?= $(shell git rev-parse HEAD) VERSION_DEV := 0.4.9-dev$(shell date +%Y%m%d%H%M) VERSION := 0.4.8 + +# Audio library versions +ALSA_VERSION ?= 1.2.14 +OPUS_VERSION ?= 1.5.2 + +# Set PKG_CONFIG_PATH globally for all targets that use CGO with audio libraries +export PKG_CONFIG_PATH := $(AUDIO_LIBS_DIR)/alsa-lib-$(ALSA_VERSION)/utils:$(AUDIO_LIBS_DIR)/opus-$(OPUS_VERSION) + +# Common command to clean Go cache with verbose output for all Go builds +CLEAN_GO_CACHE := @echo "Cleaning Go cache..."; go clean -cache -v + +# Optimization flags for ARM Cortex-A7 with NEON SIMD +OPTIM_CFLAGS := -O3 -mfpu=neon -mtune=cortex-a7 -mfloat-abi=hard -ftree-vectorize -ffast-math -funroll-loops -mvectorize-with-neon-quad -marm -D__ARM_NEON + +# Cross-compilation environment for ARM - exported globally +export GOOS := linux +export GOARCH := arm +export GOARM := 7 +export CC := $(BUILDKIT_PATH)/bin/$(BUILDKIT_FLAVOR)-gcc +export CGO_ENABLED := 1 +export CGO_CFLAGS := $(OPTIM_CFLAGS) -I$(BUILDKIT_PATH)/$(BUILDKIT_FLAVOR)/include -I$(BUILDKIT_PATH)/$(BUILDKIT_FLAVOR)/sysroot/usr/include +export CGO_LDFLAGS := -L$(BUILDKIT_PATH)/$(BUILDKIT_FLAVOR)/lib -L$(BUILDKIT_PATH)/$(BUILDKIT_FLAVOR)/sysroot/usr/lib -lrockit -lrockchip_mpp -lrga -lpthread -lm -ldl + +# Audio-specific flags (only used for audio C binaries, NOT for main Go app) +AUDIO_CFLAGS := $(CGO_CFLAGS) -I$(AUDIO_LIBS_DIR)/alsa-lib-$(ALSA_VERSION)/include -I$(AUDIO_LIBS_DIR)/opus-$(OPUS_VERSION)/include -I$(AUDIO_LIBS_DIR)/opus-$(OPUS_VERSION)/celt +AUDIO_LDFLAGS := $(AUDIO_LIBS_DIR)/alsa-lib-$(ALSA_VERSION)/src/.libs/libasound.a $(AUDIO_LIBS_DIR)/opus-$(OPUS_VERSION)/.libs/libopus.a -lm -ldl -lpthread + PROMETHEUS_TAG := github.com/prometheus/common/version KVM_PKG_NAME := github.com/jetkvm/kvm BUILDKIT_FLAVOR := arm-rockchip830-linux-uclibcgnueabihf BUILDKIT_PATH ?= /opt/jetkvm-native-buildkit SKIP_NATIVE_IF_EXISTS ?= 0 +SKIP_AUDIO_BINARIES_IF_EXISTS ?= 0 SKIP_UI_BUILD ?= 0 GO_BUILD_ARGS := -tags netgo,timetzdata,nomsgpack GO_RELEASE_BUILD_ARGS := -trimpath $(GO_BUILD_ARGS) @@ -49,22 +92,67 @@ build_native: ./scripts/build_cgo.sh; \ fi -build_dev: build_native +# Build audio output C binary (ALSA capture → Opus encode → IPC) +build_audio_output: build_audio_deps + @if [ "$(SKIP_AUDIO_BINARIES_IF_EXISTS)" = "1" ] && [ -f "$(BIN_DIR)/jetkvm_audio_output" ]; then \ + echo "jetkvm_audio_output already exists, skipping build..."; \ + else \ + echo "Building audio output binary (100% static)..."; \ + mkdir -p $(BIN_DIR); \ + $(CC) $(AUDIO_CFLAGS) -static \ + -o $(BIN_DIR)/jetkvm_audio_output \ + internal/audio/c/jetkvm_audio_output.c \ + internal/audio/c/ipc_protocol.c \ + internal/audio/c/audio_common.c \ + internal/audio/c/audio.c \ + $(AUDIO_LDFLAGS); \ + fi + +# Build audio input C binary (IPC → Opus decode → ALSA playback) +build_audio_input: build_audio_deps + @if [ "$(SKIP_AUDIO_BINARIES_IF_EXISTS)" = "1" ] && [ -f "$(BIN_DIR)/jetkvm_audio_input" ]; then \ + echo "jetkvm_audio_input already exists, skipping build..."; \ + else \ + echo "Building audio input binary (100% static)..."; \ + mkdir -p $(BIN_DIR); \ + $(CC) $(AUDIO_CFLAGS) -static \ + -o $(BIN_DIR)/jetkvm_audio_input \ + internal/audio/c/jetkvm_audio_input.c \ + internal/audio/c/ipc_protocol.c \ + internal/audio/c/audio_common.c \ + internal/audio/c/audio.c \ + $(AUDIO_LDFLAGS); \ + fi + +# Build both audio binaries and copy to embed location +build_audio_binaries: build_audio_output build_audio_input + @echo "Audio binaries built successfully" + @echo "Copying binaries to embed location..." + @mkdir -p internal/audio/bin + @cp $(BIN_DIR)/jetkvm_audio_output internal/audio/bin/ + @cp $(BIN_DIR)/jetkvm_audio_input internal/audio/bin/ + @echo "Binaries ready for embedding" + +build_dev: build_native build_audio_deps build_audio_binaries + $(CLEAN_GO_CACHE) @echo "Building..." - $(GO_CMD) build \ + go build \ -ldflags="$(GO_LDFLAGS) -X $(KVM_PKG_NAME).builtAppVersion=$(VERSION_DEV)" \ $(GO_RELEASE_BUILD_ARGS) \ -o $(BIN_DIR)/jetkvm_app -v cmd/main.go build_test2json: + $(CLEAN_GO_CACHE) $(GO_CMD) build -o $(BIN_DIR)/test2json cmd/test2json build_gotestsum: + $(CLEAN_GO_CACHE) @echo "Building gotestsum..." $(GO_CMD) install gotest.tools/gotestsum@latest cp $(shell $(GO_CMD) env GOPATH)/bin/linux_arm/gotestsum $(BIN_DIR)/gotestsum -build_dev_test: build_test2json build_gotestsum +build_dev_test: build_audio_deps build_test2json build_gotestsum + $(CLEAN_GO_CACHE) # collect all directories that contain tests @echo "Building tests for devices ..." @rm -rf $(BIN_DIR)/tests && mkdir -p $(BIN_DIR)/tests @@ -74,7 +162,7 @@ build_dev_test: build_test2json build_gotestsum test_pkg_name=$$(echo $$test | sed 's/^.\///g'); \ test_pkg_full_name=$(KVM_PKG_NAME)/$$(echo $$test | sed 's/^.\///g'); \ test_filename=$$(echo $$test_pkg_name | sed 's/\//__/g')_test; \ - $(GO_CMD) test -v \ + go test -v \ -ldflags="$(GO_LDFLAGS) -X $(KVM_PKG_NAME).builtAppVersion=$(VERSION_DEV)" \ $(GO_BUILD_ARGS) \ -c -o $(BIN_DIR)/tests/$$test_filename $$test; \ @@ -111,9 +199,10 @@ dev_release: frontend build_dev rclone copyto bin/jetkvm_app r2://jetkvm-update/app/$(VERSION_DEV)/jetkvm_app rclone copyto bin/jetkvm_app.sha256 r2://jetkvm-update/app/$(VERSION_DEV)/jetkvm_app.sha256 -build_release: frontend build_native +build_release: frontend build_native build_audio_deps build_audio_binaries + $(CLEAN_GO_CACHE) @echo "Building release..." - $(GO_CMD) build \ + go build \ -ldflags="$(GO_LDFLAGS) -X $(KVM_PKG_NAME).builtAppVersion=$(VERSION)" \ $(GO_RELEASE_BUILD_ARGS) \ -o bin/jetkvm_app cmd/main.go @@ -127,4 +216,39 @@ release: @echo "Uploading release..." @shasum -a 256 bin/jetkvm_app | cut -d ' ' -f 1 > bin/jetkvm_app.sha256 rclone copyto bin/jetkvm_app r2://jetkvm-update/app/$(VERSION)/jetkvm_app - rclone copyto bin/jetkvm_app.sha256 r2://jetkvm-update/app/$(VERSION)/jetkvm_app.sha256 \ No newline at end of file + rclone copyto bin/jetkvm_app.sha256 r2://jetkvm-update/app/$(VERSION)/jetkvm_app.sha256 + +# Run both Go and UI linting +lint: lint-go lint-ui + @echo "All linting completed successfully!" + +# Run golangci-lint locally with the same configuration as CI +lint-go: build_audio_deps + @echo "Running golangci-lint..." + @mkdir -p static && touch static/.gitkeep + golangci-lint run --verbose + +# Run both Go and UI linting with auto-fix +lint-fix: lint-go-fix lint-ui-fix + @echo "All linting with auto-fix completed successfully!" + +# Run golangci-lint with auto-fix +lint-go-fix: build_audio_deps + @echo "Running golangci-lint with auto-fix..." + @mkdir -p static && touch static/.gitkeep + golangci-lint run --fix --verbose + +# Run UI linting locally (mirrors GitHub workflow ui-lint.yml) +lint-ui: + @echo "Running UI lint..." + @cd ui && npm ci + @cd ui && npm run lint + +# Run UI linting with auto-fix +lint-ui-fix: + @echo "Running UI lint with auto-fix..." + @cd ui && npm ci + @cd ui && npm run lint:fix + +# Legacy alias for UI linting (for backward compatibility) +ui-lint: lint-ui diff --git a/audio.go b/audio.go new file mode 100644 index 00000000..c0c5f69a --- /dev/null +++ b/audio.go @@ -0,0 +1,351 @@ +package kvm + +import ( + "io" + "sync" + "sync/atomic" + + "github.com/jetkvm/kvm/internal/audio" + "github.com/jetkvm/kvm/internal/logging" + "github.com/pion/webrtc/v4" + "github.com/rs/zerolog" +) + +const ( + socketPathOutput = "/var/run/audio_output.sock" + socketPathInput = "/var/run/audio_input.sock" +) + +var ( + audioMutex sync.Mutex + outputSupervisor *audio.Supervisor + inputSupervisor *audio.Supervisor + outputClient *audio.IPCClient + inputClient *audio.IPCClient + outputRelay *audio.OutputRelay + inputRelay *audio.InputRelay + audioInitialized bool + activeConnections atomic.Int32 + audioLogger zerolog.Logger + currentAudioTrack *webrtc.TrackLocalStaticSample + inputTrackHandling atomic.Bool + useUSBForAudioOutput bool + audioOutputEnabled atomic.Bool + audioInputEnabled atomic.Bool +) + +func initAudio() { + audioLogger = logging.GetDefaultLogger().With().Str("component", "audio-manager").Logger() + + if err := audio.ExtractEmbeddedBinaries(); err != nil { + audioLogger.Error().Err(err).Msg("Failed to extract audio binaries") + return + } + + // Load audio output source from config + ensureConfigLoaded() + useUSBForAudioOutput = config.AudioOutputSource == "usb" + + // Enable both by default + audioOutputEnabled.Store(true) + audioInputEnabled.Store(true) + + audioLogger.Debug(). + Str("source", config.AudioOutputSource). + Msg("Audio subsystem initialized") + audioInitialized = true +} + +// startAudioSubprocesses starts audio subprocesses and relays (skips already running ones) +func startAudioSubprocesses() error { + audioMutex.Lock() + defer audioMutex.Unlock() + + if !audioInitialized { + audioLogger.Warn().Msg("Audio not initialized, skipping subprocess start") + return nil + } + + // Start output subprocess if not running and enabled + if outputSupervisor == nil && audioOutputEnabled.Load() { + alsaDevice := "hw:0,0" // HDMI + if useUSBForAudioOutput { + alsaDevice = "hw:1,0" // USB + } + + outputSupervisor = audio.NewSupervisor( + "audio-output", + audio.GetAudioOutputBinaryPath(), + socketPathOutput, + []string{ + "ALSA_CAPTURE_DEVICE=" + alsaDevice, + "OPUS_BITRATE=128000", + "OPUS_COMPLEXITY=2", + }, + ) + + if err := outputSupervisor.Start(); err != nil { + audioLogger.Error().Err(err).Msg("Failed to start audio output supervisor") + outputSupervisor = nil + return err + } + + outputClient = audio.NewIPCClient("audio-output", socketPathOutput, 0x4A4B4F55) + + if currentAudioTrack != nil { + outputRelay = audio.NewOutputRelay(outputClient, currentAudioTrack) + if err := outputRelay.Start(); err != nil { + audioLogger.Error().Err(err).Msg("Failed to start audio output relay") + } + } + } + + // Start input subprocess if not running, USB audio enabled, and input enabled + ensureConfigLoaded() + if inputSupervisor == nil && audioInputEnabled.Load() && config.UsbDevices != nil && config.UsbDevices.Audio { + inputSupervisor = audio.NewSupervisor( + "audio-input", + audio.GetAudioInputBinaryPath(), + socketPathInput, + []string{ + "ALSA_PLAYBACK_DEVICE=hw:1,0", + "OPUS_BITRATE=128000", + }, + ) + + if err := inputSupervisor.Start(); err != nil { + audioLogger.Error().Err(err).Msg("Failed to start input supervisor") + inputSupervisor = nil + return err + } + + inputClient = audio.NewIPCClient("audio-input", socketPathInput, 0x4A4B4D49) + inputRelay = audio.NewInputRelay(inputClient) + if err := inputRelay.Start(); err != nil { + audioLogger.Error().Err(err).Msg("Failed to start input relay") + } + } + + return nil +} + +// stopOutputSubprocessLocked stops output subprocess (assumes mutex is held) +func stopOutputSubprocessLocked() { + if outputRelay != nil { + outputRelay.Stop() + outputRelay = nil + } + if outputClient != nil { + outputClient.Disconnect() + outputClient = nil + } + if outputSupervisor != nil { + outputSupervisor.Stop() + outputSupervisor = nil + } +} + +// stopInputSubprocessLocked stops input subprocess (assumes mutex is held) +func stopInputSubprocessLocked() { + if inputRelay != nil { + inputRelay.Stop() + inputRelay = nil + } + if inputClient != nil { + inputClient.Disconnect() + inputClient = nil + } + if inputSupervisor != nil { + inputSupervisor.Stop() + inputSupervisor = nil + } +} + +// stopAudioSubprocessesLocked stops all audio subprocesses (assumes mutex is held) +func stopAudioSubprocessesLocked() { + stopOutputSubprocessLocked() + stopInputSubprocessLocked() +} + +// stopAudioSubprocesses stops all audio subprocesses +func stopAudioSubprocesses() { + audioMutex.Lock() + defer audioMutex.Unlock() + stopAudioSubprocessesLocked() +} + +func onWebRTCConnect() { + count := activeConnections.Add(1) + if count == 1 { + if err := startAudioSubprocesses(); err != nil { + audioLogger.Error().Err(err).Msg("Failed to start audio subprocesses") + } + } +} + +func onWebRTCDisconnect() { + count := activeConnections.Add(-1) + if count == 0 { + // Stop audio immediately to release HDMI audio device which shares hardware with video device + stopAudioSubprocesses() + } +} + +func setAudioTrack(audioTrack *webrtc.TrackLocalStaticSample) { + audioMutex.Lock() + defer audioMutex.Unlock() + + currentAudioTrack = audioTrack + + if outputRelay != nil { + outputRelay.Stop() + outputRelay = nil + } + + if outputClient != nil { + outputRelay = audio.NewOutputRelay(outputClient, audioTrack) + if err := outputRelay.Start(); err != nil { + audioLogger.Error().Err(err).Msg("Failed to start output relay") + } + } +} + +// SetAudioOutputSource switches between HDMI and USB audio output +func SetAudioOutputSource(useUSB bool) error { + audioMutex.Lock() + defer audioMutex.Unlock() + + if useUSBForAudioOutput == useUSB { + return nil + } + + useUSBForAudioOutput = useUSB + + ensureConfigLoaded() + if useUSB { + config.AudioOutputSource = "usb" + } else { + config.AudioOutputSource = "hdmi" + } + if err := SaveConfig(); err != nil { + audioLogger.Error().Err(err).Msg("Failed to save config") + return err + } + + stopOutputSubprocessLocked() + + // Restart if there are active connections + if activeConnections.Load() > 0 { + audioMutex.Unlock() + err := startAudioSubprocesses() + audioMutex.Lock() + if err != nil { + audioLogger.Error().Err(err).Msg("Failed to restart audio output") + return err + } + } + + return nil +} + +func setPendingInputTrack(track *webrtc.TrackRemote) { + audioMutex.Lock() + defer audioMutex.Unlock() + + // Start input track handler only once per WebRTC session + if inputTrackHandling.CompareAndSwap(false, true) { + go handleInputTrackForSession(track) + } +} + +// SetAudioOutputEnabled enables or disables audio output +func SetAudioOutputEnabled(enabled bool) error { + if audioOutputEnabled.Swap(enabled) == enabled { + return nil // Already in desired state + } + + if enabled { + if activeConnections.Load() > 0 { + return startAudioSubprocesses() + } + } else { + audioMutex.Lock() + stopOutputSubprocessLocked() + audioMutex.Unlock() + } + + return nil +} + +// SetAudioInputEnabled enables or disables audio input +func SetAudioInputEnabled(enabled bool) error { + if audioInputEnabled.Swap(enabled) == enabled { + return nil // Already in desired state + } + + if enabled { + if activeConnections.Load() > 0 { + return startAudioSubprocesses() + } + } else { + audioMutex.Lock() + stopInputSubprocessLocked() + audioMutex.Unlock() + } + + return nil +} + +// handleInputTrackForSession runs for the entire WebRTC session lifetime +// It continuously reads from the track and sends to whatever relay is currently active +func handleInputTrackForSession(track *webrtc.TrackRemote) { + defer inputTrackHandling.Store(false) + + audioLogger.Debug(). + Str("codec", track.Codec().MimeType). + Str("track_id", track.ID()). + Msg("starting session-lifetime track handler") + + for { + // Read RTP packet (must always read to keep track alive) + rtpPacket, _, err := track.ReadRTP() + if err != nil { + if err == io.EOF { + audioLogger.Debug().Msg("audio track ended") + return + } + audioLogger.Warn().Err(err).Msg("failed to read RTP packet") + continue + } + + // Extract Opus payload + opusData := rtpPacket.Payload + if len(opusData) == 0 { + continue + } + + // Only send if input is enabled + if !audioInputEnabled.Load() { + continue // Drop frame but keep reading + } + + // Get client in single mutex operation (hot path optimization) + audioMutex.Lock() + client := inputClient + audioMutex.Unlock() + + if client == nil { + continue // No relay, drop frame but keep reading + } + + if !client.IsConnected() { + if err := client.Connect(); err != nil { + continue + } + } + + if err := client.WriteMessage(0, opusData); err != nil { + client.Disconnect() + } + } +} diff --git a/config.go b/config.go index c83ccfc7..2ae9d8bb 100644 --- a/config.go +++ b/config.go @@ -104,6 +104,7 @@ type Config struct { UsbDevices *usbgadget.Devices `json:"usb_devices"` NetworkConfig *network.NetworkConfig `json:"network_config"` DefaultLogLevel string `json:"default_log_level"` + AudioOutputSource string `json:"audio_output_source"` // "hdmi" or "usb" } func (c *Config) GetDisplayRotation() uint16 { @@ -160,8 +161,9 @@ var defaultConfig = &Config{ Keyboard: true, MassStorage: true, }, - NetworkConfig: &network.NetworkConfig{}, - DefaultLogLevel: "INFO", + NetworkConfig: &network.NetworkConfig{}, + DefaultLogLevel: "INFO", + AudioOutputSource: "hdmi", } var ( diff --git a/internal/audio/c/audio.c b/internal/audio/c/audio.c new file mode 100644 index 00000000..a3ce3a95 --- /dev/null +++ b/internal/audio/c/audio.c @@ -0,0 +1,789 @@ +/* + * JetKVM Audio Processing Module + * + * Bidirectional audio processing optimized for ARM NEON SIMD: + * - OUTPUT PATH: TC358743 HDMI audio → Client speakers + * Pipeline: ALSA hw:0,0 capture → Opus encode (128kbps, FEC enabled) + * + * - INPUT PATH: Client microphone → Device speakers + * Pipeline: Opus decode (with FEC) → ALSA hw:1,0 playback + * + * Key features: + * - ARM NEON SIMD optimization for all audio operations + * - Opus in-band FEC for packet loss resilience + * - S16_LE @ 48kHz stereo, 20ms frames (960 samples) + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +// ARM NEON SIMD support (always available on JetKVM's ARM Cortex-A7) +#include + +// RV1106 (Cortex-A7) has 64-byte cache lines +#define CACHE_LINE_SIZE 64 +#define SIMD_ALIGN __attribute__((aligned(16))) +#define CACHE_ALIGN __attribute__((aligned(CACHE_LINE_SIZE))) +#define SIMD_PREFETCH(addr, rw, locality) __builtin_prefetch(addr, rw, locality) + +// Compile-time trace logging - disabled for production (zero overhead) +#define TRACE_LOG(...) ((void)0) + +// ALSA device handles +static snd_pcm_t *pcm_capture_handle = NULL; // OUTPUT: TC358743 HDMI audio → client +static snd_pcm_t *pcm_playback_handle = NULL; // INPUT: Client microphone → device speakers + +// ALSA device names +static const char *alsa_capture_device = NULL; +static const char *alsa_playback_device = NULL; + +// Opus codec instances +static OpusEncoder *encoder = NULL; +static OpusDecoder *decoder = NULL; + +// Audio format (S16_LE @ 48kHz stereo) +static uint32_t sample_rate = 48000; +static uint8_t channels = 2; +static uint16_t frame_size = 960; // 20ms frames at 48kHz + +static uint32_t opus_bitrate = 128000; +static uint8_t opus_complexity = 2; +static uint16_t max_packet_size = 1500; + +// Opus encoder constants (hardcoded for production) +#define OPUS_VBR 1 // VBR enabled +#define OPUS_VBR_CONSTRAINT 0 // Unconstrained VBR (better for low-volume signals) +#define OPUS_SIGNAL_TYPE 3002 // OPUS_SIGNAL_MUSIC (better transient handling) +#define OPUS_BANDWIDTH 1105 // OPUS_BANDWIDTH_FULLBAND (20kHz, enabled by 128kbps bitrate) +#define OPUS_DTX 0 // DTX disabled (prevents audio drops) +#define OPUS_LSB_DEPTH 16 // 16-bit depth + +// ALSA retry configuration +static uint32_t sleep_microseconds = 1000; +static uint32_t sleep_milliseconds = 1; // Precomputed: sleep_microseconds / 1000 +static uint8_t max_attempts_global = 5; +static uint32_t max_backoff_us_global = 500000; + +int jetkvm_audio_capture_init(); +void jetkvm_audio_capture_close(); +int jetkvm_audio_read_encode(void *opus_buf); + +int jetkvm_audio_playback_init(); +void jetkvm_audio_playback_close(); +int jetkvm_audio_decode_write(void *opus_buf, int opus_size); + +void update_audio_constants(uint32_t bitrate, uint8_t complexity, + uint32_t sr, uint8_t ch, uint16_t fs, uint16_t max_pkt, + uint32_t sleep_us, uint8_t max_attempts, uint32_t max_backoff); +void update_audio_decoder_constants(uint32_t sr, uint8_t ch, uint16_t fs, uint16_t max_pkt, + uint32_t sleep_us, uint8_t max_attempts, uint32_t max_backoff); +int update_opus_encoder_params(uint32_t bitrate, uint8_t complexity); + + +/** + * Sync encoder configuration from Go to C + */ +void update_audio_constants(uint32_t bitrate, uint8_t complexity, + uint32_t sr, uint8_t ch, uint16_t fs, uint16_t max_pkt, + uint32_t sleep_us, uint8_t max_attempts, uint32_t max_backoff) { + opus_bitrate = bitrate; + opus_complexity = complexity; + sample_rate = sr; + channels = ch; + frame_size = fs; + max_packet_size = max_pkt; + sleep_microseconds = sleep_us; + sleep_milliseconds = sleep_us / 1000; // Precompute for snd_pcm_wait + max_attempts_global = max_attempts; + max_backoff_us_global = max_backoff; +} + +/** + * Sync decoder configuration from Go to C (no encoder-only params) + */ +void update_audio_decoder_constants(uint32_t sr, uint8_t ch, uint16_t fs, uint16_t max_pkt, + uint32_t sleep_us, uint8_t max_attempts, uint32_t max_backoff) { + sample_rate = sr; + channels = ch; + frame_size = fs; + max_packet_size = max_pkt; + sleep_microseconds = sleep_us; + sleep_milliseconds = sleep_us / 1000; // Precompute for snd_pcm_wait + max_attempts_global = max_attempts; + max_backoff_us_global = max_backoff; +} + +/** + * Initialize ALSA device names from environment variables + * Must be called before jetkvm_audio_capture_init or jetkvm_audio_playback_init + */ +static void init_alsa_devices_from_env(void) { + if (alsa_capture_device == NULL) { + alsa_capture_device = getenv("ALSA_CAPTURE_DEVICE"); + if (alsa_capture_device == NULL || alsa_capture_device[0] == '\0') { + alsa_capture_device = "hw:0,0"; // Default to HDMI + } + } + if (alsa_playback_device == NULL) { + alsa_playback_device = getenv("ALSA_PLAYBACK_DEVICE"); + if (alsa_playback_device == NULL || alsa_playback_device[0] == '\0') { + alsa_playback_device = "hw:1,0"; // Default to USB gadget + } + } +} + +// SIMD-OPTIMIZED BUFFER OPERATIONS (ARM NEON) + +/** + * Clear audio buffer using NEON (16 samples/iteration with 2x unrolling) + */ +static inline void simd_clear_samples_s16(short * __restrict__ buffer, uint32_t samples) { + const int16x8_t zero = vdupq_n_s16(0); + uint32_t i = 0; + + // Process 16 samples at a time (2x unrolled for better pipeline utilization) + uint32_t simd_samples = samples & ~15U; + for (; i < simd_samples; i += 16) { + vst1q_s16(&buffer[i], zero); + vst1q_s16(&buffer[i + 8], zero); + } + + // Handle remaining 8 samples + if (i + 8 <= samples) { + vst1q_s16(&buffer[i], zero); + i += 8; + } + + // Scalar: remaining samples + for (; i < samples; i++) { + buffer[i] = 0; + } +} + +// INITIALIZATION STATE TRACKING + +static volatile sig_atomic_t capture_initializing = 0; +static volatile sig_atomic_t capture_initialized = 0; +static volatile sig_atomic_t playback_initializing = 0; +static volatile sig_atomic_t playback_initialized = 0; + +/** + * Update Opus encoder settings at runtime (does NOT modify FEC or hardcoded settings) + * @return 0 on success, -1 if not initialized, >0 if some settings failed + */ +int update_opus_encoder_params(uint32_t bitrate, uint8_t complexity) { + if (!encoder || !capture_initialized) { + return -1; + } + + // Update runtime-configurable parameters + opus_bitrate = bitrate; + opus_complexity = complexity; + + // Apply settings to encoder + int result = 0; + result |= opus_encoder_ctl(encoder, OPUS_SET_BITRATE(opus_bitrate)); + result |= opus_encoder_ctl(encoder, OPUS_SET_COMPLEXITY(opus_complexity)); + + return result; +} + +// ALSA UTILITY FUNCTIONS + +/** + * Open ALSA device with exponential backoff retry + * @return 0 on success, negative error code on failure + */ +// Helper: High-precision sleep using nanosleep (better than usleep) +static inline void precise_sleep_us(uint32_t microseconds) { + struct timespec ts = { + .tv_sec = microseconds / 1000000, + .tv_nsec = (microseconds % 1000000) * 1000 + }; + nanosleep(&ts, NULL); +} + +static int safe_alsa_open(snd_pcm_t **handle, const char *device, snd_pcm_stream_t stream) { + uint8_t attempt = 0; + int err; + uint32_t backoff_us = sleep_microseconds; + + while (attempt < max_attempts_global) { + err = snd_pcm_open(handle, device, stream, SND_PCM_NONBLOCK); + if (err >= 0) { + snd_pcm_nonblock(*handle, 0); + return 0; + } + + attempt++; + + // Exponential backoff with bit shift (faster than multiplication) + if (err == -EBUSY || err == -EAGAIN) { + precise_sleep_us(backoff_us); + backoff_us = (backoff_us << 1 < max_backoff_us_global) ? (backoff_us << 1) : max_backoff_us_global; + } else if (err == -ENODEV || err == -ENOENT) { + precise_sleep_us(backoff_us << 1); + backoff_us = (backoff_us << 1 < max_backoff_us_global) ? (backoff_us << 1) : max_backoff_us_global; + } else if (err == -EPERM || err == -EACCES) { + precise_sleep_us(backoff_us >> 1); + } else { + precise_sleep_us(backoff_us); + backoff_us = (backoff_us << 1 < max_backoff_us_global) ? (backoff_us << 1) : max_backoff_us_global; + } + } + return err; +} + +/** + * Configure ALSA device (S16_LE @ 48kHz stereo with optimized buffering) + * @param handle ALSA PCM handle + * @param device_name Unused (for debugging only) + * @return 0 on success, negative error code on failure + */ +static int configure_alsa_device(snd_pcm_t *handle, const char *device_name) { + snd_pcm_hw_params_t *params; + snd_pcm_sw_params_t *sw_params; + int err; + + if (!handle) return -1; + + snd_pcm_hw_params_alloca(¶ms); + snd_pcm_sw_params_alloca(&sw_params); + + err = snd_pcm_hw_params_any(handle, params); + if (err < 0) return err; + + err = snd_pcm_hw_params_set_access(handle, params, SND_PCM_ACCESS_RW_INTERLEAVED); + if (err < 0) return err; + + err = snd_pcm_hw_params_set_format(handle, params, SND_PCM_FORMAT_S16_LE); + if (err < 0) return err; + + err = snd_pcm_hw_params_set_channels(handle, params, channels); + if (err < 0) return err; + + err = snd_pcm_hw_params_set_rate(handle, params, sample_rate, 0); + if (err < 0) { + unsigned int rate = sample_rate; + err = snd_pcm_hw_params_set_rate_near(handle, params, &rate, 0); + if (err < 0) return err; + } + + snd_pcm_uframes_t period_size = frame_size; // Optimized: use full frame as period + if (period_size < 64) period_size = 64; + + err = snd_pcm_hw_params_set_period_size_near(handle, params, &period_size, 0); + if (err < 0) return err; + + snd_pcm_uframes_t buffer_size = period_size * 2; // Optimized: minimal buffer for low latency + err = snd_pcm_hw_params_set_buffer_size_near(handle, params, &buffer_size); + if (err < 0) return err; + + err = snd_pcm_hw_params(handle, params); + if (err < 0) return err; + + err = snd_pcm_sw_params_current(handle, sw_params); + if (err < 0) return err; + + err = snd_pcm_sw_params_set_start_threshold(handle, sw_params, period_size); + if (err < 0) return err; + + err = snd_pcm_sw_params_set_avail_min(handle, sw_params, period_size); + if (err < 0) return err; + + err = snd_pcm_sw_params(handle, sw_params); + if (err < 0) return err; + + return snd_pcm_prepare(handle); +} + +// AUDIO OUTPUT PATH FUNCTIONS (TC358743 HDMI Audio → Client Speakers) + +/** + * Initialize OUTPUT path (TC358743 HDMI capture → Opus encoder) + * Opens hw:0,0 (TC358743) and creates Opus encoder with optimized settings + * @return 0 on success, -EBUSY if initializing, -1/-2/-3 on errors + */ +int jetkvm_audio_capture_init() { + int err; + + init_alsa_devices_from_env(); + + if (__sync_bool_compare_and_swap(&capture_initializing, 0, 1) == 0) { + return -EBUSY; + } + + if (capture_initialized) { + capture_initializing = 0; + return 0; + } + + if (encoder) { + opus_encoder_destroy(encoder); + encoder = NULL; + } + if (pcm_capture_handle) { + snd_pcm_close(pcm_capture_handle); + pcm_capture_handle = NULL; + } + + err = safe_alsa_open(&pcm_capture_handle, alsa_capture_device, SND_PCM_STREAM_CAPTURE); + if (err < 0) { + fprintf(stderr, "Failed to open ALSA capture device %s: %s\n", + alsa_capture_device, snd_strerror(err)); + fflush(stderr); + capture_initializing = 0; + return -1; + } + + err = configure_alsa_device(pcm_capture_handle, "capture"); + if (err < 0) { + snd_pcm_close(pcm_capture_handle); + pcm_capture_handle = NULL; + capture_initializing = 0; + return -2; + } + + int opus_err = 0; + encoder = opus_encoder_create(sample_rate, channels, OPUS_APPLICATION_AUDIO, &opus_err); + if (!encoder || opus_err != OPUS_OK) { + if (pcm_capture_handle) { + snd_pcm_close(pcm_capture_handle); + pcm_capture_handle = NULL; + } + capture_initializing = 0; + return -3; + } + + // Configure encoder with optimized settings + opus_encoder_ctl(encoder, OPUS_SET_BITRATE(opus_bitrate)); + opus_encoder_ctl(encoder, OPUS_SET_COMPLEXITY(opus_complexity)); + opus_encoder_ctl(encoder, OPUS_SET_VBR(OPUS_VBR)); + opus_encoder_ctl(encoder, OPUS_SET_VBR_CONSTRAINT(OPUS_VBR_CONSTRAINT)); + opus_encoder_ctl(encoder, OPUS_SET_SIGNAL(OPUS_SIGNAL_TYPE)); + opus_encoder_ctl(encoder, OPUS_SET_BANDWIDTH(OPUS_BANDWIDTH)); + opus_encoder_ctl(encoder, OPUS_SET_DTX(OPUS_DTX)); + opus_encoder_ctl(encoder, OPUS_SET_LSB_DEPTH(OPUS_LSB_DEPTH)); + + opus_encoder_ctl(encoder, OPUS_SET_INBAND_FEC(1)); + opus_encoder_ctl(encoder, OPUS_SET_PACKET_LOSS_PERC(20)); + + capture_initialized = 1; + capture_initializing = 0; + return 0; +} + +/** + * Read HDMI audio, encode to Opus (OUTPUT path hot function) + * @param opus_buf Output buffer for encoded Opus packet + * @return >0 = Opus packet size in bytes, -1 = error + */ +__attribute__((hot)) int jetkvm_audio_read_encode(void * __restrict__ opus_buf) { + static short CACHE_ALIGN pcm_buffer[960 * 2]; // Cache-aligned + unsigned char * __restrict__ out = (unsigned char*)opus_buf; + int32_t pcm_rc, nb_bytes; + int32_t err = 0; + uint8_t recovery_attempts = 0; + const uint8_t max_recovery_attempts = 3; + + // Prefetch for write (out) and read (pcm_buffer) - RV1106 has small L1 cache + SIMD_PREFETCH(out, 1, 0); // Write, immediate use + SIMD_PREFETCH(pcm_buffer, 0, 0); // Read, immediate use + SIMD_PREFETCH(pcm_buffer + 64, 0, 1); // Prefetch next cache line + + if (__builtin_expect(!capture_initialized || !pcm_capture_handle || !encoder || !opus_buf, 0)) { + TRACE_LOG("[AUDIO_OUTPUT] jetkvm_audio_read_encode: Failed safety checks - capture_initialized=%d, pcm_capture_handle=%p, encoder=%p, opus_buf=%p\n", + capture_initialized, pcm_capture_handle, encoder, opus_buf); + return -1; + } + +retry_read: + // Read 960 frames (20ms) from ALSA capture device + pcm_rc = snd_pcm_readi(pcm_capture_handle, pcm_buffer, frame_size); + + if (__builtin_expect(pcm_rc < 0, 0)) { + if (pcm_rc == -EPIPE) { + recovery_attempts++; + if (recovery_attempts > max_recovery_attempts) { + return -1; + } + err = snd_pcm_prepare(pcm_capture_handle); + if (err < 0) { + snd_pcm_drop(pcm_capture_handle); + err = snd_pcm_prepare(pcm_capture_handle); + if (err < 0) return -1; + } + goto retry_read; + } else if (pcm_rc == -EAGAIN) { + // Wait for data to be available + snd_pcm_wait(pcm_capture_handle, sleep_milliseconds); + goto retry_read; + } else if (pcm_rc == -ESTRPIPE) { + recovery_attempts++; + if (recovery_attempts > max_recovery_attempts) { + return -1; + } + uint8_t resume_attempts = 0; + while ((err = snd_pcm_resume(pcm_capture_handle)) == -EAGAIN && resume_attempts < 10) { + snd_pcm_wait(pcm_capture_handle, sleep_milliseconds); + resume_attempts++; + } + if (err < 0) { + err = snd_pcm_prepare(pcm_capture_handle); + if (err < 0) return -1; + } + return 0; + } else if (pcm_rc == -ENODEV) { + return -1; + } else if (pcm_rc == -EIO) { + recovery_attempts++; + if (recovery_attempts <= max_recovery_attempts) { + snd_pcm_drop(pcm_capture_handle); + err = snd_pcm_prepare(pcm_capture_handle); + if (err >= 0) { + goto retry_read; + } + } + return -1; + } else { + recovery_attempts++; + if (recovery_attempts <= 1 && pcm_rc == -EINTR) { + goto retry_read; + } else if (recovery_attempts <= 1 && pcm_rc == -EBUSY) { + snd_pcm_wait(pcm_capture_handle, 1); // Wait 1ms for device + goto retry_read; + } + return -1; + } + } + + // Zero-pad if we got a short read + if (__builtin_expect(pcm_rc < frame_size, 0)) { + uint32_t remaining_samples = (frame_size - pcm_rc) * channels; + simd_clear_samples_s16(&pcm_buffer[pcm_rc * channels], remaining_samples); + } + + // Find peak amplitude with NEON SIMD + uint32_t total_samples = frame_size * channels; + int16x8_t vmax = vdupq_n_s16(0); + + uint32_t i; + for (i = 0; i + 8 <= total_samples; i += 8) { + int16x8_t v = vld1q_s16(&pcm_buffer[i]); + int16x8_t vabs = vabsq_s16(v); + vmax = vmaxq_s16(vmax, vabs); + } + + // Horizontal max reduction (manual for ARMv7) + int16x4_t vmax_low = vget_low_s16(vmax); + int16x4_t vmax_high = vget_high_s16(vmax); + int16x4_t vmax_reduced = vmax_s16(vmax_low, vmax_high); + vmax_reduced = vpmax_s16(vmax_reduced, vmax_reduced); + vmax_reduced = vpmax_s16(vmax_reduced, vmax_reduced); + int16_t peak = vget_lane_s16(vmax_reduced, 0); + + // Handle remaining samples + for (; i < total_samples; i++) { + int16_t abs_val = (pcm_buffer[i] < 0) ? -pcm_buffer[i] : pcm_buffer[i]; + if (abs_val > peak) peak = abs_val; + } + + // Apply gain if signal is weak (below -18dB = 4096) but above noise floor + // Noise gate: only apply gain if peak > 256 (below this is likely just noise) + // Target: boost to ~50% of range (16384) to improve SNR + if (peak > 256 && peak < 4096) { + float gain = 16384.0f / peak; + if (gain > 8.0f) gain = 8.0f; // Max 18dB boost + + // Apply gain with NEON and saturation + float32x4_t vgain = vdupq_n_f32(gain); + for (i = 0; i + 8 <= total_samples; i += 8) { + int16x8_t v = vld1q_s16(&pcm_buffer[i]); + + // Convert to float, apply gain, saturate back to int16 + int32x4_t v_low = vmovl_s16(vget_low_s16(v)); + int32x4_t v_high = vmovl_s16(vget_high_s16(v)); + + float32x4_t f_low = vcvtq_f32_s32(v_low); + float32x4_t f_high = vcvtq_f32_s32(v_high); + + f_low = vmulq_f32(f_low, vgain); + f_high = vmulq_f32(f_high, vgain); + + v_low = vcvtq_s32_f32(f_low); + v_high = vcvtq_s32_f32(f_high); + + // Saturate to int16 range + int16x4_t result_low = vqmovn_s32(v_low); + int16x4_t result_high = vqmovn_s32(v_high); + + vst1q_s16(&pcm_buffer[i], vcombine_s16(result_low, result_high)); + } + + // Handle remaining samples + for (; i < total_samples; i++) { + int32_t boosted = (int32_t)(pcm_buffer[i] * gain); + if (boosted > 32767) boosted = 32767; + if (boosted < -32768) boosted = -32768; + pcm_buffer[i] = (int16_t)boosted; + } + } + + nb_bytes = opus_encode(encoder, pcm_buffer, frame_size, out, max_packet_size); + return nb_bytes; +} + +// AUDIO INPUT PATH FUNCTIONS (Client Microphone → Device Speakers) + +/** + * Initialize INPUT path (Opus decoder → device speakers) + * Opens hw:1,0 (USB gadget) or "default" and creates Opus decoder + * @return 0 on success, -EBUSY if initializing, -1/-2 on errors + */ +int jetkvm_audio_playback_init() { + int err; + + init_alsa_devices_from_env(); + + if (__sync_bool_compare_and_swap(&playback_initializing, 0, 1) == 0) { + return -EBUSY; + } + + if (playback_initialized) { + playback_initializing = 0; + return 0; + } + + if (decoder) { + opus_decoder_destroy(decoder); + decoder = NULL; + } + if (pcm_playback_handle) { + snd_pcm_close(pcm_playback_handle); + pcm_playback_handle = NULL; + } + + err = safe_alsa_open(&pcm_playback_handle, alsa_playback_device, SND_PCM_STREAM_PLAYBACK); + if (err < 0) { + fprintf(stderr, "Failed to open ALSA playback device %s: %s\n", + alsa_playback_device, snd_strerror(err)); + fflush(stderr); + err = safe_alsa_open(&pcm_playback_handle, "default", SND_PCM_STREAM_PLAYBACK); + if (err < 0) { + playback_initializing = 0; + return -1; + } + } + + err = configure_alsa_device(pcm_playback_handle, "playback"); + if (err < 0) { + snd_pcm_close(pcm_playback_handle); + pcm_playback_handle = NULL; + playback_initializing = 0; + return -1; + } + + int opus_err = 0; + decoder = opus_decoder_create(sample_rate, channels, &opus_err); + if (!decoder || opus_err != OPUS_OK) { + snd_pcm_close(pcm_playback_handle); + pcm_playback_handle = NULL; + playback_initializing = 0; + return -2; + } + + playback_initialized = 1; + playback_initializing = 0; + return 0; +} + +/** + * Decode Opus, write to device speakers (INPUT path hot function) + * Processing pipeline: Opus decode (with FEC) → ALSA playback with error recovery + * @param opus_buf Encoded Opus packet from client + * @param opus_size Size of Opus packet in bytes + * @return >0 = PCM frames written, 0 = frame skipped, -1/-2 = error + */ +__attribute__((hot)) int jetkvm_audio_decode_write(void * __restrict__ opus_buf, int32_t opus_size) { + static short CACHE_ALIGN pcm_buffer[960 * 2]; // Cache-aligned + unsigned char * __restrict__ in = (unsigned char*)opus_buf; + int32_t pcm_frames, pcm_rc, err = 0; + uint8_t recovery_attempts = 0; + const uint8_t max_recovery_attempts = 3; + + // Prefetch input buffer - locality 0 for immediate use + SIMD_PREFETCH(in, 0, 0); + + if (__builtin_expect(!playback_initialized || !pcm_playback_handle || !decoder || !opus_buf || opus_size <= 0, 0)) { + TRACE_LOG("[AUDIO_INPUT] jetkvm_audio_decode_write: Failed safety checks - playback_initialized=%d, pcm_playback_handle=%p, decoder=%p, opus_buf=%p, opus_size=%d\n", + playback_initialized, pcm_playback_handle, decoder, opus_buf, opus_size); + return -1; + } + + if (opus_size > max_packet_size) { + TRACE_LOG("[AUDIO_INPUT] jetkvm_audio_decode_write: Opus packet too large - size=%d, max=%d\n", opus_size, max_packet_size); + return -1; + } + TRACE_LOG("[AUDIO_INPUT] jetkvm_audio_decode_write: Processing Opus packet - size=%d bytes\n", opus_size); + + // Decode Opus packet to PCM (FEC automatically applied if embedded in packet) + // decode_fec=0 means normal decode (FEC data is used automatically when present) + pcm_frames = opus_decode(decoder, in, opus_size, pcm_buffer, frame_size, 0); + + if (__builtin_expect(pcm_frames < 0, 0)) { + // Decode failed - attempt packet loss concealment using FEC from previous packet + TRACE_LOG("[AUDIO_INPUT] jetkvm_audio_decode_write: Opus decode failed with error %d, attempting packet loss concealment\n", pcm_frames); + + // decode_fec=1 means use FEC data from the NEXT packet to reconstruct THIS lost packet + pcm_frames = opus_decode(decoder, NULL, 0, pcm_buffer, frame_size, 1); + if (pcm_frames < 0) { + TRACE_LOG("[AUDIO_INPUT] jetkvm_audio_decode_write: Packet loss concealment also failed with error %d\n", pcm_frames); + return -1; + } + TRACE_LOG("[AUDIO_INPUT] jetkvm_audio_decode_write: Packet loss concealment succeeded, recovered %d frames\n", pcm_frames); + } else + TRACE_LOG("[AUDIO_INPUT] jetkvm_audio_decode_write: Opus decode successful - decoded %d PCM frames\n", pcm_frames); + +retry_write: + // Write decoded PCM to ALSA playback device + pcm_rc = snd_pcm_writei(pcm_playback_handle, pcm_buffer, pcm_frames); + if (__builtin_expect(pcm_rc < 0, 0)) { + TRACE_LOG("[AUDIO_INPUT] jetkvm_audio_decode_write: ALSA write failed with error %d (%s), attempt %d/%d\n", + pcm_rc, snd_strerror(pcm_rc), recovery_attempts + 1, max_recovery_attempts); + + if (pcm_rc == -EPIPE) { + recovery_attempts++; + if (recovery_attempts > max_recovery_attempts) { + TRACE_LOG("[AUDIO_INPUT] jetkvm_audio_decode_write: Buffer underrun recovery failed after %d attempts\n", max_recovery_attempts); + return -2; + } + TRACE_LOG("[AUDIO_INPUT] jetkvm_audio_decode_write: Buffer underrun detected, attempting recovery (attempt %d)\n", recovery_attempts); + err = snd_pcm_prepare(pcm_playback_handle); + if (err < 0) { + TRACE_LOG("[AUDIO_INPUT] jetkvm_audio_decode_write: snd_pcm_prepare failed (%s), trying drop+prepare\n", snd_strerror(err)); + snd_pcm_drop(pcm_playback_handle); + err = snd_pcm_prepare(pcm_playback_handle); + if (err < 0) { + TRACE_LOG("[AUDIO_INPUT] jetkvm_audio_decode_write: drop+prepare recovery failed (%s)\n", snd_strerror(err)); + return -2; + } + } + TRACE_LOG("[AUDIO_INPUT] jetkvm_audio_decode_write: Buffer underrun recovery successful, retrying write\n"); + goto retry_write; + } else if (pcm_rc == -ESTRPIPE) { + recovery_attempts++; + if (recovery_attempts > max_recovery_attempts) { + TRACE_LOG("[AUDIO_INPUT] jetkvm_audio_decode_write: Device suspend recovery failed after %d attempts\n", max_recovery_attempts); + return -2; + } + TRACE_LOG("[AUDIO_INPUT] jetkvm_audio_decode_write: Device suspended, attempting resume (attempt %d)\n", recovery_attempts); + uint8_t resume_attempts = 0; + while ((err = snd_pcm_resume(pcm_playback_handle)) == -EAGAIN && resume_attempts < 10) { + snd_pcm_wait(pcm_playback_handle, sleep_milliseconds); + resume_attempts++; + } + if (err < 0) { + TRACE_LOG("[AUDIO_INPUT] jetkvm_audio_decode_write: Device resume failed (%s), trying prepare fallback\n", snd_strerror(err)); + err = snd_pcm_prepare(pcm_playback_handle); + if (err < 0) { + TRACE_LOG("[AUDIO_INPUT] jetkvm_audio_decode_write: Prepare fallback failed (%s)\n", snd_strerror(err)); + return -2; + } + } + TRACE_LOG("[AUDIO_INPUT] jetkvm_audio_decode_write: Device suspend recovery successful, skipping frame\n"); + return 0; + } else if (pcm_rc == -ENODEV) { + TRACE_LOG("[AUDIO_INPUT] jetkvm_audio_decode_write: Device disconnected (ENODEV) - critical error\n"); + return -2; + } else if (pcm_rc == -EIO) { + recovery_attempts++; + if (recovery_attempts <= max_recovery_attempts) { + TRACE_LOG("[AUDIO_INPUT] jetkvm_audio_decode_write: I/O error detected, attempting recovery\n"); + snd_pcm_drop(pcm_playback_handle); + err = snd_pcm_prepare(pcm_playback_handle); + if (err >= 0) { + TRACE_LOG("[AUDIO_INPUT] jetkvm_audio_decode_write: I/O error recovery successful, retrying write\n"); + goto retry_write; + } + TRACE_LOG("[AUDIO_INPUT] jetkvm_audio_decode_write: I/O error recovery failed (%s)\n", snd_strerror(err)); + } + return -2; + } else if (pcm_rc == -EAGAIN) { + recovery_attempts++; + if (recovery_attempts <= max_recovery_attempts) { + TRACE_LOG("[AUDIO_INPUT] jetkvm_audio_decode_write: Device not ready (EAGAIN), waiting and retrying\n"); + snd_pcm_wait(pcm_playback_handle, 1); // Wait 1ms + goto retry_write; + } + TRACE_LOG("[AUDIO_INPUT] jetkvm_audio_decode_write: Device not ready recovery failed after %d attempts\n", max_recovery_attempts); + return -2; + } else { + recovery_attempts++; + if (recovery_attempts <= 1 && (pcm_rc == -EINTR || pcm_rc == -EBUSY)) { + TRACE_LOG("[AUDIO_INPUT] jetkvm_audio_decode_write: Transient error %d (%s), retrying once\n", pcm_rc, snd_strerror(pcm_rc)); + snd_pcm_wait(pcm_playback_handle, 1); // Wait 1ms + goto retry_write; + } + TRACE_LOG("[AUDIO_INPUT] jetkvm_audio_decode_write: Unrecoverable error %d (%s)\n", pcm_rc, snd_strerror(pcm_rc)); + return -2; + } + } + TRACE_LOG("[AUDIO_INPUT] jetkvm_audio_decode_write: Successfully wrote %d PCM frames to device\n", pcm_frames); + return pcm_frames; +} + +// CLEANUP FUNCTIONS + +/** + * Close INPUT path (thread-safe with drain) + */ +void jetkvm_audio_playback_close() { + while (playback_initializing) { + sched_yield(); + } + + if (__sync_bool_compare_and_swap(&playback_initialized, 1, 0) == 0) { + return; + } + + if (decoder) { + opus_decoder_destroy(decoder); + decoder = NULL; + } + if (pcm_playback_handle) { + snd_pcm_drain(pcm_playback_handle); + snd_pcm_close(pcm_playback_handle); + pcm_playback_handle = NULL; + } +} + +/** + * Close OUTPUT path (thread-safe with drain) + */ +void jetkvm_audio_capture_close() { + while (capture_initializing) { + sched_yield(); + } + + if (__sync_bool_compare_and_swap(&capture_initialized, 1, 0) == 0) { + return; + } + + if (encoder) { + opus_encoder_destroy(encoder); + encoder = NULL; + } + if (pcm_capture_handle) { + snd_pcm_drain(pcm_capture_handle); + snd_pcm_close(pcm_capture_handle); + pcm_capture_handle = NULL; + } +} diff --git a/internal/audio/c/audio_common.c b/internal/audio/c/audio_common.c new file mode 100644 index 00000000..c0b54a1a --- /dev/null +++ b/internal/audio/c/audio_common.c @@ -0,0 +1,169 @@ +/* + * JetKVM Audio Common Utilities + * + * Shared functions used by both audio input and output servers + */ + +#include "audio_common.h" +#include "ipc_protocol.h" +#include +#include +#include +#include +#include +#include +#include + +// Forward declarations for encoder update (only in output server) +extern int update_opus_encoder_params(uint32_t bitrate, uint8_t complexity); + +// GLOBAL STATE FOR SIGNAL HANDLER + +// Pointer to the running flag that will be set to 0 on shutdown +static volatile sig_atomic_t *g_running_ptr = NULL; + +// SIGNAL HANDLERS + +static void signal_handler(int signo) { + if (signo == SIGTERM || signo == SIGINT) { + printf("Audio server: Received signal %d, shutting down...\n", signo); + if (g_running_ptr != NULL) { + *g_running_ptr = 0; + } + } +} + +void audio_common_setup_signal_handlers(volatile sig_atomic_t *running) { + g_running_ptr = running; + + struct sigaction sa; + memset(&sa, 0, sizeof(sa)); + sa.sa_handler = signal_handler; + sigemptyset(&sa.sa_mask); + sa.sa_flags = 0; + + sigaction(SIGTERM, &sa, NULL); + sigaction(SIGINT, &sa, NULL); + + // Ignore SIGPIPE (write to closed socket should return error, not crash) + signal(SIGPIPE, SIG_IGN); +} + + +int32_t audio_common_parse_env_int(const char *name, int32_t default_value) { + const char *str = getenv(name); + if (str == NULL || str[0] == '\0') { + return default_value; + } + return (int32_t)atoi(str); +} + +const char* audio_common_parse_env_string(const char *name, const char *default_value) { + const char *str = getenv(name); + if (str == NULL || str[0] == '\0') { + return default_value; + } + return str; +} + +// COMMON CONFIGURATION + +void audio_common_load_config(audio_config_t *config, int is_output) { + // ALSA device configuration + if (is_output) { + config->alsa_device = audio_common_parse_env_string("ALSA_CAPTURE_DEVICE", "hw:0,0"); + } else { + config->alsa_device = audio_common_parse_env_string("ALSA_PLAYBACK_DEVICE", "hw:1,0"); + } + + // Common Opus configuration + config->opus_bitrate = audio_common_parse_env_int("OPUS_BITRATE", 128000); + config->opus_complexity = audio_common_parse_env_int("OPUS_COMPLEXITY", 2); + + // Audio format + config->sample_rate = audio_common_parse_env_int("AUDIO_SAMPLE_RATE", 48000); + config->channels = audio_common_parse_env_int("AUDIO_CHANNELS", 2); + config->frame_size = audio_common_parse_env_int("AUDIO_FRAME_SIZE", 960); + + // Log configuration + printf("Audio %s Server Configuration:\n", is_output ? "Output" : "Input"); + printf(" ALSA Device: %s\n", config->alsa_device); + printf(" Sample Rate: %d Hz\n", config->sample_rate); + printf(" Channels: %d\n", config->channels); + printf(" Frame Size: %d samples\n", config->frame_size); + if (is_output) { + printf(" Opus Bitrate: %d bps\n", config->opus_bitrate); + printf(" Opus Complexity: %d\n", config->opus_complexity); + } +} + +void audio_common_print_startup(const char *server_name) { + printf("JetKVM %s Starting...\n", server_name); +} + +void audio_common_print_shutdown(const char *server_name) { + printf("Shutting down %s...\n", server_name); +} + + +int audio_common_handle_opus_config(const uint8_t *data, uint32_t length, int is_encoder) { + ipc_opus_config_t config; + + if (ipc_parse_opus_config(data, length, &config) != 0) { + fprintf(stderr, "Failed to parse Opus config\n"); + return -1; + } + + if (is_encoder) { + printf("Received Opus config: bitrate=%u, complexity=%u\n", + config.bitrate, config.complexity); + + int result = update_opus_encoder_params( + config.bitrate, + config.complexity + ); + + if (result != 0) { + fprintf(stderr, "Warning: Failed to apply Opus encoder parameters\n"); + } + } else { + printf("Received Opus config (informational): bitrate=%u, complexity=%u\n", + config.bitrate, config.complexity); + } + + return 0; +} + +// IPC MAIN LOOP HELPERS + +int audio_common_server_loop(int server_sock, volatile sig_atomic_t *running, + connection_handler_t handler) { + while (*running) { + printf("Waiting for client connection...\n"); + + int client_sock = accept(server_sock, NULL, NULL); + if (client_sock < 0) { + if (*running) { + fprintf(stderr, "Failed to accept client, retrying...\n"); + struct timespec ts = {1, 0}; // 1 second + nanosleep(&ts, NULL); + continue; + } + break; + } + + printf("Client connected (fd=%d)\n", client_sock); + + // Run handler with this client + handler(client_sock, running); + + // Close client connection + close(client_sock); + + if (*running) { + printf("Client disconnected, waiting for next client...\n"); + } + } + + return 0; +} diff --git a/internal/audio/c/audio_common.h b/internal/audio/c/audio_common.h new file mode 100644 index 00000000..4ce13587 --- /dev/null +++ b/internal/audio/c/audio_common.h @@ -0,0 +1,160 @@ +/* + * JetKVM Audio Common Utilities + * + * Shared functions used by both audio input and output servers + */ + +#ifndef JETKVM_AUDIO_COMMON_H +#define JETKVM_AUDIO_COMMON_H + +#include +#include + +// SHARED CONSTANTS + +// Audio processing parameters +#define AUDIO_MAX_PACKET_SIZE 1500 // Maximum Opus packet size +#define AUDIO_SLEEP_MICROSECONDS 1000 // Default sleep time in microseconds +#define AUDIO_MAX_ATTEMPTS 5 // Maximum retry attempts +#define AUDIO_MAX_BACKOFF_US 500000 // Maximum backoff in microseconds + +// Error handling +#define AUDIO_MAX_CONSECUTIVE_ERRORS 10 // Maximum consecutive errors before giving up + +// Performance monitoring +#define AUDIO_TRACE_MASK 0x3FF // Log every 1024th frame (bit mask for efficiency) + +// SIGNAL HANDLERS + +/** + * Setup signal handlers for graceful shutdown. + * Handles SIGTERM and SIGINT by setting the running flag to 0. + * Ignores SIGPIPE to prevent crashes on broken pipe writes. + * + * @param running Pointer to the volatile running flag to set on shutdown + */ +void audio_common_setup_signal_handlers(volatile sig_atomic_t *running); + + +/** + * Parse integer from environment variable. + * Returns default_value if variable is not set or empty. + * + * @param name Environment variable name + * @param default_value Default value if not set + * @return Parsed integer value or default + */ +int32_t audio_common_parse_env_int(const char *name, int32_t default_value); + +/** + * Parse string from environment variable. + * Returns default_value if variable is not set or empty. + * + * @param name Environment variable name + * @param default_value Default value if not set + * @return Environment variable value or default (not duplicated) + */ +const char* audio_common_parse_env_string(const char *name, const char *default_value); + + +// COMMON CONFIGURATION + +/** + * Common audio configuration structure + */ +typedef struct { + const char *alsa_device; // ALSA device path + int opus_bitrate; // Opus bitrate + int opus_complexity; // Opus complexity + int sample_rate; // Sample rate + int channels; // Number of channels + int frame_size; // Frame size in samples +} audio_config_t; + +/** + * Load common audio configuration from environment + * @param config Output configuration + * @param is_output true for output server, false for input + */ +void audio_common_load_config(audio_config_t *config, int is_output); + +/** + * Print server startup message + * @param server_name Name of the server (e.g., "Audio Output Server") + */ +void audio_common_print_startup(const char *server_name); + +/** + * Print server shutdown message + * @param server_name Name of the server + */ +void audio_common_print_shutdown(const char *server_name); + +// ERROR TRACKING + +/** + * Error tracking state for audio processing loops + */ +typedef struct { + uint8_t consecutive_errors; // Current consecutive error count + uint32_t frame_count; // Total frames processed +} audio_error_tracker_t; + +/** + * Initialize error tracker + */ +static inline void audio_error_tracker_init(audio_error_tracker_t *tracker) { + tracker->consecutive_errors = 0; + tracker->frame_count = 0; +} + +/** + * Record an error and check if we should give up + * Returns 1 if too many errors, 0 to continue + */ +static inline uint8_t audio_error_tracker_record_error(audio_error_tracker_t *tracker) { + tracker->consecutive_errors++; + return (tracker->consecutive_errors >= AUDIO_MAX_CONSECUTIVE_ERRORS) ? 1 : 0; +} + +/** + * Record success and increment frame count + */ +static inline void audio_error_tracker_record_success(audio_error_tracker_t *tracker) { + tracker->consecutive_errors = 0; + tracker->frame_count++; +} + +/** + * Check if we should log trace info for this frame + */ +static inline uint8_t audio_error_tracker_should_trace(audio_error_tracker_t *tracker) { + return ((tracker->frame_count & AUDIO_TRACE_MASK) == 1) ? 1 : 0; +} + + +/** + * Parse Opus config message and optionally apply to encoder. + * @param data Raw message data + * @param length Message length + * @param is_encoder If true, apply config to encoder (output server) + * @return 0 on success, -1 on error + */ +int audio_common_handle_opus_config(const uint8_t *data, uint32_t length, int is_encoder); + +// IPC MAIN LOOP HELPERS + +/** + * Common server accept loop with signal handling. + * Accepts clients and calls handler function for each connection. + * + * @param server_sock Server socket from ipc_create_server + * @param running Pointer to running flag (set to 0 on shutdown) + * @param handler Connection handler function + * @return 0 on clean shutdown, -1 on error + */ +typedef int (*connection_handler_t)(int client_sock, volatile sig_atomic_t *running); +int audio_common_server_loop(int server_sock, volatile sig_atomic_t *running, + connection_handler_t handler); + +#endif // JETKVM_AUDIO_COMMON_H diff --git a/internal/audio/c/ipc_protocol.c b/internal/audio/c/ipc_protocol.c new file mode 100644 index 00000000..0b997e13 --- /dev/null +++ b/internal/audio/c/ipc_protocol.c @@ -0,0 +1,328 @@ +/* + * JetKVM Audio IPC Protocol Implementation + * + * Implements Unix domain socket communication with exact byte-level + * compatibility with Go implementation in internal/audio/ipc_*.go + */ + +#include "ipc_protocol.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +// HELPER FUNCTIONS + +/** + * Read exactly N bytes from socket (loops until complete or error). + * This is critical because read() may return partial data. + */ +int ipc_read_full(int sock, void *buf, size_t len) { + uint8_t *ptr = (uint8_t *)buf; + size_t remaining = len; + + while (remaining > 0) { + ssize_t n = read(sock, ptr, remaining); + + if (n < 0) { + if (errno == EINTR) { + continue; // Interrupted by signal, retry + } + return -1; + } + + if (n == 0) { + return -1; // Connection closed + } + + ptr += n; + remaining -= n; + } + + return 0; // Success +} + + +// MESSAGE READ/WRITE + +/** + * Read a complete IPC message from socket. + * Returns 0 on success, -1 on error. + * Caller MUST free msg->data if non-NULL! + */ +int ipc_read_message(int sock, ipc_message_t *msg, uint32_t expected_magic) { + if (msg == NULL) { + return -1; + } + + // Initialize message + memset(msg, 0, sizeof(ipc_message_t)); + + // 1. Read header (9 bytes) + if (ipc_read_full(sock, &msg->header, IPC_HEADER_SIZE) != 0) { + return -1; + } + + // 2. Convert from little-endian (required on big-endian systems) + msg->header.magic = le32toh(msg->header.magic); + msg->header.length = le32toh(msg->header.length); + + // 3. Validate magic number + if (msg->header.magic != expected_magic) { + fprintf(stderr, "IPC: Invalid magic number: got 0x%08X, expected 0x%08X\n", + msg->header.magic, expected_magic); + return -1; + } + + // 4. Validate length + if (msg->header.length > IPC_MAX_FRAME_SIZE) { + fprintf(stderr, "IPC: Message too large: %u bytes (max %d)\n", + msg->header.length, IPC_MAX_FRAME_SIZE); + return -1; + } + + // 5. Read payload if present + if (msg->header.length > 0) { + msg->data = malloc(msg->header.length); + if (msg->data == NULL) { + fprintf(stderr, "IPC: Failed to allocate %u bytes for payload\n", + msg->header.length); + return -1; + } + + if (ipc_read_full(sock, msg->data, msg->header.length) != 0) { + free(msg->data); + msg->data = NULL; + return -1; + } + } + + return 0; // Success +} + +/** + * Read a complete IPC message using pre-allocated buffer (zero-copy). + */ +int ipc_read_message_zerocopy(int sock, ipc_message_t *msg, uint32_t expected_magic, + uint8_t *buffer, uint32_t buffer_size) { + if (msg == NULL || buffer == NULL) { + return -1; + } + + // Initialize message + memset(msg, 0, sizeof(ipc_message_t)); + + // 1. Read header (9 bytes) + if (ipc_read_full(sock, &msg->header, IPC_HEADER_SIZE) != 0) { + return -1; + } + + // 2. Convert from little-endian + msg->header.magic = le32toh(msg->header.magic); + msg->header.length = le32toh(msg->header.length); + + // 3. Validate magic number + if (msg->header.magic != expected_magic) { + return -1; + } + + // 4. Validate length + if (msg->header.length > IPC_MAX_FRAME_SIZE || msg->header.length > buffer_size) { + return -1; + } + + // 5. Read payload directly into provided buffer (zero-copy) + if (msg->header.length > 0) { + if (ipc_read_full(sock, buffer, msg->header.length) != 0) { + return -1; + } + msg->data = buffer; // Point to provided buffer, no allocation + } + + return 0; // Success +} + +/** + * Write a complete IPC message to socket. + * Uses writev() for atomic header+payload write. + * Returns 0 on success, -1 on error. + */ +int ipc_write_message(int sock, uint32_t magic, uint8_t type, + const uint8_t *data, uint32_t length) { + // Validate length + if (length > IPC_MAX_FRAME_SIZE) { + fprintf(stderr, "IPC: Message too large: %u bytes (max %d)\n", + length, IPC_MAX_FRAME_SIZE); + return -1; + } + + // Prepare header + ipc_header_t header; + header.magic = htole32(magic); + header.type = type; + header.length = htole32(length); + + // Use writev for atomic write (if possible) + struct iovec iov[2]; + iov[0].iov_base = &header; + iov[0].iov_len = IPC_HEADER_SIZE; + iov[1].iov_base = (void *)data; + iov[1].iov_len = length; + + int iovcnt = (length > 0) ? 2 : 1; + size_t total_len = IPC_HEADER_SIZE + length; + + ssize_t written = writev(sock, iov, iovcnt); + + if (written < 0) { + if (errno == EINTR) { + // Retry once on interrupt + written = writev(sock, iov, iovcnt); + } + + if (written < 0) { + perror("IPC: writev failed"); + return -1; + } + } + + if ((size_t)written != total_len) { + fprintf(stderr, "IPC: Partial write: %zd/%zu bytes\n", written, total_len); + return -1; + } + + return 0; // Success +} + + +/** + * Parse Opus configuration from message data (36 bytes, little-endian). + */ +int ipc_parse_opus_config(const uint8_t *data, uint32_t length, ipc_opus_config_t *config) { + if (data == NULL || config == NULL) { + return -1; + } + + if (length != 36) { + fprintf(stderr, "IPC: Invalid Opus config size: %u bytes (expected 36)\n", length); + return -1; + } + + // Parse little-endian uint32 fields + const uint32_t *u32_data = (const uint32_t *)data; + config->sample_rate = le32toh(u32_data[0]); + config->channels = le32toh(u32_data[1]); + config->frame_size = le32toh(u32_data[2]); + config->bitrate = le32toh(u32_data[3]); + config->complexity = le32toh(u32_data[4]); + config->vbr = le32toh(u32_data[5]); + config->signal_type = le32toh(u32_data[6]); + config->bandwidth = le32toh(u32_data[7]); + config->dtx = le32toh(u32_data[8]); + + return 0; // Success +} + +/** + * Parse basic audio configuration from message data (12 bytes, little-endian). + */ +int ipc_parse_config(const uint8_t *data, uint32_t length, ipc_config_t *config) { + if (data == NULL || config == NULL) { + return -1; + } + + if (length != 12) { + fprintf(stderr, "IPC: Invalid config size: %u bytes (expected 12)\n", length); + return -1; + } + + // Parse little-endian uint32 fields + const uint32_t *u32_data = (const uint32_t *)data; + config->sample_rate = le32toh(u32_data[0]); + config->channels = le32toh(u32_data[1]); + config->frame_size = le32toh(u32_data[2]); + + return 0; // Success +} + +/** + * Free message resources. + */ +void ipc_free_message(ipc_message_t *msg) { + if (msg != NULL && msg->data != NULL) { + free(msg->data); + msg->data = NULL; + } +} + +// SOCKET MANAGEMENT + +/** + * Create Unix domain socket server. + */ +int ipc_create_server(const char *socket_path) { + if (socket_path == NULL) { + return -1; + } + + // 1. Create socket + int sock = socket(AF_UNIX, SOCK_STREAM, 0); + if (sock < 0) { + perror("IPC: socket() failed"); + return -1; + } + + // 2. Remove existing socket file (ignore errors) + unlink(socket_path); + + // 3. Bind to path + struct sockaddr_un addr; + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + + if (strlen(socket_path) >= sizeof(addr.sun_path)) { + fprintf(stderr, "IPC: Socket path too long: %s\n", socket_path); + close(sock); + return -1; + } + + strncpy(addr.sun_path, socket_path, sizeof(addr.sun_path) - 1); + + if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0) { + perror("IPC: bind() failed"); + close(sock); + return -1; + } + + // 4. Listen with backlog=1 (single client) + if (listen(sock, 1) < 0) { + perror("IPC: listen() failed"); + close(sock); + return -1; + } + + printf("IPC: Server listening on %s\n", socket_path); + return sock; +} + +/** + * Accept client connection. + */ +int ipc_accept_client(int server_sock) { + int client_sock = accept(server_sock, NULL, NULL); + + if (client_sock < 0) { + perror("IPC: accept() failed"); + return -1; + } + + printf("IPC: Client connected (fd=%d)\n", client_sock); + return client_sock; +} diff --git a/internal/audio/c/ipc_protocol.h b/internal/audio/c/ipc_protocol.h new file mode 100644 index 00000000..c83dcc53 --- /dev/null +++ b/internal/audio/c/ipc_protocol.h @@ -0,0 +1,211 @@ +/* + * JetKVM Audio IPC Protocol + * + * Wire protocol for Unix domain socket communication between main process + * and audio subprocesses. This protocol is 100% compatible with the Go + * implementation in internal/audio/ipc_*.go + * + * CRITICAL: All multi-byte integers use LITTLE-ENDIAN byte order. + */ + +#ifndef JETKVM_IPC_PROTOCOL_H +#define JETKVM_IPC_PROTOCOL_H + +#include +#include + +// PROTOCOL CONSTANTS + +// Magic numbers (ASCII representation when read as little-endian) +#define IPC_MAGIC_OUTPUT 0x4A4B4F55 // "JKOU" - JetKVM Output (device → browser) +#define IPC_MAGIC_INPUT 0x4A4B4D49 // "JKMI" - JetKVM Microphone Input (browser → device) + +// Message types (matches Go UnifiedMessageType enum) +#define IPC_MSG_TYPE_OPUS_FRAME 0 // Audio frame data (Opus encoded) +#define IPC_MSG_TYPE_CONFIG 1 // Basic audio config (12 bytes) +#define IPC_MSG_TYPE_OPUS_CONFIG 2 // Complete Opus config (36 bytes) +#define IPC_MSG_TYPE_STOP 3 // Shutdown signal +#define IPC_MSG_TYPE_HEARTBEAT 4 // Keep-alive ping +#define IPC_MSG_TYPE_ACK 5 // Acknowledgment + +// Size constraints +#define IPC_HEADER_SIZE 9 // Fixed header size (reduced from 17) +#define IPC_MAX_FRAME_SIZE 1024 // Maximum payload size (128kbps @ 20ms = ~600 bytes worst case with VBR+FEC) + +// Socket paths +#define IPC_SOCKET_OUTPUT "/var/run/audio_output.sock" +#define IPC_SOCKET_INPUT "/var/run/audio_input.sock" + +// WIRE FORMAT STRUCTURES + +/** + * IPC message header (9 bytes, little-endian) + * + * Byte layout: + * [0-3] magic uint32_t LE Magic number (0x4A4B4F55 or 0x4A4B4D49) + * [4] type uint8_t Message type (0-5) + * [5-8] length uint32_t LE Payload size in bytes + * [9+] data uint8_t[] Variable payload + * + * CRITICAL: Must use __attribute__((packed)) to prevent padding. + * + * NOTE: Timestamp removed (was unused, saved 8 bytes per message) + */ +typedef struct __attribute__((packed)) { + uint32_t magic; // Magic number (LE) + uint8_t type; // Message type + uint32_t length; // Payload length in bytes (LE) +} ipc_header_t; + +/** + * Basic audio configuration (12 bytes) + * Message type: IPC_MSG_TYPE_CONFIG + * + * All fields are uint32_t little-endian. + */ +typedef struct __attribute__((packed)) { + uint32_t sample_rate; // Samples per second (e.g., 48000) + uint32_t channels; // Number of channels (e.g., 2 for stereo) + uint32_t frame_size; // Samples per frame (e.g., 960) +} ipc_config_t; + +/** + * Complete Opus encoder/decoder configuration (36 bytes) + * Message type: IPC_MSG_TYPE_OPUS_CONFIG + * + * All fields are uint32_t little-endian. + * Note: Negative values (like signal_type=-1000) are stored as two's complement uint32. + */ +typedef struct __attribute__((packed)) { + uint32_t sample_rate; // Samples per second (48000) + uint32_t channels; // Number of channels (2) + uint32_t frame_size; // Samples per frame per channel (960 = 20ms @ 48kHz) + uint32_t bitrate; // Bits per second (128000) + uint32_t complexity; // Encoder complexity 0-10 (2=balanced quality/speed) + uint32_t vbr; // Variable bitrate: 0=disabled, 1=enabled + uint32_t signal_type; // Signal type: -1000=auto, 3001=voice, 3002=music + uint32_t bandwidth; // Bandwidth: 1101=narrowband, 1102=mediumband, 1103=wideband, 1104=superwideband, 1105=fullband + uint32_t dtx; // Discontinuous transmission: 0=disabled, 1=enabled +} ipc_opus_config_t; + +/** + * Complete IPC message (header + payload) + */ +typedef struct { + ipc_header_t header; + uint8_t *data; // Dynamically allocated payload (NULL if length=0) +} ipc_message_t; + + +/** + * Read a complete IPC message from socket. + * + * This function: + * 1. Reads exactly 9 bytes (header) + * 2. Validates magic number + * 3. Validates length <= IPC_MAX_FRAME_SIZE + * 4. Allocates and reads payload if length > 0 + * 5. Stores result in msg->header and msg->data + * + * @param sock Socket file descriptor + * @param msg Output message (data will be malloc'd if length > 0) + * @param expected_magic Expected magic number (IPC_MAGIC_OUTPUT or IPC_MAGIC_INPUT) + * @return 0 on success, -1 on error + * + * CALLER MUST FREE msg->data if non-NULL! + */ +int ipc_read_message(int sock, ipc_message_t *msg, uint32_t expected_magic); + +/** + * Read a complete IPC message using pre-allocated buffer (zero-copy). + * + * @param sock Socket file descriptor + * @param msg Message structure to fill + * @param expected_magic Expected magic number for validation + * @param buffer Pre-allocated buffer for message data + * @param buffer_size Size of pre-allocated buffer + * @return 0 on success, -1 on error + * + * msg->data will point to buffer (no allocation). Caller does NOT need to free. + */ +int ipc_read_message_zerocopy(int sock, ipc_message_t *msg, uint32_t expected_magic, + uint8_t *buffer, uint32_t buffer_size); + +/** + * Write a complete IPC message to socket. + * + * This function writes header + payload atomically (if possible via writev). + * + * @param sock Socket file descriptor + * @param magic Magic number (IPC_MAGIC_OUTPUT or IPC_MAGIC_INPUT) + * @param type Message type (IPC_MSG_TYPE_*) + * @param data Payload data (can be NULL if length=0) + * @param length Payload length in bytes + * @return 0 on success, -1 on error + */ +int ipc_write_message(int sock, uint32_t magic, uint8_t type, + const uint8_t *data, uint32_t length); + +/** + * Parse Opus configuration from message data. + * + * @param data Payload data (must be exactly 36 bytes) + * @param length Payload length (must be 36) + * @param config Output Opus configuration + * @return 0 on success, -1 if length != 36 + */ +int ipc_parse_opus_config(const uint8_t *data, uint32_t length, ipc_opus_config_t *config); + +/** + * Parse basic audio configuration from message data. + * + * @param data Payload data (must be exactly 12 bytes) + * @param length Payload length (must be 12) + * @param config Output audio configuration + * @return 0 on success, -1 if length != 12 + */ +int ipc_parse_config(const uint8_t *data, uint32_t length, ipc_config_t *config); + +/** + * Free message resources. + * + * @param msg Message to free (frees msg->data if non-NULL) + */ +void ipc_free_message(ipc_message_t *msg); + + +/** + * Create Unix domain socket server. + * + * This function: + * 1. Creates socket with AF_UNIX, SOCK_STREAM + * 2. Removes existing socket file + * 3. Binds to specified path + * 4. Listens with backlog=1 (single client) + * + * @param socket_path Path to Unix socket (e.g., "/var/run/audio_output.sock") + * @return Socket fd on success, -1 on error + */ +int ipc_create_server(const char *socket_path); + +/** + * Accept client connection with automatic retry. + * + * Blocks until client connects or error occurs. + * + * @param server_sock Server socket fd from ipc_create_server() + * @return Client socket fd on success, -1 on error + */ +int ipc_accept_client(int server_sock); + +/** + * Helper: Read exactly N bytes from socket (loops until complete or error). + * + * @param sock Socket file descriptor + * @param buf Output buffer + * @param len Number of bytes to read + * @return 0 on success, -1 on error + */ +int ipc_read_full(int sock, void *buf, size_t len); + +#endif // JETKVM_IPC_PROTOCOL_H diff --git a/internal/audio/c/jetkvm_audio_input.c b/internal/audio/c/jetkvm_audio_input.c new file mode 100644 index 00000000..e1d262be --- /dev/null +++ b/internal/audio/c/jetkvm_audio_input.c @@ -0,0 +1,169 @@ +/* + * JetKVM Audio Input Server + * + * Standalone C binary for audio input path: + * Browser → WebRTC → Go Process → IPC Receive → Opus Decode → ALSA Playback (USB Gadget) + * + * This replaces the Go subprocess that was running with --audio-input-server flag. + * + * IMPORTANT: This binary only does OPUS DECODING (not encoding). + * The browser already encodes audio to Opus before sending via WebRTC. + */ + +#include "ipc_protocol.h" +#include "audio_common.h" +#include +#include +#include +#include +#include + +// Forward declarations from audio.c +extern int jetkvm_audio_playback_init(void); +extern void jetkvm_audio_playback_close(void); +extern int jetkvm_audio_decode_write(void *opus_buf, int opus_size); +extern void update_audio_decoder_constants(uint32_t sr, uint8_t ch, uint16_t fs, uint16_t max_pkt, + uint32_t sleep_us, uint8_t max_attempts, uint32_t max_backoff); + + +static volatile sig_atomic_t g_running = 1; + + +/** + * Send ACK response for heartbeat messages. + */ +static inline int32_t send_ack(int32_t client_sock) { + return ipc_write_message(client_sock, IPC_MAGIC_INPUT, IPC_MSG_TYPE_ACK, NULL, 0); +} + + +/** + * Main audio decode and playback loop. + * Receives Opus frames via IPC, decodes, writes to ALSA. + */ +static int run_audio_loop(int client_sock, volatile sig_atomic_t *running) { + audio_error_tracker_t tracker; + audio_error_tracker_init(&tracker); + + // Static buffer for zero-copy IPC (no malloc/free per frame) + static uint8_t frame_buffer[IPC_MAX_FRAME_SIZE] __attribute__((aligned(64))); + + printf("Starting audio input loop...\n"); + + while (*running) { + ipc_message_t msg; + + if (ipc_read_message_zerocopy(client_sock, &msg, IPC_MAGIC_INPUT, + frame_buffer, sizeof(frame_buffer)) != 0) { + if (*running) { + fprintf(stderr, "Failed to read message from client\n"); + } + break; + } + + switch (msg.header.type) { + case IPC_MSG_TYPE_OPUS_FRAME: { + if (msg.header.length == 0 || msg.data == NULL) { + fprintf(stderr, "Warning: Empty Opus frame received\n"); + continue; + } + + int frames_written = jetkvm_audio_decode_write(msg.data, msg.header.length); + + if (frames_written < 0) { + fprintf(stderr, "Audio decode/write failed (error %d/%d)\n", + tracker.consecutive_errors + 1, AUDIO_MAX_CONSECUTIVE_ERRORS); + + if (audio_error_tracker_record_error(&tracker)) { + fprintf(stderr, "Too many consecutive errors, giving up\n"); + return -1; + } + } else { + audio_error_tracker_record_success(&tracker); + + if (audio_error_tracker_should_trace(&tracker)) { + printf("Processed frame %u (opus_size=%u, pcm_frames=%d)\n", + tracker.frame_count, msg.header.length, frames_written); + } + } + + break; + } + + case IPC_MSG_TYPE_CONFIG: + printf("Received basic audio config\n"); + send_ack(client_sock); + break; + + case IPC_MSG_TYPE_OPUS_CONFIG: + audio_common_handle_opus_config(msg.data, msg.header.length, 0); + send_ack(client_sock); + break; + + case IPC_MSG_TYPE_STOP: + printf("Received stop message\n"); + *running = 0; + return 0; + + case IPC_MSG_TYPE_HEARTBEAT: + send_ack(client_sock); + break; + + default: + printf("Warning: Unknown message type: %u\n", msg.header.type); + break; + } + } + + printf("Audio input loop ended after %u frames\n", tracker.frame_count); + return 0; +} + + +int main(int argc, char **argv) { + audio_common_print_startup("Audio Input Server"); + + // Setup signal handlers + audio_common_setup_signal_handlers(&g_running); + + // Load configuration from environment + audio_config_t config; + audio_common_load_config(&config, 0); // 0 = input server + + // Apply decoder constants to audio.c (encoder params not needed) + update_audio_decoder_constants( + config.sample_rate, + config.channels, + config.frame_size, + AUDIO_MAX_PACKET_SIZE, + AUDIO_SLEEP_MICROSECONDS, + AUDIO_MAX_ATTEMPTS, + AUDIO_MAX_BACKOFF_US + ); + + // Initialize audio playback (Opus decoder + ALSA playback) + printf("Initializing audio playback on device: %s\n", config.alsa_device); + if (jetkvm_audio_playback_init() != 0) { + fprintf(stderr, "Failed to initialize audio playback\n"); + return 1; + } + + // Create IPC server + int server_sock = ipc_create_server(IPC_SOCKET_INPUT); + if (server_sock < 0) { + fprintf(stderr, "Failed to create IPC server\n"); + jetkvm_audio_playback_close(); + return 1; + } + + // Main connection loop + audio_common_server_loop(server_sock, &g_running, run_audio_loop); + + audio_common_print_shutdown("audio input server"); + close(server_sock); + unlink(IPC_SOCKET_INPUT); + jetkvm_audio_playback_close(); + + printf("Audio input server exited cleanly\n"); + return 0; +} diff --git a/internal/audio/c/jetkvm_audio_output.c b/internal/audio/c/jetkvm_audio_output.c new file mode 100644 index 00000000..a459e4db --- /dev/null +++ b/internal/audio/c/jetkvm_audio_output.c @@ -0,0 +1,193 @@ +/* + * JetKVM Audio Output Server + * + * Standalone C binary for audio output path: + * ALSA Capture (TC358743 HDMI) → Opus Encode → IPC Send → Go Process → WebRTC → Browser + * + * This replaces the Go subprocess that was running with --audio-output-server flag. + */ + +#include "ipc_protocol.h" +#include "audio_common.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include + +// Forward declarations from audio.c +extern int jetkvm_audio_capture_init(void); +extern void jetkvm_audio_capture_close(void); +extern int jetkvm_audio_read_encode(void *opus_buf); +extern void update_audio_constants(uint32_t bitrate, uint8_t complexity, + uint32_t sr, uint8_t ch, uint16_t fs, uint16_t max_pkt, + uint32_t sleep_us, uint8_t max_attempts, uint32_t max_backoff); +extern int update_opus_encoder_params(uint32_t bitrate, uint8_t complexity); + + +static volatile sig_atomic_t g_running = 1; + + +static void load_output_config(audio_config_t *common) { + audio_common_load_config(common, 1); // 1 = output server +} + + +/** + * Handle incoming IPC messages from client (non-blocking). + * Returns 0 on success, -1 on error. + */ +static int handle_incoming_messages(int client_sock, volatile sig_atomic_t *running) { + // Static buffer for zero-copy IPC (control messages are small) + static uint8_t msg_buffer[IPC_MAX_FRAME_SIZE] __attribute__((aligned(64))); + + // Set non-blocking mode for client socket + int flags = fcntl(client_sock, F_GETFL, 0); + fcntl(client_sock, F_SETFL, flags | O_NONBLOCK); + + ipc_message_t msg; + + // Try to read message (non-blocking, zero-copy) + int result = ipc_read_message_zerocopy(client_sock, &msg, IPC_MAGIC_OUTPUT, + msg_buffer, sizeof(msg_buffer)); + + // Restore blocking mode + fcntl(client_sock, F_SETFL, flags); + + if (result != 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return 0; // No message available, not an error + } + return -1; + } + + switch (msg.header.type) { + case IPC_MSG_TYPE_OPUS_CONFIG: + audio_common_handle_opus_config(msg.data, msg.header.length, 1); + break; + + case IPC_MSG_TYPE_STOP: + printf("Received stop message\n"); + *running = 0; + break; + + case IPC_MSG_TYPE_HEARTBEAT: + break; + + default: + printf("Warning: Unknown message type: %u\n", msg.header.type); + break; + } + + return 0; +} + + +/** + * Main audio capture and encode loop. + * Continuously reads from ALSA, encodes to Opus, sends via IPC. + */ +static int run_audio_loop(int client_sock, volatile sig_atomic_t *running) { + uint8_t opus_buffer[IPC_MAX_FRAME_SIZE]; + audio_error_tracker_t tracker; + audio_error_tracker_init(&tracker); + + printf("Starting audio output loop...\n"); + + while (*running) { + if (handle_incoming_messages(client_sock, running) < 0) { + fprintf(stderr, "Client disconnected, waiting for reconnection...\n"); + break; + } + + int opus_size = jetkvm_audio_read_encode(opus_buffer); + + if (opus_size < 0) { + fprintf(stderr, "Audio read/encode failed (error %d/%d)\n", + tracker.consecutive_errors + 1, AUDIO_MAX_CONSECUTIVE_ERRORS); + + if (audio_error_tracker_record_error(&tracker)) { + fprintf(stderr, "Too many consecutive errors, giving up\n"); + return -1; + } + // No sleep needed - jetkvm_audio_read_encode already uses snd_pcm_wait internally + continue; + } + + if (opus_size == 0) { + // Frame skipped for recovery, minimal yield + sched_yield(); + continue; + } + + audio_error_tracker_record_success(&tracker); + + if (ipc_write_message(client_sock, IPC_MAGIC_OUTPUT, IPC_MSG_TYPE_OPUS_FRAME, + opus_buffer, opus_size) != 0) { + fprintf(stderr, "Failed to send frame to client\n"); + break; + } + + if (audio_error_tracker_should_trace(&tracker)) { + printf("Sent frame %u (size=%d bytes)\n", tracker.frame_count, opus_size); + } + } + + printf("Audio output loop ended after %u frames\n", tracker.frame_count); + return 0; +} + + +int main(int argc, char **argv) { + audio_common_print_startup("Audio Output Server"); + + // Setup signal handlers + audio_common_setup_signal_handlers(&g_running); + + // Load configuration from environment + audio_config_t common; + load_output_config(&common); + + // Apply audio constants to audio.c + update_audio_constants( + common.opus_bitrate, + common.opus_complexity, + common.sample_rate, + common.channels, + common.frame_size, + AUDIO_MAX_PACKET_SIZE, + AUDIO_SLEEP_MICROSECONDS, + AUDIO_MAX_ATTEMPTS, + AUDIO_MAX_BACKOFF_US + ); + + // Initialize audio capture + printf("Initializing audio capture on device: %s\n", common.alsa_device); + if (jetkvm_audio_capture_init() != 0) { + fprintf(stderr, "Failed to initialize audio capture\n"); + return 1; + } + + // Create IPC server + int server_sock = ipc_create_server(IPC_SOCKET_OUTPUT); + if (server_sock < 0) { + fprintf(stderr, "Failed to create IPC server\n"); + jetkvm_audio_capture_close(); + return 1; + } + + // Main connection loop + audio_common_server_loop(server_sock, &g_running, run_audio_loop); + + audio_common_print_shutdown("audio output server"); + close(server_sock); + unlink(IPC_SOCKET_OUTPUT); + jetkvm_audio_capture_close(); + + printf("Audio output server exited cleanly\n"); + return 0; +} diff --git a/internal/audio/embed.go b/internal/audio/embed.go new file mode 100644 index 00000000..9caf199b --- /dev/null +++ b/internal/audio/embed.go @@ -0,0 +1,80 @@ +package audio + +import ( + _ "embed" + "fmt" + "os" +) + +// Embedded C audio binaries (built during compilation) +// +//go:embed bin/jetkvm_audio_output +var audioOutputBinary []byte + +//go:embed bin/jetkvm_audio_input +var audioInputBinary []byte + +const ( + audioBinDir = "/userdata/jetkvm/bin" + audioOutputBinPath = audioBinDir + "/jetkvm_audio_output" + audioInputBinPath = audioBinDir + "/jetkvm_audio_input" + binaryFileMode = 0755 // rwxr-xr-x +) + +// ExtractEmbeddedBinaries extracts the embedded C audio binaries to disk +// This should be called during application startup before audio supervisors are started +func ExtractEmbeddedBinaries() error { + // Create bin directory if it doesn't exist + if err := os.MkdirAll(audioBinDir, 0755); err != nil { + return fmt.Errorf("failed to create audio bin directory: %w", err) + } + + // Extract audio output binary + if err := extractBinary(audioOutputBinary, audioOutputBinPath); err != nil { + return fmt.Errorf("failed to extract audio output binary: %w", err) + } + + // Extract audio input binary + if err := extractBinary(audioInputBinary, audioInputBinPath); err != nil { + return fmt.Errorf("failed to extract audio input binary: %w", err) + } + + return nil +} + +// extractBinary writes embedded binary data to disk with executable permissions +func extractBinary(data []byte, path string) error { + // Check if binary already exists and is valid + if info, err := os.Stat(path); err == nil { + // File exists - check if size matches + if info.Size() == int64(len(data)) { + // Binary already extracted and matches embedded version + return nil + } + // Size mismatch - need to update + } + + // Write to temporary file first for atomic replacement + tmpPath := path + ".tmp" + if err := os.WriteFile(tmpPath, data, binaryFileMode); err != nil { + return fmt.Errorf("failed to write binary to %s: %w", tmpPath, err) + } + + // Atomically rename to final path + if err := os.Rename(tmpPath, path); err != nil { + os.Remove(tmpPath) // Clean up on error + return fmt.Errorf("failed to rename binary to %s: %w", path, err) + } + + return nil +} + +// GetAudioOutputBinaryPath returns the path to the audio output binary +func GetAudioOutputBinaryPath() string { + return audioOutputBinPath +} + +// GetAudioInputBinaryPath returns the path to the audio input binary +func GetAudioInputBinaryPath() string { + return audioInputBinPath +} diff --git a/internal/audio/ipc.go b/internal/audio/ipc.go new file mode 100644 index 00000000..e9b50bb0 --- /dev/null +++ b/internal/audio/ipc.go @@ -0,0 +1,185 @@ +package audio + +import ( + "encoding/binary" + "fmt" + "io" + "net" + "sync" + "time" + + "github.com/jetkvm/kvm/internal/logging" + "github.com/rs/zerolog" +) + +// Buffer pool for zero-allocation writes +var writeBufferPool = sync.Pool{ + New: func() interface{} { + buf := make([]byte, ipcHeaderSize+ipcMaxFrameSize) + return &buf + }, +} + +// IPC Protocol constants (matches C implementation in ipc_protocol.h) +const ( + ipcMagicOutput = 0x4A4B4F55 // "JKOU" - Output (device → browser) + ipcMagicInput = 0x4A4B4D49 // "JKMI" - Input (browser → device) + ipcHeaderSize = 9 // Reduced from 17 (removed 8-byte timestamp) + ipcMaxFrameSize = 1024 // 128kbps @ 20ms = ~600 bytes worst case with VBR+FEC + ipcMsgTypeOpus = 0 + ipcMsgTypeConfig = 1 + ipcMsgTypeStop = 3 + connectTimeout = 5 * time.Second + readTimeout = 2 * time.Second +) + +// IPCClient manages Unix socket communication with audio subprocess +type IPCClient struct { + socketPath string + magicNumber uint32 + conn net.Conn + mu sync.Mutex + logger zerolog.Logger + readBuf []byte // Reusable buffer for reads (single reader per client) +} + +// NewIPCClient creates a new IPC client +// For output: socketPath="/var/run/audio_output.sock", magic=ipcMagicOutput +// For input: socketPath="/var/run/audio_input.sock", magic=ipcMagicInput +func NewIPCClient(name, socketPath string, magicNumber uint32) *IPCClient { + logger := logging.GetDefaultLogger().With().Str("component", name+"-ipc").Logger() + + return &IPCClient{ + socketPath: socketPath, + magicNumber: magicNumber, + logger: logger, + readBuf: make([]byte, ipcMaxFrameSize), + } +} + +// Connect establishes connection to the subprocess +func (c *IPCClient) Connect() error { + c.mu.Lock() + defer c.mu.Unlock() + + if c.conn != nil { + c.conn.Close() + c.conn = nil + } + + conn, err := net.DialTimeout("unix", c.socketPath, connectTimeout) + if err != nil { + return fmt.Errorf("failed to connect to %s: %w", c.socketPath, err) + } + + c.conn = conn + c.logger.Debug().Str("socket", c.socketPath).Msg("connected to subprocess") + return nil +} + +// Disconnect closes the connection +func (c *IPCClient) Disconnect() { + c.mu.Lock() + defer c.mu.Unlock() + + if c.conn != nil { + c.conn.Close() + c.conn = nil + c.logger.Debug().Msg("disconnected from subprocess") + } +} + +// IsConnected returns true if currently connected +func (c *IPCClient) IsConnected() bool { + c.mu.Lock() + defer c.mu.Unlock() + return c.conn != nil +} + +// ReadMessage reads a complete IPC message (header + payload) +// Returns message type, payload data, and error +// IMPORTANT: The returned payload slice is only valid until the next ReadMessage call. +// Callers must use the data immediately or copy if retention is needed. +func (c *IPCClient) ReadMessage() (uint8, []byte, error) { + c.mu.Lock() + defer c.mu.Unlock() + + if c.conn == nil { + return 0, nil, fmt.Errorf("not connected") + } + + // Set read deadline + if err := c.conn.SetReadDeadline(time.Now().Add(readTimeout)); err != nil { + return 0, nil, fmt.Errorf("failed to set read deadline: %w", err) + } + + // Read 9-byte header + var header [ipcHeaderSize]byte + if _, err := io.ReadFull(c.conn, header[:]); err != nil { + return 0, nil, fmt.Errorf("failed to read header: %w", err) + } + + // Parse header (little-endian) + magic := binary.LittleEndian.Uint32(header[0:4]) + msgType := header[4] + length := binary.LittleEndian.Uint32(header[5:9]) + + // Validate magic number + if magic != c.magicNumber { + return 0, nil, fmt.Errorf("invalid magic: got 0x%X, expected 0x%X", magic, c.magicNumber) + } + + // Validate length + if length > ipcMaxFrameSize { + return 0, nil, fmt.Errorf("message too large: %d bytes", length) + } + + // Read payload if present + if length == 0 { + return msgType, nil, nil + } + + // Read directly into reusable buffer (zero-allocation) + if _, err := io.ReadFull(c.conn, c.readBuf[:length]); err != nil { + return 0, nil, fmt.Errorf("failed to read payload: %w", err) + } + + // Return slice of readBuf - caller must use immediately, data is only valid until next ReadMessage + // This avoids allocation in hot path (50 frames/sec) + return msgType, c.readBuf[:length], nil +} + +// WriteMessage writes a complete IPC message +func (c *IPCClient) WriteMessage(msgType uint8, payload []byte) error { + c.mu.Lock() + defer c.mu.Unlock() + + if c.conn == nil { + return fmt.Errorf("not connected") + } + + length := uint32(len(payload)) + if length > ipcMaxFrameSize { + return fmt.Errorf("payload too large: %d bytes", length) + } + + // Get buffer from pool for zero-allocation write + bufPtr := writeBufferPool.Get().(*[]byte) + defer writeBufferPool.Put(bufPtr) + buf := *bufPtr + + // Build header in pooled buffer (9 bytes, little-endian) + binary.LittleEndian.PutUint32(buf[0:4], c.magicNumber) + buf[4] = msgType + binary.LittleEndian.PutUint32(buf[5:9], length) + + // Copy payload after header + copy(buf[ipcHeaderSize:], payload) + + // Write header + payload atomically + if _, err := c.conn.Write(buf[:ipcHeaderSize+length]); err != nil { + return fmt.Errorf("failed to write message: %w", err) + } + + return nil +} diff --git a/internal/audio/relay.go b/internal/audio/relay.go new file mode 100644 index 00000000..bda0db18 --- /dev/null +++ b/internal/audio/relay.go @@ -0,0 +1,152 @@ +package audio + +import ( + "context" + "fmt" + "sync/atomic" + "time" + + "github.com/jetkvm/kvm/internal/logging" + "github.com/pion/webrtc/v4" + "github.com/pion/webrtc/v4/pkg/media" + "github.com/rs/zerolog" +) + +// OutputRelay forwards audio from subprocess (HDMI) to WebRTC (browser) +type OutputRelay struct { + client *IPCClient + audioTrack *webrtc.TrackLocalStaticSample + ctx context.Context + cancel context.CancelFunc + logger zerolog.Logger + running atomic.Bool + sample media.Sample // Reusable sample for zero-allocation hot path + + // Stats (Uint32: overflows after 2.7 years @ 50fps, faster atomics on 32-bit ARM) + framesRelayed atomic.Uint32 + framesDropped atomic.Uint32 +} + +// NewOutputRelay creates a relay for output audio (device → browser) +func NewOutputRelay(client *IPCClient, audioTrack *webrtc.TrackLocalStaticSample) *OutputRelay { + ctx, cancel := context.WithCancel(context.Background()) + logger := logging.GetDefaultLogger().With().Str("component", "audio-output-relay").Logger() + + return &OutputRelay{ + client: client, + audioTrack: audioTrack, + ctx: ctx, + cancel: cancel, + logger: logger, + sample: media.Sample{ + Duration: 20 * time.Millisecond, // Constant for all Opus frames + }, + } +} + +// Start begins relaying audio frames +func (r *OutputRelay) Start() error { + if r.running.Swap(true) { + return fmt.Errorf("output relay already running") + } + + go r.relayLoop() + r.logger.Debug().Msg("output relay started") + return nil +} + +// Stop stops the relay +func (r *OutputRelay) Stop() { + if !r.running.Swap(false) { + return + } + + r.cancel() + r.logger.Debug(). + Uint32("frames_relayed", r.framesRelayed.Load()). + Uint32("frames_dropped", r.framesDropped.Load()). + Msg("output relay stopped") +} + +// relayLoop continuously reads from IPC and writes to WebRTC +func (r *OutputRelay) relayLoop() { + const reconnectDelay = 1 * time.Second + + for r.running.Load() { + // Ensure connected + if !r.client.IsConnected() { + if err := r.client.Connect(); err != nil { + r.logger.Debug().Err(err).Msg("failed to connect, will retry") + time.Sleep(reconnectDelay) + continue + } + } + + // Read message from subprocess + msgType, payload, err := r.client.ReadMessage() + if err != nil { + // Connection error - reconnect + if r.running.Load() { + r.logger.Warn().Err(err).Msg("read error, reconnecting") + r.client.Disconnect() + time.Sleep(reconnectDelay) + } + continue + } + + // Handle message + if msgType == ipcMsgTypeOpus && len(payload) > 0 { + // Reuse sample struct (zero-allocation hot path) + r.sample.Data = payload + + if err := r.audioTrack.WriteSample(r.sample); err != nil { + r.framesDropped.Add(1) + r.logger.Warn().Err(err).Msg("failed to write sample to WebRTC") + } else { + r.framesRelayed.Add(1) + } + } + } +} + +// InputRelay forwards audio from WebRTC (browser microphone) to subprocess (USB audio) +type InputRelay struct { + client *IPCClient + ctx context.Context + cancel context.CancelFunc + logger zerolog.Logger + running atomic.Bool +} + +// NewInputRelay creates a relay for input audio (browser → device) +func NewInputRelay(client *IPCClient) *InputRelay { + ctx, cancel := context.WithCancel(context.Background()) + logger := logging.GetDefaultLogger().With().Str("component", "audio-input-relay").Logger() + + return &InputRelay{ + client: client, + ctx: ctx, + cancel: cancel, + logger: logger, + } +} + +// Start begins relaying audio frames +func (r *InputRelay) Start() error { + if r.running.Swap(true) { + return fmt.Errorf("input relay already running") + } + + r.logger.Debug().Msg("input relay started") + return nil +} + +// Stop stops the relay +func (r *InputRelay) Stop() { + if !r.running.Swap(false) { + return + } + + r.cancel() + r.logger.Debug().Msg("input relay stopped") +} diff --git a/internal/audio/supervisor.go b/internal/audio/supervisor.go new file mode 100644 index 00000000..3b1ceb93 --- /dev/null +++ b/internal/audio/supervisor.go @@ -0,0 +1,187 @@ +package audio + +import ( + "bufio" + "context" + "fmt" + "io" + "os" + "os/exec" + "sync/atomic" + "time" + + "github.com/jetkvm/kvm/internal/logging" + "github.com/rs/zerolog" +) + +// Supervisor manages a subprocess lifecycle with automatic restart +type Supervisor struct { + name string + binaryPath string + socketPath string + env []string + + cmd *exec.Cmd + ctx context.Context + cancel context.CancelFunc + running atomic.Bool + done chan struct{} // Closed when supervision loop exits + logger zerolog.Logger + + // Restart state + restartCount uint8 + lastRestartAt time.Time + restartBackoff time.Duration +} + +const ( + minRestartDelay = 1 * time.Second + maxRestartDelay = 30 * time.Second + restartWindow = 5 * time.Minute // Reset backoff if process runs this long +) + +// NewSupervisor creates a new subprocess supervisor +func NewSupervisor(name, binaryPath, socketPath string, env []string) *Supervisor { + ctx, cancel := context.WithCancel(context.Background()) + logger := logging.GetDefaultLogger().With().Str("component", name).Logger() + + return &Supervisor{ + name: name, + binaryPath: binaryPath, + socketPath: socketPath, + env: env, + ctx: ctx, + cancel: cancel, + done: make(chan struct{}), + logger: logger, + restartBackoff: minRestartDelay, + } +} + +// Start begins supervising the subprocess +func (s *Supervisor) Start() error { + if s.running.Load() { + return fmt.Errorf("%s: already running", s.name) + } + + s.running.Store(true) + go s.supervisionLoop() + s.logger.Debug().Msg("supervisor started") + return nil +} + +// Stop gracefully stops the subprocess +func (s *Supervisor) Stop() { + if !s.running.Swap(false) { + return // Already stopped + } + + s.logger.Debug().Msg("stopping supervisor") + s.cancel() + + // Kill process if running + if s.cmd != nil && s.cmd.Process != nil { + _ = s.cmd.Process.Kill() // Ignore error, process may already be dead + } + + // Wait for supervision loop to exit + <-s.done + + // Clean up socket file + os.Remove(s.socketPath) + s.logger.Debug().Msg("supervisor stopped") +} + +// supervisionLoop manages the subprocess lifecycle +func (s *Supervisor) supervisionLoop() { + defer close(s.done) + + for s.running.Load() { + // Check if we should reset backoff (process ran long enough) + if !s.lastRestartAt.IsZero() && time.Since(s.lastRestartAt) > restartWindow { + s.restartCount = 0 + s.restartBackoff = minRestartDelay + s.logger.Debug().Msg("reset restart backoff after stable run") + } + + // Start the process + if err := s.startProcess(); err != nil { + s.logger.Error().Err(err).Msg("failed to start process") + } else { + // Wait for process to exit + err := s.cmd.Wait() + + if s.running.Load() { + // Process crashed (not intentional shutdown) + s.logger.Warn(). + Err(err). + Uint8("restart_count", s.restartCount). + Dur("backoff", s.restartBackoff). + Msg("process exited unexpectedly, will restart") + + s.restartCount++ + s.lastRestartAt = time.Now() + + // Calculate next backoff (exponential: 1s, 2s, 4s, 8s, 16s, 30s) + s.restartBackoff *= 2 + if s.restartBackoff > maxRestartDelay { + s.restartBackoff = maxRestartDelay + } + + // Wait before restart + select { + case <-time.After(s.restartBackoff): + // Continue to next iteration + case <-s.ctx.Done(): + return // Shutting down + } + } else { + // Intentional shutdown + s.logger.Debug().Msg("process exited cleanly") + return + } + } + } +} + +// logPipe reads from a pipe and logs each line at debug level +func (s *Supervisor) logPipe(reader io.ReadCloser, stream string) { + scanner := bufio.NewScanner(reader) + for scanner.Scan() { + line := scanner.Text() + s.logger.Debug().Str("stream", stream).Msg(line) + } + reader.Close() +} + +// startProcess starts the subprocess +func (s *Supervisor) startProcess() error { + s.cmd = exec.CommandContext(s.ctx, s.binaryPath) + s.cmd.Env = append(os.Environ(), s.env...) + + // Create pipes for subprocess output + stdout, err := s.cmd.StdoutPipe() + if err != nil { + return fmt.Errorf("failed to create stdout pipe: %w", err) + } + stderr, err := s.cmd.StderrPipe() + if err != nil { + return fmt.Errorf("failed to create stderr pipe: %w", err) + } + + if err := s.cmd.Start(); err != nil { + return fmt.Errorf("failed to start %s: %w", s.name, err) + } + + // Start goroutines to log subprocess output at debug level + go s.logPipe(stdout, "stdout") + go s.logPipe(stderr, "stderr") + + s.logger.Debug(). + Int("pid", s.cmd.Process.Pid). + Str("binary", s.binaryPath). + Strs("custom_env", s.env). + Msg("process started") + + return nil +} diff --git a/internal/usbgadget/changeset_resolver.go b/internal/usbgadget/changeset_resolver.go index 67812e0d..c06fac96 100644 --- a/internal/usbgadget/changeset_resolver.go +++ b/internal/usbgadget/changeset_resolver.go @@ -1,7 +1,9 @@ package usbgadget import ( + "context" "fmt" + "time" "github.com/rs/zerolog" "github.com/sourcegraph/tf-dag/dag" @@ -114,7 +116,20 @@ func (c *ChangeSetResolver) resolveChanges(initial bool) error { } func (c *ChangeSetResolver) applyChanges() error { + return c.applyChangesWithTimeout(45 * time.Second) +} + +func (c *ChangeSetResolver) applyChangesWithTimeout(timeout time.Duration) error { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + for _, change := range c.resolvedChanges { + select { + case <-ctx.Done(): + return fmt.Errorf("USB gadget reconfiguration timed out after %v: %w", timeout, ctx.Err()) + default: + } + change.ResetActionResolution() action := change.Action() actionStr := FileChangeResolvedActionString[action] @@ -126,7 +141,7 @@ func (c *ChangeSetResolver) applyChanges() error { l.Str("action", actionStr).Str("change", change.String()).Msg("applying change") - err := c.changeset.applyChange(change) + err := c.applyChangeWithTimeout(ctx, change) if err != nil { if change.IgnoreErrors { c.l.Warn().Str("change", change.String()).Err(err).Msg("ignoring error") @@ -139,6 +154,20 @@ func (c *ChangeSetResolver) applyChanges() error { return nil } +func (c *ChangeSetResolver) applyChangeWithTimeout(ctx context.Context, change *FileChange) error { + done := make(chan error, 1) + go func() { + done <- c.changeset.applyChange(change) + }() + + select { + case err := <-done: + return err + case <-ctx.Done(): + return fmt.Errorf("change application timed out for %s: %w", change.String(), ctx.Err()) + } +} + func (c *ChangeSetResolver) GetChanges() ([]*FileChange, error) { localChanges := c.changeset.Changes changesMap := make(map[string]*FileChange) diff --git a/internal/usbgadget/config.go b/internal/usbgadget/config.go index 6d1bd391..0a9b3caa 100644 --- a/internal/usbgadget/config.go +++ b/internal/usbgadget/config.go @@ -59,6 +59,23 @@ var defaultGadgetConfig = map[string]gadgetConfigItem{ // mass storage "mass_storage_base": massStorageBaseConfig, "mass_storage_lun0": massStorageLun0Config, + // audio (UAC1 - USB Audio Class 1) + "audio": { + order: 4000, + device: "uac1.usb0", + path: []string{"functions", "uac1.usb0"}, + configPath: []string{"uac1.usb0"}, + attrs: gadgetAttributes{ + "p_chmask": "3", // Playback: stereo (2 channels) + "p_srate": "48000", // Playback: 48kHz sample rate + "p_ssize": "2", // Playback: 16-bit (2 bytes) + "p_volume_present": "0", // Playback: no volume control + "c_chmask": "3", // Capture: stereo (2 channels) + "c_srate": "48000", // Capture: 48kHz sample rate + "c_ssize": "2", // Capture: 16-bit (2 bytes) + "c_volume_present": "0", // Capture: no volume control + }, + }, } func (u *UsbGadget) isGadgetConfigItemEnabled(itemKey string) bool { @@ -73,6 +90,8 @@ func (u *UsbGadget) isGadgetConfigItemEnabled(itemKey string) bool { return u.enabledDevices.MassStorage case "mass_storage_lun0": return u.enabledDevices.MassStorage + case "audio": + return u.enabledDevices.Audio default: return true } @@ -182,6 +201,9 @@ func (u *UsbGadget) Init() error { return u.logError("unable to initialize USB stack", err) } + // Pre-open HID files to reduce input latency + u.PreOpenHidFiles() + return nil } @@ -191,11 +213,17 @@ func (u *UsbGadget) UpdateGadgetConfig() error { u.loadGadgetConfig() + // Close HID files before reconfiguration to prevent "file already closed" errors + u.CloseHidFiles() + err := u.configureUsbGadget(true) if err != nil { return u.logError("unable to update gadget config", err) } + // Reopen HID files after reconfiguration + u.PreOpenHidFiles() + return nil } diff --git a/internal/usbgadget/config_tx.go b/internal/usbgadget/config_tx.go index df8a3d1b..d5591b17 100644 --- a/internal/usbgadget/config_tx.go +++ b/internal/usbgadget/config_tx.go @@ -1,10 +1,12 @@ package usbgadget import ( + "context" "fmt" "path" "path/filepath" "sort" + "time" "github.com/rs/zerolog" ) @@ -52,22 +54,49 @@ func (u *UsbGadget) newUsbGadgetTransaction(lock bool) error { } func (u *UsbGadget) WithTransaction(fn func() error) error { - u.txLock.Lock() - defer u.txLock.Unlock() + return u.WithTransactionTimeout(fn, 60*time.Second) +} - err := u.newUsbGadgetTransaction(false) - if err != nil { - u.log.Error().Err(err).Msg("failed to create transaction") - return err - } - if err := fn(); err != nil { - u.log.Error().Err(err).Msg("transaction failed") - return err - } - result := u.tx.Commit() - u.tx = nil +// WithTransactionTimeout executes a USB gadget transaction with a specified timeout +// to prevent indefinite blocking during USB reconfiguration operations +func (u *UsbGadget) WithTransactionTimeout(fn func() error, timeout time.Duration) error { + // Create a context with timeout for the entire transaction + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() - return result + // Channel to signal when the transaction is complete + done := make(chan error, 1) + + // Execute the transaction in a goroutine + go func() { + u.txLock.Lock() + defer u.txLock.Unlock() + + err := u.newUsbGadgetTransaction(false) + if err != nil { + u.log.Error().Err(err).Msg("failed to create transaction") + done <- err + return + } + + if err := fn(); err != nil { + u.log.Error().Err(err).Msg("transaction failed") + done <- err + return + } + + result := u.tx.Commit() + u.tx = nil + done <- result + }() + + // Wait for either completion or timeout + select { + case err := <-done: + return err + case <-ctx.Done(): + return fmt.Errorf("USB gadget transaction timed out after %v: %w", timeout, ctx.Err()) + } } func (tx *UsbGadgetTransaction) addFileChange(component string, change RequestedFileChange) string { diff --git a/internal/usbgadget/usbgadget.go b/internal/usbgadget/usbgadget.go index f01ae09d..c7c86d23 100644 --- a/internal/usbgadget/usbgadget.go +++ b/internal/usbgadget/usbgadget.go @@ -19,6 +19,7 @@ type Devices struct { RelativeMouse bool `json:"relative_mouse"` Keyboard bool `json:"keyboard"` MassStorage bool `json:"mass_storage"` + Audio bool `json:"audio"` } // Config is a struct that represents the customizations for a USB gadget. @@ -39,6 +40,7 @@ var defaultUsbGadgetDevices = Devices{ RelativeMouse: true, Keyboard: true, MassStorage: true, + Audio: true, } type KeysDownState struct { @@ -188,3 +190,63 @@ func (u *UsbGadget) Close() error { return nil } + +// CloseHidFiles closes all open HID files +func (u *UsbGadget) CloseHidFiles() { + u.log.Debug().Msg("closing HID files") + + // Close keyboard HID file + if u.keyboardHidFile != nil { + if err := u.keyboardHidFile.Close(); err != nil { + u.log.Debug().Err(err).Msg("failed to close keyboard HID file") + } + u.keyboardHidFile = nil + } + + // Close absolute mouse HID file + if u.absMouseHidFile != nil { + if err := u.absMouseHidFile.Close(); err != nil { + u.log.Debug().Err(err).Msg("failed to close absolute mouse HID file") + } + u.absMouseHidFile = nil + } + + // Close relative mouse HID file + if u.relMouseHidFile != nil { + if err := u.relMouseHidFile.Close(); err != nil { + u.log.Debug().Err(err).Msg("failed to close relative mouse HID file") + } + u.relMouseHidFile = nil + } +} + +// PreOpenHidFiles opens all HID files to reduce input latency +func (u *UsbGadget) PreOpenHidFiles() { + // Add a small delay to allow USB gadget reconfiguration to complete + // This prevents "no such device or address" errors when trying to open HID files + time.Sleep(100 * time.Millisecond) + + if u.enabledDevices.Keyboard { + if err := u.openKeyboardHidFile(); err != nil { + u.log.Debug().Err(err).Msg("failed to pre-open keyboard HID file") + } + } + if u.enabledDevices.AbsoluteMouse { + if u.absMouseHidFile == nil { + var err error + u.absMouseHidFile, err = os.OpenFile("/dev/hidg1", os.O_RDWR, 0666) + if err != nil { + u.log.Debug().Err(err).Msg("failed to pre-open absolute mouse HID file") + } + } + } + if u.enabledDevices.RelativeMouse { + if u.relMouseHidFile == nil { + var err error + u.relMouseHidFile, err = os.OpenFile("/dev/hidg2", os.O_RDWR, 0666) + if err != nil { + u.log.Debug().Err(err).Msg("failed to pre-open relative mouse HID file") + } + } + } +} diff --git a/jsonrpc.go b/jsonrpc.go index 0ff44a78..7c80cbb7 100644 --- a/jsonrpc.go +++ b/jsonrpc.go @@ -700,7 +700,8 @@ func rpcSetUsbConfig(usbConfig usbgadget.Config) error { LoadConfig() config.UsbConfig = &usbConfig gadget.SetGadgetConfig(config.UsbConfig) - return updateUsbRelatedConfig() + wasAudioEnabled := config.UsbDevices != nil && config.UsbDevices.Audio + return updateUsbRelatedConfig(wasAudioEnabled) } func rpcGetWakeOnLanDevices() ([]WakeOnLanDevice, error) { @@ -911,23 +912,67 @@ func rpcGetUsbDevices() (usbgadget.Devices, error) { return *config.UsbDevices, nil } -func updateUsbRelatedConfig() error { +func updateUsbRelatedConfig(wasAudioEnabled bool) error { + ensureConfigLoaded() + + audioSourceChanged := false + + // If USB audio is being disabled and audio output source is USB, switch to HDMI + if config.UsbDevices != nil && !config.UsbDevices.Audio && config.AudioOutputSource == "usb" { + audioMutex.Lock() + config.AudioOutputSource = "hdmi" + useUSBForAudioOutput = false + audioSourceChanged = true + audioMutex.Unlock() + } + + // If USB audio is being enabled (was disabled, now enabled), switch to USB + if config.UsbDevices != nil && config.UsbDevices.Audio && !wasAudioEnabled { + audioMutex.Lock() + config.AudioOutputSource = "usb" + useUSBForAudioOutput = true + audioSourceChanged = true + audioMutex.Unlock() + } + + // Stop audio subprocesses before USB reconfiguration + // Input always uses USB, output depends on audioSourceChanged + audioMutex.Lock() + stopInputSubprocessLocked() + if audioSourceChanged { + stopOutputSubprocessLocked() + } + audioMutex.Unlock() + if err := gadget.UpdateGadgetConfig(); err != nil { return fmt.Errorf("failed to write gadget config: %w", err) } + if err := SaveConfig(); err != nil { return fmt.Errorf("failed to save config: %w", err) } + + // Restart audio if source changed or USB audio is enabled with active connections + // The subprocess supervisor and relay handle device readiness via retry logic + if activeConnections.Load() > 0 && (audioSourceChanged || (config.UsbDevices != nil && config.UsbDevices.Audio)) { + if err := startAudioSubprocesses(); err != nil { + logger.Warn().Err(err).Msg("Failed to restart audio after USB reconfiguration") + } + } + return nil } func rpcSetUsbDevices(usbDevices usbgadget.Devices) error { + wasAudioEnabled := config.UsbDevices != nil && config.UsbDevices.Audio config.UsbDevices = &usbDevices gadget.SetGadgetDevices(config.UsbDevices) - return updateUsbRelatedConfig() + return updateUsbRelatedConfig(wasAudioEnabled) } func rpcSetUsbDeviceState(device string, enabled bool) error { + wasAudioEnabled := config.UsbDevices != nil && config.UsbDevices.Audio + switch device { case "absoluteMouse": config.UsbDevices.AbsoluteMouse = enabled @@ -937,11 +982,43 @@ func rpcSetUsbDeviceState(device string, enabled bool) error { config.UsbDevices.Keyboard = enabled case "massStorage": config.UsbDevices.MassStorage = enabled + case "audio": + config.UsbDevices.Audio = enabled default: return fmt.Errorf("invalid device: %s", device) } gadget.SetGadgetDevices(config.UsbDevices) - return updateUsbRelatedConfig() + return updateUsbRelatedConfig(wasAudioEnabled) +} + +func rpcGetAudioOutputSource() (string, error) { + ensureConfigLoaded() + return config.AudioOutputSource, nil +} + +func rpcSetAudioOutputSource(source string) error { + if source != "hdmi" && source != "usb" { + return fmt.Errorf("invalid audio output source: %s (must be 'hdmi' or 'usb')", source) + } + + useUSB := source == "usb" + return SetAudioOutputSource(useUSB) +} + +func rpcGetAudioOutputEnabled() (bool, error) { + return audioOutputEnabled.Load(), nil +} + +func rpcSetAudioOutputEnabled(enabled bool) error { + return SetAudioOutputEnabled(enabled) +} + +func rpcGetAudioInputEnabled() (bool, error) { + return audioInputEnabled.Load(), nil +} + +func rpcSetAudioInputEnabled(enabled bool) error { + return SetAudioInputEnabled(enabled) } func rpcSetCloudUrl(apiUrl string, appUrl string) error { @@ -1260,6 +1337,12 @@ var rpcHandlers = map[string]RPCHandler{ "getUsbDevices": {Func: rpcGetUsbDevices}, "setUsbDevices": {Func: rpcSetUsbDevices, Params: []string{"devices"}}, "setUsbDeviceState": {Func: rpcSetUsbDeviceState, Params: []string{"device", "enabled"}}, + "getAudioOutputSource": {Func: rpcGetAudioOutputSource}, + "setAudioOutputSource": {Func: rpcSetAudioOutputSource, Params: []string{"source"}}, + "getAudioOutputEnabled": {Func: rpcGetAudioOutputEnabled}, + "setAudioOutputEnabled": {Func: rpcSetAudioOutputEnabled, Params: []string{"enabled"}}, + "getAudioInputEnabled": {Func: rpcGetAudioInputEnabled}, + "setAudioInputEnabled": {Func: rpcSetAudioInputEnabled, Params: []string{"enabled"}}, "setCloudUrl": {Func: rpcSetCloudUrl, Params: []string{"apiUrl", "appUrl"}}, "getKeyboardLayout": {Func: rpcGetKeyboardLayout}, "setKeyboardLayout": {Func: rpcSetKeyboardLayout, Params: []string{"layout"}}, diff --git a/main.go b/main.go index e9931d46..89cda6f3 100644 --- a/main.go +++ b/main.go @@ -34,6 +34,7 @@ func Main() { go confirmCurrentSystem() initNative(systemVersionLocal, appVersionLocal) + initAudio() http.DefaultClient.Timeout = 1 * time.Minute @@ -123,6 +124,9 @@ func Main() { signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) <-sigs logger.Info().Msg("JetKVM Shutting Down") + + stopAudioSubprocesses() + //if fuseServer != nil { // err := setMassStorageImage(" ") // if err != nil { diff --git a/ui/src/components/ActionBar.tsx b/ui/src/components/ActionBar.tsx index 4f79d7ed..3a76c869 100644 --- a/ui/src/components/ActionBar.tsx +++ b/ui/src/components/ActionBar.tsx @@ -1,5 +1,5 @@ import { MdOutlineContentPasteGo } from "react-icons/md"; -import { LuCable, LuHardDrive, LuMaximize, LuSettings, LuSignal } from "react-icons/lu"; +import { LuCable, LuHardDrive, LuMaximize, LuSettings, LuSignal, LuVolume2 } from "react-icons/lu"; import { FaKeyboard } from "react-icons/fa6"; import { Popover, PopoverButton, PopoverPanel } from "@headlessui/react"; import { Fragment, useCallback, useRef } from "react"; @@ -18,6 +18,7 @@ import PasteModal from "@/components/popovers/PasteModal"; import WakeOnLanModal from "@/components/popovers/WakeOnLan/Index"; import MountPopopover from "@/components/popovers/MountPopover"; import ExtensionPopover from "@/components/popovers/ExtensionPopover"; +import AudioPopover from "@/components/popovers/AudioPopover"; import { useDeviceUiNavigation } from "@/hooks/useAppNavigation"; export default function Actionbar({ @@ -203,6 +204,36 @@ export default function Actionbar({ onClick={() => setVirtualKeyboardEnabled(!isVirtualKeyboardEnabled)} /> + + +