From 32718b967a10e451041f134b4f03c1e3fbbe0d82 Mon Sep 17 00:00:00 2001 From: nick Date: Fri, 22 Nov 2024 14:14:21 +0900 Subject: [PATCH 1/3] fix: store latest prices into redis cache --- node/pkg/common/keys/keys.go | 8 ++++++++ node/pkg/common/types/types.go | 26 ++++++++++++++++++++++++++ node/pkg/fetcher/localaggregator.go | 8 +++++++- node/pkg/websocketfetcher/app.go | 16 ++++++++++++++++ 4 files changed, 57 insertions(+), 1 deletion(-) diff --git a/node/pkg/common/keys/keys.go b/node/pkg/common/keys/keys.go index 487153e46..1359323e8 100644 --- a/node/pkg/common/keys/keys.go +++ b/node/pkg/common/keys/keys.go @@ -1,5 +1,13 @@ package keys +import ( + "strconv" +) + func SubmissionDataStreamKey(name string) string { return "submissionDataStream:" + name } + +func FeedData(feedID int32) string { + return "feedData:" + strconv.Itoa(int(feedID)) +} diff --git a/node/pkg/common/types/types.go b/node/pkg/common/types/types.go index 2852fd35b..5141bd5cb 100644 --- a/node/pkg/common/types/types.go +++ b/node/pkg/common/types/types.go @@ -1,10 +1,14 @@ package types import ( + "context" "encoding/json" "fmt" "sync" "time" + + "bisonai.com/miko/node/pkg/common/keys" + "bisonai.com/miko/node/pkg/db" ) type Proxy struct { @@ -75,6 +79,28 @@ func (m *LatestFeedDataMap) GetLatestFeedData(feedIds []int32) ([]*FeedData, err result = append(result, feedData) } } + + return result, nil +} + +func (m *LatestFeedDataMap) GetLatestFeedDataFromCache(ctx context.Context, feedIds []int32) ([]*FeedData, error) { + queryingKeys := make([]string, 0, len(feedIds)) + for _, feedId := range feedIds { + queryingKeys = append(queryingKeys, keys.FeedData(feedId)) + } + + result, err := db.MGetObject[*FeedData](ctx, queryingKeys) + if err != nil { + return nil, err + } + + if len(result) != 0 { + err = m.SetLatestFeedData(result) + if err != nil { + return nil, err + } + } + return result, nil } diff --git a/node/pkg/fetcher/localaggregator.go b/node/pkg/fetcher/localaggregator.go index 61961685b..90a101023 100644 --- a/node/pkg/fetcher/localaggregator.go +++ b/node/pkg/fetcher/localaggregator.go @@ -232,5 +232,11 @@ func (c *LocalAggregator) collect(ctx context.Context) ([]*FeedData, error) { for i, feed := range c.Feeds { feedIds[i] = feed.ID } - return c.latestFeedDataMap.GetLatestFeedData(feedIds) + + result, err := c.latestFeedDataMap.GetLatestFeedData(feedIds) + if err != nil || len(result) == 0 { + return c.latestFeedDataMap.GetLatestFeedDataFromCache(ctx, feedIds) + } + + return result, nil } diff --git a/node/pkg/websocketfetcher/app.go b/node/pkg/websocketfetcher/app.go index a078decf4..d5fae5f39 100644 --- a/node/pkg/websocketfetcher/app.go +++ b/node/pkg/websocketfetcher/app.go @@ -8,6 +8,7 @@ import ( "time" "bisonai.com/miko/node/pkg/chain/websocketchainreader" + "bisonai.com/miko/node/pkg/common/keys" "bisonai.com/miko/node/pkg/common/types" "bisonai.com/miko/node/pkg/db" "bisonai.com/miko/node/pkg/websocketfetcher/common" @@ -43,6 +44,7 @@ import ( const ( DefaultStoreInterval = 200 * time.Millisecond DefaultBufferSize = 3000 + warmCacheTTL = 15 * time.Second ) type AppConfig struct { @@ -303,7 +305,21 @@ func (a *App) storeFeedData(ctx context.Context) { if err != nil { log.Error().Err(err).Msg("error in setting latest feed data") } + + err = a.StoreIntoRedisCache(ctx, batch) + if err != nil { + log.Error().Err(err).Msg("error in storing into redis cache") + } default: return } } + +func (a *App) StoreIntoRedisCache(ctx context.Context, batch []*types.FeedData) error { + batchStoreEntries := make(map[string]any) + for _, feedData := range batch { + batchStoreEntries[keys.FeedData(feedData.FeedID)] = feedData + } + + return db.MSetObjectWithExp(ctx, batchStoreEntries, warmCacheTTL) +} From 7f9850f0bddcc8f29b19fc6f3a06971aa57d9598 Mon Sep 17 00:00:00 2001 From: nick Date: Fri, 22 Nov 2024 14:56:04 +0900 Subject: [PATCH 2/3] fix: update based on feedback --- node/pkg/common/types/types.go | 4 ++++ node/pkg/websocketfetcher/app.go | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/node/pkg/common/types/types.go b/node/pkg/common/types/types.go index 5141bd5cb..4fa6a3b7e 100644 --- a/node/pkg/common/types/types.go +++ b/node/pkg/common/types/types.go @@ -84,6 +84,10 @@ func (m *LatestFeedDataMap) GetLatestFeedData(feedIds []int32) ([]*FeedData, err } func (m *LatestFeedDataMap) GetLatestFeedDataFromCache(ctx context.Context, feedIds []int32) ([]*FeedData, error) { + if len(feedIds) == 0 { + return nil, nil + } + queryingKeys := make([]string, 0, len(feedIds)) for _, feedId := range feedIds { queryingKeys = append(queryingKeys, keys.FeedData(feedId)) diff --git a/node/pkg/websocketfetcher/app.go b/node/pkg/websocketfetcher/app.go index d5fae5f39..736cb5e0d 100644 --- a/node/pkg/websocketfetcher/app.go +++ b/node/pkg/websocketfetcher/app.go @@ -44,7 +44,7 @@ import ( const ( DefaultStoreInterval = 200 * time.Millisecond DefaultBufferSize = 3000 - warmCacheTTL = 15 * time.Second + warmCacheTTL = time.Minute ) type AppConfig struct { From 8e07668b9d0151869d10b791c6b815635e68d48f Mon Sep 17 00:00:00 2001 From: nick Date: Fri, 22 Nov 2024 15:18:02 +0900 Subject: [PATCH 3/3] fix: safety code based on feedback --- node/pkg/common/types/types.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/node/pkg/common/types/types.go b/node/pkg/common/types/types.go index 4fa6a3b7e..9158dc3b2 100644 --- a/node/pkg/common/types/types.go +++ b/node/pkg/common/types/types.go @@ -105,6 +105,10 @@ func (m *LatestFeedDataMap) GetLatestFeedDataFromCache(ctx context.Context, feed } } + if result == nil { + result = []*FeedData{} + } + return result, nil }