diff --git a/node/pkg/admin/fetcher/controller.go b/node/pkg/admin/fetcher/controller.go index 35b0c6594..da29a9839 100644 --- a/node/pkg/admin/fetcher/controller.go +++ b/node/pkg/admin/fetcher/controller.go @@ -52,39 +52,3 @@ func refresh(c *fiber.Ctx) error { } return c.SendString("fetcher refreshed: " + strconv.FormatBool(resp.Success)) } - -func activate(c *fiber.Ctx) error { - id := c.Params("id") - - msg, err := utils.SendMessage(c, bus.FETCHER, bus.ACTIVATE_FETCHER, map[string]any{"id": id}) - if err != nil { - log.Error().Err(err).Str("Player", "Admin").Msg("failed to send message to fetcher") - return c.Status(fiber.StatusInternalServerError).SendString("failed to send activate message to fetcher: " + err.Error()) - } - - resp := <-msg.Response - if !resp.Success { - log.Error().Str("Player", "Admin").Msg("failed to activate fetcher") - return c.Status(fiber.StatusInternalServerError).SendString("failed to activate adapter: " + resp.Args["error"].(string)) - } - - return c.JSON(resp) -} - -func deactivate(c *fiber.Ctx) error { - id := c.Params("id") - - msg, err := utils.SendMessage(c, bus.FETCHER, bus.DEACTIVATE_FETCHER, map[string]any{"id": id}) - if err != nil { - log.Error().Err(err).Str("Player", "Admin").Msg("failed to send message to fetcher") - return c.Status(fiber.StatusInternalServerError).SendString("failed to send deactivate message to fetcher: " + err.Error()) - } - - resp := <-msg.Response - if !resp.Success { - log.Error().Str("Player", "Admin").Msg("failed to deactivate fetcher") - return c.Status(fiber.StatusInternalServerError).SendString("failed to deactivate adapter: " + resp.Args["error"].(string)) - } - - return c.JSON(resp) -} diff --git a/node/pkg/admin/fetcher/router.go b/node/pkg/admin/fetcher/router.go index 1256e9d55..15a3ab0b6 100644 --- a/node/pkg/admin/fetcher/router.go +++ b/node/pkg/admin/fetcher/router.go @@ -10,6 +10,4 @@ func Routes(router fiber.Router) { fetcher.Post("/start", start) fetcher.Post("/stop", stop) fetcher.Post("/refresh", refresh) - fetcher.Post("/activate/:id", activate) - fetcher.Post("/deactivate/:id", deactivate) } diff --git a/node/pkg/admin/tests/fetcher_test.go b/node/pkg/admin/tests/fetcher_test.go index 6048b1751..6e9e99fde 100644 --- a/node/pkg/admin/tests/fetcher_test.go +++ b/node/pkg/admin/tests/fetcher_test.go @@ -3,7 +3,6 @@ package tests import ( "context" - "strconv" "testing" "bisonai.com/miko/node/pkg/bus" @@ -66,38 +65,3 @@ func TestFetcherRefresh(t *testing.T) { assert.Equal(t, string(result), "fetcher refreshed: true") } - -func TestFetcherDeactivate(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer cleanup() - - channel := testItems.mb.Subscribe(bus.FETCHER) - waitForMessage(t, channel, bus.ADMIN, bus.FETCHER, bus.DEACTIVATE_FETCHER) - - _, err = RawPostRequest(testItems.app, "/api/v1/fetcher/deactivate/"+strconv.Itoa(int(testItems.tmpData.config.ID)), nil) - if err != nil { - t.Fatalf("error deactivating adapter: %v", err) - } -} - -func TestAdapterActivate(t *testing.T) { - ctx := context.Background() - cleanup, testItems, err := setup(ctx) - if err != nil { - t.Fatalf("error setting up test: %v", err) - } - defer cleanup() - - channel := testItems.mb.Subscribe(bus.FETCHER) - waitForMessage(t, channel, bus.ADMIN, bus.FETCHER, bus.ACTIVATE_FETCHER) - - // activate - _, err = RawPostRequest(testItems.app, "/api/v1/fetcher/activate/"+strconv.Itoa(int(testItems.tmpData.config.ID)), nil) - if err != nil { - t.Fatalf("error activating adapter: %v", err) - } -} diff --git a/node/pkg/fetcher/app.go b/node/pkg/fetcher/app.go index 2f8eb15f7..40649e7de 100644 --- a/node/pkg/fetcher/app.go +++ b/node/pkg/fetcher/app.go @@ -74,40 +74,6 @@ func (a *App) handleMessage(ctx context.Context, msg bus.Message) { } switch msg.Content.Command { - case bus.ACTIVATE_FETCHER: - log.Debug().Str("Player", "Fetcher").Msg("activate fetcher msg received") - configId, err := bus.ParseInt32MsgParam(msg, "id") - if err != nil { - log.Error().Err(err).Str("Player", "Fetcher").Msg("failed to parse configId") - bus.HandleMessageError(err, msg, "failed to parse configId") - return - } - - log.Debug().Str("Player", "Fetcher").Int32("configId", configId).Msg("activating fetcher") - err = a.startFetcherById(ctx, configId) - if err != nil { - log.Error().Err(err).Str("Player", "Fetcher").Msg("failed to start fetcher") - bus.HandleMessageError(err, msg, "failed to start fetcher") - return - } - msg.Response <- bus.MessageResponse{Success: true} - case bus.DEACTIVATE_FETCHER: - log.Debug().Str("Player", "Fetcher").Msg("deactivate fetcher msg received") - configId, err := bus.ParseInt32MsgParam(msg, "id") - if err != nil { - log.Error().Err(err).Str("Player", "Fetcher").Msg("failed to parse configId") - bus.HandleMessageError(err, msg, "failed to parse configId") - return - } - - log.Debug().Str("Player", "Fetcher").Int32("configId", configId).Msg("deactivating fetcher") - err = a.stopFetcherById(ctx, configId) - if err != nil { - log.Error().Err(err).Str("Player", "Fetcher").Msg("failed to stop fetcher") - bus.HandleMessageError(err, msg, "failed to stop fetcher") - return - } - msg.Response <- bus.MessageResponse{Success: true} case bus.STOP_FETCHER_APP: log.Debug().Str("Player", "Fetcher").Msg("stopping all fetchers") err := a.stopAll(ctx) @@ -170,6 +136,8 @@ func (a *App) startAll(ctx context.Context) error { } func (a *App) stopAll(ctx context.Context) error { + a.WebsocketFetcher.Stop() + err := a.stopAllFetchers(ctx) if err != nil { return err @@ -234,14 +202,6 @@ func (a *App) startLocalAggregateBulkWriter(ctx context.Context) { log.Debug().Str("Player", "Fetcher").Msg("LocalAggregateBulkWriter started") } -func (a *App) startFetcherById(ctx context.Context, configId int32) error { - if fetcher, ok := a.Fetchers[configId]; ok { - return a.startFetcher(ctx, fetcher) - } - log.Error().Str("Player", "Fetcher").Int32("adapterId", configId).Msg("fetcher not found") - return errorSentinel.ErrFetcherNotFound -} - func (a *App) startAllFetchers(ctx context.Context) error { for _, fetcher := range a.Fetchers { err := a.startFetcher(ctx, fetcher) @@ -324,13 +284,6 @@ func (a *App) stopLocalAggregateBulkWriter() error { return nil } -func (a *App) stopFetcherById(ctx context.Context, configId int32) error { - if fetcher, ok := a.Fetchers[configId]; ok { - return a.stopFetcher(ctx, fetcher) - } - return errorSentinel.ErrFetcherNotFound -} - func (a *App) stopAllFetchers(ctx context.Context) error { for _, fetcher := range a.Fetchers { err := a.stopFetcher(ctx, fetcher) diff --git a/node/pkg/websocketfetcher/app.go b/node/pkg/websocketfetcher/app.go index f2ed15819..531329b1c 100644 --- a/node/pkg/websocketfetcher/app.go +++ b/node/pkg/websocketfetcher/app.go @@ -106,6 +106,7 @@ type App struct { chainReader *websocketchainreader.ChainReader latestFeedDataMap *types.LatestFeedDataMap feedDataDumpChannel chan *common.FeedData + cancel context.CancelFunc } func New() *App { @@ -250,22 +251,31 @@ func (a *App) initializeDex(ctx context.Context, appConfig AppConfig) error { } func (a *App) Start(ctx context.Context) { + ctxWithCancel, cancel := context.WithCancel(ctx) + a.cancel = cancel + for _, fetcher := range a.fetchers { - go fetcher.Run(ctx) + go fetcher.Run(ctxWithCancel) } ticker := time.NewTicker(a.storeInterval) for { select { - case <-ctx.Done(): + case <-ctxWithCancel.Done(): ticker.Stop() return case <-ticker.C: - go a.storeFeedData(ctx) + go a.storeFeedData(ctxWithCancel) } } } +func (a *App) Stop() { + if a.cancel != nil { + a.cancel() + } +} + func (a *App) storeFeedData(ctx context.Context) { select { case <-ctx.Done():