mirror of https://github.com/jetkvm/kvm.git
feat: cancel all ongoing requests when closing the gRPC client
This commit is contained in:
parent
9c9c085690
commit
64ec70d030
|
|
@ -20,6 +20,9 @@ import (
|
|||
|
||||
// GRPCClient wraps the gRPC client for the native service
|
||||
type GRPCClient struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
|
||||
conn *grpc.ClientConn
|
||||
client pb.NativeServiceClient
|
||||
logger *zerolog.Logger
|
||||
|
|
@ -58,7 +61,11 @@ func NewGRPCClient(opts grpcClientOptions) (*GRPCClient, error) {
|
|||
|
||||
client := pb.NewNativeServiceClient(conn)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
grpcClient := &GRPCClient{
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
conn: conn,
|
||||
client: client,
|
||||
logger: opts.Logger,
|
||||
|
|
@ -83,8 +90,8 @@ func (c *GRPCClient) handleEventStream(stream pb.NativeService_StreamEventsClien
|
|||
c.eventM.Unlock()
|
||||
}()
|
||||
|
||||
logger := *c.logger
|
||||
for {
|
||||
logger := c.logger.With().Interface("stream", stream).Logger()
|
||||
if stream == nil {
|
||||
logger.Error().Msg("event stream is nil")
|
||||
break
|
||||
|
|
@ -128,8 +135,7 @@ func (c *GRPCClient) startEventStream() {
|
|||
}
|
||||
c.closeM.Unlock()
|
||||
|
||||
ctx := context.Background()
|
||||
stream, err := c.client.StreamEvents(ctx, &pb.Empty{})
|
||||
stream, err := c.client.StreamEvents(c.ctx, &pb.Empty{})
|
||||
if err != nil {
|
||||
c.logger.Warn().Err(err).Msg("failed to start event stream, retrying ...")
|
||||
time.Sleep(5 * time.Second)
|
||||
|
|
@ -161,7 +167,7 @@ func (c *GRPCClient) checkIsReady(ctx context.Context) error {
|
|||
|
||||
// WaitReady waits for the gRPC connection to be ready
|
||||
func (c *GRPCClient) WaitReady() error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
|
||||
ctx, cancel := context.WithTimeout(c.ctx, 60*time.Second)
|
||||
defer cancel()
|
||||
|
||||
prevState := connectivity.Idle
|
||||
|
|
@ -243,6 +249,9 @@ func (c *GRPCClient) Close() error {
|
|||
}
|
||||
c.closed = true
|
||||
|
||||
// cancel all ongoing operations
|
||||
c.cancel()
|
||||
|
||||
close(c.eventDone)
|
||||
|
||||
c.eventM.Lock()
|
||||
|
|
|
|||
Loading…
Reference in New Issue