mirror of https://github.com/jetkvm/kvm.git
fix: RX goroutine cleanup
This commit is contained in:
parent
621be3c1d9
commit
b49d67c87d
|
|
@ -87,23 +87,25 @@ func (l *LLDP) setUpCapture() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *LLDP) doCapture(logger *zerolog.Logger, rxCtx context.Context) {
|
func (l *LLDP) doCapture(logger *zerolog.Logger) {
|
||||||
defer func() {
|
|
||||||
l.mu.Lock()
|
|
||||||
l.rxRunning = false
|
|
||||||
l.mu.Unlock()
|
|
||||||
|
|
||||||
logger.Info().Msg("RX goroutine finished")
|
|
||||||
|
|
||||||
l.rxWaitGroup.Done()
|
|
||||||
}()
|
|
||||||
|
|
||||||
l.rxWaitGroup.Add(1)
|
l.rxWaitGroup.Add(1)
|
||||||
|
defer l.rxWaitGroup.Done()
|
||||||
|
|
||||||
// TODO: use a channel to handle the packets
|
// TODO: use a channel to handle the packets
|
||||||
// PacketSource.Packets() is not reliable and can cause panics and the upstream hasn't fixed it yet
|
// PacketSource.Packets() is not reliable and can cause panics and the upstream hasn't fixed it yet
|
||||||
for rxCtx.Err() == nil {
|
for {
|
||||||
|
// check if the context is done before blocking call
|
||||||
|
select {
|
||||||
|
case <-l.rxCtx.Done():
|
||||||
|
logger.Info().Msg("RX context cancelled")
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.Trace().Msg("waiting for next packet")
|
||||||
packet, err := l.pktSourceRx.NextPacket()
|
packet, err := l.pktSourceRx.NextPacket()
|
||||||
|
logger.Trace().Interface("packet", packet).Err(err).Msg("got next packet")
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error().
|
logger.Error().
|
||||||
Err(err).
|
Err(err).
|
||||||
|
|
@ -152,9 +154,7 @@ func (l *LLDP) startCapture() error {
|
||||||
l.rxCtx, l.rxCancel = context.WithCancel(context.Background())
|
l.rxCtx, l.rxCancel = context.WithCancel(context.Background())
|
||||||
l.rxRunning = true
|
l.rxRunning = true
|
||||||
|
|
||||||
// Capture context in closure
|
go l.doCapture(&logger)
|
||||||
rxCtx := l.rxCtx
|
|
||||||
go l.doCapture(&logger, rxCtx)
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
@ -363,11 +363,25 @@ func (l *LLDP) stopCapture() error {
|
||||||
logger.Info().Msg("cancelled RX context, waiting for goroutine to finish")
|
logger.Info().Msg("cancelled RX context, waiting for goroutine to finish")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// stop the TPacketRx
|
||||||
|
go func() {
|
||||||
|
if l.tPacketRx == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// write an empty packet to the TPacketRx to interrupt the blocking read
|
||||||
|
// it's a shitty workaround until https://github.com/google/gopacket/pull/777 is merged,
|
||||||
|
// or we have a better solution, see https://github.com/google/gopacket/issues/1064
|
||||||
|
l.tPacketRx.WritePacketData([]byte{})
|
||||||
|
}()
|
||||||
|
|
||||||
// wait for the goroutine to finish
|
// wait for the goroutine to finish
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
l.rxWaitGroup.Wait()
|
l.rxWaitGroup.Wait()
|
||||||
logger.Info().Dur("duration", time.Since(start)).Msg("RX goroutine finished")
|
logger.Info().Dur("duration", time.Since(start)).Msg("RX goroutine finished")
|
||||||
|
|
||||||
|
l.rxRunning = false
|
||||||
|
|
||||||
if l.tPacketRx != nil {
|
if l.tPacketRx != nil {
|
||||||
logger.Info().Msg("closing TPacketRx")
|
logger.Info().Msg("closing TPacketRx")
|
||||||
l.tPacketRx.Close()
|
l.tPacketRx.Close()
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue