Skip to content

Commit

Permalink
(OraklNode) Basic verification (#1352)
Browse files Browse the repository at this point in the history
* wip

* feat: reporter functionality to report with proofs

* fix: update signature generation accordingly

* fix: update based on feedback

* test: update submission script to test sigs

* fix: rename migration files, minor updates

* feat: remove unused function

* test: update chain test

* fix: update migration code, update tests

* fix: update dummy submission contract

* fix: fix syntax err

* fix: trim prefix on loading PK from string

* fix: update based on feedback

* fix: updates based on feedbacks

* fix: update based on feedbacks

* feat: concat proof from aggregator before storing
  • Loading branch information
nick-bisonai authored Apr 10, 2024
1 parent e8ca713 commit 2694dc3
Show file tree
Hide file tree
Showing 15 changed files with 763 additions and 126 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/node.test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,4 @@ jobs:
ETH_PROVIDER_URL: "https://ethereum-sepolia-rpc.allthatnode.com"
ETH_REPORTER_PK: ${{ secrets.TEST_DELEGATOR_REPORTER_PK}}
TEST_FEE_PAYER_PK: ${{ secrets.DELEGATOR_FEEPAYER_PK}}
SUBMISSION_PROXY_CONTRACT: "0x8B5B98ABdc0281D5cf8bD93FE82768b22FD11623"
SUBMISSION_PROXY_CONTRACT: "0x63Fc20a60438adD9B10F94E218c16561a7827eB4"
1 change: 1 addition & 0 deletions node/migrations/000013_proofs.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS proofs;
6 changes: 6 additions & 0 deletions node/migrations/000013_proofs.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
CREATE TABLE IF NOT EXISTS proofs (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
round INT8 NOT NULL,
proof BYTEA
)
159 changes: 92 additions & 67 deletions node/pkg/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"

"time"

"bisonai.com/orakl/node/pkg/chain/helper"
"bisonai.com/orakl/node/pkg/db"
"bisonai.com/orakl/node/pkg/raft"
"bisonai.com/orakl/node/pkg/utils/calculator"
Expand All @@ -30,11 +30,19 @@ func NewAggregator(h host.Host, ps *pubsub.PubSub, topicString string) (*Aggrega
return nil, err
}

signHelper, err := helper.NewSignHelper("")
if err != nil {
log.Error().Str("Player", "Aggregator").Err(err).Msg("failed to create sign helper")
return nil, err
}

aggregator := Aggregator{
Raft: raft.NewRaftNode(h, ps, topic, 100, LEADER_TIMEOUT),
CollectedPrices: map[int64][]int64{},
CollectedProofs: map[int64][][]byte{},
AggregatorMutex: sync.Mutex{},
RoundID: 0,
SignHelper: signHelper,
}
aggregator.Raft.LeaderJob = aggregator.LeaderJob
aggregator.Raft.HandleCustomMessage = aggregator.HandleCustomMessage
Expand All @@ -54,24 +62,7 @@ func (n *Aggregator) Run(ctx context.Context) {
func (n *Aggregator) LeaderJob() error {
n.RoundID++
n.Raft.IncreaseTerm()
roundMessage := RoundSyncMessage{
LeaderID: n.Raft.Host.ID().String(),
RoundID: n.RoundID,
}

marshalledRoundMessage, err := json.Marshal(roundMessage)
if err != nil {
log.Error().Str("Player", "Aggregator").Err(err).Msg("failed to marshal round message")
return err
}

message := raft.Message{
Type: RoundSync,
SentFrom: n.Raft.Host.ID().String(),
Data: json.RawMessage(marshalledRoundMessage),
}

return n.Raft.PublishMessage(message)
return n.PublishRoundMessage(n.RoundID)
}

func (n *Aggregator) HandleCustomMessage(message raft.Message) error {
Expand All @@ -80,6 +71,8 @@ func (n *Aggregator) HandleCustomMessage(message raft.Message) error {
return n.HandleRoundSyncMessage(message)
case PriceData:
return n.HandlePriceDataMessage(message)
case ProofMsg:
return n.HandleProofMessage(message)
default:
return fmt.Errorf("unknown message type received in HandleCustomMessage: %v", message.Type)
}
Expand All @@ -100,30 +93,14 @@ func (n *Aggregator) HandleRoundSyncMessage(msg raft.Message) error {
n.RoundID = roundSyncMessage.RoundID
}
var updateValue int64 = -1
value, updateTime, err := n.getLatestLocalAggregate(n.nodeCtx)
value, updateTime, err := GetLatestLocalAggregate(n.nodeCtx, n.Name)

if err == nil && n.LastLocalAggregateTime.IsZero() || !n.LastLocalAggregateTime.Equal(updateTime) {
updateValue = value
n.LastLocalAggregateTime = updateTime
}

priceDataMessage := PriceDataMessage{
RoundID: n.RoundID,
PriceData: updateValue,
}

marshalledPriceDataMessage, err := json.Marshal(priceDataMessage)
if err != nil {
return err
}

message := raft.Message{
Type: PriceData,
SentFrom: n.Raft.Host.ID().String(),
Data: json.RawMessage(marshalledPriceDataMessage),
}

return n.Raft.PublishMessage(message)
return n.PublishPriceDataMessage(n.RoundID, updateValue)
}

func (n *Aggregator) HandlePriceDataMessage(msg raft.Message) error {
Expand Down Expand Up @@ -154,25 +131,49 @@ func (n *Aggregator) HandlePriceDataMessage(msg raft.Message) error {
return err
}
log.Debug().Str("Player", "Aggregator").Int64("roundId", priceDataMessage.RoundID).Int64("global_aggregate", median).Msg("global aggregated")
err = n.insertGlobalAggregate(median, priceDataMessage.RoundID)
err = InsertGlobalAggregate(n.nodeCtx, n.Name, median, priceDataMessage.RoundID)
if err != nil {
log.Error().Str("Player", "Aggregator").Err(err).Msg("failed to insert global aggregate")
return err
}

proof, err := n.SignHelper.MakeGlobalAggregateProof(median)
if err != nil {
log.Error().Str("Player", "Aggregator").Err(err).Msg("failed to make global aggregate proof")
return err
}
return n.PublishProofMessage(priceDataMessage.RoundID, proof)
}
return nil
}

func (n *Aggregator) getLatestLocalAggregate(ctx context.Context) (int64, time.Time, error) {
redisAggregate, err := GetLatestLocalAggregateFromRdb(ctx, n.Name)
func (n *Aggregator) HandleProofMessage(msg raft.Message) error {
var proofMessage ProofMessage
err := json.Unmarshal(msg.Data, &proofMessage)
if err != nil {
pgsqlAggregate, err := GetLatestLocalAggregateFromPgs(ctx, n.Name)
return err
}

if proofMessage.RoundID == 0 {
return fmt.Errorf("invalid proof message: %v", proofMessage)
}

n.AggregatorMutex.Lock()
defer n.AggregatorMutex.Unlock()
if _, ok := n.CollectedProofs[proofMessage.RoundID]; !ok {
n.CollectedProofs[proofMessage.RoundID] = [][]byte{}
}

n.CollectedProofs[proofMessage.RoundID] = append(n.CollectedProofs[proofMessage.RoundID], proofMessage.Proof)
if len(n.CollectedProofs[proofMessage.RoundID]) >= n.Raft.SubscribersCount()+1 {
defer delete(n.CollectedProofs, proofMessage.RoundID)
err := InsertProof(n.nodeCtx, n.Name, proofMessage.RoundID, n.CollectedProofs[proofMessage.RoundID])
if err != nil {
return 0, time.Time{}, err
log.Error().Str("Player", "Aggregator").Err(err).Msg("failed to insert proof")
return err
}
return pgsqlAggregate.Value, pgsqlAggregate.Timestamp, nil
}
return redisAggregate.Value, redisAggregate.Timestamp, nil
return nil
}

func (n *Aggregator) getLatestRoundId(ctx context.Context) (int64, error) {
Expand All @@ -183,43 +184,67 @@ func (n *Aggregator) getLatestRoundId(ctx context.Context) (int64, error) {
return result.Round, nil
}

func (n *Aggregator) insertGlobalAggregate(value int64, round int64) error {
var errs []string
func (n *Aggregator) executeDeviation() error {
// signals for deviation job which triggers immediate aggregation and sends submission request to submitter
return nil
}

err := n.insertPgsql(n.nodeCtx, value, round)
if err != nil {
log.Error().Str("Player", "Aggregator").Err(err).Msg("failed to insert global aggregate into pgsql")
errs = append(errs, err.Error())
func (n *Aggregator) PublishRoundMessage(roundId int64) error {
roundMessage := RoundSyncMessage{
LeaderID: n.Raft.GetHostId(),
RoundID: roundId,
}

err = n.insertRdb(n.nodeCtx, value, round)
marshalledRoundMessage, err := json.Marshal(roundMessage)
if err != nil {
log.Error().Str("Player", "Aggregator").Err(err).Msg("failed to insert global aggregate into rdb")
errs = append(errs, err.Error())
return err
}

if len(errs) > 0 {
return fmt.Errorf(strings.Join(errs, "; "))
message := raft.Message{
Type: RoundSync,
SentFrom: n.Raft.GetHostId(),
Data: json.RawMessage(marshalledRoundMessage),
}

return nil
return n.Raft.PublishMessage(message)
}

func (n *Aggregator) insertPgsql(ctx context.Context, value int64, round int64) error {
return db.QueryWithoutResult(ctx, InsertGlobalAggregateQuery, map[string]any{"name": n.Name, "value": value, "round": round})
}
func (n *Aggregator) PublishPriceDataMessage(roundId int64, value int64) error {
priceDataMessage := PriceDataMessage{
RoundID: roundId,
PriceData: value,
}

func (n *Aggregator) insertRdb(ctx context.Context, value int64, round int64) error {
key := "globalAggregate:" + n.Name
data, err := json.Marshal(globalAggregate{Name: n.Name, Value: value, Round: round, Timestamp: time.Now()})
marshalledPriceDataMessage, err := json.Marshal(priceDataMessage)
if err != nil {
log.Error().Str("Player", "Aggregator").Err(err).Msg("failed to marshal global aggregate")
return err
}
return db.Set(ctx, key, string(data), time.Duration(5*time.Minute))

message := raft.Message{
Type: PriceData,
SentFrom: n.Raft.GetHostId(),
Data: json.RawMessage(marshalledPriceDataMessage),
}

return n.Raft.PublishMessage(message)
}

func (n *Aggregator) executeDeviation() error {
// signals for deviation job which triggers immediate aggregation and sends submission request to submitter
return nil
func (n *Aggregator) PublishProofMessage(roundId int64, proof []byte) error {
proofMessage := ProofMessage{
RoundID: roundId,
Proof: proof,
}

marshalledProofMessage, err := json.Marshal(proofMessage)
if err != nil {
return err
}

message := raft.Message{
Type: ProofMsg,
SentFrom: n.Raft.GetHostId(),
Data: json.RawMessage(marshalledProofMessage),
}

return n.Raft.PublishMessage(message)
}
59 changes: 55 additions & 4 deletions node/pkg/aggregator/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"testing"
"time"

"bisonai.com/orakl/node/pkg/db"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -76,7 +77,7 @@ func TestGetLatestLocalAggregate(t *testing.T) {

node.Name = "test_pair"

val, dbTime, err := node.getLatestLocalAggregate(ctx)
val, dbTime, err := GetLatestLocalAggregate(ctx, node.Name)
if err != nil {
t.Fatal("error getting latest local aggregate")
}
Expand Down Expand Up @@ -136,7 +137,7 @@ func TestInsertGlobalAggregate(t *testing.T) {

node.Name = "test_pair"

err = node.insertGlobalAggregate(20, 2)
err = InsertGlobalAggregate(ctx, node.Name, 20, 2)
if err != nil {
t.Fatal("error inserting global aggregate")
}
Expand All @@ -146,12 +147,62 @@ func TestInsertGlobalAggregate(t *testing.T) {
t.Fatal("error getting latest round id")
}

redisResult, err := GetLatestGlobalAggregateFromRdb(ctx, "test_pair")
redisResult, err := getLatestGlobalAggregateFromRdb(ctx, "test_pair")
if err != nil {
t.Fatal("error getting latest global aggregate from rdb")
}
assert.Equal(t, int64(20), redisResult.Value)
assert.Equal(t, int64(2), redisResult.Round)

assert.Equal(t, int64(2), roundId)
}

func TestInsertProof(t *testing.T) {
ctx := context.Background()
cleanup, testItems, err := setup(ctx)
if err != nil {
t.Fatalf("error setting up test: %v", err)
}
defer func() {
if cleanupErr := cleanup(); cleanupErr != nil {
t.Logf("Cleanup failed: %v", cleanupErr)
}
}()

node, err := NewAggregator(testItems.app.Host, testItems.app.Pubsub, testItems.topicString)
if err != nil {
t.Fatal("error creating new node")
}

node.Name = "test_pair"

value := int64(20)
round := int64(2)
p, err := node.SignHelper.MakeGlobalAggregateProof(value)
if err != nil {
t.Fatal("error making global aggregate proof")
}

err = InsertProof(ctx, node.Name, round, [][]byte{p, p})
if err != nil {
t.Fatal("error inserting proof")
}

rdbResult, err := getProofFromRdb(ctx, node.Name, round)
if err != nil {
t.Fatal("error getting proof from rdb")
}

assert.EqualValues(t, concatBytes([][]byte{p, p}), rdbResult.Proof)

pgsqlResult, err := getProofFromPgsql(ctx, node.Name, round)
if err != nil {
t.Fatal("error getting proof from pgsql:" + err.Error())
}

assert.EqualValues(t, concatBytes([][]byte{p, p}), pgsqlResult.Proof)

err = db.QueryWithoutResult(ctx, "DELETE FROM proofs", nil)
if err != nil {
t.Fatal("error deleting proofs")
}
}
Loading

0 comments on commit 2694dc3

Please sign in to comment.