Skip to content

Commit

Permalink
feat: in memory latest data
Browse files Browse the repository at this point in the history
  • Loading branch information
nick-bisonai committed Jul 10, 2024
1 parent 0912c0b commit c942824
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 109 deletions.
92 changes: 3 additions & 89 deletions node/pkg/dal/api/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"errors"
"strings"

"bisonai.com/orakl/node/pkg/aggregator"
"bisonai.com/orakl/node/pkg/common/keys"
"bisonai.com/orakl/node/pkg/common/types"
"bisonai.com/orakl/node/pkg/dal/collector"
dalcommon "bisonai.com/orakl/node/pkg/dal/common"
Expand Down Expand Up @@ -146,70 +144,6 @@ func (c *Controller) handleWebsocket(conn *websocket.Conn) {
}
}

func (c *Controller) getLatestSubmissionData(ctx context.Context) ([]aggregator.SubmissionData, error) {
globalAggregateKeyList := make([]string, 0, len(c.configs))
for _, config := range c.configs {
globalAggregateKeyList = append(globalAggregateKeyList, keys.GlobalAggregateKey(config.ID))
}

globalAggregates, err := db.MGetObject[aggregator.GlobalAggregate](ctx, globalAggregateKeyList)
if err != nil {
return nil, err
}

proofKeyList := make([]string, 0, len(globalAggregates))
for _, globalAggregate := range globalAggregates {
proofKeyList = append(proofKeyList, keys.ProofKey(globalAggregate.ConfigID, globalAggregate.Round))
}

proofs, err := db.MGetObject[aggregator.Proof](ctx, proofKeyList)
if err != nil {
return nil, err
}

proofMap := make(map[int32]aggregator.Proof)
for _, proof := range proofs {
proofMap[proof.ConfigID] = proof
}

result := make([]aggregator.SubmissionData, 0, len(globalAggregates))
for _, globalAggregate := range globalAggregates {
proof, ok := proofMap[globalAggregate.ConfigID]
if !ok {
continue
}

result = append(result, aggregator.SubmissionData{
GlobalAggregate: globalAggregate,
Proof: proof,
})
}

return result, nil
}

func (c *Controller) getLatestSubmissionDataSingle(ctx context.Context, symbol string) (*aggregator.SubmissionData, error) {
config, ok := c.configs[symbol]
if !ok {
return nil, errors.New("invalid symbol")
}

globalAggregate, err := db.GetObject[aggregator.GlobalAggregate](ctx, keys.GlobalAggregateKey(config.ID))
if err != nil {
return nil, err
}

proof, err := db.GetObject[aggregator.Proof](ctx, keys.ProofKey(config.ID, globalAggregate.Round))
if err != nil {
return nil, err
}

return &aggregator.SubmissionData{
GlobalAggregate: globalAggregate,
Proof: proof,
}, nil
}

func getSymbols(c *fiber.Ctx) error {
result := []string{}
for key := range ApiController.configs {
Expand All @@ -226,20 +160,7 @@ func getLatestFeeds(c *fiber.Ctx) error {
map[string]any{"message": "getLatestFeeds called from " + c.IP()})
}()

submissionData, err := ApiController.getLatestSubmissionData(c.Context())
if err != nil {
return err
}

result := make([]dalcommon.OutgoingSubmissionData, 0, len(submissionData))

for _, data := range submissionData {
outgoingData, err := ApiController.Collector.IncomingDataToOutgoingData(c.Context(), data)
if err != nil {
return err
}
result = append(result, *outgoingData)
}
result := ApiController.Collector.GetAllLatestData()
return c.JSON(result)
}

Expand All @@ -264,14 +185,7 @@ func getLatestFeed(c *fiber.Ctx) error {
symbol = strings.ToUpper(symbol)
}

submissionData, err := ApiController.getLatestSubmissionDataSingle(c.Context(), symbol)
if err != nil {
return err
}
result := ApiController.Collector.GetLatestData(symbol)

