Skip to content

Commit

Permalink
feat: submission data sharing between nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
nick-bisonai committed Apr 12, 2024
1 parent 346c7ec commit 7f119a3
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 35 deletions.
20 changes: 10 additions & 10 deletions node/pkg/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,20 @@ func (n *Aggregator) LeaderJob() error {
return n.PublishRoundMessage(n.RoundID)
}

func (n *Aggregator) HandleCustomMessage(message raft.Message) error {
func (n *Aggregator) HandleCustomMessage(ctx context.Context, message raft.Message) error {
switch message.Type {
case RoundSync:
return n.HandleRoundSyncMessage(message)
return n.HandleRoundSyncMessage(ctx, message)
case PriceData:
return n.HandlePriceDataMessage(message)
return n.HandlePriceDataMessage(ctx, message)
case ProofMsg:
return n.HandleProofMessage(message)
return n.HandleProofMessage(ctx, message)
default:
return fmt.Errorf("unknown message type received in HandleCustomMessage: %v", message.Type)
}
}

func (n *Aggregator) HandleRoundSyncMessage(msg raft.Message) error {
func (n *Aggregator) HandleRoundSyncMessage(ctx context.Context, msg raft.Message) error {
var roundSyncMessage RoundSyncMessage
err := json.Unmarshal(msg.Data, &roundSyncMessage)
if err != nil {
Expand All @@ -93,7 +93,7 @@ func (n *Aggregator) HandleRoundSyncMessage(msg raft.Message) error {
n.RoundID = roundSyncMessage.RoundID
}
var updateValue int64 = -1
value, updateTime, err := GetLatestLocalAggregate(n.nodeCtx, n.Name)
value, updateTime, err := GetLatestLocalAggregate(ctx, n.Name)

if err == nil && n.LastLocalAggregateTime.IsZero() || !n.LastLocalAggregateTime.Equal(updateTime) {
updateValue = value
Expand All @@ -103,7 +103,7 @@ func (n *Aggregator) HandleRoundSyncMessage(msg raft.Message) error {
return n.PublishPriceDataMessage(n.RoundID, updateValue)
}

func (n *Aggregator) HandlePriceDataMessage(msg raft.Message) error {
func (n *Aggregator) HandlePriceDataMessage(ctx context.Context, msg raft.Message) error {
var priceDataMessage PriceDataMessage
err := json.Unmarshal(msg.Data, &priceDataMessage)
if err != nil {
Expand Down Expand Up @@ -131,7 +131,7 @@ func (n *Aggregator) HandlePriceDataMessage(msg raft.Message) error {
return err
}
log.Debug().Str("Player", "Aggregator").Int64("roundId", priceDataMessage.RoundID).Int64("global_aggregate", median).Msg("global aggregated")
err = InsertGlobalAggregate(n.nodeCtx, n.Name, median, priceDataMessage.RoundID)
err = InsertGlobalAggregate(ctx, n.Name, median, priceDataMessage.RoundID)
if err != nil {
log.Error().Str("Player", "Aggregator").Err(err).Msg("failed to insert global aggregate")
return err
Expand All @@ -147,7 +147,7 @@ func (n *Aggregator) HandlePriceDataMessage(msg raft.Message) error {
return nil
}

func (n *Aggregator) HandleProofMessage(msg raft.Message) error {
func (n *Aggregator) HandleProofMessage(ctx context.Context, msg raft.Message) error {
var proofMessage ProofMessage
err := json.Unmarshal(msg.Data, &proofMessage)
if err != nil {
Expand All @@ -167,7 +167,7 @@ func (n *Aggregator) HandleProofMessage(msg raft.Message) error {
n.CollectedProofs[proofMessage.RoundID] = append(n.CollectedProofs[proofMessage.RoundID], proofMessage.Proof)
if len(n.CollectedProofs[proofMessage.RoundID]) >= n.Raft.SubscribersCount()+1 {
defer delete(n.CollectedProofs, proofMessage.RoundID)
err := InsertProof(n.nodeCtx, n.Name, proofMessage.RoundID, n.CollectedProofs[proofMessage.RoundID])
err := InsertProof(ctx, n.Name, proofMessage.RoundID, n.CollectedProofs[proofMessage.RoundID])
if err != nil {
log.Error().Str("Player", "Aggregator").Err(err).Msg("failed to insert proof")
return err
Expand Down
2 changes: 1 addition & 1 deletion node/pkg/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (r *Raft) handleMessage(ctx context.Context, msg Message) error {
case ReplyVote:
return r.handleReplyVote(ctx, msg)
default:
return r.HandleCustomMessage(msg)
return r.HandleCustomMessage(ctx, msg)
}
}

Expand Down
3 changes: 2 additions & 1 deletion node/pkg/raft/types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package raft

import (
"context"
"encoding/json"
"sync"
"time"
Expand Down Expand Up @@ -65,6 +66,6 @@ type Raft struct {

LeaderJobTimeout time.Duration
LeaderJobTicker *time.Ticker
HandleCustomMessage func(Message) error
HandleCustomMessage func(context.Context, Message) error
LeaderJob func() error
}
57 changes: 44 additions & 13 deletions node/pkg/reporter/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package reporter

import (
"context"
"encoding/json"
"errors"
"os"
"strconv"
Expand Down Expand Up @@ -125,9 +126,9 @@ func (r *Reporter) leaderJob() error {
return err
}

err = StoreLastSubmission(ctx, validAggregates)
err = r.PublishSubmissionMessage(validAggregates)
if err != nil {
log.Error().Str("Player", "Reporter").Err(err).Msg("storeLastSubmission")
log.Error().Str("Player", "Reporter").Err(err).Msg("PublishSubmissionMessage")
return err
}

Expand Down Expand Up @@ -166,9 +167,13 @@ func (r *Reporter) resignLeader() {
r.Raft.UpdateRole(raft.Follower)
}

func (r *Reporter) handleCustomMessage(msg raft.Message) error {
// TODO: implement message handling related to validation
return errors.New("unknown message type")
func (r *Reporter) handleCustomMessage(ctx context.Context, msg raft.Message) error {
switch msg.Type {
case SubmissionMsg:
return r.HandleSubmissionMessage(ctx, msg)
default:
return errors.New("unknown message type")
}
}

func (r *Reporter) reportWithoutProofs(ctx context.Context, aggregates []GlobalAggregate) error {
Expand Down Expand Up @@ -279,15 +284,9 @@ func (r *Reporter) deviationJob() error {
return err
}

err = StoreLastSubmission(ctx, deviatingAggregates)
if err != nil {
log.Error().Str("Player", "Reporter").Err(err).Msg("storeLastSubmission from deviation")
return err
}

err = StoreLastSubmission(ctx, deviatingAggregates)
err = r.PublishSubmissionMessage(deviatingAggregates)
if err != nil {
log.Error().Str("Player", "Reporter").Err(err).Msg("storeLastSubmission from deviation")
log.Error().Str("Player", "Reporter").Err(err).Msg("PublishSubmissionMessage")
return err
}

Expand All @@ -310,3 +309,35 @@ func (r *Reporter) deviationJob() error {

return nil
}

func (r *Reporter) PublishSubmissionMessage(submissions []GlobalAggregate) error {
submissionMessage := SubmissionMessage{Submissions: submissions}
marshalledSubmissionMessage, err := json.Marshal(submissionMessage)
if err != nil {
return err
}

message := raft.Message{
Type: SubmissionMsg,
SentFrom: r.Raft.GetHostId(),
Data: json.RawMessage(marshalledSubmissionMessage),
}

return r.Raft.PublishMessage(message)
}

func (r *Reporter) HandleSubmissionMessage(ctx context.Context, msg raft.Message) error {
var submissionMessage SubmissionMessage
err := json.Unmarshal(msg.Data, &submissionMessage)
if err != nil {
return err
}

err = StoreLastSubmission(ctx, submissionMessage.Submissions)
if err != nil {
log.Error().Str("Player", "Reporter").Err(err).Msg("storeLastSubmission")
return err
}

return nil
}
2 changes: 1 addition & 1 deletion node/pkg/reporter/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestHandleCustomMessage(t *testing.T) {
t.Fatalf("error getting reporter: %v", err)
}

err = reporter.handleCustomMessage(raft.Message{})
err = reporter.handleCustomMessage(ctx, raft.Message{})
assert.Equal(t, err.Error(), "unknown message type")
}

Expand Down
21 changes: 13 additions & 8 deletions node/pkg/reporter/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@ import (
)

const (
TOPIC_STRING = "orakl-offchain-aggregation-reporter"
MESSAGE_BUFFER = 100
DEVIATION_TIMEOUT = 5 * time.Second
INITIAL_FAILURE_TIMEOUT = 50 * time.Millisecond
MAX_RETRY = 3
MAX_RETRY_DELAY = 500 * time.Millisecond
SUBMIT_WITHOUT_PROOFS = "submit(address[] memory _feeds, int256[] memory _submissions)"
SUBMIT_WITH_PROOFS = "submit(address[] memory _feeds, int256[] memory _submissions, bytes[] memory _proofs)"
SubmissionMsg raft.MessageType = "submission"
TOPIC_STRING = "orakl-offchain-aggregation-reporter"
MESSAGE_BUFFER = 100
DEVIATION_TIMEOUT = 5 * time.Second
INITIAL_FAILURE_TIMEOUT = 50 * time.Millisecond
MAX_RETRY = 3
MAX_RETRY_DELAY = 500 * time.Millisecond
SUBMIT_WITHOUT_PROOFS = "submit(address[] memory _feeds, int256[] memory _submissions)"
SUBMIT_WITH_PROOFS = "submit(address[] memory _feeds, int256[] memory _submissions, bytes[] memory _proofs)"

GET_SUBMISSIONS_QUERY = `SELECT * FROM submission_addresses;`
DEVIATION_THRESHOLD = 0.05
Expand Down Expand Up @@ -81,6 +82,10 @@ type PgsqlProof struct {
Proof []byte `db:"proof" json:"proof"`
}

type SubmissionMessage struct {
Submissions []GlobalAggregate `json:"submissions"`
}

func makeGetLatestGlobalAggregatesQuery(names []string) string {
queryNames := make([]string, len(names))
for i, name := range names {
Expand Down
2 changes: 1 addition & 1 deletion node/script/test_raft/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func main() {
return nil
}

node.HandleCustomMessage = func(message raft.Message) error {
node.HandleCustomMessage = func(ctx context.Context, message raft.Message) error {
log.Debug().Msg("Custom message")
return errors.New("unknown message type")
}
Expand Down

0 comments on commit 7f119a3

Please sign in to comment.