From 822b2cf92be2cb6f5de72589216004342cca3919 Mon Sep 17 00:00:00 2001 From: nick Date: Fri, 23 Aug 2024 14:45:16 +0900 Subject: [PATCH] feat: implement proxy for websocket fetchers --- node/pkg/websocketfetcher/app.go | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/node/pkg/websocketfetcher/app.go b/node/pkg/websocketfetcher/app.go index f2ed15819..321a6464e 100644 --- a/node/pkg/websocketfetcher/app.go +++ b/node/pkg/websocketfetcher/app.go @@ -3,6 +3,7 @@ package websocketfetcher import ( "context" "errors" + "fmt" "os" "sync" "time" @@ -106,6 +107,7 @@ type App struct { chainReader *websocketchainreader.ChainReader latestFeedDataMap *types.LatestFeedDataMap feedDataDumpChannel chan *common.FeedData + proxies []types.Proxy } func New() *App { @@ -113,7 +115,12 @@ func New() *App { } func (a *App) Init(ctx context.Context, opts ...AppOption) error { - // TODO: Proxy support + proxies, err := db.QueryRows[types.Proxy](ctx, "SELECT * FROM proxies", nil) + if err != nil { + return err + } + a.proxies = proxies + cexFactories := map[string]func(context.Context, ...common.FetcherOption) (common.FetcherInterface, error){ "binance": binance.New, "coinbase": coinbase.New, @@ -195,7 +202,14 @@ func (a *App) initializeCex(ctx context.Context, appConfig AppConfig) error { a.buffer = make(chan *common.FeedData, appConfig.BufferSize) a.storeInterval = appConfig.StoreInterval + index := 0 for name, factory := range appConfig.CexFactories { + proxyUrl := "" + if len(a.proxies) != 0 { + proxy := a.proxies[index%len(a.proxies)] + proxyUrl = fmt.Sprintf("%s://%s:%d", proxy.Protocol, proxy.Host, proxy.Port) + } + if _, ok := feedMap[name]; !ok { log.Warn().Msgf("no feeds for %s", name) continue @@ -204,12 +218,15 @@ func (a *App) initializeCex(ctx context.Context, appConfig AppConfig) error { ctx, common.WithFeedDataBuffer(a.buffer), common.WithFeedMaps(feedMap[name]), + common.WithProxy(proxyUrl), ) if err != nil { log.Error().Err(err).Msgf("error in creating %s fetcher", name) return err } a.fetchers = append(a.fetchers, fetcher) + + index++ } return nil }