Skip to content

Commit

Permalink
feat: sync round based on max suggested id
Browse files Browse the repository at this point in the history
  • Loading branch information
nick-bisonai committed Feb 1, 2024
1 parent 953fa72 commit 816d0d2
Show file tree
Hide file tree
Showing 2 changed files with 244 additions and 75 deletions.
287 changes: 213 additions & 74 deletions fetcher-v2/utils/fetcher-node.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,37 @@ import (
"github.com/libp2p/go-libp2p/core/host"
)

type PubSubComponents struct {
Ps *pubsub.PubSub
Topic *pubsub.Topic
Sub *pubsub.Subscription
}

type NodeData struct {
NextRound string
SuggestedRounds []string
Prices map[string][]int
Mutex sync.Mutex
}

type FetcherNode struct {
Host host.Host
Ps *pubsub.PubSub
Topic *pubsub.Topic
Sub *pubsub.Subscription
Data map[string][]int
NodeMutex sync.Mutex
Cancel context.CancelFunc
Host host.Host
PubSub PubSubComponents
Data NodeData
NextRoundReady chan bool
Cancel context.CancelFunc
}

type Message struct {
Type string `json:"type"`
Data json.RawMessage `json:"data"`
}

type SampleData struct {
type RoundData struct {
Suggestion string `json:"suggestion"`
}

type PriceData struct {
Number int `json:"number"`
ID string `json:"id"`
}
Expand All @@ -41,16 +61,16 @@ func NewNode(host host.Host, ps *pubsub.PubSub, topicString string) (*FetcherNod
}

return &FetcherNode{
Host: host,
Ps: ps,
Topic: topic,
Sub: sub,
Data: make(map[string][]int),
Host: host,
PubSub: PubSubComponents{Ps: ps, Topic: topic, Sub: sub},
Data: NodeData{NextRound: "", SuggestedRounds: []string{}, Prices: make(map[string][]int), Mutex: sync.Mutex{}},
NextRoundReady: make(chan bool, 1),
Cancel: nil,
}, nil
}

func (n *FetcherNode) nodeReadiness(rt pubsub.PubSubRouter, topic string) (bool, error) {
if rt.EnoughPeers(n.Topic.String(), 1) && topic == n.Topic.String() {
if rt.EnoughPeers(n.PubSub.Topic.String(), 1) && topic == n.PubSub.Topic.String() {
return true, nil
}
return false, fmt.Errorf("not enough peers ready")
Expand All @@ -71,108 +91,220 @@ func (n *FetcherNode) Stop() {
}
}

func (n *FetcherNode) publish(ctx context.Context, interval time.Duration) {
func (n *FetcherNode) suggestRound(ctx context.Context, t time.Time, interval time.Duration) (string, error) {
hostId := n.Host.ID().String()
suggestingId := GetIDFromTimestamp(int64(interval.Seconds()), t)

roundData := RoundData{
Suggestion: suggestingId,
}

marshalledRoundData, err := json.Marshal(roundData)
if err != nil {
return "", err
}
sendRoundMessage := Message{
Type: "round",
Data: json.RawMessage(marshalledRoundData),
}

roundDataBytes, err := json.Marshal(sendRoundMessage)
if err != nil {
return "", err
}
err = n.PubSub.Topic.Publish(ctx, roundDataBytes, pubsub.WithReadiness(n.nodeReadiness))
if err != nil {
return "", err
}
log.Printf("(%s) suggested round: %s\n", hostId[len(hostId)-4:], suggestingId)

return suggestingId, nil
}

func (n *FetcherNode) publishPrice(ctx context.Context, nextRound string) error {
hostId := n.Host.ID().String()
num := RandomNumberGenerator()
priceData := PriceData{
Number: num,
ID: nextRound,
}

marshalledPriceData, err := json.Marshal(priceData)
if err != nil {
return err
}

sendMessage := Message{
Type: "price",
Data: json.RawMessage(marshalledPriceData),
}

dataBytes, err := json.Marshal(sendMessage)
if err != nil {
return err
}

err = n.PubSub.Topic.Publish(ctx, dataBytes, pubsub.WithReadiness(n.nodeReadiness))

if err != nil {
return err
}

log.Printf("(%s) published %s:%d\n", hostId[len(hostId)-4:], nextRound, num)

return nil
}

func (n *FetcherNode) publish(ctx context.Context, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case t := <-ticker.C:

start := time.Now()
num := RandomNumberGenerator()
id := GetIDFromTimestamp(int64(interval.Seconds()), t)

data := SampleData{
Number: num,
ID: id,
}

n.NodeMutex.Lock()
n.Data[id] = append(n.Data[id], num)
n.NodeMutex.Unlock()

dataBytes, err := json.Marshal(data)
_, err := n.suggestRound(ctx, t, interval)
if err != nil {
log.Println("json marshal failed:" + err.Error())
log.Println("suggest round failed:" + err.Error())
continue
}

err = n.Topic.Publish(ctx, dataBytes, pubsub.WithReadiness(n.nodeReadiness))
// err = n.Topic.Publish(ctx, dataBytes)
// wait until next round id is determined
n.determineNextRoundID()
n.Data.Mutex.Lock()
nextRound := n.Data.NextRound
n.Data.Mutex.Unlock()

err = n.publishPrice(ctx, nextRound)
if err != nil {
log.Println("publish failed:" + err.Error())
log.Println("publish price failed:" + err.Error())
continue
}
hostId := n.Host.ID().String()
log.Printf("(%s) published %s:%d\n", hostId[len(hostId)-4:], data.ID, data.Number)

executeAtEndOfInterval(start, interval, func() {
err := n.calculateAverage(id)
if err != nil {
log.Println(err.Error())
}
})
case <-ctx.Done():
log.Println("stoppping publish")
log.Println("stopping publish")
return
}
}
}

