From 79cb2aa5ef34172712a562955243a32d8d4a9f43 Mon Sep 17 00:00:00 2001 From: Aveline <352441+ym@users.noreply.github.com> Date: Fri, 21 Nov 2025 01:44:50 +0100 Subject: [PATCH 1/4] fix(ota): set updating to false when no updates are available (#996) Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- internal/ota/ota.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/internal/ota/ota.go b/internal/ota/ota.go index 52cbb6e2..52b38c7d 100644 --- a/internal/ota/ota.go +++ b/internal/ota/ota.go @@ -228,6 +228,13 @@ func (s *State) doUpdate(ctx context.Context, params UpdateParams) error { s.triggerComponentUpdateState("system", systemUpdate) } + if !appUpdate.pending && !systemUpdate.pending { + scopedLogger.Info().Msg("No updates available") + s.updating = false + s.triggerStateUpdate() + return nil + } + scopedLogger.Trace().Bool("pending", appUpdate.pending).Msg("Checking for app update") if appUpdate.pending { From 316c2e6d37a6aee5f1a1953d52668c48995c34b4 Mon Sep 17 00:00:00 2001 From: Aveline <352441+ym@users.noreply.github.com> Date: Fri, 21 Nov 2025 13:26:29 +0100 Subject: [PATCH 2/4] fix(network): IPv6 addresses sorting was using wrong references (#997) --- pkg/nmlite/interface_state.go | 65 +++++++++++++++++++++++++++++++++-- pkg/nmlite/utils.go | 14 ++++---- 2 files changed, 71 insertions(+), 8 deletions(-) diff --git a/pkg/nmlite/interface_state.go b/pkg/nmlite/interface_state.go index 087cf010..efa5f087 100644 --- a/pkg/nmlite/interface_state.go +++ b/pkg/nmlite/interface_state.go @@ -2,6 +2,7 @@ package nmlite import ( "fmt" + "strings" "time" "github.com/jetkvm/kvm/internal/network/types" @@ -9,6 +10,40 @@ import ( "github.com/vishvananda/netlink" ) +type IfStateChangeReason uint + +const ( + IfStateOperStateChanged IfStateChangeReason = 1 + IfStateOnlineStateChanged IfStateChangeReason = 2 + IfStateMACAddressChanged IfStateChangeReason = 3 + IfStateIPAddressesChanged IfStateChangeReason = 4 +) + +type IfStateChangeReasons []IfStateChangeReason + +func (r IfStateChangeReason) String() string { + switch r { + case IfStateOperStateChanged: + return "oper state changed" + case IfStateOnlineStateChanged: + return "online state changed" + case IfStateMACAddressChanged: + return "MAC address changed" + case IfStateIPAddressesChanged: + return "IP addresses changed" + default: + return fmt.Sprintf("unknown change reason %d", r) + } +} + +func (rs IfStateChangeReasons) String() string { + reasons := []string{} + for _, r := range rs { + reasons = append(reasons, r.String()) + } + return strings.Join(reasons, ", ") +} + // updateInterfaceState updates the current interface state func (im *InterfaceManager) updateInterfaceState() error { nl, err := im.link() @@ -16,7 +51,10 @@ func (im *InterfaceManager) updateInterfaceState() error { return fmt.Errorf("failed to get interface: %w", err) } - var stateChanged bool + var ( + stateChanged bool + changeReasons IfStateChangeReasons + ) attrs := nl.Attrs() @@ -29,6 +67,7 @@ func (im *InterfaceManager) updateInterfaceState() error { if im.state.Up != isUp { im.state.Up = isUp stateChanged = true + changeReasons = append(changeReasons, IfStateOperStateChanged) } // Check if the interface is online @@ -36,12 +75,14 @@ func (im *InterfaceManager) updateInterfaceState() error { if im.state.Online != isOnline { im.state.Online = isOnline stateChanged = true + changeReasons = append(changeReasons, IfStateOnlineStateChanged) } // Check if the MAC address has changed if im.state.MACAddress != attrs.HardwareAddr.String() { im.state.MACAddress = attrs.HardwareAddr.String() stateChanged = true + changeReasons = append(changeReasons, IfStateMACAddressChanged) } // Update IP addresses @@ -49,6 +90,7 @@ func (im *InterfaceManager) updateInterfaceState() error { im.logger.Error().Err(err).Msg("failed to update IP addresses") } else if ipChanged { stateChanged = true + changeReasons = append(changeReasons, IfStateIPAddressesChanged) } im.state.LastUpdated = time.Now() @@ -56,7 +98,10 @@ func (im *InterfaceManager) updateInterfaceState() error { // Notify callback if state changed if stateChanged && im.onStateChange != nil { - im.logger.Debug().Interface("state", im.state).Msg("notifying state change") + im.logger.Debug(). + Stringer("changeReasons", changeReasons). + Interface("state", im.state). + Msg("notifying state change") im.onStateChange(*im.state) } @@ -80,6 +125,7 @@ func (im *InterfaceManager) updateInterfaceStateAddresses(nl *link.Link) (bool, ipv6Gateway string ipv4Ready, ipv6Ready = false, false stateChanged = false + stateChangeReason string ) routes, _ := mgr.ListDefaultRoutes(link.AfInet6) @@ -123,40 +169,55 @@ func (im *InterfaceManager) updateInterfaceStateAddresses(nl *link.Link) (bool, if !sortAndCompareStringSlices(im.state.IPv4Addresses, ipv4Addresses) { im.state.IPv4Addresses = ipv4Addresses stateChanged = true + stateChangeReason = "IPv4 addresses changed" } if !sortAndCompareIPv6AddressSlices(im.state.IPv6Addresses, ipv6Addresses) { im.state.IPv6Addresses = ipv6Addresses stateChanged = true + stateChangeReason = "IPv6 addresses changed" } if im.state.IPv4Address != ipv4Addr { im.state.IPv4Address = ipv4Addr stateChanged = true + stateChangeReason = "IPv4 address changed" } if im.state.IPv6Address != ipv6Addr { im.state.IPv6Address = ipv6Addr stateChanged = true + stateChangeReason = "IPv6 address changed" } if im.state.IPv6LinkLocal != ipv6LinkLocal { im.state.IPv6LinkLocal = ipv6LinkLocal stateChanged = true + stateChangeReason = "IPv6 link local address changed" } if im.state.IPv6Gateway != ipv6Gateway { im.state.IPv6Gateway = ipv6Gateway stateChanged = true + stateChangeReason = "IPv6 gateway changed" } if im.state.IPv4Ready != ipv4Ready { im.state.IPv4Ready = ipv4Ready stateChanged = true + stateChangeReason = "IPv4 ready state changed" } if im.state.IPv6Ready != ipv6Ready { im.state.IPv6Ready = ipv6Ready stateChanged = true + stateChangeReason = "IPv6 ready state changed" + } + + if stateChanged { + im.logger.Trace(). + Str("changeReason", stateChangeReason). + Interface("state", im.state). + Msg("interface state changed") } return stateChanged, nil diff --git a/pkg/nmlite/utils.go b/pkg/nmlite/utils.go index 49ed0078..11952d0b 100644 --- a/pkg/nmlite/utils.go +++ b/pkg/nmlite/utils.go @@ -42,17 +42,19 @@ func sortAndCompareStringSlices(a, b []string) bool { return true } +func sortIPv6AddressSlicesStable(a []types.IPv6Address) { + sort.SliceStable(a, func(i, j int) bool { + return a[i].Address.String() < a[j].Address.String() + }) +} + func sortAndCompareIPv6AddressSlices(a, b []types.IPv6Address) bool { if len(a) != len(b) { return false } - sort.SliceStable(a, func(i, j int) bool { - return a[i].Address.String() < b[j].Address.String() - }) - sort.SliceStable(b, func(i, j int) bool { - return b[i].Address.String() < a[j].Address.String() - }) + sortIPv6AddressSlicesStable(a) + sortIPv6AddressSlicesStable(b) for i := range a { if a[i].Address.String() != b[i].Address.String() { From c472752c56d4953db7f54f2fcc5cccad71a981e6 Mon Sep 17 00:00:00 2001 From: Aveline <352441+ym@users.noreply.github.com> Date: Fri, 21 Nov 2025 13:31:13 +0100 Subject: [PATCH 3/4] fix: hidRPC handshake packet should be only sent once (#969) --- ui/package-lock.json | 13 +++ ui/package.json | 1 + ui/src/hooks/useHidRpc.ts | 194 +++++++++++++++++++++++----------- ui/src/routes/devices.$id.tsx | 4 + ui/vite.config.ts | 2 +- 5 files changed, 151 insertions(+), 63 deletions(-) diff --git a/ui/package-lock.json b/ui/package-lock.json index 437b0c93..bd1d54ef 100644 --- a/ui/package-lock.json +++ b/ui/package-lock.json @@ -37,6 +37,7 @@ "react-xtermjs": "^1.0.10", "recharts": "^3.3.0", "tailwind-merge": "^3.3.1", + "tslog": "^4.10.2", "usehooks-ts": "^3.1.1", "validator": "^13.15.20", "zustand": "^4.5.2" @@ -7330,6 +7331,18 @@ "integrity": "sha512-oJFu94HQb+KVduSUQL7wnpmqnfmLsOA/nAh6b6EH0wCEoK0/mPeXU6c3wKDV83MkOuHPRHtSXKKU99IBazS/2w==", "license": "0BSD" }, + "node_modules/tslog": { + "version": "4.10.2", + "resolved": "https://registry.npmjs.org/tslog/-/tslog-4.10.2.tgz", + "integrity": "sha512-XuELoRpMR+sq8fuWwX7P0bcj+PRNiicOKDEb3fGNURhxWVyykCi9BNq7c4uVz7h7P0sj8qgBsr5SWS6yBClq3g==", + "license": "MIT", + "engines": { + "node": ">=16" + }, + "funding": { + "url": "https://github.com/fullstack-build/tslog?sponsor=1" + } + }, "node_modules/type-check": { "version": "0.4.0", "resolved": "https://registry.npmjs.org/type-check/-/type-check-0.4.0.tgz", diff --git a/ui/package.json b/ui/package.json index 0f4d7f64..70eaa662 100644 --- a/ui/package.json +++ b/ui/package.json @@ -56,6 +56,7 @@ "react-xtermjs": "^1.0.10", "recharts": "^3.3.0", "tailwind-merge": "^3.3.1", + "tslog": "^4.10.2", "usehooks-ts": "^3.1.1", "validator": "^13.15.20", "zustand": "^4.5.2" diff --git a/ui/src/hooks/useHidRpc.ts b/ui/src/hooks/useHidRpc.ts index 2db8279f..38014d20 100644 --- a/ui/src/hooks/useHidRpc.ts +++ b/ui/src/hooks/useHidRpc.ts @@ -1,4 +1,5 @@ import { useCallback, useEffect, useMemo } from "react"; +import { Logger } from "tslog"; import { useRTCStore } from "@hooks/stores"; @@ -25,6 +26,128 @@ interface sendMessageParams { requireOrdered?: boolean; } +const HANDSHAKE_TIMEOUT = 30 * 1000; // 30 seconds +const HANDSHAKE_MAX_ATTEMPTS = 10; +const logger = new Logger({ name: "hidrpc" }); + +export function doRpcHidHandshake(rpcHidChannel: RTCDataChannel, setRpcHidProtocolVersion: (version: number | null) => void) { + let attempts = 0; + let lastConnectedTime: Date | undefined; + let lastSendTime: Date | undefined; + let handshakeCompleted = false; + let handshakeInterval: ReturnType | null = null; + + const shouldGiveUp = () => { + if (attempts > HANDSHAKE_MAX_ATTEMPTS) { + logger.error(`Failed to send handshake message after ${HANDSHAKE_MAX_ATTEMPTS} attempts`); + return true; + } + + const timeSinceConnected = lastConnectedTime ? Date.now() - lastConnectedTime.getTime() : 0; + if (timeSinceConnected > HANDSHAKE_TIMEOUT) { + logger.error(`Handshake timed out after ${timeSinceConnected}ms`); + return true; + } + + return false; + } + + const sendHandshake = (initial: boolean) => { + if (handshakeCompleted) return; + + attempts++; + lastSendTime = new Date(); + + if (!initial && shouldGiveUp()) { + if (handshakeInterval) { + clearInterval(handshakeInterval); + handshakeInterval = null; + } + return; + } + + let data: Uint8Array | undefined; + try { + const message = new HandshakeMessage(HID_RPC_VERSION); + data = message.marshal(); + } catch (e) { + logger.error("Failed to marshal message", e); + return; + } + if (!data) return; + rpcHidChannel.send(data as unknown as ArrayBuffer); + + if (initial) { + handshakeInterval = setInterval(() => { + sendHandshake(false); + }, 1000); + } + }; + + const onMessage = (ev: MessageEvent) => { + const message = unmarshalHidRpcMessage(new Uint8Array(ev.data)); + if (!message || !(message instanceof HandshakeMessage)) return; + + if (!message.version) { + logger.error("Received handshake message without version", message); + return; + } + + if (message.version > HID_RPC_VERSION) { + // we assume that the UI is always using the latest version of the HID RPC protocol + // so we can't support this + // TODO: use capabilities to determine rather than version number + logger.error("Server is using a newer version than the client", message); + return; + } + + setRpcHidProtocolVersion(message.version); + + const timeUsed = lastSendTime ? Date.now() - lastSendTime.getTime() : 0; + logger.info(`Handshake completed in ${timeUsed}ms after ${attempts} attempts (Version: ${message.version} / ${HID_RPC_VERSION})`); + + // clean up + rpcHidChannel.removeEventListener("message", onMessage); + resetHandshake({ completed: true }); + }; + + const resetHandshake = ({ lastConnectedTime: newLastConnectedTime, completed }: { lastConnectedTime?: Date | undefined, completed?: boolean }) => { + if (newLastConnectedTime) lastConnectedTime = newLastConnectedTime; + lastSendTime = undefined; + attempts = 0; + if (completed !== undefined) handshakeCompleted = completed; + if (handshakeInterval) { + clearInterval(handshakeInterval); + handshakeInterval = null; + } + }; + + const onConnected = () => { + resetHandshake({ lastConnectedTime: new Date() }); + logger.info("Channel connected"); + + sendHandshake(true); + rpcHidChannel.addEventListener("message", onMessage); + }; + + const onClose = () => { + resetHandshake({ lastConnectedTime: undefined, completed: false }); + + logger.info("Channel closed"); + setRpcHidProtocolVersion(null); + + rpcHidChannel.removeEventListener("message", onMessage); + }; + + rpcHidChannel.addEventListener("open", onConnected); + rpcHidChannel.addEventListener("close", onClose); + + // handle case where channel is already open when the hook is mounted + if (rpcHidChannel.readyState === "open") { + onConnected(); + } +} + export function useHidRpc(onHidRpcMessage?: (payload: RpcMessage) => void) { const { rpcHidChannel, @@ -78,7 +201,7 @@ export function useHidRpc(onHidRpcMessage?: (payload: RpcMessage) => void) { try { data = message.marshal(); } catch (e) { - console.error("Failed to marshal HID RPC message", e); + logger.error("Failed to marshal message", e); } if (!data) return; @@ -151,99 +274,46 @@ export function useHidRpc(onHidRpcMessage?: (payload: RpcMessage) => void) { sendMessage(KEEPALIVE_MESSAGE); }, [sendMessage]); - const sendHandshake = useCallback(() => { - if (hidRpcDisabled) return; - if (rpcHidProtocolVersion) return; - if (!rpcHidChannel) return; - - sendMessage(new HandshakeMessage(HID_RPC_VERSION), { ignoreHandshakeState: true }); - }, [rpcHidChannel, rpcHidProtocolVersion, sendMessage, hidRpcDisabled]); - - const handleHandshake = useCallback( - (message: HandshakeMessage) => { - if (hidRpcDisabled) return; - - if (!message.version) { - console.error("Received handshake message without version", message); - return; - } - - if (message.version > HID_RPC_VERSION) { - // we assume that the UI is always using the latest version of the HID RPC protocol - // so we can't support this - // TODO: use capabilities to determine rather than version number - console.error("Server is using a newer HID RPC version than the client", message); - return; - } - - setRpcHidProtocolVersion(message.version); - }, - [setRpcHidProtocolVersion, hidRpcDisabled], - ); - useEffect(() => { if (!rpcHidChannel) return; if (hidRpcDisabled) return; - // send handshake message - sendHandshake(); - const messageHandler = (e: MessageEvent) => { if (typeof e.data === "string") { - console.warn("Received string data in HID RPC message handler", e.data); + logger.warn("Received string data in message handler", e.data); return; } const message = unmarshalHidRpcMessage(new Uint8Array(e.data)); if (!message) { - console.warn("Received invalid HID RPC message", e.data); + logger.warn("Received invalid message", e.data); return; } - console.debug("Received HID RPC message", message); - switch (message.constructor) { - case HandshakeMessage: - handleHandshake(message as HandshakeMessage); - break; - default: - // not all events are handled here, the rest are handled by the onHidRpcMessage callback - break; - } + if (message instanceof HandshakeMessage) return; // handshake message is handled by the doRpcHidHandshake function + + // to remove it from the production build, we need to use the /* @__PURE__ */ comment here + // setting `esbuild.pure` doesn't work + /* @__PURE__ */ logger.debug("Received message", message); onHidRpcMessage?.(message); }; - const openHandler = () => { - console.info("HID RPC channel opened"); - sendHandshake(); - }; - - const closeHandler = () => { - console.info("HID RPC channel closed"); - setRpcHidProtocolVersion(null); - }; - const errorHandler = (e: Event) => { - console.error(`Error on rpcHidChannel '${rpcHidChannel.label}': ${e}`) + logger.error(`Error on channel '${rpcHidChannel.label}'`, e); }; rpcHidChannel.addEventListener("message", messageHandler); - rpcHidChannel.addEventListener("close", closeHandler); rpcHidChannel.addEventListener("error", errorHandler); - rpcHidChannel.addEventListener("open", openHandler); return () => { rpcHidChannel.removeEventListener("message", messageHandler); - rpcHidChannel.removeEventListener("close", closeHandler); rpcHidChannel.removeEventListener("error", errorHandler); - rpcHidChannel.removeEventListener("open", openHandler); }; }, [ rpcHidChannel, onHidRpcMessage, setRpcHidProtocolVersion, - sendHandshake, - handleHandshake, hidRpcDisabled, ]); diff --git a/ui/src/routes/devices.$id.tsx b/ui/src/routes/devices.$id.tsx index dbf41395..043125c0 100644 --- a/ui/src/routes/devices.$id.tsx +++ b/ui/src/routes/devices.$id.tsx @@ -53,6 +53,7 @@ import { } from "@components/VideoOverlay"; import { FeatureFlagProvider } from "@providers/FeatureFlagProvider"; import { m } from "@localizations/messages.js"; +import { doRpcHidHandshake } from "@hooks/useHidRpc"; export type AuthMode = "password" | "noPassword" | null; @@ -127,6 +128,7 @@ export default function KvmIdRoute() { setRpcHidChannel, setRpcHidUnreliableNonOrderedChannel, setRpcHidUnreliableChannel, + setRpcHidProtocolVersion, } = useRTCStore(); const location = useLocation(); @@ -498,6 +500,7 @@ export default function KvmIdRoute() { rpcHidChannel.onopen = () => { setRpcHidChannel(rpcHidChannel); }; + doRpcHidHandshake(rpcHidChannel, setRpcHidProtocolVersion); const rpcHidUnreliableChannel = pc.createDataChannel("hidrpc-unreliable-ordered", { ordered: true, @@ -534,6 +537,7 @@ export default function KvmIdRoute() { setRpcHidChannel, setRpcHidUnreliableNonOrderedChannel, setRpcHidUnreliableChannel, + setRpcHidProtocolVersion, setTransceiver, ]); diff --git a/ui/vite.config.ts b/ui/vite.config.ts index 28324b55..b5260507 100644 --- a/ui/vite.config.ts +++ b/ui/vite.config.ts @@ -39,7 +39,7 @@ export default defineConfig(({ mode, command }) => { return { plugins, esbuild: { - pure: ["console.debug"], + pure: command === "build" ? ["console.debug"]: [], }, assetsInclude: ["**/*.woff2"], build: { From d24ce1c76f429dcfab53eac4c9ea439a88a018cb Mon Sep 17 00:00:00 2001 From: Aveline <352441+ym@users.noreply.github.com> Date: Fri, 21 Nov 2025 14:37:51 +0100 Subject: [PATCH 4/4] fix: stop video stream before enabling sleep mode (#999) --- internal/native/cgo/ctrl.c | 12 ++++ internal/native/cgo/ctrl.h | 3 + internal/native/cgo/video.c | 82 +++++++++++++++++++++------ internal/native/cgo/video.h | 7 +++ internal/native/cgo_linux.go | 10 ++++ internal/native/cgo_notlinux.go | 5 ++ internal/native/chan.go | 1 + internal/native/grpc_servermethods.go | 6 -- internal/native/native.go | 20 +++++++ internal/native/server.go | 22 +++++++ internal/native/video.go | 57 +++++++++++++++++-- webrtc.go | 1 + 12 files changed, 199 insertions(+), 27 deletions(-) diff --git a/internal/native/cgo/ctrl.c b/internal/native/cgo/ctrl.c index 547d5694..62d2c8b9 100644 --- a/internal/native/cgo/ctrl.c +++ b/internal/native/cgo/ctrl.c @@ -59,6 +59,7 @@ const char *jetkvm_ui_event_code_to_name(int code) { void video_report_format(bool ready, const char *error, u_int16_t width, u_int16_t height, double frame_per_second) { + state.streaming = video_get_streaming_status(); state.ready = ready; state.error = error; state.width = width; @@ -69,6 +70,13 @@ void video_report_format(bool ready, const char *error, u_int16_t width, u_int16 } } +void video_send_format_report() { + state.streaming = video_get_streaming_status(); + if (video_state_handler != NULL) { + (*video_state_handler)(&state); + } +} + int video_send_frame(const uint8_t *frame, ssize_t len) { if (video_handler != NULL) { @@ -367,6 +375,10 @@ void jetkvm_video_stop() { video_stop_streaming(); } +uint8_t jetkvm_video_get_streaming_status() { + return video_get_streaming_status(); +} + int jetkvm_video_set_quality_factor(float quality_factor) { if (quality_factor <= 0 || quality_factor > 1) { return -1; diff --git a/internal/native/cgo/ctrl.h b/internal/native/cgo/ctrl.h index 774ee147..59f9e4cd 100644 --- a/internal/native/cgo/ctrl.h +++ b/internal/native/cgo/ctrl.h @@ -8,6 +8,7 @@ typedef struct { bool ready; + uint8_t streaming; const char *error; u_int16_t width; u_int16_t height; @@ -56,6 +57,7 @@ int jetkvm_video_init(float quality_factor); void jetkvm_video_shutdown(); void jetkvm_video_start(); void jetkvm_video_stop(); +uint8_t jetkvm_video_get_streaming_status(); int jetkvm_video_set_quality_factor(float quality_factor); float jetkvm_video_get_quality_factor(); int jetkvm_video_set_edid(const char *edid_hex); @@ -64,6 +66,7 @@ char *jetkvm_video_log_status(); jetkvm_video_state_t *jetkvm_video_get_status(); void video_report_format(bool ready, const char *error, u_int16_t width, u_int16_t height, double frame_per_second); +void video_send_format_report(); int video_send_frame(const uint8_t *frame, ssize_t len); diff --git a/internal/native/cgo/video.c b/internal/native/cgo/video.c index 857acbbb..9107c70e 100644 --- a/internal/native/cgo/video.c +++ b/internal/native/cgo/video.c @@ -349,7 +349,10 @@ static void *venc_read_stream(void *arg) } uint32_t detected_width, detected_height; -bool detected_signal = false, streaming_flag = false, streaming_stopped = true; +bool detected_signal = false, streaming_flag = false; + +bool streaming_stopped = true; +pthread_mutex_t streaming_stopped_mutex = PTHREAD_MUTEX_INITIALIZER; pthread_t *streaming_thread = NULL; pthread_mutex_t streaming_mutex = PTHREAD_MUTEX_INITIALIZER; @@ -370,8 +373,27 @@ void set_streaming_flag(bool flag) pthread_mutex_lock(&streaming_mutex); streaming_flag = flag; pthread_mutex_unlock(&streaming_mutex); + + video_send_format_report(); } +void set_streaming_stopped(bool stopped) +{ + pthread_mutex_lock(&streaming_stopped_mutex); + streaming_stopped = stopped; + pthread_mutex_unlock(&streaming_stopped_mutex); + + video_send_format_report(); +} + +bool get_streaming_stopped() +{ + pthread_mutex_lock(&streaming_stopped_mutex); + bool stopped = streaming_stopped; + pthread_mutex_unlock(&streaming_stopped_mutex); + return stopped; +} + void write_buffer_to_file(const uint8_t *buffer, size_t length, const char *filename) { FILE *file = fopen(filename, "wb"); @@ -385,8 +407,7 @@ void *run_video_stream(void *arg) log_info("running video stream"); - streaming_stopped = false; - + set_streaming_stopped(false); while (streaming_flag) { if (detected_signal == false) @@ -528,6 +549,8 @@ void *run_video_stream(void *arg) uint32_t num = 0; VIDEO_FRAME_INFO_S stFrame; + + while (streaming_flag) { FD_ZERO(&fds); @@ -539,6 +562,7 @@ void *run_video_stream(void *arg) if (r == 0) { log_info("select timeout"); + ensure_sleep_mode_disabled(); break; } if (r == -1) @@ -634,7 +658,7 @@ void *run_video_stream(void *arg) log_info("video stream thread exiting"); - streaming_stopped = true; + set_streaming_stopped(true); return NULL; } @@ -670,9 +694,10 @@ void video_start_streaming() log_info("starting video streaming"); if (streaming_thread != NULL) { - if (streaming_stopped == true) { + bool stopped = get_streaming_stopped(); + if (stopped == true) { log_error("video streaming already stopped but streaming_thread is not NULL"); - assert(streaming_stopped == true); + assert(stopped == true); } log_warn("video streaming already started"); return; @@ -699,6 +724,21 @@ void video_start_streaming() streaming_thread = new_thread; } +bool wait_for_streaming_stopped() +{ + int attempts = 0; + while (attempts < 30) { + if (get_streaming_stopped() == true) { + log_info("video streaming stopped after %d attempts", attempts); + return true; + } + usleep(100000); // 100ms + attempts++; + } + log_error("video streaming did not stop after 3s"); + return false; +} + void video_stop_streaming() { if (streaming_thread == NULL) { @@ -710,14 +750,7 @@ void video_stop_streaming() set_streaming_flag(false); log_info("waiting for video streaming thread to exit"); - int attempts = 0; - while (!streaming_stopped && attempts < 30) { - usleep(100000); // 100ms - attempts++; - } - if (!streaming_stopped) { - log_error("video streaming thread did not exit after 30s"); - } + wait_for_streaming_stopped(); pthread_join(*streaming_thread, NULL); free(streaming_thread); @@ -726,13 +759,30 @@ void video_stop_streaming() log_info("video streaming stopped"); } +uint8_t video_get_streaming_status() { + // streaming flag can be false when stopping streaming + if (get_streaming_flag() == true) return 1; + if (get_streaming_stopped() == false) return 2; + return 0; +} + void video_restart_streaming() { - if (get_streaming_flag() == true) + uint8_t streaming_status = video_get_streaming_status(); + if (streaming_status == 0) { - log_info("restarting video streaming"); + log_info("will not restart video streaming because it's stopped"); + return; + } + + if (streaming_status == 2) { video_stop_streaming(); } + + if (!wait_for_streaming_stopped()) { + return; + } + video_start_streaming(); } diff --git a/internal/native/cgo/video.h b/internal/native/cgo/video.h index 6fa00ca4..391f7ddd 100644 --- a/internal/native/cgo/video.h +++ b/internal/native/cgo/video.h @@ -31,6 +31,13 @@ void video_start_streaming(); */ void video_stop_streaming(); +/** + * @brief Get the streaming status of the video + * + * @return uint8_t 1 if the video streaming is active, 2 if the video streaming is stopping, 0 otherwise + */ +uint8_t video_get_streaming_status(); + /** * @brief Set the quality factor of the video * diff --git a/internal/native/cgo_linux.go b/internal/native/cgo_linux.go index b33eb534..dcd25e42 100644 --- a/internal/native/cgo_linux.go +++ b/internal/native/cgo_linux.go @@ -57,6 +57,7 @@ var ( func jetkvm_go_video_state_handler(state *C.jetkvm_video_state_t) { videoState := VideoState{ Ready: bool(state.ready), + Streaming: VideoStreamingStatus(state.streaming), Error: C.GoString(state.error), Width: int(state.width), Height: int(state.height), @@ -168,6 +169,15 @@ func videoStop() { C.jetkvm_video_stop() } +func videoGetStreamingStatus() VideoStreamingStatus { + cgoLock.Lock() + defer cgoLock.Unlock() + + isStreaming := C.jetkvm_video_get_streaming_status() + + return VideoStreamingStatus(isStreaming) +} + func videoLogStatus() string { cgoLock.Lock() defer cgoLock.Unlock() diff --git a/internal/native/cgo_notlinux.go b/internal/native/cgo_notlinux.go index 4602f713..9bc77806 100644 --- a/internal/native/cgo_notlinux.go +++ b/internal/native/cgo_notlinux.go @@ -123,6 +123,11 @@ func videoSetEDID(edid string) error { return nil } +func videoGetStreamingStatus() VideoStreamingStatus { + panicPlatformNotSupported() + return VideoStreamingStatusInactive +} + func crash() { panicPlatformNotSupported() } diff --git a/internal/native/chan.go b/internal/native/chan.go index 4162f260..cd6d07af 100644 --- a/internal/native/chan.go +++ b/internal/native/chan.go @@ -28,6 +28,7 @@ func (n *Native) handleVideoFrameChan() { func (n *Native) handleVideoStateChan() { for { state := <-videoStateChan + n.onVideoStateChange(state) } } diff --git a/internal/native/grpc_servermethods.go b/internal/native/grpc_servermethods.go index cc16dfd1..c1dea54f 100644 --- a/internal/native/grpc_servermethods.go +++ b/internal/native/grpc_servermethods.go @@ -70,9 +70,6 @@ func (s *grpcServer) VideoLogStatus(ctx context.Context, req *pb.Empty) (*pb.Vid } func (s *grpcServer) VideoStop(ctx context.Context, req *pb.Empty) (*pb.Empty, error) { - procPrefix = "jetkvm: [native]" - setProcTitle(lastProcTitle) - if err := s.native.VideoStop(); err != nil { return nil, status.Error(codes.Internal, err.Error()) } @@ -80,9 +77,6 @@ func (s *grpcServer) VideoStop(ctx context.Context, req *pb.Empty) (*pb.Empty, e } func (s *grpcServer) VideoStart(ctx context.Context, req *pb.Empty) (*pb.Empty, error) { - procPrefix = "jetkvm: [native+video]" - setProcTitle(lastProcTitle) - if err := s.native.VideoStart(); err != nil { return nil, status.Error(codes.Internal, err.Error()) } diff --git a/internal/native/native.go b/internal/native/native.go index 61c4b0ac..87eebf18 100644 --- a/internal/native/native.go +++ b/internal/native/native.go @@ -40,6 +40,26 @@ type NativeOptions struct { OnNativeRestart func() } +type VideoStreamingStatus uint8 + +const ( + VideoStreamingStatusActive VideoStreamingStatus = 1 + VideoStreamingStatusStopping VideoStreamingStatus = 2 // video is stopping, but not yet stopped + VideoStreamingStatusInactive VideoStreamingStatus = 0 +) + +func (s VideoStreamingStatus) String() string { + switch s { + case VideoStreamingStatusActive: + return "active" + case VideoStreamingStatusStopping: + return "stopping" + case VideoStreamingStatusInactive: + return "inactive" + } + return "unknown" +} + func NewNative(opts NativeOptions) *Native { pid := os.Getpid() nativeSubLogger := nativeLogger.With().Int("pid", pid).Str("scope", "native").Logger() diff --git a/internal/native/server.go b/internal/native/server.go index ae983159..f52289e8 100644 --- a/internal/native/server.go +++ b/internal/native/server.go @@ -54,6 +54,25 @@ func monitorCrashSignal(ctx context.Context, logger *zerolog.Logger, nativeInsta } } +func updateProcessTitle(state *VideoState) { + if state == nil { + procPrefix = "jetkvm: [native]" + } else { + var status string + if state.Streaming == VideoStreamingStatusInactive { + status = "inactive" + } else if !state.Ready { + status = "not ready" + } else if state.Error != "" { + status = state.Error + } else { + status = fmt.Sprintf("%s,%dx%d,%.1ffps", state.Streaming.String(), state.Width, state.Height, state.FramePerSecond) + } + procPrefix = fmt.Sprintf("jetkvm: [native+video{%s}]", status) + } + setProcTitle(lastProcTitle) +} + // RunNativeProcess runs the native process mode func RunNativeProcess(binaryName string) { appCtx, appCtxCancel := context.WithCancel(context.Background()) @@ -82,6 +101,9 @@ func RunNativeProcess(binaryName string) { logger.Fatal().Err(err).Msg("failed to write frame to video stream socket") } } + nativeOptions.OnVideoStateChange = func(state VideoState) { + updateProcessTitle(&state) + } // Create native instance nativeInstance := NewNative(*nativeOptions) diff --git a/internal/native/video.go b/internal/native/video.go index c556a938..176511c6 100644 --- a/internal/native/video.go +++ b/internal/native/video.go @@ -15,11 +15,12 @@ var extraLockTimeout = 5 * time.Second // VideoState is the state of the video stream. type VideoState struct { - Ready bool `json:"ready"` - Error string `json:"error,omitempty"` //no_signal, no_lock, out_of_range - Width int `json:"width"` - Height int `json:"height"` - FramePerSecond float64 `json:"fps"` + Ready bool `json:"ready"` + Streaming VideoStreamingStatus `json:"streaming"` + Error string `json:"error,omitempty"` //no_signal, no_lock, out_of_range + Width int `json:"width"` + Height int `json:"height"` + FramePerSecond float64 `json:"fps"` } func isSleepModeSupported() bool { @@ -27,15 +28,53 @@ func isSleepModeSupported() bool { return err == nil } +const sleepModeWaitTimeout = 3 * time.Second + +func (n *Native) waitForVideoStreamingStatus(status VideoStreamingStatus) error { + timeout := time.After(sleepModeWaitTimeout) + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + for { + if videoGetStreamingStatus() == status { + return nil + } + select { + case <-timeout: + return fmt.Errorf("timed out waiting for video streaming status to be %s", status.String()) + case <-ticker.C: + } + } +} + +// before calling this function, make sure to lock n.videoLock func (n *Native) setSleepMode(enabled bool) error { if !n.sleepModeSupported { return nil } bEnabled := "0" + shouldWait := false if enabled { bEnabled = "1" + + switch videoGetStreamingStatus() { + case VideoStreamingStatusActive: + n.l.Info().Msg("stopping video stream to enable sleep mode") + videoStop() + shouldWait = true + case VideoStreamingStatusStopping: + n.l.Info().Msg("video stream is stopping, will enable sleep mode in a few seconds") + shouldWait = true + } } + + if shouldWait { + if err := n.waitForVideoStreamingStatus(VideoStreamingStatusInactive); err != nil { + return err + } + } + return os.WriteFile(sleepModeFile, []byte(bEnabled), 0644) } @@ -159,3 +198,11 @@ func (n *Native) VideoStart() error { videoStart() return nil } + +// VideoGetStreamingStatus gets the streaming status of the video. +func (n *Native) VideoGetStreamingStatus() VideoStreamingStatus { + n.videoLock.Lock() + defer n.videoLock.Unlock() + + return videoGetStreamingStatus() +} diff --git a/webrtc.go b/webrtc.go index 76de2914..10c43ddf 100644 --- a/webrtc.go +++ b/webrtc.go @@ -386,6 +386,7 @@ func newSession(config SessionConfig) (*Session, error) { isConnected = false onActiveSessionsChanged() if decrActiveSessions() == 0 { + scopedLogger.Info().Msg("last session disconnected, stopping video stream") onLastSessionDisconnected() } }