clean up code

This commit is contained in:
Siyuan 2025-11-13 12:18:49 +00:00
parent d6c97d17ef
commit 91d3b47ec3
8 changed files with 317 additions and 235 deletions

View File

@ -28,14 +28,20 @@ func program() {
} }
switch subcomponent { switch subcomponent {
case "native": case "native":
gspt.SetProcTitle(os.Args[0] + " [native]")
native.RunNativeProcess(os.Args[0]) native.RunNativeProcess(os.Args[0])
default: default:
gspt.SetProcTitle(os.Args[0] + " [app]")
kvm.Main() kvm.Main()
} }
} }
func setProcTitle(status string) {
if status != "" {
status = " " + status
}
title := fmt.Sprintf("jetkvm: [supervisor]%s", status)
gspt.SetProcTitle(title)
}
func main() { func main() {
versionPtr := flag.Bool("version", false, "print version and exit") versionPtr := flag.Bool("version", false, "print version and exit")
versionJSONPtr := flag.Bool("version-json", false, "print version as json and exit") versionJSONPtr := flag.Bool("version-json", false, "print version as json and exit")
@ -65,6 +71,8 @@ func main() {
} }
func supervise() error { func supervise() error {
setProcTitle("")
// check binary path // check binary path
binPath, err := os.Executable() binPath, err := os.Executable()
if err != nil { if err != nil {
@ -109,6 +117,8 @@ func supervise() error {
return fmt.Errorf("failed to start command: %w", startErr) return fmt.Errorf("failed to start command: %w", startErr)
} }
setProcTitle(fmt.Sprintf("started (pid=%d)", cmd.Process.Pid))
go func() { go func() {
sigChan := make(chan os.Signal, 1) sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGTERM) signal.Notify(sigChan, syscall.SIGTERM)
@ -117,8 +127,6 @@ func supervise() error {
_ = cmd.Process.Signal(sig) _ = cmd.Process.Signal(sig)
}() }()
gspt.SetProcTitle(os.Args[0] + " [sup]")
cmdErr := cmd.Wait() cmdErr := cmd.Wait()
if cmdErr == nil { if cmdErr == nil {
return nil return nil

View File

@ -29,15 +29,27 @@ type GRPCClient struct {
eventCh chan *pb.Event eventCh chan *pb.Event
eventDone chan struct{} eventDone chan struct{}
onVideoStateChange func(state VideoState)
onIndevEvent func(event string)
onRpcEvent func(event string)
closed bool closed bool
closeM sync.Mutex closeM sync.Mutex
} }
type grpcClientOptions struct {
SocketPath string
Logger *zerolog.Logger
OnVideoStateChange func(state VideoState)
OnIndevEvent func(event string)
OnRpcEvent func(event string)
}
// NewGRPCClient creates a new gRPC client connected to the native service // NewGRPCClient creates a new gRPC client connected to the native service
func NewGRPCClient(socketPath string, logger *zerolog.Logger) (*GRPCClient, error) { func NewGRPCClient(opts grpcClientOptions) (*GRPCClient, error) {
// Connect to the Unix domain socket // Connect to the Unix domain socket
conn, err := grpc.NewClient( conn, err := grpc.NewClient(
fmt.Sprintf("unix-abstract:%v", socketPath), fmt.Sprintf("unix-abstract:%v", opts.SocketPath),
grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithTransportCredentials(insecure.NewCredentials()),
) )
if err != nil { if err != nil {
@ -49,9 +61,12 @@ func NewGRPCClient(socketPath string, logger *zerolog.Logger) (*GRPCClient, erro
grpcClient := &GRPCClient{ grpcClient := &GRPCClient{
conn: conn, conn: conn,
client: client, client: client,
logger: logger, logger: opts.Logger,
eventCh: make(chan *pb.Event, 100), eventCh: make(chan *pb.Event, 100),
eventDone: make(chan struct{}), eventDone: make(chan struct{}),
onVideoStateChange: opts.OnVideoStateChange,
onIndevEvent: opts.OnIndevEvent,
onRpcEvent: opts.OnRpcEvent,
} }
// Start event stream // Start event stream
@ -60,8 +75,51 @@ func NewGRPCClient(socketPath string, logger *zerolog.Logger) (*GRPCClient, erro
return grpcClient, nil return grpcClient, nil
} }
func (c *GRPCClient) setStream(stream pb.NativeService_StreamEventsClient) {
c.eventM.Lock()
defer c.eventM.Unlock()
c.eventStream = stream
}
func (c *GRPCClient) handleEventStream(stream pb.NativeService_StreamEventsClient) {
logger := *c.logger
for {
if stream == nil {
logger.Error().Msg("event stream is nil")
break
}
event, err := stream.Recv()
// enrich the logger with the event type and data, if debug mode is enabled
if c.logger.GetLevel() <= zerolog.DebugLevel {
logger = logger.With().
Str("type", event.Type).
Interface("data", event.Data).
Logger()
}
if err != nil {
if errors.Is(err, io.EOF) {
logger.Debug().Msg("event stream closed")
} else {
logger.Warn().Err(err).Msg("event stream error")
}
break
}
logger.Trace().Msg("received event")
select {
case c.eventCh <- event:
default:
logger.Warn().Msg("event channel full, dropping event")
}
}
}
func (c *GRPCClient) startEventStream() { func (c *GRPCClient) startEventStream() {
for { for {
// check if the client is closed
c.closeM.Lock() c.closeM.Lock()
if c.closed { if c.closed {
c.closeM.Unlock() c.closeM.Unlock()
@ -77,33 +135,9 @@ func (c *GRPCClient) startEventStream() {
continue continue
} }
c.eventM.Lock() c.setStream(stream)
c.eventStream = stream c.handleEventStream(stream)
c.eventM.Unlock() c.setStream(nil)
for {
event, err := stream.Recv()
if err == io.EOF {
c.logger.Debug().Msg("event stream closed")
break
}
if err != nil {
c.logger.Warn().Err(err).Msg("event stream error")
break
}
c.logger.Info().Str("type", event.Type).Msg("received event")
select {
case c.eventCh <- event:
default:
c.logger.Warn().Msg("event channel full, dropping event")
}
}
c.eventM.Lock()
c.eventStream = nil
c.eventM.Unlock()
// Wait before retrying // Wait before retrying
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
@ -111,7 +145,8 @@ func (c *GRPCClient) startEventStream() {
} }
func (c *GRPCClient) checkIsReady(ctx context.Context) error { func (c *GRPCClient) checkIsReady(ctx context.Context) error {
c.logger.Info().Msg("connection is idle, connecting...") c.logger.Trace().Msg("connection is idle, connecting ...")
resp, err := c.client.IsReady(ctx, &pb.IsReadyRequest{}) resp, err := c.client.IsReady(ctx, &pb.IsReadyRequest{})
if err != nil { if err != nil {
if errors.Is(err, status.Error(codes.Unavailable, "")) { if errors.Is(err, status.Error(codes.Unavailable, "")) {
@ -130,8 +165,16 @@ func (c *GRPCClient) WaitReady() error {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel() defer cancel()
prevState := connectivity.Idle
for { for {
state := c.conn.GetState() state := c.conn.GetState()
c.logger.
With().
Str("state", state.String()).
Int("prev_state", int(prevState)).
Logger()
prevState = state
if state == connectivity.Idle || state == connectivity.Ready { if state == connectivity.Idle || state == connectivity.Ready {
if err := c.checkIsReady(ctx); err != nil { if err := c.checkIsReady(ctx); err != nil {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
@ -139,7 +182,8 @@ func (c *GRPCClient) WaitReady() error {
} }
} }
c.logger.Info().Str("state", state.String()).Msg("waiting for connection to be ready") c.logger.Info().Msg("waiting for connection to be ready")
if state == connectivity.Ready { if state == connectivity.Ready {
return nil return nil
} }
@ -153,47 +197,42 @@ func (c *GRPCClient) WaitReady() error {
} }
} }
func (c *GRPCClient) handleEvent(event *pb.Event) {
switch event.Type {
case "video_state_change":
state := event.GetVideoState()
if state == nil {
c.logger.Warn().Msg("video state event is nil")
return
}
c.onVideoStateChange(VideoState{
Ready: state.Ready,
Error: state.Error,
Width: int(state.Width),
Height: int(state.Height),
FramePerSecond: state.FramePerSecond,
})
case "indev_event":
c.onIndevEvent(event.GetIndevEvent())
case "rpc_event":
c.onRpcEvent(event.GetRpcEvent())
default:
c.logger.Warn().Str("type", event.Type).Msg("unknown event type")
}
}
// OnEvent registers an event handler // OnEvent registers an event handler
func (c *GRPCClient) OnEvent(eventType string, handler func(data interface{})) { func (c *GRPCClient) OnEvent(eventType string, handler func(data interface{})) {
go func() {
for {
select {
case event := <-c.eventCh:
c.handleEvent(event)
case <-c.eventDone:
return return
// go func() { }
// for { }
// select { }()
// case event := <-c.eventCh:
// if event.Type == eventType {
// var data interface{}
// switch eventType {
// case "video_state_change":
// if event.VideoState != nil {
// data = VideoState{
// Ready: event.VideoState.Ready,
// Error: event.VideoState.Error,
// Width: int(event.VideoState.Width),
// Height: int(event.VideoState.Height),
// FramePerSecond: event.VideoState.FramePerSecond,
// }
// }
// case "indev_event":
// data = event.IndevEvent
// case "rpc_event":
// data = event.RpcEvent
// case "video_frame":
// if event.VideoFrame != nil {
// data = map[string]interface{}{
// "frame": event.VideoFrame.Frame,
// "duration": time.Duration(event.VideoFrame.DurationNs),
// }
// }
// }
// if data != nil {
// handler(data)
// }
// }
// case <-c.eventDone:
// return
// }
// }
// }()
} }
// Close closes the gRPC client // Close closes the gRPC client
@ -209,7 +248,9 @@ func (c *GRPCClient) Close() error {
c.eventM.Lock() c.eventM.Lock()
if c.eventStream != nil { if c.eventStream != nil {
c.eventStream.CloseSend() if err := c.eventStream.CloseSend(); err != nil {
c.logger.Warn().Err(err).Msg("failed to close event stream")
}
} }
c.eventM.Unlock() c.eventM.Unlock()

View File

@ -6,6 +6,7 @@ import (
"net" "net"
"sync" "sync"
"github.com/erikdubbelboer/gspt"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
@ -314,6 +315,8 @@ func (s *grpcServer) DoNotUseThisIsForCrashTestingOnly(ctx context.Context, req
// StreamEvents streams events from the native process // StreamEvents streams events from the native process
func (s *grpcServer) StreamEvents(req *pb.Empty, stream pb.NativeService_StreamEventsServer) error { func (s *grpcServer) StreamEvents(req *pb.Empty, stream pb.NativeService_StreamEventsServer) error {
gspt.SetProcTitle("jetkvm: [native] connected")
eventCh := make(chan *pb.Event, 100) eventCh := make(chan *pb.Event, 100)
// Register this channel for events // Register this channel for events
@ -349,7 +352,6 @@ func (s *grpcServer) StreamEvents(req *pb.Empty, stream pb.NativeService_StreamE
// StartGRPCServer starts the gRPC server on a Unix domain socket // StartGRPCServer starts the gRPC server on a Unix domain socket
func StartGRPCServer(server *grpcServer, socketPath string, logger *zerolog.Logger) (*grpc.Server, net.Listener, error) { func StartGRPCServer(server *grpcServer, socketPath string, logger *zerolog.Logger) (*grpc.Server, net.Listener, error) {
lis, err := net.Listen("unix", socketPath) lis, err := net.Listen("unix", socketPath)
if err != nil { if err != nil {
return nil, nil, fmt.Errorf("failed to listen on socket: %w", err) return nil, nil, fmt.Errorf("failed to listen on socket: %w", err)

View File

@ -34,4 +34,3 @@ type NativeInterface interface {
SwitchToScreenIfDifferent(screenName string) SwitchToScreenIfDifferent(screenName string)
DoNotUseThisIsForCrashTestingOnly() DoNotUseThisIsForCrashTestingOnly()
} }

View File

@ -1,10 +1,14 @@
package native package native
import ( import (
"crypto/rand"
"encoding/hex"
"fmt" "fmt"
"net" "net"
"os" "os"
"os/exec" "os/exec"
"runtime"
"strings"
"sync" "sync"
"syscall" "syscall"
"time" "time"
@ -36,7 +40,19 @@ type nativeProxyOptions struct {
OnVideoStateChange func(state VideoState) OnVideoStateChange func(state VideoState)
} }
func randomId(binaryLength int) string {
s := make([]byte, binaryLength)
_, err := rand.Read(s)
if err != nil {
nativeLogger.Error().Err(err).Msg("failed to generate random ID")
return strings.Repeat("0", binaryLength*2) // return all zeros if error
}
return hex.EncodeToString(s)
}
func (n *NativeOptions) toProxyOptions() *nativeProxyOptions { func (n *NativeOptions) toProxyOptions() *nativeProxyOptions {
// random 16 bytes hex string
handshakeMessage := randomId(16)
return &nativeProxyOptions{ return &nativeProxyOptions{
Disable: n.Disable, Disable: n.Disable,
SystemVersion: n.SystemVersion, SystemVersion: n.SystemVersion,
@ -47,6 +63,7 @@ func (n *NativeOptions) toProxyOptions() *nativeProxyOptions {
OnIndevEvent: n.OnIndevEvent, OnIndevEvent: n.OnIndevEvent,
OnRpcEvent: n.OnRpcEvent, OnRpcEvent: n.OnRpcEvent,
OnVideoStateChange: n.OnVideoStateChange, OnVideoStateChange: n.OnVideoStateChange,
HandshakeMessage: handshakeMessage,
} }
} }
@ -63,13 +80,14 @@ func (p *nativeProxyOptions) toNativeOptions() *NativeOptions {
// cmdWrapper wraps exec.Cmd to implement processCmd interface // cmdWrapper wraps exec.Cmd to implement processCmd interface
type cmdWrapper struct { type cmdWrapper struct {
*exec.Cmd *exec.Cmd
stdoutHandler *nativeProxyStdoutHandler
} }
func (c *cmdWrapper) GetProcess() interface { func (c *cmdWrapper) GetProcess() interface {
Kill() error Kill() error
Signal(sig interface{}) error Signal(sig interface{}) error
} { } {
return &processWrapper{Process: c.Cmd.Process} return &processWrapper{Process: c.Process}
} }
type processWrapper struct { type processWrapper struct {
@ -107,8 +125,8 @@ type NativeProxy struct {
// NewNativeProxy creates a new NativeProxy that spawns a separate process // NewNativeProxy creates a new NativeProxy that spawns a separate process
func NewNativeProxy(opts NativeOptions) (*NativeProxy, error) { func NewNativeProxy(opts NativeOptions) (*NativeProxy, error) {
proxyOptions := opts.toProxyOptions() proxyOptions := opts.toProxyOptions()
proxyOptions.CtrlUnixSocket = "jetkvm-native-grpc" proxyOptions.CtrlUnixSocket = fmt.Sprintf("jetkvm/native/grpc/%s", randomId(4))
proxyOptions.VideoStreamUnixSocket = "@jetkvm-native-video-stream" proxyOptions.VideoStreamUnixSocket = fmt.Sprintf("@jetkvm/native/video-stream/%s", randomId(4))
// Get the current executable path to spawn itself // Get the current executable path to spawn itself
exePath, err := os.Executable() exePath, err := os.Executable()
@ -125,54 +143,95 @@ func NewNativeProxy(opts NativeOptions) (*NativeProxy, error) {
options: proxyOptions, options: proxyOptions,
processWait: make(chan error, 1), processWait: make(chan error, 1),
} }
proxy.cmd, err = proxy.spawnProcess() proxy.cmd, err = proxy.toProcessCommand()
nativeLogger.Info().Msg("spawned process")
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to spawn process: %w", err) return nil, fmt.Errorf("failed to create process: %w", err)
} }
// create unix packet
listener, err := net.Listen("unixpacket", proxyOptions.VideoStreamUnixSocket)
if err != nil {
nativeLogger.Warn().Err(err).Msg("failed to start server")
return nil, fmt.Errorf("failed to start server: %w", err)
}
go func() {
for {
conn, err := listener.Accept()
if err != nil {
nativeLogger.Warn().Err(err).Msg("failed to accept socket")
continue
}
nativeLogger.Info().Str("socket", conn.RemoteAddr().String()).Msg("accepted socket")
go proxy.handleVideoFrame(conn)
}
}()
return proxy, nil return proxy, nil
} }
func (p *NativeProxy) spawnProcess() (*cmdWrapper, error) { func (p *NativeProxy) startVideoStreamListener() error {
if p.videoStreamListener != nil {
return nil
}
logger := p.logger.With().Str("socketPath", p.videoStreamUnixSocket).Logger()
listener, err := net.Listen("unixpacket", p.videoStreamUnixSocket)
if err != nil {
logger.Warn().Err(err).Msg("failed to start video stream listener")
return fmt.Errorf("failed to start video stream listener: %w", err)
}
logger.Info().Msg("video stream listener started")
p.videoStreamListener = listener
go func() {
for {
conn, err := listener.Accept()
if err != nil {
logger.Warn().Err(err).Msg("failed to accept socket")
continue
}
logger.Info().Msg("video stream socket accepted")
go p.handleVideoFrame(conn)
}
}()
return nil
}
type nativeProxyStdoutHandler struct {
mu *sync.Mutex
handshakeCh chan bool
handshakeMessage string
handshakeDone bool
}
func (w *nativeProxyStdoutHandler) Write(p []byte) (n int, err error) {
w.mu.Lock()
defer w.mu.Unlock()
if !w.handshakeDone && strings.Contains(string(p), w.handshakeMessage) {
w.handshakeDone = true
w.handshakeCh <- true
}
os.Stdout.Write(p)
return len(p), nil
}
func (p *NativeProxy) toProcessCommand() (*cmdWrapper, error) {
envArgs, err := utils.MarshalEnv(p.options) envArgs, err := utils.MarshalEnv(p.options)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to marshal environment variables: %w", err) return nil, fmt.Errorf("failed to marshal environment variables: %w", err)
} }
cmd := exec.Command( cmd := &cmdWrapper{
Cmd: exec.Command(
p.binaryPath, p.binaryPath,
"-subcomponent=native", "-subcomponent=native",
) ),
// cmd.Stdout = os.Stdout // Forward stdout to parent stdoutHandler: &nativeProxyStdoutHandler{
cmd.Stderr = os.Stderr // Forward stderr to parent mu: &sync.Mutex{},
handshakeCh: make(chan bool),
handshakeMessage: p.options.HandshakeMessage,
},
}
cmd.Stdout = cmd.stdoutHandler
cmd.Stderr = os.Stderr
cmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true,
Pdeathsig: syscall.SIGTERM,
}
// Set environment variable to indicate native process mode // Set environment variable to indicate native process mode
cmd.Env = append( cmd.Env = append(
os.Environ(), os.Environ(),
envArgs..., envArgs...,
) )
// Wrap cmd to implement processCmd interface
wrappedCmd := &cmdWrapper{Cmd: cmd}
return wrappedCmd, nil return cmd, nil
} }
func (p *NativeProxy) handleVideoFrame(conn net.Conn) { func (p *NativeProxy) handleVideoFrame(conn net.Conn) {
@ -184,7 +243,7 @@ func (p *NativeProxy) handleVideoFrame(conn net.Conn) {
for { for {
n, err := conn.Read(inboundPacket) n, err := conn.Read(inboundPacket)
if err != nil { if err != nil {
nativeLogger.Warn().Err(err).Msg("failed to accept socket") p.logger.Warn().Err(err).Msg("failed to read video frame from socket")
break break
} }
now := time.Now() now := time.Now()
@ -194,25 +253,27 @@ func (p *NativeProxy) handleVideoFrame(conn net.Conn) {
} }
} }
// Start starts the native process func (p *NativeProxy) setUpGRPCClient() error {
func (p *NativeProxy) Start() error { // wait until handshake completed
p.restartM.Lock() select {
defer p.restartM.Unlock() case <-p.cmd.stdoutHandler.handshakeCh:
p.logger.Info().Msg("handshake completed")
if p.stopped { case <-time.After(10 * time.Second):
return fmt.Errorf("proxy is stopped") return fmt.Errorf("handshake not completed within 10 seconds")
} }
if err := p.cmd.Start(); err != nil { logger := p.logger.With().Str("socketPath", "@"+p.nativeUnixSocket).Logger()
return fmt.Errorf("failed to start native process: %w", err) client, err := NewGRPCClient(grpcClientOptions{
} SocketPath: p.nativeUnixSocket,
Logger: &logger,
OnIndevEvent: p.options.OnIndevEvent,
OnRpcEvent: p.options.OnRpcEvent,
OnVideoStateChange: p.options.OnVideoStateChange,
})
nativeLogger.Info().Msg("process ready") logger.Info().Msg("created gRPC client")
client, err := NewGRPCClient(p.nativeUnixSocket, nativeLogger)
nativeLogger.Info().Str("socket_path", p.nativeUnixSocket).Msg("created client")
if err != nil { if err != nil {
return fmt.Errorf("failed to create IPC client: %w", err) return fmt.Errorf("failed to create gRPC client: %w", err)
} }
p.client = client p.client = client
@ -226,11 +287,46 @@ func (p *NativeProxy) Start() error {
return fmt.Errorf("failed to wait for ready: %w", err) return fmt.Errorf("failed to wait for ready: %w", err)
} }
// Set up event handlers
p.setupEventHandlers(client)
// Start monitoring process for crashes // Start monitoring process for crashes
go p.monitorProcess() go p.monitorProcess()
return nil
}
func (p *NativeProxy) start() error {
// lock OS thread to prevent the process from being moved to a different thread
// see also https://go.dev/issue/27505
runtime.LockOSThread()
defer runtime.UnlockOSThread()
if err := p.cmd.Start(); err != nil {
return fmt.Errorf("failed to start native process: %w", err)
}
p.logger.Info().Int("pid", p.cmd.Process.Pid).Msg("native process started")
if err := p.setUpGRPCClient(); err != nil {
return fmt.Errorf("failed to set up gRPC client: %w", err)
}
return nil
}
// Start starts the native process
func (p *NativeProxy) Start() error {
p.restartM.Lock()
defer p.restartM.Unlock()
if p.stopped {
return fmt.Errorf("proxy is stopped")
}
if err := p.startVideoStreamListener(); err != nil {
return fmt.Errorf("failed to start video stream listener: %w", err)
}
if err := p.start(); err != nil {
return fmt.Errorf("failed to start native process: %w", err)
}
close(p.ready) close(p.ready)
return nil return nil
@ -289,111 +385,19 @@ func (p *NativeProxy) restartProcess() error {
return fmt.Errorf("proxy is stopped") return fmt.Errorf("proxy is stopped")
} }
wrappedCmd, err := p.spawnProcess()
if err != nil {
return fmt.Errorf("failed to spawn process: %w", err)
}
// Close old client // Close old client
if p.client != nil { if p.client != nil {
_ = p.client.Close() _ = p.client.Close()
} }
// Create new client if err := p.start(); err != nil {
client, err := NewGRPCClient(p.nativeUnixSocket, p.logger)
if err != nil {
return fmt.Errorf("failed to create IPC client: %w", err)
}
// Set up event handlers again
p.setupEventHandlers(client)
// Start the process
if err := wrappedCmd.Start(); err != nil {
return fmt.Errorf("failed to start native process: %w", err) return fmt.Errorf("failed to start native process: %w", err)
} }
// Wait for ready
if err := client.WaitReady(); err != nil {
if wrappedCmd.Process != nil {
_ = wrappedCmd.Process.Kill()
_ = wrappedCmd.Wait()
}
return fmt.Errorf("timeout waiting for ready: %w", err)
}
p.cmd = wrappedCmd
p.client = client
p.logger.Info().Msg("native process restarted successfully") p.logger.Info().Msg("native process restarted successfully")
return nil return nil
} }
func (p *NativeProxy) setupEventHandlers(client *GRPCClient) {
// if p.opts.OnVideoStateChange != nil {
// client.OnEvent("video_state_change", func(data interface{}) {
// dataBytes, err := json.Marshal(data)
// if err != nil {
// p.logger.Warn().Err(err).Msg("failed to marshal video state event")
// return
// }
// var state VideoState
// if err := json.Unmarshal(dataBytes, &state); err != nil {
// p.logger.Warn().Err(err).Msg("failed to unmarshal video state event")
// return
// }
// p.opts.OnVideoStateChange(state)
// })
// }
// if p.opts.OnIndevEvent != nil {
// client.OnEvent("indev_event", func(data interface{}) {
// if event, ok := data.(string); ok {
// p.opts.OnIndevEvent(event)
// }
// })
// }
// if p.opts.OnRpcEvent != nil {
// client.OnEvent("rpc_event", func(data interface{}) {
// if event, ok := data.(string); ok {
// p.opts.OnRpcEvent(event)
// }
// })
// }
// if p.opts.OnVideoFrameReceived != nil {
// client.OnEvent("video_frame", func(data interface{}) {
// dataMap, ok := data.(map[string]interface{})
// if !ok {
// p.logger.Warn().Msg("invalid video frame event data")
// return
// }
// frameData, ok := dataMap["frame"].([]interface{})
// if !ok {
// p.logger.Warn().Msg("invalid frame data in event")
// return
// }
// frame := make([]byte, len(frameData))
// for i, v := range frameData {
// if b, ok := v.(float64); ok {
// frame[i] = byte(b)
// }
// }
// durationNs, ok := dataMap["duration"].(float64)
// if !ok {
// p.logger.Warn().Msg("invalid duration in event")
// return
// }
// p.opts.OnVideoFrameReceived(frame, time.Duration(durationNs))
// })
// }
}
// Stop stops the native process // Stop stops the native process
func (p *NativeProxy) Stop() error { func (p *NativeProxy) Stop() error {
p.restartM.Lock() p.restartM.Lock()

View File

@ -21,7 +21,7 @@ func RunNativeProcess(binaryName string) {
logger := *nativeLogger logger := *nativeLogger
// Initialize logger // Initialize logger
gspt.SetProcTitle(binaryName + " [native]") gspt.SetProcTitle("jetkvm: [native] starting")
var proxyOptions nativeProxyOptions var proxyOptions nativeProxyOptions
if err := env.Parse(&proxyOptions); err != nil { if err := env.Parse(&proxyOptions); err != nil {
@ -45,12 +45,14 @@ func RunNativeProcess(binaryName string) {
// Create native instance // Create native instance
nativeInstance := NewNative(*nativeOptions) nativeInstance := NewNative(*nativeOptions)
gspt.SetProcTitle("jetkvm: [native] initializing")
// Start native instance // Start native instance
if err := nativeInstance.Start(); err != nil { if err := nativeInstance.Start(); err != nil {
logger.Fatal().Err(err).Msg("failed to start native instance") logger.Fatal().Err(err).Msg("failed to start native instance")
} }
gspt.SetProcTitle("jetkvm: [native] starting gRPC server")
// Create gRPC server // Create gRPC server
grpcServer := NewGRPCServer(nativeInstance, &logger) grpcServer := NewGRPCServer(nativeInstance, &logger)
@ -60,11 +62,14 @@ func RunNativeProcess(binaryName string) {
if err != nil { if err != nil {
logger.Fatal().Err(err).Msg("failed to start gRPC server") logger.Fatal().Err(err).Msg("failed to start gRPC server")
} }
gspt.SetProcTitle(binaryName + " [native] ready") gspt.SetProcTitle("jetkvm: [native] ready")
// Signal that we're ready by writing socket path to stdout (for parent to read) // Signal that we're ready by writing handshake message to stdout (for parent to read)
fmt.Fprintf(os.Stdout, "%s\n", proxyOptions.CtrlUnixSocket) // Stdout.Write is used to avoid buffering the message
defer os.Stdout.Close() _, err = os.Stdout.Write([]byte(proxyOptions.HandshakeMessage + "\n"))
if err != nil {
logger.Fatal().Err(err).Msg("failed to write handshake message to stdout")
}
// Set up signal handling // Set up signal handling
sigChan := make(chan os.Signal, 1) sigChan := make(chan os.Signal, 1)

23
main.go
View File

@ -2,22 +2,36 @@ package kvm
import ( import (
"context" "context"
"fmt"
"net/http" "net/http"
"os" "os"
"os/signal" "os/signal"
"syscall" "syscall"
"time" "time"
"github.com/erikdubbelboer/gspt"
"github.com/gwatts/rootcerts" "github.com/gwatts/rootcerts"
) )
var appCtx context.Context var appCtx context.Context
var procPrefix string = "jetkvm: [app]"
func setProcTitle(status string) {
if status != "" {
status = " " + status
}
title := fmt.Sprintf("%s%s", procPrefix, status)
gspt.SetProcTitle(title)
}
func Main() { func Main() {
setProcTitle("starting")
logger.Log().Msg("JetKVM Starting Up") logger.Log().Msg("JetKVM Starting Up")
checkFailsafeReason() checkFailsafeReason()
if failsafeModeActive { if failsafeModeActive {
procPrefix = "jetkvm: [app+failsafe]"
logger.Warn().Str("reason", failsafeModeReason).Msg("failsafe mode activated") logger.Warn().Str("reason", failsafeModeReason).Msg("failsafe mode activated")
} }
@ -40,6 +54,7 @@ func Main() {
go runWatchdog() go runWatchdog()
go confirmCurrentSystem() go confirmCurrentSystem()
setProcTitle("initNative")
initNative(systemVersionLocal, appVersionLocal) initNative(systemVersionLocal, appVersionLocal)
initDisplay() initDisplay()
@ -54,6 +69,7 @@ func Main() {
Msg("loaded Root CA certificates") Msg("loaded Root CA certificates")
// Initialize network // Initialize network
setProcTitle("initNetwork")
if err := initNetwork(); err != nil { if err := initNetwork(); err != nil {
logger.Error().Err(err).Msg("failed to initialize network") logger.Error().Err(err).Msg("failed to initialize network")
// TODO: reset config to default // TODO: reset config to default
@ -61,17 +77,21 @@ func Main() {
} }
// Initialize time sync // Initialize time sync
setProcTitle("initTimeSync")
initTimeSync() initTimeSync()
timeSync.Start() timeSync.Start()
// Initialize mDNS // Initialize mDNS
setProcTitle("initMdns")
if err := initMdns(); err != nil { if err := initMdns(); err != nil {
logger.Error().Err(err).Msg("failed to initialize mDNS") logger.Error().Err(err).Msg("failed to initialize mDNS")
} }
setProcTitle("initPrometheus")
initPrometheus() initPrometheus()
// initialize usb gadget // initialize usb gadget
setProcTitle("initUsbGadget")
initUsbGadget() initUsbGadget()
if err := setInitialVirtualMediaState(); err != nil { if err := setInitialVirtualMediaState(); err != nil {
logger.Warn().Err(err).Msg("failed to set initial virtual media state") logger.Warn().Err(err).Msg("failed to set initial virtual media state")
@ -135,6 +155,9 @@ func Main() {
initPublicIPState() initPublicIPState()
initSerialPort() initSerialPort()
setProcTitle("ready")
sigs := make(chan os.Signal, 1) sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
<-sigs <-sigs

View File

@ -16,7 +16,7 @@ var (
) )
func initNative(systemVersion *semver.Version, appVersion *semver.Version) { func initNative(systemVersion *semver.Version, appVersion *semver.Version) {
nativeLogger.Info().Msg("initializing native") nativeLogger.Info().Msg("initializing native proxy")
var err error var err error
nativeInstance, err = native.NewNativeProxy(native.NativeOptions{ nativeInstance, err = native.NewNativeProxy(native.NativeOptions{
Disable: failsafeModeActive, Disable: failsafeModeActive,