Skip to content

Commit

Permalink
Add UDS cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
halacs committed Jan 1, 2024
1 parent 4891101 commit 4052a5a
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 28 deletions.
6 changes: 1 addition & 5 deletions fmb920/onlinedevices.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package fmb920

import (
"fmt"
"github.com/halacs/haltonika/config"
"net"
"time"
Expand All @@ -14,10 +13,7 @@ func (s *Server) markDeviceOnline(remote *net.UDPAddr, imei string) error {
Timestamp: time.Now(),
})

_, err := s.udsServer.KeepAlive(imei)
if err != nil {
return fmt.Errorf("failed to keep alive %s device. %v", imei, err)
}
s.udsServer.KeepAlive(imei)

return nil
}
Expand Down
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ func initializeMetricServer(ctx context.Context, log *logrus.Logger, wg *sync.Wa
}

func initializeUdsServer(ctx context.Context, log *logrus.Logger, cfg *config.UdsServerConfig) *uds.MultiServer {
udsMultiServer, err := uds.NewMultiServer(ctx, cfg.BasePath)
var wg sync.WaitGroup
udsMultiServer, err := uds.NewMultiServer(ctx, cfg.BasePath, &wg)
if err != nil {
log.Errorf("Failed to create multi UDS server. %v", err)
}
Expand Down
78 changes: 58 additions & 20 deletions uds/multiServer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,21 @@ import (
"context"
"fmt"
"github.com/sirupsen/logrus"
"sync"
"time"
)

const (
deleteIfOlderThen = 1 * time.Hour
checkLastSeenEvery = 10 * time.Second
)

type MultiServerInterface interface {
Stop() error
StartServer(deviceID string, toDevice, fromDevice chan string) (*Server, error)
StopServer(deviceID string) error
StopAllServers() error
KeepAlive(deviceID string) (found bool, err error)
KeepAlive(deviceID string)
GetServer(deviceID string) (*Server, error)
}

Expand All @@ -20,13 +27,16 @@ type MultiServer struct {
servers map[string]*Server
log logrus.Logger
basePath string
lastSeen sync.Map
wg *sync.WaitGroup
}

func NewMultiServer(ctx context.Context, basePath string) (*MultiServer, error) {
func NewMultiServer(ctx context.Context, basePath string, wg *sync.WaitGroup) (*MultiServer, error) {
ms := &MultiServer{
ctx: ctx,
servers: make(map[string]*Server),
basePath: basePath,
wg: wg,
}

return ms, nil
Expand All @@ -46,6 +56,8 @@ func (ms *MultiServer) StartServer(deviceID string, toDevice, fromDevice chan st

ms.setServerForDevice(deviceID, udsServer)

ms.keepAliveChecker()

return udsServer, nil
}

Expand Down Expand Up @@ -85,48 +97,74 @@ func (ms *MultiServer) StopAllServers() error {
return nil
}

func (ms *MultiServer) KeepAlive(deviceID string) (found bool, err error) {
return false, nil // TODO implement
func (ms *MultiServer) KeepAlive(deviceID string) {
key := deviceID
value := time.Now()
ms.lastSeen.Store(key, value)
}

/*
func (ms *MultiServer) keepAliveChecker() error {
func (ms *MultiServer) keepAliveChecker() {
ms.wg.Add(1)
go func() {
ticker := time.NewTicker(time.Second)
defer ms.wg.Done()

ticker := time.NewTicker(checkLastSeenEvery)
defer ticker.Stop()

for {
select {
case <-ticker.C:
ms.log.Warningf("Ticker fired - NOT IMPLEMENTED YET")
// TODO we should check here if there is a socket should be closed because of time out
ms.lastSeen.Range(func(key, value any) bool {
deviceID := key.(string)
lastSeenTimestamp := value.(time.Time)

server, err := ms.GetServer(deviceID)
if err != nil {
ms.log.Errorf("Failed to close expired Unix Domain Socket. %v", err)
}

if server != nil && server.IsActive() {
err2 := ms.removeServer(deviceID)
if err2 != nil {
ms.log.Errorf("Failed to close expired Unix Domain Socket. %v", err)
}
}

if time.Now().Add(-1 * deleteIfOlderThen).Before(lastSeenTimestamp) {
ms.lastSeen.Delete(deviceID)
ms.log.Debugf("Device expired: %s", deviceID)
}

return true // continue
})
case <-ms.ctx.Done():
return
}
}
}()
return nil
}
*/

func (ms *MultiServer) setServerForDevice(deviceID string, server *Server) {
ms.servers[deviceID] = server
}

/*
TODO implement udsServer cleanup based on timeout with keep alive calls
func (ms *MultiServer) removeServer(deviceID string) error {
_, found := ms.servers[deviceID]
server, found := ms.servers[deviceID]
if !found {
return fmt.Errorf("no UDS server found for %s device ID", deviceID)
}

if server.IsActive() {
err := server.Stop()
if err != nil {
return fmt.Errorf("failed to stop server. %v", err)
}
}

delete(ms.servers, deviceID)

return nil
}
*/

func (ms *MultiServer) setServerForDevice(deviceID string, server *Server) {
ms.servers[deviceID] = server
}

func (ms *MultiServer) GetServer(deviceID string) (*Server, error) {
server, found := ms.servers[deviceID]
Expand Down
3 changes: 1 addition & 2 deletions uds/multiServerMock.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ func (ms *MultiServerMock) StopAllServers() error {
return nil
}

func (ms *MultiServerMock) KeepAlive(deviceID string) (found bool, err error) {
return true, err
func (ms *MultiServerMock) KeepAlive(deviceID string) {
}

func (ms *MultiServerMock) GetServer(deviceID string) (*Server, error) {
Expand Down
8 changes: 8 additions & 0 deletions uds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,14 @@ func (us *Server) SetToDeviceChannel(c chan string) {
us.log.Debugf("Device TO channel has been set")
}

func (us *Server) IsActive() bool {
if us.listener == nil {
return false
}

return false
}

func (us *Server) Stop() error {
socketPath, err := us.getUdsName()
if err != nil {
Expand Down

0 comments on commit 4052a5a

Please sign in to comment.