From 04aa35249a81751ba983270cd53875a135362891 Mon Sep 17 00:00:00 2001 From: Siyuan Miao Date: Fri, 11 Apr 2025 07:41:21 +0200 Subject: [PATCH 01/13] chore(log): add jsonRpcLogger --- jsonrpc.go | 35 ++++++++++++++++++++++++++--------- log.go | 5 ++++- 2 files changed, 30 insertions(+), 10 deletions(-) diff --git a/jsonrpc.go b/jsonrpc.go index d56b8ea..d788e86 100644 --- a/jsonrpc.go +++ b/jsonrpc.go @@ -47,12 +47,12 @@ type BacklightSettings struct { func writeJSONRPCResponse(response JSONRPCResponse, session *Session) { responseBytes, err := json.Marshal(response) if err != nil { - logger.Warn().Err(err).Msg("Error marshalling JSONRPC response") + jsonRpcLogger.Warn().Err(err).Msg("Error marshalling JSONRPC response") return } err = session.RPCChannel.SendText(string(responseBytes)) if err != nil { - logger.Warn().Err(err).Msg("Error sending JSONRPC response") + jsonRpcLogger.Warn().Err(err).Msg("Error sending JSONRPC response") return } } @@ -65,16 +65,20 @@ func writeJSONRPCEvent(event string, params interface{}, session *Session) { } requestBytes, err := json.Marshal(request) if err != nil { - logger.Warn().Err(err).Msg("Error marshalling JSONRPC event") + jsonRpcLogger.Warn().Err(err).Msg("Error marshalling JSONRPC event") return } if session == nil || session.RPCChannel == nil { - logger.Info().Msg("RPC channel not available") + jsonRpcLogger.Info().Msg("RPC channel not available") return } - err = session.RPCChannel.SendText(string(requestBytes)) + + requestString := string(requestBytes) + jsonRpcLogger.Info().Str("data", requestString).Msg("Sending JSONRPC event") + + err = session.RPCChannel.SendText(requestString) if err != nil { - logger.Warn().Err(err).Msg("Error sending JSONRPC event") + jsonRpcLogger.Warn().Err(err).Str("data", requestString).Msg("Error sending JSONRPC event") return } } @@ -83,6 +87,11 @@ func onRPCMessage(message webrtc.DataChannelMessage, session *Session) { var request JSONRPCRequest err := json.Unmarshal(message.Data, &request) if err != nil { + jsonRpcLogger.Warn(). + Str("data", string(message.Data)). + Err(err). + Msg("Error unmarshalling JSONRPC request") + errorResponse := JSONRPCResponse{ JSONRPC: "2.0", Error: map[string]interface{}{ @@ -95,7 +104,13 @@ func onRPCMessage(message webrtc.DataChannelMessage, session *Session) { return } - logger.Trace().Str("method", request.Method).Interface("params", request.Params).Interface("id", request.ID).Msg("Received RPC request") + scopedLogger := jsonRpcLogger.With(). + Str("method", request.Method). + Interface("params", request.Params). + Interface("id", request.ID).Logger() + + scopedLogger.Trace().Msg("Received RPC request") + handler, ok := rpcHandlers[request.Method] if !ok { errorResponse := JSONRPCResponse{ @@ -110,9 +125,10 @@ func onRPCMessage(message webrtc.DataChannelMessage, session *Session) { return } - logger.Trace().Str("method", request.Method).Interface("id", request.ID).Msg("Calling RPC handler") + scopedLogger.Trace().Msg("Calling RPC handler") result, err := callRPCHandler(handler, request.Params) if err != nil { + scopedLogger.Error().Err(err).Msg("Error calling RPC handler") errorResponse := JSONRPCResponse{ JSONRPC: "2.0", Error: map[string]interface{}{ @@ -126,7 +142,8 @@ func onRPCMessage(message webrtc.DataChannelMessage, session *Session) { return } - logger.Trace().Interface("result", result).Interface("id", request.ID).Msg("RPC handler returned") + scopedLogger.Trace().Interface("result", result).Msg("RPC handler returned") + response := JSONRPCResponse{ JSONRPC: "2.0", Result: result, diff --git a/log.go b/log.go index 5dac1f6..caea975 100644 --- a/log.go +++ b/log.go @@ -49,8 +49,11 @@ var ( ntpLogger = getLogger("ntp") displayLogger = getLogger("display") usbLogger = getLogger("usb") - ginLogger = getLogger("gin") + jsonRpcLogger = getLogger("jsonrpc") websecureLogger = getLogger("websecure") + + // external components + ginLogger = getLogger("gin") ) func updateLogLevel() { From 5f7dded973d0978472f72d53c1e0a8620a609c1e Mon Sep 17 00:00:00 2001 From: Siyuan Miao Date: Fri, 11 Apr 2025 07:42:47 +0200 Subject: [PATCH 02/13] chore(log): add watchdogLogger --- hw.go | 6 +++--- log.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/hw.go b/hw.go index 02e4815..21bffad 100644 --- a/hw.go +++ b/hw.go @@ -54,7 +54,7 @@ func GetDeviceID() string { func runWatchdog() { file, err := os.OpenFile("/dev/watchdog", os.O_WRONLY, 0) if err != nil { - logger.Warn().Err(err).Msg("unable to open /dev/watchdog, skipping watchdog reset") + watchdogLogger.Warn().Err(err).Msg("unable to open /dev/watchdog, skipping watchdog reset") return } defer file.Close() @@ -65,13 +65,13 @@ func runWatchdog() { case <-ticker.C: _, err = file.Write([]byte{0}) if err != nil { - logger.Warn().Err(err).Msg("error writing to /dev/watchdog, system may reboot") + watchdogLogger.Warn().Err(err).Msg("error writing to /dev/watchdog, system may reboot") } case <-appCtx.Done(): //disarm watchdog with magic value _, err := file.Write([]byte("V")) if err != nil { - logger.Warn().Err(err).Msg("failed to disarm watchdog, system may reboot") + watchdogLogger.Warn().Err(err).Msg("failed to disarm watchdog, system may reboot") } return } diff --git a/log.go b/log.go index caea975..dd39048 100644 --- a/log.go +++ b/log.go @@ -50,8 +50,8 @@ var ( displayLogger = getLogger("display") usbLogger = getLogger("usb") jsonRpcLogger = getLogger("jsonrpc") + watchdogLogger = getLogger("watchdog") websecureLogger = getLogger("websecure") - // external components ginLogger = getLogger("gin") ) From 48a917fd76516b097ed9e0eaa4328752f38ebefd Mon Sep 17 00:00:00 2001 From: Siyuan Miao Date: Fri, 11 Apr 2025 07:49:03 +0200 Subject: [PATCH 03/13] chore(log): add otaLogger --- log.go | 1 + ota.go | 59 +++++++++++++++++++++++++++++++++++++++++++++------------- 2 files changed, 47 insertions(+), 13 deletions(-) diff --git a/log.go b/log.go index dd39048..9f531bf 100644 --- a/log.go +++ b/log.go @@ -52,6 +52,7 @@ var ( jsonRpcLogger = getLogger("jsonrpc") watchdogLogger = getLogger("watchdog") websecureLogger = getLogger("websecure") + otaLogger = getLogger("ota") // external components ginLogger = getLogger("gin") ) diff --git a/ota.go b/ota.go index 64d7a26..020c743 100644 --- a/ota.go +++ b/ota.go @@ -16,6 +16,7 @@ import ( "time" "github.com/Masterminds/semver/v3" + "github.com/rs/zerolog" ) type UpdateMetadata struct { @@ -191,7 +192,11 @@ func downloadFile(ctx context.Context, path string, url string, downloadProgress return nil } -func verifyFile(path string, expectedHash string, verifyProgress *float32) error { +func verifyFile(path string, expectedHash string, verifyProgress *float32, scopedLogger *zerolog.Logger) error { + if scopedLogger == nil { + scopedLogger = &otaLogger + } + unverifiedPath := path + ".unverified" fileToHash, err := os.Open(unverifiedPath) if err != nil { @@ -235,7 +240,7 @@ func verifyFile(path string, expectedHash string, verifyProgress *float32) error } hashSum := hash.Sum(nil) - logger.Info().Str("path", path).Str("hash", hex.EncodeToString(hashSum)).Msg("SHA256 hash of") + scopedLogger.Info().Str("path", path).Str("hash", hex.EncodeToString(hashSum)).Msg("SHA256 hash of") if hex.EncodeToString(hashSum) != expectedHash { return fmt.Errorf("hash mismatch: %x != %s", hashSum, expectedHash) @@ -285,7 +290,12 @@ func triggerOTAStateUpdate() { } func TryUpdate(ctx context.Context, deviceId string, includePreRelease bool) error { - logger.Info().Msg("Trying to update...") + scopedLogger := otaLogger.With(). + Str("deviceId", deviceId). + Str("includePreRelease", fmt.Sprintf("%v", includePreRelease)). + Logger() + + scopedLogger.Info().Msg("Trying to update...") if otaState.Updating { return fmt.Errorf("update already in progress") } @@ -303,6 +313,7 @@ func TryUpdate(ctx context.Context, deviceId string, includePreRelease bool) err updateStatus, err := GetUpdateStatus(ctx, deviceId, includePreRelease) if err != nil { otaState.Error = fmt.Sprintf("Error checking for updates: %v", err) + scopedLogger.Error().Err(err).Msg("Error checking for updates") return fmt.Errorf("error checking for updates: %w", err) } @@ -320,11 +331,15 @@ func TryUpdate(ctx context.Context, deviceId string, includePreRelease bool) err rebootNeeded := false if appUpdateAvailable { - logger.Info().Str("local", local.AppVersion).Str("remote", remote.AppVersion).Msg("App update available") + scopedLogger.Info(). + Str("local", local.AppVersion). + Str("remote", remote.AppVersion). + Msg("App update available") err := downloadFile(ctx, "/userdata/jetkvm/jetkvm_app.update", remote.AppUrl, &otaState.AppDownloadProgress) if err != nil { otaState.Error = fmt.Sprintf("Error downloading app update: %v", err) + scopedLogger.Error().Err(err).Msg("Error downloading app update") triggerOTAStateUpdate() return err } @@ -333,9 +348,15 @@ func TryUpdate(ctx context.Context, deviceId string, includePreRelease bool) err otaState.AppDownloadProgress = 1 triggerOTAStateUpdate() - err = verifyFile("/userdata/jetkvm/jetkvm_app.update", remote.AppHash, &otaState.AppVerificationProgress) + err = verifyFile( + "/userdata/jetkvm/jetkvm_app.update", + remote.AppHash, + &otaState.AppVerificationProgress, + &scopedLogger, + ) if err != nil { otaState.Error = fmt.Sprintf("Error verifying app update hash: %v", err) + scopedLogger.Error().Err(err).Msg("Error verifying app update hash") triggerOTAStateUpdate() return err } @@ -346,18 +367,22 @@ func TryUpdate(ctx context.Context, deviceId string, includePreRelease bool) err otaState.AppUpdateProgress = 1 triggerOTAStateUpdate() - logger.Info().Msg("App update downloaded") + scopedLogger.Info().Msg("App update downloaded") rebootNeeded = true } else { - logger.Info().Msg("App is up to date") + scopedLogger.Info().Msg("App is up to date") } if systemUpdateAvailable { - logger.Info().Str("local", local.SystemVersion).Str("remote", remote.SystemVersion).Msg("System update available") + scopedLogger.Info(). + Str("local", local.SystemVersion). + Str("remote", remote.SystemVersion). + Msg("System update available") err := downloadFile(ctx, "/userdata/jetkvm/update_system.tar", remote.SystemUrl, &otaState.SystemDownloadProgress) if err != nil { otaState.Error = fmt.Sprintf("Error downloading system update: %v", err) + scopedLogger.Error().Err(err).Msg("Error downloading system update") triggerOTAStateUpdate() return err } @@ -369,15 +394,17 @@ func TryUpdate(ctx context.Context, deviceId string, includePreRelease bool) err err = verifyFile("/userdata/jetkvm/update_system.tar", remote.SystemHash, &otaState.SystemVerificationProgress) if err != nil { otaState.Error = fmt.Sprintf("Error verifying system update hash: %v", err) + scopedLogger.Error().Err(err).Msg("Error verifying system update hash") triggerOTAStateUpdate() return err } - logger.Info().Msg("System update downloaded") + scopedLogger.Info().Msg("System update downloaded") verifyFinished := time.Now() otaState.SystemVerifiedAt = &verifyFinished otaState.SystemVerificationProgress = 1 triggerOTAStateUpdate() + scopedLogger.Info().Msg("Starting rk_ota command") cmd := exec.Command("rk_ota", "--misc=update", "--tar_path=/userdata/jetkvm/update_system.tar", "--save_dir=/userdata/jetkvm/ota_save", "--partition=all") var b bytes.Buffer cmd.Stdout = &b @@ -385,6 +412,7 @@ func TryUpdate(ctx context.Context, deviceId string, includePreRelease bool) err err = cmd.Start() if err != nil { otaState.Error = fmt.Sprintf("Error starting rk_ota command: %v", err) + scopedLogger.Error().Err(err).Msg("Error starting rk_ota command") return fmt.Errorf("error starting rk_ota command: %w", err) } ctx, cancel := context.WithCancel(context.Background()) @@ -416,25 +444,30 @@ func TryUpdate(ctx context.Context, deviceId string, includePreRelease bool) err output := b.String() if err != nil { otaState.Error = fmt.Sprintf("Error executing rk_ota command: %v\nOutput: %s", err, output) + scopedLogger.Error(). + Err(err). + Str("output", output). + Int("exitCode", cmd.ProcessState.ExitCode()). + Msg("Error executing rk_ota command") return fmt.Errorf("error executing rk_ota command: %w\nOutput: %s", err, output) } - - logger.Info().Str("output", output).Msg("rk_ota success") + scopedLogger.Info().Str("output", output).Msg("rk_ota success") otaState.SystemUpdateProgress = 1 otaState.SystemUpdatedAt = &verifyFinished triggerOTAStateUpdate() rebootNeeded = true } else { - logger.Info().Msg("System is up to date") + scopedLogger.Info().Msg("System is up to date") } if rebootNeeded { - logger.Info().Msg("System Rebooting in 10s") + scopedLogger.Info().Msg("System Rebooting in 10s") time.Sleep(10 * time.Second) cmd := exec.Command("reboot") err := cmd.Start() if err != nil { otaState.Error = fmt.Sprintf("Failed to start reboot: %v", err) + scopedLogger.Error().Err(err).Msg("Failed to start reboot") return fmt.Errorf("failed to start reboot: %w", err) } else { os.Exit(0) From 612c50bfe265cfbfb6c32635ee1882d26d25bf95 Mon Sep 17 00:00:00 2001 From: Siyuan Miao Date: Fri, 11 Apr 2025 07:56:18 +0200 Subject: [PATCH 04/13] chore(log): add serialLogger --- log.go | 1 + serial.go | 47 +++++++++++++++++++++++++++++++++-------------- 2 files changed, 34 insertions(+), 14 deletions(-) diff --git a/log.go b/log.go index 9f531bf..616fd71 100644 --- a/log.go +++ b/log.go @@ -53,6 +53,7 @@ var ( watchdogLogger = getLogger("watchdog") websecureLogger = getLogger("websecure") otaLogger = getLogger("ota") + serialLogger = getLogger("serial") // external components ginLogger = getLogger("gin") ) diff --git a/serial.go b/serial.go index 732c022..214a465 100644 --- a/serial.go +++ b/serial.go @@ -35,17 +35,19 @@ var ( ) func runATXControl() { + scopedLogger := serialLogger.With().Str("service", "atx_control").Logger() + reader := bufio.NewReader(port) for { line, err := reader.ReadString('\n') if err != nil { - logger.Warn().Err(err).Msg("Error reading from serial port") + scopedLogger.Warn().Err(err).Msg("Error reading from serial port") return } // Each line should be 4 binary digits + newline if len(line) != 5 { - logger.Warn().Int("length", len(line)).Msg("Invalid line length") + scopedLogger.Warn().Int("length", len(line)).Msg("Invalid line length") continue } @@ -66,7 +68,12 @@ func runATXControl() { newLedPWRState != ledPWRState || newBtnRSTState != btnRSTState || newBtnPWRState != btnPWRState { - logger.Debug().Bool("hdd", newLedHDDState).Bool("pwr", newLedPWRState).Bool("rst", newBtnRSTState).Bool("pwr", newBtnPWRState).Msg("Status changed") + scopedLogger.Debug(). + Bool("hdd", newLedHDDState). + Bool("pwr", newLedPWRState). + Bool("rst", newBtnRSTState). + Bool("pwr", newBtnPWRState). + Msg("Status changed") // Update states ledHDDState = newLedHDDState @@ -133,45 +140,46 @@ func unmountDCControl() error { var dcState DCPowerState func runDCControl() { + scopedLogger := serialLogger.With().Str("service", "dc_control").Logger() reader := bufio.NewReader(port) for { line, err := reader.ReadString('\n') if err != nil { - logger.Warn().Err(err).Msg("Error reading from serial port") + scopedLogger.Warn().Err(err).Msg("Error reading from serial port") return } // Split the line by semicolon parts := strings.Split(strings.TrimSpace(line), ";") if len(parts) != 4 { - logger.Warn().Str("line", line).Msg("Invalid line") + scopedLogger.Warn().Str("line", line).Msg("Invalid line") continue } // Parse new states powerState, err := strconv.Atoi(parts[0]) if err != nil { - logger.Warn().Err(err).Msg("Invalid power state") + scopedLogger.Warn().Err(err).Msg("Invalid power state") continue } dcState.IsOn = powerState == 1 milliVolts, err := strconv.ParseFloat(parts[1], 64) if err != nil { - logger.Warn().Err(err).Msg("Invalid voltage") + scopedLogger.Warn().Err(err).Msg("Invalid voltage") continue } volts := milliVolts / 1000 // Convert mV to V milliAmps, err := strconv.ParseFloat(parts[2], 64) if err != nil { - logger.Warn().Err(err).Msg("Invalid current") + scopedLogger.Warn().Err(err).Msg("Invalid current") continue } amps := milliAmps / 1000 // Convert mA to A milliWatts, err := strconv.ParseFloat(parts[3], 64) if err != nil { - logger.Warn().Err(err).Msg("Invalid power") + scopedLogger.Warn().Err(err).Msg("Invalid power") continue } watts := milliWatts / 1000 // Convert mW to W @@ -225,12 +233,19 @@ func reopenSerialPort() error { var err error port, err = serial.Open(serialPortPath, defaultMode) if err != nil { - logger.Warn().Err(err).Msg("Error opening serial port") + serialLogger.Error(). + Err(err). + Str("path", serialPortPath). + Interface("mode", defaultMode). + Msg("Error opening serial port") } return nil } func handleSerialChannel(d *webrtc.DataChannel) { + scopedLogger := serialLogger.With(). + Uint16("data_channel_id", *d.ID()).Logger() + d.OnOpen(func() { go func() { buf := make([]byte, 1024) @@ -238,13 +253,13 @@ func handleSerialChannel(d *webrtc.DataChannel) { n, err := port.Read(buf) if err != nil { if err != io.EOF { - logger.Warn().Err(err).Msg("Failed to read from serial port") + scopedLogger.Warn().Err(err).Msg("Failed to read from serial port") } break } err = d.Send(buf[:n]) if err != nil { - logger.Warn().Err(err).Msg("Failed to send serial output") + scopedLogger.Warn().Err(err).Msg("Failed to send serial output") break } } @@ -257,11 +272,15 @@ func handleSerialChannel(d *webrtc.DataChannel) { } _, err := port.Write(msg.Data) if err != nil { - logger.Warn().Err(err).Msg("Failed to write to serial") + scopedLogger.Warn().Err(err).Msg("Failed to write to serial") } }) - d.OnClose(func() { + d.OnError(func(err error) { + scopedLogger.Warn().Err(err).Msg("Serial channel error") + }) + d.OnClose(func() { + scopedLogger.Info().Msg("Serial channel closed") }) } From d5f8e51a14b615c428fb5bc0589a930ab6d51d98 Mon Sep 17 00:00:00 2001 From: Siyuan Miao Date: Fri, 11 Apr 2025 07:58:11 +0200 Subject: [PATCH 05/13] chore(log): add terminalLogger --- log.go | 5 +++-- terminal.go | 18 +++++++++++++----- 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/log.go b/log.go index 616fd71..f52059e 100644 --- a/log.go +++ b/log.go @@ -47,13 +47,14 @@ var ( websocketLogger = getLogger("websocket") nativeLogger = getLogger("native") ntpLogger = getLogger("ntp") - displayLogger = getLogger("display") - usbLogger = getLogger("usb") jsonRpcLogger = getLogger("jsonrpc") watchdogLogger = getLogger("watchdog") websecureLogger = getLogger("websecure") otaLogger = getLogger("ota") serialLogger = getLogger("serial") + terminalLogger = getLogger("terminal") + displayLogger = getLogger("display") + usbLogger = getLogger("usb") // external components ginLogger = getLogger("gin") ) diff --git a/terminal.go b/terminal.go index 2200064..e98b4de 100644 --- a/terminal.go +++ b/terminal.go @@ -16,6 +16,9 @@ type TerminalSize struct { } func handleTerminalChannel(d *webrtc.DataChannel) { + scopedLogger := terminalLogger.With(). + Uint16("data_channel_id", *d.ID()).Logger() + var ptmx *os.File var cmd *exec.Cmd d.OnOpen(func() { @@ -23,7 +26,7 @@ func handleTerminalChannel(d *webrtc.DataChannel) { var err error ptmx, err = pty.Start(cmd) if err != nil { - logger.Warn().Err(err).Msg("Failed to start pty") + scopedLogger.Warn().Err(err).Msg("Failed to start pty") d.Close() return } @@ -34,13 +37,13 @@ func handleTerminalChannel(d *webrtc.DataChannel) { n, err := ptmx.Read(buf) if err != nil { if err != io.EOF { - logger.Warn().Err(err).Msg("Failed to read from pty") + scopedLogger.Warn().Err(err).Msg("Failed to read from pty") } break } err = d.Send(buf[:n]) if err != nil { - logger.Warn().Err(err).Msg("Failed to send pty output") + scopedLogger.Warn().Err(err).Msg("Failed to send pty output") break } } @@ -63,11 +66,11 @@ func handleTerminalChannel(d *webrtc.DataChannel) { return } } - logger.Warn().Err(err).Msg("Failed to parse terminal size") + scopedLogger.Warn().Err(err).Msg("Failed to parse terminal size") } _, err := ptmx.Write(msg.Data) if err != nil { - logger.Warn().Err(err).Msg("Failed to write to pty") + scopedLogger.Warn().Err(err).Msg("Failed to write to pty") } }) @@ -78,5 +81,10 @@ func handleTerminalChannel(d *webrtc.DataChannel) { if cmd != nil && cmd.Process != nil { _ = cmd.Process.Kill() } + scopedLogger.Info().Msg("Terminal channel closed") + }) + + d.OnError(func(err error) { + scopedLogger.Warn().Err(err).Msg("Terminal channel error") }) } From e08ff425c3a1e8e86e4548ab4cbab87b3d7ef9d5 Mon Sep 17 00:00:00 2001 From: Siyuan Miao Date: Fri, 11 Apr 2025 08:05:04 +0200 Subject: [PATCH 06/13] chore(log): add webRtcLogger --- cloud.go | 11 ++++++++++- log.go | 1 + web.go | 2 +- webrtc.go | 30 +++++++++++++++++++++--------- 4 files changed, 33 insertions(+), 11 deletions(-) diff --git a/cloud.go b/cloud.go index a999fc8..579d1f6 100644 --- a/cloud.go +++ b/cloud.go @@ -19,6 +19,7 @@ import ( "github.com/coder/websocket" "github.com/gin-gonic/gin" + "github.com/rs/zerolog" ) type CloudRegisterRequest struct { @@ -355,7 +356,14 @@ func authenticateSession(ctx context.Context, c *websocket.Conn, req WebRTCSessi return nil } -func handleSessionRequest(ctx context.Context, c *websocket.Conn, req WebRTCSessionRequest, isCloudConnection bool, source string) error { +func handleSessionRequest( + ctx context.Context, + c *websocket.Conn, + req WebRTCSessionRequest, + isCloudConnection bool, + source string, + scopedLogger *zerolog.Logger, +) error { var sourceType string if isCloudConnection { sourceType = "cloud" @@ -381,6 +389,7 @@ func handleSessionRequest(ctx context.Context, c *websocket.Conn, req WebRTCSess IsCloud: isCloudConnection, LocalIP: req.IP, ICEServers: req.ICEServers, + Logger: scopedLogger, }) if err != nil { _ = wsjson.Write(context.Background(), c, gin.H{"error": err}) diff --git a/log.go b/log.go index f52059e..9b9a9e5 100644 --- a/log.go +++ b/log.go @@ -45,6 +45,7 @@ var ( logger = getLogger("jetkvm") cloudLogger = getLogger("cloud") websocketLogger = getLogger("websocket") + webrtcLogger = getLogger("webrtc") nativeLogger = getLogger("native") ntpLogger = getLogger("ntp") jsonRpcLogger = getLogger("jsonrpc") diff --git a/web.go b/web.go index 8bf34af..b696637 100644 --- a/web.go +++ b/web.go @@ -363,7 +363,7 @@ func handleWebRTCSignalWsMessages(wsCon *websocket.Conn, isCloudConnection bool, metricConnectionSessionRequestCount.WithLabelValues(sourceType, source).Inc() metricConnectionLastSessionRequestTimestamp.WithLabelValues(sourceType, source).SetToCurrentTime() - err = handleSessionRequest(runCtx, wsCon, req, isCloudConnection, source) + err = handleSessionRequest(runCtx, wsCon, req, isCloudConnection, source, &l) if err != nil { l.Warn().Str("error", err.Error()).Msg("error starting new session") continue diff --git a/webrtc.go b/webrtc.go index d01b4b6..5bc6623 100644 --- a/webrtc.go +++ b/webrtc.go @@ -11,6 +11,7 @@ import ( "github.com/coder/websocket/wsjson" "github.com/gin-gonic/gin" "github.com/pion/webrtc/v4" + "github.com/rs/zerolog" ) type Session struct { @@ -28,6 +29,7 @@ type SessionConfig struct { LocalIP string IsCloud bool ws *websocket.Conn + Logger *zerolog.Logger } func (s *Session) ExchangeOffer(offerStr string) (string, error) { @@ -70,19 +72,27 @@ func newSession(config SessionConfig) (*Session, error) { } iceServer := webrtc.ICEServer{} + var scopedLogger *zerolog.Logger + if config.Logger != nil { + l := config.Logger.With().Str("component", "webrtc").Logger() + scopedLogger = &l + } else { + scopedLogger = &webrtcLogger + } + if config.IsCloud { if config.ICEServers == nil { - logger.Info().Msg("ICE Servers not provided by cloud") + scopedLogger.Info().Msg("ICE Servers not provided by cloud") } else { iceServer.URLs = config.ICEServers - logger.Info().Interface("iceServers", iceServer.URLs).Msg("Using ICE Servers provided by cloud") + scopedLogger.Info().Interface("iceServers", iceServer.URLs).Msg("Using ICE Servers provided by cloud") } if config.LocalIP == "" || net.ParseIP(config.LocalIP) == nil { - logger.Info().Str("localIP", config.LocalIP).Msg("Local IP address not provided or invalid, won't set NAT1To1IPs") + scopedLogger.Info().Str("localIP", config.LocalIP).Msg("Local IP address not provided or invalid, won't set NAT1To1IPs") } else { webrtcSettingEngine.SetNAT1To1IPs([]string{config.LocalIP}, webrtc.ICECandidateTypeSrflx) - logger.Info().Str("localIP", config.LocalIP).Msg("Setting NAT1To1IPs") + scopedLogger.Info().Str("localIP", config.LocalIP).Msg("Setting NAT1To1IPs") } } @@ -96,7 +106,7 @@ func newSession(config SessionConfig) (*Session, error) { session := &Session{peerConnection: peerConnection} peerConnection.OnDataChannel(func(d *webrtc.DataChannel) { - logger.Info().Str("label", d.Label()).Uint16("id", *d.ID()).Msg("New DataChannel") + scopedLogger.Info().Str("label", d.Label()).Uint16("id", *d.ID()).Msg("New DataChannel") switch d.Label() { case "rpc": session.RPCChannel = d @@ -144,17 +154,17 @@ func newSession(config SessionConfig) (*Session, error) { var isConnected bool peerConnection.OnICECandidate(func(candidate *webrtc.ICECandidate) { - logger.Info().Interface("candidate", candidate).Msg("Our WebRTC peerConnection has a new ICE candidate") + scopedLogger.Info().Interface("candidate", candidate).Msg("WebRTC peerConnection has a new ICE candidate") if candidate != nil { err := wsjson.Write(context.Background(), config.ws, gin.H{"type": "new-ice-candidate", "data": candidate.ToJSON()}) if err != nil { - logger.Warn().Err(err).Msg("failed to write new-ice-candidate to WebRTC signaling channel") + scopedLogger.Warn().Err(err).Msg("failed to write new-ice-candidate to WebRTC signaling channel") } } }) peerConnection.OnICEConnectionStateChange(func(connectionState webrtc.ICEConnectionState) { - logger.Info().Str("connectionState", connectionState.String()).Msg("Connection State has changed") + scopedLogger.Info().Str("connectionState", connectionState.String()).Msg("ICE Connection State has changed") if connectionState == webrtc.ICEConnectionStateConnected { if !isConnected { isConnected = true @@ -167,15 +177,17 @@ func newSession(config SessionConfig) (*Session, error) { } //state changes on closing browser tab disconnected->failed, we need to manually close it if connectionState == webrtc.ICEConnectionStateFailed { + scopedLogger.Debug().Msg("ICE Connection State is failed, closing peerConnection") _ = peerConnection.Close() } if connectionState == webrtc.ICEConnectionStateClosed { + scopedLogger.Debug().Msg("ICE Connection State is closed, unmounting virtual media") if session == currentSession { currentSession = nil } if session.shouldUmountVirtualMedia { err := rpcUnmountImage() - logger.Debug().Err(err).Msg("unmount image failed on connection close") + scopedLogger.Warn().Err(err).Msg("unmount image failed on connection close") } if isConnected { isConnected = false From 64894216058b9bc1fcaaca511c93a1482015c10b Mon Sep 17 00:00:00 2001 From: Siyuan Miao Date: Fri, 11 Apr 2025 08:05:35 +0200 Subject: [PATCH 07/13] fix(ota): verifyFile missing arguments --- ota.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/ota.go b/ota.go index 020c743..ded1353 100644 --- a/ota.go +++ b/ota.go @@ -391,7 +391,12 @@ func TryUpdate(ctx context.Context, deviceId string, includePreRelease bool) err otaState.SystemDownloadProgress = 1 triggerOTAStateUpdate() - err = verifyFile("/userdata/jetkvm/update_system.tar", remote.SystemHash, &otaState.SystemVerificationProgress) + err = verifyFile( + "/userdata/jetkvm/update_system.tar", + remote.SystemHash, + &otaState.SystemVerificationProgress, + &scopedLogger, + ) if err != nil { otaState.Error = fmt.Sprintf("Error verifying system update hash: %v", err) scopedLogger.Error().Err(err).Msg("Error verifying system update hash") From 924b55059ff19d47c000b303e8de9d2e2088f8ee Mon Sep 17 00:00:00 2001 From: Siyuan Miao Date: Fri, 11 Apr 2025 08:14:44 +0200 Subject: [PATCH 08/13] chore(log): add wolLogger --- log.go | 14 ++++++++++++++ wol.go | 9 +++++---- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/log.go b/log.go index 9b9a9e5..3ee8473 100644 --- a/log.go +++ b/log.go @@ -55,11 +55,25 @@ var ( serialLogger = getLogger("serial") terminalLogger = getLogger("terminal") displayLogger = getLogger("display") + wolLogger = getLogger("wol") usbLogger = getLogger("usb") // external components ginLogger = getLogger("gin") ) +func ErrorfL(l *zerolog.Logger, format string, err error, args ...interface{}) error { + if l == nil { + l = &logger + } + + l.Error().Err(err).Msgf(format, args...) + + err_msg := err.Error() + ": %v" + err_args := append(args, err) + + return fmt.Errorf(err_msg, err_args...) +} + func updateLogLevel() { scopeLevelMutex.Lock() defer scopeLevelMutex.Unlock() diff --git a/wol.go b/wol.go index f0e68bb..d6fc8bc 100644 --- a/wol.go +++ b/wol.go @@ -3,7 +3,6 @@ package kvm import ( "bytes" "encoding/binary" - "fmt" "net" ) @@ -12,7 +11,7 @@ func rpcSendWOLMagicPacket(macAddress string) error { // Parse the MAC address mac, err := net.ParseMAC(macAddress) if err != nil { - return fmt.Errorf("invalid MAC address: %v", err) + return ErrorfL(&wolLogger, "invalid MAC address", err) } // Create the magic packet @@ -21,16 +20,18 @@ func rpcSendWOLMagicPacket(macAddress string) error { // Set up UDP connection conn, err := net.Dial("udp", "255.255.255.255:9") if err != nil { - return fmt.Errorf("failed to establish UDP connection: %v", err) + return ErrorfL(&wolLogger, "failed to establish UDP connection", err) } defer conn.Close() // Send the packet _, err = conn.Write(packet) if err != nil { - return fmt.Errorf("failed to send WOL packet: %v", err) + return ErrorfL(&wolLogger, "failed to send WOL packet", err) } + wolLogger.Info().Str("mac", macAddress).Msg("WOL packet sent") + return nil } From 0ba7902f82cfd574d760d4aece4c7d3c4771e319 Mon Sep 17 00:00:00 2001 From: Siyuan Miao Date: Fri, 11 Apr 2025 12:55:36 +0200 Subject: [PATCH 09/13] chore: update logging --- config.go | 4 + display.go | 9 ++- jsonrpc.go | 8 +- log.go | 227 +++++++++++++++++++++++++++++++++++++---------------- main.go | 10 ++- native.go | 56 ++++++++----- network.go | 19 +++-- ota.go | 2 +- usb.go | 2 +- web.go | 2 +- web_tls.go | 4 +- webrtc.go | 2 +- wol.go | 6 +- 13 files changed, 246 insertions(+), 105 deletions(-) diff --git a/config.go b/config.go index f19b6e0..cf096a7 100644 --- a/config.go +++ b/config.go @@ -93,6 +93,7 @@ type Config struct { TLSMode string `json:"tls_mode"` // options: "self-signed", "user-defined", "" UsbConfig *usbgadget.Config `json:"usb_config"` UsbDevices *usbgadget.Devices `json:"usb_devices"` + DefaultLogLevel string `json:"default_log_level"` } const configPath = "/userdata/kvm_config.json" @@ -120,6 +121,7 @@ var defaultConfig = &Config{ Keyboard: true, MassStorage: true, }, + DefaultLogLevel: "INFO", } var ( @@ -163,6 +165,8 @@ func LoadConfig() { } config = &loadedConfig + + rootLogger.UpdateLogLevel() } func SaveConfig() error { diff --git a/display.go b/display.go index a5528a3..38e12b1 100644 --- a/display.go +++ b/display.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "strconv" + "sync" "time" ) @@ -70,9 +71,15 @@ func updateDisplay() { } } -var displayInited = false +var ( + displayInited = false + displayUpdateLock = sync.Mutex{} +) func requestDisplayUpdate() { + displayUpdateLock.Lock() + defer displayUpdateLock.Unlock() + if !displayInited { displayLogger.Info().Msg("display not inited, skipping updates") return diff --git a/jsonrpc.go b/jsonrpc.go index d788e86..0c7d7fd 100644 --- a/jsonrpc.go +++ b/jsonrpc.go @@ -74,11 +74,15 @@ func writeJSONRPCEvent(event string, params interface{}, session *Session) { } requestString := string(requestBytes) - jsonRpcLogger.Info().Str("data", requestString).Msg("Sending JSONRPC event") + scopedLogger := jsonRpcLogger.With(). + Str("data", requestString). + Logger() + + scopedLogger.Info().Msg("sending JSONRPC event") err = session.RPCChannel.SendText(requestString) if err != nil { - jsonRpcLogger.Warn().Err(err).Str("data", requestString).Msg("Error sending JSONRPC event") + scopedLogger.Warn().Err(err).Msg("error sending JSONRPC event") return } } diff --git a/log.go b/log.go index 3ee8473..b97a473 100644 --- a/log.go +++ b/log.go @@ -12,8 +12,36 @@ import ( "github.com/rs/zerolog" ) +type Logger struct { + l *zerolog.Logger + scopeLoggers map[string]*zerolog.Logger + scopeLevels map[string]zerolog.Level + scopeLevelMutex sync.Mutex + + defaultLogLevelFromEnv zerolog.Level + defaultLogLevelFromConfig zerolog.Level + defaultLogLevel zerolog.Level +} + +const ( + defaultLogLevel = zerolog.ErrorLevel +) + +type logOutput struct { + mu *sync.Mutex +} + +func (w *logOutput) Write(p []byte) (n int, err error) { + w.mu.Lock() + defer w.mu.Unlock() + + // TODO: write to file or syslog + + return len(p), nil +} + var ( - defaultLogOutput io.Writer = zerolog.ConsoleWriter{ + consoleLogOutput io.Writer = zerolog.ConsoleWriter{ Out: os.Stdout, TimeFormat: time.RFC3339, PartsOrder: []string{"time", "level", "scope", "component", "message"}, @@ -28,57 +56,10 @@ var ( return val }, } - defaultLogLevel = zerolog.ErrorLevel - rootLogger = zerolog.New(defaultLogOutput).With(). - Str("scope", "jetkvm"). - Timestamp(). - Stack(). - Logger() -) + fileLogOutput io.Writer = &logOutput{mu: &sync.Mutex{}} + defaultLogOutput = zerolog.MultiLevelWriter(consoleLogOutput, fileLogOutput) -var ( - scopeLevels map[string]zerolog.Level - scopeLevelMutex = sync.Mutex{} -) - -var ( - logger = getLogger("jetkvm") - cloudLogger = getLogger("cloud") - websocketLogger = getLogger("websocket") - webrtcLogger = getLogger("webrtc") - nativeLogger = getLogger("native") - ntpLogger = getLogger("ntp") - jsonRpcLogger = getLogger("jsonrpc") - watchdogLogger = getLogger("watchdog") - websecureLogger = getLogger("websecure") - otaLogger = getLogger("ota") - serialLogger = getLogger("serial") - terminalLogger = getLogger("terminal") - displayLogger = getLogger("display") - wolLogger = getLogger("wol") - usbLogger = getLogger("usb") - // external components - ginLogger = getLogger("gin") -) - -func ErrorfL(l *zerolog.Logger, format string, err error, args ...interface{}) error { - if l == nil { - l = &logger - } - - l.Error().Err(err).Msgf(format, args...) - - err_msg := err.Error() + ": %v" - err_args := append(args, err) - - return fmt.Errorf(err_msg, err_args...) -} - -func updateLogLevel() { - scopeLevelMutex.Lock() - defer scopeLevelMutex.Unlock() - - logLevels := map[string]zerolog.Level{ + zerologLevels = map[string]zerolog.Level{ "DISABLE": zerolog.Disabled, "NOLEVEL": zerolog.NoLevel, "PANIC": zerolog.PanicLevel, @@ -90,9 +71,35 @@ func updateLogLevel() { "TRACE": zerolog.TraceLevel, } - scopeLevels = make(map[string]zerolog.Level) + rootZerologLogger = zerolog.New(defaultLogOutput).With(). + Str("scope", "jetkvm"). + Timestamp(). + Stack(). + Logger() + rootLogger = NewLogger(rootZerologLogger) +) - for name, level := range logLevels { +func NewLogger(zerologLogger zerolog.Logger) *Logger { + return &Logger{ + l: &zerologLogger, + scopeLoggers: make(map[string]*zerolog.Logger), + scopeLevels: make(map[string]zerolog.Level), + scopeLevelMutex: sync.Mutex{}, + defaultLogLevelFromEnv: -2, + defaultLogLevelFromConfig: -2, + defaultLogLevel: defaultLogLevel, + } +} + +func (l *Logger) updateLogLevel() { + l.scopeLevelMutex.Lock() + defer l.scopeLevelMutex.Unlock() + + l.scopeLevels = make(map[string]zerolog.Level) + + finalDefaultLogLevel := l.defaultLogLevel + + for name, level := range zerologLevels { env := os.Getenv(fmt.Sprintf("JETKVM_LOG_%s", name)) if env == "" { @@ -108,8 +115,10 @@ func updateLogLevel() { } if strings.ToLower(env) == "all" { - if defaultLogLevel > level { - defaultLogLevel = level + l.defaultLogLevelFromEnv = level + + if finalDefaultLogLevel > level { + finalDefaultLogLevel = level } continue @@ -117,26 +126,112 @@ func updateLogLevel() { scopes := strings.Split(strings.ToLower(env), ",") for _, scope := range scopes { - scopeLevels[scope] = level + l.scopeLevels[scope] = level + } + } + + l.defaultLogLevel = finalDefaultLogLevel +} + +func (l *Logger) getScopeLoggerLevel(scope string) zerolog.Level { + if l.scopeLevels == nil { + l.updateLogLevel() + } + + var scopeLevel zerolog.Level + if l.defaultLogLevelFromConfig != -2 { + scopeLevel = l.defaultLogLevelFromConfig + } + if l.defaultLogLevelFromEnv != -2 { + scopeLevel = l.defaultLogLevelFromEnv + } + + // if the scope is not in the map, use the default level from the root logger + if level, ok := l.scopeLevels[scope]; ok { + scopeLevel = level + } + + return scopeLevel +} + +func (l *Logger) newScopeLogger(scope string) zerolog.Logger { + scopeLevel := l.getScopeLoggerLevel(scope) + logger := l.l.Level(scopeLevel).With().Str("component", scope).Logger() + + return logger +} + +func (l *Logger) getLogger(scope string) *zerolog.Logger { + logger, ok := l.scopeLoggers[scope] + if !ok || logger == nil { + scopeLogger := l.newScopeLogger(scope) + l.scopeLoggers[scope] = &scopeLogger + } + + return l.scopeLoggers[scope] +} + +func (l *Logger) UpdateLogLevel() { + needUpdate := false + + if config != nil && config.DefaultLogLevel != "" { + if logLevel, ok := zerologLevels[config.DefaultLogLevel]; ok { + l.defaultLogLevelFromConfig = logLevel + } else { + l.l.Warn().Str("logLevel", config.DefaultLogLevel).Msg("invalid defaultLogLevel from config, using ERROR") + } + + if l.defaultLogLevelFromConfig != l.defaultLogLevel { + needUpdate = true + } + } + + l.updateLogLevel() + + if needUpdate { + for scope, logger := range l.scopeLoggers { + currentLevel := logger.GetLevel() + targetLevel := l.getScopeLoggerLevel(scope) + if currentLevel != targetLevel { + *logger = l.newScopeLogger(scope) + } } } } -func getLogger(scope string) zerolog.Logger { - if scopeLevels == nil { - updateLogLevel() +func ErrorfL(l *zerolog.Logger, format string, err error, args ...interface{}) error { + if l == nil { + l = rootLogger.getLogger("jetkvm") } - l := rootLogger.With().Str("component", scope).Logger() + l.Error().Err(err).Msgf(format, args...) - // if the scope is not in the map, use the default level from the root logger - if level, ok := scopeLevels[scope]; ok { - return l.Level(level) - } + err_msg := err.Error() + ": %v" + err_args := append(args, err) - return l.Level(defaultLogLevel) + return fmt.Errorf(err_msg, err_args...) } +var ( + logger = rootLogger.getLogger("jetkvm") + cloudLogger = rootLogger.getLogger("cloud") + websocketLogger = rootLogger.getLogger("websocket") + webrtcLogger = rootLogger.getLogger("webrtc") + nativeLogger = rootLogger.getLogger("native") + ntpLogger = rootLogger.getLogger("ntp") + jsonRpcLogger = rootLogger.getLogger("jsonrpc") + watchdogLogger = rootLogger.getLogger("watchdog") + websecureLogger = rootLogger.getLogger("websecure") + otaLogger = rootLogger.getLogger("ota") + serialLogger = rootLogger.getLogger("serial") + terminalLogger = rootLogger.getLogger("terminal") + displayLogger = rootLogger.getLogger("display") + wolLogger = rootLogger.getLogger("wol") + usbLogger = rootLogger.getLogger("usb") + // external components + ginLogger = rootLogger.getLogger("gin") +) + type pionLogger struct { logger *zerolog.Logger } @@ -180,7 +275,7 @@ func (c pionLogger) Errorf(format string, args ...interface{}) { type pionLoggerFactory struct{} func (c pionLoggerFactory) NewLogger(subsystem string) logging.LeveledLogger { - logger := getLogger(subsystem).With(). + logger := rootLogger.getLogger(subsystem).With(). Str("scope", "pion"). Str("component", subsystem). Logger() diff --git a/main.go b/main.go index d74b1ef..9eab708 100644 --- a/main.go +++ b/main.go @@ -14,22 +14,26 @@ import ( var appCtx context.Context func Main() { + LoadConfig() + logger.Debug().Msg("config loaded") + var cancel context.CancelFunc appCtx, cancel = context.WithCancel(context.Background()) defer cancel() - logger.Info().Msg("Starting JetKvm") + logger.Info().Msg("starting JetKvm") + go runWatchdog() go confirmCurrentSystem() http.DefaultClient.Timeout = 1 * time.Minute - LoadConfig() - logger.Debug().Msg("config loaded") err := rootcerts.UpdateDefaultTransport() if err != nil { logger.Warn().Err(err).Msg("failed to load CA certs") } + initNetwork() + go TimeSyncLoop() StartNativeCtrlSocketServer() diff --git a/native.go b/native.go index 630093b..ded7dc4 100644 --- a/native.go +++ b/native.go @@ -75,25 +75,29 @@ func CallCtrlAction(action string, params map[string]interface{}) (*CtrlResponse return nil, fmt.Errorf("error marshaling ctrl action: %w", err) } - nativeLogger.Info().Str("action", ctrlAction.Action).Msg("sending ctrl action") + scopedLogger := nativeLogger.With(). + Str("action", ctrlAction.Action). + Interface("params", ctrlAction.Params).Logger() + + scopedLogger.Debug().Msg("sending ctrl action") err = WriteCtrlMessage(jsonData) if err != nil { delete(ongoingRequests, ctrlAction.Seq) - return nil, fmt.Errorf("error writing ctrl message: %w", err) + return nil, ErrorfL(&scopedLogger, "error writing ctrl message", err) } select { case response := <-responseChan: delete(ongoingRequests, seq) if response.Error != "" { - return nil, fmt.Errorf("error native response: %s", response.Error) + return nil, ErrorfL(&scopedLogger, "error native response: %s", fmt.Errorf(response.Error)) } return response, nil case <-time.After(5 * time.Second): close(responseChan) delete(ongoingRequests, seq) - return nil, fmt.Errorf("timeout waiting for response") + return nil, ErrorfL(&scopedLogger, "timeout waiting for response", nil) } } @@ -115,31 +119,35 @@ func waitCtrlClientConnected() { } func StartNativeSocketServer(socketPath string, handleClient func(net.Conn), isCtrl bool) net.Listener { + scopedLogger := nativeLogger.With(). + Str("socket_path", socketPath). + Logger() + // Remove the socket file if it already exists if _, err := os.Stat(socketPath); err == nil { if err := os.Remove(socketPath); err != nil { - nativeLogger.Warn().Err(err).Str("socket_path", socketPath).Msg("Failed to remove existing socket file") + scopedLogger.Warn().Err(err).Msg("failed to remove existing socket file") os.Exit(1) } } listener, err := net.Listen("unixpacket", socketPath) if err != nil { - nativeLogger.Warn().Err(err).Str("socket_path", socketPath).Msg("Failed to start server") + scopedLogger.Warn().Err(err).Msg("failed to start server") os.Exit(1) } - nativeLogger.Info().Str("socket_path", socketPath).Msg("Server listening") + scopedLogger.Info().Msg("server listening") go func() { conn, err := listener.Accept() listener.Close() if err != nil { - nativeLogger.Warn().Err(err).Str("socket_path", socketPath).Msg("failed to accept sock") + scopedLogger.Warn().Err(err).Msg("failed to accept socket") } if isCtrl { close(ctrlClientConnected) - nativeLogger.Debug().Msg("first native ctrl socket client connected") + scopedLogger.Debug().Msg("first native ctrl socket client connected") } handleClient(conn) }() @@ -160,9 +168,14 @@ func StartNativeVideoSocketServer() { func handleCtrlClient(conn net.Conn) { defer conn.Close() - nativeLogger.Debug().Msg("native socket client connected") + scopedLogger := nativeLogger.With(). + Str("addr", conn.RemoteAddr().String()). + Str("type", "ctrl"). + Logger() + + scopedLogger.Info().Msg("native ctrl socket client connected") if ctrlSocketConn != nil { - nativeLogger.Debug().Msg("closing existing native socket connection") + scopedLogger.Debug().Msg("closing existing native socket connection") ctrlSocketConn.Close() } @@ -175,7 +188,7 @@ func handleCtrlClient(conn net.Conn) { for { n, err := conn.Read(readBuf) if err != nil { - nativeLogger.Warn().Err(err).Msg("error reading from ctrl sock") + scopedLogger.Warn().Err(err).Msg("error reading from ctrl sock") break } readMsg := string(readBuf[:n]) @@ -183,10 +196,10 @@ func handleCtrlClient(conn net.Conn) { ctrlResp := CtrlResponse{} err = json.Unmarshal([]byte(readMsg), &ctrlResp) if err != nil { - nativeLogger.Warn().Err(err).Str("data", readMsg).Msg("error parsing ctrl sock msg") + scopedLogger.Warn().Err(err).Str("data", readMsg).Msg("error parsing ctrl sock msg") continue } - nativeLogger.Trace().Interface("data", ctrlResp).Msg("ctrl sock msg") + scopedLogger.Trace().Interface("data", ctrlResp).Msg("ctrl sock msg") if ctrlResp.Seq != 0 { responseChan, ok := ongoingRequests[ctrlResp.Seq] @@ -200,20 +213,25 @@ func handleCtrlClient(conn net.Conn) { } } - nativeLogger.Debug().Msg("ctrl sock disconnected") + scopedLogger.Debug().Msg("ctrl sock disconnected") } func handleVideoClient(conn net.Conn) { defer conn.Close() - nativeLogger.Info().Str("addr", conn.RemoteAddr().String()).Msg("Native video socket client connected") + scopedLogger := nativeLogger.With(). + Str("addr", conn.RemoteAddr().String()). + Str("type", "video"). + Logger() + + scopedLogger.Info().Msg("native video socket client connected") inboundPacket := make([]byte, maxFrameSize) lastFrame := time.Now() for { n, err := conn.Read(inboundPacket) if err != nil { - nativeLogger.Warn().Err(err).Msg("error during read") + scopedLogger.Warn().Err(err).Msg("error during read") return } now := time.Now() @@ -222,7 +240,7 @@ func handleVideoClient(conn net.Conn) { if currentSession != nil { err := currentSession.VideoTrack.WriteSample(media.Sample{Data: inboundPacket[:n], Duration: sinceLastFrame}) if err != nil { - nativeLogger.Warn().Err(err).Msg("error writing sample") + scopedLogger.Warn().Err(err).Msg("error writing sample") } } } @@ -277,7 +295,7 @@ func ExtractAndRunNativeBin() error { } }() - nativeLogger.Info().Int("pid", cmd.Process.Pid).Msg("Binary started") + nativeLogger.Info().Int("pid", cmd.Process.Pid).Msg("jetkvm_native binary started") return nil } diff --git a/network.go b/network.go index 4051c06..6948d9a 100644 --- a/network.go +++ b/network.go @@ -105,7 +105,11 @@ func checkNetworkState() { } if newState != networkState { - logger.Info().Msg("network state changed") + logger.Info(). + Interface("newState", newState). + Interface("oldState", networkState). + Msg("network state changed") + // restart MDNS _ = startMDNS() networkState = newState @@ -116,7 +120,7 @@ func checkNetworkState() { func startMDNS() error { // If server was previously running, stop it if mDNSConn != nil { - logger.Info().Msg("Stopping mDNS server") + logger.Info().Msg("stopping mDNS server") err := mDNSConn.Close() if err != nil { logger.Warn().Err(err).Msg("failed to stop mDNS server") @@ -124,7 +128,11 @@ func startMDNS() error { } // Start a new server - logger.Info().Msg("Starting mDNS server on jetkvm.local") + hostname := "jetkvm.local" + + scopedLogger := logger.With().Str("hostname", hostname).Logger() + scopedLogger.Info().Msg("starting mDNS server") + addr4, err := net.ResolveUDPAddr("udp4", mdns.DefaultAddressIPv4) if err != nil { return err @@ -146,10 +154,11 @@ func startMDNS() error { } mDNSConn, err = mdns.Server(ipv4.NewPacketConn(l4), ipv6.NewPacketConn(l6), &mdns.Config{ - LocalNames: []string{"jetkvm.local"}, //TODO: make it configurable + LocalNames: []string{hostname}, //TODO: make it configurable LoggerFactory: defaultLoggerFactory, }) if err != nil { + scopedLogger.Warn().Err(err).Msg("failed to start mDNS server") mDNSConn = nil return err } @@ -190,7 +199,7 @@ func getNTPServersFromDHCPInfo() ([]string, error) { return servers, nil } -func init() { +func initNetwork() { ensureConfigLoaded() updates := make(chan netlink.LinkUpdate) diff --git a/ota.go b/ota.go index ded1353..a5da772 100644 --- a/ota.go +++ b/ota.go @@ -194,7 +194,7 @@ func downloadFile(ctx context.Context, path string, url string, downloadProgress func verifyFile(path string, expectedHash string, verifyProgress *float32, scopedLogger *zerolog.Logger) error { if scopedLogger == nil { - scopedLogger = &otaLogger + scopedLogger = otaLogger } unverifiedPath := path + ".unverified" diff --git a/usb.go b/usb.go index 03ea8a3..3395db4 100644 --- a/usb.go +++ b/usb.go @@ -15,7 +15,7 @@ func initUsbGadget() { "jetkvm", config.UsbDevices, config.UsbConfig, - &usbLogger, + usbLogger, ) go func() { diff --git a/web.go b/web.go index b696637..8ff5929 100644 --- a/web.go +++ b/web.go @@ -69,7 +69,7 @@ func setupRouter() *gin.Engine { r := gin.Default() r.Use(gin_logger.SetLogger( gin_logger.WithLogger(func(*gin.Context, zerolog.Logger) zerolog.Logger { - return ginLogger + return *ginLogger }), )) staticFS, _ := fs.Sub(staticFiles, "static") diff --git a/web_tls.go b/web_tls.go index 048e9fa..2989957 100644 --- a/web_tls.go +++ b/web_tls.go @@ -38,12 +38,12 @@ func initCertStore() { websecureLogger.Warn().Msg("TLS store already initialized, it should not be initialized again") return } - certStore = websecure.NewCertStore(tlsStorePath, &websecureLogger) + certStore = websecure.NewCertStore(tlsStorePath, websecureLogger) certStore.LoadCertificates() certSigner = websecure.NewSelfSigner( certStore, - &websecureLogger, + websecureLogger, webSecureSelfSignedDefaultDomain, webSecureSelfSignedOrganization, webSecureSelfSignedOU, diff --git a/webrtc.go b/webrtc.go index 5bc6623..1e093e2 100644 --- a/webrtc.go +++ b/webrtc.go @@ -77,7 +77,7 @@ func newSession(config SessionConfig) (*Session, error) { l := config.Logger.With().Str("component", "webrtc").Logger() scopedLogger = &l } else { - scopedLogger = &webrtcLogger + scopedLogger = webrtcLogger } if config.IsCloud { diff --git a/wol.go b/wol.go index d6fc8bc..47f921a 100644 --- a/wol.go +++ b/wol.go @@ -11,7 +11,7 @@ func rpcSendWOLMagicPacket(macAddress string) error { // Parse the MAC address mac, err := net.ParseMAC(macAddress) if err != nil { - return ErrorfL(&wolLogger, "invalid MAC address", err) + return ErrorfL(wolLogger, "invalid MAC address", err) } // Create the magic packet @@ -20,14 +20,14 @@ func rpcSendWOLMagicPacket(macAddress string) error { // Set up UDP connection conn, err := net.Dial("udp", "255.255.255.255:9") if err != nil { - return ErrorfL(&wolLogger, "failed to establish UDP connection", err) + return ErrorfL(wolLogger, "failed to establish UDP connection", err) } defer conn.Close() // Send the packet _, err = conn.Write(packet) if err != nil { - return ErrorfL(&wolLogger, "failed to send WOL packet", err) + return ErrorfL(wolLogger, "failed to send WOL packet", err) } wolLogger.Info().Str("mac", macAddress).Msg("WOL packet sent") From 334b3bee60471561061402e4add71194321d8933 Mon Sep 17 00:00:00 2001 From: Siyuan Miao Date: Fri, 11 Apr 2025 13:05:03 +0200 Subject: [PATCH 10/13] chore: fix linting issue --- native.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/native.go b/native.go index ded7dc4..b61598c 100644 --- a/native.go +++ b/native.go @@ -3,6 +3,7 @@ package kvm import ( "bytes" "encoding/json" + "errors" "fmt" "io" "net" @@ -91,7 +92,11 @@ func CallCtrlAction(action string, params map[string]interface{}) (*CtrlResponse case response := <-responseChan: delete(ongoingRequests, seq) if response.Error != "" { - return nil, ErrorfL(&scopedLogger, "error native response: %s", fmt.Errorf(response.Error)) + return nil, ErrorfL( + &scopedLogger, + "error native response: %s", + errors.New(response.Error), + ) } return response, nil case <-time.After(5 * time.Second): From 8888d13824ffbc5424940bd3cb91b3a442f10e1f Mon Sep 17 00:00:00 2001 From: Siyuan Miao Date: Fri, 11 Apr 2025 13:08:52 +0200 Subject: [PATCH 11/13] chore(log): add nbdLogger --- block_device.go | 20 ++++++++++++++++---- log.go | 1 + 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/block_device.go b/block_device.go index 4a8769e..d93f474 100644 --- a/block_device.go +++ b/block_device.go @@ -9,6 +9,7 @@ import ( "github.com/pojntfx/go-nbd/pkg/client" "github.com/pojntfx/go-nbd/pkg/server" + "github.com/rs/zerolog" ) type remoteImageBackend struct { @@ -72,6 +73,8 @@ type NBDDevice struct { serverConn net.Conn clientConn net.Conn dev *os.File + + l *zerolog.Logger } func NewNBDDevice() *NBDDevice { @@ -90,10 +93,18 @@ func (d *NBDDevice) Start() error { return err } + if d.l == nil { + scopedLogger := nbdLogger.With(). + Str("socket_path", nbdSocketPath). + Str("device_path", nbdDevicePath). + Logger() + d.l = &scopedLogger + } + // Remove the socket file if it already exists if _, err := os.Stat(nbdSocketPath); err == nil { if err := os.Remove(nbdSocketPath); err != nil { - nativeLogger.Warn().Err(err).Str("socket_path", nbdSocketPath).Msg("Failed to remove existing socket file") + d.l.Error().Err(err).Msg("failed to remove existing socket file") os.Exit(1) } } @@ -134,7 +145,8 @@ func (d *NBDDevice) runServerConn() { MaximumBlockSize: uint32(16 * 1024), SupportsMultiConn: false, }) - nativeLogger.Info().Err(err).Msg("nbd server exited") + + d.l.Info().Err(err).Msg("nbd server exited") } func (d *NBDDevice) runClientConn() { @@ -142,14 +154,14 @@ func (d *NBDDevice) runClientConn() { ExportName: "jetkvm", BlockSize: uint32(4 * 1024), }) - nativeLogger.Info().Err(err).Msg("nbd client exited") + d.l.Info().Err(err).Msg("nbd client exited") } func (d *NBDDevice) Close() { if d.dev != nil { err := client.Disconnect(d.dev) if err != nil { - nativeLogger.Warn().Err(err).Msg("error disconnecting nbd client") + d.l.Warn().Err(err).Msg("error disconnecting nbd client") } _ = d.dev.Close() } diff --git a/log.go b/log.go index b97a473..ce5ab84 100644 --- a/log.go +++ b/log.go @@ -218,6 +218,7 @@ var ( websocketLogger = rootLogger.getLogger("websocket") webrtcLogger = rootLogger.getLogger("webrtc") nativeLogger = rootLogger.getLogger("native") + nbdLogger = rootLogger.getLogger("nbd") ntpLogger = rootLogger.getLogger("ntp") jsonRpcLogger = rootLogger.getLogger("jsonrpc") watchdogLogger = rootLogger.getLogger("watchdog") From f98eaddf15a806b572d75a043d8dfc1732df004e Mon Sep 17 00:00:00 2001 From: Siyuan Miao Date: Fri, 11 Apr 2025 13:12:14 +0200 Subject: [PATCH 12/13] chore(log): ntp logger --- ntp.go | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/ntp.go b/ntp.go index aa7d17a..069143d 100644 --- a/ntp.go +++ b/ntp.go @@ -1,7 +1,6 @@ package kvm import ( - "errors" "fmt" "net/http" "os/exec" @@ -111,20 +110,27 @@ func SyncSystemTime() (err error) { func queryNetworkTime() (*time.Time, error) { ntpServers, err := getNTPServersFromDHCPInfo() if err != nil { - ntpLogger.Error().Str("error", err.Error()).Msg("failed to get NTP servers from DHCP info") + ntpLogger.Info().Err(err).Msg("failed to get NTP servers from DHCP info") } if ntpServers == nil { ntpServers = defaultNTPServers - ntpLogger.Info().Str("ntp_servers", fmt.Sprintf("%v", ntpServers)).Msg("Using default NTP servers") + ntpLogger.Info(). + Interface("ntp_servers", ntpServers). + Msg("Using default NTP servers") } else { - ntpLogger.Info().Str("ntp_servers", fmt.Sprintf("%v", ntpServers)).Msg("Using NTP servers from DHCP") + ntpLogger.Info(). + Interface("ntp_servers", ntpServers). + Msg("Using NTP servers from DHCP") } for _, server := range ntpServers { now, err := queryNtpServer(server, timeSyncTimeout) if err == nil { - ntpLogger.Info().Str("ntp_server", server).Str("time", now.Format(time.RFC3339)).Msg("NTP server returned time") + ntpLogger.Info(). + Str("ntp_server", server). + Str("time", now.Format(time.RFC3339)). + Msg("NTP server returned time") return now, nil } } @@ -135,12 +141,15 @@ func queryNetworkTime() (*time.Time, error) { for _, url := range httpUrls { now, err := queryHttpTime(url, timeSyncTimeout) if err == nil { - ntpLogger.Info().Str("http_url", url).Str("time", now.Format(time.RFC3339)).Msg("HTTP server returned time") + ntpLogger.Info(). + Str("http_url", url). + Str("time", now.Format(time.RFC3339)). + Msg("HTTP server returned time") return now, nil } } - ntpLogger.Error().Msg("failed to query network time") - return nil, errors.New("failed to query network time") + + return nil, ErrorfL(ntpLogger, "failed to query network time", nil) } func queryNtpServer(server string, timeout time.Duration) (now *time.Time, err error) { From 94e83249ef829e007ba842a8e55630e9c710ef3c Mon Sep 17 00:00:00 2001 From: Siyuan Miao Date: Fri, 11 Apr 2025 16:03:46 +0200 Subject: [PATCH 13/13] chore(cloud): use request id from the cloud --- cloud.go | 32 ++++++++++++++++++++++++++++---- web.go | 14 +++++++++----- 2 files changed, 37 insertions(+), 9 deletions(-) diff --git a/cloud.go b/cloud.go index 579d1f6..fd96c41 100644 --- a/cloud.go +++ b/cloud.go @@ -12,6 +12,7 @@ import ( "time" "github.com/coder/websocket/wsjson" + "github.com/google/uuid" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -290,13 +291,15 @@ func runWebsocketClient() error { header.Set("Authorization", "Bearer "+config.CloudToken) dialCtx, cancelDial := context.WithTimeout(context.Background(), CloudWebSocketConnectTimeout) - scopedLogger := websocketLogger.With(). + l := websocketLogger.With(). Str("source", wsURL.Host). Str("sourceType", "cloud"). Logger() + scopedLogger := &l + defer cancelDial() - c, _, err := websocket.Dial(dialCtx, wsURL.String(), &websocket.DialOptions{ + c, resp, err := websocket.Dial(dialCtx, wsURL.String(), &websocket.DialOptions{ HTTPHeader: header, OnPingReceived: func(ctx context.Context, payload []byte) bool { scopedLogger.Info().Bytes("payload", payload).Int("length", len(payload)).Msg("ping frame received") @@ -307,6 +310,24 @@ func runWebsocketClient() error { return true }, }) + + // get the request id from the response header + connectionId := resp.Header.Get("X-Request-ID") + if connectionId == "" { + connectionId = resp.Header.Get("Cf-Ray") + } + if connectionId == "" { + connectionId = uuid.New().String() + scopedLogger.Warn(). + Str("connectionId", connectionId). + Msg("no connection id received from the server, generating a new one") + } + + lWithConnectionId := scopedLogger.With(). + Str("connectionID", connectionId). + Logger() + scopedLogger = &lWithConnectionId + // if the context is canceled, we don't want to return an error if err != nil { if errors.Is(err, context.Canceled) { @@ -316,13 +337,16 @@ func runWebsocketClient() error { return err } defer c.CloseNow() //nolint:errcheck - cloudLogger.Info().Str("url", wsURL.String()).Msg("websocket connected") + cloudLogger.Info(). + Str("url", wsURL.String()). + Str("connectionID", connectionId). + Msg("websocket connected") // set the metrics when we successfully connect to the cloud. wsResetMetrics(true, "cloud", wsURL.Host) // we don't have a source for the cloud connection - return handleWebRTCSignalWsMessages(c, true, wsURL.Host, &scopedLogger) + return handleWebRTCSignalWsMessages(c, true, wsURL.Host, connectionId, scopedLogger) } func authenticateSession(ctx context.Context, c *websocket.Conn, req WebRTCSessionRequest) error { diff --git a/web.go b/web.go index 8ff5929..6e74a13 100644 --- a/web.go +++ b/web.go @@ -189,6 +189,7 @@ var ( func handleLocalWebRTCSignal(c *gin.Context) { // get the source from the request source := c.ClientIP() + connectionID := uuid.New().String() scopedLogger := websocketLogger.With(). Str("component", "websocket"). @@ -226,20 +227,23 @@ func handleLocalWebRTCSignal(c *gin.Context) { return } - err = handleWebRTCSignalWsMessages(wsCon, false, source, &scopedLogger) + err = handleWebRTCSignalWsMessages(wsCon, false, source, connectionID, &scopedLogger) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) return } } -func handleWebRTCSignalWsMessages(wsCon *websocket.Conn, isCloudConnection bool, source string, scopedLogger *zerolog.Logger) error { +func handleWebRTCSignalWsMessages( + wsCon *websocket.Conn, + isCloudConnection bool, + source string, + connectionID string, + scopedLogger *zerolog.Logger, +) error { runCtx, cancelRun := context.WithCancel(context.Background()) defer cancelRun() - // Add connection tracking to detect reconnections - connectionID := uuid.New().String() - // connection type var sourceType string if isCloudConnection {