Skip to content

Commit

Permalink
(OraklNode) Track peers manually (#1572)
Browse files Browse the repository at this point in the history
* feat: implement peer count into raft pkg

* fix: remove unused property

* test: add cases to test based on feedback

* (OraklNode) Default to udp - quic (#1573)

* feat: default to udp - quic

* test: prevent possible duplicate port err

* test: fix port error

* test: fix test err

* fix: remove listen port, add missing `HOST_IP` reference

* fix: rollback listen port

* fix: update based on feedback
  • Loading branch information
nick-bisonai authored Jun 10, 2024
1 parent 0ced283 commit c4c8298
Show file tree
Hide file tree
Showing 14 changed files with 171 additions and 40 deletions.
1 change: 0 additions & 1 deletion node/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ FEED_DATA_STREAM_INTERVAL=
# (optional) designate external ip to be used if required
HOST_IP=


# `baobab` or `cypress`, defaults to baobab
CHAIN=

Expand Down
3 changes: 1 addition & 2 deletions node/cmd/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ func main() {

listenPort, err := strconv.Atoi(os.Getenv("LISTEN_PORT"))
if err != nil {
log.Error().Err(err).Msg("Error parsing LISTEN_PORT")
return
log.Warn().Msg("LISTEN_PORT missing, using random port for libp2p")
}

host, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch(), libp2pSetup.WithPort(listenPort))
Expand Down
5 changes: 3 additions & 2 deletions node/pkg/aggregator/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func setup(ctx context.Context) (func() error, *TestItems, error) {
}
testItems.admin = admin

h, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch(), libp2pSetup.WithPort(10001))
h, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch())
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -159,7 +159,8 @@ func aggregatorCleanup(ctx context.Context, admin *fiber.App, app *App) func() e
if err != nil {
return err
}
return nil

return app.Host.Close()
}

}
Expand Down
6 changes: 3 additions & 3 deletions node/pkg/boot/tests/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@ func TestSync(t *testing.T) {
}
defer cleanup()

mockHost1, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch(), libp2pSetup.WithQuic())
mockHost1, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch())
if err != nil {
t.Fatalf("error making host: %v", err)
}

mockHost2, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch(), libp2pSetup.WithQuic())
mockHost2, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch())
if err != nil {
t.Fatalf("error making host: %v", err)
}
Expand Down Expand Up @@ -131,7 +131,7 @@ func TestRefresh(t *testing.T) {
}
defer cleanup()

h, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch(), libp2pSetup.WithQuic(), libp2pSetup.WithPort(10010))
h, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch(), libp2pSetup.WithPort(10010))
if err != nil {
t.Fatalf("error making host: %v", err)
}
Expand Down
9 changes: 9 additions & 0 deletions node/pkg/libp2p/setup/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,15 @@ func ConnectThroughBootApi(ctx context.Context, h host.Host) error {
return err
}

externalIp := os.Getenv("HOST_IP")
if externalIp != "" {
url, err = utils.ReplaceIpFromUrl(url, externalIp)
if err != nil {
log.Error().Err(err).Msg("failed to replace ip")
return err
}
}

apiEndpoint := os.Getenv("BOOT_API_URL")
if apiEndpoint == "" {
log.Info().Msg("boot api endpoint not set, using default url: http://localhost:8089")
Expand Down
27 changes: 8 additions & 19 deletions node/pkg/libp2p/setup/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"crypto/rand"
"crypto/sha256"
"fmt"
"os"
"strconv"

"bisonai.com/orakl/node/pkg/secrets"

Expand All @@ -23,7 +21,7 @@ type HostConfig struct {
PrivateKey crypto.PrivKey
SecretString string
HolePunch bool
Quic bool
Tcp bool
}

type HostOption func(*HostConfig)
Expand Down Expand Up @@ -52,28 +50,19 @@ func WithHolePunch() HostOption {
}
}

func WithQuic() HostOption {
func WithTcp() HostOption {
return func(hc *HostConfig) {
hc.Quic = true
hc.Tcp = true
}
}

func NewHost(ctx context.Context, opts ...HostOption) (host.Host, error) {
defaultPort := 0
defaultPortStr := os.Getenv("LISTEN_PORT")
if defaultPortStr != "" {
tmp, err := strconv.Atoi(defaultPortStr)
if err == nil {
defaultPort = tmp
}
}

config := &HostConfig{
Port: defaultPort,
Port: 0,
PrivateKey: nil,
SecretString: secrets.GetSecret("PRIVATE_NETWORK_SECRET"),
HolePunch: false,
Quic: false,
Tcp: false,
}
for _, opt := range opts {
opt(config)
Expand All @@ -88,10 +77,10 @@ func NewHost(ctx context.Context, opts ...HostOption) (host.Host, error) {
}

listenStr := ""
if config.Quic {
listenStr = fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic-v1", config.Port)
} else {
if config.Tcp {
listenStr = fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", config.Port)
} else {
listenStr = fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic-v1", config.Port)
}

libp2pOpts := []libp2p.Option{
Expand Down
28 changes: 25 additions & 3 deletions node/pkg/libp2p/tests/libp2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ import (
)

func TestMakeHost(t *testing.T) {
h, err := setup.NewHost(context.Background(), setup.WithHolePunch(), setup.WithPort(10001))
h, err := setup.NewHost(context.Background(), setup.WithHolePunch())
if err != nil {
t.Errorf("Failed to make host: %v", err)
}
defer h.Close()
}

func TestMakePubsub(t *testing.T) {
h, err := setup.NewHost(context.Background(), setup.WithHolePunch(), setup.WithPort(10001))
h, err := setup.NewHost(context.Background(), setup.WithHolePunch())
if err != nil {
t.Fatalf("Failed to make host: %v", err)
}
Expand All @@ -31,7 +31,7 @@ func TestMakePubsub(t *testing.T) {
}

func TestGetHostAddress(t *testing.T) {
h, err := setup.NewHost(context.Background(), setup.WithHolePunch(), setup.WithPort(10001))
h, err := setup.NewHost(context.Background(), setup.WithHolePunch())
if err != nil {
t.Fatalf("Failed to make host: %v", err)
}
Expand All @@ -41,3 +41,25 @@ func TestGetHostAddress(t *testing.T) {
t.Errorf("Failed to get host address: %v", err)
}
}

func TestReplaceIp(t *testing.T) {
h, err := setup.NewHost(context.Background(), setup.WithHolePunch())
if err != nil {
t.Fatalf("Failed to make host: %v", err)
}
defer h.Close()

url, err := utils.ExtractConnectionUrl(h)
if err != nil {
t.Fatalf("Failed to extract connection url: %v", err)
}

result, err := utils.ReplaceIpFromUrl(url, "127.0.0.1")
if err != nil {
t.Errorf("Failed to replace ip: %v", err)
}

if url == result {
t.Errorf("Failed to replace ip: %v", err)
}
}
9 changes: 9 additions & 0 deletions node/pkg/libp2p/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,12 @@ func ConnectionUrl2AddrInfo(url string) (*peer.AddrInfo, error) {

return info, nil
}

func ReplaceIpFromUrl(url string, ip string) (string, error) {
parts := strings.Split(url, "/")
if len(parts) < 5 || parts[1] != "ip4" {
return "", fmt.Errorf("invalid URL format")
}
parts[2] = ip
return strings.Join(parts, "/"), nil
}
8 changes: 1 addition & 7 deletions node/pkg/raft/accessors.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package raft

import "github.com/libp2p/go-libp2p/core/peer"

func (r *Raft) IncreaseTerm() {
r.Mutex.Lock()
defer r.Mutex.Unlock()
Expand Down Expand Up @@ -75,11 +73,7 @@ func (r *Raft) UpdateVotedFor(votedFor string) {
}

func (r *Raft) SubscribersCount() int {
return len(r.Subscribers())
}

func (r *Raft) Subscribers() []peer.ID {
return r.Ps.ListPeers(r.Topic.String())
return r.Peers.Size()
}

func (r *Raft) GetHostId() string {
Expand Down
41 changes: 40 additions & 1 deletion node/pkg/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/rs/zerolog/log"

errorSentinel "bisonai.com/orakl/node/pkg/error"
"bisonai.com/orakl/node/pkg/utils/set"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
)
Expand Down Expand Up @@ -40,6 +41,9 @@ func NewRaftNode(
HeartbeatTimeout: HEARTBEAT_TIMEOUT,

LeaderJobTimeout: leaderJobTimeout,

PrevPeers: *set.NewSet[string](),
Peers: *set.NewSet[string](),
}
return r
}
Expand Down Expand Up @@ -104,12 +108,17 @@ func (r *Raft) handleMessage(ctx context.Context, msg Message) error {
return r.handleRequestVote(msg)
case ReplyVote:
return r.handleReplyVote(ctx, msg)
case ReplyHeartbeat:
return r.handleReplyHeartbeat(msg)
default:
return r.HandleCustomMessage(ctx, msg)
}
}

func (r *Raft) handleHeartbeat(msg Message) error {
r.Peers = r.PrevPeers
r.PrevPeers = *set.NewSet[string]()

if msg.SentFrom == r.GetHostId() {
return nil
}
Expand Down Expand Up @@ -151,7 +160,7 @@ func (r *Raft) handleHeartbeat(msg Message) error {
r.UpdateLeader(heartbeatMessage.LeaderID)
}

return nil
return r.sendReplyHeartbeat()
}

func (r *Raft) handleRequestVote(msg Message) error {
Expand Down Expand Up @@ -216,6 +225,17 @@ func (r *Raft) handleReplyVote(ctx context.Context, msg Message) error {
return nil
}

func (r *Raft) handleReplyHeartbeat(msg Message) error {
var replyHeartbeatMessage ReplyHeartbeatMessage
err := json.Unmarshal(msg.Data, &replyHeartbeatMessage)
if err != nil {
return err
}

r.PrevPeers.Add(msg.SentFrom)
return nil
}

// publishing messages

func (r *Raft) PublishMessage(msg Message) error {
Expand All @@ -227,6 +247,7 @@ func (r *Raft) PublishMessage(msg Message) error {
}

func (r *Raft) sendHeartbeat() error {

heartbeatMessage := HeartbeatMessage{
LeaderID: r.GetHostId(),
Term: r.GetCurrentTerm(),
Expand All @@ -250,6 +271,24 @@ func (r *Raft) sendHeartbeat() error {
return nil
}

func (r *Raft) sendReplyHeartbeat() error {
replyHeartbeatMessage := ReplyHeartbeatMessage{}
marshalledReplyHeartbeatMsg, err := json.Marshal(replyHeartbeatMessage)
if err != nil {
return err
}
message := Message{
Type: ReplyHeartbeat,
SentFrom: r.GetHostId(),
Data: json.RawMessage(marshalledReplyHeartbeatMsg),
}
err = r.PublishMessage(message)
if err != nil {
return err
}
return nil
}

func (r *Raft) sendReplyVote(to string, voteGranted bool) error {
replyVoteMessage := ReplyRequestVoteMessage{
VoteGranted: voteGranted,
Expand Down
7 changes: 7 additions & 0 deletions node/pkg/raft/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"
"time"

"bisonai.com/orakl/node/pkg/utils/set"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
)
Expand All @@ -20,6 +21,7 @@ const (
ReplyVote MessageType = "replyVote"
AppendEntries MessageType = "appendEntries"
ReplyAppendEntries MessageType = "replyAppendEntries"
ReplyHeartbeat MessageType = "replyHeartbeat"

Leader RoleType = "leader"
Candidate RoleType = "candidate"
Expand All @@ -41,6 +43,8 @@ type HeartbeatMessage struct {
Term int `json:"term"`
}

type ReplyHeartbeatMessage struct{}

type ReplyRequestVoteMessage struct {
VoteGranted bool `json:"voteGranted"`
LeaderID string `json:"leaderID"`
Expand Down Expand Up @@ -68,4 +72,7 @@ type Raft struct {
LeaderJobTicker *time.Ticker
HandleCustomMessage func(context.Context, Message) error
LeaderJob func() error

PrevPeers set.Set[string]
Peers set.Set[string]
}
4 changes: 2 additions & 2 deletions node/pkg/reporter/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func setup(ctx context.Context) (func() error, *TestItems, error) {

testItems.admin = admin

h, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch(), libp2pSetup.WithPort(10001))
h, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch())
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -171,7 +171,7 @@ func reporterCleanup(ctx context.Context, admin *fiber.App, app *App) func() err
if err != nil {
return err
}
return nil
return app.Host.Close()
}
}

Expand Down
Loading

0 comments on commit c4c8298

Please sign in to comment.