From 1e9adf81d433a23202be881de7a8cb4cbfca9953 Mon Sep 17 00:00:00 2001 From: Siyuan Miao Date: Thu, 3 Apr 2025 18:16:41 +0200 Subject: [PATCH 1/2] chore: skip websocket client if net isn't up or time sync hasn't completed --- cloud.go | 26 +++++++++++++++++++++----- main.go | 8 +++----- ntp.go | 33 +++++++++++++++++++++++++++++++++ 3 files changed, 57 insertions(+), 10 deletions(-) diff --git a/cloud.go b/cloud.go index a30a14c5..4b9c2b47 100644 --- a/cloud.go +++ b/cloud.go @@ -90,11 +90,6 @@ func handleCloudRegister(c *gin.Context) { return } - if config.CloudToken == "" { - cloudLogger.Info("Starting websocket client due to adoption") - go RunWebsocketClient() - } - config.CloudToken = tokenResp.SecretToken provider, err := oidc.NewProvider(c, "https://accounts.google.com") @@ -130,6 +125,7 @@ func runWebsocketClient() error { time.Sleep(5 * time.Second) return fmt.Errorf("cloud token is not set") } + wsURL, err := url.Parse(config.CloudURL) if err != nil { return fmt.Errorf("failed to parse config.CloudURL: %w", err) @@ -253,6 +249,26 @@ func handleSessionRequest(ctx context.Context, c *websocket.Conn, req WebRTCSess func RunWebsocketClient() { for { + // If the cloud token is not set, we don't need to run the websocket client. + if config.CloudToken == "" { + time.Sleep(5 * time.Second) + continue + } + + // If the network is not up, well, we can't connect to the cloud. + if !networkState.Up { + cloudLogger.Warn("waiting for network to be up, will retry in 3 seconds") + time.Sleep(3 * time.Second) + continue + } + + // If the system time is not synchronized, the API request will fail anyway because the TLS handshake will fail. 
+ if isTimeSyncNeeded() && !timeSyncSuccess { + cloudLogger.Warn("system time is not synced, will retry in 3 seconds") + time.Sleep(3 * time.Second) + continue + } + err := runWebsocketClient() if err != nil { cloudLogger.Errorf("websocket client error: %v", err) diff --git a/main.go b/main.go index 6a555959..aeb3d857 100644 --- a/main.go +++ b/main.go @@ -72,11 +72,9 @@ func Main() { if config.TLSMode != "" { go RunWebSecureServer() } - // If the cloud token isn't set, the client won't be started by default. - // However, if the user adopts the device via the web interface, handleCloudRegister will start the client. - if config.CloudToken != "" { - go RunWebsocketClient() - } + // As the websocket client already checks if the cloud token is set, we can start it here. + go RunWebsocketClient() + initSerialPort() sigs := make(chan os.Signal, 1) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) diff --git a/ntp.go b/ntp.go index 39ea7af3..27ec100c 100644 --- a/ntp.go +++ b/ntp.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" "os/exec" + "strconv" "time" "github.com/beevik/ntp" @@ -20,13 +21,41 @@ const ( ) var ( + builtTimestamp string timeSyncRetryInterval = 0 * time.Second + timeSyncSuccess = false defaultNTPServers = []string{ "time.cloudflare.com", "time.apple.com", } ) +func isTimeSyncNeeded() bool { + if builtTimestamp == "" { + logger.Warnf("Built timestamp is not set, time sync is needed") + return true + } + + ts, err := strconv.Atoi(builtTimestamp) + if err != nil { + logger.Warnf("Failed to parse built timestamp: %v", err) + return true + } + + // builtTimestamp is a UNIX timestamp in seconds + builtTime := time.Unix(int64(ts), 0) + now := time.Now() + + logger.Tracef("Built time: %v, now: %v", builtTime, now) + + if now.Sub(builtTime) < 0 { + logger.Warnf("System time is behind the built time, time sync is needed") + return true + } + + return false +} + func TimeSyncLoop() { for { if !networkState.checked { @@ -40,6 +69,9 @@ func TimeSyncLoop() { continue } + 
// check if time sync is needed, but do nothing for now + isTimeSyncNeeded() + logger.Infof("Syncing system time") start := time.Now() err := SyncSystemTime() @@ -56,6 +88,7 @@ func TimeSyncLoop() { continue } + timeSyncSuccess = true logger.Infof("Time sync successful, now is: %v, time taken: %v", time.Now(), time.Since(start)) time.Sleep(timeSyncInterval) // after the first sync is done } From f3b5011d65ade31b34aad01a2c1e670582810f28 Mon Sep 17 00:00:00 2001 From: Siyuan Miao Date: Thu, 3 Apr 2025 19:06:21 +0200 Subject: [PATCH 2/2] feat(cloud): add metrics for cloud connections --- cloud.go | 127 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 127 insertions(+) diff --git a/cloud.go b/cloud.go index 4b9c2b47..be53b087 100644 --- a/cloud.go +++ b/cloud.go @@ -10,6 +10,8 @@ import ( "time" "github.com/coder/websocket/wsjson" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/coreos/go-oidc/v3/oidc" @@ -36,6 +38,97 @@ const ( CloudWebSocketPingInterval = 15 * time.Second ) +var ( + metricCloudConnectionStatus = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "jetkvm_cloud_connection_status", + Help: "The status of the cloud connection", + }, + ) + metricCloudConnectionEstablishedTimestamp = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "jetkvm_cloud_connection_established_timestamp", + Help: "The timestamp when the cloud connection was established", + }, + ) + metricCloudConnectionLastPingTimestamp = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "jetkvm_cloud_connection_last_ping_timestamp", + Help: "The timestamp when the last ping response was received", + }, + ) + metricCloudConnectionLastPingDuration = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "jetkvm_cloud_connection_last_ping_duration", + Help: "The duration of the last ping response", + }, + ) + metricCloudConnectionPingDuration = promauto.NewHistogram( + prometheus.HistogramOpts{ + Name: 
"jetkvm_cloud_connection_ping_duration", + Help: "The duration of the ping response", + Buckets: []float64{ + 0.1, 0.5, 1, 10, + }, + }, + ) + metricCloudConnectionTotalPingCount = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "jetkvm_cloud_connection_total_ping_count", + Help: "The total number of pings sent to the cloud", + }, + ) + metricCloudConnectionSessionRequestCount = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "jetkvm_cloud_connection_session_total_request_count", + Help: "The total number of session requests received from the cloud", + }, + ) + metricCloudConnectionSessionRequestDuration = promauto.NewHistogram( + prometheus.HistogramOpts{ + Name: "jetkvm_cloud_connection_session_request_duration", + Help: "The duration of session requests", + Buckets: []float64{ + 0.1, 0.5, 1, 10, + }, + }, + ) + metricCloudConnectionLastSessionRequestTimestamp = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "jetkvm_cloud_connection_last_session_request_timestamp", + Help: "The timestamp of the last session request", + }, + ) + metricCloudConnectionLastSessionRequestDuration = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "jetkvm_cloud_connection_last_session_request_duration", + Help: "The duration of the last session request", + }, + ) + metricCloudConnectionFailureCount = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "jetkvm_cloud_connection_failure_count", + Help: "The number of times the cloud connection has failed", + }, + ) +) + +func cloudResetMetrics(established bool) { + metricCloudConnectionLastPingTimestamp.Set(-1) + metricCloudConnectionLastPingDuration.Set(-1) + + metricCloudConnectionLastSessionRequestTimestamp.Set(-1) + metricCloudConnectionLastSessionRequestDuration.Set(-1) + + if established { + metricCloudConnectionEstablishedTimestamp.SetToCurrentTime() + metricCloudConnectionStatus.Set(1) + } else { + metricCloudConnectionEstablishedTimestamp.Set(-1) + metricCloudConnectionStatus.Set(-1) + } +} + func 
handleCloudRegister(c *gin.Context) { var req CloudRegisterRequest @@ -130,15 +223,18 @@ func runWebsocketClient() error { if err != nil { return fmt.Errorf("failed to parse config.CloudURL: %w", err) } + if wsURL.Scheme == "http" { wsURL.Scheme = "ws" } else { wsURL.Scheme = "wss" } + header := http.Header{} header.Set("X-Device-ID", GetDeviceID()) header.Set("Authorization", "Bearer "+config.CloudToken) dialCtx, cancelDial := context.WithTimeout(context.Background(), CloudWebSocketConnectTimeout) + defer cancelDial() c, _, err := websocket.Dial(dialCtx, wsURL.String(), &websocket.DialOptions{ HTTPHeader: header, @@ -148,17 +244,35 @@ func runWebsocketClient() error { } defer c.CloseNow() //nolint:errcheck cloudLogger.Infof("websocket connected to %s", wsURL) + + // set the metrics when we successfully connect to the cloud. + cloudResetMetrics(true) + runCtx, cancelRun := context.WithCancel(context.Background()) defer cancelRun() go func() { for { time.Sleep(CloudWebSocketPingInterval) + + // set the timer for the ping duration + timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) { + metricCloudConnectionLastPingDuration.Set(v) + metricCloudConnectionPingDuration.Observe(v) + })) + err := c.Ping(runCtx) + if err != nil { cloudLogger.Warnf("websocket ping error: %v", err) cancelRun() return } + + // don't use `defer` here because we want to observe the duration of each ping as soon as it completes + timer.ObserveDuration() + + metricCloudConnectionTotalPingCount.Inc() + metricCloudConnectionLastPingTimestamp.SetToCurrentTime() } }() for { @@ -180,6 +294,8 @@ func runWebsocketClient() error { cloudLogger.Infof("new session request: %v", req.OidcGoogle) cloudLogger.Tracef("session request info: %v", req) + metricCloudConnectionSessionRequestCount.Inc() + metricCloudConnectionLastSessionRequestTimestamp.SetToCurrentTime() err = handleSessionRequest(runCtx, c, req) if err != nil { cloudLogger.Infof("error starting new session: %v", err) @@ -189,6 +305,12 @@ func 
runWebsocketClient() error { } func handleSessionRequest(ctx context.Context, c *websocket.Conn, req WebRTCSessionRequest) error { + timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) { + metricCloudConnectionLastSessionRequestDuration.Set(v) + metricCloudConnectionSessionRequestDuration.Observe(v) + })) + defer timer.ObserveDuration() + oidcCtx, cancelOIDC := context.WithTimeout(ctx, CloudOidcRequestTimeout) defer cancelOIDC() provider, err := oidc.NewProvider(oidcCtx, "https://accounts.google.com") @@ -249,6 +371,9 @@ func handleSessionRequest(ctx context.Context, c *websocket.Conn, req WebRTCSess func RunWebsocketClient() { for { + // reset the metrics when we start the websocket client. + cloudResetMetrics(false) + // If the cloud token is not set, we don't need to run the websocket client. if config.CloudToken == "" { time.Sleep(5 * time.Second) @@ -272,6 +397,8 @@ func RunWebsocketClient() { err := runWebsocketClient() if err != nil { cloudLogger.Errorf("websocket client error: %v", err) + metricCloudConnectionStatus.Set(0) + metricCloudConnectionFailureCount.Inc() time.Sleep(5 * time.Second) } }