mirror of https://github.com/jetkvm/kvm.git
Compare commits
3 Commits
35d8c3d9d1
...
87adfee033
| Author | SHA1 | Date |
|---|---|---|
|
|
87adfee033 | |
|
|
b49d67c87d | |
|
|
621be3c1d9 |
|
|
@ -30,13 +30,15 @@ type LLDP struct {
|
||||||
advertiseOptions *AdvertiseOptions
|
advertiseOptions *AdvertiseOptions
|
||||||
onChange func(neighbors []Neighbor)
|
onChange func(neighbors []Neighbor)
|
||||||
|
|
||||||
neighbors *ttlcache.Cache[string, Neighbor]
|
neighbors *ttlcache.Cache[neighborCacheKey, Neighbor]
|
||||||
|
|
||||||
// State tracking
|
// State tracking
|
||||||
rxRunning bool
|
|
||||||
txRunning bool
|
txRunning bool
|
||||||
txCtx context.Context
|
txCtx context.Context
|
||||||
txCancel context.CancelFunc
|
txCancel context.CancelFunc
|
||||||
|
|
||||||
|
rxRunning bool
|
||||||
|
rxWaitGroup *sync.WaitGroup
|
||||||
rxCtx context.Context
|
rxCtx context.Context
|
||||||
rxCancel context.CancelFunc
|
rxCancel context.CancelFunc
|
||||||
}
|
}
|
||||||
|
|
@ -72,8 +74,9 @@ func NewLLDP(opts *Options) *LLDP {
|
||||||
advertiseOptions: opts.AdvertiseOptions,
|
advertiseOptions: opts.AdvertiseOptions,
|
||||||
enableRx: opts.EnableRx,
|
enableRx: opts.EnableRx,
|
||||||
enableTx: opts.EnableTx,
|
enableTx: opts.EnableTx,
|
||||||
|
rxWaitGroup: &sync.WaitGroup{},
|
||||||
l: opts.Logger,
|
l: opts.Logger,
|
||||||
neighbors: ttlcache.New(ttlcache.WithTTL[string, Neighbor](1 * time.Hour)),
|
neighbors: ttlcache.New(ttlcache.WithTTL[neighborCacheKey, Neighbor](1 * time.Hour)),
|
||||||
onChange: opts.OnChange,
|
onChange: opts.OnChange,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,6 @@
|
||||||
package lldp
|
package lldp
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -27,22 +26,28 @@ type Neighbor struct {
|
||||||
Values map[string]string `json:"values"`
|
Values map[string]string `json:"values"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Neighbor) cacheKey() string {
|
type neighborCacheKey struct {
|
||||||
return fmt.Sprintf("%s-%s", n.Mac, n.Source)
|
mac string
|
||||||
|
source string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *Neighbor) cacheKey() neighborCacheKey {
|
||||||
|
return neighborCacheKey{mac: n.Mac, source: n.Source}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *LLDP) addNeighbor(neighbor *Neighbor, ttl time.Duration) {
|
func (l *LLDP) addNeighbor(neighbor *Neighbor, ttl time.Duration) {
|
||||||
logger := l.l.With().
|
logger := l.l.With().
|
||||||
|
Str("source", neighbor.Source).
|
||||||
Str("mac", neighbor.Mac).
|
Str("mac", neighbor.Mac).
|
||||||
Interface("neighbor", neighbor).
|
Interface("neighbor", neighbor).
|
||||||
Logger()
|
Logger()
|
||||||
|
|
||||||
key := neighbor.cacheKey()
|
key := neighbor.cacheKey()
|
||||||
|
|
||||||
current_neigh := l.neighbors.Get(key)
|
currentNeighbor := l.neighbors.Get(key)
|
||||||
if current_neigh != nil {
|
if currentNeighbor != nil {
|
||||||
current_source := current_neigh.Value().Source
|
currentSource := currentNeighbor.Value().Source
|
||||||
if current_source == "lldp" && neighbor.Source != "lldp" {
|
if currentSource == "lldp" && neighbor.Source != "lldp" {
|
||||||
logger.Info().Msg("skip updating neighbor, as LLDP has higher priority")
|
logger.Info().Msg("skip updating neighbor, as LLDP has higher priority")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
@ -56,6 +61,7 @@ func (l *LLDP) addNeighbor(neighbor *Neighbor, ttl time.Duration) {
|
||||||
|
|
||||||
func (l *LLDP) deleteNeighbor(neighbor *Neighbor) {
|
func (l *LLDP) deleteNeighbor(neighbor *Neighbor) {
|
||||||
logger := l.l.With().
|
logger := l.l.With().
|
||||||
|
Str("source", neighbor.Source).
|
||||||
Str("mac", neighbor.Mac).
|
Str("mac", neighbor.Mac).
|
||||||
Logger()
|
Logger()
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -87,51 +87,47 @@ func (l *LLDP) setUpCapture() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *LLDP) doCapture(logger *zerolog.Logger, rxCtx context.Context) {
|
func (l *LLDP) doCapture(logger *zerolog.Logger) {
|
||||||
defer func() {
|
l.rxWaitGroup.Add(1)
|
||||||
l.mu.Lock()
|
defer l.rxWaitGroup.Done()
|
||||||
l.rxRunning = false
|
|
||||||
l.mu.Unlock()
|
|
||||||
}()
|
|
||||||
|
|
||||||
// TODO: use a channel to handle the packets
|
// TODO: use a channel to handle the packets
|
||||||
// PacketSource.Packets() is not reliable and can cause panics and the upstream hasn't fixed it yet
|
// PacketSource.Packets() is not reliable and can cause panics and the upstream hasn't fixed it yet
|
||||||
for rxCtx.Err() == nil {
|
for {
|
||||||
if l.pktSourceRx == nil || l.tPacketRx == nil {
|
// check if the context is done before blocking call
|
||||||
logger.Error().Msg("packet source or TPacketRx not initialized")
|
select {
|
||||||
break
|
case <-l.rxCtx.Done():
|
||||||
|
logger.Info().Msg("RX context cancelled")
|
||||||
|
return
|
||||||
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger.Trace().Msg("waiting for next packet")
|
||||||
packet, err := l.pktSourceRx.NextPacket()
|
packet, err := l.pktSourceRx.NextPacket()
|
||||||
if err == nil {
|
logger.Trace().Interface("packet", packet).Err(err).Msg("got next packet")
|
||||||
if handleErr := l.handlePacket(packet, logger); handleErr != nil {
|
|
||||||
logger.Error().
|
|
||||||
Err(handleErr).
|
|
||||||
Msg("error handling packet")
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Immediately retry for temporary network errors and EAGAIN
|
if err != nil {
|
||||||
// temporary has been deprecated and most cases are timeouts
|
logger.Error().
|
||||||
if nerr, ok := err.(net.Error); ok && nerr.Timeout() {
|
Err(err).
|
||||||
continue
|
Msg("error getting next packet")
|
||||||
}
|
|
||||||
if err == syscall.EAGAIN {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Immediately break for known unrecoverable errors
|
// Immediately break for known unrecoverable errors
|
||||||
if err == io.EOF || err == io.ErrUnexpectedEOF ||
|
if err == io.EOF || err == io.ErrUnexpectedEOF ||
|
||||||
err == io.ErrNoProgress || err == io.ErrClosedPipe || err == io.ErrShortBuffer ||
|
err == io.ErrNoProgress || err == io.ErrClosedPipe || err == io.ErrShortBuffer ||
|
||||||
err == syscall.EBADF ||
|
err == syscall.EBADF ||
|
||||||
strings.Contains(err.Error(), "use of closed file") {
|
strings.Contains(err.Error(), "use of closed file") {
|
||||||
break
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := l.handlePacket(packet, logger); err != nil {
|
||||||
logger.Error().
|
logger.Error().
|
||||||
Err(err).
|
Err(err).
|
||||||
Msg("error receiving LLDP packet")
|
Msg("error handling packet")
|
||||||
|
continue
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -158,9 +154,7 @@ func (l *LLDP) startCapture() error {
|
||||||
l.rxCtx, l.rxCancel = context.WithCancel(context.Background())
|
l.rxCtx, l.rxCancel = context.WithCancel(context.Background())
|
||||||
l.rxRunning = true
|
l.rxRunning = true
|
||||||
|
|
||||||
// Capture context in closure
|
go l.doCapture(&logger)
|
||||||
rxCtx := l.rxCtx
|
|
||||||
go l.doCapture(&logger, rxCtx)
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
@ -369,8 +363,24 @@ func (l *LLDP) stopCapture() error {
|
||||||
logger.Info().Msg("cancelled RX context, waiting for goroutine to finish")
|
logger.Info().Msg("cancelled RX context, waiting for goroutine to finish")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait a bit for goroutine to finish
|
// stop the TPacketRx
|
||||||
time.Sleep(500 * time.Millisecond)
|
go func() {
|
||||||
|
if l.tPacketRx == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// write an empty packet to the TPacketRx to interrupt the blocking read
|
||||||
|
// it's a shitty workaround until https://github.com/google/gopacket/pull/777 is merged,
|
||||||
|
// or we have a better solution, see https://github.com/google/gopacket/issues/1064
|
||||||
|
l.tPacketRx.WritePacketData([]byte{})
|
||||||
|
}()
|
||||||
|
|
||||||
|
// wait for the goroutine to finish
|
||||||
|
start := time.Now()
|
||||||
|
l.rxWaitGroup.Wait()
|
||||||
|
logger.Info().Dur("duration", time.Since(start)).Msg("RX goroutine finished")
|
||||||
|
|
||||||
|
l.rxRunning = false
|
||||||
|
|
||||||
if l.tPacketRx != nil {
|
if l.tPacketRx != nil {
|
||||||
logger.Info().Msg("closing TPacketRx")
|
logger.Info().Msg("closing TPacketRx")
|
||||||
|
|
|
||||||
|
|
@ -60,10 +60,10 @@ var (
|
||||||
func toLLDPCapabilitiesBytes(capabilities []string) uint16 {
|
func toLLDPCapabilitiesBytes(capabilities []string) uint16 {
|
||||||
r := uint16(0)
|
r := uint16(0)
|
||||||
for _, capability := range capabilities {
|
for _, capability := range capabilities {
|
||||||
if _, ok := capabilityMap[capability]; !ok {
|
mask, ok := capabilityMap[capability]
|
||||||
continue
|
if ok {
|
||||||
|
r |= mask
|
||||||
}
|
}
|
||||||
r |= capabilityMap[capability]
|
|
||||||
}
|
}
|
||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue