diff --git a/node/.env.example b/node/.env.example
index 3c741adfc..eefe4bc41 100644
--- a/node/.env.example
+++ b/node/.env.example
@@ -9,6 +9,7 @@ DELEGATOR_URL=
 SIGNER_PK=
 ENCRYPT_PASSWORD=
+
 # `baobab` or `cypress`, defaults to baobab
 CHAIN=
@@ -19,6 +20,8 @@ VAULT_ROLE=
 VAULT_SECRET_PATH=
 VAULT_KEY_NAME=
+# (optional) interval for streaming feed_data from redis -> pgsql, defaults to 5s
+FEED_DATA_STREAM_INTERVAL=
 # (optional) required if wallets table is empty
 KLAYTN_REPORTER_PK=
 # (optional) required to run klaytn_helper test
diff --git a/node/.env.local b/node/.env.local
index 5d924debd..b3eadc44b 100644
--- a/node/.env.local
+++ b/node/.env.local
@@ -9,7 +9,7 @@ ENCRYPT_PASSWORD=anything
 CHAIN=test
 KLAYTN_REPORTER_PK=
 SIGNER_PK=
-
+FEED_DATA_STREAM_INTERVAL=10s
 # this address is dummy contract in baobab
 SUBMISSION_PROXY_CONTRACT=0x284E7E442d64108Bd593Ec4b41538dCE5aEdA858
diff --git a/node/pkg/db/redis.go b/node/pkg/db/redis.go
index c7c9e1342..5bef00263 100644
--- a/node/pkg/db/redis.go
+++ b/node/pkg/db/redis.go
@@ -50,7 +50,7 @@ func MSet(ctx context.Context, values map[string]string) error {
 		return err
 	}
 
-	var pairs []interface{}
+	var pairs []any
 	for key, value := range values {
 		pairs = append(pairs, key, value)
 	}
@@ -58,16 +58,47 @@ func MSet(ctx context.Context, values map[string]string) error {
 }
 
 func MSetObject(ctx context.Context, values map[string]any) error {
-	stringMap := make(map[string]string)
+	rdbConn, err := GetRedisConn(ctx)
+	if err != nil {
+		return err
+	}
+
+	var pairs []any
 	for key, value := range values {
 		data, err := json.Marshal(value)
 		if err != nil {
 			log.Error().Err(err).Msg("Error marshalling object")
 			return err
 		}
-		stringMap[key] = string(data)
+		pairs = append(pairs, key, string(data))
 	}
-	return MSet(ctx, stringMap)
+
+	return rdbConn.MSet(ctx, pairs...).Err()
+}
+
+func MSetObjectWithExp(ctx context.Context, values map[string]any, exp time.Duration) error {
+	rdbConn, err := GetRedisConn(ctx)
+	if err != nil {
+		return err
+	}
+
+	var pairs []any
+	for key, value := range values {
+		data, jsonMarshalErr := json.Marshal(value)
+		if jsonMarshalErr != nil {
+			log.Error().Err(jsonMarshalErr).Msg("Error marshalling object")
+			return jsonMarshalErr
+		}
+		pairs = append(pairs, key, string(data))
+	}
+
+	pipe := rdbConn.TxPipeline()
+	pipe.MSet(ctx, pairs...)
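+	// MSET cannot attach TTLs, so expire each key individually inside the same MULTI/EXEC transaction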
+ for key := range values { + pipe.Expire(ctx, key, exp) + } + _, err = pipe.Exec(ctx) + return err } func Set(ctx context.Context, key string, value string, exp time.Duration) error { @@ -160,6 +191,24 @@ func LRange(ctx context.Context, key string, start int64, end int64) ([]string, return rdbConn.LRange(ctx, key, start, end).Result() } +func LRangeObject[T any](ctx context.Context, key string, start int64, end int64) ([]T, error) { + data, err := LRange(ctx, key, start, end) + if err != nil { + log.Error().Err(err).Msg("Error getting range") + return nil, err + } + + results := make([]T, len(data)) + for i, d := range data { + err = json.Unmarshal([]byte(d), &results[i]) + if err != nil { + log.Error().Err(err).Msg("Error unmarshalling object") + return nil, err + } + } + return results, nil +} + func LPush(ctx context.Context, key string, values ...any) error { rdbConn, err := GetRedisConn(ctx) if err != nil { @@ -169,7 +218,7 @@ func LPush(ctx context.Context, key string, values ...any) error { return rdbConn.LPush(ctx, key, values...).Err() } -func LPushObject(ctx context.Context, key string, values []any) error { +func LPushObject[T any](ctx context.Context, key string, values []T) error { stringValues := make([]interface{}, len(values)) for i, v := range values { data, err := json.Marshal(v) @@ -208,15 +257,13 @@ func PopAllObject[T any](ctx context.Context, key string) ([]T, error) { return nil, err } - results := []T{} - for _, d := range data { - var t T - err = json.Unmarshal([]byte(d), &t) + results := make([]T, len(data)) + for i, d := range data { + err = json.Unmarshal([]byte(d), &results[i]) if err != nil { log.Error().Err(err).Msg("Error unmarshalling object") return nil, err } - results = append(results, t) } return results, nil } diff --git a/node/pkg/db/redis_test.go b/node/pkg/db/redis_test.go index 16b4b67e1..a424d852c 100644 --- a/node/pkg/db/redis_test.go +++ b/node/pkg/db/redis_test.go @@ -362,10 +362,10 @@ func TestLPushObject(t *testing.T) { } key := "testKey" - values := []any{ - TestStruct{ID: 1, Name: "Test1"}, - TestStruct{ID: 2, Name: "Test2"}, - TestStruct{ID: 3, Name: "Test3"}, + values := []TestStruct{ + {ID: 1, Name: "Test1"}, + {ID: 2, Name: "Test2"}, + {ID: 3, Name: "Test3"}, } err := LPushObject(ctx, key, values) @@ -443,10 +443,10 @@ func TestPopAllObject(t *testing.T) { } key := "testKey" - values := []any{ - TestStruct{ID: 1, Name: "Test1"}, - TestStruct{ID: 2, Name: "Test2"}, - TestStruct{ID: 3, Name: "Test3"}, + values := []TestStruct{ + {ID: 1, Name: "Test1"}, + {ID: 2, Name: "Test2"}, + {ID: 3, Name: "Test3"}, } // Push objects to the list @@ -480,3 +480,47 @@ func TestPopAllObject(t *testing.T) { t.Errorf("Expected empty list, got %+v", result) } } +func TestMSetObjectWithExp(t *testing.T) { + ctx := context.Background() + + values := map[string]any{ + "key1": "value1", + "key2": "value2", + } + + exp := 1 * time.Second + + err := MSetObjectWithExp(ctx, values, exp) + if err != nil { + t.Errorf("Error setting objects with expiration: %v", err) + } + + // Check if the values were set correctly + for key, value := range values { + gotValue, getValueErr := Get(ctx, key) + if getValueErr != nil { + t.Errorf("Error getting key: %v", getValueErr) + } + expectedValue, marshalErr := json.Marshal(value) + if marshalErr != nil { + t.Errorf("Error marshalling value: %v", marshalErr) + continue + } + if gotValue != string(expectedValue) { + t.Errorf("Value did not match expected. 
Got %v, expected %v", gotValue, string(expectedValue)) + } + } + + time.Sleep(1001 * time.Millisecond) + + // Check if the values were expired + for key := range values { + gotValue, getValueErr := Get(ctx, key) + if getValueErr == nil || !strings.Contains(getValueErr.Error(), "redis: nil") { + t.Errorf("Expected to have err") + } + if gotValue != "" { + t.Errorf("Expected empty value, got %v", gotValue) + } + } +} diff --git a/node/pkg/error/sentinel.go b/node/pkg/error/sentinel.go index 734e89976..827abb2d0 100644 --- a/node/pkg/error/sentinel.go +++ b/node/pkg/error/sentinel.go @@ -135,6 +135,8 @@ var ( ErrFetcherConvertToBigInt = &CustomError{Service: Fetcher, Code: InternalError, Message: "Failed to convert to big.Int"} ErrFetcherInvalidInput = &CustomError{Service: Fetcher, Code: InvalidInputError, Message: "Invalid input"} ErrFetcherDivisionByZero = &CustomError{Service: Fetcher, Code: InternalError, Message: "Division by zero"} + ErrCollectorCancelNotFound = &CustomError{Service: Fetcher, Code: InternalError, Message: "Collector cancel function not found"} + ErrStreamerCancelNotFound = &CustomError{Service: Fetcher, Code: InternalError, Message: "Streamer cancel function not found"} ErrLibP2pEmptyNonLocalAddress = &CustomError{Service: Others, Code: InternalError, Message: "Host has no non-local addresses"} ErrLibP2pAddressSplitFail = &CustomError{Service: Others, Code: InternalError, Message: "Failed to split address"} diff --git a/node/pkg/fetcher/app.go b/node/pkg/fetcher/app.go index fd8420c53..651e6e3ae 100644 --- a/node/pkg/fetcher/app.go +++ b/node/pkg/fetcher/app.go @@ -30,12 +30,7 @@ func (a *App) Run(ctx context.Context) error { a.subscribe(ctx) - err = a.startAllFetchers(ctx) - if err != nil { - return err - } - - return nil + return a.startAll(ctx) } func (a *App) subscribe(ctx context.Context) { @@ -109,7 +104,7 @@ func (a *App) handleMessage(ctx context.Context, msg bus.Message) { msg.Response <- bus.MessageResponse{Success: true} case bus.STOP_FETCHER_APP: log.Debug().Str("Player", "Fetcher").Msg("stopping all fetchers") - err := a.stopAllFetchers(ctx) + err := a.stopAll(ctx) if err != nil { log.Error().Err(err).Str("Player", "Fetcher").Msg("failed to stop all fetchers") bus.HandleMessageError(err, msg, "failed to stop all fetchers") @@ -118,7 +113,7 @@ func (a *App) handleMessage(ctx context.Context, msg bus.Message) { msg.Response <- bus.MessageResponse{Success: true} case bus.START_FETCHER_APP: log.Debug().Str("Player", "Fetcher").Msg("starting all fetchers") - err := a.startAllFetchers(ctx) + err := a.startAll(ctx) if err != nil { log.Error().Err(err).Str("Player", "Fetcher").Msg("failed to start all fetchers") bus.HandleMessageError(err, msg, "failed to start all fetchers") @@ -126,7 +121,7 @@ func (a *App) handleMessage(ctx context.Context, msg bus.Message) { } msg.Response <- bus.MessageResponse{Success: true} case bus.REFRESH_FETCHER_APP: - err := a.stopAllFetchers(ctx) + err := a.stopAll(ctx) if err != nil { log.Error().Err(err).Str("Player", "Fetcher").Msg("failed to stop all fetchers") bus.HandleMessageError(err, msg, "failed to stop all fetchers") @@ -138,7 +133,7 @@ func (a *App) handleMessage(ctx context.Context, msg bus.Message) { bus.HandleMessageError(err, msg, "failed to initialize fetchers") return } - err = a.startAllFetchers(ctx) + err = a.startAll(ctx) if err != nil { log.Error().Err(err).Str("Player", "Fetcher").Msg("failed to start all fetchers") bus.HandleMessageError(err, msg, "failed to start all fetchers") @@ -150,6 +145,34 @@ func (a 
 	}
 }
 
+func (a *App) startAll(ctx context.Context) error {
+	err := a.startAllFetchers(ctx)
+	if err != nil {
+		return err
+	}
+
+	err = a.startAllCollectors(ctx)
+	if err != nil {
+		return err
+	}
+
+	return a.startStreamer(ctx)
+}
+
+func (a *App) stopAll(ctx context.Context) error {
+	err := a.stopAllFetchers(ctx)
+	if err != nil {
+		return err
+	}
+
+	err = a.stopAllCollectors(ctx)
+	if err != nil {
+		return err
+	}
+
+	return a.stopStreamer(ctx)
+}
+
 func (a *App) startFetcher(ctx context.Context, fetcher *Fetcher) error {
 	if fetcher.isRunning {
 		log.Debug().Str("Player", "Fetcher").Str("fetcher", fetcher.Name).Msg("fetcher already running")
@@ -162,6 +185,30 @@ func (a *App) startFetcher(ctx context.Context, fetcher *Fetcher) error {
 	return nil
 }
 
+func (a *App) startCollector(ctx context.Context, collector *Collector) error {
+	if collector.isRunning {
+		log.Debug().Str("Player", "Collector").Str("collector", collector.Name).Msg("collector already running")
+		return nil
+	}
+
+	collector.Run(ctx)
+
+	log.Debug().Str("Player", "Collector").Str("collector", collector.Name).Msg("collector started")
+	return nil
+}
+
+func (a *App) startStreamer(ctx context.Context) error {
+	if a.Streamer.isRunning {
+		log.Debug().Str("Player", "Streamer").Msg("streamer already running")
+		return nil
+	}
+
+	a.Streamer.Run(ctx)
+
+	log.Debug().Str("Player", "Streamer").Msg("streamer started")
+	return nil
+}
+
 func (a *App) startFetcherById(ctx context.Context, configId int32) error {
 	if fetcher, ok := a.Fetchers[configId]; ok {
 		return a.startFetcher(ctx, fetcher)
@@ -183,6 +230,19 @@ func (a *App) startAllFetchers(ctx context.Context) error {
 	return nil
 }
 
+func (a *App) startAllCollectors(ctx context.Context) error {
+	for _, collector := range a.Collectors {
+		err := a.startCollector(ctx, collector)
+		if err != nil {
+			log.Error().Str("Player", "Collector").Err(err).Str("collector", collector.Name).Msg("failed to start collector")
+			return err
+		}
+		// start with a random sleep to avoid all collectors starting at the same time
+		time.Sleep(time.Millisecond * time.Duration(rand.Intn(300)+100))
+	}
+	return nil
+}
+
 func (a *App) stopFetcher(ctx context.Context, fetcher *Fetcher) error {
 	log.Debug().Str("fetcher", fetcher.Name).Msg("stopping fetcher")
 	if !fetcher.isRunning {
@@ -197,6 +257,34 @@ func (a *App) stopFetcher(ctx context.Context, fetcher *Fetcher) error {
 	return nil
 }
 
+func (a *App) stopCollector(ctx context.Context, collector *Collector) error {
+	log.Debug().Str("collector", collector.Name).Msg("stopping collector")
+	if !collector.isRunning {
+		log.Debug().Str("Player", "Collector").Str("collector", collector.Name).Msg("collector already stopped")
+		return nil
+	}
+	if collector.cancel == nil {
+		return errorSentinel.ErrCollectorCancelNotFound
+	}
+	collector.cancel()
+	collector.isRunning = false
+	return nil
+}
+
+func (a *App) stopStreamer(ctx context.Context) error {
+	log.Debug().Msg("stopping streamer")
+	if !a.Streamer.isRunning {
+		log.Debug().Str("Player", "Streamer").Msg("streamer already stopped")
+		return nil
+	}
+	if a.Streamer.cancel == nil {
+		return errorSentinel.ErrStreamerCancelNotFound
+	}
+	a.Streamer.cancel()
+	a.Streamer.isRunning = false
+	return nil
+}
+
 func (a *App) stopFetcherById(ctx context.Context, configId int32) error {
 	if fetcher, ok := a.Fetchers[configId]; ok {
 		return a.stopFetcher(ctx, fetcher)
@@ -215,6 +303,17 @@ func (a *App) stopAllFetchers(ctx context.Context) error {
 	return nil
 }
 
+func (a *App)
stopAllCollectors(ctx context.Context) error { + for _, collector := range a.Collectors { + err := a.stopCollector(ctx, collector) + if err != nil { + log.Error().Str("Player", "Collector").Err(err).Str("collector", collector.Name).Msg("failed to stop collector") + return err + } + } + return nil +} + func (a *App) getConfigs(ctx context.Context) ([]Config, error) { configs, err := db.QueryRows[Config](ctx, SelectConfigsQuery, nil) if err != nil { @@ -246,14 +345,22 @@ func (a *App) initialize(ctx context.Context) error { return err } a.Fetchers = make(map[int32]*Fetcher, len(configs)) + a.Collectors = make(map[int32]*Collector, len(configs)) for _, config := range configs { - feeds, err := a.getFeeds(ctx, config.ID) - if err != nil { - return err + feeds, getFeedsErr := a.getFeeds(ctx, config.ID) + if getFeedsErr != nil { + return getFeedsErr } a.Fetchers[config.ID] = NewFetcher(config, feeds) + a.Collectors[config.ID] = NewCollector(config, feeds) + } + streamIntervalRaw := os.Getenv("FEED_DATA_STREAM_INTERVAL") + streamInterval, err := time.ParseDuration(streamIntervalRaw) + if err != nil { + streamInterval = time.Second * 5 } + a.Streamer = NewStreamer(streamInterval) proxies, getProxyErr := a.getProxies(ctx) if getProxyErr != nil { diff --git a/node/pkg/fetcher/app_test.go b/node/pkg/fetcher/app_test.go index c3c101e48..03ded4120 100644 --- a/node/pkg/fetcher/app_test.go +++ b/node/pkg/fetcher/app_test.go @@ -7,7 +7,6 @@ import ( "testing" "time" - "bisonai.com/orakl/node/pkg/admin/tests" "bisonai.com/orakl/node/pkg/db" "github.com/stretchr/testify/assert" ) @@ -37,10 +36,9 @@ func TestFetcherInitialize(t *testing.T) { for _, adapter := range app.Fetchers { assert.Greater(t, len(adapter.Feeds), 0) } - } -func TestFetcherRun(t *testing.T) { +func TestAppRun(t *testing.T) { ctx := context.Background() clean, testItems, err := setup(ctx) if err != nil { @@ -59,285 +57,66 @@ func TestFetcherRun(t *testing.T) { t.Fatalf("error initializing fetcher: %v", err) } - rowsBefore, err := db.QueryRows[Aggregate](ctx, "SELECT * FROM local_aggregates", nil) - if err != nil { - t.Fatalf("error reading from db: %v", err) - } - assert.Equal(t, 0, len(rowsBefore)) - - feedDataRowsBefore, err := db.QueryRows[FeedDataFromDB](ctx, "SELECT * FROM feed_data", nil) - if err != nil { - t.Fatalf("error reading from db: %v", err) - } - assert.Equal(t, 0, len(rowsBefore)) - err = app.Run(ctx) if err != nil { t.Fatalf("error running fetcher: %v", err) } - for _, fetcher := range app.Fetchers { assert.True(t, fetcher.isRunning) } - - // wait for fetcher to run - time.Sleep(WAIT_SECONDS) - - // stop running after 2 seconds - for _, fetcher := range app.Fetchers { - app.stopFetcher(ctx, fetcher) - assert.False(t, fetcher.isRunning) - } - - rowsAfter, err := db.QueryRows[Aggregate](ctx, "SELECT * FROM local_aggregates", nil) - if err != nil { - t.Fatalf("error reading from db: %v", err) - } - assert.Greater(t, len(rowsAfter), len(rowsBefore)) - feedDataRowsAfter, err := db.QueryRows[FeedDataFromDB](ctx, "SELECT * FROM feed_data", nil) - if err != nil { - t.Fatalf("error reading from db: %v", err) - } - assert.Greater(t, len(feedDataRowsAfter), len(feedDataRowsBefore)) - - for _, fetcher := range app.Fetchers { - configId := fetcher.Config.ID - rdbResult, err := db.Get(ctx, "localAggregate:"+strconv.Itoa(int(configId))) - if err != nil { - t.Fatalf("error reading from redis: %v", err) - } - assert.NotNil(t, rdbResult) - - defer func() { - err = db.Del(ctx, "localAggregate:"+strconv.Itoa(int(configId))) - if err != nil 
{
-				t.Fatalf("error removing from redis: %v", err)
-			}
-		}()
-
-	}
-}
-
-func TestFetcherFetcherStart(t *testing.T) {
-	ctx := context.Background()
-	clean, testItems, err := setup(ctx)
-	if err != nil {
-		t.Fatalf("error setting up test: %v", err)
+	for _, collector := range app.Collectors {
+		assert.True(t, collector.isRunning)
 	}
-	defer func() {
-		if cleanupErr := clean(); cleanupErr != nil {
-			t.Logf("Cleanup failed: %v", cleanupErr)
-		}
-	}()
-
-	app := testItems.app
+	assert.True(t, app.Streamer.isRunning)
 
-	err = app.initialize(ctx)
-	if err != nil {
-		t.Fatalf("error initializing fetcher: %v", err)
-	}
+	time.Sleep(WAIT_SECONDS)
 
-	rowsBefore, err := db.QueryRows[Aggregate](ctx, "SELECT * FROM local_aggregates", nil)
+	err = app.stopAll(ctx)
 	if err != nil {
-		t.Fatalf("error reading from db: %v", err)
-	}
-	feedDataRowsBefore, err := db.QueryRows[FeedDataFromDB](ctx, "SELECT * FROM feed_data", nil)
-	if err != nil {
-		t.Fatalf("error reading from db: %v", err)
-	}
-
-	for _, fetcher := range app.Fetchers {
-		err = app.startFetcher(ctx, fetcher)
-		if err != nil {
-			t.Fatalf("error starting adapter: %v", err)
-		}
-		assert.True(t, fetcher.isRunning)
+		t.Fatalf("error stopping fetcher: %v", err)
 	}
-
-	// wait for fetcher to run
-	time.Sleep(WAIT_SECONDS)
-
-	// stop running after 2 seconds
 	for _, fetcher := range app.Fetchers {
-		app.stopFetcher(ctx, fetcher)
 		assert.False(t, fetcher.isRunning)
 	}
-
-	rowsAfter, err := db.QueryRows[Aggregate](ctx, "SELECT * FROM local_aggregates", nil)
-	if err != nil {
-		t.Fatalf("error reading from db: %v", err)
+	for _, collector := range app.Collectors {
+		assert.False(t, collector.isRunning)
 	}
-	assert.Greater(t, len(rowsAfter), len(rowsBefore))
-	feedDataRowsAfter, err := db.QueryRows[FeedDataFromDB](ctx, "SELECT * FROM feed_data", nil)
-	if err != nil {
-		t.Fatalf("error reading from db: %v", err)
-	}
-	assert.Greater(t, len(feedDataRowsAfter), len(feedDataRowsBefore))
+	assert.False(t, app.Streamer.isRunning)
 
-	// check rdb and cleanup rdb
 	for _, fetcher := range app.Fetchers {
-		configId := fetcher.Config.ID
-		rdbResult, err := db.Get(ctx, "localAggregate:"+strconv.Itoa(int(configId)))
-		if err != nil {
-			t.Fatalf("error reading from redis: %v", err)
-		}
-		assert.NotNil(t, rdbResult)
-		defer func() {
-			err = db.Del(ctx, "localAggregate:"+strconv.Itoa(int(configId)))
-			if err != nil {
-				t.Fatalf("error removing from redis: %v", err)
+		for _, feed := range fetcher.Feeds {
+			result, latestFeedDataErr := db.GetObject[FeedData](ctx, "latestFeedData:"+strconv.Itoa(int(feed.ID)))
+			if latestFeedDataErr != nil {
+				t.Fatalf("error getting latest feed data: %v", latestFeedDataErr)
 			}
-		}()
+			assert.NotNil(t, result)
 		}
-}
-
-func TestFetcherFetcherStop(t *testing.T) {
-	ctx := context.Background()
-	clean, testItems, err := setup(ctx)
-	if err != nil {
-		t.Fatalf("error setting up test: %v", err)
-	}
-	defer func() {
-		if cleanupErr := clean(); cleanupErr != nil {
-			t.Logf("Cleanup failed: %v", cleanupErr)
+		rdbResult, localAggregateErr := db.Get(ctx, "localAggregate:"+strconv.Itoa(int(fetcher.Config.ID)))
+		if localAggregateErr != nil {
+			t.Fatalf("error getting local aggregate: %v", localAggregateErr)
 		}
 		assert.NotNil(t, rdbResult)
-	}()
-
-	app := testItems.app
-
-	err = app.initialize(ctx)
-	if err != nil {
-		t.Fatalf("error initializing fetcher: %v", err)
-	}
-
-	// first start adapters to stop
-	for _, fetcher := range app.Fetchers {
-		err = app.startFetcher(ctx, fetcher)
-		if err != nil {
-			t.Fatalf("error starting adapter: %v", err)
-		}
-		assert.True(t, fetcher.isRunning)
-	}
-
-	// wait for fetcher to run
-	time.Sleep(WAIT_SECONDS)
-
-	// stop adapters
-	for _, fetcher := range app.Fetchers {
-		app.stopFetcher(ctx, fetcher)
-		assert.False(t, fetcher.isRunning)
-	}
-
-
time.Sleep(WAIT_SECONDS / 2) - rowsBefore, err := db.QueryRows[Aggregate](ctx, "SELECT * FROM local_aggregates", nil) - if err != nil { - t.Fatalf("error reading from db: %v", err) - } - feedDataRowsBefore, err := db.QueryRows[FeedDataFromDB](ctx, "SELECT * FROM feed_data", nil) - if err != nil { - t.Fatalf("error reading from db: %v", err) - } - assert.Greater(t, len(rowsBefore), 0) - time.Sleep(WAIT_SECONDS / 2) - - // no rows should be added after stopping - rowsAfter, err := db.QueryRows[Aggregate](ctx, "SELECT * FROM local_aggregates", nil) - if err != nil { - t.Fatalf("error reading from db: %v", err) - } - assert.Equal(t, len(rowsAfter), len(rowsBefore)) - feedDataRowsAfter, err := db.QueryRows[FeedDataFromDB](ctx, "SELECT * FROM feed_data", nil) - if err != nil { - t.Fatalf("error reading from db: %v", err) - } - assert.Equal(t, len(feedDataRowsAfter), len(feedDataRowsBefore)) - - // check rdb and cleanup rdb - for _, fetcher := range app.Fetchers { - configId := fetcher.Config.ID - rdbResult, err := db.Get(ctx, "localAggregate:"+strconv.Itoa(int(configId))) - if err != nil { - t.Fatalf("error reading from redis: %v", err) + rdbResult, localAggregateErr := db.Get(ctx, "localAggregate:"+strconv.Itoa(int(fetcher.Config.ID))) + if localAggregateErr != nil { + t.Fatalf("error getting local aggregate: %v", localAggregateErr) } assert.NotNil(t, rdbResult) - defer func() { - err = db.Del(ctx, "localAggregate:"+strconv.Itoa(int(configId))) - if err != nil { - t.Fatalf("error removing from redis: %v", err) - } - }() } -} - -func TestFetcherFetcherStartById(t *testing.T) { - ctx := context.Background() - clean, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer func() { - if cleanupErr := clean(); cleanupErr != nil { - t.Logf("Cleanup failed: %v", cleanupErr) - } - }() - - app := testItems.app - - err = app.initialize(ctx) + buffer, err := db.LRangeObject[FeedData](ctx, "feedDataBuffer", 0, -1) if err != nil { - t.Fatalf("error initializing fetcher: %v", err) - } - - app.subscribe(ctx) - - for _, fetcher := range app.Fetchers { - _, requestErr := tests.RawPostRequest(testItems.admin, "/api/v1/fetcher/activate/"+strconv.Itoa(int(fetcher.Config.ID)), nil) - if requestErr != nil { - t.Fatalf("error starting adapter: %v", requestErr) - } - } - - for _, fetcher := range app.Fetchers { - assert.True(t, fetcher.isRunning) + t.Fatalf("error getting feed data buffer: %v", err) } + assert.Greater(t, len(buffer), 0) -} - -func TestFetcherFetcherStopById(t *testing.T) { - ctx := context.Background() - clean, testItems, err := setup(ctx) + err = app.Streamer.Job(ctx) if err != nil { - t.Fatalf("error setting up test: %v", err) + t.Fatalf("error running streamer job: %v", err) } - defer func() { - if cleanupErr := clean(); cleanupErr != nil { - t.Logf("Cleanup failed: %v", cleanupErr) - } - }() - app := testItems.app - - err = app.initialize(ctx) + feedResult, err := db.QueryRows[FeedData](ctx, "SELECT * FROM feed_data", nil) if err != nil { - t.Fatalf("error initializing fetcher: %v", err) + t.Fatalf("error querying feed data: %v", err) } + assert.Greater(t, len(feedResult), 0) - err = app.Run(ctx) + localAggregateResult, err := db.QueryRows[Aggregate](ctx, "SELECT * FROM local_aggregates", nil) if err != nil { - t.Fatalf("error running fetcher: %v", err) - } - for _, fetcher := range app.Fetchers { - assert.True(t, fetcher.isRunning) - } - - for _, fetcher := range app.Fetchers { - _, requestErr := tests.RawPostRequest(testItems.admin, 
"/api/v1/fetcher/deactivate/"+strconv.Itoa(int(fetcher.Config.ID)), nil) - if requestErr != nil { - t.Fatalf("error stopping adapter: %v", requestErr) - } - } - - for _, fetcher := range app.Fetchers { - assert.False(t, fetcher.isRunning) + t.Fatalf("error querying local aggregates: %v", err) } + assert.Greater(t, len(localAggregateResult), 0) } diff --git a/node/pkg/fetcher/collector.go b/node/pkg/fetcher/collector.go new file mode 100644 index 000000000..7030eb137 --- /dev/null +++ b/node/pkg/fetcher/collector.go @@ -0,0 +1,80 @@ +package fetcher + +import ( + "context" + "time" + + "bisonai.com/orakl/node/pkg/utils/calculator" + "github.com/rs/zerolog/log" +) + +func NewCollector(config Config, feeds []Feed) *Collector { + return &Collector{ + Config: config, + Feeds: feeds, + collectorCtx: nil, + cancel: nil, + } +} + +func (c *Collector) Run(ctx context.Context) { + collectorCtx, cancel := context.WithCancel(ctx) + c.collectorCtx = collectorCtx + c.cancel = cancel + c.isRunning = true + + collectorFrequency := time.Duration(c.FetchInterval) * time.Millisecond + ticker := time.NewTicker(collectorFrequency) + go func() { + for { + select { + case <-c.collectorCtx.Done(): + ticker.Stop() + return + case <-ticker.C: + err := c.Job(c.collectorCtx) + if err != nil { + log.Error().Str("Player", "Collector").Err(err).Msg("error in collectorJob") + } + } + } + }() +} + +func (c *Collector) Job(ctx context.Context) error { + log.Debug().Str("Player", "Collector").Str("collector", c.Name).Msg("collectorJob") + rawResult, err := c.collect(ctx) + if err != nil { + log.Error().Str("Player", "Collector").Err(err).Msg("error in collect") + return err + } + + aggregated, err := calculator.GetFloatMed(rawResult) + if err != nil { + log.Error().Str("Player", "Collector").Err(err).Msg("error in GetFloatMed") + return err + } + err = insertLocalAggregateRdb(ctx, c.ID, aggregated) + if err != nil { + log.Error().Str("Player", "Collector").Err(err).Msg("error in insertLocalAggregateRdb") + return err + } + return insertLocalAggregatePgsql(ctx, c.ID, aggregated) +} + +func (c *Collector) collect(ctx context.Context) ([]float64, error) { + feedIds := make([]int32, len(c.Feeds)) + for i, feed := range c.Feeds { + feedIds[i] = feed.ID + } + feedData, err := getLatestFeedData(ctx, feedIds) + if err != nil { + log.Error().Str("Player", "Collector").Err(err).Msg("error in getLatestFeedData") + return nil, err + } + result := make([]float64, len(feedData)) + for i, data := range feedData { + result[i] = data.Value + } + return result, nil +} diff --git a/node/pkg/fetcher/fetcher.go b/node/pkg/fetcher/fetcher.go index da8c19fde..7315d1bdf 100644 --- a/node/pkg/fetcher/fetcher.go +++ b/node/pkg/fetcher/fetcher.go @@ -9,7 +9,6 @@ import ( "time" errorSentinel "bisonai.com/orakl/node/pkg/error" - "bisonai.com/orakl/node/pkg/utils/calculator" "bisonai.com/orakl/node/pkg/utils/reducer" "bisonai.com/orakl/node/pkg/utils/request" "github.com/rs/zerolog/log" @@ -30,8 +29,8 @@ func (f *Fetcher) Run(ctx context.Context, chainHelpers map[string]ChainHelper, f.cancel = cancel f.isRunning = true - fetcher_frequency := time.Duration(f.FetchInterval) * time.Millisecond - ticker := time.NewTicker(fetcher_frequency) + fetcherFrequency := time.Duration(f.FetchInterval) * time.Millisecond + ticker := time.NewTicker(fetcherFrequency) go func() { for { select { @@ -39,7 +38,7 @@ func (f *Fetcher) Run(ctx context.Context, chainHelpers map[string]ChainHelper, ticker.Stop() return case <-ticker.C: - err := f.fetchAndInsert(f.fetcherCtx, 
chainHelpers, proxies) + err := f.fetcherJob(f.fetcherCtx, chainHelpers, proxies) if err != nil { log.Error().Str("Player", "Fetcher").Err(err).Msg("error in fetchAndInsert") } @@ -48,47 +47,21 @@ func (f *Fetcher) Run(ctx context.Context, chainHelpers map[string]ChainHelper, }() } -func (f *Fetcher) fetchAndInsert(ctx context.Context, chainHelpers map[string]ChainHelper, proxies []Proxy) error { - log.Debug().Str("Player", "Fetcher").Str("fetcher", f.Name).Msg("fetching and inserting") - results, err := f.fetch(chainHelpers, proxies) +func (f *Fetcher) fetcherJob(ctx context.Context, chainHelpers map[string]ChainHelper, proxies []Proxy) error { + log.Debug().Str("Player", "Fetcher").Str("fetcher", f.Name).Msg("fetcherJob") + result, err := f.fetch(chainHelpers, proxies) if err != nil { log.Error().Str("Player", "Fetcher").Err(err).Msg("error in fetch") return err } - log.Debug().Str("Player", "Fetcher").Str("fetcher", f.Name).Msg("fetch complete") - err = insertFeedData(ctx, results) + err = setLatestFeedData(ctx, result, time.Duration(f.FetchInterval)*time.Millisecond) if err != nil { - log.Error().Str("Player", "Fetcher").Err(err).Msg("error in insertFeedData") + log.Error().Str("Player", "Fetcher").Err(err).Msg("error in setLatestFeedData") return err } - rawValues := make([]float64, len(results)) - for i, result := range results { - rawValues[i] = result.Value - } - - aggregated, err := calculator.GetFloatMed(rawValues) - if err != nil { - log.Error().Str("Player", "Fetcher").Err(err).Msg("error in GetFloatMed") - return err - } - log.Debug().Str("Player", "Fetcher").Str("fetcher", f.Name).Float64("aggregated", aggregated).Msg("aggregated") - - err = insertLocalAggregatePgsql(ctx, f.ID, aggregated) - if err != nil { - log.Error().Str("Player", "Fetcher").Err(err).Msg("error in insertPgsql") - return err - } - log.Debug().Str("Player", "Fetcher").Str("fetcher", f.Name).Msg("inserted into pgsql") - - err = insertLocalAggregateRdb(ctx, f.ID, aggregated) - if err != nil { - log.Error().Str("Player", "Fetcher").Err(err).Msg("error in insertRdb") - return err - } - log.Debug().Str("Player", "Fetcher").Str("fetcher", f.Name).Msg("inserted into rdb") - return nil + return setFeedDataBuffer(ctx, result) } func (f *Fetcher) fetch(chainHelpers map[string]ChainHelper, proxies []Proxy) ([]FeedData, error) { @@ -130,8 +103,8 @@ func (f *Fetcher) fetch(chainHelpers map[string]ChainHelper, proxies []Proxy) ([ default: errChan <- errorSentinel.ErrFetcherInvalidType } - - dataChan <- FeedData{FeedID: feed.ID, Value: resultValue} + now := time.Now() + dataChan <- FeedData{FeedID: feed.ID, Value: resultValue, Timestamp: &now} }(feed) } @@ -231,7 +204,7 @@ func (f *Fetcher) filterProxyByLocation(proxies []Proxy, location string) []Prox filteredProxies := []Proxy{} for _, proxy := range proxies { if proxy.Location != nil && *proxy.Location == location { - filteredProxies = append(proxies, proxy) + filteredProxies = append(filteredProxies, proxy) } } return filteredProxies diff --git a/node/pkg/fetcher/fetcher_test.go b/node/pkg/fetcher/fetcher_test.go index 649226321..c3db84ecd 100644 --- a/node/pkg/fetcher/fetcher_test.go +++ b/node/pkg/fetcher/fetcher_test.go @@ -16,6 +16,92 @@ import ( "github.com/stretchr/testify/assert" ) +func TestFetcherRun(t *testing.T) { + ctx := context.Background() + clean, testItems, err := setup(ctx) + if err != nil { + t.Fatalf("error setting up test: %v", err) + } + defer func() { + if cleanupErr := clean(); cleanupErr != nil { + t.Logf("Cleanup failed: %v", cleanupErr) + } 
+ }() + + app := testItems.app + + err = app.initialize(ctx) + if err != nil { + t.Fatalf("error initializing fetcher: %v", err) + } + + for _, fetcher := range app.Fetchers { + fetcher.Run(ctx, app.ChainHelpers, app.Proxies) + } + + for _, fetcher := range app.Fetchers { + assert.True(t, fetcher.isRunning) + } + + for _, fetcher := range app.Fetchers { + fetcher.cancel() + } + + defer func() { + db.Del(ctx, "feedDataBuffer") + for _, fetcher := range app.Fetchers { + for _, feed := range fetcher.Feeds { + db.Del(ctx, "latestFeedData:"+strconv.Itoa(int(feed.ID))) + } + } + }() +} + +func TestFetcherFetcherJob(t *testing.T) { + ctx := context.Background() + clean, testItems, err := setup(ctx) + if err != nil { + t.Fatalf("error setting up test: %v", err) + } + defer func() { + if cleanupErr := clean(); cleanupErr != nil { + t.Logf("Cleanup failed: %v", cleanupErr) + } + }() + + app := testItems.app + + err = app.initialize(ctx) + if err != nil { + t.Fatalf("error initializing fetcher: %v", err) + } + + for _, fetcher := range app.Fetchers { + jobErr := fetcher.fetcherJob(ctx, app.ChainHelpers, app.Proxies) + if jobErr != nil { + t.Fatalf("error fetching: %v", jobErr) + } + } + defer db.Del(ctx, "feedDataBuffer") + + for _, fetcher := range app.Fetchers { + for _, feed := range fetcher.Feeds { + res, latestFeedDataErr := db.GetObject[FeedData](ctx, "latestFeedData:"+strconv.Itoa(int(feed.ID))) + if latestFeedDataErr != nil { + t.Fatalf("error fetching feed data: %v", latestFeedDataErr) + } + assert.NotNil(t, res) + defer db.Del(ctx, "latestFeedData:"+strconv.Itoa(int(feed.ID))) + } + } + + buffer, err := db.LRangeObject[FeedData](ctx, "feedDataBuffer", 0, -1) + if err != nil { + t.Fatalf("error fetching buffer: %v", err) + } + assert.Greater(t, len(buffer), 0) +} + func TestFetcherFetch(t *testing.T) { ctx := context.Background() clean, testItems, err := setup(ctx) @@ -69,11 +155,20 @@ func TestFetcherFetchProxy(t *testing.T) { log.Fatal().Err(proxyServeErr).Msg("unexpected server shutdown") } }() - proxy, err := tests.PostRequest[Proxy](testItems.admin, "/api/v1/proxy", map[string]any{"protocol": "http", "host": "localhost", "port": 8088}) if err != nil { t.Fatalf("error creating proxy: %v", err) } + defer func() { + _, err = tests.DeleteRequest[Proxy](testItems.admin, "/api/v1/proxy/"+strconv.FormatInt(proxy.ID, 10), nil) + if err != nil { + t.Fatalf("error cleaning up proxy: %v", err) + } + + if err = srv.Shutdown(ctx); err != nil { + log.Fatal().Err(err).Msg("unexpected server shutdown") + } + }() err = app.initialize(ctx) if err != nil { @@ -88,18 +183,49 @@ func TestFetcherFetchProxy(t *testing.T) { assert.Greater(t, len(result), 0) } - _, err = tests.DeleteRequest[Proxy](testItems.admin, "/api/v1/proxy/"+strconv.FormatInt(proxy.ID, 10), nil) +} + +func TestFetcherCex(t *testing.T) { + ctx := context.Background() + clean, testItems, err := setup(ctx) if err != nil { - t.Fatalf("error cleaning up proxy: %v", err) + t.Fatalf("error setting up test: %v", err) } + defer func() { + if cleanupErr := clean(); cleanupErr != nil { + t.Logf("Cleanup failed: %v", cleanupErr) + } + }() + + app := testItems.app - if err = srv.Shutdown(ctx); err != nil { - log.Fatal().Err(err).Msg("unexpected server shutdown") + err = app.initialize(ctx) + if err != nil { + t.Fatalf("error initializing fetcher: %v", err) } + for _, fetcher := range app.Fetchers { + for _, feed := range fetcher.Feeds { + definition := new(Definition) + + err := json.Unmarshal(feed.Definition, &definition) + if err != nil { + 
t.Fatalf("error unmarshalling definition: %v", err) + } + if definition.Type != nil { + continue + } + + result, err := fetcher.cex(definition, app.Proxies) + if err != nil { + t.Fatalf("error fetching: %v", err) + } + assert.Greater(t, result, float64(0)) + } + } } -func TestFetcherFetchAndInsertAdapter(t *testing.T) { +func TestFetcherUniswapV3(t *testing.T) { ctx := context.Background() clean, testItems, err := setup(ctx) if err != nil { @@ -115,101 +241,100 @@ func TestFetcherFetchAndInsertAdapter(t *testing.T) { err = app.initialize(ctx) if err != nil { - log.Fatal().Err(err).Msg("error initializing fetcher") + t.Fatalf("error initializing fetcher: %v", err) } for _, fetcher := range app.Fetchers { - err = fetcher.fetchAndInsert(ctx, app.ChainHelpers, app.Proxies) - assert.NoError(t, err, "fetchAndInsert should not return an error") + for _, feed := range fetcher.Feeds { + definition := new(Definition) + + err := json.Unmarshal(feed.Definition, &definition) + if err != nil { + t.Fatalf("error unmarshalling definition: %v", err) + } + if definition.Type == nil || *definition.Type != "UniswapPool" { + continue + } + + result, err := fetcher.uniswapV3(definition, app.ChainHelpers) + if err != nil { + t.Fatalf("error fetching: %v", err) + } + assert.Greater(t, result, float64(0)) + } + } +} + +func TestRequestFeed(t *testing.T) { + ctx := context.Background() + clean, testItems, err := setup(ctx) + if err != nil { + t.Fatalf("error setting up test: %v", err) } + defer func() { + if cleanupErr := clean(); cleanupErr != nil { + t.Logf("Cleanup failed: %v", cleanupErr) + } + }() + + app := testItems.app + + err = app.initialize(ctx) if err != nil { - t.Fatalf("error running adapter: %v", err) + t.Fatalf("error initializing fetcher: %v", err) } for _, fetcher := range app.Fetchers { - pgResult, err := db.QueryRow[Aggregate](ctx, "SELECT * FROM local_aggregates WHERE config_id = @config_id", map[string]any{"config_id": fetcher.Config.ID}) - if err != nil { - t.Fatalf("error reading from db: %v", err) - } - assert.NotNil(t, pgResult) + for _, feed := range fetcher.Feeds { + definition := new(Definition) + err := json.Unmarshal(feed.Definition, &definition) + if err != nil { + t.Fatalf("error unmarshalling definition: %v", err) + } + if definition.Type != nil { + continue + } - feedIds := make([]interface{}, len(fetcher.Feeds)) - for i, feed := range fetcher.Feeds { - feedIds[i] = feed.ID + result, err := fetcher.requestFeed(definition, app.Proxies) + if err != nil { + t.Fatalf("error fetching: %v", err) + } + assert.NotEqual(t, result, nil) } + } +} - feedPgResult, err := db.BulkSelect[FeedDataFromDB](ctx, "feed_data", []string{"feed_id", "value", "timestamp"}, []string{"feed_id"}, feedIds) - if err != nil { - t.Fatalf("error reading from db: %v", err) - } - assert.Greater(t, len(feedPgResult), 0) +func TestFetcherRequestWithoutProxy(t *testing.T) { + // Being tested in TestFetcherFetch + t.Skip() +} - rdbResult, err := db.Get(ctx, "localAggregate:"+strconv.Itoa(int(fetcher.Config.ID))) - if err != nil { - t.Fatalf("error reading from redis: %v", err) - } - assert.NotNil(t, rdbResult) - var redisAgg RedisAggregate - err = json.Unmarshal([]byte(rdbResult), &redisAgg) - if err != nil { - t.Fatalf("error unmarshalling from redis: %v", err) - } - assert.NotNil(t, redisAgg) - assert.NotNil(t, redisAgg.Value) +func TestFetcherRequestWithProxy(t *testing.T) { + // Being tested in TestFetcherFetchProxy + t.Skip() +} - err = db.Del(ctx, "localAggregate:"+strconv.Itoa(int(fetcher.Config.ID))) - if err != 
nil { - t.Fatalf("error removing from redis: %v", err) - } +func TestFetcherFilterProxyByLocation(t *testing.T) { + uk := "uk" + us := "us" + kr := "kr" + proxies := []Proxy{ + {ID: 1, Protocol: "http", Host: "localhost", Port: 8080, Location: &uk}, + {ID: 2, Protocol: "http", Host: "localhost", Port: 8081, Location: &us}, + {ID: 3, Protocol: "http", Host: "localhost", Port: 8082, Location: &kr}, } -} -func TestFetchSingle(t *testing.T) { - t.Skip() // test fails if data provider refuses connection - ctx := context.Background() - rawDefinition := ` - { - "url": "https://api.bybit.com/derivatives/v3/public/tickers?symbol=ADAUSDT", - "headers": { - "Content-Type": "application/json" - }, - "method": "GET", - "reducers": [ - { - "function": "PARSE", - "args": [ - "result", - "list" - ] - }, - { - "function": "INDEX", - "args": 0 - }, - { - "function": "PARSE", - "args": [ - "lastPrice" - ] - }, - { - "function": "POW10", - "args": 8 - }, - { - "function": "ROUND" - } - ] - }` - definition := new(Definition) - err := json.Unmarshal([]byte(rawDefinition), &definition) - if err != nil { - t.Fatalf("error unmarshalling definition: %v", err) - } - - result, err := FetchSingle(ctx, definition) - if err != nil { - t.Fatalf("error fetching single: %v", err) - } - assert.Greater(t, result, float64(0)) + fetcher := NewFetcher(Config{}, []Feed{}) + + res := fetcher.filterProxyByLocation(proxies, uk) + assert.Greater(t, len(res), 0) + assert.Equal(t, res[0], proxies[0]) + + res = fetcher.filterProxyByLocation(proxies, us) + assert.Greater(t, len(res), 0) + assert.Equal(t, res[0], proxies[1]) + + res = fetcher.filterProxyByLocation(proxies, kr) + assert.Greater(t, len(res), 0) + assert.Equal(t, res[0], proxies[2]) } diff --git a/node/pkg/fetcher/main_test.go b/node/pkg/fetcher/main_test.go index c086b9913..ca0c88a2a 100644 --- a/node/pkg/fetcher/main_test.go +++ b/node/pkg/fetcher/main_test.go @@ -1,12 +1,16 @@ +//nolint:all package fetcher import ( "context" "encoding/json" + "net/http" + "net/http/httptest" "os" "testing" "bisonai.com/orakl/node/pkg/admin/config" + "bisonai.com/orakl/node/pkg/admin/feed" "bisonai.com/orakl/node/pkg/admin/fetcher" "bisonai.com/orakl/node/pkg/admin/proxy" "bisonai.com/orakl/node/pkg/admin/tests" @@ -17,74 +21,153 @@ import ( "github.com/rs/zerolog" ) -var sampleData = []string{`{ - "name": "DAI-USDT", - "feeds": [ - { - "name": "Binance-DAI-USDT", - "definition": { - "url": "https://api.binance.com/api/v3/avgPrice?symbol=DAIUSDT", - "headers": { - "Content-Type": "application/json" - }, - "method": "GET", - "reducers": [ - { - "function": "PARSE", - "args": [ - "price" - ] - }, - { - "function": "POW10", - "args": 8 - }, - { - "function": "ROUND" - } - ] - } +const ( + mockReply0 = `{ + "mins": 5, + "price": "1.01827085", + "closeTime": 1597204784937 + }` + + mockReply1 = `{ + "retCode": 0, + "retMsg": "OK", + "result": { + "category": "", + "list": [ + { + "symbol": "DOGEUSDT", + "bidPrice": "0.15845", + "askPrice": "0.15846", + "lastPrice": "0.15845", + "lastTickDirection": "ZeroMinusTick", + "prevPrice24h": "0.16706", + "price24hPcnt": "-0.051538", + "highPrice24h": "0.17140", + "lowPrice24h": "0.15147", + "prevPrice1h": "0.15671", + "markPrice": "0.15846", + "indexPrice": "0.15839", + "openInterest": "1522687772", + "turnover24h": "773341302.2606", + "volume24h": "4789388852.0000", + "fundingRate": "0.0001", + "nextFundingTime": "1716537600000", + "predictedDeliveryPrice": "", + "basisRate": "", + "deliveryFeeRate": "", + "deliveryTime": "0", + "openInterestValue": 
"241285104.35" + } + ] }, - { - "name": "Crypto-DAI-USDT", - "definition": { - "url": "https://api.crypto.com/v2/public/get-ticker?instrument_name=DAI_USDT", - "headers": { - "Content-Type": "application/json" - }, - "method": "GET", - "reducers": [ - { - "function": "PARSE", - "args": [ - "result", - "data" - ] - }, - { - "function": "INDEX", - "args": 0 - }, - { - "function": "PARSE", - "args": [ - "a" - ] - }, - { - "function": "POW10", - "args": 8 + "retExtInfo": { + + }, + "time": 1716537538486 + }` +) + +type TestItems struct { + admin *fiber.App + messageBus *bus.MessageBus + app *App + insertedConfigs []config.ConfigModel + insertedFeeds []feed.FeedModel + mockDataSource []*httptest.Server +} + +func setup(ctx context.Context) (func() error, *TestItems, error) { + var testItems = new(TestItems) + mb := bus.New(10) + + admin, err := utils.Setup(utils.SetupInfo{ + Version: "", + Bus: mb, + }) + if err != nil { + return nil, nil, err + } + v1 := admin.Group("/api/v1") + config.Routes(v1) + proxy.Routes(v1) + fetcher.Routes(v1) + feed.Routes(v1) + + app := New(mb) + + mockDataSource1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(mockReply0)) + })) + + mockDataSource2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Write([]byte(mockReply1)) + })) + + testItems.mockDataSource = []*httptest.Server{mockDataSource1, mockDataSource2} + testItems.admin = admin + testItems.messageBus = mb + testItems.app = app + + configs, feeds, err := insertSampleData(ctx, testItems) + if err != nil { + return nil, nil, err + } + testItems.insertedConfigs = configs + testItems.insertedFeeds = feeds + + return cleanup(ctx, testItems), testItems, nil +} + +func insertSampleData(ctx context.Context, testItems *TestItems) ([]config.ConfigModel, []feed.FeedModel, error) { + var sampleData = []string{`{ + "name": "DAI-USDT", + "feeds": [ + { + "name": "Binance-DAI-USDT", + "definition": { + "url": "` + testItems.mockDataSource[0].URL + `", + "headers": { + "Content-Type": "application/json" }, - { - "function": "ROUND" - } - ] + "method": "GET", + "reducers": [ + { + "function": "PARSE", + "args": [ + "price" + ] + }, + { + "function": "POW10", + "args": 8 + }, + { + "function": "ROUND" + } + ] + } + }, + { + "name": "UniswapV3-DAI-USDT", + "definition": { + "chainId": "1", + "address": "0x48da0965ab2d2cbf1c17c09cfb5cbe67ad5b1406", + "type": "UniswapPool", + "token0Decimals": 18, + "token1Decimals": 6 + } } - }, + ], + "fetchInterval": 2000, + "aggregateInterval": 5000, + "submitInterval": 15000 + }`, `{ + "name": "DOGE-USDT", + "feeds": [ { - "name": "Coinbase-DAI-USDT", + "name": "Bybit-DOGE-USDT", "definition": { - "url": "https://api.coinbase.com/v2/exchange-rates?currency=DAI", + "url": "` + testItems.mockDataSource[1].URL + `", "headers": { "Content-Type": "application/json" }, @@ -93,30 +176,10 @@ var sampleData = []string{`{ { "function": "PARSE", "args": [ - "data", - "rates", - "USDT" + "result", + "list" ] }, - { - "function": "POW10", - "args": 8 - }, - { - "function": "ROUND" - } - ] - } - }, - { - "name": "Gateio-DAI-USDT", - "definition": { - "url": "https://api.gateio.ws/api/v4/spot/tickers?currency_pair=DAI_USDT", - "headers": { - "Content-Type": "application/json" - }, - "method": "GET", - "reducers": [ { "function": "INDEX", "args": 0 @@ -124,34 +187,7 @@ var sampleData = []string{`{ { "function": "PARSE", "args": [ - "last" - ] - }, - { - "function": "POW10", - "args": 8 - }, - { - "function": 
"ROUND" - } - ] - } - }, - { - "name": "Coinex-DAI-USDT", - "definition": { - "url": "https://api.coinex.com/v1/market/ticker?market=DAIUSDT", - "headers": { - "Content-Type": "application/json" - }, - "method": "GET", - "reducers": [ - { - "function": "PARSE", - "args": [ - "data", - "ticker", - "last" + "lastPrice" ] }, { @@ -163,326 +199,42 @@ var sampleData = []string{`{ } ] } - }, - { - "name": "UniswapV3-DAI-USDT", - "definition": { - "chainId": "1", - "address": "0x48da0965ab2d2cbf1c17c09cfb5cbe67ad5b1406", - "type": "UniswapPool", - "token0Decimals": 18, - "token1Decimals": 6 - } } ], "fetchInterval": 2000, "aggregateInterval": 5000, "submitInterval": 15000 -}`, `{ -"name": "DOGE-USDT", -"feeds": [ - { - "name": "Bybit-DOGE-USDT", - "definition": { - "url": "https://api.bybit.com/derivatives/v3/public/tickers?symbol=DOGEUSDT", - "headers": { - "Content-Type": "application/json" - }, - "method": "GET", - "reducers": [ - { - "function": "PARSE", - "args": [ - "result", - "list" - ] - }, - { - "function": "INDEX", - "args": 0 - }, - { - "function": "PARSE", - "args": [ - "lastPrice" - ] - }, - { - "function": "POW10", - "args": 8 - }, - { - "function": "ROUND" - } - ] - } - }, - { - "name": "Binance-DOGE-USDT", - "definition": { - "url": "https://api.binance.com/api/v3/avgPrice?symbol=DOGEUSDT", - "headers": { - "Content-Type": "application/json" - }, - "method": "GET", - "reducers": [ - { - "function": "PARSE", - "args": [ - "price" - ] - }, - { - "function": "POW10", - "args": 8 - }, - { - "function": "ROUND" - } - ] - } - }, - { - "name": "Kucoin-DOGE-USDT", - "definition": { - "url": "https://api.kucoin.com/api/v1/market/orderbook/level1?symbol=DOGE-USDT", - "headers": { - "Content-Type": "application/json" - }, - "method": "GET", - "reducers": [ - { - "function": "PARSE", - "args": [ - "data", - "price" - ] - }, - { - "function": "POW10", - "args": 8 - }, - { - "function": "ROUND" - } - ] - } - }, - { - "name": "Crypto-DOGE-USDT", - "definition": { - "url": "https://api.crypto.com/v2/public/get-ticker?instrument_name=DOGE_USDT", - "headers": { - "Content-Type": "application/json" - }, - "method": "GET", - "reducers": [ - { - "function": "PARSE", - "args": [ - "result", - "data" - ] - }, - { - "function": "INDEX", - "args": 0 - }, - { - "function": "PARSE", - "args": [ - "a" - ] - }, - { - "function": "POW10", - "args": 8 - }, - { - "function": "ROUND" - } - ] - } - }, - { - "name": "Btse-DOGE-USDT", - "definition": { - "url": "https://api.btse.com/spot/api/v3.2/price?symbol=DOGE-USDT", - "headers": { - "Content-Type": "application/json" - }, - "method": "GET", - "reducers": [ - { - "function": "INDEX", - "args": 0 - }, - { - "function": "PARSE", - "args": [ - "indexPrice" - ] - }, - { - "function": "POW10", - "args": 8 - }, - { - "function": "ROUND" - } - ] - } - }, - { - "name": "Coinbase-DOGE-USDT", - "definition": { - "url": "https://api.coinbase.com/v2/exchange-rates?currency=DOGE", - "headers": { - "Content-Type": "application/json" - }, - "method": "GET", - "reducers": [ - { - "function": "PARSE", - "args": [ - "data", - "rates", - "USDT" - ] - }, - { - "function": "POW10", - "args": 8 - }, - { - "function": "ROUND" - } - ] - } - }, - { - "name": "Gateio-DOGE-USDT", - "definition": { - "url": "https://api.gateio.ws/api/v4/spot/tickers?currency_pair=DOGE_USDT", - "headers": { - "Content-Type": "application/json" - }, - "method": "GET", - "reducers": [ - { - "function": "INDEX", - "args": 0 - }, - { - "function": "PARSE", - "args": [ - "last" - ] - }, - { - "function": 
"POW10", - "args": 8 - }, - { - "function": "ROUND" - } - ] - } - }, - { - "name": "Coinex-DOGE-USDT", - "definition": { - "url": "https://api.coinex.com/v1/market/ticker?market=DOGEUSDT", - "headers": { - "Content-Type": "application/json" - }, - "method": "GET", - "reducers": [ - { - "function": "PARSE", - "args": [ - "data", - "ticker", - "last" - ] - }, - { - "function": "POW10", - "args": 8 - }, - { - "function": "ROUND" - } - ] - } - } -], -"fetchInterval": 2000, -"aggregateInterval": 5000, -"submitInterval": 15000 -}`} - -type TestItems struct { - admin *fiber.App - messageBus *bus.MessageBus - app *App -} + }`} -func setup(ctx context.Context) (func() error, *TestItems, error) { - var testItems = new(TestItems) - mb := bus.New(10) - - admin, err := utils.Setup(utils.SetupInfo{ - Version: "", - Bus: mb, - }) - if err != nil { - return nil, nil, err - } - v1 := admin.Group("/api/v1") - config.Routes(v1) - proxy.Routes(v1) - fetcher.Routes(v1) - - app := New(mb) - - testItems.admin = admin - testItems.messageBus = mb - testItems.app = app - - err = insertSampleData(ctx, admin) - if err != nil { - return nil, nil, err - } - - return cleanup(ctx, admin, app), testItems, nil -} - -func insertSampleData(ctx context.Context, app *fiber.App) error { var insertData = make([]config.ConfigInsertModel, len(sampleData)) var insertResults = make([]config.ConfigModel, len(sampleData)) for i := range insertData { err := json.Unmarshal([]byte(sampleData[i]), &insertData[i]) if err != nil { - return err + return nil, nil, err } } for i := range insertResults { - tmp, err := tests.PostRequest[config.ConfigModel](app, "/api/v1/config", insertData[i]) + tmp, err := tests.PostRequest[config.ConfigModel](testItems.admin, "/api/v1/config", insertData[i]) if err != nil { - return err + return nil, nil, err } insertResults[i] = tmp } - return nil + insertedFeeds, err := tests.GetRequest[[]feed.FeedModel](testItems.admin, "/api/v1/feed", nil) + if err != nil { + return nil, nil, err + } + + return insertResults, insertedFeeds, nil } -func cleanup(ctx context.Context, admin *fiber.App, app *App) func() error { +func cleanup(ctx context.Context, testItems *TestItems) func() error { return func() error { - if err := admin.Shutdown(); err != nil { + if err := testItems.admin.Shutdown(); err != nil { return err } err := db.QueryWithoutResult(ctx, "DELETE FROM configs", nil) @@ -502,10 +254,15 @@ func cleanup(ctx context.Context, admin *fiber.App, app *App) func() error { if err != nil { return err } - err = app.stopAllFetchers(ctx) + err = testItems.app.stopAllFetchers(ctx) if err != nil { return err } + + for _, server := range testItems.mockDataSource { + server.Close() + } + return nil } } diff --git a/node/pkg/fetcher/streamer.go b/node/pkg/fetcher/streamer.go new file mode 100644 index 000000000..203747d03 --- /dev/null +++ b/node/pkg/fetcher/streamer.go @@ -0,0 +1,47 @@ +package fetcher + +import ( + "context" + "time" + + "github.com/rs/zerolog/log" +) + +func NewStreamer(interval time.Duration) *Streamer { + return &Streamer{ + Interval: interval, + } +} + +func (s *Streamer) Run(ctx context.Context) { + streamerCtx, cancel := context.WithCancel(ctx) + s.streamerCtx = streamerCtx + s.cancel = cancel + s.isRunning = true + + ticker := time.NewTicker(s.Interval) + go func() { + for { + select { + case <-s.streamerCtx.Done(): + ticker.Stop() + return + case <-ticker.C: + err := s.Job(s.streamerCtx) + if err != nil { + log.Error().Str("Player", "Streamer").Err(err).Msg("error in streamerJob") + } + } + } + }() 
+} + +func (s *Streamer) Job(ctx context.Context) error { + log.Debug().Str("Player", "Streamer").Msg("streamerJob") + result, err := getFeedDataBuffer(ctx) + if err != nil { + log.Error().Str("Player", "Streamer").Err(err).Msg("error in getFeedDataBuffer") + return err + } + return copyFeedData(ctx, result) +} diff --git a/node/pkg/fetcher/types.go b/node/pkg/fetcher/types.go index ba53b9f5c..2f73568f1 100644 --- a/node/pkg/fetcher/types.go +++ b/node/pkg/fetcher/types.go @@ -19,8 +19,9 @@ const ( ) type FeedData struct { - FeedID int32 `db:"feed_id"` - Value float64 `db:"value"` + FeedID int32 `db:"feed_id"` + Value float64 `db:"value"` + Timestamp *time.Time `db:"timestamp"` } type Config struct { @@ -46,6 +47,23 @@ type Fetcher struct { isRunning bool } +type Collector struct { + Config + Feeds []Feed + + collectorCtx context.Context + cancel context.CancelFunc + isRunning bool +} + +type Streamer struct { + Interval time.Duration + + streamerCtx context.Context + cancel context.CancelFunc + isRunning bool +} + type Feed struct { ID int32 `db:"id"` Name string `db:"name"` @@ -56,6 +74,8 @@ type Feed struct { type App struct { Bus *bus.MessageBus Fetchers map[int32]*Fetcher + Collectors map[int32]*Collector + Streamer *Streamer Proxies []Proxy ChainHelpers map[string]ChainHelper } diff --git a/node/pkg/fetcher/utils.go b/node/pkg/fetcher/utils.go index 30fe541c8..c80d45fa9 100644 --- a/node/pkg/fetcher/utils.go +++ b/node/pkg/fetcher/utils.go @@ -11,7 +11,6 @@ import ( errorSentinel "bisonai.com/orakl/node/pkg/error" "bisonai.com/orakl/node/pkg/utils/reducer" "bisonai.com/orakl/node/pkg/utils/request" - "github.com/rs/zerolog/log" ) func FetchSingle(ctx context.Context, definition *Definition) (float64, error) { @@ -51,17 +50,34 @@ func getTokenPrice(sqrtPriceX96 *big.Int, definition *Definition) (float64, erro return math.Round(result), nil } -func insertFeedData(ctx context.Context, feedData []FeedData) error { - insertRows := make([][]any, 0, len(feedData)) +func setLatestFeedData(ctx context.Context, feedData []FeedData, expiration time.Duration) error { + latestData := make(map[string]any) for _, data := range feedData { - insertRows = append(insertRows, []any{data.FeedID, data.Value}) + latestData["latestFeedData:"+strconv.Itoa(int(data.FeedID))] = data } + return db.MSetObjectWithExp(ctx, latestData, expiration) +} - err := db.BulkInsert(ctx, "feed_data", []string{"feed_id", "value"}, insertRows) +func getLatestFeedData(ctx context.Context, feedIds []int32) ([]FeedData, error) { + keys := make([]string, len(feedIds)) + for i, feedId := range feedIds { + keys[i] = "latestFeedData:" + strconv.Itoa(int(feedId)) + } + feedData, err := db.MGetObject[FeedData](ctx, keys) if err != nil { - log.Error().Str("Player", "Fetcher").Err(err).Msg("failed to insert feed data") + return nil, err } - return err + + return feedData, nil +} + +func setFeedDataBuffer(ctx context.Context, feedData []FeedData) error { + return db.LPushObject(ctx, "feedDataBuffer", feedData) +} + +func getFeedDataBuffer(ctx context.Context) ([]FeedData, error) { + // buffer flushed on pop all + return db.PopAllObject[FeedData](ctx, "feedDataBuffer") } func insertLocalAggregatePgsql(ctx context.Context, configId int32, value float64) error { @@ -74,3 +90,12 @@ func insertLocalAggregateRdb(ctx context.Context, configId int32, value float64) data := RedisAggregate{ConfigId: configId, Value: int64(value), Timestamp: time.Now()} return db.SetObject(ctx, key, data, time.Duration(5*time.Minute)) } + +func copyFeedData(ctx 
context.Context, feedData []FeedData) error { + insertRows := make([][]any, len(feedData)) + for i, data := range feedData { + insertRows[i] = []any{data.FeedID, data.Value, data.Timestamp} + } + _, err := db.BulkCopy(ctx, "feed_data", []string{"feed_id", "value", "timestamp"}, insertRows) + return err +} diff --git a/node/pkg/fetcher/utils_test.go b/node/pkg/fetcher/utils_test.go new file mode 100644 index 000000000..1e8c230c5 --- /dev/null +++ b/node/pkg/fetcher/utils_test.go @@ -0,0 +1,361 @@ +//nolint:all +package fetcher + +import ( + "context" + "encoding/json" + "math/big" + "strconv" + "testing" + "time" + + "net/http" + "net/http/httptest" + + "bisonai.com/orakl/node/pkg/aggregator" + "bisonai.com/orakl/node/pkg/db" + "github.com/stretchr/testify/assert" +) + +func TestFetchSingle(t *testing.T) { + ctx := context.Background() + mockServer := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + rw.Write([]byte(`{ + "retCode": 0, + "retMsg": "OK", + "result": { + "category": "", + "list": [ + { + "symbol": "ADAUSDT", + "bidPrice": "0.4675", + "askPrice": "0.4676", + "lastPrice": "0.4676", + "lastTickDirection": "ZeroPlusTick", + "prevPrice24h": "0.4885", + "price24hPcnt": "-0.042784", + "highPrice24h": "0.4890", + "lowPrice24h": "0.4435", + "prevPrice1h": "0.4675", + "markPrice": "0.4675", + "indexPrice": "0.4674", + "openInterest": "125794431", + "turnover24h": "121910399.6363", + "volume24h": "261804569.0000", + "fundingRate": "0.0001", + "nextFundingTime": "1716537600000", + "predictedDeliveryPrice": "", + "basisRate": "", + "deliveryFeeRate": "", + "deliveryTime": "0", + "openInterestValue": "58808896.49" + } + ] + }, + "retExtInfo": { + + }, + "time": 1716526378174 + }`)) + })) + defer mockServer.Close() + + rawDefinition := ` + { + "url": "` + mockServer.URL + `", + "headers": { + "Content-Type": "application/json" + }, + "method": "GET", + "reducers": [ + { + "function": "PARSE", + "args": [ + "result", + "list" + ] + }, + { + "function": "INDEX", + "args": 0 + }, + { + "function": "PARSE", + "args": [ + "lastPrice" + ] + }, + { + "function": "POW10", + "args": 8 + }, + { + "function": "ROUND" + } + ] + }` + + definition := new(Definition) + err := json.Unmarshal([]byte(rawDefinition), &definition) + if err != nil { + t.Fatalf("error unmarshalling definition: %v", err) + } + + result, err := FetchSingle(ctx, definition) + if err != nil { + t.Fatalf("error fetching single: %v", err) + } + assert.Greater(t, result, float64(0)) +} + +func TestGetTokenPrice(t *testing.T) { + rawDefinition := `{ + "chainId": "1", + "address": "0x9db9e0e53058c89e5b94e29621a205198648425b", + "type": "UniswapPool", + "token0Decimals": 8, + "token1Decimals": 6 + }` + + definition := new(Definition) + err := json.Unmarshal([]byte(rawDefinition), &definition) + if err != nil { + t.Fatalf("error unmarshalling definition: %v", err) + } + + sqrtPriceX96 := new(big.Int) + sqrtPriceX96.SetString("2055909007346292057510600778491", 10) + result, err := getTokenPrice(sqrtPriceX96, definition) + if err != nil { + t.Fatalf("error getting token price: %v", err) + } + + assert.Equal(t, float64(6.733620107923e+12), result) +} + +func TestSetLatestFeedData(t *testing.T) { + ctx := context.Background() + feedData := []FeedData{ + { + FeedID: 1, + Value: 0.1, + }, + { + FeedID: 2, + Value: 0.2, + }, + } + + err := setLatestFeedData(ctx, feedData, 1*time.Second) + if err != nil { + t.Fatalf("error setting latest feed data: %v", err) + } + keys := []string{"latestFeedData:1", 
"latestFeedData:2"} + + defer db.Del(ctx, keys[0]) + defer db.Del(ctx, keys[1]) + + result, err := db.MGetObject[FeedData](ctx, keys) + if err != nil { + t.Fatalf("error getting latest feed data: %v", err) + } + + assert.Equal(t, 2, len(result)) + assert.Contains(t, result, feedData[0]) + assert.Contains(t, result, feedData[1]) +} + +func TestGetLatestFeedData(t *testing.T) { + ctx := context.Background() + feedData := []FeedData{ + { + FeedID: 1, + Value: 0.1, + }, + { + FeedID: 2, + Value: 0.2, + }, + } + + keys := []string{"latestFeedData:1", "latestFeedData:2"} + err := setLatestFeedData(ctx, feedData, 1*time.Second) + if err != nil { + t.Fatalf("error setting latest feed data: %v", err) + } + defer db.Del(ctx, keys[0]) + defer db.Del(ctx, keys[1]) + + result, err := getLatestFeedData(ctx, []int32{1, 2}) + if err != nil { + t.Fatalf("error getting latest feed data: %v", err) + } + + assert.Equal(t, 2, len(result)) + assert.Contains(t, result, feedData[0]) + assert.Contains(t, result, feedData[1]) +} + +func TestSetFeedDataBuffer(t *testing.T) { + ctx := context.Background() + feedData := []FeedData{ + { + FeedID: 1, + Value: 0.1, + }, + { + FeedID: 2, + Value: 0.2, + }, + } + + err := setFeedDataBuffer(ctx, feedData) + if err != nil { + t.Fatalf("error setting feed data buffer: %v", err) + } + + defer db.Del(ctx, "feedDataBuffer") + + result, err := db.LRangeObject[FeedData](ctx, "feedDataBuffer", 0, -1) + if err != nil { + t.Fatalf("error getting feed data buffer: %v", err) + } + + assert.Equal(t, 2, len(result)) + assert.Contains(t, result, feedData[0]) + assert.Contains(t, result, feedData[1]) +} + +func TestGetFeedDataBuffer(t *testing.T) { + ctx := context.Background() + feedData := []FeedData{ + { + FeedID: 1, + Value: 0.1, + }, + { + FeedID: 2, + Value: 0.2, + }, + } + + err := setFeedDataBuffer(ctx, feedData) + if err != nil { + t.Fatalf("error setting feed data buffer: %v", err) + } + + defer db.Del(ctx, "feedDataBuffer") + + result, err := getFeedDataBuffer(ctx) + if err != nil { + t.Fatalf("error getting feed data buffer: %v", err) + } + + assert.Equal(t, 2, len(result)) + assert.Contains(t, result, feedData[0]) + assert.Contains(t, result, feedData[1]) +} + +func TestInsertLocalAggregatePgsql(t *testing.T) { + ctx := context.Background() + clean, testItems, err := setup(ctx) + if err != nil { + t.Fatalf("error setting up test: %v", err) + } + defer func() { + if cleanupErr := clean(); cleanupErr != nil { + t.Logf("Cleanup failed: %v", cleanupErr) + } + }() + + configs := testItems.insertedConfigs + for i, config := range configs { + insertLocalAggregateErr := insertLocalAggregatePgsql(ctx, config.Id, float64(i)+5) + if insertLocalAggregateErr != nil { + t.Fatalf("error inserting local aggregate pgsql: %v", insertLocalAggregateErr) + } + } + + defer db.QueryWithoutResult(ctx, "DELETE FROM local_aggregates", nil) + result, err := db.QueryRows[aggregator.LocalAggregate](ctx, "SELECT * FROM local_aggregates", nil) + if err != nil { + t.Fatalf("error getting local aggregate pgsql: %v", err) + } + + assert.Equal(t, len(configs), len(result)) +} + +func TestInsertLocalAggregateRdb(t *testing.T) { + ctx := context.Background() + clean, testItems, err := setup(ctx) + if err != nil { + t.Fatalf("error setting up test: %v", err) + } + defer func() { + if cleanupErr := clean(); cleanupErr != nil { + t.Logf("Cleanup failed: %v", cleanupErr) + } + }() + + configs := testItems.insertedConfigs + for i, config := range configs { + err := insertLocalAggregateRdb(ctx, config.Id, float64(i)+5) 
+ if err != nil { + t.Fatalf("error inserting local aggregate rdb: %v", err) + } + defer db.Del(ctx, "localAggregate:"+strconv.Itoa(int(config.Id))) + } + + for _, config := range configs { + key := "localAggregate:" + strconv.Itoa(int(config.Id)) + result, err := db.GetObject[aggregator.LocalAggregate](ctx, key) + if err != nil { + t.Fatalf("error getting local aggregate rdb: %v", err) + } + assert.Equal(t, int32(config.Id), result.ConfigId) + } +} + +func TestCopyFeedData(t *testing.T) { + ctx := context.Background() + clean, testItems, err := setup(ctx) + if err != nil { + t.Fatalf("error setting up test: %v", err) + } + defer func() { + if cleanupErr := clean(); cleanupErr != nil { + t.Logf("Cleanup failed: %v", cleanupErr) + } + }() + + feeds := testItems.insertedFeeds + feedData := []FeedData{} + + for i, feed := range feeds { + now := time.Now().Round(time.Second) + feedData = append(feedData, FeedData{ + FeedID: int32(*feed.Id), + Value: float64(i) + 5, + Timestamp: &now, + }) + } + + err = setFeedDataBuffer(ctx, feedData) + if err != nil { + t.Fatalf("error setting feed data buffer: %v", err) + } + + defer db.Del(ctx, "feedDataBuffer") + + err = copyFeedData(ctx, feedData) + if err != nil { + t.Fatalf("error copying feed data: %v", err) + } + + defer db.QueryWithoutResult(ctx, "DELETE FROM feed_data", nil) + result, err := db.QueryRows[FeedData](ctx, "SELECT * FROM feed_data", nil) + if err != nil { + t.Fatalf("error getting feed data: %v", err) + } + assert.Contains(t, result, feedData[0]) +}
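
The helpers in utils.go split feed persistence into two Redis paths: a per-feed latestFeedData:<feed_id> snapshot that expires, and an append-only feedDataBuffer list that is later drained in bulk. A minimal producer-side sketch of that split (collectAndBuffer and its 10-second TTL are illustrative assumptions, not part of this patch):

package fetcher

import (
	"context"
	"time"
)

// collectAndBuffer (hypothetical) shows the intended write path:
// every observation updates the short-lived per-feed snapshot and
// is also queued for the Streamer's periodic bulk copy.
func collectAndBuffer(ctx context.Context, observed []FeedData) error {
	// Snapshot: one latestFeedData:<feed_id> key per feed, with a TTL,
	// so a stale feed disappears instead of serving an old value.
	if err := setLatestFeedData(ctx, observed, 10*time.Second); err != nil {
		return err
	}
	// Buffer: LPush onto feedDataBuffer; PopAll in the Streamer both
	// reads and flushes it, so nothing is copied twice.
	return setFeedDataBuffer(ctx, observed)
}

Readers then call getLatestFeedData(ctx, feedIds); expired keys are simply absent from Redis, so only feeds whose snapshot is still live should come back (assuming db.MGetObject skips missing keys).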
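
On the consumer side, Streamer.Job pops the whole buffer and bulk-copies it into feed_data. A plausible driver loop at the Streamer's configured Interval (runStreamer is an assumption for illustration; this patch only shows Job itself):

package fetcher

import (
	"context"
	"time"
)

// runStreamer (hypothetical) drives Streamer.Job at s.Interval.
// Job already logs its own failures, so a bad tick is skipped
// rather than tearing the loop down.
func runStreamer(ctx context.Context, s *Streamer) {
	ticker := time.NewTicker(s.Interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := s.Job(ctx); err != nil {
				continue // retry on the next tick
			}
		}
	}
}

One trade-off visible in Job: getFeedDataBuffer uses PopAllObject, which empties the buffer before the copy runs, so a failure in BulkCopy loses that batch.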