Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
nick-bisonai committed May 28, 2024
1 parent 7676923 commit a761f3a
Show file tree
Hide file tree
Showing 15 changed files with 708 additions and 0 deletions.
2 changes: 2 additions & 0 deletions node/pkg/fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ func (f *Fetcher) fetch(chainHelpers map[string]ChainHelper, proxies []Proxy) ([
errChan <- fetchErr
return
}
case *definition.Type == "websocket":
return
default:
errChan <- errorSentinel.ErrFetcherInvalidType
}
Expand Down
1 change: 1 addition & 0 deletions node/pkg/wfetcher/app.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package wfetcher
73 changes: 73 additions & 0 deletions node/pkg/wfetcher/binance/binance.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package binance

import (
"context"
"fmt"
"strings"
"time"

"bisonai.com/orakl/node/pkg/wfetcher"
"bisonai.com/orakl/node/pkg/wss"
"github.com/rs/zerolog/log"
)

type BinanceFetcher wfetcher.Fetcher

// expected to recieve feedmap with key having format "<base><quote>"
func New(ctx context.Context, opts ...wfetcher.FetcherOption) (*BinanceFetcher, error) {
config := &wfetcher.FetcherConfig{}
for _, opt := range opts {
opt(config)
}

fetcher := &BinanceFetcher{}
if len(config.FeedMap) == 0 {
log.Error().Str("Player", "Binance").Msg("no feed map")
return nil, fmt.Errorf("no feed map")
}
fetcher.FeedMap = config.FeedMap

streams := []Stream{}
for feed := range config.FeedMap {
streams = append(streams, Stream(strings.ToLower(feed)+"@miniTicker"))
}
subscription := Subscription{"SUBSCRIBE", streams, 1}

ws, err := wss.NewWebsocketHelper(ctx,
wss.WithEndpoint(URL),
wss.WithSubscriptions([]any{subscription}),
wss.WithProxyUrl(config.Proxy))
if err != nil {
log.Error().Str("Player", "Binance").Err(err).Msg("error in NewWebsocketHelper")
return nil, err
}
fetcher.Ws = ws

return fetcher, nil
}

func (b *BinanceFetcher) handleMessage(ctx context.Context, message map[string]any) error {
ticker, err := MessageToTicker(message)
if err != nil {
log.Error().Str("Player", "Binance").Err(err).Msg("error in MessageToTicker")
return err
}

feedData, err := TickerToFeedData(ticker, b.FeedMap)
if err != nil {
log.Error().Str("Player", "Binance").Err(err).Msg("error in MiniTickerToFeedData")
return err
}

err = wfetcher.StoreFeed(ctx, feedData, 2*time.Second)
if err != nil {
log.Error().Str("Player", "Binance").Err(err).Msg("error in StoreFeed")
return err
}

return nil
}

func (b *BinanceFetcher) Run(ctx context.Context) {
b.Ws.Run(ctx, b.handleMessage)
}
25 changes: 25 additions & 0 deletions node/pkg/wfetcher/binance/type.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package binance

const (
URL = "wss://stream.binance.com:443/ws"
)

type Stream string

type Subscription struct {
Method string `json:"method"`
Params []Stream `json:"params"`
Id uint32 `json:"id"`
}

type MiniTicker struct {
EventType string `json:"e"`
EventTime int64 `json:"E"`
Symbol string `json:"s"`
Price string `json:"c"`
OpenPrice string `json:"o"`
HighPrice string `json:"h"`
LowPrice string `json:"l"`
Volume string `json:"v"`
QuoteVolume string `json:"q"`
}
57 changes: 57 additions & 0 deletions node/pkg/wfetcher/binance/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package binance

import (
"encoding/json"
"fmt"
"time"

"bisonai.com/orakl/node/pkg/wfetcher"
"github.com/rs/zerolog/log"
)

func GetSubscription(names []string) Subscription {
streams := []Stream{}

for _, name := range names {
streams = append(streams, Stream(name+"@miniTicker"))
}

return Subscription{
Method: "SUBSCRIBE",
Params: streams,
Id: 1,
}
}
func MessageToTicker(msg map[string]any) (MiniTicker, error) {
var miniTicker MiniTicker
jsonData, err := json.Marshal(msg)
if err != nil {
log.Error().Str("Player", "Binance").Err(err).Msg("error in json.Marshal")
return miniTicker, err
}

err = json.Unmarshal(jsonData, &miniTicker)
if err != nil {
log.Error().Str("Player", "Binance").Err(err).Msg("error in json.Unmarshal")
return miniTicker, err
}
return miniTicker, nil
}

func TickerToFeedData(miniTicker MiniTicker, feedMap map[string]int32) (*wfetcher.FeedData, error) {
feedData := new(wfetcher.FeedData)
timestamp := time.Unix(miniTicker.EventTime/1000, 0)
value, err := wfetcher.PriceStringToFloat64(miniTicker.Price)
if err != nil {
return feedData, err
}

id, exists := feedMap[miniTicker.Symbol]
if !exists {
return feedData, fmt.Errorf("feed not found")
}
feedData.FeedId = id
feedData.Value = value
feedData.Timestamp = timestamp
return feedData, nil
}
90 changes: 90 additions & 0 deletions node/pkg/wfetcher/coinone/coinone.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package coinone

import (
"context"
"fmt"
"strings"
"time"

"bisonai.com/orakl/node/pkg/wfetcher"
"bisonai.com/orakl/node/pkg/wss"
"github.com/rs/zerolog/log"
)

