From 5cb6cd58d15780b4a774a46fe3d48ae47104ed1e Mon Sep 17 00:00:00 2001 From: nick Date: Mon, 12 Aug 2024 13:13:23 +0900 Subject: [PATCH] feat: update pointer type from aggregator --- node/pkg/aggregator/app.go | 2 +- node/pkg/aggregator/main_test.go | 4 ++-- node/pkg/aggregator/types.go | 8 ++++---- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/node/pkg/aggregator/app.go b/node/pkg/aggregator/app.go index e543550ac..88c5a214b 100644 --- a/node/pkg/aggregator/app.go +++ b/node/pkg/aggregator/app.go @@ -351,7 +351,7 @@ func (a *App) handleMessage(ctx context.Context, msg bus.Message) { msg.Response <- bus.MessageResponse{Success: true} case bus.STREAM_LOCAL_AGGREGATE: - localAggregate := msg.Content.Args["value"].(LocalAggregate) + localAggregate := msg.Content.Args["value"].(*LocalAggregate) log.Debug().Any("bus local aggregate", localAggregate).Msg("local aggregate received") a.LatestLocalAggregates.Store(localAggregate.ConfigID, localAggregate) diff --git a/node/pkg/aggregator/main_test.go b/node/pkg/aggregator/main_test.go index d847d5579..b68d3a097 100644 --- a/node/pkg/aggregator/main_test.go +++ b/node/pkg/aggregator/main_test.go @@ -113,10 +113,10 @@ func insertSampleData(ctx context.Context, app *App, latestLocalAggMap *LatestLo // return nil, err // } - tmpLocalAggregate := LocalAggregate{ConfigID: tmpConfig.ID, Value: int64(10), Timestamp: localAggregateInsertTime} + tmpLocalAggregate := &LocalAggregate{ConfigID: tmpConfig.ID, Value: int64(10), Timestamp: localAggregateInsertTime} latestLocalAggMap.Store(tmpConfig.ID, tmpLocalAggregate) - tmpData.rLocalAggregate = tmpLocalAggregate + tmpData.rLocalAggregate = *tmpLocalAggregate tmpPLocalAggregate, err := db.QueryRow[LocalAggregate](ctx, InsertLocalAggregateQuery, map[string]any{"config_id": tmpConfig.ID, "value": int64(10), "time": localAggregateInsertTime}) if err != nil { diff --git a/node/pkg/aggregator/types.go b/node/pkg/aggregator/types.go index 81aaa5af8..33aff2f1d 100644 --- a/node/pkg/aggregator/types.go +++ b/node/pkg/aggregator/types.go @@ -227,24 +227,24 @@ type TriggerMessage struct { } type LatestLocalAggregates struct { - LocalAggregateMap map[int32]LocalAggregate + LocalAggregateMap map[int32]*LocalAggregate mu sync.RWMutex } func NewLatestLocalAggregates() *LatestLocalAggregates { return &LatestLocalAggregates{ - LocalAggregateMap: map[int32]LocalAggregate{}, + LocalAggregateMap: map[int32]*LocalAggregate{}, } } -func (a *LatestLocalAggregates) Load(id int32) (LocalAggregate, bool) { +func (a *LatestLocalAggregates) Load(id int32) (*LocalAggregate, bool) { a.mu.RLock() defer a.mu.RUnlock() result, ok := a.LocalAggregateMap[id] return result, ok } -func (a *LatestLocalAggregates) Store(id int32, aggregate LocalAggregate) { +func (a *LatestLocalAggregates) Store(id int32, aggregate *LocalAggregate) { a.mu.Lock() defer a.mu.Unlock() a.LocalAggregateMap[id] = aggregate