Skip to content

Commit

Permalink
Fix how decoding errors are handled for UDP clients in collector (#352)
Browse files Browse the repository at this point in the history
Previously, in case of a decoding error (e.g., an unknown template ID in
a data set), the goroutine responsible for decoding messages would be
stopped, but the client would not be deleted. This would lead to a
deadlock as subsequent messages could not be written to the packetChan
channel for the client, effectively breaking the UDP collecting process
completely. To avoid this issue, we log an error when decoding fails,
but keep the client alive. Note that for IPFIX over UDP, there is no way
to notify a single UDP client of an error (with TCP, we can close the
connection if we want). This is also why the template refresh mechanism
exists.

We also fix possible race conditions in the UDP collecting process: all
access to the clients map should be protected by locking the mutex.

Signed-off-by: Antonin Bas <[email protected]>
  • Loading branch information
antoninbas authored Aug 22, 2024
1 parent 621ab6a commit a4ae35d
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 65 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/pion/dtls/v2 v2.2.4
github.com/spf13/cobra v1.6.1
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.1
github.com/stretchr/testify v1.9.0
go.uber.org/mock v0.3.0
google.golang.org/protobuf v1.33.0
k8s.io/apimachinery v0.24.9
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,9 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
Expand Down
23 changes: 2 additions & 21 deletions pkg/collector/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ type CollectorInput struct {
}

// clientHandler holds the per-client channels used by the collecting process.
// packetChan carries raw packet buffers to the per-client goroutine.
// closeClientChan is closed by that goroutine when it shuts down, so that
// senders never block on an abandoned packetChan.
// As scraped, this struct declared packetChan twice (diff residue); the
// duplicate field declaration is removed here so the type compiles.
type clientHandler struct {
	packetChan      chan *bytes.Buffer
	closeClientChan chan struct{}
}

func InitCollectingProcess(input CollectorInput) (*CollectingProcess, error) {
Expand Down Expand Up @@ -132,8 +133,6 @@ func (cp *CollectingProcess) GetMsgChan() <-chan *entities.Message {
}

// CloseMsgChan closes the collector's message channel, signaling consumers of
// GetMsgChan that no more messages will be delivered.
// NOTE(review): the close is performed while holding cp.mutex — presumably to
// serialize with senders; confirm that all writers to messageChan also hold
// the mutex, otherwise a concurrent send could still race with this close.
func (cp *CollectingProcess) CloseMsgChan() {
cp.mutex.Lock()
defer cp.mutex.Unlock()
close(cp.messageChan)
}

Expand All @@ -155,24 +154,6 @@ func (cp *CollectingProcess) incrementNumRecordsReceived() {
cp.numOfRecordsReceived = cp.numOfRecordsReceived + 1
}

// createClient builds a fresh client handler whose packet channel is
// unbuffered, so senders rendezvous with the client goroutine.
func (cp *CollectingProcess) createClient() *clientHandler {
	handler := new(clientHandler)
	handler.packetChan = make(chan *bytes.Buffer)
	return handler
}

// addClient registers handler under the exporter address addr, guarding the
// shared clients map with the process mutex.
func (cp *CollectingProcess) addClient(addr string, handler *clientHandler) {
	cp.mutex.Lock()
	defer cp.mutex.Unlock()
	cp.clients[addr] = handler
}

// deleteClient removes the handler registered under key, if any, holding the
// process mutex for the map mutation. (delete on a map never panics, so the
// explicit Unlock is equivalent to the deferred form here.)
func (cp *CollectingProcess) deleteClient(key string) {
	cp.mutex.Lock()
	delete(cp.clients, key)
	cp.mutex.Unlock()
}

func (cp *CollectingProcess) decodePacket(packetBuffer *bytes.Buffer, exportAddress string) (*entities.Message, error) {
var length, version, setID, setLen uint16
var exportTime, sequencNum, obsDomainID uint32
Expand Down
42 changes: 42 additions & 0 deletions pkg/collector/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,48 @@ func TestUDPCollectingProcess_ConcurrentClient(t *testing.T) {
cp.Stop()
}

// TestUDPCollectingProcess_DecodePacketError verifies that a decoding error
// (a data packet received before its template) does not kill the per-client
// goroutine: the client must stay registered and process a later template +
// data exchange normally.
func TestUDPCollectingProcess_DecodePacketError(t *testing.T) {
	input := getCollectorInput(udpTransport, false, false)
	cp, _ := InitCollectingProcess(input)
	go cp.Start()
	// wait until collector is ready
	waitForCollectorReady(t, cp)
	collectorAddr := cp.GetAddress()
	resolveAddr, err := net.ResolveUDPAddr(collectorAddr.Network(), collectorAddr.String())
	if err != nil {
		// Fatalf, not Errorf: continuing with a nil address would panic below.
		t.Fatalf("UDP Address cannot be resolved.")
	}

	defer cp.CloseMsgChan()
	go func() {
		// consume all messages to avoid blocking
		ch := cp.GetMsgChan()
		for range ch {
		}
	}()

	conn, err := net.DialUDP(udpTransport, nil, resolveAddr)
	if err != nil {
		// Fatalf, not Errorf: a nil conn would panic on Close/Write below.
		t.Fatalf("UDP Collecting Process does not start correctly.")
	}
	defer conn.Close()
	// write data packet before template, decodePacket should fail
	if _, err := conn.Write(validDataPacket); err != nil {
		t.Fatalf("Error when writing data packet: %v", err)
	}
	assert.EventuallyWithT(t, func(t *assert.CollectT) {
		assert.Equal(t, int64(1), cp.GetNumConnToCollector())
	}, 100*time.Millisecond, 10*time.Millisecond)
	time.Sleep(10 * time.Millisecond)
	// the data packet could not be decoded, so no record was counted
	assert.Zero(t, cp.GetNumRecordsReceived())

	if _, err := conn.Write(validTemplatePacket); err != nil {
		t.Fatalf("Error when writing template packet: %v", err)
	}
	// the client survived the decode error and processes the template record
	assert.EventuallyWithT(t, func(t *assert.CollectT) {
		assert.Equal(t, int64(1), cp.GetNumConnToCollector())
		assert.Equal(t, int64(1), cp.GetNumRecordsReceived())
	}, 100*time.Millisecond, 10*time.Millisecond)

	cp.Stop()
}

func TestCollectingProcess_DecodeTemplateRecord(t *testing.T) {
cp := CollectingProcess{}
cp.templatesMap = make(map[uint32]map[uint16][]*entities.InfoElement)
Expand Down
29 changes: 22 additions & 7 deletions pkg/collector/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ func (cp *CollectingProcess) startTCPServer() {
}
}
cp.wg.Add(1)
go cp.handleTCPClient(conn)
go func() {
defer cp.wg.Done()
cp.handleTCPClient(conn)
}()
}
}(cp.stopChan)
<-cp.stopChan
Expand All @@ -63,12 +66,24 @@ func (cp *CollectingProcess) startTCPServer() {

func (cp *CollectingProcess) handleTCPClient(conn net.Conn) {
address := conn.RemoteAddr().String()
client := cp.createClient()
cp.addClient(address, client)
defer cp.wg.Done()
// The channels stored in clientHandler are not needed for the TCP client, so we do not
// initialize them.
client := &clientHandler{}
func() {
cp.mutex.Lock()
defer cp.mutex.Unlock()
cp.clients[address] = client
}()
defer func() {
cp.mutex.Lock()
defer cp.mutex.Unlock()
delete(cp.clients, address)
}()
defer conn.Close()
reader := bufio.NewReader(conn)
cp.wg.Add(1)
go func() {
reader := bufio.NewReader(conn)
defer cp.wg.Done()
for {
length, err := getMessageLength(reader)
if errors.Is(err, io.EOF) {
Expand All @@ -77,18 +92,18 @@ func (cp *CollectingProcess) handleTCPClient(conn net.Conn) {
}
if err != nil {
klog.ErrorS(err, "Error when retrieving message length")
cp.deleteClient(address)
return
}
buff := make([]byte, length)
_, err = io.ReadFull(reader, buff)
if err != nil {
klog.ErrorS(err, "Error when reading the message")
cp.deleteClient(address)
return
}
message, err := cp.decodePacket(bytes.NewBuffer(buff), address)
if err != nil {
// TODO: should we close the connection instead and force the client to
// re-open it?
klog.ErrorS(err, "Error when decoding packet")
continue
}
Expand Down
105 changes: 70 additions & 35 deletions pkg/collector/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ func (cp *CollectingProcess) startUDPServer() {
return
}
defer conn.Close()
cp.wg.Add(1)
go func() {
defer cp.wg.Done()
buff := make([]byte, cp.maxBufferSize)
for {
size, err := conn.Read(buff)
Expand All @@ -80,10 +82,9 @@ func (cp *CollectingProcess) startUDPServer() {
return
}
klog.V(2).Infof("Receiving %d bytes from %s", size, address.String())
cp.handleUDPClient(address)
buffBytes := make([]byte, size)
copy(buffBytes, buff[0:size])
cp.clients[address.String()].packetChan <- bytes.NewBuffer(buffBytes)
cp.handleUDPMessage(address, buffBytes)
}
}()
} else { // use udp
Expand All @@ -95,7 +96,9 @@ func (cp *CollectingProcess) startUDPServer() {
cp.updateAddress(conn.LocalAddr())
klog.Infof("Start UDP collecting process on %s", cp.netAddress)
defer conn.Close()
cp.wg.Add(1)
go func() {
defer cp.wg.Done()
for {
buff := make([]byte, cp.maxBufferSize)
size, address, err := conn.ReadFromUDP(buff)
Expand All @@ -107,45 +110,77 @@ func (cp *CollectingProcess) startUDPServer() {
return
}
klog.V(2).Infof("Receiving %d bytes from %s", size, address.String())
cp.handleUDPClient(address)
cp.clients[address.String()].packetChan <- bytes.NewBuffer(buff[0:size])
cp.handleUDPMessage(address, buff[0:size])
}
}()
}
<-cp.stopChan
}

func (cp *CollectingProcess) handleUDPClient(address net.Addr) {
if _, exist := cp.clients[address.String()]; !exist {
client := cp.createClient()
cp.addClient(address.String(), client)
cp.wg.Add(1)
go func() {
defer cp.wg.Done()
ticker := time.NewTicker(time.Duration(entities.TemplateRefreshTimeOut) * time.Second)
for {
select {
case <-cp.stopChan:
klog.Infof("Collecting process from %s has stopped.", address.String())
cp.deleteClient(address.String())
return
case <-ticker.C: // set timeout for udp connection
klog.Errorf("UDP connection from %s timed out.", address.String())
cp.deleteClient(address.String())
return
case packet := <-client.packetChan:
// get the message here
message, err := cp.decodePacket(packet, address.String())
if err != nil {
klog.Error(err)
return
}
klog.V(4).Infof("Processed message from exporter %v, number of records: %v, observation domain ID: %v",
message.GetExportAddress(), message.GetSet().GetNumberOfRecords(), message.GetObsDomainID())
ticker.Stop()
ticker = time.NewTicker(time.Duration(entities.TemplateRefreshTimeOut) * time.Second)
// handleUDPMessage dispatches one received UDP payload to the goroutine that
// owns this client, creating the client first if it is not yet registered.
func (cp *CollectingProcess) handleUDPMessage(address net.Addr, buf []byte) {
	addr := address.String()
	cp.mutex.Lock()
	handler, known := cp.clients[addr]
	if !known {
		// createUDPClient requires the mutex to be held.
		handler = cp.createUDPClient(addr)
	}
	cp.mutex.Unlock()
	// The client goroutine may shut down on its own (template refresh timeout
	// or collector stop). When it does, it stops consuming packetChan but
	// closes closeClientChan, so the select below can never deadlock on a
	// send to an abandoned channel.
	select {
	case handler.packetChan <- bytes.NewBuffer(buf):
	case <-handler.closeClientChan:
	}
}

// createUDPClient is invoked with an exclusive lock on cp.mutex.
func (cp *CollectingProcess) createUDPClient(addr string) *clientHandler {
client := &clientHandler{
packetChan: make(chan *bytes.Buffer),
closeClientChan: make(chan struct{}),
}
cp.clients[addr] = client
cp.wg.Add(1)
go func() {
defer cp.wg.Done()
ticker := time.NewTicker(time.Duration(entities.TemplateRefreshTimeOut) * time.Second)
defer ticker.Stop()
defer close(client.closeClientChan)
defer func() {
cp.mutex.Lock()
defer cp.mutex.Unlock()
delete(cp.clients, addr)
}()
for {
select {
case <-cp.stopChan:
klog.Infof("Collecting process from %s has stopped.", addr)
return
case <-ticker.C: // set timeout for udp connection
klog.Errorf("UDP connection from %s timed out.", addr)
return
case packet := <-client.packetChan:
// get the message here
message, err := cp.decodePacket(packet, addr)
if err != nil {
klog.Error(err)
// For UDP, there is no point in returning here, as the
// client would not become aware that there is an issue.
// This is why the template refresh mechanism exists.
continue
}
klog.V(4).Infof("Processed message from exporter %v, number of records: %v, observation domain ID: %v",
message.GetExportAddress(), message.GetSet().GetNumberOfRecords(), message.GetObsDomainID())
ticker.Reset(time.Duration(entities.TemplateRefreshTimeOut) * time.Second)
}
}()
}
}
}()
return client
}

0 comments on commit a4ae35d

Please sign in to comment.