Skip to content

Commit

Permalink
fix: minor refactoring to reduce unnecessary indexing
Browse files Browse the repository at this point in the history
  • Loading branch information
nick-bisonai committed Aug 26, 2024
1 parent 729a8e1 commit d0ec476
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 21 deletions.
12 changes: 6 additions & 6 deletions node/pkg/dal/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const (
type Config = types.Config

type Collector struct {
OutgoingStream map[int32]chan *dalcommon.OutgoingSubmissionData
OutgoingStream map[string]chan *dalcommon.OutgoingSubmissionData
Symbols map[int32]string
FeedHashes map[int32][]byte
LatestTimestamps map[int32]time.Time
Expand Down Expand Up @@ -92,7 +92,7 @@ func NewCollector(ctx context.Context, configs []Config) (*Collector, error) {
}

collector := &Collector{
OutgoingStream: make(map[int32]chan *dalcommon.OutgoingSubmissionData, len(configs)),
OutgoingStream: make(map[string]chan *dalcommon.OutgoingSubmissionData, len(configs)),
Symbols: make(map[int32]string, len(configs)),
FeedHashes: make(map[int32][]byte, len(configs)),
LatestTimestamps: make(map[int32]time.Time),
Expand All @@ -104,7 +104,7 @@ func NewCollector(ctx context.Context, configs []Config) (*Collector, error) {

redisTopics := []string{}
for _, config := range configs {
collector.OutgoingStream[config.ID] = make(chan *dalcommon.OutgoingSubmissionData, 1000)
collector.OutgoingStream[config.Name] = make(chan *dalcommon.OutgoingSubmissionData, 1000)
collector.Symbols[config.ID] = config.Name
collector.FeedHashes[config.ID] = crypto.Keccak256([]byte(config.Name))
redisTopics = append(redisTopics, keys.SubmissionDataStreamKey(config.Name))
Expand Down Expand Up @@ -228,12 +228,12 @@ func (c *Collector) processIncomingData(ctx context.Context, data *aggregator.Su
return
}

defer func(data *dalcommon.OutgoingSubmissionData) {
defer func(result *dalcommon.OutgoingSubmissionData) {
c.mu.Lock()
defer c.mu.Unlock()
c.LatestData[data.Symbol] = data
c.LatestData[result.Symbol] = result
}(result)
c.OutgoingStream[data.GlobalAggregate.ConfigID] <- result
c.OutgoingStream[result.Symbol] <- result
}
}

Expand Down
16 changes: 1 addition & 15 deletions node/pkg/dal/hub/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,25 +139,11 @@ func (h *Hub) removeClient(client *websocket.Conn) {
}

func (h *Hub) initializeBroadcastChannels(collector *collector.Collector) {
for configId, stream := range collector.OutgoingStream {
symbol := h.configIdToSymbol(configId)
if symbol == "" {
continue
}

for symbol, stream := range collector.OutgoingStream {
h.broadcast[symbol] = stream
}
}

func (h *Hub) configIdToSymbol(id int32) string {
for symbol, config := range h.Configs {
if config.ID == id {
return symbol
}
}
return ""
}

func (h *Hub) broadcastDataForSymbol(ctx context.Context, symbol string) {
for data := range h.broadcast[symbol] {
go h.castSubmissionData(ctx, data, &symbol)
Expand Down

0 comments on commit d0ec476

Please sign in to comment.