func (n *FetcherNode) handlePriceMessage(ctx context.Context, message Message) error {
priceData, err := n.unmarshalPrice(message.Data)
if err != nil {
return err
}
log.Printf("(%s) Received %s:%d\n", n.PubSub.Topic.String(), priceData.ID, priceData.Number)
n.Data.Mutex.Lock()
defer n.Data.Mutex.Unlock()
n.Data.Prices[priceData.ID] = append(n.Data.Prices[priceData.ID], priceData.Number)

if len(n.Data.Prices[priceData.ID]) > n.getSubscribersCount() {
log.Println("calculating average for:" + priceData.ID)
err := n.calculateAverage(priceData.ID)
if err != nil {
return err
}
}
return nil
}

func (n *FetcherNode) handleRoundMessage(ctx context.Context, message Message) error {
roundData, err := n.unmarshalRound(message.Data)
if err != nil {
return err
}
n.Data.Mutex.Lock()
defer n.Data.Mutex.Unlock()
n.Data.SuggestedRounds = append(n.Data.SuggestedRounds, roundData.Suggestion)
if len(n.Data.SuggestedRounds) > n.getSubscribersCount() {
n.Data.NextRound, err = getMaxFromStringSlice(n.Data.SuggestedRounds)
if err != nil {
return err
}
log.Println("Next round:", n.Data.NextRound)
n.Data.SuggestedRounds = []string{}
n.NextRoundReady <- true
}
return nil
}

