diff --git a/internal/native/proxy.go b/internal/native/proxy.go index b5091a19..dda6a86e 100644 --- a/internal/native/proxy.go +++ b/internal/native/proxy.go @@ -120,15 +120,20 @@ type NativeProxy struct { videoStreamListener net.Listener binaryPath string - client *GRPCClient - clientMu sync.RWMutex - cmd *cmdWrapper - logger *zerolog.Logger - ready chan struct{} - options *nativeProxyOptions - restartMu sync.Mutex - restarts uint - stopped bool + startMu sync.Mutex // mutex for the start process (context and isStopped) + ctx context.Context + cancel context.CancelFunc + + client *GRPCClient + clientMu sync.RWMutex // mutex for the client + + cmd *cmdWrapper + cmdMu sync.Mutex // mutex for the cmd + + logger *zerolog.Logger + options *nativeProxyOptions + restarts uint + stopped bool } // NewNativeProxy creates a new NativeProxy that spawns a separate process @@ -147,7 +152,6 @@ func NewNativeProxy(opts NativeOptions) (*NativeProxy, error) { videoStreamUnixSocket: proxyOptions.VideoStreamUnixSocket, binaryPath: exePath, logger: nativeLogger, - ready: make(chan struct{}), options: proxyOptions, restarts: 0, } @@ -264,10 +268,8 @@ func (p *NativeProxy) handleVideoFrame(conn net.Conn) { } } +// it should be only called by start() method, as it isn't thread-safe func (p *NativeProxy) setUpGRPCClient() error { - p.clientMu.Lock() - defer p.clientMu.Unlock() - // wait until handshake completed select { case <-p.cmd.stdoutHandler.handshakeCh: @@ -306,13 +308,13 @@ func (p *NativeProxy) setUpGRPCClient() error { go p.options.OnNativeRestart() } - // Start monitoring process for crashes - go p.monitorProcess(client.getContext()) - return nil } -func (p *NativeProxy) start() error { +func (p *NativeProxy) doStart() error { + p.cmdMu.Lock() + defer p.cmdMu.Unlock() + // lock OS thread to prevent the process from being moved to a different thread // see also https://go.dev/issue/27505 runtime.LockOSThread() @@ -345,8 +347,10 @@ func (p *NativeProxy) start() error { // Start starts the native process func (p *NativeProxy) Start() error { - p.restartMu.Lock() - defer p.restartMu.Unlock() + p.startMu.Lock() + defer p.startMu.Unlock() + + p.ctx, p.cancel = context.WithCancel(context.Background()) if p.stopped { return fmt.Errorf("proxy is stopped") @@ -356,51 +360,46 @@ func (p *NativeProxy) Start() error { return fmt.Errorf("failed to start video stream listener: %w", err) } - if err := p.start(); err != nil { + if err := p.doStart(); err != nil { return fmt.Errorf("failed to start native process: %w", err) } - close(p.ready) + go p.monitorProcess() + return nil } // monitorProcess monitors the native process and restarts it if it crashes -func (p *NativeProxy) monitorProcess(ctx context.Context) { +func (p *NativeProxy) monitorProcess() { for { - select { - case <-ctx.Done(): - return - default: - } - - p.restartMu.Lock() - cmd := p.cmd - stopped := p.stopped - p.restartMu.Unlock() - - if stopped { - return - } - - if cmd == nil { - return - } - - err := cmd.Wait() - // check if the context is done - // if yes, it means that there should be already one restart in progress - select { - case <-ctx.Done(): - return - default: - } - - p.restartMu.Lock() if p.stopped { - p.restartMu.Unlock() return } - p.restartMu.Unlock() + + select { + case <-p.ctx.Done(): + p.logger.Trace().Msg("context done, stopping monitor process [before wait]") + return + default: + } + + p.cmdMu.Lock() + err := fmt.Errorf("native process not started") + if p.cmd != nil { + err = p.cmd.Wait() + } + p.cmdMu.Unlock() + + if p.stopped { + return + } + + select { + case <-p.ctx.Done(): + p.logger.Trace().Msg("context done, stopping monitor process [after wait]") + return + default: + } p.logger.Warn().Err(err).Msg("native process exited, restarting ...") @@ -419,9 +418,6 @@ func (p *NativeProxy) monitorProcess(ctx context.Context) { // restartProcess restarts the native process func (p *NativeProxy) restartProcess() error { - p.restartMu.Lock() - defer p.restartMu.Unlock() - p.restarts++ logger := p.logger.With().Uint("attempt", p.restarts).Uint("maxAttempts", p.options.MaxRestartAttempts).Logger() @@ -446,7 +442,7 @@ func (p *NativeProxy) restartProcess() error { logger.Info().Msg("gRPC client closed") logger.Info().Msg("attempting to restart native process") - if err := p.start(); err != nil { + if err := p.doStart(); err != nil { logger.Error().Err(err).Msg("failed to start native process") return fmt.Errorf("failed to start native process: %w", err) } @@ -457,17 +453,11 @@ func (p *NativeProxy) restartProcess() error { // Stop stops the native process func (p *NativeProxy) Stop() error { - p.restartMu.Lock() - defer p.restartMu.Unlock() + p.startMu.Lock() + defer p.startMu.Unlock() p.stopped = true - p.clientMu.Lock() - if err := p.client.Close(); err != nil { - p.logger.Warn().Err(err).Msg("failed to close IPC client") - } - p.clientMu.Unlock() - if p.cmd.Process != nil { if err := p.cmd.Process.Kill(); err != nil { return fmt.Errorf("failed to kill native process: %w", err) diff --git a/internal/native/server.go b/internal/native/server.go index a2835f19..252648d6 100644 --- a/internal/native/server.go +++ b/internal/native/server.go @@ -10,6 +10,7 @@ import ( "github.com/caarlos0/env/v11" "github.com/erikdubbelboer/gspt" + "github.com/rs/zerolog" ) // Native Process @@ -21,6 +22,10 @@ var ( lastProcTitle string ) +const ( + DebugModeFile = "/userdata/jetkvm/.native-debug-mode" +) + func setProcTitle(status string) { lastProcTitle = status if status != "" { @@ -30,6 +35,21 @@ func setProcTitle(status string) { gspt.SetProcTitle(title) } +func monitorCrashSignal(logger *zerolog.Logger, nativeInstance NativeInterface) { + logger.Info().Msg("DEBUG mode: will crash the process on SIGHUP signal") + + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGHUP) + + // non-blocking receive + select { + case <-sigChan: + logger.Info().Msg("received SIGHUP signal, emulating crash") + nativeInstance.DoNotUseThisIsForCrashTestingOnly() + default: + } +} + // RunNativeProcess runs the native process mode func RunNativeProcess(binaryName string) { logger := nativeLogger.With().Int("pid", os.Getpid()).Logger() @@ -85,18 +105,10 @@ func RunNativeProcess(binaryName string) { logger.Fatal().Err(err).Msg("failed to write handshake message to stdout") } - go func() { - sigChan := make(chan os.Signal, 1) - signal.Notify(sigChan, syscall.SIGHUP) - - // non-blocking receive - select { - case <-sigChan: - logger.Info().Msg("received SIGHUP signal, emulating crash") - nativeInstance.DoNotUseThisIsForCrashTestingOnly() - default: - } - }() + if _, err := os.Stat(DebugModeFile); err == nil { + logger.Info().Msg("DEBUG mode: enabled") + go monitorCrashSignal(&logger, nativeInstance) + } // Set up signal handling sigChan := make(chan os.Signal, 1)