diff --git a/node/pkg/dal/collector/collector.go b/node/pkg/dal/collector/collector.go index 479638847..ce2acbb09 100644 --- a/node/pkg/dal/collector/collector.go +++ b/node/pkg/dal/collector/collector.go @@ -31,7 +31,7 @@ const ( type Config = types.Config type Collector struct { - OutgoingStream map[int32]chan *dalcommon.OutgoingSubmissionData + OutgoingStream map[string]chan *dalcommon.OutgoingSubmissionData Symbols map[int32]string FeedHashes map[int32][]byte LatestTimestamps map[int32]time.Time @@ -92,7 +92,7 @@ func NewCollector(ctx context.Context, configs []Config) (*Collector, error) { } collector := &Collector{ - OutgoingStream: make(map[int32]chan *dalcommon.OutgoingSubmissionData, len(configs)), + OutgoingStream: make(map[string]chan *dalcommon.OutgoingSubmissionData, len(configs)), Symbols: make(map[int32]string, len(configs)), FeedHashes: make(map[int32][]byte, len(configs)), LatestTimestamps: make(map[int32]time.Time), @@ -104,7 +104,7 @@ func NewCollector(ctx context.Context, configs []Config) (*Collector, error) { redisTopics := []string{} for _, config := range configs { - collector.OutgoingStream[config.ID] = make(chan *dalcommon.OutgoingSubmissionData, 1000) + collector.OutgoingStream[config.Name] = make(chan *dalcommon.OutgoingSubmissionData, 1000) collector.Symbols[config.ID] = config.Name collector.FeedHashes[config.ID] = crypto.Keccak256([]byte(config.Name)) redisTopics = append(redisTopics, keys.SubmissionDataStreamKey(config.Name)) @@ -228,12 +228,12 @@ func (c *Collector) processIncomingData(ctx context.Context, data *aggregator.Su return } - defer func(data *dalcommon.OutgoingSubmissionData) { + defer func(result *dalcommon.OutgoingSubmissionData) { c.mu.Lock() defer c.mu.Unlock() - c.LatestData[data.Symbol] = data + c.LatestData[result.Symbol] = result }(result) - c.OutgoingStream[data.GlobalAggregate.ConfigID] <- result + c.OutgoingStream[result.Symbol] <- result } } diff --git a/node/pkg/dal/hub/hub.go b/node/pkg/dal/hub/hub.go index 9969df85d..1b5f69f52 100644 --- a/node/pkg/dal/hub/hub.go +++ b/node/pkg/dal/hub/hub.go @@ -139,25 +139,11 @@ func (h *Hub) removeClient(client *websocket.Conn) { } func (h *Hub) initializeBroadcastChannels(collector *collector.Collector) { - for configId, stream := range collector.OutgoingStream { - symbol := h.configIdToSymbol(configId) - if symbol == "" { - continue - } - + for symbol, stream := range collector.OutgoingStream { h.broadcast[symbol] = stream } } -func (h *Hub) configIdToSymbol(id int32) string { - for symbol, config := range h.Configs { - if config.ID == id { - return symbol - } - } - return "" -} - func (h *Hub) broadcastDataForSymbol(ctx context.Context, symbol string) { for data := range h.broadcast[symbol] { go h.castSubmissionData(ctx, data, &symbol)