Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(OraklNode) chore, fix alias declaration #2068

Merged
merged 3 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions node/pkg/aggregator/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"bisonai.com/orakl/node/pkg/bus"
"bisonai.com/orakl/node/pkg/chain/helper"
"bisonai.com/orakl/node/pkg/common/types"
"bisonai.com/orakl/node/pkg/db"

errorSentinel "bisonai.com/orakl/node/pkg/error"
Expand Down Expand Up @@ -352,7 +351,7 @@ func (a *App) handleMessage(ctx context.Context, msg bus.Message) {
msg.Response <- bus.MessageResponse{Success: true}
case bus.STREAM_LOCAL_AGGREGATE:

localAggregate := msg.Content.Args["value"].(types.LocalAggregate)
localAggregate := msg.Content.Args["value"].(LocalAggregate)
log.Debug().Any("bus local aggregate", localAggregate).Msg("local aggregate received")
a.LatestLocalAggregates.Store(localAggregate.ConfigID, localAggregate)

Expand Down
5 changes: 2 additions & 3 deletions node/pkg/aggregator/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"bisonai.com/orakl/node/pkg/admin/utils"
"bisonai.com/orakl/node/pkg/chain/helper"
"bisonai.com/orakl/node/pkg/common/keys"
"bisonai.com/orakl/node/pkg/common/types"

"bisonai.com/orakl/node/pkg/bus"
"bisonai.com/orakl/node/pkg/db"
Expand All @@ -30,7 +29,7 @@ const (

type TmpData struct {
config Config
rLocalAggregate types.LocalAggregate
rLocalAggregate LocalAggregate
pLocalAggregate LocalAggregate
globalAggregate GlobalAggregate
}
Expand Down Expand Up @@ -114,7 +113,7 @@ func insertSampleData(ctx context.Context, app *App, latestLocalAggMap *LatestLo
// return nil, err
// }

tmpLocalAggregate := types.LocalAggregate{ConfigID: tmpConfig.ID, Value: int64(10), Timestamp: localAggregateInsertTime}
tmpLocalAggregate := LocalAggregate{ConfigID: tmpConfig.ID, Value: int64(10), Timestamp: localAggregateInsertTime}
latestLocalAggMap.Store(tmpConfig.ID, tmpLocalAggregate)

tmpData.rLocalAggregate = tmpLocalAggregate
Expand Down
14 changes: 7 additions & 7 deletions node/pkg/aggregator/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ const (
InsertProofQuery = `INSERT INTO proofs (config_id, round, proof) VALUES (@config_id, @round, @proof) RETURNING *`
)

type LocalAggregate types.LocalAggregate
type Proof types.Proof
type GlobalAggregate types.GlobalAggregate
type LocalAggregate = types.LocalAggregate
type Proof = types.Proof
type GlobalAggregate = types.GlobalAggregate

type SubmissionData struct {
GlobalAggregate GlobalAggregate
Expand Down Expand Up @@ -227,24 +227,24 @@ type TriggerMessage struct {
}

type LatestLocalAggregates struct {
LocalAggregateMap map[int32]types.LocalAggregate
LocalAggregateMap map[int32]LocalAggregate
mu sync.RWMutex
}

func NewLatestLocalAggregates() *LatestLocalAggregates {
return &LatestLocalAggregates{
LocalAggregateMap: map[int32]types.LocalAggregate{},
LocalAggregateMap: map[int32]LocalAggregate{},
}
}

func (a *LatestLocalAggregates) Load(id int32) (types.LocalAggregate, bool) {
func (a *LatestLocalAggregates) Load(id int32) (LocalAggregate, bool) {
a.mu.RLock()
defer a.mu.RUnlock()
result, ok := a.LocalAggregateMap[id]
return result, ok
}

func (a *LatestLocalAggregates) Store(id int32, aggregate types.LocalAggregate) {
func (a *LatestLocalAggregates) Store(id int32, aggregate LocalAggregate) {
a.mu.Lock()
defer a.mu.Unlock()
a.LocalAggregateMap[id] = aggregate
Expand Down
5 changes: 5 additions & 0 deletions node/pkg/common/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package types

import (
"encoding/json"
"fmt"
"time"
)

Expand All @@ -13,6 +14,10 @@ type Proxy struct {
Location *string `db:"location"`
}

func (proxy *Proxy) GetProxyUrl() string {
return fmt.Sprintf("%s://%s:%d", proxy.Protocol, proxy.Host, proxy.Port)
}

type Feed struct {
ID int32 `db:"id"`
Name string `db:"name"`
Expand Down
7 changes: 3 additions & 4 deletions node/pkg/dal/api/hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@ import (
"sync"
"time"

"bisonai.com/orakl/node/pkg/common/types"
"bisonai.com/orakl/node/pkg/dal/collector"
dalcommon "bisonai.com/orakl/node/pkg/dal/common"
"github.com/gofiber/contrib/websocket"
"github.com/rs/zerolog/log"
)

func HubSetup(ctx context.Context, configs []types.Config) *Hub {
configMap := make(map[string]types.Config)
func HubSetup(ctx context.Context, configs []Config) *Hub {
configMap := make(map[string]Config)
for _, config := range configs {
configMap[config.Name] = config
}
Expand All @@ -22,7 +21,7 @@ func HubSetup(ctx context.Context, configs []types.Config) *Hub {
return hub
}

func NewHub(configs map[string]types.Config) *Hub {
func NewHub(configs map[string]Config) *Hub {
return &Hub{
configs: configs,
clients: make(map[*ThreadSafeClient]map[string]bool),
Expand Down
5 changes: 3 additions & 2 deletions node/pkg/dal/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,21 @@ import (

const MAX_CONNECTIONS = 10

type Config = types.Config

type Subscription struct {
Method string `json:"method"`
Params []string `json:"params"`
}

type Hub struct {
configs map[string]types.Config
configs map[string]Config
clients map[*ThreadSafeClient]map[string]bool
register chan *ThreadSafeClient
unregister chan *ThreadSafeClient
broadcast map[string]chan *dalcommon.OutgoingSubmissionData
connPerIP map[string][]*ThreadSafeClient
mu sync.RWMutex

}

type BulkResponse struct {
Expand Down
6 changes: 4 additions & 2 deletions node/pkg/dal/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"github.com/rs/zerolog/log"
)

type Config = types.Config

func Run(ctx context.Context) error {
log.Debug().Msg("Starting DAL API server")

Expand Down Expand Up @@ -65,6 +67,6 @@ func Run(ctx context.Context) error {
return app.Listen(":" + port)
}

func fetchConfigs(ctx context.Context, endpoint string) ([]types.Config, error) {
return request.Request[[]types.Config](request.WithEndpoint(endpoint + "/config"))
func fetchConfigs(ctx context.Context, endpoint string) ([]Config, error) {
return request.Request[[]Config](request.WithEndpoint(endpoint + "/config"))
}
4 changes: 3 additions & 1 deletion node/pkg/dal/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ const (
OracleAdded = "OracleAdded(address oracle, uint256 expirationTime)"
)

type Config = types.Config

type Collector struct {
IncomingStream map[int32]chan *aggregator.SubmissionData
OutgoingStream map[int32]chan *dalcommon.OutgoingSubmissionData
Expand All @@ -44,7 +46,7 @@ type Collector struct {
mu sync.RWMutex
}

func NewCollector(ctx context.Context, configs []types.Config) (*Collector, error) {
func NewCollector(ctx context.Context, configs []Config) (*Collector, error) {
kaiaWebsocketUrl := os.Getenv("KAIA_WEBSOCKET_URL")
if kaiaWebsocketUrl == "" {
return nil, errors.New("KAIA_WEBSOCKET_URL is not set")
Expand Down
8 changes: 5 additions & 3 deletions node/pkg/dal/tests/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ import (
"github.com/rs/zerolog/log"
)

type Config = types.Config

type TestItems struct {
App *fiber.App
Collector *collector.Collector
Controller *api.Hub
TmpConfig types.Config
TmpConfig Config
MockAdmin *httptest.Server
ApiKey string
}
Expand Down Expand Up @@ -88,15 +90,15 @@ func setup(ctx context.Context) (func() error, *TestItems, error) {
"submitInterval": 15000}]`))
}))

testItems.TmpConfig = types.Config{
testItems.TmpConfig = Config{
ID: 13,
Name: "test-aggregate",
FetchInterval: 15000,
AggregateInterval: 15000,
SubmitInterval: 15000,
}

configs := []types.Config{testItems.TmpConfig}
configs := []Config{testItems.TmpConfig}

keyCache := keycache.NewAPIKeyCache(1 * time.Hour)
keyCache.CleanupLoop(10 * time.Minute)
Expand Down
3 changes: 1 addition & 2 deletions node/pkg/fetcher/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

"bisonai.com/orakl/node/pkg/bus"
"bisonai.com/orakl/node/pkg/common/types"
"github.com/rs/zerolog/log"
)

Expand Down Expand Up @@ -127,7 +126,7 @@ func calculateAggregatedPrice(valueWeightedAveragePrice, medianPrice float64) fl

func (c *Collector) streamLocalAggregate(ctx context.Context, aggregated float64) error {
if aggregated != 0 {
busLocalAggregate := types.LocalAggregate{
busLocalAggregate := LocalAggregate{
ConfigID: c.ID,
Value: int64(aggregated),
Timestamp: time.Now(),
Expand Down
9 changes: 4 additions & 5 deletions node/pkg/fetcher/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,17 @@ const (
DefaultMedianRatio = 0.05
)

type Feed types.Feed
type FeedData types.FeedData
type LocalAggregate types.LocalAggregate
type Feed = types.Feed
type FeedData = types.FeedData
type LocalAggregate = types.LocalAggregate
type Proxy = types.Proxy

type Config struct {
ID int32 `db:"id"`
Name string `db:"name"`
FetchInterval int32 `db:"fetch_interval"`
}

type Proxy types.Proxy

type Fetcher struct {
Config
Feeds []Feed
Expand Down
4 changes: 2 additions & 2 deletions node/pkg/reporter/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ const (
MAX_INTERVAL = 3600
)

type GlobalAggregate = types.GlobalAggregate

type Config struct {
Name string `json:"name"`
SubmitInterval *int `json:"submitInterval"`
Expand Down Expand Up @@ -130,8 +132,6 @@ type Reporter struct {
Job func() error
}

type GlobalAggregate types.GlobalAggregate

type RawSubmissionData struct {
Symbol string `json:"symbol"`
Value string `json:"value"`
Expand Down
11 changes: 3 additions & 8 deletions node/pkg/websocketfetcher/common/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,15 @@ const (
VolumeFetchInterval = 10000
)

type Proxy types.Proxy
type Proxy = types.Proxy
type Feed = types.Feed
type FeedData = types.FeedData

func GetDexFeedsQuery(name string) string {
name = capitalizeFirstLetter(name)
return fmt.Sprintf(`SELECT * FROM feeds WHERE definition::jsonb @> '{"type": "%sPool"}'::jsonb;`, name)
}

func (proxy *Proxy) GetProxyUrl() string {
return fmt.Sprintf("%s://%s:%d", proxy.Protocol, proxy.Host, proxy.Port)
}

type Feed types.Feed
type FeedData types.FeedData

type FeedDefinition struct {
Type string `json:"type"`
Provider string `json:"provider"`
Expand Down
6 changes: 4 additions & 2 deletions node/script/test_dal_consumer_ws/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/rs/zerolog/log"
)

type Config = types.Config

type Subscription struct {
Method string `json:"method"`
Params []string `json:"params"`
Expand Down Expand Up @@ -54,10 +56,10 @@ func main() {
wg.Wait()
}

func fetchConfigs() ([]types.Config, error) {
func fetchConfigs() ([]Config, error) {

endpoint := "https://config.orakl.network/baobab_configs.json"
configs, err := request.Request[[]types.Config](request.WithEndpoint(endpoint))
configs, err := request.Request[[]Config](request.WithEndpoint(endpoint))
if err != nil {
return nil, err
}
Expand Down