refactor: Standardize metric naming and improve websocket logging

This commit is contained in:
Adam Shiervani 2025-04-08 14:04:21 +02:00 committed by Siyuan Miao
parent d91ac1ace7
commit c1aa96c69c
4 changed files with 59 additions and 76 deletions

View File

@ -35,8 +35,8 @@ const (
// CloudOidcRequestTimeout is the timeout for OIDC token verification requests
// should be lower than the websocket response timeout set in cloud-api
CloudOidcRequestTimeout = 10 * time.Second
// CloudWebSocketPingInterval is the interval at which the websocket client sends ping messages to the cloud
CloudWebSocketPingInterval = 15 * time.Second
// WebsocketPingInterval is the interval at which ping messages are sent over a websocket connection (cloud or otherwise)
WebsocketPingInterval = 15 * time.Second
)
var (
@ -52,57 +52,57 @@ var (
Help: "The timestamp when the cloud connection was established",
},
)
metricCloudConnectionLastPingTimestamp = promauto.NewGauge(
metricConnectionLastPingTimestamp = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "jetkvm_cloud_connection_last_ping_timestamp",
Name: "jetkvm_connection_last_ping_timestamp",
Help: "The timestamp when the last ping response was received",
},
)
metricCloudConnectionLastPingDuration = promauto.NewGauge(
metricConnectionLastPingDuration = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "jetkvm_cloud_connection_last_ping_duration",
Name: "jetkvm_connection_last_ping_duration",
Help: "The duration of the last ping response",
},
)
metricCloudConnectionPingDuration = promauto.NewHistogram(
metricConnectionPingDuration = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "jetkvm_cloud_connection_ping_duration",
Name: "jetkvm_connection_ping_duration",
Help: "The duration of the ping response",
Buckets: []float64{
0.1, 0.5, 1, 10,
},
},
)
metricCloudConnectionTotalPingCount = promauto.NewCounter(
metricConnectionTotalPingCount = promauto.NewCounter(
prometheus.CounterOpts{
Name: "jetkvm_cloud_connection_total_ping_count",
Help: "The total number of pings sent to the cloud",
Name: "jetkvm_connection_total_ping_count",
Help: "The total number of pings sent over the connection",
},
)
metricCloudConnectionSessionRequestCount = promauto.NewCounter(
metricConnectionSessionRequestCount = promauto.NewCounter(
prometheus.CounterOpts{
Name: "jetkvm_cloud_connection_session_total_request_count",
Help: "The total number of session requests received from the cloud",
Name: "jetkvm_connection_session_total_request_count",
Help: "The total number of session requests received",
},
)
metricCloudConnectionSessionRequestDuration = promauto.NewHistogram(
metricConnectionSessionRequestDuration = promauto.NewHistogram(
prometheus.HistogramOpts{
Name: "jetkvm_cloud_connection_session_request_duration",
Name: "jetkvm_connection_session_request_duration",
Help: "The duration of session requests",
Buckets: []float64{
0.1, 0.5, 1, 10,
},
},
)
metricCloudConnectionLastSessionRequestTimestamp = promauto.NewGauge(
metricConnectionLastSessionRequestTimestamp = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "jetkvm_cloud_connection_last_session_request_timestamp",
Name: "jetkvm_connection_last_session_request_timestamp",
Help: "The timestamp of the last session request",
},
)
metricCloudConnectionLastSessionRequestDuration = promauto.NewGauge(
metricConnectionLastSessionRequestDuration = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "jetkvm_cloud_connection_last_session_request_duration",
Name: "jetkvm_connection_last_session_request_duration",
Help: "The duration of the last session request",
},
)
@ -120,11 +120,11 @@ var (
)
func cloudResetMetrics(established bool) {
metricCloudConnectionLastPingTimestamp.Set(-1)
metricCloudConnectionLastPingDuration.Set(-1)
metricConnectionLastPingTimestamp.Set(-1)
metricConnectionLastPingDuration.Set(-1)
metricCloudConnectionLastSessionRequestTimestamp.Set(-1)
metricCloudConnectionLastSessionRequestDuration.Set(-1)
metricConnectionLastSessionRequestTimestamp.Set(-1)
metricConnectionLastSessionRequestDuration.Set(-1)
if established {
metricCloudConnectionEstablishedTimestamp.SetToCurrentTime()
@ -276,12 +276,6 @@ func runWebsocketClient() error {
}
func authenticateSession(ctx context.Context, c *websocket.Conn, req WebRTCSessionRequest) error {
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
metricCloudConnectionLastSessionRequestDuration.Set(v)
metricCloudConnectionSessionRequestDuration.Observe(v)
}))
defer timer.ObserveDuration()
oidcCtx, cancelOIDC := context.WithTimeout(ctx, CloudOidcRequestTimeout)
defer cancelOIDC()
provider, err := oidc.NewProvider(oidcCtx, "https://accounts.google.com")
@ -313,6 +307,12 @@ func authenticateSession(ctx context.Context, c *websocket.Conn, req WebRTCSessi
}
func handleSessionRequest(ctx context.Context, c *websocket.Conn, req WebRTCSessionRequest, isCloudConnection bool) error {
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
metricConnectionLastSessionRequestDuration.Set(v)
metricConnectionSessionRequestDuration.Observe(v)
}))
defer timer.ObserveDuration()
// If the message is from the cloud, we need to authenticate the session.
if isCloudConnection {
if err := authenticateSession(ctx, c, req); err != nil {
@ -320,7 +320,12 @@ func handleSessionRequest(ctx context.Context, c *websocket.Conn, req WebRTCSess
}
}
session, err := newSession(SessionConfig{ws: c})
session, err := newSession(SessionConfig{
ws: c,
IsCloud: isCloudConnection,
LocalIP: req.IP,
ICEServers: req.ICEServers,
})
if err != nil {
_ = wsjson.Write(context.Background(), c, gin.H{"error": err})
return err

1
log.go
View File

@ -6,3 +6,4 @@ import "github.com/pion/logging"
// ref: https://github.com/pion/webrtc/wiki/Debugging-WebRTC
var (
	// logger is the general-purpose logger, created under the "jetkvm" scope.
	logger = logging.NewDefaultLoggerFactory().NewLogger("jetkvm")
	// cloudLogger is created under the "cloud" scope.
	cloudLogger = logging.NewDefaultLoggerFactory().NewLogger("cloud")
	// websocketLogger is created under the "websocket" scope.
	websocketLogger = logging.NewDefaultLoggerFactory().NewLogger("websocket")
)

65
web.go
View File

@ -2,7 +2,6 @@ package kvm
import (
"context"
"crypto/sha256"
"embed"
"encoding/json"
"fmt"
@ -10,7 +9,6 @@ import (
"net/http"
"path/filepath"
"strings"
"sync"
"time"
"github.com/coder/websocket"
@ -158,20 +156,14 @@ func handleWebRTCSignalWsConnection(wsCon *websocket.Conn, isCloudConnection boo
connectionID := uuid.New().String()
cloudLogger.Infof("new websocket connection established with ID: %s", connectionID)
// Add a mutex to protect against concurrent access to session state
sessionMutex := &sync.Mutex{}
// Track processed offers to avoid duplicates
processedOffers := make(map[string]bool)
go func() {
for {
time.Sleep(CloudWebSocketPingInterval)
time.Sleep(WebsocketPingInterval)
// set the timer for the ping duration
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
metricCloudConnectionLastPingDuration.Set(v)
metricCloudConnectionPingDuration.Observe(v)
metricConnectionLastPingDuration.Set(v)
metricConnectionPingDuration.Observe(v)
}))
cloudLogger.Infof("pinging websocket")
@ -186,15 +178,15 @@ func handleWebRTCSignalWsConnection(wsCon *websocket.Conn, isCloudConnection boo
// don't use `defer` here because we want to observe the duration of the ping
timer.ObserveDuration()
metricCloudConnectionTotalPingCount.Inc()
metricCloudConnectionLastPingTimestamp.SetToCurrentTime()
metricConnectionTotalPingCount.Inc()
metricConnectionLastPingTimestamp.SetToCurrentTime()
}
}()
for {
typ, msg, err := wsCon.Read(runCtx)
if err != nil {
cloudLogger.Warnf("websocket read error: %v", err)
websocketLogger.Warnf("websocket read error: %v", err)
return err
}
if typ != websocket.MessageText {
@ -209,69 +201,54 @@ func handleWebRTCSignalWsConnection(wsCon *websocket.Conn, isCloudConnection boo
err = json.Unmarshal(msg, &message)
if err != nil {
cloudLogger.Warnf("unable to parse ws message: %v", string(msg))
websocketLogger.Warnf("unable to parse ws message: %v", string(msg))
continue
}
if message.Type == "offer" {
cloudLogger.Infof("new session request received")
websocketLogger.Infof("new session request received")
var req WebRTCSessionRequest
err = json.Unmarshal(message.Data, &req)
if err != nil {
cloudLogger.Warnf("unable to parse session request data: %v", string(message.Data))
websocketLogger.Warnf("unable to parse session request data: %v", string(message.Data))
continue
}
// Create a hash of the offer to deduplicate
offerHash := fmt.Sprintf("%x", sha256.Sum256(message.Data))
websocketLogger.Infof("new session request: %v", req.OidcGoogle)
websocketLogger.Tracef("session request info: %v", req)
sessionMutex.Lock()
isDuplicate := processedOffers[offerHash]
if !isDuplicate {
processedOffers[offerHash] = true
}
sessionMutex.Unlock()
if isDuplicate {
cloudLogger.Infof("duplicate offer detected, ignoring: %s", offerHash[:8])
continue
}
cloudLogger.Infof("new session request: %v", req.OidcGoogle)
cloudLogger.Tracef("session request info: %v", req)
metricCloudConnectionSessionRequestCount.Inc()
metricCloudConnectionLastSessionRequestTimestamp.SetToCurrentTime()
metricConnectionSessionRequestCount.Inc()
metricConnectionLastSessionRequestTimestamp.SetToCurrentTime()
err = handleSessionRequest(runCtx, wsCon, req, isCloudConnection)
if err != nil {
cloudLogger.Infof("error starting new session: %v", err)
websocketLogger.Infof("error starting new session: %v", err)
continue
}
} else if message.Type == "new-ice-candidate" {
cloudLogger.Infof("client has sent us a new ICE candidate: %v", string(message.Data))
websocketLogger.Infof("The client sent us a new ICE candidate: %v", string(message.Data))
var candidate webrtc.ICECandidateInit
// Attempt to unmarshal as an ICECandidateInit
if err := json.Unmarshal(message.Data, &candidate); err != nil {
cloudLogger.Warnf("unable to parse ICE candidate data: %v", string(message.Data))
websocketLogger.Warnf("unable to parse incoming ICE candidate data: %v", string(message.Data))
continue
}
if candidate.Candidate == "" {
cloudLogger.Warnf("empty ICE candidate, skipping")
websocketLogger.Warnf("empty incoming ICE candidate, skipping")
continue
}
cloudLogger.Infof("unmarshalled ICE candidate: %v", candidate)
websocketLogger.Infof("unmarshalled incoming ICE candidate: %v", candidate)
if currentSession == nil {
cloudLogger.Infof("no current session, skipping ICE candidate")
websocketLogger.Infof("no current session, skipping incoming ICE candidate")
continue
}
cloudLogger.Infof("adding ICE candidate to current session: %v", candidate)
websocketLogger.Infof("adding incoming ICE candidate to current session: %v", candidate)
if err = currentSession.peerConnection.AddICECandidate(candidate); err != nil {
cloudLogger.Warnf("failed to add ICE candidate: %v", err)
websocketLogger.Warnf("failed to add incoming ICE candidate to our peer connection: %v", err)
}
}
}

View File

@ -142,7 +142,7 @@ func newSession(config SessionConfig) (*Session, error) {
var isConnected bool
peerConnection.OnICECandidate(func(candidate *webrtc.ICECandidate) {
cloudLogger.Infof("AAAAAAA got a new ICE candidate: %v", candidate)
logger.Infof("Our WebRTC peerConnection has a new ICE candidate: %v", candidate)
if candidate != nil {
wsjson.Write(context.Background(), config.ws, gin.H{"type": "new-ice-candidate", "data": candidate.ToJSON()})
}