type CoinoneFetcher wfetcher.Fetcher

// expected to recieve feedmap with key having format "<base>-<quote>"
func New(ctx context.Context, opts ...wfetcher.FetcherOption) (*CoinoneFetcher, error) {
config := &wfetcher.FetcherConfig{}
for _, opt := range opts {
opt(config)
}

fetcher := &CoinoneFetcher{}

if len(config.FeedMap) == 0 {
log.Error().Str("Player", "Coinone").Msg("no feed map")
return nil, fmt.Errorf("no feed map")
}
fetcher.FeedMap = config.FeedMap

subscriptions := []any{}
for feed := range config.FeedMap {
raw := strings.Split(feed, "-")
if len(raw) < 2 {
log.Error().Str("Player", "Coinone").Msg("invalid feed name")
return nil, fmt.Errorf("invalid feed name")
}
base := raw[0]
quote := raw[1]

subscriptions = append(subscriptions, Subscription{
RequestType: "SUBSCRIBE",
Channel: "TICKER",
Topic: Topic{
QuoteCurrency: quote,
TargetCurrency: base,
},
Format: "SHORT",
})
}
ws, err := wss.NewWebsocketHelper(ctx,
wss.WithEndpoint(URL),
wss.WithSubscriptions(subscriptions),
wss.WithProxyUrl(config.Proxy))
if err != nil {
log.Error().Str("Player", "Coinone").Err(err).Msg("error in NewWebsocketHelper")
return nil, err
}
fetcher.Ws = ws

return fetcher, nil
}

func (c *CoinoneFetcher) handleMessage(ctx context.Context, message map[string]any) error {
raw, err := MessageToRawResponse(message)
if err != nil {
log.Error().Str("Player", "Coinone").Err(err).Msg("error in MessageToRawResponse")
return err
}

if raw.ResponseType != "DATA" {
return nil
}
feedData, err := DataToFeedData(raw.Data, c.FeedMap)
if err != nil {
log.Error().Str("Player", "Coinone").Err(err).Msg("error in DataToFeedData")
return err
}

err = wfetcher.StoreFeed(ctx, &feedData, 2*time.Second)
if err != nil {
log.Error().Str("Player", "Coinone").Err(err).Msg("error in StoreFeed")
return err
}
return nil
}

func (c *CoinoneFetcher) Run(ctx context.Context) {
c.Ws.Run(ctx, c.handleMessage)
}
47 changes: 47 additions & 0 deletions node/pkg/wfetcher/coinone/type.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package coinone

const (
URL = "wss://stream.coinone.co.kr"
)

type Topic struct {
QuoteCurrency string `json:"quote_currency"`
TargetCurrency string `json:"target_currency"`
}

type Subscription struct {
RequestType string `json:"request_type"`
Channel string `json:"channel"`
Topic Topic `json:"topic"`
Format string `json:"format"`
}

type Data struct {
QuoteCurrency string `json:"qc"`
TargetCurrency string `json:"tc"`
Timestamp int64 `json:"t"`
QuoteVolume string `json:"qv"`
TargetVolume string `json:"tv"`
First string `json:"fi"`
Low string `json:"lo"`
High string `json:"hi"`
Last string `json:"la"`
VolumePower string `json:"vp"`
AskBestPrice string `json:"abp"`
AskBestQty string `json:"abq"`
BidBestPrice string `json:"bbp"`
BidBestQty string `json:"bbq"`
ID string `json:"i"`
YesterdayFirst string `json:"yfi"`
YesterdayLow string `json:"ylo"`
YesterdayHigh string `json:"yhi"`
YesterdayLast string `json:"yla"`
YesterdayQuoteVolume string `json:"yqv"`
YesterdayTargetVolume string `json:"ytv"`
}

type Raw struct {
ResponseType string `json:"r"`
Channel string `json:"c"`
Data Data `json:"d"`
}
47 changes: 47 additions & 0 deletions node/pkg/wfetcher/coinone/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package coinone

import (
"encoding/json"
"fmt"
"strings"
"time"

"bisonai.com/orakl/node/pkg/wfetcher"
"github.com/rs/zerolog/log"
)

func MessageToRawResponse(msg map[string]any) (Raw, error) {
var rawResponse Raw
jsonData, err := json.Marshal(msg)
if err != nil {
log.Error().Str("Player", "Coinone").Err(err).Msg("error in json.Marshal")
return rawResponse, err
}

err = json.Unmarshal(jsonData, &rawResponse)
if err != nil {
log.Error().Str("Player", "Coinone").Err(err).Msg("error in json.Unmarshal")
return rawResponse, err
}
return rawResponse, nil
}

func DataToFeedData(data Data, feedMap map[string]int32) (wfetcher.FeedData, error) {
feedData := wfetcher.FeedData{}

timestamp := time.Unix(data.Timestamp/1000, 0)
value, err := wfetcher.PriceStringToFloat64(data.Last)

if err != nil {
return feedData, err
}

id, exists := feedMap[strings.ToUpper(data.TargetCurrency)+"-"+strings.ToUpper(data.QuoteCurrency)]
if !exists {
return feedData, fmt.Errorf("feed not found")
}
feedData.FeedId = id
feedData.Value = value
feedData.Timestamp = timestamp
return feedData, nil
}
Loading

0 comments on commit a761f3a

Please sign in to comment.