From c3559e175372807efbb32e1c5dee5d2fc170815e Mon Sep 17 00:00:00 2001 From: nick Date: Wed, 10 Jul 2024 22:49:29 +0900 Subject: [PATCH 1/3] feat: in memory latest data --- node/pkg/dal/api/controller.go | 92 +---------------------------- node/pkg/dal/collector/collector.go | 28 ++++++++- node/pkg/dal/tests/api_test.go | 31 ++++------ 3 files changed, 42 insertions(+), 109 deletions(-) diff --git a/node/pkg/dal/api/controller.go b/node/pkg/dal/api/controller.go index 60f56dd01..30df9e3a7 100644 --- a/node/pkg/dal/api/controller.go +++ b/node/pkg/dal/api/controller.go @@ -5,8 +5,6 @@ import ( "errors" "strings" - "bisonai.com/orakl/node/pkg/aggregator" - "bisonai.com/orakl/node/pkg/common/keys" "bisonai.com/orakl/node/pkg/common/types" "bisonai.com/orakl/node/pkg/dal/collector" dalcommon "bisonai.com/orakl/node/pkg/dal/common" @@ -146,70 +144,6 @@ func (c *Controller) handleWebsocket(conn *websocket.Conn) { } } -func (c *Controller) getLatestSubmissionData(ctx context.Context) ([]aggregator.SubmissionData, error) { - globalAggregateKeyList := make([]string, 0, len(c.configs)) - for _, config := range c.configs { - globalAggregateKeyList = append(globalAggregateKeyList, keys.GlobalAggregateKey(config.ID)) - } - - globalAggregates, err := db.MGetObject[aggregator.GlobalAggregate](ctx, globalAggregateKeyList) - if err != nil { - return nil, err - } - - proofKeyList := make([]string, 0, len(globalAggregates)) - for _, globalAggregate := range globalAggregates { - proofKeyList = append(proofKeyList, keys.ProofKey(globalAggregate.ConfigID, globalAggregate.Round)) - } - - proofs, err := db.MGetObject[aggregator.Proof](ctx, proofKeyList) - if err != nil { - return nil, err - } - - proofMap := make(map[int32]aggregator.Proof) - for _, proof := range proofs { - proofMap[proof.ConfigID] = proof - } - - result := make([]aggregator.SubmissionData, 0, len(globalAggregates)) - for _, globalAggregate := range globalAggregates { - proof, ok := proofMap[globalAggregate.ConfigID] - if !ok { - continue - } - - result = append(result, aggregator.SubmissionData{ - GlobalAggregate: globalAggregate, - Proof: proof, - }) - } - - return result, nil -} - -func (c *Controller) getLatestSubmissionDataSingle(ctx context.Context, symbol string) (*aggregator.SubmissionData, error) { - config, ok := c.configs[symbol] - if !ok { - return nil, errors.New("invalid symbol") - } - - globalAggregate, err := db.GetObject[aggregator.GlobalAggregate](ctx, keys.GlobalAggregateKey(config.ID)) - if err != nil { - return nil, err - } - - proof, err := db.GetObject[aggregator.Proof](ctx, keys.ProofKey(config.ID, globalAggregate.Round)) - if err != nil { - return nil, err - } - - return &aggregator.SubmissionData{ - GlobalAggregate: globalAggregate, - Proof: proof, - }, nil -} - func getSymbols(c *fiber.Ctx) error { result := []string{} for key := range ApiController.configs { @@ -226,20 +160,7 @@ func getLatestFeeds(c *fiber.Ctx) error { map[string]any{"message": "getLatestFeeds called from " + c.IP()}) }() - submissionData, err := ApiController.getLatestSubmissionData(c.Context()) - if err != nil { - return err - } - - result := make([]dalcommon.OutgoingSubmissionData, 0, len(submissionData)) - - for _, data := range submissionData { - outgoingData, err := ApiController.Collector.IncomingDataToOutgoingData(c.Context(), data) - if err != nil { - return err - } - result = append(result, *outgoingData) - } + result := ApiController.Collector.GetAllLatestData() return c.JSON(result) } @@ -264,14 +185,7 @@ func getLatestFeed(c *fiber.Ctx) error { symbol = strings.ToUpper(symbol) } - submissionData, err := ApiController.getLatestSubmissionDataSingle(c.Context(), symbol) - if err != nil { - return err - } + result := ApiController.Collector.GetLatestData(symbol) - outgoingData, err := ApiController.Collector.IncomingDataToOutgoingData(c.Context(), *submissionData) - if err != nil { - return err - } - return c.JSON(outgoingData) + return c.JSON(*result) } diff --git a/node/pkg/dal/collector/collector.go b/node/pkg/dal/collector/collector.go index 9f2680f33..5dc14f3da 100644 --- a/node/pkg/dal/collector/collector.go +++ b/node/pkg/dal/collector/collector.go @@ -31,6 +31,7 @@ type Collector struct { Symbols map[int32]string FeedHashes map[int32][]byte CachedWhitelist []klaytncommon.Address + LatestData map[string]*dalcommon.OutgoingSubmissionData IsRunning bool CancelFunc context.CancelFunc @@ -67,6 +68,7 @@ func NewCollector(ctx context.Context, configs []types.Config) (*Collector, erro OutgoingStream: make(map[int32]chan dalcommon.OutgoingSubmissionData, len(configs)), Symbols: make(map[int32]string, len(configs)), FeedHashes: make(map[int32][]byte, len(configs)), + LatestData: make(map[string]*dalcommon.OutgoingSubmissionData), chainReader: chainReader, CachedWhitelist: initialWhitelist, submissionProxyContractAddr: submissionProxyContractAddr, @@ -97,6 +99,24 @@ func (c *Collector) Start(ctx context.Context) { c.trackOracleAdded(ctxWithCancel) } +func (c *Collector) GetLatestData(symbol string) *dalcommon.OutgoingSubmissionData { + c.mu.RLock() + defer c.mu.RUnlock() + return c.LatestData[symbol] +} + +func (c *Collector) GetAllLatestData() []dalcommon.OutgoingSubmissionData { + index := 0 + c.mu.RLock() + result := make([]dalcommon.OutgoingSubmissionData, len(c.LatestData)) + for _, value := range c.LatestData { + result[index] = *value + index++ + } + c.mu.RUnlock() + return result +} + func (c *Collector) Stop() { if c.CancelFunc != nil { c.CancelFunc() @@ -135,7 +155,7 @@ func (c *Collector) processIncomingData(ctx context.Context, data aggregator.Sub log.Error().Err(err).Str("Player", "DalCollector").Msg("failed to convert incoming data to outgoing data") return } - + defer c.storeLatest(result) c.OutgoingStream[data.GlobalAggregate.ConfigID] <- *result } @@ -197,3 +217,9 @@ func (c *Collector) trackOracleAdded(ctx context.Context) { } }() } + +func (c *Collector) storeLatest(data *dalcommon.OutgoingSubmissionData) { + c.mu.Lock() + c.LatestData[data.Symbol] = data + c.mu.Unlock() +} diff --git a/node/pkg/dal/tests/api_test.go b/node/pkg/dal/tests/api_test.go index d1df5af47..5703ac061 100644 --- a/node/pkg/dal/tests/api_test.go +++ b/node/pkg/dal/tests/api_test.go @@ -8,7 +8,6 @@ import ( "time" "bisonai.com/orakl/node/pkg/admin/tests" - "bisonai.com/orakl/node/pkg/aggregator" "bisonai.com/orakl/node/pkg/dal/api" "bisonai.com/orakl/node/pkg/dal/common" "bisonai.com/orakl/node/pkg/utils/request" @@ -47,7 +46,6 @@ func TestApiGetLatestAll(t *testing.T) { t.Logf("Cleanup failed: %v", cleanupErr) } }() - testItems.Controller.Start(ctx) sampleSubmissionData, err := generateSampleSubmissionData( @@ -61,7 +59,12 @@ func TestApiGetLatestAll(t *testing.T) { t.Fatalf("error generating sample submission data: %v", err) } - aggregator.SetLatestGlobalAggregateAndProof(ctx, testItems.TmpConfig.ID, sampleSubmissionData.GlobalAggregate, sampleSubmissionData.Proof) + err = testPublishData(ctx, *sampleSubmissionData) + if err != nil { + t.Fatalf("error publishing sample submission data: %v", err) + } + + time.Sleep(10 * time.Millisecond) result, err := tests.GetRequest[[]common.OutgoingSubmissionData](testItems.App, "/api/v1/dal/latest-data-feeds/all", nil) if err != nil { @@ -100,19 +103,6 @@ func TestShouldFailWithoutApiKey(t *testing.T) { assert.Equal(t, 200, resp.StatusCode) - sampleSubmissionData, err := generateSampleSubmissionData( - testItems.TmpConfig.ID, - int64(15), - time.Now(), - 1, - "test-aggregate", - ) - if err != nil { - t.Fatalf("error generating sample submission data: %v", err) - } - - aggregator.SetLatestGlobalAggregateAndProof(ctx, testItems.TmpConfig.ID, sampleSubmissionData.GlobalAggregate, sampleSubmissionData.Proof) - result, err := request.RequestRaw(request.WithEndpoint("http://localhost:8090/api/v1/dal/latest-data-feeds/test-aggregate")) if err != nil { @@ -133,7 +123,6 @@ func TestApiGetLatest(t *testing.T) { t.Logf("Cleanup failed: %v", cleanupErr) } }() - testItems.Controller.Start(ctx) sampleSubmissionData, err := generateSampleSubmissionData( @@ -147,7 +136,12 @@ func TestApiGetLatest(t *testing.T) { t.Fatalf("error generating sample submission data: %v", err) } - aggregator.SetLatestGlobalAggregateAndProof(ctx, testItems.TmpConfig.ID, sampleSubmissionData.GlobalAggregate, sampleSubmissionData.Proof) + err = testPublishData(ctx, *sampleSubmissionData) + if err != nil { + t.Fatalf("error publishing sample submission data: %v", err) + } + + time.Sleep(10 * time.Millisecond) result, err := tests.GetRequest[common.OutgoingSubmissionData](testItems.App, "/api/v1/dal/latest-data-feeds/test-aggregate", nil) if err != nil { @@ -180,7 +174,6 @@ func TestApiWebsocket(t *testing.T) { headers := map[string]string{"X-API-Key": apiKey} testItems.Controller.Start(ctx) - go testItems.App.Listen(":8090") conn, err := wss.NewWebsocketHelper(ctx, wss.WithEndpoint("ws://localhost:8090/api/v1/dal/ws"), wss.WithRequestHeaders(headers)) From 336dd372a908c13bb6bec039d407f67adeb067ba Mon Sep 17 00:00:00 2001 From: nick Date: Thu, 11 Jul 2024 10:31:11 +0900 Subject: [PATCH 2/3] feat: use sync.Map instead of mutex --- node/pkg/dal/api/controller.go | 5 +++- node/pkg/dal/collector/collector.go | 41 +++++++++++++++++------------ 2 files changed, 28 insertions(+), 18 deletions(-) diff --git a/node/pkg/dal/api/controller.go b/node/pkg/dal/api/controller.go index 30df9e3a7..9e5614e1d 100644 --- a/node/pkg/dal/api/controller.go +++ b/node/pkg/dal/api/controller.go @@ -185,7 +185,10 @@ func getLatestFeed(c *fiber.Ctx) error { symbol = strings.ToUpper(symbol) } - result := ApiController.Collector.GetLatestData(symbol) + result, err := ApiController.Collector.GetLatestData(symbol) + if err != nil { + return err + } return c.JSON(*result) } diff --git a/node/pkg/dal/collector/collector.go b/node/pkg/dal/collector/collector.go index 5dc14f3da..67b00f01c 100644 --- a/node/pkg/dal/collector/collector.go +++ b/node/pkg/dal/collector/collector.go @@ -31,7 +31,7 @@ type Collector struct { Symbols map[int32]string FeedHashes map[int32][]byte CachedWhitelist []klaytncommon.Address - LatestData map[string]*dalcommon.OutgoingSubmissionData + LatestData sync.Map IsRunning bool CancelFunc context.CancelFunc @@ -68,7 +68,7 @@ func NewCollector(ctx context.Context, configs []types.Config) (*Collector, erro OutgoingStream: make(map[int32]chan dalcommon.OutgoingSubmissionData, len(configs)), Symbols: make(map[int32]string, len(configs)), FeedHashes: make(map[int32][]byte, len(configs)), - LatestData: make(map[string]*dalcommon.OutgoingSubmissionData), + LatestData: sync.Map{}, chainReader: chainReader, CachedWhitelist: initialWhitelist, submissionProxyContractAddr: submissionProxyContractAddr, @@ -99,21 +99,30 @@ func (c *Collector) Start(ctx context.Context) { c.trackOracleAdded(ctxWithCancel) } -func (c *Collector) GetLatestData(symbol string) *dalcommon.OutgoingSubmissionData { - c.mu.RLock() - defer c.mu.RUnlock() - return c.LatestData[symbol] +func (c *Collector) GetLatestData(symbol string) (*dalcommon.OutgoingSubmissionData, error) { + result, ok := c.LatestData.Load(symbol) + if !ok { + return nil, errors.New("symbol not found") + } + + data, ok := result.(*dalcommon.OutgoingSubmissionData) + if !ok { + return nil, errors.New("symbol not converted") + } + + return data, nil } func (c *Collector) GetAllLatestData() []dalcommon.OutgoingSubmissionData { - index := 0 - c.mu.RLock() - result := make([]dalcommon.OutgoingSubmissionData, len(c.LatestData)) - for _, value := range c.LatestData { - result[index] = *value - index++ - } - c.mu.RUnlock() + result := make([]dalcommon.OutgoingSubmissionData, 0) + c.LatestData.Range(func(key, value interface{}) bool { + data, ok := value.(*dalcommon.OutgoingSubmissionData) + if !ok { + return true + } + result = append(result, *data) + return true + }) return result } @@ -219,7 +228,5 @@ func (c *Collector) trackOracleAdded(ctx context.Context) { } func (c *Collector) storeLatest(data *dalcommon.OutgoingSubmissionData) { - c.mu.Lock() - c.LatestData[data.Symbol] = data - c.mu.Unlock() + c.LatestData.Store(data.Symbol, data) } From a9a64c862e8f4d1ff9bd371e164dd65968a5f67d Mon Sep 17 00:00:00 2001 From: nick Date: Thu, 11 Jul 2024 12:00:19 +0900 Subject: [PATCH 3/3] fix: remove unnecessary encapsulation --- node/pkg/dal/collector/collector.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/node/pkg/dal/collector/collector.go b/node/pkg/dal/collector/collector.go index 67b00f01c..0e005c53b 100644 --- a/node/pkg/dal/collector/collector.go +++ b/node/pkg/dal/collector/collector.go @@ -164,7 +164,7 @@ func (c *Collector) processIncomingData(ctx context.Context, data aggregator.Sub log.Error().Err(err).Str("Player", "DalCollector").Msg("failed to convert incoming data to outgoing data") return } - defer c.storeLatest(result) + defer c.LatestData.Store(result.Symbol, result) c.OutgoingStream[data.GlobalAggregate.ConfigID] <- *result } @@ -226,7 +226,3 @@ func (c *Collector) trackOracleAdded(ctx context.Context) { } }() } - -func (c *Collector) storeLatest(data *dalcommon.OutgoingSubmissionData) { - c.LatestData.Store(data.Symbol, data) -}