feat(cloud): add metrics for cloud connections

This commit is contained in:
Siyuan Miao 2025-04-03 19:06:21 +02:00
parent 1e9adf81d4
commit f3b5011d65
1 changed files with 127 additions and 0 deletions

127
cloud.go
View File

@ -10,6 +10,8 @@ import (
"time" "time"
"github.com/coder/websocket/wsjson" "github.com/coder/websocket/wsjson"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/coreos/go-oidc/v3/oidc" "github.com/coreos/go-oidc/v3/oidc"
@ -36,6 +38,97 @@ const (
CloudWebSocketPingInterval = 15 * time.Second CloudWebSocketPingInterval = 15 * time.Second
) )
var (
metricCloudConnectionStatus = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "jetkvm_cloud_connection_status",
Help: "The status of the cloud connection",
},
)
metricCloudConnectionEstablishedTimestamp = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "jetkvm_cloud_connection_established_timestamp",
Help: "The timestamp when the cloud connection was established",
},
)
metricCloudConnectionLastPingTimestamp = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "jetkvm_cloud_connection_last_ping_timestamp",
Help: "The timestamp when the last ping response was received",
},
)
metricCloudConnectionLastPingDuration = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "jetkvm_cloud_connection_last_ping_duration",
Help: "The duration of the last ping response",
},
)
metricCloudConnectionPingDuration = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "jetkvm_cloud_connection_ping_duration",
Help: "The duration of the ping response",
Buckets: []float64{
0.1, 0.5, 1, 10,
},
},
)
metricCloudConnectionTotalPingCount = promauto.NewCounter(
prometheus.CounterOpts{
Name: "jetkvm_cloud_connection_total_ping_count",
Help: "The total number of pings sent to the cloud",
},
)
metricCloudConnectionSessionRequestCount = promauto.NewCounter(
prometheus.CounterOpts{
Name: "jetkvm_cloud_connection_session_total_request_count",
Help: "The total number of session requests received from the cloud",
},
)
metricCloudConnectionSessionRequestDuration = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "jetkvm_cloud_connection_session_request_duration",
Help: "The duration of session requests",
Buckets: []float64{
0.1, 0.5, 1, 10,
},
},
)
metricCloudConnectionLastSessionRequestTimestamp = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "jetkvm_cloud_connection_last_session_request_timestamp",
Help: "The timestamp of the last session request",
},
)
metricCloudConnectionLastSessionRequestDuration = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "jetkvm_cloud_connection_last_session_request_duration",
Help: "The duration of the last session request",
},
)
metricCloudConnectionFailureCount = promauto.NewCounter(
prometheus.CounterOpts{
Name: "jetkvm_cloud_connection_failure_count",
Help: "The number of times the cloud connection has failed",
},
)
)
func cloudResetMetrics(established bool) {
metricCloudConnectionLastPingTimestamp.Set(-1)
metricCloudConnectionLastPingDuration.Set(-1)
metricCloudConnectionLastSessionRequestTimestamp.Set(-1)
metricCloudConnectionLastSessionRequestDuration.Set(-1)
if established {
metricCloudConnectionEstablishedTimestamp.SetToCurrentTime()
metricCloudConnectionStatus.Set(1)
} else {
metricCloudConnectionEstablishedTimestamp.Set(-1)
metricCloudConnectionStatus.Set(-1)
}
}
func handleCloudRegister(c *gin.Context) { func handleCloudRegister(c *gin.Context) {
var req CloudRegisterRequest var req CloudRegisterRequest
@ -130,15 +223,18 @@ func runWebsocketClient() error {
if err != nil { if err != nil {
return fmt.Errorf("failed to parse config.CloudURL: %w", err) return fmt.Errorf("failed to parse config.CloudURL: %w", err)
} }
if wsURL.Scheme == "http" { if wsURL.Scheme == "http" {
wsURL.Scheme = "ws" wsURL.Scheme = "ws"
} else { } else {
wsURL.Scheme = "wss" wsURL.Scheme = "wss"
} }
header := http.Header{} header := http.Header{}
header.Set("X-Device-ID", GetDeviceID()) header.Set("X-Device-ID", GetDeviceID())
header.Set("Authorization", "Bearer "+config.CloudToken) header.Set("Authorization", "Bearer "+config.CloudToken)
dialCtx, cancelDial := context.WithTimeout(context.Background(), CloudWebSocketConnectTimeout) dialCtx, cancelDial := context.WithTimeout(context.Background(), CloudWebSocketConnectTimeout)
defer cancelDial() defer cancelDial()
c, _, err := websocket.Dial(dialCtx, wsURL.String(), &websocket.DialOptions{ c, _, err := websocket.Dial(dialCtx, wsURL.String(), &websocket.DialOptions{
HTTPHeader: header, HTTPHeader: header,
@ -148,17 +244,35 @@ func runWebsocketClient() error {
} }
defer c.CloseNow() //nolint:errcheck defer c.CloseNow() //nolint:errcheck
cloudLogger.Infof("websocket connected to %s", wsURL) cloudLogger.Infof("websocket connected to %s", wsURL)
// set the metrics when we successfully connect to the cloud.
cloudResetMetrics(true)
runCtx, cancelRun := context.WithCancel(context.Background()) runCtx, cancelRun := context.WithCancel(context.Background())
defer cancelRun() defer cancelRun()
go func() { go func() {
for { for {
time.Sleep(CloudWebSocketPingInterval) time.Sleep(CloudWebSocketPingInterval)
// set the timer for the ping duration
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
metricCloudConnectionLastPingDuration.Set(v)
metricCloudConnectionPingDuration.Observe(v)
}))
err := c.Ping(runCtx) err := c.Ping(runCtx)
if err != nil { if err != nil {
cloudLogger.Warnf("websocket ping error: %v", err) cloudLogger.Warnf("websocket ping error: %v", err)
cancelRun() cancelRun()
return return
} }
// dont use `defer` here because we want to observe the duration of the ping
timer.ObserveDuration()
metricCloudConnectionTotalPingCount.Inc()
metricCloudConnectionLastPingTimestamp.SetToCurrentTime()
} }
}() }()
for { for {
@ -180,6 +294,8 @@ func runWebsocketClient() error {
cloudLogger.Infof("new session request: %v", req.OidcGoogle) cloudLogger.Infof("new session request: %v", req.OidcGoogle)
cloudLogger.Tracef("session request info: %v", req) cloudLogger.Tracef("session request info: %v", req)
metricCloudConnectionSessionRequestCount.Inc()
metricCloudConnectionLastSessionRequestTimestamp.SetToCurrentTime()
err = handleSessionRequest(runCtx, c, req) err = handleSessionRequest(runCtx, c, req)
if err != nil { if err != nil {
cloudLogger.Infof("error starting new session: %v", err) cloudLogger.Infof("error starting new session: %v", err)
@ -189,6 +305,12 @@ func runWebsocketClient() error {
} }
func handleSessionRequest(ctx context.Context, c *websocket.Conn, req WebRTCSessionRequest) error { func handleSessionRequest(ctx context.Context, c *websocket.Conn, req WebRTCSessionRequest) error {
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
metricCloudConnectionLastSessionRequestDuration.Set(v)
metricCloudConnectionSessionRequestDuration.Observe(v)
}))
defer timer.ObserveDuration()
oidcCtx, cancelOIDC := context.WithTimeout(ctx, CloudOidcRequestTimeout) oidcCtx, cancelOIDC := context.WithTimeout(ctx, CloudOidcRequestTimeout)
defer cancelOIDC() defer cancelOIDC()
provider, err := oidc.NewProvider(oidcCtx, "https://accounts.google.com") provider, err := oidc.NewProvider(oidcCtx, "https://accounts.google.com")
@ -249,6 +371,9 @@ func handleSessionRequest(ctx context.Context, c *websocket.Conn, req WebRTCSess
func RunWebsocketClient() { func RunWebsocketClient() {
for { for {
// reset the metrics when we start the websocket client.
cloudResetMetrics(false)
// If the cloud token is not set, we don't need to run the websocket client. // If the cloud token is not set, we don't need to run the websocket client.
if config.CloudToken == "" { if config.CloudToken == "" {
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
@ -272,6 +397,8 @@ func RunWebsocketClient() {
err := runWebsocketClient() err := runWebsocketClient()
if err != nil { if err != nil {
cloudLogger.Errorf("websocket client error: %v", err) cloudLogger.Errorf("websocket client error: %v", err)
metricCloudConnectionStatus.Set(0)
metricCloudConnectionFailureCount.Inc()
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
} }
} }