mirror of https://github.com/jetkvm/kvm.git
chore(websocket): logging and metrics improvement (#347)
* chore(websocket): only show warning if websocket is closed abnormally * chore(websocket): add counter for ping requests received
This commit is contained in:
parent
66a3352e5d
commit
dc1ce03697
39
cloud.go
39
cloud.go
|
@ -4,6 +4,7 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
@ -59,6 +60,13 @@ var (
|
||||||
},
|
},
|
||||||
[]string{"type", "source"},
|
[]string{"type", "source"},
|
||||||
)
|
)
|
||||||
|
metricConnectionLastPingReceivedTimestamp = promauto.NewGaugeVec(
|
||||||
|
prometheus.GaugeOpts{
|
||||||
|
Name: "jetkvm_connection_last_ping_received_timestamp",
|
||||||
|
Help: "The timestamp when the last ping request was received",
|
||||||
|
},
|
||||||
|
[]string{"type", "source"},
|
||||||
|
)
|
||||||
metricConnectionLastPingDuration = promauto.NewGaugeVec(
|
metricConnectionLastPingDuration = promauto.NewGaugeVec(
|
||||||
prometheus.GaugeOpts{
|
prometheus.GaugeOpts{
|
||||||
Name: "jetkvm_connection_last_ping_duration",
|
Name: "jetkvm_connection_last_ping_duration",
|
||||||
|
@ -76,16 +84,23 @@ var (
|
||||||
},
|
},
|
||||||
[]string{"type", "source"},
|
[]string{"type", "source"},
|
||||||
)
|
)
|
||||||
metricConnectionTotalPingCount = promauto.NewCounterVec(
|
metricConnectionTotalPingSentCount = promauto.NewCounterVec(
|
||||||
prometheus.CounterOpts{
|
prometheus.CounterOpts{
|
||||||
Name: "jetkvm_connection_total_ping_count",
|
Name: "jetkvm_connection_total_ping_sent",
|
||||||
Help: "The total number of pings sent to the connection",
|
Help: "The total number of pings sent to the connection",
|
||||||
},
|
},
|
||||||
[]string{"type", "source"},
|
[]string{"type", "source"},
|
||||||
)
|
)
|
||||||
|
metricConnectionTotalPingReceivedCount = promauto.NewCounterVec(
|
||||||
|
prometheus.CounterOpts{
|
||||||
|
Name: "jetkvm_connection_total_ping_received",
|
||||||
|
Help: "The total number of pings received from the connection",
|
||||||
|
},
|
||||||
|
[]string{"type", "source"},
|
||||||
|
)
|
||||||
metricConnectionSessionRequestCount = promauto.NewCounterVec(
|
metricConnectionSessionRequestCount = promauto.NewCounterVec(
|
||||||
prometheus.CounterOpts{
|
prometheus.CounterOpts{
|
||||||
Name: "jetkvm_connection_session_total_request_count",
|
Name: "jetkvm_connection_session_total_requests",
|
||||||
Help: "The total number of session requests received",
|
Help: "The total number of session requests received",
|
||||||
},
|
},
|
||||||
[]string{"type", "source"},
|
[]string{"type", "source"},
|
||||||
|
@ -131,6 +146,8 @@ func wsResetMetrics(established bool, sourceType string, source string) {
|
||||||
metricConnectionLastPingTimestamp.WithLabelValues(sourceType, source).Set(-1)
|
metricConnectionLastPingTimestamp.WithLabelValues(sourceType, source).Set(-1)
|
||||||
metricConnectionLastPingDuration.WithLabelValues(sourceType, source).Set(-1)
|
metricConnectionLastPingDuration.WithLabelValues(sourceType, source).Set(-1)
|
||||||
|
|
||||||
|
metricConnectionLastPingReceivedTimestamp.WithLabelValues(sourceType, source).Set(-1)
|
||||||
|
|
||||||
metricConnectionLastSessionRequestTimestamp.WithLabelValues(sourceType, source).Set(-1)
|
metricConnectionLastSessionRequestTimestamp.WithLabelValues(sourceType, source).Set(-1)
|
||||||
metricConnectionLastSessionRequestDuration.WithLabelValues(sourceType, source).Set(-1)
|
metricConnectionLastSessionRequestDuration.WithLabelValues(sourceType, source).Set(-1)
|
||||||
|
|
||||||
|
@ -277,20 +294,29 @@ func runWebsocketClient() error {
|
||||||
HTTPHeader: header,
|
HTTPHeader: header,
|
||||||
OnPingReceived: func(ctx context.Context, payload []byte) bool {
|
OnPingReceived: func(ctx context.Context, payload []byte) bool {
|
||||||
websocketLogger.Infof("ping frame received: %v, source: %s, sourceType: cloud", payload, wsURL.Host)
|
websocketLogger.Infof("ping frame received: %v, source: %s, sourceType: cloud", payload, wsURL.Host)
|
||||||
|
|
||||||
|
metricConnectionTotalPingReceivedCount.WithLabelValues("cloud", wsURL.Host).Inc()
|
||||||
|
metricConnectionLastPingReceivedTimestamp.WithLabelValues("cloud", wsURL.Host).SetToCurrentTime()
|
||||||
|
|
||||||
return true
|
return true
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
// if the context is canceled, we don't want to return an error
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if errors.Is(err, context.Canceled) {
|
||||||
|
cloudLogger.Infof("websocket connection canceled")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer c.CloseNow() //nolint:errcheck
|
defer c.CloseNow() //nolint:errcheck
|
||||||
cloudLogger.Infof("websocket connected to %s", wsURL)
|
cloudLogger.Infof("websocket connected to %s", wsURL)
|
||||||
|
|
||||||
// set the metrics when we successfully connect to the cloud.
|
// set the metrics when we successfully connect to the cloud.
|
||||||
wsResetMetrics(true, "cloud", "")
|
wsResetMetrics(true, "cloud", wsURL.Host)
|
||||||
|
|
||||||
// we don't have a source for the cloud connection
|
// we don't have a source for the cloud connection
|
||||||
return handleWebRTCSignalWsMessages(c, true, "")
|
return handleWebRTCSignalWsMessages(c, true, wsURL.Host)
|
||||||
}
|
}
|
||||||
|
|
||||||
func authenticateSession(ctx context.Context, c *websocket.Conn, req WebRTCSessionRequest) error {
|
func authenticateSession(ctx context.Context, c *websocket.Conn, req WebRTCSessionRequest) error {
|
||||||
|
@ -379,9 +405,6 @@ func handleSessionRequest(ctx context.Context, c *websocket.Conn, req WebRTCSess
|
||||||
|
|
||||||
func RunWebsocketClient() {
|
func RunWebsocketClient() {
|
||||||
for {
|
for {
|
||||||
// reset the metrics when we start the websocket client.
|
|
||||||
wsResetMetrics(false, "cloud", "")
|
|
||||||
|
|
||||||
// If the cloud token is not set, we don't need to run the websocket client.
|
// If the cloud token is not set, we don't need to run the websocket client.
|
||||||
if config.CloudToken == "" {
|
if config.CloudToken == "" {
|
||||||
time.Sleep(5 * time.Second)
|
time.Sleep(5 * time.Second)
|
||||||
|
|
20
web.go
20
web.go
|
@ -5,6 +5,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"embed"
|
"embed"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/fs"
|
"io/fs"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
@ -190,6 +191,10 @@ func handleLocalWebRTCSignal(c *gin.Context) {
|
||||||
InsecureSkipVerify: true, // Allow connections from any origin
|
InsecureSkipVerify: true, // Allow connections from any origin
|
||||||
OnPingReceived: func(ctx context.Context, payload []byte) bool {
|
OnPingReceived: func(ctx context.Context, payload []byte) bool {
|
||||||
websocketLogger.Infof("ping frame received: %v, source: %s, sourceType: local", payload, source)
|
websocketLogger.Infof("ping frame received: %v, source: %s, sourceType: local", payload, source)
|
||||||
|
|
||||||
|
metricConnectionTotalPingReceivedCount.WithLabelValues("local", source).Inc()
|
||||||
|
metricConnectionLastPingReceivedTimestamp.WithLabelValues("local", source).SetToCurrentTime()
|
||||||
|
|
||||||
return true
|
return true
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -251,6 +256,15 @@ func handleWebRTCSignalWsMessages(wsCon *websocket.Conn, isCloudConnection bool,
|
||||||
for {
|
for {
|
||||||
time.Sleep(WebsocketPingInterval)
|
time.Sleep(WebsocketPingInterval)
|
||||||
|
|
||||||
|
if ctxErr := runCtx.Err(); ctxErr != nil {
|
||||||
|
if !errors.Is(ctxErr, context.Canceled) {
|
||||||
|
logWarnf("websocket connection closed: %v", ctxErr)
|
||||||
|
} else {
|
||||||
|
logTracef("websocket connection closed as the context was canceled: %v")
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// set the timer for the ping duration
|
// set the timer for the ping duration
|
||||||
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
|
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
|
||||||
metricConnectionLastPingDuration.WithLabelValues(sourceType, source).Set(v)
|
metricConnectionLastPingDuration.WithLabelValues(sourceType, source).Set(v)
|
||||||
|
@ -269,7 +283,7 @@ func handleWebRTCSignalWsMessages(wsCon *websocket.Conn, isCloudConnection bool,
|
||||||
// dont use `defer` here because we want to observe the duration of the ping
|
// dont use `defer` here because we want to observe the duration of the ping
|
||||||
duration := timer.ObserveDuration()
|
duration := timer.ObserveDuration()
|
||||||
|
|
||||||
metricConnectionTotalPingCount.WithLabelValues(sourceType, source).Inc()
|
metricConnectionTotalPingSentCount.WithLabelValues(sourceType, source).Inc()
|
||||||
metricConnectionLastPingTimestamp.WithLabelValues(sourceType, source).SetToCurrentTime()
|
metricConnectionLastPingTimestamp.WithLabelValues(sourceType, source).SetToCurrentTime()
|
||||||
|
|
||||||
logTracef("received pong frame, duration: %v", duration)
|
logTracef("received pong frame, duration: %v", duration)
|
||||||
|
@ -317,6 +331,10 @@ func handleWebRTCSignalWsMessages(wsCon *websocket.Conn, isCloudConnection bool,
|
||||||
logWarnf("unable to write pong message: %v", err)
|
logWarnf("unable to write pong message: %v", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
metricConnectionTotalPingReceivedCount.WithLabelValues(sourceType, source).Inc()
|
||||||
|
metricConnectionLastPingReceivedTimestamp.WithLabelValues(sourceType, source).SetToCurrentTime()
|
||||||
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue