Skip to content

Commit

Permalink
bug fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
halacs committed Jan 1, 2024
1 parent 69aa93c commit 430d0f6
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 18 deletions.
3 changes: 1 addition & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,7 @@ func initializeMetricServer(ctx context.Context, log *logrus.Logger, wg *sync.Wa
}

func initializeUdsServer(ctx context.Context, log *logrus.Logger, cfg *config.UdsServerConfig) *uds.MultiServer {
var wg sync.WaitGroup
udsMultiServer, err := uds.NewMultiServer(ctx, cfg.BasePath, &wg)
udsMultiServer, err := uds.NewMultiServer(ctx, cfg.BasePath, log)
if err != nil {
log.Errorf("Failed to create multi UDS server. %v", err)
}
Expand Down
46 changes: 30 additions & 16 deletions uds/multiServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,25 @@ type MultiServerInterface interface {
type MultiServer struct {
ctx context.Context
servers map[string]*Server
log logrus.Logger
log *logrus.Logger
basePath string
lastSeen sync.Map
wg *sync.WaitGroup
}

func NewMultiServer(ctx context.Context, basePath string, wg *sync.WaitGroup) (*MultiServer, error) {
func NewMultiServer(ctx context.Context, basePath string, log *logrus.Logger) (*MultiServer, error) {
var wg sync.WaitGroup

ms := &MultiServer{
ctx: ctx,
servers: make(map[string]*Server),
basePath: basePath,
wg: wg,
wg: &wg,
log: log,
}

ms.keepAliveChecker()

return ms, nil
}

Expand All @@ -56,8 +61,6 @@ func (ms *MultiServer) StartServer(deviceID string, toDevice, fromDevice chan st

ms.setServerForDevice(deviceID, udsServer)

ms.keepAliveChecker()

return udsServer, nil
}

Expand Down Expand Up @@ -101,41 +104,52 @@ func (ms *MultiServer) KeepAlive(deviceID string) {
key := deviceID
value := time.Now()
ms.lastSeen.Store(key, value)
ms.log.Tracef("multiServer keep alive: %s", deviceID)
}

func (ms *MultiServer) keepAliveChecker() {
ms.wg.Add(1)
go func() {
defer ms.wg.Done()

ms.log.Debugf("UDSServer: keep alive checker started")

ticker := time.NewTicker(checkLastSeenEvery)
defer ticker.Stop()
defer func() {
ticker.Stop()
}()

for {
select {
case <-ticker.C:
ms.log.Tracef("UDSServer: checking keep alive timestamp")

ms.lastSeen.Range(func(key, value any) bool {
deviceID := key.(string)
lastSeenTimestamp := value.(time.Time)

server, err := ms.GetServer(deviceID)
if err != nil {
ms.log.Errorf("Failed to close expired Unix Domain Socket. %v", err)
}
if lastSeenTimestamp.Add(deleteIfOlderThen).After(time.Now()) { // Do last keep alive too old?
ms.log.Infof("%v is too old (max %v allowed). UDS of %s device is going to be deleted.", lastSeenTimestamp, deleteIfOlderThen, deviceID)

if server != nil && server.IsActive() {
err2 := ms.removeServer(deviceID)
if err2 != nil {
// Stop UDS server for the given device
server, err := ms.GetServer(deviceID)
if err != nil {
ms.log.Errorf("Failed to close expired Unix Domain Socket. %v", err)
}
}

if time.Now().Add(-1 * deleteIfOlderThen).Before(lastSeenTimestamp) {
if server != nil && server.IsActive() {
err2 := ms.removeServer(deviceID)
if err2 != nil {
ms.log.Errorf("Failed to close expired Unix Domain Socket. %v", err)
}
}

// Drop keep alive data
ms.lastSeen.Delete(deviceID)
ms.log.Debugf("Device expired: %s", deviceID)
}

return true // continue
return true // continue with rest of the keep alive timestamps
})
case <-ms.ctx.Done():
return
Expand Down

0 comments on commit 430d0f6

Please sign in to comment.