Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
nick-bisonai committed Oct 30, 2024
1 parent 9283456 commit ab40030
Show file tree
Hide file tree
Showing 15 changed files with 170 additions and 115 deletions.
6 changes: 3 additions & 3 deletions node/pkg/websocketfetcher/common/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ type DexFetcherConfig struct {
}

type FeedMaps struct {
Combined map[string]int32
Separated map[string]int32
Combined map[string][]int32
Separated map[string][]int32
}

type FetcherOption func(*FetcherConfig)
Expand Down Expand Up @@ -106,7 +106,7 @@ func WithDexFeedDataBuffer(feedDataBuffer chan *FeedData) DexFetcherOption {
}

type Fetcher struct {
FeedMap map[string]int32
FeedMap map[string][]int32
Ws *wss.WebsocketHelper
FeedDataBuffer chan *FeedData
VolumeCacheMap VolumeCacheMap
Expand Down
11 changes: 7 additions & 4 deletions node/pkg/websocketfetcher/common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,15 @@ func GetWssFeedMap(feeds []Feed) map[string]FeedMaps {

if _, exists := feedMaps[provider]; !exists {
feedMaps[provider] = FeedMaps{
Combined: make(map[string]int32),
Separated: make(map[string]int32),
Combined: make(map[string][]int32),
Separated: make(map[string][]int32),
}
feedMaps[provider].Combined[combinedName] = []int32{}
feedMaps[provider].Separated[separatedName] = []int32{}

}
feedMaps[provider].Combined[combinedName] = feed.ID
feedMaps[provider].Separated[separatedName] = feed.ID
feedMaps[provider].Combined[combinedName] = append(feedMaps[provider].Combined[combinedName], feed.ID)
feedMaps[provider].Separated[separatedName] = append(feedMaps[provider].Separated[separatedName], feed.ID)
}
return feedMaps
}
Expand Down
7 changes: 5 additions & 2 deletions node/pkg/websocketfetcher/providers/binance/binance.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,16 @@ func (b *BinanceFetcher) handleMessage(ctx context.Context, message map[string]a
return nil
}

feedData, err := TickerToFeedData(ticker, b.FeedMap)
feedDataList, err := TickerToFeedData(ticker, b.FeedMap)
if err != nil {
log.Error().Str("Player", "Binance").Err(err).Msg("error in MiniTickerToFeedData")
return err
}

b.FeedDataBuffer <- feedData
for _, feedData := range feedDataList {
b.FeedDataBuffer <- feedData
}

return nil
}

Expand Down
28 changes: 17 additions & 11 deletions node/pkg/websocketfetcher/providers/binance/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,33 @@ import (
"bisonai.com/miko/node/pkg/websocketfetcher/common"
)

func TickerToFeedData(miniTicker MiniTicker, feedMap map[string]int32) (*common.FeedData, error) {
feedData := new(common.FeedData)
func TickerToFeedData(miniTicker MiniTicker, feedMap map[string][]int32) ([]*common.FeedData, error) {

timestamp := time.UnixMilli(miniTicker.EventTime)
value, err := common.PriceStringToFloat64(miniTicker.Price)
if err != nil {
return feedData, err
return nil, err
}

volume, err := common.VolumeStringToFloat64(miniTicker.Volume)
if err != nil {
return feedData, err
return nil, err
}

id, exists := feedMap[miniTicker.Symbol]
ids, exists := feedMap[miniTicker.Symbol]
if !exists {
return feedData, fmt.Errorf("feed not found")
return nil, fmt.Errorf("feed not found")
}

result := []*common.FeedData{}
for _, id := range ids {
feedData := new(common.FeedData)
feedData.FeedID = id
feedData.Value = value
feedData.Timestamp = &timestamp
feedData.Volume = volume
result = append(result, feedData)
}
feedData.FeedID = id
feedData.Value = value
feedData.Timestamp = &timestamp
feedData.Volume = volume

return feedData, nil
return result, nil
}
6 changes: 4 additions & 2 deletions node/pkg/websocketfetcher/providers/bingx/bingx.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,15 @@ func (f *BingxFetcher) handleMessage(ctx context.Context, message map[string]any
log.Error().Str("Player", "Bingx").Err(err).Msg("error in bingx.handleMessage, failed to parse response")
return err
}
feedData, err := ResponseToFeedData(raw, f.FeedMap)
feedDataList, err := ResponseToFeedData(raw, f.FeedMap)
if err != nil {
log.Error().Str("Player", "Bingx").Err(err).Msg("error in bingx.handleMessage, failed to convert response to feed data")
return err
}

f.FeedDataBuffer <- feedData
for _, feedData := range feedDataList {
f.FeedDataBuffer <- feedData
}
}
return nil

Expand Down
23 changes: 14 additions & 9 deletions node/pkg/websocketfetcher/providers/bingx/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,26 @@ import (
"bisonai.com/miko/node/pkg/websocketfetcher/common"
)

func ResponseToFeedData(response Response, feedMap map[string]int32) (*common.FeedData, error) {
feedData := new(common.FeedData)
func ResponseToFeedData(response Response, feedMap map[string][]int32) ([]*common.FeedData, error) {

symbol := response.Data.Symbol
id, exists := feedMap[symbol]
ids, exists := feedMap[symbol]
if !exists {
return feedData, fmt.Errorf("feed not found for %s", symbol)
return nil, fmt.Errorf("feed not found for %s", symbol)
}
timestamp := time.UnixMilli(response.Data.EventTime)
value := common.FormatFloat64Price(response.Data.Price)
volume := response.Data.Volume

feedData.FeedID = id
feedData.Value = value
feedData.Timestamp = &timestamp
feedData.Volume = volume
return feedData, nil
result := []*common.FeedData{}
for _, id := range ids {
feedData := new(common.FeedData)
feedData.FeedID = id
feedData.Value = value
feedData.Timestamp = &timestamp
feedData.Volume = volume
result = append(result, feedData)
}

return result, nil
}
19 changes: 11 additions & 8 deletions node/pkg/websocketfetcher/providers/bitget/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import (
"github.com/rs/zerolog/log"
)

func ResponseToFeedDataList(data Response, feedMap map[string]int32) []*common.FeedData {
func ResponseToFeedDataList(data Response, feedMap map[string][]int32) []*common.FeedData {
feedDataList := []*common.FeedData{}
for _, tick := range data.Data {
id, exists := feedMap[tick.InstId]
ids, exists := feedMap[tick.InstId]
if !exists {
log.Error().Str("instId", tick.InstId).Msg("feed not found")
continue
Expand All @@ -32,13 +32,16 @@ func ResponseToFeedDataList(data Response, feedMap map[string]int32) []*common.F
continue
}
timestamp := time.UnixMilli(timestampRaw)
feedData := &common.FeedData{
FeedID: id,
Value: value,
Timestamp: &timestamp,
Volume: volume,

for _, id := range ids {
feedData := &common.FeedData{
FeedID: id,
Value: value,
Timestamp: &timestamp,
Volume: volume,
}
feedDataList = append(feedDataList, feedData)
}
feedDataList = append(feedDataList, feedData)
}
return feedDataList
}
6 changes: 4 additions & 2 deletions node/pkg/websocketfetcher/providers/bithumb/bithumb.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,15 @@ func (f *BithumbFetcher) handleMessage(ctx context.Context, message map[string]a
return err
}

feedData, err := TickerResponseToFeedData(tickerResponse, f.FeedMap)
feedDataList, err := TickerResponseToFeedData(tickerResponse, f.FeedMap)
if err != nil {
log.Error().Str("Player", "Bithumb").Err(err).Msg("error in bithumb.handleMessage")
return err
}

f.FeedDataBuffer <- feedData
for _, feedData := range feedDataList {
f.FeedDataBuffer <- feedData
}

return nil
}
Expand Down
21 changes: 13 additions & 8 deletions node/pkg/websocketfetcher/providers/bithumb/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TransactionResponseToFeedDataList(data TransactionResponse, feedMap map[str
return feedData, nil
}

func TickerResponseToFeedData(data TickerResponse, feedMap map[string]int32) (*common.FeedData, error) {
func TickerResponseToFeedData(data TickerResponse, feedMap map[string][]int32) ([]*common.FeedData, error) {
loc, _ := time.LoadLocation("Asia/Seoul")

date, err := time.ParseInLocation(dateLayout, data.Content.Date, loc)
Expand Down Expand Up @@ -88,17 +88,22 @@ func TickerResponseToFeedData(data TickerResponse, feedMap map[string]int32) (*c
splitted := strings.Split(data.Content.Symbol, "_")
symbol := splitted[0] + "-" + splitted[1]

id, exists := feedMap[symbol]
ids, exists := feedMap[symbol]
if !exists {
log.Warn().Str("Player", "bithumb").Str("symbol", symbol).Msg("feed not found")
return nil, nil
}

return &common.FeedData{
FeedID: id,
Value: price,
Timestamp: &timestamp,
Volume: volume,
}, nil
result := []*common.FeedData{}
for _, id := range ids {
result = append(result, &common.FeedData{
FeedID: id,
Value: price,
Timestamp: &timestamp,
Volume: volume,
})
}

return result, nil

}
17 changes: 10 additions & 7 deletions node/pkg/websocketfetcher/providers/bitmart/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ import (
"github.com/rs/zerolog/log"
)

func ResponseToFeedData(response Response, feedMap map[string]int32) []*common.FeedData {
func ResponseToFeedData(response Response, feedMap map[string][]int32) []*common.FeedData {
feedDataList := []*common.FeedData{}
for _, data := range response.Data {
symbol := strings.ReplaceAll(data.Symbol, "_", "-")
id, exists := feedMap[symbol]
ids, exists := feedMap[symbol]
if !exists {
log.Warn().Str("Player", "Bitmart").Str("key", symbol).Msg("feed not found")
continue
Expand All @@ -29,11 +29,14 @@ func ResponseToFeedData(response Response, feedMap map[string]int32) []*common.F
continue
}
timestamp := time.UnixMilli(data.Time)
feedData.FeedID = id
feedData.Value = value
feedData.Timestamp = &timestamp
feedData.Volume = volume
feedDataList = append(feedDataList, feedData)

for _, id := range ids {
feedData.FeedID = id
feedData.Value = value
feedData.Timestamp = &timestamp
feedData.Volume = volume
feedDataList = append(feedDataList, feedData)
}
}
return feedDataList
}
6 changes: 4 additions & 2 deletions node/pkg/websocketfetcher/providers/bitstamp/bitstamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,15 @@ func (f *BitstampFetcher) handleMessage(ctx context.Context, message map[string]
return nil
}

feedData, err := TradeEventToFeedData(response, f.FeedMap, &f.VolumeCacheMap)
feedDataList, err := TradeEventToFeedData(response, f.FeedMap, &f.VolumeCacheMap)
if err != nil {
log.Error().Str("Player", "Bitstamp").Err(err).Msg("error in TradeEventToFeedData")
return err
}

f.FeedDataBuffer <- feedData
for _, feedData := range feedDataList {
f.FeedDataBuffer <- feedData
}

return nil
}
Expand Down
53 changes: 31 additions & 22 deletions node/pkg/websocketfetcher/providers/bitstamp/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,39 +11,46 @@ import (
"github.com/rs/zerolog/log"
)

func TradeEventToFeedData(data TradeEvent, feedMap map[string]int32, volumeCacheMap *common.VolumeCacheMap) (*common.FeedData, error) {
feedData := new(common.FeedData)
func TradeEventToFeedData(data TradeEvent, feedMap map[string][]int32, volumeCacheMap *common.VolumeCacheMap) ([]*common.FeedData, error) {

rawTimestamp, err := strconv.ParseInt(data.Data.Microtimestamp, 10, 64)
if err != nil {
return feedData, err
return nil, err
}

timestamp := time.Unix(0, rawTimestamp*int64(time.Microsecond))
value := common.FormatFloat64Price(data.Data.Price)
splitted := strings.Split(data.Channel, "_")
if len(splitted) < 3 {
return feedData, fmt.Errorf("invalid feed name")
return nil, fmt.Errorf("invalid feed name")
}
rawSymbol := splitted[2]
id, exists := feedMap[strings.ToUpper(rawSymbol)]
ids, exists := feedMap[strings.ToUpper(rawSymbol)]
if !exists {
return feedData, fmt.Errorf("feed not found")
return nil, fmt.Errorf("feed not found")
}
feedData.FeedID = id
feedData.Value = value
feedData.Timestamp = &timestamp

volumeData, exists := volumeCacheMap.Map[id]
if !exists || volumeData.UpdatedAt.Before(time.Now().Add(-common.VolumeCacheLifespan)) {
feedData.Volume = 0
} else {
feedData.Volume = volumeData.Volume
result := []*common.FeedData{}

for _, id := range ids {
feedData := new(common.FeedData)
feedData.FeedID = id
feedData.Value = value
feedData.Timestamp = &timestamp

volumeData, exists := volumeCacheMap.Map[id]
if !exists || volumeData.UpdatedAt.Before(time.Now().Add(-common.VolumeCacheLifespan)) {
feedData.Volume = 0
} else {
feedData.Volume = volumeData.Volume
}
result = append(result, feedData)
}

return feedData, nil
return result, nil
}

func FetchVolumes(feedMap map[string]int32, volumeCacheMap *common.VolumeCacheMap) error {
func FetchVolumes(feedMap map[string][]int32, volumeCacheMap *common.VolumeCacheMap) error {
result, err := request.Request[[]VolumeEntry](request.WithEndpoint(ALL_CURRENCY_PAIR_TICKER_ENDPOINT), request.WithTimeout(common.VolumeFetchTimeout))
if err != nil {
log.Error().Str("Player", "Bitstamp").Err(err).Msg("error in FetchVolumes")
Expand All @@ -53,7 +60,7 @@ func FetchVolumes(feedMap map[string]int32, volumeCacheMap *common.VolumeCacheMa
for i := range result {
entry := &result[i]
symbol := strings.ReplaceAll(entry.Pair, "/", "")
id, exists := feedMap[symbol]
ids, exists := feedMap[symbol]
if !exists {
continue
}
Expand All @@ -64,12 +71,14 @@ func FetchVolumes(feedMap map[string]int32, volumeCacheMap *common.VolumeCacheMa
continue
}

volumeCacheMap.Mutex.Lock()
volumeCacheMap.Map[id] = common.VolumeCache{
UpdatedAt: time.Now(),
Volume: volume,
for _, id := range ids {
volumeCacheMap.Mutex.Lock()
volumeCacheMap.Map[id] = common.VolumeCache{
UpdatedAt: time.Now(),
Volume: volume,
}
volumeCacheMap.Mutex.Unlock()
}
volumeCacheMap.Mutex.Unlock()
}

return nil
Expand Down
Loading

0 comments on commit ab40030

Please sign in to comment.