func (n *FetcherNode) subscribe(ctx context.Context) {
for {
select {
case <-ctx.Done():
log.Println("stopping subscribe")
return
default:
m, err := n.Sub.Next(ctx)
rawMessage, err := n.PubSub.Sub.Next(ctx)
if err != nil {
log.Println("message recieve failed:" + err.Error())
continue
}

if m.ReceivedFrom == n.Host.ID() {
// log.Println("Received message from self")
log.Println("message receive failed:" + err.Error())
continue
}

recievedFrom := m.ReceivedFrom

var data SampleData
err = json.Unmarshal(m.Data, &data)
message, err := n.unmarshalMessage(rawMessage.Data)
if err != nil {
log.Println("json unmarshal failed" + err.Error())
log.Println("unexpected message:" + err.Error())
continue
}
log.Printf("(%s) Received %s:%d from %s\n", n.Topic.String(), data.ID, data.Number, recievedFrom[len(recievedFrom)-4:])

n.NodeMutex.Lock()
n.Data[data.ID] = append(n.Data[data.ID], data.Number)
n.NodeMutex.Unlock()
switch message.Type {
case "price":
err := n.handlePriceMessage(ctx, message)
if err != nil {
log.Println("handle price message failed:" + err.Error())
}
case "round":
err := n.handleRoundMessage(ctx, message)
if err != nil {
log.Println("handle round message failed:" + err.Error())
}
default:
log.Println("unexpected message type")
}
}
}
}

func (n *FetcherNode) unmarshalMessage(data []byte) (Message, error) {
var m Message
err := json.Unmarshal(data, &m)
if err != nil {
return Message{}, err
}
return m, nil
}

func (n *FetcherNode) unmarshalPrice(data json.RawMessage) (PriceData, error) {
var p PriceData
err := json.Unmarshal(data, &p)
if err != nil {
return PriceData{}, err
}
return p, nil
}

func (n *FetcherNode) unmarshalRound(data json.RawMessage) (RoundData, error) {
var r RoundData
err := json.Unmarshal(data, &r)
if err != nil {
return RoundData{}, err
}
return r, nil
}

func (n *FetcherNode) calculateAverage(id string) error {
n.NodeMutex.Lock()
defer n.NodeMutex.Unlock()
if len(n.Data) == 0 {
if len(n.Data.Prices) == 0 {
return fmt.Errorf("no data to calculate average")
}

sum := 0
for _, num := range n.Data[id] {
for _, num := range n.Data.Prices[id] {
sum += num
}

result := sum / len(n.Data[id])
if len(n.Data[id]) > 1 {
fmt.Printf("topic: %s, id: %s, average:%d\n", n.Topic.String(), id, result)
result := sum / len(n.Data.Prices[id])
if len(n.Data.Prices[id]) > 1 {
fmt.Printf("topic: %s, id: %s, average:%d\n", n.PubSub.Topic.String(), id, result)
}

delete(n.Data, id)
delete(n.Data.Prices, id)
return nil
}

Expand All @@ -182,18 +314,25 @@ func (n *FetcherNode) getSubscribersCount() int {
}

func (n *FetcherNode) subscribers() []peer.ID {
return n.Ps.ListPeers(n.Topic.String())
return n.PubSub.Ps.ListPeers(n.PubSub.Topic.String())
}

func (n *FetcherNode) PrintInfo() {
n.NodeMutex.Lock()
defer n.NodeMutex.Unlock()
n.Data.Mutex.Lock()
defer n.Data.Mutex.Unlock()
fmt.Println("Node Info:")
fmt.Println("Host ID:", n.Host.ID())
fmt.Println("Topic:", n.Topic.String())
fmt.Println("Data:", n.Data)
fmt.Println("Topic:", n.PubSub.Topic.String())
fmt.Println("Prices:", n.Data.Prices)
}

func (n *FetcherNode) String() string {
return fmt.Sprintf("Host ID: %s, Topic: %s, Data: %v, Subscribers: %d", n.Host.ID(), n.Topic.String(), n.Data, n.getSubscribersCount())
return fmt.Sprintf("Host ID: %s, Topic: %s, Prices: %v, Subscribers: %d", n.Host.ID(), n.PubSub.Topic.String(), n.Data.Prices, n.getSubscribersCount())
}

func (n *FetcherNode) determineNextRoundID() {
for len(n.NextRoundReady) > 0 { // Drain the channel
<-n.NextRoundReady
}
<-n.NextRoundReady
}
Loading

0 comments on commit 816d0d2

Please sign in to comment.