mirror of https://github.com/jetkvm/kvm.git
chore(cloud): websocket client improvements (#323)
This commit is contained in:
commit
1a26431147
153
cloud.go
153
cloud.go
|
@ -10,6 +10,8 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/coder/websocket/wsjson"
|
"github.com/coder/websocket/wsjson"
|
||||||
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||||
|
|
||||||
"github.com/coreos/go-oidc/v3/oidc"
|
"github.com/coreos/go-oidc/v3/oidc"
|
||||||
|
|
||||||
|
@ -36,6 +38,97 @@ const (
|
||||||
CloudWebSocketPingInterval = 15 * time.Second
|
CloudWebSocketPingInterval = 15 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
metricCloudConnectionStatus = promauto.NewGauge(
|
||||||
|
prometheus.GaugeOpts{
|
||||||
|
Name: "jetkvm_cloud_connection_status",
|
||||||
|
Help: "The status of the cloud connection",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
metricCloudConnectionEstablishedTimestamp = promauto.NewGauge(
|
||||||
|
prometheus.GaugeOpts{
|
||||||
|
Name: "jetkvm_cloud_connection_established_timestamp",
|
||||||
|
Help: "The timestamp when the cloud connection was established",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
metricCloudConnectionLastPingTimestamp = promauto.NewGauge(
|
||||||
|
prometheus.GaugeOpts{
|
||||||
|
Name: "jetkvm_cloud_connection_last_ping_timestamp",
|
||||||
|
Help: "The timestamp when the last ping response was received",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
metricCloudConnectionLastPingDuration = promauto.NewGauge(
|
||||||
|
prometheus.GaugeOpts{
|
||||||
|
Name: "jetkvm_cloud_connection_last_ping_duration",
|
||||||
|
Help: "The duration of the last ping response",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
metricCloudConnectionPingDuration = promauto.NewHistogram(
|
||||||
|
prometheus.HistogramOpts{
|
||||||
|
Name: "jetkvm_cloud_connection_ping_duration",
|
||||||
|
Help: "The duration of the ping response",
|
||||||
|
Buckets: []float64{
|
||||||
|
0.1, 0.5, 1, 10,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
metricCloudConnectionTotalPingCount = promauto.NewCounter(
|
||||||
|
prometheus.CounterOpts{
|
||||||
|
Name: "jetkvm_cloud_connection_total_ping_count",
|
||||||
|
Help: "The total number of pings sent to the cloud",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
metricCloudConnectionSessionRequestCount = promauto.NewCounter(
|
||||||
|
prometheus.CounterOpts{
|
||||||
|
Name: "jetkvm_cloud_connection_session_total_request_count",
|
||||||
|
Help: "The total number of session requests received from the cloud",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
metricCloudConnectionSessionRequestDuration = promauto.NewHistogram(
|
||||||
|
prometheus.HistogramOpts{
|
||||||
|
Name: "jetkvm_cloud_connection_session_request_duration",
|
||||||
|
Help: "The duration of session requests",
|
||||||
|
Buckets: []float64{
|
||||||
|
0.1, 0.5, 1, 10,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
metricCloudConnectionLastSessionRequestTimestamp = promauto.NewGauge(
|
||||||
|
prometheus.GaugeOpts{
|
||||||
|
Name: "jetkvm_cloud_connection_last_session_request_timestamp",
|
||||||
|
Help: "The timestamp of the last session request",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
metricCloudConnectionLastSessionRequestDuration = promauto.NewGauge(
|
||||||
|
prometheus.GaugeOpts{
|
||||||
|
Name: "jetkvm_cloud_connection_last_session_request_duration",
|
||||||
|
Help: "The duration of the last session request",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
metricCloudConnectionFailureCount = promauto.NewCounter(
|
||||||
|
prometheus.CounterOpts{
|
||||||
|
Name: "jetkvm_cloud_connection_failure_count",
|
||||||
|
Help: "The number of times the cloud connection has failed",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
func cloudResetMetrics(established bool) {
|
||||||
|
metricCloudConnectionLastPingTimestamp.Set(-1)
|
||||||
|
metricCloudConnectionLastPingDuration.Set(-1)
|
||||||
|
|
||||||
|
metricCloudConnectionLastSessionRequestTimestamp.Set(-1)
|
||||||
|
metricCloudConnectionLastSessionRequestDuration.Set(-1)
|
||||||
|
|
||||||
|
if established {
|
||||||
|
metricCloudConnectionEstablishedTimestamp.SetToCurrentTime()
|
||||||
|
metricCloudConnectionStatus.Set(1)
|
||||||
|
} else {
|
||||||
|
metricCloudConnectionEstablishedTimestamp.Set(-1)
|
||||||
|
metricCloudConnectionStatus.Set(-1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func handleCloudRegister(c *gin.Context) {
|
func handleCloudRegister(c *gin.Context) {
|
||||||
var req CloudRegisterRequest
|
var req CloudRegisterRequest
|
||||||
|
|
||||||
|
@ -90,11 +183,6 @@ func handleCloudRegister(c *gin.Context) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if config.CloudToken == "" {
|
|
||||||
cloudLogger.Info("Starting websocket client due to adoption")
|
|
||||||
go RunWebsocketClient()
|
|
||||||
}
|
|
||||||
|
|
||||||
config.CloudToken = tokenResp.SecretToken
|
config.CloudToken = tokenResp.SecretToken
|
||||||
|
|
||||||
provider, err := oidc.NewProvider(c, "https://accounts.google.com")
|
provider, err := oidc.NewProvider(c, "https://accounts.google.com")
|
||||||
|
@ -130,19 +218,23 @@ func runWebsocketClient() error {
|
||||||
time.Sleep(5 * time.Second)
|
time.Sleep(5 * time.Second)
|
||||||
return fmt.Errorf("cloud token is not set")
|
return fmt.Errorf("cloud token is not set")
|
||||||
}
|
}
|
||||||
|
|
||||||
wsURL, err := url.Parse(config.CloudURL)
|
wsURL, err := url.Parse(config.CloudURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to parse config.CloudURL: %w", err)
|
return fmt.Errorf("failed to parse config.CloudURL: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if wsURL.Scheme == "http" {
|
if wsURL.Scheme == "http" {
|
||||||
wsURL.Scheme = "ws"
|
wsURL.Scheme = "ws"
|
||||||
} else {
|
} else {
|
||||||
wsURL.Scheme = "wss"
|
wsURL.Scheme = "wss"
|
||||||
}
|
}
|
||||||
|
|
||||||
header := http.Header{}
|
header := http.Header{}
|
||||||
header.Set("X-Device-ID", GetDeviceID())
|
header.Set("X-Device-ID", GetDeviceID())
|
||||||
header.Set("Authorization", "Bearer "+config.CloudToken)
|
header.Set("Authorization", "Bearer "+config.CloudToken)
|
||||||
dialCtx, cancelDial := context.WithTimeout(context.Background(), CloudWebSocketConnectTimeout)
|
dialCtx, cancelDial := context.WithTimeout(context.Background(), CloudWebSocketConnectTimeout)
|
||||||
|
|
||||||
defer cancelDial()
|
defer cancelDial()
|
||||||
c, _, err := websocket.Dial(dialCtx, wsURL.String(), &websocket.DialOptions{
|
c, _, err := websocket.Dial(dialCtx, wsURL.String(), &websocket.DialOptions{
|
||||||
HTTPHeader: header,
|
HTTPHeader: header,
|
||||||
|
@ -152,17 +244,35 @@ func runWebsocketClient() error {
|
||||||
}
|
}
|
||||||
defer c.CloseNow() //nolint:errcheck
|
defer c.CloseNow() //nolint:errcheck
|
||||||
cloudLogger.Infof("websocket connected to %s", wsURL)
|
cloudLogger.Infof("websocket connected to %s", wsURL)
|
||||||
|
|
||||||
|
// set the metrics when we successfully connect to the cloud.
|
||||||
|
cloudResetMetrics(true)
|
||||||
|
|
||||||
runCtx, cancelRun := context.WithCancel(context.Background())
|
runCtx, cancelRun := context.WithCancel(context.Background())
|
||||||
defer cancelRun()
|
defer cancelRun()
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
time.Sleep(CloudWebSocketPingInterval)
|
time.Sleep(CloudWebSocketPingInterval)
|
||||||
|
|
||||||
|
// set the timer for the ping duration
|
||||||
|
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
|
||||||
|
metricCloudConnectionLastPingDuration.Set(v)
|
||||||
|
metricCloudConnectionPingDuration.Observe(v)
|
||||||
|
}))
|
||||||
|
|
||||||
err := c.Ping(runCtx)
|
err := c.Ping(runCtx)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cloudLogger.Warnf("websocket ping error: %v", err)
|
cloudLogger.Warnf("websocket ping error: %v", err)
|
||||||
cancelRun()
|
cancelRun()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// dont use `defer` here because we want to observe the duration of the ping
|
||||||
|
timer.ObserveDuration()
|
||||||
|
|
||||||
|
metricCloudConnectionTotalPingCount.Inc()
|
||||||
|
metricCloudConnectionLastPingTimestamp.SetToCurrentTime()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
for {
|
for {
|
||||||
|
@ -184,6 +294,8 @@ func runWebsocketClient() error {
|
||||||
cloudLogger.Infof("new session request: %v", req.OidcGoogle)
|
cloudLogger.Infof("new session request: %v", req.OidcGoogle)
|
||||||
cloudLogger.Tracef("session request info: %v", req)
|
cloudLogger.Tracef("session request info: %v", req)
|
||||||
|
|
||||||
|
metricCloudConnectionSessionRequestCount.Inc()
|
||||||
|
metricCloudConnectionLastSessionRequestTimestamp.SetToCurrentTime()
|
||||||
err = handleSessionRequest(runCtx, c, req)
|
err = handleSessionRequest(runCtx, c, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cloudLogger.Infof("error starting new session: %v", err)
|
cloudLogger.Infof("error starting new session: %v", err)
|
||||||
|
@ -193,6 +305,12 @@ func runWebsocketClient() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleSessionRequest(ctx context.Context, c *websocket.Conn, req WebRTCSessionRequest) error {
|
func handleSessionRequest(ctx context.Context, c *websocket.Conn, req WebRTCSessionRequest) error {
|
||||||
|
timer := prometheus.NewTimer(prometheus.ObserverFunc(func(v float64) {
|
||||||
|
metricCloudConnectionLastSessionRequestDuration.Set(v)
|
||||||
|
metricCloudConnectionSessionRequestDuration.Observe(v)
|
||||||
|
}))
|
||||||
|
defer timer.ObserveDuration()
|
||||||
|
|
||||||
oidcCtx, cancelOIDC := context.WithTimeout(ctx, CloudOidcRequestTimeout)
|
oidcCtx, cancelOIDC := context.WithTimeout(ctx, CloudOidcRequestTimeout)
|
||||||
defer cancelOIDC()
|
defer cancelOIDC()
|
||||||
provider, err := oidc.NewProvider(oidcCtx, "https://accounts.google.com")
|
provider, err := oidc.NewProvider(oidcCtx, "https://accounts.google.com")
|
||||||
|
@ -253,9 +371,34 @@ func handleSessionRequest(ctx context.Context, c *websocket.Conn, req WebRTCSess
|
||||||
|
|
||||||
func RunWebsocketClient() {
|
func RunWebsocketClient() {
|
||||||
for {
|
for {
|
||||||
|
// reset the metrics when we start the websocket client.
|
||||||
|
cloudResetMetrics(false)
|
||||||
|
|
||||||
|
// If the cloud token is not set, we don't need to run the websocket client.
|
||||||
|
if config.CloudToken == "" {
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the network is not up, well, we can't connect to the cloud.
|
||||||
|
if !networkState.Up {
|
||||||
|
cloudLogger.Warn("waiting for network to be up, will retry in 3 seconds")
|
||||||
|
time.Sleep(3 * time.Second)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the system time is not synchronized, the API request will fail anyway because the TLS handshake will fail.
|
||||||
|
if isTimeSyncNeeded() && !timeSyncSuccess {
|
||||||
|
cloudLogger.Warn("system time is not synced, will retry in 3 seconds")
|
||||||
|
time.Sleep(3 * time.Second)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
err := runWebsocketClient()
|
err := runWebsocketClient()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cloudLogger.Errorf("websocket client error: %v", err)
|
cloudLogger.Errorf("websocket client error: %v", err)
|
||||||
|
metricCloudConnectionStatus.Set(0)
|
||||||
|
metricCloudConnectionFailureCount.Inc()
|
||||||
time.Sleep(5 * time.Second)
|
time.Sleep(5 * time.Second)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
8
main.go
8
main.go
|
@ -72,11 +72,9 @@ func Main() {
|
||||||
if config.TLSMode != "" {
|
if config.TLSMode != "" {
|
||||||
go RunWebSecureServer()
|
go RunWebSecureServer()
|
||||||
}
|
}
|
||||||
// If the cloud token isn't set, the client won't be started by default.
|
// As websocket client already checks if the cloud token is set, we can start it here.
|
||||||
// However, if the user adopts the device via the web interface, handleCloudRegister will start the client.
|
go RunWebsocketClient()
|
||||||
if config.CloudToken != "" {
|
|
||||||
go RunWebsocketClient()
|
|
||||||
}
|
|
||||||
initSerialPort()
|
initSerialPort()
|
||||||
sigs := make(chan os.Signal, 1)
|
sigs := make(chan os.Signal, 1)
|
||||||
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
|
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
|
||||||
|
|
33
ntp.go
33
ntp.go
|
@ -5,6 +5,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/beevik/ntp"
|
"github.com/beevik/ntp"
|
||||||
|
@ -20,13 +21,41 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
builtTimestamp string
|
||||||
timeSyncRetryInterval = 0 * time.Second
|
timeSyncRetryInterval = 0 * time.Second
|
||||||
|
timeSyncSuccess = false
|
||||||
defaultNTPServers = []string{
|
defaultNTPServers = []string{
|
||||||
"time.cloudflare.com",
|
"time.cloudflare.com",
|
||||||
"time.apple.com",
|
"time.apple.com",
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func isTimeSyncNeeded() bool {
|
||||||
|
if builtTimestamp == "" {
|
||||||
|
logger.Warnf("Built timestamp is not set, time sync is needed")
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
ts, err := strconv.Atoi(builtTimestamp)
|
||||||
|
if err != nil {
|
||||||
|
logger.Warnf("Failed to parse built timestamp: %v", err)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
// builtTimestamp is UNIX timestamp in seconds
|
||||||
|
builtTime := time.Unix(int64(ts), 0)
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
logger.Tracef("Built time: %v, now: %v", builtTime, now)
|
||||||
|
|
||||||
|
if now.Sub(builtTime) < 0 {
|
||||||
|
logger.Warnf("System time is behind the built time, time sync is needed")
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
func TimeSyncLoop() {
|
func TimeSyncLoop() {
|
||||||
for {
|
for {
|
||||||
if !networkState.checked {
|
if !networkState.checked {
|
||||||
|
@ -40,6 +69,9 @@ func TimeSyncLoop() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// check if time sync is needed, but do nothing for now
|
||||||
|
isTimeSyncNeeded()
|
||||||
|
|
||||||
logger.Infof("Syncing system time")
|
logger.Infof("Syncing system time")
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
err := SyncSystemTime()
|
err := SyncSystemTime()
|
||||||
|
@ -56,6 +88,7 @@ func TimeSyncLoop() {
|
||||||
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
timeSyncSuccess = true
|
||||||
logger.Infof("Time sync successful, now is: %v, time taken: %v", time.Now(), time.Since(start))
|
logger.Infof("Time sync successful, now is: %v, time taken: %v", time.Now(), time.Since(start))
|
||||||
time.Sleep(timeSyncInterval) // after the first sync is done
|
time.Sleep(timeSyncInterval) // after the first sync is done
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue