Skip to content

Commit

Permalink
(DAL) Latest outgoing data from collector (#1781)
Browse files Browse the repository at this point in the history
* feat: in memory latest data

* feat: use sync.Map instead of mutex

* fix: remove unnecessary encapsulation
  • Loading branch information
nick-bisonai authored Jul 11, 2024
1 parent 79e4490 commit 858686f
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 106 deletions.
89 changes: 3 additions & 86 deletions node/pkg/dal/api/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"errors"
"strings"

"bisonai.com/orakl/node/pkg/aggregator"
"bisonai.com/orakl/node/pkg/common/keys"
"bisonai.com/orakl/node/pkg/common/types"
"bisonai.com/orakl/node/pkg/dal/collector"
dalcommon "bisonai.com/orakl/node/pkg/dal/common"
Expand Down Expand Up @@ -146,70 +144,6 @@ func (c *Controller) handleWebsocket(conn *websocket.Conn) {
}
}

func (c *Controller) getLatestSubmissionData(ctx context.Context) ([]aggregator.SubmissionData, error) {
globalAggregateKeyList := make([]string, 0, len(c.configs))
for _, config := range c.configs {
globalAggregateKeyList = append(globalAggregateKeyList, keys.GlobalAggregateKey(config.ID))
}

globalAggregates, err := db.MGetObject[aggregator.GlobalAggregate](ctx, globalAggregateKeyList)
if err != nil {
return nil, err
}

proofKeyList := make([]string, 0, len(globalAggregates))
for _, globalAggregate := range globalAggregates {
proofKeyList = append(proofKeyList, keys.ProofKey(globalAggregate.ConfigID, globalAggregate.Round))
}

proofs, err := db.MGetObject[aggregator.Proof](ctx, proofKeyList)
if err != nil {
return nil, err
}

proofMap := make(map[int32]aggregator.Proof)
for _, proof := range proofs {
proofMap[proof.ConfigID] = proof
}

result := make([]aggregator.SubmissionData, 0, len(globalAggregates))
for _, globalAggregate := range globalAggregates {
proof, ok := proofMap[globalAggregate.ConfigID]
if !ok {
continue
}

result = append(result, aggregator.SubmissionData{
GlobalAggregate: globalAggregate,
Proof: proof,
})
}

return result, nil
}

func (c *Controller) getLatestSubmissionDataSingle(ctx context.Context, symbol string) (*aggregator.SubmissionData, error) {
config, ok := c.configs[symbol]
if !ok {
return nil, errors.New("invalid symbol")
}

globalAggregate, err := db.GetObject[aggregator.GlobalAggregate](ctx, keys.GlobalAggregateKey(config.ID))
if err != nil {
return nil, err
}

proof, err := db.GetObject[aggregator.Proof](ctx, keys.ProofKey(config.ID, globalAggregate.Round))
if err != nil {
return nil, err
}

return &aggregator.SubmissionData{
GlobalAggregate: globalAggregate,
Proof: proof,
}, nil
}

func getSymbols(c *fiber.Ctx) error {
result := []string{}
for key := range ApiController.configs {
Expand All @@ -226,20 +160,7 @@ func getLatestFeeds(c *fiber.Ctx) error {
map[string]any{"message": "getLatestFeeds called from " + c.IP()})
}()

submissionData, err := ApiController.getLatestSubmissionData(c.Context())
if err != nil {
return err
}

result := make([]dalcommon.OutgoingSubmissionData, 0, len(submissionData))

for _, data := range submissionData {
outgoingData, err := ApiController.Collector.IncomingDataToOutgoingData(c.Context(), data)
if err != nil {
return err
}
result = append(result, *outgoingData)
}
result := ApiController.Collector.GetAllLatestData()
return c.JSON(result)
}

Expand All @@ -264,14 +185,10 @@ func getLatestFeed(c *fiber.Ctx) error {
symbol = strings.ToUpper(symbol)
}

submissionData, err := ApiController.getLatestSubmissionDataSingle(c.Context(), symbol)
result, err := ApiController.Collector.GetLatestData(symbol)
if err != nil {
return err
}

outgoingData, err := ApiController.Collector.IncomingDataToOutgoingData(c.Context(), *submissionData)
if err != nil {
return err
}
return c.JSON(outgoingData)
return c.JSON(*result)
}
31 changes: 30 additions & 1 deletion node/pkg/dal/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Collector struct {
Symbols map[int32]string
FeedHashes map[int32][]byte
CachedWhitelist []klaytncommon.Address
LatestData sync.Map

IsRunning bool
CancelFunc context.CancelFunc
Expand Down Expand Up @@ -67,6 +68,7 @@ func NewCollector(ctx context.Context, configs []types.Config) (*Collector, erro
OutgoingStream: make(map[int32]chan dalcommon.OutgoingSubmissionData, len(configs)),
Symbols: make(map[int32]string, len(configs)),
FeedHashes: make(map[int32][]byte, len(configs)),
LatestData: sync.Map{},
chainReader: chainReader,
CachedWhitelist: initialWhitelist,
submissionProxyContractAddr: submissionProxyContractAddr,
Expand Down Expand Up @@ -97,6 +99,33 @@ func (c *Collector) Start(ctx context.Context) {
c.trackOracleAdded(ctxWithCancel)
}

func (c *Collector) GetLatestData(symbol string) (*dalcommon.OutgoingSubmissionData, error) {
result, ok := c.LatestData.Load(symbol)
if !ok {
return nil, errors.New("symbol not found")
}

data, ok := result.(*dalcommon.OutgoingSubmissionData)
if !ok {
return nil, errors.New("symbol not converted")
}

return data, nil
}

func (c *Collector) GetAllLatestData() []dalcommon.OutgoingSubmissionData {
result := make([]dalcommon.OutgoingSubmissionData, 0)
c.LatestData.Range(func(key, value interface{}) bool {
data, ok := value.(*dalcommon.OutgoingSubmissionData)
if !ok {
return true
}
result = append(result, *data)
return true
})
return result
}

func (c *Collector) Stop() {
if c.CancelFunc != nil {
c.CancelFunc()
Expand Down Expand Up @@ -135,7 +164,7 @@ func (c *Collector) processIncomingData(ctx context.Context, data aggregator.Sub
log.Error().Err(err).Str("Player", "DalCollector").Msg("failed to convert incoming data to outgoing data")
return
}

defer c.LatestData.Store(result.Symbol, result)
c.OutgoingStream[data.GlobalAggregate.ConfigID] <- *result
}

Expand Down
31 changes: 12 additions & 19 deletions node/pkg/dal/tests/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"bisonai.com/orakl/node/pkg/admin/tests"
"bisonai.com/orakl/node/pkg/aggregator"
"bisonai.com/orakl/node/pkg/dal/api"
"bisonai.com/orakl/node/pkg/dal/common"
"bisonai.com/orakl/node/pkg/utils/request"
Expand Down Expand Up @@ -47,7 +46,6 @@ func TestApiGetLatestAll(t *testing.T) {
t.Logf("Cleanup failed: %v", cleanupErr)
}
}()

testItems.Controller.Start(ctx)

sampleSubmissionData, err := generateSampleSubmissionData(
Expand All @@ -61,7 +59,12 @@ func TestApiGetLatestAll(t *testing.T) {
t.Fatalf("error generating sample submission data: %v", err)
}

aggregator.SetLatestGlobalAggregateAndProof(ctx, testItems.TmpConfig.ID, sampleSubmissionData.GlobalAggregate, sampleSubmissionData.Proof)
err = testPublishData(ctx, *sampleSubmissionData)
if err != nil {
t.Fatalf("error publishing sample submission data: %v", err)
}

time.Sleep(10 * time.Millisecond)

result, err := tests.GetRequest[[]common.OutgoingSubmissionData](testItems.App, "/api/v1/dal/latest-data-feeds/all", nil)
if err != nil {
Expand Down Expand Up @@ -100,19 +103,6 @@ func TestShouldFailWithoutApiKey(t *testing.T) {

assert.Equal(t, 200, resp.StatusCode)

sampleSubmissionData, err := generateSampleSubmissionData(
testItems.TmpConfig.ID,
int64(15),
time.Now(),
1,
"test-aggregate",
)
if err != nil {
t.Fatalf("error generating sample submission data: %v", err)
}

aggregator.SetLatestGlobalAggregateAndProof(ctx, testItems.TmpConfig.ID, sampleSubmissionData.GlobalAggregate, sampleSubmissionData.Proof)

result, err := request.RequestRaw(request.WithEndpoint("http://localhost:8090/api/v1/dal/latest-data-feeds/test-aggregate"))

if err != nil {
Expand All @@ -133,7 +123,6 @@ func TestApiGetLatest(t *testing.T) {
t.Logf("Cleanup failed: %v", cleanupErr)
}
}()

testItems.Controller.Start(ctx)

sampleSubmissionData, err := generateSampleSubmissionData(
Expand All @@ -147,7 +136,12 @@ func TestApiGetLatest(t *testing.T) {
t.Fatalf("error generating sample submission data: %v", err)
}

aggregator.SetLatestGlobalAggregateAndProof(ctx, testItems.TmpConfig.ID, sampleSubmissionData.GlobalAggregate, sampleSubmissionData.Proof)
err = testPublishData(ctx, *sampleSubmissionData)
if err != nil {
t.Fatalf("error publishing sample submission data: %v", err)
}

time.Sleep(10 * time.Millisecond)

result, err := tests.GetRequest[common.OutgoingSubmissionData](testItems.App, "/api/v1/dal/latest-data-feeds/test-aggregate", nil)
if err != nil {
Expand Down Expand Up @@ -180,7 +174,6 @@ func TestApiWebsocket(t *testing.T) {
headers := map[string]string{"X-API-Key": apiKey}

testItems.Controller.Start(ctx)

go testItems.App.Listen(":8090")

conn, err := wss.NewWebsocketHelper(ctx, wss.WithEndpoint("ws://localhost:8090/api/v1/dal/ws"), wss.WithRequestHeaders(headers))
Expand Down

0 comments on commit 858686f

Please sign in to comment.