From f3b5011d65ade31b34aad01a2c1e670582810f28 Mon Sep 17 00:00:00 2001 From: Siyuan Miao Date: Thu, 3 Apr 2025 19:06:21 +0200 Subject: [PATCH] feat(cloud): add metrics for cloud connections --- cloud.go | 127 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 127 insertions(+) diff --git a/cloud.go b/cloud.go index 4b9c2b4..be53b08 100644 --- a/cloud.go +++ b/cloud.go @@ -10,6 +10,8 @@ import ( "time" "github.com/coder/websocket/wsjson" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/coreos/go-oidc/v3/oidc" @@ -36,6 +38,97 @@ const ( CloudWebSocketPingInterval = 15 * time.Second ) +var ( + metricCloudConnectionStatus = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "jetkvm_cloud_connection_status", + Help: "The status of the cloud connection", + }, + ) + metricCloudConnectionEstablishedTimestamp = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "jetkvm_cloud_connection_established_timestamp", + Help: "The timestamp when the cloud connection was established", + }, + ) + metricCloudConnectionLastPingTimestamp = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "jetkvm_cloud_connection_last_ping_timestamp", + Help: "The timestamp when the last ping response was received", + }, + ) + metricCloudConnectionLastPingDuration = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "jetkvm_cloud_connection_last_ping_duration", + Help: "The duration of the last ping response", + }, + ) + metricCloudConnectionPingDuration = promauto.NewHistogram( + prometheus.HistogramOpts{ + Name: "jetkvm_cloud_connection_ping_duration", + Help: "The duration of the ping response", + Buckets: []float64{ + 0.1, 0.5, 1, 10, + }, + }, + ) + metricCloudConnectionTotalPingCount = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "jetkvm_cloud_connection_total_ping_count", + Help: "The total number of pings sent to the cloud", + }, + ) + metricCloudConnectionSessionRequestCount = 
promauto.NewCounter( + prometheus.CounterOpts{ + Name: "jetkvm_cloud_connection_session_total_request_count", + Help: "The total number of session requests received from the cloud", + }, + ) + metricCloudConnectionSessionRequestDuration = promauto.NewHistogram( + prometheus.HistogramOpts{ + Name: "jetkvm_cloud_connection_session_request_duration", + Help: "The duration of session requests", + Buckets: []float64{ + 0.1, 0.5, 1, 10, + }, + }, + ) + metricCloudConnectionLastSessionRequestTimestamp = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "jetkvm_cloud_connection_last_session_request_timestamp", + Help: "The timestamp of the last session request", + }, + ) + metricCloudConnectionLastSessionRequestDuration = promauto.NewGauge( + prometheus.GaugeOpts{ + Name: "jetkvm_cloud_connection_last_session_request_duration", + Help: "The duration of the last session request", + }, + ) + metricCloudConnectionFailureCount = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "jetkvm_cloud_connection_failure_count", + Help: "The number of times the cloud connection has failed", + }, + ) +) + +func cloudResetMetrics(established bool) { + metricCloudConnectionLastPingTimestamp.Set(-1) + metricCloudConnectionLastPingDuration.Set(-1) + + metricCloudConnectionLastSessionRequestTimestamp.Set(-1) + metricCloudConnectionLastSessionRequestDuration.Set(-1) + + if established { + metricCloudConnectionEstablishedTimestamp.SetToCurrentTime() + metricCloudConnectionStatus.Set(1) + } else { + metricCloudConnectionEstablishedTimestamp.Set(-1) + metricCloudConnectionStatus.Set(-1) + } +} + func handleCloudRegister(c *gin.Context) { var req CloudRegisterRequest @@ -130,15 +223,18 @@ func runWebsocketClient() error { if err != nil { return fmt.Errorf("failed to parse config.CloudURL: %w", err) } + if wsURL.Scheme == "http" { wsURL.Scheme = "ws" } else { wsURL.Scheme = "wss" } + header := http.Header{} header.Set("X-Device-ID", GetDeviceID()) header.Set("Authorization", "Bearer 
"+config.CloudToken) dialCtx, cancelDial := context.WithTimeout(context.Background(), CloudWebSocketConnectTimeout) + defer cancelDial() c, _, err := websocket.Dial(dialCtx, wsURL.String(), &websocket.DialOptions{ HTTPHeader: header, @@ -148,17 +244,35 @@ func runWebsocketClient() error { } defer c.CloseNow() //nolint:errcheck cloudLogger.Infof("websocket connected to %s", wsURL) + + // set the metrics when we successfully connect to the cloud. + cloudResetMetrics(true) + runCtx, cancelRun := context.WithCancel(context.Background()) defer cancelRun() go func() { for { time.Sleep(CloudWebSocketPingInterval) + + // set the timer for the ping duration + timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) { + metricCloudConnectionLastPingDuration.Set(v) + metricCloudConnectionPingDuration.Observe(v) + })) + err := c.Ping(runCtx) + if err != nil { cloudLogger.Warnf("websocket ping error: %v", err) cancelRun() return } + + // don't use `defer` here because we want to observe the duration of the ping + timer.ObserveDuration() + + metricCloudConnectionTotalPingCount.Inc() + metricCloudConnectionLastPingTimestamp.SetToCurrentTime() } }() for { @@ -180,6 +294,8 @@ func runWebsocketClient() error { cloudLogger.Infof("new session request: %v", req.OidcGoogle) cloudLogger.Tracef("session request info: %v", req) + metricCloudConnectionSessionRequestCount.Inc() + metricCloudConnectionLastSessionRequestTimestamp.SetToCurrentTime() err = handleSessionRequest(runCtx, c, req) if err != nil { cloudLogger.Infof("error starting new session: %v", err) @@ -189,6 +305,12 @@ func runWebsocketClient() error { } func handleSessionRequest(ctx context.Context, c *websocket.Conn, req WebRTCSessionRequest) error { + timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) { + metricCloudConnectionLastSessionRequestDuration.Set(v) + metricCloudConnectionSessionRequestDuration.Observe(v) + })) + defer timer.ObserveDuration() + oidcCtx, cancelOIDC := 
context.WithTimeout(ctx, CloudOidcRequestTimeout) defer cancelOIDC() provider, err := oidc.NewProvider(oidcCtx, "https://accounts.google.com") @@ -249,6 +371,9 @@ func handleSessionRequest(ctx context.Context, c *websocket.Conn, req WebRTCSess func RunWebsocketClient() { for { + // reset the metrics when we start the websocket client. + cloudResetMetrics(false) + // If the cloud token is not set, we don't need to run the websocket client. if config.CloudToken == "" { time.Sleep(5 * time.Second) @@ -272,6 +397,8 @@ func RunWebsocketClient() { err := runWebsocketClient() if err != nil { cloudLogger.Errorf("websocket client error: %v", err) + metricCloudConnectionStatus.Set(0) + metricCloudConnectionFailureCount.Inc() time.Sleep(5 * time.Second) } }