refactor(cgo-rpc): rewrite the monitor process logic

This commit is contained in:
Siyuan 2025-11-18 11:56:51 +00:00
parent 9c0ac4e3d0
commit f3b854c407
2 changed files with 79 additions and 77 deletions

View File

@ -120,15 +120,20 @@ type NativeProxy struct {
videoStreamListener net.Listener
binaryPath string
client *GRPCClient
clientMu sync.RWMutex
cmd *cmdWrapper
logger *zerolog.Logger
ready chan struct{}
options *nativeProxyOptions
restartMu sync.Mutex
restarts uint
stopped bool
startMu sync.Mutex // mutex for the start process (context and isStopped)
ctx context.Context
cancel context.CancelFunc
client *GRPCClient
clientMu sync.RWMutex // mutex for the client
cmd *cmdWrapper
cmdMu sync.Mutex // mutex for the cmd
logger *zerolog.Logger
options *nativeProxyOptions
restarts uint
stopped bool
}
// NewNativeProxy creates a new NativeProxy that spawns a separate process
@ -147,7 +152,6 @@ func NewNativeProxy(opts NativeOptions) (*NativeProxy, error) {
videoStreamUnixSocket: proxyOptions.VideoStreamUnixSocket,
binaryPath: exePath,
logger: nativeLogger,
ready: make(chan struct{}),
options: proxyOptions,
restarts: 0,
}
@ -264,10 +268,8 @@ func (p *NativeProxy) handleVideoFrame(conn net.Conn) {
}
}
// it should be only called by start() method, as it isn't thread-safe
func (p *NativeProxy) setUpGRPCClient() error {
p.clientMu.Lock()
defer p.clientMu.Unlock()
// wait until handshake completed
select {
case <-p.cmd.stdoutHandler.handshakeCh:
@ -306,13 +308,13 @@ func (p *NativeProxy) setUpGRPCClient() error {
go p.options.OnNativeRestart()
}
// Start monitoring process for crashes
go p.monitorProcess(client.getContext())
return nil
}
func (p *NativeProxy) start() error {
func (p *NativeProxy) doStart() error {
p.cmdMu.Lock()
defer p.cmdMu.Unlock()
// lock OS thread to prevent the process from being moved to a different thread
// see also https://go.dev/issue/27505
runtime.LockOSThread()
@ -345,8 +347,10 @@ func (p *NativeProxy) start() error {
// Start starts the native process
func (p *NativeProxy) Start() error {
p.restartMu.Lock()
defer p.restartMu.Unlock()
p.startMu.Lock()
defer p.startMu.Unlock()
p.ctx, p.cancel = context.WithCancel(context.Background())
if p.stopped {
return fmt.Errorf("proxy is stopped")
@ -356,51 +360,46 @@ func (p *NativeProxy) Start() error {
return fmt.Errorf("failed to start video stream listener: %w", err)
}
if err := p.start(); err != nil {
if err := p.doStart(); err != nil {
return fmt.Errorf("failed to start native process: %w", err)
}
close(p.ready)
go p.monitorProcess()
return nil
}
// monitorProcess monitors the native process and restarts it if it crashes
func (p *NativeProxy) monitorProcess(ctx context.Context) {
func (p *NativeProxy) monitorProcess() {
for {
select {
case <-ctx.Done():
return
default:
}
p.restartMu.Lock()
cmd := p.cmd
stopped := p.stopped
p.restartMu.Unlock()
if stopped {
return
}
if cmd == nil {
return
}
err := cmd.Wait()
// check if the context is done
// if yes, it means that there should be already one restart in progress
select {
case <-ctx.Done():
return
default:
}
p.restartMu.Lock()
if p.stopped {
p.restartMu.Unlock()
return
}
p.restartMu.Unlock()
select {
case <-p.ctx.Done():
p.logger.Trace().Msg("context done, stopping monitor process [before wait]")
return
default:
}
p.cmdMu.Lock()
err := fmt.Errorf("native process not started")
if p.cmd != nil {
err = p.cmd.Wait()
}
p.cmdMu.Unlock()
if p.stopped {
return
}
select {
case <-p.ctx.Done():
p.logger.Trace().Msg("context done, stopping monitor process [after wait]")
return
default:
}
p.logger.Warn().Err(err).Msg("native process exited, restarting ...")
@ -419,9 +418,6 @@ func (p *NativeProxy) monitorProcess(ctx context.Context) {
// restartProcess restarts the native process
func (p *NativeProxy) restartProcess() error {
p.restartMu.Lock()
defer p.restartMu.Unlock()
p.restarts++
logger := p.logger.With().Uint("attempt", p.restarts).Uint("maxAttempts", p.options.MaxRestartAttempts).Logger()
@ -446,7 +442,7 @@ func (p *NativeProxy) restartProcess() error {
logger.Info().Msg("gRPC client closed")
logger.Info().Msg("attempting to restart native process")
if err := p.start(); err != nil {
if err := p.doStart(); err != nil {
logger.Error().Err(err).Msg("failed to start native process")
return fmt.Errorf("failed to start native process: %w", err)
}
@ -457,17 +453,11 @@ func (p *NativeProxy) restartProcess() error {
// Stop stops the native process
func (p *NativeProxy) Stop() error {
p.restartMu.Lock()
defer p.restartMu.Unlock()
p.startMu.Lock()
defer p.startMu.Unlock()
p.stopped = true
p.clientMu.Lock()
if err := p.client.Close(); err != nil {
p.logger.Warn().Err(err).Msg("failed to close IPC client")
}
p.clientMu.Unlock()
if p.cmd.Process != nil {
if err := p.cmd.Process.Kill(); err != nil {
return fmt.Errorf("failed to kill native process: %w", err)

View File

@ -10,6 +10,7 @@ import (
"github.com/caarlos0/env/v11"
"github.com/erikdubbelboer/gspt"
"github.com/rs/zerolog"
)
// Native Process
@ -21,6 +22,10 @@ var (
lastProcTitle string
)
const (
DebugModeFile = "/userdata/jetkvm/.native-debug-mode"
)
func setProcTitle(status string) {
lastProcTitle = status
if status != "" {
@ -30,6 +35,21 @@ func setProcTitle(status string) {
gspt.SetProcTitle(title)
}
func monitorCrashSignal(logger *zerolog.Logger, nativeInstance NativeInterface) {
logger.Info().Msg("DEBUG mode: will crash the process on SIGHUP signal")
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGHUP)
// non-blocking receive
select {
case <-sigChan:
logger.Info().Msg("received SIGHUP signal, emulating crash")
nativeInstance.DoNotUseThisIsForCrashTestingOnly()
default:
}
}
// RunNativeProcess runs the native process mode
func RunNativeProcess(binaryName string) {
logger := nativeLogger.With().Int("pid", os.Getpid()).Logger()
@ -85,18 +105,10 @@ func RunNativeProcess(binaryName string) {
logger.Fatal().Err(err).Msg("failed to write handshake message to stdout")
}
go func() {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGHUP)
// non-blocking receive
select {
case <-sigChan:
logger.Info().Msg("received SIGHUP signal, emulating crash")
nativeInstance.DoNotUseThisIsForCrashTestingOnly()
default:
}
}()
if _, err := os.Stat(DebugModeFile); err == nil {
logger.Info().Msg("DEBUG mode: enabled")
go monitorCrashSignal(&logger, nativeInstance)
}
// Set up signal handling
sigChan := make(chan os.Signal, 1)