Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
nick-bisonai committed May 27, 2024
1 parent cf19039 commit 376acbb
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 0 deletions.
2 changes: 2 additions & 0 deletions node/pkg/fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ func (f *Fetcher) fetch(chainHelpers map[string]ChainHelper, proxies []Proxy) ([
errChan <- fetchErr
return
}
case *definition.Type == "websocket":
return
default:
errChan <- errorSentinel.ErrFetcherInvalidType
}
Expand Down
54 changes: 54 additions & 0 deletions node/pkg/wfetcher/binance/binance.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package binance

import (
"context"
"encoding/json"
"fmt"

"bisonai.com/orakl/node/pkg/wss"
)

func Test() error {
ctx := context.Background()
conn, err := wss.NewConnection(ctx, wss.WithEndpoint(URL))
if err != nil {
return err
}

sub := subscription{
Method: "SUBSCRIBE",
Params: []Stream{
"ethusdt@miniTicker",
},
Id: 1,
}

err = conn.Write(ctx, sub)
if err != nil {
return err
}

ch := conn.ReadV2(ctx)
for {

Check failure on line 32 in node/pkg/wfetcher/binance/binance.go

View workflow job for this annotation

GitHub Actions / core-build

S1000: should use for range instead of for { select {} } (gosimple)
select {
case msg := <-ch:
rawData, ok := msg.(map[string]interface{})
if !ok {
return fmt.Errorf("received data is not a map[string]interface{}")
}
jsonData, err := json.Marshal(rawData)
if err != nil {
return err
}

var miniTicker MiniTicker
err = json.Unmarshal(jsonData, &miniTicker)
if err != nil {
return err
}

fmt.Println(miniTicker)

}
}
}
34 changes: 34 additions & 0 deletions node/pkg/wfetcher/binance/type.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package binance

import "fmt"

const (
URL = "wss://stream.binance.com:443/ws"
)

type Stream string

type MiniTicker struct {
EventType string `json:"e"`
EventTime int64 `json:"E"`
Symbol string `json:"s"`
Price string `json:"c"`
OpenPrice string `json:"o"`
HighPrice string `json:"h"`
LowPrice string `json:"l"`
Volume string `json:"v"`
QuoteVolume string `json:"q"`
}

func (m MiniTicker) String() string {
return fmt.Sprintf(
"EventType: %s, EventTime: %d, Symbol: %s, Price: %s, OpenPrice: %s, HighPrice: %s, LowPrice: %s, Volume: %s, QuoteVolume: %s\n",
m.EventType, m.EventTime, m.Symbol, m.Price, m.OpenPrice, m.HighPrice, m.LowPrice, m.Volume, m.QuoteVolume,
)
}

type subscription struct {
Method string `json:"method"`
Params []Stream `json:"params"`
Id uint32 `json:"id"`
}
17 changes: 17 additions & 0 deletions node/pkg/wss/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,23 @@ func (ws *wsConn) Read(ctx context.Context, ch chan interface{}) {
}
}

func (ws *wsConn) ReadV2(ctx context.Context) chan interface{} {
ch := make(chan interface{})
go func() {
defer close(ch)
for {
var t interface{}
err := wsjson.Read(ctx, ws.Conn, &t)
if err != nil {
log.Error().Err(err).Msg("error reading from websocket")
break
}
ch <- t
}
}()
return ch
}

func (ws *wsConn) Close() error {
err := ws.Conn.Close(websocket.StatusNormalClosure, "")
if err != nil {
Expand Down
15 changes: 15 additions & 0 deletions node/script/test_websocket/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package main

import (
"sync"

"bisonai.com/orakl/node/pkg/wfetcher/binance"
)

func main() {
var wg sync.WaitGroup
wg.Add(1)
binance.Test()

Check failure on line 12 in node/script/test_websocket/main.go

View workflow job for this annotation

GitHub Actions / core-build

Error return value of `binance.Test` is not checked (errcheck)
wg.Wait()

}

0 comments on commit 376acbb

Please sign in to comment.