Compare commits

..

3 Commits

Author SHA1 Message Date
Aveline 87adfee033
Merge b49d67c87d into 36f06a064a 2025-11-06 23:40:54 +01:00
Siyuan b49d67c87d fix: RX goroutine cleanup 2025-11-06 22:40:44 +00:00
Siyuan 621be3c1d9 fix: PR issues 2025-11-06 21:54:56 +00:00
4 changed files with 72 additions and 53 deletions

View File

@ -30,15 +30,17 @@ type LLDP struct {
advertiseOptions *AdvertiseOptions advertiseOptions *AdvertiseOptions
onChange func(neighbors []Neighbor) onChange func(neighbors []Neighbor)
neighbors *ttlcache.Cache[string, Neighbor] neighbors *ttlcache.Cache[neighborCacheKey, Neighbor]
// State tracking // State tracking
rxRunning bool
txRunning bool txRunning bool
txCtx context.Context txCtx context.Context
txCancel context.CancelFunc txCancel context.CancelFunc
rxCtx context.Context
rxCancel context.CancelFunc rxRunning bool
rxWaitGroup *sync.WaitGroup
rxCtx context.Context
rxCancel context.CancelFunc
} }
type AdvertiseOptions struct { type AdvertiseOptions struct {
@ -72,8 +74,9 @@ func NewLLDP(opts *Options) *LLDP {
advertiseOptions: opts.AdvertiseOptions, advertiseOptions: opts.AdvertiseOptions,
enableRx: opts.EnableRx, enableRx: opts.EnableRx,
enableTx: opts.EnableTx, enableTx: opts.EnableTx,
rxWaitGroup: &sync.WaitGroup{},
l: opts.Logger, l: opts.Logger,
neighbors: ttlcache.New(ttlcache.WithTTL[string, Neighbor](1 * time.Hour)), neighbors: ttlcache.New(ttlcache.WithTTL[neighborCacheKey, Neighbor](1 * time.Hour)),
onChange: opts.OnChange, onChange: opts.OnChange,
} }
} }

View File

