mirror of https://github.com/jetkvm/kvm.git
fix: multiple goroutines issues
This commit is contained in:
parent
6dee8a3e24
commit
9c0ac4e3d0
|
|
@ -82,6 +82,10 @@ func NewGRPCClient(opts grpcClientOptions) (*GRPCClient, error) {
|
||||||
return grpcClient, nil
|
return grpcClient, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *GRPCClient) getContext() context.Context {
|
||||||
|
return c.ctx
|
||||||
|
}
|
||||||
|
|
||||||
func (c *GRPCClient) handleEventStream(stream pb.NativeService_StreamEventsClient) {
|
func (c *GRPCClient) handleEventStream(stream pb.NativeService_StreamEventsClient) {
|
||||||
c.eventM.Lock()
|
c.eventM.Lock()
|
||||||
c.eventStream = stream
|
c.eventStream = stream
|
||||||
|
|
@ -135,6 +139,14 @@ func (c *GRPCClient) startEventStream() {
|
||||||
}
|
}
|
||||||
c.closeM.Unlock()
|
c.closeM.Unlock()
|
||||||
|
|
||||||
|
// check if the context is done
|
||||||
|
select {
|
||||||
|
case <-c.ctx.Done():
|
||||||
|
c.logger.Info().Msg("event stream context done, closing")
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
stream, err := c.client.StreamEvents(c.ctx, &pb.Empty{})
|
stream, err := c.client.StreamEvents(c.ctx, &pb.Empty{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.logger.Warn().Err(err).Msg("failed to start event stream, retrying ...")
|
c.logger.Warn().Err(err).Msg("failed to start event stream, retrying ...")
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
package native
|
package native
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"crypto/rand"
|
"crypto/rand"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
@ -119,16 +120,15 @@ type NativeProxy struct {
|
||||||
videoStreamListener net.Listener
|
videoStreamListener net.Listener
|
||||||
binaryPath string
|
binaryPath string
|
||||||
|
|
||||||
client *GRPCClient
|
client *GRPCClient
|
||||||
clientMu sync.RWMutex
|
clientMu sync.RWMutex
|
||||||
cmd *cmdWrapper
|
cmd *cmdWrapper
|
||||||
logger *zerolog.Logger
|
logger *zerolog.Logger
|
||||||
ready chan struct{}
|
ready chan struct{}
|
||||||
options *nativeProxyOptions
|
options *nativeProxyOptions
|
||||||
restartMu sync.Mutex
|
restartMu sync.Mutex
|
||||||
restarts uint
|
restarts uint
|
||||||
stopped bool
|
stopped bool
|
||||||
processWait chan error
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewNativeProxy creates a new NativeProxy that spawns a separate process
|
// NewNativeProxy creates a new NativeProxy that spawns a separate process
|
||||||
|
|
@ -149,7 +149,6 @@ func NewNativeProxy(opts NativeOptions) (*NativeProxy, error) {
|
||||||
logger: nativeLogger,
|
logger: nativeLogger,
|
||||||
ready: make(chan struct{}),
|
ready: make(chan struct{}),
|
||||||
options: proxyOptions,
|
options: proxyOptions,
|
||||||
processWait: make(chan error, 1),
|
|
||||||
restarts: 0,
|
restarts: 0,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -308,7 +307,7 @@ func (p *NativeProxy) setUpGRPCClient() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start monitoring process for crashes
|
// Start monitoring process for crashes
|
||||||
go p.monitorProcess()
|
go p.monitorProcess(client.getContext())
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
@ -366,8 +365,14 @@ func (p *NativeProxy) Start() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// monitorProcess monitors the native process and restarts it if it crashes
|
// monitorProcess monitors the native process and restarts it if it crashes
|
||||||
func (p *NativeProxy) monitorProcess() {
|
func (p *NativeProxy) monitorProcess(ctx context.Context) {
|
||||||
for {
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
p.restartMu.Lock()
|
p.restartMu.Lock()
|
||||||
cmd := p.cmd
|
cmd := p.cmd
|
||||||
stopped := p.stopped
|
stopped := p.stopped
|
||||||
|
|
@ -382,8 +387,11 @@ func (p *NativeProxy) monitorProcess() {
|
||||||
}
|
}
|
||||||
|
|
||||||
err := cmd.Wait()
|
err := cmd.Wait()
|
||||||
|
// check if the context is done
|
||||||
|
// if yes, it means that there should be already one restart in progress
|
||||||
select {
|
select {
|
||||||
case p.processWait <- err:
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue