Skip to content

Commit

Permalink
feat: support all providers
Browse files Browse the repository at this point in the history
  • Loading branch information
nick-bisonai committed Oct 31, 2024
1 parent ab40030 commit 35f2025
Show file tree
Hide file tree
Showing 29 changed files with 328 additions and 209 deletions.
4 changes: 4 additions & 0 deletions node/pkg/common/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ func (m *LatestFeedDataMap) SetLatestFeedData(feedData []*FeedData) error {
m.Mu.Lock()
defer m.Mu.Unlock()
for _, data := range feedData {
if data == nil {
continue
}

prev, ok := m.FeedDataMap[data.FeedID]
if ok && prev.Timestamp.After(*data.Timestamp) {
continue
Expand Down
17 changes: 10 additions & 7 deletions node/pkg/websocketfetcher/providers/bithumb/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const dateLayout = "20060102"
const timeLayout = "150405"

// currently not referenced since Transaction api does not support volume data
func TransactionResponseToFeedDataList(data TransactionResponse, feedMap map[string]int32) ([]*common.FeedData, error) {
func TransactionResponseToFeedDataList(data TransactionResponse, feedMap map[string][]int32) ([]*common.FeedData, error) {
feedData := []*common.FeedData{}
loc, err := time.LoadLocation("Asia/Seoul")
if err != nil {
Expand All @@ -41,18 +41,21 @@ func TransactionResponseToFeedDataList(data TransactionResponse, feedMap map[str
splitted := strings.Split(transaction.Symbol, "_")
symbol := splitted[0] + "-" + splitted[1]

id, exists := feedMap[symbol]
ids, exists := feedMap[symbol]
if !exists {
log.Warn().Str("Player", "bithumb").Str("symbol", symbol).Msg("feed not found")
continue
}

feedData = append(feedData, &common.FeedData{
FeedID: id,
Value: price,
Timestamp: &timestamp,
})
for _, id := range ids {
feedData = append(feedData, &common.FeedData{
FeedID: id,
Value: price,
Timestamp: &timestamp,
})
}
}

return feedData, nil
}

Expand Down
6 changes: 4 additions & 2 deletions node/pkg/websocketfetcher/providers/coinbase/coinbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,14 @@ func (c *CoinbaseFetcher) handleMessage(ctx context.Context, message map[string]
return nil
}

feedData, err := TickerToFeedData(ticker, c.FeedMap)
feedDataList, err := TickerToFeedData(ticker, c.FeedMap)
if err != nil {
return err
}

c.FeedDataBuffer <- feedData
for _, feedData := range feedDataList {
c.FeedDataBuffer <- feedData
}

return nil
}
Expand Down
32 changes: 17 additions & 15 deletions node/pkg/websocketfetcher/providers/coinbase/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,35 +9,37 @@ import (
"github.com/rs/zerolog/log"
)

func TickerToFeedData(ticker Ticker, feedMap map[string]int32) (*common.FeedData, error) {
feedData := new(common.FeedData)
func TickerToFeedData(ticker Ticker, feedMap map[string][]int32) ([]*common.FeedData, error) {
ids, exists := feedMap[strings.ToUpper(ticker.ProductID)]
if !exists {
return nil, fmt.Errorf("feed not found")
}

timestamp, err := time.Parse(time.RFC3339Nano, ticker.Time)
if err != nil {
log.Warn().Err(err).Msg("error in parsing time")
timestamp = time.Now()
}

id, exists := feedMap[strings.ToUpper(ticker.ProductID)]
if !exists {
return feedData, fmt.Errorf("feed not found")
}

value, err := common.PriceStringToFloat64(ticker.Price)
if err != nil {
return feedData, err
return nil, err
}

volume, err := common.VolumeStringToFloat64(ticker.Volume24h)
if err != nil {
return feedData, err
return nil, err
}

feedData.FeedID = id
feedData.Value = value
feedData.Timestamp = &timestamp
feedData.Volume = volume

return feedData, nil
result := []*common.FeedData{}
for _, id := range ids {
feedData := new(common.FeedData)
feedData.FeedID = id
feedData.Value = value
feedData.Timestamp = &timestamp
feedData.Volume = volume
result = append(result, feedData)
}

return result, nil
}
17 changes: 10 additions & 7 deletions node/pkg/websocketfetcher/providers/coinex/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@ import (
"github.com/rs/zerolog/log"
)

func ResponseToFeedDataList(data Response, feedMap map[string]int32) ([]*common.FeedData, error) {
func ResponseToFeedDataList(data Response, feedMap map[string][]int32) ([]*common.FeedData, error) {
feedDataList := []*common.FeedData{}

for _, item := range data.Params {
for key, value := range item {
feedData := new(common.FeedData)
id, exists := feedMap[key]
if !exists {
log.Warn().Str("Player", "Coinex").Str("key", key).Msg("feed not found")
Expand All @@ -29,11 +28,15 @@ func ResponseToFeedDataList(data Response, feedMap map[string]int32) ([]*common.
continue
}
timestamp := time.Now()
feedData.FeedID = id
feedData.Value = price
feedData.Timestamp = &timestamp
feedData.Volume = volume
feedDataList = append(feedDataList, feedData)

for _, id := range id {
feedData := new(common.FeedData)
feedData.FeedID = id
feedData.Value = price
feedData.Timestamp = &timestamp
feedData.Volume = volume
feedDataList = append(feedDataList, feedData)
}
}
}

Expand Down
7 changes: 5 additions & 2 deletions node/pkg/websocketfetcher/providers/coinone/coinone.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,16 @@ func (c *CoinoneFetcher) handleMessage(ctx context.Context, message map[string]a
if raw.ResponseType != "DATA" {
return nil
}
feedData, err := DataToFeedData(raw.Data, c.FeedMap)
feedDataList, err := DataToFeedData(raw.Data, c.FeedMap)
if err != nil {
log.Error().Str("Player", "Coinone").Err(err).Msg("error in DataToFeedData")
return err
}

c.FeedDataBuffer <- feedData
for _, feedData := range feedDataList {
c.FeedDataBuffer <- feedData
}

return nil
}

Expand Down
29 changes: 17 additions & 12 deletions node/pkg/websocketfetcher/providers/coinone/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,32 @@ import (
"bisonai.com/miko/node/pkg/websocketfetcher/common"
)

func DataToFeedData(data Data, feedMap map[string]int32) (*common.FeedData, error) {
feedData := new(common.FeedData)
func DataToFeedData(data Data, feedMap map[string][]int32) ([]*common.FeedData, error) {
ids, exists := feedMap[strings.ToUpper(data.TargetCurrency)+"-"+strings.ToUpper(data.QuoteCurrency)]
if !exists {
return nil, fmt.Errorf("feed not found")
}

timestamp := time.UnixMilli(data.Timestamp)
value, err := common.PriceStringToFloat64(data.Last)
if err != nil {
return feedData, err
return nil, err
}

volume, err := common.VolumeStringToFloat64(data.TargetVolume)
if err != nil {
return feedData, err
return nil, err
}

id, exists := feedMap[strings.ToUpper(data.TargetCurrency)+"-"+strings.ToUpper(data.QuoteCurrency)]
if !exists {
return feedData, fmt.Errorf("feed not found")
result := []*common.FeedData{}
for _, id := range ids {
feedData := new(common.FeedData)
feedData.FeedID = id
feedData.Value = value
feedData.Timestamp = &timestamp
feedData.Volume = volume
result = append(result, feedData)
}
feedData.FeedID = id
feedData.Value = value
feedData.Timestamp = &timestamp
feedData.Volume = volume
return feedData, nil

return result, nil
}
18 changes: 10 additions & 8 deletions node/pkg/websocketfetcher/providers/crypto/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/rs/zerolog/log"
)

func ResponseToFeedDataList(data Response, feedMap map[string]int32) ([]*common.FeedData, error) {
func ResponseToFeedDataList(data Response, feedMap map[string][]int32) ([]*common.FeedData, error) {
feedData := []*common.FeedData{}

for _, tick := range data.Result.Data {
Expand Down Expand Up @@ -36,18 +36,20 @@ func ResponseToFeedDataList(data Response, feedMap map[string]int32) ([]*common.

base := rawSymbol[0]
quote := rawSymbol[1]
id, exists := feedMap[base+"-"+quote]
ids, exists := feedMap[base+"-"+quote]
if !exists {
log.Warn().Str("Player", "cryptodotcom").Str("symbol", base+"-"+quote).Msg("feed not found")
continue
}

feedData = append(feedData, &common.FeedData{
FeedID: id,
Value: value,
Timestamp: &timestamp,
Volume: volume,
})
for _, id := range ids {
feedData = append(feedData, &common.FeedData{
FeedID: id,
Value: value,
Timestamp: &timestamp,
Volume: volume,
})
}
}

return feedData, nil
Expand Down
8 changes: 6 additions & 2 deletions node/pkg/websocketfetcher/providers/gateio/gateio.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,16 @@ func (f *GateioFetcher) handleMessage(ctx context.Context, message map[string]an
return nil
}

feedData, err := ResponseToFeedData(response, f.FeedMap)
feedDataList, err := ResponseToFeedData(response, f.FeedMap)
if err != nil {
log.Error().Str("Player", "Gateio").Err(err).Msg("error in ResponseToFeedData")
return err
}
f.FeedDataBuffer <- feedData

for _, feedData := range feedDataList {
f.FeedDataBuffer <- feedData
}

return nil
}

Expand Down
28 changes: 17 additions & 11 deletions node/pkg/websocketfetcher/providers/gateio/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,35 @@ import (
"bisonai.com/miko/node/pkg/websocketfetcher/common"
)

func ResponseToFeedData(data Response, feedMap map[string]int32) (*common.FeedData, error) {
feedData := new(common.FeedData)
func ResponseToFeedData(data Response, feedMap map[string][]int32) ([]*common.FeedData, error) {

timestamp := time.Unix(data.Time, 0)
price, err := common.PriceStringToFloat64(data.Result.Last)
if err != nil {
return feedData, err
return nil, err
}

volume, err := common.VolumeStringToFloat64(data.Result.BaseVolume)
if err != nil {
return feedData, err
return nil, err
}

key := strings.Replace(data.Result.CurrencyPair, "_", "-", 1)
id, exists := feedMap[key]
ids, exists := feedMap[key]
if !exists {
return feedData, fmt.Errorf("feed not found")
return nil, fmt.Errorf("feed not found")
}

feedData.FeedID = id
feedData.Value = price
feedData.Timestamp = &timestamp
feedData.Volume = volume
return feedData, nil
result := []*common.FeedData{}
for _, id := range ids {
feedData := new(common.FeedData)
feedData.FeedID = id
feedData.Value = price
feedData.Timestamp = &timestamp
feedData.Volume = volume

result = append(result, feedData)
}

return result, nil
}
41 changes: 23 additions & 18 deletions node/pkg/websocketfetcher/providers/gemini/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import (
"github.com/rs/zerolog/log"
)

func TradeResponseToFeedDataList(data Response, feedMap map[string]int32, volumeCacheMap *common.VolumeCacheMap) ([]*common.FeedData, error) {
func TradeResponseToFeedDataList(data Response, feedMap map[string][]int32, volumeCacheMap *common.VolumeCacheMap) ([]*common.FeedData, error) {
feedDataList := []*common.FeedData{}

timestamp := time.UnixMilli(*data.TimestampMs)
for _, event := range data.Events {
feedData := new(common.FeedData)
id, exists := feedMap[event.Symbol]

ids, exists := feedMap[event.Symbol]
if !exists {
log.Warn().Str("Player", "Gemini").Str("key", event.Symbol).Msg("feed not found")
continue
Expand All @@ -26,24 +26,27 @@ func TradeResponseToFeedDataList(data Response, feedMap map[string]int32, volume
log.Warn().Str("Player", "Gemini").Err(err).Msg("error in PriceStringToFloat64")
continue
}
feedData.FeedID = id
feedData.Value = price
feedData.Timestamp = &timestamp
volumeData, exists := volumeCacheMap.Map[id]
if !exists || volumeData.UpdatedAt.Before(time.Now().Add(-common.VolumeCacheLifespan)) {
feedData.Volume = 0
} else {
feedData.Volume = volumeData.Volume
}

feedDataList = append(feedDataList, feedData)
for _, id := range ids {
feedData := new(common.FeedData)
feedData.FeedID = id
feedData.Value = price
feedData.Timestamp = &timestamp
volumeData, exists := volumeCacheMap.Map[id]
if !exists || volumeData.UpdatedAt.Before(time.Now().Add(-common.VolumeCacheLifespan)) {
feedData.Volume = 0
} else {
feedData.Volume = volumeData.Volume
}
feedDataList = append(feedDataList, feedData)
}
}

return feedDataList, nil
}

func FetchVolumes(feedMap map[string]int32, volumeCacheMap *common.VolumeCacheMap) {
for symbol, id := range feedMap {
func FetchVolumes(feedMap map[string][]int32, volumeCacheMap *common.VolumeCacheMap) {
for symbol, ids := range feedMap {
endpoint := TICKER_ENDPOINT + strings.ToLower(symbol)
result, err := request.Request[HttpTickerResponse](request.WithEndpoint(endpoint), request.WithTimeout(common.VolumeFetchTimeout))
if err != nil {
Expand Down Expand Up @@ -72,9 +75,11 @@ func FetchVolumes(feedMap map[string]int32, volumeCacheMap *common.VolumeCacheMa
}

volumeCacheMap.Mutex.Lock()
volumeCacheMap.Map[id] = common.VolumeCache{
UpdatedAt: timestamp,
Volume: volume,
for _, id := range ids {
volumeCacheMap.Map[id] = common.VolumeCache{
UpdatedAt: timestamp,
Volume: volume,
}
}
volumeCacheMap.Mutex.Unlock()
}
Expand Down
Loading

0 comments on commit 35f2025

Please sign in to comment.