@ -1,7 +1,6 @@
package lldp package lldp
import ( import (
"fmt"
"time" "time"
) )
@ -27,22 +26,28 @@ type Neighbor struct {
Values map[string]string `json:"values"` Values map[string]string `json:"values"`
} }
func (n *Neighbor) cacheKey() string { type neighborCacheKey struct {
return fmt.Sprintf("%s-%s", n.Mac, n.Source) mac string
source string
}
func (n *Neighbor) cacheKey() neighborCacheKey {
return neighborCacheKey{mac: n.Mac, source: n.Source}
} }
func (l *LLDP) addNeighbor(neighbor *Neighbor, ttl time.Duration) { func (l *LLDP) addNeighbor(neighbor *Neighbor, ttl time.Duration) {
logger := l.l.With(). logger := l.l.With().
Str("source", neighbor.Source).
Str("mac", neighbor.Mac). Str("mac", neighbor.Mac).
Interface("neighbor", neighbor). Interface("neighbor", neighbor).
Logger() Logger()
key := neighbor.cacheKey() key := neighbor.cacheKey()
current_neigh := l.neighbors.Get(key) currentNeighbor := l.neighbors.Get(key)
if current_neigh != nil { if currentNeighbor != nil {
current_source := current_neigh.Value().Source currentSource := currentNeighbor.Value().Source
if current_source == "lldp" && neighbor.Source != "lldp" { if currentSource == "lldp" && neighbor.Source != "lldp" {
logger.Info().Msg("skip updating neighbor, as LLDP has higher priority") logger.Info().Msg("skip updating neighbor, as LLDP has higher priority")
return return
} }
@ -56,6 +61,7 @@ func (l *LLDP) addNeighbor(neighbor *Neighbor, ttl time.Duration) {
func (l *LLDP) deleteNeighbor(neighbor *Neighbor) { func (l *LLDP) deleteNeighbor(neighbor *Neighbor) {
logger := l.l.With(). logger := l.l.With().
Str("source", neighbor.Source).
Str("mac", neighbor.Mac). Str("mac", neighbor.Mac).
Logger() Logger()

View File

@ -87,51 +87,47 @@ func (l *LLDP) setUpCapture() error {
return nil return nil
} }
func (l *LLDP) doCapture(logger *zerolog.Logger, rxCtx context.Context) { func (l *LLDP) doCapture(logger *zerolog.Logger) {
defer func() { l.rxWaitGroup.Add(1)
l.mu.Lock() defer l.rxWaitGroup.Done()
l.rxRunning = false
l.mu.Unlock()
}()
// TODO: use a channel to handle the packets // TODO: use a channel to handle the packets
// PacketSource.Packets() is not reliable and can cause panics and the upstream hasn't fixed it yet // PacketSource.Packets() is not reliable and can cause panics and the upstream hasn't fixed it yet
for rxCtx.Err() == nil { for {
if l.pktSourceRx == nil || l.tPacketRx == nil { // check if the context is done before blocking call
logger.Error().Msg("packet source or TPacketRx not initialized") select {
break case <-l.rxCtx.Done():
logger.Info().Msg("RX context cancelled")
return
default:
} }
logger.Trace().Msg("waiting for next packet")
packet, err := l.pktSourceRx.NextPacket() packet, err := l.pktSourceRx.NextPacket()
if err == nil { logger.Trace().Interface("packet", packet).Err(err).Msg("got next packet")
if handleErr := l.handlePacket(packet, logger); handleErr != nil {
logger.Error(). if err != nil {
Err(handleErr). logger.Error().
Msg("error handling packet") Err(err).
Msg("error getting next packet")
// Immediately break for known unrecoverable errors
if err == io.EOF || err == io.ErrUnexpectedEOF ||
err == io.ErrNoProgress || err == io.ErrClosedPipe || err == io.ErrShortBuffer ||
err == syscall.EBADF ||
strings.Contains(err.Error(), "use of closed file") {
return
} }
continue continue
} }
// Immediately retry for temporary network errors and EAGAIN if err := l.handlePacket(packet, logger); err != nil {
// temporary has been deprecated and most cases are timeouts logger.Error().
if nerr, ok := err.(net.Error); ok && nerr.Timeout() { Err(err).
Msg("error handling packet")
continue continue
} }
if err == syscall.EAGAIN {
continue
}
// Immediately break for known unrecoverable errors
if err == io.EOF || err == io.ErrUnexpectedEOF ||
err == io.ErrNoProgress || err == io.ErrClosedPipe || err == io.ErrShortBuffer ||
err == syscall.EBADF ||
strings.Contains(err.Error(), "use of closed file") {
break
}
logger.Error().
Err(err).
Msg("error receiving LLDP packet")
} }
} }
@ -158,9 +154,7 @@ func (l *LLDP) startCapture() error {
l.rxCtx, l.rxCancel = context.WithCancel(context.Background()) l.rxCtx, l.rxCancel = context.WithCancel(context.Background())
l.rxRunning = true l.rxRunning = true
// Capture context in closure go l.doCapture(&logger)
rxCtx := l.rxCtx
go l.doCapture(&logger, rxCtx)
return nil return nil
} }
@ -369,8 +363,24 @@ func (l *LLDP) stopCapture() error {
logger.Info().Msg("cancelled RX context, waiting for goroutine to finish") logger.Info().Msg("cancelled RX context, waiting for goroutine to finish")
} }
// Wait a bit for goroutine to finish // stop the TPacketRx
time.Sleep(500 * time.Millisecond) go func() {
if l.tPacketRx == nil {
return
}
// write an empty packet to the TPacketRx to interrupt the blocking read
// it's a shitty workaround until https://github.com/google/gopacket/pull/777 is merged,
// or we have a better solution, see https://github.com/google/gopacket/issues/1064
l.tPacketRx.WritePacketData([]byte{})
}()
// wait for the goroutine to finish
start := time.Now()
l.rxWaitGroup.Wait()
logger.Info().Dur("duration", time.Since(start)).Msg("RX goroutine finished")
l.rxRunning = false
if l.tPacketRx != nil { if l.tPacketRx != nil {
logger.Info().Msg("closing TPacketRx") logger.Info().Msg("closing TPacketRx")

View File

@ -60,10 +60,10 @@ var (
func toLLDPCapabilitiesBytes(capabilities []string) uint16 { func toLLDPCapabilitiesBytes(capabilities []string) uint16 {
r := uint16(0) r := uint16(0)
for _, capability := range capabilities { for _, capability := range capabilities {
if _, ok := capabilityMap[capability]; !ok { mask, ok := capabilityMap[capability]
continue if ok {
r |= mask
} }
r |= capabilityMap[capability]
} }
return r return r
} }