Skip to content

Commit

Permalink
feat: support orangex
Browse files Browse the repository at this point in the history
  • Loading branch information
nick-bisonai committed Oct 11, 2024
1 parent de30fec commit efc8e4e
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 3 deletions.
2 changes: 2 additions & 0 deletions node/pkg/websocketfetcher/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"bisonai.com/miko/node/pkg/websocketfetcher/providers/lbank"
"bisonai.com/miko/node/pkg/websocketfetcher/providers/mexc"
"bisonai.com/miko/node/pkg/websocketfetcher/providers/okx"
"bisonai.com/miko/node/pkg/websocketfetcher/providers/orangex"
"bisonai.com/miko/node/pkg/websocketfetcher/providers/uniswap"
"bisonai.com/miko/node/pkg/websocketfetcher/providers/upbit"
"bisonai.com/miko/node/pkg/websocketfetcher/providers/xt"
Expand Down Expand Up @@ -140,6 +141,7 @@ func (a *App) Init(ctx context.Context, opts ...AppOption) error {
"bitmart": bitmart.New,
"xt": xt.New,
"gopax": gopax.New,
"orangex": orangex.New,
}

dexFactories := map[string]func(...common.DexFetcherOption) common.FetcherInterface{
Expand Down
155 changes: 155 additions & 0 deletions node/pkg/websocketfetcher/providers/orangex/orangex.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package orangex

import (
"context"
"encoding/json"
"strconv"
"time"

"bisonai.com/miko/node/pkg/websocketfetcher/common"
"bisonai.com/miko/node/pkg/wss"
"github.com/rs/zerolog/log"
)

const URL = "wss://api.orangex.com/ws/api/v1"

type OrangeXFetcher common.Fetcher

type Params struct {
Channels []string `json:"channels"`
}

type Subscription struct {
JsonRPC string `json:"jsonrpc"`
Method string `json:"method"`
Params Params `json:"params"`
ID int `json:"id"`
}

type TickerResponse struct {
Params struct {
Data struct {
Timestamp string `json:"timestamp"`
Stats struct {
Volume string `json:"volume"`
PriceChange string `json:"price_change"`
Low string `json:"low"`
Turnover string `json:"turnover"`
High string `json:"high"`
} `json:"stats"`
State string `json:"state"`
LastPrice string `json:"last_price"`
InstrumentName string `json:"instrument_name"`
MarkPrice string `json:"mark_price"`
BestBidPrice string `json:"best_bid_price"`
BestBidAmount string `json:"best_bid_amount"`
BestAskPrice string `json:"best_ask_price"`
BestAskAmount string `json:"best_ask_amount"`
} `json:"data"`
Channel string `json:"channel"`
} `json:"params"`
Method string `json:"method"`
JSONRPC string `json:"jsonrpc"`
}

func New(ctx context.Context, opts ...common.FetcherOption) (common.FetcherInterface, error) {
config := &common.FetcherConfig{}
for _, opt := range opts {
opt(config)
}

fetcher := &OrangeXFetcher{}
fetcher.FeedMap = config.FeedMaps.Separated
fetcher.FeedDataBuffer = config.FeedDataBuffer

channels := []string{}
for feed := range fetcher.FeedMap {
channels = append(channels, "ticker."+feed+".raw")
}

params := Params{Channels: channels}

subscription := Subscription{
JsonRPC: "2.0",
Method: "/public/subscribe",
Params: params,
ID: 1,
}

log.Debug().Any("subscription", subscription).Msg("subscription generated")

// since wsjson.Write didn't for orangex, had to pass marshalled byte instead
raw, err := json.Marshal(subscription)
if err != nil {
log.Error().Str("Player", "OrangeX").Err(err).Msg("error in orangex.New")
return nil, err
}

ws, err := wss.NewWebsocketHelper(ctx,
wss.WithEndpoint(URL),
wss.WithSubscriptions([]any{raw}),
wss.WithProxyUrl(config.Proxy))
if err != nil {
log.Error().Str("Player", "OrangeX").Err(err).Msg("error in orangex.New")
return nil, err
}
fetcher.Ws = ws
return fetcher, nil
}

func (f *OrangeXFetcher) handleMessage(ctx context.Context, message map[string]any) error {
raw, err := common.MessageToStruct[TickerResponse](message)
if err != nil {
log.Error().Str("Player", "OrangeX").Err(err).Msg("error in orangex.handleMessage")
return err
}

if raw.Params.Data.InstrumentName == "" {
return nil
}

feedData, err := TickerResponseToFeedData(raw, f.FeedMap)
if err != nil {
log.Error().Str("Player", "OrangeX").Err(err).Msg("error in orangex.handleMessage")
return err
}
f.FeedDataBuffer <- feedData
return nil
}

func (f *OrangeXFetcher) Run(ctx context.Context) {
f.Ws.Run(ctx, f.handleMessage)
}

func TickerResponseToFeedData(data TickerResponse, feedMap map[string]int32) (*common.FeedData, error) {
id, exists := feedMap[data.Params.Data.InstrumentName]
if !exists {
log.Warn().Str("Player", "OrangeX").Any("data", data).Str("key", data.Params.Data.InstrumentName).Msg("feed not found")
return nil, nil
}

feedData := new(common.FeedData)
value, err := common.PriceStringToFloat64(data.Params.Data.LastPrice)
if err != nil {
log.Error().Str("Player", "OrangeX").Err(err).Msg("error in PriceStringToFloat64")
return nil, err
}

intTimestamp, err := strconv.ParseInt(data.Params.Data.Timestamp, 10, 64)
if err != nil {
log.Error().Str("Player", "OrangeX").Err(err).Msg("error in strconv.ParseInt")
return nil, err
}
timestamp := time.UnixMilli(intTimestamp)
volume, err := common.VolumeStringToFloat64(data.Params.Data.Stats.Volume)
if err != nil {
log.Error().Str("Player", "OrangeX").Err(err).Msg("error in VolumeStringToFloat64")
return nil, err
}

feedData.FeedID = id
feedData.Value = value
feedData.Timestamp = &timestamp
feedData.Volume = volume
return feedData, nil
}
16 changes: 13 additions & 3 deletions node/pkg/wss/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,15 @@ func (ws *WebsocketHelper) dialAndSubscribe(ctx context.Context) error {

subscribeJob := func() error {
for _, subscription := range ws.Subscriptions {
err := ws.Write(ctx, subscription)
if err != nil {
return err
switch casted := subscription.(type) {
case []byte:
if err := ws.RawWrite(ctx, string(casted)); err != nil {
return err
}
default:
if err := ws.Write(ctx, casted); err != nil {
return err
}
}
}
return nil
Expand Down Expand Up @@ -165,6 +171,10 @@ func (ws *WebsocketHelper) Write(ctx context.Context, message interface{}) error
return nil
}

func (ws *WebsocketHelper) RawByteWrite(ctx context.Context, message []byte) error {
return ws.Conn.Write(ctx, websocket.MessageBinary, message)
}

func (ws *WebsocketHelper) RawWrite(ctx context.Context, message string) error {
return ws.Conn.Write(ctx, websocket.MessageText, []byte(message))
}
Expand Down

0 comments on commit efc8e4e

Please sign in to comment.