Skip to content

Commit

Permalink
feat: update restartFetcher
Browse files Browse the repository at this point in the history
  • Loading branch information
nick-bisonai committed Aug 27, 2024
1 parent 3f0cbbb commit c2d0fcb
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 126 deletions.
36 changes: 0 additions & 36 deletions node/pkg/admin/fetcher/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,39 +52,3 @@ func refresh(c *fiber.Ctx) error {
}
return c.SendString("fetcher refreshed: " + strconv.FormatBool(resp.Success))
}

func activate(c *fiber.Ctx) error {
id := c.Params("id")

msg, err := utils.SendMessage(c, bus.FETCHER, bus.ACTIVATE_FETCHER, map[string]any{"id": id})
if err != nil {
log.Error().Err(err).Str("Player", "Admin").Msg("failed to send message to fetcher")
return c.Status(fiber.StatusInternalServerError).SendString("failed to send activate message to fetcher: " + err.Error())
}

resp := <-msg.Response
if !resp.Success {
log.Error().Str("Player", "Admin").Msg("failed to activate fetcher")
return c.Status(fiber.StatusInternalServerError).SendString("failed to activate adapter: " + resp.Args["error"].(string))
}

return c.JSON(resp)
}

func deactivate(c *fiber.Ctx) error {
id := c.Params("id")

msg, err := utils.SendMessage(c, bus.FETCHER, bus.DEACTIVATE_FETCHER, map[string]any{"id": id})
if err != nil {
log.Error().Err(err).Str("Player", "Admin").Msg("failed to send message to fetcher")
return c.Status(fiber.StatusInternalServerError).SendString("failed to send deactivate message to fetcher: " + err.Error())
}

resp := <-msg.Response
if !resp.Success {
log.Error().Str("Player", "Admin").Msg("failed to deactivate fetcher")
return c.Status(fiber.StatusInternalServerError).SendString("failed to deactivate adapter: " + resp.Args["error"].(string))
}

return c.JSON(resp)
}
2 changes: 0 additions & 2 deletions node/pkg/admin/fetcher/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,4 @@ func Routes(router fiber.Router) {
fetcher.Post("/start", start)
fetcher.Post("/stop", stop)
fetcher.Post("/refresh", refresh)
fetcher.Post("/activate/:id", activate)
fetcher.Post("/deactivate/:id", deactivate)
}
36 changes: 0 additions & 36 deletions node/pkg/admin/tests/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package tests

