Add data provider #69
@@ -0,0 +1,50 @@
package data_provider

import (
    "fmt"
    "time"

    "github.com/rs/zerolog"
    "github.com/rs/zerolog/pkgerrors"
    "github.com/spf13/cobra"
)

var DataProviderCmd = &cobra.Command{
    Use:   "start",
    Short: "Start a process to fetch prices from data sources",
    RunE:  runDataProvider,
}

// required
const ConfigFilePathFlag = "config-file-path"
const WebsocketUrl = "ws-url"

func init() {
    DataProviderCmd.Flags().StringP(ConfigFilePathFlag, "c", "", "the path of your config json file")
    DataProviderCmd.Flags().StringP(WebsocketUrl, "w", "", "the websocket url to write updates to")

    DataProviderCmd.MarkFlagRequired(ConfigFilePathFlag)
}

func runDataProvider(cmd *cobra.Command, args []string) error {
    configFilePath, _ := cmd.Flags().GetString(ConfigFilePathFlag)
    wsUrl, _ := cmd.Flags().GetString(WebsocketUrl)

    mainLogger := mainLogger()

    zerolog.TimeFieldFormat = time.RFC3339Nano
    zerolog.DurationFieldUnit = time.Nanosecond
    zerolog.ErrorStackMarshaler = pkgerrors.MarshalStack

    mainLogger.Info().Msg("Starting data provider")

    config, err := loadConfig(configFilePath)
    if err != nil {
        return fmt.Errorf("error loading config: %v", err)
    }

    runner := NewDataProviderRunner(*config, wsUrl)
    runner.Run()

    return nil
}
@@ -0,0 +1,20 @@
package data_provider

import (
    "encoding/json"
    "fmt"
    "os"
)

func loadConfig(configPath string) (*DataProviderConfig, error) {
    configBytes, err := os.ReadFile(configPath)
    if err != nil {
        return nil, fmt.Errorf("failed to read config file: %v", err)
    }

    var config DataProviderConfig
    if err := json.Unmarshal(configBytes, &config); err != nil {
        return nil, fmt.Errorf("failed to unmarshal config file: %v", err)
    }
    return &config, nil
}
@@ -0,0 +1,31 @@
package data_provider

type dataSource interface {
    // Add all value updates to updatesCh
    Run(updatesCh chan DataSourceUpdateMap)
    GetDataSourceId() DataSourceId
}

func buildDataSources(config DataProviderConfig) []dataSource {
    // group by data source id to support batched feeds
    sourceConfigsByDataSource := make(map[DataSourceId][]DataProviderSourceConfig)
    for _, sourceConfig := range config.Sources {
        dataSourceId := sourceConfig.DataSourceId
        if _, ok := sourceConfigsByDataSource[dataSourceId]; !ok {
            sourceConfigsByDataSource[dataSourceId] = make([]DataProviderSourceConfig, 0)
        }
        sourceConfigsByDataSource[dataSourceId] = append(sourceConfigsByDataSource[dataSourceId], sourceConfig)
    }

    // initialize data sources
    allDataSources := make([]dataSource, 0)
    for dataSourceId, sourceConfigs := range sourceConfigsByDataSource {
        dataSourceBuilder := GetDataSourceBuilder(dataSourceId)
        dataSources := dataSourceBuilder(sourceConfigs)

        allDataSources = append(allDataSources, dataSources...)
    }

    return allDataSources
}

Review comment (on the grouping by data source id): overkill for our current sources (right now every dataSource is 1:1 with a value), but we might want to support batching of feeds in the future.
@@ -0,0 +1,12 @@
package data_provider

func GetDataSourceBuilder(dataSourceId DataSourceId) func([]DataProviderSourceConfig) []dataSource {
    switch dataSourceId {
    case UniswapV2DataSourceId:
        return getUniswapV2DataSources
    case RandomDataSourceId:
        return getRandomDataSource
    default:
        panic("unknown data source id " + dataSourceId)
    }
}

Review comment (on the switch statement): this switch statement is the only shared file that needs to change when we write a new integration.
@@ -0,0 +1,22 @@
package data_provider

import (
    "github.com/rs/zerolog"
    "github.com/rs/zerolog/log"
)

func baseAppLogger() zerolog.Logger {
    return log.With().Str("application", "stork-data-provider").Logger()
}

func mainLogger() zerolog.Logger {
    return baseAppLogger().With().Str("service", "main").Logger()
}

func writerLogger() zerolog.Logger {
    return baseAppLogger().With().Str("service", "writer").Logger()
}

func dataSourceLogger(dataSourceId DataSourceId) zerolog.Logger {
    return baseAppLogger().With().Str("service", "data_source").Str("data_source_id", string(dataSourceId)).Logger()
}
@@ -0,0 +1,40 @@
package data_provider

import (
    "time"
)

type (
    DataSourceId string
    ValueId      string

    DataProviderSourceConfig struct {
        Id           ValueId      `json:"id"`
        DataSourceId DataSourceId `json:"dataSource"`
        Config       any          `json:"config"`
    }

    DataProviderConfig struct {
        Sources []DataProviderSourceConfig `json:"sources,omitempty"`
    }

    DataSourceValueUpdate struct {
        ValueId      ValueId
        DataSourceId DataSourceId
        Timestamp    time.Time
        Value        float64
    }

    DataSourceUpdateMap map[ValueId]DataSourceValueUpdate

    ValueUpdate struct {
        PublishTimestamp int64   `json:"t"`
        ValueId          ValueId `json:"a"`
        Value            string  `json:"v"`
    }

    ValueUpdateWebsocketMessage struct {
        Type string        `json:"type"`
        Data []ValueUpdate `json:"data"`
    }
)
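For reference, given the json tags above, a ValueUpdateWebsocketMessage would serialize to something like the following. The type string, timestamp, value id, and value are placeholders; this diff does not show what Type is set to.

{
  "type": "...",
  "data": [
    { "t": 1700000000000000000, "a": "MY_RANDOM_VALUE", "v": "42.5" }
  ]
}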
Review comment (on the file below): a simpler example than uniswap; still using the scheduled data source concept.
@@ -0,0 +1,71 @@
package data_provider

import (
    "math/rand"
    "time"

    "github.com/mitchellh/mapstructure"
)

const RandomDataSourceId = "RANDOM_NUMBER"

type randomConfig struct {
    UpdateFrequency string  `json:"updateFrequency"`
    MinValue        float64 `json:"minValue"`
    MaxValue        float64 `json:"maxValue"`
}

type randomConnector struct {
    valueId         ValueId
    config          randomConfig
    updateFrequency time.Duration
}

func newRandomConnector(sourceConfig DataProviderSourceConfig) *randomConnector {
    var randomConfig randomConfig
    mapstructure.Decode(sourceConfig.Config, &randomConfig)

    updateFrequency, err := time.ParseDuration(randomConfig.UpdateFrequency)
    if err != nil {
        panic("unable to parse update frequency: " + randomConfig.UpdateFrequency)
    }

    return &randomConnector{
        valueId:         sourceConfig.Id,
        config:          randomConfig,
        updateFrequency: updateFrequency,
    }
}

func (r *randomConnector) GetUpdate() (DataSourceUpdateMap, error) {
    randValue := r.config.MinValue + rand.Float64()*(r.config.MaxValue-r.config.MinValue)

    updateMap := DataSourceUpdateMap{
        r.valueId: DataSourceValueUpdate{
            ValueId:      r.valueId,
            DataSourceId: r.GetDataSourceId(),
            Timestamp:    time.Now(),
            Value:        randValue,
        },
    }

    return updateMap, nil
}

func (r *randomConnector) GetUpdateFrequency() time.Duration {
    return r.updateFrequency
}

func (r *randomConnector) GetDataSourceId() DataSourceId {
    return RandomDataSourceId
}

func getRandomDataSource(sourceConfigs []DataProviderSourceConfig) []dataSource {
    dataSources := make([]dataSource, 0)
    for _, sourceConfig := range sourceConfigs {
        connector := newRandomConnector(sourceConfig)
        dataSource := newScheduledDataSource(connector)
        dataSources = append(dataSources, dataSource)
    }
    return dataSources
}
Review comment (on the ws-url flag): This could be configured in the config json if we wanted, but it felt like more of a run-time configuration (I might want to test without a websocket url at first to just look at the prices).

Review comment (reply): What do you think of making this a bit more generic - something like -o, where the output could eventually take a form denoted by ws(s)://, http(s)://, file://, s3://, etc.? Only ws should need to be supported right now though.

Review comment (reply): Makes sense - added a Writer interface and a GetWriter function where we can do some branching in the future. Currently it will fail if the --output-address isn't either blank or prefixed with ws://.
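The Writer and GetWriter code referenced above is not part of this diff excerpt. A minimal sketch of the stated behavior, assuming a Writer interface with a Run method (the method set is a guess; only the "blank or ws:// prefix" acceptance rule comes from the comment):

package data_provider

import (
    "fmt"
    "strings"
)

// Sketch only: the real Writer interface and branching live elsewhere in this PR.
type Writer interface {
    Run(updatesCh chan DataSourceUpdateMap)
}

// checkOutputAddress captures the described rule: --output-address must be
// blank or start with ws://; anything else is rejected.
func checkOutputAddress(outputAddress string) error {
    if outputAddress == "" || strings.HasPrefix(outputAddress, "ws://") {
        return nil
    }
    return fmt.Errorf("unsupported output address: %s", outputAddress)
}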