Merge branch 'dev' into feat/audio-support

Integrate latest dev branch changes including:
- HID RPC handshake improvements
- Video sleep mode functionality
- IPv6 address sorting fixes
- OTA update flow improvements
- Video stream lifecycle management

All audio functionality preserved and tested.
This commit is contained in:
Alex P 2025-11-24 23:30:49 +02:00
commit aa0fe18b4a
20 changed files with 428 additions and 98 deletions

View File

@ -59,6 +59,7 @@ const char *jetkvm_ui_event_code_to_name(int code) {
void video_report_format(bool ready, const char *error, u_int16_t width, u_int16_t height, double frame_per_second) void video_report_format(bool ready, const char *error, u_int16_t width, u_int16_t height, double frame_per_second)
{ {
state.streaming = video_get_streaming_status();
state.ready = ready; state.ready = ready;
state.error = error; state.error = error;
state.width = width; state.width = width;
@ -69,6 +70,13 @@ void video_report_format(bool ready, const char *error, u_int16_t width, u_int16
} }
} }
void video_send_format_report() {
state.streaming = video_get_streaming_status();
if (video_state_handler != NULL) {
(*video_state_handler)(&state);
}
}
int video_send_frame(const uint8_t *frame, ssize_t len) int video_send_frame(const uint8_t *frame, ssize_t len)
{ {
if (video_handler != NULL) { if (video_handler != NULL) {
@ -367,6 +375,10 @@ void jetkvm_video_stop() {
video_stop_streaming(); video_stop_streaming();
} }
uint8_t jetkvm_video_get_streaming_status() {
return video_get_streaming_status();
}
int jetkvm_video_set_quality_factor(float quality_factor) { int jetkvm_video_set_quality_factor(float quality_factor) {
if (quality_factor <= 0 || quality_factor > 1) { if (quality_factor <= 0 || quality_factor > 1) {
return -1; return -1;

View File

@ -8,6 +8,7 @@
typedef struct typedef struct
{ {
bool ready; bool ready;
uint8_t streaming;
const char *error; const char *error;
u_int16_t width; u_int16_t width;
u_int16_t height; u_int16_t height;
@ -56,6 +57,7 @@ int jetkvm_video_init(float quality_factor);
void jetkvm_video_shutdown(); void jetkvm_video_shutdown();
void jetkvm_video_start(); void jetkvm_video_start();
void jetkvm_video_stop(); void jetkvm_video_stop();
uint8_t jetkvm_video_get_streaming_status();
int jetkvm_video_set_quality_factor(float quality_factor); int jetkvm_video_set_quality_factor(float quality_factor);
float jetkvm_video_get_quality_factor(); float jetkvm_video_get_quality_factor();
int jetkvm_video_set_edid(const char *edid_hex); int jetkvm_video_set_edid(const char *edid_hex);
@ -64,6 +66,7 @@ char *jetkvm_video_log_status();
jetkvm_video_state_t *jetkvm_video_get_status(); jetkvm_video_state_t *jetkvm_video_get_status();
void video_report_format(bool ready, const char *error, u_int16_t width, u_int16_t height, double frame_per_second); void video_report_format(bool ready, const char *error, u_int16_t width, u_int16_t height, double frame_per_second);
void video_send_format_report();
int video_send_frame(const uint8_t *frame, ssize_t len); int video_send_frame(const uint8_t *frame, ssize_t len);

View File

@ -349,7 +349,10 @@ static void *venc_read_stream(void *arg)
} }
uint32_t detected_width, detected_height; uint32_t detected_width, detected_height;
bool detected_signal = false, streaming_flag = false, streaming_stopped = true; bool detected_signal = false, streaming_flag = false;
bool streaming_stopped = true;
pthread_mutex_t streaming_stopped_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_t *streaming_thread = NULL; pthread_t *streaming_thread = NULL;
pthread_mutex_t streaming_mutex = PTHREAD_MUTEX_INITIALIZER; pthread_mutex_t streaming_mutex = PTHREAD_MUTEX_INITIALIZER;
@ -370,6 +373,25 @@ void set_streaming_flag(bool flag)
pthread_mutex_lock(&streaming_mutex); pthread_mutex_lock(&streaming_mutex);
streaming_flag = flag; streaming_flag = flag;
pthread_mutex_unlock(&streaming_mutex); pthread_mutex_unlock(&streaming_mutex);
video_send_format_report();
}
void set_streaming_stopped(bool stopped)
{
pthread_mutex_lock(&streaming_stopped_mutex);
streaming_stopped = stopped;
pthread_mutex_unlock(&streaming_stopped_mutex);
video_send_format_report();
}
bool get_streaming_stopped()
{
pthread_mutex_lock(&streaming_stopped_mutex);
bool stopped = streaming_stopped;
pthread_mutex_unlock(&streaming_stopped_mutex);
return stopped;
} }
void write_buffer_to_file(const uint8_t *buffer, size_t length, const char *filename) void write_buffer_to_file(const uint8_t *buffer, size_t length, const char *filename)
@ -385,8 +407,7 @@ void *run_video_stream(void *arg)
log_info("running video stream"); log_info("running video stream");
streaming_stopped = false; set_streaming_stopped(false);
while (streaming_flag) while (streaming_flag)
{ {
if (detected_signal == false) if (detected_signal == false)
@ -528,6 +549,8 @@ void *run_video_stream(void *arg)
uint32_t num = 0; uint32_t num = 0;
VIDEO_FRAME_INFO_S stFrame; VIDEO_FRAME_INFO_S stFrame;
while (streaming_flag) while (streaming_flag)
{ {
FD_ZERO(&fds); FD_ZERO(&fds);
@ -539,6 +562,7 @@ void *run_video_stream(void *arg)
if (r == 0) if (r == 0)
{ {
log_info("select timeout"); log_info("select timeout");
ensure_sleep_mode_disabled();
break; break;
} }
if (r == -1) if (r == -1)
@ -634,7 +658,7 @@ void *run_video_stream(void *arg)
log_info("video stream thread exiting"); log_info("video stream thread exiting");
streaming_stopped = true; set_streaming_stopped(true);
return NULL; return NULL;
} }
@ -670,9 +694,10 @@ void video_start_streaming()
log_info("starting video streaming"); log_info("starting video streaming");
if (streaming_thread != NULL) if (streaming_thread != NULL)
{ {
if (streaming_stopped == true) { bool stopped = get_streaming_stopped();
if (stopped == true) {
log_error("video streaming already stopped but streaming_thread is not NULL"); log_error("video streaming already stopped but streaming_thread is not NULL");
assert(streaming_stopped == true); assert(stopped == true);
} }
log_warn("video streaming already started"); log_warn("video streaming already started");
return; return;
@ -699,6 +724,21 @@ void video_start_streaming()
streaming_thread = new_thread; streaming_thread = new_thread;
} }
bool wait_for_streaming_stopped()
{
int attempts = 0;
while (attempts < 30) {
if (get_streaming_stopped() == true) {
log_info("video streaming stopped after %d attempts", attempts);
return true;
}
usleep(100000); // 100ms
attempts++;
}
log_error("video streaming did not stop after 3s");
return false;
}
void video_stop_streaming() void video_stop_streaming()
{ {
if (streaming_thread == NULL) { if (streaming_thread == NULL) {
@ -710,14 +750,7 @@ void video_stop_streaming()
set_streaming_flag(false); set_streaming_flag(false);
log_info("waiting for video streaming thread to exit"); log_info("waiting for video streaming thread to exit");
int attempts = 0; wait_for_streaming_stopped();
while (!streaming_stopped && attempts < 30) {
usleep(100000); // 100ms
attempts++;
}
if (!streaming_stopped) {
log_error("video streaming thread did not exit after 30s");
}
pthread_join(*streaming_thread, NULL); pthread_join(*streaming_thread, NULL);
free(streaming_thread); free(streaming_thread);
@ -726,13 +759,30 @@ void video_stop_streaming()
log_info("video streaming stopped"); log_info("video streaming stopped");
} }
uint8_t video_get_streaming_status() {
// streaming flag can be false when stopping streaming
if (get_streaming_flag() == true) return 1;
if (get_streaming_stopped() == false) return 2;
return 0;
}
void video_restart_streaming() void video_restart_streaming()
{ {
if (get_streaming_flag() == true) uint8_t streaming_status = video_get_streaming_status();
if (streaming_status == 0)
{ {
log_info("restarting video streaming"); log_info("will not restart video streaming because it's stopped");
return;
}
if (streaming_status == 2) {
video_stop_streaming(); video_stop_streaming();
} }
if (!wait_for_streaming_stopped()) {
return;
}
video_start_streaming(); video_start_streaming();
} }

View File

@ -31,6 +31,13 @@ void video_start_streaming();
*/ */
void video_stop_streaming(); void video_stop_streaming();
/**
* @brief Get the streaming status of the video
*
* @return uint8_t 1 if the video streaming is active, 2 if the video streaming is stopping, 0 otherwise
*/
uint8_t video_get_streaming_status();
/** /**
* @brief Set the quality factor of the video * @brief Set the quality factor of the video
* *

View File

@ -57,6 +57,7 @@ var (
func jetkvm_go_video_state_handler(state *C.jetkvm_video_state_t) { func jetkvm_go_video_state_handler(state *C.jetkvm_video_state_t) {
videoState := VideoState{ videoState := VideoState{
Ready: bool(state.ready), Ready: bool(state.ready),
Streaming: VideoStreamingStatus(state.streaming),
Error: C.GoString(state.error), Error: C.GoString(state.error),
Width: int(state.width), Width: int(state.width),
Height: int(state.height), Height: int(state.height),
@ -168,6 +169,15 @@ func videoStop() {
C.jetkvm_video_stop() C.jetkvm_video_stop()
} }
func videoGetStreamingStatus() VideoStreamingStatus {
cgoLock.Lock()
defer cgoLock.Unlock()
isStreaming := C.jetkvm_video_get_streaming_status()
return VideoStreamingStatus(isStreaming)
}
func videoLogStatus() string { func videoLogStatus() string {
cgoLock.Lock() cgoLock.Lock()
defer cgoLock.Unlock() defer cgoLock.Unlock()

View File

@ -123,6 +123,11 @@ func videoSetEDID(edid string) error {
return nil return nil
} }
func videoGetStreamingStatus() VideoStreamingStatus {
panicPlatformNotSupported()
return VideoStreamingStatusInactive
}
func crash() { func crash() {
panicPlatformNotSupported() panicPlatformNotSupported()
} }

View File

@ -28,6 +28,7 @@ func (n *Native) handleVideoFrameChan() {
func (n *Native) handleVideoStateChan() { func (n *Native) handleVideoStateChan() {
for { for {
state := <-videoStateChan state := <-videoStateChan
n.onVideoStateChange(state) n.onVideoStateChange(state)
} }
} }

View File

@ -70,9 +70,6 @@ func (s *grpcServer) VideoLogStatus(ctx context.Context, req *pb.Empty) (*pb.Vid
} }
func (s *grpcServer) VideoStop(ctx context.Context, req *pb.Empty) (*pb.Empty, error) { func (s *grpcServer) VideoStop(ctx context.Context, req *pb.Empty) (*pb.Empty, error) {
procPrefix = "jetkvm: [native]"
setProcTitle(lastProcTitle)
if err := s.native.VideoStop(); err != nil { if err := s.native.VideoStop(); err != nil {
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
@ -80,9 +77,6 @@ func (s *grpcServer) VideoStop(ctx context.Context, req *pb.Empty) (*pb.Empty, e
} }
func (s *grpcServer) VideoStart(ctx context.Context, req *pb.Empty) (*pb.Empty, error) { func (s *grpcServer) VideoStart(ctx context.Context, req *pb.Empty) (*pb.Empty, error) {
procPrefix = "jetkvm: [native+video]"
setProcTitle(lastProcTitle)
if err := s.native.VideoStart(); err != nil { if err := s.native.VideoStart(); err != nil {
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }

View File

@ -40,6 +40,26 @@ type NativeOptions struct {
OnNativeRestart func() OnNativeRestart func()
} }
type VideoStreamingStatus uint8
const (
VideoStreamingStatusActive VideoStreamingStatus = 1
VideoStreamingStatusStopping VideoStreamingStatus = 2 // video is stopping, but not yet stopped
VideoStreamingStatusInactive VideoStreamingStatus = 0
)
func (s VideoStreamingStatus) String() string {
switch s {
case VideoStreamingStatusActive:
return "active"
case VideoStreamingStatusStopping:
return "stopping"
case VideoStreamingStatusInactive:
return "inactive"
}
return "unknown"
}
func NewNative(opts NativeOptions) *Native { func NewNative(opts NativeOptions) *Native {
pid := os.Getpid() pid := os.Getpid()
nativeSubLogger := nativeLogger.With().Int("pid", pid).Str("scope", "native").Logger() nativeSubLogger := nativeLogger.With().Int("pid", pid).Str("scope", "native").Logger()

View File

@ -54,6 +54,25 @@ func monitorCrashSignal(ctx context.Context, logger *zerolog.Logger, nativeInsta
} }
} }
func updateProcessTitle(state *VideoState) {
if state == nil {
procPrefix = "jetkvm: [native]"
} else {
var status string
if state.Streaming == VideoStreamingStatusInactive {
status = "inactive"
} else if !state.Ready {
status = "not ready"
} else if state.Error != "" {
status = state.Error
} else {
status = fmt.Sprintf("%s,%dx%d,%.1ffps", state.Streaming.String(), state.Width, state.Height, state.FramePerSecond)
}
procPrefix = fmt.Sprintf("jetkvm: [native+video{%s}]", status)
}
setProcTitle(lastProcTitle)
}
// RunNativeProcess runs the native process mode // RunNativeProcess runs the native process mode
func RunNativeProcess(binaryName string) { func RunNativeProcess(binaryName string) {
appCtx, appCtxCancel := context.WithCancel(context.Background()) appCtx, appCtxCancel := context.WithCancel(context.Background())
@ -82,6 +101,9 @@ func RunNativeProcess(binaryName string) {
logger.Fatal().Err(err).Msg("failed to write frame to video stream socket") logger.Fatal().Err(err).Msg("failed to write frame to video stream socket")
} }
} }
nativeOptions.OnVideoStateChange = func(state VideoState) {
updateProcessTitle(&state)
}
// Create native instance // Create native instance
nativeInstance := NewNative(*nativeOptions) nativeInstance := NewNative(*nativeOptions)

View File

@ -15,6 +15,7 @@ var extraLockTimeout = 5 * time.Second
// VideoState is the state of the video stream. // VideoState is the state of the video stream.
type VideoState struct { type VideoState struct {
Ready bool `json:"ready"` Ready bool `json:"ready"`
Streaming VideoStreamingStatus `json:"streaming"`
Error string `json:"error,omitempty"` //no_signal, no_lock, out_of_range Error string `json:"error,omitempty"` //no_signal, no_lock, out_of_range
Width int `json:"width"` Width int `json:"width"`
Height int `json:"height"` Height int `json:"height"`
@ -26,15 +27,53 @@ func isSleepModeSupported() bool {
return err == nil return err == nil
} }
const sleepModeWaitTimeout = 3 * time.Second
func (n *Native) waitForVideoStreamingStatus(status VideoStreamingStatus) error {
timeout := time.After(sleepModeWaitTimeout)
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
if videoGetStreamingStatus() == status {
return nil
}
select {
case <-timeout:
return fmt.Errorf("timed out waiting for video streaming status to be %s", status.String())
case <-ticker.C:
}
}
}
// before calling this function, make sure to lock n.videoLock
func (n *Native) setSleepMode(enabled bool) error { func (n *Native) setSleepMode(enabled bool) error {
if !n.sleepModeSupported { if !n.sleepModeSupported {
return nil return nil
} }
bEnabled := "0" bEnabled := "0"
shouldWait := false
if enabled { if enabled {
bEnabled = "1" bEnabled = "1"
switch videoGetStreamingStatus() {
case VideoStreamingStatusActive:
n.l.Info().Msg("stopping video stream to enable sleep mode")
videoStop()
shouldWait = true
case VideoStreamingStatusStopping:
n.l.Info().Msg("video stream is stopping, will enable sleep mode in a few seconds")
shouldWait = true
} }
}
if shouldWait {
if err := n.waitForVideoStreamingStatus(VideoStreamingStatusInactive); err != nil {
return err
}
}
return os.WriteFile(sleepModeFile, []byte(bEnabled), 0644) return os.WriteFile(sleepModeFile, []byte(bEnabled), 0644)
} }
@ -159,3 +198,11 @@ func (n *Native) VideoStart() error {
videoStart() videoStart()
return nil return nil
} }
// VideoGetStreamingStatus gets the streaming status of the video.
func (n *Native) VideoGetStreamingStatus() VideoStreamingStatus {
n.videoLock.Lock()
defer n.videoLock.Unlock()
return videoGetStreamingStatus()
}

View File

@ -228,6 +228,13 @@ func (s *State) doUpdate(ctx context.Context, params UpdateParams) error {
s.triggerComponentUpdateState("system", systemUpdate) s.triggerComponentUpdateState("system", systemUpdate)
} }
if !appUpdate.pending && !systemUpdate.pending {
scopedLogger.Info().Msg("No updates available")
s.updating = false
s.triggerStateUpdate()
return nil
}
scopedLogger.Trace().Bool("pending", appUpdate.pending).Msg("Checking for app update") scopedLogger.Trace().Bool("pending", appUpdate.pending).Msg("Checking for app update")
if appUpdate.pending { if appUpdate.pending {

View File

@ -2,6 +2,7 @@ package nmlite
import ( import (
"fmt" "fmt"
"strings"
"time" "time"
"github.com/jetkvm/kvm/internal/network/types" "github.com/jetkvm/kvm/internal/network/types"
@ -9,6 +10,40 @@ import (
"github.com/vishvananda/netlink" "github.com/vishvananda/netlink"
) )
type IfStateChangeReason uint
const (
IfStateOperStateChanged IfStateChangeReason = 1
IfStateOnlineStateChanged IfStateChangeReason = 2
IfStateMACAddressChanged IfStateChangeReason = 3
IfStateIPAddressesChanged IfStateChangeReason = 4
)
type IfStateChangeReasons []IfStateChangeReason
func (r IfStateChangeReason) String() string {
switch r {
case IfStateOperStateChanged:
return "oper state changed"
case IfStateOnlineStateChanged:
return "online state changed"
case IfStateMACAddressChanged:
return "MAC address changed"
case IfStateIPAddressesChanged:
return "IP addresses changed"
default:
return fmt.Sprintf("unknown change reason %d", r)
}
}
func (rs IfStateChangeReasons) String() string {
reasons := []string{}
for _, r := range rs {
reasons = append(reasons, r.String())
}
return strings.Join(reasons, ", ")
}
// updateInterfaceState updates the current interface state // updateInterfaceState updates the current interface state
func (im *InterfaceManager) updateInterfaceState() error { func (im *InterfaceManager) updateInterfaceState() error {
nl, err := im.link() nl, err := im.link()
@ -16,7 +51,10 @@ func (im *InterfaceManager) updateInterfaceState() error {
return fmt.Errorf("failed to get interface: %w", err) return fmt.Errorf("failed to get interface: %w", err)
} }
var stateChanged bool var (
stateChanged bool
changeReasons IfStateChangeReasons
)
attrs := nl.Attrs() attrs := nl.Attrs()
@ -29,6 +67,7 @@ func (im *InterfaceManager) updateInterfaceState() error {
if im.state.Up != isUp { if im.state.Up != isUp {
im.state.Up = isUp im.state.Up = isUp
stateChanged = true stateChanged = true
changeReasons = append(changeReasons, IfStateOperStateChanged)
} }
// Check if the interface is online // Check if the interface is online
@ -36,12 +75,14 @@ func (im *InterfaceManager) updateInterfaceState() error {
if im.state.Online != isOnline { if im.state.Online != isOnline {
im.state.Online = isOnline im.state.Online = isOnline
stateChanged = true stateChanged = true
changeReasons = append(changeReasons, IfStateOnlineStateChanged)
} }
// Check if the MAC address has changed // Check if the MAC address has changed
if im.state.MACAddress != attrs.HardwareAddr.String() { if im.state.MACAddress != attrs.HardwareAddr.String() {
im.state.MACAddress = attrs.HardwareAddr.String() im.state.MACAddress = attrs.HardwareAddr.String()
stateChanged = true stateChanged = true
changeReasons = append(changeReasons, IfStateMACAddressChanged)
} }
// Update IP addresses // Update IP addresses
@ -49,6 +90,7 @@ func (im *InterfaceManager) updateInterfaceState() error {
im.logger.Error().Err(err).Msg("failed to update IP addresses") im.logger.Error().Err(err).Msg("failed to update IP addresses")
} else if ipChanged { } else if ipChanged {
stateChanged = true stateChanged = true
changeReasons = append(changeReasons, IfStateIPAddressesChanged)
} }
im.state.LastUpdated = time.Now() im.state.LastUpdated = time.Now()
@ -56,7 +98,10 @@ func (im *InterfaceManager) updateInterfaceState() error {
// Notify callback if state changed // Notify callback if state changed
if stateChanged && im.onStateChange != nil { if stateChanged && im.onStateChange != nil {
im.logger.Debug().Interface("state", im.state).Msg("notifying state change") im.logger.Debug().
Stringer("changeReasons", changeReasons).
Interface("state", im.state).
Msg("notifying state change")
im.onStateChange(*im.state) im.onStateChange(*im.state)
} }
@ -80,6 +125,7 @@ func (im *InterfaceManager) updateInterfaceStateAddresses(nl *link.Link) (bool,
ipv6Gateway string ipv6Gateway string
ipv4Ready, ipv6Ready = false, false ipv4Ready, ipv6Ready = false, false
stateChanged = false stateChanged = false
stateChangeReason string
) )
routes, _ := mgr.ListDefaultRoutes(link.AfInet6) routes, _ := mgr.ListDefaultRoutes(link.AfInet6)
@ -123,40 +169,55 @@ func (im *InterfaceManager) updateInterfaceStateAddresses(nl *link.Link) (bool,
if !sortAndCompareStringSlices(im.state.IPv4Addresses, ipv4Addresses) { if !sortAndCompareStringSlices(im.state.IPv4Addresses, ipv4Addresses) {
im.state.IPv4Addresses = ipv4Addresses im.state.IPv4Addresses = ipv4Addresses
stateChanged = true stateChanged = true
stateChangeReason = "IPv4 addresses changed"
} }
if !sortAndCompareIPv6AddressSlices(im.state.IPv6Addresses, ipv6Addresses) { if !sortAndCompareIPv6AddressSlices(im.state.IPv6Addresses, ipv6Addresses) {
im.state.IPv6Addresses = ipv6Addresses im.state.IPv6Addresses = ipv6Addresses
stateChanged = true stateChanged = true
stateChangeReason = "IPv6 addresses changed"
} }
if im.state.IPv4Address != ipv4Addr { if im.state.IPv4Address != ipv4Addr {
im.state.IPv4Address = ipv4Addr im.state.IPv4Address = ipv4Addr
stateChanged = true stateChanged = true
stateChangeReason = "IPv4 address changed"
} }
if im.state.IPv6Address != ipv6Addr { if im.state.IPv6Address != ipv6Addr {
im.state.IPv6Address = ipv6Addr im.state.IPv6Address = ipv6Addr
stateChanged = true stateChanged = true
stateChangeReason = "IPv6 address changed"
} }
if im.state.IPv6LinkLocal != ipv6LinkLocal { if im.state.IPv6LinkLocal != ipv6LinkLocal {
im.state.IPv6LinkLocal = ipv6LinkLocal im.state.IPv6LinkLocal = ipv6LinkLocal
stateChanged = true stateChanged = true
stateChangeReason = "IPv6 link local address changed"
} }
if im.state.IPv6Gateway != ipv6Gateway { if im.state.IPv6Gateway != ipv6Gateway {
im.state.IPv6Gateway = ipv6Gateway im.state.IPv6Gateway = ipv6Gateway
stateChanged = true stateChanged = true
stateChangeReason = "IPv6 gateway changed"
} }
if im.state.IPv4Ready != ipv4Ready { if im.state.IPv4Ready != ipv4Ready {
im.state.IPv4Ready = ipv4Ready im.state.IPv4Ready = ipv4Ready
stateChanged = true stateChanged = true
stateChangeReason = "IPv4 ready state changed"
} }
if im.state.IPv6Ready != ipv6Ready { if im.state.IPv6Ready != ipv6Ready {
im.state.IPv6Ready = ipv6Ready im.state.IPv6Ready = ipv6Ready
stateChanged = true stateChanged = true
stateChangeReason = "IPv6 ready state changed"
}
if stateChanged {
im.logger.Trace().
Str("changeReason", stateChangeReason).
Interface("state", im.state).
Msg("interface state changed")
} }
return stateChanged, nil return stateChanged, nil

View File

@ -42,17 +42,19 @@ func sortAndCompareStringSlices(a, b []string) bool {
return true return true
} }
func sortIPv6AddressSlicesStable(a []types.IPv6Address) {
sort.SliceStable(a, func(i, j int) bool {
return a[i].Address.String() < a[j].Address.String()
})
}
func sortAndCompareIPv6AddressSlices(a, b []types.IPv6Address) bool { func sortAndCompareIPv6AddressSlices(a, b []types.IPv6Address) bool {
if len(a) != len(b) { if len(a) != len(b) {
return false return false
} }
sort.SliceStable(a, func(i, j int) bool { sortIPv6AddressSlicesStable(a)
return a[i].Address.String() < b[j].Address.String() sortIPv6AddressSlicesStable(b)
})
sort.SliceStable(b, func(i, j int) bool {
return b[i].Address.String() < a[j].Address.String()
})
for i := range a { for i := range a {
if a[i].Address.String() != b[i].Address.String() { if a[i].Address.String() != b[i].Address.String() {

13
ui/package-lock.json generated
View File

@ -37,6 +37,7 @@
"react-xtermjs": "^1.0.10", "react-xtermjs": "^1.0.10",
"recharts": "^3.3.0", "recharts": "^3.3.0",
"tailwind-merge": "^3.3.1", "tailwind-merge": "^3.3.1",
"tslog": "^4.10.2",
"usehooks-ts": "^3.1.1", "usehooks-ts": "^3.1.1",
"validator": "^13.15.20", "validator": "^13.15.20",
"zustand": "^4.5.2" "zustand": "^4.5.2"
@ -7330,6 +7331,18 @@
"integrity": "sha512-oJFu94HQb+KVduSUQL7wnpmqnfmLsOA/nAh6b6EH0wCEoK0/mPeXU6c3wKDV83MkOuHPRHtSXKKU99IBazS/2w==", "integrity": "sha512-oJFu94HQb+KVduSUQL7wnpmqnfmLsOA/nAh6b6EH0wCEoK0/mPeXU6c3wKDV83MkOuHPRHtSXKKU99IBazS/2w==",
"license": "0BSD" "license": "0BSD"
}, },
"node_modules/tslog": {
"version": "4.10.2",
"resolved": "https://registry.npmjs.org/tslog/-/tslog-4.10.2.tgz",
"integrity": "sha512-XuELoRpMR+sq8fuWwX7P0bcj+PRNiicOKDEb3fGNURhxWVyykCi9BNq7c4uVz7h7P0sj8qgBsr5SWS6yBClq3g==",
"license": "MIT",
"engines": {
"node": ">=16"
},
"funding": {
"url": "https://github.com/fullstack-build/tslog?sponsor=1"
}
},
"node_modules/type-check": { "node_modules/type-check": {
"version": "0.4.0", "version": "0.4.0",
"resolved": "https://registry.npmjs.org/type-check/-/type-check-0.4.0.tgz", "resolved": "https://registry.npmjs.org/type-check/-/type-check-0.4.0.tgz",

View File

@ -56,6 +56,7 @@
"react-xtermjs": "^1.0.10", "react-xtermjs": "^1.0.10",
"recharts": "^3.3.0", "recharts": "^3.3.0",
"tailwind-merge": "^3.3.1", "tailwind-merge": "^3.3.1",
"tslog": "^4.10.2",
"usehooks-ts": "^3.1.1", "usehooks-ts": "^3.1.1",
"validator": "^13.15.20", "validator": "^13.15.20",
"zustand": "^4.5.2" "zustand": "^4.5.2"

View File

@ -1,4 +1,5 @@
import { useCallback, useEffect, useMemo } from "react"; import { useCallback, useEffect, useMemo } from "react";
import { Logger } from "tslog";
import { useRTCStore } from "@hooks/stores"; import { useRTCStore } from "@hooks/stores";
@ -25,6 +26,128 @@ interface sendMessageParams {
requireOrdered?: boolean; requireOrdered?: boolean;
} }
const HANDSHAKE_TIMEOUT = 30 * 1000; // 30 seconds
const HANDSHAKE_MAX_ATTEMPTS = 10;
const logger = new Logger({ name: "hidrpc" });
export function doRpcHidHandshake(rpcHidChannel: RTCDataChannel, setRpcHidProtocolVersion: (version: number | null) => void) {
let attempts = 0;
let lastConnectedTime: Date | undefined;
let lastSendTime: Date | undefined;
let handshakeCompleted = false;
let handshakeInterval: ReturnType<typeof setInterval> | null = null;
const shouldGiveUp = () => {
if (attempts > HANDSHAKE_MAX_ATTEMPTS) {
logger.error(`Failed to send handshake message after ${HANDSHAKE_MAX_ATTEMPTS} attempts`);
return true;
}
const timeSinceConnected = lastConnectedTime ? Date.now() - lastConnectedTime.getTime() : 0;
if (timeSinceConnected > HANDSHAKE_TIMEOUT) {
logger.error(`Handshake timed out after ${timeSinceConnected}ms`);
return true;
}
return false;
}
const sendHandshake = (initial: boolean) => {
if (handshakeCompleted) return;
attempts++;
lastSendTime = new Date();
if (!initial && shouldGiveUp()) {
if (handshakeInterval) {
clearInterval(handshakeInterval);
handshakeInterval = null;
}
return;
}
let data: Uint8Array | undefined;
try {
const message = new HandshakeMessage(HID_RPC_VERSION);
data = message.marshal();
} catch (e) {
logger.error("Failed to marshal message", e);
return;
}
if (!data) return;
rpcHidChannel.send(data as unknown as ArrayBuffer);
if (initial) {
handshakeInterval = setInterval(() => {
sendHandshake(false);
}, 1000);
}
};
const onMessage = (ev: MessageEvent) => {
const message = unmarshalHidRpcMessage(new Uint8Array(ev.data));
if (!message || !(message instanceof HandshakeMessage)) return;
if (!message.version) {
logger.error("Received handshake message without version", message);
return;
}
if (message.version > HID_RPC_VERSION) {
// we assume that the UI is always using the latest version of the HID RPC protocol
// so we can't support this
// TODO: use capabilities to determine rather than version number
logger.error("Server is using a newer version than the client", message);
return;
}
setRpcHidProtocolVersion(message.version);
const timeUsed = lastSendTime ? Date.now() - lastSendTime.getTime() : 0;
logger.info(`Handshake completed in ${timeUsed}ms after ${attempts} attempts (Version: ${message.version} / ${HID_RPC_VERSION})`);
// clean up
rpcHidChannel.removeEventListener("message", onMessage);
resetHandshake({ completed: true });
};
const resetHandshake = ({ lastConnectedTime: newLastConnectedTime, completed }: { lastConnectedTime?: Date | undefined, completed?: boolean }) => {
if (newLastConnectedTime) lastConnectedTime = newLastConnectedTime;
lastSendTime = undefined;
attempts = 0;
if (completed !== undefined) handshakeCompleted = completed;
if (handshakeInterval) {
clearInterval(handshakeInterval);
handshakeInterval = null;
}
};
const onConnected = () => {
resetHandshake({ lastConnectedTime: new Date() });
logger.info("Channel connected");
sendHandshake(true);
rpcHidChannel.addEventListener("message", onMessage);
};
const onClose = () => {
resetHandshake({ lastConnectedTime: undefined, completed: false });
logger.info("Channel closed");
setRpcHidProtocolVersion(null);
rpcHidChannel.removeEventListener("message", onMessage);
};
rpcHidChannel.addEventListener("open", onConnected);
rpcHidChannel.addEventListener("close", onClose);
// handle case where channel is already open when the hook is mounted
if (rpcHidChannel.readyState === "open") {
onConnected();
}
}
export function useHidRpc(onHidRpcMessage?: (payload: RpcMessage) => void) { export function useHidRpc(onHidRpcMessage?: (payload: RpcMessage) => void) {
const { const {
rpcHidChannel, rpcHidChannel,
@ -78,7 +201,7 @@ export function useHidRpc(onHidRpcMessage?: (payload: RpcMessage) => void) {
try { try {
data = message.marshal(); data = message.marshal();
} catch (e) { } catch (e) {
console.error("Failed to marshal HID RPC message", e); logger.error("Failed to marshal message", e);
} }
if (!data) return; if (!data) return;
@ -151,99 +274,46 @@ export function useHidRpc(onHidRpcMessage?: (payload: RpcMessage) => void) {
sendMessage(KEEPALIVE_MESSAGE); sendMessage(KEEPALIVE_MESSAGE);
}, [sendMessage]); }, [sendMessage]);
const sendHandshake = useCallback(() => {
if (hidRpcDisabled) return;
if (rpcHidProtocolVersion) return;
if (!rpcHidChannel) return;
sendMessage(new HandshakeMessage(HID_RPC_VERSION), { ignoreHandshakeState: true });
}, [rpcHidChannel, rpcHidProtocolVersion, sendMessage, hidRpcDisabled]);
const handleHandshake = useCallback(
(message: HandshakeMessage) => {
if (hidRpcDisabled) return;
if (!message.version) {
console.error("Received handshake message without version", message);
return;
}
if (message.version > HID_RPC_VERSION) {
// we assume that the UI is always using the latest version of the HID RPC protocol
// so we can't support this
// TODO: use capabilities to determine rather than version number
console.error("Server is using a newer HID RPC version than the client", message);
return;
}
setRpcHidProtocolVersion(message.version);
},
[setRpcHidProtocolVersion, hidRpcDisabled],
);
useEffect(() => { useEffect(() => {
if (!rpcHidChannel) return; if (!rpcHidChannel) return;
if (hidRpcDisabled) return; if (hidRpcDisabled) return;
// send handshake message
sendHandshake();
const messageHandler = (e: MessageEvent) => { const messageHandler = (e: MessageEvent) => {
if (typeof e.data === "string") { if (typeof e.data === "string") {
console.warn("Received string data in HID RPC message handler", e.data); logger.warn("Received string data in message handler", e.data);
return; return;
} }
const message = unmarshalHidRpcMessage(new Uint8Array(e.data)); const message = unmarshalHidRpcMessage(new Uint8Array(e.data));
if (!message) { if (!message) {
console.warn("Received invalid HID RPC message", e.data); logger.warn("Received invalid message", e.data);
return; return;
} }
console.debug("Received HID RPC message", message); if (message instanceof HandshakeMessage) return; // handshake message is handled by the doRpcHidHandshake function
switch (message.constructor) {
case HandshakeMessage: // to remove it from the production build, we need to use the /* @__PURE__ */ comment here
handleHandshake(message as HandshakeMessage); // setting `esbuild.pure` doesn't work
break; /* @__PURE__ */ logger.debug("Received message", message);
default:
// not all events are handled here, the rest are handled by the onHidRpcMessage callback
break;
}
onHidRpcMessage?.(message); onHidRpcMessage?.(message);
}; };
const openHandler = () => {
console.info("HID RPC channel opened");
sendHandshake();
};
const closeHandler = () => {
console.info("HID RPC channel closed");
setRpcHidProtocolVersion(null);
};
const errorHandler = (e: Event) => { const errorHandler = (e: Event) => {
console.error(`Error on rpcHidChannel '${rpcHidChannel.label}': ${e}`) logger.error(`Error on channel '${rpcHidChannel.label}'`, e);
}; };
rpcHidChannel.addEventListener("message", messageHandler); rpcHidChannel.addEventListener("message", messageHandler);
rpcHidChannel.addEventListener("close", closeHandler);
rpcHidChannel.addEventListener("error", errorHandler); rpcHidChannel.addEventListener("error", errorHandler);
rpcHidChannel.addEventListener("open", openHandler);
return () => { return () => {
rpcHidChannel.removeEventListener("message", messageHandler); rpcHidChannel.removeEventListener("message", messageHandler);
rpcHidChannel.removeEventListener("close", closeHandler);
rpcHidChannel.removeEventListener("error", errorHandler); rpcHidChannel.removeEventListener("error", errorHandler);
rpcHidChannel.removeEventListener("open", openHandler);
}; };
}, [ }, [
rpcHidChannel, rpcHidChannel,
onHidRpcMessage, onHidRpcMessage,
setRpcHidProtocolVersion, setRpcHidProtocolVersion,
sendHandshake,
handleHandshake,
hidRpcDisabled, hidRpcDisabled,
]); ]);

View File

@ -55,6 +55,7 @@ import {
import { FeatureFlagProvider } from "@providers/FeatureFlagProvider"; import { FeatureFlagProvider } from "@providers/FeatureFlagProvider";
import { m } from "@localizations/messages.js"; import { m } from "@localizations/messages.js";
import { isSecureContext } from "@/utils"; import { isSecureContext } from "@/utils";
import { doRpcHidHandshake } from "@hooks/useHidRpc";
export type AuthMode = "password" | "noPassword" | null; export type AuthMode = "password" | "noPassword" | null;
@ -132,6 +133,7 @@ export default function KvmIdRoute() {
setRpcHidChannel, setRpcHidChannel,
setRpcHidUnreliableNonOrderedChannel, setRpcHidUnreliableNonOrderedChannel,
setRpcHidUnreliableChannel, setRpcHidUnreliableChannel,
setRpcHidProtocolVersion,
} = useRTCStore(); } = useRTCStore();
const location = useLocation(); const location = useLocation();
@ -555,6 +557,7 @@ export default function KvmIdRoute() {
rpcHidChannel.onopen = () => { rpcHidChannel.onopen = () => {
setRpcHidChannel(rpcHidChannel); setRpcHidChannel(rpcHidChannel);
}; };
doRpcHidHandshake(rpcHidChannel, setRpcHidProtocolVersion);
const rpcHidUnreliableChannel = pc.createDataChannel("hidrpc-unreliable-ordered", { const rpcHidUnreliableChannel = pc.createDataChannel("hidrpc-unreliable-ordered", {
ordered: true, ordered: true,
@ -591,6 +594,7 @@ export default function KvmIdRoute() {
setRpcHidChannel, setRpcHidChannel,
setRpcHidUnreliableNonOrderedChannel, setRpcHidUnreliableNonOrderedChannel,
setRpcHidUnreliableChannel, setRpcHidUnreliableChannel,
setRpcHidProtocolVersion,
setTransceiver, setTransceiver,
setAudioTransceiver, setAudioTransceiver,
audioInputAutoEnable, audioInputAutoEnable,

View File

@ -39,7 +39,7 @@ export default defineConfig(({ mode, command }) => {
return { return {
plugins, plugins,
esbuild: { esbuild: {
pure: ["console.debug"], pure: command === "build" ? ["console.debug"]: [],
}, },
assetsInclude: ["**/*.woff2"], assetsInclude: ["**/*.woff2"],
build: { build: {

View File

@ -423,6 +423,7 @@ func newSession(config SessionConfig) (*Session, error) {
isConnected = false isConnected = false
onActiveSessionsChanged() onActiveSessionsChanged()
if decrActiveSessions() == 0 { if decrActiveSessions() == 0 {
scopedLogger.Info().Msg("last session disconnected, stopping video stream")
onLastSessionDisconnected() onLastSessionDisconnected()
} }
} }