Skip to content

Commit

Permalink
feat: update pointer type from aggregator
Browse files Browse the repository at this point in the history
  • Loading branch information
nick-bisonai committed Aug 12, 2024
1 parent 50dc5aa commit 5cb6cd5
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 7 deletions.
2 changes: 1 addition & 1 deletion node/pkg/aggregator/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ func (a *App) handleMessage(ctx context.Context, msg bus.Message) {
msg.Response <- bus.MessageResponse{Success: true}
case bus.STREAM_LOCAL_AGGREGATE:

localAggregate := msg.Content.Args["value"].(LocalAggregate)
localAggregate := msg.Content.Args["value"].(*LocalAggregate)
log.Debug().Any("bus local aggregate", localAggregate).Msg("local aggregate received")
a.LatestLocalAggregates.Store(localAggregate.ConfigID, localAggregate)

Expand Down
4 changes: 2 additions & 2 deletions node/pkg/aggregator/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,10 @@ func insertSampleData(ctx context.Context, app *App, latestLocalAggMap *LatestLo
// return nil, err
// }

tmpLocalAggregate := LocalAggregate{ConfigID: tmpConfig.ID, Value: int64(10), Timestamp: localAggregateInsertTime}
tmpLocalAggregate := &LocalAggregate{ConfigID: tmpConfig.ID, Value: int64(10), Timestamp: localAggregateInsertTime}
latestLocalAggMap.Store(tmpConfig.ID, tmpLocalAggregate)

tmpData.rLocalAggregate = tmpLocalAggregate
tmpData.rLocalAggregate = *tmpLocalAggregate

tmpPLocalAggregate, err := db.QueryRow[LocalAggregate](ctx, InsertLocalAggregateQuery, map[string]any{"config_id": tmpConfig.ID, "value": int64(10), "time": localAggregateInsertTime})
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions node/pkg/aggregator/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,24 +227,24 @@ type TriggerMessage struct {
}

type LatestLocalAggregates struct {
LocalAggregateMap map[int32]LocalAggregate
LocalAggregateMap map[int32]*LocalAggregate
mu sync.RWMutex
}

func NewLatestLocalAggregates() *LatestLocalAggregates {
return &LatestLocalAggregates{
LocalAggregateMap: map[int32]LocalAggregate{},
LocalAggregateMap: map[int32]*LocalAggregate{},
}
}

func (a *LatestLocalAggregates) Load(id int32) (LocalAggregate, bool) {
func (a *LatestLocalAggregates) Load(id int32) (*LocalAggregate, bool) {
a.mu.RLock()
defer a.mu.RUnlock()
result, ok := a.LocalAggregateMap[id]
return result, ok
}

func (a *LatestLocalAggregates) Store(id int32, aggregate LocalAggregate) {
func (a *LatestLocalAggregates) Store(id int32, aggregate *LocalAggregate) {
a.mu.Lock()
defer a.mu.Unlock()
a.LocalAggregateMap[id] = aggregate
Expand Down

0 comments on commit 5cb6cd5

Please sign in to comment.