From cad4e2181a57012d3d50bc4a45e4b7ffab55cb99 Mon Sep 17 00:00:00 2001 From: samuael <39623015+samuael@users.noreply.github.com> Date: Sat, 28 Dec 2024 23:11:49 +0300 Subject: [PATCH] added public futures websocket push data handling --- exchanges/poloniex/poloniex.go | 29 +- exchanges/poloniex/poloniex_futures_types.go | 60 ++++ .../poloniex/poloniex_futures_websocket.go | 316 ++++++++++++++++-- exchanges/poloniex/poloniex_types.go | 1 + exchanges/poloniex/poloniex_websocket.go | 2 +- 5 files changed, 350 insertions(+), 58 deletions(-) diff --git a/exchanges/poloniex/poloniex.go b/exchanges/poloniex/poloniex.go index 2220af340b1..0834e1009e9 100644 --- a/exchanges/poloniex/poloniex.go +++ b/exchanges/poloniex/poloniex.go @@ -9,6 +9,7 @@ import ( "net/http" "net/url" "strconv" + "strings" "time" "github.com/thrasher-corp/gocryptotrader/common" @@ -156,7 +157,7 @@ func (p *Poloniex) GetCandlesticks(ctx context.Context, symbol currency.Pair, in if symbol.IsEmpty() { return nil, currency.ErrCurrencyPairEmpty } - intervalString, err := intervalToString(interval) + intervalString, err := IntervalString(interval) if err != nil { return nil, err } else if intervalString == "" { @@ -505,30 +506,6 @@ func (p *Poloniex) NewCurrencyDepositAddress(ctx context.Context, ccy currency.C return resp.Address, p.SendAuthenticatedHTTPRequest(ctx, exchange.RestSpot, authResourceIntensiveEPL, http.MethodPost, "/wallets/address", nil, map[string]string{"currency": ccy.String()}, &resp) } -func intervalToString(interval kline.Interval) (string, error) { - intervalMap := map[kline.Interval]string{ - kline.OneMin: "MINUTE_1", - kline.FiveMin: "MINUTE_5", - kline.TenMin: "MINUTE_10", - kline.FifteenMin: "MINUTE_15", - kline.ThirtyMin: "MINUTE_30", - kline.OneHour: "HOUR_1", - kline.TwoHour: "HOUR_2", - kline.FourHour: "HOUR_4", - kline.SixHour: "HOUR_6", - kline.TwelveHour: "HOUR_12", - kline.OneDay: "DAY_1", - kline.ThreeDay: "DAY_3", - kline.SevenDay: "WEEK_1", - kline.OneMonth: "MONTH_1", - } - intervalString, okay := intervalMap[interval] - if okay { - return intervalString, nil - } - return "", kline.ErrUnsupportedInterval -} - func stringToInterval(interval string) (kline.Interval, error) { intervalMap := map[string]kline.Interval{ "MINUTE_1": kline.OneMin, @@ -546,7 +523,7 @@ func stringToInterval(interval string) (kline.Interval, error) { "WEEK_1": kline.SevenDay, "MONTH_1": kline.OneMonth, } - intervalInstance, okay := intervalMap[interval] + intervalInstance, okay := intervalMap[strings.ToUpper(interval)] if okay { return intervalInstance, nil } diff --git a/exchanges/poloniex/poloniex_futures_types.go b/exchanges/poloniex/poloniex_futures_types.go index 852275d0b4e..03e64dfdf6a 100644 --- a/exchanges/poloniex/poloniex_futures_types.go +++ b/exchanges/poloniex/poloniex_futures_types.go @@ -211,6 +211,10 @@ type FuturesV3Orderbook struct { Bids [][]types.Number `json:"bids"` Depth types.Number `json:"s"` Timestamp types.Time `json:"ts"` + + ID types.Number `json:"id"` + Symbol string `json:"symbol"` + CreationTime types.Time `json:"cT"` } // V3FuturesCandle represents a kline data for v3 futures instrument @@ -270,11 +274,13 @@ type V3FuturesTickerDetail struct { BestAskPrice types.Number `json:"aPx"` BestAskSize types.Number `json:"aSz"` MarkPrice types.Number `json:"mPx"` + Timestamp types.Time `json:"ts"` } // InstrumentIndexPrice represents a symbols index price type InstrumentIndexPrice struct { Symbol string `json:"symbol"` + Timestamp types.Time `json:"ts"` IndexPrice types.Number `json:"iPx"` } @@ -310,6 +316,7 @@ func (v *V3FuturesIndexPriceData) UnmarshalJSON(data []byte) error { type V3FuturesMarkPrice struct { MarkPrice types.Number `json:"mPx"` Symbol string `json:"symbol"` + Timestamp types.Time `json:"ts"` } // V3FuturesMarkPriceCandle represents a k-line data for mark price @@ -364,6 +371,7 @@ type V3FuturesFundingRate struct { FundingRateSettleTime types.Time `json:"fT"` NextPredictedFundingRate types.Number `json:"nFR"` NextFundingTime types.Time `json:"nFT"` + Timestamp types.Time `json:"ts"` } // OpenInterestData represents an open interest data @@ -384,3 +392,55 @@ type RiskLimit struct { NotionalCap types.Number `json:"notionalCap"` Symbol string `json:"symbol"` } + +// WsFuturesCandlesctick represents a kline data for futures instrument +type WsFuturesCandlesctick struct { + Symbol string + LowestPrice types.Number + HighestPrice types.Number + OpenPrice types.Number + ClosePrice types.Number + Amount types.Number + Quantity types.Number + Trades types.Number + StartTime types.Time + EndTime types.Time + PushTime types.Time +} + +// UnmarshalJSON deserializes byte data into futures candlesticks into *WsFuturesCandlesctick +func (o *WsFuturesCandlesctick) UnmarshalJSON(data []byte) error { + target := [11]any{&o.Symbol, &o.LowestPrice, &o.HighestPrice, &o.OpenPrice, &o.ClosePrice, &o.Amount, &o.Quantity, &o.Trades, &o.StartTime, &o.EndTime, &o.PushTime} + return json.Unmarshal(data, &target) +} + +// FuturesTrades represents a futures trades detail +type FuturesTrades struct { + ID int `json:"id"` + Timestamp types.Time `json:"ts"` + Symbol string `json:"s"` + Price types.Number `json:"px"` + Quantity types.Number `json:"qty"` + Amount types.Number `json:"amt"` + Side string `json:"side"` + CreationTime types.Time `json:"cT"` +} + +// V3WsFuturesMarkAndIndexPriceCandle represents a websocket k-line data for mark/index candlestick data +type V3WsFuturesMarkAndIndexPriceCandle struct { + OpeningPrice types.Number + HighestPrice types.Number + LowestPrice types.Number + ClosingPrice types.Number + StartTime types.Time + EndTime types.Time + + Symbol string + PushTimestamp types.Time +} + +// UnmarshalJSON deserializes byte data into V3WsFuturesMarkAndIndexPriceCandle instance +func (v *V3WsFuturesMarkAndIndexPriceCandle) UnmarshalJSON(data []byte) error { + target := [8]any{&v.Symbol, &v.LowestPrice, &v.HighestPrice, &v.OpeningPrice, &v.ClosingPrice, &v.StartTime, &v.EndTime, &v.PushTimestamp} + return json.Unmarshal(data, &target) +} diff --git a/exchanges/poloniex/poloniex_futures_websocket.go b/exchanges/poloniex/poloniex_futures_websocket.go index 746ae8faad5..04f78369214 100644 --- a/exchanges/poloniex/poloniex_futures_websocket.go +++ b/exchanges/poloniex/poloniex_futures_websocket.go @@ -6,14 +6,21 @@ import ( "fmt" "net/http" "strconv" + "strings" "time" "github.com/gorilla/websocket" "github.com/thrasher-corp/gocryptotrader/common/crypto" + "github.com/thrasher-corp/gocryptotrader/currency" "github.com/thrasher-corp/gocryptotrader/exchanges/asset" + "github.com/thrasher-corp/gocryptotrader/exchanges/kline" + "github.com/thrasher-corp/gocryptotrader/exchanges/order" + "github.com/thrasher-corp/gocryptotrader/exchanges/orderbook" "github.com/thrasher-corp/gocryptotrader/exchanges/request" "github.com/thrasher-corp/gocryptotrader/exchanges/stream" "github.com/thrasher-corp/gocryptotrader/exchanges/subscription" + "github.com/thrasher-corp/gocryptotrader/exchanges/ticker" + "github.com/thrasher-corp/gocryptotrader/exchanges/trade" ) const ( @@ -22,17 +29,14 @@ const ( ) const ( - cnlFuturesSymbol = "symbol" - cnlFuturesOrderbookLvl2 = "book_lv2" - cnlFuturesOrderbook = "book" - cnlFuturesCandles = "candles" - cnlFuturesTickers = "tickers" - cnlFuturesTrades = "trades" - cnlFuturesIndexPrice = "index_price" - cnlFuturesMarkPrice = "mark_price" - cnlFuturesMarkPriceCandles = "mark_price_candles" - cnlFuturesIndexCandles = "index_candles" - cnlFuturesFundingRate = "funding_rate" + cnlFuturesSymbol = "symbol" + cnlFuturesOrderbookLvl2 = "book_lv2" + cnlFuturesOrderbook = "book" + cnlFuturesTickers = "tickers" + cnlFuturesTrades = "trades" + cnlFuturesIndexPrice = "index_price" + cnlFuturesMarkPrice = "mark_price" + cnlFuturesFundingRate = "funding_rate" cnlFuturesPrivatePositions = "positions" cnlFuturesPrivateOrders = "orders" @@ -40,18 +44,37 @@ const ( cnlFuturesAccount = "account" ) +const ( + candles1Min, candles5Min, candles10Min, candles15Min, candles30Min, candles1Hr, candles2Hr, + candles4Hr, candles6Hr, candles12Hr, candles1Day, candles3Day, candles1Week, candles1Month = "candles_minute_1", "candles_minute_5", "candles_minute_10", "candles_minute_15", "candles_minute_30", "candles_hour_1", + "candles_hour_2", "candles_hour_4", "candles_hour_6", + "candles_hour_12", "candles_day_1", "candles_day_3", "candles_week_1", "candles_month_1" + + markCandles1Min, markCandles5Min, markCandles10Min, markCandles15Min, + markCandles30Min, markCandles1Hr, markCandles2Hr, markCandles4Hr, markCandles12Hr, markCandles1Day, markCandles3Day, markCandles1Week = "mark_price_candles_minute_1", "mark_price_candles_minute_5", "mark_price_candles_minute_10", "mark_price_candles_minute_15", + "mark_candles_minute_30", "mark_candles_hour_1", "mark_candles_hour_2", "mark_candles_hour_4", "mark_candles_hour_12", + "mark_candles_day_1", "mark_candles_day_3", "mark_candles_week_1" + + indexCandles1Min, indexCandles5Min, indexCandles10Min, indexCandles15Min, indexCandles30Min, indexCandles1Hr, indexCandles2Hr, indexCandles4Hr, indexCandles12Hr, indexCandles1Day, indexCandles3Day, indexCandles1Week = "index_candles_minute_1", + "index_candles_minute_5", "index_candles_minute_10", "index_candles_minute_15", "index_candles_minute_30", "index_candles_hour_1", "index_candles_hour_2", "index_candles_hour_4", + "index_candles_hour_12", "index_candles_day_1", "index_candles_day_3", "index_candles_week_1" +) + var defaultFuturesChannels = []string{ cnlFuturesTickers, cnlFuturesOrderbook, - cnlFuturesCandles, + candles15Min, } +var onceFuturesOrderbook map[string]bool + // WsFuturesConnect establishes a websocket connection to the futures websocket server. func (p *Poloniex) WsFuturesConnect() error { if !p.Websocket.IsEnabled() || !p.IsEnabled() { return stream.ErrWebsocketNotEnabled } var dialer websocket.Dialer + onceFuturesOrderbook = make(map[string]bool) err := p.Websocket.SetWebsocketURL(futuresWebsocketPublicURL, false, false) if err != nil { return err @@ -158,27 +181,40 @@ func (p *Poloniex) wsFuturesHandleData(respRaw []byte) error { } return nil case cnlFuturesSymbol: - return nil - case cnlFuturesOrderbookLvl2: - return nil - case cnlFuturesOrderbook: - return nil - case cnlFuturesCandles: - return nil + var resp []ProductInfo + return p.processData(result.Data, &resp) + case cnlFuturesOrderbookLvl2, + cnlFuturesOrderbook: + return p.processFuturesOrderbook(result.Data, result.Action) + case candles1Min, candles5Min, candles10Min, candles15Min, candles30Min, candles1Hr, candles2Hr, candles4Hr, + candles6Hr, candles12Hr, candles1Day, candles3Day, candles1Week, candles1Month: + interval, err := stringToInterval(strings.Join(strings.Split(result.Channel, "_")[1:], "_")) + if err != nil { + return err + } + return p.processFuturesCandlesticks(result.Data, interval) case cnlFuturesTickers: - return nil + return p.processFuturesTickers(result.Data) case cnlFuturesTrades: - return nil + return p.processFuturesTrades(result.Data) case cnlFuturesIndexPrice: - return nil + var resp []InstrumentIndexPrice + return p.processData(result.Data, &resp) case cnlFuturesMarkPrice: - return nil - case cnlFuturesMarkPriceCandles: - return nil - case cnlFuturesIndexCandles: - return nil + var resp []V3FuturesMarkPrice + return p.processData(result.Data, &resp) + case markCandles1Min, markCandles5Min, markCandles10Min, markCandles15Min, + markCandles30Min, markCandles1Hr, markCandles2Hr, markCandles4Hr, markCandles12Hr, markCandles1Day, markCandles3Day, markCandles1Week, + // Index Candlestick channels + indexCandles1Min, indexCandles5Min, indexCandles10Min, indexCandles15Min, indexCandles30Min, + indexCandles1Hr, indexCandles2Hr, indexCandles4Hr, indexCandles12Hr, indexCandles1Day, indexCandles3Day, indexCandles1Week: + interval, err := stringToInterval(strings.Join(strings.Split(result.Channel, "_")[1:], "_")) + if err != nil { + return err + } + return p.processFuturesMarkAndIndexPriceCandlesticks(result.Data, interval) case cnlFuturesFundingRate: - return nil + return p.processFuturesFundingRate(result.Data) case cnlFuturesPrivatePositions: return nil case cnlFuturesPrivateOrders: @@ -193,6 +229,226 @@ func (p *Poloniex) wsFuturesHandleData(respRaw []byte) error { } } +func (p *Poloniex) processFuturesFundingRate(data []byte) error { + var resp []V3FuturesFundingRate + err := json.Unmarshal(data, &resp) + if err != nil { + return err + } + + for a := range resp { + pair, err := currency.NewPairFromString(resp[a].Symbol) + if err != nil { + return err + } + p.Websocket.DataHandler <- stream.FundingData{ + Timestamp: resp[0].Timestamp.Time(), + CurrencyPair: pair, + AssetType: asset.Futures, + Exchange: p.Name, + Rate: resp[a].FundingRate.Float64(), + } + } + return nil +} + +func (p *Poloniex) processFuturesMarkAndIndexPriceCandlesticks(data []byte, interval kline.Interval) error { + var resp []V3WsFuturesMarkAndIndexPriceCandle + err := json.Unmarshal(data, &resp) + if err != nil { + return err + } + + candles := make([]stream.KlineData, len(resp)) + for a := range resp { + pair, err := currency.NewPairFromString(resp[a].Symbol) + if err != nil { + return err + } + candles[a] = stream.KlineData{ + Timestamp: resp[a].PushTimestamp.Time(), + Pair: pair, + AssetType: asset.Futures, + Exchange: p.Name, + StartTime: resp[a].StartTime.Time(), + CloseTime: resp[a].EndTime.Time(), + Interval: interval.String(), + OpenPrice: resp[a].OpeningPrice.Float64(), + ClosePrice: resp[a].ClosingPrice.Float64(), + HighPrice: resp[a].HighestPrice.Float64(), + LowPrice: resp[a].LowestPrice.Float64(), + } + } + p.Websocket.DataHandler <- candles + return nil +} + +func (p *Poloniex) processData(data []byte, respStruct interface{}) error { + err := json.Unmarshal(data, &respStruct) + if err != nil { + return err + } + p.Websocket.DataHandler <- respStruct + return nil +} + +func (p *Poloniex) processFuturesOrderbook(data []byte, action string) error { + var resp []FuturesV3Orderbook + err := json.Unmarshal(data, &resp) + if err != nil { + return err + } + for x := range resp { + pair, err := currency.NewPairFromString(resp[x].Symbol) + if err != nil { + return err + } + asks := make([]orderbook.Tranche, len(resp[x].Asks)) + for a := range resp[x].Asks { + asks[a].Price = resp[x].Asks[a][0].Float64() + asks[a].Amount = resp[x].Asks[a][1].Float64() + } + bids := make([]orderbook.Tranche, len(resp[x].Bids)) + for a := range resp[x].Bids { + bids[a].Price = resp[x].Bids[a][0].Float64() + bids[a].Amount = resp[x].Bids[a][1].Float64() + } + _, okay := onceFuturesOrderbook[resp[x].Symbol] + if !okay || action == "snapshot" { + if onceFuturesOrderbook == nil { + onceFuturesOrderbook = make(map[string]bool) + } + onceFuturesOrderbook[resp[x].Symbol] = true + err := p.Websocket.Orderbook.LoadSnapshot(&orderbook.Base{ + Bids: bids, + Asks: asks, + Exchange: p.Name, + Pair: pair, + Asset: asset.Futures, + LastUpdated: resp[x].CreationTime.Time(), + LastUpdateID: resp[x].ID.Int64(), + }) + if err != nil { + return err + } + continue + } + err = p.Websocket.Orderbook.Update(&orderbook.Update{ + UpdateID: resp[x].ID.Int64(), + UpdateTime: resp[x].CreationTime.Time(), + UpdatePushedAt: resp[x].Timestamp.Time(), + Asset: asset.Futures, + Action: orderbook.UpdateInsert, + Bids: bids, + Asks: asks, + Pair: pair, + }) + if err != nil { + return err + } + } + return nil +} + +func (p *Poloniex) processFuturesTickers(data []byte) error { + var resp []V3FuturesTickerDetail + err := json.Unmarshal(data, &resp) + if err != nil { + return err + } + tickerPrices := make([]ticker.Price, len(resp)) + for a := range resp { + pair, err := currency.NewPairFromString(resp[a].Symbol) + if err != nil { + return err + } + tickerPrices[a] = ticker.Price{ + High: resp[a].HighPrice.Float64(), + Low: resp[a].LowPrice.Float64(), + Bid: resp[a].BestBidPrice.Float64(), + BidSize: resp[a].BestBidSize.Float64(), + Ask: resp[a].BestAskPrice.Float64(), + AskSize: resp[a].BestAskSize.Float64(), + Volume: resp[a].Quantity.Float64(), + QuoteVolume: resp[a].Amount.Float64(), + // PriceATH + Open: resp[a].OpeningPrice.Float64(), + Close: resp[a].ClosingPrice.Float64(), + MarkPrice: resp[a].MarkPrice.Float64(), + Pair: pair, + ExchangeName: p.Name, + AssetType: asset.Futures, + LastUpdated: resp[a].Timestamp.Time(), + } + } + p.Websocket.DataHandler <- tickerPrices + return nil +} + +// processFuturesTrades handles latest trading data for this product, including the latest price, trading volume, trading direction, etc. +func (p *Poloniex) processFuturesTrades(data []byte) error { + var resp []FuturesTrades + err := json.Unmarshal(data, &resp) + if err != nil { + return err + } + trades := make([]trade.Data, len(resp)) + for t := range resp { + pair, err := currency.NewPairFromString(resp[t].Symbol) + if err != nil { + return err + } + oSide, err := order.StringToOrderSide(resp[t].Side) + if err != nil { + return err + } + trades[t] = trade.Data{ + TID: trades[t].TID, + Exchange: p.Name, + CurrencyPair: pair, + AssetType: asset.Futures, + Side: oSide, + Price: resp[t].Price.Float64(), + Amount: resp[t].Amount.Float64(), + Timestamp: resp[t].Timestamp.Time(), + } + } + p.Websocket.DataHandler <- trades + return nil +} + +func (p *Poloniex) processFuturesCandlesticks(data []byte, interval kline.Interval) error { + var resp []WsFuturesCandlesctick + err := json.Unmarshal(data, &resp) + if err != nil { + return err + } + + candles := make([]stream.KlineData, len(resp)) + for a := range resp { + pair, err := currency.NewPairFromString(resp[a].Symbol) + if err != nil { + return err + } + candles[a] = stream.KlineData{ + Timestamp: resp[a].PushTime.Time(), + Pair: pair, + AssetType: asset.Futures, + Exchange: p.Name, + StartTime: resp[a].StartTime.Time(), + CloseTime: resp[a].EndTime.Time(), + Interval: interval.String(), + OpenPrice: resp[a].OpenPrice.Float64(), + ClosePrice: resp[a].ClosePrice.Float64(), + HighPrice: resp[a].HighestPrice.Float64(), + LowPrice: resp[a].LowestPrice.Float64(), + Volume: resp[a].Amount.Float64(), + } + } + p.Websocket.DataHandler <- candles + return nil +} + // ------------------------------------------------------------------------------------------------ // GenerateFuturesDefaultSubscriptions adds default subscriptions to futures websockets. @@ -217,13 +473,11 @@ func (p *Poloniex) GenerateFuturesDefaultSubscriptions() (subscription.List, err case cnlFuturesSymbol, cnlFuturesOrderbookLvl2, cnlFuturesOrderbook, - cnlFuturesCandles, cnlFuturesTickers, cnlFuturesTrades, cnlFuturesIndexPrice, cnlFuturesMarkPrice, - cnlFuturesMarkPriceCandles, - cnlFuturesIndexCandles, + indexCandles1Min, indexCandles5Min, indexCandles10Min, indexCandles15Min, indexCandles30Min, indexCandles1Hr, indexCandles2Hr, indexCandles4Hr, indexCandles12Hr, indexCandles1Day, indexCandles3Day, indexCandles1Week, cnlFuturesFundingRate: subscriptions = append(subscriptions, &subscription.Subscription{ Channel: channels[i], diff --git a/exchanges/poloniex/poloniex_types.go b/exchanges/poloniex/poloniex_types.go index f474863b10c..542313bd5aa 100644 --- a/exchanges/poloniex/poloniex_types.go +++ b/exchanges/poloniex/poloniex_types.go @@ -1066,6 +1066,7 @@ type FuturesSubscriptionInput struct { type FuturesSubscriptionResp struct { Channel string `json:"channel"` Data json.RawMessage `json:"data"` + Action string `json:"action"` } // InstrumentMarkAndIndexPrice represents index and mark price information of an instrument. diff --git a/exchanges/poloniex/poloniex_websocket.go b/exchanges/poloniex/poloniex_websocket.go index a9db99850fe..9588b6c8308 100644 --- a/exchanges/poloniex/poloniex_websocket.go +++ b/exchanges/poloniex/poloniex_websocket.go @@ -557,7 +557,7 @@ func (p *Poloniex) handleSubscriptions(operation string, subscs subscription.Lis if !okay { interval = kline.FiveMin } - intervalString, err := intervalToString(interval) + intervalString, err := IntervalString(interval) if err != nil { return nil, err }