From c1aa96c69c2b1dab4766ff6321355095a16bff3e Mon Sep 17 00:00:00 2001 From: Adam Shiervani Date: Tue, 8 Apr 2025 14:04:21 +0200 Subject: [PATCH] refactor: Standardize metric naming and improve websocket logging --- cloud.go | 67 ++++++++++++++++++++++++++++++------------------------- log.go | 1 + web.go | 65 +++++++++++++++++------------------------------------ webrtc.go | 2 +- 4 files changed, 59 insertions(+), 76 deletions(-) diff --git a/cloud.go b/cloud.go index f69f416..b87d9a9 100644 --- a/cloud.go +++ b/cloud.go @@ -35,8 +35,8 @@ const ( // CloudOidcRequestTimeout is the timeout for OIDC token verification requests // should be lower than the websocket response timeout set in cloud-api CloudOidcRequestTimeout = 10 * time.Second - // CloudWebSocketPingInterval is the interval at which the websocket client sends ping messages to the cloud - CloudWebSocketPingInterval = 15 * time.Second + // WebsocketPingInterval is the interval at which ping messages are sent on a signaling websocket connection (cloud or local) + WebsocketPingInterval = 15 * time.Second ) var ( @@ -52,57 +52,57 @@ var ( Help: "The timestamp when the cloud connection was established", }, ) - metricCloudConnectionLastPingTimestamp = promauto.NewGauge( + metricConnectionLastPingTimestamp = promauto.NewGauge( prometheus.GaugeOpts{ - Name: "jetkvm_cloud_connection_last_ping_timestamp", + Name: "jetkvm_connection_last_ping_timestamp", Help: "The timestamp when the last ping response was received", }, ) - metricCloudConnectionLastPingDuration = promauto.NewGauge( + metricConnectionLastPingDuration = promauto.NewGauge( prometheus.GaugeOpts{ - Name: "jetkvm_cloud_connection_last_ping_duration", + Name: "jetkvm_connection_last_ping_duration", Help: "The duration of the last ping response", }, ) - metricCloudConnectionPingDuration = promauto.NewHistogram( + metricConnectionPingDuration = promauto.NewHistogram( prometheus.HistogramOpts{ - Name: "jetkvm_cloud_connection_ping_duration", + Name:
"jetkvm_connection_ping_duration", Help: "The duration of the ping response", Buckets: []float64{ 0.1, 0.5, 1, 10, }, }, ) - metricCloudConnectionTotalPingCount = promauto.NewCounter( + metricConnectionTotalPingCount = promauto.NewCounter( prometheus.CounterOpts{ - Name: "jetkvm_cloud_connection_total_ping_count", - Help: "The total number of pings sent to the cloud", + Name: "jetkvm_connection_total_ping_count", + Help: "The total number of pings sent to the connection", }, ) - metricCloudConnectionSessionRequestCount = promauto.NewCounter( + metricConnectionSessionRequestCount = promauto.NewCounter( prometheus.CounterOpts{ - Name: "jetkvm_cloud_connection_session_total_request_count", - Help: "The total number of session requests received from the cloud", + Name: "jetkvm_connection_session_total_request_count", + Help: "The total number of session requests received", }, ) - metricCloudConnectionSessionRequestDuration = promauto.NewHistogram( + metricConnectionSessionRequestDuration = promauto.NewHistogram( prometheus.HistogramOpts{ - Name: "jetkvm_cloud_connection_session_request_duration", + Name: "jetkvm_connection_session_request_duration", Help: "The duration of session requests", Buckets: []float64{ 0.1, 0.5, 1, 10, }, }, ) - metricCloudConnectionLastSessionRequestTimestamp = promauto.NewGauge( + metricConnectionLastSessionRequestTimestamp = promauto.NewGauge( prometheus.GaugeOpts{ - Name: "jetkvm_cloud_connection_last_session_request_timestamp", + Name: "jetkvm_connection_last_session_request_timestamp", Help: "The timestamp of the last session request", }, ) - metricCloudConnectionLastSessionRequestDuration = promauto.NewGauge( + metricConnectionLastSessionRequestDuration = promauto.NewGauge( prometheus.GaugeOpts{ - Name: "jetkvm_cloud_connection_last_session_request_duration", + Name: "jetkvm_connection_last_session_request_duration", Help: "The duration of the last session request", }, ) @@ -120,11 +120,11 @@ var ( ) func cloudResetMetrics(established
bool) { - metricCloudConnectionLastPingTimestamp.Set(-1) - metricCloudConnectionLastPingDuration.Set(-1) + metricConnectionLastPingTimestamp.Set(-1) + metricConnectionLastPingDuration.Set(-1) - metricCloudConnectionLastSessionRequestTimestamp.Set(-1) - metricCloudConnectionLastSessionRequestDuration.Set(-1) + metricConnectionLastSessionRequestTimestamp.Set(-1) + metricConnectionLastSessionRequestDuration.Set(-1) if established { metricCloudConnectionEstablishedTimestamp.SetToCurrentTime() @@ -276,12 +276,6 @@ func runWebsocketClient() error { } func authenticateSession(ctx context.Context, c *websocket.Conn, req WebRTCSessionRequest) error { - timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) { - metricCloudConnectionLastSessionRequestDuration.Set(v) - metricCloudConnectionSessionRequestDuration.Observe(v) - })) - defer timer.ObserveDuration() - oidcCtx, cancelOIDC := context.WithTimeout(ctx, CloudOidcRequestTimeout) defer cancelOIDC() provider, err := oidc.NewProvider(oidcCtx, "https://accounts.google.com") @@ -313,6 +307,12 @@ func authenticateSession(ctx context.Context, c *websocket.Conn, req WebRTCSessi } func handleSessionRequest(ctx context.Context, c *websocket.Conn, req WebRTCSessionRequest, isCloudConnection bool) error { + timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) { + metricConnectionLastSessionRequestDuration.Set(v) + metricConnectionSessionRequestDuration.Observe(v) + })) + defer timer.ObserveDuration() + // If the message is from the cloud, we need to authenticate the session. 
if isCloudConnection { if err := authenticateSession(ctx, c, req); err != nil { @@ -320,7 +320,12 @@ func handleSessionRequest(ctx context.Context, c *websocket.Conn, req WebRTCSess } } - session, err := newSession(SessionConfig{ws: c}) + session, err := newSession(SessionConfig{ + ws: c, + IsCloud: isCloudConnection, + LocalIP: req.IP, + ICEServers: req.ICEServers, + }) if err != nil { _ = wsjson.Write(context.Background(), c, gin.H{"error": err}) return err diff --git a/log.go b/log.go index 7718a28..0d36c0d 100644 --- a/log.go +++ b/log.go @@ -6,3 +6,4 @@ import "github.com/pion/logging" // ref: https://github.com/pion/webrtc/wiki/Debugging-WebRTC var logger = logging.NewDefaultLoggerFactory().NewLogger("jetkvm") var cloudLogger = logging.NewDefaultLoggerFactory().NewLogger("cloud") +var websocketLogger = logging.NewDefaultLoggerFactory().NewLogger("websocket") diff --git a/web.go b/web.go index b32bec6..aa6b914 100644 --- a/web.go +++ b/web.go @@ -2,7 +2,6 @@ package kvm import ( "context" - "crypto/sha256" "embed" "encoding/json" "fmt" @@ -10,7 +9,6 @@ import ( "net/http" "path/filepath" "strings" - "sync" "time" "github.com/coder/websocket" @@ -158,20 +156,14 @@ func handleWebRTCSignalWsConnection(wsCon *websocket.Conn, isCloudConnection boo connectionID := uuid.New().String() cloudLogger.Infof("new websocket connection established with ID: %s", connectionID) - // Add a mutex to protect against concurrent access to session state - sessionMutex := &sync.Mutex{} - - // Track processed offers to avoid duplicates - processedOffers := make(map[string]bool) - go func() { for { - time.Sleep(CloudWebSocketPingInterval) + time.Sleep(WebsocketPingInterval) // set the timer for the ping duration timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) { - metricCloudConnectionLastPingDuration.Set(v) - metricCloudConnectionPingDuration.Observe(v) + metricConnectionLastPingDuration.Set(v) + metricConnectionPingDuration.Observe(v) })) cloudLogger.Infof("pinging 
websocket") @@ -186,15 +178,15 @@ func handleWebRTCSignalWsConnection(wsCon *websocket.Conn, isCloudConnection boo // dont use `defer` here because we want to observe the duration of the ping timer.ObserveDuration() - metricCloudConnectionTotalPingCount.Inc() - metricCloudConnectionLastPingTimestamp.SetToCurrentTime() + metricConnectionTotalPingCount.Inc() + metricConnectionLastPingTimestamp.SetToCurrentTime() } }() for { typ, msg, err := wsCon.Read(runCtx) if err != nil { - cloudLogger.Warnf("websocket read error: %v", err) + websocketLogger.Warnf("websocket read error: %v", err) return err } if typ != websocket.MessageText { @@ -209,69 +201,54 @@ func handleWebRTCSignalWsConnection(wsCon *websocket.Conn, isCloudConnection boo err = json.Unmarshal(msg, &message) if err != nil { - cloudLogger.Warnf("unable to parse ws message: %v", string(msg)) + websocketLogger.Warnf("unable to parse ws message: %v", string(msg)) continue } if message.Type == "offer" { - cloudLogger.Infof("new session request received") + websocketLogger.Infof("new session request received") var req WebRTCSessionRequest err = json.Unmarshal(message.Data, &req) if err != nil { - cloudLogger.Warnf("unable to parse session request data: %v", string(message.Data)) + websocketLogger.Warnf("unable to parse session request data: %v", string(message.Data)) continue } - // Create a hash of the offer to deduplicate - offerHash := fmt.Sprintf("%x", sha256.Sum256(message.Data)) + websocketLogger.Infof("new session request: %v", req.OidcGoogle) + websocketLogger.Tracef("session request info: %v", req) - sessionMutex.Lock() - isDuplicate := processedOffers[offerHash] - if !isDuplicate { - processedOffers[offerHash] = true - } - sessionMutex.Unlock() - - if isDuplicate { - cloudLogger.Infof("duplicate offer detected, ignoring: %s", offerHash[:8]) - continue - } - - cloudLogger.Infof("new session request: %v", req.OidcGoogle) - cloudLogger.Tracef("session request info: %v", req) - - 
metricCloudConnectionSessionRequestCount.Inc() - metricCloudConnectionLastSessionRequestTimestamp.SetToCurrentTime() + metricConnectionSessionRequestCount.Inc() + metricConnectionLastSessionRequestTimestamp.SetToCurrentTime() err = handleSessionRequest(runCtx, wsCon, req, isCloudConnection) if err != nil { - cloudLogger.Infof("error starting new session: %v", err) + websocketLogger.Infof("error starting new session: %v", err) continue } } else if message.Type == "new-ice-candidate" { - cloudLogger.Infof("client has sent us a new ICE candidate: %v", string(message.Data)) + websocketLogger.Infof("The client sent us a new ICE candidate: %v", string(message.Data)) var candidate webrtc.ICECandidateInit // Attempt to unmarshal as a ICECandidateInit if err := json.Unmarshal(message.Data, &candidate); err != nil { - cloudLogger.Warnf("unable to parse ICE candidate data: %v", string(message.Data)) + websocketLogger.Warnf("unable to parse incoming ICE candidate data: %v", string(message.Data)) continue } if candidate.Candidate == "" { - cloudLogger.Warnf("empty ICE candidate, skipping") + websocketLogger.Warnf("empty incoming ICE candidate, skipping") continue } - cloudLogger.Infof("unmarshalled ICE candidate: %v", candidate) + websocketLogger.Infof("unmarshalled incoming ICE candidate: %v", candidate) if currentSession == nil { - cloudLogger.Infof("no current session, skipping ICE candidate") + websocketLogger.Infof("no current session, skipping incoming ICE candidate") continue } - cloudLogger.Infof("adding ICE candidate to current session: %v", candidate) + websocketLogger.Infof("adding incoming ICE candidate to current session: %v", candidate) if err = currentSession.peerConnection.AddICECandidate(candidate); err != nil { - cloudLogger.Warnf("failed to add ICE candidate: %v", err) + websocketLogger.Warnf("failed to add incoming ICE candidate to our peer connection: %v", err) } } } diff --git a/webrtc.go b/webrtc.go index 642516d..d229b30 100644 --- a/webrtc.go +++ 
b/webrtc.go @@ -142,7 +142,7 @@ func newSession(config SessionConfig) (*Session, error) { var isConnected bool peerConnection.OnICECandidate(func(candidate *webrtc.ICECandidate) { - cloudLogger.Infof("AAAAAAA got a new ICE candidate: %v", candidate) + logger.Infof("Our WebRTC peerConnection has a new ICE candidate: %v", candidate) if candidate != nil { wsjson.Write(context.Background(), config.ws, gin.H{"type": "new-ice-candidate", "data": candidate.ToJSON()}) }