outgoingData, err := ApiController.Collector.IncomingDataToOutgoingData(c.Context(), *submissionData)
if err != nil {
return err
}
return c.JSON(outgoingData)
return c.JSON(*result)
}
28 changes: 27 additions & 1 deletion node/pkg/dal/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Collector struct {
Symbols map[int32]string
FeedHashes map[int32][]byte
CachedWhitelist []klaytncommon.Address
LatestData map[string]*dalcommon.OutgoingSubmissionData

IsRunning bool
CancelFunc context.CancelFunc
Expand Down Expand Up @@ -67,6 +68,7 @@ func NewCollector(ctx context.Context, configs []types.Config) (*Collector, erro
OutgoingStream: make(map[int32]chan dalcommon.OutgoingSubmissionData, len(configs)),
Symbols: make(map[int32]string, len(configs)),
FeedHashes: make(map[int32][]byte, len(configs)),
LatestData: make(map[string]*dalcommon.OutgoingSubmissionData),
chainReader: chainReader,
CachedWhitelist: initialWhitelist,
submissionProxyContractAddr: submissionProxyContractAddr,
Expand Down Expand Up @@ -97,6 +99,24 @@ func (c *Collector) Start(ctx context.Context) {
c.trackOracleAdded(ctxWithCancel)
}

func (c *Collector) GetLatestData(symbol string) *dalcommon.OutgoingSubmissionData {
c.mu.RLock()
defer c.mu.RUnlock()
return c.LatestData[symbol]
}

func (c *Collector) GetAllLatestData() []dalcommon.OutgoingSubmissionData {
index := 0
c.mu.RLock()
result := make([]dalcommon.OutgoingSubmissionData, len(c.LatestData))
for _, value := range c.LatestData {
result[index] = *value
index++
}
c.mu.RUnlock()
return result
}

func (c *Collector) Stop() {
if c.CancelFunc != nil {
c.CancelFunc()
Expand Down Expand Up @@ -135,7 +155,7 @@ func (c *Collector) processIncomingData(ctx context.Context, data aggregator.Sub
log.Error().Err(err).Str("Player", "DalCollector").Msg("failed to convert incoming data to outgoing data")
return
}

defer c.storeLatest(result)
c.OutgoingStream[data.GlobalAggregate.ConfigID] <- *result
}

Expand Down Expand Up @@ -197,3 +217,9 @@ func (c *Collector) trackOracleAdded(ctx context.Context) {
}
}()
}

func (c *Collector) storeLatest(data *dalcommon.OutgoingSubmissionData) {
c.mu.Lock()
c.LatestData[data.Symbol] = data
c.mu.Unlock()
}
31 changes: 12 additions & 19 deletions node/pkg/dal/tests/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"bisonai.com/orakl/node/pkg/admin/tests"
"bisonai.com/orakl/node/pkg/aggregator"
"bisonai.com/orakl/node/pkg/dal/api"
"bisonai.com/orakl/node/pkg/dal/common"
"bisonai.com/orakl/node/pkg/utils/request"
Expand Down Expand Up @@ -47,7 +46,6 @@ func TestApiGetLatestAll(t *testing.T) {
t.Logf("Cleanup failed: %v", cleanupErr)
}
}()

testItems.Controller.Start(ctx)

sampleSubmissionData, err := generateSampleSubmissionData(
Expand All @@ -61,7 +59,12 @@ func TestApiGetLatestAll(t *testing.T) {
t.Fatalf("error generating sample submission data: %v", err)
}

aggregator.SetLatestGlobalAggregateAndProof(ctx, testItems.TmpConfig.ID, sampleSubmissionData.GlobalAggregate, sampleSubmissionData.Proof)
err = testPublishData(ctx, *sampleSubmissionData)
if err != nil {
t.Fatalf("error publishing sample submission data: %v", err)
}

time.Sleep(10 * time.Millisecond)

result, err := tests.GetRequest[[]common.OutgoingSubmissionData](testItems.App, "/api/v1/dal/latest-data-feeds/all", nil)
if err != nil {
Expand Down Expand Up @@ -100,19 +103,6 @@ func TestShouldFailWithoutApiKey(t *testing.T) {

assert.Equal(t, 200, resp.StatusCode)

sampleSubmissionData, err := generateSampleSubmissionData(
testItems.TmpConfig.ID,
int64(15),
time.Now(),
1,
"test-aggregate",
)
if err != nil {
t.Fatalf("error generating sample submission data: %v", err)
}

aggregator.SetLatestGlobalAggregateAndProof(ctx, testItems.TmpConfig.ID, sampleSubmissionData.GlobalAggregate, sampleSubmissionData.Proof)

result, err := request.RequestRaw(request.WithEndpoint("http://localhost:8090/api/v1/dal/latest-data-feeds/test-aggregate"))

if err != nil {
Expand All @@ -133,7 +123,6 @@ func TestApiGetLatest(t *testing.T) {
t.Logf("Cleanup failed: %v", cleanupErr)
}
}()

testItems.Controller.Start(ctx)

sampleSubmissionData, err := generateSampleSubmissionData(
Expand All @@ -147,7 +136,12 @@ func TestApiGetLatest(t *testing.T) {
t.Fatalf("error generating sample submission data: %v", err)
}

aggregator.SetLatestGlobalAggregateAndProof(ctx, testItems.TmpConfig.ID, sampleSubmissionData.GlobalAggregate, sampleSubmissionData.Proof)
err = testPublishData(ctx, *sampleSubmissionData)
if err != nil {
t.Fatalf("error publishing sample submission data: %v", err)
}

time.Sleep(10 * time.Millisecond)

result, err := tests.GetRequest[common.OutgoingSubmissionData](testItems.App, "/api/v1/dal/latest-data-feeds/test-aggregate", nil)
if err != nil {
Expand Down Expand Up @@ -180,7 +174,6 @@ func TestApiWebsocket(t *testing.T) {
headers := map[string]string{"X-API-Key": apiKey}

testItems.Controller.Start(ctx)

go testItems.App.Listen(":8090")

conn, err := wss.NewWebsocketHelper(ctx, wss.WithEndpoint("ws://localhost:8090/api/v1/dal/ws"), wss.WithRequestHeaders(headers))
Expand Down

0 comments on commit c942824

Please sign in to comment.