import (
"context"
"strconv"
"testing"

"bisonai.com/miko/node/pkg/bus"
Expand Down Expand Up @@ -66,38 +65,3 @@ func TestFetcherRefresh(t *testing.T) {

assert.Equal(t, string(result), "fetcher refreshed: true")
}

func TestFetcherDeactivate(t *testing.T) {
ctx := context.Background()
cleanup, testItems, err := setup(ctx)
if err != nil {
t.Fatalf("error setting up test: %v", err)
}
defer cleanup()

channel := testItems.mb.Subscribe(bus.FETCHER)
waitForMessage(t, channel, bus.ADMIN, bus.FETCHER, bus.DEACTIVATE_FETCHER)

_, err = RawPostRequest(testItems.app, "/api/v1/fetcher/deactivate/"+strconv.Itoa(int(testItems.tmpData.config.ID)), nil)
if err != nil {
t.Fatalf("error deactivating adapter: %v", err)
}
}

func TestAdapterActivate(t *testing.T) {
ctx := context.Background()
cleanup, testItems, err := setup(ctx)
if err != nil {
t.Fatalf("error setting up test: %v", err)
}
defer cleanup()

channel := testItems.mb.Subscribe(bus.FETCHER)
waitForMessage(t, channel, bus.ADMIN, bus.FETCHER, bus.ACTIVATE_FETCHER)

// activate
_, err = RawPostRequest(testItems.app, "/api/v1/fetcher/activate/"+strconv.Itoa(int(testItems.tmpData.config.ID)), nil)
if err != nil {
t.Fatalf("error activating adapter: %v", err)
}
}
51 changes: 2 additions & 49 deletions node/pkg/fetcher/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,40 +74,6 @@ func (a *App) handleMessage(ctx context.Context, msg bus.Message) {
}

switch msg.Content.Command {
case bus.ACTIVATE_FETCHER:
log.Debug().Str("Player", "Fetcher").Msg("activate fetcher msg received")
configId, err := bus.ParseInt32MsgParam(msg, "id")
if err != nil {
log.Error().Err(err).Str("Player", "Fetcher").Msg("failed to parse configId")
bus.HandleMessageError(err, msg, "failed to parse configId")
return
}

log.Debug().Str("Player", "Fetcher").Int32("configId", configId).Msg("activating fetcher")
err = a.startFetcherById(ctx, configId)
if err != nil {
log.Error().Err(err).Str("Player", "Fetcher").Msg("failed to start fetcher")
bus.HandleMessageError(err, msg, "failed to start fetcher")
return
}
msg.Response <- bus.MessageResponse{Success: true}
case bus.DEACTIVATE_FETCHER:
log.Debug().Str("Player", "Fetcher").Msg("deactivate fetcher msg received")
configId, err := bus.ParseInt32MsgParam(msg, "id")
if err != nil {
log.Error().Err(err).Str("Player", "Fetcher").Msg("failed to parse configId")
bus.HandleMessageError(err, msg, "failed to parse configId")
return
}

log.Debug().Str("Player", "Fetcher").Int32("configId", configId).Msg("deactivating fetcher")
err = a.stopFetcherById(ctx, configId)
if err != nil {
log.Error().Err(err).Str("Player", "Fetcher").Msg("failed to stop fetcher")
bus.HandleMessageError(err, msg, "failed to stop fetcher")
return
}
msg.Response <- bus.MessageResponse{Success: true}
case bus.STOP_FETCHER_APP:
log.Debug().Str("Player", "Fetcher").Msg("stopping all fetchers")
err := a.stopAll(ctx)
Expand Down Expand Up @@ -170,6 +136,8 @@ func (a *App) startAll(ctx context.Context) error {
}

func (a *App) stopAll(ctx context.Context) error {
a.WebsocketFetcher.Stop()

err := a.stopAllFetchers(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -234,14 +202,6 @@ func (a *App) startLocalAggregateBulkWriter(ctx context.Context) {
log.Debug().Str("Player", "Fetcher").Msg("LocalAggregateBulkWriter started")
}

func (a *App) startFetcherById(ctx context.Context, configId int32) error {
if fetcher, ok := a.Fetchers[configId]; ok {
return a.startFetcher(ctx, fetcher)
}
log.Error().Str("Player", "Fetcher").Int32("adapterId", configId).Msg("fetcher not found")
return errorSentinel.ErrFetcherNotFound
}

func (a *App) startAllFetchers(ctx context.Context) error {
for _, fetcher := range a.Fetchers {
err := a.startFetcher(ctx, fetcher)
Expand Down Expand Up @@ -324,13 +284,6 @@ func (a *App) stopLocalAggregateBulkWriter() error {
return nil
}

func (a *App) stopFetcherById(ctx context.Context, configId int32) error {
if fetcher, ok := a.Fetchers[configId]; ok {
return a.stopFetcher(ctx, fetcher)
}
return errorSentinel.ErrFetcherNotFound
}

func (a *App) stopAllFetchers(ctx context.Context) error {
for _, fetcher := range a.Fetchers {
err := a.stopFetcher(ctx, fetcher)
Expand Down
16 changes: 13 additions & 3 deletions node/pkg/websocketfetcher/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ type App struct {
chainReader *websocketchainreader.ChainReader
latestFeedDataMap *types.LatestFeedDataMap
feedDataDumpChannel chan *common.FeedData
cancel context.CancelFunc
}

func New() *App {
Expand Down Expand Up @@ -250,22 +251,31 @@ func (a *App) initializeDex(ctx context.Context, appConfig AppConfig) error {
}

func (a *App) Start(ctx context.Context) {
ctxWithCancel, cancel := context.WithCancel(ctx)
a.cancel = cancel

for _, fetcher := range a.fetchers {
go fetcher.Run(ctx)
go fetcher.Run(ctxWithCancel)
}

ticker := time.NewTicker(a.storeInterval)
for {
select {
case <-ctx.Done():
case <-ctxWithCancel.Done():
ticker.Stop()
return
case <-ticker.C:
go a.storeFeedData(ctx)
go a.storeFeedData(ctxWithCancel)
}
}
}

func (a *App) Stop() {
if a.cancel != nil {
a.cancel()
}
}

func (a *App) storeFeedData(ctx context.Context) {
select {
case <-ctx.Done():
Expand Down

0 comments on commit c2d0fcb

Please sign in to comment.